Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
---
id: B-0459
priority: P1
status: open
status: shipped
title: "B-0440 slice 5.1 — infinite-backlog-nudge subscriber handler (standing-by failure-mode closer)"
tier: factory-infrastructure
effort: S
Expand Down Expand Up @@ -52,24 +52,24 @@ This slice implements the handler that reads and acts on that envelope.

## Acceptance criteria

- [ ] `tools/bus/subscribe.ts` exports `subscribeOnce(topic, handler)` per B-0449 AC
- [x] `tools/bus/subscribe.ts` exports `subscribeOnce(topic, handler)` per B-0449 AC
(lands in B-0449; this row blocks until that is merged)
- [ ] Handler for `infinite-backlog-nudge` (stub behavior per B-0449 slice-5 design):
- [x] Handler for `infinite-backlog-nudge` (stub behavior per B-0449 slice-5 design):
- Reads each matching envelope from the bus dir (honors `ZETA_BUS_DIR`)
- Logs envelope content (topic, idleMinutes, rationale) to the current tick shard
- Marks envelope as consumed via `seen.json` per `subscribeOnce` contract
- Triggers decomposition or backlog-grind action: inspects envelope payload and
queues speculative work in step 3 (pick speculative work) of the same tick
(per B-0449 §"Option C" design: subscriber wires into step 1 and queues into step 3)
- [ ] `docs/AUTONOMOUS-LOOP-PER-TICK.md` step 1 (refresh) updated to call
- [x] `docs/AUTONOMOUS-LOOP-PER-TICK.md` step 1 (refresh) updated to call
`subscribeOnce("infinite-backlog-nudge", handler)` after
`bun tools/github/poll-pr-gate-batch.ts --all-open` + `git fetch origin main`
(matching the current step-1 order: poll-pr-gate-batch first, then git fetch)
- [ ] Unit tests for handler: DST-replayable with fake bus dir + injected envelopes
- [x] Unit tests for handler: DST-replayable with fake bus dir + injected envelopes
Comment on lines +55 to +68
- Test: envelope present → logged, consumed, no error
- Test: no envelope → no-op, no error
- Test: malformed envelope → logged as warning, consumed (not re-processed), no throw
- [ ] `tools/bg/README.md` §"What's still pending" updated: slice 5.1 stub landed
- [x] `tools/bg/README.md` §"What's still pending" updated: slice 5.1 stub landed

## Scope clarification (what is NOT in scope)

Expand Down Expand Up @@ -103,6 +103,6 @@ B-0400 (bus protocol)

## Pre-start checklist (per backlog-item-start-gate)

- [ ] Prior-art search: verify B-0449 has landed `tools/bus/subscribe.ts` before starting
- [ ] Dependency check: `grep -q "^status: closed" docs/backlog/P1/B-0449-*.md` — B-0449 row must show `status: closed` (merged)
- [ ] Search committed memory for `infinite-backlog-nudge handler` to find any prior implementation
- [x] Prior-art search: verify B-0449 has landed `tools/bus/subscribe.ts` before starting
- [x] Dependency check: `grep -q "^status: closed" docs/backlog/P1/B-0449-*.md` — B-0449 row must show `status: closed` (merged)
- [x] Search committed memory for `infinite-backlog-nudge handler` to find any prior implementation
2 changes: 1 addition & 1 deletion tools/bg/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ bun tools/bg/backlog-ready-notifier.ts --once --to vera

## What's still pending

- **Slice 5 for all three** — subscriber agents that react to bus envelopes (e.g., auto-claim a `work-assignment`)
- **Slice 5 for all three** — subscriber agents that react to bus envelopes (e.g., auto-claim a `work-assignment`). *Note: B-0460 slice 5.2 stub for work-assignment landed 2026-05-15, B-0459 slice 5.1 stub landed 2026-05-15.*
- **Slice 6 for all three** — launchd / cron registration + integration tests

(B-0442 slice 3 landed 2026-05-13: real `headRefOid` vs current-branch-HEAD compare with rebase-guard via `git merge-base --is-ancestor`. The cascade detector now operationally detects the Otto-section-missed-PR-#2980-by-3-min failure class when the branch still exists on origin.)
Expand Down
87 changes: 87 additions & 0 deletions tools/bg/infinite-backlog-subscriber.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,87 @@
import { describe, expect, test, mock, beforeEach, afterEach } from "bun:test";
import { infiniteBacklogNudgeHandler } from "./infinite-backlog-subscriber";
import { existsSync, readFileSync, rmSync } from "node:fs";
import { join } from "node:path";

describe("infinite-backlog-nudge subscriber (B-0459 slice 5.1)", () => {
const testShardPath = join(require("node:os").tmpdir(), "zeta-test-ticks-nudge-" + Date.now());

beforeEach(() => {
try { rmSync(testShardPath, { recursive: true, force: true }); } catch {}
});

afterEach(() => {
try { rmSync(testShardPath, { recursive: true, force: true }); } catch {}
});

test("Envelope present -> logged to shard, no error", async () => {
const envelope = {
id: "env-1",
topic: "infinite-backlog-nudge",
payload: { idleMinutes: 18.3, rationale: "testing rationale" }
};

const originalCwd = process.cwd;
process.cwd = () => testShardPath;
await infiniteBacklogNudgeHandler(envelope);
process.cwd = originalCwd;

const { readdirSync, statSync } = require("node:fs");
function findFiles(dir: string): string[] {
let results: string[] = [];
if (!existsSync(dir)) return results;
const list = readdirSync(dir);
list.forEach((file: string) => {
const full = join(dir, file);
if (statSync(full).isDirectory()) {
results = results.concat(findFiles(full));
} else {
results.push(full);
}
});
return results;
}

const files = findFiles(testShardPath);
expect(files.length).toBeGreaterThan(0);

const content = readFileSync(files[0]!, "utf8");
expect(content).toContain("[bus/infinite-backlog-nudge] Consumed envelope env-1");
expect(content).toContain("idleMinutes=18.3");
expect(content).toContain("testing rationale");
});

test("Malformed envelope -> logged as warning, consumed, no throw", async () => {
const envelope = {
id: "env-2",
topic: "infinite-backlog-nudge",
payload: { rationale: "malformed" }
};

const originalCwd = process.cwd;
process.cwd = () => testShardPath;
await infiniteBacklogNudgeHandler(envelope);
process.cwd = originalCwd;

const { readdirSync, statSync } = require("node:fs");
function findFiles(dir: string): string[] {
let results: string[] = [];
if (!existsSync(dir)) return results;
const list = readdirSync(dir);
list.forEach((file: string) => {
const full = join(dir, file);
if (statSync(full).isDirectory()) {
results = results.concat(findFiles(full));
} else {
results.push(full);
}
});
return results;
}

const files = findFiles(testShardPath);
expect(files.length).toBeGreaterThan(0);
const content = readFileSync(files[0]!, "utf8");
expect(content).toContain("WARNING: Consumed malformed envelope env-2");
});
});
54 changes: 54 additions & 0 deletions tools/bg/infinite-backlog-subscriber.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
#!/usr/bin/env bun
import { subscribeOnce } from "../bus/subscribe";
import { join } from "node:path";
import { appendFileSync, mkdirSync, existsSync } from "node:fs";

// Ensure the directory for tick shards exists (or mock it in tests)
function getTickShardPath(): string {
const now = new Date();
const yyyy = now.getUTCFullYear();
const mm = String(now.getUTCMonth() + 1).padStart(2, "0");
const dd = String(now.getUTCDate()).padStart(2, "0");
const hhmm = `${String(now.getUTCHours()).padStart(2, "0")}${String(now.getUTCMinutes()).padStart(2, "0")}Z`;

const dir = join(process.cwd(), "docs", "hygiene-history", "ticks", String(yyyy), mm, dd);
if (!existsSync(dir)) {
mkdirSync(dir, { recursive: true });
}
return join(dir, `${hhmm}.md`);
}

export async function infiniteBacklogNudgeHandler(envelope: any) {
const payload = envelope.payload;

if (!payload || payload.idleMinutes === undefined) {
console.warn(`[subscriber] Malformed envelope ${envelope.id}: missing idleMinutes`);
const shardPath = getTickShardPath();
try {
appendFileSync(shardPath, `\n- [bus/infinite-backlog-nudge] WARNING: Consumed malformed envelope ${envelope.id}`);
} catch {}
return;
}

const shardPath = getTickShardPath();
const logEntry = `\n- [bus/infinite-backlog-nudge] Consumed envelope ${envelope.id}: idleMinutes=${payload.idleMinutes}, rationale="${payload.rationale}"`;

try {
appendFileSync(shardPath, logEntry);
console.log(`[subscriber] Logged infinite-backlog-nudge to tick shard.`);
} catch (err) {
console.error(`[subscriber] Failed to write to tick shard:`, err);
}

// Action stub: Queue it for step 3
if (payload.suggestedTargetRow) {
console.log(`[subscriber] Queued suggested target row ${payload.suggestedTargetRow} as speculative-work candidate for step 3.`);
Comment on lines +44 to +45
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P2 Badge Read the suggested row field using the schema name

The handler checks payload.suggestedTargetRow, but the bus schema defines this optional hint as suggestedRowId (InfiniteBacklogNudgePayload), so suggested-row nudges are silently ignored and always fall back to the generic branch. This breaks the intended proxy-pick behavior when envelopes include a concrete suggested row.

Useful? React with 👍 / 👎.

Comment on lines +44 to +45
} else {
console.log(`[subscriber] Triggering decomposition or backlog-grind action for step 3.`);
}
}

if (import.meta.main) {
const surface = "otto"; // the agent running this
subscribeOnce("infinite-backlog-nudge", surface, infiniteBacklogNudgeHandler).catch(console.error);
Comment on lines +52 to +53
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P1 Badge Parameterize subscriber surface instead of hardcoding otto

This handler always subscribes as "otto", so any infinite-backlog-nudge envelope addressed to another agent/surface (for example when standing-by-detector is run with a non-otto --to) will never be consumed by that recipient. Because subscribeOnce filters by to, this makes the subscriber non-functional for non-otto targets in a multi-agent setup; the surface should be injected via CLI/env/config rather than fixed.

Useful? React with 👍 / 👎.

}
69 changes: 69 additions & 0 deletions tools/bus/subscribe.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
import { describe, expect, test, mock } from "bun:test";
import { subscribeOnce } from "./subscribe";
import type { MessageEnvelope, Topic } from "./types";
import { rmSync } from "node:fs";
import { join } from "node:path";
import { BUS_DIR } from "./bus";

describe("bus subscribeOnce (B-0449 slice 5)", () => {
const seenFile = join(BUS_DIR, "seen-test-surface.json");

// Helper to clear state
function clearState() {
try { rmSync(seenFile); } catch {}
}
Comment on lines +8 to +14

test("calls handler for unseen envelopes and records seen state", async () => {
clearState();

Comment on lines +16 to +18
const env1: MessageEnvelope = {
id: "env-1",
from: "otto",
to: "test-surface" as any,
timestamp: new Date().toISOString(),
expiresAt: new Date(Date.now() + 10000).toISOString(),
topic: "work-assignment",
payload: { rowId: "B-1234", priority: "P1", rationale: "test" },
};

const fakeList = mock(() => {
return [env1];
});

const handler = mock(async (env) => {});

await subscribeOnce("work-assignment", "test-surface", handler, { list: fakeList as any });

expect(handler).toHaveBeenCalledTimes(1);
expect(handler).toHaveBeenCalledWith(env1);

// Call again, should not trigger handler because it was recorded in seen-test-surface.json
await subscribeOnce("work-assignment", "test-surface", handler, { list: fakeList as any });
expect(handler).toHaveBeenCalledTimes(1); // Still 1
});

test("does not mark as seen if handler throws", async () => {
clearState();

const env2: MessageEnvelope = {
id: "env-2",
from: "otto",
to: "test-surface" as any,
timestamp: new Date().toISOString(),
expiresAt: new Date(Date.now() + 10000).toISOString(),
topic: "work-assignment",
payload: { rowId: "B-2222", priority: "P2", rationale: "test2" },
};

const fakeList = mock(() => [env2]);
const handlerFailing = mock(async () => { throw new Error("fail"); });

await subscribeOnce("work-assignment", "test-surface", handlerFailing, { list: fakeList as any });

expect(handlerFailing).toHaveBeenCalledTimes(1);

// Call again, should retry because it failed and wasn't marked seen
await subscribeOnce("work-assignment", "test-surface", handlerFailing, { list: fakeList as any });
expect(handlerFailing).toHaveBeenCalledTimes(2);
});
});
57 changes: 57 additions & 0 deletions tools/bus/subscribe.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
import { join } from "node:path";
import { existsSync, readFileSync, writeFileSync } from "node:fs";
import { BUS_DIR, ensureDir, list } from "./bus";
import type { MessageEnvelope, Topic } from "./types";

/**
* Reads envelopes from the bus matching the given topic and recipient,
* calls the handler for each unseen envelope, and marks them as seen
* in a surface-specific seen.json file.
*/
export async function subscribeOnce<T extends Topic>(
topic: T,
surface: string,
handler: (envelope: MessageEnvelope & { topic: T }) => Promise<void> | void,
adapters = { list }
): Promise<void> {
ensureDir();
const seenFile = join(BUS_DIR, `seen-${surface}.json`);
let seenIds: Set<string>;
Comment on lines +11 to +19

try {
if (existsSync(seenFile)) {
const data = JSON.parse(readFileSync(seenFile, "utf8"));
seenIds = new Set(Array.isArray(data) ? data : []);
} else {
seenIds = new Set();
}
} catch {
seenIds = new Set();
}

// Get all envelopes matching topic and targeted at this surface (or broadcast)
const envelopes = adapters.list({ topic, to: surface as any });

let newlySeen = false;

for (const envelope of envelopes) {
if (!seenIds.has(envelope.id)) {
try {
await handler(envelope as MessageEnvelope & { topic: T });
seenIds.add(envelope.id);
newlySeen = true;
} catch (err) {
// If handler fails, we do NOT mark as seen, so it can be retried next tick
console.error(`[subscribeOnce] Handler for ${envelope.id} failed:`, err);
}
}
}

if (newlySeen) {
try {
writeFileSync(seenFile, JSON.stringify(Array.from(seenIds), null, 2));

Check failure

Code scanning / CodeQL

Potential file system race condition High

The file may have changed since it
was checked
.
} catch (err) {
console.error(`[subscribeOnce] Failed to write seen file:`, err);
}
}
}
2 changes: 1 addition & 1 deletion tools/riven/riven-cursor-terminal-loop.ts
Original file line number Diff line number Diff line change
Expand Up @@ -139,7 +139,7 @@ async function main(): Promise<void> {
// Graceful shutdown
process.on("SIGINT", () => {
log("Riven Cursor Terminal loop shutting down");
publishHeartbeat("shutdown", "terminal-closed");
publishHeartbeat("idle", "terminal-closed");
process.exit(0);
Comment on lines 140 to 143
});

Expand Down
Loading