RisingWave 2.0 版本正式发布!此次更新为重大升级,和 1.0 版本相比,在生态支持、数据库管理、可扩展性和 SQL 功能等方面有了显著提升。
RisingWave 2.0 版本新增了许多 SQL 命令和子句,可以安全存储外部数据库凭据并执行时间旅行查询;新增了多种 Source 和 Sink,并对现有 Connectors 进行了显著改进。RisingWave 现在还支持以 Parquet 格式进行数据的摄取和导出。
如果您对 2.0 版的完整更新信息感兴趣,请查看发布说明[1]。
1重点更新
以下是 RisingWave 2.0 版本的一些重点更新。
增强订阅和游标
RisingWave 2.0 版对订阅和游标进行了重大更改,改变了它们的默认行为和输出。
DECLARE cursor_name SUBSCRIPTION CURSOR FOR subscription_name
将不再包括历史数据。相反,它将只返回自声明时起的增量数据。要包含历史数据,请在 DECLARE
语句末尾添加SINCE begin()
。
此外,从游标获取数据后的输出操作已从 int
更新为 varchar
以提高可读性。现在的输出将显示为INSERT
、UPDATE_INSERT
、DELETE
或 UPDATE_DELETE
,而不是简单的 1
、2
、3
或 4
。
最后,SQL命令 SHOW CURSORS
和 SHOW SUBSCRIPTION CURSORS
现已可用,它们将返回当前会话中创建的所有游标和订阅。
更多详细信息,请参见:
订阅[2]
RisingWave Python SDK
除了能使用 sqlalchemy[3]、ibis[4]、psycopg2[5] 等与PostgreSQL 兼容的 Python 工具连接 RisingWave 外,Python SDK 目前已进入公开预览阶段。只需运行 pip install risingwave-py
即可安装此 SDK。该 SDK 专为开发者设计,旨在简化与 RisingWave 的交互,优化事件驱动应用程序的部署。
更多详细信息,请参见:
risingwave-py 0.0.1[6]
时间旅行
时间旅行查询为 RisingWave Premium 特有功能。
时间旅行查询支持在特定时间点访问历史数据,对于跟踪数据变更、审计和合规至关重要。同时,简化了错误恢复,以便快速恢复数据以应对意外修改。
在 RisingWave 中,只需在任何 SELECT
查询后添加FOR SYSTEM_TIME AS OF
子句即可轻松访问历史数据。此外,系统参数 time_travel_retention_ms
必须设置为非零值以启用时间旅行查询。该系统参数还指示历史数据在被删除之前应存储多长时间。
例如,以下 SQL 查询可检索一天前的数据。
SELECT * FROM table_name FOR SYSTEM_TIME AS OF NOW() - '1' DAY;
您也可以通过使用日期时间或时间戳格式的时间戳来检索特定日期的数据。
更多详细信息,请参见:
时间旅行查询[7]
隐私数据管理
隐私数据管理为 RisingWave Premium 特有功能。
为安全管理外部数据库凭据(如 MySQL 和 PostgreSQL ),您可以使用 CREATE SECRET
命令创建凭据。在连接外部数据库时,作为 Source 或 Sink,必须在 WITH
选项中明确指定数据库凭据。然而,在大型组织中,这可能会带来安全风险。
为解决此问题,管理员可以创建存储数据库凭据的隐私数据,并允许其他人在创建连接到外部接收器时引用该隐私数据。这样,直接访问数据库凭据的人就会减少。
更多详细信息,请参见:
管理隐私数据[8]
集合操作关键字 CORRESPONDING
现在可以在集合操作 UNION
、INTERSECT
和 EXCEPT
中使用 CORRESPONDING
关键字。通过使用 CORRESPONDING
关键字,各表的列将自动匹配。您还可以通过使用 CORRESPONDING BY col1_name[, col2_name, ...]
指定要匹配的列。
假设有以下两个表。
CREATE TABLE employees (
employees_id int,
first_name varchar,
last_name varchar,
last_update timestamp
);
CREATE TABLE customers (
customer_id integer,
store_id int,
first_name varchar,
last_name varchar,
email varchar,
last_update timestamp,
);
如果在不使用 CORRESPONDING
的情况下合并表,则从每个表中选择的列必须匹配。以下查询将返回员工和客户的 first_name
和 last_name
。
SELECT first_name, last_name
FROM employees
UNION ALL
SELECT first_name, last_name
FROM customers;
现在如果使用CORRESPONDING
,则每个表中的列不需要匹配。以下查询将返回员工和客户的 first_name
、last_name
和 last_update
。
SELECT *
FROM employees
UNION ALL CORRESPONDING
SELECT *
FROM customers;
新关键字简化了集合操作,节省了寻找每个表匹配列的麻烦。
更多详细信息,请参见:
CORRESPONDING
在集合操作中的使用[9]
兼容 Parquet 文件
现在您可以从 Parquet 文件摄取数据,并将数据以 Parquet 格式导出到 AWS S3 和 Google Cloud Storage。
可以通过使用函数 file_scan
从 AWS S3 批量读取 Parquet 文件。或者,如果要从 AWS S3 或 Google Cloud Storage 创建表或源,可以指定编码格式和匹配模式为 parquet
。
函数 file_scan
的语法如下。使用此函数时,第一个 Parquet 文件应包含 Schema。目录中的所有 Parquet 文件应具有相同的 Schema,以避免出现问题。
SELECT
col1,
col2,
col3
FROM file_scan(
file_format,
storage_type,
s3_region,
s3_access_key,
s3_secret_key,
file_location);
RisingWave 也支持向文件系统导出 Parquet 格式的数据。为此,match_pattern
和 file_type
参数现在可以分别指定为 *.parquet
和 parquet
。ENCODE
选项也可以指定为 PARQUET
。
对 Parquet 文件的支持进一步增强了 RisingWave 作为统一流处理和批处理系统的能力。
更多详细信息,请参见:
以 Parquet 格式导出数据[10]
MySQL 的自动表 Schema 映射
MySQL 的自动表 Schema 映射为 RisingWave Premium 特有功能。
除了在摄取来自 MySQL 表的 CDC 数据时自动进行 Schema 映射外,RisingWave 现在还能够注册自动 Schema 更改。每当对源 MySQL 表的 Schema 进行修改时,这些更改将立即反映在您的 RisingWave 表中。之前,表 Schema 的修改只能通过使用 ALTER TABLE
命令手动进行添加或删除列。现在,通过在创建源时将参数auto.schema.change
设置为 true
,这一过程可以实现自动化。
CREATE SOURCE mysql_source WITH (
connector = 'mysql-cdc',
hostname = 'localhost',
port = '3306',
username = 'username',
password = 'password',
database.name = 'db_name',
server.id = '5701',
auto.schema.change = 'true'
);
更多详细信息,请参见:
自动映射上游表 Schema[11] 自动更改 Schema[12]
新增 Sink
RisingWave 通过为下游系统新增 Sink 继续扩展其生态系统。您现在可以直接将数据导出到 MongoDB 和 Azure Blob。如果您对特定 Sink 感兴趣,请查看我们的集成页面,投票表示对特定 Sink 的兴趣,或请求在其可用时通知您。
MongoDB
MongoDB 是一种流行的开源 NoSQL 数据库,专为存储和管理大量非结构化或半结构化数据而构建。现在您可以使用 CREATE SINK
命令直接将数据从 RisingWave 导出到 MongoDB。支持追加类型和 upsert 类型的接收器。对于追加类型的接收器,不需要定义主键,而 upsert 类型的接收器必须定义主键。
CREATE SINK mongodb_sink FROM mv
WITH (
connector='mongodb',
type = 'upsert',
mongodb.url = 'mongodb://mongodb:27017/?replicaSet=rs0',
collection.name = 'collection1',
primary_key='id'
);
Azure Blob
Azure Blob Storage 是一种基于云的对象存储解决方案,旨在存储和管理大量非结构化数据。您现在可以使用 CREATE SINK
命令将数据从 RisingWave 导出到 Azure Blob。
CREATE SINK azure_sink FROM mv
WITH (
connector = 'azblob',
azblob.container_name = 'container_name',
azblob.path = 'sink/file/location',
azblob.credentials.account_name = 'account_name',
azblob.credentials.account_key = 'account_key',
azblob.endpoint_url = 'url',
type = 'append-only',
force_append_only = 'true'
);
更多详细信息,请参见:
将数据从 RisingWave 导出到 MongoDB[13] 将数据从 RisingWave 导出到 Azure Blob[14]
2版本对比
在了解 2.0 版的新功能后,让我们回顾一下自 1.0 版以来的显著升级与改进。
扩展外部 Connector
作为分布式 SQL 数据库,我们深知拥有一个广泛而灵活的生态系统的重要性。对于想把 RisingWave 添加到其技术栈中的用户来说,轻松连接任何上下游系统是必须具备的功能。
自 1.0 版以来,我们增强了可用的上游 Connector 列表,特别是对 Apache Kafka 兼容平台的支持。例如,您现在还可以从 AutoMQ、Confluent Cloud、WarpStream 等平台摄取数据。
同时,我们的下游 Connector 数量已经从 6 个增加到 25 个,包含 ClickHouse、Delta Lake、Elasticsearch、Redis、Snowflake 等。
此外,我们对现有 Connector 进行了持续改进,以确保用户体验和性能随着每次迭代而提升。内置的 PostgreSQL 和 MySQL Source Connector 经过重大优化,摄取 CDC 数据变得更便捷和可定制。同时,Kafka Source 和 Sink 是我们最受欢迎的 Connector 之一,我们也对其进行了改善,以优化其性能。
我们会继续简化将 RisingWave 集成到您现有数据栈的过程,并不断增加新的集成选项。
高级 SQL 功能
RisingWave 支持完全通过 SQL 来管理和查询数据,因此我们内置了高级 SQL 命令和功能。
在过去一年中,我们实施和升级了一系列新 SQL 命令和子句:支持 INCLUDE
子句,以摄取每个负载中包含的元数据;创建订阅和游标以便于从表和物化视图中检索数据;ALTER
命令可以全面控制所有数据库对象,更易于操作各类属性等。
针对各种种类型的数据,RisingWave 都提供了可用的函数。我们引入了处理 jsonb
类型和 array
类型数据的高级函数。而对于复杂的字符串搜索模式,可以参考我们的正则表达式函数集合。
用户定义函数(UDF)也扩展到更多语言,并支持嵌入式 UDF,您可以在 RisingWave 中定义它们。此外,借助嵌入式 UDF,您还可以创建聚合函数。
作为一个流处理平台,RisingWave 提供了丰富的工具来处理实时数据流。水印、时间窗口和时间过滤等核心功能从一开始就已实现并持续改进。
可扩展性和数据库管理
为了保证流处理的顺畅体验,您可以轻松配置 RisingWave 的部署方式和计算资源以适应需求。
RisingWave 默认使用自适应并行度,当新的节点或 CPU 添加到集群时,它会自动调整并行性,提供更流畅的扩展体验。您也可以选择禁用此功能。
在管理元数据方面,您可以选择使用某种 SQL 后端(SQLite、PostgreSQL 或 MySQL)作为元存储后端。从 etcd 转向 SQL 后端会更加稳定并提升性能。
您可以根据需要配置一组运行时参数,包括改变创建的源和接收器的行为、执行的连接类型、流式和批处理并行性等,将 RisingWave 配置为完全适合您的需求。
3发展方向
从 2.0 版开始,RisingWave 将扩展其能力,不再仅仅是一个流处理平台,而是一个流批一体解决方案,但处理实时数据将始终是 RisingWave 的重点。现在我们还将扩展对数据湖和数据仓库等传统数据平台的支持,用户将能够对这些数据源执行即时查询。
欲了解 RisingWave 后续发展详细信息,请参见展望 RisingWave 2.0: 提供流批一体功能的 SQL 数据库。
此外,从 2.0 版本开始,我们将提供 RisingWave Premium,如果您有兴趣咨询采购,请发送邮件至 sales@risingwave-labs.com 或者添加微信:risingwave_assistant。
4结论
以上只是 RisingWave 2.0 版本新增的部分功能,如果您想了解本次更新的完整信息,请查看更详细的发布说明[15]。
发布说明: https://docs.risingwave.com/release-notes/
[2]订阅: https://docs.risingwave.com/docs/current/subscription/
[3]sqlalchemy: https://pypi.org/project/sqlalchemy-risingwave/
[4]ibis: https://ibis-project.org/backends/risingwave
[5]psycopg2: https://pypi.org/project/psycopg2/
[6]risingwave-py 0.0.1: https://pypi.org/project/risingwave-py/#description
[7]时间旅行查询: https://docs.risingwave.com/docs/current/time-travel-queries/
[8]管理隐私数据: https://docs.risingwave.com/docs/current/manage-secrets/
[9]CORRESPONDING
在集合操作中的使用: https://docs.risingwave.com/docs/current/query-syntax-set-operations/#corresponding-in-set-operations
以 Parquet 格式导出数据: https://docs.risingwave.com/docs/current/data-delivery/#sink-data-in-parquet-format
[11]自动映射上游表 Schema: https://docs.risingwave.com/docs/current/ingest-from-mysql-cdc/#automatically-map-upstream-table-schema
[12]自动更改 Schema: https://docs.risingwave.com/docs/current/ingest-from-mysql-cdc/#automatically-change-schema
[13]将数据从 RisingWave 导出到 MongoDB: https://docs.risingwave.com/docs/current/sink-to-mongodb/
[14]将数据从 RisingWave 导出到 Azure Blob: https://docs.risingwave.com/docs/current/sink-to-azure-blob/
[15]发布说明: https://docs.risingwave.com/release-notes/
关于 RisingWave
往期推荐
技术内幕