From 921e75a1be07541a5cd99916f2ef66f028d4ee80 Mon Sep 17 00:00:00 2001 From: Devin AI <158243242+devin-ai-integration[bot]@users.noreply.github.com> Date: Mon, 1 Jun 2026 14:04:04 +0000 Subject: [PATCH] feat(runtime): store targeting metadata on ring entries for filtered replay Targeted events (those with targetCapability, targetClientId, targetInterfaceId, or excludeClientId modifiers) are now buffered in the per-conversation ring with their targeting metadata attached. getReplayWindow accepts an optional ReplaySubscriber descriptor and re-applies the same delivery filter that the live publish() path uses, so reconnecting clients only receive events they would have seen live. This eliminates false-positive seq gaps on reconnect -- previously, targeted events were stamped with a seq but excluded from the ring, creating permanent holes that triggered unnecessary reconcile fetches on the client side. Co-Authored-By: vargas@vellum.ai --- .../conversation-stream-state.test.ts | 246 +++++++++++++++++- assistant/src/runtime/assistant-event-hub.ts | 6 +- .../src/runtime/conversation-stream-state.ts | 141 ++++++++-- assistant/src/runtime/routes/events-routes.ts | 18 +- 4 files changed, 375 insertions(+), 36 deletions(-) diff --git a/assistant/src/__tests__/conversation-stream-state.test.ts b/assistant/src/__tests__/conversation-stream-state.test.ts index 951047cdf3a..66a240715da 100644 --- a/assistant/src/__tests__/conversation-stream-state.test.ts +++ b/assistant/src/__tests__/conversation-stream-state.test.ts @@ -1,6 +1,10 @@ import { beforeEach, describe, expect, test } from "bun:test"; import type { AssistantEvent } from "../runtime/assistant-event.js"; +import type { + EventTargeting, + ReplaySubscriber, +} from "../runtime/conversation-stream-state.js"; import { _peekStreamForTesting, _resetConversationStreamsForTesting, @@ -66,30 +70,46 @@ describe("conversation-stream-state", () => { expect(peek?.newestSeq).toBe(2); }); - test("replayable: false stamps seq but skips ring push", () => { + test("targeted events are buffered with targeting metadata", () => { + /** Targeted events now stay in the ring so replay can filter them. */ + + // GIVEN a targeting modifier + const targeting: EventTargeting = { + targetCapability: "host_bash", + }; + + // WHEN a targeted event is stamped const targeted = mkEvent(); - stampAndBuffer(targeted, { replayable: false }); - expect(targeted.seq).toBe(1); // still stamped for wire-side ordering + stampAndBuffer(targeted, { targeting }); + + // THEN it receives a seq and lands in the ring + expect(targeted.seq).toBe(1); const peek = _peekStreamForTesting(CONV); - // State exists (we created it to assign seq) but ring is empty. - expect(peek?.nextSeq).toBe(2); - expect(peek?.ringLength).toBe(0); + expect(peek?.ringLength).toBe(1); + expect(peek?.oldestSeq).toBe(1); }); - test("seq stays monotonic across mixed replayable/non-replayable events", () => { + test("seq stays monotonic across targeted and untargeted events", () => { + /** All events share a contiguous seq counter regardless of targeting. */ + + // GIVEN a mix of untargeted and targeted events const a = mkEvent(); const b = mkEvent(); const c = mkEvent(); const d = mkEvent(); - stampAndBuffer(a); // replayable - stampAndBuffer(b, { replayable: false }); // targeted -- skipped - stampAndBuffer(c); // replayable - stampAndBuffer(d, { replayable: false }); // targeted -- skipped + + // WHEN they are stamped + stampAndBuffer(a); + stampAndBuffer(b, { targeting: { targetCapability: "host_bash" } }); + stampAndBuffer(c); + stampAndBuffer(d, { targeting: { excludeClientId: "client-1" } }); + + // THEN seqs are monotonic and all four are buffered expect([a.seq, b.seq, c.seq, d.seq]).toEqual([1, 2, 3, 4]); const peek = _peekStreamForTesting(CONV); - expect(peek?.ringLength).toBe(2); // only a + c in buffer + expect(peek?.ringLength).toBe(4); expect(peek?.oldestSeq).toBe(1); - expect(peek?.newestSeq).toBe(3); + expect(peek?.newestSeq).toBe(4); }); }); @@ -246,6 +266,206 @@ describe("conversation-stream-state", () => { }); }); + describe("getReplayWindow — targeting filter", () => { + const MACOS_CLIENT: ReplaySubscriber = { + type: "client", + clientId: "mac-1", + interfaceId: "macos", + capabilities: ["host_bash", "host_file", "host_cu", "host_browser"], + }; + + const WEB_CLIENT: ReplaySubscriber = { + type: "client", + clientId: "web-1", + interfaceId: "web", + capabilities: [], + }; + + const CHROME_EXT_CLIENT: ReplaySubscriber = { + type: "client", + clientId: "ext-1", + interfaceId: "chrome-extension", + capabilities: ["host_browser"], + }; + + const PROCESS_SUB: ReplaySubscriber = { type: "process" }; + + test("untargeted events are replayed to all subscriber types", () => { + /** Events without targeting metadata go to everyone. */ + + // GIVEN two untargeted events in the ring + stampAndBuffer(mkEvent()); + stampAndBuffer(mkEvent()); + + // WHEN each subscriber type requests replay + const macReplay = getReplayWindow(CONV, 0, MACOS_CLIENT); + const webReplay = getReplayWindow(CONV, 0, WEB_CLIENT); + const procReplay = getReplayWindow(CONV, 0, PROCESS_SUB); + + // THEN all see both events + expect(macReplay!.map((e) => e.seq)).toEqual([1, 2]); + expect(webReplay!.map((e) => e.seq)).toEqual([1, 2]); + expect(procReplay!.map((e) => e.seq)).toEqual([1, 2]); + }); + + test("capability-targeted events only replay to subscribers with that capability", () => { + /** host_bash events should only reach macOS, not web or process. */ + + // GIVEN an untargeted event and a host_bash-targeted event + stampAndBuffer(mkEvent()); + stampAndBuffer(mkEvent(), { + targeting: { targetCapability: "host_bash" }, + }); + + // WHEN each subscriber requests replay + const macReplay = getReplayWindow(CONV, 0, MACOS_CLIENT); + const webReplay = getReplayWindow(CONV, 0, WEB_CLIENT); + const procReplay = getReplayWindow(CONV, 0, PROCESS_SUB); + + // THEN macOS sees both; web and process see only the untargeted event + expect(macReplay!.map((e) => e.seq)).toEqual([1, 2]); + expect(webReplay!.map((e) => e.seq)).toEqual([1]); + expect(procReplay!.map((e) => e.seq)).toEqual([1]); + }); + + test("host_browser capability targets both macOS and chrome-extension", () => { + /** Both interfaces declare host_browser. */ + + // GIVEN a host_browser-targeted event + stampAndBuffer(mkEvent(), { + targeting: { targetCapability: "host_browser" }, + }); + + // WHEN macOS and chrome-extension request replay + const macReplay = getReplayWindow(CONV, 0, MACOS_CLIENT); + const extReplay = getReplayWindow(CONV, 0, CHROME_EXT_CLIENT); + const webReplay = getReplayWindow(CONV, 0, WEB_CLIENT); + + // THEN both capable clients see it; web does not + expect(macReplay!.map((e) => e.seq)).toEqual([1]); + expect(extReplay!.map((e) => e.seq)).toEqual([1]); + expect(webReplay).toEqual([]); + }); + + test("client-targeted events only replay to the named client", () => { + /** targetClientId narrows delivery to a single subscriber. */ + + // GIVEN an event targeted to mac-1 + stampAndBuffer(mkEvent(), { + targeting: { targetClientId: "mac-1" }, + }); + + // WHEN different clients request replay + const macReplay = getReplayWindow(CONV, 0, MACOS_CLIENT); + const webReplay = getReplayWindow(CONV, 0, WEB_CLIENT); + const procReplay = getReplayWindow(CONV, 0, PROCESS_SUB); + + // THEN only the named client receives it + expect(macReplay!.map((e) => e.seq)).toEqual([1]); + expect(webReplay).toEqual([]); + expect(procReplay).toEqual([]); + }); + + test("client + capability targeting requires both to match", () => { + /** + * targetClientId + targetCapability: the client must match by ID + * AND have the required capability. + */ + + // GIVEN an event targeted to web-1 with host_bash capability + stampAndBuffer(mkEvent(), { + targeting: { targetClientId: "web-1", targetCapability: "host_bash" }, + }); + + // WHEN the named client (without the capability) and macOS request replay + const webReplay = getReplayWindow(CONV, 0, WEB_CLIENT); + const macReplay = getReplayWindow(CONV, 0, MACOS_CLIENT); + + // THEN neither receives it — web-1 lacks the capability, mac-1 isn't the target + expect(webReplay).toEqual([]); + expect(macReplay).toEqual([]); + }); + + test("excludeClientId suppresses replay for the originating client", () => { + /** Self-echo suppression on replay. */ + + // GIVEN an event excluding web-1 + stampAndBuffer(mkEvent(), { + targeting: { excludeClientId: "web-1" }, + }); + + // WHEN web-1 and mac-1 request replay + const webReplay = getReplayWindow(CONV, 0, WEB_CLIENT); + const macReplay = getReplayWindow(CONV, 0, MACOS_CLIENT); + const procReplay = getReplayWindow(CONV, 0, PROCESS_SUB); + + // THEN web-1 is suppressed; mac-1 and process subscribers see it + expect(webReplay).toEqual([]); + expect(macReplay!.map((e) => e.seq)).toEqual([1]); + expect(procReplay!.map((e) => e.seq)).toEqual([1]); + }); + + test("interface-targeted events only replay to clients of that interface", () => { + /** targetInterfaceId narrows delivery to a specific interface. */ + + // GIVEN an event targeted to the macos interface + stampAndBuffer(mkEvent(), { + targeting: { targetInterfaceId: "macos" }, + }); + + // WHEN different subscribers request replay + const macReplay = getReplayWindow(CONV, 0, MACOS_CLIENT); + const webReplay = getReplayWindow(CONV, 0, WEB_CLIENT); + const procReplay = getReplayWindow(CONV, 0, PROCESS_SUB); + + // THEN only the macos client receives it + expect(macReplay!.map((e) => e.seq)).toEqual([1]); + expect(webReplay).toEqual([]); + expect(procReplay).toEqual([]); + }); + + test("mixed targeting across events filters per-entry", () => { + /** + * A ring with untargeted, capability-targeted, and excluded events + * filters each entry independently. + */ + + // GIVEN a mix of events + stampAndBuffer(mkEvent()); // seq 1: untargeted + stampAndBuffer(mkEvent(), { + targeting: { targetCapability: "host_bash" }, + }); // seq 2: bash-targeted + stampAndBuffer(mkEvent(), { + targeting: { excludeClientId: "web-1" }, + }); // seq 3: exclude web-1 + stampAndBuffer(mkEvent()); // seq 4: untargeted + + // WHEN each subscriber requests replay from seq 0 + const macReplay = getReplayWindow(CONV, 0, MACOS_CLIENT); + const webReplay = getReplayWindow(CONV, 0, WEB_CLIENT); + + // THEN macOS sees all four; web sees 1 + 4 (not 2=no capability, not 3=excluded) + expect(macReplay!.map((e) => e.seq)).toEqual([1, 2, 3, 4]); + expect(webReplay!.map((e) => e.seq)).toEqual([1, 4]); + }); + + test("no subscriber argument returns all entries unfiltered", () => { + /** Backwards-compatible: omitting subscriber skips filtering. */ + + // GIVEN targeted and untargeted events + stampAndBuffer(mkEvent()); + stampAndBuffer(mkEvent(), { + targeting: { targetCapability: "host_bash" }, + }); + + // WHEN replay is requested without a subscriber + const replay = getReplayWindow(CONV, 0); + + // THEN all events are returned + expect(replay!.map((e) => e.seq)).toEqual([1, 2]); + }); + }); + describe("clearConversationStream", () => { test("drops all state for the conversation", () => { stampAndBuffer(mkEvent()); diff --git a/assistant/src/runtime/assistant-event-hub.ts b/assistant/src/runtime/assistant-event-hub.ts index 3918d76b7d6..b49ffb0076f 100644 --- a/assistant/src/runtime/assistant-event-hub.ts +++ b/assistant/src/runtime/assistant-event-hub.ts @@ -605,11 +605,7 @@ export function broadcastMessage( excludeClientId, } : undefined; - // Stamp per-conversation seq and (when replayable) push onto the ring - // buffer. Mutates `event.seq` in place. Targeted/exclusion publishes - // are stamped but not buffered -- replay by `conversationId` alone - // would leak them to subscribers outside their intended delivery set. - stampAndBuffer(event, { replayable: publishOptions == null }); + stampAndBuffer(event, { targeting: publishOptions }); _hubChain = _hubChain .then(() => assistantEventHub.publish(event, publishOptions)) .then(() => { diff --git a/assistant/src/runtime/conversation-stream-state.ts b/assistant/src/runtime/conversation-stream-state.ts index b2579ed0d79..434495ac6ff 100644 --- a/assistant/src/runtime/conversation-stream-state.ts +++ b/assistant/src/runtime/conversation-stream-state.ts @@ -28,11 +28,39 @@ const RING_AGE_LIMIT_MS = 30_000; // ── Types ──────────────────────────────────────────────────────────── +/** + * Targeting / exclusion modifiers attached to an event at publish time. + * Stored on ring entries so replay can re-apply the same delivery + * filter that the live `publish()` path used. + * + * Fields use plain `string` rather than branded channel types so + * this module stays independent of the `channels/` package. + */ +export interface EventTargeting { + targetCapability?: string; + targetClientId?: string; + targetInterfaceId?: string; + excludeClientId?: string; +} + +/** + * Identity of the subscriber requesting a replay window. Replay + * filtering mirrors the live `publish()` logic in `AssistantEventHub`: + * targeted entries are only delivered when the subscriber matches. + */ +export interface ReplaySubscriber { + type: "client" | "process"; + clientId?: string; + interfaceId?: string; + capabilities?: readonly string[]; +} + interface RingEntry { seq: number; event: AssistantEvent; emittedAt: number; sizeBytes: number; + targeting?: EventTargeting; } interface ConversationStreamState { @@ -57,25 +85,21 @@ function getOrCreate(conversationId: string): ConversationStreamState { // ── Public API ─────────────────────────────────────────────────────── /** - * Assign a monotonic `seq` to a conversation-scoped event and (when - * replayable) push it onto the ring buffer. No-op when - * `event.conversationId` is absent (unscoped broadcasts are never - * replayable). + * Assign a monotonic `seq` to a conversation-scoped event and push it + * onto the ring buffer. No-op when `event.conversationId` is absent + * (unscoped broadcasts are never replayable). * - * `options.replayable` should be `false` whenever the event was - * published with any targeting / exclusion modifier - * (`targetCapability`, `targetClientId`, `targetInterfaceId`, - * `excludeClientId`). Such events have a narrower delivery set than the - * conversation subscriber list, so storing them by `conversationId` - * alone would leak them to the wrong subscribers on replay. The seq is - * still stamped so the wire-side ordering stays contiguous; only the - * ring push is skipped. Defaults to `true`. + * When `options.targeting` is provided, the metadata is stored on the + * ring entry so that {@link getReplayWindow} can re-apply the same + * delivery filter at replay time. This keeps targeted events in the + * ring (preventing false-positive seq gaps on reconnect) without + * leaking them to subscribers outside their intended delivery set. * * Mutates `event.seq` in place. */ export function stampAndBuffer( event: AssistantEvent, - options?: { replayable?: boolean }, + options?: { targeting?: EventTargeting }, ): void { const cid = event.conversationId; if (cid == null) return; @@ -83,13 +107,20 @@ export function stampAndBuffer( const state = getOrCreate(cid); event.seq = state.nextSeq++; - if (options?.replayable === false) return; - // Approximate size by serialized JSON length. This is the same // bytes-on-wire we'll send, so it tracks ring memory pressure // closely without a separate measurement pass. const sizeBytes = JSON.stringify(event).length; - state.ring.push({ seq: event.seq, event, emittedAt: Date.now(), sizeBytes }); + const entry: RingEntry = { + seq: event.seq, + event, + emittedAt: Date.now(), + sizeBytes, + }; + if (options?.targeting) { + entry.targeting = options.targeting; + } + state.ring.push(entry); state.totalSizeBytes += sizeBytes; evict(state); @@ -100,6 +131,13 @@ export function stampAndBuffer( * Returns `null` when the requested cursor is older than the oldest * buffered entry -- callers should fall back to a snapshot resync. * + * When `subscriber` is provided, entries carrying targeting metadata + * are filtered using the same rules as the live `publish()` path in + * `AssistantEventHub`. This prevents targeted events from leaking to + * subscribers outside their intended delivery set on reconnect. + * When `subscriber` is omitted, all entries are returned unfiltered + * (backwards-compatible behaviour). + * * Sweeps age-expired entries at read time so an idle conversation * cannot serve stale deltas past the 30-second window (eviction * only runs on `stampAndBuffer`, so without this an idle stream @@ -111,6 +149,7 @@ export function stampAndBuffer( export function getReplayWindow( conversationId: string, lastSeenSeq: number, + subscriber?: ReplaySubscriber, ): readonly AssistantEvent[] | null { const state = streams.get(conversationId); if (!state) return []; @@ -126,7 +165,11 @@ export function getReplayWindow( if (lastSeenSeq < oldest - 1) return null; return state.ring - .filter((entry) => entry.seq > lastSeenSeq) + .filter( + (entry) => + entry.seq > lastSeenSeq && + (subscriber == null || matchesSubscriber(entry, subscriber)), + ) .map((entry) => entry.event); } @@ -169,6 +212,70 @@ export function _peekStreamForTesting(conversationId: string): { // ── Internals ──────────────────────────────────────────────────────── +/** + * Mirrors the delivery logic in `AssistantEventHub.publish()`. Returns + * `true` when `subscriber` would have received the entry during live + * fanout. + */ +function matchesSubscriber( + entry: RingEntry, + subscriber: ReplaySubscriber, +): boolean { + const t = entry.targeting; + if (!t) return true; + + // Self-echo suppression: the originating client never receives the + // event back. + if ( + t.excludeClientId != null && + subscriber.type === "client" && + subscriber.clientId === t.excludeClientId + ) { + return false; + } + + // Interface targeting: only clients of the requested interface. + if (t.targetInterfaceId != null) { + if ( + subscriber.type !== "client" || + subscriber.interfaceId !== t.targetInterfaceId + ) { + return false; + } + } + + if (t.targetClientId != null) { + // Client targeting: bypass conversation filter, deliver only to the + // named client. + if ( + subscriber.type !== "client" || + subscriber.clientId !== t.targetClientId + ) { + return false; + } + if ( + t.targetCapability != null && + !subscriber.capabilities?.includes(t.targetCapability) + ) { + return false; + } + return true; + } + + // Capability targeting (without client targeting): only subscribers + // that declare the required capability. + if (t.targetCapability != null) { + if ( + subscriber.type !== "client" || + !subscriber.capabilities?.includes(t.targetCapability) + ) { + return false; + } + } + + return true; +} + function evict(state: ConversationStreamState): void { const now = Date.now(); while (state.ring.length > 0) { diff --git a/assistant/src/runtime/routes/events-routes.ts b/assistant/src/runtime/routes/events-routes.ts index 878bc4d72d4..2930a787a21 100644 --- a/assistant/src/runtime/routes/events-routes.ts +++ b/assistant/src/runtime/routes/events-routes.ts @@ -40,6 +40,7 @@ import { assistantEventHub, } from "../assistant-event-hub.js"; import { ACTOR_PRINCIPALS, GATEWAY_PRINCIPALS } from "../auth/route-policy.js"; +import type { ReplaySubscriber } from "../conversation-stream-state.js"; import { getReplayWindow } from "../conversation-stream-state.js"; import { resolveActorPrincipalIdForLocalGuardian } from "../local-actor-identity.js"; import { @@ -471,7 +472,22 @@ export function handleSubscribeAssistantEvents( // The client detects the gap from the seq jump on its first // live event and refetches via the existing messages API. if (lastSeenSeq != null && filter.conversationId) { - const window = getReplayWindow(filter.conversationId, lastSeenSeq); + const replaySubscriber: ReplaySubscriber = + clientId && interfaceId + ? { + type: "client", + clientId, + interfaceId, + capabilities: ALL_CAPABILITIES.filter((cap) => + supportsHostProxy(interfaceId, cap), + ), + } + : { type: "process" }; + const window = getReplayWindow( + filter.conversationId, + lastSeenSeq, + replaySubscriber, + ); if (window !== null) { for (const replayed of window) { controller.enqueue(encoder.encode(formatSseFrame(replayed)));