重大升级!RisingWave 2.0 发布!

文摘   科技   2024-09-26 15:58   湖北  

RisingWave 2.0 版本正式发布!此次更新为重大升级,和 1.0 版本相比,在生态支持、数据库管理、可扩展性和 SQL 功能等方面有了显著提升。

RisingWave 2.0 版本新增了许多 SQL 命令和子句,可以安全存储外部数据库凭据并执行时间旅行查询;新增了多种 Source 和 Sink,并对现有 Connectors 进行了显著改进。RisingWave 现在还支持以 Parquet 格式进行数据的摄取和导出。

如果您对 2.0 版的完整更新信息感兴趣,请查看发布说明[1]

1重点更新

以下是 RisingWave 2.0 版本的一些重点更新。

增强订阅和游标

RisingWave 2.0 版对订阅和游标进行了重大更改,改变了它们的默认行为和输出。

DECLARE cursor_name SUBSCRIPTION CURSOR FOR subscription_name 将不再包括历史数据。相反,它将只返回自声明时起的增量数据。要包含历史数据,请在 DECLARE 语句末尾添加SINCE begin()

此外,从游标获取数据后的输出操作已从 int 更新为 varchar 以提高可读性。现在的输出将显示为INSERTUPDATE_INSERTDELETE UPDATE_DELETE,而不是简单的 123 4

最后,SQL命令 SHOW CURSORS SHOW SUBSCRIPTION CURSORS 现已可用,它们将返回当前会话中创建的所有游标和订阅。

更多详细信息,请参见:

  • 订阅[2]

RisingWave Python SDK

除了能使用 sqlalchemy[3]ibis[4]psycopg2[5] 等与PostgreSQL 兼容的 Python 工具连接 RisingWave 外,Python SDK 目前已进入公开预览阶段。只需运行 pip install risingwave-py 即可安装此 SDK。该 SDK 专为开发者设计,旨在简化与 RisingWave 的交互,优化事件驱动应用程序的部署。

更多详细信息,请参见:

  • risingwave-py 0.0.1[6]

时间旅行

时间旅行查询为 RisingWave Premium 特有功能。

时间旅行查询支持在特定时间点访问历史数据,对于跟踪数据变更、审计和合规至关重要。同时,简化了错误恢复,以便快速恢复数据以应对意外修改。

在 RisingWave 中,只需在任何 SELECT 查询后添加FOR SYSTEM_TIME AS OF 子句即可轻松访问历史数据。此外,系统参数 time_travel_retention_ms须设置为非零值以启用时间旅行查询。该系统参数还指示历史数据在被删除之前应存储多长时间。

例如,以下 SQL 查询可检索一天前的数据。

SELECT * FROM table_name FOR SYSTEM_TIME AS OF NOW() - '1' DAY;

您也可以通过使用日期时间或时间戳格式的时间戳来检索特定日期的数据。

更多详细信息,请参见:

  • 时间旅行查询[7]

隐私数据管理

隐私数据管理为 RisingWave Premium 特有功能。

为安全管理外部数据库凭据(如 MySQL 和 PostgreSQL ),您可以使用 CREATE SECRET 命令创建凭据。在连接外部数据库时,作为 Source 或 Sink,必须在 WITH 选项中明确指定数据库凭据。然而,在大型组织中,这可能会带来安全风险。

为解决此问题,管理员可以创建存储数据库凭据的隐私数据,并允许其他人在创建连接到外部接收器时引用该隐私数据。这样,直接访问数据库凭据的人就会减少。

更多详细信息,请参见:

  • 管理隐私数[8]

集合操作关键字 CORRESPONDING

现在可以在集合操作 UNIONINTERSECT EXCEPT 中使用 CORRESPONDING 关键字。通过使用 CORRESPONDING 关键字,各表的列将自动匹配。您还可以通过使用 CORRESPONDING BY col1_name[, col2_name, ...]  指定要匹配的列

假设有以下两个表。

CREATE TABLE employees (
    employees_id int,
    first_name varchar,
    last_name varchar,
    last_update timestamp
);

CREATE TABLE customers (
    customer_id integer,
    store_id int,
    first_name varchar,
    last_name varchar,
    email varchar,
    last_update timestamp,
);

如果在不使用 CORRESPONDING 的情况下合并表,则从每个表中选择的列必须匹配。以下查询将返回员工和客户的 first_name last_name

SELECT first_name, last_name
FROM employees
UNION ALL
SELECT first_name, last_name
FROM customers;

现在如果使用CORRESPONDING,则每个表中的列不需要匹配。以下查询将返回员工和客户的 first_namelast_namelast_update

SELECT *
FROM employees
UNION ALL CORRESPONDING
SELECT *
FROM customers;

新关键字简化了集合操作,节省了寻找每个表匹配列的麻烦。

更多详细信息,请参见:

  • CORRESPONDING在集合操作中的使用[9]

兼容 Parquet 文件

现在您可以从 Parquet 文件摄取数据,并将数据以 Parquet 格式导出到 AWS S3 和 Google Cloud Storage。

可以通过使用函数 file_scan 从 AWS S3 批量读取 Parquet 文件。或者,如果要从 AWS S3 或 Google Cloud Storage 创建表或源,可以指定编码格式和匹配模式为 parquet

函数 file_scan 的语法如下。使用此函数时,第一个 Parquet 文件应包含 Schema。目录中的所有 Parquet 文件应具有相同的 Schema,以避免出现问题。

SELECT
  col1,
  col2,
  col3
FROM file_scan(
 file_format,
 storage_type,
 s3_region,
 s3_access_key,
 s3_secret_key,
 file_location);

RisingWave 也支持向文件系统导出 Parquet 格式的数据。为此,match_pattern file_type 参数现在可以分别指定为 *.parquet parquetENCODE 选项也可以指定为 PARQUET

对 Parquet 文件的支持进一步增强了 RisingWave 作为统一流处理和批处理系统的能力。

更多详细信息,请参见:

  • 以 Parquet 格式导出数据[10]

MySQL 的自动表 Schema 映射

MySQL 的自动表 Schema 映射为 RisingWave Premium 特有功能。

除了在摄取来自 MySQL 表的 CDC 数据时自动进行 Schema 映射外,RisingWave 现在还能够注册自动 Schema 更改。每当对源 MySQL 表的 Schema 进行修改时,这些更改将立即反映在您的 RisingWave 表中。之前,表 Schema 的修改只能通过使用 ALTER TABLE 命令手动进行添加或删除列。现在,通过在创建源时将参数auto.schema.change 设置为 true,这一过程可以实现自动化。

CREATE SOURCE mysql_source WITH (
 connector = 'mysql-cdc',
 hostname = 'localhost',
 port = '3306',
 username = 'username',
 password = 'password',
 database.name = 'db_name',
 server.id = '5701',
 auto.schema.change = 'true'
);

更多详细信息,请参见:

  • 自动映射上游表 Schema[11]
  • 自动更改 Schema[12]

新增 Sink

RisingWave 通过为下游系统新增 Sink 继续扩展其生态系统。您现在可以直接将数据导出到 MongoDB 和 Azure Blob。如果您对特定 Sink 感兴趣,请查看我们的集成页面,投票表示对特定 Sink 的兴趣,或请求在其可用时通知您。

MongoDB

MongoDB 是一种流行的开源 NoSQL 数据库,专为存储和管理大量非结构化或半结构化数据而构建。现在您可以使用 CREATE SINK 命令直接将数据从 RisingWave 导出到 MongoDB。支持追加类型和 upsert 类型的接收器。对于追加类型的接收器,不需要定义主键,而 upsert 类型的接收器必须定义主键。

CREATE SINK mongodb_sink FROM mv
WITH (
 connector='mongodb',
    type = 'upsert',
    mongodb.url = 'mongodb://mongodb:27017/?replicaSet=rs0',
    collection.name = 'collection1',
    primary_key='id'
);

Azure Blob

Azure Blob Storage 是一种基于云的对象存储解决方案,旨在存储和管理大量非结构化数据。您现在可以使用 CREATE SINK 命令将数据从 RisingWave 导出到 Azure Blob。

CREATE SINK azure_sink FROM mv
WITH (
 connector = 'azblob',
 azblob.container_name = 'container_name',
 azblob.path = 'sink/file/location',
 azblob.credentials.account_name = 'account_name',
 azblob.credentials.account_key = 'account_key',
 azblob.endpoint_url = 'url',
 type = 'append-only',
 force_append_only = 'true'
);

更多详细信息,请参见:

  • 将数据从 RisingWave 导出到 MongoDB[13]
  • 将数据从 RisingWave 导出到 Azure Blob[14]

2版本对比

在了解 2.0 版的新功能后,让我们回顾一下自 1.0 版以来的显著升级与改进。

扩展外部 Connector

作为分布式 SQL 数据库,我们深知拥有一个广泛而灵活的生态系统的重要性。对于想把 RisingWave 添加到其技术栈中的用户来说,轻松连接任何上下游系统是必须具备的功能。

自 1.0 版以来,我们增强了可用的上游 Connector 列表,特别是对 Apache Kafka 兼容平台的支持。例如,您现在还可以从 AutoMQ、Confluent Cloud、WarpStream 等平台摄取数据。

同时,我们的下游 Connector 数量已经从 6 个增加到 25 个,包含 ClickHouse、Delta Lake、Elasticsearch、Redis、Snowflake 等。

此外,我们对现有 Connector 进行了持续改进,以确保用户体验和性能随着每次迭代而提升。内置的 PostgreSQL 和 MySQL Source Connector 经过重大优化,摄取 CDC 数据变得更便捷和可定制。同时,Kafka Source 和 Sink 是我们最受欢迎的 Connector 之一,我们也对其进行了改善,以优化其性能。

我们会继续简化将 RisingWave 集成到您现有数据栈的过程,并不断增加新的集成选项。

高级 SQL 功能

RisingWave 支持完全通过 SQL 来管理和查询数据,因此我们内置了高级 SQL 命令和功能。

在过去一年中,我们实施和升级了一系列新 SQL 命令和子句:支持 INCLUDE 子句,以摄取每个负载中包含的元数据;创建订阅和游标以便于从表和物化视图中检索数据;ALTER 命令可以全面控制所有数据库对象,更易于操作各类属性等。

针对各种种类型的数据,RisingWave 都提供了可用的函数。我们引入了处理 jsonb 类型和 array 类型数据的高级函数。而对于复杂的字符串搜索模式,可以参考我们的正则表达式函数集合。

用户定义函数(UDF)也扩展到更多语言,并支持嵌入式 UDF,您可以在 RisingWave 中定义它们。此外,借助嵌入式 UDF,您还可以创建聚合函数。

作为一个流处理平台,RisingWave 提供了丰富的工具来处理实时数据流。水印、时间窗口和时间过滤等核心功能从一开始就已实现并持续改进。

可扩展性和数据库管理

为了保证流处理的顺畅体验,您可以轻松配置  RisingWave 的部署方式和计算资源以适应需求。

RisingWave 默认使用自适应并行度,当新的节点或 CPU 添加到集群时,它会自动调整并行性,提供更流畅的扩展体验。您也可以选择禁用此功能。

在管理元数据方面,您可以选择使用某种 SQL 后端(SQLite、PostgreSQL 或 MySQL)作为元存储后端。从 etcd 转向 SQL 后端会更加稳定并提升性能。

您可以根据需要配置一组运行时参数,包括改变创建的源和接收器的行为、执行的连接类型、流式和批处理并行性等,将 RisingWave 配置为完全适合您的需求。

3发展方向

从 2.0 版开始,RisingWave 将扩展其能力,不再仅仅是一个流处理平台,而是一个流批一体解决方案,但处理实时数据将始终是 RisingWave 的重点。现在我们还将扩展对数据湖和数据仓库等传统数据平台的支持,用户将能够对这些数据源执行即时查询。

欲了解 RisingWave 后续发展详细信息,请参见展望 RisingWave 2.0: 提供流批一体功能的 SQL 数据库

此外,从 2.0 版本开始,我们将提供 RisingWave Premium,如果您有兴趣咨询采购,请发送邮件至 sales@risingwave-labs.com 或者添加微信:risingwave_assistant。

4结论

以上只是 RisingWave 2.0 版本新增的部分功能,如果您想了解本次更新的完整信息,请查看更详细的发布说明[15]

参考资料
[1]

发布说明: https://docs.risingwave.com/release-notes/

[2]

订阅: https://docs.risingwave.com/docs/current/subscription/

[3]

sqlalchemy: https://pypi.org/project/sqlalchemy-risingwave/

[4]

ibis: https://ibis-project.org/backends/risingwave

[5]

psycopg2: https://pypi.org/project/psycopg2/

[6]

risingwave-py 0.0.1: https://pypi.org/project/risingwave-py/#description

[7]

时间旅行查询: https://docs.risingwave.com/docs/current/time-travel-queries/

[8]

管理隐私数据: https://docs.risingwave.com/docs/current/manage-secrets/

[9]

CORRESPONDING在集合操作中的使用: https://docs.risingwave.com/docs/current/query-syntax-set-operations/#corresponding-in-set-operations

[10]

以 Parquet 格式导出数据: https://docs.risingwave.com/docs/current/data-delivery/#sink-data-in-parquet-format

[11]

自动映射上游表 Schema: https://docs.risingwave.com/docs/current/ingest-from-mysql-cdc/#automatically-map-upstream-table-schema

[12]

自动更改 Schema: https://docs.risingwave.com/docs/current/ingest-from-mysql-cdc/#automatically-change-schema

[13]

将数据从 RisingWave 导出到 MongoDB: https://docs.risingwave.com/docs/current/sink-to-mongodb/

[14]

将数据从 RisingWave 导出到 Azure Blob: https://docs.risingwave.com/docs/current/sink-to-azure-blob/

[15]

发布说明: https://docs.risingwave.com/release-notes/

关于 RisingWave 

RisingWave 是一款基于 Apache 2.0 协议开源的分布式流数据库,致力于为用户提供极致简单、高效的流数据处理与管理能力。RisingWave 采用存算分离架构,实现了高效的复杂查询、瞬时动态扩缩容以及快速故障恢复,并助力用户极大地简化流计算架构,轻松搭建稳定且高效的流计算应用。
RisingWave 始终聆听来自社区的声音,并积极回应用户的反馈。目前,RisingWave 已汇聚了 150+ 名开源贡献者和 3000+ 名社区成员。全球范围内,已有上百个 RisingWave 集群在生产环境中部署。

往期推荐

技术内幕

如何上手 RisingWave 👉 新手入门教程

RisingWave 中文用户文档上线,阅读更高效!

深入探索 RisingWave 中的高可用性与容错机制

深入理解 RisingWave 流处理引擎(三):触发机制

深入理解 RisingWave 流处理引擎(二):计算模型

深入理解 RisingWave 流处理引擎(一):总览

用户案例
视源股份(CVTE)IT 流计算应用历程
尘锋 SCRM 如何使用 RisingWave 实时打宽
RisingWave 在超百亿管理规模对冲基金公司中的应用
金融科技公司 Kaito 使用 RisingWave 实现实时智能化
龙腾出行如何通过 RisingWave 实现实时数据分析
RisingWave 助力乾象投资打造实时监控平台

RisingWave中文开源社区
RisingWave 是一款开源分布式 SQL 流数据库,致力于大幅降低流计算使用门槛与复杂度。RisingWave 已为全球超百家企业构建新一代流处理与分析平台。
 最新文章