摘要:本文撰写自某新能源企业的研发工程师 单葛尧 老师。本文详细介绍该新能源企业的大数据平台中 CDC 技术架构选型和 Flink CDC 的最佳实践。主要有以下几个内容:
1. CDC 方案选型
2. 方案落地实施
3. 作业平台管理
4. 后续规划
我们是一家专注于新能源动力电池制造的企业,致力于推动能源技术的发展与应用。作为一家具有多年行业经验的企业,我们在新能源领域积累了深厚的技术实力和市场认知,业务涵盖了新能源产业链的关键环节。从上游的装备制造到下游的应用解决方案,为客户提供了全方位的服务。
随着企业业务的不断发展,对数据实时性的要求日益提高。如何管理复杂的 Flink 任务,提高任务的开发效率,降低任务的运维难度,成为我们亟待解决的问题。接下来,我将介绍在我们团队在新能源制造业中大数据平台 CDC 技术架构的选型和 Flink CDC 的最佳实践。
CDC 方案选型
目前,我们团队规划的数据平台架构如上图所示,数据源这一层主要以 Oracle 为主,且跨网络的分散在各个基地,考虑到数据源支持和时效性,我们需要支持 Oracle CDC 数据采集,结合我们的业务场景,我们调研了实时采集的如下方案:
此外,考虑到 OGG 成本高,链路长,且文档资源少,而 Debezium 配置复杂,而且在后续大数据流量或存在复杂的计算逻辑时难以为继,最终我们选择了 Flink CDC 作为 Oracle 的实时采集的工具方案。
方案落地实施
我们团队在 Flink CDC 中的实施落地主要分为以下几步:
确保 Oracle 日志归档模式开启,开通 Oracle 整库补充日志,开通表级别的全列补充日志。
赋予操作用户各类权限,如表的操作权限,logminer 与 archive log 相关视图查询权限等。
开启跨基地网络权限,打通 Hadoop 集群对 Oracle 的访问权限。
通过平台编写 Flink SQL 任务,将 Oracle 的数据直接存入到 Doris 中,下图为线上作业效果图。
该配置开启了 Oracle 日志在线挖掘模式,如果没有这个设置,会导致生产环境默认策略读取 log 较慢,且默认策略会写入数据字典信息到 redo log 中导致日志量增加较多。但该配置也有缺点,不写入数据字典到 redo log 中,会导致无法处理 DDL 语句。
'debezium.log.mining.strategy'='online_catalog',
'debezium.log.mining.continuous.mine'='true'
我们在同步 Oracle11g 的数据时,遇到了报错
Caused by: io.debezium.DebeziumException: Supplemental logging not configured for table xxx. Use command: ALTER TABLE xxx ADD SUPPLEMENTAL LOG DATA (ALL) COLUMNS
该错误是因为对于 Oracle11 版本,Debezium 会默认把 tableIdCaseInsensitive 设置为 true, 导致表名被更新为小写,因此在 Oracle 中查询不到 这个表补全日志设置,导致误报这个 Supplemental logging not configured for table 错误”,我们只需要开启如下设置即可
'debezium.database.tablename.case.insensitive'='false'
我们的任务在运行一段时间后报错
Caused by: com.ververica.cdc.connectors.shaded.org.apache.kafka.connect.errors.DataException: file is not a valid field name
Flink Oracle CDC connector在采集分区表的时候存在问题,报错为
Caused by: io.debezium.DebeziumException: The db history topic or its content is fully or partially missing. Please check database history topic configuration and re-execute the snapshot
这是由于分区表未能被成功扫描到,我们进入到源码的类 OracleConnectionUtils 中可以看到
String queryTablesSql = "SELECT OWNER ,TABLE_NAME,TABLESPACE_NAME FROM ALL_TABLES WHERE TABLESPACE_NAME IS NOT NULL AND TABLESPACE_NAME NOT IN ('SYSTEM','SYSAUX') AND NESTED = 'NO' AND TABLE_NAME NOT IN (SELECT PARENT_TABLE_NAME FROM ALL_NESTED_TABLES)";
当然写入 Doris 的速度也可能存在慢的现象,特别是 StreamLoad 中进程 stopped 与返回 loadResult 结果之间时间较长时的问题,得益于 Apache Doris 社区的帮助,我们总结以下几点优化方案:
看BE的资源是不是满了,检查 IO、CPU、内存
换成 2pc 导⼊,⾮ 2pc 会等待 publish,2pc 导⼊ publish 是异步的
mow 尝试换成 mor 表
show proc '/transactions' 看看事务的状态,如果都是 committed,说明 publish 卡住了,如果 io 不⾼,可以增加 publish 的线程和超时
publish_version_timeout_second=60
publish_version_worker_count=16
增加 be.conf 的配置 ,适当提⾼下⾯的参数的配置
doris_scanner_thread_pool_thread_num=1024
webserver_num_workers=1024
fragment_pool_queue_size=4096
fragment_pool_thread_num_max=2048
doris_max_remote_scanner_thread_pool_thread_num=2048
作业平台管理
此外 Apache StreamPark 也给我们带来了很多惊喜,在开发 FlinkSQL 任务的过程中,借助 StreamPark,极大的提升了开发时的效率,我罗列了我们开发团队觉得非常优秀的点:
3.1 多版本 Flink 管理
在开发过程中,可能会因为需求Flink新版本的某项能力,或是某些组件对 Flink版本的强需求而引入多版本的 Flink,在 StreamPark 中,只需要配置一个 Flink 的路径并为其命名,即可在开发任务中轻松地选择我所需要的 Flink 版本。
3.2 对于Git的集成
3.3 强大的作业运维能力及告警机制
StreamPark 有着完善的作业操作记录,可以清晰的记录所有任务 Release、Start、Cancel 的记录,对于 yarn application 形式的任务,记录了每次运行的 appId,便于后续的 yarn 日志翻阅、运行状态的监测等等。一旦报错,StreamPark 会迅速发送告警,如果配置了重启机制,可以实现分钟级的重启效率。
3.4 Flink SQL 在线开发
StreamPark 的类 IDE 样式的开发窗口相当优越,可以帮助我们检查低级的语法错误,简单的轻松地便可以配置各类想要配置的参数,如分配 JM、TM 的资源,配置预备提交的 yarn 队列,它的作业依赖配置更是相当简易,提供了填写 pom 和上传 jar 包两种形式,再也无需将包傻傻地通过hdfs上传,极大地提升了我们的开发效率。
后续规划
后续团队考虑通过 Oracle 的机制, 将 redo log 异步传输到另外一台没有业务压力的 Oracle 实例上, 然后在另外一台机器上开启并发切分 scn range 事件,开启 LogMiner 线程并发的解析对应 scn range 事件,顺序的处理获取到的事件。
欢迎大家多多关注 Flink CDC,从钉钉用户交流群[1]、微信公众号[2]、Slack 频道[3]、邮件列表[4]加入 CDC 用户社区,以及在 Flink CDC GitHub 仓库[5]上参与代码贡献!
[1] “ Flink CDC 社区 ② 群”群的钉钉群号:80655011780
[2] ” Flink CDC 公众号“:
[3] https://flink.apache.org/what-is-flink/community/#slack
[4] https://flink.apache.org/what-is-flink/community/#mailing-lists
[5] https://github.com/apache/flink-cdc
活动推荐