在Spark 1.2.0中,Spark Core的一个重要的升级就是将默认的Hash Based Shuffle换成了Sort Based Shuffle,即spark.shuffle.manager 从hash换成了sort,对应的实现类分别是org.apache.spark.shuffle.hash.HashShuffleManager和org.apache.spark.shuffle.sort.SortShuffleManager。
这个方式的选择是在org.apache.spark.SparkEnv完成的:
// Let the user specify short names forshuffle managers val shortShuffleMgrNames = Map( "hash" ->"org.apache.spark.shuffle.hash.HashShuffleManager", "sort" ->"org.apache.spark.shuffle.sort.SortShuffleManager") val shuffleMgrName =conf.get("spark.shuffle.manager", "sort") //获得Shuffle Manager的type,sort为默认 val shuffleMgrClass =shortShuffleMgrNames.getOrElse(shuffleMgrName.toLowerCase, shuffleMgrName) val shuffleManager =instantiateClass[ShuffleManager](shuffleMgrClass)
那么Sort BasedShuffle“取代”Hash BasedShuffle作为默认选项的原因是什么?
正如前面提到的,Hashbased shuffle的每个mapper都需要为每个reducer写一个文件,供reducer读取,即需要产生M*R个数量的文件,如果mapper和reducer的数量比较大,产生的文件数会非常多。Hash based shuffle设计的目标之一就是避免不需要的排序(Hadoop Map Reduce被人诟病的地方,很多不需要sort的地方的sort导致了不必要的开销)。但是它在处理超大规模数据集的时候,产生了大量的DiskIO和内存的消耗,这无疑很影响性能。Hash based shuffle也在不断的优化中,正如前面讲到的Spark 0.8.1引入的file consolidation在一定程度上解决了这个问题。为了更好的解决这个问题,Spark 1.1 引入了Sort based shuffle。首先,每个Shuffle Map Task不会为每个Reducer生成一个单独的文件;相反,它会将所有的结果写到一个文件里,同时会生成一个index文件,Reducer可以通过这个index文件取得它需要处理的数据。避免产生大量的文件的直接收益就是节省了内存的使用和顺序Disk IO带来的低延时。节省内存的使用可以减少GC的风险和频率。而减少文件的数量可以避免同时写多个文件对系统带来的压力。
并且从作者ReynoldXin的几乎所有的测试来看,Sortbased shuffle在速度和内存使用方面优于Hashbased shuffle:“sort-basedshuffle has lower memory usage and seems to outperformhash-based in almost allof our testing.”
性能数据:from:https://issues.apache.org/jira/browse/SPARK-3280
Shuffle Map Task会按照key相对应的partition ID进行sort,其中属于同一个partition的key不会sort。因为对于不需要sort的操作来说,这个sort是负收益的;要知道之前Spark刚开始使用Hash based的shuffle而不是sort based就是为了避免Hadoop Map Reduce对于所有计算都会sort的性能损耗。对于那些需要sort的运算,比如sortByKey,这个sort在Spark 1.2.0里还是由reducer完成的。
如果这个过程内存不够用了,那么这些已经sort的内容会被spill到外部存储。然后在结束的时候将这些不同的文件进行merge sort。
为了便于下游的Taskfetch到其需要的partition,这里会生成一个index文件,去记录不同的partition的位置信息。当然了org.apache.spark.storage.BlockManager需要也有响应的实现以实现这种新的寻址方式。
核心实现的逻辑都在类org.apache.spark.shuffle.sort.SortShuffleWriter。下面简要分析一下它的实现:
1) 对于每个partition,创建一个scala.Array存储它所包含的key,value对。每个待处理的key,value对都会插入相应的scala.Array。
2) 如果scala.Array的大小超过阈值,那么需要将这个in memory的数据spill到外部存储。这个文件的开始部分会记录这个partition的ID,这个文件保存了多少个pair等信息。
3) 最后需要将所有spill到外部存储的文件进行mergesort。同时打开的文件不能过多,过多的话会消耗大量的内存,增加OOM或者GC的风险;也不能过少,过少的话就会影响性能,增大计算的延时。一般的话推荐每次同时打开10 – 100个文件。
4) 在生成最后的数据文件时,需要同时生成index索引文件。正如前面提到的,这个索引文件将记录不同partition的range。
当然了,你可能还有个疑问,就是Hash Based Shuffle说白了就是根据key需要写入的org.apache.spark.HashPartitioner,为每个Reducer写入单独的Partition。只不过对于同一个Core启动的Shuffle Map Task,如果选择spark.shuffle.consolidateFiles的话,第二个Shuffle Map Task会把结果append到上一个文件中去。那么sort的逻辑是完全可以整合到Hash Based Shuffle中去,为什么又要重新实现一种Shuffle Writer呢?我认为有以下几点: