从hive-1.1.0-cdh5.11.0起Lzo压缩不需要再建立.index索引文件了

  |   0 评论   |   926 浏览

背景

这些天在做平台迁移,对Hive进行了一个小版本升级,从1.1.0-cdh5.9.0升级至1.1.0-cdh5.14.0,结果迁移完的两边数据怎么都对不上,查来查去,发现是lzo压缩导致的问题。

为什么要有lzo索引文件?

在Hadoop诞生的早期,并没有太多的文件格式供选择,Parquet和ORC这些嵌套格式还不成熟,没有大规模应用开来,因此,大量使用的,还是基于文本格式+压缩的方式。

lzo,是一种压缩格式,特点是解压速度特别快,且解压时不需要内存支持,非常适合于Hadoop这种读多写少的场景。

我们知道,为了并行地执行任务,Mapper的任务数是由文件的大小以及文件块的大小共同决定的,但是,如何对压缩之后的文件进行切分呢?这就引出了lzo的索引文件。

我们一般都会这样来给lzo的文件添加索引

hadoop jar ${HADOOP_HOME}/lib/hadoop-lzo.jar com.hadoop.compression.lzo.DistributedLzoIndexer $DATA_DIR

之后在$DATA_DIR目录下,所有lzo格式的文件都多了一个后缀为.index的文件

Hive处理可分割压缩文件的方式

在hive-1.1.0-cdh5.11.0之前,由于Hadoop的org.apache.hadoop.mapreduce.lib.input.CombineFileInputFormat类不能正确的处理non-splittable文件,因此Hive提供了一个临时性的解决方案,通过控制hive.hadoop.supports.splittable.combineinputformat的值来启用如下的逻辑

org.apache.hadoop.hive.ql.io.CombineHiveInputFormat#getCombineSplits

// we use a configuration variable for the same
if (this.mrwork != null && !this.mrwork.getHadoopSupportsSplittable()) {
  // The following code should be removed, once
  // https://issues.apache.org/jira/browse/MAPREDUCE-1597 is fixed.
  // Hadoop does not handle non-splittable files correctly for CombineFileInputFormat,
  // so don't use CombineFileInputFormat for non-splittable files
  //ie, dont't combine if inputformat is a TextInputFormat and has compression turned on
  if (inputFormat instanceof TextInputFormat) {
    Queue<Path> dirs = new LinkedList<Path>();
    FileStatus fStats = inpFs.getFileStatus(path);
    // If path is a directory
    if (fStats.isDir()) {
      dirs.offer(path);
    } else if ((new CompressionCodecFactory(job)).getCodec(path) != null) {
      //if compresssion codec is set, use HiveInputFormat.getSplits (don't combine)
      splits = super.getSplits(job, numSplits);
      return splits;
    }
    while (dirs.peek() != null) {
      Path tstPath = dirs.remove();
      FileStatus[] fStatus = inpFs.listStatus(tstPath, FileUtils.HIDDEN_FILES_PATH_FILTER);
      for (int idx = 0; idx < fStatus.length; idx++) {
        if (fStatus[idx].isDir()) {
          dirs.offer(fStatus[idx].getPath());
        } else if ((new CompressionCodecFactory(job)).getCodec(
            fStatus[idx].getPath()) != null) {
          //if compresssion codec is set, use HiveInputFormat.getSplits (don't combine)
          splits = super.getSplits(job, numSplits);
          return splits;
        }
      }
    }
  }
}

对于lzo文件,将hive.hadoop.supports.splittable.combineinputformat设置为true,则调用父类org.apache.hadoop.hive.ql.io.HiveInputFormat#getSplits的分片方法,最终调用com.hadoop.mapred.DeprecatedLzoTextInputFormat的分片方法。

但是,有一个问题,我们给lzo文件添加了索引.index文件,这也是一个文件,如果加到Mapper的输入中,不就出错了?

在com.hadoop.mapred.DeprecatedLzoTextInputFormat#listStatus方法当中,通过加入下列逻辑,跳过.index文件

// Get rid of non-LZO files, unless the conf explicitly tells us to
// keep them.
// However, always skip over files that end with ".lzo.index", since
// they are not part of the input.
if (ignoreNonLzo || LzoInputFormatCommon.isLzoIndexFile(file.toString())) {
  it.remove();
}

问题回顾

原因分析

从hive-1.1.0-cdh5.11.0起,由于MAPREDUCE-1597已经修复了处理压缩文件的bug,因此HIVE-11376删除了CombineHiveInputFormat中那段判断参数hive.hadoop.supports.splittable.combineinputformat的代码,这就导致不会调用父类HiveInputFormat的分片方法,而是将.index文件也作为正常的数据添加到了输出当中,因此,最终的计算结果就会多出来一些记录。

解决办法

1.对于lzo压缩文件,使用HiveInputFormat替换CombineHiveInputFormat

方法是:在HiveQL的前面设置如下属性:

SET hive.input.format=org.apache.hadoop.hive.ql.io.HiveInputFormat;

2.干脆删除.index文件