概述
最近在刷刷算法题,看到经典的树搜索的算法,正巧之前记得 Spark RDD 中有一处利用 DFS 来判断 RDD 依赖关系的代码,因此专门拿出来分析一下。
代码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
|
/**
* Return the ancestors of the given RDD that are related to it only through a sequence of
* narrow dependencies. This traverses the given RDD's dependency tree using DFS, but maintains
* no ordering on the RDDs returned.
*/
private[spark] def getNarrowAncestors: Seq[RDD[_]] = {
val ancestors = new mutable.HashSet[RDD[_]]
def visit(rdd: RDD[_]): Unit = {
val narrowDependencies = rdd.dependencies.filter(_.isInstanceOf[NarrowDependency[_]])
val narrowParents = narrowDependencies.map(_.rdd)
val narrowParentsNotVisited = narrowParents.filterNot(ancestors.contains)
narrowParentsNotVisited.foreach { parent =>
ancestors.add(parent)
visit(parent)
}
}
visit(this)
// In case there is a cycle, do not include the root itself
ancestors.filterNot(_ == this).toSeq
}
|
分析
代码很清晰,就是用递归的方式写完这个寻找 RDD 的 Narrow 祖先。
1
|
val ancestors = new mutable.HashSet[RDD[_]]
|
ancestors 是一个 Set 数据结构,用来存放已经查找过的 父 RDD。
narrowDependencies, narrowParents, narrowParentsNotVisited 三个变量,按照名字是很容易理解的,分别是找到 RDD 的窄依赖,窄依赖的父依赖以及没有被访问过的窄依赖。
最后这一段,将没有被访问过的父依赖,依次加入 ancetors 表示已经访问过了。
1
2
3
4
|
narrowParentsNotVisited.foreach { parent =>
ancestors.add(parent)
visit(parent)
}
|
有心的读者会发现最后一行注释。
In case there is a cycle, do not include the root itself
大意就是如果如果不去除根节点 RDD,那么 narrowParentsNotVisited
是不能被结束的,意思就是相乘了环而导致循环无法结束。
Test Case
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
|
// org/apache/spark/rdd/RDDSuite.scala
test("getNarrowAncestors") {
val rdd1 = sc.parallelize(1 to 100, 4)
val rdd2 = rdd1.filter(_ % 2 == 0).map(_ + 1)
val rdd3 = rdd2.map(_ - 1).filter(_ < 50).map(i => (i, i))
val rdd4 = rdd3.reduceByKey(_ + _)
val rdd5 = rdd4.mapValues(_ + 1).mapValues(_ + 2).mapValues(_ + 3)
val ancestors1 = rdd1.getNarrowAncestors
val ancestors2 = rdd2.getNarrowAncestors
val ancestors3 = rdd3.getNarrowAncestors
val ancestors4 = rdd4.getNarrowAncestors
val ancestors5 = rdd5.getNarrowAncestors
// Simple dependency tree with a single branch
assert(ancestors1.size === 0)
assert(ancestors2.size === 2)
assert(ancestors2.count(_ === rdd1) === 1)
assert(ancestors2.count(_.isInstanceOf[MapPartitionsRDD[_, _]]) === 1)
assert(ancestors3.size === 5)
assert(ancestors3.count(_.isInstanceOf[MapPartitionsRDD[_, _]]) === 4)
// Any ancestors before the shuffle are not considered
assert(ancestors4.size === 0)
assert(ancestors4.count(_.isInstanceOf[ShuffledRDD[_, _, _]]) === 0)
assert(ancestors5.size === 3)
assert(ancestors5.count(_.isInstanceOf[ShuffledRDD[_, _, _]]) === 1)
assert(ancestors5.count(_ === rdd3) === 0)
assert(ancestors5.count(_.isInstanceOf[MapPartitionsRDD[_, _]]) === 2)
}
|
建议可以跑一下 RDDSuite.scala 测试类中的关于 getNarrowAncestors
方法。很显然,针对第二部分的情况,窄依赖只跟踪到 shuffle 之前,也就是一个 RDD 血缘遇到 shuffle 操作,那么窄依赖的依赖链条就会重新计数。
警告
本文最后更新于 2017年2月1日,文中内容可能已过时,请谨慎参考。