
Number-of-Partitions-for-groupBy-Aggregation

Overview

This article is a translation and re-layout of the original; readers comfortable with English can go straight to the source.

The original is at https://jaceklaskowski.gitbooks.io/mastering-spark-sql/spark-sql-performance-tuning-groupBy-aggregation.html. The goal of this case study is to tune the number of partitions used by groupBy aggregations in Spark SQL.

Start by creating a dataset with two partitions.

// 2 partition dataset
val ids = spark.range(start = 0, end = 4, step = 1, numPartitions = 2)
scala> ids.show
+---+
| id|
+---+
|  0|
|  1|
|  2|
|  3|
+---+

scala> ids.rdd.toDebugString
res1: String =
(2) MapPartitionsRDD[8] at rdd at <console>:26 []
 |  MapPartitionsRDD[7] at rdd at <console>:26 []
 |  MapPartitionsRDD[6] at rdd at <console>:26 []
 |  MapPartitionsRDD[5] at rdd at <console>:26 []
 |  ParallelCollectionRDD[4] at rdd at <console>:26 []

By default, Spark SQL uses spark.sql.shuffle.partitions (default: 200) as the number of partitions for aggregations and joins. This leads to partition bloat: all 200 partitions are processed regardless of how much data they hold, even though some of them contain no data at all.
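
If you want to check what the property is currently set to in your own session, you can read it through the runtime configuration (a minimal sketch; spark.conf.get is the standard SparkSession configuration accessor):

// read the current value of spark.sql.shuffle.partitions (defaults to "200")
spark.conf.get("spark.sql.shuffle.partitions")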

Case 1: Default Number of Partitions — spark.sql.shuffle.partitions Property

After working through this case you will see that relying on the default settings can sometimes lead to worse performance. Consider the program below: how many partitions will this query use?

val groupingExpr = 'id % 2 as "group"
val q = ids.
    groupBy(groupingExpr).
    agg(count($"id") as "count")
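
A side note on the syntax: in spark-shell the implicit conversions behind 'id and $"id" are already in scope, but in a standalone application you would need to bring them in yourself (a minimal sketch of the required import):

// needed outside spark-shell for the 'id and $"id" column syntax
import spark.implicits._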

Did you expect only two partitions? Wrong!

scala> q.explain
== Physical Plan ==
*HashAggregate(keys=[(id#0L % 2)#17L], functions=[count(1)])
+- Exchange hashpartitioning((id#0L % 2)#17L, 200)
   +- *HashAggregate(keys=[(id#0L % 2) AS (id#0L % 2)#17L], functions=[partial_count(1)])
      +- *Range (0, 4, step=1, splits=2)

scala> q.rdd.toDebugString
res5: String =
(200) MapPartitionsRDD[16] at rdd at <console>:30 []
  |   MapPartitionsRDD[15] at rdd at <console>:30 []
  |   MapPartitionsRDD[14] at rdd at <console>:30 []
  |   ShuffledRowRDD[13] at rdd at <console>:30 []
  +-(2) MapPartitionsRDD[12] at rdd at <console>:30 []
     |  MapPartitionsRDD[11] at rdd at <console>:30 []
     |  MapPartitionsRDD[10] at rdd at <console>:30 []
     |  ParallelCollectionRDD[9] at rdd at <console>:30 []

When you run the query, you should see 200 or so partitions in use in the Spark web UI.

scala> q.show
+-----+-----+
|group|count|
+-----+-----+
|    0|    2|
|    1|    2|
+-----+-----+
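
Besides the web UI, you can confirm the post-shuffle partition count programmatically; getNumPartitions on the query's underlying RDD reports the same 200 that appears at the top of toDebugString above (a minimal sketch):

// number of partitions of the aggregated result
q.rdd.getNumPartitions   // 200, i.e. spark.sql.shuffle.partitions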

Case 2: Using repartition Operator

In Case 2, let's try the repartition operator.

The repartition operator can in fact introduce an unnecessary shuffle. Consider the following: how many partitions will it use?

val groupingExpr = 'id % 2 as "group"
val q = ids.
  repartition(groupingExpr). // <-- repartition per groupBy expression
  groupBy(groupingExpr).
  agg(count($"id") as "count")

Still think it is two partitions? Wrong again!

scala> q.explain
== Physical Plan ==
*HashAggregate(keys=[(id#6L % 2)#105L], functions=[count(1)])
+- Exchange hashpartitioning((id#6L % 2)#105L, 200)
   +- *HashAggregate(keys=[(id#6L % 2) AS (id#6L % 2)#105L], functions=[partial_count(1)])
      +- Exchange hashpartitioning((id#6L % 2), 200)
         +- *Range (0, 4, step=1, splits=2)

scala> q.rdd.toDebugString
res1: String =
(200) MapPartitionsRDD[57] at rdd at <console>:30 []
  |   MapPartitionsRDD[56] at rdd at <console>:30 []
  |   MapPartitionsRDD[55] at rdd at <console>:30 []
  |   ShuffledRowRDD[54] at rdd at <console>:30 []
  +-(200) MapPartitionsRDD[53] at rdd at <console>:30 []
      |   MapPartitionsRDD[52] at rdd at <console>:30 []
      |   ShuffledRowRDD[51] at rdd at <console>:30 []
      +-(2) MapPartitionsRDD[50] at rdd at <console>:30 []
         |  MapPartitionsRDD[49] at rdd at <console>:30 []
         |  MapPartitionsRDD[48] at rdd at <console>:30 []
         |  ParallelCollectionRDD[47] at rdd at <console>:30 []

Compare the physical plans of the two queries and you will surely regret using the repartition operator in the latter, as it caused an extra shuffle stage (!)

Case 3: Using repartition Operator With Explicit Number of Partitions

In fact, repartition can take two parameters: numPartitions and partitionExprs.

repartition(numPartitions: Int, partitionExprs: Column*): Dataset[T]
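
The difference between Case 2 and Case 3 comes down to which overload is called: with partition expressions only, repartition falls back to spark.sql.shuffle.partitions, while the two-argument overload uses the count you pass in. A minimal sketch of the contrast (getNumPartitions is used here only to illustrate each overload):

// expressions only: hash-partitions into spark.sql.shuffle.partitions buckets (200 here)
ids.repartition(groupingExpr).rdd.getNumPartitions

// explicit count plus expression: hash-partitions into exactly 2 buckets
ids.repartition(2, groupingExpr).rdd.getNumPartitions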

Now let's look at the number of partitions for the following query.

val groupingExpr = 'id % 2 as "group"
val q = ids.
  repartition(numPartitions = 2, groupingExpr). // <-- repartition per groupBy expression
  groupBy(groupingExpr).
  agg(count($"id") as "count")

This time the repartition step really does shuffle into two partitions, although, as the plan and RDD lineage below show, the final Exchange for the aggregation still uses 200 because spark.sql.shuffle.partitions has not been changed.

scala> q.explain
== Physical Plan ==
*HashAggregate(keys=[(id#6L % 2)#129L], functions=[count(1)])
+- Exchange hashpartitioning((id#6L % 2)#129L, 200)
   +- *HashAggregate(keys=[(id#6L % 2) AS (id#6L % 2)#129L], functions=[partial_count(1)])
      +- Exchange hashpartitioning((id#6L % 2), 2)
         +- *Range (0, 4, step=1, splits=2)

scala> q.rdd.toDebugString
res14: String =
(200) MapPartitionsRDD[78] at rdd at <console>:30 []
  |   MapPartitionsRDD[77] at rdd at <console>:30 []
  |   MapPartitionsRDD[76] at rdd at <console>:30 []
  |   ShuffledRowRDD[75] at rdd at <console>:30 []
  +-(2) MapPartitionsRDD[74] at rdd at <console>:30 []
     |  MapPartitionsRDD[73] at rdd at <console>:30 []
     |  ShuffledRowRDD[72] at rdd at <console>:30 []
     +-(2) MapPartitionsRDD[71] at rdd at <console>:30 []
        |  MapPartitionsRDD[70] at rdd at <console>:30 []
        |  MapPartitionsRDD[69] at rdd at <console>:30 []
        |  ParallelCollectionRDD[68] at rdd at <console>:30 []
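
Even with the explicit repartition, the query's final RDD still has 200 partitions, because the aggregation's own Exchange continues to use spark.sql.shuffle.partitions. You can verify it the same way as before (a minimal sketch):

// still 200: the aggregation Exchange is governed by spark.sql.shuffle.partitions
q.rdd.getNumPartitions

That is exactly what Case 4 addresses by changing the property itself.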

Case 4: Remember spark.sql.shuffle.partitions Property? Set It Up Properly

Since SHUFFLE_PARTITIONS is just a configuration property, its value can of course be changed. Below is how to set it and the resulting plan.

import org.apache.spark.sql.internal.SQLConf.SHUFFLE_PARTITIONS
spark.sessionState.conf.setConf(SHUFFLE_PARTITIONS, 2)
// spark.conf.set(SHUFFLE_PARTITIONS.key, 2)
scala> spark.sessionState.conf.numShufflePartitions
res8: Int = 2
val q = ids.
  groupBy(groupingExpr).
  agg(count($"id") as "count")
scala> q.explain
== Physical Plan ==
*HashAggregate(keys=[(id#0L % 2)#40L], functions=[count(1)])
+- Exchange hashpartitioning((id#0L % 2)#40L, 2)
   +- *HashAggregate(keys=[(id#0L % 2) AS (id#0L % 2)#40L], functions=[partial_count(1)])
      +- *Range (0, 4, step=1, splits=2)

scala> q.rdd.toDebugString
res10: String =
(2) MapPartitionsRDD[31] at rdd at <console>:31 []
 |  MapPartitionsRDD[30] at rdd at <console>:31 []
 |  MapPartitionsRDD[29] at rdd at <console>:31 []
 |  ShuffledRowRDD[28] at rdd at <console>:31 []
 +-(2) MapPartitionsRDD[27] at rdd at <console>:31 []
    |  MapPartitionsRDD[26] at rdd at <console>:31 []
    |  MapPartitionsRDD[25] at rdd at <console>:31 []
    |  ParallelCollectionRDD[24] at rdd at <console>:31 []
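
The same property can also be set through the generic configuration interfaces rather than the internal SQLConf handle used above (a sketch; both forms below are standard ways to configure a Spark SQL property per session or per application):

// per session, via SQL
spark.sql("SET spark.sql.shuffle.partitions=2")

// per application, when launching the shell or submitting a job
// spark-shell --conf spark.sql.shuffle.partitions=2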

Warning
This article was last updated on February 1, 2017; its content may be outdated, so refer to it with caution.