feat(hq): add worker-rpc script and tests, update skills

- worker-rpc.py: RPC communication for worker orchestration
- test/: HQ skill tests
- Update hq and web-research SKILL.md
dan 2026-01-24 09:40:30 -08:00
parent b11b3b55ae
commit 648357348a
5 changed files with 419 additions and 10 deletions

skills/hq/SKILL.md

@@ -133,9 +133,23 @@ orch consensus "Is this implementation correct?" flash gemini --file path/to/cod
## Workflow: Delegating to Workers
### 1. Find Work (Check Capacity First)
**Hard Limit:** Max **3** concurrent workers.
- Prevents API rate limits (Claude/OpenAI).
- Prevents CPU/Disk thrashing.
- Keeps management overhead sustainable for HQ.
```bash
# Check current active workers
COUNT=$(worker status | grep -E "WORKING|ASSIGNED" | wc -l)
if [ "$COUNT" -ge 3 ]; then
echo "WIP Limit Reached ($COUNT/3). Finish existing reviews before spawning."
exit 0
fi
# If capacity exists, find ready work
bd ready
```
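The capacity check above can be factored into a reusable guard. A minimal sketch, assuming `worker status` prints one line per worker with a `WORKING`/`ASSIGNED` state column (the status text is passed in as an argument so the logic is testable without the `worker` CLI):

```bash
# Sketch: reusable WIP guard. Pass in the status text; in practice feed it
# "$(worker status)". Returns success when a worker slot is free.
has_capacity() {
  local status_text="$1" limit="${2:-3}" count
  count=$(printf '%s\n' "$status_text" | grep -cE "WORKING|ASSIGNED") || count=0
  [ "$count" -lt "$limit" ]
}

# Usage:
#   if has_capacity "$(worker status)" 3; then bd ready; fi
```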
@@ -184,6 +198,15 @@ sed -e "s/{{TASK_ID}}/$TASK_ID/g" \
cd "$WORKTREE" && claude --model sonnet-4.5 -p "$(cat .worker-prompt.md)"
```
**Pi/RPC (Headless):**
```bash
# Using the RPC wrapper for structured control
python3 skills/hq/scripts/worker-rpc.py \
--worktree "$WORKTREE" \
--prompt-file "$WORKTREE/.worker-prompt.md" \
--model "anthropic/claude-3-5-sonnet-20241022"
```
For background execution:
```bash
cd "$WORKTREE" && nohup claude --model sonnet-4.5 -p "$(cat .worker-prompt.md)" > worker.log 2>&1 &
@@ -249,6 +272,17 @@ bd close <task-id>
## Handling Problems
### Dependency Deadlocks
If you detect a circular dependency (Task A needs B, B needs A) or complex cross-cutting changes:
1. **Stop**: Do not try to solve it with more workers.
2. **Escalate**:
```bash
bd comment <task-id> "ESCALATE: Dependency deadlock detected. Requiring human intervention."
bd update <task-id> --status=blocked --owner=human
```
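A cycle can also be spotted mechanically before spawning workers. A sketch using coreutils `tsort`, assuming you can extract dependency pairs as newline-separated "blocker dependent" lines (the extraction step itself is not shown):

```bash
# Sketch: tsort fails on cyclic input, so a nonzero exit means a cycle
# exists and HQ should escalate rather than spawn more workers.
detect_cycle() {
  # $1 = newline-separated "A B" pairs meaning "A must precede B"
  if printf '%s\n' "$1" | tsort >/dev/null 2>&1; then
    return 1   # acyclic: safe to proceed
  else
    return 0   # cycle detected: escalate
  fi
}
```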
### Stale Worker (no heartbeat)
```bash
@@ -463,17 +497,31 @@ command -v review-gate && echo "review-gate available"
command -v orch && echo "orch available"
```
## RPC Event Mapping
When using the `worker-rpc.py` control plane, low-level RPC events are mapped to HQ concepts as follows:
| RPC Event | Field Check | HQ Signal | Meaning |
|-----------|-------------|-----------|---------|
| `agent_start` | - | `worker_busy` | Agent has started processing |
| `agent_end` | - | `worker_idle` | Agent finished turn (waiting for input) |
| `tool_execution_start` | `toolName="bash"` | `exec:{command}` | Agent running shell command |
| `tool_execution_start` | `args.command="worker done"` | `signal:done` | Agent signaling completion |
| `tool_execution_start` | `args.command="worker start"` | `signal:start` | Agent signaling start |
| `auto_retry_start` | - | `warning:retry` | Transient error (API overloaded) |
| `extension_error` | - | `error:extension` | System error in worker environment |
The `worker-rpc.py` script emits these signals to stderr/logs for the HQ agent to monitor.
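The table above can be expressed as a small filter over the agent's event stream. A sketch, assuming one JSON object per line and `jq` on the PATH (field names follow the table; anything else here is illustrative):

```bash
# Sketch: translate a raw RPC event (one JSON object) into an HQ signal,
# mirroring the mapping table above. Unrecognized events print nothing.
map_event() {
  local line="$1" type tool cmd
  type=$(jq -r '.type // empty' <<<"$line")
  case "$type" in
    agent_start)      echo "worker_busy" ;;
    agent_end)        echo "worker_idle" ;;
    auto_retry_start) echo "warning:retry" ;;
    extension_error)  echo "error:extension" ;;
    tool_execution_start)
      tool=$(jq -r '.toolName // empty' <<<"$line")
      if [ "$tool" = "bash" ]; then
        cmd=$(jq -r '.args.command // empty' <<<"$line")
        case "$cmd" in
          "worker done"*)  echo "signal:done" ;;
          "worker start"*) echo "signal:start" ;;
          *)               echo "exec:$cmd" ;;
        esac
      fi ;;
  esac
}
```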
## Resolved Questions
1. **WIP limits**: **Hard cap of 3 concurrent workers**. This keeps API costs and system resources in check.
2. **Definition of Done**: Enforced by the `review-gate` skill (if active) and HQ's final judgment.
3. **Deadlocks**: HQ does not solve deadlocks. Escalate to human immediately.
## Open Questions
HQ-specific questions (other concerns delegated to appropriate layers):
1. **Cost Tracking**: How to attribute token costs per worker/task? (Currently aggregate.)
2. **Human checkpoints**: When does HQ pause for a human vs. continue? Currently via stop hooks and review-gate.
3. **Dependency scoping**: How to detect cross-cutting work that shouldn't parallelize?
**Delegated to other layers (see respective issues):**
- Worker launch mechanism → worker CLI (skills-q8i0)

skills/hq/scripts/worker-rpc.py (new executable file, 177 lines)

@@ -0,0 +1,177 @@
#!/usr/bin/env python3
"""
worker-rpc.py - RPC-based worker agent launcher for HQ

This script wraps the 'pi' RPC interface to provide a headless control plane
for worker agents. It handles:

1. Launching the agent in the correct worktree
2. Sending the initial system prompt/task
3. Streaming events to stdout (for HQ monitoring)
4. Detecting task completion signals

Usage:
    worker-rpc.py --worktree PATH --prompt-file PATH [--model ID]
"""
import argparse
import json
import os
import shutil
import subprocess
import sys
from datetime import datetime


def log(msg, level="INFO"):
    timestamp = datetime.now().isoformat()
    print(f"[{level}:{timestamp}] {msg}", file=sys.stderr)


def main():
    parser = argparse.ArgumentParser(description="Worker Agent RPC Launcher")
    parser.add_argument("--worktree", required=True, help="Path to worktree directory")
    parser.add_argument("--prompt-file", required=True, help="Path to markdown prompt file")
    parser.add_argument("--model", default="anthropic/claude-3-5-sonnet-20241022", help="Model ID")
    parser.add_argument("--pi-binary", default="pi", help="Path to pi binary")
    args = parser.parse_args()

    # Validate inputs
    if not os.path.exists(args.worktree):
        log(f"Worktree not found: {args.worktree}", "ERROR")
        sys.exit(1)
    if not os.path.exists(args.prompt_file):
        log(f"Prompt file not found: {args.prompt_file}", "ERROR")
        sys.exit(1)

    # Read prompt
    with open(args.prompt_file, "r") as f:
        prompt_content = f.read()

    # Resolve the pi binary BEFORE changing cwd to the worktree: a relative
    # path or a bare name could otherwise resolve against the wrong directory.
    pi_bin = args.pi_binary
    if os.sep not in pi_bin:
        found = shutil.which(pi_bin)
        if found:
            pi_bin = found
        else:
            # Fall back to letting Popen try, but warn first
            log(f"Warning: could not resolve '{pi_bin}' in PATH", "WARN")
    else:
        # It's a path: resolve it relative to the original cwd
        pi_bin = os.path.abspath(pi_bin)

    # We run from the worktree directory so the agent sees relative paths
    # correctly. --no-session avoids polluting global history; we may want
    # --session-dir later.
    cmd = [
        pi_bin,
        "--mode", "rpc",
        "--model", args.model,
        "--no-session",
    ]
    log(f"Starting agent in {args.worktree}...")
    try:
        process = subprocess.Popen(
            cmd,
            cwd=args.worktree,
            stdin=subprocess.PIPE,
            stdout=subprocess.PIPE,
            stderr=sys.stderr,       # Pass stderr through
            text=True,
            bufsize=1,               # Line buffered
            env=os.environ.copy(),   # Explicitly pass environment
        )
    except FileNotFoundError:
        log(f"Command not found: {args.pi_binary}", "ERROR")
        sys.exit(1)

    # Send initial prompt
    log("Sending initial prompt...")
    request = {"type": "prompt", "message": prompt_content}
    try:
        process.stdin.write(json.dumps(request) + "\n")
        process.stdin.flush()
    except BrokenPipeError:
        log("Agent process exited unexpectedly", "ERROR")
        sys.exit(1)

    # Event loop
    try:
        while True:
            line = process.stdout.readline()
            if not line:
                break
            try:
                event = json.loads(line)
            except json.JSONDecodeError:
                # Only warn on non-empty lines (trailing newlines are normal)
                if line.strip():
                    log(f"Invalid JSON: {line.strip()}", "WARN")
                continue

            event_type = event.get("type")
            if event_type == "message_update":
                # Stream text deltas to stdout (human readable)
                delta = event.get("assistantMessageEvent", {})
                if delta.get("type") == "text_delta":
                    print(delta.get("delta", ""), end="", flush=True)
            elif event_type == "tool_execution_start":
                tool_name = event.get("toolName")
                tool_args = event.get("args") or {}
                log(f"Tool Start: {tool_name} {tool_args}")
                # Check for worker lifecycle commands
                if tool_name == "bash":
                    cmd_str = tool_args.get("command", "").strip()
                    if cmd_str.startswith("worker done"):
                        log("DETECTED: Worker signaling completion")
                    elif cmd_str.startswith("worker start"):
                        log("DETECTED: Worker signaling start")
            elif event_type == "tool_execution_end":
                log(f"Tool End: {event.get('toolName')}")
            elif event_type == "agent_end":
                log("Agent finished turn")
                # A full control loop might wait for more input here; this
                # prototype exits when the agent is done.
                break
            elif event_type == "response":
                if not event.get("success"):
                    log(f"Command failed: {event.get('error')}", "ERROR")
    except KeyboardInterrupt:
        log("Interrupted by user, stopping agent...")
        try:
            process.stdin.write(json.dumps({"type": "abort"}) + "\n")
            process.stdin.flush()
            process.wait(timeout=5)
        except (BrokenPipeError, subprocess.TimeoutExpired):
            pass

    log("Exiting.")
    process.terminate()


if __name__ == "__main__":
    main()

skills/hq/test/mock-agent.py (new executable file, 88 lines)

@@ -0,0 +1,88 @@
#!/usr/bin/env python3
"""
mock-agent.py

Simulates a 'pi' agent in RPC mode for testing HQ orchestration.
It reads RPC commands from stdin and replies with canned sequences.
"""
import json
import sys
import time


def send(data):
    print(json.dumps(data), flush=True)


def main():
    # Simulate startup delay
    time.sleep(0.1)

    # Read command loop
    for line in sys.stdin:
        try:
            cmd = json.loads(line)
        except json.JSONDecodeError:
            continue

        cmd_type = cmd.get("type")
        if cmd_type == "prompt":
            # Acknowledge the prompt (its content is ignored; replies are canned)
            send({"type": "response", "command": "prompt", "success": True, "id": cmd.get("id")})
            send({"type": "agent_start"})

            # Simulate "thinking" with a text delta
            send({
                "type": "message_update",
                "assistantMessageEvent": {
                    "type": "text_delta",
                    "delta": "I will now work on the task.\n",
                },
            })

            # Simulate 'worker start'
            send({
                "type": "tool_execution_start",
                "toolName": "bash",
                "args": {"command": "worker start"},
            })
            send({
                "type": "tool_execution_end",
                "toolName": "bash",
                "result": {"content": [{"type": "text", "text": "Worker started"}]},
            })

            # Simulate doing work (file edits)
            send({
                "type": "message_update",
                "assistantMessageEvent": {
                    "type": "text_delta",
                    "delta": "Creating the implementation file...\n",
                },
            })

            # Simulate 'worker done'
            send({
                "type": "tool_execution_start",
                "toolName": "bash",
                "args": {"command": "worker done"},
            })
            send({
                "type": "tool_execution_end",
                "toolName": "bash",
                "result": {"content": [{"type": "text", "text": "Worker marked done"}]},
            })

            send({"type": "agent_end"})
        elif cmd_type == "abort":
            send({"type": "response", "command": "abort", "success": True})
            break


if __name__ == "__main__":
    main()

skills/hq/test/run-simulation.sh (new file, 92 lines)

@@ -0,0 +1,92 @@
#!/usr/bin/env bash
# run-simulation.sh
# Tests the HQ worker-rpc.py wrapper using mock-agent.py.
# Validates that RPC events are correctly translated to HQ signals.
set -euo pipefail

DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)"
ROOT="$(dirname "$(dirname "$DIR")")" # skills dir

# Set up temp environment
TEST_DIR=$(mktemp -d)
trap 'rm -rf "$TEST_DIR"' EXIT
echo "Using test dir: $TEST_DIR"
cd "$TEST_DIR"

# Create fake worktree structure
mkdir -p worktrees/task-123
touch worktrees/task-123/.worker-prompt.md

# Set up the mock agent launcher: a fake 'pi' command that calls the
# Python mock.
mkdir -p bin
cat > bin/pi <<EOF
#!/usr/bin/env bash
python3 "$DIR/mock-agent.py"
EOF
chmod +x bin/pi
export PATH="$TEST_DIR/bin:$PATH"

# Force python to flush buffers so we reliably capture logs
export PYTHONUNBUFFERED=1

echo "=== Starting RPC Simulation ==="
# Run the wrapper script. We expect it to:
# 1. Launch 'pi' (our mock)
# 2. Send the prompt
# 3. Print the text output
# 4. Log the detected signals (start/done) to stderr
OUTPUT_FILE="$TEST_DIR/output.log"
ERROR_FILE="$TEST_DIR/error.log"

python3 "$ROOT/hq/scripts/worker-rpc.py" \
  --worktree "worktrees/task-123" \
  --prompt-file "worktrees/task-123/.worker-prompt.md" \
  --pi-binary "pi" \
  > "$OUTPUT_FILE" 2> "$ERROR_FILE" || {
    echo "=== WORKER FAILED ==="
    echo "--- STDOUT ---"
    cat "$OUTPUT_FILE"
    echo "--- STDERR ---"
    cat "$ERROR_FILE"
    exit 1
  }

echo "=== Simulation Complete ==="

# Validation
FAILED=0

echo "Checking output log..."
cat "$OUTPUT_FILE"
if grep -q "I will now work on the task" "$OUTPUT_FILE"; then
  echo "PASS: Received text stream"
else
  echo "FAIL: Missing text stream"
  FAILED=1
fi

echo "Checking error log (signals)..."
cat "$ERROR_FILE"
if grep -q "DETECTED: Worker signaling start" "$ERROR_FILE"; then
  echo "PASS: Detected 'worker start'"
else
  echo "FAIL: Missed 'worker start'"
  FAILED=1
fi
if grep -q "DETECTED: Worker signaling completion" "$ERROR_FILE"; then
  echo "PASS: Detected 'worker done'"
else
  echo "FAIL: Missed 'worker done'"
  FAILED=1
fi

exit $FAILED

skills/web-research/SKILL.md

@@ -49,3 +49,7 @@ RESEARCH_BACKEND=llm ./scripts/research.sh "current best practices for React sta
- `claude` CLI tool must be installed and in the PATH.
- `llm` CLI tool (optional) for using the `llm` backend.
- `KAGI_API_KEY` environment variable OR `/run/secrets/api_keys/kagi` (for `kagi` backend).
## Fallbacks
If Claude web tools are unavailable or quota-limited, use the `brave-search` skill to gather sources first, then paste results into your report.