Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
50 changes: 39 additions & 11 deletions assistant/src/home/__tests__/feed-scheduler.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -70,28 +70,36 @@ describe("startFeedScheduler", () => {
expect(summary2.gmailDigestRan).toBe(true);
});

test("rollup only re-runs every 30 minutes", async () => {
test("rollup only re-runs every 2 hours as the safety-net cadence", async () => {
// The scheduler is the safety net; the primary trigger is the
// on-visit refresh in home-feed-routes.ts. Long cadence is
// intentional so the scheduler doesn't fight the route.
handle = startFeedScheduler(defaultOptions());

const t0 = new Date("2026-04-14T12:00:00.000Z");
await handle.runOnce(t0);

// 5 min later — below the 30-min reflection gate.
const t1 = new Date("2026-04-14T12:05:00.000Z");
// 30 min later — below the 2-hour gate.
const t1 = new Date("2026-04-14T12:30:00.000Z");
const summary1 = await handle.runOnce(t1);
expect(summary1.rollupRan).toBe(false);

// 31 min later — past the 30-min gate, should re-run.
const t2 = new Date("2026-04-14T12:31:00.000Z");
// 1h later — still below the 2-hour gate.
const t2 = new Date("2026-04-14T13:00:00.000Z");
const summary2 = await handle.runOnce(t2);
expect(summary2.rollupRan).toBe(true);
expect(summary2.rollupRan).toBe(false);

// 2h 1m later — past the gate, should re-run.
const t3 = new Date("2026-04-14T14:01:00.000Z");
const summary3 = await handle.runOnce(t3);
expect(summary3.rollupRan).toBe(true);
});

test("rollup cooldown is NOT advanced on no_provider so the next tick retries", async () => {
// Mimic the daemon startup ordering: the scheduler boots before
// the provider registry is ready. The first tick gets no_provider;
// the next tick (even one second later) must still run the rollup
// instead of waiting 30 minutes.
// instead of waiting 2 hours.
rollupRunner.mockImplementationOnce(async () => ({
wroteCount: 0,
skippedReason: "no_provider",
Expand All @@ -110,10 +118,30 @@ describe("startFeedScheduler", () => {
expect(rollupRunner).toHaveBeenCalledTimes(2);
});

test("rollup cooldown is NOT advanced on in_flight so the next tick retries", async () => {
// in_flight means another caller (on-visit refresh, usually) is
// already running the producer. Advancing the gate here would
// force the NEXT tick to wait out the full cadence window even
// though nothing broken happened — the other caller's result is
// effectively this tick's run.
rollupRunner.mockImplementationOnce(async () => ({
wroteCount: 0,
skippedReason: "in_flight",
}));

handle = startFeedScheduler(defaultOptions());
await handle.runOnce(new Date("2026-04-14T12:00:00.000Z"));
expect(rollupRunner).toHaveBeenCalledTimes(1);

const summary = await handle.runOnce(new Date("2026-04-14T12:00:01.000Z"));
expect(summary.rollupRan).toBe(true);
expect(rollupRunner).toHaveBeenCalledTimes(2);
});

test("rollup cooldown is NOT advanced on no_actions so the next tick retries", async () => {
// no_actions means the activity log was empty — no LLM call was
// made. A subsequent tick should retry as soon as new actions
// land, not wait the full 30-minute window.
// land, not wait the full 2-hour window.
rollupRunner.mockImplementationOnce(async () => ({
wroteCount: 0,
skippedReason: "no_actions",
Expand All @@ -131,7 +159,7 @@ describe("startFeedScheduler", () => {

test("rollup cooldown IS advanced on other skip reasons to preserve backoff", async () => {
// empty_items / malformed_output / provider_error are real LLM
// attempts — the next tick should be gated by the full 30-minute
// attempts — the next tick should be gated by the full 2-hour
// window so a broken producer doesn't get hammered every tick.
rollupRunner.mockImplementationOnce(async () => ({
wroteCount: 0,
Expand All @@ -142,8 +170,8 @@ describe("startFeedScheduler", () => {
await handle.runOnce(new Date("2026-04-14T12:00:00.000Z"));
expect(rollupRunner).toHaveBeenCalledTimes(1);

// Ten minutes later — below the 30-min gate, should NOT re-run.
const summary = await handle.runOnce(new Date("2026-04-14T12:10:00.000Z"));
// Thirty minutes later — below the 2-hour gate, should NOT re-run.
const summary = await handle.runOnce(new Date("2026-04-14T12:30:00.000Z"));
expect(summary.rollupRan).toBe(false);
expect(rollupRunner).toHaveBeenCalledTimes(1);
});
Expand Down
44 changes: 44 additions & 0 deletions assistant/src/home/__tests__/rollup-producer.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -364,6 +364,50 @@ describe("runRollupProducer", () => {
expect(result.wroteCount).toBe(0);
});

test("concurrent calls short-circuit the second with in_flight", async () => {
// Gate the provider behind a manually-controlled deferred so we
// can observe state while the first call is still inside the
// producer body. Without this we'd race the runtime's microtask
// scheduler to check in-flightness.
let release: ((value: ContentBlock[]) => void) | null = null;
const gated = new Promise<ContentBlock[]>((resolve) => {
release = resolve;
});
const provider = makeProvider(async () => {
const content = await gated;
return {
content,
model: "mock-model",
usage: { inputTokens: 0, outputTokens: 0 },
stopReason: "tool_use",
};
});

const first = runRollupProducer(new Date(), {
writeItem,
loadRelationshipState: stubRelationshipState,
loadRecentActions: stubLoadRecentActions(oneAction),
resolveProvider: () => provider,
});

// Second call lands while `first` is blocked awaiting the gated
// provider response — the in-flight guard must short-circuit it.
const second = await runRollupProducer(new Date(), {
writeItem,
loadRelationshipState: stubRelationshipState,
loadRecentActions: stubLoadRecentActions(oneAction),
resolveProvider: () => provider,
});

expect(second.skippedReason).toBe("in_flight");
expect(second.wroteCount).toBe(0);

// Release the first call and let it finish.
release!([toolUseContent({ items: [] })]);
const firstResult = await first;
expect(firstResult.skippedReason).toBe("empty_items");
});

test("clamps priority to the valid [0, 100] window", async () => {
const provider = scriptedProvider([
toolUseContent({
Expand Down
24 changes: 20 additions & 4 deletions assistant/src/home/feed-scheduler.ts
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,15 @@ const TICK_INTERVAL_MS = 5 * 60 * 1000;

/** Per-producer minimum gap between runs. */
const GMAIL_DIGEST_INTERVAL_MS = 5 * 60 * 1000;
const ROLLUP_INTERVAL_MS = 30 * 60 * 1000;
/**
* Roll-up cadence is deliberately long — 120 minutes — because the
* scheduler is the *safety net*, not the primary trigger. Opening the
* Home page fires a debounced on-visit refresh in the HTTP route (see
* `runtime/routes/home-feed-routes.ts`), which is the path most users
* actually hit. The scheduler exists so the feed still stays fresh
* for long idle stretches where nobody opens the Home page.
*/
const ROLLUP_INTERVAL_MS = 2 * 60 * 60 * 1000;

export interface FeedSchedulerHandle {
/** Stops the interval. Safe to call multiple times. */
Expand Down Expand Up @@ -146,21 +154,29 @@ export function startFeedScheduler(
"Rollup producer ran",
);
// Only advance the cooldown gate when the producer actually
// had a chance to run the LLM. Two skip reasons short-circuit
// before any provider call and should NOT burn the window:
// had a chance to run the LLM. Three skip reasons short-
// circuit before any provider call and should NOT burn the
// window:
// - `no_provider`: the provider registry wasn't ready yet
// (happens on the startup tick because the feed scheduler
// boots before the provider init pass in
// `daemon/lifecycle.ts`).
// - `no_actions`: there was nothing to roll up. A subsequent
// tick should retry as soon as new actions land, not wait
// the full window.
// - `in_flight`: another caller (usually the on-visit
// refresh trigger in `home-feed-routes.ts`) is already
// running the rollup. That caller's result effectively
// counts as this scheduler tick's real run; bumping the
// gate here would force the NEXT tick to also wait out
// the full window even though nothing broken happened.
// Every other outcome — success, empty items, malformed
// output, provider error — is a real LLM attempt and does
// advance the gate so a broken producer doesn't hammer us.
if (
result.skippedReason !== "no_provider" &&
result.skippedReason !== "no_actions"
result.skippedReason !== "no_actions" &&
result.skippedReason !== "in_flight"
) {
lastRollupAt = nowMs;
}
Expand Down
35 changes: 32 additions & 3 deletions assistant/src/home/rollup-producer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -144,10 +144,16 @@ export interface RollupResult {
* `no_actions` means there was nothing to roll up — a quiet but
* normal outcome that does not advance the cooldown gate (no point
* re-running until new actions land).
*
* `in_flight` means another caller is already running the producer.
* The second call short-circuits without entering the body so we
* never fire two concurrent LLM requests — this guards against the
* scheduler tick and an on-visit refresh trigger racing each other.
*/
skippedReason:
| "no_provider"
| "no_actions"
| "in_flight"
| "empty_items"
| "provider_error"
| "malformed_output"
Expand All @@ -169,6 +175,16 @@ export interface RollupProducerDeps {
resolveProvider?: () => Provider | null;
}

/**
* Module-level in-flight guard. Prevents two callers (e.g. the
* scheduler tick and an on-visit refresh trigger) from launching two
* concurrent LLM requests. A second caller short-circuits with
* `skippedReason: "in_flight"` without entering the body. The lock
* is released in the `finally` so a thrown exception can't strand
* it in the `true` state.
*/
let producerInFlight = false;

/**
* Run one roll-up pass. Loads recent action items from the feed plus
* relationship state, builds a user prompt around them, asks the
Expand All @@ -178,6 +194,21 @@ export interface RollupProducerDeps {
export async function runRollupProducer(
now: Date = new Date(),
deps: RollupProducerDeps = {},
): Promise<RollupResult> {
if (producerInFlight) {
return { wroteCount: 0, skippedReason: "in_flight" };
}
producerInFlight = true;
try {
return await runRollupProducerInner(now, deps);
} finally {
producerInFlight = false;
}
}

async function runRollupProducerInner(
now: Date,
deps: RollupProducerDeps,
): Promise<RollupResult> {
const writeItem = deps.writeItem ?? writeAssistantFeedItem;
const loadRelationshipState =
Expand Down Expand Up @@ -337,9 +368,7 @@ function buildUserPrompt(
* the `type` narrowing to digest/thread (actions and nudges are
* rejected here even if the model ignores the tool schema).
*/
function coerceRollupItem(
raw: unknown,
): WriteAssistantFeedItemParams | null {
function coerceRollupItem(raw: unknown): WriteAssistantFeedItemParams | null {
if (!raw || typeof raw !== "object") return null;
const obj = raw as Record<string, unknown>;

Expand Down
101 changes: 101 additions & 0 deletions assistant/src/runtime/routes/__tests__/home-feed-routes.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -53,8 +53,28 @@ mock.module("../../../memory/conversation-crud.js", () => ({
},
}));

// Stub the rollup producer so the on-visit refresh trigger inside
// handleGetHomeFeed doesn't try to resolve a real provider or touch
// relationship state. The route calls the producer fire-and-forget,
// so tests observe the trigger via call-count on this spy rather
// than awaiting a return value.
//
// Default skip reason is `empty_items` — a real LLM attempt that
// returned nothing to consolidate. Using a real-run skip means the
// debounce gate holds firm in the default case (matching production
// semantics); individual tests override with `no_provider` etc. to
// exercise the rollback path.
const rollupProducerSpy = mock<() => Promise<unknown>>(async () => ({
wroteCount: 0,
skippedReason: "empty_items",
}));
mock.module("../../../home/rollup-producer.js", () => ({
runRollupProducer: rollupProducerSpy,
}));

// Dynamic imports so module mocks are wired before evaluation.
const {
__resetOnVisitRefreshStateForTests,
computeGreeting,
formatRelativeTime,
handleGetHomeFeed,
Expand Down Expand Up @@ -125,6 +145,8 @@ beforeEach(() => {
origWorkspaceDir = process.env.VELLUM_WORKSPACE_DIR;
process.env.VELLUM_WORKSPACE_DIR = workspaceDir;
publishSpy.mockClear();
rollupProducerSpy.mockClear();
__resetOnVisitRefreshStateForTests();
createdConversations.length = 0;
addedMessages.length = 0;
createConversationShouldThrow = false;
Expand Down Expand Up @@ -322,6 +344,85 @@ describe("handleGetHomeFeed", () => {
expect(body.contextBanner.timeAwayLabel).toBe("2 hours ago");
});

test("fires the rollup producer fire-and-forget on the first GET", async () => {
const res = await handleGetHomeFeed(
new Request("http://localhost/v1/home/feed?timeAwaySeconds=0"),
);
expect(res.status).toBe(200);
// Yield a microtask so the fire-and-forget call reaches its
// first await point.
await Promise.resolve();
expect(rollupProducerSpy).toHaveBeenCalledTimes(1);
});

test("does NOT refire the rollup when the debounce window has not elapsed", async () => {
await handleGetHomeFeed(
new Request("http://localhost/v1/home/feed?timeAwaySeconds=0"),
);
await Promise.resolve();
expect(rollupProducerSpy).toHaveBeenCalledTimes(1);

// A second GET milliseconds later should NOT re-trigger the
// rollup — the 10-minute debounce prevents aggressive pollers or
// multiple panels from firing repeat refreshes.
await handleGetHomeFeed(
new Request("http://localhost/v1/home/feed?timeAwaySeconds=0"),
);
await handleGetHomeFeed(
new Request("http://localhost/v1/home/feed?timeAwaySeconds=0"),
);
await Promise.resolve();
expect(rollupProducerSpy).toHaveBeenCalledTimes(1);
});

test("debounce is rolled back when the producer skips before the LLM call", async () => {
// Simulate the daemon-boot race: the first GET fires the
// producer but the provider registry isn't ready yet, so the
// producer short-circuits with `no_provider`. A second GET a
// moment later must still be allowed to fire — otherwise Home
// stays stale for the full 10-minute debounce window while the
// user is actively trying to refresh.
rollupProducerSpy.mockImplementationOnce(async () => ({
wroteCount: 0,
skippedReason: "no_provider",
}));

await handleGetHomeFeed(
new Request("http://localhost/v1/home/feed?timeAwaySeconds=0"),
);
// Let the fire-and-forget `.then()` that performs the rollback
// run before we issue the second GET. Two microtask ticks
// because the chain is runRollupProducer -> .then handler.
await Promise.resolve();
await Promise.resolve();
expect(rollupProducerSpy).toHaveBeenCalledTimes(1);

// Second GET — producer is now ready. Gate must have been
// rolled back so this GET re-fires the producer.
await handleGetHomeFeed(
new Request("http://localhost/v1/home/feed?timeAwaySeconds=0"),
);
await Promise.resolve();
expect(rollupProducerSpy).toHaveBeenCalledTimes(2);
});

test("rollup producer failure does not turn the GET into an error", async () => {
// Even if the rollup producer rejects, the GET must still return
// 200 with the cached feed — the refresh is fire-and-forget.
rollupProducerSpy.mockImplementationOnce(async () => {
throw new Error("synthetic rollup failure");
});

const res = await handleGetHomeFeed(
new Request("http://localhost/v1/home/feed?timeAwaySeconds=0"),
);
expect(res.status).toBe(200);
// Drain the rejected promise so it doesn't leak into the next
// test as an unhandled rejection.
await Promise.resolve();
await Promise.resolve();
});

test("newCount counts only status=new after filtering", async () => {
writeFeedFile([
makeItem({ id: "a", status: "new" }),
Expand Down
Loading
Loading