refactor: extract common utilities to utils.nim
- Add branchName() and worktreePath() helpers for consistent path generation - Add msToTime() for epoch ms to Time conversion (8 occurrences consolidated) - Add validateTaskId() for CLI input validation (prevents path traversal) - Add optString/optInt64 helpers for nullable DB values - Add withTransaction template for automatic rollback on error Closes: skills-lzh2, skills-3d9o, skills-5x2o, skills-qiq0, skills-73yu, skills-vuj2 Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
This commit is contained in:
parent
5550421ae9
commit
d3d22a91e5
|
|
@ -12,7 +12,7 @@
|
|||
import std/[os, strformat, json, times, terminal, strutils, sequtils]
|
||||
import cligen
|
||||
import tiny_sqlite
|
||||
import worker/[types, db, state, git, context, heartbeat]
|
||||
import worker/[types, db, state, git, context, heartbeat, utils]
|
||||
|
||||
# -----------------------------------------------------------------------------
|
||||
# Orchestrator Commands
|
||||
|
|
@ -21,6 +21,7 @@ import worker/[types, db, state, git, context, heartbeat]
|
|||
proc spawn(taskId: string, description: string = "",
|
||||
fromBranch: string = "origin/integration") =
|
||||
## Create a new worker workspace
|
||||
discard validateTaskId(taskId)
|
||||
let db = openBusDb()
|
||||
defer: db.close()
|
||||
|
||||
|
|
@ -120,6 +121,7 @@ proc status(state: string = "", stale: bool = false,
|
|||
|
||||
proc approve(taskId: string, by: string = "", comment: string = "") =
|
||||
## Approve completed work (IN_REVIEW → APPROVED)
|
||||
discard validateTaskId(taskId)
|
||||
let db = openBusDb()
|
||||
defer: db.close()
|
||||
|
||||
|
|
@ -128,6 +130,7 @@ proc approve(taskId: string, by: string = "", comment: string = "") =
|
|||
|
||||
proc requestChanges(taskId: string, comment: string = "") =
|
||||
## Request revisions (IN_REVIEW → WORKING)
|
||||
discard validateTaskId(taskId)
|
||||
let db = openBusDb()
|
||||
defer: db.close()
|
||||
|
||||
|
|
@ -138,6 +141,7 @@ proc requestChanges(taskId: string, comment: string = "") =
|
|||
|
||||
proc merge(taskId: string, deleteBranch: bool = false) =
|
||||
## Merge approved work (APPROVED → COMPLETED)
|
||||
discard validateTaskId(taskId)
|
||||
let db = openBusDb()
|
||||
defer: db.close()
|
||||
|
||||
|
|
@ -168,6 +172,7 @@ proc merge(taskId: string, deleteBranch: bool = false) =
|
|||
|
||||
proc cancel(taskId: string, reason: string = "", cleanup: bool = false) =
|
||||
## Abort a worker (* → FAILED)
|
||||
discard validateTaskId(taskId)
|
||||
let db = openBusDb()
|
||||
defer: db.close()
|
||||
|
||||
|
|
@ -184,7 +189,7 @@ proc cancel(taskId: string, reason: string = "", cleanup: bool = false) =
|
|||
|
||||
proc start(task: string = "") =
|
||||
## Signal ASSIGNED → WORKING (run from worktree)
|
||||
let taskId = if task != "": task else: getTaskId()
|
||||
let taskId = if task != "": validateTaskId(task) else: getTaskId()
|
||||
|
||||
let db = openBusDb()
|
||||
defer: db.close()
|
||||
|
|
@ -284,6 +289,7 @@ proc sendHeartbeat(status: string = "working", progress: float = 0.0) =
|
|||
|
||||
proc show(taskId: string) =
|
||||
## Show detailed view of a worker
|
||||
discard validateTaskId(taskId)
|
||||
let db = openBusDb()
|
||||
defer: db.close()
|
||||
|
||||
|
|
|
|||
|
|
@ -5,6 +5,7 @@
|
|||
|
||||
import std/[os, osproc, strutils, strformat, streams]
|
||||
import ./types
|
||||
import ./utils
|
||||
|
||||
proc runGit*(args: varargs[string], workDir: string = ""): tuple[output: string, exitCode: int] =
|
||||
## Run a git command and return output + exit code
|
||||
|
|
@ -33,8 +34,8 @@ proc runGitCheck*(args: varargs[string], workDir: string = ""): string =
|
|||
|
||||
proc createWorktree*(taskId: string, fromBranch: string = "origin/integration"): tuple[branch, worktree: string] =
|
||||
## Create a worktree for a task
|
||||
let branch = &"feat/{taskId}"
|
||||
let worktree = &"{WorktreesDir}/{taskId}"
|
||||
let branch = branchName(taskId)
|
||||
let worktree = worktreePath(taskId)
|
||||
|
||||
# Fetch latest
|
||||
discard runGit("fetch", "origin")
|
||||
|
|
@ -50,13 +51,13 @@ proc createWorktree*(taskId: string, fromBranch: string = "origin/integration"):
|
|||
|
||||
proc removeWorktree*(taskId: string) =
|
||||
## Remove a worktree
|
||||
let worktree = &"{WorktreesDir}/{taskId}"
|
||||
let worktree = worktreePath(taskId)
|
||||
if dirExists(worktree):
|
||||
discard runGit("worktree", "remove", "--force", worktree)
|
||||
|
||||
proc removeBranch*(taskId: string, remote: bool = true) =
|
||||
## Remove feature branch
|
||||
let branch = &"feat/{taskId}"
|
||||
let branch = branchName(taskId)
|
||||
discard runGit("branch", "-D", branch)
|
||||
if remote:
|
||||
discard runGit("push", "origin", "--delete", branch)
|
||||
|
|
@ -86,7 +87,7 @@ proc pushBranch*(worktree: string, branch: string) =
|
|||
|
||||
proc mergeToIntegration*(taskId: string, maxRetries: int = 3): bool =
|
||||
## Merge feature branch to integration with retry loop
|
||||
let branch = &"feat/{taskId}"
|
||||
let branch = branchName(taskId)
|
||||
|
||||
for attempt in 1..maxRetries:
|
||||
# Fetch latest
|
||||
|
|
|
|||
|
|
@ -11,6 +11,7 @@ import std/[tables, strformat, json, options, times]
|
|||
import tiny_sqlite
|
||||
import ./types
|
||||
import ./db
|
||||
import ./utils
|
||||
|
||||
const ValidTransitions* = {
|
||||
wsAssigned: @[wsWorking, wsFailed],
|
||||
|
|
@ -138,8 +139,8 @@ proc getWorker*(db: DbConn, taskId: string): Option[WorkerInfo] =
|
|||
state: parseState(row.get[1].fromDbValue(string)),
|
||||
branch: row.get[2].fromDbValue(string),
|
||||
worktree: row.get[3].fromDbValue(string),
|
||||
createdAt: fromUnix(row.get[5].fromDbValue(int64) div 1000),
|
||||
stateChangedAt: fromUnix(row.get[6].fromDbValue(int64) div 1000),
|
||||
createdAt: msToTime(row.get[5].fromDbValue(int64)),
|
||||
stateChangedAt: msToTime(row.get[6].fromDbValue(int64)),
|
||||
)
|
||||
|
||||
if row.get[4].kind != sqliteNull:
|
||||
|
|
@ -148,7 +149,7 @@ proc getWorker*(db: DbConn, taskId: string): Option[WorkerInfo] =
|
|||
# Get heartbeat
|
||||
let hb = db.getHeartbeat(taskId)
|
||||
if hb.isSome:
|
||||
info.lastHeartbeat = fromUnix(hb.get.tsMs div 1000)
|
||||
info.lastHeartbeat = msToTime(hb.get.tsMs)
|
||||
|
||||
return some(info)
|
||||
|
||||
|
|
@ -167,15 +168,15 @@ proc getAllWorkers*(db: DbConn): seq[WorkerInfo] =
|
|||
state: parseState(row[1].fromDbValue(string)),
|
||||
branch: row[2].fromDbValue(string),
|
||||
worktree: row[3].fromDbValue(string),
|
||||
createdAt: fromUnix(row[5].fromDbValue(int64) div 1000),
|
||||
stateChangedAt: fromUnix(row[6].fromDbValue(int64) div 1000),
|
||||
createdAt: msToTime(row[5].fromDbValue(int64)),
|
||||
stateChangedAt: msToTime(row[6].fromDbValue(int64)),
|
||||
)
|
||||
|
||||
if row[4].kind != sqliteNull:
|
||||
info.description = row[4].fromDbValue(string)
|
||||
|
||||
if row[7].kind != sqliteNull:
|
||||
info.lastHeartbeat = fromUnix(row[7].fromDbValue(int64) div 1000)
|
||||
info.lastHeartbeat = msToTime(row[7].fromDbValue(int64))
|
||||
|
||||
result.add(info)
|
||||
|
||||
|
|
@ -207,8 +208,8 @@ proc createWorker*(db: DbConn, taskId, branch, worktree: string,
|
|||
branch: branch,
|
||||
worktree: worktree,
|
||||
description: description,
|
||||
createdAt: fromUnix(tsMs div 1000),
|
||||
stateChangedAt: fromUnix(tsMs div 1000),
|
||||
createdAt: msToTime(tsMs),
|
||||
stateChangedAt: msToTime(tsMs),
|
||||
)
|
||||
|
||||
proc isStale*(info: WorkerInfo): bool =
|
||||
|
|
|
|||
62
src/worker/utils.nim
Normal file
62
src/worker/utils.nim
Normal file
|
|
@ -0,0 +1,62 @@
|
|||
## Common utilities for worker coordination
|
||||
##
|
||||
## Consolidates repeated patterns across modules.
|
||||
|
||||
import std/[strformat, times, options]
|
||||
import tiny_sqlite
|
||||
import ./types
|
||||
|
||||
# Path helpers
|
||||
proc branchName*(taskId: string): string =
|
||||
## Get the feature branch name for a task
|
||||
&"feat/{taskId}"
|
||||
|
||||
proc worktreePath*(taskId: string): string =
|
||||
## Get the worktree path for a task
|
||||
&"{WorktreesDir}/{taskId}"
|
||||
|
||||
# Time conversion
|
||||
proc msToTime*(ms: int64): Time =
|
||||
## Convert epoch milliseconds to Time
|
||||
fromUnix(ms div 1000)
|
||||
|
||||
# Input validation
|
||||
const AllowedTaskIdChars = {'a'..'z', 'A'..'Z', '0'..'9', '_', '-'}
|
||||
|
||||
proc validateTaskId*(taskId: string): string =
|
||||
## Validate task ID to prevent path traversal and injection.
|
||||
## Returns the taskId if valid, raises ValueError otherwise.
|
||||
if taskId.len == 0:
|
||||
raise newException(ValueError, "Task ID cannot be empty")
|
||||
if taskId.len > 64:
|
||||
raise newException(ValueError, "Task ID too long (max 64 chars)")
|
||||
for c in taskId:
|
||||
if c notin AllowedTaskIdChars:
|
||||
raise newException(ValueError, "Task ID contains invalid character: " & $c)
|
||||
return taskId
|
||||
|
||||
# SQLite helpers
|
||||
proc optString*(val: DbValue): Option[string] =
|
||||
## Convert nullable DbValue to Option[string]
|
||||
if val.kind == sqliteNull:
|
||||
none(string)
|
||||
else:
|
||||
some(val.fromDbValue(string))
|
||||
|
||||
proc optInt64*(val: DbValue): Option[int64] =
|
||||
## Convert nullable DbValue to Option[int64]
|
||||
if val.kind == sqliteNull:
|
||||
none(int64)
|
||||
else:
|
||||
some(val.fromDbValue(int64))
|
||||
|
||||
# Transaction helper
|
||||
template withTransaction*(db: DbConn, body: untyped) =
|
||||
## Execute body within a transaction with automatic rollback on error.
|
||||
db.exec("BEGIN IMMEDIATE")
|
||||
try:
|
||||
body
|
||||
db.exec("COMMIT")
|
||||
except CatchableError:
|
||||
db.exec("ROLLBACK")
|
||||
raise
|
||||
Loading…
Reference in a new issue