feat(dispatcher): persist parked executions across daemon restart (#116)#160
Conversation
|
Caution Review failedThe pull request is closed. ℹ️ Recent review info⚙️ Run configurationConfiguration used: Organization UI Review profile: CHILL Plan: Pro Plus Run ID: 📒 Files selected for processing (9)
📝 WalkthroughWalkthroughMakes the dispatcher’s bunqueue Engine durable (SQLite-backed), adds createDurableEngine/recoverEngine/reconcileOrphanedSignals, integrates recovery into daemon boot before polling, tightens terminal-state typing, and adds unit and end-to-end tests plus planning docs for Issue ChangesDurable Workflow Recovery
Sequence DiagramsequenceDiagram
participant Daemon as Dispatcher Daemon
participant createDurable as createDurableEngine
participant Engine
participant Recover as recoverEngine
participant Reconcile as reconcileOrphanedSignals
participant DB as queue.sqlite3
participant Poller as Resume Poller
Note over Daemon,DB: Boot with persistent queue
Daemon->>createDurable: createDurableEngine(dataPath)
createDurable->>Engine: returns Engine(embedded, dataPath)
Daemon->>Daemon: registerWorkflows()
Note over Daemon,DB: Recovery: re-arm parked executions
Daemon->>Recover: recoverEngine(engine)
Recover->>Engine: engine.cleanup() — clear running/compensating
Recover->>Engine: engine.recover() — re-arm waiting executions
Note over Daemon,Reconcile: Reconcile orphaned signals
Daemon->>Reconcile: reconcileOrphanedSignals({db, hasExecution})
Reconcile->>DB: scan waiting-human signals
Reconcile->>Reconcile: finalize orphan + consume signal
Note over Poller,DB: Start polling/resume flow
Daemon->>Poller: startPolling()
Poller->>DB: poll waitfor_signals
Poller->>Engine: engine.signal() on recoverable executions
Estimated code review effort🎯 4 (Complex) | ⏱️ ~75 minutes Possibly related PRs
🚥 Pre-merge checks | ✅ 4 | ❌ 1❌ Failed checks (1 warning)
✅ Passed checks (4 passed)
✏️ Tip: You can configure your own custom pre-merge checks in the settings. Warning There were issues while running some tools. Please review the errors and either fix the tool's configuration or disable the tool if it's a critical failure. 🔧 ESLint
ESLint skipped: no ESLint configuration detected in root package.json. To enable, add Comment |
thejustinwalsh
left a comment
There was a problem hiding this comment.
Decision-log rationale (from planning/issues/116/decisions.md), inlined where it lives in the code.
| * Caveat: if a `BUNQUEUE_DATA_PATH`/`BQ_DATA_PATH`/`DATA_PATH`/`SQLITE_PATH` env var is | ||
| * set, the throwaway `Queue` would itself become persistent — middle never sets these. | ||
| */ | ||
| export function createDurableEngine(dataPath: string): Engine { |
There was a problem hiding this comment.
Why a transient queue, not just new Engine({ dataPath }). A spike proved a persistent queue replays stale step jobs onto the fresh worker after a restart — re-driving launch-and-drive and double-launching a tmux session the restart left alive (the exact regression #116's out-of-scope note guards against). The branch-before-waitFor shape in the implementation workflow leaves a non-terminal step job the new worker auto-processes on construct. Only the execution store needs durability; recoverEngine rebuilds the queue from it. bunqueue couples queue+store to one dataPath via a process-singleton manager keyed by the first caller and exposes no store-only option, so claiming the singleton in-memory first (the throwaway Queue) is the lever. Verified by spikes: branchless→no replay, branch→replay, in-memory-queue→no replay + resume works.
| * needs the workflow definition) and BEFORE the poller starts (so it never fires a | ||
| * resume at an exec recover hasn't re-armed yet). | ||
| */ | ||
| export async function recoverEngine(engine: Engine): Promise<EngineRecoveryResult> { |
There was a problem hiding this comment.
Why drop running/compensating before recover(). engine.recover() is all-or-nothing: it re-enqueues running execs (re-running launch-and-drive) and re-runs compensating ones. #116 explicitly scopes running-execution recovery OUT ("a tmux session lost to a restart is the watchdog's domain"), and a daemon restart does not kill the agent's tmux sessions, so re-driving would double-launch. Dropping them preserves today's behavior exactly: the watchdog reconciles launching/running rows on its first tick. cleanup(0, …) deletes execs with updated_at < now — every pre-restart row in those states, and never waiting.
| * its signal row consumed so the poller stops watching it, and surfaced for a human. | ||
| * Returns the orphans it reconciled. | ||
| */ | ||
| export async function reconcileOrphanedSignals( |
There was a problem hiding this comment.
Why an orphan is finalized failed (not left, not cancelled). Post-restart the execution should be recoverable from the durable store; an orphan means the store never had it (a park from before persistence shipped, or a wiped queue db). The bug #116 calls out is the poller firing engine.signal at a dead execution — which throws Execution "<id>" not found every pass forever. Finalizing stops the poller (its loadPollableWaits only sees waiting-human rows), frees the slot, and makes the failure visible; failed (vs cancelled) because it genuinely failed to recover and warrants a human look. finalizeParkedWorkflow is conditional on the row still being waiting-human, so it can't clobber a concurrent resume. finalState is overridable if a reviewer prefers cancelled.
| // watchdog's domain — then re-arm parked `waiting` executions so the poller can | ||
| // resume them. Runs AFTER the registers (recover needs the definitions) and | ||
| // BEFORE the poller (so it never signals an exec recover hasn't re-armed). | ||
| const recovery = await recoverEngine(engine); |
There was a problem hiding this comment.
Boot recovery order. Runs AFTER both engine.register(...) (recover may re-enqueue/resume, which needs the workflow definitions) and AFTER the control-feed workflow observer is registered (so an orphan's failed transition broadcasts), and BEFORE startWatchdog/startPoller (so the poller never fires a resume at an exec recover hasn't re-armed). engine.recover() is mandated by the issue and uniquely re-arms the waitFor 7-day timeout timer — resume-via-signal itself already works off the durable store without it.
Reviewer's brief — #116 durable parked-execution recovery (PR #160)What it does: the dispatcher's workflow engine now has a durable execution store ( How to run itbun install
bun run typecheck # tsc --noEmit, clean
bun run lint # oxlint --deny-warnings, clean
bun test # 730 pass, 0 fail
bun test packages/dispatcher/test/recovery.test.ts # the new unit suite (9)
bun test packages/dispatcher/test/implementation-workflow.test.ts -t "durable recovery" # the 2 restart e2eWhat to verify (and what "correct" looks like)
How to review
Fragile / extra eyes
|
There was a problem hiding this comment.
Actionable comments posted: 2
🧹 Nitpick comments (3)
packages/dispatcher/test/implementation-workflow.test.ts (2)
957-1028: ⚡ Quick winGuarantee durable-engine teardown even when an assertion fails.
These tests close the shared in-memory
enginefirst, then managee1/e2manually. If anything throws before the finalclose(true), the file-levelafterEachonly closes the already-closed shared engine, so the durable engine and its SQLite handle are left behind. Atry/finallyaround each test body, or storing the active durable engine in shared teardown, would make these restart cases much less flaky.Also applies to: 1030-1067
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@packages/dispatcher/test/implementation-workflow.test.ts` around lines 957 - 1028, The test tears down the shared in-memory engine but can leak durable engines e1/e2 (and their SQLite handles) if an assertion throws; wrap the test body that creates e1/e2 in a try/finally (or assign the active durable Engine to the module-level engine variable) and in the finally ensure any created durable Engine (e1, e2 or whatever Engine instance is currentEngine) is closed with close(true) and shutdownManager() so the SQLite handle is always released; update both the restart-case test at lines ~957–1028 and the similar block at ~1030–1067 to follow this pattern and reference currentEngine/e1/e2 to locate where to add the finally cleanup.
1013-1016: ⚡ Quick winAssert that the post-restart continuation reuses the parked worktree.
This restart path currently proves the continuation re-drives, but not that it keeps the original checkout alive. A regression that recreates a fresh worktree after restart would still pass here while silently dropping the parked state. Please add the same
worktreePathequality check the in-process resume test already has.🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@packages/dispatcher/test/implementation-workflow.test.ts` around lines 1013 - 1016, The test verifies continuation re-drives but not that it reused the original parked worktree; capture the worktreePath before the restart (e.g. worktreePathBefore) and after the restart (e.g. worktreePathAfter) and assert they are equal — mirror the worktreePath equality assertion from the in-process resume test by obtaining the parked worktreePath for id1 (using the same helper used there) after awaitParkedOn(e2, id1) and comparing it to the original worktreePath saved earlier in the test, keeping the existing awaitContinuation/awaitParkedOn, prompts, and readPromptBrief assertions intact.packages/dispatcher/test/recovery.test.ts (1)
225-252: ⚡ Quick winAlways close the durable engines on failure paths.
Both restart tests only close
e1/e2on the happy path. If an assertion fails earlier, the suite-levelafterEachonly resets bunqueue and removes the temp dir, so the durable engine can stay alive and bleed into the next case. Wrap each test body intry/finallyor track the active engine in shared teardown.Also applies to: 254-289
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@packages/dispatcher/test/recovery.test.ts` around lines 225 - 252, The tests create durable engines e1 and e2 (via createDurableEngine) but only close them on the happy path, so if an assertion fails the engine stays alive and leaks state; update each test to ensure e1 and e2 are always closed by wrapping the test body in try/finally (or track the active engine and close in shared teardown) and call e1.close(true) and e2.close(true) in finally; ensure shutdownManager() and recoverEngine() usage remains the same but that any created engine references are closed on all paths to prevent cross-test leakage.
🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
Inline comments:
In `@packages/dispatcher/src/recovery.ts`:
- Around line 78-90: The declared finalState on ReconcileOrphanedSignalsDeps
must be narrowed to only terminal workflow states to prevent consuming wait rows
into non-terminal states; define a new union type (e.g., TerminalWorkflowState)
containing the concrete terminal states used by the engine and change
finalState?: WorkflowState to finalState?: TerminalWorkflowState on
ReconcileOrphanedSignalsDeps, and add a runtime check where the deps object is
constructed/used to assert/throw if a non-terminal value is passed (so misuses
fail fast).
- Around line 25-30: createDurableEngine currently creates a transient
Queue("__mm:engine-queue", { embedded: true }) but relies only on docs to ensure
BUNQUEUE_DATA_PATH/BQ_DATA_PATH/DATA_PATH/SQLITE_PATH are unset; add a runtime
guard at the start of createDurableEngine that checks process.env for those four
vars and throws a clear Error if any are set so the embedded Queue cannot be
made persistent via parent-process env injection, then proceed to instantiate
new Queue("__mm:engine-queue", { embedded: true }) and return new Engine({
embedded: true, dataPath }) as before.
---
Nitpick comments:
In `@packages/dispatcher/test/implementation-workflow.test.ts`:
- Around line 957-1028: The test tears down the shared in-memory engine but can
leak durable engines e1/e2 (and their SQLite handles) if an assertion throws;
wrap the test body that creates e1/e2 in a try/finally (or assign the active
durable Engine to the module-level engine variable) and in the finally ensure
any created durable Engine (e1, e2 or whatever Engine instance is currentEngine)
is closed with close(true) and shutdownManager() so the SQLite handle is always
released; update both the restart-case test at lines ~957–1028 and the similar
block at ~1030–1067 to follow this pattern and reference currentEngine/e1/e2 to
locate where to add the finally cleanup.
- Around line 1013-1016: The test verifies continuation re-drives but not that
it reused the original parked worktree; capture the worktreePath before the
restart (e.g. worktreePathBefore) and after the restart (e.g. worktreePathAfter)
and assert they are equal — mirror the worktreePath equality assertion from the
in-process resume test by obtaining the parked worktreePath for id1 (using the
same helper used there) after awaitParkedOn(e2, id1) and comparing it to the
original worktreePath saved earlier in the test, keeping the existing
awaitContinuation/awaitParkedOn, prompts, and readPromptBrief assertions intact.
In `@packages/dispatcher/test/recovery.test.ts`:
- Around line 225-252: The tests create durable engines e1 and e2 (via
createDurableEngine) but only close them on the happy path, so if an assertion
fails the engine stays alive and leaks state; update each test to ensure e1 and
e2 are always closed by wrapping the test body in try/finally (or track the
active engine and close in shared teardown) and call e1.close(true) and
e2.close(true) in finally; ensure shutdownManager() and recoverEngine() usage
remains the same but that any created engine references are closed on all paths
to prevent cross-test leakage.
🪄 Autofix (Beta)
Fix all unresolved CodeRabbit comments on this PR:
- Push a commit to this branch (recommended)
- Create a new PR with the fixes
ℹ️ Review info
⚙️ Run configuration
Configuration used: Organization UI
Review profile: CHILL
Plan: Pro Plus
Run ID: f0b8b13e-5548-4547-8162-874ddca5ab9a
📒 Files selected for processing (8)
packages/dispatcher/CLAUDE.mdpackages/dispatcher/src/index.tspackages/dispatcher/src/main.tspackages/dispatcher/src/recovery.tspackages/dispatcher/test/implementation-workflow.test.tspackages/dispatcher/test/recovery.test.tsplanning/issues/116/decisions.mdplanning/issues/116/plan.md
Two robustness fixes from the PR #160 review: - createDurableEngine now throws if any of BUNQUEUE_DATA_PATH / BQ_DATA_PATH / DATA_PATH / SQLITE_PATH is set. bunqueue's getDataPath() coalesces exactly those four with `??`, so a parent-process env injection would otherwise make the throwaway in-memory Queue persistent — replaying stale step jobs onto the fresh worker after a restart and double-launching a session. The guard runs before claiming the singleton and matches bunqueue's nullish semantics ("" is a set value, not a fallback). - Narrow finalizeParkedWorkflow's finalState and ReconcileOrphanedSignalsDeps's finalState from WorkflowState to a new TerminalWorkflowState (a strict subset: completed | compensated | failed | cancelled). Finalizing a parked workflow to a non-terminal state would consume its wait row yet strand it with no recovery path; the narrowing makes that a compile error at every call site. TERMINAL_STATES now `satisfies readonly TerminalWorkflowState[]` to keep the two in sync.
Review round 1 — addressed (3cc2004, 9f244d7)Actionable (2): replied in-thread.
Nitpicks (3): all addressed.
Verification: |
Give the workflow engine a durable execution store (a SQLite db alongside db.sqlite3) so a parked `waiting` execution survives a daemon restart, while keeping the step queue in-memory — a persistent queue replays stale step jobs onto the fresh worker and re-drives launch-and-drive (double session). The `createDurableEngine` factory claims bunqueue's process-singleton queue manager as in-memory before constructing the Engine; `recoverEngine` drops mid-drive (running/compensating) executions (the watchdog's domain) then re-arms parked `waiting` ones, and `reconcileOrphanedSignals` finalizes any waiting-human row whose execution the store no longer has so the poller stops firing at a dead execution.
…onciliation (#116) Restart is simulated by engine.close(true) + shutdownManager() (resetting bunqueue's process-singleton manager) before the second createDurableEngine on the same dataPath — modelling a real separate-process boot. Covers: a real implementation workflow parked on .waitFor(RESUME_EVENT) surviving a restart and a review verdict resuming it (no re-drive); recoverEngine re-arming a parked waiting exec and dropping a mid-drive running one; and orphaned-signal reconciliation (orphan finalized + consumed + surfaced; alive parks untouched).
Replace the stale "engine is in-memory; don't add recover()" note with the durable-store / transient-queue reality, the boot recovery order, and why the queue must not persist. Add the recovery surface to the module front door.
Two robustness fixes from the PR #160 review: - createDurableEngine now throws if any of BUNQUEUE_DATA_PATH / BQ_DATA_PATH / DATA_PATH / SQLITE_PATH is set. bunqueue's getDataPath() coalesces exactly those four with `??`, so a parent-process env injection would otherwise make the throwaway in-memory Queue persistent — replaying stale step jobs onto the fresh worker after a restart and double-launching a session. The guard runs before claiming the singleton and matches bunqueue's nullish semantics ("" is a set value, not a fallback). - Narrow finalizeParkedWorkflow's finalState and ReconcileOrphanedSignalsDeps's finalState from WorkflowState to a new TerminalWorkflowState (a strict subset: completed | compensated | failed | cancelled). Finalizing a parked workflow to a non-terminal state would consume its wait row yet strand it with no recovery path; the narrowing makes that a compile error at every call site. TERMINAL_STATES now `satisfies readonly TerminalWorkflowState[]` to keep the two in sync.
… all paths - Assert createDurableEngine throws for each persistent-queue env var (and an empty-string value, matching bunqueue's `??`), naming every offending var. - Add a compile-time @ts-expect-error guard that finalState rejects non-terminal states (enforced by the typecheck gate). - Track every durable engine the restart suites open and close them idempotently in afterEach, so a mid-test assertion failure can't leak a durable engine + its SQLite handle or bleed the bunqueue singleton into the next case. Centralizes the post-restart cleanup (shutdownManager + fresh in-memory engine) into teardown. - After a restart-driven continuation, assert it inherits the parked worktree path (mirrors the in-process resume test).
9f244d7 to
f4c8c4d
Compare
Two robustness fixes from the PR #160 review: - createDurableEngine now throws if any of BUNQUEUE_DATA_PATH / BQ_DATA_PATH / DATA_PATH / SQLITE_PATH is set. bunqueue's getDataPath() coalesces exactly those four with `??`, so a parent-process env injection would otherwise make the throwaway in-memory Queue persistent — replaying stale step jobs onto the fresh worker after a restart and double-launching a session. The guard runs before claiming the singleton and matches bunqueue's nullish semantics ("" is a set value, not a fallback). - Narrow finalizeParkedWorkflow's finalState and ReconcileOrphanedSignalsDeps's finalState from WorkflowState to a new TerminalWorkflowState (a strict subset: completed | compensated | failed | cancelled). Finalizing a parked workflow to a non-terminal state would consume its wait row yet strand it with no recovery path; the narrowing makes that a compile error at every call site. TERMINAL_STATES now `satisfies readonly TerminalWorkflowState[]` to keep the two in sync.
Single-pass new-work-as-base merge of origin/main after rebase kept re-conflicting on the same hunks across multiple commits (CLAUDE.md escape hatch). - packages/dispatcher/src/poller-cron.ts — unified `startPoller(deps, opts)` signature; folded `ReconcilerHooks` into `StartPollerOptions` as `opts.reconcilers` (alongside `opts.checkboxRevert` and `opts.intervalMs`). - packages/dispatcher/src/main.ts — unified daemon-startup: keeps the durable engine + `recoverEngine` + `reconcileOrphanedSignals` from #160, the notification-failsafe watchdog comment from #162, and adds the `reconcileOpenPRsForRepo` block + `reconcilers` config in the `startPoller` call. Dropped the now-unused `Engine` import (main routes through `createDurableEngine`). - packages/core/src/index.ts — kept both export blocks: integration rubric from #163, `selectAdapter` from this PR. - packages/dispatcher/test/recommender-run.test.ts — kept both describe blocks (adapter-enabled gate from this PR, schema-resolution from #157); added `enabled: true` to the schema test's adapter config so it passes the new gate. - packages/dispatcher/test/gates/checkbox-revert-pass.test.ts — added the five new `GitHubGateway` methods to the test stub (`listOpenIssues`, `addLabel`, `listMergedPrsClosingRefs`, `closeIssue`, `createIssue`) main grew during the marathon. Gates re-verified locally: `bun run typecheck` clean, `bun test packages/dispatcher` 620/620 pass, `bun run lint` clean, `bun run format` clean (no changes).
Summary
Closes #116
The daemon's workflow engine was in-memory (
new Engine({ embedded: true })), so a daemon restart lost every parkedwaitingexecution while the durablewaitfor_signalsrows survived — after a restart the poller would fire a resume signal at an execution that no longer existed. This gives the engine a durable execution store (a SQLite db alongsidedb.sqlite3) with a transient in-memory step queue, callsrecover()on boot, and reconciles orphaned signals so nothing is left silently stuck.What changed
packages/dispatcher/src/recovery.ts(new) —createDurableEngine(persistent store + in-memory queue),recoverEngine(drop mid-drive execs →recover()),reconcileOrphanedSignals.packages/dispatcher/src/main.ts— build the engine viacreateDurableEngine; runrecoverEngine+reconcileOrphanedSignalson boot (after the workflow registers, before the poller), surfacing orphans on the Epic.packages/dispatcher/src/index.ts— export the recovery surface.packages/dispatcher/CLAUDE.md— replace the stale "in-memory; don't addrecover()" note with the durable-store / transient-queue reality + boot order.Why these changes
Only the execution store needs durability; the step queue must stay transient. A persistent queue replays stale step jobs onto the fresh worker after a restart, re-driving
launch-and-driveand double-launching a tmux session the restart left alive (the regression #116's out-of-scope note guards against). bunqueue couples queue+store to onedataPathvia a process-singleton manager and exposes no store-only option, socreateDurableEngineclaims that singleton as in-memory (a throwawayQueue) before constructing the Engine.recoverEnginefirst dropsrunning/compensatingexecutions (re-driving those is the watchdog's domain, explicitly out of scope) then re-arms parkedwaitingones. Rationale is inlined as PR review comments and lives inplanning/issues/116/decisions.md.Status
createDurableEngine,recoverEngine)Acceptance criteria
dataPathinstead of the in-memoryembeddedstore —main.tsengine construction viacreateDurableEngine.engine.recover()so parkedwaitingexecutions are re-armed —recoverEngine, wired atmain.ts:602..waitFor(RESUME_EVENT)leaves it resumable; a review verdict resumes it — e2e testimplementation-workflow.test.ts("survives a restart; a review verdict resumes it").waitfor_signalsrow with no recoverable execution is detected and surfaced (not silently stuck) —reconcileOrphanedSignals, wired atmain.ts:612.recovery.test.ts(9 tests) + the restart e2e + orphan-after-store-loss e2e inimplementation-workflow.test.ts.bun test(730 pass) andbun run typecheckgreen — root scriptspackage.json; see Verification below.Verification
bun test— 730 pass, 0 fail across 81 files (includes 9 newrecovery.test.ts+ 2 new restart e2e tests).bun run typecheck— clean (tsc --noEmit).bun run lint(oxlint --deny-warnings) +bun run format— clean.engine.signalon a missing id throwsExecution "<id>" not found(the orphan symptom).Stumbling points
new Engine({ dataPath })) is a trap: bunqueue persists the queue and the store off onedataPath, and the persistent queue replays stale step jobs on the fresh worker — silently re-drivinglaunch-and-drive. The first restart e2e caught it (an extra"initial"drive before any signal). The fix is the transient-queue/persistent-store split via the process-singleton claim.dataPath, so a single test process can't model a restart withoutshutdownManager()between engines.Suggested CLAUDE.md updates
Applied in this PR (
packages/dispatcher/CLAUDE.md): the durable-store/transient-queue rule, the boot recovery order, and the singleton/shutdownManager()test note.Known edges (out of scope)
running(mid-drive) executions — the watchdog's domain; this PR deliberately preserves that by dropping them beforerecover().recover()fires thewaitFortimeout and bunqueue fails the workflow (with compensation/worktree cleanup). Pre-existing bunqueue behavior for anywaitFortimeout; extreme and benign.Follow-up issues
None — no parallelizable discovery surfaced.
Summary by CodeRabbit
New Features
Behavior Change
Tests
Documentation