diff --git a/packages/dispatcher/src/main.ts b/packages/dispatcher/src/main.ts index 981b77c..f736cc5 100644 --- a/packages/dispatcher/src/main.ts +++ b/packages/dispatcher/src/main.ts @@ -655,7 +655,13 @@ export async function runDaemon(opts: RunDaemonOptions = {}): Promise { db, getAdapter, sessionGate: deps.sessionGate, - tmux: { newSession, sendText, sendEnter, killSession }, + // `status` is wired so the spawn-recommender-agent step races the Stop + // hook against tmux liveness — a session killed out from under the drive + // (watchdog, force-close, manual `tmux kill-session`) fails the step + // immediately instead of blocking on the Stop wait for the full agent + // timeout. Mirrors how `buildImplementationDeps` wires the implementation + // drive's liveness probe. + tmux: { newSession, sendText, sendEnter, killSession, status }, worktree: { createWorktree, destroyWorktree }, resolveRepoPath: (repo) => { const path = repoPaths.get(repo); diff --git a/packages/dispatcher/src/recommender-run.ts b/packages/dispatcher/src/recommender-run.ts index 1684e56..b7e8095 100644 --- a/packages/dispatcher/src/recommender-run.ts +++ b/packages/dispatcher/src/recommender-run.ts @@ -13,7 +13,7 @@ import { ghGitHub } from "./github.ts"; import type { EpicGateway } from "./github.ts"; import { ghStateIssueGateway } from "./state-issue.ts"; import type { StateGateway } from "./state-issue.ts"; -import { killSession, newSession, sendEnter, sendText } from "./tmux.ts"; +import { killSession, newSession, sendEnter, sendText, status } from "./tmux.ts"; import { buildRecommenderContext, createRecommenderWorkflow, @@ -285,7 +285,10 @@ export async function dispatchRecommender( db, getAdapter: opts.getAdapter, sessionGate, - tmux: ov.tmux ?? { newSession, sendText, sendEnter, killSession }, + // `status` lets the spawn step race the Stop hook against tmux liveness + // (mirrors the daemon's wiring) — a session killed mid-run fails fast + // rather than blocking for the full agent timeout. + tmux: ov.tmux ?? { newSession, sendText, sendEnter, killSession, status }, worktree: ov.worktree ?? { createWorktree, destroyWorktree }, resolveRepoPath: () => opts.repoPath, worktreeRoot: opts.worktreeRoot, diff --git a/packages/dispatcher/src/workflows/recommender.ts b/packages/dispatcher/src/workflows/recommender.ts index d6b2d22..5e650ff 100644 --- a/packages/dispatcher/src/workflows/recommender.ts +++ b/packages/dispatcher/src/workflows/recommender.ts @@ -21,6 +21,7 @@ import { updateWorkflow, } from "../workflow-record.ts"; import type { CreateWorktreeOpts, WorktreeHandle } from "../worktree.ts"; +import { awaitStopOrSessionEnd } from "./implementation.ts"; import type { TmuxOps, WorktreeOps } from "./implementation.ts"; /** A recommender run: rewrite one repo's state issue with a ranked dispatch plan. */ @@ -144,6 +145,13 @@ export type RecommenderDeps = { launchTimeoutMs?: number; /** Hard cap on the agent run — the spec's 5-minute ceiling. */ agentTimeoutMs?: number; + /** + * Cadence for the spawn step's session-liveness probe (the + * `awaitStopOrSessionEnd` race). Defaults to 5s in production. Exposed so + * tests can drive it tighter; should be ≪ `agentTimeoutMs` so a session that + * dies mid-run fails the step in seconds rather than minutes. + */ + livenessPollMs?: number; /** * Surface a malformed produced body to a human (the verify step's failure * path). Optional + injectable so tests need no `gh`. @@ -570,10 +578,29 @@ export function createRecommenderWorkflow(deps: RecommenderDeps): Workflow deps.sessionGate.awaitStop(sessionName, ms), + timeoutMs: agentTimeout, + isAlive: deps.tmux.status + ? async () => (await deps.tmux.status!(sessionName)).alive + : undefined, + pollMs: deps.livenessPollMs, + }); + if (wait.via === "session-ended") { + throw new Error("recommender session ended before Stop hook"); + } + if (wait.via === "timeout") { + throw new Error(`recommender Stop hook timed out after ${agentTimeout}ms`); + } // END SESSION — the turn is over; free the dedicated slot. await deps.tmux.killSession(sessionName); } catch (error) { diff --git a/packages/dispatcher/test/recommender-workflow.test.ts b/packages/dispatcher/test/recommender-workflow.test.ts index 0f2d25a..c0ccd3d 100644 --- a/packages/dispatcher/test/recommender-workflow.test.ts +++ b/packages/dispatcher/test/recommender-workflow.test.ts @@ -132,6 +132,19 @@ function makeHarness(opts?: { autoDispatch?: boolean; wireTrigger?: boolean; failSession?: boolean; + /** + * Session-liveness test seam (#229-followup): when set, `tmux.status` reports + * the session alive until this many ms have elapsed (since the test started), + * then `alive: false`. Combined with a `neverStopping` gate this drives the + * spawn step's `awaitStopOrSessionEnd` race onto the `session-ended` branch. + */ + sessionDiesAfterMs?: number; + /** + * Make the gate's `awaitStop` never resolve (until its own internal timeout) so + * the only way the spawn step finishes is via the session-liveness race. Used + * with `sessionDiesAfterMs` to exercise the session-ended failure path. + */ + neverStopping?: boolean; }) { const trace: string[] = []; const created: string[] = []; @@ -142,6 +155,7 @@ function makeHarness(opts?: { const bodies = opts?.bodies ?? [validBody(), validBody()]; let readCount = 0; let written: string | null = null; + const harnessStart = Date.now(); const tmux = { async newSession(o: { sessionName: string }) { @@ -155,6 +169,17 @@ function makeHarness(opts?: { async killSession(sessionName: string) { killed.push(sessionName); }, + // Wired only when the test cares about session-liveness; otherwise omitted + // (so the spawn step's `awaitStopOrSessionEnd` degrades to Stop-or-timeout, + // matching the prior behavior the existing tests assert). + ...(opts?.sessionDiesAfterMs !== undefined + ? { + async status(_sessionName: string) { + const elapsed = Date.now() - harnessStart; + return { alive: elapsed < opts.sessionDiesAfterMs!, paneCount: 1 }; + }, + } + : {}), }; const adapter: AgentAdapter = { @@ -179,7 +204,14 @@ function makeHarness(opts?: { if (opts?.failSession) throw new Error("launch timeout"); return { session_id: "s", transcript_path: "/tmp/s.jsonl" } as HookPayload; }, - awaitStop: async () => ({ reason: "turn-end" }) as HookPayload, + awaitStop: opts?.neverStopping + ? // Promise that resolves only on the internal timeout — the test must end + // via the session-liveness race, not via the Stop hook. + (_n: string, ms: number) => + new Promise((_resolve, reject) => { + setTimeout(() => reject(new Error(`timed out waiting for stop:${_n}`)), ms); + }) + : async () => ({ reason: "turn-end" }) as HookPayload, }; const deps: RecommenderDeps = { @@ -232,6 +264,9 @@ function makeHarness(opts?: { }, launchTimeoutMs: 2000, agentTimeoutMs: 2000, + // Tight poll so the session-liveness race fires within the test budget; + // production defaults to 5s (matches the implementation drive). + livenessPollMs: 25, surfaceProblem: async (o) => { surfaced.push(o.problem); }, @@ -379,6 +414,30 @@ describe("recommender workflow — #43 shell: step order + dedicated slot", () = expect(await listWorktrees({ repoPath, worktreeRoot })).toEqual([]); for (const s of new Set(h.created)) expect(h.killed).toContain(s); }); + + test("a tmux session killed mid-run fails the spawn step fast (#229-followup)", async () => { + // Regression: the spawn step's `awaitStop` used to be the bare gate call. + // If the tmux session was killed out from under the drive (a watchdog kill, + // a daemon SIGTERM with `engine.close(true)` force-closing, a tmux crash), + // the Stop hook never arrived and the step blocked on `awaitStop` for the + // full `agentTimeout` (up to 30 minutes). With the liveness race wired the + // step fails immediately with a *specific* error — "session ended before + // Stop hook" — and `prepare-shallow-worktree`'s compensation tears the run + // down to a clean terminal state. Verifies: the harness pretends the + // session dies 80ms into the run while the Stop hook never fires, and the + // whole workflow settles inside the runToEnd budget (much less than the + // 2-second `agentTimeoutMs` the harness sets — proof the race won, not the + // Stop timeout). + const start = Date.now(); + const h = makeHarness({ neverStopping: true, sessionDiesAfterMs: 80 }); + const id = await runToEnd(h.deps, /* timeoutMs */ 1500); + const elapsed = Date.now() - start; + + expect(getWorkflow(db, id)!.state).toBe("compensated"); + expect(elapsed).toBeLessThan(1500); // race won before the Stop timeout + expect(await listWorktrees({ repoPath, worktreeRoot })).toEqual([]); + for (const s of new Set(h.created)) expect(h.killed).toContain(s); + }); }); describe("recommender workflow — #44 build-prompt: every required input, verbatim", () => {