|作者:温一鸣 RisingWave Labs SWE
1RisingWave 中的 Sink
在 RisingWave 中,我们除了可以通过 CREATE MATERIALIZED VIEW
语句创建实时物化视图以外,还可以通过 CREATE SINK
语句[1],将物化视图中的变更实时导出到下游系统中,以便与外部生态进行集成。
2全局 Checkpoint 引起的稳定性问题
RisingWave 使用与 Apache Flink 相同的 Chandy-Lamport 算法[2] 对状态进行 Checkpoint。Barrier 会周期性地从最上游的 Source 注入,然后沿着整个 Streaming Graph,从 Source 流到下游的物化视图和 Sink,等 Barrier 流过所有节点以后,才能将两个 Barrier 之间的状态进行持久化,而只有持久化以后,才能查询到物化视图的最新结果。
物化视图是 RisingWave 中的内部概念,其数据处理速度以及稳定性比较可控,但是 Sink 需要与外部系统交互,其处理速度与稳定性很难保证。因此,当下游的外部系统出现抖动时,Sink 可能会需要比较长的时间才能把 Barrier 之间的数据处理完,而因为上述的全局 Checkpoint 机制,物化视图会受到 Sink 影响,其数据需要等 Sink 将数据处理完后才能进行 Checkpoint ,甚至在下游外部系统不可用的时候,整个系统将会无法完成 Checkpoint ,进入不可用的状态。
3在 Sink 中内置 Log Store 实现与上游解耦
上述情况中,最大的问题在于整个系统是做全局 Checkpoint ,尽管物化视图已经处理完最新的数据,但仍然需要等待 Sink 处理完数据以后才能完成 Checkpoint 并可见。要想解决这个问题,直观上看,只需要全局 Checkpoint 不等待 Sink 处理数据即可,也就是不等 Barrier 流过 Sink 算子就进行 Checkpoint。
然而,RisingWave 的 Sink 需要保证 At-Least-Once 语义,也就是说要确保数据不会丢,只会在故障恢复等少数边界情况可能输出重复数据。如果做全局 Checkpoint 时不等 Sink 处理完数据,则会破坏 At-Least-Once 的语义,造成数据丢失。以下图为例。
在上述例子中,假设 Source 中有两条分别为 Alice 和 Bob 的数据,而在 Sink 只把 Alice 的数据传给下游以后,系统就做了一次 Checkpoint 。当故障恢复发生时 ,系统会从 Checkpoint 的状态恢复,而由于在 Checkpoint 的时候 Sink 只处理了 Alice 的数据,但 Alice 以及 Bob 的数据在上游物化视图已经处理过,因此不会再将 Bob 的数据传给 Sink ,将会丢失数据。
要想解决数据丢失的问题,我们需要将 Sink 尚未处理的数据作为状态包含进 Checkpoint 中,在上述例子中,也就是说,我们需要将未处理的 Bob 的数据作为状态记录下来,在故障恢复以后可以继续处理 Bob 的数据。在 RisingWave 中,我们在 Sink 中引入了 Log Store 状态,在进行 Checkpoint 时,Sink 未处理的数据将会被写入到 Log Store 中,被包含到 Checkpoint 的状态中。
在故障恢复时,Sink 会先查询并处理 Log Store 中尚未被处理的数据,然后才会处理新写入的数据。引入 Log Store 以后,Checkpoint 不需要等待 Sink 处理完数据,实现了 Sink 与上游物化视图的解耦,提高了物化视图的稳定性以及新鲜度。
在引入了 Log Store 以后,不仅可以隔离外部系统处理速度慢对物化视图的影响,甚至在外部系统不可用时,依然可以做到与物化视图的隔离。当外部系统不可用导致 Sink 出现错误时,Sink 会恢复到 Log Store 中最新的 Offset,并在递增的重试延时后,不断尝试重新处理数据。
在 RisingWave 中,所有的状态都被抽象成表,Log Store 的状态也不例外。Log Store 的表可以简单视作包含了以下的列:
Epoch,Barrier 中的递增时间戳,可以唯一的标识一个 Barrier Epoch 内的递增 Row Id,用来标记每条 Log 的顺序 Row Op,可以是 Insert, Delete 和 Update (Update 会被拆成 UpdateDelete 和 UpdateInsert) Sink 中的所有列,用于存放实际的数据
sink_decouple
这一 Session Variable 来配置是否使用 Log Store。默认情况下, sink_decouple
的值为 Default,表示按照各种 Sink 的默认配置来决定是否使用 Log Store,不同 Sink 的最新默认配置请见文档[3]。消息队列类型的 Sink,如 Kafka, Pulsar, Kinesis, NATS 等 使用 JDBC 实现的 Sink,如 MySQL, PostgreSQL, TiDB, CockroachDB 等 ClickHouse
set sink_decouple=true
来指定之后创建的 Sink 都是带 Log Store 的。值得注意的是,这是一个 Session Variable,设置以后,只在当前的连接生效。set sink_decouple=true
才会开启 Log Store。参考资料
CREATE SINK
语句: https://docs.risingwave.com/docs/current/sql-create-sink/
Chandy-Lamport 算法: https://en.wikipedia.org/wiki/Chandy–Lamport_algorithm
[3]不同 Sink 的最新默认配置请见文档: https://docs.risingwave.com/docs/current/sql-create-sink/
[4]LSM Tree: https://en.wikipedia.org/wiki/Log-structured_merge-tree
[5]Partial Checkpoint 的 RFC: https://github.com/risingwavelabs/rfcs/pull/84
[6]Issue: https://github.com/risingwavelabs/risingwave/issues/1404
关于 RisingWave
往期推荐
技术内幕