RDD构造
来,先俗套一下
什么是RDD?
- 什么是RDD? 按官方文档的定义,RDD is a resilient distributed dataset, 所谓"弹性分布式数据集
- RDD 代码位于
org.apache.spark.rdd.RDD
,本身是个抽象类,RDD三个最重要属性是 immutable(不可变)、partitioned(分区)、can be operated on in parallel(可被并行操作)。 - 抽象类RDD定义了一些必须实现的方法,例如
map
,filter
,presist
等等。 - 任意一个RDD都应当具有以下五个基本特性:
- 可分区性,包含一个分区(原文patition)列表(我总觉得2.x以后提到分片(split/slices)和分区(partition)的时候有点微妙的区别,因为
map
操作之后的RDD从MapRdd
变成了MapPartitionsRDD
) - 计算每一个分片(你看,来了吧,原文用的是split)的compute方法(有趣的是compute的参数之一split的类型是Partition)
- 一个依赖其他RDD的列表(RDD血缘关系)
- (可选)一个针对(key,value)类RDD的分区器(Partitioner),(例如hash-partitioned RDD,用hash作为分区方案)
- (可选)记录首选对分片执行计算的位置列表(感觉这个特性主要还是为了在HDFS上执行计算而进行的)
- 可分区性,包含一个分区(原文patition)列表(我总觉得2.x以后提到分片(split/slices)和分区(partition)的时候有点微妙的区别,因为
- RDD的创建方式:There are two ways to create RDDs: parallelizing an existing collection in your driver program, or referencing a dataset in an external storage system, such as a shared filesystem, HDFS, HBase, or any data source offering a Hadoop InputFormat. 除了new 一个 RDD 之外的主要途径是用
SparkContext
提供的parallelize
方法把已存在的数据集转换为RDD,或者读取符合Hadoop InputFormat的数据集(多数情况下用这个)。
RDD的创建过程
为了搞清楚RDD的创建过程,我们先来看一下这两种方法都做了什么(官方文档的2个生成例子)
-
sc.parallelize
方法该方法需要注意的有两点:
- Seq是可变集合(mutiable)的话,由于该方法是lazy的,如果实际执行之前改变了集合,则执行会按更新后的数据进行计算。
- 用
emptyRDD
和parallelize(Seq[T]())
这样确定的类型T的方法去生成空RDD(注释里说的,原因没有说,试了一下,DAGScheduler报错了,目测应该是无具体类型的RDD无法生成DAG,如果带有类型的话,就会检测到UnsupportedOperationException
,告诉你empty collection
。) 方法就2行,检测sparkContext没有停止(感觉2.x之后很多方法执行之前都加了这个代码),参数里有个默认分区数量,可以用spark.default.parallelism
来指定。
def parallelize[T: ClassTag]( seq: Seq[T], numSlices: Int = defaultParallelism): RDD[T] = withScope { assertNotStopped() new ParallelCollectionRDD[T](this, seq, numSlices, Map[Int, Seq[String]]()) }
从
ParallelCollectionRDD
的声明中可以看到:private[spark] class ParallelCollectionRDD[T: ClassTag]( sc: SparkContext, @transient private val data: Seq[T], numSlices: Int, locationPrefs: Map[Int, Seq[String]]) extends RDD[T](sc, Nil)
它继承的RDD构造器是没有deps的那个(抽象类RDD的构造方法是需要一个sparkContext,一个依赖列表
deps: Seq[Dependency[_]]
),实际上也没有依赖,再往上就直接是scala collections(Seq)了。注意到它重载了父类RDD的
getPartions
方法:// 父类注释,本方法要求满足: // `rdd.partitions.zipWithIndex.forall { case (partition, index) => partition.index == index }`的对应表达式 override def getPartitions: Array[Partition] = { // 调用伴生对象的slice方法获得Seq数据的切片列表,返回一个Seq[Seq[T]]的列表,其实就是把整个Seq的内容按numSlices切分为多段, // 当中有一些对Range和NumericRange对象的不同处理方式(不过slice内部方法position输出的是一个Iterator[(start:Int, end:Int)] // 所以有点奇怪numericRange处理的必要性。。。) val slices = ParallelCollectionRDD.slice(data, numSlices).toArray // 真正完成分区操作的是返回一个 ParallelCollectionPartition(Partition的子类) 的Array[Partition]对象, // 到Partition层面就涉及spark对物理block的操作了,等看物理存储的时候再展开吧 slices.indices.map(i => new ParallelCollectionPartition(id, i, slices(i))).toArray }
-
val textRdd = sc.textFile("data.txt")
这个方法也是一样,位于
SparkContext
类:// 参数 minPartitions 同样跟着 `spark.default.parallelism` def textFile( path: String, minPartitions: Int = defaultMinPartitions): RDD[String] = withScope { // 先判一下sc还活着吗? assertNotStopped() // 此处立刻调用了一个map,指定了路径、输入的类型[K,V]、(输出的)keyClass的类型、 // (输出的)valueClass的类型,以及最小分区数量。 // 参考下面hadoopFile的方法签名,去掉path,和minPartitions。加上inputFormatClass是个 // InputFormat[K,V],相当于输入是个[K Class,V Class],加上keyClass和valueClass,这四个是不是很熟悉? // 想想Hadoop MapReduce过程,extends Mapper<输入key,输入的value,输出的key,输出的value> hadoopFile(path, classOf[TextInputFormat], classOf[LongWritable], classOf[Text], minPartitions).map(pair => pair._2.toString).setName(path) }
接着看,
hadoopFile
也在SparkContext
类里:// hadoopFile 的方法签名,指定了路径、输入的类型、Key的类型、Value的类型,以及最小分区数量 def hadoopFile[K, V]( path: String, inputFormatClass: Class[_ <: InputFormat[K, V]], keyClass: Class[K], valueClass: Class[V], minPartitions: Int = defaultMinPartitions): RDD[(K, V)] = withScope { // 惯例探测一下sc是不是还活着 assertNotStopped() // This is a hack to enforce loading hdfs-site.xml. // See SPARK-11227 for details. FileSystem.getLocal(hadoopConfiguration) // A Hadoop configuration can be about 10 KB, which is pretty big, so broadcast it. // 把hadoopConfigration序列化,广播一下提高效率 // 顺便一提,broadcast是个比较重的操作,从需要序列化就可以看出来, // RDD不能直接被广播,必须做完collect动作以后把实际内容广播出去。 // broadcast是把值直接扔给broadcastManager,发送给全部node,所以要是实际物理大小太大的话就有点糟糕 val confBroadcast = broadcast(new SerializableConfiguration(hadoopConfiguration)) val setInputPathsFunc = (jobConf: JobConf) => FileInputFormat.setInputPaths(jobConf, path) new HadoopRDD( this, confBroadcast, Some(setInputPathsFunc), inputFormatClass, keyClass, valueClass, minPartitions).setName(path) }
方法里新生成了一个
HadoopRDD
(org.apache.spark.rdd.HadoopRDD
)返回,继续往下之前有2点值得注意: 1. withScope方法,稍微Google了一下,主要是为了标示RDD操作的作用域,获取一些信息,方便DAG进行可视化,详情参考这里 2. 强制加载hdfs-site.xml
的hack,看了一下 SPARK-11227,是一个跟hadoop HA有关的bug,HA时取不到hostname的问题(以后做相关开发的时候可能需要注意一下)。然后我们下一节继续返回来看生成
HadoopRdd
之后的map操作。
RDD的转换过程
上面我们看到了五个特性中的可分区性和依赖列表,接下来具体看一看RDD的转换过程。
回到SparkContext
的textFile
方法,接着刚才生成的HadoopRDD
之后的map操作,HadoopRDD
中并没有overwrite
map
方法,因此直接继承了RDD中的map
方法。
// 方法字面上的意思是要求传入一个将T转换为U的函数,最后返回一个类型为U的RDD
def map[U: ClassTag](f: T => U): RDD[U] = withScope {
// 好像以前没有clean这个东西,用来清除一些用不到的变量之类的,同时主动检查一下f能不能序列化
val cleanF = sc.clean(f)
// 定义了这样一个方法:入参是 this,(TaskContext, partition index, iterator) ,方法体是对迭代器进行map操作
// 此处的map是scala 的 Iterator 对象的map
new MapPartitionsRDD[U, T](this, (context, pid, iter) => iter.map(cleanF))
}
实际上入参就是把经过clean()的f方法二次封装成了(注意这个f的形式)
// 即从原始的:
f:T=>U
// 变成了 f 分区执行版,注意这个map,它是 Iterator的方法,而不是RDD的方法:
f:(context:TaskContext,pid:Int,iter:Iterator[T]) => Iterator[T].map(cleanF)
的形式,作为 MapPartitionsRDD的入参。 对照MapPartitionsRDD
的方法签名:
private[spark] class MapPartitionsRDD[U: ClassTag, T: ClassTag](
var prev: RDD[T],
f: (TaskContext, Int, Iterator[T]) => Iterator[U], // (TaskContext, partition index, iterator)
preservesPartitioning: Boolean = false,
isFromBarrier: Boolean = false,
isOrderSensitive: Boolean = false)
extends RDD[U](prev)
实参 | 形参 |
---|---|
this | prev |
(context,pid,iter) | f |
f的实参 | f的形参 |
context | TaskContext |
pid | Int (partition index) |
iter | Iterator[T] |
可以看到它继承了RDD[U](prev)
的构造方法,而上面map方法中入参的 this
也就是 HadoopRDD
,就是被MapPartitionRDD
当作prev
, 同时f
的主要工作就是把一个T类型的迭代器 Iterator[T]
变换为一个U类型的迭代器 Iterator[U]
。
此处划个重点,它继承的父类RDD构造器是下面这个:
// MapPartitionsRDD 是个OneToOne的依赖,参考 Dependency 抽象类可以看到它继承了NarrowDependency,
// 属于窄依赖的一种。
def this(@transient oneParent: RDD[_]) =
this(oneParent.context, List(new OneToOneDependency(oneParent)))
在生成新MapPartitionRDD
的时候把前一个RDD的sc和前一个RDD的依赖关系传入了。
说到这里,有没有发现生成MapPartitionsRDD
的时候只是重新封装了一个方法, 当时并没有TaskContext
和Partition.index
(作为trait
的Partition
有个声明为Int
类型的index
方法), 有此疑问的话可以提前看看MapPartitionRDD
里被重载computer
方法,之前我们说了, RDD其实是延迟计算的,所以这个时候只是在建立RDD之间的血缘关系,还没到实际计算的时候。
好,我们继续看下去,刚才我们说了,任意一个RDD,都应该有五个特征,我们万丈来看一下这个新生的MapPartitionsRDD
的五个特征在哪里:
-
可分区性:
可分区性依然体现在
getPartitions
方法里,很简单,就一句,取了firstParent
的分区:override def getPartitions: Array[Partition] = firstParent[T].partitions
这个
firstParent
何许人也?我们回RDD
去认识一下:/** Returns the first parent RDD */ protected[spark] def firstParent[U: ClassTag]: RDD[U] = { dependencies.head.rdd.asInstanceOf[RDD[U]] }
dependencies
也是个方法:final def dependencies: Seq[Dependency[_]] = { // 会先从checkpoint里捞一下,如果没有的话就直接就再去dependencises_里捞一下 checkpointRDD.map(r => List(new OneToOneDependency(r))).getOrElse { if (dependencies_ == null) { //dependencies_ 里也没有,实在捞不着的时候再用getDependencies挖一下 dependencies_ = getDependencies } dependencies_ } }
而这个
getDependencies
也很有意思,就直接去取了deps
,是不是有点眼熟?传送门protected def getDependencies: Seq[Dependency[_]] = deps
所以对
MapPartitionsRDD
来说,firstParent
就是依赖链列表里的第一个RDD, 当时new的时候用的是HadoopRDD
对吧?所以找到的这个firstParent就是HadoopRdd
。通过上面的调用过程可以发现,
MapPartitionsRDD
的分区列表的寻找其实是个方法的调用链, 对当前RDD做getPartitions
获取分区的操作,其实是通过一系列的方法套娃过程计算出来的 (这是在没有生成checkPointRDD
的情况下,有checkpointRDD的话会直接从当中某个阶段求起, 这个下次单独挖checkpoint()
方法的时候再说):MapPartitionsRDD.getPartitions() { // MapPartitionsRDD.firstParent(){ // MapPartitionsRDD.dependencies(){ //一路调用链到此处,发现就是deps,参考过Dependency的实现就知道了, // 实际上Dependency就是rdd外面又套了个壳 MapPartitionsRDD.getDependencies() } } }
所以我们可以得出一个这样的结论:
MapPartitionsRDD
的依赖列表里就只有一个元素,就是这个从上一个RDD生成的Seq[Dependency[_]]
, 不仅MapPartitionsRDD
如此,采用此类构造器,没有overwrite
过相关方法的RDD皆然,所以想见大部分RDD实现都是One2One的窄依赖。最后的
partitions
就简单了,直接调用了返回的HadoopRDD
的partitions方法, 但实际上HadoopRDD
里并没有overwrite这个方法,所以还是RDD本身的partitions
方法:final def partitions: Array[Partition] = { checkpointRDD.map(_.partitions).getOrElse { if (partitions_ == null) { partitions_ = getPartitions //所以最后用的还是HadoopRDD的getPartitions方法 partitions_.zipWithIndex.foreach { case (partition, index) => require(partition.index == index, s"partitions($index).partition == ${partition.index}, but it should equal $index") } } partitions_ } }
HadoopRDD
的getPartitions
方法就不继续分析了, 无非就是生成了一堆HadoopPartition
,感觉以后挖分区机制和物理存储的时候再展开比较好。 -
MapPartitionsRDD
重载了这个方法,注意下面这个f
(此处回忆杀):override def compute(split: Partition, context: TaskContext): Iterator[U] = f(context, split.index, firstParent[T].iterator(split, context)) }
你会发现RDD类里没有内置的Partition类型变量和TaskContext变量,这些都是计算时传入,到任务执行时候再执行。
firstParent
上一小节我们已经看过了,实际上它做了一个f
方法套娃, 把我们自定义f
方法转换成了适合分布式执行的新f
方法, 注意这个方法是作用在分区(Partition)上,而非每一条数据上, 简而言之这个compute
的作用就是按Task
+Partition
的方式把自定义方法分配下去, 而在Partition
内部执行的时候,将会利用scala本身的并发特性,用map
函数分配到每一条数据上。因此真正执行套娃函数的地方就是这个
compute
,注意入参最后一项,它用的是我们已经认识的firstParent
的迭代器, 在这个例子里是HadoopRDD
(实际上HadoopRDD
里并没有overwriteiterator
方法,意即使用的是RDD的原始iterator
方法:final def iterator(split: Partition, context: TaskContext): Iterator[T] = { if (storageLevel != StorageLevel.NONE) { //有cache拿cache,没cache直接算,实际上最后就是返回一个Iterator getOrCompute(split, context) } else { //从checkpoint里拿 computeOrReadCheckpoint(split, context) } }
实际上跟依赖列表类似,操作
compute
的时候是一层层迭代器套迭代器,然后一路返回。 所以我们想象个栗子:如果在这个textFile
方法(我们把里面2步标记为new_hadoopRdd
和mapRdd1.map(f1)
), 后面再套上一个mapRdd2.map(f2)
函数,那么真正进行compute
的时候,mapRdd2
的compute
会先找到依赖mapRdd1
去执行compute
, 发现还要继续往前(回忆一下我们的老朋友FirstParent
),于是再往前得到new_hadoopRDD
, 执行compute
,一路将迭代器返回:new_hadoopRDD.compute() => mapRDD1.compute => mapRDD2.compute
iterator
方法再往下就涉及到org.apache.spark.storage.BlockResult
部分,也即Spark的物理存储部分,本篇就不再深入了。 -
分析分区的时候仔细分析过了,是个长长的函数调用链套娃:
- HadoopRDD.iter
- MapPartitionsRDD_1.computeOrReadCheckpoint
- MapPartitionsRDD_1.compute
- MapPartitionsRDD_2.computeOrReadCheckpoint
- MapPartitionsRDD_2.compute
- …
- MapPartitionsRDD_2.compute
- MapPartitionsRDD_2.computeOrReadCheckpoint
- MapPartitionsRDD_1.compute
- MapPartitionsRDD_1.computeOrReadCheckpoint
- HadoopRDD.iter
4和5的主题其实都比较大了,放到以后挖分区机制和物理存储的时候再看吧
依赖链可以动手验证一下: RDD.dependencies
,这几个都是公开函数。
val textRdd = sc.textFile("data.txt")
val mapRdd = textRdd.map(x=>x+"1")
val map2Rdd = mapRdd.map(x=>x+"2")
textRdd.dependencies
mapRdd.dependencies
map2Rdd.dependencies
结果:
后记:本篇花了一个多礼拜写完,感觉才刚刚挖了Spark这个怪物的冰山一角。 下一篇想写写Spark的基础框架,也就是spark用netty写的分布式框架内容,或者shuffle过程。