流式 SQL 是一种将声明式 SQL 的强大功能引入实时数据流的方法。使用流式 SQL 能在数据事件生成时立刻进行分析,从而即时获取有用信息。RisingWave Streaming SQL 是 RisingWave 处理流数据的 SQL 变体,且由于 RisingWave 与 PostgreSQL 兼容,许多语句与 PostgreSQL 中的语句相似,使得其应用更加方便。
为了帮助大家快速掌握 RisingWave Streaming SQL,我们准备了这篇文章供大家参考。
1核心概念
Source
表
RisingWave 中的表可以存储实时数据和历史数据。
Sink
Sink 是导出数据的目的地。通常 Sink 包括 Kafka 主题和数据库表。
物化视图
RisingWave 以独特的方式利用物化视图,实现高效的管道编排和数据转换。RisingWave 中的物化视图在接收到新事件时会自动刷新并增量计算。
2数据摄取
您可以创建一个 Source 或表来摄取实时数据。对于 Source 而言,数据并不会持久化存储在 RisingWave 中;而对于表格来说,数据则会持久化存储在 RisingWave 中。对于 CDC 数据,需要使用表。
创建 MySQL Source
创建 MySQL Source 以从 MySQL 表中摄取 CDC 数据。
CREATE TABLE my_risingwave_table (
id INT PRIMARY KEY,
name TEXT,
age INT
) WITH (
'connector' = 'mysql-cdc',
'hostname' = '{hostname}',
'port' = 3306,
'username' = '{username}',
'password' = '{password}',
'database-name' = '{my_db}',
'table-name' = '{my_table}'
);
创建 Debezium Avro Source
摄取序列化为 Debezium Avro 的流数据:
CREATE SOURCE ... WITH (..)
FORMAT DEBEZIUM ENCODE AVRO (
'schema.registry.url' = '<http://your-schema-registry:8081>'
);
创建 Protobuf Source
假设在 twitter.schema
包(命名空间)中有一个名为 Event
的 Protobuf 消息,那么创建 Source 的 SQL 语句如下:
CREATE SOURCE ... WITH (..)
FORMAT PLAIN ENCODE PROTOBUF (
'message' = 'twitter.schema.Event',
'schema.location' = 'http://your_schema_host/schema_file_path'
);
推荐使用 Confluent Schema Registry 来管理 Protobuf Schema。
创建 Avro 格式的 Upsert Kafka Source
Upsert 是一种将墓碑记录(值为 null 的记录)视为 DELETE 事件的格式,通常用于 CDC 场景。
CREATE SOURCE ... WITH (..)
FORMAT UPSERT ENCODE AVRO (
'schema.registry.url' = '<http://your-schema-registry:8081>'
);
向表中添加列
ALTER TABLE my_event_table ADD COLUMN new_field INT;
向表中批量插入值
INSERT INTO my_table (column1, column2, column3)
VALUES ('value1', 'value2', 'value3'),
('value4', 'value5', 'value6');
FLUSH;
其中 FLUSH 命令用于确保数据已提交。
3数据转换
您应在物化视图的定义中表达转换逻辑,以便享受物化视图的优势。如果您希望使用 RisingWave 实时转换数据,我们推荐创建物化视图。
创建物化视图
CREATE MATERIALIZED VIEW mv_avg_speed AS
SELECT
SUM(distance) AS total_distance,
SUM(duration) AS total_duration,
SUM(distance) / SUM(duration) AS avg_speed
FROM taxi_trips;
按组取前 N 名
SELECT * FROM (
SELECT *, ROW_NUMBER()
OVER (PARTITION BY category ORDER BY sales DESC)
AS row_num
FROM shop_sales)
WHERE row_num <= N;
时间过滤器
SELECT *
FROM sales
WHERE sale_date > NOW() - INTERVAL '1 week';
时间 Join
SELECT transaction_id, product_id, quantity, sale_date, product_name
FROM sales
JOIN products FOR SYSTEM_TIME AS OF PROCTIME() ON product_id = id;
TUMBLE 时间窗口函数
TUMBLE
函数用于将时间序列数据窗口化为固定大小、非重叠的区间。例如:
CREATE MATERIALIZED VIEW hot_hashtags AS
SELECT hashtag, COUNT(*) AS hashtag_occurrences, window_start
FROM TUMBLE(
twitter_stream, time_col, INTERVAL '10 MINUTES')
GROUP BY hashtag, window_start;
HOP 时间窗口函数
HOP 窗口允许指定重叠的时间区间。HOP 函数中指定了两个区间。第一个是窗口之间的间隔宽度,第二个是 HOP 窗口的窗口大小。
CREATE MATERIALIZED VIEW hop_windows AS
SELECT window_start, window_end, count(trip_id) AS cnt
FROM HOP(
trips, trip_time, INTERVAL '2 MINUTES', INTERVAL '10 MINUTES')
GROUP BY window_start, window_end;
基于时间窗口的流式 Join
SELECT trip.window_start, trip.window_end, trip.distance, fare.fare_amount
FROM TUMBLE(trips, completed_at, INTERVAL '2 MINUTES') as trip
JOIN TUMBLE(fares, completed_at, INTERVAL '2 MINUTES') as fare
ON trip.trip_id = fare.trip_id AND trip.window_start = fare.window_start
ORDER BY trip.window_start ASC;
MV-on-MV
在 RisingWave 中,可以在现有的物化视图之上创建物化视图 (MV-on-MV),这使得无需额外的中间件即可编排流管道。此功能简化了构建实时数据仓库时常见的分层数据架构。
Watermark 和窗口关闭时触发 (EOWC)
以下示例每 5 分钟输出一次每分钟的订单总数。此优化允许物化视图保持追加语义,这可以提高效率,尤其是在下游系统(如 S3 Sink)偏好追加写入时。
CREATE SOURCE orders (
product VARCHAR,
price DOUBLE PRECISION,
order_time TIMESTAMPTZ,
WATERMARK FOR order_time AS order_time - INTERVAL '5 MINUTES'
);
CREATE MATERIALIZED VIEW orders_per_minute AS
SELECT window_start, COUNT(*) as orders_count
FROM TUMBLE(orders, order_time, INTERVAL '1 MINUTE')
GROUP BY window_start
EMIT ON WINDOW CLOSE;
JSONB
由于实时事件往往是未处理的、结构复杂且具有嵌套特性,JSONB 在流处理系统中被广泛使用。
4数据导出
创建 Avro 格式的 Upsert Kafka Sink
CREATE SINK IF NOT EXISTS orders_count_sink FROM orders_count
WITH (
'connector' = 'kafka',
'properties.bootstrap.server' = 'your_kafka_broker_url',
'topic' = 'orders_count_topic',
'primary_key' = 'order_id'
) FORMAT UPSERT ENCODE AVRO;
创建 Upsert Postgres Sink
CREATE SINK
WITH (
'connector' = 'jdbc',
'jdbc.url' = 'jdbc:postgresql://postgres_url:5432/mydatabase?user=myuser&password=mypassword',
'table.name' = 'orders_count',
'type' = 'upsert',
'primary_key' = 'order_id'
);
RisingWave 支持 MySQL Sink 以及通过 JDBC 的更多 Sink 连接器。
5总结
在传统的数据流处理语言过于复杂的情况下,流式 SQL 应运而生,它简化了任务,使用户能够从快速变化的数据中获得实时见解。
关于 RisingWave
往期推荐
技术内幕