精通LangGraph-Stream

文摘   2025-01-16 12:10   四川  
LangGraph 内置一流的stream支持。

流式输出graph
.stream 和 .astream 是用于从graph运行中流回输出的同步和异步方法。调用这些方法时,可以指定几种不同的mode(例如 `graph.stream(..., mode="...")):
  • values:这会在graph的每个步骤之后传输状态的完整值。
  • updates:这会在graph的每个步骤之后将更新传输到状态。如果在同一步骤中进行多项更新(例如运行多个节点),则这些更新将分别传输。

  • custom:这将从图形节点内部流式传输自定义数据。

  • messages:这将为调用 LLM 的图形节点传输 LLM 令牌和元数据。

  • debug:这会在图表的整个执行过程中传输尽可能多的信息。

还可以通过list传递来同时指定多种流式传输模式。执行此操作时,流式传输输出将是tuples (stream_mode, data)。例如:
graph.stream(..., stream_mode=["updates""messages"])
...('messages', (AIMessageChunk(content='Hi'), {'langgraph_step'3'langgraph_node''agent', ...}))...('updates', {'agent': {'messages': [AIMessage(content="Hi, how can I help you?")]}})
下面的可视化显示了values和updates模式之间的区别:

Streaming LLM tokens and events
此外,还可以使用 astream_events 方法流式传输节点内发生的事件。这对于流式传输 LLM 调用的token非常有用。
这是所有 LangChain 对象上的标准方法。这意味着在执行graph时,会沿途发出某些event,如果使用 .astream_events 运行graph,则可以看到这些event。
所有event都有(除其他内容外)event、name和data字段。
  • event:这是正在发出的事件类型。可以在此处(https://python.langchain.com/docs/concepts/#callback-events)找到所有回调事件和触发器的详细表格。
  • name: event的name
  • data:与event相关的数据
什么类型的东西会导致event的发生?
  • 每个节点(runnable)在开始执行时发出 on_chain_start,在节点执行期间发出 on_chain_stream,在节点完成时发出 on_chain_end。节点事件将在事件的名称字段中包含节点名称
  • 将在graph执行开始时发出 on_chain_start,在每个节点执行后发出 on_chain_stream,并在graph完成时发出 on_chain_end。图表事件将在事件的名称字段中包含 LangGraph
  • 对状态通道的任何写入(即任何时候更新某个状态key的value)都将发出 on_chain_start 和 on_chain_end 事件
此外,在节点内创建的任何事件(LLM 事件、工具事件、手动发出的事件等)也将在 .astream_events 的输出中可见。
为了使其更加具体并了解它是什么样子,让我们看看运行简单图表时会返回哪些事件:
from langchain_ollama import ChatOllamaimport base_conffrom langgraph.graph import StateGraph, MessagesState, START, ENDimport asynciofrom langchain_core.messages import HumanMessagemodel = ChatOllama(base_url=base_conf.base_url,                   model=base_conf.model_name,                   temperature=base_conf.temperature)
def call_model(state: MessagesState):    response = model.invoke(state['messages'])    return {"messages": response}
workflow = StateGraph(MessagesState)workflow.add_node(call_model)workflow.add_edge(START, "call_model")workflow.add_edge("call_model", END)app = workflow.compile()
async def run(app):    async for event in app.astream_events({"messages": [HumanMessage("你好")]}, version="v2"):        kind = event["event"]        print(f"{kind}{event['name']}")
asyncio.run(run(app))
on_chain_start: LangGraphon_chain_start: __start__on_chain_start: _writeon_chain_end: _writeon_chain_start: _writeon_chain_end: _writeon_chain_stream: __start__on_chain_end: __start__on_chain_start: call_modelon_chat_model_start: ChatOllamaon_chat_model_stream: ChatOllamaon_chat_model_stream: ChatOllamaon_chat_model_stream: ChatOllamaon_chat_model_stream: ChatOllamaon_chat_model_end: ChatOllamaon_chain_start: _writeon_chain_end: _writeon_chain_stream: call_modelon_chain_end: call_modelon_chain_stream: LangGraphon_chain_end: LangGraph
每种类型的事件都包含不同格式的数据。让我们看看 on_chat_model_stream 事件是什么样子的。这是一种重要的事件类型,因为它是 LLM 响应中流式传输token所必需的。

这些事件如下:

{'event''on_chat_model_stream', 'name''ChatOpenAI', 'run_id''3fdbf494-acce-402e-9b50-4eab46403859', 'tags': ['seq:step:1'], 'metadata': {'langgraph_step': 1,  'langgraph_node''call_model',  'langgraph_triggers': ['start:call_model'],  'langgraph_task_idx': 0,  'checkpoint_id''1ef657a0-0f9d-61b8-bffe-0c39e4f9ad6c',  'checkpoint_ns''call_model',  'ls_provider''openai',  'ls_model_name''gpt-4o-mini',  'ls_model_type''chat',  'ls_temperature': 0.7}, 'data': {'chunk': AIMessageChunk(content='Hello'id='run-3fdbf494-acce-402e-9b50-4eab46403859')}, 'parent_ids': []}
我们可以看到我们有事件类型和名称(我们之前就知道了)。
我们在元数据中也有很多内容。值得注意的是,“langgraph_node”:“call_model”是一些非常有用的信息,它告诉我们这个模型在哪个节点中被调用。
最后,data是一个非常重要的字段。它包含此事件的实际数据!在本例中是 AIMessageChunk。它包含消息的内容以及 ID。这是整个 AIMessage(而不仅仅是这个块)的 ID,非常有用 - 它可以帮助我们追踪哪些块是同一条消息的一部分(因此我们可以在 UI 中一起显示它们)。
此信息包含创建用于流式传输 LLM 令牌的 UI 所需的全部内容。

半夏决明
读书,摄影,随笔
 最新文章