diff --git a/packages/server/src/gateway/__tests__/embedded-deployment.test.ts b/packages/server/src/gateway/__tests__/embedded-deployment.test.ts index 85256e0b1..e0e72152b 100644 --- a/packages/server/src/gateway/__tests__/embedded-deployment.test.ts +++ b/packages/server/src/gateway/__tests__/embedded-deployment.test.ts @@ -268,6 +268,104 @@ describe("EmbeddedDeploymentManager", () => { const list = await manager.listDeployments(); expect(list).toHaveLength(0); }); + + // ===================================================================== + // Snapshot-mode cross-pod gate + // ===================================================================== + // A snapshot-writing turn (one that carries a `runId`) hydrates from and + // writes back to a SHARED Postgres snapshot. The cross-pod advisory lock + // — keyed on (org, agent, conversationId) — is the only thing stopping two + // replicas from both hydrating the same `completed` snapshot and writing + // divergent next snapshots (one reply silently wins). If org or + // conversationId is missing, the lock CANNOT be keyed, so the old code + // silently skipped it and spawned an UNGUARDED worker. The manager must + // now REFUSE to spawn instead (re-queueable failure), never run unguarded. + describe("snapshot-mode cross-pod gate", () => { + const snapshotBefore = process.env.LOBU_SESSION_STORE; + afterEach(() => { + if (snapshotBefore === undefined) delete process.env.LOBU_SESSION_STORE; + else process.env.LOBU_SESSION_STORE = snapshotBefore; + }); + + test("refuses to spawn a snapshot-writing turn when organizationId is missing", async () => { + delete process.env.LOBU_SESSION_STORE; // snapshot mode (default) + const msg = createTestMessagePayload({ + runId: 42, // snapshot-writing turn + organizationId: undefined, // org missing → lock cannot be keyed + conversationId: "conv-1", + }); + await expect( + manager.ensureDeployment("worker-1", "user-1", "user-1", msg) + ).rejects.toThrow(OrchestratorError); + // No child spawned — the worker never ran unguarded. + expect(mockChildProcesses).toHaveLength(0); + expect(await manager.listDeployments()).toHaveLength(0); + }); + + test("refuses to spawn a snapshot-writing turn when conversationId is missing", async () => { + delete process.env.LOBU_SESSION_STORE; + const msg = createTestMessagePayload({ + runId: 42, + organizationId: "org-1", + conversationId: undefined as unknown as string, + }); + await expect( + manager.ensureDeployment("worker-1", "user-1", "user-1", msg) + ).rejects.toThrow(OrchestratorError); + expect(mockChildProcesses).toHaveLength(0); + }); + + test("legacy direct-enqueue turn (no runId) still spawns even with no org — never writes a shared snapshot", async () => { + delete process.env.LOBU_SESSION_STORE; // snapshot mode on + const msg = createTestMessagePayload({ + runId: undefined, // no shared snapshot write → no divergence risk + organizationId: undefined, + conversationId: "conv-1", + }); + await manager.ensureDeployment("worker-1", "user-1", "user-1", msg); + expect(mockChildProcesses).toHaveLength(1); + }); + + test("file-mode (snapshot opted out) turn still spawns even with no org", async () => { + process.env.LOBU_SESSION_STORE = "file"; // legacy RWO-PVC single-writer + const msg = createTestMessagePayload({ + runId: 42, + organizationId: undefined, + conversationId: "conv-1", + }); + await manager.ensureDeployment("worker-1", "user-1", "user-1", msg); + expect(mockChildProcesses).toHaveLength(1); + }); + + // Two pods (two managers = two replicas) both receive the SAME + // org-less snapshot-writing turn for the same conversation. Pre-fix, + // each silently skipped the cross-pod lock and spawned its own worker — + // both hydrate the same `completed` snapshot and write divergent next + // snapshots (one reply silently wins). Post-fix, BOTH refuse: zero + // duplicate spawns, so the divergent-snapshot race can't occur. + test("two pods both refuse an org-less snapshot turn — no duplicate spawn across replicas", async () => { + delete process.env.LOBU_SESSION_STORE; // snapshot mode on both pods + const pod1 = new EmbeddedDeploymentManager(TEST_CONFIG); + const pod2 = new EmbeddedDeploymentManager(TEST_CONFIG); + const msg = createTestMessagePayload({ + runId: 42, + organizationId: undefined, + conversationId: "conv-shared", + }); + + await expect( + pod1.ensureDeployment("worker-x", "user-1", "user-1", msg) + ).rejects.toThrow(OrchestratorError); + await expect( + pod2.ensureDeployment("worker-x", "user-1", "user-1", msg) + ).rejects.toThrow(OrchestratorError); + + // Neither replica spawned a worker for the shared conversation. + expect(mockChildProcesses).toHaveLength(0); + expect(await pod1.listDeployments()).toHaveLength(0); + expect(await pod2.listDeployments()).toHaveLength(0); + }); + }); }); // ========================================================================= diff --git a/packages/server/src/gateway/__tests__/turn-liveness.test.ts b/packages/server/src/gateway/__tests__/turn-liveness.test.ts index d3cbf244d..095da5b53 100644 --- a/packages/server/src/gateway/__tests__/turn-liveness.test.ts +++ b/packages/server/src/gateway/__tests__/turn-liveness.test.ts @@ -19,7 +19,6 @@ import { RunsQueue } from "../infrastructure/queue/runs-queue.js"; import { armTurnTimeout, commitTerminalReply, - dischargeTurn, extendTurnDeadlines, failTurnIfPending, failTurnsForDeployment, @@ -92,12 +91,25 @@ async function expireAllMarkers(): Promise { WHERE queue_name = ${TURN_TIMEOUT_QUEUE}`; } +function reply(deploymentName: string, messageId: string) { + return { + messageId, + conversationId: `conv-${deploymentName}`, + platform: "api", + teamId: "api", + processedMessageIds: [messageId], + timestamp: Date.now(), + }; +} + describe("turn-liveness", () => { test("arm then discharge: a real reply leaves no marker and no error", async () => { await armTurnTimeout(queue, routing("dep-1", "m1")); expect(await markerCount("dep-1")).toBe(1); - await dischargeTurn("dep-1", "m1"); + // A real terminal reply discharges the marker via the production path + // (commitTerminalReply atomically deletes the marker + inserts the reply). + await commitTerminalReply("dep-1", ["m1"], reply("dep-1", "m1"), null); expect(await markerCount("dep-1")).toBe(0); expect(await errorRowCount()).toBe(0); }); @@ -128,24 +140,15 @@ describe("turn-liveness", () => { test("failTurnIfPending does NOT double-signal when a worker already replied", async () => { await armTurnTimeout(queue, routing("dep-4", "m")); - // Worker raced a real terminal reply → marker discharged. - await dischargeTurn("dep-4", "m"); + // Worker raced a real terminal reply → marker discharged via the production + // commitTerminalReply path. + await commitTerminalReply("dep-4", ["m"], reply("dep-4", "m"), null); expect(await failTurnIfPending("dep-4", "m", "startup failed")).toBe(false); + // commitTerminalReply emitted a (non-error) reply; no error row. expect(await errorRowCount()).toBe(0); }); - function reply(deploymentName: string, messageId: string) { - return { - messageId, - conversationId: `conv-${deploymentName}`, - platform: "api", - teamId: "api", - processedMessageIds: [messageId], - timestamp: Date.now(), - }; - } - test("commitTerminalReply atomically discharges the marker and enqueues the reply", async () => { await armTurnTimeout(queue, routing("dep-5", "m")); @@ -201,8 +204,9 @@ describe("turn-liveness", () => { await armTurnTimeout(queue, routing("dep-B", "same")); expect(await markerCount()).toBe(2); - // Discharging one conversation must not touch the other's marker. - await dischargeTurn("dep-A", "same"); + // Discharging one conversation (via the production commitTerminalReply + // path) must not touch the other's marker. + await commitTerminalReply("dep-A", ["same"], reply("dep-A", "same"), null); expect(await markerCount("dep-A")).toBe(0); expect(await markerCount("dep-B")).toBe(1); }); diff --git a/packages/server/src/gateway/orchestration/impl/embedded-deployment.ts b/packages/server/src/gateway/orchestration/impl/embedded-deployment.ts index 842a65650..a95b3f4a1 100644 --- a/packages/server/src/gateway/orchestration/impl/embedded-deployment.ts +++ b/packages/server/src/gateway/orchestration/impl/embedded-deployment.ts @@ -616,6 +616,36 @@ export class EmbeddedDeploymentManager extends BaseDeploymentManager { typeof messageData?.organizationId === "string" ? messageData.organizationId : null; + // A turn writes a SHARED snapshot only when it carries a `runId` (the + // worker's `writeSnapshot` bails otherwise — see the runId comment in + // MessageConsumer.handleMessage). Legacy direct-enqueue / unit-test + // turns leave `runId` undefined, never write a shared snapshot, and so + // can never produce the divergent-snapshot race the cross-pod lock + // guards against — they are safe to spawn without the lock even with no + // org/conversationId. + const writesSharedSnapshot = + snapshotModeEnabled && typeof messageData?.runId === "number"; + // A snapshot-writing turn with org OR conversationId missing CANNOT take + // the cross-pod lock (the lock key is (org, agent, conversationId)). The + // old code silently SKIPPED the lock in that case, so two pods could + // both hydrate the same `completed` snapshot and write divergent next + // snapshots — one reply silently wins. Refuse to spawn instead: a + // re-queueable failure (mirrors the lock-busy throw below) so the runs + // queue retries rather than running an unguarded, divergence-prone + // worker. This is a misconfiguration in practice (snapshot turns always + // carry org + conversationId), so surfacing it beats silently diverging. + if (writesSharedSnapshot && (!organizationId || !conversationId)) { + logger.error( + `Refusing to spawn snapshot-mode worker ${deploymentName}: ` + + `cross-pod conversation lock requires both organizationId and ` + + `conversationId (org=${organizationId ?? ""}, ` + + `conv=${conversationId ?? ""})` + ); + throw new OrchestratorError( + ErrorCode.DEPLOYMENT_CREATE_FAILED, + "Cannot acquire per-conversation lock: snapshot-mode turn is missing organizationId or conversationId" + ); + } let convLock: { release: () => Promise } | null = null; if (snapshotModeEnabled && organizationId && conversationId) { try { diff --git a/packages/server/src/gateway/orchestration/turn-liveness.ts b/packages/server/src/gateway/orchestration/turn-liveness.ts index a85394e79..a0f66ad98 100644 --- a/packages/server/src/gateway/orchestration/turn-liveness.ts +++ b/packages/server/src/gateway/orchestration/turn-liveness.ts @@ -128,36 +128,6 @@ export async function armTurnTimeout( }); } -/** - * Discharge a turn's obligation on a real terminal reply (worker success or an - * explicit worker error). Deletes the marker so neither the fast path nor the - * sweep emits a spurious error. Idempotent. - */ -export async function dischargeTurn( - deploymentName: string, - messageId: string -): Promise { - const key = turnMarkerKey(deploymentName, messageId); - try { - const sql = getDb(); - // `status = 'pending'` is required for the planner to use the PARTIAL index - // `runs_idempotency_key_uniq` (WHERE … status IN ('pending','claimed', - // 'running')) — without a status constraint it falls back to a seq scan - // over the full `runs` table. Markers are always pending (never claimed). - await sql` - DELETE FROM public.runs - WHERE idempotency_key = ${key} - AND status = 'pending' - AND queue_name = ${TURN_TIMEOUT_QUEUE} - `; - } catch (err) { - logger.warn( - { key, err: String(err) }, - "Failed to discharge turn-timeout marker" - ); - } -} - /** * Push the deadline forward for all in-flight turns of a deployment. Called on * the worker's heartbeat ACK — a worker-driven liveness signal, so a live but