1. 项目概述与核心价值最近在探索多智能体系统Multi-Agent System, MAS的落地应用时我遇到了一个非常有意思的开源项目yohey-w/multi-agent-shogun。这个项目名字本身就很有冲击力“Shogun”将军的意象暗示着一种集中式协调与调度的架构。经过一段时间的深度使用和代码剖析我发现它并非又一个简单的Agent框架玩具而是一个在工程化、可观测性和协作模式上有着独特思考的实践性项目。它试图解决一个核心痛点当我们拥有多个能力各异的AI智能体时如何让它们像一支训练有素的军队一样高效、可靠、透明地协同完成复杂任务而不仅仅是让它们“各自为战”然后简单拼接结果。对于任何尝试将大语言模型LLM从单点问答推向复杂工作流自动化的开发者来说多智能体协作都是一个必然要面对的课题。市面上有很多框架有的强调自治有的强调流程编排。multi-agent-shogun给我的第一印象是它非常注重“可控”与“可见”。它不追求极致的Agent自治而是通过一个中央协调器Shogun来统筹规划、任务分发、依赖管理和状态监控这使得整个系统的行为更加可预测、可调试。这对于开发企业级应用或对输出稳定性有要求的场景至关重要。简单来说如果你受够了智能体们偶尔“放飞自我”或协作时陷入逻辑死循环那么这个框架提供的“中央集权”式管理思路值得你花时间深入了解。2. 架构深度解析为何是“将军”模式2.1 核心架构设计哲学multi-agent-shogun的核心架构可以概括为“一中心多智能体全链路可观测”。其设计哲学明显偏向于解决复杂任务下的可靠性问题而非单纯追求Agent的自主性。中央协调器Shogun这是项目的心脏也是“将军”之名的由来。它不是一个简单的路由器而是一个拥有全局视野的决策中心。Shogun的主要职责包括任务解析与规划接收用户提交的顶层目标将其分解为一系列有逻辑顺序和依赖关系的子任务。智能体路由与调度根据子任务的需求从注册的智能体池中匹配合适的“专家”Agent来执行。这里涉及能力匹配、负载均衡等策略。工作流引擎管理子任务之间的依赖关系例如任务B需要等待任务A的输出结果才能启动。它确保工作流能正确、顺序地执行。状态管理与监控实时跟踪每个子任务和每个Agent的执行状态等待、执行中、成功、失败并提供统一的监控接口。这种模式的优势非常明显控制力强、调试方便、流程稳定。所有任务流转和决策逻辑都集中在Shogun当出现问题时我们可以快速定位是规划出错、路由不当还是某个Agent自身故障。相比之下完全去中心化的多Agent系统如基于Agent间通信的架构虽然更灵活但一旦出现异常问题排查就像在迷宫里找一只特定的蚂蚁非常困难。2.2 智能体Agent的角色与标准化在这个框架中单个Agent被设计为相对“纯粹”的执行单元。每个Agent都有明确的职能边界和标准化的输入输出接口。这类似于一个公司里的各个职能部门市场部、研发部、法务部各司其职。框架通常要求Agent实现以下标准方法describe_capability(): 向Shogun声明自己的能力例如“我能进行文本总结”、“我能调用搜索引擎API”、“我能生成Python代码”。这是Shogun进行任务匹配的基础。execute(task_context, history): 核心执行方法。接收来自Shogun的任务上下文包含目标、参数、上游任务结果等和历史对话记录执行具体操作并返回标准化结果。这种设计带来了高内聚、低耦合的好处。我们可以独立开发、测试和升级每一个Agent只要其接口契约不变就不会影响整个系统。例如你可以把GPT-4的Agent轻松替换为Claude的Agent或者为同一个功能开发不同版本的Agent进行A/B测试。2.3 通信与状态管理机制框架内部的信息流转是架构可靠性的关键。multi-agent-shogun通常采用基于消息队列或事件总线的异步通信模式。任务消息Shogun将子任务封装成标准消息发送到任务队列。空闲的、具备相应能力的Agent从队列中领取任务。这解耦了调度和执行提升了系统的吞吐量和弹性。结果回调Agent完成任务后将结果成功或失败封装成消息通过回调机制通知Shogun。Shogun更新该任务状态并触发后续依赖任务的调度。全局状态存储所有任务、Agent的状态以及中间结果都被持久化在一个共享状态存储中如Redis、数据库。这使得Shogun在重启后能恢复现场也方便实现一个实时更新的仪表盘供开发者监控整个系统的运行情况。注意在实际部署中消息队列如RabbitMQ、Redis Streams和状态存储的选择至关重要。对于高吞吐场景需要精心设计消息格式和序列化协议避免成为性能瓶颈。我个人的经验是初期可以用Redis同时承担队列和存储的角色简化部署当规模扩大后再进行拆分。3. 从零到一搭建你的第一个Shogun多智能体系统3.1 环境准备与基础依赖安装假设我们使用Python作为开发语言。首先需要克隆项目仓库并设置环境。我强烈建议使用虚拟环境如venv或conda来隔离依赖。# 克隆项目 git clone https://github.com/yohey-w/multi-agent-shogun.git cd multi-agent-shogun # 创建并激活虚拟环境以venv为例 python -m venv venv source venv/bin/activate # Linux/macOS # venv\Scripts\activate # Windows # 安装核心依赖 pip install -r requirements.txt项目的requirements.txt通常会包含一些核心库例如langchain/llama-index: 用于构建Agent的基础能力与大模型交互。pydantic: 用于数据验证和设置管理确保消息和配置的结构化。redis/pika: 作为消息队列和状态存储的客户端。fastapi/uvicorn: 可能用于提供Shogun和Agent的HTTP API服务。如果项目没有提供明确的requirements.txt你需要根据代码中的导入语句手动安装。一个更稳妥的方法是查阅项目的setup.py或pyproject.toml文件。3.2 定义你的第一个智能体WriterAgent让我们从一个简单的“写作Agent”开始。这个Agent的能力是接受一个主题生成一篇短文。# agents/writer_agent.py import logging from typing import Dict, Any from pydantic import BaseModel, Field from some_llm_wrapper import LLMClient # 假设有一个LLM客户端 # 定义Agent的输入数据模型 class WriterInput(BaseModel): topic: str Field(description文章的主题) tone: str Field(defaultinformative, description文章的语气如 informative, humorous, formal) class WriterAgent: name WriterAgent version 1.0 def __init__(self, llm_client: LLMClient): self.llm_client llm_client self.logger logging.getLogger(__name__) def describe_capability(self) - Dict[str, Any]: 向Shogun注册自己的能力描述 return { name: self.name, version: self.version, description: 根据给定主题和语气撰写短文。, input_schema: WriterInput.schema(), # 提供输入格式的JSON Schema output_type: text } async def execute(self, task_context: Dict[str, Any], history: list None) - Dict[str, Any]: 执行写作任务 try: # 1. 从任务上下文中解析输入 input_data WriterInput(**task_context.get(input, {})) self.logger.info(f{self.name} 开始处理主题: {input_data.topic}) # 2. 构造LLM提示词 prompt f请以{input_data.tone}的语气撰写一篇关于{input_data.topic}的短文字数在200-300字之间。 # 3. 调用大模型 response await self.llm_client.generate(prompt) # 4. 返回标准化结果 return { status: success, data: { article: response, topic: input_data.topic, word_count: len(response.split()) }, message: 文章生成成功 } except Exception as e: self.logger.error(f{self.name} 执行失败: {e}) return { status: failed, data: None, message: f文章生成失败: {str(e)} }关键点解析输入验证使用Pydantic模型WriterInput来定义和验证输入参数。这能确保Shogun传来的数据格式正确避免在Agent内部处理脏数据。能力描述describe_capability方法返回的字典是Shogun进行任务匹配的“能力说明书”。input_schema尤其重要它让Shogun知道调用这个Agent需要提供什么样的数据。标准化输出execute方法返回一个固定结构的字典包含status成功/失败、data核心结果和message附加信息。这是所有Agent必须遵守的契约方便Shogun统一处理。3.3 构建中央协调器Shogun并编排任务流Shogun的核心是任务规划与调度逻辑。这里我们实现一个简化版的Shogun演示如何将“生成一份市场报告”这个复杂任务分解为“研究Agent”和“写作Agent”的串联任务。# shogun/simple_shogun.py import asyncio from typing import List, Dict, Any from agents.research_agent import ResearchAgent # 假设有一个研究Agent from agents.writer_agent import WriterAgent class SimpleShogun: def __init__(self, research_agent: ResearchAgent, writer_agent: WriterAgent): self.agents { research: research_agent, writer: writer_agent } self.task_queue asyncio.Queue() self.logger logging.getLogger(__name__) async def plan_tasks(self, user_goal: str) - List[Dict[str, Any]]: 根据用户目标进行任务规划 # 这是一个简单的硬编码规划器。在实际项目中这里可能会引入一个LLM作为“规划师”来动态分解任务。 if 市场报告 in user_goal: return [ { id: task_1, agent_type: research, input: {query: user_goal, max_results: 5}, depends_on: [] # 没有依赖是第一个任务 }, { id: task_2, agent_type: writer, input: {topic: 基于研究结果的综合市场报告, tone: professional}, depends_on: [task_1] # 依赖任务1的输出 } ] # ... 其他目标的规划逻辑 return [] async def execute_workflow(self, user_goal: str): 执行完整的工作流 self.logger.info(fShogun 开始处理目标: {user_goal}) # 1. 任务规划 tasks await self.plan_tasks(user_goal) task_results {} # 存储每个任务的结果 # 2. 任务调度与执行这里简化了依赖检查的并发逻辑 for task in tasks: task_id task[id] # 检查依赖是否全部完成 dependencies_met all(dep in task_results for dep in task[depends_on]) if not dependencies_met: # 在实际框架中这里应该将任务重新排队或等待 self.logger.warning(f任务 {task_id} 依赖未满足暂缓执行) continue # 准备任务上下文注入上游任务结果 context task[input].copy() for dep in task[depends_on]: # 例如将研究结果作为写作的输入 if dep task_1 and task_id task_2: context[research_data] task_results[dep][data] # 调度给对应Agent agent self.agents[task[agent_type]] self.logger.info(f调度任务 {task_id} 给 {agent.name}) result await agent.execute(context) task_results[task_id] result if result[status] failed: self.logger.error(f任务 {task_id} 执行失败工作流终止。) break # 3. 汇总最终结果 final_output task_results.get(tasks[-1][id]) if tasks else None self.logger.info(f工作流执行完毕。最终状态: {final_output[status] if final_output else 无任务}) return final_output实操心得在初期任务规划器plan_tasks可以用简单的规则来实现。但当任务复杂度增加时最好的方法是引入一个专门的“规划Agent”它本身也是一个LLM驱动的智能体负责理解用户目标并生成任务分解图。这样整个系统就形成了“规划-执行”的两层架构灵活性会大大增强。3.4 运行与测试你的多智能体系统最后我们需要一个主程序来将一切串联起来。# main.py import asyncio import logging from llm_client import get_llm_client # 假设的LLM客户端工厂函数 from agents.research_agent import ResearchAgent from agents.writer_agent import WriterAgent from shogun.simple_shogun import SimpleShogun logging.basicConfig(levellogging.INFO) async def main(): # 1. 初始化LLM客户端这里以OpenAI为例 llm_client get_llm_client(provideropenai, modelgpt-4) # 2. 初始化各个智能体 research_agent ResearchAgent(llm_clientllm_client) writer_agent WriterAgent(llm_clientllm_client) # 3. 初始化Shogun将军 shogun SimpleShogun(research_agentresearch_agent, writer_agentwriter_agent) # 4. 定义用户目标并执行 user_goal 请生成一份关于2024年人工智能趋势的市场报告摘要 final_result await shogun.execute_workflow(user_goal) # 5. 输出结果 if final_result and final_result[status] success: print( 市场报告生成成功 ) print(final_result[data][article]) else: print(工作流执行失败。) if __name__ __main__: asyncio.run(main())运行这个程序你将看到Shogun如何依次调度研究Agent和写作Agent最终生成一份报告。通过日志你可以清晰地追踪到每个步骤的状态这正是“可控”与“可见”价值的体现。4. 进阶实战实现一个具备自我反思与修正能力的评审Agent基础的多智能体协作是线性的任务A - 任务B - 任务C。但在真实场景中我们常常需要对中间结果进行质量检查不合格则需要打回重做。这就需要在工作流中引入“循环”和“条件判断”。让我们扩展系统增加一个“评审Agent”ReviewerAgent使其具备让工作流“自我修正”的能力。4.1 设计评审逻辑与工作流循环评审Agent的核心职责是评估上游任务产出的质量并给出“通过”、“需修改”或“失败”的判定。如果“需修改”它还需要生成具体的修改意见然后让上游Agent重新执行。我们需要修改Shogun的工作流引擎以支持这种带循环的流程。一种常见的模式是“执行-评审-修正”循环。# agents/reviewer_agent.py class ReviewerAgent: name ReviewerAgent def describe_capability(self): return { name: self.name, description: 评审文本内容的质量检查事实准确性、逻辑连贯性、语法错误等。, input_schema: { type: object, properties: { content_to_review: {type: string}, review_criteria: {type: string} # 评审标准如“检查是否有数据错误” } } } async def execute(self, task_context, history): content task_context[content_to_review] criteria task_context.get(review_criteria, 通用质量标准) # 调用LLM进行评审 prompt f你是一个严格的质检员。请根据以下标准评审这段内容 评审标准{criteria} 待评审内容{content} 请给出你的评审结果格式必须是JSON {{ verdict: pass | needs_revision | fail, score: 0-100的整数, feedback: 详细的评审反馈如果verdict是needs_revision请明确指出需要修改的部分和建议。 }} llm_response await self.llm_client.generate(prompt) # 这里需要解析LLM返回的JSON实际代码中应包含健壮的JSON解析和错误处理 review_result json.loads(llm_response) return { status: success, data: review_result, message: 评审完成 }4.2 在Shogun中集成循环与条件逻辑现在我们需要增强Shogun使其能处理评审结果并决定是继续前进、返回修改还是彻底失败。# shogun/advanced_shogun.py (部分代码) class AdvancedShogun(SimpleShogun): async def execute_task_with_review(self, task_def, upstream_result, max_retries3): 执行一个任务并附带评审循环 agent self.agents[task_def[agent_type]] task_id task_def[id] for attempt in range(max_retries): # 执行主任务 execution_result await agent.execute(upstream_result) if execution_result[status] ! success: return {status: failed, attempt: attempt, error: execution_result[message]} # 如果有评审环节 if task_def.get(needs_review, False): review_task task_def[review_task] # 评审任务定义 review_agent self.agents[review_task[agent_type]] # 构造评审上下文 review_context { content_to_review: execution_result[data], review_criteria: review_task.get(criteria, 通用质量标准) } review_result await review_agent.execute(review_context) if review_result[data][verdict] pass: # 评审通过返回结果 return { status: success, data: execution_result[data], review_score: review_result[data][score], attempts: attempt 1 } elif review_result[data][verdict] needs_revision: # 需要修改将反馈注入下一轮循环 self.logger.info(f任务 {task_id} 第{attempt1}次尝试未通过评审反馈: {review_result[data][feedback]}) upstream_result {**upstream_result, feedback: review_result[data][feedback]} continue # 进入下一轮重试循环 else: # verdict fail return {status: failed, attempt: attempt, reason: 评审不通过, feedback: review_result[data][feedback]} else: # 无需评审直接返回 return {status: success, data: execution_result[data], attempts: 1} # 重试次数用尽 return {status: failed, reason: 达到最大重试次数仍未通过评审}关键设计最大重试次数必须设置一个上限如3次防止因为评审标准过于严苛或Agent逻辑问题导致无限循环。反馈注入将评审Agent的反馈feedback作为新的输入参数传递给执行Agent指导其进行修改。这要求执行Agent能够理解并处理feedback字段。状态持久化在循环中每一次尝试的结果和评审意见都应该被记录到状态存储中这对于调试和后期分析至关重要。4.3 效果评估与循环控制策略引入评审循环后系统的输出质量通常会显著提升但代价是增加延迟和计算成本更多的LLM调用。你需要制定策略来平衡质量和效率。分层评审不是所有任务都需要评审。只为最关键、最容易出错的环节如最终报告生成、代码输出设置评审。对于数据搜集等前置任务可以降低评审频率或标准。动态评审阈值评审Agent的打分score可以用来实现动态控制。例如第一次执行后如果评分高于90分则直接通过低于70分则触发修改介于70-90分之间则根据任务紧急程度决定是否重试。并行评审与投票对于极其重要的任务可以引入多个评审Agent进行“同行评议”采用投票机制决定是否通过以提高评审的客观性。踩坑提醒实现评审循环时最容易出现的问题是“死循环”。除了设置最大重试次数外一定要确保评审反馈是具体、可操作的。模糊的反馈如“写得更好一点”会让执行Agent无所适从导致每次修改都无法满足要求。在实践中可以训练评审Agent提供结构化反馈例如“第一段的数据引用需要更新为2023年的数据”、“结论部分需要增加与竞争对手的对比”。5. 生产环境部署考量与性能优化当你的多智能体系统从Demo走向生产环境会面临一系列新的挑战并发请求、Agent故障、LLM API的速率限制和成本控制等。multi-agent-shogun的集中式架构为应对这些挑战提供了良好的基础。5.1 高可用与弹性伸缩设计Shogun的高可用作为系统的单点Shogun必须是无状态的并且可以水平扩展。这意味着所有的状态任务、Agent状态必须存储在外部共享数据库如PostgreSQL或缓存如Redis Cluster中。多个Shogun实例可以同时运行通过负载均衡器如Nginx分发用户请求。它们从共享状态存储中读取和更新任务信息通过分布式锁如Redis Redlock来保证对同一任务的调度不会冲突。Agent的弹性伸缩每个Agent应该被部署为独立的微服务。使用容器化技术Docker打包每个Agent及其依赖。使用Kubernetes或Docker Swarm等编排工具来管理Agent的部署和伸缩。当任务队列积压时可以自动增加某个类型Agent的副本数当空闲时则可以缩减以节省资源。Agent服务通过健康检查接口向Shogun或服务注册中心如Consul报告其状态。Shogun只会将任务分发给健康的Agent实例。5.2 消息队列与异步通信的选型在原型阶段你可能使用内存队列或简单的Redis列表。但在生产环境需要一个健壮的消息队列来保证消息不丢失、支持复杂的路由模式。RabbitMQ功能强大支持多种消息模式工作队列、发布/订阅等消息确认机制完善社区成熟。适合对消息可靠性要求极高的场景。Apache Kafka高吞吐量、持久化日志适合数据流处理场景。如果你的多智能体系统需要将所有的任务事件、中间结果都作为日志流保存下来供后续分析Kafka是很好的选择。Redis Streams如果系统已经重度使用Redis并且消息量不是极端巨大Redis Streams是一个轻量级且高效的选择。它提供了消费者组等高级功能足以满足大多数多Agent系统的需求。选择建议对于大多数应用从Redis Streams开始是最快最省事的。当消息量剧增或需要更复杂的路由时再迁移到RabbitMQ。5.3 监控、日志与可观测性体系“可控”与“可见”离不开完善的监控。你需要监控三个层面系统层面CPU、内存、网络IO。使用Prometheus Grafana来收集和展示指标。应用层面Shogun监控任务队列长度、任务平均处理时间、任务成功率/失败率。每个Agent监控调用次数、平均响应时间、LLM API调用失败率、Token消耗量。在每个关键函数中埋点记录耗时和结果状态。这些日志需要结构化JSON格式并统一发送到日志聚合系统如ELK Stack或Loki。业务层面定义关键业务指标。例如对于写作流水线可以定义“文章平均生成时间”、“一次通过评审率”、“人工介入率”等。这些指标能直接反映系统的业务价值。一个简单的实现是为Shogun和每个Agent添加一个Telemetry类自动记录每次执行的元数据。# common/telemetry.py import time import json from contextlib import contextmanager from typing import Dict, Any class Telemetry: def __init__(self, logger, agent_name): self.logger logger self.agent_name agent_name contextmanager def track_execution(self, task_id: str, extra: Dict[str, Any] None): 跟踪一次任务执行的上下文管理器 start_time time.time() status started try: yield status success except Exception as e: status failed raise e finally: duration time.time() - start_time log_entry { timestamp: time.time(), agent: self.agent_name, task_id: task_id, status: status, duration_seconds: round(duration, 3), **(extra or {}) } self.logger.info(json.dumps(log_entry)) # 结构化日志在Agent的execute方法中使用这个跟踪器async def execute(self, task_context, history): with self.telemetry.track_execution(task_context.get(task_id), {input_topic: task_context.get(topic)}): # ... 原有的执行逻辑这样每一条执行记录都包含了丰富的上下文信息便于后续的问题排查和性能分析。6. 典型问题排查与调试技巧实录即使设计得再完善在实际运行中也会遇到各种问题。以下是我在开发和运维这类系统时遇到的一些典型问题及解决方法。6.1 Agent执行超时或无响应现象Shogun将任务分配给某个Agent后长时间收不到回复导致整个工作流卡住。排查思路检查Agent健康状态首先确认Agent的微服务容器是否正在运行其健康检查接口如/health是否返回正常。检查网络与依赖Agent能否正常访问它所依赖的外部服务例如LLM API端点、数据库、或其他微服务。在Agent的日志中查找连接超时或拒绝连接的报错。审查任务负载是否某个任务过于复杂导致LLM生成时间极长可以在Agent的execute方法入口和出口打上时间戳记录实际处理耗时。对于长任务应考虑实现异步处理或支持进度上报。设置超时与重试在Shogun调用Agent时必须设置网络超时如HTTP请求超时和业务超时如等待任务完成的超时。超时后Shogun应能将任务标记为失败或重新分配给另一个Agent实例。# Shogun侧的任务调用示例使用httpx import httpx from asyncio import timeout async def dispatch_to_agent(agent_url, task_payload): try: async with timeout(30): # 总超时30秒 async with httpx.AsyncClient(timeouthttpx.Timeout(10.0)) as client: # 连接超时10秒 response await client.post(agent_url, jsontask_payload) response.raise_for_status() return response.json() except (httpx.TimeoutException, asyncio.TimeoutError): # 记录超时触发重试或失败处理 return {status: failed, message: Agent响应超时} except Exception as e: # 处理其他异常 return {status: failed, message: f调用Agent失败: {str(e)}}6.2 任务依赖死锁现象工作流停滞日志显示任务都在等待其他任务完成形成循环等待。原因任务规划器可能是LLM生成了错误的任务依赖图例如任务A依赖任务B的结果同时任务B又依赖任务A的结果。解决方案依赖图校验在Shogun接受任务规划结果后增加一个校验环节检查任务依赖关系是否为有向无环图DAG。可以使用图论算法如拓扑排序进行检测如果发现环则拒绝执行并报错。规划Agent的提示词工程给负责规划的LLM更明确的指令强调“必须生成一个无循环的、线性的或分阶段的任务列表”。可以提供正确和错误的示例进行Few-shot学习。超时解锁与人工干预为每个任务设置一个“最大等待依赖时间”。超时后系统可以自动将该任务标记为“依赖失败”并通知管理员进行人工检查和解锁。6.3 LLM API成本与速率限制失控现象系统运行一段时间后LLM API调用费用激增或开始频繁收到速率限制429错误。管控策略实施配额管理在Shogun或一个专门的“配额管理Agent”中为每个用户、每个任务类型或每个Agent设置Token消耗或调用次数的每日/每月配额。达到配额后新的任务请求会被拒绝或排队。请求队列与限流将所有对LLM的调用都通过一个中央的“LLM网关”进行。这个网关负责缓存对相同的提示词请求直接返回缓存结果。限流根据LLM服务商的速率限制如RPM, TPM实现令牌桶或漏桶算法平滑请求流量。降级当主要模型如GPT-4达到限额或响应缓慢时自动降级到更便宜的模型如GPT-3.5-Turbo。详细审计日志记录每一次LLM调用的时间、模型、提示词可脱敏、消耗的Token数和成本。这不仅能用于计费更是分析和优化提示词、减少不必要开销的依据。6.4 智能体“幻觉”导致任务质量不稳定现象Agent尤其是LLM驱动的有时会产生不符合事实或逻辑的“幻觉”输出导致下游任务失败或产出低质量结果。缓解措施提示词约束与模板化尽可能使用严格的输出模板要求LLM以指定格式如JSON、XML返回数据。这能大大降低其“自由发挥”导致解析失败的概率。事实核查Agent在关键信息生成环节后插入一个专门的事实核查Agent。这个Agent可以利用搜索引擎API、知识库检索RAG来验证生成内容中的关键事实和数据。集成不确定性评估要求LLM在输出答案的同时给出一个置信度分数。Shogun可以根据这个分数来决定是否接受该结果或者触发二次验证、人工审核流程。多智能体投票对于非常重要的问题可以同时让多个同类型的Agent独立处理然后通过一个“投票Agent”或简单的规则如多数决来汇总最终结果。这能在一定程度上抵消单个Agent的幻觉风险。调试这类系统一个强大的可视化工具至关重要。可以考虑开发一个简单的Web仪表盘实时展示任务依赖图、每个节点的状态颜色表示成功/失败/运行中、日志流以及关键指标。这能让你一眼看清系统卡在哪里快速定位问题根源。