Worker CLI Primitives Design
Status: Draft (v3 - Nim implementation)
Bead: skills-sse
Epic: skills-s6y (Multi-agent orchestration: Lego brick architecture)
Language: Nim (cligen, ORC)
Changelog
- v3: Nim implementation (cligen, tiny_sqlite, osproc)
- v2: Fixed BLOCKERs from orch spec-review (approve command, conflict flow, source of truth, races, STALE semantics)
Overview
This document defines the CLI commands for multi-agent worker coordination. The CLI is the interface between human orchestrators, AI agents, and the underlying coordination infrastructure (SQLite bus, git worktrees, state machine).
Design Decisions (from orch consensus)
Strong Consensus (4/4 models)
| Decision | Choice | Rationale |
|---|---|---|
| Spawn semantics | Prepare workspace only | Separation of concerns; debug before execute |
| Implementation | Nim CLI | Single binary, fast startup, compiled |
| CLI framework | cligen | Auto-generates from proc signatures |
| Idempotency | All commands idempotent | Crash recovery, safe retries |
| Worker ID = Task ID | Yes, 1:1 mapping | Simplifies mental model; add attempt IDs later if needed |
| State source of truth | SQLite only | All state reads/writes via DB; JSON files are derived caches |
| STALE detection | Computed for display | STALE is not a persistent state; computed from heartbeat age |
Split Decision: Agent Discovery
| Approach | Advocates | Trade-off |
|---|---|---|
| Local file in worktree | Gemini, GPT | cd worktree && cmd just works; no args needed |
| Explicit CLI arg | Qwen, Sonar | More explicit; no "magic" context |
Decision: Hybrid - local file for context, but allow explicit --task override.
Split Decision: Heartbeat Mechanism
| Approach | Advocates | Trade-off |
|---|---|---|
| Wrapper/supervisor | Gemini, GPT | Reliable; agent doesn't need to remember |
| Explicit call | Qwen, Sonar | Simple; observable; agent controls timing |
Decision: Start with explicit call; add wrapper as optional enhancement.
Command Structure
worker <command> [options]
Orchestrator Commands (Human Runs)
| Command | Purpose | State Transition |
|---|---|---|
| `worker spawn <task-id>` | Create workspace | → ASSIGNED |
| `worker status` | Dashboard of all workers | (read-only) |
| `worker approve <task-id>` | Approve completed work | IN_REVIEW → APPROVED |
| `worker request-changes <task-id>` | Request revisions | IN_REVIEW → WORKING |
| `worker merge <task-id>` | Merge approved work | APPROVED → COMPLETED |
| `worker cancel <task-id>` | Abort and cleanup (state only) | * → FAILED |
Agent Commands (Worker Runs)
| Command | Purpose | State Transition |
|---|---|---|
| `worker start` | Begin work | ASSIGNED → WORKING |
| `worker done` | Complete work (includes rebase) | WORKING → IN_REVIEW |
| `worker heartbeat` | Liveness signal | (no transition) |
| `worker fail <reason>` | Signal failure | WORKING → FAILED |
Command Specifications
worker spawn <task-id>
Create a new worker workspace for a task.
worker spawn skills-abc [--description "Fix auth bug"] [--from integration]
Actions:
- Check task doesn't already exist (or return existing if idempotent)
- Create branch `feat/<task-id>` from `--from` (default: `origin/integration`)
- Create worktree at `worktrees/<task-id>/`
- Write context file `worktrees/<task-id>/.worker-ctx.json`
- Insert `task_assign` message into bus.db
- Create state file `.worker-state/workers/<task-id>.json`
Context File (.worker-ctx.json):
{
  "task_id": "skills-abc",
  "branch": "feat/skills-abc",
  "worktree": "worktrees/skills-abc",
  "created_at": "2026-01-10T15:00:00Z",
  "description": "Fix auth bug"
}
State File (.worker-state/workers/<task-id>.json):
{
  "task_id": "skills-abc",
  "state": "ASSIGNED",
  "branch": "feat/skills-abc",
  "assigned_at": "2026-01-10T15:00:00Z",
  "state_changed_at": "2026-01-10T15:00:00Z"
}
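Because SQLite is the only source of truth, the state file can be regenerated from the `workers` row at any time. The following is a minimal Python sketch of that export (the tool itself is written in Nim). It assumes a hypothetical `workers` table with `task_id`, `state`, and `branch` columns. `write_state_cache` is an illustrative name, not part of the design. The write goes through a temporary file so that readers never see a half-written cache.

```python
import json
import os
import sqlite3
from pathlib import Path


def write_state_cache(db: sqlite3.Connection, task_id: str, cache_dir: Path) -> Path:
    """Regenerate <cache_dir>/<task_id>.json from the DB row (the DB stays authoritative)."""
    row = db.execute(
        "SELECT task_id, state, branch FROM workers WHERE task_id = ?", (task_id,)
    ).fetchone()
    if row is None:
        raise KeyError(f"unknown task: {task_id}")
    cache_dir.mkdir(parents=True, exist_ok=True)
    target = cache_dir / f"{task_id}.json"
    tmp = target.with_name(target.name + ".tmp")
    # Write to a sibling temp file first, then swap it into place.
    tmp.write_text(json.dumps(dict(zip(("task_id", "state", "branch"), row)), indent=2))
    os.replace(tmp, target)  # atomic when both paths are on the same filesystem
    return target
```

If the cache is ever lost or corrupted, calling this again for each task rebuilds it without touching the database.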
Idempotency: If task exists and worktree exists, return success with existing path.
Output:
Created worker: skills-abc
Branch: feat/skills-abc
Worktree: worktrees/skills-abc
State: ASSIGNED
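One way to make spawn safe against both retries and concurrent callers is to let a unique constraint on the task ID decide who created the worker. The following is a minimal Python sketch of that idea (the tool itself is written in Nim). The `workers` schema and the `register_worker` helper are assumptions for illustration.

```python
import sqlite3


def register_worker(db: sqlite3.Connection, task_id: str, branch: str) -> bool:
    """Insert the worker row once; return True only if this call created it."""
    with db:  # commits on success, rolls back on exception
        cur = db.execute(
            "INSERT OR IGNORE INTO workers (task_id, state, branch) "
            "VALUES (?, 'ASSIGNED', ?)",
            (task_id, branch),
        )
    # rowcount is 0 when the PRIMARY KEY already existed and the insert was ignored.
    return cur.rowcount == 1
```

A caller that gets `False` would skip branch and worktree creation and report the existing path instead, which matches the idempotency rule above.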
worker status
Show dashboard of all workers.
worker status [--state working] [--stale] [--json]
Output:
TASK        STATE      BRANCH           LAST HEARTBEAT  AGE
skills-abc  WORKING    feat/skills-abc  2m ago          45m
skills-xyz  IN_REVIEW  feat/skills-xyz  --              2h
skills-123  STALE      feat/skills-123  15m ago         3h
Stale Detection: Worker is STALE if:
- State is ASSIGNED/WORKING and no heartbeat in 5 minutes
- State is IN_REVIEW and no activity in 1 hour
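Since STALE is computed rather than stored, the dashboard can derive it from timestamps at read time. The following is a minimal Python sketch of the two rules above (the tool itself is written in Nim). It assumes that "activity" means the last state change and that a worker with no heartbeat yet is aged from that same timestamp. `display_state` is an illustrative name.

```python
from datetime import datetime, timedelta, timezone
from typing import Optional

HEARTBEAT_TIMEOUT = timedelta(minutes=5)  # ASSIGNED / WORKING
REVIEW_TIMEOUT = timedelta(hours=1)       # IN_REVIEW


def display_state(state: str,
                  last_heartbeat: Optional[datetime],
                  last_activity: datetime,
                  now: datetime) -> str:
    """Return the state to show in the dashboard; the result is never written back."""
    if state in ("ASSIGNED", "WORKING"):
        # Assumption: fall back to the last state change when no heartbeat exists.
        reference = last_heartbeat or last_activity
        if now - reference > HEARTBEAT_TIMEOUT:
            return "STALE"
    elif state == "IN_REVIEW":
        if now - last_activity > REVIEW_TIMEOUT:
            return "STALE"
    return state
```

Passing `now` in explicitly keeps the function deterministic, so the thresholds can be tested without waiting on a real clock.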
worker start
Agent signals it's beginning work. Run from inside worktree.
cd worktrees/skills-abc
worker start
Actions:
- Read task_id from `.worker-ctx.json` (or `--task` arg)
- Verify current state is ASSIGNED
- Insert `state_change` message (ASSIGNED → WORKING)
- Update state file
- Record first heartbeat
Idempotency: If already WORKING, log warning and succeed.
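The hybrid discovery rule (explicit `--task` first, context file second) can be sketched as below. This is illustrative Python rather than the Nim implementation. `resolve_task_id` is a hypothetical helper, and looking only in the current directory, without walking up to parent directories, is an assumption.

```python
import json
from pathlib import Path
from typing import Optional

CONTEXT_FILE = ".worker-ctx.json"


def resolve_task_id(explicit_task: Optional[str], cwd: Path) -> str:
    """Prefer the explicit --task value; otherwise read the worktree's context file."""
    if explicit_task:
        return explicit_task
    ctx_path = cwd / CONTEXT_FILE
    if not ctx_path.is_file():
        raise FileNotFoundError(
            f"no {CONTEXT_FILE} in {cwd}; pass --task or run from inside a worktree"
        )
    return json.loads(ctx_path.read_text())["task_id"]
```

Failing loudly when neither source is available avoids an agent silently operating on the wrong task.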
worker done
Agent signals work is complete. Includes mandatory rebase.
cd worktrees/skills-abc
worker done [--skip-rebase]
Actions:
- Read task_id from context
- Verify current state is WORKING or CONFLICTED (for retry)
- If not `--skip-rebase`:
  - Run `git fetch origin integration`
  - Run `git rebase origin/integration`
  - If conflict: DO NOT ABORT - leave rebase in progress, set state to CONFLICTED, exit with error
- If `--skip-rebase`: verify no rebase in progress and branch is up-to-date
- Run `git push -u origin feat/<task-id>`
- Insert `review_request` message
- Transition to IN_REVIEW via `transition()` with compare-and-set
Rebase Failure (rebase left in progress):
ERROR: Rebase conflict detected
Conflicting files: src/auth.py
State: CONFLICTED
To resolve:
1. Fix conflicts in worktree
2. git add <files>
3. git rebase --continue
4. worker done --skip-rebase
State Guards:
- From WORKING: proceed with rebase
- From CONFLICTED with `--skip-rebase`: proceed (assumes conflicts resolved)
- From CONFLICTED without `--skip-rebase`: attempt rebase again
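The guards above reduce to a small decision function. The following is a Python sketch for illustration (the tool itself is written in Nim). `plan_done` and its return values are hypothetical names.

```python
def plan_done(state: str, skip_rebase: bool) -> str:
    """Decide what `worker done` does before pushing.

    Returns "rebase" (fetch and rebase onto origin/integration) or
    "verify" (only check that no rebase is in progress and the branch is current).
    """
    if state not in ("WORKING", "CONFLICTED"):
        raise ValueError(f"worker done is not valid from state {state}")
    # The same rule applies from WORKING and from CONFLICTED:
    # without --skip-rebase the rebase is (re)attempted.
    return "verify" if skip_rebase else "rebase"
```

Keeping the decision separate from the git calls makes the allowed combinations easy to test without a repository.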
worker heartbeat
Emit liveness signal. Call periodically (every 10-30s).
cd worktrees/skills-abc
worker heartbeat [--status working] [--progress 0.5]
Actions:
- Read task_id from context
- Insert `heartbeat` message with timestamp
- Update `last_heartbeat` in state file
Idempotency: Always succeeds; updates timestamp.
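A heartbeat can be recorded as one short transaction that appends the message and refreshes the timestamp the dashboard reads. The following is a minimal Python sketch (the tool itself is written in Nim). It assumes a hypothetical `messages` table with `task_id`, `type`, `payload`, and `ts` columns, plus a `last_heartbeat` column on `workers`. `record_heartbeat` is an illustrative name.

```python
import json
import sqlite3
import time


def record_heartbeat(db: sqlite3.Connection, task_id: str,
                     status: str = "working", progress: float = 0.0) -> float:
    """Append a heartbeat message and refresh last_heartbeat in one transaction."""
    now = time.time()
    with db:  # commit on success, roll back on error
        db.execute(
            "INSERT INTO messages (task_id, type, payload, ts) "
            "VALUES (?, 'heartbeat', ?, ?)",
            (task_id, json.dumps({"status": status, "progress": progress}), now),
        )
        db.execute(
            "UPDATE workers SET last_heartbeat = ? WHERE task_id = ?",
            (now, task_id),
        )
    return now
```

Repeated calls only add rows and move the timestamp forward, which is why the command can always succeed.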
worker approve <task-id>
Human/review-gate approves completed work.
worker approve skills-abc [--by reviewer] [--comment "LGTM"]
Actions:
- Verify state is IN_REVIEW
- Transition to APPROVED via `transition()`
- Insert `review_approved` message with reviewer and comment
Idempotency: If already APPROVED, log and succeed.
worker request-changes <task-id>
Human/review-gate requests revisions.
worker request-changes skills-abc [--comment "Fix error handling"]
Actions:
- Verify state is IN_REVIEW
- Transition to WORKING via `transition()`
- Insert `changes_requested` message with comment
Idempotency: If already WORKING, log and succeed.
worker merge <task-id>
Merge approved work to integration branch.
worker merge skills-abc [--delete-branch]
Actions (with retry loop for contention):
1. Verify state is APPROVED
2. `git fetch origin integration feat/<task-id>`
3. `git checkout integration && git reset --hard origin/integration`
4. Merge `feat/<task-id>` with `--no-ff`
5. Push integration
6. If push fails (remote changed): go to step 2 (max 3 retries)
7. If merge conflict: transition APPROVED → WORKING, exit with error
8. Transition to COMPLETED via `transition()`
9. If `--delete-branch`: delete remote and local branch
10. Remove worktree via `git worktree remove`
Idempotency: If already COMPLETED, log and succeed.
Merge Conflict (rare - only if integration moved significantly):
ERROR: Merge conflict during integration
State: WORKING (returned for revision)
To resolve:
1. Rebase in worktree: git rebase origin/integration
2. worker done
3. Get re-approval
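The fetch-merge-push loop can be sketched independently of git by passing the three steps in as callables. The following is illustrative Python rather than the Nim implementation. `merge_with_retry` and `MergeConflict` are hypothetical names, and reading "max 3 retries" as one initial attempt plus three retries is an assumption.

```python
from typing import Callable

MAX_PUSH_RETRIES = 3  # from "max 3 retries" in the merge actions


class MergeConflict(Exception):
    """Raised by the merge step when feat/<task-id> no longer merges cleanly."""


def merge_with_retry(sync: Callable[[], None],
                     merge: Callable[[], None],
                     push: Callable[[], bool]) -> int:
    """Repeat sync -> merge -> push until the push is accepted; return attempts used."""
    for attempt in range(1, MAX_PUSH_RETRIES + 2):  # 1 initial attempt + retries
        sync()   # fetch, then reset local integration to origin/integration
        merge()  # merge --no-ff; MergeConflict propagates to the caller
        if push():
            return attempt
    raise RuntimeError(f"push still rejected after {MAX_PUSH_RETRIES} retries")
```

A `MergeConflict` is deliberately not retried: the caller would catch it, move the task from APPROVED back to WORKING, and exit with an error, as described above.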
worker cancel <task-id>
Abort a worker and cleanup.
worker cancel skills-abc [--reason "Scope changed"]
Actions:
- Insert `task_failed` message with reason
- Update state to FAILED
- Optionally remove worktree (with `--cleanup`)
- Optionally archive branch (with `--archive`)
State Transitions via CLI
                     spawn
                       │
                       ▼
┌──────────┐      ┌──────────┐     start      ┌─────────┐
│  (none)  │ ───► │ ASSIGNED │ ─────────────► │ WORKING │
└──────────┘      └──────────┘                └────┬────┘
                                                   │
                      ┌────────────────────────────┤
                      │                            │
                      │ done (rebase ok)           │ done (conflict)
                      ▼                            ▼
                ┌───────────┐  fix + done   ┌────────────┐
                │ IN_REVIEW │ ◄──────────── │ CONFLICTED │
                └─────┬─────┘               └────────────┘
                      │
              approve │
           (external) │
                      ▼
                 ┌──────────┐
                 │ APPROVED │
                 └────┬─────┘
                      │
                      │ merge
                      ▼
                ┌───────────┐
                │ COMPLETED │
                └───────────┘
Directory Structure
project/
├── .worker-state/
│   ├── bus.db               # SQLite message bus
│   ├── bus.jsonl            # Debug export
│   └── workers/
│       ├── skills-abc.json  # State file
│       └── skills-xyz.json
├── worktrees/               # Git worktrees (gitignored)
│   ├── skills-abc/
│   │   ├── .worker-ctx.json # Context for this worker
│   │   └── (project files)
│   └── skills-xyz/
└── .git/
Implementation
Nim Project Structure
src/
├── libs/sqlite3.c      # SQLite amalgamation (static linking)
├── worker.nim          # CLI entry point (cligen dispatch)
├── worker/
│   ├── db.nim          # SQLite operations (tiny_sqlite)
│   ├── git.nim         # Git/worktree operations (osproc)
│   ├── state.nim       # State machine logic
│   ├── context.nim     # Worker context handling
│   ├── heartbeat.nim   # Dedicated heartbeat thread
│   └── types.nim       # Shared types and constants
├── config.nims         # Build configuration
└── worker.nimble       # Package definition
CLI Entry Point (cligen)
import cligen
import worker/[db, git, state, context, heartbeat]

proc spawn(taskId: string, description = "", fromBranch = "origin/integration") =
  ## Create a new worker workspace
  let ctx = createWorker(taskId, description, fromBranch)
  echo "Created worker: ", taskId
  echo "  Branch:   ", ctx.branch
  echo "  Worktree: ", ctx.worktree
  echo "  State:    ASSIGNED"

proc status(state = "", stale = false, json = false, watch = false) =
  ## Show worker dashboard
  let workers = getAllWorkers()
  if json:
    echo toJson(workers)
  else:
    printStatusTable(workers, stale)

proc start(task = "") =
  ## Signal ASSIGNED → WORKING (run from worktree)
  let taskId = if task != "": task else: readContext().taskId
  transition(taskId, "ASSIGNED", "WORKING")
  echo "Started work on ", taskId

proc done(skipRebase = false) =
  ## Signal WORKING → IN_REVIEW (includes rebase)
  let ctx = readContext()
  if not skipRebase:
    let ok = rebaseOnIntegration(ctx)
    if not ok:
      transition(ctx.taskId, "WORKING", "CONFLICTED")
      quit("Rebase conflict - resolve and run: worker done --skip-rebase", 1)
  pushBranch(ctx)
  transition(ctx.taskId, "WORKING", "IN_REVIEW")
  echo "Ready for review: ", ctx.taskId

proc approve(taskId: string, by = "", comment = "") =
  ## Approve completed work (IN_REVIEW → APPROVED)
  transition(taskId, "IN_REVIEW", "APPROVED")
  echo "Approved: ", taskId

proc requestChanges(taskId: string, comment = "") =
  ## Request revisions (IN_REVIEW → WORKING)
  transition(taskId, "IN_REVIEW", "WORKING")
  echo "Changes requested: ", taskId

proc merge(taskId: string, deleteBranch = false) =
  ## Merge approved work (APPROVED → COMPLETED)
  mergeToIntegration(taskId, deleteBranch)
  transition(taskId, "APPROVED", "COMPLETED")
  echo "Merged: ", taskId

proc cancel(taskId: string, reason = "") =
  ## Abort a worker (* → FAILED)
  transitionToFailed(taskId, reason)
  echo "Cancelled: ", taskId

when isMainModule:
  dispatchMulti(
    [spawn],
    [status],
    [start],
    [done],
    [approve],
    [requestChanges, cmdName = "request-changes"],
    [merge],
    [cancel]
  )
State Machine Guards
# json provides %*, options provides .get, times provides epochTime
import std/[json, options, strformat, tables, times]
import tiny_sqlite

type
  WorkerState* = enum
    wsAssigned = "ASSIGNED"
    wsWorking = "WORKING"
    wsConflicted = "CONFLICTED"
    wsInReview = "IN_REVIEW"
    wsApproved = "APPROVED"
    wsCompleted = "COMPLETED"
    wsFailed = "FAILED"

  InvalidTransition* = object of CatchableError
  StaleState* = object of CatchableError

# NOTE: STALE is NOT a persistent state - computed for display from heartbeat age

const ValidTransitions* = {
  wsAssigned: @[wsWorking, wsFailed],
  wsWorking: @[wsInReview, wsConflicted, wsFailed],
  wsConflicted: @[wsInReview, wsWorking, wsFailed],
  wsInReview: @[wsApproved, wsWorking, wsFailed],
  wsApproved: @[wsCompleted, wsWorking, wsFailed],
  wsCompleted: @[],
  wsFailed: @[wsAssigned],  # Retry
}.toTable

proc transition*(db: DbConn, taskId: string, fromState, toState: WorkerState) =
  ## Attempt state transition with compare-and-set guard
  if toState notin ValidTransitions[fromState]:
    raise newException(InvalidTransition, &"{fromState} → {toState}")

  # BEGIN IMMEDIATE takes the write lock up front, so the read and the update cannot interleave
  db.exec("BEGIN IMMEDIATE")
  try:
    let current = db.value("SELECT state FROM workers WHERE task_id = ?", taskId).get.fromDbValue(string)
    if current != $fromState:
      raise newException(StaleState, &"Expected {fromState}, got {current}")

    db.exec("UPDATE workers SET state = ?, state_changed_at = ? WHERE task_id = ?",
            $toState, epochTime(), taskId)
    publishMessage(db, taskId, "state_change", %*{"from": $fromState, "to": $toState})
    db.exec("COMMIT")
  except:
    db.exec("ROLLBACK")
    raise
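The same compare-and-set guard can also be expressed as a single conditional `UPDATE`, where the affected-row count tells the caller whether the expected state still held. The following is a Python sketch of that variant, not the Nim implementation above. It assumes a `workers` table with `task_id`, `state`, and `state_changed_at` columns and leaves out message publishing for brevity.

```python
import sqlite3
import time

# Mirrors the ValidTransitions table above.
VALID_TRANSITIONS = {
    "ASSIGNED": {"WORKING", "FAILED"},
    "WORKING": {"IN_REVIEW", "CONFLICTED", "FAILED"},
    "CONFLICTED": {"IN_REVIEW", "WORKING", "FAILED"},
    "IN_REVIEW": {"APPROVED", "WORKING", "FAILED"},
    "APPROVED": {"COMPLETED", "WORKING", "FAILED"},
    "COMPLETED": set(),
    "FAILED": {"ASSIGNED"},
}


class InvalidTransition(Exception):
    pass


class StaleState(Exception):
    pass


def transition(db: sqlite3.Connection, task_id: str,
               from_state: str, to_state: str) -> None:
    """Move task_id to to_state only if its row is still in from_state."""
    if to_state not in VALID_TRANSITIONS[from_state]:
        raise InvalidTransition(f"{from_state} -> {to_state}")
    with db:  # commit on success, roll back if StaleState is raised
        cur = db.execute(
            "UPDATE workers SET state = ?, state_changed_at = ? "
            "WHERE task_id = ? AND state = ?",
            (to_state, time.time(), task_id, from_state),
        )
        if cur.rowcount != 1:
            raise StaleState(f"{task_id} is not in state {from_state}")
```

The trade-off is that this form cannot report which state the row was actually in without a second query, whereas the read-then-update version can include it in the error.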
Build Configuration
# config.nims
--mm:orc
--threads:on
--define:release
--opt:size
# Static SQLite
switch("passC", "-DSQLITE_THREADSAFE=1")
switch("compile", "libs/sqlite3.c")
Exit Codes
const
  ExitSuccess* = 0
  ExitUsageError* = 2
  ExitInvalidTransition* = 3
  ExitGitError* = 4
  ExitDbError* = 5
  ExitConflict* = 6
Integration with Existing Components
With Message Bus (skills-ms5)
All state transitions publish messages:
db.publish_message(task_id, 'state_change', {'from': 'ASSIGNED', 'to': 'WORKING'})
With review-gate (skills-byq)
Review-gate reads worker state:
state = db.get_state(task_id)
if state == 'IN_REVIEW':
    # Check review status
    ...
With Status Dashboard (skills-yak)
Status command queries message bus:
workers = db.query("""
SELECT task_id, state, last_heartbeat
FROM workers
ORDER BY state_changed_at DESC
""")
Open Questions
- Approval flow: Resolved - `worker approve` and `worker request-changes` commands added
- Agent wrapper: Should `worker run <agent-cmd>` wrap agent execution with heartbeats?
- Multiple attempts: If task fails, `worker retry <task-id>` or new spawn?
- Integration branch: Create on first spawn, or require it exists?
Resolved from Spec Review
| Issue | Resolution |
|---|---|
| Missing approve command | Added worker approve and worker request-changes |
| Rebase conflict flow | Don't abort, leave in progress for resolution |
| Source of truth conflict | SQLite only; JSON files are derived caches |
| STALE as persistent state | Changed to computed for display from heartbeat age |
| `done` from CONFLICTED | Now allowed with `--skip-rebase` |
| Merge race condition | Added fetch-merge-push retry loop |
| Spawn race condition | Use DB unique constraint (implementation detail) |
References
- State machine design: `docs/design/worker-state-machine.md`
- Message passing: `docs/design/message-passing-layer.md`
- Branch isolation: `docs/design/branch-per-worker.md`