【LangGraph】LangGraph 协调者-工作者模式完全解析:从零构建一个智能报告生成系统
目录LangGraph 协调者-工作者模式完全解析从零构建一个智能报告生成系统一、这个系统是做什么的二、整体流程图三、核心概念协调者-工作者模式3.1 和普通并行化的区别3.2 生活例子四、状态定义State4.1 字段说明4.2 重要operator.add 的作用五、节点详解5.1 节点1orchestrator协调者5.2 分配函数assign_workers关键边重点Send 是什么5.3 节点2llm_call工作者5.4 节点3synthesizer合成器六、图的构建核心难点6.1 完整代码6.2 关键add_conditional_edges 的两种用法七、执行过程详解八、完整可运行代码九、总结LangGraph 协调者-工作者模式完全解析从零构建一个智能报告生成系统一、这个系统是做什么的一句话输入一个主题自动生成一份完整的报告。比如你输入“中国近代史”系统会自动分析主题决定报告分几个章节为每个章节分配一个写作任务多个“员工”同时写作不同章节把所有章节拼成完整报告二、整体流程图用户输入: {topic: 中国近代史} │ ▼ ┌─────────────────────────────────────────────────────────────┐ │ 节点1: orchestrator协调者/领导 │ │ │ │ 作用分析主题制定章节计划 │ │ 输入{topic: 中国近代史} │ │ 输出{sections: [章1, 章2, 章3]} │ │ │ │ 内部逻辑调用 LLM强制输出固定格式的章节列表 │ └─────────────────────────────────────────────────────────────┘ │ ▼ ┌─────────────────────────────────────────────────────────────┐ │ 函数: assign_workers分配任务(边) │ │ │ │ 作用为每个章节创建一个 Send 对象 │ │ 输入{sections: [章1, 章2, 章3]} │ │ 输出[Send(llm_call, {section: 章1}), │ │ Send(llm_call, {section: 章2}), │ │ Send(llm_call, {section: 章3})] │ └─────────────────────────────────────────────────────────────┘ │ │ 动态创建 3 条边 │ ┌───────────┼───────────┐ │ │ │ ▼ ▼ ▼ ┌────────┐ ┌────────┐ ┌────────┐ │ 节点2 │ │ 节点2 │ │ 节点2 │ │llm_call│ │llm_call│ │llm_call│ │ (员工) │ │ (员工) │ │ (员工) │ │ │ │ │ │ │ │ 写章1 │ │ 写章2 │ │ 写章3 │ └───┬────┘ └───┬────┘ └───┬────┘ │ │ │ └──────────┼──────────┘ ▼ ┌─────────────────────────────────────────────────────────────┐ │ 节点3: synthesizer合成器/文员 │ │ │ │ 作用把所有章节拼成完整报告 │ │ 输入{completed_sections: [章1内容, 章2内容, ...]} │ │ 输出{final_report: 章1内容\n---\n章2内容\n---\n...} │ └─────────────────────────────────────────────────────────────┘ │ ▼ 输出报告三、核心概念协调者-工作者模式3.1 和普通并行化的区别对比普通并行化协调者-工作者模式任务数量设计时就固定运行时动态决定任务内容预先知道协调者分析后才知道代码中的体现add_edge写死Send动态创建3.2 生活例子角色类比代码中的节点协调者领导决定写哪几章orchestrator工作者员工每人写一章llm_call合成器文员把章拼成书synthesizer关键领导可能决定写3章也可能写5章。员工数量取决于领导的决定。四、状态定义StateclassState(TypedDict):topic:str# 用户输入的主题sections:list# 协调者生成的章节列表completed_sections:Annotated[List,operator.add]# 员工写好的内容final_report:str# 最终报告4.1 字段说明字段类型作用谁写入topicstr用户输入的主题调用者sectionslist章节计划orchestratorcompleted_sectionslist员工写好的内容llm_call多个final_reportstr最终报告synthesizer4.2 重要operator.add的作用completed_sections:Annotated[List,operator.add]没有operator.add有operator.add后返回的结果覆盖前面的所有结果追加到列表只有一个员工的结果所有员工的结果都在列表里如果不加这个多个员工同时交稿只有最后一个会被保留。五、节点详解5.1 节点1orchestrator协调者deforchestrator(state:State):responseplanner.invoke(f为主题{state[topic]}制定报告大纲包含3个章节)return{sections:response.sections}项目说明输入{topic: 中国近代史}输出{sections: [Section对象, Section对象, ...]}作用分析主题制定章节计划5.2 分配函数assign_workers关键边defassign_workers(state:State):worker_tasks[]forsectioninstate[sections]:worker_tasks.append(Send(llm_call,{section:section}))returnworker_tasks项目说明输入{sections: [章1, 章2, 章3]}输出[Send(...), Send(...), Send(...)]作用为每个章节创建一个工作者任务重点Send是什么Send(llm_call,{section:section})参数含义第一个参数目标节点的名字第二个参数传给该节点的stateSend的作用动态创建一条边并携带数据给目标节点。注意llm_call收到的state只有Send传递的{section: ...}。5.3 节点2llm_call工作者defllm_call(state:dict):sectionstate[section]resultmodel.invoke(f编写报告章节:{section.name}, 内容要求:{section.description})return{completed_sections:[result.content]}项目说明输入{section: 章1}来自Send输出{completed_sections: [章1内容]}作用写一个章节的内容5.4 节点3synthesizer合成器defsynthesizer(state:State):completed_sectionsstate[completed_sections]final_report\n\n---\n\n.join(completed_sections)return{final_report:final_report}项目说明输入{completed_sections: [章1内容, 章2内容, ...]}输出{final_report: 章1内容\n---\n章2内容\n---\n...}作用把所有章节拼成完整报告\n\n---\n\n.join(completed_sections)把列表里的多个字符串用 \n\n—\n\n 作为分隔符拼接成一个字符串。六、图的构建核心难点6.1 完整代码builderStateGraph(State)# 添加节点builder.add_node(orchestrator,orchestrator)builder.add_node(llm_call,llm_call)builder.add_node(synthesizer,synthesizer)# 边1开始 → 协调者builder.add_edge(START,orchestrator)# 边2条件边关键builder.add_conditional_edges(orchestrator,assign_workers,[llm_call]# 用于类型检查声明可能去的节点。实际去多少次、每次带什么数据由 assign_workers 返回的 Send 列表决定。)# 边3工作者 → 合成器builder.add_edge(llm_call,synthesizer)builder.add_edge(synthesizer,END)6.2 关键add_conditional_edges的两种用法普通用法本代码的用法路由函数返回一个节点名assign_workers返回多个Send对象只去一个节点可以去多个节点同一个节点多次不能携带数据每个Send可以携带不同数据# 普通用法defroute(state):ifstate[type]A:returnnode_Aelse:returnnode_Bbuilder.add_conditional_edges(node,route,[node_A,node_B])# 本代码的用法动态创建多个defassign_workers(state):return[Send(llm_call,{section:s})forsinstate[sections]]builder.add_conditional_edges(orchestrator,assign_workers,[llm_call])七、执行过程详解第1步用户输入resultworkflow.invoke({topic:中国近代史})第2步orchestrator执行# 输入state{topic:中国近代史}# 调用 LLM生成3个章节responseplanner.invoke(为主题中国近代史制定报告大纲包含3个章节)# response 是 Sections 对象# response.sections [章1, 章2, 章3]# 返回return{sections:response.sections}第3步assign_workers执行# 输入state{topic:中国近代史,sections:[章1,章2,章3]}# 为每个章节创建 Sendreturn[Send(llm_call,{section:章1}),Send(llm_call,{section:章2}),Send(llm_call,{section:章3})]第4步llm_call执行3次并行# 第一次调用state{section:章1}# 写第一章内容return{completed_sections:[第一章内容...]}# 第二次调用state{section:章2}return{completed_sections:[第二章内容...]}# 第三次调用state{section:章3}return{completed_sections:[第三章内容...]}由于completed_sections使用了operator.add三个结果自动合并成{completed_sections:[第一章内容...,第二章内容...,第三章内容...]}第5步synthesizer执行# 输入state{completed_sections:[第一章内容...,第二章内容...,第三章内容...]}# 拼接final_report第一章内容...\n\n---\n\n第二章内容...\n\n---\n\n第三章内容...# 返回return{final_report:final_report}第6步输出print(result[final_report])# 输出完整的报告八、完整可运行代码fromtypingimportAnnotated,List,TypedDictimportoperatorfromlangchain.chat_modelsimportinit_chat_modelfromlanggraph.constantsimportSTART,ENDfromlanggraph.graphimportStateGraphfromlanggraph.typesimportSendfrompydanticimportBaseModel# # 1. 定义状态# classState(TypedDict):topic:strsections:listcompleted_sections:Annotated[List,operator.add]final_report:str# # 2. 定义结构化输出# classSection(BaseModel):name:strdescription:strclassSections(BaseModel):sections:List[Section]# # 3. 创建模型# modelinit_chat_model(gpt-4o-mini)# 普通模型用于生成内容plannermodel.with_structured_output(Sections)# 结构化模型用于生成大纲# # 4. 协调者节点# deforchestrator(state:State):responseplanner.invoke(f为主题{state[topic]}制定报告大纲包含3个章节)return{sections:response.sections}# # 5. 工作者节点# defllm_call(state:dict):sectionstate[section]resultmodel.invoke(f编写报告章节:{section.name}, 内容要求:{section.description})return{completed_sections:[result.content]}# # 6. 合成器节点# defsynthesizer(state:State):completed_sectionsstate[completed_sections]final_report\n\n---\n\n.join(completed_sections)return{final_report:final_report}# # 7. 任务分配函数# defassign_workers(state:State):worker_tasks[]forsectioninstate[sections]:worker_tasks.append(Send(llm_call,{section:section}))returnworker_tasks# # 8. 构建图# builderStateGraph(State)builder.add_node(orchestrator,orchestrator)builder.add_node(llm_call,llm_call)builder.add_node(synthesizer,synthesizer)builder.add_edge(START,orchestrator)builder.add_conditional_edges(orchestrator,assign_workers,[llm_call])builder.add_edge(llm_call,synthesizer)builder.add_edge(synthesizer,END)workflowbuilder.compile()# # 9. 运行# if__name____main__:resultworkflow.invoke({topic:中国近代史})print(\n*50)print(最终报告)print(*50)print(result[final_report])九、总结概念解释协调者-工作者模式领导动态分配任务员工并行执行Send动态创建边可以携带不同数据operator.add让多个结果自动合并到列表with_structured_output强制 LLM 输出固定格式add_conditional_edges可以返回Send列表动态创建多条边工作者的state不是全局状态只是Send传递的数据一句话总结协调者动态决定任务数量Send动态创建边并携带数据工作者并行执行结果自动合并最终合成完整报告。