解密开源Serverless容器框架:事件驱动篇

文摘   2024-12-16 17:00   浙江  



关于事件驱动




早在2018年,Gartner评估报告将Event-Driven Model列为10大战略技术趋势之一,并强调事件驱动架构 (EDA) 是技术和软件领域发展的主要驱动力。


事件驱动是指在分布式系统中,各个组件之间的交互基于事件通信,而非直接的请求-响应模式,具有异步、松散耦合等特征。在EDA中,组件之间通过发布(Publish)和订阅(Subscribe)事件来实现协作,事件可以是用户操作、系统状态变化、传感器数据等。





云原生Serverless事件

驱动框架:Knative Eventing




Knative是一款基于Kubernetes集群的开源Serverless框架,提供了云原生、跨平台的Serverless 编排标准。作为Serverless 中必不可少的事件驱动能力,Knative Eventing提供了云原生的事件驱动能力。


Knative Eventing是一个独立平台,为各种类型的工作负载提供支持,包括标准Kubernetes服务和Knative Serving服务。使用标准HTTP POST请求在事件生产者和接收器之间发送和接收事件。这些事件符合CloudEvents规范,支持使用任何编程语言创建、解析、发送和接收事件。此外Knative Eventing组件是松散耦合的,可以彼此独立开发和部署。


Knative Eventing一般使用场景:


发布事件:可以将事件作为HTTP POST发送到代理(broker),与生产事件的应用程序解耦。


消费事件:可以使用触发器(trigger)根据事件属性消费来自代理的事件。消费服务以HTTP POST形式接收事件。





Knative中的事件网格

(Broker/Trigger)




事件网格是一种动态基础设施,旨在简化从发送者到接收者的事件分发。与Apache Kafka或RabbitMQ等传统消息通道架构类似,事件网格提供异步(存储转发)消息传递,允许及时解耦发送者和接收者。不过与传统的基于消息通道的集成模式不同,事件网格还通过将发送者/接收者与底层事件传输基础设施(可能是一组联合的解决方案,如Kafka、RabbitMQ或云提供商基础设施)解耦,简化了发送者和接收者的路由问题。事件网格通过任何环境中互连的事件Broker将事件从生产者传输到消费者,甚至以无缝和松散耦合的方式在云之间传输。


图片来源于Knative社区


如上图所示,Knative事件网格定义了事件入口和出口的Broker和TriggerAPI。Knative Eventing使用一种称为“鸭子类型”的模式,允许多种资源参与事件网格。鸭子类型允许多种资源类型宣传通用功能,例如“可以在URL处接收事件”或“可以将事件传递到目的地”。Knative Eventing使用这些功能来提供可互操作的源池,用于将事件发送到Broker和作为路由事件的目的地Trigger。Knative Eventing API包含三类API:


事件入口:支持连接事件发送者:Source duck type和SinkBinding,支持轻松配置应用程序以将事件传递到Broker。即使没有安装任何源,应用程序也可以提交事件并使用Eventing。


事件路由:Broker和Trigger对象支持定义网格和事件路由。请注意,Broker符合可寻址事件目标的定义,因此可以将事件从一个集群中的Broker中继到另一个集群中的Broker。同样,Trigger使用与许多源相同的Deliverable鸭子类型,因此很容易用事件网格替代直接传递事件。


事件出口:Deliverable契约支持指定裸URL或引用实现Addressable接口(具有)的Kubernetes对象status.address.url作为目的地。



- 事件源 -


Knative事件源


Knative社区提供了丰富的事件源支持,主要如下:


APIServerSource:将Kubernetes API服务器事件引入 Knative。每次创建、更新或删除Kubernetes资源时,APIServerSource都会触发一个新事件。


PingSource:定时发送一条Ping的事件通知。


Apache CouchDB:将Apache CouchDB消息传入Knative。


Apache Kafka:KafkaSource从Apache Kafka集群读取事件,并将这些事件传递到发送给事件消费端。


RabbitMQ


GitHub


GitLab


RedisSource


此外还支持Apache Camel、VMware等第三方事件源。



- 事件转发 -


Broker/Trigger事件转发流程


我们以InMemoryChannel (IMC) 事件处理流程为例,介绍 Knative中事件处理流程。


在Knative Eventing中使用Broker/Trigger模型需要选择相应的Channel, 也就是事件流转系统,当前社区支持Kafka、NATS Streaming、InMemoryChannel事件转发通道,默认是 InMemoryChannel。



其中几个关键组件说明如下:


Ingress: Broker/Trigger模型中,事件接收入口,用于接收事件并转发到对应的Channel服务上。


imc-dispatch: InMemoryChannel事件转发服务。用于接收Ingress过来的事件请求,并根据InMemoryChannel中描述的转发目标(subscription)Fan Out到Filter服务上。


Filter: 用于事件过滤,实现逻辑是基于Trigger中定义的过滤规则,进行事件过滤,并最终转发到对应的目标服务上。


以如下Broker示例进行解释说明:


apiVersion: eventing.knative.dev/v1kind: Brokermetadata:  generation: 1  name: default  namespace: defaultspec:  config:    apiVersion: v1    kind: ConfigMap    name: config-br-default-channel    namespace: knative-eventing  delivery:    backoffDelay: PT0.2S    backoffPolicy: exponential    retry: 10status:  address:    name: http    url: http://broker-ingress.knative-eventing.svc.cluster.local/default/default  annotations:    knative.dev/channelAPIVersion: messaging.knative.dev/v1    knative.dev/channelAddress: http://default-kne-trigger-kn-channel.default.svc.cluster.local    knative.dev/channelKind: InMemoryChannel    knative.dev/channelName: default-kne-trigger  ...


这里status.address.url 实际传入的参数: http://broker-ingress.knative-eventing.svc.cluster.local/{namespace}/{broker}。也就是说每个broker创建完成之后,会在Ingress服务中对应请求的Path。


Ingress接收到事件之后转发到哪里呢?它会根据status中 knative.dev/channelAddress地址进行转发。在 IMC Channel中,会转发到default-kne-trigger-kn-channel服务。


http://default-kne-trigger-kn-channel.default.svc.cluster.local


我们接着看这个default-kne-trigger-kn-channel服务对应哪里,它实际对应了imc-dispatcher pod,也就是说通过dispatcher进行转发。


kubectl get svc default-kne-trigger-kn-channelNAME                             TYPE           CLUSTER-IP   EXTERNAL-IP                                         PORT(S)   AGEdefault-kne-trigger-kn-channel   ExternalName   <none>       imc-dispatcher.knative-eventing.svc.cluster.local   80/TCP    98m


dispatcher中核心处理在Fanout Handler中,它负责将接收到的事件分发到不同的Subscription。


现在来看一下InMemoryChannel的配置, 里面定义了所转发的 Subscription:


prapiVersion: messaging.knative.dev/v1kind: InMemoryChannelmetadata:  labels:    eventing.knative.dev/broker: default    eventing.knative.dev/brokerEverything: "true"  name: default-kne-trigger  namespace: default  ownerReferences:  - apiVersion: eventing.knative.dev/v1    blockOwnerDeletion: true    controller: true    kind: Broker    name: default    uid: cb148e43-6e6c-45b0-a7b9-c5b1d81eeeb6spec:  delivery:    backoffDelay: PT0.2S    backoffPolicy: exponential    retry: 10  subscribers:  - delivery:      backoffDelay: PT0.2S      backoffPolicy: exponential      retry: 10    generation: 1    replyUri: http://broker-ingress.knative-eventing.svc.cluster.local/default/default    subscriberUri: http://broker-filter.knative-eventing.svc.cluster.local/triggers/default/my-service-trigger/f8df36a0-df4c-47cb-8c9b-1405111aa7dd    uid: 382fe07c-ce4d-409b-a316-9be0b585183astatus:  address:    name: http    url: http://default-kne-trigger-kn-channel.default.svc.cluster.local  ...  subscribers:  - observedGeneration: 1    ready: "True"    uid: 382fe07c-ce4d-409b-a316-9be0b585183a


这里对应的http://broker-filter.knative-eventing.svc.cluster.local/triggers/default/my-service-trigger/f8df36a0-df4c-47cb-8c9b-1405111aa7dd服务。也就是对应转发到broker filter服务上。


然后filter服务根据trigger中定义的过滤条件(filter属性)进行事件过滤,将过滤之后的事件发送到status中的subscriberUri访问地址。这里为http://event-display.default.svc.cluster.local。


apiVersion: eventing.knative.dev/v1kind: Triggermetadata:  labels:    eventing.knative.dev/broker: default  name: my-service-trigger  namespace: defaultspec:  broker: default  filter: {}  subscriber:    ref:      apiVersion: serving.knative.dev/v1      kind: Service      name: event-display      namespace: defaultstatus:  ...  observedGeneration: 1  subscriberUri: http://event-display.default.svc.cluster.local


到此,基于Broker/Trigger模型使用inmemorychannel channel转发的整个流程结束。


curl -v "http://172.16.85.64/default/default" -X POST -H "Ce-Id: 536808d3-88be-4077-9d7a-a3f162705f79" -H "Ce-Specversion: 1.0" -H "Ce-Type: dev.knative.samples.helloworld" -H "Ce-Source: dev.knative.samples/helloworldsource" -H "Content-Type: application/json" -d '{"msg":"Hello World from the curl pod."}'


2024/09/23 03:25:23 receive cloudevents.Event: %!(EXTRA string=Validation: valid Context Attributes,   specversion: 1.0   type: dev.knative.samples.helloworld   source: dev.knative.samples/helloworldsource   id: 536808d3-88be-4077-9d7a-a3f162705f79   time: 2024-09-23T03:25:03.355819672Z   datacontenttype: application/json Extensions,   knativearrivaltime: 2024-09-23T03:25:23.380731115Z Data,   {     "msg": "Hello World from the curl pod."   } )



- 事件编排 -


Knative Eventing提供了2类自定义资源(CRDs)用于定义事件编排流程:


Sequence :顺序事件处理流程


Parallel:并行事件处理流程


Sequence


Sequence是将多个Knative服务顺序编排在一起,并将处理之后的结果输出作为下一个服务的输入。配置示例如下:


apiVersion: flows.knative.dev/v1kind: Sequencemetadata:  name: sequencespec:  channelTemplate:    apiVersion: messaging.knative.dev/v1    kind: InMemoryChannel  steps:    - ref:        apiVersion: serving.knative.dev/v1        kind: Service        name: first    - ref:        apiVersion: serving.knative.dev/v1        kind: Service        name: second    - ref:        apiVersion: serving.knative.dev/v1        kind: Service        name: third  reply:    ref:      kind: Service      apiVersion: serving.knative.dev/v1      name: event-display


使用场景包括:


顺序处理


如图创建一个PingSource,将事件提供给Sequence,然后获取该Sequence的输出并显示结果输出。



Sequence连接另外一个Sequence


如图创建一个PingSource,将事件发送到Sequence服务中,然后获取该Sequence服务处理的输出并将其发送到第二个Sequence服务,最后显示结果输出。



直接处理


如图创建一个PingSource,将事件提供给Sequence。然后直接通过Sequence顺序执行服务。



通过Broker/Trigger模型


如图创建一个PingSource,将事件输入Broker,然后创建一个 Filter,将这些事件连接到由3个服务组成的Sequence中。然后,我们获取Sequence的输出,并将新创建的事件发送回Broker,并创建另一个Trigger,最后通过EventDisplay服务打印这些事件。



Parallel


Parallel是Knative Eventing中定义的并行处理工作流,配置示例如下:


apiVersion: flows.knative.dev/v1kind: Parallelmetadata:  name: demo-parallel  namespace: defaultspec:  branches:  - subscriber:      ref:        apiVersion: v1        kind: Service        name: demo-ksvc1        namespace: default  - subscriber:      ref:        apiVersion: v1        kind: Service        name: demo-ksvc2        namespace: default  channelTemplate:    apiVersion: messaging.knative.dev/v1    kind: InMemoryChannel


Parallel事件处理流程如图:



在Trigger中配置目标为Parallel即可,然后通过Parallel可以配置多个目标服务,系统默认创建相应的Subscription用于转发到对应的Knative Service服务。





与EventBridge结合




Knative Eventing中默认的InMemoryChannel是基于内存的Channel,目前社区并不推荐生产环境使用。建议使用Kafka、EventBridge等消息或事件驱动产品。


事件总线EventBridge是阿里云提供的一款无服务器事件总线服务,支持阿里云服务、自定义应用、SaaS应用以标准化、中心化的方式接入,并能够以标准化的CloudEvents 1.0协议在这些应用之间路由事件,帮助您轻松构建松耦合、分布式的事件驱动架构。


EventBridge支持多种数据源接入,通过配置事件总线、事件规则、事件目标,经过事件过滤、转换,可以触发EventBridge事件驱动Knative服务消费事件,实现资源的按需使用。技术架构图如下所示:



当前阿里云容器服务Knative中提供了一键配置Trigger的产品化能力,如图:



通过EventBridge投递事件效果如下:



与EventBridge结合的优势如下:


标准与生态


兼容CloudEvents协议,全面拥抱开源社区生态;


集成阿里云更多的事件源与事件目标处理服务,覆盖大部分用户的场景。


高吞吐和容灾能力


底层基于高吞吐、高可靠、多副本容灾的消息内核作为存储;


提供事件回放、事件轨迹追踪等差异化特性。


功能完善


简单灵活的配置,支持事件过滤和事件路由;


提供跨Region、混合云、多云的事件推送能力。


配套工具链


schema中心化存储与多语言映射,提升事件处理协作效率;


schema发现,自动注册与校验,IDE插件集成。


可观测性&可治理性


提供事件的可观测性能力,支持事件查询、审计以及全链路的追踪;


提供事件的可治理能力,支持事件流控、事件回放、事件重试策略等。





与KEDA的差异




说起K8s中事件驱动,KEDA也是一个广为熟知的事件驱动服务。 那我对于KEDA与Knative Eventing二者有什么不同,经常有些疑问。这里从我的角度来解释一下。其实从这二者的定位,就更容易进行区分。


先看KEDA官方定义:


KEDA is a Kubernetes-based Event Driven Autoscaler. With KEDA, you can drive the scaling of any container in Kubernetes based on the number of events needing to be processed.


KEDA是一个基于Kubernetes的事件驱动自动缩放器。借助 KEDA,您可以根据需要处理的事件数量来驱动Kubernetes中任何容器的扩缩容。我们可以简单理解为KEDA是支持多种指标能力增强HPA,它基于事件数量的指标,扩容Pod,然后Pod自身负责从对应的服务中获取事件进行消费。在整个事件驱动流程中KEDA并不会接管事件的流转,它仅根据事件指标扩缩容消费服务的Pod数量,可以说是一个非常简单、易上手的事件驱动弹性服务。


再看Knative Eventing:


Knative Eventing is a collection of APIs that enable you to use an event-driven architecture with your applications.


就像本文所介绍的,Knative Eventing则接管了事件的编排、流转、规则过滤、分发,提供了一个比较完整的事件驱动框架。


因此对于选择Knative Eventing和KEDA,最好结合自身的使用场景进行选择。





一个有趣的场景




最后在Knative中基于事件驱动分享一个简单的demo场景。


我们知道,口渴了,那么要喝水。人体执行这个简单的事情,处理流程可以简单抽象为下图:



那么在Knative中模拟这个场景,可以进行如下操作:


发送一个口渴的事件


千问大模型接收到口渴输入,给出决策


执行模拟喝水的服务



根据相关的健康饮水指引,上班族应该形成良好的喝水习惯,每1小时应该喝一次水。我们这里假设通过PingSource模拟每1个小时发送一个口渴的信号。


apiVersion: sources.knative.dev/v1kind: PingSourcemetadata:  name: ping-sourcespec:  schedule: "* */1 * * *"  contentType: "application/json"  data: '{"model": "qwen", "messages": [{"role": "user", "content": "渴了"}], "max_tokens": 10, "temperature": 0.7, "top_p": 0.9, "seed": 10}'  sink:    ref:      apiVersion: flows.knative.dev/v1      kind: Sequence      name: sequence


通过Sequence进行编排,将接收到的信号发送给qwen大模型服务(参考:基于Knative部署vLLM推理应用[1])。


apiVersion: flows.knative.dev/v1kind: Sequencemetadata:  name: sequencespec:  channelTemplate:    apiVersion: messaging.knative.dev/v1    kind: InMemoryChannel  steps:    - ref:        apiVersion: serving.knative.dev/v1        kind: Service        name: qwen  reply:    ref:      kind: Service      apiVersion: serving.knative.dev/v1      name: drink-svc


qwen服务给出决策结果如下:


{"id":"cmpl-6251aab6a0dc4932beb82714373db2ac","object":"chat.completion","created":1733899095,"model":"qwen","choices":[{"index":0,"message":{"role":"assistant","content":"如果你感到口渴,可以尝试喝一些水"},"logprobs":null,"finish_reason":"length","stop_reason":null}],"usage":{"prompt_tokens":10,"total_tokens":20,"completion_tokens":10}}


然后调用drink-svc喝水服务


apiVersion: serving.knative.dev/v1kind: Servicemetadata:  name: drink-svc  namespace: defaultspec:  template:    spec:      containers:      - image: registry.cn-hangzhou.aliyuncs.com/knative-sample/event-display:v1211-action        env:        - name: ACTION          value: "drink water"


这里直接打印输出结果:


# 日志输出结果如下所示:ACTION: drink water


最后我们再来看一张Nvidia对Agentic AI[2]场景定义的图片:



Agentic AI是具有更高程度自主性的AI系统,它们能够主动思考、规划和执行任务,而不仅仅依赖于预设的指令。因此在 Agentic AI应用场景下,或许Knative可以提供一些帮助。





小结




Knative Eventing提供了一款开源的云原生Serverless事件驱动框架,相信在Serverless与AI进行结合的场景中有更多的应用空间。欢迎有兴趣的加入阿里云Knative钉钉交流群(群号: 23302777)。


参考:


[1]基于Knative部署vLLM推理应用:


https://help.aliyun.com/zh/ack/ack-managed-and-ack-dedicated/user-guide/deploy-a-vllm-inference-application-based-on-knative


[2]What Is Agentic AI?


https://blogs.nvidia.com/blog/what-is-agentic-ai/


/ END /

阿里云基础设施
为了永不停机的计算服务
 最新文章