目录

Spark程序优化建议

RDD缓存

/spark%E7%A8%8B%E5%BA%8F%E4%BC%98%E5%8C%96%E5%BB%BA%E8%AE%AE/image_1ddn6q6gpo121btsit718k8ode9.png

Persist 到内存的 RDD,比较多,9T左右,Excutor 一共分配了25T内存。剩下给 Shuffle 的空间不算大了,所以会引起频繁的 GC。

建议:

  1. 减少缓存级别,可以适当持久化到磁盘的方式保存 RDD。
  2. 采用 Kryo 的序列化方式,减少所需要的内存空间

合理的 Cache 会带来性能的飞跃….

/spark%E7%A8%8B%E5%BA%8F%E4%BC%98%E5%8C%96%E5%BB%BA%E8%AE%AE/image_1ddn84hkgcb81edklqtaod9cv13.png

5T纯内存操作,非常快。

/spark%E7%A8%8B%E5%BA%8F%E4%BC%98%E5%8C%96%E5%BB%BA%E8%AE%AE/image_1ddn8aj60rc4ff1eme1rvo1v5s2g.png

RDD cache 的原则

  1. Reusing them in an iterative loop (ie. ML algos)
  2. Reuse the RDD multiple times in a single application, job, or notebook.
  3. When the upfront cost to regenerate the RDD partitions is costly (ie. HDFS, after a complex set of map(), filter(), etc.) This helps in the recovery process if a Worker node dies.

并行度

/spark%E7%A8%8B%E5%BA%8F%E4%BC%98%E5%8C%96%E5%BB%BA%E8%AE%AE/image_1ddn79tjo1b0joei18si15gi1mp9m.png

部分算子使用了 map,想对而言,map 是针对 RDD 的每个元素调用一次方法,mapPartion 则是每个 partition 调用一次,效率会更高。

/spark%E7%A8%8B%E5%BA%8F%E4%BC%98%E5%8C%96%E5%BB%BA%E8%AE%AE/image_1ddn9ik0n1u4d1papr8c1a5fsph4a.png
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
// 不高效的方法
val firstLayer = nodes.leftOuterJoin(edges, partitioner)
  .map {
    case (u, (pid, dataitr)) => {
      (pid, (u, dataitr.getOrElse(-1L)))
    }
  }

// 高效的方法,自定义迭代器
class ChangeMeIterator(iter: Iterator[(Long, (Int, Option[Long]))]) extends Iterator[(Long, (Int,
  Long))] {
  def hasNext: Boolean = {
    iter.hasNext
  }

  def next(): (Long, (Int, Long)) = {
    val cur = iter.next()
    (cur._1, (cur._2._1, cur._2._2.getOrElse(-1L)))
  }
}

val firstLayer = nodes.leftOuterJoin(edges, partitioner).mapPartitions(
  v => new ChangeMeIterator(v)
)

另外还有一种采用 xxxPartion 但是相对没那么高效的情况。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
val firstLayer = nodes.leftOuterJoin(edges, partitioner)
  .map { case (u, (pid, dataitr)) => (pid, (u, dataitr.getOrElse(-1L))) }
  .partitionBy(new HashPartitioner(numSubsets))
  .mapPartitionsWithIndex { case (pid, dataitr) =>
    // TODO: 内部维护了一个数据结构,不是很高效的做法
    // 如果采用自定义迭代器的话,是不需要这个数据结构b
    val sourceNodes = mutable.HashSet.empty[Long]
    val data = dataitr.toArray.map(_._2)

    data.foreach { case (u, v) => if (!sourceNodes.contains(u)) sourceNodes.add(u) }

    val tmpEdges = data.filter(x => x._2 != -1L)

    if (tmpEdges.nonEmpty) {
      this.MetisLocal(tmpEdges, numParts, relativePathPrefix)
        .filter(x => sourceNodes.contains(x._1)).map { case (u, i) => (u, (pid, i)) }.toIterator
    } else Iterator.empty
  }.partitionBy(partitioner)
  .persist(StorageLevel.MEMORY_AND_DISK).setName("firstLayer")
/spark%E7%A8%8B%E5%BA%8F%E4%BC%98%E5%8C%96%E5%BB%BA%E8%AE%AE/image_1ddnbt7noqdoilmss1dc148s4n.png

Jobs/Stages太多

/spark%E7%A8%8B%E5%BA%8F%E4%BC%98%E5%8C%96%E5%BB%BA%E8%AE%AE/image_1ddni2vbfk2uhu11c891og81hki6h.png

虽然 RDD 都有缓存,相对来说 transformation 都是内存操作,但是对于程序本身的没有太大意义的,设计 shuffle 的操作,建议还是通过调试部分数据后,在正式环境,或者定时任务中去掉,这样可以减少任务运行的时间。

警告
本文最后更新于 2017年2月1日,文中内容可能已过时,请谨慎参考。