eBay关于Spark push based shuffle 的调研及优化

文摘   2022-08-05 10:50  

01


背景

Carmel 是eBay内部基于Apache Spark打造的一款 SQL-on-Hadoop 查询引擎。通过对Apache Spark的改进,Carmel为eBay内部用户提供了一套高可用高性能的服务,以满足大量分析型的查询需求。


随着公司数据量的快速增长,用户提交的查询的Shuffle 数据量不断变大。为了提升用户查询的稳定性,缩短用户查询的执行时间,eBay Carmel team 对 Spark Shuffle 过程做了很多内部优化。本文将重点阐述eBay Carmel team 对社区版 Spark 中 push based shuffle 功能的相关研究。


02


Spark 现有 shuffle 机制概述及其不足

无论是以前的 MapReduce 计算模型还是 Spark 的 DAG 计算模型,shuffle都是确保数据能够可靠稳定的从map端传输到 reduce 端的一个重要过程。当处理的数据量越大,计算任务在 map阶段的分片和 reduce 阶段的分片越多,shuffle 过程中的计算和网络传输规模越大(注:复杂度 M * R)。因此,Spark开源社区在推动 push based shuffle来优化shuffle过程。


Spark 现有 shuffle 原理分析


Spark 在开启动态资源分配的时候,在 map task 执行完成之后,需要有一个外部服务来托管 map task 传输的shuffle 数据,这个服务叫 Spark External Shuffle Service ,后面简称ESS 服务 [1]。在 map task 执行完成后,reduce task 会向 ESS 服务请求 shuffle 数据。如果是 On yarn 模式,一般是将ESS服务作为 NodeManager 的 Aux Service 来管理。


传统的Spark shuffle 是 pull based 模型,详细 shuffle 过程如下:


  • Executor启动时向当前机器的 ESS 服务进行注册,同步 shuffle 目录配置等信息。


  • Map task 执行完会将计算结果写到本地磁盘, 然后将所有 blocks 信息上报给Driver。


  • Reduce task启动时会向 Driver 获取shuffle信息,提取当前 reduce task 需要读取的 blocks信息。Shuffle 请求使用线程池异步向所有 map task 所在的ESS 服务请求 shuffle 数据,reduce task 轮询消费请求结果,执行 reduce 计算。


图:Pull based shuffle 过程

(M. Shen, Y. Zhou and C. Singh. Magnet 2020)


Spark现有shuffle模型不足


Pull based shuffle 模型实际只发生一次从ESS服务到 reduce task 的网络传输,设计相对简单,大部分时候表现稳定。但是如果 spark job 非常大(比如 map task 和 reduce task 都是一万个,那么理论上的请求就有一亿个)也会存在如下一些问题:


  • 磁盘 I/O 效率低。虽然 map task 最终会将 shuffle 结果合并成一个文件,但是每个reduce task 在请求 shuffle 数据的时候,一次只会请求一个 block 数据分片;ESS 服务在接收到无序的 shuffle 数据请求时,只能重复通过随机 I/O 方式读取大小在数十 KB 的 block 数据,因此磁盘吞吐会非常低。


  • Shuffle 网络连接可靠性问题。Spark executor 使用连接池来管理不同机器间的网络连接,对于相同地址的请求,会复用同一个连接。当 map tasks 的 shuffle 结果存储在 S 个 ESS 服务上,reduce tasks 分布在 E 个 executors 上,那么理论上还是会建立 E * S 个网络连接,生产环境中大的Spark job,E 和 S 都可能会超过1000,那么网络出现问题的概率就会比较大。虽然在网络出现问题的时候,即使 Spark 内部通过重试,恢复网络连接,重新获取到了 shuffle 数据,但是这些 task 的执行时间也会变长,从而影响 Spark job 执行时间。


  • Shuffle 数据的本地性不好。现在的计算机硬件,CPU速度远大于磁盘和网络 I/O,所以对于分布式系统,将计算分配到数据所在机器,可以得到更好的性能提升。但是 pull based shuffle 模型中,reduce task 计算需要的 shuffle 数据分散在不同的节点上,虽然在 Spark 中,fetch shuffle 数据和reduce task 计算可以同时进行,但是reduce task的计算一般还是快于 shuffle 数据的获取过程,从而限制了 reduce task 计算速度。


03


Spark push based shuffle 原理分析

虽然Spark已经对 shuffle 过程做了很多优化,但是当集群的规模足够大的时候,shuffle read 仍然会有很多不稳定的情况。Linkedin 向 Spark社区贡献了他们内部基于 push based shuffle 实现的框架 Magnet [2],并在社区 Spark 3.2 版本 [3]实现基本可用。在 pull based shuffle 模型中,每个 reduce task 需要主动 pull 其 map stage 中每个 map task 输出的对应的 reduce 分片数据,但是在 push based shuffle 模型中,所有 map task 都会主动将同一个 reduce task 的数据 push 到同一个 ESS 服务, reduce task 就可以到这个 ESS 服务 fetch 合并好的 shuffle  数据了。


详细  shuffle 流程如下:

  • Executor启动时候向当前机器的 ESS 服务进行注册,同步 shuffle 目录配置等信息。


  • Map 阶段在启动之前,DAG scheduler 会尝试给该Stage 选择一组ESS 服务作为 PushMergerLocations 。


  • Map task 执行完会将计算结果写到本地磁盘, 然后将所有 blocks 信息上报给Driver。最后多了一个判断,如果开启 push based shuffle, map task会启动一个线程池读取本地的 shuffle 数据,将 shuffle block 数据推送到 block 所属的 reduce task 对应的 remote ESS 服务。


  • ESS 服务接收自不同 Executor 推送过来的 shuffle blocks,相同 reduce 的 shuffle 数据会合并到同一文件中,多个 shuffle blocks 会合并成一个 chunk 进行存储,此外还会有 index 文件和 meta 文件来保证数据的可靠性。


  • Reduce 阶段在启动之前会有一个等待,让更多的 map 结果被 push 到 remote ESS,然后 Spark driver 会向所有 PushMergerLocations 的 ESS 发送一个 FinalizeShuffleMerge 请求,ESS 服务收到请求后,停止接收 pushed shuffle 数据,并持久化所有缓存数据到文件中,最后向 Driver 返回最终 merged shuffle blocks 信息。


  • Reduce task启动后获取 shuffle Map Status 元数据和 merged status 元数据。对于已经 merge 好的 shuffle 数据,reduce task 先向ESS 服务请求获取 merged shuffle blocks 的 meta 数据,然后获取对应的 merged shuffle 数据;对于还没有被 merge 的 shuffle 数据,直接从原来的map task 所在节点的 ESS 服务请求读取 shuffle 数据。获取到 shuffle 数据后,继续执行 reduce task。


Push based shuffle 过程

(M. Shen, Y. Zhou and C. Singh. Magnet 2020)


04


若干讨论

Push based shuffle模型相比于 pull based shuffle 模型需要额外将 shuffle 数据传输到 remote ESS服务,但是为什么开启Push based shuffle 还会更快呢?下面我们一起详细分析 push based shuffle 到底快在哪些地方。


push shuffle数据和map tasks同时进行


Spark DAG在计算调度的时候,会将Job划分为stage,然后根据依赖关系逐个调度 stage来执行,其中reduce stage一定会等待map stage的所有task执行完成。因此在 map stage执行完成之前,先执行完 map task 的 executor 就有机会将 shuffle 数据传输到 remote ESS服务,而不影响 executor 同时执行其他 job 的tasks,所以开启push based shuffle并不影响集群的计算吞吐。


作为对比,在MapReduce 框架中有一个类似的优化。当 map tasks 和 reduce tasks 非常多的时候,一般情况下 map tasks 不会同时完成。为了优化 shuffle 过程, MapReduce 框架允许当 map tasks 完成一定的百分比后,就开始提前调度和启动部分 reduce task,这样提前启动的 reduce tasks 在仍有 map tasks 在计算的时候,就可以读取 shuffle 数据了;但是提前启动的这部分 reduce tasks还是依赖于全部map task 的输出,所以要等待所有map tasks执行完成,才能接着完成reduce task,同时还占用着集群的计算资源,所以在MapReduce框架要协调好他们之间的调度,才能更好的优化计算任务。


合并Shuffle blocks请求


Reduce shuffle wait 一般主要是因为ESS响应慢导致的。面对一个大的Spark Job时,ESS服务面临的是请求量大,请求时间比较集中,请求的数据量较小的 shuffle 数据请求。比如我们有十万个map task和一万个reduce task, 每个map task shuffle写数据是200M,如不考虑本地读 shuffle 数据的情况,则平均每个reduce 要向ESS服务发送十万个,平均大小为20k的 block请求,如果我们Spark集群有1000台ESS服务机器,则每个ESS服务要在短时间内服务一百万个平均大小为20k的shuffle block read RPC请求。


每个ESS服务使用Netty 线程池来响应这些请求,但受限于机器的CPU和磁盘资源,RPC请求量大的时候,仍会有请求等待Netty 分配线程来响应;对收到的 shuffle 数据请求,ESS服务先通过磁盘读取shuffle index文件,然后读取shuffle data 文件中对应的 shuffle block数据,最后返回结果。如果map task的 shuffle 数据是存储在 SSD磁盘上,磁盘I/O时间相对会快一些;但如果 shuffle 数据存储在HDD磁盘,shuffle block数据又非常小,频繁的随机 I/O 会导致整体磁盘吞吐下降,shuffle 请求延迟变大。


一般情况下,shuffle 服务都是和其他大数据组件部署在一起的。当 shuffle 节点上其他服务占用较多CPU或磁盘I/0资源时,也有可能会导致 ESS 服务响应比平时慢。当集群 ESS 节点较多时,这种情况发生概率更高。


开启push based shuffle 之后,ESS服务会将接收到的同一个 reduce task 的多个 shuffle block 数据合并为文件大小更大的 chunks,reduce 在请求 shuffle 数据的时候,每次返回一个chunk,大大减轻了网络请求的压力,同时磁盘I/O从随机读取变成顺序读取,I/O效率明显提升,所以ESS服务响应更快更稳定。


Reduce本地读shuffle数据


Remote ESS 服务接收到shuffle 数据后,会将同一个 reduce task 的shuffle数据合并成同一个文件。Spark 为了避免 reduce task再通过网络读取该reduce task的 shuffle 数据,当remote ESS 服务合并了超过 REDUCER_PREF_LOCS_FRACTION (默认 20%)的 blocks 时,Spark DAG scheduler 会尽可能的把 reduce task调度到该 romote ESS 服务所在机器上执行,该reduce task的大部分shuffle数据就是本机读取,不再需要网络传输。


在集群 executor 资源相对充足时,reduce task 被调度到 preferred location 上执行,统计 shuffle 数据为 TB 级的 query metrics ,不开启 push based shuffle 的local shuffle read 占总shuffle量的 0.43 %, 开启 push based shuffle 的 local shuffle read 占总shuffle量的 76%



05


eBay Carmel在push based shuffle的优化

为了提升生产环境 Spark 集群 shuffle 的稳定性,Carmel team 在社区版 Spark 的基础上做了进一步的优化,比如对 ESS 服务增加了拥塞管理,当 ESS 服务接收到的请求过多时,会拒绝部分请求,然后由客户端等待一段时间,再重新发起请求,以提高 ESS 服务的稳定性。针对 push based shuffle做了如下优化:


ESS服务在接收和合并shuffle数据优化


社区的代码,负责接收和处理 pushed blocks数据的线程,在收到map task 的 push 数据时,会在当前Netty线程中将数据写入到 merged shuffle file中。对于同一个 reduce 的其他map task 的 push 数据会先保存到内存中,等待写入。但是如果后到达的block 数据先传输完毕,会抛出 BLOCK_APPEND_COLLISION_DETECTED (写冲突)的报错,然后由 map task 重新发起 push 数据的请求,写冲突严重的时候merge 效率会非常低,另外因为接收数据和写merge文件都是由 netty workerGroup 中的线程完成,所以当 ESS 需要同时处理的请求的个数非常多的时候,netty线程池很容易被用光,导致ESS服务不再响应其他Job的 reduce  task 发过来的Fetch数据请求;此外,当同时 push 的 shuffle 数据量非常大时(例如TB级别),ESS 服务中缓存的数据有可能会导致NodeManager OOM。


Carmel team 对ESS的merge流程进行了优化。


异步写merged shuffle文件


我们将接收 shuffle 数据和合并写 merged shuffle 文件分为两个独立的过程,ESS 服务中的Netty 服务只负责接收数据,并保存数据到的内存池中,合并写merged shuffle 文件由一个独立的线程池来完成。因为写数据是在 block 数据全部接收完成之后,不用等待网络数据传输,写数据的工作会更快,同时释放了netty worker 线程。


解决写merged shuffle文件冲突


为了优化社区Spark中的 BLOCK_APPEND_COLLISION_DETECTED (写冲突)的报错,因为同一个 reduce 只对应一个最终合并 shuffle 文件,且同时只能有一个线程写,所以我们将同一个 reduce 的 shuffle blocks 数据分配给写数据的线程池中的同一个写线程即可。


增加对接收pushed blocks的内存池管理


当等待写merge文件的 pushed blocks 过多,有可能会导致NodeManager OOM。因此我们在对接收 pushed blocks 的内存池做了管理,当接收的 pushed blocks 大于内存池上限时,会开始将部分 pushed blocks 数据 spill 到临时文件中,等待合并到最终 shuffle 文件中。


Reduce task合并MapStatus和MergedStatus优化


如前所述,reduce task 在 fetch 数据前,reduce task 需要合并 MapStatus 和 MergedStatus ,以确定哪些block 数据是从 merged ESS 服务获取,哪些block数据是从 map task 的ESS服务获取,当需要合并的block 非常多时候,merge过程可能要好几十秒,Carmel Team  通过调整遍历 map shuffle blocks 和 merged blocks 的遍历顺序对这个 merge过程进行了优化,并反馈给了社区 [4]。



06


优化效果展示以及结果分析说明

Carmel Team 使用优化后 push based shuffle 代码分别针对 TPCDS 数据集和生产环境数据进行了测试。


基于 TPCDS 5T测试集的测试是在90台机器节点上进行的,开启 push based shuffle 和不开启,query 执行时间相差不大。


我们在生产环境中启动了一个有233个executor,每个executor有18个cores 的queue分别对 shuffle 数据量为GB级和TB级的 query 进行了性能测试。


当shuffle数据量在GB级别的时候,平均每个 reduce task 读取的数据量非常少,reduce stage也非常短,所以开启 push based shuffle 可能还会比不开启略慢一些。


当shuffle数据量在TB级时,reduce task会出现明显的 shuffle wait 等待。开启push based shuffle 后,query 的 shuffle wait 等待时间平均可以减少 90%,整体query的执行时间有10%左右的性能提升。



对比 query 的 max fetch wait time (间接反映了 fetch 请求在 ESS 服务比较繁忙的时候的等待时间)有了明显减少。


测试结果分析:

  • 我们公司Spark集群使用的都是SSD磁盘,ESS服务对 shuffle 数据的读写相比HDD磁盘要快很多,所以在 Spark shuffle 数据量较小的时候,开启 push based shuffle, query 执行时间并不能有明显性能提升,甚至会变慢一些。


  • 在 Spark shuffle 数据量较大的时候,开启 push based shuffle,reduce tasks 获取 shuffle 数据的时候有非常明显的性能提升,fetch 数据等待时间大幅减少,从而导致 query 执行时间有明显性能提升。


测试说明:

  • 因为在生产环境中是和其他生产queue共享ESS服务,所以是在集群比较空闲的时候,测试了多轮测试,取运行时间较短的结果。


  • Shuffle数据量比较大的query一般也需要读取大量HDFS数据,由于HDFS read的时候可能存在slow read node,比如测试50T Disable push based shuffle 时,HDFS read stage要多5.4 min。


07


总结

从Spark job shuffle 量上来看,当Spark Shuffle 数据量较小的时候,push based shuffle 没有明显优势;但是当Spark shuffle 数据量较大的时候,push based shuffle 会明显降低 shuffle wait time, 从而加速 reduce Stage的计算。


目前Spark社区主要的 push based shuffle 代码来自Linkedin 贡献[5],阿里巴巴开源了自己的push based shuffle 框架RemoteShuffleService[6], 腾讯开源了自己的push based shuffle 框架Firestorm[7],各自版本的实现都有自己的优势,比如社区版本的实现和Spark现有的代码在通信协议,调用方式等做了深度的集成,阿里和腾讯的代码则可以独立部署,一个shuffle server 宕了,并不会有导致 NodeManager 挂掉的风险等优势。


参考文献

  1. https://spark.apache.org/docs/3.3.0/running-on-yarn.html#configuring-the-external-shuffle-service

  2. M. Shen, Y. Zhou and C. Singh. Magnet: Push-based Shuffle Service for Large-scale Data Processing. PVLDB, 13(12): 3382- 3395, 2020.

  3. https://spark.apache.org/docs/3.3.0/configuration.html#push-based-shuffle-overview

  4. https://issues.apache.org/jira/browse/SPARK-39325

  5. https://issues.apache.org/jira/browse/SPARK-30602

  6. https://github.com/Tencent/Firestorm

  7. https://github.com/alibaba/RemoteShuffleService



eBay技术荟
eBay技术荟,与你分享最卓越的技术,最前沿的讯息,最多元的文化。
 最新文章