# Worker Message Bus & Observability Design Spec

**Status:** Draft
**Epic:** skills-29bp
**Related Issues:** skills-imei

## 1. Problem Statement

The current worker orchestration relies on `beads` comments for communication. This has several drawbacks:

- **High Latency:** `bd` syncs with git, making real-time chat slow (seconds to minutes).
- **Unstructured:** Comments are raw text; parsing signals like `[WORKER:status]` is brittle.
- **Observability:** There is no easy way to "watch" a worker's internal state or logs without manually tailing files.

## 2. Goals

1. **Low-Latency Messaging:** Sub-second communication between HQ and workers.
2. **Structured Data:** Messages carry JSON payloads (progress, file lists, errors).
3. **Control Room:** Real-time TUI observability via `tmux` integration.
4. **Agent Neutrality:** Works with any agent (Claude, Pi, OpenCode) via CLI tools.

## 3. Architecture

### 3.1 The Bus (SQLite)

We will use the existing (but currently unused) `messages` table in `.worker-state/bus.db`.

**Schema (already defined in `db.nim`):**

```sql
CREATE TABLE messages (
  seq INTEGER PRIMARY KEY AUTOINCREMENT,
  id TEXT NOT NULL UNIQUE,
  ts_ms INTEGER NOT NULL,
  from_agent TEXT NOT NULL,
  to_agent TEXT,
  type TEXT NOT NULL,
  payload TEXT -- JSON
);

CREATE TABLE cursors (
  agent_id TEXT PRIMARY KEY,
  last_acked_seq INTEGER NOT NULL DEFAULT 0
);
```

### 3.2 CLI Interface (`worker msg`)

We will add a `msg` subcommand group to the `worker` binary.

#### `worker msg send [payload]`

Sends a message to the bus.

- **Automatic Metadata:**
  - `from_agent`: Defaults to the current task ID (when run inside a worktree) or `"hq"`.
  - `ts_ms`: Current Unix time in milliseconds.
- **Payload:** Accepts a raw JSON string or `@filename.json` (payload read from the file).
- **Example:**
  ```bash
  worker msg send status '{"phase": "tests", "progress": 0.5}'
  worker msg send cmd '{"command": "stop", "reason": "manual stop"}' --to skills-123
  ```

#### `worker msg poll`

Fetches unread messages for the current agent.

- **Behavior:** Reads messages whose `seq` is greater than the agent's `last_acked_seq`, returns them, and advances the cursor.
- **Output:** JSON Lines (one message per line).
- **Example:**
  ```bash
  # Prints unread messages, one JSON object per line
  worker msg poll
  ```

#### `worker msg follow [--task ]`

Tails the message bus in real time (like `tail -f`).

- **Behavior:** Stateless tailing: starts at the current latest `seq` and never reads or updates cursors.
- **Use Case:** Human monitoring or `tmux` dashboards.

### 3.3 Observability (`worker watch`)

A new command that spawns a "Control Room" view in `tmux`.

**Command:** `worker watch `

**Logic:**

1. Check that `tmux` is running.
2. Create a new window named `w:`.
3. Split the window vertically (70% top, 30% bottom).
4. **Top Pane:** `tail -f worktrees//.worker.log` (agent logs).
5. **Bottom Pane:** `worker msg follow --task ` (structured events).

## 4. Implementation Plan

### Phase A: Database Logic (`src/worker/db.nim`)

- [ ] Verify `publish` handles JSON serialization correctly.
- [ ] Implement `getLatestSeq()` (needed for `follow` to start at "now").
- [ ] Optimize `poll` for single-consumer throughput.

### Phase B: CLI Commands (`src/worker.nim`)

- [ ] Add the `msg` dispatch group.
- [ ] Implement `send`, `poll`, and `follow`.
- [ ] Ensure context detection (Am I in a worktree? Who am I?) works reliably.

### Phase C: Tmux Integration (`src/worker/tmux.nim`?)

- [ ] Implement the `watch` command using `std/osproc`.
- [ ] Handle "tmux not found" gracefully.

### Phase D: Skill Integration

- [ ] Update `skills/hq/scripts/worker-rpc.py` to:
  - Log to `.worker.log`.
  - Emit `worker msg send` events for `agent_start`, `tool_use`, and `error`.
- [ ] Update `skills/hq/SKILL.md` to use `worker watch`.

## 5. Message Protocol Standard

| Type | Payload Schema | Description |
|------|----------------|-------------|
| `status` | `{ phase: string, progress: float }` | High-level progress update. |
| `log` | `{ level: "info"\|"warn"\|"error", msg: string }` | Structured logging. |
| `signal` | `{ signal: "done"\|"start"\|"block" }` | Lifecycle signals. |
| `cmd` | `{ command: "stop"\|"pause", reason: string }` | HQ commands to a worker. |

## 6. Open Questions

- **Retention:** How long do we keep messages? SQLite copes with millions of rows, so one option is to prune messages older than 30 days.
- **Concurrency:** Rely on SQLite's WAL mode: readers (`poll`, `follow`) run concurrently with a writer, while concurrent `send` calls serialize on the write lock.
- **Auth:** Currently trusts the local user. Sufficient for V1.
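The cursor semantics from sections 3.1 and 3.2 (plus the Phase A helper for starting `follow` at "now") can be sketched with Python's standard `sqlite3` module. This is a minimal illustration under stated assumptions, not the Nim implementation in `db.nim`: the names `open_bus`, `publish`, `get_latest_seq`, `poll`, and `follow` are hypothetical stand-ins, and three behaviors are assumptions the spec does not state: a `NULL` `to_agent` means broadcast, `poll` skips an agent's own messages, and a `--task` filter matches either `from_agent` or `to_agent`.

```python
import json
import sqlite3
import time
import uuid

# Schema from section 3.1; the authoritative copy lives in db.nim.
SCHEMA = """
CREATE TABLE IF NOT EXISTS messages (
    seq        INTEGER PRIMARY KEY AUTOINCREMENT,
    id         TEXT NOT NULL UNIQUE,
    ts_ms      INTEGER NOT NULL,
    from_agent TEXT NOT NULL,
    to_agent   TEXT,
    type       TEXT NOT NULL,
    payload    TEXT  -- JSON
);
CREATE TABLE IF NOT EXISTS cursors (
    agent_id       TEXT PRIMARY KEY,
    last_acked_seq INTEGER NOT NULL DEFAULT 0
);
"""
COLUMNS = ("seq", "id", "ts_ms", "from_agent", "to_agent", "type", "payload")
SELECT_MSGS = "SELECT " + ", ".join(COLUMNS) + " FROM messages"


def open_bus(path):
    conn = sqlite3.connect(path)
    conn.execute("PRAGMA journal_mode=WAL")  # readers can run alongside one writer
    conn.executescript(SCHEMA)
    return conn


def _to_msg(row):
    msg = dict(zip(COLUMNS, row))
    if msg["payload"] is not None:
        msg["payload"] = json.loads(msg["payload"])
    return msg


def publish(conn, from_agent, msg_type, payload, to_agent=None):
    """Append one message (payload serialized to JSON) and return its seq."""
    with conn:  # commits on success, rolls back on error
        cur = conn.execute(
            "INSERT INTO messages (id, ts_ms, from_agent, to_agent, type, payload)"
            " VALUES (?, ?, ?, ?, ?, ?)",
            (uuid.uuid4().hex, int(time.time() * 1000), from_agent, to_agent,
             msg_type, json.dumps(payload)),
        )
    return cur.lastrowid


def get_latest_seq(conn):
    """Highest seq on the bus, or 0 if empty (hypothetical getLatestSeq)."""
    return conn.execute("SELECT COALESCE(MAX(seq), 0) FROM messages").fetchone()[0]


def poll(conn, agent_id):
    """Return unread messages for agent_id, then advance its cursor.

    Assumes one consumer per agent_id. Assumptions not in the spec:
    NULL to_agent is a broadcast, and an agent never receives its own messages.
    """
    row = conn.execute(
        "SELECT last_acked_seq FROM cursors WHERE agent_id = ?", (agent_id,)
    ).fetchone()
    last = row[0] if row else 0
    rows = conn.execute(
        SELECT_MSGS + " WHERE seq > ? AND (to_agent IS NULL OR to_agent = ?)"
        " AND from_agent != ? ORDER BY seq",
        (last, agent_id, agent_id),
    ).fetchall()
    if rows:
        with conn:
            conn.execute(
                "INSERT OR REPLACE INTO cursors (agent_id, last_acked_seq) VALUES (?, ?)",
                (agent_id, rows[-1][0]),
            )
    return [_to_msg(r) for r in rows]


def _tail(conn, after_seq, task, interval):
    # Assumption: a task matches on either from_agent or to_agent.
    while True:
        rows = conn.execute(
            SELECT_MSGS + " WHERE seq > ? AND (? IS NULL OR from_agent = ? OR to_agent = ?)"
            " ORDER BY seq",
            (after_seq, task, task, task),
        ).fetchall()
        for r in rows:
            after_seq = r[0]
            yield _to_msg(r)
        time.sleep(interval)


def follow(conn, task=None, interval=0.5):
    """Stateless tail: yields new messages forever and never touches cursors."""
    start = get_latest_seq(conn)  # read eagerly so tailing starts at "now"
    return _tail(conn, start, task, interval)
```

A `worker msg poll` wrapper would print `json.dumps(msg)` for each returned dict to produce the JSON Lines output from section 3.2. Because this sketch commits the cursor before the caller prints anything, delivery is at-most-once: a crash between the commit and the print drops those messages. Updating the cursor only after output would give at-least-once delivery instead.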