Spark2.3源码分析——DAGScheduler处理Job的过程

  |   0 评论   |   1,758 浏览

处理Job的提交

在《Spark2.3源码分析——Job提交过程》一文中,我们提到DAGSchedulerEventProcessLoop中的事件处理线程,根据事件的不同类型,最终又回调了dagScheduler中对应的方法

DAGScheduler#handleJobSubmitted

image.png

整个过程是这样的:

  1. 构建ResultStage。

  2. 创建ActiveJob,ActiveJob表示已经激活的Job,即被DAGScheduler接收处理的Job

  3. 获取当前Job的所有Stage对应的StageInfo,然后向LiveListenerBus发送Job提交事件。

划分Stage

一个Job可能被划分为多个Stage,各个Stage之间存在着依赖关系,下游的Stage依赖于上游的Stage,Stage划分过程是从最后一个Stage开始往前执行的,最后一个Stage的类型是ResultStage。

ResultStage可以使用指定的函数对RDD中的分区进行计算并得到最终结果。ResultStage是最后执行的Stage,此阶段主要进行作业的收尾工作(例如:对各个分区的数据收集、打印到控制台或写入HDFS)

创建ResultStage

DAGScheduler#createResultStage用来创建ResultStage

创建ResultStage之前需要获取所有父Stage的列表

image.png

获取或创建父Stage列表

对于给定的RDD,获取或创建父Stage列表,从当前的RDD向前探索,找到宽依赖处划分出parentStage,并用提供的firstJobId创建新Stage

org.apache.spark.scheduler.DAGScheduler#getOrCreateParentStages

image.png

获取RDD的所有ShuffleDependencie的序列

org.apache.spark.scheduler.DAGScheduler#getShuffleDependencies

image.png

获取ShuffleMapStage

org.apache.spark.scheduler.DAGScheduler#getOrCreateShuffleMapStageimage.png

获取在shuffleToMapStage中注册的祖先Shuffle依赖关系

org.apache.spark.scheduler.DAGScheduler#getMissingAncestorShuffleDependencies

image.png

提交ResultStage

handleJobSubmitted方法的最后一步是调用submitStage提交Stage

判断当前Stage的父Stage是否完成,如果有未完成的,则递归的提交父Stage

org.apache.spark.scheduler.DAGScheduler#submitStage

image.png

获取当前Stage所有未提交的父Stage

org.apache.spark.scheduler.DAGScheduler#getMissingParentStages

image.png

image.png

判断Stage的未提交父Stage的条件如下:

  1. Stage的RDD分区中存在没有对应的TaskLocation序列的分区,即调用getCacheLocs方法获取不到某个分区的TaskLocation序列,说明当前Stage的某个上游ShuffleMapStage的某个分区未执行。

  2. Stage的上游ShuffleMapStage不可用(即调用ShuffleMapStage的isAvailable方法返回false)

提交还未计算的Task

在Stage没有不可用的父Stage时,提交当前Stage中还未提交的Task

提交Task的入口是org.apache.spark.scheduler.DAGScheduler#submitMissingTasks

image.png

image.png

image.png

image.png

image.png

将Stage标记为完成

org.apache.spark.scheduler.DAGScheduler#markStageAsFinished

image.png

读后有收获可以支付宝请作者喝咖啡