# Message Passing Layer Design
**Status**: Draft (v4 - Nim implementation)
**Bead**: skills-ms5
**Epic**: skills-s6y (Multi-agent orchestration: Lego brick architecture)
**Language**: Nim (ORC, threads, tiny_sqlite)
## Changelog
- **v4**: Nim implementation details (tiny_sqlite, dedicated heartbeat thread, channels)
- **v3**: Fixed BLOCKERs from orch spec-review (seq autoincrement, ack semantics, heartbeats table, epoch timestamps, JSONL export, task leases)
- **v2**: Changed to SQLite primary (was JSONL+flock)
## Overview
This document defines the local message passing layer for multi-agent coordination. Agents communicate through a shared SQLite database with ACID guarantees. A JSONL export provides a human-readable view for debugging and a git-friendly history.
## Design Principles
1. **SQLite as source of truth** - Atomic commits, crash recovery, indexed queries
2. **JSONL for observability** - Human-readable export for debugging and git diffs
3. **File-based, no network** - Any agent with SQLite bindings can participate
4. **Liveness detection** - Heartbeats to detect dead agents
5. **Blob storage for large payloads** - Keep database lean
## Why SQLite Over JSONL+flock
| Concern | JSONL+flock | SQLite |
|---------|-------------|--------|
| Crash mid-write | Corrupted log, manual recovery | Atomic commit, automatic rollback |
| Payload >4KB | Interleaved writes possible | Always atomic |
| Query efficiency | O(N) scan | O(log N) with index |
| Portability | POSIX flock only | Works everywhere |
| Complexity | DIY recovery logic | Built-in WAL, transactions |
See `docs/design/message-passing-comparison.md` for full analysis.
## Prior Art
| System | Approach | Key Insight |
|--------|----------|-------------|
| Beads | SQLite + JSONL export | SQLite for coordination, JSONL for git |
| Tissue | JSONL primary, SQLite cache | Simpler but less safe |
| Kafka | Append-only log, offsets | Total ordering, replay |
| Redis Streams | Consumer groups, claims | First claim wins |
## Directory Structure
```
.worker-state/
├── bus.db               # SQLite - source of truth (gitignored)
├── bus.jsonl            # JSONL export for debugging/git (read-only)
├── blobs/               # Content-addressable storage for large payloads
│   └── sha256-a1b2c3...
└── workers/
    └── worker-auth.json # Worker state (separate from messages)
```
## Database Schema
```sql
-- Messages table (seq is rowid-based for true auto-increment)
CREATE TABLE messages (
    seq             INTEGER PRIMARY KEY AUTOINCREMENT,  -- Guaranteed monotonic, auto-assigned
    id              TEXT NOT NULL UNIQUE,               -- UUID for idempotent retries
    ts_ms           INTEGER NOT NULL,                   -- Epoch milliseconds (reliable comparisons)
    from_agent      TEXT NOT NULL,
    to_agent        TEXT,                               -- NULL = broadcast
    type            TEXT NOT NULL,
    correlation_id  TEXT,
    in_reply_to     TEXT,
    payload         TEXT,                               -- JSON blob or NULL
    payload_ref     TEXT,                               -- Blob hash for large payloads
    CHECK (payload IS NULL OR payload_ref IS NULL)      -- Not both
);

CREATE INDEX idx_messages_to_seq ON messages(to_agent, seq);
CREATE INDEX idx_messages_correlation_seq ON messages(correlation_id, seq);
CREATE INDEX idx_messages_type_from_ts ON messages(type, from_agent, ts_ms);

-- Cursors table (per-agent read position)
-- NOTE: Cursor is only advanced AFTER successful processing (at-least-once)
CREATE TABLE cursors (
    agent_id       TEXT PRIMARY KEY,
    last_acked_seq INTEGER NOT NULL DEFAULT 0,  -- Last successfully processed
    updated_at_ms  INTEGER NOT NULL
);

-- Heartbeats table (separate from messages to avoid write contention)
-- One row per agent, upsert pattern
CREATE TABLE heartbeats (
    agent_id     TEXT PRIMARY KEY,
    ts_ms        INTEGER NOT NULL,
    status       TEXT NOT NULL,  -- idle, working, blocked
    current_task TEXT,
    progress     REAL
);

-- Task claims with lease expiry (prevents zombie claims)
CREATE TABLE task_claims (
    task_id        TEXT PRIMARY KEY,
    claimed_by     TEXT NOT NULL,
    claimed_at_ms  INTEGER NOT NULL,
    lease_until_ms INTEGER NOT NULL  -- Must renew or release
);

-- Schema versioning for migrations
CREATE TABLE meta (
    key   TEXT PRIMARY KEY,
    value TEXT NOT NULL
);
INSERT INTO meta (key, value) VALUES ('schema_version', '1');

-- Export tracking (not line count!)
CREATE TABLE export_state (
    id       INTEGER PRIMARY KEY CHECK (id = 1),
    last_seq INTEGER NOT NULL DEFAULT 0
);
INSERT INTO export_state (id, last_seq) VALUES (1, 0);
```
### Connection Setup (REQUIRED)
PRAGMAs must be set on every connection. **Each thread must have its own connection.**
```nim
import tiny_sqlite
proc openBusDb*(dbPath: string): DbConn =
  ## Open database with required PRAGMAs. One connection per thread.
  result = openDatabase(dbPath)
  result.exec("PRAGMA busy_timeout = 5000")
  result.exec("PRAGMA foreign_keys = ON")
  result.exec("PRAGMA journal_mode = WAL")
  result.exec("PRAGMA synchronous = NORMAL")
```
**Critical**: Do NOT share `DbConn` across threads. Each thread opens its own connection.
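The Python examples in the following sections call a `get_connection` helper that is not defined elsewhere in this doc. A minimal sketch, assuming sqlite3's `Row` factory so rows can be read by column name, the same PRAGMAs as the Nim version, and an illustrative startup check against `meta.schema_version`:

```python
import sqlite3

EXPECTED_SCHEMA_VERSION = "1"

def get_connection(db_path):
    """Open a connection with the required PRAGMAs (one per thread/process)."""
    conn = sqlite3.connect(db_path)
    conn.row_factory = sqlite3.Row  # Rows accessible by column name
    conn.execute("PRAGMA busy_timeout = 5000")
    conn.execute("PRAGMA foreign_keys = ON")
    conn.execute("PRAGMA journal_mode = WAL")
    conn.execute("PRAGMA synchronous = NORMAL")
    return conn

def check_schema_version(conn):
    """Fail fast if the bus schema is not the version this agent understands."""
    row = conn.execute(
        "SELECT value FROM meta WHERE key = 'schema_version'"
    ).fetchone()
    if row is None or row["value"] != EXPECTED_SCHEMA_VERSION:
        found = row["value"] if row else "missing"
        raise RuntimeError(f"Unsupported schema_version: {found}")
```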
## Message Schema
The logical message fields map directly onto the SQLite columns (`from`/`to` become `from_agent`/`to_agent`; the timestamp is stored as `ts_ms` in epoch milliseconds):
```json
{
  "id": "uuid-v4",
  "ts_ms": 1768053600000,
  "from": "worker-auth",
  "to": "orchestrator",
  "type": "task_done",
  "correlation_id": "skills-abc",
  "payload": {
    "branch": "worker-auth/skills-abc",
    "commit": "a1b2c3d4"
  }
}
```
### Message Types
**Task Lifecycle:**
- `task_assign` - Orchestrator assigns task to worker
- `task_claim` - Worker claims a task (first wins)
- `task_progress` - Worker reports progress
- `task_done` - Worker completed task
- `task_failed` - Worker failed task
- `task_blocked` - Worker blocked on dependency
**Coordination:**
- `heartbeat` - Liveness signal
- `state_change` - Worker state transition
- `review_request` - Request review of work
- `review_result` - Review approved/rejected
**System:**
- `agent_started` - Agent came online
- `agent_stopped` - Agent shutting down gracefully
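To illustrate how `correlation_id` and `in_reply_to` tie these types into an exchange, a review round-trip might look like the following sketch (it uses the `publish` helper defined under Write Protocol below; the payload fields are illustrative):

```python
# Worker asks for review of a finished task
req_id = publish(db_path, "worker-auth", {
    "to": "orchestrator",
    "type": "review_request",
    "correlation_id": "skills-abc",                 # the task under review
    "payload": {"branch": "worker-auth/skills-abc"},
})

# Orchestrator answers on the same correlation, pointing back at the request
publish(db_path, "orchestrator", {
    "to": "worker-auth",
    "type": "review_result",
    "correlation_id": "skills-abc",
    "in_reply_to": req_id,                          # links result to request
    "payload": {"approved": True},
})
```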
## Delivery Semantics
**This layer provides AT-LEAST-ONCE delivery.**
- Messages are durable once committed
- Cursor only advances after successful processing
- Handlers MUST be idempotent or deduplicate by `message.id` (see the dedup sketch below)
- Out-of-order processing is possible between unrelated messages
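One minimal way to satisfy the dedup requirement is a side table keyed by message id, consulted before running side effects. The `processed_messages` table and wrapper below are a sketch, not part of the schema above; they reuse `time` from the surrounding examples and operate on an already-open connection:

```python
def process_once(conn, msg, handler):
    """Run handler(msg) at most once per message id (dedup for at-least-once delivery)."""
    # Idempotent DDL; could also live in the schema setup
    conn.execute("""
        CREATE TABLE IF NOT EXISTS processed_messages (
            id TEXT PRIMARY KEY,
            processed_at_ms INTEGER NOT NULL
        )
    """)
    already = conn.execute(
        "SELECT 1 FROM processed_messages WHERE id = ?", (msg["id"],)
    ).fetchone()
    if already:
        return  # Redelivered duplicate - skip side effects
    handler(msg)
    conn.execute(
        "INSERT INTO processed_messages (id, processed_at_ms) VALUES (?, ?)",
        (msg["id"], int(time.time() * 1000)),
    )
    conn.commit()
```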
## Write Protocol
### Publishing Messages
```python
import sqlite3
import uuid
import json
import time

def publish(db_path, agent_id, message):
    msg_id = message.get('id') or str(uuid.uuid4())
    ts_ms = int(time.time() * 1000)  # Epoch milliseconds
    # Handle large payloads
    payload = message.get('payload')
    payload_ref = None
    if payload:
        payload_json = json.dumps(payload)
        if len(payload_json) > 4096:
            payload_ref = save_blob(payload)  # Atomic write (see Blob Storage)
            payload = None
        else:
            payload = payload_json
    conn = get_connection(db_path)
    try:
        # Use plain BEGIN for simple inserts (less contention than IMMEDIATE)
        # seq is auto-assigned by SQLite
        conn.execute("""
            INSERT INTO messages (id, ts_ms, from_agent, to_agent, type,
                                  correlation_id, in_reply_to, payload, payload_ref)
            VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)
        """, (
            msg_id,
            ts_ms,
            agent_id,
            message.get('to'),
            message['type'],
            message.get('correlation_id'),
            message.get('in_reply_to'),
            payload,
            payload_ref
        ))
        conn.commit()
    except sqlite3.IntegrityError:
        # Duplicate id = already published (idempotent retry)
        conn.rollback()
    except Exception:
        conn.rollback()
        raise
    finally:
        conn.close()
    return msg_id
```
### Write Contention
- SQLite WAL allows concurrent readers but **one writer at a time**
- `busy_timeout = 5000` handles short contention
- For high-frequency writes, add retry with exponential backoff (see the sketch below)
- Heartbeats use separate table to avoid contending with task messages
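A sketch of that retry-with-backoff wrapper, with illustrative constants and helper name:

```python
import random

def publish_with_retry(db_path, agent_id, message, max_attempts=5):
    """Retry publish() on 'database is locked' errors with exponential backoff + jitter."""
    delay = 0.05
    for attempt in range(1, max_attempts + 1):
        try:
            return publish(db_path, agent_id, message)
        except sqlite3.OperationalError as e:
            if "locked" not in str(e).lower() or attempt == max_attempts:
                raise
            time.sleep(delay + random.uniform(0, delay))
            delay *= 2
```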
## Read Protocol
### Polling for New Messages (At-Least-Once)
**CRITICAL**: Do NOT update cursor until message is successfully processed.
```python
def poll(db_path, agent_id, limit=100):
    """Fetch unacknowledged messages. Does NOT advance cursor."""
    conn = get_connection(db_path)
    # Get cursor position
    cursor = conn.execute(
        "SELECT last_acked_seq FROM cursors WHERE agent_id = ?",
        (agent_id,)
    ).fetchone()
    last_seq = cursor['last_acked_seq'] if cursor else 0
    # Fetch new messages for this agent (or broadcasts)
    rows = conn.execute("""
        SELECT * FROM messages
        WHERE seq > ?
          AND (to_agent IS NULL OR to_agent = ?)
        ORDER BY seq
        LIMIT ?
    """, (last_seq, agent_id, limit)).fetchall()
    conn.close()
    # Convert to dicts, resolve blob refs
    messages = []
    for row in rows:
        msg = dict(row)
        if msg['payload_ref']:
            try:
                msg['payload'] = load_blob(msg['payload_ref'])
            except FileNotFoundError:
                msg['payload_error'] = 'blob_missing'
        elif msg['payload']:
            try:
                msg['payload'] = json.loads(msg['payload'])
            except json.JSONDecodeError:
                msg['payload_error'] = 'decode_failed'
        messages.append(msg)
    return messages

def ack(db_path, agent_id, seq):
    """Acknowledge successful processing of message. Advances cursor."""
    conn = get_connection(db_path)
    ts_ms = int(time.time() * 1000)
    # Only advance forward (monotonic)
    conn.execute("""
        INSERT INTO cursors (agent_id, last_acked_seq, updated_at_ms)
        VALUES (?, ?, ?)
        ON CONFLICT(agent_id) DO UPDATE SET
            last_acked_seq = MAX(cursors.last_acked_seq, excluded.last_acked_seq),
            updated_at_ms = excluded.updated_at_ms
    """, (agent_id, seq, ts_ms))
    conn.commit()
    conn.close()
```
### Processing Loop
```python
def process_loop(db_path, agent_id, handler):
    """Process messages with at-least-once semantics."""
    while running:
        messages = poll(db_path, agent_id)
        for msg in messages:
            try:
                handler(msg)
                ack(db_path, agent_id, msg['seq'])  # Only after success
            except Exception as e:
                # Don't ack - message will be redelivered
                log.error(f"Failed to process {msg['id']}: {e}")
                break  # Stop batch on failure
        time.sleep(poll_interval)
```
### Adaptive Polling
```python
POLL_ACTIVE = 0.2   # 200ms when working
POLL_IDLE = 2.0     # 2s when idle
POLL_BACKOFF = 1.5  # Multiplier on each empty poll

poll_interval = POLL_ACTIVE
while running:
    messages = poll(db_path, my_id)
    if messages:
        process(messages)
        poll_interval = POLL_ACTIVE  # Reset to fast
    else:
        poll_interval = min(poll_interval * POLL_BACKOFF, POLL_IDLE)
    time.sleep(poll_interval)
```
## Heartbeat Protocol
Heartbeats use a **separate table** with upsert to avoid contending with task messages.
**CRITICAL**: Heartbeats MUST run in a dedicated thread to avoid false death detection during blocking operations (LLM calls can take 60s+).
### Nim Implementation
Nim threads don't share memory by default. Use channels for communication:
```nim
import std/[times, os]   # Built-in Channel[T] is available with --threads:on
import tiny_sqlite

type
  HeartbeatCmd* = enum
    hbUpdateStatus, hbStop

  HeartbeatMsg* = object
    cmd*: HeartbeatCmd
    status*: string
    task*: string
    progress*: float

var heartbeatChan*: Channel[HeartbeatMsg]

proc writeHeartbeat(db: DbConn, agentId, status, task: string, progress: float) =
  let now = getTime()
  let tsMs = now.toUnix() * 1000 + now.nanosecond div 1_000_000
  db.exec("""
    INSERT INTO heartbeats (agent_id, ts_ms, status, current_task, progress)
    VALUES (?, ?, ?, ?, ?)
    ON CONFLICT(agent_id) DO UPDATE SET
      ts_ms = excluded.ts_ms,
      status = excluded.status,
      current_task = excluded.current_task,
      progress = excluded.progress
  """, agentId, tsMs, status, task, progress)

proc heartbeatWorker*(args: tuple[dbPath, agentId: string]) {.thread.} =
  ## Dedicated heartbeat thread - owns its own DB connection
  let db = openBusDb(args.dbPath)
  var status = "idle"
  var task = ""
  var progress = 0.0
  while true:
    # Check for commands (non-blocking)
    let tried = heartbeatChan.tryRecv()
    if tried.dataAvailable:
      case tried.msg.cmd
      of hbStop:
        db.close()
        return
      of hbUpdateStatus:
        status = tried.msg.status
        task = tried.msg.task
        progress = tried.msg.progress
    # Write heartbeat
    try:
      writeHeartbeat(db, args.agentId, status, task, progress)
    except CatchableError:
      discard  # Log but don't crash
    sleep(10_000)  # 10 seconds

# Usage from main thread:
proc startHeartbeat*(dbPath, agentId: string): Thread[tuple[dbPath, agentId: string]] =
  heartbeatChan.open()
  createThread(result, heartbeatWorker, (dbPath, agentId))

proc updateHeartbeatStatus*(status, task: string, progress: float) =
  heartbeatChan.send(HeartbeatMsg(cmd: hbUpdateStatus, status: status, task: task, progress: progress))

proc stopHeartbeat*() =
  heartbeatChan.send(HeartbeatMsg(cmd: hbStop))
```
### Heartbeat Rules
1. Emit every 10 seconds via background thread
2. Include current status: `idle`, `working`, `blocked`
3. Stale thresholds: 30s = WARN, 100s = STALE, 5min = DEAD
### Liveness Query
```sql
-- Find agents with stale heartbeats (past the 30s WARN threshold)
-- Uses epoch ms for reliable comparison
SELECT agent_id, ts_ms,
       (strftime('%s', 'now') * 1000 - ts_ms) / 1000 AS age_seconds
FROM heartbeats
WHERE ts_ms < (strftime('%s', 'now') * 1000 - 30000);
```
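Applying the WARN/STALE/DEAD thresholds from the rules above, an orchestrator-side check might look like this sketch (helper name and return shape are illustrative):

```python
WARN_MS = 30_000     # 30s  - worth watching
STALE_MS = 100_000   # 100s - likely stuck
DEAD_MS = 300_000    # 5min - treat as dead, reclaim its tasks

def classify_agents(db_path):
    """Return {agent_id: 'ok'|'warn'|'stale'|'dead'} based on last heartbeat age."""
    now_ms = int(time.time() * 1000)
    conn = get_connection(db_path)
    rows = conn.execute("SELECT agent_id, ts_ms FROM heartbeats").fetchall()
    conn.close()
    result = {}
    for row in rows:
        age = now_ms - row["ts_ms"]
        if age >= DEAD_MS:
            result[row["agent_id"]] = "dead"
        elif age >= STALE_MS:
            result[row["agent_id"]] = "stale"
        elif age >= WARN_MS:
            result[row["agent_id"]] = "warn"
        else:
            result[row["agent_id"]] = "ok"
    return result
```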
## Task Claim Pattern
Task claims use a **dedicated table with leases** to prevent zombie claims when agents die.
```python
LEASE_DURATION_MS = 60000  # 60 seconds

def try_claim_task(db_path, task_id, agent_id):
    """Attempt to claim a task. Returns True if successful."""
    conn = get_connection(db_path)
    ts_ms = int(time.time() * 1000)
    lease_until = ts_ms + LEASE_DURATION_MS
    try:
        conn.execute("BEGIN IMMEDIATE")
        # Check for existing valid claim
        existing = conn.execute("""
            SELECT claimed_by, lease_until_ms FROM task_claims
            WHERE task_id = ?
        """, (task_id,)).fetchone()
        if existing:
            if existing['lease_until_ms'] > ts_ms:
                # Valid claim exists
                conn.rollback()
                return False
            else:
                # Expired claim - delete and reclaim
                conn.execute("DELETE FROM task_claims WHERE task_id = ?", (task_id,))
        # Insert new claim
        conn.execute("""
            INSERT INTO task_claims (task_id, claimed_by, claimed_at_ms, lease_until_ms)
            VALUES (?, ?, ?, ?)
        """, (task_id, agent_id, ts_ms, lease_until))
        conn.commit()
        return True
    except sqlite3.OperationalError:
        # Lock contention - another agent got there first
        conn.rollback()
        return False
    finally:
        conn.close()

def renew_claim(db_path, task_id, agent_id):
    """Renew lease on a claimed task. Call periodically while working."""
    conn = get_connection(db_path)
    ts_ms = int(time.time() * 1000)
    lease_until = ts_ms + LEASE_DURATION_MS
    result = conn.execute("""
        UPDATE task_claims
        SET lease_until_ms = ?
        WHERE task_id = ? AND claimed_by = ?
    """, (lease_until, task_id, agent_id))
    conn.commit()
    updated = result.rowcount > 0
    conn.close()
    return updated

def release_claim(db_path, task_id, agent_id):
    """Release a claim when done (success or failure)."""
    conn = get_connection(db_path)
    conn.execute("""
        DELETE FROM task_claims
        WHERE task_id = ? AND claimed_by = ?
    """, (task_id, agent_id))
    conn.commit()
    conn.close()
```
### Claim Lifecycle
1. Agent calls `try_claim_task()` - gets exclusive claim with 60s lease
2. Agent periodically calls `renew_claim()` (every 30s) while working
3. On completion, agent calls `release_claim()`
4. If agent dies, lease expires and task becomes claimable again
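Put together, a worker's claim handling might follow this sketch (the `do_work_step` and `work_finished` callbacks are placeholders; lease renewal is done inline rather than on a timer):

```python
RENEW_INTERVAL_S = 30  # Half of the 60s lease

def run_task(db_path, task_id, agent_id, do_work_step, work_finished):
    """Claim a task, renew the lease while working, and always release it."""
    if not try_claim_task(db_path, task_id, agent_id):
        return False  # Someone else holds a valid lease
    last_renew = time.time()
    try:
        while not work_finished():
            do_work_step()
            if time.time() - last_renew >= RENEW_INTERVAL_S:
                if not renew_claim(db_path, task_id, agent_id):
                    raise RuntimeError("Lost claim lease; another agent may own the task")
                last_renew = time.time()
        return True
    finally:
        release_claim(db_path, task_id, agent_id)
```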
## JSONL Export for Debugging
Background process or post-commit hook exports to JSONL.
**CRITICAL**: Track last exported seq in database, NOT by counting lines. Seq values can have gaps from rollbacks.
```python
def export_to_jsonl(db_path, jsonl_path):
    """Export new messages to JSONL. Idempotent, crash-safe."""
    conn = get_connection(db_path)
    # Get last exported seq from database
    row = conn.execute(
        "SELECT last_seq FROM export_state WHERE id = 1"
    ).fetchone()
    last_exported = row['last_seq'] if row else 0
    # Export new messages
    rows = conn.execute(
        "SELECT * FROM messages WHERE seq > ? ORDER BY seq",
        (last_exported,)
    ).fetchall()
    if not rows:
        conn.close()
        return
    max_seq = last_exported
    with open(jsonl_path, 'a') as f:
        for row in rows:
            msg = dict(row)
            # Optionally resolve blob refs for full content
            f.write(json.dumps(msg) + '\n')
            max_seq = max(max_seq, msg['seq'])
    # Update export state (atomic with file write visible)
    conn.execute("""
        UPDATE export_state SET last_seq = ? WHERE id = 1
    """, (max_seq,))
    conn.commit()
    conn.close()
```
This gives you `tail -f bus.jsonl` for debugging while SQLite remains source of truth.
**Note**: JSONL contains blob references, not resolved payloads. For full content, use SQLite queries.
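For example, resolving the full payload of a single message directly from SQLite (using `load_blob` from the next section; the helper name is illustrative):

```python
def get_full_message(db_path, msg_id):
    """Fetch one message by id and resolve its payload (inline JSON or blob ref)."""
    conn = get_connection(db_path)
    row = conn.execute("SELECT * FROM messages WHERE id = ?", (msg_id,)).fetchone()
    conn.close()
    if row is None:
        return None
    msg = dict(row)
    if msg['payload_ref']:
        msg['payload'] = load_blob(msg['payload_ref'])
    elif msg['payload']:
        msg['payload'] = json.loads(msg['payload'])
    return msg
```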
## Blob Storage
Content-addressable storage for large payloads. Uses **atomic write** pattern (temp + rename) to prevent partial reads.
```python
import tempfile
import hashlib
import os

def save_blob(content):
    """Atomically write blob. Returns ref or raises."""
    data = json.dumps(content).encode()
    hash_hex = hashlib.sha256(data).hexdigest()
    ref = f"sha256-{hash_hex}"
    path = f".worker-state/blobs/{ref}"
    if os.path.exists(path):
        return ref  # Already exists (content-addressable = idempotent)
    # Atomic write: temp file + fsync + rename
    os.makedirs(os.path.dirname(path), exist_ok=True)
    fd, tmp_path = tempfile.mkstemp(dir=os.path.dirname(path))
    try:
        with os.fdopen(fd, 'wb') as f:
            f.write(data)
            f.flush()
            os.fsync(f.fileno())
        os.replace(tmp_path, path)  # Atomic on POSIX
    except:
        if os.path.exists(tmp_path):
            os.unlink(tmp_path)
        raise
    return ref

def load_blob(ref):
    """Load blob content. Raises FileNotFoundError if missing."""
    path = f".worker-state/blobs/{ref}"
    with open(path, 'rb') as f:
        return json.load(f)
```
## Phased Implementation
### Phase 0: MVP
- SQLite with WAL mode
- Basic publish/poll
- Heartbeats every 10s
- Simple polling (500ms)
### Phase 1: Robustness
- Task claim with transactions
- JSONL export for debugging
- Blob storage for large payloads
- Dead agent detection
### Phase 2: Performance
- FTS5 for full-text search
- Message compaction/archival
- Adaptive polling
### Phase 3: Scale (if ever)
- Read replicas
- Sharding by correlation_id
- Background workers for maintenance
## Integration Points
### With Worker State Machine (skills-4oj)
State transitions publish messages:
```python
publish(db_path, worker_id, {
    'type': 'state_change',
    'payload': {'from': 'WORKING', 'to': 'IN_REVIEW'}
})
```
### With review-gate (skills-byq)
Review requests/results via messages:
```python
publish(db_path, worker_id, {
    'type': 'review_request',
    'correlation_id': task_id
})
```
### With Observability (skills-yak)
Status command queries SQLite directly:
```sql
SELECT from_agent, type, ts_ms, payload
FROM messages
WHERE type IN ('heartbeat', 'state_change')
ORDER BY seq DESC LIMIT 100;
```
## Open Questions
1. ~~**Database location**~~: `.worker-state/bus.db` (resolved)
2. **Export frequency**: On-commit hook vs periodic background? (defer to implementation)
3. **Message retention**: Archive messages older than min cursor? Vacuum schedule?
4. ~~**Schema migrations**~~: `meta.schema_version` table added; agents check on startup
5. **Security**: All agents currently trusted; add HMAC signing if isolation needed?
6. **Backpressure**: How to slow producers when consumers overwhelmed?
## Resolved from Spec Review
| Issue | Resolution |
|-------|------------|
| `seq` not auto-increment | Changed to `INTEGER PRIMARY KEY AUTOINCREMENT` |
| At-most-once delivery | Cursor now advanced only after ack |
| Heartbeats in messages table | Separate `heartbeats` table with upsert |
| Heartbeats block during LLM | `HeartbeatThread` runs independently |
| JSONL line-count broken | `export_state` table tracks last seq |
| Timestamp format unreliable | Changed to epoch milliseconds |
| Zombie task claims | `task_claims` table with lease expiry |
| Blob writes not atomic | Temp file + fsync + rename pattern |
## Nim Implementation Notes
### Dependencies
```nim
# In the project's .nimble file
requires "tiny_sqlite >= 0.2.0"
requires "jsony >= 1.1.0" # Optional, faster JSON
```
### Build Configuration
```nim
# config.nims or bus.nim.cfg
--mm:orc
--threads:on
-d:release
# Static SQLite (single binary) - these pragmas go in the Nim module wrapping SQLite:
{.passC: "-DSQLITE_THREADSAFE=1".}
{.compile: "libs/sqlite3.c".}
```
### Project Structure
```
src/
├── libs/sqlite3.c   # SQLite amalgamation
├── db_layer.nim     # Connection setup, PRAGMAs
├── bus.nim          # Publish, poll, ack
├── heartbeat.nim    # Dedicated thread + channel
├── claims.nim       # Task claim with leases
└── export.nim       # JSONL export
```
### Key Patterns
1. **One connection per thread** - never share `DbConn`
2. **Channels for control** - stop signals, status updates
3. **Short transactions** - BEGIN IMMEDIATE, do work, COMMIT quickly
4. **Exceptions for errors** - catch `DbError`, convert to meaningful messages
## References
- Beads: https://github.com/steveyegge/beads
- Tissue: https://github.com/evil-mind-evil-sword/tissue
- SQLite WAL mode: https://sqlite.org/wal.html
- SQLite BEGIN IMMEDIATE: https://sqlite.org/lang_transaction.html
- tiny_sqlite: https://github.com/GULPF/tiny_sqlite