Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
99 changes: 99 additions & 0 deletions packages/agent-worker/src/__tests__/sse-client.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,105 @@ describe("GatewayClient heartbeat ACKs", () => {
});
});

test("propagates runId and runJobToken from job payload to handleThreadMessage", async () => {
// Regression test for the bug shipped in PR #871: snapshot mode became
// the default, but JobEventSchema was a plain z.object() (default zod
// mode strips unknown keys). MessageConsumer stamps runId + runJobToken
// onto the MessagePayload before SSE dispatch, but those fields were
// silently dropped by safeParse — so every snapshot-mode chat threw
// "WorkerConfig.runId is missing" at boot. Pre-fix: this assertion
// sees `undefined`. Post-fix: the fields survive parsing.
const client = new GatewayClient(
"https://gateway.example.com",
"worker-token",
"user-1",
"worker-1"
);
const handleThreadMessage = mock(async () => undefined);
(client as any).handleThreadMessage = handleThreadMessage;

await (client as any).handleEvent(
"job",
JSON.stringify({
payload: {
botId: "lobu-bot",
userId: "telegram-user-1",
agentId: "default",
conversationId: "telegram:6570514069",
platform: "telegram",
channelId: "6570514069",
messageId: "6570514069:29",
messageText: "hi",
platformMetadata: {},
agentOptions: {},
runId: 12345,
runJobToken: "per-run-jwt-abc",
},
jobId: "12345",
})
);

expect(handleThreadMessage).toHaveBeenCalledTimes(1);
const forwarded = handleThreadMessage.mock.calls[0]?.[0];
expect(forwarded.runId).toBe(12345);
expect(forwarded.runJobToken).toBe("per-run-jwt-abc");
});

test("payloadToWorkerConfig threads runId + runJobToken into WorkerConfig", async () => {
// The dispatch-path bug's second half: even if zod kept the fields,
// payloadToWorkerConfig has to forward them onto the WorkerConfig the
// worker reads at boot (worker.ts:353-360). Lock that mapping in.
const client = new GatewayClient(
"https://gateway.example.com",
"worker-token",
"user-1",
"worker-1"
);
const config = (client as any).payloadToWorkerConfig({
botId: "lobu-bot",
userId: "telegram-user-1",
agentId: "default",
conversationId: "telegram:6570514069",
platform: "telegram",
channelId: "6570514069",
messageId: "6570514069:29",
messageText: "hi",
platformMetadata: {},
agentOptions: {},
runId: 67890,
runJobToken: "per-run-jwt-xyz",
});
expect(config.runId).toBe(67890);
expect(config.runJobToken).toBe("per-run-jwt-xyz");
});

test("payloadToWorkerConfig leaves runId/runJobToken undefined when absent (legacy direct-enqueue path)", async () => {
// Backwards-compat: legacy direct-enqueue paths don't set runId. The
// worker handles this by skipping the snapshot POST when
// LOBU_SESSION_STORE=file. The fields must survive end-to-end as
// `undefined`, not be coerced into NaN/empty-string.
const client = new GatewayClient(
"https://gateway.example.com",
"worker-token",
"user-1",
"worker-1"
);
const config = (client as any).payloadToWorkerConfig({
botId: "lobu-bot",
userId: "user-1",
agentId: "default",
conversationId: "conv",
platform: "api",
channelId: "channel",
messageId: "msg-1",
messageText: "hi",
platformMetadata: {},
agentOptions: {},
});
expect(config.runId).toBeUndefined();
expect(config.runJobToken).toBeUndefined();
});

test("ACKs heartbeat pings over the worker response endpoint", async () => {
const fetchMock = mock(
async (_url: string | URL | Request, _options?: RequestInit) =>
Expand Down
41 changes: 27 additions & 14 deletions packages/agent-worker/src/gateway/sse-client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -81,20 +81,33 @@ const AgentOptionsSchema = z
.passthrough();

const JobEventSchema = z.object({
payload: z.object({
botId: z.string(),
userId: z.string(),
agentId: z.string(),
conversationId: z.string(),
platform: z.string(),
channelId: z.string(),
messageId: z.string(),
messageText: z.string(),
platformMetadata: PlatformMetadataSchema,
agentOptions: AgentOptionsSchema,
jobId: z.string().optional(),
teamId: z.string().optional(), // Optional for WhatsApp (top-level) and Slack (in platformMetadata)
}),
payload: z
.object({
botId: z.string(),
userId: z.string(),
agentId: z.string(),
conversationId: z.string(),
platform: z.string(),
channelId: z.string(),
messageId: z.string(),
messageText: z.string(),
platformMetadata: PlatformMetadataSchema,
agentOptions: AgentOptionsSchema,
jobId: z.string().optional(),
teamId: z.string().optional(), // Optional for WhatsApp (top-level) and Slack (in platformMetadata)
// Threaded through from MessageConsumer's runs-queue claim. The worker
// asserts these in snapshot mode (LOBU_SESSION_STORE != "file") — see
// worker.ts:353-360. The default zod object mode strips unknown keys,
// which silently dropped these fields and broke every Telegram chat
// when snapshot mode became the default in PR #871. Declare them
// explicitly so they survive parsing, and `.passthrough()` keeps any
// future MessagePayload field (mcpConfig, nixConfig, egressConfig,
// preApprovedTools, exec* fields, organizationId, networkConfig...)
// from regressing the same way.
runId: z.number().optional(),
runJobToken: z.string().optional(),
})
.passthrough(),
processedIds: z.array(z.string()).optional(),
});

Expand Down
Loading