From 648357348a3857d1a84414692dc0fde8e1a4b56d Mon Sep 17 00:00:00 2001
From: dan
Date: Sat, 24 Jan 2026 09:40:30 -0800
Subject: [PATCH] feat(hq): add worker-rpc script and tests, update skills

- worker-rpc.py: RPC communication for worker orchestration
- test/: HQ skill tests
- Update hq and web-research SKILL.md
---
 skills/hq/SKILL.md               |  68 ++++++++++--
 skills/hq/scripts/worker-rpc.py  | 177 +++++++++++++++++++++++++++++++
 skills/hq/test/mock-agent.py     |  88 +++++++++++++++
 skills/hq/test/run-simulation.sh |  92 ++++++++++++++++
 skills/web-research/SKILL.md     |   4 +
 5 files changed, 419 insertions(+), 10 deletions(-)
 create mode 100755 skills/hq/scripts/worker-rpc.py
 create mode 100755 skills/hq/test/mock-agent.py
 create mode 100755 skills/hq/test/run-simulation.sh

diff --git a/skills/hq/SKILL.md b/skills/hq/SKILL.md
index aca8611..4cdbb04 100644
--- a/skills/hq/SKILL.md
+++ b/skills/hq/SKILL.md
@@ -133,9 +133,23 @@ orch consensus "Is this implementation correct?" flash gemini --file path/to/cod
 
 ## Workflow: Delegating to Workers
 
-### 1. Find Work
+### 1. Find Work (Check Capacity First)
+
+**Hard Limit:** Max **3** concurrent workers.
+- Prevents API rate limits (Claude/OpenAI).
+- Prevents CPU/disk thrashing.
+- Keeps management overhead sustainable for HQ.
 
 ```bash
+# Check current active workers
+COUNT=$(worker status | grep -E "WORKING|ASSIGNED" | wc -l)
+
+if [ "$COUNT" -ge 3 ]; then
+  echo "WIP limit reached ($COUNT/3). Finish existing reviews before spawning."
+  exit 0
+fi
+
+# If capacity exists, find ready work
 bd ready
 ```
 
@@ -184,6 +198,15 @@ sed -e "s/{{TASK_ID}}/$TASK_ID/g" \
 cd "$WORKTREE" && claude --model sonnet-4.5 -p "$(cat .worker-prompt.md)"
 ```
 
+**Pi/RPC (Headless):**
+```bash
+# Use the RPC wrapper for structured control
+python3 skills/hq/scripts/worker-rpc.py \
+  --worktree "$WORKTREE" \
+  --prompt-file "$WORKTREE/.worker-prompt.md" \
+  --model "anthropic/claude-3-5-sonnet-20241022"
+```
+
 For background execution:
 ```bash
 cd "$WORKTREE" && nohup claude --model sonnet-4.5 -p "$(cat .worker-prompt.md)" > worker.log 2>&1 &
 ```
 
@@ -249,6 +272,17 @@ bd close
 
 ## Handling Problems
 
+### Dependency Deadlocks
+
+If you detect a circular dependency (task A needs B, B needs A) or complex cross-cutting changes:
+
+1. **Stop**: Do not try to solve it with more workers.
+2. **Escalate**:
+   ```bash
+   bd comment "ESCALATE: Dependency deadlock detected. Requires human intervention."
+   bd update --status=blocked --owner=human
+   ```
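+
+A quick pre-escalation sanity check (sketch only; `bd deps <id>` is a hypothetical subcommand, substitute however your `bd` lists dependencies):
+
+```bash
+# Hypothetical check: flag an A <-> B cycle before spawning more workers
+if bd deps "$TASK_A" | grep -q "$TASK_B" && bd deps "$TASK_B" | grep -q "$TASK_A"; then
+  echo "Deadlock: $TASK_A <-> $TASK_B - escalate"
+fi
+```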
+
 ### Stale Worker (no heartbeat)
 
 ```bash
@@ -463,17 +497,31 @@ command -v review-gate && echo "review-gate available"
 command -v orch && echo "orch available"
 ```
 
+## RPC Event Mapping
+
+When using the `worker-rpc.py` control plane, low-level RPC events map to HQ concepts as follows:
+
+| RPC Event | Field Check | HQ Signal | Meaning |
+|-----------|-------------|-----------|---------|
+| `agent_start` | - | `worker_busy` | Agent has started processing |
+| `agent_end` | - | `worker_idle` | Agent finished its turn (waiting for input) |
+| `tool_execution_start` | `toolName="bash"` | `exec:{command}` | Agent is running a shell command |
+| `tool_execution_start` | `args.command="worker done"` | `signal:done` | Agent signaling completion |
+| `tool_execution_start` | `args.command="worker start"` | `signal:start` | Agent signaling start |
+| `auto_retry_start` | - | `warning:retry` | Transient error (e.g. API overloaded) |
+| `extension_error` | - | `error:extension` | System error in the worker environment |
+
+The `worker-rpc.py` script emits these signals to stderr/logs for the HQ agent to monitor.
+
+## Resolved Questions
+
+1. **WIP limits**: **Hard cap of 3 concurrent workers.** This keeps API costs and system resources in check.
+2. **Definition of Done**: Enforced by the `review-gate` skill (if active) and HQ's final judgment.
+3. **Deadlocks**: HQ does not solve deadlocks. Escalate to a human immediately.
+
 ## Open Questions
 
-HQ-specific questions (other concerns delegated to appropriate layers):
-
-1. **WIP limits** - How many concurrent workers before HQ becomes bottleneck? Likely budget/cost constraint.
-
-2. **Human checkpoints** - When does HQ pause for human vs continue? Currently via stop hooks and review-gate.
-
-3. **Definition of Done** - Per-issue-type checklists? How prescriptive should HQ be?
-
-4. **Dependency scoping** - How to detect cross-cutting work that shouldn't parallelize?
+1. **Cost Tracking**: How to attribute token costs per worker/task? (Currently aggregate only.)
 
 **Delegated to other layers (see respective issues):**
 - Worker launch mechanism → worker CLI (skills-q8i0)

diff --git a/skills/hq/scripts/worker-rpc.py b/skills/hq/scripts/worker-rpc.py
new file mode 100755
index 0000000..c35deb1
--- /dev/null
+++ b/skills/hq/scripts/worker-rpc.py
@@ -0,0 +1,177 @@
+#!/usr/bin/env python3
+"""
+worker-rpc.py - RPC-based worker agent launcher for HQ
+
+This script wraps the 'pi' RPC interface to provide a headless control plane
+for worker agents. It handles:
+1. Launching the agent in the correct worktree
+2. Sending the initial system prompt/task
+3. Streaming events to stdout (for HQ monitoring)
+4. Detecting task completion signals
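+
+Example exchange, one JSON object per line (-> = sent to the agent,
+<- = received; message shapes as emitted by test/mock-agent.py):
+
+    -> {"type": "prompt", "message": "<task prompt>"}
+    <- {"type": "agent_start"}
+    <- {"type": "tool_execution_start", "toolName": "bash", "args": {"command": "worker start"}}
+    <- {"type": "agent_end"}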
+
+Usage:
+    worker-rpc.py --worktree PATH --prompt-file PATH [--model ID]
+"""
+
+import argparse
+import json
+import subprocess
+import sys
+import os
+import shutil
+from datetime import datetime
+
+def log(msg, level="INFO"):
+    timestamp = datetime.now().isoformat()
+    print(f"[{level}:{timestamp}] {msg}", file=sys.stderr)
+
+def main():
+    parser = argparse.ArgumentParser(description="Worker Agent RPC Launcher")
+    parser.add_argument("--worktree", required=True, help="Path to worktree directory")
+    parser.add_argument("--prompt-file", required=True, help="Path to markdown prompt file")
+    parser.add_argument("--model", default="anthropic/claude-3-5-sonnet-20241022", help="Model ID")
+    parser.add_argument("--pi-binary", default="pi", help="Path to pi binary")
+    args = parser.parse_args()
+
+    # Validate inputs
+    if not os.path.exists(args.worktree):
+        log(f"Worktree not found: {args.worktree}", "ERROR")
+        sys.exit(1)
+
+    if not os.path.exists(args.prompt_file):
+        log(f"Prompt file not found: {args.prompt_file}", "ERROR")
+        sys.exit(1)
+
+    # Read prompt
+    with open(args.prompt_file, 'r') as f:
+        prompt_content = f.read()
+
+    # Construct the pi command. We run from the worktree directory so the
+    # agent sees relative paths correctly. --no-session keeps global history
+    # clean; we may want --session-dir later.
+    #
+    # Resolve the executable before we chdir into the worktree: a relative
+    # binary path (or a PATH lookup that depends on the current directory)
+    # would break after Popen changes cwd.
+    pi_bin = args.pi_binary
+
+    if os.sep not in pi_bin:
+        found = shutil.which(pi_bin)
+        if found:
+            pi_bin = found
+        else:
+            # Fallback: let Popen try anyway, but log a warning
+            log(f"Could not resolve '{pi_bin}' in PATH", "WARN")
+    else:
+        # It's a path: resolve it relative to the ORIGINAL cwd before we switch
+        pi_bin = os.path.abspath(pi_bin)
+
+    cmd = [
+        pi_bin,
+        "--mode", "rpc",
+        "--model", args.model,
+        "--no-session"
+    ]
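+    # NOTE: assumes the 'pi' CLI accepts these flags; --mode rpc should switch
+    # stdin/stdout to the line-delimited JSON protocol sketched in the docstring.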
+
+    log(f"Starting agent in {args.worktree}...")
+
+    try:
+        process = subprocess.Popen(
+            cmd,
+            cwd=args.worktree,
+            stdin=subprocess.PIPE,
+            stdout=subprocess.PIPE,
+            stderr=sys.stderr,  # Pass stderr through
+            text=True,
+            bufsize=1,  # Line buffered
+            env=os.environ.copy()  # Explicitly pass environment
+        )
+    except FileNotFoundError:
+        log(f"Command not found: {args.pi_binary}", "ERROR")
+        sys.exit(1)
+
+    # Send initial prompt
+    log("Sending initial prompt...")
+    request = {
+        "type": "prompt",
+        "message": prompt_content
+    }
+
+    try:
+        process.stdin.write(json.dumps(request) + "\n")
+        process.stdin.flush()
+    except BrokenPipeError:
+        log("Agent process exited unexpectedly", "ERROR")
+        sys.exit(1)
+
+    # Event loop
+    try:
+        while True:
+            line = process.stdout.readline()
+            if not line:
+                break
+
+            try:
+                event = json.loads(line)
+            except json.JSONDecodeError:
+                # Only warn if the line is not empty (sometimes we get trailing newlines)
+                if line.strip():
+                    log(f"Invalid JSON: {line.strip()}", "WARN")
+                continue
+
+            # Process events
+            event_type = event.get("type")
+
+            if event_type == "message_update":
+                # Handle streaming text
+                delta = event.get("assistantMessageEvent", {})
+                if delta.get("type") == "text_delta":
+                    # Just print text deltas to stdout for now (human readable)
+                    print(delta.get("delta", ""), end="", flush=True)
+
+            elif event_type == "tool_execution_start":
+                tool_name = event.get("toolName")
+                tool_args = event.get("args") or {}  # may be absent for some tools
+                log(f"Tool Start: {tool_name} {tool_args}")
+
+                # Check for worker lifecycle commands
+                if tool_name == "bash":
+                    cmd_str = tool_args.get("command", "").strip()
+                    if cmd_str.startswith("worker done"):
+                        log("DETECTED: Worker signaling completion")
+                    elif cmd_str.startswith("worker start"):
+                        log("DETECTED: Worker signaling start")
+
+            elif event_type == "tool_execution_end":
+                log(f"Tool End: {event.get('toolName')}")
+
+            elif event_type == "agent_end":
+                log("Agent finished turn")
+                # In a real loop, we might wait for more input or exit.
+                # For this prototype, we exit when the agent is done.
+                break
+
+            elif event_type == "response":
+                if not event.get("success"):
+                    log(f"Command failed: {event.get('error')}", "ERROR")
+
+    except KeyboardInterrupt:
+        log("Interrupted by user, stopping agent...")
+        try:
+            process.stdin.write(json.dumps({"type": "abort"}) + "\n")
+            process.stdin.flush()
+            process.wait(timeout=5)
+        except (BrokenPipeError, subprocess.TimeoutExpired):
+            pass
+
+    log("Exiting.")
+    if process.poll() is None:
+        process.terminate()
+
+if __name__ == "__main__":
+    main()

diff --git a/skills/hq/test/mock-agent.py b/skills/hq/test/mock-agent.py
new file mode 100755
index 0000000..416dc94
--- /dev/null
+++ b/skills/hq/test/mock-agent.py
@@ -0,0 +1,88 @@
+#!/usr/bin/env python3
+"""
+mock-agent.py
+Simulates a 'pi' agent in RPC mode for testing HQ orchestration.
+It reads RPC commands from stdin and replies with canned sequences.
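+
+Manual smoke test (prints the canned event stream):
+    echo '{"type": "prompt", "message": "hi"}' | python3 mock-agent.py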
+"""
+
+import sys
+import json
+import time
+
+def send(data):
+    print(json.dumps(data), flush=True)
+
+def main():
+    # Simulate startup delay
+    time.sleep(0.1)
+
+    # Read messages loop
+    for line in sys.stdin:
+        try:
+            cmd = json.loads(line)
+        except json.JSONDecodeError:
+            continue
+
+        cmd_type = cmd.get("type")
+
+        if cmd_type == "prompt":
+            msg = cmd.get("message", "")
+
+            # Send acknowledgement response
+            send({"type": "response", "command": "prompt", "success": True, "id": cmd.get("id")})
+
+            # Emit agent_start
+            send({"type": "agent_start"})
+
+            # Simulate "thinking" with text deltas
+            content = "I will now work on the task.\n"
+            send({
+                "type": "message_update",
+                "assistantMessageEvent": {
+                    "type": "text_delta",
+                    "delta": content
+                }
+            })
+
+            # Simulate 'worker start'
+            send({
+                "type": "tool_execution_start",
+                "toolName": "bash",
+                "args": {"command": "worker start"}
+            })
+            send({
+                "type": "tool_execution_end",
+                "toolName": "bash",
+                "result": {"content": [{"type": "text", "text": "Worker started"}]}
+            })
+
+            # Simulate doing work (file edits)
+            send({
+                "type": "message_update",
+                "assistantMessageEvent": {
+                    "type": "text_delta",
+                    "delta": "Creating the implementation file...\n"
+                }
+            })
+
+            # Simulate 'worker done'
+            send({
+                "type": "tool_execution_start",
+                "toolName": "bash",
+                "args": {"command": "worker done"}
+            })
+            send({
+                "type": "tool_execution_end",
+                "toolName": "bash",
+                "result": {"content": [{"type": "text", "text": "Worker marked done"}]}
+            })
+
+            # Emit agent_end
+            send({"type": "agent_end"})
+
+        elif cmd_type == "abort":
+            send({"type": "response", "command": "abort", "success": True})
+            break
+
+if __name__ == "__main__":
+    main()

diff --git a/skills/hq/test/run-simulation.sh b/skills/hq/test/run-simulation.sh
new file mode 100755
index 0000000..a87d23c
--- /dev/null
+++ b/skills/hq/test/run-simulation.sh
@@ -0,0 +1,92 @@
+#!/usr/bin/env bash
+# run-simulation.sh
+# Tests the HQ worker-rpc.py wrapper using mock-agent.py.
+# Validates that RPC events are correctly translated to HQ signals.
+
+set -euo pipefail
+
+DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)"
+ROOT="$(dirname "$(dirname "$DIR")")"  # skills root
+
+# Set up temp environment
+TEST_DIR=$(mktemp -d)
+trap 'rm -rf "$TEST_DIR"' EXIT
+
+echo "Using test dir: $TEST_DIR"
+cd "$TEST_DIR"
+
+# Create fake worktree structure
+mkdir -p worktrees/task-123
+touch worktrees/task-123/.worker-prompt.md
+
+# Set up the mock agent launcher:
+# a fake 'pi' command on PATH that execs the Python mock.
+mkdir -p bin
+cat > bin/pi <<EOF
+#!/usr/bin/env bash
+exec python3 "$DIR/mock-agent.py" "\$@"
+EOF
+chmod +x bin/pi
+export PATH="$TEST_DIR/bin:$PATH"
+
+OUTPUT_FILE="$TEST_DIR/output.log"
+ERROR_FILE="$TEST_DIR/error.log"
+
+echo "=== Running Simulation ==="
+python3 "$ROOT/hq/scripts/worker-rpc.py" \
+  --worktree worktrees/task-123 \
+  --prompt-file worktrees/task-123/.worker-prompt.md \
+  > "$OUTPUT_FILE" 2> "$ERROR_FILE" || {
+    echo "=== WORKER FAILED ==="
+    echo "--- STDOUT ---"
+    cat "$OUTPUT_FILE"
+    echo "--- STDERR ---"
+    cat "$ERROR_FILE"
+    exit 1
+  }
+
+echo "=== Simulation Complete ==="
+
+# Validation
+FAILED=0
+
+echo "Checking output log..."
+cat "$OUTPUT_FILE"
+if grep -q "I will now work on the task" "$OUTPUT_FILE"; then
+  echo "PASS: Received text stream"
+else
+  echo "FAIL: Missing text stream"
+  FAILED=1
+fi
+
+echo "Checking error log (signals)..."
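+
+# Lifecycle signals are logged by worker-rpc.py to stderr, so the DETECTED
+# markers land in ERROR_FILE rather than OUTPUT_FILE.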
+cat "$ERROR_FILE"
+
+if grep -q "DETECTED: Worker signaling start" "$ERROR_FILE"; then
+  echo "PASS: Detected 'worker start'"
+else
+  echo "FAIL: Missed 'worker start'"
+  FAILED=1
+fi
+
+if grep -q "DETECTED: Worker signaling completion" "$ERROR_FILE"; then
+  echo "PASS: Detected 'worker done'"
+else
+  echo "FAIL: Missed 'worker done'"
+  FAILED=1
+fi
+
+exit $FAILED

diff --git a/skills/web-research/SKILL.md b/skills/web-research/SKILL.md
index 65a5345..e4ea68e 100644
--- a/skills/web-research/SKILL.md
+++ b/skills/web-research/SKILL.md
@@ -49,3 +49,7 @@ RESEARCH_BACKEND=llm ./scripts/research.sh "current best practices for React sta
 - `claude` CLI tool must be installed and in the PATH.
 - `llm` CLI tool (optional) for using the `llm` backend.
 - `KAGI_API_KEY` environment variable OR `/run/secrets/api_keys/kagi` (for `kagi` backend).
+
+## Fallbacks
+
+If Claude web tools are unavailable or quota-limited, use the `brave-search` skill to gather sources first, then paste the results into your report.
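+
+A sketch of the fallback flow (the script path below is an assumption; check the `brave-search` skill's own SKILL.md for its real entry point):
+
+```bash
+# Assumed entry point -- adjust to the skill's actual script name
+skills/brave-search/scripts/search.sh "react state management best practices" > sources.md
+```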