LangGraph

函数定义

官方手册

1. 定义

LangGraph定义

LangGraph将Agent运转流程建模为一个图,可以想象成代码版的应用编排,定义三个要素:状态、节点、边。

  • 状态:一个数据结构。作为节点的输入、输出。 是图中不断流转、改变的数据。可以是任何Python类型,一般用TypedDict或 Pydantic BaseModel

  • 节点:Python 函数。动作执行单元。接受当前状态并对其进行处理,然后返回更新后的状态。

  • 边:Python 函数。根据当前节点确定下一个节点。可以说条件分支,也可以是固定路径。

一些函数/特征的定义

  1. StateGraph:状态图。LangGraph中使用的主要图类。
  2. MessageGraph:消息图。一种特殊的图类型,MessageGraphState 仅为消息列表,一般仅用于聊天机器人。
  3. graph_builder.compile(…):图编译。必须要编译,会对图结构做检查。
  4. add_node:参数为节点函数。增加节点,如果没有指定节点名称,默认为函数名。
  5. StateGraph:构建图。参数为状态的数据结构。
  6. Annotated:状态中定义属性时,用这个来实现reducer。

2. 特性

2.1 基本流程

一个简单的样例,其定义的流程为:

  1. 定义状态(数据结构)
  2. 定义节点(执行动作)
  3. 定义边(执行路径)
  4. 图编译
  5. 图执行
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
class InputState(TypedDict):
user_input: str

class OutputState(TypedDict):
graph_output: str

class OverallState(TypedDict):
foo: str
user_input: str
graph_output: str

class PrivateState(TypedDict):
bar: str

def node_1(state: InputState) -> OverallState:
# Write to OverallState
return {"foo": state["user_input"] + " name"}

def node_2(state: OverallState) -> PrivateState:
# Read from OverallState, write to PrivateState
return {"bar": state["foo"] + " is"}

def node_3(state: PrivateState) -> OutputState:
# Read from PrivateState, write to OutputState
return {"graph_output": state["bar"] + " Lance"}

# StateGraph参数是状态的数据结构,如果状态都是dict类型,那么可以 builder = StateGraph(dict)
builder = StateGraph(OverallState,input=InputState,output=OutputState)
# 如果没有制定节点名称,那么默认名称为函数名
builder.add_node("node_1", node_1)
builder.add_node("node_2", node_2)
builder.add_node("node_3", node_3)
builder.add_edge(START, "node_1")
builder.add_edge("node_1", "node_2")
builder.add_edge("node_2", "node_3")
builder.add_edge("node_3", END)

graph = builder.compile()
graph.invoke({"user_input":"My"})

输出:{'graph_output': 'My name is Lance'}

2.2 operator

operator : Python 标准库中的一个模块,它提供了一系列对应于 Python 内置运算符的函数。

Annotated 类型:Annotated 是 Python 3.9 引入的类型提示工具。

Reducer:在 Langgraph 中,reducer 是一个特殊的函数,用于定义当状态更新时如何处理和合并数据。
Reducer 本质上是一个接收两个参数并返回一个结果的函数:

  1. 当前状态中的值
  2. 节点返回的新值
  3. 返回合并后的结果

如果您没有指定 reducer,则每次状态更新都会用最近提供的消息列表覆盖现有消息列表。如果您想简单地将消息追加到现有列表中,则可以使用 operator.add 作为 reducer。

对于这个代码,bar在更新时会将新值加入list,而不是覆盖。比如

  1. 初始值:{"foo": 1, "bar": ["hi"]}
  2. 节点1返回:{"foo": 2}。输出{"foo": 2, "bar": ["hi"]}
  3. 节点2返回:{ "bar": ["Tom"]}。输出**{"foo": 2, "bar": ["hi",'Tom']}**

注意:节点返回时可以不完全返回状态的数据结构,可以只返回其中的一个属性。那么输出状态只会更新其新属性。

1
2
3
4
5
6
7
from typing import Annotated
from typing_extensions import TypedDict
from operator import add

class State(TypedDict):
foo: int
bar: Annotated[list[str], add]

2.3 消息

对于消息的数据结构,预定义了reducer函数:add_messages

  • 状态更新时会反序列化为LangChain的Message对象。

    1
    2
    3
    4
    5
    # this is supported
    {"messages": [HumanMessage(content="message")]}

    # and this is also supported
    {"messages": [{"type": "human", "content": "message"}]}
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    from langchain_core.messages import AnyMessage
    from langgraph.graph.message import add_messages
    from typing import Annotated
    from typing_extensions import TypedDict

    class GraphState(TypedDict):
    messages: Annotated[list[AnyMessage], add_messages]

    # 也可以使用MessagesState。MessagesState 使用 add_messages reducer

    from langgraph.graph import MessagesState
    class State(MessagesState):
    documents: list[str]

3. 节点、边

节点

  • START 节点:用户初始输入数据到达的节点。
  • END 节点:终端节点。表示此边后没有操作。

  • 普通边:直接从一个节点到下一个节点。
  • 条件边:调用一个函数来确定要转到下一个节点。
  • 入口点:用户输入到达时要调用的第一个节点。
  • 条件入口点:调用一个函数来确定用户输入到达时要调用的第一个节点。

并行策略:如果一个节点有多个输出边,那么这些目标节点会并行执行。

普通边

1
graph.add_edge("node_a", "node_b")

条件边

add_conditional_edges

路由到1条边或多条边。

1
graph.add_conditional_edges("node_a", routing_function)
  • routing_function类似节点,接受图的当前 state 并返回一个值。

  • 默认将routing_function的返回状态作为下一节点的输入,然后下组节点并行。

指定下一节点名称:

1
graph.add_conditional_edges("node_a", routing_function, {True: "node_b", False: "node_c"})

入口点

1
2
from langgraph.graph import START
graph.add_edge(START, "node_a")

条件入口点

条件边

1
2
3
4
from langgraph.graph import START
graph.add_conditional_edges(START, routing_function)
#或
graph.add_conditional_edges(START, routing_function, {True: "node_b", False: "node_c"})

4. Send

并行多次调用同一个节点,并使用不同的状态,然后将结果聚合回主图的状态。

属性:

  • node要发送消息的目标节点的名称。
  • arg要发送到目标节点的状态或消息。

Send是发送到节点的一种策略,可以动态指定发生到某个节点(通过参数node)。
那么继而可以实现多次发送到同一个节点,实现同节点的并行处理一批数据。

此样例:

  • 定义状态OverallState
  • 定义节点generate_joke
  • 定义条件边continue_to_jokes
    • 定义将一批数据都发送到一个节点,实现并行。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
from typing import Annotated, TypedDict
import operator
from langgraph.types import Send
from langgraph.graph import END, START
from langgraph.graph import StateGraph

class OverallState(TypedDict):
subjects: list[str]
jokes: Annotated[list[str], operator.add]
def continue_to_jokes(state: OverallState):
return [Send("generate_joke", {"subject": s}) for s in state['subjects']]

builder = StateGraph(OverallState)
builder.add_node("generate_joke", lambda state: {"jokes": [f"Joke about {state['subject']}"]})
builder.add_conditional_edges(START, continue_to_jokes)
builder.add_edge("generate_joke", END)
graph = builder.compile()

# Invoking with two subjects results in a generated joke for each
print(graph.invoke({"subjects": ["cats", "dogs"]}))

# {'subjects': ['cats', 'dogs'], 'jokes': ['Joke about cats', 'Joke about dogs']}

ps:Map-Reduce 设计模式介绍

Map-Reduce 是一种用于处理和生成大型数据集的编程模型,最初由 Google 提出。这种设计模式主要包含两个阶段:

基本概念

  1. Map 阶段 :将输入数据分割成独立的块,并行处理这些块,生成中间结果(键值对)。
  2. Reduce 阶段 :收集 Map 阶段的所有中间结果,进行合并和处理,生成最终输出。

5. 配置

标记图的某些部分是可配置的,通过传入配置数据。比如在一些节点选择不同的llm,或者提示词等。

1
2
3
4
5
6
7
8
9
10
11
12
13
14

class ConfigSchema(TypedDict):
llm: str
# 实例化时传入 状态和配置 的数据结构
graph = StateGraph(State, config_schema=ConfigSchema)

# 执行时使用的配置数据
config = {"configurable": {"llm": "anthropic"}}
graph.invoke(inputs, config=config)

def node_a(state, config):
llm_type = config.get("configurable", {}).get("llm", "openai")
llm = get_llm(llm_type)
...

6. 递归限制

文档

设置正确的图递归限制对于避免图运行陷入长时间运行的循环很重要,因此有助于最大限度地减少不必要的成本。

限制图最多能走几个节点。

1
graph.invoke(inputs, config={"recursion_limit": 5, "configurable":{"llm": "anthropic"}})

7. 断点

断点

在某些节点执行之前或之后设置断点。

也可以动态添加断点:根据某些条件,动态地从给定节点内部中断图。

8. 显示图

1
2
3
4
5
6
7
8
9
10
11
12
# 显示图像
from IPython.display import Image, display

# 生成并保存图像
png_data = graph.get_graph().draw_mermaid_png(draw_method=MermaidDrawMethod.API,)

# 保存图像到文件
with open('workflow_graph.png', 'wb') as f:
f.write(png_data)

# 显示图像
display(Image(png_data))

9. 流式输出

文档

.stream.astream 是同步和异步方法,用于从图运行中流式返回输出。

可以指定多种不同的模式:

  • "values": 这会在图的每一步之后流式传输状态的完整值。
  • "updates": 这会在图的每一步之后流式传输状态的更新。如果在同一步骤中进行了多个更新(例如运行多个节点),则这些更新将单独流式传输。
  • "debug": 这在整个图执行过程中流式传输尽可能多的信息。

values vs updates

流式 LLM 标记和事件 (.astream_events)

可以使用 astream_events 方法流式传输节点内部发生的事件。

  • 每个节点(可运行的)都会在开始执行时发出 on_chain_start,在节点执行期间发出 on_chain_stream,并在节点完成时发出 on_chain_end。节点事件将具有事件的 name 字段中的节点名称
  • 对状态通道的任何写入(即任何时候更新其中一个状态键的值)都会发出 on_chain_starton_chain_end 事件

可以利用节点名称等信息,在返回图表等特殊数据时,前端对其进行特殊渲染。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
import os
import asyncio
from langchain_openai import ChatOpenAI
from langgraph.graph import StateGraph, MessagesState, START, END
modelName = 'qwen-max-latest'
baseUrl = "https://dashscope.aliyuncs.com/compatible-mode/v1"
os.environ["OPENAI_API_KEY"] = ''
model = ChatOpenAI(model=modelName, base_url=baseUrl)

def call_model(state: MessagesState):
response = model.invoke(state['messages'])
return {"messages": response}

workflow = StateGraph(MessagesState)
workflow.add_node(call_model)
workflow.add_edge(START, "call_model")
workflow.add_edge("call_model", END)
app = workflow.compile()

async def main():
inputs = [{"role": "user", "content": "hi!"}]
async for event in app.astream_events({"messages": inputs}, version="v1"):
kind = event["event"]
print(f"{kind}: {event['name']}")

if __name__ == "__main__":
asyncio.run(main())

我们从整体图开始(on_chain_start: LangGraph)。然后我们写入 __start__ 节点(这是一个处理输入的特殊节点)。然后我们启动 call_model 节点(on_chain_start: call_model)。然后我们启动聊天模型调用(on_chat_model_start: ChatOpenAI),按标记流式返回(on_chat_model_stream: ChatOpenAI),然后完成聊天模型(on_chat_model_end: ChatOpenAI)。从那里,我们将结果写回通道(ChannelWrite<call_model,messages>),然后完成 call_model 节点,最后完成整个图。

一个完整的事件数据:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
{'event': 'on_chat_model_stream',
'name': 'ChatOpenAI',
'run_id': '3fdbf494-acce-402e-9b50-4eab46403859',
'tags': ['seq:step:1'],
'metadata': {'langgraph_step': 1,
'langgraph_node': 'call_model',
'langgraph_triggers': ['start:call_model'],
'langgraph_task_idx': 0,
'checkpoint_id': '1ef657a0-0f9d-61b8-bffe-0c39e4f9ad6c',
'checkpoint_ns': 'call_model',
'ls_provider': 'openai',
'ls_model_name': 'gpt-4o-mini',
'ls_model_type': 'chat',
'ls_temperature': 0.7},
'data': {'chunk': AIMessageChunk(content='Hello', id='run-3fdbf494-acce-402e-9b50-4eab46403859')},
'parent_ids': []}

'langgraph_node': 'call_model',告诉我们这个模型是在哪个节点内调用的。




总访问
发表了 19 篇文章 🔸 总计 43.8k 字