diff --git a/.env.example b/.env.example index f8140657..1fe3c50f 100644 --- a/.env.example +++ b/.env.example @@ -106,4 +106,13 @@ WEATHERAPI_API_KEY= # Extension: bingsearch_tool_python # Bing search API key -BING_API_KEY= \ No newline at end of file +BING_API_KEY= + +# Extension: tsdb_firestore +# Firestore certifications +FIRESTORE_PROJECT_ID= +FIRESTORE_PRIVATE_KEY_ID= +FIRESTORE_PRIVATE_KEY= +FIRESTORE_CLIENT_EMAIL= +FIRESTORE_CLIENT_ID= +FIRESTORE_CERT_URL= diff --git a/agents/property.json b/agents/property.json index dbd877b6..e9676848 100644 --- a/agents/property.json +++ b/agents/property.json @@ -2941,6 +2941,291 @@ ] } ] + }, + { + "name": "va_openai_v2v_storage", + "auto_start": true, + "nodes": [ + { + "type": "extension", + "extension_group": "rtc", + "addon": "agora_rtc", + "name": "agora_rtc", + "property": { + "app_id": "${env:AGORA_APP_ID}", + "token": "", + "channel": "ten_agent_test", + "stream_id": 1234, + "remote_stream_id": 123, + "subscribe_audio": true, + "publish_audio": true, + "publish_data": true, + "subscribe_audio_sample_rate": 24000 + } + }, + { + "type": "extension", + "extension_group": "llm", + "addon": "openai_v2v_python", + "name": "openai_v2v_python", + "property": { + "api_key": "${env:OPENAI_REALTIME_API_KEY}", + "temperature": 0.9, + "model": "gpt-4o-realtime-preview", + "max_tokens": 2048, + "voice": "alloy", + "language": "en-US", + "server_vad": true, + "dump": true, + "history": 10, + "enable_storage": true + } + }, + { + "type": "extension", + "extension_group": "transcriber", + "addon": "message_collector", + "name": "message_collector" + }, + { + "type": "extension", + "extension_group": "tools", + "addon": "weatherapi_tool_python", + "name": "weatherapi_tool_python", + "property": { + "api_key": "${env:WEATHERAPI_API_KEY}" + } + }, + { + "type": "extension", + "extension_group": "tools", + "addon": "bingsearch_tool_python", + "name": "bingsearch_tool_python", + "property": { + "api_key": "${env:BING_API_KEY}" + } + }, + { + "type": "extension", + "extension_group": "context", + "addon": "tsdb_firestore", + "name": "tsdb_firestore", + "property": { + "credentials": { + "type": "service_account", + "project_id": "${env:FIRESTORE_PROJECT_ID}", + "private_key_id": "${env:FIRESTORE_PRIVATE_KEY_ID}", + "private_key": "${env:FIRESTORE_PRIVATE_KEY}", + "client_email": "${env:FIRESTORE_CLIENT_EMAIL}", + "client_id": "${env:FIRESTORE_CLIENT_ID}", + "auth_uri": "https://accounts.google.com/o/oauth2/auth", + "token_uri": "https://oauth2.googleapis.com/token", + "auth_provider_x509_cert_url": "https://www.googleapis.com/oauth2/v1/certs", + "client_x509_cert_url": "${env:FIRESTORE_CERT_URL}", + "universe_domain": "googleapis.com" + }, + "channel_name": "ten_agent_test", + "collection_name": "llm_context" + } + } + ], + "connections": [ + { + "extension_group": "rtc", + "extension": "agora_rtc", + "audio_frame": [ + { + "name": "pcm_frame", + "dest": [ + { + "extension_group": "realtime", + "extension": "openai_v2v_python" + } + ] + } + ] + }, + { + "extension_group": "tools", + "extension": "weatherapi_tool_python", + "cmd": [ + { + "name": "tool_register", + "dest": [ + { + "extension_group": "realtime", + "extension": "openai_v2v_python" + } + ] + } + ] + }, + { + "extension_group": "tools", + "extension": "bingsearch_tool_python", + "cmd": [ + { + "name": "tool_register", + "dest": [ + { + "extension_group": "realtime", + "extension": "openai_v2v_python" + } + ] + } + ] + }, + { + "extension_group": "realtime", + "extension": "openai_v2v_python", + "audio_frame": [ + { + "name": "pcm_frame", + "dest": [ + { + "extension_group": "rtc", + "extension": "agora_rtc" + } + ] + } + ], + "data": [ + { + "name": "append", + "dest": [ + { + "extension_group": "context", + "extension": "tsdb_firestore" + } + ] + }, + { + "name": "text_data", + "dest": [ + { + "extension_group": "transcriber", + "extension": "message_collector" + } + ] + } + ], + "cmd": [ + { + "name": "flush", + "dest": [ + { + "extension_group": "rtc", + "extension": "agora_rtc" + } + ] + }, + { + "name": "retrieve", + "dest": [ + { + "extension_group": "context", + "extension": "tsdb_firestore" + } + ] + }, + { + "name": "tool_call_get_current_weather", + "dest": [ + { + "extension_group": "tools", + "extension": "weatherapi_tool_python", + "msg_conversion": { + "type": "per_property", + "keep_original": true, + "rules": [ + { + "path": "_ten.name", + "conversion_mode": "fixed_value", + "value": "tool_call" + } + ] + } + } + ] + }, + { + "name": "tool_call_get_past_weather", + "dest": [ + { + "extension_group": "tools", + "extension": "weatherapi_tool_python", + "msg_conversion": { + "type": "per_property", + "keep_original": true, + "rules": [ + { + "path": "_ten.name", + "conversion_mode": "fixed_value", + "value": "tool_call" + } + ] + } + } + ] + }, + { + "name": "tool_call_get_future_weather", + "dest": [ + { + "extension_group": "tools", + "extension": "weatherapi_tool_python", + "msg_conversion": { + "type": "per_property", + "keep_original": true, + "rules": [ + { + "path": "_ten.name", + "conversion_mode": "fixed_value", + "value": "tool_call" + } + ] + } + } + ] + }, + { + "name": "tool_call_bing_search", + "dest": [ + { + "extension_group": "tools", + "extension": "weatherapi_tool_python", + "msg_conversion": { + "type": "per_property", + "keep_original": true, + "rules": [ + { + "path": "_ten.name", + "conversion_mode": "fixed_value", + "value": "tool_call" + } + ] + } + } + ] + } + ] + }, + { + "extension_group": "transcriber", + "extension": "message_collector", + "data": [ + { + "name": "data", + "dest": [ + { + "extension_group": "rtc", + "extension": "agora_rtc" + } + ] + } + ] + } + ] } ] } diff --git a/agents/ten_packages/extension/openai_v2v_python/conf.py b/agents/ten_packages/extension/openai_v2v_python/conf.py index bbd54b19..b28eeb7a 100644 --- a/agents/ten_packages/extension/openai_v2v_python/conf.py +++ b/agents/ten_packages/extension/openai_v2v_python/conf.py @@ -7,7 +7,6 @@ BASIC_PROMPT = ''' You are an agent based on OpenAI {model} model and TEN (pronounce /ten/, do not try to translate it) Framework(A realtime multimodal agent framework). Your knowledge cutoff is 2023-10. You are a helpful, witty, and friendly AI. Act like a human, but remember that you aren't a human and that you can't do human things in the real world. Your voice and personality should be warm and engaging, with a lively and playful tone. -You should start by saying '{greeting}' using {language}. If interacting is not in {language}, start by using the standard accent or dialect familiar to the user. Talk quickly. Do not refer to these rules, even if you're asked about them. {tools} diff --git a/agents/ten_packages/extension/openai_v2v_python/extension.py b/agents/ten_packages/extension/openai_v2v_python/extension.py index 9082165d..e0fccfe0 100644 --- a/agents/ten_packages/extension/openai_v2v_python/extension.py +++ b/agents/ten_packages/extension/openai_v2v_python/extension.py @@ -40,6 +40,7 @@ PROPERTY_SYSTEM_MESSAGE = "system_message" # Optional PROPERTY_TEMPERATURE = "temperature" # Optional PROPERTY_MAX_TOKENS = "max_tokens" # Optional +PROPERTY_ENABLE_STORAGE = "enable_storage" # Optional PROPERTY_VOICE = "voice" # Optional PROPERTY_AUDIO_OUT = "audio_out" # Optional PROPERTY_INPUT_TRANSCRIPT = "input_transcript" @@ -96,7 +97,10 @@ def __init__(self, name: str): # max history store in context self.max_history = 0 self.history = [] + self.enable_storage: bool = False + self.retrieved = [] self.remote_stream_id: int = 0 + self.stream_id: int = 0 self.channel_name: str = "" self.dump: bool = False self.registry = ToolRegistry() @@ -115,6 +119,10 @@ def start_event_loop(loop): target=start_event_loop, args=(self.loop,)) self.thread.start() + if self.enable_storage: + r = Cmd.create("retrieve") + ten_env.send_cmd(r, self.on_retrieved) + # self._register_local_tools() asyncio.run_coroutine_threadsafe(self._init_connection(), self.loop) @@ -133,6 +141,23 @@ def on_stop(self, ten_env: TenEnv) -> None: ten_env.on_stop_done() + def on_retrieved(self, ten_env:TenEnv, result:CmdResult) -> None: + if result.get_status_code() == StatusCode.OK: + try: + history = json.loads(result.get_property_string("response")) + if not self.last_updated: + # cache the history + # FIXME need to have json + if self.max_history and len(history) > self.max_history: + self.retrieved = history[len(history) - self.max_history:] + else: + self.retrieved = history + logger.info(f"on retrieve context {history} {self.retrieved}") + except: + logger.exception("Failed to handle retrieve result") + else: + logger.warning("Failed to retrieve content") + def on_audio_frame(self, ten_env: TenEnv, audio_frame: AudioFrame) -> None: try: stream_id = audio_frame.get_property_int("stream_id") @@ -211,6 +236,10 @@ def get_time_ms() -> int: update_msg = self._update_session() await self.conn.send_request(update_msg) + if self.retrieved: + await self._append_retrieve() + logger.info(f"after append retrieve: {len(self.retrieved)}") + text = self._greeting_text() await self.conn.send_request(ItemCreate(item=UserMessageItemParam(content=[{"type": ContentType.InputText, "text": text}]))) await self.conn.send_request(ResponseCreate()) @@ -222,6 +251,7 @@ def get_time_ms() -> int: f"On request transcript {message.transcript}") self._send_transcript( ten_env, message.transcript, Role.User, True) + self._append_context(ten_env, message.transcript, self.remote_stream_id, Role.User) case ItemInputAudioTranscriptionFailed(): logger.warning( f"On request transcript failed {message.item_id} {message.error}") @@ -269,6 +299,7 @@ def get_time_ms() -> int: logger.warning( f"On flushed transcript done {message.response_id}") continue + self._append_context(ten_env, message.transcript, self.stream_id, Role.Assistant) self.transcript = "" self._send_transcript( ten_env, "", Role.Assistant, True) @@ -435,6 +466,13 @@ def _fetch_properties(self, ten_env: TenEnv): logger.info( f"GetProperty optional {PROPERTY_MAX_TOKENS} failed, err: {err}" ) + + try: + self.enable_storage = ten_env.get_property_bool(PROPERTY_ENABLE_STORAGE) + except Exception as err: + logger.info( + f"GetProperty optional {PROPERTY_ENABLE_STORAGE} failed, err: {err}" + ) try: voice = ten_env.get_property_string(PROPERTY_VOICE) @@ -510,6 +548,14 @@ def _update_session(self) -> SessionUpdate: su.session.input_audio_transcription=InputAudioTranscription( model="whisper-1") return su + + async def _append_retrieve(self): + if self.retrieved: + for r in self.retrieved: + if r["role"] == MessageRole.User: + await self.conn.send_request(ItemCreate(item=UserMessageItemParam(content=[{"type": ContentType.InputText, "text": r["input"]}]))) + elif r["role"] == MessageRole.Assistant: + await self.conn.send_request(ItemCreate(item=AssistantMessageItemParam(content=[{"type": ContentType.InputText, "text": r["input"]}]))) ''' def _update_conversation(self) -> UpdateConversationConfig: @@ -548,6 +594,20 @@ def _on_audio_delta(self, ten_env: TenEnv, delta: bytes) -> None: f.unlock_buf(buff) ten_env.send_audio_frame(f) + def _append_context(self, ten_env: TenEnv, sentence: str, stream_id: int, role: str): + if not self.enable_storage: + return + + try: + d = Data.create("append") + d.set_property_string("text", sentence) + d.set_property_string("role", role) + d.set_property_int("stream_id", stream_id) + logger.info(f"append_contexttext [{sentence}] stream_id {stream_id} role {role}") + ten_env.send_data(d) + except: + logger.exception(f"Error send append_context data {role}: {sentence}") + def _send_transcript(self, ten_env: TenEnv, content: str, role: Role, is_final: bool) -> None: def is_punctuation(char): if char in [",", ",", ".", "。", "?", "?", "!", "!"]: @@ -568,12 +628,13 @@ def parse_sentences(sentence_fragment, content): remain = current_sentence # Any remaining characters form the incomplete sentence return sentences, remain - - def send_data(ten_env: TenEnv, sentence: str, stream_id: int, is_final: bool): + + def send_data(ten_env: TenEnv, sentence: str, stream_id: int, role: str, is_final: bool): try: d = Data.create("text_data") d.set_property_string("text", sentence) d.set_property_bool("end_of_segment", is_final) + d.set_property_string("role", role) d.set_property_int("stream_id", stream_id) logger.info( f"send transcript text [{sentence}] stream_id {stream_id} is_final {is_final} end_of_segment {is_final} role {role}") @@ -587,9 +648,9 @@ def send_data(ten_env: TenEnv, sentence: str, stream_id: int, is_final: bool): if role == Role.Assistant and not is_final: sentences, self.transcript = parse_sentences(self.transcript, content) for s in sentences: - send_data(ten_env, s, stream_id, is_final) + send_data(ten_env, s, stream_id, role, is_final) else: - send_data(ten_env, content, stream_id, is_final) + send_data(ten_env, content, stream_id, role, is_final) except: logger.exception(f"Error send text data {role}: {content} {is_final}") diff --git a/agents/ten_packages/extension/openai_v2v_python/manifest.json b/agents/ten_packages/extension/openai_v2v_python/manifest.json index feda88b5..8f06c4bd 100644 --- a/agents/ten_packages/extension/openai_v2v_python/manifest.json +++ b/agents/ten_packages/extension/openai_v2v_python/manifest.json @@ -64,6 +64,9 @@ }, "history": { "type": "int64" + }, + "enable_storage": { + "type": "bool" } }, "audio_frame_in": [ @@ -84,6 +87,14 @@ "type": "string" } } + }, + { + "name": "append", + "property": { + "text": { + "type": "string" + } + } } ], "cmd_in": [ diff --git a/agents/ten_packages/extension/tsdb_firestore/BUILD.gn b/agents/ten_packages/extension/tsdb_firestore/BUILD.gn new file mode 100644 index 00000000..66830a25 --- /dev/null +++ b/agents/ten_packages/extension/tsdb_firestore/BUILD.gn @@ -0,0 +1,21 @@ +# +# +# Agora Real Time Engagement +# Created by Wei Hu in 2022-11. +# Copyright (c) 2024 Agora IO. All rights reserved. +# +# +import("//build/feature/ten_package.gni") + +ten_package("tsdb_firestore") { + package_kind = "extension" + + resources = [ + "__init__.py", + "addon.py", + "extension.py", + "log.py", + "manifest.json", + "property.json", + ] +} diff --git a/agents/ten_packages/extension/tsdb_firestore/README.md b/agents/ten_packages/extension/tsdb_firestore/README.md new file mode 100644 index 00000000..4d2bf6b4 --- /dev/null +++ b/agents/ten_packages/extension/tsdb_firestore/README.md @@ -0,0 +1,13 @@ +# Firestore TSDB Extension + +Public Doc: https://firebase.google.com/docs/firestore + +## Configurations + +You can config this extension by providing following environments: + +- credentials: a dict, represents the contents of certificate, which is from Google service account +- collection_name: a string, denotes the collection to store chat contents +- channel_name: a string, used to fetch the corresponding document in storage + +In addition, to implement the deletion of document based on ttl (which is 1 day by default, and will refresh each time fetching the document), you should set TTL or define Cloud Functions with Firestore \ No newline at end of file diff --git a/agents/ten_packages/extension/tsdb_firestore/__init__.py b/agents/ten_packages/extension/tsdb_firestore/__init__.py new file mode 100644 index 00000000..0f296203 --- /dev/null +++ b/agents/ten_packages/extension/tsdb_firestore/__init__.py @@ -0,0 +1,11 @@ +# +# +# Agora Real Time Engagement +# Created by Wei Hu in 2024-08. +# Copyright (c) 2024 Agora IO. All rights reserved. +# +# +from . import addon +from .log import logger + +logger.info("tsdb_firestore extension loaded") diff --git a/agents/ten_packages/extension/tsdb_firestore/addon.py b/agents/ten_packages/extension/tsdb_firestore/addon.py new file mode 100644 index 00000000..b264634f --- /dev/null +++ b/agents/ten_packages/extension/tsdb_firestore/addon.py @@ -0,0 +1,22 @@ +# +# +# Agora Real Time Engagement +# Created by Wei Hu in 2024-08. +# Copyright (c) 2024 Agora IO. All rights reserved. +# +# +from ten import ( + Addon, + register_addon_as_extension, + TenEnv, +) +from .extension import TSDBFirestoreExtension +from .log import logger + + +@register_addon_as_extension("tsdb_firestore") +class TSDBFirestoreExtensionAddon(Addon): + + def on_create_instance(self, ten_env: TenEnv, name: str, context) -> None: + logger.info("TSDBFirestoreExtensionAddon on_create_instance") + ten_env.on_create_instance_done(TSDBFirestoreExtension(name), context) diff --git a/agents/ten_packages/extension/tsdb_firestore/extension.py b/agents/ten_packages/extension/tsdb_firestore/extension.py new file mode 100644 index 00000000..1d58fcfe --- /dev/null +++ b/agents/ten_packages/extension/tsdb_firestore/extension.py @@ -0,0 +1,293 @@ +# +# +# Agora Real Time Engagement +# Created by Wei Hu in 2024-08. +# Copyright (c) 2024 Agora IO. All rights reserved. +# +# + +from ten import ( + AudioFrame, + VideoFrame, + Extension, + TenEnv, + Cmd, + StatusCode, + CmdResult, + Data, +) +import firebase_admin +from firebase_admin import credentials +from firebase_admin import firestore +import datetime +import asyncio +import queue +import threading +import json +from .log import logger +from typing import List, Any + +DATA_IN_TEXT_DATA_PROPERTY_IS_FINAL = "is_final" +DATA_IN_TEXT_DATA_PROPERTY_STREAM_ID = "stream_id" +DATA_IN_TEXT_DATA_PROPERTY_TEXT = "text" +DATA_IN_TEXT_DATA_PROPERTY_ROLE = "role" + +PROPERTY_CREDENTIALS = "credentials" +PROPERTY_CHANNEL_NAME = "channel_name" +PROPERTY_COLLECTION_NAME = "collection_name" +PROPERTY_TTL = "ttl" + +RETRIEVE_CMD = "retrieve" +CMD_OUT_PROPERTY_RESPONSE = "response" +DOC_EXPIRE_PATH = "expireAt" +DOC_CONTENTS_PATH = "contents" +CONTENT_ROLE_PATH = "role" +CONTENT_TS_PATH = "ts" +CONTENT_STREAM_ID_PATH = "stream_id" +CONTENT_INPUT_PATH = "input" +DEFAULT_TTL = 1 # days + +def get_current_time(): + # Get the current time + start_time = datetime.datetime.now() + # Get the number of microseconds since the Unix epoch + unix_microseconds = int(start_time.timestamp() * 1_000_000) + return unix_microseconds + +def order_by_ts(contents: List[str]) -> List[Any]: + tmp = [] + for c in contents: + tmp.append(json.loads(c)) + sorted_contents = sorted(tmp, key=lambda x: x[CONTENT_TS_PATH]) + res = [] + for sc in sorted_contents: + res.append({CONTENT_ROLE_PATH: sc[CONTENT_ROLE_PATH], CONTENT_INPUT_PATH: sc[CONTENT_INPUT_PATH], CONTENT_STREAM_ID_PATH: sc.get(CONTENT_STREAM_ID_PATH, 0)}) + return res + +@firestore.transactional +def update_in_transaction(transaction, doc_ref, content): + transaction.update(doc_ref, content) + +@firestore.transactional +def read_in_transaction(transaction, doc_ref): + doc = doc_ref.get(transaction=transaction) + return doc.to_dict() + +class TSDBFirestoreExtension(Extension): + def __init__(self, name: str): + super().__init__(name) + self.stopped = False + self.thread = None + self.queue = queue.Queue() + self.stopEvent = asyncio.Event() + self.cmd_thread = None + self.loop = None + self.credentials = None + self.channel_name = "" + self.collection_name = "" + self.ttl = DEFAULT_TTL + self.client = None + self.document_ref = None + + self.current_stream_id = 0 + self.cache = "" + + async def __thread_routine(self, ten_env: TenEnv): + logger.info("__thread_routine start") + self.loop = asyncio.get_running_loop() + ten_env.on_start_done() + await self.stopEvent.wait() + + async def stop_thread(self): + self.stopEvent.set() + + def on_init(self, ten_env: TenEnv) -> None: + logger.info("TSDBFirestoreExtension on_init") + ten_env.on_init_done() + + def on_start(self, ten_env: TenEnv) -> None: + logger.info("TSDBFirestoreExtension on_start") + + try: + self.credentials = ten_env.get_property_to_json(PROPERTY_CREDENTIALS) + except Exception as err: + logger.error(f"GetProperty required {PROPERTY_CREDENTIALS} failed, err: {err}") + return + + try: + self.channel_name = ten_env.get_property_string(PROPERTY_CHANNEL_NAME) + except Exception as err: + logger.error(f"GetProperty required {PROPERTY_CHANNEL_NAME} failed, err: {err}") + return + + try: + self.collection_name = ten_env.get_property_string(PROPERTY_COLLECTION_NAME) + except Exception as err: + logger.error(f"GetProperty required {PROPERTY_COLLECTION_NAME} failed, err: {err}") + return + + # start firestore db + cred = credentials.Certificate(json.loads(self.credentials)) + firebase_admin.initialize_app(cred) + self.client = firestore.client() + + self.document_ref = self.client.collection(self.collection_name).document(self.channel_name) + # update ttl + expiration_time = datetime.datetime.now() + datetime.timedelta(days=self.ttl) + exists = self.document_ref.get().exists + if exists: + self.document_ref.update( + { + DOC_EXPIRE_PATH: expiration_time + } + ) + logger.info(f"reset document ttl, {self.ttl} day(s), for the channel {self.channel_name}") + else: + # not exists yet, set to create one + self.document_ref.set( + { + DOC_EXPIRE_PATH: expiration_time + } + ) + logger.info(f"create new document and set ttl, {self.ttl} day(s), for the channel {self.channel_name}") + + # start the loop to handle data in + self.thread = threading.Thread(target=self.async_handle, args=[ten_env]) + self.thread.start() + + # start the loop to handle cmd in + self.cmd_thread = threading.Thread( + target=asyncio.run, args=(self.__thread_routine(ten_env),) + ) + self.cmd_thread.start() + + def async_handle(self, ten_env: TenEnv) -> None: + while not self.stopped: + try: + value = self.queue.get() + if value is None: + logger.info("exit handle loop") + break + ts, input, role, stream_id = value + content_str = json.dumps({CONTENT_ROLE_PATH: role, CONTENT_INPUT_PATH: input, CONTENT_TS_PATH: ts, CONTENT_STREAM_ID_PATH: stream_id}) + update_in_transaction( + self.client.transaction(), + self.document_ref, + { + DOC_CONTENTS_PATH: firestore.ArrayUnion([content_str]) + } + ) + logger.info(f"append {content_str} to firestore document {self.channel_name}") + except Exception as e: + logger.exception("Failed to store chat contents") + + def on_stop(self, ten_env: TenEnv) -> None: + logger.info("TSDBFirestoreExtension on_stop") + + # clear the queue and stop the thread to process data in + self.stopped = True + while not self.queue.empty(): + self.queue.get() + self.queue.put(None) + if self.thread is not None: + self.thread.join() + self.thread = None + + # stop the thread to process cmd in + if self.cmd_thread is not None and self.cmd_thread.is_alive(): + asyncio.run_coroutine_threadsafe(self.stop_thread(), self.loop) + self.cmd_thread.join() + self.cmd_thread = None + + ten_env.on_stop_done() + + def on_deinit(self, ten_env: TenEnv) -> None: + logger.info("TSDBFirestoreExtension on_deinit") + ten_env.on_deinit_done() + + def on_cmd(self, ten_env: TenEnv, cmd: Cmd) -> None: + try: + cmd_name = cmd.get_name() + logger.info("on_cmd name {}".format(cmd_name)) + if cmd_name == RETRIEVE_CMD: + asyncio.run_coroutine_threadsafe( + self.retrieve(ten_env, cmd), self.loop + ) + else: + logger.info("unknown cmd name {}".format(cmd_name)) + cmd_result = CmdResult.create(StatusCode.ERROR) + ten_env.return_result(cmd_result, cmd) + except Exception as e: + ten_env.return_result(CmdResult.create(StatusCode.ERROR), cmd) + + async def retrieve(self, ten_env: TenEnv, cmd: Cmd): + try: + doc_dict = read_in_transaction(self.client.transaction(), self.document_ref) + if DOC_CONTENTS_PATH in doc_dict: + contents = doc_dict[DOC_CONTENTS_PATH] + logger.info(f"after retrieve {contents}") + ret = CmdResult.create(StatusCode.OK) + ret.set_property_string(CMD_OUT_PROPERTY_RESPONSE, json.dumps(order_by_ts(contents))) + ten_env.return_result(ret, cmd) + else: + logger.info(f"no contents for the channel {self.channel_name} yet") + ten_env.return_result(CmdResult.create(StatusCode.ERROR), cmd) + except Exception as e: + logger.exception(f"Failed to read the document for the channel {self.channel_name}") + ten_env.return_result(CmdResult.create(StatusCode.ERROR), cmd) + + def on_data(self, ten_env: TenEnv, data: Data) -> None: + logger.info(f"TSDBFirestoreExtension on_data") + + # assume 'data' is an object from which we can get properties + is_final = False + try: + is_final = data.get_property_bool(DATA_IN_TEXT_DATA_PROPERTY_IS_FINAL) + if not is_final: + logger.info("ignore non-final input") + return + except Exception as err: + logger.info( + f"OnData GetProperty {DATA_IN_TEXT_DATA_PROPERTY_IS_FINAL} failed, err: {err}" + ) + + stream_id = 0 + try: + stream_id = data.get_property_bool(DATA_IN_TEXT_DATA_PROPERTY_STREAM_ID) + except Exception as err: + logger.info( + f"OnData GetProperty {DATA_IN_TEXT_DATA_PROPERTY_STREAM_ID} failed, err: {err}" + ) + + # get input text + try: + input_text = data.get_property_string(DATA_IN_TEXT_DATA_PROPERTY_TEXT) + if not input_text: + logger.info("ignore empty text") + return + logger.info(f"OnData input text: [{input_text}]") + except Exception as err: + logger.info( + f"OnData GetProperty {DATA_IN_TEXT_DATA_PROPERTY_TEXT} failed, err: {err}" + ) + return + # get stream id + try: + role = data.get_property_string(DATA_IN_TEXT_DATA_PROPERTY_ROLE) + if not role: + logger.warning("ignore empty role") + return + except Exception as err: + logger.info( + f"OnData GetProperty {DATA_IN_TEXT_DATA_PROPERTY_ROLE} failed, err: {err}" + ) + return + + ts = get_current_time() + self.queue.put((ts, input_text, role, stream_id)) + + def on_audio_frame(self, ten_env: TenEnv, audio_frame: AudioFrame) -> None: + pass + + def on_video_frame(self, ten_env: TenEnv, video_frame: VideoFrame) -> None: + pass diff --git a/agents/ten_packages/extension/tsdb_firestore/log.py b/agents/ten_packages/extension/tsdb_firestore/log.py new file mode 100644 index 00000000..aa14bacd --- /dev/null +++ b/agents/ten_packages/extension/tsdb_firestore/log.py @@ -0,0 +1,22 @@ +# +# +# Agora Real Time Engagement +# Created by Wei Hu in 2024-08. +# Copyright (c) 2024 Agora IO. All rights reserved. +# +# +import logging + +logger = logging.getLogger("tsdb_firestore") +logger.setLevel(logging.INFO) + +formatter_str = ( + "%(asctime)s - %(name)s - %(levelname)s - %(process)d - " + "[%(filename)s:%(lineno)d] - %(message)s" +) +formatter = logging.Formatter(formatter_str) + +console_handler = logging.StreamHandler() +console_handler.setFormatter(formatter) + +logger.addHandler(console_handler) diff --git a/agents/ten_packages/extension/tsdb_firestore/manifest.json b/agents/ten_packages/extension/tsdb_firestore/manifest.json new file mode 100644 index 00000000..fba77e9f --- /dev/null +++ b/agents/ten_packages/extension/tsdb_firestore/manifest.json @@ -0,0 +1,52 @@ +{ + "type": "extension", + "name": "tsdb_firestore", + "version": "0.1.0", + "dependencies": [ + { + "type": "system", + "name": "ten_runtime_python", + "version": "0.3" + } + ], + "package": { + "include": [ + "manifest.json", + "property.json", + "BUILD.gn", + "**.tent", + "**.py", + "README.md" + ] + }, + "api": { + "data_in": [ + { + "name": "append", + "property": { + "text": { + "type": "string" + }, + "is_final": { + "type": "bool" + }, + "role": { + "type": "string" + } + } + } + ], + "cmd_in": [ + { + "name": "retrieve", + "result": { + "property": { + "response": { + "type": "string" + } + } + } + } + ] + } +} \ No newline at end of file diff --git a/agents/ten_packages/extension/tsdb_firestore/property.json b/agents/ten_packages/extension/tsdb_firestore/property.json new file mode 100644 index 00000000..9e26dfee --- /dev/null +++ b/agents/ten_packages/extension/tsdb_firestore/property.json @@ -0,0 +1 @@ +{} \ No newline at end of file diff --git a/agents/ten_packages/extension/tsdb_firestore/requirements.txt b/agents/ten_packages/extension/tsdb_firestore/requirements.txt new file mode 100644 index 00000000..4720fc6f --- /dev/null +++ b/agents/ten_packages/extension/tsdb_firestore/requirements.txt @@ -0,0 +1 @@ +firebase-admin \ No newline at end of file