大数据血缘分析系统设计(三)

  |   0 评论   |   11,293 浏览

在前面一篇《大数据血缘分析系统设计(二)》中,对大数据血缘分析系统做了整体的介绍,任务级别的血缘关系计划放在分布式调度系统的设计当中介绍,因此本系列后面主要针对数据级别和字段级别进行介绍

数据级别血缘关系介绍

参考《数据级别

血缘关系数据的收集

数据ID的标识

要想血缘关系图中方便的定位到数据,首要解决的问题,就是数据ID的唯一标识。最容易想到的,就是利用服务器IP-数据库-数据表这种方式,但这种方式的不足之处在于,一是标识符长,不容易传播,二是无法统一不同数据源,如数据可能是Kafka、FTP、本地文件等方式存储的,相应的标识方式也互不相同。因此,容易想到,使用元数据统一分配的ID,是比较合适的。

参考《WhereHows》中的设计,不同数据源的数据,经过ETL进入元数据系统时,由元数据系统唯一分配ID

如下图,为WhereHows的dict_dataset数据表内容样例

可见,同一数据,既有id标识,也有URN的标识

image.png

数据流转的收集

解决了数据ID标识的问题,另一个难点在于数据流转关系的收集,针对不同的数据处理方式,收集方式也不一样

SQL

在《利用LineageInfo分析HiveQL中的表级别血缘关系》一文中,我提到,利用org.apache.hadoop.hive.ql.tools.LineageInfo类,可以用来分析HiveQL中的表级别血缘关系,然而,如何获取到运行的HiveQL的语句呢?

Hive提供了Hook机制,在Hive编译、执行的各个阶段,可以调用参数配置的各种Hook

我们利用hive.exec.post.hooks这个钩子,在每条语句执行结束后自动调用该钩子

image.png


配置方法,在hive-site.xml中配置以下参数

<property>

      <name>hive.exec.post.hooks</name>

      <value>org.apache.hadoop.hive.ql.hooks.LineageLogger</value>

</property>

在org.apache.hadoop.hive.ql.hooks.LineageLogger的run方法中加入以下代码

image.png

在HiveQL执行完成后,在Driver端的日志当中,就会打印出如下信息

image.png

MapReduce

对与Mapreduce程序,由于输入输出均是由各种InputFormat/OutFormat执行,因此可以在Job提交时获取

在org.apache.hadoop.mapreduce.JobSubmitter#submitJobInternal方法中添加一些逻辑,如在submitJob之后

image.png

org.apache.hadoop.mapreduce.HadoopLineageLogger的代码如下

package org.apache.hadoop.mapreduce;
import com.google.gson.stream.JsonWriter;
import org.apache.commons.io.output.StringBuilderWriter;
import org.apache.commons.lang.StringUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import java.io.IOException;
public class HadoopLineageLogger {
    protected static final Log LOG = LogFactory.getLog(HadoopLineageLogger.class);
    public static void lineage(Configuration conf, JobContext job) {
        try {
            StringBuilderWriter out = new StringBuilderWriter(1024);
            JsonWriter writer = new JsonWriter(out);
            writer.beginObject();
            //添加赫拉的参数
            String taskId = System.getenv("hera.task.id");
            String taskName = System.getenv("hera.task.name");
            if (StringUtils.isNotBlank(taskId)) {
                conf.set("hera.task.id", taskId);
                writer.name("heraTaskId").value(taskId);
            }
            if (StringUtils.isNotBlank(taskName)) {
                conf.set("hera.task.name", taskName);
                writer.name("heraTaskName").value(taskName);
            }
            writer.name("inputs");
            writer.beginArray();
            String[] inputs = getPaths(job, FileInputFormat.INPUT_DIR);
            for (String dir : inputs) {
                writer.value(dir);
            }
            inputs = getPaths(job, "mapreduce.input.multipleinputs.dir.formats");
            for (String dir : inputs) {
                writer.value(dir.split(";")[0]);
            }
            writer.endArray();
            writer.name("outputs");
            writer.beginArray();
            String[] outputs = getPaths(job, FileOutputFormat.OUTDIR);
            for (String dir : outputs) {
                writer.value(dir);
            }
            writer.endArray();
            writer.endObject();
            writer.close();
            String lineage = out.toString();
            LOG.info(lineage);
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
    public static String[] getPaths(JobContext context, String param) {
        String dirs = context.getConfiguration().get(param, "");
        String [] list = org.apache.hadoop.util.StringUtils.split(dirs);
        String[] result = new String[list.length];
        for (int i = 0; i < list.length; i++) {
            result[i] = new String(org.apache.hadoop.util.StringUtils.unEscapeString(list[i]));
        }
        return result;
    }
}

MR任务运行完成之后,在日志之中会输出如下信息

2018-05-25 17:51:12 18/05/25 17:51:12 INFO mapreduce.LineageLogger: {"heraTaskId":"289384","inputs":["hdfs://ns1/wh/source/tp/cxy/biz/online/20180524"],"outputs":["hdfs://ns1/tmp/selfdel/online/20180525175102_93192"]}

Spark

类似于Hive,可以通过自定义org.apache.spark.scheduler.SparkListener

然后在spark/conf中指定spark.sql.queryExecutionListeners和spark.extraListeners

订阅事件总线上的事件

有关Spark事件总线的详情,可以参考《Spark2.3源码分析——LiveListenerBus(事件总线)

其他

虽然Hive/MR/Spark任务占了数据处理的多数,但仍然有一些数据处理不能覆盖,怎么样能够获取到这些信息呢?

这就需要祭出杀手锏——人工录入了

通过在元数据系统的数据详情页,维护当前数据的前置数据,即可串联起整个数据流程

image.png


血缘关系图的可视化

按照《数据血缘关系图中的元素》的介绍,一个血缘关系图中的要素有两部分,数据节点的和流转线路

故需要将前面收集到的原始信息做一次ETL处理,与元数据系统中的ID库做一个映射,进而封装成JSON数据,提供给前端展示,

如按照下面方式组织

{
	"vertices":["dp_acc_cc.dict_basicmm(5405)","cc_db.tt_plan(5115)","dp_prod_report.ttplan_cc_basicmm_minprice_crm(17120)","dp_prod_report.ttplan_cc_basicmm_minprice(17119)","dp_acc_cc.dict_llinfo(5415)","dp_prod_report.ttplan_cc(17118)","dp_acc_cc.dict_mminfo(5438)"],
	"edges":[
		{
			"source":{
				"database_name":"dp_prod_report",
				"name":"ttplan_cc_basicmm_minprice",
				"id":"dp_prod_report.ttplan_cc_basicmm_minprice(17119)",
				"ref_datasource_id":0,
				"job":{
					"name":"【报表平台】MM计划基础SS最低价",
					"id":471
				},
				"status":0
			},
			"target":{
				"database_name":"dp_acc_cc",
				"name":"dict_llinfo",
				"id":"dp_acc_cc.dict_llinfo(5415)",
				"ref_datasource_id":0,
				"job":{
					"name":"【FTP】下载dict_llinfo",
					"id":64
				},
				"status":0
			}
		},
		{
			"source":{
				"database_name":"dp_prod_report",
				"name":"ttplan_cc",
				"id":"dp_prod_report.ttplan_cc(17118)",
				"ref_datasource_id":0,
				"job":{
					"name":"【报表平台】【ETL】MM计划",
					"id":470
				},
				"status":0
			},
			"target":{
				"database_name":"cc_db",
				"name":"tt_plan",
				"id":"cc_db.tt_plan(5115)",
				"ref_datasource_id":0,
				"job":{
					"name":"【ETL】解析MM计划",
					"id":15
				},
				"status":0
			}
		},
		{
			"source":{
				"database_name":"dp_prod_report",
				"name":"ttplan_cc_basicmm_minprice",
				"id":"dp_prod_report.ttplan_cc_basicmm_minprice(17119)",
				"ref_datasource_id":0,
				"job":{
					"name":"【报表平台】MM计划基础SS最低价",
					"id":471
				},
				"status":0
			},
			"target":{
				"database_name":"dp_acc_cc",
				"name":"dict_basicmm",
				"id":"dp_acc_cc.dict_basicmm(5405)",
				"ref_datasource_id":0,
				"job":{
					"name":"【FTP】下载dict_basicmm",
					"id":30
				},
				"status":0
			}
		},
		{
			"source":{
				"database_name":"dp_prod_report",
				"name":"ttplan_cc_basicmm_minprice",
				"id":"dp_prod_report.ttplan_cc_basicmm_minprice(17119)",
				"ref_datasource_id":0,
				"job":{
					"name":"【报表平台】MM计划基础SS最低价",
					"id":471
				},
				"status":0
			},
			"target":{
				"database_name":"dp_prod_report",
				"name":"ttplan_cc",
				"id":"dp_prod_report.ttplan_cc(17118)",
				"ref_datasource_id":0,
				"job":{
					"name":"【报表平台】【ETL】MM计划",
					"id":470
				},
				"status":0
			}
		},
		{
			"source":{
				"database_name":"dp_prod_report",
				"name":"ttplan_cc_basicmm_minprice_crm",
				"id":"dp_prod_report.ttplan_cc_basicmm_minprice_crm(17120)",
				"ref_datasource_id":0,
				"job":{
					"name":"【数据支持】CRM-YY基础SS最低价",
					"id":669
				},
				"status":0
			},
			"target":{
				"database_name":"dp_prod_report",
				"name":"ttplan_cc_basicmm_minprice",
				"id":"dp_prod_report.ttplan_cc_basicmm_minprice(17119)",
				"ref_datasource_id":0,
				"job":{
					"name":"【报表平台】MM计划基础SS最低价",
					"id":471
				},
				"status":0
			}
		},
		{
			"source":{
				"database_name":"dp_prod_report",
				"name":"ttplan_cc_basicmm_minprice",
				"id":"dp_prod_report.ttplan_cc_basicmm_minprice(17119)",
				"ref_datasource_id":0,
				"job":{
					"name":"【报表平台】MM计划基础SS最低价",
					"id":471
				},
				"status":0
			},
			"target":{
				"database_name":"dp_acc_cc",
				"name":"dict_mminfo",
				"id":"dp_acc_cc.dict_mminfo(5438)",
				"ref_datasource_id":0,
				"job":{
					"name":"【FTP】下载dict_mminfo",
					"id":71
				},
				"status":0
			}
		}
	]
}

展示页面

image.png

读后有收获可以支付宝请作者喝咖啡