# Worker CLI Primitives Design

**Status**: Draft (v3 - Nim implementation)
**Bead**: skills-sse
**Epic**: skills-s6y (Multi-agent orchestration: Lego brick architecture)
**Language**: Nim (cligen, ORC)

## Changelog

- **v3**: Nim implementation (cligen, tiny_sqlite, osproc)
- **v2**: Fixed BLOCKERs from orch spec-review (approve command, conflict flow, source of truth, races, STALE semantics)

## Overview

This document defines the CLI commands for multi-agent worker coordination. The CLI is the interface between human orchestrators, AI agents, and the underlying coordination infrastructure (SQLite bus, git worktrees, state machine).

## Design Decisions (from orch consensus)

### Strong Consensus (4/4 models)

| Decision | Choice | Rationale |
|----------|--------|-----------|
| Spawn semantics | Prepare workspace only | Separation of concerns; debug before execute |
| Implementation | **Nim CLI** | Single binary, fast startup, compiled |
| CLI framework | **cligen** | Auto-generates from proc signatures |
| Idempotency | All commands idempotent | Crash recovery, safe retries |
| Worker ID = Task ID | Yes, 1:1 mapping | Simplifies mental model; add attempt IDs later if needed |
| State source of truth | SQLite only | All state reads/writes via DB; JSON files are derived caches |
| STALE detection | Computed for display | STALE is not a persistent state; computed from heartbeat age |

### Split Decision: Agent Discovery

| Approach | Advocates | Trade-off |
|----------|-----------|-----------|
| Local file in worktree | Gemini, GPT | `cd worktree && cmd` just works; no args needed |
| Explicit CLI arg | Qwen, Sonar | More explicit; no "magic" context |

**Decision**: Hybrid - local file for context, but allow explicit `--task` override.
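
The hybrid lookup can be sketched as follows. This is a minimal illustration, assuming the `.worker-ctx.json` layout described under `worker spawn`; the `resolveTaskId` name and its parameters are hypothetical and not taken from the actual implementation.

```nim
import std/[json, os]

const CtxFileName = ".worker-ctx.json"  # file name from the spawn spec

proc resolveTaskId(taskArg = "", dir = getCurrentDir()): string =
  ## Hypothetical helper: an explicit --task value wins, otherwise
  ## fall back to the context file in the given directory.
  if taskArg.len > 0:
    return taskArg
  let path = dir / CtxFileName
  if not fileExists(path):
    raise newException(IOError, "no --task given and " & path & " not found")
  parseFile(path)["task_id"].getStr()
```

With this shape, `cd worktrees/skills-abc && worker start` needs no arguments, while an orchestrator outside the worktree can still pass `--task` explicitly.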

### Split Decision: Heartbeat Mechanism

| Approach | Advocates | Trade-off |
|----------|-----------|-----------|
| Wrapper/supervisor | Gemini, GPT | Reliable; agent doesn't need to remember |
| Explicit call | Qwen, Sonar | Simple; observable; agent controls timing |

**Decision**: Start with explicit call; add wrapper as optional enhancement.

## Command Structure

```
worker <command> [options]
```

### Orchestrator Commands (Human Runs)

| Command | Purpose | State Transition |
|---------|---------|------------------|
| `worker spawn <task-id>` | Create workspace | → ASSIGNED |
| `worker status` | Dashboard of all workers | (read-only) |
| `worker approve <task-id>` | Approve completed work | IN_REVIEW → APPROVED |
| `worker request-changes <task-id>` | Request revisions | IN_REVIEW → WORKING |
| `worker merge <task-id>` | Merge approved work | APPROVED → COMPLETED |
| `worker cancel <task-id>` | Abort and cleanup (state only) | * → FAILED |

### Agent Commands (Worker Runs)

| Command | Purpose | State Transition |
|---------|---------|------------------|
| `worker start` | Begin work | ASSIGNED → WORKING |
| `worker done` | Complete work (includes rebase) | WORKING/CONFLICTED → IN_REVIEW (→ CONFLICTED on rebase conflict) |
| `worker heartbeat` | Liveness signal | (no transition) |
| `worker fail <reason>` | Signal failure | WORKING → FAILED |

## Command Specifications

### `worker spawn <task-id>`

Create a new worker workspace for a task.

```bash
worker spawn skills-abc [--description "Fix auth bug"] [--from integration]
```

**Actions**:
1. If the task already exists, return the existing workspace instead of creating a new one (see Idempotency)
2. Create branch `feat/<task-id>` from `--from` (default: `origin/integration`)
3. Create worktree at `worktrees/<task-id>/`
4. Write context file `worktrees/<task-id>/.worker-ctx.json`
5. Insert `task_assign` message into bus.db
6. Write state file `.worker-state/workers/<task-id>.json` (a derived cache; SQLite remains the source of truth)

**Context File** (`.worker-ctx.json`):
```json
{
  "task_id": "skills-abc",
  "branch": "feat/skills-abc",
  "worktree": "worktrees/skills-abc",
  "created_at": "2026-01-10T15:00:00Z",
  "description": "Fix auth bug"
}
```

**State File** (`.worker-state/workers/<task-id>.json`):
```json
{
  "task_id": "skills-abc",
  "state": "ASSIGNED",
  "branch": "feat/skills-abc",
  "assigned_at": "2026-01-10T15:00:00Z",
  "state_changed_at": "2026-01-10T15:00:00Z"
}
```

**Idempotency**: If the task and its worktree already exist, return success with the existing path.
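
One way to make the context-file step safe to repeat is sketched below. It covers only step 4, under the assumption that an existing context file means the workspace was already prepared; `writeContextOnce` and its parameters are hypothetical, and in the real flow the directory would be created by `git worktree add` rather than `createDir`.

```nim
import std/[json, os, times]

proc writeContextOnce(root, taskId, description: string): string =
  ## Hypothetical helper: writes .worker-ctx.json unless it already exists
  ## and returns its path either way, so a retried spawn changes nothing.
  let worktree = "worktrees" / taskId
  let path = root / worktree / ".worker-ctx.json"
  if fileExists(path):
    return path                      # already spawned: keep the original file
  createDir(root / worktree)         # stand-in for `git worktree add`
  let ctx = %*{
    "task_id": taskId,
    "branch": "feat/" & taskId,
    "worktree": worktree,
    "created_at": now().utc.format("yyyy-MM-dd'T'HH:mm:ss'Z'"),
    "description": description
  }
  writeFile(path, ctx.pretty())
  path
```

A second call with the same task ID returns the same path and leaves `created_at` untouched, which matches the crash-recovery rationale given for idempotent commands.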

**Output**:
```
Created worker: skills-abc
  Branch: feat/skills-abc
  Worktree: worktrees/skills-abc
  State: ASSIGNED
```

### `worker status`

Show dashboard of all workers.

```bash
worker status [--state working] [--stale] [--json]
```

**Output**:
```
TASK         STATE       BRANCH             LAST HEARTBEAT   AGE
skills-abc   WORKING     feat/skills-abc    2m ago           45m
skills-xyz   IN_REVIEW   feat/skills-xyz    --               2h
skills-123   STALE       feat/skills-123    15m ago          3h
```

**Stale Detection**: STALE is computed at display time and never stored. A worker is shown as STALE if:
- State is ASSIGNED/WORKING and no heartbeat in 5 minutes
- State is IN_REVIEW and no activity in 1 hour
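
The two rules above can be expressed as a pure function over the stored state and a timestamp. This is a sketch under assumptions: `displayState` is a hypothetical name, and which timestamp counts as "activity" for IN_REVIEW is not specified here, so the caller supplies it.

```nim
import std/times

type WorkerState = enum
  wsAssigned = "ASSIGNED"
  wsWorking = "WORKING"
  wsConflicted = "CONFLICTED"
  wsInReview = "IN_REVIEW"
  wsApproved = "APPROVED"
  wsCompleted = "COMPLETED"
  wsFailed = "FAILED"

let
  heartbeatTimeout = initDuration(minutes = 5)  # ASSIGNED/WORKING threshold
  reviewTimeout = initDuration(hours = 1)       # IN_REVIEW threshold

proc displayState(state: WorkerState, lastActivity, now: Time): string =
  ## Returns "STALE" for display only; the persisted state is never changed.
  let age = now - lastActivity
  let stale =
    case state
    of wsAssigned, wsWorking: age > heartbeatTimeout
    of wsInReview: age > reviewTimeout
    else: false
  if stale: "STALE" else: $state
```

Because the function takes `now` as an argument, the thresholds can be checked without touching the clock or the database.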

### `worker start`

Agent signals it's beginning work. Run from inside worktree.

```bash
cd worktrees/skills-abc
worker start
```

**Actions**:
1. Read task_id from `.worker-ctx.json` (or `--task` arg)
2. Verify current state is ASSIGNED
3. Insert `state_change` message (ASSIGNED → WORKING)
4. Update state file
5. Record first heartbeat

**Idempotency**: If already WORKING, log warning and succeed.

### `worker done`

Agent signals work is complete. Rebases onto integration first unless `--skip-rebase` is given.

```bash
cd worktrees/skills-abc
worker done [--skip-rebase]
```

**Actions**:
1. Read task_id from context
2. Verify current state is WORKING or CONFLICTED (for retry)
3. If not `--skip-rebase`:
   - Run `git fetch origin integration`
   - Run `git rebase origin/integration`
   - If conflict: **DO NOT ABORT** - leave rebase in progress, set state to CONFLICTED, exit with error
4. If `--skip-rebase`: verify no rebase in progress and branch is up-to-date
5. Run `git push -u origin feat/<task-id>`
6. Insert `review_request` message
7. Transition to IN_REVIEW via `transition()` with compare-and-set

**Rebase Failure** (rebase left in progress):
```
ERROR: Rebase conflict detected
Conflicting files: src/auth.py
State: CONFLICTED

To resolve:
  1. Fix conflicts in worktree
  2. git add <files>
  3. git rebase --continue
  4. worker done --skip-rebase
```

**State Guards**:
- From WORKING: proceed with rebase (or with the step 4 checks if `--skip-rebase` is given)
- From CONFLICTED with `--skip-rebase`: proceed (assumes conflicts resolved)
- From CONFLICTED without `--skip-rebase`: attempt rebase again
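
The guards above reduce to a small decision table, sketched here as a pure function. The `DoneAction` type and `planDone` proc are hypothetical names used only for illustration, and the enum lists just the states needed for the example.

```nim
type
  WorkerState = enum
    wsWorking = "WORKING"
    wsConflicted = "CONFLICTED"
    wsInReview = "IN_REVIEW"

  DoneAction = enum
    daRebaseThenPush  # fetch + rebase onto origin/integration, then push
    daPushOnly        # skip rebase; caller still verifies no rebase is in progress
    daReject          # `done` is not valid from this state

proc planDone(state: WorkerState, skipRebase: bool): DoneAction =
  ## Decide what `worker done` should do before touching git.
  case state
  of wsWorking, wsConflicted:
    if skipRebase: daPushOnly else: daRebaseThenPush
  else:
    daReject
```

Separating the decision from the git calls keeps the guard logic testable without a repository.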

### `worker heartbeat`

Emit liveness signal. Call periodically (every 10-30s).

```bash
cd worktrees/skills-abc
worker heartbeat [--status working] [--progress 0.5]
```

**Actions**:
1. Read task_id from context
2. Insert `heartbeat` message with timestamp
3. Update `last_heartbeat` in state file

**Idempotency**: Always succeeds; updates timestamp.

### `worker approve <task-id>`

Human/review-gate approves completed work.

```bash
worker approve skills-abc [--by reviewer] [--comment "LGTM"]
```

**Actions**:
1. Verify state is IN_REVIEW
2. Transition to APPROVED via `transition()`
3. Insert `review_approved` message with reviewer and comment

**Idempotency**: If already APPROVED, log and succeed.

### `worker request-changes <task-id>`

Human/review-gate requests revisions.

```bash
worker request-changes skills-abc [--comment "Fix error handling"]
```

**Actions**:
1. Verify state is IN_REVIEW
2. Transition to WORKING via `transition()`
3. Insert `changes_requested` message with comment

**Idempotency**: If already WORKING, log and succeed.

### `worker merge <task-id>`

Merge approved work to integration branch.

```bash
worker merge skills-abc [--delete-branch]
```

**Actions** (with retry loop for contention):
1. Verify state is APPROVED
2. `git fetch origin integration feat/<task-id>`
3. `git checkout integration && git reset --hard origin/integration`
4. Merge `feat/<task-id>` with `--no-ff`
   - If merge conflict: transition APPROVED → WORKING, exit with error
5. Push integration
   - If push fails (remote changed): go to step 2 (max 3 retries)
6. Transition to COMPLETED via `transition()`
7. If `--delete-branch`: delete remote and local branch
8. Remove worktree via `git worktree remove`

**Idempotency**: If already COMPLETED, log and succeed.

**Merge Conflict** (rare - only if integration moved significantly):
```
ERROR: Merge conflict during integration
State: WORKING (returned for revision)

To resolve:
  1. Rebase in worktree: git rebase origin/integration
  2. worker done
  3. Get re-approval
```
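
The fetch-merge-push retry loop from the Actions list can be sketched with the git steps abstracted behind a callback. All names here (`StepResult`, `MergeOutcome`, `mergeWithRetry`) are hypothetical, and what happens once retries are exhausted is an assumption, since this section does not specify it.

```nim
type
  StepResult = enum
    srOk            # merge and push both succeeded
    srConflict      # merge conflict: do not retry
    srPushRejected  # remote integration moved: retry from fetch

  MergeOutcome = enum
    moMerged, moConflict, moGaveUp

proc mergeWithRetry(attempt: proc (): StepResult, maxRetries = 3): MergeOutcome =
  ## `attempt` stands in for steps 2-5 (fetch, reset, merge --no-ff, push).
  ## Runs the first try plus up to `maxRetries` retries.
  for _ in 0 .. maxRetries:
    case attempt()
    of srOk: return moMerged
    of srConflict: return moConflict
    of srPushRejected: continue
  moGaveUp  # assumption: report failure and leave the state as APPROVED
```

Only a rejected push loops back to the fetch; a merge conflict exits immediately so the task can be returned to WORKING.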

### `worker cancel <task-id>`

Abort a worker and mark it FAILED. By default only the state changes; workspace cleanup is opt-in.

```bash
worker cancel skills-abc [--reason "Scope changed"] [--cleanup] [--archive]
```

**Actions**:
1. Insert `task_failed` message with reason
2. Update state to FAILED
3. Optionally remove worktree (with `--cleanup`)
4. Optionally archive branch (with `--archive`)

## State Transitions via CLI

```
                     spawn
                       │
                       ▼
┌──────────┐      ┌──────────┐     start      ┌─────────┐
│  (none)  │ ───► │ ASSIGNED │ ─────────────► │ WORKING │
└──────────┘      └──────────┘                └────┬────┘
                                                   │
                       ┌───────────────────────────┤
                       │                           │
                       │ done (rebase ok)          │ done (conflict)
                       ▼                           ▼
                 ┌───────────┐  fix + done   ┌────────────┐
                 │ IN_REVIEW │ ◄──────────── │ CONFLICTED │
                 └─────┬─────┘               └────────────┘
                       │
               approve │
            (external) │
                       ▼
                 ┌──────────┐
                 │ APPROVED │
                 └─────┬────┘
                       │
                       │ merge
                       ▼
                 ┌───────────┐
                 │ COMPLETED │
                 └───────────┘
```

## Directory Structure

```
project/
├── .worker-state/
│   ├── bus.db                  # SQLite message bus
│   ├── bus.jsonl               # Debug export
│   └── workers/
│       ├── skills-abc.json     # State file
│       └── skills-xyz.json
├── worktrees/                  # Git worktrees (gitignored)
│   ├── skills-abc/
│   │   ├── .worker-ctx.json    # Context for this worker
│   │   └── (project files)
│   └── skills-xyz/
└── .git/
```

## Implementation

### Nim Project Structure

```
src/
├── libs/sqlite3.c      # SQLite amalgamation (static linking)
├── worker.nim          # CLI entry point (cligen dispatch)
├── worker/
│   ├── db.nim          # SQLite operations (tiny_sqlite)
│   ├── git.nim         # Git/worktree operations (osproc)
│   ├── state.nim       # State machine logic
│   ├── context.nim     # Worker context handling
│   ├── heartbeat.nim   # Dedicated heartbeat thread
│   └── types.nim       # Shared types and constants
├── config.nims         # Build configuration
└── worker.nimble       # Package definition
```

### CLI Entry Point (cligen)

```nim
import cligen
import worker/[db, git, state, context, heartbeat, types]

# NOTE: the transition(taskId, from, to) calls below assume a thin wrapper that
# opens bus.db and maps state names onto the guarded transition in state.nim.

proc spawn(taskId: string, description = "", fromBranch = "origin/integration") =
  ## Create a new worker workspace
  let ctx = createWorker(taskId, description, fromBranch)
  echo "Created worker: ", taskId
  echo "  Branch: ", ctx.branch
  echo "  Worktree: ", ctx.worktree
  echo "  State: ASSIGNED"

proc status(state = "", stale = false, json = false, watch = false) =
  ## Show worker dashboard
  let workers = getAllWorkers()
  if json:
    echo toJson(workers)
  else:
    printStatusTable(workers, stale)

proc start(task = "") =
  ## Signal ASSIGNED → WORKING (run from worktree)
  let taskId = if task != "": task else: readContext().taskId
  transition(taskId, "ASSIGNED", "WORKING")
  echo "Started work on ", taskId

proc done(skipRebase = false) =
  ## Signal WORKING/CONFLICTED → IN_REVIEW (rebases unless --skip-rebase)
  let ctx = readContext()
  # currentState is an assumed helper returning the state name stored in SQLite,
  # so that a retry from CONFLICTED passes the compare-and-set guard.
  let fromState = currentState(ctx.taskId)
  if not skipRebase:
    let ok = rebaseOnIntegration(ctx)
    if not ok:
      if fromState != "CONFLICTED":
        transition(ctx.taskId, fromState, "CONFLICTED")
      quit("Rebase conflict - resolve and run: worker done --skip-rebase", ExitConflict)
  pushBranch(ctx)
  transition(ctx.taskId, fromState, "IN_REVIEW")
  echo "Ready for review: ", ctx.taskId

proc approve(taskId: string, by = "", comment = "") =
  ## Approve completed work (IN_REVIEW → APPROVED)
  transition(taskId, "IN_REVIEW", "APPROVED")
  echo "Approved: ", taskId

proc requestChanges(taskId: string, comment = "") =
  ## Request revisions (IN_REVIEW → WORKING)
  transition(taskId, "IN_REVIEW", "WORKING")
  echo "Changes requested: ", taskId

proc merge(taskId: string, deleteBranch = false) =
  ## Merge approved work (APPROVED → COMPLETED)
  mergeToIntegration(taskId, deleteBranch)
  transition(taskId, "APPROVED", "COMPLETED")
  echo "Merged: ", taskId

proc cancel(taskId: string, reason = "") =
  ## Abort a worker (* → FAILED)
  transitionToFailed(taskId, reason)
  echo "Cancelled: ", taskId

when isMainModule:
  dispatchMulti(
    [spawn],
    [status],
    [start],
    [done],
    [approve],
    [requestChanges, cmdName = "request-changes"],
    [merge],
    [cancel]
  )
```

### State Machine Guards

```nim
import std/[tables, strformat, times, json, options]
import tiny_sqlite

type
  WorkerState* = enum
    wsAssigned = "ASSIGNED"
    wsWorking = "WORKING"
    wsConflicted = "CONFLICTED"
    wsInReview = "IN_REVIEW"
    wsApproved = "APPROVED"
    wsCompleted = "COMPLETED"
    wsFailed = "FAILED"

  InvalidTransition* = object of CatchableError
  StaleState* = object of CatchableError

# NOTE: STALE is NOT a persistent state - computed for display from heartbeat age
const ValidTransitions* = {
  wsAssigned: @[wsWorking, wsFailed],
  wsWorking: @[wsInReview, wsConflicted, wsFailed],
  wsConflicted: @[wsInReview, wsWorking, wsFailed],
  wsInReview: @[wsApproved, wsWorking, wsFailed],
  wsApproved: @[wsCompleted, wsWorking, wsFailed],
  wsCompleted: newSeq[WorkerState](),  # Terminal state (typed empty seq)
  wsFailed: @[wsAssigned],  # Retry
}.toTable

proc transition*(db: DbConn, taskId: string, fromState, toState: WorkerState) =
  ## Attempt state transition with compare-and-set guard
  if toState notin ValidTransitions[fromState]:
    raise newException(InvalidTransition, &"{fromState} → {toState}")

  # BEGIN IMMEDIATE takes the write lock up front, so the read below
  # cannot be invalidated by a concurrent writer before the UPDATE.
  db.exec("BEGIN IMMEDIATE")
  try:
    let current = db.value("SELECT state FROM workers WHERE task_id = ?", taskId).get.fromDbValue(string)
    if current != $fromState:
      raise newException(StaleState, &"Expected {fromState}, got {current}")

    db.exec("UPDATE workers SET state = ?, state_changed_at = ? WHERE task_id = ?",
            $toState, epochTime(), taskId)
    # publishMessage is provided by the message bus layer; it runs in the same transaction
    publishMessage(db, taskId, "state_change", %*{"from": $fromState, "to": $toState})
    db.exec("COMMIT")
  except:
    db.exec("ROLLBACK")
    raise
```

### Build Configuration

```nim
# config.nims
--mm:orc
--threads:on
--define:release
--opt:size

# Static SQLite
switch("passC", "-DSQLITE_THREADSAFE=1")
switch("compile", "libs/sqlite3.c")
```

### Exit Codes

```nim
const
  ExitSuccess* = 0
  ExitUsageError* = 2
  ExitInvalidTransition* = 3
  ExitGitError* = 4
  ExitDbError* = 5
  ExitConflict* = 6
```

## Integration with Existing Components

### With Message Bus (skills-ms5)

All state transitions publish messages:
```python
db.publish_message(task_id, 'state_change', {'from': 'ASSIGNED', 'to': 'WORKING'})
```

### With review-gate (skills-byq)

Review-gate reads worker state:
```python
state = db.get_state(task_id)
if state == 'IN_REVIEW':
    # Check review status
    ...
```

### With Status Dashboard (skills-yak)

Status command queries message bus:
```python
workers = db.query("""
    SELECT task_id, state, last_heartbeat
    FROM workers
    ORDER BY state_changed_at DESC
""")
```

## Open Questions

1. ~~**Approval flow**~~: Resolved - `worker approve` and `worker request-changes` commands added
2. **Agent wrapper**: Should `worker run <agent-cmd>` wrap agent execution with heartbeats?
3. **Multiple attempts**: If task fails, `worker retry <task-id>` or new spawn?
4. **Integration branch**: Create on first spawn, or require it exists?

## Resolved from Spec Review

| Issue | Resolution |
|-------|------------|
| Missing approve command | Added `worker approve` and `worker request-changes` |
| Rebase conflict flow | Don't abort, leave in progress for resolution |
| Source of truth conflict | SQLite only; JSON files are derived caches |
| STALE as persistent state | Changed to computed for display from heartbeat age |
| `done` from CONFLICTED | Now allowed with --skip-rebase |
| Merge race condition | Added fetch-merge-push retry loop |
| Spawn race condition | Use DB unique constraint (implementation detail) |

## References

- State machine design: `docs/design/worker-state-machine.md`
- Message passing: `docs/design/message-passing-layer.md`
- Branch isolation: `docs/design/branch-per-worker.md`