diff --git a/skills/meet-join/bot/src/control/http-server.ts b/skills/meet-join/bot/src/control/http-server.ts index 90d928e8c86..3d48394df9c 100644 --- a/skills/meet-join/bot/src/control/http-server.ts +++ b/skills/meet-join/bot/src/control/http-server.ts @@ -322,14 +322,12 @@ export function createHttpServer( // awaiting it directly is safe. await previousChain; + try { let handle: AudioPlaybackHandle; try { handle = playbackFactory(playbackSpawnOptions); } catch (err) { const message = err instanceof Error ? err.message : String(err); - // Release our slot so the next POST in the chain isn't blocked on a - // handler that failed before it even registered. - releaseChain(); return c.json({ error: `failed to start playback: ${message}` }, 500); } @@ -342,15 +340,14 @@ export function createHttpServer( const body = c.req.raw.body; if (!body) { - activeStreams.delete(streamId); - // No body is treated as an empty stream — flush trailing silence for - // symmetry and return success. + if (activeStreams.get(streamId)?.controller === controller) { + activeStreams.delete(streamId); + } try { await handle.flushSilence(TRAILING_SILENCE_MS); } catch { // Best-effort; silence is cosmetic. } - releaseChain(); return c.json({ streamId, bytes: 0 }, 200); } @@ -411,18 +408,14 @@ export function createHttpServer( } catch { // Lock may already be released after `cancel()`; fine. } - activeStreams.delete(streamId); - // Always flush trailing silence so we don't "pop" — even on cancel, - // which intentionally stops PCM mid-frame. + if (activeStreams.get(streamId)?.controller === controller) { + activeStreams.delete(streamId); + } try { await handle.flushSilence(TRAILING_SILENCE_MS); } catch { // Best-effort. } - // Release our slot in the playback chain *after* the silence flush - // so any POST queued behind us only unblocks once the shared pacat - // stdin is fully quiesced. - releaseChain(); } if (writeError) { @@ -446,6 +439,9 @@ export function createHttpServer( ); } return c.json({ streamId, bytes }, 200); + } finally { + releaseChain(); + } }); // ------------------------------------------------------------------------- diff --git a/skills/meet-join/daemon/barge-in-watcher.ts b/skills/meet-join/daemon/barge-in-watcher.ts index c8125df3c45..0633bebfba9 100644 --- a/skills/meet-join/daemon/barge-in-watcher.ts +++ b/skills/meet-join/daemon/barge-in-watcher.ts @@ -152,8 +152,8 @@ export class MeetBargeInWatcher { /** Bot's DOM participant id, captured from the first `isSelf` joiner. */ private botSpeakerId: string | null = null; - /** True between `meet.speaking_started` and `meet.speaking_ended`. */ - private isBotSpeaking = false; + /** Active bot TTS stream ids — non-empty means the bot is speaking. */ + private activeSpeakingStreams = new Set(); /** Debounce timer for a pending cancel. `null` when no cancel is queued. */ private pendingCancelHandle: unknown = null; @@ -227,9 +227,7 @@ export class MeetBargeInWatcher { this.hubSubscription = null; } - // Reset speaking state so a re-start (e.g. test) doesn't inherit the - // last meeting's flag. - this.isBotSpeaking = false; + this.activeSpeakingStreams.clear(); } /** Test-only: read the bot's discovered speaker id. */ @@ -239,7 +237,7 @@ export class MeetBargeInWatcher { /** Test-only: read the bot-speaking flag. */ _isBotSpeaking(): boolean { - return this.isBotSpeaking; + return this.activeSpeakingStreams.size > 0; } /** Test-only: true while a debounced cancel is queued. */ @@ -282,19 +280,18 @@ export class MeetBargeInWatcher { if (message.meetingId !== this.meetingId) return; if (message.type === "meet.speaking_started") { - this.isBotSpeaking = true; - // Speaking_started arrives *before* any audio has hit the wire, so - // there's no in-flight cancel state to clear here. We still drop a - // potentially-stale pending cancel just in case the previous - // utterance ended in a barge-in that we never fired (defensive). + const streamId = (message as { streamId?: string }).streamId; + if (streamId) this.activeSpeakingStreams.add(streamId); this.clearPendingCancel(); return; } if (message.type === "meet.speaking_ended") { - this.isBotSpeaking = false; - // Stream is over — any pending cancel for this stream is moot. - this.clearPendingCancel(); + const streamId = (message as { streamId?: string }).streamId; + if (streamId) this.activeSpeakingStreams.delete(streamId); + if (this.activeSpeakingStreams.size === 0) { + this.clearPendingCancel(); + } return; } } @@ -315,7 +312,7 @@ export class MeetBargeInWatcher { } private onSpeakerChange(event: SpeakerChangeEvent): void { - if (!this.isBotSpeaking) return; + if (this.activeSpeakingStreams.size === 0) return; if (this.botSpeakerId !== null && event.speakerId === this.botSpeakerId) { // Floor returned to the bot — cancel any debounced cancel that was @@ -331,7 +328,7 @@ export class MeetBargeInWatcher { } private onTranscriptChunk(event: TranscriptChunkEvent): void { - if (!this.isBotSpeaking) return; + if (this.activeSpeakingStreams.size === 0) return; // Only interim chunks count for barge-in: finals are too late to be // a useful real-time signal, and the speaker.change path covers the // authoritative DOM-derived signal. @@ -371,10 +368,9 @@ export class MeetBargeInWatcher { this.pendingCancelHandle = this.setTimeoutFn(() => { this.pendingCancelHandle = null; - // Re-check at fire time — `isBotSpeaking` may have been flipped - // off by `meet.speaking_ended` between scheduling and firing, in - // which case there's nothing left to cancel. - if (!this.isBotSpeaking) return; + // Re-check at fire time — all streams may have ended between + // scheduling and firing, in which case there's nothing to cancel. + if (this.activeSpeakingStreams.size === 0) return; log.info( { meetingId: this.meetingId, trigger }, diff --git a/skills/meet-join/daemon/tts-bridge.ts b/skills/meet-join/daemon/tts-bridge.ts index 405724b164a..164ff9beb1b 100644 --- a/skills/meet-join/daemon/tts-bridge.ts +++ b/skills/meet-join/daemon/tts-bridge.ts @@ -56,6 +56,14 @@ export const BOT_AUDIO_CHANNELS = 1; export const BOT_AUDIO_SAMPLE_BITS = 16; export const BOT_AUDIO_ENCODING = "pcm_s16le"; +/** + * Timeout for the best-effort `DELETE /play_audio/` issued during + * cancel. The DELETE is cosmetic (the POST is already aborted, so the bot + * sees EOF), but we don't want a hung DELETE to block the cancel path + * indefinitely. + */ +export const CANCEL_DELETE_TIMEOUT_MS = 5_000; + /** * ffmpeg arguments that read whatever format the TTS provider emits on * stdin and write raw 48 kHz / mono / s16le PCM on stdout. The decoder is @@ -447,6 +455,7 @@ export class MeetTtsBridge { { method: "DELETE", headers: { Authorization: `Bearer ${this.botApiToken}` }, + signal: AbortSignal.timeout(CANCEL_DELETE_TIMEOUT_MS), }, ); } catch (err) { @@ -601,11 +610,21 @@ export class MeetTtsBridge { child.on("error", (err) => { const nodeErr = err as NodeJS.ErrnoException; - const reason = - nodeErr?.code === "ENOENT" - ? "ffmpeg binary not found on PATH (ENOENT)" - : `ffmpeg probe failed: ${err instanceof Error ? err.message : String(err)}`; - settle({ available: false, reason }); + if (nodeErr?.code === "ENOENT") { + settle({ + available: false, + reason: "ffmpeg binary not found on PATH (ENOENT)", + }); + } else { + // Transient error (EMFILE, EAGAIN, etc.) — clear the memoized + // probe so the next speak() retries instead of being stuck on a + // sticky false negative. + this.ffmpegProbe = null; + settle({ + available: false, + reason: `ffmpeg probe failed (transient): ${err instanceof Error ? err.message : String(err)}`, + }); + } }); child.on("exit", () => { // Any exit — zero or non-zero — means ffmpeg was runnable.