Documentation Index Fetch the complete documentation index at: https://langchain.idochub.dev/llms.txt
Use this file to discover all available pages before exploring further.
LangGraph 实现了一套流式传输系统,用于实时推送更新。流式传输对于提升基于大语言模型(LLM)构建的应用程序的响应速度至关重要。通过在完整响应尚未生成完毕时就逐步显示输出内容,流式传输显著改善了用户体验(UX),尤其在应对大语言模型固有的延迟时效果尤为明显。
LangGraph 流式传输支持的功能:
流式传输图状态 — 使用 updates 和 values 模式获取状态更新/值。
流式传输子图输出 — 同时包含父图和任何嵌套子图的输出。
流式传输 LLM 令牌 — 从任意位置捕获令牌流:节点内部、子图或工具中。
流式传输自定义数据 — 从工具函数中直接发送自定义更新或进度信号。
使用多种流式传输模式 — 可选择 values(完整状态)、updates(状态增量)、messages(LLM 令牌 + 元数据)、custom(任意用户数据)或 debug(详细追踪信息)。
支持的流式传输模式
将以下一个或多个流式传输模式作为列表传递给 stream() 或 astream() 方法:
模式 描述 values在图的每一步执行后,流式传输状态的完整值。 updates在图的每一步执行后,流式传输状态的更新内容。如果同一步骤中进行了多次更新(例如,运行了多个节点),这些更新将被分别流式传输。 custom从图节点内部流式传输自定义数据。 messages从调用 LLM 的任何图节点中流式传输二元组(LLM 令牌, 元数据)。 debug在图执行过程中尽可能多地流式传输信息。
基本用法示例
LangGraph 图暴露了 .stream() (同步)和 .astream() (异步)方法,以迭代器形式生成流式输出。
for chunk in graph.stream(inputs, stream_mode = "updates" ):
print (chunk)
from typing import TypedDict
from langgraph.graph import StateGraph, START , END
class State ( TypedDict ):
topic: str
joke: str
def refine_topic ( state : State):
return { "topic" : state[ "topic" ] + " and cats" }
def generate_joke ( state : State):
return { "joke" : f "This is a joke about { state[ 'topic' ] } " }
graph = (
StateGraph(State)
.add_node(refine_topic)
.add_node(generate_joke)
.add_edge( START , "refine_topic" )
.add_edge( "refine_topic" , "generate_joke" )
.add_edge( "generate_joke" , END )
.compile()
)
for chunk in graph.stream( # (1)!
{ "topic" : "ice cream" },
stream_mode = "updates" , # (2)!
):
print (chunk)
stream() 方法返回一个迭代器,用于生成流式输出。
设置 stream_mode="updates" 以仅流式传输每个节点执行后对图状态的更新。也可使用其他流式传输模式。详情请参阅支持的流式传输模式 。
{'refineTopic': {'topic': 'ice cream and cats'}}
{'generateJoke': {'joke': 'This is a joke about ice cream and cats'}}
流式传输多种模式
你可以将 stream_mode 参数设置为一个列表,以同时流式传输多种模式。
流式输出将是 (mode, chunk) 形式的元组,其中 mode 是流式传输模式的名称,chunk 是该模式流式传输的数据。
for mode, chunk in graph.stream(inputs, stream_mode = [ "updates" , "custom" ]):
print (chunk)
流式传输图状态
使用 updates 和 values 流式传输模式在图执行时流式传输其状态。
updates 在图的每一步执行后流式传输状态的更新 。
values 在图的每一步执行后流式传输状态的完整值 。
from typing import TypedDict
from langgraph.graph import StateGraph, START , END
class State ( TypedDict ):
topic: str
joke: str
def refine_topic ( state : State):
return { "topic" : state[ "topic" ] + " and cats" }
def generate_joke ( state : State):
return { "joke" : f "This is a joke about { state[ 'topic' ] } " }
graph = (
StateGraph(State)
.add_node(refine_topic)
.add_node(generate_joke)
.add_edge( START , "refine_topic" )
.add_edge( "refine_topic" , "generate_joke" )
.add_edge( "generate_joke" , END )
.compile()
)
使用此模式仅流式传输每个步骤后节点返回的状态更新 。流式输出包含节点名称及其更新内容。 for chunk in graph.stream(
{ "topic" : "ice cream" },
stream_mode = "updates" ,
):
print (chunk)
使用此模式在每个步骤后流式传输图的完整状态 。 for chunk in graph.stream(
{ "topic" : "ice cream" },
stream_mode = "values" ,
):
print (chunk)
流式传输子图输出
为了在流式输出中包含子图 的输出,你可以在父图的 .stream() 方法中设置 subgraphs=True。这将同时流式传输父图和任何子图的输出。
输出将以 (namespace, data) 元组的形式流式传输,其中 namespace 是一个元组,表示调用子图的节点路径,例如 ("parent_node:<task_id>", "child_node:<task_id>")。
for chunk in graph.stream(
{ "foo" : "foo" },
subgraphs = True , # (1)!
stream_mode = "updates" ,
):
print (chunk)
设置 subgraphs=True 以流式传输子图的输出。
from langgraph.graph import START , StateGraph
from typing import TypedDict
# 定义子图
class SubgraphState ( TypedDict ):
foo: str # 注意此键与父图状态共享
bar: str
def subgraph_node_1 ( state : SubgraphState):
return { "bar" : "bar" }
def subgraph_node_2 ( state : SubgraphState):
return { "foo" : state[ "foo" ] + state[ "bar" ]}
subgraph_builder = StateGraph(SubgraphState)
subgraph_builder.add_node(subgraph_node_1)
subgraph_builder.add_node(subgraph_node_2)
subgraph_builder.add_edge( START , "subgraph_node_1" )
subgraph_builder.add_edge( "subgraph_node_1" , "subgraph_node_2" )
subgraph = subgraph_builder.compile()
# 定义父图
class ParentState ( TypedDict ):
foo: str
def node_1 ( state : ParentState):
return { "foo" : "hi! " + state[ "foo" ]}
builder = StateGraph(ParentState)
builder.add_node( "node_1" , node_1)
builder.add_node( "node_2" , subgraph)
builder.add_edge( START , "node_1" )
builder.add_edge( "node_1" , "node_2" )
graph = builder.compile()
for chunk in graph.stream(
{ "foo" : "foo" },
stream_mode = "updates" ,
subgraphs = True , # (1)!
):
print (chunk)
设置 subgraphs=True 以流式传输子图的输出。
((), {'node_1': {'foo': 'hi! foo'}})
(('node_2:dfddc4ba-c3c5-6887-5012-a243b5b377c2',), {'subgraph_node_1': {'bar': 'bar'}})
(('node_2:dfddc4ba-c3c5-6887-5012-a243b5b377c2',), {'subgraph_node_2': {'foo': 'hi! foobar'}})
((), {'node_2': {'foo': 'hi! foobar'}})
注意 我们不仅接收到了节点更新,还接收到了命名空间,它告诉我们当前流式传输来自哪个图(或子图)。
使用 debug 流式传输模式在图执行过程中尽可能多地流式传输信息。流式输出包括节点名称和完整状态。
for chunk in graph.stream(
{ "topic" : "ice cream" },
stream_mode = "debug" ,
):
print (chunk)
LLM 令牌
使用 messages 流式传输模式,从图的任何部分(包括节点、工具、子图或任务)逐令牌 流式传输大语言模型(LLM)的输出。
messages 模式 的流式输出是一个元组 (message_chunk, metadata),其中:
message_chunk:来自 LLM 的令牌或消息片段。
metadata:包含有关图节点和 LLM 调用详细信息的字典。
如果你的 LLM 没有 LangChain 集成,可以改用 custom 模式流式传输其输出。详情请参阅与任意 LLM 一起使用 。
Python < 3.11 的异步需要手动配置
在 Python < 3.11 中使用异步代码时,必须显式地将 RunnableConfig 传递给 ainvoke() 以启用正确的流式传输。详情请参阅异步与 Python < 3.11 或升级到 Python 3.11+。
from dataclasses import dataclass
from langchain.chat_models import init_chat_model
from langgraph.graph import StateGraph, START
@dataclass
class MyState :
topic: str
joke: str = ""
llm = init_chat_model( model = "openai:gpt-4o-mini" )
def call_model ( state : MyState):
"""调用 LLM 生成关于某个主题的笑话"""
llm_response = llm.invoke( # (1)!
[
{ "role" : "user" , "content" : f "Generate a joke about { state.topic } " }
]
)
return { "joke" : llm_response.content}
graph = (
StateGraph(MyState)
.add_node(call_model)
.add_edge( START , "call_model" )
.compile()
)
for message_chunk, metadata in graph.stream( # (2)!
{ "topic" : "ice cream" },
stream_mode = "messages" ,
):
if message_chunk.content:
print (message_chunk.content, end = "|" , flush = True )
注意,即使使用 .invoke 而不是 .stream 运行 LLM,也会发出消息事件。
“messages” 流式传输模式返回一个 (message_chunk, metadata) 元组的迭代器,其中 message_chunk 是 LLM 流式传输的令牌,metadata 是包含调用 LLM 的图节点信息及其他信息的字典。
按 LLM 调用过滤
你可以为 LLM 调用关联 tags,以按 LLM 调用过滤流式传输的令牌。
from langchain.chat_models import init_chat_model
llm_1 = init_chat_model( model = "openai:gpt-4o-mini" , tags = [ 'joke' ]) # (1)!
llm_2 = init_chat_model( model = "openai:gpt-4o-mini" , tags = [ 'poem' ]) # (2)!
graph = ... # 定义一个使用这些 LLM 的图
async for msg, metadata in graph.astream( # (3)!
{ "topic" : "cats" },
stream_mode = "messages" ,
):
if metadata[ "tags" ] == [ "joke" ]: # (4)!
print (msg.content, end = "|" , flush = True )
llm_1 被标记为 “joke”。
llm_2 被标记为 “poem”。
stream_mode 设置为 “messages” 以流式传输 LLM 令牌。metadata 包含有关 LLM 调用的信息,包括标签。
通过 metadata 中的 tags 字段过滤流式传输的令牌,仅包含标记为 “joke” 的 LLM 调用的令牌。
from typing import TypedDict
from langchain.chat_models import init_chat_model
from langgraph.graph import START , StateGraph
joke_model = init_chat_model( model = "openai:gpt-4o-mini" , tags = [ "joke" ]) # (1)!
poem_model = init_chat_model( model = "openai:gpt-4o-mini" , tags = [ "poem" ]) # (2)!
class State ( TypedDict ):
topic: str
joke: str
poem: str
async def call_model ( state , config ):
topic = state[ "topic" ]
print ( "正在写笑话..." )
# 注意:在 Python < 3.11 中,必须显式传递 config
# 因为在此之前未添加上下文变量支持:https://docs.python.org/3/library/asyncio-task.html#creating-tasks
joke_response = await joke_model.ainvoke(
[{ "role" : "user" , "content" : f "写一个关于 { topic } 的笑话" }],
config, # (3)!
)
print ( " \n\n 正在写诗..." )
poem_response = await poem_model.ainvoke(
[{ "role" : "user" , "content" : f "写一首关于 { topic } 的短诗" }],
config, # (3)!
)
return { "joke" : joke_response.content, "poem" : poem_response.content}
graph = (
StateGraph(State)
.add_node(call_model)
.add_edge( START , "call_model" )
.compile()
)
async for msg, metadata in graph.astream(
{ "topic" : "cats" },
stream_mode = "messages" , # (4)!
):
if metadata[ "tags" ] == [ "joke" ]: # (4)!
print (msg.content, end = "|" , flush = True )
joke_model 被标记为 “joke”。
poem_model 被标记为 “poem”。
显式传递 config 以确保上下文变量正确传播。这在 Python < 3.11 中使用异步代码时是必需的。详情请参阅异步部分 。
stream_mode 设置为 “messages” 以流式传输 LLM 令牌。metadata 包含有关 LLM 调用的信息,包括标签。
按节点过滤
要仅从特定节点流式传输令牌,请使用 stream_mode="messages" 并通过流式传输元数据中的 langgraph_node 字段过滤输出:
for msg, metadata in graph.stream( # (1)!
inputs,
stream_mode = "messages" ,
):
if msg.content and metadata[ "langgraph_node" ] == "some_node_name" : # (2)!
...
“messages” 流式传输模式返回一个 (message_chunk, metadata) 元组,其中 message_chunk 是 LLM 流式传输的令牌,metadata 是包含调用 LLM 的图节点信息及其他信息的字典。
通过 metadata 中的 langgraph_node 字段过滤流式传输的令牌,仅包含来自 write_poem 节点的令牌。
from typing import TypedDict
from langgraph.graph import START , StateGraph
from langchain_openai import ChatOpenAI
model = ChatOpenAI( model = "gpt-4o-mini" )
class State ( TypedDict ):
topic: str
joke: str
poem: str
def write_joke ( state : State):
topic = state[ "topic" ]
joke_response = model.invoke(
[{ "role" : "user" , "content" : f "写一个关于 { topic } 的笑话" }]
)
return { "joke" : joke_response.content}
def write_poem ( state : State):
topic = state[ "topic" ]
poem_response = model.invoke(
[{ "role" : "user" , "content" : f "写一首关于 { topic } 的短诗" }]
)
return { "poem" : poem_response.content}
graph = (
StateGraph(State)
.add_node(write_joke)
.add_node(write_poem)
# 并发地写笑话和诗
.add_edge( START , "write_joke" )
.add_edge( START , "write_poem" )
.compile()
)
for msg, metadata in graph.stream( # (1)!
{ "topic" : "cats" },
stream_mode = "messages" ,
):
if msg.content and metadata[ "langgraph_node" ] == "write_poem" : # (2)!
print (msg.content, end = "|" , flush = True )
“messages” 流式传输模式返回一个 (message_chunk, metadata) 元组,其中 message_chunk 是 LLM 流式传输的令牌,metadata 是包含调用 LLM 的图节点信息及其他信息的字典。
通过 metadata 中的 langgraph_node 字段过滤流式传输的令牌,仅包含来自 write_poem 节点的令牌。
流式传输自定义数据
要从 LangGraph 节点或工具内部发送自定义用户定义数据 ,请按照以下步骤操作:
使用 get_stream_writer() 访问流式写入器并发出自定义数据。
在调用 .stream() 或 .astream() 时设置 stream_mode="custom" 以在流中获取自定义数据。你可以组合多种模式(例如,["updates", "custom"]),但至少必须包含 "custom"。
Python < 3.11 的异步中无 get_stream_writer()
在 Python < 3.11 上运行的异步代码中,get_stream_writer() 将无法工作。
相反,请在你的节点或工具中添加一个 writer 参数并手动传递它。
有关用法示例,请参阅异步与 Python < 3.11 。
from typing import TypedDict
from langgraph.config import get_stream_writer
from langgraph.graph import StateGraph, START
class State ( TypedDict ):
query: str
answer: str
def node ( state : State):
writer = get_stream_writer() # (1)!
writer({ "custom_key" : "在节点内部生成自定义数据" }) # (2)!
return { "answer" : "some data" }
graph = (
StateGraph(State)
.add_node(node)
.add_edge( START , "node" )
.compile()
)
inputs = { "query" : "example" }
# 用法
for chunk in graph.stream(inputs, stream_mode = "custom" ): # (3)!
print (chunk)
获取流式写入器以发送自定义数据。
发出一个自定义键值对(例如,进度更新)。
3. 设置 stream_mode="custom" 以在流中接收自定义数据。
from langchain_core.tools import tool
from langgraph.config import get_stream_writer
@tool
def query_database ( query : str ) -> str :
"""查询数据库。"""
writer = get_stream_writer() # (1)!
writer({ "data" : "已检索 0/100 条记录" , "type" : "progress" }) # (2)!
# 执行查询
writer({ "data" : "已检索 100/100 条记录" , "type" : "progress" }) # (3)!
return "some-answer"
graph = ... # 定义一个使用此工具的图
for chunk in graph.stream(inputs, stream_mode = "custom" ): # (4)!
print (chunk)
访问流式写入器以发送自定义数据。
发出一个自定义键值对(例如,进度更新)。
发出另一个自定义键值对。
4. 设置 stream_mode="custom" 以在流中接收自定义数据。
与任意 LLM 一起使用
你可以使用 stream_mode="custom" 从任意 LLM API 流式传输数据 —— 即使该 API 未 实现 LangChain 聊天模型接口。
这使你可以集成原始 LLM 客户端或提供自身流式接口的外部服务,使 LangGraph 对于自定义设置具有高度灵活性。
from langgraph.config import get_stream_writer
def call_arbitrary_model ( state ):
"""调用任意模型并流式传输输出的示例节点"""
writer = get_stream_writer() # (1)!
# 假设你有一个生成块的流式客户端
for chunk in your_custom_streaming_client(state[ "topic" ]): # (2)!
writer({ "custom_llm_chunk" : chunk}) # (3)!
return { "result" : "completed" }
graph = (
StateGraph(State)
.add_node(call_arbitrary_model)
# 根据需要添加其他节点和边
.compile()
)
for chunk in graph.stream(
{ "topic" : "cats" },
stream_mode = "custom" , # (4)!
):
# 块将包含从 LLM 流式传输的自定义数据
print (chunk)
获取流式写入器以发送自定义数据。
使用你的自定义流式客户端生成 LLM 令牌。
使用写入器将自定义数据发送到流中。
设置 stream_mode="custom" 以在流中接收自定义数据。
import operator
import json
from typing import TypedDict
from typing_extensions import Annotated
from langgraph.graph import StateGraph, START
from openai import AsyncOpenAI
openai_client = AsyncOpenAI()
model_name = "gpt-4o-mini"
async def stream_tokens ( model_name : str , messages : list[ dict ]):
response = await openai_client.chat.completions.create(
messages = messages, model = model_name, stream = True
)
role = None
async for chunk in response:
delta = chunk.choices[ 0 ].delta
if delta.role is not None :
role = delta.role
if delta.content:
yield { "role" : role, "content" : delta.content}
# 这是我们的工具
async def get_items ( place : str ) -> str :
"""使用此工具列出你被询问的地方可能找到的物品。"""
writer = get_stream_writer()
response = ""
async for msg_chunk in stream_tokens(
model_name,
[
{
"role" : "user" ,
"content" : (
"你能告诉我以下地方可能找到哪些物品吗:' {place} '。"
"至少列出 3 个这样的物品,用逗号分隔。"
"并为每个物品提供简要描述。"
),
}
],
):
response += msg_chunk[ "content" ]
writer(msg_chunk)
return response
class State ( TypedDict ):
messages: Annotated[list[ dict ], operator.add]
# 这是调用工具的图节点
async def call_tool ( state : State):
ai_message = state[ "messages" ][ - 1 ]
tool_call = ai_message[ "tool_calls" ][ - 1 ]
function_name = tool_call[ "function" ][ "name" ]
if function_name != "get_items" :
raise ValueError ( f "不支持工具 { function_name } " )
function_arguments = tool_call[ "function" ][ "arguments" ]
arguments = json.loads(function_arguments)
function_response = await get_items( ** arguments)
tool_message = {
"tool_call_id" : tool_call[ "id" ],
"role" : "tool" ,
"name" : function_name,
"content" : function_response,
}
return { "messages" : [tool_message]}
graph = (
StateGraph(State)
.add_node(call_tool)
.add_edge( START , "call_tool" )
.compile()
)
让我们用一个包含工具调用的 AI 消息调用图: inputs = {
"messages" : [
{
"content" : None ,
"role" : "assistant" ,
"tool_calls" : [
{
"id" : "1" ,
"function" : {
"arguments" : '{"place":"卧室"}' ,
"name" : "get_items" ,
},
"type" : "function" ,
}
],
}
]
}
async for chunk in graph.astream(
inputs,
stream_mode = "custom" ,
):
print (chunk[ "content" ], end = "|" , flush = True )
为特定聊天模型禁用流式传输
如果你的应用程序混合了支持流式传输和不支持流式传输的模型,你可能需要显式地为不支持流式传输的模型禁用流式传输。
在初始化模型时设置 disable_streaming=True。
from langchain.chat_models import init_chat_model
model = init_chat_model(
"anthropic:claude-3-7-sonnet-latest" ,
disable_streaming = True # (1)!
)
设置 disable_streaming=True 以禁用聊天模型的流式传输。
from langchain_openai import ChatOpenAI
llm = ChatOpenAI( model = "o1-preview" , disable_streaming = True ) # (1)!
设置 disable_streaming=True 以禁用聊天模型的流式传输。
Python < 3.11 的异步
在 Python 版本 < 3.11 中,asyncio 任务 不支持 context 参数。
这限制了 LangGraph 自动传播上下文的能力,并以两种关键方式影响 LangGraph 的流式传输机制:
你必须 在异步 LLM 调用(例如,ainvoke())中显式传递 RunnableConfig ,因为回调不会自动传播。
你不能 在异步节点或工具中使用 get_stream_writer() —— 你必须直接传递一个 writer 参数。
from typing import TypedDict
from langgraph.graph import START , StateGraph
from langchain.chat_models import init_chat_model
llm = init_chat_model( model = "openai:gpt-4o-mini" )
class State ( TypedDict ):
topic: str
joke: str
async def call_model ( state , config ): # (1)!
topic = state[ "topic" ]
print ( "正在生成笑话..." )
joke_response = await llm.ainvoke(
[{ "role" : "user" , "content" : f "写一个关于 { topic } 的笑话" }],
config, # (2)!
)
return { "joke" : joke_response.content}
graph = (
StateGraph(State)
.add_node(call_model)
.add_edge( START , "call_model" )
.compile()
)
async for chunk, metadata in graph.astream(
{ "topic" : "ice cream" },
stream_mode = "messages" , # (3)!
):
if chunk.content:
print (chunk.content, end = "|" , flush = True )
在异步节点函数中接受 config 作为参数。
将 config 传递给 llm.ainvoke() 以确保上下文正确传播。
设置 stream_mode="messages" 以流式传输 LLM 令牌。
from typing import TypedDict
from langgraph.types import StreamWriter
class State ( TypedDict ):
topic: str
joke: str
async def generate_joke ( state : State, writer : StreamWriter): # (1)!
writer({ "custom_key" : "在生成笑话时流式传输自定义数据" })
return { "joke" : f "这是一个关于 { state[ 'topic' ] } 的笑话" }
graph = (
StateGraph(State)
.add_node(generate_joke)
.add_edge( START , "generate_joke" )
.compile()
)
async for chunk in graph.astream(
{ "topic" : "ice cream" },
stream_mode = "custom" , # (2)!
):
print (chunk)
在异步节点或工具的函数签名中添加 writer 作为参数。LangGraph 将自动将流式写入器传递给函数。
设置 stream_mode="custom" 以在流中接收自定义数据。