diff --git a/agents/ten_packages/extension/deepgram_asr_python/__init__.py b/agents/ten_packages/extension/deepgram_asr_python/__init__.py index 71578b73..f3c731cd 100644 --- a/agents/ten_packages/extension/deepgram_asr_python/__init__.py +++ b/agents/ten_packages/extension/deepgram_asr_python/__init__.py @@ -1,5 +1 @@ -from . import deepgram_asr_addon -from .extension import EXTENSION_NAME -from .log import logger - -logger.info(f"{EXTENSION_NAME} extension loaded") +from . import addon diff --git a/agents/ten_packages/extension/deepgram_asr_python/deepgram_asr_addon.py b/agents/ten_packages/extension/deepgram_asr_python/addon.py similarity index 55% rename from agents/ten_packages/extension/deepgram_asr_python/deepgram_asr_addon.py rename to agents/ten_packages/extension/deepgram_asr_python/addon.py index bf8c0422..d8e6f467 100644 --- a/agents/ten_packages/extension/deepgram_asr_python/deepgram_asr_addon.py +++ b/agents/ten_packages/extension/deepgram_asr_python/addon.py @@ -3,12 +3,10 @@ register_addon_as_extension, TenEnv, ) -from .extension import EXTENSION_NAME -@register_addon_as_extension(EXTENSION_NAME) +@register_addon_as_extension("deepgram_asr_python") class DeepgramASRExtensionAddon(Addon): def on_create_instance(self, ten: TenEnv, addon_name: str, context) -> None: - from .log import logger - from .deepgram_asr_extension import DeepgramASRExtension - logger.info("on_create_instance") + from .extension import DeepgramASRExtension + ten.log_info("on_create_instance") ten.on_create_instance_done(DeepgramASRExtension(addon_name), context) diff --git a/agents/ten_packages/extension/deepgram_asr_python/deepgram_config.py b/agents/ten_packages/extension/deepgram_asr_python/config.py similarity index 100% rename from agents/ten_packages/extension/deepgram_asr_python/deepgram_config.py rename to agents/ten_packages/extension/deepgram_asr_python/config.py diff --git a/agents/ten_packages/extension/deepgram_asr_python/deepgram_asr_extension.py b/agents/ten_packages/extension/deepgram_asr_python/deepgram_asr_extension.py deleted file mode 100644 index fbaccbdc..00000000 --- a/agents/ten_packages/extension/deepgram_asr_python/deepgram_asr_extension.py +++ /dev/null @@ -1,104 +0,0 @@ -from ten import ( - Extension, - TenEnv, - Cmd, - AudioFrame, - StatusCode, - CmdResult, -) - -import asyncio -import threading - -from .log import logger -from .deepgram_wrapper import AsyncDeepgramWrapper, DeepgramConfig - -PROPERTY_API_KEY = "api_key" # Required -PROPERTY_LANG = "language" # Optional -PROPERTY_MODEL = "model" # Optional -PROPERTY_SAMPLE_RATE = "sample_rate" # Optional - - -class DeepgramASRExtension(Extension): - def __init__(self, name: str): - super().__init__(name) - - self.stopped = False - self.queue = asyncio.Queue(maxsize=3000) # about 3000 * 10ms = 30s input - self.deepgram = None - self.thread = None - - self.loop = asyncio.new_event_loop() - asyncio.set_event_loop(self.loop) - - def on_start(self, ten: TenEnv) -> None: - logger.info("on_start") - - deepgram_config = DeepgramConfig.default_config() - - try: - deepgram_config.api_key = ten.get_property_string(PROPERTY_API_KEY).strip() - except Exception as e: - logger.error(f"get property {PROPERTY_API_KEY} error: {e}") - return - - for optional_param in [ - PROPERTY_LANG, - PROPERTY_MODEL, - PROPERTY_SAMPLE_RATE, - ]: - try: - value = ten.get_property_string(optional_param).strip() - if value: - deepgram_config.__setattr__(optional_param, value) - except Exception as err: - logger.debug( - f"get property optional {optional_param} failed, err: {err}. Using default value: {deepgram_config.__getattribute__(optional_param)}" - ) - - self.deepgram = AsyncDeepgramWrapper( - deepgram_config, self.queue, ten, self.loop - ) - - logger.info("starting async_deepgram_wrapper thread") - self.thread = threading.Thread(target=self.deepgram.run, args=[]) - self.thread.start() - - ten.on_start_done() - - def put_pcm_frame(self, pcm_frame: AudioFrame) -> None: - try: - asyncio.run_coroutine_threadsafe( - self.queue.put(pcm_frame), self.loop - ).result(timeout=0.5) - except asyncio.QueueFull: - logger.exception("queue is full, dropping frame") - except Exception as e: - logger.exception(f"error putting frame in queue: {e}") - - def on_audio_frame(self, ten: TenEnv, frame: AudioFrame) -> None: - self.put_pcm_frame(pcm_frame=frame) - - def on_stop(self, ten: TenEnv) -> None: - logger.info("on_stop") - - # put an empty frame to stop deepgram_wrapper - self.put_pcm_frame(None) - self.stopped = True - self.thread.join() - self.loop.stop() - self.loop.close() - - ten.on_stop_done() - - def on_cmd(self, ten: TenEnv, cmd: Cmd) -> None: - logger.info("on_cmd") - cmd_json = cmd.to_json() - logger.info("on_cmd json: " + cmd_json) - - cmdName = cmd.get_name() - logger.info("got cmd %s" % cmdName) - - cmd_result = CmdResult.create(StatusCode.OK) - cmd_result.set_property_string("detail", "success") - ten.return_result(cmd_result, cmd) diff --git a/agents/ten_packages/extension/deepgram_asr_python/deepgram_wrapper.py b/agents/ten_packages/extension/deepgram_asr_python/deepgram_wrapper.py deleted file mode 100644 index bea05b71..00000000 --- a/agents/ten_packages/extension/deepgram_asr_python/deepgram_wrapper.py +++ /dev/null @@ -1,132 +0,0 @@ -import asyncio - -from ten import ( - TenEnv, - Data -) - -from deepgram import AsyncListenWebSocketClient, DeepgramClientOptions, LiveTranscriptionEvents, LiveOptions - -from .log import logger -from .deepgram_config import DeepgramConfig - -DATA_OUT_TEXT_DATA_PROPERTY_TEXT = "text" -DATA_OUT_TEXT_DATA_PROPERTY_IS_FINAL = "is_final" -DATA_OUT_TEXT_DATA_PROPERTY_STREAM_ID = "stream_id" -DATA_OUT_TEXT_DATA_PROPERTY_END_OF_SEGMENT = "end_of_segment" - -def create_and_send_data(ten: TenEnv, text_result: str, is_final: bool, stream_id: int): - stable_data = Data.create("text_data") - stable_data.set_property_bool(DATA_OUT_TEXT_DATA_PROPERTY_IS_FINAL, is_final) - stable_data.set_property_string(DATA_OUT_TEXT_DATA_PROPERTY_TEXT, text_result) - stable_data.set_property_int(DATA_OUT_TEXT_DATA_PROPERTY_STREAM_ID, stream_id) - stable_data.set_property_bool(DATA_OUT_TEXT_DATA_PROPERTY_END_OF_SEGMENT, is_final) - ten.send_data(stable_data) - - -class AsyncDeepgramWrapper(): - def __init__(self, config: DeepgramConfig, queue: asyncio.Queue, ten:TenEnv, loop: asyncio.BaseEventLoop): - self.queue = queue - self.ten = ten - self.stopped = False - self.config = config - self.loop = loop - self.stream_id = 0 - - logger.info(f"init deepgram client with api key: {config.api_key[:5]}") - self.deepgram_client = AsyncListenWebSocketClient(config=DeepgramClientOptions( - api_key=config.api_key, - options={"keepalive": "true"} - )) - - asyncio.set_event_loop(self.loop) - self.loop.create_task(self.start_listen(ten)) - - async def start_listen(self, ten:TenEnv) -> None: - logger.info(f"start and listen deepgram") - - super = self - - async def on_open(self, open, **kwargs): - logger.info(f"deepgram event callback on_open: {open}") - - async def on_close(self, close, **kwargs): - logger.info(f"deepgram event callback on_close: {close}") - - async def on_message(self, result, **kwargs): - sentence = result.channel.alternatives[0].transcript - - if len(sentence) == 0: - return - - is_final = result.is_final - logger.info(f"deepgram got sentence: [{sentence}], is_final: {is_final}, stream_id: {super.stream_id}") - - create_and_send_data(ten=ten, text_result=sentence, is_final=is_final, stream_id=super.stream_id) - - async def on_error(self, error, **kwargs): - logger.error(f"deepgram event callback on_error: {error}") - - self.deepgram_client.on(LiveTranscriptionEvents.Open, on_open) - self.deepgram_client.on(LiveTranscriptionEvents.Close, on_close) - self.deepgram_client.on(LiveTranscriptionEvents.Transcript, on_message) - self.deepgram_client.on(LiveTranscriptionEvents.Error, on_error) - - options = LiveOptions(language=self.config.language, - model=self.config.model, - sample_rate=self.config.sample_rate, - channels=self.config.channels, - encoding=self.config.encoding, - interim_results=self.config.interim_results, - punctuate=self.config.punctuate) - # connect to websocket - result = await self.deepgram_client.start(options) - if result is False: - if self.deepgram_client.status_code == 402: - logger.error("Failed to connect to Deepgram - your account has run out of credits.") - else: - logger.error("Failed to connect to Deepgram") - return - - logger.info(f"successfully connected to deepgram") - - async def send_frame(self) -> None: - while not self.stopped: - try: - pcm_frame = await asyncio.wait_for(self.queue.get(), timeout=10.0) - - if pcm_frame is None: - logger.warning("send_frame: exit due to None value got.") - return - - frame_buf = pcm_frame.get_buf() - if not frame_buf: - logger.warning("send_frame: empty pcm_frame detected.") - continue - - self.stream_id = pcm_frame.get_property_int('stream_id') - await self.deepgram_client.send(frame_buf) - self.queue.task_done() - except asyncio.TimeoutError as e: - logger.exception(f"error in send_frame: {e}") - except IOError as e: - logger.exception(f"error in send_frame: {e}") - except Exception as e: - logger.exception(f"error in send_frame: {e}") - raise e - - logger.info("send_frame: exit due to self.stopped == True") - - async def deepgram_loop(self) -> None: - try: - await self.send_frame() - except Exception as e: - logger.exception(e) - - def run(self) -> None: - self.loop.run_until_complete(self.deepgram_loop()) - self.loop.close() - logger.info("async_deepgram_wrapper: thread completed.") - - def stop(self) -> None: - self.stopped = True diff --git a/agents/ten_packages/extension/deepgram_asr_python/extension.py b/agents/ten_packages/extension/deepgram_asr_python/extension.py index 43c52445..91248c55 100644 --- a/agents/ten_packages/extension/deepgram_asr_python/extension.py +++ b/agents/ten_packages/extension/deepgram_asr_python/extension.py @@ -1,3 +1,150 @@ -# extension.py +from ten import ( + AsyncExtension, + AsyncTenEnv, + Cmd, + Data, + AudioFrame, + StatusCode, + CmdResult, +) -EXTENSION_NAME = "deepgram_asr_python" +import asyncio + +from deepgram import AsyncListenWebSocketClient, DeepgramClientOptions, LiveTranscriptionEvents, LiveOptions + +from .config import DeepgramConfig + +PROPERTY_API_KEY = "api_key" # Required +PROPERTY_LANG = "language" # Optional +PROPERTY_MODEL = "model" # Optional +PROPERTY_SAMPLE_RATE = "sample_rate" # Optional + +DATA_OUT_TEXT_DATA_PROPERTY_TEXT = "text" +DATA_OUT_TEXT_DATA_PROPERTY_IS_FINAL = "is_final" +DATA_OUT_TEXT_DATA_PROPERTY_STREAM_ID = "stream_id" +DATA_OUT_TEXT_DATA_PROPERTY_END_OF_SEGMENT = "end_of_segment" + +class DeepgramASRExtension(AsyncExtension): + def __init__(self, name: str): + super().__init__(name) + + self.stopped = False + self.deepgram_client : AsyncListenWebSocketClient = None + self.deepgram_config : DeepgramConfig = None + self.ten_env : AsyncTenEnv = None + + async def on_init(self, ten_env: AsyncTenEnv) -> None: + ten_env.log_info("DeepgramASRExtension on_init") + + async def on_start(self, ten_env: AsyncTenEnv) -> None: + ten_env.log_info("on_start") + self.loop = asyncio.get_event_loop() + self.ten_env = ten_env + + self.deepgram_config = DeepgramConfig.default_config() + + try: + self.deepgram_config.api_key = ten_env.get_property_string(PROPERTY_API_KEY).strip() + except Exception as e: + ten_env.log_error(f"get property {PROPERTY_API_KEY} error: {e}") + return + + for optional_param in [ + PROPERTY_LANG, + PROPERTY_MODEL, + PROPERTY_SAMPLE_RATE, + ]: + try: + value = ten_env.get_property_string(optional_param).strip() + if value: + self.deepgram_config.__setattr__(optional_param, value) + except Exception as err: + ten_env.log_debug( + f"get property optional {optional_param} failed, err: {err}. Using default value: {self.deepgram_config.__getattribute__(optional_param)}" + ) + + self.deepgram_client = AsyncListenWebSocketClient(config=DeepgramClientOptions( + api_key=self.deepgram_config.api_key, + options={"keepalive": "true"} + )) + + self.loop.create_task(self._start_listen()) + + ten_env.log_info("starting async_deepgram_wrapper thread") + + async def on_audio_frame(self, ten_env: AsyncTenEnv, frame: AudioFrame) -> None: + frame_buf = frame.get_buf() + + if not frame_buf: + self.ten_env.log_warn("send_frame: empty pcm_frame detected.") + return + + self.stream_id = frame.get_property_int('stream_id') + await self.deepgram_client.send(frame_buf) + + async def on_stop(self, ten_env: AsyncTenEnv) -> None: + ten_env.log_info("on_stop") + + await self.deepgram_client.finish() + + async def on_cmd(self, ten_env: AsyncTenEnv, cmd: Cmd) -> None: + cmd_json = cmd.to_json() + ten_env.log_info(f"on_cmd json: {cmd_json}") + + cmd_result = CmdResult.create(StatusCode.OK) + cmd_result.set_property_string("detail", "success") + ten_env.return_result(cmd_result, cmd) + + async def _start_listen(self) -> None: + self.ten_env.log_info(f"start and listen deepgram") + + async def on_open(_, open, **kwargs): + self.ten_env.log_info(f"deepgram event callback on_open: {open}") + + async def on_close(_, close, **kwargs): + self.ten_env.log_info(f"deepgram event callback on_close: {close}") + + async def on_message(_, result, **kwargs): + sentence = result.channel.alternatives[0].transcript + + if len(sentence) == 0: + return + + is_final = result.is_final + self.ten_env.log_info(f"deepgram got sentence: [{sentence}], is_final: {is_final}, stream_id: {self.stream_id}") + + await self._send_text(text=sentence, is_final=is_final, stream_id=self.stream_id) + + async def on_error(_, error, **kwargs): + self.ten_env.log_error(f"deepgram event callback on_error: {error}") + + self.deepgram_client.on(LiveTranscriptionEvents.Open, on_open) + self.deepgram_client.on(LiveTranscriptionEvents.Close, on_close) + self.deepgram_client.on(LiveTranscriptionEvents.Transcript, on_message) + self.deepgram_client.on(LiveTranscriptionEvents.Error, on_error) + + options = LiveOptions(language=self.deepgram_config.language, + model=self.deepgram_config.model, + sample_rate=self.deepgram_config.sample_rate, + channels=self.deepgram_config.channels, + encoding=self.deepgram_config.encoding, + interim_results=self.deepgram_config.interim_results, + punctuate=self.deepgram_config.punctuate) + # connect to websocket + result = await self.deepgram_client.start(options) + if result is False: + if self.deepgram_client.status_code == 402: + self.ten_env.log_error("Failed to connect to Deepgram - your account has run out of credits.") + else: + self.ten_env.log_error("Failed to connect to Deepgram") + return + + self.ten_env.log_info(f"successfully connected to deepgram") + + async def _send_text(self, text: str, is_final: bool, stream_id: str) -> None: + stable_data = Data.create("text_data") + stable_data.set_property_bool(DATA_OUT_TEXT_DATA_PROPERTY_IS_FINAL, is_final) + stable_data.set_property_string(DATA_OUT_TEXT_DATA_PROPERTY_TEXT, text) + stable_data.set_property_int(DATA_OUT_TEXT_DATA_PROPERTY_STREAM_ID, stream_id) + stable_data.set_property_bool(DATA_OUT_TEXT_DATA_PROPERTY_END_OF_SEGMENT, is_final) + self.ten_env.send_data(stable_data) diff --git a/agents/ten_packages/extension/deepgram_asr_python/log.py b/agents/ten_packages/extension/deepgram_asr_python/log.py deleted file mode 100644 index 88a2cb1c..00000000 --- a/agents/ten_packages/extension/deepgram_asr_python/log.py +++ /dev/null @@ -1,16 +0,0 @@ -# log.py - -import logging -from .extension import EXTENSION_NAME - -logger = logging.getLogger(EXTENSION_NAME) -logger.setLevel(logging.INFO) - -formatter = logging.Formatter( - "%(asctime)s - %(name)s - %(levelname)s - %(process)d - [%(filename)s:%(lineno)d] - %(message)s" -) - -console_handler = logging.StreamHandler() -console_handler.setFormatter(formatter) - -logger.addHandler(console_handler)