Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions skills/meet-join/bot/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -57,8 +57,8 @@ COPY skills/meet-join/contracts /app/contracts

# Install JS dependencies first so Docker can cache the layer when only
# source files change.
COPY skills/meet-join/bot/package.json skills/meet-join/bot/tsconfig.json ./
RUN bun install
COPY skills/meet-join/bot/package.json skills/meet-join/bot/bun.lock skills/meet-join/bot/tsconfig.json ./
RUN bun install --frozen-lockfile

# Pull Playwright's Chromium build. This is kept separate from apt's
# chromium install so the Playwright-controlled binary stays in sync with
Expand Down
29 changes: 12 additions & 17 deletions skills/meet-join/bot/src/browser/chat-reader.ts
Original file line number Diff line number Diff line change
Expand Up @@ -95,22 +95,20 @@ export async function startChatReader(
): Promise<ChatReader> {
const bindingName = `__meetBotChatBridge_${++bindingCounter}`;

// Bot-side dedupe: `sender|text|timestampBucketSeconds`. A 1-second bucket
// tolerates clock-skew and millisecond jitter between the rendered
// timestamp and our DOM read, while still catching identical rapid-fire
// re-posts (which would be unusual in practice).
const seenKeys = new Set<string>();
const dedupeKey = (sender: string, text: string, tsMs: number): string =>
`${sender}|${text}|${Math.floor(tsMs / 1000)}`;
// Bot-side dedupe keyed on `domId`. The in-page observer already tracks
// seen DOM IDs, but across panel close/reopen cycles the in-page set
// resets. Using `domId` here preserves legitimate repeated messages
// (same sender + text within the same second) while still preventing
// double-emit on re-observation.
const seenDomIds = new Set<string>();

const handleRaw = (raw: RawChatMessage): void => {
// Authoritative self-flag wins; otherwise match by display name.
const isSelf = raw.isSelf || raw.fromName === opts.selfName;
if (isSelf) return;

const key = dedupeKey(raw.fromName, raw.text, raw.timestampMs);
if (seenKeys.has(key)) return;
seenKeys.add(key);
if (seenDomIds.has(raw.domId)) return;
seenDomIds.add(raw.domId);

const event: InboundChatEvent = {
type: "chat.inbound",
Expand All @@ -130,9 +128,6 @@ export async function startChatReader(
}
};

// Ensure the chat panel is open. We treat the existence of a message-node
// *selector match* (even an empty list) as a signal that the panel is
// mounted. If the selector returns nothing, we click the toolbar toggle.
await ensurePanelOpen(page);

// Prefer the MutationObserver path; on any failure, fall back to polling.
Expand All @@ -154,15 +149,15 @@ export async function startChatReader(
}

/**
* Click the chat toggle once if the panel isn't already open. Idempotent —
* if Meet renders the message-list regardless of panel visibility, the
* query-selector check short-circuits and we never click.
* Click the chat toggle once if the panel isn't already open. Detects open
* state via the message-list container (mounted even when empty), not
* individual message nodes which require at least one message to exist.
*/
async function ensurePanelOpen(page: Page): Promise<void> {
try {
const alreadyOpen = await page.evaluate((sel) => {
return document.querySelector(sel) !== null;
}, chatSelectors.MESSAGE_NODE);
}, chatSelectors.MESSAGE_LIST);
if (alreadyOpen) return;

const toggle = await page.$(chatSelectors.PANEL_BUTTON);
Expand Down
7 changes: 7 additions & 0 deletions skills/meet-join/bot/src/browser/dom-selectors.ts
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,12 @@ export const chatSelectors = {
/** Send button adjacent to the chat composer. */
SEND_BUTTON: 'button[aria-label="Send a message"]',

/**
* Container that holds the list of chat messages. Used to detect whether the
* chat panel is open (the container is mounted even when no messages exist).
*/
MESSAGE_LIST: '[role="list"][aria-label="Chat messages"]',

/**
* Root node for a single rendered chat message. We use a data attribute
* rather than a class because Meet's message-list classes change often.
Expand Down Expand Up @@ -195,6 +201,7 @@ export const selectors = {
INGAME_CHAT_PANEL_BUTTON: chatSelectors.PANEL_BUTTON,
INGAME_CHAT_INPUT: chatSelectors.INPUT,
INGAME_CHAT_SEND_BUTTON: chatSelectors.SEND_BUTTON,
INGAME_CHAT_MESSAGE_LIST: chatSelectors.MESSAGE_LIST,
INGAME_CHAT_MESSAGE_NODE: chatSelectors.MESSAGE_NODE,
INGAME_CHAT_MESSAGE_SENDER: chatSelectors.MESSAGE_SENDER,
INGAME_CHAT_MESSAGE_TEXT: chatSelectors.MESSAGE_TEXT,
Expand Down
13 changes: 13 additions & 0 deletions skills/meet-join/bot/src/browser/participant-scraper.ts
Original file line number Diff line number Diff line change
Expand Up @@ -109,9 +109,21 @@ export function startParticipantScraper(
let previous: Map<string, Participant> = new Map();
let firstPollComplete = false;
let stopped = false;
let pollInFlight = false;

const poll = async (): Promise<void> => {
if (stopped) return;
if (pollInFlight) return;
pollInFlight = true;
try {
await pollInner();
} finally {
pollInFlight = false;
}
};

const pollInner = async (): Promise<void> => {
if (stopped) return;

let rows: ScrapedRow[];
try {
Expand Down Expand Up @@ -197,6 +209,7 @@ export function startParticipantScraper(
firstPollComplete = true;

if (joined.length === 0 && left.length === 0) return;
if (stopped) return;

const event: ParticipantChangeEvent = {
type: "participant.change",
Expand Down
26 changes: 22 additions & 4 deletions skills/meet-join/bot/src/browser/speaker-scraper.ts
Original file line number Diff line number Diff line change
Expand Up @@ -161,10 +161,9 @@ export function startSpeakerScraper(
// Swallow — polling fallback covers us.
});

// Install the MutationObserver. Guarded by a try/catch because the page
// may have closed between the caller obtaining the handle and us
// reaching this line.
void page
// Install the MutationObserver. Track the promise so stop() can wait for
// it and tear down any late-installed observer.
const observerInstalled = page
.evaluate(
({ selector, callbackName, observerName }) => {
// Skip if somehow already installed (e.g. hot reload, duplicate
Expand Down Expand Up @@ -234,6 +233,25 @@ export function startSpeakerScraper(
// Swallow — the polling fallback still emits transitions.
});

// If stop() was called while the observer was being installed, tear
// down the late observer immediately.
void observerInstalled.then(() => {
if (stopped && !page.isClosed()) {
void page
.evaluate((observerName) => {
const w = window as unknown as Record<string, unknown>;
const observer = w[observerName] as
| { disconnect: () => void }
| undefined;
if (observer && typeof observer.disconnect === "function") {
observer.disconnect();
}
delete w[observerName];
}, observerGlobal)
.catch(() => {});
}
});

// ----- Fallback path: Node-side polling of the same selector -----

const pollOnce = async (): Promise<void> => {
Expand Down
42 changes: 35 additions & 7 deletions skills/meet-join/bot/src/browser/xvfb.ts
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
*/

import type { Subprocess } from "bun";
import { existsSync } from "node:fs";
import { existsSync, readFileSync, unlinkSync } from "node:fs";

/** Opaque handle returned by `startXvfb`, consumed by `stopXvfb`. */
export interface XvfbHandle {
Expand Down Expand Up @@ -61,6 +61,25 @@ function lockFilePath(displayIndex: number): string {
return `/tmp/.X${displayIndex}-lock`;
}

function parseLockPid(lockPath: string): number | null {
try {
const content = readFileSync(lockPath, "utf8").trim();
const pid = Number.parseInt(content, 10);
return Number.isFinite(pid) && pid > 0 ? pid : null;
} catch {
return null;
}
}

function isProcessAlive(pid: number): boolean {
try {
process.kill(pid, 0);
return true;
} catch {
return false;
Comment thread
siddseethepalli marked this conversation as resolved.
}
}

async function sleep(ms: number): Promise<void> {
await new Promise((resolve) => setTimeout(resolve, ms));
}
Expand All @@ -76,13 +95,22 @@ export async function startXvfb(display = ":99"): Promise<XvfbHandle> {
const lockPath = lockFilePath(displayIndex);

if (existsSync(lockPath)) {
// Another process already owns this display; don't fight it. Returning a
// handle with `process: null` keeps the call idempotent — callers can
// still call `stopXvfb` unconditionally without tracking who started what.
return { display, process: null };
// Verify the lock holder is still alive. If Xvfb died uncleanly its
// lock file lingers and prevents respawning.
const pid = parseLockPid(lockPath);
if (pid !== null && isProcessAlive(pid)) {
return { display, process: null };
Comment thread
siddseethepalli marked this conversation as resolved.
}
// Stale lock — remove it so we can respawn.
try {
unlinkSync(lockPath);
} catch {
// Race with another cleanup; fine.
}
}

const proc = Bun.spawn(["Xvfb", display, "-screen", "0", "1280x720x24"], {
const canonicalDisplay = `:${displayIndex}`;
const proc = Bun.spawn(["Xvfb", canonicalDisplay, "-screen", "0", "1280x720x24"], {
stdin: "ignore",
stdout: "ignore",
stderr: "pipe",
Expand All @@ -91,7 +119,7 @@ export async function startXvfb(display = ":99"): Promise<XvfbHandle> {
const deadline = Date.now() + LOCK_WAIT_TIMEOUT_MS;
while (Date.now() < deadline) {
if (existsSync(lockPath)) {
return { display, process: proc };
return { display: canonicalDisplay, process: proc };
}
// If Xvfb died during startup, bail out with a useful error instead of
// spinning until the timeout.
Expand Down
23 changes: 17 additions & 6 deletions skills/meet-join/bot/src/media/audio-capture.ts
Original file line number Diff line number Diff line change
Expand Up @@ -147,7 +147,7 @@ function defaultSpawn(argv: readonly string[]): SpawnedParec {
const proc: Subprocess = Bun.spawn(argv as string[], {
stdin: "ignore",
stdout: "pipe",
stderr: "pipe",
stderr: "inherit",
});
return {
stdout: proc.stdout as ReadableStream<Uint8Array> | null,
Expand Down Expand Up @@ -247,6 +247,7 @@ export async function startAudioCapture(
async function runOneAttempt(): Promise<{
outcome: AttemptOutcome;
error?: Error;
hadData: boolean;
}> {
let attemptError: Error | undefined;

Expand All @@ -258,6 +259,7 @@ export async function startAudioCapture(
return {
outcome: "parec",
error: err instanceof Error ? err : new Error(String(err)),
hadData: false,
};
}
currentProc = proc;
Expand All @@ -276,6 +278,7 @@ export async function startAudioCapture(
return {
outcome: "socket",
error: err instanceof Error ? err : new Error(String(err)),
hadData: false,
};
}
currentSocket = sock;
Expand Down Expand Up @@ -305,7 +308,8 @@ export async function startAudioCapture(
// 4. Pipe parec.stdout through the frame chunker into the socket.
// We deliberately don't `await` the pump — it races against the three
// promises above and terminates when any of them settles.
const pumpDone = pumpFrames(proc.stdout, sock, frameBytes, () => stopping);
let framesWritten = false;
const pumpDone = pumpFrames(proc.stdout, sock, frameBytes, () => stopping, () => { framesWritten = true; });

const raceOutcome = await Promise.race([
stoppedP,
Expand Down Expand Up @@ -354,9 +358,9 @@ export async function startAudioCapture(
currentSocket = null;

if (outcome === "stopped") {
return { outcome: "stopped" };
return { outcome: "stopped", hadData: framesWritten };
}
return { outcome, error: attemptError };
return { outcome, error: attemptError, hadData: framesWritten };
}

/**
Expand All @@ -368,13 +372,18 @@ export async function startAudioCapture(
let consecutiveFailures = 0;

while (!stopping) {
const { outcome, error } = await runOneAttempt();
const { outcome, error, hadData } = await runOneAttempt();

if (outcome === "stopped") {
break;
}

// Any non-stop outcome counts as a failure toward the budget.
// Reset the failure counter if this attempt successfully transferred
// data — the pipeline was healthy for a while before it broke.
if (hadData) {
consecutiveFailures = 0;
}
Comment thread
siddseethepalli marked this conversation as resolved.

consecutiveFailures += 1;
if (consecutiveFailures > MAX_RECONNECT_ATTEMPTS) {
fatalError =
Expand Down Expand Up @@ -463,6 +472,7 @@ async function pumpFrames(
sock: CapturedSocket,
frameBytes: number,
isStopping: () => boolean,
onFrame?: () => void,
): Promise<void> {
if (!stdout) return;
const reader = stdout.getReader();
Expand Down Expand Up @@ -491,6 +501,7 @@ async function pumpFrames(
buffer = buffer.slice(frameBytes);
try {
sock.write(frame);
onFrame?.();
} catch {
// Socket write failure aborts the pump; the outer attempt loop
// will pick it up via the socket's `error`/`close` handlers.
Expand Down
18 changes: 13 additions & 5 deletions skills/meet-join/daemon/storage-writer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -268,13 +268,17 @@ export class MeetStorageWriter {
log.info({ meetingId: this.meetingId, code, signal }, "ffmpeg exited");
});
child.stderr?.on("data", (chunk: Buffer) => {
// ffmpeg writes progress to stderr; keep at debug so prod logs stay
// clean but debugging is possible if needed.
log.debug(
{ meetingId: this.meetingId, stderr: chunk.toString("utf8") },
"ffmpeg stderr",
);
});
child.stdin.on("error", (err) => {
log.debug(
{ err, meetingId: this.meetingId },
"ffmpeg stdin error (suppressed)",
);
});

this.ffmpegChild = child;

Expand All @@ -289,10 +293,14 @@ export class MeetStorageWriter {
*/
async stop(): Promise<void> {
if (this.stopped) return;
this.stopped = true;

// Perform fallible I/O cleanup before setting stopped=true so that if
// this throws, a retry call to stop() won't short-circuit and leak
// resources.
this.closeOpenSegmentAt(new Date().toISOString());

this.stopped = true;

if (this.eventUnsubscribe) {
try {
this.eventUnsubscribe();
Expand Down Expand Up @@ -361,8 +369,7 @@ export class MeetStorageWriter {
this.closeFfmpegStdin();
}
break;
// chat.inbound is not persisted by the storage writer — conversation
// bridge (PR 17) handles chat surface. Drop silently.
// chat.inbound is handled by the conversation bridge. Drop silently.
default:
break;
}
Expand Down Expand Up @@ -544,6 +551,7 @@ export class MeetStorageWriter {
private closeFfmpegStdin(): void {
const child = this.ffmpegChild;
if (!child) return;
this.ffmpegChild = null;
try {
child.stdin?.end();
} catch (err) {
Expand Down
Loading