精通LangGraph-持久化

文摘   2025-01-12 12:10   四川  

LangGraph Persistence 可轻松在图形运行(线程级持久性)和线程(跨线程持久性)之间持久化状态。本教程展示了如何将持久性添加到图形中。
LangGraph 有一个内置的持久层,通过检查点实现。当使用checkpoint编译graph时,checkpoint会在每个step中保存graph状态的检查点。这些checkpoint被保存到一个thread,可以在graph执行后访问。
由于线程允许在执行后访问graph的状态,因此可以实现包括人机交互、内存、历史回溯和容错在内的多种强大的功能。
threads
线程是分配给检查点程序保存的每个检查点的唯一 ID 或线程标识符。使用检查点程序调用图表时,必须在config的可配置部分中指定 thread_id:
{"configurable": {"thread_id""1"}}

Checkpoints
检查点是在每个super-step中保存的graph状态的快照,由具有以下关键属性的 StateSnapshot 对象表示:
  • config:与此检查点相关的配置。
  • metadata:与此检查点相关的元数据。
  • values:此时间点的状态通道的值。
  • next:图中接下来要执行的节点名称的元组。
  • tasks:PregelTask 对象的元组,包含有关接下来要执行的任务的信息。如果之前尝试过此步骤,它将包含错误信息。如果图表从节点内部动态中断,则任务将包含与中断相关的附加数据。
让我们看看当调用简单图形时会保存哪些检查点,如下所示:
from langgraph.graph import StateGraph, START, ENDfrom langgraph.checkpoint.memory import MemorySaverfrom typing import Annotatedfrom typing_extensions import TypedDictfrom operator import add
class State(TypedDict):    foo: int    bar: Annotated[list[str], add]
def node_a(state: State):    return {"foo""a""bar": ["a"]}
def node_b(state: State):    return {"foo""b""bar": ["b"]}
workflow = StateGraph(State)workflow.add_node(node_a)workflow.add_node(node_b)workflow.add_edge(START, "node_a")workflow.add_edge("node_a""node_b")workflow.add_edge("node_b", END)checkpointer = MemorySaver()graph = workflow.compile(checkpointer=checkpointer)
config = {"configurable": {"thread_id""1"}}print(graph.invoke({"foo"""}, config))
运行graph后,我们期望看到 4 个检查点:
  1. 空检查点,以 START 为下一个要执行的节点
  2. 检查点,用户输入 {'foo': '', 'bar': []} 并且 node_a 作为下一个要执行的节点
  3. checkpoint 的输出为 node_a {'foo': 'a', 'bar': ['a']} ,node_b 为下一个要执行的节点
  4. 检查点,node_b 的输出为 {'foo': 'b', 'bar': ['a', 'b']},并且没有要执行的下一个节点
获取状态
与已保存的图状态交互时,必须指定线程标识符。可以通过调用 graph.get_state(config) 查看图的最新状态。
这将返回一个 StateSnapshot 对象,该对象对应于与配置中提供的线程 ID 关联的最新检查点或与线程的检查点 ID 关联的检查点(如果提供)。
# 获取 latest state snapshotconfig = {"configurable": {"thread_id""1"}}graph.get_state(config)# 获取指定 checkpoint_id的 state snapshotconfig = {"configurable": {"thread_id""1""checkpoint_id""1ef663ba-28fe-6528-8002-5a559208592c"}}graph.get_state(config)
获取历史状态
可以通过调用 graph.get_state_history(config) 获取给定线程的图形执行的完整历史记录。这将返回与配置中提供的线程 ID 关联的 StateSnapshot 对象列表。
重要的是,检查点将按时间顺序排序,最近的检查点/StateSnapshot 将位于列表中的第一个。
config = {"configurable": {"thread_id""1"}}list(graph.get_state_history(config))
重放
还可以回放先前的图形执行。如果我们使用thread_id和checkpoint_id invoke graph,那么我们将从与checkpoint_id相对应的检查点开始重新执行graph:
  • thread_id 只是线程的 ID。这始终是必需的。
  • checkpoint_id 此标识符指的是线程内的特定检查点。
# {"configurable": {"thread_id""1"}}  # 有效的 config# {"configurable": {"thread_id""1""checkpoint_id""0c62ca34-ac19-445d-bbb0-5b4984975b2a"}}  # 也是有效的 config
config = {"configurable": {"thread_id""1"}}graph.invoke(None, config=config)
重要的是,LangGraph 知道特定的检查点是否之前已经执行过。重要的是,LangGraph 知道特定的检查点是否之前已经执行过。
更新状态
除了从特定检查点重新播放图表外,我们还可以编辑图表状态。我们使用 graph.update_state() 来执行此操作。此方法接受三个不同的参数:
config:
配置应包含指定要更新哪个线程的thread_id。当仅传递thread_id时,我们会更新(或分叉)当前状态。或者,如果我们包含checkpoint_id字段,则我们会分叉所选检查点。
values:
这些是将用于更新状态的值。请注意,此更新的处理方式与来自节点的任何更新的处理方式完全相同。这意味着如果为graph状态中的某些通道定义了这些值,这些值将被传递给reducer函数。
因此, update_state 不会自动覆盖每个通道的通道值,而只会覆盖没有 Reducer 的通道值。看个例子(假设您已经使用以下模式定义了图形的状态(参见上面的完整示例):):
from typing import Annotatedfrom typing_extensions import TypedDictfrom operator import add
class State(TypedDict):    # 没有指定reducer    foo: int    # 指定了reducer 为add    bar: Annotated[list[str], add]
现在我们假设图的当前状态是
{"foo"1"bar": ["a"]}
如果我们按如下方式更新状态:
graph.update_state(config, {"foo"2"bar": ["b"]})
那么图的新状态将是:
{"foo"2"bar": ["a""b"]}
foo 键(通道)已完全更改(因为该通道未指定任何 Reducer,因此 update_state 会将其覆盖)。但是,bar 键已指定任何 Reducer,因此它会将“b”附加到 bar 的状态。
as_node
调用 update_state 时,可以选择指定的最后一项是 as_node。如果提供了它,则更新将像来自节点 as_node 一样应用。如果没有提供 as_node,它将被设置为最后一个更新状态的节点(如果不明确的话)。
这很重要是因为,接下来要执行的步骤取决于最后一个给出更新的节点,因此可以使用它来控制接下来执行哪个节点。

memory store

state schema 指定了一组在执行graph时填充的key。如上所述,状态可以通过检查点在每个图形步骤中写入thread,从而实现状态持久性。
但是,如果我们想在各个thread之间保留一些信息怎么办?考虑一个聊天机器人的情况,我们想在与该用户的所有聊天对话(例如线程)中保留有关该用户的特定信息!
也就是说,仅使用检查点,我们无法跨线程共享信息。因为checkpoint的粒度是精确到thread的,但是thread和user的关系是多对一的,也就是一个用户可以有多个thread,仅仅使用checkpoint做不到一个user间的信息共享。
这激发了对 Store 接口的需求。
举例来说,我们可以定义一个 InMemoryStore 来跨线程存储有关用户的信息。我们只需像以前一样使用检查点和新的 in_memory_store 变量来编译我们的图表即可。
我们先单独看一下InMemoryStore 的用法
from langgraph.store.memory import InMemoryStorein_memory_store = InMemoryStore()
memory由一个元组命名,在这个特定示例中为 (<user_id>, "memories")。命名空间可以是任意长度,表示任何内容,不必特定于用户。
user_id = "1"namespace_for_memory = (user_id, "memories")
memory_id = str(uuid.uuid4())memory = {"food_preference" : "I like pizza"}in_memory_store.put(namespace_for_memory, memory_id, memory)
我们可以使用 store.search 方法读取命名空间中的记忆,该方法将以列表形式返回给定用户的所有记忆。最近的记忆位于列表中的最后一个。
memories = in_memory_store.search(namespace_for_memory)print(memories[-1].dict())
{'namespace': ['1''memories'], 'key''7afaa64d-d0cf-44db-bc65-7bfe1bb59025''value': {'food_preference''I like pizza'}, 'created_at''2025-01-07T02:28:24.755296+00:00''updated_at''2025-01-07T02:28:24.755301+00:00''score'None}
每种内存类型都是具有某些属性的 Python 类(Item)。我们可以像上面一样通过 .dict 进行转换,将其作为字典访问。它具有的属性包括:
  • value:此memory的值(本身就是一个字典)
  • key:此命名空间中此内存的唯一键
  • namespace:字符串列表,此内存类型的命名空间
  • created_at:创建此内存的时间戳
  • updated_at:此内存更新的时间戳
语义搜索
除了简单的检索之外,存储还支持语义搜索,允许根据含义而不是精确匹配来查找记忆。要实现此功能,请使用embeddibg model配置store:
from langchain.embeddings import init_embeddingsfrom langgraph.store.memory import InMemoryStore,IndexConfigindex = IndexConfig(embed=init_embeddings("openai:text-embedding-3-small"), # Embedding provider                    dims=1536,  # Embedding dimensions                    fields=["food_preference""$"])    # Fields to embedstore = InMemoryStore(index=index)
现在搜索时,可以使用自然语言查询来查找相关的记忆:
memories = store.search(    namespace_for_memory,    query="What does the user like to eat?",    limit=3  # 返回匹配的 top 3 )
您可以通过配置fields参数或在存储记忆时index索引参数来控制嵌入记忆的哪些部分:
# 嵌入 "food_preference" 字段store.put(    namespace_for_memory,    str(uuid.uuid4()),    {        "food_preference""I love Italian cuisine",        "context""Discussing dinner plans"    },    index=["food_preference"]  # 只嵌入 "food_preferences" 字段)# 仅存储,不嵌入store.put(    namespace_for_memory,    str(uuid.uuid4()),    {"system_info""Last updated: 2024-01-01"},    index=False)
从向量存储来看,注意,我们一般把嵌入和所谓的index等同,嵌入是做index的必要步骤,把一个字段embedding就是要对其进行索引。

结合langgraph使用
一切就绪后,我们使用 LangGraph 中的 in_memory_store。in_memory_store 与 checkpointer 协同工作:
如上所述,检查点将状态保存到线程,并且 in_memory_store 允许我们存储任意信息以供跨线程访问,我们使用检查点和 in_memory_store 编译图表,如下所示。
from langgraph.checkpoint.memory import MemorySaver# 中文:我们需要这个,因为我们想要启用线程(对话)checkpointer = MemorySaver()# ... 定义graph ...# 通过 checkpointer 和 store 编译 graphgraph = graph.compile(checkpointer=checkpointer, store=in_memory_store)
像前面一样,我们使用 thread_id 调用graph,同时使用 user_id,我们将使用它将我们的记忆命名空间到这个特定的用户,如上所示。
Invoke graphuser_id = "1"config = {"configurable": {"thread_id""1""user_id": user_id}}# 首先,让我们向AI打个招呼for update in graph.stream(        {"messages": [{"role""user""content""hi"}]}, config, stream_mode="updates"):    print(update)
我们可以通过传递 store: BaseStore 和 config: RunnableConfig 作为节点参数来访问任何节点中的 in_memory_store 和 user_id。
我们可以更新memory:
def update_memory(state: MessagesState, config: RunnableConfig, *, store: BaseStore):    user_id = config["configurable"]["user_id"]    namespace = (user_id, "memories")    # ... 分析对话并创建新的记忆    # 为新的记忆创建一个新的 ID    memory_id = str(uuid.uuid4())    # 创建一个新的记忆    store.put(namespace, memory_id, {"memory": memory})
我们可以访问这些memory并在我们的模型调用中使用它们。
def call_model(state: MessagesState, config: RunnableConfig, *, store: BaseStore):    user_id = config["configurable"]["user_id"]    memories = store.search(        namespace,        query=state["messages"][-1].content,        limit=3    )    info = "\n".join([d.value["memory"for d in memories])    # ... 在模型调用中使用记忆
如果我们创建一个新thread,只要user_id相同,我们仍然可以访问相同的memory。
Invoke graphconfig = {"configurable": {"thread_id""2""user_id""1"}}for update in graph.stream(        {"messages": [{"role""user""content""hi, tell me about my memories"}]}, config, stream_mode="updates"):    print(update)
checkpoint的库
上面只说了in_memory_serve,这种的局限是一旦机器重启,checkpoint就没有了,实际上我们很少使用,或者说至少需要配合持久化的lib使用,下面是langgraph的提供的抽象和实现:
  • langgraph-checkpoint:检查点保存器(BaseCheckpointSaver)的基本接口和序列化/反序列化接口(SerializerProtocol)。包括用于实验的内存检查点实现 (MemorySaver)。LangGraph 附带有 langgraph-checkpoint。
  • langgraph-checkpoint-sqlite:使用 SQLite 数据库(SqliteSaver / AsyncSqliteSaver)的 LangGraph 检查点实现。非常适合实验和本地工作流程。需要单独安装。
  • langgraph-checkpoint-postgres:使用 Postgres 数据库(PostgresSaver / AsyncPostgresSaver)的高级检查点程序,用于 LangGraph Cloud。非常适合在生产中使用。需要单独安装。
checkpoint的接口:
每个检查点都符合 BaseCheckpointSaver 接口并实现以下方法:
  • .put-存储检查点及其配置和元数据。
  • .put_writes - 存储与检查点相关的中间写入(比如pending writes)。
  • .get_tuple - 使用给定配置(thread_id 和 checkpoint_id)获取检查点元组。这用于在 graph.get_state() 中填充 StateSnapshot。
  • .list - 列出符合给定配置和过滤条件的检查点。这用于在 graph.get_state_history() 中填充状态历史记录

Serializer
当检查点保存图形状态时,它们需要序列化状态中的通道值。这是使用序列化器对象完成的。
langgraph_checkpoint 定义了用于实现序列化器的协议,并提供了一个默认实现(JsonPlusSerializer),可以处理各种类型,包括 LangChain 和 LangGraph 原语、日期时间、枚举等。

参考链接:
https://langchain-ai.github.io/langgraph/concepts/persistence/

半夏决明
读书,摄影,随笔
 最新文章