RDD构造

来,先俗套一下

什么是RDD?

  1. 什么是RDD? 按官方文档的定义,RDD is a resilient distributed dataset, 所谓"弹性分布式数据集
  2. RDD 代码位于 org.apache.spark.rdd.RDD,本身是个抽象类,RDD三个最重要属性是 immutable(不可变)partitioned(分区)can be operated on in parallel(可被并行操作)
  3. 抽象类RDD定义了一些必须实现的方法,例如map,filter,presist等等。
  4. 任意一个RDD都应当具有以下五个基本特性:
    1. 可分区性,包含一个分区(原文patition)列表(我总觉得2.x以后提到分片(split/slices)和分区(partition)的时候有点微妙的区别,因为map操作之后的RDD从MapRdd变成了MapPartitionsRDD)
    2. 计算每一个分片(你看,来了吧,原文用的是split)的compute方法(有趣的是compute的参数之一split的类型是Partition)
    3. 一个依赖其他RDD的列表(RDD血缘关系)
    4. (可选)一个针对(key,value)类RDD的分区器(Partitioner),(例如hash-partitioned RDD,用hash作为分区方案)
    5. (可选)记录首选对分片执行计算的位置列表(感觉这个特性主要还是为了在HDFS上执行计算而进行的)
  5. RDD的创建方式:There are two ways to create RDDs: parallelizing an existing collection in your driver program, or referencing a dataset in an external storage system, such as a shared filesystem, HDFS, HBase, or any data source offering a Hadoop InputFormat. 除了new 一个 RDD 之外的主要途径是用SparkContext提供的parallelize方法把已存在的数据集转换为RDD,或者读取符合Hadoop InputFormat的数据集(多数情况下用这个)。

RDD的创建过程

为了搞清楚RDD的创建过程,我们先来看一下这两种方法都做了什么(官方文档的2个生成例子)

  1. sc.parallelize方法

    该方法需要注意的有两点:

    1. Seq是可变集合(mutiable)的话,由于该方法是lazy的,如果实际执行之前改变了集合,则执行会按更新后的数据进行计算。
    2. emptyRDDparallelize(Seq[T]())这样确定的类型T的方法去生成空RDD(注释里说的,原因没有说,试了一下,DAGScheduler报错了,目测应该是无具体类型的RDD无法生成DAG,如果带有类型的话,就会检测到UnsupportedOperationException,告诉你 empty collection。) 方法就2行,检测sparkContext没有停止(感觉2.x之后很多方法执行之前都加了这个代码),参数里有个默认分区数量,可以用 spark.default.parallelism 来指定。
       def parallelize[T: ClassTag](
           seq: Seq[T],
           numSlices: Int = defaultParallelism): RDD[T] = withScope {
         assertNotStopped()
         new ParallelCollectionRDD[T](this, seq, numSlices, Map[Int, Seq[String]]())
       }
    

    ParallelCollectionRDD 的声明中可以看到:

     private[spark] class ParallelCollectionRDD[T: ClassTag](
     sc: SparkContext,
     @transient private val data: Seq[T],
     numSlices: Int,
     locationPrefs: Map[Int, Seq[String]])
     extends RDD[T](sc, Nil)
    

    它继承的RDD构造器是没有deps的那个(抽象类RDD的构造方法是需要一个sparkContext,一个依赖列表deps: Seq[Dependency[_]]),实际上也没有依赖,再往上就直接是scala collections(Seq)了。

    注意到它重载了父类RDD的getPartions方法:

     // 父类注释,本方法要求满足:
     // `rdd.partitions.zipWithIndex.forall { case (partition, index) => partition.index == index }`的对应表达式
     override def getPartitions: Array[Partition] = {
     // 调用伴生对象的slice方法获得Seq数据的切片列表,返回一个Seq[Seq[T]]的列表,其实就是把整个Seq的内容按numSlices切分为多段,
     // 当中有一些对Range和NumericRange对象的不同处理方式(不过slice内部方法position输出的是一个Iterator[(start:Int, end:Int)]
     // 所以有点奇怪numericRange处理的必要性。。。) 
     val slices = ParallelCollectionRDD.slice(data, numSlices).toArray
     // 真正完成分区操作的是返回一个 ParallelCollectionPartition(Partition的子类) 的Array[Partition]对象,
     // 到Partition层面就涉及spark对物理block的操作了,等看物理存储的时候再展开吧
     slices.indices.map(i => new ParallelCollectionPartition(id, i, slices(i))).toArray
     }
    
  2. val textRdd = sc.textFile("data.txt")

    这个方法也是一样,位于SparkContext类:

     // 参数 minPartitions 同样跟着 `spark.default.parallelism` 
     def textFile(
       path: String,
       minPartitions: Int = defaultMinPartitions): RDD[String] = withScope {
     // 先判一下sc还活着吗?
     assertNotStopped()
    
     // 此处立刻调用了一个map,指定了路径、输入的类型[K,V]、(输出的)keyClass的类型、
     // (输出的)valueClass的类型,以及最小分区数量。
     // 参考下面hadoopFile的方法签名,去掉path,和minPartitions。加上inputFormatClass是个
     // InputFormat[K,V],相当于输入是个[K Class,V Class],加上keyClass和valueClass,这四个是不是很熟悉?
     // 想想Hadoop MapReduce过程,extends Mapper<输入key,输入的value,输出的key,输出的value>
     hadoopFile(path, classOf[TextInputFormat], classOf[LongWritable], classOf[Text],
       minPartitions).map(pair => pair._2.toString).setName(path)
     }
    

    接着看,hadoopFile也在SparkContext类里:

     // hadoopFile 的方法签名,指定了路径、输入的类型、Key的类型、Value的类型,以及最小分区数量
     def hadoopFile[K, V](
       path: String,
       inputFormatClass: Class[_ <: InputFormat[K, V]],
       keyClass: Class[K],
       valueClass: Class[V],
       minPartitions: Int = defaultMinPartitions): RDD[(K, V)] = withScope {
     // 惯例探测一下sc是不是还活着
     assertNotStopped()
    
     // This is a hack to enforce loading hdfs-site.xml.
     // See SPARK-11227 for details.
     FileSystem.getLocal(hadoopConfiguration)
    
     // A Hadoop configuration can be about 10 KB, which is pretty big, so broadcast it.
     // 把hadoopConfigration序列化,广播一下提高效率
     // 顺便一提,broadcast是个比较重的操作,从需要序列化就可以看出来,
     // RDD不能直接被广播,必须做完collect动作以后把实际内容广播出去。
     // broadcast是把值直接扔给broadcastManager,发送给全部node,所以要是实际物理大小太大的话就有点糟糕
     val confBroadcast = broadcast(new SerializableConfiguration(hadoopConfiguration))
     val setInputPathsFunc = (jobConf: JobConf) => FileInputFormat.setInputPaths(jobConf, path)
     new HadoopRDD(
       this,
       confBroadcast,
       Some(setInputPathsFunc),
       inputFormatClass,
       keyClass,
       valueClass,
       minPartitions).setName(path)
     }
    

    方法里新生成了一个HadoopRDD(org.apache.spark.rdd.HadoopRDD)返回,继续往下之前有2点值得注意: 1. withScope方法,稍微Google了一下,主要是为了标示RDD操作的作用域,获取一些信息,方便DAG进行可视化,详情参考这里 2. 强制加载 hdfs-site.xml 的hack,看了一下 SPARK-11227,是一个跟hadoop HA有关的bug,HA时取不到hostname的问题(以后做相关开发的时候可能需要注意一下)。

    然后我们下一节继续返回来看生成HadoopRdd之后的map操作。

RDD的转换过程

上面我们看到了五个特性中的可分区性依赖列表,接下来具体看一看RDD的转换过程。

回到SparkContexttextFile方法,接着刚才生成的HadoopRDD之后的map操作,HadoopRDD中并没有overwrite map方法,因此直接继承了RDD中的map方法。

  // 方法字面上的意思是要求传入一个将T转换为U的函数,最后返回一个类型为U的RDD
  def map[U: ClassTag](f: T => U): RDD[U] = withScope {
    // 好像以前没有clean这个东西,用来清除一些用不到的变量之类的,同时主动检查一下f能不能序列化
    val cleanF = sc.clean(f)
    // 定义了这样一个方法:入参是 this,(TaskContext, partition index, iterator) ,方法体是对迭代器进行map操作
    // 此处的map是scala 的 Iterator 对象的map
    new MapPartitionsRDD[U, T](this, (context, pid, iter) => iter.map(cleanF))
  }

实际上入参就是把经过clean()的f方法二次封装成了(注意这个f的形式)

// 即从原始的:
f:T=>U 
// 变成了 f 分区执行版,注意这个map,它是 Iterator的方法,而不是RDD的方法:
f:(context:TaskContext,pid:Int,iter:Iterator[T]) =>  Iterator[T].map(cleanF) 

的形式,作为 MapPartitionsRDD的入参。 对照MapPartitionsRDD 的方法签名:

private[spark] class MapPartitionsRDD[U: ClassTag, T: ClassTag](
    var prev: RDD[T],
    f: (TaskContext, Int, Iterator[T]) => Iterator[U],  // (TaskContext, partition index, iterator)
    preservesPartitioning: Boolean = false,
    isFromBarrier: Boolean = false,
    isOrderSensitive: Boolean = false)
  extends RDD[U](prev)
实参 形参
this prev
(context,pid,iter) f
f的实参 f的形参
context TaskContext
pid Int (partition index)
iter Iterator[T]

可以看到它继承了RDD[U](prev)的构造方法,而上面map方法中入参的 this 也就是 HadoopRDD,就是被MapPartitionRDD当作prev, 同时f的主要工作就是把一个T类型的迭代器 Iterator[T] 变换为一个U类型的迭代器 Iterator[U]

此处划个重点,它继承的父类RDD构造器是下面这个:

  // MapPartitionsRDD 是个OneToOne的依赖,参考 Dependency 抽象类可以看到它继承了NarrowDependency,
  // 属于窄依赖的一种。
  def this(@transient oneParent: RDD[_]) =
    this(oneParent.context, List(new OneToOneDependency(oneParent)))

在生成新MapPartitionRDD的时候把前一个RDD的sc和前一个RDD的依赖关系传入了。

说到这里,有没有发现生成MapPartitionsRDD的时候只是重新封装了一个方法, 当时并没有TaskContextPartition.index(作为traitPartition有个声明为Int类型的index方法), 有此疑问的话可以提前看看MapPartitionRDD里被重载computer方法,之前我们说了, RDD其实是延迟计算的,所以这个时候只是在建立RDD之间的血缘关系,还没到实际计算的时候。

好,我们继续看下去,刚才我们说了,任意一个RDD,都应该有五个特征,我们万丈来看一下这个新生的MapPartitionsRDD的五个特征在哪里:

  1. 可分区性

    可分区性依然体现在getPartitions方法里,很简单,就一句,取了firstParent的分区:

       override def getPartitions: Array[Partition] = firstParent[T].partitions
    

    这个firstParent何许人也?我们回RDD去认识一下:

       /** Returns the first parent RDD */
     protected[spark] def firstParent[U: ClassTag]: RDD[U] = {
         dependencies.head.rdd.asInstanceOf[RDD[U]]
     }
    

    dependencies也是个方法:

    
     final def dependencies: Seq[Dependency[_]] = {
         // 会先从checkpoint里捞一下,如果没有的话就直接就再去dependencises_里捞一下
       checkpointRDD.map(r => List(new OneToOneDependency(r))).getOrElse {
         if (dependencies_ == null) {
             //dependencies_ 里也没有,实在捞不着的时候再用getDependencies挖一下
           dependencies_ = getDependencies
         }
         dependencies_
       }
     }
    

    而这个getDependencies也很有意思,就直接去取了 deps,是不是有点眼熟?传送门

       protected def getDependencies: Seq[Dependency[_]] = deps
    

    所以对 MapPartitionsRDD 来说,firstParent就是依赖链列表里的第一个RDD, 当时new的时候用的是HadoopRDD对吧?所以找到的这个firstParent就是HadoopRdd

    通过上面的调用过程可以发现,MapPartitionsRDD 的分区列表的寻找其实是个方法的调用链, 对当前RDD做 getPartitions 获取分区的操作,其实是通过一系列的方法套娃过程计算出来的 (这是在没有生成checkPointRDD的情况下,有checkpointRDD的话会直接从当中某个阶段求起, 这个下次单独挖checkpoint()方法的时候再说):

     MapPartitionsRDD.getPartitions() {
         //
         MapPartitionsRDD.firstParent(){
             //
             MapPartitionsRDD.dependencies(){
                 //一路调用链到此处,发现就是deps,参考过Dependency的实现就知道了,
                 // 实际上Dependency就是rdd外面又套了个壳
                 MapPartitionsRDD.getDependencies() 
             }
         }
     }
    

    所以我们可以得出一个这样的结论: MapPartitionsRDD的依赖列表里就只有一个元素,就是这个从上一个RDD生成的Seq[Dependency[_]], 不仅MapPartitionsRDD如此,采用此类构造器,没有overwrite过相关方法的RDD皆然,所以想见大部分RDD实现都是One2One的窄依赖。

    最后的partitions 就简单了,直接调用了返回的HadoopRDD的partitions方法, 但实际上HadoopRDD里并没有overwrite这个方法,所以还是RDD本身的partitions方法:

     final def partitions: Array[Partition] = {
         checkpointRDD.map(_.partitions).getOrElse {
           if (partitions_ == null) {
             partitions_ = getPartitions //所以最后用的还是HadoopRDD的getPartitions方法
             partitions_.zipWithIndex.foreach { case (partition, index) =>
               require(partition.index == index,
                 s"partitions($index).partition == ${partition.index}, but it should equal $index")
             }
           }
           partitions_
         }
       }
    

    HadoopRDDgetPartitions方法就不继续分析了, 无非就是生成了一堆HadoopPartition,感觉以后挖分区机制和物理存储的时候再展开比较好。

  2. 可用于计算的 compute 函数

    MapPartitionsRDD 重载了这个方法,注意下面这个 f此处回忆杀):

       override def compute(split: Partition, context: TaskContext): Iterator[U] =
         f(context, split.index, firstParent[T].iterator(split, context))
     }
    

    你会发现RDD类里没有内置的Partition类型变量和TaskContext变量,这些都是计算时传入,到任务执行时候再执行。 firstParent上一小节我们已经看过了,实际上它做了一个f方法套娃, 把我们自定义f方法转换成了适合分布式执行的新f方法, 注意这个方法是作用在分区(Partition)上,而非每一条数据上, 简而言之这个compute的作用就是按Task+Partition的方式把自定义方法分配下去, 而在Partition内部执行的时候,将会利用scala本身的并发特性,用map函数分配到每一条数据上。

    因此真正执行套娃函数的地方就是这个compute,注意入参最后一项,它用的是我们已经认识的firstParent的迭代器, 在这个例子里是HadoopRDD(实际上HadoopRDD里并没有overwrite iterator方法,意即使用的是RDD的原始iterator方法:

           final def iterator(split: Partition, context: TaskContext): Iterator[T] = {
            if (storageLevel != StorageLevel.NONE) {
                //有cache拿cache,没cache直接算,实际上最后就是返回一个Iterator
              getOrCompute(split, context)
            } else {
                //从checkpoint里拿
              computeOrReadCheckpoint(split, context)
            }
          }
    

    实际上跟依赖列表类似,操作compute的时候是一层层迭代器套迭代器,然后一路返回。 所以我们想象个栗子:如果在这个textFile方法(我们把里面2步标记为new_hadoopRddmapRdd1.map(f1)), 后面再套上一个mapRdd2.map(f2)函数,那么真正进行compute的时候,mapRdd2compute会先找到依赖mapRdd1去执行compute, 发现还要继续往前(回忆一下我们的老朋友FirstParent),于是再往前得到new_hadoopRDD, 执行compute,一路将迭代器返回:new_hadoopRDD.compute() => mapRDD1.compute => mapRDD2.compute iterator方法再往下就涉及到org.apache.spark.storage.BlockResult 部分,也即Spark的物理存储部分,本篇就不再深入了。

  3. RDD依赖列表(血缘关系)

    分析分区的时候仔细分析过了,是个长长的函数调用链套娃:

    • HadoopRDD.iter
      • MapPartitionsRDD_1.computeOrReadCheckpoint
        • MapPartitionsRDD_1.compute
          • MapPartitionsRDD_2.computeOrReadCheckpoint
            • MapPartitionsRDD_2.compute

45的主题其实都比较大了,放到以后挖分区机制和物理存储的时候再看吧


依赖链可以动手验证一下: RDD.dependencies,这几个都是公开函数。

  val textRdd = sc.textFile("data.txt")
  val mapRdd = textRdd.map(x=>x+"1")
  val map2Rdd = mapRdd.map(x=>x+"2")
  textRdd.dependencies
  mapRdd.dependencies
  map2Rdd.dependencies

结果: avatar


后记:本篇花了一个多礼拜写完,感觉才刚刚挖了Spark这个怪物的冰山一角。 下一篇想写写Spark的基础框架,也就是spark用netty写的分布式框架内容,或者shuffle过程。