diff --git a/meet-bot/__tests__/audio-capture.test.ts b/meet-bot/__tests__/audio-capture.test.ts new file mode 100644 index 00000000000..7496fcd5118 --- /dev/null +++ b/meet-bot/__tests__/audio-capture.test.ts @@ -0,0 +1,538 @@ +/** + * Unit tests for the audio-capture pipeline. + * + * We never invoke real `parec` or open real Unix sockets here — the module + * is designed around injected `spawn` / `connect` factories so the tests + * can feed canned PCM through the chunker and inspect what lands on the + * socket side. This keeps the suite fast and hermetic (runs on macOS CI + * and containerless hosts alike). + * + * Coverage: + * - Happy path: PCM bytes from `parec` stdout are chunked into frames of + * the requested size and written to the socket in order. + * - Reconnect on parec exit: one failed spawn, second spawn succeeds. + * - Error surface after the retry budget is exhausted. + * - Non-default frame size honored. + */ + +import { describe, expect, test } from "bun:test"; + +import { + DEFAULT_FRAME_BYTES, + DEFAULT_RATE_HZ, + DEFAULT_SOURCE_DEVICE, + type CapturedSocket, + type SpawnedParec, + startAudioCapture, +} from "../src/media/audio-capture.js"; + +/** -------------------- helpers --------------------------------------- */ + +/** + * Build a `SpawnedParec` whose stdout emits the supplied `Uint8Array` + * chunks synchronously (as separate `enqueue` calls) and then closes. + * `exited` only settles once `kill()` is invoked — this prevents the + * retry loop in `startAudioCapture` from firing spuriously as soon as the + * stream drains. + */ +function fakeParec(chunks: Uint8Array[]): { + proc: SpawnedParec; + killed: Promise; +} { + let resolveExited!: (code: number) => void; + let resolveKilled!: () => void; + const exited = new Promise((resolve) => { + resolveExited = resolve; + }); + const killed = new Promise((resolve) => { + resolveKilled = resolve; + }); + + const stdout = new ReadableStream({ + start(controller) { + for (const chunk of chunks) { + controller.enqueue(chunk); + } + controller.close(); + }, + }); + + const proc: SpawnedParec = { + stdout, + exited, + kill() { + resolveExited(0); + resolveKilled(); + }, + }; + return { proc, killed }; +} + +/** + * `SpawnedParec` that exits with the supplied non-zero code immediately + * (no stdout emitted). Used to exercise the reconnect / retry paths. + */ +function fakeFailedParec(exitCode: number): SpawnedParec { + const stdout = new ReadableStream({ + start(controller) { + controller.close(); + }, + }); + return { + stdout, + exited: Promise.resolve(exitCode), + kill() { + /* already exited */ + }, + }; +} + +interface RecordingSocket extends CapturedSocket { + /** All bytes written via `write()`, concatenated in order. */ + writes: Uint8Array[]; + /** Count of `end()` invocations. */ + endCalls: number; + /** Count of `destroy()` invocations. */ + destroyCalls: number; + /** Trigger a synthetic `error` event on this socket. */ + triggerError(err: NodeJS.ErrnoException): void; + /** Trigger a synthetic `close` event on this socket. */ + triggerClose(): void; +} + +/** + * Build an in-memory socket shim that records every write and can be + * signalled by the test to emit `error` / `close` events. This is the + * substitute for the real Unix socket server on the daemon side. + */ +function recordingSocket(): RecordingSocket { + const errorListeners: Array<(err: NodeJS.ErrnoException) => void> = []; + const closeListeners: Array<() => void> = []; + const writes: Uint8Array[] = []; + let endCalls = 0; + let destroyCalls = 0; + + return { + writes, + get endCalls() { + return endCalls; + }, + get destroyCalls() { + return destroyCalls; + }, + write(chunk: Uint8Array) { + // Copy so later mutations by the test fixture can't retroactively + // change what we've "received" on the wire. + writes.push(new Uint8Array(chunk)); + return true; + }, + end() { + endCalls += 1; + }, + destroy() { + destroyCalls += 1; + }, + on(event, listener) { + if (event === "error") { + errorListeners.push(listener as (err: NodeJS.ErrnoException) => void); + } else { + closeListeners.push(listener as () => void); + } + }, + triggerError(err: NodeJS.ErrnoException) { + for (const l of errorListeners) l(err); + }, + triggerClose() { + for (const l of closeListeners) l(); + }, + }; +} + +/** Concatenate an array of Uint8Arrays into a single buffer. */ +function concat(parts: Uint8Array[]): Uint8Array { + const total = parts.reduce((n, p) => n + p.length, 0); + const out = new Uint8Array(total); + let offset = 0; + for (const part of parts) { + out.set(part, offset); + offset += part.length; + } + return out; +} + +/** Build a deterministic fake PCM payload of the given size. */ +function fakePcm(size: number): Uint8Array { + const out = new Uint8Array(size); + for (let i = 0; i < size; i++) { + // Cheap pattern: low byte is the index, so we can eyeball the contents + // on test failure output. + out[i] = i & 0xff; + } + return out; +} + +async function tick(ms = 0): Promise { + await new Promise((r) => setTimeout(r, ms)); +} + +/** + * Poll `predicate` until it returns true or the deadline elapses. Used to + * wait for async side-effects (e.g. writes landing on the recording + * socket) without relying on fixed sleeps. + */ +async function waitFor( + predicate: () => boolean, + { timeoutMs = 2000, intervalMs = 5 }: { timeoutMs?: number; intervalMs?: number } = {}, +): Promise { + const deadline = Date.now() + timeoutMs; + while (Date.now() < deadline) { + if (predicate()) return; + await tick(intervalMs); + } + throw new Error(`waitFor: predicate did not become true in ${timeoutMs}ms`); +} + +/** -------------------- tests ----------------------------------------- */ + +describe("startAudioCapture — argv + defaults", () => { + test("spawns parec with the expected flags and defaults", async () => { + const spawnedArgv: string[][] = []; + const { proc } = fakeParec([]); + const sock = recordingSocket(); + + const capture = await startAudioCapture({ + socketPath: "/tmp/test.sock", + spawn: (argv) => { + spawnedArgv.push([...argv]); + return proc; + }, + connect: () => sock, + }); + + expect(spawnedArgv.length).toBe(1); + expect(spawnedArgv[0]).toEqual([ + "parec", + `--device=${DEFAULT_SOURCE_DEVICE}`, + "--format=s16le", + `--rate=${DEFAULT_RATE_HZ}`, + "--channels=1", + "--raw", + ]); + + await capture.stop(); + }); + + test("honors custom sourceDevice + rateHz", async () => { + const spawnedArgv: string[][] = []; + const { proc } = fakeParec([]); + const sock = recordingSocket(); + + const capture = await startAudioCapture({ + socketPath: "/tmp/test.sock", + sourceDevice: "custom_source.monitor", + rateHz: 48_000, + spawn: (argv) => { + spawnedArgv.push([...argv]); + return proc; + }, + connect: () => sock, + }); + + expect(spawnedArgv[0]).toEqual([ + "parec", + "--device=custom_source.monitor", + "--format=s16le", + "--rate=48000", + "--channels=1", + "--raw", + ]); + + await capture.stop(); + }); + + test("passes the socketPath verbatim to the connect factory", async () => { + const { proc } = fakeParec([]); + const sock = recordingSocket(); + const seenPaths: string[] = []; + + const capture = await startAudioCapture({ + socketPath: "/var/run/meet/audio-xyz.sock", + spawn: () => proc, + connect: (path) => { + seenPaths.push(path); + return sock; + }, + }); + + expect(seenPaths).toEqual(["/var/run/meet/audio-xyz.sock"]); + await capture.stop(); + }); + + test("rejects a zero or negative frameBytes at start", async () => { + const { proc } = fakeParec([]); + const sock = recordingSocket(); + + let thrown: unknown; + try { + await startAudioCapture({ + socketPath: "/tmp/x.sock", + frameBytes: 0, + spawn: () => proc, + connect: () => sock, + }); + } catch (err) { + thrown = err; + } + expect(thrown).toBeInstanceOf(Error); + expect((thrown as Error).message).toContain("frameBytes must be > 0"); + }); +}); + +describe("startAudioCapture — framing", () => { + test("chunks PCM into frames of the requested size, preserving byte order", async () => { + // Use the production default (320 bytes). Feed 5 frames' worth (1600 + // bytes) split across 3 arbitrarily-sized chunks to prove the chunker + // re-assembles them at frame boundaries. + const frameBytes = DEFAULT_FRAME_BYTES; + const total = frameBytes * 5; + const payload = fakePcm(total); + const split1 = payload.slice(0, 100); // smaller than one frame + const split2 = payload.slice(100, 700); // crosses frame boundary + const split3 = payload.slice(700); // remainder + const { proc } = fakeParec([split1, split2, split3]); + const sock = recordingSocket(); + + const capture = await startAudioCapture({ + socketPath: "/tmp/t.sock", + spawn: () => proc, + connect: () => sock, + }); + + // Wait until all 5 frames have arrived at the socket. + await waitFor(() => sock.writes.length === 5); + + // Every write must be exactly `frameBytes` bytes. + for (const w of sock.writes) { + expect(w.length).toBe(frameBytes); + } + + // Concatenated writes must equal the original payload verbatim. + expect(concat(sock.writes)).toEqual(payload); + + await capture.stop(); + }); + + test("drops an incomplete trailing partial frame at EOF", async () => { + // 320-byte frames; send 321 bytes so exactly one full frame flushes and + // the single-byte tail is held in the buffer until EOF, where the + // implementation drops it rather than emitting a short frame. + const frameBytes = 320; + const payload = fakePcm(frameBytes + 1); + const { proc } = fakeParec([payload]); + const sock = recordingSocket(); + + const capture = await startAudioCapture({ + socketPath: "/tmp/t.sock", + frameBytes, + spawn: () => proc, + connect: () => sock, + }); + + await waitFor(() => sock.writes.length === 1); + // Give the pump a tick to confirm it doesn't emit another (short) frame + // after the stream ends. + await tick(20); + expect(sock.writes.length).toBe(1); + expect(sock.writes[0]!.length).toBe(frameBytes); + + await capture.stop(); + }); + + test("supports non-default frame sizes", async () => { + const frameBytes = 64; + const payload = fakePcm(frameBytes * 3); + const { proc } = fakeParec([payload]); + const sock = recordingSocket(); + + const capture = await startAudioCapture({ + socketPath: "/tmp/t.sock", + frameBytes, + spawn: () => proc, + connect: () => sock, + }); + + await waitFor(() => sock.writes.length === 3); + for (const w of sock.writes) { + expect(w.length).toBe(frameBytes); + } + expect(concat(sock.writes)).toEqual(payload); + + await capture.stop(); + }); +}); + +describe("startAudioCapture — reconnect", () => { + test("reconnects once after parec exits non-zero and resumes piping", async () => { + // First spawn: parec exits with code 1 before emitting any data. + const first = fakeFailedParec(1); + // Second spawn: real canned payload. + const payload = fakePcm(640); // two default-size frames + const { proc: second } = fakeParec([payload]); + + const procs: SpawnedParec[] = [first, second]; + const spawnCalls: string[][] = []; + let spawnIdx = 0; + + const sock = recordingSocket(); + + const capture = await startAudioCapture({ + socketPath: "/tmp/t.sock", + spawn: (argv) => { + spawnCalls.push([...argv]); + const p = procs[spawnIdx++]; + if (!p) throw new Error("spawn called more times than expected"); + return p; + }, + connect: () => sock, + }); + + // Two frames should arrive after the reconnect. + await waitFor(() => sock.writes.length === 2); + expect(spawnCalls.length).toBe(2); + expect(concat(sock.writes)).toEqual(payload); + + await capture.stop(); + }); + + test("surfaces an error after 3 failed reconnects", async () => { + // Every spawn returns a process that exits immediately with code 1. + // Initial attempt + 3 reconnects = 4 spawns before we give up. `stop()` + // must reject with an Error mentioning the retry exhaustion. + let spawnCount = 0; + const capture = await startAudioCapture({ + socketPath: "/tmp/t.sock", + spawn: () => { + spawnCount += 1; + return fakeFailedParec(1); + }, + connect: () => recordingSocket(), + }); + + // Wait for the retry budget to be exhausted. 4 spawns * (~1ms per + // attempt + 500ms backoff between attempts) — use a generous ceiling. + await waitFor(() => spawnCount >= 4, { timeoutMs: 5000 }); + + // Give the loop a moment to record the fatal error and signal done. + await tick(50); + + let thrown: unknown; + try { + await capture.stop(); + } catch (err) { + thrown = err; + } + + expect(thrown).toBeInstanceOf(Error); + expect((thrown as Error).message).toContain("parec exited with code 1"); + // Initial + 3 reconnects = 4 total spawns. + expect(spawnCount).toBe(4); + }); + + test("onError callback fires after retry budget is exhausted", async () => { + const errors: Error[] = []; + let spawnCount = 0; + const capture = await startAudioCapture({ + socketPath: "/tmp/t.sock", + onError: (err) => errors.push(err), + spawn: () => { + spawnCount += 1; + return fakeFailedParec(2); + }, + connect: () => recordingSocket(), + }); + + // Wait until the retry budget has been exhausted and the loop has + // fired the callback. `stop()` early would suppress the fatal-error + // path by short-circuiting the loop. + await waitFor(() => errors.length === 1, { timeoutMs: 5000 }); + expect(spawnCount).toBe(4); + expect(errors[0]!.message).toContain("parec exited with code 2"); + + // stop() after the fact must still reject with the accumulated error. + let thrown: unknown; + try { + await capture.stop(); + } catch (err) { + thrown = err; + } + expect(thrown).toBeInstanceOf(Error); + }); +}); + +describe("startAudioCapture — stop semantics", () => { + test("stop() kills parec and closes the socket", async () => { + const { proc, killed } = fakeParec([fakePcm(DEFAULT_FRAME_BYTES)]); + const sock = recordingSocket(); + + const capture = await startAudioCapture({ + socketPath: "/tmp/t.sock", + spawn: () => proc, + connect: () => sock, + }); + + await waitFor(() => sock.writes.length === 1); + await capture.stop(); + + // `stop()` must have killed the fake parec (it resolves `killed`). + await killed; + // And must have torn down the socket. + expect(sock.endCalls).toBeGreaterThanOrEqual(1); + expect(sock.destroyCalls).toBeGreaterThanOrEqual(1); + }); + + test("stop() is idempotent", async () => { + const { proc } = fakeParec([]); + const sock = recordingSocket(); + + const capture = await startAudioCapture({ + socketPath: "/tmp/t.sock", + spawn: () => proc, + connect: () => sock, + }); + + await capture.stop(); + // Second call should not hang or throw a duplicate error. + await capture.stop(); + }); + + test("socket error during capture triggers a reconnect", async () => { + // First socket errors out; second is a plain recorder. + const sock1 = recordingSocket(); + const sock2 = recordingSocket(); + let connectIdx = 0; + + const payload = fakePcm(DEFAULT_FRAME_BYTES); + const { proc: proc1 } = fakeParec([]); + const { proc: proc2 } = fakeParec([payload]); + const procs = [proc1, proc2]; + let spawnIdx = 0; + + const capture = await startAudioCapture({ + socketPath: "/tmp/t.sock", + spawn: () => procs[spawnIdx++]!, + connect: () => (connectIdx++ === 0 ? sock1 : sock2), + }); + + // Simulate a socket error after the initial connect completes. + await tick(10); + const connErr = new Error("ECONNRESET") as NodeJS.ErrnoException; + connErr.code = "ECONNRESET"; + sock1.triggerError(connErr); + + // Expect the second socket to eventually receive the replayed payload. + await waitFor(() => sock2.writes.length === 1); + expect(concat(sock2.writes)).toEqual(payload); + + await capture.stop(); + }); +}); diff --git a/meet-bot/src/media/audio-capture.ts b/meet-bot/src/media/audio-capture.ts new file mode 100644 index 00000000000..a49e935abb4 --- /dev/null +++ b/meet-bot/src/media/audio-capture.ts @@ -0,0 +1,510 @@ +/** + * Audio capture for the meet-bot. + * + * Spawns `parec` against the `meet_capture.monitor` Pulse source (set up by + * `pulse-setup.sh` in PR 5), chunks the raw PCM stream into fixed-size + * frames, and writes those frames into a Unix socket whose server end is + * opened by the daemon (`MeetAudioIngest`, PR 16) on the host. The socket + * path is bind-mounted into the container so the bot can connect as a + * client. + * + * Defaults are tuned for Deepgram's realtime STT: s16le mono 16kHz with + * 20ms frames (320 bytes = 160 samples * 2 bytes). Callers can override the + * rate/frame size for other consumers (e.g. Whisper streaming). + * + * Transient failures — `parec` crashing, the socket dropping, the daemon + * not yet listening — are absorbed with a bounded retry loop. After 3 + * consecutive failed attempts to re-establish the pipeline we surface the + * accumulated error via `stop()`'s rejection (and via the optional + * `onError` callback for async observation). + * + * The implementation is structured around injectable `spawn` / `connect` + * factories so the unit tests can exercise the chunking, reconnect, and + * error paths without shelling out to a real `parec` or opening a real + * socket. The defaults point at `Bun.spawn` + `net.createConnection`. + */ + +import type { Subprocess } from "bun"; +import { + createConnection as netCreateConnection, + type Socket as NetSocket, +} from "node:net"; + +/** + * Default Pulse source the bot taps for the meeting's audio. `meet_capture` + * is a null-sink provisioned by `pulse-setup.sh`; its `.monitor` side is + * where Chrome's playback lands and where we siphon PCM from. + */ +export const DEFAULT_SOURCE_DEVICE = "meet_capture.monitor"; + +/** s16le mono 16kHz matches Deepgram's realtime ingest format. */ +export const DEFAULT_RATE_HZ = 16_000; + +/** + * 20ms at 16kHz, s16le, mono = 160 samples * 2 bytes = 320 bytes. This is + * Deepgram's recommended realtime frame size. + */ +export const DEFAULT_FRAME_BYTES = 320; + +const MAX_RECONNECT_ATTEMPTS = 3; +const RECONNECT_BACKOFF_MS = 500; + +export interface AudioCaptureOptions { + /** + * Absolute path to the Unix socket the daemon is listening on. The daemon + * owns the server end; the bot is the client. + */ + socketPath: string; + /** + * Pulse source to capture from. Defaults to the monitor of the + * `meet_capture` null-sink created by `pulse-setup.sh`. + */ + sourceDevice?: string; + /** Sample rate in Hz. Defaults to 16000. */ + rateHz?: number; + /** + * Frame size in bytes. `parec`'s stdout is accumulated until this many + * bytes are buffered, then flushed to the socket as a single write. + * Defaults to 320 bytes (20ms at 16kHz mono 16-bit). + */ + frameBytes?: number; + /** + * Optional async error observer. Fired once — when the capture gives up + * after exhausting the reconnect budget. `stop()` will also reject with + * the same error, so callbacks are not required. + */ + onError?: (err: Error) => void; + /** + * Test hook — factory matching `Bun.spawn`'s shape, used to spawn `parec`. + * Left unset in production so we use Bun directly. + */ + spawn?: SpawnFactory; + /** + * Test hook — factory matching `net.createConnection`'s shape, used to + * open the client socket. Left unset in production so we use Node's net. + */ + connect?: ConnectFactory; +} + +export interface AudioCaptureHandle { + /** + * Stop the capture pipeline. Sends SIGTERM to `parec`, ends the socket, + * and resolves once both have settled. If the pipeline previously + * exhausted its reconnect budget and ended in an error, this rejects + * with that error (so callers can `await handle.stop()` and catch). + */ + stop(): Promise; +} + +/** + * Minimal slice of `Bun.spawn`'s return type that `startAudioCapture` + * actually uses. Tests can satisfy this without implementing the full + * `Subprocess` surface. + */ +export interface SpawnedParec { + /** Readable stream of raw PCM bytes from `parec`'s stdout. */ + stdout: ReadableStream | null; + /** Settles with the child's exit code. */ + exited: Promise; + /** Send a signal to the child. SIGTERM on clean shutdown. */ + kill(signal?: number | NodeJS.Signals): void; +} + +export type SpawnFactory = ( + argv: readonly string[], +) => SpawnedParec; + +/** + * Minimal slice of `net.Socket` that `startAudioCapture` relies on. Tests + * provide a shim with just these members so we don't need a real kernel + * socket. + * + * The `on` overload intentionally uses a union event + any-listener shape + * instead of Node's exact overloads: this keeps the shim declaration in + * test code a single-line object literal rather than forcing the test + * harness to mirror Node's event-map types. + */ +export interface CapturedSocket { + /** Enqueue bytes to be written. Must return `true`/`false` per Node's API. */ + write(chunk: Uint8Array): boolean; + /** Half-close the writable side, causing the server to see EOF. */ + end(): void; + /** Force-close (optional — tests may no-op). */ + destroy(): void; + /** + * Subscribe to `error` / `close` events. The listener receives an + * `Error` for `"error"` and no arguments for `"close"` — callers are + * expected to branch on the event name themselves. + */ + on( + event: "error" | "close", + listener: (err?: NodeJS.ErrnoException) => void, + ): void; +} + +export type ConnectFactory = (socketPath: string) => CapturedSocket; + +/** Default spawn factory — delegates to `Bun.spawn` with the parec flags. */ +function defaultSpawn(argv: readonly string[]): SpawnedParec { + const proc: Subprocess = Bun.spawn(argv as string[], { + stdin: "ignore", + stdout: "pipe", + stderr: "pipe", + }); + return { + stdout: proc.stdout as ReadableStream | null, + exited: proc.exited, + kill: (signal) => proc.kill(signal ?? "SIGTERM"), + }; +} + +/** Default connect factory — a Node `net.createConnection` over a Unix path. */ +function defaultConnect(socketPath: string): CapturedSocket { + const sock: NetSocket = netCreateConnection({ path: socketPath }); + return { + write: (chunk) => sock.write(chunk), + end: () => sock.end(), + destroy: () => sock.destroy(), + on(event, listener) { + if (event === "error") { + sock.on("error", (err: NodeJS.ErrnoException) => listener(err)); + } else { + sock.on("close", () => listener()); + } + }, + }; +} + +function buildParecArgv( + sourceDevice: string, + rateHz: number, +): readonly string[] { + return [ + "parec", + `--device=${sourceDevice}`, + "--format=s16le", + `--rate=${rateHz}`, + "--channels=1", + "--raw", + ]; +} + +/** + * Start capturing meeting audio and forwarding it to the daemon. + * + * On success, returns a handle with a `stop()` method. The pipeline keeps + * running (and auto-reconnects on transient failures) until the caller + * invokes `stop()` or the reconnect budget is exhausted. + */ +export async function startAudioCapture( + opts: AudioCaptureOptions, +): Promise { + const sourceDevice = opts.sourceDevice ?? DEFAULT_SOURCE_DEVICE; + const rateHz = opts.rateHz ?? DEFAULT_RATE_HZ; + const frameBytes = opts.frameBytes ?? DEFAULT_FRAME_BYTES; + const spawn = opts.spawn ?? defaultSpawn; + const connect = opts.connect ?? defaultConnect; + const onError = opts.onError; + + if (frameBytes <= 0) { + throw new Error( + `startAudioCapture: frameBytes must be > 0, got ${frameBytes}`, + ); + } + + const argv = buildParecArgv(sourceDevice, rateHz); + + // Capture state — shared across the retry loop. + let stopping = false; + let fatalError: Error | null = null; + let currentProc: SpawnedParec | null = null; + let currentSocket: CapturedSocket | null = null; + + // Wakeup channel fired when `stop()` is called — lets the inner attempt + // loop race against a stop signal without polling. + let fireStopSignal!: () => void; + const stopSignal = new Promise((resolve) => { + fireStopSignal = resolve; + }); + + // Resolves once the retry loop has fully wound down (either after a + // user-initiated `stop()` or after exhausting the retry budget). `stop()` + // awaits this so the caller can block until everything is torn down. + let loopDone!: () => void; + const loopDonePromise = new Promise((resolve) => { + loopDone = resolve; + }); + + /** + * Single pipeline attempt: spawn parec, open the socket, pipe stdout + * through a frame chunker into the socket, and resolve with a tag + * describing how the attempt ended. + * + * - "stopped" — the caller invoked `stop()`; we tore things down cleanly. + * - "parec" — parec exited (non-stop), caller should retry. + * - "socket" — socket errored/closed (non-stop), caller should retry. + */ + type AttemptOutcome = "stopped" | "parec" | "socket"; + + async function runOneAttempt(): Promise<{ + outcome: AttemptOutcome; + error?: Error; + }> { + let attemptError: Error | undefined; + + // 1. Spawn parec. + let proc: SpawnedParec; + try { + proc = spawn(argv); + } catch (err) { + return { + outcome: "parec", + error: err instanceof Error ? err : new Error(String(err)), + }; + } + currentProc = proc; + + // 2. Connect to the daemon socket. + let sock: CapturedSocket; + try { + sock = connect(opts.socketPath); + } catch (err) { + // Socket open failed synchronously — kill parec and report. + try { + proc.kill("SIGTERM"); + } catch { + // Process may already be gone; fine. + } + return { + outcome: "socket", + error: err instanceof Error ? err : new Error(String(err)), + }; + } + currentSocket = sock; + + // 3. Wait for either parec to exit, the socket to die, or `stop()`. + const stoppedP = stopSignal.then(() => "stopped" as const); + + const parecExitedP = proc.exited.then((code) => { + if (code !== 0 && !stopping) { + attemptError = new Error(`parec exited with code ${code}`); + } + return "parec" as const; + }); + + const socketDeadP = new Promise<"socket">((resolve) => { + sock.on("error", (err) => { + if (!stopping && err) { + attemptError = err instanceof Error ? err : new Error(String(err)); + } + resolve("socket"); + }); + sock.on("close", () => { + resolve("socket"); + }); + }); + + // 4. Pipe parec.stdout through the frame chunker into the socket. + // We deliberately don't `await` the pump — it races against the three + // promises above and terminates when any of them settles. + const pumpDone = pumpFrames(proc.stdout, sock, frameBytes, () => stopping); + + const raceOutcome = await Promise.race([ + stoppedP, + parecExitedP, + socketDeadP, + ]); + // If `stop()` fired concurrently with a parec/socket event, the race + // winner is non-deterministic — force-classify as "stopped" so the + // orchestrator doesn't count a user-initiated teardown as a retry + // failure. + const outcome: AttemptOutcome = stopping ? "stopped" : raceOutcome; + + // 5. Tear down whichever side is still alive. + try { + proc.kill("SIGTERM"); + } catch { + // Already dead — fine. + } + try { + sock.end(); + } catch { + // Already closed — fine. + } + try { + sock.destroy(); + } catch { + // Ditto. + } + + // 6. Wait for the pump to drain so we don't leave dangling reads. + try { + await pumpDone; + } catch { + // Pump errors are already accounted for via the socket/parec paths. + } + + // 7. Make sure parec has fully exited before we return (so the next + // attempt starts from a clean slate). + try { + await proc.exited; + } catch { + // Shouldn't throw; paranoia. + } + + currentProc = null; + currentSocket = null; + + if (outcome === "stopped") { + return { outcome: "stopped" }; + } + return { outcome, error: attemptError }; + } + + /** + * Orchestrator: runs attempts until we either stop cleanly or exceed the + * reconnect budget. Populates `fatalError` on budget exhaustion so + * `stop()` can surface it. + */ + async function runLoop(): Promise { + let consecutiveFailures = 0; + + while (!stopping) { + const { outcome, error } = await runOneAttempt(); + + if (outcome === "stopped") { + break; + } + + // Any non-stop outcome counts as a failure toward the budget. + consecutiveFailures += 1; + if (consecutiveFailures > MAX_RECONNECT_ATTEMPTS) { + fatalError = + error ?? + new Error( + `audio-capture: exceeded ${MAX_RECONNECT_ATTEMPTS} reconnect attempts`, + ); + break; + } + + // Backoff before the next attempt. Break early if stop() fires + // during the sleep so the caller doesn't have to wait 500ms. + await Promise.race([ + new Promise((r) => setTimeout(r, RECONNECT_BACKOFF_MS)), + stopSignal, + ]); + } + + if (fatalError && onError) { + try { + onError(fatalError); + } catch { + // Callback errors are not our problem to propagate further. + } + } + loopDone(); + } + + // Kick off the first attempt synchronously before returning so callers + // see "parec is spawned" as part of `startAudioCapture`'s successful + // resolution. If the very first spawn throws, the loop will surface it + // via stop() just like any other failure. + void runLoop(); + + const stop = async (): Promise => { + if (!stopping) { + stopping = true; + // Wake the attempt loop so it stops racing on parec/socket events. + fireStopSignal(); + // Proactively tear down whatever's still alive so we don't have to + // wait for the attempt's internal race to time-slice back. + const proc = currentProc; + const sock = currentSocket; + if (proc) { + try { + proc.kill("SIGTERM"); + } catch { + // Best-effort. + } + } + if (sock) { + try { + sock.end(); + } catch { + // Best-effort. + } + try { + sock.destroy(); + } catch { + // Best-effort. + } + } + } + await loopDonePromise; + if (fatalError) { + throw fatalError; + } + }; + + return { stop }; +} + +/** + * Drain `stdout` through a frame chunker, flushing full frames into the + * socket. `frameBytes` must be > 0 (validated upstream). Partial tails are + * held in memory until the next chunk completes them; any trailing partial + * at EOF is dropped (undersized frames would confuse a downstream decoder + * expecting fixed-size PCM). + * + * Writes are plain `Uint8Array`s — we copy from the buffered arena so we + * don't hand the socket a slice that could later be overwritten by + * subsequent reads. + */ +async function pumpFrames( + stdout: ReadableStream | null, + sock: CapturedSocket, + frameBytes: number, + isStopping: () => boolean, +): Promise { + if (!stdout) return; + const reader = stdout.getReader(); + + // Accumulator for partial frames. We avoid per-chunk allocations of + // `frameBytes` by growing this only when needed. + let buffer = new Uint8Array(0); + + try { + while (!isStopping()) { + const { value, done } = await reader.read(); + if (done) break; + if (!value || value.length === 0) continue; + + // Append the new chunk. For a small buffer this is cheap; a zero-copy + // chain would be faster but premature here — 20ms frames at 16kHz + // are 320 bytes, well inside memcpy-is-fine territory. + const next = new Uint8Array(buffer.length + value.length); + next.set(buffer, 0); + next.set(value, buffer.length); + buffer = next; + + // Flush complete frames. + while (buffer.length >= frameBytes && !isStopping()) { + const frame = buffer.slice(0, frameBytes); + buffer = buffer.slice(frameBytes); + try { + sock.write(frame); + } catch { + // Socket write failure aborts the pump; the outer attempt loop + // will pick it up via the socket's `error`/`close` handlers. + return; + } + } + } + } finally { + try { + reader.releaseLock(); + } catch { + // releaseLock is a no-op if the stream is already closed. + } + } +}