官宣|Apache Paimon 0.9.0 发布公告

科技   2024-09-18 20:02   湖南  
Apache Paimon PMC 正式发布 Apache Paimon 0.9.0 版本。此版本经历了4个月,共有 80 人参与了该版本的开发,并完成了 600 多条提交。感谢所有贡献者的支持!

社区已决定下个版本是 1.0 版本,标志着 Apache Paimon 的大部分功能已经比较完善与稳定。

01

01

版本综述


Paimon 的长期规划是成为统一的湖存储格式,满足时效分钟级大数据主流需求:离线批计算、实时流计算、OLAP 计算。此版本值得注意的改动有:


  1. Paimon Branch:此版本 Branch 功能正式生产可用,并且引入了 'scan.fallback-branch' 功能帮助业务更好的统一流批一体存储。
  2. Universal Format:此版本引入了原生的 Iceberg 兼容,你可以开启 Iceberg 兼容模式,Paimon 将实时的额外产生 Iceberg 兼容的 Snapshots,你可以使用 Iceberg 相关生态来读取此 Paimon 表。
  3. Caching Catalog:此版本默认引入了 Caching Catalog 的实现,Table 元数据以及 Manifest 文件都将被缓存到 Catalog 里面,这可以加速 OLAP Query 的性能。
  4. Bucketed Append 表可用性改进,它的小文件问题得到大大缓解,并且它可以被 Spark 应用到 Bucketed Join 中 (减少了 Join 中的 Shuffle)。
  5. Append 表的删改支持:此版本引入了 Append 的 DELETE & UPDATE & MERGEINTO 支持,你可以通过 Spark SQL 来删改 Append 表,并且它还支持 Deletion Vectors 模式。

02

01

兼容性修改


以下改动可能影响你使用的兼容性。


Bucketed Append 表


定义一张没有主键的表时,如果定义了 'bucket' 个数,那这张表就是 Bucketed Append,以前也叫 Append Queue 表,因为它更多被应用在有序的流写流读上。它的小文件问题得到大大缓解,并且它可以被 Spark 应用到 Bucketed Join 中 (减少了 Join 中的 Shuffle)。

以下是它的一些默认值改变:

  1. Bucketed Append 表未定义 bucket-key 时被禁止,之前版本的行为是按照全行来进行哈希取对应的 Bucket,但是这是难以理解的行为,我们建议你使用老版本 Paimon 来进行数据的重刷 (写入新的合法的表)。
  2. Bucketed Append 表的 'compaction.max.file-num' 参数默认值被调整为5,这意味着在单个 bucket 内只会有更少的小文件,避免小文件太多影响生产可用性。

虽然如此,我们仍然推荐没必要不要定义 Bucketed Append 表,默认 bucket -1 的模式是更易用的。

文件与压缩


Paimon 社区着力于提高默认参数下的综合表现,0.9 版本修改如下参数默认值:

  1. 文件格式 'file.format':默认由 orc 修改为 parquet,这两个格式没有本质上的差距,parquet 的综合表现要更好,并且社区补全了 parquet 的所有能力,包括嵌套类型的支持、Filter PushDown 等等。
  2. 文件大小 'target-file-size':主键表保持 128MB,非主键表 (Append 表) 默认值调整为 256MB。
  3. 压缩默认 'file.compression':默认由 lz4 修改为 zstd,默认 zstd 是 level 1,你可以通过 'file.compression.zstd-level' 来调整压缩等级,消费更多 Cpu 来获得更大的压缩率。
  4. 本地 Spill 压缩等级 'spill-compression.zstd-level':同样本地 Spill 也可以通过调整 Level 来获得更大的压缩率。


CDC 入湖


Flink CDC 的依赖升级到 3.1 版本,由于 Flink CDC 在这个版本成为了 Flink 的子项目,代码的包名已经被修改了,所以老版本的 CDC 将不能被支持。Mysql CDC、MongoDB CDC、Postgres CDC 将会被影响。

03

01

Paimon Branch


分支是一个有趣的功能,他可以让我们操作 Git 一样去操作 Paimon 表,它在 Paimon 0.9 达到了生产可用的状态,阿里巴巴内部企业已经在生产环境使用它来做数据订正和流批一体等功能。

比如你可以用分支来做数据订正:

-- create branch named 'branch1' from tag 'tag1'CALL sys.create_branch('default.T', 'branch1', 'tag1');
-- write to branch 'branch1'INSERT INTO `t$branch_branch1` SELECT ...
-- read from branch 'branch1'SELECT * FROM `t$branch_branch1`;
-- replace master branch with 'branch1'CALL sys.fast_forward('default.T', 'branch1');

比如你也可以用分支来做流批一体存储,比如你可以单独设置一个流分支,然后设置 'scan.fallback-branch',这样当一个批处理作业从当前分支读取时,如果某个分区不存在,会尝试从备用分支读取该分区。

假设你创建了一个按日期分区的 Paimon 表。你有一个长时间运行的流作业,它向 Paimon 插入记录,以便今天的数据能够及时查询。你还有一个在每晚运行的批处理作业,覆盖 Paimon 的分区,以确保数据的准确性。当您从这个 Paimon 表查询时,您希望首先从批处理作业的结果中读取。但如果某个分区(例如,今天的分区)在其结果中不存在,那么您希望从流作业的结果中读取。在这种情况下,您可以为流作业创建一个分支,并将 scan.fallback-branch 设置为该流分支。

-- create a branch for streaming job (realtime)CALL sys.create_branch('default.T', 'rt');
-- set primary key and bucket number for the branchALTER TABLE `T$branch_rt` SET ( 'primary-key' = 'dt,name', 'bucket' = '2', 'changelog-producer' = 'lookup');
-- set fallback branchALTER TABLE T SET ( 'scan.fallback-branch' = 'rt');
SELECT * FROM T;

04

01

Universal Format


Paimon 的 Universal Format 可以让你使用 Iceberg 的客户端或者计算引擎来读取 Paimon 里面的数据。通过 'metadata.iceberg-compatible' 参数,Paimon 在生成 Snapshots 会顺便在文件系统上生成 Iceberg 的 Snapshots,你不用引入任何依赖,也不用担心任何管控相关的问题。

值得注意的:

  1. Iceberg 的元数据放在文件目录中 (对应 Iceberg 的 HadoopCatalog),比如使用 Spark 的 DF 来读取:spark.read.format("iceberg").load("path").
  2. Iceberg 视图只读,通过此方式写入可能会将表破坏掉。
  3. 对于主键表,Iceberg 视图只能读到最高层 (LSM Level) 的文件,你可以配置 'compaction.optimization-interval' 来控制数据的可见性。


05

01

Caching Catalog

Paimon 的元数据放在文件系统中,这导致在计算引擎 Plan 时频繁去访问文件系统,单点的性能很有可能受影响,而对于对象存储,这个代价更为昂贵。

此版本默认引入了 Caching Catalog 的实现,它将被默认开启 (默认只 Cache 1MB 以下的 Manifest 文件),这可以加速 OLAP Query 的性能。

你可以通过以下参数控制 Cache 的行为:

cache-enabled
true
Boolean
Controls whether the catalog will cache databases, tables and manifests.
cache.expiration-interval
1 min
Duration
Controls the duration for which databases and tables in the catalog are cached.
cache.manifest.max-memory
(none)
MemorySize
Controls the maximum memory to cache manifest content.
cache.manifest.small-file-memory
128 mb
MemorySize
Controls the cache memory to cache small manifest files.
cache.manifest.small-file-threshold
1 mb
MemorySize
Controls the threshold of small manifest file.

06

01

Deletion Vectors


Deletion Vectors 模式在 0.9 版本中全面可用。

主键表的 Deletion Vectors 模式在 0.9 中支持了异步 Compaction 的配合 (默认半同步),这让它的可用性大大提升,不再强影响 Checkpoint,由于 DV 模式需要使用本地磁盘,我们推荐使用 SSD 的本地磁盘,如果是比较差的 HDD 本地盘,性能会比较糟糕。

非主键表 (Append 表) 在 0.9 中支持了 Spark SQL 的 DELETE & UPDATE & MERGEINTO,这使得 Paimon Append 表更像一个完整的数据库表,以支撑用户细粒度的删改。

不仅如此,非主键表也支持了 Deletion Vectors 模式,在没有开启前,删改是 Copy On Write;开启 DV 模式后,删改是 Merge On Write。Deletion 文件将在 Compaction 时候被删除。

07

01

核心能力


新增 Aggregation 函数


hll_sketch、theta_sketch、rbm32、rbm64,你可以通过 sketch 相关函数来进行 COUNT DISTINCT 的估算。Paimon 不支持自定义聚合函数,而是推荐你在社区提出需求丰富内置的函数库。

通用文件索引


通用文件索引支持了 Bitmap 类型,也支持了 REWRITE 的 CALL 命令,可以让你重新生成对应的索引,Bitmap 索引在多个字段的联合过滤条件表现不错。

历史分区 Compact


另外,如果你的表是张分区表,虽然 Paimon 有内置的自动 Compaction,但是它的历史分区可能并没有做过完整的 Full Compaction,我们在 0.9 中引入了 partition_idle_time 来自动选取没有更新的分区来进行 Full Compaction,以较少小文件和提高查询性能。

08

01

Flink 引擎


Cluster


Cluster 允许你在写入过程中根据某些列的值对 Append 表中的数据进行聚类。这种数据组织方式可以显著提高下游任务读取数据的效率,因为它能够实现更快速和更有针对性的数据查询。该功能仅支持 Append 表(bucket = -1)和批处理执行模式。
INSERT INTO my_table /*+ OPTIONS('sink.clustering.by-columns' = 'a,b') */   SELECT * FROM source;

分区 done 标


对于分区表,每个分区可能需要被调度以触发下游的批处理计算。因此,有必要选择这个时机来表明它已准备好进行调度,并尽量减少调度过程中的数据漂移。我们称这个过程为:“分区 Mark Done”。

CREATE TABLE my_partitioned_table (    f0 INT,    f1 INT,    f2 INT,    ...    dt STRING) PARTITIONED BY (dt) WITH (    'partition.timestamp-formatter'='yyyyMMdd',    'partition.timestamp-pattern'='$dt',    'partition.time-interval'='1 d',    'partition.idle-time-to-done'='15 m');

  1. 首先,需要定义分区的时间解析和分区之间的时间间隔,以确定何时可以恰当地标记分区为完成。
  2. 其次,需要定义 idle 时间,这决定了分区在没有新数据的情况下,需要多长时间后才能标记为完成。
  3. 第三,默认情况下,分区标记完成将创建一个 _SUCCESS 文件,你也可以配置 'partition.mark-done-action' 定义具体 Action。


表 Clone


Paimon 0.9 中支持了克隆表 Action 以进行数据迁移。目前,只会克隆最新快照使用的表文件。如果你克隆的表在此期间没有被修改,建议提交一个 Flink 批处理作业以获得更好的性能。然而,如果你希望在写入的同时克隆表,请提交一个 Flink 流处理作业以实现自动故障恢复。

这个命令可以帮助你进行方便的数据备份,以及数据迁移。

Procedures


Paimon 0.9 版本新增了大量 Procedures,另外 Flink Procedures 支持了最新版本的 Named Procedures 的模式,让你的执行更加方便,不用强行写全部参数。

09

01

Spark 引擎


动态参数


一直以来,Spark SQL 都没有动态参数的能力,像是 Flink SQL 有动态 Options,非常的方便。在 0.9 中,Spark 通过 SET 添加了此项能力。SET 命令中专门设置 Paimon 配置,需要添加 spark.paimon. 前缀。

-- set paimon confSET spark.paimon.file.block-size=512M;
-- reset confRESET spark.paimon.file.block-size;

Bucketed Join


Hive 中有一个特性:Bucketed Join。当两个表按照相同的列进行分桶时,可以执行 Bucketed Join。这种情况下,没必要对数据进行 Shuffle,直接按桶来进行 Join 即可,非常的高效。

而 Bucket 其实是 Paimon 的核心概念之一,一直以来,Paimon 缺少与计算引擎的对接,没法利用上这一特性进行优化。

0.9 中,Paimon 与 Spark SQL 深度集成,实现了此优化:

-- Enable bucketing optimizationSET spark.sql.sources.v2.bucketing.enabled=true;
-- Bucketed JoinSELECT * FROM t1 JOIN t2 on t1.id = t2.id

只要 Join 的两张表都是 Bucketed 表 (无论是主键表还是非主键表),且 bucket-key 字段就是 Join 字段,那这里就会进行高效的 Bucketed Join。

写入动态 Bucket 表


  1. Spark SQL 可以写入动态 Bucket 表经过了优化,第一次写入会少一次数据的 Shuffle。
  2. Spark SQL 可以写入跨分区更新的表了,但是整体上它的效率并不高。


10

01

其它进展


  1. Paimon Web UI:正在发布中,欢迎大家试用
  2. Paimon Python:预计近期会启动 0.1 的发布流程
  3. Paimon Rust:研发中,预计 0.1 发布可读版本


11

01

关于 Paimon


  • 微信公众号:Apache Paimon ,了解行业实践与最新动态
  • 官网:https://paimon.apache.org/ 查询文档和关注项目


     
 

活动推荐


阿里云基于 Apache Flink 构建的企业级产品-实时计算 Flink 版现开启活动:
新用户复制下方链接或者扫描二维码即可0元免费试用 Flink + Paimon
了解活动详情:https://free.aliyun.com/?pipCode=sc




▼ 关注「Apache Flink」,获取更多技术干货 


   点击「阅读原文」跳转阿里云实时计算 Flink

Apache Flink
Apache Flink 中文社区唯一官微,由 Flink PMC 维护;
 最新文章