Map-reduce操作对于高效任务分解和并行处理至关重要。此方法涉及将任务分解为较小的子任务,并行处理每个子任务,并汇总所有已完成子任务的结果。给定一个来自用户的一般主题,生成相关主题列表,为每个主题生成一个笑话,并从结果列表中选择最佳笑话。在这个设计模式中,第一个节点可能会生成一个对象列表(例如,相关主题),我们想要将其他节点(例如,生成一个笑话)应用于所有这些对象(例如,主题)。(1)当我们布置图形时,对象(例如,主体)的数量可能提前未知(意味着可能不知道边的数量)(2)下游节点的输入状态应该不同(每个生成的对象一个)。LangGraph 通过其 Send API 解决了这些挑战。通过利用条件边,Send 可以将不同的状态(例如,主题)分发到节点的多个实例(例如,笑话生成)。重要的是,发送的状态可以与核心图的状态不同,从而允许灵活和动态的工作流管理。图片的流程很好理解,我们看下send api到底是如何使用的
import operator
from typing import Annotated
from typing_extensions import TypedDict
from langchain_ollama import ChatOllama
import base_conf
from langgraph.types import Send
from langgraph.graph import END, StateGraph, START
from pydantic import BaseModel, Field
# 模型和提示语
# 定义我们将使用的模型和提示语
subjects_prompt = """生成与以下主题相关的3到5个示例,以逗号分隔:{topic}。"""
joke_prompt = """生成关于 {subject} 的笑话"""
best_joke_prompt = """以下是关于 {topic} 的一些笑话。请选择最好的一条!返回最佳笑话的ID。
{jokes}"""
# 定义数据模型
class Subjects(BaseModel):
subjects: list[str]
class Joke(BaseModel):
joke: str
class BestJoke(BaseModel):
id: int = Field(description="最佳笑话的索引,从0开始", ge=0)
# 初始化使用的模型
model = ChatOllama(base_url=base_conf.base_url,model=base_conf.model_name,temperature=0.7)
# 图的组件:定义构成状态图的组件
# 这是主图的整体状态
# 包含一个主题(由用户提供)
# 然后生成一个主题列表,并为每个主题生成一个笑话
class OverallState(TypedDict):
topic: str
subjects: list
# 注意这里我们使用了 operator.add
# 因为我们需要将每个节点生成的笑话合并到一个列表中
# 这相当于 "reduce" 操作
jokes: Annotated[list, operator.add]
best_selected_joke: str
# 这是节点的状态定义,我们将对所有生成的主题进行映射以生成笑话
class JokeState(TypedDict):
subject: str
# 生成笑话主题的函数
def generate_topics(state: OverallState):
prompt = subjects_prompt.format(topic=state["topic"])
response = model.with_structured_output(Subjects).invoke(prompt)
return {"subjects": response.subjects}
# 生成笑话的函数,根据主题生成笑话
def generate_joke(state: JokeState):
prompt = joke_prompt.format(subject=state["subject"])
response = model.with_structured_output(Joke).invoke(prompt)
return {"jokes": [response.joke]}
# 定义逻辑,将生成的主题映射到生成笑话的节点
# 我们将使用此函数作为图中的边
def continue_to_jokes(state: OverallState):
# 返回一组 `Send` 对象
# 每个 `Send` 对象包含图中节点的名称和发送到该节点的状态
return [Send("generate_joke", {"subject": s}) for s in state["subjects"]]
# 选择最佳笑话的函数
def best_joke(state: OverallState):
jokes = "\n\n".join(state["jokes"])
prompt = best_joke_prompt.format(topic=state["topic"], jokes=jokes)
response = model.with_structured_output(BestJoke).invoke(prompt)
return {"best_selected_joke": state["jokes"][response.id]}
# 构建状态图:将所有部分组合起来构建状态图
graph = StateGraph(OverallState)
graph.add_node("generate_topics", generate_topics)
graph.add_node("generate_joke", generate_joke)
graph.add_node("best_joke", best_joke)
graph.add_edge(START, "generate_topics")
graph.add_conditional_edges("generate_topics", continue_to_jokes, ["generate_joke"])
graph.add_edge("generate_joke", "best_joke")
graph.add_edge("best_joke", END)
# 编译图,生成应用程序
app = graph.compile()
for s in app.stream({"topic": "动物"}):
print(s)
{'generate_topics': {'subjects': ['动物保护的重要性', '与动物互动对人类的好处', '动物园中的科技应用']}}
{'generate_joke': {'jokes': ['为什么动物园的动物都不参加保护组织?因为他们都已经在‘圈’子里了!']}}
{'generate_joke': {'jokes': ['为什么程序员喜欢和猫一起工作?因为猫来猫往,调试起来方便!']}}
{'generate_joke': {'jokes': ['动物园里的科技真发达,大熊猫用手机一刷就进竹林,连管理员都感叹:这是要搞‘熊猫经济’吗?']}}
{'best_joke': {'best_selected_joke': '为什么动物园的动物都不参加保护组织?因为他们都已经在‘圈’子里了!'}}
def continue_to_jokes(state: OverallState):
# 返回一组 `Send` 对象
# 每个 `Send` 对象包含图中节点的名称和发送到该节点的参数
return [Send("generate_joke", {"subject": s}) for s in state["subjects"]]
graph.add_conditional_edges("generate_topics", continue_to_jokes, ["generate_joke"])
他的核心思路是动态生成对应的node,然后将这些node route到generate_joke节点,最后获得所有的joke选择最好笑的。
https://langchain-ai.github.io/langgraph/how-tos/map-reduce/