Documentation Index
Fetch the complete documentation index at: https://langchain.idochub.dev/llms.txt
Use this file to discover all available pages before exploring further.
Pregel 实现了 LangGraph 的运行时,负责管理 LangGraph 应用程序的执行。
编译一个 StateGraph 或创建一个 entrypoint 会生成一个 Pregel 实例,该实例可通过输入调用。
本指南在较高层次上解释了运行时机制,并提供了直接使用 Pregel 实现应用程序的说明。
注意: Pregel 运行时的名称来源于 Google 的 Pregel 算法,该算法描述了一种使用图进行大规模并行计算的高效方法。
在 LangGraph 中,Pregel 将 参与者(actors) 和 通道(channels) 组合到一个单一应用程序中。参与者 从通道读取数据并向通道写入数据。Pregel 按照 Pregel 算法/批量同步并行(Bulk Synchronous Parallel) 模型组织应用程序的执行步骤。
每个步骤包含三个阶段:
- 规划(Plan):确定本步骤中要执行哪些 参与者。例如,在第一步中,选择订阅特殊 输入 通道的 参与者;在后续步骤中,选择订阅上一步中已更新通道的 参与者。
- 执行(Execution):并行执行所有选定的 参与者,直到全部完成、其中一个失败或达到超时时间。在此阶段,通道更新对参与者不可见,直到下一步。
- 更新(Update):使用本步骤中 参与者 写入的值更新通道。
重复上述过程,直到没有 参与者 被选中执行,或达到最大步骤数。
参与者(Actors)
一个 参与者 是一个 PregelNode。它订阅通道,从通道读取数据并向通道写入数据。可以将其视为 Pregel 算法中的一个 参与者。PregelNodes 实现了 LangChain 的 Runnable 接口。
通道(Channels)
通道用于在参与者(PregelNodes)之间通信。每个通道都有一个值类型、一个更新类型和一个更新函数——该函数接收一系列更新并修改存储的值。通道可用于将数据从一个链发送到另一个链,或在未来的步骤中将数据从一个链发送到其自身。LangGraph 提供了若干内置通道:
- LastValue:默认通道,存储发送到该通道的最后一个值,适用于输入和输出值,或用于在步骤之间传递数据。
- Topic:可配置的发布-订阅主题,适用于在 参与者 之间发送多个值,或累积输出。可配置为去重或在多个步骤中累积值。
- BinaryOperatorAggregate:存储一个持久值,通过将二元运算符应用于当前值和发送到通道的每个更新来更新,适用于在多个步骤中计算聚合值;例如,
total = BinaryOperatorAggregate(int, operator.add)
虽然大多数用户将通过 StateGraph API 或 entrypoint 装饰器与 Pregel 交互,但也可以直接与 Pregel 交互。
以下是几个不同的示例,帮助您了解 Pregel API。
单节点
多节点
Topic
BinaryOperatorAggregate
循环
from langgraph.channels import EphemeralValue
from langgraph.pregel import Pregel, NodeBuilder
node1 = (
NodeBuilder().subscribe_only("a")
.do(lambda x: x + x)
.write_to("b")
)
app = Pregel(
nodes={"node1": node1},
channels={
"a": EphemeralValue(str),
"b": EphemeralValue(str),
},
input_channels=["a"],
output_channels=["b"],
)
app.invoke({"a": "foo"})
from langgraph.channels import LastValue, EphemeralValue
from langgraph.pregel import Pregel, NodeBuilder
node1 = (
NodeBuilder().subscribe_only("a")
.do(lambda x: x + x)
.write_to("b")
)
node2 = (
NodeBuilder().subscribe_only("b")
.do(lambda x: x + x)
.write_to("c")
)
app = Pregel(
nodes={"node1": node1, "node2": node2},
channels={
"a": EphemeralValue(str),
"b": LastValue(str),
"c": EphemeralValue(str),
},
input_channels=["a"],
output_channels=["b", "c"],
)
app.invoke({"a": "foo"})
{'b': 'foofoo', 'c': 'foofoofoofoo'}
from langgraph.channels import EphemeralValue, Topic
from langgraph.pregel import Pregel, NodeBuilder
node1 = (
NodeBuilder().subscribe_only("a")
.do(lambda x: x + x)
.write_to("b", "c")
)
node2 = (
NodeBuilder().subscribe_to("b")
.do(lambda x: x["b"] + x["b"])
.write_to("c")
)
app = Pregel(
nodes={"node1": node1, "node2": node2},
channels={
"a": EphemeralValue(str),
"b": EphemeralValue(str),
"c": Topic(str, accumulate=True),
},
input_channels=["a"],
output_channels=["c"],
)
app.invoke({"a": "foo"})
{'c': ['foofoo', 'foofoofoofoo']}
此示例演示如何使用 BinaryOperatorAggregate 通道实现一个 reducer(归约器)。from langgraph.channels import EphemeralValue, BinaryOperatorAggregate
from langgraph.pregel import Pregel, NodeBuilder
node1 = (
NodeBuilder().subscribe_only("a")
.do(lambda x: x + x)
.write_to("b", "c")
)
node2 = (
NodeBuilder().subscribe_only("b")
.do(lambda x: x + x)
.write_to("c")
)
def reducer(current, update):
if current:
return current + " | " + update
else:
return update
app = Pregel(
nodes={"node1": node1, "node2": node2},
channels={
"a": EphemeralValue(str),
"b": EphemeralValue(str),
"c": BinaryOperatorAggregate(str, operator=reducer),
},
input_channels=["a"],
output_channels=["c"],
)
app.invoke({"a": "foo"})
此示例演示如何通过让一个链写入其订阅的通道来在图中引入循环。执行将持续进行,直到向通道写入 None 值。from langgraph.channels import EphemeralValue
from langgraph.pregel import Pregel, NodeBuilder, ChannelWriteEntry
example_node = (
NodeBuilder().subscribe_only("value")
.do(lambda x: x + x if len(x) < 10 else None)
.write_to(ChannelWriteEntry("value", skip_none=True))
)
app = Pregel(
nodes={"example_node": example_node},
channels={
"value": EphemeralValue(str),
},
input_channels=["value"],
output_channels=["value"],
)
app.invoke({"value": "a"})
{'value': 'aaaaaaaaaaaaaaaa'}
高级 API
LangGraph 提供了两种用于创建 Pregel 应用程序的高级 API:StateGraph (图 API) 和 函数式 API。
StateGraph (图 API)
函数式 API
StateGraph (Graph API) 是一个更高级的抽象,简化了 Pregel 应用程序的创建。它允许您定义节点和边的图。当您编译图时,StateGraph API 会自动为您创建 Pregel 应用程序。from typing import TypedDict, Optional
from langgraph.constants import START
from langgraph.graph import StateGraph
class Essay(TypedDict):
topic: str
content: Optional[str]
score: Optional[float]
def write_essay(essay: Essay):
return {
"content": f"Essay about {essay['topic']}",
}
def score_essay(essay: Essay):
return {
"score": 10
}
builder = StateGraph(Essay)
builder.add_node(write_essay)
builder.add_node(score_essay)
builder.add_edge(START, "write_essay")
# 编译图。
# 这将返回一个 Pregel 实例。
graph = builder.compile()
编译后的 Pregel 实例将关联一组节点和通道。您可以通过打印它们来检查节点和通道。您将看到类似以下内容:{'__start__': <langgraph.pregel.read.PregelNode at 0x7d05e3ba1810>,
'write_essay': <langgraph.pregel.read.PregelNode at 0x7d05e3ba14d0>,
'score_essay': <langgraph.pregel.read.PregelNode at 0x7d05e3ba1710>}
您应看到类似以下内容:{'topic': <langgraph.channels.last_value.LastValue at 0x7d05e3294d80>,
'content': <langgraph.channels.last_value.LastValue at 0x7d05e3295040>,
'score': <langgraph.channels.last_value.LastValue at 0x7d05e3295980>,
'__start__': <langgraph.channels.ephemeral_value.EphemeralValue at 0x7d05e3297e00>,
'write_essay': <langgraph.channels.ephemeral_value.EphemeralValue at 0x7d05e32960c0>,
'score_essay': <langgraph.channels.ephemeral_value.EphemeralValue at 0x7d05e2d8ab80>,
'branch:__start__:__self__:write_essay': <langgraph.channels.ephemeral_value.EphemeralValue at 0x7d05e32941c0>,
'branch:__start__:__self__:score_essay': <langgraph.channels.ephemeral_value.EphemeralValue at 0x7d05e2d88800>,
'branch:write_essay:__self__:write_essay': <langgraph.channels.ephemeral_value.EphemeralValue at 0x7d05e3295ec0>,
'branch:write_essay:__self__:score_essay': <langgraph.channels.ephemeral_value.EphemeralValue at 0x7d05e2d8ac00>,
'branch:score_essay:__self__:write_essay': <langgraph.channels.ephemeral_value.EphemeralValue at 0x7d05e2d89700>,
'branch:score_essay:__self__:score_essay': <langgraph.channels.ephemeral_value.EphemeralValue at 0x7d05e2d8b400>,
'start:write_essay': <langgraph.channels.ephemeral_value.EphemeralValue at 0x7d05e2d8b280>}
在 函数式 API 中,您可以使用 entrypoint 创建一个 Pregel 应用程序。entrypoint 装饰器允许您定义一个接收输入并返回输出的函数。from typing import TypedDict, Optional
from langgraph.checkpoint.memory import InMemorySaver
from langgraph.func import entrypoint
class Essay(TypedDict):
topic: str
content: Optional[str]
score: Optional[float]
checkpointer = InMemorySaver()
@entrypoint(checkpointer=checkpointer)
def write_essay(essay: Essay):
return {
"content": f"Essay about {essay['topic']}",
}
print("Nodes: ")
print(write_essay.nodes)
print("Channels: ")
print(write_essay.channels)
Nodes:
{'write_essay': <langgraph.pregel.read.PregelNode object at 0x7d05e2f9aad0>}
Channels:
{'__start__': <langgraph.channels.ephemeral_value.EphemeralValue object at 0x7d05e2c906c0>, '__end__': <langgraph.channels.last_value.LastValue object at 0x7d05e2c90c40>, '__previous__': <langgraph.channels.last_value.LastValue object at 0x7d05e1007280>}