LangGraph Persistence 可轻松在图形运行(线程级持久性)和线程(跨线程持久性)之间持久化状态。本教程展示了如何将持久性添加到图形中。LangGraph 有一个内置的持久层,通过检查点实现。当使用checkpoint编译graph时,checkpoint会在每个step中保存graph状态的检查点。这些checkpoint被保存到一个thread,可以在graph执行后访问。由于线程允许在执行后访问graph的状态,因此可以实现包括人机交互、内存、历史回溯和容错在内的多种强大的功能。线程是分配给检查点程序保存的每个检查点的唯一 ID 或线程标识符。使用检查点程序调用图表时,必须在config的可配置部分中指定 thread_id:{"configurable": {"thread_id": "1"}}
检查点是在每个super-step中保存的graph状态的快照,由具有以下关键属性的 StateSnapshot 对象表示:- tasks:PregelTask 对象的元组,包含有关接下来要执行的任务的信息。如果之前尝试过此步骤,它将包含错误信息。如果图表从节点内部动态中断,则任务将包含与中断相关的附加数据。
让我们看看当调用简单图形时会保存哪些检查点,如下所示:from langgraph.graph import StateGraph, START, END
from langgraph.checkpoint.memory import MemorySaver
from typing import Annotated
from typing_extensions import TypedDict
from operator import add
class State(TypedDict):
foo: int
bar: Annotated[list[str], add]
def node_a(state: State):
return {"foo": "a", "bar": ["a"]}
def node_b(state: State):
return {"foo": "b", "bar": ["b"]}
workflow = StateGraph(State)
workflow.add_node(node_a)
workflow.add_node(node_b)
workflow.add_edge(START, "node_a")
workflow.add_edge("node_a", "node_b")
workflow.add_edge("node_b", END)
checkpointer = MemorySaver()
graph = workflow.compile(checkpointer=checkpointer)
config = {"configurable": {"thread_id": "1"}}
print(graph.invoke({"foo": ""}, config))
- 检查点,用户输入 {'foo': '', 'bar': []} 并且 node_a 作为下一个要执行的节点
- checkpoint 的输出为 node_a {'foo': 'a', 'bar': ['a']} ,node_b 为下一个要执行的节点
- 检查点,node_b 的输出为 {'foo': 'b', 'bar': ['a', 'b']},并且没有要执行的下一个节点
获取状态与已保存的图状态交互时,必须指定线程标识符。可以通过调用 graph.get_state(config) 查看图的最新状态。这将返回一个 StateSnapshot 对象,该对象对应于与配置中提供的线程 ID 关联的最新检查点或与线程的检查点 ID 关联的检查点(如果提供)。# 获取 latest state snapshot
config = {"configurable": {"thread_id": "1"}}
graph.get_state(config)
# 获取指定 checkpoint_id的 state snapshot
config = {"configurable": {"thread_id": "1", "checkpoint_id": "1ef663ba-28fe-6528-8002-5a559208592c"}}
graph.get_state(config)
可以通过调用 graph.get_state_history(config) 获取给定线程的图形执行的完整历史记录。这将返回与配置中提供的线程 ID 关联的 StateSnapshot 对象列表。重要的是,检查点将按时间顺序排序,最近的检查点/StateSnapshot 将位于列表中的第一个。config = {"configurable": {"thread_id": "1"}}
list(graph.get_state_history(config))
还可以回放先前的图形执行。如果我们使用thread_id和checkpoint_id invoke graph,那么我们将从与checkpoint_id相对应的检查点开始重新执行graph:- thread_id 只是线程的 ID。这始终是必需的。
- checkpoint_id 此标识符指的是线程内的特定检查点。
# {"configurable": {"thread_id": "1"}} # 有效的 config
# {"configurable": {"thread_id": "1", "checkpoint_id": "0c62ca34-ac19-445d-bbb0-5b4984975b2a"}} # 也是有效的 config
config = {"configurable": {"thread_id": "1"}}
graph.invoke(None, config=config)
重要的是,LangGraph 知道特定的检查点是否之前已经执行过。重要的是,LangGraph 知道特定的检查点是否之前已经执行过。除了从特定检查点重新播放图表外,我们还可以编辑图表状态。我们使用 graph.update_state() 来执行此操作。此方法接受三个不同的参数:配置应包含指定要更新哪个线程的thread_id。当仅传递thread_id时,我们会更新(或分叉)当前状态。或者,如果我们包含checkpoint_id字段,则我们会分叉所选检查点。这些是将用于更新状态的值。请注意,此更新的处理方式与来自节点的任何更新的处理方式完全相同。这意味着如果为graph状态中的某些通道定义了这些值,这些值将被传递给reducer函数。因此, update_state 不会自动覆盖每个通道的通道值,而只会覆盖没有 Reducer 的通道值。看个例子(假设您已经使用以下模式定义了图形的状态(参见上面的完整示例):):from typing import Annotated
from typing_extensions import TypedDict
from operator import add
class State(TypedDict):
foo: int
# 指定了reducer 为add
bar: Annotated[list[str], add]
graph.update_state(config, {"foo": 2, "bar": ["b"]})
{"foo": 2, "bar": ["a", "b"]}
foo 键(通道)已完全更改(因为该通道未指定任何 Reducer,因此 update_state 会将其覆盖)。但是,bar 键已指定任何 Reducer,因此它会将“b”附加到 bar 的状态。调用 update_state 时,可以选择指定的最后一项是 as_node。如果提供了它,则更新将像来自节点 as_node 一样应用。如果没有提供 as_node,它将被设置为最后一个更新状态的节点(如果不明确的话)。这很重要是因为,接下来要执行的步骤取决于最后一个给出更新的节点,因此可以使用它来控制接下来执行哪个节点。
state schema 指定了一组在执行graph时填充的key。如上所述,状态可以通过检查点在每个图形步骤中写入thread,从而实现状态持久性。但是,如果我们想在各个thread之间保留一些信息怎么办?考虑一个聊天机器人的情况,我们想在与该用户的所有聊天对话(例如线程)中保留有关该用户的特定信息!也就是说,仅使用检查点,我们无法跨线程共享信息。因为checkpoint的粒度是精确到thread的,但是thread和user的关系是多对一的,也就是一个用户可以有多个thread,仅仅使用checkpoint做不到一个user间的信息共享。举例来说,我们可以定义一个 InMemoryStore 来跨线程存储有关用户的信息。我们只需像以前一样使用检查点和新的 in_memory_store 变量来编译我们的图表即可。我们先单独看一下InMemoryStore 的用法from langgraph.store.memory import InMemoryStore
in_memory_store = InMemoryStore()
memory由一个元组命名,在这个特定示例中为 (<user_id>, "memories")。命名空间可以是任意长度,表示任何内容,不必特定于用户。user_id = "1"
namespace_for_memory = (user_id, "memories")
memory_id = str(uuid.uuid4())
memory = {"food_preference" : "I like pizza"}
in_memory_store.put(namespace_for_memory, memory_id, memory)
我们可以使用 store.search 方法读取命名空间中的记忆,该方法将以列表形式返回给定用户的所有记忆。最近的记忆位于列表中的最后一个。memories = in_memory_store.search(namespace_for_memory)
print(memories[-1].dict())
{'namespace': ['1', 'memories'],
'key': '7afaa64d-d0cf-44db-bc65-7bfe1bb59025',
'value': {'food_preference': 'I like pizza'},
'created_at': '2025-01-07T02:28:24.755296+00:00',
'updated_at': '2025-01-07T02:28:24.755301+00:00',
'score': None}
每种内存类型都是具有某些属性的 Python 类(Item)。我们可以像上面一样通过 .dict 进行转换,将其作为字典访问。它具有的属性包括:- value:此memory的值(本身就是一个字典)
- namespace:字符串列表,此内存类型的命名空间
除了简单的检索之外,存储还支持语义搜索,允许根据含义而不是精确匹配来查找记忆。要实现此功能,请使用embeddibg model配置store:from langchain.embeddings import init_embeddings
from langgraph.store.memory import InMemoryStore,IndexConfig
index = IndexConfig(embed=init_embeddings("openai:text-embedding-3-small"), # Embedding provider
dims=1536, # Embedding dimensions
fields=["food_preference", "$"])
store = InMemoryStore(index=index)
现在搜索时,可以使用自然语言查询来查找相关的记忆:memories = store.search(
namespace_for_memory,
query="What does the user like to eat?",
limit=3
)
您可以通过配置fields参数或在存储记忆时index索引参数来控制嵌入记忆的哪些部分:# 嵌入 "food_preference" 字段
store.put(
namespace_for_memory,
str(uuid.uuid4()),
{
"food_preference": "I love Italian cuisine",
"context": "Discussing dinner plans"
},
index=["food_preference"] # 只嵌入 "food_preferences" 字段
)
# 仅存储,不嵌入
store.put(
namespace_for_memory,
str(uuid.uuid4()),
{"system_info": "Last updated: 2024-01-01"},
index=False
)
从向量存储来看,注意,我们一般把嵌入和所谓的index等同,嵌入是做index的必要步骤,把一个字段embedding就是要对其进行索引。
一切就绪后,我们使用 LangGraph 中的 in_memory_store。in_memory_store 与 checkpointer 协同工作:如上所述,检查点将状态保存到线程,并且 in_memory_store 允许我们存储任意信息以供跨线程访问,我们使用检查点和 in_memory_store 编译图表,如下所示。from langgraph.checkpoint.memory import MemorySaver
checkpointer = MemorySaver()
# ... 定义graph ...
# 通过 checkpointer 和 store 编译 graph
graph = graph.compile(checkpointer=checkpointer, store=in_memory_store)
像前面一样,我们使用 thread_id 调用graph,同时使用 user_id,我们将使用它将我们的记忆命名空间到这个特定的用户,如上所示。# Invoke graph
user_id = "1"
config = {"configurable": {"thread_id": "1", "user_id": user_id}}
# 首先,让我们向AI打个招呼
for update in graph.stream(
{"messages": [{"role": "user", "content": "hi"}]}, config, stream_mode="updates"
):
print(update)
我们可以通过传递 store: BaseStore 和 config: RunnableConfig 作为节点参数来访问任何节点中的 in_memory_store 和 user_id。def update_memory(state: MessagesState, config: RunnableConfig, *, store: BaseStore):
user_id = config["configurable"]["user_id"]
namespace = (user_id, "memories")
# ... 分析对话并创建新的记忆
# 为新的记忆创建一个新的 ID
memory_id = str(uuid.uuid4())
# 创建一个新的记忆
store.put(namespace, memory_id, {"memory": memory})
我们可以访问这些memory并在我们的模型调用中使用它们。def call_model(state: MessagesState, config: RunnableConfig, *, store: BaseStore):
user_id = config["configurable"]["user_id"]
memories = store.search(
namespace,
query=state["messages"][-1].content,
limit=3
)
info = "\n".join([d.value["memory"] for d in memories])
如果我们创建一个新thread,只要user_id相同,我们仍然可以访问相同的memory。# Invoke graph
config = {"configurable": {"thread_id": "2", "user_id": "1"}}
for update in graph.stream(
{"messages": [{"role": "user", "content": "hi, tell me about my memories"}]}, config, stream_mode="updates"
):
print(update)
上面只说了in_memory_serve,这种的局限是一旦机器重启,checkpoint就没有了,实际上我们很少使用,或者说至少需要配合持久化的lib使用,下面是langgraph的提供的抽象和实现:- langgraph-checkpoint:检查点保存器(BaseCheckpointSaver)的基本接口和序列化/反序列化接口(SerializerProtocol)。包括用于实验的内存检查点实现 (MemorySaver)。LangGraph 附带有 langgraph-checkpoint。
- langgraph-checkpoint-sqlite:使用 SQLite 数据库(SqliteSaver / AsyncSqliteSaver)的 LangGraph 检查点实现。非常适合实验和本地工作流程。需要单独安装。
- langgraph-checkpoint-postgres:使用 Postgres 数据库(PostgresSaver / AsyncPostgresSaver)的高级检查点程序,用于 LangGraph Cloud。非常适合在生产中使用。需要单独安装。
每个检查点都符合 BaseCheckpointSaver 接口并实现以下方法:- .put_writes - 存储与检查点相关的中间写入(比如pending writes)。
- .get_tuple - 使用给定配置(thread_id 和 checkpoint_id)获取检查点元组。这用于在 graph.get_state() 中填充 StateSnapshot。
- .list - 列出符合给定配置和过滤条件的检查点。这用于在 graph.get_state_history() 中填充状态历史记录
当检查点保存图形状态时,它们需要序列化状态中的通道值。这是使用序列化器对象完成的。langgraph_checkpoint 定义了用于实现序列化器的协议,并提供了一个默认实现(JsonPlusSerializer),可以处理各种类型,包括 LangChain 和 LangGraph 原语、日期时间、枚举等。
https://langchain-ai.github.io/langgraph/concepts/persistence/