如何创建并行执行的分支
pip install -U langgraph
在这个例子中,我们从节点 A 散开到节点 B 和 C,然后扇入到节点 D。通过我们的状态,我们指定了 Reducer 添加操作。
这将合并或累积state中特定key的value,而不是简单地覆盖现有value。对于list,这意味着将新列表与现有列表join起来。
请注意,LangGraph 使用Annotated类型来为 State 中的特定键指定 Reducer 函数:它维护原始类型(列表)以进行类型检查,但允许将 Reducer 函数(添加)append到该类型而不改变类型本身。
import operator
from typing import Annotated, Any
from typing_extensions import TypedDict
from langgraph.graph import StateGraph, START, END
class State(TypedDict):
# operator.add reducer fn 使其仅支持追加
aggregate: Annotated[list, operator.add]
class ReturnNodeValue:
def __init__(self, node_secret: str):
self._value = node_secret
def __call__(self, state: State) -> Any:
print(f"Adding {self._value} to {state['aggregate']}")
return {"aggregate": [self._value]}
builder = StateGraph(State)
builder.add_node("a", ReturnNodeValue("I'm A"))
builder.add_edge(START, "a")
builder.add_node("b", ReturnNodeValue("I'm B"))
builder.add_node("c", ReturnNodeValue("I'm C"))
builder.add_node("d", ReturnNodeValue("I'm D"))
builder.add_edge("a", "b")
builder.add_edge("a", "c")
builder.add_edge("b", "d")
builder.add_edge("c", "d")
builder.add_edge("d", END)
graph = builder.compile()
print(graph.invoke({"aggregate": []}, {"configurable": {"thread_id": "foo"}}))
Adding I'm A to []
Adding I'm B to ["I'm A"]
Adding I'm C to ["I'm A"]
Adding I'm D to ["I'm A", "I'm B", "I'm C"]
{'aggregate': ["I'm A", "I'm B", "I'm C", "I'm D"]}
上面的例子展示了当每条路径只有一步时如何扇出和扇入。但如果一条路径有多个步骤怎么办?
其它不变,只改变graph的构建:
builder = StateGraph(State)
builder.add_node("a", ReturnNodeValue("I'm A"))
builder.add_edge(START, "a")
builder.add_node("b", ReturnNodeValue("I'm B"))
builder.add_node("b2", ReturnNodeValue("I'm B2"))
builder.add_node("c", ReturnNodeValue("I'm C"))
builder.add_node("d", ReturnNodeValue("I'm D"))
builder.add_edge("a", "b")
builder.add_edge("a", "c")
builder.add_edge("b", "b2")
builder.add_edge(["b2", "c"], "d")
builder.add_edge("d", END)
graph = builder.compile()
print(graph.invoke({"aggregate": []}))
Adding I'm A to []
Adding I'm B to ["I'm A"]
Adding I'm C to ["I'm A"]
Adding I'm B2 to ["I'm A", "I'm B", "I'm C"]
Adding I'm D to ["I'm A", "I'm B", "I'm C", "I'm B2"]
{'aggregate': ["I'm A", "I'm B", "I'm C", "I'm B2", "I'm D"]}
class State(TypedDict):
aggregate: Annotated[list, operator.add]
# 新加入的属性
which: str
class ReturnNodeValue:
def __init__(self, node_secret: str):
self._value = node_secret
def __call__(self, state: State) -> Any:
print(f"Adding {self._value} to {state['aggregate']}")
return {"aggregate": [self._value]}
builder = StateGraph(State)
builder.add_node("a", ReturnNodeValue("I'm A"))
builder.add_edge(START, "a")
builder.add_node("b", ReturnNodeValue("I'm B"))
builder.add_node("c", ReturnNodeValue("I'm C"))
builder.add_node("d", ReturnNodeValue("I'm D"))
builder.add_node("e", ReturnNodeValue("I'm E"))
# route的函数,用于判断下一步走哪个分支
def route_bc_or_cd(state: State) -> Sequence[str]:
if state["which"] == "cd":
return ["c", "d"]
return ["b", "c"]
intermediates = ["b", "c", "d"]
builder.add_conditional_edges(
"a",
route_bc_or_cd,
intermediates,
)
for node in intermediates:
builder.add_edge(node, "e")
builder.add_edge("e", END)
graph = builder.compile()
print(graph.invoke({"aggregate": [], "which": "bc"}))
Adding I'm A to []
Adding I'm B to ["I'm A"]
Adding I'm C to ["I'm A"]
Adding I'm E to ["I'm A", "I'm B", "I'm C"]
{'aggregate': ["I'm A", "I'm B", "I'm C", "I'm E"], 'which': 'bc'}
途中的虚线表示可能会走的路线,实线表示组织必定的路线。
稳定排序
正常来说,散开后,节点将作为单个“超级步骤”并行运行。一旦超级步骤完成,每个超级步骤的更新都将按顺序apply于state。
如果我们需要从并行超级步骤中对更新进行一致的、预定的排序,则应将输出(连同identifying key)写入状态中的单独字段,
import operator
from typing import Annotated, Sequence, Any
from typing_extensions import TypedDict
from langgraph.graph import StateGraph, END, START
def reduce_fanouts(left, right):
if left is None:
left = []
if not right:
# Overwrite
return []
return left + right
class State(TypedDict):
aggregate: Annotated[list, operator.add]
fanout_values: Annotated[list, reduce_fanouts]
which: str
class ReturnNodeValue:
def __init__(self, node_secret: str):
self._value = node_secret
def __call__(self, state: State) -> Any:
print(f"Adding {self._value} to {state['aggregate']}")
return {"aggregate": [self._value]}
builder = StateGraph(State)
builder.add_node("a", ReturnNodeValue("I'm A"))
builder.add_edge(START, "a")
class ParallelReturnNodeValue:
def __init__(
self,
node_secret: str,
reliability: float,
):
self._value = node_secret
# 假设我们想按照可靠性排序
self._reliability = reliability
def __call__(self, state: State) -> Any:
print(f"Adding {self._value} to {state['aggregate']} in parallel.")
return {
"fanout_values": [
{
"value": [self._value],
"reliability": self._reliability,
}
]
}
builder.add_node("b", ParallelReturnNodeValue("I'm B", reliability=0.9))
builder.add_node("c", ParallelReturnNodeValue("I'm C", reliability=0.1))
builder.add_node("d", ParallelReturnNodeValue("I'm D", reliability=0.3))
def aggregate_fanout_values(state: State) -> Any:
# Sort by reliability
ranked_values = sorted(
state["fanout_values"], key=lambda x: x["reliability"], reverse=True
)
return {
"aggregate": [x["value"] for x in ranked_values] + ["I'm E"],
"fanout_values": [],
}
# 最终在这里聚合,这里的聚合是按照可靠性排序
builder.add_node("e", aggregate_fanout_values)
def route_bc_or_cd(state: State) -> Sequence[str]:
if state["which"] == "cd":
return ["c", "d"]
return ["b", "c"]
intermediates = ["b", "c", "d"]
builder.add_conditional_edges("a", route_bc_or_cd, intermediates)
for node in intermediates:
builder.add_edge(node, "e")
builder.add_edge("e", END)
graph = builder.compile()
print(graph.invoke({"aggregate": [], "fanout_values": [], "which": "bc"}))
Adding I'm A to []
Adding I'm B to ["I'm A"] in parallel.
Adding I'm C to ["I'm A"] in parallel.
{'aggregate': ["I'm A", ["I'm B"], ["I'm C"], "I'm E"], 'fanout_values': [], 'which': 'bc'}
从代码和图中可以看到,a节点是开始节点,然后分出三个子流程,一个是b_e,一个是c_e,一个是d_e,因为 b c d本身自己是并发执行的,这里所说的控制,其实是说 b c d 流向e的时候,谁先到e。
再直白一点,就是e需要先接收谁的入参,此时来说,b c d本身节点的功能已经完成了,但是对于e来说,有的场景我们是需要组织b c d完成工作的。这就是所谓的稳定排序。
有兴趣的可以动手试一下,加深印象!!
参考链接:
https://langchain-ai.github.io/langgraph/how-tos/branching/