深入浅出 OpenAI Swarm 源码二:多 Agent 框架调度流程

文摘   2024-10-31 16:03   北京  


作者陈迪豪——顺丰科技AI技术平台高级工程师

负责顺丰集团AI和大模型基础架构功能,曾任第四范式平台架构师和OpenMLDB项目PMC,过去在小米担心云深度学习平台架构师以及优思德云计算公司存储和容器团队负责人。活跃于>分布式系统、机器学习相关的开源社区,也是HBase、OpenStack、TensorFlow、TVM等开源项目贡献者。


简介

请首先阅读前文《tobe:深入浅出 OpenAI Swarm 源码一:多 Agent 调度框架概念抽象》,本文将介绍 OpenAI Swarm 的多 Agent 完整调度流程,从源码角度介绍其实现原理。

其他领域抽象

这里介绍剩下的两个领域抽象,包括 Response 和 Result,其中 Response 源码如下。

class Response(BaseModel):
messages: List = []
agent: Optional[Agent] = None
context_variables: dict = {}

这个表示一次 Agent run 返回的结果,同时也是 handle_tool_calls 返回的对象,因为 Agent run 的逻辑第一步是调用 Agent 本身的大模型来分析需要调用的 function,第二步就是调用 handle_tool_calls 函数,因此两者可以统一都认为是用户定义的 Python function 的返回结果。

调用 handle_tool_calls 函数返回的其实是 partial_response,第一变量是大模型返回的消息组成的结果,例如大模型返回的 tool call 的 id,用户定义的 Python 函数的 __name__ 对象,以及执行 Python 函数的返回结果。

[{
'role': 'assistant',
'content': None,
'refusal': None,
'audio': None,
'function_call': None,
'tool_calls': [
{
'id': 'call_hUPRYkX5pcL4FxGQRYRTCpnf',
'function': {
'arguments': '{}',
'name': 'transfer_to_refunds'
},
'type': 'function'
}
],
'sender': 'Triage Agent'
},{
'role': 'tool',
'tool_call_id': 'call_hUPRYkX5pcL4FxGQRYRTCpnf',
'tool_name': 'transfer_to_refunds',
'content': '{"assistant": "Refunds Agent"}',
}]

第二个参数是是否返回新的 Agent,第三个参数是全局的 context_variables。然后这个 partial_response 会一直加到 history 中,因此 Agent run 返回的 Response 对象除了第一个参数是包含历史所有 history message 的,另外两个参数基本是继承 handle_tool_calls 返回的。

还有一个领域概念是 Result,源码如下。

class Result(BaseModel):
value: str = ""
agent: Optional[Agent] = None
context_variables: dict = {}

这个其实只有处理用户 Python 函数结果时用到,代码如下。

raw_result = function_map[name](**args)

result: Result = self.handle_function_result(raw_result, debug)

首先通过 Python 的反射机制运行函数,然后对 Python 函数返回的对象封装成 Result 对象,代码如下。

    def handle_function_result(self, result, debug) -> Result:
match result:
case Result() as result:
return result

case Agent() as agent:
return Result(
value=json.dumps({"assistant": agent.name}),
agent=agent,
)
case _:
try:
return Result(value=str(result))
except Exception as e:
error_message = f"Failed to cast response to string: {result}. Make sure agent functions return a string or Result object. Error: {str(e)}"
debug_print(debug, error_message)
raise TypeError(error_message)

这里逻辑就是用户如果返回 Agent 对象,那么就设置 Result 的 agent 参数为 Agent 对象,其他情况就只设置 value 参数,这个 value 其实就是 Agent 最终返回的结果。


run 函数执行流程

了解前面的概念抽象,我们可以直接阅读 run 函数,对关键步骤抽象如下。

    def run() -> Response:

......

while len(history) - init_len < max_turns and active_agent:

# get completion with current history, agent
completion = self.get_chat_completion(
agent=active_agent,
history=history,
context_variables=context_variables,
model_override=model_override,
stream=stream,
debug=debug,
)
message = completion.choices[0].message

......

if not message.tool_calls or not execute_tools:
debug_print(debug, "Ending turn.")
break

......

# handle function calls, updating context_variables, and switching agents
partial_response = self.handle_tool_calls(
message.tool_calls, active_agent.functions, context_variables, debug
)

......

if partial_response.agent:
active_agent = partial_response.agent

return Response(
messages=history[init_len:],
agent=active_agent,
context_variables=context_variables,
)

从主流程可以看到,Agent 的运行也是一个循环,首先是用 Agent 本身的 system prompt 和 functions 来调用大模型输出,如果直接返回结果没有使用 function,那么就可以直接返回结果了。如果有要使用的 function,那么就需要调用 handle_tool_calls 函数来执行 function,并且处理函数返回的结果,如果返回结果是一个 Agent,那么把 active agent 改为前面返回的 Agent 继续调用大模型,这样就可以循环多次来调用了。

总结一些多 Agent 调度流程,首先是基于大模型的 Function call 来实现的,如果 function 返回 Agent 对象则 handoff 给 Agent 继续执行。讲人话就是每个 Agent 可以注册很多工具,其中一些工具是移交工作,大模型会判断是使用工具或者直接输出,使用工具的话可以直接返回,也可以使用移交工作的工具让其他 Agent 在这个上下文中执行。


handle_tool_calls 函数执行流程

为了了解前面介绍的 funtions 是如何工作的,可以查看 handle_tool_calls 函数的实现。

    def handle_tool_calls() -> Response:
function_map = {f.__name__: f for f in functions}

......

for tool_call in tool_calls:
name = tool_call.function.name

......

func = function_map[name]

......

raw_result = function_map[name](**args)

result: Result = self.handle_function_result(raw_result, debug)

......

if result.agent:
partial_response.agent = result.agent

return partial_response

首先是获取这个 Agent 的 functions 函数名,然后根据前面大模型的输出拿到需要调用的 function 的名字,通过大模型返回的 args 参数,使用 Python 函数调用的方法执行。这里对结果进行了额外的处理,调用的 handle_function_result 函数在前面也介绍过,就是如果有返回 Agent 对象就单独提取出来。每个 function 执行的结果都会添加到返回的 Response 对象中,并且合并 context_variables ,如果其中一个 function 返回了 Agent 则后面也要基于新的 Agent 继续执行。


理解多 Agent 调度

在了解 Swarm 的 run 函数逻辑之前,首先介绍 Agent 的概念抽象。Agent 类的定义也比较简单,代码如下。

class Agent(BaseModel):
n
看完了 OpenAI Swarm 全部的实现代码,可以概括一下多 Agent 的调度实现原理。首先是使用大模型本身的 function call 能力,用户在 Python 中定义的函数会结构化成 JSON 字符串添加到大模型的请求参数重,大模型选择执行 function 后,如果其中至少一个 function 返回了 Agent 对象,那么就基于新 Agent 和历史消息继续执行,从而完成 handoff 或者所谓调度的工作。


总结

从 OpenAI Swarm 的项目文档中,可以了解这是一个轻量级的调度工具,并且提供了 Swarm、Agent、handoff 等概念和功能,从而实现复杂场景的自动化路由或者调度功能。但通过阅读源码,我们发现原理比描述更简单,就是使用 Function call 能力,并且提供 Python 函数的友好接口,让用户定义 Agent 或者 handoff 更加简单。

想要了解更多Agent应用落地情况,就来11月8-9日,在深圳举办的AI+研发数字峰会(AiDD)吧,我将带来改造 Dify 实现生产可用的 AI Agent 应用落地》主题演讲,将会介绍顺丰科技内部对 Dify 的改造,以及实际的应用落地案例。 

AiDD峰会以AI驱动研发变革,促进企业降本增效”为主题。邀请60+工业界和学术界的技术专家与会分享,聚焦“AI+产品、开发、测试、工程、领域”5大条线,设立15大论坛,内容覆盖内容涵盖当下实时热点:AI Agents、大模型对齐与安全、端侧大模型与云端协同、领域大模型SFT与优化、知识增强与数据智能、AI+流程自动化、AI驱动产品及设计创新、LLM驱动编程与单测等精彩内容,聚焦于实践操作与经验共享。欢迎各位技术大佬共赴盛会学习交流!




推荐活动




由AiDD组委会联合多个社区发起的「2024软件研发应用大模型」调查结束了,报告将在11月8日AiDD峰会深圳站主会场Keynote重磅发布。想要线上下载的同学,可以关注「中智凯灵」公众号,输入“AiDD2024调查”即可免费预约下载。



中智凯灵
中智凯灵(KeyLink)是国内领先的专业数字人才发展平台,面向科技研发型企业和组织提供数字化人才培养的专属成长地图,数字化转型的方法 + 智库。
 最新文章