feat: add worker CLI scaffold in Nim

Multi-agent coordination CLI with SQLite message bus:
- State machine: ASSIGNED -> WORKING -> IN_REVIEW -> APPROVED -> COMPLETED
- Commands: spawn, start, done, approve, merge, cancel, fail, heartbeat
- SQLite WAL mode, dedicated heartbeat thread, channel-based IPC
- cligen for CLI, tiny_sqlite for DB, ORC memory management

Design docs for branch-per-worker, state machine, message passing,
and human observability patterns.

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
Author: dan
Date: 2026-01-10 18:47:47 -08:00
Parent: 555dbb8ccd
Commit: 1c66d019bd
20 changed files with 4423 additions and 0 deletions

docs/design/branch-per-worker.md

@@ -0,0 +1,406 @@
# Branch-per-Worker Isolation Design
**Status**: Draft
**Bead**: skills-roq
**Epic**: skills-s6y (Multi-agent orchestration: Lego brick architecture)
## Overview
This document defines the git branching strategy for multi-agent coordination. Each worker operates in an isolated git worktree on a dedicated branch, with mandatory rebase before review.
## Design Principles
1. **Orchestrator controls branch lifecycle** - Creates, assigns, cleans up
2. **Worktrees for parallelism** - Each worker gets isolated directory
3. **Integration branch as staging** - Buffer before main
4. **SQLite = process truth, Git = code truth** - Don't duplicate state
5. **Mandatory rebase** - Fresh base before review (consensus requirement)
## Key Decisions
### Branch Naming: `type/task-id`
**Decision**: Use `type/task-id` (e.g., `feat/skills-abc`, `fix/skills-xyz`)
**Rationale** (2/3 consensus):
- Branch describes *work*, not *worker*
- Survives task reassignment (if Claude fails, Gemini can continue)
- Worker identity in commit author: `Author: claude-3.5 <agent@bot>`
**Rejected alternative**: `worker-id/task-id` - becomes misleading on reassignment
### Worktrees vs Checkout
**Decision**: Git worktrees (parallel directories)
**Rationale** (3/3 consensus):
- `git checkout` updates working directory globally
- If Worker A checks out while Worker B writes, corruption possible
- Worktrees share `.git` object database, isolate filesystem
- Maps cleanly to "one worker = one workspace"
```
/project/
├── .git/ # Shared object database
├── worktrees/
│ ├── skills-abc/ # Worker 1's worktree
│ │ └── (full working copy)
│ └── skills-xyz/ # Worker 2's worktree
│ └── (full working copy)
└── (main working copy) # For orchestrator
```
### Integration Branch
**Decision**: Use rolling `integration` branch as staging before `main`
**Rationale** (3/3 consensus):
- AI agents introduce subtle regressions
- Integration branch = "demilitarized zone" for combined testing
- Human review before promoting to main
- Allows batching of related changes
```
main ─────────────────────────●─────────────
integration ────●────●────●────●────●
↑ ↑ ↑ ↑
feat/T-101 ────● │ │ │
feat/T-102 ─────────● │ │
fix/T-103 ──────────────● │
feat/T-104 ───────────────────●
```
### Conflict Handling
**Decision**: Worker resolves trivial conflicts, escalates semantic conflicts
**Rationale** (2/3 consensus - flash-or, gpt):
- Blanket "never resolve" is safe but slows throughput
- Mechanical conflicts (formatting, imports, non-overlapping) are safe
- Logic conflicts require human judgment
**Rules**:
```python
def handle_rebase_conflict(conflict_info):
# Trivial: resolve automatically
if is_trivial_conflict(conflict_info):
resolve_mechanically()
run_tests()
if tests_pass():
continue_rebase()
else:
abort_and_escalate()
# Semantic: always escalate
else:
git_rebase_abort()
set_state(CONFLICTED)
notify_orchestrator()
```
**Trivial conflict criteria** (sketched in code after these lists):
- Only whitespace/formatting changes
- Import statement ordering
- Non-overlapping edits in same file
- Less than N lines changed in conflict region
**Escalate if**:
- Conflict touches core logic
- Conflict spans multiple files
- Test failures after resolution
- Uncertain about correctness
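A minimal sketch of how these criteria might be encoded (the `ConflictRegion` shape and the `MAX_CONFLICT_LINES` threshold are illustrative, not a settled API):
```python
from dataclasses import dataclass

@dataclass
class ConflictRegion:
    path: str
    whitespace_only: bool   # only whitespace/formatting differs
    imports_only: bool      # only import ordering differs
    changed_lines: int      # size of the conflict region

MAX_CONFLICT_LINES = 20  # the "N" above; tune per repo

def is_trivial_conflict(regions: list[ConflictRegion]) -> bool:
    # Conflicts spanning multiple files always escalate
    if len(regions) != 1:
        return False
    r = regions[0]
    # Only mechanical changes qualify
    if not (r.whitespace_only or r.imports_only):
        return False
    # Keep the conflict region small
    return r.changed_lines <= MAX_CONFLICT_LINES
```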
### State Machine Mapping
**Decision**: SQLite is process truth, Git is code truth
**Rationale** (2/3 consensus - gemini, gpt):
- Don't encode state in Git (tags, notes) - causes sync issues
- Observable Git signals already exist:
| Worker State | Git Observable |
|--------------|----------------|
| ASSIGNED | Branch exists, worktree created |
| WORKING | New commits appearing |
| IN_REVIEW | Branch pushed, PR opened (or flag in SQLite) |
| APPROVED | PR approved |
| COMPLETED | Merged to integration/main |
| CONFLICTED | Rebase aborted, no new commits |
**Link via task-id**: Commit trailers connect the two:
```
feat: implement user authentication
Task: skills-abc
Agent: claude-3.5
```
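Assuming trailers are written consistently, either side can recover the task-id from a commit via git's trailer-aware pretty format (a sketch; `%(trailers:key=...)` requires a reasonably recent git):
```python
import subprocess

def task_id_for_commit(sha: str = "HEAD") -> str:
    # valueonly prints just the trailer's value, e.g. "skills-abc"
    out = subprocess.run(
        ["git", "log", "-1", "--format=%(trailers:key=Task,valueonly)", sha],
        capture_output=True, text=True, check=True,
    )
    return out.stdout.strip()
```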
### Cross-Worker Dependencies
**Decision**: Strict serialization - don't depend on uncommitted work
**Rationale** (3/3 consensus):
- "Speculative execution" creates house of cards
- If A's code rejected, B's work becomes invalid
- Cheaper to wait than waste tokens on orphaned code
**Pattern for parallel work on related features** (sketched below):
1. Orchestrator creates epic branch: `epic/auth-system`
2. Both workers branch from epic: `feat/T-101`, `feat/T-102`
3. Workers rebase onto epic, not main
4. Epic merged to integration when all tasks complete
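A sketch of the orchestrator-side setup for this pattern (branch and worktree names follow the conventions above; error handling omitted):
```python
import subprocess

def run(*args: str) -> None:
    subprocess.run(args, check=True)

def setup_epic(epic: str, task_ids: list[str]) -> None:
    run("git", "fetch", "origin")
    # Epic branch buffers related work before integration
    run("git", "branch", f"epic/{epic}", "origin/integration")
    for task_id in task_ids:
        # Workers branch from the epic, not main, and rebase onto it
        run("git", "branch", f"feat/{task_id}", f"epic/{epic}")
        run("git", "worktree", "add", f"worktrees/{task_id}", f"feat/{task_id}")
```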
### Branch Cleanup
**Decision**: Delete after merge, archive failures
**Rationale** (3/3 consensus):
- Prevent branch bloat
- Archive failures for post-mortem analysis
```bash
# On successful merge
git branch -d feat/T-101
git worktree remove worktrees/T-101
# On failure/abandonment
git branch -m feat/T-101 archive/T-101-$(date +%Y%m%d)
git worktree remove worktrees/T-101
```
## Workflow
### 1. Task Assignment
Orchestrator prepares workspace:
```bash
# 1. Fetch latest
git fetch origin
# 2. Create branch from integration
git branch feat/$TASK_ID origin/integration
# 3. Create worktree
git worktree add worktrees/$TASK_ID feat/$TASK_ID
# 4. Update SQLite (orchestrator-side Python, not shell)
publish(db, 'orchestrator', {
'type': 'task_assign',
'to': worker_id,
'correlation_id': task_id,
'payload': {
'branch': f'feat/{task_id}',
'worktree': f'worktrees/{task_id}'
}
})
```
### 2. Worker Starts
Worker receives assignment:
```bash
cd worktrees/$TASK_ID
# Confirm environment
git status
git log --oneline -3
# Begin work...
```
### 3. Worker Commits
During work:
```bash
git add -A
git commit -m "feat: implement feature X
Task: $TASK_ID
Agent: $AGENT_ID"
```
Update SQLite:
```python
publish(db, agent_id, {
'type': 'state_change',
'correlation_id': task_id,
'payload': {'from': 'ASSIGNED', 'to': 'WORKING'}
})
```
### 4. Pre-Review Rebase (Mandatory)
Before requesting review:
```bash
# 1. Fetch latest integration
git fetch origin integration
# 2. Attempt rebase
git rebase origin/integration
# 3. Handle result
if [ $? -eq 0 ]; then
# Success - push and request review
git push -u origin feat/$TASK_ID
# Update SQLite: IN_REVIEW
else
# Conflict - check if trivial (pseudocode; see scripts/try-rebase.sh)
if is_trivial_conflict; then
resolve_and_continue
else
git rebase --abort
# Update SQLite: CONFLICTED
fi
fi
```
### 5. Review
Review happens (human or review-gate):
```python
# Check review state
review = get_review_state(task_id)
if review['decision'] == 'approved':
publish(db, 'reviewer', {
'type': 'review_result',
'correlation_id': task_id,
'payload': {'decision': 'approved'}
})
elif review['decision'] == 'changes_requested':
publish(db, 'reviewer', {
'type': 'review_result',
'correlation_id': task_id,
'payload': {
'decision': 'changes_requested',
'feedback': review['comments']
}
})
# Worker returns to WORKING state
```
### 6. Merge
On approval:
```bash
# Orchestrator merges to integration
git checkout integration
git merge --no-ff feat/$TASK_ID -m "Merge feat/$TASK_ID: $TITLE"
git push origin integration
# Cleanup
git branch -d feat/$TASK_ID
git push origin --delete feat/$TASK_ID
git worktree remove worktrees/$TASK_ID
```
### 7. Promote to Main
Periodically (or per-task):
```bash
# When integration is green
git checkout main
git merge --ff-only integration
git push origin main
```
## Directory Structure
```
/project/
├── .git/
├── .worker-state/
│ ├── bus.db # SQLite message bus
│ └── workers/
│ └── worker-auth.json
├── worktrees/ # Worker worktrees (gitignored)
│ ├── skills-abc/
│ └── skills-xyz/
└── (main working copy)
```
Add to `.gitignore`:
```
worktrees/
```
## Conflict Resolution Script
```bash
#!/bin/bash
# scripts/try-rebase.sh
TASK_ID=$1
TARGET_BRANCH=${2:-origin/integration}
cd worktrees/$TASK_ID
git fetch origin
# Attempt rebase
if git rebase $TARGET_BRANCH; then
echo "Rebase successful"
exit 0
fi
# Check conflict severity
CONFLICT_FILES=$(git diff --name-only --diff-filter=U)
CONFLICT_COUNT=$(echo "$CONFLICT_FILES" | wc -l)
# Trivial candidate: at most two conflicted files
if [ "$CONFLICT_COUNT" -le 2 ]; then
# Try automatic resolution for whitespace/formatting
for file in $CONFLICT_FILES; do
if git checkout --theirs "$file" 2>/dev/null; then
git add "$file"
else
echo "Cannot auto-resolve: $file"
git rebase --abort
exit 2 # CONFLICTED
fi
done
if git rebase --continue; then
echo "Auto-resolved trivial conflicts"
exit 0
fi
fi
# Non-trivial: abort and escalate
git rebase --abort
echo "Conflict requires human intervention"
exit 2 # CONFLICTED
```
## Integration with State Machine
| State | Git Action | SQLite Message |
|-------|------------|----------------|
| IDLE → ASSIGNED | Branch + worktree created | `task_assign` |
| ASSIGNED → WORKING | First commit | `state_change` |
| WORKING → IN_REVIEW | Push + rebase success | `review_request` |
| WORKING → CONFLICTED | Rebase failed | `state_change` + `escalate` |
| IN_REVIEW → APPROVED | Review passes | `review_result` |
| IN_REVIEW → WORKING | Changes requested | `review_result` |
| APPROVED → COMPLETED | Merged | `task_done` |
## Open Questions
1. **Worktree location**: `./worktrees/` or `/tmp/worktrees/`?
2. **Integration → main cadence**: Per-task, hourly, daily, manual?
3. **Epic branches**: How complex should the epic workflow be?
4. **Failed branch retention**: How long to keep archived branches?
## References
- OpenHands: https://docs.openhands.dev/sdk/guides/iterative-refinement
- Gastown worktrees: https://github.com/steveyegge/gastown
- Git worktrees: https://git-scm.com/docs/git-worktree

docs/design/human-observability.md

@@ -0,0 +1,405 @@
# Human Observability Design
**Status**: Draft
**Bead**: skills-yak
**Epic**: skills-s6y (Multi-agent orchestration: Lego brick architecture)
## Overview
This document defines the observability interface for human orchestrators managing AI workers. The design follows kubectl/docker patterns: one root command with subcommands, table output, watch mode, and detailed describe views.
## Design Decisions (from orch consensus)
| Decision | Choice | Rationale |
|----------|--------|-----------|
| Command structure | One root, many subcommands | kubectl-style; composable; easy to extend |
| Status columns | id, state, age, heartbeat, staleness | At-a-glance signal for stuck/healthy workers |
| Stale threshold | 3x heartbeat = WARN, 10x = STALE | Avoid false positives from jitter |
| Watch mode | `--watch` flag | Multi-agent is event-driven; real-time visibility |
| Detail view | Full spec, transitions, errors, "why stuck" | Answer "what, why, where stuck?" |
## Commands
### Command Overview
```
worker status [options] # Dashboard table
worker show <task-id> # Detailed view
worker logs <task-id> # Message history
worker stats # Aggregate metrics
```
### `worker status`
Dashboard view of all workers.
```bash
worker status [--state STATE] [--stale] [--watch] [--json]
```
**Default Output**:
```
TASK STATE AGE HEARTBEAT STATUS SUMMARY
skills-abc WORKING 45m 2m ago ok Fix auth bug
skills-xyz IN_REVIEW 2h -- ok Add login form
skills-123 WORKING 3h 12m ago STALE Refactor database
skills-456 CONFLICTED 1h 5m ago blocked Update API endpoints
```
**Columns**:
| Column | Source | Description |
|--------|--------|-------------|
| TASK | state file | Task ID |
| STATE | state file | Current state (color-coded) |
| AGE | state file | Time since created |
| HEARTBEAT | bus.db | Time since last heartbeat |
| STATUS | computed | ok, WARN, STALE, blocked, error |
| SUMMARY | task description | First 30 chars of description |
**State Colors** (if terminal supports):
- 🟢 Green: WORKING
- 🟡 Yellow: IN_REVIEW, ASSIGNED
- 🔴 Red: FAILED, CONFLICTED, STALE
- ⚪ Gray: COMPLETED
**Options**:
| Option | Description |
|--------|-------------|
| `--state STATE` | Filter by state (WORKING, IN_REVIEW, etc.) |
| `--stale` | Show only stale workers |
| `--watch` | Refresh every 2 seconds |
| `--json` | Output as JSON array |
| `--wide` | Show additional columns (branch, worker type) |
**Watch Mode**:
```bash
worker status --watch
# Clears screen, refreshes every 2s
# Ctrl+C to exit
```
### `worker show <task-id>`
Detailed view of a single worker.
```bash
worker show skills-abc [--events] [--json]
```
**Output**:
```
Task: skills-abc
Description: Fix authentication bug in login flow
State: WORKING
Branch: feat/skills-abc
Worktree: worktrees/skills-abc
Created: 2026-01-10 14:00:00 (45m ago)
State Changed: 2026-01-10 14:05:00 (40m ago)
Last Heartbeat: 2026-01-10 14:43:00 (2m ago)
Status: ok
✓ Heartbeat within threshold
✓ State progressing normally
State History:
14:00:00 → ASSIGNED (spawned)
14:05:00 → WORKING (worker start)
Git Status:
Branch: feat/skills-abc
Ahead of integration: 3 commits
Behind integration: 0 commits
Uncommitted changes: 2 files
Recent Messages:
14:43:00 heartbeat status=working, progress=0.6
14:35:00 heartbeat status=working, progress=0.4
14:05:00 state_change ASSIGNED → WORKING
14:00:00 task_assign from=orchestrator
```
**With `--events`**: Show full message history.
**Sections**:
1. **Header**: Task ID, description, current state, branch
2. **Timestamps**: Created, state changed, last heartbeat
3. **Status Check**: Is it healthy? What's wrong?
4. **State History**: Timeline of transitions
5. **Git Status**: Branch status, commits, conflicts
6. **Recent Messages**: Last 10 messages from bus.db
### `worker logs <task-id>`
Stream message history for a task.
```bash
worker logs skills-abc [--follow] [--since 1h] [--type heartbeat]
```
**Output**:
```
2026-01-10 14:00:00 task_assign from=orchestrator
2026-01-10 14:05:00 state_change ASSIGNED → WORKING
2026-01-10 14:15:00 heartbeat status=working
2026-01-10 14:25:00 heartbeat status=working
...
```
**Options**:
| Option | Description |
|--------|-------------|
| `--follow, -f` | Stream new messages as they arrive |
| `--since DURATION` | Show messages from last N minutes/hours |
| `--type TYPE` | Filter by message type |
| `--limit N` | Show last N messages (default: 50) |
### `worker stats`
Aggregate metrics across all workers.
```bash
worker stats [--since 24h]
```
**Output**:
```
Workers by State:
WORKING: 2
IN_REVIEW: 1
COMPLETED: 5
FAILED: 1
Total: 9
Health:
Healthy: 3 (33%)
Stale: 1 (11%)
Timing (median):
ASSIGNED → WORKING: 2m
WORKING → IN_REVIEW: 45m
IN_REVIEW → APPROVED: 15m
Full cycle: 1h 10m
Failures (last 24h): 1
skills-789: Rebase conflict (3h ago)
```
## Stale Detection
### Thresholds
Based on heartbeat interval `H` (default: 10s):
| Level | Threshold | Meaning |
|-------|-----------|---------|
| OK | < 3H (30s) | Normal operation |
| WARN | 3H - 10H (30s - 100s) | Possible issue |
| STALE | > 10H (100s) | Worker likely stuck/dead |
| DEAD | > 30H (5m) | Worker definitely dead |
### Stale vs Stuck
Two different problems:
| Condition | Symptom | Detection |
|-----------|---------|-----------|
| **Stale worker** | No heartbeats | `now - last_heartbeat > threshold` |
| **Stuck task** | Heartbeating but no progress | Same state for > N minutes |
**Stuck detection**:
```sql
SELECT task_id FROM workers
WHERE state = 'WORKING'
AND state_changed_at < datetime('now', '-30 minutes')
AND last_heartbeat > datetime('now', '-1 minute')
```
## Status Computation
```python
from datetime import datetime

def compute_status(worker: Worker) -> str:
now = datetime.utcnow()
heartbeat_age = (now - worker.last_heartbeat).total_seconds()
state_age = (now - worker.state_changed_at).total_seconds()
H = 10 # heartbeat interval
# Check heartbeat freshness
if worker.state in ('ASSIGNED', 'WORKING'):
if heartbeat_age > 30 * H: # 5 min
return 'DEAD'
if heartbeat_age > 10 * H: # 100s
return 'STALE'
if heartbeat_age > 3 * H: # 30s
return 'WARN'
# Check for conflicts/failures
if worker.state == 'CONFLICTED':
return 'blocked'
if worker.state == 'FAILED':
return 'error'
# Check for stuck (working but no progress)
if worker.state == 'WORKING' and state_age > 30 * 60: # 30 min
return 'stuck'
return 'ok'
```
## Output Formatting
### Table Format
Use fixed-width columns for alignment:
```python
COLUMNS = [
('TASK', 12),
('STATE', 11),
('AGE', 6),
('HEARTBEAT', 10),
('STATUS', 7),
('SUMMARY', 30),
]
def format_row(worker):
return f"{worker.task_id:<12} {worker.state:<11} {format_age(worker.age):<6} ..."
```
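The `format_age` helper is assumed above; a minimal sketch that matches the dashboard examples (`45m`, `2h`; cutoffs illustrative):
```python
def format_age(seconds: float) -> str:
    # Compact ages that fit the 6-character AGE column
    if seconds < 60:
        return f"{int(seconds)}s"
    if seconds < 3600:
        return f"{int(seconds // 60)}m"
    if seconds < 86400:
        return f"{int(seconds // 3600)}h"
    return f"{int(seconds // 86400)}d"
```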
### JSON Format
For scripting:
```json
[
{
"task_id": "skills-abc",
"state": "WORKING",
"age_seconds": 2700,
"last_heartbeat": "2026-01-10T14:43:00Z",
"status": "ok",
"branch": "feat/skills-abc"
}
]
```
### Color Codes
```python
STATE_COLORS = {
'ASSIGNED': 'yellow',
'WORKING': 'green',
'IN_REVIEW': 'yellow',
'APPROVED': 'green',
'COMPLETED': 'dim',
'CONFLICTED': 'red',
'FAILED': 'red',
'STALE': 'red',
}
STATUS_COLORS = {
'ok': 'green',
'WARN': 'yellow',
'STALE': 'red',
'DEAD': 'red',
'blocked': 'yellow',
'error': 'red',
'stuck': 'yellow',
}
```
## Integration
### With Worker CLI (skills-sse)
`worker status` is part of the same CLI:
```python
@worker.command()
@click.option('--watch', is_flag=True)
@click.option('--state')
@click.option('--stale', is_flag=True)
@click.option('--json', 'as_json', is_flag=True)
def status(watch, state, stale, as_json):
"""Show worker dashboard."""
...
```
### With Message Bus (skills-ms5)
Query SQLite for heartbeats:
```sql
SELECT task_id, MAX(ts) as last_heartbeat
FROM messages
WHERE type = 'heartbeat'
GROUP BY task_id
```
### With State Files
Read `.worker-state/workers/*.json` for current state.
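A sketch of the read side (the directory layout comes from the design docs; the JSON field names are an assumption):
```python
import json
from pathlib import Path

def load_worker_states(root: str = ".worker-state/workers"):
    # One JSON file per worker, e.g. worker-auth.json
    for path in sorted(Path(root).glob("*.json")):
        with open(path) as f:
            yield json.load(f)  # expected keys: task_id, state, state_changed_at, ...
```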
## Implementation
### Python Package
```
skills/worker/
├── commands/
│ ├── status.py # Dashboard
│ ├── show.py # Detail view
│ ├── logs.py # Message history
│ └── stats.py # Aggregate metrics
└── display/
├── table.py # Table formatting
├── colors.py # Terminal colors
└── watch.py # Watch mode loop
```
### Watch Mode Implementation
```python
import time
import os
def watch_loop(render_fn, interval=2):
"""Clear screen and re-render every interval seconds."""
try:
while True:
os.system('clear') # or use curses
render_fn()
time.sleep(interval)
except KeyboardInterrupt:
pass
```
## MVP Scope
For MVP, implement:
1. ✅ `worker status` - basic table with state, age, heartbeat
2. ✅ `worker status --watch` - refresh mode
3. ✅ `worker status --json` - JSON output
4. ✅ `worker show <id>` - basic detail view
5. ⏸️ `worker logs` - defer (can query SQLite directly)
6. ⏸️ `worker stats` - defer (nice to have)
7. ⏸️ TUI dashboard - defer (CLI sufficient for 2-4 workers)
## Open Questions
1. **Notification/alerts**: Should there be `worker watch --alert` that beeps on failures?
2. **Shell integration**: Prompt integration showing worker count/status?
3. **Log streaming**: Real-time follow for `worker logs --follow`?
## References
- `docker ps` output format
- `kubectl get pods` and `kubectl describe`
- `systemctl status` detail level
- `htop` for TUI inspiration (future)

docs/design/message-passing-comparison.md

@@ -0,0 +1,140 @@
# Message Passing: Design Comparison
**Purpose**: Compare our design decisions against Beads and Tissue to validate approach and identify gaps.
## Summary Comparison
| Decision | Our Design (v2) | Beads | Tissue |
|----------|-----------------|-------|--------|
| **Primary storage** | SQLite (WAL mode) | SQLite + JSONL export | JSONL (append-only) |
| **Cache/index** | N/A (SQLite is primary) | SQLite is primary | SQLite (derived) |
| **Write locking** | SQLite BEGIN IMMEDIATE | SQLite BEGIN IMMEDIATE | None (git merge) |
| **Concurrency model** | SQLite transactions | Optimistic (hash IDs) + SQLite txn | Optimistic (git merge) |
| **Crash safety** | SQLite atomic commit | SQLite transactions | Git (implicit) |
| **Heartbeats** | Yes (10s interval) | No (daemon only) | No |
| **Liveness detection** | SQL query on heartbeat timestamps | Not documented | Not documented |
| **Large payloads** | Blob storage (>4KB) | Compaction/summarization | Not addressed |
| **Coordination** | Polling + claim-check | `bd ready` queries | `tissue ready` queries |
| **Message schema** | Explicit (id, ts, from, type, payload) | Implicit (issue events) | Implicit (issue events) |
| **Human debugging** | JSONL export (read-only) | JSONL in git | JSONL primary |
**Decision (2026-01-10)**: After orch consensus with 3 models, we aligned with Beads' approach (SQLite primary) over Tissue's (JSONL primary). Key factors:
- Payloads 1-50KB exceed POSIX atomic write guarantees (~4KB)
- Crash mid-write with flock still corrupts log
- SQLite transactions provide true atomicity
- JSONL export preserves human debugging (`tail -f`)
## Detailed Analysis
**Note**: The subsections below were written against the original v1 design (JSONL primary + flock) and are retained for their reasoning; where they conflict with the 2026-01-10 decision above, the decision wins.
### Where We Align
**1. JSONL as Source of Truth**
All three systems use append-only JSONL as the authoritative store. This is the right call:
- Git-friendly (merges cleanly)
- Human-readable (debuggable with `cat | jq`)
- Simple to implement
**2. SQLite as Derived Cache**
All three use SQLite for queries, not as primary storage:
- Beads: Always-on cache with dirty tracking
- Tissue: Derived index, gitignored
- Ours: Phase 2 optimization
**3. Pull-Based Coordination**
All use polling/queries rather than push events:
- `bd ready` / `tissue ready` / our `poll()` function
- Simpler than event-driven, works across process boundaries
### Where We Diverge
**1. Write Locking Strategy**
| System | Approach | Trade-off |
|--------|----------|-----------|
| **Ours** | flock on JSONL file | Simple, prevents interleaving, works locally |
| **Beads** | SQLite BEGIN IMMEDIATE | Stronger guarantees, more complex |
| **Tissue** | None (trust git merge) | Simplest, but can corrupt JSONL mid-write |
**Our rationale**: flock is simpler than SQLite transactions and safer than trusting git merge for mid-write crashes. Tissue's approach assumes writes complete atomically, which isn't guaranteed for large JSON lines.
**2. Crash Safety**
| System | Approach |
|--------|----------|
| **Ours** | Write to staging → validate → append under lock → delete staging |
| **Beads** | SQLite transactions (rollback on failure) |
| **Tissue** | Git recovery (implicit) |
**Our rationale**: Staging directory adds explicit crash recovery without SQLite complexity. If agent dies mid-write, staged file is recovered on restart.
**3. Heartbeats / Liveness**
| System | Approach |
|--------|----------|
| **Ours** | Mandatory heartbeats every 10s, timeout detection |
| **Beads** | Background daemon (no explicit heartbeats) |
| **Tissue** | None |
**Our rationale**: LLM API calls can hang indefinitely. Without heartbeats, a stuck agent blocks tasks forever. Beads/Tissue are issue trackers, not real-time coordination systems.
**4. Large Payload Handling**
| System | Approach |
|--------|----------|
| **Ours** | Blob storage with content-addressable hashing |
| **Beads** | Compaction (summarize old tasks) |
| **Tissue** | Not addressed |
**Our rationale**: Code diffs and agent outputs can be large. Blob storage keeps the log scannable. Beads' compaction is for context windows, not payload size.
**5. Message Schema**
| System | Schema Type |
|--------|-------------|
| **Ours** | Explicit message schema (id, ts, from, to, type, payload) |
| **Beads** | Issue-centric (tasks with dependencies, audit trail) |
| **Tissue** | Issue-centric (similar to Beads) |
**Our rationale**: We need general message passing (state changes, heartbeats, claims), not just issue tracking. Beads/Tissue are issue trackers first; we're building coordination primitives.
### Gaps in Our Design (Learned from Beads)
**1. Hash-Based IDs for Merge Safety**
Beads uses hash-based IDs (e.g., `bd-a1b2`) to prevent merge collisions. We should consider this for message IDs if multiple agents might create messages offline and merge later.
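A sketch of what content-derived message IDs could look like (prefix and length illustrative; Beads' actual scheme may differ):
```python
import hashlib
import json

def hash_message_id(msg: dict) -> str:
    # Hash the canonical message body so two agents generating
    # messages offline cannot collide on a shared counter
    canonical = json.dumps(msg, sort_keys=True).encode()
    return "msg-" + hashlib.sha256(canonical).hexdigest()[:8]
```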
**2. Dirty Tracking for Incremental Export**
Beads tracks "dirty" issues for efficient JSONL export. When we add SQLite cache, we should track which messages need re-export rather than full rescans.
**3. File Hash Validation**
Beads stores JSONL file hash to detect external modifications. We could add this to detect corruption or manual edits.
### Gaps in Our Design (Learned from Tissue)
**1. FTS5 Full-Text Search**
Tissue's SQLite cache includes FTS5 for searching issue content. Useful for "find messages mentioning X" queries in Phase 2.
**2. Simpler Concurrency (Maybe)**
Tissue trusts git merge without explicit locking. For single-machine scenarios with small writes, this might be sufficient. We could offer a "simple mode" without flock for low-contention cases.
## Validation Verdict
Our design is **more complex than Tissue but simpler than Beads**, which matches our use case:
- **Tissue**: Issue tracker, optimizes for git collaboration
- **Beads**: Full workflow engine with daemon, RPC, recipes
- **Ours**: Coordination primitives for multi-agent coding
The key additions we make (heartbeats, blob storage, staging directory) are justified by our real-time coordination requirements that issue trackers don't have.
## Recommended Updates to Design
1. **Add hash-based message IDs** - Prevent merge collisions if agents work offline
2. **Add file hash validation** - Detect log corruption on startup
3. **Document "simple mode"** - No flock for single-agent or low-contention scenarios
4. **Plan for FTS5** - Add to Phase 2 SQLite cache design
## References
- Beads source: https://github.com/steveyegge/beads
- Tissue source: https://github.com/evil-mind-evil-sword/tissue
- Our design: docs/design/message-passing-layer.md

docs/design/message-passing-layer.md

@@ -0,0 +1,749 @@
# Message Passing Layer Design
**Status**: Draft (v4 - Nim implementation)
**Bead**: skills-ms5
**Epic**: skills-s6y (Multi-agent orchestration: Lego brick architecture)
**Language**: Nim (ORC, threads, tiny_sqlite)
## Changelog
- **v4**: Nim implementation details (tiny_sqlite, dedicated heartbeat thread, channels)
- **v3**: Fixed BLOCKERs from orch spec-review (seq autoincrement, ack semantics, heartbeats table, epoch timestamps, JSONL export, task leases)
- **v2**: Changed to SQLite primary (was JSONL+flock)
## Overview
This document defines the local message passing layer for multi-agent coordination. Agents communicate through a shared SQLite database with ACID guarantees. A JSONL export provides human debugging and git history.
## Design Principles
1. **SQLite as source of truth** - Atomic commits, crash recovery, indexed queries
2. **JSONL for observability** - Human-readable export for debugging and git diffs
3. **File-based, no network** - Any agent with SQLite bindings can participate
4. **Liveness detection** - Heartbeats to detect dead agents
5. **Blob storage for large payloads** - Keep database lean
## Why SQLite Over JSONL+flock
| Concern | JSONL+flock | SQLite |
|---------|-------------|--------|
| Crash mid-write | Corrupted log, manual recovery | Atomic commit, automatic rollback |
| Payload >4KB | Interleaved writes possible | Always atomic |
| Query efficiency | O(N) scan | O(1) with index |
| Portability | POSIX flock only | Works everywhere |
| Complexity | DIY recovery logic | Built-in WAL, transactions |
See `docs/design/message-passing-comparison.md` for full analysis.
## Prior Art
| System | Approach | Key Insight |
|--------|----------|-------------|
| Beads | SQLite + JSONL export | SQLite for coordination, JSONL for git |
| Tissue | JSONL primary, SQLite cache | Simpler but less safe |
| Kafka | Append-only log, offsets | Total ordering, replay |
| Redis Streams | Consumer groups, claims | First claim wins |
## Directory Structure
```
.worker-state/
├── bus.db # SQLite - source of truth (gitignored)
├── bus.jsonl # JSONL export for debugging/git (read-only)
├── blobs/ # Content-addressable storage for large payloads
│ └── sha256-a1b2c3...
└── workers/
└── worker-auth.json # Worker state (separate from messages)
```
## Database Schema
```sql
-- Messages table (seq is rowid-based for true auto-increment)
CREATE TABLE messages (
seq INTEGER PRIMARY KEY AUTOINCREMENT, -- Guaranteed monotonic, auto-assigned
id TEXT NOT NULL UNIQUE, -- UUID for idempotent retries
ts_ms INTEGER NOT NULL, -- Epoch milliseconds (reliable comparisons)
from_agent TEXT NOT NULL,
to_agent TEXT, -- NULL = broadcast
type TEXT NOT NULL,
correlation_id TEXT,
in_reply_to TEXT,
payload TEXT, -- JSON blob or NULL
payload_ref TEXT, -- Blob hash for large payloads
CHECK (payload IS NULL OR payload_ref IS NULL) -- Not both
);
CREATE INDEX idx_messages_to_seq ON messages(to_agent, seq);
CREATE INDEX idx_messages_correlation_seq ON messages(correlation_id, seq);
CREATE INDEX idx_messages_type_from_ts ON messages(type, from_agent, ts_ms);
-- Cursors table (per-agent read position)
-- NOTE: Cursor is only advanced AFTER successful processing (at-least-once)
CREATE TABLE cursors (
agent_id TEXT PRIMARY KEY,
last_acked_seq INTEGER NOT NULL DEFAULT 0, -- Last successfully processed
updated_at_ms INTEGER NOT NULL
);
-- Heartbeats table (separate from messages to avoid write contention)
-- One row per agent, upsert pattern
CREATE TABLE heartbeats (
agent_id TEXT PRIMARY KEY,
ts_ms INTEGER NOT NULL,
status TEXT NOT NULL, -- idle, working, blocked
current_task TEXT,
progress REAL
);
-- Task claims with lease expiry (prevents zombie claims)
CREATE TABLE task_claims (
task_id TEXT PRIMARY KEY,
claimed_by TEXT NOT NULL,
claimed_at_ms INTEGER NOT NULL,
lease_until_ms INTEGER NOT NULL -- Must renew or release
);
-- Schema versioning for migrations
CREATE TABLE meta (
key TEXT PRIMARY KEY,
value TEXT NOT NULL
);
INSERT INTO meta (key, value) VALUES ('schema_version', '1');
-- Export tracking (not line count!)
CREATE TABLE export_state (
id INTEGER PRIMARY KEY CHECK (id = 1),
last_seq INTEGER NOT NULL DEFAULT 0
);
INSERT INTO export_state (id, last_seq) VALUES (1, 0);
```
### Connection Setup (REQUIRED)
PRAGMAs must be set on every connection. **Each thread must have its own connection.**
```nim
import tiny_sqlite
proc openBusDb*(dbPath: string): DbConn =
## Open database with required PRAGMAs. One connection per thread.
result = openDatabase(dbPath)
result.exec("PRAGMA busy_timeout = 5000")
result.exec("PRAGMA foreign_keys = ON")
result.exec("PRAGMA journal_mode = WAL")
result.exec("PRAGMA synchronous = NORMAL")
```
**Critical**: Do NOT share `DbConn` across threads. Each thread opens its own connection.
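The Python sketches in this document call a `get_connection` helper that is assumed rather than specified; a minimal version applying the same PRAGMAs might look like:
```python
import sqlite3

def get_connection(db_path: str) -> sqlite3.Connection:
    # One connection per thread/process, mirroring the Nim rule above
    conn = sqlite3.connect(db_path, timeout=5.0)
    conn.row_factory = sqlite3.Row  # lets callers use row['col'] and dict(row)
    conn.execute("PRAGMA busy_timeout = 5000")
    conn.execute("PRAGMA foreign_keys = ON")
    conn.execute("PRAGMA journal_mode = WAL")
    conn.execute("PRAGMA synchronous = NORMAL")
    return conn
```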
## Message Schema
The logical fields are unchanged from earlier versions and map directly onto the SQLite columns above:
```json
{
"id": "uuid-v4",
"ts": "2026-01-10T14:00:00.000Z",
"from": "worker-auth",
"to": "orchestrator",
"type": "task_done",
"correlation_id": "skills-abc",
"payload": {
"branch": "worker-auth/skills-abc",
"commit": "a1b2c3d4"
}
}
```
### Message Types
**Task Lifecycle:**
- `task_assign` - Orchestrator assigns task to worker
- `task_claim` - Worker claims a task (first wins)
- `task_progress` - Worker reports progress
- `task_done` - Worker completed task
- `task_failed` - Worker failed task
- `task_blocked` - Worker blocked on dependency
**Coordination:**
- `heartbeat` - Liveness signal
- `state_change` - Worker state transition
- `review_request` - Request review of work
- `review_result` - Review approved/rejected
**System:**
- `agent_started` - Agent came online
- `agent_stopped` - Agent shutting down gracefully
## Delivery Semantics
**This layer provides AT-LEAST-ONCE delivery.**
- Messages are durable once committed
- Cursor only advances after successful processing
- Handlers MUST be idempotent or deduplicate by `message.id` (see the sketch below)
- Out-of-order processing is possible between unrelated messages
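One way to meet the idempotency requirement is a dedup wrapper keyed on `message.id` (a sketch; the in-memory set would need to be persisted to survive restarts):
```python
def deduplicating(handler, seen: set):
    """Wrap a handler so redelivered messages become no-ops."""
    def wrapped(msg):
        if msg["id"] in seen:
            return  # already processed on a previous delivery
        handler(msg)
        seen.add(msg["id"])  # only mark after successful handling
    return wrapped
```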
## Write Protocol
### Publishing Messages
```python
import sqlite3
import uuid
import json
import time
def publish(db_path, agent_id, message):
msg_id = message.get('id') or str(uuid.uuid4())
ts_ms = int(time.time() * 1000) # Epoch milliseconds
# Handle large payloads
payload = message.get('payload')
payload_ref = None
if payload:
payload_json = json.dumps(payload)
if len(payload_json) > 4096:
payload_ref = save_blob(payload) # Atomic write (see Blob Storage)
payload = None
else:
payload = payload_json
conn = get_connection(db_path)
try:
# Use plain BEGIN for simple inserts (less contention than IMMEDIATE)
# seq is auto-assigned by SQLite
conn.execute("""
INSERT INTO messages (id, ts_ms, from_agent, to_agent, type,
correlation_id, in_reply_to, payload, payload_ref)
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)
""", (
msg_id,
ts_ms,
agent_id,
message.get('to'),
message['type'],
message.get('correlation_id'),
message.get('in_reply_to'),
payload,
payload_ref
))
conn.commit()
except sqlite3.IntegrityError:
# Duplicate id = already published (idempotent retry)
conn.rollback()
except Exception:
conn.rollback()
raise
finally:
conn.close()
return msg_id
```
### Write Contention
- SQLite WAL allows concurrent readers but **one writer at a time**
- `busy_timeout = 5000` handles short contention
- For high-frequency writes, add retry with exponential backoff (sketched below)
- Heartbeats use separate table to avoid contending with task messages
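A sketch of that retry wrapper (attempt count and delays illustrative):
```python
import random
import sqlite3
import time

def with_retry(fn, attempts=5, base_delay=0.05):
    # Retry SQLITE_BUSY-style contention with exponential backoff + jitter
    for attempt in range(attempts):
        try:
            return fn()
        except sqlite3.OperationalError:
            if attempt == attempts - 1:
                raise
            time.sleep(base_delay * (2 ** attempt) * (1 + random.random()))
```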
## Read Protocol
### Polling for New Messages (At-Least-Once)
**CRITICAL**: Do NOT update cursor until message is successfully processed.
```python
def poll(db_path, agent_id, limit=100):
"""Fetch unacknowledged messages. Does NOT advance cursor."""
conn = get_connection(db_path)
# Get cursor position
cursor = conn.execute(
"SELECT last_acked_seq FROM cursors WHERE agent_id = ?",
(agent_id,)
).fetchone()
last_seq = cursor['last_acked_seq'] if cursor else 0
# Fetch new messages for this agent (or broadcasts)
rows = conn.execute("""
SELECT * FROM messages
WHERE seq > ?
AND (to_agent IS NULL OR to_agent = ?)
ORDER BY seq
LIMIT ?
""", (last_seq, agent_id, limit)).fetchall()
conn.close()
# Convert to dicts, resolve blob refs
messages = []
for row in rows:
msg = dict(row)
if msg['payload_ref']:
try:
msg['payload'] = load_blob(msg['payload_ref'])
except FileNotFoundError:
msg['payload_error'] = 'blob_missing'
elif msg['payload']:
try:
msg['payload'] = json.loads(msg['payload'])
except json.JSONDecodeError:
msg['payload_error'] = 'decode_failed'
messages.append(msg)
return messages
def ack(db_path, agent_id, seq):
"""Acknowledge successful processing of message. Advances cursor."""
conn = get_connection(db_path)
ts_ms = int(time.time() * 1000)
# Only advance forward (monotonic)
conn.execute("""
INSERT INTO cursors (agent_id, last_acked_seq, updated_at_ms)
VALUES (?, ?, ?)
ON CONFLICT(agent_id) DO UPDATE SET
last_acked_seq = MAX(cursors.last_acked_seq, excluded.last_acked_seq),
updated_at_ms = excluded.updated_at_ms
""", (agent_id, seq, ts_ms))
conn.commit()
conn.close()
```
### Processing Loop
```python
def process_loop(db_path, agent_id, handler):
"""Process messages with at-least-once semantics."""
while running:
messages = poll(db_path, agent_id)
for msg in messages:
try:
handler(msg)
ack(db_path, agent_id, msg['seq']) # Only after success
except Exception as e:
# Don't ack - message will be redelivered
log.error(f"Failed to process {msg['id']}: {e}")
break # Stop batch on failure
time.sleep(poll_interval)
```
### Adaptive Polling
```python
POLL_ACTIVE = 0.2 # 200ms when working
POLL_IDLE = 2.0 # 2s when idle
POLL_BACKOFF = 1.5 # Multiplier on each empty poll
poll_interval = POLL_ACTIVE
while running:
messages = poll(db_path, my_id)
if messages:
process(messages)
poll_interval = POLL_ACTIVE # Reset to fast
else:
poll_interval = min(poll_interval * POLL_BACKOFF, POLL_IDLE)
time.sleep(poll_interval)
```
## Heartbeat Protocol
Heartbeats use a **separate table** with upsert to avoid contending with task messages.
**CRITICAL**: Heartbeats MUST run in a dedicated thread to avoid false death detection during blocking operations (LLM calls can take 60s+).
### Nim Implementation
Nim threads don't share memory by default. Use channels for communication:
```nim
import std/[times, os]  # the built-in Channel type ships with --threads:on
import tiny_sqlite
type
HeartbeatCmd* = enum
hbUpdateStatus, hbStop
HeartbeatMsg* = object
cmd*: HeartbeatCmd
status*: string
task*: string
progress*: float
var heartbeatChan*: Channel[HeartbeatMsg]
proc writeHeartbeat(db: DbConn, agentId, status, task: string, progress: float) =
  let now = getTime()
  let tsMs = now.toUnix * 1000 + now.nanosecond div 1_000_000  # single read avoids second-boundary skew
db.exec("""
INSERT INTO heartbeats (agent_id, ts_ms, status, current_task, progress)
VALUES (?, ?, ?, ?, ?)
ON CONFLICT(agent_id) DO UPDATE SET
ts_ms = excluded.ts_ms,
status = excluded.status,
current_task = excluded.current_task,
progress = excluded.progress
""", agentId, tsMs, status, task, progress)
proc heartbeatWorker*(args: tuple[dbPath, agentId: string]) {.thread.} =
## Dedicated heartbeat thread - owns its own DB connection
let db = openBusDb(args.dbPath)
var status = "idle"
var task = ""
var progress = 0.0
while true:
# Check for commands (non-blocking)
let tried = heartbeatChan.tryRecv()
if tried.dataAvailable:
case tried.msg.cmd
of hbStop:
db.close()
return
of hbUpdateStatus:
status = tried.msg.status
task = tried.msg.task
progress = tried.msg.progress
# Write heartbeat
try:
writeHeartbeat(db, args.agentId, status, task, progress)
except CatchableError:
discard # Log but don't crash
sleep(10_000) # 10 seconds
# Usage from main thread:
proc startHeartbeat*(dbPath, agentId: string): Thread[tuple[dbPath, agentId: string]] =
heartbeatChan.open()
createThread(result, heartbeatWorker, (dbPath, agentId))
proc updateHeartbeatStatus*(status, task: string, progress: float) =
heartbeatChan.send(HeartbeatMsg(cmd: hbUpdateStatus, status: status, task: task, progress: progress))
proc stopHeartbeat*() =
heartbeatChan.send(HeartbeatMsg(cmd: hbStop))
```
### Heartbeat Rules
1. Emit every 10 seconds via background thread
2. Include current status: `idle`, `working`, `blocked`
3. Stale thresholds: 30s = WARN, 100s = STALE, 5min = DEAD
### Liveness Query
```sql
-- Find suspect agents past the WARN threshold (no heartbeat in 30s)
-- Uses epoch ms for reliable comparison
SELECT agent_id, ts_ms,
(strftime('%s', 'now') * 1000 - ts_ms) / 1000 AS age_seconds
FROM heartbeats
WHERE ts_ms < (strftime('%s', 'now') * 1000 - 30000);
```
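The same check from Python, reusing the `get_connection` sketch above:
```python
def find_stale_agents(db_path, threshold_ms=30_000):
    # Agents whose last heartbeat is older than the WARN threshold
    conn = get_connection(db_path)
    now_ms = int(time.time() * 1000)
    rows = conn.execute(
        "SELECT agent_id, ts_ms FROM heartbeats WHERE ts_ms < ?",
        (now_ms - threshold_ms,),
    ).fetchall()
    conn.close()
    return [(r["agent_id"], (now_ms - r["ts_ms"]) / 1000) for r in rows]
```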
## Task Claim Pattern
Task claims use a **dedicated table with leases** to prevent zombie claims when agents die.
```python
LEASE_DURATION_MS = 60000 # 60 seconds
def try_claim_task(db_path, task_id, agent_id):
"""Attempt to claim a task. Returns True if successful."""
conn = get_connection(db_path)
ts_ms = int(time.time() * 1000)
lease_until = ts_ms + LEASE_DURATION_MS
try:
conn.execute("BEGIN IMMEDIATE")
# Check for existing valid claim
existing = conn.execute("""
SELECT claimed_by, lease_until_ms FROM task_claims
WHERE task_id = ?
""", (task_id,)).fetchone()
if existing:
if existing['lease_until_ms'] > ts_ms:
# Valid claim exists
conn.rollback()
return False
else:
# Expired claim - delete and reclaim
conn.execute("DELETE FROM task_claims WHERE task_id = ?", (task_id,))
# Insert new claim
conn.execute("""
INSERT INTO task_claims (task_id, claimed_by, claimed_at_ms, lease_until_ms)
VALUES (?, ?, ?, ?)
""", (task_id, agent_id, ts_ms, lease_until))
conn.commit()
return True
except sqlite3.OperationalError:
# Lock contention - another agent got there first
conn.rollback()
return False
finally:
conn.close()
def renew_claim(db_path, task_id, agent_id):
"""Renew lease on a claimed task. Call periodically while working."""
conn = get_connection(db_path)
ts_ms = int(time.time() * 1000)
lease_until = ts_ms + LEASE_DURATION_MS
result = conn.execute("""
UPDATE task_claims
SET lease_until_ms = ?
WHERE task_id = ? AND claimed_by = ?
""", (lease_until, task_id, agent_id))
conn.commit()
updated = result.rowcount > 0
conn.close()
return updated
def release_claim(db_path, task_id, agent_id):
"""Release a claim when done (success or failure)."""
conn = get_connection(db_path)
conn.execute("""
DELETE FROM task_claims
WHERE task_id = ? AND claimed_by = ?
""", (task_id, agent_id))
conn.commit()
conn.close()
```
### Claim Lifecycle
1. Agent calls `try_claim_task()` - gets exclusive claim with 60s lease
2. Agent periodically calls `renew_claim()` (every 30s) while working
3. On completion, agent calls `release_claim()`
4. If the agent dies, the lease expires and the task becomes claimable again (usage sketch below)
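A worker-side usage sketch of that lifecycle (the work is assumed to be split into resumable steps):
```python
import time

LEASE_RENEW_S = 30  # renew at half the 60s lease

def work_with_claim(db_path, task_id, agent_id, steps):
    if not try_claim_task(db_path, task_id, agent_id):
        return False  # someone else holds a valid lease
    last_renew = time.monotonic()
    try:
        for step in steps:  # work broken into chunks between renewals
            step()
            if time.monotonic() - last_renew > LEASE_RENEW_S:
                renew_claim(db_path, task_id, agent_id)
                last_renew = time.monotonic()
    finally:
        release_claim(db_path, task_id, agent_id)
    return True
```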
## JSONL Export for Debugging
Background process or post-commit hook exports to JSONL.
**CRITICAL**: Track last exported seq in database, NOT by counting lines. Seq values can have gaps from rollbacks.
```python
def export_to_jsonl(db_path, jsonl_path):
"""Export new messages to JSONL. Idempotent, crash-safe."""
conn = get_connection(db_path)
# Get last exported seq from database
row = conn.execute(
"SELECT last_seq FROM export_state WHERE id = 1"
).fetchone()
last_exported = row['last_seq'] if row else 0
# Export new messages
rows = conn.execute(
"SELECT * FROM messages WHERE seq > ? ORDER BY seq",
(last_exported,)
).fetchall()
if not rows:
conn.close()
return
max_seq = last_exported
with open(jsonl_path, 'a') as f:
for row in rows:
msg = dict(row)
# Optionally resolve blob refs for full content
f.write(json.dumps(msg) + '\n')
max_seq = max(max_seq, msg['seq'])
# Update export state after the file write; a crash in between re-exports,
# so duplicate JSONL lines are possible (acceptable for a debug log)
conn.execute("""
UPDATE export_state SET last_seq = ? WHERE id = 1
""", (max_seq,))
conn.commit()
conn.close()
```
This gives you `tail -f bus.jsonl` for debugging while SQLite remains source of truth.
**Note**: JSONL contains blob references, not resolved payloads. For full content, use SQLite queries.
## Blob Storage
Content-addressable storage for large payloads. Uses **atomic write** pattern (temp + rename) to prevent partial reads.
```python
import hashlib
import json
import os
import tempfile
def save_blob(content):
"""Atomically write blob. Returns ref or raises."""
data = json.dumps(content).encode()
hash_hex = hashlib.sha256(data).hexdigest()
ref = f"sha256-{hash_hex}"
path = f".worker-state/blobs/{ref}"
if os.path.exists(path):
return ref # Already exists (content-addressable = idempotent)
# Atomic write: temp file + fsync + rename
os.makedirs(os.path.dirname(path), exist_ok=True)
fd, tmp_path = tempfile.mkstemp(dir=os.path.dirname(path))
try:
with os.fdopen(fd, 'wb') as f:
f.write(data)
f.flush()
os.fsync(f.fileno())
os.replace(tmp_path, path) # Atomic on POSIX
except:
if os.path.exists(tmp_path):
os.unlink(tmp_path)
raise
return ref
def load_blob(ref):
"""Load blob content. Raises FileNotFoundError if missing."""
path = f".worker-state/blobs/{ref}"
with open(path, 'rb') as f:
return json.load(f)
```
## Phased Implementation
### Phase 0: MVP
- SQLite with WAL mode
- Basic publish/poll
- Heartbeats every 10s
- Simple polling (500ms)
### Phase 1: Robustness
- Task claim with transactions
- JSONL export for debugging
- Blob storage for large payloads
- Dead agent detection
### Phase 2: Performance
- FTS5 for full-text search
- Message compaction/archival
- Adaptive polling
### Phase 3: Scale (if ever)
- Read replicas
- Sharding by correlation_id
- Background workers for maintenance
## Integration Points
### With Worker State Machine (skills-4oj)
State transitions publish messages:
```python
publish(db, worker_id, {
'type': 'state_change',
'payload': {'from': 'WORKING', 'to': 'IN_REVIEW'}
})
```
### With review-gate (skills-byq)
Review requests/results via messages:
```python
publish(db, worker_id, {
'type': 'review_request',
'correlation_id': task_id
})
```
### With Observability (skills-yak)
Status command queries SQLite directly:
```sql
SELECT from_agent, type, ts_ms, payload
FROM messages
WHERE type IN ('heartbeat', 'state_change')
ORDER BY seq DESC LIMIT 100;
```
## Open Questions
1. ~~**Database location**~~: `.worker-state/bus.db` (resolved)
2. **Export frequency**: On-commit hook vs periodic background? (defer to implementation)
3. **Message retention**: Archive messages older than min cursor? Vacuum schedule?
4. ~~**Schema migrations**~~: `meta.schema_version` table added; agents check on startup
5. **Security**: All agents currently trusted; add HMAC signing if isolation needed?
6. **Backpressure**: How to slow producers when consumers overwhelmed?
## Resolved from Spec Review
| Issue | Resolution |
|-------|------------|
| `seq` not auto-increment | Changed to `INTEGER PRIMARY KEY AUTOINCREMENT` |
| At-most-once delivery | Cursor now advanced only after ack |
| Heartbeats in messages table | Separate `heartbeats` table with upsert |
| Heartbeats block during LLM | `HeartbeatThread` runs independently |
| JSONL line-count broken | `export_state` table tracks last seq |
| Timestamp format unreliable | Changed to epoch milliseconds |
| Zombie task claims | `task_claims` table with lease expiry |
| Blob writes not atomic | Temp file + fsync + rename pattern |
## Nim Implementation Notes
### Dependencies
```nim
# dependencies in the project's .nimble file
requires "tiny_sqlite >= 0.2.0"
requires "jsony >= 1.1.0" # Optional, faster JSON
```
### Build Configuration
```nim
# config.nims or bus.nim.cfg
--mm:orc
--threads:on
-d:release

# In the main .nim module (pragmas, not config switches),
# to statically link SQLite into a single binary:
{.passC: "-DSQLITE_THREADSAFE=1".}
{.compile: "libs/sqlite3.c".}
```
### Project Structure
```
src/
├── libs/sqlite3.c # SQLite amalgamation
├── db_layer.nim # Connection setup, PRAGMAs
├── bus.nim # Publish, poll, ack
├── heartbeat.nim # Dedicated thread + channel
├── claims.nim # Task claim with leases
└── export.nim # JSONL export
```
### Key Patterns
1. **One connection per thread** - never share `DbConn`
2. **Channels for control** - stop signals, status updates
3. **Short transactions** - BEGIN IMMEDIATE, do work, COMMIT quickly
4. **Exceptions for errors** - catch `DbError`, convert to meaningful messages
## References
- Beads: https://github.com/steveyegge/beads
- Tissue: https://github.com/evil-mind-evil-sword/tissue
- SQLite WAL mode: https://sqlite.org/wal.html
- SQLite BEGIN IMMEDIATE: https://sqlite.org/lang_transaction.html
- tiny_sqlite: https://github.com/GULPF/tiny_sqlite

docs/design/multi-agent-footguns-and-patterns.md

@@ -0,0 +1,219 @@
# Multi-Agent Footguns, Patterns, and Emerging Ideas
**Status**: Research synthesis
**Date**: 2026-01-10
**Sources**: HN discussions, Reddit, practitioner blogs, orch consensus
## Footguns: Lessons Learned the Hard Way
### Git & Branch Chaos
| Footgun | Description | Mitigation |
|---------|-------------|------------|
| **Force-resolve conflicts** | Agents rebase improperly, rewrite history, break CI | No direct git access for agents; orchestrator owns git operations |
| **Stale branches** | Agent works on outdated branch for hours | Frequent auto-rebase; version check before major edits |
| **Recovery nightmare** | Broken git state is hard to recover | Git bundles for checkpoints (SkillFS pattern); worktree isolation |
| **Branch naming confusion** | `worker-id/task-id` becomes misleading on reassignment | Use `type/task-id`; worker identity in commit author |
### State & Database Issues
| Footgun | Description | Mitigation |
|---------|-------------|------------|
| **Shared DB pollution** | Agents debugging against mutated state, heisenbugs | Ephemeral namespaced DBs per branch; schema prefixes |
| **Port conflicts** | Multiple web servers on same port | Auto-increment ports; orchestrator manages allocation |
| **Service duplication** | 10 agents need 10 PostgreSQL/Redis instances | Container-per-worktree; or accept serialization |
| **Feature flag races** | Agents toggle flags in parallel | Namespace flags per agent/branch |
### Coordination Failures
| Footgun | Description | Mitigation |
|---------|-------------|------------|
| **State divergence** | Each agent has different snapshot of reality | Single source of truth artifact; frequent rebase |
| **Silent duplication** | Two agents "fix" same bug differently | Central task ledger with explicit states; idempotent task IDs |
| **Dependency deadlocks** | A waits on B waits on A | Event-driven async; bounded time limits; no sync waits |
| **Role collapse** | Planner writes code; tester refactors | Narrow role boundaries; tool-level constraints |
### Human Bottlenecks
| Footgun | Description | Mitigation |
|---------|-------------|------------|
| **Review overload** | 10 agents = 10 partial PRs to reconcile | Review funnel: worker → arbiter agent → single synthesized PR |
| **Context switching** | Human juggling parallel agent outputs | Size limits per PR; "one story per PR" |
| **Morale drain** | Endless nit-picking, people disable agents | Pre-review by lint/style agents; humans see substantive deltas only |
### Agent-Specific Issues
| Footgun | Description | Mitigation |
|---------|-------------|------------|
| **Hallucinated packages** | 30% of suggested packages don't exist | Validate imports against known registries |
| **Temporary fixes** | Works in session, breaks in Docker | Require full env rebuild as acceptance test |
| **Skill atrophy** | Developers can't code without AI | Deliberate practice; understand what AI generates |
| **Test/impl conspiracy** | Brittle tests + brittle code pass together | Separate spec tests from impl tests; mutation testing |
### Resource & Cost Issues
| Footgun | Description | Mitigation |
|---------|-------------|------------|
| **Token blowups** | Parallel agents saturate context/API limits | Hard budgets per agent; limit context sizes |
| **Credit drain** | AI fixing its own mistakes in loops | Circuit breakers; attempt limits |
| **Timeout misreads** | Rate limits interpreted as semantic failures | Structured error channels; retry with idempotency |
## Emerging Patterns (2026)
### The "Rule of 4"
Research shows effective team sizes limited to ~3-4 agents. Beyond this, communication overhead grows super-linearly (exponent 1.724). Cost of coordination outpaces value.
**Implication**: Don't build 10-agent swarms. Build 3-4 specialized agents with clear boundaries.
### Spec-Driven Development
Adopted by Kiro, Tessl, GitHub Spec Kit:
- `requirements.md` - what to build
- `design.md` - how to build it
- `tasks.md` - decomposed work items
Agents work from specs, not vague prompts. Specs are versioned; agents echo which version they used.
### Layered Coordination (Not Monolithic)
Instead of one complex orchestrator, compose independent layers:
1. Configuration management
2. Issue tracking (JSONL, merge-friendly)
3. Atomic locking (PostgreSQL advisory locks)
4. Filesystem isolation (git worktrees)
5. Validation gates
6. Enforcement rules
7. Session protocols
Each layer independently useful; failures isolated.
### PostgreSQL Advisory Locks for Claims
Novel insight: Advisory locks auto-release on crash (no orphaned locks), operate in ~1ms, no table writes. Elegant solution for distributed claim races.
```sql
SELECT pg_try_advisory_lock(task_id_hash);
-- Work...
SELECT pg_advisory_unlock(task_id_hash);
-- Or: connection dies → auto-released
```
### Git Bundles for Checkpoints (SkillFS)
Every agent sandbox is a git repo. Session ends → git bundle stored. New session → restore from bundle, continue where left off. Complete audit trail via `git log`.
### Hierarchical Over Flat Swarms
Instead of 100-agent flat swarms:
- Nested coordination structures
- Partition the communication graph
- Supervisor per sub-team
- Only supervisors talk to each other
### Plan-and-Execute Cost Pattern
Expensive model creates strategy; cheap models execute steps. Can reduce costs by 90%.
```
Orchestrator (Claude Opus) → Plan
Workers (Claude Haiku) → Execute steps
Reviewer (Claude Sonnet) → Validate
```
### Bounded Autonomy Spectrum
Progressive autonomy based on risk:
1. **Human in the loop** - approve each action
2. **Human on the loop** - monitor, intervene if needed
3. **Human out of the loop** - fully autonomous
Match to task complexity and outcome criticality.
## Best Practices Synthesis
### From HN Discussions
1. **Well-scoped tasks with tight contracts** - Not vague prompts
2. **Automated testing gates** - Agents must pass before review
3. **2-3 agents realistic** - Not 10 parallel
4. **Exclusive ownership per module** - One writer per concern
5. **Short-lived branches** - Frequent merge to prevent drift
### From orch Consensus
1. **Treat agents as untrusted workers** - Not peers with full access
2. **Machine-readable contracts** - JSON schema between roles
3. **Per-agent logs with correlation IDs** - Distributed systems observability
4. **Guardrail agents** - Security/policy checks on every diff
5. **Versioned task specs** - Bump version → re-run affected agents
### From Practitioner Blogs
1. **Coordination ≠ isolation** - Advisory locks (who works on what) + worktrees (how they work)
2. **JSONL for issues** - One per line, deterministic merge rules
3. **Session protocols** - Explicit start/close procedures
4. **Modular rules with includes** - Template configuration
## How This Applies to Our Design
### Already Covered
| Pattern | Our Design |
|---------|------------|
| SQLite for coordination | ✅ bus.db with transactions |
| Git worktrees | ✅ branch-per-worker.md |
| State machine | ✅ worker-state-machine.md |
| Heartbeats/liveness | ✅ 10s interval in message-passing |
| Claim-check pattern | ✅ SQLite transactions |
| Task serialization | ✅ No uncommitted dependencies |
### Should Add
| Pattern | Gap | Action |
|---------|-----|--------|
| Spec-driven tasks | Tasks are just titles | Add structured task specs (requirements, design, acceptance) |
| Role boundaries | Not enforced | Add tool-level constraints per agent type |
| Review funnel | Missing arbiter | Add synthesis step before human review |
| Versioned specs | Not tracked | Add version field to task assignments |
| Cost budgets | Not implemented | Add token/time budgets per agent |
| Correlation IDs | Partial (correlation_id) | Ensure end-to-end tracing |
### Validate Our Decisions
| Decision | Validation |
|----------|------------|
| SQLite over JSONL | ✅ Confirmed - JSONL for issues only, SQLite for coordination |
| Orchestrator creates branches | ✅ Confirmed - reduces agent setup, enforces policy |
| 3-4 agents max | ✅ Aligns with "Rule of 4" research |
| Mandatory rebase | ✅ Confirmed - prevents stale branch drift |
| Escalate semantic conflicts | ✅ Confirmed - agents hallucinate resolutions |
## Open Questions Surfaced
1. **PostgreSQL advisory locks vs SQLite?** - Do we need Postgres, or is SQLite sufficient?
2. **Git bundles for checkpoints?** - Should we adopt SkillFS pattern?
3. **Spec files per task?** - How structured should task specs be?
4. **Arbiter/synthesis agent?** - Add to architecture before human review?
5. **Token budgets?** - How to enforce across different agent types?
## Sources
### HN Discussions
- [Superset: 10 Parallel Coding Agents](https://news.ycombinator.com/item?id=46368739)
- [Desktop App for Parallel Agentic Dev](https://news.ycombinator.com/item?id=46027947)
- [SkillFS: Git-backed Sandboxes](https://news.ycombinator.com/item?id=46543093)
- [Zenflow: Agent Orchestration](https://news.ycombinator.com/item?id=46290617)
- [Git Worktree for Parallel Dev](https://news.ycombinator.com/item?id=46510462)
### Blogs & Articles
- [Building a Multi-Agent Development Workflow](https://itsgg.com/blog/2026/01/08/building-a-multi-agent-development-workflow/)
- [The Real Struggle with AI Coding Agents](https://www.smiansh.com/blogs/the-real-struggle-with-ai-coding-agents-and-how-to-overcome-it/)
- [Why AI Coding Tools Don't Work For Me](https://blog.miguelgrinberg.com/post/why-generative-ai-coding-tools-and-agents-do-not-work-for-me)
- [Microsoft: Multi-Agent Systems at Scale](https://devblogs.microsoft.com/ise/multi-agent-systems-at-scale/)
- [LangChain: How and When to Build Multi-Agent](https://blog.langchain.com/how-and-when-to-build-multi-agent-systems/)
### Research & Analysis
- [VentureBeat: More Agents Isn't Better](https://venturebeat.com/orchestration/research-shows-more-agents-isnt-a-reliable-path-to-better-enterprise-ai)
- [Deloitte: AI Agent Orchestration](https://www.deloitte.com/us/en/insights/industry/technology/technology-media-and-telecom-predictions/2026/ai-agent-orchestration.html)
- [10 Things Developers Want from Agentic IDEs](https://redmonk.com/kholterhoff/2025/12/22/10-things-developers-want-from-their-agentic-ides-in-2025/)

docs/design/mvp-scope.md

@@ -0,0 +1,277 @@
# Multi-Agent MVP Scope
**Status**: Draft (v3 - Nim implementation)
**Goal**: Define the minimal viable set of primitives to run 2-3 worker agents coordinated by a human-attended orchestrator.
**Language**: Nim (ORC, cligen, tiny_sqlite)
## Changelog
- **v3**: Nim implementation decision (single binary, fast startup, compiled)
- **v2**: Fixed BLOCKERs from orch spec-review (resolved open questions, added rejection workflow, added failure scenarios)
## Current Design Docs
| Doc | Status | Content |
|-----|--------|---------|
| `worker-state-machine.md` | ✅ Complete | 8 states, transitions, file schema |
| `message-passing-layer.md` | ✅ v4 | SQLite bus, Nim heartbeat thread, channels |
| `worker-cli-primitives.md` | ✅ v3 | Nim CLI with cligen, state machine |
| `human-observability.md` | ✅ Complete | Status dashboard, watch mode, stale detection |
| `branch-per-worker.md` | ✅ Complete | Worktrees, integration branch, rebase protocol |
| `multi-agent-footguns-and-patterns.md` | ✅ Complete | Research synthesis, validated decisions |
| `message-passing-comparison.md` | ✅ Complete | Beads/Tissue comparison, SQLite rationale |
## Task Triage for MVP
### Tier 1: Essential for MVP (Must Have)
These are required to run the basic loop: assign → work → review → merge.
| Bead | Task | Rationale |
|------|------|-----------|
| `skills-sse` | Worker CLI commands | Core interface for orchestrator and agents |
| `skills-4oj` | Worker state machine | Already designed, need implementation |
| `skills-ms5` | Message passing layer | Already designed (SQLite), need implementation |
| `skills-roq` | Branch-per-worker isolation | Already designed, need implementation |
| `skills-byq` | Integrate review-gate | Have review-gate, need to wire to worker flow |
| `skills-yak` | Human observability (status) | Human needs to see what's happening |
| **NEW** | Agent system prompt | LLM needs tool definitions for worker commands |
### Tier 2: Important but Can Defer
| Bead | Task | Why Defer |
|------|------|-----------|
| `skills-0y9` | Structured task specs | Can start with simple task descriptions |
| `skills-4a2` | Role boundaries | Trust-based initially, add constraints later |
| `skills-31y` | Review funnel/arbiter | Works with 2-3 agents; needed at scale |
| `skills-zf6` | Evidence artifacts | Can use simple JSON initially |
| `skills-1jc` | Stuck detection | Monitor manually first (stale detection in MVP) |
### Tier 3: Nice to Have (Post-MVP)
| Bead | Task | Why Later |
|------|------|-----------|
| `skills-1qz` | Token budgets | Manual monitoring first |
| `skills-5ji` | Ephemeral namespaced envs | Single-project MVP |
| `skills-7n4` | Rollback strategy | Manual rollback first |
| `skills-8ak` | Git bundle checkpoints | Worktrees sufficient |
| `skills-r62` | Role + Veto pattern | Simple approve/reject first |
| `skills-udu` | Cross-agent compatibility | Single agent type first (Claude) |
| `skills-sh6` | OpenHands research | Research complete |
| `skills-yc6` | Document findings | Research captured |
## MVP Feature Set
### Commands
```bash
# Orchestrator commands (human runs)
worker spawn <task-id> [--description "..."] # Create branch, worktree, assign task
worker status [--watch] # Dashboard of all workers
worker approve <task-id> # IN_REVIEW → APPROVED
worker request-changes <task-id> # IN_REVIEW → WORKING (rejection)
worker merge <task-id> # APPROVED → COMPLETED
worker cancel <task-id> # * → FAILED (abort)
# Worker commands (agent runs from worktree)
worker start # ASSIGNED → WORKING
worker done [--skip-rebase] # WORKING → IN_REVIEW (includes rebase)
worker heartbeat # Liveness signal (via background thread)
worker fail <reason> # WORKING → FAILED
```
### Data Flow
```
HAPPY PATH:
1. Human: worker spawn skills-abc
→ Creates feat/skills-abc branch
→ Creates worktrees/skills-abc
→ Publishes task_assign message
→ State: ASSIGNED
2. Agent: worker start
→ Publishes state_change (ASSIGNED → WORKING)
→ Starts HeartbeatThread (background)
→ Begins work
3. Agent: worker done
→ Runs git rebase origin/integration
→ Pushes branch
→ Publishes review_request
→ State: IN_REVIEW
4. Human: worker approve skills-abc
→ Publishes review_approved
→ State: APPROVED
5. Human: worker merge skills-abc
→ Merges to integration (retry loop for contention)
→ Cleans up branch/worktree
→ State: COMPLETED
REJECTION PATH:
4b. Human: worker request-changes skills-abc "Fix error handling"
→ Publishes changes_requested
→ State: WORKING
→ Agent resumes work, returns to step 3
CONFLICT PATH:
3b. Agent: worker done (rebase fails)
→ Rebase conflict detected, left in progress
→ State: CONFLICTED
→ Agent resolves conflicts, runs: git rebase --continue
→ Agent: worker done --skip-rebase
→ State: IN_REVIEW
```
### Directory Structure
```
project/
├── .worker-state/
│ ├── bus.db # SQLite message bus (source of truth)
│ ├── bus.jsonl # Debug export (derived)
│ ├── blobs/ # Large payloads (content-addressable)
│ └── workers/
│ └── skills-abc.json # Worker state cache (derived from DB)
├── worktrees/ # Git worktrees (gitignored)
│ └── skills-abc/
│ └── .worker-ctx.json # Static context for this worker
└── .git/
```
## Implementation Order
### Prerequisites
```bash
# Nim dependencies
nimble install tiny_sqlite cligen jsony
```
Download SQLite amalgamation for static linking:
```bash
curl -LO https://sqlite.org/2024/sqlite-amalgamation-3450000.zip
unzip sqlite-amalgamation-3450000.zip
cp sqlite-amalgamation-*/sqlite3.c src/libs/
```
### Build Steps
1. **Project setup**
- Create `src/worker.nimble` with dependencies
- Create `src/config.nims` with build flags (--mm:orc, --threads:on)
- Set up static SQLite compilation
2. **Message bus** (`skills-ms5`)
- `src/worker/db.nim` - SQLite schema, connection setup
- `src/worker/bus.nim` - publish/poll/ack functions
- Dedicated heartbeat thread with channels
3. **Worker state** (`skills-4oj`)
- `src/worker/state.nim` - State enum, transition guards
- `src/worker/types.nim` - Shared types
- Compare-and-set with BEGIN IMMEDIATE
4. **Branch primitives** (`skills-roq`)
- `src/worker/git.nim` - Worktree create/remove (osproc)
- Rebase with conflict detection
- Merge with retry loop
5. **CLI commands** (`skills-sse`)
- `src/worker.nim` - cligen dispatchMulti
- All subcommands: spawn, status, start, done, approve, merge, cancel
- Background heartbeat thread
6. **review-gate integration** (`skills-byq`)
- review-gate calls `worker approve` / `worker request-changes`
- Stop hook checks worker state from bus.db
7. **Status dashboard** (`skills-yak`)
- `worker status` with table output
- Stale detection from heartbeats table
- `--watch` mode for real-time updates
8. **Agent system prompt**
- Tool definitions for worker commands
- Context about worktree location, task description
- Instructions for heartbeat, done, conflict handling
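An illustrative fragment of what that prompt could contain (wording hypothetical; the commands are the ones defined in this doc):
```
You are a worker agent. Your worktree is worktrees/<task-id>; read
.worker-ctx.json there for your task description.
- Run `worker start` before editing.
- Run `worker heartbeat` every 10-30 seconds while working.
- When finished: `worker done` (rebases onto integration). On conflict:
  resolve, `git add`, `git rebase --continue`, then `worker done --skip-rebase`.
- On unrecoverable failure: `worker fail "<reason>"`.
```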
### Compilation
```bash
nim c -d:release --mm:orc --threads:on src/worker.nim
# Output: single static binary ~2-3MB
```
## Success Criteria
MVP is complete when:
### Happy Path
1. [ ] Can spawn a worker with `worker spawn <task>`
2. [ ] Worker appears in `worker status` dashboard with state and heartbeat
3. [ ] Agent can signal `worker start` and `worker done`
4. [ ] Heartbeats track agent liveness (sent every 10-30s; STALE warning once they stop for 2+ minutes)
5. [ ] `worker approve` transitions to APPROVED
6. [ ] `worker merge` completes the cycle
7. [ ] All state persists across session restarts
### Failure Scenarios
8. [ ] Rebase conflict detected → state CONFLICTED, rebase left in progress
9. [ ] Agent timeout (no heartbeat 2+ min) → status shows STALE warning
10. [ ] `worker request-changes` returns to WORKING with feedback
11. [ ] `worker cancel` aborts any state → FAILED
12. [ ] Concurrent merge attempts handled (retry loop succeeds)
## Non-Goals for MVP
- Multiple orchestrators
- Cross-machine coordination
- Automatic conflict resolution (human intervenes)
- Token budgeting
- Structured task specs (simple descriptions)
- Arbiter agents
- Database isolation per worker
## Resolved Questions
| Question | Decision | Rationale |
|----------|----------|-----------|
| Language | **Nim** | Single binary, fast startup, compiled, Python-like syntax |
| CLI framework | **cligen** | Auto-generates from proc signatures |
| SQLite wrapper | **tiny_sqlite** | Better than stdlib, RAII, prepared statements |
| Memory management | **ORC** | Handles cycles, deterministic destruction |
| Static linking | **SQLite amalgamation** | Single binary, no system dependencies |
| Source of truth | **SQLite only** | JSON files are derived caches; DB is authoritative |
| Heartbeat | **Dedicated thread + channels** | Nim threads don't share memory |
| Integration branch | **Require exists** | Human creates `integration` before first spawn |
| review-gate | **Calls worker CLI** | `review-gate approve``worker approve` |
| STALE state | **Computed for display** | Not a persistent state; derived from heartbeat age |
## Nim Dependencies
| Package | Purpose |
|---------|---------|
| `tiny_sqlite` | SQLite wrapper with RAII |
| `cligen` | CLI subcommand generation |
| `jsony` | Fast JSON parsing (optional) |
| stdlib `osproc` | Git subprocess operations |
| system `Channel` (with `--threads:on`) | Thread communication |
| stdlib `times` | Epoch timestamps |
## Spec Review Resolution
| Issue | Resolution |
|-------|------------|
| Missing rejection workflow | Added `request-changes` command and path in data flow |
| Agent system prompt missing | Added to Tier 1 implementation order |
| Source of truth confusion | Clarified SQLite primary, JSON derived |
| Test scenarios missing | Added failure scenarios 8-12 to success criteria |
| Heartbeat mechanism | Dedicated thread with own SQLite connection |
| Review-gate integration | Clarified review-gate calls worker CLI |
| Language choice | Nim for single binary, fast startup |

567
docs/design/worker-cli-primitives.md Normal file
View file

@ -0,0 +1,567 @@
# Worker CLI Primitives Design
**Status**: Draft (v3 - Nim implementation)
**Bead**: skills-sse
**Epic**: skills-s6y (Multi-agent orchestration: Lego brick architecture)
**Language**: Nim (cligen, ORC)
## Changelog
- **v3**: Nim implementation (cligen, tiny_sqlite, osproc)
- **v2**: Fixed BLOCKERs from orch spec-review (approve command, conflict flow, source of truth, races, STALE semantics)
## Overview
This document defines the CLI commands for multi-agent worker coordination. The CLI is the interface between human orchestrators, AI agents, and the underlying coordination infrastructure (SQLite bus, git worktrees, state machine).
## Design Decisions (from orch consensus)
### Strong Consensus (4/4 models)
| Decision | Choice | Rationale |
|----------|--------|-----------|
| Spawn semantics | Prepare workspace only | Separation of concerns; debug before execute |
| Implementation | **Nim CLI** | Single binary, fast startup, compiled |
| CLI framework | **cligen** | Auto-generates from proc signatures |
| Idempotency | All commands idempotent | Crash recovery, safe retries |
| Worker ID = Task ID | Yes, 1:1 mapping | Simplifies mental model; add attempt IDs later if needed |
| State source of truth | SQLite only | All state reads/writes via DB; JSON files are derived caches |
| STALE detection | Computed for display | STALE is not a persistent state; computed from heartbeat age |
### Split Decision: Agent Discovery
| Approach | Advocates | Trade-off |
|----------|-----------|-----------|
| Local file in worktree | Gemini, GPT | `cd worktree && cmd` just works; no args needed |
| Explicit CLI arg | Qwen, Sonar | More explicit; no "magic" context |
**Decision**: Hybrid - local file for context, but allow explicit `--task` override.
### Split Decision: Heartbeat Mechanism
| Approach | Advocates | Trade-off |
|----------|-----------|-----------|
| Wrapper/supervisor | Gemini, GPT | Reliable; agent doesn't need to remember |
| Explicit call | Qwen, Sonar | Simple; observable; agent controls timing |
**Decision**: Start with explicit call; add wrapper as optional enhancement.
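If the wrapper lands later, it could be a thin shim around the helpers the CLI already uses (a sketch, not a committed interface; `worker run` remains an open question below):
```nim
# Hypothetical `worker run <agent-cmd>` wrapper: keeps heartbeats flowing
# while the agent process runs, so the agent needn't remember to call them.
import std/osproc
import worker/[types, context, heartbeat]

proc run(agentCmd: string): int =
  let ctx = findContext()                      # discover task from worktree
  startGlobalHeartbeat(BusDbPath, ctx.taskId)  # background heartbeat thread
  defer: stopGlobalHeartbeat()
  result = execCmd(agentCmd)                   # agent inherits stdio
```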
## Command Structure
```
worker <command> [options]
```
### Orchestrator Commands (Human Runs)
| Command | Purpose | State Transition |
|---------|---------|------------------|
| `worker spawn <task-id>` | Create workspace | → ASSIGNED |
| `worker status` | Dashboard of all workers | (read-only) |
| `worker approve <task-id>` | Approve completed work | IN_REVIEW → APPROVED |
| `worker request-changes <task-id>` | Request revisions | IN_REVIEW → WORKING |
| `worker merge <task-id>` | Merge approved work | APPROVED → COMPLETED |
| `worker cancel <task-id>` | Abort (state change; cleanup via flags) | * → FAILED |
### Agent Commands (Worker Runs)
| Command | Purpose | State Transition |
|---------|---------|------------------|
| `worker start` | Begin work | ASSIGNED → WORKING |
| `worker done` | Complete work (includes rebase) | WORKING → IN_REVIEW |
| `worker heartbeat` | Liveness signal | (no transition) |
| `worker fail <reason>` | Signal failure | WORKING → FAILED |
## Command Specifications
### `worker spawn <task-id>`
Create a new worker workspace for a task.
```bash
worker spawn skills-abc [--description "Fix auth bug"] [--from integration]
```
**Actions**:
1. Check task doesn't already exist (or return existing if idempotent)
2. Create branch `feat/<task-id>` from `--from` (default: `origin/integration`)
3. Create worktree at `worktrees/<task-id>/`
4. Write context file `worktrees/<task-id>/.worker-ctx.json`
5. Insert `task_assign` message into bus.db
6. Create state file `.worker-state/workers/<task-id>.json`
**Context File** (`.worker-ctx.json`):
```json
{
"task_id": "skills-abc",
"branch": "feat/skills-abc",
"worktree": "worktrees/skills-abc",
"created_at": "2026-01-10T15:00:00Z",
"description": "Fix auth bug"
}
```
**State File** (`.worker-state/workers/<task-id>.json`):
```json
{
"task_id": "skills-abc",
"state": "ASSIGNED",
"branch": "feat/skills-abc",
"assigned_at": "2026-01-10T15:00:00Z",
"state_changed_at": "2026-01-10T15:00:00Z"
}
```
**Idempotency**: If task exists and worktree exists, return success with existing path.
**Output**:
```
Created worker: skills-abc
Branch: feat/skills-abc
Worktree: worktrees/skills-abc
State: ASSIGNED
```
### `worker status`
Show dashboard of all workers.
```bash
worker status [--state working] [--stale] [--json]
```
**Output**:
```
TASK STATE BRANCH LAST HEARTBEAT AGE
skills-abc WORKING feat/skills-abc 2m ago 45m
skills-xyz IN_REVIEW feat/skills-xyz -- 2h
skills-123 STALE feat/skills-123 15m ago 3h
```
**Stale Detection**: Worker is STALE if:
- State is ASSIGNED/WORKING and no heartbeat in 5 minutes
- State is IN_REVIEW and no activity in 1 hour
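A minimal sketch of that check (uses the `WorkerState` enum defined later in this doc; the IN_REVIEW "activity" timestamp is assumed to come from the last state change or message):
```nim
import std/times

proc isStale(state: WorkerState, lastHeartbeat, lastActivity: Time): bool =
  let now = getTime()
  result = case state
    of wsAssigned, wsWorking: now - lastHeartbeat > initDuration(minutes = 5)
    of wsInReview: now - lastActivity > initDuration(hours = 1)
    else: false
```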
### `worker start`
Agent signals it's beginning work. Run from inside worktree.
```bash
cd worktrees/skills-abc
worker start
```
**Actions**:
1. Read task_id from `.worker-ctx.json` (or `--task` arg)
2. Verify current state is ASSIGNED
3. Insert `state_change` message (ASSIGNED → WORKING)
4. Update state file
5. Record first heartbeat
**Idempotency**: If already WORKING, log warning and succeed.
### `worker done`
Agent signals work is complete. Includes mandatory rebase.
```bash
cd worktrees/skills-abc
worker done [--skip-rebase]
```
**Actions**:
1. Read task_id from context
2. Verify current state is WORKING or CONFLICTED (for retry)
3. If not `--skip-rebase`:
- Run `git fetch origin integration`
- Run `git rebase origin/integration`
- If conflict: **DO NOT ABORT** - leave rebase in progress, set state to CONFLICTED, exit with error
4. If `--skip-rebase`: verify no rebase in progress and branch is up-to-date
5. Run `git push -u origin feat/<task-id>`
6. Insert `review_request` message
7. Transition to IN_REVIEW via `transition()` with compare-and-set
**Rebase Failure** (rebase left in progress):
```
ERROR: Rebase conflict detected
Conflicting files: src/auth.py
State: CONFLICTED
To resolve:
1. Fix conflicts in worktree
2. git add <files>
3. git rebase --continue
4. worker done --skip-rebase
```
**State Guards**:
- From WORKING: proceed with rebase
- From CONFLICTED with `--skip-rebase`: proceed (assumes conflicts resolved)
- From CONFLICTED without `--skip-rebase`: attempt rebase again
### `worker heartbeat`
Emit liveness signal. Call periodically (every 10-30s).
```bash
cd worktrees/skills-abc
worker heartbeat [--status working] [--progress 0.5]
```
**Actions**:
1. Read task_id from context
2. Insert `heartbeat` message with timestamp
3. Update `last_heartbeat` in state file
**Idempotency**: Always succeeds; updates timestamp.
### `worker approve <task-id>`
Human/review-gate approves completed work.
```bash
worker approve skills-abc [--by reviewer] [--comment "LGTM"]
```
**Actions**:
1. Verify state is IN_REVIEW
2. Transition to APPROVED via `transition()`
3. Insert `review_approved` message with reviewer and comment
**Idempotency**: If already APPROVED, log and succeed.
### `worker request-changes <task-id>`
Human/review-gate requests revisions.
```bash
worker request-changes skills-abc [--comment "Fix error handling"]
```
**Actions**:
1. Verify state is IN_REVIEW
2. Transition to WORKING via `transition()`
3. Insert `changes_requested` message with comment
**Idempotency**: If already WORKING, log and succeed.
### `worker merge <task-id>`
Merge approved work to integration branch.
```bash
worker merge skills-abc [--delete-branch]
```
**Actions** (with retry loop for contention):
1. Verify state is APPROVED
2. `git fetch origin integration feat/<task-id>`
3. `git checkout integration && git reset --hard origin/integration`
4. Merge `feat/<task-id>` with `--no-ff`
5. Push integration
- If push fails (remote changed): go to step 2 (max 3 retries)
- If merge conflict: transition APPROVED → WORKING, exit with error
6. Transition to COMPLETED via `transition()`
7. If `--delete-branch`: delete remote and local branch
8. Remove worktree via `git worktree remove`
**Idempotency**: If already COMPLETED, log and succeed.
**Merge Conflict** (rare - only if integration moved significantly):
```
ERROR: Merge conflict during integration
State: WORKING (returned for revision)
To resolve:
1. Rebase in worktree: git rebase origin/integration
2. worker done
3. Get re-approval
```
### `worker cancel <task-id>`
Abort a worker and cleanup.
```bash
worker cancel skills-abc [--reason "Scope changed"]
```
**Actions**:
1. Insert `task_failed` message with reason
2. Update state to FAILED
3. Optionally remove worktree (with `--cleanup`)
4. Optionally archive branch (with `--archive`)
## State Transitions via CLI
```
spawn
┌──────────┐ ┌──────────┐ start ┌─────────┐
│ (none) │ ───► │ ASSIGNED │ ─────────────► │ WORKING │
└──────────┘ └──────────┘ └────┬────┘
┌────────────────────────────┤
│ │
│ done (rebase ok) │ done (conflict)
▼ ▼
┌───────────┐ ┌────────────┐
│ IN_REVIEW │ │ CONFLICTED │
└─────┬─────┘ └────────────┘
│ │
approve │ │ fix + done
(external) │ │
▼ │
┌──────────┐ │
│ APPROVED │ ◄─────────────────────┘
└────┬─────┘
│ merge
┌───────────┐
│ COMPLETED │
└───────────┘
```
## Directory Structure
```
project/
├── .worker-state/
│ ├── bus.db # SQLite message bus
│ ├── bus.jsonl # Debug export
│ └── workers/
│ ├── skills-abc.json # State file
│ └── skills-xyz.json
├── worktrees/ # Git worktrees (gitignored)
│ ├── skills-abc/
│ │ ├── .worker-ctx.json # Context for this worker
│ │ └── (project files)
│ └── skills-xyz/
└── .git/
```
## Implementation
### Nim Project Structure
```
src/
├── libs/sqlite3.c # SQLite amalgamation (static linking)
├── worker.nim # CLI entry point (cligen dispatch)
├── worker/
│ ├── db.nim # SQLite operations (tiny_sqlite)
│ ├── git.nim # Git/worktree operations (osproc)
│ ├── state.nim # State machine logic
│ ├── context.nim # Worker context handling
│ ├── heartbeat.nim # Dedicated heartbeat thread
│ └── types.nim # Shared types and constants
├── config.nims # Build configuration
└── worker.nimble # Package definition
```
### CLI Entry Point (cligen)
```nim
import cligen
import worker/[db, git, state, context, heartbeat]
proc spawn(taskId: string, description = "", fromBranch = "origin/integration") =
## Create a new worker workspace
let ctx = createWorker(taskId, description, fromBranch)
echo "Created worker: ", taskId
echo " Branch: ", ctx.branch
echo " Worktree: ", ctx.worktree
echo " State: ASSIGNED"
proc status(state = "", stale = false, json = false, watch = false) =
## Show worker dashboard
let workers = getAllWorkers()
if json:
echo toJson(workers)
else:
printStatusTable(workers, stale)
proc start(task = "") =
## Signal ASSIGNED → WORKING (run from worktree)
let taskId = if task != "": task else: readContext().taskId
transition(taskId, "ASSIGNED", "WORKING")
echo "Started work on ", taskId
proc done(skipRebase = false) =
## Signal WORKING → IN_REVIEW (includes rebase)
let ctx = readContext()
if not skipRebase:
let ok = rebaseOnIntegration(ctx)
if not ok:
transition(ctx.taskId, "WORKING", "CONFLICTED")
quit("Rebase conflict - resolve and run: worker done --skip-rebase", 1)
pushBranch(ctx)
transition(ctx.taskId, "WORKING", "IN_REVIEW")
echo "Ready for review: ", ctx.taskId
proc approve(taskId: string, by = "", comment = "") =
## Approve completed work (IN_REVIEW → APPROVED)
transition(taskId, "IN_REVIEW", "APPROVED")
echo "Approved: ", taskId
proc requestChanges(taskId: string, comment = "") =
## Request revisions (IN_REVIEW → WORKING)
transition(taskId, "IN_REVIEW", "WORKING")
echo "Changes requested: ", taskId
proc merge(taskId: string, deleteBranch = false) =
## Merge approved work (APPROVED → COMPLETED)
mergeToIntegration(taskId, deleteBranch)
transition(taskId, "APPROVED", "COMPLETED")
echo "Merged: ", taskId
proc cancel(taskId: string, reason = "") =
## Abort a worker (* → FAILED)
transitionToFailed(taskId, reason)
echo "Cancelled: ", taskId
when isMainModule:
dispatchMulti(
[spawn],
[status],
[start],
[done],
[approve],
[requestChanges, cmdName = "request-changes"],
[merge],
[cancel]
)
```
### State Machine Guards
```nim
import std/[tables, strformat, times, json]
import tiny_sqlite
type
WorkerState* = enum
wsAssigned = "ASSIGNED"
wsWorking = "WORKING"
wsConflicted = "CONFLICTED"
wsInReview = "IN_REVIEW"
wsApproved = "APPROVED"
wsCompleted = "COMPLETED"
wsFailed = "FAILED"
InvalidTransition* = object of CatchableError
StaleState* = object of CatchableError
# NOTE: STALE is NOT a persistent state - computed for display from heartbeat age
const ValidTransitions* = {
wsAssigned: @[wsWorking, wsFailed],
wsWorking: @[wsInReview, wsConflicted, wsFailed],
wsConflicted: @[wsInReview, wsWorking, wsFailed],
wsInReview: @[wsApproved, wsWorking, wsFailed],
wsApproved: @[wsCompleted, wsWorking, wsFailed],
wsCompleted: @[],
wsFailed: @[wsAssigned], # Retry
}.toTable
proc transition*(db: DbConn, taskId: string, fromState, toState: WorkerState) =
## Attempt state transition with compare-and-set guard
if toState notin ValidTransitions[fromState]:
raise newException(InvalidTransition, &"{fromState} → {toState}")
db.exec("BEGIN IMMEDIATE")
try:
let current = db.value("SELECT state FROM workers WHERE task_id = ?", taskId).get.fromDbValue(string)
if current != $fromState:
raise newException(StaleState, &"Expected {fromState}, got {current}")
db.exec("UPDATE workers SET state = ?, state_changed_at = ? WHERE task_id = ?",
$toState, epochTime(), taskId)
    discard db.publish(taskId, "state_change",
                       payload = %*{"from": $fromState, "to": $toState})
db.exec("COMMIT")
  except CatchableError:
db.exec("ROLLBACK")
raise
```
### Build Configuration
```nim
# config.nims
--mm:orc
--threads:on
-d:release
--opt:size
# Static SQLite
switch("passC", "-DSQLITE_THREADSAFE=1")
switch("compile", "libs/sqlite3.c")
```
### Exit Codes
```nim
const
  ExitSuccess* = 0
  ExitUsageError* = 2
  ExitInvalidTransition* = 3
  ExitGitError* = 4
  ExitDbError* = 5
  ExitConflict* = 6
  ExitNotFound* = 7  # used by the CLI when a task is unknown (value assumed)
```
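One way the command bodies could map failures onto these codes (a sketch; `InvalidTransition` is defined above, `GitError` lives in `types.nim`, `SqliteError` comes from tiny_sqlite):
```nim
proc guarded(body: proc ()) =
  ## Run a command body, translating known failures to exit codes.
  try:
    body()
  except InvalidTransition:
    quit(ExitInvalidTransition)
  except GitError:
    quit(ExitGitError)
  except SqliteError:
    quit(ExitDbError)
```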
## Integration with Existing Components
### With Message Bus (skills-ms5)
All state transitions publish messages:
```nim
discard db.publish(taskId, "state_change",
                   payload = %*{"from": "ASSIGNED", "to": "WORKING"})
```
### With review-gate (skills-byq)
Review-gate reads worker state:
```nim
let st = db.getState(taskId)
if st.isSome and st.get == wsInReview:
  # check review status
  discard
```
### With Status Dashboard (skills-yak)
Status command queries message bus:
```nim
for row in db.iterate("""
  SELECT w.task_id, w.state, h.ts_ms AS last_heartbeat_ms
  FROM workers w
  LEFT JOIN heartbeats h ON h.agent_id = w.task_id
  ORDER BY w.state_changed_at_ms DESC
"""):
  discard row  # render dashboard rows
```
## Open Questions
1. ~~**Approval flow**~~: Resolved - `worker approve` and `worker request-changes` commands added
2. **Agent wrapper**: Should `worker run <agent-cmd>` wrap agent execution with heartbeats?
3. **Multiple attempts**: If task fails, `worker retry <task-id>` or new spawn?
4. **Integration branch**: Create on first spawn, or require it exists?
## Resolved from Spec Review
| Issue | Resolution |
|-------|------------|
| Missing approve command | Added `worker approve` and `worker request-changes` |
| Rebase conflict flow | Don't abort, leave in progress for resolution |
| Source of truth conflict | SQLite only; JSON files are derived caches |
| STALE as persistent state | Changed to computed for display from heartbeat age |
| `done` from CONFLICTED | Now allowed with --skip-rebase |
| Merge race condition | Added fetch-merge-push retry loop |
| Spawn race condition | Use DB unique constraint (implementation detail) |
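A minimal sketch of that guard (assuming the `workers` schema from the message-passing layer and the shared `epochMs` helper):
```nim
import tiny_sqlite

proc createWorkerIdempotent(db: DbConn, taskId, branch, worktree: string): bool =
  ## Returns false if a concurrent spawn already inserted this task.
  let now = epochMs()
  db.exec("""
    INSERT OR IGNORE INTO workers
      (task_id, state, branch, worktree, created_at_ms, state_changed_at_ms)
    VALUES (?, 'ASSIGNED', ?, ?, ?, ?)
  """, taskId, branch, worktree, now, now)
  result = db.changes() > 0  # loser of the race reads back the existing row
```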
## References
- State machine design: `docs/design/worker-state-machine.md`
- Message passing: `docs/design/message-passing-layer.md`
- Branch isolation: `docs/design/branch-per-worker.md`

258
docs/design/worker-state-machine.md Normal file
View file

@ -0,0 +1,258 @@
# Worker State Machine Design
**Status**: Draft
**Bead**: skills-4oj
**Epic**: skills-s6y (Multi-agent orchestration: Lego brick architecture)
## Overview
This document defines the state machine for background worker agents in the multi-agent coordination system. Workers are AI coding agents that operate on separate git branches, coordinated through local filesystem state.
## Design Principles
1. **File-based, no database** - State in JSON files, messages in append-only JSONL
2. **Minimal states** - Only what's needed, resist over-engineering
3. **Atomic updates** - Write to temp file, fsync, atomic rename
4. **Crash recovery** - Detect stale workers via timestamp + process checks
5. **Cross-agent compatible** - Any agent that reads/writes files can participate
## State Diagram
```
┌─────────────────────────────────────────────┐
│ │
▼ │
┌──────────┐ assign ┌──────────┐ start ┌─────────┐ │
│ IDLE │ ──────────► │ ASSIGNED │ ─────────► │ WORKING │ ◄───┘
└──────────┘ └──────────┘ └────┬────┘ changes
▲ │ requested
│ │
│ reset submit_pr │
│ ▼
┌──────────┐ timeout ┌──────────┐ review ┌─────────────┐
│ FAILED │ ◄────────── │ STALE │ ◄───────── │ IN_REVIEW │
└──────────┘ └──────────┘ └──────┬──────┘
│ │
│ approve │
│ ▼
│ ┌──────────┐ merge ┌─────────────┐
└───────────────────│ COMPLETED│ ◄───────── │ APPROVED │
retry └──────────┘ └─────────────┘
```
## States
| State | Description | Entry Condition | Exit Condition |
|-------|-------------|-----------------|----------------|
| **IDLE** | Worker available, no task | Initial / after reset | Task assigned |
| **ASSIGNED** | Task claimed, preparing | Orchestrator assigns task | Branch created, work starts |
| **WORKING** | Actively coding/testing | `git checkout -b` succeeds | PR submitted or failure |
| **IN_REVIEW** | PR open, awaiting review | PR created | Approved, changes requested, or timeout |
| **APPROVED** | Review passed, ready to merge | Reviewer approves | Merge succeeds or conflicts |
| **COMPLETED** | Work merged to main | Merge succeeds | (terminal) or recycle to IDLE |
| **STALE** | No progress detected | Heartbeat timeout | Manual intervention or auto-reset |
| **FAILED** | Unrecoverable error | Max retries exceeded | Manual reset or reassign |
## State File Schema
Location: `.worker-state/{worker-id}.json`
```json
{
"worker_id": "worker-auth",
"state": "WORKING",
"task_id": "skills-abc",
"branch": "worker-auth/skills-abc",
"assigned_at": "2026-01-10T14:00:00Z",
"state_changed_at": "2026-01-10T14:05:00Z",
"last_heartbeat": "2026-01-10T14:32:00Z",
"pid": 12345,
"attempt": 1,
"max_attempts": 3,
"last_error": null,
"pr_url": null,
"review_state": null
}
```
### Field Definitions
| Field | Type | Description |
|-------|------|-------------|
| `worker_id` | string | Unique worker identifier (kebab-case) |
| `state` | enum | Current state (see States table) |
| `task_id` | string | Bead ID of assigned task |
| `branch` | string | Git branch name: `{worker_id}/{task_id}` |
| `assigned_at` | ISO8601 | When task was assigned |
| `state_changed_at` | ISO8601 | Last state transition timestamp |
| `last_heartbeat` | ISO8601 | Last activity timestamp |
| `pid` | int | Process ID (for crash detection) |
| `attempt` | int | Current attempt number (1-indexed) |
| `max_attempts` | int | Max retries before FAILED |
| `last_error` | string? | Error message if failed |
| `pr_url` | string? | Pull request URL when in IN_REVIEW |
| `review_state` | enum? | `pending`, `approved`, `changes_requested` |
## Transitions
### Valid Transitions
```
IDLE → ASSIGNED (assign_task)
ASSIGNED → WORKING (start_work)
ASSIGNED → FAILED (setup_failed)
WORKING → IN_REVIEW (submit_pr)
WORKING → FAILED (work_failed, max_attempts)
WORKING → STALE (heartbeat_timeout)
IN_REVIEW → APPROVED (review_approved)
IN_REVIEW → WORKING (changes_requested)
IN_REVIEW → STALE (review_timeout)
APPROVED → COMPLETED (merge_success)
APPROVED → WORKING (merge_conflict → rebase required)
STALE → WORKING (worker_recovered)
STALE → FAILED (timeout_exceeded)
FAILED → IDLE (reset/retry)
COMPLETED → IDLE (recycle)
```
### Transition Commands
Each transition is triggered by a command or detected condition:
| Command | From | To | Action |
|---------|------|----|----|
| `worker assign <id> <task>` | IDLE | ASSIGNED | Write state file, record task |
| `worker start <id>` | ASSIGNED | WORKING | Create branch, start heartbeat |
| `worker submit <id>` | WORKING | IN_REVIEW | Push branch, create PR |
| `worker approve <id>` | IN_REVIEW | APPROVED | Record approval |
| `worker merge <id>` | APPROVED | COMPLETED | Merge PR, delete branch |
| `worker reset <id>` | FAILED | IDLE | Clear state, ready for new task |
## File Operations
### Atomic State Updates
Never write directly to state file. Use atomic rename pattern:
```bash
# Write to temp file
echo "$new_state" > .worker-state/${worker_id}.json.tmp
# Sync to disk
sync .worker-state/${worker_id}.json.tmp
# Atomic rename
mv .worker-state/${worker_id}.json.tmp .worker-state/${worker_id}.json
```
### Optimistic Concurrency
Include `state_changed_at` as a version marker:
1. Read current state file
2. Compute new state
3. Before write, re-read and verify `state_changed_at` unchanged
4. If changed, retry from step 1
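A sketch of the version-check write (illustrative Nim; note `writeFile` alone does not fsync, so a real implementation would sync before the rename):
```nim
import std/[json, os]

proc casWrite(path, expectedChangedAt: string, newState: JsonNode): bool =
  ## Returns false if a concurrent writer bumped the version marker.
  let current = parseJson(readFile(path))
  if current["state_changed_at"].getStr != expectedChangedAt:
    return false                 # caller retries from step 1
  let tmp = path & ".tmp"
  writeFile(tmp, $newState)      # real impl: fsync tmp before rename
  moveFile(tmp, path)            # atomic rename on POSIX
  true
```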
### Lock File Pattern
For operations needing exclusive access:
```bash
# Acquire lock (atomic create with O_EXCL)
if ! (set -C; echo "$$" > .worker-state/${worker_id}.lock) 2>/dev/null; then
# Lock held - check if stale
lock_pid=$(cat .worker-state/${worker_id}.lock)
if ! kill -0 "$lock_pid" 2>/dev/null; then
# Process dead, steal lock
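# NOTE: the steal itself is racy (two stealers can both pass the
# liveness check); acceptable for a single-host MVP.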
rm -f .worker-state/${worker_id}.lock
echo "$$" > .worker-state/${worker_id}.lock
else
exit 1 # Lock held by live process
fi
fi
# ... do work ...
# Release lock
rm -f .worker-state/${worker_id}.lock
```
## Stale Worker Detection
Workers must update `last_heartbeat` during active work (every 30-60 seconds).
Detection algorithm (run by orchestrator or patrol agent):
```
for each worker file in .worker-state/*.json:
if state in (ASSIGNED, WORKING, IN_REVIEW):
if (now - last_heartbeat) > STALE_TIMEOUT:
if pid is set and process not running:
transition to STALE (worker crashed)
elif (now - last_heartbeat) > DEAD_TIMEOUT:
transition to STALE (presumed dead)
```
### Timeout Values
| Timeout | Duration | Description |
|---------|----------|-------------|
| `STALE_TIMEOUT` | 5 minutes | Mark as stale, eligible for intervention |
| `DEAD_TIMEOUT` | 15 minutes | Presume dead, ready for reassignment |
| `REVIEW_TIMEOUT` | 1 hour | Review taking too long, alert human |
## Directory Structure
```
.worker-state/
├── worker-auth.json # Worker state file
├── worker-auth.lock # Lock file (when held)
├── worker-refactor.json
├── messages/
│ ├── orchestrator.jsonl # Orchestrator → workers
│ ├── worker-auth.jsonl # Worker → orchestrator
│ └── worker-refactor.jsonl
└── reviews/
├── worker-auth.json # Review state (review-gate)
└── worker-refactor.json
```
## Integration Points
### With review-gate (skills-byq)
When worker submits PR (WORKING → IN_REVIEW):
1. Worker writes review request to `reviews/{worker-id}.json`
2. review-gate Stop hook checks review state
3. On approval, orchestrator transitions to APPROVED
### With message passing (skills-ms5)
Workers append to their outbox: `.worker-state/messages/{worker-id}.jsonl`
Orchestrator appends to: `.worker-state/messages/orchestrator.jsonl`
Message format:
```json
{"ts": "2026-01-10T14:00:00Z", "from": "worker-auth", "type": "done", "task": "skills-abc"}
```
### With branch isolation (skills-roq)
- Branch naming: `{worker-id}/{task-id}`
- Mandatory rebase before IN_REVIEW → APPROVED
- Merge to main only from APPROVED state
## Open Questions
1. **Heartbeat mechanism**: Should workers write to their state file, a separate heartbeat file, or append to message log?
2. **PR creation**: Who creates the PR - worker agent or orchestrator?
3. **Review source**: Human review, AI review (review-gate), or both?
4. **Recycle vs terminate**: Should COMPLETED workers return to IDLE or be terminated?
## References
- Temporal.io: Event-sourced durable execution
- LangGraph: Graph-based state machines for agents
- Airflow: DAG-based task lifecycle (None → Scheduled → Running → Success)
- OpenHands: Event-sourced (Observation, Thought, Action) loop

1
src/.gitignore vendored Normal file
View file

@ -0,0 +1 @@
*.out

80
src/README.md Normal file
View file

@ -0,0 +1,80 @@
# Worker CLI
Multi-agent worker coordination CLI written in Nim.
## Prerequisites
```bash
# Install Nim (>= 2.0.0)
# On NixOS: nix-shell -p nim
# Install dependencies
nimble install tiny_sqlite cligen
```
## Static SQLite (Optional)
For a single static binary, download the SQLite amalgamation:
```bash
curl -LO https://sqlite.org/2024/sqlite-amalgamation-3450000.zip
unzip sqlite-amalgamation-3450000.zip
cp sqlite-amalgamation-3450000/sqlite3.c libs/
```
## Build
```bash
# Development build
nim c src/worker.nim
# Release build (optimized)
nim c -d:release src/worker.nim
# The binary will be at src/worker
```
## Usage
```bash
# Orchestrator commands (human)
worker spawn <task-id> # Create workspace
worker status [--watch] # Dashboard
worker approve <task-id> # Approve work
worker request-changes <task-id> # Request changes
worker merge <task-id> # Merge to integration
worker cancel <task-id> # Abort worker
# Agent commands (from worktree)
worker start # Begin work
worker done [--skip-rebase] # Complete work
worker fail <reason> # Signal failure
worker heartbeat # Manual heartbeat
worker show <task-id> # Detailed view
```
## Project Structure
```
src/
├── worker.nim # CLI entry point (cligen)
├── worker.nimble # Package definition
├── config.nims # Build configuration
├── libs/
│ └── sqlite3.c # SQLite amalgamation (optional)
└── worker/
├── types.nim # Shared types and constants
├── db.nim # SQLite operations
├── state.nim # State machine
├── heartbeat.nim # Background thread
├── git.nim # Git/worktree operations
└── context.nim # Worker context handling
```
## Design Docs
See `docs/design/`:
- `mvp-scope.md` - MVP scope and implementation order
- `message-passing-layer.md` - SQLite message bus
- `worker-cli-primitives.md` - CLI commands
- `worker-state-machine.md` - State transitions

22
src/config.nims Normal file
View file

@ -0,0 +1,22 @@
# Build configuration for worker CLI
# Memory management: ORC handles cycles, deterministic destruction
switch("mm", "orc")
# Enable threading for heartbeat
switch("threads", "on")
# Release mode optimizations
when defined(release):
switch("opt", "size")
switch("passC", "-flto")
switch("passL", "-flto")
# Static SQLite compilation
switch("passC", "-DSQLITE_THREADSAFE=1")
switch("passC", "-DSQLITE_ENABLE_JSON1")
switch("passC", "-DSQLITE_OMIT_LOAD_EXTENSION")
# Compile SQLite amalgamation if present
when fileExists("libs/sqlite3.c"):
switch("compile", "libs/sqlite3.c")

2
src/libs/.gitkeep Normal file
View file

@ -0,0 +1,2 @@
# Place sqlite3.c here for static linking
# Download from: https://sqlite.org/download.html (amalgamation)

341
src/worker.nim Normal file
View file

@ -0,0 +1,341 @@
## Worker CLI - Multi-agent coordination tool
##
## Usage:
## worker spawn <task-id> Create a new worker workspace
## worker status Show dashboard of all workers
## worker start Signal ASSIGNED → WORKING
## worker done Signal WORKING → IN_REVIEW
## worker approve <task-id> Approve completed work
## worker merge <task-id> Merge approved work
## worker cancel <task-id> Abort a worker
import std/[os, strformat, json, times, terminal, strutils, sequtils]
import cligen
import tiny_sqlite
import worker/[types, db, state, git, context, heartbeat]
# -----------------------------------------------------------------------------
# Orchestrator Commands
# -----------------------------------------------------------------------------
proc spawn(taskId: string, description: string = "",
fromBranch: string = "origin/integration") =
## Create a new worker workspace
let db = openBusDb()
defer: db.close()
# Check if already exists
let existing = db.getWorker(taskId)
if existing.isSome:
echo "Worker already exists: ", taskId
echo " State: ", existing.get.state
echo " Worktree: ", existing.get.worktree
quit(ExitSuccess)
# Create git worktree
let (branch, worktree) = createWorktree(taskId, fromBranch)
# Create context file
discard createWorkerContext(taskId, branch, worktree, description)
# Create worker in DB
discard db.createWorker(taskId, branch, worktree, description)
echo "Created worker: ", taskId
echo " Branch: ", branch
echo " Worktree: ", worktree
echo " State: ASSIGNED"
proc status(state: string = "", stale: bool = false,
json: bool = false, watch: bool = false) =
## Show dashboard of all workers
proc render() =
let db = openBusDb()
defer: db.close()
let workers = db.getAllWorkers()
if json:
var arr = newJArray()
for w in workers:
arr.add(%*{
"task_id": w.taskId,
"state": $w.state,
"branch": w.branch,
"stale": w.staleLevel,
"age": $(getTime() - w.createdAt)
})
echo arr.pretty
return
# Filter
var filtered = workers
if state != "":
filtered = workers.filterIt($it.state == state.toUpperAscii)
if stale:
filtered = filtered.filterIt(it.isStale)
if filtered.len == 0:
echo "No workers found."
return
# Table header
echo ""
echo "TASK".alignLeft(14), "STATE".alignLeft(12), "AGE".alignLeft(8),
"HEARTBEAT".alignLeft(12), "STATUS".alignLeft(8), "SUMMARY"
echo "-".repeat(70)
for w in filtered:
let age = getTime() - w.createdAt
let ageStr = if age.inHours > 0: &"{age.inHours}h"
elif age.inMinutes > 0: &"{age.inMinutes}m"
else: &"{age.inSeconds}s"
var hbStr = "--"
if w.lastHeartbeat != Time():
let hbAge = getTime() - w.lastHeartbeat
hbStr = if hbAge.inMinutes > 0: &"{hbAge.inMinutes}m ago"
else: &"{hbAge.inSeconds}s ago"
let staleStr = w.staleLevel
let summary = if w.description.len > 30: w.description[0..29] & "..."
else: w.description
echo w.taskId.alignLeft(14),
($w.state).alignLeft(12),
ageStr.alignLeft(8),
hbStr.alignLeft(12),
staleStr.alignLeft(8),
summary
if watch:
while true:
eraseScreen()
setCursorPos(0, 0)
echo "Worker Status (", now().format("HH:mm:ss"), ") - Press Ctrl+C to exit"
render()
sleep(2000)
else:
render()
proc approve(taskId: string, by: string = "", comment: string = "") =
## Approve completed work (IN_REVIEW → APPROVED)
let db = openBusDb()
defer: db.close()
db.transition(taskId, wsInReview, wsApproved)
echo "Approved: ", taskId
proc requestChanges(taskId: string, comment: string = "") =
## Request revisions (IN_REVIEW → WORKING)
let db = openBusDb()
defer: db.close()
db.transition(taskId, wsInReview, wsWorking)
echo "Changes requested: ", taskId
if comment != "":
echo " Comment: ", comment
proc merge(taskId: string, deleteBranch: bool = false) =
## Merge approved work (APPROVED → COMPLETED)
let db = openBusDb()
defer: db.close()
# Verify state
let st = db.getState(taskId)
if st.isNone:
echo "Worker not found: ", taskId
quit(ExitNotFound)
if st.get != wsApproved:
echo "Worker must be APPROVED to merge, got: ", st.get
quit(ExitInvalidTransition)
# Attempt merge
let ok = mergeToIntegration(taskId)
if not ok:
echo "Merge conflict - returning to WORKING state"
db.transition(taskId, wsApproved, wsWorking)
quit(ExitConflict)
# Success
db.transition(taskId, wsApproved, wsCompleted)
if deleteBranch:
removeBranch(taskId)
removeWorktree(taskId)
echo "Merged: ", taskId
proc cancel(taskId: string, reason: string = "", cleanup: bool = false) =
## Abort a worker (* → FAILED)
let db = openBusDb()
defer: db.close()
db.transitionToFailed(taskId, reason)
echo "Cancelled: ", taskId
if cleanup:
removeWorktree(taskId)
echo " Worktree removed"
# -----------------------------------------------------------------------------
# Agent Commands (run from worktree)
# -----------------------------------------------------------------------------
proc start(task: string = "") =
## Signal ASSIGNED → WORKING (run from worktree)
let taskId = if task != "": task else: getTaskId()
let db = openBusDb()
defer: db.close()
# Check current state
let st = db.getState(taskId)
if st.isNone:
echo "Worker not found: ", taskId
quit(ExitNotFound)
if st.get == wsWorking:
echo "Already WORKING: ", taskId
quit(ExitSuccess)
db.transition(taskId, wsAssigned, wsWorking)
# Start heartbeat
startGlobalHeartbeat(BusDbPath, taskId)
updateGlobalHeartbeat(hsWorking, taskId)
echo "Started work on ", taskId
proc done(skipRebase: bool = false) =
## Signal WORKING → IN_REVIEW (includes rebase)
let ctx = findContext()
let db = openBusDb()
defer: db.close()
let st = db.getState(ctx.taskId)
if st.isNone:
echo "Worker not found: ", ctx.taskId
quit(ExitNotFound)
# Check we're in WORKING or CONFLICTED
if st.get notin {wsWorking, wsConflicted}:
echo "Cannot complete from state: ", st.get
quit(ExitInvalidTransition)
# Rebase unless skipped
if not skipRebase:
let ok = rebaseOnIntegration(ctx.worktree)
if not ok:
db.transition(ctx.taskId, st.get, wsConflicted)
let files = getConflictedFiles(ctx.worktree)
echo "ERROR: Rebase conflict detected"
echo " Conflicting files: ", files.join(", ")
echo " State: CONFLICTED"
echo ""
echo " To resolve:"
echo " 1. Fix conflicts in worktree"
echo " 2. git add <files>"
echo " 3. git rebase --continue"
echo " 4. worker done --skip-rebase"
quit(ExitConflict)
else:
# Verify no rebase in progress
if isRebaseInProgress(ctx.worktree):
echo "ERROR: Rebase still in progress. Run: git rebase --continue"
quit(ExitConflict)
# Push
pushBranch(ctx.worktree, ctx.branch)
# Transition
db.transition(ctx.taskId, st.get, wsInReview)
stopGlobalHeartbeat()
echo "Ready for review: ", ctx.taskId
proc fail(reason: string) =
## Signal WORKING → FAILED
let ctx = findContext()
let db = openBusDb()
defer: db.close()
db.transitionToFailed(ctx.taskId, reason)
stopGlobalHeartbeat()
echo "Failed: ", ctx.taskId, " - ", reason
proc sendHeartbeat(status: string = "working", progress: float = 0.0) =
## Emit a single heartbeat (normally done by background thread)
let ctx = findContext()
let db = openBusDb()
defer: db.close()
let hs = case status
of "idle": hsIdle
of "working": hsWorking
of "blocked": hsBlocked
else: hsWorking
db.writeHeartbeat(ctx.taskId, hs, ctx.taskId, progress)
echo "Heartbeat: ", ctx.taskId, " - ", status
proc show(taskId: string) =
## Show detailed view of a worker
let db = openBusDb()
defer: db.close()
let info = db.getWorker(taskId)
if info.isNone:
echo "Worker not found: ", taskId
quit(ExitNotFound)
let w = info.get
echo ""
echo "Task: ", w.taskId
echo "Description: ", w.description
echo "State: ", w.state
echo "Branch: ", w.branch
echo "Worktree: ", w.worktree
echo ""
echo "Created: ", w.createdAt.format("yyyy-MM-dd HH:mm:ss")
echo "State Changed: ", w.stateChangedAt.format("yyyy-MM-dd HH:mm:ss")
if w.lastHeartbeat != Time():
echo "Last Heartbeat: ", w.lastHeartbeat.format("yyyy-MM-dd HH:mm:ss")
echo ""
echo "Status: ", w.staleLevel
# -----------------------------------------------------------------------------
# CLI Dispatch
# -----------------------------------------------------------------------------
when isMainModule:
dispatchMulti(
[spawn, help = {"taskId": "Task identifier",
"description": "Task description",
"fromBranch": "Base branch"}],
[status, help = {"state": "Filter by state",
"stale": "Show only stale workers",
"json": "Output as JSON",
"watch": "Refresh every 2s"}],
[start, help = {"task": "Task ID (reads from context if empty)"}],
[done, help = {"skipRebase": "Skip rebase (after manual conflict resolution)"}],
[approve, help = {"taskId": "Task to approve",
"by": "Reviewer name",
"comment": "Approval comment"}],
[requestChanges, cmdName = "request-changes",
help = {"taskId": "Task to reject",
"comment": "Feedback for agent"}],
[merge, help = {"taskId": "Task to merge",
"deleteBranch": "Delete branch after merge"}],
[cancel, help = {"taskId": "Task to cancel",
"reason": "Cancellation reason",
"cleanup": "Remove worktree"}],
[fail, help = {"reason": "Failure reason"}],
[sendHeartbeat, cmdName = "heartbeat",
help = {"status": "Status (idle, working, blocked)",
"progress": "Progress 0.0-1.0"}],
[show, help = {"taskId": "Task to show"}]
)

15
src/worker.nimble Normal file
View file

@ -0,0 +1,15 @@
# Package
version = "0.1.0"
author = "dan"
description = "Multi-agent worker coordination CLI"
license = "MIT"
srcDir = "."
bin = @["worker"]
# Dependencies
requires "nim >= 2.0.0"
requires "tiny_sqlite >= 0.2.0"
requires "cligen >= 1.7.0"
# Optional: faster JSON
# requires "jsony >= 1.1.0"

59
src/worker/context.nim Normal file
View file

@ -0,0 +1,59 @@
## Worker context handling
##
## Reads/writes .worker-ctx.json in worktrees for agent discovery.
import std/[os, json, times, strformat]
import ./types
proc writeContext*(worktree: string, ctx: WorkerContext) =
## Write context file to worktree
let path = worktree / ContextFileName
writeFile(path, $ctx.toJson())
proc readContext*(worktree: string = ""): WorkerContext =
## Read context from worktree. If empty, uses current directory.
let dir = if worktree != "": worktree else: getCurrentDir()
let path = dir / ContextFileName
if not fileExists(path):
raise newException(IOError, &"Context file not found: {path}")
let content = readFile(path)
let j = parseJson(content)
return WorkerContext.fromJson(j)
proc findContext*(): WorkerContext =
## Find context by walking up directory tree
var dir = getCurrentDir()
while dir != "" and dir != "/":
let path = dir / ContextFileName
if fileExists(path):
let content = readFile(path)
let j = parseJson(content)
return WorkerContext.fromJson(j)
dir = parentDir(dir)
raise newException(IOError, "No .worker-ctx.json found in directory tree")
proc isInWorktree*(): bool =
## Check if current directory is inside a worker worktree
try:
discard findContext()
return true
except IOError:
return false
proc getTaskId*(): string =
## Get task ID from context
return findContext().taskId
proc createWorkerContext*(taskId, branch, worktree, description: string): WorkerContext =
## Create a new context and write to worktree
result = WorkerContext(
taskId: taskId,
branch: branch,
worktree: worktree,
createdAt: getTime(),
description: description
)
writeContext(worktree, result)

265
src/worker/db.nim Normal file
View file

@ -0,0 +1,265 @@
## SQLite database operations for worker coordination
##
## Key patterns:
## - One connection per thread (never share DbConn)
## - WAL mode for concurrent readers
## - BEGIN IMMEDIATE for write transactions
## - Explicit ack for at-least-once delivery
import std/[os, json, strformat, options, strutils, random]
import tiny_sqlite
import ./types
# Helper for generating unique IDs
randomize()  # seed the global RNG once per process; without this, every
             # process emits the same ID sequence and trips UNIQUE constraints
proc genOid*(): string =
  ## Generate a simple unique ID (16 lowercase letters, ~75 bits)
  result = ""
  for i in 0..<16:
    result.add(chr(ord('a') + rand(25)))
const Schema* = """
-- Messages table (seq is rowid-based for true auto-increment)
CREATE TABLE IF NOT EXISTS messages (
seq INTEGER PRIMARY KEY AUTOINCREMENT,
id TEXT NOT NULL UNIQUE,
ts_ms INTEGER NOT NULL,
from_agent TEXT NOT NULL,
to_agent TEXT,
type TEXT NOT NULL,
correlation_id TEXT,
in_reply_to TEXT,
payload TEXT,
payload_ref TEXT,
CHECK (payload IS NULL OR payload_ref IS NULL)
);
CREATE INDEX IF NOT EXISTS idx_messages_to_seq ON messages(to_agent, seq);
CREATE INDEX IF NOT EXISTS idx_messages_correlation_seq ON messages(correlation_id, seq);
CREATE INDEX IF NOT EXISTS idx_messages_type_from_ts ON messages(type, from_agent, ts_ms);
-- Cursors table (per-agent read position)
CREATE TABLE IF NOT EXISTS cursors (
agent_id TEXT PRIMARY KEY,
last_acked_seq INTEGER NOT NULL DEFAULT 0,
updated_at_ms INTEGER NOT NULL
);
-- Heartbeats table (separate from messages to avoid write contention)
CREATE TABLE IF NOT EXISTS heartbeats (
agent_id TEXT PRIMARY KEY,
ts_ms INTEGER NOT NULL,
status TEXT NOT NULL,
current_task TEXT,
progress REAL
);
-- Task claims with lease expiry
CREATE TABLE IF NOT EXISTS task_claims (
task_id TEXT PRIMARY KEY,
claimed_by TEXT NOT NULL,
claimed_at_ms INTEGER NOT NULL,
lease_until_ms INTEGER NOT NULL
);
-- Workers table (state machine)
CREATE TABLE IF NOT EXISTS workers (
task_id TEXT PRIMARY KEY,
state TEXT NOT NULL,
branch TEXT NOT NULL,
worktree TEXT NOT NULL,
description TEXT,
created_at_ms INTEGER NOT NULL,
state_changed_at_ms INTEGER NOT NULL
);
-- Schema versioning
CREATE TABLE IF NOT EXISTS meta (
key TEXT PRIMARY KEY,
value TEXT NOT NULL
);
-- Export tracking
CREATE TABLE IF NOT EXISTS export_state (
id INTEGER PRIMARY KEY CHECK (id = 1),
last_seq INTEGER NOT NULL DEFAULT 0
);
"""
proc initSchema*(db: DbConn) =
## Initialize database schema
db.execScript(Schema)
# Insert meta if not exists
db.exec("INSERT OR IGNORE INTO meta (key, value) VALUES ('schema_version', '1')")
db.exec("INSERT OR IGNORE INTO export_state (id, last_seq) VALUES (1, 0)")
proc openBusDb*(dbPath: string = BusDbPath): DbConn =
## Open database with required PRAGMAs. One connection per thread.
createDir(parentDir(dbPath))
result = openDatabase(dbPath)
result.exec("PRAGMA busy_timeout = " & $BusyTimeoutMs)
result.exec("PRAGMA foreign_keys = ON")
result.exec("PRAGMA journal_mode = WAL")
result.exec("PRAGMA synchronous = NORMAL")
initSchema(result)
proc optStr*(s: string): Option[string] =
## Convert empty string to none
if s == "": none(string) else: some(s)
proc publish*(db: DbConn, agentId: string, msgType: string,
to: string = "", correlationId: string = "",
inReplyTo: string = "", payload: JsonNode = nil): string =
## Publish a message to the bus. Returns message ID.
let msgId = genOid()
let tsMs = epochMs()
let payloadStr = if payload.isNil: "" else: $payload
db.exec("""
INSERT INTO messages (id, ts_ms, from_agent, to_agent, type,
correlation_id, in_reply_to, payload)
VALUES (?, ?, ?, ?, ?, ?, ?, ?)
""", msgId, tsMs, agentId,
optStr(to),
msgType,
optStr(correlationId),
optStr(inReplyTo),
optStr(payloadStr))
return msgId
type
Message* = object
seq*: int64
id*: string
tsMs*: int64
fromAgent*: string
toAgent*: Option[string]
msgType*: string
correlationId*: Option[string]
inReplyTo*: Option[string]
payload*: Option[JsonNode]
payloadRef*: Option[string]
proc poll*(db: DbConn, agentId: string, limit: int = 100): seq[Message] =
## Fetch unacknowledged messages. Does NOT advance cursor.
let cursor = db.one(
"SELECT last_acked_seq FROM cursors WHERE agent_id = ?",
agentId
)
let lastSeq = if cursor.isSome: cursor.get[0].fromDbValue(int64) else: 0'i64
for row in db.iterate("""
SELECT seq, id, ts_ms, from_agent, to_agent, type,
correlation_id, in_reply_to, payload, payload_ref
FROM messages
WHERE seq > ?
AND (to_agent IS NULL OR to_agent = ?)
ORDER BY seq
LIMIT ?
""", lastSeq, agentId, limit):
var msg = Message(
seq: row[0].fromDbValue(int64),
id: row[1].fromDbValue(string),
tsMs: row[2].fromDbValue(int64),
fromAgent: row[3].fromDbValue(string),
msgType: row[5].fromDbValue(string),
)
if row[4].kind != sqliteNull:
msg.toAgent = some(row[4].fromDbValue(string))
if row[6].kind != sqliteNull:
msg.correlationId = some(row[6].fromDbValue(string))
if row[7].kind != sqliteNull:
msg.inReplyTo = some(row[7].fromDbValue(string))
if row[8].kind != sqliteNull:
msg.payload = some(parseJson(row[8].fromDbValue(string)))
if row[9].kind != sqliteNull:
msg.payloadRef = some(row[9].fromDbValue(string))
result.add(msg)
proc ack*(db: DbConn, agentId: string, seq: int64) =
## Acknowledge successful processing. Advances cursor.
let tsMs = epochMs()
db.exec("""
INSERT INTO cursors (agent_id, last_acked_seq, updated_at_ms)
VALUES (?, ?, ?)
ON CONFLICT(agent_id) DO UPDATE SET
last_acked_seq = MAX(cursors.last_acked_seq, excluded.last_acked_seq),
updated_at_ms = excluded.updated_at_ms
""", agentId, seq, tsMs)
proc writeHeartbeat*(db: DbConn, agentId: string, status: HeartbeatStatus,
currentTask: string = "", progress: float = 0.0) =
## Upsert heartbeat to dedicated table
let tsMs = epochMs()
db.exec("""
INSERT INTO heartbeats (agent_id, ts_ms, status, current_task, progress)
VALUES (?, ?, ?, ?, ?)
ON CONFLICT(agent_id) DO UPDATE SET
ts_ms = excluded.ts_ms,
status = excluded.status,
current_task = excluded.current_task,
progress = excluded.progress
""", agentId, tsMs, $status,
optStr(currentTask),
progress)
proc getHeartbeat*(db: DbConn, agentId: string): Option[tuple[tsMs: int64, status: string]] =
## Get last heartbeat for an agent
let row = db.one(
"SELECT ts_ms, status FROM heartbeats WHERE agent_id = ?",
agentId
)
if row.isSome:
return some((tsMs: row.get[0].fromDbValue(int64),
status: row.get[1].fromDbValue(string)))
proc tryClaim*(db: DbConn, taskId, agentId: string): bool =
## Attempt to claim a task. Returns true if successful.
let tsMs = epochMs()
let leaseUntil = tsMs + LeaseDurationMs
db.exec("BEGIN IMMEDIATE")
try:
let existing = db.one(
"SELECT claimed_by, lease_until_ms FROM task_claims WHERE task_id = ?",
taskId
)
if existing.isSome:
let leaseUntilMs = existing.get[1].fromDbValue(int64)
if leaseUntilMs > tsMs:
db.exec("ROLLBACK")
return false
else:
db.exec("DELETE FROM task_claims WHERE task_id = ?", taskId)
db.exec("""
INSERT INTO task_claims (task_id, claimed_by, claimed_at_ms, lease_until_ms)
VALUES (?, ?, ?, ?)
""", taskId, agentId, tsMs, leaseUntil)
db.exec("COMMIT")
return true
except CatchableError:
db.exec("ROLLBACK")
return false
proc renewClaim*(db: DbConn, taskId, agentId: string): bool =
## Renew lease on a claimed task
let tsMs = epochMs()
let leaseUntil = tsMs + LeaseDurationMs
db.exec("""
UPDATE task_claims
SET lease_until_ms = ?
WHERE task_id = ? AND claimed_by = ?
""", leaseUntil, taskId, agentId)
return db.changes() > 0
proc releaseClaim*(db: DbConn, taskId, agentId: string) =
## Release a claim when done
db.exec("""
DELETE FROM task_claims
WHERE task_id = ? AND claimed_by = ?
""", taskId, agentId)

143
src/worker/git.nim Normal file
View file

@ -0,0 +1,143 @@
## Git operations for worker coordination
##
## Uses osproc for subprocess execution.
## Key operations: worktree create/remove, rebase, merge
import std/[os, osproc, strutils, strformat, streams]
import ./types
proc runGit*(args: varargs[string], workDir: string = ""): tuple[output: string, exitCode: int] =
## Run a git command and return output + exit code
var cmd = "git"
var fullArgs: seq[string] = @[]
if workDir != "":
fullArgs.add("-C")
fullArgs.add(workDir)
for arg in args:
fullArgs.add(arg)
let process = startProcess(cmd, args = fullArgs,
options = {poUsePath, poStdErrToStdOut})
result.output = process.outputStream.readAll()
result.exitCode = process.waitForExit()
process.close()
proc runGitCheck*(args: varargs[string], workDir: string = ""): string =
## Run git command, raise GitError on failure
let (output, code) = runGit(args, workDir)
if code != 0:
raise newException(GitError, &"Git command failed ({code}): {output}")
return output.strip()
proc createWorktree*(taskId: string, fromBranch: string = "origin/integration"): tuple[branch, worktree: string] =
## Create a worktree for a task
let branch = &"feat/{taskId}"
let worktree = &"{WorktreesDir}/{taskId}"
# Fetch latest
discard runGit("fetch", "origin")
# Create branch from base
discard runGitCheck("branch", branch, fromBranch)
# Create worktree
createDir(parentDir(worktree))
discard runGitCheck("worktree", "add", worktree, branch)
return (branch, worktree)
proc removeWorktree*(taskId: string) =
## Remove a worktree
let worktree = &"{WorktreesDir}/{taskId}"
if dirExists(worktree):
discard runGit("worktree", "remove", "--force", worktree)
proc removeBranch*(taskId: string, remote: bool = true) =
## Remove feature branch
let branch = &"feat/{taskId}"
discard runGit("branch", "-D", branch)
if remote:
discard runGit("push", "origin", "--delete", branch)
proc rebaseOnIntegration*(worktree: string): bool =
## Rebase worktree on integration branch. Returns false on conflict.
discard runGit("fetch", "origin", workDir = worktree)
let (output, code) = runGit("rebase", "origin/integration", workDir = worktree)
if code != 0:
if "CONFLICT" in output or "conflict" in output:
return false
# Other error - abort and raise
discard runGit("rebase", "--abort", workDir = worktree)
raise newException(GitError, &"Rebase failed: {output}")
return true
proc isRebaseInProgress*(worktree: string): bool =
## Check if a rebase is in progress
return dirExists(worktree / ".git" / "rebase-merge") or
dirExists(worktree / ".git" / "rebase-apply")
proc pushBranch*(worktree: string, branch: string) =
## Push branch to origin
discard runGitCheck("push", "-u", "origin", branch, workDir = worktree)
proc mergeToIntegration*(taskId: string, maxRetries: int = 3): bool =
## Merge feature branch to integration with retry loop
let branch = &"feat/{taskId}"
for attempt in 1..maxRetries:
# Fetch latest
discard runGitCheck("fetch", "origin", "integration", branch)
# Checkout and reset integration
discard runGitCheck("checkout", "integration")
discard runGitCheck("reset", "--hard", "origin/integration")
# Merge
let (mergeOutput, mergeCode) = runGit("merge", "--no-ff", branch,
"-m", &"Merge {branch}")
if mergeCode != 0:
if "CONFLICT" in mergeOutput:
discard runGit("merge", "--abort")
return false
raise newException(GitError, &"Merge failed: {mergeOutput}")
# Push
let (pushOutput, pushCode) = runGit("push", "origin", "integration")
if pushCode == 0:
return true
# Push failed, probably remote changed - retry
if attempt < maxRetries:
sleep(1000) # Brief pause before retry
continue
else:
raise newException(GitError, &"Push failed after {maxRetries} retries: {pushOutput}")
return false
proc getConflictedFiles*(worktree: string): seq[string] =
## Get list of files with conflicts
let (output, _) = runGit("diff", "--name-only", "--diff-filter=U",
workDir = worktree)
for line in output.splitLines():
if line.strip() != "":
result.add(line.strip())
proc getBranchStatus*(worktree: string): tuple[ahead, behind: int] =
## Get commits ahead/behind integration
let (output, code) = runGit("rev-list", "--left-right", "--count",
"origin/integration...HEAD", workDir = worktree)
if code != 0:
return (0, 0)
let parts = output.strip().split('\t')
if parts.len >= 2:
try:
result.behind = parseInt(parts[0])
result.ahead = parseInt(parts[1])
except ValueError:
discard
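# Illustrative end-to-end flow (a sketch; the task id is a placeholder):
# spawn a worktree, rebase before review, push, then merge and clean up.
when isMainModule:
  let (branch, worktree) = createWorktree("skills-abc")
  # ... worker edits and commits inside `worktree` here ...
  if rebaseOnIntegration(worktree):
    pushBranch(worktree, branch)
    if mergeToIntegration("skills-abc"):
      removeWorktree("skills-abc")
      removeBranch("skills-abc")
  else:
    # Rebase stopped on conflicts; surface them for escalation.
    echo "conflicted files: ", getConflictedFiles(worktree)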

src/worker/heartbeat.nim Normal file

@@ -0,0 +1,120 @@
## Background heartbeat thread
##
## Nim threads don't share memory by default.
## Use channels for communication between main and heartbeat threads.
##
## Key patterns:
## - Dedicated thread with own SQLite connection
## - Channel for stop signal and status updates
## - Non-blocking tryRecv to check for commands
import std/[os, times]
import tiny_sqlite
import ./types
import ./db
type
HeartbeatCmd* = enum
hbUpdateStatus
hbStop
HeartbeatMsg* = object
cmd*: HeartbeatCmd
status*: HeartbeatStatus
task*: string
progress*: float
HeartbeatThread* = object
thread: Thread[HeartbeatArgs]
channel: Channel[HeartbeatMsg]
running: bool
HeartbeatArgs = object
dbPath: string
agentId: string
channelPtr: ptr Channel[HeartbeatMsg]
proc heartbeatWorker(args: HeartbeatArgs) {.thread.} =
## Dedicated heartbeat thread - owns its own DB connection
let db = openBusDb(args.dbPath)
var status = hsIdle
var task = ""
var progress = 0.0
while true:
# Check for commands (non-blocking)
let tried = args.channelPtr[].tryRecv()
if tried.dataAvailable:
case tried.msg.cmd
of hbStop:
db.close()
return
of hbUpdateStatus:
status = tried.msg.status
task = tried.msg.task
progress = tried.msg.progress
# Write heartbeat
try:
db.writeHeartbeat(args.agentId, status, task, progress)
except CatchableError as e:
# Log but don't crash
stderr.writeLine("Heartbeat error: ", e.msg)
sleep(HeartbeatIntervalMs)
proc startHeartbeat*(dbPath, agentId: string): ptr HeartbeatThread =
## Start the heartbeat thread. Returns handle for control.
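  ## The handle is heap-allocated because the Channel must live at a stable
  ## address shared with the thread; stopHeartbeat frees it.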
result = cast[ptr HeartbeatThread](alloc0(sizeof(HeartbeatThread)))
result.channel.open()
result.running = true
let args = HeartbeatArgs(
dbPath: dbPath,
agentId: agentId,
channelPtr: addr result.channel
)
createThread(result.thread, heartbeatWorker, args)
proc updateStatus*(hb: ptr HeartbeatThread, status: HeartbeatStatus,
task: string = "", progress: float = 0.0) =
## Update the status reported in heartbeats
if hb != nil and hb.running:
hb.channel.send(HeartbeatMsg(
cmd: hbUpdateStatus,
status: status,
task: task,
progress: progress
))
proc stopHeartbeat*(hb: ptr HeartbeatThread) =
## Stop the heartbeat thread and clean up
if hb != nil and hb.running:
hb.channel.send(HeartbeatMsg(cmd: hbStop))
joinThread(hb.thread)
hb.channel.close()
hb.running = false
dealloc(hb)
# Simpler API using a global handle (for single-threaded CLI usage)
var globalHeartbeat: ptr HeartbeatThread = nil
proc startGlobalHeartbeat*(dbPath, agentId: string) =
## Start heartbeat using global state (simpler API for CLI)
if globalHeartbeat != nil:
return # Already running
globalHeartbeat = startHeartbeat(dbPath, agentId)
proc updateGlobalHeartbeat*(status: HeartbeatStatus, task: string = "",
progress: float = 0.0) =
## Update global heartbeat status
updateStatus(globalHeartbeat, status, task, progress)
proc stopGlobalHeartbeat*() =
## Stop global heartbeat
if globalHeartbeat != nil:
stopHeartbeat(globalHeartbeat)
globalHeartbeat = nil
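# Usage sketch for the global API (agent id is a placeholder). Note that
# stopGlobalHeartbeat can block up to HeartbeatIntervalMs, because the
# worker thread only drains the channel between sleeps.
when isMainModule:
  startGlobalHeartbeat(BusDbPath, "agent-1")
  updateGlobalHeartbeat(hsWorking, task = "skills-abc", progress = 0.25)
  # ... do the actual work, updating progress as it advances ...
  stopGlobalHeartbeat()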

src/worker/state.nim Normal file

@@ -0,0 +1,241 @@
## Worker state machine
##
## States: ASSIGNED → WORKING → IN_REVIEW → APPROVED → COMPLETED
## Also: CONFLICTED (rebase), FAILED (error)
##
## Key patterns:
## - Compare-and-set with BEGIN IMMEDIATE
## - STALE computed from heartbeat age (not persistent)
import std/[tables, strformat, json, options, times]
import tiny_sqlite
import ./types
import ./db
const ValidTransitions* = {
wsAssigned: @[wsWorking, wsFailed],
wsWorking: @[wsInReview, wsConflicted, wsFailed],
wsConflicted: @[wsInReview, wsWorking, wsFailed],
wsInReview: @[wsApproved, wsWorking, wsFailed],
wsApproved: @[wsCompleted, wsWorking, wsFailed],
wsCompleted: newSeq[WorkerState](), # Empty - no transitions out
wsFailed: @[wsAssigned], # Can retry
}.toTable
proc canTransition*(fromState, toState: WorkerState): bool =
## Check if transition is valid
if fromState notin ValidTransitions:
return false
return toState in ValidTransitions[fromState]
proc transition*(db: DbConn, taskId: string, fromState, toState: WorkerState) =
## Attempt state transition with compare-and-set guard.
## Raises InvalidTransition or StaleState on failure.
if not canTransition(fromState, toState):
raise newException(InvalidTransition, &"Invalid transition: {fromState} → {toState}")
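  # BEGIN IMMEDIATE takes the write lock up front, so the read-check-update
  # below is atomic with respect to other processes.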
db.exec("BEGIN IMMEDIATE")
try:
let row = db.one(
"SELECT state FROM workers WHERE task_id = ?",
taskId
)
if row.isNone:
db.exec("ROLLBACK")
raise newException(WorkerNotFound, &"Worker not found: {taskId}")
let currentStr = row.get[0].fromDbValue(string)
let current = parseState(currentStr)
if current != fromState:
db.exec("ROLLBACK")
raise newException(StaleState, &"Expected {fromState}, got {current}")
let tsMs = epochMs()
db.exec("""
UPDATE workers
SET state = ?, state_changed_at_ms = ?
WHERE task_id = ?
""", $toState, tsMs, taskId)
# Publish state change message
discard db.publish("worker", "state_change",
correlationId = taskId,
payload = %*{
"from": $fromState,
"to": $toState,
"task_id": taskId
})
db.exec("COMMIT")
except CatchableError:
db.exec("ROLLBACK")
raise
proc transitionToFailed*(db: DbConn, taskId: string, reason: string = "") =
## Transition any state to FAILED
db.exec("BEGIN IMMEDIATE")
try:
let row = db.one(
"SELECT state FROM workers WHERE task_id = ?",
taskId
)
if row.isNone:
db.exec("ROLLBACK")
raise newException(WorkerNotFound, &"Worker not found: {taskId}")
let currentStr = row.get[0].fromDbValue(string)
let current = parseState(currentStr)
if current == wsCompleted:
db.exec("ROLLBACK")
raise newException(InvalidTransition, "Cannot fail a completed worker")
let tsMs = epochMs()
db.exec("""
UPDATE workers
SET state = ?, state_changed_at_ms = ?
WHERE task_id = ?
""", $wsFailed, tsMs, taskId)
discard db.publish("worker", "task_failed",
correlationId = taskId,
payload = %*{
"from": $current,
"task_id": taskId,
"reason": reason
})
db.exec("COMMIT")
except CatchableError:
db.exec("ROLLBACK")
raise
proc getState*(db: DbConn, taskId: string): Option[WorkerState] =
## Get current state for a worker
let row = db.one(
"SELECT state FROM workers WHERE task_id = ?",
taskId
)
if row.isSome:
return some(parseState(row.get[0].fromDbValue(string)))
proc getWorker*(db: DbConn, taskId: string): Option[WorkerInfo] =
## Get full worker information
let row = db.one("""
SELECT task_id, state, branch, worktree, description,
created_at_ms, state_changed_at_ms
FROM workers WHERE task_id = ?
""", taskId)
if row.isNone:
return none(WorkerInfo)
var info = WorkerInfo(
taskId: row.get[0].fromDbValue(string),
state: parseState(row.get[1].fromDbValue(string)),
branch: row.get[2].fromDbValue(string),
worktree: row.get[3].fromDbValue(string),
createdAt: fromUnix(row.get[5].fromDbValue(int64) div 1000),
stateChangedAt: fromUnix(row.get[6].fromDbValue(int64) div 1000),
)
if row.get[4].kind != sqliteNull:
info.description = row.get[4].fromDbValue(string)
# Get heartbeat
let hb = db.getHeartbeat(taskId)
if hb.isSome:
info.lastHeartbeat = fromUnix(hb.get.tsMs div 1000)
return some(info)
proc getAllWorkers*(db: DbConn): seq[WorkerInfo] =
## Get all workers with their current state
for row in db.iterate("""
SELECT w.task_id, w.state, w.branch, w.worktree, w.description,
w.created_at_ms, w.state_changed_at_ms,
h.ts_ms as heartbeat_ms
FROM workers w
LEFT JOIN heartbeats h ON w.task_id = h.agent_id
ORDER BY w.state_changed_at_ms DESC
"""):
var info = WorkerInfo(
taskId: row[0].fromDbValue(string),
state: parseState(row[1].fromDbValue(string)),
branch: row[2].fromDbValue(string),
worktree: row[3].fromDbValue(string),
createdAt: fromUnix(row[5].fromDbValue(int64) div 1000),
stateChangedAt: fromUnix(row[6].fromDbValue(int64) div 1000),
)
if row[4].kind != sqliteNull:
info.description = row[4].fromDbValue(string)
if row[7].kind != sqliteNull:
info.lastHeartbeat = fromUnix(row[7].fromDbValue(int64) div 1000)
result.add(info)
proc createWorker*(db: DbConn, taskId, branch, worktree: string,
description: string = ""): WorkerInfo =
## Create a new worker in ASSIGNED state
let tsMs = epochMs()
db.exec("""
INSERT INTO workers (task_id, state, branch, worktree, description,
created_at_ms, state_changed_at_ms)
VALUES (?, ?, ?, ?, ?, ?, ?)
""", taskId, $wsAssigned, branch, worktree,
optStr(description),
tsMs, tsMs)
discard db.publish("orchestrator", "task_assign",
correlationId = taskId,
payload = %*{
"task_id": taskId,
"branch": branch,
"worktree": worktree,
"description": description
})
return WorkerInfo(
taskId: taskId,
state: wsAssigned,
branch: branch,
worktree: worktree,
description: description,
createdAt: fromUnix(tsMs div 1000),
stateChangedAt: fromUnix(tsMs div 1000),
)
proc isStale*(info: WorkerInfo): bool =
## Check if worker is stale based on heartbeat age
if info.state notin {wsAssigned, wsWorking}:
return false
if info.lastHeartbeat == Time():
return true # No heartbeat yet
let age = (epochMs() - toUnix(info.lastHeartbeat) * 1000)
return age > StaleThresholdMs
proc staleLevel*(info: WorkerInfo): string =
## Get stale level: ok, WARN, STALE, DEAD
if info.state notin {wsAssigned, wsWorking}:
return "ok"
if info.lastHeartbeat == Time():
return "DEAD"
let age = (epochMs() - toUnix(info.lastHeartbeat) * 1000)
if age > DeadThresholdMs:
return "DEAD"
elif age > StaleThresholdMs:
return "STALE"
elif age > StaleWarnThresholdMs:
return "WARN"
else:
return "ok"

src/worker/types.nim Normal file

@@ -0,0 +1,113 @@
## Shared types for worker coordination
##
## This module defines the core types used across the worker CLI.
import std/[times, json]
type
WorkerState* = enum
## Worker lifecycle states
## NOTE: STALE is NOT a state - it's computed from heartbeat age
wsAssigned = "ASSIGNED"
wsWorking = "WORKING"
wsConflicted = "CONFLICTED"
wsInReview = "IN_REVIEW"
wsApproved = "APPROVED"
wsCompleted = "COMPLETED"
wsFailed = "FAILED"
WorkerContext* = object
## Context for a worker, stored in .worker-ctx.json
taskId*: string
branch*: string
worktree*: string
createdAt*: Time
description*: string
WorkerInfo* = object
## Full worker information for status display
taskId*: string
state*: WorkerState
branch*: string
worktree*: string
createdAt*: Time
stateChangedAt*: Time
lastHeartbeat*: Time
description*: string
HeartbeatStatus* = enum
## Status reported in heartbeats
hsIdle = "idle"
hsWorking = "working"
hsBlocked = "blocked"
# Errors
InvalidTransition* = object of CatchableError
StaleState* = object of CatchableError
WorkerNotFound* = object of CatchableError
GitError* = object of CatchableError
DbError* = object of CatchableError
const
# Exit codes
ExitSuccess* = 0
ExitUsageError* = 2
ExitInvalidTransition* = 3
ExitGitError* = 4
ExitDbError* = 5
ExitConflict* = 6
ExitNotFound* = 7
# Paths
WorkerStateDir* = ".worker-state"
BusDbPath* = ".worker-state/bus.db"
BusJsonlPath* = ".worker-state/bus.jsonl"
BlobsDir* = ".worker-state/blobs"
WorkersDir* = ".worker-state/workers"
WorktreesDir* = "worktrees"
ContextFileName* = ".worker-ctx.json"
# Timing
HeartbeatIntervalMs* = 10_000 # 10 seconds
StaleWarnThresholdMs* = 30_000 # 30 seconds
StaleThresholdMs* = 100_000 # 100 seconds
DeadThresholdMs* = 300_000 # 5 minutes
LeaseDurationMs* = 60_000 # 60 seconds
BusyTimeoutMs* = 5000 # 5 seconds
proc parseState*(s: string): WorkerState =
## Parse state string to enum
case s
of "ASSIGNED": wsAssigned
of "WORKING": wsWorking
of "CONFLICTED": wsConflicted
of "IN_REVIEW": wsInReview
of "APPROVED": wsApproved
of "COMPLETED": wsCompleted
of "FAILED": wsFailed
else: raise newException(ValueError, "Unknown state: " & s)
proc epochMs*(): int64 =
## Current time as epoch milliseconds
let t = getTime()
toUnix(t) * 1000 + nanosecond(t) div 1_000_000
proc toJson*(ctx: WorkerContext): JsonNode =
## Serialize context to JSON
%*{
"task_id": ctx.taskId,
"branch": ctx.branch,
"worktree": ctx.worktree,
"created_at": $ctx.createdAt,
"description": ctx.description
}
proc fromJson*(T: typedesc[WorkerContext], j: JsonNode): WorkerContext =
## Deserialize context from JSON
WorkerContext(
taskId: j["task_id"].getStr,
branch: j["branch"].getStr,
worktree: j["worktree"].getStr,
createdAt: parse(j["created_at"].getStr, "yyyy-MM-dd'T'HH:mm:sszzz").toTime(),
description: j{"description"}.getStr
)
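# Roundtrip sketch for the .worker-ctx.json format (values are placeholders).
# Timestamps survive at second precision, since toJson stores `$Time`.
when isMainModule:
  let ctx = WorkerContext(taskId: "skills-abc", branch: "feat/skills-abc",
                          worktree: "worktrees/skills-abc",
                          createdAt: getTime(), description: "demo")
  let restored = WorkerContext.fromJson(ctx.toJson())
  assert restored.taskId == ctx.taskId
  assert restored.branch == ctx.branch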