diff --git a/skills/meet-join/bot/Dockerfile b/skills/meet-join/bot/Dockerfile index 64993c1db15..b02d3471525 100644 --- a/skills/meet-join/bot/Dockerfile +++ b/skills/meet-join/bot/Dockerfile @@ -57,8 +57,8 @@ COPY skills/meet-join/contracts /app/contracts # Install JS dependencies first so Docker can cache the layer when only # source files change. -COPY skills/meet-join/bot/package.json skills/meet-join/bot/tsconfig.json ./ -RUN bun install +COPY skills/meet-join/bot/package.json skills/meet-join/bot/bun.lock skills/meet-join/bot/tsconfig.json ./ +RUN bun install --frozen-lockfile # Pull Playwright's Chromium build. This is kept separate from apt's # chromium install so the Playwright-controlled binary stays in sync with diff --git a/skills/meet-join/bot/src/browser/chat-reader.ts b/skills/meet-join/bot/src/browser/chat-reader.ts index 1920f7b9ca6..42e49bc1be3 100644 --- a/skills/meet-join/bot/src/browser/chat-reader.ts +++ b/skills/meet-join/bot/src/browser/chat-reader.ts @@ -95,22 +95,20 @@ export async function startChatReader( ): Promise { const bindingName = `__meetBotChatBridge_${++bindingCounter}`; - // Bot-side dedupe: `sender|text|timestampBucketSeconds`. A 1-second bucket - // tolerates clock-skew and millisecond jitter between the rendered - // timestamp and our DOM read, while still catching identical rapid-fire - // re-posts (which would be unusual in practice). - const seenKeys = new Set(); - const dedupeKey = (sender: string, text: string, tsMs: number): string => - `${sender}|${text}|${Math.floor(tsMs / 1000)}`; + // Bot-side dedupe keyed on `domId`. The in-page observer already tracks + // seen DOM IDs, but across panel close/reopen cycles the in-page set + // resets. Using `domId` here preserves legitimate repeated messages + // (same sender + text within the same second) while still preventing + // double-emit on re-observation. + const seenDomIds = new Set(); const handleRaw = (raw: RawChatMessage): void => { // Authoritative self-flag wins; otherwise match by display name. const isSelf = raw.isSelf || raw.fromName === opts.selfName; if (isSelf) return; - const key = dedupeKey(raw.fromName, raw.text, raw.timestampMs); - if (seenKeys.has(key)) return; - seenKeys.add(key); + if (seenDomIds.has(raw.domId)) return; + seenDomIds.add(raw.domId); const event: InboundChatEvent = { type: "chat.inbound", @@ -130,9 +128,6 @@ export async function startChatReader( } }; - // Ensure the chat panel is open. We treat the existence of a message-node - // *selector match* (even an empty list) as a signal that the panel is - // mounted. If the selector returns nothing, we click the toolbar toggle. await ensurePanelOpen(page); // Prefer the MutationObserver path; on any failure, fall back to polling. @@ -154,15 +149,15 @@ export async function startChatReader( } /** - * Click the chat toggle once if the panel isn't already open. Idempotent — - * if Meet renders the message-list regardless of panel visibility, the - * query-selector check short-circuits and we never click. + * Click the chat toggle once if the panel isn't already open. Detects open + * state via the message-list container (mounted even when empty), not + * individual message nodes which require at least one message to exist. */ async function ensurePanelOpen(page: Page): Promise { try { const alreadyOpen = await page.evaluate((sel) => { return document.querySelector(sel) !== null; - }, chatSelectors.MESSAGE_NODE); + }, chatSelectors.MESSAGE_LIST); if (alreadyOpen) return; const toggle = await page.$(chatSelectors.PANEL_BUTTON); diff --git a/skills/meet-join/bot/src/browser/dom-selectors.ts b/skills/meet-join/bot/src/browser/dom-selectors.ts index f74a8b1e7c4..c326fc24db2 100644 --- a/skills/meet-join/bot/src/browser/dom-selectors.ts +++ b/skills/meet-join/bot/src/browser/dom-selectors.ts @@ -99,6 +99,12 @@ export const chatSelectors = { /** Send button adjacent to the chat composer. */ SEND_BUTTON: 'button[aria-label="Send a message"]', + /** + * Container that holds the list of chat messages. Used to detect whether the + * chat panel is open (the container is mounted even when no messages exist). + */ + MESSAGE_LIST: '[role="list"][aria-label="Chat messages"]', + /** * Root node for a single rendered chat message. We use a data attribute * rather than a class because Meet's message-list classes change often. @@ -195,6 +201,7 @@ export const selectors = { INGAME_CHAT_PANEL_BUTTON: chatSelectors.PANEL_BUTTON, INGAME_CHAT_INPUT: chatSelectors.INPUT, INGAME_CHAT_SEND_BUTTON: chatSelectors.SEND_BUTTON, + INGAME_CHAT_MESSAGE_LIST: chatSelectors.MESSAGE_LIST, INGAME_CHAT_MESSAGE_NODE: chatSelectors.MESSAGE_NODE, INGAME_CHAT_MESSAGE_SENDER: chatSelectors.MESSAGE_SENDER, INGAME_CHAT_MESSAGE_TEXT: chatSelectors.MESSAGE_TEXT, diff --git a/skills/meet-join/bot/src/browser/participant-scraper.ts b/skills/meet-join/bot/src/browser/participant-scraper.ts index 8fa4155144f..a53f4252542 100644 --- a/skills/meet-join/bot/src/browser/participant-scraper.ts +++ b/skills/meet-join/bot/src/browser/participant-scraper.ts @@ -109,9 +109,21 @@ export function startParticipantScraper( let previous: Map = new Map(); let firstPollComplete = false; let stopped = false; + let pollInFlight = false; const poll = async (): Promise => { if (stopped) return; + if (pollInFlight) return; + pollInFlight = true; + try { + await pollInner(); + } finally { + pollInFlight = false; + } + }; + + const pollInner = async (): Promise => { + if (stopped) return; let rows: ScrapedRow[]; try { @@ -197,6 +209,7 @@ export function startParticipantScraper( firstPollComplete = true; if (joined.length === 0 && left.length === 0) return; + if (stopped) return; const event: ParticipantChangeEvent = { type: "participant.change", diff --git a/skills/meet-join/bot/src/browser/speaker-scraper.ts b/skills/meet-join/bot/src/browser/speaker-scraper.ts index 51611ed3ccc..acd96492eea 100644 --- a/skills/meet-join/bot/src/browser/speaker-scraper.ts +++ b/skills/meet-join/bot/src/browser/speaker-scraper.ts @@ -161,10 +161,9 @@ export function startSpeakerScraper( // Swallow — polling fallback covers us. }); - // Install the MutationObserver. Guarded by a try/catch because the page - // may have closed between the caller obtaining the handle and us - // reaching this line. - void page + // Install the MutationObserver. Track the promise so stop() can wait for + // it and tear down any late-installed observer. + const observerInstalled = page .evaluate( ({ selector, callbackName, observerName }) => { // Skip if somehow already installed (e.g. hot reload, duplicate @@ -234,6 +233,25 @@ export function startSpeakerScraper( // Swallow — the polling fallback still emits transitions. }); + // If stop() was called while the observer was being installed, tear + // down the late observer immediately. + void observerInstalled.then(() => { + if (stopped && !page.isClosed()) { + void page + .evaluate((observerName) => { + const w = window as unknown as Record; + const observer = w[observerName] as + | { disconnect: () => void } + | undefined; + if (observer && typeof observer.disconnect === "function") { + observer.disconnect(); + } + delete w[observerName]; + }, observerGlobal) + .catch(() => {}); + } + }); + // ----- Fallback path: Node-side polling of the same selector ----- const pollOnce = async (): Promise => { diff --git a/skills/meet-join/bot/src/browser/xvfb.ts b/skills/meet-join/bot/src/browser/xvfb.ts index dd6baa61244..4bd02463106 100644 --- a/skills/meet-join/bot/src/browser/xvfb.ts +++ b/skills/meet-join/bot/src/browser/xvfb.ts @@ -21,7 +21,7 @@ */ import type { Subprocess } from "bun"; -import { existsSync } from "node:fs"; +import { existsSync, readFileSync, unlinkSync } from "node:fs"; /** Opaque handle returned by `startXvfb`, consumed by `stopXvfb`. */ export interface XvfbHandle { @@ -61,6 +61,25 @@ function lockFilePath(displayIndex: number): string { return `/tmp/.X${displayIndex}-lock`; } +function parseLockPid(lockPath: string): number | null { + try { + const content = readFileSync(lockPath, "utf8").trim(); + const pid = Number.parseInt(content, 10); + return Number.isFinite(pid) && pid > 0 ? pid : null; + } catch { + return null; + } +} + +function isProcessAlive(pid: number): boolean { + try { + process.kill(pid, 0); + return true; + } catch { + return false; + } +} + async function sleep(ms: number): Promise { await new Promise((resolve) => setTimeout(resolve, ms)); } @@ -76,13 +95,22 @@ export async function startXvfb(display = ":99"): Promise { const lockPath = lockFilePath(displayIndex); if (existsSync(lockPath)) { - // Another process already owns this display; don't fight it. Returning a - // handle with `process: null` keeps the call idempotent — callers can - // still call `stopXvfb` unconditionally without tracking who started what. - return { display, process: null }; + // Verify the lock holder is still alive. If Xvfb died uncleanly its + // lock file lingers and prevents respawning. + const pid = parseLockPid(lockPath); + if (pid !== null && isProcessAlive(pid)) { + return { display, process: null }; + } + // Stale lock — remove it so we can respawn. + try { + unlinkSync(lockPath); + } catch { + // Race with another cleanup; fine. + } } - const proc = Bun.spawn(["Xvfb", display, "-screen", "0", "1280x720x24"], { + const canonicalDisplay = `:${displayIndex}`; + const proc = Bun.spawn(["Xvfb", canonicalDisplay, "-screen", "0", "1280x720x24"], { stdin: "ignore", stdout: "ignore", stderr: "pipe", @@ -91,7 +119,7 @@ export async function startXvfb(display = ":99"): Promise { const deadline = Date.now() + LOCK_WAIT_TIMEOUT_MS; while (Date.now() < deadline) { if (existsSync(lockPath)) { - return { display, process: proc }; + return { display: canonicalDisplay, process: proc }; } // If Xvfb died during startup, bail out with a useful error instead of // spinning until the timeout. diff --git a/skills/meet-join/bot/src/media/audio-capture.ts b/skills/meet-join/bot/src/media/audio-capture.ts index c16eaa32db7..df69d9024db 100644 --- a/skills/meet-join/bot/src/media/audio-capture.ts +++ b/skills/meet-join/bot/src/media/audio-capture.ts @@ -147,7 +147,7 @@ function defaultSpawn(argv: readonly string[]): SpawnedParec { const proc: Subprocess = Bun.spawn(argv as string[], { stdin: "ignore", stdout: "pipe", - stderr: "pipe", + stderr: "inherit", }); return { stdout: proc.stdout as ReadableStream | null, @@ -247,6 +247,7 @@ export async function startAudioCapture( async function runOneAttempt(): Promise<{ outcome: AttemptOutcome; error?: Error; + hadData: boolean; }> { let attemptError: Error | undefined; @@ -258,6 +259,7 @@ export async function startAudioCapture( return { outcome: "parec", error: err instanceof Error ? err : new Error(String(err)), + hadData: false, }; } currentProc = proc; @@ -276,6 +278,7 @@ export async function startAudioCapture( return { outcome: "socket", error: err instanceof Error ? err : new Error(String(err)), + hadData: false, }; } currentSocket = sock; @@ -305,7 +308,8 @@ export async function startAudioCapture( // 4. Pipe parec.stdout through the frame chunker into the socket. // We deliberately don't `await` the pump — it races against the three // promises above and terminates when any of them settles. - const pumpDone = pumpFrames(proc.stdout, sock, frameBytes, () => stopping); + let framesWritten = false; + const pumpDone = pumpFrames(proc.stdout, sock, frameBytes, () => stopping, () => { framesWritten = true; }); const raceOutcome = await Promise.race([ stoppedP, @@ -354,9 +358,9 @@ export async function startAudioCapture( currentSocket = null; if (outcome === "stopped") { - return { outcome: "stopped" }; + return { outcome: "stopped", hadData: framesWritten }; } - return { outcome, error: attemptError }; + return { outcome, error: attemptError, hadData: framesWritten }; } /** @@ -368,13 +372,18 @@ export async function startAudioCapture( let consecutiveFailures = 0; while (!stopping) { - const { outcome, error } = await runOneAttempt(); + const { outcome, error, hadData } = await runOneAttempt(); if (outcome === "stopped") { break; } - // Any non-stop outcome counts as a failure toward the budget. + // Reset the failure counter if this attempt successfully transferred + // data — the pipeline was healthy for a while before it broke. + if (hadData) { + consecutiveFailures = 0; + } + consecutiveFailures += 1; if (consecutiveFailures > MAX_RECONNECT_ATTEMPTS) { fatalError = @@ -463,6 +472,7 @@ async function pumpFrames( sock: CapturedSocket, frameBytes: number, isStopping: () => boolean, + onFrame?: () => void, ): Promise { if (!stdout) return; const reader = stdout.getReader(); @@ -491,6 +501,7 @@ async function pumpFrames( buffer = buffer.slice(frameBytes); try { sock.write(frame); + onFrame?.(); } catch { // Socket write failure aborts the pump; the outer attempt loop // will pick it up via the socket's `error`/`close` handlers. diff --git a/skills/meet-join/daemon/storage-writer.ts b/skills/meet-join/daemon/storage-writer.ts index 88de36041f7..48128e1b9eb 100644 --- a/skills/meet-join/daemon/storage-writer.ts +++ b/skills/meet-join/daemon/storage-writer.ts @@ -268,13 +268,17 @@ export class MeetStorageWriter { log.info({ meetingId: this.meetingId, code, signal }, "ffmpeg exited"); }); child.stderr?.on("data", (chunk: Buffer) => { - // ffmpeg writes progress to stderr; keep at debug so prod logs stay - // clean but debugging is possible if needed. log.debug( { meetingId: this.meetingId, stderr: chunk.toString("utf8") }, "ffmpeg stderr", ); }); + child.stdin.on("error", (err) => { + log.debug( + { err, meetingId: this.meetingId }, + "ffmpeg stdin error (suppressed)", + ); + }); this.ffmpegChild = child; @@ -289,10 +293,14 @@ export class MeetStorageWriter { */ async stop(): Promise { if (this.stopped) return; - this.stopped = true; + // Perform fallible I/O cleanup before setting stopped=true so that if + // this throws, a retry call to stop() won't short-circuit and leak + // resources. this.closeOpenSegmentAt(new Date().toISOString()); + this.stopped = true; + if (this.eventUnsubscribe) { try { this.eventUnsubscribe(); @@ -361,8 +369,7 @@ export class MeetStorageWriter { this.closeFfmpegStdin(); } break; - // chat.inbound is not persisted by the storage writer — conversation - // bridge (PR 17) handles chat surface. Drop silently. + // chat.inbound is handled by the conversation bridge. Drop silently. default: break; } @@ -544,6 +551,7 @@ export class MeetStorageWriter { private closeFfmpegStdin(): void { const child = this.ffmpegChild; if (!child) return; + this.ffmpegChild = null; try { child.stdin?.end(); } catch (err) {