精通LangGraph-可控性02

文摘   2025-01-13 12:55   四川  

Map-reduce操作对于高效任务分解和并行处理至关重要。此方法涉及将任务分解为较小的子任务,并行处理每个子任务,并汇总所有已完成子任务的结果。
考虑这个例子:
给定一个来自用户的一般主题,生成相关主题列表,为每个主题生成一个笑话,并从结果列表中选择最佳笑话。
在这个设计模式中,第一个节点可能会生成一个对象列表(例如,相关主题),我们想要将其他节点(例如,生成一个笑话)应用于所有这些对象(例如,主题)。
然而,出现了两个主要挑战。
(1)当我们布置图形时,对象(例如,主体)的数量可能提前未知(意味着可能不知道边的数量)
(2)下游节点的输入状态应该不同(每个生成的对象一个)。
LangGraph 通过其 Send API 解决了这些挑战。通过利用条件边,Send 可以将不同的状态(例如,主题)分发到节点的多个实例(例如,笑话生成)。
重要的是,发送的状态可以与核心图的状态不同,从而允许灵活和动态的工作流管理。
图片的流程很好理解,我们看下send api到底是如何使用的

定义graph
import operatorfrom typing import Annotatedfrom typing_extensions import TypedDictfrom langchain_ollama import ChatOllamaimport base_conffrom langgraph.types import Sendfrom langgraph.graph import END, StateGraph, STARTfrom pydantic import BaseModel, Field# 模型和提示语# 定义我们将使用的模型和提示语subjects_prompt = """生成与以下主题相关的3到5个示例,以逗号分隔:{topic}。"""joke_prompt = """生成关于 {subject} 的笑话"""best_joke_prompt = """以下是关于 {topic} 的一些笑话。请选择最好的一条!返回最佳笑话的ID。{jokes}"""# 定义数据模型class Subjects(BaseModel):    subjects: list[str]class Joke(BaseModel):    joke: strclass BestJoke(BaseModel):    id: int = Field(description="最佳笑话的索引,从0开始", ge=0)# 初始化使用的模型model = ChatOllama(base_url=base_conf.base_url,model=base_conf.model_name,temperature=0.7)# 图的组件:定义构成状态图的组件# 这是主图的整体状态# 包含一个主题(由用户提供)# 然后生成一个主题列表,并为每个主题生成一个笑话class OverallState(TypedDict):    topic: str    subjects: list    # 注意这里我们使用了 operator.add    # 因为我们需要将每个节点生成的笑话合并到一个列表中    # 这相当于 "reduce" 操作    jokes: Annotated[list, operator.add]    best_selected_joke: str# 这是节点的状态定义,我们将对所有生成的主题进行映射以生成笑话class JokeState(TypedDict):    subject: str# 生成笑话主题的函数def generate_topics(state: OverallState):    prompt = subjects_prompt.format(topic=state["topic"])    response = model.with_structured_output(Subjects).invoke(prompt)    return {"subjects": response.subjects}# 生成笑话的函数,根据主题生成笑话def generate_joke(state: JokeState):    prompt = joke_prompt.format(subject=state["subject"])    response = model.with_structured_output(Joke).invoke(prompt)    return {"jokes": [response.joke]}# 定义逻辑,将生成的主题映射到生成笑话的节点# 我们将使用此函数作为图中的边def continue_to_jokes(state: OverallState):    # 返回一组 `Send` 对象    # 每个 `Send` 对象包含图中节点的名称和发送到该节点的状态    return [Send("generate_joke", {"subject": s}) for s in state["subjects"]]# 选择最佳笑话的函数def best_joke(state: OverallState):    jokes = "\n\n".join(state["jokes"])    prompt = best_joke_prompt.format(topic=state["topic"], jokes=jokes)    response = model.with_structured_output(BestJoke).invoke(prompt)    return {"best_selected_joke": state["jokes"][response.id]}# 构建状态图:将所有部分组合起来构建状态图graph = StateGraph(OverallState)graph.add_node("generate_topics", generate_topics)graph.add_node("generate_joke", generate_joke)graph.add_node("best_joke", best_joke)graph.add_edge(START, "generate_topics")graph.add_conditional_edges("generate_topics", continue_to_jokes, ["generate_joke"])graph.add_edge("generate_joke""best_joke")graph.add_edge("best_joke", END)# 编译图,生成应用程序app = graph.compile()for s in app.stream({"topic""动物"}):    print(s)
回答:
{'generate_topics': {'subjects': ['动物保护的重要性''与动物互动对人类的好处''动物园中的科技应用']}}{'generate_joke': {'jokes': ['为什么动物园的动物都不参加保护组织?因为他们都已经在‘圈’子里了!']}}{'generate_joke': {'jokes': ['为什么程序员喜欢和猫一起工作?因为猫来猫往,调试起来方便!']}}{'generate_joke': {'jokes': ['动物园里的科技真发达,大熊猫用手机一刷就进竹林,连管理员都感叹:这是要搞‘熊猫经济’吗?']}}{'best_joke': {'best_selected_joke''为什么动物园的动物都不参加保护组织?因为他们都已经在‘圈’子里了!'}}
核心代码是这一句
def continue_to_jokes(state: OverallState):    # 返回一组 `Send` 对象    # 每个 `Send` 对象包含图中节点的名称和发送到该节点的参数    return [Send("generate_joke", {"subject": s}) for s in state["subjects"]]
graph.add_conditional_edges("generate_topics", continue_to_jokes, ["generate_joke"])
他的核心思路是动态生成对应的node,然后将这些node route到generate_joke节点,最后获得所有的joke选择最好笑的。
是不是很有趣,你也尝试一下吧!!!

参考链接:
https://langchain-ai.github.io/langgraph/how-tos/map-reduce/

半夏决明
读书,摄影,随笔
 最新文章