diff --git a/packages/gateway/src/__tests__/base-deployment-grants.test.ts b/packages/gateway/src/__tests__/base-deployment-grants.test.ts index a560beded..33bd1c51e 100644 --- a/packages/gateway/src/__tests__/base-deployment-grants.test.ts +++ b/packages/gateway/src/__tests__/base-deployment-grants.test.ts @@ -13,7 +13,7 @@ class TestDeploymentManager extends BaseDeploymentManager { async listDeployments(): Promise { return []; } - async createDeployment(): Promise { + protected async spawnDeployment(): Promise { /* noop */ } async scaleDeployment(): Promise { @@ -260,3 +260,151 @@ describe("BaseDeploymentManager.syncNetworkConfigGrants", () => { ).toBe(false); }); }); + +/** + * In-flight coalescing for `ensureDeployment` lives in the base class so all + * orchestrators share one implementation. Subclass-specific concerns (Docker + * 409, K8s AlreadyExists, embedded `workers.has` short-circuit) are tested + * separately in their own files. + */ +describe("BaseDeploymentManager.ensureDeployment in-flight coalescing", () => { + class CountingManager extends BaseDeploymentManager { + spawnCalls = 0; + /** Resolver for the most recent spawn — lets tests hold spawn open. */ + releaseSpawn: () => void = () => { + // replaced by each test before spawn runs + }; + + async listDeployments(): Promise { + return []; + } + protected async spawnDeployment(): Promise { + this.spawnCalls++; + await new Promise((resolve) => { + this.releaseSpawn = resolve; + }); + } + async scaleDeployment(): Promise { + /* noop */ + } + async deleteDeployment(): Promise { + /* noop */ + } + async updateDeploymentActivity(): Promise { + /* noop */ + } + async validateWorkerImage(): Promise { + /* noop */ + } + protected getDispatcherHost(): string { + return "localhost"; + } + } + + test("concurrent calls for the same name share a single spawn", async () => { + const manager = new CountingManager(TEST_CONFIG); + + const p1 = manager.ensureDeployment("worker-1", "u", "u", buildPayload({})); + const p2 = manager.ensureDeployment("worker-1", "u", "u", buildPayload({})); + const p3 = manager.ensureDeployment("worker-1", "u", "u", buildPayload({})); + + // All three callers are blocked on the single in-flight spawn. + expect(manager.spawnCalls).toBe(1); + + manager.releaseSpawn(); + await Promise.all([p1, p2, p3]); + + expect(manager.spawnCalls).toBe(1); + }); + + test("concurrent calls for different names spawn independently", async () => { + const manager = new CountingManager(TEST_CONFIG); + + const p1 = manager.ensureDeployment("worker-1", "u", "u", buildPayload({})); + // Capture the first spawn's resolver before the second call overwrites it. + const release1 = manager.releaseSpawn; + const p2 = manager.ensureDeployment("worker-2", "u", "u", buildPayload({})); + const release2 = manager.releaseSpawn; + + expect(manager.spawnCalls).toBe(2); + + release1(); + release2(); + await Promise.all([p1, p2]); + }); + + test("after spawn resolves, a subsequent call re-spawns (cache cleared)", async () => { + const manager = new CountingManager(TEST_CONFIG); + + const p1 = manager.ensureDeployment("worker-1", "u", "u", buildPayload({})); + manager.releaseSpawn(); + await p1; + expect(manager.spawnCalls).toBe(1); + + // The base class only dedupes in-flight work — once it settles, a fresh + // call re-invokes spawn. Subclasses are responsible for their own + // post-completion idempotency (e.g. embedded's `workers.has` guard). + const p2 = manager.ensureDeployment("worker-1", "u", "u", buildPayload({})); + manager.releaseSpawn(); + await p2; + expect(manager.spawnCalls).toBe(2); + }); + + test("rejected spawn clears the in-flight entry so the next call retries", async () => { + class FailingManager extends BaseDeploymentManager { + attempts = 0; + async listDeployments(): Promise { + return []; + } + protected async spawnDeployment(): Promise { + this.attempts++; + if (this.attempts === 1) { + throw new Error("transient"); + } + } + async scaleDeployment(): Promise { + /* noop */ + } + async deleteDeployment(): Promise { + /* noop */ + } + async updateDeploymentActivity(): Promise { + /* noop */ + } + async validateWorkerImage(): Promise { + /* noop */ + } + protected getDispatcherHost(): string { + return "localhost"; + } + } + + const manager = new FailingManager(TEST_CONFIG); + await expect( + manager.ensureDeployment("worker-1", "u", "u", buildPayload({})) + ).rejects.toThrow("transient"); + await expect( + manager.ensureDeployment("worker-1", "u", "u", buildPayload({})) + ).resolves.toBeUndefined(); + expect(manager.attempts).toBe(2); + }); +}); + +// --------------------------------------------------------------------------- +// Known coverage gap: K8sDeploymentManager +// --------------------------------------------------------------------------- +// +// There is currently NO test file for `orchestration/impl/k8s/deployment.ts`. +// As a result, the following recently-added behavior is unverified by tests: +// +// 1. The `409 AlreadyExists` short-circuit added in `spawnDeployment` +// (treats concurrent multi-replica creates as benign success and returns +// without touching the PVC). +// 2. The PVC creation / cleanup paths in general. +// 3. The deployment env / pod-spec construction. +// +// In production this path is covered only by manual smoke tests via +// `make deploy` against a real cluster. Adding a `@kubernetes/client-node` +// mock layer is non-trivial (the SDK uses class-based watchers and dynamic +// API discovery), so coverage here was deferred. Track this gap before any +// further changes to k8s/deployment.ts. diff --git a/packages/gateway/src/__tests__/chat-response-bridge.test.ts b/packages/gateway/src/__tests__/chat-response-bridge.test.ts index 72a24f473..2ed0598dd 100644 --- a/packages/gateway/src/__tests__/chat-response-bridge.test.ts +++ b/packages/gateway/src/__tests__/chat-response-bridge.test.ts @@ -32,14 +32,23 @@ function createStreamingTarget() { return { target, collected, plainPosts, drained }; } -function createHarness(target: unknown, platform = "slack") { +function createHarness( + target: unknown, + platform = "telegram", + slackPostMessage?: ReturnType +) { const state = new InMemoryStateAdapter(); const conversationState = new ConversationStateStore(state); + const slackAdapter = slackPostMessage + ? { client: { chat: { postMessage: slackPostMessage } } } + : undefined; const manager = { getInstance: () => ({ connection: { platform }, chat: { channel: () => target, + getAdapter: (name: string) => + name === "slack" ? slackAdapter : undefined, }, conversationState, }), @@ -55,7 +64,7 @@ const basePayload = { userId: "u1", teamId: "t1", timestamp: 0, - platform: "slack", + platform: "telegram", platformMetadata: { connectionId: "conn-1", chatId: "123", @@ -271,6 +280,136 @@ describe("ChatResponseBridge.handleDelta — AsyncIterable streaming", () => { }); }); +describe("ChatResponseBridge.handleDelta — Slack markdown_text path", () => { + test("Slack accumulates deltas and posts via chat.postMessage with markdown_text", async () => { + const { target } = createStreamingTarget(); + const slackPost = mock(async () => ({ ok: true, ts: "1.1" })); + const { manager } = createHarness(target, "slack", slackPost); + const bridge = new ChatResponseBridge(manager as any); + + const slackPayload = { + ...basePayload, + platform: "slack", + channelId: "slack:C123", + conversationId: "slack:C123:1700000000.123456", + platformMetadata: { + connectionId: "conn-1", + chatId: "slack:C123", + }, + }; + + await bridge.handleDelta({ ...slackPayload, delta: "hello " }, "s"); + await bridge.handleDelta({ ...slackPayload, delta: "world" }, "s"); + await bridge.handleCompletion(slackPayload, "s"); + + // SDK target.post should NOT be called for Slack — we use markdown_text. + expect(target.post).not.toHaveBeenCalled(); + expect(slackPost).toHaveBeenCalledTimes(1); + expect(slackPost.mock.calls[0]?.[0]).toMatchObject({ + channel: "C123", + thread_ts: "1700000000.123456", + markdown_text: "hello world", + }); + }); + + test("Slack decodes HTML entities and strips empty markdown links", async () => { + const { target } = createStreamingTarget(); + const slackPost = mock(async () => ({ ok: true, ts: "1.1" })); + const { manager } = createHarness(target, "slack", slackPost); + const bridge = new ChatResponseBridge(manager as any); + + const slackPayload = { + ...basePayload, + platform: "slack", + channelId: "slack:C123", + conversationId: "slack:C123", + platformMetadata: { + connectionId: "conn-1", + chatId: "slack:C123", + }, + }; + + await bridge.handleDelta( + { + ...slackPayload, + delta: "<summary>hi</summary> [foo:1, 2]() bar", + }, + "s" + ); + await bridge.handleCompletion(slackPayload, "s"); + + expect(slackPost).toHaveBeenCalledTimes(1); + expect(slackPost.mock.calls[0]?.[0]).toMatchObject({ + markdown_text: "hi foo:1, 2 bar", + }); + }); + + test("Slack chunks long content on paragraph boundaries", async () => { + const { target } = createStreamingTarget(); + const slackPost = mock(async () => ({ ok: true, ts: "1.1" })); + const { manager } = createHarness(target, "slack", slackPost); + const bridge = new ChatResponseBridge(manager as any); + + const slackPayload = { + ...basePayload, + platform: "slack", + channelId: "slack:C123", + conversationId: "slack:C123", + platformMetadata: { + connectionId: "conn-1", + chatId: "slack:C123", + }, + }; + + // Build content > 11000 chars with explicit paragraph boundaries every ~5000 chars. + const para = `${"x".repeat(5000)}`; + const body = `${para}\n\n${para}\n\n${para}`; + + await bridge.handleDelta({ ...slackPayload, delta: body }, "s"); + await bridge.handleCompletion(slackPayload, "s"); + + expect(slackPost).toHaveBeenCalledTimes(2); + // Each posted chunk must NOT split mid-paragraph. + for (const call of slackPost.mock.calls) { + const md = (call[0] as { markdown_text: string }).markdown_text; + expect(md.length).toBeLessThanOrEqual(11_000); + // Chunks must start/end at paragraph boundaries (no leading/trailing + // partial paragraph fragments split mid-content). + expect(md.startsWith("xxxx")).toBe(true); + } + }); + + test("Slack isFullReplacement discards prior buffered content", async () => { + const { target } = createStreamingTarget(); + const slackPost = mock(async () => ({ ok: true, ts: "1.1" })); + const { manager } = createHarness(target, "slack", slackPost); + const bridge = new ChatResponseBridge(manager as any); + + const slackPayload = { + ...basePayload, + platform: "slack", + channelId: "slack:C123", + conversationId: "slack:C123", + platformMetadata: { + connectionId: "conn-1", + chatId: "slack:C123", + }, + }; + + await bridge.handleDelta({ ...slackPayload, delta: "old streamed" }, "s"); + await bridge.handleDelta( + { ...slackPayload, delta: "new content", isFullReplacement: true }, + "s" + ); + await bridge.handleCompletion(slackPayload, "s"); + + expect(slackPost).toHaveBeenCalledTimes(1); + expect(slackPost.mock.calls[0]?.[0]).toMatchObject({ + markdown_text: "new content", + }); + }); +}); + describe("ChatResponseBridge.handleEphemeral", () => { test("renders settings links as native buttons for Chat SDK targets", async () => { const posts: unknown[] = []; diff --git a/packages/gateway/src/__tests__/docker-deployment.test.ts b/packages/gateway/src/__tests__/docker-deployment.test.ts index 82cd1ba4e..9bfb64079 100644 --- a/packages/gateway/src/__tests__/docker-deployment.test.ts +++ b/packages/gateway/src/__tests__/docker-deployment.test.ts @@ -219,7 +219,7 @@ describe("DockerDeploymentManager", () => { // ResourceParser (tested indirectly via createContainer args) // ========================================================================= - describe("ResourceParser (via createDeployment)", () => { + describe("ResourceParser (via ensureDeployment)", () => { test("parseMemory: 512Mi -> 512 * 1024 * 1024", async () => { const manager = createManager({ worker: { @@ -231,7 +231,7 @@ describe("DockerDeploymentManager", () => { }, }); - await manager.createDeployment( + await manager.ensureDeployment( "test-deploy", "user", "user-id", @@ -253,7 +253,7 @@ describe("DockerDeploymentManager", () => { }, }); - await manager.createDeployment( + await manager.ensureDeployment( "test-deploy", "user", "user-id", @@ -267,7 +267,7 @@ describe("DockerDeploymentManager", () => { test("parseCpu: 500m -> 500_000_000 nanocpus", async () => { const manager = createManager(); - await manager.createDeployment( + await manager.ensureDeployment( "test-deploy", "user", "user-id", @@ -289,7 +289,7 @@ describe("DockerDeploymentManager", () => { }, }); - await manager.createDeployment( + await manager.ensureDeployment( "test-deploy", "user", "user-id", @@ -305,10 +305,10 @@ describe("DockerDeploymentManager", () => { // Container creation & security // ========================================================================= - describe("createDeployment", () => { + describe("ensureDeployment", () => { test("calls docker.createContainer with correct image", async () => { const manager = createManager(); - await manager.createDeployment( + await manager.ensureDeployment( "test-deploy", "user", "user-id", @@ -322,7 +322,7 @@ describe("DockerDeploymentManager", () => { test("drops all capabilities: CapDrop=['ALL']", async () => { const manager = createManager(); - await manager.createDeployment( + await manager.ensureDeployment( "test-deploy", "user", "user-id", @@ -334,7 +334,7 @@ describe("DockerDeploymentManager", () => { test("adds configurable capabilities via WORKER_CAPABILITIES env var", async () => { process.env.WORKER_CAPABILITIES = "NET_BIND_SERVICE,SYS_PTRACE"; const manager = createManager(); - await manager.createDeployment( + await manager.ensureDeployment( "test-deploy", "user", "user-id", @@ -349,7 +349,7 @@ describe("DockerDeploymentManager", () => { test("empty CapAdd when WORKER_CAPABILITIES not set", async () => { delete process.env.WORKER_CAPABILITIES; const manager = createManager(); - await manager.createDeployment( + await manager.ensureDeployment( "test-deploy", "user", "user-id", @@ -360,7 +360,7 @@ describe("DockerDeploymentManager", () => { test("enables no-new-privileges security option", async () => { const manager = createManager(); - await manager.createDeployment( + await manager.ensureDeployment( "test-deploy", "user", "user-id", @@ -373,7 +373,7 @@ describe("DockerDeploymentManager", () => { test("uses readonly rootfs by default", async () => { const manager = createManager(); - await manager.createDeployment( + await manager.ensureDeployment( "test-deploy", "user", "user-id", @@ -384,7 +384,7 @@ describe("DockerDeploymentManager", () => { test("disables readonly rootfs when Nix packages configured", async () => { const manager = createManager(); - await manager.createDeployment( + await manager.ensureDeployment( "test-deploy", "user", "user-id", @@ -395,7 +395,7 @@ describe("DockerDeploymentManager", () => { test("disables readonly rootfs when Nix flakeUrl configured", async () => { const manager = createManager(); - await manager.createDeployment( + await manager.ensureDeployment( "test-deploy", "user", "user-id", @@ -408,7 +408,7 @@ describe("DockerDeploymentManager", () => { test("adds tmpfs for /tmp when readonly rootfs enabled", async () => { const manager = createManager(); - await manager.createDeployment( + await manager.ensureDeployment( "test-deploy", "user", "user-id", @@ -421,7 +421,7 @@ describe("DockerDeploymentManager", () => { test("does not add tmpfs when Nix packages configured", async () => { const manager = createManager(); - await manager.createDeployment( + await manager.ensureDeployment( "test-deploy", "user", "user-id", @@ -432,7 +432,7 @@ describe("DockerDeploymentManager", () => { test("sets ShmSize to 256MB (268435456)", async () => { const manager = createManager(); - await manager.createDeployment( + await manager.ensureDeployment( "test-deploy", "user", "user-id", @@ -448,7 +448,7 @@ describe("DockerDeploymentManager", () => { const manager = createManager(); await new Promise((r) => setTimeout(r, 50)); - await manager.createDeployment( + await manager.ensureDeployment( "test-deploy", "user", "user-id", @@ -461,7 +461,7 @@ describe("DockerDeploymentManager", () => { const manager = createManager(); await new Promise((r) => setTimeout(r, 50)); - await manager.createDeployment( + await manager.ensureDeployment( "test-deploy", "user", "user-id", @@ -472,7 +472,7 @@ describe("DockerDeploymentManager", () => { test("starts container after creation", async () => { const manager = createManager(); - await manager.createDeployment( + await manager.ensureDeployment( "test-deploy", "user", "user-id", @@ -502,7 +502,7 @@ describe("DockerDeploymentManager", () => { const manager = createManager(); await expect( - manager.createDeployment( + manager.ensureDeployment( "test-deploy", "user", "user-id", @@ -515,7 +515,7 @@ describe("DockerDeploymentManager", () => { test("sets WorkingDir to /workspace", async () => { const manager = createManager(); - await manager.createDeployment( + await manager.ensureDeployment( "test-deploy", "user", "user-id", @@ -540,7 +540,7 @@ describe("DockerDeploymentManager", () => { const manager = createManager(); - await manager.createDeployment( + await manager.ensureDeployment( "test-deploy", "user", "user-id", @@ -558,7 +558,7 @@ describe("DockerDeploymentManager", () => { const manager = createManager(); // First deployment - await manager.createDeployment( + await manager.ensureDeployment( "deploy-1", "user", "user-id", @@ -566,7 +566,7 @@ describe("DockerDeploymentManager", () => { ); // Second deployment with same agentId - await manager.createDeployment( + await manager.ensureDeployment( "deploy-2", "user", "user-id", @@ -607,7 +607,7 @@ describe("DockerDeploymentManager", () => { // Should not throw despite 409 await expect( - manager.createDeployment( + manager.ensureDeployment( "test-deploy", "user", "user-id", @@ -616,6 +616,65 @@ describe("DockerDeploymentManager", () => { ).resolves.toBeUndefined(); }); + test("treats 409 from createContainer as benign and starts existing container if stopped", async () => { + const existing = { + inspect: mock(async () => ({ State: { Running: false } })), + start: mock(async () => { + /* noop */ + }), + }; + mockDocker.createContainer.mockImplementationOnce(async () => { + const err: any = new Error( + 'Conflict. The container name "/test-deploy" is already in use' + ); + err.statusCode = 409; + throw err; + }); + mockDocker.getContainer.mockImplementationOnce(() => existing as any); + + const manager = createManager(); + + await expect( + manager.ensureDeployment( + "test-deploy", + "user", + "user-id", + createTestMessagePayload() + ) + ).resolves.toBeUndefined(); + + expect(existing.inspect).toHaveBeenCalled(); + expect(existing.start).toHaveBeenCalled(); + }); + + test("treats 409 from createContainer as benign and skips start if already running", async () => { + const existing = { + inspect: mock(async () => ({ State: { Running: true } })), + start: mock(async () => { + /* noop */ + }), + }; + mockDocker.createContainer.mockImplementationOnce(async () => { + const err: any = new Error("already in use"); + err.statusCode = 409; + throw err; + }); + mockDocker.getContainer.mockImplementationOnce(() => existing as any); + + const manager = createManager(); + + await expect( + manager.ensureDeployment( + "test-deploy", + "user", + "user-id", + createTestMessagePayload() + ) + ).resolves.toBeUndefined(); + + expect(existing.start).not.toHaveBeenCalled(); + }); + test("development mode uses bind mounts", async () => { process.env.NODE_ENV = "development"; process.env.DEPLOYMENT_MODE = "docker"; @@ -623,7 +682,7 @@ describe("DockerDeploymentManager", () => { const manager = createManager(); - await manager.createDeployment( + await manager.ensureDeployment( "test-deploy", "user", "user-id", @@ -661,7 +720,7 @@ describe("DockerDeploymentManager", () => { test("WORKER_NETWORK env var overrides network name", async () => { process.env.WORKER_NETWORK = "custom-network"; const manager = createManager(); - await manager.createDeployment( + await manager.ensureDeployment( "test-deploy", "user", "user-id", @@ -674,7 +733,7 @@ describe("DockerDeploymentManager", () => { delete process.env.WORKER_NETWORK; process.env.COMPOSE_PROJECT_NAME = "myproject"; const manager = createManager(); - await manager.createDeployment( + await manager.ensureDeployment( "test-deploy", "user", "user-id", @@ -694,7 +753,7 @@ describe("DockerDeploymentManager", () => { const manager = createManager(); await new Promise((r) => setTimeout(r, 50)); - await manager.createDeployment( + await manager.ensureDeployment( "test-deploy", "user", "user-id", @@ -724,7 +783,7 @@ describe("DockerDeploymentManager", () => { (p: any) => String(p) === "/.dockerenv" ); const manager = createManager(); - await manager.createDeployment( + await manager.ensureDeployment( "test-deploy", "user", "user-id", @@ -738,7 +797,7 @@ describe("DockerDeploymentManager", () => { process.env.CONTAINER = "true"; const existsSpy = spyOn(fs, "existsSync").mockReturnValue(false); const manager = createManager(); - await manager.createDeployment( + await manager.ensureDeployment( "test-deploy", "user", "user-id", @@ -752,7 +811,7 @@ describe("DockerDeploymentManager", () => { delete process.env.CONTAINER; const existsSpy = spyOn(fs, "existsSync").mockReturnValue(false); const manager = createManager(); - await manager.createDeployment( + await manager.ensureDeployment( "test-deploy", "user", "user-id", @@ -780,7 +839,7 @@ describe("DockerDeploymentManager", () => { }, }, }); - await manager.createDeployment( + await manager.ensureDeployment( "test-deploy", "user", "user-id", @@ -791,7 +850,7 @@ describe("DockerDeploymentManager", () => { test("uses tag reference when no digest: repo:tag", async () => { const manager = createManager(); - await manager.createDeployment( + await manager.ensureDeployment( "test-deploy", "user", "user-id", @@ -812,7 +871,7 @@ describe("DockerDeploymentManager", () => { }, }, }); - await manager.createDeployment( + await manager.ensureDeployment( "test-deploy", "user", "user-id", @@ -1085,7 +1144,7 @@ describe("DockerDeploymentManager", () => { describe("labels", () => { test("sets base worker labels", async () => { const manager = createManager(); - await manager.createDeployment( + await manager.ensureDeployment( "test-deploy", "user", "user-id", @@ -1100,7 +1159,7 @@ describe("DockerDeploymentManager", () => { test("sets Docker Compose project labels", async () => { process.env.COMPOSE_PROJECT_NAME = "myproject"; const manager = createManager(); - await manager.createDeployment( + await manager.ensureDeployment( "test-deploy", "user", "user-id", @@ -1113,7 +1172,7 @@ describe("DockerDeploymentManager", () => { test("sets agent-id label", async () => { const manager = createManager(); - await manager.createDeployment( + await manager.ensureDeployment( "test-deploy", "user", "user-id", @@ -1134,7 +1193,7 @@ describe("DockerDeploymentManager", () => { delete process.env.CONTAINER; const existsSpy = spyOn(fs, "existsSync").mockReturnValue(false); const manager = createManager(); - await manager.createDeployment( + await manager.ensureDeployment( "test-deploy", "user", "user-id", @@ -1150,7 +1209,7 @@ describe("DockerDeploymentManager", () => { process.env.CONTAINER = "true"; const existsSpy = spyOn(fs, "existsSync").mockReturnValue(false); const manager = createManager(); - await manager.createDeployment( + await manager.ensureDeployment( "test-deploy", "user", "user-id", @@ -1169,7 +1228,7 @@ describe("DockerDeploymentManager", () => { test("adds seccomp profile when WORKER_SECCOMP_PROFILE set", async () => { process.env.WORKER_SECCOMP_PROFILE = "/path/to/seccomp.json"; const manager = createManager(); - await manager.createDeployment( + await manager.ensureDeployment( "test-deploy", "user", "user-id", @@ -1183,7 +1242,7 @@ describe("DockerDeploymentManager", () => { test("adds apparmor profile when WORKER_APPARMOR_PROFILE set", async () => { process.env.WORKER_APPARMOR_PROFILE = "docker-custom"; const manager = createManager(); - await manager.createDeployment( + await manager.ensureDeployment( "test-deploy", "user", "user-id", @@ -1197,7 +1256,7 @@ describe("DockerDeploymentManager", () => { test("disables readonly rootfs when WORKER_READONLY_ROOTFS=false", async () => { process.env.WORKER_READONLY_ROOTFS = "false"; const manager = createManager(); - await manager.createDeployment( + await manager.ensureDeployment( "test-deploy", "user", "user-id", diff --git a/packages/gateway/src/__tests__/embedded-deployment.test.ts b/packages/gateway/src/__tests__/embedded-deployment.test.ts index 50bcce758..1afd0e369 100644 --- a/packages/gateway/src/__tests__/embedded-deployment.test.ts +++ b/packages/gateway/src/__tests__/embedded-deployment.test.ts @@ -159,38 +159,60 @@ describe("EmbeddedDeploymentManager", () => { // Lifecycle: create / list / scale / delete // ========================================================================= describe("lifecycle", () => { - test("createDeployment then listDeployments returns 1 entry", async () => { + test("ensureDeployment then listDeployments returns 1 entry", async () => { const msg = createTestMessagePayload(); - await manager.createDeployment("worker-1", "user-1", "user-1", msg); + await manager.ensureDeployment("worker-1", "user-1", "user-1", msg); const list = await manager.listDeployments(); expect(list).toHaveLength(1); expect(list[0].deploymentName).toBe("worker-1"); expect(list[0].replicas).toBe(1); }); - test("createDeployment spawns a child process", async () => { + test("ensureDeployment spawns a child process", async () => { const msg = createTestMessagePayload(); - await manager.createDeployment("worker-1", "user-1", "user-1", msg); + await manager.ensureDeployment("worker-1", "user-1", "user-1", msg); expect(mockChildProcesses).toHaveLength(1); expect(mockChildProcesses[0]).toBeDefined(); expect(mockSpawn.mock.calls.at(-1)?.[0]).toBe(process.execPath); }); - test("createDeployment with different names returns multiple entries", async () => { + test("ensureDeployment with different names returns multiple entries", async () => { const msg1 = createTestMessagePayload({ agentId: "agent-a" }); const msg2 = createTestMessagePayload({ agentId: "agent-b", conversationId: "conv-2", }); - await manager.createDeployment("worker-1", "user-1", "user-1", msg1); - await manager.createDeployment("worker-2", "user-1", "user-1", msg2); + await manager.ensureDeployment("worker-1", "user-1", "user-1", msg1); + await manager.ensureDeployment("worker-2", "user-1", "user-1", msg2); const list = await manager.listDeployments(); expect(list).toHaveLength(2); }); + test("ensureDeployment is idempotent for the same name (sequential)", async () => { + const msg = createTestMessagePayload(); + await manager.ensureDeployment("worker-1", "user-1", "user-1", msg); + await manager.ensureDeployment("worker-1", "user-1", "user-1", msg); + await manager.ensureDeployment("worker-1", "user-1", "user-1", msg); + expect(mockChildProcesses).toHaveLength(1); + const list = await manager.listDeployments(); + expect(list).toHaveLength(1); + }); + + test("ensureDeployment coalesces concurrent calls for the same name", async () => { + const msg = createTestMessagePayload(); + await Promise.all([ + manager.ensureDeployment("worker-1", "user-1", "user-1", msg), + manager.ensureDeployment("worker-1", "user-1", "user-1", msg), + manager.ensureDeployment("worker-1", "user-1", "user-1", msg), + ]); + expect(mockChildProcesses).toHaveLength(1); + const list = await manager.listDeployments(); + expect(list).toHaveLength(1); + }); + test("scaleDeployment(0) kills worker and removes from map", async () => { const msg = createTestMessagePayload(); - await manager.createDeployment("worker-1", "user-1", "user-1", msg); + await manager.ensureDeployment("worker-1", "user-1", "user-1", msg); await manager.scaleDeployment("worker-1", 0); const list = await manager.listDeployments(); expect(list).toHaveLength(0); @@ -198,7 +220,7 @@ describe("EmbeddedDeploymentManager", () => { test("deleteDeployment kills process and removes entry", async () => { const msg = createTestMessagePayload(); - await manager.createDeployment("worker-1", "user-1", "user-1", msg); + await manager.ensureDeployment("worker-1", "user-1", "user-1", msg); await manager.deleteDeployment("worker-1"); const list = await manager.listDeployments(); expect(list).toHaveLength(0); @@ -232,7 +254,7 @@ describe("EmbeddedDeploymentManager", () => { test("lastActivity is set at creation time", async () => { const before = Date.now(); const msg = createTestMessagePayload(); - await manager.createDeployment("worker-1", "user-1", "user-1", msg); + await manager.ensureDeployment("worker-1", "user-1", "user-1", msg); const after = Date.now(); const list = await manager.listDeployments(); const ts = list[0].lastActivity.getTime(); @@ -242,7 +264,7 @@ describe("EmbeddedDeploymentManager", () => { test("updateDeploymentActivity advances timestamp", async () => { const msg = createTestMessagePayload(); - await manager.createDeployment("worker-1", "user-1", "user-1", msg); + await manager.ensureDeployment("worker-1", "user-1", "user-1", msg); const listBefore = await manager.listDeployments(); const tsBefore = listBefore[0].lastActivity.getTime(); @@ -268,7 +290,7 @@ describe("EmbeddedDeploymentManager", () => { test("does not mutate gateway process.env", async () => { const envBefore = { ...process.env }; const msg = createTestMessagePayload(); - await manager.createDeployment("worker-1", "user-1", "user-1", msg); + await manager.ensureDeployment("worker-1", "user-1", "user-1", msg); // Gateway process.env should not have new worker-specific vars added // (WORKSPACE_DIR, WORKER_TOKEN, etc. are passed to subprocess env, not process.env) expect(process.env.WORKSPACE_DIR).toBe(envBefore.WORKSPACE_DIR); @@ -279,7 +301,7 @@ describe("EmbeddedDeploymentManager", () => { test("does not set globalThis.__lobuEmbeddedBashOps", async () => { const msg = createTestMessagePayload(); - await manager.createDeployment("worker-1", "user-1", "user-1", msg); + await manager.ensureDeployment("worker-1", "user-1", "user-1", msg); expect((globalThis as any).__lobuEmbeddedBashOps).toBeUndefined(); }); @@ -291,7 +313,7 @@ describe("EmbeddedDeploymentManager", () => { const existsSpy = spyOn(fs, "existsSync").mockReturnValue(true); try { const msg = createTestMessagePayload(); - await manager.createDeployment("worker-1", "user-1", "user-1", msg); + await manager.ensureDeployment("worker-1", "user-1", "user-1", msg); const spawnCall = mockSpawn.mock.calls.at(-1); expect(spawnCall).toBeDefined(); @@ -308,7 +330,7 @@ describe("EmbeddedDeploymentManager", () => { test("child process exit removes worker from map", async () => { const msg = createTestMessagePayload(); - await manager.createDeployment("worker-1", "user-1", "user-1", msg); + await manager.ensureDeployment("worker-1", "user-1", "user-1", msg); expect(await manager.listDeployments()).toHaveLength(1); // Simulate child process exiting @@ -328,7 +350,7 @@ describe("EmbeddedDeploymentManager", () => { describe("listDeployments shape", () => { test("returns DeploymentInfo with expected fields", async () => { const msg = createTestMessagePayload(); - await manager.createDeployment("worker-1", "user-1", "user-1", msg); + await manager.ensureDeployment("worker-1", "user-1", "user-1", msg); const list = await manager.listDeployments(); const info = list[0]; expect(info.deploymentName).toBe("worker-1"); @@ -342,7 +364,7 @@ describe("EmbeddedDeploymentManager", () => { test("newly created worker is not idle", async () => { const msg = createTestMessagePayload(); - await manager.createDeployment("worker-1", "user-1", "user-1", msg); + await manager.ensureDeployment("worker-1", "user-1", "user-1", msg); const list = await manager.listDeployments(); expect(list[0].isIdle).toBe(false); expect(list[0].isVeryOld).toBe(false); diff --git a/packages/gateway/src/__tests__/interaction-bridge-action-handlers.test.ts b/packages/gateway/src/__tests__/interaction-bridge-action-handlers.test.ts index 0fa33afdb..46a5fee70 100644 --- a/packages/gateway/src/__tests__/interaction-bridge-action-handlers.test.ts +++ b/packages/gateway/src/__tests__/interaction-bridge-action-handlers.test.ts @@ -153,7 +153,7 @@ describe("registerActionHandlers — tool approval", () => { expect(edited).toContain("Approved (1h)"); }); - test("approve with no pending (retry / stale) silently no-ops — no grant, no execute, no user-visible message", async () => { + test("approve with no pending but tracked card (late first click) edits card and posts an expired notice — no grant, no execute", async () => { const h = setup({ pending: null }); await h.handler({ actionId: "tool:req-x:1h", @@ -162,7 +162,11 @@ describe("registerActionHandlers — tool approval", () => { }); expect(h.grantStore.grant).not.toHaveBeenCalled(); expect(h.executeToolDirect).not.toHaveBeenCalled(); - expect(h.post).not.toHaveBeenCalled(); + // Card should be edited to show the expired notice and the user told to retry. + expect(h.editCard).toHaveBeenCalledTimes(1); + expect(h.editCard.mock.calls[0]?.[0] as string).toMatch(/expired/i); + expect(h.post).toHaveBeenCalledTimes(1); + expect(h.post.mock.calls[0]?.[0] as string).toMatch(/expired/i); }); test("approve but tool execution throws posts failure message and still stores grant", async () => { @@ -215,7 +219,7 @@ describe("registerActionHandlers — tool approval", () => { expect(h.post.mock.calls[0]?.[0]).toMatch(/denied/i); }); - test("deny with no pending (retry / stale) silently no-ops — no grant, no user-visible message", async () => { + test("deny with no pending but tracked card (late first click) edits card and posts an expired notice — no grant", async () => { const h = setup({ pending: null }); await h.handler({ actionId: "tool:req-7:deny", @@ -223,7 +227,10 @@ describe("registerActionHandlers — tool approval", () => { thread: h.thread, }); expect(h.grantStore.grant).not.toHaveBeenCalled(); - expect(h.post).not.toHaveBeenCalled(); + expect(h.editCard).toHaveBeenCalledTimes(1); + expect(h.editCard.mock.calls[0]?.[0] as string).toMatch(/expired/i); + expect(h.post).toHaveBeenCalledTimes(1); + expect(h.post.mock.calls[0]?.[0] as string).toMatch(/expired/i); }); test("deny edits the approval card to show denial summary", async () => { diff --git a/packages/gateway/src/__tests__/mcp-config-service.test.ts b/packages/gateway/src/__tests__/mcp-config-service.test.ts index b3e167f6b..801f552e7 100644 --- a/packages/gateway/src/__tests__/mcp-config-service.test.ts +++ b/packages/gateway/src/__tests__/mcp-config-service.test.ts @@ -136,7 +136,7 @@ describe("McpConfigService", () => { test("getWorkerConfig - merges per-agent MCPs", async () => { const mockAgentSettingsStore = { - getSettings: async (agentId: string) => { + getEffectiveSettings: async (agentId: string) => { if (agentId === "agent1") { return { mcpServers: { @@ -185,7 +185,7 @@ describe("McpConfigService", () => { test("getWorkerConfig - skips disabled MCPs", async () => { const mockAgentSettingsStore = { - getSettings: async () => ({ + getEffectiveSettings: async () => ({ mcpServers: { "disabled-mcp": { url: "https://disabled.example.com/mcp", @@ -215,7 +215,7 @@ describe("McpConfigService", () => { test("getWorkerConfig - global takes precedence over per-agent", async () => { const mockAgentSettingsStore = { - getSettings: async () => ({ + getEffectiveSettings: async () => ({ mcpServers: { "shared-mcp": { url: "https://agent-version.example.com/mcp", @@ -291,7 +291,7 @@ describe("McpConfigService", () => { test("getAllHttpServers - merges global + per-agent, excludes disabled and non-HTTP", async () => { const mockAgentSettingsStore = { - getSettings: async () => ({ + getEffectiveSettings: async () => ({ mcpServers: { "agent-http": { url: "https://agent-http.example.com/mcp", diff --git a/packages/gateway/src/__tests__/message-handler-bridge.test.ts b/packages/gateway/src/__tests__/message-handler-bridge.test.ts index c2291385f..e59a9c66a 100644 --- a/packages/gateway/src/__tests__/message-handler-bridge.test.ts +++ b/packages/gateway/src/__tests__/message-handler-bridge.test.ts @@ -1,9 +1,13 @@ -import { afterEach, beforeEach, describe, expect, test } from "bun:test"; +import { afterEach, beforeEach, describe, expect, mock, test } from "bun:test"; +import { ConversationStateStore } from "../connections/conversation-state-store"; import { type InboundAttachmentLike, ingestInboundAttachments, isSenderAllowed, + MessageHandlerBridge, } from "../connections/message-handler-bridge"; +import type { PlatformConnection } from "../connections/types"; +import { InMemoryStateAdapter } from "./fixtures/in-memory-state-adapter"; import { type ArtifactTestEnv, createArtifactTestEnv, @@ -150,3 +154,277 @@ describe("ingestInboundAttachments", () => { expect(files[0]?.name).toBe("image-1.jpeg"); }); }); + +/** + * MessageHandlerBridge.handleMessage previously had zero coverage despite + * being the single entry point for every inbound platform message. The + * thread-history backfill bug — bot mentioned mid-thread had no context — + * lived here undetected because no test exercised this path. + * + * These tests pin the backfill behavior so future regressions trip CI. + */ + +const CONN_ID = "conn-test"; +const CHANNEL_ID = "C123"; +const THREAD_ID = "slack:C123:1700000000.000100"; +const TEMPLATE_AGENT_ID = "agent-template"; + +function buildConnection(): PlatformConnection { + return { + id: CONN_ID, + platform: "slack", + templateAgentId: TEMPLATE_AGENT_ID, + config: { platform: "slack" } as any, + settings: { allowGroups: true } as any, + metadata: { + botUsername: "testbot", + botUserId: "U_BOT", + }, + status: "active", + createdAt: 1, + updatedAt: 1, + }; +} + +function createBridgeHarness(opts: { + fetchMessages?: ReturnType | undefined; + withAdapter?: boolean; +}) { + const state = new InMemoryStateAdapter(); + const conversationState = new ConversationStateStore(state); + const connection = buildConnection(); + const enqueueMessage = mock(async () => undefined); + + const services = { + getArtifactStore: () => null, + getPublicGatewayUrl: () => "https://gateway.example.com", + getChannelBindingService: () => undefined, + getAgentMetadataStore: () => undefined, + getUserAgentsStore: () => undefined, + getTranscriptionService: () => undefined, + getAgentSettingsStore: () => undefined, + getDeclaredAgentRegistry: () => undefined, + getQueueProducer: () => ({ enqueueMessage }), + } as any; + + const manager = { + has: () => true, + getInstance: () => ({ connection, conversationState }), + } as any; + + const bridge = new MessageHandlerBridge(connection, services, manager); + + let adapter: unknown; + if (opts.withAdapter === false) { + adapter = undefined; + } else if (opts.fetchMessages === undefined && opts.withAdapter !== true) { + adapter = undefined; + } else if (opts.fetchMessages) { + adapter = { fetchMessages: opts.fetchMessages }; + } else { + adapter = {}; + } + + return { bridge, conversationState, enqueueMessage, adapter }; +} + +function makeMessage(overrides: Record = {}) { + return { + id: "M_NEW", + text: "<@U_BOT> what was the prior context?", + author: { + userId: "U_USER", + userName: "alice", + fullName: "Alice", + isBot: false, + isMe: false, + }, + raw: {}, + attachments: [], + metadata: { dateSent: new Date(), edited: false }, + ...overrides, + }; +} + +function makeThread(adapter: unknown) { + return { + id: THREAD_ID, + channelId: CHANNEL_ID, + adapter, + subscribe: mock(async () => undefined), + post: mock(async () => undefined), + startTyping: mock(async () => undefined), + }; +} + +function priorMessage(id: string, text: string, isMe: boolean, whenMs: number) { + return { + id, + text, + author: { + userId: isMe ? "U_BOT" : "U_USER", + userName: isMe ? "testbot" : "alice", + fullName: isMe ? "TestBot" : "Alice", + isBot: isMe, + isMe, + }, + raw: {}, + attachments: [], + metadata: { dateSent: new Date(whenMs), edited: false }, + }; +} + +describe("MessageHandlerBridge.handleMessage — thread backfill", () => { + test("first mention with empty history backfills via adapter.fetchMessages", async () => { + const fetchMessages = mock(async () => ({ + messages: [ + priorMessage("M1", "favorite color is teal", false, 1_700_000_001_000), + priorMessage("M2", "I drive a Civic", false, 1_700_000_002_000), + // Drop the current mention — handler should skip it by id. + priorMessage( + "M_NEW", + "<@U_BOT> what was the prior context?", + false, + 1_700_000_003_000 + ), + ], + })); + const { bridge, conversationState, adapter, enqueueMessage } = + createBridgeHarness({ fetchMessages }); + const thread = makeThread(adapter); + + await bridge.handleMessage(thread, makeMessage(), "mention"); + + expect(fetchMessages).toHaveBeenCalledTimes(1); + expect(fetchMessages.mock.calls[0]?.[0]).toBe(THREAD_ID); + expect(fetchMessages.mock.calls[0]?.[1]).toEqual({ + limit: 50, + direction: "forward", + }); + + const entries = await conversationState.getEntries(CONN_ID, CHANNEL_ID); + // 2 backfilled (current message id is skipped) + the current mention + // message (mention text stripped of `<@U_BOT>`). + expect(entries.map((e) => e.content)).toEqual([ + "favorite color is teal", + "I drive a Civic", + "what was the prior context?", + ]); + expect(entries[0]?.role).toBe("user"); + + // Worker payload's conversationHistory was snapshotted AFTER backfill + // but BEFORE the new mention was appended — so backfill is visible + // and the current message is not (it gets passed via messageText). + expect(enqueueMessage).toHaveBeenCalledTimes(1); + const payload = enqueueMessage.mock.calls[0]?.[0] as any; + const history = payload.platformMetadata.conversationHistory; + expect(history).toHaveLength(2); + expect(history[0]).toMatchObject({ + role: "user", + content: "favorite color is teal", + }); + }); + + test("backfilled bot replies are tagged role=assistant", async () => { + const fetchMessages = mock(async () => ({ + messages: [ + priorMessage("M1", "Alice: hi", false, 1), + priorMessage("M2", "Hi Alice — how can I help?", true, 2), + ], + })); + const { bridge, conversationState, adapter } = createBridgeHarness({ + fetchMessages, + }); + const thread = makeThread(adapter); + + await bridge.handleMessage(thread, makeMessage(), "mention"); + + const entries = await conversationState.getEntries(CONN_ID, CHANNEL_ID); + expect(entries[0]?.role).toBe("user"); + expect(entries[1]?.role).toBe("assistant"); + }); + + test("second mention in the same thread does not refetch — claim is one-shot", async () => { + const fetchMessages = mock(async () => ({ + messages: [priorMessage("M1", "hello", false, 1)], + })); + const { bridge, adapter } = createBridgeHarness({ fetchMessages }); + const thread = makeThread(adapter); + + await bridge.handleMessage(thread, makeMessage({ id: "M_A" }), "mention"); + await bridge.handleMessage(thread, makeMessage({ id: "M_B" }), "mention"); + + expect(fetchMessages).toHaveBeenCalledTimes(1); + }); + + test("DM source skips backfill — DMs are linear, no hidden history", async () => { + const fetchMessages = mock(async () => ({ messages: [] })); + const { bridge, adapter } = createBridgeHarness({ fetchMessages }); + const thread = makeThread(adapter); + + await bridge.handleMessage(thread, makeMessage(), "dm"); + + expect(fetchMessages).not.toHaveBeenCalled(); + }); + + test("subscribed source also backfills if no prior claim exists", async () => { + const fetchMessages = mock(async () => ({ + messages: [priorMessage("M1", "earlier note", false, 1)], + })); + const { bridge, conversationState, adapter } = createBridgeHarness({ + fetchMessages, + }); + const thread = makeThread(adapter); + + await bridge.handleMessage(thread, makeMessage(), "subscribed"); + + expect(fetchMessages).toHaveBeenCalledTimes(1); + const entries = await conversationState.getEntries(CONN_ID, CHANNEL_ID); + expect(entries.map((e) => e.content)).toContain("earlier note"); + }); + + test("fetchMessages failure releases the claim so the next event retries", async () => { + let calls = 0; + const fetchMessages = mock(async () => { + calls++; + if (calls === 1) throw new Error("Slack rate limited"); + return { messages: [priorMessage("M1", "recovered", false, 1)] }; + }); + const { bridge, conversationState, adapter } = createBridgeHarness({ + fetchMessages, + }); + const thread = makeThread(adapter); + + // First mention — fetch throws, marker is released. + await bridge.handleMessage(thread, makeMessage({ id: "M_A" }), "mention"); + expect(fetchMessages).toHaveBeenCalledTimes(1); + let entries = await conversationState.getEntries(CONN_ID, CHANNEL_ID); + // Only the current message survives — no backfill from the throw. + expect(entries.map((e) => e.content)).toEqual([ + "what was the prior context?", + ]); + + // Second mention — claim is free, fetch retries and succeeds. + await bridge.handleMessage(thread, makeMessage({ id: "M_B" }), "mention"); + expect(fetchMessages).toHaveBeenCalledTimes(2); + entries = await conversationState.getEntries(CONN_ID, CHANNEL_ID); + expect(entries.map((e) => e.content)).toContain("recovered"); + }); + + test("adapter without fetchMessages does not retry-storm", async () => { + // Some adapters may not implement fetchMessages. We must keep the claim + // (treating it as success) so subsequent events don't re-probe a + // missing capability on every message. + const { bridge, conversationState } = createBridgeHarness({ + withAdapter: true, + }); + const thread = makeThread({}); + + await bridge.handleMessage(thread, makeMessage({ id: "M_A" }), "mention"); + await bridge.handleMessage(thread, makeMessage({ id: "M_B" }), "mention"); + + // Only the two current mentions land in history. + const entries = await conversationState.getEntries(CONN_ID, CHANNEL_ID); + expect(entries).toHaveLength(2); + }); +}); diff --git a/packages/gateway/src/auth/mcp/config-service.ts b/packages/gateway/src/auth/mcp/config-service.ts index 7f4765acf..479541ce8 100644 --- a/packages/gateway/src/auth/mcp/config-service.ts +++ b/packages/gateway/src/auth/mcp/config-service.ts @@ -89,6 +89,34 @@ export class McpConfigService { ); } + /** + * Register or replace a single global MCP server. Used for runtime-derived + * entries (e.g. the Owletto memory MCP, whose upstream URL is resolved + * from `MEMORY_URL` at startup and may change when `lobu.toml` reloads). + */ + upsertGlobalServer(id: string, serverConfig: Record): void { + if (!this.cache) { + this.cache = { + rawServers: {}, + httpServers: new Map(), + }; + } + + const normalized = normalizeConfig({ mcpServers: { [id]: serverConfig } }); + const raw = normalized.rawServers[id]; + if (raw) { + this.cache.rawServers[id] = raw; + } + const http = normalized.httpServers.get(id); + if (http) { + this.cache.httpServers.set(id, http); + } else { + this.cache.httpServers.delete(id); + } + + logger.info(`Upserted global MCP "${id}"`); + } + /** * Return MCP config tailored for a worker request. */ @@ -275,7 +303,8 @@ export class McpConfigService { } try { - const settings = await this.agentSettingsStore.getSettings(agentId); + const settings = + await this.agentSettingsStore.getEffectiveSettings(agentId); return settings?.mcpServers || {}; } catch (error) { logger.warn(`Failed to load per-agent MCP settings for ${agentId}`, { diff --git a/packages/gateway/src/auth/mcp/proxy.ts b/packages/gateway/src/auth/mcp/proxy.ts index 7487e2ead..60173e40b 100644 --- a/packages/gateway/src/auth/mcp/proxy.ts +++ b/packages/gateway/src/auth/mcp/proxy.ts @@ -155,7 +155,12 @@ function extractSessionToken(c: Context): string | null { export class McpProxy { private readonly SESSION_TTL_SECONDS = 30 * 60; // 30 minutes - private readonly PENDING_TOOL_TTL = 300; // 5 minutes + // Tool-approval cards may sit in-thread for a long time before the user + // actually clicks (Slack notifications, async review, etc.). The pending + // invocation key holds the args needed to execute the tool after approval; + // 24h gives users a realistic window to respond. Anything shorter silently + // drops late clicks (the GETDEL returns null and the click no-ops). + private readonly PENDING_TOOL_TTL = 24 * 60 * 60; // 24 hours private readonly redisClient: any; private app: Hono; private readonly toolCache?: McpToolCache; @@ -666,6 +671,10 @@ export class McpProxy { args: toolArguments, agentId, userId: requesterUserId, + channelId: auth.tokenData.channelId || "", + conversationId: auth.tokenData.conversationId || "", + teamId: auth.tokenData.teamId, + connectionId: auth.tokenData.connectionId, }), "EX", this.PENDING_TOOL_TTL @@ -1070,6 +1079,10 @@ export class McpProxy { args: toolArgs, agentId, userId: tokenData.userId, + channelId: tokenData.channelId || "", + conversationId: tokenData.conversationId || "", + teamId: tokenData.teamId, + connectionId: tokenData.connectionId, }), "EX", this.PENDING_TOOL_TTL diff --git a/packages/gateway/src/cli/gateway.ts b/packages/gateway/src/cli/gateway.ts index f7e2f06f7..34eb7a7d5 100644 --- a/packages/gateway/src/cli/gateway.ts +++ b/packages/gateway/src/cli/gateway.ts @@ -315,7 +315,12 @@ export function createGatewayApp( agentConfigStore: coreServices.getConfigStore(), platformRegistry, approveToolCall: async (requestId: string, decision: string) => { - const raw = await approveRedis.get(`pending-tool:${requestId}`); + // GETDEL atomically claims the pending invocation so a retry of + // POST /api/v1/agents/approve (CLI re-tries, double-clicks, Slack + // webhook retries that ultimately reach the same key, etc.) cannot + // double-execute the tool. The Slack/Telegram interaction-bridge + // path has the same guard — both consumers MUST use GETDEL. + const raw = await approveRedis.getdel(`pending-tool:${requestId}`); if (!raw) return { success: false, error: "Request not found or expired" }; const pending = JSON.parse(raw); @@ -332,7 +337,6 @@ export function createGatewayApp( null, true ); - await approveRedis.del(`pending-tool:${requestId}`); return { success: true }; } await approveGrantStore?.grant( @@ -348,10 +352,8 @@ export function createGatewayApp( pending.toolName, pending.args ); - await approveRedis.del(`pending-tool:${requestId}`); return { success: true, result } as any; } - await approveRedis.del(`pending-tool:${requestId}`); return { success: true }; }, }); diff --git a/packages/gateway/src/config/index.ts b/packages/gateway/src/config/index.ts index a324aaa1c..cc7ab9ec3 100644 --- a/packages/gateway/src/config/index.ts +++ b/packages/gateway/src/config/index.ts @@ -197,7 +197,14 @@ export function loadEnvFile(envPath?: string): void { : path.resolve(process.cwd(), ".env"); if (existsSync(resolvedPath)) { - dotenvConfig({ path: resolvedPath }); + // Match the .env-as-single-source-of-truth contract used by + // docker-compose (see PR #209: compose no longer re-exports + // `DEPLOYMENT_MODE` from the shell). `override: true` means values in + // the file win over stale shell exports inherited from the user's + // environment, so `lobu gateway --env .env` and `docker compose up` + // behave consistently. Production (`NODE_ENV=production`) skips this + // path entirely, so real deployments are unaffected. + dotenvConfig({ path: resolvedPath, override: true }); logger.debug(`Loaded environment variables from ${resolvedPath}`); } else if (envProvided) { logger.warn( @@ -244,13 +251,20 @@ function isPluginInstalled(source: string): boolean { createRequire(resolverPath).resolve(source); return true; } catch { - const packageDir = path.join( - path.dirname(resolverPath), - "node_modules", - ...packagePathParts - ); - if (existsSync(packageDir)) { - return true; + // require.resolve() can fail for ESM-only packages whose `exports` map + // omits a `require`/`default` condition (e.g. @lobu/owletto-openclaw). + // Fall back to walking up parent directories looking for the package + // folder under any ancestor `node_modules`, mirroring Node's module + // resolution algorithm. + let dir = path.dirname(resolverPath); + while (true) { + const packageDir = path.join(dir, "node_modules", ...packagePathParts); + if (existsSync(path.join(packageDir, "package.json"))) { + return true; + } + const parent = path.dirname(dir); + if (parent === dir) break; + dir = parent; } } } diff --git a/packages/gateway/src/connections/chat-instance-manager.ts b/packages/gateway/src/connections/chat-instance-manager.ts index 4ef276f6c..7b8c7c767 100644 --- a/packages/gateway/src/connections/chat-instance-manager.ts +++ b/packages/gateway/src/connections/chat-instance-manager.ts @@ -24,6 +24,7 @@ import { } from "./conversation-state-store"; import { createGatewayStateAdapter } from "./state-adapter"; import { SlackConnectionCoordinator } from "./slack-connection-coordinator"; +import { SlackInstructionProvider } from "./slack-instruction-provider"; import { registerSlackPlatformHandlers } from "./slack-platform-bridge"; import type { MessageHandlerBridge } from "./message-handler-bridge"; import { @@ -620,19 +621,32 @@ export class ChatInstanceManager { } }; - // Populate metadata (bot username etc.) from adapter properties - if (!connection.metadata.botUsername) { - try { + // Populate metadata (bot username, bot user id) from adapter properties. + // Slack adapters call `auth.test` during initialize and expose `botUserId` + // via a getter; we mirror it onto connection.metadata so message-bridge + // mention-strip and the Slack instruction provider can find it. + try { + const metadataUpdate: Record = {}; + if (!connection.metadata.botUsername) { const userName = adapter.userName || adapter.botUsername; if (userName) { - connection.metadata.botUsername = userName; - await this.updateConnection(connection.id, { - metadata: { botUsername: userName }, - }); + metadataUpdate.botUsername = userName; } - } catch { - // non-critical } + if (!connection.metadata.botUserId) { + const botUserId = adapter.botUserId; + if (botUserId) { + metadataUpdate.botUserId = botUserId; + } + } + if (Object.keys(metadataUpdate).length > 0) { + Object.assign(connection.metadata, metadataUpdate); + await this.updateConnection(connection.id, { + metadata: metadataUpdate, + }); + } + } catch { + // non-critical } this.instances.set(connection.id, { @@ -940,6 +954,11 @@ export class ChatInstanceManager { } ) => this.routePlatformMessage(name, token, message, options), getFileHandler: (options) => this.getPlatformFileHandler(name, options), + ...(name === "slack" + ? { + getInstructionProvider: () => new SlackInstructionProvider(this), + } + : {}), getConversationHistory: ( channelId: string, conversationId: string | undefined, diff --git a/packages/gateway/src/connections/chat-response-bridge.ts b/packages/gateway/src/connections/chat-response-bridge.ts index 54188a795..3c6632bb0 100644 --- a/packages/gateway/src/connections/chat-response-bridge.ts +++ b/packages/gateway/src/connections/chat-response-bridge.ts @@ -48,6 +48,154 @@ function buildCurrentMessageFromMetadata( }; } +/** + * Decode HTML entities back to their literal characters. Slack's `chat.postMessage` + * `text` field auto-escapes `<`, `>`, `&` and re-rendering already-escaped content + * (e.g. text the worker streamed via the SDK that came back through history) leaves + * `>` etc. visible to the user. Use the `markdown_text` field for a Slack post + * so Slack does not double-escape, and pre-decode to handle entities the worker + * may have produced upstream (e.g. from MCP tool results that returned HTML). + */ +function decodeHtmlEntities(input: string): string { + return input + .replace(/</g, "<") + .replace(/>/g, ">") + .replace(/"/g, '"') + .replace(/'/g, "'") + .replace(/'/g, "'") + .replace(/ /g, " ") + .replace(/&/g, "&"); +} + +/** + * Strip empty markdown links `[text]()` → `text`. Some MCP tools (notably + * deepwiki) emit citation footnotes with no URL; rendering them as links + * leaves visible empty parens in Slack/Telegram. + */ +function stripEmptyLinks(input: string): string { + return input.replace(/\[([^\]]+)\]\(\s*\)/g, "$1"); +} + +/** + * Slack accepts up to 12,000 chars per `markdown_text` post. Keep a margin so + * downstream emoji/mention expansion does not push us over the limit. + */ +const SLACK_MARKDOWN_CHUNK_SIZE = 11_000; + +/** + * Split text on paragraph boundaries (`\n\n`) so we never break mid-sentence, + * mid-list, or mid-code-fence when posting multiple chunks. Long paragraphs + * that exceed the limit on their own fall back to line boundaries, then to + * a hard slice as last resort. + */ +function chunkOnParagraphBoundaries( + text: string, + maxChunkSize: number +): string[] { + if (text.length <= maxChunkSize) return [text]; + + const chunks: string[] = []; + const paragraphs = text.split(/\n\n+/); + let current = ""; + + const flush = () => { + if (current.length > 0) { + chunks.push(current); + current = ""; + } + }; + + const pushOversized = (chunk: string) => { + // Try line boundaries first, then hard slice as a last resort. + const lines = chunk.split("\n"); + let buf = ""; + for (const line of lines) { + if (buf.length + line.length + 1 > maxChunkSize) { + if (buf) chunks.push(buf); + buf = ""; + if (line.length > maxChunkSize) { + for (let i = 0; i < line.length; i += maxChunkSize) { + const slice = line.slice(i, i + maxChunkSize); + if (i + maxChunkSize >= line.length) { + buf = slice; + } else { + chunks.push(slice); + } + } + } else { + buf = line; + } + } else { + buf = buf ? `${buf}\n${line}` : line; + } + } + if (buf) chunks.push(buf); + }; + + for (const para of paragraphs) { + if (para.length > maxChunkSize) { + flush(); + pushOversized(para); + continue; + } + const candidate = current ? `${current}\n\n${para}` : para; + if (candidate.length > maxChunkSize) { + flush(); + current = para; + } else { + current = candidate; + } + } + flush(); + return chunks; +} + +/** + * Post a text body to a Slack channel/thread using `chat.postMessage` with + * `markdown_text`, so Slack renders markdown directly and does not HTML-escape + * `<`, `>`, `&`. Splits long bodies on paragraph boundaries to avoid hitting + * Slack's 12,000-char per-post limit. + * + * Returns true if the post was handled here, false if the caller should fall + * back to the SDK's generic `target.post()` path. + */ +async function postSlackMarkdown( + instance: any, + channelId: string, + conversationId: string | undefined, + body: string +): Promise { + const adapter = instance.chat?.getAdapter?.("slack"); + const slackClient = adapter?.client; + if (!slackClient?.chat?.postMessage) return false; + + // channelId looks like "slack:C0123ABCD"; conversationId either equals it + // (DM/channel-level) or is "slack:C0123ABCD:1700000000.123456" for a thread. + const channel = channelId.startsWith("slack:") + ? channelId.slice("slack:".length) + : channelId; + let thread_ts: string | undefined; + if (conversationId && conversationId !== channelId) { + const parts = conversationId.split(":"); + if (parts.length === 3 && parts[0] === "slack") { + thread_ts = parts[2]; + } + } + + const chunks = chunkOnParagraphBoundaries(body, SLACK_MARKDOWN_CHUNK_SIZE); + for (const chunk of chunks) { + if (!chunk.trim()) continue; + await slackClient.chat.postMessage({ + channel, + ...(thread_ts ? { thread_ts } : {}), + markdown_text: chunk, + unfurl_links: false, + unfurl_media: false, + }); + } + return true; +} + /** * Push-based async iterable: producers call `push(value)` and `close()`; * consumers iterate via `for await (...)`. @@ -177,10 +325,50 @@ export class ChatResponseBridge implements ResponseRenderer { const ctx = this.extractResponseContext(payload); if (!ctx) return null; - const { connectionId, instance, channelId } = ctx; + const { connectionId, instance, channelId, platform } = ctx; const key = `${channelId}:${payload.conversationId}`; const existing = this.streams.get(key); + // For Slack we skip the SDK streaming path entirely and post a single + // chunked `markdown_text` message at completion. The Slack streaming API + // (`chat.appendStream`) auto-splits at fixed sizes (breaking mid-line) + // and the regular `chat.postMessage` `text` field HTML-escapes `<`/`>`/`&`. + // Buffer-and-post on completion gives us paragraph-aligned chunks AND + // markdown-native rendering. See `postSlackMarkdown` above. + if (platform === "slack") { + if (payload.isFullReplacement && existing) { + // Discard prior buffered content — the worker is replacing it. + this.streams.delete(key); + } + let stream = this.streams.get(key); + if (!stream) { + // Resolve the SDK target up front so that if `postSlackMarkdown` + // can't reach `slackClient.chat.postMessage` at completion (adapter + // not wired, getAdapter returns undefined, etc.) we still have a + // non-null fallback and the response doesn't silently disappear. + const fallbackTarget = await this.resolveTarget( + instance, + channelId, + payload.conversationId, + (payload.platformMetadata as any)?.responseThreadId, + payload.platformMetadata as Record | undefined + ).catch(() => null); + stream = { + iterator: new AsyncPushIterator(), + streamPromise: Promise.resolve(), + buffer: payload.delta, + streamFailed: true, // Force completion to use the post-buffer path + wasFullyReplaced: !!payload.isFullReplacement, + target: fallbackTarget, + }; + this.streams.set(key, stream); + } else { + stream.buffer += payload.delta; + if (payload.isFullReplacement) stream.wasFullyReplaced = true; + } + return null; + } + // Full replacement: close current stream, await delivery, then start fresh. // This only fires in rare error paths (see worker.ts:584). if (payload.isFullReplacement && existing) { @@ -261,7 +449,7 @@ export class ChatResponseBridge implements ResponseRenderer { const ctx = this.extractResponseContext(payload); if (!ctx) return; - const { connectionId, channelId } = ctx; + const { connectionId, instance, channelId, platform } = ctx; const key = `${channelId}:${payload.conversationId}`; const stream = this.streams.get(key); @@ -275,11 +463,49 @@ export class ChatResponseBridge implements ResponseRenderer { "Adapter stream errored during completion" ); } - // Fallback: when native streaming rejected (e.g. Slack's chatStream - // requires a recipient user/team id that the public-API send path - // can't supply), post the accumulated buffer non-streaming so the - // response still lands in the thread instead of being silently dropped. - if (stream.streamFailed && stream.buffer.trim() && stream.target) { + + // Slack-specific path: always post via `markdown_text` with paragraph + // chunking (see handleDelta — we never opened a real stream for Slack). + if (platform === "slack" && stream.buffer.trim()) { + const cleaned = stripEmptyLinks(decodeHtmlEntities(stream.buffer)); + try { + const handled = await postSlackMarkdown( + instance, + channelId, + payload.conversationId, + cleaned + ); + if (handled) { + logger.info( + { connectionId, channelId, length: cleaned.length }, + "Posted Slack response via markdown_text with paragraph chunking" + ); + } else if (stream.target) { + // Adapter unavailable — fall back to the SDK so we still deliver. + await stream.target.post(cleaned); + } + } catch (error) { + logger.warn( + { connectionId, error: String(error) }, + "Slack markdown_text post failed; falling back to SDK" + ); + if (stream.target) { + try { + await stream.target.post(cleaned); + } catch (fallbackError) { + logger.warn( + { connectionId, error: String(fallbackError) }, + "SDK fallback post also failed" + ); + } + } + } + } else if (stream.streamFailed && stream.buffer.trim() && stream.target) { + // Non-Slack fallback: when native streaming rejected (e.g. Slack's + // chatStream requires a recipient user/team id that the public-API + // send path can't supply), post the accumulated buffer non-streaming + // so the response still lands in the thread instead of being + // silently dropped. try { await stream.target.post(stream.buffer); logger.info( diff --git a/packages/gateway/src/connections/conversation-state-store.ts b/packages/gateway/src/connections/conversation-state-store.ts index afcb2f738..b08c50868 100644 --- a/packages/gateway/src/connections/conversation-state-store.ts +++ b/packages/gateway/src/connections/conversation-state-store.ts @@ -141,6 +141,39 @@ export class ConversationStateStore { return entries.length > 0; } + /** + * Atomic "first time we've seen this thread" marker. Returns true on the + * first call per (connectionId, threadId) within HISTORY_TTL_MS, false + * thereafter. Used to ensure thread-history backfill from the platform + * runs at most once per thread per TTL window, even if multiple events + * race in. + * + * Pair with `releaseThreadBackfill` so a failed backfill (rate limit, + * transient network error) clears the marker and the next event can + * retry — otherwise a single failure poisons the thread for 24h. + */ + async claimThreadBackfill( + connectionId: string, + threadId: string + ): Promise { + return this.state.setIfNotExists( + this.threadBackfillKey(connectionId, threadId), + 1, + HISTORY_TTL_MS + ); + } + + async releaseThreadBackfill( + connectionId: string, + threadId: string + ): Promise { + await this.state.delete(this.threadBackfillKey(connectionId, threadId)); + } + + private threadBackfillKey(connectionId: string, threadId: string): string { + return `thread-backfilled:${connectionId}:${threadId}`; + } + async listHistoryChannels(connectionId: string): Promise { const index = await this.state.get( historyIndexKey(connectionId) diff --git a/packages/gateway/src/connections/interaction-bridge.ts b/packages/gateway/src/connections/interaction-bridge.ts index 6c6aba591..001de25f0 100644 --- a/packages/gateway/src/connections/interaction-bridge.ts +++ b/packages/gateway/src/connections/interaction-bridge.ts @@ -82,6 +82,10 @@ async function takePendingToolInvocation( args: Record; agentId: string; userId: string; + channelId?: string; + conversationId?: string; + teamId?: string; + connectionId?: string; } | null> { const raw = await redis.getdel(`${PENDING_TOOL_KEY_PREFIX}${requestId}`); if (!raw) return null; @@ -174,7 +178,9 @@ export function registerInteractionBridge( // Tracks posted tool-approval cards so we can edit them on click to strip // the buttons. Keyed by requestId (== PostedToolApproval.id == Redis key). - // Entries auto-expire after PENDING_TOOL_TTL (5min) matching Redis. + // Auto-expire window matches the Redis pending-tool TTL (24h) so a late + // click can still find the card to strip. + const APPROVAL_CARD_TTL_MS = 24 * 60 * 60 * 1000; const pendingApprovalCards = new Map(); const pendingApprovalTimers = new Map(); function trackApprovalCard(requestId: string, sent: SentMessage): void { @@ -182,7 +188,7 @@ export function registerInteractionBridge( const timer = setTimeout(() => { pendingApprovalCards.delete(requestId); pendingApprovalTimers.delete(requestId); - }, 300_000); + }, APPROVAL_CARD_TTL_MS); pendingApprovalTimers.set(requestId, timer); } function claimApprovalCard(requestId: string): SentMessage | undefined { @@ -498,7 +504,9 @@ export function registerInteractionBridge( "Background question-click processing failed" ); }); - } + }, + async (channelId, conversationId) => + resolveThread(manager, connectionId, channelId, conversationId) ); logger.info({ connectionId, platform }, "Interaction bridge registered"); @@ -558,7 +566,11 @@ export function registerActionHandlers( grantStore: GrantStore | undefined, executeToolDirect?: ExecuteToolDirectFn, claimApprovalCard?: (requestId: string) => SentMessage | undefined, - onQuestionClick?: OnQuestionClickFn + onQuestionClick?: OnQuestionClickFn, + resolveApprovalTarget?: ( + channelId: string, + conversationId: string + ) => Promise ): void { chat.onAction(async (event: any) => { const actionId: string = event.actionId ?? ""; @@ -575,17 +587,43 @@ export function registerActionHandlers( if (!requestId) return; - // GETDEL atomically claims the pending invocation. On retries (Slack - // re-delivering the same block_actions webhook) getdel returns null and - // we silently no-op — the first click already handled it. + // GETDEL atomically claims the pending invocation. On Slack retries of + // the same block_actions webhook the second GETDEL returns null and we + // silently no-op (the first click already won). But if the card was + // never claimed before — i.e. the in-memory approval card is still + // tracked — this is a real first click landing on an expired/missing + // pending key, and we MUST surface that to the user. Otherwise the + // click looks like it did nothing. const pending = await takePendingToolInvocation(redis, requestId).catch( () => null ); if (!pending) { - logger.debug( - { requestId, decision }, - "No pending tool invocation — ignoring (already handled or expired)" - ); + const sent = claimApprovalCard?.(requestId); + if (sent) { + logger.info( + { requestId, decision }, + "Tool approval click with no pending invocation — likely expired" + ); + try { + await sent.edit( + "*Tool Approval*\n\n_This approval request expired before it could be acted on. Re-send your last message to retry._" + ); + } catch { + // best effort + } + try { + await thread.post( + "This tool approval request expired before it could be acted on. Re-send your last message to retry." + ); + } catch { + // best effort + } + } else { + logger.debug( + { requestId, decision }, + "Tool approval click with no pending invocation and no tracked card — ignoring (already handled)" + ); + } return; } @@ -598,6 +636,25 @@ export function registerActionHandlers( decision ); + // Resolve the post target. Prefer the original conversation captured at + // the time the tool call was blocked (saved alongside the pending + // record) so the result lands in the same Slack/Telegram thread the + // user originally pinged the bot in. Fall back to the click event's + // thread (the card the user just clicked) only if we don't have the + // original context — that fallback can be wrong on Slack when the card + // ended up posted at channel level. + let postTarget: any = thread; + if ( + resolveApprovalTarget && + (pending.conversationId || pending.channelId) + ) { + const resolved = await resolveApprovalTarget( + pending.channelId ?? "", + pending.conversationId ?? "" + ).catch(() => null); + if (resolved) postTarget = resolved; + } + if (decision === "deny") { if (grantStore) { await grantStore @@ -605,7 +662,7 @@ export function registerActionHandlers( .catch(() => undefined); } try { - await thread.post( + await postTarget.post( "Tool call denied. Let me know if you'd like me to try a different approach." ); } catch { @@ -650,7 +707,7 @@ export function registerActionHandlers( ); const resultText = result.content.map((c) => c.text).join("\n"); - await thread.post( + await postTarget.post( result.isError ? `Tool error: ${resultText}` : resultText ); logger.info( @@ -668,14 +725,14 @@ export function registerActionHandlers( "Failed to execute tool after approval" ); try { - await thread.post(`Failed to execute tool: ${String(error)}`); + await postTarget.post(`Failed to execute tool: ${String(error)}`); } catch { // best effort } } } else { try { - await thread.post("approve"); + await postTarget.post("approve"); } catch { // best effort } diff --git a/packages/gateway/src/connections/message-handler-bridge.ts b/packages/gateway/src/connections/message-handler-bridge.ts index 2d5dc3148..719593db9 100644 --- a/packages/gateway/src/connections/message-handler-bridge.ts +++ b/packages/gateway/src/connections/message-handler-bridge.ts @@ -236,12 +236,21 @@ export class MessageHandlerBridge { const channelId = thread.channelId ?? thread.id ?? "unknown"; const messageId = message.id ?? String(Date.now()); const isGroup = source === "mention" || source === "subscribed"; - // Use the Chat SDK's canonical `thread.id` for group threads so a reply in - // an existing thread collapses to the same conversation as its parent - // (Slack: `slack:{channel}:{parent_thread_ts}`, Telegram: `telegram:{chatId}:{topicId}`). - // DMs use the bare channel id — they're channel-level, not thread-level. + // Collapse to the canonical `thread.id` whenever we're inside an existing + // thread — group thread reply OR DM thread reply alike. Slack encodes + // `slack:{channel}:{thread_ts}` (top-level DM has empty thread_ts so the id + // ends with a trailing `:`); Telegram encodes `telegram:{chatId}` for + // top-level and `telegram:{chatId}:{topicId}` inside a forum topic. Without + // this, a `onDirectMessage` event for a reply in a DM thread (e.g. the + // worker posted a `ScheduleReminder` ALARM_FIRED message and the user + // clicked Reply on it) would fall back to the channel id and the bot's + // response would land in the main DM pane instead of the thread. + const isThreadReply = + typeof thread.id === "string" && + thread.id !== channelId && + thread.id !== `${channelId}:`; const conversationId = - isGroup && typeof thread.id === "string" ? thread.id : channelId; + isGroup || isThreadReply ? (thread.id as string) : channelId; logger.info( { @@ -382,12 +391,22 @@ export class MessageHandlerBridge { } } - // Remove bot mention from text - const botUsername = this.manager.getInstance(this.connection.id)?.connection - .metadata.botUsername; + // Remove bot mention from text. Slack delivers raw `<@Uxxx>` tokens; the + // Chat SDK may strip the brackets, so we also catch the bare `@Uxxx` form. + const botMetadata = this.manager.getInstance(this.connection.id)?.connection + .metadata; + const botUsername = botMetadata?.botUsername as string | undefined; + const botUserId = botMetadata?.botUserId as string | undefined; if (botUsername) { messageText = messageText.replace(`@${botUsername}`, "").trim(); } + if (botUserId) { + messageText = messageText + .replace(new RegExp(`<@${botUserId}>`, "g"), "") + .replace(new RegExp(`@${botUserId}\\b`, "g"), "") + .replace(/\s+/g, " ") + .trim(); + } // Intercept /new and /clear before slash dispatch let sessionReset = false; @@ -423,6 +442,69 @@ export class MessageHandlerBridge { // Gap 1: Retrieve + append conversation history via the SDK state adapter. const conversationState = this.conversationState(); + + // Backfill: when the bot is first activated in a thread (mention or + // first subscribed event), ask the Chat SDK adapter for the thread's + // prior messages. Slack maps this to `conversations.replies` (Tier 3, + // generous limit). Without this, a mid-thread mention has no context + // for the messages that preceded it. `claimThreadBackfill` is an + // atomic per-thread one-shot guard — runs at most once per thread per + // HISTORY_TTL_MS window, regardless of how many events race in. + if ( + conversationState && + isGroup && + (await conversationState.claimThreadBackfill( + this.connection.id, + thread.id + )) + ) { + let backfillSucceeded = false; + try { + const adapter = (thread as any).adapter; + if (adapter?.fetchMessages) { + const result = await adapter.fetchMessages(thread.id, { + limit: 50, + direction: "forward", + }); + for (const prior of result.messages ?? []) { + if (prior.id === messageId) continue; + const text = (prior.text ?? "").trim(); + if (!text) continue; + const sentAt = + prior.metadata?.dateSent instanceof Date + ? prior.metadata.dateSent.getTime() + : Date.now(); + await conversationState.appendHistory( + this.connection.id, + channelId, + { + role: prior.author?.isMe ? "assistant" : "user", + content: text, + authorName: prior.author?.fullName, + timestamp: sentAt, + } + ); + } + backfillSucceeded = true; + } else { + // Adapter doesn't expose fetchMessages — nothing to retry, treat + // as "successful" so we don't hammer it on every event. + backfillSucceeded = true; + } + } catch (error) { + logger.warn( + { connectionId: this.connection.id, channelId, error: String(error) }, + "Thread backfill failed; will retry on next event" + ); + } + if (!backfillSucceeded) { + await conversationState.releaseThreadBackfill( + this.connection.id, + thread.id + ); + } + } + const conversationHistory = (await conversationState?.getHistory(this.connection.id, channelId)) ?? []; diff --git a/packages/gateway/src/connections/slack-instruction-provider.ts b/packages/gateway/src/connections/slack-instruction-provider.ts new file mode 100644 index 000000000..7469199ab --- /dev/null +++ b/packages/gateway/src/connections/slack-instruction-provider.ts @@ -0,0 +1,39 @@ +import type { InstructionContext, InstructionProvider } from "@lobu/core"; +import type { ChatInstanceManager } from "./chat-instance-manager"; + +export class SlackInstructionProvider implements InstructionProvider { + name = "slack-identity"; + priority = 20; + + constructor(private readonly manager: ChatInstanceManager) {} + + async getInstructions(context: InstructionContext): Promise { + const connections = await this.manager.listConnections({ + platform: "slack", + templateAgentId: context.agentId, + }); + const connection = connections[0]; + if (!connection) return ""; + + const botUsername = connection.metadata?.botUsername as string | undefined; + const botUserId = connection.metadata?.botUserId as string | undefined; + if (!botUsername && !botUserId) return ""; + + const lines: string[] = ["**Slack identity:**"]; + if (botUsername && botUserId) { + lines.push( + `- You are reachable in Slack as \`@${botUsername}\` (user ID \`${botUserId}\`).` + ); + } else if (botUsername) { + lines.push(`- You are reachable in Slack as \`@${botUsername}\`.`); + } else if (botUserId) { + lines.push(`- Your Slack user ID is \`${botUserId}\`.`); + } + if (botUserId) { + lines.push( + `- Mentions of \`<@${botUserId}>\` (or the bare \`@${botUserId}\`) refer to *you*; the gateway strips them before delivery, so anything you still see is incidental — do not treat your own ID as a stranger.` + ); + } + return lines.join("\n"); + } +} diff --git a/packages/gateway/src/gateway-main.ts b/packages/gateway/src/gateway-main.ts index 539b8a37a..fdb74ba96 100644 --- a/packages/gateway/src/gateway-main.ts +++ b/packages/gateway/src/gateway-main.ts @@ -79,6 +79,19 @@ export class Gateway { this.platforms.set(platform.name, platform); // Also register in global platform registry for deployment managers platformRegistry.register(platform); + + // If the gateway is already running, the start() registration loop has + // already passed. Register the instruction provider eagerly so platforms + // added post-start (chat adapters) still contribute identity context. + if (this.isRunning && platform.getInstructionProvider) { + const provider = platform.getInstructionProvider(); + if (provider) { + this.coreServices + .getInstructionService() + ?.registerPlatformProvider(platform.name, provider); + } + } + logger.debug(`Platform registered: ${platform.name}`); return this; } diff --git a/packages/gateway/src/orchestration/base-deployment-manager.ts b/packages/gateway/src/orchestration/base-deployment-manager.ts index 3676dcccd..5d637bf8a 100644 --- a/packages/gateway/src/orchestration/base-deployment-manager.ts +++ b/packages/gateway/src/orchestration/base-deployment-manager.ts @@ -197,6 +197,15 @@ export abstract class BaseDeploymentManager { * Redis instead of lingering forever. */ private grantSyncCache = new Map>(); + /** + * In-flight `ensureDeployment` promises keyed by deploymentName. Coalesces + * concurrent calls within a single gateway process so the orchestrator- + * specific `spawnDeployment` only runs once per deployment slot. Cross- + * process concurrency (multi-replica gateway) is handled by the underlying + * orchestrator's atomic name-uniqueness guarantee — each subclass catches + * the resulting AlreadyExists error and treats it as benign success. + */ + private inFlightCreates = new Map>(); constructor( config: OrchestratorConfig, @@ -249,7 +258,15 @@ export abstract class BaseDeploymentManager { // Abstract methods that must be implemented by concrete classes abstract listDeployments(): Promise; - abstract createDeployment( + /** + * Orchestrator-specific deployment spawn. Subclasses must implement the + * actual create call (process spawn / docker createContainer / k8s + * createNamespacedDeployment) and treat the orchestrator's AlreadyExists + * error as benign success — that is the cross-process serialization + * mechanism. Always invoked through `ensureDeployment` which provides + * in-process coalescing. + */ + protected abstract spawnDeployment( deploymentName: string, username: string, userId: string, @@ -269,6 +286,35 @@ export abstract class BaseDeploymentManager { */ protected abstract getDispatcherHost(): string; + /** + * Idempotent deployment ensure: returns the existing deployment if one is + * already being (or has been) created with this name, otherwise delegates + * to the orchestrator-specific `spawnDeployment`. Concurrent callers for + * the same name share a single in-flight promise. + */ + async ensureDeployment( + deploymentName: string, + username: string, + userId: string, + messageData?: MessagePayload + ): Promise { + const inFlight = this.inFlightCreates.get(deploymentName); + if (inFlight) { + return inFlight; + } + + const promise = this.spawnDeployment( + deploymentName, + username, + userId, + messageData + ).finally(() => { + this.inFlightCreates.delete(deploymentName); + }); + this.inFlightCreates.set(deploymentName, promise); + return promise; + } + /** * Resolve worker image reference. * If digest is configured, prefer immutable digest reference (repo@sha256:...). @@ -348,7 +394,7 @@ export abstract class BaseDeploymentManager { } } - await this.createDeployment(deploymentName, userId, userId, messageData); + await this.ensureDeployment(deploymentName, userId, userId, messageData); } catch (error) { throw new OrchestratorError( ErrorCode.DEPLOYMENT_CREATE_FAILED, diff --git a/packages/gateway/src/orchestration/impl/docker-deployment.ts b/packages/gateway/src/orchestration/impl/docker-deployment.ts index 8f2be0241..96e4bcfc1 100644 --- a/packages/gateway/src/orchestration/impl/docker-deployment.ts +++ b/packages/gateway/src/orchestration/impl/docker-deployment.ts @@ -333,12 +333,12 @@ export class DockerDeploymentManager extends BaseDeploymentManager { return volumeName; } - async createDeployment( - ...args: Parameters + protected async spawnDeployment( + deploymentName: string, + username: string, + userId: string, + messageData?: MessagePayload ): Promise { - const [deploymentName, username, userId, messageDataRaw] = args; - const messageData = messageDataRaw as MessagePayload | undefined; - try { // Use agentId for volume naming (shared across threads in same space) const agentId = messageData?.agentId; @@ -504,7 +504,30 @@ export class DockerDeploymentManager extends BaseDeploymentManager { WorkingDir: "/workspace", }; - const container = await this.docker.createContainer(createOptions); + let container: Docker.Container; + try { + container = await this.docker.createContainer(createOptions); + } catch (createError: any) { + // Another gateway replica created this container concurrently — Docker + // enforces unique container names cluster-wide on a host. Treat 409 as + // benign: the existing container is the canonical worker for this + // deployment slot, and we just need to ensure it's running. + if ( + createError?.statusCode === 409 || + createError?.message?.includes("already in use") + ) { + logger.info( + `Container ${deploymentName} already exists (created by another replica); ensuring it's started` + ); + const existing = this.docker.getContainer(deploymentName); + const info = await existing.inspect(); + if (!info.State.Running) { + await existing.start(); + } + return; + } + throw createError; + } try { await container.start(); } catch (startError) { diff --git a/packages/gateway/src/orchestration/impl/embedded-deployment.ts b/packages/gateway/src/orchestration/impl/embedded-deployment.ts index c6a869b99..cfabd3a06 100644 --- a/packages/gateway/src/orchestration/impl/embedded-deployment.ts +++ b/packages/gateway/src/orchestration/impl/embedded-deployment.ts @@ -77,11 +77,20 @@ export class EmbeddedDeploymentManager extends BaseDeploymentManager { logger.debug(`Worker entry point verified: ${entryPoint}`); } - async createDeployment( - ...args: Parameters + protected async spawnDeployment( + deploymentName: string, + username: string, + userId: string, + messageData?: MessagePayload ): Promise { - const [deploymentName, username, userId, messageDataRaw] = args; - const messageData = messageDataRaw as MessagePayload | undefined; + // Embedded mode is single-process by definition, so there is no cross- + // process orchestrator to enforce uniqueness. The base class's in-flight + // cache catches concurrent calls; this guards the rare case where a + // fully-completed worker is still in the map and a fresh create slips + // past the upstream `listDeployments()` check (e.g. stale snapshot). + if (this.workers.has(deploymentName)) { + return; + } const agentId = messageData?.agentId; if (!agentId) { @@ -201,7 +210,7 @@ export class EmbeddedDeploymentManager extends BaseDeploymentManager { logger.info(`Stopped embedded worker ${deploymentName}`); } else if (replicas === 1 && !entry) { logger.warn( - `Cannot scale up ${deploymentName} — use createDeployment to re-spawn` + `Cannot scale up ${deploymentName} — use ensureDeployment to re-spawn` ); } } diff --git a/packages/gateway/src/orchestration/impl/k8s/deployment.ts b/packages/gateway/src/orchestration/impl/k8s/deployment.ts index d0680b6b6..ffa9d9e79 100644 --- a/packages/gateway/src/orchestration/impl/k8s/deployment.ts +++ b/packages/gateway/src/orchestration/impl/k8s/deployment.ts @@ -480,7 +480,7 @@ export class K8sDeploymentManager extends BaseDeploymentManager { } } - async createDeployment( + protected async spawnDeployment( deploymentName: string, username: string, userId: string, @@ -761,6 +761,21 @@ export class K8sDeploymentManager extends BaseDeploymentManager { response?: { statusMessage?: string }; code?: string; }; + + // Another gateway replica created this deployment concurrently — the + // K8s API server enforces unique names atomically, so 409 AlreadyExists + // is the cluster-wide serialization signal. Treat it as benign success + // and DO NOT touch the PVC: it belongs to the deployment the winning + // replica just created and is now in active use. + if (k8sError.statusCode === 409) { + logger.info( + `Deployment ${deploymentName} already exists (created by another replica); treating as success` + ); + workerSpan?.setStatus({ code: SpanStatusCode.OK }); + workerSpan?.end(); + return; + } + // Log detailed error information logger.error(`❌ Failed to create deployment ${deploymentName}:`, { statusCode: k8sError.statusCode, @@ -799,15 +814,7 @@ export class K8sDeploymentManager extends BaseDeploymentManager { }); workerSpan?.end(); - // Check for specific error conditions and throw OrchestratorError - if (k8sError.statusCode === 409) { - throw new OrchestratorError( - ErrorCode.DEPLOYMENT_CREATE_FAILED, - `Deployment ${deploymentName} already exists`, - { deploymentName, statusCode: 409 }, - false - ); - } else if (k8sError.statusCode === 403) { + if (k8sError.statusCode === 403) { throw new OrchestratorError( ErrorCode.DEPLOYMENT_CREATE_FAILED, `Insufficient permissions to create deployment ${deploymentName}`, diff --git a/packages/gateway/src/routes/public/connections.ts b/packages/gateway/src/routes/public/connections.ts index 7983b70e6..e7d5b92be 100644 --- a/packages/gateway/src/routes/public/connections.ts +++ b/packages/gateway/src/routes/public/connections.ts @@ -374,9 +374,11 @@ export function createConnectionWebhookRoutes( return c.json({ error: "Connection not found" }, 404); } - logger.debug( + // Info-level so platform webhook traffic (Slack interactivity, Telegram + // updates, etc.) is visible in production logs without flipping LOG_LEVEL. + logger.info( { connectionId, platform: connection.platform }, - "Processing webhook" + "Inbound platform webhook" ); try { diff --git a/packages/gateway/src/services/core-services.ts b/packages/gateway/src/services/core-services.ts index 92340ef7f..20447f686 100644 --- a/packages/gateway/src/services/core-services.ts +++ b/packages/gateway/src/services/core-services.ts @@ -734,6 +734,8 @@ export class CoreServices { configResolver: this.providerConfigResolver, }); + this.syncOwlettoMcpFromEnv(); + // Initialize instruction service (needed by WorkerGateway) this.instructionService = new InstructionService( this.mcpConfigService, @@ -862,6 +864,30 @@ export class CoreServices { // File-First Helpers // ============================================================================ + /** + * Mirror the resolved `MEMORY_URL` env var into the MCP config service as a + * global `owletto` server. Without this, requests to `/mcp/owletto` (issued + * by the Owletto plugin running inside workers) fail with "MCP server + * 'owletto' not found" because `getHttpServer("owletto")` would otherwise + * return undefined — the upstream URL only lives in env, not in any + * agent settings entry. + * + * NOTE: do NOT set `oauth: {}` here. Owletto auth is owned by the + * worker-side `owletto_login` plugin tool (device-code flow). Adding + * `oauth: {}` would trigger the gateway's MCP OAuth auth-code/PKCE + * discovery as a parallel flow, producing two competing login links + * for the user. + */ + private syncOwlettoMcpFromEnv(): void { + if (!this.mcpConfigService) return; + const memoryUrl = process.env.MEMORY_URL?.trim(); + if (!memoryUrl) return; + this.mcpConfigService.upsertGlobalServer("owletto", { + url: memoryUrl, + type: "streamable-http", + }); + } + private async populateStoreFromFiles( store: InMemoryAgentStore, agents: FileLoadedAgent[] @@ -913,6 +939,7 @@ export class CoreServices { } await applyOwlettoMemoryEnvFromProject(this.projectPath); + this.syncOwlettoMcpFromEnv(); // Re-load from disk this.fileLoadedAgents = await loadAgentConfigFromFiles(this.projectPath); diff --git a/packages/worker/src/gateway/sse-client.ts b/packages/worker/src/gateway/sse-client.ts index 3bb87c5a6..f3145b83d 100644 --- a/packages/worker/src/gateway/sse-client.ts +++ b/packages/worker/src/gateway/sse-client.ts @@ -500,13 +500,13 @@ export class GatewayClient { ); } - if (data.userId.toLowerCase() !== this.userId.toLowerCase()) { - logger.warn( - { traceId, receivedUserId: data.userId, expectedUserId: this.userId }, - "Received message for wrong user" - ); - return; - } + // No per-user filtering here: deployment names intentionally hash only + // `platform:channelId:conversationId` (see `generateDeploymentName` in + // base-deployment-manager.ts) so a channel/thread has ONE shared worker + // across all posting users. DMs are single-participant, so a check would + // be dead there too. The WORKER_TOKEN-scoped-to-spawning-user tradeoff + // for shared channel workers is acknowledged and deferred to per-message + // JWT minting — gating here would break the core group-bot design. // Check job type and dispatch accordingly if (data.jobType === "exec") { diff --git a/packages/worker/src/openclaw/worker.ts b/packages/worker/src/openclaw/worker.ts index b3888df9a..ffb2f1b3c 100644 --- a/packages/worker/src/openclaw/worker.ts +++ b/packages/worker/src/openclaw/worker.ts @@ -1253,68 +1253,6 @@ Use it when the user references past discussions or you need context.`); } await startPluginServices(loadedPlugins); - // Proactive owletto login: call owletto_login to check auth status. - // If not authenticated, inject the login link into instructions so the model - // can relay it to the user without needing to call tools itself. - { - const loginTool = pluginTools.find((t) => t.name === "owletto_login"); - if (loginTool) { - logger.info("Checking Owletto auth status via proactive login"); - try { - const loginResult = await loginTool.execute( - "proactive-login", - {}, - undefined, - undefined, - undefined as any - ); - const resultText = - loginResult?.content - ?.filter( - ( - c - ): c is { - type: "text"; - text: string; - } => c.type === "text" && typeof c.text === "string" - ) - .map((c) => c.text) - .join("\n") || ""; - logger.info(`Owletto login result: ${resultText.slice(0, 200)}`); - const parsed = JSON.parse(resultText); - if (parsed.status === "login_started" && parsed.verification_url) { - // Send login link directly to the user — don't rely on the model - const loginMessage = `🔑 Memory requires login. Open this link to connect:\n${parsed.verification_url}${parsed.user_code ? `\nCode: ${parsed.user_code}` : ""}\n\n`; - await onProgress({ - type: "output", - data: loginMessage, - timestamp: Date.now(), - }); - logger.info( - "Proactive owletto login started — login link sent directly to user" - ); - instructionParts.push( - `\n\n## Owletto Login In Progress\nThe login link and code have already been sent directly to the user by the system. Do not repeat the verification URL or code unless the user explicitly asks. Do not call owletto_login again while authentication is pending. If the current request depends on Owletto, tell the user briefly to complete login and reply once done.` - ); - } else if (parsed.status === "already_authenticated") { - logger.info("Owletto already authenticated"); - } else if (parsed.status === "error") { - logger.warn(`Owletto login returned error: ${parsed.message}`); - instructionParts.push( - `\n\n## Owletto Memory Not Connected\nOwletto memory is not connected and login could not be started automatically. Tell the user they need to connect their Owletto memory. If owletto_login tool is available, call it to try again. Otherwise tell them an admin or an existing auth flow is required.` - ); - } - } catch (err) { - logger.warn( - `Proactive owletto login failed: ${err instanceof Error ? err.message : String(err)}` - ); - instructionParts.push( - `\n\n## Owletto Memory Not Connected\nOwletto memory is not connected. Tell the user they need to connect their memory via Owletto. If the owletto_login tool is available, call it to start the login flow.` - ); - } - } - } - // Rebuild final instructions after possible login link injection const finalInstructionsUpdated = instructionParts .filter(Boolean)