导读:自如作为一家专注于提供租房产品和服务的 O2O 互联网公司,构建了一个涵盖城市居住生活领域全链条的在线化、数据化、智能化平台,实时计算在自如一直扮演着重要的角色。到目前为止,自如每日需要处理 TB 级别的数据,本文由来自自如的实时计算小伙伴带来,介绍了自如基于 StreamPark 的实时计算平台深度实践。
实时计算遇到的挑战 需求解决方案之路 基于 StreamPark 的深度实践 实践经验总结和示例 带来的收益 未来规划
自如通过打造涵盖 To C 和 To B 的品质居住产品、逐步实现城市居住生活领域全链条的线上化、数据化、智能化的平台能力。自如 APP 装机量累计达 1.4 亿次,日均线上服务调用达 4 亿次,拥有智能化房源万余间。自如现已在 PC、APP、微信全渠道实现租房、服务、社区的 O2O 闭环,省去传统租房模式所有中间冗余环节,通过 O2O 模式重构居住市场格局,并建立了中国最大的 O2O 青年居住社区。
实时计算遇到的挑战
在自如,实时计算大概分为 2 个应用场景:
数据同步:包括 Kafka、Mysql、Mongo 数据同步到 Hive / Paimon / ClickHouse 等。
实时数仓:包括出租、收房、家服等业务实时指标。
在实时计算实践过程中遇到了一些挑战,大致如下:
作业上线效率低
作业归属信息不明确
由于实时计算平台没有对作业进行统一管理,业务代码都是由 GitLab 管理,虽然解决了部分问题,但是我们发现仓库代码与线上部署的 Flink 作业管理之间仍然存在缺陷:缺乏明确归属、缺少分组和有效权限控制,导致作业管理混乱且责任链路难以追溯。为确保代码和上线作业的一致和可控性,亟需建立严格且清晰的作业管理体系,其中包括实行严格的代码版本控制、明确作业归属和负责人、以及建立有效的权限控制。
作业维护困难
在自如有多个不同版本的 Flink 作业在运行,由于 Apache Flink 的 API 在大版本升级中经常会发生变动,且不保证向下兼容性,这直接导致流作业项目代码的升级成本变得很高。因此如何管理这些不同版本的作业成了头痛的问题。
由于没有统一的作业平台,这些作业在提交时,只能通过执行脚本的形式进行提交。不同的作业有不同重要程度和数据量级,作业所需资源和运行参数也都各不相同,都需要相应的修改。我们可以通过修改提交脚本或直接在代码中设置参数来进行修改,但这使得配置信息的获取变得困难,尤其是当作业出现重启或失败时,FlinkUI 无法打开,配置信息变成一个黑盒。因此,亟需建立一个更加高效、支持配置实时计算平台。
作业开发调试困难
因此,急需提高开发、调试的效率。
寻求解决方案之路
在平台构建的初期阶段,2022 年初开始我们就全面调查了行业内的几乎所有相关项目,涵盖了商业付费版和开源版。经过调查和对比发现,这些项目都或多或少地存在一定的局限性,可用性和稳定性也无法有效地保障。
编者按
目前在最新的 2.2 版本中社区已经重构了这部分实现[1]。
2. 在开源组件的选择上,我们经过各项指标综合对比评估,最终选择了当时的 StreamX。后续和社区保持密切的沟通,在此过程中深刻感受到创始人认真负责的态度和社区的团结友善的氛围,也见证了项目 2022 年 09 月加入 Apache 孵化器的过程,这让我们对该项目的未来充满希望。
基于 StreamPark 的深度实践
LDAP 登录支持
在 StreamPark 的基础上,我们进一步完善了相关的功能,其中包括对 LDAP 的支持,以便我们未来可以完全开放实时能力,让公司的四个业务线所属的分析师能够使用该平台,预计届时人数将达到 170 人左右。随着人数的增加,账号的管理变得越发重要,特别是在人员变动时,账号的注销和申请将成为一项频繁且耗时的操作。所以,接入 LDAP 变得尤为重要。因此我们及时和社区沟通,并且发起讨论,最终我们贡献了该 Feature[2]。现在在 StreamPark 开启 LDAP 已经变得非常简单,只需要简单两步即可:
step1: 填写对应的 LDAP 配置:
编辑 application.yml 文件,设置 LDAP 基础信息,如下:
ldap:
# Is ldap enabled?
enable: false
## AD server IP, default port 389
urls: ldap://99.99.99.99:389
## Login Account
base-dn: dc=streampark,dc=com
username: cn=Manager,dc=streampark,dc=com
password: streampark
user:
identity-attribute: uid
email-attribute: mail
登录界面点击 LDAP 方式登录,然后输入对应的账号密码,点击登录即可
提交作业自动生成 Ingress
由于公司的网络安全政策,运维人员在 Kubernetes 的宿主机上仅开放了 80 端口,这导致我们无法直接通过 “域名+随机端口” 的方式访问在 Kubernetes 上的作业 WebUI。为了解决这个问题,我们需要使用Ingress在访问路径上增加一层代理,从而启到访问路由的效果。在 StreamPark 2.0 版本我们贡献了 Ingress 相关的功能[3]。采用了策略模式的实现方式,在初始构建阶段,获取 Kubernetes 的元数据信息来识别其版本,针对不同版本来进行相应的对象构建,确保了在各种 Kubernetes 环境中都能够顺利地使用 Ingress 功能。
支持查看作业部署日志
在持续部署作业的过程中,我们逐渐意识到,没有日志就无法进行有效的运维操作,日志的留存归档和查看成为了我们在后期排查问题时非常重要的一环。因此在 StreamPark 2.0 版本我们贡献了 On Kubernetes 模式下启动日志存档、页面查看的能力[4],现在点击作业列表里的日志查看按钮,可以很方便的查看作业的实时日志。
集成 Grafana 监控图表链接
在实际的使用过程中,我们发现随着作业数量的增加,使用人数的上升,以及涉及的部门的增多,面临故障自排查困难的问题。我们团队的运维能力实际上是非常有限的。由于专业领域的不同,当我们告诉用户去 Grafana、ELK 上查看图表和日志时,用户通常会感到无从下手,不知道如何去找到与自己作业相关的信息。
step1: 创建徽章标签
step2: 将徽章标签和跳转链接进行关联
集成 Flink sql security 权限控制
在我们的系统中,血缘关系管理采用 Apache Atlas,权限管理基于开源项目 Flink-sql-security[6],这是一个FlinkSQL数据脱敏和行级权限解决方案的开源项目,支持面向用户级别的数据脱敏和行级数据访问控制,即特定用户只能访问到脱敏后的数据或授权过的行。
这种设计是为了处理一些复杂的继承逻辑。例如,当将加密字段 age 的A 表与 B 表进行 join 操作,得到 C 表时,C 表中的 age 字段应继承 A 表的加密逻辑,以确保数据的加密状态不会因数据处理过程中的转换而失效。这样,我们可以更好地保护数据的安全性,确保数据在整个处理过程中都符合安全标准。
行级权限条件
SELECT * FROM orders;执行SQL
SELECT * FROM orders WHERE region = 'beijing';
用户 B 的真实执行 SQL:
SELECT * FROM orders WHERE region = 'hangzhou';
字段脱敏条件
SELECT name, age, price, phone FROM user;
用户 A 的真实执行 SQL:
SELECT Encryption_function(name), age, price, Sensitive_field_functions(phone) FROM user;
SELECT name, Encryption_function(age), price, Sensitive_field_functions(phone) FROM user;
基于 StreamPark 的数据同步平台
curl -X POST '/flink/app/start' \
-H 'Authorization: $token' \
-H 'Content-Type: application/x-www-form-urlencoded; charset=UTF-8' \
--data-urlencode 'savePoint=' \
--data-urlencode 'allowNonRestored=false' \
--data-urlencode 'savePointed=false' \
--data-urlencode 'id=100501'
实践经验总结
在深度使用 StreamPark 实践过程中,我们总结了一些常见问题和实践过程中所探索出解决方案,我们把这些汇总成示例,仅供大家参考。
构建 Base 镜像
要使用 StreamPark 在 Kubernetes 上部署一个 Flink 作业,首先要准备一个基于 Flink 构建的 Base 镜像。然后,在 Kubernetes 平台上,会使用用户所提供的镜像来启动 Flink 作业。如果是沿用官方所提供的 “裸镜像”,在实际开发中是远远不够的,用户开发的业务逻辑往往会涉及到上下游多个数据源,这就需要相关数据源的 Connector,以及 Hadoop 等关联依赖。因此需要将这部分依赖项打入镜像中,下面我将介绍具体操作步骤。
FROM apache/flink:1.14.5-scala_2.11-java8
ENV TIME_ZONE=Asia/Shanghai
COPY ./conf /opt/hadoop/conf
COPY lib $FLINK_HOME/lib/
step2: 镜像构建命令使用多架构构建模式,如下:
docker buildx build --push --platform linux/amd64 -t ${私有镜像仓库地址}
Base 镜像集成 Arthas 示例
FROM apache/flink:1.14.5-scala_2.11-java8
ENV TIME_ZONE=Asia/Shanghai
COPY ./conf /opt/hadoop/conf
COPY lib $FLINK_HOME/lib/
RUN apt-get update --fix-missing && apt-get install -y fontconfig --fix-missing && \
apt-get install -y openjdk-8-jdk && \
apt-get install -y ant && \
apt-get clean;
RUN apt-get install sudo -y
# Fix certificate issues
RUN apt-get update && \
apt-get install ca-certificates-java && \
apt-get clean && \
update-ca-certificates -f;
# Setup JAVA_HOME -- useful for docker commandline
ENV JAVA_HOME /usr/lib/jvm/java-8-openjdk-amd64/
RUN export JAVA_HOME
RUN apt-get install -y unzip
RUN curl -Lo arthas-packaging-latest-bin.zip 'https://arthas.aliyun.com/download/latest_version?mirror=aliyun'
RUN unzip -d arthas-latest-bin arthas-packaging-latest-bin.zip
镜像中依赖冲突的解决方式
step1: Clone flink-shaded 项目到本地👇
git clone https://github.com/apache/flink-shaded.git
step2: 项目加载到 IDEA 中
集中作业配置示例
cd /flink-1.14.5/conf
vim flink-conf.yaml
StreamPark 配置 DNS 解析
在使用 StreamPark 平台提交 FlinkSQL 的过程中,一个正确合理的 DNS 解析配置非常重要。主要涉及到以下几点:
apiVersion: v1
kind: Pod
metadata:
name: pod-template
spec:
hostAliases:
- ip: 10.216.xxx.79
hostnames:
- handoop1
- hostnames:
- handoop2
ip: 10.16.xx.48
- hostnames:
- handoop3
ip: 10.16.xx.49
- hostnames:
- handoop4
ip: 10.16.xx.50
.......
apiVersion: v1
data:
resolv.conf: "nameserver 10.216.138.226" //DNS服务
kind: ConfigMap
metadata:
creationTimestamp: "2022-07-13T10:16:18Z"
managedFields:
name: dns-configmap
namespace: native-flink
在通过 Pod Template 进行挂载。
多实例部署实践
在实际生产环境中,我们常常需要操作多个集群,包括一套用于测试的集群和一套线上正式集群。任务首先在测试集群中进行结果验证和性能压测,确保无误后再发布到线上正式集群。
step1: 修改端口号,避免多个服务端口冲突
export HADOOP_CONF_DIR=/home/streamx/conf
// 寻找hadoop配置文件的流程
//1、先去寻找是否添加了参数:kubernetes.hadoop.conf.config-map.name
@Override
public Optional<String> getExistingHadoopConfigurationConfigMap() {
final String existingHadoopConfigMap =
flinkConfig.getString(KubernetesConfigOptions.HADOOP_CONF_CONFIG_MAP);
if (StringUtils.isBlank(existingHadoopConfigMap)) {
return Optional.empty();
} else {
return Optional.of(existingHadoopConfigMap.trim());
}
}
@Override
public Optional<String> getLocalHadoopConfigurationDirectory() {
// 2、如果没有 1 中指定的参数,查找提交 native 命令的本地环境是否有环境变量:HADOOP_CONF_DIR
final String hadoopConfDirEnv = System.getenv(Constants.ENV_HADOOP_CONF_DIR);
if (StringUtils.isNotBlank(hadoopConfDirEnv)) {
return Optional.of(hadoopConfDirEnv);
}
// 3、如果没有 2 中环境变量,再继续看否有环境变量:HADOOP_HOME
final String hadoopHomeEnv = System.getenv(Constants.ENV_HADOOP_HOME);
if (StringUtils.isNotBlank(hadoopHomeEnv)) {
// Hadoop 2.2+
final File hadoop2ConfDir = new File(hadoopHomeEnv, "/etc/hadoop");
if (hadoop2ConfDir.exists()) {
return Optional.of(hadoop2ConfDir.getAbsolutePath());
}
// Hadoop 1.x
final File hadoop1ConfDir = new File(hadoopHomeEnv, "/conf");
if (hadoop1ConfDir.exists()) {
return Optional.of(hadoop1ConfDir.getAbsolutePath());
}
}
return Optional.empty();
}
final List<File> hadoopConfigurationFileItems = getHadoopConfigurationFileItems(localHadoopConfigurationDirectory.get());
// 如果没有找到 1、2、3 说明没有 hadoop 环境
if (hadoopConfigurationFileItems.isEmpty()) {
LOG.warn(
"Found 0 files in directory {}, skip to mount the Hadoop Configuration ConfigMap.",
localHadoopConfigurationDirectory.get());
return flinkPod;
}
// 如果 2 或者 3 存在,会在路径下查找 core-site.xml 和 hdfs-site.xml 文件
private List<File> getHadoopConfigurationFileItems(String localHadoopConfigurationDirectory) {
final List<String> expectedFileNames = new ArrayList<>();
expectedFileNames.add("core-site.xml");
expectedFileNames.add("hdfs-site.xml");
final File directory = new File(localHadoopConfigurationDirectory);
if (directory.exists() && directory.isDirectory()) {
return Arrays.stream(directory.listFiles())
.filter(
file ->
file.isFile()
&& expectedFileNames.stream()
.anyMatch(name -> file.getName().equals(name)))
.collect(Collectors.toList());
} else {
return Collections.emptyList();
}
}
//如果有 hadoop 的环境,将会把上述两个文件解析为 kv 对,然后构建成一个 ConfigMap,名字命名规则如下
public static String getHadoopConfConfigMapName(String clusterId) {
return Constants.HADOOP_CONF_CONFIG_MAP_PREFIX + clusterId;
}
netstat -tlnp | grep 10000
netstat -tlnp | grep 10002
带来的收益
未 来 期 待
自如作为 StreamPark 早期的用户之一,我们一直和社区同学保持密切交流,参与 StreamPark 的稳定性打磨,我们将生产运维中遇到的 Bug 和新的 Feature 提交给了社区。在未来,我们希望可以在 StreamPark 上管理 Apache Paimon [8] 湖表的元数据信息和 Paimon 的 Action 辅助作业的能力,基于 Flink 引擎通过对接湖表的 Catalog 和 Action 作业,来实现湖表作业的管理、优化于一体的能力。目前 StreamPark 正在对接 Paimon 数据集成的能力,这一块在未来对于实时一键入湖会提供很大的帮助。
在此也非常感谢 StreamPark 团队一直以来对我们的技术支持,祝 Apache StreamPark 越来越好,越来越多用户去使用,早日毕业成为顶级 Apache 项目。
作者简介 PROFILE
刘涛
自如大数据平台研发经理
专注于大数据离线、实时、机器学习平台的建设
梁研生
大数据平台研发工程师
Apache StreamPark Contributor
专注于实时计算平台、大数据权限、血缘
陈卓宇
大数据平台开发工程师
Apache StreamPark PPMC 成员
专注于数据湖&流计算领域
加入我们
StreamPark 是一个流处理应用程序开发管理框架。旨在轻松构建和管理流处理应用程序,提供使用 Apache Flink 和 Apache Spark 编写流处理应用程序的开发框架。同时 StreamPark 提供了一个流处理应用管理平台,核心能力包括但不限于应用开发、调试、交互查询、部署、运维、实时数仓等,最初开源时项目名称叫 StreamX ,于 2022 年 8 月更名为 StreamPark,随后通过投票正式成为 Apache 开源软件基金会的孵化项目。目前已有腾讯、百度、联通、天翼云、自如、马蜂窝、同程数科、长安汽车等众多公司在生产环境使用[9]。
StreamPark 社区一直以来都以用心做好一个项目为原则,高度关注项目质量,努力建设发展社区。加入 Apache 孵化器以来,认真学习和遵循「The Apache Way」,我们将秉承更加兼容并包的心态,迎接更多的机遇与挑战。诚挚欢迎更多的贡献者参与到社区建设中来,和我们一道携手共建。
💁♀️社区沟通: