Message Passing Layer Design
Status: Draft (v4 - Nim implementation)
Bead: skills-ms5
Epic: skills-s6y (Multi-agent orchestration: Lego brick architecture)
Language: Nim (ORC, threads, tiny_sqlite)
Changelog
- v4: Nim implementation details (tiny_sqlite, dedicated heartbeat thread, channels)
- v3: Fixed BLOCKERs from orch spec-review (seq autoincrement, ack semantics, heartbeats table, epoch timestamps, JSONL export, task leases)
- v2: Changed to SQLite primary (was JSONL+flock)
Overview
This document defines the local message passing layer for multi-agent coordination. Agents communicate through a shared SQLite database with ACID guarantees. A JSONL export provides a human-readable log for debugging and for git history.
Design Principles
- SQLite as source of truth - Atomic commits, crash recovery, indexed queries
- JSONL for observability - Human-readable export for debugging and git diffs
- File-based, no network - Any agent with SQLite bindings can participate
- Liveness detection - Heartbeats to detect dead agents
- Blob storage for large payloads - Keep database lean
Why SQLite Over JSONL+flock
| Concern | JSONL+flock | SQLite |
|---|---|---|
| Crash mid-write | Corrupted log, manual recovery | Atomic commit, automatic rollback |
| Payload >4KB | Interleaved writes possible | Always atomic |
| Query efficiency | O(N) scan | O(log N) with index |
| Portability | POSIX flock only | Works everywhere |
| Complexity | DIY recovery logic | Built-in WAL, transactions |
See docs/design/message-passing-comparison.md for full analysis.
Prior Art
| System | Approach | Key Insight |
|---|---|---|
| Beads | SQLite + JSONL export | SQLite for coordination, JSONL for git |
| Tissue | JSONL primary, SQLite cache | Simpler but less safe |
| Kafka | Append-only log, offsets | Total ordering, replay |
| Redis Streams | Consumer groups, claims | First claim wins |
Directory Structure
```
.worker-state/
├── bus.db                # SQLite - source of truth (gitignored)
├── bus.jsonl             # JSONL export for debugging/git (read-only)
├── blobs/                # Content-addressable storage for large payloads
│   └── sha256-a1b2c3...
└── workers/
    └── worker-auth.json  # Worker state (separate from messages)
```
Database Schema
```sql
-- Messages table (seq is rowid-based for true auto-increment)
CREATE TABLE messages (
    seq            INTEGER PRIMARY KEY AUTOINCREMENT, -- Monotonic, never reused, may have gaps
    id             TEXT NOT NULL UNIQUE,              -- UUID for idempotent retries
    ts_ms          INTEGER NOT NULL,                  -- Epoch milliseconds (reliable comparisons)
    from_agent     TEXT NOT NULL,
    to_agent       TEXT,                              -- NULL = broadcast
    type           TEXT NOT NULL,
    correlation_id TEXT,
    in_reply_to    TEXT,
    payload        TEXT,                              -- JSON blob or NULL
    payload_ref    TEXT,                              -- Blob hash for large payloads
    CHECK (payload IS NULL OR payload_ref IS NULL)    -- Not both
);
CREATE INDEX idx_messages_to_seq ON messages(to_agent, seq);
CREATE INDEX idx_messages_correlation_seq ON messages(correlation_id, seq);
CREATE INDEX idx_messages_type_from_ts ON messages(type, from_agent, ts_ms);

-- Cursors table (per-agent read position)
-- NOTE: Cursor is only advanced AFTER successful processing (at-least-once)
CREATE TABLE cursors (
    agent_id       TEXT PRIMARY KEY,
    last_acked_seq INTEGER NOT NULL DEFAULT 0,  -- Last successfully processed
    updated_at_ms  INTEGER NOT NULL
);

-- Heartbeats table (separate from messages so liveness updates don't grow the message log)
-- One row per agent, upsert pattern
CREATE TABLE heartbeats (
    agent_id     TEXT PRIMARY KEY,
    ts_ms        INTEGER NOT NULL,
    status       TEXT NOT NULL,  -- idle, working, blocked
    current_task TEXT,
    progress     REAL
);

-- Task claims with lease expiry (prevents zombie claims)
CREATE TABLE task_claims (
    task_id        TEXT PRIMARY KEY,
    claimed_by     TEXT NOT NULL,
    claimed_at_ms  INTEGER NOT NULL,
    lease_until_ms INTEGER NOT NULL  -- Must renew or release
);

-- Schema versioning for migrations
CREATE TABLE meta (
    key   TEXT PRIMARY KEY,
    value TEXT NOT NULL
);
INSERT INTO meta (key, value) VALUES ('schema_version', '1');

-- Export tracking (not line count!)
CREATE TABLE export_state (
    id       INTEGER PRIMARY KEY CHECK (id = 1),
    last_seq INTEGER NOT NULL DEFAULT 0
);
INSERT INTO export_state (id, last_seq) VALUES (1, 0);
```
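The Open Questions list says agents check meta.schema_version on startup. The following is a minimal Python sketch of that check. It assumes the DDL above is passed in as one script. It also assumes that a single process, such as the orchestrator, creates bus.db, so two agents never race to run the DDL. The names ensure_schema, SchemaMismatch and EXPECTED_SCHEMA_VERSION are illustrative.

```python
import sqlite3

EXPECTED_SCHEMA_VERSION = "1"  # must match the INSERT INTO meta seed row


class SchemaMismatch(RuntimeError):
    """bus.db was created by an incompatible schema version."""


def ensure_schema(conn: sqlite3.Connection, schema_sql: str) -> str:
    """Create the schema on first use, then verify meta.schema_version.

    Assumes only one process ever creates the database (no DDL race).
    Returns the schema version found in the database.
    """
    has_meta = conn.execute(
        "SELECT 1 FROM sqlite_master WHERE type = 'table' AND name = 'meta'"
    ).fetchone()
    if has_meta is None:
        # Fresh database: executescript() commits any pending transaction,
        # then runs every statement in the script.
        conn.executescript(schema_sql)
    row = conn.execute(
        "SELECT value FROM meta WHERE key = 'schema_version'"
    ).fetchone()
    version = row[0] if row else None
    if version != EXPECTED_SCHEMA_VERSION:
        raise SchemaMismatch(
            f"bus.db schema_version={version!r}, expected {EXPECTED_SCHEMA_VERSION!r}"
        )
    return version
```

Refusing to start on a mismatch keeps an old agent binary from writing rows that a newer schema would misinterpret.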
Connection Setup (REQUIRED)
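busy_timeout, foreign_keys and synchronous are per-connection settings, so they must be set on every connection. The journal_mode = WAL setting is stored in the database file, but re-issuing it is harmless. Each thread must have its own connection.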
```nim
import tiny_sqlite

proc openBusDb*(dbPath: string): DbConn =
  ## Open database with required PRAGMAs. One connection per thread.
  result = openDatabase(dbPath)
  result.exec("PRAGMA busy_timeout = 5000")
  result.exec("PRAGMA foreign_keys = ON")
  result.exec("PRAGMA journal_mode = WAL")
  result.exec("PRAGMA synchronous = NORMAL")
```
Critical: Do NOT share DbConn across threads. Each thread opens its own connection.
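The Python examples in this document call get_connection(db_path) without defining it. Below is a plausible Python counterpart of openBusDb using the standard-library sqlite3 module. The name comes from the examples, but the body is a sketch. It also sets row_factory = sqlite3.Row, because the examples rely on row['column'] access.

```python
import sqlite3


def get_connection(db_path: str) -> sqlite3.Connection:
    """Open a bus connection with the same PRAGMAs as openBusDb.

    One connection per thread: by default sqlite3 raises ProgrammingError
    if a connection is used from a thread other than its creator.
    """
    conn = sqlite3.connect(db_path)
    conn.row_factory = sqlite3.Row  # rows support row['column'] access
    conn.execute("PRAGMA busy_timeout = 5000")
    conn.execute("PRAGMA foreign_keys = ON")
    conn.execute("PRAGMA journal_mode = WAL")
    conn.execute("PRAGMA synchronous = NORMAL")
    return conn
```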
Message Schema
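Below is the logical message shape that agents exchange. In SQLite the fields map onto the messages columns, with from/to stored as from_agent/to_agent. The timestamp is stored as ts_ms in epoch milliseconds (1768053600000 is 2026-01-10T14:00:00.000Z):

```json
{
  "id": "uuid-v4",
  "ts_ms": 1768053600000,
  "from": "worker-auth",
  "to": "orchestrator",
  "type": "task_done",
  "correlation_id": "skills-abc",
  "payload": {
    "branch": "worker-auth/skills-abc",
    "commit": "a1b2c3d4"
  }
}
```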
Message Types
Task Lifecycle:
- task_assign - Orchestrator assigns task to worker
- task_claim - Worker claims a task (first wins)
- task_progress - Worker reports progress
- task_done - Worker completed task
- task_failed - Worker failed task
- task_blocked - Worker blocked on dependency

Coordination:
- heartbeat - Liveness signal (written to the heartbeats table, not messages; see Heartbeat Protocol)
- state_change - Worker state transition
- review_request - Request review of work
- review_result - Review approved/rejected

System:
- agent_started - Agent came online
- agent_stopped - Agent shutting down gracefully
Delivery Semantics
This layer provides AT-LEAST-ONCE delivery.
- Messages are durable once committed
- Cursor only advances after successful processing
- Handlers MUST be idempotent or deduplicate by message.id
- Out-of-order processing is possible between unrelated messages
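When a handler's effects are writes to the same bus.db, one way to meet the idempotency requirement is to record the message id in the same transaction as those writes. A redelivered message then becomes a no-op. This is a sketch: the processed_messages table and handle_once are hypothetical and not part of the schema above. Effects outside SQLite, such as git pushes or file writes, need their own idempotency keys.

```python
import sqlite3

# Hypothetical dedup table, not part of the bus schema.
DEDUP_DDL = """
CREATE TABLE IF NOT EXISTS processed_messages (
    agent_id   TEXT NOT NULL,
    message_id TEXT NOT NULL,
    PRIMARY KEY (agent_id, message_id)
)
"""


def handle_once(conn, agent_id, msg, apply):
    """Run apply(conn, msg) at most once per (agent_id, msg['id']).

    The dedup row and apply()'s own writes commit in one transaction, so a
    crash before commit leaves neither behind and redelivery retries cleanly.
    Returns False when the message had already been processed.
    """
    with conn:  # commit on success, roll back if apply() raises
        cur = conn.execute(
            "INSERT OR IGNORE INTO processed_messages (agent_id, message_id) "
            "VALUES (?, ?)",
            (agent_id, msg["id"]),
        )
        if cur.rowcount == 0:
            return False  # duplicate delivery: skip side effects
        apply(conn, msg)
    return True
```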
Write Protocol
Publishing Messages
```python
import json
import sqlite3
import time
import uuid

# get_connection(db_path): Python counterpart of openBusDb (same PRAGMAs,
# row_factory = sqlite3.Row). save_blob() is defined under Blob Storage.

def publish(db_path, agent_id, message):
    msg_id = message.get('id') or str(uuid.uuid4())
    ts_ms = int(time.time() * 1000)  # Epoch milliseconds

    # Handle large payloads: more than 4 KB of encoded JSON goes to blob storage
    payload = message.get('payload')
    payload_ref = None
    if payload is not None:
        payload_json = json.dumps(payload)
        if len(payload_json.encode('utf-8')) > 4096:
            payload_ref = save_blob(payload)  # Atomic write (see Blob Storage)
            payload = None
        else:
            payload = payload_json

    conn = get_connection(db_path)
    try:
        # sqlite3 opens an implicit deferred transaction before the INSERT;
        # a single-row insert needs no BEGIN IMMEDIATE. seq is auto-assigned.
        conn.execute("""
            INSERT INTO messages (id, ts_ms, from_agent, to_agent, type,
                                  correlation_id, in_reply_to, payload, payload_ref)
            VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)
        """, (
            msg_id,
            ts_ms,
            agent_id,
            message.get('to'),
            message['type'],
            message.get('correlation_id'),
            message.get('in_reply_to'),
            payload,
            payload_ref,
        ))
        conn.commit()
    except sqlite3.IntegrityError:
        # Duplicate id = already published (idempotent retry)
        conn.rollback()
    except Exception:
        conn.rollback()
        raise
    finally:
        conn.close()
    return msg_id
```
Write Contention
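- SQLite WAL allows concurrent readers but one writer at a time
- busy_timeout = 5000 handles short contention
- For high-frequency writes, add retry with exponential backoff
- Heartbeats use a separate table (one upserted row per agent) so they don't grow the message log; SQLite's write lock is database-wide, so heartbeat writes still serialize with other writers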
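Below is a minimal sketch of the retry-with-exponential-backoff wrapper mentioned above. It assumes the write is wrapped in a zero-argument callable. It also assumes contention surfaces as sqlite3.OperationalError with "locked" in the message; Python's sqlite3 reports SQLITE_BUSY as "database is locked". with_write_retry is a hypothetical helper, and its sleep parameter exists only so tests can skip real delays.

```python
import random
import sqlite3
import time


def with_write_retry(fn, attempts=5, base_delay=0.05, max_delay=2.0, sleep=time.sleep):
    """Call fn(), retrying with exponential backoff while the database is locked.

    busy_timeout already waits inside SQLite; this outer loop covers the case
    where that wait expires. Other OperationalErrors are re-raised at once.
    """
    for attempt in range(attempts):
        try:
            return fn()
        except sqlite3.OperationalError as e:
            if "locked" not in str(e) and "busy" not in str(e):
                raise  # not contention (e.g. missing table): don't retry
            if attempt == attempts - 1:
                raise  # out of attempts
            delay = min(max_delay, base_delay * (2 ** attempt))
            sleep(delay * random.uniform(0.5, 1.0))  # jitter spreads out retries
```

Usage might look like with_write_retry(lambda: publish(db_path, agent_id, message)). Give the message an explicit 'id' first: if an attempt actually committed, the retry then hits the duplicate-id path instead of publishing a second copy.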
Read Protocol
Polling for New Messages (At-Least-Once)
CRITICAL: Do NOT update cursor until message is successfully processed.
```python
import json
import time

def poll(db_path, agent_id, limit=100):
    """Fetch unacknowledged messages. Does NOT advance cursor."""
    conn = get_connection(db_path)  # row_factory = sqlite3.Row
    try:
        # Get cursor position
        cursor = conn.execute(
            "SELECT last_acked_seq FROM cursors WHERE agent_id = ?",
            (agent_id,)
        ).fetchone()
        last_seq = cursor['last_acked_seq'] if cursor else 0

        # Fetch new messages for this agent (or broadcasts)
        rows = conn.execute("""
            SELECT * FROM messages
            WHERE seq > ?
              AND (to_agent IS NULL OR to_agent = ?)
            ORDER BY seq
            LIMIT ?
        """, (last_seq, agent_id, limit)).fetchall()
    finally:
        conn.close()

    # Convert to dicts, resolve blob refs
    messages = []
    for row in rows:
        msg = dict(row)
        if msg['payload_ref']:
            try:
                msg['payload'] = load_blob(msg['payload_ref'])
            except FileNotFoundError:
                msg['payload_error'] = 'blob_missing'
        elif msg['payload']:
            try:
                msg['payload'] = json.loads(msg['payload'])
            except json.JSONDecodeError:
                msg['payload_error'] = 'decode_failed'
        messages.append(msg)
    return messages


def ack(db_path, agent_id, seq):
    """Acknowledge successful processing of message. Advances cursor."""
    conn = get_connection(db_path)
    ts_ms = int(time.time() * 1000)
    try:
        # Only advance forward (monotonic): MAX() ignores stale or out-of-order acks
        conn.execute("""
            INSERT INTO cursors (agent_id, last_acked_seq, updated_at_ms)
            VALUES (?, ?, ?)
            ON CONFLICT(agent_id) DO UPDATE SET
                last_acked_seq = MAX(cursors.last_acked_seq, excluded.last_acked_seq),
                updated_at_ms = excluded.updated_at_ms
        """, (agent_id, seq, ts_ms))
        conn.commit()
    finally:
        conn.close()
```
Processing Loop
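```python
import logging
import threading

log = logging.getLogger(__name__)

def process_loop(db_path, agent_id, handler, stop_event: threading.Event, poll_interval=0.5):
    """Process messages with at-least-once semantics until stop_event is set."""
    while not stop_event.is_set():
        messages = poll(db_path, agent_id)
        for msg in messages:
            try:
                handler(msg)
                ack(db_path, agent_id, msg['seq'])  # Only after success
            except Exception as e:
                # Don't ack - message will be redelivered on the next poll
                log.error(f"Failed to process {msg['id']}: {e}")
                # Stop batch on failure: acking a later seq would move the
                # cursor past this message and it would never be redelivered
                break
        stop_event.wait(poll_interval)  # sleeps, but wakes early on shutdown
```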
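The redelivery behaviour can be exercised end to end. The sketch below is self-contained and simplifies the real setup in three ways. It uses an in-memory database with a trimmed schema that keeps only the columns the loop touches (updated_at_ms and the payload columns are omitted). Its poll/ack variants take an open connection instead of a path. process_once runs a single iteration of the loop. These simplifications are assumptions for the demo, not part of the protocol.

```python
import sqlite3

def make_bus():
    conn = sqlite3.connect(":memory:")
    conn.row_factory = sqlite3.Row
    conn.executescript("""
        CREATE TABLE messages (seq INTEGER PRIMARY KEY AUTOINCREMENT,
                               id TEXT NOT NULL UNIQUE, to_agent TEXT,
                               type TEXT NOT NULL);
        CREATE TABLE cursors (agent_id TEXT PRIMARY KEY,
                              last_acked_seq INTEGER NOT NULL DEFAULT 0);
    """)
    return conn

def poll(conn, agent_id, limit=100):
    row = conn.execute(
        "SELECT last_acked_seq FROM cursors WHERE agent_id = ?", (agent_id,)
    ).fetchone()
    last_seq = row["last_acked_seq"] if row else 0
    return conn.execute(
        "SELECT * FROM messages WHERE seq > ? "
        "AND (to_agent IS NULL OR to_agent = ?) ORDER BY seq LIMIT ?",
        (last_seq, agent_id, limit),
    ).fetchall()

def ack(conn, agent_id, seq):
    with conn:  # commit immediately
        conn.execute(
            "INSERT INTO cursors (agent_id, last_acked_seq) VALUES (?, ?) "
            "ON CONFLICT(agent_id) DO UPDATE SET "
            "last_acked_seq = MAX(cursors.last_acked_seq, excluded.last_acked_seq)",
            (agent_id, seq),
        )

def process_once(conn, agent_id, handler):
    """One iteration of process_loop: ack each success, stop at first failure."""
    for msg in poll(conn, agent_id):
        try:
            handler(msg)
            ack(conn, agent_id, msg["seq"])
        except Exception:
            break  # cursor stays just before the failed message

# Demo: m2 fails once, then succeeds when redelivered
conn = make_bus()
with conn:
    for i in (1, 2, 3):
        conn.execute("INSERT INTO messages (id, to_agent, type) "
                     "VALUES (?, 'w1', 'task_assign')", (f"m{i}",))
seen, fail_once = [], {"m2"}

def handler(msg):
    seen.append(msg["id"])
    if msg["id"] in fail_once:
        fail_once.discard(msg["id"])
        raise RuntimeError("transient failure")

process_once(conn, "w1", handler)  # m1 acked, m2 fails
process_once(conn, "w1", handler)  # m2 redelivered, then m3
```

The break matters because the cursor is a high-water mark. If the loop carried on and acked m3 after m2 failed, last_acked_seq would jump to 3 and m2 would never be redelivered.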
Adaptive Polling
```python
POLL_ACTIVE = 0.2   # 200ms when working
POLL_IDLE = 2.0     # 2s cap when idle
POLL_BACKOFF = 1.5  # Multiplier on each empty poll

poll_interval = POLL_ACTIVE
while running:
    messages = poll(db_path, my_id)
    if messages:
        process(messages)  # handler + ack per message, as in process_loop
        poll_interval = POLL_ACTIVE  # Reset to fast
    else:
        poll_interval = min(poll_interval * POLL_BACKOFF, POLL_IDLE)
    time.sleep(poll_interval)
```
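The backoff constants can be sanity-checked in isolation. The small sketch below factors the interval update out of the loop; next_interval and idle_schedule are illustrative names.

```python
POLL_ACTIVE = 0.2   # 200ms when working
POLL_IDLE = 2.0     # 2s cap when idle
POLL_BACKOFF = 1.5  # Multiplier on each empty poll


def next_interval(current, got_messages):
    """Adaptive polling step: reset on activity, back off geometrically when idle."""
    if got_messages:
        return POLL_ACTIVE
    return min(current * POLL_BACKOFF, POLL_IDLE)


def idle_schedule(n):
    """Sleep intervals for n consecutive empty polls starting from the active rate."""
    interval = POLL_ACTIVE
    out = []
    for _ in range(n):
        interval = next_interval(interval, got_messages=False)
        out.append(interval)
    return out
```

Starting from the active rate, five empty polls (about 3.96s of sleep in total) bring the interval to roughly 1.52s. The sixth empty poll hits the 2s cap.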
Heartbeat Protocol
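Heartbeats use a separate table with upsert (one row per agent), so they don't flood the messages log or slow down polling. SQLite's write lock is database-wide, so heartbeat writes still queue behind other writers; busy_timeout covers these short waits.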
CRITICAL: Heartbeats MUST run in a dedicated thread to avoid false death detection during blocking operations (LLM calls can take 60s+).
Nim Implementation
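Nim's GC-safety checks stop a thread from touching another thread's GC'd data directly. The heartbeat thread therefore receives status updates and the stop signal over a Channel: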
```nim
import std/[times, os]
import tiny_sqlite
import ./db_layer  # openBusDb (see Connection Setup)
# Channel, Thread, createThread and joinThread come from system with --threads:on

type
  HeartbeatCmd* = enum
    hbUpdateStatus, hbStop

  HeartbeatMsg* = object
    cmd*: HeartbeatCmd
    status*: string
    task*: string
    progress*: float

var heartbeatChan*: Channel[HeartbeatMsg]
var heartbeatThread: Thread[tuple[dbPath, agentId: string]]

proc writeHeartbeat(db: DbConn, agentId, status, task: string, progress: float) =
  let now = getTime()  # read the clock once so seconds and nanoseconds agree
  let tsMs = now.toUnix * 1000 + int64(now.nanosecond div 1_000_000)
  db.exec("""
    INSERT INTO heartbeats (agent_id, ts_ms, status, current_task, progress)
    VALUES (?, ?, ?, ?, ?)
    ON CONFLICT(agent_id) DO UPDATE SET
      ts_ms = excluded.ts_ms,
      status = excluded.status,
      current_task = excluded.current_task,
      progress = excluded.progress
  """, agentId, tsMs, status, task, progress)

proc heartbeatWorker*(args: tuple[dbPath, agentId: string]) {.thread.} =
  ## Dedicated heartbeat thread - owns its own DB connection
  let db = openBusDb(args.dbPath)
  var status = "idle"
  var task = ""
  var progress = 0.0
  while true:
    # Drain pending commands (non-blocking)
    var tried = heartbeatChan.tryRecv()
    while tried.dataAvailable:
      case tried.msg.cmd
      of hbStop:
        db.close()
        return
      of hbUpdateStatus:
        status = tried.msg.status
        task = tried.msg.task
        progress = tried.msg.progress
      tried = heartbeatChan.tryRecv()

    # Write heartbeat; a failed write is logged and retried next cycle
    try:
      writeHeartbeat(db, args.agentId, status, task, progress)
    except CatchableError:
      stderr.writeLine("heartbeat write failed: ", getCurrentExceptionMsg())

    # Wait up to 10 seconds, waking early when a command arrives
    for _ in 0 ..< 100:
      if heartbeatChan.peek() > 0: break
      sleep(100)

# Usage from main thread:
proc startHeartbeat*(dbPath, agentId: string) =
  heartbeatChan.open()
  createThread(heartbeatThread, heartbeatWorker, (dbPath: dbPath, agentId: agentId))

proc updateHeartbeatStatus*(status, task: string, progress: float) =
  heartbeatChan.send(HeartbeatMsg(cmd: hbUpdateStatus, status: status, task: task, progress: progress))

proc stopHeartbeat*() =
  heartbeatChan.send(HeartbeatMsg(cmd: hbStop))
  joinThread(heartbeatThread)  # wait until the worker has closed its connection
  heartbeatChan.close()
```
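Python agents can follow the same design, since any agent with SQLite bindings can participate. Below is a minimal sketch using the standard threading module, assuming the heartbeats schema above. A threading.Event stands in for hbStop, and a lock-protected tuple stands in for hbUpdateStatus. HeartbeatThread and its methods are illustrative names.

```python
import sqlite3
import threading
import time

UPSERT_HEARTBEAT = """
INSERT INTO heartbeats (agent_id, ts_ms, status, current_task, progress)
VALUES (?, ?, ?, ?, ?)
ON CONFLICT(agent_id) DO UPDATE SET
    ts_ms = excluded.ts_ms,
    status = excluded.status,
    current_task = excluded.current_task,
    progress = excluded.progress
"""


class HeartbeatThread(threading.Thread):
    """Writes a heartbeat every `interval` seconds from its own connection."""

    def __init__(self, db_path, agent_id, interval=10.0):
        super().__init__(daemon=True)
        self.db_path = db_path
        self.agent_id = agent_id
        self.interval = interval
        self._stop_event = threading.Event()
        self._state_lock = threading.Lock()
        self._beat_state = ("idle", None, None)  # status, current_task, progress

    def update(self, status, task=None, progress=None):
        """Called from the main thread; picked up by the next beat."""
        with self._state_lock:
            self._beat_state = (status, task, progress)

    def stop(self):
        self._stop_event.set()
        self.join()

    def run(self):
        conn = sqlite3.connect(self.db_path)  # owned by this thread only
        conn.execute("PRAGMA busy_timeout = 5000")
        try:
            while True:
                with self._state_lock:
                    status, task, progress = self._beat_state
                try:
                    with conn:  # commit each beat
                        conn.execute(UPSERT_HEARTBEAT, (
                            self.agent_id, int(time.time() * 1000),
                            status, task, progress))
                except sqlite3.Error as e:
                    print(f"heartbeat write failed: {e}")  # log, keep beating
                # wait() returns True as soon as stop() is called
                if self._stop_event.wait(self.interval):
                    break
        finally:
            conn.close()
```

Because the thread blocks in Event.wait() rather than time.sleep(), stop() returns promptly even when the interval is 10s.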
Heartbeat Rules
- Emit every 10 seconds via background thread
- Include current status: idle, working, blocked
- Stale thresholds: 30s = WARN, 100s = STALE, 5min = DEAD
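The three thresholds can be expressed as a small classifier. This sketch assumes the boundaries are inclusive and uses "OK" for a fresh heartbeat, a name the design does not define.

```python
WARN_MS = 30_000      # 30s
STALE_MS = 100_000    # 100s
DEAD_MS = 5 * 60_000  # 5min


def liveness(now_ms, last_heartbeat_ms):
    """Map heartbeat age onto WARN/STALE/DEAD (boundaries inclusive)."""
    age = now_ms - last_heartbeat_ms
    if age >= DEAD_MS:
        return "DEAD"
    if age >= STALE_MS:
        return "STALE"
    if age >= WARN_MS:
        return "WARN"
    return "OK"
```

At the 10s emission interval, WARN corresponds to roughly three missed heartbeats, STALE to ten, and DEAD to thirty.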
Liveness Query
```sql
-- Find agents past the WARN threshold (no heartbeat in 30s)
-- Uses epoch ms for comparison; strftime('%s', 'now') has 1-second resolution
SELECT agent_id, ts_ms,
       (CAST(strftime('%s', 'now') AS INTEGER) * 1000 - ts_ms) / 1000 AS age_seconds
FROM heartbeats
WHERE ts_ms < (CAST(strftime('%s', 'now') AS INTEGER) * 1000 - 30000);
```
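The query above relies on SQLite's clock, which strftime('%s', 'now') reports only to the second. The sketch below passes the caller's epoch milliseconds instead, so "now" and ts_ms come from the same clock source. find_silent_agents is a hypothetical helper.

```python
import sqlite3
import time

SILENT_QUERY = """
SELECT agent_id, ts_ms, (? - ts_ms) / 1000 AS age_seconds
FROM heartbeats
WHERE ts_ms < ? - ?
ORDER BY ts_ms
"""


def find_silent_agents(conn, threshold_ms=30_000, now_ms=None):
    """Agents whose last heartbeat is older than threshold_ms."""
    if now_ms is None:
        now_ms = int(time.time() * 1000)  # same clock the writers use for ts_ms
    return conn.execute(SILENT_QUERY, (now_ms, now_ms, threshold_ms)).fetchall()
```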
Task Claim Pattern
Task claims use a dedicated table with leases to prevent zombie claims when agents die.
```python
import sqlite3
import time

LEASE_DURATION_MS = 60000  # 60 seconds

def try_claim_task(db_path, task_id, agent_id):
    """Attempt to claim a task. Returns True if successful."""
    conn = get_connection(db_path)  # row_factory = sqlite3.Row
    ts_ms = int(time.time() * 1000)
    lease_until = ts_ms + LEASE_DURATION_MS
    try:
        # Take the write lock up front so check-then-insert is atomic
        conn.execute("BEGIN IMMEDIATE")
        # Check for existing valid claim
        existing = conn.execute("""
            SELECT claimed_by, lease_until_ms FROM task_claims
            WHERE task_id = ?
        """, (task_id,)).fetchone()
        if existing:
            if existing['lease_until_ms'] > ts_ms:
                # Valid claim exists
                conn.rollback()
                return False
            # Expired claim - delete and reclaim
            conn.execute("DELETE FROM task_claims WHERE task_id = ?", (task_id,))
        # Insert new claim
        conn.execute("""
            INSERT INTO task_claims (task_id, claimed_by, claimed_at_ms, lease_until_ms)
            VALUES (?, ?, ?, ?)
        """, (task_id, agent_id, ts_ms, lease_until))
        conn.commit()
        return True
    except sqlite3.OperationalError:
        # Write lock not acquired within busy_timeout (another agent holds it):
        # report "not claimed"; the caller may retry later
        conn.rollback()
        return False
    finally:
        conn.close()

def renew_claim(db_path, task_id, agent_id):
    """Renew lease on a claimed task. Call periodically while working.
    Returns False if the claim was lost (released, or re-claimed after expiry)."""
    conn = get_connection(db_path)
    ts_ms = int(time.time() * 1000)
    lease_until = ts_ms + LEASE_DURATION_MS
    try:
        result = conn.execute("""
            UPDATE task_claims
            SET lease_until_ms = ?
            WHERE task_id = ? AND claimed_by = ?
        """, (lease_until, task_id, agent_id))
        conn.commit()
        return result.rowcount > 0
    finally:
        conn.close()

def release_claim(db_path, task_id, agent_id):
    """Release a claim when done (success or failure)."""
    conn = get_connection(db_path)
    try:
        conn.execute("""
            DELETE FROM task_claims
            WHERE task_id = ? AND claimed_by = ?
        """, (task_id, agent_id))
        conn.commit()
    finally:
        conn.close()
```
Claim Lifecycle
- Agent calls try_claim_task() - gets exclusive claim with 60s lease
- Agent periodically calls renew_claim() (every 30s) while working
- On completion, agent calls release_claim()
- If agent dies, lease expires and task becomes claimable again
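An alternative to try_claim_task folds the check, delete and insert into one UPSERT whose DO UPDATE fires only when the stored lease has expired. This is a different technique from the one above, not something the design specifies. Because SQLite serializes writers, the single statement is atomic without BEGIN IMMEDIATE. The sketch assumes SQLite 3.24+ for UPSERT support. claim_upsert is a hypothetical name, and now_ms is passed in so lease expiry can be tested without waiting.

```python
import sqlite3

LEASE_DURATION_MS = 60_000

CLAIM_SQL = """
INSERT INTO task_claims (task_id, claimed_by, claimed_at_ms, lease_until_ms)
VALUES (?, ?, ?, ?)
ON CONFLICT(task_id) DO UPDATE SET
    claimed_by = excluded.claimed_by,
    claimed_at_ms = excluded.claimed_at_ms,
    lease_until_ms = excluded.lease_until_ms
WHERE task_claims.lease_until_ms <= excluded.claimed_at_ms
"""


def claim_upsert(conn, task_id, agent_id, now_ms):
    """Claim task_id if unclaimed or its lease has expired. True on success."""
    with conn:
        cur = conn.execute(
            CLAIM_SQL, (task_id, agent_id, now_ms, now_ms + LEASE_DURATION_MS)
        )
    # 1 row changed = inserted or took over an expired lease; 0 = still held
    return cur.rowcount == 1
```

With a 60s lease renewed every 30s, the claim survives a renewal that is delayed by up to 30s.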
JSONL Export for Debugging
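A background process or post-commit hook exports new messages to JSONL.
CRITICAL: Track the last exported seq in the database, NOT by counting lines. Seq values can have gaps (AUTOINCREMENT guarantees increasing values, not contiguous ones), so a line count does not map to a seq.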
```python
import json
import os

def export_to_jsonl(db_path, jsonl_path):
    """Export new messages to JSONL. Safe to re-run; a crash can duplicate lines.

    Assumes a single exporter process.
    """
    conn = get_connection(db_path)  # row_factory = sqlite3.Row
    try:
        # Get last exported seq from database
        row = conn.execute(
            "SELECT last_seq FROM export_state WHERE id = 1"
        ).fetchone()
        last_exported = row['last_seq'] if row else 0

        # Export new messages
        rows = conn.execute(
            "SELECT * FROM messages WHERE seq > ? ORDER BY seq",
            (last_exported,)
        ).fetchall()
        if not rows:
            return

        max_seq = last_exported
        with open(jsonl_path, 'a', encoding='utf-8') as f:
            for row in rows:
                msg = dict(row)
                # payload_ref is written as-is (resolve with load_blob() for full content)
                f.write(json.dumps(msg) + '\n')
                max_seq = max(max_seq, msg['seq'])
            f.flush()
            os.fsync(f.fileno())  # lines are durable before last_seq moves

        # Not atomic with the file write: a crash before this commit means the
        # same rows are appended again on the next run (dedupe by id when reading)
        conn.execute(
            "UPDATE export_state SET last_seq = ? WHERE id = 1", (max_seq,)
        )
        conn.commit()
    finally:
        conn.close()
```
This gives you tail -f bus.jsonl for debugging while SQLite remains source of truth.
Note: JSONL contains blob references (payload_ref), not resolved payloads, and SQLite stores the same references. For full content, resolve payload_ref with load_blob().
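The exporter appends lines before it records last_seq. A crash between those two steps re-exports the same rows, so bus.jsonl can contain duplicate lines. The sketch below shows a reader that tolerates this by deduplicating on id. The JSONL is only a debugging aid, so the sketch skips lines that fail to parse rather than trying to repair them; SQLite remains the source of truth. read_bus_jsonl is a hypothetical helper.

```python
import json


def read_bus_jsonl(path):
    """Yield exported messages in file order, skipping duplicate ids."""
    seen = set()
    with open(path, encoding="utf-8") as f:
        for line in f:
            line = line.strip()
            if not line:
                continue
            try:
                msg = json.loads(line)
            except json.JSONDecodeError:
                continue  # e.g. a line cut short by an interrupted write
            if msg["id"] in seen:
                continue  # re-exported after an exporter crash
            seen.add(msg["id"])
            yield msg
```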
Blob Storage
Content-addressable storage for large payloads. Uses atomic write pattern (temp + rename) to prevent partial reads.
```python
import hashlib
import json
import os
import tempfile

# Relative path: agents must run from the directory containing .worker-state/
BLOB_DIR = ".worker-state/blobs"

def save_blob(content):
    """Atomically write blob. Returns ref or raises."""
    data = json.dumps(content).encode('utf-8')
    hash_hex = hashlib.sha256(data).hexdigest()
    ref = f"sha256-{hash_hex}"
    path = os.path.join(BLOB_DIR, ref)
    if os.path.exists(path):
        return ref  # Already exists (content-addressable = idempotent)

    # Atomic write: temp file in the same directory + fsync + rename
    os.makedirs(BLOB_DIR, exist_ok=True)
    fd, tmp_path = tempfile.mkstemp(dir=BLOB_DIR)
    try:
        with os.fdopen(fd, 'wb') as f:
            f.write(data)
            f.flush()
            os.fsync(f.fileno())
        os.replace(tmp_path, path)  # Atomic on POSIX
    except BaseException:
        if os.path.exists(tmp_path):
            os.unlink(tmp_path)
        raise
    return ref

def load_blob(ref):
    """Load blob content. Raises FileNotFoundError if missing."""
    path = os.path.join(BLOB_DIR, ref)
    with open(path, 'rb') as f:
        return json.load(f)
```
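Content addressing also makes corruption detectable, because re-hashing the stored bytes must reproduce the ref. The sketch below is a verifying loader. It assumes refs always have the sha256-<64 hex chars> form that save_blob produces. Validating the digest format first also prevents a malformed ref from escaping the blobs directory. load_verified_blob is a hypothetical helper that takes the blob directory as a parameter.

```python
import hashlib
import json
import os

HEX_DIGITS = set("0123456789abcdef")


def load_verified_blob(blob_dir, ref):
    """Load a blob and check its bytes still hash to ref.

    Raises FileNotFoundError if missing, ValueError if the ref is malformed
    or the content does not match its hash.
    """
    prefix = "sha256-"
    digest = ref[len(prefix):] if ref.startswith(prefix) else ""
    if len(digest) != 64 or not set(digest) <= HEX_DIGITS:
        raise ValueError(f"malformed blob ref: {ref!r}")
    with open(os.path.join(blob_dir, ref), "rb") as f:
        data = f.read()
    if hashlib.sha256(data).hexdigest() != digest:
        raise ValueError(f"blob {ref} is corrupt (hash mismatch)")
    return json.loads(data)
```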
Phased Implementation
Phase 0: MVP
- SQLite with WAL mode
- Basic publish/poll
- Heartbeats every 10s
- Simple polling (500ms)
Phase 1: Robustness
- Task claim with transactions
- JSONL export for debugging
- Blob storage for large payloads
- Dead agent detection
Phase 2: Performance
- FTS5 for full-text search
- Message compaction/archival
- Adaptive polling
Phase 3: Scale (if ever)
- Read replicas
- Sharding by correlation_id
- Background workers for maintenance
Integration Points
With Worker State Machine (skills-4oj)
State transitions publish messages:
```python
publish(db_path, worker_id, {
    'type': 'state_change',
    'payload': {'from': 'WORKING', 'to': 'IN_REVIEW'}
})
```
With review-gate (skills-byq)
Review requests/results via messages:
```python
publish(db_path, worker_id, {
    'type': 'review_request',
    'correlation_id': task_id
})
```
With Observability (skills-yak)
Status command queries SQLite directly:
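```sql
-- Recent state transitions (messages table)
SELECT from_agent, type, ts_ms, payload
FROM messages
WHERE type = 'state_change'
ORDER BY seq DESC LIMIT 100;

-- Liveness comes from the heartbeats table (one row per agent)
SELECT agent_id, status, current_task, progress, ts_ms
FROM heartbeats
ORDER BY ts_ms DESC;
```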
Open Questions
- Database location: .worker-state/bus.db (resolved)
- Export frequency: On-commit hook vs periodic background? (defer to implementation)
- Message retention: Archive messages older than min cursor? Vacuum schedule?
- Schema migrations: meta.schema_version table added; agents check on startup
- Security: All agents currently trusted; add HMAC signing if isolation needed?
- Backpressure: How to slow producers when consumers overwhelmed?
Resolved from Spec Review
| Issue | Resolution |
|---|---|
| seq not auto-increment | Changed to INTEGER PRIMARY KEY AUTOINCREMENT |
| At-most-once delivery | Cursor now advanced only after ack |
| Heartbeats in messages table | Separate heartbeats table with upsert |
| Heartbeats block during LLM | Dedicated heartbeat thread runs independently |
| JSONL line-count broken | export_state table tracks last seq |
| Timestamp format unreliable | Changed to epoch milliseconds |
| Zombie task claims | task_claims table with lease expiry |
| Blob writes not atomic | Temp file + fsync + rename pattern |
Nim Implementation Notes
Dependencies
```nim
# In the package's .nimble file (nimble install fetches these)
requires "tiny_sqlite >= 0.2.0"
requires "jsony >= 1.1.0"  # Optional, faster JSON
```
Build Configuration
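```nim
# config.nims or bus.nim.cfg
--mm:orc
--threads:on
--define:release

# Static SQLite (single binary). passC and compile are source pragmas, so they
# belong in a module such as db_layer.nim, not in config.nims:
{.passC: "-DSQLITE_THREADSAFE=1".}
{.compile: "libs/sqlite3.c".}
```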
Project Structure
```
src/
├── libs/sqlite3.c   # SQLite amalgamation
├── db_layer.nim     # Connection setup, PRAGMAs
├── bus.nim          # Publish, poll, ack
├── heartbeat.nim    # Dedicated thread + channel
├── claims.nim       # Task claim with leases
└── export.nim       # JSONL export
```
Key Patterns
- One connection per thread - never share DbConn
- Channels for control - stop signals, status updates
- Short transactions - BEGIN IMMEDIATE, do work, COMMIT quickly
- Exceptions for errors - catch DbError, convert to meaningful messages
References
- Beads: https://github.com/steveyegge/beads
- Tissue: https://github.com/evil-mind-evil-sword/tissue
- SQLite WAL mode: https://sqlite.org/wal.html
- SQLite BEGIN IMMEDIATE: https://sqlite.org/lang_transaction.html
- tiny_sqlite: https://github.com/GULPF/tiny_sqlite