## Worker state machine
##
## States: ASSIGNED → WORKING → IN_REVIEW → APPROVED → COMPLETED
## Also: CONFLICTED (rebase), FAILED (error)
##
## Key patterns:
## - Compare-and-set with BEGIN IMMEDIATE
## - STALE computed from heartbeat age (not persistent)

import std/[tables, strformat, json, options, times]
import tiny_sqlite
import ./types
import ./db
import ./utils

proc rowToWorkerInfo*(row: ResultRow,
                      heartbeatMs: Option[int64] = none(int64)): WorkerInfo =
  ## Convert a database row to WorkerInfo.
  ##
  ## Row must have, in order: task_id, state, branch, worktree, description,
  ## created_at_ms, state_changed_at_ms.
  ## A NULL `description` column leaves `description` at its default value.
  ## `heartbeatMs`, when present, is converted into `lastHeartbeat`;
  ## otherwise `lastHeartbeat` keeps its default value.
  result = WorkerInfo(
    taskId: row[0].fromDbValue(string),
    state: parseState(row[1].fromDbValue(string)),
    branch: row[2].fromDbValue(string),
    worktree: row[3].fromDbValue(string),
    createdAt: msToTime(row[5].fromDbValue(int64)),
    stateChangedAt: msToTime(row[6].fromDbValue(int64)),
  )
  if row[4].kind != sqliteNull:
    result.description = row[4].fromDbValue(string)
  if heartbeatMs.isSome:
    result.lastHeartbeat = msToTime(heartbeatMs.get)

const ValidTransitions* = {
  wsAssigned: @[wsWorking, wsFailed],
  wsWorking: @[wsInReview, wsConflicted, wsFailed],
  wsConflicted: @[wsInReview, wsWorking, wsFailed],
  wsInReview: @[wsApproved, wsWorking, wsFailed],
  wsApproved: @[wsCompleted, wsWorking, wsFailed],
  wsCompleted: newSeq[WorkerState](), # Empty - no transitions out
  wsFailed: @[wsAssigned],            # Can retry
}.toTable
  ## Allowed target states for each source state.

proc canTransition*(fromState, toState: WorkerState): bool =
  ## Check if the move `fromState → toState` is listed in ValidTransitions.
  ## Returns false when `fromState` has no entry in the table.
  if fromState notin ValidTransitions:
    return false
  return toState in ValidTransitions[fromState]

proc transition*(db: DbConn, taskId: string,
                 fromState, toState: WorkerState) =
  ## Attempt state transition with compare-and-set guard.
  ##
  ## The move is first validated against `ValidTransitions`. Then, inside
  ## `db.withTransaction`, the worker's stored state is re-read. It must
  ## still equal `fromState` before `state` and `state_changed_at_ms` are
  ## updated. A `worker` / `state_change` message is then published in the
  ## same transaction.
  ##
  ## Raises:
  ## - `InvalidTransition` if the move is not allowed by `ValidTransitions`.
  ## - `WorkerNotFound` if there is no row for `taskId`.
  ## - `StaleState` if the stored state is no longer `fromState`.
  if not canTransition(fromState, toState):
    raise newException(InvalidTransition,
      &"Invalid transition: {fromState} → {toState}")

  db.withTransaction:
    let row = db.one(
      "SELECT state FROM workers WHERE task_id = ?",
      taskId
    )
    if row.isNone:
      raise newException(WorkerNotFound, &"Worker not found: {taskId}")

    let currentStr = row.get[0].fromDbValue(string)
    let current = parseState(currentStr)
    # Compare-and-set: another writer may have moved the worker since the
    # caller last observed `fromState`.
    if current != fromState:
      raise newException(StaleState, &"Expected {fromState}, got {current}")

    let tsMs = epochMs()
    db.exec("""
      UPDATE workers
      SET state = ?, state_changed_at_ms = ?
      WHERE task_id = ?
    """, $toState, tsMs, taskId)

    # Publish state change message
    discard db.publish("worker", "state_change",
      correlationId = taskId,
      payload = %*{
        "from": $fromState,
        "to": $toState,
        "task_id": taskId
      })

proc transitionToFailed*(db: DbConn, taskId: string, reason: string = "") =
  ## Transition any state except COMPLETED to FAILED.
  ##
  ## This bypasses `ValidTransitions`. It runs inside `db.withTransaction`
  ## and publishes a `worker` / `task_failed` message carrying the previous
  ## state and `reason`.
  ## NOTE(review): a worker that is already FAILED is accepted again. Its
  ## `state_changed_at_ms` is refreshed and another `task_failed` message is
  ## published. Confirm this is intended.
  ##
  ## Raises:
  ## - `WorkerNotFound` if there is no row for `taskId`.
  ## - `InvalidTransition` if the worker is COMPLETED.
  db.withTransaction:
    let row = db.one(
      "SELECT state FROM workers WHERE task_id = ?",
      taskId
    )
    if row.isNone:
      raise newException(WorkerNotFound, &"Worker not found: {taskId}")

    let currentStr = row.get[0].fromDbValue(string)
    let current = parseState(currentStr)
    if current == wsCompleted:
      raise newException(InvalidTransition, "Cannot fail a completed worker")

    let tsMs = epochMs()
    db.exec("""
      UPDATE workers
      SET state = ?, state_changed_at_ms = ?
      WHERE task_id = ?
    """, $wsFailed, tsMs, taskId)

    discard db.publish("worker", "task_failed",
      correlationId = taskId,
      payload = %*{
        "from": $current,
        "task_id": taskId,
        "reason": reason
      })

proc getState*(db: DbConn, taskId: string): Option[WorkerState] =
  ## Get the current state for a worker, or `none` if no row exists.
  let row = db.one(
    "SELECT state FROM workers WHERE task_id = ?",
    taskId
  )
  if row.isSome:
    return some(parseState(row.get[0].fromDbValue(string)))
  return none(WorkerState)

proc getWorker*(db: DbConn, taskId: string): Option[WorkerInfo] =
  ## Get full worker information, including the last heartbeat timestamp
  ## returned by `getHeartbeat(taskId)` if one exists.
  ## Returns `none` when no worker row exists for `taskId`.
  let row = db.one("""
    SELECT task_id, state, branch, worktree, description,
           created_at_ms, state_changed_at_ms
    FROM workers WHERE task_id = ?
  """, taskId)

  if row.isNone:
    return none(WorkerInfo)

  let hb = db.getHeartbeat(taskId)
  let hbMs = if hb.isSome: some(hb.get.tsMs) else: none(int64)
  return some(rowToWorkerInfo(row.get, hbMs))

proc getAllWorkers*(db: DbConn): seq[WorkerInfo] =
  ## Get all workers with their current state and heartbeat (if any),
  ## most recently changed first.
  ## NOTE(review): this assumes `heartbeats` holds at most one row per
  ## `agent_id`. If not, the LEFT JOIN yields duplicate workers. Confirm
  ## against the schema.
  for row in db.iterate("""
    SELECT w.task_id, w.state, w.branch, w.worktree, w.description,
           w.created_at_ms, w.state_changed_at_ms,
           h.ts_ms as heartbeat_ms
    FROM workers w
    LEFT JOIN heartbeats h ON w.task_id = h.agent_id
    ORDER BY w.state_changed_at_ms DESC
  """):
    let hbMs = optFromDb(row[7], int64)
    result.add(rowToWorkerInfo(row, hbMs))

proc createWorker*(db: DbConn, taskId, branch, worktree: string,
                   description: string = ""): WorkerInfo =
  ## Create a new worker in ASSIGNED state.
  ##
  ## Inserts the row with `created_at_ms` and `state_changed_at_ms` set to
  ## now. It then publishes an `orchestrator` / `task_assign` message and
  ## returns the WorkerInfo that was written.
  ## NOTE(review): unlike `transition`, the INSERT and publish are not
  ## wrapped in `withTransaction`. If publish fails, the row remains without
  ## a message. Confirm whether callers wrap this in a transaction.
  let tsMs = epochMs()
  db.exec("""
    INSERT INTO workers (task_id, state, branch, worktree, description,
                         created_at_ms, state_changed_at_ms)
    VALUES (?, ?, ?, ?, ?, ?, ?)
  """, taskId, $wsAssigned, branch, worktree, optStr(description), tsMs, tsMs)

  discard db.publish("orchestrator", "task_assign",
    correlationId = taskId,
    payload = %*{
      "task_id": taskId,
      "branch": branch,
      "worktree": worktree,
      "description": description
    })

  return WorkerInfo(
    taskId: taskId,
    state: wsAssigned,
    branch: branch,
    worktree: worktree,
    description: description,
    createdAt: msToTime(tsMs),
    stateChangedAt: msToTime(tsMs),
  )

proc computeStaleLevel*(info: WorkerInfo): StaleLevel =
  ## Compute staleness level based on heartbeat age.
  ##
  ## Only ASSIGNED and WORKING states can be stale; every other state
  ## reports `slOk`. A worker with no heartbeat (default `Time()`) is
  ## `slDead`. Otherwise, the heartbeat age in milliseconds is compared
  ## against `DeadThresholdMs`, `StaleThresholdMs` and
  ## `StaleWarnThresholdMs`. The comparison is strictly greater-than, most
  ## severe first.
  if info.state notin {wsAssigned, wsWorking}:
    return slOk
  if info.lastHeartbeat == Time():
    return slDead # No heartbeat yet

  # Convert to epoch milliseconds while keeping the sub-second part.
  # `toUnix` alone truncates to whole seconds, which would overstate the
  # age by up to 999 ms and push workers across thresholds early.
  let hb = info.lastHeartbeat
  let hbMs = hb.toUnix * 1000 + int64(hb.nanosecond div 1_000_000)
  let age = epochMs() - hbMs
  if age > DeadThresholdMs: slDead
  elif age > StaleThresholdMs: slStale
  elif age > StaleWarnThresholdMs: slWarn
  else: slOk

proc isStale*(info: WorkerInfo): bool =
  ## Check if worker is stale (STALE or DEAD level).
  info.computeStaleLevel in {slStale, slDead}

proc staleLevel*(info: WorkerInfo): string =
  ## Get stale level as string: ok, WARN, STALE, DEAD
  $info.computeStaleLevel