defexec(self, prep_res): ifnot prep_res: return"Empty file content" prompt = f"Summarize this text in 10 words: {prep_res}" summary = call_llm(prompt) # 现在调用的是工具函数 return summary
defexec_fallback(self, prep_res, exc): # Provide a simple fallback instead of crashing return"There was an error processing your request."
defpost(self, shared, prep_res, exec_res): shared["summary"] = exec_res # Return "default" by not returning
summarize_node = SummarizeFile(max_retries=3)
# node.run() calls prep->exec->post # If exec() fails, it retries up to 3 times before calling exec_fallback() action_result = summarize_node.run(shared)
# Define the flow connections review - "approved" >> payment # If approved, process payment review - "needs_revision" >> revise # If needs changes, go to revision review - "rejected" >> finish # If rejected, finish the process
revise >> review # After revision, go back for another review payment >> finish # After payment, finish the process
classSummarizeAllFiles(BatchFlow): defprep(self, shared): # IMPORTANT: Return a list of param dictionaries (not data for processing) filenames = list(shared["data"].keys()) # e.g., ["file1.txt", "file2.txt", ...] return [{"filename": fn} for fn in filenames]
# Child node that accesses filename from params, not shared store classLoadFile(Node): defprep(self, shared): # Access filename from params (not from shared) filename = self.params["filename"] # Important! Use self.params, not shared return filename defexec(self, filename): # 模拟从文件中读取内容 return file[filename] defpost(self, shared, prep_res, exec_res): # Store file content in shared shared["current_file_content"] = exec_res return"default"
# Summarize node that works on the currently loaded file classSummarize(Node): defprep(self, shared): return shared["current_file_content"] defexec(self, content): # 模拟调用llm总结 prompt = f"Summarize this file in 50 words: {content}" return prompt defpost(self, shared, prep_res, exec_res): # Store summary in shared, indexed by current filename filename = self.params["filename"] # Again, using params if"summaries"notin shared: shared["summaries"] = {} shared["summaries"][filename] = exec_res return"default"
classFileBatchFlow(BatchFlow): defprep(self, shared): directory = self.params["directory"] files = list(shared["data"][directory].keys()) # files = [f for f in os.listdir(directory) if f.endswith(".txt")] return [{"filename": f} for f in files]
classDirectoryBatchFlow(BatchFlow): defprep(self, shared): directories = list(shared["data"].keys()) return [{"directory": d} for d in directories]
# The actual processing node classProcessFile(Node): defprep(self, shared): # Access both directory and filename from params directory = self.params["directory"] # From outer batch filename = self.params["filename"] # From inner batch # full_path = os.path.join(directory, filename) full_path = shared["data"][directory][filename] return full_path defexec(self, full_path): returnf"Processed {full_path}" defpost(self, shared, prep_res, exec_res): if"results"notin shared: shared["results"] = {} shared["results"][prep_res] = exec_res return"default"
# Set up the nested batch structure process_node = ProcessFile() inner_flow = FileBatchFlow(start=process_node) outer_flow = DirectoryBatchFlow(start=inner_flow)
# Run it outer_flow.run(shared)
print("\n".join(f"{f}: {s}"for (f,s) in shared["results"].items()))