Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
98 changes: 98 additions & 0 deletions packages/server/src/gateway/__tests__/embedded-deployment.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -268,6 +268,104 @@ describe("EmbeddedDeploymentManager", () => {
const list = await manager.listDeployments();
expect(list).toHaveLength(0);
});

// =====================================================================
// Snapshot-mode cross-pod gate
// =====================================================================
// A snapshot-writing turn (one that carries a `runId`) hydrates from and
// writes back to a SHARED Postgres snapshot. The cross-pod advisory lock
// — keyed on (org, agent, conversationId) — is the only thing stopping two
// replicas from both hydrating the same `completed` snapshot and writing
// divergent next snapshots (one reply silently wins). If org or
// conversationId is missing, the lock CANNOT be keyed, so the old code
// silently skipped it and spawned an UNGUARDED worker. The manager must
// now REFUSE to spawn instead (re-queueable failure), never run unguarded.
describe("snapshot-mode cross-pod gate", () => {
const snapshotBefore = process.env.LOBU_SESSION_STORE;
afterEach(() => {
if (snapshotBefore === undefined) delete process.env.LOBU_SESSION_STORE;
else process.env.LOBU_SESSION_STORE = snapshotBefore;
});

test("refuses to spawn a snapshot-writing turn when organizationId is missing", async () => {
delete process.env.LOBU_SESSION_STORE; // snapshot mode (default)
const msg = createTestMessagePayload({
runId: 42, // snapshot-writing turn
organizationId: undefined, // org missing → lock cannot be keyed
conversationId: "conv-1",
});
await expect(
manager.ensureDeployment("worker-1", "user-1", "user-1", msg)
).rejects.toThrow(OrchestratorError);
// No child spawned — the worker never ran unguarded.
expect(mockChildProcesses).toHaveLength(0);
expect(await manager.listDeployments()).toHaveLength(0);
});

test("refuses to spawn a snapshot-writing turn when conversationId is missing", async () => {
delete process.env.LOBU_SESSION_STORE;
const msg = createTestMessagePayload({
runId: 42,
organizationId: "org-1",
conversationId: undefined as unknown as string,
});
await expect(
manager.ensureDeployment("worker-1", "user-1", "user-1", msg)
).rejects.toThrow(OrchestratorError);
expect(mockChildProcesses).toHaveLength(0);
});

test("legacy direct-enqueue turn (no runId) still spawns even with no org — never writes a shared snapshot", async () => {
delete process.env.LOBU_SESSION_STORE; // snapshot mode on
const msg = createTestMessagePayload({
runId: undefined, // no shared snapshot write → no divergence risk
organizationId: undefined,
conversationId: "conv-1",
});
await manager.ensureDeployment("worker-1", "user-1", "user-1", msg);
expect(mockChildProcesses).toHaveLength(1);
});

test("file-mode (snapshot opted out) turn still spawns even with no org", async () => {
process.env.LOBU_SESSION_STORE = "file"; // legacy RWO-PVC single-writer
const msg = createTestMessagePayload({
runId: 42,
organizationId: undefined,
conversationId: "conv-1",
});
await manager.ensureDeployment("worker-1", "user-1", "user-1", msg);
expect(mockChildProcesses).toHaveLength(1);
});

// Two pods (two managers = two replicas) both receive the SAME
// org-less snapshot-writing turn for the same conversation. Pre-fix,
// each silently skipped the cross-pod lock and spawned its own worker —
// both hydrate the same `completed` snapshot and write divergent next
// snapshots (one reply silently wins). Post-fix, BOTH refuse: zero
// duplicate spawns, so the divergent-snapshot race can't occur.
test("two pods both refuse an org-less snapshot turn — no duplicate spawn across replicas", async () => {
delete process.env.LOBU_SESSION_STORE; // snapshot mode on both pods
const pod1 = new EmbeddedDeploymentManager(TEST_CONFIG);
const pod2 = new EmbeddedDeploymentManager(TEST_CONFIG);
const msg = createTestMessagePayload({
runId: 42,
organizationId: undefined,
conversationId: "conv-shared",
});

await expect(
pod1.ensureDeployment("worker-x", "user-1", "user-1", msg)
).rejects.toThrow(OrchestratorError);
await expect(
pod2.ensureDeployment("worker-x", "user-1", "user-1", msg)
).rejects.toThrow(OrchestratorError);

// Neither replica spawned a worker for the shared conversation.
expect(mockChildProcesses).toHaveLength(0);
expect(await pod1.listDeployments()).toHaveLength(0);
expect(await pod2.listDeployments()).toHaveLength(0);
});
});
});

// =========================================================================
Expand Down
38 changes: 21 additions & 17 deletions packages/server/src/gateway/__tests__/turn-liveness.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@ import { RunsQueue } from "../infrastructure/queue/runs-queue.js";
import {
armTurnTimeout,
commitTerminalReply,
dischargeTurn,
extendTurnDeadlines,
failTurnIfPending,
failTurnsForDeployment,
Expand Down Expand Up @@ -92,12 +91,25 @@ async function expireAllMarkers(): Promise<void> {
WHERE queue_name = ${TURN_TIMEOUT_QUEUE}`;
}

function reply(deploymentName: string, messageId: string) {
return {
messageId,
conversationId: `conv-${deploymentName}`,
platform: "api",
teamId: "api",
processedMessageIds: [messageId],
timestamp: Date.now(),
};
}

describe("turn-liveness", () => {
test("arm then discharge: a real reply leaves no marker and no error", async () => {
await armTurnTimeout(queue, routing("dep-1", "m1"));
expect(await markerCount("dep-1")).toBe(1);

await dischargeTurn("dep-1", "m1");
// A real terminal reply discharges the marker via the production path
// (commitTerminalReply atomically deletes the marker + inserts the reply).
await commitTerminalReply("dep-1", ["m1"], reply("dep-1", "m1"), null);
expect(await markerCount("dep-1")).toBe(0);
expect(await errorRowCount()).toBe(0);
});
Expand Down Expand Up @@ -128,24 +140,15 @@ describe("turn-liveness", () => {

test("failTurnIfPending does NOT double-signal when a worker already replied", async () => {
await armTurnTimeout(queue, routing("dep-4", "m"));
// Worker raced a real terminal reply → marker discharged.
await dischargeTurn("dep-4", "m");
// Worker raced a real terminal reply → marker discharged via the production
// commitTerminalReply path.
await commitTerminalReply("dep-4", ["m"], reply("dep-4", "m"), null);

expect(await failTurnIfPending("dep-4", "m", "startup failed")).toBe(false);
// commitTerminalReply emitted a (non-error) reply; no error row.
expect(await errorRowCount()).toBe(0);
});

function reply(deploymentName: string, messageId: string) {
return {
messageId,
conversationId: `conv-${deploymentName}`,
platform: "api",
teamId: "api",
processedMessageIds: [messageId],
timestamp: Date.now(),
};
}

test("commitTerminalReply atomically discharges the marker and enqueues the reply", async () => {
await armTurnTimeout(queue, routing("dep-5", "m"));

Expand Down Expand Up @@ -201,8 +204,9 @@ describe("turn-liveness", () => {
await armTurnTimeout(queue, routing("dep-B", "same"));
expect(await markerCount()).toBe(2);

// Discharging one conversation must not touch the other's marker.
await dischargeTurn("dep-A", "same");
// Discharging one conversation (via the production commitTerminalReply
// path) must not touch the other's marker.
await commitTerminalReply("dep-A", ["same"], reply("dep-A", "same"), null);
expect(await markerCount("dep-A")).toBe(0);
expect(await markerCount("dep-B")).toBe(1);
});
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -616,6 +616,36 @@ export class EmbeddedDeploymentManager extends BaseDeploymentManager {
typeof messageData?.organizationId === "string"
? messageData.organizationId
: null;
// A turn writes a SHARED snapshot only when it carries a `runId` (the
// worker's `writeSnapshot` bails otherwise — see the runId comment in
// MessageConsumer.handleMessage). Legacy direct-enqueue / unit-test
// turns leave `runId` undefined, never write a shared snapshot, and so
// can never produce the divergent-snapshot race the cross-pod lock
// guards against — they are safe to spawn without the lock even with no
// org/conversationId.
const writesSharedSnapshot =
snapshotModeEnabled && typeof messageData?.runId === "number";
// A snapshot-writing turn with org OR conversationId missing CANNOT take
// the cross-pod lock (the lock key is (org, agent, conversationId)). The
// old code silently SKIPPED the lock in that case, so two pods could
// both hydrate the same `completed` snapshot and write divergent next
// snapshots — one reply silently wins. Refuse to spawn instead: a
// re-queueable failure (mirrors the lock-busy throw below) so the runs
// queue retries rather than running an unguarded, divergence-prone
// worker. This is a misconfiguration in practice (snapshot turns always
// carry org + conversationId), so surfacing it beats silently diverging.
if (writesSharedSnapshot && (!organizationId || !conversationId)) {
logger.error(
`Refusing to spawn snapshot-mode worker ${deploymentName}: ` +
`cross-pod conversation lock requires both organizationId and ` +
`conversationId (org=${organizationId ?? "<missing>"}, ` +
`conv=${conversationId ?? "<missing>"})`
);
throw new OrchestratorError(
ErrorCode.DEPLOYMENT_CREATE_FAILED,
"Cannot acquire per-conversation lock: snapshot-mode turn is missing organizationId or conversationId"
);
}
let convLock: { release: () => Promise<void> } | null = null;
if (snapshotModeEnabled && organizationId && conversationId) {
try {
Expand Down
30 changes: 0 additions & 30 deletions packages/server/src/gateway/orchestration/turn-liveness.ts
Original file line number Diff line number Diff line change
Expand Up @@ -128,36 +128,6 @@ export async function armTurnTimeout(
});
}

/**
* Discharge a turn's obligation on a real terminal reply (worker success or an
* explicit worker error). Deletes the marker so neither the fast path nor the
* sweep emits a spurious error. Idempotent.
*/
export async function dischargeTurn(
deploymentName: string,
messageId: string
): Promise<void> {
const key = turnMarkerKey(deploymentName, messageId);
try {
const sql = getDb();
// `status = 'pending'` is required for the planner to use the PARTIAL index
// `runs_idempotency_key_uniq` (WHERE … status IN ('pending','claimed',
// 'running')) — without a status constraint it falls back to a seq scan
// over the full `runs` table. Markers are always pending (never claimed).
await sql`
DELETE FROM public.runs
WHERE idempotency_key = ${key}
AND status = 'pending'
AND queue_name = ${TURN_TIMEOUT_QUEUE}
`;
} catch (err) {
logger.warn(
{ key, err: String(err) },
"Failed to discharge turn-timeout marker"
);
}
}

/**
* Push the deadline forward for all in-flight turns of a deployment. Called on
* the worker's heartbeat ACK — a worker-driven liveness signal, so a live but
Expand Down
Loading