作者陈迪豪——顺丰科技AI技术平台高级工程师
负责顺丰集团AI和大模型基础架构功能,曾任第四范式平台架构师和OpenMLDB项目PMC,过去在小米担心云深度学习平台架构师以及优思德云计算公司存储和容器团队负责人。活跃于>分布式系统、机器学习相关的开源社区,也是HBase、OpenStack、TensorFlow、TVM等开源项目贡献者。
简介
请首先阅读前文《tobe:深入浅出 OpenAI Swarm 源码一:多 Agent 调度框架概念抽象》,本文将介绍 OpenAI Swarm 的多 Agent 完整调度流程,从源码角度介绍其实现原理。
其他领域抽象
这里介绍剩下的两个领域抽象,包括 Response 和 Result,其中 Response 源码如下。
class Response(BaseModel):
messages: List = []
agent: Optional[Agent] = None
context_variables: dict = {}
这个表示一次 Agent run 返回的结果,同时也是 handle_tool_calls 返回的对象,因为 Agent run 的逻辑第一步是调用 Agent 本身的大模型来分析需要调用的 function,第二步就是调用 handle_tool_calls 函数,因此两者可以统一都认为是用户定义的 Python function 的返回结果。
调用 handle_tool_calls 函数返回的其实是 partial_response,第一变量是大模型返回的消息组成的结果,例如大模型返回的 tool call 的 id,用户定义的 Python 函数的 __name__ 对象,以及执行 Python 函数的返回结果。
[{
'role': 'assistant',
'content': None,
'refusal': None,
'audio': None,
'function_call': None,
'tool_calls': [
{
'id': 'call_hUPRYkX5pcL4FxGQRYRTCpnf',
'function': {
'arguments': '{}',
'name': 'transfer_to_refunds'
},
'type': 'function'
}
],
'sender': 'Triage Agent'
},{
'role': 'tool',
'tool_call_id': 'call_hUPRYkX5pcL4FxGQRYRTCpnf',
'tool_name': 'transfer_to_refunds',
'content': '{"assistant": "Refunds Agent"}',
}]
第二个参数是是否返回新的 Agent,第三个参数是全局的 context_variables。然后这个 partial_response 会一直加到 history 中,因此 Agent run 返回的 Response 对象除了第一个参数是包含历史所有 history message 的,另外两个参数基本是继承 handle_tool_calls 返回的。
还有一个领域概念是 Result,源码如下。
class Result(BaseModel):
value: str = ""
agent: Optional[Agent] = None
context_variables: dict = {}
这个其实只有处理用户 Python 函数结果时用到,代码如下。
raw_result = function_map[name](**args)
result: Result = self.handle_function_result(raw_result, debug)
首先通过 Python 的反射机制运行函数,然后对 Python 函数返回的对象封装成 Result 对象,代码如下。
def handle_function_result(self, result, debug) -> Result:
match result:
case Result() as result:
return result
case Agent() as agent:
return Result(
value=json.dumps({"assistant": agent.name}),
agent=agent,
)
case _:
try:
return Result(value=str(result))
except Exception as e:
error_message = f"Failed to cast response to string: {result}. Make sure agent functions return a string or Result object. Error: {str(e)}"
debug_print(debug, error_message)
raise TypeError(error_message)
这里逻辑就是用户如果返回 Agent 对象,那么就设置 Result 的 agent 参数为 Agent 对象,其他情况就只设置 value 参数,这个 value 其实就是 Agent 最终返回的结果。
run 函数执行流程
了解前面的概念抽象,我们可以直接阅读 run 函数,对关键步骤抽象如下。
def run() -> Response:
......
while len(history) - init_len < max_turns and active_agent:
# get completion with current history, agent
completion = self.get_chat_completion(
agent=active_agent,
history=history,
context_variables=context_variables,
model_override=model_override,
stream=stream,
debug=debug,
)
message = completion.choices[0].message
......
if not message.tool_calls or not execute_tools:
debug_print(debug, "Ending turn.")
break
......
# handle function calls, updating context_variables, and switching agents
partial_response = self.handle_tool_calls(
message.tool_calls, active_agent.functions, context_variables, debug
)
......
if partial_response.agent:
active_agent = partial_response.agent
return Response(
messages=history[init_len:],
agent=active_agent,
context_variables=context_variables,
)
从主流程可以看到,Agent 的运行也是一个循环,首先是用 Agent 本身的 system prompt 和 functions 来调用大模型输出,如果直接返回结果没有使用 function,那么就可以直接返回结果了。如果有要使用的 function,那么就需要调用 handle_tool_calls 函数来执行 function,并且处理函数返回的结果,如果返回结果是一个 Agent,那么把 active agent 改为前面返回的 Agent 继续调用大模型,这样就可以循环多次来调用了。
总结一些多 Agent 调度流程,首先是基于大模型的 Function call 来实现的,如果 function 返回 Agent 对象则 handoff 给 Agent 继续执行。讲人话就是每个 Agent 可以注册很多工具,其中一些工具是移交工作,大模型会判断是使用工具或者直接输出,使用工具的话可以直接返回,也可以使用移交工作的工具让其他 Agent 在这个上下文中执行。
handle_tool_calls 函数执行流程
为了了解前面介绍的 funtions 是如何工作的,可以查看 handle_tool_calls 函数的实现。
def handle_tool_calls() -> Response:
function_map = {f.__name__: f for f in functions}
......
for tool_call in tool_calls:
name = tool_call.function.name
......
func = function_map[name]
......
raw_result = function_map[name](**args)
result: Result = self.handle_function_result(raw_result, debug)
......
if result.agent:
partial_response.agent = result.agent
return partial_response
首先是获取这个 Agent 的 functions 函数名,然后根据前面大模型的输出拿到需要调用的 function 的名字,通过大模型返回的 args 参数,使用 Python 函数调用的方法执行。这里对结果进行了额外的处理,调用的 handle_function_result 函数在前面也介绍过,就是如果有返回 Agent 对象就单独提取出来。每个 function 执行的结果都会添加到返回的 Response 对象中,并且合并 context_variables ,如果其中一个 function 返回了 Agent 则后面也要基于新的 Agent 继续执行。
理解多 Agent 调度
在了解 Swarm 的 run 函数逻辑之前,首先介绍 Agent 的概念抽象。Agent 类的定义也比较简单,代码如下。
class Agent(BaseModel):
n
看完了 OpenAI Swarm 全部的实现代码,可以概括一下多 Agent 的调度实现原理。首先是使用大模型本身的 function call 能力,用户在 Python 中定义的函数会结构化成 JSON 字符串添加到大模型的请求参数重,大模型选择执行 function 后,如果其中至少一个 function 返回了 Agent 对象,那么就基于新 Agent 和历史消息继续执行,从而完成 handoff 或者所谓调度的工作。
总结
从 OpenAI Swarm 的项目文档中,可以了解这是一个轻量级的调度工具,并且提供了 Swarm、Agent、handoff 等概念和功能,从而实现复杂场景的自动化路由或者调度功能。但通过阅读源码,我们发现原理比描述更简单,就是使用 Function call 能力,并且提供 Python 函数的友好接口,让用户定义 Agent 或者 handoff 更加简单。
想要了解更多Agent应用落地情况,就来11月8-9日,在深圳举办的“AI+研发数字峰会(AiDD)吧,我将带来《改造 Dify 实现生产可用的 AI Agent 应用落地》主题演讲,将会介绍顺丰科技内部对 Dify 的改造,以及实际的应用落地案例。
推荐活动