注意: Pregel 运行时的名称来源于 Google 的 Pregel 算法,该算法描述了一种使用图进行大规模并行计算的高效方法。
概述
在 LangGraph 中,Pregel 将 参与者(actors) 和 通道(channels) 组合到一个单一应用程序中。参与者 从通道读取数据并向通道写入数据。Pregel 按照 Pregel 算法/批量同步并行(Bulk Synchronous Parallel) 模型组织应用程序的执行步骤。 每个步骤包含三个阶段:- 规划(Plan):确定本步骤中要执行哪些 参与者。例如,在第一步中,选择订阅特殊 输入 通道的 参与者;在后续步骤中,选择订阅上一步中已更新通道的 参与者。
- 执行(Execution):并行执行所有选定的 参与者,直到全部完成、其中一个失败或达到超时时间。在此阶段,通道更新对参与者不可见,直到下一步。
- 更新(Update):使用本步骤中 参与者 写入的值更新通道。
参与者(Actors)
一个 参与者 是一个PregelNode。它订阅通道,从通道读取数据并向通道写入数据。可以将其视为 Pregel 算法中的一个 参与者。PregelNodes 实现了 LangChain 的 Runnable 接口。
通道(Channels)
通道用于在参与者(PregelNodes)之间通信。每个通道都有一个值类型、一个更新类型和一个更新函数——该函数接收一系列更新并修改存储的值。通道可用于将数据从一个链发送到另一个链,或在未来的步骤中将数据从一个链发送到其自身。LangGraph 提供了若干内置通道:- LastValue:默认通道,存储发送到该通道的最后一个值,适用于输入和输出值,或用于在步骤之间传递数据。
- Topic:可配置的发布-订阅主题,适用于在 参与者 之间发送多个值,或累积输出。可配置为去重或在多个步骤中累积值。
- BinaryOperatorAggregate:存储一个持久值,通过将二元运算符应用于当前值和发送到通道的每个更新来更新,适用于在多个步骤中计算聚合值;例如,
total = BinaryOperatorAggregate(int, operator.add)
示例
虽然大多数用户将通过 StateGraph API 或 entrypoint 装饰器与 Pregel 交互,但也可以直接与 Pregel 交互。 以下是几个不同的示例,帮助您了解 Pregel API。- 单节点
- 多节点
- Topic
- BinaryOperatorAggregate
- 循环
高级 API
LangGraph 提供了两种用于创建 Pregel 应用程序的高级 API:StateGraph (图 API) 和 函数式 API。- StateGraph (图 API)
- 函数式 API
StateGraph (Graph API) 是一个更高级的抽象,简化了 Pregel 应用程序的创建。它允许您定义节点和边的图。当您编译图时,StateGraph API 会自动为您创建 Pregel 应用程序。编译后的 Pregel 实例将关联一组节点和通道。您可以通过打印它们来检查节点和通道。您将看到类似以下内容:您应看到类似以下内容: