// ---------------------------------------------------------------------------
// Fixtures
// ---------------------------------------------------------------------------

/**
 * Minimal stand-in for the ffmpeg child process the writer spawns.
 *
 * The child itself and its `stdout`/`stderr` are plain `EventEmitter`s;
 * `stdin` is a recording stub that keeps every written chunk in `chunks`
 * and flips `ended` when `end()` is called.
 */
interface MockFfmpegChild extends EventEmitter {
  stdin: {
    write: (chunk: Buffer) => boolean;
    end: () => void;
    chunks: Buffer[];
    ended: boolean;
  };
  stdout: EventEmitter;
  stderr: EventEmitter;
}

/**
 * Build a fresh {@link MockFfmpegChild}.
 *
 * The object is assembled with `Object.assign` so the compiler checks that
 * every member of the interface is actually present (no unchecked `as`
 * assertion), and the stdin stub closes over its own state instead of
 * relying on `this`, so `write`/`end` keep working when passed around as
 * detached callbacks.
 *
 * @returns A mock child whose `stdin.write` always reports `true`
 *   (no backpressure) and records the chunk.
 */
function makeMockFfmpegChild(): MockFfmpegChild {
  const stdin: MockFfmpegChild["stdin"] = {
    chunks: [],
    ended: false,
    write(chunk: Buffer): boolean {
      stdin.chunks.push(chunk);
      return true;
    },
    end(): void {
      stdin.ended = true;
    },
  };
  return Object.assign(new EventEmitter(), {
    stdin,
    stdout: new EventEmitter(),
    stderr: new EventEmitter(),
  });
}

/**
 * Create a `bun:test` mock usable in place of `child_process.spawn`.
 *
 * Each invocation records `{ cmd, args }` (args are copied so later mutation
 * by the caller cannot alter the record) and returns a new mock child.
 *
 * @returns `spawn` — the mock function; `lastChild()` — the child returned
 *   by the most recent call, or `null` before the first call; `calls()` —
 *   the recorded invocations in order.
 */
function makeSpawnMock(): {
  spawn: ReturnType<typeof mock>;
  lastChild: () => MockFfmpegChild | null;
  calls: () => Array<{ cmd: string; args: readonly string[] }>;
} {
  let child: MockFfmpegChild | null = null;
  const calls: Array<{ cmd: string; args: readonly string[] }> = [];
  const spawn = mock((cmd: string, args: readonly string[]) => {
    calls.push({ cmd, args: [...args] });
    child = makeMockFfmpegChild();
    // The mock only implements the slice of ChildProcess the writer touches,
    // so the cast to the real spawn return type is deliberate.
    return child as unknown as ReturnType<
      typeof import("node:child_process").spawn
    >;
  });
  return {
    spawn,
    lastChild: () => child,
    calls: () => calls,
  };
}

/**
 * Create an in-memory {@link PcmSource} that the test drives by hand.
 *
 * @returns `source` — the object to hand to `startAudio`; `push(bytes)` —
 *   delivers `bytes` synchronously to every current subscriber;
 *   `subscribers` — live getter for the current subscriber count (read it
 *   off the returned object; destructuring would snapshot the value).
 */
function makeTestPcmSource(): {
  source: PcmSource;
  push: (bytes: Uint8Array) => void;
  subscribers: number;
} {
  const listeners = new Set<(bytes: Uint8Array) => void>();
  const source: PcmSource = {
    subscribe(cb: (bytes: Uint8Array) => void): () => void {
      listeners.add(cb);
      return () => {
        listeners.delete(cb);
      };
    },
  };
  return {
    source,
    push(bytes: Uint8Array): void {
      for (const cb of listeners) cb(bytes);
    },
    get subscribers(): number {
      return listeners.size;
    },
  };
}

/** Shorthand for a `Participant` carrying only `id` and `name`. */
function participant(id: string, name: string): Participant {
  return { id, name };
}
/**
 * Build a `transcript.chunk` event.
 *
 * @param options.isFinal Defaults to `true` when omitted.
 * @param options.speakerId Copied through as-is (the key is present with
 *   value `undefined` when omitted).
 * @param options.speakerLabel Copied through as-is, same as `speakerId`.
 */
function transcriptChunk(
  meetingId: string,
  timestamp: string,
  text: string,
  options: { isFinal?: boolean; speakerId?: string; speakerLabel?: string } = {},
): MeetBotEvent {
  const { isFinal = true, speakerId, speakerLabel } = options;
  return {
    type: "transcript.chunk",
    meetingId,
    timestamp,
    isFinal,
    text,
    speakerId,
    speakerLabel,
  };
}

/** Build a `speaker.change` event for the given speaker. */
function speakerChange(
  meetingId: string,
  timestamp: string,
  speakerId: string,
  speakerName: string,
): MeetBotEvent {
  return {
    type: "speaker.change",
    meetingId,
    timestamp,
    speakerId,
    speakerName,
  };
}

/**
 * Build a `participant.change` event.
 *
 * @param joined Participants that joined in this change.
 * @param left Participants that left; defaults to none.
 */
function participantChange(
  meetingId: string,
  timestamp: string,
  joined: Participant[],
  left: Participant[] = [],
): MeetBotEvent {
  return {
    type: "participant.change",
    meetingId,
    timestamp,
    joined,
    left,
  };
}

/** Build a `lifecycle` event whose state is `"left"`. */
function lifecycleLeft(meetingId: string, timestamp: string): MeetBotEvent {
  return {
    type: "lifecycle",
    meetingId,
    timestamp,
    state: "left",
  };
}

/**
 * Read a JSON-lines file into an array of parsed records.
 *
 * @param path File to read.
 * @returns One parsed object per non-blank line, in file order. A missing
 *   file or a file with no non-blank lines yields `[]`.
 * @throws SyntaxError (from `JSON.parse`) if a non-blank line is not valid
 *   JSON.
 */
function readJsonlLines(path: string): Array<Record<string, unknown>> {
  if (!existsSync(path)) return [];
  return (
    readFileSync(path, "utf8")
      .split("\n")
      // Skip blank lines (including the empty tail after the final "\n") so
      // a stray empty line cannot make JSON.parse throw.
      .filter((line) => line.trim() !== "")
      .map((line) => JSON.parse(line) as Record<string, unknown>)
  );
}
// ---------------------------------------------------------------------------
// Test scaffolding
// ---------------------------------------------------------------------------

/** Meeting id every test writer is created for. */
const MEETING_ID = "m1";

/** Constructor-deps type, derived so the suite needs no additional import. */
type WriterDeps = NonNullable<
  ConstructorParameters<typeof MeetStorageWriter>[1]
>;

let workspaceDir: string;

/** Writers created by the running test; stopped again in `afterEach`. */
let liveWriters: MeetStorageWriter[] = [];

/** Create a writer rooted in the per-test temp workspace. */
function newWriter(overrides: WriterDeps = {}): MeetStorageWriter {
  const writer = new MeetStorageWriter(MEETING_ID, {
    getWorkspaceDir: () => workspaceDir,
    ...overrides,
  });
  liveWriters.push(writer);
  return writer;
}

/** Absolute path of an artifact inside the test meeting's directory. */
function artifactPath(name: string): string {
  return join(workspaceDir, "meets", MEETING_ID, name);
}

/** Dispatch events, in order, to the test meeting via the shared router. */
function dispatchAll(...events: MeetBotEvent[]): void {
  const router = getMeetSessionEventRouter();
  for (const event of events) router.dispatch(MEETING_ID, event);
}

/** View a mock as the real `spawn` signature the writer's deps expect. */
function asNodeSpawn(fn: unknown): typeof import("node:child_process").spawn {
  return fn as typeof import("node:child_process").spawn;
}

/** Create an `fsyncSync` mock plus the `fs` override that installs it. */
function makeFsyncMock() {
  const fsyncSyncMock = mock((_fd: number) => {});
  return {
    fsyncSyncMock,
    fs: {
      fsyncSync:
        fsyncSyncMock as unknown as typeof import("node:fs").fsyncSync,
    },
  };
}

beforeEach(() => {
  workspaceDir = mkdtempSync(join(tmpdir(), "meet-storage-writer-test-"));
  liveWriters = [];
  __resetMeetSessionEventRouterForTests();
});

afterEach(async () => {
  // stop() returns immediately once a writer is stopped, so this only does
  // work for writers a failing test left running. It closes their file
  // descriptors before the directory underneath them is removed.
  await Promise.all(liveWriters.map((writer) => writer.stop()));
  rmSync(workspaceDir, { recursive: true, force: true });
});

// ---------------------------------------------------------------------------
// Tests
// ---------------------------------------------------------------------------

describe("MeetStorageWriter.start / router handler registration", () => {
  test("start() creates the meeting dir and registers with the router", () => {
    const writer = newWriter();
    writer.start();

    expect(existsSync(join(workspaceDir, "meets", MEETING_ID))).toBe(true);
    expect(getMeetSessionEventRouter().registeredCount()).toBe(1);
  });

  test("start() is idempotent", () => {
    const writer = newWriter();
    writer.start();
    writer.start();
    expect(getMeetSessionEventRouter().registeredCount()).toBe(1);
  });

  test("stop() unregisters from the router", async () => {
    const writer = newWriter();
    writer.start();
    expect(getMeetSessionEventRouter().registeredCount()).toBe(1);
    await writer.stop();
    expect(getMeetSessionEventRouter().registeredCount()).toBe(0);
  });
});

describe("MeetStorageWriter transcript.jsonl", () => {
  test("appends final transcript chunks and ignores interim", async () => {
    const writer = newWriter();
    writer.start();

    dispatchAll(
      transcriptChunk(MEETING_ID, "2024-01-01T00:00:00.000Z", "hello", {
        isFinal: false,
      }),
      transcriptChunk(MEETING_ID, "2024-01-01T00:00:01.000Z", "hello world", {
        isFinal: true,
        speakerId: "s1",
        speakerLabel: "Alice",
      }),
      transcriptChunk(MEETING_ID, "2024-01-01T00:00:02.000Z", "second final", {
        isFinal: true,
      }),
    );

    await writer.stop();

    const lines = readJsonlLines(artifactPath("transcript.jsonl"));
    expect(lines).toHaveLength(2);
    expect(lines[0]).toEqual({
      timestamp: "2024-01-01T00:00:01.000Z",
      text: "hello world",
      speakerId: "s1",
      speakerLabel: "Alice",
    });
    expect(lines[1]).toEqual({
      timestamp: "2024-01-01T00:00:02.000Z",
      text: "second final",
    });
  });
});

describe("MeetStorageWriter segments.jsonl", () => {
  test("closes previous segment at each new speaker.change", async () => {
    const writer = newWriter();
    writer.start();

    dispatchAll(
      speakerChange(MEETING_ID, "2024-01-01T00:00:00.000Z", "s1", "Alice"),
      speakerChange(MEETING_ID, "2024-01-01T00:00:05.000Z", "s2", "Bob"),
      speakerChange(MEETING_ID, "2024-01-01T00:00:12.000Z", "s1", "Alice"),
      lifecycleLeft(MEETING_ID, "2024-01-01T00:00:20.000Z"),
    );

    await writer.stop();

    expect(readJsonlLines(artifactPath("segments.jsonl"))).toEqual([
      {
        start: "2024-01-01T00:00:00.000Z",
        end: "2024-01-01T00:00:05.000Z",
        speakerId: "s1",
        speakerName: "Alice",
      },
      {
        start: "2024-01-01T00:00:05.000Z",
        end: "2024-01-01T00:00:12.000Z",
        speakerId: "s2",
        speakerName: "Bob",
      },
      {
        start: "2024-01-01T00:00:12.000Z",
        end: "2024-01-01T00:00:20.000Z",
        speakerId: "s1",
        speakerName: "Alice",
      },
    ]);
  });

  test("open span on stop() is closed at stop timestamp", async () => {
    const writer = newWriter();
    writer.start();

    dispatchAll(
      speakerChange(MEETING_ID, "2024-01-01T00:00:00.000Z", "s1", "Alice"),
    );

    await writer.stop();

    const lines = readJsonlLines(artifactPath("segments.jsonl"));
    expect(lines).toHaveLength(1);
    expect(lines[0].start).toBe("2024-01-01T00:00:00.000Z");
    expect(typeof lines[0].end).toBe("string");
    expect(lines[0].speakerId).toBe("s1");
  });
});

describe("MeetStorageWriter participants.json", () => {
  test("overwrites with the latest full list (not a diff)", async () => {
    const writer = newWriter();
    writer.start();

    const readParticipants = (): unknown =>
      JSON.parse(readFileSync(artifactPath("participants.json"), "utf8"));

    dispatchAll(
      participantChange(MEETING_ID, "2024-01-01T00:00:00.000Z", [
        participant("a", "Alice"),
        participant("b", "Bob"),
      ]),
    );
    expect(readParticipants()).toEqual([
      { id: "a", name: "Alice" },
      { id: "b", name: "Bob" },
    ]);

    dispatchAll(
      participantChange(
        MEETING_ID,
        "2024-01-01T00:00:05.000Z",
        [participant("c", "Carol")],
        [participant("a", "Alice")],
      ),
    );
    // Full snapshot: Bob remains, Alice was removed, Carol was added.
    expect(readParticipants()).toEqual([
      { id: "b", name: "Bob" },
      { id: "c", name: "Carol" },
    ]);

    await writer.stop();
  });
});

describe("MeetStorageWriter meta.json", () => {
  test("lifecycle:left writes meta.json with aggregate counters", async () => {
    const writer = newWriter();
    writer.start();

    dispatchAll(
      participantChange(MEETING_ID, "2024-01-01T00:00:00.000Z", [
        participant("a", "Alice"),
        participant("b", "Bob"),
      ]),
      transcriptChunk(MEETING_ID, "2024-01-01T00:00:01.000Z", "hello", {
        isFinal: true,
      }),
      transcriptChunk(MEETING_ID, "2024-01-01T00:00:02.000Z", "world!!", {
        isFinal: true,
      }),
      lifecycleLeft(MEETING_ID, "2024-01-01T00:00:30.000Z"),
    );

    // meta.json is written on lifecycle:left, not stop()
    const meta = JSON.parse(readFileSync(artifactPath("meta.json"), "utf8"));
    expect(meta.meetingId).toBe(MEETING_ID);
    expect(meta.participantCount).toBe(2);
    expect(meta.totalTranscriptChars).toBe("hello".length + "world!!".length);
    expect(meta.endedAt).toBe("2024-01-01T00:00:30.000Z");
    expect(typeof meta.startedAt).toBe("string");

    await writer.stop();
  });
});

describe("MeetStorageWriter audio pipeline (mocked spawn)", () => {
  test("startAudio spawns ffmpeg with expected args and pipes PCM bytes", async () => {
    const { spawn, lastChild, calls } = makeSpawnMock();
    const pcm = makeTestPcmSource();

    const writer = newWriter({ spawn: asNodeSpawn(spawn) });
    writer.start();
    await writer.startAudio(pcm.source);

    // Exactly one spawn; argv ends in the resolved audio.opus path.
    expect(calls()).toHaveLength(1);
    const call = calls()[0];
    expect(call.cmd).toBe("ffmpeg");
    const argv = call.args;
    expect(argv[argv.length - 1]).toBe(artifactPath("audio.opus"));
    for (const expectedArg of ["-f", "s16le", "pipe:0", "libopus"]) {
      expect(argv).toContain(expectedArg);
    }

    // Push PCM — the mock child records bytes.
    pcm.push(new Uint8Array([1, 2, 3, 4]));
    pcm.push(new Uint8Array([5, 6]));

    const child = lastChild();
    expect(child).not.toBeNull();
    const received = Buffer.concat(child!.stdin.chunks);
    expect(received.equals(Buffer.from([1, 2, 3, 4, 5, 6]))).toBe(true);

    // stop() closes ffmpeg stdin and drops the pcm subscription.
    await writer.stop();
    expect(child!.stdin.ended).toBe(true);
    expect(pcm.subscribers).toBe(0);
  });

  test("lifecycle:left closes ffmpeg stdin even without stop()", async () => {
    const { spawn, lastChild } = makeSpawnMock();
    const pcm = makeTestPcmSource();

    const writer = newWriter({ spawn: asNodeSpawn(spawn) });
    writer.start();
    await writer.startAudio(pcm.source);

    dispatchAll(lifecycleLeft(MEETING_ID, "2024-01-01T00:00:00.000Z"));

    expect(lastChild()!.stdin.ended).toBe(true);

    await writer.stop();
  });

  test("startAudio is a no-op after the first spawn", async () => {
    const { spawn } = makeSpawnMock();
    const pcm = makeTestPcmSource();

    const writer = newWriter({ spawn: asNodeSpawn(spawn) });
    writer.start();
    await writer.startAudio(pcm.source);
    await writer.startAudio(pcm.source);

    expect(spawn).toHaveBeenCalledTimes(1);

    await writer.stop();
  });
});

describe("MeetStorageWriter fsync cadence", () => {
  test("fsyncs after FSYNC_WRITE_THRESHOLD writes", async () => {
    // Frozen clock so only the write-count threshold can trigger fsync.
    const now = () => 0;
    const { fsyncSyncMock, fs } = makeFsyncMock();
    const writer = newWriter({ now, fs });
    writer.start();

    // Derive timestamps from a base instant so every one is a valid ISO-8601
    // string even once the counter passes 59 seconds.
    const baseMs = Date.UTC(2024, 0, 1);
    for (let i = 0; i < FSYNC_WRITE_THRESHOLD; i++) {
      dispatchAll(
        transcriptChunk(
          MEETING_ID,
          new Date(baseMs + i * 1_000).toISOString(),
          "x",
          { isFinal: true },
        ),
      );
    }
    // At threshold, at least one fsync should have been triggered by the
    // write-count path (on the transcript fd).
    expect(fsyncSyncMock.mock.calls.length).toBeGreaterThanOrEqual(1);

    await writer.stop();
  });

  test("fsyncs when FSYNC_INTERVAL_MS elapses between writes", async () => {
    let t = 0;
    const now = () => t;
    const { fsyncSyncMock, fs } = makeFsyncMock();
    const writer = newWriter({ now, fs });
    writer.start();

    // First write establishes the fd; lastFlushAtMs is set to 0.
    dispatchAll(
      transcriptChunk(MEETING_ID, "2024-01-01T00:00:00.000Z", "x", {
        isFinal: true,
      }),
    );
    const beforeJump = fsyncSyncMock.mock.calls.length;

    // Jump the clock past the interval and write again — must trigger fsync.
    t = FSYNC_INTERVAL_MS + 1;
    dispatchAll(
      transcriptChunk(MEETING_ID, "2024-01-01T00:00:10.000Z", "y", {
        isFinal: true,
      }),
    );
    expect(fsyncSyncMock.mock.calls.length).toBeGreaterThan(beforeJump);

    await writer.stop();
  });
});

describe("MeetStorageWriter error resilience", () => {
  test("events to an unrelated meetingId never reach this writer", async () => {
    const writer = newWriter();
    writer.start();

    // Dispatch to a different meeting; this test registers no handler for
    // "m2", and the assertion below checks nothing reached the "m1" writer.
    getMeetSessionEventRouter().dispatch(
      "m2",
      transcriptChunk("m2", "2024-01-01T00:00:00.000Z", "nope", {
        isFinal: true,
      }),
    );

    await writer.stop();

    expect(existsSync(artifactPath("transcript.jsonl"))).toBe(false);
  });

  test("stop() is idempotent", async () => {
    const writer = newWriter();
    writer.start();
    await writer.stop();
    await writer.stop();
    expect(getMeetSessionEventRouter().registeredCount()).toBe(0);
  });
});
// ---------------------------------------------------------------------------
// Tuning knobs
// ---------------------------------------------------------------------------

/**
 * Time-based flush threshold, in milliseconds.
 *
 * The check runs only when a line is appended: a write that lands at least
 * this long after the previous flush of the same file triggers an fsync.
 * There is no background timer, so an idle file is not flushed until its
 * next write (or until the writer is stopped).
 */
export const FSYNC_INTERVAL_MS = 5_000;

/**
 * Count-based flush threshold: an fsync is forced once this many writes
 * have hit the same file since its last flush.
 */
export const FSYNC_WRITE_THRESHOLD = 100;

/**
 * ffmpeg arguments that encode the raw s16le@16kHz mono PCM stream flowing
 * in on stdin to a 48 kbps Opus file. The output path is NOT part of this
 * list — the caller appends it as the final argument. `-y` overwrites an
 * existing file; a previous recording for the same meeting is not expected
 * to exist, but the flag keeps a re-run from blocking on ffmpeg's
 * overwrite prompt.
 */
export const FFMPEG_AUDIO_ARGS = [
  "-f",
  "s16le",
  "-ar",
  "16000",
  "-ac",
  "1",
  "-i",
  "pipe:0",
  "-c:a",
  "libopus",
  "-b:a",
  "48k",
  "-y",
] as const;

// ---------------------------------------------------------------------------
// Types
// ---------------------------------------------------------------------------

/**
 * Audio source that emits raw s16le@16kHz mono PCM chunks. The consumer
 * registers a callback via `subscribe`; the returned function cancels that
 * registration and is invoked by the writer when it shuts audio down.
 */
export interface PcmSource {
  subscribe(cb: (bytes: Uint8Array) => void): () => void;
}

/** Spawn primitive — `node:child_process#spawn` by default; swappable in tests. */
export type SpawnFn = typeof nodeSpawn;

/** fs primitives the writer relies on. All swappable in tests. */
export interface FsPrimitives {
  mkdirSync: typeof mkdirSync;
  openSync: typeof openSync;
  writeSync: typeof writeSync;
  closeSync: typeof closeSync;
  fsyncSync: typeof fsyncSync;
  writeFileSync: typeof writeFileSync;
}

/** Optional dependency overrides accepted by the `MeetStorageWriter` constructor. */
export interface MeetStorageWriterDeps {
  /** Override workspace directory resolution (tests). */
  getWorkspaceDir?: () => string;
  /** Override the `spawn` used to launch ffmpeg (tests). */
  spawn?: SpawnFn;
  /** Override individual fs primitives; unspecified ones keep the real implementation. */
  fs?: Partial<FsPrimitives>;
  /**
   * Millisecond clock used only for flush scheduling. Defaults to
   * `Date.now`, i.e. wall-clock time rather than a monotonic source.
   */
  now?: () => number;
  /** Override the event router lookup (tests). */
  getRouter?: () => ReturnType<typeof getMeetSessionEventRouter>;
}
/**
 * One closed speaker span, exactly as serialized into `segments.jsonl`.
 * Both timestamps are always concrete here: a span that is still open is
 * tracked as an {@link OpenSegment} and only becomes a `SegmentLine` at the
 * moment it is closed and written.
 */
interface SegmentLine {
  start: string;
  end: string;
  speakerId: string;
  speakerName: string;
}

/** A speaker span opened by `speaker.change` that has not been closed yet. */
interface OpenSegment {
  start: string;
  speakerId: string;
  speakerName: string;
}

/** Flush bookkeeping for one append-mode file descriptor. */
interface AppendFdState {
  fd: number;
  /** Writes issued since the last fsync of this fd. */
  writesSinceFlush: number;
  /** `deps.now()` reading taken at open time or at the last fsync. */
  lastFlushAtMs: number;
}

// ---------------------------------------------------------------------------
// Implementation
// ---------------------------------------------------------------------------

const DEFAULT_FS: FsPrimitives = {
  mkdirSync,
  openSync,
  writeSync,
  closeSync,
  fsyncSync,
  writeFileSync,
};

/**
 * Persists the artifacts of a single meeting under
 * `<workspace>/meets/<meetingId>/`.
 *
 * Lifecycle: construct → {@link start} (registers with the event router) →
 * optionally {@link startAudio} → {@link stop}. A `lifecycle` event with
 * state `"left"` additionally closes the open speaker span, writes
 * `meta.json` and shuts the audio pipeline down without waiting for `stop`.
 */
export class MeetStorageWriter {
  readonly meetingId: string;
  readonly meetingDir: string;

  private readonly deps: Required<MeetStorageWriterDeps>;
  private readonly fs: FsPrimitives;

  // Append-mode descriptors, opened lazily on the first write to each file.
  private segmentsFd: AppendFdState | null = null;
  private transcriptFd: AppendFdState | null = null;

  private openSegment: OpenSegment | null = null;
  private participants: Participant[] = [];
  private totalTranscriptChars = 0;
  private startedAt: string | null = null;
  private endedAt: string | null = null;

  private ffmpegChild: ChildProcessWithoutNullStreams | null = null;
  // Stays true after the child is released so a late startAudio() can never
  // spawn a second ffmpeg that would overwrite audio.opus (`-y`).
  private audioStarted = false;
  private pcmUnsubscribe: (() => void) | null = null;

  private routerHandler: ((event: MeetBotEvent) => void) | null = null;
  private routerRegistered = false;

  private stopped = false;

  /**
   * @param meetingId Non-empty meeting identifier; also used as the
   *   directory name under `<workspace>/meets/`.
   * @param deps Optional overrides for the workspace lookup, `spawn`, fs
   *   primitives, flush clock and router lookup.
   * @throws Error if `meetingId` is empty.
   */
  constructor(meetingId: string, deps: MeetStorageWriterDeps = {}) {
    if (!meetingId) {
      throw new Error("MeetStorageWriter: meetingId is required");
    }
    this.meetingId = meetingId;
    this.deps = {
      getWorkspaceDir: deps.getWorkspaceDir ?? getWorkspaceDir,
      spawn: deps.spawn ?? nodeSpawn,
      fs: deps.fs ?? {},
      now: deps.now ?? Date.now,
      getRouter: deps.getRouter ?? getMeetSessionEventRouter,
    };
    // Partial overrides are layered over the real primitives.
    this.fs = { ...DEFAULT_FS, ...(deps.fs ?? {}) };
    this.meetingDir = join(
      this.deps.getWorkspaceDir(),
      "meets",
      this.meetingId,
    );
  }

  /**
   * Create the meeting directory and subscribe to the per-meeting event
   * router. Idempotent. Callers should invoke this once, immediately after
   * construction, so the writer catches the very first events of the
   * session. Calling it after {@link stop} is ignored (with a warning):
   * a stopped writer drops every event, and its registration could never be
   * removed again because `stop` is one-shot.
   */
  start(): void {
    if (this.routerRegistered) return;
    if (this.stopped) {
      log.warn(
        { meetingId: this.meetingId },
        "MeetStorageWriter: start() ignored after stop()",
      );
      return;
    }
    this.ensureMeetingDir();
    if (!this.startedAt) this.startedAt = new Date().toISOString();
    const handler = (event: MeetBotEvent) => this.onEvent(event);
    this.routerHandler = handler;
    this.deps.getRouter().register(this.meetingId, handler);
    this.routerRegistered = true;
  }

  /**
   * Start encoding audio. Spawns ffmpeg (s16le → Opus) and pipes PCM
   * callbacks from `pcmSource` into its stdin. Safe to call multiple times;
   * subsequent calls are no-ops after the first successful spawn, including
   * after the audio pipeline has been shut down by `lifecycle:left`.
   *
   * @throws Error if called after {@link stop}.
   */
  async startAudio(pcmSource: PcmSource): Promise<void> {
    if (this.stopped) {
      throw new Error("MeetStorageWriter: cannot startAudio after stop()");
    }
    if (this.audioStarted) return;

    this.ensureMeetingDir();

    const audioPath = join(this.meetingDir, "audio.opus");
    const args = [...FFMPEG_AUDIO_ARGS, audioPath];

    const child = this.deps.spawn("ffmpeg", args, {
      stdio: ["pipe", "pipe", "pipe"],
    }) as ChildProcessWithoutNullStreams;
    this.audioStarted = true;

    child.on("error", (err) => {
      log.error(
        { err, meetingId: this.meetingId },
        "ffmpeg spawn/runtime error",
      );
    });
    child.on("exit", (code, signal) => {
      log.info(
        { meetingId: this.meetingId, code, signal },
        "ffmpeg exited",
      );
    });
    child.stderr?.on("data", (chunk: Buffer) => {
      // ffmpeg writes progress to stderr; keep at debug so prod logs stay
      // clean but debugging is possible if needed.
      log.debug(
        { meetingId: this.meetingId, stderr: chunk.toString("utf8") },
        "ffmpeg stderr",
      );
    });
    // Pipe failures (EPIPE when ffmpeg dies, write-after-end) surface as
    // 'error' events on stdin, not as exceptions from write(); without a
    // listener they would be thrown as uncaught errors. The typeof guard
    // tolerates injected test doubles whose stdin is not an emitter.
    if (typeof child.stdin?.on === "function") {
      child.stdin.on("error", (err) => {
        log.warn(
          { err, meetingId: this.meetingId },
          "ffmpeg stdin error; detaching PCM source",
        );
        this.detachPcmSource();
      });
    }

    this.ffmpegChild = child;

    this.pcmUnsubscribe = pcmSource.subscribe((bytes) => {
      this.writeAudio(bytes);
    });
  }

  /**
   * Close the open segment, unsubscribe from the router, flush and close
   * the append files, and shut the audio pipeline down. Idempotent:
   * subsequent calls are no-ops.
   *
   * The method only ends ffmpeg's stdin; it does not wait for the ffmpeg
   * process to exit.
   */
  async stop(): Promise<void> {
    if (this.stopped) return;
    this.stopped = true;

    try {
      try {
        this.closeOpenSegmentAt(new Date().toISOString());
      } catch (err) {
        // Losing the final span must not prevent the rest of the teardown.
        log.error(
          { err, meetingId: this.meetingId },
          "failed to close open segment during stop",
        );
      }

      if (this.routerRegistered) {
        this.routerRegistered = false;
        this.routerHandler = null;
        this.deps.getRouter().unregister(this.meetingId);
      }
    } finally {
      // `stopped` is already set, so stop() cannot be retried — release the
      // descriptors and the ffmpeg pipe no matter what happened above.
      this.flushAndCloseFd(this.segmentsFd);
      this.segmentsFd = null;
      this.flushAndCloseFd(this.transcriptFd);
      this.transcriptFd = null;
      this.closeFfmpegStdin();
    }
  }

  // -------------------------------------------------------------------------
  // Event handling
  // -------------------------------------------------------------------------

  /**
   * Router callback. Dispatches on `event.type`; any exception from a
   * handler is logged and swallowed so it does not propagate to the router.
   * Events arriving after `stop()` are ignored.
   */
  private onEvent(event: MeetBotEvent): void {
    if (this.stopped) return;
    try {
      switch (event.type) {
        case "transcript.chunk":
          this.onTranscriptChunk(event);
          break;
        case "speaker.change":
          this.onSpeakerChange(event);
          break;
        case "participant.change":
          this.onParticipantChange(event);
          break;
        case "lifecycle":
          if (event.state === "left") {
            this.endedAt = event.timestamp;
            this.closeOpenSegmentAt(event.timestamp);
            this.writeMetaJson();
            this.closeFfmpegStdin();
          }
          break;
        // chat.inbound is not persisted by the storage writer — conversation
        // bridge (PR 17) handles chat surface. Drop silently.
        default:
          break;
      }
    } catch (err) {
      log.error(
        { err, meetingId: this.meetingId, eventType: event.type },
        "MeetStorageWriter: handler threw",
      );
    }
  }

  /**
   * Append a final transcript chunk to `transcript.jsonl` and add its
   * length to the running character total. Interim chunks are ignored.
   * Optional fields are emitted only when defined.
   */
  private onTranscriptChunk(event: TranscriptChunkEvent): void {
    if (!event.isFinal) return;
    const line: Record<string, unknown> = {
      timestamp: event.timestamp,
      text: event.text,
    };
    if (event.speakerId !== undefined) line.speakerId = event.speakerId;
    if (event.speakerLabel !== undefined) line.speakerLabel = event.speakerLabel;
    if (event.confidence !== undefined) line.confidence = event.confidence;
    this.appendJsonl("transcript", line);
    this.totalTranscriptChars += event.text.length;
  }

  /**
   * Close the previous span at this event's timestamp, then open a new one
   * starting at the same timestamp. Back-to-back events for the same
   * speaker still produce a closed+opened pair — the sequence of received
   * events is the source of truth for spans.
   */
  private onSpeakerChange(event: SpeakerChangeEvent): void {
    this.closeOpenSegmentAt(event.timestamp);
    this.openSegment = {
      start: event.timestamp,
      speakerId: event.speakerId,
      speakerName: event.speakerName,
    };
  }

  /**
   * Fold the event's `joined`/`left` arrays into the running snapshot and
   * rewrite `participants.json` in full. Entries are keyed by id, so a
   * repeated `joined` for a known id replaces that entry instead of
   * duplicating it; `left` is applied after `joined`.
   */
  private onParticipantChange(event: ParticipantChangeEvent): void {
    const byId = new Map<string, Participant>(
      this.participants.map((p) => [p.id, p]),
    );
    for (const p of event.joined) byId.set(p.id, p);
    for (const p of event.left) byId.delete(p.id);
    this.participants = Array.from(byId.values());
    this.writeParticipantsJson();
  }

  // -------------------------------------------------------------------------
  // File writing
  // -------------------------------------------------------------------------

  /** Create the meeting directory (and parents) if it does not exist. */
  private ensureMeetingDir(): void {
    this.fs.mkdirSync(this.meetingDir, { recursive: true });
  }

  /** Open `name` inside the meeting directory in append mode. */
  private openAppendFd(name: string): AppendFdState {
    this.ensureMeetingDir();
    const path = join(this.meetingDir, name);
    // "a" opens for appending and creates the file if it does not exist.
    const fd = this.fs.openSync(path, "a");
    return { fd, writesSinceFlush: 0, lastFlushAtMs: this.deps.now() };
  }

  /**
   * Serialize `line` as one JSON line and append it to the file for `kind`,
   * opening the descriptor on first use, then apply the flush policy.
   */
  private appendJsonl(
    kind: "segments" | "transcript",
    line: Record<string, unknown>,
  ): void {
    const filename = kind === "segments" ? "segments.jsonl" : "transcript.jsonl";
    const state =
      kind === "segments"
        ? (this.segmentsFd ??= this.openAppendFd(filename))
        : (this.transcriptFd ??= this.openAppendFd(filename));

    const data = Buffer.from(JSON.stringify(line) + "\n", "utf8");
    this.fs.writeSync(state.fd, data);
    state.writesSinceFlush += 1;
    this.maybeFlush(state);
  }

  /**
   * fsync `state.fd` when either threshold is met (write count or elapsed
   * time since the last flush). A failed fsync is logged and otherwise
   * ignored; the counters are reset either way.
   */
  private maybeFlush(state: AppendFdState): void {
    const now = this.deps.now();
    if (
      state.writesSinceFlush >= FSYNC_WRITE_THRESHOLD ||
      now - state.lastFlushAtMs >= FSYNC_INTERVAL_MS
    ) {
      try {
        this.fs.fsyncSync(state.fd);
      } catch (err) {
        log.warn(
          { err, meetingId: this.meetingId },
          "fsync failed (non-fatal)",
        );
      }
      state.writesSinceFlush = 0;
      state.lastFlushAtMs = now;
    }
  }

  /**
   * Final fsync + close of a descriptor. Both steps are attempted
   * independently and failures are logged, never thrown.
   */
  private flushAndCloseFd(state: AppendFdState | null): void {
    if (!state) return;
    try {
      this.fs.fsyncSync(state.fd);
    } catch (err) {
      log.warn(
        { err, meetingId: this.meetingId },
        "final fsync failed (non-fatal)",
      );
    }
    try {
      this.fs.closeSync(state.fd);
    } catch (err) {
      log.warn(
        { err, meetingId: this.meetingId },
        "fd close failed (non-fatal)",
      );
    }
  }

  /**
   * If a span is open, write it to `segments.jsonl` with `endTimestamp` as
   * its end and clear it. No-op when nothing is open.
   */
  private closeOpenSegmentAt(endTimestamp: string): void {
    const open = this.openSegment;
    if (!open) return;
    const line: SegmentLine = {
      start: open.start,
      end: endTimestamp,
      speakerId: open.speakerId,
      speakerName: open.speakerName,
    };
    this.appendJsonl("segments", { ...line });
    this.openSegment = null;
  }

  /** Overwrite `participants.json` with the full current snapshot. */
  private writeParticipantsJson(): void {
    this.ensureMeetingDir();
    const path = join(this.meetingDir, "participants.json");
    // Written with a single writeFileSync call rather than write-then-rename;
    // the blob is small and is rewritten in full on the next change.
    this.fs.writeFileSync(
      path,
      JSON.stringify(this.participants, null, 2) + "\n",
      "utf8",
    );
  }

  /**
   * Write `meta.json` from the counters collected so far. `startedAt` /
   * `endedAt` fall back to the current time if they were never recorded.
   */
  private writeMetaJson(): void {
    this.ensureMeetingDir();
    const path = join(this.meetingDir, "meta.json");
    const meta = {
      meetingId: this.meetingId,
      startedAt: this.startedAt ?? new Date().toISOString(),
      endedAt: this.endedAt ?? new Date().toISOString(),
      participantCount: this.participants.length,
      totalTranscriptChars: this.totalTranscriptChars,
    };
    this.fs.writeFileSync(path, JSON.stringify(meta, null, 2) + "\n", "utf8");
  }

  // -------------------------------------------------------------------------
  // Audio
  // -------------------------------------------------------------------------

  /**
   * Forward one PCM chunk to ffmpeg's stdin. The bytes are copied into a
   * new Buffer before writing. The return value of `write()` (backpressure)
   * is not consulted.
   */
  private writeAudio(bytes: Uint8Array): void {
    const child = this.ffmpegChild;
    if (!child) return;
    try {
      child.stdin.write(Buffer.from(bytes));
    } catch (err) {
      log.warn(
        { err, meetingId: this.meetingId },
        "ffmpeg stdin write failed",
      );
    }
  }

  /** Cancel the PCM subscription, if any. Errors are logged, not thrown. */
  private detachPcmSource(): void {
    const unsubscribe = this.pcmUnsubscribe;
    if (!unsubscribe) return;
    this.pcmUnsubscribe = null;
    try {
      unsubscribe();
    } catch (err) {
      log.warn(
        { err, meetingId: this.meetingId },
        "pcm unsubscribe threw",
      );
    }
  }

  /**
   * Shut the audio pipeline down: stop receiving PCM first, then release
   * the child reference and end its stdin so ffmpeg sees end-of-input.
   * Detaching before `end()` ensures nothing is written to an ended
   * stream. Safe to call repeatedly and when audio was never started.
   */
  private closeFfmpegStdin(): void {
    this.detachPcmSource();
    const child = this.ffmpegChild;
    if (!child) return;
    this.ffmpegChild = null;
    try {
      child.stdin?.end();
    } catch (err) {
      log.warn(
        { err, meetingId: this.meetingId },
        "ffmpeg stdin end failed",
      );
    }
  }
}