skills/src/worker/state.nim
dan 3490f28682 refactor: consolidate stale logic, use transaction helper, add migrations
- Add StaleLevel enum (ok, WARN, STALE, DEAD) to types.nim
- Extract computeStaleLevel() as single source of truth
- Simplify isStale() and staleLevel() to use computeStaleLevel()
- Refactor transition() and transitionToFailed() to use withTransaction
- Add schema migration infrastructure:
  - CurrentSchemaVersion constant
  - Migrations sequence for incremental upgrades
  - getSchemaVersion() and runMigrations() procs
  - initSchema() now runs pending migrations

Closes: skills-dtk, skills-8fd, skills-9ny

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
2026-01-11 15:37:55 -08:00

212 lines
6.4 KiB
Nim

## Worker state machine
##
## States: ASSIGNED → WORKING → IN_REVIEW → APPROVED → COMPLETED
## Also: CONFLICTED (rebase), FAILED (error)
##
## Key patterns:
## - Compare-and-set with BEGIN IMMEDIATE
## - STALE is computed on demand from heartbeat age (it is never persisted)
import std/[tables, strformat, json, options, times]
import tiny_sqlite
import ./types
import ./db
import ./utils
proc rowToWorkerInfo*(row: ResultRow,
                      heartbeatMs: Option[int64] = none(int64)): WorkerInfo =
  ## Build a `WorkerInfo` from a row of the `workers` table.
  ##
  ## Columns are read by position and must be, in order:
  ## task_id, state, branch, worktree, description, created_at_ms,
  ## state_changed_at_ms.
  ##
  ## `description` keeps its default value when column 4 is NULL, and
  ## `lastHeartbeat` is only filled in when `heartbeatMs` holds a value.
  let
    taskId = row[0].fromDbValue(string)
    state = parseState(row[1].fromDbValue(string))
    branch = row[2].fromDbValue(string)
    worktree = row[3].fromDbValue(string)
    createdAt = msToTime(row[5].fromDbValue(int64))
    stateChangedAt = msToTime(row[6].fromDbValue(int64))

  result = WorkerInfo(
    taskId: taskId,
    state: state,
    branch: branch,
    worktree: worktree,
    createdAt: createdAt,
    stateChangedAt: stateChangedAt,
  )

  # The description column is nullable; only copy it over when present.
  let descriptionCol = row[4]
  if descriptionCol.kind != sqliteNull:
    result.description = descriptionCol.fromDbValue(string)

  if heartbeatMs.isSome:
    result.lastHeartbeat = msToTime(heartbeatMs.get)
# Legal edges of the worker state machine: each key is a current state and
# its value lists the states that may directly follow it.
const ValidTransitions* = toTable({
  wsAssigned: @[wsWorking, wsFailed],
  wsWorking: @[wsInReview, wsConflicted, wsFailed],
  wsConflicted: @[wsInReview, wsWorking, wsFailed],
  wsInReview: @[wsApproved, wsWorking, wsFailed],
  wsApproved: @[wsCompleted, wsWorking, wsFailed],
  wsCompleted: newSeq[WorkerState](),  # terminal: nothing follows COMPLETED
  wsFailed: @[wsAssigned],             # a failed worker may be retried
})
proc canTransition*(fromState, toState: WorkerState): bool =
  ## Report whether `ValidTransitions` lists `toState` as a direct successor
  ## of `fromState`. A `fromState` with no table entry permits nothing.
  (fromState in ValidTransitions) and
    (toState in ValidTransitions[fromState])
proc transition*(db: DbConn, taskId: string,
                 fromState, toState: WorkerState) =
  ## Move worker `taskId` from `fromState` to `toState`, guarded by a
  ## compare-and-set on the stored state.
  ##
  ## Raises:
  ## - `InvalidTransition` when `canTransition(fromState, toState)` is false
  ##   (checked before the database is touched).
  ## - `WorkerNotFound` when no row exists for `taskId`.
  ## - `StaleState` when the stored state is not `fromState`.
  ##
  ## On success the row's `state` and `state_changed_at_ms` are updated and a
  ## "state_change" message is published on the "worker" channel.
  if not canTransition(fromState, toState):
    raise newException(InvalidTransition, &"Invalid transition: {fromState} → {toState}")

  db.withTransaction:
    let found = db.one("SELECT state FROM workers WHERE task_id = ?", taskId)
    if found.isNone:
      raise newException(WorkerNotFound, &"Worker not found: {taskId}")

    # Compare step: the caller's view of the state must match what is stored.
    let stored = parseState(found.get[0].fromDbValue(string))
    if stored != fromState:
      raise newException(StaleState, &"Expected {fromState}, got {stored}")

    # Set step.
    let changedAtMs = epochMs()
    db.exec("""
      UPDATE workers
      SET state = ?, state_changed_at_ms = ?
      WHERE task_id = ?
    """, $toState, changedAtMs, taskId)

    # Announce the change from within the transaction body.
    let message = %*{
      "from": $fromState,
      "to": $toState,
      "task_id": taskId
    }
    discard db.publish("worker", "state_change",
      correlationId = taskId,
      payload = message)
proc transitionToFailed*(db: DbConn, taskId: string, reason: string = "") =
  ## Force worker `taskId` into FAILED from whatever state it is in, except
  ## COMPLETED. `ValidTransitions` is not consulted here.
  ##
  ## Raises:
  ## - `WorkerNotFound` when no row exists for `taskId`.
  ## - `InvalidTransition` when the worker is already COMPLETED.
  ##
  ## On success the row's `state` and `state_changed_at_ms` are updated and a
  ## "task_failed" message carrying the previous state and `reason` is
  ## published on the "worker" channel.
  db.withTransaction:
    let found = db.one("SELECT state FROM workers WHERE task_id = ?", taskId)
    if found.isNone:
      raise newException(WorkerNotFound, &"Worker not found: {taskId}")

    let previous = parseState(found.get[0].fromDbValue(string))
    if previous == wsCompleted:
      raise newException(InvalidTransition, "Cannot fail a completed worker")

    let changedAtMs = epochMs()
    db.exec("""
      UPDATE workers
      SET state = ?, state_changed_at_ms = ?
      WHERE task_id = ?
    """, $wsFailed, changedAtMs, taskId)

    # The payload needs the pre-update state, so publish from inside the
    # transaction body where `previous` is in scope.
    let message = %*{
      "from": $previous,
      "task_id": taskId,
      "reason": reason
    }
    discard db.publish("worker", "task_failed",
      correlationId = taskId,
      payload = message)
proc getState*(db: DbConn, taskId: string): Option[WorkerState] =
  ## Look up the stored state of worker `taskId`.
  ## Returns `none` when there is no row for that task id.
  let found = db.one("SELECT state FROM workers WHERE task_id = ?", taskId)
  if found.isNone:
    return none(WorkerState)
  some(parseState(found.get[0].fromDbValue(string)))
proc getWorker*(db: DbConn, taskId: string): Option[WorkerInfo] =
  ## Fetch the full record for worker `taskId`, including its latest
  ## heartbeat timestamp when `getHeartbeat` returns one.
  ## Returns `none` when there is no row for that task id.
  let found = db.one("""
    SELECT task_id, state, branch, worktree, description,
           created_at_ms, state_changed_at_ms
    FROM workers WHERE task_id = ?
  """, taskId)

  if found.isSome:
    # The heartbeat is looked up separately and is optional.
    var heartbeatMs = none(int64)
    let beat = db.getHeartbeat(taskId)
    if beat.isSome:
      heartbeatMs = some(beat.get.tsMs)
    result = some(rowToWorkerInfo(found.get, heartbeatMs))
proc getAllWorkers*(db: DbConn): seq[WorkerInfo] =
  ## List every worker, most recently changed first
  ## (ordered by `state_changed_at_ms` descending).
  ##
  ## Heartbeats are LEFT JOINed on `heartbeats.agent_id = workers.task_id`,
  ## so the heartbeat column (index 7) can be NULL.
  const query = """
    SELECT w.task_id, w.state, w.branch, w.worktree, w.description,
           w.created_at_ms, w.state_changed_at_ms,
           h.ts_ms as heartbeat_ms
    FROM workers w
    LEFT JOIN heartbeats h ON w.task_id = h.agent_id
    ORDER BY w.state_changed_at_ms DESC
  """
  for row in db.iterate(query):
    # presumably optFromDb yields none for a NULL column; defined in a
    # helper module not shown here
    let heartbeatMs = optFromDb(row[7], int64)
    result.add(rowToWorkerInfo(row, heartbeatMs))
proc createWorker*(db: DbConn, taskId, branch, worktree: string,
                   description: string = ""): WorkerInfo =
  ## Insert a new worker row in the ASSIGNED state and announce it.
  ##
  ## `created_at_ms` and `state_changed_at_ms` are both set to the same
  ## `epochMs()` reading. A "task_assign" message is then published on the
  ## "orchestrator" channel, and the matching `WorkerInfo` is returned
  ## (no heartbeat is set on it).
  let nowMs = epochMs()

  # The description is passed through optStr before being stored
  # (presumably mapping "" to NULL; defined in a helper module not shown here).
  db.exec("""
    INSERT INTO workers (task_id, state, branch, worktree, description,
                         created_at_ms, state_changed_at_ms)
    VALUES (?, ?, ?, ?, ?, ?, ?)
  """, taskId, $wsAssigned, branch, worktree,
      optStr(description),
      nowMs, nowMs)

  let message = %*{
    "task_id": taskId,
    "branch": branch,
    "worktree": worktree,
    "description": description
  }
  discard db.publish("orchestrator", "task_assign",
    correlationId = taskId,
    payload = message)

  result = WorkerInfo(
    taskId: taskId,
    state: wsAssigned,
    branch: branch,
    worktree: worktree,
    description: description,
    createdAt: msToTime(nowMs),
    stateChangedAt: msToTime(nowMs),
  )
proc computeStaleLevel*(info: WorkerInfo): StaleLevel =
  ## Classify how stale a worker is from the age of its last heartbeat.
  ##
  ## - Workers in any state other than ASSIGNED or WORKING are always `slOk`.
  ## - A worker whose `lastHeartbeat` is still the default `Time()` (no
  ##   heartbeat recorded) is `slDead`.
  ## - Otherwise the age in milliseconds is compared, with strict `>`, against
  ##   `DeadThresholdMs`, `StaleThresholdMs` and `StaleWarnThresholdMs`, in
  ##   that order.
  if info.state notin {wsAssigned, wsWorking}:
    return slOk

  if info.lastHeartbeat == Time():
    return slDead # No heartbeat yet

  # Keep the sub-second part of the heartbeat. Using `toUnix * 1000` alone
  # truncates to whole seconds and overstates the age by up to 999 ms.
  let heartbeatMs = info.lastHeartbeat.toUnix * 1000 +
    int64(info.lastHeartbeat.nanosecond div 1_000_000)
  let age = epochMs() - heartbeatMs

  if age > DeadThresholdMs:
    slDead
  elif age > StaleThresholdMs:
    slStale
  elif age > StaleWarnThresholdMs:
    slWarn
  else:
    slOk
proc isStale*(info: WorkerInfo): bool =
  ## True when the worker's computed stale level is `slStale` or `slDead`.
  let level = computeStaleLevel(info)
  level == slStale or level == slDead
proc staleLevel*(info: WorkerInfo): string =
  ## String form of the worker's computed stale level, as produced by `$`
  ## on `StaleLevel` (documented in this file as: ok, WARN, STALE, DEAD).
  let level = computeStaleLevel(info)
  result = $level