Skip to main content
函数式 API 允许您在对现有代码进行最小改动的前提下,为应用程序添加 LangGraph 的核心功能 —— 持久化记忆人在回路流式传输 它旨在将这些功能集成到可能使用标准语言原语(如 if 语句、for 循环和函数调用)进行分支和控制流的现有代码中。与许多要求将代码重构为显式管道或 DAG 的数据编排框架不同,函数式 API 允许您在不强制执行固定执行模型的情况下引入这些功能。 函数式 API 使用两个关键构建块:
  • @entrypoint – 将函数标记为工作流的起点,封装逻辑并管理执行流程,包括处理长时间运行的任务和中断。
  • @task – 代表一个离散的工作单元,例如 API 调用或数据处理步骤,可在入口点内异步执行。任务返回一个类似 future 的对象,可以同步等待或解析。
这为构建具有状态管理和流式传输的工作流提供了最小抽象。
有关如何使用函数式 API 的信息,请参阅 使用函数式 API

函数式 API 与图 API

对于偏好声明式方法的用户,LangGraph 的 图 API 允许您使用图范式定义工作流。两个 API 共享相同的底层运行时,因此您可以在同一应用程序中同时使用它们。 以下是主要区别:
  • 控制流:函数式 API 不需要考虑图结构。您可以使用标准 Python 结构定义工作流。这通常会减少您需要编写的代码量。
  • 短期记忆GraphAPI 要求声明一个 State,并可能需要定义 reducers 来管理图状态的更新。@entrypoint@tasks 不需要显式状态管理,因为它们的状态作用域限定在函数内,不跨函数共享。
  • 检查点:两个 API 都会生成和使用检查点。在 Graph API 中,每个 超级步骤 后都会生成一个新的检查点。在 函数式 API 中,当任务执行时,其结果会保存到与给定入口点关联的现有检查点中,而不是创建新检查点。
  • 可视化:图 API 便于将工作流可视化为图,这对调试、理解工作流和与他人共享非常有用。函数式 API 不支持可视化,因为图是在运行时动态生成的。

示例

下面演示一个编写文章并中断以请求人工审核的简单应用程序。
from langgraph.checkpoint.memory import InMemorySaver
from langgraph.func import entrypoint, task
from langgraph.types import interrupt

@task
def write_essay(topic: str) -> str:
    """撰写关于指定主题的文章。"""
    time.sleep(1) # 长时间运行任务的占位符。
    return f"关于主题 {topic} 的文章"

@entrypoint(checkpointer=InMemorySaver())
def workflow(topic: str) -> dict:
    """一个编写文章并请求审核的简单工作流。"""
    essay = write_essay("cat").result()
    is_approved = interrupt({
        # 传递给 interrupt 的任何 JSON 可序列化负载。
        # 在从工作流流式传输数据时,它将在客户端显示为中断。
        "essay": essay, # 我们希望审核的文章。
        # 我们可以添加任何需要的额外信息。
        # 例如,引入一个名为 "action" 的键并附上一些说明。
        "action": "请批准/拒绝该文章",
    })

    return {
        "essay": essay, # 生成的文章
        "is_approved": is_approved, # 来自 HIL 的响应
    }
此工作流将撰写一篇关于“猫”主题的文章,然后暂停以获取人工审核。工作流可以无限期中断,直到提供审核。当工作流恢复时,它会从头开始执行,但由于 writeEssay 任务的结果已保存,任务结果将从检查点加载,而不是重新计算。
import time
import uuid
from langgraph.func import entrypoint, task
from langgraph.types import interrupt
from langgraph.checkpoint.memory import InMemorySaver


@task
def write_essay(topic: str) -> str:
    """撰写关于指定主题的文章。"""
    time.sleep(1)  # 这是长时间运行任务的占位符。
    return f"关于主题 {topic} 的文章"

@entrypoint(checkpointer=InMemorySaver())
def workflow(topic: str) -> dict:
    """一个编写文章并请求审核的简单工作流。"""
    essay = write_essay("cat").result()
    is_approved = interrupt(
        {
            # 传递给 interrupt 的任何 JSON 可序列化负载。
            # 在从工作流流式传输数据时,它将在客户端显示为中断。
            "essay": essay,  # 我们希望审核的文章。
            # 我们可以添加任何需要的额外信息。
            # 例如,引入一个名为 "action" 的键并附上一些说明。
            "action": "请批准/拒绝该文章",
        }
    )
    return {
        "essay": essay,  # 生成的文章
        "is_approved": is_approved,  # 来自 HIL 的响应
    }


thread_id = str(uuid.uuid4())
config = {"configurable": {"thread_id": thread_id}}
for item in workflow.stream("cat", config):
    print(item)
# > {'write_essay': '关于主题 cat 的文章'}
# > {
# >     '__interrupt__': (
# >        Interrupt(
# >            value={
# >                'essay': '关于主题 cat 的文章',
# >                'action': '请批准/拒绝该文章'
# >            },
# >            id='b9b2b9d788f482663ced6dc755c9e981'
# >        ),
# >    )
# > }
文章已撰写完毕,准备审核。一旦提供审核,我们可以恢复工作流:
from langgraph.types import Command

# 从用户获取审核(例如,通过 UI)
# 在此情况下,我们使用布尔值,但可以是任何 JSON 可序列化的值。
human_review = True

for item in workflow.stream(Command(resume=human_review), config):
    print(item)
{'workflow': {'essay': '关于主题 cat 的文章', 'is_approved': False}}
工作流已完成,审核已添加到文章中。

入口点

@entrypoint 装饰器可用于从函数创建工作流。它封装工作流逻辑并管理执行流程,包括处理_长时间运行的任务_和中断

定义

通过使用 @entrypoint 装饰器装饰函数来定义 入口点 函数必须接受一个位置参数,该参数作为工作流输入。如果需要传递多个数据片段,请使用字典作为第一个参数的输入类型。 使用 entrypoint 装饰函数会产生一个 Pregel 实例,用于帮助管理工作流的执行(例如,处理流式传输、恢复和检查点)。 通常,您需要向 @entrypoint 装饰器传递一个 检查点器 以启用持久化并使用人在回路等功能。
  • 同步
  • 异步
from langgraph.func import entrypoint

@entrypoint(checkpointer=checkpointer)
def my_workflow(some_input: dict) -> int:
    # 一些可能涉及长时间运行任务(如 API 调用)的逻辑,
    # 并可能因人在回路而中断。
    ...
    return result
序列化 入口点的输入输出必须是 JSON 可序列化的,以支持检查点。请参阅序列化部分了解详情。

可注入参数

声明 entrypoint 时,您可以请求在运行时自动注入的附加参数。这些参数包括:
参数说明
previous访问给定线程的上一个 checkpoint 关联的状态。请参阅 短期记忆
store[BaseStore][langgraph.store.base.BaseStore] 的实例。用于 长期记忆
writer在使用 Async Python < 3.11 时访问 StreamWriter。请参阅 使用函数式 API 进行流式传输的详细信息
config用于访问运行时配置。请参阅 RunnableConfig 了解信息。
使用适当的名称和类型注解声明参数。
from langchain_core.runnables import RunnableConfig
from langgraph.func import entrypoint
from langgraph.store.base import BaseStore
from langgraph.store.memory import InMemoryStore

in_memory_store = InMemoryStore(...)  # 用于长期记忆的 InMemoryStore 实例

@entrypoint(
    checkpointer=checkpointer,  # 指定检查点器
    store=in_memory_store  # 指定存储
)
def my_workflow(
    some_input: dict,  # 输入(例如,通过 `invoke` 传递)
    *,
    previous: Any = None, # 用于短期记忆
    store: BaseStore,  # 用于长期记忆
    writer: StreamWriter,  # 用于流式传输自定义数据
    config: RunnableConfig  # 用于访问传递给入口点的配置
) -> ...:

执行

使用 @entrypoint 会产生一个 Pregel 对象,可以使用 invokeainvokestreamastream 方法执行。
  • 调用
  • 异步调用
  • 流式传输
  • 异步流式传输
config = {
    "configurable": {
        "thread_id": "some_thread_id"
    }
}
my_workflow.invoke(some_input, config)  # 同步等待结果

恢复

interrupt 后恢复执行可以通过向 Command 原语传递 resume 值来完成。
  • 调用
  • 异步调用
  • 流式传输
  • 异步流式传输
from langgraph.types import Command

config = {
    "configurable": {
        "thread_id": "some_thread_id"
    }
}

my_workflow.invoke(Command(resume=some_resume_value), config)
错误后恢复 要在错误后恢复,请使用 None 和相同的 线程 ID(配置)运行 entrypoint 这假设底层错误已解决,执行可以成功继续。
  • 调用
  • 异步调用
  • 流式传输
  • 异步流式传输

config = {
    "configurable": {
        "thread_id": "some_thread_id"
    }
}

my_workflow.invoke(None, config)

短期记忆

当使用 checkpointer 定义 entrypoint 时,它会在同一线程 ID的连续调用之间将信息存储在检查点中。 这允许使用 previous 参数访问上一次调用的状态。 默认情况下,previous 参数是上一次调用的返回值。
@entrypoint(checkpointer=checkpointer)
def my_workflow(number: int, *, previous: Any = None) -> int:
    previous = previous or 0
    return number + previous

config = {
    "configurable": {
        "thread_id": "some_thread_id"
    }
}

my_workflow.invoke(1, config)  # 1 (previous 为 None)
my_workflow.invoke(2, config)  # 3 (previous 为上次调用的 1)

entrypoint.final

entrypoint.final 是一个可以从入口点返回的特殊原语,允许解耦保存在检查点中的值入口点的返回值 第一个值是入口点的返回值,第二个值是将保存在检查点中的值。类型注解为 entrypoint.final[return_type, save_type]
@entrypoint(checkpointer=checkpointer)
def my_workflow(number: int, *, previous: Any = None) -> entrypoint.final[int, int]:
    previous = previous or 0
    # 这将向调用者返回 previous 值,将
    # 2 * number 保存到检查点,将在下一次调用中
    # 用于 `previous` 参数。
    return entrypoint.final(value=previous, save=2 * number)

config = {
    "configurable": {
        "thread_id": "1"
    }
}

my_workflow.invoke(3, config)  # 0 (previous 为 None)
my_workflow.invoke(1, config)  # 6 (previous 为上次调用的 3 * 2)
:::js entrypoint.final