IT博客汇
  • 首页
  • 精华
  • 技术
  • 设计
  • 资讯
  • 扯淡
  • 权利声明
  • 登录 注册

    [原]Spark技术内幕:Shuffle Pluggable框架详解,你怎么开发自己的Shuffle Service?

    anzhsoft2008发表于 2015-01-08 07:58:40
    love 0

    首先介绍一下需要实现的接口。框架的类图如图所示(今天CSDN抽风,竟然上传不了图片。如果需要实现新的Shuffle机制,那么需要实现这些接口。


    1.1.1 org.apache.spark.shuffle.ShuffleManager

    Driver和每个Executor都会持有一个ShuffleManager,这个ShuffleManager可以通过配置项spark.shuffle.manager指定,并且由SparkEnv创建。Driver中的ShuffleManager负责注册Shuffle的元数据,比如Shuffle ID,map task的数量等。Executor中的ShuffleManager 则负责读和写Shuffle的数据。

    需要实现的函数及其功能说明:

    1) 由Driver注册元数据信息

    defregisterShuffle[K, V, C](

    shuffleId: Int,

    numMaps: Int,

    dependency:ShuffleDependency[K, V, C]): ShuffleHandle

    一般如果没有特殊的需求,可以使用下面的实现,实际上Hash BasedShuffle 和Sort BasedShuffle都是这么实现的。

    override def registerShuffle[K, V, C](

    shuffleId: Int,

    numMaps: Int,

    dependency: ShuffleDependency[K, V, C]):ShuffleHandle = {

    new BaseShuffleHandle(shuffleId, numMaps,dependency)

    }

    2) 获得Shuffle Writer, 根据Shuffle Map Task的ID为其创建Shuffle Writer。

    def getWriter[K, V](handle: ShuffleHandle, mapId: Int, context:TaskContext): ShuffleWriter[K, V]

    3) 获得Shuffle Reader,根据Shuffle ID和partition的ID为其创建ShuffleReader。

    def getReader[K, C](

    handle: ShuffleHandle,

    startPartition: Int,

    endPartition: Int,

    context: TaskContext): ShuffleReader[K,C]

    4) 为数据成员shuffleBlockManager赋值,以保存实际的ShuffleBlockManager

    5) defunregisterShuffle(shuffleId: Int): Boolean,删除本地的Shuffle的元数据。

    6) def stop(): Unit,停止Shuffle Manager。

    每个接口的具体实现的例子,可以参照org.apache.spark.shuffle.sort.SortShuffleManager 和org.apache.spark.shuffle.hash.HashShuffleManager。

    1.1.2 org.apache.spark.shuffle.ShuffleWriter

    Shuffle Map Task通过ShuffleWriter将Shuffle数据写入本地。这个Writer主要通过ShuffleBlockManager来写入数据,因此它的功能是比较轻量级的。

    1) def write(records: Iterator[_ <:Product2[K, V]]): Unit, 写入所有的数据。需要注意的是如果需要在Map端做聚合。(aggregate),那么写入前需要将records做聚合。

    2) def stop(success: Boolean): Option[MapStatus],写入完成后提交本次写入。

    对于Hash BasedShuffle,请查看org.apache.spark.shuffle.hash.HashShuffleWriter;对于Sort Based Shuffle,请查看org.apache.spark.shuffle.sort.SortShuffleWriter。

    1.1.3 org.apache.spark.shuffle.ShuffleBlockManager

    主要使用从本地读取Shuffle数据的功能。这些接口都是通过org.apache.spark.storage.BlockManager调用的。

    1) def getBytes(blockId: ShuffleBlockId):Option[ByteBuffer], 一般通过调用下一个接口实现,只不过将ManagedBuffer转换成了ByteBuffer。

    2) def getBlockData(blockId:ShuffleBlockId): ManagedBuffer,核心读取逻辑。比如Hash Based Shuffle的从本地读取文件都是通过这个接口实现的。因为不同的实现可能文件的组织方式是不一样的,比如Sort Based Shuffle需要通过先读取Index索引文件获得每个partition的起始位置后,才能读取真正的数据文件。

    3) def stop(): Unit,停止该Manager。

    对于Hash Based Shuffle,请查看org.apache.spark.shuffle.FileShuffleBlockManager;对于Sort Based Shuffle,请查看org.apache.spark.shuffle.IndexShuffleBlockManager。

    1.1.4 org.apache.spark.shuffle.ShuffleReader

    ShuffleReader实现了下游的Task如何读取上游的ShuffleMapTask的Shuffle输出的逻辑。这个逻辑比较复杂,简单来说就是通过org.apache.spark.MapOutputTracker获得数据的位置信息,然后如果数据在本地那么调用org.apache.spark.storage.BlockManager的getBlockData读取本地数据(实际上getBlockData最终会调用org.apache.spark.shuffle.ShuffleBlockManager的getBlockData)。具体的Shuffle Read的逻辑请查看下面的章节。

    1) def read():Iterator[Product2[K, C]]


    如何开发自己的Shuffle机制?到这里你应该知道怎么做了。不知道? 再看一遍吧。



    如果您喜欢 本文,那么请动一下手指支持以下博客之星的评比吧。非常感谢您的投票。每天可以一票哦。

    点我投票



沪ICP备19023445号-2号
友情链接