在 RisingWave 中实现 Sink 与上游物化视图解耦

文摘   科技   2024-06-06 14:51   北京  

|作者:温一鸣 RisingWave Labs SWE


1RisingWave 中的 Sink

在 RisingWave 中,我们除了可以通过 CREATE MATERIALIZED VIEW 语句创建实时物化视图以外,还可以通过 CREATE SINK语句[1],将物化视图中的变更实时导出到下游系统中,以便与外部生态进行集成。

2全局 Checkpoint 引起的稳定性问题

RisingWave 使用与 Apache Flink 相同的 Chandy-Lamport 算法[2] 对状态进行 Checkpoint。Barrier 会周期性地从最上游的 Source 注入,然后沿着整个 Streaming Graph,从 Source 流到下游的物化视图和 Sink,等 Barrier 流过所有节点以后,才能将两个 Barrier 之间的状态进行持久化,而只有持久化以后,才能查询到物化视图的最新结果。

物化视图是 RisingWave 中的内部概念,其数据处理速度以及稳定性比较可控,但是 Sink 需要与外部系统交互,其处理速度与稳定性很难保证。因此,当下游的外部系统出现抖动时,Sink 可能会需要比较长的时间才能把 Barrier 之间的数据处理完,而因为上述的全局 Checkpoint 机制,物化视图会受到 Sink 影响,其数据需要等 Sink 将数据处理完后才能进行 Checkpoint ,甚至在下游外部系统不可用的时候,整个系统将会无法完成 Checkpoint ,进入不可用的状态。

3在 Sink 中内置 Log Store 实现与上游解耦

上述情况中,最大的问题在于整个系统是做全局 Checkpoint ,尽管物化视图已经处理完最新的数据,但仍然需要等待 Sink 处理完数据以后才能完成 Checkpoint 并可见。要想解决这个问题,直观上看,只需要全局 Checkpoint 不等待 Sink 处理数据即可,也就是不等 Barrier 流过 Sink 算子就进行 Checkpoint。

然而,RisingWave 的 Sink 需要保证 At-Least-Once 语义,也就是说要确保数据不会丢,只会在故障恢复等少数边界情况可能输出重复数据。如果做全局 Checkpoint 时不等 Sink 处理完数据,则会破坏 At-Least-Once 的语义,造成数据丢失。以下图为例。

在上述例子中,假设 Source 中有两条分别为 Alice 和 Bob 的数据,而在 Sink 只把 Alice 的数据传给下游以后,系统就做了一次 Checkpoint 。当故障恢复发生时 ,系统会从 Checkpoint 的状态恢复,而由于在 Checkpoint 的时候 Sink 只处理了 Alice 的数据,但 Alice 以及 Bob 的数据在上游物化视图已经处理过,因此不会再将 Bob 的数据传给 Sink ,将会丢失数据。

要想解决数据丢失的问题,我们需要将 Sink 尚未处理的数据作为状态包含进 Checkpoint 中,在上述例子中,也就是说,我们需要将未处理的 Bob 的数据作为状态记录下来,在故障恢复以后可以继续处理 Bob 的数据。在 RisingWave 中,我们在 Sink 中引入了 Log Store 状态,在进行 Checkpoint 时,Sink 未处理的数据将会被写入到 Log Store 中,被包含到 Checkpoint 的状态中。

在故障恢复时,Sink 会先查询并处理 Log Store 中尚未被处理的数据,然后才会处理新写入的数据。引入 Log Store 以后,Checkpoint 不需要等待 Sink 处理完数据,实现了 Sink 与上游物化视图的解耦,提高了物化视图的稳定性以及新鲜度。

在引入了 Log Store 以后,不仅可以隔离外部系统处理速度慢对物化视图的影响,甚至在外部系统不可用时,依然可以做到与物化视图的隔离。当外部系统不可用导致 Sink 出现错误时,Sink 会恢复到 Log Store 中最新的 Offset,并在递增的重试延时后,不断尝试重新处理数据。

在 RisingWave 中,所有的状态都被抽象成表,Log Store 的状态也不例外。Log Store 的表可以简单视作包含了以下的列:

  • Epoch,Barrier 中的递增时间戳,可以唯一的标识一个 Barrier
  • Epoch 内的递增 Row Id,用来标记每条 Log 的顺序
  • Row Op,可以是 Insert, Delete 和 Update (Update 会被拆成 UpdateDelete 和 UpdateInsert)
  • Sink 中的所有列,用于存放实际的数据
Log Store 表的主键是 Epoch + Row Id。RisingWave 的底层状态存储使用自研的 KV 存储引擎 Hummock,通过调用其 Iter 方法,将数据以 Epoch 从老到新,Row Id 从小到大的数据依次扫出来,按顺序还原出其原来的数据。当 Sink 处理完新的数据后,会对 Log Store 在最新消费的 Offset 进行 Truncate,以对 Log Store 数据进行状态清理。Truncate 的过程中将 Offset 转化为 KV 存储引擎的 Range Delete 操作,以较为轻量的方式实现状态清理。
4用法
在 RisingWave 中,我们使用 sink_decouple 这一 Session Variable 来配置是否使用 Log Store。默认情况下, sink_decouple 的值为 Default,表示按照各种 Sink 的默认配置来决定是否使用 Log Store,不同 Sink 的最新默认配置请见文档[3]
默认情况下,在输入为 Append-only 的时候,以下类型的 Sink 会使用 Log Store 来实现 Sink 与上游解耦。
  • 消息队列类型的 Sink,如 Kafka, Pulsar, Kinesis, NATS 等
  • 使用 JDBC 实现的 Sink,如 MySQL, PostgreSQL, TiDB, CockroachDB 等
  • ClickHouse
如果需要显式地开启 Log Store,则可以执行 set sink_decouple=true 来指定之后创建的 Sink 都是带 Log Store 的。值得注意的是,这是一个 Session Variable,设置以后,只在当前的连接生效。
以上 Sink 之所以会默认开启 Log Store,是因为他们的数据是逐条处理的,在处理完一条或者一小批数据后,就能对 Log Store 进行 Truncate,在外部系统能够及时处理新数据的情况下,大部分数据不需要持久化到 Log Store 中去。而其他 Sink 在目前的实现中,只有在 Barrier 到达的时候才将数据持久化到外部系统中,因此绝大部分数据都需要持久化到 Log Store中,造成额外不必要的计算和存储成本,因此其他 Sink 需要手动设置 set sink_decouple=true 才会开启 Log Store。
5部分 Checkpoint
以上描述的是在 Sink 中引入 Log Store 状态来将 Sink 与上游物化视图解耦的实现。但同时,由于 RisingWave 中可以基于物化视图创建物化视图,类似的问题还可能出现在上下游的物化视图中,上游的物化视图会受下游的物化视图影响。要将上下游物化视图解耦,我们同样可以在上下游物化视图之间插入 Log Store,同时将全局 Checkpoint 的机制改成部分 Checkpoint (Partial Checkpoint),当上游物化视图完成时,可以不等待下游物化视图处理完,就进行持久化。由于 RisingWave 的状态存储采用的是 LSM Tree[4],其新写入的数据本身就会作为独立的 SST 文件加入到 LSM Tree 中,因此,我们不需要像 Sink 一样单独写一份 Log 数据,而是可以直接将物化视图新写入的 SST 作为 Log Store,下游物化视图在落后的时候可以直接从新写入的 SST 文件中读增量的 Log,避免额外写入数据,减少存储和计算成本。具体细节可以看 Partial Checkpoint 的 RFC[5]
目前 Partial Checkpoint 的功能正在开发中,最新开发进度可以在 Issue[6] 中查看。完成后, Sink 与物化视图的解耦可以直接基于部分 Checkpoint 实现,可以避免额外写入 Log Store 数据,减少存储和计算成本。

参考资料

[1]

CREATE SINK 语句: https://docs.risingwave.com/docs/current/sql-create-sink/

[2]

Chandy-Lamport 算法: https://en.wikipedia.org/wiki/Chandy–Lamport_algorithm

[3]

不同 Sink 的最新默认配置请见文档: https://docs.risingwave.com/docs/current/sql-create-sink/

[4]

LSM Tree: https://en.wikipedia.org/wiki/Log-structured_merge-tree

[5]

Partial Checkpoint 的 RFC: https://github.com/risingwavelabs/rfcs/pull/84

[6]

Issue: https://github.com/risingwavelabs/risingwave/issues/1404

关于 RisingWave 

RisingWave 是一款基于 Apache 2.0 协议开源的分布式流数据库,致力于为用户提供极致简单、高效的流数据处理与管理能力。RisingWave 采用存算分离架构,实现了高效的复杂查询、瞬时动态扩缩容以及快速故障恢复,并助力用户极大地简化流计算架构,轻松搭建稳定且高效的流计算应用。
RisingWave 始终聆听来自社区的声音,并积极回应用户的反馈。目前,RisingWave 已汇聚了 150+ 名开源贡献者和 3000+ 名社区成员。全球范围内,已有上百个 RisingWave 集群在生产环境中部署。


往期推荐

技术内幕

如何上手 RisingWave 👉 新手入门教程

RisingWave 中文用户文档上线,阅读更高效!

深入探索 RisingWave 中的高可用性与容错机制

深入理解 RisingWave 流处理引擎(三):触发机制

深入理解 RisingWave 流处理引擎(二):计算模型

深入理解 RisingWave 流处理引擎(一):总览

用户案例
视源股份(CVTE)IT 流计算应用历程
尘锋 SCRM 如何使用 RisingWave 实时打宽
RisingWave 在超百亿管理规模对冲基金公司中的应用
金融科技公司 Kaito 使用 RisingWave 实现实时智能化
龙腾出行如何通过 RisingWave 实现实时数据分析
RisingWave 助力乾象投资打造实时监控平台

RisingWave中文开源社区
RisingWave 是一款开源分布式 SQL 流数据库,致力于大幅降低流计算使用门槛与复杂度。RisingWave 已为全球超百家企业构建新一代流处理与分析平台。
 最新文章