Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 7 additions & 1 deletion packages/dispatcher/src/main.ts
Original file line number Diff line number Diff line change
Expand Up @@ -655,7 +655,13 @@ export async function runDaemon(opts: RunDaemonOptions = {}): Promise<void> {
db,
getAdapter,
sessionGate: deps.sessionGate,
tmux: { newSession, sendText, sendEnter, killSession },
// `status` is wired so the spawn-recommender-agent step races the Stop
// hook against tmux liveness — a session killed out from under the drive
// (watchdog, force-close, manual `tmux kill-session`) fails the step
// immediately instead of blocking on the Stop wait for the full agent
// timeout. Mirrors how `buildImplementationDeps` wires the implementation
// drive's liveness probe.
tmux: { newSession, sendText, sendEnter, killSession, status },
worktree: { createWorktree, destroyWorktree },
resolveRepoPath: (repo) => {
const path = repoPaths.get(repo);
Expand Down
7 changes: 5 additions & 2 deletions packages/dispatcher/src/recommender-run.ts
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ import { ghGitHub } from "./github.ts";
import type { EpicGateway } from "./github.ts";
import { ghStateIssueGateway } from "./state-issue.ts";
import type { StateGateway } from "./state-issue.ts";
import { killSession, newSession, sendEnter, sendText } from "./tmux.ts";
import { killSession, newSession, sendEnter, sendText, status } from "./tmux.ts";
import {
buildRecommenderContext,
createRecommenderWorkflow,
Expand Down Expand Up @@ -285,7 +285,10 @@ export async function dispatchRecommender(
db,
getAdapter: opts.getAdapter,
sessionGate,
tmux: ov.tmux ?? { newSession, sendText, sendEnter, killSession },
// `status` lets the spawn step race the Stop hook against tmux liveness
// (mirrors the daemon's wiring) — a session killed mid-run fails fast
// rather than blocking for the full agent timeout.
tmux: ov.tmux ?? { newSession, sendText, sendEnter, killSession, status },
worktree: ov.worktree ?? { createWorktree, destroyWorktree },
resolveRepoPath: () => opts.repoPath,
worktreeRoot: opts.worktreeRoot,
Expand Down
35 changes: 31 additions & 4 deletions packages/dispatcher/src/workflows/recommender.ts
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import {
updateWorkflow,
} from "../workflow-record.ts";
import type { CreateWorktreeOpts, WorktreeHandle } from "../worktree.ts";
import { awaitStopOrSessionEnd } from "./implementation.ts";
import type { TmuxOps, WorktreeOps } from "./implementation.ts";

/** A recommender run: rewrite one repo's state issue with a ranked dispatch plan. */
Expand Down Expand Up @@ -144,6 +145,13 @@ export type RecommenderDeps = {
launchTimeoutMs?: number;
/** Hard cap on the agent run — the spec's 5-minute ceiling. */
agentTimeoutMs?: number;
/**
* Cadence for the spawn step's session-liveness probe (the
* `awaitStopOrSessionEnd` race). Defaults to 5s in production. Exposed so
* tests can drive it tighter; should be ≪ `agentTimeoutMs` so a session that
* dies mid-run fails the step in seconds rather than minutes.
*/
livenessPollMs?: number;
/**
* Surface a malformed produced body to a human (the verify step's failure
* path). Optional + injectable so tests need no `gh`.
Expand Down Expand Up @@ -570,10 +578,29 @@ export function createRecommenderWorkflow(deps: RecommenderDeps): Workflow<Recom
await deps.tmux.sendText(sessionName, promptText);
await deps.tmux.sendEnter(sessionName);

// The recommender is a one-shot: drive one turn, observe the Stop. The
// step's own `timeout` (5 min) is the hard cap; this Stop-await is bounded
// just under it so it surfaces a specific error rather than the step timeout.
await deps.sessionGate.awaitStop(sessionName, agentTimeout);
// The recommender is a one-shot: drive one turn, observe the Stop.
// Liveness-aware (mirrors the implementation drive): race the Stop hook
// against tmux session-death so a session killed out from under us (a
// watchdog kill, a daemon force-close mid-run, a tmux crash) fails the
// step *immediately* with a specific reason instead of blocking on
// `awaitStop` for the full `agentTimeout` (a 15-min stall the bunqueue
// force-close would otherwise terminate as a generic compensation). When
// `tmux.status` is unwired (tests / standalone runner), it degrades to
// Stop-or-timeout — same behavior as before.
const wait = await awaitStopOrSessionEnd({
awaitStop: (ms) => deps.sessionGate.awaitStop(sessionName, ms),
timeoutMs: agentTimeout,
isAlive: deps.tmux.status
? async () => (await deps.tmux.status!(sessionName)).alive
: undefined,
pollMs: deps.livenessPollMs,
});
if (wait.via === "session-ended") {
throw new Error("recommender session ended before Stop hook");
}
if (wait.via === "timeout") {
throw new Error(`recommender Stop hook timed out after ${agentTimeout}ms`);
}
// END SESSION — the turn is over; free the dedicated slot.
await deps.tmux.killSession(sessionName);
} catch (error) {
Expand Down
61 changes: 60 additions & 1 deletion packages/dispatcher/test/recommender-workflow.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -132,6 +132,19 @@ function makeHarness(opts?: {
autoDispatch?: boolean;
wireTrigger?: boolean;
failSession?: boolean;
/**
* Session-liveness test seam (#229-followup): when set, `tmux.status` reports
* the session alive until this many ms have elapsed (since the test started),
* then `alive: false`. Combined with a `neverStopping` gate this drives the
* spawn step's `awaitStopOrSessionEnd` race onto the `session-ended` branch.
*/
sessionDiesAfterMs?: number;
/**
* Make the gate's `awaitStop` never resolve (until its own internal timeout) so
* the only way the spawn step finishes is via the session-liveness race. Used
* with `sessionDiesAfterMs` to exercise the session-ended failure path.
*/
neverStopping?: boolean;
}) {
const trace: string[] = [];
const created: string[] = [];
Expand All @@ -142,6 +155,7 @@ function makeHarness(opts?: {
const bodies = opts?.bodies ?? [validBody(), validBody()];
let readCount = 0;
let written: string | null = null;
const harnessStart = Date.now();

const tmux = {
async newSession(o: { sessionName: string }) {
Expand All @@ -155,6 +169,17 @@ function makeHarness(opts?: {
async killSession(sessionName: string) {
killed.push(sessionName);
},
// Wired only when the test cares about session-liveness; otherwise omitted
// (so the spawn step's `awaitStopOrSessionEnd` degrades to Stop-or-timeout,
// matching the prior behavior the existing tests assert).
...(opts?.sessionDiesAfterMs !== undefined
? {
async status(_sessionName: string) {
const elapsed = Date.now() - harnessStart;
return { alive: elapsed < opts.sessionDiesAfterMs!, paneCount: 1 };
},
}
: {}),
};

const adapter: AgentAdapter = {
Expand All @@ -179,7 +204,14 @@ function makeHarness(opts?: {
if (opts?.failSession) throw new Error("launch timeout");
return { session_id: "s", transcript_path: "/tmp/s.jsonl" } as HookPayload;
},
awaitStop: async () => ({ reason: "turn-end" }) as HookPayload,
awaitStop: opts?.neverStopping
? // Promise that resolves only on the internal timeout — the test must end
// via the session-liveness race, not via the Stop hook.
(_n: string, ms: number) =>
new Promise<HookPayload>((_resolve, reject) => {
setTimeout(() => reject(new Error(`timed out waiting for stop:${_n}`)), ms);
})
: async () => ({ reason: "turn-end" }) as HookPayload,
};

const deps: RecommenderDeps = {
Expand Down Expand Up @@ -232,6 +264,9 @@ function makeHarness(opts?: {
},
launchTimeoutMs: 2000,
agentTimeoutMs: 2000,
// Tight poll so the session-liveness race fires within the test budget;
// production defaults to 5s (matches the implementation drive).
livenessPollMs: 25,
surfaceProblem: async (o) => {
surfaced.push(o.problem);
},
Expand Down Expand Up @@ -379,6 +414,30 @@ describe("recommender workflow — #43 shell: step order + dedicated slot", () =
expect(await listWorktrees({ repoPath, worktreeRoot })).toEqual([]);
for (const s of new Set(h.created)) expect(h.killed).toContain(s);
});

test("a tmux session killed mid-run fails the spawn step fast (#229-followup)", async () => {
// Regression: the spawn step's `awaitStop` used to be the bare gate call.
// If the tmux session was killed out from under the drive (a watchdog kill,
// a daemon SIGTERM with `engine.close(true)` force-closing, a tmux crash),
// the Stop hook never arrived and the step blocked on `awaitStop` for the
// full `agentTimeout` (up to 30 minutes). With the liveness race wired the
// step fails immediately with a *specific* error — "session ended before
// Stop hook" — and `prepare-shallow-worktree`'s compensation tears the run
// down to a clean terminal state. Verifies: the harness pretends the
// session dies 80ms into the run while the Stop hook never fires, and the
// whole workflow settles inside the runToEnd budget (much less than the
// 2-second `agentTimeoutMs` the harness sets — proof the race won, not the
// Stop timeout).
const start = Date.now();
const h = makeHarness({ neverStopping: true, sessionDiesAfterMs: 80 });
const id = await runToEnd(h.deps, /* timeoutMs */ 1500);
const elapsed = Date.now() - start;

expect(getWorkflow(db, id)!.state).toBe("compensated");
expect(elapsed).toBeLessThan(1500); // race won before the Stop timeout
expect(await listWorktrees({ repoPath, worktreeRoot })).toEqual([]);
for (const s of new Set(h.created)) expect(h.killed).toContain(s);
});
});

describe("recommender workflow — #44 build-prompt: every required input, verbatim", () => {
Expand Down
Loading