精通LangGraph-可控性01

文摘   2025-01-11 12:10   四川  

LangGraph 对图表的执行提供了高级别的控制。

如何创建并行执行的分支

node的并行执行对于加快整体graph操作至关重要。LangGraph 提供对节点并行执行的原生支持,可显著提高基于图形的工作流的性能。这种并行化是通过扇出和扇入机制实现的,利用标准edges和条件 edges。
pip install -U langgraph
Parallel node fan-out 和 fan-in

在这个例子中,我们从节点 A 散开到节点 B 和 C,然后扇入到节点 D。通过我们的状态,我们指定了 Reducer 添加操作。

这将合并或累积state中特定key的value,而不是简单地覆盖现有value。对于list,这意味着将新列表与现有列表join起来。

请注意,LangGraph 使用Annotated类型来为 State 中的特定键指定 Reducer 函数:它维护原始类型(列表)以进行类型检查,但允许将 Reducer 函数(添加)append到该类型而不改变类型本身。

import operatorfrom typing import Annotated, Anyfrom typing_extensions import TypedDictfrom langgraph.graph import StateGraph, START, END
class State(TypedDict):    # operator.add reducer fn 使其仅支持追加    aggregate: Annotated[list, operator.add]
class ReturnNodeValue:    def __init__(self, node_secret: str):        self._value = node_secret    def __call__(self, state: State) -> Any:        print(f"Adding {self._value} to {state['aggregate']}")        return {"aggregate": [self._value]}
builder = StateGraph(State)builder.add_node("a", ReturnNodeValue("I'm A"))builder.add_edge(START, "a")builder.add_node("b", ReturnNodeValue("I'm B"))builder.add_node("c", ReturnNodeValue("I'm C"))builder.add_node("d", ReturnNodeValue("I'm D"))builder.add_edge("a""b")builder.add_edge("a""c")builder.add_edge("b""d")builder.add_edge("c""d")builder.add_edge("d", END)graph = builder.compile()
结构如下图所示

通过reducer你可以看到每个节点中添加的值都被累积起来了。
print(graph.invoke({"aggregate": []}, {"configurable": {"thread_id""foo"}}))
Adding I'm A to []Adding I'm B to ["I'm A"]Adding I'm C to ["I'm A"]Adding I'm D to ["I'm A", "I'm B", "I'm C"]{'aggregate': ["I'm A", "I'm B", "I'm C", "I'm D"]}

带额外步骤的Parallel node fan-out and fan-in

上面的例子展示了当每条路径只有一步时如何扇出和扇入。但如果一条路径有多个步骤怎么办?

其它不变,只改变graph的构建:

builder = StateGraph(State)builder.add_node("a", ReturnNodeValue("I'm A"))builder.add_edge(START, "a")builder.add_node("b", ReturnNodeValue("I'm B"))builder.add_node("b2", ReturnNodeValue("I'm B2"))builder.add_node("c", ReturnNodeValue("I'm C"))builder.add_node("d", ReturnNodeValue("I'm D"))builder.add_edge("a", "b")builder.add_edge("a", "c")builder.add_edge("b", "b2")builder.add_edge(["b2", "c"], "d")builder.add_edge("d", END)graph = builder.compile()
print(graph.invoke({"aggregate": []}))
如下图所示
Adding I'm A to []Adding I'm B to ["I'm A"]Adding I'm C to ["I'm A"]Adding I'm B2 to ["I'm A", "I'm B", "I'm C"]Adding I'm D to ["I'm A""I'm B""I'm C""I'm B2"]{'aggregate': ["I'm A""I'm B""I'm C""I'm B2""I'm D"]}
到这里我们基本上可以得到一个结论:
langgraph 会自动并行处理可以并行的node流程,而不需要我们手动指定,因为graph的本质是一个有向无环图graph,因此这种是否可以并行的node流程是完全可以推断出来的。

条件分支
如果扇出不确定,可以直接使用 add_conditional_edges函数。
如果有一个已知的“sink”节点,条件分支随后将路由到该节点,则可以在创建条件边时提供 then=<final-node-name>。
class State(TypedDict):    aggregate: Annotated[list, operator.add]    # 新加入的属性    which: str
class ReturnNodeValue:    def __init__(self, node_secret: str):        self._value = node_secret    def __call__(self, state: State) -> Any:        print(f"Adding {self._value} to {state['aggregate']}")        return {"aggregate": [self._value]}
builder = StateGraph(State)builder.add_node("a", ReturnNodeValue("I'm A"))builder.add_edge(START, "a")builder.add_node("b", ReturnNodeValue("I'm B"))builder.add_node("c", ReturnNodeValue("I'm C"))builder.add_node("d", ReturnNodeValue("I'm D"))builder.add_node("e", ReturnNodeValue("I'm E"))
# route的函数,用于判断下一步走哪个分支def route_bc_or_cd(state: State) -> Sequence[str]:    if state["which"] == "cd":        return ["c""d"]    return ["b""c"]
intermediates = ["b""c""d"]builder.add_conditional_edges(    "a",    route_bc_or_cd,    intermediates,)for node in intermediates:    builder.add_edge(node, "e")builder.add_edge("e", END)
graph = builder.compile()print(graph.invoke({"aggregate": [], "which""bc"}))
Adding I'm A to []Adding I'm B to ["I'm A"]Adding I'm C to ["I'm A"]Adding I'm E to ["I'm A", "I'm B", "I'm C"]{'aggregate': ["I'm A", "I'm B", "I'm C", "I'm E"], 'which': 'bc'}
流程如图所示

途中的虚线表示可能会走的路线,实线表示组织必定的路线。


稳定排序

正常来说,散开后,节点将作为单个“超级步骤”并行运行。一旦超级步骤完成,每个超级步骤的更新都将按顺序apply于state。

如果我们需要从并行超级步骤中对更新进行一致的、预定的排序,则应将输出(连同identifying key)写入状态中的单独字段,

然后通过将每个扇出节点的常规边添加到会合点,将它们组合在“sink”节点中。假设我想按“可靠性”对并行步骤的输出进行排序。
import operatorfrom typing import Annotated, Sequence, Anyfrom typing_extensions import TypedDictfrom langgraph.graph import StateGraph, END, STARTdef reduce_fanouts(left, right):    if left is None:        left = []    if not right:        # Overwrite        return []    return left + rightclass State(TypedDict):    aggregate: Annotated[list, operator.add]    fanout_values: Annotated[list, reduce_fanouts]    which: strclass ReturnNodeValue:    def __init__(self, node_secret: str):        self._value = node_secret    def __call__(self, state: State) -> Any:        print(f"Adding {self._value} to {state['aggregate']}")        return {"aggregate": [self._value]}builder = StateGraph(State)builder.add_node("a", ReturnNodeValue("I'm A"))builder.add_edge(START, "a")class ParallelReturnNodeValue:    def __init__(            self,            node_secret: str,            reliability: float,    ):        self._value = node_secret        # 假设我们想按照可靠性排序        self._reliability = reliability    def __call__(self, state: State) -> Any:        print(f"Adding {self._value} to {state['aggregate']} in parallel.")        return {            "fanout_values": [                {                    "value": [self._value],                    "reliability": self._reliability,                }            ]        }
builder.add_node("b", ParallelReturnNodeValue("I'm B", reliability=0.9))builder.add_node("c", ParallelReturnNodeValue("I'm C", reliability=0.1))builder.add_node("d", ParallelReturnNodeValue("I'm D", reliability=0.3))
def aggregate_fanout_values(state: State) -> Any:    # Sort by reliability    ranked_values = sorted(        state["fanout_values"], key=lambda x: x["reliability"], reverse=True    )    return {        "aggregate": [x["value"for x in ranked_values] + ["I'm E"],        "fanout_values": [],    }
# 最终在这里聚合,这里的聚合是按照可靠性排序builder.add_node("e", aggregate_fanout_values)def route_bc_or_cd(state: State) -> Sequence[str]:    if state["which"] == "cd":        return ["c""d"]    return ["b""c"]intermediates = ["b""c""d"]builder.add_conditional_edges("a", route_bc_or_cd, intermediates)for node in intermediates:    builder.add_edge(node, "e")builder.add_edge("e", END)
graph = builder.compile()print(graph.invoke({"aggregate": [], "fanout_values": [], "which""bc"}))
可以看到实际上e节点除了自己节点本身,还充当了用来对最后的执行进行排序的,输出
Adding I'm A to []Adding I'm B to ["I'm A"in parallel.Adding I'm C to ["I'm A"] in parallel.{'aggregate': ["I'm A", ["I'm B"], ["I'm C"], "I'm E"], 'fanout_values': [], 'which': 'bc'}
这里新手可能有点难懂这个逻辑,他的图的流程总体和上面还是一样的,如下:

从代码和图中可以看到,a节点是开始节点,然后分出三个子流程,一个是b_e,一个是c_e,一个是d_e,因为 b c d本身自己是并发执行的,这里所说的控制,其实是说 b c d 流向e的时候,谁先到e。

再直白一点,就是e需要先接收谁的入参,此时来说,b c d本身节点的功能已经完成了,但是对于e来说,有的场景我们是需要组织b c d完成工作的。这就是所谓的稳定排序。

有兴趣的可以动手试一下,加深印象!!


参考链接:

https://langchain-ai.github.io/langgraph/how-tos/branching/

半夏决明
读书,摄影,随笔
 最新文章