From 1c66d019bde7f0af0ca54076a8e68af2bacf4895 Mon Sep 17 00:00:00 2001 From: dan Date: Sat, 10 Jan 2026 18:47:47 -0800 Subject: [PATCH] feat: add worker CLI scaffold in Nim Multi-agent coordination CLI with SQLite message bus: - State machine: ASSIGNED -> WORKING -> IN_REVIEW -> APPROVED -> COMPLETED - Commands: spawn, start, done, approve, merge, cancel, fail, heartbeat - SQLite WAL mode, dedicated heartbeat thread, channel-based IPC - cligen for CLI, tiny_sqlite for DB, ORC memory management Design docs for branch-per-worker, state machine, message passing, and human observability patterns. Co-Authored-By: Claude Opus 4.5 --- docs/design/branch-per-worker.md | 406 ++++++++++ docs/design/human-observability.md | 405 ++++++++++ docs/design/message-passing-comparison.md | 140 ++++ docs/design/message-passing-layer.md | 749 ++++++++++++++++++ .../multi-agent-footguns-and-patterns.md | 219 +++++ docs/design/mvp-scope.md | 277 +++++++ docs/design/worker-cli-primitives.md | 567 +++++++++++++ docs/design/worker-state-machine.md | 258 ++++++ src/.gitignore | 1 + src/README.md | 80 ++ src/config.nims | 22 + src/libs/.gitkeep | 2 + src/worker.nim | 341 ++++++++ src/worker.nimble | 15 + src/worker/context.nim | 59 ++ src/worker/db.nim | 265 +++++++ src/worker/git.nim | 143 ++++ src/worker/heartbeat.nim | 120 +++ src/worker/state.nim | 241 ++++++ src/worker/types.nim | 113 +++ 20 files changed, 4423 insertions(+) create mode 100644 docs/design/branch-per-worker.md create mode 100644 docs/design/human-observability.md create mode 100644 docs/design/message-passing-comparison.md create mode 100644 docs/design/message-passing-layer.md create mode 100644 docs/design/multi-agent-footguns-and-patterns.md create mode 100644 docs/design/mvp-scope.md create mode 100644 docs/design/worker-cli-primitives.md create mode 100644 docs/design/worker-state-machine.md create mode 100644 src/.gitignore create mode 100644 src/README.md create mode 100644 src/config.nims create mode 100644 
src/libs/.gitkeep create mode 100644 src/worker.nim create mode 100644 src/worker.nimble create mode 100644 src/worker/context.nim create mode 100644 src/worker/db.nim create mode 100644 src/worker/git.nim create mode 100644 src/worker/heartbeat.nim create mode 100644 src/worker/state.nim create mode 100644 src/worker/types.nim diff --git a/docs/design/branch-per-worker.md b/docs/design/branch-per-worker.md new file mode 100644 index 0000000..cbd9ab1 --- /dev/null +++ b/docs/design/branch-per-worker.md @@ -0,0 +1,406 @@ +# Branch-per-Worker Isolation Design + +**Status**: Draft +**Bead**: skills-roq +**Epic**: skills-s6y (Multi-agent orchestration: Lego brick architecture) + +## Overview + +This document defines the git branching strategy for multi-agent coordination. Each worker operates in an isolated git worktree on a dedicated branch, with mandatory rebase before review. + +## Design Principles + +1. **Orchestrator controls branch lifecycle** - Creates, assigns, cleans up +2. **Worktrees for parallelism** - Each worker gets isolated directory +3. **Integration branch as staging** - Buffer before main +4. **SQLite = process truth, Git = code truth** - Don't duplicate state +5. 
**Mandatory rebase** - Fresh base before review (consensus requirement) + +## Key Decisions + +### Branch Naming: `type/task-id` + +**Decision**: Use `type/task-id` (e.g., `feat/skills-abc`, `fix/skills-xyz`) + +**Rationale** (2/3 consensus): +- Branch describes *work*, not *worker* +- Survives task reassignment (if Claude fails, Gemini can continue) +- Worker identity in commit author: `Author: claude-3.5 ` + +**Rejected alternative**: `worker-id/task-id` - becomes misleading on reassignment + +### Worktrees vs Checkout + +**Decision**: Git worktrees (parallel directories) + +**Rationale** (3/3 consensus): +- `git checkout` updates working directory globally +- If Worker A checks out while Worker B writes, corruption possible +- Worktrees share `.git` object database, isolate filesystem +- Maps cleanly to "one worker = one workspace" + +``` +/project/ +├── .git/ # Shared object database +├── worktrees/ +│ ├── skills-abc/ # Worker 1's worktree +│ │ └── (full working copy) +│ └── skills-xyz/ # Worker 2's worktree +│ └── (full working copy) +└── (main working copy) # For orchestrator +``` + +### Integration Branch + +**Decision**: Use rolling `integration` branch as staging before `main` + +**Rationale** (3/3 consensus): +- AI agents introduce subtle regressions +- Integration branch = "demilitarized zone" for combined testing +- Human review before promoting to main +- Allows batching of related changes + +``` +main ─────────────────────────●───────────── + ↑ +integration ────●────●────●────●────● + ↑ ↑ ↑ ↑ +feat/T-101 ────● │ │ │ +feat/T-102 ─────────● │ │ +fix/T-103 ──────────────● │ +feat/T-104 ───────────────────● +``` + +### Conflict Handling + +**Decision**: Worker resolves trivial conflicts, escalates semantic conflicts + +**Rationale** (2/3 consensus - flash-or, gpt): +- Blanket "never resolve" is safe but slows throughput +- Mechanical conflicts (formatting, imports, non-overlapping) are safe +- Logic conflicts require human judgment + +**Rules**: 
+```python +def handle_rebase_conflict(conflict_info): + # Trivial: resolve automatically + if is_trivial_conflict(conflict_info): + resolve_mechanically() + run_tests() + if tests_pass(): + continue_rebase() + else: + abort_and_escalate() + + # Semantic: always escalate + else: + git_rebase_abort() + set_state(CONFLICTED) + notify_orchestrator() +``` + +**Trivial conflict criteria**: +- Only whitespace/formatting changes +- Import statement ordering +- Non-overlapping edits in same file +- Less than N lines changed in conflict region + +**Escalate if**: +- Conflict touches core logic +- Conflict spans multiple files +- Test failures after resolution +- Uncertain about correctness + +### State Machine Mapping + +**Decision**: SQLite is process truth, Git is code truth + +**Rationale** (2/3 consensus - gemini, gpt): +- Don't encode state in Git (tags, notes) - causes sync issues +- Observable Git signals already exist: + +| Worker State | Git Observable | +|--------------|----------------| +| ASSIGNED | Branch exists, worktree created | +| WORKING | New commits appearing | +| IN_REVIEW | Branch pushed, PR opened (or flag in SQLite) | +| APPROVED | PR approved | +| COMPLETED | Merged to integration/main | +| CONFLICTED | Rebase aborted, no new commits | + +**Link via task-id**: Commit trailers connect the two: +``` +feat: implement user authentication + +Task: skills-abc +Agent: claude-3.5 +``` + +### Cross-Worker Dependencies + +**Decision**: Strict serialization - don't depend on uncommitted work + +**Rationale** (3/3 consensus): +- "Speculative execution" creates house of cards +- If A's code rejected, B's work becomes invalid +- Cheaper to wait than waste tokens on orphaned code + +**Pattern for parallel work on related features**: +1. Orchestrator creates epic branch: `epic/auth-system` +2. Both workers branch from epic: `feat/T-101`, `feat/T-102` +3. Workers rebase onto epic, not main +4. 
Epic merged to integration when all tasks complete + +### Branch Cleanup + +**Decision**: Delete after merge, archive failures + +**Rationale** (3/3 consensus): +- Prevent branch bloat +- Archive failures for post-mortem analysis + +```bash +# On successful merge +git branch -d feat/T-101 +git worktree remove worktrees/T-101 + +# On failure/abandonment +git branch -m feat/T-101 archive/T-101-$(date +%Y%m%d) +git worktree remove worktrees/T-101 +``` + +## Workflow + +### 1. Task Assignment + +Orchestrator prepares workspace: + +```bash +# 1. Fetch latest +git fetch origin + +# 2. Create branch from integration +git branch feat/$TASK_ID origin/integration + +# 3. Create worktree +git worktree add worktrees/$TASK_ID feat/$TASK_ID + +# 4. Update SQLite +publish(db, 'orchestrator', { + 'type': 'task_assign', + 'to': worker_id, + 'correlation_id': task_id, + 'payload': { + 'branch': f'feat/{task_id}', + 'worktree': f'worktrees/{task_id}' + } +}) +``` + +### 2. Worker Starts + +Worker receives assignment: + +```bash +cd worktrees/$TASK_ID + +# Confirm environment +git status +git log --oneline -3 + +# Begin work... +``` + +### 3. Worker Commits + +During work: + +```bash +git add -A +git commit -m "feat: implement feature X + +Task: $TASK_ID +Agent: $AGENT_ID" +``` + +Update SQLite: +```python +publish(db, agent_id, { + 'type': 'state_change', + 'correlation_id': task_id, + 'payload': {'from': 'ASSIGNED', 'to': 'WORKING'} +}) +``` + +### 4. Pre-Review Rebase (Mandatory) + +Before requesting review: + +```bash +# 1. Fetch latest integration +git fetch origin integration + +# 2. Attempt rebase +git rebase origin/integration + +# 3. Handle result +if [ $? -eq 0 ]; then + # Success - push and request review + git push -u origin feat/$TASK_ID + # Update SQLite: IN_REVIEW +else + # Conflict - check if trivial + if is_trivial_conflict; then + resolve_and_continue + else + git rebase --abort + # Update SQLite: CONFLICTED + fi +fi +``` + +### 5. 
Review + +Review happens (human or review-gate): + +```python +# Check review state +review = get_review_state(task_id) + +if review['decision'] == 'approved': + publish(db, 'reviewer', { + 'type': 'review_result', + 'correlation_id': task_id, + 'payload': {'decision': 'approved'} + }) +elif review['decision'] == 'changes_requested': + publish(db, 'reviewer', { + 'type': 'review_result', + 'correlation_id': task_id, + 'payload': { + 'decision': 'changes_requested', + 'feedback': review['comments'] + } + }) + # Worker returns to WORKING state +``` + +### 6. Merge + +On approval: + +```bash +# Orchestrator merges to integration +git checkout integration +git merge --no-ff feat/$TASK_ID -m "Merge feat/$TASK_ID: $TITLE" +git push origin integration + +# Cleanup +git branch -d feat/$TASK_ID +git push origin --delete feat/$TASK_ID +git worktree remove worktrees/$TASK_ID +``` + +### 7. Promote to Main + +Periodically (or per-task): + +```bash +# When integration is green +git checkout main +git merge --ff-only integration +git push origin main +``` + +## Directory Structure + +``` +/project/ +├── .git/ +├── .worker-state/ +│ ├── bus.db # SQLite message bus +│ └── workers/ +│ └── worker-auth.json +├── worktrees/ # Worker worktrees (gitignored) +│ ├── skills-abc/ +│ └── skills-xyz/ +└── (main working copy) +``` + +Add to `.gitignore`: +``` +worktrees/ +``` + +## Conflict Resolution Script + +```bash +#!/bin/bash +# scripts/try-rebase.sh + +TASK_ID=$1 +TARGET_BRANCH=${2:-origin/integration} + +cd worktrees/$TASK_ID + +git fetch origin + +# Attempt rebase +if git rebase $TARGET_BRANCH; then + echo "Rebase successful" + exit 0 +fi + +# Check conflict severity +CONFLICT_FILES=$(git diff --name-only --diff-filter=U) +CONFLICT_COUNT=$(echo "$CONFLICT_FILES" | wc -l) + +# Trivial: single file, small diff +if [ "$CONFLICT_COUNT" -le 2 ]; then + # Try automatic resolution for whitespace/formatting + for file in $CONFLICT_FILES; do + if git checkout --theirs "$file" 2>/dev/null; then 
+ git add "$file" + else + echo "Cannot auto-resolve: $file" + git rebase --abort + exit 2 # CONFLICTED + fi + done + + if git rebase --continue; then + echo "Auto-resolved trivial conflicts" + exit 0 + fi +fi + +# Non-trivial: abort and escalate +git rebase --abort +echo "Conflict requires human intervention" +exit 2 # CONFLICTED +``` + +## Integration with State Machine + +| State | Git Action | SQLite Message | +|-------|------------|----------------| +| IDLE → ASSIGNED | Branch + worktree created | `task_assign` | +| ASSIGNED → WORKING | First commit | `state_change` | +| WORKING → IN_REVIEW | Push + rebase success | `review_request` | +| WORKING → CONFLICTED | Rebase failed | `state_change` + `escalate` | +| IN_REVIEW → APPROVED | Review passes | `review_result` | +| IN_REVIEW → WORKING | Changes requested | `review_result` | +| APPROVED → COMPLETED | Merged | `task_done` | + +## Open Questions + +1. **Worktree location**: `./worktrees/` or `/tmp/worktrees/`? +2. **Integration → main cadence**: Per-task, hourly, daily, manual? +3. **Epic branches**: How complex should the epic workflow be? +4. **Failed branch retention**: How long to keep archived branches? + +## References + +- OpenHands: https://docs.openhands.dev/sdk/guides/iterative-refinement +- Gastown worktrees: https://github.com/steveyegge/gastown +- Git worktrees: https://git-scm.com/docs/git-worktree diff --git a/docs/design/human-observability.md b/docs/design/human-observability.md new file mode 100644 index 0000000..7e7da63 --- /dev/null +++ b/docs/design/human-observability.md @@ -0,0 +1,405 @@ +# Human Observability Design + +**Status**: Draft +**Bead**: skills-yak +**Epic**: skills-s6y (Multi-agent orchestration: Lego brick architecture) + +## Overview + +This document defines the observability interface for human orchestrators managing AI workers. The design follows kubectl/docker patterns: one root command with subcommands, table output, watch mode, and detailed describe views. 
+ +## Design Decisions (from orch consensus) + +| Decision | Choice | Rationale | +|----------|--------|-----------| +| Command structure | One root, many subcommands | kubectl-style; composable; easy to extend | +| Status columns | id, state, age, heartbeat, staleness | At-a-glance signal for stuck/healthy workers | +| Stale threshold | 3x heartbeat = WARN, 10x = STALE | Avoid false positives from jitter | +| Watch mode | `--watch` flag | Multi-agent is event-driven; real-time visibility | +| Detail view | Full spec, transitions, errors, "why stuck" | Answer "what, why, where stuck?" | + +## Commands + +### Command Overview + +``` +worker status [options] # Dashboard table +worker show # Detailed view +worker logs # Message history +worker stats # Aggregate metrics +``` + +### `worker status` + +Dashboard view of all workers. + +```bash +worker status [--state STATE] [--stale] [--watch] [--json] +``` + +**Default Output**: +``` +TASK STATE AGE HEARTBEAT STATUS SUMMARY +skills-abc WORKING 45m 2m ago ok Fix auth bug +skills-xyz IN_REVIEW 2h -- ok Add login form +skills-123 WORKING 3h 12m ago STALE Refactor database +skills-456 CONFLICTED 1h 5m ago blocked Update API endpoints +``` + +**Columns**: + +| Column | Source | Description | +|--------|--------|-------------| +| TASK | state file | Task ID | +| STATE | state file | Current state (color-coded) | +| AGE | state file | Time since created | +| HEARTBEAT | bus.db | Time since last heartbeat | +| STATUS | computed | ok, WARN, STALE, blocked, error | +| SUMMARY | task description | First 30 chars of description | + +**State Colors** (if terminal supports): +- 🟢 Green: WORKING +- 🟡 Yellow: IN_REVIEW, ASSIGNED +- 🔴 Red: FAILED, CONFLICTED, STALE +- ⚪ Gray: COMPLETED + +**Options**: + +| Option | Description | +|--------|-------------| +| `--state STATE` | Filter by state (WORKING, IN_REVIEW, etc.) 
| +| `--stale` | Show only stale workers | +| `--watch` | Refresh every 2 seconds | +| `--json` | Output as JSON array | +| `--wide` | Show additional columns (branch, worker type) | + +**Watch Mode**: +```bash +worker status --watch +# Clears screen, refreshes every 2s +# Ctrl+C to exit +``` + +### `worker show ` + +Detailed view of a single worker. + +```bash +worker show skills-abc [--events] [--json] +``` + +**Output**: +``` +Task: skills-abc +Description: Fix authentication bug in login flow +State: WORKING +Branch: feat/skills-abc +Worktree: worktrees/skills-abc + +Created: 2026-01-10 14:00:00 (45m ago) +State Changed: 2026-01-10 14:05:00 (40m ago) +Last Heartbeat: 2026-01-10 14:43:00 (2m ago) + +Status: ok + ✓ Heartbeat within threshold + ✓ State progressing normally + +State History: + 14:00:00 → ASSIGNED (spawned) + 14:05:00 → WORKING (worker start) + +Git Status: + Branch: feat/skills-abc + Ahead of integration: 3 commits + Behind integration: 0 commits + Uncommitted changes: 2 files + +Recent Messages: + 14:43:00 heartbeat status=working, progress=0.6 + 14:35:00 heartbeat status=working, progress=0.4 + 14:05:00 state_change ASSIGNED → WORKING + 14:00:00 task_assign from=orchestrator +``` + +**With `--events`**: Show full message history. + +**Sections**: + +1. **Header**: Task ID, description, current state, branch +2. **Timestamps**: Created, state changed, last heartbeat +3. **Status Check**: Is it healthy? What's wrong? +4. **State History**: Timeline of transitions +5. **Git Status**: Branch status, commits, conflicts +6. **Recent Messages**: Last 10 messages from bus.db + +### `worker logs ` + +Stream message history for a task. + +```bash +worker logs skills-abc [--follow] [--since 1h] [--type heartbeat] +``` + +**Output**: +``` +2026-01-10 14:00:00 task_assign from=orchestrator +2026-01-10 14:05:00 state_change ASSIGNED → WORKING +2026-01-10 14:15:00 heartbeat status=working +2026-01-10 14:25:00 heartbeat status=working +... 
+``` + +**Options**: + +| Option | Description | +|--------|-------------| +| `--follow, -f` | Stream new messages as they arrive | +| `--since DURATION` | Show messages from last N minutes/hours | +| `--type TYPE` | Filter by message type | +| `--limit N` | Show last N messages (default: 50) | + +### `worker stats` + +Aggregate metrics across all workers. + +```bash +worker stats [--since 24h] +``` + +**Output**: +``` +Workers by State: + WORKING: 2 + IN_REVIEW: 1 + COMPLETED: 5 + FAILED: 1 + Total: 9 + +Health: + Healthy: 3 (33%) + Stale: 1 (11%) + +Timing (median): + ASSIGNED → WORKING: 2m + WORKING → IN_REVIEW: 45m + IN_REVIEW → APPROVED: 15m + Full cycle: 1h 10m + +Failures (last 24h): 1 + skills-789: Rebase conflict (3h ago) +``` + +## Stale Detection + +### Thresholds + +Based on heartbeat interval `H` (default: 10s): + +| Level | Threshold | Meaning | +|-------|-----------|---------| +| OK | < 3H (30s) | Normal operation | +| WARN | 3H - 10H (30s - 100s) | Possible issue | +| STALE | > 10H (100s) | Worker likely stuck/dead | +| DEAD | > 30H (5m) | Worker definitely dead | + +### Stale vs Stuck + +Two different problems: + +| Condition | Symptom | Detection | +|-----------|---------|-----------| +| **Stale worker** | No heartbeats | `now - last_heartbeat > threshold` | +| **Stuck task** | Heartbeating but no progress | Same state for > N minutes | + +**Stuck detection**: +```sql +SELECT task_id FROM workers +WHERE state = 'WORKING' + AND state_changed_at < datetime('now', '-30 minutes') + AND last_heartbeat > datetime('now', '-1 minute') +``` + +## Status Computation + +```python +def compute_status(worker: Worker) -> str: + now = datetime.utcnow() + heartbeat_age = (now - worker.last_heartbeat).total_seconds() + state_age = (now - worker.state_changed_at).total_seconds() + + H = 10 # heartbeat interval + + # Check heartbeat freshness + if worker.state in ('ASSIGNED', 'WORKING'): + if heartbeat_age > 30 * H: # 5 min + return 'DEAD' + if heartbeat_age > 10 * 
H: # 100s + return 'STALE' + if heartbeat_age > 3 * H: # 30s + return 'WARN' + + # Check for conflicts/failures + if worker.state == 'CONFLICTED': + return 'blocked' + if worker.state == 'FAILED': + return 'error' + + # Check for stuck (working but no progress) + if worker.state == 'WORKING' and state_age > 30 * 60: # 30 min + return 'stuck' + + return 'ok' +``` + +## Output Formatting + +### Table Format + +Use fixed-width columns for alignment: + +```python +COLUMNS = [ + ('TASK', 12), + ('STATE', 11), + ('AGE', 6), + ('HEARTBEAT', 10), + ('STATUS', 7), + ('SUMMARY', 30), +] + +def format_row(worker): + return f"{worker.task_id:<12} {worker.state:<11} {format_age(worker.age):<6} ..." +``` + +### JSON Format + +For scripting: + +```json +[ + { + "task_id": "skills-abc", + "state": "WORKING", + "age_seconds": 2700, + "last_heartbeat": "2026-01-10T14:43:00Z", + "status": "ok", + "branch": "feat/skills-abc" + } +] +``` + +### Color Codes + +```python +STATE_COLORS = { + 'ASSIGNED': 'yellow', + 'WORKING': 'green', + 'IN_REVIEW': 'yellow', + 'APPROVED': 'green', + 'COMPLETED': 'dim', + 'CONFLICTED': 'red', + 'FAILED': 'red', + 'STALE': 'red', +} + +STATUS_COLORS = { + 'ok': 'green', + 'WARN': 'yellow', + 'STALE': 'red', + 'DEAD': 'red', + 'blocked': 'yellow', + 'error': 'red', + 'stuck': 'yellow', +} +``` + +## Integration + +### With Worker CLI (skills-sse) + +`worker status` is part of the same CLI: + +```python +@worker.command() +@click.option('--watch', is_flag=True) +@click.option('--state') +@click.option('--stale', is_flag=True) +@click.option('--json', 'as_json', is_flag=True) +def status(watch, state, stale, as_json): + """Show worker dashboard.""" + ... +``` + +### With Message Bus (skills-ms5) + +Query SQLite for heartbeats: + +```sql +SELECT task_id, MAX(ts) as last_heartbeat +FROM messages +WHERE type = 'heartbeat' +GROUP BY task_id +``` + +### With State Files + +Read `.worker-state/workers/*.json` for current state. 
+ +## Implementation + +### Python Package + +``` +skills/worker/ +├── commands/ +│ ├── status.py # Dashboard +│ ├── show.py # Detail view +│ ├── logs.py # Message history +│ └── stats.py # Aggregate metrics +└── display/ + ├── table.py # Table formatting + ├── colors.py # Terminal colors + └── watch.py # Watch mode loop +``` + +### Watch Mode Implementation + +```python +import time +import os + +def watch_loop(render_fn, interval=2): + """Clear screen and re-render every interval seconds.""" + try: + while True: + os.system('clear') # or use curses + render_fn() + time.sleep(interval) + except KeyboardInterrupt: + pass +``` + +## MVP Scope + +For MVP, implement: + +1. ✅ `worker status` - basic table with state, age, heartbeat +2. ✅ `worker status --watch` - refresh mode +3. ✅ `worker status --json` - JSON output +4. ✅ `worker show ` - basic detail view +5. ⏸️ `worker logs` - defer (can query SQLite directly) +6. ⏸️ `worker stats` - defer (nice to have) +7. ⏸️ TUI dashboard - defer (CLI sufficient for 2-4 workers) + +## Open Questions + +1. **Notification/alerts**: Should there be `worker watch --alert` that beeps on failures? +2. **Shell integration**: Prompt integration showing worker count/status? +3. **Log streaming**: Real-time follow for `worker logs --follow`? + +## References + +- `docker ps` output format +- `kubectl get pods` and `kubectl describe` +- `systemctl status` detail level +- `htop` for TUI inspiration (future) diff --git a/docs/design/message-passing-comparison.md b/docs/design/message-passing-comparison.md new file mode 100644 index 0000000..bd9e214 --- /dev/null +++ b/docs/design/message-passing-comparison.md @@ -0,0 +1,140 @@ +# Message Passing: Design Comparison + +**Purpose**: Compare our design decisions against Beads and Tissue to validate approach and identify gaps. 
+
+## Summary Comparison
+
+| Decision | Our Design (v2) | Beads | Tissue |
+|----------|-----------------|-------|--------|
+| **Primary storage** | SQLite (WAL mode) | SQLite + JSONL export | JSONL (append-only) |
+| **Cache/index** | N/A (SQLite is primary) | SQLite is primary | SQLite (derived) |
+| **Write locking** | SQLite BEGIN IMMEDIATE | SQLite BEGIN IMMEDIATE | None (git merge) |
+| **Concurrency model** | SQLite transactions | Optimistic (hash IDs) + SQLite txn | Optimistic (git merge) |
+| **Crash safety** | SQLite atomic commit | SQLite transactions | Git (implicit) |
+| **Heartbeats** | Yes (10s interval) | No (daemon only) | No |
+| **Liveness detection** | SQL query on heartbeat timestamps | Not documented | Not documented |
+| **Large payloads** | Blob storage (>4KB) | Compaction/summarization | Not addressed |
+| **Coordination** | Polling + claim-check | `bd ready` queries | `tissue ready` queries |
+| **Message schema** | Explicit (id, ts, from, type, payload) | Implicit (issue events) | Implicit (issue events) |
+| **Human debugging** | JSONL export (read-only) | JSONL in git | JSONL primary |
+
+**Decision (2026-01-10)**: After orch consensus with 3 models, we aligned with Beads' approach (SQLite primary) over Tissue's (JSONL primary). Key factors:
+- Payloads 1-50KB exceed POSIX atomic write guarantees (~4KB)
+- Crash mid-write with flock still corrupts log
+- SQLite transactions provide true atomicity
+- JSONL export preserves human debugging (`tail -f`)
+
+## Detailed Analysis
+
+> **Note (2026-01-10)**: The analysis below was written against our earlier v1 design (JSONL primary + flock + staging directory) and is retained for historical context. Per the decision above, the current design uses SQLite as the primary store with a read-only JSONL export, so statements such as "JSONL as source of truth" and "flock on JSONL file" describe the superseded design, not the current one.
+
+### Where We Align
+
+**1. JSONL as Source of Truth**
+All three systems use append-only JSONL as the authoritative store. This is the right call:
+- Git-friendly (merges cleanly)
+- Human-readable (debuggable with `cat | jq`)
+- Simple to implement
+
+**2. SQLite as Derived Cache**
+All three use SQLite for queries, not as primary storage:
+- Beads: Always-on cache with dirty tracking
+- Tissue: Derived index, gitignored
+- Ours: Phase 2 optimization
+
+**3. 
Pull-Based Coordination** +All use polling/queries rather than push events: +- `bd ready` / `tissue ready` / our `poll()` function +- Simpler than event-driven, works across process boundaries + +### Where We Diverge + +**1. Write Locking Strategy** + +| System | Approach | Trade-off | +|--------|----------|-----------| +| **Ours** | flock on JSONL file | Simple, prevents interleaving, works locally | +| **Beads** | SQLite BEGIN IMMEDIATE | Stronger guarantees, more complex | +| **Tissue** | None (trust git merge) | Simplest, but can corrupt JSONL mid-write | + +**Our rationale**: flock is simpler than SQLite transactions and safer than trusting git merge for mid-write crashes. Tissue's approach assumes writes complete atomically, which isn't guaranteed for large JSON lines. + +**2. Crash Safety** + +| System | Approach | +|--------|----------| +| **Ours** | Write to staging → validate → append under lock → delete staging | +| **Beads** | SQLite transactions (rollback on failure) | +| **Tissue** | Git recovery (implicit) | + +**Our rationale**: Staging directory adds explicit crash recovery without SQLite complexity. If agent dies mid-write, staged file is recovered on restart. + +**3. Heartbeats / Liveness** + +| System | Approach | +|--------|----------| +| **Ours** | Mandatory heartbeats every 10s, timeout detection | +| **Beads** | Background daemon (no explicit heartbeats) | +| **Tissue** | None | + +**Our rationale**: LLM API calls can hang indefinitely. Without heartbeats, a stuck agent blocks tasks forever. Beads/Tissue are issue trackers, not real-time coordination systems. + +**4. Large Payload Handling** + +| System | Approach | +|--------|----------| +| **Ours** | Blob storage with content-addressable hashing | +| **Beads** | Compaction (summarize old tasks) | +| **Tissue** | Not addressed | + +**Our rationale**: Code diffs and agent outputs can be large. Blob storage keeps the log scannable. Beads' compaction is for context windows, not payload size. 
+ +**5. Message Schema** + +| System | Schema Type | +|--------|-------------| +| **Ours** | Explicit message schema (id, ts, from, to, type, payload) | +| **Beads** | Issue-centric (tasks with dependencies, audit trail) | +| **Tissue** | Issue-centric (similar to Beads) | + +**Our rationale**: We need general message passing (state changes, heartbeats, claims), not just issue tracking. Beads/Tissue are issue trackers first; we're building coordination primitives. + +### Gaps in Our Design (Learned from Beads) + +**1. Hash-Based IDs for Merge Safety** +Beads uses hash-based IDs (e.g., `bd-a1b2`) to prevent merge collisions. We should consider this for message IDs if multiple agents might create messages offline and merge later. + +**2. Dirty Tracking for Incremental Export** +Beads tracks "dirty" issues for efficient JSONL export. When we add SQLite cache, we should track which messages need re-export rather than full rescans. + +**3. File Hash Validation** +Beads stores JSONL file hash to detect external modifications. We could add this to detect corruption or manual edits. + +### Gaps in Our Design (Learned from Tissue) + +**1. FTS5 Full-Text Search** +Tissue's SQLite cache includes FTS5 for searching issue content. Useful for "find messages mentioning X" queries in Phase 2. + +**2. Simpler Concurrency (Maybe)** +Tissue trusts git merge without explicit locking. For single-machine scenarios with small writes, this might be sufficient. We could offer a "simple mode" without flock for low-contention cases. 
+ +## Validation Verdict + +Our design is **more complex than Tissue but simpler than Beads**, which matches our use case: + +- **Tissue**: Issue tracker, optimizes for git collaboration +- **Beads**: Full workflow engine with daemon, RPC, recipes +- **Ours**: Coordination primitives for multi-agent coding + +The key additions we make (heartbeats, blob storage, staging directory) are justified by our real-time coordination requirements that issue trackers don't have. + +## Recommended Updates to Design + +1. **Add hash-based message IDs** - Prevent merge collisions if agents work offline +2. **Add file hash validation** - Detect log corruption on startup +3. **Document "simple mode"** - No flock for single-agent or low-contention scenarios +4. **Plan for FTS5** - Add to Phase 2 SQLite cache design + +## References + +- Beads source: https://github.com/steveyegge/beads +- Tissue source: https://github.com/evil-mind-evil-sword/tissue +- Our design: docs/design/message-passing-layer.md diff --git a/docs/design/message-passing-layer.md b/docs/design/message-passing-layer.md new file mode 100644 index 0000000..51d6b67 --- /dev/null +++ b/docs/design/message-passing-layer.md @@ -0,0 +1,749 @@ +# Message Passing Layer Design + +**Status**: Draft (v4 - Nim implementation) +**Bead**: skills-ms5 +**Epic**: skills-s6y (Multi-agent orchestration: Lego brick architecture) +**Language**: Nim (ORC, threads, tiny_sqlite) + +## Changelog + +- **v4**: Nim implementation details (tiny_sqlite, dedicated heartbeat thread, channels) +- **v3**: Fixed BLOCKERs from orch spec-review (seq autoincrement, ack semantics, heartbeats table, epoch timestamps, JSONL export, task leases) +- **v2**: Changed to SQLite primary (was JSONL+flock) + +## Overview + +This document defines the local message passing layer for multi-agent coordination. Agents communicate through a shared SQLite database with ACID guarantees. A JSONL export provides human debugging and git history. 
+ +## Design Principles + +1. **SQLite as source of truth** - Atomic commits, crash recovery, indexed queries +2. **JSONL for observability** - Human-readable export for debugging and git diffs +3. **File-based, no network** - Any agent with SQLite bindings can participate +4. **Liveness detection** - Heartbeats to detect dead agents +5. **Blob storage for large payloads** - Keep database lean + +## Why SQLite Over JSONL+flock + +| Concern | JSONL+flock | SQLite | +|---------|-------------|--------| +| Crash mid-write | Corrupted log, manual recovery | Atomic commit, automatic rollback | +| Payload >4KB | Interleaved writes possible | Always atomic | +| Query efficiency | O(N) scan | O(1) with index | +| Portability | POSIX flock only | Works everywhere | +| Complexity | DIY recovery logic | Built-in WAL, transactions | + +See `docs/design/message-passing-comparison.md` for full analysis. + +## Prior Art + +| System | Approach | Key Insight | +|--------|----------|-------------| +| Beads | SQLite + JSONL export | SQLite for coordination, JSONL for git | +| Tissue | JSONL primary, SQLite cache | Simpler but less safe | +| Kafka | Append-only log, offsets | Total ordering, replay | +| Redis Streams | Consumer groups, claims | First claim wins | + +## Directory Structure + +``` +.worker-state/ +├── bus.db # SQLite - source of truth (gitignored) +├── bus.jsonl # JSONL export for debugging/git (read-only) +├── blobs/ # Content-addressable storage for large payloads +│ └── sha256-a1b2c3... 
+└── workers/ + └── worker-auth.json # Worker state (separate from messages) +``` + +## Database Schema + +```sql +-- Messages table (seq is rowid-based for true auto-increment) +CREATE TABLE messages ( + seq INTEGER PRIMARY KEY AUTOINCREMENT, -- Guaranteed monotonic, auto-assigned + id TEXT NOT NULL UNIQUE, -- UUID for idempotent retries + ts_ms INTEGER NOT NULL, -- Epoch milliseconds (reliable comparisons) + from_agent TEXT NOT NULL, + to_agent TEXT, -- NULL = broadcast + type TEXT NOT NULL, + correlation_id TEXT, + in_reply_to TEXT, + payload TEXT, -- JSON blob or NULL + payload_ref TEXT, -- Blob hash for large payloads + CHECK (payload IS NULL OR payload_ref IS NULL) -- Not both +); + +CREATE INDEX idx_messages_to_seq ON messages(to_agent, seq); +CREATE INDEX idx_messages_correlation_seq ON messages(correlation_id, seq); +CREATE INDEX idx_messages_type_from_ts ON messages(type, from_agent, ts_ms); + +-- Cursors table (per-agent read position) +-- NOTE: Cursor is only advanced AFTER successful processing (at-least-once) +CREATE TABLE cursors ( + agent_id TEXT PRIMARY KEY, + last_acked_seq INTEGER NOT NULL DEFAULT 0, -- Last successfully processed + updated_at_ms INTEGER NOT NULL +); + +-- Heartbeats table (separate from messages to avoid write contention) +-- One row per agent, upsert pattern +CREATE TABLE heartbeats ( + agent_id TEXT PRIMARY KEY, + ts_ms INTEGER NOT NULL, + status TEXT NOT NULL, -- idle, working, blocked + current_task TEXT, + progress REAL +); + +-- Task claims with lease expiry (prevents zombie claims) +CREATE TABLE task_claims ( + task_id TEXT PRIMARY KEY, + claimed_by TEXT NOT NULL, + claimed_at_ms INTEGER NOT NULL, + lease_until_ms INTEGER NOT NULL -- Must renew or release +); + +-- Schema versioning for migrations +CREATE TABLE meta ( + key TEXT PRIMARY KEY, + value TEXT NOT NULL +); +INSERT INTO meta (key, value) VALUES ('schema_version', '1'); + +-- Export tracking (not line count!) 
+CREATE TABLE export_state (
+    id INTEGER PRIMARY KEY CHECK (id = 1),
+    last_seq INTEGER NOT NULL DEFAULT 0
+);
+INSERT INTO export_state (id, last_seq) VALUES (1, 0);
+```
+
+### Connection Setup (REQUIRED)
+
+PRAGMAs must be set on every connection. **Each thread must have its own connection.**
+
+```nim
+import tiny_sqlite
+
+proc openBusDb*(dbPath: string): DbConn =
+  ## Open database with required PRAGMAs. One connection per thread.
+  result = openDatabase(dbPath)
+  result.exec("PRAGMA busy_timeout = 5000")
+  result.exec("PRAGMA foreign_keys = ON")
+  result.exec("PRAGMA journal_mode = WAL")
+  result.exec("PRAGMA synchronous = NORMAL")
+```
+
+**Critical**: Do NOT share `DbConn` across threads. Each thread opens its own connection.
+
+## Message Schema
+
+Same fields as before, stored in SQLite. Note that timestamps are epoch
+milliseconds (`ts_ms`), matching the schema — not ISO-8601 strings:
+
+```json
+{
+  "id": "uuid-v4",
+  "ts_ms": 1768053600000,
+  "from": "worker-auth",
+  "to": "orchestrator",
+  "type": "task_done",
+  "correlation_id": "skills-abc",
+  "payload": {
+    "branch": "worker-auth/skills-abc",
+    "commit": "a1b2c3d4"
+  }
+}
+```
+
+### Message Types
+
+**Task Lifecycle:**
+- `task_assign` - Orchestrator assigns task to worker
+- `task_claim` - Worker claims a task (first wins)
+- `task_progress` - Worker reports progress
+- `task_done` - Worker completed task
+- `task_failed` - Worker failed task
+- `task_blocked` - Worker blocked on dependency
+
+**Coordination:**
+- `heartbeat` - Liveness signal
+- `state_change` - Worker state transition
+- `review_request` - Request review of work
+- `review_result` - Review approved/rejected
+
+**System:**
+- `agent_started` - Agent came online
+- `agent_stopped` - Agent shutting down gracefully
+
+## Delivery Semantics
+
+**This layer provides AT-LEAST-ONCE delivery.**
+
+- Messages are durable once committed
+- Cursor only advances after successful processing
+- Handlers MUST be idempotent or deduplicate by `message.id`
+- Out-of-order processing is possible between unrelated messages
+
+## Write 
Protocol + +### Publishing Messages + +```python +import sqlite3 +import uuid +import json +import time + +def publish(db_path, agent_id, message): + msg_id = message.get('id') or str(uuid.uuid4()) + ts_ms = int(time.time() * 1000) # Epoch milliseconds + + # Handle large payloads + payload = message.get('payload') + payload_ref = None + if payload: + payload_json = json.dumps(payload) + if len(payload_json) > 4096: + payload_ref = save_blob(payload) # Atomic write (see Blob Storage) + payload = None + else: + payload = payload_json + + conn = get_connection(db_path) + try: + # Use plain BEGIN for simple inserts (less contention than IMMEDIATE) + # seq is auto-assigned by SQLite + conn.execute(""" + INSERT INTO messages (id, ts_ms, from_agent, to_agent, type, + correlation_id, in_reply_to, payload, payload_ref) + VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?) + """, ( + msg_id, + ts_ms, + agent_id, + message.get('to'), + message['type'], + message.get('correlation_id'), + message.get('in_reply_to'), + payload, + payload_ref + )) + conn.commit() + except sqlite3.IntegrityError: + # Duplicate id = already published (idempotent retry) + conn.rollback() + except Exception: + conn.rollback() + raise + finally: + conn.close() + + return msg_id +``` + +### Write Contention + +- SQLite WAL allows concurrent readers but **one writer at a time** +- `busy_timeout = 5000` handles short contention +- For high-frequency writes, add retry with exponential backoff +- Heartbeats use separate table to avoid contending with task messages + +## Read Protocol + +### Polling for New Messages (At-Least-Once) + +**CRITICAL**: Do NOT update cursor until message is successfully processed. + +```python +def poll(db_path, agent_id, limit=100): + """Fetch unacknowledged messages. 
Does NOT advance cursor.""" + conn = get_connection(db_path) + + # Get cursor position + cursor = conn.execute( + "SELECT last_acked_seq FROM cursors WHERE agent_id = ?", + (agent_id,) + ).fetchone() + last_seq = cursor['last_acked_seq'] if cursor else 0 + + # Fetch new messages for this agent (or broadcasts) + rows = conn.execute(""" + SELECT * FROM messages + WHERE seq > ? + AND (to_agent IS NULL OR to_agent = ?) + ORDER BY seq + LIMIT ? + """, (last_seq, agent_id, limit)).fetchall() + + conn.close() + + # Convert to dicts, resolve blob refs + messages = [] + for row in rows: + msg = dict(row) + if msg['payload_ref']: + try: + msg['payload'] = load_blob(msg['payload_ref']) + except FileNotFoundError: + msg['payload_error'] = 'blob_missing' + elif msg['payload']: + try: + msg['payload'] = json.loads(msg['payload']) + except json.JSONDecodeError: + msg['payload_error'] = 'decode_failed' + messages.append(msg) + + return messages + + +def ack(db_path, agent_id, seq): + """Acknowledge successful processing of message. Advances cursor.""" + conn = get_connection(db_path) + ts_ms = int(time.time() * 1000) + + # Only advance forward (monotonic) + conn.execute(""" + INSERT INTO cursors (agent_id, last_acked_seq, updated_at_ms) + VALUES (?, ?, ?) 
+ ON CONFLICT(agent_id) DO UPDATE SET + last_acked_seq = MAX(cursors.last_acked_seq, excluded.last_acked_seq), + updated_at_ms = excluded.updated_at_ms + """, (agent_id, seq, ts_ms)) + conn.commit() + conn.close() +``` + +### Processing Loop + +```python +def process_loop(db_path, agent_id, handler): + """Process messages with at-least-once semantics.""" + while running: + messages = poll(db_path, agent_id) + for msg in messages: + try: + handler(msg) + ack(db_path, agent_id, msg['seq']) # Only after success + except Exception as e: + # Don't ack - message will be redelivered + log.error(f"Failed to process {msg['id']}: {e}") + break # Stop batch on failure + + time.sleep(poll_interval) +``` + +### Adaptive Polling + +```python +POLL_ACTIVE = 0.2 # 200ms when working +POLL_IDLE = 2.0 # 2s when idle +POLL_BACKOFF = 1.5 # Multiplier on each empty poll + +poll_interval = POLL_ACTIVE + +while running: + messages = poll(db_path, my_id) + if messages: + process(messages) + poll_interval = POLL_ACTIVE # Reset to fast + else: + poll_interval = min(poll_interval * POLL_BACKOFF, POLL_IDLE) + + time.sleep(poll_interval) +``` + +## Heartbeat Protocol + +Heartbeats use a **separate table** with upsert to avoid contending with task messages. + +**CRITICAL**: Heartbeats MUST run in a dedicated thread to avoid false death detection during blocking operations (LLM calls can take 60s+). + +### Nim Implementation + +Nim threads don't share memory by default. 
Use channels for communication:
+
+```nim
+import std/[times, os]
+import tiny_sqlite
+
+# NOTE: Channel[T] is provided by the `system` module when compiled with
+# --threads:on. There is no `std/channels` import for it — the experimental
+# `threading/channels` package has a different, incompatible API.
+
+type
+  HeartbeatCmd* = enum
+    hbUpdateStatus, hbStop
+
+  HeartbeatMsg* = object
+    cmd*: HeartbeatCmd
+    status*: string
+    task*: string
+    progress*: float
+
+var heartbeatChan*: Channel[HeartbeatMsg]
+
+proc writeHeartbeat(db: DbConn, agentId, status, task: string, progress: float) =
+  # Read the clock once so the seconds and nanoseconds components come from
+  # the same instant (calling getTime() twice can straddle a second boundary).
+  let now = getTime()
+  let tsMs = now.toUnix * 1000 + now.nanosecond div 1_000_000
+  db.exec("""
+    INSERT INTO heartbeats (agent_id, ts_ms, status, current_task, progress)
+    VALUES (?, ?, ?, ?, ?)
+    ON CONFLICT(agent_id) DO UPDATE SET
+      ts_ms = excluded.ts_ms,
+      status = excluded.status,
+      current_task = excluded.current_task,
+      progress = excluded.progress
+  """, agentId, tsMs, status, task, progress)
+
+proc heartbeatWorker*(args: tuple[dbPath, agentId: string]) {.thread.} =
+  ## Dedicated heartbeat thread - owns its own DB connection
+  let db = openBusDb(args.dbPath)
+  var status = "idle"
+  var task = ""
+  var progress = 0.0
+
+  while true:
+    # Check for commands (non-blocking)
+    let tried = heartbeatChan.tryRecv()
+    if tried.dataAvailable:
+      case tried.msg.cmd
+      of hbStop:
+        db.close()
+        return
+      of hbUpdateStatus:
+        status = tried.msg.status
+        task = tried.msg.task
+        progress = tried.msg.progress
+
+    # Write heartbeat
+    try:
+      writeHeartbeat(db, args.agentId, status, task, progress)
+    except CatchableError:
+      discard  # Log but don't crash
+
+    sleep(10_000)  # 10 seconds (note: a stop command may wait up to one cycle)
+
+# Usage from main thread:
+proc startHeartbeat*(dbPath, agentId: string): Thread[tuple[dbPath, agentId: string]] =
+  heartbeatChan.open()
+  createThread(result, heartbeatWorker, (dbPath, agentId))
+
+proc updateHeartbeatStatus*(status, task: string, progress: float) =
+  heartbeatChan.send(HeartbeatMsg(cmd: hbUpdateStatus, status: status, task: task, progress: progress))
+
+proc stopHeartbeat*() =
+  heartbeatChan.send(HeartbeatMsg(cmd: hbStop))
+```
+
+### Heartbeat Rules
+
+1. Emit every 10 seconds via background thread
+2. 
Include current status: `idle`, `working`, `blocked` +3. Stale thresholds: 30s = WARN, 100s = STALE, 5min = DEAD + +### Liveness Query + +```sql +-- Find suspected dead agents (no heartbeat in 30s) +-- Uses epoch ms for reliable comparison +SELECT agent_id, ts_ms, + (strftime('%s', 'now') * 1000 - ts_ms) / 1000 AS age_seconds +FROM heartbeats +WHERE ts_ms < (strftime('%s', 'now') * 1000 - 30000); +``` + +## Task Claim Pattern + +Task claims use a **dedicated table with leases** to prevent zombie claims when agents die. + +```python +LEASE_DURATION_MS = 60000 # 60 seconds + +def try_claim_task(db_path, task_id, agent_id): + """Attempt to claim a task. Returns True if successful.""" + conn = get_connection(db_path) + ts_ms = int(time.time() * 1000) + lease_until = ts_ms + LEASE_DURATION_MS + + try: + conn.execute("BEGIN IMMEDIATE") + + # Check for existing valid claim + existing = conn.execute(""" + SELECT claimed_by, lease_until_ms FROM task_claims + WHERE task_id = ? + """, (task_id,)).fetchone() + + if existing: + if existing['lease_until_ms'] > ts_ms: + # Valid claim exists + conn.rollback() + return False + else: + # Expired claim - delete and reclaim + conn.execute("DELETE FROM task_claims WHERE task_id = ?", (task_id,)) + + # Insert new claim + conn.execute(""" + INSERT INTO task_claims (task_id, claimed_by, claimed_at_ms, lease_until_ms) + VALUES (?, ?, ?, ?) + """, (task_id, agent_id, ts_ms, lease_until)) + + conn.commit() + return True + + except sqlite3.OperationalError: + # Lock contention - another agent got there first + conn.rollback() + return False + finally: + conn.close() + + +def renew_claim(db_path, task_id, agent_id): + """Renew lease on a claimed task. Call periodically while working.""" + conn = get_connection(db_path) + ts_ms = int(time.time() * 1000) + lease_until = ts_ms + LEASE_DURATION_MS + + result = conn.execute(""" + UPDATE task_claims + SET lease_until_ms = ? + WHERE task_id = ? AND claimed_by = ? 
+ """, (lease_until, task_id, agent_id)) + conn.commit() + updated = result.rowcount > 0 + conn.close() + return updated + + +def release_claim(db_path, task_id, agent_id): + """Release a claim when done (success or failure).""" + conn = get_connection(db_path) + conn.execute(""" + DELETE FROM task_claims + WHERE task_id = ? AND claimed_by = ? + """, (task_id, agent_id)) + conn.commit() + conn.close() +``` + +### Claim Lifecycle + +1. Agent calls `try_claim_task()` - gets exclusive claim with 60s lease +2. Agent periodically calls `renew_claim()` (every 30s) while working +3. On completion, agent calls `release_claim()` +4. If agent dies, lease expires and task becomes claimable again + +## JSONL Export for Debugging + +Background process or post-commit hook exports to JSONL. + +**CRITICAL**: Track last exported seq in database, NOT by counting lines. Seq values can have gaps from rollbacks. + +```python +def export_to_jsonl(db_path, jsonl_path): + """Export new messages to JSONL. Idempotent, crash-safe.""" + conn = get_connection(db_path) + + # Get last exported seq from database + row = conn.execute( + "SELECT last_seq FROM export_state WHERE id = 1" + ).fetchone() + last_exported = row['last_seq'] if row else 0 + + # Export new messages + rows = conn.execute( + "SELECT * FROM messages WHERE seq > ? ORDER BY seq", + (last_exported,) + ).fetchall() + + if not rows: + conn.close() + return + + max_seq = last_exported + with open(jsonl_path, 'a') as f: + for row in rows: + msg = dict(row) + # Optionally resolve blob refs for full content + f.write(json.dumps(msg) + '\n') + max_seq = max(max_seq, msg['seq']) + + # Update export state (atomic with file write visible) + conn.execute(""" + UPDATE export_state SET last_seq = ? WHERE id = 1 + """, (max_seq,)) + conn.commit() + conn.close() +``` + +This gives you `tail -f bus.jsonl` for debugging while SQLite remains source of truth. + +**Note**: JSONL contains blob references, not resolved payloads. 
For full content, use SQLite queries. + +## Blob Storage + +Content-addressable storage for large payloads. Uses **atomic write** pattern (temp + rename) to prevent partial reads. + +```python +import tempfile +import hashlib + +def save_blob(content): + """Atomically write blob. Returns ref or raises.""" + data = json.dumps(content).encode() + hash_hex = hashlib.sha256(data).hexdigest() + ref = f"sha256-{hash_hex}" + path = f".worker-state/blobs/{ref}" + + if os.path.exists(path): + return ref # Already exists (content-addressable = idempotent) + + # Atomic write: temp file + fsync + rename + os.makedirs(os.path.dirname(path), exist_ok=True) + fd, tmp_path = tempfile.mkstemp(dir=os.path.dirname(path)) + try: + with os.fdopen(fd, 'wb') as f: + f.write(data) + f.flush() + os.fsync(f.fileno()) + os.replace(tmp_path, path) # Atomic on POSIX + except: + if os.path.exists(tmp_path): + os.unlink(tmp_path) + raise + + return ref + + +def load_blob(ref): + """Load blob content. Raises FileNotFoundError if missing.""" + path = f".worker-state/blobs/{ref}" + with open(path, 'rb') as f: + return json.load(f) +``` + +## Phased Implementation + +### Phase 0: MVP +- SQLite with WAL mode +- Basic publish/poll +- Heartbeats every 10s +- Simple polling (500ms) + +### Phase 1: Robustness +- Task claim with transactions +- JSONL export for debugging +- Blob storage for large payloads +- Dead agent detection + +### Phase 2: Performance +- FTS5 for full-text search +- Message compaction/archival +- Adaptive polling + +### Phase 3: Scale (if ever) +- Read replicas +- Sharding by correlation_id +- Background workers for maintenance + +## Integration Points + +### With Worker State Machine (skills-4oj) + +State transitions publish messages: +```python +publish(db, worker_id, { + 'type': 'state_change', + 'payload': {'from': 'WORKING', 'to': 'IN_REVIEW'} +}) +``` + +### With review-gate (skills-byq) + +Review requests/results via messages: +```python +publish(db, worker_id, { + 'type': 
'review_request',
+    'correlation_id': task_id
+})
+```
+
+### With Observability (skills-yak)
+
+Status command queries SQLite directly. Heartbeats live in the separate
+`heartbeats` table, so they are not queried from `messages`:
+
+```sql
+-- Recent state transitions
+SELECT from_agent, type, ts_ms, payload
+FROM messages
+WHERE type = 'state_change'
+ORDER BY seq DESC LIMIT 100;
+
+-- Current liveness, one row per agent
+SELECT agent_id, status, current_task, progress, ts_ms
+FROM heartbeats;
+```
+
+## Open Questions
+
+1. ~~**Database location**~~: `.worker-state/bus.db` (resolved)
+2. **Export frequency**: On-commit hook vs periodic background? (defer to implementation)
+3. **Message retention**: Archive messages older than min cursor? Vacuum schedule?
+4. ~~**Schema migrations**~~: `meta.schema_version` table added; agents check on startup
+5. **Security**: All agents currently trusted; add HMAC signing if isolation needed?
+6. **Backpressure**: How to slow producers when consumers overwhelmed?
+
+## Resolved from Spec Review
+
+| Issue | Resolution |
+|-------|------------|
+| `seq` not auto-increment | Changed to `INTEGER PRIMARY KEY AUTOINCREMENT` |
+| At-most-once delivery | Cursor now advanced only after ack |
+| Heartbeats in messages table | Separate `heartbeats` table with upsert |
+| Heartbeats block during LLM | `HeartbeatThread` runs independently |
+| JSONL line-count broken | `export_state` table tracks last seq |
+| Timestamp format unreliable | Changed to epoch milliseconds |
+| Zombie task claims | `task_claims` table with lease expiry |
+| Blob writes not atomic | Temp file + fsync + rename pattern |
+
+## Nim Implementation Notes
+
+### Dependencies
+
+```nim
+# nimble install
+requires "tiny_sqlite >= 0.2.0"
+requires "jsony >= 1.1.0"  # Optional, faster JSON
+```
+
+### Build Configuration
+
+NimScript switches go in `config.nims` (or a `.cfg` file); the `{.passC.}` and
+`{.compile.}` pragmas are not NimScript and must live in the Nim source file:
+
+```nim
+# config.nims
+--mm:orc
+--threads:on
+-d:release
+```
+
+```nim
+# In the .nim source — static SQLite for a single binary:
+{.passC: "-DSQLITE_THREADSAFE=1".}
+{.compile: "libs/sqlite3.c".}
+```
+
+### Project Structure
+
+```
+src/
+├── libs/sqlite3.c       # SQLite amalgamation
+├── db_layer.nim         # Connection setup, PRAGMAs
+├── bus.nim              # Publish, poll, ack
+├── heartbeat.nim        # Dedicated 
thread + channel +├── claims.nim # Task claim with leases +└── export.nim # JSONL export +``` + +### Key Patterns + +1. **One connection per thread** - never share `DbConn` +2. **Channels for control** - stop signals, status updates +3. **Short transactions** - BEGIN IMMEDIATE, do work, COMMIT quickly +4. **Exceptions for errors** - catch `DbError`, convert to meaningful messages + +## References + +- Beads: https://github.com/steveyegge/beads +- Tissue: https://github.com/evil-mind-evil-sword/tissue +- SQLite WAL mode: https://sqlite.org/wal.html +- SQLite BEGIN IMMEDIATE: https://sqlite.org/lang_transaction.html +- tiny_sqlite: https://github.com/GULPF/tiny_sqlite diff --git a/docs/design/multi-agent-footguns-and-patterns.md b/docs/design/multi-agent-footguns-and-patterns.md new file mode 100644 index 0000000..b57fe4a --- /dev/null +++ b/docs/design/multi-agent-footguns-and-patterns.md @@ -0,0 +1,219 @@ +# Multi-Agent Footguns, Patterns, and Emerging Ideas + +**Status**: Research synthesis +**Date**: 2026-01-10 +**Sources**: HN discussions, Reddit, practitioner blogs, orch consensus + +## Footguns: Lessons Learned the Hard Way + +### Git & Branch Chaos + +| Footgun | Description | Mitigation | +|---------|-------------|------------| +| **Force-resolve conflicts** | Agents rebase improperly, rewrite history, break CI | No direct git access for agents; orchestrator owns git operations | +| **Stale branches** | Agent works on outdated branch for hours | Frequent auto-rebase; version check before major edits | +| **Recovery nightmare** | Broken git state is hard to recover | Git bundles for checkpoints (SkillFS pattern); worktree isolation | +| **Branch naming confusion** | `worker-id/task-id` becomes misleading on reassignment | Use `type/task-id`; worker identity in commit author | + +### State & Database Issues + +| Footgun | Description | Mitigation | +|---------|-------------|------------| +| **Shared DB pollution** | Agents debugging against mutated state, 
heisenbugs | Ephemeral namespaced DBs per branch; schema prefixes | +| **Port conflicts** | Multiple web servers on same port | Auto-increment ports; orchestrator manages allocation | +| **Service duplication** | 10 agents need 10 PostgreSQL/Redis instances | Container-per-worktree; or accept serialization | +| **Feature flag races** | Agents toggle flags in parallel | Namespace flags per agent/branch | + +### Coordination Failures + +| Footgun | Description | Mitigation | +|---------|-------------|------------| +| **State divergence** | Each agent has different snapshot of reality | Single source of truth artifact; frequent rebase | +| **Silent duplication** | Two agents "fix" same bug differently | Central task ledger with explicit states; idempotent task IDs | +| **Dependency deadlocks** | A waits on B waits on A | Event-driven async; bounded time limits; no sync waits | +| **Role collapse** | Planner writes code; tester refactors | Narrow role boundaries; tool-level constraints | + +### Human Bottlenecks + +| Footgun | Description | Mitigation | +|---------|-------------|------------| +| **Review overload** | 10 agents = 10 partial PRs to reconcile | Review funnel: worker → arbiter agent → single synthesized PR | +| **Context switching** | Human juggling parallel agent outputs | Size limits per PR; "one story per PR" | +| **Morale drain** | Endless nit-picking, people disable agents | Pre-review by lint/style agents; humans see substantive deltas only | + +### Agent-Specific Issues + +| Footgun | Description | Mitigation | +|---------|-------------|------------| +| **Hallucinated packages** | 30% of suggested packages don't exist | Validate imports against known registries | +| **Temporary fixes** | Works in session, breaks in Docker | Require full env rebuild as acceptance test | +| **Skill atrophy** | Developers can't code without AI | Deliberate practice; understand what AI generates | +| **Test/impl conspiracy** | Brittle tests + brittle code pass together 
| Separate spec tests from impl tests; mutation testing | + +### Resource & Cost Issues + +| Footgun | Description | Mitigation | +|---------|-------------|------------| +| **Token blowups** | Parallel agents saturate context/API limits | Hard budgets per agent; limit context sizes | +| **Credit drain** | AI fixing its own mistakes in loops | Circuit breakers; attempt limits | +| **Timeout misreads** | Rate limits interpreted as semantic failures | Structured error channels; retry with idempotency | + +## Emerging Patterns (2026) + +### The "Rule of 4" + +Research shows effective team sizes limited to ~3-4 agents. Beyond this, communication overhead grows super-linearly (exponent 1.724). Cost of coordination outpaces value. + +**Implication**: Don't build 10-agent swarms. Build 3-4 specialized agents with clear boundaries. + +### Spec-Driven Development + +Adopted by Kiro, Tessl, GitHub Spec Kit: +- `requirements.md` - what to build +- `design.md` - how to build it +- `tasks.md` - decomposed work items + +Agents work from specs, not vague prompts. Specs are versioned; agents echo which version they used. + +### Layered Coordination (Not Monolithic) + +Instead of one complex orchestrator, compose independent layers: +1. Configuration management +2. Issue tracking (JSONL, merge-friendly) +3. Atomic locking (PostgreSQL advisory locks) +4. Filesystem isolation (git worktrees) +5. Validation gates +6. Enforcement rules +7. Session protocols + +Each layer independently useful; failures isolated. + +### PostgreSQL Advisory Locks for Claims + +Novel insight: Advisory locks auto-release on crash (no orphaned locks), operate in ~1ms, no table writes. Elegant solution for distributed claim races. + +```sql +SELECT pg_try_advisory_lock(task_id_hash); +-- Work... +SELECT pg_advisory_unlock(task_id_hash); +-- Or: connection dies → auto-released +``` + +### Git Bundles for Checkpoints (SkillFS) + +Every agent sandbox is a git repo. Session ends → git bundle stored. 
New session → restore from bundle, continue where left off. Complete audit trail via `git log`. + +### Hierarchical Over Flat Swarms + +Instead of 100-agent flat swarms: +- Nested coordination structures +- Partition the communication graph +- Supervisor per sub-team +- Only supervisors talk to each other + +### Plan-and-Execute Cost Pattern + +Expensive model creates strategy; cheap models execute steps. Can reduce costs by 90%. + +``` +Orchestrator (Claude Opus) → Plan +Workers (Claude Haiku) → Execute steps +Reviewer (Claude Sonnet) → Validate +``` + +### Bounded Autonomy Spectrum + +Progressive autonomy based on risk: +1. **Human in the loop** - approve each action +2. **Human on the loop** - monitor, intervene if needed +3. **Human out of the loop** - fully autonomous + +Match to task complexity and outcome criticality. + +## Best Practices Synthesis + +### From HN Discussions + +1. **Well-scoped tasks with tight contracts** - Not vague prompts +2. **Automated testing gates** - Agents must pass before review +3. **2-3 agents realistic** - Not 10 parallel +4. **Exclusive ownership per module** - One writer per concern +5. **Short-lived branches** - Frequent merge to prevent drift + +### From orch Consensus + +1. **Treat agents as untrusted workers** - Not peers with full access +2. **Machine-readable contracts** - JSON schema between roles +3. **Per-agent logs with correlation IDs** - Distributed systems observability +4. **Guardrail agents** - Security/policy checks on every diff +5. **Versioned task specs** - Bump version → re-run affected agents + +### From Practitioner Blogs + +1. **Coordination ≠ isolation** - Advisory locks (who works on what) + worktrees (how they work) +2. **JSONL for issues** - One per line, deterministic merge rules +3. **Session protocols** - Explicit start/close procedures +4. 
**Modular rules with includes** - Template configuration + +## How This Applies to Our Design + +### Already Covered + +| Pattern | Our Design | +|---------|------------| +| SQLite for coordination | ✅ bus.db with transactions | +| Git worktrees | ✅ branch-per-worker.md | +| State machine | ✅ worker-state-machine.md | +| Heartbeats/liveness | ✅ 10s interval in message-passing | +| Claim-check pattern | ✅ SQLite transactions | +| Task serialization | ✅ No uncommitted dependencies | + +### Should Add + +| Pattern | Gap | Action | +|---------|-----|--------| +| Spec-driven tasks | Tasks are just titles | Add structured task specs (requirements, design, acceptance) | +| Role boundaries | Not enforced | Add tool-level constraints per agent type | +| Review funnel | Missing arbiter | Add synthesis step before human review | +| Versioned specs | Not tracked | Add version field to task assignments | +| Cost budgets | Not implemented | Add token/time budgets per agent | +| Correlation IDs | Partial (correlation_id) | Ensure end-to-end tracing | + +### Validate Our Decisions + +| Decision | Validation | +|----------|------------| +| SQLite over JSONL | ✅ Confirmed - JSONL for issues only, SQLite for coordination | +| Orchestrator creates branches | ✅ Confirmed - reduces agent setup, enforces policy | +| 3-4 agents max | ✅ Aligns with "Rule of 4" research | +| Mandatory rebase | ✅ Confirmed - prevents stale branch drift | +| Escalate semantic conflicts | ✅ Confirmed - agents hallucinate resolutions | + +## Open Questions Surfaced + +1. **PostgreSQL advisory locks vs SQLite?** - Do we need Postgres, or is SQLite sufficient? +2. **Git bundles for checkpoints?** - Should we adopt SkillFS pattern? +3. **Spec files per task?** - How structured should task specs be? +4. **Arbiter/synthesis agent?** - Add to architecture before human review? +5. **Token budgets?** - How to enforce across different agent types? 
+ +## Sources + +### HN Discussions +- [Superset: 10 Parallel Coding Agents](https://news.ycombinator.com/item?id=46368739) +- [Desktop App for Parallel Agentic Dev](https://news.ycombinator.com/item?id=46027947) +- [SkillFS: Git-backed Sandboxes](https://news.ycombinator.com/item?id=46543093) +- [Zenflow: Agent Orchestration](https://news.ycombinator.com/item?id=46290617) +- [Git Worktree for Parallel Dev](https://news.ycombinator.com/item?id=46510462) + +### Blogs & Articles +- [Building a Multi-Agent Development Workflow](https://itsgg.com/blog/2026/01/08/building-a-multi-agent-development-workflow/) +- [The Real Struggle with AI Coding Agents](https://www.smiansh.com/blogs/the-real-struggle-with-ai-coding-agents-and-how-to-overcome-it/) +- [Why AI Coding Tools Don't Work For Me](https://blog.miguelgrinberg.com/post/why-generative-ai-coding-tools-and-agents-do-not-work-for-me) +- [Microsoft: Multi-Agent Systems at Scale](https://devblogs.microsoft.com/ise/multi-agent-systems-at-scale/) +- [LangChain: How and When to Build Multi-Agent](https://blog.langchain.com/how-and-when-to-build-multi-agent-systems/) + +### Research & Analysis +- [VentureBeat: More Agents Isn't Better](https://venturebeat.com/orchestration/research-shows-more-agents-isnt-a-reliable-path-to-better-enterprise-ai) +- [Deloitte: AI Agent Orchestration](https://www.deloitte.com/us/en/insights/industry/technology/technology-media-and-telecom-predictions/2026/ai-agent-orchestration.html) +- [10 Things Developers Want from Agentic IDEs](https://redmonk.com/kholterhoff/2025/12/22/10-things-developers-want-from-their-agentic-ides-in-2025/) diff --git a/docs/design/mvp-scope.md b/docs/design/mvp-scope.md new file mode 100644 index 0000000..fefb3b0 --- /dev/null +++ b/docs/design/mvp-scope.md @@ -0,0 +1,277 @@ +# Multi-Agent MVP Scope + +**Status**: Draft (v3 - Nim implementation) +**Goal**: Define the minimal viable set of primitives to run 2-3 worker agents coordinated by a human-attended 
orchestrator. +**Language**: Nim (ORC, cligen, tiny_sqlite) + +## Changelog + +- **v3**: Nim implementation decision (single binary, fast startup, compiled) +- **v2**: Fixed BLOCKERs from orch spec-review (resolved open questions, added rejection workflow, added failure scenarios) + +## Current Design Docs + +| Doc | Status | Content | +|-----|--------|---------| +| `worker-state-machine.md` | ✅ Complete | 8 states, transitions, file schema | +| `message-passing-layer.md` | ✅ v4 | SQLite bus, Nim heartbeat thread, channels | +| `worker-cli-primitives.md` | ✅ v3 | Nim CLI with cligen, state machine | +| `human-observability.md` | ✅ Complete | Status dashboard, watch mode, stale detection | +| `branch-per-worker.md` | ✅ Complete | Worktrees, integration branch, rebase protocol | +| `multi-agent-footguns-and-patterns.md` | ✅ Complete | Research synthesis, validated decisions | +| `message-passing-comparison.md` | ✅ Complete | Beads/Tissue comparison, SQLite rationale | + +## Task Triage for MVP + +### Tier 1: Essential for MVP (Must Have) + +These are required to run the basic loop: assign → work → review → merge. 
+ +| Bead | Task | Rationale | +|------|------|-----------| +| `skills-sse` | Worker CLI commands | Core interface for orchestrator and agents | +| `skills-4oj` | Worker state machine | Already designed, need implementation | +| `skills-ms5` | Message passing layer | Already designed (SQLite), need implementation | +| `skills-roq` | Branch-per-worker isolation | Already designed, need implementation | +| `skills-byq` | Integrate review-gate | Have review-gate, need to wire to worker flow | +| `skills-yak` | Human observability (status) | Human needs to see what's happening | +| **NEW** | Agent system prompt | LLM needs tool definitions for worker commands | + +### Tier 2: Important but Can Defer + +| Bead | Task | Why Defer | +|------|------|-----------| +| `skills-0y9` | Structured task specs | Can start with simple task descriptions | +| `skills-4a2` | Role boundaries | Trust-based initially, add constraints later | +| `skills-31y` | Review funnel/arbiter | Works with 2-3 agents; needed at scale | +| `skills-zf6` | Evidence artifacts | Can use simple JSON initially | +| `skills-1jc` | Stuck detection | Monitor manually first (stale detection in MVP) | + +### Tier 3: Nice to Have (Post-MVP) + +| Bead | Task | Why Later | +|------|------|-----------| +| `skills-1qz` | Token budgets | Manual monitoring first | +| `skills-5ji` | Ephemeral namespaced envs | Single-project MVP | +| `skills-7n4` | Rollback strategy | Manual rollback first | +| `skills-8ak` | Git bundle checkpoints | Worktrees sufficient | +| `skills-r62` | Role + Veto pattern | Simple approve/reject first | +| `skills-udu` | Cross-agent compatibility | Single agent type first (Claude) | +| `skills-sh6` | OpenHands research | Research complete | +| `skills-yc6` | Document findings | Research captured | + +## MVP Feature Set + +### Commands + +```bash +# Orchestrator commands (human runs) +worker spawn [--description "..."] # Create branch, worktree, assign task +worker status [--watch] # Dashboard of all 
workers +worker approve # IN_REVIEW → APPROVED +worker request-changes # IN_REVIEW → WORKING (rejection) +worker merge # APPROVED → COMPLETED +worker cancel # * → FAILED (abort) + +# Worker commands (agent runs from worktree) +worker start # ASSIGNED → WORKING +worker done [--skip-rebase] # WORKING → IN_REVIEW (includes rebase) +worker heartbeat # Liveness signal (via background thread) +worker fail # WORKING → FAILED +``` + +### Data Flow + +``` +HAPPY PATH: + +1. Human: worker spawn skills-abc + → Creates feat/skills-abc branch + → Creates worktrees/skills-abc + → Publishes task_assign message + → State: ASSIGNED + +2. Agent: worker start + → Publishes state_change (ASSIGNED → WORKING) + → Starts HeartbeatThread (background) + → Begins work + +3. Agent: worker done + → Runs git rebase origin/integration + → Pushes branch + → Publishes review_request + → State: IN_REVIEW + +4. Human: worker approve skills-abc + → Publishes review_approved + → State: APPROVED + +5. Human: worker merge skills-abc + → Merges to integration (retry loop for contention) + → Cleans up branch/worktree + → State: COMPLETED + +REJECTION PATH: + +4b. Human: worker request-changes skills-abc "Fix error handling" + → Publishes changes_requested + → State: WORKING + → Agent resumes work, returns to step 3 + +CONFLICT PATH: + +3b. 
Agent: worker done (rebase fails) + → Rebase conflict detected, left in progress + → State: CONFLICTED + → Agent resolves conflicts, runs: git rebase --continue + → Agent: worker done --skip-rebase + → State: IN_REVIEW +``` + +### Directory Structure + +``` +project/ +├── .worker-state/ +│ ├── bus.db # SQLite message bus (source of truth) +│ ├── bus.jsonl # Debug export (derived) +│ ├── blobs/ # Large payloads (content-addressable) +│ └── workers/ +│ └── skills-abc.json # Worker state cache (derived from DB) +├── worktrees/ # Git worktrees (gitignored) +│ └── skills-abc/ +│ └── .worker-ctx.json # Static context for this worker +└── .git/ +``` + +## Implementation Order + +### Prerequisites + +```bash +# Nim dependencies +nimble install tiny_sqlite cligen jsony +``` + +Download SQLite amalgamation for static linking: +```bash +curl -O https://sqlite.org/2024/sqlite-amalgamation-3450000.zip +unzip sqlite-amalgamation-3450000.zip +cp sqlite-amalgamation-*/sqlite3.c src/libs/ +``` + +### Build Steps + +1. **Project setup** + - Create `src/worker.nimble` with dependencies + - Create `src/config.nims` with build flags (--mm:orc, --threads:on) + - Set up static SQLite compilation + +2. **Message bus** (`skills-ms5`) + - `src/worker/db.nim` - SQLite schema, connection setup + - `src/worker/bus.nim` - publish/poll/ack functions + - Dedicated heartbeat thread with channels + +3. **Worker state** (`skills-4oj`) + - `src/worker/state.nim` - State enum, transition guards + - `src/worker/types.nim` - Shared types + - Compare-and-set with BEGIN IMMEDIATE + +4. **Branch primitives** (`skills-roq`) + - `src/worker/git.nim` - Worktree create/remove (osproc) + - Rebase with conflict detection + - Merge with retry loop + +5. **CLI commands** (`skills-sse`) + - `src/worker.nim` - cligen dispatchMulti + - All subcommands: spawn, status, start, done, approve, merge, cancel + - Background heartbeat thread + +6. 
**review-gate integration** (`skills-byq`) + - review-gate calls `worker approve` / `worker request-changes` + - Stop hook checks worker state from bus.db + +7. **Status dashboard** (`skills-yak`) + - `worker status` with table output + - Stale detection from heartbeats table + - `--watch` mode for real-time updates + +8. **Agent system prompt** + - Tool definitions for worker commands + - Context about worktree location, task description + - Instructions for heartbeat, done, conflict handling + +### Compilation + +```bash +nim c -d:release --mm:orc --threads:on src/worker.nim +# Output: single static binary ~2-3MB +``` + +## Success Criteria + +MVP is complete when: + +### Happy Path +1. [ ] Can spawn a worker with `worker spawn ` +2. [ ] Worker appears in `worker status` dashboard with state and heartbeat +3. [ ] Agent can signal `worker start` and `worker done` +4. [ ] Heartbeats track agent liveness (stale detection after 30s) +5. [ ] `worker approve` transitions to APPROVED +6. [ ] `worker merge` completes the cycle +7. [ ] All state persists across session restarts + +### Failure Scenarios +8. [ ] Rebase conflict detected → state CONFLICTED, rebase left in progress +9. [ ] Agent timeout (no heartbeat 2+ min) → status shows STALE warning +10. [ ] `worker request-changes` returns to WORKING with feedback +11. [ ] `worker cancel` aborts any state → FAILED +12. 
[ ] Concurrent merge attempts handled (retry loop succeeds) + +## Non-Goals for MVP + +- Multiple orchestrators +- Cross-machine coordination +- Automatic conflict resolution (human intervenes) +- Token budgeting +- Structured task specs (simple descriptions) +- Arbiter agents +- Database isolation per worker + +## Resolved Questions + +| Question | Decision | Rationale | +|----------|----------|-----------| +| Language | **Nim** | Single binary, fast startup, compiled, Python-like syntax | +| CLI framework | **cligen** | Auto-generates from proc signatures | +| SQLite wrapper | **tiny_sqlite** | Better than stdlib, RAII, prepared statements | +| Memory management | **ORC** | Handles cycles, deterministic destruction | +| Static linking | **SQLite amalgamation** | Single binary, no system dependencies | +| Source of truth | **SQLite only** | JSON files are derived caches; DB is authoritative | +| Heartbeat | **Dedicated thread + channels** | Nim threads don't share memory | +| Integration branch | **Require exists** | Human creates `integration` before first spawn | +| review-gate | **Calls worker CLI** | `review-gate approve` → `worker approve` | +| STALE state | **Computed for display** | Not a persistent state; derived from heartbeat age | + +## Nim Dependencies + +| Package | Purpose | +|---------|---------| +| `tiny_sqlite` | SQLite wrapper with RAII | +| `cligen` | CLI subcommand generation | +| `jsony` | Fast JSON parsing (optional) | +| stdlib `osproc` | Git subprocess operations | +| stdlib `channels` | Thread communication | +| stdlib `times` | Epoch timestamps | + +## Spec Review Resolution + +| Issue | Resolution | +|-------|------------| +| Missing rejection workflow | Added `request-changes` command and path in data flow | +| Agent system prompt missing | Added to Tier 1 implementation order | +| Source of truth confusion | Clarified SQLite primary, JSON derived | +| Test scenarios missing | Added failure scenarios 8-12 to success criteria | +| 
Heartbeat mechanism | Dedicated thread with own SQLite connection | +| Review-gate integration | Clarified review-gate calls worker CLI | +| Language choice | Nim for single binary, fast startup | diff --git a/docs/design/worker-cli-primitives.md b/docs/design/worker-cli-primitives.md new file mode 100644 index 0000000..4bd4d55 --- /dev/null +++ b/docs/design/worker-cli-primitives.md @@ -0,0 +1,567 @@ +# Worker CLI Primitives Design + +**Status**: Draft (v3 - Nim implementation) +**Bead**: skills-sse +**Epic**: skills-s6y (Multi-agent orchestration: Lego brick architecture) +**Language**: Nim (cligen, ORC) + +## Changelog + +- **v3**: Nim implementation (cligen, tiny_sqlite, osproc) +- **v2**: Fixed BLOCKERs from orch spec-review (approve command, conflict flow, source of truth, races, STALE semantics) + +## Overview + +This document defines the CLI commands for multi-agent worker coordination. The CLI is the interface between human orchestrators, AI agents, and the underlying coordination infrastructure (SQLite bus, git worktrees, state machine). 
+ +## Design Decisions (from orch consensus) + +### Strong Consensus (4/4 models) + +| Decision | Choice | Rationale | +|----------|--------|-----------| +| Spawn semantics | Prepare workspace only | Separation of concerns; debug before execute | +| Implementation | **Nim CLI** | Single binary, fast startup, compiled | +| CLI framework | **cligen** | Auto-generates from proc signatures | +| Idempotency | All commands idempotent | Crash recovery, safe retries | +| Worker ID = Task ID | Yes, 1:1 mapping | Simplifies mental model; add attempt IDs later if needed | +| State source of truth | SQLite only | All state reads/writes via DB; JSON files are derived caches | +| STALE detection | Computed for display | STALE is not a persistent state; computed from heartbeat age | + +### Split Decision: Agent Discovery + +| Approach | Advocates | Trade-off | +|----------|-----------|-----------| +| Local file in worktree | Gemini, GPT | `cd worktree && cmd` just works; no args needed | +| Explicit CLI arg | Qwen, Sonar | More explicit; no "magic" context | + +**Decision**: Hybrid - local file for context, but allow explicit `--task` override. + +### Split Decision: Heartbeat Mechanism + +| Approach | Advocates | Trade-off | +|----------|-----------|-----------| +| Wrapper/supervisor | Gemini, GPT | Reliable; agent doesn't need to remember | +| Explicit call | Qwen, Sonar | Simple; observable; agent controls timing | + +**Decision**: Start with explicit call; add wrapper as optional enhancement. 
+ +## Command Structure + +``` +worker [options] +``` + +### Orchestrator Commands (Human Runs) + +| Command | Purpose | State Transition | +|---------|---------|------------------| +| `worker spawn ` | Create workspace | → ASSIGNED | +| `worker status` | Dashboard of all workers | (read-only) | +| `worker approve ` | Approve completed work | IN_REVIEW → APPROVED | +| `worker request-changes ` | Request revisions | IN_REVIEW → WORKING | +| `worker merge ` | Merge approved work | APPROVED → COMPLETED | +| `worker cancel ` | Abort and cleanup (state only) | * → FAILED | + +### Agent Commands (Worker Runs) + +| Command | Purpose | State Transition | +|---------|---------|------------------| +| `worker start` | Begin work | ASSIGNED → WORKING | +| `worker done` | Complete work (includes rebase) | WORKING → IN_REVIEW | +| `worker heartbeat` | Liveness signal | (no transition) | +| `worker fail ` | Signal failure | WORKING → FAILED | + +## Command Specifications + +### `worker spawn ` + +Create a new worker workspace for a task. + +```bash +worker spawn skills-abc [--description "Fix auth bug"] [--from integration] +``` + +**Actions**: +1. Check task doesn't already exist (or return existing if idempotent) +2. Create branch `feat/` from `--from` (default: `origin/integration`) +3. Create worktree at `worktrees//` +4. Write context file `worktrees//.worker-ctx.json` +5. Insert `task_assign` message into bus.db +6. 
Create state file `.worker-state/workers/.json` + +**Context File** (`.worker-ctx.json`): +```json +{ + "task_id": "skills-abc", + "branch": "feat/skills-abc", + "worktree": "worktrees/skills-abc", + "created_at": "2026-01-10T15:00:00Z", + "description": "Fix auth bug" +} +``` + +**State File** (`.worker-state/workers/.json`): +```json +{ + "task_id": "skills-abc", + "state": "ASSIGNED", + "branch": "feat/skills-abc", + "assigned_at": "2026-01-10T15:00:00Z", + "state_changed_at": "2026-01-10T15:00:00Z" +} +``` + +**Idempotency**: If task exists and worktree exists, return success with existing path. + +**Output**: +``` +Created worker: skills-abc + Branch: feat/skills-abc + Worktree: worktrees/skills-abc + State: ASSIGNED +``` + +### `worker status` + +Show dashboard of all workers. + +```bash +worker status [--state working] [--stale] [--json] +``` + +**Output**: +``` +TASK STATE BRANCH LAST HEARTBEAT AGE +skills-abc WORKING feat/skills-abc 2m ago 45m +skills-xyz IN_REVIEW feat/skills-xyz -- 2h +skills-123 STALE feat/skills-123 15m ago 3h +``` + +**Stale Detection**: Worker is STALE if: +- State is ASSIGNED/WORKING and no heartbeat in 5 minutes +- State is IN_REVIEW and no activity in 1 hour + +### `worker start` + +Agent signals it's beginning work. Run from inside worktree. + +```bash +cd worktrees/skills-abc +worker start +``` + +**Actions**: +1. Read task_id from `.worker-ctx.json` (or `--task` arg) +2. Verify current state is ASSIGNED +3. Insert `state_change` message (ASSIGNED → WORKING) +4. Update state file +5. Record first heartbeat + +**Idempotency**: If already WORKING, log warning and succeed. + +### `worker done` + +Agent signals work is complete. Includes mandatory rebase. + +```bash +cd worktrees/skills-abc +worker done [--skip-rebase] +``` + +**Actions**: +1. Read task_id from context +2. Verify current state is WORKING or CONFLICTED (for retry) +3. 
If not `--skip-rebase`: + - Run `git fetch origin integration` + - Run `git rebase origin/integration` + - If conflict: **DO NOT ABORT** - leave rebase in progress, set state to CONFLICTED, exit with error +4. If `--skip-rebase`: verify no rebase in progress and branch is up-to-date +5. Run `git push -u origin feat/` +6. Insert `review_request` message +7. Transition to IN_REVIEW via `transition()` with compare-and-set + +**Rebase Failure** (rebase left in progress): +``` +ERROR: Rebase conflict detected + Conflicting files: src/auth.py + State: CONFLICTED + + To resolve: + 1. Fix conflicts in worktree + 2. git add + 3. git rebase --continue + 4. worker done --skip-rebase +``` + +**State Guards**: +- From WORKING: proceed with rebase +- From CONFLICTED with `--skip-rebase`: proceed (assumes conflicts resolved) +- From CONFLICTED without `--skip-rebase`: attempt rebase again + +### `worker heartbeat` + +Emit liveness signal. Call periodically (every 10-30s). + +```bash +cd worktrees/skills-abc +worker heartbeat [--status working] [--progress 0.5] +``` + +**Actions**: +1. Read task_id from context +2. Insert `heartbeat` message with timestamp +3. Update `last_heartbeat` in state file + +**Idempotency**: Always succeeds; updates timestamp. + +### `worker approve ` + +Human/review-gate approves completed work. + +```bash +worker approve skills-abc [--by reviewer] [--comment "LGTM"] +``` + +**Actions**: +1. Verify state is IN_REVIEW +2. Transition to APPROVED via `transition()` +3. Insert `review_approved` message with reviewer and comment + +**Idempotency**: If already APPROVED, log and succeed. + +### `worker request-changes ` + +Human/review-gate requests revisions. + +```bash +worker request-changes skills-abc [--comment "Fix error handling"] +``` + +**Actions**: +1. Verify state is IN_REVIEW +2. Transition to WORKING via `transition()` +3. Insert `changes_requested` message with comment + +**Idempotency**: If already WORKING, log and succeed. 
+ +### `worker merge ` + +Merge approved work to integration branch. + +```bash +worker merge skills-abc [--delete-branch] +``` + +**Actions** (with retry loop for contention): +1. Verify state is APPROVED +2. `git fetch origin integration feat/` +3. `git checkout integration && git reset --hard origin/integration` +4. Merge `feat/` with `--no-ff` +5. Push integration + - If push fails (remote changed): go to step 2 (max 3 retries) + - If merge conflict: transition APPROVED → WORKING, exit with error +6. Transition to COMPLETED via `transition()` +7. If `--delete-branch`: delete remote and local branch +8. Remove worktree via `git worktree remove` + +**Idempotency**: If already COMPLETED, log and succeed. + +**Merge Conflict** (rare - only if integration moved significantly): +``` +ERROR: Merge conflict during integration + State: WORKING (returned for revision) + + To resolve: + 1. Rebase in worktree: git rebase origin/integration + 2. worker done + 3. Get re-approval +``` + +### `worker cancel ` + +Abort a worker and cleanup. + +```bash +worker cancel skills-abc [--reason "Scope changed"] +``` + +**Actions**: +1. Insert `task_failed` message with reason +2. Update state to FAILED +3. Optionally remove worktree (with `--cleanup`) +4. 
Optionally archive branch (with `--archive`) + +## State Transitions via CLI + +``` + spawn + │ + ▼ +┌──────────┐ ┌──────────┐ start ┌─────────┐ +│ (none) │ ───► │ ASSIGNED │ ─────────────► │ WORKING │ +└──────────┘ └──────────┘ └────┬────┘ + │ + ┌────────────────────────────┤ + │ │ + │ done (rebase ok) │ done (conflict) + ▼ ▼ + ┌───────────┐ ┌────────────┐ + │ IN_REVIEW │ │ CONFLICTED │ + └─────┬─────┘ └────────────┘ + │ │ + approve │ │ fix + done + (external) │ │ + ▼ │ + ┌──────────┐ │ + │ APPROVED │ ◄─────────────────────┘ + └────┬─────┘ + │ + │ merge + ▼ + ┌───────────┐ + │ COMPLETED │ + └───────────┘ +``` + +## Directory Structure + +``` +project/ +├── .worker-state/ +│ ├── bus.db # SQLite message bus +│ ├── bus.jsonl # Debug export +│ └── workers/ +│ ├── skills-abc.json # State file +│ └── skills-xyz.json +├── worktrees/ # Git worktrees (gitignored) +│ ├── skills-abc/ +│ │ ├── .worker-ctx.json # Context for this worker +│ │ └── (project files) +│ └── skills-xyz/ +└── .git/ +``` + +## Implementation + +### Nim Project Structure + +``` +src/ +├── libs/sqlite3.c # SQLite amalgamation (static linking) +├── worker.nim # CLI entry point (cligen dispatch) +├── worker/ +│ ├── db.nim # SQLite operations (tiny_sqlite) +│ ├── git.nim # Git/worktree operations (osproc) +│ ├── state.nim # State machine logic +│ ├── context.nim # Worker context handling +│ ├── heartbeat.nim # Dedicated heartbeat thread +│ └── types.nim # Shared types and constants +├── config.nims # Build configuration +└── worker.nimble # Package definition +``` + +### CLI Entry Point (cligen) + +```nim +import cligen +import worker/[db, git, state, context, heartbeat] + +proc spawn(taskId: string, description = "", fromBranch = "origin/integration") = + ## Create a new worker workspace + let ctx = createWorker(taskId, description, fromBranch) + echo "Created worker: ", taskId + echo " Branch: ", ctx.branch + echo " Worktree: ", ctx.worktree + echo " State: ASSIGNED" + +proc status(state = "", stale = 
false, json = false, watch = false) = + ## Show worker dashboard + let workers = getAllWorkers() + if json: + echo toJson(workers) + else: + printStatusTable(workers, stale) + +proc start(task = "") = + ## Signal ASSIGNED → WORKING (run from worktree) + let taskId = if task != "": task else: readContext().taskId + transition(taskId, "ASSIGNED", "WORKING") + echo "Started work on ", taskId + +proc done(skipRebase = false) = + ## Signal WORKING → IN_REVIEW (includes rebase) + let ctx = readContext() + if not skipRebase: + let ok = rebaseOnIntegration(ctx) + if not ok: + transition(ctx.taskId, "WORKING", "CONFLICTED") + quit("Rebase conflict - resolve and run: worker done --skip-rebase", 1) + pushBranch(ctx) + transition(ctx.taskId, "WORKING", "IN_REVIEW") + echo "Ready for review: ", ctx.taskId + +proc approve(taskId: string, by = "", comment = "") = + ## Approve completed work (IN_REVIEW → APPROVED) + transition(taskId, "IN_REVIEW", "APPROVED") + echo "Approved: ", taskId + +proc requestChanges(taskId: string, comment = "") = + ## Request revisions (IN_REVIEW → WORKING) + transition(taskId, "IN_REVIEW", "WORKING") + echo "Changes requested: ", taskId + +proc merge(taskId: string, deleteBranch = false) = + ## Merge approved work (APPROVED → COMPLETED) + mergeToIntegration(taskId, deleteBranch) + transition(taskId, "APPROVED", "COMPLETED") + echo "Merged: ", taskId + +proc cancel(taskId: string, reason = "") = + ## Abort a worker (* → FAILED) + transitionToFailed(taskId, reason) + echo "Cancelled: ", taskId + +when isMainModule: + dispatchMulti( + [spawn], + [status], + [start], + [done], + [approve], + [requestChanges, cmdName = "request-changes"], + [merge], + [cancel] + ) +``` + +### State Machine Guards + +```nim +import std/[tables, strformat] +import tiny_sqlite + +type + WorkerState* = enum + wsAssigned = "ASSIGNED" + wsWorking = "WORKING" + wsConflicted = "CONFLICTED" + wsInReview = "IN_REVIEW" + wsApproved = "APPROVED" + wsCompleted = "COMPLETED" + wsFailed = 
"FAILED" + + InvalidTransition* = object of CatchableError + StaleState* = object of CatchableError + +# NOTE: STALE is NOT a persistent state - computed for display from heartbeat age +const ValidTransitions* = { + wsAssigned: @[wsWorking, wsFailed], + wsWorking: @[wsInReview, wsConflicted, wsFailed], + wsConflicted: @[wsInReview, wsWorking, wsFailed], + wsInReview: @[wsApproved, wsWorking, wsFailed], + wsApproved: @[wsCompleted, wsWorking, wsFailed], + wsCompleted: @[], + wsFailed: @[wsAssigned], # Retry +}.toTable + +proc transition*(db: DbConn, taskId: string, fromState, toState: WorkerState) = + ## Attempt state transition with compare-and-set guard + if toState notin ValidTransitions[fromState]: + raise newException(InvalidTransition, &"{fromState} → {toState}") + + db.exec("BEGIN IMMEDIATE") + try: + let current = db.value("SELECT state FROM workers WHERE task_id = ?", taskId).get.fromDbValue(string) + if current != $fromState: + raise newException(StaleState, &"Expected {fromState}, got {current}") + + db.exec("UPDATE workers SET state = ?, state_changed_at = ? 
WHERE task_id = ?", + $toState, epochTime(), taskId) + publishMessage(db, taskId, "state_change", %*{"from": $fromState, "to": $toState}) + db.exec("COMMIT") + except: + db.exec("ROLLBACK") + raise +``` + +### Build Configuration + +```nim +# config.nims +--mm:orc +--threads:on +-d:release +--opt:size + +# Static SQLite +switch("passC", "-DSQLITE_THREADSAFE=1") +switch("compile", "libs/sqlite3.c") +``` + +### Exit Codes + +```nim +const + ExitSuccess* = 0 + ExitUsageError* = 2 + ExitInvalidTransition* = 3 + ExitGitError* = 4 + ExitDbError* = 5 + ExitConflict* = 6 +``` + +## Integration with Existing Components + +### With Message Bus (skills-ms5) + +All state transitions publish messages: +```python +db.publish_message(task_id, 'state_change', {'from': 'ASSIGNED', 'to': 'WORKING'}) +``` + +### With review-gate (skills-byq) + +Review-gate reads worker state: +```python +state = db.get_state(task_id) +if state == 'IN_REVIEW': + # Check review status + ... +``` + +### With Status Dashboard (skills-yak) + +Status command queries message bus: +```python +workers = db.query(""" + SELECT task_id, state, last_heartbeat + FROM workers + ORDER BY state_changed_at DESC +""") +``` + +## Open Questions + +1. ~~**Approval flow**~~: Resolved - `worker approve` and `worker request-changes` commands added +2. **Agent wrapper**: Should `worker run ` wrap agent execution with heartbeats? +3. **Multiple attempts**: If task fails, `worker retry ` or new spawn? +4. **Integration branch**: Create on first spawn, or require it exists? 
+ +## Resolved from Spec Review + +| Issue | Resolution | +|-------|------------| +| Missing approve command | Added `worker approve` and `worker request-changes` | +| Rebase conflict flow | Don't abort, leave in progress for resolution | +| Source of truth conflict | SQLite only; JSON files are derived caches | +| STALE as persistent state | Changed to computed for display from heartbeat age | +| `done` from CONFLICTED | Now allowed with --skip-rebase | +| Merge race condition | Added fetch-merge-push retry loop | +| Spawn race condition | Use DB unique constraint (implementation detail) | + +## References + +- State machine design: `docs/design/worker-state-machine.md` +- Message passing: `docs/design/message-passing-layer.md` +- Branch isolation: `docs/design/branch-per-worker.md` diff --git a/docs/design/worker-state-machine.md b/docs/design/worker-state-machine.md new file mode 100644 index 0000000..90a2f1e --- /dev/null +++ b/docs/design/worker-state-machine.md @@ -0,0 +1,258 @@ +# Worker State Machine Design + +**Status**: Draft +**Bead**: skills-4oj +**Epic**: skills-s6y (Multi-agent orchestration: Lego brick architecture) + +## Overview + +This document defines the state machine for background worker agents in the multi-agent coordination system. Workers are AI coding agents that operate on separate git branches, coordinated through local filesystem state. + +## Design Principles + +1. **File-based, no database** - State in JSON files, messages in append-only JSONL +2. **Minimal states** - Only what's needed, resist over-engineering +3. **Atomic updates** - Write to temp file, fsync, atomic rename +4. **Crash recovery** - Detect stale workers via timestamp + process checks +5. 
**Cross-agent compatible** - Any agent that reads/writes files can participate + +## State Diagram + +``` + ┌─────────────────────────────────────────────┐ + │ │ + ▼ │ +┌──────────┐ assign ┌──────────┐ start ┌─────────┐ │ +│ IDLE │ ──────────► │ ASSIGNED │ ─────────► │ WORKING │ ◄───┘ +└──────────┘ └──────────┘ └────┬────┘ changes + ▲ │ requested + │ │ + │ reset submit_pr │ + │ ▼ +┌──────────┐ timeout ┌──────────┐ review ┌─────────────┐ +│ FAILED │ ◄────────── │ STALE │ ◄───────── │ IN_REVIEW │ +└──────────┘ └──────────┘ └──────┬──────┘ + │ │ + │ approve │ + │ ▼ + │ ┌──────────┐ merge ┌─────────────┐ + └───────────────────│ COMPLETED│ ◄───────── │ APPROVED │ + retry └──────────┘ └─────────────┘ +``` + +## States + +| State | Description | Entry Condition | Exit Condition | +|-------|-------------|-----------------|----------------| +| **IDLE** | Worker available, no task | Initial / after reset | Task assigned | +| **ASSIGNED** | Task claimed, preparing | Orchestrator assigns task | Branch created, work starts | +| **WORKING** | Actively coding/testing | `git checkout -b` succeeds | PR submitted or failure | +| **IN_REVIEW** | PR open, awaiting review | PR created | Approved, changes requested, or timeout | +| **APPROVED** | Review passed, ready to merge | Reviewer approves | Merge succeeds or conflicts | +| **COMPLETED** | Work merged to main | Merge succeeds | (terminal) or recycle to IDLE | +| **STALE** | No progress detected | Heartbeat timeout | Manual intervention or auto-reset | +| **FAILED** | Unrecoverable error | Max retries exceeded | Manual reset or reassign | + +## State File Schema + +Location: `.worker-state/{worker-id}.json` + +```json +{ + "worker_id": "worker-auth", + "state": "WORKING", + "task_id": "skills-abc", + "branch": "worker-auth/skills-abc", + "assigned_at": "2026-01-10T14:00:00Z", + "state_changed_at": "2026-01-10T14:05:00Z", + "last_heartbeat": "2026-01-10T14:32:00Z", + "pid": 12345, + "attempt": 1, + "max_attempts": 3, + "last_error": 
null, + "pr_url": null, + "review_state": null +} +``` + +### Field Definitions + +| Field | Type | Description | +|-------|------|-------------| +| `worker_id` | string | Unique worker identifier (kebab-case) | +| `state` | enum | Current state (see States table) | +| `task_id` | string | Bead ID of assigned task | +| `branch` | string | Git branch name: `{worker_id}/{task_id}` | +| `assigned_at` | ISO8601 | When task was assigned | +| `state_changed_at` | ISO8601 | Last state transition timestamp | +| `last_heartbeat` | ISO8601 | Last activity timestamp | +| `pid` | int | Process ID (for crash detection) | +| `attempt` | int | Current attempt number (1-indexed) | +| `max_attempts` | int | Max retries before FAILED | +| `last_error` | string? | Error message if failed | +| `pr_url` | string? | Pull request URL when in IN_REVIEW | +| `review_state` | enum? | `pending`, `approved`, `changes_requested` | + +## Transitions + +### Valid Transitions + +``` +IDLE → ASSIGNED (assign_task) +ASSIGNED → WORKING (start_work) +ASSIGNED → FAILED (setup_failed) +WORKING → IN_REVIEW (submit_pr) +WORKING → FAILED (work_failed, max_attempts) +WORKING → STALE (heartbeat_timeout) +IN_REVIEW → APPROVED (review_approved) +IN_REVIEW → WORKING (changes_requested) +IN_REVIEW → STALE (review_timeout) +APPROVED → COMPLETED (merge_success) +APPROVED → WORKING (merge_conflict → rebase required) +STALE → WORKING (worker_recovered) +STALE → FAILED (timeout_exceeded) +FAILED → IDLE (reset/retry) +COMPLETED → IDLE (recycle) +``` + +### Transition Commands + +Each transition is triggered by a command or detected condition: + +| Command | From | To | Action | +|---------|------|----|----| +| `worker assign ` | IDLE | ASSIGNED | Write state file, record task | +| `worker start ` | ASSIGNED | WORKING | Create branch, start heartbeat | +| `worker submit ` | WORKING | IN_REVIEW | Push branch, create PR | +| `worker approve ` | IN_REVIEW | APPROVED | Record approval | +| `worker merge ` | APPROVED | 
COMPLETED | Merge PR, delete branch | +| `worker reset ` | FAILED | IDLE | Clear state, ready for new task | + +## File Operations + +### Atomic State Updates + +Never write directly to state file. Use atomic rename pattern: + +```bash +# Write to temp file +echo "$new_state" > .worker-state/${worker_id}.json.tmp + +# Sync to disk +sync .worker-state/${worker_id}.json.tmp + +# Atomic rename +mv .worker-state/${worker_id}.json.tmp .worker-state/${worker_id}.json +``` + +### Optimistic Concurrency + +Include `state_changed_at` as a version marker: + +1. Read current state file +2. Compute new state +3. Before write, re-read and verify `state_changed_at` unchanged +4. If changed, retry from step 1 + +### Lock File Pattern + +For operations needing exclusive access: + +```bash +# Acquire lock (atomic create with O_EXCL) +if ! (set -C; echo "$$" > .worker-state/${worker_id}.lock) 2>/dev/null; then + # Lock held - check if stale + lock_pid=$(cat .worker-state/${worker_id}.lock) + if ! kill -0 "$lock_pid" 2>/dev/null; then + # Process dead, steal lock + rm -f .worker-state/${worker_id}.lock + echo "$$" > .worker-state/${worker_id}.lock + else + exit 1 # Lock held by live process + fi +fi + +# ... do work ... + +# Release lock +rm -f .worker-state/${worker_id}.lock +``` + +## Stale Worker Detection + +Workers must update `last_heartbeat` during active work (every 30-60 seconds). 
+ +Detection algorithm (run by orchestrator or patrol agent): + +``` +for each worker file in .worker-state/*.json: + if state in (ASSIGNED, WORKING, IN_REVIEW): + if (now - last_heartbeat) > STALE_TIMEOUT: + if pid is set and process not running: + transition to STALE (worker crashed) + elif (now - last_heartbeat) > DEAD_TIMEOUT: + transition to STALE (presumed dead) +``` + +### Timeout Values + +| Timeout | Duration | Description | +|---------|----------|-------------| +| `STALE_TIMEOUT` | 5 minutes | Mark as stale, eligible for intervention | +| `DEAD_TIMEOUT` | 15 minutes | Presume dead, ready for reassignment | +| `REVIEW_TIMEOUT` | 1 hour | Review taking too long, alert human | + +## Directory Structure + +``` +.worker-state/ +├── worker-auth.json # Worker state file +├── worker-auth.lock # Lock file (when held) +├── worker-refactor.json +├── messages/ +│ ├── orchestrator.jsonl # Orchestrator → workers +│ ├── worker-auth.jsonl # Worker → orchestrator +│ └── worker-refactor.jsonl +└── reviews/ + ├── worker-auth.json # Review state (review-gate) + └── worker-refactor.json +``` + +## Integration Points + +### With review-gate (skills-byq) + +When worker submits PR (WORKING → IN_REVIEW): +1. Worker writes review request to `reviews/{worker-id}.json` +2. review-gate Stop hook checks review state +3. On approval, orchestrator transitions to APPROVED + +### With message passing (skills-ms5) + +Workers append to their outbox: `.worker-state/messages/{worker-id}.jsonl` +Orchestrator appends to: `.worker-state/messages/orchestrator.jsonl` + +Message format: +```json +{"ts": "2026-01-10T14:00:00Z", "from": "worker-auth", "type": "done", "task": "skills-abc"} +``` + +### With branch isolation (skills-roq) + +- Branch naming: `{worker-id}/{task-id}` +- Mandatory rebase before IN_REVIEW → APPROVED +- Merge to main only from APPROVED state + +## Open Questions + +1. 
**Heartbeat mechanism**: Should workers write to their state file, a separate heartbeat file, or append to message log? +2. **PR creation**: Who creates the PR - worker agent or orchestrator? +3. **Review source**: Human review, AI review (review-gate), or both? +4. **Recycle vs terminate**: Should COMPLETED workers return to IDLE or be terminated? + +## References + +- Temporal.io: Event-sourced durable execution +- LangGraph: Graph-based state machines for agents +- Airflow: DAG-based task lifecycle (None → Scheduled → Running → Success) +- OpenHands: Event-sourced (Observation, Thought, Action) loop diff --git a/src/.gitignore b/src/.gitignore new file mode 100644 index 0000000..f47cb20 --- /dev/null +++ b/src/.gitignore @@ -0,0 +1 @@ +*.out diff --git a/src/README.md b/src/README.md new file mode 100644 index 0000000..a1b8271 --- /dev/null +++ b/src/README.md @@ -0,0 +1,80 @@ +# Worker CLI + +Multi-agent worker coordination CLI written in Nim. + +## Prerequisites + +```bash +# Install Nim (>= 2.0.0) +# On NixOS: nix-shell -p nim + +# Install dependencies +nimble install tiny_sqlite cligen +``` + +## Static SQLite (Optional) + +For a single static binary, download the SQLite amalgamation: + +```bash +curl -LO https://sqlite.org/2024/sqlite-amalgamation-3450000.zip +unzip sqlite-amalgamation-3450000.zip +cp sqlite-amalgamation-3450000/sqlite3.c libs/ +``` + +## Build + +```bash +# Development build +nim c src/worker.nim + +# Release build (optimized) +nim c -d:release src/worker.nim + +# The binary will be at src/worker +``` + +## Usage + +```bash +# Orchestrator commands (human) +worker spawn # Create workspace +worker status [--watch] # Dashboard +worker approve # Approve work +worker request-changes # Request changes +worker merge # Merge to integration +worker cancel # Abort worker + +# Agent commands (from worktree) +worker start # Begin work +worker done [--skip-rebase] # Complete work +worker fail # Signal failure +worker heartbeat # Manual heartbeat 
+worker show # Detailed view +``` + +## Project Structure + +``` +src/ +├── worker.nim # CLI entry point (cligen) +├── worker.nimble # Package definition +├── config.nims # Build configuration +├── libs/ +│ └── sqlite3.c # SQLite amalgamation (optional) +└── worker/ + ├── types.nim # Shared types and constants + ├── db.nim # SQLite operations + ├── state.nim # State machine + ├── heartbeat.nim # Background thread + ├── git.nim # Git/worktree operations + └── context.nim # Worker context handling +``` + +## Design Docs + +See `docs/design/`: +- `mvp-scope.md` - MVP scope and implementation order +- `message-passing-layer.md` - SQLite message bus +- `worker-cli-primitives.md` - CLI commands +- `worker-state-machine.md` - State transitions diff --git a/src/config.nims b/src/config.nims new file mode 100644 index 0000000..1b8bb06 --- /dev/null +++ b/src/config.nims @@ -0,0 +1,22 @@ +# Build configuration for worker CLI + +# Memory management: ORC handles cycles, deterministic destruction +switch("mm", "orc") + +# Enable threading for heartbeat +switch("threads", "on") + +# Release mode optimizations +when defined(release): + switch("opt", "size") + switch("passC", "-flto") + switch("passL", "-flto") + +# Static SQLite compilation +switch("passC", "-DSQLITE_THREADSAFE=1") +switch("passC", "-DSQLITE_ENABLE_JSON1") +switch("passC", "-DSQLITE_OMIT_LOAD_EXTENSION") + +# Compile SQLite amalgamation if present +when fileExists("libs/sqlite3.c"): + switch("compile", "libs/sqlite3.c") diff --git a/src/libs/.gitkeep b/src/libs/.gitkeep new file mode 100644 index 0000000..4300f62 --- /dev/null +++ b/src/libs/.gitkeep @@ -0,0 +1,2 @@ +# Place sqlite3.c here for static linking +# Download from: https://sqlite.org/download.html (amalgamation) diff --git a/src/worker.nim b/src/worker.nim new file mode 100644 index 0000000..559de73 --- /dev/null +++ b/src/worker.nim @@ -0,0 +1,341 @@ +## Worker CLI - Multi-agent coordination tool +## +## Usage: +## worker spawn Create a new worker 
workspace +## worker status Show dashboard of all workers +## worker start Signal ASSIGNED → WORKING +## worker done Signal WORKING → IN_REVIEW +## worker approve Approve completed work +## worker merge Merge approved work +## worker cancel Abort a worker + +import std/[os, strformat, json, times, terminal, strutils, sequtils] +import cligen +import tiny_sqlite +import worker/[types, db, state, git, context, heartbeat] + +# ----------------------------------------------------------------------------- +# Orchestrator Commands +# ----------------------------------------------------------------------------- + +proc spawn(taskId: string, description: string = "", + fromBranch: string = "origin/integration") = + ## Create a new worker workspace + let db = openBusDb() + defer: db.close() + + # Check if already exists + let existing = db.getWorker(taskId) + if existing.isSome: + echo "Worker already exists: ", taskId + echo " State: ", existing.get.state + echo " Worktree: ", existing.get.worktree + quit(ExitSuccess) + + # Create git worktree + let (branch, worktree) = createWorktree(taskId, fromBranch) + + # Create context file + discard createWorkerContext(taskId, branch, worktree, description) + + # Create worker in DB + discard db.createWorker(taskId, branch, worktree, description) + + echo "Created worker: ", taskId + echo " Branch: ", branch + echo " Worktree: ", worktree + echo " State: ASSIGNED" + +proc status(state: string = "", stale: bool = false, + json: bool = false, watch: bool = false) = + ## Show dashboard of all workers + proc render() = + let db = openBusDb() + defer: db.close() + + let workers = db.getAllWorkers() + + if json: + var arr = newJArray() + for w in workers: + arr.add(%*{ + "task_id": w.taskId, + "state": $w.state, + "branch": w.branch, + "stale": w.staleLevel, + "age": $(getTime() - w.createdAt) + }) + echo arr.pretty + return + + # Filter + var filtered = workers + if state != "": + filtered = workers.filterIt($it.state == 
state.toUpperAscii) + if stale: + filtered = filtered.filterIt(it.isStale) + + if filtered.len == 0: + echo "No workers found." + return + + # Table header + echo "" + echo "TASK".alignLeft(14), "STATE".alignLeft(12), "AGE".alignLeft(8), + "HEARTBEAT".alignLeft(12), "STATUS".alignLeft(8), "SUMMARY" + echo "-".repeat(70) + + for w in filtered: + let age = getTime() - w.createdAt + let ageStr = if age.inHours > 0: &"{age.inHours}h" + elif age.inMinutes > 0: &"{age.inMinutes}m" + else: &"{age.inSeconds}s" + + var hbStr = "--" + if w.lastHeartbeat != Time(): + let hbAge = getTime() - w.lastHeartbeat + hbStr = if hbAge.inMinutes > 0: &"{hbAge.inMinutes}m ago" + else: &"{hbAge.inSeconds}s ago" + + let staleStr = w.staleLevel + let summary = if w.description.len > 30: w.description[0..29] & "..." + else: w.description + + echo w.taskId.alignLeft(14), + ($w.state).alignLeft(12), + ageStr.alignLeft(8), + hbStr.alignLeft(12), + staleStr.alignLeft(8), + summary + + if watch: + while true: + eraseScreen() + setCursorPos(0, 0) + echo "Worker Status (", now().format("HH:mm:ss"), ") - Press Ctrl+C to exit" + render() + sleep(2000) + else: + render() + +proc approve(taskId: string, by: string = "", comment: string = "") = + ## Approve completed work (IN_REVIEW → APPROVED) + let db = openBusDb() + defer: db.close() + + db.transition(taskId, wsInReview, wsApproved) + echo "Approved: ", taskId + +proc requestChanges(taskId: string, comment: string = "") = + ## Request revisions (IN_REVIEW → WORKING) + let db = openBusDb() + defer: db.close() + + db.transition(taskId, wsInReview, wsWorking) + echo "Changes requested: ", taskId + if comment != "": + echo " Comment: ", comment + +proc merge(taskId: string, deleteBranch: bool = false) = + ## Merge approved work (APPROVED → COMPLETED) + let db = openBusDb() + defer: db.close() + + # Verify state + let st = db.getState(taskId) + if st.isNone: + echo "Worker not found: ", taskId + quit(ExitNotFound) + if st.get != wsApproved: + echo "Worker 
must be APPROVED to merge, got: ", st.get + quit(ExitInvalidTransition) + + # Attempt merge + let ok = mergeToIntegration(taskId) + if not ok: + echo "Merge conflict - returning to WORKING state" + db.transition(taskId, wsApproved, wsWorking) + quit(ExitConflict) + + # Success + db.transition(taskId, wsApproved, wsCompleted) + + if deleteBranch: + removeBranch(taskId) + + removeWorktree(taskId) + echo "Merged: ", taskId + +proc cancel(taskId: string, reason: string = "", cleanup: bool = false) = + ## Abort a worker (* → FAILED) + let db = openBusDb() + defer: db.close() + + db.transitionToFailed(taskId, reason) + echo "Cancelled: ", taskId + + if cleanup: + removeWorktree(taskId) + echo " Worktree removed" + +# ----------------------------------------------------------------------------- +# Agent Commands (run from worktree) +# ----------------------------------------------------------------------------- + +proc start(task: string = "") = + ## Signal ASSIGNED → WORKING (run from worktree) + let taskId = if task != "": task else: getTaskId() + + let db = openBusDb() + defer: db.close() + + # Check current state + let st = db.getState(taskId) + if st.isNone: + echo "Worker not found: ", taskId + quit(ExitNotFound) + + if st.get == wsWorking: + echo "Already WORKING: ", taskId + quit(ExitSuccess) + + db.transition(taskId, wsAssigned, wsWorking) + + # Start heartbeat + startGlobalHeartbeat(BusDbPath, taskId) + updateGlobalHeartbeat(hsWorking, taskId) + + echo "Started work on ", taskId + +proc done(skipRebase: bool = false) = + ## Signal WORKING → IN_REVIEW (includes rebase) + let ctx = findContext() + + let db = openBusDb() + defer: db.close() + + let st = db.getState(ctx.taskId) + if st.isNone: + echo "Worker not found: ", ctx.taskId + quit(ExitNotFound) + + # Check we're in WORKING or CONFLICTED + if st.get notin {wsWorking, wsConflicted}: + echo "Cannot complete from state: ", st.get + quit(ExitInvalidTransition) + + # Rebase unless skipped + if not skipRebase: + 
let ok = rebaseOnIntegration(ctx.worktree) + if not ok: + db.transition(ctx.taskId, st.get, wsConflicted) + let files = getConflictedFiles(ctx.worktree) + echo "ERROR: Rebase conflict detected" + echo " Conflicting files: ", files.join(", ") + echo " State: CONFLICTED" + echo "" + echo " To resolve:" + echo " 1. Fix conflicts in worktree" + echo " 2. git add " + echo " 3. git rebase --continue" + echo " 4. worker done --skip-rebase" + quit(ExitConflict) + else: + # Verify no rebase in progress + if isRebaseInProgress(ctx.worktree): + echo "ERROR: Rebase still in progress. Run: git rebase --continue" + quit(ExitConflict) + + # Push + pushBranch(ctx.worktree, ctx.branch) + + # Transition + db.transition(ctx.taskId, st.get, wsInReview) + + stopGlobalHeartbeat() + echo "Ready for review: ", ctx.taskId + +proc fail(reason: string) = + ## Signal WORKING → FAILED + let ctx = findContext() + + let db = openBusDb() + defer: db.close() + + db.transitionToFailed(ctx.taskId, reason) + stopGlobalHeartbeat() + echo "Failed: ", ctx.taskId, " - ", reason + +proc sendHeartbeat(status: string = "working", progress: float = 0.0) = + ## Emit a single heartbeat (normally done by background thread) + let ctx = findContext() + + let db = openBusDb() + defer: db.close() + + let hs = case status + of "idle": hsIdle + of "working": hsWorking + of "blocked": hsBlocked + else: hsWorking + + db.writeHeartbeat(ctx.taskId, hs, ctx.taskId, progress) + echo "Heartbeat: ", ctx.taskId, " - ", status + +proc show(taskId: string) = + ## Show detailed view of a worker + let db = openBusDb() + defer: db.close() + + let info = db.getWorker(taskId) + if info.isNone: + echo "Worker not found: ", taskId + quit(ExitNotFound) + + let w = info.get + echo "" + echo "Task: ", w.taskId + echo "Description: ", w.description + echo "State: ", w.state + echo "Branch: ", w.branch + echo "Worktree: ", w.worktree + echo "" + echo "Created: ", w.createdAt.format("yyyy-MM-dd HH:mm:ss") + echo "State Changed: ", 
w.stateChangedAt.format("yyyy-MM-dd HH:mm:ss") + if w.lastHeartbeat != Time(): + echo "Last Heartbeat: ", w.lastHeartbeat.format("yyyy-MM-dd HH:mm:ss") + echo "" + echo "Status: ", w.staleLevel + +# ----------------------------------------------------------------------------- +# CLI Dispatch +# ----------------------------------------------------------------------------- + +when isMainModule: + dispatchMulti( + [spawn, help = {"taskId": "Task identifier", + "description": "Task description", + "fromBranch": "Base branch"}], + [status, help = {"state": "Filter by state", + "stale": "Show only stale workers", + "json": "Output as JSON", + "watch": "Refresh every 2s"}], + [start, help = {"task": "Task ID (reads from context if empty)"}], + [done, help = {"skipRebase": "Skip rebase (after manual conflict resolution)"}], + [approve, help = {"taskId": "Task to approve", + "by": "Reviewer name", + "comment": "Approval comment"}], + [requestChanges, cmdName = "request-changes", + help = {"taskId": "Task to reject", + "comment": "Feedback for agent"}], + [merge, help = {"taskId": "Task to merge", + "deleteBranch": "Delete branch after merge"}], + [cancel, help = {"taskId": "Task to cancel", + "reason": "Cancellation reason", + "cleanup": "Remove worktree"}], + [fail, help = {"reason": "Failure reason"}], + [sendHeartbeat, cmdName = "heartbeat", + help = {"status": "Status (idle, working, blocked)", + "progress": "Progress 0.0-1.0"}], + [show, help = {"taskId": "Task to show"}] + ) diff --git a/src/worker.nimble b/src/worker.nimble new file mode 100644 index 0000000..72fede5 --- /dev/null +++ b/src/worker.nimble @@ -0,0 +1,15 @@ +# Package +version = "0.1.0" +author = "dan" +description = "Multi-agent worker coordination CLI" +license = "MIT" +srcDir = "." 
+bin = @["worker"]
+# FIX: with the default binDir ("."), the `worker` binary would be written
+# into the package root, colliding with the src/worker/ module directory.
+binDir = "build"
+
+# Dependencies
+requires "nim >= 2.0.0"
+requires "tiny_sqlite >= 0.2.0"
+requires "cligen >= 1.7.0"
diff --git a/src/worker/context.nim b/src/worker/context.nim
new file mode 100644
index 0000000..aaa9d5e
--- /dev/null
+++ b/src/worker/context.nim
@@ -0,0 +1,59 @@
+## Worker context handling
+##
+## Reads/writes .worker-ctx.json in worktrees for agent discovery.
+
+import std/[os, json, times, strformat]
+import ./types
+
+proc writeContext*(worktree: string, ctx: WorkerContext) =
+  ## Serialize ctx to <worktree>/.worker-ctx.json (overwrites any existing file)
+  let path = worktree / ContextFileName
+  writeFile(path, $ctx.toJson())
+
+proc readContext*(worktree: string = ""): WorkerContext =
+  ## Read context from worktree. If empty, uses current directory.
+  let dir = if worktree != "": worktree else: getCurrentDir()
+  let path = dir / ContextFileName
+
+  if not fileExists(path):
+    raise newException(IOError, &"Context file not found: {path}")
+
+  let content = readFile(path)
+  let j = parseJson(content)  # may raise JsonParsingError on a corrupt file
+  return WorkerContext.fromJson(j)
+
+proc findContext*(): WorkerContext =
+  ## Find context by walking up directory tree (root "/" itself is not checked)
+  var dir = getCurrentDir()
+  while dir != "" and dir != "/":
+    let path = dir / ContextFileName
+    if fileExists(path):
+      let content = readFile(path)
+      let j = parseJson(content)
+      return WorkerContext.fromJson(j)
+    dir = parentDir(dir)
+
+  raise newException(IOError, "No .worker-ctx.json found in directory tree")
+
+proc isInWorktree*(): bool =
+  ## Check if current directory is inside a worker worktree
+  try:
+    discard findContext()
+    return true
+  except CatchableError:  # FIX: a corrupt context file raises JsonParsingError,
+    return false          # which previously escaped the IOError-only handler
+
+proc getTaskId*(): string =
+  ## Get task ID from context (raises if not inside a worktree)
+  return findContext().taskId
+
+proc createWorkerContext*(taskId, branch, worktree, description: string): WorkerContext =
+  ## Create a new context and write to worktree
+  result = WorkerContext(
+    taskId: taskId,
+    branch: branch,
+    worktree: worktree,
+    createdAt:
getTime(),
+    description: description
+  )
+  writeContext(worktree, result)
diff --git a/src/worker/db.nim b/src/worker/db.nim
new file mode 100644
index 0000000..7e59d99
--- /dev/null
+++ b/src/worker/db.nim
@@ -0,0 +1,271 @@
+## SQLite database operations for worker coordination
+##
+## Key patterns:
+## - One connection per thread (never share DbConn)
+## - WAL mode for concurrent readers
+## - BEGIN IMMEDIATE for write transactions
+## - Explicit ack for at-least-once delivery
+
+import std/[os, json, strformat, options, strutils, random]
+import tiny_sqlite
+import ./types
+
+# FIX: std/random's global generator is deterministically seeded, so without
+# an explicit randomize() every CLI process would emit the identical "unique"
+# ID sequence and collide on the messages.id UNIQUE constraint.  Seed once
+# at module init.
+randomize()
+
+# Helper for generating unique IDs
+proc genOid*(): string =
+  ## Generate a 16-char random ID of lowercase ASCII letters
+  result = newStringOfCap(16)
+  for _ in 0..<16:
+    result.add(chr(ord('a') + rand(25)))
+
+const Schema* = """
+-- Messages table (seq is rowid-based for true auto-increment)
+CREATE TABLE IF NOT EXISTS messages (
+  seq INTEGER PRIMARY KEY AUTOINCREMENT,
+  id TEXT NOT NULL UNIQUE,
+  ts_ms INTEGER NOT NULL,
+  from_agent TEXT NOT NULL,
+  to_agent TEXT,
+  type TEXT NOT NULL,
+  correlation_id TEXT,
+  in_reply_to TEXT,
+  payload TEXT,
+  payload_ref TEXT,
+  CHECK (payload IS NULL OR payload_ref IS NULL)
+);
+
+CREATE INDEX IF NOT EXISTS idx_messages_to_seq ON messages(to_agent, seq);
+CREATE INDEX IF NOT EXISTS idx_messages_correlation_seq ON messages(correlation_id, seq);
+CREATE INDEX IF NOT EXISTS idx_messages_type_from_ts ON messages(type, from_agent, ts_ms);
+
+-- Cursors table (per-agent read position)
+CREATE TABLE IF NOT EXISTS cursors (
+  agent_id TEXT PRIMARY KEY,
+  last_acked_seq INTEGER NOT NULL DEFAULT 0,
+  updated_at_ms INTEGER NOT NULL
+);
+
+-- Heartbeats table (separate from messages to avoid write contention)
+CREATE TABLE IF NOT EXISTS heartbeats (
+  agent_id TEXT PRIMARY KEY,
+  ts_ms INTEGER NOT NULL,
+  status TEXT NOT NULL,
+  current_task TEXT,
+  progress REAL
+);
+
+-- Task claims with lease expiry
+CREATE TABLE IF NOT EXISTS task_claims (
+  task_id TEXT PRIMARY KEY,
+  claimed_by TEXT NOT NULL,
+  
claimed_at_ms INTEGER NOT NULL, + lease_until_ms INTEGER NOT NULL +); + +-- Workers table (state machine) +CREATE TABLE IF NOT EXISTS workers ( + task_id TEXT PRIMARY KEY, + state TEXT NOT NULL, + branch TEXT NOT NULL, + worktree TEXT NOT NULL, + description TEXT, + created_at_ms INTEGER NOT NULL, + state_changed_at_ms INTEGER NOT NULL +); + +-- Schema versioning +CREATE TABLE IF NOT EXISTS meta ( + key TEXT PRIMARY KEY, + value TEXT NOT NULL +); + +-- Export tracking +CREATE TABLE IF NOT EXISTS export_state ( + id INTEGER PRIMARY KEY CHECK (id = 1), + last_seq INTEGER NOT NULL DEFAULT 0 +); +""" + +proc initSchema*(db: DbConn) = + ## Initialize database schema + db.execScript(Schema) + # Insert meta if not exists + db.exec("INSERT OR IGNORE INTO meta (key, value) VALUES ('schema_version', '1')") + db.exec("INSERT OR IGNORE INTO export_state (id, last_seq) VALUES (1, 0)") + +proc openBusDb*(dbPath: string = BusDbPath): DbConn = + ## Open database with required PRAGMAs. One connection per thread. + createDir(parentDir(dbPath)) + result = openDatabase(dbPath) + result.exec("PRAGMA busy_timeout = " & $BusyTimeoutMs) + result.exec("PRAGMA foreign_keys = ON") + result.exec("PRAGMA journal_mode = WAL") + result.exec("PRAGMA synchronous = NORMAL") + initSchema(result) + +proc optStr*(s: string): Option[string] = + ## Convert empty string to none + if s == "": none(string) else: some(s) + +proc publish*(db: DbConn, agentId: string, msgType: string, + to: string = "", correlationId: string = "", + inReplyTo: string = "", payload: JsonNode = nil): string = + ## Publish a message to the bus. Returns message ID. + let msgId = genOid() + let tsMs = epochMs() + let payloadStr = if payload.isNil: "" else: $payload + + db.exec(""" + INSERT INTO messages (id, ts_ms, from_agent, to_agent, type, + correlation_id, in_reply_to, payload) + VALUES (?, ?, ?, ?, ?, ?, ?, ?) 
+ """, msgId, tsMs, agentId, + optStr(to), + msgType, + optStr(correlationId), + optStr(inReplyTo), + optStr(payloadStr)) + + return msgId + +type + Message* = object + seq*: int64 + id*: string + tsMs*: int64 + fromAgent*: string + toAgent*: Option[string] + msgType*: string + correlationId*: Option[string] + inReplyTo*: Option[string] + payload*: Option[JsonNode] + payloadRef*: Option[string] + +proc poll*(db: DbConn, agentId: string, limit: int = 100): seq[Message] = + ## Fetch unacknowledged messages. Does NOT advance cursor. + let cursor = db.one( + "SELECT last_acked_seq FROM cursors WHERE agent_id = ?", + agentId + ) + let lastSeq = if cursor.isSome: cursor.get[0].fromDbValue(int64) else: 0'i64 + + for row in db.iterate(""" + SELECT seq, id, ts_ms, from_agent, to_agent, type, + correlation_id, in_reply_to, payload, payload_ref + FROM messages + WHERE seq > ? + AND (to_agent IS NULL OR to_agent = ?) + ORDER BY seq + LIMIT ? + """, lastSeq, agentId, limit): + var msg = Message( + seq: row[0].fromDbValue(int64), + id: row[1].fromDbValue(string), + tsMs: row[2].fromDbValue(int64), + fromAgent: row[3].fromDbValue(string), + msgType: row[5].fromDbValue(string), + ) + if row[4].kind != sqliteNull: + msg.toAgent = some(row[4].fromDbValue(string)) + if row[6].kind != sqliteNull: + msg.correlationId = some(row[6].fromDbValue(string)) + if row[7].kind != sqliteNull: + msg.inReplyTo = some(row[7].fromDbValue(string)) + if row[8].kind != sqliteNull: + msg.payload = some(parseJson(row[8].fromDbValue(string))) + if row[9].kind != sqliteNull: + msg.payloadRef = some(row[9].fromDbValue(string)) + result.add(msg) + +proc ack*(db: DbConn, agentId: string, seq: int64) = + ## Acknowledge successful processing. Advances cursor. + let tsMs = epochMs() + db.exec(""" + INSERT INTO cursors (agent_id, last_acked_seq, updated_at_ms) + VALUES (?, ?, ?) 
+ ON CONFLICT(agent_id) DO UPDATE SET + last_acked_seq = MAX(cursors.last_acked_seq, excluded.last_acked_seq), + updated_at_ms = excluded.updated_at_ms + """, agentId, seq, tsMs) + +proc writeHeartbeat*(db: DbConn, agentId: string, status: HeartbeatStatus, + currentTask: string = "", progress: float = 0.0) = + ## Upsert heartbeat to dedicated table + let tsMs = epochMs() + db.exec(""" + INSERT INTO heartbeats (agent_id, ts_ms, status, current_task, progress) + VALUES (?, ?, ?, ?, ?) + ON CONFLICT(agent_id) DO UPDATE SET + ts_ms = excluded.ts_ms, + status = excluded.status, + current_task = excluded.current_task, + progress = excluded.progress + """, agentId, tsMs, $status, + optStr(currentTask), + progress) + +proc getHeartbeat*(db: DbConn, agentId: string): Option[tuple[tsMs: int64, status: string]] = + ## Get last heartbeat for an agent + let row = db.one( + "SELECT ts_ms, status FROM heartbeats WHERE agent_id = ?", + agentId + ) + if row.isSome: + return some((tsMs: row.get[0].fromDbValue(int64), + status: row.get[1].fromDbValue(string))) + +proc tryClaim*(db: DbConn, taskId, agentId: string): bool = + ## Attempt to claim a task. Returns true if successful. + let tsMs = epochMs() + let leaseUntil = tsMs + LeaseDurationMs + + db.exec("BEGIN IMMEDIATE") + try: + let existing = db.one( + "SELECT claimed_by, lease_until_ms FROM task_claims WHERE task_id = ?", + taskId + ) + + if existing.isSome: + let leaseUntilMs = existing.get[1].fromDbValue(int64) + if leaseUntilMs > tsMs: + db.exec("ROLLBACK") + return false + else: + db.exec("DELETE FROM task_claims WHERE task_id = ?", taskId) + + db.exec(""" + INSERT INTO task_claims (task_id, claimed_by, claimed_at_ms, lease_until_ms) + VALUES (?, ?, ?, ?) 
+ """, taskId, agentId, tsMs, leaseUntil) + + db.exec("COMMIT") + return true + except CatchableError: + db.exec("ROLLBACK") + return false + +proc renewClaim*(db: DbConn, taskId, agentId: string): bool = + ## Renew lease on a claimed task + let tsMs = epochMs() + let leaseUntil = tsMs + LeaseDurationMs + + db.exec(""" + UPDATE task_claims + SET lease_until_ms = ? + WHERE task_id = ? AND claimed_by = ? + """, leaseUntil, taskId, agentId) + + return db.changes() > 0 + +proc releaseClaim*(db: DbConn, taskId, agentId: string) = + ## Release a claim when done + db.exec(""" + DELETE FROM task_claims + WHERE task_id = ? AND claimed_by = ? + """, taskId, agentId) diff --git a/src/worker/git.nim b/src/worker/git.nim new file mode 100644 index 0000000..a03e225 --- /dev/null +++ b/src/worker/git.nim @@ -0,0 +1,143 @@ +## Git operations for worker coordination +## +## Uses osproc for subprocess execution. +## Key operations: worktree create/remove, rebase, merge + +import std/[os, osproc, strutils, strformat, streams] +import ./types + +proc runGit*(args: varargs[string], workDir: string = ""): tuple[output: string, exitCode: int] = + ## Run a git command and return output + exit code + var cmd = "git" + var fullArgs: seq[string] = @[] + + if workDir != "": + fullArgs.add("-C") + fullArgs.add(workDir) + + for arg in args: + fullArgs.add(arg) + + let process = startProcess(cmd, args = fullArgs, + options = {poUsePath, poStdErrToStdOut}) + result.output = process.outputStream.readAll() + result.exitCode = process.waitForExit() + process.close() + +proc runGitCheck*(args: varargs[string], workDir: string = ""): string = + ## Run git command, raise GitError on failure + let (output, code) = runGit(args, workDir) + if code != 0: + raise newException(GitError, &"Git command failed ({code}): {output}") + return output.strip() + +proc createWorktree*(taskId: string, fromBranch: string = "origin/integration"): tuple[branch, worktree: string] = + ## Create a worktree for a task + let 
branch = &"feat/{taskId}" + let worktree = &"{WorktreesDir}/{taskId}" + + # Fetch latest + discard runGit("fetch", "origin") + + # Create branch from base + discard runGitCheck("branch", branch, fromBranch) + + # Create worktree + createDir(parentDir(worktree)) + discard runGitCheck("worktree", "add", worktree, branch) + + return (branch, worktree) + +proc removeWorktree*(taskId: string) = + ## Remove a worktree + let worktree = &"{WorktreesDir}/{taskId}" + if dirExists(worktree): + discard runGit("worktree", "remove", "--force", worktree) + +proc removeBranch*(taskId: string, remote: bool = true) = + ## Remove feature branch + let branch = &"feat/{taskId}" + discard runGit("branch", "-D", branch) + if remote: + discard runGit("push", "origin", "--delete", branch) + +proc rebaseOnIntegration*(worktree: string): bool = + ## Rebase worktree on integration branch. Returns false on conflict. + discard runGit("fetch", "origin", workDir = worktree) + let (output, code) = runGit("rebase", "origin/integration", workDir = worktree) + + if code != 0: + if "CONFLICT" in output or "conflict" in output: + return false + # Other error - abort and raise + discard runGit("rebase", "--abort", workDir = worktree) + raise newException(GitError, &"Rebase failed: {output}") + + return true + +proc isRebaseInProgress*(worktree: string): bool = + ## Check if a rebase is in progress + return dirExists(worktree / ".git" / "rebase-merge") or + dirExists(worktree / ".git" / "rebase-apply") + +proc pushBranch*(worktree: string, branch: string) = + ## Push branch to origin + discard runGitCheck("push", "-u", "origin", branch, workDir = worktree) + +proc mergeToIntegration*(taskId: string, maxRetries: int = 3): bool = + ## Merge feature branch to integration with retry loop + let branch = &"feat/{taskId}" + + for attempt in 1..maxRetries: + # Fetch latest + discard runGitCheck("fetch", "origin", "integration", branch) + + # Checkout and reset integration + discard runGitCheck("checkout", 
"integration") + discard runGitCheck("reset", "--hard", "origin/integration") + + # Merge + let (mergeOutput, mergeCode) = runGit("merge", "--no-ff", branch, + "-m", &"Merge {branch}") + if mergeCode != 0: + if "CONFLICT" in mergeOutput: + discard runGit("merge", "--abort") + return false + raise newException(GitError, &"Merge failed: {mergeOutput}") + + # Push + let (pushOutput, pushCode) = runGit("push", "origin", "integration") + if pushCode == 0: + return true + + # Push failed, probably remote changed - retry + if attempt < maxRetries: + sleep(1000) # Brief pause before retry + continue + else: + raise newException(GitError, &"Push failed after {maxRetries} retries: {pushOutput}") + + return false + +proc getConflictedFiles*(worktree: string): seq[string] = + ## Get list of files with conflicts + let (output, _) = runGit("diff", "--name-only", "--diff-filter=U", + workDir = worktree) + for line in output.splitLines(): + if line.strip() != "": + result.add(line.strip()) + +proc getBranchStatus*(worktree: string): tuple[ahead, behind: int] = + ## Get commits ahead/behind integration + let (output, code) = runGit("rev-list", "--left-right", "--count", + "origin/integration...HEAD", workDir = worktree) + if code != 0: + return (0, 0) + + let parts = output.strip().split('\t') + if parts.len >= 2: + try: + result.behind = parseInt(parts[0]) + result.ahead = parseInt(parts[1]) + except ValueError: + discard diff --git a/src/worker/heartbeat.nim b/src/worker/heartbeat.nim new file mode 100644 index 0000000..2013dbd --- /dev/null +++ b/src/worker/heartbeat.nim @@ -0,0 +1,120 @@ +## Background heartbeat thread +## +## Nim threads don't share memory by default. +## Use channels for communication between main and heartbeat threads. 
+##
+## Key patterns:
+## - Dedicated thread with own SQLite connection
+## - Channel for stop signal and status updates
+## - Non-blocking tryRecv to check for commands
+
+import std/[os, times]
+import tiny_sqlite
+import ./types
+import ./db
+
+type
+  HeartbeatCmd* = enum
+    hbUpdateStatus
+    hbStop
+
+  HeartbeatMsg* = object
+    cmd*: HeartbeatCmd
+    status*: HeartbeatStatus
+    task*: string
+    progress*: float
+
+  HeartbeatThread* = object
+    thread: Thread[HeartbeatArgs]
+    channel: Channel[HeartbeatMsg]
+    running: bool
+
+  HeartbeatArgs = object
+    dbPath: string
+    agentId: string
+    channelPtr: ptr Channel[HeartbeatMsg]
+
+# FIX: removed dead `var globalChannel` - each HeartbeatThread owns its Channel
+
+proc heartbeatWorker(args: HeartbeatArgs) {.thread.} =
+  ## Thread body: owns its own DB connection, drains commands, writes one row per tick
+  let db = openBusDb(args.dbPath)
+  var status = hsIdle
+  var task = ""
+  var progress = 0.0
+
+  while true:
+    # Check for commands (non-blocking)
+    let tried = args.channelPtr[].tryRecv()
+    if tried.dataAvailable:
+      case tried.msg.cmd
+      of hbStop:
+        db.close()  # release the per-thread connection before exiting
+        return
+      of hbUpdateStatus:
+        status = tried.msg.status
+        task = tried.msg.task
+        progress = tried.msg.progress
+
+    # Write heartbeat
+    try:
+      db.writeHeartbeat(args.agentId, status, task, progress)
+    except CatchableError as e:
+      # Log but don't crash - a busy/locked DB must not kill the heartbeat
+      stderr.writeLine("Heartbeat error: ", e.msg)
+
+    sleep(HeartbeatIntervalMs)  # NOTE: stop latency is up to one interval
+
+proc startHeartbeat*(dbPath, agentId: string): ptr HeartbeatThread =
+  ## Start the heartbeat thread. Returns handle for control.
+ result = cast[ptr HeartbeatThread](alloc0(sizeof(HeartbeatThread))) + result.channel.open() + result.running = true + + let args = HeartbeatArgs( + dbPath: dbPath, + agentId: agentId, + channelPtr: addr result.channel + ) + + createThread(result.thread, heartbeatWorker, args) + +proc updateStatus*(hb: ptr HeartbeatThread, status: HeartbeatStatus, + task: string = "", progress: float = 0.0) = + ## Update the status reported in heartbeats + if hb != nil and hb.running: + hb.channel.send(HeartbeatMsg( + cmd: hbUpdateStatus, + status: status, + task: task, + progress: progress + )) + +proc stopHeartbeat*(hb: ptr HeartbeatThread) = + ## Stop the heartbeat thread and clean up + if hb != nil and hb.running: + hb.channel.send(HeartbeatMsg(cmd: hbStop)) + joinThread(hb.thread) + hb.channel.close() + hb.running = false + dealloc(hb) + +# Simpler API using a global channel (for single-threaded CLI usage) +var globalHeartbeat: ptr HeartbeatThread = nil + +proc startGlobalHeartbeat*(dbPath, agentId: string) = + ## Start heartbeat using global state (simpler API for CLI) + if globalHeartbeat != nil: + return # Already running + globalHeartbeat = startHeartbeat(dbPath, agentId) + +proc updateGlobalHeartbeat*(status: HeartbeatStatus, task: string = "", + progress: float = 0.0) = + ## Update global heartbeat status + updateStatus(globalHeartbeat, status, task, progress) + +proc stopGlobalHeartbeat*() = + ## Stop global heartbeat + if globalHeartbeat != nil: + stopHeartbeat(globalHeartbeat) + globalHeartbeat = nil diff --git a/src/worker/state.nim b/src/worker/state.nim new file mode 100644 index 0000000..02640a9 --- /dev/null +++ b/src/worker/state.nim @@ -0,0 +1,241 @@ +## Worker state machine +## +## States: ASSIGNED → WORKING → IN_REVIEW → APPROVED → COMPLETED +## Also: CONFLICTED (rebase), FAILED (error) +## +## Key patterns: +## - Compare-and-set with BEGIN IMMEDIATE +## - STALE computed from heartbeat age (not persistent) + +import std/[tables, strformat, json, options, 
times] +import tiny_sqlite +import ./types +import ./db + +const ValidTransitions* = { + wsAssigned: @[wsWorking, wsFailed], + wsWorking: @[wsInReview, wsConflicted, wsFailed], + wsConflicted: @[wsInReview, wsWorking, wsFailed], + wsInReview: @[wsApproved, wsWorking, wsFailed], + wsApproved: @[wsCompleted, wsWorking, wsFailed], + wsCompleted: newSeq[WorkerState](), # Empty - no transitions out + wsFailed: @[wsAssigned], # Can retry +}.toTable + +proc canTransition*(fromState, toState: WorkerState): bool = + ## Check if transition is valid + if fromState notin ValidTransitions: + return false + return toState in ValidTransitions[fromState] + +proc transition*(db: DbConn, taskId: string, fromState, toState: WorkerState) = + ## Attempt state transition with compare-and-set guard. + ## Raises InvalidTransition or StaleState on failure. + if not canTransition(fromState, toState): + raise newException(InvalidTransition, &"Invalid transition: {fromState} → {toState}") + + db.exec("BEGIN IMMEDIATE") + try: + let row = db.one( + "SELECT state FROM workers WHERE task_id = ?", + taskId + ) + + if row.isNone: + db.exec("ROLLBACK") + raise newException(WorkerNotFound, &"Worker not found: {taskId}") + + let currentStr = row.get[0].fromDbValue(string) + let current = parseState(currentStr) + + if current != fromState: + db.exec("ROLLBACK") + raise newException(StaleState, &"Expected {fromState}, got {current}") + + let tsMs = epochMs() + db.exec(""" + UPDATE workers + SET state = ?, state_changed_at_ms = ? + WHERE task_id = ? 
+ """, $toState, tsMs, taskId) + + # Publish state change message + discard db.publish("worker", "state_change", + correlationId = taskId, + payload = %*{ + "from": $fromState, + "to": $toState, + "task_id": taskId + }) + + db.exec("COMMIT") + except CatchableError: + db.exec("ROLLBACK") + raise + +proc transitionToFailed*(db: DbConn, taskId: string, reason: string = "") = + ## Transition any state to FAILED + db.exec("BEGIN IMMEDIATE") + try: + let row = db.one( + "SELECT state FROM workers WHERE task_id = ?", + taskId + ) + + if row.isNone: + db.exec("ROLLBACK") + raise newException(WorkerNotFound, &"Worker not found: {taskId}") + + let currentStr = row.get[0].fromDbValue(string) + let current = parseState(currentStr) + + if current == wsCompleted: + db.exec("ROLLBACK") + raise newException(InvalidTransition, "Cannot fail a completed worker") + + let tsMs = epochMs() + db.exec(""" + UPDATE workers + SET state = ?, state_changed_at_ms = ? + WHERE task_id = ? + """, $wsFailed, tsMs, taskId) + + discard db.publish("worker", "task_failed", + correlationId = taskId, + payload = %*{ + "from": $current, + "task_id": taskId, + "reason": reason + }) + + db.exec("COMMIT") + except CatchableError: + db.exec("ROLLBACK") + raise + +proc getState*(db: DbConn, taskId: string): Option[WorkerState] = + ## Get current state for a worker + let row = db.one( + "SELECT state FROM workers WHERE task_id = ?", + taskId + ) + if row.isSome: + return some(parseState(row.get[0].fromDbValue(string))) + +proc getWorker*(db: DbConn, taskId: string): Option[WorkerInfo] = + ## Get full worker information + let row = db.one(""" + SELECT task_id, state, branch, worktree, description, + created_at_ms, state_changed_at_ms + FROM workers WHERE task_id = ? 
+ """, taskId) + + if row.isNone: + return none(WorkerInfo) + + var info = WorkerInfo( + taskId: row.get[0].fromDbValue(string), + state: parseState(row.get[1].fromDbValue(string)), + branch: row.get[2].fromDbValue(string), + worktree: row.get[3].fromDbValue(string), + createdAt: fromUnix(row.get[5].fromDbValue(int64) div 1000), + stateChangedAt: fromUnix(row.get[6].fromDbValue(int64) div 1000), + ) + + if row.get[4].kind != sqliteNull: + info.description = row.get[4].fromDbValue(string) + + # Get heartbeat + let hb = db.getHeartbeat(taskId) + if hb.isSome: + info.lastHeartbeat = fromUnix(hb.get.tsMs div 1000) + + return some(info) + +proc getAllWorkers*(db: DbConn): seq[WorkerInfo] = + ## Get all workers with their current state + for row in db.iterate(""" + SELECT w.task_id, w.state, w.branch, w.worktree, w.description, + w.created_at_ms, w.state_changed_at_ms, + h.ts_ms as heartbeat_ms + FROM workers w + LEFT JOIN heartbeats h ON w.task_id = h.agent_id + ORDER BY w.state_changed_at_ms DESC + """): + var info = WorkerInfo( + taskId: row[0].fromDbValue(string), + state: parseState(row[1].fromDbValue(string)), + branch: row[2].fromDbValue(string), + worktree: row[3].fromDbValue(string), + createdAt: fromUnix(row[5].fromDbValue(int64) div 1000), + stateChangedAt: fromUnix(row[6].fromDbValue(int64) div 1000), + ) + + if row[4].kind != sqliteNull: + info.description = row[4].fromDbValue(string) + + if row[7].kind != sqliteNull: + info.lastHeartbeat = fromUnix(row[7].fromDbValue(int64) div 1000) + + result.add(info) + +proc createWorker*(db: DbConn, taskId, branch, worktree: string, + description: string = ""): WorkerInfo = + ## Create a new worker in ASSIGNED state + let tsMs = epochMs() + + db.exec(""" + INSERT INTO workers (task_id, state, branch, worktree, description, + created_at_ms, state_changed_at_ms) + VALUES (?, ?, ?, ?, ?, ?, ?) 
+ """, taskId, $wsAssigned, branch, worktree, + optStr(description), + tsMs, tsMs) + + discard db.publish("orchestrator", "task_assign", + correlationId = taskId, + payload = %*{ + "task_id": taskId, + "branch": branch, + "worktree": worktree, + "description": description + }) + + return WorkerInfo( + taskId: taskId, + state: wsAssigned, + branch: branch, + worktree: worktree, + description: description, + createdAt: fromUnix(tsMs div 1000), + stateChangedAt: fromUnix(tsMs div 1000), + ) + +proc isStale*(info: WorkerInfo): bool = + ## Check if worker is stale based on heartbeat age + if info.state notin {wsAssigned, wsWorking}: + return false + + if info.lastHeartbeat == Time(): + return true # No heartbeat yet + + let age = (epochMs() - toUnix(info.lastHeartbeat) * 1000) + return age > StaleThresholdMs + +proc staleLevel*(info: WorkerInfo): string = + ## Get stale level: ok, WARN, STALE, DEAD + if info.state notin {wsAssigned, wsWorking}: + return "ok" + + if info.lastHeartbeat == Time(): + return "DEAD" + + let age = (epochMs() - toUnix(info.lastHeartbeat) * 1000) + if age > DeadThresholdMs: + return "DEAD" + elif age > StaleThresholdMs: + return "STALE" + elif age > StaleWarnThresholdMs: + return "WARN" + else: + return "ok" diff --git a/src/worker/types.nim b/src/worker/types.nim new file mode 100644 index 0000000..36191f0 --- /dev/null +++ b/src/worker/types.nim @@ -0,0 +1,113 @@ +## Shared types for worker coordination +## +## This module defines the core types used across the worker CLI. 

import std/[times, json]

type
  WorkerState* = enum
    ## Worker lifecycle states.
    ## NOTE: STALE is NOT a state - it is derived from heartbeat age.
    wsAssigned = "ASSIGNED"
    wsWorking = "WORKING"
    wsConflicted = "CONFLICTED"
    wsInReview = "IN_REVIEW"
    wsApproved = "APPROVED"
    wsCompleted = "COMPLETED"
    wsFailed = "FAILED"

  WorkerContext* = object
    ## Per-worker context, persisted in .worker-ctx.json.
    taskId*: string
    branch*: string
    worktree*: string
    createdAt*: Time
    description*: string

  WorkerInfo* = object
    ## Aggregated worker information for status display.
    taskId*: string
    state*: WorkerState
    branch*: string
    worktree*: string
    createdAt*: Time
    stateChangedAt*: Time
    lastHeartbeat*: Time
    description*: string

  HeartbeatStatus* = enum
    ## Status value carried in heartbeat messages.
    hsIdle = "idle"
    hsWorking = "working"
    hsBlocked = "blocked"

  # Error types raised across the worker CLI
  InvalidTransition* = object of CatchableError
  StaleState* = object of CatchableError
  WorkerNotFound* = object of CatchableError
  GitError* = object of CatchableError
  DbError* = object of CatchableError

const
  # Process exit codes
  ExitSuccess* = 0
  ExitUsageError* = 2
  ExitInvalidTransition* = 3
  ExitGitError* = 4
  ExitDbError* = 5
  ExitConflict* = 6
  ExitNotFound* = 7

  # Well-known paths relative to the repository root
  WorkerStateDir* = ".worker-state"
  BusDbPath* = ".worker-state/bus.db"
  BusJsonlPath* = ".worker-state/bus.jsonl"
  BlobsDir* = ".worker-state/blobs"
  WorkersDir* = ".worker-state/workers"
  WorktreesDir* = "worktrees"
  ContextFileName* = ".worker-ctx.json"

  # Timing thresholds (all in milliseconds)
  HeartbeatIntervalMs* = 10_000    # 10 seconds
  StaleWarnThresholdMs* = 30_000   # 30 seconds
  StaleThresholdMs* = 100_000      # 100 seconds
  DeadThresholdMs* = 300_000       # 5 minutes
  LeaseDurationMs* = 60_000        # 60 seconds
  BusyTimeoutMs* = 5000            # 5 seconds

proc parseState*(s: string): WorkerState =
  ## Convert a persisted state string back to its enum value.
  ## Raises ValueError for anything that is not one of the seven states.
  for candidate in WorkerState:
    if $candidate == s:
      return candidate
  raise newException(ValueError, "Unknown state: " & s)

proc epochMs*(): int64 =
  ## Current wall-clock time as milliseconds since the Unix epoch.
  let now = getTime()
  now.toUnix() * 1000 + now.nanosecond() div 1_000_000

proc toJson*(ctx: WorkerContext): JsonNode =
  ## Serialize a context record to JSON (snake_case keys, createdAt via `$`).
  result = newJObject()
  result["task_id"] = %ctx.taskId
  result["branch"] = %ctx.branch
  result["worktree"] = %ctx.worktree
  result["created_at"] = %($ctx.createdAt)
  result["description"] = %ctx.description

proc fromJson*(T: typedesc[WorkerContext], j: JsonNode): WorkerContext =
  ## Deserialize a context record previously written by `toJson`.
  ## `description` is optional; the other keys are required.
  result.taskId = j["task_id"].getStr
  result.branch = j["branch"].getStr
  result.worktree = j["worktree"].getStr
  # NOTE(review): assumes `$Time` output always matches the
  # "yyyy-MM-dd'T'HH:mm:sszzz" pattern used here - confirm the round-trip
  # across timezones/offset formats.
  result.createdAt = j["created_at"].getStr.parse("yyyy-MM-dd'T'HH:mm:sszzz").toTime()
  result.description = j{"description"}.getStr