docs(specs): add worker message bus specification

This commit is contained in:
dan 2026-01-24 09:40:49 -08:00
parent 131fb86852
commit 0253e6b1d6

119
specs/worker-message-bus.md Normal file
View file

@ -0,0 +1,119 @@
# Worker Message Bus & Observability Design Spec
**Status:** Draft
**Epic:** skills-29bp
**Related Issues:** skills-imei
## 1. Problem Statement
The current worker orchestration relies on `beads` comments for communication. This has several drawbacks:
- **High Latency:** `bd` syncs with git, making real-time chat slow (seconds to minutes).
- **Unstructured:** Comments are raw text; parsing signals like `[WORKER:status]` is brittle.
- **Observability:** No easy way to "watch" a worker's internal state or logs without manually tailing files.
## 2. Goals
1. **Low-Latency Messaging:** Sub-second communication between HQ and Workers.
2. **Structured Data:** Messages carry JSON payloads (progress, file lists, errors).
3. **Control Room:** Real-time TUI observability via `tmux` integration.
4. **Agent Neutrality:** Works with any agent (Claude, Pi, OpenCode) via CLI tools.
## 3. Architecture
### 3.1 The Bus (SQLite)
We will leverage the existing (but unused) `messages` table in `.worker-state/bus.db`.
**Schema (Existing in `db.nim`):**
```sql
CREATE TABLE messages (
seq INTEGER PRIMARY KEY AUTOINCREMENT,
id TEXT NOT NULL UNIQUE,
ts_ms INTEGER NOT NULL,
from_agent TEXT NOT NULL,
to_agent TEXT,
type TEXT NOT NULL,
payload TEXT -- JSON
);
CREATE TABLE cursors (
agent_id TEXT PRIMARY KEY,
last_acked_seq INTEGER NOT NULL DEFAULT 0
);
```
### 3.2 CLI Interface (`worker msg`)
We will add a `msg` subcommand group to the `worker` binary.
#### `worker msg send <type> [payload]`
Sends a message to the bus.
- **Automatic Metadata:**
- `from_agent`: Defaults to current task ID (if in worktree) or "hq".
- `ts_ms`: Current timestamp.
- **Payload:** Accepts raw JSON string or `@filename.json`.
- **Example:**
```bash
worker msg send status '{"progress": 0.5, "step": "tests"}'
worker msg send cmd '{"action": "stop"}' --to skills-123
```
#### `worker msg poll`
Fetches unread messages for the current agent.
- **Behavior:** Reads from `last_acked_seq`, returns messages, updates cursor.
- **Output:** JSON Lines (one message per line).
- **Example:**
```bash
# Returns stream of new messages
worker msg poll
```
#### `worker msg follow [--task <id>]`
Tails the message bus in real-time (like `tail -f`).
- **Behavior:** Stateless tailing (does not update cursors).
- **Use Case:** Human monitoring or `tmux` dashboards.
### 3.3 Observability (`worker watch`)
A new command to spawn a "Control Room" view in `tmux`.
**Command:** `worker watch <task-id>`
**Logic:**
1. Check if `tmux` is running.
2. Create a new window named `w:<task-id>`.
3. Split window vertically (70% top, 30% bottom).
4. **Top Pane:** `tail -f worktrees/<task-id>/.worker.log` (Agent logs).
5. **Bottom Pane:** `worker msg follow --task <task-id>` (Structured events).
## 4. Implementation Plan
### Phase A: Database Logic (`src/worker/db.nim`)
- [ ] Verify `publish` handles JSON serialization correctly.
- [ ] Implement `getLatestSeq()` (needed for `follow` to start at "now").
- [ ] Optimize `poll` for single-consumer throughput.
### Phase B: CLI Commands (`src/worker.nim`)
- [ ] Add `msg` dispatch group.
- [ ] Implement `send`, `poll`, `follow`.
- [ ] Ensure context detection (Am I in a worktree? Who am I?) works reliably.
### Phase C: Tmux Integration (`src/worker/tmux.nim`?)
- [ ] Implement `watch` command using `std/osproc`.
- [ ] Handle "tmux not found" gracefully.
### Phase D: Skill Integration
- [ ] Update `skills/hq/scripts/worker-rpc.py` to:
- Log to `.worker.log`.
- Emit `worker msg send` events for `agent_start`, `tool_use`, `error`.
- [ ] Update `skills/hq/SKILL.md` to use `worker watch`.
## 5. Message Protocol Standard
| Type | Payload Schema | Description |
|------|----------------|-------------|
| `status` | `{ phase: string, progress: float }` | High-level progress update. |
| `log` | `{ level: "info"|"warn"|"error", msg: string }` | Structured logging. |
| `signal` | `{ signal: "done"|"start"|"block" }` | Lifecycle signals. |
| `cmd` | `{ command: "stop"|"pause", reason: string }` | HQ commands to worker. |
## 6. Open Questions
- **Retention:** How long do we keep messages? (SQLite handles millions easily, maybe prune > 30d).
- **Concurrency:** SQLite WAL mode handles this natively.
- **Auth:** Currently trusts local user. Sufficient for V1.