From 0253e6b1d6d22d4b17c194d33a8abb4cbe8b4209 Mon Sep 17 00:00:00 2001
From: dan
Date: Sat, 24 Jan 2026 09:40:49 -0800
Subject: [PATCH] docs(specs): add worker message bus specification

---
 specs/worker-message-bus.md | 119 ++++++++++++++++++++++++++++++++++++
 1 file changed, 119 insertions(+)
 create mode 100644 specs/worker-message-bus.md

diff --git a/specs/worker-message-bus.md b/specs/worker-message-bus.md
new file mode 100644
index 0000000..f15f399
--- /dev/null
+++ b/specs/worker-message-bus.md
@@ -0,0 +1,119 @@
+# Worker Message Bus & Observability Design Spec
+
+**Status:** Draft
+**Epic:** skills-29bp
+**Related Issues:** skills-imei
+
+## 1. Problem Statement
+The current worker orchestration relies on `beads` comments for communication. This has several drawbacks:
+- **High Latency:** `bd` syncs with git, making real-time chat slow (seconds to minutes).
+- **Unstructured:** Comments are raw text; parsing signals like `[WORKER:status]` is brittle.
+- **Observability:** No easy way to "watch" a worker's internal state or logs without manually tailing files.
+
+## 2. Goals
+1. **Low-Latency Messaging:** Sub-second communication between HQ and Workers.
+2. **Structured Data:** Messages carry JSON payloads (progress, file lists, errors).
+3. **Control Room:** Real-time TUI observability via `tmux` integration.
+4. **Agent Neutrality:** Works with any agent (Claude, Pi, OpenCode) via CLI tools.
+
+## 3. Architecture
+
+### 3.1 The Bus (SQLite)
+We will leverage the existing (but unused) `messages` table in `.worker-state/bus.db`.
+
+**Schema (Existing in `db.nim`):**
+```sql
+CREATE TABLE messages (
+    seq INTEGER PRIMARY KEY AUTOINCREMENT,
+    id TEXT NOT NULL UNIQUE,
+    ts_ms INTEGER NOT NULL,
+    from_agent TEXT NOT NULL,
+    to_agent TEXT,
+    type TEXT NOT NULL,
+    payload TEXT -- JSON
+);
+CREATE TABLE cursors (
+    agent_id TEXT PRIMARY KEY,
+    last_acked_seq INTEGER NOT NULL DEFAULT 0
+);
+```
+
+### 3.2 CLI Interface (`worker msg`)
+
+We will add a `msg` subcommand group to the `worker` binary.
+
+#### `worker msg send [payload]`
+Sends a message to the bus.
+- **Automatic Metadata:**
+  - `from_agent`: Defaults to the current task ID when run inside a worktree, otherwise "hq".
+  - `ts_ms`: Current timestamp in milliseconds.
+- **Payload:** Accepts a raw JSON string or `@filename.json`.
+- **Example:**
+  ```bash
+  worker msg send status '{"phase": "tests", "progress": 0.5}'
+  worker msg send cmd '{"command": "stop", "reason": "user request"}' --to skills-123
+  ```
+
+#### `worker msg poll`
+Fetches unread messages for the current agent.
+- **Behavior:** Reads messages with `seq` greater than the agent's `last_acked_seq`, returns them, then advances the cursor.
+- **Output:** JSON Lines (one message per line).
+- **Example:**
+  ```bash
+  # Returns stream of new messages
+  worker msg poll
+  ```
+
+#### `worker msg follow [--task ]`
+Tails the message bus in real-time (like `tail -f`).
+- **Behavior:** Stateless tailing (does not update cursors).
+- **Use Case:** Human monitoring or `tmux` dashboards.
+
+### 3.3 Observability (`worker watch`)
+
+A new command to spawn a "Control Room" view in `tmux`.
+
+**Command:** `worker watch `
+
+**Logic:**
+1. Check if `tmux` is running.
+2. Create a new window named `w:`.
+3. Split window vertically (70% top, 30% bottom).
+4. **Top Pane:** `tail -f worktrees//.worker.log` (Agent logs).
+5. **Bottom Pane:** `worker msg follow --task ` (Structured events).
+
+## 4. Implementation Plan
+
+### Phase A: Database Logic (`src/worker/db.nim`)
+- [ ] Verify `publish` handles JSON serialization correctly.
+- [ ] Implement `getLatestSeq()` (needed for `follow` to start at "now").
+- [ ] Optimize `poll` for single-consumer throughput.
+
+### Phase B: CLI Commands (`src/worker.nim`)
+- [ ] Add `msg` dispatch group.
+- [ ] Implement `send`, `poll`, `follow`.
+- [ ] Ensure context detection (Am I in a worktree? Who am I?) works reliably.
+
+### Phase C: Tmux Integration (`src/worker/tmux.nim`?)
+- [ ] Implement `watch` command using `std/osproc`.
+- [ ] Handle "tmux not found" gracefully.
+
+### Phase D: Skill Integration
+- [ ] Update `skills/hq/scripts/worker-rpc.py` to:
+  - Log to `.worker.log`.
+  - Emit `worker msg send` events for `agent_start`, `tool_use`, `error`.
+- [ ] Update `skills/hq/SKILL.md` to use `worker watch`.
+
+## 5. Message Protocol Standard
+
+| Type | Payload Schema | Description |
+|------|----------------|-------------|
+| `status` | `{ phase: string, progress: float }` | High-level progress update. |
+| `log` | `{ level: "info"\|"warn"\|"error", msg: string }` | Structured logging. |
+| `signal` | `{ signal: "done"\|"start"\|"block" }` | Lifecycle signals. |
+| `cmd` | `{ command: "stop"\|"pause", reason: string }` | HQ commands to worker. |
+
+## 6. Open Questions
+- **Retention:** How long do we keep messages? (SQLite handles millions of rows easily; maybe prune messages older than 30 days.)
+- **Concurrency:** SQLite WAL mode allows concurrent readers alongside one writer; writes from multiple workers serialize, so `send` may need a busy timeout.
+- **Auth:** Currently trusts the local user. Sufficient for V1.
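
The `send`, `poll`, and `getLatestSeq()` behavior described in sections 3.2 and 4 can be sketched against the schema from section 3.1. This is a minimal Python illustration, not the Nim implementation in `db.nim`. The function names `open_bus`, `send`, `poll`, and `latest_seq` are hypothetical, and the routing rule (a `NULL` `to_agent` is treated as a broadcast) is an assumption the spec does not state.

```python
import json
import sqlite3
import time
import uuid

# Schema copied from section 3.1 of the spec.
SCHEMA = """
CREATE TABLE IF NOT EXISTS messages (
    seq INTEGER PRIMARY KEY AUTOINCREMENT,
    id TEXT NOT NULL UNIQUE,
    ts_ms INTEGER NOT NULL,
    from_agent TEXT NOT NULL,
    to_agent TEXT,
    type TEXT NOT NULL,
    payload TEXT
);
CREATE TABLE IF NOT EXISTS cursors (
    agent_id TEXT PRIMARY KEY,
    last_acked_seq INTEGER NOT NULL DEFAULT 0
);
"""


def open_bus(path=":memory:"):
    """Open (or create) a bus database. Hypothetical helper."""
    conn = sqlite3.connect(path)
    conn.executescript(SCHEMA)
    return conn


def send(conn, from_agent, msg_type, payload, to_agent=None):
    """Append one message and return its seq."""
    with conn:  # commits on success, rolls back on error
        cur = conn.execute(
            "INSERT INTO messages (id, ts_ms, from_agent, to_agent, type, payload) "
            "VALUES (?, ?, ?, ?, ?, ?)",
            (str(uuid.uuid4()), int(time.time() * 1000),
             from_agent, to_agent, msg_type, json.dumps(payload)),
        )
    return cur.lastrowid


def poll(conn, agent_id):
    """Return messages past the agent's cursor, then advance the cursor."""
    with conn:
        row = conn.execute(
            "SELECT last_acked_seq FROM cursors WHERE agent_id = ?", (agent_id,)
        ).fetchone()
        last = row[0] if row else 0
        # Assumption: to_agent IS NULL means "broadcast to everyone".
        rows = conn.execute(
            "SELECT seq, from_agent, type, payload FROM messages "
            "WHERE seq > ? AND (to_agent = ? OR to_agent IS NULL) ORDER BY seq",
            (last, agent_id),
        ).fetchall()
        if rows:
            conn.execute(
                "INSERT OR REPLACE INTO cursors (agent_id, last_acked_seq) VALUES (?, ?)",
                (agent_id, rows[-1][0]),
            )
    return [
        {"seq": s, "from": f, "type": t, "payload": json.loads(p)}
        for s, f, t, p in rows
    ]


def latest_seq(conn):
    """Highest seq on the bus, or 0 if empty: where `follow` would start."""
    return conn.execute("SELECT COALESCE(MAX(seq), 0) FROM messages").fetchone()[0]


if __name__ == "__main__":
    bus = open_bus()
    send(bus, "hq", "cmd", {"command": "stop", "reason": "demo"}, to_agent="skills-123")
    for msg in poll(bus, "skills-123"):
        print(json.dumps(msg))  # JSON Lines, one message per line
```

A stateless `follow` would read `latest_seq` once and then repeatedly select rows with a larger `seq` without touching `cursors`, which is why it can run next to `poll` without stealing messages from the agent.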