From 3490f286827bf38cfa6d37ca2558af684268cc4e Mon Sep 17 00:00:00 2001 From: dan Date: Sun, 11 Jan 2026 15:37:55 -0800 Subject: [PATCH] refactor: consolidate stale logic, use transaction helper, add migrations - Add StaleLevel enum (ok, WARN, STALE, DEAD) to types.nim - Extract computeStaleLevel() as single source of truth - Simplify isStale() and staleLevel() to use computeStaleLevel() - Refactor transition() and transitionToFailed() to use withTransaction - Add schema migration infrastructure: - CurrentSchemaVersion constant - Migrations sequence for incremental upgrades - getSchemaVersion() and runMigrations() procs - initSchema() now runs pending migrations Closes: skills-dtk, skills-8fd, skills-9ny Co-Authored-By: Claude Opus 4.5 --- src/worker/db.nim | 47 +++++++++++++++++++++++++++++++++-- src/worker/state.nim | 58 +++++++++++++++++--------------------------- src/worker/types.nim | 7 ++++++ 3 files changed, 74 insertions(+), 38 deletions(-) diff --git a/src/worker/db.nim b/src/worker/db.nim index c372ba0..fc60182 100644 --- a/src/worker/db.nim +++ b/src/worker/db.nim @@ -92,13 +92,56 @@ CREATE TABLE IF NOT EXISTS export_state ( ); """ +const + CurrentSchemaVersion* = 1 + + # Migrations are applied in order. Each migration upgrades FROM the version + # number to the next version. Add new migrations at the end. 
+  # Format: (fromVersion, sql)
+  Migrations*: seq[tuple[fromVersion: int, sql: string]] = @[
+    # Example migration (uncomment when needed):
+    # (1, "ALTER TABLE workers ADD COLUMN priority INTEGER DEFAULT 2"),
+  ]
+
+proc getSchemaVersion*(db: DbConn): int =
+  ## Stored schema version from `meta`; 0 if the row is missing or not an int
+  let row = db.one("SELECT value FROM meta WHERE key = 'schema_version'")
+  if row.isNone:
+    return 0
+  try:
+    return parseInt(row.get[0].fromDbValue(string))
+  except ValueError:
+    return 0
+
+proc runMigrations*(db: DbConn) =
+  ## Apply pending migrations (fromVersion >= the stored schema version)
+  let currentVersion = db.getSchemaVersion()
+
+  for migration in Migrations:
+    if migration.fromVersion < currentVersion:
+      continue  # Already applied: the DB is past this migration's source
+
+    # Script and version bump share one transaction so they land together
+    db.withTransaction:
+      db.execScript(migration.sql)
+      let newVersion = migration.fromVersion + 1
+      db.exec("UPDATE meta SET value = ? WHERE key = 'schema_version'",
+              $newVersion)
+
+    logWarn("runMigrations", "Applied migration " & $migration.fromVersion &
+            " → " & $(migration.fromVersion + 1))
+
 proc initSchema*(db: DbConn) =
-  ## Initialize database schema
+  ## Initialize database schema and run migrations
   db.execScript(Schema)
   # Insert meta if not exists
-  db.exec("INSERT OR IGNORE INTO meta (key, value) VALUES ('schema_version', '1')")
+  db.exec("INSERT OR IGNORE INTO meta (key, value) VALUES ('schema_version', ?)",
+          $CurrentSchemaVersion)
   db.exec("INSERT OR IGNORE INTO export_state (id, last_seq) VALUES (1, 0)")
 
+  # Run any pending migrations for existing databases
+  runMigrations(db)
+
 proc openBusDb*(dbPath: string = BusDbPath): DbConn =
   ## Open database with required PRAGMAs. One connection per thread.
try: diff --git a/src/worker/state.nim b/src/worker/state.nim index bf954df..4469051 100644 --- a/src/worker/state.nim +++ b/src/worker/state.nim @@ -51,8 +51,7 @@ proc transition*(db: DbConn, taskId: string, fromState, toState: WorkerState) = if not canTransition(fromState, toState): raise newException(InvalidTransition, &"Invalid transition: {fromState} → {toState}") - db.exec("BEGIN IMMEDIATE") - try: + db.withTransaction: let row = db.one( "SELECT state FROM workers WHERE task_id = ?", taskId @@ -83,15 +82,9 @@ proc transition*(db: DbConn, taskId: string, fromState, toState: WorkerState) = "task_id": taskId }) - db.exec("COMMIT") - except CatchableError: - db.exec("ROLLBACK") - raise - proc transitionToFailed*(db: DbConn, taskId: string, reason: string = "") = ## Transition any state to FAILED - db.exec("BEGIN IMMEDIATE") - try: + db.withTransaction: let row = db.one( "SELECT state FROM workers WHERE task_id = ?", taskId @@ -121,11 +114,6 @@ proc transitionToFailed*(db: DbConn, taskId: string, reason: string = "") = "reason": reason }) - db.exec("COMMIT") - except CatchableError: - db.exec("ROLLBACK") - raise - proc getState*(db: DbConn, taskId: string): Option[WorkerState] = ## Get current state for a worker let row = db.one( @@ -195,31 +183,29 @@ proc createWorker*(db: DbConn, taskId, branch, worktree: string, stateChangedAt: msToTime(tsMs), ) -proc isStale*(info: WorkerInfo): bool = - ## Check if worker is stale based on heartbeat age +proc computeStaleLevel*(info: WorkerInfo): StaleLevel = + ## Compute staleness level based on heartbeat age. + ## Only ASSIGNED and WORKING states can be stale. 
if info.state notin {wsAssigned, wsWorking}: - return false + return slOk if info.lastHeartbeat == Time(): - return true # No heartbeat yet + return slDead # No heartbeat yet - let age = (epochMs() - toUnix(info.lastHeartbeat) * 1000) - return age > StaleThresholdMs + let age = epochMs() - toUnix(info.lastHeartbeat) * 1000 + if age > DeadThresholdMs: + slDead + elif age > StaleThresholdMs: + slStale + elif age > StaleWarnThresholdMs: + slWarn + else: + slOk + +proc isStale*(info: WorkerInfo): bool = + ## Check if worker is stale (STALE or DEAD level) + info.computeStaleLevel in {slStale, slDead} proc staleLevel*(info: WorkerInfo): string = - ## Get stale level: ok, WARN, STALE, DEAD - if info.state notin {wsAssigned, wsWorking}: - return "ok" - - if info.lastHeartbeat == Time(): - return "DEAD" - - let age = (epochMs() - toUnix(info.lastHeartbeat) * 1000) - if age > DeadThresholdMs: - return "DEAD" - elif age > StaleThresholdMs: - return "STALE" - elif age > StaleWarnThresholdMs: - return "WARN" - else: - return "ok" + ## Get stale level as string: ok, WARN, STALE, DEAD + $info.computeStaleLevel diff --git a/src/worker/types.nim b/src/worker/types.nim index ebccea5..3b9e71b 100644 --- a/src/worker/types.nim +++ b/src/worker/types.nim @@ -41,6 +41,13 @@ type hsWorking = "working" hsBlocked = "blocked" + StaleLevel* = enum + ## Staleness level based on heartbeat age + slOk = "ok" + slWarn = "WARN" + slStale = "STALE" + slDead = "DEAD" + # Errors InvalidTransition* = object of CatchableError StaleState* = object of CatchableError