skills/docs/design/worker-cli-primitives.md
dan 1c66d019bd feat: add worker CLI scaffold in Nim
Multi-agent coordination CLI with SQLite message bus:
- State machine: ASSIGNED -> WORKING -> IN_REVIEW -> APPROVED -> COMPLETED
- Commands: spawn, start, done, approve, merge, cancel, fail, heartbeat
- SQLite WAL mode, dedicated heartbeat thread, channel-based IPC
- cligen for CLI, tiny_sqlite for DB, ORC memory management

Design docs for branch-per-worker, state machine, message passing,
and human observability patterns.

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
2026-01-10 18:47:47 -08:00

568 lines
17 KiB
Markdown

# Worker CLI Primitives Design
**Status**: Draft (v3 - Nim implementation)
**Bead**: skills-sse
**Epic**: skills-s6y (Multi-agent orchestration: Lego brick architecture)
**Language**: Nim (cligen, ORC)
## Changelog
- **v3**: Nim implementation (cligen, tiny_sqlite, osproc)
- **v2**: Fixed BLOCKERs from orch spec-review (approve command, conflict flow, source of truth, races, STALE semantics)
## Overview
This document defines the CLI commands for multi-agent worker coordination. The CLI is the interface between human orchestrators, AI agents, and the underlying coordination infrastructure (SQLite bus, git worktrees, state machine).
## Design Decisions (from orch consensus)
### Strong Consensus (4/4 models)
| Decision | Choice | Rationale |
|----------|--------|-----------|
| Spawn semantics | Prepare workspace only | Separation of concerns; debug before execute |
| Implementation | **Nim CLI** | Single binary, fast startup, compiled |
| CLI framework | **cligen** | Auto-generates from proc signatures |
| Idempotency | All commands idempotent | Crash recovery, safe retries |
| Worker ID = Task ID | Yes, 1:1 mapping | Simplifies mental model; add attempt IDs later if needed |
| State source of truth | SQLite only | All state reads/writes via DB; JSON files are derived caches |
| STALE detection | Computed for display | STALE is not a persistent state; computed from heartbeat age |
### Split Decision: Agent Discovery
| Approach | Advocates | Trade-off |
|----------|-----------|-----------|
| Local file in worktree | Gemini, GPT | `cd worktree && cmd` just works; no args needed |
| Explicit CLI arg | Qwen, Sonar | More explicit; no "magic" context |
**Decision**: Hybrid - local file for context, but allow explicit `--task` override.
### Split Decision: Heartbeat Mechanism
| Approach | Advocates | Trade-off |
|----------|-----------|-----------|
| Wrapper/supervisor | Gemini, GPT | Reliable; agent doesn't need to remember |
| Explicit call | Qwen, Sonar | Simple; observable; agent controls timing |
**Decision**: Start with explicit call; add wrapper as optional enhancement.
## Command Structure
```
worker <command> [options]
```
### Orchestrator Commands (Human Runs)
| Command | Purpose | State Transition |
|---------|---------|------------------|
| `worker spawn <task-id>` | Create workspace | → ASSIGNED |
| `worker status` | Dashboard of all workers | (read-only) |
| `worker approve <task-id>` | Approve completed work | IN_REVIEW → APPROVED |
| `worker request-changes <task-id>` | Request revisions | IN_REVIEW → WORKING |
| `worker merge <task-id>` | Merge approved work | APPROVED → COMPLETED |
| `worker cancel <task-id>` | Abort and cleanup (state only) | * → FAILED |
### Agent Commands (Worker Runs)
| Command | Purpose | State Transition |
|---------|---------|------------------|
| `worker start` | Begin work | ASSIGNED → WORKING |
| `worker done` | Complete work (includes rebase) | WORKING → IN_REVIEW |
| `worker heartbeat` | Liveness signal | (no transition) |
| `worker fail <reason>` | Signal failure | WORKING → FAILED |
## Command Specifications
### `worker spawn <task-id>`
Create a new worker workspace for a task.
```bash
worker spawn skills-abc [--description "Fix auth bug"] [--from integration]
```
**Actions**:
1. Check task doesn't already exist (or return existing if idempotent)
2. Create branch `feat/<task-id>` from `--from` (default: `origin/integration`)
3. Create worktree at `worktrees/<task-id>/`
4. Write context file `worktrees/<task-id>/.worker-ctx.json`
5. Insert `task_assign` message into bus.db
6. Create state file `.worker-state/workers/<task-id>.json`
**Context File** (`.worker-ctx.json`):
```json
{
"task_id": "skills-abc",
"branch": "feat/skills-abc",
"worktree": "worktrees/skills-abc",
"created_at": "2026-01-10T15:00:00Z",
"description": "Fix auth bug"
}
```
**State File** (`.worker-state/workers/<task-id>.json`):
```json
{
"task_id": "skills-abc",
"state": "ASSIGNED",
"branch": "feat/skills-abc",
"assigned_at": "2026-01-10T15:00:00Z",
"state_changed_at": "2026-01-10T15:00:00Z"
}
```
**Idempotency**: If task exists and worktree exists, return success with existing path.
**Output**:
```
Created worker: skills-abc
Branch: feat/skills-abc
Worktree: worktrees/skills-abc
State: ASSIGNED
```
### `worker status`
Show dashboard of all workers.
```bash
worker status [--state working] [--stale] [--json]
```
**Output**:
```
TASK STATE BRANCH LAST HEARTBEAT AGE
skills-abc WORKING feat/skills-abc 2m ago 45m
skills-xyz IN_REVIEW feat/skills-xyz -- 2h
skills-123 STALE feat/skills-123 15m ago 3h
```
**Stale Detection**: Worker is STALE if:
- State is ASSIGNED/WORKING and no heartbeat in 5 minutes
- State is IN_REVIEW and no activity in 1 hour
### `worker start`
Agent signals it's beginning work. Run from inside worktree.
```bash
cd worktrees/skills-abc
worker start
```
**Actions**:
1. Read task_id from `.worker-ctx.json` (or `--task` arg)
2. Verify current state is ASSIGNED
3. Insert `state_change` message (ASSIGNED → WORKING)
4. Update state file
5. Record first heartbeat
**Idempotency**: If already WORKING, log warning and succeed.
### `worker done`
Agent signals work is complete. Includes mandatory rebase.
```bash
cd worktrees/skills-abc
worker done [--skip-rebase]
```
**Actions**:
1. Read task_id from context
2. Verify current state is WORKING or CONFLICTED (for retry)
3. If not `--skip-rebase`:
- Run `git fetch origin integration`
- Run `git rebase origin/integration`
- If conflict: **DO NOT ABORT** - leave rebase in progress, set state to CONFLICTED, exit with error
4. If `--skip-rebase`: verify no rebase in progress and branch is up-to-date
5. Run `git push -u origin feat/<task-id>`
6. Insert `review_request` message
7. Transition to IN_REVIEW via `transition()` with compare-and-set
**Rebase Failure** (rebase left in progress):
```
ERROR: Rebase conflict detected
Conflicting files: src/auth.py
State: CONFLICTED
To resolve:
1. Fix conflicts in worktree
2. git add <files>
3. git rebase --continue
4. worker done --skip-rebase
```
**State Guards**:
- From WORKING: proceed with rebase
- From CONFLICTED with `--skip-rebase`: proceed (assumes conflicts resolved)
- From CONFLICTED without `--skip-rebase`: attempt rebase again
### `worker heartbeat`
Emit liveness signal. Call periodically (every 10-30s).
```bash
cd worktrees/skills-abc
worker heartbeat [--status working] [--progress 0.5]
```
**Actions**:
1. Read task_id from context
2. Insert `heartbeat` message with timestamp
3. Update `last_heartbeat` in state file
**Idempotency**: Always succeeds; updates timestamp.
### `worker approve <task-id>`
Human/review-gate approves completed work.
```bash
worker approve skills-abc [--by reviewer] [--comment "LGTM"]
```
**Actions**:
1. Verify state is IN_REVIEW
2. Transition to APPROVED via `transition()`
3. Insert `review_approved` message with reviewer and comment
**Idempotency**: If already APPROVED, log and succeed.
### `worker request-changes <task-id>`
Human/review-gate requests revisions.
```bash
worker request-changes skills-abc [--comment "Fix error handling"]
```
**Actions**:
1. Verify state is IN_REVIEW
2. Transition to WORKING via `transition()`
3. Insert `changes_requested` message with comment
**Idempotency**: If already WORKING, log and succeed.
### `worker merge <task-id>`
Merge approved work to integration branch.
```bash
worker merge skills-abc [--delete-branch]
```
**Actions** (with retry loop for contention):
1. Verify state is APPROVED
2. `git fetch origin integration feat/<task-id>`
3. `git checkout integration && git reset --hard origin/integration`
4. Merge `feat/<task-id>` with `--no-ff`
5. Push integration
- If push fails (remote changed): go to step 2 (max 3 retries)
- If merge conflict: transition APPROVED → WORKING, exit with error
6. Transition to COMPLETED via `transition()`
7. If `--delete-branch`: delete remote and local branch
8. Remove worktree via `git worktree remove`
**Idempotency**: If already COMPLETED, log and succeed.
**Merge Conflict** (rare - only if integration moved significantly):
```
ERROR: Merge conflict during integration
State: WORKING (returned for revision)
To resolve:
1. Rebase in worktree: git rebase origin/integration
2. worker done
3. Get re-approval
```
### `worker kill <task-id>`
Abort a worker and cleanup.
```bash
worker kill skills-abc [--reason "Scope changed"]
```
**Actions**:
1. Insert `task_failed` message with reason
2. Update state to FAILED
3. Optionally remove worktree (with `--cleanup`)
4. Optionally archive branch (with `--archive`)
## State Transitions via CLI
```
spawn
┌──────────┐ ┌──────────┐ start ┌─────────┐
│ (none) │ ───► │ ASSIGNED │ ─────────────► │ WORKING │
└──────────┘ └──────────┘ └────┬────┘
┌────────────────────────────┤
│ │
│ done (rebase ok) │ done (conflict)
▼ ▼
┌───────────┐ ┌────────────┐
│ IN_REVIEW │ │ CONFLICTED │
└─────┬─────┘ └────────────┘
│ │
approve │ │ fix + done
(external) │ │
▼ │
┌──────────┐ │
│ APPROVED │ ◄─────────────────────┘
└────┬─────┘
│ merge
┌───────────┐
│ COMPLETED │
└───────────┘
```
## Directory Structure
```
project/
├── .worker-state/
│ ├── bus.db # SQLite message bus
│ ├── bus.jsonl # Debug export
│ └── workers/
│ ├── skills-abc.json # State file
│ └── skills-xyz.json
├── worktrees/ # Git worktrees (gitignored)
│ ├── skills-abc/
│ │ ├── .worker-ctx.json # Context for this worker
│ │ └── (project files)
│ └── skills-xyz/
└── .git/
```
## Implementation
### Nim Project Structure
```
src/
├── libs/sqlite3.c # SQLite amalgamation (static linking)
├── worker.nim # CLI entry point (cligen dispatch)
├── worker/
│ ├── db.nim # SQLite operations (tiny_sqlite)
│ ├── git.nim # Git/worktree operations (osproc)
│ ├── state.nim # State machine logic
│ ├── context.nim # Worker context handling
│ ├── heartbeat.nim # Dedicated heartbeat thread
│ └── types.nim # Shared types and constants
├── config.nims # Build configuration
└── worker.nimble # Package definition
```
### CLI Entry Point (cligen)
```nim
import cligen
import worker/[db, git, state, context, heartbeat]
proc spawn(taskId: string, description = "", fromBranch = "origin/integration") =
## Create a new worker workspace
let ctx = createWorker(taskId, description, fromBranch)
echo "Created worker: ", taskId
echo " Branch: ", ctx.branch
echo " Worktree: ", ctx.worktree
echo " State: ASSIGNED"
proc status(state = "", stale = false, json = false, watch = false) =
## Show worker dashboard
let workers = getAllWorkers()
if json:
echo toJson(workers)
else:
printStatusTable(workers, stale)
proc start(task = "") =
## Signal ASSIGNED → WORKING (run from worktree)
let taskId = if task != "": task else: readContext().taskId
transition(taskId, "ASSIGNED", "WORKING")
echo "Started work on ", taskId
proc done(skipRebase = false) =
## Signal WORKING → IN_REVIEW (includes rebase)
let ctx = readContext()
if not skipRebase:
let ok = rebaseOnIntegration(ctx)
if not ok:
transition(ctx.taskId, "WORKING", "CONFLICTED")
quit("Rebase conflict - resolve and run: worker done --skip-rebase", 1)
pushBranch(ctx)
transition(ctx.taskId, "WORKING", "IN_REVIEW")
echo "Ready for review: ", ctx.taskId
proc approve(taskId: string, by = "", comment = "") =
## Approve completed work (IN_REVIEW → APPROVED)
transition(taskId, "IN_REVIEW", "APPROVED")
echo "Approved: ", taskId
proc requestChanges(taskId: string, comment = "") =
## Request revisions (IN_REVIEW → WORKING)
transition(taskId, "IN_REVIEW", "WORKING")
echo "Changes requested: ", taskId
proc merge(taskId: string, deleteBranch = false) =
## Merge approved work (APPROVED → COMPLETED)
mergeToIntegration(taskId, deleteBranch)
transition(taskId, "APPROVED", "COMPLETED")
echo "Merged: ", taskId
proc cancel(taskId: string, reason = "") =
## Abort a worker (* → FAILED)
transitionToFailed(taskId, reason)
echo "Cancelled: ", taskId
when isMainModule:
dispatchMulti(
[spawn],
[status],
[start],
[done],
[approve],
[requestChanges, cmdName = "request-changes"],
[merge],
[cancel]
)
```
### State Machine Guards
```nim
import std/[tables, strformat]
import tiny_sqlite
type
WorkerState* = enum
wsAssigned = "ASSIGNED"
wsWorking = "WORKING"
wsConflicted = "CONFLICTED"
wsInReview = "IN_REVIEW"
wsApproved = "APPROVED"
wsCompleted = "COMPLETED"
wsFailed = "FAILED"
InvalidTransition* = object of CatchableError
StaleState* = object of CatchableError
# NOTE: STALE is NOT a persistent state - computed for display from heartbeat age
const ValidTransitions* = {
wsAssigned: @[wsWorking, wsFailed],
wsWorking: @[wsInReview, wsConflicted, wsFailed],
wsConflicted: @[wsInReview, wsWorking, wsFailed],
wsInReview: @[wsApproved, wsWorking, wsFailed],
wsApproved: @[wsCompleted, wsWorking, wsFailed],
wsCompleted: @[],
wsFailed: @[wsAssigned], # Retry
}.toTable
proc transition*(db: DbConn, taskId: string, fromState, toState: WorkerState) =
## Attempt state transition with compare-and-set guard
if toState notin ValidTransitions[fromState]:
raise newException(InvalidTransition, &"{fromState} → {toState}")
db.exec("BEGIN IMMEDIATE")
try:
let current = db.value("SELECT state FROM workers WHERE task_id = ?", taskId).get.fromDbValue(string)
if current != $fromState:
raise newException(StaleState, &"Expected {fromState}, got {current}")
db.exec("UPDATE workers SET state = ?, state_changed_at = ? WHERE task_id = ?",
$toState, epochTime(), taskId)
publishMessage(db, taskId, "state_change", %*{"from": $fromState, "to": $toState})
db.exec("COMMIT")
except:
db.exec("ROLLBACK")
raise
```
### Build Configuration
```nim
# config.nims
--mm:orc
--threads:on
-d:release
--opt:size
# Static SQLite
switch("passC", "-DSQLITE_THREADSAFE=1")
switch("compile", "libs/sqlite3.c")
```
### Exit Codes
```nim
const
ExitSuccess* = 0
ExitUsageError* = 2
ExitInvalidTransition* = 3
ExitGitError* = 4
ExitDbError* = 5
ExitConflict* = 6
```
## Integration with Existing Components
### With Message Bus (skills-ms5)
All state transitions publish messages:
```python
db.publish_message(task_id, 'state_change', {'from': 'ASSIGNED', 'to': 'WORKING'})
```
### With review-gate (skills-byq)
Review-gate reads worker state:
```python
state = db.get_state(task_id)
if state == 'IN_REVIEW':
# Check review status
...
```
### With Status Dashboard (skills-yak)
Status command queries message bus:
```python
workers = db.query("""
SELECT task_id, state, last_heartbeat
FROM workers
ORDER BY state_changed_at DESC
""")
```
## Open Questions
1. ~~**Approval flow**~~: Resolved - `worker approve` and `worker request-changes` commands added
2. **Agent wrapper**: Should `worker run <agent-cmd>` wrap agent execution with heartbeats?
3. **Multiple attempts**: If task fails, `worker retry <task-id>` or new spawn?
4. **Integration branch**: Create on first spawn, or require it exists?
## Resolved from Spec Review
| Issue | Resolution |
|-------|------------|
| Missing approve command | Added `worker approve` and `worker request-changes` |
| Rebase conflict flow | Don't abort, leave in progress for resolution |
| Source of truth conflict | SQLite only; JSON files are derived caches |
| STALE as persistent state | Changed to computed for display from heartbeat age |
| `done` from CONFLICTED | Now allowed with --skip-rebase |
| Merge race condition | Added fetch-merge-push retry loop |
| Spawn race condition | Use DB unique constraint (implementation detail) |
## References
- State machine design: `docs/design/worker-state-machine.md`
- Message passing: `docs/design/message-passing-layer.md`
- Branch isolation: `docs/design/branch-per-worker.md`