Argo Workflows 简介
Cloud Native
Argo Workflows(https://github.com/argoproj/argo-workflows)是 CNCF 毕业项目,专为 Kubernetes 上编排并行 Job 而设计,将工作流中的每一个任务实现为一个单独的容器实例单独运行,具有轻量级、易扩展、并行性高等特点。
Argo Workflows 主要被应用在以下的场景:
批处理系统:对于需要定期或按需执行的大规模数据处理任务,如ETL作业、数据分析报告生成等,Argo Workflows 提供了一种声明式的方式来定义和执行这些批处理作业。
机器学习工作流:在机器学习项目中,Argo Workflows 可以协调数据预处理、模型训练、验证、调参和部署等步骤,同时利用 Kubernetes 的资源调度能力,高效地分配GPU等资源,支持大规模并行计算需求。
基础设施自动化:在管理云原生基础设施时,可用于执行一系列自动化任务,比如创建和配置 Kubernetes 资源、执行备份恢复操作、监控系统的健康状态等。
CI/CD:持续集成和持续部署流程通常包含代码构建、测试、部署等多个阶段。Argo Workflows 能够很好地整合这些步骤,实现自动化流水线,提高软件交付的速度和质量。
本文将会对 Argo Workflows 3.6 版本的关键新特性做一个深入的解析。
新特性解析
Cloud Native
1)CronWorkflows:调度策略增强
多个 cron scheduler 调度:可以在单个 CronWorkflow 中集成多个定时调度策略来进行工作流调度。
增加停止策略:可以设定策略在特定情况下停止调度,可以避免定时工作流持续失败,导致集群中失败工作流积压。
When 表达式:在每次调度之前检查表达式是否为 true,提供了和 cron scheduler 更灵活的组合机制。
apiVersion: argoproj.io/v1alpha1
kind: CronWorkflow
metadata:
name: cron-workflow-example
spec:
schedules: # 多个调度策略,每3分钟和5分钟执行一次,在第15分钟只执行一次。
- "*/3 * * * *"
- "*/5 * * * *"
concurrencyPolicy: "Allow"
stopStrategy: # 在Faild的工作流超过10个之后停止该Cron Workflow
condition: "failed >= 10"
# 通过表达式限制在两次执行间隔超过3600s
when: "{{= cronworkflow.lastScheduledTime == nil || (now() - cronworkflow.lastScheduledTime).Seconds() > 3600 }}"
startingDeadlineSeconds: 0
workflowSpec:
entrypoint: whalesay
templates:
- name: whalesay
container:
image: alpine:3.6
command: [sh, -c]
args: ["date; sleep 1"]
2)用户界面优化
这些增强完善了用户界面的可用性、可观测性,方便用户能够更好的观测自己的工作流状况。
3)Argo Workflows 控制器:大规模、稳定性、安全性、功能增强
归档工作流使用队列,改善了同时归档大量工作流时的内存管理。
并行清理 Pod,在 Retry 超大工作流时非常有用,可以在容忍时间内完成 Retry。
Pod 增加 Pod Kubernetes finalizer, 避免过早删除导致出现"Pod Delete" Error,便于控制器 Reconceil。
超大扁平工作流并行解析,可以让大型工作流的解析更快。
超大参数自动 Offload,可以支持更长的启动参数,这对于大型的科学模拟场景很有帮助。
自动设置 seccomp profile 为 RuntimeDefault,提高容器安全性,降低被攻击的风险。
a. OSS Artifacts 自动回收
apiVersion: argoproj.io/v1alpha1
kind: Workflow
metadata:
generateName: artifact-gc-
spec:
entrypoint: main
artifactGC:
strategy: OnWorkflowDeletion # the overall strategy, which can be overridden
podMetadata:
annotations:
kubernetes.io/resource-type: eci
templates:
- name: main
container:
image: argoproj/argosay:v2
command:
- sh
- -c
args:
- |
echo "hello world" > /tmp/on-completion.txt
echo "hello world" > /tmp/on-deletion.txt
outputs:
artifacts:
- name: on-completion # 该Artifact会在工作流完成时回收
path: /tmp/on-completion.txt
oss:
endpoint: http://oss-cn-zhangjiakou-internal.aliyuncs.com
bucket: my-argo-workflow
key: on-completion.txt
accessKeySecret:
name: my-argo-workflow-credentials
key: accessKey
secretKeySecret:
name: my-argo-workflow-credentials
key: secretKey
artifactGC:
strategy: OnWorkflowCompletion # overriding the default strategy for this artifact
- name: on-deletion # 该Artifact会在工作流被删除时回收
path: /tmp/on-deletion.txt
oss:
endpoint: http://oss-cn-zhangjiakou-internal.aliyuncs.com
bucket: my-argo-workflow
key: on-deletion.txt
accessKeySecret:
name: my-argo-workflow-credentials
key: accessKey
secretKeySecret:
name: my-argo-workflow-credentials
key: secretKey
b. 模板支持调度约束
apiVersion: argoproj.io/v1alpha1
kind: WorkflowTemplate
metadata:
name: benchmarks
spec:
entrypoint: main
serviceAccountName: workflow
templates:
- dag:
tasks:
- arguments:
parameters:
- name: msg
value: 'hello'
name: benchmark
template: benchmark
name: main
nodeSelector: # 模版上定义节点选择器,该选择器会传递到Pod上
pool: workflows
tolerations: # 模版上定义容忍,该容忍会传递到Pod上
- key: pool
operator: Equal
value: workflows
- inputs:
parameters:
- name: msg
name: benchmark
script:
command:
- python
image: python:latest
source: |
print("{{inputs.parameters.msg}}")
c. 支持动态模板引用
apiVersion: argoproj.io/v1alpha1
kind: Workflow
metadata:
generateName: hello-world-wf-global-arg-
namespace: default
spec:
entrypoint: whalesay
arguments:
parameters:
- name: global-parameter
value: hello
templates:
- name: whalesay
steps:
- - name: hello-world
templateRef: # Step 中动态模板引用
name: '{{item.workflow-template}}' # 从循环中读取需要调用的模板工作流
template: '{{item.template-name}}' # 从循环中读取template名称
withItems: # 定义循环参数
- { workflow-template: 'hello-world-template-global-arg', template-name: 'hello-world'}
- name: hello-world-dag
template: diamond
- name: diamond
dag:
tasks:
- name: A
templateRef: # DAG 中动态模板引用
name: '{{item.workflow-template}}' # 从循环中读取需要调用的模板工作流
template: '{{item.template-name}}' # 从循环中读取template名称
withItems:
- { workflow-template: 'hello-world-template-global-arg', template-name: 'hello-world'}
d. 更新 expr 库,支持多种函数。
apiVersion: argoproj.io/v1alpha1
kind: Workflow
metadata:
generateName: test-expression-
namespace: argo
spec:
entrypoint: main
arguments:
parameters:
- name: expr
value: "{{= concat(['a', 'b'], ['c', 'd']) | join('\\n') }}" # 使用列表拼接和字符串合并函数
templates:
- name: main
inputs:
parameters:
- name: expr
script:
image: alpine:3.6
command: ["sh"]
source: |
echo result: '{{ inputs.parameters.expr }}'
4)Argo CLI:模板易用性
argo cron update FILE1 # 更新定时工作流
argo template update FILE1 # 更新 workflow-template
argo cluster-template update FILE1 # 更新 cluster-workflow-template
通过 label 过滤模板,有助于对模板进行分类管理:
argo template list -l app=test # 通过label进行过滤
快速使用 Argo Workflows
Cloud Native
Argo Workflows 作为一款云原生的批量任务编排引擎,是在 Kubernetes 上编排各类型任务、提高业务自动化水平的必备利器,无论您是企业的架构师、数据科学家、还是 DevOps 工程师,都能使用 Argo Workflows 提高您的工作效率。
简单易用:托管核心组件,完全免运维,提供 RestAPI 和 Python SDK,集成简单。
稳定高性能:控制面优化,支持大规模工作流编排,整体规模可达到 4w。
产品化支持:众多领域最佳实践,构建高效工作流,用户只需关注业务创新。
参考文档:
[2] Argo Workflows
https://github.com/argoproj/argo-workflows
[3] Serverless Argo Workflows
https://help.aliyun.com/zh/ack/distributed-cloud-container-platform-for-kubernetes/user-guide/overview-12
https://help.aliyun.com/zh/ack/distributed-cloud-container-platform-for-kubernetes/user-guide/best-practices
文章转载自阿里云云原生。点击这里阅读原文了解更多。
CNCF概况(幻灯片)
扫描二维码联系我们!
CNCF (Cloud Native Computing Foundation)成立于2015年12月,隶属于Linux Foundation,是非营利性组织。
CNCF(云原生计算基金会)致力于培育和维护一个厂商中立的开源生态系统,来推广云原生技术。我们通过将最前沿的模式民主化,让这些创新为大众所用。请关注CNCF微信公众号。