diff --git a/tools/bus/bus.test.ts b/tools/bus/bus.test.ts index a83c7f37a..a888708a0 100644 --- a/tools/bus/bus.test.ts +++ b/tools/bus/bus.test.ts @@ -231,7 +231,7 @@ describe("bus — watch", () => { const listResult = run("list", "--to", "otto", "--json"); const msgs = JSON.parse(listResult.stdout) as Array<{ timestamp: string }>; expect(msgs.length).toBeGreaterThan(0); - const existingTs = msgs[0].timestamp; + const existingTs = msgs[0]!.timestamp; const r = spawnSync("bun", [SCRIPT, "watch", "--to", "otto", "--timeout", "0", "--json"], { encoding: "utf-8", diff --git a/tools/bus/bus.ts b/tools/bus/bus.ts index 5fe22bf80..d2c10482a 100644 --- a/tools/bus/bus.ts +++ b/tools/bus/bus.ts @@ -283,14 +283,18 @@ function main(): void { // hazard where advancing to the newest timestamp permanently drops earlier writes. const cursorTimestamp = process.env.ZETA_WATCH_INITIAL_CURSOR ?? new Date().toISOString(); const delivered = new Set(); + const listOpts = { + ...(topicFilter !== undefined && { topic: topicFilter }), + ...(toFilter !== undefined && { to: toFilter }), + }; // Seed: exclude all messages already on disk at or before watch-start. - for (const m of list({ topic: topicFilter, to: toFilter })) { + for (const m of list(listOpts)) { if (m.timestamp <= cursorTimestamp) delivered.add(m.id); } const deadline = timeoutSec >= 0 ? Date.now() + timeoutSec * 1_000 : Infinity; const poll = () => { - const msgs = list({ topic: topicFilter, to: toFilter }); + const msgs = list(listOpts); const fresh = msgs.filter((m) => !delivered.has(m.id)); for (const m of fresh) { if (asJson) { diff --git a/tools/bus/claim.ts b/tools/bus/claim.ts index 4e81d72b2..28186a4be 100644 --- a/tools/bus/claim.ts +++ b/tools/bus/claim.ts @@ -137,7 +137,7 @@ export function activeClaims(itemId: string): ClaimRecord[] { id: m.id, from: m.from, itemId: p.itemId, - branch: p.branch, + ...(p.branch !== undefined && { branch: p.branch }), timestamp: m.timestamp, expiresAt: m.expiresAt, });