Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
97 changes: 52 additions & 45 deletions packages/dispatcher/src/workflows/implementation.ts
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@ import {
armWaitForSignal,
consumeWaitForSignal,
createWorkflowRecord,
isWaitForArmed,
updateWorkflow,
type WorkflowState,
} from "../workflow-record.ts";
Expand Down Expand Up @@ -1023,55 +1022,63 @@ export function createImplementationWorkflow(
/**
 * Parks the workflow after `launch-and-drive` so a later signal can resume it.
 *
 * In order: arms a wait-for signal named for the ACTUAL park reason, marks the
 * workflow `waiting-human`, surfaces the agent's question when the stop was
 * `asked-question` and a `postQuestion` dependency is configured, then removes
 * `<worktree>/.middle/blocked.json`. Posting the question and removing the
 * sentinel are both best-effort: failures are logged and never abort the park.
 *
 * @param ctx - Step context. Reads `steps["launch-and-drive"]`,
 *   `steps["prepare-worktree"]`, `input.repo`, `input.epicRef` and
 *   `executionId`.
 */
async function parkForResume(ctx: StepContext<ImplementationInput>): Promise<void> {
  // A thrown value is not guaranteed to be an `Error` (it may even be
  // `null`/`undefined`). Reading `.message` through a cast would then throw
  // from inside the catch handler and defeat the best-effort guarantee below.
  const describeError = (error: unknown): string =>
    error instanceof Error ? error.message : String(error);

  const { outcome } = ctx.steps["launch-and-drive"] as DriveResult;
  const reason = reasonFor(outcome.kind);

  // Always arm the signal for the ACTUAL park reason — even if the watchdog's
  // sentinel fallback (`blocked:<id>`) is already armed from a stale
  // `.middle/blocked.json` written during an earlier phase. The two names map
  // to DIFFERENT resume reasons (`blocked:<id>` → `answered-question`,
  // `epic-N-review-resolved` → `review-changes`; see
  // `reasonFromSignalName`), so leaving review-resolved unarmed when the
  // agent's actual stop was `done` orphans the workflow: CR's review event
  // would never wake it because no review-resolved signal is armed. (Real
  // incident — PR #230 / Epic #208 sat parked ~11h with only `blocked:` armed
  // after CR posted CHANGES_REQUESTED.)
  //
  // `armWaitForSignal` is `INSERT OR IGNORE` keyed on `signal_name`, so
  // re-arming the same name is a no-op; arming a *different* name leaves both
  // wake paths live, which is correct when the workflow is genuinely waiting
  // on either a human answer or a CR re-review.
  armWaitForSignal(
    deps.db,
    signalNameFor(ctx.input.epicRef, reason),
    ctx.executionId,
    JSON.stringify({ reason }),
  );
  updateWorkflow(deps.db, ctx.executionId, { state: "waiting-human" });

  if (outcome.kind === "asked-question" && deps.postQuestion) {
    // The sentinel's `kind` distinguishes a complexity pause (surfaced under the
    // `complexity pause` state-issue label) from a plain question.
    const kind = outcome.sentinel?.kind === "complexity" ? "complexity" : "question";
    try {
      await deps.postQuestion({
        repo: ctx.input.repo,
        epicRef: ctx.input.epicRef,
        question: outcome.sentinel?.question ?? "(question text unavailable)",
        context: outcome.sentinel?.context,
        kind,
      });
    } catch (error) {
      // Visibility is best-effort — the wait is already armed and durable, so
      // a failed comment must not abort the park.
      console.error(`[workflow] postQuestion failed: ${describeError(error)}`);
    }
  }

  // Consume the sentinel (#205, extended for the dual-signal bug): remove
  // `<worktree>/.middle/blocked.json` on EVERY park, not just `asked-question`.
  // On a `done`/review-changes park, a stale sentinel left from an earlier
  // phase would still cause the watchdog's rule-4 pass to re-arm `blocked:<id>`
  // on the next tick, racing the legitimate `epic-N-review-resolved` arm and
  // re-introducing the orphaned-resume class this fix closes. Anchored on
  // the worktree handle (the stable home of the workstream's sentinels), not
  // the adapter-reported `sentinelPath`. The `waitFor` is already durably
  // armed, so removing the file can't strand the resume; unconditional
  // removal (even if `postQuestion` threw above) is also what stops the next
  // tick re-posting "(question text unavailable)".
  const { handle } = ctx.steps["prepare-worktree"] as PrepareResult;
  try {
    rmSync(join(handle.path, ".middle", "blocked.json"), { force: true });
  } catch (error) {
    console.error(`[workflow] sentinel cleanup failed: ${describeError(error)}`);
  }
}

/**
Expand Down
73 changes: 67 additions & 6 deletions packages/dispatcher/test/implementation-workflow.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -548,7 +548,21 @@ describe("implementation workflow — blocked sentinel self-heal", () => {
expectNoSessionLeak(tmux);
});

test("parkForResume keeps a pre-armed blocked signal (no duplicate)", async () => {
test("parkForResume arms the actual-reason signal alongside a pre-armed blocked signal", async () => {
// Pre-fix behavior was: skip arming when ANY signal exists ("no duplicate"
// intent), keeping only `blocked:<id>`. That intent assumed both names
// mapped to the same resume reason, but they don't — `blocked:<id>` →
// `answered-question`, `epic-N-review-resolved` → `review-changes`. So when
// the agent's actual stop was `done` (review-changes) and the watchdog had
// pre-armed `blocked:`, the review-resolved signal never got armed and a
// CR event could not wake the workflow. (PR #230 / Epic #208 sat parked
// ~11h.) Post-fix: always arm the actual-reason signal; `armWaitForSignal`
// is `INSERT OR IGNORE` keyed on `signal_name` so same-name re-arms remain
// no-ops while different-name arms leave both wake paths live.
//
// This scenario covers the asked-question side of the same class: both
// signals end up armed; they happen to map to the same wake path here, so
// either firing is equivalent — but having both is correct.
const tmux = makeTmuxStub();
const deps = makeDeps({
tmux: { ...tmux.ops, status: async () => ({ alive: false }) },
Expand All @@ -569,11 +583,13 @@ describe("implementation workflow — blocked sentinel self-heal", () => {
const id = await start(deps);
await awaitParked(id);

const rows = db
.query("SELECT signal_name FROM waitfor_signals WHERE workflow_id = ?")
.all(id) as Array<{ signal_name: string }>;
expect(rows).toHaveLength(1);
expect(rows[0]!.signal_name).toBe(`blocked:${id}`);
const names = (
db
.query("SELECT signal_name FROM waitfor_signals WHERE workflow_id = ?")
.all(id) as Array<{ signal_name: string }>
).map((r) => r.signal_name);
expect(names).toContain(`blocked:${id}`);
expect(names).toContain(signalNameFor(EPIC_REF, "answered-question"));
});

test("a hung agent with NO sentinel still fails (compensates, worktree pruned)", async () => {
Expand Down Expand Up @@ -817,6 +833,51 @@ describe("implementation workflow — complexity pause (#52)", () => {
expect(surfaced).toBe(false);
});

test("a `done` park arms `epic-N-review-resolved` even when the watchdog already armed `blocked:<id>` (dual-signal class)", async () => {
  // Regression guard for the orphaned-resume class: PR #230 / Epic #208 stayed
  // in `waiting-human` for ~11h after CR posted CHANGES_REQUESTED.
  //
  // Scenario: a stale `.middle/blocked.json` from an earlier phase makes the
  // watchdog's rule-4 sentinel pass arm `blocked:<id>` before the agent ends
  // the current phase with `kind = "done"` (PR opened, parking for review).
  // `parkForResume` must still arm `epic-N-review-resolved` for the ACTUAL
  // park reason, whatever is already armed — otherwise the CR event has no
  // matching signal to fire and the workflow never wakes.
  const doneAdapter = makeAdapterStub({ kind: "done" });

  // Stand-in for the watchdog's rule-4 pass. The per-test DB is fresh, so the
  // most recently created workflow row is the one under test; arm
  // `blocked:<id>` against it directly.
  const seedBlockedSignal = (): void => {
    const latest = db
      .query("SELECT id FROM workflows ORDER BY created_at DESC LIMIT 1")
      .get() as { id: string } | null;
    if (!latest) return;
    armWaitForSignal(db, `blocked:${latest.id}`, latest.id, null);
  };

  // `classifyStop` is the last step that runs *inside* launch-and-drive, so
  // seeding from a wrapper around it puts the DB in the raced state just
  // ahead of parkForResume.
  const deps = makeDeps({
    getAdapter: () => ({
      ...doneAdapter,
      classifyStop: (payload) => {
        seedBlockedSignal();
        return doneAdapter.classifyStop(payload);
      },
    }),
  });

  const workflowId = await start(deps);
  await awaitRow(workflowId, "waiting-human");

  // Expect BOTH signals on the workflow: the watchdog's fallback and the
  // actual-reason review-resolved arm. They resolve to DIFFERENT resume
  // reasons (`reasonFromSignalName`), so each drives its own wake path.
  const signalRows = db
    .query("SELECT signal_name FROM waitfor_signals WHERE workflow_id = ? ORDER BY signal_name")
    .all(workflowId) as Array<{ signal_name: string }>;
  const names = signalRows.map(({ signal_name }) => signal_name);
  expect(names).toContain(`blocked:${workflowId}`);
  expect(names).toContain(signalNameFor(EPIC_REF, "review-changes"));
});

test("an approved Epic's brief authorizes proceeding past a complexity overrun (#53)", async () => {
const deps = makeDeps({
isEpicApproved: () => true,
Expand Down
Loading