From 6bbd50845cdbde78389d49a3b67d07970734aeb0 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Burak=20Emre=20Kabakc=C4=B1?= Date: Wed, 20 May 2026 03:44:59 +0100 Subject: [PATCH 1/7] fix(server): surface worker-startup-failure notice to API/CLI clients MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit When background worker creation fails, `trackFailedDeployment` notifies the user via a `thread_response`. It wrote the message into the payload's `content` field — but `content` is documented (and implemented) as ephemeral-only: the response router renders it solely in the `ephemeral`-gated branch. A non-ephemeral `content` notice is therefore silently dropped on the direct-API/CLI SSE path, so the gateway emits a bare `complete` with no preceding content and `lobu chat` exits 0 with no output — the silent-success footgun in lobu-ai/lobu#946. Route the notice through the `error` field instead, which the pipeline already renders end-to-end: an `error` SSE event for direct-API clients (`lobu chat` prints "Agent error: …" and exits 1) and an "Error: …" message on chat platforms. This also matches the issue's expected outcome #2 (a clear, actionable error rather than empty success). Worker provisioning itself is unchanged: workers auto-provision on first message regardless of platform connections, so the issue's no-worker hypothesis is moot — the only defect was the dropped failure notice. --- .../worker-startup-failure-notice.test.ts | 123 ++++++++++++++++++ .../gateway/orchestration/message-consumer.ts | 11 +- 2 files changed, 132 insertions(+), 2 deletions(-) create mode 100644 packages/server/src/gateway/__tests__/worker-startup-failure-notice.test.ts diff --git a/packages/server/src/gateway/__tests__/worker-startup-failure-notice.test.ts b/packages/server/src/gateway/__tests__/worker-startup-failure-notice.test.ts new file mode 100644 index 000000000..137dde8b7 --- /dev/null +++ b/packages/server/src/gateway/__tests__/worker-startup-failure-notice.test.ts @@ -0,0 +1,123 @@ +import { beforeAll, describe, expect, mock, test } from "bun:test"; + +import { MessageConsumer } from "../orchestration/message-consumer.js"; +import { UnifiedThreadResponseConsumer } from "../platform/unified-thread-consumer.js"; + +// `new RunsQueue()` (built inside the MessageConsumer constructor) only guards +// on DATABASE_URL being present — it does not connect. We immediately replace +// the queue with a recording fake before exercising any method, so no Postgres +// is touched. +beforeAll(() => { + process.env.DATABASE_URL ||= "postgres://test/test"; +}); + +const apiPayloadBase = { + messageId: "m1", + channelId: "api_u1", + conversationId: "conv-1", + userId: "u1", + teamId: "api", + platform: "api", + timestamp: 0, + // Direct-API sessions carry no `sessionId` in platformMetadata, so the + // consumer's cli-session broadcasts are skipped and every SSE event comes + // from the renderer keyed on `conversationId`. + platformMetadata: {}, +}; + +function createApiConsumer() { + const queue = { + start: mock(async () => undefined), + stop: mock(async () => undefined), + createQueue: mock(async () => undefined), + work: mock(async () => undefined), + }; + const renderer = { + handleDelta: mock(async () => "m1"), + handleError: mock(async () => undefined), + handleCompletion: mock(async () => undefined), + handleEphemeral: mock(async () => undefined), + handleStatusUpdate: mock(async () => undefined), + }; + const platformRegistry = { + get: mock(() => ({ getResponseRenderer: () => renderer })), + }; + const sseManager = { broadcast: mock(() => undefined) }; + const consumer = new UnifiedThreadResponseConsumer( + queue as any, + platformRegistry as any, + sseManager as any + ) as any; + return { consumer, renderer, sseManager }; +} + +describe("worker-startup-failure notice reaches direct-API clients (lobu-ai/lobu#946)", () => { + test("producer emits the failure notice via `error`, not the ephemeral-only `content` field", async () => { + const consumer = new MessageConsumer({} as any, {} as any) as any; + const sends: Array<{ queue: string; data: Record }> = []; + consumer.queue = { + createQueue: mock(async () => undefined), + send: mock(async (queue: string, data: Record) => { + sends.push({ queue, data }); + }), + }; + + await consumer.trackFailedDeployment( + "deploy-1", + { + messageId: "m1", + userId: "u1", + channelId: "api_u1", + conversationId: "conv-1", + platform: "api", + platformMetadata: {}, + }, + new Error("spawn ENOENT") + ); + + const notice = sends.find((s) => s.queue === "thread_response"); + expect(notice).toBeDefined(); + // The fix: the notice rides the `error` field (rendered end-to-end), not + // `content` (only the ephemeral branch renders content -> silently dropped). + expect(notice?.data.error).toMatch(/Worker startup failed/); + expect(notice?.data.content).toBeUndefined(); + expect(notice?.data.processedMessageIds).toEqual(["m1"]); + }); + + test("an `error` notice is surfaced to the API renderer (handleError + completion)", async () => { + const { consumer, renderer } = createApiConsumer(); + + await consumer.handleThreadResponse({ + id: "job-err", + data: { + ...apiPayloadBase, + error: "Worker startup failed and your request could not be processed.", + processedMessageIds: ["m1"], + }, + }); + + expect(renderer.handleError).toHaveBeenCalledTimes(1); + expect(renderer.handleCompletion).toHaveBeenCalledTimes(1); + }); + + test("a non-ephemeral `content` notice is dropped — only completion fires (the original bug)", async () => { + const { consumer, renderer } = createApiConsumer(); + + await consumer.handleThreadResponse({ + id: "job-content", + data: { + ...apiPayloadBase, + // The pre-fix shape: a human-readable message in `content` with no + // `ephemeral` flag. Nothing renders it; the user only sees `complete`. + content: "Worker startup failed and your request could not be processed.", + processedMessageIds: ["m1"], + }, + }); + + expect(renderer.handleError).not.toHaveBeenCalled(); + expect(renderer.handleDelta).not.toHaveBeenCalled(); + expect(renderer.handleEphemeral).not.toHaveBeenCalled(); + // Completion still fires (this is why the gateway returns a bare `complete`). + expect(renderer.handleCompletion).toHaveBeenCalledTimes(1); + }); +}); diff --git a/packages/server/src/gateway/orchestration/message-consumer.ts b/packages/server/src/gateway/orchestration/message-consumer.ts index 1c1966191..86a0513b9 100644 --- a/packages/server/src/gateway/orchestration/message-consumer.ts +++ b/packages/server/src/gateway/orchestration/message-consumer.ts @@ -640,7 +640,14 @@ export class MessageConsumer { const userMessage = "Worker startup failed and your request could not be processed. Please retry in a moment."; - // Notify user that their message could not be processed + // Notify user that their message could not be processed. + // This MUST go through the `error` field, not `content`: `content` is + // only rendered by the ephemeral branch of the response router, so a + // non-ephemeral `content` notice is silently dropped on the API/CLI SSE + // path — the gateway returns a bare `complete` and `lobu chat` exits 0 + // with no output (lobu-ai/lobu#946). `error` is surfaced end-to-end: + // an `error` SSE event for direct API clients (CLI prints it + exits 1) + // and an "Error: …" message on chat platforms. try { const responseQueue = "thread_response"; await this.queue.createQueue(responseQueue); @@ -651,7 +658,7 @@ export class MessageConsumer { conversationId: data.conversationId, platform: data.platform, platformMetadata: data.platformMetadata, - content: userMessage, + error: userMessage, processedMessageIds: [data.messageId], }); } catch (notifyError) { From 3d63931e9b1e3e2b43eb5492d615e2f7d40ccdf6 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Burak=20Emre=20Kabakc=C4=B1?= Date: Wed, 20 May 2026 03:53:46 +0100 Subject: [PATCH 2/7] fix(server): render non-ephemeral `content` payloads end-to-end MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit The response router (`routeToRenderer`) had branches for `delta`, `ephemeral`-gated `content`, `error`, and `processedMessageIds` (completion) — but none for a plain, non-ephemeral `content` message. Any payload that carried user-facing text in `content` therefore fell straight through to the completion branch and the user saw a bare `complete` with no content. This is the general defect behind lobu-ai/lobu#946: changing one producer only patches one call site, so make `content` a first-class render path instead. - Add optional `handleContent` to the `ResponseRenderer` interface. - `ApiResponseRenderer.handleContent` broadcasts the message as an `output` SSE event (direct-API/CLI clients render it like a streamed delta). - `ChatResponseBridge.handleContent` posts it in-thread; the ephemeral and content paths now share one `postBufferedContent` helper. - Router renders `content` (then falls through to completion so the turn still terminates); the `ephemeral` branch keeps its own handling. Invariant: a `thread_response` carrying `delta`, `content`, or `error` is always delivered — no field is silently dropped. Producers pick the field by intent (`error` = failure/exit 1, `content` = neutral notice/exit 0); the worker-startup-failure notice stays on `error`, and the guardrail input-trip notice (on `content`) is now surfaced too instead of vanishing. --- .../worker-startup-failure-notice.test.ts | 37 +++++++++++++++---- .../src/gateway/api/response-renderer.ts | 29 +++++++++++++++ .../connections/chat-response-bridge.ts | 32 ++++++++++++++-- .../src/gateway/platform/response-renderer.ts | 17 +++++++++ .../platform/unified-thread-consumer.ts | 20 ++++++++++ 5 files changed, 123 insertions(+), 12 deletions(-) diff --git a/packages/server/src/gateway/__tests__/worker-startup-failure-notice.test.ts b/packages/server/src/gateway/__tests__/worker-startup-failure-notice.test.ts index 137dde8b7..605f09e81 100644 --- a/packages/server/src/gateway/__tests__/worker-startup-failure-notice.test.ts +++ b/packages/server/src/gateway/__tests__/worker-startup-failure-notice.test.ts @@ -34,6 +34,7 @@ function createApiConsumer() { }; const renderer = { handleDelta: mock(async () => "m1"), + handleContent: mock(async () => undefined), handleError: mock(async () => undefined), handleCompletion: mock(async () => undefined), handleEphemeral: mock(async () => undefined), @@ -100,24 +101,44 @@ describe("worker-startup-failure notice reaches direct-API clients (lobu-ai/lobu expect(renderer.handleCompletion).toHaveBeenCalledTimes(1); }); - test("a non-ephemeral `content` notice is dropped — only completion fires (the original bug)", async () => { - const { consumer, renderer } = createApiConsumer(); + test("a non-ephemeral `content` notice is surfaced via handleContent, then completes (scalable router fix)", async () => { + const { consumer, renderer, sseManager } = createApiConsumer(); await consumer.handleThreadResponse({ id: "job-content", data: { ...apiPayloadBase, - // The pre-fix shape: a human-readable message in `content` with no - // `ephemeral` flag. Nothing renders it; the user only sees `complete`. - content: "Worker startup failed and your request could not be processed.", + // A human-readable message in `content` with no `ephemeral` flag. + // Pre-fix this was dropped (only `complete` fired); now the router + // renders it through handleContent so no notice silently vanishes. + content: "A buffered, non-streamed notice for the user.", processedMessageIds: ["m1"], }, }); - expect(renderer.handleError).not.toHaveBeenCalled(); + expect(renderer.handleContent).toHaveBeenCalledTimes(1); + // Falls through to completion so the turn still terminates. + expect(renderer.handleCompletion).toHaveBeenCalledTimes(1); + // Not misrouted as a stream chunk, ephemeral, or error. expect(renderer.handleDelta).not.toHaveBeenCalled(); expect(renderer.handleEphemeral).not.toHaveBeenCalled(); - // Completion still fires (this is why the gateway returns a bare `complete`). - expect(renderer.handleCompletion).toHaveBeenCalledTimes(1); + expect(renderer.handleError).not.toHaveBeenCalled(); + void sseManager; + }); + + test("an `ephemeral` content payload still routes to handleEphemeral, not handleContent", async () => { + const { consumer, renderer } = createApiConsumer(); + + await consumer.handleThreadResponse({ + id: "job-ephemeral", + data: { + ...apiPayloadBase, + ephemeral: true, + content: "Visit https://example.com to authorize.", + }, + }); + + expect(renderer.handleEphemeral).toHaveBeenCalledTimes(1); + expect(renderer.handleContent).not.toHaveBeenCalled(); }); }); diff --git a/packages/server/src/gateway/api/response-renderer.ts b/packages/server/src/gateway/api/response-renderer.ts index 2ea6df478..6267dfe89 100644 --- a/packages/server/src/gateway/api/response-renderer.ts +++ b/packages/server/src/gateway/api/response-renderer.ts @@ -55,6 +55,35 @@ export class ApiResponseRenderer implements ResponseRenderer { return payload.messageId; } + /** + * Handle a complete, non-streamed message (the `content` field). + * Broadcasts it to SSE clients as an `output` event so direct-API/CLI + * clients render it identically to streamed deltas. + */ + async handleContent( + payload: ThreadResponsePayload, + _sessionKey: string + ): Promise { + const sessionId = + (payload.platformMetadata?.sessionId as string) || payload.conversationId; + + if (!sessionId) { + logger.warn("No session ID found in payload for content broadcast"); + return; + } + + this.sseManager.broadcast(sessionId, "output", { + type: "delta", + content: payload.content, + timestamp: payload.timestamp || Date.now(), + messageId: payload.messageId, + }); + + logger.debug( + `Broadcast content to session ${sessionId}: ${payload.content?.length || 0} chars` + ); + } + /** * Handle completion of response processing * Sends completion event to SSE clients diff --git a/packages/server/src/gateway/connections/chat-response-bridge.ts b/packages/server/src/gateway/connections/chat-response-bridge.ts index f81117828..e92c5771f 100644 --- a/packages/server/src/gateway/connections/chat-response-bridge.ts +++ b/packages/server/src/gateway/connections/chat-response-bridge.ts @@ -598,6 +598,32 @@ export class ChatResponseBridge implements ResponseRenderer { } async handleEphemeral(payload: ThreadResponsePayload): Promise { + await this.postBufferedContent(payload, "ephemeral"); + } + + /** + * Handle a complete, non-streamed message (the `content` field) — e.g. a + * gateway-originated notice the worker never streamed as deltas. Posted as + * a normal in-thread message, identical to the ephemeral path. + */ + async handleContent( + payload: ThreadResponsePayload, + _sessionKey: string + ): Promise { + await this.postBufferedContent(payload, "content"); + } + + // --- Private --- + + /** + * Post a complete `content` message to the platform target. Shared by the + * ephemeral and content render paths — both deliver a one-shot buffered + * message (with optional settings link-buttons), they differ only in intent. + */ + private async postBufferedContent( + payload: ThreadResponsePayload, + label: "ephemeral" | "content" + ): Promise { if (!payload.content) return; const ctx = this.extractResponseContext(payload); @@ -641,7 +667,7 @@ export class ChatResponseBridge implements ResponseRenderer { } catch (error) { logger.warn( { connectionId, error: String(error) }, - "Failed to render ephemeral settings button" + `Failed to render ${label} settings button` ); const fallbackText = `${processedContent}\n\n${linkButtons.map((button) => `${button.text}: ${button.url}`).join("\n")}`; await target.post(fallbackText.trim()); @@ -654,13 +680,11 @@ export class ChatResponseBridge implements ResponseRenderer { } catch (error) { logger.error( { connectionId, error: String(error) }, - "Failed to send ephemeral message" + `Failed to send ${label} message` ); } } - // --- Private --- - private async resolveTarget( instance: any, channelId: string, diff --git a/packages/server/src/gateway/platform/response-renderer.ts b/packages/server/src/gateway/platform/response-renderer.ts index a245ba655..f29ee9400 100644 --- a/packages/server/src/gateway/platform/response-renderer.ts +++ b/packages/server/src/gateway/platform/response-renderer.ts @@ -28,6 +28,23 @@ export interface ResponseRenderer { sessionKey: string ): Promise; + /** + * Handle a complete, non-streamed message (the `content` field). + * + * Unlike `handleDelta` (incremental stream chunk) or `handleEphemeral` + * (auth/OAuth notice gated on the `ephemeral` flag), this is a buffered, + * user-facing message delivered in one shot — e.g. a gateway-originated + * notice the worker never streamed as deltas. Without a renderer for it, + * such a payload is silently dropped and only `complete` reaches the user. + * + * @param payload - The thread response payload carrying `content` + * @param sessionKey - Unique key for this response session + */ + handleContent?( + payload: ThreadResponsePayload, + sessionKey: string + ): Promise; + /** * Handle completion of response processing. * Called when all content has been processed (processedMessageIds is set). diff --git a/packages/server/src/gateway/platform/unified-thread-consumer.ts b/packages/server/src/gateway/platform/unified-thread-consumer.ts index 06d36dd83..7abec1d8b 100644 --- a/packages/server/src/gateway/platform/unified-thread-consumer.ts +++ b/packages/server/src/gateway/platform/unified-thread-consumer.ts @@ -263,6 +263,26 @@ export class UnifiedThreadResponseConsumer { return; } + // Handle a complete, non-streamed message (the `content` field). + // Gateway-originated notices and any buffered (non-delta) message arrive + // here. Without this branch a content-bearing payload falls straight + // through to completion and the user sees a bare `complete` with no + // content — the silent-success bug in lobu-ai/lobu#946. Render it, then + // fall through so the same payload's `processedMessageIds` still + // terminates the turn. `ephemeral` content is handled by its own branch + // above and returns before reaching here. + if (data.content && !data.ephemeral && renderer.handleContent) { + if (cliSessionId) { + this.sseManager.broadcast(cliSessionId, "output", { + type: "delta", + content: data.content, + timestamp: data.timestamp, + messageId: data.messageId, + }); + } + await renderer.handleContent(data, sessionKey); + } + // Handle completion if (data.processedMessageIds?.length) { if (cliSessionId) { From 57a73badc5894e672917f2604d16eea1c7d70e3d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Burak=20Emre=20Kabakc=C4=B1?= Date: Wed, 20 May 2026 03:58:22 +0100 Subject: [PATCH 3/7] test(server): assert real ApiResponseRenderer emits the output SSE event for content --- .../worker-startup-failure-notice.test.ts | 31 +++++++++++++++++++ 1 file changed, 31 insertions(+) diff --git a/packages/server/src/gateway/__tests__/worker-startup-failure-notice.test.ts b/packages/server/src/gateway/__tests__/worker-startup-failure-notice.test.ts index 605f09e81..fdc55d0ea 100644 --- a/packages/server/src/gateway/__tests__/worker-startup-failure-notice.test.ts +++ b/packages/server/src/gateway/__tests__/worker-startup-failure-notice.test.ts @@ -1,5 +1,6 @@ import { beforeAll, describe, expect, mock, test } from "bun:test"; +import { ApiResponseRenderer } from "../api/response-renderer.js"; import { MessageConsumer } from "../orchestration/message-consumer.js"; import { UnifiedThreadResponseConsumer } from "../platform/unified-thread-consumer.js"; @@ -141,4 +142,34 @@ describe("worker-startup-failure notice reaches direct-API clients (lobu-ai/lobu expect(renderer.handleEphemeral).toHaveBeenCalledTimes(1); expect(renderer.handleContent).not.toHaveBeenCalled(); }); + + // Real renderer (no mock for the rendering step): confirm the actual SSE + // event `lobu chat` parses is emitted. The CLI's `output` case reads + // `data.content` (chat.ts), so the wire shape matters. + test("real ApiResponseRenderer.handleContent emits an `output` SSE event the CLI renders", async () => { + const broadcasts: Array<{ session: string; event: string; data: any }> = []; + const sseManager = { + broadcast: (session: string, event: string, data: unknown) => { + broadcasts.push({ session, event, data }); + }, + }; + const renderer = new ApiResponseRenderer(sseManager as any); + + await renderer.handleContent( + { + ...apiPayloadBase, + content: "Worker startup failed and your request could not be processed.", + } as any, + "u1:m1" + ); + + expect(broadcasts).toHaveLength(1); + expect(broadcasts[0]?.event).toBe("output"); + expect(broadcasts[0]?.session).toBe("conv-1"); + expect(broadcasts[0]?.data).toMatchObject({ + type: "delta", + content: "Worker startup failed and your request could not be processed.", + messageId: "m1", + }); + }); }); From a5d9371c2b2422f07232f37c9dc816843f527a6b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Burak=20Emre=20Kabakc=C4=B1?= Date: Wed, 20 May 2026 13:50:37 +0100 Subject: [PATCH 4/7] fix(server): surface unexpected worker death instead of hanging the run MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit A worker that spawns successfully and then dies (crash on startup, OOM, external kill) never rejects createWorkerDeployment — the embedded manager declares the worker "ready" the moment spawn() returns, so trackFailedDeployment is never reached. The exit handler only logged the non-zero exit; the worker's queued message stranded with no terminal event and the run hung until the client's idle timeout. Live-reproduced against a PGlite gateway with a worker entrypoint that exits 1: the SSE stream sat on `connected` + pings forever. Add an unexpected-exit notifier: - `BaseDeploymentManager.setWorkerExitNotifier(...)` — orchestrator-wired hook. - Embedded manager flips an `intentionalExit` flag in `killWorker` (the sole kill chokepoint: scale-to-0, idle reap, delete), so only genuine crashes / external kills notify — never operator-driven stops or clean exits. - `MessageConsumer.notifyWorkerCrash` emits the notice via the `error` field (rendered end-to-end; `content` is ephemeral-only) for the deployment's conversation. Live result (same broken-worker setup): SSE now emits connected → error → complete within ~6s, and real `lobu chat` prints "Agent error: The worker handling your request stopped unexpectedly (exit code 1)…" and exits 1. --- .../worker-startup-failure-notice.test.ts | 37 +++++++++++++++- .../orchestration/base-deployment-manager.ts | 26 +++++++++++ .../orchestration/impl/embedded-deployment.ts | 36 +++++++++++++++ .../gateway/orchestration/message-consumer.ts | 44 +++++++++++++++++++ 4 files changed, 142 insertions(+), 1 deletion(-) diff --git a/packages/server/src/gateway/__tests__/worker-startup-failure-notice.test.ts b/packages/server/src/gateway/__tests__/worker-startup-failure-notice.test.ts index fdc55d0ea..3ada78266 100644 --- a/packages/server/src/gateway/__tests__/worker-startup-failure-notice.test.ts +++ b/packages/server/src/gateway/__tests__/worker-startup-failure-notice.test.ts @@ -55,7 +55,9 @@ function createApiConsumer() { describe("worker-startup-failure notice reaches direct-API clients (lobu-ai/lobu#946)", () => { test("producer emits the failure notice via `error`, not the ephemeral-only `content` field", async () => { - const consumer = new MessageConsumer({} as any, {} as any) as any; + const consumer = new MessageConsumer({} as any, { + setWorkerExitNotifier: () => undefined, + } as any) as any; const sends: Array<{ queue: string; data: Record }> = []; consumer.queue = { createQueue: mock(async () => undefined), @@ -86,6 +88,39 @@ describe("worker-startup-failure notice reaches direct-API clients (lobu-ai/lobu expect(notice?.data.processedMessageIds).toEqual(["m1"]); }); + test("unexpected worker exit (crash after spawn) notifies the conversation via `error`", async () => { + const consumer = new MessageConsumer({} as any, { + setWorkerExitNotifier: () => undefined, + } as any) as any; + const sends: Array<{ queue: string; data: Record }> = []; + consumer.queue = { + createQueue: mock(async () => undefined), + send: mock(async (queue: string, data: Record) => { + sends.push({ queue, data }); + }), + }; + + await consumer.notifyWorkerCrash({ + deploymentName: "lobu-worker-api-abc", + messageData: { + messageId: "m1", + userId: "u1", + channelId: "api_u1", + conversationId: "conv-1", + platform: "api", + platformMetadata: {}, + }, + reason: "exit code 1", + }); + + const notice = sends.find((s) => s.queue === "thread_response"); + expect(notice).toBeDefined(); + expect(notice?.data.error).toMatch(/stopped unexpectedly \(exit code 1\)/); + expect(notice?.data.content).toBeUndefined(); + expect(notice?.data.conversationId).toBe("conv-1"); + expect(notice?.data.processedMessageIds).toEqual(["m1"]); + }); + test("an `error` notice is surfaced to the API renderer (handleError + completion)", async () => { const { consumer, renderer } = createApiConsumer(); diff --git a/packages/server/src/gateway/orchestration/base-deployment-manager.ts b/packages/server/src/gateway/orchestration/base-deployment-manager.ts index 2ab004ead..55a7d2ffe 100644 --- a/packages/server/src/gateway/orchestration/base-deployment-manager.ts +++ b/packages/server/src/gateway/orchestration/base-deployment-manager.ts @@ -188,6 +188,32 @@ export abstract class BaseDeploymentManager { */ private inFlightCreates = new Map>(); + /** + * Notifier invoked when a worker subprocess dies unexpectedly (a crash or an + * external kill — NOT a deliberate scale-down / idle reap). A worker that + * spawns and then exits non-zero never reaches `trackFailedDeployment` + * (createWorkerDeployment already resolved on spawn), so without this hook + * its queued message strands with no terminal event and the run hangs + * (lobu-ai/lobu#946). The orchestrator wires this to surface an error to the + * affected conversation. Optional: unset = no notification (legacy behavior). + */ + protected workerExitNotifier?: (info: { + deploymentName: string; + messageData: MessagePayload; + reason: string; + }) => void | Promise; + + /** Register the unexpected-worker-exit notifier (see `workerExitNotifier`). */ + setWorkerExitNotifier( + notifier: (info: { + deploymentName: string; + messageData: MessagePayload; + reason: string; + }) => void | Promise + ): void { + this.workerExitNotifier = notifier; + } + constructor( config: OrchestratorConfig, moduleEnvVarsBuilder?: ModuleEnvVarsBuilder, diff --git a/packages/server/src/gateway/orchestration/impl/embedded-deployment.ts b/packages/server/src/gateway/orchestration/impl/embedded-deployment.ts index 3a86a059a..1f61d9fc2 100644 --- a/packages/server/src/gateway/orchestration/impl/embedded-deployment.ts +++ b/packages/server/src/gateway/orchestration/impl/embedded-deployment.ts @@ -137,6 +137,13 @@ interface EmbeddedWorkerEntry { * taken on the legacy single-replica / RWO-PVC path). */ releaseConvLock?: () => Promise; + /** + * Mark this worker's pending exit as deliberate (set by `killWorker` before + * SIGTERM). The `exit` handler reads it to distinguish an operator-driven + * stop (scale-to-0, idle reap, delete) from a genuine crash, so only crashes + * notify the user. Closure setter, same pattern as `releaseConvLock`. + */ + markIntentionalExit?: () => void; } /** Stable namespace id for `pg_advisory_lock(key1, key2)` per-conversation locks. */ @@ -779,6 +786,11 @@ export class EmbeddedDeploymentManager extends BaseDeploymentManager { } }; + // Flipped by `killWorker` (via the entry's `markIntentionalExit`) before a + // deliberate SIGTERM, so the `exit` handler only notifies the user on a + // genuine crash / external kill, never on an operator-driven stop. + let intentionalExit = false; + // Spawn errors (binary missing, EACCES, fork failure) fire on the child // *after* spawn() returns, so without an "error" listener Node would // throw an unhandled exception and crash the gateway. Drop the entry @@ -820,6 +832,21 @@ export class EmbeddedDeploymentManager extends BaseDeploymentManager { } else { logger.info(`Embedded worker ${deploymentName} exited cleanly`); } + + // Surface an unexpected worker death to the user. A worker that spawns + // and then dies never rejects createWorkerDeployment (it resolved on + // spawn) and so never reaches trackFailedDeployment; its queued message + // would otherwise strand with no terminal event and the run hangs + // (lobu-ai/lobu#946). Deliberate stops go through killWorker, which sets + // `intentionalExit`, so only genuine crashes / external kills notify. + // Note: a long-lived worker handles later turns too, so messageData is + // the deployment's originating message; the notifier keys delivery on + // its conversationId, which is stable across the conversation's turns. + const cleanExit = code === 0 && !signal; + if (!intentionalExit && !cleanExit && messageData) { + const reason = signal ? `signal ${signal}` : `exit code ${code}`; + void this.workerExitNotifier?.({ deploymentName, messageData, reason }); + } }); this.workers.set(deploymentName, { @@ -831,6 +858,9 @@ export class EmbeddedDeploymentManager extends BaseDeploymentManager { // tests. The exit handler is the authoritative release site; // killWorker no longer touches this field. ...(convLock ? { releaseConvLock: releaseLockOnce } : {}), + markIntentionalExit: () => { + intentionalExit = true; + }, }); logger.info( @@ -909,6 +939,12 @@ export class EmbeddedDeploymentManager extends BaseDeploymentManager { ): Promise { const child = entry.process; + // Mark the imminent exit as deliberate so the spawn's `exit` handler + // doesn't mistake this operator-driven stop for a crash and notify the + // user. Safe to call after the map delete below — the entry reference (and + // its closure setter) outlives the map entry. + entry.markIntentionalExit?.(); + // Delete from the map up front so callers see an empty // listDeployments() the moment kill returns — the public contract // hasn't changed. The lock release is deliberately NOT touched here diff --git a/packages/server/src/gateway/orchestration/message-consumer.ts b/packages/server/src/gateway/orchestration/message-consumer.ts index 86a0513b9..585bafe49 100644 --- a/packages/server/src/gateway/orchestration/message-consumer.ts +++ b/packages/server/src/gateway/orchestration/message-consumer.ts @@ -53,6 +53,50 @@ export class MessageConsumer { this.config = config; this.deploymentManager = deploymentManager; this.queue = new RunsQueue(); + // Surface unexpected worker deaths (crash / external kill after a + // successful spawn) to the affected conversation. Without this the worker's + // queued message strands with no terminal event and the run hangs — the + // deployment manager only logs the exit (lobu-ai/lobu#946). + this.deploymentManager.setWorkerExitNotifier((info) => + this.notifyWorkerCrash(info) + ); + } + + /** + * Emit a terminal error to the conversation whose worker died unexpectedly. + * Routed through the `error` field (not `content`, which only the ephemeral + * branch renders) so it surfaces end-to-end: an `error` SSE event for direct + * API clients (`lobu chat` prints it and exits 1) and an "Error: …" message + * on chat platforms. Best-effort — a notify failure must not crash the + * deployment manager's exit handler. + */ + private async notifyWorkerCrash(info: { + deploymentName: string; + messageData: MessagePayload; + reason: string; + }): Promise { + const { messageData, reason } = info; + if (!messageData.conversationId) return; + const userMessage = `The worker handling your request stopped unexpectedly (${reason}) before it could reply. Please retry in a moment.`; + try { + const responseQueue = "thread_response"; + await this.queue.createQueue(responseQueue); + await this.queue.send(responseQueue, { + messageId: messageData.messageId, + userId: messageData.userId, + channelId: messageData.channelId, + conversationId: messageData.conversationId, + platform: messageData.platform, + platformMetadata: messageData.platformMetadata, + error: userMessage, + processedMessageIds: [messageData.messageId], + }); + } catch (err) { + logger.error( + { deploymentName: info.deploymentName, err: String(err) }, + "Failed to notify user of unexpected worker exit" + ); + } } /** From 5bba5c438ad6e3e69037d9466507989dc09ac111 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Burak=20Emre=20Kabakc=C4=B1?= Date: Wed, 20 May 2026 14:10:55 +0100 Subject: [PATCH 5/7] fix(server): harden unexpected-worker-death notification (codex review) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Three reliability fixes from a codex pass on the worker-exit handling: 1. Spawn errors now notify. A missing/​unexecutable worker binary fires the child's `error` event (not `exit`), and Node may emit no `exit` after it — so the queued turn stranded with no terminal event despite the new exit handler. Route `error` through the same notifier; a one-shot guard dedups if `exit` also fires. 2. Notify for the in-flight turn, not the deployment's first message. A long-lived worker serves many turns; the notice was keyed to the originating message. Track the latest dispatched message per worker (`recordInFlightMessage`, advanced from sendToWorkerQueue) so a crash on turn N reports turn N. 3. Stay silent when no turn is outstanding. Gate notifyWorkerCrash on the thread queue having waiting+active work — a worker that already replied and then dies (idle crash / reap) has no pending or claimed message, so it no longer emits a false "before it could reply". Fails open if the stats lookup throws. Live-reverified (PGlite + worker that exits 1): SSE still emits connected → error → complete, now carrying the correct in-flight messageId. --- .../worker-startup-failure-notice.test.ts | 50 ++++++++++----- .../orchestration/base-deployment-manager.ts | 12 ++++ .../orchestration/impl/embedded-deployment.ts | 61 +++++++++++++++---- .../gateway/orchestration/message-consumer.ts | 29 +++++++++ 4 files changed, 125 insertions(+), 27 deletions(-) diff --git a/packages/server/src/gateway/__tests__/worker-startup-failure-notice.test.ts b/packages/server/src/gateway/__tests__/worker-startup-failure-notice.test.ts index 3ada78266..057f8cf14 100644 --- a/packages/server/src/gateway/__tests__/worker-startup-failure-notice.test.ts +++ b/packages/server/src/gateway/__tests__/worker-startup-failure-notice.test.ts @@ -88,30 +88,42 @@ describe("worker-startup-failure notice reaches direct-API clients (lobu-ai/lobu expect(notice?.data.processedMessageIds).toEqual(["m1"]); }); - test("unexpected worker exit (crash after spawn) notifies the conversation via `error`", async () => { + function crashConsumer(stats: { + waiting: number; + active: number; + }): { consumer: any; sends: Array<{ queue: string; data: any }> } { const consumer = new MessageConsumer({} as any, { setWorkerExitNotifier: () => undefined, } as any) as any; - const sends: Array<{ queue: string; data: Record }> = []; + const sends: Array<{ queue: string; data: any }> = []; consumer.queue = { createQueue: mock(async () => undefined), - send: mock(async (queue: string, data: Record) => { + getQueueStats: mock(async () => ({ ...stats, completed: 0, failed: 0 })), + send: mock(async (queue: string, data: any) => { sends.push({ queue, data }); }), }; + return { consumer, sends }; + } - await consumer.notifyWorkerCrash({ - deploymentName: "lobu-worker-api-abc", - messageData: { - messageId: "m1", - userId: "u1", - channelId: "api_u1", - conversationId: "conv-1", - platform: "api", - platformMetadata: {}, - }, - reason: "exit code 1", - }); + const crashInfo = { + deploymentName: "lobu-worker-api-abc", + messageData: { + messageId: "m1", + userId: "u1", + channelId: "api_u1", + conversationId: "conv-1", + platform: "api", + platformMetadata: {}, + }, + reason: "exit code 1", + }; + + test("unexpected worker exit with an outstanding turn notifies via `error`", async () => { + // A claimed-but-incomplete message shows as active>0 at crash time. + const { consumer, sends } = crashConsumer({ waiting: 0, active: 1 }); + + await consumer.notifyWorkerCrash(crashInfo); const notice = sends.find((s) => s.queue === "thread_response"); expect(notice).toBeDefined(); @@ -121,6 +133,14 @@ describe("worker-startup-failure notice reaches direct-API clients (lobu-ai/lobu expect(notice?.data.processedMessageIds).toEqual(["m1"]); }); + test("worker exit with NO outstanding turn (idle / already replied) sends nothing", async () => { + const { consumer, sends } = crashConsumer({ waiting: 0, active: 0 }); + + await consumer.notifyWorkerCrash(crashInfo); + + expect(sends.find((s) => s.queue === "thread_response")).toBeUndefined(); + }); + test("an `error` notice is surfaced to the API renderer (handleError + completion)", async () => { const { consumer, renderer } = createApiConsumer(); diff --git a/packages/server/src/gateway/orchestration/base-deployment-manager.ts b/packages/server/src/gateway/orchestration/base-deployment-manager.ts index 55a7d2ffe..6621eb2f1 100644 --- a/packages/server/src/gateway/orchestration/base-deployment-manager.ts +++ b/packages/server/src/gateway/orchestration/base-deployment-manager.ts @@ -214,6 +214,18 @@ export abstract class BaseDeploymentManager { this.workerExitNotifier = notifier; } + /** + * Advance a live worker's in-flight message to the latest dispatched turn, so + * `workerExitNotifier` reports the message actually being served rather than + * the deployment's first. No-op default (no running-worker registry to + * update); the embedded manager overrides it. Safe to call when no worker is + * tracked yet — the spawn path seeds the originating message. + */ + recordInFlightMessage( + _deploymentName: string, + _messageData: MessagePayload + ): void {} + constructor( config: OrchestratorConfig, moduleEnvVarsBuilder?: ModuleEnvVarsBuilder, diff --git a/packages/server/src/gateway/orchestration/impl/embedded-deployment.ts b/packages/server/src/gateway/orchestration/impl/embedded-deployment.ts index 1f61d9fc2..3f9b427a2 100644 --- a/packages/server/src/gateway/orchestration/impl/embedded-deployment.ts +++ b/packages/server/src/gateway/orchestration/impl/embedded-deployment.ts @@ -144,6 +144,12 @@ interface EmbeddedWorkerEntry { * notify the user. Closure setter, same pattern as `releaseConvLock`. */ markIntentionalExit?: () => void; + /** + * Advance the worker's in-flight message to the latest dispatched turn, so a + * crash notifies for the message actually being served. Closure setter, same + * pattern as `releaseConvLock` / `markIntentionalExit`. + */ + setCurrentMessage?: (m: MessagePayload) => void; } /** Stable namespace id for `pg_advisory_lock(key1, key2)` per-conversation locks. */ @@ -787,20 +793,43 @@ export class EmbeddedDeploymentManager extends BaseDeploymentManager { }; // Flipped by `killWorker` (via the entry's `markIntentionalExit`) before a - // deliberate SIGTERM, so the `exit` handler only notifies the user on a + // deliberate SIGTERM, so the death handlers only notify the user on a // genuine crash / external kill, never on an operator-driven stop. let intentionalExit = false; + // The message whose turn this worker is currently serving. Seeded with the + // deployment's originating message and advanced by `recordInFlightMessage` + // on each subsequent dispatch, so a crash on turn N notifies for turn N's + // message — not the deployment's first message (a long-lived worker serves + // many turns). The orchestrator additionally gates on outstanding queue + // work, so a crash while idle (turn already completed) sends nothing. + let currentMessage: MessagePayload | undefined = messageData; + + // `error` and `exit` can both fire for one death; notify at most once. + let deathNotified = false; + const notifyUnexpectedDeath = (reason: string): void => { + if (deathNotified || intentionalExit || !currentMessage) return; + deathNotified = true; + void this.workerExitNotifier?.({ + deploymentName, + messageData: currentMessage, + reason, + }); + }; + // Spawn errors (binary missing, EACCES, fork failure) fire on the child - // *after* spawn() returns, so without an "error" listener Node would - // throw an unhandled exception and crash the gateway. Drop the entry - // and log so the next ensureDeployment can retry cleanly. + // *after* spawn() returns, so spawnDeployment has already resolved and + // trackFailedDeployment is never reached. Node may emit `error` WITHOUT a + // following `exit`, so the queued turn would otherwise strand with no + // terminal event (lobu-ai/lobu#946). Notify here too; the one-shot guard + // dedups if `exit` also fires. child.once("error", (err) => { logger.error( `Embedded worker ${deploymentName} spawn error: ${err.message}` ); this.workers.delete(deploymentName); releaseLockOnce(); + notifyUnexpectedDeath(`spawn error: ${err.message}`); }); child.stdout?.on("data", (data: Buffer) => { @@ -835,17 +864,15 @@ export class EmbeddedDeploymentManager extends BaseDeploymentManager { // Surface an unexpected worker death to the user. A worker that spawns // and then dies never rejects createWorkerDeployment (it resolved on - // spawn) and so never reaches trackFailedDeployment; its queued message + // spawn) and so never reaches trackFailedDeployment; its queued turn // would otherwise strand with no terminal event and the run hangs // (lobu-ai/lobu#946). Deliberate stops go through killWorker, which sets - // `intentionalExit`, so only genuine crashes / external kills notify. - // Note: a long-lived worker handles later turns too, so messageData is - // the deployment's originating message; the notifier keys delivery on - // its conversationId, which is stable across the conversation's turns. + // `intentionalExit`. A clean (code 0) exit owes the user nothing. The + // orchestrator's notifier additionally drops the notice when no turn is + // outstanding, so an idle worker that exits non-zero stays silent. const cleanExit = code === 0 && !signal; - if (!intentionalExit && !cleanExit && messageData) { - const reason = signal ? `signal ${signal}` : `exit code ${code}`; - void this.workerExitNotifier?.({ deploymentName, messageData, reason }); + if (!cleanExit) { + notifyUnexpectedDeath(signal ? `signal ${signal}` : `exit code ${code}`); } }); @@ -861,6 +888,9 @@ export class EmbeddedDeploymentManager extends BaseDeploymentManager { markIntentionalExit: () => { intentionalExit = true; }, + setCurrentMessage: (m: MessagePayload) => { + currentMessage = m; + }, }); logger.info( @@ -925,6 +955,13 @@ export class EmbeddedDeploymentManager extends BaseDeploymentManager { } } + override recordInFlightMessage( + deploymentName: string, + messageData: MessagePayload + ): void { + this.workers.get(deploymentName)?.setCurrentMessage?.(messageData); + } + /** Send SIGTERM, then SIGKILL after timeout. Resolves on child exit. * * Does NOT release the conversation lock — the child's exit handler is diff --git a/packages/server/src/gateway/orchestration/message-consumer.ts b/packages/server/src/gateway/orchestration/message-consumer.ts index 585bafe49..5711f244c 100644 --- a/packages/server/src/gateway/orchestration/message-consumer.ts +++ b/packages/server/src/gateway/orchestration/message-consumer.ts @@ -77,6 +77,29 @@ export class MessageConsumer { }): Promise { const { messageData, reason } = info; if (!messageData.conversationId) return; + + // Only notify when a turn is actually outstanding. A long-lived worker that + // already replied and then dies (idle crash, or reaped) has no pending or + // claimed message in its thread queue, so notifying would be a false + // "before it could reply". `active` counts claimed/running rows, so a + // worker that crashed mid-turn (its row still claimed) is correctly caught. + // Fail open: if the stats lookup throws, notify rather than risk a silent + // strand. + try { + const stats = await this.queue.getQueueStats( + `thread_message_${info.deploymentName}` + ); + if (stats.waiting + stats.active === 0) { + logger.info( + { deploymentName: info.deploymentName }, + "Worker exited with no outstanding turn — skipping crash notice" + ); + return; + } + } catch { + // ignore — fall through and notify + } + const userMessage = `The worker handling your request stopped unexpectedly (${reason}) before it could reply. Please retry in a moment.`; try { const responseQueue = "thread_response"; @@ -495,6 +518,12 @@ export class MessageConsumer { logger.info( `✅ Sent message to thread queue ${threadQueueName} for conversation ${data.conversationId}, jobId: ${jobId}` ); + + // Advance the worker's in-flight message to this turn so an unexpected + // worker death notifies for the message actually being served, not the + // deployment's first. No-op until the worker entry exists (the spawn + // path seeds the originating message for a brand-new deployment). + this.deploymentManager.recordInFlightMessage(deploymentName, data); } catch (error) { logger.error(`❌ [ERROR] sendToWorkerQueue failed:`, error); throw new OrchestratorError( From 1b8e175077408b49ab56d2aeb1a8240e169f8a34 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Burak=20Emre=20Kabakc=C4=B1?= Date: Wed, 20 May 2026 14:20:35 +0100 Subject: [PATCH 6/7] fix(server): drop the queue-depth gate on worker-crash notice (codex round 2) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit The previous commit gated notifyWorkerCrash on the thread-message queue having waiting+active work, to avoid a false notice when an idle worker dies after already replying. But that gate is wrong: the thread-message row is marked completed the moment the worker acknowledges *receipt* (before it processes or replies), not after it replies — so during the entire agent-processing window the queue shows 0 outstanding. A crash there would be suppressed, re-stranding the turn with no terminal event — the exact silent-hang #946 fixes. Codex caught this. Remove the gate: notify on every unexpected, non-clean, non-intentional worker death. The accepted edge is benign — a long-lived worker that dies while idle after replying may emit a spurious notice; for the direct-API/CLI path that session already got `complete` and closed (no-op), and on chat platforms it's a rare non-destructive false alarm. Notice wording is now neutral on whether a reply happened. Surfacing a real mid-turn crash is worth that tradeoff; eliminating the edge entirely needs dispatch→terminal-response in-flight tracking shared across the response consumer (deferred). --- .../worker-startup-failure-notice.test.ts | 46 ++++++------------- .../gateway/orchestration/message-consumer.ts | 36 ++++++--------- 2 files changed, 26 insertions(+), 56 deletions(-) diff --git a/packages/server/src/gateway/__tests__/worker-startup-failure-notice.test.ts b/packages/server/src/gateway/__tests__/worker-startup-failure-notice.test.ts index 057f8cf14..538e7bef4 100644 --- a/packages/server/src/gateway/__tests__/worker-startup-failure-notice.test.ts +++ b/packages/server/src/gateway/__tests__/worker-startup-failure-notice.test.ts @@ -88,42 +88,30 @@ describe("worker-startup-failure notice reaches direct-API clients (lobu-ai/lobu expect(notice?.data.processedMessageIds).toEqual(["m1"]); }); - function crashConsumer(stats: { - waiting: number; - active: number; - }): { consumer: any; sends: Array<{ queue: string; data: any }> } { + test("unexpected worker death notifies the conversation via `error`", async () => { const consumer = new MessageConsumer({} as any, { setWorkerExitNotifier: () => undefined, } as any) as any; const sends: Array<{ queue: string; data: any }> = []; consumer.queue = { createQueue: mock(async () => undefined), - getQueueStats: mock(async () => ({ ...stats, completed: 0, failed: 0 })), send: mock(async (queue: string, data: any) => { sends.push({ queue, data }); }), }; - return { consumer, sends }; - } - const crashInfo = { - deploymentName: "lobu-worker-api-abc", - messageData: { - messageId: "m1", - userId: "u1", - channelId: "api_u1", - conversationId: "conv-1", - platform: "api", - platformMetadata: {}, - }, - reason: "exit code 1", - }; - - test("unexpected worker exit with an outstanding turn notifies via `error`", async () => { - // A claimed-but-incomplete message shows as active>0 at crash time. - const { consumer, sends } = crashConsumer({ waiting: 0, active: 1 }); - - await consumer.notifyWorkerCrash(crashInfo); + await consumer.notifyWorkerCrash({ + deploymentName: "lobu-worker-api-abc", + messageData: { + messageId: "m1", + userId: "u1", + channelId: "api_u1", + conversationId: "conv-1", + platform: "api", + platformMetadata: {}, + }, + reason: "exit code 1", + }); const notice = sends.find((s) => s.queue === "thread_response"); expect(notice).toBeDefined(); @@ -133,14 +121,6 @@ describe("worker-startup-failure notice reaches direct-API clients (lobu-ai/lobu expect(notice?.data.processedMessageIds).toEqual(["m1"]); }); - test("worker exit with NO outstanding turn (idle / already replied) sends nothing", async () => { - const { consumer, sends } = crashConsumer({ waiting: 0, active: 0 }); - - await consumer.notifyWorkerCrash(crashInfo); - - expect(sends.find((s) => s.queue === "thread_response")).toBeUndefined(); - }); - test("an `error` notice is surfaced to the API renderer (handleError + completion)", async () => { const { consumer, renderer } = createApiConsumer(); diff --git a/packages/server/src/gateway/orchestration/message-consumer.ts b/packages/server/src/gateway/orchestration/message-consumer.ts index 5711f244c..c5e44bf24 100644 --- a/packages/server/src/gateway/orchestration/message-consumer.ts +++ b/packages/server/src/gateway/orchestration/message-consumer.ts @@ -78,29 +78,19 @@ export class MessageConsumer { const { messageData, reason } = info; if (!messageData.conversationId) return; - // Only notify when a turn is actually outstanding. A long-lived worker that - // already replied and then dies (idle crash, or reaped) has no pending or - // claimed message in its thread queue, so notifying would be a false - // "before it could reply". `active` counts claimed/running rows, so a - // worker that crashed mid-turn (its row still claimed) is correctly caught. - // Fail open: if the stats lookup throws, notify rather than risk a silent - // strand. - try { - const stats = await this.queue.getQueueStats( - `thread_message_${info.deploymentName}` - ); - if (stats.waiting + stats.active === 0) { - logger.info( - { deploymentName: info.deploymentName }, - "Worker exited with no outstanding turn — skipping crash notice" - ); - return; - } - } catch { - // ignore — fall through and notify - } - - const userMessage = `The worker handling your request stopped unexpectedly (${reason}) before it could reply. Please retry in a moment.`; + // Notify on every unexpected death. We deliberately do NOT gate on the + // thread-message queue depth: that row is marked completed the moment the + // worker acknowledges *receipt* (before it processes/replies), so a crash + // during the long processing window would show 0 outstanding and be + // suppressed — re-stranding the turn (the exact silent-hang #946 is about). + // The cost of not gating is a benign edge: if a long-lived worker dies + // while idle *after* already replying, the conversation may see a spurious + // "stopped unexpectedly" notice. For the direct-API/CLI path that session + // has already received `complete` and closed, so the late event is a no-op; + // on chat platforms it is a rare, non-destructive false alarm. Surfacing a + // real mid-turn crash is worth that. Wording stays neutral on whether a + // reply happened. + const userMessage = `The worker handling your request stopped unexpectedly (${reason}). Please retry in a moment.`; try { const responseQueue = "thread_response"; await this.queue.createQueue(responseQueue); From 46bbc5dd875d7c6644d60d91d01cf5bffec8ae5c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Burak=20Emre=20Kabakc=C4=B1?= Date: Wed, 20 May 2026 14:42:59 +0100 Subject: [PATCH 7/7] docs(agents): require multi-replica k8s correctness on every change --- AGENTS.md | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/AGENTS.md b/AGENTS.md index 8349ad40d..2b4c78489 100644 --- a/AGENTS.md +++ b/AGENTS.md @@ -40,8 +40,13 @@ All chat platforms (Telegram, Slack, Discord, WhatsApp, Teams) run through Chat `mode: "polling"` is rejected when `LOBU_CLOUD_MODE=1` — a polling worker shares one Telegram edge connection across tenants, so a misbehaving one degrades delivery for everyone. Self-hosters (`LOBU_CLOUD_MODE` unset/0) keep polling for tunnel-less dev. #### Orchestration -- **Embedded-only deployment.** Gateway, workers, embeddings, and the Lobu memory backend run in one Node process (`lobu run`, or `bun run dev` in the monorepo). Workers spawn from `EmbeddedDeploymentManager` as `child_process.spawn` subprocesses with `cwd = ./workspaces/{agentId}/` and `WORKSPACE_DIR` env. On Linux production hosts the manager wraps the spawn in `systemd-run --user --scope` (MemoryMax, CPUQuota, IPAddressDeny=any + IPAddressAllow=127.0.0.1, capability drops). No Docker or Kubernetes. +- **Embedded process model.** Within a single app process the gateway, the worker *orchestrator*, embeddings, and the Lobu memory backend run together. Workers spawn from `EmbeddedDeploymentManager` as `child_process.spawn` subprocesses with `cwd = ./workspaces/{agentId}/` and `WORKSPACE_DIR` env (on Linux, wrapped in `systemd-run --user --scope` for MemoryMax/CPUQuota/IPAddressDeny+capability drops). "No Docker/Kubernetes" applies to **worker orchestration only** — workers are child processes, never pods. The app process itself is a different story (see below). - Postgres (with `pgvector`; optionally `postgis` for geo enrichment) is the only user-provided external. The Node process connects out via `DATABASE_URL`. Runtime state — queues, chat connection rows, grant cache, MCP proxy sessions — lives in dedicated Postgres tables. +- **🚨 Multi-replica k8s is the production reality — every change MUST be correct under N>1 app replicas.** The app ships as a k8s `Deployment` (`charts/lobu`) whose `app.replicaCount` is routinely >1. The Service uses `sessionAffinity: ClientIP` because per-pod state — `SseManager` connections **and** its event backlog (`sse-manager.ts`), the in-process `workers` map, the deployment-creation lock cache — is **in-memory and pod-local with no cross-pod fan-out**. Consequences you must respect on EVERY task: + - A client's SSE stream, its `POST /messages`, and its conversation's worker are co-located on one pod **only** because ClientIP affinity pins them. Don't assume two requests for the same conversation hit the same pod for any other reason. + - Cross-replica delivery rides Postgres: a worker reply reaches the client's SSE pod via the `thread_response` queue (any pod's consumer may claim a row and broadcasts to *its* local `SseManager`). An event broadcast on the wrong pod is silently dropped. + - **Never introduce shared state as an in-memory Map/singleton that another replica needs to read or mutate.** Per-pod in-memory state is fine only for data that pod exclusively owns (its own SSE connections, its own spawned workers). Anything that must be observed/coordinated across replicas goes in Postgres. + - Before claiming a feature works, answer explicitly: *"does this still hold with 3 app replicas behind ClientIP affinity?"* If a fix relies on one component (dispatch) seeing another component's event (completion) and they can land on different pods, it is broken in prod — use a Postgres-mediated signal instead. - Workers are sandboxed and **never see real credentials**. The gateway's `secret-proxy` swaps `lobu_secret_` placeholders for real keys at egress; workers receive only the placeholders. #### MCP