Spark2.3源码分析——RDD的checkpoint(检查点)实现

  |   0 评论   |   2,177 浏览

checkpoint简介

checkpoint(检查点)是很多分布式系统为了容灾容错引入的机制,其实质是将系统运行时的内存数据结构和状态持久化到磁盘上,在需要的时候通过读取这些数据,重新构造出之前的运行时状态。

Spark中使用检查点来将RDD的执行状态保存下来,在作业失败重试的时候,从检查点中恢复之前已经运行成功的RDD结果,这样就会大大减少重新计算的成本,提高任务恢复效率和执行效率,节省Spark各个计算节点的资源。

使用方法

调用RDD的checkpoint方法

scala> val data = sc.parallelize(1 to 100000000, 3)
scala> sc.setCheckpointDir("D:/tmp/spark/checkpoint")
scala> data.checkpoint()
scala> data.count

查看checkpoint目录

image.png

可以看到,生成了partition个二进制文件

代码分析

设置checkpoint目录

org.apache.spark.SparkContext#setCheckpointDir

如果运行在cluster模式则checkpoint目录必须是一个HDFS的路径,原因见下图注释

image.png

调用checkpoint方法

org.apache.spark.rdd.RDD#checkpoint

检查checkpointDir和checkpointData

checkpointData的定义如下

private[spark] var checkpointData: Option[RDDCheckpointData[T]] = None

image.png

主要就是构造checkpointData的实现类ReliableRDDCheckpointData的实例,并没有进行checkpoint操作,此处只对RDD做了标记

ReliableRDDCheckpointData

ReliableRDDCheckpointData的类型是RDDCheckpointData,它实现了将RDD的数据保存到可靠存储上的功能。这能在Driver在失败重试时,恢复到之前的计算状态。

RDDCheckpointData类包含了与RDD检查点相关的所有信息. 这个类的每个实例都与RDD相关联。它也管理相关联的RDD的Checkpoint过程,并通过提供更新后的分区、迭代器和Checkpoint RDD的首选位置来管理Checkpoint后的状态。

执行checkpint的时机

那什么触发真正的checkpoint操作?仔细看上面例子,执行data.count之后才会生成checkpoint文件。是的,只有在执行Action的时候才会进行checkpint。Spark在执行完Job之后会判断是否需要checkpint

我们可以从org.apache.spark.SparkContext#runJob方法得到验证

image.png

执行checkpoint

doCheckpoint()的定义如下,通过递归调用依次将父RDD、依赖的RDD、当前RDD进行checkpoint

org.apache.spark.rdd.RDD#doCheckpoint

image.png

只有定义了checkpointData时才会对自己的RDD进行checkpoint

更改状态后,调用子类的doCheckpoint方法

org.apache.spark.rdd.RDDCheckpointData#checkpoint

image.png

org.apache.spark.rdd.ReliableRDDCheckpointData#doCheckpoint

image.png

将RDD保存到checkpoint文件中

通过调用ReliableCheckpointRDD.writeRDDToCheckpointDirectory(rdd, cpDir)方法,将RDD保存到checkpoint文件中

org.apache.spark.rdd.ReliableCheckpointRDD#writeRDDToCheckpointDirectory

分为两步

  1. 将分区数据写入文件

  2. 将Partitioner写入文件

需要注意的是第2步,这个操作是尽力而为的,当写partitioner时,有任何异常发生, 只会打日志,然后忽略

image.png

写完Checkpint文件之后,会返回代表该RDD的ReliableCheckpointRDD对象,并最后赋值给cpRDD,并将Checkpint的状态变成Checkpointed。最后将这个RDD的依赖全部清除(markCheckpointed())

整个写操作到此就完成了

读checkpoint文件

在RDD Checkpint完之后,Checkpint的信息(比如数据存放的目录)都由RDDCheckpointData去管理

后续获取RDD的dependencies、partitions、preferredLocations都是通过checkpointRDD对象

/**
 * Get the list of dependencies of this RDD, taking into account whether the
 * RDD is checkpointed or not.
 */
final def dependencies: Seq[Dependency[_]] = {
  checkpointRDD.map(r => List(new OneToOneDependency(r))).getOrElse {
    if (dependencies_ == null) {
      dependencies_ = getDependencies
    }
    dependencies_
  }
}
/**
 * Get the array of partitions of this RDD, taking into account whether the
 * RDD is checkpointed or not.
 */
final def partitions: Array[Partition] = {
  checkpointRDD.map(_.partitions).getOrElse {
    if (partitions_ == null) {
      partitions_ = getPartitions
      partitions_.zipWithIndex.foreach { case (partition, index) =>
        require(partition.index == index,
          s"partitions($index).partition == ${partition.index}, but it should equal $index")
      }
    }
    partitions_
  }
}
final def preferredLocations(split: Partition): Seq[String] = {
  checkpointRDD.map(_.getPreferredLocations(split)).getOrElse {
    getPreferredLocations(split)
  }
}

当RDD被缓存时会调用RDD.iterator()

image.png

读后有收获可以支付宝请作者喝咖啡