文档

PocketFlow是一个只用100代码编写的Agent框架,可以实现多种LLM设计模式,如Agent、Workflow、RAG等,也提供了很多功能样例。

最重要的是,这是一款符合cursor编程的Agent框架,内置了多个cursor-rules,同时由于本身源代码量就很少,所以cursor很容易理解PocketFlow,并基于PocketFlow编写应用。

另一个好处是学习成本低,更容易上手。相比于其他重量级的框架如LangGraph,需要我们熟悉LangGraph的许多框架规则,虽然他提供的功能很多,但也会增加我们的学习成本,也不易调试程序。而PocketFlow规则少、清晰简单。

LangGraph文档

PocketFlow文字也比较详细了,在这里将记录符合我习惯的文档风格,也作为一个学习的过程。

1. 要素

1.1 节点 Node

描述

shared:全局的数据(内存中的字典)。一般作为prep的读取和post的输出。

节点中会顺序执行三个步骤

  1. prep(shared):数据预处理。读取shared,进行数据的预处理。
  2. exec(prep_res):动作执行。接收处理好的数据,执行动作。exec中不直接访问shared。
    • 返回exec_res,传递给post()
    • 如果只需要处理数据,可以不实现exec。
  3. post(shared, prep_res, exec_res):数据后处理。接收exec的输出,整理数据并写入shared。
    • 通过返回字符串action = "default")决定下一步操作。
image-20250502173057395
1
2
3
4
5
6
7
8
9
class Start(Node):
def prep(self, shared):
pass

def exec(self,prep_res):
pass

def post(self, shared, prep_res, exec_res):
pass
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
class Node(Node):
def prep(self, shared):
# 数据预处理
return shared.get("name")

def exec(self,prep_res):
# 动作
exec_res = prep_res * 10
return exec_res

def post(self, shared, prep_res, exec_res):
# 更新状态
shared["name"] = exec_res
# 确定方向
return exec_res

容错参数

  • max_retries:错误重试次数

  • wait:下次重试的等待时间,单位秒。场景如 LLM 提供商的速率限制时。

  • 获取当前重试次数:

    1
    2
    3
    4
    class RetryNode(Node):
    def exec(self, prep_res):
    print(f"Retry {self.cur_retry} times")
    raise Exception("Failed")

容错函数:重试后仍错误,将执行exec_fallback,可准备一个后备结果

1
2
def exec_fallback(self, prep_res, exc):
raise exc

代码

样例代码

节点运行使用node.run(shared)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
from pocketflow import Node
from utils.call_llm import call_llm

shared = {
"data": """**春天的作文**
春天,是四季中最充满希望和生机的季节。当冬日的寒冷渐渐退去,温暖的阳光洒满大地,万物开始苏醒,春天就这样悄无声息地来到了我们的身边。
清晨,我走出家门,迎面扑来的是清新的空气,夹杂着泥土的芬芳和花草的香气。路边的小草悄悄地从土里探出头来,嫩绿的叶子在微风中轻轻摇曳,仿佛在向人们招手。树木也披上了新装,枝头上绽开了朵朵花苞,有的已经盛开,粉红的桃花、雪白的梨花、金黄的迎春花,把整个世界装扮得五彩缤纷。
公园里,孩子们在草地上奔跑、嬉戏,欢笑声回荡在空气中。老人们坐在长椅上,享受着温暖的阳光,脸上洋溢着幸福的笑容。鸟儿们也从南方飞回来了,在树枝间跳跃,唱着动听的歌谣,仿佛在庆祝春天的到来。
春天不仅带来了美丽的景色,还给人们带来了无限的希望和动力。农民伯伯开始忙碌起来,播种希望;学生们也重新投入到学习中,迎接新的挑战。春天就像一位温柔的画家,用她的画笔描绘出一幅幅生动的画面,让人心情愉悦,充满活力。
我爱春天,爱她的美丽,爱她的生机,更爱她带给人们的希望与梦想。让我们珍惜这美好的季节,努力拼搏,迎接更加灿烂的明天!
""",
"summary": ""
}

class SummarizeFile(Node):
def prep(self, shared):
return shared["data"]

def exec(self, prep_res):
if not prep_res:
return "Empty file content"
prompt = f"Summarize this text in 10 words: {prep_res}"
summary = call_llm(prompt) # 现在调用的是工具函数
return summary

def exec_fallback(self, prep_res, exc):
# Provide a simple fallback instead of crashing
return "There was an error processing your request."

def post(self, shared, prep_res, exec_res):
shared["summary"] = exec_res
# Return "default" by not returning

summarize_node = SummarizeFile(max_retries=3)

# node.run() calls prep->exec->post
# If exec() fails, it retries up to 3 times before calling exec_fallback()
action_result = summarize_node.run(shared)

print("Action returned:", action_result) # "default"
print("Summary stored:", shared["summary"])

# 输出
# Action returned: None
# Summary stored: There was an error processing your request.

1.2 流程 Flow

使用Flow做流式编排,通过简单的定义实现LangGraph的功能。

Flow的方向

每个Node都会返回一个Action字符串。如果节点的post没有返回值,默认Action“defaut”

  1. 默认的方向node_a >> node_b ,意味着如果节点node_a没有返回值或者返回'defaut',则方向是node_b
  2. 命名的方向node_a - "action_name" >> node_b,意味着如果节点node_a的返回值为"action_name",则方向是node_b

基于以上两个方向的定义,可以通过Flow创建循环、分支等

Flow的创建

  1. 先定义节点方向
  2. 创建Flow,用start定义初始节点
  3. run 运行Flow,并根据节点post返回值决定方向并执行,直到没有下一个节点
1
2
3
node_a >> node_b
flow = Flow(start=node_a)
flow.run(shared)

样例代码:分支、循环

结合 节点post、默认方向流、命名方向流,可以实现节点间的循环、分支判断。

1
2
3
4
5
6
7
8
9
10
# Define the flow connections
review - "approved" >> payment # If approved, process payment
review - "needs_revision" >> revise # If needs changes, go to revision
review - "rejected" >> finish # If rejected, finish the process

revise >> review # After revision, go back for another review
payment >> finish # After payment, finish the process

flow = Flow(start=review)

image-20250502221552350

做了一个循环、退出循环的测试代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
# 循环、分支、结束
# 循环:a -> b -> c -> d -> a -> ...
# 每次循环,shared 的 loop 减 1
# 当 loop 为 0 时,返回 'default'
# 结束:d -> end
# 在post中判断方向

from pocketflow import Node,Flow

class A(Node):
def prep(self, shared):
print("-" * 50)
if shared.get("name") == "a":
print(f"开始倒数第{shared.get('loop')}次循环")
return shared.get("name")

def exec(self,prep_res):
exec_res = chr(ord(prep_res) + 1)
return exec_res

def post(self, shared, prep_res, exec_res):
print(f"当前节点: {prep_res}")
print(f"下一个节点: {exec_res}")
shared["name"] = exec_res
if prep_res == "d":
shared["name"] = "a"
shared["loop"] = shared.get("loop") - 1
if shared.get("loop") == 0:
return 'default'
return 'a'
else:
print(f"通过 {shared.get('name')}")
return exec_res
class End(Node):
def prep(self, shared):
print("-" * 50)
return None
def post(self, shared, prep_res, exec_res):
print("Game Over")
return None


a = A()
b = A()
c = A()
d = A()
end = End()

a - 'b' >> b
b - "c" >> c
c - 'd' >> d
d - 'a' >> a
d >> end
flow = Flow(start=a)

shared = {
"name": "a",
"loop": 3
}

flow.run(shared)

Flow 嵌套

流程也是节点,适用节点的规则。

1
2
3
4
5
6
7
8
9
10
# Create a sub-flow
node_a >> node_b
subflow = Flow(start=node_a)

# Connect it to another node
subflow >> node_c

# Create the parent flow
parent_flow = Flow(start=subflow)

样例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
from pocketflow import Node,Flow

class Node(Node):
def prep(self, shared):
print(f"当前节点: {shared.get('name')}")
return shared.get("name")

def exec(self,prep_res):
exec_res = chr(ord(prep_res) + 1)
return exec_res

def post(self, shared, prep_res, exec_res):
# 更新状态
shared["name"] = exec_res
if prep_res == "d":
return 'end'
# 确定方向
return None

class End(Node):
def prep(self, shared):
return None

def exec(self,prep_res):
return None

def post(self, shared, prep_res, exec_res):
print(f"Game Over")
return None

a = Node()
b = Node()
c = Node()
d = Node()
end = End()

a >> b
flow_a_b = Flow(start=a)

c >> d
flow_c_d = Flow(start=c)

flow_a_b >> flow_c_d

flow_c_d - 'end' >> end

flow = Flow(start=flow_a_b)

shared = {
"name": "a",
}
flow.run(shared)

# flow_a_b.run(shared)

1.3 数据流

Shared

一般是内存中的字典类型,可以包含数据库连接等实现数据持久化。

1
shared = {"data": {}, "summary": {}, "config": {...}, ...}
  1. LoadData写入数据到shared["data"]
  2. Summarizeshared["data"] 读取数据,总结写入 shared["summary"]
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
from pocketflow import Node,Flow
from utils.call_llm import call_llm

# 处理shared数据
class LoadData(Node):
def post(self, shared, prep_res, exec_res):
# We write data to shared store
shared["data"] = """**春天的作文**
春天,是四季中最充满希望和生机的季节。当冬日的寒冷渐渐退去,温暖的阳光洒满大地,万物开始苏醒,春天就这样悄无声息地来到了我们的身边。
清晨,我走出家门,迎面扑来的是清新的空气,夹杂着泥土的芬芳和花草的香气。路边的小草悄悄地从土里探出头来,嫩绿的叶子在微风中轻轻摇曳,仿佛在向人们招手。树木也披上了新装,枝头上绽开了朵朵花苞,有的已经盛开,粉红的桃花、雪白的梨花、金黄的迎春花,把整个世界装扮得五彩缤纷。
公园里,孩子们在草地上奔跑、嬉戏,欢笑声回荡在空气中。老人们坐在长椅上,享受着温暖的阳光,脸上洋溢着幸福的笑容。鸟儿们也从南方飞回来了,在树枝间跳跃,唱着动听的歌谣,仿佛在庆祝春天的到来。
春天不仅带来了美丽的景色,还给人们带来了无限的希望和动力。农民伯伯开始忙碌起来,播种希望;学生们也重新投入到学习中,迎接新的挑战。春天就像一位温柔的画家,用她的画笔描绘出一幅幅生动的画面,让人心情愉悦,充满活力。
我爱春天,爱她的美丽,爱她的生机,更爱她带给人们的希望与梦想。让我们珍惜这美好的季节,努力拼搏,迎接更加灿烂的明天!
"""
return None

# 执行
class Summarize(Node):
def prep(self, shared):
# We read data from shared store
return shared["data"]

def exec(self, prep_res):
# Call LLM to summarize
prompt = f"Summarize: {prep_res}"
summary = call_llm(prompt)
return summary

def post(self, shared, prep_res, exec_res):
# We write summary to shared store
shared["summary"] = exec_res
return "default"

load_data = LoadData()
summarize = Summarize()
load_data >> summarize
flow = Flow(start=load_data)

shared = {}
flow.run(shared)

print(shared["summary"])

Params

所有的 NodeFlow 都可以设置专属参数。

  1. Node运行周期内不可变动
  2. node.set_params() flow.set_params()
  3. Flow参数会覆盖子节点参数
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
# 1) Create a Node that uses params
class SummarizeFile(Node):
def prep(self, shared):
# Access the node's param
filename = self.params["filename"]
return shared["data"].get(filename, "")

def exec(self, prep_res):
prompt = f"Summarize: {prep_res}"
return call_llm(prompt)

def post(self, shared, prep_res, exec_res):
filename = self.params["filename"]
shared["summary"][filename] = exec_res
return "default"

# 2) Set params
node = SummarizeFile()

# 3) Set Node params directly (for testing)
node.set_params({"filename": "doc1.txt"})
node.run(shared)

# 4) Create Flow
flow = Flow(start=node)

# 5) Set Flow params (overwrites node params)
flow.set_params({"filename": "doc2.txt"})
flow.run(shared) # The node summarizes doc2, not doc1

1.4 Batch

BatchNode

  1. prep(shared): 返回一个可迭代对象,如数组,字符串。
  2. **exec(item)**:对数组的每项进行处理
  3. **post(shared, prep_res, exec_res_list)**:接收exec的结果数组,exec_res_list
1
2
3
4
5
6
7
8
9
class BatchNode(BatchNode):
def prep(self, shared):
return []

def exec(self, chunk):
pass

def post(self,shared, prep_res,exec_res_list):
return "default"
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
from pocketflow import BatchNode,BatchFlow,Flow
from utils.call_llm import call_llm


class MapSummaries(BatchNode):
def prep(self, shared):
content = shared["data"]
chunk_size = 20
chunks = [content[i:i+chunk_size] for i in range(0, len(content), chunk_size)]
return chunks

def exec(self, chunk):
prompt = f"Summarize this chunk in 10 words: {chunk}"
summary = call_llm(prompt)
return summary

def post(self,shared, prep_res,exec_res_list):
combined = "\n".join(exec_res_list)
shared["summary"] = combined
return None

map_summaries = MapSummaries()
flow = Flow(start=map_summaries)
shared = {
'data': """
**春天的作文**
春天,是四季中最充满希望和生机的季节。当冬日的寒冷渐渐退去,温暖的阳光洒满大地,万物开始苏醒,春天就这样悄无声息地来到了我们的身边。
清晨,我走出家门,迎面扑来的是清新的空气,夹杂着泥土的芬芳和花草的香气。路边的小草悄悄地从土里探出头来,嫩绿的叶子在微风中轻轻摇曳,仿佛在向人们招手。树木也披上了新装,枝头上绽开了朵朵花苞,有的已经盛开,粉红的桃花、雪白的梨花、金黄的迎春花,把整个世界装扮得五彩缤纷。
公园里,孩子们在草地上奔跑、嬉戏,欢笑声回荡在空气中。老人们坐在长椅上,享受着温暖的阳光,脸上洋溢着幸福的笑容。鸟儿们也从南方飞回来了,在树枝间跳跃,唱着动听的歌谣,仿佛在庆祝春天的到来。
春天不仅带来了美丽的景色,还给人们带来了无限的希望和动力。农民伯伯开始忙碌起来,播种希望;学生们也重新投入到学习中,迎接新的挑战。春天就像一位温柔的画家,用她的画笔描绘出一幅幅生动的画面,让人心情愉悦,充满活力。
我爱春天,爱她的美丽,爱她的生机,更爱她带给人们的希望与梦想。让我们珍惜这美好的季节,努力拼搏,迎接更加灿烂的明天!""",
'summary': ""
}
flow.run(shared)

print(shared["summary"])


BatchFlow

使用params而不是shared,每个子流程通过不同的参数独立运行。

  1. BatchFlow 将一批参数传递给子 Flow,每个参数的Flow独立运行
  2. Flow中通过self.params获取参数
  3. 子流程/节点 为常规 流程/节点,而不是Batch

将文件名作为参数传递给Flow。每个Flow处理一个文件。在shared中收集数据

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
from pocketflow import BatchNode,BatchFlow,Flow,Node
from utils.call_llm import call_llm


file = {
"file1.txt": "1234567",
"file2.txt": "8901234",
"file3.txt": "5678901"
}
shared = {
"data": file
}

class SummarizeAllFiles(BatchFlow):
def prep(self, shared):
# IMPORTANT: Return a list of param dictionaries (not data for processing)
filenames = list(shared["data"].keys()) # e.g., ["file1.txt", "file2.txt", ...]
return [{"filename": fn} for fn in filenames]

# Child node that accesses filename from params, not shared store
class LoadFile(Node):
def prep(self, shared):
# Access filename from params (not from shared)
filename = self.params["filename"] # Important! Use self.params, not shared
return filename

def exec(self, filename):
# 模拟从文件中读取内容
return file[filename]

def post(self, shared, prep_res, exec_res):
# Store file content in shared
shared["current_file_content"] = exec_res
return "default"

# Summarize node that works on the currently loaded file
class Summarize(Node):
def prep(self, shared):
return shared["current_file_content"]

def exec(self, content):
# 模拟调用llm总结
prompt = f"Summarize this file in 50 words: {content}"
return prompt

def post(self, shared, prep_res, exec_res):
# Store summary in shared, indexed by current filename
filename = self.params["filename"] # Again, using params
if "summaries" not in shared:
shared["summaries"] = {}
shared["summaries"][filename] = exec_res
return "default"

# Create a per-file flow
load_file = LoadFile()
summarize = Summarize()
load_file >> summarize
summarize_file = Flow(start=load_file)

# Wrap in a BatchFlow to process all files
summarize_all_files = SummarizeAllFiles(start=summarize_file)
summarize_all_files.run(shared)


print("\n".join(f"{f}: {s}" for (f,s) in shared["summaries"].items()))

BatchFlow 嵌套

可以将一个BatchFlow嵌套在另一个BatchFlow中。

每一层BatchFlow都有参数,在每一层,会将自身参数和父节点参数合并。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
from pocketflow import BatchNode,BatchFlow,Flow,Node
from utils.call_llm import call_llm


directory = {
"dir1": {
"file1.txt": "dir1 file1 content",
"file2.txt": "dir1 file2 content",
"file3.txt": "dir1 file3 content"
},
"dir2": {
"file1.txt": "dir2 file1 content",
"file2.txt": "dir2 file2 content",
"file3.txt": "dir2 file3 content"
}
}

shared = {
"data": directory
}


class FileBatchFlow(BatchFlow):
def prep(self, shared):
directory = self.params["directory"]
files = list(shared["data"][directory].keys())
# files = [f for f in os.listdir(directory) if f.endswith(".txt")]
return [{"filename": f} for f in files]

class DirectoryBatchFlow(BatchFlow):
def prep(self, shared):
directories = list(shared["data"].keys())
return [{"directory": d} for d in directories]

# The actual processing node
class ProcessFile(Node):
def prep(self, shared):
# Access both directory and filename from params
directory = self.params["directory"] # From outer batch
filename = self.params["filename"] # From inner batch
# full_path = os.path.join(directory, filename)
full_path = shared["data"][directory][filename]
return full_path

def exec(self, full_path):
return f"Processed {full_path}"

def post(self, shared, prep_res, exec_res):
if "results" not in shared:
shared["results"] = {}
shared["results"][prep_res] = exec_res
return "default"

# Set up the nested batch structure
process_node = ProcessFile()
inner_flow = FileBatchFlow(start=process_node)
outer_flow = DirectoryBatchFlow(start=inner_flow)

# Run it
outer_flow.run(shared)


print("\n".join(f"{f}: {s}" for (f,s) in shared["results"].items()))

1.5 异步

异步节点实现prep_async()exec_async()exec_fallback_async()post_async()




总访问
发表了 19 篇文章 🔸 总计 43.8k 字