Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
246 changes: 233 additions & 13 deletions assistant/src/__tests__/conversation-stream-state.test.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,10 @@
import { beforeEach, describe, expect, test } from "bun:test";

import type { AssistantEvent } from "../runtime/assistant-event.js";
import type {
EventTargeting,
ReplaySubscriber,
} from "../runtime/conversation-stream-state.js";
import {
_peekStreamForTesting,
_resetConversationStreamsForTesting,
Expand Down Expand Up @@ -66,30 +70,46 @@ describe("conversation-stream-state", () => {
expect(peek?.newestSeq).toBe(2);
});

test("replayable: false stamps seq but skips ring push", () => {
test("targeted events are buffered with targeting metadata", () => {
/** Targeted events now stay in the ring so replay can filter them. */

// GIVEN a targeting modifier
const targeting: EventTargeting = {
targetCapability: "host_bash",
};

// WHEN a targeted event is stamped
const targeted = mkEvent();
stampAndBuffer(targeted, { replayable: false });
expect(targeted.seq).toBe(1); // still stamped for wire-side ordering
stampAndBuffer(targeted, { targeting });

// THEN it receives a seq and lands in the ring
expect(targeted.seq).toBe(1);
const peek = _peekStreamForTesting(CONV);
// State exists (we created it to assign seq) but ring is empty.
expect(peek?.nextSeq).toBe(2);
expect(peek?.ringLength).toBe(0);
expect(peek?.ringLength).toBe(1);
expect(peek?.oldestSeq).toBe(1);
});

test("seq stays monotonic across mixed replayable/non-replayable events", () => {
test("seq stays monotonic across targeted and untargeted events", () => {
/** All events share a contiguous seq counter regardless of targeting. */

// GIVEN a mix of untargeted and targeted events
const a = mkEvent();
const b = mkEvent();
const c = mkEvent();
const d = mkEvent();
stampAndBuffer(a); // replayable
stampAndBuffer(b, { replayable: false }); // targeted -- skipped
stampAndBuffer(c); // replayable
stampAndBuffer(d, { replayable: false }); // targeted -- skipped

// WHEN they are stamped
stampAndBuffer(a);
stampAndBuffer(b, { targeting: { targetCapability: "host_bash" } });
stampAndBuffer(c);
stampAndBuffer(d, { targeting: { excludeClientId: "client-1" } });

// THEN seqs are monotonic and all four are buffered
expect([a.seq, b.seq, c.seq, d.seq]).toEqual([1, 2, 3, 4]);
const peek = _peekStreamForTesting(CONV);
expect(peek?.ringLength).toBe(2); // only a + c in buffer
expect(peek?.ringLength).toBe(4);
expect(peek?.oldestSeq).toBe(1);
expect(peek?.newestSeq).toBe(3);
expect(peek?.newestSeq).toBe(4);
});
});

Expand Down Expand Up @@ -246,6 +266,206 @@ describe("conversation-stream-state", () => {
});
});

describe("getReplayWindow — targeting filter", () => {
const MACOS_CLIENT: ReplaySubscriber = {
type: "client",
clientId: "mac-1",
interfaceId: "macos",
capabilities: ["host_bash", "host_file", "host_cu", "host_browser"],
};

const WEB_CLIENT: ReplaySubscriber = {
type: "client",
clientId: "web-1",
interfaceId: "web",
capabilities: [],
};

const CHROME_EXT_CLIENT: ReplaySubscriber = {
type: "client",
clientId: "ext-1",
interfaceId: "chrome-extension",
capabilities: ["host_browser"],
};

const PROCESS_SUB: ReplaySubscriber = { type: "process" };

test("untargeted events are replayed to all subscriber types", () => {
/** Events without targeting metadata go to everyone. */

// GIVEN two untargeted events in the ring
stampAndBuffer(mkEvent());
stampAndBuffer(mkEvent());

// WHEN each subscriber type requests replay
const macReplay = getReplayWindow(CONV, 0, MACOS_CLIENT);
const webReplay = getReplayWindow(CONV, 0, WEB_CLIENT);
const procReplay = getReplayWindow(CONV, 0, PROCESS_SUB);

// THEN all see both events
expect(macReplay!.map((e) => e.seq)).toEqual([1, 2]);
expect(webReplay!.map((e) => e.seq)).toEqual([1, 2]);
expect(procReplay!.map((e) => e.seq)).toEqual([1, 2]);
});

test("capability-targeted events only replay to subscribers with that capability", () => {
/** host_bash events should only reach macOS, not web or process. */

// GIVEN an untargeted event and a host_bash-targeted event
stampAndBuffer(mkEvent());
stampAndBuffer(mkEvent(), {
targeting: { targetCapability: "host_bash" },
});

// WHEN each subscriber requests replay
const macReplay = getReplayWindow(CONV, 0, MACOS_CLIENT);
const webReplay = getReplayWindow(CONV, 0, WEB_CLIENT);
const procReplay = getReplayWindow(CONV, 0, PROCESS_SUB);

// THEN macOS sees both; web and process see only the untargeted event
expect(macReplay!.map((e) => e.seq)).toEqual([1, 2]);
expect(webReplay!.map((e) => e.seq)).toEqual([1]);
expect(procReplay!.map((e) => e.seq)).toEqual([1]);
});

test("host_browser capability targets both macOS and chrome-extension", () => {
/** Both interfaces declare host_browser. */

// GIVEN a host_browser-targeted event
stampAndBuffer(mkEvent(), {
targeting: { targetCapability: "host_browser" },
});

// WHEN macOS and chrome-extension request replay
const macReplay = getReplayWindow(CONV, 0, MACOS_CLIENT);
const extReplay = getReplayWindow(CONV, 0, CHROME_EXT_CLIENT);
const webReplay = getReplayWindow(CONV, 0, WEB_CLIENT);

// THEN both capable clients see it; web does not
expect(macReplay!.map((e) => e.seq)).toEqual([1]);
expect(extReplay!.map((e) => e.seq)).toEqual([1]);
expect(webReplay).toEqual([]);
});

test("client-targeted events only replay to the named client", () => {
/** targetClientId narrows delivery to a single subscriber. */

// GIVEN an event targeted to mac-1
stampAndBuffer(mkEvent(), {
targeting: { targetClientId: "mac-1" },
});

// WHEN different clients request replay
const macReplay = getReplayWindow(CONV, 0, MACOS_CLIENT);
const webReplay = getReplayWindow(CONV, 0, WEB_CLIENT);
const procReplay = getReplayWindow(CONV, 0, PROCESS_SUB);

// THEN only the named client receives it
expect(macReplay!.map((e) => e.seq)).toEqual([1]);
expect(webReplay).toEqual([]);
expect(procReplay).toEqual([]);
});

test("client + capability targeting requires both to match", () => {
/**
* targetClientId + targetCapability: the client must match by ID
* AND have the required capability.
*/

// GIVEN an event targeted to web-1 with host_bash capability
stampAndBuffer(mkEvent(), {
targeting: { targetClientId: "web-1", targetCapability: "host_bash" },
});

// WHEN the named client (without the capability) and macOS request replay
const webReplay = getReplayWindow(CONV, 0, WEB_CLIENT);
const macReplay = getReplayWindow(CONV, 0, MACOS_CLIENT);

// THEN neither receives it — web-1 lacks the capability, mac-1 isn't the target
expect(webReplay).toEqual([]);
expect(macReplay).toEqual([]);
});

test("excludeClientId suppresses replay for the originating client", () => {
/** Self-echo suppression on replay. */

// GIVEN an event excluding web-1
stampAndBuffer(mkEvent(), {
targeting: { excludeClientId: "web-1" },
});

// WHEN web-1 and mac-1 request replay
const webReplay = getReplayWindow(CONV, 0, WEB_CLIENT);
const macReplay = getReplayWindow(CONV, 0, MACOS_CLIENT);
const procReplay = getReplayWindow(CONV, 0, PROCESS_SUB);

// THEN web-1 is suppressed; mac-1 and process subscribers see it
expect(webReplay).toEqual([]);
expect(macReplay!.map((e) => e.seq)).toEqual([1]);
expect(procReplay!.map((e) => e.seq)).toEqual([1]);
});

test("interface-targeted events only replay to clients of that interface", () => {
/** targetInterfaceId narrows delivery to a specific interface. */

// GIVEN an event targeted to the macos interface
stampAndBuffer(mkEvent(), {
targeting: { targetInterfaceId: "macos" },
});

// WHEN different subscribers request replay
const macReplay = getReplayWindow(CONV, 0, MACOS_CLIENT);
const webReplay = getReplayWindow(CONV, 0, WEB_CLIENT);
const procReplay = getReplayWindow(CONV, 0, PROCESS_SUB);

// THEN only the macos client receives it
expect(macReplay!.map((e) => e.seq)).toEqual([1]);
expect(webReplay).toEqual([]);
expect(procReplay).toEqual([]);
});

test("mixed targeting across events filters per-entry", () => {
/**
* A ring with untargeted, capability-targeted, and excluded events
* filters each entry independently.
*/

// GIVEN a mix of events
stampAndBuffer(mkEvent()); // seq 1: untargeted
stampAndBuffer(mkEvent(), {
targeting: { targetCapability: "host_bash" },
}); // seq 2: bash-targeted
stampAndBuffer(mkEvent(), {
targeting: { excludeClientId: "web-1" },
}); // seq 3: exclude web-1
stampAndBuffer(mkEvent()); // seq 4: untargeted

// WHEN each subscriber requests replay from seq 0
const macReplay = getReplayWindow(CONV, 0, MACOS_CLIENT);
const webReplay = getReplayWindow(CONV, 0, WEB_CLIENT);

// THEN macOS sees all four; web sees 1 + 4 (not 2=no capability, not 3=excluded)
expect(macReplay!.map((e) => e.seq)).toEqual([1, 2, 3, 4]);
expect(webReplay!.map((e) => e.seq)).toEqual([1, 4]);
});

test("no subscriber argument returns all entries unfiltered", () => {
/** Backwards-compatible: omitting subscriber skips filtering. */

// GIVEN targeted and untargeted events
stampAndBuffer(mkEvent());
stampAndBuffer(mkEvent(), {
targeting: { targetCapability: "host_bash" },
});

// WHEN replay is requested without a subscriber
const replay = getReplayWindow(CONV, 0);

// THEN all events are returned
expect(replay!.map((e) => e.seq)).toEqual([1, 2]);
});
});

describe("clearConversationStream", () => {
test("drops all state for the conversation", () => {
stampAndBuffer(mkEvent());
Expand Down
6 changes: 1 addition & 5 deletions assistant/src/runtime/assistant-event-hub.ts
Original file line number Diff line number Diff line change
Expand Up @@ -605,11 +605,7 @@ export function broadcastMessage(
excludeClientId,
}
: undefined;
// Stamp per-conversation seq and (when replayable) push onto the ring
// buffer. Mutates `event.seq` in place. Targeted/exclusion publishes
// are stamped but not buffered -- replay by `conversationId` alone
// would leak them to subscribers outside their intended delivery set.
stampAndBuffer(event, { replayable: publishOptions == null });
stampAndBuffer(event, { targeting: publishOptions });
_hubChain = _hubChain
.then(() => assistantEventHub.publish(event, publishOptions))
.then(() => {
Expand Down
Loading
Loading