目录

Spark性能调优之Shuffle调优

概述

本文整理自: https://www.cnblogs.com/haozhengfei/p/5fc4a976a864f33587b094f36b72c7d3.html

正文

Spark 底层 shuffle 的传输方式是使用 netty 传输,netty 在进行网络传输的过程会申请堆外内存(netty是零拷贝),所以使用了堆外内存。

shuffle过程中常出现的问题

常见问题一: reduce oom?

问题原因: reduce task 去 map 端获取数据,reduce 一边拉取数据一边聚合,reduce 端有一块聚合内存(executor memory * 0.2),也就是这块内存不够。

解决办法:

  1. 增加reduce聚合操作的内存的比例
  2. 增加Executor memory的大小
  3. 减少reduce task每次拉取的数据量,设置spark.reducer.maxSizeInFlight 24m,拉取的次数就多了,因此建立连接的次数增多,有可能会连接不上(正好赶上map task端进行GC)

常见问题二: shuffle file cannot find or executor lost

什么时候需要调节Executor的堆外内存大小?

  • shuffle file cannot find (DAGScheduler,resubmitting task)
  • executor lost
  • task lost
  • out of memory

问题原因:

  1. map task所运行的executor内存不足,导致executor挂掉了,executor里面的BlockManager就挂掉了,导致ConnectionManager不能用,也就无法建立连接,从而不能拉取数据(这个还是很容易想到的)
  2. executor并没有挂掉 2.1 BlockManager之间的连接失败(map task所运行的executor正在GC) 2.2 建立连接成功,map task所运行的executor正在GC
  3. reduce task向Driver 中的MapOutputTracker获取shuffle file位置的时候出现了问题

解决办法:

  1. 增大Executor内存(即堆内内存),申请的堆外内存也会随之增加(按比例上升)--executor-memory 5G
  2. 增大堆外内存--conf spark.yarn.executor.memoryoverhead 2048M --conf spark.executor.memoryoverhead 2048M(默认申请的堆外内存是Executor内存的10%,真正处理大数据的时候,这里都会出现问题,导致spark作业反复崩溃,无法运行;此时就会去调节这个参数,到至少1G(1024M),甚至说2G、4G)
/spark%E6%80%A7%E8%83%BD%E8%B0%83%E4%BC%98%E4%B9%8Bshuffle%E8%B0%83%E4%BC%98/image_1cug06rhr16asnpe1qj21ologfp19.png

buffer 32k // 缓冲区默认大小为32k SparkConf.set("spark.shuffle.file.buffer","64k"),有什么好处呢?因为 Shuffle 阶段维护的那个 HashMap 是在内存里的,如果缓冲区大一点。 reduce 48M // reduce 端拉取数据的时候,默认大小是48M SparkConf.set("spark.reducer.maxSizeInFlight","96M")

spark.shuffle.file.buffer

默认值: 32k 参数说明: 该参数用于设置 shuffle write task 的 BufferedOutputStream 的 buffer 缓冲大小。将数据写到磁盘文件之前,会先写入 buffer 缓冲中,待缓冲写满之后,才会溢写到磁盘。 调优建议: 如果作业可用的内存资源较为充足的话,可以适当增加这个参数的大小(比如64k),从而减少shuffle write过程中溢写磁盘文件的次数,也就可以减少磁盘IO次数,进而提升性能。在实践中发现,合理调节该参数,性能会有1%~5%的提升!!!

spark.reducer.maxSizeInFlight

默认值: 48m 参数说明: 该参数用于设置 shuffle read task 的 buffer 缓冲大小,而这个 buffer 缓冲决定了每次能够拉取多少数据。 调优建议:如果作业可用的内存资源较为充足的话,可以适当增加这个参数的大小(比如96m),从而减少拉取数据的次数,也就可以减少网络传输的次数,进而提升性能。在实践中发现,合理调节该参数,性能会有1%~5%的提升。

错误:reduce oom reduce task去map拉数据,reduce 一边拉数据一边聚合 reduce段有一块聚合内存(executor memory * 0.2) 解决办法:

  1. 增加reduce聚合的内存的比例,设置spark.shuffle.memoryFraction
  2. 增加executor memory的大小--executor-memory 5G
  3. 减少reduce task每次拉取的数据量spark.reducer.maxSizeInFlight 24m

spark.shuffle.io.maxRetries

默认值: 3 参数说明: shuffle read task 从 shuffle write task 所在节点拉取属于自己的数据时,如果因为网络异常导致拉取失败,是会自动进行重试的。该参数就代表了可以重试的最大次数。如果在指定次数之内拉取还是没有成功,就可能会导致作业执行失败。 调优建议: 对于那些包含了特别耗时的shuffle操作的作业,建议增加重试最大次数(比如60次),以避免由于JVM的full gc 或者网络不稳定等因素导致的数据拉取失败。在实践中发现,对于针对超大数据量(数十亿~上百亿)的 shuffle 过程,调节该参数可以大幅度提升稳定性。shuffle file not find taskScheduler 不负责重试 task,由 DAGScheduler 负责重试stage。

spark.shuffle.io.retryWait

默认值: 5s 参数说明: 具体解释同上,该参数代表了每次重试拉取数据的等待间隔,默认是5s。 调优建议: 建议加大间隔时长(比如60s),以增加 shuffle 操作的稳定性。

spark.shuffle.memoryFraction

默认值: 0.2 参数说明: 该参数代表了Executor内存中,分配给 shuffle read task 进行聚合操作的内存比例,默认是20%。 调优建议: 在资源参数调优中讲解过这个参数。如果内存充足,而且很少使用持久化操作,建议调高这个比例,给shuffle read 的聚合操作更多内存,以避免由于内存不足导致聚合过程中频繁读写磁盘。在实践中发现,合理调节该参数可以将性能提升10%左右。

spark.shuffle.manager

默认值: sort 参数说明: 该参数用于设置ShuffleManager的类型。Spark 1.5以后,有三个可选项:hash、sort和tungsten-sort。HashShuffleManager是Spark 1.2以前的默认选项,但是Spark 1.2以及之后的版本默认都是SortShuffleManager了。tungsten-sort与sort类似,但是使用了tungsten计划中的堆外内存管理机制,内存使用效率更高。 调优建议: 由于SortShuffleManager默认会对数据进行排序,因此如果你的业务逻辑中需要该排序机制的话,则使用默认的SortShuffleManager就可以;而如果你的业务逻辑不需要对数据进行排序,那么建议参考后面的几个参数调优,通过bypass机制或优化的HashShuffleManager来避免排序操作,同时提供较好的磁盘读写性能。这里要注意的是,tungsten-sort要慎用,因为之前发现了一些相应的bug。

spark.shuffle.sort.bypassMergeThreshold

默认值: 200 参数说明: 当 ShuffleManager 为 SortShuffleManager 时,如果 shuffle read task 的数量小于这个阈值(默认是200),则 shuffle write 过程中不会进行排序操作,而是直接按照未经优化的 HashShuffleManager 的方式去写数据,但是最后会将每个 task 产生的所有临时磁盘文件都合并成一个文件,并会创建单独的索引文件。 调优建议:当你使用SortShuffleManager时,如果的确不需要排序操作,那么建议将这个参数调大一些,大于 shuffle read task 的数量。那么此时就会自动启用bypass机制,map-side 就不会进行排序了,减少了排序的性能开销。但是这种方式下,依然会产生大量的磁盘文件,因此 shuffle write 性能有待提高。

spark.shuffle.consolidateFiles

默认值: false 参数说明: 如果使用 HashShuffleManager,该参数有效。如果设置为 true,那么就会开启 consolidate 机制,会大幅度合并 shuffle write 的输出文件,对于 shuffle read task 数量特别多的情况下,这种方法可以极大地减少磁盘IO开销,提升性能。 调优建议: 如果的确不需要 SortShuffleManager 的排序机制,那么除了使用 bypass 机制,还可以尝试将 spark.shffle.manager 参数手动指定为 hash,使用 HashShuffleManager,同时开启 consolidate 机制。在实践中尝试过,发现其性能比开启了 bypass 机制的 SortShuffleManager 要高出10%~30%。

警告
本文最后更新于 2017年2月1日,文中内容可能已过时,请谨慎参考。