并行执行概述

PinableAgents 的并行任务执行引擎允许将多个独立或半独立的任务同时分发到 AI 后端执行,从而大幅缩短整体完成时间。与简单的并发请求不同,PinableAgents 的并行引擎是依赖感知的——它理解任务之间的先后关系,只有当一个任务的所有前置依赖都完成后,才会启动该任务的执行。

并行执行的核心数据结构是有向无环图(DAG)。每个任务是图中的一个节点,任务之间的依赖关系用有向边表示。引擎通过拓扑排序确定执行顺序,然后按照拓扑层级将同一层级的任务并行分发。这种设计在保证正确性的同时最大化了并行度。

并行执行在以下场景中特别有效:为多个独立文件编写单元测试、对多个模块同时进行代码审查、在多个后端上执行同一任务以比较结果、以及大型重构中的多文件并行修改。据内部测量,对于 10 个独立文件的测试编写任务,并行执行(3 个 worker)相比串行执行可以节省约 65% 的总时间。

并行执行不是银弹。只有当任务之间真正独立(没有共享状态或文件冲突)时,并行才能带来收益。盲目并行化有依赖关系的任务可能导致冲突和错误。

任务定义格式

并行任务使用 JSON 格式定义。每个任务包含唯一标识、描述、依赖列表、目标文件和可选的后端指定。以下是一个完整的任务定义示例:

{
  "parallel_tasks": {
    "max_workers": 3,
    "timeout": "30m",
    "fail_strategy": "fail_fast",
    "tasks": [
      {
        "id": "test_user",
        "task": "为 user.go 编写完整的单元测试",
        "files": ["src/user.go"],
        "depends_on": [],
        "backend": "claude",
        "priority": 1
      },
      {
        "id": "test_order",
        "task": "为 order.go 编写完整的单元测试",
        "files": ["src/order.go"],
        "depends_on": [],
        "backend": "codex",
        "priority": 1
      },
      {
        "id": "test_payment",
        "task": "为 payment.go 编写完整的单元测试",
        "files": ["src/payment.go"],
        "depends_on": [],
        "backend": "gemini",
        "priority": 1
      },
      {
        "id": "test_integration",
        "task": "编写 user、order、payment 三个模块的集成测试",
        "files": ["src/user.go", "src/order.go", "src/payment.go"],
        "depends_on": ["test_user", "test_order", "test_payment"],
        "backend": "claude",
        "priority": 2
      }
    ]
  }
}

关键字段说明:id 是任务的唯一标识符,用于依赖引用;depends_on 列出该任务依赖的其他任务 ID,空数组表示无依赖(可以立即执行);priority 用于在同一拓扑层级中决定执行顺序(数字越小优先级越高);fail_strategy 控制失败行为,fail_fast 表示任一任务失败则立即终止所有任务,continue 表示忽略失败继续执行其余任务。

DAG 依赖模型

任务之间的依赖关系构成一个有向无环图(DAG)。引擎在启动执行前会进行 DAG 验证,确保图中不存在环形依赖。如果检测到环形依赖,引擎会报错并指出构成环的任务列表。

以上述示例为基础,DAG 结构可以用文本可视化如下:

Layer 0 (并行执行):
  [test_user] ----+
  [test_order] ---+---> [test_integration]
  [test_payment] -+

Layer 1 (等待 Layer 0 全部完成):
  [test_integration]

DAG 可视化:
  test_user --------\
                     \
  test_order ---------+---> test_integration
                     /
  test_payment -----/

执行时间线:
  t=0s   开始 test_user, test_order, test_payment (并行)
  t=7s   test_order 完成
  t=8s   test_user 完成
  t=9s   test_payment 完成
  t=9s   开始 test_integration (所有依赖已满足)
  t=18s  test_integration 完成
  总时间: 18s (串行需要约 36s)

拓扑排序算法

PinableAgents 使用 Kahn 算法(基于入度的广度优先拓扑排序)来确定任务执行顺序。该算法的选择基于两个原因:第一,它能自然地将任务分层,同一层的任务可以并行执行;第二,它能在排序过程中检测环形依赖。

算法的核心步骤如下:首先计算每个节点的入度(即依赖数量)。将所有入度为零的节点(无依赖任务)放入执行队列,构成第一层。执行队列中的任务并行执行。当一个任务完成后,将其所有后继节点的入度减一。如果某个后继节点的入度变为零,将其加入下一层的执行队列。重复此过程直到所有任务完成。

// scheduler.go - 拓扑排序与并行调度
func (s *Scheduler) buildExecutionPlan(tasks []Task) ([][]Task, error) {
    // 计算入度
    inDegree := make(map[string]int)
    graph := make(map[string][]string)
    for _, t := range tasks {
        inDegree[t.ID] = len(t.DependsOn)
        for _, dep := range t.DependsOn {
            graph[dep] = append(graph[dep], t.ID)
        }
    }

    // 按层级分组
    var layers [][]Task
    taskMap := make(map[string]Task)
    for _, t := range tasks {
        taskMap[t.ID] = t
    }

    for {
        var layer []Task
        for _, t := range tasks {
            if inDegree[t.ID] == 0 {
                layer = append(layer, t)
                inDegree[t.ID] = -1 // 标记已处理
            }
        }
        if len(layer) == 0 {
            break
        }
        // 更新后继节点入度
        for _, t := range layer {
            for _, next := range graph[t.ID] {
                inDegree[next]--
            }
        }
        layers = append(layers, layer)
    }

    // 检测环形依赖
    for id, deg := range inDegree {
        if deg > 0 {
            return nil, fmt.Errorf("circular dependency detected involving task: %s", id)
        }
    }
    return layers, nil
}

并发控制与资源管理

并行执行需要精细的并发控制以避免资源耗尽。PinableAgents 通过以下机制管理并发:

Worker 池

max_workers 参数限定同时执行的最大任务数量。即使同一层有 10 个任务,如果 max_workers 设为 3,也只会同时执行 3 个,其余在队列中等待。Worker 数量的设置应考虑 API 速率限制和系统资源。

速率限制器

针对每个 AI 后端独立维护速率限制状态。如果某个后端返回 429(速率限制)错误,调度器会自动暂停向该后端的请求发送,等待限制窗口过去后再恢复。这不会影响其他后端的任务执行。

内存管理

每个 worker 的内存使用独立计量。当总内存使用接近系统限制时,调度器会暂停启动新任务,等待现有任务完成释放内存后再继续。默认内存上限为系统可用内存的 70%。

# 配置并发控制参数
pinable-agents run parallel --config tasks.json \
  --max-workers 4 \
  --rate-limit-per-backend 10 \
  --memory-limit 4GB \
  --timeout 30m

错误传播机制

并行执行中的错误处理比串行执行复杂得多,因为需要处理多个任务同时失败的情况,以及失败任务对后续依赖任务的影响。PinableAgents 提供两种错误处理策略:

fail_fast(快速失败):当任何一个任务失败时,立即取消所有正在执行和等待执行的任务。这适用于任务间高度相关的场景,一个失败意味着整体结果无意义。引擎会发送取消信号给所有正在运行的 worker,等待它们优雅退出后汇总错误报告。

continue(继续执行):当一个任务失败时,标记该任务为失败,跳过所有直接或间接依赖于该任务的后续任务,但继续执行所有不受影响的任务。最终报告中会列出成功、失败和跳过的任务。这适用于任务相对独立的场景。

// 错误传播示例
// 假设 DAG: A -> C, B -> C, B -> D

// fail_fast 模式:
// A 失败 -> 立即取消 B(如果还在运行), 跳过 C, D
// 结果: A=失败, B=取消, C=跳过, D=跳过

// continue 模式:
// A 失败 -> B 继续执行
// B 成功 -> C 跳过(因为依赖 A 已失败), D 正常执行
// 结果: A=失败, B=成功, C=跳过(依赖失败), D=成功

在 continue 模式下,被跳过的任务不会计入失败计数。最终报告会明确区分"执行失败"和"依赖未满足而跳过"两种状态,帮助你快速定位根本原因。

Worktree 隔离集成

当多个并行任务需要修改文件时,如果它们操作同一个工作目录,就可能产生文件冲突。PinableAgents 通过 Git worktree 机制解决这个问题。每个并行任务可以在独立的 worktree 中执行,完成后将变更合并回主分支。

Worktree 隔离的工作原理如下:启动并行执行时,引擎为每个任务创建一个独立的 Git worktree(基于当前分支的相同 commit)。每个任务在自己的 worktree 中执行文件修改,互不影响。所有任务完成后,引擎按照拓扑顺序将各 worktree 的变更依次合并回主分支。如果合并时出现冲突,引擎会暂停并提示用户手动解决。

# 启用 worktree 隔离的并行执行
pinable-agents run parallel --config tasks.json --worktree

# 引擎内部执行的 Git 操作
git worktree add .worktrees/task_test_user HEAD
git worktree add .worktrees/task_test_order HEAD
git worktree add .worktrees/task_test_payment HEAD

# 任务完成后的合并流程
cd .worktrees/task_test_user && git add -A && git commit -m "task: test_user"
cd .worktrees/task_test_order && git add -A && git commit -m "task: test_order"
cd .worktrees/task_test_payment && git add -A && git commit -m "task: test_payment"

# 合并回主分支
git merge task_test_user task_test_order task_test_payment --no-ff

# 清理 worktrees
git worktree remove .worktrees/task_test_user
git worktree remove .worktrees/task_test_order
git worktree remove .worktrees/task_test_payment

跨后端并行执行

并行执行引擎的一个强大特性是支持跨后端并行。你可以将不同的任务分配给不同的 AI 后端同时执行。例如,将擅长代码生成的 Codex 用于单元测试编写,将擅长分析的 Claude 用于代码审查,将擅长快速响应的 Gemini 用于文档生成。

跨后端并行的优势不仅在于速度。由于不同后端有独立的速率限制配额,跨后端并行可以突破单一后端的速率瓶颈。例如,如果 Codex 每分钟限制 60 次请求,Claude 每分钟限制 40 次请求,那么跨两个后端并行可以实现每分钟 100 次的有效请求吞吐量。

执行监控与进度追踪

并行任务的执行状态可以通过多种方式监控。CLI 提供实时的进度输出,桌面端提供可视化的 DAG 执行状态图。

# CLI 实时监控输出示例
$ pinable-agents run parallel --config tasks.json --progress

[00:00] Starting parallel execution (4 tasks, 3 workers)
[00:00] [Layer 0] Starting 3 tasks in parallel
[00:00] [test_user]    RUNNING  (claude)  ████░░░░░░  40%
[00:00] [test_order]   RUNNING  (codex)   ██████░░░░  60%
[00:00] [test_payment] RUNNING  (gemini)  ███░░░░░░░  30%
[00:07] [test_order]   DONE     (codex)   ██████████ 100%  7.2s
[00:08] [test_user]    DONE     (claude)  ██████████ 100%  8.1s
[00:09] [test_payment] DONE     (gemini)  ██████████ 100%  9.4s
[00:09] [Layer 1] Starting 1 task
[00:09] [test_integration] RUNNING (claude) ░░░░░░░░░░  0%
[00:18] [test_integration] DONE    (claude) ██████████ 100%  8.7s

Summary: 4/4 tasks completed, 0 failed
Total time: 18.2s (serial estimate: 33.4s, speedup: 1.84x)

性能调优指南

并行执行的性能受多个因素影响。以下表格总结了关键调优参数及其推荐值:

参数 默认值 推荐范围 说明
max_workers 3 2-6 同时执行的最大任务数,受 API 速率限制约束
timeout 10m 5m-30m 单个任务超时时间,按复杂度分级设置
retry_count 2 1-3 可重试错误的最大重试次数
retry_backoff 1s 1s-5s 重试间隔基数(指数退避)
memory_limit 70% 50%-80% 允许使用的系统内存上限百分比

max_workers 设置:worker 数量不是越多越好。过多的 worker 会触发 API 速率限制,反而降低效率。建议的起始值是 3,然后根据实际的速率限制情况逐步调整。如果使用多个后端,每个后端可以独立配置 worker 数量。

任务粒度:任务不宜过大也不宜过小。过大的任务(如"为整个项目添加测试")无法并行化;过小的任务(如"为某个函数添加一行注释")会产生过多的调度开销。理想的任务粒度是单个文件级别的操作。

超时设置:为每个任务设置合理的超时时间。过短的超时会导致复杂任务被误杀;过长的超时会导致失败的任务长时间占用 worker。建议根据任务复杂度设置不同的超时值,而非使用统一的全局超时。

重试策略:对于可重试的错误(网络超时、速率限制),建议设置 1-2 次重试。每次重试之间的间隔应采用指数退避策略(1 秒、2 秒、4 秒)。不要设置过多的重试次数,否则可能导致任务长时间阻塞 worker。

完整示例:并行化测试套件

以下是一个完整的实际示例,展示如何使用 PinableAgents 的并行执行引擎为一个 Go 项目的多个模块并行编写单元测试。

# 步骤 1:创建任务定义文件 parallel-tests.json
cat << 'EOF' > parallel-tests.json
{
  "parallel_tasks": {
    "max_workers": 3,
    "timeout": "30m",
    "fail_strategy": "continue",
    "worktree": true,
    "tasks": [
      {
        "id": "test_auth",
        "task": "为 auth 模块编写单元测试,覆盖登录、注册、密码重置",
        "files": ["internal/auth/auth.go", "internal/auth/token.go"],
        "depends_on": [],
        "backend": "claude"
      },
      {
        "id": "test_db",
        "task": "为数据库访问层编写单元测试,使用 mock 数据库",
        "files": ["internal/db/repository.go", "internal/db/migrations.go"],
        "depends_on": [],
        "backend": "codex"
      },
      {
        "id": "test_api",
        "task": "为 REST API 处理器编写单元测试,覆盖所有端点",
        "files": ["internal/api/handlers.go", "internal/api/middleware.go"],
        "depends_on": [],
        "backend": "gemini"
      },
      {
        "id": "test_e2e",
        "task": "编写端到端集成测试,验证完整的用户注册到登录流程",
        "files": ["internal/auth/auth.go", "internal/db/repository.go", "internal/api/handlers.go"],
        "depends_on": ["test_auth", "test_db", "test_api"],
        "backend": "claude"
      }
    ]
  }
}
EOF

# 步骤 2:执行并行任务
pinable-agents run parallel --config parallel-tests.json --progress

# 步骤 3:查看执行报告
pinable-agents report parallel --session last

在这个示例中,三个模块的单元测试(test_auth、test_db、test_api)分别使用不同的 AI 后端并行执行。集成测试(test_e2e)在所有单元测试完成后才开始。通过 worktree 隔离,三个并行任务各自在独立的工作目录中生成测试文件,完成后自动合并。