Skip to main content
持久化执行 是一种在关键节点保存进程或工作流进度的技术,使其能够暂停并在之后从停止处精确恢复。该技术在需要 人工介入(human-in-the-loop) 的场景中尤为有用——用户可在继续执行前检查、验证或修改流程;同时也适用于可能遭遇中断或错误(例如调用大语言模型时超时)的长时间运行任务。通过保留已完成的工作,持久化执行允许流程在无需重新处理先前步骤的情况下恢复——即使间隔较长时间(如一周后)亦可。 LangGraph 内置的 持久化层 为工作流提供持久化执行能力,确保每个执行步骤的状态均被保存至持久化存储中。该功能保证:无论工作流因系统故障中断,还是因 人工介入 而暂停,均可从其最后记录的状态恢复执行。
如果您在使用 LangGraph 时启用了检查点(checkpointer),则持久化执行功能已默认开启。您可在任意时刻暂停和恢复工作流,即使遭遇中断或故障亦无妨。 为充分利用持久化执行,请确保您的工作流设计为 确定性幂等性,并将任何副作用或非确定性操作封装在 任务(tasks) 中。您可在 StateGraph(图 API)函数式 API 中使用 任务(tasks)

要求

要在 LangGraph 中启用持久化执行,您需:
  1. 通过指定一个 检查点器(checkpointer) 来启用工作流的 持久化功能,以保存工作流进度。
  2. 在执行工作流时指定一个 线程标识符(thread identifier),用于跟踪特定工作流实例的执行历史。
  3. 将任何非确定性操作(如随机数生成)或具有副作用的操作(如文件写入、API 调用)封装在 tasks 中,以确保工作流恢复时,这些操作不会被重复执行,而是从持久化层中检索其结果。更多信息请参阅 确定性与一致重放

确定性与一致重放

当您恢复一个工作流运行时,代码并非从执行停止的同一行代码处恢复;而是会识别一个合适的 起始点,从该点开始继续执行。这意味着工作流将从 起始点 开始重放所有步骤,直至到达其先前停止的位置。 因此,在编写支持持久化执行的工作流时,您必须将所有非确定性操作(如随机数生成)和具有副作用的操作(如文件写入、API 调用)封装在 任务(tasks)节点(nodes) 中。 为确保工作流具有确定性并可一致重放,请遵循以下准则:
  • 避免重复工作:若一个 节点(node) 包含多个具有副作用的操作(如日志记录、文件写入或网络调用),请将每个操作封装在一个独立的 任务 中。这可确保工作流恢复时,这些操作不会被重复执行,其结果将从持久化层中检索。
  • 封装非确定性操作:将任何可能产生非确定性结果的代码(如随机数生成)封装在 任务节点 中。这可确保工作流恢复时,遵循完全相同的记录步骤序列并获得相同结果。
  • 使用幂等操作:尽可能确保副作用操作(如 API 调用、文件写入)是幂等的。这意味着若操作因工作流失败而重试,其效果应与首次执行相同。这对导致数据写入的操作尤为重要。若 任务 启动但未能成功完成,工作流恢复时将重新运行该 任务,并依赖记录的结果以保持一致性。请使用幂等键或验证现有结果,以避免意外重复,确保工作流执行顺畅且可预测。
有关需避免的常见陷阱示例,请参阅函数式 API 中的 常见陷阱 部分,其中展示了如何使用 任务 组织代码以避免这些问题。这些原则同样适用于 StateGraph(图 API)

持久化模式

LangGraph 支持三种持久化模式,允许您根据应用程序需求在性能与数据一致性之间取得平衡。持久化模式按持久化强度由低到高排列如下: 更高的持久化模式会为工作流执行增加更多开销。
v0.6.0 新增功能 请使用 durability 参数替代 checkpoint_during(v0.6.0 中已弃用)来管理持久化策略:
  • durability="async" 替代 checkpoint_during=True
  • durability="exit" 替代 checkpoint_during=False
持久化策略映射关系如下:
  • checkpoint_during=True -> durability="async"
  • checkpoint_during=False -> durability="exit"

"exit"

仅在图执行完成时(无论成功或失败)持久化更改。此模式为长时间运行的图提供最佳性能,但意味着中间状态不会被保存,因此无法从执行中途的故障中恢复,也无法中断图的执行。

"async"

在执行下一步的同时异步持久化更改。此模式在性能与持久化之间取得良好平衡,但存在极小风险:若进程在执行期间崩溃,检查点可能未被写入。

"sync"

在下一步开始前同步持久化更改。此模式确保在继续执行前写入每个检查点,以牺牲部分性能为代价提供高持久化保障。 您可在调用任何图执行方法时指定持久化模式:
graph.stream(
    {"input": "test"},
    durability="sync"
)

在节点中使用任务

若一个 节点(node) 包含多个操作,您可能会发现将每个操作转换为 任务 比将其重构为独立节点更为简便。
  • 原始版本
  • 使用任务
from typing import NotRequired
from typing_extensions import TypedDict
import uuid

from langgraph.checkpoint.memory import InMemorySaver
from langgraph.graph import StateGraph, START, END
import requests

# 定义一个 TypedDict 表示状态
class State(TypedDict):
    url: str
    result: NotRequired[str]

def call_api(state: State):
    """示例节点,执行 API 请求。"""
    result = requests.get(state['url']).text[:100]  # 副作用
    return {
        "result": result
    }

# 创建 StateGraph 构建器并为 call_api 函数添加节点
builder = StateGraph(State)
builder.add_node("call_api", call_api)

# 将开始和结束节点连接到 call_api 节点
builder.add_edge(START, "call_api")
builder.add_edge("call_api", END)

# 指定检查点器
checkpointer = InMemorySaver()

# 使用检查点器编译图
graph = builder.compile(checkpointer=checkpointer)

# 定义包含线程 ID 的配置。
thread_id = uuid.uuid4()
config = {"configurable": {"thread_id": thread_id}}

# 调用图
graph.invoke({"url": "https://www.example.com"}, config)

恢复工作流

在您的工作流中启用持久化执行后,您可在以下场景中恢复执行:
  • 暂停与恢复工作流:使用 interrupt 函数在特定点暂停工作流,并使用 Command 原语以更新后的状态恢复执行。详见 人工介入(Human-in-the-Loop)
  • 从故障中恢复:在发生异常(如大语言模型服务中断)后,自动从最后一个成功检查点恢复工作流。这涉及使用相同的线程标识符执行工作流,并提供 None 作为输入值(参见函数式 API 的 示例)。

恢复工作流的起始点

  • 若您使用的是 StateGraph(图 API),起始点为执行停止处的 节点 开始处。
  • 若您在节点内调用子图,起始点将是调用被中止子图的父节点。 在子图内部,起始点将是执行停止处的特定 节点
  • 若您使用函数式 API,起始点为执行停止处的 入口点(entrypoint) 开始处。