diff --git a/packages/agent-worker/src/__tests__/sse-client.test.ts b/packages/agent-worker/src/__tests__/sse-client.test.ts index 8c6dda882..b7bac2d0c 100644 --- a/packages/agent-worker/src/__tests__/sse-client.test.ts +++ b/packages/agent-worker/src/__tests__/sse-client.test.ts @@ -50,6 +50,105 @@ describe("GatewayClient heartbeat ACKs", () => { }); }); + test("propagates runId and runJobToken from job payload to handleThreadMessage", async () => { + // Regression test for the bug shipped in PR #871: snapshot mode became + // the default, but JobEventSchema was a plain z.object() (default zod + // mode strips unknown keys). MessageConsumer stamps runId + runJobToken + // onto the MessagePayload before SSE dispatch, but those fields were + // silently dropped by safeParse — so every snapshot-mode chat threw + // "WorkerConfig.runId is missing" at boot. Pre-fix: this assertion + // sees `undefined`. Post-fix: the fields survive parsing. + const client = new GatewayClient( + "https://gateway.example.com", + "worker-token", + "user-1", + "worker-1" + ); + const handleThreadMessage = mock(async () => undefined); + (client as any).handleThreadMessage = handleThreadMessage; + + await (client as any).handleEvent( + "job", + JSON.stringify({ + payload: { + botId: "lobu-bot", + userId: "telegram-user-1", + agentId: "default", + conversationId: "telegram:6570514069", + platform: "telegram", + channelId: "6570514069", + messageId: "6570514069:29", + messageText: "hi", + platformMetadata: {}, + agentOptions: {}, + runId: 12345, + runJobToken: "per-run-jwt-abc", + }, + jobId: "12345", + }) + ); + + expect(handleThreadMessage).toHaveBeenCalledTimes(1); + const forwarded = handleThreadMessage.mock.calls[0]?.[0]; + expect(forwarded.runId).toBe(12345); + expect(forwarded.runJobToken).toBe("per-run-jwt-abc"); + }); + + test("payloadToWorkerConfig threads runId + runJobToken into WorkerConfig", async () => { + // The dispatch-path bug's second half: even if zod kept the fields, + // payloadToWorkerConfig has to forward them onto the WorkerConfig the + // worker reads at boot (worker.ts:353-360). Lock that mapping in. + const client = new GatewayClient( + "https://gateway.example.com", + "worker-token", + "user-1", + "worker-1" + ); + const config = (client as any).payloadToWorkerConfig({ + botId: "lobu-bot", + userId: "telegram-user-1", + agentId: "default", + conversationId: "telegram:6570514069", + platform: "telegram", + channelId: "6570514069", + messageId: "6570514069:29", + messageText: "hi", + platformMetadata: {}, + agentOptions: {}, + runId: 67890, + runJobToken: "per-run-jwt-xyz", + }); + expect(config.runId).toBe(67890); + expect(config.runJobToken).toBe("per-run-jwt-xyz"); + }); + + test("payloadToWorkerConfig leaves runId/runJobToken undefined when absent (legacy direct-enqueue path)", async () => { + // Backwards-compat: legacy direct-enqueue paths don't set runId. The + // worker handles this by skipping the snapshot POST when + // LOBU_SESSION_STORE=file. The fields must survive end-to-end as + // `undefined`, not be coerced into NaN/empty-string. + const client = new GatewayClient( + "https://gateway.example.com", + "worker-token", + "user-1", + "worker-1" + ); + const config = (client as any).payloadToWorkerConfig({ + botId: "lobu-bot", + userId: "user-1", + agentId: "default", + conversationId: "conv", + platform: "api", + channelId: "channel", + messageId: "msg-1", + messageText: "hi", + platformMetadata: {}, + agentOptions: {}, + }); + expect(config.runId).toBeUndefined(); + expect(config.runJobToken).toBeUndefined(); + }); + test("ACKs heartbeat pings over the worker response endpoint", async () => { const fetchMock = mock( async (_url: string | URL | Request, _options?: RequestInit) => diff --git a/packages/agent-worker/src/gateway/sse-client.ts b/packages/agent-worker/src/gateway/sse-client.ts index cea4c4f23..e9b95f85d 100644 --- a/packages/agent-worker/src/gateway/sse-client.ts +++ b/packages/agent-worker/src/gateway/sse-client.ts @@ -81,20 +81,33 @@ const AgentOptionsSchema = z .passthrough(); const JobEventSchema = z.object({ - payload: z.object({ - botId: z.string(), - userId: z.string(), - agentId: z.string(), - conversationId: z.string(), - platform: z.string(), - channelId: z.string(), - messageId: z.string(), - messageText: z.string(), - platformMetadata: PlatformMetadataSchema, - agentOptions: AgentOptionsSchema, - jobId: z.string().optional(), - teamId: z.string().optional(), // Optional for WhatsApp (top-level) and Slack (in platformMetadata) - }), + payload: z + .object({ + botId: z.string(), + userId: z.string(), + agentId: z.string(), + conversationId: z.string(), + platform: z.string(), + channelId: z.string(), + messageId: z.string(), + messageText: z.string(), + platformMetadata: PlatformMetadataSchema, + agentOptions: AgentOptionsSchema, + jobId: z.string().optional(), + teamId: z.string().optional(), // Optional for WhatsApp (top-level) and Slack (in platformMetadata) + // Threaded through from MessageConsumer's runs-queue claim. The worker + // asserts these in snapshot mode (LOBU_SESSION_STORE != "file") — see + // worker.ts:353-360. The default zod object mode strips unknown keys, + // which silently dropped these fields and broke every Telegram chat + // when snapshot mode became the default in PR #871. Declare them + // explicitly so they survive parsing, and `.passthrough()` keeps any + // future MessagePayload field (mcpConfig, nixConfig, egressConfig, + // preApprovedTools, exec* fields, organizationId, networkConfig...) + // from regressing the same way. + runId: z.number().optional(), + runJobToken: z.string().optional(), + }) + .passthrough(), processedIds: z.array(z.string()).optional(), });