Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 6 additions & 3 deletions packages/server/src/__tests__/guardrails-runtime.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -498,7 +498,10 @@ describe('MessageConsumer — wired input guardrail', () => {
});

// The trip path must NOT enqueue to the worker thread queue and MUST
// enqueue a single thread_response with the rejection content.
// enqueue a single thread_response with the rejection. The notice rides the
// `error` field (not `content`): routeToRenderer has no plain-`content`
// branch, so a `content`-only payload is dropped — `error` renders
// end-to-end (SSE error event + CLI exit 1; platforms post `Error: …`).
const threadResponses = sentToQueue.filter(
(q) => q.queue === 'thread_response'
);
Expand All @@ -507,8 +510,8 @@ describe('MessageConsumer — wired input guardrail', () => {
);
expect(workerEnqueues.length).toBe(0);
expect(threadResponses.length).toBe(1);
expect(threadResponses[0]!.data.content).toMatch(/Message rejected:/);
expect(threadResponses[0]!.data.content).toMatch(/SECRET/);
expect(threadResponses[0]!.data.error).toMatch(/Message rejected:/);
expect(threadResponses[0]!.data.error).toMatch(/SECRET/);

await flushPendingGuardrailAudits();
const rows = await fetchGuardrailEvents(orgId, 'input');
Expand Down
209 changes: 209 additions & 0 deletions packages/server/src/gateway/__tests__/turn-liveness.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,209 @@
/**
* Integration tests for turn-liveness (#946) against a real Postgres (PGlite in
* CI). Exercises the durable election marker end to end: arm, discharge,
* fast-path failure, the first-writer-wins election (failTurnIfPending),
* atomic terminal-reply commit, the deadline sweep + exactly-once, and the
* globally-unique (deploymentName:messageId) marker key.
*/

import {
afterEach,
beforeAll,
beforeEach,
describe,
expect,
test,
} from "bun:test";
import { getDb } from "../../db/client.js";
import { RunsQueue } from "../infrastructure/queue/runs-queue.js";
import {
armTurnTimeout,
commitTerminalReply,
dischargeTurn,
extendTurnDeadlines,
failTurnIfPending,
failTurnsForDeployment,
sweepExpiredTurns,
type TurnRouting,
} from "../orchestration/turn-liveness.js";
import {
ensurePgliteForGatewayTests,
resetTestDatabase,
} from "./helpers/db-setup.js";

// Queue name the SQL helpers in this file filter on to find turn-timeout
// marker rows in public.runs.
const TURN_TIMEOUT_QUEUE = "internal:turn_timeout";

// Queue instance shared by the tests; re-created in beforeEach and stopped in
// afterEach, so no instance outlives the test that used it.
let queue: RunsQueue;

// One-time setup before any test in this file runs.
beforeAll(async () => {
  await ensurePgliteForGatewayTests();
});

// Every test starts from a reset database and a freshly started queue.
beforeEach(async () => {
  await resetTestDatabase();
  queue = new RunsQueue();
  await queue.start();
});

// Stop the queue that beforeEach started for the test that just finished.
afterEach(async () => {
  await queue.stop();
});

/**
 * Builds a TurnRouting fixture for the given deployment and message.
 *
 * Only `deploymentName` and `messageId` vary; the conversation id is derived
 * from the deployment name and every other field is a fixed test value.
 */
function routing(deploymentName: string, messageId: string): TurnRouting {
  const conversationId = `conv-${deploymentName}`;
  const fixture: TurnRouting = {
    messageId,
    channelId: "chan",
    conversationId,
    userId: "user-1",
    platform: "api",
    deploymentName,
  };
  return fixture;
}

/**
 * Counts turn-timeout marker rows in public.runs.
 *
 * @param deploymentName When given (and non-empty), only rows whose
 *   `action_input->>'deploymentName'` equals it are counted; otherwise every
 *   row on the turn-timeout queue is counted.
 * @returns The row count, or 0 if the query returned no row.
 */
async function markerCount(deploymentName?: string): Promise<number> {
  const db = getDb();

  // No deployment filter: count every marker on the queue.
  if (!deploymentName) {
    const allMarkers = await db<{ n: number }>`
SELECT count(*)::int AS n FROM public.runs
WHERE queue_name = ${TURN_TIMEOUT_QUEUE}`;
    return Number(allMarkers[0]?.n ?? 0);
  }

  const scopedMarkers = await db<{ n: number }>`
SELECT count(*)::int AS n FROM public.runs
WHERE queue_name = ${TURN_TIMEOUT_QUEUE}
AND action_input->>'deploymentName' = ${deploymentName}`;
  return Number(scopedMarkers[0]?.n ?? 0);
}

/**
 * Counts `thread_response` rows in public.runs whose payload has a non-null
 * `error` field.
 *
 * @returns The row count, or 0 if the query returned no row.
 */
async function errorRowCount(): Promise<number> {
  const db = getDb();
  const result = await db<{ n: number }>`
SELECT count(*)::int AS n FROM public.runs
WHERE queue_name = 'thread_response' AND action_input->>'error' IS NOT NULL`;
  const firstRow = result[0];
  return Number(firstRow?.n ?? 0);
}

/**
 * Counts all `thread_response` rows in public.runs, errors and replies alike.
 *
 * @returns The row count, or 0 if the query returned no row.
 */
async function threadResponseCount(): Promise<number> {
  const db = getDb();
  const result = await db<{ n: number }>`
SELECT count(*)::int AS n FROM public.runs WHERE queue_name = 'thread_response'`;
  const firstRow = result[0];
  return Number(firstRow?.n ?? 0);
}

/**
 * Backdates `run_at` to one minute in the past for every row on the
 * turn-timeout queue, so tests can treat all markers as past their deadline.
 */
async function expireAllMarkers(): Promise<void> {
  const db = getDb();
  await db`
UPDATE public.runs SET run_at = now() - interval '1 minute'
WHERE queue_name = ${TURN_TIMEOUT_QUEUE}`;
}

// Integration suite for the turn-liveness helpers. Each test arms one or more
// markers via armTurnTimeout and then checks the outcome through row counts
// in public.runs (markerCount / errorRowCount / threadResponseCount).
describe("turn-liveness", () => {
  // Arming creates one marker; discharging removes it without writing an error row.
  test("arm then discharge: a real reply leaves no marker and no error", async () => {
    await armTurnTimeout(queue, routing("dep-1", "m1"));
    expect(await markerCount("dep-1")).toBe(1);

    await dischargeTurn("dep-1", "m1");
    expect(await markerCount("dep-1")).toBe(0);
    expect(await errorRowCount()).toBe(0);
  });

  // Two armed turns on one deployment: the first call reports 2 and leaves two
  // error rows, the repeat call reports 0 and adds none.
  test("fast path fails all in-flight turns of a dead deployment, exactly once", async () => {
    await armTurnTimeout(queue, routing("dep-2", "a"));
    await armTurnTimeout(queue, routing("dep-2", "b"));

    expect(await failTurnsForDeployment("dep-2", "worker died")).toBe(2);
    expect(await markerCount("dep-2")).toBe(0);
    expect(await errorRowCount()).toBe(2);

    // Re-running emits nothing (markers already gone) — exactly-once.
    expect(await failTurnsForDeployment("dep-2", "worker died")).toBe(0);
    expect(await errorRowCount()).toBe(2);
  });

  // The boolean result is expected to be true only for the call that finds the marker.
  test("failTurnIfPending emits once when the turn is still owed", async () => {
    await armTurnTimeout(queue, routing("dep-3", "m"));

    expect(await failTurnIfPending("dep-3", "m", "startup failed")).toBe(true);
    expect(await errorRowCount()).toBe(1);

    // Marker is gone — a second call must not double-signal.
    expect(await failTurnIfPending("dep-3", "m", "startup failed")).toBe(false);
    expect(await errorRowCount()).toBe(1);
  });

  // Discharge first, then attempt to fail: no error row may appear.
  test("failTurnIfPending does NOT double-signal when a worker already replied", async () => {
    await armTurnTimeout(queue, routing("dep-4", "m"));
    // Worker raced a real terminal reply → marker discharged.
    await dischargeTurn("dep-4", "m");

    expect(await failTurnIfPending("dep-4", "m", "startup failed")).toBe(false);
    expect(await errorRowCount()).toBe(0);
  });

  // Builds the reply payload passed to commitTerminalReply in the tests below.
  // It has no `error` field, lists only `messageId` in processedMessageIds and
  // stamps the current time. Function declarations are hoisted, so declaring
  // it between tests is fine.
  function reply(deploymentName: string, messageId: string) {
    return {
      messageId,
      conversationId: `conv-${deploymentName}`,
      platform: "api",
      teamId: "api",
      processedMessageIds: [messageId],
      timestamp: Date.now(),
    };
  }

  // Pending marker present: the commit reports true, the marker is gone and
  // exactly one non-error thread_response row exists.
  test("commitTerminalReply atomically discharges the marker and enqueues the reply", async () => {
    await armTurnTimeout(queue, routing("dep-5", "m"));

    const emitted = await commitTerminalReply("dep-5", ["m"], reply("dep-5", "m"), null);

    expect(emitted).toBe(true);
    expect(await markerCount("dep-5")).toBe(0);
    expect(await threadResponseCount()).toBe(1);
    expect(await errorRowCount()).toBe(0); // a reply, not an error
  });

  test("commitTerminalReply drops a late reply after the sweep already terminalized the turn", async () => {
    await armTurnTimeout(queue, routing("dep-5b", "m"));
    await expireAllMarkers();
    // Deadline lapsed → sweep emits the terminal error and deletes the marker.
    expect(await sweepExpiredTurns("worker unresponsive")).toBe(1);
    expect(await errorRowCount()).toBe(1);

    // A worker reply that arrives AFTER the sweep must NOT double-signal: there
    // is no pending marker left to win, so commitTerminalReply emits nothing.
    const emitted = await commitTerminalReply("dep-5b", ["m"], reply("dep-5b", "m"), null);

    expect(emitted).toBe(false);
    expect(await threadResponseCount()).toBe(1); // still just the sweep's error
    expect(await errorRowCount()).toBe(1);
  });

  // Backdated marker: the first sweep reports 1 and writes one error row, the
  // second sweep reports 0 and writes nothing more.
  test("sweep fails lapsed turns (hung/pod-death) exactly once", async () => {
    await armTurnTimeout(queue, routing("dep-6", "m"));
    await expireAllMarkers();

    expect(await sweepExpiredTurns("worker unresponsive")).toBe(1);
    expect(await markerCount("dep-6")).toBe(0);
    expect(await errorRowCount()).toBe(1);

    expect(await sweepExpiredTurns("worker unresponsive")).toBe(0);
    expect(await errorRowCount()).toBe(1);
  });

  // extendTurnDeadlines runs after the marker was backdated; the following
  // sweep must then find nothing to fail and leave the marker in place.
  test("a live worker's heartbeat extends the deadline (sweep does not fire)", async () => {
    await armTurnTimeout(queue, routing("dep-7", "m"));
    await expireAllMarkers(); // simulate the deadline having lapsed

    await extendTurnDeadlines("dep-7"); // heartbeat pushes it forward

    expect(await sweepExpiredTurns("worker unresponsive")).toBe(0);
    expect(await markerCount("dep-7")).toBe(1);
    expect(await errorRowCount()).toBe(0);
  });

  // Same messageId armed under two deployments must yield two separate markers.
  test("marker key is globally unique: same messageId in two deployments is isolated", async () => {
    await armTurnTimeout(queue, routing("dep-A", "same"));
    await armTurnTimeout(queue, routing("dep-B", "same"));
    expect(await markerCount()).toBe(2);

    // Discharging one conversation must not touch the other's marker.
    await dischargeTurn("dep-A", "same");
    expect(await markerCount("dep-A")).toBe(0);
    expect(await markerCount("dep-B")).toBe(1);
  });
});
12 changes: 12 additions & 0 deletions packages/server/src/gateway/gateway-main.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,10 @@ import type { GatewayConfig } from "./config/index.js";
import type { RuntimeProviderCredentialResolver } from "./embedded.js";
import { type PlatformAdapter, platformRegistry } from "./platform.js";
import { UnifiedThreadResponseConsumer } from "./platform/unified-thread-consumer.js";
import {
startTurnTimeoutSweep,
stopTurnTimeoutSweep,
} from "./orchestration/turn-liveness.js";
import type { SecretStoreRegistry } from "./secrets/index.js";
import { CoreServices } from "./services/core-services.js";

Expand Down Expand Up @@ -143,6 +147,11 @@ export class Gateway {
);
await this.unifiedConsumer.start();

// 6. Start the turn-liveness deadline sweep — the cross-replica backstop
// that fails a turn whose worker hung or whose pod died (the fast path in
// EmbeddedDeploymentManager covers an observed crash instantly).
startTurnTimeoutSweep();

this.isRunning = true;
}

Expand All @@ -155,6 +164,9 @@ export class Gateway {
async stop(): Promise<void> {
logger.info("Stopping gateway...");

// Stop the turn-liveness deadline sweep (symmetric with start()).
stopTurnTimeoutSweep();

// Stop unified consumer if running
if (this.unifiedConsumer) {
logger.info("Stopping unified thread response consumer");
Expand Down
46 changes: 44 additions & 2 deletions packages/server/src/gateway/gateway/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,10 @@ import { getStoredCredential } from "../routes/internal/device-auth.js";
import type { WritableSecretStore } from "../secrets/index.js";
import { resolveEffectiveModelRef } from "../auth/settings/model-selection.js";
import type { IMessageQueue } from "../infrastructure/queue/index.js";
import {
commitTerminalReply,
extendTurnDeadlines,
} from "../orchestration/turn-liveness.js";
import type { InstructionService } from "../services/instruction-service.js";
import type { AgentSettingsStore } from "../auth/settings/agent-settings-store.js";
import {
Expand Down Expand Up @@ -406,6 +410,10 @@ export class WorkerGateway {
`[WORKER-GATEWAY] Received heartbeat ACK from ${deploymentName}`
);
}
// A worker ACK (delivery receipt or heartbeat) is a worker-driven
// liveness signal — push the turn-liveness deadline forward so a live
// but slow worker is never falsely failed by the sweep. Best-effort: the
// call is deliberately not awaited, so the ACK response is never delayed or
// failed by it. The rejection is still observed here, because an unobserved
// rejected promise surfaces as a process-level unhandled rejection.
// NOTE(review): if this module has its own logger in scope, route the
// warning through it instead of console.
void extendTurnDeadlines(deploymentName).catch((error: unknown) => {
  console.warn(
    `[WORKER-GATEWAY] Failed to extend turn deadlines for ${deploymentName}`,
    error
  );
});
Comment on lines +413 to +416
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major | ⚡ Quick win

Don't fire-and-forget extendTurnDeadlines() without handling rejection.

This is a DB write on the heartbeat path. If it rejects, the handler still returns 200 while the promise goes unobserved and can surface as an unhandled rejection. Add a .catch() here and log the failure.

🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@packages/server/src/gateway/gateway/index.ts` around lines 413 - 416, The
fire-and-forget call to extendTurnDeadlines(deploymentName) can reject and cause
an unhandled promise rejection; change the call site to attach a .catch(...)
handler that logs the error (including deploymentName and context) via the
existing logger so failures are observed; ensure you do not block the handler
(keep it best-effort) but log the rejection and any relevant metadata to aid
debugging.

return c.json({ success: true });
}

Expand All @@ -419,8 +427,42 @@ export class WorkerGateway {
);
}

// Send response to thread_response queue
await this.queue.send("thread_response", enrichedResponse);
// Send response to thread_response queue. TERMINAL rows (success
// completion via processedMessageIds, or error) are subject to the API
// owner-gate in routeToRenderer — a non-owning replica re-queues them —
// so they need the elevated retry budget to survive cross-pod hand-off.
// Non-terminal deltas/status keep default options (not owner-gated).
const isTerminalResponse = !!(
enrichedResponse.error ||
(Array.isArray(enrichedResponse.processedMessageIds) &&
enrichedResponse.processedMessageIds.length > 0)
);

if (isTerminalResponse) {
// The worker produced a real terminal reply (success or explicit
// error). Persist the reply AND discharge the turn-liveness marker(s)
// for the message(s) it processed in ONE transaction — so a pod crash
// can't leave a surviving marker that the sweep would later turn into a
// duplicate "worker stopped" error. The terminal row carries the
// elevated retry budget (applied inside commitTerminalReply) so it
// survives the owner-gate re-queue to the SSE-holding pod.
const dischargeIds = new Set<string>();
if (typeof enrichedResponse.messageId === "string") {
dischargeIds.add(enrichedResponse.messageId);
}
for (const id of enrichedResponse.processedMessageIds ?? []) {
if (typeof id === "string") dischargeIds.add(id);
}
await commitTerminalReply(
deploymentName,
[...dischargeIds],
enrichedResponse,
(enrichedResponse.organizationId as string | undefined) ?? null
);
} else {
// Non-terminal (delta / status): best-effort, not owner-gated.
await this.queue.send("thread_response", enrichedResponse);
}

return c.json({ success: true });
} catch (error) {
Expand Down
2 changes: 2 additions & 0 deletions packages/server/src/gateway/infrastructure/queue/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,10 @@

export { QueueProducer } from "./queue-producer.js";
export { RunsQueue } from "./runs-queue.js";
export { TERMINAL_DELIVERY_SEND_OPTS } from "./types.js";
export type {
IMessageQueue,
QueueJob,
QueueOptions,
ThreadResponsePayload,
} from "./types.js";
20 changes: 20 additions & 0 deletions packages/server/src/gateway/infrastructure/queue/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,26 @@ export interface QueueOptions {
actionKey?: string;
}

/**
 * Send options for TERMINAL `thread_response` rows (success completion or
 * error) that are subject to the API owner-gate in `routeToRenderer`. When a
 * non-owning replica claims such a row it throws to re-queue; the owning pod
 * (which holds the client's SSE) must win a SKIP-LOCKED claim before delivery.
 *
 * A short FIXED retry delay (not the default exponential backoff, which would
 * span hours) plus a raised retry limit gives a ~30s re-claim window. That
 * covers both the cross-pod hand-off and the client's POST→connect gap at the
 * small replica counts we run. After the budget is exhausted the row is
 * dropped (the client is genuinely gone).
 *
 * Non-terminal rows (deltas/status) are NOT owner-gated, so they don't need
 * this and keep the default send options.
 */
export const TERMINAL_DELIVERY_SEND_OPTS: QueueOptions = {
  // Raised retry budget: number of re-claim attempts before the row is dropped.
  retryLimit: 30,
  // Fixed delay between attempts. Presumably seconds (30 × 1 matches the
  // "~30s" window described above) — confirm the unit against QueueOptions.
  retryDelay: 1,
};

export interface QueueStats {
waiting: number;
active: number;
Expand Down
Loading
Loading