任务执行过程

问题:

  • 一个Spark任务究竟是怎么执行的?

一个Spark任务究竟是怎么执行的

老实说,我不知道从哪里开始, 考虑到一般Spark程序通过 spark-submitspark-shell 提交之后, 最终其实只会以 writecollect 进行实际执行,所以就从这2个函数入手。

write可能会比较复杂,涉及到实际存储,所以只是为了搞清楚执行过程, 还是从 collect 方法会比较简单。

所以我决定从一段简单的REPL代码入手:

半途发现任务序列化有点区别,所以就把groupRDD改成了下面的代码:

-   val groupRdd = sc.parallelize(Seq((1,2),(3,4),(5,6),(1,5),(3,8)))
+   val groupRdd = sc.textFile("D:\\test.csv").map(
+     x => {
+       val p = x.split(",")
+       ( p(0) , p(1) )
+   })

    val shuffRDD = groupRdd.groupByKey()
    shuffRDD.collect

挑了一个会产生ShuffledRDDgroupByKey 函数,这个函数在 PairRDDFunctions 里,在 RDD 本身代码里是找不到的。

顺便提一下这个 PairRDDFunctions,签名是这样的:

    class PairRDDFunctions[K, V](self: RDD[(K, V)])
    (implicit kt: ClassTag[K], vt: ClassTag[V], ord: Ordering[K] = null)
  extends Logging with Serializable

这里用了scala的隐式转换,把RDD转换为一个PairRDDFunctions, 隐含的参数是pairRDD中包含的key和value的ClassTag,由于不确定RDD中key和value的类型, 采用了常见的隐式参数保留运行时类型信息。(看不明白的补一下ClassTag)

把RDD转换为 PairRDDFunctions的隐式转换方法在spark 1.3之前需要 import SparkContext._, 1.3之后挪到了RDD的 rddToPairRDDFunctions 方法里。

上一节,我们已经看到 groupRdd 是个 ParallelCollectionRDD, 接下来我们看看产生shuffRDD的groupByKey函数:

    def groupByKey(partitioner: Partitioner): RDD[(K, Iterable[V])] = self.withScope {
    // groupByKey shouldn't use map side combine because map side combine does not
    // reduce the amount of data shuffled and requires all map side data be inserted
    // into a hash table, leading to more objects in the old gen.

    // 这里实际传入的是CompactBuffer伴生类的apply函数
    val createCombiner = (v: V) => CompactBuffer(v)
    val mergeValue = (buf: CompactBuffer[V], v: V) => buf += v
    val mergeCombiners = (c1: CompactBuffer[V], c2: CompactBuffer[V]) => c1 ++= c2
    val bufs = combineByKeyWithClassTag[CompactBuffer[V]](
      createCombiner, mergeValue, mergeCombiners, partitioner, mapSideCombine = false)
    bufs.asInstanceOf[RDD[(K, Iterable[V])]]
  }

可以看到方法先建了一个CompactBuffer关于CompactBuffer的进一步深挖可以看这里, 外加定义了两个个函数,作为参数传入 combineByKeyWithClassTag,我们继续下到该函数里:

      def combineByKeyWithClassTag[C](
      createCombiner: V => C,
      mergeValue: (C, V) => C,
      mergeCombiners: (C, C) => C,
      partitioner: Partitioner,
      mapSideCombine: Boolean = true,
      serializer: Serializer = null)(implicit ct: ClassTag[C]): RDD[(K, C)] = self.withScope {
    require(mergeCombiners != null, "mergeCombiners must be defined") // required as of Spark 0.9.0
    if (keyClass.isArray) {
      if (mapSideCombine) {
        throw new SparkException("Cannot use map-side combining with array keys.")
      }
      if (partitioner.isInstanceOf[HashPartitioner]) {
        throw new SparkException("HashPartitioner cannot partition array keys.")
      }
    }
    val aggregator = new Aggregator[K, V, C](
      self.context.clean(createCombiner),
      self.context.clean(mergeValue),
      self.context.clean(mergeCombiners))
    if (self.partitioner == Some(partitioner)) {
      self.mapPartitions(iter => {
        val context = TaskContext.get()
        new InterruptibleIterator(context, aggregator.combineValuesByKey(iter, context))
      }, preservesPartitioning = true)
    } else {
      new ShuffledRDD[K, V, C](self, partitioner)
        .setSerializer(serializer)
        .setAggregator(aggregator)
        .setMapSideCombine(mapSideCombine)
    }
  }

这个函数在2.4里还是带 @Experimental 的,最新版里已经去掉了这个注解,看来大家对它很满意=^_^=。

这是一个通用的pairRDD聚合方法,把 RDD[(K,V)] 转换成 RDD[(K,CombinedType)]

注意,这个方法是public的,也就是说有必要的话可以通过自定义以下三个聚合函数,创建自己的聚合方法:

  1. createCombiner:用来将创建包含单个V类型元素的某种C集合类型(比如说 val custom1 = (i:Int) => Seq(i),当然上面用的是CompactBuffer
  2. mergeValue:用来将单个V元素添加到C集合里去(比如说 val custom2 = (c:Seq[Int],i:Int) => c:+i)
  3. mergeCombiners:用来聚合C集合(比如说 val custom3 = (c1:Seq[Int],c2:Seq[Int])=>c1++c2)

欸嘿,我们这就顺手自定义了一个简版的聚合函数,给它们起个可爱的名字(当然也可以匿名):

 val miemie1 = (i:Int) => Seq(i)
 val miemie2 = (c:Seq[Int],i:Int) => c:+i
 val miemie3 = (c1:Seq[Int],c2:Seq[Int])=>c1++c2
groupRdd.combineByKeyWithClassTag(miemie1,miemie2,miemie3)
groupRdd.combineByKeyWithClassTag(miemie1,miemie2,miemie3).collect

执行结果如下: avatar 是不是很有趣?

下面的Aggregator就不往下深挖了,本篇的主要目的还是看任务执行。

快进到collect函数:

  def collect(): Array[T] = withScope {
    val results = sc.runJob(this, (iter: Iterator[T]) => iter.toArray)
    Array.concat(results: _*)
  }

很简单,就2行,可以看出第二行是个对Driver压力很大的操作,

直接把各服务器上返回的Iterator返回成Array,再合并成一个,如果数据较多的话显然对Driver产生很大的内存压力。

一路往下我们可以看到调用链是这样的:

RDD.collect => sparkContext.runJob (当中套了好几次runJob的多态函数)=> dagScheduler.runJob =>dagScheduler.submitJob

一路最后来到DAGScheulersubmitJob函数:

def submitJob[T, U](
      rdd: RDD[T],
      func: (TaskContext, Iterator[T]) => U,
      partitions: Seq[Int],
      callSite: CallSite,
      resultHandler: (Int, U) => Unit,
      properties: Properties): JobWaiter[U] = {

          //...省略
    // 真正提交任务的就这么2句:
    // 先创建一个waiter,waiter的主要工作是在运行时向dagScheduler上报一些状态信息,同时控制任务工作
     val waiter = new JobWaiter(this, jobId, partitions.size, resultHandler)
     //费了九牛二虎之力,终于把任务提交上去了
    eventProcessLoop.post(JobSubmitted(
      jobId, rdd, func2, partitions.toArray, callSite, waiter,
      SerializationUtils.clone(properties)))
      //...省略
      }

submitJob把这个event提交到了这里,兴冲冲点进去一看,结果只有:

      /**
   * Put the event into the event queue. The event thread will process it later.
   */
   // 注意这里的eventQueue,采用了`LinkedBlockingDeque`的线程安全双向链表并发阻塞队列
  def post(event: E): Unit = {
    eventQueue.put(event)
  }

摔(′д` )…彡…彡,在postput里也只是将获得的event提交上去而已,那么真正执行在哪里呢?

有post当然就有Receive,上面我们说的LinkedBlockingDeque楼下就有个eventThread

此处注意的是,这段获取事件队列的线程循环代码虽然在EventLoop里,

但实际上运行时调用的是其子类DAGSchedulerEventProcessLoop

(2.x里EventLoop的正经子类实现,spark-core里也就只有这么一个,其他都在testSuit里)

所以这个onReceive其实是DAGSchedulerEventProcessLoop里override的方法。

      private[spark] val eventThread = new Thread(name) {
    setDaemon(true)

    override def run(): Unit = {
      try {
        while (!stopped.get) {
            // 从队列里取出事件
          val event = eventQueue.take()
          try {
              // 交给子类的onReceive
            onReceive(event)
          } catch {
              //省略异常处理……
          }
        }
      } catch {
          // 省略异常处理……
      }
    }

  }

那么 DAGSchedulerEventProcessLoop 是怎么处理的呢?

     private def doOnReceive(event: DAGSchedulerEvent): Unit = event match {
         // 直接跳转到DAGScheduler的handleJobSubmitted方法
    case JobSubmitted(jobId, rdd, func, partitions, callSite, listener, properties) =>
      dagScheduler.handleJobSubmitted(jobId, rdd, func, partitions, callSite, listener, properties)
      // 加上这次我们主要看的JobSubmitted,一共有14种event
      // 省略其他event的处理……
     }
    private[scheduler] def handleJobSubmitted(jobId: Int,
      finalRDD: RDD[_],
      func: (TaskContext, Iterator[_]) => _,
      partitions: Array[Int],
      callSite: CallSite,
      listener: JobListener,
      properties: Properties) {
    var finalStage: ResultStage = null
    try {
      // New stage creation may throw an exception if, for example, jobs are run on a
      // HadoopRDD whose underlying HDFS files have been deleted.
      // 建立最终stage
      finalStage = createResultStage(finalRDD, func, partitions, jobId, callSite)
    } catch {
        // 此处省略一些异常处理的消息上报,值得一提的是有2个默认值,
        // 最大重试次数 maxFailureNumTasksCheck 40
        // 如果有错误则会新增一个messageScheduler来重新提交一遍进eventProcessLoop
        // 延迟时间timeIntervalNumTasksCheck为15秒
    }

    //……这里暂时省略到下半场
    }

后面的调用链有点长:

DAGScheduler.createResultStage => 
    DAGScheduler.getOrCreateParentStages => 
        DAGScheduler.getShuffleDependencies =>
            DAGScheduler.getOrCreateShuffleMapStage =>
                DAGScheduler.getMissingAncestorShuffleDependencies (需要补stage的时候)

这里是真正计算Shuffle依赖关系的地方:

    /**
    简单翻译一下注释,这个方法只会返回直接的parent,
    如果存在 A <- B <- C的依赖,则只会返回 B--C 的依赖
   */
  private[scheduler] def getShuffleDependencies(
      rdd: RDD[_]): HashSet[ShuffleDependency[_, _, _]] = {
          // 生成3个HashSet 来进行过滤
    val parents = new HashSet[ShuffleDependency[_, _, _]]
    val visited = new HashSet[RDD[_]]
    val waitingForVisit = new ArrayStack[RDD[_]]
    waitingForVisit.push(rdd)
    while (waitingForVisit.nonEmpty) {
      val toVisit = waitingForVisit.pop()
      if (!visited(toVisit)) {
        visited += toVisit
        //注意这里!
        toVisit.dependencies.foreach {
          case shuffleDep: ShuffleDependency[_, _, _] =>
            parents += shuffleDep
          case dependency =>
            waitingForVisit.push(dependency.rdd)
        }
      }
    }
    parents
  }

注意toVisit.dependencies,这里同样调用了dependencies方法,还记得吗?

如其名,在这个函数里会对每一个当前RDD依赖的每一个RDD进行遍历,求它的ShuffleDependency

它是Dependency的子类,内部加入了一些shuffle专用的方法,先留个,

后续研究shuffle过程的时候再仔细看。

实际上就是传递了一个shuffle的依赖链返回给getOrCreateParentStages, 对每一个shuffleDep求getOrCreateShuffleMapStage

    /**
   * 简而言之就是前面有stage就取stage,没有就补一下
   */
  private def getOrCreateShuffleMapStage(
      shuffleDep: ShuffleDependency[_, _, _],
      firstJobId: Int): ShuffleMapStage = {
          // shuffleIdToMapStage 是个HashMap[Int, ShuffleMapStage]
    shuffleIdToMapStage.get(shuffleDep.shuffleId) match {
        // 注意这里的stage是  ShuffleMapStage,即函数定义的返回值
      case Some(stage) =>
        stage

      case None =>
        // 按上面的代码,DAGScheduler里肯定是个空的HashMap咯,那就开始补
        // 注意这里补全部shuffle依赖,这里用了 ancestor,不确定是不是会追溯再之前的stage依赖。
        // 继续往下看
        // 这个函数的内容跟之前 getShuffleDependencies 的内容几乎是一样的,
        // 实际上关键的寻找shuffle依赖部分用的还是getShuffleDependencies
        // 只是循环查找的是之前每个RDD里是否存在shuffleStage生成。
        // 这个例子里是没有的,所以这段代码就被跳过了。
        getMissingAncestorShuffleDependencies(shuffleDep.rdd).foreach { dep =>
          // Even though getMissingAncestorShuffleDependencies only returns shuffle dependencies
          // that were not already in shuffleIdToMapStage, it's possible that by the time we
          // get to a particular dependency in the foreach loop, it's been added to
          // shuffleIdToMapStage by the stage creation process for an earlier dependency. See
          // SPARK-13902 for more information.
          // createShuffleMapStage函数会生成shuffleMapStage,并且扔进shuffleIdToMapStage
          // 关于这部分,下次在Shuffle过程里做个例子详细研究
          if (!shuffleIdToMapStage.contains(dep.shuffleId)) {
            createShuffleMapStage(dep, firstJobId)
          }
        }
        // Finally, create a stage for the given shuffle dependency.
        // 为当前shuffleDependancy生成shuffleMapStage,并且扔进shuffleIdToMapStage里
        createShuffleMapStage(shuffleDep, firstJobId)
    }
  }

再继续往下,进入到createShuffleMapStage,我们平时spark UI里看到的stages,就是在这里生成的:

      def createShuffleMapStage(shuffleDep: ShuffleDependency[_, _, _], jobId: Int): ShuffleMapStage = {
    val rdd = shuffleDep.rdd
    //……省略 BarrierStage 的检查
    // 任务数是由shuffleDep的rdd分区数量决定的
    val numTasks = rdd.partitions.length
    // 这里又调用一次,spark团队你有多害怕漏了前置依赖啊。。。
    val parents = getOrCreateParentStages(rdd, jobId)
    // 直接分配stage的id,注意这是在DAGScheduler里以并发安全方式分配的
    val id = nextStageId.getAndIncrement()
    // 生成新的 shuffleMapStage
    // 注意这里的 mapOutputTracker,这是在DAGScheduler生成的时候就同时生成的
    // 这是一个 MapOutputTrackerMaster 类,对这个类说明:
    // Driver-side class that keeps track of the location of the map output of a stage.
    // 就是用来指明map之后产生的数据去哪里拿的,用来提供信息给reduce程序
    // 然后这个creationSite也很有意思,这个creationSite就是创建这个RDD的代码文本(RDD里乱七八糟的东西还真多)
    val stage = new ShuffleMapStage(
      id, rdd, numTasks, parents, jobId, rdd.creationSite, shuffleDep, mapOutputTracker)

    stageIdToStage(id) = stage
    shuffleIdToMapStage(shuffleDep.shuffleId) = stage
    updateJobIdStageIdMaps(jobId, stage)

    if (!mapOutputTracker.containsShuffle(shuffleDep.shuffleId)) {
      // Kind of ugly: need to register RDDs with the cache and map output tracker here
      // since we can't do it in the RDD constructor because # of partitions is unknown
      // 这里算官方吐槽了吧……
      logInfo("Registering RDD " + rdd.id + " (" + rdd.getCreationSite + ")")
      // 去 mapOutputTracker (MapOutputTrackerMaster)里注册
      mapOutputTracker.registerShuffle(shuffleDep.shuffleId, rdd.partitions.length)
    }
    stage
  }

这个函数主要修改了DAGScheduler的3个内部变量,有趣的是这3个都是HashMap:

  • jobIdToStageIds:内部变量 nextJobId 产生的 JobId 和nextStageId产生的 StageId 的对应关系, 是最后被updateJobIdStageIdMaps递归修改的
  • stageIdToStage:最先被修改的,nextStageId产生的 StageId 与实际Stage之间的关系
  • shuffleIdToMapStage:newShuffleId 产生的ShuffleId与MapStage的关系, 注意这个newShuffleId是sparkContext的方法,也就是说这个shuffleId是sparkContext层面的唯一。

好了,shuffleStage终于完成了DAG生成、创建、注册的一系列过程,终于可以返回了, 此时ResultStage已经生成,回到handleJobSubmitted的下半场:

    private[scheduler] def handleJobSubmitted(jobId: Int,
      finalRDD: RDD[_],
      func: (TaskContext, Iterator[_]) => _,
      partitions: Array[Int],
      callSite: CallSite,
      listener: JobListener,
      properties: Properties) {
        //……此处省略上半场
        // resultStage已生成
    
    // 用上面的信息创建一个ActiveJob
     val job = new ActiveJob(jobId, finalStage, callSite, listener, properties)
     //清除RDD的分区和位置缓存信息,注意这个方法是synchronized
    clearCacheLocs()
    //……省略一些日志……日志里还不放心,还要调一次getMissingParentStages

    val jobSubmissionTime = clock.getTimeMillis()
    // 把这个ActiveJob放进 DAGScheduler 的 jobIdToActiveJob 里,后面会用到
    jobIdToActiveJob(jobId) = job
    activeJobs += job
    finalStage.setActiveJob(job)
    // 注意这里,上面更新 jobIdToStageIds 的用处来了,这个测试job有2个stage
    // 一个 shuffleStage <- 0,一个 resultStage <- 1
    val stageIds = jobIdToStageIds(jobId).toArray
    // 获取信息
    val stageInfos = stageIds.flatMap(id => stageIdToStage.get(id).map(_.latestInfo))
    // 先提交给 LiveListenerBus 注册一个事件,这个事件实际上会通过 trait SparkListenerBus 转发 
    // 实现类是 AsyncEventQueue
    listenerBus.post(
      SparkListenerJobStart(job.jobId, jobSubmissionTime, stageInfos, properties))
    // 把最终stage提交上去
    submitStage(finalStage)
  }
    }

接着看这个 submitStage:

    /** Submits stage, but first recursively submits any missing parents. */
  private def submitStage(stage: Stage) {
    // 把stage的jobId取出来,然后从上一个方法更新过的 jobIdToActiveJob 里找相应的jobId 并取出来 
    // 实际上就是确定当前需要跑的ActiveJob的Id
    // 注意它返回的是个Option,可能没有符合条件的jobId
    val jobId = activeJobForStage(stage)
    if (jobId.isDefined) {
      logDebug("submitStage(" + stage + ")")
      if (!waitingStages(stage) && !runningStages(stage) && !failedStages(stage)) {
          // 我们的老朋友 getMissingParentStages,看来无时无刻不担心stage发生变化……
          // 从方法注释上看这个方法的目的是递归提交stage,直到之前的父stage都跑完
          // 所以可以理解每次都需要重新计算一下之前的stages的状态
        val missing = getMissingParentStages(stage).sortBy(_.id)
        logDebug("missing: " + missing)
        if (missing.isEmpty) {
          logInfo("Submitting " + stage + " (" + stage.rdd + "), which has no missing parents")
          // 激动吧?我们兜了这么大一圈,终于看到真正提交Task的地方了
          submitMissingTasks(stage, jobId.get)
        } else {
          for (parent <- missing) {
              // 递归提交上一层stage
            submitStage(parent)
          }
          waitingStages += stage
        }
      }
    } else {
      abortStage(stage, "No active job for stage " + stage.id, None)
    }
  }

来,我们看看这个 submitMissingTasks(这个方法好长……酌情省略):

      /** Called when stage's parents are available and we can now do its task. */
  private def submitMissingTasks(stage: Stage, jobId: Int) {
    logDebug("submitMissingTasks(" + stage + ")")

    // First figure out the indexes of partition ids to compute.
    // 先取stage的partition,之前说了,Stage是个抽象类,它的实现只有2种:
    // ShuffleMapStage 和 ResultStage,结合测试代码看,第一次调用此方法的肯定是ShuffleMapStage
    // 还记得之前创建ShuffleMapStage的构造函数吗?创建的时候传入了一个MapOutputTrackerMaster
    // 由这个 MapOutputTrackerMaster 决定 ShuffleMapStage的物理位置
    val partitionsToCompute: Seq[Int] = stage.findMissingPartitions()

    // Use the scheduling pool, job group, description, etc. from an ActiveJob associated
    // with this Stage
    // 获得一些配置,这个properties将来执行任务会用到
    val properties = jobIdToActiveJob(jobId).properties

    //把当前stage加入执行队列
    runningStages += stage
    // SparkListenerStageSubmitted should be posted before testing whether tasks are
    // serializable. If tasks are not serializable, a SparkListenerStageCompleted event
    // will be posted, which should always come after a corresponding SparkListenerStageSubmitted
    // event.
    stage match {
      case s: ShuffleMapStage =>
      // 这个 outputCommitCoordinator 可厉害了,是Spark集群启动时维护的 OutputCommitCoordinator 对象
      // 它会同时跟踪 task Id 和 stage Id,用来避免某些情况下同一个 stage 的 task 执行两遍,
      // 通过在内部维护着一个私有变量stageStates来决定partition上的task是否有权对这个partition提交任务输出。
      // 这部分实现被修改过,在2.0和1.x时代通过一个内部变量 authorizedCommittersByStage 来实现
      // 详细实现下次开个坑说吧,这里调用stageStart主要就是对task进行授权注册,表示这个stage要开始执行了。
      // 注意 stageStart 这个方法是 synchronized 的。
        outputCommitCoordinator.stageStart(stage = s.id, maxPartitionId = s.numPartitions - 1)
      case s: ResultStage =>
        outputCommitCoordinator.stageStart(
          stage = s.id, maxPartitionId = s.rdd.partitions.length - 1)
    }
    /**
    // 敲重点:getPreferredLocs
    // 看名字就知道了吧,这里是对执行的最佳物理位置的计算
    // 主要调用链是:getPreferredLocs => getPreferredLocsInternal => rdd.preferredLocations =>
    // MapOutputTrackerMaster.getPreferredLocationsForShuffle
    // 在 getPreferredLocsInternal 里把 RDD 的 partition 取出来(这里会重新检查一下分区),
    // 传入 RDD 的 preferredLocations 里。
    // 注意这里的RDD,我们的测试例子里第一个产生的是 ShuffledRDD ,在 ShuffledRDD 里 overwrite 了这个方法。
    // ShuffledRDD 的 preferredLocations 里,
    // 会把第一个 dependencies 依赖和 partition 的index 传进 MapOutputTrackerMaster (SprakEnv里get出来) 
    // 的 getPreferredLocationsForShuffle 函数里。
    // 敲重点,优化注意!
       在上述方法里有3个阈值变量,都是静态直接写死的,分别是:
        1. SHUFFLE_PREF_MAP_THRESHOLD = 1000 
        2. SHUFFLE_PREF_REDUCE_THRESHOLD =1000
        3. REDUCER_PREF_LOCS_FRACTION = 0.2 
       1和2代表了map和reduce的分区上限,超过这个数都会直接跳过最佳物理位置计算,
       理由是超过了这2个数,计算最佳位置本身就很耗时。
       3是最佳物理位置计算时每个location时大于这个阈值则视为reduce任务的首选位置,
        把这个值放大的话会更倾向于本地读取数据,但如果该位置很忙的话,会带来更多调度延迟。
    */
    val taskIdToLocations: Map[Int, Seq[TaskLocation]] = try {
      stage match {
        case s: ShuffleMapStage =>
          partitionsToCompute.map { id => (id, getPreferredLocs(stage.rdd, id))}.toMap
        case s: ResultStage =>
          partitionsToCompute.map { id =>
            val p = s.partitions(id)
            (id, getPreferredLocs(stage.rdd, p))
          }.toMap
      }
    } catch {
      case NonFatal(e) =>
        stage.makeNewStageAttempt(partitionsToCompute.size)
        listenerBus.post(SparkListenerStageSubmitted(stage.latestInfo, properties))
        abortStage(stage, s"Task creation failed: $e\n${Utils.exceptionString(e)}", Some(e))
        runningStages -= stage
        return
    }

    // 最后我们从 taskIdToLocations 得到了分区与最佳位置的对应关系
    // 这里会注册一个 TaskMetrics 统计stage的相关信息
    stage.makeNewStageAttempt(partitionsToCompute.size, taskIdToLocations.values.toSeq)

    //……省略部分代码 

    // 照例向ListenerBus上报一个event
    listenerBus.post(SparkListenerStageSubmitted(stage.latestInfo, properties))

    // TODO: Maybe we can keep the taskBinary in Stage to avoid serializing it multiple times.
    // Broadcasted binary for the task, used to dispatch tasks to executors. Note that we broadcast
    // the serialized copy of the RDD and for each task we will deserialize it, which means each
    // task gets a different copy of the RDD. This provides stronger isolation between tasks that
    // might modify state of objects referenced in their closures. This is necessary in Hadoop
    // where the JobConf/Configuration object is not thread-safe.
    // 开始了开始了,把RDD和shuffleDep序列化,然后广播出去
    var taskBinary: Broadcast[Array[Byte]] = null
    var partitions: Array[Partition] = null
    try {
      // For ShuffleMapTask, serialize and broadcast (rdd, shuffleDep).
      // For ResultTask, serialize and broadcast (rdd, func).
      var taskBinaryBytes: Array[Byte] = null
      // taskBinaryBytes and partitions are both effected by the checkpoint status. We need
      // this synchronization in case another concurrent job is checkpointing this RDD, so we get a
      // consistent view of both variables.
      RDDCheckpointData.synchronized {
        taskBinaryBytes = stage match {
          // 这里遇到的问题,ParallelCollectionRDD 的话这个地方会直接把内部的data一起序列化广播出去
          // HadoopRDD则会不会有data传递
          case stage: ShuffleMapStage =>
            JavaUtils.bufferToArray(
              closureSerializer.serialize((stage.rdd, stage.shuffleDep): AnyRef))
          case stage: ResultStage =>
            JavaUtils.bufferToArray(closureSerializer.serialize((stage.rdd, stage.func): AnyRef))
        }

        partitions = stage.rdd.partitions
      }
      // 广播序列化后的任务数据
      taskBinary = sc.broadcast(taskBinaryBytes)
    } catch {
        // …… 序列化失败时的一些异常处理
    }

    val tasks: Seq[Task[_]] = try {
      val serializedTaskMetrics = closureSerializer.serialize(stage.latestInfo.taskMetrics).array()
      stage match {
        case stage: ShuffleMapStage =>
          stage.pendingPartitions.clear()
          // 对每一个Partitions新生成一个Task,
          // Task是个抽象类,只有两种有效实现,跟stage一样,ShuffleMapTast和ResultTask
          partitionsToCompute.map { id =>
            // 这个时候用到了之前计算的task和位置的关系
            val locs = taskIdToLocations(id)
            val part = partitions(id)
            stage.pendingPartitions += id
            new ShuffleMapTask(stage.id, stage.latestInfo.attemptNumber,
              taskBinary, part, locs, properties, serializedTaskMetrics, Option(jobId),
              Option(sc.applicationId), sc.applicationAttemptId, stage.rdd.isBarrier())
          }

        case stage: ResultStage =>
          // ……省略resultTask生成处理
      }
    } catch {
        // ……省略异常处理
    }

    if (tasks.size > 0) {
      //……省略若干无关代码
      // 提交任务,taskScheduler只有一种实现:TaskSchedulerImpl
      // 提交后会生成一个TaskManager,处理一些任务冲突和资源问题
      // 之后挂进 SchedulableBuilder 的Pool里,这个SchedulableBuilder 是 TaskScheduler 初始化的时候生成用来管理Task的,
      // 有两个我们耳熟能详的调度实现:
      // FIFOSchedulableBuilder 和 FairSchedulableBuilder,
      // 任务如何调度取决于配置文件里配了哪一种方案,会在实际初始化 TaskSchedulerImpl 的时候生成哪一个Builder
      // 之后通过 SchedulerBackend 提交给任务池队列里,同时用 reviveOffers 唤醒 SchedulerBackend,
      // 实际上就是发条消息给 RpcEndpoint (实现类NettyRpcEndpointRef)
      // 这里多插一句,schedulerBackend 的实现类有2种,CoarseGrainedSchedulerBackend 和 LocalSchedulerBackend
      // 顾名思义,后者就是本地模式采用的,这里用了testSuit,所以启动的也是 LocalSchedulerBackend
      // 收到消息的RpcEndpoint会直接扔给postOneWayMessage(远程RPC会扔给 postToOutbox)
      // 后面都是远程消息处理部分,下一篇详细研究,简单说就是投递给endpoint相对应的Inbox,
      // 在Inbox里处理消息,转回给endpoint的receive函数进行处理(当然有些别的,此处不赘述),
      // 其实就是转了一圈又回到了 LocalSchedulerBackend 这里。
      // 而 LocalSchedulerBackend 的 receive 函数对 ReviveOffers 消息用的是 ReviveOffers() 进行处理。
      
      taskScheduler.submitTasks(new TaskSet(
        tasks.toArray, stage.id, stage.latestInfo.attemptNumber, jobId, properties))
    } else {
     
     // ……省略若干stage完成后处理工作
    }

实际 LocalSchedulerBackend 调用 executor 的地方在这里: 所以你可以看到,如果用的是 local[*] 的模式,这个 executor 是启动 endpoint时候直接 new 出来的, 下面代码里的 localexecutorid 一直都是 driver(因为没别的类型的execuror了) 在 launchTask 函数之后就没什么特殊之处了, 构建一个 TaskRunner 的任务封装(这个类的 run 函数超复杂, 想象一下它要完成反序列化任务,进行实际的计算等等) 然后调用 executor 的线程池开始执行。

 private val executor = new Executor(
    localexecutorid, localExecutorHostname, SparkEnv.get, userClassPath, isLocal = true)
  // …… 省略其他代码
  def reviveOffers() {
    val offers = IndexedSeq(new WorkerOffer(localExecutorId, localExecutorHostname, freeCores,
      Some(rpcEnv.address.hostPort)))
    for (task <- scheduler.resourceOffers(offers).flatten) {
      freeCores -= scheduler.CPUS_PER_TASK
      executor.launchTask(executorBackend, task)
    }
  }

其实任务执行的时候executor 还会有个 reportHeartBeat 发送一个心跳消息,去上报状态, 而全局的 NettyRpcEnv 也会有个 ask 函数定期去获取状态。

。。。总算写完了,好累。。。

凡Spark任务执行,驰车千驷,革车千乘,带甲十万,千里馈粮。 则内外之费,宾客之用,胶漆之材,车甲之奉,日费千金,然后十万之师举矣

说个鬼故事: 这才只是local[*]


参考:

  1. 为啥key不能是array?
  2. 调试Spark:

    调试Spark需要拿出魔法调试这个万能工具了,不能不感叹Spark团队代码的优秀, 为Spark代码提供了完整的测试用例,直接打开 ShuffleSuite,执行第一个。 结果调试到一半的时候发现 ParallelCollectionRDD 的行为和 HadoopRDD 的行为不太一样,然后半途换了一下。 如果是ParallelCollectionRDD,在生成 taskBinaryBytes 的时候会直接把数据都一起做序列化送出去, 如果是HadoopRDD,则只会把URI和相关信息序列化送出去。

    avatar