有关函数式 API 的概念性信息,请参阅 函数式 API。
创建简单的工作流
定义entrypoint 时,输入仅限于函数的第一个参数。若要传递多个输入,可以使用字典。
Copy
@entrypoint(checkpointer=checkpointer)
def my_workflow(inputs: dict) -> int:
value = inputs["value"]
another_value = inputs["another_value"]
...
my_workflow.invoke({"value": 1, "another_value": 2})
扩展示例:简单工作流
扩展示例:简单工作流
Copy
import uuid
from langgraph.func import entrypoint, task
from langgraph.checkpoint.memory import InMemorySaver
# 检查数字是否为偶数的任务
@task
def is_even(number: int) -> bool:
return number % 2 == 0
# 格式化消息的任务
@task
def format_message(is_even: bool) -> str:
return "该数字是偶数。" if is_even else "该数字是奇数。"
# 创建用于持久化的检查点器
checkpointer = InMemorySaver()
@entrypoint(checkpointer=checkpointer)
def workflow(inputs: dict) -> str:
"""用于分类数字的简单工作流。"""
even = is_even(inputs["number"]).result()
return format_message(even).result()
# 使用唯一的线程 ID 运行工作流
config = {"configurable": {"thread_id": str(uuid.uuid4())}}
result = workflow.invoke({"number": 7}, config=config)
print(result)
扩展示例:使用 LLM 撰写文章
扩展示例:使用 LLM 撰写文章
本示例展示了如何在语法上使用
@task 和 @entrypoint 装饰器。
由于提供了检查点器,工作流的结果将被持久化存储在检查点器中。Copy
import uuid
from langchain.chat_models import init_chat_model
from langgraph.func import entrypoint, task
from langgraph.checkpoint.memory import InMemorySaver
llm = init_chat_model('openai:gpt-3.5-turbo')
# 使用 LLM 生成文章的任务
@task
def compose_essay(topic: str) -> str:
"""生成关于给定主题的文章。"""
return llm.invoke([
{"role": "system", "content": "你是一个乐于助人的助手,专门撰写文章。"},
{"role": "user", "content": f"写一篇关于{topic}的文章。"}
]).content
# 创建用于持久化的检查点器
checkpointer = InMemorySaver()
@entrypoint(checkpointer=checkpointer)
def workflow(topic: str) -> str:
"""使用 LLM 生成文章的简单工作流。"""
return compose_essay(topic).result()
# 执行工作流
config = {"configurable": {"thread_id": str(uuid.uuid4())}}
result = workflow.invoke("飞行的历史", config=config)
print(result)
并行执行
可以通过并发调用任务并等待结果来实现并行执行。这对于提高 I/O 密集型任务(例如调用 LLM 的 API)的性能非常有用。Copy
@task
def add_one(number: int) -> int:
return number + 1
@entrypoint(checkpointer=checkpointer)
def graph(numbers: list[int]) -> list[str]:
futures = [add_one(i) for i in numbers]
return [f.result() for f in futures]
扩展示例:并行 LLM 调用
扩展示例:并行 LLM 调用
本示例展示了如何使用 此示例使用 LangGraph 的并发模型来提高执行时间,特别是在任务涉及 I/O(如 LLM 补全)时。
@task 并行运行多个 LLM 调用。每个调用生成关于不同主题的段落,并将结果合并为单一文本输出。Copy
import uuid
from langchain.chat_models import init_chat_model
from langgraph.func import entrypoint, task
from langgraph.checkpoint.memory import InMemorySaver
# 初始化 LLM 模型
llm = init_chat_model("openai:gpt-3.5-turbo")
# 生成关于给定主题段落的任务
@task
def generate_paragraph(topic: str) -> str:
response = llm.invoke([
{"role": "system", "content": "你是一个乐于助人的助手,专门撰写教育性段落。"},
{"role": "user", "content": f"写一段关于{topic}的内容。"}
])
return response.content
# 创建用于持久化的检查点器
checkpointer = InMemorySaver()
@entrypoint(checkpointer=checkpointer)
def workflow(topics: list[str]) -> str:
"""并行生成多个段落并将其组合。"""
futures = [generate_paragraph(topic) for topic in topics]
paragraphs = [f.result() for f in futures]
return "\n\n".join(paragraphs)
# 运行工作流
config = {"configurable": {"thread_id": str(uuid.uuid4())}}
result = workflow.invoke(["量子计算", "气候变化", "航空历史"], config=config)
print(result)
调用图
函数式 API 和 图 API 可在同一应用程序中结合使用,因为它们共享相同的底层运行时。Copy
from langgraph.func import entrypoint
from langgraph.graph import StateGraph
builder = StateGraph()
...
some_graph = builder.compile()
@entrypoint()
def some_workflow(some_input: dict) -> int:
# 调用使用图 API 定义的图
result_1 = some_graph.invoke(...)
# 调用另一个使用图 API 定义的图
result_2 = another_graph.invoke(...)
return {
"result_1": result_1,
"result_2": result_2
}
扩展示例:从函数式 API 调用简单图
扩展示例:从函数式 API 调用简单图
Copy
import uuid
from typing import TypedDict
from langgraph.func import entrypoint
from langgraph.checkpoint.memory import InMemorySaver
from langgraph.graph import StateGraph
# 定义共享状态类型
class State(TypedDict):
foo: int
# 定义一个简单的转换节点
def double(state: State) -> State:
return {"foo": state["foo"] * 2}
# 使用图 API 构建图
builder = StateGraph(State)
builder.add_node("double", double)
builder.set_entry_point("double")
graph = builder.compile()
# 定义函数式 API 工作流
checkpointer = InMemorySaver()
@entrypoint(checkpointer=checkpointer)
def workflow(x: int) -> dict:
result = graph.invoke({"foo": x})
return {"bar": result["foo"]}
# 执行工作流
config = {"configurable": {"thread_id": str(uuid.uuid4())}}
print(workflow.invoke(5, config=config)) # 输出:{'bar': 10}
调用其他入口点
您可以在 入口点 或 任务 内部调用其他 入口点。Copy
@entrypoint() # 将自动使用父入口点的检查点器
def some_other_workflow(inputs: dict) -> int:
return inputs["value"]
@entrypoint(checkpointer=checkpointer)
def my_workflow(inputs: dict) -> int:
value = some_other_workflow.invoke({"value": 1})
return value
扩展示例:调用另一个入口点
扩展示例:调用另一个入口点
Copy
import uuid
from langgraph.func import entrypoint
from langgraph.checkpoint.memory import InMemorySaver
# 初始化检查点器
checkpointer = InMemorySaver()
# 可重用的子工作流,用于将数字相乘
@entrypoint()
def multiply(inputs: dict) -> int:
return inputs["a"] * inputs["b"]
# 调用子工作流的主工作流
@entrypoint(checkpointer=checkpointer)
def main(inputs: dict) -> dict:
result = multiply.invoke({"a": inputs["x"], "b": inputs["y"]})
return {"product": result}
# 执行主工作流
config = {"configurable": {"thread_id": str(uuid.uuid4())}}
print(main.invoke({"x": 6, "y": 7}, config=config)) # 输出:{'product': 42}
流式处理
函数式 API 使用与 图 API 相同的流式处理机制。更多详细信息请参阅 流式处理指南 部分。 使用流式 API 同时流式传输更新和自定义数据的示例。Copy
from langgraph.func import entrypoint
from langgraph.checkpoint.memory import InMemorySaver
from langgraph.config import get_stream_writer # (1)!
checkpointer = InMemorySaver()
@entrypoint(checkpointer=checkpointer)
def main(inputs: dict) -> int:
writer = get_stream_writer() # (2)!
writer("开始处理") # (3)!
result = inputs["x"] * 2
writer(f"结果为 {result}") # (4)!
return result
config = {"configurable": {"thread_id": "abc"}}
for mode, chunk in main.stream( # (5)!
{"x": 5},
stream_mode=["custom", "updates"], # (6)!
config=config
):
print(f"{mode}: {chunk}")
- 从
langgraph.config导入get_stream_writer。 - 在入口点内获取流式写入器实例。
- 在计算开始前发送自定义数据。
- 在计算结果后发送另一条自定义消息。
- 使用
.stream()处理流式输出。 - 指定要使用的流式模式。
Copy
('updates', {'add_one': 2})
('updates', {'add_two': 3})
('custom', 'hello')
('custom', 'world')
('updates', {'main': 5})
Python < 3.11 的异步支持
如果使用 Python < 3.11 并编写异步代码,
get_stream_writer() 将无法工作。请直接使用 StreamWriter 类。更多详情请参阅 Python < 3.11 的异步支持。Copy
from langgraph.types import StreamWriter
@entrypoint(checkpointer=checkpointer)
async def main(inputs: dict, writer: StreamWriter) -> int:
...
重试策略
Copy
from langgraph.checkpoint.memory import InMemorySaver
from langgraph.func import entrypoint, task
from langgraph.types import RetryPolicy
# 此变量仅用于演示目的,以模拟网络故障。
# 实际代码中不会包含此变量。
attempts = 0
# 配置 RetryPolicy 以在 ValueError 时重试。
# 默认的 RetryPolicy 针对特定网络错误的重试进行了优化。
retry_policy = RetryPolicy(retry_on=ValueError)
@task(retry_policy=retry_policy)
def get_info():
global attempts
attempts += 1
if attempts < 2:
raise ValueError('失败')
return "OK"
checkpointer = InMemorySaver()
@entrypoint(checkpointer=checkpointer)
def main(inputs, writer):
return get_info().result()
config = {
"configurable": {
"thread_id": "1"
}
}
main.invoke({'any_input': 'foobar'}, config=config)
Copy
'OK'
缓存任务
Copy
import time
from langgraph.cache.memory import InMemoryCache
from langgraph.func import entrypoint, task
from langgraph.types import CachePolicy
@task(cache_policy=CachePolicy(ttl=120)) # (1)!
def slow_add(x: int) -> int:
time.sleep(1)
return x * 2
@entrypoint(cache=InMemoryCache())
def main(inputs: dict) -> dict[str, int]:
result1 = slow_add(inputs["x"]).result()
result2 = slow_add(inputs["x"]).result()
return {"result1": result1, "result2": result2}
for chunk in main.stream({"x": 5}, stream_mode="updates"):
print(chunk)
#> {'slow_add': 10}
#> {'slow_add': 10, '__metadata__': {'cached': True}}
#> {'main': {'result1': 10, 'result2': 10}}
ttl以秒为单位指定。缓存将在该时间后失效。
错误后恢复
Copy
import time
from langgraph.checkpoint.memory import InMemorySaver
from langgraph.func import entrypoint, task
from langgraph.types import StreamWriter
# 此变量仅用于演示目的,以模拟网络故障。
# 实际代码中不会包含此变量。
attempts = 0
@task()
def get_info():
"""
模拟一个在失败一次后成功的任务。
第一次尝试时抛出异常,后续尝试返回 "OK"。
"""
global attempts
attempts += 1
if attempts < 2:
raise ValueError("失败") # 模拟第一次尝试失败
return "OK"
# 初始化用于持久化的内存检查点器
checkpointer = InMemorySaver()
@task
def slow_task():
"""
通过引入 1 秒延迟来模拟一个运行缓慢的任务。
"""
time.sleep(1)
return "已运行慢速任务。"
@entrypoint(checkpointer=checkpointer)
def main(inputs, writer: StreamWriter):
"""
主工作流函数,按顺序运行 slow_task 和 get_info 任务。
参数:
- inputs: 包含工作流输入值的字典。
- writer: 用于流式传输自定义数据的 StreamWriter。
工作流首先执行 `slow_task`,然后尝试执行 `get_info`,
该任务在第一次调用时会失败。
"""
slow_task_result = slow_task().result() # 阻塞调用 slow_task
get_info().result() # 第一次尝试时将在此处抛出异常
return slow_task_result
# 带有唯一线程标识符的工作流执行配置
config = {
"configurable": {
"thread_id": "1" # 用于跟踪工作流执行的唯一标识符
}
}
# 此次调用将因 slow_task 的执行而耗时约 1 秒
try:
# 第一次调用将因 `get_info` 任务失败而抛出异常
main.invoke({'any_input': 'foobar'}, config=config)
except ValueError:
pass # 优雅地处理失败
slow_task,因为其结果已保存在检查点中。
Copy
main.invoke(None, config=config)
Copy
'已运行慢速任务。'
人机协同
函数式 API 使用interrupt 函数和 Command 原语支持 人机协同 工作流。
基本人机协同工作流
我们将创建三个 任务:- 追加
"bar"。 - 暂停以等待人工输入。恢复时,追加人工输入。
- 追加
"qux"。
Copy
from langgraph.func import entrypoint, task
from langgraph.types import Command, interrupt
@task
def step_1(input_query):
"""追加 bar。"""
return f"{input_query} bar"
@task
def human_feedback(input_query):
"""追加用户输入。"""
feedback = interrupt(f"请提供反馈:{input_query}")
return f"{input_query} {feedback}"
@task
def step_3(input_query):
"""追加 qux。"""
return f"{input_query} qux"
我们现在可以使用 entrypoint 将这些任务组合起来:
Copy
from langgraph.checkpoint.memory import InMemorySaver
checkpointer = InMemorySaver()
@entrypoint(checkpointer=checkpointer)
def graph(input_query):
result_1 = step_1(input_query).result()
result_2 = human_feedback(result_1).result()
result_3 = step_3(result_2).result()
return result_3
step_1 —— 会被持久化保存,因此在 interrupt 后不会再次执行。
让我们传入一个查询字符串:
Copy
config = {"configurable": {"thread_id": "1"}}
for event in graph.stream("foo", config):
print(event)
print("\n")
step_1 之后我们因 interrupt 而暂停。中断会提供恢复运行的指令。要恢复运行,我们需发送一个包含 human_feedback 任务所需数据的 Command:
Copy
# 继续执行
for event in graph.stream(Command(resume="baz"), config):
print(event)
print("\n")
审查工具调用
要在执行前审查工具调用,我们添加一个调用interrupt 的 review_tool_call 函数。调用此函数时,执行将暂停,直到我们发出恢复命令。
给定一个工具调用,我们的函数将 interrupt 以供人工审查。此时我们可以选择:
- 接受该工具调用
- 修改工具调用后继续
- 生成自定义工具消息(例如,指示模型重新格式化其工具调用)
Copy
from typing import Union
def review_tool_call(tool_call: ToolCall) -> Union[ToolCall, ToolMessage]:
"""审查工具调用,返回经验证的版本。"""
human_review = interrupt(
{
"question": "这个调用正确吗?",
"tool_call": tool_call,
}
)
review_action = human_review["action"]
review_data = human_review.get("data")
if review_action == "continue":
return tool_call
elif review_action == "update":
updated_tool_call = {**tool_call, **{"args": review_data}}
return updated_tool_call
elif review_action == "feedback":
return ToolMessage(
content=review_data, name=tool_call["name"], tool_call_id=tool_call["id"]
)
ToolMessage。先前任务的结果——例如初始模型调用——会被持久化,因此在 interrupt 后不会再次执行。
Copy
from langgraph.checkpoint.memory import InMemorySaver
from langgraph.graph.message import add_messages
from langgraph.types import Command, interrupt
checkpointer = InMemorySaver()
@entrypoint(checkpointer=checkpointer)
def agent(messages, previous):
if previous is not None:
messages = add_messages(previous, messages)
llm_response = call_model(messages).result()
while True:
if not llm_response.tool_calls:
break
# 审查工具调用
tool_results = []
tool_calls = []
for i, tool_call in enumerate(llm_response.tool_calls):
review = review_tool_call(tool_call)
if isinstance(review, ToolMessage):
tool_results.append(review)
else: # 是已验证的工具调用
tool_calls.append(review)
if review != tool_call:
llm_response.tool_calls[i] = review # 更新消息
# 执行剩余工具调用
tool_result_futures = [call_tool(tool_call) for tool_call in tool_calls]
remaining_tool_results = [fut.result() for fut in tool_result_futures]
# 追加到消息列表
messages = add_messages(
messages,
[llm_response, *tool_results, *remaining_tool_results],
)
# 再次调用模型
llm_response = call_model(messages).result()
# 生成最终响应
messages = add_messages(messages, llm_response)
return entrypoint.final(value=llm_response, save=messages)
短期记忆
短期记忆允许在同一个 thread id 的不同 调用 之间存储信息。更多详情请参阅 短期记忆。管理检查点
你可以查看和删除检查点器存储的信息。查看线程状态
Copy
config = {
"configurable": {
"thread_id": "1",
# 可选:提供特定检查点的 ID,
# 否则显示最新检查点
# "checkpoint_id": "1f029ca3-1f5b-6704-8004-820c16b69a5a"
}
}
graph.get_state(config)
Copy
StateSnapshot(
values={'messages': [HumanMessage(content="hi! I'm bob"), AIMessage(content='Hi Bob! How are you doing today?), HumanMessage(content="what's my name?"), AIMessage(content='Your name is Bob.')]}, next=(),
config={'configurable': {'thread_id': '1', 'checkpoint_ns': '', 'checkpoint_id': '1f029ca3-1f5b-6704-8004-820c16b69a5a'}},
metadata={
'source': 'loop',
'writes': {'call_model': {'messages': AIMessage(content='Your name is Bob.')}},
'step': 4,
'parents': {},
'thread_id': '1'
},
created_at='2025-05-05T16:01:24.680462+00:00',
parent_config={'configurable': {'thread_id': '1', 'checkpoint_ns': '', 'checkpoint_id': '1f029ca3-1790-6b0a-8003-baf965b6a38f'}},
tasks=(),
interrupts=()
)
查看线程历史
Copy
config = {
"configurable": {
"thread_id": "1"
}
}
list(graph.get_state_history(config))
Copy
[
StateSnapshot(
values={'messages': [HumanMessage(content="hi! I'm bob"), AIMessage(content='Hi Bob! How are you doing today? Is there anything I can help you with?'), HumanMessage(content="what's my name?"), AIMessage(content='Your name is Bob.')]},
next=(),
config={'configurable': {'thread_id': '1', 'checkpoint_ns': '', 'checkpoint_id': '1f029ca3-1f5b-6704-8004-820c16b69a5a'}},
metadata={'source': 'loop', 'writes': {'call_model': {'messages': AIMessage(content='Your name is Bob.')}}, 'step': 4, 'parents': {}, 'thread_id': '1'},
created_at='2025-05-05T16:01:24.680462+00:00',
parent_config={'configurable': {'thread_id': '1', 'checkpoint_ns': '', 'checkpoint_id': '1f029ca3-1790-6b0a-8003-baf965b6a38f'}},
tasks=(),
interrupts=()
),
StateSnapshot(
values={'messages': [HumanMessage(content="hi! I'm bob"), AIMessage(content='Hi Bob! How are you doing today? Is there anything I can help you with?'), HumanMessage(content="what's my name?")]},
next=('call_model',),
config={'configurable': {'thread_id': '1', 'checkpoint_ns': '', 'checkpoint_id': '1f029ca3-1790-6b0a-8003-baf965b6a38f'}},
metadata={'source': 'loop', 'writes': None, 'step': 3, 'parents': {}, 'thread_id': '1'},
created_at='2025-05-05T16:01:23.863421+00:00',
parent_config={...}
tasks=(PregelTask(id='8ab4155e-6b15-b885-9ce5-bed69a2c305c', name='call_model', path=('__pregel_pull', 'call_model'), error=None, interrupts=(), state=None, result={'messages': AIMessage(content='Your name is Bob.')}),),
interrupts=()
),
StateSnapshot(
values={'messages': [HumanMessage(content="hi! I'm bob"), AIMessage(content='Hi Bob! How are you doing today? Is there anything I can help you with?')]},
next=('__start__',),
config={...},
metadata={'source': 'input', 'writes': {'__start__': {'messages': [{'role': 'user', 'content': "what's my name?"}]}}, 'step': 2, 'parents': {}, 'thread_id': '1'},
created_at='2025-05-05T16:01:23.863173+00:00',
parent_config={...}
tasks=(PregelTask(id='24ba39d6-6db1-4c9b-f4c5-682aeaf38dcd', name='__start__', path=('__pregel_pull', '__start__'), error=None, interrupts=(), state=None, result={'messages': [{'role': 'user', 'content': "what's my name?"}]}),),
interrupts=()
),
StateSnapshot(
values={'messages': [HumanMessage(content="hi! I'm bob"), AIMessage(content='Hi Bob! How are you doing today? Is there anything I can help you with?')]},
next=(),
config={...},
metadata={'source': 'loop', 'writes': {'call_model': {'messages': AIMessage(content='Hi Bob! How are you doing today? Is there anything I can help you with?')}}, 'step': 1, 'parents': {}, 'thread_id': '1'},
created_at='2025-05-05T16:01:23.862295+00:00',
parent_config={...}
tasks=(),
interrupts=()
),
StateSnapshot(
values={'messages': [HumanMessage(content="hi! I'm bob")]},
next=('call_model',),
config={...},
metadata={'source': 'loop', 'writes': None, 'step': 0, 'parents': {}, 'thread_id': '1'},
created_at='2025-05-05T16:01:22.278960+00:00',
parent_config={...}
tasks=(PregelTask(id='8cbd75e0-3720-b056-04f7-71ac805140a0', name='call_model', path=('__pregel_pull', 'call_model'), error=None, interrupts=(), state=None, result={'messages': AIMessage(content='Hi Bob! How are you doing today? Is there anything I can help you with?')}),),
interrupts=()
),
StateSnapshot(
values={'messages': []},
next=('__start__',),
config={'configurable': {'thread_id': '1', 'checkpoint_ns': '', 'checkpoint_id': '1f029ca3-0870-6ce2-bfff-1f3f14c3e565'}},
metadata={'source': 'input', 'writes': {'__start__': {'messages': [{'role': 'user', 'content': "hi! I'm bob"}]}}, 'step': -1, 'parents': {}, 'thread_id': '1'},
created_at='2025-05-05T16:01:22.277497+00:00',
parent_config=None,
tasks=(PregelTask(id='d458367b-8265-812c-18e2-33001d199ce6', name='__start__', path=('__pregel_pull', '__start__'), error=None, interrupts=(), state=None, result={'messages': [{'role': 'user', 'content': "hi! I'm bob"}]}),),
interrupts=()
)
]
解耦返回值与保存值
使用entrypoint.final 可以解耦返回给调用者的内容与保存到检查点的内容。这在以下场景中很有用:
- 你想返回一个计算结果(如摘要或状态),但为下次调用保存不同的内部值。
- 你需要控制下次运行时传递给
previous参数的内容。
Copy
from typing import Optional
from langgraph.func import entrypoint
from langgraph.checkpoint.memory import InMemorySaver
checkpointer = InMemorySaver()
@entrypoint(checkpointer=checkpointer)
def accumulate(n: int, *, previous: Optional[int]) -> entrypoint.final[int, int]:
previous = previous or 0
total = previous + n
# 向调用者返回 *之前* 的值,但将 *新的* 总数保存到检查点。
return entrypoint.final(value=previous, save=total)
config = {"configurable": {"thread_id": "my-thread"}}
print(accumulate.invoke(1, config=config)) # 0
print(accumulate.invoke(2, config=config)) # 1
print(accumulate.invoke(3, config=config)) # 3
聊天机器人示例
以下是一个使用函数式 API 和InMemorySaver 检查点器的简单聊天机器人示例。机器人能够记住之前的对话并从中断处继续。
Copy
from langchain_core.messages import BaseMessage
from langgraph.graph import add_messages
from langgraph.func import entrypoint, task
from langgraph.checkpoint.memory import InMemorySaver
from langchain_anthropic import ChatAnthropic
model = ChatAnthropic(model="claude-3-5-sonnet-latest")
@task
def call_model(messages: list[BaseMessage]):
response = model.invoke(messages)
return response
checkpointer = InMemorySaver()
@entrypoint(checkpointer=checkpointer)
def workflow(inputs: list[BaseMessage], *, previous: list[BaseMessage]):
if previous:
inputs = add_messages(previous, inputs)
response = call_model(inputs).result()
return entrypoint.final(value=response, save=add_messages(inputs, response))
config = {"configurable": {"thread_id": "1"}}
input_message = {"role": "user", "content": "hi! I'm bob"}
for chunk in workflow.stream([input_message], config, stream_mode="values"):
chunk.pretty_print()
input_message = {"role": "user", "content": "what's my name?"}
for chunk in workflow.stream([input_message], config, stream_mode="values"):
chunk.pretty_print()
长期记忆
长期记忆 允许在不同的 thread id 之间存储信息。这在一次对话中学习某个用户的信息并在另一次对话中使用时非常有用。工作流
- 更多使用函数式 API 构建工作流的示例,请参阅 工作流与智能体 指南。
与其他库集成
- 使用函数式API将LangGraph的功能添加到其他框架:为其他未原生提供持久化、记忆和流式处理等功能的代理框架添加LangGraph的这些特性。