作者逆境不可逃技术永无止境希望我的内容可以帮助到你大家吼 ! 我是 逆境不可逃 今天给大家带来文章《【与我学 ClaudeCode】协作篇 之 Worktree Task Isolation 目录隔离的并行执行通道》.Learn-Claude-Code 官方地址 :shareAI-lab/learn-claude-code: Bash is all you need - A nano claude code–like 「agent harness」, built from 0 to 1上一篇文章【与我学 ClaudeCode】协作篇 之 Autonomous Agents 自组织任务认领与空闲治理-CSDN博客Worktree Task Isolation 是迭代的第 12 个版本s12核心解决多 Agent 并行修改同一文件导致的冲突问题。它在 s11 自组织任务认领的基础上引入了 Git Worktree 实现目录级隔离构建了「共享任务板 隔离执行通道」的双层架构让每个任务都在独立目录中执行彻底避免编辑冲突和 Git 状态污染。学习路线s01 s02 s03 s04 s05 s06| s07s08 s09 s10 s11 s12一、问题根源为什么共享目录撑不起并行任务到 s11 版本Agent 已经能自主认领和完成任务但所有任务共享一个工作目录两个 Agent 同时重构不同模块A 修改config.pyB 也修改config.py未提交的改动互相污染任务回滚困难修改混在一起无法干净撤销单个任务的变更并行执行的 Git 状态混乱无法区分不同任务的提交和变更真正的并行任务需要每个任务有独立的执行目录修改互不干扰任务与目录强绑定状态可追踪、可恢复全局可见性与局部隔离性兼顾既知道谁在做什么又避免冲突二、六大核心设计决策Worktree Task Isolation 通过六个关键设计构建了一个简单、可靠、可恢复的并行任务执行系统。1. 共享任务板 隔离执行通道核心设计任务板继续集中在.tasks/而文件改动发生在按任务划分的 worktree 目录中。这样既保留了全局可见性谁在做什么、完成到哪又避免所有人同时写同一目录导致的冲突。协调层简单一个任务板执行层安全多条隔离通道。替代方案的致命缺陷单个共享工作区实现更简单但会导致编辑冲突和混乱的 Git 状态每个任务完全独立的存储目录可以避免冲突但会失去团队级可见性让规划变得更困难。2. 显式 worktree 生命周期索引核心设计.worktrees/index.json记录每个 worktree 的名称、路径、分支、task_id与状态。即使上下文压缩或进程重启这些生命周期状态仍可检查和恢复。它也为list/status/remove提供了确定性的本地数据源。替代方案的致命缺陷仅依赖git worktree list可以维护本地记录但会丢失任务绑定元数据和自定义生命周期状态仅在内存中保存所有状态代码更简单但会破坏可恢复性。3. 按通道cwd路由 禁止重入核心设计命令通过worktree_run(name, command)使用cwd参数路由到 worktree 目录。重入保护避免了在已激活的 worktree 上下文中意外二次进入保持生命周期归属清晰。替代方案的致命缺陷全局cwd变量修改容易实现但会在并行工作中泄漏上下文允许静默重入会让生命周期归属变得模糊并使清理行为复杂化。4. 追加式生命周期事件流核心设计生命周期事件写入.worktrees/events.jsonl如worktree.create.*、worktree.remove.*、task.completed。这样状态迁移可查询、可追踪失败也会以*.failed显式暴露而不是静默丢失。替代方案的致命缺陷仅依赖控制台日志更轻量但在长时间会话中很脆弱且难以审计完整的事件总线基础设施功能强大但对于这个教学基线来说过于笨重。5. 任务与工作区一起收尾核心设计worktree_remove(..., complete_taskTrue)允许在一个动作里完成收尾删除隔离目录并把绑定任务标记为completed。收尾保持为显式工具驱动迁移worktree_keep/worktree_remove而不是隐藏的自动清理。这样可减少状态悬挂任务已完成但临时工作区仍活跃或反过来。替代方案的致命缺陷完全手动收尾提供了灵活性但会增加操作偏差每次完成时自动删除工作区存在在最终审查前误删工作区的风险。6. 事件流是观测旁路不是状态机替身核心设计生命周期事件提升可审计性但真实状态源仍是任务 / 工作区状态文件。事件更适合做迁移轨迹而不是替代主状态机。替代方案的致命缺陷仅使用日志会隐藏结构化的状态迁移仅使用事件作为状态源在重放 / 修复语义未定义时容易出现状态漂移。三、系统整体架构与工作原理1. 双层架构控制平面 执行平面Control plane (.tasks/) Execution plane (.worktrees/) ------------------ ------------------------ | task_1.json | | auth-refactor/ | | status: in_progress ------ branch: wt/auth-refactor | worktree: auth-refactor | task_id: 1 | ------------------ ------------------------ | task_2.json | | ui-login/ | | status: pending ------ branch: wt/ui-login | worktree: ui-login | task_id: 2 | ------------------ ------------------------ | index.json (worktree registry) events.jsonl (lifecycle log)控制平面.tasks/目录下的 JSON 文件记录任务的目标、状态、owner 和绑定的 worktree 名称执行平面.worktrees/目录下的独立 Git 工作区每个 worktree 对应一个任务拥有独立的分支和目录2. 关键组件与实现细节(1) TaskManager共享任务板class TaskManager: def __init__(self, tasks_dir: Path): self.dir tasks_dir self.dir.mkdir(parentsTrue, exist_okTrue) self._next_id self._max_id() 1 def create(self, subject: str, description: str ) - str: 创建新任务状态为 pending task { id: self._next_id, subject: subject, description: description, status: pending, owner: , worktree: , blockedBy: [], created_at: time.time(), updated_at: time.time(), } self._save(task) self._next_id 1 return json.dumps(task, indent2) def bind_worktree(self, task_id: int, worktree: str, owner: str ) - str: 绑定任务到 worktree状态推进为 in_progress task self._load(task_id) task[worktree] worktree if owner: task[owner] owner if task[status] pending: task[status] in_progress task[updated_at] time.time() self._save(task) return json.dumps(task, indent2)(2) WorktreeManager隔离执行通道管理class WorktreeManager: def __init__(self, repo_root: Path, tasks: TaskManager, events: EventBus): self.repo_root repo_root self.tasks tasks self.events events self.dir repo_root / .worktrees self.dir.mkdir(parentsTrue, exist_okTrue) self.index_path self.dir / index.json if not self.index_path.exists(): self.index_path.write_text(json.dumps({worktrees: []}, indent2)) self.git_available self._is_git_repo() def create(self, name: str, task_id: int None, base_ref: str HEAD) - str: 创建 worktree 并绑定任务 self._validate_name(name) if self._find(name): raise ValueError(fWorktree {name} already exists in index) if task_id is not None and not self.tasks.exists(task_id): raise ValueError(fTask {task_id} not found) path self.dir / name branch fwt/{name} # 发出创建前事件 self.events.emit(worktree.create.before, task{id: task_id}, worktree{name: name, base_ref: base_ref}) try: # 创建 Git worktree self._run_git([worktree, add, -b, branch, str(path), base_ref]) # 更新索引 entry { name: name, path: str(path), branch: branch, task_id: task_id, status: active, created_at: time.time(), } idx self._load_index() idx[worktrees].append(entry) self._save_index(idx) # 绑定任务 if task_id is not None: self.tasks.bind_worktree(task_id, name) # 发出创建后事件 self.events.emit(worktree.create.after, task{id: task_id}, worktreeentry) return json.dumps(entry, indent2) except Exception as e: self.events.emit(worktree.create.failed, task{id: task_id}, worktree{name: name, base_ref: base_ref}, errorstr(e)) raise def run(self, name: str, command: str) - str: 在指定 worktree 中执行命令自动路由 cwd wt self._find(name) if not wt: return fError: Unknown worktree {name} path Path(wt[path]) if not path.exists(): return fError: Worktree path missing: {path} try: # 命令在 worktree 目录中执行不影响其他任务 r subprocess.run( command, shellTrue, cwdpath, capture_outputTrue, textTrue, timeout300, ) out (r.stdout r.stderr).strip() return out[:50000] if out else (no output) except subprocess.TimeoutExpired: return Error: Timeout (300s) def remove(self, name: str, force: bool False, complete_task: bool False) - str: 删除 worktree可选择同时完成绑定任务 wt self._find(name) if not wt: return fError: Unknown worktree {name} self.events.emit(worktree.remove.before, task{id: wt.get(task_id)}, worktree{name: name, path: wt.get(path)}) try: # 删除 Git worktree args [worktree, remove] if force: args.append(--force) args.append(wt[path]) self._run_git(args) # 完成绑定任务 if complete_task and wt.get(task_id) is not None: task_id wt[task_id] before json.loads(self.tasks.get(task_id)) self.tasks.update(task_id, statuscompleted) self.tasks.unbind_worktree(task_id) self.events.emit(task.completed, task{id: task_id, subject: before.get(subject, ), status: completed}, worktree{name: name}) # 更新索引状态 idx self._load_index() for item in idx.get(worktrees, []): if item.get(name) name: item[status] removed item[removed_at] time.time() self._save_index(idx) self.events.emit(worktree.remove.after, task{id: wt.get(task_id)}, worktree{name: name, path: wt.get(path), status: removed}) return fRemoved worktree {name} except Exception as e: self.events.emit(worktree.remove.failed, task{id: wt.get(task_id)}, worktree{name: name, path: wt.get(path)}, errorstr(e)) raise(3) EventBus生命周期事件流class EventBus: def __init__(self, event_log_path: Path): self.path event_log_path self.path.parent.mkdir(parentsTrue, exist_okTrue) if not self.path.exists(): self.path.write_text() def emit(self, event: str, task: dict | None None, worktree: dict | None None, error: str | None None): 追加写入事件流记录状态迁移 payload { event: event, ts: time.time(), task: task or {}, worktree: worktree or {}, } if error: payload[error] error with self.path.open(a, encodingutf-8) as f: f.write(json.dumps(payload) \n) def list_recent(self, limit: int 20) - str: 读取最近的事件用于审计和调试 n max(1, min(int(limit or 20), 200)) lines self.path.read_text(encodingutf-8).splitlines() recent lines[-n:] items [] for line in recent: try: items.append(json.loads(line)) except Exception: items.append({event: parse_error, raw: line}) return json.dumps(items, indent2)(4) 新增工具集TOOL_HANDLERS { # 基础工具 bash: lambda **kw: run_bash(kw[command]), read_file: lambda **kw: run_read(kw[path], kw.get(limit)), write_file: lambda **kw: run_write(kw[path], kw[content]), edit_file: lambda **kw: run_edit(kw[path], kw[old_text], kw[new_text]), # 任务管理工具 task_create: lambda **kw: TASKS.create(kw[subject], kw.get(description, )), task_list: lambda **kw: TASKS.list_all(), task_get: lambda **kw: TASKS.get(kw[task_id]), task_update: lambda **kw: TASKS.update(kw[task_id], kw.get(status), kw.get(owner)), task_bind_worktree: lambda **kw: TASKS.bind_worktree(kw[task_id], kw[worktree], kw.get(owner, )), # Worktree 管理工具 worktree_create: lambda **kw: WORKTREES.create(kw[name], kw.get(task_id), kw.get(base_ref, HEAD)), worktree_list: lambda **kw: WORKTREES.list_all(), worktree_status: lambda **kw: WORKTREES.status(kw[name]), worktree_run: lambda **kw: WORKTREES.run(kw[name], kw[command]), worktree_keep: lambda **kw: WORKTREES.keep(kw[name]), worktree_remove: lambda **kw: WORKTREES.remove(kw[name], kw.get(force, False), kw.get(complete_task, False)), worktree_events: lambda **kw: EVENTS.list_recent(kw.get(limit, 20)), }(5) 执行流程四、与 Autonomous Agentss11的关键变更对比组件之前s11 Autonomous Agents之后s12 Worktree Task Isolation协调机制任务板owner/status任务板 worktree 显式绑定执行范围共享目录每个任务独立目录Git Worktree可恢复性仅任务状态任务状态 worktree 索引 事件流收尾流程任务完成隐式任务完成 显式keep/remove操作生命周期可见性隐式日志.worktrees/events.jsonl显式事件流冲突防护无共享目录易冲突目录级隔离修改互不干扰五、核心优势与创新点目录级隔离彻底避免冲突每个任务在独立的 Git Worktree 中执行修改互不干扰解决了并行任务的文件编辑冲突问题任务与目录强绑定状态可追踪通过task_id关联任务和 worktree任务状态与目录状态同步崩溃后可通过.tasks/和.worktrees/index.json重建现场可恢复的生命周期管理worktree 索引和事件流记录了完整的状态迁移过程进程重启后可恢复所有任务和 worktree 状态显式收尾流程避免状态悬挂worktree_remove可同时删除目录并完成任务worktree_keep可保留目录供后续使用避免任务完成但目录未清理或反之的状态不一致问题可审计的事件流追加式事件流记录了所有 worktree 和任务的生命周期事件便于调试和审计失败场景六、运行示例并行任务隔离执行流程创建任务领导调用task_create(Implement auth refactor)创建任务#1状态为pending创建 worktree 并绑定调用worktree_create(auth-refactor, task_id1)创建 Git Worktree 目录任务状态推进为in_progressworktree 索引记录状态为active在隔离目录中执行命令调用worktree_run(auth-refactor, python auth.py)命令在.worktrees/auth-refactor目录中执行不影响其他任务并行执行其他任务同时创建任务#2UI 登录优化绑定 worktreeui-login两个任务在各自目录中并行执行修改互不干扰任务收尾任务#1完成后调用worktree_remove(auth-refactor, complete_taskTrue)删除 worktree 目录任务状态更新为completed同时发出task.completed和worktree.remove.after事件七、可扩展方向Worktree 权限控制为不同角色的 Agent 分配不同的 worktree 访问权限实现更细粒度的安全控制Worktree 分支管理支持为 worktree 指定不同的 Git 分支实现基于分支的任务隔离任务依赖与 worktree 复用支持任务完成后保留 worktree供后续依赖任务复用减少重复创建开销事件流告警机制为特定事件如worktree.create.failed添加告警实时通知系统异常多仓库支持扩展 worktree 管理支持多个 Git 仓库的任务隔离执行学习路线s01 s02 s03 s04 s05 s06| s07s08s09 s10s11 s12