大数据血缘分析系统设计(四)

  |   0 评论   |   5,855 浏览

在前一篇文章《大数据血缘分析系统设计(三)》中,我们介绍了数据级别的血缘关系分析,接下来,我们分析以下字段级别的血缘关系

尽管在大多数场景下,血缘关系分析到数据级别已经够用了,但在某些场景下仍显得不足,比如我想了解更改字段类型的影响有多大、字段是如何产生的等,此时就需要字段级别的血缘关系

字段级别的血缘关系说明

按照Hive当中的定义,分为两种:Projection和Predicate

Projection

投影,只影响单一输出字段,因此可以看到单个字段的产生、转换、后续使用等

如下图,是字段orddatte的DAG


Predicate

谓语、断言,影响所有输出字段

如下图,是createtime字段的血缘关系图,由于createtime字段参与了where子句中的筛选,因此,输出表的所有字段都会跟该字段关联起来

image.png

血缘关系的收集

SQL

类似于数据级别的血缘关系,可以通过在Hive中配置hive.exec.post.hooks参数来收集

查看org.apache.hadoop.hive.ql.hooks.LineageLogger代码

org.apache.hadoop.hive.ql.hooks.LineageLogger#run

  1. 从Context中拿到QueryPlan对象

  2. 解析查询计划,拿到Edge列表,即字段转化关系

  3. 根据Edge,拿到Vertex集合,即字段节点

image.png

org.apache.hadoop.hive.ql.hooks.LineageLogger#getEdges

核心代码就两段

基于查询计划的输出, 找出目标表名和字段列表

//拿到最终的SelectOperator列表
LinkedHashMap<String, ObjectPair<SelectOperator,
  org.apache.hadoop.hive.ql.metadata.Table>> finalSelOps = index.getFinalSelectOps();
Map<String, Vertex> vertexCache = new LinkedHashMap<String, Vertex>();
List<Edge> edges = new ArrayList<Edge>();
for (ObjectPair<SelectOperator, org.apache.hadoop.hive.ql.metadata.Table> pair: finalSelOps.values()) {
  List<FieldSchema> fieldSchemas = plan.getResultSchema().getFieldSchemas();
  SelectOperator finalSelOp = pair.getFirst();
  org.apache.hadoop.hive.ql.metadata.Table t = pair.getSecond();
  String destTableName = null;
  List<String> colNames = null;
  if (t != null) {
    destTableName = t.getDbName() + "." + t.getTableName();
    fieldSchemas = t.getCols();
  } else {
    // 基于查询计划的输出, 找出目标表名和字段列表.
    for (WriteEntity output : plan.getOutputs()) {
      Entity.Type entityType = output.getType();
      if (entityType == Entity.Type.TABLE
          || entityType == Entity.Type.PARTITION) {
        t = output.getTable();
        destTableName = t.getDbName() + "." + t.getTableName();
        List<FieldSchema> cols = t.getCols();
        if (cols != null && !cols.isEmpty()) {
          colNames = Utilities.getColumnNamesFromFieldSchema(cols);
        }
        break;
      }
    }
  }

遍历每个目标字段,生成血缘关系的边

Set<Vertex> targets = new LinkedHashSet<Vertex>();
for (int i = 0; i < fields; i++) {
  Vertex target = getOrCreateVertex(vertexCache,
    getTargetFieldName(i, destTableName, colNames, fieldSchemas),
    Vertex.Type.COLUMN);
  targets.add(target);
  Dependency dep = dependencies.get(i);
  addEdge(vertexCache, edges, dep.getBaseCols(), target,
    dep.getExpr(), Edge.Type.PROJECTION);
}
Set<Predicate> conds = index.getPredicates(finalSelOp);
if (conds != null && !conds.isEmpty()) {
  for (Predicate cond: conds) {
    addEdge(vertexCache, edges, cond.getBaseCols(),
      new LinkedHashSet<Vertex>(targets), cond.getExpr(),
      Edge.Type.PREDICATE);
  }
}

org.apache.hadoop.hive.ql.hooks.LineageLogger#getVertices

直接从Edge中取出sources和targets


private Set<Vertex> getVertices(List<Edge> edges) {
  Set<Vertex> vertices = new LinkedHashSet<Vertex>();
  for (Edge edge: edges) {
    vertices.addAll(edge.targets);
  }
  for (Edge edge: edges) {
    vertices.addAll(edge.sources);
  }
  // Assign ids to all vertices,
  // targets at first, then sources.
  int id = 0;
  for (Vertex vertex: vertices) {
    vertex.id = id++;
  }
  return vertices;
}

人工录入

字段相比数据,更加难以获取血缘关系,除了SQL类型的,可以通过语法分析拿到,其它的都要靠人工录入来完成,因此,需要以下的维护页面

新增依赖关系

由于维护字段的关系工作量繁重,为了减轻工作量,可以给出字段建议,通过分析当前字段所在表的前后依赖关系,可以给出来源字段和目标字段的候选项

image.png

列表页

image.png

字段血缘关系图

准备数据

上面收集到的原始数据,需要进行ETL,与元数据系统中的ID关联起来,才能方便提供给前端页面展示

下面给出一个demo数据

{
	"vertices":["ll_inland_detail_basic.actioncode(4420694)","ll_inland_detail_basic_copy.actioncode(4141049)","mm_transform_source.actioncode(4423049)","mm_transform_source_page.actioncode(4443827)"],
	"edges":[
		{
			"expression":"n.actioncode",
			"source":{
				"name":"actioncode",
				"id":"ll_inland_detail_basic.actioncode(4420694)",
				"type":"string",
				"dataset":{
					"name":"ll_inland_detail_basic",
					"id":4587,
					"ref_datasource_id":0,
					"status":0
				},
				"status":0
			},
			"type":"PROJECTION",
			"target":{
				"name":"actioncode",
				"id":"mm_transform_source.actioncode(4423049)",
				"type":"string",
				"dataset":{
					"name":"mm_transform_source",
					"id":16544,
					"ref_datasource_id":0,
					"status":0
				},
				"status":0
			}
		},
		{
			"expression":"n.actioncode",
			"source":{
				"name":"actioncode",
				"id":"ll_inland_detail_basic.actioncode(4420694)",
				"type":"string",
				"dataset":{
					"name":"ll_inland_detail_basic",
					"id":4587,
					"ref_datasource_id":0,
					"status":0
				},
				"status":0
			},
			"type":"PROJECTION",
			"target":{
				"name":"actioncode",
				"id":"mm_transform_source_page.actioncode(4443827)",
				"type":"string",
				"dataset":{
					"name":"mm_transform_source_page",
					"id":17188,
					"ref_datasource_id":0,
					"status":0
				},
				"status":0
			}
		},
		{
			"expression":"NULL",
			"source":{
				"name":"actioncode",
				"id":"ll_inland_detail_basic.actioncode(4420694)",
				"type":"string",
				"dataset":{
					"name":"ll_inland_detail_basic",
					"id":4587,
					"ref_datasource_id":0,
					"status":0
				},
				"status":0
			},
			"type":"PROJECTION",
			"target":{
				"name":"actioncode",
				"id":"ll_inland_detail_basic_copy.actioncode(4141049)",
				"type":"string",
				"dataset":{
					"name":"ll_inland_detail_basic_copy",
					"id":11452,
					"ref_datasource_id":0,
					"status":0
				},
				"status":0
			}
		}
	]
}

前端页面展示image.png

读后有收获可以支付宝请作者喝咖啡