Spark2.3源码分析——Task的启动

  |   0 评论   |   1,676 浏览

背景

我们知道,在Spark任务运行时,会在NodeManager上生成CoarseGrainedExecutorBackend进程来执行最终的计算任务。

如图

image.png

该进程是怎么启动的呢?下面我们来分析一下

CoarseGrainedExecutorBackend的启动

在CoarseGrainedExecutorBackend类的伴生对象中定义了main方法,可以用来启动CoarseGrainedExecutorBackend进程

def main(args: Array[String]) {
  var driverUrl: String = null
  var executorId: String = null
  var hostname: String = null
  var cores: Int = 0
  var appId: String = null
  var workerUrl: Option[String] = None
  val userClassPath = new mutable.ListBuffer[URL]()

  var argv = args.toList
  while (!argv.isEmpty) {
    argv match {
      case ("--driver-url") :: value :: tail =>
        driverUrl = value
        argv = tail
      case ("--executor-id") :: value :: tail =>
        executorId = value
        argv = tail
      case ("--hostname") :: value :: tail =>
        hostname = value
        argv = tail
      case ("--cores") :: value :: tail =>
        cores = value.toInt
        argv = tail
      case ("--app-id") :: value :: tail =>
        appId = value
        argv = tail
      case ("--worker-url") :: value :: tail =>
        // Worker url is used in spark standalone mode to enforce fate-sharing with worker
        workerUrl = Some(value)
        argv = tail
      case ("--user-class-path") :: value :: tail =>
        userClassPath += new URL(value)
        argv = tail
      case Nil =>
      case tail =>
        // scalastyle:off println
        System.err.println(s"Unrecognized options: ${tail.mkString(" ")}")
        // scalastyle:on println
        printUsageAndExit()
    }
  }

  if (driverUrl == null || executorId == null || hostname == null || cores <= 0 ||
    appId == null) {
    printUsageAndExit()
  }

  run(driverUrl, executorId, hostname, cores, appId, workerUrl, userClassPath)
  System.exit(0)
}

该方法中会调用run方法启动

org.apache.spark.executor.CoarseGrainedExecutorBackend#run

image.png

image.png

处理接收到的事件消息

CoarseGrainedExecutorBackend启动后,就等待接受其他工作进程所发送的事件消息,并做对应的响应

org.apache.spark.executor.CoarseGrainedExecutorBackend#receive

override def receive: PartialFunction[Any, Unit] = {
  case RegisteredExecutor =>
    logInfo("Successfully registered with driver")
    try {
      executor = new Executor(executorId, hostname, env, userClassPath, isLocal = false)
    } catch {
      case NonFatal(e) =>
        exitExecutor(1, "Unable to create executor due to " + e.getMessage, e)
    }

  case RegisterExecutorFailed(message) =>
    exitExecutor(1, "Slave registration failed: " + message)

  case LaunchTask(data) =>
    if (executor == null) {
      exitExecutor(1, "Received LaunchTask command but executor was null")
    } else {
      val taskDesc = TaskDescription.decode(data.value)
      logInfo("Got assigned task " + taskDesc.taskId)
      executor.launchTask(this, taskDesc)
    }

  case KillTask(taskId, _, interruptThread, reason) =>
    if (executor == null) {
      exitExecutor(1, "Received KillTask command but executor was null")
    } else {
      executor.killTask(taskId, interruptThread, reason)
    }

  case StopExecutor =>
    stopping.set(true)
    logInfo("Driver commanded a shutdown")
    // Cannot shutdown here because an ack may need to be sent back to the caller. So send
    // a message to self to actually do the shutdown.
    self.send(Shutdown)

  case Shutdown =>
    stopping.set(true)
    new Thread("CoarseGrainedExecutorBackend-stop-executor") {
      override def run(): Unit = {
        // executor.stop() will call `SparkEnv.stop()` which waits until RpcEnv stops totally.
        // However, if `executor.stop()` runs in some thread of RpcEnv, RpcEnv won't be able to
        // stop until `executor.stop()` returns, which becomes a dead-lock (See SPARK-14180).
        // Therefore, we put this line in a new thread.
        executor.stop()
      }
    }.start()

  case UpdateDelegationTokens(tokenBytes) =>
    logInfo(s"Received tokens of ${tokenBytes.length} bytes")
    SparkHadoopUtil.get.addDelegationTokens(tokenBytes, env.conf)
}


RegisteredExecutor

image.png

启动心跳

org.apache.spark.executor.Executor#startDriverHeartbeater

image.png

LaunchTask

image.png

org.apache.spark.executor.Executor#launchTask

image.png

org.apache.spark.executor.Executor#threadPool

image.png

org.apache.spark.executor.Executor.TaskRunner#run

构造Task并启动

image.png

Task的启动

org.apache.spark.scheduler.Task#run

构造TaskContextImpl

image.png

ShuffleMapTask#runTask

org.apache.spark.scheduler.ShuffleMapTask#runTask

image.png

image.png

ResultTask#runTask

org.apache.spark.scheduler.ResultTask#runTask

image.png

读后有收获可以支付宝请作者喝咖啡