背景
Carmel是eBay内部基于Apache Spark打造的一款SQL-on-Hadoop查询引擎。通过对Apache Spark的改进,我们为用户提供了一套高可用、高性能的数据分析服务,以满足eBay内部大量的交互式数据分析需求(单日查询量超过40万)。
近年来,eBay Carmel开发团队一直致力于改进 Spark SQL的查询性能,尤其是Join的查询性能。针对不同类型的Join,我们已经开发了Runtime filter for broadcast join,Bloom filter for shuffle join和Range join,它们均获得了明显的性能提升。去年,我们留意到生产上另一类Join的性能不是很理想,即先Join后聚合而且聚合后的数据会减少很多(如下图所示)。本文将详细介绍我们对这类查询所做的优化。
介绍
针对这类场景,我们增加了一个新的逻辑优化Rule。在合适的时候,聚合中的局部聚合会下推到Join之前执行。局部聚合下推在不引入额外的Shuffle下有多个好处:
◆ ◆ 减少Join之前Shuffle和Sort的数据量
◆ ◆ 减少参与Join的数据量
◆ ◆ 因为减少了参与Join的数据量,有时能避免Join膨胀或者倾斜
◆ ◆ 因为在Shuffle之前减少数据量,有时能将Shuffle Join转成Broadcast Join
以下示例(TPC-DS的表)说明了下推局部聚合的基本思想和三种下推的方式,左边是原始的计划,右边是下推之后的计划。
1
只下推Partial Aggregate到Join的左边
2
只下推Partial Aggregation到Join的右边
3
下推Partial Aggregation到Join的两边
下推Group By ss_item_sk到store_sales这边能减少大量数据(14,399,993,722 行 vs 330,583,819行)。下推Group By i_item_sk, i_brand_id到item这边不能减少数据,也就是说下推不是100%能获得收益,针对这种情况,我们开发了Adaptive Partial Aggregation(不在本文讨论范围内,思想是每个Partial Aggragate任务在处理到一定行数的时候如果不能明显聚合数据,则把剩余的数据直接输出,不做聚合计算)。在 Spark上进行的实验证实,对于这个查询,第一种和第三种下推方式都能获得收益。
支持下推的聚合函数
最常用的几个聚合函数都可以下推,我们把它分成四类:
◆◆ MIN, MAX, FIRST 和 LAST。这些聚合函数下推最简单,只需要把原始的聚合函数的输入替换成下推之后的输出。比如对于聚合函数:min(col),下推的是:min(col) AS push_min_col,则原始的聚合函数应该替换为min(push_min_col)。
◆◆ SUM。Sum需要替换成下推之后的Sum乘以另一边的Count。必须要乘以另外一边的Count,因为之前的Join key上可能有重复值。比如对于聚合函数:sum(col),下推的是sum(col) as push_sum_col,则原始的聚合函数应该替换为sum(push_sum_col) * count(*)。
◆◆ COUNT。Count需要替换成sum(left_count * right_count)。并且如果有Count函数出现,必须要有Group By字段。这是因为count(*)计算一个空的输入,结果是0,但当sum(0 * 0)计算一个空的输入,结果是null。
◆◆ AVG。Avg需要重写为sum/count之后再下推。
支持下推的join类型
不是所有的Join类型都可以下推,我们当前支持了Inner Join和Left/right Outer Join。
◆◆ 对于Inner Join,支持下推到左边,右边,或者左右两边。
◆◆ 对于Left Outer Join, 只能下推到左边。如果下推到右边,可能会给右边Join不上的补NULL。导致最终的结果不正确。我们来看一个例子:
我们分别比较下推和没下推之前Join的结果:
由此可见,Group By NULL和Group by F的结果定然不同。
◆◆ Right Outer Join和Left Outer Join类似,只支持下推到Join的右边。
Partial aggregation下推优化流程
整个执行流程大致分为六步(见下图):
首先看Join后面是否是聚合。如果存在Avg聚合函数,重写Avg聚合函数为sum/count。将Join Keys,聚合函数和Group By表达式都按照他们的References从Join左右两边完全分开。比如我们将上面例子中分成这样:
如果能完全分开,并且所有的聚合函数都是可下推的聚合函数,那么根据统计信息分别判断下推到左右两边是否有收益(Join左右两边分别按照如下的方式聚合,聚合的输出能明显减少认为有收益):
如果下推有收益,为有收益的这边构造Partial Aggragate;如果两边都有收益,两边都构造。构造出来的Partial Aggregate的Group By表达式是该边的Join Keys加上Group By Keys, 聚合函数是该边的聚合函数加上count(*):
替换原始的聚合函数,整个流程结束:
TPC-DS 基准测试结果
在我们使用 TPC-DS 基准测试中,通过下推Partial Aggregation 到Join之前能产生最高6.2倍的加速,并且 16 个查询的加速超过 1.1 倍(见下图)。其中q4,q6,q11,q23a,q23b,q31,q37,q38,q54,q71,q74,q82,q87减少了大量Shuffle数据;q24a, q24b减少了Shuffle数据,避免了Join膨胀,并通过AQE将Shuffle Join转成Broadcast Join,进一步提升了Join的性能;q68减少了Shuffle数据之后将Shuffle Join转成Broadcast Join。
总结
本文介绍了在Spark SQL中如何实现下推Partial Aggregation到Join之前、支持下推的集合函数和支持下推的Join类型、以及TPC-DS的基准测试结果。
该功能已经上生产半年多。我们采样了56739个查询,其中有16731个包含Join,4753个用到了该优化。对一些Join的提升非常明显,特别是Join膨胀的类型。最明显的一个在没这个功能之前需要3600秒,有了这个功能之后只需要186秒。
参考链接
[1] https://docs.teradata.com/r/Daz9Bt8GiwSdtthYFn~vdw/Y7gpiuhWtXKl7~~RlPUeog
[2] https://www.vldb.org/conf/1995/P345.PDF
[3] https://issues.apache.org/jira/browse/SPARK-38506
[4] https://issues.apache.org/jira/browse/SPARK-38505
[5] https://github.com/wangyum/tpcds-benchmark