Documentation Index Fetch the complete documentation index at: https://langchain.idochub.dev/llms.txt
Use this file to discover all available pages before exploring further.
函数式 API 允许您在对现有代码进行最小改动的前提下,为应用程序添加 LangGraph 的核心功能 —— 持久化 、记忆 、人在回路 和 流式传输 。
它旨在将这些功能集成到可能使用标准语言原语(如 if 语句、for 循环和函数调用)进行分支和控制流的现有代码中。与许多要求将代码重构为显式管道或 DAG 的数据编排框架不同,函数式 API 允许您在不强制执行固定执行模型的情况下引入这些功能。
函数式 API 使用两个关键构建块:
@entrypoint – 将函数标记为工作流的起点,封装逻辑并管理执行流程,包括处理长时间运行的任务和中断。
@task – 代表一个离散的工作单元,例如 API 调用或数据处理步骤,可在入口点内异步执行。任务返回一个类似 future 的对象,可以同步等待或解析。
这为构建具有状态管理和流式传输的工作流提供了最小抽象。
函数式 API 与图 API
对于偏好声明式方法的用户,LangGraph 的 图 API 允许您使用图范式定义工作流。两个 API 共享相同的底层运行时,因此您可以在同一应用程序中同时使用它们。
以下是主要区别:
控制流 :函数式 API 不需要考虑图结构。您可以使用标准 Python 结构定义工作流。这通常会减少您需要编写的代码量。
短期记忆 :GraphAPI 要求声明一个 State ,并可能需要定义 reducers 来管理图状态的更新。@entrypoint 和 @tasks 不需要显式状态管理,因为它们的状态作用域限定在函数内,不跨函数共享。
检查点 :两个 API 都会生成和使用检查点。在 Graph API 中,每个 超级步骤 后都会生成一个新的检查点。在 函数式 API 中,当任务执行时,其结果会保存到与给定入口点关联的现有检查点中,而不是创建新检查点。
可视化 :图 API 便于将工作流可视化为图,这对调试、理解工作流和与他人共享非常有用。函数式 API 不支持可视化,因为图是在运行时动态生成的。
下面演示一个编写文章并中断 以请求人工审核的简单应用程序。
from langgraph.checkpoint.memory import InMemorySaver
from langgraph.func import entrypoint, task
from langgraph.types import interrupt
@task
def write_essay ( topic : str ) -> str :
"""撰写关于指定主题的文章。"""
time.sleep( 1 ) # 长时间运行任务的占位符。
return f "关于主题 { topic } 的文章"
@entrypoint ( checkpointer = InMemorySaver())
def workflow ( topic : str ) -> dict :
"""一个编写文章并请求审核的简单工作流。"""
essay = write_essay( "cat" ).result()
is_approved = interrupt({
# 传递给 interrupt 的任何 JSON 可序列化负载。
# 在从工作流流式传输数据时,它将在客户端显示为中断。
"essay" : essay, # 我们希望审核的文章。
# 我们可以添加任何需要的额外信息。
# 例如,引入一个名为 "action" 的键并附上一些说明。
"action" : "请批准/拒绝该文章" ,
})
return {
"essay" : essay, # 生成的文章
"is_approved" : is_approved, # 来自 HIL 的响应
}
此工作流将撰写一篇关于“猫”主题的文章,然后暂停以获取人工审核。工作流可以无限期中断,直到提供审核。 当工作流恢复时,它会从头开始执行,但由于 writeEssay 任务的结果已保存,任务结果将从检查点加载,而不是重新计算。 import time
import uuid
from langgraph.func import entrypoint, task
from langgraph.types import interrupt
from langgraph.checkpoint.memory import InMemorySaver
@task
def write_essay ( topic : str ) -> str :
"""撰写关于指定主题的文章。"""
time.sleep( 1 ) # 这是长时间运行任务的占位符。
return f "关于主题 { topic } 的文章"
@entrypoint ( checkpointer = InMemorySaver())
def workflow ( topic : str ) -> dict :
"""一个编写文章并请求审核的简单工作流。"""
essay = write_essay( "cat" ).result()
is_approved = interrupt(
{
# 传递给 interrupt 的任何 JSON 可序列化负载。
# 在从工作流流式传输数据时,它将在客户端显示为中断。
"essay" : essay, # 我们希望审核的文章。
# 我们可以添加任何需要的额外信息。
# 例如,引入一个名为 "action" 的键并附上一些说明。
"action" : "请批准/拒绝该文章" ,
}
)
return {
"essay" : essay, # 生成的文章
"is_approved" : is_approved, # 来自 HIL 的响应
}
thread_id = str (uuid.uuid4())
config = { "configurable" : { "thread_id" : thread_id}}
for item in workflow.stream( "cat" , config):
print (item)
# > {'write_essay': '关于主题 cat 的文章'}
# > {
# > '__interrupt__': (
# > Interrupt(
# > value={
# > 'essay': '关于主题 cat 的文章',
# > 'action': '请批准/拒绝该文章'
# > },
# > id='b9b2b9d788f482663ced6dc755c9e981'
# > ),
# > )
# > }
文章已撰写完毕,准备审核。一旦提供审核,我们可以恢复工作流: from langgraph.types import Command
# 从用户获取审核(例如,通过 UI)
# 在此情况下,我们使用布尔值,但可以是任何 JSON 可序列化的值。
human_review = True
for item in workflow.stream(Command( resume = human_review), config):
print (item)
{'workflow': {'essay': '关于主题 cat 的文章', 'is_approved': False}}
工作流已完成,审核已添加到文章中。
入口点
@entrypoint 装饰器可用于从函数创建工作流。它封装工作流逻辑并管理执行流程,包括处理_长时间运行的任务_和中断 。
通过使用 @entrypoint 装饰器装饰函数来定义 入口点 。
函数必须接受一个位置参数 ,该参数作为工作流输入。如果需要传递多个数据片段,请使用字典作为第一个参数的输入类型。
使用 entrypoint 装饰函数会产生一个 Pregel 实例,用于帮助管理工作流的执行(例如,处理流式传输、恢复和检查点)。
通常,您需要向 @entrypoint 装饰器传递一个 检查点器 以启用持久化并使用人在回路 等功能。
from langgraph.func import entrypoint
@entrypoint ( checkpointer = checkpointer)
def my_workflow ( some_input : dict ) -> int :
# 一些可能涉及长时间运行任务(如 API 调用)的逻辑,
# 并可能因人在回路而中断。
...
return result
from langgraph.func import entrypoint
@entrypoint ( checkpointer = checkpointer)
async def my_workflow ( some_input : dict ) -> int :
# 一些可能涉及长时间运行任务(如 API 调用)的逻辑,
# 并可能因人在回路而中断
...
return result
序列化
入口点的输入 和输出 必须是 JSON 可序列化的,以支持检查点。请参阅序列化 部分了解详情。
可注入参数
声明 entrypoint 时,您可以请求在运行时自动注入的附加参数。这些参数包括:
参数 说明 previous 访问给定线程的上一个 checkpoint 关联的状态。请参阅 短期记忆 。 store [BaseStore][langgraph.store.base.BaseStore] 的实例。用于 长期记忆 。 writer 在使用 Async Python < 3.11 时访问 StreamWriter。请参阅 使用函数式 API 进行流式传输的详细信息 。 config 用于访问运行时配置。请参阅 RunnableConfig 了解信息。
from langchain_core.runnables import RunnableConfig
from langgraph.func import entrypoint
from langgraph.store.base import BaseStore
from langgraph.store.memory import InMemoryStore
in_memory_store = InMemoryStore( ... ) # 用于长期记忆的 InMemoryStore 实例
@entrypoint (
checkpointer = checkpointer, # 指定检查点器
store = in_memory_store # 指定存储
)
def my_workflow (
some_input : dict , # 输入(例如,通过 `invoke` 传递)
* ,
previous : Any = None , # 用于短期记忆
store : BaseStore, # 用于长期记忆
writer : StreamWriter, # 用于流式传输自定义数据
config : RunnableConfig # 用于访问传递给入口点的配置
) -> ... :
使用 @entrypoint 会产生一个 Pregel 对象,可以使用 invoke、ainvoke、stream 和 astream 方法执行。
config = {
"configurable" : {
"thread_id" : "some_thread_id"
}
}
my_workflow.invoke(some_input, config) # 同步等待结果
config = {
"configurable" : {
"thread_id" : "some_thread_id"
}
}
await my_workflow.ainvoke(some_input, config) # 异步等待结果
config = {
"configurable" : {
"thread_id" : "some_thread_id"
}
}
for chunk in my_workflow.stream(some_input, config):
print (chunk)
config = {
"configurable" : {
"thread_id" : "some_thread_id"
}
}
async for chunk in my_workflow.astream(some_input, config):
print (chunk)
在 interrupt 后恢复执行可以通过向 Command 原语传递 resume 值来完成。
from langgraph.types import Command
config = {
"configurable" : {
"thread_id" : "some_thread_id"
}
}
my_workflow.invoke(Command( resume = some_resume_value), config)
from langgraph.types import Command
config = {
"configurable" : {
"thread_id" : "some_thread_id"
}
}
await my_workflow.ainvoke(Command( resume = some_resume_value), config)
from langgraph.types import Command
config = {
"configurable" : {
"thread_id" : "some_thread_id"
}
}
for chunk in my_workflow.stream(Command( resume = some_resume_value), config):
print (chunk)
from langgraph.types import Command
config = {
"configurable" : {
"thread_id" : "some_thread_id"
}
}
async for chunk in my_workflow.astream(Command( resume = some_resume_value), config):
print (chunk)
错误后恢复
要在错误后恢复,请使用 None 和相同的 线程 ID (配置)运行 entrypoint。
这假设底层错误 已解决,执行可以成功继续。
config = {
"configurable" : {
"thread_id" : "some_thread_id"
}
}
my_workflow.invoke( None , config)
config = {
"configurable" : {
"thread_id" : "some_thread_id"
}
}
await my_workflow.ainvoke( None , config)
config = {
"configurable" : {
"thread_id" : "some_thread_id"
}
}
for chunk in my_workflow.stream( None , config):
print (chunk)
config = {
"configurable" : {
"thread_id" : "some_thread_id"
}
}
async for chunk in my_workflow.astream( None , config):
print (chunk)
短期记忆
当使用 checkpointer 定义 entrypoint 时,它会在同一线程 ID 的连续调用之间将信息存储在检查点 中。
这允许使用 previous 参数访问上一次调用的状态。
默认情况下,previous 参数是上一次调用的返回值。
@entrypoint ( checkpointer = checkpointer)
def my_workflow ( number : int , * , previous : Any = None ) -> int :
previous = previous or 0
return number + previous
config = {
"configurable" : {
"thread_id" : "some_thread_id"
}
}
my_workflow.invoke( 1 , config) # 1 (previous 为 None)
my_workflow.invoke( 2 , config) # 3 (previous 为上次调用的 1)
entrypoint.final
entrypoint.final 是一个可以从入口点返回的特殊原语,允许解耦 保存在检查点中的值 与入口点的返回值 。
第一个值是入口点的返回值,第二个值是将保存在检查点中的值。类型注解为 entrypoint.final[return_type, save_type]。
@entrypoint ( checkpointer = checkpointer)
def my_workflow ( number : int , * , previous : Any = None ) -> entrypoint.final[ int , int ]:
previous = previous or 0
# 这将向调用者返回 previous 值,将
# 2 * number 保存到检查点,将在下一次调用中
# 用于 `previous` 参数。
return entrypoint.final( value = previous, save = 2 * number)
config = {
"configurable" : {
"thread_id" : "1"
}
}
my_workflow.invoke( 3 , config) # 0 (previous 为 None)
my_workflow.invoke( 1 , config) # 6 (previous 为上次调用的 3 * 2)
:::js
entrypoint.final 是