diff --git a/assistant/src/home/__tests__/feed-scheduler.test.ts b/assistant/src/home/__tests__/feed-scheduler.test.ts index 33379b1a888..dd00984c5bd 100644 --- a/assistant/src/home/__tests__/feed-scheduler.test.ts +++ b/assistant/src/home/__tests__/feed-scheduler.test.ts @@ -4,34 +4,35 @@ * Producer implementations are injected via `FeedSchedulerOptions` * spies so the tests never touch `mock.module` (which leaks across * files in Bun's test runner). The dedicated producer tests - * (`reflection-producer.test.ts`, `platform-gmail-digest.test.ts`) - * cover each producer's internal behavior. + * (`rollup-producer.test.ts`, `platform-gmail-digest.test.ts`) cover + * each producer's internal behavior. */ import { afterEach, beforeEach, describe, expect, mock, test } from "bun:test"; import { startFeedScheduler } from "../feed-scheduler.js"; import type { FeedItem } from "../feed-types.js"; -import type { ReflectionResult } from "../reflection-producer.js"; +import type { RollupResult } from "../rollup-producer.js"; const gmailDigestRunner = mock< (now: Date, countSource: () => Promise) => Promise >(async () => null); -const reflectionRunner = mock<(now: Date) => Promise>( - async () => ({ wroteCount: 0, skippedReason: "empty_items" }), -); +const rollupRunner = mock<(now: Date) => Promise>(async () => ({ + wroteCount: 0, + skippedReason: "empty_items", +})); const defaultOptions = () => ({ gmailCountSource: async () => 0, gmailDigestRunner, - reflectionRunner, + rollupRunner, runOnStart: false, }); beforeEach(() => { gmailDigestRunner.mockClear(); - reflectionRunner.mockClear(); + rollupRunner.mockClear(); }); describe("startFeedScheduler", () => { @@ -47,9 +48,9 @@ describe("startFeedScheduler", () => { const summary = await handle.runOnce(new Date("2026-04-14T12:00:00.000Z")); expect(summary.gmailDigestRan).toBe(true); - expect(summary.reflectionRan).toBe(true); + expect(summary.rollupRan).toBe(true); expect(gmailDigestRunner).toHaveBeenCalledTimes(1); - expect(reflectionRunner).toHaveBeenCalledTimes(1); + expect(rollupRunner).toHaveBeenCalledTimes(1); }); test("gmail digest re-runs every tick once its interval has elapsed", async () => { @@ -69,7 +70,7 @@ describe("startFeedScheduler", () => { expect(summary2.gmailDigestRan).toBe(true); }); - test("reflection only re-runs every 30 minutes", async () => { + test("rollup only re-runs every 30 minutes", async () => { handle = startFeedScheduler(defaultOptions()); const t0 = new Date("2026-04-14T12:00:00.000Z"); @@ -78,20 +79,20 @@ describe("startFeedScheduler", () => { // 5 min later — below the 30-min reflection gate. const t1 = new Date("2026-04-14T12:05:00.000Z"); const summary1 = await handle.runOnce(t1); - expect(summary1.reflectionRan).toBe(false); + expect(summary1.rollupRan).toBe(false); // 31 min later — past the 30-min gate, should re-run. const t2 = new Date("2026-04-14T12:31:00.000Z"); const summary2 = await handle.runOnce(t2); - expect(summary2.reflectionRan).toBe(true); + expect(summary2.rollupRan).toBe(true); }); - test("reflection cooldown is NOT advanced on no_provider so the next tick retries", async () => { + test("rollup cooldown is NOT advanced on no_provider so the next tick retries", async () => { // Mimic the daemon startup ordering: the scheduler boots before // the provider registry is ready. The first tick gets no_provider; - // the next tick (even one second later) must still run the - // reflection instead of waiting 30 minutes. - reflectionRunner.mockImplementationOnce(async () => ({ + // the next tick (even one second later) must still run the rollup + // instead of waiting 30 minutes. + rollupRunner.mockImplementationOnce(async () => ({ wroteCount: 0, skippedReason: "no_provider", })); @@ -99,40 +100,59 @@ describe("startFeedScheduler", () => { handle = startFeedScheduler(defaultOptions()); const t0 = new Date("2026-04-14T12:00:00.000Z"); await handle.runOnce(t0); - expect(reflectionRunner).toHaveBeenCalledTimes(1); + expect(rollupRunner).toHaveBeenCalledTimes(1); // One second later — providers have initialized. const t1 = new Date("2026-04-14T12:00:01.000Z"); const summary = await handle.runOnce(t1); - expect(summary.reflectionRan).toBe(true); - expect(reflectionRunner).toHaveBeenCalledTimes(2); + expect(summary.rollupRan).toBe(true); + expect(rollupRunner).toHaveBeenCalledTimes(2); }); - test("reflection cooldown IS advanced on other skip reasons to preserve backoff", async () => { - // empty_items / malformed_output / provider_error are real attempts - // — the next tick should be gated by the full 30-minute window so - // a broken producer doesn't get hammered every tick. - reflectionRunner.mockImplementationOnce(async () => ({ + test("rollup cooldown is NOT advanced on no_actions so the next tick retries", async () => { + // no_actions means the activity log was empty — no LLM call was + // made. A subsequent tick should retry as soon as new actions + // land, not wait the full 30-minute window. + rollupRunner.mockImplementationOnce(async () => ({ + wroteCount: 0, + skippedReason: "no_actions", + })); + + handle = startFeedScheduler(defaultOptions()); + await handle.runOnce(new Date("2026-04-14T12:00:00.000Z")); + expect(rollupRunner).toHaveBeenCalledTimes(1); + + // One second later — the next tick must still invoke the rollup. + const summary = await handle.runOnce(new Date("2026-04-14T12:00:01.000Z")); + expect(summary.rollupRan).toBe(true); + expect(rollupRunner).toHaveBeenCalledTimes(2); + }); + + test("rollup cooldown IS advanced on other skip reasons to preserve backoff", async () => { + // empty_items / malformed_output / provider_error are real LLM + // attempts — the next tick should be gated by the full 30-minute + // window so a broken producer doesn't get hammered every tick. + rollupRunner.mockImplementationOnce(async () => ({ wroteCount: 0, skippedReason: "malformed_output", })); handle = startFeedScheduler(defaultOptions()); await handle.runOnce(new Date("2026-04-14T12:00:00.000Z")); - expect(reflectionRunner).toHaveBeenCalledTimes(1); + expect(rollupRunner).toHaveBeenCalledTimes(1); // Ten minutes later — below the 30-min gate, should NOT re-run. const summary = await handle.runOnce(new Date("2026-04-14T12:10:00.000Z")); - expect(summary.reflectionRan).toBe(false); - expect(reflectionRunner).toHaveBeenCalledTimes(1); + expect(summary.rollupRan).toBe(false); + expect(rollupRunner).toHaveBeenCalledTimes(1); }); test("producer exceptions do not break the tick loop", async () => { gmailDigestRunner.mockImplementationOnce(async () => { throw new Error("boom"); }); - reflectionRunner.mockImplementationOnce(async () => { + rollupRunner.mockImplementationOnce(async () => { throw new Error("also boom"); }); @@ -144,7 +164,7 @@ describe("startFeedScheduler", () => { // the intended behavior — a broken producer shouldn't cause the // scheduler to hammer it every tick via a backoff bypass. expect(summary.gmailDigestRan).toBe(true); - expect(summary.reflectionRan).toBe(true); + expect(summary.rollupRan).toBe(true); }); test("stop() makes subsequent runOnce calls no-op", async () => { diff --git a/assistant/src/home/__tests__/reflection-producer.test.ts b/assistant/src/home/__tests__/reflection-producer.test.ts deleted file mode 100644 index d118b725771..00000000000 --- a/assistant/src/home/__tests__/reflection-producer.test.ts +++ /dev/null @@ -1,291 +0,0 @@ -/** - * Unit tests for the home-feed reflection producer. - * - * All dependencies are injected via `ReflectionProducerDeps` spies so - * the tests never touch `mock.module`, which leaks across files in - * Bun's test runner. The production caller passes `undefined` and the - * producer falls through to the real config loader, relationship-state - * reader, and provider registry. - */ - -import { beforeEach, describe, expect, mock, test } from "bun:test"; - -import type { - ContentBlock, - Provider, - ProviderResponse, -} from "../../providers/types.js"; -import type { WriteAssistantFeedItemParams } from "../assistant-feed-authoring.js"; -import { runReflectionProducer } from "../reflection-producer.js"; - -const writeItem = mock< - (params: WriteAssistantFeedItemParams) => Promise ->(async () => ({})); - -const stubRelationshipState = async () => - ({ - version: 1, - assistantId: "self", - tier: 2, - progressPercent: 42, - facts: [ - { category: "voice", text: "Ships fast, explains the why." }, - { category: "priorities", text: "JARVIS is the current focus." }, - ], - capabilities: [], - conversationCount: 17, - hatchedDate: "2026-03-01T00:00:00.000Z", - assistantName: "Vellum", - userName: "Alex", - updatedAt: "2026-04-14T12:00:00.000Z", - // biome-ignore lint/suspicious/noExplicitAny: the real shape is - // internal-only and we only need the subset the producer reads. - }) as any; - -function makeProvider( - handler: ( - messages: Parameters[0], - tools: Parameters[1], - systemPrompt: Parameters[2], - options: Parameters[3], - ) => Promise, -): Provider { - return { - name: "mock", - sendMessage: handler, - }; -} - -function scriptedProvider(content: ContentBlock[]): Provider { - return makeProvider(async () => ({ - content, - model: "mock-model", - usage: { inputTokens: 0, outputTokens: 0 }, - stopReason: "tool_use", - })); -} - -function throwingProvider(error: Error): Provider { - return makeProvider(async () => { - throw error; - }); -} - -function toolUseContent(input: unknown): ContentBlock { - return { - type: "tool_use", - id: "tu_1", - name: "write_feed_items", - input: input as Record, - }; -} - -beforeEach(() => { - writeItem.mockClear(); -}); - -describe("runReflectionProducer", () => { - test("writes each item returned in the write_feed_items tool call", async () => { - const provider = scriptedProvider([ - toolUseContent({ - items: [ - { - type: "nudge", - source: "assistant", - title: "Follow up on Figma", - summary: "Noa shared a file Thursday. No reply yet.", - priority: 70, - minTimeAway: 3600, - }, - { - type: "thread", - source: "assistant", - title: "Hiring loop", - summary: "2 of 6 interviewed; pipeline is stalling.", - priority: 55, - }, - ], - }), - ]); - - const result = await runReflectionProducer(new Date(), { - writeItem, - loadRelationshipState: stubRelationshipState, - resolveProvider: () => provider, - }); - - expect(result.skippedReason).toBeNull(); - expect(result.wroteCount).toBe(2); - expect(writeItem).toHaveBeenCalledTimes(2); - const firstCall = writeItem.mock.calls[0]![0]; - expect(firstCall.type).toBe("nudge"); - expect(firstCall.title).toBe("Follow up on Figma"); - expect(firstCall.priority).toBe(70); - expect(firstCall.minTimeAway).toBe(3600); - }); - - test("returns empty_items when the model emits an empty array", async () => { - const provider = scriptedProvider([toolUseContent({ items: [] })]); - - const result = await runReflectionProducer(new Date(), { - writeItem, - loadRelationshipState: stubRelationshipState, - resolveProvider: () => provider, - }); - - expect(result.skippedReason).toBe("empty_items"); - expect(result.wroteCount).toBe(0); - expect(writeItem).not.toHaveBeenCalled(); - }); - - test("caps the batch at MAX_ITEMS_PER_REFLECTION (3)", async () => { - const provider = scriptedProvider([ - toolUseContent({ - items: [ - { type: "nudge", title: "One", summary: "One summary" }, - { type: "nudge", title: "Two", summary: "Two summary" }, - { type: "nudge", title: "Three", summary: "Three summary" }, - { - type: "nudge", - title: "Four", - summary: "Four summary — should be dropped.", - }, - ], - }), - ]); - - const result = await runReflectionProducer(new Date(), { - writeItem, - loadRelationshipState: stubRelationshipState, - resolveProvider: () => provider, - }); - - expect(result.wroteCount).toBe(3); - expect(writeItem).toHaveBeenCalledTimes(3); - }); - - test("reports malformed_output when every item in a non-empty batch fails coercion", async () => { - const provider = scriptedProvider([ - toolUseContent({ - items: [ - { type: "nudge", title: "", summary: "empty title, rejected" }, - { type: "bogus", title: "bad type", summary: "also rejected" }, - { type: "nudge", title: "valid title" }, // missing summary - ], - }), - ]); - - const result = await runReflectionProducer(new Date(), { - writeItem, - loadRelationshipState: stubRelationshipState, - resolveProvider: () => provider, - }); - - expect(result.skippedReason).toBe("malformed_output"); - expect(result.wroteCount).toBe(0); - expect(writeItem).not.toHaveBeenCalled(); - }); - - test("rejects malformed items but keeps valid ones in the same batch", async () => { - const provider = scriptedProvider([ - toolUseContent({ - items: [ - { - type: "nudge", - title: "", // empty title — rejected - summary: "Valid summary", - }, - { - type: "bogus-type", // invalid type — rejected - title: "Valid title", - summary: "Valid summary", - }, - { - type: "thread", - title: "Good item", - summary: "This one should land.", - }, - ], - }), - ]); - - const result = await runReflectionProducer(new Date(), { - writeItem, - loadRelationshipState: stubRelationshipState, - resolveProvider: () => provider, - }); - - expect(result.wroteCount).toBe(1); - expect(writeItem).toHaveBeenCalledTimes(1); - expect(writeItem.mock.calls[0]![0].title).toBe("Good item"); - }); - - test("returns no_provider when the resolver returns null", async () => { - const result = await runReflectionProducer(new Date(), { - writeItem, - loadRelationshipState: stubRelationshipState, - resolveProvider: () => null, - }); - - expect(result.skippedReason).toBe("no_provider"); - expect(result.wroteCount).toBe(0); - expect(writeItem).not.toHaveBeenCalled(); - }); - - test("returns provider_error when sendMessage throws", async () => { - const provider = throwingProvider(new Error("network down")); - - const result = await runReflectionProducer(new Date(), { - writeItem, - loadRelationshipState: stubRelationshipState, - resolveProvider: () => provider, - }); - - expect(result.skippedReason).toBe("provider_error"); - expect(result.wroteCount).toBe(0); - }); - - test("returns malformed_output when the response has no matching tool_use block", async () => { - const provider = scriptedProvider([{ type: "text", text: "just prose" }]); - - const result = await runReflectionProducer(new Date(), { - writeItem, - loadRelationshipState: stubRelationshipState, - resolveProvider: () => provider, - }); - - expect(result.skippedReason).toBe("malformed_output"); - expect(result.wroteCount).toBe(0); - }); - - test("clamps priority to the valid [0, 100] window", async () => { - const provider = scriptedProvider([ - toolUseContent({ - items: [ - { - type: "nudge", - title: "High priority", - summary: "Model returned 150", - priority: 150, - }, - { - type: "nudge", - title: "Low priority", - summary: "Model returned -5", - priority: -5, - }, - ], - }), - ]); - - await runReflectionProducer(new Date(), { - writeItem, - loadRelationshipState: stubRelationshipState, - resolveProvider: () => provider, - }); - - expect(writeItem).toHaveBeenCalledTimes(2); - expect(writeItem.mock.calls[0]![0].priority).toBe(100); - expect(writeItem.mock.calls[1]![0].priority).toBe(0); - }); -}); diff --git a/assistant/src/home/__tests__/rollup-producer.test.ts b/assistant/src/home/__tests__/rollup-producer.test.ts new file mode 100644 index 00000000000..1e0eeea1e27 --- /dev/null +++ b/assistant/src/home/__tests__/rollup-producer.test.ts @@ -0,0 +1,398 @@ +/** + * Unit tests for the home-feed rollup producer. + * + * All dependencies are injected via `RollupProducerDeps` spies so the + * tests never touch `mock.module`, which leaks across files in Bun's + * test runner. The production caller passes `undefined` and the + * producer falls through to the real config loader, feed reader, + * relationship-state reader, and provider registry. + */ + +import { beforeEach, describe, expect, mock, test } from "bun:test"; + +import type { + ContentBlock, + Provider, + ProviderResponse, +} from "../../providers/types.js"; +import type { WriteAssistantFeedItemParams } from "../assistant-feed-authoring.js"; +import type { FeedItem } from "../feed-types.js"; +import { runRollupProducer } from "../rollup-producer.js"; + +const writeItem = mock< + (params: WriteAssistantFeedItemParams) => Promise +>(async () => ({})); + +const stubRelationshipState = async () => + ({ + version: 1, + assistantId: "self", + tier: 2, + progressPercent: 42, + facts: [ + { category: "voice", text: "Ships fast, explains the why." }, + { category: "priorities", text: "JARVIS is the current focus." }, + ], + capabilities: [], + conversationCount: 17, + hatchedDate: "2026-03-01T00:00:00.000Z", + assistantName: "Vellum", + userName: "Alex", + updatedAt: "2026-04-14T12:00:00.000Z", + // biome-ignore lint/suspicious/noExplicitAny: the real shape is + // internal-only and we only need the subset the producer reads. + }) as any; + +function makeAction(overrides: Partial & { id: string }): FeedItem { + return { + id: overrides.id, + type: "action", + priority: 50, + title: overrides.title ?? "Action title", + summary: overrides.summary ?? "Action summary.", + timestamp: overrides.timestamp ?? "2026-04-14T12:00:00.000Z", + status: overrides.status ?? "new", + author: overrides.author ?? "assistant", + createdAt: overrides.createdAt ?? "2026-04-14T12:00:00.000Z", + source: overrides.source, + expiresAt: overrides.expiresAt, + minTimeAway: overrides.minTimeAway, + actions: overrides.actions, + }; +} + +const stubLoadRecentActions = (items: FeedItem[]) => () => items; + +function makeProvider( + handler: ( + messages: Parameters[0], + tools: Parameters[1], + systemPrompt: Parameters[2], + options: Parameters[3], + ) => Promise, +): Provider { + return { + name: "mock", + sendMessage: handler, + }; +} + +function scriptedProvider(content: ContentBlock[]): Provider { + return makeProvider(async () => ({ + content, + model: "mock-model", + usage: { inputTokens: 0, outputTokens: 0 }, + stopReason: "tool_use", + })); +} + +function throwingProvider(error: Error): Provider { + return makeProvider(async () => { + throw error; + }); +} + +function toolUseContent(input: unknown): ContentBlock { + return { + type: "tool_use", + id: "tu_1", + name: "write_feed_items", + input: input as Record, + }; +} + +const oneAction: FeedItem[] = [ + makeAction({ + id: "a1", + source: "gmail", + title: "Replied to Alice", + summary: "Sent a reply to alice@example.com.", + createdAt: "2026-04-14T11:30:00.000Z", + }), +]; + +beforeEach(() => { + writeItem.mockClear(); +}); + +describe("runRollupProducer", () => { + test("writes each digest/thread returned in the tool call", async () => { + const provider = scriptedProvider([ + toolUseContent({ + items: [ + { + type: "digest", + source: "gmail", + title: "3 replies sent this morning", + summary: "Replied to Alice, Bob, and Carol over the past hour.", + priority: 70, + }, + { + type: "thread", + source: "assistant", + title: "Outreach sequence 'Q2 renewals'", + summary: "Step 1 sent to 2 of 5 contacts; awaiting replies.", + priority: 55, + }, + ], + }), + ]); + + const result = await runRollupProducer(new Date(), { + writeItem, + loadRelationshipState: stubRelationshipState, + loadRecentActions: stubLoadRecentActions(oneAction), + resolveProvider: () => provider, + }); + + expect(result.skippedReason).toBeNull(); + expect(result.wroteCount).toBe(2); + expect(writeItem).toHaveBeenCalledTimes(2); + const firstCall = writeItem.mock.calls[0]![0]; + expect(firstCall.type).toBe("digest"); + expect(firstCall.title).toBe("3 replies sent this morning"); + expect(firstCall.priority).toBe(70); + }); + + test("returns no_actions when the activity log is empty", async () => { + // When there's nothing to roll up, we don't even hit the provider. + // The scheduler uses this to avoid advancing the cooldown gate. + const provider = scriptedProvider([toolUseContent({ items: [] })]); + const providerSpy = mock(provider.sendMessage); + provider.sendMessage = providerSpy; + + const result = await runRollupProducer(new Date(), { + writeItem, + loadRelationshipState: stubRelationshipState, + loadRecentActions: stubLoadRecentActions([]), + resolveProvider: () => provider, + }); + + expect(result.skippedReason).toBe("no_actions"); + expect(result.wroteCount).toBe(0); + expect(providerSpy).not.toHaveBeenCalled(); + expect(writeItem).not.toHaveBeenCalled(); + }); + + test("serializes recent actions into the user prompt", async () => { + let capturedPrompt = ""; + const provider = makeProvider(async (messages) => { + capturedPrompt = messages + .flatMap((m) => m.content) + .filter((b): b is { type: "text"; text: string } => b.type === "text") + .map((b) => b.text) + .join("\n"); + return { + content: [toolUseContent({ items: [] })], + model: "mock-model", + usage: { inputTokens: 0, outputTokens: 0 }, + stopReason: "tool_use", + }; + }); + + const actions: FeedItem[] = [ + makeAction({ + id: "a1", + source: "gmail", + title: "Replied to Alice", + summary: "Sent a reply to alice@example.com.", + createdAt: "2026-04-14T11:30:00.000Z", + }), + makeAction({ + id: "a2", + source: "slack", + title: "Posted in #general", + summary: "Answered a question about the deploy.", + createdAt: "2026-04-14T11:45:00.000Z", + }), + ]; + + await runRollupProducer(new Date(), { + writeItem, + loadRelationshipState: stubRelationshipState, + loadRecentActions: stubLoadRecentActions(actions), + resolveProvider: () => provider, + }); + + expect(capturedPrompt).toContain("Replied to Alice"); + expect(capturedPrompt).toContain("alice@example.com"); + expect(capturedPrompt).toContain("Posted in #general"); + expect(capturedPrompt).toContain("[gmail]"); + expect(capturedPrompt).toContain("[slack]"); + }); + + test("returns empty_items when the model emits an empty items array", async () => { + const provider = scriptedProvider([toolUseContent({ items: [] })]); + + const result = await runRollupProducer(new Date(), { + writeItem, + loadRelationshipState: stubRelationshipState, + loadRecentActions: stubLoadRecentActions(oneAction), + resolveProvider: () => provider, + }); + + expect(result.skippedReason).toBe("empty_items"); + expect(result.wroteCount).toBe(0); + expect(writeItem).not.toHaveBeenCalled(); + }); + + test("caps the batch at MAX_ITEMS_PER_ROLLUP (3)", async () => { + const provider = scriptedProvider([ + toolUseContent({ + items: [ + { type: "digest", title: "One", summary: "One summary" }, + { type: "digest", title: "Two", summary: "Two summary" }, + { type: "thread", title: "Three", summary: "Three summary" }, + { + type: "digest", + title: "Four", + summary: "Four summary — should be dropped.", + }, + ], + }), + ]); + + const result = await runRollupProducer(new Date(), { + writeItem, + loadRelationshipState: stubRelationshipState, + loadRecentActions: stubLoadRecentActions(oneAction), + resolveProvider: () => provider, + }); + + expect(result.wroteCount).toBe(3); + expect(writeItem).toHaveBeenCalledTimes(3); + }); + + test("rejects nudge and action types at coercion time", async () => { + // The tool schema narrows `type` to digest/thread, but the runtime + // coercion enforces it too so a drifted model can't sneak through. + const provider = scriptedProvider([ + toolUseContent({ + items: [ + { + type: "nudge", + title: "Should be rejected", + summary: "Rollup must never emit nudges.", + }, + { + type: "action", + title: "Also rejected", + summary: "Rollup must never emit actions.", + }, + { + type: "digest", + title: "Valid digest", + summary: "This one should land.", + }, + ], + }), + ]); + + const result = await runRollupProducer(new Date(), { + writeItem, + loadRelationshipState: stubRelationshipState, + loadRecentActions: stubLoadRecentActions(oneAction), + resolveProvider: () => provider, + }); + + expect(result.wroteCount).toBe(1); + expect(writeItem).toHaveBeenCalledTimes(1); + expect(writeItem.mock.calls[0]![0].title).toBe("Valid digest"); + }); + + test("reports malformed_output when every item in a non-empty batch fails coercion", async () => { + const provider = scriptedProvider([ + toolUseContent({ + items: [ + { type: "digest", title: "", summary: "empty title, rejected" }, + { type: "bogus", title: "bad type", summary: "also rejected" }, + { type: "digest", title: "valid title" }, // missing summary + ], + }), + ]); + + const result = await runRollupProducer(new Date(), { + writeItem, + loadRelationshipState: stubRelationshipState, + loadRecentActions: stubLoadRecentActions(oneAction), + resolveProvider: () => provider, + }); + + expect(result.skippedReason).toBe("malformed_output"); + expect(result.wroteCount).toBe(0); + expect(writeItem).not.toHaveBeenCalled(); + }); + + test("returns no_provider when the resolver returns null", async () => { + const result = await runRollupProducer(new Date(), { + writeItem, + loadRelationshipState: stubRelationshipState, + loadRecentActions: stubLoadRecentActions(oneAction), + resolveProvider: () => null, + }); + + expect(result.skippedReason).toBe("no_provider"); + expect(result.wroteCount).toBe(0); + expect(writeItem).not.toHaveBeenCalled(); + }); + + test("returns provider_error when sendMessage throws", async () => { + const provider = throwingProvider(new Error("network down")); + + const result = await runRollupProducer(new Date(), { + writeItem, + loadRelationshipState: stubRelationshipState, + loadRecentActions: stubLoadRecentActions(oneAction), + resolveProvider: () => provider, + }); + + expect(result.skippedReason).toBe("provider_error"); + expect(result.wroteCount).toBe(0); + }); + + test("returns malformed_output when the response has no matching tool_use block", async () => { + const provider = scriptedProvider([{ type: "text", text: "just prose" }]); + + const result = await runRollupProducer(new Date(), { + writeItem, + loadRelationshipState: stubRelationshipState, + loadRecentActions: stubLoadRecentActions(oneAction), + resolveProvider: () => provider, + }); + + expect(result.skippedReason).toBe("malformed_output"); + expect(result.wroteCount).toBe(0); + }); + + test("clamps priority to the valid [0, 100] window", async () => { + const provider = scriptedProvider([ + toolUseContent({ + items: [ + { + type: "digest", + title: "High priority", + summary: "Model returned 150", + priority: 150, + }, + { + type: "thread", + title: "Low priority", + summary: "Model returned -5", + priority: -5, + }, + ], + }), + ]); + + await runRollupProducer(new Date(), { + writeItem, + loadRelationshipState: stubRelationshipState, + loadRecentActions: stubLoadRecentActions(oneAction), + resolveProvider: () => provider, + }); + + expect(writeItem).toHaveBeenCalledTimes(2); + expect(writeItem.mock.calls[0]![0].priority).toBe(100); + expect(writeItem.mock.calls[1]![0].priority).toBe(0); + }); +}); diff --git a/assistant/src/home/feed-scheduler.ts b/assistant/src/home/feed-scheduler.ts index 1c001e905d4..2f9f2be714e 100644 --- a/assistant/src/home/feed-scheduler.ts +++ b/assistant/src/home/feed-scheduler.ts @@ -2,7 +2,7 @@ * Home activity feed scheduler. * * Periodic tick loop that drives the feed producers — the assistant - * reflection loop and the platform-baseline Gmail digest generator. This + * roll-up loop and the platform-baseline Gmail digest generator. This * is the layer that turns the Phase-5 scaffolding into a live feed: the * producers exist as standalone functions, the writer knows how to * persist their output, and the HTTP route + SSE pipeline surface it @@ -17,7 +17,7 @@ * - Each producer tracks its own "last ran at" timestamp and decides * whether to run on each tick. This keeps the tick cadence short * (so cheap producers refresh often) while expensive producers - * (LLM reflection) self-throttle independently. + * (LLM roll-up) self-throttle independently. * * - Fire-and-forget: every producer failure is logged and swallowed. * A broken producer must never break the tick loop or the daemon. @@ -34,10 +34,7 @@ import { generateGmailDigest, type GmailCountSource, } from "./platform-gmail-digest.js"; -import { - type ReflectionResult, - runReflectionProducer, -} from "./reflection-producer.js"; +import { type RollupResult, runRollupProducer } from "./rollup-producer.js"; const log = getLogger("home-feed-scheduler"); @@ -46,7 +43,7 @@ const TICK_INTERVAL_MS = 5 * 60 * 1000; /** Per-producer minimum gap between runs. */ const GMAIL_DIGEST_INTERVAL_MS = 5 * 60 * 1000; -const REFLECTION_INTERVAL_MS = 30 * 60 * 1000; +const ROLLUP_INTERVAL_MS = 30 * 60 * 1000; export interface FeedSchedulerHandle { /** Stops the interval. Safe to call multiple times. */ @@ -60,7 +57,7 @@ export interface FeedSchedulerHandle { export interface FeedTickSummary { gmailDigestRan: boolean; - reflectionRan: boolean; + rollupRan: boolean; } export interface FeedSchedulerOptions { @@ -88,7 +85,7 @@ export interface FeedSchedulerOptions { now: Date, countSource: GmailCountSource, ) => Promise; - reflectionRunner?: (now: Date) => Promise; + rollupRunner?: (now: Date) => Promise; } /** @@ -101,17 +98,17 @@ export function startFeedScheduler( let stopped = false; let tickRunning = false; let lastGmailDigestAt = 0; - let lastReflectionAt = 0; + let lastRollupAt = 0; const gmailCountSource = options.gmailCountSource ?? defaultGmailCountSource; const gmailDigestRunner = options.gmailDigestRunner ?? generateGmailDigest; - const reflectionRunner = - options.reflectionRunner ?? ((now: Date) => runReflectionProducer(now)); + const rollupRunner = + options.rollupRunner ?? ((now: Date) => runRollupProducer(now)); const tick = async (now: Date = new Date()): Promise => { const summary: FeedTickSummary = { gmailDigestRan: false, - reflectionRan: false, + rollupRan: false, }; if (stopped || tickRunning) return summary; tickRunning = true; @@ -135,37 +132,43 @@ export function startFeedScheduler( } } - if (nowMs - lastReflectionAt >= REFLECTION_INTERVAL_MS) { - summary.reflectionRan = true; + if (nowMs - lastRollupAt >= ROLLUP_INTERVAL_MS) { + summary.rollupRan = true; const startedAt = Date.now(); try { - const result = await reflectionRunner(now); + const result = await rollupRunner(now); log.info( { wroteCount: result.wroteCount, skippedReason: result.skippedReason, durationMs: Date.now() - startedAt, }, - "Reflection producer ran", + "Rollup producer ran", ); // Only advance the cooldown gate when the producer actually - // had a chance to run. A `no_provider` skip means the provider - // registry wasn't ready yet (this happens on the startup tick - // because the feed scheduler boots before the provider init - // pass in `daemon/lifecycle.ts`); advancing the gate in that - // case would burn the 30-minute window and delay the first - // real reflection. Every other outcome — success, empty - // items, malformed output, provider error — is a real - // attempt and does advance the gate so a broken producer - // doesn't hammer us every tick. - if (result.skippedReason !== "no_provider") { - lastReflectionAt = nowMs; + // had a chance to run the LLM. Two skip reasons short-circuit + // before any provider call and should NOT burn the window: + // - `no_provider`: the provider registry wasn't ready yet + // (happens on the startup tick because the feed scheduler + // boots before the provider init pass in + // `daemon/lifecycle.ts`). + // - `no_actions`: there was nothing to roll up. A subsequent + // tick should retry as soon as new actions land, not wait + // the full window. + // Every other outcome — success, empty items, malformed + // output, provider error — is a real LLM attempt and does + // advance the gate so a broken producer doesn't hammer us. + if ( + result.skippedReason !== "no_provider" && + result.skippedReason !== "no_actions" + ) { + lastRollupAt = nowMs; } } catch (err) { - lastReflectionAt = nowMs; + lastRollupAt = nowMs; log.warn( { err, durationMs: Date.now() - startedAt }, - "Reflection producer threw", + "Rollup producer threw", ); } } @@ -190,7 +193,7 @@ export function startFeedScheduler( { tickIntervalMs: TICK_INTERVAL_MS, gmailDigestIntervalMs: GMAIL_DIGEST_INTERVAL_MS, - reflectionIntervalMs: REFLECTION_INTERVAL_MS, + rollupIntervalMs: ROLLUP_INTERVAL_MS, }, "Home feed scheduler started", ); diff --git a/assistant/src/home/reflection-producer.ts b/assistant/src/home/rollup-producer.ts similarity index 50% rename from assistant/src/home/reflection-producer.ts rename to assistant/src/home/rollup-producer.ts index 1dedac7400a..ca5e790c239 100644 --- a/assistant/src/home/reflection-producer.ts +++ b/assistant/src/home/rollup-producer.ts @@ -1,26 +1,36 @@ /** - * Assistant reflection producer for the home activity feed. + * Activity-log roll-up producer for the home feed. * - * On a tick, asks the configured inference provider "given this - * relationship state, is there anything worth nudging the user about - * right now?" and emits 0–N assistant-authored feed items. Mirrors the - * background-inference pattern established in `approval-generators.ts`: - * resolve the provider from config, call `provider.sendMessage` with a - * `tool_use`-shaped structured output schema, validate each returned - * block, and hand the validated shapes to `writeAssistantFeedItem`. + * On a tick, reads the recent `action` items that background jobs have + * deposited in the feed (via `emit-feed-event.ts`) and asks the + * configured inference provider "consolidate these raw actions into a + * small set of digests or threads." This is the replacement for the + * old "reflect from nothing" producer: the roll-up starts from real + * side effects instead of prompting the model to hallucinate signal + * from relationship state alone. + * + * Mirrors the background-inference pattern established in + * `approval-generators.ts`: resolve the provider from config, call + * `provider.sendMessage` with a `tool_use`-shaped structured output + * schema, validate each returned block, and hand the validated + * shapes to `writeAssistantFeedItem`. * * Budget notes: * - * - Hard cap: {@link MAX_ITEMS_PER_REFLECTION} items per tick so a + * - Hard cap: {@link MAX_ITEMS_PER_ROLLUP} items per tick so a * single run can never flood the feed. - * - Timeout: {@link REFLECTION_TIMEOUT_MS} so a stalled provider - * can't stall the tick loop. - * - Token budget: {@link REFLECTION_MAX_TOKENS} — tight, because - * the output is a list of short feed items, not a long essay. + * - Timeout: {@link ROLLUP_TIMEOUT_MS} so a stalled provider can't + * stall the tick loop. + * - Token budget: {@link ROLLUP_MAX_TOKENS} — tight, because the + * output is a list of short feed items, not a long essay. + * - Input cap: at most {@link MAX_ACTIONS_IN_PROMPT} recent action + * items are serialized into the user prompt. Callers' volume is + * already bounded by the writer's per-source action cap, but + * this second cap protects against pathological inputs. * * Failure modes degrade gracefully: an unavailable provider, a * malformed tool_use block, a schema-rejected item, or an exception - * in the inner loop all return a {@link ReflectionResult} with the + * in the inner loop all return a {@link RollupResult} with the * appropriate `skippedReason`. The scheduler logs these but never * surfaces them to the user. */ @@ -33,55 +43,61 @@ import { writeAssistantFeedItem, type WriteAssistantFeedItemParams, } from "./assistant-feed-authoring.js"; +import type { FeedItem } from "./feed-types.js"; +import { readHomeFeed } from "./feed-writer.js"; import { computeRelationshipState } from "./relationship-state-writer.js"; -const log = getLogger("home-feed-reflection"); +const log = getLogger("home-feed-rollup"); -const REFLECTION_TIMEOUT_MS = 30_000; -const REFLECTION_MAX_TOKENS = 800; -const MAX_ITEMS_PER_REFLECTION = 3; +const ROLLUP_TIMEOUT_MS = 30_000; +const ROLLUP_MAX_TOKENS = 800; +const MAX_ITEMS_PER_ROLLUP = 3; +const MAX_ACTIONS_IN_PROMPT = 30; -const REFLECTION_TOOL_NAME = "write_feed_items"; +const ROLLUP_TOOL_NAME = "write_feed_items"; -const REFLECTION_SYSTEM_PROMPT = [ - "You are a background reflection loop for a personal assistant.", - "Your job is to decide whether there is anything worth surfacing to the user on their Home page right now.", +const ROLLUP_SYSTEM_PROMPT = [ + "You are a roll-up loop for a personal assistant's home activity feed.", + "Raw `action` items land in the feed as a deterministic side effect of background jobs.", + "Your job is to CONSOLIDATE those raw actions into higher-signal summary rows — never to invent signal from nothing.", "", "Rules:", - "- Emit nudges only when there is a CLEAR, SPECIFIC, ACTIONABLE reason.", - "- Never emit generic filler like 'stay productive' or 'here's a suggestion'.", - "- Never repeat the same nudge on consecutive runs — trust the writer's one-per-source replacement to dedupe.", - "- Prefer 0 items over low-quality items.", + "- Emit only `digest` or `thread` items. Do NOT emit `action` items — those come from the background jobs themselves.", + "- A `digest` collapses several related raw actions into one summary row (e.g. '3 scheduled jobs ran this morning').", + "- A `thread` tracks an ongoing multi-action situation worth surfacing (e.g. 'Outreach to Alice — 2 emails sent, awaiting reply').", + "- Each digest/thread must be grounded in specific action items from the list below. Do not invent events.", + "- Never duplicate a consolidation that already describes the same set of actions — the writer's one-per-source replacement for digests will collapse repeats but you shouldn't rely on it.", + "- Prefer 0 items over low-signal filler. An empty activity log should always produce 0 items.", "- You may emit up to 3 items total.", "", - "Use the `write_feed_items` tool to emit items. If nothing is worth surfacing, call the tool with an empty `items` array.", + "Use the `write_feed_items` tool to emit items. If nothing is worth rolling up, call the tool with an empty `items` array.", ].join("\n"); -const REFLECTION_TOOL_SCHEMA = { - name: REFLECTION_TOOL_NAME, +const ROLLUP_TOOL_SCHEMA = { + name: ROLLUP_TOOL_NAME, description: - "Record the set of feed items to surface on the user's Home page for this reflection tick. " + - "Pass an empty `items` array if nothing is worth surfacing right now.", + "Record the set of roll-up feed items (digests or threads) that consolidate recent activity-log actions. " + + "Pass an empty `items` array if nothing is worth rolling up right now.", input_schema: { type: "object" as const, properties: { items: { type: "array", - maxItems: MAX_ITEMS_PER_REFLECTION, + maxItems: MAX_ITEMS_PER_ROLLUP, items: { type: "object", properties: { type: { type: "string", - enum: ["nudge", "digest", "action", "thread"], + enum: ["digest", "thread"], description: - "The visual shape: nudge (card with action buttons), digest (summary row), action (already-done announcement), thread (ongoing situation).", + "`digest` collapses multiple related actions into one summary row; `thread` tracks an ongoing multi-action situation.", }, source: { type: "string", enum: ["gmail", "slack", "calendar", "assistant"], description: - "Origin hint used for the icon. Use 'assistant' when the nudge is self-initiated.", + "Origin hint used for the icon. Use `assistant` for cross-source roll-ups of assistant-driven work.", }, title: { type: "string", @@ -91,20 +107,20 @@ const REFLECTION_TOOL_SCHEMA = { summary: { type: "string", description: - "One-sentence body copy explaining the nudge. 1–25 words.", + "One-sentence body copy explaining the roll-up. 1–25 words. Must reference specific actions from the activity log.", }, priority: { type: "integer", minimum: 0, maximum: 100, description: - "Relative importance (higher = more prominent). Use 70 for normal nudges, 85 for time-sensitive, 55 for background threads.", + "Relative importance (higher = more prominent). Use 70 for digests, 55 for background threads.", }, minTimeAway: { type: "integer", minimum: 0, description: - "Seconds the user must be away before this item appears. Use 3600 (1h) for nudges, 0 for threads the user should see immediately.", + "Seconds the user must be away before this item appears. Use 0 for a roll-up the user should see immediately.", }, }, required: ["type", "title", "summary"], @@ -117,16 +133,21 @@ const REFLECTION_TOOL_SCHEMA = { }, }; -export interface ReflectionResult { +export interface RollupResult { /** Number of items actually written to the feed. */ wroteCount: number; /** * When non-null, indicates the producer short-circuited and no LLM * call was made (or the call's result was unusable). The scheduler * logs this but does not treat it as an error. + * + * `no_actions` means there was nothing to roll up — a quiet but + * normal outcome that does not advance the cooldown gate (no point + * re-running until new actions land). */ skippedReason: | "no_provider" + | "no_actions" | "empty_items" | "provider_error" | "malformed_output" @@ -139,27 +160,29 @@ export interface ReflectionResult { * stubs to avoid `mock.module`, which leaks across files in Bun's * test runner and causes cross-file isolation bugs. */ -export interface ReflectionProducerDeps { +export interface RollupProducerDeps { writeItem?: (params: WriteAssistantFeedItemParams) => Promise; loadRelationshipState?: () => Promise< Awaited> >; + loadRecentActions?: () => FeedItem[]; resolveProvider?: () => Provider | null; } /** - * Run one reflection pass. Loads the current relationship state, - * builds a user prompt around it, asks the provider for a - * `write_feed_items` tool call, and invokes + * Run one roll-up pass. Loads recent action items from the feed plus + * relationship state, builds a user prompt around them, asks the + * provider for a `write_feed_items` tool call, and invokes * {@link writeAssistantFeedItem} for each item in the returned array. */ -export async function runReflectionProducer( +export async function runRollupProducer( now: Date = new Date(), - deps: ReflectionProducerDeps = {}, -): Promise { + deps: RollupProducerDeps = {}, +): Promise { const writeItem = deps.writeItem ?? writeAssistantFeedItem; const loadRelationshipState = deps.loadRelationshipState ?? computeRelationshipState; + const loadRecentActions = deps.loadRecentActions ?? defaultLoadRecentActions; const provider = deps.resolveProvider ? deps.resolveProvider() @@ -168,27 +191,32 @@ export async function runReflectionProducer( return { wroteCount: 0, skippedReason: "no_provider" }; } + const actions = loadRecentActions(); + if (actions.length === 0) { + return { wroteCount: 0, skippedReason: "no_actions" }; + } + const state = await loadRelationshipState(); - const userPrompt = buildUserPrompt(state, now); + const userPrompt = buildUserPrompt(actions, state, now); let response; try { response = await provider.sendMessage( [{ role: "user", content: [{ type: "text", text: userPrompt }] }], - [REFLECTION_TOOL_SCHEMA], - REFLECTION_SYSTEM_PROMPT, + [ROLLUP_TOOL_SCHEMA], + ROLLUP_SYSTEM_PROMPT, { - config: { max_tokens: REFLECTION_MAX_TOKENS }, - signal: AbortSignal.timeout(REFLECTION_TIMEOUT_MS), + config: { max_tokens: ROLLUP_MAX_TOKENS }, + signal: AbortSignal.timeout(ROLLUP_TIMEOUT_MS), }, ); } catch (err) { - log.warn({ err }, "Reflection provider.sendMessage failed"); + log.warn({ err }, "Rollup provider.sendMessage failed"); return { wroteCount: 0, skippedReason: "provider_error" }; } const toolUse = response.content.find( - (block) => block.type === "tool_use" && block.name === REFLECTION_TOOL_NAME, + (block) => block.type === "tool_use" && block.name === ROLLUP_TOOL_NAME, ); if (!toolUse || toolUse.type !== "tool_use") { return { wroteCount: 0, skippedReason: "malformed_output" }; @@ -200,10 +228,10 @@ export async function runReflectionProducer( return { wroteCount: 0, skippedReason: "empty_items" }; } - const capped = rawItems.slice(0, MAX_ITEMS_PER_REFLECTION); + const capped = rawItems.slice(0, MAX_ITEMS_PER_ROLLUP); const accepted: WriteAssistantFeedItemParams[] = []; for (const raw of capped) { - const params = coerceReflectionItem(raw); + const params = coerceRollupItem(raw); if (params) accepted.push(params); } @@ -224,7 +252,7 @@ export async function runReflectionProducer( // Schema rejection is a model-output bug, not a regression in the // writer — log and keep going so a single malformed item doesn't // block the rest of the batch. - log.warn({ err, params }, "Failed to write reflection item"); + log.warn({ err, params }, "Failed to write rollup item"); } } @@ -240,17 +268,48 @@ function resolveDefaultProvider(): ReturnType | null { } /** - * Build the user-prompt context for one reflection pass. Kept small: - * the system prompt already enumerates the rules, and extra context - * mostly encourages hallucination. Only include fields that meaningfully - * change what's worth surfacing. + * Default recent-actions loader. Reads the TTL-filtered home feed, + * keeps only `action` items, and returns them sorted by `createdAt` + * descending so the most recent signals land at the top of the + * prompt. Non-action items (digests, threads, nudges) are excluded + * — the roll-up's input is the raw activity log, not the existing + * consolidations. + */ +function defaultLoadRecentActions(): FeedItem[] { + const feed = readHomeFeed(); + return feed.items + .filter((i) => i.type === "action") + .sort((a, b) => { + const am = Date.parse(a.createdAt); + const bm = Date.parse(b.createdAt); + if (Number.isNaN(am) && Number.isNaN(bm)) return 0; + if (Number.isNaN(am)) return 1; + if (Number.isNaN(bm)) return -1; + return bm - am; + }); +} + +/** + * Build the user-prompt context for one roll-up pass. Keeps the + * relationship-state block small and bounds the action list at + * {@link MAX_ACTIONS_IN_PROMPT} so a pathological input can't blow + * the token budget. */ function buildUserPrompt( + actions: FeedItem[], state: Awaited>, now: Date, ): string { + const actionLines = actions + .slice(0, MAX_ACTIONS_IN_PROMPT) + .map((a) => { + const src = a.source ? `[${a.source}]` : "[-]"; + return ` - ${a.createdAt} ${src} ${a.title} — ${a.summary}`; + }) + .join("\n"); + const factLines = state.facts - .slice(0, 20) + .slice(0, 10) .map((f) => ` - ${f.category}: ${f.text}`) .join("\n"); @@ -259,13 +318,14 @@ function buildUserPrompt( `Assistant name: ${state.assistantName}`, state.userName ? `User name: ${state.userName}` : "User name: (unknown)", `Relationship tier: ${state.tier} / 4`, - `Progress toward next tier: ${state.progressPercent}%`, - `Conversation count: ${state.conversationCount}`, "", - "Known facts about the user:", + `Recent activity log entries (most recent first, up to ${MAX_ACTIONS_IN_PROMPT}):`, + actionLines.length > 0 ? actionLines : " (none)", + "", + "Known facts about the user (for context only — do NOT invent roll-ups from these):", factLines.length > 0 ? factLines : " (none yet)", "", - "Based on this, is there anything worth nudging the user about right now? Remember: prefer 0 items over filler. Use the `write_feed_items` tool.", + "Consolidate the activity log above into a small set of `digest` or `thread` roll-up items. Remember: prefer 0 items over filler, and only roll up when several related actions cluster into a coherent story. Use the `write_feed_items` tool.", ].join("\n"); } @@ -273,21 +333,18 @@ function buildUserPrompt( * Coerce a raw tool_use item into * {@link WriteAssistantFeedItemParams}, returning null if the shape is * unrecoverable. The schema on the provider side enforces most of - * this, but the runtime check guards against model drift. + * this, but the runtime check guards against model drift — including + * the `type` narrowing to digest/thread (actions and nudges are + * rejected here even if the model ignores the tool schema). */ -function coerceReflectionItem( +function coerceRollupItem( raw: unknown, ): WriteAssistantFeedItemParams | null { if (!raw || typeof raw !== "object") return null; const obj = raw as Record; const type = obj.type; - if ( - type !== "nudge" && - type !== "digest" && - type !== "action" && - type !== "thread" - ) { + if (type !== "digest" && type !== "thread") { return null; } diff --git a/assistant/src/schedule/scheduler.ts b/assistant/src/schedule/scheduler.ts index ef059e46ac3..d186f795b55 100644 --- a/assistant/src/schedule/scheduler.ts +++ b/assistant/src/schedule/scheduler.ts @@ -1,3 +1,4 @@ +import { emitFeedEvent } from "../home/emit-feed-event.js"; import { bootstrapConversation } from "../memory/conversation-bootstrap.js"; import { getConversation } from "../memory/conversation-crud.js"; import { invalidateAssistantInferredItemsForConversation } from "../memory/task-memory-cleanup.js"; @@ -141,11 +142,21 @@ async function runScheduleOnce( }); if (isOneShot) { completeOneShot(job.id); + emitScheduleFeedEvent({ + title: job.name, + summary: "Reminder fired.", + dedupKey: `schedule-notify-oneshot:${job.id}`, + }); } else { // Track recurring notify-mode success so lastStatus resets to ok // and retryCount clears after a transient failure. const runId = createScheduleRun(job.id, `notify-ok:${job.id}`); completeScheduleRun(runId, { status: "ok" }); + emitScheduleFeedEvent({ + title: job.name, + summary: "Scheduled notification fired.", + dedupKey: `schedule-run:${runId}`, + }); } } catch (err) { log.warn( @@ -223,6 +234,11 @@ async function runScheduleOnce( completeScheduleRun(runId, { status: "ok" }); if (!job.quiet) { notifySchedule({ id: job.id, name: job.name }); + emitScheduleFeedEvent({ + title: job.name, + summary: "Scheduled task ran.", + dedupKey: `schedule-run:${runId}`, + }); } if (isOneShot) completeOneShot(job.id); } @@ -317,6 +333,11 @@ async function runScheduleOnce( completeScheduleRun(runId, { status: "ok" }); if (!job.quiet) { notifySchedule({ id: job.id, name: job.name }); + emitScheduleFeedEvent({ + title: job.name, + summary: isOneShot ? "One-shot reminder ran." : "Scheduled job ran.", + dedupKey: `schedule-run:${runId}`, + }); } if (isOneShot) completeOneShot(job.id); processed += 1; @@ -384,3 +405,32 @@ async function runScheduleOnce( } return processed; } + +/** + * Fire-and-forget home-feed emit for a successful schedule run. + * + * Wraps {@link emitFeedEvent} with local error handling so a schema + * failure or writer hiccup can never interrupt the scheduler tick + * loop. The dedupKey is always derived from the schedule run id (or + * the job id, for one-shot notify-mode which fires before a run + * record is created) so each run lands as its own entry in the + * activity log — the writer's per-source cap keeps total volume + * bounded. + */ +function emitScheduleFeedEvent(params: { + title: string; + summary: string; + dedupKey: string; +}): void { + void emitFeedEvent({ + source: "assistant", + title: params.title, + summary: params.summary, + dedupKey: params.dedupKey, + }).catch((err) => { + log.warn( + { err, dedupKey: params.dedupKey }, + "Failed to emit schedule feed event", + ); + }); +} diff --git a/assistant/src/sequence/engine.ts b/assistant/src/sequence/engine.ts index 32f42c5bc3d..e959ce03cdc 100644 --- a/assistant/src/sequence/engine.ts +++ b/assistant/src/sequence/engine.ts @@ -6,6 +6,7 @@ * sends through the messaging layer. */ +import { emitFeedEvent } from "../home/emit-feed-event.js"; import { bootstrapConversation } from "../memory/conversation-bootstrap.js"; import { getMessages } from "../memory/conversation-crud.js"; import type { ScheduleMessageProcessor } from "../schedule/scheduler.js"; @@ -200,6 +201,28 @@ async function processEnrollment( recordSend(sequence.id); recordEvent(sequence.id, enrollment.id, "send", step.index); + // Fire-and-forget home-feed activity log entry. Each (enrollment, + // step) pair is a distinct real signal (an email actually went + // out), so the dedupKey embeds both — repeat emits for the same + // step are impossible because the enrollment advances after this + // line, but if they did occur they'd land on the same entry. + void emitFeedEvent({ + source: "assistant", + title: sequence.name, + summary: `Sent step ${step.index + 1} of ${sequence.steps.length} to ${enrollment.contactEmail}.`, + dedupKey: `sequence-step:${enrollment.id}:${step.index}`, + }).catch((err) => { + log.warn( + { + err, + sequenceId: sequence.id, + enrollmentId: enrollment.id, + step: step.index, + }, + "Failed to emit sequence step feed event", + ); + }); + // Advance to the next step const nextStepIndex = enrollment.currentStep + 1; if (nextStepIndex >= sequence.steps.length) {