Worker Message Bus & Observability Design Spec
- Status: Draft
- Epic: skills-29bp
- Related Issues: skills-imei
1. Problem Statement
The current worker orchestration relies on beads comments for communication. This has several drawbacks:
- High latency: `bd` syncs with git, making real-time chat slow (seconds to minutes).
- Unstructured: Comments are raw text; parsing signals like `[WORKER:status]` is brittle.
- Poor observability: There is no easy way to "watch" a worker's internal state or logs without manually tailing files.
2. Goals
- Low-Latency Messaging: Sub-second communication between HQ and Workers.
- Structured Data: Messages carry JSON payloads (progress, file lists, errors).
- Control Room: Real-time TUI observability via `tmux` integration.
- Agent Neutrality: Works with any agent (Claude, Pi, OpenCode) via CLI tools.
3. Architecture
3.1 The Bus (SQLite)
We will reuse the existing (but currently unused) `messages` table in `.worker-state/bus.db`.

Schema (already defined in `db.nim`):

```sql
CREATE TABLE messages (
    seq INTEGER PRIMARY KEY AUTOINCREMENT,
    id TEXT NOT NULL UNIQUE,
    ts_ms INTEGER NOT NULL,
    from_agent TEXT NOT NULL,
    to_agent TEXT,
    type TEXT NOT NULL,
    payload TEXT -- JSON
);

CREATE TABLE cursors (
    agent_id TEXT PRIMARY KEY,
    last_acked_seq INTEGER NOT NULL DEFAULT 0
);
```
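To make the bus semantics concrete, here is a minimal Python sketch of creating this schema and publishing one message. It uses the standard-library `sqlite3` module, not the Nim code in `db.nim`. The helper names `open_bus` and `publish` are hypothetical. The use of a random UUID for `id` is also an assumption, since the spec does not say how `id` is generated.

```python
import json
import sqlite3
import time
import uuid
from typing import Optional

# Schema from section 3.1 (IF NOT EXISTS added so reopening is harmless).
SCHEMA = """
CREATE TABLE IF NOT EXISTS messages (
    seq INTEGER PRIMARY KEY AUTOINCREMENT,
    id TEXT NOT NULL UNIQUE,
    ts_ms INTEGER NOT NULL,
    from_agent TEXT NOT NULL,
    to_agent TEXT,
    type TEXT NOT NULL,
    payload TEXT -- JSON
);
CREATE TABLE IF NOT EXISTS cursors (
    agent_id TEXT PRIMARY KEY,
    last_acked_seq INTEGER NOT NULL DEFAULT 0
);
"""


def open_bus(path: str = ":memory:") -> sqlite3.Connection:
    """Open (or create) the bus database and ensure both tables exist."""
    conn = sqlite3.connect(path)
    conn.executescript(SCHEMA)
    return conn


def publish(conn: sqlite3.Connection, from_agent: str, msg_type: str,
            payload: object, to_agent: Optional[str] = None) -> int:
    """Append one message and return its seq; payload must be JSON-serializable."""
    msg_id = str(uuid.uuid4())        # hypothetical: the spec does not define the id format
    ts_ms = int(time.time() * 1000)   # milliseconds since the Unix epoch
    with conn:                        # commit on success, roll back on error
        cur = conn.execute(
            "INSERT INTO messages (id, ts_ms, from_agent, to_agent, type, payload) "
            "VALUES (?, ?, ?, ?, ?, ?)",
            (msg_id, ts_ms, from_agent, to_agent, msg_type, json.dumps(payload)),
        )
    return cur.lastrowid
```

Serializing with `json.dumps` at the single write path is one way to satisfy the Phase A item "verify `publish` handles JSON serialization correctly". Every stored `payload` is then valid JSON by construction.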
3.2 CLI Interface (worker msg)
We will add a `msg` subcommand group to the `worker` binary.

`worker msg send <type> [payload] [--to <agent>]`

Sends a message to the bus.

- Automatic metadata:
  - `from_agent`: Defaults to the current task ID (if run inside a worktree) or `"hq"`.
  - `ts_ms`: Current timestamp in milliseconds.
- Payload: Accepts a raw JSON string or `@filename.json`.
- Example:

```sh
worker msg send status '{"phase": "tests", "progress": 0.5}'
worker msg send cmd '{"command": "stop", "reason": "manual stop"}' --to skills-123
```
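The `send` command needs two pieces of input handling: parsing the payload argument and defaulting `from_agent`. This sketch assumes a worktree is recognized by a `worktrees/<task-id>/` component in the current directory, which is the layout `worker watch` relies on. `load_payload` and `detect_from_agent` are hypothetical names. The real context detection in `src/worker.nim` may work differently.

```python
import json
from pathlib import Path
from typing import Any, Optional


def load_payload(arg: Optional[str]) -> Any:
    """Parse the optional payload argument: raw JSON text or '@path/to/file.json'."""
    if arg is None:
        return None
    text = Path(arg[1:]).read_text() if arg.startswith("@") else arg
    return json.loads(text)  # malformed JSON raises json.JSONDecodeError


def detect_from_agent(cwd: str) -> str:
    """Hypothetical rule: '.../worktrees/<task-id>/...' -> '<task-id>', otherwise 'hq'."""
    parts = Path(cwd).parts
    for i, part in enumerate(parts[:-1]):  # [:-1] guarantees parts[i + 1] exists
        if part == "worktrees":
            return parts[i + 1]
    return "hq"
```

Rejecting malformed JSON at `send` time keeps bad payloads off the bus entirely, so readers never have to handle them.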
`worker msg poll`

Fetches unread messages for the current agent.

- Behavior: Reads messages whose `seq` is greater than the agent's `last_acked_seq`, returns them, and advances the cursor.
- Output: JSON Lines (one message per line).
- Example:

```sh
# Returns a stream of new messages
worker msg poll
```
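The poll contract (read past the cursor, emit JSON Lines, advance the cursor) could look like this in Python. Two assumptions here are not stated in the spec. First, a message counts as addressed to an agent when `to_agent` equals that agent's ID or is `NULL` (broadcast). Second, the cursor advances only to the highest `seq` actually returned. `poll` is a hypothetical helper operating on the section 3.1 schema.

```python
import json
import sqlite3
from typing import List

COLUMNS = ("seq", "id", "ts_ms", "from_agent", "to_agent", "type", "payload")


def poll(conn: sqlite3.Connection, agent_id: str) -> List[str]:
    """Return unread messages for agent_id as JSON Lines and advance its cursor.

    Assumption: a message is addressed to agent_id when to_agent matches it
    or is NULL (broadcast).
    """
    row = conn.execute(
        "SELECT last_acked_seq FROM cursors WHERE agent_id = ?", (agent_id,)
    ).fetchone()
    last_acked = row[0] if row else 0

    rows = conn.execute(
        "SELECT seq, id, ts_ms, from_agent, to_agent, type, payload FROM messages "
        "WHERE seq > ? AND (to_agent IS NULL OR to_agent = ?) ORDER BY seq",
        (last_acked, agent_id),
    ).fetchall()

    lines = []
    for values in rows:
        msg = dict(zip(COLUMNS, values))
        # Decode the stored JSON so each output line is one nested JSON object.
        msg["payload"] = json.loads(msg["payload"]) if msg["payload"] is not None else None
        lines.append(json.dumps(msg))

    if rows:
        # Ack only what was returned; the cursor table has just these two columns.
        with conn:
            conn.execute(
                "INSERT OR REPLACE INTO cursors (agent_id, last_acked_seq) VALUES (?, ?)",
                (agent_id, rows[-1][0]),
            )
    return lines
```

SQLite allows only one write transaction at a time, and `AUTOINCREMENT` hands out increasing `seq` values. Messages therefore commit in `seq` order, so acknowledging the highest returned `seq` should not skip a message that commits later.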
`worker msg follow [--task <id>]`

Tails the message bus in real time (like `tail -f`).

- Behavior: Stateless tailing (does not update cursors).
- Use case: Human monitoring or `tmux` dashboards.
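`follow` differs from `poll` in two ways: it starts at the current head of the log, and it never touches `cursors`. In this sketch, `latest_seq` stands in for the planned `getLatestSeq()`. The meaning of `--task` (match on `from_agent` or `to_agent`) and the 0.25 s polling interval are assumptions, not part of the spec.

```python
import json
import sqlite3
import time
from typing import Iterator, Optional

COLUMNS = ("seq", "id", "ts_ms", "from_agent", "to_agent", "type", "payload")


def latest_seq(conn: sqlite3.Connection) -> int:
    """Stand-in for the planned getLatestSeq(): highest seq so far, 0 if empty."""
    return conn.execute("SELECT COALESCE(MAX(seq), 0) FROM messages").fetchone()[0]


def follow(conn: sqlite3.Connection, after_seq: int, task: Optional[str] = None,
           interval_s: float = 0.25) -> Iterator[dict]:
    """Yield messages with seq > after_seq forever; never writes to `cursors`."""
    sql = ("SELECT seq, id, ts_ms, from_agent, to_agent, type, payload "
           "FROM messages WHERE seq > ?")
    if task is not None:
        # Assumed meaning of --task: messages sent by or addressed to that task.
        sql += " AND (from_agent = ? OR to_agent = ?)"
    sql += " ORDER BY seq"

    last = after_seq
    while True:
        params = (last, task, task) if task is not None else (last,)
        rows = conn.execute(sql, params).fetchall()
        for values in rows:
            msg = dict(zip(COLUMNS, values))
            msg["payload"] = json.loads(msg["payload"]) if msg["payload"] is not None else None
            last = msg["seq"]
            yield msg
        if not rows:
            time.sleep(interval_s)  # nothing new yet; back off briefly
```

The start position is passed in explicitly, for example `follow(conn, latest_seq(conn), task="skills-123")`. This captures "now" when the command starts rather than when the generator is first advanced.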
3.3 Observability (worker watch)
A new command that spawns a "Control Room" view in `tmux`.

Command: `worker watch <task-id>`

Logic:

- Check if `tmux` is running.
- Create a new window named `w:<task-id>`.
- Split the window vertically (70% top, 30% bottom).
- Top pane: `tail -f worktrees/<task-id>/.worker.log` (agent logs).
- Bottom pane: `worker msg follow --task <task-id>` (structured events).
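The `watch` steps map onto a handful of tmux invocations. This Python sketch only builds and runs those commands. `tmux_available`, `new_window_cmd`, `split_cmd`, and the `root` argument are hypothetical. It reads "tmux is running" as "the `tmux` binary is on `PATH` and `$TMUX` is set", which is an assumption, and it assumes tmux 3.1 or newer for the percentage form of `-l`. One design choice is worth flagging: tmux parses `:` in a target as a session/window separator. For that reason, the sketch targets the new window by the ID that `new-window -P -F '#{window_id}'` prints, not by the name `w:<task-id>`.

```python
import os
import shlex
import shutil
import subprocess
from typing import List


def tmux_available() -> bool:
    """Assumed check: tmux is on PATH and we are inside a tmux client ($TMUX set)."""
    return shutil.which("tmux") is not None and bool(os.environ.get("TMUX"))


def new_window_cmd(task_id: str, root: str) -> List[str]:
    """Top pane; -P -F prints the new window's id (e.g. '@7') on stdout."""
    log = os.path.join(root, "worktrees", task_id, ".worker.log")
    return ["tmux", "new-window", "-P", "-F", "#{window_id}", "-c", root,
            "-n", f"w:{task_id}", f"tail -f {shlex.quote(log)}"]


def split_cmd(task_id: str, window_id: str, root: str) -> List[str]:
    """Bottom pane: -v stacks panes top/bottom, -l 30% sizes the new (bottom) pane."""
    follow = f"worker msg follow --task {shlex.quote(task_id)}"
    return ["tmux", "split-window", "-v", "-l", "30%", "-c", root,
            "-t", window_id, follow]


def watch(task_id: str, root: str = ".") -> None:
    if not tmux_available():
        raise SystemExit("worker watch: tmux not found or not inside a tmux session")
    root = os.path.abspath(root)
    # Target by window id, not by the name 'w:<task-id>' (':' splits session:window).
    window_id = subprocess.run(new_window_cmd(task_id, root), check=True,
                               capture_output=True, text=True).stdout.strip()
    subprocess.run(split_cmd(task_id, window_id, root), check=True)
```

Passing `-c root` to both commands keeps relative paths and the worktree context detection independent of the tmux session's default directory.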
4. Implementation Plan
Phase A: Database Logic (src/worker/db.nim)
- Verify `publish` handles JSON serialization correctly.
- Implement `getLatestSeq()` (needed for `follow` to start at "now").
- Optimize `poll` for single-consumer throughput.
Phase B: CLI Commands (src/worker.nim)
- Add the `msg` dispatch group.
- Implement `send`, `poll`, and `follow`.
- Ensure context detection (Am I in a worktree? Who am I?) works reliably.
Phase C: Tmux Integration (src/worker/tmux.nim?)
- Implement the `watch` command using `std/osproc`.
- Handle "tmux not found" gracefully.
Phase D: Skill Integration
- Update `skills/hq/scripts/worker-rpc.py` to:
  - Log to `.worker.log`.
  - Emit `worker msg send` events for `agent_start`, `tool_use`, and `error`.
- Update `skills/hq/SKILL.md` to use `worker watch`.
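For Phase D, `worker-rpc.py` could send its own log lines to `.worker.log` and shell out to the CLI for events. This is only a sketch. It assumes the event name (`agent_start`, `tool_use`, `error`) is used as the message type, and `file_logger` and `emit` are hypothetical helpers. A failed send is logged rather than raised, so a missing `worker` binary cannot crash the agent.

```python
import json
import logging
import subprocess
from typing import Any, Callable, Optional


def file_logger(path: str = ".worker.log") -> logging.Logger:
    """Return a logger that appends timestamped lines to the worker log file."""
    logger = logging.getLogger("worker-rpc")
    logger.setLevel(logging.INFO)
    if not logger.handlers:  # avoid duplicate handlers on repeated calls
        handler = logging.FileHandler(path)
        handler.setFormatter(logging.Formatter("%(asctime)s %(levelname)s %(message)s"))
        logger.addHandler(handler)
    return logger


def emit(msg_type: str, payload: Any, to: Optional[str] = None,
         run: Callable[..., Any] = subprocess.run) -> bool:
    """Publish one event through `worker msg send`; return False instead of raising."""
    argv = ["worker", "msg", "send", msg_type, json.dumps(payload)]
    if to is not None:
        argv += ["--to", to]
    try:
        run(argv, check=True, capture_output=True, text=True)
        return True
    except (OSError, subprocess.CalledProcessError) as exc:
        # OSError covers a missing `worker` binary (FileNotFoundError).
        logging.getLogger("worker-rpc").warning("worker msg send failed: %s", exc)
        return False
```

Injecting `run` keeps the helper testable without a `worker` binary on `PATH`. Going through the CLI, rather than opening `bus.db` directly, preserves the agent-neutral boundary described in section 2.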
5. Message Protocol Standard
| Type | Payload Schema | Description |
|---|---|---|
| `status` | `{ phase: string, progress: float }` | High-level progress update. |
| `log` | `{ level: "info" \| "warn" \| … }` | … |
| `signal` | `{ signal: "done" \| "start" \| … }` | … |
| `cmd` | `{ command: "stop" \| "pause", reason: string }` | Control command for a worker. |
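Receivers may want to reject malformed payloads before acting on them. The sketch below checks only the fields that the table fully specifies (`status` and `cmd`). The `log` and `signal` schemas are cut short above, so it accepts any JSON object for them, and it passes unknown types through unchanged. `validate` is a hypothetical helper.

```python
from typing import Any, List


def validate(msg_type: str, payload: Any) -> List[str]:
    """Return a list of problems with payload for msg_type (empty list = valid)."""
    if not isinstance(payload, dict):
        return ["payload must be a JSON object"]
    problems = []
    if msg_type == "status":
        if not isinstance(payload.get("phase"), str):
            problems.append("status.phase must be a string")
        progress = payload.get("progress")
        # bool is a subclass of int in Python, so exclude it explicitly.
        if isinstance(progress, bool) or not isinstance(progress, (int, float)):
            problems.append("status.progress must be a number")
    elif msg_type == "cmd":
        if payload.get("command") not in ("stop", "pause"):
            problems.append('cmd.command must be "stop" or "pause"')
        if not isinstance(payload.get("reason"), str):
            problems.append("cmd.reason must be a string")
    # log, signal, and unknown types: schema not fully specified, accept as-is.
    return problems
```

A worker could run this on each polled `cmd` message and ignore, or log, anything that returns a non-empty list.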
6. Open Questions
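- Retention: How long do we keep messages? SQLite handles millions of rows easily; one option is to prune messages older than 30 days.
- Concurrency: SQLite WAL mode lets readers run concurrently with a single writer. Writers are still serialized, which should be acceptable for short message inserts.
- Auth: Currently trusts the local user. Sufficient for V1.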
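Both the retention and concurrency questions have small, concrete answers in SQLite. This sketch enables WAL mode and prunes messages by `ts_ms`. The 30-day default mirrors the suggestion above. `open_bus_wal`, `prune`, and the 5-second lock timeout are assumptions for illustration.

```python
import sqlite3
import time
from typing import Optional, Tuple

DAY_MS = 24 * 60 * 60 * 1000


def open_bus_wal(path: str) -> Tuple[sqlite3.Connection, str]:
    """Open a file-backed bus in WAL mode; returns the connection and the journal mode."""
    conn = sqlite3.connect(path, timeout=5.0)  # wait up to 5 s for a competing writer
    mode = conn.execute("PRAGMA journal_mode=WAL").fetchone()[0]  # 'wal' on success
    return conn, mode


def prune(conn: sqlite3.Connection, max_age_days: int = 30,
          now_ms: Optional[int] = None) -> int:
    """Delete messages older than max_age_days; return the number removed."""
    now_ms = int(time.time() * 1000) if now_ms is None else now_ms
    cutoff = now_ms - max_age_days * DAY_MS
    with conn:
        cur = conn.execute("DELETE FROM messages WHERE ts_ms < ?", (cutoff,))
    return cur.rowcount
```

Because `seq` is declared `AUTOINCREMENT`, SQLite never reuses a deleted `seq` value. Existing `last_acked_seq` cursors therefore remain valid after pruning.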