From 299c075d15a7da81bdafd80124cc64cebe77a1cc Mon Sep 17 00:00:00 2001 From: simon Date: Wed, 19 Nov 2025 22:11:59 +0100 Subject: [PATCH 1/5] add close handlers for stt,tts and vad for agent activity endings --- agents/src/stt/stt.ts | 9 ++++++++- agents/src/tts/tts.ts | 4 ++++ agents/src/vad.ts | 4 ++++ agents/src/voice/agent_activity.ts | 3 +++ 4 files changed, 19 insertions(+), 1 deletion(-) diff --git a/agents/src/stt/stt.ts b/agents/src/stt/stt.ts index 902cca795..54ba46c2e 100644 --- a/agents/src/stt/stt.ts +++ b/agents/src/stt/stt.ts @@ -135,6 +135,10 @@ export abstract class STT extends (EventEmitter as new () => TypedEmitter { + return; + } } /** @@ -183,7 +187,10 @@ export abstract class SpeechStream implements AsyncIterableIterator // is run **after** the constructor has finished. Otherwise we get // runtime error when trying to access class variables in the // `run` method. - startSoon(() => this.mainTask().then(() => this.queue.close())); + startSoon(() => this.mainTask().then(() => { + console.log('STT QUEU ECLOSED'); + return this.queue.close() + })); } private async mainTask() { diff --git a/agents/src/tts/tts.ts b/agents/src/tts/tts.ts index fc420afe6..91c31fc06 100644 --- a/agents/src/tts/tts.ts +++ b/agents/src/tts/tts.ts @@ -94,6 +94,10 @@ export abstract class TTS extends (EventEmitter as new () => TypedEmitter { + return; + } } /** diff --git a/agents/src/vad.ts b/agents/src/vad.ts index c220ba8ff..0f9471264 100644 --- a/agents/src/vad.ts +++ b/agents/src/vad.ts @@ -80,6 +80,10 @@ export abstract class VAD extends (EventEmitter as new () => TypedEmitter { + return; + } } export abstract class VADStream implements AsyncIterableIterator { diff --git a/agents/src/voice/agent_activity.ts b/agents/src/voice/agent_activity.ts index 799de0e4a..7b4b53ce4 100644 --- a/agents/src/voice/agent_activity.ts +++ b/agents/src/voice/agent_activity.ts @@ -2141,12 +2141,15 @@ export class AgentActivity implements RecognitionHooks { } if (this.stt instanceof STT) { this.stt.off('metrics_collected', this.onMetricsCollected); + await this.stt.close(); } if (this.tts instanceof TTS) { this.tts.off('metrics_collected', this.onMetricsCollected); + await this.tts.close(); } if (this.vad instanceof VAD) { this.vad.off('metrics_collected', this.onMetricsCollected); + await this.vad.close(); } this.detachAudioInput(); From 6f387138affa761d2dc33e4cee713e834c91e466 Mon Sep 17 00:00:00 2001 From: simon Date: Wed, 19 Nov 2025 22:42:52 +0100 Subject: [PATCH 2/5] deepgram STT resource cleanup --- plugins/deepgram/src/stt.ts | 161 +++++++++++++++++++++++------------- 1 file changed, 103 insertions(+), 58 deletions(-) diff --git a/plugins/deepgram/src/stt.ts b/plugins/deepgram/src/stt.ts index bc4b1d4a4..cd3f7d8c6 100644 --- a/plugins/deepgram/src/stt.ts +++ b/plugins/deepgram/src/stt.ts @@ -6,11 +6,13 @@ import { AudioByteStream, AudioEnergyFilter, Future, + Task, log, stt, + waitForAbort, } from '@livekit/agents'; import type { AudioFrame } from '@livekit/rtc-node'; -import { type RawData, WebSocket } from 'ws'; +import { WebSocket } from 'ws'; import { PeriodicCollector } from './_utils.js'; import type { STTLanguages, STTModels } from './models.js'; @@ -62,6 +64,7 @@ export class STT extends stt.STT { #opts: STTOptions; #logger = log(); label = 'deepgram.STT'; + private abortController = new AbortController(); constructor(opts: Partial = defaultSTTOptions) { super({ @@ -111,7 +114,11 @@ export class STT extends stt.STT { } stream(): SpeechStream { - return new SpeechStream(this, this.#opts); + return new SpeechStream(this, this.#opts, this.abortController); + } + + async close() { + this.abortController.abort(); } } @@ -125,7 +132,11 @@ export class SpeechStream extends stt.SpeechStream { #audioDurationCollector: PeriodicCollector; label = 'deepgram.SpeechStream'; - constructor(stt: STT, opts: STTOptions) { + constructor( + stt: STT, + opts: STTOptions, + private abortController: AbortController, + ) { super(stt, opts.sampleRate); this.#opts = opts; this.closed = false; @@ -140,7 +151,8 @@ export class SpeechStream extends stt.SpeechStream { const maxRetry = 32; let retries = 0; let ws: WebSocket; - while (!this.input.closed) { + + while (!this.input.closed && !this.closed) { const streamURL = new URL(API_BASE_URL_V1); const params = { model: this.#opts.model, @@ -185,17 +197,23 @@ export class SpeechStream extends stt.SpeechStream { await this.#runWS(ws); } catch (e) { - if (retries >= maxRetry) { - throw new Error(`failed to connect to Deepgram after ${retries} attempts: ${e}`); - } + if (!this.closed && !this.input.closed) { + if (retries >= maxRetry) { + throw new Error(`failed to connect to Deepgram after ${retries} attempts: ${e}`); + } - const delay = Math.min(retries * 5, 10); - retries++; + const delay = Math.min(retries * 5, 10); + retries++; - this.#logger.warn( - `failed to connect to Deepgram, retrying in ${delay} seconds: ${e} (${retries}/${maxRetry})`, - ); - await new Promise((resolve) => setTimeout(resolve, delay * 1000)); + this.#logger.warn( + `failed to connect to Deepgram, retrying in ${delay} seconds: ${e} (${retries}/${maxRetry})`, + ); + await new Promise((resolve) => setTimeout(resolve, delay * 1000)); + } else { + this.#logger.warn( + `Deepgram disconnected, connection is closed: ${e} (inputClosed: ${this.input.closed}, isClosed: ${this.closed})`, + ); + } } } @@ -220,6 +238,20 @@ export class SpeechStream extends stt.SpeechStream { } }, 5000); + // gets cancelled also when sendTask is complete + const wsMonitor = Task.from(async (controller) => { + const closed = new Promise(async (_, reject) => { + ws.once('close', (code, reason) => { + if (!closing) { + this.#logger.error(`WebSocket closed with code ${code}: ${reason}`); + reject(new Error('WebSocket closed')); + } + }); + }); + + await Promise.race([closed, waitForAbort(controller.signal)]); + }); + const sendTask = async () => { const samples100Ms = Math.floor(this.#opts.sampleRate / 10); const stream = new AudioByteStream( @@ -228,48 +260,50 @@ export class SpeechStream extends stt.SpeechStream { samples100Ms, ); - for await (const data of this.input) { - let frames: AudioFrame[]; - if (data === SpeechStream.FLUSH_SENTINEL) { - frames = stream.flush(); - this.#audioDurationCollector.flush(); - } else if ( - data.sampleRate === this.#opts.sampleRate || - data.channels === this.#opts.numChannels - ) { - frames = stream.write(data.data.buffer); - } else { - throw new Error(`sample rate or channel count of frame does not match`); - } + try { + while (!this.closed) { + const result = await Promise.race([ + this.input.next(), + waitForAbort(this.abortController.signal), + ]); + + if (!result) { + return; + } + const data = result.value; + + let frames: AudioFrame[]; + if (data === SpeechStream.FLUSH_SENTINEL) { + frames = stream.flush(); + this.#audioDurationCollector.flush(); + } else if ( + data.sampleRate === this.#opts.sampleRate || + data.channels === this.#opts.numChannels + ) { + frames = stream.write(data.data.buffer as ArrayBuffer); + } else { + throw new Error(`sample rate or channel count of frame does not match`); + } - for await (const frame of frames) { - if (this.#audioEnergyFilter.pushFrame(frame)) { - const frameDuration = frame.samplesPerChannel / frame.sampleRate; - this.#audioDurationCollector.push(frameDuration); - ws.send(frame.data.buffer); + for await (const frame of frames) { + if (this.#audioEnergyFilter.pushFrame(frame)) { + const frameDuration = frame.samplesPerChannel / frame.sampleRate; + this.#audioDurationCollector.push(frameDuration); + ws.send(frame.data.buffer); + } } } + } finally { + closing = true; + ws.send(JSON.stringify({ type: 'CloseStream' })); + wsMonitor.cancel(); } - - closing = true; - ws.send(JSON.stringify({ type: 'CloseStream' })); }; - const wsMonitor = new Promise((_, reject) => - ws.once('close', (code, reason) => { - if (!closing) { - this.#logger.error(`WebSocket closed with code ${code}: ${reason}`); - reject(new Error('WebSocket closed')); - } - }), - ); - - const listenTask = async () => { - while (!this.closed && !closing) { - try { - await new Promise((resolve) => { - ws.once('message', (data) => resolve(data)); - }).then((msg) => { + const listenTask = Task.from(async (controller) => { + const listenMessage = new Promise((resolve, reject) => { + ws.on('message', (msg) => { + try { const json = JSON.parse(msg.toString()); switch (json['type']) { case 'SpeechStarted': { @@ -300,7 +334,9 @@ export class SpeechStream extends stt.SpeechStream { if (alternatives[0] && alternatives[0].text) { if (!this.#speaking) { this.#speaking = true; - this.queue.put({ type: stt.SpeechEventType.START_OF_SPEECH }); + this.queue.put({ + type: stt.SpeechEventType.START_OF_SPEECH, + }); } if (isFinal) { @@ -334,15 +370,24 @@ export class SpeechStream extends stt.SpeechStream { break; } } - }); - } catch (error) { - this.#logger.child({ error }).warn('unrecoverable error, exiting'); - break; - } - } - }; - await Promise.race([this.#resetWS.await, Promise.all([sendTask(), listenTask(), wsMonitor])]); + if (this.closed || closing) { + resolve(); + } + } catch (err) { + this.#logger.error(`STT: Error processing message: ${msg}`); + reject(err); + } + }); + }); + + await Promise.race([listenMessage, waitForAbort(controller.signal)]); + }, this.abortController); + + await Promise.race([ + this.#resetWS.await, + Promise.all([sendTask(), listenTask.result, wsMonitor]), + ]); closing = true; ws.close(); clearInterval(keepalive); From 980b2239240facbe2e8174f27bde58338e057530 Mon Sep 17 00:00:00 2001 From: Simon Tretter Date: Wed, 19 Nov 2025 22:48:13 +0100 Subject: [PATCH 3/5] Update stt.ts --- agents/src/stt/stt.ts | 5 +---- 1 file changed, 1 insertion(+), 4 deletions(-) diff --git a/agents/src/stt/stt.ts b/agents/src/stt/stt.ts index 54ba46c2e..7fac7b11c 100644 --- a/agents/src/stt/stt.ts +++ b/agents/src/stt/stt.ts @@ -187,10 +187,7 @@ export abstract class SpeechStream implements AsyncIterableIterator // is run **after** the constructor has finished. Otherwise we get // runtime error when trying to access class variables in the // `run` method. - startSoon(() => this.mainTask().then(() => { - console.log('STT QUEU ECLOSED'); - return this.queue.close() - })); + startSoon(() => this.mainTask().then(() => this.queue.close())); } private async mainTask() { From 8aa9df6290acd3d39f1b34a0573aad549db35712 Mon Sep 17 00:00:00 2001 From: simon Date: Thu, 20 Nov 2025 13:34:49 +0100 Subject: [PATCH 4/5] catch result.done --- plugins/deepgram/src/stt.ts | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/plugins/deepgram/src/stt.ts b/plugins/deepgram/src/stt.ts index cd3f7d8c6..b7d7c02a6 100644 --- a/plugins/deepgram/src/stt.ts +++ b/plugins/deepgram/src/stt.ts @@ -267,9 +267,11 @@ export class SpeechStream extends stt.SpeechStream { waitForAbort(this.abortController.signal), ]); - if (!result) { - return; + if (result === undefined) return; // aborted + if (result.done) { + break; } + const data = result.value; let frames: AudioFrame[]; From 7d966e84ddb3331bbce218cd08a2250eb41bae75 Mon Sep 17 00:00:00 2001 From: Simon Tretter Date: Thu, 20 Nov 2025 13:36:13 +0100 Subject: [PATCH 5/5] Fix resource cleanup in changeset --- .changeset/curly-kangaroos-accept.md | 6 ++++++ 1 file changed, 6 insertions(+) create mode 100644 .changeset/curly-kangaroos-accept.md diff --git a/.changeset/curly-kangaroos-accept.md b/.changeset/curly-kangaroos-accept.md new file mode 100644 index 000000000..af9713307 --- /dev/null +++ b/.changeset/curly-kangaroos-accept.md @@ -0,0 +1,6 @@ +--- +"@livekit/agents": patch +"@livekit/agents-plugin-deepgram": patch +--- + +fix resource cleanup