diff --git a/assistant/src/home/__tests__/feed-scheduler.test.ts b/assistant/src/home/__tests__/feed-scheduler.test.ts index dd00984c5bd..80fb318196a 100644 --- a/assistant/src/home/__tests__/feed-scheduler.test.ts +++ b/assistant/src/home/__tests__/feed-scheduler.test.ts @@ -70,28 +70,36 @@ describe("startFeedScheduler", () => { expect(summary2.gmailDigestRan).toBe(true); }); - test("rollup only re-runs every 30 minutes", async () => { + test("rollup only re-runs every 2 hours as the safety-net cadence", async () => { + // The scheduler is the safety net; the primary trigger is the + // on-visit refresh in home-feed-routes.ts. Long cadence is + // intentional so the scheduler doesn't fight the route. handle = startFeedScheduler(defaultOptions()); const t0 = new Date("2026-04-14T12:00:00.000Z"); await handle.runOnce(t0); - // 5 min later — below the 30-min reflection gate. - const t1 = new Date("2026-04-14T12:05:00.000Z"); + // 30 min later — below the 2-hour gate. + const t1 = new Date("2026-04-14T12:30:00.000Z"); const summary1 = await handle.runOnce(t1); expect(summary1.rollupRan).toBe(false); - // 31 min later — past the 30-min gate, should re-run. - const t2 = new Date("2026-04-14T12:31:00.000Z"); + // 1h later — still below the 2-hour gate. + const t2 = new Date("2026-04-14T13:00:00.000Z"); const summary2 = await handle.runOnce(t2); - expect(summary2.rollupRan).toBe(true); + expect(summary2.rollupRan).toBe(false); + + // 2h 1m later — past the gate, should re-run. + const t3 = new Date("2026-04-14T14:01:00.000Z"); + const summary3 = await handle.runOnce(t3); + expect(summary3.rollupRan).toBe(true); }); test("rollup cooldown is NOT advanced on no_provider so the next tick retries", async () => { // Mimic the daemon startup ordering: the scheduler boots before // the provider registry is ready. The first tick gets no_provider; // the next tick (even one second later) must still run the rollup - // instead of waiting 30 minutes. + // instead of waiting 2 hours. 
rollupRunner.mockImplementationOnce(async () => ({ wroteCount: 0, skippedReason: "no_provider", @@ -110,10 +118,30 @@ describe("startFeedScheduler", () => { expect(rollupRunner).toHaveBeenCalledTimes(2); }); + test("rollup cooldown is NOT advanced on in_flight so the next tick retries", async () => { + // in_flight means another caller (on-visit refresh, usually) is + // already running the producer. Advancing the gate here would + // force the NEXT tick to wait out the full cadence window even + // though nothing broken happened — the other caller's result is + // effectively this tick's run. + rollupRunner.mockImplementationOnce(async () => ({ + wroteCount: 0, + skippedReason: "in_flight", + })); + + handle = startFeedScheduler(defaultOptions()); + await handle.runOnce(new Date("2026-04-14T12:00:00.000Z")); + expect(rollupRunner).toHaveBeenCalledTimes(1); + + const summary = await handle.runOnce(new Date("2026-04-14T12:00:01.000Z")); + expect(summary.rollupRan).toBe(true); + expect(rollupRunner).toHaveBeenCalledTimes(2); + }); + test("rollup cooldown is NOT advanced on no_actions so the next tick retries", async () => { // no_actions means the activity log was empty — no LLM call was // made. A subsequent tick should retry as soon as new actions - // land, not wait the full 30-minute window. + // land, not wait the full 2-hour window. rollupRunner.mockImplementationOnce(async () => ({ wroteCount: 0, skippedReason: "no_actions", @@ -131,7 +159,7 @@ describe("startFeedScheduler", () => { test("rollup cooldown IS advanced on other skip reasons to preserve backoff", async () => { // empty_items / malformed_output / provider_error are real LLM - // attempts — the next tick should be gated by the full 30-minute + // attempts — the next tick should be gated by the full 2-hour // window so a broken producer doesn't get hammered every tick. 
rollupRunner.mockImplementationOnce(async () => ({ wroteCount: 0, @@ -142,8 +170,8 @@ describe("startFeedScheduler", () => { await handle.runOnce(new Date("2026-04-14T12:00:00.000Z")); expect(rollupRunner).toHaveBeenCalledTimes(1); - // Ten minutes later — below the 30-min gate, should NOT re-run. - const summary = await handle.runOnce(new Date("2026-04-14T12:10:00.000Z")); + // Thirty minutes later — below the 2-hour gate, should NOT re-run. + const summary = await handle.runOnce(new Date("2026-04-14T12:30:00.000Z")); expect(summary.rollupRan).toBe(false); expect(rollupRunner).toHaveBeenCalledTimes(1); }); diff --git a/assistant/src/home/__tests__/rollup-producer.test.ts b/assistant/src/home/__tests__/rollup-producer.test.ts index 1e0eeea1e27..d9377beee7d 100644 --- a/assistant/src/home/__tests__/rollup-producer.test.ts +++ b/assistant/src/home/__tests__/rollup-producer.test.ts @@ -364,6 +364,50 @@ describe("runRollupProducer", () => { expect(result.wroteCount).toBe(0); }); + test("concurrent calls short-circuit the second with in_flight", async () => { + // Gate the provider behind a manually-controlled deferred so we + // can observe state while the first call is still inside the + // producer body. Without this we'd race the runtime's microtask + // scheduler to check in-flightness. + let release: ((value: ContentBlock[]) => void) | null = null; + const gated = new Promise((resolve) => { + release = resolve; + }); + const provider = makeProvider(async () => { + const content = await gated; + return { + content, + model: "mock-model", + usage: { inputTokens: 0, outputTokens: 0 }, + stopReason: "tool_use", + }; + }); + + const first = runRollupProducer(new Date(), { + writeItem, + loadRelationshipState: stubRelationshipState, + loadRecentActions: stubLoadRecentActions(oneAction), + resolveProvider: () => provider, + }); + + // Second call lands while `first` is blocked awaiting the gated + // provider response — the in-flight guard must short-circuit it. 
+ const second = await runRollupProducer(new Date(), { + writeItem, + loadRelationshipState: stubRelationshipState, + loadRecentActions: stubLoadRecentActions(oneAction), + resolveProvider: () => provider, + }); + + expect(second.skippedReason).toBe("in_flight"); + expect(second.wroteCount).toBe(0); + + // Release the first call and let it finish. + release!([toolUseContent({ items: [] })]); + const firstResult = await first; + expect(firstResult.skippedReason).toBe("empty_items"); + }); + test("clamps priority to the valid [0, 100] window", async () => { const provider = scriptedProvider([ toolUseContent({ diff --git a/assistant/src/home/feed-scheduler.ts b/assistant/src/home/feed-scheduler.ts index 2f9f2be714e..1bddc0ea024 100644 --- a/assistant/src/home/feed-scheduler.ts +++ b/assistant/src/home/feed-scheduler.ts @@ -43,7 +43,15 @@ const TICK_INTERVAL_MS = 5 * 60 * 1000; /** Per-producer minimum gap between runs. */ const GMAIL_DIGEST_INTERVAL_MS = 5 * 60 * 1000; -const ROLLUP_INTERVAL_MS = 30 * 60 * 1000; +/** + * Roll-up cadence is deliberately long — 120 minutes — because the + * scheduler is the *safety net*, not the primary trigger. Opening the + * Home page fires a debounced on-visit refresh in the HTTP route (see + * `runtime/routes/home-feed-routes.ts`), which is the path most users + * actually hit. The scheduler exists so the feed still stays fresh + * for long idle stretches where nobody opens the Home page. + */ +const ROLLUP_INTERVAL_MS = 2 * 60 * 60 * 1000; export interface FeedSchedulerHandle { /** Stops the interval. Safe to call multiple times. */ @@ -146,8 +154,9 @@ export function startFeedScheduler( "Rollup producer ran", ); // Only advance the cooldown gate when the producer actually - // had a chance to run the LLM. Two skip reasons short-circuit - // before any provider call and should NOT burn the window: + // had a chance to run the LLM. 
Three skip reasons short- + // circuit before any provider call and should NOT burn the + // window: // - `no_provider`: the provider registry wasn't ready yet // (happens on the startup tick because the feed scheduler // boots before the provider init pass in @@ -155,12 +164,19 @@ export function startFeedScheduler( // - `no_actions`: there was nothing to roll up. A subsequent // tick should retry as soon as new actions land, not wait // the full window. + // - `in_flight`: another caller (usually the on-visit + // refresh trigger in `home-feed-routes.ts`) is already + // running the rollup. That caller's result effectively + // counts as this scheduler tick's real run; bumping the + // gate here would force the NEXT tick to also wait out + // the full window even though nothing broken happened. // Every other outcome — success, empty items, malformed // output, provider error — is a real LLM attempt and does // advance the gate so a broken producer doesn't hammer us. if ( result.skippedReason !== "no_provider" && - result.skippedReason !== "no_actions" + result.skippedReason !== "no_actions" && + result.skippedReason !== "in_flight" ) { lastRollupAt = nowMs; } diff --git a/assistant/src/home/rollup-producer.ts b/assistant/src/home/rollup-producer.ts index ca5e790c239..4f9d3f56e1d 100644 --- a/assistant/src/home/rollup-producer.ts +++ b/assistant/src/home/rollup-producer.ts @@ -144,10 +144,16 @@ export interface RollupResult { * `no_actions` means there was nothing to roll up — a quiet but * normal outcome that does not advance the cooldown gate (no point * re-running until new actions land). + * + * `in_flight` means another caller is already running the producer. + * The second call short-circuits without entering the body so we + * never fire two concurrent LLM requests — this guards against the + * scheduler tick and an on-visit refresh trigger racing each other. 
*/ skippedReason: | "no_provider" | "no_actions" + | "in_flight" | "empty_items" | "provider_error" | "malformed_output" @@ -169,6 +175,16 @@ export interface RollupProducerDeps { resolveProvider?: () => Provider | null; } +/** + * Module-level in-flight guard. Prevents two callers (e.g. the + * scheduler tick and an on-visit refresh trigger) from launching two + * concurrent LLM requests. A second caller short-circuits with + * `skippedReason: "in_flight"` without entering the body. The lock + * is released in the `finally` so a thrown exception can't strand + * it in the `true` state. + */ +let producerInFlight = false; + /** * Run one roll-up pass. Loads recent action items from the feed plus * relationship state, builds a user prompt around them, asks the @@ -178,6 +194,21 @@ export interface RollupProducerDeps { export async function runRollupProducer( now: Date = new Date(), deps: RollupProducerDeps = {}, +): Promise { + if (producerInFlight) { + return { wroteCount: 0, skippedReason: "in_flight" }; + } + producerInFlight = true; + try { + return await runRollupProducerInner(now, deps); + } finally { + producerInFlight = false; + } +} + +async function runRollupProducerInner( + now: Date, + deps: RollupProducerDeps, ): Promise { const writeItem = deps.writeItem ?? writeAssistantFeedItem; const loadRelationshipState = @@ -337,9 +368,7 @@ function buildUserPrompt( * the `type` narrowing to digest/thread (actions and nudges are * rejected here even if the model ignores the tool schema). 
*/ -function coerceRollupItem( - raw: unknown, -): WriteAssistantFeedItemParams | null { +function coerceRollupItem(raw: unknown): WriteAssistantFeedItemParams | null { if (!raw || typeof raw !== "object") return null; const obj = raw as Record; diff --git a/assistant/src/runtime/routes/__tests__/home-feed-routes.test.ts b/assistant/src/runtime/routes/__tests__/home-feed-routes.test.ts index b9f8df5d495..6d7f52ae988 100644 --- a/assistant/src/runtime/routes/__tests__/home-feed-routes.test.ts +++ b/assistant/src/runtime/routes/__tests__/home-feed-routes.test.ts @@ -53,8 +53,28 @@ mock.module("../../../memory/conversation-crud.js", () => ({ }, })); +// Stub the rollup producer so the on-visit refresh trigger inside +// handleGetHomeFeed doesn't try to resolve a real provider or touch +// relationship state. The route calls the producer fire-and-forget, +// so tests observe the trigger via call-count on this spy rather +// than awaiting a return value. +// +// Default skip reason is `empty_items` — a real LLM attempt that +// returned nothing to consolidate. Using a real-run skip means the +// debounce gate holds firm in the default case (matching production +// semantics); individual tests override with `no_provider` etc. to +// exercise the rollback path. +const rollupProducerSpy = mock<() => Promise>(async () => ({ + wroteCount: 0, + skippedReason: "empty_items", +})); +mock.module("../../../home/rollup-producer.js", () => ({ + runRollupProducer: rollupProducerSpy, +})); + // Dynamic imports so module mocks are wired before evaluation. 
const { + __resetOnVisitRefreshStateForTests, computeGreeting, formatRelativeTime, handleGetHomeFeed, @@ -125,6 +145,8 @@ beforeEach(() => { origWorkspaceDir = process.env.VELLUM_WORKSPACE_DIR; process.env.VELLUM_WORKSPACE_DIR = workspaceDir; publishSpy.mockClear(); + rollupProducerSpy.mockClear(); + __resetOnVisitRefreshStateForTests(); createdConversations.length = 0; addedMessages.length = 0; createConversationShouldThrow = false; @@ -322,6 +344,85 @@ describe("handleGetHomeFeed", () => { expect(body.contextBanner.timeAwayLabel).toBe("2 hours ago"); }); + test("fires the rollup producer fire-and-forget on the first GET", async () => { + const res = await handleGetHomeFeed( + new Request("http://localhost/v1/home/feed?timeAwaySeconds=0"), + ); + expect(res.status).toBe(200); + // Yield a microtask so the fire-and-forget call reaches its + // first await point. + await Promise.resolve(); + expect(rollupProducerSpy).toHaveBeenCalledTimes(1); + }); + + test("does NOT refire the rollup when the debounce window has not elapsed", async () => { + await handleGetHomeFeed( + new Request("http://localhost/v1/home/feed?timeAwaySeconds=0"), + ); + await Promise.resolve(); + expect(rollupProducerSpy).toHaveBeenCalledTimes(1); + + // A second GET milliseconds later should NOT re-trigger the + // rollup — the 10-minute debounce prevents aggressive pollers or + // multiple panels from firing repeat refreshes. + await handleGetHomeFeed( + new Request("http://localhost/v1/home/feed?timeAwaySeconds=0"), + ); + await handleGetHomeFeed( + new Request("http://localhost/v1/home/feed?timeAwaySeconds=0"), + ); + await Promise.resolve(); + expect(rollupProducerSpy).toHaveBeenCalledTimes(1); + }); + + test("debounce is rolled back when the producer skips before the LLM call", async () => { + // Simulate the daemon-boot race: the first GET fires the + // producer but the provider registry isn't ready yet, so the + // producer short-circuits with `no_provider`. 
A second GET a + // moment later must still be allowed to fire — otherwise Home + // stays stale for the full 10-minute debounce window while the + // user is actively trying to refresh. + rollupProducerSpy.mockImplementationOnce(async () => ({ + wroteCount: 0, + skippedReason: "no_provider", + })); + + await handleGetHomeFeed( + new Request("http://localhost/v1/home/feed?timeAwaySeconds=0"), + ); + // Let the fire-and-forget `.then()` that performs the rollback + // run before we issue the second GET. Two microtask ticks + // because the chain is runRollupProducer -> .then handler. + await Promise.resolve(); + await Promise.resolve(); + expect(rollupProducerSpy).toHaveBeenCalledTimes(1); + + // Second GET — provider is now ready. Gate must have been + // rolled back so this GET re-fires the producer. + await handleGetHomeFeed( + new Request("http://localhost/v1/home/feed?timeAwaySeconds=0"), + ); + await Promise.resolve(); + expect(rollupProducerSpy).toHaveBeenCalledTimes(2); + }); + + test("rollup producer failure does not turn the GET into an error", async () => { + // Even if the rollup producer rejects, the GET must still return + // 200 with the cached feed — the refresh is fire-and-forget. + rollupProducerSpy.mockImplementationOnce(async () => { + throw new Error("synthetic rollup failure"); + }); + + const res = await handleGetHomeFeed( + new Request("http://localhost/v1/home/feed?timeAwaySeconds=0"), + ); + expect(res.status).toBe(200); + // Drain the rejected promise so it doesn't leak into the next + // test as an unhandled rejection. 
+ await Promise.resolve(); + await Promise.resolve(); + }); + test("newCount counts only status=new after filtering", async () => { writeFeedFile([ makeItem({ id: "a", status: "new" }), diff --git a/assistant/src/runtime/routes/home-feed-routes.ts b/assistant/src/runtime/routes/home-feed-routes.ts index 6cb830edede..9229b5814af 100644 --- a/assistant/src/runtime/routes/home-feed-routes.ts +++ b/assistant/src/runtime/routes/home-feed-routes.ts @@ -28,6 +28,7 @@ import { type FeedItemStatus, } from "../../home/feed-types.js"; import { patchFeedItemStatus, readHomeFeed } from "../../home/feed-writer.js"; +import { runRollupProducer } from "../../home/rollup-producer.js"; import { addMessage, createConversation, @@ -38,6 +39,26 @@ import type { RouteDefinition } from "../http-router.js"; const log = getLogger("home-feed-routes"); +/** + * Debounce window for the on-visit rollup refresh. A GET on the feed + * route fires the rollup producer fire-and-forget at most once per + * window — repeat GETs within this interval (e.g. from a client that + * polls aggressively, or from multiple panels opening in rapid + * succession) skip the trigger and just return the cached feed. + */ +const ON_VISIT_REFRESH_DEBOUNCE_MS = 10 * 60 * 1000; + +let lastOnVisitRefreshAt = 0; + +/** + * Reset the on-visit debounce gate. Test-only — production callers + * should never touch this. Exported with an underscore-prefixed name + * so lint rules can flag misuse. + */ +export function __resetOnVisitRefreshStateForTests(): void { + lastOnVisitRefreshAt = 0; +} + // --------------------------------------------------------------------------- // Response / request schemas // --------------------------------------------------------------------------- @@ -165,6 +186,12 @@ export async function handleGetHomeFeed(req: Request): Promise { "GET /v1/home/feed", ); + // Fire the on-visit rollup refresh AFTER computing the response so + // nothing in the trigger path can delay the GET. 
The rollup runs + // fire-and-forget; its writes will publish `home_feed_updated` via + // the writer's SSE path, and the client auto-refreshes. + maybeTriggerOnVisitRollupRefresh(now); + return Response.json({ items: filtered, updatedAt: feed.updatedAt, @@ -172,6 +199,73 @@ export async function handleGetHomeFeed(req: Request): Promise { }); } +/** + * Fire the rollup producer fire-and-forget when the debounce window + * has elapsed since the last trigger. Returns immediately — the + * caller never awaits the rollup, and any error inside the producer + * is swallowed into a warn log so an LLM hiccup can never turn a + * GET /v1/home/feed into a 500. + * + * The debounce is deliberately set in this module rather than the + * producer itself so the producer stays reusable from the scheduler + * (which has its own separate 2h safety-net cadence). The + * producer's internal in-flight guard handles the rare race where + * both the scheduler and the route fire at the same instant. + * + * Debounce semantics: we eagerly advance `lastOnVisitRefreshAt` before + * invoking the producer so concurrent GETs during the LLM call are + * blocked by this function's debounce check. When the producer returns with + * a skip reason that short-circuited BEFORE the LLM call + * (`no_provider`, `no_actions`, `in_flight`) we roll the gate back + * to its previous value — burning the full 10-minute window on a + * daemon that hasn't finished booting would leave Home stale the + * whole time the user is actively trying to refresh. Real LLM + * attempts (success / `empty_items` / `malformed_output` / + * `provider_error`) keep the advanced gate so a broken producer + * can't be hammered by an aggressive client. 
+ */ +function maybeTriggerOnVisitRollupRefresh(now: Date): void { + const nowMs = now.getTime(); + if (nowMs - lastOnVisitRefreshAt < ON_VISIT_REFRESH_DEBOUNCE_MS) return; + const previousRefreshAt = lastOnVisitRefreshAt; + lastOnVisitRefreshAt = nowMs; + void runRollupProducer(now) + .then((result) => { + const skippedBeforeLLM = + result.skippedReason === "no_provider" || + result.skippedReason === "no_actions" || + result.skippedReason === "in_flight"; + if (skippedBeforeLLM) { + // Only roll back if no subsequent GET has since advanced the + // gate past our eager value. Defensive: given the guard at + // the top of this function a concurrent GET shouldn't be + // able to advance past our `nowMs`, but if some future + // refactor changes that we don't want to silently clobber a + // newer timestamp. + if (lastOnVisitRefreshAt === nowMs) { + lastOnVisitRefreshAt = previousRefreshAt; + } + log.debug( + { skippedReason: result.skippedReason }, + "On-visit rollup refresh skipped; debounce gate rolled back", + ); + } else if (result.skippedReason !== null) { + log.debug( + { skippedReason: result.skippedReason }, + "On-visit rollup refresh skipped", + ); + } else { + log.info( + { wroteCount: result.wroteCount }, + "On-visit rollup refresh completed", + ); + } + }) + .catch((err) => { + log.warn({ err }, "On-visit rollup refresh failed"); + }); +} + /** * `PATCH /v1/home/feed/:id`. *