7 changes: 6 additions & 1 deletion AGENTS.md
@@ -40,8 +40,13 @@ All chat platforms (Telegram, Slack, Discord, WhatsApp, Teams) run through Chat
`mode: "polling"` is rejected when `LOBU_CLOUD_MODE=1` — a polling worker shares one Telegram edge connection across tenants, so a misbehaving one degrades delivery for everyone. Self-hosters (`LOBU_CLOUD_MODE` unset/0) keep polling for tunnel-less dev.

#### Orchestration
- **Embedded-only deployment.** Gateway, workers, embeddings, and the Lobu memory backend run in one Node process (`lobu run`, or `bun run dev` in the monorepo). Workers spawn from `EmbeddedDeploymentManager` as `child_process.spawn` subprocesses with `cwd = ./workspaces/{agentId}/` and `WORKSPACE_DIR` env. On Linux production hosts the manager wraps the spawn in `systemd-run --user --scope` (MemoryMax, CPUQuota, IPAddressDeny=any + IPAddressAllow=127.0.0.1, capability drops). No Docker or Kubernetes.
- **Embedded process model.** Within a single app process the gateway, the worker *orchestrator*, embeddings, and the Lobu memory backend run together. Workers spawn from `EmbeddedDeploymentManager` as `child_process.spawn` subprocesses with `cwd = ./workspaces/{agentId}/` and `WORKSPACE_DIR` env (on Linux, wrapped in `systemd-run --user --scope` for MemoryMax/CPUQuota/IPAddressDeny+capability drops). "No Docker/Kubernetes" applies to **worker orchestration only** — workers are child processes, never pods. The app process itself is a different story (see below).
- Postgres (with `pgvector`; optionally `postgis` for geo enrichment) is the only user-provided external. The Node process connects out via `DATABASE_URL`. Runtime state — queues, chat connection rows, grant cache, MCP proxy sessions — lives in dedicated Postgres tables.
- **🚨 Multi-replica k8s is the production reality — every change MUST be correct under N>1 app replicas.** The app ships as a k8s `Deployment` (`charts/lobu`) whose `app.replicaCount` is routinely >1. The Service uses `sessionAffinity: ClientIP` because per-pod state — `SseManager` connections **and** its event backlog (`sse-manager.ts`), the in-process `workers` map, the deployment-creation lock cache — is **in-memory and pod-local with no cross-pod fan-out**. Consequences you must respect on EVERY task:
  - A client's SSE stream, its `POST /messages`, and its conversation's worker are co-located on one pod **only** because ClientIP affinity pins them. Don't assume two requests for the same conversation hit the same pod for any other reason.
  - Cross-replica delivery rides Postgres: a worker reply reaches the client's SSE pod via the `thread_response` queue (any pod's consumer may claim a row, and it then broadcasts to *its* local `SseManager`). An event broadcast on the wrong pod is silently dropped.
  - **Never introduce shared state as an in-memory Map/singleton that another replica needs to read or mutate.** Per-pod in-memory state is fine only for data that pod exclusively owns (its own SSE connections, its own spawned workers). Anything that must be observed/coordinated across replicas goes in Postgres.
  - Before claiming a feature works, answer explicitly: *"does this still hold with 3 app replicas behind ClientIP affinity?"* If a fix relies on one component (dispatch) seeing another component's event (completion) and they can land on different pods, it is broken in prod; use a Postgres-mediated signal instead.
- Workers are sandboxed and **never see real credentials**. The gateway's `secret-proxy` swaps `lobu_secret_<uuid>` placeholders for real keys at egress; workers receive only the placeholders.
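
The multi-replica rule above can be illustrated with a small, self-contained sketch. Everything in it is hypothetical: `SharedStore` is only an in-process stand-in for a Postgres table, and `Pod` is a toy model of one app replica, not a class from the codebase. It shows why a completion recorded in a pod-local `Map` is invisible to the pod that dispatched the work, while one recorded in shared storage is not.

```typescript
// Hypothetical stand-in for a Postgres table. In the real system this would be
// a row written and read over DATABASE_URL, not an in-process object.
class SharedStore {
  private rows = new Map<string, string>();

  set(key: string, value: string): void {
    this.rows.set(key, value);
  }

  get(key: string): string | undefined {
    return this.rows.get(key);
  }
}

// Hypothetical toy model of one app replica.
class Pod {
  // Pod-local state: no other replica can read or mutate this Map.
  private localCompletions = new Map<string, string>();

  constructor(
    readonly name: string,
    private shared: SharedStore
  ) {}

  recordCompletionLocally(conversationId: string, status: string): void {
    this.localCompletions.set(conversationId, status);
  }

  recordCompletionShared(conversationId: string, status: string): void {
    this.shared.set(conversationId, status);
  }

  seesCompletionLocally(conversationId: string): boolean {
    return this.localCompletions.has(conversationId);
  }

  seesCompletionShared(conversationId: string): boolean {
    return this.shared.get(conversationId) !== undefined;
  }
}

const store = new SharedStore();
const podA = new Pod("pod-a", store); // dispatched the turn
const podB = new Pod("pod-b", store); // happens to observe the completion

// Anti-pattern: pod B remembers the completion in memory; pod A never sees it.
podB.recordCompletionLocally("conv-1", "done");
const visibleViaLocalMap = podA.seesCompletionLocally("conv-1");

// Shared signal: pod B writes to the store; pod A can read it.
podB.recordCompletionShared("conv-1", "done");
const visibleViaSharedStore = podA.seesCompletionShared("conv-1");

console.log({ visibleViaLocalMap, visibleViaSharedStore });
```

With a single replica both approaches behave identically, which is why the "3 app replicas" question has to be asked explicitly rather than discovered in testing.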

#### MCP
@@ -0,0 +1,210 @@
import { beforeAll, describe, expect, mock, test } from "bun:test";

import { ApiResponseRenderer } from "../api/response-renderer.js";
import { MessageConsumer } from "../orchestration/message-consumer.js";
import { UnifiedThreadResponseConsumer } from "../platform/unified-thread-consumer.js";

// `new RunsQueue()` (built inside the MessageConsumer constructor) only guards
// on DATABASE_URL being present — it does not connect. We immediately replace
// the queue with a recording fake before exercising any method, so no Postgres
// is touched.
beforeAll(() => {
  process.env.DATABASE_URL ||= "postgres://test/test";
});

const apiPayloadBase = {
  messageId: "m1",
  channelId: "api_u1",
  conversationId: "conv-1",
  userId: "u1",
  teamId: "api",
  platform: "api",
  timestamp: 0,
  // Direct-API sessions carry no `sessionId` in platformMetadata, so the
  // consumer's cli-session broadcasts are skipped and every SSE event comes
  // from the renderer keyed on `conversationId`.
  platformMetadata: {},
};

function createApiConsumer() {
  const queue = {
    start: mock(async () => undefined),
    stop: mock(async () => undefined),
    createQueue: mock(async () => undefined),
    work: mock(async () => undefined),
  };
  const renderer = {
    handleDelta: mock(async () => "m1"),
    handleContent: mock(async () => undefined),
    handleError: mock(async () => undefined),
    handleCompletion: mock(async () => undefined),
    handleEphemeral: mock(async () => undefined),
    handleStatusUpdate: mock(async () => undefined),
  };
  const platformRegistry = {
    get: mock(() => ({ getResponseRenderer: () => renderer })),
  };
  const sseManager = { broadcast: mock(() => undefined) };
  const consumer = new UnifiedThreadResponseConsumer(
    queue as any,
    platformRegistry as any,
    sseManager as any
  ) as any;
  return { consumer, renderer, sseManager };
}

describe("worker-startup-failure notice reaches direct-API clients (lobu-ai/lobu#946)", () => {
  test("producer emits the failure notice via `error`, not the ephemeral-only `content` field", async () => {
    const consumer = new MessageConsumer({} as any, {
      setWorkerExitNotifier: () => undefined,
    } as any) as any;
    const sends: Array<{ queue: string; data: Record<string, unknown> }> = [];
    consumer.queue = {
      createQueue: mock(async () => undefined),
      send: mock(async (queue: string, data: Record<string, unknown>) => {
        sends.push({ queue, data });
      }),
    };

    await consumer.trackFailedDeployment(
      "deploy-1",
      {
        messageId: "m1",
        userId: "u1",
        channelId: "api_u1",
        conversationId: "conv-1",
        platform: "api",
        platformMetadata: {},
      },
      new Error("spawn ENOENT")
    );

    const notice = sends.find((s) => s.queue === "thread_response");
    expect(notice).toBeDefined();
    // The fix: the notice rides the `error` field (rendered end-to-end), not
    // `content` (only the ephemeral branch renders content -> silently dropped).
    expect(notice?.data.error).toMatch(/Worker startup failed/);
    expect(notice?.data.content).toBeUndefined();
    expect(notice?.data.processedMessageIds).toEqual(["m1"]);
  });

  test("unexpected worker death notifies the conversation via `error`", async () => {
    const consumer = new MessageConsumer({} as any, {
      setWorkerExitNotifier: () => undefined,
    } as any) as any;
    const sends: Array<{ queue: string; data: any }> = [];
    consumer.queue = {
      createQueue: mock(async () => undefined),
      send: mock(async (queue: string, data: any) => {
        sends.push({ queue, data });
      }),
    };

    await consumer.notifyWorkerCrash({
      deploymentName: "lobu-worker-api-abc",
      messageData: {
        messageId: "m1",
        userId: "u1",
        channelId: "api_u1",
        conversationId: "conv-1",
        platform: "api",
        platformMetadata: {},
      },
      reason: "exit code 1",
    });

    const notice = sends.find((s) => s.queue === "thread_response");
    expect(notice).toBeDefined();
    expect(notice?.data.error).toMatch(/stopped unexpectedly \(exit code 1\)/);
    expect(notice?.data.content).toBeUndefined();
    expect(notice?.data.conversationId).toBe("conv-1");
    expect(notice?.data.processedMessageIds).toEqual(["m1"]);
  });

  test("an `error` notice is surfaced to the API renderer (handleError + completion)", async () => {
    const { consumer, renderer } = createApiConsumer();

    await consumer.handleThreadResponse({
      id: "job-err",
      data: {
        ...apiPayloadBase,
        error: "Worker startup failed and your request could not be processed.",
        processedMessageIds: ["m1"],
      },
    });

    expect(renderer.handleError).toHaveBeenCalledTimes(1);
    expect(renderer.handleCompletion).toHaveBeenCalledTimes(1);
  });

  test("a non-ephemeral `content` notice is surfaced via handleContent, then completes (scalable router fix)", async () => {
    const { consumer, renderer, sseManager } = createApiConsumer();

    await consumer.handleThreadResponse({
      id: "job-content",
      data: {
        ...apiPayloadBase,
        // A human-readable message in `content` with no `ephemeral` flag.
        // Pre-fix this was dropped (only `complete` fired); now the router
        // renders it through handleContent so no notice silently vanishes.
        content: "A buffered, non-streamed notice for the user.",
        processedMessageIds: ["m1"],
      },
    });

    expect(renderer.handleContent).toHaveBeenCalledTimes(1);
    // Falls through to completion so the turn still terminates.
    expect(renderer.handleCompletion).toHaveBeenCalledTimes(1);
    // Not misrouted as a stream chunk, ephemeral, or error.
    expect(renderer.handleDelta).not.toHaveBeenCalled();
    expect(renderer.handleEphemeral).not.toHaveBeenCalled();
    expect(renderer.handleError).not.toHaveBeenCalled();
    void sseManager;
  });

  test("an `ephemeral` content payload still routes to handleEphemeral, not handleContent", async () => {
    const { consumer, renderer } = createApiConsumer();

    await consumer.handleThreadResponse({
      id: "job-ephemeral",
      data: {
        ...apiPayloadBase,
        ephemeral: true,
        content: "Visit https://example.com to authorize.",
      },
    });

    expect(renderer.handleEphemeral).toHaveBeenCalledTimes(1);
    expect(renderer.handleContent).not.toHaveBeenCalled();
  });

  // Real renderer (no mock for the rendering step): confirm the actual SSE
  // event `lobu chat` parses is emitted. The CLI's `output` case reads
  // `data.content` (chat.ts), so the wire shape matters.
  test("real ApiResponseRenderer.handleContent emits an `output` SSE event the CLI renders", async () => {
    const broadcasts: Array<{ session: string; event: string; data: any }> = [];
    const sseManager = {
      broadcast: (session: string, event: string, data: unknown) => {
        broadcasts.push({ session, event, data });
      },
    };
    const renderer = new ApiResponseRenderer(sseManager as any);

    await renderer.handleContent(
      {
        ...apiPayloadBase,
        content: "Worker startup failed and your request could not be processed.",
      } as any,
      "u1:m1"
    );

    expect(broadcasts).toHaveLength(1);
    expect(broadcasts[0]?.event).toBe("output");
    expect(broadcasts[0]?.session).toBe("conv-1");
    expect(broadcasts[0]?.data).toMatchObject({
      type: "delta",
      content: "Worker startup failed and your request could not be processed.",
      messageId: "m1",
    });
  });
});
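
The routing behaviour these tests pin down can be summarised in a minimal sketch. This is not the `UnifiedThreadResponseConsumer` implementation; `route`, `Payload`, and `Call` are hypothetical names, and the function only mirrors what the assertions above require. One point is an explicit assumption: the tests do not say whether the ephemeral branch also fires completion, so the sketch leaves it out.

```typescript
// Hypothetical, simplified payload: only the fields the tests above exercise.
type Payload = { error?: string; content?: string; ephemeral?: boolean };

// Names of the renderer methods a router would invoke, in call order.
type Call =
  | "handleError"
  | "handleEphemeral"
  | "handleContent"
  | "handleCompletion";

// Sketch of the precedence implied by the tests; not the real router.
function route(payload: Payload): Call[] {
  if (payload.error) {
    // An `error` notice is rendered and the turn is terminated.
    return ["handleError", "handleCompletion"];
  }
  if (payload.content && payload.ephemeral) {
    // Assumption: completion is omitted here because no test asserts it.
    return ["handleEphemeral"];
  }
  const calls: Call[] = [];
  if (payload.content) {
    // Non-ephemeral buffered content is rendered instead of being dropped.
    calls.push("handleContent");
  }
  // Falls through to completion so the turn still terminates.
  calls.push("handleCompletion");
  return calls;
}

const errorCalls = route({ error: "Worker startup failed" });
const contentCalls = route({ content: "A buffered notice" });
const ephemeralCalls = route({ content: "Visit the link", ephemeral: true });
const emptyCalls = route({});

console.log({ errorCalls, contentCalls, ephemeralCalls, emptyCalls });
```

Writing the precedence down this way makes the pre-fix failure visible: without the `handleContent` branch, a non-ephemeral `content` payload reaches only the completion step.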
29 changes: 29 additions & 0 deletions packages/server/src/gateway/api/response-renderer.ts
@@ -55,6 +55,35 @@ export class ApiResponseRenderer implements ResponseRenderer {
    return payload.messageId;
  }

  /**
   * Handle a complete, non-streamed message (the `content` field).
   * Broadcasts it to SSE clients as an `output` event so direct-API/CLI
   * clients render it identically to streamed deltas.
   */
  async handleContent(
    payload: ThreadResponsePayload,
    _sessionKey: string
  ): Promise<void> {
    const sessionId =
      (payload.platformMetadata?.sessionId as string) || payload.conversationId;

    if (!sessionId) {
      logger.warn("No session ID found in payload for content broadcast");
      return;
    }

    this.sseManager.broadcast(sessionId, "output", {
      type: "delta",
      content: payload.content,
      timestamp: payload.timestamp || Date.now(),
      messageId: payload.messageId,
    });

    logger.debug(
      `Broadcast content to session ${sessionId}: ${payload.content?.length || 0} chars`
    );
  }

  /**
   * Handle completion of response processing
   * Sends completion event to SSE clients
32 changes: 28 additions & 4 deletions packages/server/src/gateway/connections/chat-response-bridge.ts
@@ -598,6 +598,32 @@ export class ChatResponseBridge implements ResponseRenderer {
  }

  async handleEphemeral(payload: ThreadResponsePayload): Promise<void> {
    await this.postBufferedContent(payload, "ephemeral");
  }

  /**
   * Handle a complete, non-streamed message (the `content` field) — e.g. a
   * gateway-originated notice the worker never streamed as deltas. Posted as
   * a normal in-thread message, identical to the ephemeral path.
   */
  async handleContent(
    payload: ThreadResponsePayload,
    _sessionKey: string
  ): Promise<void> {
    await this.postBufferedContent(payload, "content");
  }

  // --- Private ---

  /**
   * Post a complete `content` message to the platform target. Shared by the
   * ephemeral and content render paths — both deliver a one-shot buffered
   * message (with optional settings link-buttons), they differ only in intent.
   */
  private async postBufferedContent(
    payload: ThreadResponsePayload,
    label: "ephemeral" | "content"
  ): Promise<void> {
    if (!payload.content) return;

    const ctx = this.extractResponseContext(payload);
@@ -641,7 +667,7 @@ export class ChatResponseBridge implements ResponseRenderer {
        } catch (error) {
          logger.warn(
            { connectionId, error: String(error) },
            "Failed to render ephemeral settings button"
            `Failed to render ${label} settings button`
          );
          const fallbackText = `${processedContent}\n\n${linkButtons.map((button) => `${button.text}: ${button.url}`).join("\n")}`;
          await target.post(fallbackText.trim());
@@ -654,13 +680,11 @@ export class ChatResponseBridge implements ResponseRenderer {
    } catch (error) {
      logger.error(
        { connectionId, error: String(error) },
        "Failed to send ephemeral message"
        `Failed to send ${label} message`
      );
    }
  }

  // --- Private ---

  private async resolveTarget(
    instance: any,
    channelId: string,
@@ -188,6 +188,44 @@ export abstract class BaseDeploymentManager {
   */
  private inFlightCreates = new Map<string, Promise<void>>();

  /**
   * Notifier invoked when a worker subprocess dies unexpectedly (a crash or an
   * external kill — NOT a deliberate scale-down / idle reap). A worker that
   * spawns and then exits non-zero never reaches `trackFailedDeployment`
   * (createWorkerDeployment already resolved on spawn), so without this hook
   * its queued message strands with no terminal event and the run hangs
   * (lobu-ai/lobu#946). The orchestrator wires this to surface an error to the
   * affected conversation. Optional: unset = no notification (legacy behavior).
   */
  protected workerExitNotifier?: (info: {
    deploymentName: string;
    messageData: MessagePayload;
    reason: string;
  }) => void | Promise<void>;

  /** Register the unexpected-worker-exit notifier (see `workerExitNotifier`). */
  setWorkerExitNotifier(
    notifier: (info: {
      deploymentName: string;
      messageData: MessagePayload;
      reason: string;
    }) => void | Promise<void>
  ): void {
    this.workerExitNotifier = notifier;
  }

  /**
   * Advance a live worker's in-flight message to the latest dispatched turn, so
   * `workerExitNotifier` reports the message actually being served rather than
   * the deployment's first. No-op default (no running-worker registry to
   * update); the embedded manager overrides it. Safe to call when no worker is
   * tracked yet — the spawn path seeds the originating message.
   */
  recordInFlightMessage(
    _deploymentName: string,
    _messageData: MessagePayload
  ): void {}

  constructor(
    config: OrchestratorConfig,
    moduleEnvVarsBuilder?: ModuleEnvVarsBuilder,