基于Spark的大文件差异比较工具(diff)

  |   0 评论   |   965 浏览

背景

最近在做平台迁移,为了保证迁移后的数据和流程一致,需要对比两边的数据差异,而数据量也比较大,列式存储Parquet格式+压缩,数据文件大小有100 GB+,记录数有6.5亿+,单机的diff工具肯定hold不住了,于是有了下面的工具。

代码

Spark版本:1.6.0-cdh5.14.0

Scala版本:2.11.12

package com.cxy7.dw
import org.apache.commons.cli._
import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.hive.HiveContext
import org.apache.spark.{SparkConf, SparkContext}
import org.slf4j.LoggerFactory
object DataCompare {
  val logger = LoggerFactory.getLogger(DataCompare.getClass)
  def main(args: Array[String]): Unit = {
    val options = new Options
    options.addOption("l", "left", true, "源数据目录")
    options.addOption("r", "right", true, "目的数据目录")
    options.addOption("t", "fileType", true, "文件类型,可选:text、parquet、orc、json")
    options.addOption("o", "output", true, "输出结果目录")
    val parser = new BasicParser
    val cmd = parser.parse(options, args)
    val hf = new HelpFormatter
    if (!cmd.hasOption('l') || !cmd.hasOption('r')) {
      hf.printHelp("必须指定两个目录", options)
      return
    }
    if (!cmd.hasOption('o')) {
      hf.printHelp("必须指定输出目录", options)
      return
    }
    val leftPath = cmd.getOptionValue("l")
    val rightPath = cmd.getOptionValue('r')
    val outputPath = cmd.getOptionValue('o')
    var fileType = "text"
    if (cmd.hasOption('t')) {
      fileType = cmd.getOptionValue('t')
    }
    val conf = new SparkConf().setAppName("DataCompare")
    if (!leftPath.startsWith("hdfs://")) {
      conf.setMaster("local")
      conf.set("spark.local.dir", "D:/tmp")
      conf.set("spark.driver.host", "0.0.0.0")
    }
    val sc = new SparkContext(conf);
    val sqlContext = new HiveContext(sc)
    var tmpLeftDF = getSourceDF(sqlContext, leftPath, fileType)
    var tmpRightDF = getSourceDF(sqlContext, rightPath, fileType)
    // 将所有字段拼接为一个字符串
    val leftDF = tmpLeftDF.map(row => (row.toString(), row.toString()))
    val rightDF = tmpRightDF.map(row => (row.toString(), row.toString()))
    val joinDF = leftDF.fullOuterJoin(rightDF)
    // 左表有且右表无,输出左表内容
    joinDF.filter(t => t._2._1.nonEmpty && t._2._2.isEmpty).map(t => t._2._1.get).saveAsTextFile(outputPath + "/left")
    // 右表有且左表无,输出右表内容
    joinDF.filter(t => t._2._1.isEmpty && t._2._2.nonEmpty).map(t => t._2._2.get).saveAsTextFile(outputPath + "/right")
  }
  def getSourceDF(sqlContext: SQLContext, path: String, fileType: String) = fileType match {
    case "parquet" => sqlContext.read.parquet(path)
    case "orc" => sqlContext.read.orc(path)
    case "json" => sqlContext.read.json(path)
    case _ => sqlContext.read.text(path)
  }
}

打包运行

用法:

usage: 
 -l,--left <arg>       源数据目录
 -o,--output <arg>     输出结果目录
 -r,--right <arg>      目的数据目录
 -t,--fileType <arg>   文件类型,可选:text、parquet、orc、json

示例:

spark-submit --master yarn --deploy-mode client \
--num-executors 40 \
--conf spark.driver.maxResultSize=9G \
--conf spark.yarn.executor.memoryOverhead=10096 \
--conf spark.shuffle.consolidateFiles=true \
--conf spark.driver.extraJavaOptions="-Dspark.hadoop.dfs.replication=2" \
--driver-memory 10G \
--executor-memory 8G \
--executor-cores 3 \
--class com.cxy7.dw.DataCompare dw_compare.jar \
-l hdfs://ns1/wh/dw/biz/order/20180911 \
-r hdfs://ns1/wh/dw/biz/order/20180911_idc \
-t parquet \
-o hdfs://ns1/wh/dw/biz/order/20180911_diff