diff --git a/packages/server/src/__tests__/guardrails-runtime.test.ts b/packages/server/src/__tests__/guardrails-runtime.test.ts index e71dc6061..cdaf905d6 100644 --- a/packages/server/src/__tests__/guardrails-runtime.test.ts +++ b/packages/server/src/__tests__/guardrails-runtime.test.ts @@ -498,7 +498,10 @@ describe('MessageConsumer — wired input guardrail', () => { }); // The trip path must NOT enqueue to the worker thread queue and MUST - // enqueue a single thread_response with the rejection content. + // enqueue a single thread_response with the rejection. The notice rides the + // `error` field (not `content`): routeToRenderer has no plain-`content` + // branch, so a `content`-only payload is dropped — `error` renders + // end-to-end (SSE error event + CLI exit 1; platforms post `Error: …`). const threadResponses = sentToQueue.filter( (q) => q.queue === 'thread_response' ); @@ -507,8 +510,8 @@ describe('MessageConsumer — wired input guardrail', () => { ); expect(workerEnqueues.length).toBe(0); expect(threadResponses.length).toBe(1); - expect(threadResponses[0]!.data.content).toMatch(/Message rejected:/); - expect(threadResponses[0]!.data.content).toMatch(/SECRET/); + expect(threadResponses[0]!.data.error).toMatch(/Message rejected:/); + expect(threadResponses[0]!.data.error).toMatch(/SECRET/); await flushPendingGuardrailAudits(); const rows = await fetchGuardrailEvents(orgId, 'input'); diff --git a/packages/server/src/gateway/__tests__/turn-liveness.test.ts b/packages/server/src/gateway/__tests__/turn-liveness.test.ts new file mode 100644 index 000000000..21e05d444 --- /dev/null +++ b/packages/server/src/gateway/__tests__/turn-liveness.test.ts @@ -0,0 +1,209 @@ +/** + * Integration tests for turn-liveness (#946) against a real Postgres (PGlite in + * CI). 
Exercises the durable election marker end to end: arm, discharge, + * fast-path failure, the first-writer-wins election (failTurnIfPending), + * atomic terminal-reply commit, the deadline sweep + exactly-once, and the + * globally-unique (deploymentName:messageId) marker key. + */ + +import { + afterEach, + beforeAll, + beforeEach, + describe, + expect, + test, +} from "bun:test"; +import { getDb } from "../../db/client.js"; +import { RunsQueue } from "../infrastructure/queue/runs-queue.js"; +import { + armTurnTimeout, + commitTerminalReply, + dischargeTurn, + extendTurnDeadlines, + failTurnIfPending, + failTurnsForDeployment, + sweepExpiredTurns, + type TurnRouting, +} from "../orchestration/turn-liveness.js"; +import { + ensurePgliteForGatewayTests, + resetTestDatabase, +} from "./helpers/db-setup.js"; + +const TURN_TIMEOUT_QUEUE = "internal:turn_timeout"; + +let queue: RunsQueue; + +beforeAll(async () => { + await ensurePgliteForGatewayTests(); +}); + +beforeEach(async () => { + await resetTestDatabase(); + queue = new RunsQueue(); + await queue.start(); +}); + +afterEach(async () => { + await queue.stop(); +}); + +function routing(deploymentName: string, messageId: string): TurnRouting { + return { + messageId, + channelId: "chan", + conversationId: `conv-${deploymentName}`, + userId: "user-1", + platform: "api", + deploymentName, + }; +} + +async function markerCount(deploymentName?: string): Promise<number> { + const sql = getDb(); + const rows = deploymentName + ? await sql<{ n: number }>` + SELECT count(*)::int AS n FROM public.runs + WHERE queue_name = ${TURN_TIMEOUT_QUEUE} + AND action_input->>'deploymentName' = ${deploymentName}` + : await sql<{ n: number }>` + SELECT count(*)::int AS n FROM public.runs + WHERE queue_name = ${TURN_TIMEOUT_QUEUE}`; + return Number(rows[0]?.n ??
0); +} + +async function errorRowCount(): Promise<number> { + const rows = await getDb()<{ n: number }>` + SELECT count(*)::int AS n FROM public.runs + WHERE queue_name = 'thread_response' AND action_input->>'error' IS NOT NULL`; + return Number(rows[0]?.n ?? 0); +} + +async function threadResponseCount(): Promise<number> { + const rows = await getDb()<{ n: number }>` + SELECT count(*)::int AS n FROM public.runs WHERE queue_name = 'thread_response'`; + return Number(rows[0]?.n ?? 0); +} + +async function expireAllMarkers(): Promise<void> { + await getDb()` + UPDATE public.runs SET run_at = now() - interval '1 minute' + WHERE queue_name = ${TURN_TIMEOUT_QUEUE}`; +} + +describe("turn-liveness", () => { + test("arm then discharge: a real reply leaves no marker and no error", async () => { + await armTurnTimeout(queue, routing("dep-1", "m1")); + expect(await markerCount("dep-1")).toBe(1); + + await dischargeTurn("dep-1", "m1"); + expect(await markerCount("dep-1")).toBe(0); + expect(await errorRowCount()).toBe(0); + }); + + test("fast path fails all in-flight turns of a dead deployment, exactly once", async () => { + await armTurnTimeout(queue, routing("dep-2", "a")); + await armTurnTimeout(queue, routing("dep-2", "b")); + + expect(await failTurnsForDeployment("dep-2", "worker died")).toBe(2); + expect(await markerCount("dep-2")).toBe(0); + expect(await errorRowCount()).toBe(2); + + // Re-running emits nothing (markers already gone) — exactly-once. + expect(await failTurnsForDeployment("dep-2", "worker died")).toBe(0); + expect(await errorRowCount()).toBe(2); + }); + + test("failTurnIfPending emits once when the turn is still owed", async () => { + await armTurnTimeout(queue, routing("dep-3", "m")); + + expect(await failTurnIfPending("dep-3", "m", "startup failed")).toBe(true); + expect(await errorRowCount()).toBe(1); + + // Marker is gone — a second call must not double-signal.
+ expect(await failTurnIfPending("dep-3", "m", "startup failed")).toBe(false); + expect(await errorRowCount()).toBe(1); + }); + + test("failTurnIfPending does NOT double-signal when a worker already replied", async () => { + await armTurnTimeout(queue, routing("dep-4", "m")); + // Worker raced a real terminal reply → marker discharged. + await dischargeTurn("dep-4", "m"); + + expect(await failTurnIfPending("dep-4", "m", "startup failed")).toBe(false); + expect(await errorRowCount()).toBe(0); + }); + + function reply(deploymentName: string, messageId: string) { + return { + messageId, + conversationId: `conv-${deploymentName}`, + platform: "api", + teamId: "api", + processedMessageIds: [messageId], + timestamp: Date.now(), + }; + } + + test("commitTerminalReply atomically discharges the marker and enqueues the reply", async () => { + await armTurnTimeout(queue, routing("dep-5", "m")); + + const emitted = await commitTerminalReply("dep-5", ["m"], reply("dep-5", "m"), null); + + expect(emitted).toBe(true); + expect(await markerCount("dep-5")).toBe(0); + expect(await threadResponseCount()).toBe(1); + expect(await errorRowCount()).toBe(0); // a reply, not an error + }); + + test("commitTerminalReply drops a late reply after the sweep already terminalized the turn", async () => { + await armTurnTimeout(queue, routing("dep-5b", "m")); + await expireAllMarkers(); + // Deadline lapsed → sweep emits the terminal error and deletes the marker. + expect(await sweepExpiredTurns("worker unresponsive")).toBe(1); + expect(await errorRowCount()).toBe(1); + + // A worker reply that arrives AFTER the sweep must NOT double-signal: there + // is no pending marker left to win, so commitTerminalReply emits nothing. 
+ const emitted = await commitTerminalReply("dep-5b", ["m"], reply("dep-5b", "m"), null); + + expect(emitted).toBe(false); + expect(await threadResponseCount()).toBe(1); // still just the sweep's error + expect(await errorRowCount()).toBe(1); + }); + + test("sweep fails lapsed turns (hung/pod-death) exactly once", async () => { + await armTurnTimeout(queue, routing("dep-6", "m")); + await expireAllMarkers(); + + expect(await sweepExpiredTurns("worker unresponsive")).toBe(1); + expect(await markerCount("dep-6")).toBe(0); + expect(await errorRowCount()).toBe(1); + + expect(await sweepExpiredTurns("worker unresponsive")).toBe(0); + expect(await errorRowCount()).toBe(1); + }); + + test("a live worker's heartbeat extends the deadline (sweep does not fire)", async () => { + await armTurnTimeout(queue, routing("dep-7", "m")); + await expireAllMarkers(); // simulate the deadline having lapsed + + await extendTurnDeadlines("dep-7"); // heartbeat pushes it forward + + expect(await sweepExpiredTurns("worker unresponsive")).toBe(0); + expect(await markerCount("dep-7")).toBe(1); + expect(await errorRowCount()).toBe(0); + }); + + test("marker key is globally unique: same messageId in two deployments is isolated", async () => { + await armTurnTimeout(queue, routing("dep-A", "same")); + await armTurnTimeout(queue, routing("dep-B", "same")); + expect(await markerCount()).toBe(2); + + // Discharging one conversation must not touch the other's marker. 
+ await dischargeTurn("dep-A", "same"); + expect(await markerCount("dep-A")).toBe(0); + expect(await markerCount("dep-B")).toBe(1); + }); +}); diff --git a/packages/server/src/gateway/gateway-main.ts b/packages/server/src/gateway/gateway-main.ts index 411bacfd6..cd0b09c6c 100644 --- a/packages/server/src/gateway/gateway-main.ts +++ b/packages/server/src/gateway/gateway-main.ts @@ -11,6 +11,10 @@ import type { GatewayConfig } from "./config/index.js"; import type { RuntimeProviderCredentialResolver } from "./embedded.js"; import { type PlatformAdapter, platformRegistry } from "./platform.js"; import { UnifiedThreadResponseConsumer } from "./platform/unified-thread-consumer.js"; +import { + startTurnTimeoutSweep, + stopTurnTimeoutSweep, +} from "./orchestration/turn-liveness.js"; import type { SecretStoreRegistry } from "./secrets/index.js"; import { CoreServices } from "./services/core-services.js"; @@ -143,6 +147,11 @@ export class Gateway { ); await this.unifiedConsumer.start(); + // 6. Start the turn-liveness deadline sweep — the cross-replica backstop + // that fails a turn whose worker hung or whose pod died (the fast path in + // EmbeddedDeploymentManager covers an observed crash instantly). + startTurnTimeoutSweep(); + this.isRunning = true; } @@ -155,6 +164,9 @@ export class Gateway { async stop(): Promise { logger.info("Stopping gateway..."); + // Stop the turn-liveness deadline sweep (symmetric with start()). 
+ stopTurnTimeoutSweep(); + // Stop unified consumer if running if (this.unifiedConsumer) { logger.info("Stopping unified thread response consumer"); diff --git a/packages/server/src/gateway/gateway/index.ts b/packages/server/src/gateway/gateway/index.ts index a59838731..4e5c6f619 100644 --- a/packages/server/src/gateway/gateway/index.ts +++ b/packages/server/src/gateway/gateway/index.ts @@ -20,6 +20,10 @@ import { getStoredCredential } from "../routes/internal/device-auth.js"; import type { WritableSecretStore } from "../secrets/index.js"; import { resolveEffectiveModelRef } from "../auth/settings/model-selection.js"; import type { IMessageQueue } from "../infrastructure/queue/index.js"; +import { + commitTerminalReply, + extendTurnDeadlines, +} from "../orchestration/turn-liveness.js"; import type { InstructionService } from "../services/instruction-service.js"; import type { AgentSettingsStore } from "../auth/settings/agent-settings-store.js"; import { @@ -406,6 +410,10 @@ export class WorkerGateway { `[WORKER-GATEWAY] Received heartbeat ACK from ${deploymentName}` ); } + // A worker ACK (delivery receipt or heartbeat) is a worker-driven + // liveness signal — push the turn-liveness deadline forward so a live + // but slow worker is never falsely failed by the sweep. Best-effort. + void extendTurnDeadlines(deploymentName); return c.json({ success: true }); } @@ -419,8 +427,42 @@ export class WorkerGateway { ); } - // Send response to thread_response queue - await this.queue.send("thread_response", enrichedResponse); + // Send response to thread_response queue. TERMINAL rows (success + // completion via processedMessageIds, or error) are subject to the API + // owner-gate in routeToRenderer — a non-owning replica re-queues them — + // so they need the elevated retry budget to survive cross-pod hand-off. + // Non-terminal deltas/status keep default options (not owner-gated). 
+ const isTerminalResponse = !!( + enrichedResponse.error || + (Array.isArray(enrichedResponse.processedMessageIds) && + enrichedResponse.processedMessageIds.length > 0) + ); + + if (isTerminalResponse) { + // The worker produced a real terminal reply (success or explicit + // error). Persist the reply AND discharge the turn-liveness marker(s) + // for the message(s) it processed in ONE transaction — so a pod crash + // can't leave a surviving marker that the sweep would later turn into a + // duplicate "worker stopped" error. The terminal row carries the + // elevated retry budget (applied inside commitTerminalReply) so it + // survives the owner-gate re-queue to the SSE-holding pod. + const dischargeIds = new Set<string>(); + if (typeof enrichedResponse.messageId === "string") { + dischargeIds.add(enrichedResponse.messageId); + } + for (const id of enrichedResponse.processedMessageIds ?? []) { + if (typeof id === "string") dischargeIds.add(id); + } + await commitTerminalReply( + deploymentName, + [...dischargeIds], + enrichedResponse, + (enrichedResponse.organizationId as string | undefined) ?? null + ); + } else { + // Non-terminal (delta / status): best-effort, not owner-gated.
+ await this.queue.send("thread_response", enrichedResponse); + } return c.json({ success: true }); } catch (error) { diff --git a/packages/server/src/gateway/infrastructure/queue/index.ts b/packages/server/src/gateway/infrastructure/queue/index.ts index 4cdbd36ca..c29f6b56e 100644 --- a/packages/server/src/gateway/infrastructure/queue/index.ts +++ b/packages/server/src/gateway/infrastructure/queue/index.ts @@ -7,8 +7,10 @@ export { QueueProducer } from "./queue-producer.js"; export { RunsQueue } from "./runs-queue.js"; +export { TERMINAL_DELIVERY_SEND_OPTS } from "./types.js"; export type { IMessageQueue, QueueJob, + QueueOptions, ThreadResponsePayload, } from "./types.js"; diff --git a/packages/server/src/gateway/infrastructure/queue/types.ts b/packages/server/src/gateway/infrastructure/queue/types.ts index 16d934728..2a2eb045e 100644 --- a/packages/server/src/gateway/infrastructure/queue/types.ts +++ b/packages/server/src/gateway/infrastructure/queue/types.ts @@ -26,6 +26,26 @@ export interface QueueOptions { actionKey?: string; } +/** + * Send options for TERMINAL `thread_response` rows (success completion or + * error) that are subject to the API owner-gate in `routeToRenderer`. When a + * non-owning replica claims such a row it throws to re-queue; the owning pod + * (which holds the client's SSE) must win a SKIP-LOCKED claim before delivery. + * + * A short FIXED retry delay (not the default exponential backoff, which would + * span hours) plus a raised retry limit gives a ~30s re-claim window. That + * covers both the cross-pod hand-off and the client's POST→connect gap at the + * small replica counts we run. After the budget is exhausted the row is + * dropped (the client is genuinely gone). + * + * Non-terminal rows (deltas/status) are NOT owner-gated, so they don't need + * this and keep the default send options. 
+ */ +export const TERMINAL_DELIVERY_SEND_OPTS: QueueOptions = { + retryLimit: 30, + retryDelay: 1, +}; + export interface QueueStats { waiting: number; active: number; diff --git a/packages/server/src/gateway/orchestration/impl/embedded-deployment.ts b/packages/server/src/gateway/orchestration/impl/embedded-deployment.ts index 3a86a059a..09b0d91fb 100644 --- a/packages/server/src/gateway/orchestration/impl/embedded-deployment.ts +++ b/packages/server/src/gateway/orchestration/impl/embedded-deployment.ts @@ -19,6 +19,11 @@ import { buildDeploymentInfoSummary, getVeryOldThresholdDays, } from "../deployment-utils.js"; +import { failTurnsForDeployment } from "../turn-liveness.js"; + +/** Surfaced to the client when a worker dies before producing a reply. */ +const WORKER_DIED_MESSAGE = + "The worker handling your request stopped unexpectedly before it could reply. Please retry in a moment."; const logger = createLogger("orchestrator"); @@ -503,6 +508,12 @@ export function nixPackageAttrRef(pkg: string): string { export class EmbeddedDeploymentManager extends BaseDeploymentManager { private workers: Map = new Map(); + /** Deployments currently being torn down deliberately (scale-to-0, idle + * reap, delete) via {@link killWorker}. The exit handler consumes the entry + * so a deliberate stop is NOT surfaced to the user as a worker crash; any + * OTHER exit/spawn-error fails the deployment's in-flight turns. Pod-local + * and pod-exclusive (this pod owns its own worker children). */ + private intentionalExits: Set = new Set(); constructor( config: OrchestratorConfig, @@ -789,6 +800,11 @@ export class EmbeddedDeploymentManager extends BaseDeploymentManager { ); this.workers.delete(deploymentName); releaseLockOnce(); + // A spawn error is never a deliberate stop. Fail any in-flight turn(s) + // for this deployment so the client gets a terminal error instead of a + // hang. No-op if nothing is in flight (markers already discharged). 
+ this.intentionalExits.delete(deploymentName); + void failTurnsForDeployment(deploymentName, WORKER_DIED_MESSAGE); }); child.stdout?.on("data", (data: Buffer) => { @@ -809,6 +825,9 @@ export class EmbeddedDeploymentManager extends BaseDeploymentManager { // flag and is the authoritative release point — codex P1#3. this.workers.delete(deploymentName); releaseLockOnce(); + // `delete` returns true iff killWorker marked this exit as deliberate. + // Consume the flag here (the exit is the single authoritative point). + const wasIntentional = this.intentionalExits.delete(deploymentName); if (signal) { logger.info( `Embedded worker ${deploymentName} exited with signal ${signal}` @@ -820,6 +839,14 @@ export class EmbeddedDeploymentManager extends BaseDeploymentManager { } else { logger.info(`Embedded worker ${deploymentName} exited cleanly`); } + // Any exit that wasn't a deliberate teardown fails the deployment's + // in-flight turn(s) — gating on the exit code would be wrong: a clean `exit 0` that + // leaves a turn un-answered is still a failure (GPT-5.5 edge #3). The + // marker's presence is the source of truth, so this is a no-op when the + // worker had already replied (markers discharged) or was idle. + if (!wasIntentional) { + void failTurnsForDeployment(deploymentName, WORKER_DIED_MESSAGE); + } }); this.workers.set(deploymentName, { @@ -909,6 +936,11 @@ export class EmbeddedDeploymentManager extends BaseDeploymentManager { ): Promise<void> { const child = entry.process; + // Mark this as a deliberate teardown so the spawnDeployment exit handler + // does NOT surface it to the user as a worker crash. The exit handler + // consumes (deletes) the flag. + this.intentionalExits.add(deploymentName); + // Delete from the map up front so callers see an empty // listDeployments() the moment kill returns — the public contract // hasn't changed.
The lock release is deliberately NOT touched here @@ -921,7 +953,14 @@ export class EmbeddedDeploymentManager extends BaseDeploymentManager { // indicators here. `child.killed` is set the moment we *send* a signal, // so checking it would mis-treat "we just sent SIGTERM" as "already // exited" and skip the SIGKILL escalation below. - if (child.exitCode !== null || child.signalCode !== null) return; + if (child.exitCode !== null || child.signalCode !== null) { + // It exited on its own before we asked — the exit handler already ran + // (and, since the flag wasn't set then, correctly treated it as a crash + // and failed any in-flight turns). Drop the flag we just added so it + // can't suppress a future exit for a re-used deployment name. + this.intentionalExits.delete(deploymentName); + return; + } const exited = new Promise<void>((resolve) => { child.once("exit", () => resolve()); diff --git a/packages/server/src/gateway/orchestration/message-consumer.ts b/packages/server/src/gateway/orchestration/message-consumer.ts index 1c1966191..d0447e0eb 100644 --- a/packages/server/src/gateway/orchestration/message-consumer.ts +++ b/packages/server/src/gateway/orchestration/message-consumer.ts @@ -21,7 +21,11 @@ import type { IMessageQueue, QueueJob as SharedQueueJob, } from "../infrastructure/queue/index.js"; -import { RunsQueue } from "../infrastructure/queue/index.js"; +import { + RunsQueue, + TERMINAL_DELIVERY_SEND_OPTS, +} from "../infrastructure/queue/index.js"; +import { armTurnTimeout, failTurnIfPending } from "./turn-liveness.js"; import { type BaseDeploymentManager, buildCanonicalConversationKey, @@ -228,6 +232,9 @@ export class MessageConsumer { // guardrail trips. We surface the trip reason to the user via the // `thread_response` queue (same path `trackFailedDeployment` uses) // and skip both the worker queue enqueue and the deployment ensure.
+ // The trip is captured here but DELIVERED below (outside the fail-open + // try-catch) so a delivery failure can't be swallowed into dispatch. + let inputTrip: { reason: string; guardrail: string } | null = null; if ( this.guardrailRegistry && this.agentSettingsStore && @@ -285,39 +292,14 @@ export class MessageConsumer { reason: outcome.tripped.reason, metadata: outcome.tripped.metadata, }); - const reasonText = - outcome.tripped.reason ?? "blocked by policy"; - const blockMessage = `Message rejected: ${reasonText}`; - try { - const responseQueue = "thread_response"; - await this.queue.createQueue(responseQueue); - await this.queue.send(responseQueue, { - messageId: data.messageId, - userId: data.userId, - channelId: data.channelId, - conversationId: data.conversationId, - platform: data.platform, - platformMetadata: data.platformMetadata, - content: blockMessage, - processedMessageIds: [data.messageId], - }); - } catch (notifyError) { - logger.error( - { notifyError: String(notifyError) }, - "Failed to send guardrail block message to user" - ); - } - logger.info( - { - agentId: data.agentId, - guardrail: outcome.tripped.guardrail, - conversationId: effectiveConversationId, - }, - "Input guardrail tripped — message dropped" - ); - queueSpan?.setStatus({ code: SpanStatusCode.OK }); - queueSpan?.end(); - return; + // Capture the trip; the rejection is DELIVERED below, outside this + // fail-open try-catch. Delivering here would let a delivery + // failure be caught by the catch and fall through to dispatch the + // blocked input — the opposite of what a trip must do. + inputTrip = { + reason: outcome.tripped.reason ?? "blocked by policy", + guardrail: outcome.tripped.guardrail, + }; } } } catch (err) { @@ -333,6 +315,61 @@ export class MessageConsumer { } } + // Deliver a guardrail rejection OUTSIDE the fail-open try-catch above. 
A + // delivery failure here MUST propagate so the `messages` run retries (the + // trip is deterministic) — it must never be swallowed and fall through to + // dispatching the blocked input. Routed via `error` (renders end-to-end: + // SSE error event + CLI exit 1; platforms post `Error: …`). No turn marker + // is armed for a rejected turn, so the message-queue retry is the backstop. + if (inputTrip) { + const responseQueue = "thread_response"; + await this.queue.createQueue(responseQueue); + await this.queue.send( + responseQueue, + { + messageId: data.messageId, + userId: data.userId, + channelId: data.channelId, + conversationId: data.conversationId, + platform: data.platform, + platformMetadata: data.platformMetadata, + error: `Message rejected: ${inputTrip.reason}`, + processedMessageIds: [data.messageId], + }, + TERMINAL_DELIVERY_SEND_OPTS + ); + logger.info( + { + agentId: data.agentId, + guardrail: inputTrip.guardrail, + conversationId: effectiveConversationId, + }, + "Input guardrail tripped — message dropped" + ); + queueSpan?.setStatus({ code: SpanStatusCode.OK }); + queueSpan?.end(); + return; + } + + // Arm the turn-liveness marker BEFORE the message is deliverable to the + // worker. The marker is the durable record that this turn owes the client + // a terminal event; it is discharged on the worker's reply and otherwise + // failed (fast path on crash, deadline backstop on hang/pod-death) into a + // terminal `error`. Arming first closes a race where an already-running + // worker could reply before the marker exists — the discharge would + // no-op, then a stale marker would be armed and the sweep would emit a + // spurious error after a successful turn. 
+ await armTurnTimeout(this.queue, { + messageId: data.messageId, + channelId: data.channelId, + conversationId: effectiveConversationId, + userId: data.userId, + platform: data.platform, + platformMetadata: data.platformMetadata, + deploymentName, + organizationId: data.organizationId, + }); + // 1) Send to thread queue immediately (queue persists; worker will drain on attach) await Sentry.startSpan( { @@ -640,23 +677,14 @@ export class MessageConsumer { const userMessage = "Worker startup failed and your request could not be processed. Please retry in a moment."; - // Notify user that their message could not be processed - try { - const responseQueue = "thread_response"; - await this.queue.createQueue(responseQueue); - await this.queue.send(responseQueue, { - messageId: data.messageId, - userId: data.userId, - channelId: data.channelId, - conversationId: data.conversationId, - platform: data.platform, - platformMetadata: data.platformMetadata, - content: userMessage, - processedMessageIds: [data.messageId], - }); - } catch (notifyError) { - logger.error("Failed to send error notification to user:", notifyError); - } + // Emit the startup-failure notice through the first-writer-wins election + // (atomic delete-marker + enqueue-error in one tx). This is gated on the + // marker still being pending: if a still-attached worker raced a real + // terminal reply (which discharged the marker), this no-ops instead of + // double-signalling the client. Routes via `error` (renders end-to-end: + // SSE error event + CLI exit 1; platforms post `Error: …`). If the marker + // was never armed (arm failed) it also no-ops — logged at arm time. 
+ await failTurnIfPending(deploymentName, data.messageId, userMessage); } catch (trackError) { // Don't fail the main flow if tracking fails logger.error("Failed to track deployment failure:", trackError); diff --git a/packages/server/src/gateway/orchestration/turn-liveness.ts b/packages/server/src/gateway/orchestration/turn-liveness.ts new file mode 100644 index 000000000..a85394e79 --- /dev/null +++ b/packages/server/src/gateway/orchestration/turn-liveness.ts @@ -0,0 +1,491 @@ +/** + * Turn liveness — surfaces a terminal error to the client when a worker fails + * to produce a reply (crash, hang, or pod death), so the SSE/CLI never hangs + * forever and never receives a silent `complete`. + * + * ## The obligation, as a durable election record + * + * Every dispatched turn owes the client exactly one terminal event for its + * `messageId`. Between delivery-receipt (when the `thread_message` run already + * completes) and the worker's reply there is otherwise NO durable record that + * the turn is still owed an answer — that gap is what lets a dead worker hang + * the stream. We close it by writing a **passive marker row** into `public.runs` + * on a queue with NO consumer (`internal:turn_timeout`): it is never claimed as + * a job, so the RunsQueue status machinery never touches it. The marker's + * EXISTENCE is the obligation; deleting it (`DELETE … RETURNING`) is a + * first-writer-wins election — a row can be deleted exactly once, and the + * deleter emits the terminal `error` in the SAME transaction, so the emit is + * atomic and crash-safe (the marker survives a mid-emit crash and a later sweep + * retries). + * + * ## Detection (two paths, one emit) + * - Fast path (instant): the owning pod observes `child.once("exit"/"error")` + * and calls {@link failTurnsForDeployment}. Covers the common case (bad + * provider key, OOM, `exit 1`). 
+ * - Backstop (deadline): {@link sweepExpiredTurns} runs periodically on every + * replica and fails markers whose deadline has lapsed. Covers a hung worker + * (alive, never replies) and a worker-pod death (the marker outlives the pod + * and another replica sweeps it). The deadline is pushed forward by the + * worker's 20s heartbeat ({@link extendTurnDeadlines}), so a live-but-slow + * worker is never falsely failed, while a silent one lapses. + * + * ## Multi-replica + * Arming/extending/discharging all happen on the worker's owning pod (worker + * child, dispatch, and `handleWorkerResponse` are co-located there). The marker + * + emit live in shared Postgres, so any replica can sweep, and the emitted + * `thread_response{error}` is owner-gated in `routeToRenderer` to reach the pod + * that holds the client's SSE. + */ + +import { createLogger } from "@lobu/core"; +import { getDb, type DbClient } from "../../db/client.js"; +import type { IMessageQueue } from "../infrastructure/queue/index.js"; +import { TERMINAL_DELIVERY_SEND_OPTS } from "../infrastructure/queue/index.js"; + +const logger = createLogger("turn-liveness"); + +/** Queue name for the passive marker rows. Has NO registered consumer — the + * rows are never claimed as jobs; they are swept directly by this module. The + * `internal:` prefix maps to run_type `internal` (classifyQueue), keeping them + * out of the chat_message lane's stats/sweeps. */ +const TURN_TIMEOUT_QUEUE = "internal:turn_timeout"; + +/** thread_response NOTIFY channel — must match RunsQueue's `runs_lobu:` + * so the UnifiedThreadResponseConsumer wakes immediately on an emitted error. */ +const THREAD_RESPONSE_CHANNEL = "runs_lobu:thread_response"; + +/** Default turn deadline. 
Comfortably exceeds the worker's 20s heartbeat + * interval so a live worker (which extends on every heartbeat) is never + * falsely failed; a silent/dead worker lapses within this window of its last + * heartbeat (the fast path catches an observed crash instantly). */ +const DEFAULT_DEADLINE_MS = 60_000; + +/** How often each replica sweeps for lapsed markers. */ +const SWEEP_INTERVAL_MS = 15_000; + +/** Routing needed to build the terminal `thread_response{error}` for a turn, + * stored as the marker's `action_input`. */ +export interface TurnRouting { + messageId: string; + channelId?: string; + conversationId?: string; + userId?: string; + platform?: string; + platformMetadata?: Record<string, unknown>; + deploymentName: string; + organizationId?: string; +} + +/** + * Narrow a JSONB `action_input` read back from Postgres (typed `unknown` at the + * DB boundary) to {@link TurnRouting}. `armTurnTimeout` is the only writer of + * these rows, but the value is still `unknown` on the way out — validate rather + * than blind-cast so a malformed row is skipped, never used to build a + * `thread_response` with `undefined` fields. `messageId` is the load-bearing + * field (discharge key + `processedMessageIds`), so it gates the narrow. + */ +function asTurnRouting(value: unknown): TurnRouting | null { + if (typeof value !== "object" || value === null) return null; + const v = value as Record<string, unknown>; + if (typeof v.messageId !== "string" || v.messageId.length === 0) return null; + if (typeof v.deploymentName !== "string") return null; + return v as unknown as TurnRouting; +} + +/** + * Build the marker's globally-unique key. `messageId` alone is NOT global — + * platform message IDs (e.g. Telegram) are per-chat and API callers can supply + * their own — so two concurrent turns in different conversations could collide + * (one suppresses the other's marker, or a discharge hits the wrong turn). + * `deploymentName` is unique per conversation, so `deploymentName:messageId` is + * globally unique.
+ */ +function turnMarkerKey(deploymentName: string, messageId: string): string { + return `${deploymentName}:${messageId}`; +} + +/** + * Arm the turn-liveness marker at dispatch. Idempotent per (deployment, + * messageId) via the partial-unique `idempotency_key`, so a re-dispatched + * message doesn't double-arm. + * + * **Fail-closed:** throws if the marker can't be persisted. The marker is the + * ONLY durable record that this turn owes the client a terminal event — if it's + * missing, a later worker crash/hang falls back to the silent hang this module + * exists to prevent. The caller arms before enqueueing to the worker, so a + * throw aborts dispatch and the `messages` run retries the whole turn (the arm + * is idempotent), rather than dispatching an unprotected turn. + */ +export async function armTurnTimeout( + queue: IMessageQueue, + routing: TurnRouting, + deadlineMs: number = DEFAULT_DEADLINE_MS +): Promise<void> { + await queue.createQueue(TURN_TIMEOUT_QUEUE); + await queue.send(TURN_TIMEOUT_QUEUE, routing, { + delayMs: deadlineMs, + singletonKey: turnMarkerKey(routing.deploymentName, routing.messageId), + }); +} + +/** + * Discharge a turn's obligation on a real terminal reply (worker success or an + * explicit worker error). Deletes the marker so neither the fast path nor the + * sweep emits a spurious error. Idempotent. + */ +export async function dischargeTurn( + deploymentName: string, + messageId: string +): Promise<void> { + const key = turnMarkerKey(deploymentName, messageId); + try { + const sql = getDb(); + // `status = 'pending'` is required for the planner to use the PARTIAL index + // `runs_idempotency_key_uniq` (WHERE … status IN ('pending','claimed', + // 'running')) — without a status constraint it falls back to a seq scan + // over the full `runs` table. Markers are always pending (never claimed).
+ await sql` + DELETE FROM public.runs + WHERE idempotency_key = ${key} + AND status = 'pending' + AND queue_name = ${TURN_TIMEOUT_QUEUE} + `; + } catch (err) { + logger.warn( + { key, err: String(err) }, + "Failed to discharge turn-timeout marker" + ); + } +} + +/** + * Push the deadline forward for all in-flight turns of a deployment. Called on + * the worker's heartbeat ACK — a worker-driven liveness signal, so a live but + * slow worker keeps its markers fresh while a silent one lapses. + */ +export async function extendTurnDeadlines( + deploymentName: string, + deadlineMs: number = DEFAULT_DEADLINE_MS +): Promise { + try { + const sql = getDb(); + const deadlineSec = Math.ceil(deadlineMs / 1000); + // status + run_type match the partial predicate of `runs_lobu_claim_idx` + // (WHERE status='pending' AND run_type IN (…)) and its leading column, so + // this uses the index (run_type, queue_name, …) rather than scanning runs. + await sql` + UPDATE public.runs + SET run_at = now() + (${deadlineSec}::int * interval '1 second') + WHERE status = 'pending' + AND run_type = 'internal' + AND queue_name = ${TURN_TIMEOUT_QUEUE} + AND action_input->>'deploymentName' = ${deploymentName} + `; + } catch (err) { + logger.warn( + { deploymentName, err: String(err) }, + "Failed to extend turn-timeout deadline" + ); + } +} + +/** + * Fast path: fail every in-flight turn of a deployment whose worker has just + * died unexpectedly. Atomic per the `DELETE … RETURNING` election — only this + * caller gets the rows, and the terminal error is enqueued in the same + * transaction. + * + * @returns the number of turns failed (0 if the worker already replied / was a + * deliberate stop with nothing in flight). 
/**
 * Fast path for an observed worker death: delete every pending turn-timeout
 * marker of `deploymentName` and enqueue one terminal error per deleted
 * marker, all inside a single transaction, then wake `thread_response`
 * consumers.
 *
 * Never throws: any failure is logged at error and reported as 0.
 *
 * @param deploymentName - deployment whose in-flight turns are failed.
 * @param reason - error text placed on each terminal `thread_response`.
 * @returns the number of terminal errors emitted (0 when no marker was
 *   pending, or on failure).
 */
export async function failTurnsForDeployment(
  deploymentName: string,
  reason: string
): Promise<number> {
  try {
    const sql = getDb();
    const failed = await sql.begin(async (tx: DbClient) => {
      const rows = await tx<{ action_input: unknown }>`
        DELETE FROM public.runs
        WHERE status = 'pending'
          AND run_type = 'internal'
          AND queue_name = ${TURN_TIMEOUT_QUEUE}
          AND action_input->>'deploymentName' = ${deploymentName}
        RETURNING action_input
      `;
      return emitTerminalErrors(
        tx,
        rows,
        reason,
        "Dropping unroutable turn-timeout marker (fast path)"
      );
    });
    if (failed > 0) {
      await notifyThreadResponse();
      logger.info(
        { deploymentName, failed },
        "Worker died unexpectedly — emitted terminal error for in-flight turn(s)"
      );
    }
    return failed;
  } catch (err) {
    logger.error(
      { deploymentName, err: String(err) },
      "Failed to fail turns for dead deployment"
    );
    return 0;
  }
}

/**
 * Emit one terminal error per deleted marker row, in the caller's transaction.
 * Shared by the fast path and the deadline sweep.
 *
 * A row that fails `asTurnRouting` lacks the fields needed to route a terminal
 * event, so it is undeliverable: it stays deleted (leaving it would re-loop
 * the sweep forever) and is logged at error so a real schema drift is noticed.
 * This is unreachable for markers armed by this module, which always supply
 * `messageId` and `deploymentName`.
 *
 * @param unroutableLogMessage - message logged for each skipped row.
 * @returns the number of terminal errors enqueued.
 */
async function emitTerminalErrors(
  tx: DbClient,
  rows: Iterable<{ action_input: unknown }>,
  reason: string,
  unroutableLogMessage: string
): Promise<number> {
  let emitted = 0;
  for (const row of rows) {
    const routing = asTurnRouting(row.action_input);
    if (!routing) {
      logger.error(unroutableLogMessage);
      continue;
    }
    await enqueueTerminalError(tx, routing, reason);
    emitted += 1;
  }
  return emitted;
}

/**
 * Deadline backstop: fail markers whose deadline has lapsed. Runs on every
 * replica; `FOR UPDATE SKIP LOCKED` + `DELETE … RETURNING` make it exactly-once
 * across replicas. Covers a hung worker and a worker-pod death (the marker
 * outlives the pod that armed it).
 *
 * At most 200 lapsed markers are claimed per call. Never throws: a failure is
 * logged at warn and reported as 0.
 *
 * @param reason - error text placed on each terminal `thread_response`.
 * @returns the number of terminal errors emitted.
 */
export async function sweepExpiredTurns(
  reason = "The worker handling your request stopped responding before it could reply. Please retry in a moment."
): Promise<number> {
  try {
    const sql = getDb();
    const failed = await sql.begin(async (tx: DbClient) => {
      // Literal queue_name (no bound params) on the claim to avoid a PGlite
      // quirk where parameterized RETURNING statements intermittently report a
      // parameter-count mismatch (see runs-queue stale sweep). The emit insert
      // DOES use params but has no RETURNING, so it's unaffected.
      // NOTE(review): the literal is presumably the value of
      // TURN_TIMEOUT_QUEUE (defined outside this view) — keep them in sync.
      const rows = await tx.unsafe<{ action_input: unknown }>(
        // status + run_type match the partial predicate and leading column of
        // `runs_lobu_claim_idx`, so the inner SELECT is an index range scan
        // (run_type, queue_name, …, run_at) — not a full scan of `runs` (which
        // retains 30 days of completed rows).
        `DELETE FROM public.runs
         WHERE id IN (
           SELECT id FROM public.runs
           WHERE status = 'pending'
             AND run_type = 'internal'
             AND queue_name = 'internal:turn_timeout'
             AND run_at < now()
           FOR UPDATE SKIP LOCKED
           LIMIT 200
         )
         RETURNING action_input`
      );
      return emitTerminalErrors(
        tx,
        rows,
        reason,
        "Dropping unroutable turn-timeout marker (sweep)"
      );
    });
    if (failed > 0) {
      await notifyThreadResponse();
      logger.warn(
        { failed },
        "Turn-liveness sweep failed lapsed turn(s) (hung worker or pod death)"
      );
    }
    return failed;
  } catch (err) {
    logger.warn({ err: String(err) }, "Turn-liveness sweep failed");
    return 0;
  }
}

/**
 * Insert one terminal `thread_response` row in the caller's transaction.
 */
/**
 * Insert one pending `thread_response` row (run_type `chat_message`) using the
 * caller's transaction handle.
 * Mirrors RunsQueue.send's row shape for `thread_response`, with the elevated
 * retry budget terminal rows need to survive the owner-gate re-queue (see
 * TERMINAL_DELIVERY_SEND_OPTS). The caller does the `pg_notify` after the
 * transaction commits.
 *
 * @param tx - transaction the insert runs in.
 * @param payload - stored as the row's JSON `action_input`.
 * @param organizationId - stored in `organization_id`; may be `null`.
 */
async function insertThreadResponseRow(
  tx: DbClient,
  payload: unknown,
  organizationId: string | null
): Promise<void> {
  await tx.unsafe(
    `INSERT INTO public.runs (
       run_type, queue_name, action_input, status, run_at,
       max_attempts, attempts, priority, retry_delay_seconds, organization_id
     ) VALUES (
       'chat_message', 'thread_response', $1, 'pending', now(),
       $2, 0, 0, $3, $4
     )`,
    [
      tx.json(payload),
      // Fallbacks apply only when the shared options leave a value unset.
      TERMINAL_DELIVERY_SEND_OPTS.retryLimit ?? 30,
      TERMINAL_DELIVERY_SEND_OPTS.retryDelay ?? 1,
      organizationId,
    ]
  );
}

/**
 * Build the terminal `thread_response{error}` payload for a turn. `platform`
 * always carries an explicit value (defaults to "api") — gateway routing and
 * platform isolation require it; never emit `platform: undefined`. `teamId`
 * is set to the same value, and the turn's own `messageId` is reported as the
 * only processed message.
 */
function buildTerminalErrorPayload(routing: TurnRouting, reason: string) {
  const platform = routing.platform ?? "api";
  return {
    messageId: routing.messageId,
    channelId: routing.channelId,
    conversationId: routing.conversationId,
    userId: routing.userId,
    teamId: platform,
    platform,
    platformMetadata: routing.platformMetadata,
    error: reason,
    processedMessageIds: [routing.messageId],
    timestamp: Date.now(),
  };
}

/** Insert a terminal `thread_response{error}` for a turn, in the caller's tx. */
async function enqueueTerminalError(
  tx: DbClient,
  routing: TurnRouting,
  reason: string
): Promise<void> {
  await insertThreadResponseRow(
    tx,
    buildTerminalErrorPayload(routing, reason),
    routing.organizationId ?? null
  );
}

/**
 * Election-gated terminal error for a SINGLE turn, used by pre-spawn deployment
 * failures (`trackFailedDeployment`).
 */
/**
 * Fail one specific turn, but only if its marker is still pending.
 * Atomically deletes the marker for (deploymentName, messageId) and — only if
 * it won the delete (the turn wasn't already answered by a worker that raced)
 * — emits the terminal error in the same transaction.
 *
 * This is the first-writer-wins guarantee for the startup-failure path: if a
 * still-attached worker already produced a terminal reply (which discharged the
 * marker), this no-ops instead of double-signalling the client.
 *
 * Never throws: a failure is logged at error and reported as `false`.
 *
 * @param reason - error text placed on the terminal `thread_response`.
 * @returns `true` if a terminal error was emitted, `false` if no pending
 *   marker was found, the marker was unroutable, or the operation failed.
 */
export async function failTurnIfPending(
  deploymentName: string,
  messageId: string,
  reason: string
): Promise<boolean> {
  const key = turnMarkerKey(deploymentName, messageId);
  try {
    const sql = getDb();
    const emitted = await sql.begin(async (tx: DbClient) => {
      const rows = await tx<{ action_input: unknown }>`
        DELETE FROM public.runs
        WHERE idempotency_key = ${key}
          AND status = 'pending'
          AND queue_name = ${TURN_TIMEOUT_QUEUE}
        RETURNING action_input
      `;
      const row = rows[0];
      // No row: the marker was already discharged or terminalized elsewhere.
      if (!row) return false;

      const routing = asTurnRouting(row.action_input);
      if (!routing) {
        // The marker is deleted but cannot be routed. Log it like the fast
        // path and the sweep do, so a malformed row is never dropped silently.
        logger.error(
          { key },
          "Dropping unroutable turn-timeout marker (startup-failure path)"
        );
        return false;
      }
      await enqueueTerminalError(tx, routing, reason);
      return true;
    });
    if (emitted) await notifyThreadResponse();
    return emitted;
  } catch (err) {
    logger.error(
      { key, err: String(err) },
      "Failed to fail pending turn (startup-failure path)"
    );
    return false;
  }
}

/**
 * Atomically commit a worker's TERMINAL reply (success completion or explicit
 * error) and discharge its marker(s) in ONE transaction. Two guarantees:
 *
 * - **Atomic** — reply insert + marker delete commit together, so a crash
 *   can't leave a surviving marker that the sweep would turn into a duplicate.
 * - **First-writer-wins** — the reply is inserted ONLY if this transaction
 *   actually deleted a pending marker. If the sweep or fast path already
 *   terminalized the turn (deleted the marker + emitted an error), a late
 *   worker reply deletes 0 markers and is dropped instead of double-signalling.
 */
/**
 * Commit a worker's terminal reply together with the deletion of the
 * marker(s) it answers, in one transaction. The reply row is inserted only if
 * at least one pending marker was deleted; otherwise the turn was already
 * terminalized and the late reply is dropped.
 *
 * Unlike the failure paths in this module, errors are NOT caught here — a
 * database failure propagates to the caller.
 *
 * @param deploymentName - deployment that produced the reply.
 * @param messageIds - turns the reply answers; an empty list emits nothing.
 * @param replyPayload - stored as the `thread_response` payload.
 * @param organizationId - stored on the inserted row; may be `null`.
 * @returns whether the reply was emitted (false = turn already terminalized).
 */
export async function commitTerminalReply(
  deploymentName: string,
  messageIds: string[],
  replyPayload: unknown,
  organizationId: string | null
): Promise<boolean> {
  // Nothing to discharge means nothing can win the election — skip the
  // transaction entirely. The result is the same `false` as before.
  if (messageIds.length === 0) return false;

  const sql = getDb();
  const emitted = await sql.begin(async (tx: DbClient) => {
    let deleted = 0;
    for (const messageId of messageIds) {
      const rows = await tx<{ id: string }>`
        DELETE FROM public.runs
        WHERE idempotency_key = ${turnMarkerKey(deploymentName, messageId)}
          AND status = 'pending'
          AND queue_name = ${TURN_TIMEOUT_QUEUE}
        RETURNING id
      `;
      deleted += rows.length;
    }
    if (deleted === 0) return false; // already terminalized — drop the late reply
    await insertThreadResponseRow(tx, replyPayload, organizationId);
    return true;
  });
  if (emitted) await notifyThreadResponse();
  return emitted;
}

/** Wake thread_response consumers immediately after committing an emit. */
async function notifyThreadResponse(): Promise<void> {
  try {
    const sql = getDb();
    await sql`SELECT pg_notify(${THREAD_RESPONSE_CHANNEL}, 'thread_response')`;
  } catch {
    // Non-fatal: consumers poll on their own interval and will pick it up.
  }
}

/** Handle of the running sweep interval, or `null` when the sweep is stopped. */
let sweepTimer: ReturnType<typeof setInterval> | null = null;
/** True while a sweep is executing, so overlapping ticks are skipped. */
let sweepInFlight = false;

/**
 * Start the periodic deadline backstop sweep. Idempotent: a second call while
 * the timer exists does nothing. Runs one sweep immediately, then one every
 * `SWEEP_INTERVAL_MS`.
 */
export function startTurnTimeoutSweep(): void {
  if (sweepTimer) return;
  const tick = async (): Promise<void> => {
    if (sweepInFlight) return;
    sweepInFlight = true;
    try {
      await sweepExpiredTurns();
    } finally {
      sweepInFlight = false;
    }
  };
  void tick();
  // Wrap the async tick so the timer callback returns void instead of handing
  // setInterval a promise nobody observes.
  sweepTimer = setInterval(() => {
    void tick();
  }, SWEEP_INTERVAL_MS);
  // Where the timer handle supports it, don't let the sweep keep the process
  // alive on its own.
  sweepTimer.unref?.();
}

/** Stop the periodic sweep (graceful shutdown / tests). */
*/ +export function stopTurnTimeoutSweep(): void { + if (sweepTimer) { + clearInterval(sweepTimer); + sweepTimer = null; + } +} diff --git a/packages/server/src/gateway/platform/unified-thread-consumer.ts b/packages/server/src/gateway/platform/unified-thread-consumer.ts index 06d36dd83..f533dec21 100644 --- a/packages/server/src/gateway/platform/unified-thread-consumer.ts +++ b/packages/server/src/gateway/platform/unified-thread-consumer.ts @@ -164,6 +164,54 @@ export class UnifiedThreadResponseConsumer { ? (data.platformMetadata.sessionId as string) : null; + // Owner-routing for API/SSE TERMINAL delivery (success completion OR + // error). The SseManager is per-pod and in-memory, so a terminal event + // only reaches the client if THIS pod holds the SSE connection. Under N>1 + // replicas the client's SSE (pod S, pinned by ClientIP affinity) and the + // pod that produced this row can differ — so a row claimed by a non-owning + // pod must re-queue until pod S claims it. This mirrors the platform + // canHandle re-queue (handleThreadResponse) and also fixes the pre-existing + // cross-pod success-completion drop on the API path. + // + // Scoped to TERMINAL API rows only: platform replies have their own + // connectionId owner-routing, and deltas stay best-effort (un-gated) to + // avoid per-delta re-queue churn. The re-queue relies on the owning pod + // eventually winning the SKIP-LOCKED claim; terminal sends use a short + // fixed retryDelay + raised retryLimit (see TERMINAL_DELIVERY_SEND_OPTS) so + // the window covers both cross-pod hand-off and the client's POST→connect + // gap. Sufficient at the small replica counts we run; a durable + // SSE-session→pod registry with targeted delivery is the future hardening + // for very large N. + // Detect API rows by platform OR teamId: the worker's HTTP response carries + // `teamId: "api"` but omits `platform`, while gateway-generated rows (e.g. + // turn-liveness errors) set `platform: "api"`. 
Matching only `platform` + // would leave NORMAL worker success/error rows un-gated → cross-pod drop. + // Exclude Chat SDK rows: routeToRenderer is also reached via the + // chatResponseBridge path, which already owner-routes by `connectionId` + // (canHandle re-queues to the managing instance). Gating those on SSE + // ownership too could re-queue a reply that was ready to deliver. The SSE + // owner-gate is only for pure API/SSE rows, which never carry a connectionId. + const isApiRow = + (data.platform || data.teamId) === "api" && + !data.platformMetadata?.connectionId; + if (isApiRow) { + const isTerminal = !!( + data.error || + (data.processedMessageIds && data.processedMessageIds.length) + ); + const sseKey = + (data.platformMetadata?.sessionId as string) || data.conversationId; + if ( + isTerminal && + sseKey && + !this.sseManager.hasActiveConnection(sseKey) + ) { + throw new Error( + `API SSE session ${sseKey} not owned by this gateway instance; re-queueing for owner delivery` + ); + } + } + if (data.customEvent) { const eventPayload = { ...data.customEvent.data,