RisingWave Streaming SQL 速查表

文摘   科技   2024-08-20 15:26   北京  

流式 SQL 是一种将声明式 SQL 的强大功能引入实时数据流的方法。使用流式 SQL 能在数据事件生成时立刻进行分析,从而即时获取有用信息。RisingWave Streaming SQL 是 RisingWave 处理流数据的 SQL 变体,且由于 RisingWave 与 PostgreSQL 兼容,许多语句与 PostgreSQL 中的语句相似,使得其应用更加方便。

为了帮助大家快速掌握 RisingWave Streaming SQL,我们准备了这篇文章供大家参考。

1核心概念

Source

您可以从 Source(数据源)中摄取实时数据。通常 Source 包括 Kafka 主题和 PostgreSQL 变更日志流。

RisingWave 中的表可以存储实时数据和历史数据

Sink

Sink 是导出数据的目的地。通常 Sink 包括 Kafka 主题和数据库表。

物化视图

RisingWave 以独特的方式利用物化视图,实现高效的管道编排和数据转换。RisingWave 中的物化视图在接收到新事件时会自动刷新并增量计算。

2数据摄取

您可以创建一个 Source 或表来摄取实时数据。对于 Source 而言,数据并不会持久化存储在 RisingWave 中;而对于表格来说,数据则会持久化存储在 RisingWave 中。对于 CDC 数据,需要使用表。

创建 MySQL Source

创建 MySQL Source 以从 MySQL 表中摄取 CDC 数据。

CREATE TABLE my_risingwave_table (
    id INT PRIMARY KEY,
    name TEXT,
    age INT
WITH (
    'connector' = 'mysql-cdc',
    'hostname' = '{hostname}',
    'port' = 3306,
    'username' = '{username}',
    'password' = '{password}',
    'database-name' = '{my_db}',
    'table-name' = '{my_table}'
);

创建 Debezium Avro Source

摄取序列化为 Debezium Avro 的流数据:

CREATE SOURCE ... WITH (..)
FORMAT DEBEZIUM ENCODE AVRO (
    'schema.registry.url' = '<http://your-schema-registry:8081>'
);

创建 Protobuf Source

假设在 twitter.schema 包(命名空间)中有一个名为 Event 的 Protobuf 消息,那么创建 Source 的 SQL 语句如下:

CREATE SOURCE ... WITH (..)
FORMAT PLAIN ENCODE PROTOBUF (
    'message' = 'twitter.schema.Event',
    'schema.location' = 'http://your_schema_host/schema_file_path'
);

推荐使用 Confluent Schema Registry 来管理 Protobuf Schema。

创建 Avro 格式的 Upsert Kafka Source

Upsert 是一种将墓碑记录(值为 null 的记录)视为 DELETE 事件的格式,通常用于 CDC 场景。

CREATE SOURCE ... WITH (..)
FORMAT UPSERT ENCODE AVRO (
    'schema.registry.url' = '<http://your-schema-registry:8081>'
);

向表中添加列

ALTER TABLE my_event_table ADD COLUMN new_field INT;

向表中批量插入值

INSERT INTO my_table (column1, column2, column3)
VALUES ('value1''value2''value3'),
       ('value4''value5''value6');
FLUSH;

其中 FLUSH 命令用于确保数据已提交。

3数据转换

您应在物化视图的定义中表达转换逻辑,以便享受物化视图的优势。如果您希望使用 RisingWave 实时转换数据,我们推荐创建物化视图。

创建物化视图

CREATE MATERIALIZED VIEW mv_avg_speed AS
SELECT
    SUM(distance) AS total_distance,
    SUM(durationAS total_duration,
    SUM(distance) / SUM(durationAS avg_speed
FROM taxi_trips;

按组取前 N 名

SELECT * FROM (
  SELECT *, ROW_NUMBER()
  OVER (PARTITION BY category ORDER BY sales DESC)
  AS row_num
  FROM shop_sales)
WHERE row_num <= N;

时间过滤器

SELECT *
FROM sales
WHERE sale_date > NOW() - INTERVAL '1 week';

时间 Join

SELECT transaction_id, product_id, quantity, sale_date, product_name
FROM sales
JOIN products FOR SYSTEM_TIME AS OF PROCTIME() ON product_id = id;

TUMBLE 时间窗口函数

TUMBLE 函数用于将时间序列数据窗口化为固定大小、非重叠的区间。例如:

CREATE MATERIALIZED VIEW hot_hashtags AS
SELECT hashtag, COUNT(*) AS hashtag_occurrences, window_start
FROM TUMBLE(
  twitter_stream, time_col, INTERVAL '10 MINUTES')
GROUP BY hashtag, window_start;

HOP 时间窗口函数

HOP 窗口允许指定重叠的时间区间。HOP 函数中指定了两个区间。第一个是窗口之间的间隔宽度,第二个是 HOP 窗口的窗口大小。

CREATE MATERIALIZED VIEW hop_windows AS
SELECT window_start, window_end, count(trip_id) AS cnt
FROM HOP(
  trips, trip_time, INTERVAL '2 MINUTES'INTERVAL '10 MINUTES')
GROUP BY window_start, window_end;

基于时间窗口的流式 Join

SELECT trip.window_start, trip.window_end, trip.distance, fare.fare_amount
FROM TUMBLE(trips, completed_at, INTERVAL '2 MINUTES'as trip
JOIN TUMBLE(fares, completed_at, INTERVAL '2 MINUTES'as fare
ON trip.trip_id = fare.trip_id AND trip.window_start = fare.window_start
ORDER BY trip.window_start ASC;

MV-on-MV

在 RisingWave 中,可以在现有的物化视图之上创建物化视图 (MV-on-MV),这使得无需额外的中间件即可编排流管道。此功能简化了构建实时数据仓库时常见的分层数据架构。

Watermark 和窗口关闭时触发 (EOWC)

以下示例每 5 分钟输出一次每分钟的订单总数。此优化允许物化视图保持追加语义,这可以提高效率,尤其是在下游系统(如 S3 Sink)偏好追加写入时。

CREATE SOURCE orders (
  product VARCHAR,
  price DOUBLE PRECISION,
  order_time TIMESTAMPTZ,
  WATERMARK FOR order_time AS order_time - INTERVAL '5 MINUTES'
);

CREATE MATERIALIZED VIEW orders_per_minute AS
SELECT window_start, COUNT(*) as orders_count
FROM TUMBLE(orders, order_time, INTERVAL '1 MINUTE')
GROUP BY window_start
EMIT ON WINDOW CLOSE;

JSONB

由于实时事件往往是未处理的、结构复杂且具有嵌套特性,JSONB 在流处理系统中被广泛使用。

4数据导出

创建 Avro 格式的 Upsert Kafka Sink

CREATE SINK IF NOT EXISTS orders_count_sink FROM orders_count
WITH (
  'connector' = 'kafka',
  'properties.bootstrap.server' = 'your_kafka_broker_url',
  'topic' = 'orders_count_topic',
  'primary_key' = 'order_id'
FORMAT UPSERT ENCODE AVRO;

创建 Upsert Postgres Sink

CREATE SINK
WITH (
  'connector' = 'jdbc',
  'jdbc.url' = 'jdbc:postgresql://postgres_url:5432/mydatabase?user=myuser&password=mypassword',
  'table.name' = 'orders_count',
  'type' = 'upsert',
  'primary_key' = 'order_id'
);

RisingWave 支持 MySQL Sink 以及通过 JDBC 的更多 Sink 连接器。

5总结

在传统的数据流处理语言过于复杂的情况下,流式 SQL 应运而生,它简化了任务,使用户能够从快速变化的数据中获得实时见解。

关于 RisingWave 

RisingWave 是一款基于 Apache 2.0 协议开源的分布式流数据库,致力于为用户提供极致简单、高效的流数据处理与管理能力。RisingWave 采用存算分离架构,实现了高效的复杂查询、瞬时动态扩缩容以及快速故障恢复,并助力用户极大地简化流计算架构,轻松搭建稳定且高效的流计算应用。
RisingWave 始终聆听来自社区的声音,并积极回应用户的反馈。目前,RisingWave 已汇聚了 150+ 名开源贡献者和 3000+ 名社区成员。全球范围内,已有上百个 RisingWave 集群在生产环境中部署。

往期推荐

技术内幕

如何上手 RisingWave 👉 新手入门教程

RisingWave 中文用户文档上线,阅读更高效!

深入探索 RisingWave 中的高可用性与容错机制

深入理解 RisingWave 流处理引擎(三):触发机制

深入理解 RisingWave 流处理引擎(二):计算模型

深入理解 RisingWave 流处理引擎(一):总览

用户案例
视源股份(CVTE)IT 流计算应用历程
尘锋 SCRM 如何使用 RisingWave 实时打宽
RisingWave 在超百亿管理规模对冲基金公司中的应用
金融科技公司 Kaito 使用 RisingWave 实现实时智能化
龙腾出行如何通过 RisingWave 实现实时数据分析
RisingWave 助力乾象投资打造实时监控平台

RisingWave中文开源社区
RisingWave 是一款开源分布式 SQL 流数据库,致力于大幅降低流计算使用门槛与复杂度。RisingWave 已为全球超百家企业构建新一代流处理与分析平台。
 最新文章