# Message Passing Layer Design

**Status**: Draft (v4 - Nim implementation)
**Bead**: skills-ms5
**Epic**: skills-s6y (Multi-agent orchestration: Lego brick architecture)
**Language**: Nim (ORC, threads, tiny_sqlite)

## Changelog

- **v4**: Nim implementation details (tiny_sqlite, dedicated heartbeat thread, channels)
- **v3**: Fixed BLOCKERs from orch spec-review (seq autoincrement, ack semantics, heartbeats table, epoch timestamps, JSONL export, task leases)
- **v2**: Changed to SQLite primary (was JSONL+flock)

## Overview

This document defines the local message-passing layer for multi-agent coordination. Agents communicate through a shared SQLite database with ACID guarantees; a JSONL export provides a human-readable view for debugging and git history.

## Design Principles

1. **SQLite as source of truth** - Atomic commits, crash recovery, indexed queries
2. **JSONL for observability** - Human-readable export for debugging and git diffs
3. **File-based, no network** - Any agent with SQLite bindings can participate
4. **Liveness detection** - Heartbeats to detect dead agents
5. **Blob storage for large payloads** - Keep the database lean

## Why SQLite Over JSONL+flock

| Concern | JSONL+flock | SQLite |
|---------|-------------|--------|
| Crash mid-write | Corrupted log, manual recovery | Atomic commit, automatic rollback |
| Payload >4KB | Interleaved writes possible | Always atomic |
| Query efficiency | O(N) scan | O(log N) with index |
| Portability | POSIX flock only | Works everywhere |
| Complexity | DIY recovery logic | Built-in WAL, transactions |

See `docs/design/message-passing-comparison.md` for the full analysis.

## Prior Art

| System | Approach | Key Insight |
|--------|----------|-------------|
| Beads | SQLite + JSONL export | SQLite for coordination, JSONL for git |
| Tissue | JSONL primary, SQLite cache | Simpler but less safe |
| Kafka | Append-only log, offsets | Total ordering, replay |
| Redis Streams | Consumer groups, claims | First claim wins |

## Directory Structure

```
.worker-state/
├── bus.db              # SQLite - source of truth (gitignored)
├── bus.jsonl           # JSONL export for debugging/git (read-only)
├── blobs/              # Content-addressable storage for large payloads
│   └── sha256-a1b2c3...
└── workers/
    └── worker-auth.json   # Worker state (separate from messages)
```

## Database Schema

```sql
-- Messages table (seq is rowid-based for true auto-increment)
CREATE TABLE messages (
    seq             INTEGER PRIMARY KEY AUTOINCREMENT,  -- Guaranteed monotonic, auto-assigned
    id              TEXT NOT NULL UNIQUE,               -- UUID for idempotent retries
    ts_ms           INTEGER NOT NULL,                   -- Epoch milliseconds (reliable comparisons)
    from_agent      TEXT NOT NULL,
    to_agent        TEXT,                               -- NULL = broadcast
    type            TEXT NOT NULL,
    correlation_id  TEXT,
    in_reply_to     TEXT,
    payload         TEXT,                               -- JSON blob or NULL
    payload_ref     TEXT,                               -- Blob hash for large payloads
    CHECK (payload IS NULL OR payload_ref IS NULL)      -- Not both
);

CREATE INDEX idx_messages_to_seq ON messages(to_agent, seq);
CREATE INDEX idx_messages_correlation_seq ON messages(correlation_id, seq);
CREATE INDEX idx_messages_type_from_ts ON messages(type, from_agent, ts_ms);

-- Cursors table (per-agent read position)
-- NOTE: Cursor is only advanced AFTER successful processing (at-least-once)
CREATE TABLE cursors (
    agent_id       TEXT PRIMARY KEY,
    last_acked_seq INTEGER NOT NULL DEFAULT 0,  -- Last successfully processed
    updated_at_ms  INTEGER NOT NULL
);

-- Heartbeats table (separate from messages to avoid write contention)
-- One row per agent, upsert pattern
CREATE TABLE heartbeats (
    agent_id     TEXT PRIMARY KEY,
    ts_ms        INTEGER NOT NULL,
    status       TEXT NOT NULL,  -- idle, working, blocked
    current_task TEXT,
    progress     REAL
);

-- Task claims with lease expiry (prevents zombie claims)
CREATE TABLE task_claims (
    task_id        TEXT PRIMARY KEY,
    claimed_by     TEXT NOT NULL,
    claimed_at_ms  INTEGER NOT NULL,
    lease_until_ms INTEGER NOT NULL  -- Must renew or release
);

-- Schema versioning for migrations
CREATE TABLE meta (
    key   TEXT PRIMARY KEY,
    value TEXT NOT NULL
);
INSERT INTO meta (key, value) VALUES ('schema_version', '1');

-- Export tracking (not line count!)
CREATE TABLE export_state (
    id       INTEGER PRIMARY KEY CHECK (id = 1),
    last_seq INTEGER NOT NULL DEFAULT 0
);
INSERT INTO export_state (id, last_seq) VALUES (1, 0);
```

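The `meta` table supports the startup check mentioned under Open Questions. A minimal sketch in Python (the function name is illustrative, not part of the spec):

```python
EXPECTED_SCHEMA_VERSION = "1"

def check_schema_version(conn):
    """Refuse to run against an unexpected schema version."""
    row = conn.execute(
        "SELECT value FROM meta WHERE key = 'schema_version'"
    ).fetchone()
    found = row[0] if row else "missing"
    if found != EXPECTED_SCHEMA_VERSION:
        raise RuntimeError(
            f"bus.db schema_version is {found}, expected {EXPECTED_SCHEMA_VERSION}"
        )
```
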
### Connection Setup (REQUIRED)

PRAGMAs must be set on every connection. **Each thread must have its own connection.**

```nim
import tiny_sqlite

proc openBusDb*(dbPath: string): DbConn =
  ## Open database with required PRAGMAs. One connection per thread.
  result = openDatabase(dbPath)
  result.exec("PRAGMA busy_timeout = 5000")
  result.exec("PRAGMA foreign_keys = ON")
  result.exec("PRAGMA journal_mode = WAL")
  result.exec("PRAGMA synchronous = NORMAL")
```

**Critical**: Do NOT share `DbConn` across threads. Each thread opens its own connection.
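The Python reference snippets in later sections call a `get_connection` helper that this document does not otherwise define. A minimal sketch: it mirrors the PRAGMAs above, uses `isolation_level=None` so the explicit `BEGIN IMMEDIATE` in the claim code does not conflict with Python's implicit transactions, and sets the `sqlite3.Row` factory because the snippets index rows by column name.

```python
import sqlite3

def get_connection(db_path):
    """One connection per thread, same PRAGMAs as the Nim version."""
    conn = sqlite3.connect(db_path, isolation_level=None)  # autocommit; explicit BEGIN works
    conn.row_factory = sqlite3.Row  # rows indexable by column name
    conn.execute("PRAGMA busy_timeout = 5000")
    conn.execute("PRAGMA foreign_keys = ON")
    conn.execute("PRAGMA journal_mode = WAL")
    conn.execute("PRAGMA synchronous = NORMAL")
    return conn
```
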
## Message Schema

Messages carry the following fields, stored in SQLite (`ts_ms` is epoch milliseconds; the example value encodes 2026-01-10T14:00:00.000Z):

```json
{
  "id": "uuid-v4",
  "ts_ms": 1768053600000,
  "from": "worker-auth",
  "to": "orchestrator",
  "type": "task_done",
  "correlation_id": "skills-abc",
  "payload": {
    "branch": "worker-auth/skills-abc",
    "commit": "a1b2c3d4"
  }
}
```

### Message Types

**Task Lifecycle:**
- `task_assign` - Orchestrator assigns task to worker
- `task_claim` - Worker claims a task (first wins)
- `task_progress` - Worker reports progress
- `task_done` - Worker completed task
- `task_failed` - Worker failed task
- `task_blocked` - Worker blocked on dependency

**Coordination:**
- `heartbeat` - Liveness signal
- `state_change` - Worker state transition
- `review_request` - Request review of work
- `review_result` - Review approved/rejected

**System:**
- `agent_started` - Agent came online
- `agent_stopped` - Agent shutting down gracefully

## Delivery Semantics

**This layer provides AT-LEAST-ONCE delivery.**

- Messages are durable once committed
- Cursor only advances after successful processing
- Handlers MUST be idempotent or deduplicate by `message.id` (see the sketch below)
- Out-of-order processing is possible between unrelated messages

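One way to meet the idempotency requirement is to deduplicate at the handler boundary. A minimal sketch, assuming a hypothetical `processed_messages (id TEXT PRIMARY KEY)` table that is not part of the schema above:

```python
import sqlite3

def handle_once(conn, msg, handler):
    """Invoke handler at most once per message id, tolerating redelivery."""
    try:
        conn.execute("BEGIN IMMEDIATE")
        # A UNIQUE violation on a duplicate id short-circuits reprocessing
        conn.execute("INSERT INTO processed_messages (id) VALUES (?)", (msg['id'],))
        handler(msg)  # side effects on this connection commit atomically with the id
        conn.commit()
    except sqlite3.IntegrityError:
        conn.rollback()  # duplicate delivery - already processed, skip
    except Exception:
        conn.rollback()  # handler failed - id not recorded, message is redelivered
        raise
```
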
## Write Protocol

### Publishing Messages

```python
import sqlite3
import uuid
import json
import time

def publish(db_path, agent_id, message):
    msg_id = message.get('id') or str(uuid.uuid4())
    ts_ms = int(time.time() * 1000)  # Epoch milliseconds

    # Handle large payloads
    payload = message.get('payload')
    payload_ref = None
    if payload:
        payload_json = json.dumps(payload)
        if len(payload_json) > 4096:
            payload_ref = save_blob(payload)  # Atomic write (see Blob Storage)
            payload = None
        else:
            payload = payload_json

    conn = get_connection(db_path)
    try:
        # A single INSERT needs no BEGIN IMMEDIATE (less contention);
        # seq is auto-assigned by SQLite
        conn.execute("""
            INSERT INTO messages (id, ts_ms, from_agent, to_agent, type,
                                  correlation_id, in_reply_to, payload, payload_ref)
            VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)
        """, (
            msg_id,
            ts_ms,
            agent_id,
            message.get('to'),
            message['type'],
            message.get('correlation_id'),
            message.get('in_reply_to'),
            payload,
            payload_ref
        ))
        conn.commit()
    except sqlite3.IntegrityError:
        # Duplicate id = already published (idempotent retry)
        conn.rollback()
    except Exception:
        conn.rollback()
        raise
    finally:
        conn.close()

    return msg_id
```

### Write Contention

- SQLite WAL allows concurrent readers but **one writer at a time**
- `busy_timeout = 5000` handles short contention
- For high-frequency writes, add retry with exponential backoff (see the sketch below)
- Heartbeats use a separate table to avoid contending with task messages

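A sketch of such a retry wrapper. The attempt count and delays are illustrative; Python's sqlite3 surfaces lock contention as `sqlite3.OperationalError`:

```python
import random
import sqlite3
import time

def with_retry(write_fn, max_attempts=5, base_delay_s=0.05):
    """Retry a write on lock contention with exponential backoff and jitter."""
    for attempt in range(max_attempts):
        try:
            return write_fn()
        except sqlite3.OperationalError as e:
            if "locked" not in str(e) and "busy" not in str(e):
                raise  # not contention - propagate
            if attempt == max_attempts - 1:
                raise  # give up after the final attempt
            time.sleep(base_delay_s * (2 ** attempt) * (1 + random.random()))

# Usage: with_retry(lambda: publish(db_path, my_id, msg))
```
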
## Read Protocol

### Polling for New Messages (At-Least-Once)

**CRITICAL**: Do NOT update the cursor until the message is successfully processed.

```python
def poll(db_path, agent_id, limit=100):
    """Fetch unacknowledged messages. Does NOT advance cursor."""
    conn = get_connection(db_path)

    # Get cursor position
    cursor = conn.execute(
        "SELECT last_acked_seq FROM cursors WHERE agent_id = ?",
        (agent_id,)
    ).fetchone()
    last_seq = cursor['last_acked_seq'] if cursor else 0

    # Fetch new messages for this agent (or broadcasts)
    rows = conn.execute("""
        SELECT * FROM messages
        WHERE seq > ?
          AND (to_agent IS NULL OR to_agent = ?)
        ORDER BY seq
        LIMIT ?
    """, (last_seq, agent_id, limit)).fetchall()

    conn.close()

    # Convert to dicts, resolve blob refs
    messages = []
    for row in rows:
        msg = dict(row)
        if msg['payload_ref']:
            try:
                msg['payload'] = load_blob(msg['payload_ref'])
            except FileNotFoundError:
                msg['payload_error'] = 'blob_missing'
        elif msg['payload']:
            try:
                msg['payload'] = json.loads(msg['payload'])
            except json.JSONDecodeError:
                msg['payload_error'] = 'decode_failed'
        messages.append(msg)

    return messages


def ack(db_path, agent_id, seq):
    """Acknowledge successful processing of a message. Advances cursor."""
    conn = get_connection(db_path)
    ts_ms = int(time.time() * 1000)

    # Only advance forward (monotonic)
    conn.execute("""
        INSERT INTO cursors (agent_id, last_acked_seq, updated_at_ms)
        VALUES (?, ?, ?)
        ON CONFLICT(agent_id) DO UPDATE SET
            last_acked_seq = MAX(cursors.last_acked_seq, excluded.last_acked_seq),
            updated_at_ms = excluded.updated_at_ms
    """, (agent_id, seq, ts_ms))
    conn.commit()
    conn.close()
```

### Processing Loop

```python
def process_loop(db_path, agent_id, handler):
    """Process messages with at-least-once semantics."""
    while running:
        messages = poll(db_path, agent_id)
        for msg in messages:
            try:
                handler(msg)
                ack(db_path, agent_id, msg['seq'])  # Only after success
            except Exception as e:
                # Don't ack - message will be redelivered
                log.error(f"Failed to process {msg['id']}: {e}")
                break  # Stop batch on failure

        time.sleep(poll_interval)
```

### Adaptive Polling

```python
POLL_ACTIVE = 0.2    # 200ms when working
POLL_IDLE = 2.0      # 2s when idle
POLL_BACKOFF = 1.5   # Multiplier on each empty poll

poll_interval = POLL_ACTIVE

while running:
    messages = poll(db_path, my_id)
    if messages:
        process(messages)
        poll_interval = POLL_ACTIVE  # Reset to fast
    else:
        poll_interval = min(poll_interval * POLL_BACKOFF, POLL_IDLE)

    time.sleep(poll_interval)
```

## Heartbeat Protocol

Heartbeats use a **separate table** with upsert to avoid contending with task messages.

**CRITICAL**: Heartbeats MUST run in a dedicated thread to avoid false death detection during blocking operations (LLM calls can take 60s+).

### Nim Implementation

Nim threads don't share memory by default; use a channel to communicate with the heartbeat thread:

```nim
import std/[times, os]  # Channel[T] is built in when compiled with --threads:on
import tiny_sqlite

type
  HeartbeatCmd* = enum
    hbUpdateStatus, hbStop

  HeartbeatMsg* = object
    cmd*: HeartbeatCmd
    status*: string
    task*: string
    progress*: float

var heartbeatChan*: Channel[HeartbeatMsg]

proc writeHeartbeat(db: DbConn, agentId, status, task: string, progress: float) =
  let now = getTime()
  let tsMs = toUnix(now) * 1000 + nanosecond(now) div 1_000_000
  db.exec("""
    INSERT INTO heartbeats (agent_id, ts_ms, status, current_task, progress)
    VALUES (?, ?, ?, ?, ?)
    ON CONFLICT(agent_id) DO UPDATE SET
      ts_ms = excluded.ts_ms,
      status = excluded.status,
      current_task = excluded.current_task,
      progress = excluded.progress
  """, agentId, tsMs, status, task, progress)

proc heartbeatWorker*(args: tuple[dbPath, agentId: string]) {.thread.} =
  ## Dedicated heartbeat thread - owns its own DB connection
  let db = openBusDb(args.dbPath)
  var status = "idle"
  var task = ""
  var progress = 0.0

  while true:
    # Check for commands (non-blocking)
    let tried = heartbeatChan.tryRecv()
    if tried.dataAvailable:
      case tried.msg.cmd
      of hbStop:
        db.close()
        return
      of hbUpdateStatus:
        status = tried.msg.status
        task = tried.msg.task
        progress = tried.msg.progress

    # Write heartbeat
    try:
      writeHeartbeat(db, args.agentId, status, task, progress)
    except CatchableError:
      discard  # Log but don't crash

    sleep(10_000)  # 10 seconds

# Usage from main thread:
proc startHeartbeat*(dbPath, agentId: string): Thread[tuple[dbPath, agentId: string]] =
  heartbeatChan.open()
  createThread(result, heartbeatWorker, (dbPath, agentId))

proc updateHeartbeatStatus*(status, task: string, progress: float) =
  heartbeatChan.send(HeartbeatMsg(cmd: hbUpdateStatus, status: status, task: task, progress: progress))

proc stopHeartbeat*() =
  heartbeatChan.send(HeartbeatMsg(cmd: hbStop))
```

### Heartbeat Rules

1. Emit every 10 seconds via the background thread
2. Include current status: `idle`, `working`, `blocked`
3. Stale thresholds: 30s = WARN, 100s = STALE, 5min = DEAD

### Liveness Query

```sql
-- Find agents past the WARN threshold (no heartbeat in 30s)
-- Uses epoch ms for reliable comparison
SELECT agent_id, ts_ms,
       (strftime('%s', 'now') * 1000 - ts_ms) / 1000 AS age_seconds
FROM heartbeats
WHERE ts_ms < (strftime('%s', 'now') * 1000 - 30000);
```

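The same thresholds can be applied in Python when sweeping for dead agents. A minimal sketch (the function name is illustrative):

```python
import time

WARN_MS, STALE_MS, DEAD_MS = 30_000, 100_000, 300_000  # thresholds from the rules above

def classify_agents(conn):
    """Bucket agents by heartbeat age: OK, WARN, STALE, or DEAD."""
    now_ms = int(time.time() * 1000)
    buckets = {}
    for agent_id, ts_ms in conn.execute("SELECT agent_id, ts_ms FROM heartbeats"):
        age_ms = now_ms - ts_ms
        if age_ms >= DEAD_MS:
            buckets[agent_id] = "DEAD"
        elif age_ms >= STALE_MS:
            buckets[agent_id] = "STALE"
        elif age_ms >= WARN_MS:
            buckets[agent_id] = "WARN"
        else:
            buckets[agent_id] = "OK"
    return buckets
```
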
## Task Claim Pattern

Task claims use a **dedicated table with leases** to prevent zombie claims when agents die.

```python
LEASE_DURATION_MS = 60000  # 60 seconds

def try_claim_task(db_path, task_id, agent_id):
    """Attempt to claim a task. Returns True if successful."""
    conn = get_connection(db_path)
    ts_ms = int(time.time() * 1000)
    lease_until = ts_ms + LEASE_DURATION_MS

    try:
        conn.execute("BEGIN IMMEDIATE")

        # Check for existing valid claim
        existing = conn.execute("""
            SELECT claimed_by, lease_until_ms FROM task_claims
            WHERE task_id = ?
        """, (task_id,)).fetchone()

        if existing:
            if existing['lease_until_ms'] > ts_ms:
                # Valid claim exists
                conn.rollback()
                return False
            else:
                # Expired claim - delete and reclaim
                conn.execute("DELETE FROM task_claims WHERE task_id = ?", (task_id,))

        # Insert new claim
        conn.execute("""
            INSERT INTO task_claims (task_id, claimed_by, claimed_at_ms, lease_until_ms)
            VALUES (?, ?, ?, ?)
        """, (task_id, agent_id, ts_ms, lease_until))

        conn.commit()
        return True

    except sqlite3.OperationalError:
        # Lock contention - another agent got there first
        conn.rollback()
        return False
    finally:
        conn.close()


def renew_claim(db_path, task_id, agent_id):
    """Renew the lease on a claimed task. Call periodically while working."""
    conn = get_connection(db_path)
    ts_ms = int(time.time() * 1000)
    lease_until = ts_ms + LEASE_DURATION_MS

    result = conn.execute("""
        UPDATE task_claims
        SET lease_until_ms = ?
        WHERE task_id = ? AND claimed_by = ?
    """, (lease_until, task_id, agent_id))
    conn.commit()
    updated = result.rowcount > 0
    conn.close()
    return updated


def release_claim(db_path, task_id, agent_id):
    """Release a claim when done (success or failure)."""
    conn = get_connection(db_path)
    conn.execute("""
        DELETE FROM task_claims
        WHERE task_id = ? AND claimed_by = ?
    """, (task_id, agent_id))
    conn.commit()
    conn.close()
```

### Claim Lifecycle

1. Agent calls `try_claim_task()` - gets an exclusive claim with a 60s lease
2. Agent periodically calls `renew_claim()` (every 30s) while working
3. On completion, agent calls `release_claim()`
4. If the agent dies, the lease expires and the task becomes claimable again

A sketch of this lifecycle from the worker's side follows below.

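A minimal usage sketch tying the three calls together; `task_finished` and `do_work_increment` are hypothetical placeholders for the worker's actual work loop, and the 30s renew cadence is half the lease duration:

```python
def run_task(db_path, task_id, agent_id):
    """Claim a task, renew the lease while working, always release."""
    if not try_claim_task(db_path, task_id, agent_id):
        return False  # someone else holds a valid lease

    last_renew = time.time()
    try:
        while not task_finished(task_id):    # illustrative completion check
            do_work_increment(task_id)       # illustrative unit of work
            if time.time() - last_renew >= 30:
                if not renew_claim(db_path, task_id, agent_id):
                    raise RuntimeError(f"lost lease on {task_id}")
                last_renew = time.time()
        return True
    finally:
        release_claim(db_path, task_id, agent_id)
```
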
## JSONL Export for Debugging

A background process or post-commit hook exports to JSONL.

**CRITICAL**: Track the last exported seq in the database, NOT by counting lines. Seq values can have gaps from rollbacks.

```python
def export_to_jsonl(db_path, jsonl_path):
    """Export new messages to JSONL. Idempotent, crash-safe."""
    conn = get_connection(db_path)

    # Get last exported seq from the database
    row = conn.execute(
        "SELECT last_seq FROM export_state WHERE id = 1"
    ).fetchone()
    last_exported = row['last_seq'] if row else 0

    # Export new messages
    rows = conn.execute(
        "SELECT * FROM messages WHERE seq > ? ORDER BY seq",
        (last_exported,)
    ).fetchall()

    if not rows:
        conn.close()
        return

    max_seq = last_exported
    with open(jsonl_path, 'a') as f:
        for row in rows:
            msg = dict(row)
            # Optionally resolve blob refs for full content
            f.write(json.dumps(msg) + '\n')
            max_seq = max(max_seq, msg['seq'])

    # Record progress only after the file write has succeeded
    conn.execute("""
        UPDATE export_state SET last_seq = ? WHERE id = 1
    """, (max_seq,))
    conn.commit()
    conn.close()
```

This gives you `tail -f bus.jsonl` for debugging while SQLite remains the source of truth.

**Note**: The JSONL contains blob references, not resolved payloads. For full content, use SQLite queries.

## Blob Storage

Content-addressable storage for large payloads. Uses the **atomic write** pattern (temp file + fsync + rename) to prevent partial reads.

```python
import os
import tempfile
import hashlib

def save_blob(content):
    """Atomically write a blob. Returns its ref or raises."""
    data = json.dumps(content).encode()
    hash_hex = hashlib.sha256(data).hexdigest()
    ref = f"sha256-{hash_hex}"
    path = f".worker-state/blobs/{ref}"

    if os.path.exists(path):
        return ref  # Already exists (content-addressable = idempotent)

    # Atomic write: temp file + fsync + rename
    os.makedirs(os.path.dirname(path), exist_ok=True)
    fd, tmp_path = tempfile.mkstemp(dir=os.path.dirname(path))
    try:
        with os.fdopen(fd, 'wb') as f:
            f.write(data)
            f.flush()
            os.fsync(f.fileno())
        os.replace(tmp_path, path)  # Atomic on POSIX
    except BaseException:
        if os.path.exists(tmp_path):
            os.unlink(tmp_path)
        raise

    return ref


def load_blob(ref):
    """Load blob content. Raises FileNotFoundError if missing."""
    path = f".worker-state/blobs/{ref}"
    with open(path, 'rb') as f:
        return json.load(f)
```

## Phased Implementation

### Phase 0: MVP
- SQLite with WAL mode
- Basic publish/poll
- Heartbeats every 10s
- Simple polling (500ms)

### Phase 1: Robustness
- Task claims with transactions
- JSONL export for debugging
- Blob storage for large payloads
- Dead agent detection

### Phase 2: Performance
- FTS5 for full-text search
- Message compaction/archival
- Adaptive polling

### Phase 3: Scale (if ever)
- Read replicas
- Sharding by correlation_id
- Background workers for maintenance

## Integration Points

### With Worker State Machine (skills-4oj)

State transitions publish messages:

```python
publish(db_path, worker_id, {
    'type': 'state_change',
    'payload': {'from': 'WORKING', 'to': 'IN_REVIEW'}
})
```

### With review-gate (skills-byq)

Review requests/results via messages:

```python
publish(db_path, worker_id, {
    'type': 'review_request',
    'correlation_id': task_id
})
```

### With Observability (skills-yak)

The status command queries SQLite directly:

```sql
SELECT from_agent, type, ts_ms, payload
FROM messages
WHERE type IN ('heartbeat', 'state_change')
ORDER BY seq DESC LIMIT 100;
```

## Open Questions

1. ~~**Database location**~~: `.worker-state/bus.db` (resolved)
2. **Export frequency**: On-commit hook vs periodic background? (defer to implementation)
3. **Message retention**: Archive messages older than the minimum cursor? Vacuum schedule?
4. ~~**Schema migrations**~~: `meta.schema_version` table added; agents check on startup
5. **Security**: All agents currently trusted; add HMAC signing if isolation is needed?
6. **Backpressure**: How to slow producers when consumers are overwhelmed?

## Resolved from Spec Review

| Issue | Resolution |
|-------|------------|
| `seq` not auto-increment | Changed to `INTEGER PRIMARY KEY AUTOINCREMENT` |
| At-most-once delivery | Cursor now advanced only after ack |
| Heartbeats in messages table | Separate `heartbeats` table with upsert |
| Heartbeats block during LLM | Dedicated heartbeat thread runs independently |
| JSONL line-count broken | `export_state` table tracks last seq |
| Timestamp format unreliable | Changed to epoch milliseconds |
| Zombie task claims | `task_claims` table with lease expiry |
| Blob writes not atomic | Temp file + fsync + rename pattern |

## Nim Implementation Notes

### Dependencies

```nim
# In the project's .nimble file
requires "tiny_sqlite >= 0.2.0"
requires "jsony >= 1.1.0"  # Optional, faster JSON
```

### Build Configuration

```nim
# config.nims or bus.nim.cfg
--mm:orc
--threads:on
-d:release

# Static SQLite for a single binary (these pragmas go in the module source):
{.passC: "-DSQLITE_THREADSAFE=1".}
{.compile: "libs/sqlite3.c".}
```

### Project Structure

```
src/
├── libs/sqlite3.c   # SQLite amalgamation
├── db_layer.nim     # Connection setup, PRAGMAs
├── bus.nim          # Publish, poll, ack
├── heartbeat.nim    # Dedicated thread + channel
├── claims.nim       # Task claims with leases
└── export.nim       # JSONL export
```

### Key Patterns

1. **One connection per thread** - never share a `DbConn`
2. **Channels for control** - stop signals, status updates
3. **Short transactions** - BEGIN IMMEDIATE, do the work, COMMIT quickly
4. **Exceptions for errors** - catch tiny_sqlite's `SqliteError`, convert to meaningful messages

## References

- Beads: https://github.com/steveyegge/beads
- Tissue: https://github.com/evil-mind-evil-sword/tissue
- SQLite WAL mode: https://sqlite.org/wal.html
- SQLite BEGIN IMMEDIATE: https://sqlite.org/lang_transaction.html
- tiny_sqlite: https://github.com/GULPF/tiny_sqlite