ActionOMS | 从 OceanBase 到实时数仓:数据同步如何助力业务优化

科技   科技   2024-11-25 16:30   上海  

作者:郭奥门,爱可生研发工程师,负责数据迁移同步产品的开发。偶尔觉得自己能搬山,结果只会搬数据。

爱可生开源社区出品,原创内容未经授权不得随意使用,转载请联系小编并注明来源。

本文约 1800 字,预计阅读需要 6 分钟。


1背景

在当今数字化时代,实时数仓技术已广泛应用于众多企业,成为支持业务决策的关键因素。金融机构需实时监控风险,电商平台要动态推荐商品,制造业则依靠实时数据优化生产链。在这些场景中,及时获取数据库增量记录至关重要,其同步效率直接影响分析的实时性和精准度。

OceanBase 作为一款高性能分布式关系型数据库,以其强一致性和高吞吐能力,为企业核心业务系统提供了有力支撑。为有效捕获和同步 OceanBase 的增量数据,ActionOMS 数据同步工具提供了高效灵活的解决方案。ActionOMS 基于原生 OceanBase CDC 技术,通过 RPC 方式分布式拉取多个日志流的 Redo 日志,进行分布式事务组装和排序、数据解析、语句格式化等处理(基于多版本 schema 映射至正确的表和列,将 Redo 日志转换为逻辑日志格式),最终以事务为单位输出变更数据。此外,ActionOMS 支持 OceanBase 数据通过 Kafka、RocketMQ、DataHub 等多种数据管道同步至目标系统,助力企业快速构建高效、稳定的实时数仓。

接下来,我们将通过具体示例来演示如何运用 ActionOMS 实现将 OceanBase 的数据同步至 Kafka。

2示例

2.1 业务场景

银行的交易流水表存储着客户每日交易记录,但由于系统延迟、重复提交等问题,可能存在重复记录(如同一笔交易多次记录)。同时,银行需对去重后的数据进行汇总,以分析客户消费习惯和每日交易金额。

以下是包含重复数据的交易流水表示例:

2.2 数据源 - 数据通道

ActionOMS 支持将源端的表结构、全量数据和增量数据(包含 DML/DDL)同步至 Kafka,且同步到 Kafka 支持多种消息格式,如 Default、Canal、Dataworks(支持 2.0 版本)、SharePlex、DefaultExtendColumnType、Debezium、DebeziumFlatten、DebeziumSmt 和 Avro。

注:当项目意外中断进行断点续传时,Kafka 实例中可能会存在部分重复数据(最近一分钟内),因此下游系统需具备排重能力。

以下是通过 ActionOMS 搭建 OceanBase 同步到 Kafka 链路,将全增量数据同步至 Kafka 的示例:

创建链路时使用默认格式,最终同步到 Kafka 中的消息格式如下:

{
    "prevStruct"null,    // 变更前镜像
    "postStruct": {    // 变更后镜像
        "order_id""RTDW202411210006",    // 键值对,包含全量键值
        "user""u001",
        "product""p008",
        "num"800,
        "proctime""1732181459",
        "__pk_increment"8    //针对无主键表,会同步OB的隐藏主键
    },
    "allMetaData": {
        "checkpoint""1732168058",    // 当前同步位点,增量阶段表示同步到的时间位点(秒级时间戳),全量阶段使用主键键值对表示
        "dbType""OB_MYSQL",    // 数据库的类型
        "storeDataSequence"173216805935500000,
        "db""oms_mysql.rt_dw_test",    // 使用 SQL 语句进行变更的数据库的名称
        "timestamp""1732168059",    // 数据变更秒级时间戳,仅增量存在
        "uniqueId""1002_1001_7681208\u0000\u0000_5572734820_0",    // 增量中表示 STORE 传递下来的事务序号标识
        "transId""1002_7681208",    // 在 OceanBase 数据库中表示事务 ID
        "clusterId""33",    // 在 OceanBase 数据库中表示 clusterId
        "ddlType"null,    // DDL 具体类型
        "record_primary_key""__pk_increment",    // 主键列的名称。如果存在多列使用 \u0001 分割
        "source_identity""OB_MYSQL_ten_1_698lmn9kj7cw-1-0",    // 源端标识
        "record_primary_value""8",    // 主键值。如果存在多列使用 \u0001 分割
        "table_name""orders"    // 使用 SQL 语句进行变更的表的名称
    },
    "recordType""INSERT"    // 变更类型,INSERT/UPDATE/DELETE/HEARTBEAT/DDL
}

2.3 数据通道 - Flink - 数据仓库

Flink 订阅 Kafka 中的消息,依据主键或非空唯一键去重,并统计每日交易量和交易总金额,最终将数据存储至数据仓库。

以下是 Flink 从上述 Kafka 中取出消息,使用 Flink  ROW_NUMBER 方法去重后统计每日交易量和交易总金额,并将汇总后的数据存入 OceanBase 的示例:

1. 定义输入

根据 Kafka中 消息格式,定义输入表并连接 Kafka 服务。

CREATE TABLE kafka_input (
  prevStruct ROW<>,
  postStruct ROW<
    order_id STRING,
    `user` STRING,
    product STRING,
    num INT,
    proctime STRING
  >,
  allMetaData ROW<>,
  recordType STRING
WITH (
  'connector' = 'kafka',
  'topic' = 'rt_dw_test',    //kafka的topic
  'properties.bootstrap.servers' = 'ip:port',    //kafka连接信息
  'properties.group.id' = 'oms_test_1',    //Kafka source 的消费组 id
  'scan.startup.mode' = 'earliest-offset',    //Kafka consumer 的启动模式
  'format' = 'json',
  'json.fail-on-missing-field' = 'false',
  'json.ignore-parse-errors' = 'true',
  'json.timestamp-format.standard' = 'ISO-8601'
);

2. 定义输出

根据 OceanBase 中汇总表结构,定义输出表并通过 JDBC 连接 OceanBase 服务。

CREATE TABLE daily_order_summary (
  order_date DATE,
  total_orders BIGINT,
  total_amount DECIMAL(102),
  PRIMARY KEY (order_date) NOT ENFORCED
WITH (
  'connector' = 'jdbc',
  'table-name' = 'daily_order_summary',    //连接到 JDBC 表的名称
  'url' = 'jdbc:mysql://ip:port/rt_dw_test',    //JDBC 数据库 url
  'username' = 'test',    //账号
  'password' = 'test'    //密码
);

3. Flink 计算

根据 Flink ROW_NUMBER方法,指定 order_id 为去重键,对 proctime 转成时间格式并以此作为排序列,设置 WHERE rownum = 1 实现数据去重后,对交易日期分组统计每日交易量和交易总金额。

INSERT INTO daily_order_summary
SELECT 
    CAST(FROM_UNIXTIME(CAST(postStruct.proctime AS BIGINT)) AS DATEAS order_date,
    COUNT(DISTINCT postStruct.order_id) AS total_orders,
    SUM(postStruct.num) AS total_amount
FROM (
    SELECT *,
           ROW_NUMBER() OVER (PARTITION BY postStruct.order_id 
                              ORDER BY TO_TIMESTAMP(FROM_UNIXTIME(CAST(postStruct.proctime AS BIGINT))) DESCAS row_num
    FROM kafka_input
WHERE row_num = 1
GROUP BY CAST(FROM_UNIXTIME(CAST(postStruct.proctime AS BIGINT)) AS DATE);

2.4 验证

2.4.1 处理存量数据

Flink 运行上述 SQL 任务后,对存量汇总后的数据进行检查,发现不包含重复数据,符合预期。

2.4.2 处理增量数据

在 OceanBase 中新增订单数据和新增重复数据后:

数据实时同步至 Kafka,继而经过 Flink 计算汇总后的数据包含新增订单数据且不包含重复数据,符合预期。

2.5 总计

2.5.1 数据同步与处理流程

  • 首先,通过 ActionOMS 工具将 OceanBase 中的交易流水表数据(包含全量和增量数据)同步至 Kafka,同步过程中支持多种消息格式,且需注意断点续传时可能产生的重复数据问题。
  • 然后,Flink 订阅 Kafka 中的消息,利用 ROW_NUMBER 方法根据 order_id 去重,并对 proctime 进行处理后按照交易日期分组统计每日交易量和交易总金额。
  • 最后,将处理后的数据存储至数据仓库中,完成整个数据处理流程。

2.5.2 数据准确性验证

  • 针对存量数据,经过 Flink 处理后,成功去除重复数据,汇总结果准确反映了各日期的交易情况。
  • 对于增量数据,新增订单数据被正确纳入统计,重复数据未影响最终结果,进一步证明了整个数据同步和处理流程的有效性和准确性。

3总结

在实时数仓建设中,利用 ActionOMS 同步 OceanBase 数据是实现高效实时分析的关键路径。它赋予企业敏捷的数据处理能力,使其能快速响应业务变化。未来,随着数据同步技术持续演进,ActionOMS 有望进一步提升性能和功能,为企业在实时分析与智能决策领域提供更广泛、更有力的支持与保障。


什么是 ActionDB?

ActionDB 作为一款卓越的企业级分布式数据库,其设计核心依托于 OceanBase 的开源内核,辅以爱可生在开源数据库领域的深厚积累与技术专长,荣获原厂的正式授权及内核级技术支持。

ActionDB 集 OceanBase 的稳健性与高性能于一身,更进一步强化了与 MySQL 的兼容性,融合爱可生独有的安全特性与用户友好的运维管理工具,缔造了更高品质、更全面的数据库解决方案。

ActionDB 的 MySQL 8.0 协议全面兼容能力,辅以基于 MySQL binlog 的双向复制技术,为业务系统与下游数据平台提供了安全无虞、无缝迁移的完美方案,确保数据迁移的零风险与无感知。

更多了解:ActionDB 扩展 OB GIS 能力:新增 ST_PointN 函数

什么是 ActionOMS?

ActionOMS 基于 OMS 本身的优秀能力,并依托于爱可生公司在数据库及周边工具的多年开发经验、对数据迁移/同步过程的深刻理解与运维经验,推出的定制化版本。

ActionOMS 由 OceanBase 向爱可生进行了全部代码授权,可对 OMS 问题进行源码解释并修复,同时可以接受定制化开发的 OMS 版本。

更多了解:

参考资料

[1]

OceanBase: https://www.oceanbase.com/

[2]

达梦数据库: https://www.dameng.com/DM8.html

[3]

ActionDB: https://www.actionsky.com/actionDB


本文关键字:#OceanBase# #ActionDB# #ActionOMS# #Kafka #数据迁移#




微信扫描小程序码,进行在线咨询预约:



商业支持团队联系方式如下:

400-820-6580 / 13916131869 / 18930110869


✨ Github:https://github.com/actiontech/sqle

📚 文档:https://actiontech.github.io/sqle-docs/

💻 官网:https://opensource.actionsky.com/sqle/

👥 微信群:请添加小助手加入 ActionOpenSource

🔗 商业支持:https://www.actionsky.com/sqle

爱可生开源社区
爱可生开源社区,提供稳定的MySQL企业级开源工具及服务,每年1024开源一款优良组件,并持续运营维护。
 最新文章