多应用+插件架构,代码干净,二开方便,首家独创一键云编译技术,文档视频完善,免费商用码云13.8K 广告
**1. ShuffleManager** Spark Shuffle 分为 Write 和 Read 两个过程。 在 Spark 中负责 shuffle 过程的执行、计算、处理的组件主要是 ShuffleManager,其时一个 trait,负责管理本地以及远程的 block 数据的 shuffle 操作。 ```scala // ShuffleManager有如下方法 private[spark] trait ShuffleManager { /** * 注册 ShuffleDependency(宽依赖),同时获取一个ShuffleHandle 并将其传递给任务 */ def registerShuffle[K, V, C]( shuffleId: Int, numMaps: Int, dependency: ShuffleDependency[K, V, C]): ShuffleHandle /** 返回 ShuffleWriter 用于 Shuffle Write 过程。对一个分区返回一个 ShuffleWriter, * 并由 executors 上的 ShuffleMapTask 任务调用 */ def getWriter[K, V](handle: ShuffleHandle, mapId: Int, context: TaskContext): ShuffleWriter[K, V] /** * 返回 ShuffleReader 用于 Shuffle Read 过程 */ def getReader[K, C]( handle: ShuffleHandle, startPartition: Int, endPartition: Int, context: TaskContext): ShuffleReader[K, C] /** * Remove a shuffle's metadata from the ShuffleManager. * @return true if the metadata removed successfully, otherwise false. */ def unregisterShuffle(shuffleId: Int): Boolean /** * Return a resolver capable of retrieving shuffle block data based on block coordinates. */ def shuffleBlockResolver: ShuffleBlockResolver /** Shut down this ShuffleManager. */ def stop(): Unit } ``` <br/> **2. SortShuffleManager** ```scala private[spark] class SortShuffleManager(conf: SparkConf) extends ShuffleManager with Logging { ``` 每个 Task 在 Shuffle Write 操作时,虽然也会产生较大的磁盘文件,但最后会将所有的临时文件合并 (merge) 成一个磁盘文件,因此每个 Task 就只有一个磁盘文件。在下一个 Stage 的 Shuffle Read Task 拉取自己数据的时候,只要根据索引拉取每个磁盘文件中的部分数据即可。