要审查、编辑并批准代理或工作流中的工具调用,请使用中断功能暂停图的执行并等待人工输入。中断功能利用 LangGraph 的持久化层保存图状态,从而无限期暂停图的执行,直到您恢复运行。
image
使用 interrupt 暂停
动态中断(也称为动态断点)根据图的当前状态触发。您可以通过在适当位置调用 interrupt 函数 来设置动态中断。图将暂停以允许人工干预,然后根据人工输入继续执行。这适用于审批、编辑或收集额外上下文等任务。
自 v1.0 起,interrupt 是暂停图的推荐方式。NodeInterrupt 已弃用,并将在 v2.0 中移除。
要在图中使用 interrupt,您需要:
- 指定一个检查点器,以便在每一步后保存图的状态。
- 在适当位置调用
interrupt()。请参阅常见模式部分获取示例。
- 使用线程 ID 运行图,直到触发
interrupt。
- 使用
invoke/stream 恢复执行(参见Command 原语)。
from langgraph.types import interrupt, Command
def human_node(state: State):
value = interrupt( # (1)!
{
"text_to_revise": state["some_text"] # (2)!
}
)
return {
"some_text": value # (3)!
}
graph = graph_builder.compile(checkpointer=checkpointer) # (4)!
# 运行图直到触发中断。
config = {"configurable": {"thread_id": "some_id"}}
result = graph.invoke({"some_text": "原始文本"}, config=config) # (5)!
print(result['__interrupt__']) # (6)!
# > [
# > Interrupt(
# > value={'text_to_revise': '原始文本'},
# > resumable=True,
# > ns=['human_node:6ce9e64f-edef-fe5d-f7dc-511fa9526960']
# > )
# > ]
print(graph.invoke(Command(resume="已编辑的文本"), config=config)) # (7)!
# > {'some_text': '已编辑的文本'}
interrupt(...) 在 human_node 处暂停执行,向人工展示给定的有效载荷。
- 可向
interrupt 函数传递任何 JSON 可序列化的值。此处为包含待修订文本的字典。
- 恢复后,
interrupt(...) 的返回值即为人工提供的输入,用于更新状态。
- 需要检查点器来持久化图状态。在生产环境中,应使用持久化存储(如数据库支持)。
- 图以某些初始状态被调用。
- 当图触发中断时,会返回包含有效载荷和元数据的
Interrupt 对象。
- 图通过
Command(resume=...) 恢复,注入人工输入并继续执行。
from typing import TypedDict
import uuid
from langgraph.checkpoint.memory import InMemorySaver
from langgraph.constants import START
from langgraph.graph import StateGraph
from langgraph.types import interrupt, Command
class State(TypedDict):
some_text: str
def human_node(state: State):
value = interrupt( # (1)!
{
"text_to_revise": state["some_text"] # (2)!
}
)
return {
"some_text": value # (3)!
}
# 构建图
graph_builder = StateGraph(State)
graph_builder.add_node("human_node", human_node)
graph_builder.add_edge(START, "human_node")
checkpointer = InMemorySaver() # (4)!
graph = graph_builder.compile(checkpointer=checkpointer)
# 向图传入线程 ID 以运行它。
config = {"configurable": {"thread_id": uuid.uuid4()}}
# 运行图直到触发中断。
result = graph.invoke({"some_text": "原始文本"}, config=config) # (5)!
print(result['__interrupt__']) # (6)!
# > [
# > Interrupt(
# > value={'text_to_revise': '原始文本'},
# > resumable=True,
# > ns=['human_node:6ce9e64f-edef-fe5d-f7dc-511fa9526960']
# > )
# > ]
print(result["__interrupt__"]) # (6)!
# > [Interrupt(value={'text_to_revise': '原始文本'}, id='6d7c4048049254c83195429a3659661d')]
print(graph.invoke(Command(resume="已编辑的文本"), config=config)) # (7)!
# > {'some_text': '已编辑的文本'}
interrupt(...) 在 human_node 处暂停执行,向人工展示给定的有效载荷。
- 可向
interrupt 函数传递任何 JSON 可序列化的值。此处为包含待修订文本的字典。
- 恢复后,
interrupt(...) 的返回值即为人工提供的输入,用于更新状态。
- 需要检查点器来持久化图状态。在生产环境中,应使用持久化存储(如数据库支持)。
- 图以某些初始状态被调用。
- 当图触发中断时,会返回包含有效载荷和元数据的
Interrupt 对象。
- 图通过
Command(resume=...) 恢复,注入人工输入并继续执行。
从开发者体验角度看,中断类似于 Python 的 input() 函数,但它们不会自动从中断点恢复执行。相反,它们会重新运行使用中断的整个节点。因此,中断通常最好放置在节点开头或专用节点中。
使用 Command 原语恢复
从 interrupt 恢复不同于 Python 的 input() 函数,后者会从调用 input() 函数的确切位置恢复执行。
当在图内使用 interrupt 函数时,执行会在该点暂停并等待用户输入。
要恢复执行,请使用 Command 原语,可通过 invoke 或 stream 方法提供。图将从最初调用 interrupt(...) 的节点开头恢复执行。此时,interrupt 函数将返回 Command(resume=value) 中提供的值,而不再次暂停。从节点开头到 interrupt 的所有代码都将重新执行。
# 通过提供用户输入恢复图的执行。
graph.invoke(Command(resume={"age": "25"}), thread_config)
一次调用恢复多个中断
当具有中断条件的节点并行运行时,任务队列中可能出现多个中断。
例如,以下图有两个需要人工输入的节点并行运行:
two nodes run in parallel that require human input
一旦您的图被中断并停滞,您可以使用 Command.resume 一次性恢复所有中断,传入一个将中断 ID 映射到恢复值的字典。
from typing import TypedDict
import uuid
from langchain_core.runnables import RunnableConfig
from langgraph.checkpoint.memory import InMemorySaver
from langgraph.constants import START
from langgraph.graph import StateGraph
from langgraph.types import interrupt, Command
class State(TypedDict):
text_1: str
text_2: str
def human_node_1(state: State):
value = interrupt({"text_to_revise": state["text_1"]})
return {"text_1": value}
def human_node_2(state: State):
value = interrupt({"text_to_revise": state["text_2"]})
return {"text_2": value}
graph_builder = StateGraph(State)
graph_builder.add_node("human_node_1", human_node_1)
graph_builder.add_node("human_node_2", human_node_2)
# 从 START 并行添加两个节点
graph_builder.add_edge(START, "human_node_1")
graph_builder.add_edge(START, "human_node_2")
checkpointer = InMemorySaver()
graph = graph_builder.compile(checkpointer=checkpointer)
thread_id = str(uuid.uuid4())
config: RunnableConfig = {"configurable": {"thread_id": thread_id}}
result = graph.invoke(
{"text_1": "原始文本 1", "text_2": "原始文本 2"}, config=config
)
# 使用中断 ID 到值的映射恢复
resume_map = {
i.id: f"为 {i.value['text_to_revise']} 编辑的文本"
for i in graph.get_state(config).interrupts
}
print(graph.invoke(Command(resume=resume_map), config=config))
# > {'text_1': '为原始文本 1 编辑的文本', 'text_2': '为原始文本 2 编辑的文本'}
常见模式
您可以使用 interrupt 和 Command 实现四种典型设计模式:
- 批准或拒绝:在关键步骤(如 API 调用)前暂停图,以审查并批准操作。如果操作被拒绝,您可以阻止图执行该步骤,并可能采取替代操作。此模式通常涉及根据人工输入路由图。
- 编辑图状态:暂停图以审查并编辑图状态。这对于纠正错误或使用附加信息更新状态非常有用。此模式通常涉及使用人工输入更新状态。
- 审查工具调用:暂停图以在工具执行前审查并编辑 LLM 请求的工具调用。
- 验证人工输入:暂停图以在进行下一步之前验证人工输入。
下面展示了可以使用 interrupt 和 Command 实现的不同设计模式。
批准或拒绝
Depending on the human’s approval or rejection, the graph can proceed with the action or take an alternative path
在关键步骤(如 API 调用)前暂停图,以审查并批准操作。如果操作被拒绝,您可以阻止图执行该步骤,并可能采取替代操作。
from typing import Literal
from langgraph.types import interrupt, Command
def human_approval(state: State) -> Command[Literal["some_node", "another_node"]]:
is_approved = interrupt(
{
"question": "这正确吗?",
# 展示应由人工审查和批准的输出。
"llm_output": state["llm_output"]
}
)
if is_approved:
return Command(goto="some_node")
else:
return Command(goto="another_node")
# 在适当位置将节点添加到图中
# 并将其连接到相关节点。
graph_builder.add_node("human_approval", human_approval)
graph = graph_builder.compile(checkpointer=checkpointer)
# 运行图并触发中断后,图将暂停。
# 使用批准或拒绝恢复它。
thread_config = {"configurable": {"thread_id": "some_id"}}
graph.invoke(Command(resume=True), config=thread_config)
from typing import Literal, TypedDict
import uuid
from langgraph.constants import START, END
from langgraph.graph import StateGraph
from langgraph.types import interrupt, Command
from langgraph.checkpoint.memory import InMemorySaver
# 定义共享图状态
class State(TypedDict):
llm_output: str
decision: str
# 模拟 LLM 输出节点
def generate_llm_output(state: State) -> State:
return {"llm_output": "这是生成的输出。"}
# 人工批准节点
def human_approval(state: State) -> Command[Literal["approved_path", "rejected_path"]]:
decision = interrupt({
"question": "您是否批准以下输出?",
"llm_output": state["llm_output"]
})
if decision == "approve":
return Command(goto="approved_path", update={"decision": "approved"})
else:
return Command(goto="rejected_path", update={"decision": "rejected"})
# 批准后的后续步骤
def approved_node(state: State) -> State:
print("✅ 采取了批准路径。")
return state
# 拒绝后的替代路径
def rejected_node(state: State) -> State:
print("❌ 采取了拒绝路径。")
return state
# 构建图
builder = StateGraph(State)
builder.add_node("generate_llm_output", generate_llm_output)
builder.add_node("human_approval", human_approval)
builder.add_node("approved_path", approved_node)
builder.add_node("rejected_path", rejected_node)
builder.set_entry_point("generate_llm_output")
builder.add_edge("generate_llm_output", "human_approval")
builder.add_edge("approved_path", END)
builder.add_edge("rejected_path", END)
checkpointer = InMemorySaver()
graph = builder.compile(checkpointer=checkpointer)
# 运行直到中断
config = {"configurable": {"thread_id": uuid.uuid4()}}
result = graph.invoke({}, config=config)
print(result["__interrupt__"])
# 输出:
# Interrupt(value={'question': '您是否批准以下输出?', 'llm_output': '这是生成的输出。'}, ...)
# 模拟使用人工输入恢复
# 要测试拒绝,请将 resume="approve" 替换为 resume="reject"
final_result = graph.invoke(Command(resume="approve"), config=config)
print(final_result)
审查和编辑状态
A human can review and edit the state of the graph. This is useful for correcting mistakes or updating the state with additional information
from langgraph.types import interrupt
def human_editing(state: State):
...
result = interrupt(
# 要呈现给客户端的中断信息。
# 可以是任何 JSON 可序列化的值。
{
"task": "审查 LLM 的输出并进行必要的编辑。",
"llm_generated_summary": state["llm_generated_summary"]
}
)
# 使用编辑后的文本更新状态
return {
"llm_generated_summary": result["edited_text"]
}
# 在适当位置将节点添加到图中
# 并将其连接到相关节点。
graph_builder.add_node("human_editing", human_editing)
graph = graph_builder.compile(checkpointer=checkpointer)
...
# 运行图并触发中断后,图将暂停。
# 使用编辑后的文本恢复它。
thread_config = {"configurable": {"thread_id": "some_id"}}
graph.invoke(
Command(resume={"edited_text": "已编辑的文本"}),
config=thread_config
)
from typing import TypedDict
import uuid
from langgraph.constants import START, END
from langgraph.graph import StateGraph
from langgraph.types import interrupt, Command
from langgraph.checkpoint.memory import InMemorySaver
# 定义图状态
class State(TypedDict):
summary: str
# 模拟 LLM 摘要生成
def generate_summary(state: State) -> State:
return {
"summary": "猫坐在垫子上,望着星星。"
}
# 人工审查编辑节点
def human_review_edit(state: State) -> State:
result = interrupt({
"task": "请审查并根据需要编辑生成的摘要。",
"generated_summary": state["summary"]
})
return {
"summary": result["edited_summary"]
}
# 模拟下游对编辑后摘要的使用
def downstream_use(state: State) -> State:
print(f"✅ 使用编辑后的摘要:{state['summary']}")
return state
# 构建图
builder = StateGraph(State)
builder.add_node("generate_summary", generate_summary)
builder.add_node("human_review_edit", human_review_edit)
builder.add_node("downstream_use", downstream_use)
builder.set_entry_point("generate_summary")
builder.add_edge("generate_summary", "human_review_edit")
builder.add_edge("human_review_edit", "downstream_use")
builder.add_edge("downstream_use", END)
# 设置内存检查点以支持中断
checkpointer = InMemorySaver()
graph = builder.compile(checkpointer=checkpointer)
# 调用图直到触发中断
config = {"configurable": {"thread_id": uuid.uuid4()}}
result = graph.invoke({}, config=config)
# 输出中断有效载荷
print(result["__interrupt__"])
# 示例输出:
# > [
# > Interrupt(
# > value={
# > 'task': '请审查并根据需要编辑生成的摘要。',
# > 'generated_summary': '猫坐在垫子上,望着星星。'
# > },
# > id='...'
# > )
# > ]
# 使用人工编辑的输入恢复图
edited_summary = "猫躺在地毯上,平静地凝视着夜空。"
resumed_result = graph.invoke(
Command(resume={"edited_summary": edited_summary}),
config=config
)
print(resumed_result)
审查工具调用
A human can review and edit the output from the LLM before proceeding. This is particularly critical in applications where the tool calls requested by the LLM may be sensitive or require human oversight.
要为工具添加人工批准步骤:
- 在工具中使用
interrupt() 暂停执行。
- 使用
Command 根据人工输入继续执行。
from langgraph.checkpoint.memory import InMemorySaver
from langgraph.types import interrupt
from langgraph.prebuilt import create_react_agent
# 一个需要人工审查/批准的敏感工具示例
def book_hotel(hotel_name: str):
"""预订酒店"""
response = interrupt( # (1)!
f"尝试调用 `book_hotel`,参数为 {{'hotel_name': {hotel_name}}}。 "
"请批准或建议编辑。"
)
if response["type"] == "accept":
pass
elif response["type"] == "edit":
hotel_name = response["args"]["hotel_name"]
else:
raise ValueError(f"未知响应类型:{response['type']}")
return f"成功预订了 {hotel_name} 的住宿。"
checkpointer = InMemorySaver() # (2)!
agent = create_react_agent(
model="anthropic:claude-3-5-sonnet-latest",
tools=[book_hotel],
checkpointer=checkpointer, # (3)!
)
interrupt 函数 在特定节点处暂停代理图。在此情况下,我们在工具函数开头调用 interrupt(),这会在执行工具的节点处暂停图。interrupt() 内的信息(如工具调用)可呈现给人类,图可根据用户输入(工具调用批准、编辑或反馈)恢复。
InMemorySaver 用于在工具调用循环的每一步存储代理状态。这启用了短期记忆和人在回路功能。在此示例中,我们使用 InMemorySaver 将代理状态存储在内存中。在生产应用中,代理状态将存储在数据库中。
- 使用
checkpointer 初始化代理。
使用 stream() 方法运行代理,传入 config 对象以指定线程 ID。这允许代理在未来的调用中恢复相同的对话。
config = {
"configurable": {
"thread_id": "1"
}
}
for chunk in agent.stream(
{"messages": [{"role": "user", "content": "预订 McKittrick 酒店的住宿"}]},
config
):
print(chunk)
print("\n")
您应该看到代理运行直到达到 interrupt() 调用,此时它会暂停并等待人工输入。
使用 Command 恢复代理以根据人工输入继续执行。
from langgraph.types import Command
for chunk in agent.stream(
Command(resume={"type": "accept"}), # (1)!
# Command(resume={"type": "edit", "args": {"hotel_name": "McKittrick Hotel"}}),
config
):
print(chunk)
print("\n")
-
interrupt 函数 与 Command 对象结合使用,以人工提供的值恢复图。
-
@interrupt 函数与 @Command 对象结合使用,以便用人类提供的值恢复图的执行。
:::
为任意工具添加中断
你可以创建一个包装器,为任意工具添加中断功能。下面的示例提供了一个与 Agent Inbox UI 和 Agent Chat UI 兼容的参考实现。
from typing import Callable
from langchain_core.tools import BaseTool, tool as create_tool
from langchain_core.runnables import RunnableConfig
from langgraph.types import interrupt
from langgraph.prebuilt.interrupt import HumanInterruptConfig, HumanInterrupt
def add_human_in_the_loop(
tool: Callable | BaseTool,
*,
interrupt_config: HumanInterruptConfig = None,
) -> BaseTool:
"""包装工具以支持人在回路审查。"""
if not isinstance(tool, BaseTool):
tool = create_tool(tool)
if interrupt_config is None:
interrupt_config = {
"allow_accept": True,
"allow_edit": True,
"allow_respond": True,
}
@create_tool( # (1)!
tool.name,
description=tool.description,
args_schema=tool.args_schema
)
def call_tool_with_interrupt(config: RunnableConfig, **tool_input):
request: HumanInterrupt = {
"action_request": {
"action": tool.name,
"args": tool_input
},
"config": interrupt_config,
"description": "请审查工具调用"
}
response = interrupt([request])[0] # (2)!
# 批准工具调用
if response["type"] == "accept":
tool_response = tool.invoke(tool_input, config)
# 更新工具调用参数
elif response["type"] == "edit":
tool_input = response["args"]["args"]
tool_response = tool.invoke(tool_input, config)
# 使用用户反馈响应LLM
elif response["type"] == "response":
user_feedback = response["args"]
tool_response = user_feedback
else:
raise ValueError(f"不支持的中断响应类型: {response['type']}")
return tool_response
return call_tool_with_interrupt
- 此包装器创建一个新工具,在执行被包装的工具之前调用
interrupt()。
interrupt() 使用 Agent Inbox UI 期望的特殊输入和输出格式: - 将 [HumanInterrupt] 对象列表发送给 AgentInbox,向最终用户渲染中断信息 - AgentInbox 提供的恢复值是一个列表(即 Command(resume=[...]))
你可以使用此包装器为任何工具添加 interrupt(),而无需在工具内部添加它:
from langgraph.checkpoint.memory import InMemorySaver
from langgraph.prebuilt import create_react_agent
checkpointer = InMemorySaver()
def book_hotel(hotel_name: str):
"""预订酒店"""
return f"成功预订了 {hotel_name} 的住宿。"
agent = create_react_agent(
model="anthropic:claude-3-5-sonnet-latest",
tools=[
add_human_in_the_loop(book_hotel), # (1)!
],
checkpointer=checkpointer,
)
config = {"configurable": {"thread_id": "1"}}
# 运行代理
for chunk in agent.stream(
{"messages": [{"role": "user", "content": "预订 McKittrick 酒店的住宿"}]},
config
):
print(chunk)
print("\n")
add_human_in_the_loop 包装器用于为工具添加 interrupt()。这允许代理在继续执行工具调用之前暂停并等待人类输入。
你应该会看到代理运行直到它到达 interrupt() 调用,
此时它会暂停并等待人类输入。
使用 Command 恢复代理以根据人类输入继续执行。
from langgraph.types import Command
for chunk in agent.stream(
Command(resume=[{"type": "accept"}]),
# Command(resume=[{"type": "edit", "args": {"args": {"hotel_name": "McKittrick Hotel"}}}]),
config
):
print(chunk)
print("\n")
验证人类输入
如果你需要在图本身内验证人类提供的输入(而不是在客户端),可以通过在一个节点内使用多个中断调用来实现。
from langgraph.types import interrupt
def human_node(state: State):
"""带验证的人类节点。"""
question = "你的年龄是多少?"
while True:
answer = interrupt(question)
# 验证答案,如果答案无效则再次请求输入。
if not isinstance(answer, int) or answer < 0:
question = f"'{answer} 不是有效的年龄。你的年龄是多少?"
answer = None
continue
else:
# 如果答案有效,我们可以继续。
break
print(f"回路中的人类年龄为 {answer} 岁。")
return {
"age": answer
}
from typing import TypedDict
import uuid
from langgraph.constants import START, END
from langgraph.graph import StateGraph
from langgraph.types import interrupt, Command
from langgraph.checkpoint.memory import InMemorySaver
# 定义图状态
class State(TypedDict):
age: int
# 请求人类输入并验证它的节点
def get_valid_age(state: State) -> State:
prompt = "请输入您的年龄(必须是非负整数)。"
while True:
user_input = interrupt(prompt)
# 验证输入
try:
age = int(user_input)
if age < 0:
raise ValueError("年龄必须是非负数。")
break # 收到有效输入
except (ValueError, TypeError):
prompt = f"'{user_input}' 无效。请输入一个非负整数作为年龄。"
return {"age": age}
# 使用有效输入的节点
def report_age(state: State) -> State:
print(f"✅ 人类年龄为 {state['age']} 岁。")
return state
# 构建图
builder = StateGraph(State)
builder.add_node("get_valid_age", get_valid_age)
builder.add_node("report_age", report_age)
builder.set_entry_point("get_valid_age")
builder.add_edge("get_valid_age", "report_age")
builder.add_edge("report_age", END)
# 使用内存检查点创建图
checkpointer = InMemorySaver()
graph = builder.compile(checkpointer=checkpointer)
# 运行图直到第一次中断
config = {"configurable": {"thread_id": uuid.uuid4()}}
result = graph.invoke({}, config=config)
print(result["__interrupt__"]) # 第一次提示:“请输入您的年龄...”
# 模拟无效输入(例如,字符串而不是整数)
result = graph.invoke(Command(resume="not a number"), config=config)
print(result["__interrupt__"]) # 带验证消息的后续提示
# 模拟第二次无效输入(例如,负数)
result = graph.invoke(Command(resume="-10"), config=config)
print(result["__interrupt__"]) # 另一次重试
# 提供有效输入
final_result = graph.invoke(Command(resume="25"), config=config)
print(final_result) # 应包含有效年龄
使用中断进行调试
要调试和测试图,请使用静态中断(也称为静态断点)逐步执行图,一次一个节点,或在特定节点暂停图的执行。静态中断在定义的点触发,要么在节点执行之前,要么在节点执行之后。你可以在编译时或运行时通过指定 interrupt_before 和 interrupt_after 来设置静态中断。
不建议将静态中断用于人在回路工作流。请改用动态中断。
graph = graph_builder.compile( # (1)!
interrupt_before=["node_a"], # (2)!
interrupt_after=["node_b", "node_c"], # (3)!
checkpointer=checkpointer, # (4)!
)
config = {
"configurable": {
"thread_id": "some_thread"
}
}
# 运行图直到断点
graph.invoke(inputs, config=thread_config) # (5)!
# 恢复图
graph.invoke(None, config=thread_config) # (6)!
- 断点在
compile 时设置。
interrupt_before 指定应在执行节点之前暂停执行的节点。
interrupt_after 指定应在执行节点之后暂停执行的节点。
- 必须启用检查点才能使用断点。
- 图运行直到命中第一个断点。
- 通过传入
None 作为输入来恢复图。这将运行图直到命中下一个断点。
from IPython.display import Image, display
from typing_extensions import TypedDict
from langgraph.checkpoint.memory import InMemorySaver
from langgraph.graph import StateGraph, START, END
class State(TypedDict):
input: str
def step_1(state):
print("---Step 1---")
pass
def step_2(state):
print("---Step 2---")
pass
def step_3(state):
print("---Step 3---")
pass
builder = StateGraph(State)
builder.add_node("step_1", step_1)
builder.add_node("step_2", step_2)
builder.add_node("step_3", step_3)
builder.add_edge(START, "step_1")
builder.add_edge("step_1", "step_2")
builder.add_edge("step_2", "step_3")
builder.add_edge("step_3", END)
# 设置检查点
checkpointer = InMemorySaver() # (1)!
graph = builder.compile(
checkpointer=checkpointer, # (2)!
interrupt_before=["step_3"] # (3)!
)
# 查看
display(Image(graph.get_graph().draw_mermaid_png()))
# 输入
initial_input = {"input": "hello world"}
# 线程
thread = {"configurable": {"thread_id": "1"}}
# 运行图直到第一次中断
for event in graph.stream(initial_input, thread, stream_mode="values"):
print(event)
# 这将运行直到断点
# 此时你可以获取图的状态
print(graph.get_state(config))
# 你可以通过传入 `None` 作为输入来继续图的执行
for event in graph.stream(None, thread, stream_mode="values"):
print(event)
在 LangGraph Studio 中使用静态中断
你可以使用 LangGraph Studio 来调试你的图。你可以在 UI 中设置静态断点,然后运行图。你也可以使用 UI 在执行的任何时刻检查图的状态。
image
使用 langgraph dev 本地部署的应用程序,LangGraph Studio 是免费的。
注意事项
使用人在回路时,有一些注意事项需要牢记。
与有副作用的代码一起使用
将具有副作用的代码(例如 API 调用)放在 interrupt 之后或放在单独的节点中,以避免重复执行,因为这些代码在节点每次恢复时都会重新触发。
from langgraph.types import interrupt
def human_node(state: State):
"""带验证的人类节点。"""
answer = interrupt(question)
api_call(answer) # 放在中断后,没问题
与作为函数调用的子图一起使用
当作为函数调用子图时,父图将从调用子图的节点开头恢复执行,该节点是触发 interrupt 的地方。同样,子图将从调用 interrupt() 函数的节点开头恢复执行。
def node_in_parent_graph(state: State):
some_code() # <-- 当子图恢复时,这段代码将重新执行。
# 作为函数调用子图。
# 子图包含一个 `interrupt` 调用。
subgraph_result = subgraph.invoke(some_input)
...
假设我们有一个包含 3 个节点的父图:父图: node_1 → node_2 (子图调用) → node_3而子图有 3 个节点,其中第二个节点包含一个 interrupt:子图: sub_node_1 → sub_node_2 (interrupt) → sub_node_3当恢复图时,执行将按以下顺序进行:
- 跳过父图中的
node_1(已执行,图状态已保存在快照中)。
- 从头开始重新执行父图中的
node_2。
- 跳过子图中的
sub_node_1(已执行,图状态已保存在快照中)。
- 从头开始重新执行子图中的
sub_node_2。
- 继续执行
sub_node_3 及后续节点。
下面是一个简化的示例代码,你可以用它来理解子图如何与中断一起工作。
它计算每个节点被进入的次数并打印计数。import uuid
from typing import TypedDict
from langgraph.graph import StateGraph
from langgraph.constants import START
from langgraph.types import interrupt, Command
from langgraph.checkpoint.memory import InMemorySaver
class State(TypedDict):
"""图状态。"""
state_counter: int
counter_node_in_subgraph = 0
def node_in_subgraph(state: State):
"""子图中的节点。"""
global counter_node_in_subgraph
counter_node_in_subgraph += 1 # 这段代码将**不会**再次运行!
print(f"总共进入了 `node_in_subgraph` {counter_node_in_subgraph} 次")
counter_human_node = 0
def human_node(state: State):
global counter_human_node
counter_human_node += 1 # 这段代码将再次运行!
print(f"总共进入了子图中的 human_node {counter_human_node} 次")
answer = interrupt("你叫什么名字?")
print(f"得到了答案 {answer}")
checkpointer = InMemorySaver()
subgraph_builder = StateGraph(State)
subgraph_builder.add_node("some_node", node_in_subgraph)
subgraph_builder.add_node("human_node", human_node)
subgraph_builder.add_edge(START, "some_node")
subgraph_builder.add_edge("some_node", "human_node")
subgraph = subgraph_builder.compile(checkpointer=checkpointer)
counter_parent_node = 0
def parent_node(state: State):
"""这个父节点将调用子图。"""
global counter_parent_node
counter_parent_node += 1 # 在恢复时这段代码将再次运行!
print(f"总共进入了 `parent_node` {counter_parent_node} 次")
# 请注意,我们有意增加图状态中的状态计数器
# 以证明子图对相同键的更新不会与父图冲突(直到
subgraph_state = subgraph.invoke(state)
return subgraph_state
builder = StateGraph(State)
builder.add_node("parent_node", parent_node)
builder.add_edge(START, "parent_node")
# 必须启用检查点才能使中断正常工作!
checkpointer = InMemorySaver()
graph = builder.compile(checkpointer=checkpointer)
config = {
"configurable": {
"thread_id": uuid.uuid4(),
}
}
for chunk in graph.stream({"state_counter": 1}, config):
print(chunk)
print('--- 恢复 ---')
for chunk in graph.stream(Command(resume="35"), config):
print(chunk)
这将打印出总共进入了 `parent_node` 1 次
总共进入了 `node_in_subgraph` 1 次
总共进入了子图中的 human_node 1 次
{'__interrupt__': (Interrupt(value='你叫什么名字?', id='...'),)}
--- 恢复 ---
总共进入了 `parent_node` 2 次
总共进入了子图中的 human_node 2 次
得到了答案 35
{'parent_node': {'state_counter': 1}}
在单个节点中使用多个中断
在单个节点中使用多个中断对于像验证人类输入这样的模式很有帮助。但是,如果不小心处理,在同一个节点中使用多个中断可能会导致意外行为。
当一个节点包含多个中断调用时,LangGraph 会为执行该节点的任务保留一个恢复值列表。每当执行恢复时,它都会从节点的开头开始。对于遇到的每个中断,LangGraph 会检查任务的恢复列表中是否存在匹配的值。匹配是严格基于索引的,因此节点内中断调用的顺序至关重要。
为避免问题,请避免在执行之间动态更改节点的结构。这包括添加、删除或重新排序中断调用,因为此类更改可能导致索引不匹配。这些问题通常源于非常规模式,例如通过 Command(resume=..., update=SOME_STATE_MUTATION) 突变状态或依赖全局变量来动态修改节点结构。
import uuid
from typing import TypedDict, Optional
from langgraph.graph import StateGraph
from langgraph.constants import START
from langgraph.types import interrupt, Command
from langgraph.checkpoint.memory import InMemorySaver
class State(TypedDict):
"""图状态。"""
age: Optional[str]
name: Optional[str]
def human_node(state: State):
if not state.get('name'):
name = interrupt("你叫什么名字?")
else:
name = "N/A"
if not state.get('age'):
age = interrupt("你的年龄是多少?")
else:
age = "N/A"
print(f"姓名: {name}. 年龄: {age}")
return {
"age": age,
"name": name,
}
builder = StateGraph(State)
builder.add_node("human_node", human_node)
builder.add_edge(START, "human_node")
# 必须启用检查点才能使中断正常工作!
checkpointer = InMemorySaver()
graph = builder.compile(checkpointer=checkpointer)
config = {
"configurable": {
"thread_id": uuid.uuid4(),
}
}
for chunk in graph.stream({"age": None, "name": None}, config):
print(chunk)
for chunk in graph.stream(Command(resume="John", update={"name": "foo"}), config):
print(chunk)
{'__interrupt__': (Interrupt(value='你叫什么名字?', id='...'),)}
姓名: N/A. 年龄: John
{'human_node': {'age': 'John', 'name': 'N/A'}}