Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 10 additions & 14 deletions skills/meet-join/bot/src/control/http-server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -322,14 +322,12 @@ export function createHttpServer(
// awaiting it directly is safe.
await previousChain;

try {
let handle: AudioPlaybackHandle;
try {
handle = playbackFactory(playbackSpawnOptions);
} catch (err) {
const message = err instanceof Error ? err.message : String(err);
// Release our slot so the next POST in the chain isn't blocked on a
// handler that failed before it even registered.
releaseChain();
return c.json({ error: `failed to start playback: ${message}` }, 500);
}

Expand All @@ -342,15 +340,14 @@ export function createHttpServer(

const body = c.req.raw.body;
if (!body) {
activeStreams.delete(streamId);
// No body is treated as an empty stream — flush trailing silence for
// symmetry and return success.
if (activeStreams.get(streamId)?.controller === controller) {
activeStreams.delete(streamId);
}
try {
await handle.flushSilence(TRAILING_SILENCE_MS);
} catch {
// Best-effort; silence is cosmetic.
}
releaseChain();
return c.json({ streamId, bytes: 0 }, 200);
}

Expand Down Expand Up @@ -411,18 +408,14 @@ export function createHttpServer(
} catch {
// Lock may already be released after `cancel()`; fine.
}
activeStreams.delete(streamId);
// Always flush trailing silence so we don't "pop" — even on cancel,
// which intentionally stops PCM mid-frame.
if (activeStreams.get(streamId)?.controller === controller) {
activeStreams.delete(streamId);
}
try {
await handle.flushSilence(TRAILING_SILENCE_MS);
} catch {
// Best-effort.
}
// Release our slot in the playback chain *after* the silence flush
// so any POST queued behind us only unblocks once the shared pacat
// stdin is fully quiesced.
releaseChain();
}

if (writeError) {
Expand All @@ -446,6 +439,9 @@ export function createHttpServer(
);
}
return c.json({ streamId, bytes }, 200);
} finally {
releaseChain();
}
});

// -------------------------------------------------------------------------
Expand Down
36 changes: 16 additions & 20 deletions skills/meet-join/daemon/barge-in-watcher.ts
Original file line number Diff line number Diff line change
Expand Up @@ -152,8 +152,8 @@ export class MeetBargeInWatcher {
/** Bot's DOM participant id, captured from the first `isSelf` joiner. */
private botSpeakerId: string | null = null;

/** True between `meet.speaking_started` and `meet.speaking_ended`. */
private isBotSpeaking = false;
/** Active bot TTS stream ids — non-empty means the bot is speaking. */
private activeSpeakingStreams = new Set<string>();

/** Debounce timer for a pending cancel. `null` when no cancel is queued. */
private pendingCancelHandle: unknown = null;
Expand Down Expand Up @@ -227,9 +227,7 @@ export class MeetBargeInWatcher {
this.hubSubscription = null;
}

// Reset speaking state so a re-start (e.g. test) doesn't inherit the
// last meeting's flag.
this.isBotSpeaking = false;
this.activeSpeakingStreams.clear();
}

/** Test-only: read the bot's discovered speaker id. */
Expand All @@ -239,7 +237,7 @@ export class MeetBargeInWatcher {

/** Test-only: true while the bot is speaking, i.e. at least one bot TTS stream is active. */
_isBotSpeaking(): boolean {
return this.isBotSpeaking;
return this.activeSpeakingStreams.size > 0;
}

/** Test-only: true while a debounced cancel is queued. */
Expand Down Expand Up @@ -282,19 +280,18 @@ export class MeetBargeInWatcher {
if (message.meetingId !== this.meetingId) return;

if (message.type === "meet.speaking_started") {
this.isBotSpeaking = true;
// Speaking_started arrives *before* any audio has hit the wire, so
// there's no in-flight cancel state to clear here. We still drop a
// potentially-stale pending cancel just in case the previous
// utterance ended in a barge-in that we never fired (defensive).
const streamId = (message as { streamId?: string }).streamId;
if (streamId) this.activeSpeakingStreams.add(streamId);
this.clearPendingCancel();
return;
}

if (message.type === "meet.speaking_ended") {
this.isBotSpeaking = false;
// Stream is over — any pending cancel for this stream is moot.
this.clearPendingCancel();
const streamId = (message as { streamId?: string }).streamId;
if (streamId) this.activeSpeakingStreams.delete(streamId);
if (this.activeSpeakingStreams.size === 0) {
this.clearPendingCancel();
}
return;
}
}
Expand All @@ -315,7 +312,7 @@ export class MeetBargeInWatcher {
}

private onSpeakerChange(event: SpeakerChangeEvent): void {
if (!this.isBotSpeaking) return;
if (this.activeSpeakingStreams.size === 0) return;

if (this.botSpeakerId !== null && event.speakerId === this.botSpeakerId) {
// Floor returned to the bot — cancel any debounced cancel that was
Expand All @@ -331,7 +328,7 @@ export class MeetBargeInWatcher {
}

private onTranscriptChunk(event: TranscriptChunkEvent): void {
if (!this.isBotSpeaking) return;
if (this.activeSpeakingStreams.size === 0) return;
// Only interim chunks count for barge-in: finals are too late to be
// a useful real-time signal, and the speaker.change path covers the
// authoritative DOM-derived signal.
Expand Down Expand Up @@ -371,10 +368,9 @@ export class MeetBargeInWatcher {

this.pendingCancelHandle = this.setTimeoutFn(() => {
this.pendingCancelHandle = null;
// Re-check at fire time — `isBotSpeaking` may have been flipped
// off by `meet.speaking_ended` between scheduling and firing, in
// which case there's nothing left to cancel.
if (!this.isBotSpeaking) return;
// Re-check at fire time — all streams may have ended between
// scheduling and firing, in which case there's nothing to cancel.
if (this.activeSpeakingStreams.size === 0) return;

log.info(
{ meetingId: this.meetingId, trigger },
Expand Down
29 changes: 24 additions & 5 deletions skills/meet-join/daemon/tts-bridge.ts
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,14 @@ export const BOT_AUDIO_CHANNELS = 1;
export const BOT_AUDIO_SAMPLE_BITS = 16;
export const BOT_AUDIO_ENCODING = "pcm_s16le";

/**
 * Upper bound, in milliseconds, on how long the cancel path waits for its
 * best-effort `DELETE /play_audio/<streamId>` request. Aborting the POST
 * already makes the bot see EOF, so the DELETE itself is only cosmetic; the
 * cap exists so that a hung DELETE cannot stall cancellation indefinitely.
 */
export const CANCEL_DELETE_TIMEOUT_MS = 5000;

/**
* ffmpeg arguments that read whatever format the TTS provider emits on
* stdin and write raw 48 kHz / mono / s16le PCM on stdout. The decoder is
Expand Down Expand Up @@ -447,6 +455,7 @@ export class MeetTtsBridge {
{
method: "DELETE",
headers: { Authorization: `Bearer ${this.botApiToken}` },
signal: AbortSignal.timeout(CANCEL_DELETE_TIMEOUT_MS),
},
);
} catch (err) {
Expand Down Expand Up @@ -601,11 +610,21 @@ export class MeetTtsBridge {

child.on("error", (err) => {
const nodeErr = err as NodeJS.ErrnoException;
const reason =
nodeErr?.code === "ENOENT"
? "ffmpeg binary not found on PATH (ENOENT)"
: `ffmpeg probe failed: ${err instanceof Error ? err.message : String(err)}`;
settle({ available: false, reason });
if (nodeErr?.code === "ENOENT") {
settle({
available: false,
reason: "ffmpeg binary not found on PATH (ENOENT)",
});
} else {
// Any other spawn error (EMFILE, EAGAIN, etc.) is assumed to be
// transient — clear the memoized probe so the next speak() retries
// instead of being stuck on a sticky false negative.
this.ffmpegProbe = null;
settle({
available: false,
reason: `ffmpeg probe failed (transient): ${err instanceof Error ? err.message : String(err)}`,
});
}
});
child.on("exit", () => {
// Any exit — zero or non-zero — means ffmpeg was runnable.
Expand Down
Loading