diff --git a/skills/meet-join/bot/__tests__/audio-playback.test.ts b/skills/meet-join/bot/__tests__/audio-playback.test.ts new file mode 100644 index 00000000000..a7d9b59a4d7 --- /dev/null +++ b/skills/meet-join/bot/__tests__/audio-playback.test.ts @@ -0,0 +1,388 @@ +/** + * Unit tests for the audio-playback pipeline. + * + * We don't spawn a real `pacat` — the module accepts an injected `spawn` + * factory whose `stdin` is a shim that appends writes to a `Uint8Array` + * buffer. That lets us assert byte ordering on completion, mid-stream + * cancellation, and the trailing silence flush without any OS processes. + * + * Coverage: + * - Module: `startAudioPlayback` spawns with the expected argv and is + * idempotent (second call returns the same handle). + * - Module: `stopAudioPlayback` kills pacat and clears the singleton. + * - Module: `flushSilence` writes the correct number of zero bytes. + * - HTTP: POST /play_audio forwards body bytes in order and flushes + * trailing silence on completion. + * - HTTP: POST /play_audio with an abort-triggered stream returns 499 + * and stops writing bytes to the shim mid-stream. + * - HTTP: DELETE /play_audio/:streamId cancels the matching POST. + */ + +import { afterEach, beforeEach, describe, expect, test } from "bun:test"; + +import { createHttpServer, type HttpServerHandle } from "../src/control/http-server.js"; +import { BotState } from "../src/control/state.js"; +import { + DEFAULT_BYTES_PER_MS, + __resetForTests, + flushSilence, + startAudioPlayback, + stopAudioPlayback, + type PacatWritable, + type SpawnedPacat, +} from "../src/media/audio-playback.js"; + +const API_TOKEN = "test-token-playback"; + +/** ------------------------ shim helpers ---------------------------- */ + +interface PacatShim { + proc: SpawnedPacat; + /** All bytes written to pacat's stdin, in order. */ + readonly buffer: Uint8Array; + /** Resolves once `kill()` is called. */ + killed: Promise; + /** Was kill() invoked? */ + isKilled: () => boolean; + /** How many `write` calls have been made so far. */ + writeCount: () => number; +} + +/** + * Build a fake pacat whose stdin appends every write into a single + * `Uint8Array` so tests can assert end-to-end byte ordering. The process + * stays alive until `kill()` is invoked (matching how real pacat behaves + * until we SIGTERM it). + */ +function makePacatShim(): PacatShim { + let buf = new Uint8Array(0); + let writes = 0; + let killed = false; + let resolveExited!: (code: number) => void; + let resolveKilled!: () => void; + const exited = new Promise((resolve) => { + resolveExited = resolve; + }); + const killedP = new Promise((resolve) => { + resolveKilled = resolve; + }); + + const stdin: PacatWritable = { + write(chunk: Uint8Array): number { + writes += 1; + const next = new Uint8Array(buf.length + chunk.length); + next.set(buf, 0); + next.set(chunk, buf.length); + buf = next; + return chunk.length; + }, + async end() { + // No-op; the test controls lifetime via kill(). + }, + }; + + const proc: SpawnedPacat = { + stdin, + exited, + kill() { + if (killed) return; + killed = true; + resolveKilled(); + resolveExited(0); + }, + }; + + const shim: PacatShim = { + proc, + get buffer() { + return buf; + }, + killed: killedP, + isKilled: () => killed, + writeCount: () => writes, + }; + return shim; +} + +/** ---------------------- module-level tests ----------------------- */ + +describe("audio-playback module", () => { + beforeEach(() => { + __resetForTests(); + }); + + afterEach(async () => { + await stopAudioPlayback(); + __resetForTests(); + }); + + test("startAudioPlayback spawns pacat with the expected argv", () => { + let capturedArgv: readonly string[] | null = null; + const shim = makePacatShim(); + const handle = startAudioPlayback({ + spawn: (argv) => { + capturedArgv = argv; + return shim.proc; + }, + }); + expect(capturedArgv).not.toBeNull(); + expect(capturedArgv as readonly string[] | null).toEqual([ + "pacat", + "--playback", + "--device=bot_out", + "--format=s16le", + "--rate=48000", + "--channels=1", + "--raw", + ]); + expect(handle.active).toBe(true); + }); + + test("startAudioPlayback is idempotent — second call returns the same handle", () => { + const shim = makePacatShim(); + let spawns = 0; + const h1 = startAudioPlayback({ + spawn: () => { + spawns += 1; + return shim.proc; + }, + }); + const h2 = startAudioPlayback({ + spawn: () => { + spawns += 1; + return shim.proc; + }, + }); + expect(h1).toBe(h2); + expect(spawns).toBe(1); + }); + + test("stopAudioPlayback kills pacat and clears the singleton", async () => { + const shim = makePacatShim(); + startAudioPlayback({ spawn: () => shim.proc }); + await stopAudioPlayback(); + expect(shim.isKilled()).toBe(true); + // After stop, a fresh start should spawn again. + const shim2 = makePacatShim(); + let spawned = false; + startAudioPlayback({ + spawn: () => { + spawned = true; + return shim2.proc; + }, + }); + expect(spawned).toBe(true); + }); + + test("flushSilence writes ms * bytesPerMs zero bytes", async () => { + const shim = makePacatShim(); + startAudioPlayback({ spawn: () => shim.proc }); + await flushSilence(10); // 10ms at 48kHz mono s16le = 960 bytes + expect(shim.buffer.length).toBe(10 * DEFAULT_BYTES_PER_MS); + // All bytes must be zero. + for (const b of shim.buffer) { + expect(b).toBe(0); + } + }); + + test("flushSilence on inactive handle is a no-op", async () => { + await flushSilence(10); // no active handle + // No throw = pass. + }); +}); + +/** ---------------------- HTTP endpoint tests ----------------------- */ + +describe("POST /play_audio (streaming)", () => { + let server: HttpServerHandle | null = null; + let shim: PacatShim; + + beforeEach(() => { + __resetForTests(); + BotState.__resetForTests(); + shim = makePacatShim(); + }); + + afterEach(async () => { + if (server !== null) { + await server.stop(); + server = null; + } + await stopAudioPlayback(); + __resetForTests(); + }); + + function build(): HttpServerHandle { + return createHttpServer({ + apiToken: API_TOKEN, + onLeave: () => {}, + onSendChat: () => {}, + onPlayAudio: () => {}, + playbackSpawnOptions: { spawn: () => shim.proc }, + }); + } + + test("forwards PCM bytes in order and flushes trailing silence", async () => { + server = build(); + const { port } = await server.start(0); + + // Build a deterministic PCM payload: four 4-byte chunks. + const chunks = [ + new Uint8Array([1, 2, 3, 4]), + new Uint8Array([5, 6, 7, 8]), + new Uint8Array([9, 10, 11, 12]), + new Uint8Array([13, 14, 15, 16]), + ]; + const totalLen = chunks.reduce((a, c) => a + c.length, 0); + const flat = new Uint8Array(totalLen); + let o = 0; + for (const c of chunks) { + flat.set(c, o); + o += c.length; + } + + const res = await fetch(`http://127.0.0.1:${port}/play_audio?stream_id=s-1`, { + method: "POST", + headers: { + authorization: `Bearer ${API_TOKEN}`, + "content-type": "application/octet-stream", + }, + body: flat, + }); + expect(res.status).toBe(200); + const body = (await res.json()) as { streamId: string; bytes: number }; + expect(body.streamId).toBe("s-1"); + expect(body.bytes).toBe(totalLen); + + // The shim should have received the original bytes in order, followed + // by 50ms of trailing silence (50 * 96 = 4800 zero bytes). + const expectedSilenceBytes = 50 * DEFAULT_BYTES_PER_MS; + expect(shim.buffer.length).toBe(totalLen + expectedSilenceBytes); + for (let i = 0; i < totalLen; i++) { + expect(shim.buffer[i]).toBe(flat[i]!); + } + for (let i = totalLen; i < shim.buffer.length; i++) { + expect(shim.buffer[i]).toBe(0); + } + }); + + test("DELETE /play_audio/:streamId cancels in-flight stream with 499", async () => { + // For this test we want a payload large enough that we can DELETE it + // before it finishes. We feed the body through a ReadableStream with + // a gate so the last chunk is only released after the DELETE runs. + server = build(); + const { port } = await server.start(0); + + const firstChunk = new Uint8Array(1024); + for (let i = 0; i < firstChunk.length; i++) firstChunk[i] = (i % 250) + 1; + const secondChunk = new Uint8Array(1024); + for (let i = 0; i < secondChunk.length; i++) + secondChunk[i] = ((i + 17) % 250) + 1; + + let release!: () => void; + const gate = new Promise((resolve) => { + release = resolve; + }); + + const body = new ReadableStream({ + async start(controller) { + controller.enqueue(firstChunk); + // Wait until we've been told to release — the test triggers this + // after issuing DELETE so the abort lands mid-stream. + await gate; + try { + controller.enqueue(secondChunk); + } catch { + // enqueue may throw if the reader was cancelled; that's what we + // want. + } + controller.close(); + }, + }); + + const postPromise = fetch( + `http://127.0.0.1:${port}/play_audio?stream_id=cancel-me`, + { + method: "POST", + headers: { + authorization: `Bearer ${API_TOKEN}`, + "content-type": "application/octet-stream", + }, + // Bun/undici fetch supports passing a ReadableStream as body when + // `duplex: "half"` is set. + body, + // @ts-expect-error — undici/fetch extension, not in lib.dom types + duplex: "half", + }, + ); + + // Give the server a beat to start writing the first chunk. + await new Promise((r) => setTimeout(r, 50)); + + // Cancel via DELETE — this should release the stream and make POST + // return 499. + const delRes = await fetch( + `http://127.0.0.1:${port}/play_audio/cancel-me`, + { + method: "DELETE", + headers: { authorization: `Bearer ${API_TOKEN}` }, + }, + ); + expect(delRes.status).toBe(200); + + // Release the gate so the body's async start can complete — this + // unsticks the ReadableStream's `start` coroutine. The server has + // already aborted the reader by now so the second chunk is a no-op. + release(); + + const res = await postPromise; + expect(res.status).toBe(499); + const payload = (await res.json()) as { + streamId: string; + bytes: number; + cancelled: boolean; + }; + expect(payload.streamId).toBe("cancel-me"); + expect(payload.cancelled).toBe(true); + // We should have written *at most* the first chunk (possibly less if + // the server aborted mid-chunk write, but never the second). + expect(payload.bytes).toBeLessThan(firstChunk.length + secondChunk.length); + + // Shim received at least the trailing silence block even on cancel. + const silenceBytes = 50 * DEFAULT_BYTES_PER_MS; + expect(shim.buffer.length).toBeGreaterThanOrEqual(silenceBytes); + }); + + test("empty body still returns 200 and flushes silence", async () => { + server = build(); + const { port } = await server.start(0); + + const res = await fetch(`http://127.0.0.1:${port}/play_audio`, { + method: "POST", + headers: { + authorization: `Bearer ${API_TOKEN}`, + "content-type": "application/octet-stream", + }, + body: new Uint8Array(0), + }); + expect(res.status).toBe(200); + const body = (await res.json()) as { streamId: string; bytes: number }; + expect(body.bytes).toBe(0); + expect(typeof body.streamId).toBe("string"); + expect(body.streamId.length).toBeGreaterThan(0); + + const silenceBytes = 50 * DEFAULT_BYTES_PER_MS; + expect(shim.buffer.length).toBe(silenceBytes); + }); + + test("DELETE returns 404 when no matching stream is in flight", async () => { + server = build(); + const { port } = await server.start(0); + + const res = await fetch(`http://127.0.0.1:${port}/play_audio/nonexistent`, { + method: "DELETE", + headers: { authorization: `Bearer ${API_TOKEN}` }, + }); + expect(res.status).toBe(404); + }); +}); diff --git a/skills/meet-join/bot/__tests__/http-server.test.ts b/skills/meet-join/bot/__tests__/http-server.test.ts index 97611895b3f..14948a828d4 100644 --- a/skills/meet-join/bot/__tests__/http-server.test.ts +++ b/skills/meet-join/bot/__tests__/http-server.test.ts @@ -329,37 +329,41 @@ describe("http-server", () => { }); // ------------------------------------------------------------------------- - // POST /play_audio (501 stub until Phase 3) + // POST /play_audio + // + // Happy-path / ordering / cancellation coverage lives in + // audio-playback.test.ts. These cases just sanity-check auth and + // content-type validation at the HTTP boundary. // ------------------------------------------------------------------------- describe("POST /play_audio", () => { - test("returns 501 Not Implemented", async () => { + test("requires auth", async () => { const instance = makeServer(); server = instance.server; const base = await startOnRandomPort(server); const res = await fetch(`${base}/play_audio`, { method: "POST", - headers: { - authorization: `Bearer ${API_TOKEN}`, - "content-type": "application/json", - }, - body: JSON.stringify({ type: "play_audio", streamId: "s-1" }), + headers: { "content-type": "application/octet-stream" }, + body: new Uint8Array([0, 0, 0, 0]), }); - expect(res.status).toBe(501); + expect(res.status).toBe(401); }); - test("requires auth", async () => { + test("rejects non-octet-stream content-type with 400", async () => { const instance = makeServer(); server = instance.server; const base = await startOnRandomPort(server); const res = await fetch(`${base}/play_audio`, { method: "POST", - headers: { "content-type": "application/json" }, + headers: { + authorization: `Bearer ${API_TOKEN}`, + "content-type": "application/json", + }, body: JSON.stringify({ type: "play_audio", streamId: "s-1" }), }); - expect(res.status).toBe(401); + expect(res.status).toBe(400); }); }); diff --git a/skills/meet-join/bot/src/control/http-server.ts b/skills/meet-join/bot/src/control/http-server.ts index 2b1e856a17e..f5d9a1e049a 100644 --- a/skills/meet-join/bot/src/control/http-server.ts +++ b/skills/meet-join/bot/src/control/http-server.ts @@ -3,11 +3,12 @@ * * Exposes a small Hono app that the assistant daemon talks to: * - * - `GET /health` — liveness/health probe (also used by Docker HEALTHCHECK). - * - `GET /status` — full lifecycle snapshot. - * - `POST /leave` — ask the bot to leave the meeting. - * - `POST /send_chat` — post a chat message into the Meet chat panel. - * - `POST /play_audio` — play an out-of-band audio stream (Phase 3; stub 501). + * - `GET /health` — liveness/health probe (also used by Docker HEALTHCHECK). + * - `GET /status` — full lifecycle snapshot. + * - `POST /leave` — ask the bot to leave the meeting. + * - `POST /send_chat` — post a chat message into the Meet chat panel. + * - `POST /play_audio` — stream raw PCM into pacat (Phase 3). + * - `DELETE /play_audio/:streamId` — cancel an in-flight playback (barge-in). * * Every mutating route validates its body against the corresponding Zod * schema from `@vellumai/meet-contracts` so command shapes stay in sync with @@ -24,7 +25,13 @@ import { SendChatCommandSchema, } from "@vellumai/meet-contracts"; import { Hono, type Context } from "hono"; +import { randomUUID } from "node:crypto"; +import { + startAudioPlayback, + type AudioPlaybackHandle, + type StartAudioPlaybackOptions, +} from "../media/audio-playback.js"; import { BotState } from "./state.js"; /** @@ -55,10 +62,10 @@ export interface HttpServerCallbacks { */ onSendChat: (text: string) => Promise | void; /** - * Called when `POST /play_audio` is received. Currently a stub — the HTTP - * route returns 501. The real PCM stream is delivered out of band - * (chunked transfer in Phase 3); this callback will eventually be handed - * just the metadata. + * Called when a `POST /play_audio` stream starts. The real PCM payload + * is consumed by the route directly and streamed into pacat; this + * callback exists for lifecycle observation (logging, metrics, joining + * the stream to an in-memory barge-in registry). */ onPlayAudio: (streamId: string) => Promise | void; } @@ -66,6 +73,18 @@ export interface HttpServerCallbacks { export interface CreateHttpServerOptions extends HttpServerCallbacks { /** Bearer token required on every request. */ apiToken: string; + /** + * Override for the audio-playback factory. In production we call + * `startAudioPlayback` from `../media/audio-playback.js`; tests inject a + * handle whose `write` captures bytes into a buffer. + */ + startPlayback?: (opts?: StartAudioPlaybackOptions) => AudioPlaybackHandle; + /** + * Override for pacat spawn. Forwarded into the default `startPlayback` + * when tests want the singleton behavior but still need to stub out the + * child process. + */ + playbackSpawnOptions?: StartAudioPlaybackOptions; } export interface HttpServerHandle { @@ -77,11 +96,38 @@ export interface HttpServerHandle { stop: () => Promise; } +/** + * Trailing silence pushed at the end of a clean stream (or when a stream + * is cancelled) so the null-sink doesn't leave the last PCM sample held in + * Chrome's resampler, which surfaces as a "pop" to other participants. + */ +const TRAILING_SILENCE_MS = 50; + +/** + * In-flight playback registry — keyed by the stream's uuid so `DELETE + * /play_audio/:streamId` can target a specific stream. The value is just + * the `AbortController` the POST handler is racing against. + */ +interface ActiveStream { + controller: AbortController; + handle: AudioPlaybackHandle; +} + /** Build (but do not start) the HTTP server. */ export function createHttpServer( options: CreateHttpServerOptions, ): HttpServerHandle { - const { apiToken, onLeave, onSendChat, onPlayAudio } = options; + const { + apiToken, + onLeave, + onSendChat, + onPlayAudio, + startPlayback, + playbackSpawnOptions, + } = options; + const playbackFactory = startPlayback ?? startAudioPlayback; + + const activeStreams = new Map(); const app = new Hono(); @@ -178,15 +224,184 @@ export function createHttpServer( }); // ------------------------------------------------------------------------- - // POST /play_audio — pure stub until Phase 3 lands. + // POST /play_audio — stream raw PCM body into pacat. + // + // The body is `application/octet-stream`: s16le mono 48kHz PCM, framed + // however the daemon likes (chunks don't need to be sample-aligned; pacat + // buffers internally). We allocate a stream id per request (either from + // `?stream_id=` or a fresh uuid) so a later `DELETE /play_audio/:id` can + // cancel this specific pipeline for barge-in. + // + // Status codes: + // - 200 `{ streamId, bytes }` — body fully forwarded. + // - 400 — wrong content-type. + // - 499 — cancelled mid-stream (client-closed OR + // `DELETE /play_audio/:id` fired). + // - 500 `{ error }` — pacat failed to start / write errored. // ------------------------------------------------------------------------- app.post("/play_audio", async (c) => { - // We don't validate the body here — the stream metadata shape may still - // change when the Phase 3 audio channel lands. The callback is invoked - // with a placeholder so its signature can stabilize in tests. - void Promise.resolve(onPlayAudio("")).catch(() => {}); - return c.json({ error: "not implemented" }, 501); + const contentType = c.req.header("content-type") ?? ""; + if (!contentType.toLowerCase().startsWith("application/octet-stream")) { + return c.json( + { + error: + "invalid content-type; expected application/octet-stream (raw PCM)", + }, + 400, + ); + } + + const providedId = c.req.query("stream_id"); + const streamId = + providedId && providedId.length > 0 ? providedId : randomUUID(); + + // If a stream id was reused, cancel whatever's in flight first. This + // matches the barge-in semantics we want in Phase 3: a fresh POST with + // the same id supersedes the old one. + const existing = activeStreams.get(streamId); + if (existing) { + existing.controller.abort(); + } + + let handle: AudioPlaybackHandle; + try { + handle = playbackFactory(playbackSpawnOptions); + } catch (err) { + const message = err instanceof Error ? err.message : String(err); + return c.json({ error: `failed to start playback: ${message}` }, 500); + } + + const controller = new AbortController(); + activeStreams.set(streamId, { controller, handle }); + + // Observability hook — invoked fire-and-forget so slow callbacks don't + // stall the audio pipeline. + void Promise.resolve(onPlayAudio(streamId)).catch(() => {}); + + const body = c.req.raw.body; + if (!body) { + activeStreams.delete(streamId); + // No body is treated as an empty stream — flush trailing silence for + // symmetry and return success. + try { + await handle.flushSilence(TRAILING_SILENCE_MS); + } catch { + // Best-effort; silence is cosmetic. + } + return c.json({ streamId, bytes: 0 }, 200); + } + + let bytes = 0; + let cancelled = false; + let writeError: Error | null = null; + + const reader = body.getReader(); + const abortPromise = new Promise((resolve) => { + if (controller.signal.aborted) { + cancelled = true; + resolve(); + return; + } + controller.signal.addEventListener( + "abort", + () => { + cancelled = true; + try { + // Best-effort — releases the reader so the `read()` loop sees + // EOF on the next iteration. + reader.cancel().catch(() => {}); + } catch { + // ignore + } + resolve(); + }, + { once: true }, + ); + }); + + try { + while (true) { + const readP = reader.read(); + const next = await Promise.race([ + readP.then((r) => ({ kind: "read" as const, value: r })), + abortPromise.then(() => ({ kind: "abort" as const })), + ]); + + if (next.kind === "abort") { + break; + } + const { value, done } = next.value; + if (done) break; + if (!value || value.length === 0) continue; + + try { + await handle.write(value); + bytes += value.length; + } catch (err) { + writeError = err instanceof Error ? err : new Error(String(err)); + break; + } + } + } finally { + try { + reader.releaseLock(); + } catch { + // Lock may already be released after `cancel()`; fine. + } + activeStreams.delete(streamId); + // Always flush trailing silence so we don't "pop" — even on cancel, + // which intentionally stops PCM mid-frame. + try { + await handle.flushSilence(TRAILING_SILENCE_MS); + } catch { + // Best-effort. + } + } + + if (writeError) { + return c.json( + { error: `playback write failed: ${writeError.message}`, bytes }, + 500, + ); + } + if (cancelled) { + // 499 — Nginx's convention for "client closed request"; used here as + // the signal that playback was interrupted (either by the HTTP peer + // dropping or by DELETE /play_audio/:id). Hono's typed status codes + // don't include 499 (it's non-standard), so we build the Response by + // hand. + return new Response( + JSON.stringify({ streamId, bytes, cancelled: true }), + { + status: 499, + headers: { "content-type": "application/json" }, + }, + ); + } + return c.json({ streamId, bytes }, 200); + }); + + // ------------------------------------------------------------------------- + // DELETE /play_audio/:streamId — cancel a specific in-flight playback. + // + // Used by barge-in (PR 3): when the daemon detects the user talking over + // the bot, it nukes the active stream so pacat stops writing into + // `bot_out`. Returns 404 if no such stream exists (which is a normal + // race — the stream might have just completed). + // ------------------------------------------------------------------------- + + app.delete("/play_audio/:streamId", async (c) => { + const streamId = c.req.param("streamId"); + const stream = activeStreams.get(streamId); + if (!stream) { + return c.json({ error: "no such stream", streamId }, 404); + } + stream.controller.abort(); + // Don't wait for the POST handler to finish — the DELETE is an + // interrupt, not a join point. The POST side is responsible for + // flushing silence and clearing its registry entry. + return c.json({ cancelled: true, streamId }, 200); }); // ------------------------------------------------------------------------- diff --git a/skills/meet-join/bot/src/media/audio-playback.ts b/skills/meet-join/bot/src/media/audio-playback.ts new file mode 100644 index 00000000000..9850d5e987f --- /dev/null +++ b/skills/meet-join/bot/src/media/audio-playback.ts @@ -0,0 +1,268 @@ +/** + * Audio playback for the meet-bot. + * + * Spawns `pacat --playback` against the `bot_out` Pulse null-sink (set up by + * `pulse-setup.sh`); Chrome's microphone is wired to `bot_out.monitor`, so + * any PCM we push into pacat's stdin shows up as the bot's voice inside the + * Meet tab. + * + * Public surface: + * - `startAudioPlayback()` — idempotent. Returns a writable handle. If a + * previous pacat is still alive, the same handle is returned so + * concurrent playback calls don't fight over who spawned the process. + * - `stopAudioPlayback()` — closes the current handle, kills pacat, and + * resets the module-level singleton. Safe to call repeatedly. + * - `flushSilence(ms)` — writes `ms * bytesPerMs` zero-valued bytes into + * the active handle. Used to cleanly terminate a stream without the + * characteristic "pop" caused by a sudden cut-off. + * + * Defaults match what the daemon sends: s16le mono 48kHz. That rate is + * chosen to match Meet's WebRTC pipeline so the null-sink resampler has no + * work to do. + * + * Like `audio-capture`, the module accepts an injected `spawn` factory so + * tests can exercise ordering, cancellation, and silence-flush behavior + * without ever shelling out to a real `pacat`. The default factory calls + * `Bun.spawn`. + */ + +import type { Subprocess } from "bun"; + +/** Pulse sink where Chrome will pick up our playback. */ +export const DEFAULT_PLAYBACK_DEVICE = "bot_out"; + +/** 48kHz s16le mono — matches WebRTC. */ +export const DEFAULT_RATE_HZ = 48_000; +export const DEFAULT_CHANNELS = 1; +/** s16le = 2 bytes/sample. */ +export const DEFAULT_SAMPLE_BYTES = 2; + +/** + * Bytes of PCM per millisecond at the default format. + * 48000 samples/s * 1 channel * 2 bytes = 96 bytes/ms. + */ +export const DEFAULT_BYTES_PER_MS = + (DEFAULT_RATE_HZ * DEFAULT_CHANNELS * DEFAULT_SAMPLE_BYTES) / 1000; + +/** + * Minimal slice of `Bun.spawn`'s return type that playback actually needs. + * Tests can satisfy this with a shim whose stdin captures writes into a + * buffer. + */ +export interface SpawnedPacat { + /** Writable end of pacat's stdin. Accepts raw PCM. */ + stdin: PacatWritable; + /** Settles once pacat exits. */ + exited: Promise; + /** Send a signal — SIGTERM on clean shutdown. */ + kill(signal?: number | NodeJS.Signals): void; +} + +/** + * The subset of a writable stream we rely on. Matches both Node's + * `Writable` and `FileSink` (returned by `Bun.spawn`'s `stdin: "pipe"`). + * + * `write` is allowed to return synchronously (Node) or as a promise + * (FileSink); the caller always `await`s it. + */ +export interface PacatWritable { + write(chunk: Uint8Array): number | Promise | void | boolean; + end?: () => void | Promise; +} + +export type PacatSpawnFactory = (argv: readonly string[]) => SpawnedPacat; + +/** + * Options for `startAudioPlayback`. In production all fields default to + * reasonable values; tests typically override `spawn`. + */ +export interface StartAudioPlaybackOptions { + device?: string; + rateHz?: number; + channels?: number; + spawn?: PacatSpawnFactory; +} + +/** + * Writable handle backed by an active `pacat` process. + * + * `write` returns once the bytes have been handed to pacat (its stdin may + * apply backpressure via a promise resolution). `flushSilence` pushes a + * block of zeroes — typically 50ms at shutdown to avoid a popping sound. + */ +export interface AudioPlaybackHandle { + /** Whether this handle is still usable. Flips to `false` on stop. */ + readonly active: boolean; + /** The argv pacat was spawned with. Useful for debugging/logging. */ + readonly argv: readonly string[]; + /** Bytes of PCM per millisecond at this handle's configured format. */ + readonly bytesPerMs: number; + /** Write raw PCM bytes to pacat's stdin. */ + write(chunk: Uint8Array): Promise; + /** Write `ms` milliseconds of silence (zero bytes). */ + flushSilence(ms: number): Promise; +} + +/** Default spawn factory — wraps `Bun.spawn` with the pacat flags. */ +function defaultSpawn(argv: readonly string[]): SpawnedPacat { + const proc: Subprocess<"pipe", "pipe", "pipe"> = Bun.spawn(argv as string[], { + stdin: "pipe", + stdout: "pipe", + stderr: "pipe", + }); + return { + stdin: proc.stdin as unknown as PacatWritable, + exited: proc.exited, + kill: (signal) => proc.kill(signal ?? "SIGTERM"), + }; +} + +function buildPacatArgv( + device: string, + rateHz: number, + channels: number, +): readonly string[] { + return [ + "pacat", + "--playback", + `--device=${device}`, + "--format=s16le", + `--rate=${rateHz}`, + `--channels=${channels}`, + "--raw", + ]; +} + +interface ActivePlayback { + proc: SpawnedPacat; + handle: AudioPlaybackHandle; + markDead: () => void; +} + +/** + * Module-level singleton — we only ever run one pacat at a time. Sharing + * the sink across concurrent playbacks is a daemon concern (mixing), not + * the bot's: the bot just needs to stream whatever PCM arrives in order. + */ +let active: ActivePlayback | null = null; + +/** + * Start (or reuse) the playback pipeline. Returns an `AudioPlaybackHandle` + * that callers write PCM into. Idempotent — if pacat is already running, + * the existing handle is returned verbatim so callers don't race on spawn. + */ +export function startAudioPlayback( + opts: StartAudioPlaybackOptions = {}, +): AudioPlaybackHandle { + if (active !== null) { + return active.handle; + } + + const device = opts.device ?? DEFAULT_PLAYBACK_DEVICE; + const rateHz = opts.rateHz ?? DEFAULT_RATE_HZ; + const channels = opts.channels ?? DEFAULT_CHANNELS; + const spawn = opts.spawn ?? defaultSpawn; + const bytesPerMs = (rateHz * channels * DEFAULT_SAMPLE_BYTES) / 1000; + + const argv = buildPacatArgv(device, rateHz, channels); + const proc = spawn(argv); + + let alive = true; + const markDead = () => { + alive = false; + }; + + // If pacat exits on its own, drop the singleton so the next + // `startAudioPlayback` call respawns it instead of handing back a dead + // handle. + void proc.exited.then(() => { + markDead(); + if (active && active.proc === proc) { + active = null; + } + }); + + const handle: AudioPlaybackHandle = { + get active() { + return alive; + }, + argv, + bytesPerMs, + async write(chunk: Uint8Array): Promise { + if (!alive) { + throw new Error("audio-playback: write to inactive handle"); + } + if (chunk.length === 0) return; + const result = proc.stdin.write(chunk); + if (result && typeof (result as Promise).then === "function") { + await result; + } + }, + async flushSilence(ms: number): Promise { + if (!alive) return; + if (ms <= 0) return; + const total = Math.floor(ms * bytesPerMs); + if (total <= 0) return; + // Write silence in a single allocation — 50ms at 48kHz mono is only + // 4800 bytes, comfortably small. + const silence = new Uint8Array(total); // zero-filled by default + await this.write(silence); + }, + }; + + active = { proc, handle, markDead }; + return handle; +} + +/** + * Stop the current pacat process (if any). Safe to call when nothing is + * running. Awaits the process exit so callers can sequence teardown. + */ +export async function stopAudioPlayback(): Promise { + const current = active; + if (current === null) return; + active = null; + current.markDead(); + try { + await current.proc.stdin.end?.(); + } catch { + // Best-effort — if stdin is already closed, that's fine. + } + try { + current.proc.kill("SIGTERM"); + } catch { + // Best-effort — process may already be gone. + } + try { + await current.proc.exited; + } catch { + // Exit shouldn't throw; paranoia. + } +} + +/** + * Convenience — `flushSilence` through the currently-active handle. If no + * handle is active this is a no-op. + */ +export async function flushSilence(ms: number): Promise { + const current = active; + if (current === null) return; + await current.handle.flushSilence(ms); +} + +/** + * Test-only: peek at the currently-active handle (or `null` if none). + * Exposed so tests can assert singleton behavior. + */ +export function __getActiveHandleForTests(): AudioPlaybackHandle | null { + return active ? active.handle : null; +} + +/** + * Test-only: force-reset the singleton so tests don't leak state between + * cases. Does not attempt to kill pacat — use `stopAudioPlayback()` for + * that. + */ +export function __resetForTests(): void { + active = null; +}