# Worker CLI Primitives Design **Status**: Draft (v3 - Nim implementation) **Bead**: skills-sse **Epic**: skills-s6y (Multi-agent orchestration: Lego brick architecture) **Language**: Nim (cligen, ORC) ## Changelog - **v3**: Nim implementation (cligen, tiny_sqlite, osproc) - **v2**: Fixed BLOCKERs from orch spec-review (approve command, conflict flow, source of truth, races, STALE semantics) ## Overview This document defines the CLI commands for multi-agent worker coordination. The CLI is the interface between human orchestrators, AI agents, and the underlying coordination infrastructure (SQLite bus, git worktrees, state machine). ## Design Decisions (from orch consensus) ### Strong Consensus (4/4 models) | Decision | Choice | Rationale | |----------|--------|-----------| | Spawn semantics | Prepare workspace only | Separation of concerns; debug before execute | | Implementation | **Nim CLI** | Single binary, fast startup, compiled | | CLI framework | **cligen** | Auto-generates from proc signatures | | Idempotency | All commands idempotent | Crash recovery, safe retries | | Worker ID = Task ID | Yes, 1:1 mapping | Simplifies mental model; add attempt IDs later if needed | | State source of truth | SQLite only | All state reads/writes via DB; JSON files are derived caches | | STALE detection | Computed for display | STALE is not a persistent state; computed from heartbeat age | ### Split Decision: Agent Discovery | Approach | Advocates | Trade-off | |----------|-----------|-----------| | Local file in worktree | Gemini, GPT | `cd worktree && cmd` just works; no args needed | | Explicit CLI arg | Qwen, Sonar | More explicit; no "magic" context | **Decision**: Hybrid - local file for context, but allow explicit `--task` override. ### Split Decision: Heartbeat Mechanism | Approach | Advocates | Trade-off | |----------|-----------|-----------| | Wrapper/supervisor | Gemini, GPT | Reliable; agent doesn't need to remember | | Explicit call | Qwen, Sonar | Simple; observable; agent controls timing | **Decision**: Start with explicit call; add wrapper as optional enhancement. ## Command Structure ``` worker [options] ``` ### Orchestrator Commands (Human Runs) | Command | Purpose | State Transition | |---------|---------|------------------| | `worker spawn ` | Create workspace | → ASSIGNED | | `worker status` | Dashboard of all workers | (read-only) | | `worker approve ` | Approve completed work | IN_REVIEW → APPROVED | | `worker request-changes ` | Request revisions | IN_REVIEW → WORKING | | `worker merge ` | Merge approved work | APPROVED → COMPLETED | | `worker cancel ` | Abort and cleanup (state only) | * → FAILED | ### Agent Commands (Worker Runs) | Command | Purpose | State Transition | |---------|---------|------------------| | `worker start` | Begin work | ASSIGNED → WORKING | | `worker done` | Complete work (includes rebase) | WORKING → IN_REVIEW | | `worker heartbeat` | Liveness signal | (no transition) | | `worker fail ` | Signal failure | WORKING → FAILED | ## Command Specifications ### `worker spawn ` Create a new worker workspace for a task. ```bash worker spawn skills-abc [--description "Fix auth bug"] [--from integration] ``` **Actions**: 1. Check task doesn't already exist (or return existing if idempotent) 2. Create branch `feat/` from `--from` (default: `origin/integration`) 3. Create worktree at `worktrees//` 4. Write context file `worktrees//.worker-ctx.json` 5. Insert `task_assign` message into bus.db 6. Create state file `.worker-state/workers/.json` **Context File** (`.worker-ctx.json`): ```json { "task_id": "skills-abc", "branch": "feat/skills-abc", "worktree": "worktrees/skills-abc", "created_at": "2026-01-10T15:00:00Z", "description": "Fix auth bug" } ``` **State File** (`.worker-state/workers/.json`): ```json { "task_id": "skills-abc", "state": "ASSIGNED", "branch": "feat/skills-abc", "assigned_at": "2026-01-10T15:00:00Z", "state_changed_at": "2026-01-10T15:00:00Z" } ``` **Idempotency**: If task exists and worktree exists, return success with existing path. **Output**: ``` Created worker: skills-abc Branch: feat/skills-abc Worktree: worktrees/skills-abc State: ASSIGNED ``` ### `worker status` Show dashboard of all workers. ```bash worker status [--state working] [--stale] [--json] ``` **Output**: ``` TASK STATE BRANCH LAST HEARTBEAT AGE skills-abc WORKING feat/skills-abc 2m ago 45m skills-xyz IN_REVIEW feat/skills-xyz -- 2h skills-123 STALE feat/skills-123 15m ago 3h ``` **Stale Detection**: Worker is STALE if: - State is ASSIGNED/WORKING and no heartbeat in 5 minutes - State is IN_REVIEW and no activity in 1 hour ### `worker start` Agent signals it's beginning work. Run from inside worktree. ```bash cd worktrees/skills-abc worker start ``` **Actions**: 1. Read task_id from `.worker-ctx.json` (or `--task` arg) 2. Verify current state is ASSIGNED 3. Insert `state_change` message (ASSIGNED → WORKING) 4. Update state file 5. Record first heartbeat **Idempotency**: If already WORKING, log warning and succeed. ### `worker done` Agent signals work is complete. Includes mandatory rebase. ```bash cd worktrees/skills-abc worker done [--skip-rebase] ``` **Actions**: 1. Read task_id from context 2. Verify current state is WORKING or CONFLICTED (for retry) 3. If not `--skip-rebase`: - Run `git fetch origin integration` - Run `git rebase origin/integration` - If conflict: **DO NOT ABORT** - leave rebase in progress, set state to CONFLICTED, exit with error 4. If `--skip-rebase`: verify no rebase in progress and branch is up-to-date 5. Run `git push -u origin feat/` 6. Insert `review_request` message 7. Transition to IN_REVIEW via `transition()` with compare-and-set **Rebase Failure** (rebase left in progress): ``` ERROR: Rebase conflict detected Conflicting files: src/auth.py State: CONFLICTED To resolve: 1. Fix conflicts in worktree 2. git add 3. git rebase --continue 4. worker done --skip-rebase ``` **State Guards**: - From WORKING: proceed with rebase - From CONFLICTED with `--skip-rebase`: proceed (assumes conflicts resolved) - From CONFLICTED without `--skip-rebase`: attempt rebase again ### `worker heartbeat` Emit liveness signal. Call periodically (every 10-30s). ```bash cd worktrees/skills-abc worker heartbeat [--status working] [--progress 0.5] ``` **Actions**: 1. Read task_id from context 2. Insert `heartbeat` message with timestamp 3. Update `last_heartbeat` in state file **Idempotency**: Always succeeds; updates timestamp. ### `worker approve ` Human/review-gate approves completed work. ```bash worker approve skills-abc [--by reviewer] [--comment "LGTM"] ``` **Actions**: 1. Verify state is IN_REVIEW 2. Transition to APPROVED via `transition()` 3. Insert `review_approved` message with reviewer and comment **Idempotency**: If already APPROVED, log and succeed. ### `worker request-changes ` Human/review-gate requests revisions. ```bash worker request-changes skills-abc [--comment "Fix error handling"] ``` **Actions**: 1. Verify state is IN_REVIEW 2. Transition to WORKING via `transition()` 3. Insert `changes_requested` message with comment **Idempotency**: If already WORKING, log and succeed. ### `worker merge ` Merge approved work to integration branch. ```bash worker merge skills-abc [--delete-branch] ``` **Actions** (with retry loop for contention): 1. Verify state is APPROVED 2. `git fetch origin integration feat/` 3. `git checkout integration && git reset --hard origin/integration` 4. Merge `feat/` with `--no-ff` 5. Push integration - If push fails (remote changed): go to step 2 (max 3 retries) - If merge conflict: transition APPROVED → WORKING, exit with error 6. Transition to COMPLETED via `transition()` 7. If `--delete-branch`: delete remote and local branch 8. Remove worktree via `git worktree remove` **Idempotency**: If already COMPLETED, log and succeed. **Merge Conflict** (rare - only if integration moved significantly): ``` ERROR: Merge conflict during integration State: WORKING (returned for revision) To resolve: 1. Rebase in worktree: git rebase origin/integration 2. worker done 3. Get re-approval ``` ### `worker kill ` Abort a worker and cleanup. ```bash worker kill skills-abc [--reason "Scope changed"] ``` **Actions**: 1. Insert `task_failed` message with reason 2. Update state to FAILED 3. Optionally remove worktree (with `--cleanup`) 4. Optionally archive branch (with `--archive`) ## State Transitions via CLI ``` spawn │ ▼ ┌──────────┐ ┌──────────┐ start ┌─────────┐ │ (none) │ ───► │ ASSIGNED │ ─────────────► │ WORKING │ └──────────┘ └──────────┘ └────┬────┘ │ ┌────────────────────────────┤ │ │ │ done (rebase ok) │ done (conflict) ▼ ▼ ┌───────────┐ ┌────────────┐ │ IN_REVIEW │ │ CONFLICTED │ └─────┬─────┘ └────────────┘ │ │ approve │ │ fix + done (external) │ │ ▼ │ ┌──────────┐ │ │ APPROVED │ ◄─────────────────────┘ └────┬─────┘ │ │ merge ▼ ┌───────────┐ │ COMPLETED │ └───────────┘ ``` ## Directory Structure ``` project/ ├── .worker-state/ │ ├── bus.db # SQLite message bus │ ├── bus.jsonl # Debug export │ └── workers/ │ ├── skills-abc.json # State file │ └── skills-xyz.json ├── worktrees/ # Git worktrees (gitignored) │ ├── skills-abc/ │ │ ├── .worker-ctx.json # Context for this worker │ │ └── (project files) │ └── skills-xyz/ └── .git/ ``` ## Implementation ### Nim Project Structure ``` src/ ├── libs/sqlite3.c # SQLite amalgamation (static linking) ├── worker.nim # CLI entry point (cligen dispatch) ├── worker/ │ ├── db.nim # SQLite operations (tiny_sqlite) │ ├── git.nim # Git/worktree operations (osproc) │ ├── state.nim # State machine logic │ ├── context.nim # Worker context handling │ ├── heartbeat.nim # Dedicated heartbeat thread │ └── types.nim # Shared types and constants ├── config.nims # Build configuration └── worker.nimble # Package definition ``` ### CLI Entry Point (cligen) ```nim import cligen import worker/[db, git, state, context, heartbeat] proc spawn(taskId: string, description = "", fromBranch = "origin/integration") = ## Create a new worker workspace let ctx = createWorker(taskId, description, fromBranch) echo "Created worker: ", taskId echo " Branch: ", ctx.branch echo " Worktree: ", ctx.worktree echo " State: ASSIGNED" proc status(state = "", stale = false, json = false, watch = false) = ## Show worker dashboard let workers = getAllWorkers() if json: echo toJson(workers) else: printStatusTable(workers, stale) proc start(task = "") = ## Signal ASSIGNED → WORKING (run from worktree) let taskId = if task != "": task else: readContext().taskId transition(taskId, "ASSIGNED", "WORKING") echo "Started work on ", taskId proc done(skipRebase = false) = ## Signal WORKING → IN_REVIEW (includes rebase) let ctx = readContext() if not skipRebase: let ok = rebaseOnIntegration(ctx) if not ok: transition(ctx.taskId, "WORKING", "CONFLICTED") quit("Rebase conflict - resolve and run: worker done --skip-rebase", 1) pushBranch(ctx) transition(ctx.taskId, "WORKING", "IN_REVIEW") echo "Ready for review: ", ctx.taskId proc approve(taskId: string, by = "", comment = "") = ## Approve completed work (IN_REVIEW → APPROVED) transition(taskId, "IN_REVIEW", "APPROVED") echo "Approved: ", taskId proc requestChanges(taskId: string, comment = "") = ## Request revisions (IN_REVIEW → WORKING) transition(taskId, "IN_REVIEW", "WORKING") echo "Changes requested: ", taskId proc merge(taskId: string, deleteBranch = false) = ## Merge approved work (APPROVED → COMPLETED) mergeToIntegration(taskId, deleteBranch) transition(taskId, "APPROVED", "COMPLETED") echo "Merged: ", taskId proc cancel(taskId: string, reason = "") = ## Abort a worker (* → FAILED) transitionToFailed(taskId, reason) echo "Cancelled: ", taskId when isMainModule: dispatchMulti( [spawn], [status], [start], [done], [approve], [requestChanges, cmdName = "request-changes"], [merge], [cancel] ) ``` ### State Machine Guards ```nim import std/[tables, strformat] import tiny_sqlite type WorkerState* = enum wsAssigned = "ASSIGNED" wsWorking = "WORKING" wsConflicted = "CONFLICTED" wsInReview = "IN_REVIEW" wsApproved = "APPROVED" wsCompleted = "COMPLETED" wsFailed = "FAILED" InvalidTransition* = object of CatchableError StaleState* = object of CatchableError # NOTE: STALE is NOT a persistent state - computed for display from heartbeat age const ValidTransitions* = { wsAssigned: @[wsWorking, wsFailed], wsWorking: @[wsInReview, wsConflicted, wsFailed], wsConflicted: @[wsInReview, wsWorking, wsFailed], wsInReview: @[wsApproved, wsWorking, wsFailed], wsApproved: @[wsCompleted, wsWorking, wsFailed], wsCompleted: @[], wsFailed: @[wsAssigned], # Retry }.toTable proc transition*(db: DbConn, taskId: string, fromState, toState: WorkerState) = ## Attempt state transition with compare-and-set guard if toState notin ValidTransitions[fromState]: raise newException(InvalidTransition, &"{fromState} → {toState}") db.exec("BEGIN IMMEDIATE") try: let current = db.value("SELECT state FROM workers WHERE task_id = ?", taskId).get.fromDbValue(string) if current != $fromState: raise newException(StaleState, &"Expected {fromState}, got {current}") db.exec("UPDATE workers SET state = ?, state_changed_at = ? WHERE task_id = ?", $toState, epochTime(), taskId) publishMessage(db, taskId, "state_change", %*{"from": $fromState, "to": $toState}) db.exec("COMMIT") except: db.exec("ROLLBACK") raise ``` ### Build Configuration ```nim # config.nims --mm:orc --threads:on -d:release --opt:size # Static SQLite switch("passC", "-DSQLITE_THREADSAFE=1") switch("compile", "libs/sqlite3.c") ``` ### Exit Codes ```nim const ExitSuccess* = 0 ExitUsageError* = 2 ExitInvalidTransition* = 3 ExitGitError* = 4 ExitDbError* = 5 ExitConflict* = 6 ``` ## Integration with Existing Components ### With Message Bus (skills-ms5) All state transitions publish messages: ```python db.publish_message(task_id, 'state_change', {'from': 'ASSIGNED', 'to': 'WORKING'}) ``` ### With review-gate (skills-byq) Review-gate reads worker state: ```python state = db.get_state(task_id) if state == 'IN_REVIEW': # Check review status ... ``` ### With Status Dashboard (skills-yak) Status command queries message bus: ```python workers = db.query(""" SELECT task_id, state, last_heartbeat FROM workers ORDER BY state_changed_at DESC """) ``` ## Open Questions 1. ~~**Approval flow**~~: Resolved - `worker approve` and `worker request-changes` commands added 2. **Agent wrapper**: Should `worker run ` wrap agent execution with heartbeats? 3. **Multiple attempts**: If task fails, `worker retry ` or new spawn? 4. **Integration branch**: Create on first spawn, or require it exists? ## Resolved from Spec Review | Issue | Resolution | |-------|------------| | Missing approve command | Added `worker approve` and `worker request-changes` | | Rebase conflict flow | Don't abort, leave in progress for resolution | | Source of truth conflict | SQLite only; JSON files are derived caches | | STALE as persistent state | Changed to computed for display from heartbeat age | | `done` from CONFLICTED | Now allowed with --skip-rebase | | Merge race condition | Added fetch-merge-push retry loop | | Spawn race condition | Use DB unique constraint (implementation detail) | ## References - State machine design: `docs/design/worker-state-machine.md` - Message passing: `docs/design/message-passing-layer.md` - Branch isolation: `docs/design/branch-per-worker.md`