skills/src/worker.nim
dan 48ec6cde83 fix(worker): address code review findings
- Fix rollback to handle partial branch creation (skills-yylq)
  - Pre-compute branch/worktree names before createWorktree
  - Use gitBranchExists() and dirExists() for robust cleanup
- Add step context to error messages (skills-ux6h)
  - Track currentStep through spawn process
  - Error now shows which step failed
- Deduplicate success output block (skills-qjln)
  - Consolidated to single block with conditional review line
- Simplify use-skills.sh auth symlink (skills-475o)
  - One-liner with || instead of nested if
- Fix inconsistent default branch in git.nim (skills-fext)
  - Changed default from "origin/integration" to "main"

Closes skills-yylq, skills-ux6h, skills-qjln, skills-475o, skills-fext

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
2026-01-15 09:37:37 -08:00

452 lines
14 KiB
Nim

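## Worker CLI - Multi-agent coordination tool
##
## Usage:
##   worker spawn <task-id>            Create a new worker workspace
##   worker status                     Show dashboard of all workers
##   worker start                      Signal ASSIGNED → WORKING
##   worker done                       Signal WORKING → IN_REVIEW
##   worker approve <task-id>          Approve completed work
##   worker request-changes <task-id>  Request revisions (IN_REVIEW → WORKING)
##   worker merge <task-id>            Merge approved work
##   worker cancel <task-id>           Abort a worker
##   worker fail <reason>              Signal WORKING → FAILED
##   worker heartbeat                  Emit a single heartbeat
##   worker show <task-id>             Show detailed view of a worker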
import std/[os, strformat, json, times, terminal, strutils, sequtils]
import cligen
import tiny_sqlite
import worker/[types, db, state, git, context, heartbeat, utils, review]
# -----------------------------------------------------------------------------
# Orchestrator Commands
# -----------------------------------------------------------------------------
proc spawn(taskId: string, description: string = "",
           fromBranch: string = "main", noFetch: bool = false) =
  ## Create a new worker workspace.
  ##
  ## Steps, in order: create the git worktree, write the worker context
  ## file, register the worker in the bus database, enable the review gate.
  ## If a worker for `taskId` is already registered, its details are printed
  ## and the process exits with `ExitSuccess`. On any `CatchableError` the
  ## failing step is reported, the worktree and branch are rolled back and
  ## the process exits with code 1.
  discard validateTaskId(taskId)

  let db = openBusDb()
  defer: db.close()

  # Spawning an already-registered task only reports the existing worker.
  let prior = db.getWorker(taskId)
  if prior.isSome:
    let found = prior.get
    echo "Worker already exists: ", taskId
    echo " State: ", found.state
    echo " Worktree: ", found.worktree
    quit(ExitSuccess)

  # Names are derived up front so the rollback below can still locate
  # leftovers when createWorktree fails part-way through.
  let
    branch = branchName(taskId)
    worktree = worktreePath(taskId)
  var
    branchCreated = false
    worktreeCreated = false
    step = "initializing" # reported in the error message on failure

  try:
    step = "creating git worktree"
    discard createWorktree(taskId, fromBranch, noFetch)
    branchCreated = true
    worktreeCreated = true

    step = "creating context file"
    discard createWorkerContext(taskId, branch, worktree, description)

    step = "registering in database"
    discard db.createWorker(taskId, branch, worktree, description)

    # enableReview reports whether review-gate integration is available.
    let reviewLine =
      if enableReview(taskId):
        " Review: enabled"
      else:
        " Review: (review-gate not available - install 'review-gate' for review integration)"

    echo "Created worker: ", taskId
    echo " Branch: ", branch
    echo " Worktree: ", worktree
    echo " State: ASSIGNED"
    echo reviewLine
  except CatchableError as err:
    echo "Error spawning worker (", step, "): ", err.msg
    # Undo in reverse order of creation: worktree first, then branch.
    # The filesystem/git probes catch partially created artifacts.
    if worktreeCreated or dirExists(worktree):
      echo "Rolling back worktree..."
      removeWorktree(taskId)
    if branchCreated or gitBranchExists(branch):
      echo "Rolling back branch..."
      removeBranch(taskId, remote = false)
    quit(1)
# Status table column widths (in characters), used by the status table
# renderer when left-aligning each cell.
const
  ColTask = 14       # task id column
  ColState = 12      # worker state column
  ColReview = 10     # review status column
  ColAge = 8         # age-since-creation column
  ColHeartbeat = 12  # last-heartbeat column
  ColStatus = 8      # staleness column
  ColSummaryMax = 30 # descriptions longer than this are cut and suffixed "..."
proc renderStatusTable(stateFilter: string, staleOnly: bool, asJson: bool) =
  ## Render worker status as a table or as JSON.
  ##
  ## * `stateFilter` - keep only workers whose state name equals this value
  ##   (compared upper-cased); an empty string disables the filter.
  ## * `staleOnly` - keep only workers flagged stale.
  ## * `asJson` - emit a pretty-printed JSON array instead of the table.
  ##
  ## Both filters are applied before rendering, so the JSON and table
  ## outputs always describe the same set of workers. With no matching
  ## workers the JSON output is an empty array and the table output is the
  ## line "No workers found.".
  let db = openBusDb()
  defer: db.close()

  # Filter first so every output format honours the same selection.
  var shown = db.getAllWorkers()
  if stateFilter != "":
    let wanted = stateFilter.toUpperAscii
    shown = shown.filterIt($it.state == wanted)
  if staleOnly:
    shown = shown.filterIt(it.isStale)

  if asJson:
    var arr = newJArray()
    for w in shown:
      let rs = getReviewState(w.taskId)
      arr.add(%*{
        "task_id": w.taskId,
        "state": $w.state,
        "branch": w.branch,
        "stale": w.staleLevel,
        "age": $(getTime() - w.createdAt),
        "review": $rs.status
      })
    echo arr.pretty
    return

  if shown.len == 0:
    echo "No workers found."
    return

  # Table header
  echo ""
  echo "TASK".alignLeft(ColTask), "STATE".alignLeft(ColState),
       "REVIEW".alignLeft(ColReview), "AGE".alignLeft(ColAge),
       "HEARTBEAT".alignLeft(ColHeartbeat), "STATUS".alignLeft(ColStatus),
       "SUMMARY"
  echo "-".repeat(80)

  for w in shown:
    # Age is shown in the coarsest unit that is non-zero.
    let age = getTime() - w.createdAt
    let ageStr =
      if age.inHours > 0: &"{age.inHours}h"
      elif age.inMinutes > 0: &"{age.inMinutes}m"
      else: &"{age.inSeconds}s"

    # A default-initialised Time means no heartbeat has been recorded.
    var hbStr = "--"
    if w.lastHeartbeat != Time():
      let hbAge = getTime() - w.lastHeartbeat
      hbStr =
        if hbAge.inMinutes > 0: &"{hbAge.inMinutes}m ago"
        else: &"{hbAge.inSeconds}s ago"

    let rs = getReviewState(w.taskId)
    let reviewStr =
      case rs.status
      of rsNone: "--"
      of rsPending: "pending"
      of rsApproved: "approved"
      of rsRejected: "REJECTED"

    let staleStr = w.staleLevel

    # Long descriptions are cut to ColSummaryMax bytes for the summary column.
    let summary =
      if w.description.len > ColSummaryMax:
        w.description[0..<ColSummaryMax] & "..."
      else:
        w.description

    echo w.taskId.alignLeft(ColTask),
         ($w.state).alignLeft(ColState),
         reviewStr.alignLeft(ColReview),
         ageStr.alignLeft(ColAge),
         hbStr.alignLeft(ColHeartbeat),
         staleStr.alignLeft(ColStatus),
         summary
proc status(state: string = "", stale: bool = false,
            json: bool = false, watch: bool = false) =
  ## Show dashboard of all workers.
  ##
  ## Without `watch` the status is rendered once. With `watch` the screen is
  ## cleared and the status re-rendered every 2 seconds until the process is
  ## interrupted. `state`, `stale` and `json` are passed through to
  ## `renderStatusTable`.
  if not watch:
    renderStatusTable(state, stale, json)
    return

  # Watch mode: redraw from the top-left corner forever.
  while true:
    eraseScreen()
    setCursorPos(0, 0)
    let clock = now().format("HH:mm:ss")
    echo "Worker Status (", clock, ") - Press Ctrl+C to exit"
    renderStatusTable(state, stale, json)
    sleep(2000)
proc approve(taskId: string) =
  ## Approve completed work (IN_REVIEW → APPROVED).
  ##
  ## The worker's existence and current state are checked before the
  ## review-gate state is touched, so an invalid request never leaves the
  ## review marked approved for a task that was not actually approved.
  ## Exits with `ExitNotFound` when the worker is unknown and with
  ## `ExitInvalidTransition` when it is not in review or the transition is
  ## rejected.
  discard validateTaskId(taskId)

  let db = openBusDb()
  defer: db.close()

  # Validate before mutating any review-gate state.
  let current = db.getState(taskId)
  if current.isNone:
    echo "Worker not found: ", taskId
    quit(ExitNotFound)
  if current.get != wsInReview:
    echo "Cannot approve: worker must be IN_REVIEW, got: ", current.get
    quit(ExitInvalidTransition)

  # Update review-gate state
  discard approveReview(taskId)

  # The handlers below still cover a state change that happens between the
  # check above and the transition.
  try:
    db.transition(taskId, wsInReview, wsApproved)
    echo "Approved: ", taskId
  except WorkerNotFound:
    echo "Worker not found: ", taskId
    quit(ExitNotFound)
  except InvalidTransition as e:
    echo "Cannot approve: ", e.msg
    quit(ExitInvalidTransition)
  except StaleState as e:
    echo "Cannot approve: ", e.msg
    quit(ExitInvalidTransition)
proc requestChanges(taskId: string, comment: string = "") =
  ## Request revisions (IN_REVIEW → WORKING).
  ##
  ## The worker's existence and current state are checked before the
  ## review-gate state is touched, so an invalid request never records a
  ## rejection for a task that was not actually in review. `comment` is
  ## passed to the review gate and echoed when non-empty.
  ## Exits with `ExitNotFound` when the worker is unknown and with
  ## `ExitInvalidTransition` when it is not in review or the transition is
  ## rejected.
  discard validateTaskId(taskId)

  let db = openBusDb()
  defer: db.close()

  # Validate before mutating any review-gate state.
  let current = db.getState(taskId)
  if current.isNone:
    echo "Worker not found: ", taskId
    quit(ExitNotFound)
  if current.get != wsInReview:
    echo "Cannot request changes: worker must be IN_REVIEW, got: ", current.get
    quit(ExitInvalidTransition)

  # Update review-gate state
  discard rejectReview(taskId, comment)

  # The handlers below still cover a state change that happens between the
  # check above and the transition.
  try:
    db.transition(taskId, wsInReview, wsWorking)
    echo "Changes requested: ", taskId
    if comment != "":
      echo " Comment: ", comment
  except WorkerNotFound:
    echo "Worker not found: ", taskId
    quit(ExitNotFound)
  except InvalidTransition as e:
    echo "Cannot request changes: ", e.msg
    quit(ExitInvalidTransition)
  except StaleState as e:
    echo "Cannot request changes: ", e.msg
    quit(ExitInvalidTransition)
proc merge(taskId: string, deleteBranch: bool = false) =
  ## Merge approved work (APPROVED → COMPLETED).
  ##
  ## The worker must be in the APPROVED state. If the merge fails, the worker
  ## is moved back to WORKING and the process exits with `ExitConflict`.
  ## On success the worker becomes COMPLETED, its review state is cleaned,
  ## the worktree is removed and, when `deleteBranch` is set, the branch is
  ## deleted as well.
  discard validateTaskId(taskId)

  let db = openBusDb()
  defer: db.close()

  # Verify state
  let st = db.getState(taskId)
  if st.isNone:
    echo "Worker not found: ", taskId
    quit(ExitNotFound)
  if st.get != wsApproved:
    echo "Worker must be APPROVED to merge, got: ", st.get
    quit(ExitInvalidTransition)

  # Attempt merge
  let ok = mergeToIntegration(taskId)
  if not ok:
    echo "Merge conflict - returning to WORKING state"
    db.transition(taskId, wsApproved, wsWorking)
    quit(ExitConflict)

  # Success
  db.transition(taskId, wsApproved, wsCompleted)
  cleanReviewState(taskId)

  # Remove the worktree before the branch: git refuses to delete a branch
  # that is still checked out in a worktree. This matches the order used by
  # the spawn rollback.
  removeWorktree(taskId)
  if deleteBranch:
    removeBranch(taskId)

  echo "Merged: ", taskId
proc cancel(taskId: string, reason: string = "", cleanup: bool = false) =
  ## Abort a worker (* → FAILED).
  ##
  ## Marks the worker as failed with `reason`, cleans its review state and,
  ## when `cleanup` is set, removes its worktree. Transition errors are
  ## reported the same way as in `fail`: `ExitNotFound` for an unknown
  ## worker, `ExitInvalidTransition` for a rejected or stale transition.
  ## Nothing is cleaned up when the transition fails.
  discard validateTaskId(taskId)

  let db = openBusDb()
  defer: db.close()

  try:
    db.transitionToFailed(taskId, reason)
  except WorkerNotFound:
    echo "Worker not found: ", taskId
    quit(ExitNotFound)
  except InvalidTransition as e:
    echo "Cannot cancel: ", e.msg
    quit(ExitInvalidTransition)
  except StaleState as e:
    echo "Cannot cancel: ", e.msg
    quit(ExitInvalidTransition)

  cleanReviewState(taskId)
  echo "Cancelled: ", taskId

  if cleanup:
    removeWorktree(taskId)
    echo " Worktree removed"
# -----------------------------------------------------------------------------
# Agent Commands (run from worktree)
# -----------------------------------------------------------------------------
proc start(task: string = "") =
  ## Signal ASSIGNED → WORKING (run from worktree).
  ##
  ## The task id comes from `task` when given, otherwise from `getTaskId()`.
  ## Exits with `ExitNotFound` for an unknown worker, `ExitSuccess` when the
  ## worker is already WORKING, and `ExitInvalidTransition` when the
  ## transition is rejected.
  let taskId =
    if task == "": getTaskId()
    else: validateTaskId(task)

  # Use main repo DB path (works from worktrees)
  let dbPath = getMainRepoBusDbPath()
  let db = openBusDb(dbPath)
  defer: db.close()

  # Inspect the current state before attempting anything.
  let current = db.getState(taskId)
  if current.isNone:
    echo "Worker not found: ", taskId
    quit(ExitNotFound)
  if current.get == wsWorking:
    echo "Already WORKING: ", taskId
    quit(ExitSuccess)

  # The heartbeat is started before the transition so it is already running
  # when the state changes; it is stopped again if the transition fails.
  discard startGlobalHeartbeat(dbPath, taskId)

  try:
    db.transition(taskId, wsAssigned, wsWorking)
    updateGlobalHeartbeat(hsWorking, taskId)
    echo "Started work on ", taskId
  except InvalidTransition, StaleState:
    # Both failures are reported identically.
    stopGlobalHeartbeat()
    echo "Cannot start: ", getCurrentExceptionMsg()
    quit(ExitInvalidTransition)
proc done(skipRebase: bool = false) =
  ## Signal WORKING → IN_REVIEW (includes rebase).
  ##
  ## Allowed from WORKING or CONFLICTED. Unless `skipRebase` is set, the
  ## worktree is rebased on integration first; a failed rebase moves the
  ## worker to CONFLICTED, prints resolution steps and exits with
  ## `ExitConflict`. With `skipRebase`, the call only verifies that no rebase
  ## is still in progress. The branch is then pushed, the worker moved to
  ## IN_REVIEW and the heartbeat stopped.
  let ctx = findContext()

  # Use main repo DB path (works from worktrees)
  let db = openBusDb(getMainRepoBusDbPath())
  defer: db.close()

  let current = db.getState(ctx.taskId)
  if current.isNone:
    echo "Worker not found: ", ctx.taskId
    quit(ExitNotFound)

  # Only WORKING and CONFLICTED workers may be completed.
  let fromState = current.get
  if fromState != wsWorking and fromState != wsConflicted:
    echo "Cannot complete from state: ", fromState
    quit(ExitInvalidTransition)

  if skipRebase:
    # Manual resolution path: make sure the rebase was actually finished.
    if isRebaseInProgress(ctx.worktree):
      echo "ERROR: Rebase still in progress. Run: git rebase --continue"
      quit(ExitConflict)
  elif not rebaseOnIntegration(ctx.worktree):
    # Record the conflict first, then tell the agent how to resolve it.
    db.transition(ctx.taskId, fromState, wsConflicted)
    let conflicted = getConflictedFiles(ctx.worktree)
    echo "ERROR: Rebase conflict detected"
    echo " Conflicting files: ", conflicted.join(", ")
    echo " State: CONFLICTED"
    echo ""
    echo " To resolve:"
    echo " 1. Fix conflicts in worktree"
    echo " 2. git add <files>"
    echo " 3. git rebase --continue"
    echo " 4. worker done --skip-rebase"
    quit(ExitConflict)

  # Push the branch before announcing it as ready for review.
  pushBranch(ctx.worktree, ctx.branch)

  db.transition(ctx.taskId, fromState, wsInReview)
  stopGlobalHeartbeat()
  echo "Ready for review: ", ctx.taskId
proc fail(reason: string) =
  ## Signal WORKING → FAILED.
  ##
  ## Uses the task from the current worker context. Exits with
  ## `ExitNotFound` for an unknown worker and `ExitInvalidTransition` when
  ## the transition is rejected or the state is stale.
  let ctx = findContext()

  # Use main repo DB path (works from worktrees)
  let db = openBusDb(getMainRepoBusDbPath())
  defer: db.close()

  try:
    db.transitionToFailed(ctx.taskId, reason)
    stopGlobalHeartbeat()
    echo "Failed: ", ctx.taskId, " - ", reason
  except WorkerNotFound:
    echo "Worker not found: ", ctx.taskId
    quit(ExitNotFound)
  except InvalidTransition, StaleState:
    # Both failures are reported identically.
    echo "Cannot fail: ", getCurrentExceptionMsg()
    quit(ExitInvalidTransition)
proc sendHeartbeat(status: string = "working", progress: float = 0.0) =
  ## Emit a single heartbeat (normally done by background thread).
  ##
  ## `status` is parsed as a `HeartbeatStatus`; a value that does not parse
  ## falls back to `hsWorking`. The heartbeat is written for the task of the
  ## current worker context, and the status text is echoed as given.
  let ctx = findContext()

  # Use main repo DB path (works from worktrees)
  let db = openBusDb(getMainRepoBusDbPath())
  defer: db.close()

  # The default-taking parseEnum overload yields hsWorking for unknown input.
  let parsed = parseEnum[HeartbeatStatus](status, hsWorking)

  db.writeHeartbeat(ctx.taskId, parsed, ctx.taskId, progress)
  echo "Heartbeat: ", ctx.taskId, " - ", status
proc show(taskId: string) =
  ## Show detailed view of a worker.
  ##
  ## Prints the worker's identity, state, review status, branch, worktree,
  ## timestamps and staleness. Exits with `ExitNotFound` when no worker is
  ## registered for `taskId`.
  const stampFormat = "yyyy-MM-dd HH:mm:ss"

  discard validateTaskId(taskId)

  let db = openBusDb()
  defer: db.close()

  let lookup = db.getWorker(taskId)
  if lookup.isNone:
    echo "Worker not found: ", taskId
    quit(ExitNotFound)

  let worker = lookup.get
  let review = getReviewState(worker.taskId)

  echo ""
  echo "Task: ", worker.taskId
  echo "Description: ", worker.description
  echo "State: ", worker.state
  echo "Review: ", reviewStatusStr(review)
  echo "Branch: ", worker.branch
  echo "Worktree: ", worker.worktree
  echo ""
  echo "Created: ", worker.createdAt.format(stampFormat)
  echo "State Changed: ", worker.stateChangedAt.format(stampFormat)
  # A default-initialised Time means no heartbeat has been recorded.
  if worker.lastHeartbeat != Time():
    echo "Last Heartbeat: ", worker.lastHeartbeat.format(stampFormat)
  echo ""
  echo "Status: ", worker.staleLevel
# -----------------------------------------------------------------------------
# CLI Dispatch
# -----------------------------------------------------------------------------
when isMainModule:
  # Register every subcommand with cligen. Each entry is the proc followed
  # by its per-parameter help text; `cmdName` overrides the subcommand name
  # where it differs from the proc name.
  dispatchMulti(
    [spawn,
     help = {"taskId": "Task identifier",
             "description": "Task description",
             "fromBranch": "Base branch",
             "noFetch": "Skip git fetch"}],
    [status,
     help = {"state": "Filter by state",
             "stale": "Show only stale workers",
             "json": "Output as JSON",
             "watch": "Refresh every 2s"}],
    [start,
     help = {"task": "Task ID (reads from context if empty)"}],
    [done,
     help = {"skipRebase": "Skip rebase (after manual conflict resolution)"}],
    [approve,
     help = {"taskId": "Task to approve"}],
    [requestChanges, cmdName = "request-changes",
     help = {"taskId": "Task to reject",
             "comment": "Feedback for agent"}],
    [merge,
     help = {"taskId": "Task to merge",
             "deleteBranch": "Delete branch after merge"}],
    [cancel,
     help = {"taskId": "Task to cancel",
             "reason": "Cancellation reason",
             "cleanup": "Remove worktree"}],
    [fail,
     help = {"reason": "Failure reason"}],
    [sendHeartbeat, cmdName = "heartbeat",
     help = {"status": "Status (idle, working, blocked)",
             "progress": "Progress 0.0-1.0"}],
    [show,
     help = {"taskId": "Task to show"}]
  )