refactor: consolidate stale logic, use transaction helper, add migrations

- Add StaleLevel enum (ok, WARN, STALE, DEAD) to types.nim
- Extract computeStaleLevel() as single source of truth
- Simplify isStale() and staleLevel() to use computeStaleLevel()
- Refactor transition() and transitionToFailed() to use withTransaction
- Add schema migration infrastructure:
  - CurrentSchemaVersion constant
  - Migrations sequence for incremental upgrades
  - getSchemaVersion() and runMigrations() procs
  - initSchema() now runs pending migrations

Closes: skills-dtk, skills-8fd, skills-9ny

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
This commit is contained in:
dan 2026-01-11 15:37:55 -08:00
parent fb6da27e96
commit 3490f28682
3 changed files with 74 additions and 38 deletions

View file

@ -92,13 +92,56 @@ CREATE TABLE IF NOT EXISTS export_state (
);
"""
const
CurrentSchemaVersion* = 1
# Migrations are applied in order. Each migration upgrades FROM the version
# number to the next version. Add new migrations at the end.
# Format: (fromVersion, sql)
Migrations*: seq[tuple[fromVersion: int, sql: string]] = @[
# Example migration (uncomment when needed):
# (1, "ALTER TABLE workers ADD COLUMN priority INTEGER DEFAULT 2"),
]
proc getSchemaVersion*(db: DbConn): int =
## Get current schema version from database
let row = db.one("SELECT value FROM meta WHERE key = 'schema_version'")
if row.isNone:
return 0
try:
return parseInt(row.get[0].fromDbValue(string))
except ValueError:
return 0
proc runMigrations*(db: DbConn) =
## Apply any pending schema migrations
let currentVersion = db.getSchemaVersion()
for migration in Migrations:
if migration.fromVersion >= currentVersion:
continue # Skip already-applied migrations
# Run migration in a transaction
db.withTransaction:
db.execScript(migration.sql)
let newVersion = migration.fromVersion + 1
db.exec("UPDATE meta SET value = ? WHERE key = 'schema_version'",
$newVersion)
logWarn("runMigrations", "Applied migration " & $migration.fromVersion &
"" & $(migration.fromVersion + 1))
proc initSchema*(db: DbConn) =
## Initialize database schema
## Initialize database schema and run migrations
db.execScript(Schema)
# Insert meta if not exists
db.exec("INSERT OR IGNORE INTO meta (key, value) VALUES ('schema_version', '1')")
db.exec("INSERT OR IGNORE INTO meta (key, value) VALUES ('schema_version', ?)",
$CurrentSchemaVersion)
db.exec("INSERT OR IGNORE INTO export_state (id, last_seq) VALUES (1, 0)")
# Run any pending migrations for existing databases
runMigrations(db)
proc openBusDb*(dbPath: string = BusDbPath): DbConn =
## Open database with required PRAGMAs. One connection per thread.
try:

View file

@ -51,8 +51,7 @@ proc transition*(db: DbConn, taskId: string, fromState, toState: WorkerState) =
if not canTransition(fromState, toState):
raise newException(InvalidTransition, &"Invalid transition: {fromState} → {toState}")
db.exec("BEGIN IMMEDIATE")
try:
db.withTransaction:
let row = db.one(
"SELECT state FROM workers WHERE task_id = ?",
taskId
@ -83,15 +82,9 @@ proc transition*(db: DbConn, taskId: string, fromState, toState: WorkerState) =
"task_id": taskId
})
db.exec("COMMIT")
except CatchableError:
db.exec("ROLLBACK")
raise
proc transitionToFailed*(db: DbConn, taskId: string, reason: string = "") =
## Transition any state to FAILED
db.exec("BEGIN IMMEDIATE")
try:
db.withTransaction:
let row = db.one(
"SELECT state FROM workers WHERE task_id = ?",
taskId
@ -121,11 +114,6 @@ proc transitionToFailed*(db: DbConn, taskId: string, reason: string = "") =
"reason": reason
})
db.exec("COMMIT")
except CatchableError:
db.exec("ROLLBACK")
raise
proc getState*(db: DbConn, taskId: string): Option[WorkerState] =
## Get current state for a worker
let row = db.one(
@ -195,31 +183,29 @@ proc createWorker*(db: DbConn, taskId, branch, worktree: string,
stateChangedAt: msToTime(tsMs),
)
proc isStale*(info: WorkerInfo): bool =
## Check if worker is stale based on heartbeat age
proc computeStaleLevel*(info: WorkerInfo): StaleLevel =
## Compute staleness level based on heartbeat age.
## Only ASSIGNED and WORKING states can be stale.
if info.state notin {wsAssigned, wsWorking}:
return false
return slOk
if info.lastHeartbeat == Time():
return true # No heartbeat yet
return slDead # No heartbeat yet
let age = (epochMs() - toUnix(info.lastHeartbeat) * 1000)
return age > StaleThresholdMs
let age = epochMs() - toUnix(info.lastHeartbeat) * 1000
if age > DeadThresholdMs:
slDead
elif age > StaleThresholdMs:
slStale
elif age > StaleWarnThresholdMs:
slWarn
else:
slOk
proc isStale*(info: WorkerInfo): bool =
## Check if worker is stale (STALE or DEAD level)
info.computeStaleLevel in {slStale, slDead}
proc staleLevel*(info: WorkerInfo): string =
## Get stale level: ok, WARN, STALE, DEAD
if info.state notin {wsAssigned, wsWorking}:
return "ok"
if info.lastHeartbeat == Time():
return "DEAD"
let age = (epochMs() - toUnix(info.lastHeartbeat) * 1000)
if age > DeadThresholdMs:
return "DEAD"
elif age > StaleThresholdMs:
return "STALE"
elif age > StaleWarnThresholdMs:
return "WARN"
else:
return "ok"
## Get stale level as string: ok, WARN, STALE, DEAD
$info.computeStaleLevel

View file

@ -41,6 +41,13 @@ type
hsWorking = "working"
hsBlocked = "blocked"
StaleLevel* = enum
## Staleness level based on heartbeat age
slOk = "ok"
slWarn = "WARN"
slStale = "STALE"
slDead = "DEAD"
# Errors
InvalidTransition* = object of CatchableError
StaleState* = object of CatchableError