# Message Passing Layer Design

**Status**: Draft (v4 - Nim implementation)
**Bead**: skills-ms5
**Epic**: skills-s6y (Multi-agent orchestration: Lego brick architecture)
**Language**: Nim (ORC, threads, tiny_sqlite)

## Changelog

- **v4**: Nim implementation details (tiny_sqlite, dedicated heartbeat thread, channels)
- **v3**: Fixed BLOCKERs from orch spec-review (seq autoincrement, ack semantics, heartbeats table, epoch timestamps, JSONL export, task leases)
- **v2**: Changed to SQLite primary (was JSONL+flock)

## Overview

This document defines the local message passing layer for multi-agent coordination. Agents communicate through a shared SQLite database with ACID guarantees. A JSONL export provides human debugging and git history.

## Design Principles

1. **SQLite as source of truth** - Atomic commits, crash recovery, indexed queries
2. **JSONL for observability** - Human-readable export for debugging and git diffs
3. **File-based, no network** - Any agent with SQLite bindings can participate
4. **Liveness detection** - Heartbeats to detect dead agents
5. **Blob storage for large payloads** - Keep database lean

## Why SQLite Over JSONL+flock

| Concern | JSONL+flock | SQLite |
|---------|-------------|--------|
| Crash mid-write | Corrupted log, manual recovery | Atomic commit, automatic rollback |
| Payload >4KB | Interleaved writes possible | Always atomic |
| Query efficiency | O(N) scan | O(log N) with index |
| Portability | POSIX flock only | Works everywhere |
| Complexity | DIY recovery logic | Built-in WAL, transactions |

See `docs/design/message-passing-comparison.md` for full analysis.
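
The "Crash mid-write" row can be checked directly. The following is a minimal sketch, assuming a throwaway in-memory database and a cut-down `messages` table (both are illustrative, not the real schema). It shows that a transaction which fails halfway leaves no partial write behind.

```python
import sqlite3


def demo_atomic_rollback() -> int:
    """Return how many rows survive a transaction that fails midway."""
    conn = sqlite3.connect(":memory:")  # illustrative; the real bus uses a file
    conn.execute(
        "CREATE TABLE messages ("
        " seq INTEGER PRIMARY KEY AUTOINCREMENT,"
        " id TEXT NOT NULL UNIQUE)"
    )
    conn.execute("INSERT INTO messages (id) VALUES ('m1')")
    conn.commit()

    try:
        # `with conn` commits on success and rolls back on any exception.
        with conn:
            conn.execute("INSERT INTO messages (id) VALUES ('m2')")
            # Duplicate id violates UNIQUE and aborts the whole transaction.
            conn.execute("INSERT INTO messages (id) VALUES ('m1')")
    except sqlite3.IntegrityError:
        pass

    count = conn.execute("SELECT COUNT(*) FROM messages").fetchone()[0]
    conn.close()
    return count


print(demo_atomic_rollback())
```

Only the row committed before the failed transaction remains; the `m2` insert is rolled back together with the failing statement.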
## Prior Art

| System | Approach | Key Insight |
|--------|----------|-------------|
| Beads | SQLite + JSONL export | SQLite for coordination, JSONL for git |
| Tissue | JSONL primary, SQLite cache | Simpler but less safe |
| Kafka | Append-only log, offsets | Total ordering, replay |
| Redis Streams | Consumer groups, claims | First claim wins |

## Directory Structure

```
.worker-state/
├── bus.db                  # SQLite - source of truth (gitignored)
├── bus.jsonl               # JSONL export for debugging/git (read-only)
├── blobs/                  # Content-addressable storage for large payloads
│   └── sha256-a1b2c3...
└── workers/
    └── worker-auth.json    # Worker state (separate from messages)
```

## Database Schema

```sql
-- Messages table (seq is rowid-based for true auto-increment)
CREATE TABLE messages (
    seq INTEGER PRIMARY KEY AUTOINCREMENT,  -- Guaranteed monotonic, auto-assigned
    id TEXT NOT NULL UNIQUE,                -- UUID for idempotent retries
    ts_ms INTEGER NOT NULL,                 -- Epoch milliseconds (reliable comparisons)
    from_agent TEXT NOT NULL,
    to_agent TEXT,                          -- NULL = broadcast
    type TEXT NOT NULL,
    correlation_id TEXT,
    in_reply_to TEXT,
    payload TEXT,                           -- JSON blob or NULL
    payload_ref TEXT,                       -- Blob hash for large payloads
    CHECK (payload IS NULL OR payload_ref IS NULL)  -- Not both
);

CREATE INDEX idx_messages_to_seq ON messages(to_agent, seq);
CREATE INDEX idx_messages_correlation_seq ON messages(correlation_id, seq);
CREATE INDEX idx_messages_type_from_ts ON messages(type, from_agent, ts_ms);

-- Cursors table (per-agent read position)
-- NOTE: Cursor is only advanced AFTER successful processing (at-least-once)
CREATE TABLE cursors (
    agent_id TEXT PRIMARY KEY,
    last_acked_seq INTEGER NOT NULL DEFAULT 0,  -- Last successfully processed
    updated_at_ms INTEGER NOT NULL
);

-- Heartbeats table (separate from messages to avoid write contention)
-- One row per agent, upsert pattern
CREATE TABLE heartbeats (
    agent_id TEXT PRIMARY KEY,
    ts_ms INTEGER NOT NULL,
    status TEXT NOT NULL,                   -- idle, working, blocked
    current_task TEXT,
    progress REAL
);

-- Task claims with lease expiry (prevents zombie claims)
CREATE TABLE task_claims (
    task_id TEXT PRIMARY KEY,
    claimed_by TEXT NOT NULL,
    claimed_at_ms INTEGER NOT NULL,
    lease_until_ms INTEGER NOT NULL         -- Must renew or release
);

-- Schema versioning for migrations
CREATE TABLE meta (
    key TEXT PRIMARY KEY,
    value TEXT NOT NULL
);
INSERT INTO meta (key, value) VALUES ('schema_version', '1');

-- Export tracking (not line count!)
CREATE TABLE export_state (
    id INTEGER PRIMARY KEY CHECK (id = 1),
    last_seq INTEGER NOT NULL DEFAULT 0
);
INSERT INTO export_state (id, last_seq) VALUES (1, 0);
```

### Connection Setup (REQUIRED)

PRAGMAs must be set on every connection. **Each thread must have its own connection.**

```nim
import tiny_sqlite

proc openBusDb*(dbPath: string): DbConn =
  ## Open database with required PRAGMAs. One connection per thread.
  result = openDatabase(dbPath)
  result.exec("PRAGMA busy_timeout = 5000")
  result.exec("PRAGMA foreign_keys = ON")
  result.exec("PRAGMA journal_mode = WAL")
  result.exec("PRAGMA synchronous = NORMAL")
```

**Critical**: Do NOT share `DbConn` across threads. Each thread opens its own connection.
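
The Python examples in the rest of this document call a `get_connection` helper that is never shown. Below is a minimal sketch of what it might look like, assuming it mirrors `openBusDb` and sets `row_factory` so rows can be read by column name. The helper body is an assumption for illustration, not part of the spec.

```python
import sqlite3


def get_connection(db_path: str) -> sqlite3.Connection:
    """Open a connection with the same PRAGMAs as openBusDb (assumed helper).

    As with the Nim version, open one connection per thread.
    """
    conn = sqlite3.connect(db_path)
    # Lets callers write row['last_acked_seq'] and dict(row).
    conn.row_factory = sqlite3.Row
    conn.execute("PRAGMA busy_timeout = 5000")
    conn.execute("PRAGMA foreign_keys = ON")
    conn.execute("PRAGMA journal_mode = WAL")
    conn.execute("PRAGMA synchronous = NORMAL")
    return conn
```

A caller would use it as `conn = get_connection(".worker-state/bus.db")` and close the connection when done, as the later examples do.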
## Message Schema

The message fields are unchanged from earlier versions of this design and are now stored in SQLite. On disk, `from` and `to` map to the `from_agent` and `to_agent` columns, and the timestamp is kept as epoch milliseconds in `ts_ms`:

```json
{
  "id": "uuid-v4",
  "ts_ms": 1768053600000,
  "from": "worker-auth",
  "to": "orchestrator",
  "type": "task_done",
  "correlation_id": "skills-abc",
  "payload": {
    "branch": "worker-auth/skills-abc",
    "commit": "a1b2c3d4"
  }
}
```

### Message Types

**Task Lifecycle:**
- `task_assign` - Orchestrator assigns task to worker
- `task_claim` - Worker claims a task (first wins)
- `task_progress` - Worker reports progress
- `task_done` - Worker completed task
- `task_failed` - Worker failed task
- `task_blocked` - Worker blocked on dependency

**Coordination:**
- `heartbeat` - Liveness signal (written to the `heartbeats` table, not `messages`)
- `state_change` - Worker state transition
- `review_request` - Request review of work
- `review_result` - Review approved/rejected

**System:**
- `agent_started` - Agent came online
- `agent_stopped` - Agent shutting down gracefully

## Delivery Semantics

**This layer provides AT-LEAST-ONCE delivery.**

- Messages are durable once committed
- Cursor only advances after successful processing
- Handlers MUST be idempotent or deduplicate by `message.id`
- Out-of-order processing is possible between unrelated messages

## Write Protocol

### Publishing Messages

```python
import sqlite3
import uuid
import json
import time

def publish(db_path, agent_id, message):
    msg_id = message.get('id') or str(uuid.uuid4())
    ts_ms = int(time.time() * 1000)  # Epoch milliseconds

    # Handle large payloads
    payload = message.get('payload')
    payload_ref = None
    if payload is not None:  # Serialize empty payloads ({} or []) too
        payload_json = json.dumps(payload)
        if len(payload_json) > 4096:
            payload_ref = save_blob(payload)  # Atomic write (see Blob Storage)
            payload = None
        else:
            payload = payload_json

    conn = get_connection(db_path)
    try:
        # Use plain BEGIN for simple inserts (less contention than IMMEDIATE)
        # seq is auto-assigned by SQLite
        conn.execute("""
            INSERT INTO messages (id, ts_ms, from_agent, to_agent, type,
                                  correlation_id, in_reply_to, payload, payload_ref)
            VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)
        """, (
            msg_id,
            ts_ms,
            agent_id,
            message.get('to'),
            message['type'],
            message.get('correlation_id'),
            message.get('in_reply_to'),
            payload,
            payload_ref
        ))
        conn.commit()
    except sqlite3.IntegrityError:
        # Duplicate id = already published (idempotent retry)
        conn.rollback()
    except Exception:
        conn.rollback()
        raise
    finally:
        conn.close()

    return msg_id
```

### Write Contention

- SQLite WAL allows concurrent readers but **one writer at a time**
- `busy_timeout = 5000` handles short contention
- For high-frequency writes, add retry with exponential backoff
- Heartbeats use separate table to avoid contending with task messages

## Read Protocol

### Polling for New Messages (At-Least-Once)

**CRITICAL**: Do NOT update cursor until message is successfully processed.

```python
def poll(db_path, agent_id, limit=100):
    """Fetch unacknowledged messages. Does NOT advance cursor."""
    conn = get_connection(db_path)

    # Get cursor position
    cursor = conn.execute(
        "SELECT last_acked_seq FROM cursors WHERE agent_id = ?",
        (agent_id,)
    ).fetchone()
    last_seq = cursor['last_acked_seq'] if cursor else 0

    # Fetch new messages for this agent (or broadcasts)
    rows = conn.execute("""
        SELECT * FROM messages
        WHERE seq > ?
          AND (to_agent IS NULL OR to_agent = ?)
        ORDER BY seq
        LIMIT ?
    """, (last_seq, agent_id, limit)).fetchall()
    conn.close()

    # Convert to dicts, resolve blob refs
    messages = []
    for row in rows:
        msg = dict(row)
        if msg['payload_ref']:
            try:
                msg['payload'] = load_blob(msg['payload_ref'])
            except FileNotFoundError:
                msg['payload_error'] = 'blob_missing'
        elif msg['payload']:
            try:
                msg['payload'] = json.loads(msg['payload'])
            except json.JSONDecodeError:
                msg['payload_error'] = 'decode_failed'
        messages.append(msg)
    return messages


def ack(db_path, agent_id, seq):
    """Acknowledge successful processing of message.
    Advances cursor."""
    conn = get_connection(db_path)
    ts_ms = int(time.time() * 1000)

    # Only advance forward (monotonic)
    conn.execute("""
        INSERT INTO cursors (agent_id, last_acked_seq, updated_at_ms)
        VALUES (?, ?, ?)
        ON CONFLICT(agent_id) DO UPDATE SET
            last_acked_seq = MAX(cursors.last_acked_seq, excluded.last_acked_seq),
            updated_at_ms = excluded.updated_at_ms
    """, (agent_id, seq, ts_ms))
    conn.commit()
    conn.close()
```

### Processing Loop

```python
# `running`, `log`, and `poll_interval` are agent-level state defined elsewhere.
def process_loop(db_path, agent_id, handler):
    """Process messages with at-least-once semantics."""
    while running:
        messages = poll(db_path, agent_id)
        for msg in messages:
            try:
                handler(msg)
                ack(db_path, agent_id, msg['seq'])  # Only after success
            except Exception as e:
                # Don't ack - message will be redelivered
                log.error(f"Failed to process {msg['id']}: {e}")
                break  # Stop batch on failure
        time.sleep(poll_interval)
```

### Adaptive Polling

```python
POLL_ACTIVE = 0.2   # 200ms when working
POLL_IDLE = 2.0     # 2s when idle
POLL_BACKOFF = 1.5  # Multiplier on each empty poll

poll_interval = POLL_ACTIVE

while running:
    messages = poll(db_path, my_id)
    if messages:
        process(messages)
        poll_interval = POLL_ACTIVE  # Reset to fast
    else:
        poll_interval = min(poll_interval * POLL_BACKOFF, POLL_IDLE)
    time.sleep(poll_interval)
```

## Heartbeat Protocol

Heartbeats use a **separate table** with upsert to avoid contending with task messages.

**CRITICAL**: Heartbeats MUST run in a dedicated thread to avoid false death detection during blocking operations (LLM calls can take 60s+).

### Nim Implementation

Nim threads don't share memory by default.
Use channels for communication:

```nim
import std/[times, os]
import tiny_sqlite

type
  HeartbeatCmd* = enum
    hbUpdateStatus, hbStop

  HeartbeatMsg* = object
    cmd*: HeartbeatCmd
    status*: string
    task*: string
    progress*: float

# Channel[T] is provided by the system module when built with --threads:on
var heartbeatChan*: Channel[HeartbeatMsg]

proc writeHeartbeat(db: DbConn, agentId, status, task: string, progress: float) =
  # Read the clock once so the seconds and sub-second parts agree
  let now = getTime()
  let tsMs = now.toUnix * 1000 + int64(now.nanosecond div 1_000_000)
  db.exec("""
    INSERT INTO heartbeats (agent_id, ts_ms, status, current_task, progress)
    VALUES (?, ?, ?, ?, ?)
    ON CONFLICT(agent_id) DO UPDATE SET
      ts_ms = excluded.ts_ms,
      status = excluded.status,
      current_task = excluded.current_task,
      progress = excluded.progress
  """, agentId, tsMs, status, task, progress)

proc heartbeatWorker*(args: tuple[dbPath, agentId: string]) {.thread.} =
  ## Dedicated heartbeat thread - owns its own DB connection
  let db = openBusDb(args.dbPath)
  var status = "idle"
  var task = ""
  var progress = 0.0

  while true:
    # Check for commands (non-blocking)
    let tried = heartbeatChan.tryRecv()
    if tried.dataAvailable:
      case tried.msg.cmd
      of hbStop:
        db.close()
        return
      of hbUpdateStatus:
        status = tried.msg.status
        task = tried.msg.task
        progress = tried.msg.progress

    # Write heartbeat
    try:
      writeHeartbeat(db, args.agentId, status, task, progress)
    except CatchableError:
      discard  # Log but don't crash

    sleep(10_000)  # 10 seconds

# Usage from main thread:
proc startHeartbeat*(dbPath, agentId: string): Thread[tuple[dbPath, agentId: string]] =
  heartbeatChan.open()
  createThread(result, heartbeatWorker, (dbPath, agentId))

proc updateHeartbeatStatus*(status, task: string, progress: float) =
  heartbeatChan.send(HeartbeatMsg(cmd: hbUpdateStatus, status: status, task: task, progress: progress))

proc stopHeartbeat*() =
  heartbeatChan.send(HeartbeatMsg(cmd: hbStop))
```

### Heartbeat Rules

1. Emit every 10 seconds via background thread
2. Include current status: `idle`, `working`, `blocked`
3. Stale thresholds: 30s = WARN, 100s = STALE, 5min = DEAD

### Liveness Query

```sql
-- Find suspected dead agents (no heartbeat in 30s)
-- Uses epoch ms for reliable comparison
SELECT agent_id, ts_ms,
       (strftime('%s', 'now') * 1000 - ts_ms) / 1000 AS age_seconds
FROM heartbeats
WHERE ts_ms < (strftime('%s', 'now') * 1000 - 30000);
```

## Task Claim Pattern

Task claims use a **dedicated table with leases** to prevent zombie claims when agents die.

```python
LEASE_DURATION_MS = 60000  # 60 seconds

def try_claim_task(db_path, task_id, agent_id):
    """Attempt to claim a task. Returns True if successful."""
    conn = get_connection(db_path)
    ts_ms = int(time.time() * 1000)
    lease_until = ts_ms + LEASE_DURATION_MS

    try:
        conn.execute("BEGIN IMMEDIATE")

        # Check for existing valid claim
        existing = conn.execute("""
            SELECT claimed_by, lease_until_ms FROM task_claims
            WHERE task_id = ?
        """, (task_id,)).fetchone()

        if existing:
            if existing['lease_until_ms'] > ts_ms:
                # Valid claim exists
                conn.rollback()
                return False
            else:
                # Expired claim - delete and reclaim
                conn.execute("DELETE FROM task_claims WHERE task_id = ?", (task_id,))

        # Insert new claim
        conn.execute("""
            INSERT INTO task_claims (task_id, claimed_by, claimed_at_ms, lease_until_ms)
            VALUES (?, ?, ?, ?)
        """, (task_id, agent_id, ts_ms, lease_until))

        conn.commit()
        return True

    except sqlite3.OperationalError:
        # Lock contention - write lock not acquired within busy_timeout;
        # treat as a lost race (the caller may retry later)
        conn.rollback()
        return False
    finally:
        conn.close()


def renew_claim(db_path, task_id, agent_id):
    """Renew lease on a claimed task. Call periodically while working."""
    conn = get_connection(db_path)
    ts_ms = int(time.time() * 1000)
    lease_until = ts_ms + LEASE_DURATION_MS

    result = conn.execute("""
        UPDATE task_claims
        SET lease_until_ms = ?
        WHERE task_id = ? AND claimed_by = ?
    """, (lease_until, task_id, agent_id))
    conn.commit()
    updated = result.rowcount > 0
    conn.close()
    return updated


def release_claim(db_path, task_id, agent_id):
    """Release a claim when done (success or failure)."""
    conn = get_connection(db_path)
    conn.execute("""
        DELETE FROM task_claims
        WHERE task_id = ? AND claimed_by = ?
    """, (task_id, agent_id))
    conn.commit()
    conn.close()
```

### Claim Lifecycle

1. Agent calls `try_claim_task()` - gets exclusive claim with 60s lease
2. Agent periodically calls `renew_claim()` (every 30s) while working
3. On completion, agent calls `release_claim()`
4. If agent dies, lease expires and task becomes claimable again

## JSONL Export for Debugging

Background process or post-commit hook exports to JSONL.

**CRITICAL**: Track last exported seq in database, NOT by counting lines. Seq values are monotonic but not guaranteed to be gapless, so a line count can drift from the true export position.

```python
def export_to_jsonl(db_path, jsonl_path):
    """Export new messages to JSONL. Safe to re-run after a crash."""
    conn = get_connection(db_path)

    # Get last exported seq from database
    row = conn.execute(
        "SELECT last_seq FROM export_state WHERE id = 1"
    ).fetchone()
    last_exported = row['last_seq'] if row else 0

    # Export new messages
    rows = conn.execute(
        "SELECT * FROM messages WHERE seq > ? ORDER BY seq",
        (last_exported,)
    ).fetchall()

    if not rows:
        conn.close()
        return

    max_seq = last_exported
    with open(jsonl_path, 'a') as f:
        for row in rows:
            msg = dict(row)
            # Optionally resolve blob refs for full content
            f.write(json.dumps(msg) + '\n')
            max_seq = max(max_seq, msg['seq'])

    # Record progress only after the file is written. A crash between the
    # write and this commit re-exports those rows (duplicate lines, no gaps).
    conn.execute("""
        UPDATE export_state SET last_seq = ? WHERE id = 1
    """, (max_seq,))
    conn.commit()
    conn.close()
```

This gives you `tail -f bus.jsonl` for debugging while SQLite remains source of truth.

**Note**: JSONL contains blob references, not resolved payloads. For full content, use SQLite queries.

## Blob Storage

Content-addressable storage for large payloads.
Uses **atomic write** pattern (temp + rename) to prevent partial reads.

```python
import hashlib
import json
import os
import tempfile

def save_blob(content):
    """Atomically write blob. Returns ref or raises."""
    data = json.dumps(content).encode()
    hash_hex = hashlib.sha256(data).hexdigest()
    ref = f"sha256-{hash_hex}"
    path = f".worker-state/blobs/{ref}"

    if os.path.exists(path):
        return ref  # Already exists (content-addressable = idempotent)

    # Atomic write: temp file + fsync + rename
    os.makedirs(os.path.dirname(path), exist_ok=True)
    fd, tmp_path = tempfile.mkstemp(dir=os.path.dirname(path))
    try:
        with os.fdopen(fd, 'wb') as f:
            f.write(data)
            f.flush()
            os.fsync(f.fileno())
        os.replace(tmp_path, path)  # Atomic on POSIX
    except BaseException:
        # Clean up the temp file on any failure, then re-raise
        if os.path.exists(tmp_path):
            os.unlink(tmp_path)
        raise

    return ref


def load_blob(ref):
    """Load blob content. Raises FileNotFoundError if missing."""
    path = f".worker-state/blobs/{ref}"
    with open(path, 'rb') as f:
        return json.load(f)
```

## Phased Implementation

### Phase 0: MVP
- SQLite with WAL mode
- Basic publish/poll
- Heartbeats every 10s
- Simple polling (500ms)

### Phase 1: Robustness
- Task claim with transactions
- JSONL export for debugging
- Blob storage for large payloads
- Dead agent detection

### Phase 2: Performance
- FTS5 for full-text search
- Message compaction/archival
- Adaptive polling

### Phase 3: Scale (if ever)
- Read replicas
- Sharding by correlation_id
- Background workers for maintenance

## Integration Points

### With Worker State Machine (skills-4oj)

State transitions publish messages:

```python
publish(db, worker_id, {
    'type': 'state_change',
    'payload': {'from': 'WORKING', 'to': 'IN_REVIEW'}
})
```

### With review-gate (skills-byq)

Review requests/results via messages:

```python
publish(db, worker_id, {
    'type': 'review_request',
    'correlation_id': task_id
})
```

### With Observability (skills-yak)

Status command queries SQLite directly:

```sql
-- Liveness comes from the heartbeats table (see Liveness Query), not messages
SELECT from_agent, type, ts_ms, payload
FROM messages
WHERE type = 'state_change'
ORDER BY seq DESC
LIMIT 100;
```

## Open Questions

1. ~~**Database location**~~: `.worker-state/bus.db` (resolved)
2. **Export frequency**: On-commit hook vs periodic background? (defer to implementation)
3. **Message retention**: Archive messages older than min cursor? Vacuum schedule?
4. ~~**Schema migrations**~~: `meta.schema_version` table added; agents check on startup
5. **Security**: All agents currently trusted; add HMAC signing if isolation needed?
6. **Backpressure**: How to slow producers when consumers overwhelmed?

## Resolved from Spec Review

| Issue | Resolution |
|-------|------------|
| `seq` not auto-increment | Changed to `INTEGER PRIMARY KEY AUTOINCREMENT` |
| At-most-once delivery | Cursor now advanced only after ack |
| Heartbeats in messages table | Separate `heartbeats` table with upsert |
| Heartbeats block during LLM | Dedicated `heartbeatWorker` thread runs independently |
| JSONL line-count broken | `export_state` table tracks last seq |
| Timestamp format unreliable | Changed to epoch milliseconds |
| Zombie task claims | `task_claims` table with lease expiry |
| Blob writes not atomic | Temp file + fsync + rename pattern |

## Nim Implementation Notes

### Dependencies

```nim
# nimble install
requires "tiny_sqlite >= 0.2.0"
requires "jsony >= 1.1.0"  # Optional, faster JSON
```

### Build Configuration

```nim
# config.nims or bus.nim.cfg
--mm:orc
--threads:on
-d:release

# Static SQLite (single binary).
# These pragmas belong in a Nim source file, not in the config file:
{.passC: "-DSQLITE_THREADSAFE=1".}
{.compile: "libs/sqlite3.c".}
```

### Project Structure

```
src/
├── libs/sqlite3.c    # SQLite amalgamation
├── db_layer.nim      # Connection setup, PRAGMAs
├── bus.nim           # Publish, poll, ack
├── heartbeat.nim     # Dedicated thread + channel
├── claims.nim        # Task claim with leases
└── export.nim        # JSONL export
```

### Key Patterns

1. **One connection per thread** - never share `DbConn`
2. **Channels for control** - stop signals, status updates
3. **Short transactions** - BEGIN IMMEDIATE, do work, COMMIT quickly
4. **Exceptions for errors** - catch `SqliteError` (tiny_sqlite's error type), convert to meaningful messages

## References

- Beads: https://github.com/steveyegge/beads
- Tissue: https://github.com/evil-mind-evil-sword/tissue
- SQLite WAL mode: https://sqlite.org/wal.html
- SQLite BEGIN IMMEDIATE: https://sqlite.org/lang_transaction.html
- tiny_sqlite: https://github.com/GULPF/tiny_sqlite