作业帮历史数据计算引擎主要依赖 Apache Hive 2.3.7,主要用于数仓建设、即席查询、算法特征分析、实验效果统计等方面。虽然 Hive 在数据管理和计算方面有自己的优势,但随着湖技术、云原生、引擎向量化等技术发展,以及业务对成本敏感程度的变化,Hive 逐渐暴露出一些局限性,主要体现在引擎生态、资源利用效率和系统稳定性方面。
Spark 作为计算引擎基本已经成为行业大数据标配。能力上不仅有 SQL,还有 ML、Steaming、GraphX,以及对各种编程语言的支持。在 Catalyst 优化器、内存计算等资源利用效率方面明显优于 Hive。同时官方支持除 Yarn 部署模式外还支持 K8S,为在离线资源峰谷互补提供支持。Spark 作为事实上的标配,也吸引了很多外围开源项目的深度适配,较 Hive 的简单支持在性能、功能方面差异很大,例如 Apache Iceberg、Apache Kyuubi。
Hive 由 hive metastore 和 hiveserver2 两部分组成,metastore 主要解决 Hive 表数据管理,也是目前行业的主流方案,hiveserver2 主要用于接收 SQL 任务请求转为物理执行计划提交给 Yarn 等资源执行。随着湖相关技术的不断成熟,Databricks、Snowflake、公有云 EMR 等商业组织积极推广。Databricks 收购 Iceberg 背后公司 Tabular 后,Unity Catalog、Apache Gravitino 等元数据技术更是被提到了新的高度,用于统一解决 Data + AI 的数据管理,Hive metastore 也将会被逐渐替代。而 hiveserver2 抛开 Hive 计算引擎本身的没落,自身稳定性不足也比较突出,尤其在资源隔离、限流控制等方面。
在行业技术趋势以及业务需求上考虑,我们期望通过计算引擎迭代,实现规模性降本、整体架构可长期演进,系统性提高服务稳定性。
结合作业帮当前 Hive 应用情况,Spark SQL 替换 Hive 在技术架构上核心需要考虑 SQL 提交、认证和鉴权、以及数据开发平台适配。
SQL 提交开源解决方案主要有 Apache Kyuubi、Spark Thrift Server。STS 是 Spark 主导的 SQL 解析和优化器,兼容了 HiveServer2 的 Thrift 通信协议,但是 Thrift Server 和 Driver 在同一个进程内,所有 SQL 都由这个 Driver 负责编译、执行,此阶段资源消耗高,高并发情况容易造成服务节点异常。Kyuubi 则更加灵活,兼容了 STS 服务能力,Kyuubi Server 基于 ZooKeeper 实现服务发现机制保障高可用,针对 ETL、即系查询等不同应用场景设计灵活、可选的 Engine 共享级别。具体对比如下图 (引自 Apache Kyuubi 微信公众号)
认证和鉴权部分在平台(调度 / 即系等系统)和客户端(beeline)访问处理逻辑并不统一,解决起来会稍微麻烦些。为了兼顾项目效率我们采用了与 HiveServer2 类似的鉴权逻辑。如下图简单举例说明整体流程。
Beeline 客户端访问时,通过 Kyuubi 实现信息透传 Spark App,二次开发 Hive Metastore 实现认证,结合 Ranger Hive Metastore Plugin 实现鉴权。相对来说,平台实现会简单些,系统直接和公司内部 IPS 系统对接实现认证。服务入口获取相应 SQL 并解析,在通过 AuthServer 权限服务进行权限校验,较 Client 方式执行速度更快,不需要等待资源创建 Kyuubi Engine。认证鉴权完成后会以超级用户继续走后续流程。本质上账号体系是两套,为了实现一次授权双向可用,用户名必须统一。
整体迁移大概分为两个阶段,一是部门内部流量数仓由 Hive 迁移到 Spark SQL + Iceberg,同时沉淀平台能力。这部分主要为提供基础能力,人工执行迁移动作,规模性验证收益效果。此处不多介绍,可参考 https://www.infoq.cn/article/SXZhixk65MjUfEecrhGu。二是全面推广 Spark 引擎,规模性迁移快速拿到收益。
开始考虑仍然以业务人工自主迁移的方式进行,历史存量 Hive 任务有 1W+,牵扯多个组织角色和团队,少量团队结合当前成本压力意愿度较高,但从全局视角看节奏远低预期。整体上可投入精力有限,新引擎学习、迁移流程复杂(SQL 转换、双跑对数)、遇到问题后迁移意愿降低是主要影响因素。
为尽快拿到收益,以工具化迁移替代人工,大概流程如下图。首先通过调用离线调度平台接口获取 Hive SQL,替换 SQL 中时间等变量占位符,形成可执行 Hive SQL。解析可执行 Hive SQL 基于规则转换为 Spark SQL,同时在测试库下创建目标表,并替换输出结果到测试表,再做 explain 校验。资源低峰期执行 Spark SQL 任务,对比 Hive 产出数据结果(对比方式见下文)。对比收集 Yarn Application 中 Aggregate Resource Allocation 资源消耗指标。数据准确且资源收益为正,持续双跑一段时间无异常后切换为 Spark 引擎,并关注迁移任务接下来一周异常情况,迁移首天出现异常可自动回滚。整个过程当出现超预期问题时,人工分析具体原因迭代转换规则、优化集群和任务参数。经过几轮迭代,可迁移任务覆盖率逐步提高。
迁移工具流程图
基础类型对数 sum(hash(xxx)) 即可,复杂类型需要做些特殊处理,例如
array类型:sum(hash(array_sort(array())))
map类型:sum(hash(array_sort(map_entries(map()))))
array<map>类型:sum(hash(array_sort(flatten(transform(array(map()), x -> map_entries(x))))))
在控制增量任务时,用户视角主要关注以下情况,一是平台功能是否对齐;二是 SQL 语法是否存在差异;三是是否稳定;四是新引擎的收益。在平台功能上,主要是即席查询、例行调度、表 / 任务血缘、语法 / 语义检测、自定义 UDF、Holo/ES 外表查询等方面,随着存量任务迁移能力已补充。SQL 语法差异上,虽说两个引擎语法差异不大,但是明显感觉 Spark SQL 3.3.2 较 Hive 2.7 语法校验上更严谨,在应用上还是会有小的差别,例如 map 类型不支持 group by、distinct、join key;数据类型自动转换、空值做 map key 等异常情况下的输出表现存在差异等等。针对这部分主要以培训宣贯、整理用户使用手册的方式解决。稳定性问题在存量任务双跑阶段基本已经覆盖了绝大头,主要通过调整相关参数解决。切换 Spark 后的收益已有论证,不存在争议。总体上,为避免用户反馈太过强烈,平台能力具备后并没有急于做任务的增量控制,仍然以迁移存量任务和宣贯优势为主。当存量任务覆盖率占绝对优势后,修改默认计算引擎逐步关闭 Hive 入口。
在 Spark 应用中,OOM 问题一直比较突出,随着自身内存管理的不断发展和迭代已有明显改善,但在一些场景中仍然有这种风险。默认情况下,Executor 数据和计算过程所用内存资源主要在 JVM 管理的 On-Heap Unified Memory Pool 中,而 JVM 内对象通常是其原始数据的 2-5 倍,这种高内存占用主要因为封装为 JVM Object 时一些额外开销。加持数据内容、存储格式和文件存储压缩比差异,想要准确预测一个任务需要多少内存资源比较合理是件很难的事情。
为了兼顾迁移进度、稳定性、性能多方面因素,我们提供了两版参数。一是集群级别默认参数,兼顾绝大多数任务性能和稳定性。二是针对稳定性要求较高,允许一定性能损失的参数。以上参数工具化迁移阶段基本覆盖。部分异常 case 整理如下
Broadcast Join 时,广播数据阈值是通过 spark.sql.autoBroadcastJoinThreshold=10M 控制的,当压缩比较高时,需要申请较多的 On-Heap Storage Memory 内存,会导致等待资源时间长或者 OOM 问题,需要禁用或调低阈值才能解决。
执行窗口函数、开启向量化读时,缓存在内存中数据条数默认都为 4096 行,单行记录过大时会导致 OOM,适当调低阈值、避免扫描非必要的大字段可解决。
Kyuubi 提交 Spark SQL 任务时,Driver 会额外加载一些类、启动一些线程,默认的 Spark spark.driver.memoryOverhead 内存 10% 或 384M 总体偏小,导致任务 Driver 使用内存超限被 yarn kill,利用 jmcd 简单分析 Driver 内存消耗,这部分资源使用相对固定的,适当增加后解决。
针对 Spark 内存管理划分整理如下图,结合 Spark UI Metric 可用于辅助分析内存情况。
离线任务中有很多用户设置整点调度,同时任务实例数也比较多。Kyuubi 是通过线程数量来控制提交任务并发的,如果线程数较低,会产生 Yarn 资源空闲但是任务提交限流情况。如果线程数调高,高并发时瞬时产生大量 SparkSubmit 进程,消耗 Kyuubi 节点 Cpu、Mem 资源,Kyuubi 服务有稳定性风险。我们在 Kyuubi engine 启动后释放 startupProcessSemaphore 可以解除并发限制,Kyuubi 节点资源利用率较之前提高 70%,Yarn 资源也可以打满。相关 PR 地址 https://github.com/apache/kyuubi/pull/6463
在部分场景中(例如算法特征数据同步到 GPU 节点、业务数仓结果同步到 OLAP 引擎用于 B 端系统),对任务稳定性、结果集拉取速度比较敏感。Kyuubi 在处理结果集返回有多种方式,但针对我们的场景使用每种方式都有些缺陷。首先是大结果集返回时,因为 Kyuubi 默认使用的方式是 resultDF.collect(),这样会把所有的数据拉取到 Driver 的内存中再返回给 kyuubi Server,数据量大容易发生 OOM 问题。之后测试串行拉取 kyuubi.engine.spark.operation.incremental.collect=true 解决了内存问题,但是拉取速度上会比较慢,尤其是遇到 scan + filter 这种简单查询时。最后 kyuubi.operation.result.saveToFile.enabled=true,结果集大于 minSize 后会将结果存储到 hdfs/cos 等文件系统,但是会触发 Kyuubi 的小文件优化,用户 order by 后获取的结果集仍然乱序,最后优化 Repartition 小文件合并判断逻辑后问题解决。
日志数据在 Hive 表存储中占比非常高,为了保障日志打点的灵活性表设计一般都会包含嵌套类型(StructType、ArrayType、MapType),数仓在构建 DWD、DWS 层表时也会延续这种设计方式。而针对这种嵌套类型查询时会因为嵌套解析、未做 projection pushdown、逐行操作等原因导致资源消耗较多。Spark 在 3.4 版本默认开启嵌套类型向量化读,而我们引用的是 3.3.2 主要因测试覆盖度问题默认未开启。结合我们批量工具迁移逻辑及定向任务性能测试判断无风险有收益。开启向量化后效果非常显著,详细见下图。
SQL
spark.sql.orc.enableNestedColumnVectorizedReader=true
spark.sql.parquet.enableNestedColumnVectorizedReader=true
历史情况,大数据离线主要采用腾讯云定制高 CPU 核数和内存的大规格机型,高峰期 cpu idle 几乎为 0。明显感受处理数据吞吐能力下降,偶尔出现因 CPU 高导致的节点卡死、网络超时等异常情况,与腾讯 EMR 团队配合针对这种情况定向分析。节点高峰时定位消耗 cpu 较多的 yarn container,利用 async-profiler 分析 cpu 占用,发现瓶颈在 jvm gc。利用 jstack 分析 jvm gc 线程情况,发现相关线程数和 cpu 核数有关,有小几百个线程。一个节点 192c,跑 100 多个 container,jvm 线程总共约小几万个,大批量数据处理情况下,内存频繁分配和释放 GC 表现更明显。调整参数 ParallelGCThreads=8 限制 jvm gc 并行度,cpu 使用率变化情况如图。
https://docs.oracle.com/javase/8/docs/technotes/guides/vm/gctuning/g1_gc_tuning.html#g1_gc_tuning
总体上看,从平台能力建设 -> 内部少量任务灰度沉淀平台能力、验证整体效果 -> 存量任务规模化迁移 -> 关闭 Hive 增量入口,通过平台和工具化手段完成,整个迁移过程对业务影响很小。Spark 任务覆盖量从 0 到 1.5W,占例行任务约 80%,资源节省约 54%(同逻辑、同数据),收益超预期。同时未来 Spark on K8S 弹在线资源、基于 Kyuubi 做计算网关收口等技术演进提供了基础。