任务执行过程
问题:
- 一个Spark任务究竟是怎么执行的?
一个Spark任务究竟是怎么执行的
老实说,我不知道从哪里开始, 考虑到一般Spark程序通过 spark-submit
或 spark-shell
提交之后, 最终其实只会以 write
和 collect
进行实际执行,所以就从这2个函数入手。
write
可能会比较复杂,涉及到实际存储,所以只是为了搞清楚执行过程,还是从 collect
方法会比较简单。
所以我决定从一段简单的REPL代码入手:
半途发现任务序列化有点区别,所以就把groupRDD改成了下面的代码:
- val groupRdd = sc.parallelize(Seq((1,2),(3,4),(5,6),(1,5),(3,8)))
+ val groupRdd = sc.textFile("D:\\test.csv").map(
+ x => {
+ val p = x.split(",")
+ ( p(0) , p(1) )
+ })
val shuffRDD = groupRdd.groupByKey()
shuffRDD.collect
挑了一个会产生ShuffledRDD
的groupByKey
函数,这个函数在 PairRDDFunctions
里,在 RDD
本身代码里是找不到的。
顺便提一下这个 PairRDDFunctions
,签名是这样的:
class PairRDDFunctions[K, V](self: RDD[(K, V)])
(implicit kt: ClassTag[K], vt: ClassTag[V], ord: Ordering[K] = null)
extends Logging with Serializable
这里用了scala的隐式转换,把RDD转换为一个PairRDDFunctions
, 隐含的参数是pairRDD中包含的key和value的ClassTag,由于不确定RDD中key和value的类型, 采用了常见的隐式参数保留运行时类型信息。(看不明白的补一下ClassTag
)
把RDD转换为 PairRDDFunctions
的隐式转换方法在spark 1.3之前需要 import SparkContext._
, 1.3之后挪到了RDD的 rddToPairRDDFunctions
方法里。
在上一节,我们已经看到 groupRdd
是个 ParallelCollectionRDD
, 接下来我们看看产生shuffRDD的groupByKey
函数:
def groupByKey(partitioner: Partitioner): RDD[(K, Iterable[V])] = self.withScope {
// groupByKey shouldn't use map side combine because map side combine does not
// reduce the amount of data shuffled and requires all map side data be inserted
// into a hash table, leading to more objects in the old gen.
// 这里实际传入的是CompactBuffer伴生类的apply函数
val createCombiner = (v: V) => CompactBuffer(v)
val mergeValue = (buf: CompactBuffer[V], v: V) => buf += v
val mergeCombiners = (c1: CompactBuffer[V], c2: CompactBuffer[V]) => c1 ++= c2
val bufs = combineByKeyWithClassTag[CompactBuffer[V]](
createCombiner, mergeValue, mergeCombiners, partitioner, mapSideCombine = false)
bufs.asInstanceOf[RDD[(K, Iterable[V])]]
}
可以看到方法先建了一个CompactBuffer
关于CompactBuffer
的进一步深挖可以看这里, 外加定义了两个个函数,作为参数传入 combineByKeyWithClassTag
,我们继续下到该函数里:
def combineByKeyWithClassTag[C](
createCombiner: V => C,
mergeValue: (C, V) => C,
mergeCombiners: (C, C) => C,
partitioner: Partitioner,
mapSideCombine: Boolean = true,
serializer: Serializer = null)(implicit ct: ClassTag[C]): RDD[(K, C)] = self.withScope {
require(mergeCombiners != null, "mergeCombiners must be defined") // required as of Spark 0.9.0
if (keyClass.isArray) {
if (mapSideCombine) {
throw new SparkException("Cannot use map-side combining with array keys.")
}
if (partitioner.isInstanceOf[HashPartitioner]) {
throw new SparkException("HashPartitioner cannot partition array keys.")
}
}
val aggregator = new Aggregator[K, V, C](
self.context.clean(createCombiner),
self.context.clean(mergeValue),
self.context.clean(mergeCombiners))
if (self.partitioner == Some(partitioner)) {
self.mapPartitions(iter => {
val context = TaskContext.get()
new InterruptibleIterator(context, aggregator.combineValuesByKey(iter, context))
}, preservesPartitioning = true)
} else {
new ShuffledRDD[K, V, C](self, partitioner)
.setSerializer(serializer)
.setAggregator(aggregator)
.setMapSideCombine(mapSideCombine)
}
}
这个函数在2.4里还是带 @Experimental
的,最新版里已经去掉了这个注解,看来大家对它很满意=^_^=。
这是一个通用的pairRDD聚合方法,把 RDD[(K,V)]
转换成 RDD[(K,CombinedType)]
。
注意,这个方法是public的,也就是说有必要的话可以通过自定义以下三个聚合函数,创建自己的聚合方法:
-
createCombiner
:用来将创建包含单个V类型元素的某种C集合类型(比如说 val custom1 = (i:Int) => Seq(i),当然上面用的是CompactBuffer
) -
mergeValue
:用来将单个V元素添加到C集合里去(比如说 val custom2 = (c:Seq[Int],i:Int) => c:+i) -
mergeCombiners
:用来聚合C集合(比如说 val custom3 = (c1:Seq[Int],c2:Seq[Int])=>c1++c2)
欸嘿,我们这就顺手自定义了一个简版的聚合函数,给它们起个可爱的名字(当然也可以匿名):
val miemie1 = (i:Int) => Seq(i)
val miemie2 = (c:Seq[Int],i:Int) => c:+i
val miemie3 = (c1:Seq[Int],c2:Seq[Int])=>c1++c2
groupRdd.combineByKeyWithClassTag(miemie1,miemie2,miemie3)
groupRdd.combineByKeyWithClassTag(miemie1,miemie2,miemie3).collect
执行结果如下: 是不是很有趣?
下面的Aggregator
就不往下深挖了,本篇的主要目的还是看任务执行。
快进到collect
函数:
def collect(): Array[T] = withScope {
val results = sc.runJob(this, (iter: Iterator[T]) => iter.toArray)
Array.concat(results: _*)
}
很简单,就2行,可以看出第二行是个对Driver压力很大的操作,
直接把各服务器上返回的Iterator
返回成Array
,再合并成一个,如果数据较多的话显然对Driver产生很大的内存压力。
一路往下我们可以看到调用链是这样的:
RDD.collect => sparkContext.runJob (当中套了好几次runJob的多态函数)=> dagScheduler.runJob =>dagScheduler.submitJob
一路最后来到DAGScheuler
的submitJob
函数:
def submitJob[T, U](
rdd: RDD[T],
func: (TaskContext, Iterator[T]) => U,
partitions: Seq[Int],
callSite: CallSite,
resultHandler: (Int, U) => Unit,
properties: Properties): JobWaiter[U] = {
//...省略
// 真正提交任务的就这么2句:
// 先创建一个waiter,waiter的主要工作是在运行时向dagScheduler上报一些状态信息,同时控制任务工作
val waiter = new JobWaiter(this, jobId, partitions.size, resultHandler)
//费了九牛二虎之力,终于把任务提交上去了
eventProcessLoop.post(JobSubmitted(
jobId, rdd, func2, partitions.toArray, callSite, waiter,
SerializationUtils.clone(properties)))
//...省略
}
submitJob
把这个event提交到了这里,兴冲冲点进去一看,结果只有:
/**
* Put the event into the event queue. The event thread will process it later.
*/
// 注意这里的eventQueue,采用了`LinkedBlockingDeque`的线程安全双向链表并发阻塞队列
def post(event: E): Unit = {
eventQueue.put(event)
}
摔(′д` )…彡…彡,在post
和put
里也只是将获得的event提交上去而已,那么真正执行在哪里呢?
有post当然就有Receive,上面我们说的LinkedBlockingDeque
楼下就有个eventThread
此处注意的是,这段获取事件队列的线程循环代码虽然在EventLoop
里,
但实际上运行时调用的是其子类DAGSchedulerEventProcessLoop
(2.x里EventLoop
的正经子类实现,spark-core
里也就只有这么一个,其他都在testSuit里)
所以这个onReceive
其实是DAGSchedulerEventProcessLoop
里override的方法。
private[spark] val eventThread = new Thread(name) {
setDaemon(true)
override def run(): Unit = {
try {
while (!stopped.get) {
// 从队列里取出事件
val event = eventQueue.take()
try {
// 交给子类的onReceive
onReceive(event)
} catch {
//省略异常处理……
}
}
} catch {
// 省略异常处理……
}
}
}
那么 DAGSchedulerEventProcessLoop
是怎么处理的呢?
private def doOnReceive(event: DAGSchedulerEvent): Unit = event match {
// 直接跳转到DAGScheduler的handleJobSubmitted方法
case JobSubmitted(jobId, rdd, func, partitions, callSite, listener, properties) =>
dagScheduler.handleJobSubmitted(jobId, rdd, func, partitions, callSite, listener, properties)
// 加上这次我们主要看的JobSubmitted,一共有14种event
// 省略其他event的处理……
}
private[scheduler] def handleJobSubmitted(jobId: Int,
finalRDD: RDD[_],
func: (TaskContext, Iterator[_]) => _,
partitions: Array[Int],
callSite: CallSite,
listener: JobListener,
properties: Properties) {
var finalStage: ResultStage = null
try {
// New stage creation may throw an exception if, for example, jobs are run on a
// HadoopRDD whose underlying HDFS files have been deleted.
// 建立最终stage
finalStage = createResultStage(finalRDD, func, partitions, jobId, callSite)
} catch {
// 此处省略一些异常处理的消息上报,值得一提的是有2个默认值,
// 最大重试次数 maxFailureNumTasksCheck 40
// 如果有错误则会新增一个messageScheduler来重新提交一遍进eventProcessLoop
// 延迟时间timeIntervalNumTasksCheck为15秒
}
//……这里暂时省略到下半场
}
后面的调用链有点长:
DAGScheduler.createResultStage =>
DAGScheduler.getOrCreateParentStages =>
DAGScheduler.getShuffleDependencies =>
DAGScheduler.getOrCreateShuffleMapStage =>
DAGScheduler.getMissingAncestorShuffleDependencies (需要补stage的时候)
这里是真正计算Shuffle依赖关系的地方:
/**
简单翻译一下注释,这个方法只会返回直接的parent,
如果存在 A <- B <- C的依赖,则只会返回 B--C 的依赖
*/
private[scheduler] def getShuffleDependencies(
rdd: RDD[_]): HashSet[ShuffleDependency[_, _, _]] = {
// 生成3个HashSet 来进行过滤
val parents = new HashSet[ShuffleDependency[_, _, _]]
val visited = new HashSet[RDD[_]]
val waitingForVisit = new ArrayStack[RDD[_]]
waitingForVisit.push(rdd)
while (waitingForVisit.nonEmpty) {
val toVisit = waitingForVisit.pop()
if (!visited(toVisit)) {
visited += toVisit
//注意这里!
toVisit.dependencies.foreach {
case shuffleDep: ShuffleDependency[_, _, _] =>
parents += shuffleDep
case dependency =>
waitingForVisit.push(dependency.rdd)
}
}
}
parents
}
注意toVisit.dependencies
,这里同样调用了dependencies
方法,还记得吗?
如其名,在这个函数里会对每一个当前RDD依赖的每一个RDD进行遍历,求它的ShuffleDependency
,
它是Dependency
的子类,内部加入了一些shuffle专用的方法,先留个坑,
后续研究shuffle过程的时候再仔细看。
实际上就是传递了一个shuffle的依赖链返回给getOrCreateParentStages
, 对每一个shuffleDep求getOrCreateShuffleMapStage
:
/**
* 简而言之就是前面有stage就取stage,没有就补一下
*/
private def getOrCreateShuffleMapStage(
shuffleDep: ShuffleDependency[_, _, _],
firstJobId: Int): ShuffleMapStage = {
// shuffleIdToMapStage 是个HashMap[Int, ShuffleMapStage]
shuffleIdToMapStage.get(shuffleDep.shuffleId) match {
// 注意这里的stage是 ShuffleMapStage,即函数定义的返回值
case Some(stage) =>
stage
case None =>
// 按上面的代码,DAGScheduler里肯定是个空的HashMap咯,那就开始补
// 注意这里补全部shuffle依赖,这里用了 ancestor,不确定是不是会追溯再之前的stage依赖。
// 继续往下看
// 这个函数的内容跟之前 getShuffleDependencies 的内容几乎是一样的,
// 实际上关键的寻找shuffle依赖部分用的还是getShuffleDependencies
// 只是循环查找的是之前每个RDD里是否存在shuffleStage生成。
// 这个例子里是没有的,所以这段代码就被跳过了。
getMissingAncestorShuffleDependencies(shuffleDep.rdd).foreach { dep =>
// Even though getMissingAncestorShuffleDependencies only returns shuffle dependencies
// that were not already in shuffleIdToMapStage, it's possible that by the time we
// get to a particular dependency in the foreach loop, it's been added to
// shuffleIdToMapStage by the stage creation process for an earlier dependency. See
// SPARK-13902 for more information.
// createShuffleMapStage函数会生成shuffleMapStage,并且扔进shuffleIdToMapStage
// 关于这部分,下次在Shuffle过程里做个例子详细研究
if (!shuffleIdToMapStage.contains(dep.shuffleId)) {
createShuffleMapStage(dep, firstJobId)
}
}
// Finally, create a stage for the given shuffle dependency.
// 为当前shuffleDependancy生成shuffleMapStage,并且扔进shuffleIdToMapStage里
createShuffleMapStage(shuffleDep, firstJobId)
}
}
再继续往下,进入到createShuffleMapStage
,我们平时spark UI里看到的stages,就是在这里生成的:
def createShuffleMapStage(shuffleDep: ShuffleDependency[_, _, _], jobId: Int): ShuffleMapStage = {
val rdd = shuffleDep.rdd
//……省略 BarrierStage 的检查
// 任务数是由shuffleDep的rdd分区数量决定的
val numTasks = rdd.partitions.length
// 这里又调用一次,spark团队你有多害怕漏了前置依赖啊。。。
val parents = getOrCreateParentStages(rdd, jobId)
// 直接分配stage的id,注意这是在DAGScheduler里以并发安全方式分配的
val id = nextStageId.getAndIncrement()
// 生成新的 shuffleMapStage
// 注意这里的 mapOutputTracker,这是在DAGScheduler生成的时候就同时生成的
// 这是一个 MapOutputTrackerMaster 类,对这个类说明:
// Driver-side class that keeps track of the location of the map output of a stage.
// 就是用来指明map之后产生的数据去哪里拿的,用来提供信息给reduce程序
// 然后这个creationSite也很有意思,这个creationSite就是创建这个RDD的代码文本(RDD里乱七八糟的东西还真多)
val stage = new ShuffleMapStage(
id, rdd, numTasks, parents, jobId, rdd.creationSite, shuffleDep, mapOutputTracker)
stageIdToStage(id) = stage
shuffleIdToMapStage(shuffleDep.shuffleId) = stage
updateJobIdStageIdMaps(jobId, stage)
if (!mapOutputTracker.containsShuffle(shuffleDep.shuffleId)) {
// Kind of ugly: need to register RDDs with the cache and map output tracker here
// since we can't do it in the RDD constructor because # of partitions is unknown
// 这里算官方吐槽了吧……
logInfo("Registering RDD " + rdd.id + " (" + rdd.getCreationSite + ")")
// 去 mapOutputTracker (MapOutputTrackerMaster)里注册
mapOutputTracker.registerShuffle(shuffleDep.shuffleId, rdd.partitions.length)
}
stage
}
这个函数主要修改了DAGScheduler的3个内部变量,有趣的是这3个都是HashMap:
- jobIdToStageIds:内部变量
nextJobId
产生的 JobId 和nextStageId
产生的 StageId 的对应关系, 是最后被updateJobIdStageIdMaps
递归修改的 - stageIdToStage:最先被修改的,
nextStageId
产生的 StageId 与实际Stage之间的关系 - shuffleIdToMapStage:
newShuffleId
产生的ShuffleId与MapStage的关系, 注意这个newShuffleId
是sparkContext的方法,也就是说这个shuffleId是sparkContext层面的唯一。
好了,shuffleStage终于完成了DAG生成、创建、注册的一系列过程,终于可以返回了, 此时ResultStage已经生成,回到handleJobSubmitted
的下半场:
private[scheduler] def handleJobSubmitted(jobId: Int,
finalRDD: RDD[_],
func: (TaskContext, Iterator[_]) => _,
partitions: Array[Int],
callSite: CallSite,
listener: JobListener,
properties: Properties) {
//……此处省略上半场
// resultStage已生成
// 用上面的信息创建一个ActiveJob
val job = new ActiveJob(jobId, finalStage, callSite, listener, properties)
//清除RDD的分区和位置缓存信息,注意这个方法是synchronized
clearCacheLocs()
//……省略一些日志……日志里还不放心,还要调一次getMissingParentStages
val jobSubmissionTime = clock.getTimeMillis()
// 把这个ActiveJob放进 DAGScheduler 的 jobIdToActiveJob 里,后面会用到
jobIdToActiveJob(jobId) = job
activeJobs += job
finalStage.setActiveJob(job)
// 注意这里,上面更新 jobIdToStageIds 的用处来了,这个测试job有2个stage
// 一个 shuffleStage <- 0,一个 resultStage <- 1
val stageIds = jobIdToStageIds(jobId).toArray
// 获取信息
val stageInfos = stageIds.flatMap(id => stageIdToStage.get(id).map(_.latestInfo))
// 先提交给 LiveListenerBus 注册一个事件,这个事件实际上会通过 trait SparkListenerBus 转发
// 实现类是 AsyncEventQueue
listenerBus.post(
SparkListenerJobStart(job.jobId, jobSubmissionTime, stageInfos, properties))
// 把最终stage提交上去
submitStage(finalStage)
}
}
接着看这个 submitStage
:
/** Submits stage, but first recursively submits any missing parents. */
private def submitStage(stage: Stage) {
// 把stage的jobId取出来,然后从上一个方法更新过的 jobIdToActiveJob 里找相应的jobId 并取出来
// 实际上就是确定当前需要跑的ActiveJob的Id
// 注意它返回的是个Option,可能没有符合条件的jobId
val jobId = activeJobForStage(stage)
if (jobId.isDefined) {
logDebug("submitStage(" + stage + ")")
if (!waitingStages(stage) && !runningStages(stage) && !failedStages(stage)) {
// 我们的老朋友 getMissingParentStages,看来无时无刻不担心stage发生变化……
// 从方法注释上看这个方法的目的是递归提交stage,直到之前的父stage都跑完
// 所以可以理解每次都需要重新计算一下之前的stages的状态
val missing = getMissingParentStages(stage).sortBy(_.id)
logDebug("missing: " + missing)
if (missing.isEmpty) {
logInfo("Submitting " + stage + " (" + stage.rdd + "), which has no missing parents")
// 激动吧?我们兜了这么大一圈,终于看到真正提交Task的地方了
submitMissingTasks(stage, jobId.get)
} else {
for (parent <- missing) {
// 递归提交上一层stage
submitStage(parent)
}
waitingStages += stage
}
}
} else {
abortStage(stage, "No active job for stage " + stage.id, None)
}
}
来,我们看看这个 submitMissingTasks
(这个方法好长……酌情省略):
/** Called when stage's parents are available and we can now do its task. */
private def submitMissingTasks(stage: Stage, jobId: Int) {
logDebug("submitMissingTasks(" + stage + ")")
// First figure out the indexes of partition ids to compute.
// 先取stage的partition,之前说了,Stage是个抽象类,它的实现只有2种:
// ShuffleMapStage 和 ResultStage,结合测试代码看,第一次调用此方法的肯定是ShuffleMapStage
// 还记得之前创建ShuffleMapStage的构造函数吗?创建的时候传入了一个MapOutputTrackerMaster
// 由这个 MapOutputTrackerMaster 决定 ShuffleMapStage的物理位置
val partitionsToCompute: Seq[Int] = stage.findMissingPartitions()
// Use the scheduling pool, job group, description, etc. from an ActiveJob associated
// with this Stage
// 获得一些配置,这个properties将来执行任务会用到
val properties = jobIdToActiveJob(jobId).properties
//把当前stage加入执行队列
runningStages += stage
// SparkListenerStageSubmitted should be posted before testing whether tasks are
// serializable. If tasks are not serializable, a SparkListenerStageCompleted event
// will be posted, which should always come after a corresponding SparkListenerStageSubmitted
// event.
stage match {
case s: ShuffleMapStage =>
// 这个 outputCommitCoordinator 可厉害了,是Spark集群启动时维护的 OutputCommitCoordinator 对象
// 它会同时跟踪 task Id 和 stage Id,用来避免某些情况下同一个 stage 的 task 执行两遍,
// 通过在内部维护着一个私有变量stageStates来决定partition上的task是否有权对这个partition提交任务输出。
// 这部分实现被修改过,在2.0和1.x时代通过一个内部变量 authorizedCommittersByStage 来实现
// 详细实现下次开个坑说吧,这里调用stageStart主要就是对task进行授权注册,表示这个stage要开始执行了。
// 注意 stageStart 这个方法是 synchronized 的。
outputCommitCoordinator.stageStart(stage = s.id, maxPartitionId = s.numPartitions - 1)
case s: ResultStage =>
outputCommitCoordinator.stageStart(
stage = s.id, maxPartitionId = s.rdd.partitions.length - 1)
}
/**
// 敲重点:getPreferredLocs
// 看名字就知道了吧,这里是对执行的最佳物理位置的计算
// 主要调用链是:getPreferredLocs => getPreferredLocsInternal => rdd.preferredLocations =>
// MapOutputTrackerMaster.getPreferredLocationsForShuffle
// 在 getPreferredLocsInternal 里把 RDD 的 partition 取出来(这里会重新检查一下分区),
// 传入 RDD 的 preferredLocations 里。
// 注意这里的RDD,我们的测试例子里第一个产生的是 ShuffledRDD ,在 ShuffledRDD 里 overwrite 了这个方法。
// ShuffledRDD 的 preferredLocations 里,
// 会把第一个 dependencies 依赖和 partition 的index 传进 MapOutputTrackerMaster (SprakEnv里get出来)
// 的 getPreferredLocationsForShuffle 函数里。
// 敲重点,优化注意!
在上述方法里有3个阈值变量,都是静态直接写死的,分别是:
1. SHUFFLE_PREF_MAP_THRESHOLD = 1000
2. SHUFFLE_PREF_REDUCE_THRESHOLD =1000
3. REDUCER_PREF_LOCS_FRACTION = 0.2
1和2代表了map和reduce的分区上限,超过这个数都会直接跳过最佳物理位置计算,
理由是超过了这2个数,计算最佳位置本身就很耗时。
3是最佳物理位置计算时每个location时大于这个阈值则视为reduce任务的首选位置,
把这个值放大的话会更倾向于本地读取数据,但如果该位置很忙的话,会带来更多调度延迟。
*/
val taskIdToLocations: Map[Int, Seq[TaskLocation]] = try {
stage match {
case s: ShuffleMapStage =>
partitionsToCompute.map { id => (id, getPreferredLocs(stage.rdd, id))}.toMap
case s: ResultStage =>
partitionsToCompute.map { id =>
val p = s.partitions(id)
(id, getPreferredLocs(stage.rdd, p))
}.toMap
}
} catch {
case NonFatal(e) =>
stage.makeNewStageAttempt(partitionsToCompute.size)
listenerBus.post(SparkListenerStageSubmitted(stage.latestInfo, properties))
abortStage(stage, s"Task creation failed: $e\n${Utils.exceptionString(e)}", Some(e))
runningStages -= stage
return
}
// 最后我们从 taskIdToLocations 得到了分区与最佳位置的对应关系
// 这里会注册一个 TaskMetrics 统计stage的相关信息
stage.makeNewStageAttempt(partitionsToCompute.size, taskIdToLocations.values.toSeq)
//……省略部分代码
// 照例向ListenerBus上报一个event
listenerBus.post(SparkListenerStageSubmitted(stage.latestInfo, properties))
// TODO: Maybe we can keep the taskBinary in Stage to avoid serializing it multiple times.
// Broadcasted binary for the task, used to dispatch tasks to executors. Note that we broadcast
// the serialized copy of the RDD and for each task we will deserialize it, which means each
// task gets a different copy of the RDD. This provides stronger isolation between tasks that
// might modify state of objects referenced in their closures. This is necessary in Hadoop
// where the JobConf/Configuration object is not thread-safe.
// 开始了开始了,把RDD和shuffleDep序列化,然后广播出去
var taskBinary: Broadcast[Array[Byte]] = null
var partitions: Array[Partition] = null
try {
// For ShuffleMapTask, serialize and broadcast (rdd, shuffleDep).
// For ResultTask, serialize and broadcast (rdd, func).
var taskBinaryBytes: Array[Byte] = null
// taskBinaryBytes and partitions are both effected by the checkpoint status. We need
// this synchronization in case another concurrent job is checkpointing this RDD, so we get a
// consistent view of both variables.
RDDCheckpointData.synchronized {
taskBinaryBytes = stage match {
// 这里遇到的问题,ParallelCollectionRDD 的话这个地方会直接把内部的data一起序列化广播出去
// HadoopRDD则会不会有data传递
case stage: ShuffleMapStage =>
JavaUtils.bufferToArray(
closureSerializer.serialize((stage.rdd, stage.shuffleDep): AnyRef))
case stage: ResultStage =>
JavaUtils.bufferToArray(closureSerializer.serialize((stage.rdd, stage.func): AnyRef))
}
partitions = stage.rdd.partitions
}
// 广播序列化后的任务数据
taskBinary = sc.broadcast(taskBinaryBytes)
} catch {
// …… 序列化失败时的一些异常处理
}
val tasks: Seq[Task[_]] = try {
val serializedTaskMetrics = closureSerializer.serialize(stage.latestInfo.taskMetrics).array()
stage match {
case stage: ShuffleMapStage =>
stage.pendingPartitions.clear()
// 对每一个Partitions新生成一个Task,
// Task是个抽象类,只有两种有效实现,跟stage一样,ShuffleMapTast和ResultTask
partitionsToCompute.map { id =>
// 这个时候用到了之前计算的task和位置的关系
val locs = taskIdToLocations(id)
val part = partitions(id)
stage.pendingPartitions += id
new ShuffleMapTask(stage.id, stage.latestInfo.attemptNumber,
taskBinary, part, locs, properties, serializedTaskMetrics, Option(jobId),
Option(sc.applicationId), sc.applicationAttemptId, stage.rdd.isBarrier())
}
case stage: ResultStage =>
// ……省略resultTask生成处理
}
} catch {
// ……省略异常处理
}
if (tasks.size > 0) {
//……省略若干无关代码
// 提交任务,taskScheduler只有一种实现:TaskSchedulerImpl
// 提交后会生成一个TaskManager,处理一些任务冲突和资源问题
// 之后挂进 SchedulableBuilder 的Pool里,这个SchedulableBuilder 是 TaskScheduler 初始化的时候生成用来管理Task的,
// 有两个我们耳熟能详的调度实现:
// FIFOSchedulableBuilder 和 FairSchedulableBuilder,
// 任务如何调度取决于配置文件里配了哪一种方案,会在实际初始化 TaskSchedulerImpl 的时候生成哪一个Builder
// 之后通过 SchedulerBackend 提交给任务池队列里,同时用 reviveOffers 唤醒 SchedulerBackend,
// 实际上就是发条消息给 RpcEndpoint (实现类NettyRpcEndpointRef)
// 这里多插一句,schedulerBackend 的实现类有2种,CoarseGrainedSchedulerBackend 和 LocalSchedulerBackend
// 顾名思义,后者就是本地模式采用的,这里用了testSuit,所以启动的也是 LocalSchedulerBackend
// 收到消息的RpcEndpoint会直接扔给postOneWayMessage(远程RPC会扔给 postToOutbox)
// 后面都是远程消息处理部分,下一篇详细研究,简单说就是投递给endpoint相对应的Inbox,
// 在Inbox里处理消息,转回给endpoint的receive函数进行处理(当然有些别的,此处不赘述),
// 其实就是转了一圈又回到了 LocalSchedulerBackend 这里。
// 而 LocalSchedulerBackend 的 receive 函数对 ReviveOffers 消息用的是 ReviveOffers() 进行处理。
taskScheduler.submitTasks(new TaskSet(
tasks.toArray, stage.id, stage.latestInfo.attemptNumber, jobId, properties))
} else {
// ……省略若干stage完成后处理工作
}
实际 LocalSchedulerBackend
调用 executor
的地方在这里: 所以你可以看到,如果用的是 local[*]
的模式,这个 executor
是启动 endpoint
时候直接 new 出来的, 下面代码里的 localexecutorid
一直都是 driver
(因为没别的类型的execuror了) 在 launchTask
函数之后就没什么特殊之处了, 构建一个 TaskRunner 的任务封装(这个类的 run
函数超复杂, 想象一下它要完成反序列化任务,进行实际的计算等等) 然后调用 executor 的线程池开始执行。
private val executor = new Executor(
localexecutorid, localExecutorHostname, SparkEnv.get, userClassPath, isLocal = true)
// …… 省略其他代码
def reviveOffers() {
val offers = IndexedSeq(new WorkerOffer(localExecutorId, localExecutorHostname, freeCores,
Some(rpcEnv.address.hostPort)))
for (task <- scheduler.resourceOffers(offers).flatten) {
freeCores -= scheduler.CPUS_PER_TASK
executor.launchTask(executorBackend, task)
}
}
其实任务执行的时候executor
还会有个 reportHeartBeat
发送一个心跳消息,去上报状态, 而全局的 NettyRpcEnv
也会有个 ask
函数定期去获取状态。
。。。总算写完了,好累。。。
凡Spark任务执行,驰车千驷,革车千乘,带甲十万,千里馈粮。 则内外之费,宾客之用,胶漆之材,车甲之奉,日费千金,然后十万之师举矣
说个鬼故事: 这才只是local[*]
参考:
- 为啥key不能是array?
- 调试Spark:
调试Spark需要拿出
魔法调试这个万能工具了,不能不感叹Spark团队代码的优秀, 为Spark代码提供了完整的测试用例,直接打开ShuffleSuite
,执行第一个。 结果调试到一半的时候发现ParallelCollectionRDD
的行为和HadoopRDD
的行为不太一样,然后半途换了一下。 如果是ParallelCollectionRDD
,在生成taskBinaryBytes
的时候会直接把数据都一起做序列化送出去, 如果是HadoopRDD
,则只会把URI和相关信息序列化送出去。