Iceberg x Amoro 在多点 DMALL 数据入湖的探索实践

科技   2024-09-04 08:24   辽宁  
点击上方蓝字关注我们,了解更多内容

Amoro 是一个构建在 Apache Iceberg 等开放数据湖表格之上的湖仓管理系统,提供了一套可插拔的数据自优化机制和管理服务,旨在为用户带来开箱即用的湖仓使用体验。

作者简介

贾敏,多点 DMALL 资深大数据研发工程师,主导公司核心数据集成平台架构设计与 LakeHouse 的技术落地。其拥有丰富的大数据平台架构经验,长期专注于PB 级数据实时同步、数据湖建设以及 Spark 计算引擎性能调优等主流大数据技术领域。作为Active Contributor,持续在多个开源项目如 Apache Spark、Apache Iceberg、Apache Amoro (incubating) 、Volcano、Flink CDC 等贡献并推动相关特性的改进。

李铭,多点 DMALL 资深大数据研发工程师,目前负责公司大数据云原生架构设计与数据基座新特性研究;研究领域为大数据统一 SQL 网关、分布式文件存储、高性能计算、数据安全等。DataFun 技术社区年度星级志愿者,大数据开源社区爱好者,重点关注多个开源项目Apache Kyuubi、JuiceFS、Apache Celeborn、StarRocks等在司内的适配和应用。


01 背景

随着大数据技术的迅猛发展,数据湖作为新兴的数据管理范式应运而生。在此基础上,通过简化架构、优化数据流程、引入创新技术等方式,降低企业数字化转型中的 TCO(总体拥有成本)成为可能。多点 DMALL 作为零售科技领域的先行者,我们持续探索和实践前沿技术,旨在提升数据处理效率,实现数据管理的标准化和自动化。通过这些努力,我们逐步构建了一个更高效、更具成本效益的大数据生态系统,赋能企业决策和生产运营。

1.1 多点大数据云原生架构简介

多点 DMALL 为行业提供全方位的零售数字化解决方案,而为了满足越来越多 B 端客户的需求,我们需要在多云环境下构建更具性价比、可复用的大数据底层基座和平台工具链。为此,多点 DMALL 大数据团队充分利用已有经验,引入 Kubernetes、JuiceFS、Apache Kyuubi、Apache Celeborn 等开源技术,并结合未来业务需求,设计构建了一整套基于云原生的大数据集群架构,达成了存算分离、轻量级、可扩展、云中立的技术目标。

1.2 原有“数据入仓”方案

对于关系型数据库,“入仓”的数据为前一日业务库数据的快照,也就是 Hive 表数据的更新频率为 T+1。

具体步骤如下:

  • 基线任务:通过 JDBC 一次性将 MySQL 表中的所有存量数据拉取到已经映射的 Hive 表中。

  • 消费任务:以小时级的固定频率消费 Kafka 集群的 Binlog 数据,经过解析、过滤和清洗后写入 HDFS 待用。

  • 合并任务:每日凌晨,基于 Hive 存量数据和消费任务生成的已落 HDFS 的数据进行合并,还原出业务数据。


以上通过一个实例详细说明了数据合并流程:首先将当天生成的 Binlog 数据存储到 Delta 表中,然后与已有存量数据基于主键取最大 Offset 进行合并。

1.3 遇到的问题


问题

描述

数据时效性差

业务团队在业务处理中广泛应用大数据,T+1的离线同步已不能满足诉求,越来越多的数据场景需要实时或准实时的支持。例如,某业务团队需要在一天中不定时查询商业智能(BI)报表获取最新数据,以便及时进行商品补货。

多时区需求

海外业务场景,商家数据涉及多个时区,不同的时区会独立触发计算任务,显然当前的 ODS 同步方案无法实现数据小时级进仓。

同步期间表不可用

当平台进行数据表同步时,数据文件将被重写。若下游任务正在运行,可能导致运行失败。因此,平台与其他数据使用方约定在凌晨 0-2 点进行 ODS 表数据同步,其余数据依赖任务推至2点后执行。

新增字段容易超时

Hive 表添加字段的过程非常耗时,因为需要为每个分区逐一更新 Schema 信息。如果表分区较多,添加字段操作很容易导致与Hive MetaStore连接超时、任务失败并触发告警。

写入放大严重

数据合并写入的最小粒度是分区,只要分区中的任意一行数据发生变化,就需要重写整个分区的数据。

表分区设计不合理导致的治理困难

Hive 表引入了“分区列”的概念。在进行数据查询时,只有显式地添加过滤条件才能发挥过滤作用。在实际使用过程中发现,建表时的分区设计不够合理会导致下游查询慢,而调整分区则需要协调所有使用该表的业务方,修改下游的 SQL 语句或代码,“牵一发而动全身”,带来了极为繁重的运维治理工作。

JuiceFS + TiKV 同时写入大量文件引发的性能问题

在存算分离架构设计中,我们引入了 JuiceFS 来替换 HDFS,并利用TiKV作为元数据存储 。但是在测试时发现,一旦需要处理的变化的文件过多,同时写入时JuiceFS 会卡在修改元数据,也就是修改父目录的属性上。这是因为TiKV对事务有“锁”的机制导致的。


02 技术选型

2.1 Iceberg

多点 DMALL 的大数据平台团队在 2022 年底深入研究了主流数据湖技术方案,包括 Delta Lake、Hudi 和 Iceberg(当时 Paimon 尚未独立孵化)。经过全面的对比分析,我们认为 Apache Iceberg 是最符合我们场景的选择。

我们最终选择 Apache Iceberg 的原因:

  • 优秀的表结构设计:支持高效的数据读写、灵活的 Schema 演化、时间旅行和快照管理、高并发写入、隐式分区、详细的元数据管理,并且与大数据生态系统集成良好,还支持 ACID 事务。

  • 高质量代码:采用了模块化设计,文档和注释详尽,拥有活跃的开源社区,具备性能优化策略,代码风格统一,这使得我们更容易理解技术细节且参与其中。

  • 隐藏分区:Hive 表结构中繁琐且易出错的“分区列”已被废弃。在查询表数据时,不再需要显式添加分区过滤条件,Iceberg 会基于 Meta 元信息自动跳过不必要的文件。此外,随着数据或查询的变化,表布局可以动态更新,这对于我们后期优化表布局非常有利。

  • SQL 兼容性强:我们的业务 ETL 开发几乎全部基于 Spark SQL,因此切换到 Iceberg 后,业务上的 SQL 基本不需要进行任何调整。

2.2 Amoro

在引入 Iceberg 后,我们遇到了一些问题,如快照过期清理、MOR 模式写入产生大量小文件和 Delete 文件、写入失败产生孤儿文件等。如果使用 Iceberg 官方提供的 Compaction 运行 Spark 任务来解决这些问题,将难以长期和及时地保障 Iceberg 的可用性,同时也会大大增加后续的维护工作。幸运的是,我们发现了 Arctic(当时还未改名为 Amoro),能够解决我们引入 Iceberg 后遇到的相关难题。

Apache Amoro (incubating) (以下简称Amoro)是构建在 Apache Iceberg 等开放数据湖表格之上的湖仓管理系统,提供了一套可插拔的数据自优化机制和管理服务,旨在为用户带来即开即用的湖仓使用体验。

我们比较关注 Amoro 的功能 :

  • Compaction 服务:持续进行小文件合并。对于 Iceberg 表 V2 格式(Row-level Deletes),除了合并小文件,还需要对 Delete File 进行转换和合并。Amoro 引入了一套 Self-optimizing 机制,目标是基于新型数据湖表格式打造像数据库,传统数仓一样开箱即用的流式湖仓服务,Self-optimizing 包含但不限于文件合并、去重、排序、孤儿文件和过期快照的清理。

  • Expire Snapshots 服务:定期对过期的 Snapshots 进行清理。如果 Snapshots 一直不清理,那么元数据的文件就会越来越多,最终导致文件系统庞大而崩溃。

  • Orphan Files Clean 服务:事务的失败或快照的过期,均会导致这些文件在元数据文件中已经不再引用了。Amoro支持定期清理这些“孤儿文件”,减少人工繁琐操作。

  • 提供管理工具:包括但不限于 DSL 指令、Metrics、Dashboard等,能够轻松地日常运维和表管理。

03 数据入湖实践

上图展示的是多点 DMALL 的最新大数据架构。我们已将 Apache Iceberg 引入并设置为平台默认表湖格式,也在持续推进替换原有 Hive 表格式。当前,各个集群的 ODS 层表已全部升级完毕,DWD 层等其余数据仓库层的表也在逐步升级替换。

Amoro 可以通过 Helm 一键部署在 Kubernetes 集群上,其中包括 AMS Server 和 Optimizer 的安装。我们选择 Spark 作为 Optimizer 的实现,充分利用 Spark 的 Dynamic Resource Allocation (DRA) 特性,使优化器具有一定的弹性能力。此外,我们也会在部署时打开 Metrics 上报给 Prometheus,并结合 Granafa 进行监控报警。

3.1 同步链路改造

在整个 ODS 数据同步链路上,我们仍然采用 Spark 跑批任务的方式将数据同步到 Iceberg,相较于原有的“数据入仓”方式,缩短了同步链路:没有中间 Delta 表,Kafka 数据将直接进入 Iceberg 。在生产环境中,我们已验证间隔 20 分钟的数据同步能够保证数据完整性和任务稳定性,并且能满足当前和未来较长时间业务对时效性的需求。

以下是两个核心改造点:

Spark 自定义支持 Upsert

在 Spark 中,仅提供了 Merge Into 语法来进行数据合并。验证结果显示,Merge Into 会读取现存的数据集,然后根据唯一键进行连接(Join)并更新数据集,最后选择采用 Copy-on-Write 或 Merge-on-Read 方式写入数据。然而,这种方式显然无法满足我们在 ODS 快速写入场景中的需求。

在研究了 Flink 的写入源代码逻辑后,我们决定在 Spark 上实现 Upsert 语义。按照 Iceberg 规范,我们新增了 identifier-fields 表属性来存储表的 Primary Key。在写入 Data File 之前会先将主键写入 Equality Delete File。每个 Spark Task 至少会产生 1 个 Data File 和 Equality Delete File,并且重写 Spark Partitioner 实现(Partition + Primary Key)来确保同一行数据在一个 Task 内处理。

下图为这个处理逻辑示例:

设计弹性扩容的同步任务组

在原有的同步方案中,每个同步库都会对应一个 Spark 任务。这种方式虽然有助于库与库之间资源的隔离和同步的并发效率,但当调度频率提高到分钟级后,Spark 任务启动资源的开销和耗时就变得不可忽视了。因此,我们引入了同步任务组,将同步库和任务组之间的关系从原来的一对一改为了多对一。

根据库表的重要程度和实时要求,将同步任务划分为不同的组,任务组之间相互独立且可以随时切换库表到新的任务组上。原本需要同时启动的数百个 Spark 任务,现在只需几个Spark调度任务即可完成。

同步任务组启用弹性支持。在串行执行 Job 的场景中,常会出现同一库下不同数据表数据变化差异较大,导致任务耗时拖尾,需要等待当前 Job 运行完成后才能够进行下一个 Job。为了解决这个问题,可以采用多 Job 提交模式,一个 Job 对应一个同步库,这样同一时刻可以并行执行几个同步库,减少 Executor 的空闲时间。

3.2 平滑迁移

作为多点 DMALL 大数据基座支持部门,我们同时运维大量集群和海量库表。因此,将这些环境中的 ODS 库表格式替换为 Iceberg 表格式,是一项极具挑战性的任务:想要在迁移过程中确保业务无感知、数据正确和安全稳定不是一件容易的事。

分批迁移与迁移工具开发

在实际迁移中,我们选择对集群、库表进行分批次处理切换。我们更倾向于先处理风险低的集群,以及数据量少的库表。一方面新技术更新需要控制风险,二来也通过少量数据对新方案和迁移工具进行再次验证。

在迁移工具的选择中,虽然Iceberg 社区提供了表迁移工具,但是由于我们对表结构进行了增强,迁移代码的定制要求较高,我们还是决定自主开发来适配特殊场景。最终我们自主开发的迁移工具支持的功能包括:批量迁移、新老元信息迁移、表结构适配、表数据适配、一键回滚等。

兼容 dt 字段

在 Hive 表中,我们会将时间字段格式化为年-月-日的格式,并设置为统一的 dt 字段作为表的分区列。因此在下游的业务查询中,dt 字段是一个非常重要的 SQL 条件。

升级到 Iceberg 表时,我们决定充分利用隐式分区的特性,解决之前数据难以治理的情况。但为了方便用户层无感知迁移SQL,我们决定在 Iceberg 表上也添加字段 dt,只是不再单独作为分区字段,也不影响隐式分区与文件过滤。该字段的数据来源与老 Hive 表的生成逻辑完全兼容。具体实现方式是通过在表上增加 Properties 来进行扩展。

Properties

### 自定义格式:$fieldName|$formatUdf|$referFieldName|$referFieldType

unidata.source.compatible-partition=dt|format2date|biz_time|datetime

3.3 Amoro 实践

Spark Optimizer:已贡献社区

在 DMALL 大数据团队中 Spark 是主要依赖的计算引擎。由于 Optimizer 能够快速利用云计算的弹性能力,因此我们首先在 DMALL 内部进行了试验性的 Spark Optimizer 功能实现,并经过一段时间的运行后,效果良好。我们与社区进行了沟通,并决定贡献给社区。最终在社区同学的帮助下,该特性被合并到了Amoro主干分支。

[AMORO-1812] Support spark-based external optimizer

[Feature]: Support spark optimizer container for AMS

Amoro Dashboard 为表治理提供便捷

Amoro 提供了丰富且详细的查询视图,能够直观地查看表的状态,比如文件数量、平均文件大小、分区文件夹大小、优化器运行情况等。我们在日常运维中重度依赖这个功能,通过信息的分析观察,为日常表治理提供了优化灵感。

根据我们的治理经验,归纳了以下几种优化场景:

表特征

优化方案

分区数多且平均文件较小

提高分区数据粒度,如将天分区改为月分区

分区数多且大部分分区变更,数据更新频繁

将表转换为非分区表

非分区表、数据量大且数据更新不频繁

选择不可变时间的字段进行时间分区,如 created 字段

非分区表、数据量大且数据更新频繁

按主键字段进行数据分桶,以提高并行读写能力

定制监控报表和报警

在深入使用 Amoro 一段时间后,我们遇到了新的问题:

  • 当优化器突然异常退出,或者 Pending 表较多需要增加优化器资源时,我们该如何及时感知?

  • 能否制作一些我们关注的看板,以直观地观测运行状态,例如数据表在一段时间内的变化、优化时长排行等?

经过分析,我们利用社区提供的Metrics模块,通过 Prometheus 进行采集,自主研发了 Grafana 上的可视化图表,可以直观的展示优化器状态信息和优化文件的处理信息。


除此之外,我们也设计配置了 Grafana 规则,对接内部飞书告警,及时通知异常情况进行人工介入。


04 效果和收益

4.1 同步资源消耗明显减少

采用 Iceberg 表格式,能显著提升写入性能。通过优化同步链路,还能降低单个库表同步的资源消耗。此外,同步数据频率的提高,也能充分利用大数据空闲时间段的资源。根据我们在生产环境中对资源监控,发现能节省大量计算资源。

下图是某个私有化环境切换前后资源监控对比:

4.2 数据利用率提高,表治理效果显著

我们承诺向业务方提供1小时级的数据同步服务,这一改进满足了公司绝大多数用数场景。此前,由于统一的数据接入仅支持 T+1,部分业务方只好利用同步工具单独同步,造成了资源的浪费。现在统一通过平台进行处理,不仅稳定高效,也提升了数据利用率。

另一方面,通过针对分区不合理的表进行无感知优化,平台上文件数量显著减少,写入和查询效率也得到了一定程度的提升。

4.3 减轻了运维工作

原有同步方案技术相对落后,容易引发诸多问题,例如:加字段超时、分区变化频繁导致同步失败以及凌晨资源不足导致任务等待超时等。这些问题往往需要运维人员在凌晨人工介入处理告警,这无疑增加了日常运维的工作负担。然而,自从新数据同步方案上线以来,相当于以往夜间的压力平摊到每一个小时,减少了资源利用高峰带来的不稳定风险,此类问题引发的告警再也没有出现过。


05 踩坑经验

5.1 Hive Metastore锁不会被释放

Iceberg 使用 Hive Metastore作为元数据存储时,如果在写入数据过程中发生异常退出,锁将不会被释放。为了解决这个问题,可以在 Hive Metastore 服务端进行如下配置:

XML

<property>

<name>hive.support.concurrency</name>

<value>true</value>

</property>

<property>

<name>hive.txn.manager</name>

<value>org.apache.hadoop.hive.ql.lockmgr.DbTxnManager</value>

</property>

<property>

<name>hive.compactor.worker.threads</name>

<value>1</value>

</property>

<property>

<name>hive.compactor.initiator.on</name>

<value>true</value>

</property>

5.2 Iceberg 提供的 SparkSessionCatalog 不加载 UDF

我们最初使用 Iceberg 1.2.0 版本时发现,使用 Spark 的 CREATE FUNCTION 语句后,该UDF不会被写入 Hive Metastore 供其他引擎使用。1.3.0之后的版本中官方已经解决了这个问题。

https://github.com/apache/iceberg/pull/7153

5.3 Amoro Spark Optimizer:启动 JuiceFS 缓存,读取数据加速

我们使用 JuiceFS 文件系统,它支持通过 SparkConf 打开 Pod 的 Hostpath 方式挂载。启用缓存后,读性能有所提升,因为热数据文件可以直接从本地读取,速度比从远程对象存储获取更快。

Properties

# Spark的配置产生,可根据各自情况设置缓存目录

spark.Kubernetes.executor.volumes.hostPath.jfscache-dir.mount.readOnly=false

spark.kubernetes.executor.volumes.hostPath.jfscache-dir.mount.path=/data/jfscache

spark.kubernetes.executor.volumes.hostPath.jfscache-dir.options.path=/data/jfscache

spark.kubernetes.driver.volumes.hostPath.jfscache-dir.mount.readOnly=false

spark.kubernetes.driver.volumes.hostPath.jfscache-dir.options.path=/data/jfscache

spark.kubernetes.driver.volumes.hostPath.jfscache-dir.mount.path=/data/jfscache

5.4 Amoro 打开 Full optimizing 后表一直处于 Pending 状态

在开启了定期 Full 优化后,我们发现经过一段时间的运行,表不再进行优化。我们将该问题反馈到社区后,0.7.0 版本已经解决了这个问题。

https://github.com/apache/amoro/issues/2934

5.5 Spark 拉取 Kafka 参数调优

我们的 Kafka 服务端版本为 2.3.4,采用 Spark 提供的 Kafka 0.10+ Source for Structured Streaming 的方式读取数据。如果想解决由于 Topic Partition 分布不均导致的读取速度慢的问题,可以考虑增大 minPartitions 值。

06 未来规划

目前该入湖方案已经在多点 DMALL 的多个私有化环境中部署并已稳定运行一段时间,为业务的快速发展提供了必要的支持。同时,该方案在一定程度上实现了降本增效,节省了计算、存储和运维成本。接下来,我们将朝着分钟级时效性方向继续努力,并尝试引入 Apache Flink + Apache Paimon 构建实时 LakeHouse 的解决方案。

同时,我们也在推进湖仓一体化建设。现已引入 StarRocks on K8s 并作为主要查询引擎,可以直接查询 Hive MetaStore 中的 Hive/Iceberg 表,减少数据搬运,提高数据一致性。此外,StarRocks 后续还将支持“仓冷沉湖”模式,自动将“冷”数据同步回 Iceberg 表中,为实时和离线场景提供更智能的数据支持。

END


精彩回顾

 社区动态:
Amoro 0.7.0 第一个孵化器版本正式发布
 用户案例:

有道基于 Amoro Mixed Format 构建准实时湖仓实践

思科基于 Amoro + Iceberg 构建云原生湖仓实践

更多资讯

社区鼓励任何形式的参与,并期待大家能与 Amoro 共同成长
欢迎 Watch Fork Star 一键三连~
官网:https://amoro.apache.org/
源码:https://github.com/apache/amoro


 Amoro 社群


后台回复【社群

扫描二维码添加小助手,邀你进群~


点击下方【阅读原文】直达 Amoro 官网

大数据流动
专注于大数据 数据治理 人工智能知识分享;提供数据要素 数据资产 数据入表 数字化转型 数据管理 数据架构 实时计算 数据中台 数据仓库 数据湖 元数据管理 Datahub dama最新资料;定期组织CDMP培训;开源 技术 数据。
 最新文章