Overview
This post is a translation and reformatting of the original article; readers comfortable with English can go straight to the source.
The original is available at https://jaceklaskowski.gitbooks.io/mastering-spark-sql/spark-sql-performance-tuning-groupBy-aggregation.html
The goal of this case study is to tune the number of partitions used by groupBy aggregations in Spark SQL.
Create a dataset with two partitions.
// 2 partition dataset
val ids = spark.range(start = 0, end = 4, step = 1, numPartitions = 2)
scala> ids.show
+---+
| id|
+---+
| 0|
| 1|
| 2|
| 3|
+---+
scala> ids.rdd.toDebugString
res1: String =
(2) MapPartitionsRDD[8] at rdd at <console>:26 []
| MapPartitionsRDD[7] at rdd at <console>:26 []
| MapPartitionsRDD[6] at rdd at <console>:26 []
| MapPartitionsRDD[5] at rdd at <console>:26 []
| ParallelCollectionRDD[4] at rdd at <console>:26 []
By default, Spark SQL uses spark.sql.shuffle.partitions as the number of partitions for aggregations and joins, and its default value is 200. This leads to partition bloat: all 200 partitions get scheduled and executed regardless of their size, even though some of them contain no data at all.
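As a quick sanity check (this snippet is my addition, not part of the original walkthrough), you can read the current value of the property from a spark-shell session:

// Read the current shuffle-partition setting; unless it has been
// changed, this returns the built-in default of "200".
spark.conf.get("spark.sql.shuffle.partitions")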
Case 1: Default Number of Partitions — spark.sql.shuffle.partitions Property
Once you have worked through this case, you will see that relying on the default setting can actually lead to worse performance. Consider the program below: how many partitions will this query produce?
val groupingExpr = 'id % 2 as "group"
val q = ids.
groupBy(groupingExpr).
agg(count($"id") as "count")
You expect only two partitions, right? Wrong!
scala> q.explain
== Physical Plan ==
*HashAggregate(keys=[(id#0L % 2)#17L], functions=[count(1)])
+- Exchange hashpartitioning((id#0L % 2)#17L, 200)
+- *HashAggregate(keys=[(id#0L % 2) AS (id#0L % 2)#17L], functions=[partial_count(1)])
+- *Range (0, 4, step=1, splits=2)
scala> q.rdd.toDebugString
res5: String =
(200) MapPartitionsRDD[16] at rdd at <console>:30 []
| MapPartitionsRDD[15] at rdd at <console>:30 []
| MapPartitionsRDD[14] at rdd at <console>:30 []
| ShuffledRowRDD[13] at rdd at <console>:30 []
+-(2) MapPartitionsRDD[12] at rdd at <console>:30 []
| MapPartitionsRDD[11] at rdd at <console>:30 []
| MapPartitionsRDD[10] at rdd at <console>:30 []
| ParallelCollectionRDD[9] at rdd at <console>:30 []
When you run the query, you should see around 200 partitions in the Spark web UI. (A programmatic check is sketched right after the output below.)
scala> q.show
+-----+-----+
|group|count|
+-----+-----+
| 0| 2|
| 1| 2|
+-----+-----+
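If you prefer to confirm the partition count programmatically instead of in the web UI, the one-liner below (my addition, assuming the same q as above) reads it straight off the query's underlying RDD:

// Number of partitions of the shuffled result; with the default
// spark.sql.shuffle.partitions setting this is 200.
q.rdd.getNumPartitions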
Case 2: Using repartition Operator
In Case 2 we try the repartition operator. repartition actually introduces an unnecessary extra shuffle here. Consider the query below: how many partitions will it produce?
val groupingExpr = 'id % 2 as "group"
val q = ids.
repartition(groupingExpr). // <-- repartition per groupBy expression
groupBy(groupingExpr).
agg(count($"id") as "count")
Still expecting two partitions? Wrong again!
scala> q.explain
== Physical Plan ==
*HashAggregate(keys=[(id#6L % 2)#105L], functions=[count(1)])
+- Exchange hashpartitioning((id#6L % 2)#105L, 200)
+- *HashAggregate(keys=[(id#6L % 2) AS (id#6L % 2)#105L], functions=[partial_count(1)])
+- Exchange hashpartitioning((id#6L % 2), 200)
+- *Range (0, 4, step=1, splits=2)
scala> q.rdd.toDebugString
res1: String =
(200) MapPartitionsRDD[57] at rdd at <console>:30 []
| MapPartitionsRDD[56] at rdd at <console>:30 []
| MapPartitionsRDD[55] at rdd at <console>:30 []
| ShuffledRowRDD[54] at rdd at <console>:30 []
+-(200) MapPartitionsRDD[53] at rdd at <console>:30 []
| MapPartitionsRDD[52] at rdd at <console>:30 []
| ShuffledRowRDD[51] at rdd at <console>:30 []
+-(2) MapPartitionsRDD[50] at rdd at <console>:30 []
| MapPartitionsRDD[49] at rdd at <console>:30 []
| MapPartitionsRDD[48] at rdd at <console>:30 []
| ParallelCollectionRDD[47] at rdd at <console>:30 []
Compare the physical plans of the two queries and you will surely regret using the repartition operator in the latter, as it causes an extra shuffle stage (!)
Case 3: Using repartition Operator With Explicit Number of Partitions
In fact, repartition takes two parameters: numPartitions and partitionExprs.
repartition(numPartitions: Int, partitionExprs: Column*): Dataset[T]
Now let's look at the number of partitions produced by the following query.
val groupingExpr = 'id % 2 as "group"
val q = ids.
repartition(numPartitions = 2, groupingExpr). // <-- repartition per groupBy expression
groupBy(groupingExpr).
agg(count($"id") as "count")
This time it really is two…
scala> q.explain
== Physical Plan ==
*HashAggregate(keys=[(id#6L % 2)#129L], functions=[count(1)])
+- Exchange hashpartitioning((id#6L % 2)#129L, 200)
+- *HashAggregate(keys=[(id#6L % 2) AS (id#6L % 2)#129L], functions=[partial_count(1)])
+- Exchange hashpartitioning((id#6L % 2), 2)
+- *Range (0, 4, step=1, splits=2)
scala> q.rdd.toDebugString
res14: String =
(200) MapPartitionsRDD[78] at rdd at <console>:30 []
| MapPartitionsRDD[77] at rdd at <console>:30 []
| MapPartitionsRDD[76] at rdd at <console>:30 []
| ShuffledRowRDD[75] at rdd at <console>:30 []
+-(2) MapPartitionsRDD[74] at rdd at <console>:30 []
| MapPartitionsRDD[73] at rdd at <console>:30 []
| ShuffledRowRDD[72] at rdd at <console>:30 []
+-(2) MapPartitionsRDD[71] at rdd at <console>:30 []
| MapPartitionsRDD[70] at rdd at <console>:30 []
| MapPartitionsRDD[69] at rdd at <console>:30 []
| ParallelCollectionRDD[68] at rdd at <console>:30 []
Case 4: Remember spark.sql.shuffle.partitions Property? Set It Up Properly
SHUFFLE_PARTITIONS is an ordinary configuration property, so its default value can of course be overridden. The steps and the resulting plan are shown below; an alternative way to set it when the session is created is sketched a little further down.
import org.apache.spark.sql.internal.SQLConf.SHUFFLE_PARTITIONS
spark.sessionState.conf.setConf(SHUFFLE_PARTITIONS, 2)
// spark.conf.set(SHUFFLE_PARTITIONS.key, 2)
scala> spark.sessionState.conf.numShufflePartitions
res8: Int = 2
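Besides changing the property on a live session as above, it can also be set when the session is created. The sketch below is my own illustration using the standard SparkSession builder (the appName and master values are placeholders), not part of the original article:

import org.apache.spark.sql.SparkSession

// Start a session with shuffle partitions already set to 2
val spark = SparkSession.builder()
  .appName("groupBy-tuning")   // placeholder application name
  .master("local[*]")          // placeholder master
  .config("spark.sql.shuffle.partitions", "2")
  .getOrCreate()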
val q = ids.
groupBy(groupingExpr).
agg(count($"id") as "count")
scala> q.explain
== Physical Plan ==
*HashAggregate(keys=[(id#0L % 2)#40L], functions=[count(1)])
+- Exchange hashpartitioning((id#0L % 2)#40L, 2)
+- *HashAggregate(keys=[(id#0L % 2) AS (id#0L % 2)#40L], functions=[partial_count(1)])
+- *Range (0, 4, step=1, splits=2)
scala> q.rdd.toDebugString
res10: String =
(2) MapPartitionsRDD[31] at rdd at <console>:31 []
| MapPartitionsRDD[30] at rdd at <console>:31 []
| MapPartitionsRDD[29] at rdd at <console>:31 []
| ShuffledRowRDD[28] at rdd at <console>:31 []
+-(2) MapPartitionsRDD[27] at rdd at <console>:31 []
| MapPartitionsRDD[26] at rdd at <console>:31 []
| MapPartitionsRDD[25] at rdd at <console>:31 []
| ParallelCollectionRDD[24] at rdd at <console>:31 []
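To wrap up, here is a minimal end-to-end sketch of the lesson of this case study (my own summary, assuming a spark-shell session where spark and its implicits are in scope): size spark.sql.shuffle.partitions to the data before running the aggregation, and remove the override afterwards so other queries are not affected.

import org.apache.spark.sql.functions.count

// Small dataset => small number of shuffle partitions
spark.conf.set("spark.sql.shuffle.partitions", 2)

val ids = spark.range(start = 0, end = 4, step = 1, numPartitions = 2)
val counts = ids.
  groupBy('id % 2 as "group").
  agg(count($"id") as "count")
counts.show

// Remove the override; the setting falls back to its default
// (200 unless configured elsewhere).
spark.conf.unset("spark.sql.shuffle.partitions")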