导 读 本文主要介绍马蜂窝原有实时平台不支持 Flink On Kubernetes,通过引入 Apache StreamPark 来解决原有系统的弊端,分享了在引入 StreamPark 落地过程中遇到的一些问题、介绍了为高效迁移原有作业到 StreamPark 而开发了一个迁移工具来批量迁,最后比原计划提前一个月完成。使用StreamPark 后极大地降低运维成本,显著地提升人效。
Github: https://github.com/apache/streampark马蜂窝旅游网是中国领先的旅行玩乐平台之一,创立于 2006 年,从 2010 年正式开始公司化运营,十年来在旅游 UGC 内容领域累积了大量资源。马蜂窝旅游网是旅游社交网站,是数据驱动平台,也是新型旅游电商平台,提供全球 6 万个旅游目的地的交通、酒店、景点、餐饮、购物、当地玩乐等信息内容和产品预订服务。
业务背景与挑战
随着马蜂窝公司业务和大数据处理需求的不断增多,算力要求和机器使用成本也随之增加。为了应对这一挑战,降本提效变得至关重要。在马蜂窝数据平台,我们引入了多种计算引擎以满足不同场景的计算需求。在实际应用过程中,这些引擎部署在不同的资源上,如 StarRocks 运行在物理机上,而 Flink 和 Spark 则运行在 YARN 上,分散的资源使用方式,导致了运维成本高昂且资源利用率低下。为了解决这个问题,我们迫切需要一个能够统一管理和调度资源的方案。
经过深入研究,我们发现 Kubernetes 作为一种广泛使用的容器管理技术,具有出色的容器化应用管理能力,能够支持混合部署,提高资源利用率,同时确保应用的隔离性和安全性,并且 Kubernetes 在马蜂窝其他场景已经应用多年,有足够的技术沉淀。最终我们决定采用 Kubernetes 作为大数据资源的统一管理平台,并逐步将现有的大数据计算组件迁移到 Kubernetes 环境中。在云原生技术日益盛行的背景下,我们已经成功迁移了Apache Spark、Presto 和 StarRocks 等集群,而 Flink on Kubernetes 的迁移则是我们接下来的重要任务。本文旨在分享马蜂窝在 Apache StreamPark 方面的实践经验,以及它如何助力我们实现 Flink on Kubernetes 的迁移。我们将详细阐述迁移过程中遇到的一些问题,使用 StreamPark 所带来的显著效益,并展望未来的发展规划。
选择 Apache StreamPark
在采用 Apache StreamPark 之前,我们团队自主研发了实时计算管理平台 DP,其功能与业界常见的 Flink 管理平台相似,满足了开发人员对 Flink 作业管理与运维的基本需求。然而,面对 Flink on Kubernetes 云原生的支持,我们的现有平台显得力不从心。
因此我们面临两种选择:一是对现有平台进行扩展改造以支持云原生,二是寻找并采纳合适的开源项目。在深入评估后,我们发现 StreamPark 的功能与我们的需求高度契合。与其他开源工具以及马蜂窝的实时计算管理平台相比,StreamPark 不仅覆盖了 DP 的现有功能,还在核心功能、可扩展性、操作体验等方面展现出明显优势。
更为关键的是,StreamPark 支持 Flink on Kubernetes 部署,这为我们降低了在现有平台上进行 Kubernetes 支持改造的复杂性和高成本。综合考虑,我们决定采用 Apache StreamPark,期望借助 StreamPark 简化 Flink on Kubernetes 的云原生流程,并大幅降低迁移成本。
接下来介绍 StreamPark 在马蜂窝的落地实践中我们遇到的一些问题,期望给社区的用户带来一些输入和参考。在马蜂窝,众多业务线都在开发实时作业,为了保障各业务之间的有效隔离,我们引入了租户管理的概念。StreamPark 为我们提供了出色的租户管理支持,这使得多租户管理更为便捷。
在实际操作中,我们首先根据公司内部的各个业务部门创建了对应的 Team,确保 Team 与业务部门之间有明确的一一对应关系。系统账号被明确分为两类:团队账号 和 系统管理员。团队账号 实质上就是租户账号。每个 Team 都会被分配一个专属账号。通过这个账号,团队成员可以轻松地管理其内部作业,包括作业的增加、删除、修改和查询,从而确保了业务的独立性和安全性。
而 系统管理员,也就是我们的 Admin 账号,拥有对整个系统操作的全面视角。管理员能够查看各业务线的作业执行情况和资源使用情况,这不仅有助于我们进行成本分摊,还便于在系统出现问题时,管理员能迅速追踪并定位问题所在。
Flink on Kubernetes 模式中,依赖 Docker 镜像提供作业依赖JAR包。在现有的StreamPark 方案中,是将基础镜像与用户 JAR 合并成一个 Fat 镜像,每次版本更新需要重新构建 Fat 镜像。我们优化了这一过程,所有 StreamPark 上的作业现在共享一个基础 Docker 镜像。在容器启动时,它们会从共享存储中动态拉取各自的作业 JAR 包。
具体实施步骤如下:首先,用户通过 StreamPark 上传的 JAR 包会被自动同步到以作业名称为前缀的 HDFS 路径上。接着,我们修改了 Flink 的代码,以便在作业启动时能够动态挂载所需的资源,通过在 Kubernetes Deployment 中定义一个自定义的 Init Container 来,这个初始容器会负责从 HDFS 下载用户作业的 JAR 包及其依赖,并通过 emptyDir 卷将它们挂载到主容器中。通过这种优化方式,我们解决了作业发布过程中频繁构建作业镜像的问题,实现了镜像的复用。提升了作业发布的速度,也降低了镜像仓库的管理成本和存储压力。DP平台(自研平台) 的历史作业迁移至 StreamPark,如果是使用方手动迁移将带来很大人工成本。因此,我们自研了自动迁移工具,简化这一过程。该工具利用我们之前自研的DP平台中保存的作业信息,通过转换,自动在 StreamPark 中创建作业,实现从 DP 到 StreamPark 的自动作业迁移。在马蜂窝,提交最多的作业类型是 Flink SQL 和 JAR 包作业,接下来我将分别介绍这两种作业在 StreamPark 平台的自动迁移过程。对于 JAR 包作业,得益于我们之前对镜像复用问题的改进,我们现在可以将 DP平台中保存的用户 JAR 包按照统一命名的路径上传到 HDFS 的作业库目录。这样,在 Flink on Kubernetes 环境下,作业 Pod 可以直接从 HDFS 拉取运行作业所需的 JAR 包。同时,在 StreamPark 中,我们会根据 DP平台中的作业元数据信息,转换成 StreamPark JAR 包作业所需的元数据信息,如 Flink版本、作业名称、并行度等,并将这些信息写入StreamPark 的元数据库中,从而自动创建作业。对于 Flink SQL 作业,除了需要进行与 JAR 包作业相似的元数据转换外,我们还需要考虑 Flink SQL 所依赖的 Flink Connector JAR 和 UDF JAR。为了提升用户体验和开发效率,我们将使用频率最高的 Flink Connector JAR 预先打包到 Flink 基础镜像中,因此在迁移过程中无需额外处理。然而,对于部分使用了 UDF JAR 的 Flink SQL作业,我们需要采取与 JAR 包作业相同的处理方式:在转换元数据并生成 StreamPark 作业之前,将 UDF 上传到 HDFS,并确保其命名规则与普通 JAR 包保持一致,以便在元数据转换时能够正确记录 JAR 包路径信息。最后,借助StreamPark开放的作业API,包括:我们能够自动化地完成作业的发布验证、启动以及状态获取等任务,进而实现整个迁移流程的自动化。Kubernetes 默认的调度器在灵活性和扩展性方面存在不足。为了满足我们的需求,我们期望能在 Kubernetes 上实现与 YARN 平台相似的使用方式,包括多队列调度、优先级调度以及资源抢占等功能,以降低使用和理解成本。为此,我们采用了 Yunikorn 资源管理,它是一种轻量级的通用资源调度器,可以为 Kubernetes 提供精细化的资源调度。Yunikorn 的队列管理与Hadoop 的 YARN 相似,使得开发者能够无缝对接,并已成为我们当前 Kubernetes 环境中的调度器。
在实际应用中,我们根据业务线划分了多个业务队列,并针对每个队列的业务资源使用情况配置了相应的资源保障。
kind: ConfigMap
metadata:
name: yunikorn-configs
data:
queues.yaml: |
partitions:
- name: default
queues:
- name: department1
submitacl: '*'
resources:
max:
{ memory: 1000Mi, vcore: 1000m }
guaranteed:
{ memory: 300Mi, vcore: 300m }
- name: department2
submitacl: '*'
resources:
max:
{ memory: 1000Mi, vcore: 1000m }
guaranteed:
{ memory: 300Mi, vcore: 300m }
...
为了提高资源利用率,我们为在线和离线负载的指定队列配置了资源争抢和队列优先级。
apiVersion: v1
kind: ConfigMap
metadata:
name: yunikorn-configs
data:
queues.yaml: |
partitions:
- name: default
queues:
- name: department1
properties:
preemption.policy: fence
preemption.delay: 10s
submitacl: '*'
resources:
max:
{ memory: 1000Mi, vcore: 1000m }
queues:
- name: online-workload
submitacl: '*'
resources:
guaranteed:
{memory: 300Mi, vcore: 300m}
properties:
priority.offset: "0"
- name: offline-workload
submitacl: '*'
resources:
guaranteed:
{memory: 300Mi, vcore: 300m}
properties:
priority.offset: "1000"
.......
由于我们的团队是按业务线划分的,这与 Yunikorn 配置的业务队列一一对应,这为Flink 与 Yunikorn 的集成提供了便利。为了实现这一集成,我们对 StreamPark 进行了改造。在作业提交时,系统会动态获取团队对应的队列名称,并在动态属性中自动为JobManager 和 TaskManager 添加标签,例如:-Dkubernetes.jobmanager.labels=queue:root.dm.flink 和
-Dkubernetes.taskmanager.labels=queue:root.dm.flink。这样,相应的 Flink 作业就可以被提交到其所属的业务队列中。马蜂窝对 StreamPark 进行了改造,为其告警信息中新增了“立即拉起”功能,为作业维护人员能够迅速响应并恢复作业提供便利。此外,我们也对 StreamPark 的告警方式进行了扩展,实现了与数据平台内部告警系统的无缝对接。对接流程如下:1. 在内部告警平台上,配置分派信息对特定的告警人员进行详尽的配置2. 在 StreamPark 中,配置内部告警平台的 Callback 地址,实现了两个系统之间的集成
在 StreamPark 落地实践中,我们也遇到了一些问题,这里记录下来,期望给社区的用户带来一些输入和参考。
在使用 CNI 或 Kubernetes 的网络化插件时,网络性能损耗是不可避免的。在我们的实际使用中,就经常遇到由网络问题导致的作业错误,具体表现为 Akka 框架中的请求超时异常,如:akka.pattern.AskTimeoutException: Ask timed out on [Actor[akka://xxx]] after [xxx ms]为了解决这类问题,通过适当调整Flink集群的网络相关参数来增强其网络容错能力。具体来说,可以考虑增加 akka.ask.timeout 、taskmanager.network.request-backoff.max 以及 heartbeat.timeout 等参数的值,减少因网络延迟或不稳定而导致的作业失败,使 Flink 集群在面对网络性能损耗时更加稳健可靠。在 Flink 运行在 Kubernetes 的环境中,为了处理内部节点间的通信以及与外部环境的交互,如与外部存储进行数据读写,我们需要确保 Kubernetes 内部能正确解析外部机器的 hostname 为真实IP。为此,我们选择了支持热更新的 CoreDNS,它是一个灵活、模块化且高性能的开源 DNS 服务器。通过利用在 ConfigMap 中定义的 Hostsfile 来配置 CoreDNS,Hostsfile 存储了 Kubernetes 外部所有大数据机器的 hostname 与 IP 映射,并能在机器变更时及时同步。apiVersion: v1
kind: ConfigMap
metadata:
name: coredns
namespace: kube-system
labels:
addonmanager.kubernetes.io/mode: EnsureExists
data:
Hostsfile: |
192.168.xx.11 host1
192.168.xx.12 host2
192.168.xx.13 host2
192.168.xx.14 host3
.......
Kubernetes 会自动将更新的 ConfigMap 内容挂载到 DNS 容器中,而 CoreDNS 能迅速识别并重新加载配置文件,实现新设置即时生效,从而极大地提升了系统的灵活性和可用性,且整个过程完全自动化,无需人工干预或重启服务。在 Flink 作业问题排查过程中,查看日志是至关重要的环节。当前,StreamPark 已经提供了查看 Flink 作业启动日志的功能。然而,在某些异常情况下,例如 Flink 作业重启或失败导致的 Kubernetes Pod 终止,相关日志也会随 Pod 的消失而丢失,这使得 StreamPark 在作业异常结束后无法查看这些日志。
为了解决这一问题,并支持在上述异常场景下对作业日志进行定位、查看和分析,我们采用了 hostPath 存储卷方案。具体而言,我们将 Kubernetes 节点的宿主机上的特定目录挂载到 Pod 中的 Flink 日志目录下。为了确保不同作业的日志得到有效区分,宿主机的这个目录会以作业名称为前缀,从而简化了日志管理和收集工作。
随后,宿主机上的 Flink 日志会通过 Filebeat 被自动采集并传输到 Kafka,进而写入 Elasticsearch 中,这样即使 Flink 作业异常结束,其相关日志依然可以被检索和查询。
带来的收益
Apache StreamPark 为我们使用方带来了显著的效率提升和维护成本的降低,自上线以来,我们收到了来自不同使用方的反馈。对于计算平台管理员而言,StreamPark 提供了多租户的隔离与管理功能,从而大大增强了平台的可控性,并有效降低了管理成本;对于开发者,与之前的 DP 平台相比,StreamPark 在易用性方面有了显著改进。它提供的一站式服务使得开发者能够在平台上直接完成作业的开发、编译和发布,这不仅提高了开发效率,还使开发者能够更专注于业务逻辑的开发与优化。此外,使用 Apache StreamPark 极大地简化了迁移流程,并降低了迁移成本。StreamPark 屏蔽了底层 Kubernetes 和 Yarn 环境以及资源类型之间的差异,使用户无需再为环境迁移和适配问题而分心。截至目前,我们已经成功跑通了整个迁移流程,并将上百个作业迁移至 StreamPark 平台。经过近半年的观察和验证,我们发现 StreamPark 以及在 Kubernetes 上运行的 Flink 作业都表现出了超出预期的稳定性。目前在马蜂窝,StreamPark 带来的具体成效包括:- StreamPark平台显著提升了作业的从开发到维护的整体效率达 500%,并在功能性、易用性和可维护性等方面较原 DP 平台有了显著的正向提升,通过问卷调查反馈用户满意度提升至 100%。
- 通过 Flink on Kubernetes 架构调整,去除强绑定 Hadoop 体系带来部署和运维成本,同时通过云原生统一资源的申请和调度模式,将离线资源和在线资源进行混部,结合 HPA、抢占式资源调度、不同负载的错峰策略,用于实时计算场景下的机器资源利用率提升 150%,机器成本降低了 50%。
- StreamPark 平台及批量迁移工具的引入,大大提升了 Flink on Kubernetes 云原生的实施效率,使项目完成时间比预期提前了一个月。
未来我们计划迁移更多计算组件至 kubernetes 集群,实现统一管理和资源优化,在 Kubernetes 统一管理调度下,通过智能利用空闲时段执行作业、合理利用资源抢占等方式提升资源利用率和高效运维。在本文的最后,要特别感谢 StreamPark 社区的大力支持,在 StreamPark 使用、问题追踪等方面给了很多建议和帮助,后面我们会持续跟社区保持密切交流,积极参与社区共建。期望 StreamPark 在未来做得越来越好,早日毕业成为顶级 Apache 项目!
Apache StreamPark 是一个流处理应用程序开发管理框架。旨在轻松构建和管理流处理应用程序,提供使用 Apache Flink 和 Apache Spark 编写流处理应用程序的开发框架和一站式流计算平台,核心能力包括但不限于应用开发、调试、交互查询、部署、运维、实时数仓等。目前已有腾讯、百度、联通、天翼云、自如、马蜂窝、同程数科、长安汽车、大健云仓等众多公司在生产环境使用、并且获得了多项业内荣誉,是近年来成长较快的开源项目。💻 项目地址:https://github.com/apache/streampark🧐 提交问题和建议:https://github.com/apache/streampark/issues🥁 贡献代码:https://github.com/apache/streampark/pulls📮 Proposal:https://cwiki.apache.org/confluence/display/INCUBATOR/StreamPark+Proposal📧 订阅社区开发邮件列表:dev@streampark.apache.org