From 835d2acc042d86b8e75e261943616bf15df577f6 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Burak=20Emre=20Kabakc=C4=B1?= Date: Tue, 26 May 2026 04:40:54 +0100 Subject: [PATCH 1/2] refactor(server): remove dead non-atomic dischargeTurn from turn-liveness dischargeTurn was a standalone DELETE of the turn-liveness marker with no reply insert. Production discharge has gone through commitTerminalReply (atomic delete+insert) since it was wired in gateway/index.ts; the only remaining references to dischargeTurn were its definition and three test cases. Delete the dead function and rewrite those tests to discharge via the production commitTerminalReply path instead, so they exercise real behaviour rather than a non-atomic shortcut. --- .../gateway/__tests__/turn-liveness.test.ts | 38 ++++++++++--------- .../gateway/orchestration/turn-liveness.ts | 30 --------------- 2 files changed, 21 insertions(+), 47 deletions(-) diff --git a/packages/server/src/gateway/__tests__/turn-liveness.test.ts b/packages/server/src/gateway/__tests__/turn-liveness.test.ts index d3cbf244d..095da5b53 100644 --- a/packages/server/src/gateway/__tests__/turn-liveness.test.ts +++ b/packages/server/src/gateway/__tests__/turn-liveness.test.ts @@ -19,7 +19,6 @@ import { RunsQueue } from "../infrastructure/queue/runs-queue.js"; import { armTurnTimeout, commitTerminalReply, - dischargeTurn, extendTurnDeadlines, failTurnIfPending, failTurnsForDeployment, @@ -92,12 +91,25 @@ async function expireAllMarkers(): Promise { WHERE queue_name = ${TURN_TIMEOUT_QUEUE}`; } +function reply(deploymentName: string, messageId: string) { + return { + messageId, + conversationId: `conv-${deploymentName}`, + platform: "api", + teamId: "api", + processedMessageIds: [messageId], + timestamp: Date.now(), + }; +} + describe("turn-liveness", () => { test("arm then discharge: a real reply leaves no marker and no error", async () => { await armTurnTimeout(queue, routing("dep-1", "m1")); expect(await markerCount("dep-1")).toBe(1); - await dischargeTurn("dep-1", "m1"); + // A real terminal reply discharges the marker via the production path + // (commitTerminalReply atomically deletes the marker + inserts the reply). + await commitTerminalReply("dep-1", ["m1"], reply("dep-1", "m1"), null); expect(await markerCount("dep-1")).toBe(0); expect(await errorRowCount()).toBe(0); }); @@ -128,24 +140,15 @@ describe("turn-liveness", () => { test("failTurnIfPending does NOT double-signal when a worker already replied", async () => { await armTurnTimeout(queue, routing("dep-4", "m")); - // Worker raced a real terminal reply → marker discharged. - await dischargeTurn("dep-4", "m"); + // Worker raced a real terminal reply → marker discharged via the production + // commitTerminalReply path. + await commitTerminalReply("dep-4", ["m"], reply("dep-4", "m"), null); expect(await failTurnIfPending("dep-4", "m", "startup failed")).toBe(false); + // commitTerminalReply emitted a (non-error) reply; no error row. expect(await errorRowCount()).toBe(0); }); - function reply(deploymentName: string, messageId: string) { - return { - messageId, - conversationId: `conv-${deploymentName}`, - platform: "api", - teamId: "api", - processedMessageIds: [messageId], - timestamp: Date.now(), - }; - } - test("commitTerminalReply atomically discharges the marker and enqueues the reply", async () => { await armTurnTimeout(queue, routing("dep-5", "m")); @@ -201,8 +204,9 @@ describe("turn-liveness", () => { await armTurnTimeout(queue, routing("dep-B", "same")); expect(await markerCount()).toBe(2); - // Discharging one conversation must not touch the other's marker. - await dischargeTurn("dep-A", "same"); + // Discharging one conversation (via the production commitTerminalReply + // path) must not touch the other's marker. + await commitTerminalReply("dep-A", ["same"], reply("dep-A", "same"), null); expect(await markerCount("dep-A")).toBe(0); expect(await markerCount("dep-B")).toBe(1); }); diff --git a/packages/server/src/gateway/orchestration/turn-liveness.ts b/packages/server/src/gateway/orchestration/turn-liveness.ts index a85394e79..a0f66ad98 100644 --- a/packages/server/src/gateway/orchestration/turn-liveness.ts +++ b/packages/server/src/gateway/orchestration/turn-liveness.ts @@ -128,36 +128,6 @@ export async function armTurnTimeout( }); } -/** - * Discharge a turn's obligation on a real terminal reply (worker success or an - * explicit worker error). Deletes the marker so neither the fast path nor the - * sweep emits a spurious error. Idempotent. - */ -export async function dischargeTurn( - deploymentName: string, - messageId: string -): Promise { - const key = turnMarkerKey(deploymentName, messageId); - try { - const sql = getDb(); - // `status = 'pending'` is required for the planner to use the PARTIAL index - // `runs_idempotency_key_uniq` (WHERE … status IN ('pending','claimed', - // 'running')) — without a status constraint it falls back to a seq scan - // over the full `runs` table. Markers are always pending (never claimed). - await sql` - DELETE FROM public.runs - WHERE idempotency_key = ${key} - AND status = 'pending' - AND queue_name = ${TURN_TIMEOUT_QUEUE} - `; - } catch (err) { - logger.warn( - { key, err: String(err) }, - "Failed to discharge turn-timeout marker" - ); - } -} - /** * Push the deadline forward for all in-flight turns of a deployment. Called on * the worker's heartbeat ACK — a worker-driven liveness signal, so a live but From 0d252bcbebb76c761cef8741b34697a412818f40 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Burak=20Emre=20Kabakc=C4=B1?= Date: Tue, 26 May 2026 04:41:07 +0100 Subject: [PATCH 2/2] fix(server): refuse snapshot-mode spawn when org/conversationId missing MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit The cross-pod conversation lock — keyed on (org, agent, conversationId) — is the only thing stopping two replicas from both hydrating the same `completed` snapshot and writing divergent next snapshots (one reply silently wins). spawnDeployment only ran that lock when both organizationId AND conversationId were present; when either was missing it silently SKIPPED the lock and spawned an unguarded worker, so two pods could both spawn for the same conversation. A turn only writes a SHARED snapshot when it carries a runId (the worker's writeSnapshot bails otherwise). Scope the guard to exactly that case: under snapshot mode, a runId-bearing turn missing org or conversationId now throws a re-queueable OrchestratorError instead of spawning unguarded. Legacy direct-enqueue / file-mode turns (no runId, or snapshot opted out) never write a shared snapshot and are unaffected. Reproducer: two-pod unit test (two managers) shows both replicas refuse an org-less snapshot turn for the same conversation — zero duplicate spawns; plus per-field refusal tests and legacy/file-mode pass-through tests. --- .../__tests__/embedded-deployment.test.ts | 98 +++++++++++++++++++ .../orchestration/impl/embedded-deployment.ts | 30 ++++++ 2 files changed, 128 insertions(+) diff --git a/packages/server/src/gateway/__tests__/embedded-deployment.test.ts b/packages/server/src/gateway/__tests__/embedded-deployment.test.ts index 85256e0b1..e0e72152b 100644 --- a/packages/server/src/gateway/__tests__/embedded-deployment.test.ts +++ b/packages/server/src/gateway/__tests__/embedded-deployment.test.ts @@ -268,6 +268,104 @@ describe("EmbeddedDeploymentManager", () => { const list = await manager.listDeployments(); expect(list).toHaveLength(0); }); + + // ===================================================================== + // Snapshot-mode cross-pod gate + // ===================================================================== + // A snapshot-writing turn (one that carries a `runId`) hydrates from and + // writes back to a SHARED Postgres snapshot. The cross-pod advisory lock + // — keyed on (org, agent, conversationId) — is the only thing stopping two + // replicas from both hydrating the same `completed` snapshot and writing + // divergent next snapshots (one reply silently wins). If org or + // conversationId is missing, the lock CANNOT be keyed, so the old code + // silently skipped it and spawned an UNGUARDED worker. The manager must + // now REFUSE to spawn instead (re-queueable failure), never run unguarded. + describe("snapshot-mode cross-pod gate", () => { + const snapshotBefore = process.env.LOBU_SESSION_STORE; + afterEach(() => { + if (snapshotBefore === undefined) delete process.env.LOBU_SESSION_STORE; + else process.env.LOBU_SESSION_STORE = snapshotBefore; + }); + + test("refuses to spawn a snapshot-writing turn when organizationId is missing", async () => { + delete process.env.LOBU_SESSION_STORE; // snapshot mode (default) + const msg = createTestMessagePayload({ + runId: 42, // snapshot-writing turn + organizationId: undefined, // org missing → lock cannot be keyed + conversationId: "conv-1", + }); + await expect( + manager.ensureDeployment("worker-1", "user-1", "user-1", msg) + ).rejects.toThrow(OrchestratorError); + // No child spawned — the worker never ran unguarded. + expect(mockChildProcesses).toHaveLength(0); + expect(await manager.listDeployments()).toHaveLength(0); + }); + + test("refuses to spawn a snapshot-writing turn when conversationId is missing", async () => { + delete process.env.LOBU_SESSION_STORE; + const msg = createTestMessagePayload({ + runId: 42, + organizationId: "org-1", + conversationId: undefined as unknown as string, + }); + await expect( + manager.ensureDeployment("worker-1", "user-1", "user-1", msg) + ).rejects.toThrow(OrchestratorError); + expect(mockChildProcesses).toHaveLength(0); + }); + + test("legacy direct-enqueue turn (no runId) still spawns even with no org — never writes a shared snapshot", async () => { + delete process.env.LOBU_SESSION_STORE; // snapshot mode on + const msg = createTestMessagePayload({ + runId: undefined, // no shared snapshot write → no divergence risk + organizationId: undefined, + conversationId: "conv-1", + }); + await manager.ensureDeployment("worker-1", "user-1", "user-1", msg); + expect(mockChildProcesses).toHaveLength(1); + }); + + test("file-mode (snapshot opted out) turn still spawns even with no org", async () => { + process.env.LOBU_SESSION_STORE = "file"; // legacy RWO-PVC single-writer + const msg = createTestMessagePayload({ + runId: 42, + organizationId: undefined, + conversationId: "conv-1", + }); + await manager.ensureDeployment("worker-1", "user-1", "user-1", msg); + expect(mockChildProcesses).toHaveLength(1); + }); + + // Two pods (two managers = two replicas) both receive the SAME + // org-less snapshot-writing turn for the same conversation. Pre-fix, + // each silently skipped the cross-pod lock and spawned its own worker — + // both hydrate the same `completed` snapshot and write divergent next + // snapshots (one reply silently wins). Post-fix, BOTH refuse: zero + // duplicate spawns, so the divergent-snapshot race can't occur. + test("two pods both refuse an org-less snapshot turn — no duplicate spawn across replicas", async () => { + delete process.env.LOBU_SESSION_STORE; // snapshot mode on both pods + const pod1 = new EmbeddedDeploymentManager(TEST_CONFIG); + const pod2 = new EmbeddedDeploymentManager(TEST_CONFIG); + const msg = createTestMessagePayload({ + runId: 42, + organizationId: undefined, + conversationId: "conv-shared", + }); + + await expect( + pod1.ensureDeployment("worker-x", "user-1", "user-1", msg) + ).rejects.toThrow(OrchestratorError); + await expect( + pod2.ensureDeployment("worker-x", "user-1", "user-1", msg) + ).rejects.toThrow(OrchestratorError); + + // Neither replica spawned a worker for the shared conversation. + expect(mockChildProcesses).toHaveLength(0); + expect(await pod1.listDeployments()).toHaveLength(0); + expect(await pod2.listDeployments()).toHaveLength(0); + }); + }); }); // ========================================================================= diff --git a/packages/server/src/gateway/orchestration/impl/embedded-deployment.ts b/packages/server/src/gateway/orchestration/impl/embedded-deployment.ts index 842a65650..a95b3f4a1 100644 --- a/packages/server/src/gateway/orchestration/impl/embedded-deployment.ts +++ b/packages/server/src/gateway/orchestration/impl/embedded-deployment.ts @@ -616,6 +616,36 @@ export class EmbeddedDeploymentManager extends BaseDeploymentManager { typeof messageData?.organizationId === "string" ? messageData.organizationId : null; + // A turn writes a SHARED snapshot only when it carries a `runId` (the + // worker's `writeSnapshot` bails otherwise — see the runId comment in + // MessageConsumer.handleMessage). Legacy direct-enqueue / unit-test + // turns leave `runId` undefined, never write a shared snapshot, and so + // can never produce the divergent-snapshot race the cross-pod lock + // guards against — they are safe to spawn without the lock even with no + // org/conversationId. + const writesSharedSnapshot = + snapshotModeEnabled && typeof messageData?.runId === "number"; + // A snapshot-writing turn with org OR conversationId missing CANNOT take + // the cross-pod lock (the lock key is (org, agent, conversationId)). The + // old code silently SKIPPED the lock in that case, so two pods could + // both hydrate the same `completed` snapshot and write divergent next + // snapshots — one reply silently wins. Refuse to spawn instead: a + // re-queueable failure (mirrors the lock-busy throw below) so the runs + // queue retries rather than running an unguarded, divergence-prone + // worker. This is a misconfiguration in practice (snapshot turns always + // carry org + conversationId), so surfacing it beats silently diverging. + if (writesSharedSnapshot && (!organizationId || !conversationId)) { + logger.error( + `Refusing to spawn snapshot-mode worker ${deploymentName}: ` + + `cross-pod conversation lock requires both organizationId and ` + + `conversationId (org=${organizationId ?? ""}, ` + + `conv=${conversationId ?? ""})` + ); + throw new OrchestratorError( + ErrorCode.DEPLOYMENT_CREATE_FAILED, + "Cannot acquire per-conversation lock: snapshot-mode turn is missing organizationId or conversationId" + ); + } let convLock: { release: () => Promise } | null = null; if (snapshotModeEnabled && organizationId && conversationId) { try {