diff --git a/agents/property.json b/agents/property.json index 5639bf5a..12c36d29 100644 --- a/agents/property.json +++ b/agents/property.json @@ -2,6 +2,224 @@ "_ten": { "log_level": 3, "predefined_graphs": [ + { + "name": "va.openai.azure.fashionai", + "auto_start": true, + "connections": [ + { + "app": "localhost", + "data": [ + { + "dest": [ + { + "app": "localhost", + "extension": "interrupt_detector", + "extension_group": "default" + }, + { + "app": "localhost", + "extension": "openai_chatgpt", + "extension_group": "chatgpt" + }, + { + "app": "localhost", + "extension": "message_collector", + "extension_group": "transcriber" + } + ], + "name": "text_data" + } + ], + "extension": "agora_rtc", + "extension_group": "default" + }, + { + "app": "localhost", + "cmd": [ + { + "dest": [ + { + "app": "localhost", + "extension": "azure_tts", + "extension_group": "tts" + }, + { + "app": "localhost", + "extension": "fashionai", + "extension_group": "default" + } + ], + "name": "flush" + } + ], + "data": [ + { + "dest": [ + { + "app": "localhost", + "extension": "message_collector", + "extension_group": "transcriber" + }, + { + "app": "localhost", + "extension": "fashionai", + "extension_group": "default" + } + ], + "name": "text_data" + } + ], + "extension": "openai_chatgpt", + "extension_group": "chatgpt" + }, + { + "app": "localhost", + "audio_frame": [ + { + "dest": [ + { + "app": "localhost", + "extension": "agora_rtc", + "extension_group": "default" + } + ], + "name": "pcm_frame" + } + ], + "cmd": [ + { + "dest": [ + { + "app": "localhost", + "extension": "agora_rtc", + "extension_group": "default" + } + ], + "name": "flush" + } + ], + "extension": "azure_tts", + "extension_group": "tts" + }, + { + "app": "localhost", + "data": [ + { + "dest": [ + { + "app": "localhost", + "extension": "agora_rtc", + "extension_group": "default" + } + ], + "name": "data" + } + ], + "extension": "message_collector", + "extension_group": "transcriber" + }, + { + "app": "localhost", + "cmd": [ + { + "dest": [ + { + "app": "localhost", + "extension": "openai_chatgpt", + "extension_group": "chatgpt" + } + ], + "name": "flush" + } + ], + "extension": "interrupt_detector", + "extension_group": "default" + } + ], + "nodes": [ + { + "addon": "agora_rtc", + "app": "localhost", + "extension_group": "default", + "name": "agora_rtc", + "property": { + "agora_asr_language": "en-US", + "agora_asr_session_control_file_path": "session_control.conf", + "agora_asr_vendor_key": "$AZURE_STT_KEY", + "agora_asr_vendor_name": "microsoft", + "agora_asr_vendor_region": "$AZURE_STT_REGION", + "app_id": "$AGORA_APP_ID", + "channel": "astra_agents_test", + "enable_agora_asr": true, + "publish_audio": true, + "publish_data": true, + "remote_stream_id": 123, + "stream_id": 1234, + "subscribe_audio": true, + "token": "" + }, + "type": "extension" + }, + { + "addon": "interrupt_detector", + "app": "localhost", + "extension_group": "default", + "name": "interrupt_detector", + "type": "extension" + }, + { + "addon": "openai_chatgpt", + "app": "localhost", + "extension_group": "chatgpt", + "name": "openai_chatgpt", + "property": { + "api_key": "$OPENAI_API_KEY", + "base_url": "", + "frequency_penalty": 0.9, + "greeting": "TEN Agent connected. How can I help you today?", + "max_memory_length": 10, + "max_tokens": 512, + "model": "gpt-4o-mini", + "prompt": "", + "proxy_url": "$OPENAI_PROXY_URL" + }, + "type": "extension" + }, + { + "addon": "azure_tts", + "app": "localhost", + "extension_group": "tts", + "name": "azure_tts", + "property": { + "azure_subscription_key": "$AZURE_TTS_KEY", + "azure_subscription_region": "$AZURE_TTS_REGION", + "azure_synthesis_voice_name": "en-US-JaneNeural" + }, + "type": "extension" + }, + { + "addon": "message_collector", + "app": "localhost", + "extension_group": "transcriber", + "name": "message_collector", + "type": "extension" + }, + { + "addon": "fashionai", + "app": "localhost", + "extension_group": "default", + "name": "fashionai", + "property": { + "app_id": "$AGORA_APP_ID", + "channel": "astra_agents_test", + "stream_id": 12345, + "token": "", + "service_id": "agora" + }, + "type": "extension" + } + ] + }, { "name": "va.openai.azure", "auto_start": false, diff --git a/agents/ten_packages/extension/fashionai/BUILD.gn b/agents/ten_packages/extension/fashionai/BUILD.gn new file mode 100644 index 00000000..6a6765bb --- /dev/null +++ b/agents/ten_packages/extension/fashionai/BUILD.gn @@ -0,0 +1,22 @@ +# +# +# Agora Real Time Engagement +# Created by Wei Hu in 2022-11. +# Copyright (c) 2024 Agora IO. All rights reserved. +# +# +import("//build/feature/ten_package.gni") + +ten_package("fashionai") { + package_kind = "extension" + + resources = [ + "__init__.py", + "manifest.json", + "property.json", + "src/__init__.py", + "src/addon.py", + "src/extension.py", + "src/log.py", + ] +} diff --git a/agents/ten_packages/extension/fashionai/README.md b/agents/ten_packages/extension/fashionai/README.md new file mode 100644 index 00000000..5af79bac --- /dev/null +++ b/agents/ten_packages/extension/fashionai/README.md @@ -0,0 +1,29 @@ +# fashionai + + + +## Features + + + +- xxx feature + +## API + +Refer to `api` definition in [manifest.json] and default values in [property.json](property.json). + + + +## Development + +### Build + + + +### Unit test + + + +## Misc + + diff --git a/agents/ten_packages/extension/fashionai/__init__.py b/agents/ten_packages/extension/fashionai/__init__.py new file mode 100644 index 00000000..f06f1641 --- /dev/null +++ b/agents/ten_packages/extension/fashionai/__init__.py @@ -0,0 +1,11 @@ +# +# +# Agora Real Time Engagement +# Created by Wei Hu in 2024-08. +# Copyright (c) 2024 Agora IO. All rights reserved. +# +# +from .src import addon +from .src.log import logger + +logger.info("fashionai extension loaded") diff --git a/agents/ten_packages/extension/fashionai/manifest.json b/agents/ten_packages/extension/fashionai/manifest.json new file mode 100644 index 00000000..521a7efb --- /dev/null +++ b/agents/ten_packages/extension/fashionai/manifest.json @@ -0,0 +1,63 @@ +{ + "type": "extension", + "name": "fashionai", + "version": "0.1.0", + "dependencies": [ + { + "type": "system", + "name": "ten_runtime_python", + "version": "0.2" + } + ], + "package": { + "include": [ + "manifest.json", + "property.json", + "BUILD.gn", + "**.tent", + "**.py", + "src/**.tent", + "src/**.py", + "README.md" + ] + }, + "api": { + "property": { + "app_id": { + "type": "string" + }, + "token": { + "type": "string" + }, + "channel": { + "type": "string" + }, + "stream_id": { + "type": "uint32" + }, + "service_id": { + "type": "string" + } + }, + "data_in": [ + { + "name": "text_data", + "property": { + "text": { + "type": "string" + } + } + } + ], + "cmd_in": [ + { + "name": "flush" + } + ], + "cmd_out": [ + { + "name": "flush" + } + ] + } +} \ No newline at end of file diff --git a/agents/ten_packages/extension/fashionai/property.json b/agents/ten_packages/extension/fashionai/property.json new file mode 100644 index 00000000..9e26dfee --- /dev/null +++ b/agents/ten_packages/extension/fashionai/property.json @@ -0,0 +1 @@ +{} \ No newline at end of file diff --git a/agents/ten_packages/extension/fashionai/requirements.txt b/agents/ten_packages/extension/fashionai/requirements.txt new file mode 100644 index 00000000..7a389117 --- /dev/null +++ b/agents/ten_packages/extension/fashionai/requirements.txt @@ -0,0 +1 @@ +websockets \ No newline at end of file diff --git a/agents/ten_packages/extension/fashionai/src/__init__.py b/agents/ten_packages/extension/fashionai/src/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/agents/ten_packages/extension/fashionai/src/addon.py b/agents/ten_packages/extension/fashionai/src/addon.py new file mode 100644 index 00000000..c0378773 --- /dev/null +++ b/agents/ten_packages/extension/fashionai/src/addon.py @@ -0,0 +1,22 @@ +# +# +# Agora Real Time Engagement +# Created by Wei Hu in 2024-08. +# Copyright (c) 2024 Agora IO. All rights reserved. +# +# +from ten import ( + Addon, + register_addon_as_extension, + TenEnv, +) +from .extension import FashionAIExtension +from .log import logger + + +@register_addon_as_extension("fashionai") +class FashionAIExtensionAddon(Addon): + + def on_create_instance(self, ten_env: TenEnv, name: str, context) -> None: + logger.info("FashionAIExtensionAddon on_create_instance") + ten_env.on_create_instance_done(FashionAIExtension(name), context) diff --git a/agents/ten_packages/extension/fashionai/src/extension.py b/agents/ten_packages/extension/fashionai/src/extension.py new file mode 100644 index 00000000..e629f798 --- /dev/null +++ b/agents/ten_packages/extension/fashionai/src/extension.py @@ -0,0 +1,161 @@ +# +# +# Agora Real Time Engagement +# Created by Wei Hu in 2024-08. +# Copyright (c) 2024 Agora IO. All rights reserved. +# +# +from ten import ( + AudioFrame, + VideoFrame, + Extension, + TenEnv, + Cmd, + StatusCode, + CmdResult, + Data, +) + +from .log import logger +import asyncio +from .fashionai_client import FashionAIClient +import threading +from datetime import datetime + +class FashionAIExtension(Extension): + app_id = "" + token = "" + channel = "" + stream_id = 0 + service_id = "agora" + + def on_init(self, ten_env: TenEnv) -> None: + logger.info("FASHION_AI on_init *********************************************************") + self.stopped = False + self.queue = asyncio.Queue(maxsize=3000) + self.threadWebsocketLoop = None + + ten_env.on_init_done() + + def on_start(self, ten_env: TenEnv) -> None: + logger.info("FASHION_AI on_start *********************************************************") + + # TODO: read properties, initialize resources + try: + self.app_id = ten_env.get_property_string("app_id") + self.token = ten_env.get_property_string("token") + self.channel = ten_env.get_property_string("channel") + self.stream_id = str(ten_env.get_property_int("stream_id")) + self.service_id = ten_env.get_property_string("service_id") + + logger.info(f"FASHION_AI on_start: app_id = {self.app_id}, token = {self.token}, channel = {self.channel}, stream_id = {self.stream_id}, service_id = {self.service_id}") + except Exception as e: + logger.warning(f"get_property err: {e}") + + if len(self.token) > 0: + self.app_id = self.token + self.client = FashionAIClient("wss://ingress.service.fasionai.com/websocket/node7/server1", self.service_id) + + def thread_target(): + self.threadWebsocketLoop = asyncio.new_event_loop() + asyncio.set_event_loop(self.threadWebsocketLoop) + self.threadWebsocketLoop.run_until_complete(self.init_fashionai(self.app_id, self.channel, self.stream_id)) + + self.threadWebsocket = threading.Thread(target=thread_target) + self.threadWebsocket.start() + + ten_env.on_start_done() + + def on_stop(self, ten_env: TenEnv) -> None: + logger.info("FASHION_AI on_stop") + self.stopped = True + asyncio.run_coroutine_threadsafe(self.queue.put(None), self.threadWebsocketLoop) + asyncio.run_coroutine_threadsafe(self.flush(), self.threadWebsocketLoop) + + self.threadWebsocket.join() + + ten_env.on_stop_done() + + def on_deinit(self, ten_env: TenEnv) -> None: + logger.info("FASHION_AI on_deinit") + ten_env.on_deinit_done() + + def on_cmd(self, ten_env: TenEnv, cmd: Cmd) -> None: + cmd_name = cmd.get_name() + logger.info("FASHION_AI on_cmd name {}".format(cmd_name)) + + # TODO: process cmd + if cmd_name == "flush": + self.outdate_ts = datetime.now() + try: + asyncio.run_coroutine_threadsafe( + self.flush(), self.threadWebsocketLoop + ).result(timeout=0.1) + except Exception as e: + ten_env.return_result(CmdResult.create(StatusCode.ERROR), cmd) + + cmd_out = Cmd.create("flush") + ten_env.send_cmd(cmd_out, lambda ten, result: logger.info("send_cmd flush done")) + else: + logger.info("unknown cmd {}".format(cmd_name)) + + cmd_result = CmdResult.create(StatusCode.OK) + ten_env.return_result(cmd_result, cmd) + + def on_data(self, ten_env: TenEnv, data: Data) -> None: + # TODO: process data + inputText = data.get_property_string("text") + if len(inputText) == 0: + logger.info("FASHION_AI ignore empty text") + return + + logger.info("FASHION_AI on data %s", inputText) + try: + future = asyncio.run_coroutine_threadsafe( + self.queue.put(inputText), self.threadWebsocketLoop + ) + future.result(timeout=0.1) + except asyncio.TimeoutError: + logger.warning(f"FASHION_AI put inputText={inputText} queue timed out") + except Exception as e: + logger.warning(f"FASHION_AI put inputText={inputText} queue err: {e}") + logger.info("FASHION_AI send_inputText %s", inputText) + + pass + + def on_audio_frame(self, ten_env: TenEnv, audio_frame: AudioFrame) -> None: + # TODO: process pcm frame + pass + + def on_video_frame(self, ten_env: TenEnv, video_frame: VideoFrame) -> None: + # TODO: process image frame + pass + + async def init_fashionai(self, app_id, channel, stream_id): + await self.client.connect() + await self.client.stream_start(app_id, channel, stream_id) + await self.client.render_start() + await self.async_polly_handler() + + async def async_polly_handler(self): + while True: + inputText = await self.queue.get() + if inputText is None: + logger.info("Stopping async_polly_handler...") + break + + logger.info(f"async_polly_handler: loop fashion ai polly.{inputText}") + + if len(inputText) > 0: + try: + await self.client.send_inputText(inputText) + except Exception as e: + logger.exception(e) + + async def flush(self): + logger.info("FASHION_AI flush") + while not self.queue.empty(): + value = await self.queue.get() + if value is None: + break + logger.info(f"Flushing value: {value}") \ No newline at end of file diff --git a/agents/ten_packages/extension/fashionai/src/fashionai_client.py b/agents/ten_packages/extension/fashionai/src/fashionai_client.py new file mode 100644 index 00000000..c6591600 --- /dev/null +++ b/agents/ten_packages/extension/fashionai/src/fashionai_client.py @@ -0,0 +1,86 @@ +import json +import ssl +import uuid + +import websockets +import asyncio + +from .log import logger + + +class FashionAIClient: + def __init__(self, uri, service_id): + self.uri = uri + self.websocket = None + self.service_id = service_id + + async def connect(self): + ssl_context = ssl._create_unverified_context() + self.websocket = await websockets.connect(self.uri, ssl=ssl_context) + + async def stream_start(self, app_id, channel, stream_id): + await self.send_message( + { + "request_id": str(uuid.uuid4()), + "service_id": self.service_id, + "token": app_id, + "channel_id": channel, + "user_id": stream_id, + "signal": "STREAM_START", + } + ) + + async def render_start(self): + await self.send_message( + { + "request_id": str(uuid.uuid4()), + "service_id": self.service_id, + "signal": "RENDER_START", + } + ) + + async def send_inputText(self, inputText): + await self.send_message( + { + "request_id": str(uuid.uuid4()), + "service_id": self.service_id, + "signal": "RENDER_CONTENT", + "text": inputText, + } + ) + + + async def send_message(self, message): + if self.websocket is not None: + try: + await self.websocket.send(json.dumps(message)) + logger.info(f"FASHION_AI Sent: {message}") + response = await asyncio.wait_for(self.websocket.recv(), timeout=2) + logger.info(f"FASHION_AI Received: {response}") + except websockets.exceptions.ConnectionClosedError as e: + logger.info(f"FASHION_AI Connection closed with error: {e}") + await self.reconnect() + except asyncio.TimeoutError: + logger.info("FASHION_AI Timeout waiting for response") + else: + logger.info("FASHION_AI WebSocket is not connected.") + + async def close(self): + if self.websocket is not None: + await self.websocket.close() + logger.info("FASHION_AI WebSocket connection closed.") + else: + logger.info("FASHION_AI WebSocket is not connected.") + + async def reconnect(self): + logger.info("FASHION_AI Reconnecting...") + await self.close() + await self.connect() + + async def heartbeat(self, interval): + while True: + await asyncio.sleep(interval) + try: + await self.send_inputText("ping") + except websockets.exceptions.ConnectionClosedError: + break \ No newline at end of file diff --git a/agents/ten_packages/extension/fashionai/src/log.py b/agents/ten_packages/extension/fashionai/src/log.py new file mode 100644 index 00000000..84b4ca42 --- /dev/null +++ b/agents/ten_packages/extension/fashionai/src/log.py @@ -0,0 +1,22 @@ +# +# +# Agora Real Time Engagement +# Created by Wei Hu in 2024-08. +# Copyright (c) 2024 Agora IO. All rights reserved. +# +# +import logging + +logger = logging.getLogger("fashionai") +logger.setLevel(logging.INFO) + +formatter_str = ( + "%(asctime)s - %(name)s - %(levelname)s - %(process)d - " + "[%(filename)s:%(lineno)d] - %(message)s" +) +formatter = logging.Formatter(formatter_str) + +console_handler = logging.StreamHandler() +console_handler.setFormatter(formatter) + +logger.addHandler(console_handler)