Skip to content

Commit

Permalink
add auto retry
Browse files Browse the repository at this point in the history
  • Loading branch information
TomasBack2Future committed Nov 21, 2024
1 parent 7ae5349 commit 192c017
Showing 1 changed file with 28 additions and 15 deletions.
43 changes: 28 additions & 15 deletions agents/ten_packages/extension/deepgram_asr_python/extension.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ def __init__(self, name: str):
super().__init__(name)

self.stopped = False
self.connected = False
self.client : AsyncListenWebSocketClient = None
self.config : DeepgramASRConfig = None
self.ten_env : AsyncTenEnv = None
Expand All @@ -56,11 +57,6 @@ async def on_start(self, ten_env: AsyncTenEnv) -> None:
ten_env.log_error(f"get property api_key")
return

self.client = AsyncListenWebSocketClient(config=DeepgramClientOptions(
api_key=self.config.api_key,
options={"keepalive": "true"}
))

self.loop.create_task(self._start_listen())

ten_env.log_info("starting async_deepgram_wrapper thread")
Expand All @@ -71,14 +67,22 @@ async def on_audio_frame(self, ten_env: AsyncTenEnv, frame: AudioFrame) -> None:
if not frame_buf:
self.ten_env.log_warn("send_frame: empty pcm_frame detected.")
return

if not self.connected:
self.ten_env.log_warn("send_frame: deepgram not connected.")
return

self.stream_id = frame.get_property_int('stream_id')
await self.client.send(frame_buf)
if self.client:
await self.client.send(frame_buf)

async def on_stop(self, ten_env: AsyncTenEnv) -> None:
ten_env.log_info("on_stop")

await self.client.finish()
self.stopped = True

if self.client:
await self.client.finish()

async def on_cmd(self, ten_env: AsyncTenEnv, cmd: Cmd) -> None:
cmd_json = cmd.to_json()
Expand All @@ -91,11 +95,22 @@ async def on_cmd(self, ten_env: AsyncTenEnv, cmd: Cmd) -> None:
async def _start_listen(self) -> None:
self.ten_env.log_info(f"start and listen deepgram")

self.client = AsyncListenWebSocketClient(config=DeepgramClientOptions(
api_key=self.config.api_key,
options={"keepalive": "true"}
))

async def on_open(_, open, **kwargs):
self.ten_env.log_info(f"deepgram event callback on_open: {open}")
self.connected = True

async def on_close(_, close, **kwargs):
self.ten_env.log_info(f"deepgram event callback on_close: {close}")
self.connected = False
if not self.stopped:
self.ten_env.log_warn("Deepgram connection closed unexpectedly. Reconnecting...")
await asyncio.sleep(0.2)
self.loop.create_task(self._start_listen())

async def on_message(_, result, **kwargs):
sentence = result.channel.alternatives[0].transcript
Expand Down Expand Up @@ -125,14 +140,12 @@ async def on_error(_, error, **kwargs):
punctuate=self.config.punctuate)
# connect to websocket
result = await self.client.start(options)
if result is False:
if self.client.status_code == 402:
self.ten_env.log_error("Failed to connect to Deepgram - your account has run out of credits.")
else:
self.ten_env.log_error("Failed to connect to Deepgram")
return

self.ten_env.log_info(f"successfully connected to deepgram")
if not result:
self.ten_env.log_error(f"failed to connect to deepgram")
await asyncio.sleep(0.2)
self.loop.create_task(self._start_listen())
else:
self.ten_env.log_info(f"successfully connected to deepgram")

async def _send_text(self, text: str, is_final: bool, stream_id: str) -> None:
stable_data = Data.create("text_data")
Expand Down

0 comments on commit 192c017

Please sign in to comment.