公司业务情况介绍
大数据技术痛点以及选型
生产实践
问题排查分析
未来规划
01
海程邦达集团一直专注于供应链物流领域,通过打造优秀的国际化物流平台,为客户提供端到端一站式智慧型供应链物流服务。集团现有员工 2000 余人,年营业额逾 120 亿人民币,网络遍及全球 200 余个港口,在海内外有超 80 家分、子公司,助力中国企业与世界互联互通。
业务背景
随着公司规模的不断扩大和业务复杂性的增加,为了更好地实现资源优化和流程改进,公司运营与流程管理部需要实时监控公司的业务运转情况,以确保业务流程的稳定性和高效性。
公司运营与流程管理部负责监督公司各类业务流程的执行,包括海运、空运、铁运各个大区和事业部的订单量,大客户的订单量,航线订单量,关务、仓储、陆运各个操作站点的委托量,公司当天各个大区和事业部实际收入和支出情况等。通过对这些流程的监控和分析,公司能够识别出潜在的问题和瓶颈,提出改进措施和建议,以优化公司运营效率。
实时数仓架构:
当前系统要求直接从生产系统收集实时数据,但存在多个数据源需要进行关联查询,而帆软报表在处理多个数据源时展示不够友好,且无法再次聚合多个数据源。定时查询生产系统会给生产系统数据库带来压力,影响生产系统的稳定运行。因此,我们需要引入一个可以通过 Flink CDC [1] 技术实现流式处理的数仓,以解决实时数据处理的问题。这个数仓需要能够从多个数据源收集实时数据并在此基础上实现复杂的关联 SQL 查询、机器学习等操作,并且可以避免不定时查询生产系统,从而减轻生产系统的压力,保障生产系统的稳定运行。
02
海程邦达大数据团队建立以来一直以高效运维工具或平台来实现对人员的高效配置,优化重复劳动,手工作业。
在离线批数处理已能够支持集团基础驾驶舱和管理报表的情况下,集团运管部门提出了业务要实时统计订单数量,操作单量的需求,财务部门有现金流实时展示的需求,在这样的背景下,基于大数据的流批一体方案势在必行。
虽然大数据部门已经使用了 Apache Doris [2] 来实现湖仓一体的存储和计算,此前已在 Doris 社区发表湖仓一体建设的文章,但是有些问题有待解决,流式数据存储无法复用、中间层数据不可查、做不到实时聚合计算问题。
按照架构演进时间排序,近几年通用的架构解决方案如下:
hadoop架构
lambda架构
Batch层: 对离线数据进行处理,最后提供 view 服务给到业务; Speed层: 对实时增量数据进行处理,最后提供 view 服务给到业务;
Serving层: 响应用户的请求,实现离线和增量数据的聚合计算,并最终提供服务;
它通常使用流处理引擎实现,例如Apache Flink、Apache Storm、Apache Kinesis、 Apache Kafka,旨在处理大量数据流并提供快速可靠的查询访问结果。
优点是:单数据流处理框架
为此我们也调研了 Iceberg [4] ,它的快照功能一定程度上能够实现流批一体,但是它的问题是基于 kafka 做的实时表中间层不可查或者无法复用已经存在的表,对 kafka 有强依赖,需要利用 kafka 将中间结果写到 iceberg 表,增加了系统的复杂度和可维护性。
流式数仓(kappa架构的延续)
延续了 kappa 架构的特点,一套流处理架构,好处在与,底层 Paimon 的技术支撑使得数据在全链路可查,数仓分层架构得以复用,同时兼顾了离线和实时的处理能力,减少存储和计算的浪费
03
本方案采用 Flink Application On K8s 集群,Flink CDC 实时摄取业务系统关系型数据库数据,通过 StreamPark 任务平台提交 Flink + Paimon Streaming Data Warehouse 任务, 最后采用 Trino 引擎接入 Finereport 提供服务和开发人员的查询。Paimon 底层存储支持 S3 协议,因为公司大数据服务依赖于阿里云所以使用对象存储OSS作为数据文件系统。
主要采用组件版本如下:
flink-1.16.0-scala-2.12
paimon-flink-1.16-0.4-20230424.001927-40.jar
apache-streampark_2.12-2.0.0
kubernetes v1.18.3
环境构建
#解压
tar zxvf flink-1.16.0-scala-2.12.tar.gz
#修改 flink-conf 配置文件并启动集群
jobmanager.rpc.address: localhost
jobmanager.rpc.port: 6123
jobmanager.bind-host: localhost
jobmanager.memory.process.size: 4096m
taskmanager.bind-host: localhost
taskmanager.host: localhost
taskmanager.memory.process.size: 4096m
taskmanager.numberOfTaskSlots: 4
parallelism.default: 4
akka.ask.timeout: 100s
web.timeout: 1000000
#checkpoints&&savepoints
state.checkpoints.dir: file:///opt/flink/checkpoints
state.savepoints.dir: file:///opt/flink/savepoints
execution.checkpointing.interval: 2min
#当作业手动取消/暂停时,将会保留作业的 Checkpoint 状态信息
execution.checkpointing.externalized-checkpoint-retention: RETAIN_ON_CANCELLATION
state.backend: rocksdb
#已完成的CP保存个数
state.checkpoints.num-retained: 2000
state.backend.incremental: true
execution.checkpointing.checkpoints-after-tasks-finish.enabled: true
#OSS
fs.oss.endpoint: oss-cn-zhangjiakou-internal.aliyuncs.com
fs.oss.accessKeyId: xxxxxxxxxxxxxxxxxxxxxxx
fs.oss.accessKeySecret: xxxxxxxxxxxxxxxxxxxxxxx
fs.oss.impl: org.apache.hadoop.fs.aliyun.oss.AliyunOSSFileSystem
jobmanager.execution.failover-strategy: region
rest.port: 8081
rest.address: localhost
建议可以在本地添加 FLINK_HOME 方便在上 k8s 之前本地排查问题使用
vim /etc/profile
#FLINK
export FLINK_HOME=/data/src/flink-1.16.0-scala-2.12
export PATH=$PATH:$FLINK_HOME/bin
source /etc/profile
在 StreamPark 添加 Flink conf:
构建 Flink 1.16.0 基础镜像从 dockerhub拉取对应版本的镜像
#拉取镜像
docker pull flink:1.16.0-scala_2.12-java8
#打上 tag
docker tagflink:1.16.0-scala_2.12-java8 registry-vpc.cn-zhangjiakou.aliyuncs.com/xxxxx/flink:1.16.0-scala_2.12-java8
#push 到公司仓库
docker pushregistry-vpc.cn-zhangjiakou.aliyuncs.com/xxxxx/flink:1.16.0-scala_2.12-java8
创建 Dockerfile 文件 & target 目录将 flink-oss-fs-hadoop JAR包放置在该目录 Shaded Hadoop OSS file system jar 包下载地址:
https://repository.apache.org/snapshots/org/apache/paimon/paimon-oss/
.
├── Dockerfile
└── target
└── flink-oss-fs-hadoop-1.16.0.jar
touch Dockerfile
mkdir target
#vim Dockerfile
FROM registry-vpc.cn-zhangjiakou.aliyuncs.com/xxxxx/flink:1.16.0-scala_2.12-java8
RUN mkdir /opt/flink/plugins/oss-fs-hadoop
COPY target/flink-oss-fs-hadoop-1.16.0.jar /opt/flink/plugins/oss-fs-hadoop
build 基础镜像
docker build -t flink-table-store:v1.16.0 .
docker tag flink-table-store:v1.16.0 registry-vpc.cn-zhangjiakou.aliyuncs.com/xxxxx/flink-table-store:v1.16.0
docker push registry-vpc.cn-zhangjiakou.aliyuncs.com/xxxxx/flink-table-store:v1.16.0
接下来准备 Paimon jar 包,可以在 Apache Repository [8] 下载对应版本,需要注意的是要和 flink 大版本保持一致
使用 StreamPark 管理作业
前提条件:
Kubernetes 客户端连接配置 Kubernetes RBAC 配置 容器镜像仓库配置 (案例中使用的是阿里云镜像免费版) 创建挂载 checkpoint/savepoint 的 pvc 资源
Kubernetes 客户端连接配置:
将 k8s master节点~/.kube/config 配置直接拷贝到 StreamPark 服务器的目录,之后在 StreamPark 服务器执行以下命令显示 k8s 集群 running 代表权限和网络验证成功。
kubectl cluster-info
Kubernetes RBAC 配置,创建 streamx 命名空间:
kubectl create ns streamx
kubectl create secret docker-registry streamparksecret --docker-server=registry-vpc.cn-zhangjiakou.aliyuncs.com --docker-username=xxxxxx --docker-password=xxxxxx -n streamx```
容器镜像仓库配置:
创建 k8s secret 密钥用来拉取 ACR 中的镜像 streamparksecret 为密钥名称 自定义
kubectl create secret docker-registry streamparksecret --docker-server=registry-vpc.cn-zhangjiakou.aliyuncs.com --docker-username=xxxxxx --docker-password=xxxxxx -n streamx
OSS CSI 插件:
可以使用 OSS CSI 插件来帮助简化存储管理。您可以使用 csi 配置创建 pv,并且 pvc、pod 像往常一样定义,yaml 文件参考:
https://bondextest.oss-cn-zhangjiakou.aliyuncs.com/ossyaml.zip
- 创建具有所需 RBAC 权限的服务帐户,参考:
https://github.com/kubernetes-sigs/alibaba-cloud-csi-driver/blob/master/docs/oss.md
kubectl -f rbac.yaml
- 部署OSS CSI 插件
kubectl -f oss-plugin.yaml
kubectl -f checkpoints_pv.yaml kubectl -f savepoints_pv.yaml
- 创建 CP&SP 的 PVC
kubectl -f checkpoints_pvc.yaml kubectl -f savepoints_pvc.yaml
案例
统计海运空运实时委托单量
任务提交:初始化 Paimon catalog 配置
SET 'execution.runtime-mode' = 'streaming';
set 'table.exec.sink.upsert-materialize' = 'none';
SET 'sql-client.execution.result-mode' = 'tableau';
-- 创建并使用 FTS Catalog 底层存储方案采用阿里云oss
CREATE CATALOG `table_store` WITH (
'type' = 'paimon',
'warehouse' = 'oss://xxxxx/xxxxx' #自定义oss存储路径
);
USE CATALOG `table_store`;
一个任务同时抽取 postgres、mysql、sqlserver 三种数据库的表数据写入到 P aimon
信息如下:
Development Mode:Flink SQL
Execution Mode :kubernetes application
Flink Version :flink-1.16.0-scala-2.12
Kubernetes Namespace :streamx
Kubernetes ClusterId :(任务名自定义即可)
Flink Base Docker Image :registry-vpc.cn-zhangjiakou.aliyuncs.com/xxxxx/flink-table-store:v1.16.0 #上传到阿里云镜像仓库的基础镜像
Rest-Service Exposed Type:NodePort
#paimon基础依赖包:
paimon-flink-1.16-0.4-20230424.001927-40.jar
flink-shaded-hadoop-2-uber-2.8.3-10.0.jar
#flinkcdc依赖包下载地址:
https://github.com/ververica/flink-cdc-connectors/releases/tag/release-2.2.0
pod template:
apiVersion: v1
kind: Pod
metadata:
name: pod-template
spec:
containers:
- name: flink-main-container
volumeMounts:
- name: flink-checkpoints-csi-pvc
mountPath: /opt/flink/checkpoints
- name: flink-savepoints-csi-pvc
mountPath: /opt/flink/savepoints
volumes:
- name: flink-checkpoints-csi-pvc
persistentVolumeClaim:
claimName: flink-checkpoints-csi-pvc
- name: flink-savepoints-csi-pvc
persistentVolumeClaim:
claimName: flink-savepoints-csi-pvc
imagePullSecrets:
- name: streamparksecret
Flink sql:
-- postgre数据库 示例
CREATE TEMPORARY TABLE `shy_doc_hdworkdochd` (
`doccode` varchar(50) not null COMMENT '主键',
`businessmodel` varchar(450) COMMENT '业务模式',
`businesstype` varchar(450) COMMENT '业务性质',
`transporttype` varchar(50) COMMENT '运输类型',
......
`bookingguid` varchar(50) COMMENT '操作编号',
PRIMARY KEY (`doccode`) NOT ENFORCED
) WITH (
'connector' = 'postgres-cdc',
'hostname' = '数据库服务器IP地址',
'port' = '端口号',
'username' = '用户名',
'password' = '密码',
'database-name' = '数据库名',
'schema-name' = 'dev',
'decoding.plugin.name' = 'wal2json',,
'table-name' = 'doc_hdworkdochd',
'debezium.slot.name' = 'hdworkdochdslotname03'
);
CREATE TEMPORARY TABLE `shy_base_enterprise` (
`entguid` varchar(50) not null COMMENT '主键',
`entorgcode` varchar(450) COMMENT '客户编号',
`entnature` varchar(450) COMMENT '客户类型',
`entfullname` varchar(50) COMMENT '客户名称',
PRIMARY KEY (`entguid`,`entorgcode`) NOT ENFORCED
) WITH (
'connector' = 'postgres-cdc',
'hostname' = '数据库服务器IP地址',
'port' = '端口号',
'username' = '用户名',
'password' = '密码',
'database-name' = '数据库名',
'schema-name' = 'dev',
'decoding.plugin.name' = 'wal2json',
'table-name' = 'base_enterprise',
'debezium.snapshot.mode'='never', -- 增量同步(全量+增量忽略该属性)
'debezium.slot.name' = 'base_enterprise_slotname03'
);
-- 根据源表结构在paimon上ods层创建对应的目标表
CREATE TABLE IF NOT EXISTS ods.`ods_shy_jh_doc_hdworkdochd` (
`o_year` BIGINT NOT NULL COMMENT '分区字段',
`create_date` timestamp NOT NULL COMMENT '创建时间',
PRIMARY KEY (`o_year`, `doccode`) NOT ENFORCED
) PARTITIONED BY (`o_year`)
WITH (
'changelog-producer.compaction-interval' = '2m'
) LIKE `shy_doc_hdworkdochd` (EXCLUDING CONSTRAINTS EXCLUDING OPTIONS);
CREATE TABLE IF NOT EXISTS ods.`ods_shy_base_enterprise` (
`create_date` timestamp NOT NULL COMMENT '创建时间',
PRIMARY KEY (`entguid`,`entorgcode`) NOT ENFORCED
)
WITH (
'changelog-producer.compaction-interval' = '2m'
) LIKE `shy_base_enterprise` (EXCLUDING CONSTRAINTS EXCLUDING OPTIONS);
-- 设置作业名,执行作业任务将源表数据实时写入到paimon对应表中
SET 'pipeline.name' = 'ods_doc_hdworkdochd';
INSERT INTO
ods.`ods_shy_jh_doc_hdworkdochd`
SELECT
*
,YEAR(`docdate`) AS `o_year`
,TO_TIMESTAMP(CONVERT_TZ(cast(CURRENT_TIMESTAMP as varchar), 'UTC', 'Asia/Shanghai')) AS `create_date`
FROM
`shy_doc_hdworkdochd` where `docdate` is not null and `docdate` > '2023-01-01';
SET 'pipeline.name' = 'ods_shy_base_enterprise';
INSERT INTO
ods.`ods_shy_base_enterprise`
SELECT
*
,TO_TIMESTAMP(CONVERT_TZ(cast(CURRENT_TIMESTAMP as varchar), 'UTC', 'Asia/Shanghai')) AS `create_date`
FROM
`shy_base_enterprise` where entorgcode is not null and entorgcode <> '';
-- mysql数据库 示例
CREATE TEMPORARY TABLE `doc_order` (
`id` BIGINT NOT NULL COMMENT '主键',
`order_no` varchar(50) NOT NULL COMMENT '订单号',
`business_no` varchar(50) COMMENT 'OMS服务号',
......
`is_deleted` int COMMENT '是否作废',
PRIMARY KEY (`id`) NOT ENFORCED
) WITH (
'connector' = 'mysql-cdc',
'hostname' = '数据库服务器地址',
'port' = '端口号',
'username' = '用户名',
'password' = '密码',
'database-name' = '库名',
'table-name' = 'doc_order'
);
-- 根据源表结构在paimon上ods层创建对应的目标表
CREATE TABLE IF NOT EXISTS ods.`ods_bondexsea_doc_order` (
`o_year` BIGINT NOT NULL COMMENT '分区字段',
`create_date` timestamp NOT NULL COMMENT '创建时间',
PRIMARY KEY (`o_year`, `id`) NOT ENFORCED
) PARTITIONED BY (`o_year`)
WITH (
'changelog-producer.compaction-interval' = '2m'
) LIKE `doc_order` (EXCLUDING CONSTRAINTS EXCLUDING OPTIONS);
-- 设置作业名,执行作业任务将源表数据实时写入到paimon对应表中
SET 'pipeline.name' = 'ods_bondexsea_doc_order';
INSERT INTO
ods.`ods_bondexsea_doc_order`
SELECT
*
,YEAR(`gmt_create`) AS `o_year`
,TO_TIMESTAMP(CONVERT_TZ(cast(CURRENT_TIMESTAMP as varchar), 'UTC', 'Asia/Shanghai')) AS `create_date`
FROM `doc_order` where gmt_create > '2023-01-01';
-- sqlserver数据库 示例
CREATE TEMPORARY TABLE `OrderHAWB` (
`HBLIndex` varchar(50) NOT NULL COMMENT '主键',
`CustomerNo` varchar(50) COMMENT '客户编号',
......
`CreateOPDate` timestamp COMMENT '制单日期',
PRIMARY KEY (`HBLIndex`) NOT ENFORCED
) WITH (
'connector' = 'sqlserver-cdc',
'hostname' = '数据库服务器地址',
'port' = '端口号',
'username' = '用户名',
'password' = '密码',
'database-name' = '数据库名',
'schema-name' = 'dbo',
-- 'debezium.snapshot.mode' = 'initial' -- 全量增量都抽取
'scan.startup.mode' = 'latest-offset',-- 只抽取增量数据
'table-name' = 'OrderHAWB'
);
-- 根据源表结构在paimon上ods层创建对应的目标表
CREATE TABLE IF NOT EXISTS ods.`ods_airsea_airfreight_orderhawb` (
`o_year` BIGINT NOT NULL COMMENT '分区字段',
`create_date` timestamp NOT NULL COMMENT '创建时间',
PRIMARY KEY (`o_year`, `HBLIndex`) NOT ENFORCED
) PARTITIONED BY (`o_year`)
WITH (
'changelog-producer.compaction-interval' = '2m'
) LIKE `OrderHAWB` (EXCLUDING CONSTRAINTS EXCLUDING OPTIONS);
-- 设置作业名,执行作业任务将源表数据实时写入到paimon对应表中
SET 'pipeline.name' = 'ods_airsea_airfreight_orderhawb';
INSERT INTO
ods.`ods_airsea_airfreight_orderhawb`
SELECT
RTRIM(`HBLIndex`) as `HBLIndex`
......
,`CreateOPDate`
,YEAR(`CreateOPDate`) AS `o_year`
,TO_TIMESTAMP(CONVERT_TZ(cast(CURRENT_TIMESTAMP as varchar), 'UTC', 'Asia/Shanghai')) AS `create_date`
FROM `OrderHAWB` where CreateOPDate > '2023-01-01';
业务表数据实时写入 Paimon ods 表效果如下:
dim 维表我们这边直接拿的 doris 里面处理好的维表来使用,维表更新频率低,所以没有在 Paimon 里面进行二次开发。
-- 在paimon-dwd层创建宽表
CREATE TABLE IF NOT EXISTS dwd.`dwd_business_order` (
`reference_no` varchar(50) NOT NULL COMMENT '委托单号主键',
`bondex_shy_flag` varchar(8) NOT NULL COMMENT '区分',
`is_server_item` int NOT NULL COMMENT '是否已经关联订单',
`order_type_name` varchar(50) NOT NULL COMMENT '业务分类',
`consignor_date` DATE COMMENT '统计日期',
`consignor_code` varchar(50) COMMENT '客户编号',
`consignor_name` varchar(160) COMMENT '客户名称',
`sales_code` varchar(32) NOT NULL COMMENT '销售编号',
`sales_name` varchar(200) NOT NULL COMMENT '销售名称',
`delivery_center_op_id` varchar(32) NOT NULL COMMENT '交付编号',
`delivery_center_op_name` varchar(200) NOT NULL COMMENT '交付名称',
`pol_code` varchar(100) NOT NULL COMMENT '起运港代码',
`pot_code` varchar(100) NOT NULL COMMENT '中转港代码',
`port_of_dest_code` varchar(100) NOT NULL COMMENT '目的港代码',
`is_delete` int not NULL COMMENT '是否作废',
`order_status` varchar(8) NOT NULL COMMENT '订单状态',
`count_order` BIGINT not NULL COMMENT '订单数',
`o_year` BIGINT NOT NULL COMMENT '分区字段',
`create_date` timestamp NOT NULL COMMENT '创建时间',
PRIMARY KEY (`o_year`,`reference_no`,`bondex_shy_flag`) NOT ENFORCED
) PARTITIONED BY (`o_year`)
WITH (
-- 每个 partition 下设置 2 个 bucket
'bucket' = '2',
'changelog-producer' = 'full-compaction',
'snapshot.time-retained' = '2h',
'changelog-producer.compaction-interval' = '2m'
);
-- 设置作业名,将ods层的相关业务表合并写入到dwd层
SET 'pipeline.name' = 'dwd_business_order';
INSERT INTO
dwd.`dwd_business_order`
SELECT
o.doccode,
......,
YEAR (o.docdate) AS o_year
,TO_TIMESTAMP(CONVERT_TZ(cast(CURRENT_TIMESTAMP as varchar), 'UTC', 'Asia/Shanghai')) AS `create_date`
FROM
ods.ods_shy_jh_doc_hdworkdochd o
INNER JOIN ods.ods_shy_base_enterprise en ON o.businessguid = en.entguid
LEFT JOIN dim.dim_hhl_user_code sales ON o.salesguid = sales.USER_GUID
LEFT JOIN dim.dim_hhl_user_code op ON o.bookingguid = op.USER_GUID
UNION ALL
SELECT
business_no,
......,
YEAR ( gmt_create ) AS o_year
,TO_TIMESTAMP(CONVERT_TZ(cast(CURRENT_TIMESTAMP as varchar), 'UTC', 'Asia/Shanghai')) AS `create_date`
FROM
ods.ods_bondexsea_doc_order
UNION ALL
SELECT
HBLIndex,
......,
YEAR ( CreateOPDate ) AS o_year
,TO_TIMESTAMP(CONVERT_TZ(cast(CURRENT_TIMESTAMP as varchar), 'UTC', 'Asia/Shanghai')) AS `create_date`
FROM
ods.`ods_airsea_airfreight_orderhawb`
;
3. 将dwd层数据轻度聚合到dwm层中,将相关数据写入dwm.`dwm_business_order_count` 表中,该表数据会根据主键对聚合字段做 sum,sum_orderCount 字段就是聚合结果,物理删除的数据 sum 时 paimon 会自动处理。
-- 创建dwm层轻度汇总表,根据日期、销售、操作、业务类别、客户、起运港、目的港汇总单量
CREATE TABLE IF NOT EXISTS dwm.`dwm_business_order_count` (
`l_year` BIGINT NOT NULL COMMENT '统计年',
`l_month` BIGINT NOT NULL COMMENT '统计月',
`l_date` DATE NOT NULL COMMENT '统计日期',
`bondex_shy_flag` varchar(8) NOT NULL COMMENT '区分',
`order_type_name` varchar(50) NOT NULL COMMENT '业务分类',
`is_server_item` int NOT NULL COMMENT '是否已经关联订单',
`customer_code` varchar(50) NOT NULL COMMENT '客户编号',
`sales_code` varchar(50) NOT NULL COMMENT '销售编号',
`delivery_center_op_id` varchar(50) NOT NULL COMMENT '交付编号',
`pol_code` varchar(100) NOT NULL COMMENT '起运港代码',
`pot_code` varchar(100) NOT NULL COMMENT '中转港代码',
`port_of_dest_code` varchar(100) NOT NULL COMMENT '目的港代码',
`customer_name` varchar(200) NOT NULL COMMENT '客户名称',
`sales_name` varchar(200) NOT NULL COMMENT '销售名称',
`delivery_center_op_name` varchar(200) NOT NULL COMMENT '交付名称',
`sum_orderCount` BIGINT NOT NULL COMMENT '订单数',
`create_date` timestamp NOT NULL COMMENT '创建时间',
PRIMARY KEY (`l_year`, `l_month`,`l_date`,`order_type_name`,`bondex_shy_flag`,`is_server_item`,`customer_code`,`sales_code`,`delivery_center_op_id`,`pol_code`,`pot_code`,`port_of_dest_code`) NOT ENFORCED
) WITH (
'changelog-producer' = 'full-compaction',
'changelog-producer.compaction-interval' = '2m',
'merge-engine' = 'aggregation', -- 使用 aggregation 聚合计算 sum
'fields.sum_orderCount.aggregate-function' = 'sum',
'fields.create_date.ignore-retract'='true',
'fields.sales_name.ignore-retract'='true',
'fields.customer_name.ignore-retract'='true',
'snapshot.time-retained' = '2h',
'fields.delivery_center_op_name.ignore-retract'='true'
);
-- 设置作业名
SET 'pipeline.name' = 'dwm_business_order_count';
INSERT INTO
dwm.`dwm_business_order_count`
SELECT
YEAR(o.`consignor_date`) AS `l_year`
,MONTH(o.`consignor_date`) AS `l_month`
......,
,TO_TIMESTAMP(CONVERT_TZ(cast(CURRENT_TIMESTAMP as varchar), 'UTC', 'Asia/Shanghai')) AS create_date
FROM
dwd.`dwd_business_order` o
;
-- 创建根据操作人、业务类型聚合当天的单量
CREATE TABLE IF NOT EXISTS dws.`dws_business_order_count_op` (
`l_year` BIGINT NOT NULL COMMENT '统计年',
`l_month` BIGINT NOT NULL COMMENT '统计月',
`l_date` DATE NOT NULL COMMENT '统计日期',
`order_type_name` varchar(50) NOT NULL COMMENT '业务分类',
`delivery_center_op_id` varchar(50) NOT NULL COMMENT '交付编号',
`delivery_center_op_name` varchar(200) NOT NULL COMMENT '交付名称',
`sum_orderCount` BIGINT NOT NULL COMMENT '订单数',
`create_date` timestamp NOT NULL COMMENT '创建时间',
PRIMARY KEY (`l_year`, `l_month`,`l_date`,`order_type_name`,`delivery_center_op_id`) NOT ENFORCED
) WITH (
'merge-engine' = 'aggregation', -- 使用 aggregation 聚合计算 sum
'fields.sum_orderCount.aggregate-function' = 'sum',
'fields.create_date.ignore-retract'='true',
'snapshot.time-retained' = '2h',
'fields.delivery_center_op_name.ignore-retract'='true'
);
-- 设置作业名
SET 'pipeline.name' = 'dws_business_order_count_op';
INSERT INTO
dws.`dws_business_order_count_op`
SELECT
o.`l_year`
,o.`l_month`
,o.`l_date`
,o.`order_type_name`
,o.`delivery_center_op_id`
,o.`delivery_center_op_name`
,o.`sum_orderCount`
,TO_TIMESTAMP(CONVERT_TZ(cast(CURRENT_TIMESTAMP as varchar), 'UTC', 'Asia/Shanghai')) AS create_date
FROM
dwm.`dwm_business_order_count` o
;
总体数据流转示例
paimon-ods:
paimon-dwd:
paimon-dwm:
paimon-dws:
04
1. 聚合数据计算不准
ADS 层聚合表采用 agg sum 会出现 dwd 数据流不产生 update_before,产生错误数据流 update_after 比如上游源表 update 10 到 30 dwd 层数据会变更为 30,ads 聚合层数据也会变更为 30,但是现在变为了 append 数据变成了 10+30=40 的错误数据。
解决办法:
By specifying 'changelog-producer' = 'full-compaction', Table Store will compare the results between full compactions and produce the differences as changelog. The latency of changelog is affected by the frequency of full compactions.
By specifying changelog-producer.compaction-interval table property (default value 30min), users can define the maximum interval between two full compactions to ensure latency. This table property does not affect normal compactions and they may still be performed once in a while by writers to reduce reader costs.
这样能解决上述问题。但是随之而来出现了新的问题。默认 changelog-producer.compaction-interval 是 30min,意味着 上游的改动到 ads 查询要间隔 30min,生产过程中发现将压缩间隔时间改成 1min 或者 2 分钟的情况下,又会出现上述 ADS 层聚合数据不准的情况。
需要在写入 Flink Table Store 时需要配置 table.exec.sink.upsert-materialize= none,避免产生 Upsert 流,以保证 Flink Table Store 中能够保存完整的 changelog,为后续的流读操作做准备。
2. 相同 sequence.field 导致 dwd 明细宽表无法收到 update 数据更新
mysql cdc 采集数据到 paimon 表
说明:
在 MySQL 源端执行 update 数据修改成功后,dwd_orders 表数据能同步成功
解决:
3. Aggregate function 'last_non_null_value' does not support retraction
报错:
Caused by: java.lang.UnsupportedOperationException: Aggregate function 'last_non_null_value' does not support retraction, If you allow this function to ignore retraction messages, you can configure 'fields.${field_name}.ignore-retract'='true'.
可以在官方文档找到解释:
Only sum supports retraction (UPDATE_BEFORE and DELETE), others aggregate functions do not support retraction.
可以理解为:除了 SUM 函数,其他的 Agg 函数都不支持 Retraction,为了避免接收到 DELETE 和 UPDATEBEFORE 消息报错,需要通过给指定字段配 'fields.${field_name}.ignore-retract'='true' 忽略,解决这个报错
WITH (
'merge-engine' = 'aggregation', -- 使用 aggregation 聚合计算 sum
'fields.sum_orderCount.aggregate-function' = 'sum',
'fields.create_date.ignore-retract'='true' #create_date 字段
);
4. paimon任务中断失败
任务异常中断 导致pod挂掉,查看loki日志显示akka.pattern.AskTimeoutException: Ask timed out on
java.util.concurrent.TimeoutException: Invocation of [RemoteRpcInvocation(JobMasterGateway.updateTaskExecutionState(TaskExecutionState))] at recipient [akka.tcp://flink@fts-business-order-count.streamx:6123/user/rpc/jobmanager_2] timed out. This is usually caused by: 1) Akka failed sending the message silently, due to problems like oversized payload or serialization failures. In that case, you should find detailed error information in the logs. 2) The recipient needs more time for responding, due to problems like slow machines or network jitters. In that case, you can try to increase akka.ask.timeout.\n"
初步判断应该是由于以上2个原因导致触发了 akka 的超时机制,那就调整集群的akka超时间配置和进行单个任务拆分或者调大资源配置。
我们这边先看如何进行参数修改:
key | default | describe |
akka.ask.timeout | 10s | Timeout used for all futures and blocking Akka calls. If Flink fails due to timeouts then you should try to increase this value. Timeouts can be caused by slow machines or a congested network. The timeout value requires a time-unit specifier (ms/s/min/h/d). |
web.timeout
| 600000
| Timeout for asynchronous operations by the web monitor in milliseconds. |
在 conf/flink-conf.yaml 最后增加下面参数:
akka.ask.timeout: 100s
web.timeout:1000000
然后在 streampark 手动刷新下 flink-conf.yaml 验证参数是否同步成功。
5. snapshot no such file or director
定位cp失败原因为:计算量大,CPU密集性,导致TM内线程一直在processElement,而没有时间做CP
无法读取 Snapshot 原因为:flink 集群资源不够,Writer 和 Committer 产生竞争,Full-Compaction 时读到了已过期部分的不完整的 Snapshot,目前官方针对这个问题已经修复:
https://github.com/apache/incubator-paimon/pull/1308
而解决 cp 失败的解决办法增加并行度,增加 deploymenttaskmanager slot 和 jobmanager cpu
-D kubernetes.jobmanager.cpu=0.8
-D kubernetes.jobmanager.cpu.limit-factor=1
-D taskmanager.numberOfTaskSlots=8
-D jobmanager.adaptive-batch-scheduler.default-source-parallelism=2
在复杂的实时任务中,可以通过修改动态参数的方式,增加资源。
05
自建的数据平台 bondata 正在集成 paimon 的元数据信息、数据指标体系、血缘、一键 pipline 等功能,形成海程邦达的数据资产,并将在此基础上展开一站式数据治理 后面将基于trino Catalog接入Doris,实现真正的离线数据和实时数据的one service 采用 doris + paimon 的架构方案继续推进集团内部流批一体数仓建设的步伐
在这里要感谢之信老师和 StreamPark 社区在使用 StreamPark + Paimon 过程中的大力支持,在学习使用过程中遇到的问题,都能在第一时间给到解惑并得到解决,我们后面也会积极参与社区的交流和建设,让 paimon 能为更多开发者和企业提供流批一体的数据湖解决方案。
作者简介 PROFILE
王新
数据架构师 专注于大数据流批一体架构在企业的落地实现。
周文轩
数据开发工程师 对数据分析和流计算的实时开发充满热情。
参考资料 [1] https://github.com/ververica/flink-cdc-connectors [2] https://github.com/apache/doris [3] https://github.com/apache/storm [4] https://github.com/apache/Iceberg [5] https://github.com/apache/incubator-streampark [6] https://github.com/apache/incubator-paimon [7] https://github.com/apache/flink [8] https://repository.apache.org/content/groups/snapshots/org/apache/paimon
StreamPark 2.0.0 重磅发布,首个 Apache 版本终于来了!|发版通告
StreamX 在联通数科万亿级实时计算中的生产实践
StreamX 最佳实践 - Flink on Kubernetes 篇