From 192c0173a8e0daf3c2ee513bf2ca7942355b1f80 Mon Sep 17 00:00:00 2001 From: TomasLiu Date: Thu, 21 Nov 2024 16:57:20 +0800 Subject: [PATCH] add auto retry --- .../deepgram_asr_python/extension.py | 43 ++++++++++++------- 1 file changed, 28 insertions(+), 15 deletions(-) diff --git a/agents/ten_packages/extension/deepgram_asr_python/extension.py b/agents/ten_packages/extension/deepgram_asr_python/extension.py index a21e8b83..af7cebde 100644 --- a/agents/ten_packages/extension/deepgram_asr_python/extension.py +++ b/agents/ten_packages/extension/deepgram_asr_python/extension.py @@ -37,6 +37,7 @@ def __init__(self, name: str): super().__init__(name) self.stopped = False + self.connected = False self.client : AsyncListenWebSocketClient = None self.config : DeepgramASRConfig = None self.ten_env : AsyncTenEnv = None @@ -56,11 +57,6 @@ async def on_start(self, ten_env: AsyncTenEnv) -> None: ten_env.log_error(f"get property api_key") return - self.client = AsyncListenWebSocketClient(config=DeepgramClientOptions( - api_key=self.config.api_key, - options={"keepalive": "true"} - )) - self.loop.create_task(self._start_listen()) ten_env.log_info("starting async_deepgram_wrapper thread") @@ -71,14 +67,22 @@ async def on_audio_frame(self, ten_env: AsyncTenEnv, frame: AudioFrame) -> None: if not frame_buf: self.ten_env.log_warn("send_frame: empty pcm_frame detected.") return + + if not self.connected: + self.ten_env.log_warn("send_frame: deepgram not connected.") + return self.stream_id = frame.get_property_int('stream_id') - await self.client.send(frame_buf) + if self.client: + await self.client.send(frame_buf) async def on_stop(self, ten_env: AsyncTenEnv) -> None: ten_env.log_info("on_stop") - await self.client.finish() + self.stopped = True + + if self.client: + await self.client.finish() async def on_cmd(self, ten_env: AsyncTenEnv, cmd: Cmd) -> None: cmd_json = cmd.to_json() @@ -91,11 +95,22 @@ async def on_cmd(self, ten_env: AsyncTenEnv, cmd: Cmd) -> None: async def _start_listen(self) -> None: self.ten_env.log_info(f"start and listen deepgram") + self.client = AsyncListenWebSocketClient(config=DeepgramClientOptions( + api_key=self.config.api_key, + options={"keepalive": "true"} + )) + async def on_open(_, open, **kwargs): self.ten_env.log_info(f"deepgram event callback on_open: {open}") + self.connected = True async def on_close(_, close, **kwargs): self.ten_env.log_info(f"deepgram event callback on_close: {close}") + self.connected = False + if not self.stopped: + self.ten_env.log_warn("Deepgram connection closed unexpectedly. Reconnecting...") + await asyncio.sleep(0.2) + self.loop.create_task(self._start_listen()) async def on_message(_, result, **kwargs): sentence = result.channel.alternatives[0].transcript @@ -125,14 +140,12 @@ async def on_error(_, error, **kwargs): punctuate=self.config.punctuate) # connect to websocket result = await self.client.start(options) - if result is False: - if self.client.status_code == 402: - self.ten_env.log_error("Failed to connect to Deepgram - your account has run out of credits.") - else: - self.ten_env.log_error("Failed to connect to Deepgram") - return - - self.ten_env.log_info(f"successfully connected to deepgram") + if not result: + self.ten_env.log_error(f"failed to connect to deepgram") + await asyncio.sleep(0.2) + self.loop.create_task(self._start_listen()) + else: + self.ten_env.log_info(f"successfully connected to deepgram") async def _send_text(self, text: str, is_final: bool, stream_id: str) -> None: stable_data = Data.create("text_data")