目录

Spark-RDD依赖的深度优先搜索

概述

最近在刷刷算法题,看到经典的树搜索的算法,正巧之前记得 Spark RDD 中有一处利用 DFS 来判断 RDD 依赖关系的代码,因此专门拿出来分析一下。

代码

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
/**
* Return the ancestors of the given RDD that are related to it only through a sequence of
* narrow dependencies. This traverses the given RDD's dependency tree using DFS, but maintains
* no ordering on the RDDs returned.
*/
private[spark] def getNarrowAncestors: Seq[RDD[_]] = {
    val ancestors = new mutable.HashSet[RDD[_]]
    
    def visit(rdd: RDD[_]): Unit = {
      val narrowDependencies = rdd.dependencies.filter(_.isInstanceOf[NarrowDependency[_]])
      val narrowParents = narrowDependencies.map(_.rdd)
      val narrowParentsNotVisited = narrowParents.filterNot(ancestors.contains)
      narrowParentsNotVisited.foreach { parent =>
        ancestors.add(parent)
        visit(parent)
      }
    }
    
    visit(this)
    
    // In case there is a cycle, do not include the root itself
    ancestors.filterNot(_ == this).toSeq
}

分析

代码很清晰,就是用递归的方式写完这个寻找 RDD 的 Narrow 祖先。

1
val ancestors = new mutable.HashSet[RDD[_]]

ancestors 是一个 Set 数据结构,用来存放已经查找过的 父 RDD。

narrowDependencies, narrowParents, narrowParentsNotVisited 三个变量,按照名字是很容易理解的,分别是找到 RDD 的窄依赖,窄依赖的父依赖以及没有被访问过的窄依赖。

最后这一段,将没有被访问过的父依赖,依次加入 ancetors 表示已经访问过了。

1
2
3
4
narrowParentsNotVisited.foreach { parent =>
    ancestors.add(parent)
    visit(parent)
}

有心的读者会发现最后一行注释。

In case there is a cycle, do not include the root itself

大意就是如果如果不去除根节点 RDD,那么 narrowParentsNotVisited 是不能被结束的,意思就是相乘了环而导致循环无法结束。

Test Case

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
// org/apache/spark/rdd/RDDSuite.scala
test("getNarrowAncestors") {
    val rdd1 = sc.parallelize(1 to 100, 4)
    val rdd2 = rdd1.filter(_ % 2 == 0).map(_ + 1)
    val rdd3 = rdd2.map(_ - 1).filter(_ < 50).map(i => (i, i))
    val rdd4 = rdd3.reduceByKey(_ + _)
    val rdd5 = rdd4.mapValues(_ + 1).mapValues(_ + 2).mapValues(_ + 3)
    val ancestors1 = rdd1.getNarrowAncestors
    val ancestors2 = rdd2.getNarrowAncestors
    val ancestors3 = rdd3.getNarrowAncestors
    val ancestors4 = rdd4.getNarrowAncestors
    val ancestors5 = rdd5.getNarrowAncestors
    
    // Simple dependency tree with a single branch
    assert(ancestors1.size === 0)
    assert(ancestors2.size === 2)
    assert(ancestors2.count(_ === rdd1) === 1)
    assert(ancestors2.count(_.isInstanceOf[MapPartitionsRDD[_, _]]) === 1)
    assert(ancestors3.size === 5)
    assert(ancestors3.count(_.isInstanceOf[MapPartitionsRDD[_, _]]) === 4)
    
    // Any ancestors before the shuffle are not considered
    assert(ancestors4.size === 0)
    assert(ancestors4.count(_.isInstanceOf[ShuffledRDD[_, _, _]]) === 0)
    assert(ancestors5.size === 3)
    assert(ancestors5.count(_.isInstanceOf[ShuffledRDD[_, _, _]]) === 1)
    assert(ancestors5.count(_ === rdd3) === 0)
    assert(ancestors5.count(_.isInstanceOf[MapPartitionsRDD[_, _]]) === 2)
}

建议可以跑一下 RDDSuite.scala 测试类中的关于 getNarrowAncestors 方法。很显然,针对第二部分的情况,窄依赖只跟踪到 shuffle 之前,也就是一个 RDD 血缘遇到 shuffle 操作,那么窄依赖的依赖链条就会重新计数。

警告
本文最后更新于 2017年2月1日,文中内容可能已过时,请谨慎参考。