版本综述
Paimon 的长期规划是成为统一的湖存储格式,满足时效分钟级大数据主流需求:离线批计算、实时流计算、OLAP 计算。此版本值得注意的改动有:
Paimon Branch:此版本 Branch 功能正式生产可用,并且引入了 'scan.fallback-branch' 功能帮助业务更好的统一流批一体存储。 Universal Format:此版本引入了原生的 Iceberg 兼容,你可以开启 Iceberg 兼容模式,Paimon 将实时的额外产生 Iceberg 兼容的 Snapshots,你可以使用 Iceberg 相关生态来读取此 Paimon 表。 Caching Catalog:此版本默认引入了 Caching Catalog 的实现,Table 元数据以及 Manifest 文件都将被缓存到 Catalog 里面,这可以加速 OLAP Query 的性能。 Bucketed Append 表可用性改进,它的小文件问题得到大大缓解,并且它可以被 Spark 应用到 Bucketed Join 中 (减少了 Join 中的 Shuffle)。 Append 表的删改支持:此版本引入了 Append 的 DELETE & UPDATE & MERGEINTO 支持,你可以通过 Spark SQL 来删改 Append 表,并且它还支持 Deletion Vectors 模式。
兼容性修改
以下改动可能影响你使用的兼容性。
Bucketed Append 表
Bucketed Append 表未定义 bucket-key 时被禁止,之前版本的行为是按照全行来进行哈希取对应的 Bucket,但是这是难以理解的行为,我们建议你使用老版本 Paimon 来进行数据的重刷 (写入新的合法的表)。 Bucketed Append 表的 'compaction.max.file-num' 参数默认值被调整为5,这意味着在单个 bucket 内只会有更少的小文件,避免小文件太多影响生产可用性。
文件与压缩
文件格式 'file.format':默认由 orc 修改为 parquet,这两个格式没有本质上的差距,parquet 的综合表现要更好,并且社区补全了 parquet 的所有能力,包括嵌套类型的支持、Filter PushDown 等等。 文件大小 'target-file-size':主键表保持 128MB,非主键表 (Append 表) 默认值调整为 256MB。 压缩默认 'file.compression':默认由 lz4 修改为 zstd,默认 zstd 是 level 1,你可以通过 'file.compression.zstd-level' 来调整压缩等级,消费更多 Cpu 来获得更大的压缩率。 本地 Spill 压缩等级 'spill-compression.zstd-level':同样本地 Spill 也可以通过调整 Level 来获得更大的压缩率。
CDC 入湖
Paimon Branch
-- create branch named 'branch1' from tag 'tag1'
CALL sys.create_branch('default.T', 'branch1', 'tag1');
-- write to branch 'branch1'
INSERT INTO `t$branch_branch1` SELECT ...
-- read from branch 'branch1'
SELECT * FROM `t$branch_branch1`;
-- replace master branch with 'branch1'
CALL sys.fast_forward('default.T', 'branch1');
-- create a branch for streaming job (realtime)
CALL sys.create_branch('default.T', 'rt');
-- set primary key and bucket number for the branch
ALTER TABLE `T$branch_rt` SET (
'primary-key' = 'dt,name',
'bucket' = '2',
'changelog-producer' = 'lookup'
);
-- set fallback branch
ALTER TABLE T SET (
'scan.fallback-branch' = 'rt'
);
SELECT * FROM T;
Universal Format
Iceberg 的元数据放在文件目录中 (对应 Iceberg 的 HadoopCatalog),比如使用 Spark 的 DF 来读取:spark.read.format("iceberg").load("path"). Iceberg 视图只读,通过此方式写入可能会将表破坏掉。 对于主键表,Iceberg 视图只能读到最高层 (LSM Level) 的文件,你可以配置 'compaction.optimization-interval' 来控制数据的可见性。
Caching Catalog
Deletion Vectors
核心能力
新增 Aggregation 函数
通用文件索引
历史分区 Compact
Flink 引擎
Cluster
INSERT INTO my_table /*+ OPTIONS('sink.clustering.by-columns' = 'a,b') */
SELECT * FROM source;
分区 done 标
CREATE TABLE my_partitioned_table (
f0 INT,
f1 INT,
f2 INT,
...
dt STRING
) PARTITIONED BY (dt) WITH (
'partition.timestamp-formatter'='yyyyMMdd',
'partition.timestamp-pattern'='$dt',
'partition.time-interval'='1 d',
'partition.idle-time-to-done'='15 m'
);
首先,需要定义分区的时间解析和分区之间的时间间隔,以确定何时可以恰当地标记分区为完成。 其次,需要定义 idle 时间,这决定了分区在没有新数据的情况下,需要多长时间后才能标记为完成。 第三,默认情况下,分区标记完成将创建一个 _SUCCESS 文件,你也可以配置 'partition.mark-done-action' 定义具体 Action。
表 Clone
Procedures
Spark 引擎
动态参数
-- set paimon conf
SET spark.paimon.file.block-size=512M;
-- reset conf
RESET spark.paimon.file.block-size;
Bucketed Join
-- Enable bucketing optimization
SET spark.sql.sources.v2.bucketing.enabled=true;
-- Bucketed Join
SELECT * FROM t1 JOIN t2 on t1.id = t2.id
写入动态 Bucket 表
Spark SQL 可以写入动态 Bucket 表经过了优化,第一次写入会少一次数据的 Shuffle。 Spark SQL 可以写入跨分区更新的表了,但是整体上它的效率并不高。
其它进展
Paimon Web UI:正在发布中,欢迎大家试用 Paimon Python:预计近期会启动 0.1 的发布流程 Paimon Rust:研发中,预计 0.1 发布可读版本
关于 Paimon
微信公众号:Apache Paimon ,了解行业实践与最新动态 官网:https://paimon.apache.org/ 查询文档和关注项目
活动推荐