作者陈迪豪——顺丰科技AI技术平台高级工程师
负责顺丰集团AI和大模型基础架构功能,曾任第四范式平台架构师和OpenMLDB项目PMC,过去在小米担心云深度学习平台架构师以及优思德云计算公司存储和容器团队负责人。活跃于分布式系统、机器学习相关的开源社区,也是HBase、OpenStack、TensorFlow、TVM等开源项目贡献者。
简介
OpenAI Swarm 是 OpenAI 开源的多 Agent 调度框架,全部实现逻辑仅用不到500行 Python 代码,就能实现多 Agent 的调度和交互式多轮对话等功能。
本文将从源码角度,深度剖析 OpenAI Swarm 的概念抽象和实现原理,从最底层的代码视角了解多 Agent 间的交互流程,以及从调度框架本身去拓展更多更高级的功能。
用户接口
OpenAI Swarm 项目提供了多个 examples 使用案例,主要包括单轮对话和交互式多轮对话的功能。单轮对话中,提出了 Swarm 和 Agent 的代码抽象,请看下面的代码例子。
from swarm import Swarm, Agent
client = Swarm()
def transfer_to_agent_b():
return agent_b
agent_a = Agent(
name="Agent A",
instructions="You are a helpful agent.",
functions=[transfer_to_agent_b],
)
agent_b = Agent(
name="Agent B",
instructions="Only speak in Haikus.",
)
response = client.run(
agent=agent_a,
messages=[{"role": "user", "content": "I want to talk to agent B."}],
)
print(response.messages[-1]["content"])
执行上面的代码可以得到一个大模型的文本输出,打开 debug 日志可以看出实际代码会调用多次 OpenAI API 并且经过多个 Agent 的调度返回最终结果,后面会详细介绍 Swarm 和 Agent 的代码逻辑。
交互式多轮对接接口是一个 Python 函数封装,大部分 examples 中都提供了类似下面的入口函数。
from swarm.repl import run_demo_loop
from agents import triage_agent
if __name__ == "__main__":
run_demo_loop(triage_agent)
执行上面的代码,首先会等待用户输入,然后经过多轮 Agent 计算,得到结果后,继续等待用户输入。其中 run_demo_loop 函数实现逻辑比较简单可以提前介绍。
交互式多轮对话实现原理
OpenAI Swarm 提供了一个简单的 run_demo_loop 函数,提供一个命令行交互的接口,可以方便用户多次输入与多 Agent 进行交互。函数实现也非常简单,代码如下。
def run_demo_loop(
starting_agent, context_variables=None, stream=False, debug=False
) -> None:
client = Swarm()
print("Starting Swarm CLI ")
messages = []
agent = starting_agent
while True:
user_input = input("\033[90mUser\033[0m: ")
messages.append({"role": "user", "content": user_input})
response = client.run(
agent=agent,
messages=messages,
context_variables=context_variables or {},
stream=stream,
debug=debug,
)
if stream:
response = process_and_print_streaming_response(response)
else:
pretty_print_messages(response.messages)
messages.extend(response.messages)
agent = response.agent
首先进入一个 while True 的死循环,通过 Python input 函数获取用户的命令行输入,然后使用 Swarm 对象的接口来启动第一个 Agent,并且输入的内容为前面用户在命令行输入的字符串,对于流式和非流式的参数都提供一个 print 函数,可以打印最终返回的 Agent 名以及 Agent 输出。注意,这里一个用户提问可能涉及到多个 Agent 的多次函数调用或者多个 Agent 输出,这里只会输出最终的 Agent 信息,然后进入下一次用户输入和返回的循环。
为了了解 其中更底层的实现细节,下面进入 Swarm 和 Agent 接口的底层实现。
Swarm 的领域抽象
前面无论是单轮对话还是交互式多轮对话,所有 Agent 调度的入口都是 Swarm 类以及 run 函数,因此我们首先看一下 Swarm 类的定义,简化一下具体实现得到下面的类定义。
class Swarm:
def __init__(self, client=None):
self.client = client
def run() -> Response
def get_chat_completion() -> ChatCompletionMessage
def handle_function_result() -> Result
def handle_tool_calls() -> Response
def run_and_stream() -> None
首先 Swarm 是一个工具类,只有一个成员变量就是访问 OpenAI API 大模型的客户端,然后提供了 run 函数来运行各个 Agent ,其他都是 run 函数使用的内部工具函数。由于 run 的具体逻辑涉及 Agent 对象的内部函数,这里先关注和 Agent 无关的抽象。
首先是 run 函数返回的 Response 对象,其实流式的 run 也会 yield Response 对象,这个对象的定义如下。
class Response(BaseModel):
messages: List = []
agent: Optional[Agent] = None
context_variables: dict = {}
这个就是用户使用 Swarm 框架可以得到的返回结果,其中第一个对象 messages 对象是历史所有 Agent 的输出历史,其中一个示例如下,不仅包含大模型的输出,还有是否调用 function 和 tools 等信息。
[{'content': '中国的首都是北京。', 'refusal': None, 'role': 'assistant', 'audio': None, 'function_call': None, 'tool_calls': None, 'sender': 'agent'}]
第二个对象是 agent ,如果为空说明不需要继续调用其他 Agent,如果不为空,那么在 run 函数里有这样一个逻辑,判断存在需要继续调用的 Agent 则会继续循环调用。
while len(history) - init_len < max_turns and active_agent:
......
partial_response = self.handle_tool_calls(
message.tool_calls, active_agent.functions, context_variables, debug
)
......
if partial_response.agent:
active_agent = partial_response.agent
......
第三个对象是全局的 context_variables,这个对象会传递给每一个 Agent 和 Agent 的 functions,用户可以增加任意元素到这个全局 map 中方便 function 得到更多全局信息,这个会在后面 Agent 部分详细展开。
Agent 的领域抽象
在了解 Swarm 的 run 函数逻辑之前,首先介绍 Agent 的概念抽象。Agent 类的定义也比较简单,代码如下。
class Agent(BaseModel):
name: str = "Agent"
model: str = "gpt-4o"
instructions: Union[str, Callable[[], str]] = "You are a helpful agent."
functions: List[AgentFunction] = []
tool_choice: str = None
parallel_tool_calls: bool = True
首先每个 Agent 有一个字符串的名字,这个名字主要是为了在调用 OpenAI API 大模型时,可以配置 sender 参数,并且后期可以通过打印 sender 名来了解输出是来自哪个 Agent,而实际 Agent 间的依赖关系其实是通过 Python 的 Agent 类对象来关联的,与是否配置 name 关系不大。
第二个参数是使用的大模型,在 Swarm 对象中规定了使用的 OpenAI API 服务,因此所有的 Agent 目前只能使用同一个大模型服务,但不同 Agent 可以在 model 属性中定义使用的大模型,处理不同复杂度任务的 Agent 需要用到的模型参数也不同,在 Agent 级别选择模型也是非常合理的。
第三个参数是 instructions ,理论上可以改名为 system prompt,因此这个参数就是为了生成大模型请求时的 system prompt 参数。这里支持的参数除了可以是 string 类型外,还支持传入 Python 函数,这个函数可以从前面提到的 context_variables 来获取信息,从而生成一个较为复杂的 system prompt,用法如下。
def instructions(context_variables):
user_name = context_variables["user_name"]
return f"Help the user, {user_name}, do whatever they want."
agent = Agent(
instructions=instructions
)
显然 Agent 类的 instructions 变量的类型定义有 bug,应该修改为下面的代码,社区还没有接受这个 PR 的修复 https://github.com/openai/swarm/pull/44/files 。
instructions: Union[str, Callable[[dict[str, Any]], str]] = "You are a helpful agent."
第四个参数是 functions,也就是一组 AgentFunction 对象,AgentFunction 类型定义如下。
AgentFunction = Callable[[], Union[str, "Agent", dict]]
这个其实就是一个标准的 Python 函数,也就是 OpenAI Swarm 框架要求的传给 Agent 的函数类型,实际上这里要求返回的字符串、“Agent” 或者字典对象也是一个 bug,因为处理 Agent 函数的结果是用下面的模式匹配。
def handle_function_result(self, result, debug) -> Result:
match result:
case Result() as result:
return result
case Agent() as agent:
return Result(
value=json.dumps({"assistant": agent.name}),
agent=agent,
)
case _:
try:
return Result(value=str(result))
except Exception as e:
error_message = f"Failed to cast response to string: {result}. Make sure agent functions return a string or Result object. Error: {str(e)}"
debug_print(debug, error_message)
raise TypeError(error_message)
在下面的处理逻辑中,如果用户函数返回 str 或者 “Agent” 或者 “dict”,都会被简单认为字符串处理,在大部分 examples 代码中 Agent 函数都是直接返回 Agent 对象。由于 OpenAI Swarm 官方项目关闭了 Issue 和 PR 评论,因此这部分代码修改计划在 orchard-swarm 中维护 https://github.com/OrchardUniverse/orchard-swarm 。
最后第五第六个参数 tool_choice 和 parallel_tool_calls 是 OpenAI Chat Completions API 的参数,可以控制工具的调用,但目前 OpenAI Swarm 项目不会用到和修改这个参数,因此可以直接忽略。
总结
目前我们已经深度解读了 OpenAI Swarm 项目对于 Swarm 工具类和 Agent 抽象类的定义,在此基础上我们要理解 Agent 的调度以及大模型的调用逻辑就非常简单了。
想要了解更多Agent应用落地情况,就来11月8-9日,在深圳举办的“AI+研发数字峰会(AiDD)吧,我将带来《改造 Dify 实现生产可用的 AI Agent 应用落地》主题演讲,将会介绍顺丰科技内部对 Dify 的改造,以及实际的应用落地案例。
推荐活动