skills/docs/design/message-passing-layer.md
dan 1c66d019bd feat: add worker CLI scaffold in Nim
Multi-agent coordination CLI with SQLite message bus:
- State machine: ASSIGNED -> WORKING -> IN_REVIEW -> APPROVED -> COMPLETED
- Commands: spawn, start, done, approve, merge, cancel, fail, heartbeat
- SQLite WAL mode, dedicated heartbeat thread, channel-based IPC
- cligen for CLI, tiny_sqlite for DB, ORC memory management

Design docs for branch-per-worker, state machine, message passing,
and human observability patterns.

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
2026-01-10 18:47:47 -08:00


Message Passing Layer Design

Status: Draft (v4 - Nim implementation)
Bead: skills-ms5
Epic: skills-s6y (Multi-agent orchestration: Lego brick architecture)
Language: Nim (ORC, threads, tiny_sqlite)

Changelog

  • v4: Nim implementation details (tiny_sqlite, dedicated heartbeat thread, channels)
  • v3: Fixed BLOCKERs from orch spec-review (seq autoincrement, ack semantics, heartbeats table, epoch timestamps, JSONL export, task leases)
  • v2: Changed to SQLite primary (was JSONL+flock)

Overview

This document defines the local message passing layer for multi-agent coordination. Agents communicate through a shared SQLite database with ACID guarantees. A JSONL export supports human debugging and keeps a readable history in git.

Design Principles

  1. SQLite as source of truth - Atomic commits, crash recovery, indexed queries
  2. JSONL for observability - Human-readable export for debugging and git diffs
  3. File-based, no network - Any agent with SQLite bindings can participate
  4. Liveness detection - Heartbeats to detect dead agents
  5. Blob storage for large payloads - Keep database lean

Why SQLite Over JSONL+flock

| Concern | JSONL+flock | SQLite |
|---|---|---|
| Crash mid-write | Corrupted log, manual recovery | Atomic commit, automatic rollback |
| Payload >4KB | Interleaved writes possible | Always atomic |
| Query efficiency | O(N) scan | O(log N) with index |
| Portability | POSIX flock only | Works everywhere |
| Complexity | DIY recovery logic | Built-in WAL, transactions |

See docs/design/message-passing-comparison.md for full analysis.

Prior Art

| System | Approach | Key Insight |
|---|---|---|
| Beads | SQLite + JSONL export | SQLite for coordination, JSONL for git |
| Tissue | JSONL primary, SQLite cache | Simpler but less safe |
| Kafka | Append-only log, offsets | Total ordering, replay |
| Redis Streams | Consumer groups, claims | First claim wins |

Directory Structure

.worker-state/
├── bus.db                # SQLite - source of truth (gitignored)
├── bus.jsonl             # JSONL export for debugging/git (read-only)
├── blobs/                # Content-addressable storage for large payloads
│   └── sha256-a1b2c3...
└── workers/
    └── worker-auth.json  # Worker state (separate from messages)

Database Schema

-- Messages table (seq is rowid-based for true auto-increment)
CREATE TABLE messages (
    seq INTEGER PRIMARY KEY AUTOINCREMENT,  -- Guaranteed monotonic, auto-assigned
    id TEXT NOT NULL UNIQUE,                -- UUID for idempotent retries
    ts_ms INTEGER NOT NULL,                 -- Epoch milliseconds (reliable comparisons)
    from_agent TEXT NOT NULL,
    to_agent TEXT,                          -- NULL = broadcast
    type TEXT NOT NULL,
    correlation_id TEXT,
    in_reply_to TEXT,
    payload TEXT,                           -- JSON blob or NULL
    payload_ref TEXT,                       -- Blob hash for large payloads
    CHECK (payload IS NULL OR payload_ref IS NULL)  -- Not both
);

CREATE INDEX idx_messages_to_seq ON messages(to_agent, seq);
CREATE INDEX idx_messages_correlation_seq ON messages(correlation_id, seq);
CREATE INDEX idx_messages_type_from_ts ON messages(type, from_agent, ts_ms);

-- Cursors table (per-agent read position)
-- NOTE: Cursor is only advanced AFTER successful processing (at-least-once)
CREATE TABLE cursors (
    agent_id TEXT PRIMARY KEY,
    last_acked_seq INTEGER NOT NULL DEFAULT 0,  -- Last successfully processed
    updated_at_ms INTEGER NOT NULL
);

-- Heartbeats table (separate from messages to avoid write contention)
-- One row per agent, upsert pattern
CREATE TABLE heartbeats (
    agent_id TEXT PRIMARY KEY,
    ts_ms INTEGER NOT NULL,
    status TEXT NOT NULL,              -- idle, working, blocked
    current_task TEXT,
    progress REAL
);

-- Task claims with lease expiry (prevents zombie claims)
CREATE TABLE task_claims (
    task_id TEXT PRIMARY KEY,
    claimed_by TEXT NOT NULL,
    claimed_at_ms INTEGER NOT NULL,
    lease_until_ms INTEGER NOT NULL    -- Must renew or release
);

-- Schema versioning for migrations
CREATE TABLE meta (
    key TEXT PRIMARY KEY,
    value TEXT NOT NULL
);
INSERT INTO meta (key, value) VALUES ('schema_version', '1');

-- Export tracking (not line count!)
CREATE TABLE export_state (
    id INTEGER PRIMARY KEY CHECK (id = 1),
    last_seq INTEGER NOT NULL DEFAULT 0
);
INSERT INTO export_state (id, last_seq) VALUES (1, 0);
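
The two constraints on the messages table that the rest of the design leans on (UNIQUE id for idempotent retries, and the payload/payload_ref CHECK) can be exercised in isolation. The following is a minimal sketch against an in-memory database with a trimmed-down copy of the table; the insert() helper and the 'demo-agent' and 'sha256-demo' values are made up for illustration and are not part of the design.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("""
    CREATE TABLE messages (
        seq INTEGER PRIMARY KEY AUTOINCREMENT,
        id TEXT NOT NULL UNIQUE,
        ts_ms INTEGER NOT NULL,
        from_agent TEXT NOT NULL,
        type TEXT NOT NULL,
        payload TEXT,
        payload_ref TEXT,
        CHECK (payload IS NULL OR payload_ref IS NULL)
    )
""")

def insert(msg_id, payload=None, payload_ref=None):
    """Insert one row. Returns the assigned seq, or None if a constraint rejects it."""
    try:
        with conn:  # commits on success, rolls back on exception
            cur = conn.execute(
                "INSERT INTO messages (id, ts_ms, from_agent, type, payload, payload_ref) "
                "VALUES (?, 0, 'demo-agent', 'task_done', ?, ?)",
                (msg_id, payload, payload_ref),
            )
            return cur.lastrowid
    except sqlite3.IntegrityError:
        return None

first = insert("msg-1", payload='{"ok": true}')
retry = insert("msg-1", payload='{"ok": true}')                  # duplicate id: rejected
both = insert("msg-2", payload="{}", payload_ref="sha256-demo")  # violates the CHECK
second = insert("msg-3", payload_ref="sha256-demo")              # ref only: accepted
```

Only the first and last inserts produce rows. The retried publish is rejected by the UNIQUE constraint rather than creating a second copy, which is the behavior the publish path relies on.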

Connection Setup (REQUIRED)

PRAGMAs must be set on every connection. Each thread must have its own connection.

import tiny_sqlite

proc openBusDb*(dbPath: string): DbConn =
  ## Open database with required PRAGMAs. One connection per thread.
  result = openDatabase(dbPath)
  result.exec("PRAGMA busy_timeout = 5000")
  result.exec("PRAGMA foreign_keys = ON")
  result.exec("PRAGMA journal_mode = WAL")
  result.exec("PRAGMA synchronous = NORMAL")

Critical: Do NOT share DbConn across threads. Each thread opens its own connection.
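
The Python examples in this document call get_connection(), which is not defined here. A minimal sketch of what such a helper might look like, mirroring the PRAGMAs in openBusDb, is shown below. The function body and the timeout_ms parameter are assumptions for illustration; only the PRAGMA values come from this document.

```python
import sqlite3

def get_connection(db_path, timeout_ms=5000):
    """Open a connection with the PRAGMAs this design requires (illustrative sketch)."""
    # Default transaction handling is kept, so the explicit commit()/rollback()
    # calls in the other examples behave as written.
    conn = sqlite3.connect(db_path, timeout=timeout_ms / 1000)
    conn.row_factory = sqlite3.Row  # lets callers use row['column'] access
    conn.execute(f"PRAGMA busy_timeout = {int(timeout_ms)}")
    conn.execute("PRAGMA foreign_keys = ON")
    conn.execute("PRAGMA journal_mode = WAL")
    conn.execute("PRAGMA synchronous = NORMAL")
    return conn
```

As with the Nim version, each thread (and each call in the examples) opens its own connection rather than sharing one.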

Message Schema

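Messages keep the same logical fields as in earlier versions of this design and are stored in the messages table (from and to map to the from_agent and to_agent columns; timestamps are epoch milliseconds):

{
  "id": "uuid-v4",
  "ts_ms": 1768053600000,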
  "from": "worker-auth",
  "to": "orchestrator",
  "type": "task_done",
  "correlation_id": "skills-abc",
  "payload": {
    "branch": "worker-auth/skills-abc",
    "commit": "a1b2c3d4"
  }
}

Message Types

Task Lifecycle:

  • task_assign - Orchestrator assigns task to worker
  • task_claim - Worker claims a task (first wins)
  • task_progress - Worker reports progress
  • task_done - Worker completed task
  • task_failed - Worker failed task
  • task_blocked - Worker blocked on dependency

Coordination:

  • heartbeat - Liveness signal
  • state_change - Worker state transition
  • review_request - Request review of work
  • review_result - Review approved/rejected

System:

  • agent_started - Agent came online
  • agent_stopped - Agent shutting down gracefully

Delivery Semantics

This layer provides AT-LEAST-ONCE delivery.

  • Messages are durable once committed
  • Cursor only advances after successful processing
  • Handlers MUST be idempotent or deduplicate by message.id
  • Out-of-order processing is possible between unrelated messages
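
Because a message can be redelivered after a crash or a failed ack, handlers need some form of deduplication. One way to do it, sketched here under the assumption that an in-memory set is acceptable, is to wrap the handler and remember processed ids. make_idempotent is a hypothetical helper, and an agent that must survive restarts would need to persist the seen ids (for example in its own SQLite table).

```python
def make_idempotent(handler, seen_ids=None):
    """Wrap handler so each message id is processed at most once per process."""
    seen = set() if seen_ids is None else seen_ids

    def wrapped(msg):
        if msg["id"] in seen:
            return False          # duplicate delivery: skip, but safe to ack
        handler(msg)              # may raise; the id is then NOT recorded
        seen.add(msg["id"])       # record only after the handler succeeded
        return True

    return wrapped
```

A failed handler leaves the id unrecorded, so the redelivered message is attempted again, which matches the at-least-once contract above.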

Write Protocol

Publishing Messages

import sqlite3
import uuid
import json
import time

def publish(db_path, agent_id, message):
    msg_id = message.get('id') or str(uuid.uuid4())
    ts_ms = int(time.time() * 1000)  # Epoch milliseconds

    # Handle large payloads
    payload = message.get('payload')
    payload_ref = None
    if payload is not None:  # Also serialize falsy payloads such as {} or 0
        payload_json = json.dumps(payload)
        if len(payload_json) > 4096:
            payload_ref = save_blob(payload)  # Atomic write (see Blob Storage)
            payload = None
        else:
            payload = payload_json

    conn = get_connection(db_path)
    try:
        # Use plain BEGIN for simple inserts (less contention than IMMEDIATE)
        # seq is auto-assigned by SQLite
        conn.execute("""
            INSERT INTO messages (id, ts_ms, from_agent, to_agent, type,
                                  correlation_id, in_reply_to, payload, payload_ref)
            VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)
        """, (
            msg_id,
            ts_ms,
            agent_id,
            message.get('to'),
            message['type'],
            message.get('correlation_id'),
            message.get('in_reply_to'),
            payload,
            payload_ref
        ))
        conn.commit()
    except sqlite3.IntegrityError:
        # Duplicate id = already published (idempotent retry)
        conn.rollback()
    except Exception:
        conn.rollback()
        raise
    finally:
        conn.close()

    return msg_id

Write Contention

  • SQLite WAL allows concurrent readers but one writer at a time
  • busy_timeout = 5000 handles short contention
  • For high-frequency writes, add retry with exponential backoff
  • Heartbeats use separate table to avoid contending with task messages
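
The retry-with-backoff suggestion above could look roughly like the following. This is a sketch, not part of the design: with_retry, its parameters, and the choice to match on the "locked"/"busy" text of sqlite3.OperationalError are all assumptions.

```python
import random
import sqlite3
import time

def with_retry(operation, attempts=5, base_delay=0.05, max_delay=1.0, sleep=time.sleep):
    """Call operation(); retry on lock contention with exponential backoff and jitter."""
    for attempt in range(attempts):
        try:
            return operation()
        except sqlite3.OperationalError as exc:
            text = str(exc).lower()
            if "locked" not in text and "busy" not in text:
                raise                      # not contention: surface immediately
            if attempt == attempts - 1:
                raise                      # out of attempts
            delay = min(base_delay * (2 ** attempt), max_delay)
            sleep(delay * random.uniform(0.5, 1.0))  # jitter avoids lockstep retries
```

A caller would wrap the write, for example with_retry(lambda: publish(db_path, agent_id, message)). Since publish is idempotent on message id, retrying it is safe as long as the caller supplies the id.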

Read Protocol

Polling for New Messages (At-Least-Once)

CRITICAL: Do NOT update cursor until message is successfully processed.

def poll(db_path, agent_id, limit=100):
    """Fetch unacknowledged messages. Does NOT advance cursor."""
    conn = get_connection(db_path)

    # Get cursor position
    cursor = conn.execute(
        "SELECT last_acked_seq FROM cursors WHERE agent_id = ?",
        (agent_id,)
    ).fetchone()
    last_seq = cursor['last_acked_seq'] if cursor else 0

    # Fetch new messages for this agent (or broadcasts)
    rows = conn.execute("""
        SELECT * FROM messages
        WHERE seq > ?
          AND (to_agent IS NULL OR to_agent = ?)
        ORDER BY seq
        LIMIT ?
    """, (last_seq, agent_id, limit)).fetchall()

    conn.close()

    # Convert to dicts, resolve blob refs
    messages = []
    for row in rows:
        msg = dict(row)
        if msg['payload_ref']:
            try:
                msg['payload'] = load_blob(msg['payload_ref'])
            except FileNotFoundError:
                msg['payload_error'] = 'blob_missing'
        elif msg['payload']:
            try:
                msg['payload'] = json.loads(msg['payload'])
            except json.JSONDecodeError:
                msg['payload_error'] = 'decode_failed'
        messages.append(msg)

    return messages


def ack(db_path, agent_id, seq):
    """Acknowledge successful processing of message. Advances cursor."""
    conn = get_connection(db_path)
    ts_ms = int(time.time() * 1000)

    # Only advance forward (monotonic)
    conn.execute("""
        INSERT INTO cursors (agent_id, last_acked_seq, updated_at_ms)
        VALUES (?, ?, ?)
        ON CONFLICT(agent_id) DO UPDATE SET
            last_acked_seq = MAX(cursors.last_acked_seq, excluded.last_acked_seq),
            updated_at_ms = excluded.updated_at_ms
    """, (agent_id, seq, ts_ms))
    conn.commit()
    conn.close()

Processing Loop

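import logging
import time

log = logging.getLogger(__name__)

def process_loop(db_path, agent_id, handler, stop_event, poll_interval=0.5):
    """Process messages with at-least-once semantics until stop_event is set.

    stop_event is assumed to be a threading.Event used as the shutdown signal.
    """
    while not stop_event.is_set():
        messages = poll(db_path, agent_id)
        for msg in messages:
            try:
                handler(msg)
                ack(db_path, agent_id, msg['seq'])  # Only after success
            except Exception as e:
                # Don't ack - message will be redelivered
                log.error(f"Failed to process {msg['id']}: {e}")
                break  # Stop batch on failure

        time.sleep(poll_interval)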

Adaptive Polling

POLL_ACTIVE = 0.2    # 200ms when working
POLL_IDLE = 2.0      # 2s when idle
POLL_BACKOFF = 1.5   # Multiplier on each empty poll

poll_interval = POLL_ACTIVE

while running:
    messages = poll(db_path, my_id)
    if messages:
        process(messages)
        poll_interval = POLL_ACTIVE  # Reset to fast
    else:
        poll_interval = min(poll_interval * POLL_BACKOFF, POLL_IDLE)

    time.sleep(poll_interval)
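
To see how quickly the interval ramps from active to idle, the update rule can be pulled into a pure function. This is only a restatement of the loop above for illustration; next_poll_interval and idle_ramp are hypothetical names.

```python
POLL_ACTIVE = 0.2    # seconds, used right after messages arrive
POLL_IDLE = 2.0      # seconds, ceiling when nothing is happening
POLL_BACKOFF = 1.5   # multiplier applied on each empty poll

def next_poll_interval(current, got_messages):
    """Return the sleep interval to use after one poll."""
    if got_messages:
        return POLL_ACTIVE
    return min(current * POLL_BACKOFF, POLL_IDLE)

def idle_ramp():
    """List the intervals used on consecutive empty polls until the ceiling is hit."""
    intervals = []
    interval = POLL_ACTIVE
    while interval < POLL_IDLE:
        interval = next_poll_interval(interval, got_messages=False)
        intervals.append(interval)
    return intervals
```

With these constants the ceiling is reached on the sixth consecutive empty poll, and a single non-empty poll drops the interval straight back to 200ms.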

Heartbeat Protocol

Heartbeats use a separate table with upsert to avoid contending with task messages.

CRITICAL: Heartbeats MUST run in a dedicated thread to avoid false death detection during blocking operations (LLM calls can take 60s+).

Nim Implementation

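Nim's thread model discourages sharing mutable state directly between threads (the compiler's GC-safety checks flag access to GC'd globals from thread procs). Use a channel to pass commands to the heartbeat thread: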

import std/[times, os]  # Channel[T] is provided by system when built with --threads:on
import tiny_sqlite

type
  HeartbeatCmd* = enum
    hbUpdateStatus, hbStop

  HeartbeatMsg* = object
    cmd*: HeartbeatCmd
    status*: string
    task*: string
    progress*: float

var heartbeatChan*: Channel[HeartbeatMsg]

proc writeHeartbeat(db: DbConn, agentId, status, task: string, progress: float) =
  let t = getTime()  # Read the clock once so seconds and nanoseconds agree
  let tsMs = t.toUnix * 1000 + int64(t.nanosecond div 1_000_000)
  db.exec("""
    INSERT INTO heartbeats (agent_id, ts_ms, status, current_task, progress)
    VALUES (?, ?, ?, ?, ?)
    ON CONFLICT(agent_id) DO UPDATE SET
      ts_ms = excluded.ts_ms,
      status = excluded.status,
      current_task = excluded.current_task,
      progress = excluded.progress
  """, agentId, tsMs, status, task, progress)

proc heartbeatWorker*(args: tuple[dbPath, agentId: string]) {.thread.} =
  ## Dedicated heartbeat thread - owns its own DB connection
  let db = openBusDb(args.dbPath)
  var status = "idle"
  var task = ""
  var progress = 0.0

  while true:
    # Check for commands (non-blocking)
    let tried = heartbeatChan.tryRecv()
    if tried.dataAvailable:
      case tried.msg.cmd
      of hbStop:
        db.close()
        return
      of hbUpdateStatus:
        status = tried.msg.status
        task = tried.msg.task
        progress = tried.msg.progress

    # Write heartbeat
    try:
      writeHeartbeat(db, args.agentId, status, task, progress)
    except CatchableError:
      discard  # Swallow transient DB errors (e.g. busy); the next beat retries

    sleep(10_000)  # 10 seconds

# Usage from main thread:
proc startHeartbeat*(dbPath, agentId: string): Thread[tuple[dbPath, agentId: string]] =
  heartbeatChan.open()
  createThread(result, heartbeatWorker, (dbPath, agentId))

proc updateHeartbeatStatus*(status, task: string, progress: float) =
  heartbeatChan.send(HeartbeatMsg(cmd: hbUpdateStatus, status: status, task: task, progress: progress))

proc stopHeartbeat*() =
  heartbeatChan.send(HeartbeatMsg(cmd: hbStop))

Heartbeat Rules

  1. Emit every 10 seconds via background thread
  2. Include current status: idle, working, blocked
  3. Stale thresholds: 30s = WARN, 100s = STALE, 5min = DEAD

Liveness Query

-- Find suspected dead agents (no heartbeat in 30s)
-- Uses epoch ms for reliable comparison
SELECT agent_id, ts_ms,
       (strftime('%s', 'now') * 1000 - ts_ms) / 1000 AS age_seconds
FROM heartbeats
WHERE ts_ms < (strftime('%s', 'now') * 1000 - 30000);
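
The query above only covers the 30s threshold. A monitor that wants all three levels from the Heartbeat Rules could classify each row in application code, roughly as sketched here. classify_liveness, the "OK" label, and the choice to treat each threshold as inclusive are assumptions; the document only names WARN, STALE, and DEAD.

```python
WARN_MS = 30_000        # 30s
STALE_MS = 100_000      # 100s
DEAD_MS = 5 * 60_000    # 5min

def classify_liveness(last_ts_ms, now_ms):
    """Map the age of an agent's last heartbeat to a liveness level."""
    age_ms = now_ms - last_ts_ms
    if age_ms >= DEAD_MS:
        return "DEAD"
    if age_ms >= STALE_MS:
        return "STALE"
    if age_ms >= WARN_MS:
        return "WARN"
    return "OK"
```

With a 10s heartbeat period, an agent has to miss roughly three beats in a row before it reaches WARN.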

Task Claim Pattern

Task claims use a dedicated table with leases to prevent zombie claims when agents die.

LEASE_DURATION_MS = 60000  # 60 seconds

def try_claim_task(db_path, task_id, agent_id):
    """Attempt to claim a task. Returns True if successful."""
    conn = get_connection(db_path)
    ts_ms = int(time.time() * 1000)
    lease_until = ts_ms + LEASE_DURATION_MS

    try:
        conn.execute("BEGIN IMMEDIATE")

        # Check for existing valid claim
        existing = conn.execute("""
            SELECT claimed_by, lease_until_ms FROM task_claims
            WHERE task_id = ?
        """, (task_id,)).fetchone()

        if existing:
            if existing['lease_until_ms'] > ts_ms:
                # Valid claim exists
                conn.rollback()
                return False
            else:
                # Expired claim - delete and reclaim
                conn.execute("DELETE FROM task_claims WHERE task_id = ?", (task_id,))

        # Insert new claim
        conn.execute("""
            INSERT INTO task_claims (task_id, claimed_by, claimed_at_ms, lease_until_ms)
            VALUES (?, ?, ?, ?)
        """, (task_id, agent_id, ts_ms, lease_until))

        conn.commit()
        return True

    except sqlite3.OperationalError:
        # Lock contention - another agent got there first
        conn.rollback()
        return False
    finally:
        conn.close()


def renew_claim(db_path, task_id, agent_id):
    """Renew lease on a claimed task. Call periodically while working."""
    conn = get_connection(db_path)
    ts_ms = int(time.time() * 1000)
    lease_until = ts_ms + LEASE_DURATION_MS

    result = conn.execute("""
        UPDATE task_claims
        SET lease_until_ms = ?
        WHERE task_id = ? AND claimed_by = ?
    """, (lease_until, task_id, agent_id))
    conn.commit()
    updated = result.rowcount > 0
    conn.close()
    return updated


def release_claim(db_path, task_id, agent_id):
    """Release a claim when done (success or failure)."""
    conn = get_connection(db_path)
    conn.execute("""
        DELETE FROM task_claims
        WHERE task_id = ? AND claimed_by = ?
    """, (task_id, agent_id))
    conn.commit()
    conn.close()

Claim Lifecycle

  1. Agent calls try_claim_task() - gets exclusive claim with 60s lease
  2. Agent periodically calls renew_claim() (every 30s) while working
  3. On completion, agent calls release_claim()
  4. If agent dies, lease expires and task becomes claimable again
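
Step 2 of the lifecycle (periodic renewal) has to keep running while the agent is blocked on long work, much like heartbeats. One possible shape for that, sketched with a background thread, is shown below. start_lease_renewer is a hypothetical helper; it takes any zero-argument callable, so in practice it might be given lambda: renew_claim(db_path, task_id, agent_id).

```python
import threading

def start_lease_renewer(renew, interval_s=30.0):
    """Call renew() every interval_s seconds until stopped or the claim is lost.

    Returns (stop, lost, thread): set `stop` to end renewal; `lost` is set if
    renew() ever returns False (the claim row is gone or owned by someone else).
    """
    stop = threading.Event()
    lost = threading.Event()

    def run():
        # Event.wait returns False on timeout, True once stop is set
        while not stop.wait(interval_s):
            if not renew():
                lost.set()
                return

    thread = threading.Thread(target=run, daemon=True)
    thread.start()
    return stop, lost, thread
```

The worker would check lost before committing results, since a lost lease means another agent may already be working on the same task. Renewing every 30s against a 60s lease leaves room for one missed renewal.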

JSONL Export for Debugging

Background process or post-commit hook exports to JSONL.

CRITICAL: Track last exported seq in database, NOT by counting lines. Seq values can have gaps from rollbacks.

def export_to_jsonl(db_path, jsonl_path):
    """Export new messages to JSONL. Idempotent, crash-safe."""
    conn = get_connection(db_path)

    # Get last exported seq from database
    row = conn.execute(
        "SELECT last_seq FROM export_state WHERE id = 1"
    ).fetchone()
    last_exported = row['last_seq'] if row else 0

    # Export new messages
    rows = conn.execute(
        "SELECT * FROM messages WHERE seq > ? ORDER BY seq",
        (last_exported,)
    ).fetchall()

    if not rows:
        conn.close()
        return

    max_seq = last_exported
    with open(jsonl_path, 'a') as f:
        for row in rows:
            msg = dict(row)
            # Optionally resolve blob refs for full content
            f.write(json.dumps(msg) + '\n')
            max_seq = max(max_seq, msg['seq'])

    # Advance export state only after the lines are written: a crash in between
    # re-exports those rows on the next run (possible duplicates, never gaps)
    conn.execute("""
        UPDATE export_state SET last_seq = ? WHERE id = 1
    """, (max_seq,))
    conn.commit()
    conn.close()

This gives you tail -f bus.jsonl for debugging while SQLite remains source of truth.

Note: JSONL contains blob references, not resolved payloads. For full content, use SQLite queries.

Blob Storage

Content-addressable storage for large payloads. Uses atomic write pattern (temp + rename) to prevent partial reads.

import hashlib
import json
import os
import tempfile

def save_blob(content):
    """Atomically write blob. Returns ref or raises."""
    data = json.dumps(content).encode()
    hash_hex = hashlib.sha256(data).hexdigest()
    ref = f"sha256-{hash_hex}"
    path = f".worker-state/blobs/{ref}"

    if os.path.exists(path):
        return ref  # Already exists (content-addressable = idempotent)

    # Atomic write: temp file + fsync + rename
    os.makedirs(os.path.dirname(path), exist_ok=True)
    fd, tmp_path = tempfile.mkstemp(dir=os.path.dirname(path))
    try:
        with os.fdopen(fd, 'wb') as f:
            f.write(data)
            f.flush()
            os.fsync(f.fileno())
        os.replace(tmp_path, path)  # Atomic on POSIX
    except:
        if os.path.exists(tmp_path):
            os.unlink(tmp_path)
        raise

    return ref


def load_blob(ref):
    """Load blob content. Raises FileNotFoundError if missing."""
    path = f".worker-state/blobs/{ref}"
    with open(path, 'rb') as f:
        return json.load(f)
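
Because the ref is derived from the content, a reader can also check a blob's integrity before trusting it. The sketch below shows that check on raw bytes; blob_ref_for and verify_blob are hypothetical helpers, not functions defined by this design, and they assume the same "sha256-" prefix that save_blob uses.

```python
import hashlib

def blob_ref_for(data: bytes) -> str:
    """Compute the content-addressed ref for raw blob bytes."""
    return "sha256-" + hashlib.sha256(data).hexdigest()

def verify_blob(ref: str, data: bytes) -> bool:
    """Return True if data hashes to ref, i.e. the blob was not corrupted or swapped."""
    return blob_ref_for(data) == ref
```

A stricter load_blob could read the file as bytes, call verify_blob, and only then decode the JSON.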

Phased Implementation

Phase 0: MVP

  • SQLite with WAL mode
  • Basic publish/poll
  • Heartbeats every 10s
  • Simple polling (500ms)

Phase 1: Robustness

  • Task claim with transactions
  • JSONL export for debugging
  • Blob storage for large payloads
  • Dead agent detection

Phase 2: Performance

  • FTS5 for full-text search
  • Message compaction/archival
  • Adaptive polling

Phase 3: Scale (if ever)

  • Read replicas
  • Sharding by correlation_id
  • Background workers for maintenance

Integration Points

With Worker State Machine (skills-4oj)

State transitions publish messages:

publish(db, worker_id, {
    'type': 'state_change',
    'payload': {'from': 'WORKING', 'to': 'IN_REVIEW'}
})

With review-gate (skills-byq)

Review requests/results via messages:

publish(db, worker_id, {
    'type': 'review_request',
    'correlation_id': task_id
})

With Observability (skills-yak)

Status command queries SQLite directly:

SELECT from_agent, type, ts_ms, payload
FROM messages
WHERE type IN ('heartbeat', 'state_change')
ORDER BY seq DESC LIMIT 100;

Open Questions

  1. Database location: .worker-state/bus.db (resolved)
  2. Export frequency: On-commit hook vs periodic background? (defer to implementation)
  3. Message retention: Archive messages older than min cursor? Vacuum schedule?
  4. Schema migrations: meta.schema_version table added; agents check on startup
  5. Security: All agents currently trusted; add HMAC signing if isolation needed?
  6. Backpressure: How to slow producers when consumers overwhelmed?

Resolved from Spec Review

| Issue | Resolution |
|---|---|
| seq not auto-increment | Changed to INTEGER PRIMARY KEY AUTOINCREMENT |
| At-most-once delivery | Cursor now advanced only after ack |
| Heartbeats in messages table | Separate heartbeats table with upsert |
| Heartbeats block during LLM | HeartbeatThread runs independently |
| JSONL line-count broken | export_state table tracks last seq |
| Timestamp format unreliable | Changed to epoch milliseconds |
| Zombie task claims | task_claims table with lease expiry |
| Blob writes not atomic | Temp file + fsync + rename pattern |

Nim Implementation Notes

Dependencies

# nimble install
requires "tiny_sqlite >= 0.2.0"
requires "jsony >= 1.1.0"  # Optional, faster JSON

Build Configuration

# config.nims or bus.nim.cfg
--mm:orc
--threads:on
--define:release

# Static SQLite (single binary). These pragmas go in a Nim source file, not the config:
{.passC: "-DSQLITE_THREADSAFE=1".}
{.compile: "libs/sqlite3.c".}

Project Structure

src/
├── libs/sqlite3.c       # SQLite amalgamation
├── db_layer.nim         # Connection setup, PRAGMAs
├── bus.nim              # Publish, poll, ack
├── heartbeat.nim        # Dedicated thread + channel
├── claims.nim           # Task claim with leases
└── export.nim           # JSONL export

Key Patterns

  1. One connection per thread - never share DbConn
  2. Channels for control - stop signals, status updates
  3. Short transactions - BEGIN IMMEDIATE, do work, COMMIT quickly
  4. Exceptions for errors - catch DbError, convert to meaningful messages

References