From 3d1c299f59c212458737b64d1a0d1dbbc212d7d0 Mon Sep 17 00:00:00 2001 From: caitengwen Date: Wed, 9 Oct 2024 11:28:31 +0800 Subject: [PATCH 01/15] add fashion ai --- .../ten_packages/extension/fashionai/BUILD.gn | 22 ++ .../extension/fashionai/README.md | 29 +++ .../extension/fashionai/__init__.py | 11 + .../extension/fashionai/manifest.json | 60 +++++ .../extension/fashionai/property.json | 1 + .../extension/fashionai/requirements.txt | 1 + .../extension/fashionai/src/__init__.py | 0 .../extension/fashionai/src/addon.py | 22 ++ .../extension/fashionai/src/extension.py | 240 ++++++++++++++++++ .../extension/fashionai/src/log.py | 22 ++ 10 files changed, 408 insertions(+) create mode 100644 agents/ten_packages/extension/fashionai/BUILD.gn create mode 100644 agents/ten_packages/extension/fashionai/README.md create mode 100644 agents/ten_packages/extension/fashionai/__init__.py create mode 100644 agents/ten_packages/extension/fashionai/manifest.json create mode 100644 agents/ten_packages/extension/fashionai/property.json create mode 100644 agents/ten_packages/extension/fashionai/requirements.txt create mode 100644 agents/ten_packages/extension/fashionai/src/__init__.py create mode 100644 agents/ten_packages/extension/fashionai/src/addon.py create mode 100644 agents/ten_packages/extension/fashionai/src/extension.py create mode 100644 agents/ten_packages/extension/fashionai/src/log.py diff --git a/agents/ten_packages/extension/fashionai/BUILD.gn b/agents/ten_packages/extension/fashionai/BUILD.gn new file mode 100644 index 00000000..6a6765bb --- /dev/null +++ b/agents/ten_packages/extension/fashionai/BUILD.gn @@ -0,0 +1,22 @@ +# +# +# Agora Real Time Engagement +# Created by Wei Hu in 2022-11. +# Copyright (c) 2024 Agora IO. All rights reserved. +# +# +import("//build/feature/ten_package.gni") + +ten_package("fashionai") { + package_kind = "extension" + + resources = [ + "__init__.py", + "manifest.json", + "property.json", + "src/__init__.py", + "src/addon.py", + "src/extension.py", + "src/log.py", + ] +} diff --git a/agents/ten_packages/extension/fashionai/README.md b/agents/ten_packages/extension/fashionai/README.md new file mode 100644 index 00000000..5af79bac --- /dev/null +++ b/agents/ten_packages/extension/fashionai/README.md @@ -0,0 +1,29 @@ +# fashionai + + + +## Features + + + +- xxx feature + +## API + +Refer to `api` definition in [manifest.json] and default values in [property.json](property.json). + + + +## Development + +### Build + + + +### Unit test + + + +## Misc + + diff --git a/agents/ten_packages/extension/fashionai/__init__.py b/agents/ten_packages/extension/fashionai/__init__.py new file mode 100644 index 00000000..f06f1641 --- /dev/null +++ b/agents/ten_packages/extension/fashionai/__init__.py @@ -0,0 +1,11 @@ +# +# +# Agora Real Time Engagement +# Created by Wei Hu in 2024-08. +# Copyright (c) 2024 Agora IO. All rights reserved. +# +# +from .src import addon +from .src.log import logger + +logger.info("fashionai extension loaded") diff --git a/agents/ten_packages/extension/fashionai/manifest.json b/agents/ten_packages/extension/fashionai/manifest.json new file mode 100644 index 00000000..8c677ed4 --- /dev/null +++ b/agents/ten_packages/extension/fashionai/manifest.json @@ -0,0 +1,60 @@ +{ + "type": "extension", + "name": "fashionai", + "version": "0.1.0", + "dependencies": [ + { + "type": "system", + "name": "ten_runtime_python", + "version": "0.1.0" + } + ], + "package": { + "include": [ + "manifest.json", + "property.json", + "BUILD.gn", + "**.tent", + "**.py", + "src/**.tent", + "src/**.py", + "README.md" + ] + }, + "api": { + "property": { + "app_id": { + "type": "string" + }, + "token": { + "type": "string" + }, + "channel": { + "type": "string" + }, + "stream_id": { + "type": "uint32" + } + }, + "data_in": [ + { + "name": "text_data", + "property": { + "text": { + "type": "string" + } + } + } + ], + "cmd_in": [ + { + "name": "flush" + } + ], + "cmd_out": [ + { + "name": "flush" + } + ] + } +} \ No newline at end of file diff --git a/agents/ten_packages/extension/fashionai/property.json b/agents/ten_packages/extension/fashionai/property.json new file mode 100644 index 00000000..9e26dfee --- /dev/null +++ b/agents/ten_packages/extension/fashionai/property.json @@ -0,0 +1 @@ +{} \ No newline at end of file diff --git a/agents/ten_packages/extension/fashionai/requirements.txt b/agents/ten_packages/extension/fashionai/requirements.txt new file mode 100644 index 00000000..7a389117 --- /dev/null +++ b/agents/ten_packages/extension/fashionai/requirements.txt @@ -0,0 +1 @@ +websockets \ No newline at end of file diff --git a/agents/ten_packages/extension/fashionai/src/__init__.py b/agents/ten_packages/extension/fashionai/src/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/agents/ten_packages/extension/fashionai/src/addon.py b/agents/ten_packages/extension/fashionai/src/addon.py new file mode 100644 index 00000000..c0378773 --- /dev/null +++ b/agents/ten_packages/extension/fashionai/src/addon.py @@ -0,0 +1,22 @@ +# +# +# Agora Real Time Engagement +# Created by Wei Hu in 2024-08. +# Copyright (c) 2024 Agora IO. All rights reserved. +# +# +from ten import ( + Addon, + register_addon_as_extension, + TenEnv, +) +from .extension import FashionAIExtension +from .log import logger + + +@register_addon_as_extension("fashionai") +class FashionAIExtensionAddon(Addon): + + def on_create_instance(self, ten_env: TenEnv, name: str, context) -> None: + logger.info("FashionAIExtensionAddon on_create_instance") + ten_env.on_create_instance_done(FashionAIExtension(name), context) diff --git a/agents/ten_packages/extension/fashionai/src/extension.py b/agents/ten_packages/extension/fashionai/src/extension.py new file mode 100644 index 00000000..1fed3c3e --- /dev/null +++ b/agents/ten_packages/extension/fashionai/src/extension.py @@ -0,0 +1,240 @@ +# +# +# Agora Real Time Engagement +# Created by Wei Hu in 2024-08. +# Copyright (c) 2024 Agora IO. All rights reserved. +# +# +from ten import ( + AudioFrame, + VideoFrame, + Extension, + TenEnv, + Cmd, + StatusCode, + CmdResult, + Data, +) +from .log import logger +import asyncio +import json +import ssl +import queue + +import websockets +import threading +from datetime import datetime +import time + + +APP_ID = "25358c4d6ed54fbaaecfd9ffe8d45058" +CHANNEL = "astra_agents_test" +STREAM_ID = "0" +TOKEN = "" + +class FashionAIExtension(Extension): + + + def on_init(self, ten_env: TenEnv) -> None: + logger.info("FASHION_AI on_init *********************************************************") + self.stopped = False + self.queue = queue.Queue() + self.client = WebSocketClient("wss://ingress.service.fasionai.com/websocket/node7/server1") + + ten_env.on_init_done() + + def on_start(self, ten_env: TenEnv) -> None: + logger.info("FASHION_AI on_start *********************************************************") + + # TODO: read properties, initialize resources + APP_ID = ten_env.get_property_string("app_id") + TOKEN = ten_env.get_property_string("token") + CHANNEL = ten_env.get_property_string("channel") + STREAM_ID = str(ten_env.get_property_int("stream_id")) + if len(TOKEN) > 0 : + APP_ID = TOKEN + logger.info(f"FASHION_AI on_start: app_id = {APP_ID}, token = {TOKEN}, channel = {CHANNEL}, stream_id = {STREAM_ID}") + + new_loop = asyncio.new_event_loop() + + self.threadWebsocket = threading.Thread(target=self.thread_target, args=(new_loop,ten_env,APP_ID ,CHANNEL,STREAM_ID)) + self.threadWebsocket.start() + + threading.Thread(target=self.async_polly_handler, args=[ten_env]).start() + + ten_env.on_start_done() + + def on_stop(self, ten_env: TenEnv) -> None: + logger.info("FASHION_AI on_stop") + self.stopped = True + self.queue.put(None) + self.flush() + self.thread.join() + + # TODO: clean up resources + + ten_env.on_stop_done() + + def on_deinit(self, ten_env: TenEnv) -> None: + logger.info("FASHION_AI on_deinit") + ten_env.on_deinit_done() + + def on_cmd(self, ten_env: TenEnv, cmd: Cmd) -> None: + cmd_name = cmd.get_name() + logger.info("FASHION_AI on_cmd name {}".format(cmd_name)) + + # TODO: process cmd + if cmd_name == "flush": + self.outdate_ts = datetime.now() + self.flush() + cmd_out = Cmd.create("flush") + ten_env.send_cmd(cmd_out, lambda ten, result: logger.info("send_cmd flush done")) + else: + logger.info("unknown cmd {}".format(cmd_name)) + + cmd_result = CmdResult.create(StatusCode.OK) + ten_env.return_result(cmd_result, cmd) + + def on_data(self, ten_env: TenEnv, data: Data) -> None: + # TODO: process data + inputText = data.get_property_string("text") + if len(inputText) == 0: + logger.info("FASHION_AI ignore empty text") + return + + logger.info("FASHION_AI on data %s", inputText) + self.queue.put((inputText, datetime.now())) + # asyncio.get_event_loop().run_until_complete(self.client.send_inputText(inputText)) + logger.info("FASHION_AI send_inputText %s", inputText) + + # self.queue.put((inputText, datetime.now())) + + pass + + def on_audio_frame(self, ten_env: TenEnv, audio_frame: AudioFrame) -> None: + # TODO: process pcm frame + pass + + def on_video_frame(self, ten_env: TenEnv, video_frame: VideoFrame) -> None: + # TODO: process image frame + pass + + async def init_fashionai(self, ten_env: TenEnv, app_id, channel, stream_id): + await self.client.connect() + await self.client.stream_start(app_id, channel, stream_id) + # asyncio.create_task(self.client.heartbeat(10)) + await self.client.render_start() + while True: + await self.async_polly_handler() + + def thread_target(self, loop, ten_env: TenEnv, app_id, channel, stream_id): + asyncio.set_event_loop(loop) + loop.run_until_complete(self.init_fashionai(ten_env, app_id, channel, stream_id)) + + + async def async_polly_handler(self): + value = self.queue.get() + logger.info(f"async_polly_handler: loop fashion ai polly.{value}") + + inputText, ts = value + if len(inputText) > 0: + try: + await self.client.send_inputText(inputText) + except Exception as e: + logger.exception(e) + + def flush(self): + logger.info("FASHION_AI flush") + while not self.queue.empty(): + self.queue.get() + + +class WebSocketClient: + def __init__(self, uri): + self.uri = uri + self.websocket = None + + async def connect(self): + ssl_context = ssl._create_unverified_context() + self.websocket = await websockets.connect(self.uri, ssl=ssl_context) + + async def stream_start(self, app_id, channel, stream_id): + await self.send_message( + { + "request_id": "1", + "service_id": "agora", + "token": app_id, + "channel_id": channel, + "user_id": stream_id, + "signal": "STREAM_START", + } + ) + + async def render_start(self): + await self.send_message( + { + "request_id": "3", + "service_id": "agora", + "signal": "RENDER_START", + } + ) + + async def send_inputText(self, inputText): + await self.send_message( + { + "request_id": "4", + "service_id": "agora", + "signal": "RENDER_CONTENT", + "text": inputText, + } + ) + + + async def send_message(self, message): + if self.websocket is not None: + try: + await self.websocket.send(json.dumps(message)) + logger.info(f"FASHION_AI Sent: {message}") + response = await asyncio.wait_for(self.websocket.recv(), timeout=2) + logger.info(f"FASHION_AI Received: {response}") + except websockets.exceptions.ConnectionClosedError as e: + logger.info(f"FASHION_AI Connection closed with error: {e}") + await self.reconnect() + except asyncio.TimeoutError: + logger.info("FASHION_AI Timeout waiting for response") + else: + logger.info("FASHION_AI WebSocket is not connected.") + + async def receive_message(self): + if self.websocket is not None: + try: + response = await self.websocket.recv() + logger.info(f"FASHION_AI Received: {response}") + return response + except websockets.exceptions.ConnectionClosedError as e: + logger.info(f"FASHION_AI Connection closed with error: {e}") + await self.reconnect() + return None + else: + logger.info("FASHION_AI WebSocket is not connected.") + return None + + async def close(self): + if self.websocket is not None: + await self.websocket.close() + logger.info("FASHION_AI WebSocket connection closed.") + else: + logger.info("FASHION_AI WebSocket is not connected.") + + async def reconnect(self): + logger.info("FASHION_AI Reconnecting...") + await self.close() + await self.connect() + + async def heartbeat(self, interval): + while True: + await asyncio.sleep(interval) + try: + await self.send_inputText("ping") + except websockets.exceptions.ConnectionClosedError: + break \ No newline at end of file diff --git a/agents/ten_packages/extension/fashionai/src/log.py b/agents/ten_packages/extension/fashionai/src/log.py new file mode 100644 index 00000000..84b4ca42 --- /dev/null +++ b/agents/ten_packages/extension/fashionai/src/log.py @@ -0,0 +1,22 @@ +# +# +# Agora Real Time Engagement +# Created by Wei Hu in 2024-08. +# Copyright (c) 2024 Agora IO. All rights reserved. +# +# +import logging + +logger = logging.getLogger("fashionai") +logger.setLevel(logging.INFO) + +formatter_str = ( + "%(asctime)s - %(name)s - %(levelname)s - %(process)d - " + "[%(filename)s:%(lineno)d] - %(message)s" +) +formatter = logging.Formatter(formatter_str) + +console_handler = logging.StreamHandler() +console_handler.setFormatter(formatter) + +logger.addHandler(console_handler) From 210cdb41a429e9abb1ec22eff68e14dc8265c35e Mon Sep 17 00:00:00 2001 From: caitengwen Date: Wed, 9 Oct 2024 20:50:21 +0800 Subject: [PATCH 02/15] Update extension.py remove appId --- agents/ten_packages/extension/fashionai/src/extension.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/agents/ten_packages/extension/fashionai/src/extension.py b/agents/ten_packages/extension/fashionai/src/extension.py index 1fed3c3e..7cb5de0a 100644 --- a/agents/ten_packages/extension/fashionai/src/extension.py +++ b/agents/ten_packages/extension/fashionai/src/extension.py @@ -27,8 +27,8 @@ import time -APP_ID = "25358c4d6ed54fbaaecfd9ffe8d45058" -CHANNEL = "astra_agents_test" +APP_ID = "" +CHANNEL = "" STREAM_ID = "0" TOKEN = "" @@ -237,4 +237,4 @@ async def heartbeat(self, interval): try: await self.send_inputText("ping") except websockets.exceptions.ConnectionClosedError: - break \ No newline at end of file + break From cd8e37d2bcaf0af577b187fe4d6ff6e13908f7e6 Mon Sep 17 00:00:00 2001 From: caitengwen Date: Fri, 11 Oct 2024 17:36:01 +0800 Subject: [PATCH 03/15] Update extension.py --- .../extension/fashionai/src/extension.py | 74 +++++++------------ 1 file changed, 28 insertions(+), 46 deletions(-) diff --git a/agents/ten_packages/extension/fashionai/src/extension.py b/agents/ten_packages/extension/fashionai/src/extension.py index 7cb5de0a..778aa202 100644 --- a/agents/ten_packages/extension/fashionai/src/extension.py +++ b/agents/ten_packages/extension/fashionai/src/extension.py @@ -24,22 +24,19 @@ import websockets import threading from datetime import datetime -import time - - -APP_ID = "" -CHANNEL = "" -STREAM_ID = "0" -TOKEN = "" class FashionAIExtension(Extension): def on_init(self, ten_env: TenEnv) -> None: logger.info("FASHION_AI on_init *********************************************************") + self.app_id = "" + self.token = "" + self.channel = "" + self.stream_id = "" self.stopped = False - self.queue = queue.Queue() - self.client = WebSocketClient("wss://ingress.service.fasionai.com/websocket/node7/server1") + self.queue = asyncio.Queue() + self.client = FashionAIClient("wss://ingress.service.fasionai.com/websocket/node7/server1") ten_env.on_init_done() @@ -47,29 +44,31 @@ def on_start(self, ten_env: TenEnv) -> None: logger.info("FASHION_AI on_start *********************************************************") # TODO: read properties, initialize resources - APP_ID = ten_env.get_property_string("app_id") - TOKEN = ten_env.get_property_string("token") - CHANNEL = ten_env.get_property_string("channel") - STREAM_ID = str(ten_env.get_property_int("stream_id")) - if len(TOKEN) > 0 : - APP_ID = TOKEN - logger.info(f"FASHION_AI on_start: app_id = {APP_ID}, token = {TOKEN}, channel = {CHANNEL}, stream_id = {STREAM_ID}") + self.app_id = ten_env.get_property_string("app_id") + self.token = ten_env.get_property_string("token") + self.channel = ten_env.get_property_string("channel") + self.stream_id = str(ten_env.get_property_int("stream_id")) + if len(self.token) > 0 : + self.app_id = self.token + logger.info(f"FASHION_AI on_start: app_id = {self.app_id}, token = {self.token}, channel = {self.channel}, stream_id = {self.stream_id}") - new_loop = asyncio.new_event_loop() + loop = asyncio.new_event_loop() - self.threadWebsocket = threading.Thread(target=self.thread_target, args=(new_loop,ten_env,APP_ID ,CHANNEL,STREAM_ID)) - self.threadWebsocket.start() + def thread_target(): + asyncio.set_event_loop(loop) + loop.run_until_complete(self.init_fashionai(ten_env, self.app_id, self.channel, self.stream_id)) - threading.Thread(target=self.async_polly_handler, args=[ten_env]).start() + self.threadWebsocket = threading.Thread(target=thread_target) + self.threadWebsocket.start() ten_env.on_start_done() def on_stop(self, ten_env: TenEnv) -> None: logger.info("FASHION_AI on_stop") self.stopped = True - self.queue.put(None) + asyncio.run(self.queue.put(None)) self.flush() - self.thread.join() + self.threadWebsocket.join() # TODO: clean up resources @@ -86,7 +85,7 @@ def on_cmd(self, ten_env: TenEnv, cmd: Cmd) -> None: # TODO: process cmd if cmd_name == "flush": self.outdate_ts = datetime.now() - self.flush() + asyncio.run(self.flush()) cmd_out = Cmd.create("flush") ten_env.send_cmd(cmd_out, lambda ten, result: logger.info("send_cmd flush done")) else: @@ -103,7 +102,7 @@ def on_data(self, ten_env: TenEnv, data: Data) -> None: return logger.info("FASHION_AI on data %s", inputText) - self.queue.put((inputText, datetime.now())) + asyncio.run(self.queue.put((inputText, datetime.now()))) # asyncio.get_event_loop().run_until_complete(self.client.send_inputText(inputText)) logger.info("FASHION_AI send_inputText %s", inputText) @@ -127,13 +126,9 @@ async def init_fashionai(self, ten_env: TenEnv, app_id, channel, stream_id): while True: await self.async_polly_handler() - def thread_target(self, loop, ten_env: TenEnv, app_id, channel, stream_id): - asyncio.set_event_loop(loop) - loop.run_until_complete(self.init_fashionai(ten_env, app_id, channel, stream_id)) - async def async_polly_handler(self): - value = self.queue.get() + value = await self.queue.get() # 从队列中获取值 logger.info(f"async_polly_handler: loop fashion ai polly.{value}") inputText, ts = value @@ -143,13 +138,14 @@ async def async_polly_handler(self): except Exception as e: logger.exception(e) - def flush(self): + async def flush(self): logger.info("FASHION_AI flush") while not self.queue.empty(): - self.queue.get() + value = await self.queue.get() + logger.info(f"Flushing value: {value}") -class WebSocketClient: +class FashionAIClient: def __init__(self, uri): self.uri = uri self.websocket = None @@ -205,20 +201,6 @@ async def send_message(self, message): else: logger.info("FASHION_AI WebSocket is not connected.") - async def receive_message(self): - if self.websocket is not None: - try: - response = await self.websocket.recv() - logger.info(f"FASHION_AI Received: {response}") - return response - except websockets.exceptions.ConnectionClosedError as e: - logger.info(f"FASHION_AI Connection closed with error: {e}") - await self.reconnect() - return None - else: - logger.info("FASHION_AI WebSocket is not connected.") - return None - async def close(self): if self.websocket is not None: await self.websocket.close() From 428da407e6df6162fbbfc6e9224b603ab803a5a6 Mon Sep 17 00:00:00 2001 From: caitengwen Date: Sat, 12 Oct 2024 11:43:13 +0800 Subject: [PATCH 04/15] Update extension.py 1. change asyncio.run to asyncio.run_coroutine_threadsafe 2. maybe put wihle true in async_polly_handler --- .../extension/fashionai/src/extension.py | 18 +++++++++++------- 1 file changed, 11 insertions(+), 7 deletions(-) diff --git a/agents/ten_packages/extension/fashionai/src/extension.py b/agents/ten_packages/extension/fashionai/src/extension.py index 778aa202..5132466c 100644 --- a/agents/ten_packages/extension/fashionai/src/extension.py +++ b/agents/ten_packages/extension/fashionai/src/extension.py @@ -35,8 +35,10 @@ def on_init(self, ten_env: TenEnv) -> None: self.channel = "" self.stream_id = "" self.stopped = False - self.queue = asyncio.Queue() + self.queue = asyncio.Queue(maxsize=3000) self.client = FashionAIClient("wss://ingress.service.fasionai.com/websocket/node7/server1") + self.loop = asyncio.new_event_loop() + asyncio.set_event_loop(self.loop) ten_env.on_init_done() @@ -52,11 +54,8 @@ def on_start(self, ten_env: TenEnv) -> None: self.app_id = self.token logger.info(f"FASHION_AI on_start: app_id = {self.app_id}, token = {self.token}, channel = {self.channel}, stream_id = {self.stream_id}") - loop = asyncio.new_event_loop() - def thread_target(): - asyncio.set_event_loop(loop) - loop.run_until_complete(self.init_fashionai(ten_env, self.app_id, self.channel, self.stream_id)) + self.loop.run_until_complete(self.init_fashionai(ten_env, self.app_id, self.channel, self.stream_id)) self.threadWebsocket = threading.Thread(target=thread_target) self.threadWebsocket.start() @@ -85,7 +84,9 @@ def on_cmd(self, ten_env: TenEnv, cmd: Cmd) -> None: # TODO: process cmd if cmd_name == "flush": self.outdate_ts = datetime.now() - asyncio.run(self.flush()) + asyncio.run_coroutine_threadsafe( + self.flush(), self.loop + ).result(timeout=0.1) cmd_out = Cmd.create("flush") ten_env.send_cmd(cmd_out, lambda ten, result: logger.info("send_cmd flush done")) else: @@ -102,7 +103,10 @@ def on_data(self, ten_env: TenEnv, data: Data) -> None: return logger.info("FASHION_AI on data %s", inputText) - asyncio.run(self.queue.put((inputText, datetime.now()))) + asyncio.run_coroutine_threadsafe( + self.queue.put((inputText, datetime.now())), self.loop + ).result(timeout=0.1) + # asyncio.get_event_loop().run_until_complete(self.client.send_inputText(inputText)) logger.info("FASHION_AI send_inputText %s", inputText) From ed9fdd2c966915df2ffe62db81c8060a50f7fb5c Mon Sep 17 00:00:00 2001 From: caitengwen Date: Sat, 12 Oct 2024 11:47:16 +0800 Subject: [PATCH 05/15] Update extension.py --- .../extension/fashionai/src/extension.py | 22 +++++++++---------- 1 file changed, 11 insertions(+), 11 deletions(-) diff --git a/agents/ten_packages/extension/fashionai/src/extension.py b/agents/ten_packages/extension/fashionai/src/extension.py index 5132466c..a8dc2ec7 100644 --- a/agents/ten_packages/extension/fashionai/src/extension.py +++ b/agents/ten_packages/extension/fashionai/src/extension.py @@ -127,20 +127,20 @@ async def init_fashionai(self, ten_env: TenEnv, app_id, channel, stream_id): await self.client.stream_start(app_id, channel, stream_id) # asyncio.create_task(self.client.heartbeat(10)) await self.client.render_start() - while True: - await self.async_polly_handler() + await self.async_polly_handler() async def async_polly_handler(self): - value = await self.queue.get() # 从队列中获取值 - logger.info(f"async_polly_handler: loop fashion ai polly.{value}") - - inputText, ts = value - if len(inputText) > 0: - try: - await self.client.send_inputText(inputText) - except Exception as e: - logger.exception(e) + while True: + value = await self.queue.get() # 从队列中获取值 + logger.info(f"async_polly_handler: loop fashion ai polly.{value}") + + inputText, ts = value + if len(inputText) > 0: + try: + await self.client.send_inputText(inputText) + except Exception as e: + logger.exception(e) async def flush(self): logger.info("FASHION_AI flush") From 13acc227fbbeeada89ed83348392000fca94f408 Mon Sep 17 00:00:00 2001 From: caitengwen Date: Sat, 12 Oct 2024 12:31:52 +0800 Subject: [PATCH 06/15] Update extension.py change method name to create_task --- agents/ten_packages/extension/fashionai/src/extension.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/agents/ten_packages/extension/fashionai/src/extension.py b/agents/ten_packages/extension/fashionai/src/extension.py index a8dc2ec7..f5cf1b7a 100644 --- a/agents/ten_packages/extension/fashionai/src/extension.py +++ b/agents/ten_packages/extension/fashionai/src/extension.py @@ -127,10 +127,10 @@ async def init_fashionai(self, ten_env: TenEnv, app_id, channel, stream_id): await self.client.stream_start(app_id, channel, stream_id) # asyncio.create_task(self.client.heartbeat(10)) await self.client.render_start() - await self.async_polly_handler() + await self.create_task() - async def async_polly_handler(self): + async def create_task(self): while True: value = await self.queue.get() # 从队列中获取值 logger.info(f"async_polly_handler: loop fashion ai polly.{value}") From d956cc1565143ff46deb8230c94d8139fbd446b0 Mon Sep 17 00:00:00 2001 From: caitengwen Date: Sat, 12 Oct 2024 16:24:29 +0800 Subject: [PATCH 07/15] Update property.json add fashionai graph --- agents/property.json | 219 ++++++++++++++++++++++++++++++++++++++++++- 1 file changed, 218 insertions(+), 1 deletion(-) diff --git a/agents/property.json b/agents/property.json index 68888fb1..f9bc2823 100644 --- a/agents/property.json +++ b/agents/property.json @@ -2,6 +2,223 @@ "_ten": { "log_level": 3, "predefined_graphs": [ + { + "auto_start": true, + "connections": [ + { + "app": "localhost", + "data": [ + { + "dest": [ + { + "app": "localhost", + "extension": "interrupt_detector", + "extension_group": "default" + }, + { + "app": "localhost", + "extension": "openai_chatgpt", + "extension_group": "chatgpt" + }, + { + "app": "localhost", + "extension": "message_collector", + "extension_group": "transcriber" + } + ], + "name": "text_data" + } + ], + "extension": "agora_rtc", + "extension_group": "default" + }, + { + "app": "localhost", + "cmd": [ + { + "dest": [ + { + "app": "localhost", + "extension": "azure_tts", + "extension_group": "tts" + }, + { + "app": "localhost", + "extension": "fashionai", + "extension_group": "default" + } + ], + "name": "flush" + } + ], + "data": [ + { + "dest": [ + { + "app": "localhost", + "extension": "message_collector", + "extension_group": "transcriber" + }, + { + "app": "localhost", + "extension": "fashionai", + "extension_group": "default" + } + ], + "name": "text_data" + } + ], + "extension": "openai_chatgpt", + "extension_group": "chatgpt" + }, + { + "app": "localhost", + "audio_frame": [ + { + "dest": [ + { + "app": "localhost", + "extension": "agora_rtc", + "extension_group": "default" + } + ], + "name": "pcm_frame" + } + ], + "cmd": [ + { + "dest": [ + { + "app": "localhost", + "extension": "agora_rtc", + "extension_group": "default" + } + ], + "name": "flush" + } + ], + "extension": "azure_tts", + "extension_group": "tts" + }, + { + "app": "localhost", + "data": [ + { + "dest": [ + { + "app": "localhost", + "extension": "agora_rtc", + "extension_group": "default" + } + ], + "name": "data" + } + ], + "extension": "message_collector", + "extension_group": "transcriber" + }, + { + "app": "localhost", + "cmd": [ + { + "dest": [ + { + "app": "localhost", + "extension": "openai_chatgpt", + "extension_group": "chatgpt" + } + ], + "name": "flush" + } + ], + "extension": "interrupt_detector", + "extension_group": "default" + } + ], + "name": "va.openai.azure.fashionai", + "nodes": [ + { + "addon": "agora_rtc", + "app": "localhost", + "extension_group": "default", + "name": "agora_rtc", + "property": { + "agora_asr_language": "en-US", + "agora_asr_session_control_file_path": "session_control.conf", + "agora_asr_vendor_key": "$AZURE_STT_KEY", + "agora_asr_vendor_name": "microsoft", + "agora_asr_vendor_region": "$AZURE_STT_REGION", + "app_id": "$AGORA_APP_ID", + "channel": "astra_agents_test", + "enable_agora_asr": true, + "publish_audio": true, + "publish_data": true, + "remote_stream_id": 123, + "stream_id": 1234, + "subscribe_audio": true, + "token": "" + }, + "type": "extension" + }, + { + "addon": "interrupt_detector", + "app": "localhost", + "extension_group": "default", + "name": "interrupt_detector", + "type": "extension" + }, + { + "addon": "openai_chatgpt", + "app": "localhost", + "extension_group": "chatgpt", + "name": "openai_chatgpt", + "property": { + "api_key": "$OPENAI_API_KEY", + "base_url": "", + "frequency_penalty": 0.9, + "greeting": "TEN Agent connected. How can I help you today?", + "max_memory_length": 10, + "max_tokens": 512, + "model": "gpt-4o-mini", + "prompt": "", + "proxy_url": "$OPENAI_PROXY_URL" + }, + "type": "extension" + }, + { + "addon": "azure_tts", + "app": "localhost", + "extension_group": "tts", + "name": "azure_tts", + "property": { + "azure_subscription_key": "$AZURE_TTS_KEY", + "azure_subscription_region": "$AZURE_TTS_REGION", + "azure_synthesis_voice_name": "en-US-JaneNeural" + }, + "type": "extension" + }, + { + "addon": "message_collector", + "app": "localhost", + "extension_group": "transcriber", + "name": "message_collector", + "type": "extension" + }, + { + "addon": "fashionai", + "app": "localhost", + "extension_group": "default", + "name": "fashionai", + "property": { + "app_id": "$AGORA_APP_ID", + "channel": "astra_agents_test", + "stream_id": 12345, + "token": "" + }, + "type": "extension" + } + ] + }, { "name": "va.openai.azure", "auto_start": false, @@ -2308,4 +2525,4 @@ } ] } -} \ No newline at end of file +} From 5683faf488324c43b72505cebf8aa31ddd7eb4eb Mon Sep 17 00:00:00 2001 From: caitengwen Date: Sat, 12 Oct 2024 19:37:12 +0800 Subject: [PATCH 08/15] Update extension.py 1. separate FashionAIClient 2. add try catch to asyncio.run_coroutine_threadsafe when timeout 3. define attributes in class directly --- .../extension/fashionai/src/extension.py | 134 ++++-------------- 1 file changed, 31 insertions(+), 103 deletions(-) diff --git a/agents/ten_packages/extension/fashionai/src/extension.py b/agents/ten_packages/extension/fashionai/src/extension.py index f5cf1b7a..9af3cfe9 100644 --- a/agents/ten_packages/extension/fashionai/src/extension.py +++ b/agents/ten_packages/extension/fashionai/src/extension.py @@ -15,25 +15,21 @@ CmdResult, Data, ) + from .log import logger import asyncio -import json -import ssl -import queue - -import websockets +from .fashionai_client import FashionAIClient import threading from datetime import datetime class FashionAIExtension(Extension): - + app_id = "" + token = "" + channel = "" + stream_id = 0 def on_init(self, ten_env: TenEnv) -> None: logger.info("FASHION_AI on_init *********************************************************") - self.app_id = "" - self.token = "" - self.channel = "" - self.stream_id = "" self.stopped = False self.queue = asyncio.Queue(maxsize=3000) self.client = FashionAIClient("wss://ingress.service.fasionai.com/websocket/node7/server1") @@ -46,13 +42,16 @@ def on_start(self, ten_env: TenEnv) -> None: logger.info("FASHION_AI on_start *********************************************************") # TODO: read properties, initialize resources - self.app_id = ten_env.get_property_string("app_id") - self.token = ten_env.get_property_string("token") - self.channel = ten_env.get_property_string("channel") - self.stream_id = str(ten_env.get_property_int("stream_id")) - if len(self.token) > 0 : - self.app_id = self.token - logger.info(f"FASHION_AI on_start: app_id = {self.app_id}, token = {self.token}, channel = {self.channel}, stream_id = {self.stream_id}") + try: + self.app_id = ten_env.get_property_string("app_id") + self.token = ten_env.get_property_string("token") + self.channel = ten_env.get_property_string("channel") + self.stream_id = str(ten_env.get_property_int("stream_id")) + if len(self.token) > 0 : + self.app_id = self.token + logger.info(f"FASHION_AI on_start: app_id = {self.app_id}, token = {self.token}, channel = {self.channel}, stream_id = {self.stream_id}") + except Exception as e: + logger.warning(f"get_property err: {e}") def thread_target(): self.loop.run_until_complete(self.init_fashionai(ten_env, self.app_id, self.channel, self.stream_id)) @@ -84,9 +83,13 @@ def on_cmd(self, ten_env: TenEnv, cmd: Cmd) -> None: # TODO: process cmd if cmd_name == "flush": self.outdate_ts = datetime.now() - asyncio.run_coroutine_threadsafe( - self.flush(), self.loop - ).result(timeout=0.1) + try: + asyncio.run_coroutine_threadsafe( + self.flush(), self.loop + ).result(timeout=0.1) + except Exception as e: + ten_env.return_result(CmdResult.create(StatusCode.ERROR), cmd) + cmd_out = Cmd.create("flush") ten_env.send_cmd(cmd_out, lambda ten, result: logger.info("send_cmd flush done")) else: @@ -103,10 +106,12 @@ def on_data(self, ten_env: TenEnv, data: Data) -> None: return logger.info("FASHION_AI on data %s", inputText) - asyncio.run_coroutine_threadsafe( - self.queue.put((inputText, datetime.now())), self.loop - ).result(timeout=0.1) - + try: + asyncio.run_coroutine_threadsafe( + self.queue.put((inputText, datetime.now())), self.loop + ).result(timeout=0.1) + except Exception as e: + ten_env.return_result(CmdResult.create(StatusCode.ERROR)) # asyncio.get_event_loop().run_until_complete(self.client.send_inputText(inputText)) logger.info("FASHION_AI send_inputText %s", inputText) @@ -127,10 +132,10 @@ async def init_fashionai(self, ten_env: TenEnv, app_id, channel, stream_id): await self.client.stream_start(app_id, channel, stream_id) # asyncio.create_task(self.client.heartbeat(10)) await self.client.render_start() - await self.create_task() + await self.async_polly_handler() - async def create_task(self): + async def async_polly_handler(self): while True: value = await self.queue.get() # 从队列中获取值 logger.info(f"async_polly_handler: loop fashion ai polly.{value}") @@ -147,80 +152,3 @@ async def flush(self): while not self.queue.empty(): value = await self.queue.get() logger.info(f"Flushing value: {value}") - - -class FashionAIClient: - def __init__(self, uri): - self.uri = uri - self.websocket = None - - async def connect(self): - ssl_context = ssl._create_unverified_context() - self.websocket = await websockets.connect(self.uri, ssl=ssl_context) - - async def stream_start(self, app_id, channel, stream_id): - await self.send_message( - { - "request_id": "1", - "service_id": "agora", - "token": app_id, - "channel_id": channel, - "user_id": stream_id, - "signal": "STREAM_START", - } - ) - - async def render_start(self): - await self.send_message( - { - "request_id": "3", - "service_id": "agora", - "signal": "RENDER_START", - } - ) - - async def send_inputText(self, inputText): - await self.send_message( - { - "request_id": "4", - "service_id": "agora", - "signal": "RENDER_CONTENT", - "text": inputText, - } - ) - - - async def send_message(self, message): - if self.websocket is not None: - try: - await self.websocket.send(json.dumps(message)) - logger.info(f"FASHION_AI Sent: {message}") - response = await asyncio.wait_for(self.websocket.recv(), timeout=2) - logger.info(f"FASHION_AI Received: {response}") - except websockets.exceptions.ConnectionClosedError as e: - logger.info(f"FASHION_AI Connection closed with error: {e}") - await self.reconnect() - except asyncio.TimeoutError: - logger.info("FASHION_AI Timeout waiting for response") - else: - logger.info("FASHION_AI WebSocket is not connected.") - - async def close(self): - if self.websocket is not None: - await self.websocket.close() - logger.info("FASHION_AI WebSocket connection closed.") - else: - logger.info("FASHION_AI WebSocket is not connected.") - - async def reconnect(self): - logger.info("FASHION_AI Reconnecting...") - await self.close() - await self.connect() - - async def heartbeat(self, interval): - while True: - await asyncio.sleep(interval) - try: - await self.send_inputText("ping") - except websockets.exceptions.ConnectionClosedError: - break From 26d32e82475e05a8aaf3d64058fa9198917134b3 Mon Sep 17 00:00:00 2001 From: caitengwen Date: Sat, 12 Oct 2024 20:23:25 +0800 Subject: [PATCH 09/15] Update extension.py 1. change method name to create_task 2. put queue timeout add log --- .../ten_packages/extension/fashionai/src/extension.py | 11 +++-------- 1 file changed, 3 insertions(+), 8 deletions(-) diff --git a/agents/ten_packages/extension/fashionai/src/extension.py b/agents/ten_packages/extension/fashionai/src/extension.py index 9af3cfe9..bd7e5c10 100644 --- a/agents/ten_packages/extension/fashionai/src/extension.py +++ b/agents/ten_packages/extension/fashionai/src/extension.py @@ -111,12 +111,9 @@ def on_data(self, ten_env: TenEnv, data: Data) -> None: self.queue.put((inputText, datetime.now())), self.loop ).result(timeout=0.1) except Exception as e: - ten_env.return_result(CmdResult.create(StatusCode.ERROR)) - # asyncio.get_event_loop().run_until_complete(self.client.send_inputText(inputText)) + logger.warning(f"FASHION_AI put inputText={inputText} queue err: {e}") logger.info("FASHION_AI send_inputText %s", inputText) - # self.queue.put((inputText, datetime.now())) - pass def on_audio_frame(self, ten_env: TenEnv, audio_frame: AudioFrame) -> None: @@ -130,12 +127,10 @@ def on_video_frame(self, ten_env: TenEnv, video_frame: VideoFrame) -> None: async def init_fashionai(self, ten_env: TenEnv, app_id, channel, stream_id): await self.client.connect() await self.client.stream_start(app_id, channel, stream_id) - # asyncio.create_task(self.client.heartbeat(10)) await self.client.render_start() - await self.async_polly_handler() - + await self.create_task() - async def async_polly_handler(self): + async def create_task(self): while True: value = await self.queue.get() # 从队列中获取值 logger.info(f"async_polly_handler: loop fashion ai polly.{value}") From a7fc7e02680ca1a89a2b234d888dd2c702f64d44 Mon Sep 17 00:00:00 2001 From: caitengwen Date: Sat, 12 Oct 2024 21:44:44 +0800 Subject: [PATCH 10/15] Update extension.py rollback method name --- agents/ten_packages/extension/fashionai/src/extension.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/agents/ten_packages/extension/fashionai/src/extension.py b/agents/ten_packages/extension/fashionai/src/extension.py index bd7e5c10..7c4bfcd1 100644 --- a/agents/ten_packages/extension/fashionai/src/extension.py +++ b/agents/ten_packages/extension/fashionai/src/extension.py @@ -128,9 +128,9 @@ async def init_fashionai(self, ten_env: TenEnv, app_id, channel, stream_id): await self.client.connect() await self.client.stream_start(app_id, channel, stream_id) await self.client.render_start() - await self.create_task() + await self.async_polly_handler() - async def create_task(self): + async def async_polly_handler(self): while True: value = await self.queue.get() # 从队列中获取值 logger.info(f"async_polly_handler: loop fashion ai polly.{value}") From 9221016e7426fa02c047abb11a499d03a4df3f4b Mon Sep 17 00:00:00 2001 From: caitengwen Date: Sun, 13 Oct 2024 16:33:16 +0800 Subject: [PATCH 11/15] 1. Create the event loop in the child thread 2. Add FashionAIClient to separated file 3. Fix event loop handling and ensure proper coroutine execution in on_stop method --- .../extension/fashionai/src/extension.py | 52 +++++++----- .../fashionai/src/fashionai_client.py | 84 +++++++++++++++++++ 2 files changed, 114 insertions(+), 22 deletions(-) create mode 100644 agents/ten_packages/extension/fashionai/src/fashionai_client.py diff --git a/agents/ten_packages/extension/fashionai/src/extension.py b/agents/ten_packages/extension/fashionai/src/extension.py index 7c4bfcd1..19fedf1b 100644 --- a/agents/ten_packages/extension/fashionai/src/extension.py +++ b/agents/ten_packages/extension/fashionai/src/extension.py @@ -33,9 +33,7 @@ def on_init(self, ten_env: TenEnv) -> None: self.stopped = False self.queue = asyncio.Queue(maxsize=3000) self.client = FashionAIClient("wss://ingress.service.fasionai.com/websocket/node7/server1") - self.loop = asyncio.new_event_loop() - asyncio.set_event_loop(self.loop) - + self.threadWebsocketLoop = None ten_env.on_init_done() def on_start(self, ten_env: TenEnv) -> None: @@ -47,14 +45,17 @@ def on_start(self, ten_env: TenEnv) -> None: self.token = ten_env.get_property_string("token") self.channel = ten_env.get_property_string("channel") self.stream_id = str(ten_env.get_property_int("stream_id")) - if len(self.token) > 0 : - self.app_id = self.token logger.info(f"FASHION_AI on_start: app_id = {self.app_id}, token = {self.token}, channel = {self.channel}, stream_id = {self.stream_id}") except Exception as e: logger.warning(f"get_property err: {e}") + if len(self.token) > 0 : + self.app_id = self.token + def thread_target(): - self.loop.run_until_complete(self.init_fashionai(ten_env, self.app_id, self.channel, self.stream_id)) + self.threadWebsocketLoop = asyncio.new_event_loop() + asyncio.set_event_loop(self.threadWebsocketLoop) + self.threadWebsocketLoop.run_until_complete(self.init_fashionai(self.app_id, self.channel, self.stream_id)) self.threadWebsocket = threading.Thread(target=thread_target) self.threadWebsocket.start() @@ -64,11 +65,10 @@ def thread_target(): def on_stop(self, ten_env: TenEnv) -> None: logger.info("FASHION_AI on_stop") self.stopped = True - asyncio.run(self.queue.put(None)) - self.flush() - self.threadWebsocket.join() + asyncio.run_coroutine_threadsafe(self.queue.put(None), self.threadWebsocketLoop) + asyncio.run_coroutine_threadsafe(self.flush(), self.threadWebsocketLoop) - # TODO: clean up resources + self.threadWebsocket.join() ten_env.on_stop_done() @@ -85,7 +85,7 @@ def on_cmd(self, ten_env: TenEnv, cmd: Cmd) -> None: self.outdate_ts = datetime.now() try: asyncio.run_coroutine_threadsafe( - self.flush(), self.loop + self.flush(), self.threadWebsocketLoop ).result(timeout=0.1) except Exception as e: ten_env.return_result(CmdResult.create(StatusCode.ERROR), cmd) @@ -106,10 +106,13 @@ def on_data(self, ten_env: TenEnv, data: Data) -> None: return logger.info("FASHION_AI on data %s", inputText) - try: - asyncio.run_coroutine_threadsafe( - self.queue.put((inputText, datetime.now())), self.loop - ).result(timeout=0.1) + try: + future = asyncio.run_coroutine_threadsafe( + self.queue.put(inputText), self.threadWebsocketLoop + ) + future.result(timeout=0.1) + except asyncio.TimeoutError: + logger.warning(f"FASHION_AI put inputText={inputText} queue timed out") except Exception as e: logger.warning(f"FASHION_AI put inputText={inputText} queue err: {e}") logger.info("FASHION_AI send_inputText %s", inputText) @@ -122,20 +125,23 @@ def on_audio_frame(self, ten_env: TenEnv, audio_frame: AudioFrame) -> None: def on_video_frame(self, ten_env: TenEnv, video_frame: VideoFrame) -> None: # TODO: process image frame - pass + pass - async def init_fashionai(self, ten_env: TenEnv, app_id, channel, stream_id): + async def init_fashionai(self, app_id, channel, stream_id): await self.client.connect() await self.client.stream_start(app_id, channel, stream_id) await self.client.render_start() await self.async_polly_handler() - + async def async_polly_handler(self): while True: - value = await self.queue.get() # 从队列中获取值 - logger.info(f"async_polly_handler: loop fashion ai polly.{value}") + inputText = await self.queue.get() + if inputText is None: + logger.info("Stopping async_polly_handler...") + break + + logger.info(f"async_polly_handler: loop fashion ai polly.{inputText}") - inputText, ts = value if len(inputText) > 0: try: await self.client.send_inputText(inputText) @@ -146,4 +152,6 @@ async def flush(self): logger.info("FASHION_AI flush") while not self.queue.empty(): value = await self.queue.get() - logger.info(f"Flushing value: {value}") + if value is None: + break + logger.info(f"Flushing value: {value}") \ No newline at end of file diff --git a/agents/ten_packages/extension/fashionai/src/fashionai_client.py b/agents/ten_packages/extension/fashionai/src/fashionai_client.py new file mode 100644 index 00000000..c8b55e67 --- /dev/null +++ b/agents/ten_packages/extension/fashionai/src/fashionai_client.py @@ -0,0 +1,84 @@ +import json +import ssl + +import websockets +import asyncio + +from .log import logger + + +class FashionAIClient: + def __init__(self, uri): + self.uri = uri + self.websocket = None + + async def connect(self): + ssl_context = ssl._create_unverified_context() + self.websocket = await websockets.connect(self.uri, ssl=ssl_context) + + async def stream_start(self, app_id, channel, stream_id): + await self.send_message( + { + "request_id": "1", + "service_id": "agora", + "token": app_id, + "channel_id": channel, + "user_id": stream_id, + "signal": "STREAM_START", + } + ) + + async def render_start(self): + await self.send_message( + { + "request_id": "3", + "service_id": "agora", + "signal": "RENDER_START", + } + ) + + async def send_inputText(self, inputText): + await self.send_message( + { + "request_id": "4", + "service_id": "agora", + "signal": "RENDER_CONTENT", + "text": inputText, + } + ) + + + async def send_message(self, message): + if self.websocket is not None: + try: + await self.websocket.send(json.dumps(message)) + logger.info(f"FASHION_AI Sent: {message}") + response = await asyncio.wait_for(self.websocket.recv(), timeout=2) + logger.info(f"FASHION_AI Received: {response}") + except websockets.exceptions.ConnectionClosedError as e: + logger.info(f"FASHION_AI Connection closed with error: {e}") + await self.reconnect() + except asyncio.TimeoutError: + logger.info("FASHION_AI Timeout waiting for response") + else: + logger.info("FASHION_AI WebSocket is not connected.") + + async def close(self): + if self.websocket is not None: + await self.websocket.close() + logger.info("FASHION_AI WebSocket connection closed.") + else: + logger.info("FASHION_AI WebSocket is not connected.") + + async def reconnect(self): + logger.info("FASHION_AI Reconnecting...") + await self.close() + await self.connect() + + async def heartbeat(self, interval): + while True: + await asyncio.sleep(interval) + try: + await self.send_inputText("ping") + except websockets.exceptions.ConnectionClosedError: + break \ No newline at end of file From d2651acb3006ec891286ebd7e2ae6aa4f01df694 Mon Sep 17 00:00:00 2001 From: caitengwen Date: Sun, 13 Oct 2024 18:49:51 +0800 Subject: [PATCH 12/15] add va.openai.azure.fashionai graph --- agents/property.json | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/agents/property.json b/agents/property.json index 97dcffae..56052708 100644 --- a/agents/property.json +++ b/agents/property.json @@ -3,6 +3,7 @@ "log_level": 3, "predefined_graphs": [ { + "name": "va.openai.azure.fashionai", "auto_start": true, "connections": [ { @@ -135,7 +136,6 @@ "extension_group": "default" } ], - "name": "va.openai.azure.fashionai", "nodes": [ { "addon": "agora_rtc", From c1ce0a8a11b62aae42b228bb8d3789574f1fbba0 Mon Sep 17 00:00:00 2001 From: caitengwen Date: Mon, 14 Oct 2024 17:51:52 +0800 Subject: [PATCH 13/15] add fashion ai --- agents/property.json | 423 +++++++++--------- .../extension/fashionai/manifest.json | 3 + .../extension/fashionai/src/extension.py | 20 +- .../fashionai/src/fashionai_client.py | 16 +- 4 files changed, 236 insertions(+), 226 deletions(-) diff --git a/agents/property.json b/agents/property.json index 56052708..a4a62092 100644 --- a/agents/property.json +++ b/agents/property.json @@ -3,221 +3,222 @@ "log_level": 3, "predefined_graphs": [ { - "name": "va.openai.azure.fashionai", - "auto_start": true, - "connections": [ - { - "app": "localhost", - "data": [ - { - "dest": [ - { - "app": "localhost", - "extension": "interrupt_detector", - "extension_group": "default" + "auto_start": true, + "connections": [ + { + "app": "localhost", + "data": [ + { + "dest": [ + { + "app": "localhost", + "extension": "interrupt_detector", + "extension_group": "default" + }, + { + "app": "localhost", + "extension": "openai_chatgpt", + "extension_group": "chatgpt" + }, + { + "app": "localhost", + "extension": "message_collector", + "extension_group": "transcriber" + } + ], + "name": "text_data" + } + ], + "extension": "agora_rtc", + "extension_group": "default" + }, + { + "app": "localhost", + "cmd": [ + { + "dest": [ + { + "app": "localhost", + "extension": "azure_tts", + "extension_group": "tts" + }, + { + "app": "localhost", + "extension": "fashionai", + "extension_group": "default" + } + ], + "name": "flush" + } + ], + "data": [ + { + "dest": [ + { + "app": "localhost", + "extension": "message_collector", + "extension_group": "transcriber" + }, + { + "app": "localhost", + "extension": "fashionai", + "extension_group": "default" + } + ], + "name": "text_data" + } + ], + "extension": "openai_chatgpt", + "extension_group": "chatgpt" + }, + { + "app": "localhost", + "audio_frame": [ + { + "dest": [ + { + "app": "localhost", + "extension": "agora_rtc", + "extension_group": "default" + } + ], + "name": "pcm_frame" + } + ], + "cmd": [ + { + "dest": [ + { + "app": "localhost", + "extension": "agora_rtc", + "extension_group": "default" + } + ], + "name": "flush" + } + ], + "extension": "azure_tts", + "extension_group": "tts" + }, + { + "app": "localhost", + "data": [ + { + "dest": [ + { + "app": "localhost", + "extension": "agora_rtc", + "extension_group": "default" + } + ], + "name": "data" + } + ], + "extension": "message_collector", + "extension_group": "transcriber" + }, + { + "app": "localhost", + "cmd": [ + { + "dest": [ + { + "app": "localhost", + "extension": "openai_chatgpt", + "extension_group": "chatgpt" + } + ], + "name": "flush" + } + ], + "extension": "interrupt_detector", + "extension_group": "default" + } + ], + "name": "va.openai.azure.fashionai", + "nodes": [ + { + "addon": "agora_rtc", + "app": "localhost", + "extension_group": "default", + "name": "agora_rtc", + "property": { + "agora_asr_language": "en-US", + "agora_asr_session_control_file_path": "session_control.conf", + "agora_asr_vendor_key": "$AZURE_STT_KEY", + "agora_asr_vendor_name": "microsoft", + "agora_asr_vendor_region": "$AZURE_STT_REGION", + "app_id": "$AGORA_APP_ID", + "channel": "astra_agents_test", + "enable_agora_asr": true, + "publish_audio": true, + "publish_data": true, + "remote_stream_id": 123, + "stream_id": 1234, + "subscribe_audio": true, + "token": "" }, - { - "app": "localhost", - "extension": "openai_chatgpt", - "extension_group": "chatgpt" + "type": "extension" + }, + { + "addon": "interrupt_detector", + "app": "localhost", + "extension_group": "default", + "name": "interrupt_detector", + "type": "extension" + }, + { + "addon": "openai_chatgpt", + "app": "localhost", + "extension_group": "chatgpt", + "name": "openai_chatgpt", + "property": { + "api_key": "$OPENAI_API_KEY", + "base_url": "", + "frequency_penalty": 0.9, + "greeting": "TEN Agent connected. How can I help you today?", + "max_memory_length": 10, + "max_tokens": 512, + "model": "gpt-4o-mini", + "prompt": "", + "proxy_url": "$OPENAI_PROXY_URL" }, - { - "app": "localhost", - "extension": "message_collector", - "extension_group": "transcriber" - } - ], - "name": "text_data" - } - ], - "extension": "agora_rtc", - "extension_group": "default" - }, - { - "app": "localhost", - "cmd": [ - { - "dest": [ - { - "app": "localhost", - "extension": "azure_tts", - "extension_group": "tts" + "type": "extension" + }, + { + "addon": "azure_tts", + "app": "localhost", + "extension_group": "tts", + "name": "azure_tts", + "property": { + "azure_subscription_key": "$AZURE_TTS_KEY", + "azure_subscription_region": "$AZURE_TTS_REGION", + "azure_synthesis_voice_name": "en-US-JaneNeural" }, - { - "app": "localhost", - "extension": "fashionai", - "extension_group": "default" - } - ], - "name": "flush" - } - ], - "data": [ - { - "dest": [ - { - "app": "localhost", - "extension": "message_collector", - "extension_group": "transcriber" + "type": "extension" + }, + { + "addon": "message_collector", + "app": "localhost", + "extension_group": "transcriber", + "name": "message_collector", + "type": "extension" + }, + { + "addon": "fashionai", + "app": "localhost", + "extension_group": "default", + "name": "fashionai", + "property": { + "app_id": "$AGORA_APP_ID", + "channel": "astra_agents_test", + "stream_id": 12345, + "token": "", + "service_id": "agora" }, - { - "app": "localhost", - "extension": "fashionai", - "extension_group": "default" - } - ], - "name": "text_data" + "type": "extension" } - ], - "extension": "openai_chatgpt", - "extension_group": "chatgpt" - }, - { - "app": "localhost", - "audio_frame": [ - { - "dest": [ - { - "app": "localhost", - "extension": "agora_rtc", - "extension_group": "default" - } - ], - "name": "pcm_frame" - } - ], - "cmd": [ - { - "dest": [ - { - "app": "localhost", - "extension": "agora_rtc", - "extension_group": "default" - } - ], - "name": "flush" - } - ], - "extension": "azure_tts", - "extension_group": "tts" - }, - { - "app": "localhost", - "data": [ - { - "dest": [ - { - "app": "localhost", - "extension": "agora_rtc", - "extension_group": "default" - } - ], - "name": "data" - } - ], - "extension": "message_collector", - "extension_group": "transcriber" - }, - { - "app": "localhost", - "cmd": [ - { - "dest": [ - { - "app": "localhost", - "extension": "openai_chatgpt", - "extension_group": "chatgpt" - } - ], - "name": "flush" - } - ], - "extension": "interrupt_detector", - "extension_group": "default" - } - ], - "nodes": [ - { - "addon": "agora_rtc", - "app": "localhost", - "extension_group": "default", - "name": "agora_rtc", - "property": { - "agora_asr_language": "en-US", - "agora_asr_session_control_file_path": "session_control.conf", - "agora_asr_vendor_key": "$AZURE_STT_KEY", - "agora_asr_vendor_name": "microsoft", - "agora_asr_vendor_region": "$AZURE_STT_REGION", - "app_id": "$AGORA_APP_ID", - "channel": "astra_agents_test", - "enable_agora_asr": true, - "publish_audio": true, - "publish_data": true, - "remote_stream_id": 123, - "stream_id": 1234, - "subscribe_audio": true, - "token": "" - }, - "type": "extension" - }, - { - "addon": "interrupt_detector", - "app": "localhost", - "extension_group": "default", - "name": "interrupt_detector", - "type": "extension" - }, - { - "addon": "openai_chatgpt", - "app": "localhost", - "extension_group": "chatgpt", - "name": "openai_chatgpt", - "property": { - "api_key": "$OPENAI_API_KEY", - "base_url": "", - "frequency_penalty": 0.9, - "greeting": "TEN Agent connected. How can I help you today?", - "max_memory_length": 10, - "max_tokens": 512, - "model": "gpt-4o-mini", - "prompt": "", - "proxy_url": "$OPENAI_PROXY_URL" - }, - "type": "extension" - }, - { - "addon": "azure_tts", - "app": "localhost", - "extension_group": "tts", - "name": "azure_tts", - "property": { - "azure_subscription_key": "$AZURE_TTS_KEY", - "azure_subscription_region": "$AZURE_TTS_REGION", - "azure_synthesis_voice_name": "en-US-JaneNeural" - }, - "type": "extension" - }, - { - "addon": "message_collector", - "app": "localhost", - "extension_group": "transcriber", - "name": "message_collector", - "type": "extension" - }, - { - "addon": "fashionai", - "app": "localhost", - "extension_group": "default", - "name": "fashionai", - "property": { - "app_id": "$AGORA_APP_ID", - "channel": "astra_agents_test", - "stream_id": 12345, - "token": "" - }, - "type": "extension" - } - ] + ] }, { "name": "va.openai.azure", @@ -2661,4 +2662,4 @@ } ] } -} +} \ No newline at end of file diff --git a/agents/ten_packages/extension/fashionai/manifest.json b/agents/ten_packages/extension/fashionai/manifest.json index 8c677ed4..21b67662 100644 --- a/agents/ten_packages/extension/fashionai/manifest.json +++ b/agents/ten_packages/extension/fashionai/manifest.json @@ -34,6 +34,9 @@ }, "stream_id": { "type": "uint32" + }, + "service_id": { + "type": "string" } }, "data_in": [ diff --git a/agents/ten_packages/extension/fashionai/src/extension.py b/agents/ten_packages/extension/fashionai/src/extension.py index 19fedf1b..e629f798 100644 --- a/agents/ten_packages/extension/fashionai/src/extension.py +++ b/agents/ten_packages/extension/fashionai/src/extension.py @@ -27,13 +27,14 @@ class FashionAIExtension(Extension): token = "" channel = "" stream_id = 0 + service_id = "agora" def on_init(self, ten_env: TenEnv) -> None: logger.info("FASHION_AI on_init *********************************************************") self.stopped = False self.queue = asyncio.Queue(maxsize=3000) - self.client = FashionAIClient("wss://ingress.service.fasionai.com/websocket/node7/server1") self.threadWebsocketLoop = None + ten_env.on_init_done() def on_start(self, ten_env: TenEnv) -> None: @@ -45,16 +46,19 @@ def on_start(self, ten_env: TenEnv) -> None: self.token = ten_env.get_property_string("token") self.channel = ten_env.get_property_string("channel") self.stream_id = str(ten_env.get_property_int("stream_id")) - logger.info(f"FASHION_AI on_start: app_id = {self.app_id}, token = {self.token}, channel = {self.channel}, stream_id = {self.stream_id}") + self.service_id = ten_env.get_property_string("service_id") + + logger.info(f"FASHION_AI on_start: app_id = {self.app_id}, token = {self.token}, channel = {self.channel}, stream_id = {self.stream_id}, service_id = {self.service_id}") except Exception as e: logger.warning(f"get_property err: {e}") - if len(self.token) > 0 : + if len(self.token) > 0: self.app_id = self.token + self.client = FashionAIClient("wss://ingress.service.fasionai.com/websocket/node7/server1", self.service_id) def thread_target(): - self.threadWebsocketLoop = asyncio.new_event_loop() - asyncio.set_event_loop(self.threadWebsocketLoop) + self.threadWebsocketLoop = asyncio.new_event_loop() + asyncio.set_event_loop(self.threadWebsocketLoop) self.threadWebsocketLoop.run_until_complete(self.init_fashionai(self.app_id, self.channel, self.stream_id)) self.threadWebsocket = threading.Thread(target=thread_target) @@ -125,17 +129,17 @@ def on_audio_frame(self, ten_env: TenEnv, audio_frame: AudioFrame) -> None: def on_video_frame(self, ten_env: TenEnv, video_frame: VideoFrame) -> None: # TODO: process image frame - pass + pass async def init_fashionai(self, app_id, channel, stream_id): await self.client.connect() await self.client.stream_start(app_id, channel, stream_id) await self.client.render_start() await self.async_polly_handler() - + async def async_polly_handler(self): while True: - inputText = await self.queue.get() + inputText = await self.queue.get() if inputText is None: logger.info("Stopping async_polly_handler...") break diff --git a/agents/ten_packages/extension/fashionai/src/fashionai_client.py b/agents/ten_packages/extension/fashionai/src/fashionai_client.py index c8b55e67..c6591600 100644 --- a/agents/ten_packages/extension/fashionai/src/fashionai_client.py +++ b/agents/ten_packages/extension/fashionai/src/fashionai_client.py @@ -1,5 +1,6 @@ import json import ssl +import uuid import websockets import asyncio @@ -8,9 +9,10 @@ class FashionAIClient: - def __init__(self, uri): + def __init__(self, uri, service_id): self.uri = uri self.websocket = None + self.service_id = service_id async def connect(self): ssl_context = ssl._create_unverified_context() @@ -19,8 +21,8 @@ async def connect(self): async def stream_start(self, app_id, channel, stream_id): await self.send_message( { - "request_id": "1", - "service_id": "agora", + "request_id": str(uuid.uuid4()), + "service_id": self.service_id, "token": app_id, "channel_id": channel, "user_id": stream_id, @@ -31,8 +33,8 @@ async def stream_start(self, app_id, channel, stream_id): async def render_start(self): await self.send_message( { - "request_id": "3", - "service_id": "agora", + "request_id": str(uuid.uuid4()), + "service_id": self.service_id, "signal": "RENDER_START", } ) @@ -40,8 +42,8 @@ async def render_start(self): async def send_inputText(self, inputText): await self.send_message( { - "request_id": "4", - "service_id": "agora", + "request_id": str(uuid.uuid4()), + "service_id": self.service_id, "signal": "RENDER_CONTENT", "text": inputText, } From 6ac26665b6b23eee282a2f4b9d3a94024403721f Mon Sep 17 00:00:00 2001 From: caitengwen Date: Mon, 14 Oct 2024 17:57:25 +0800 Subject: [PATCH 14/15] add va.openai.azure.fashionai graph --- agents/property.json | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/agents/property.json b/agents/property.json index c8116965..12c36d29 100644 --- a/agents/property.json +++ b/agents/property.json @@ -3,6 +3,7 @@ "log_level": 3, "predefined_graphs": [ { + "name": "va.openai.azure.fashionai", "auto_start": true, "connections": [ { @@ -135,7 +136,6 @@ "extension_group": "default" } ], - "name": "va.openai.azure.fashionai", "nodes": [ { "addon": "agora_rtc", From 886571f02332387d364e2632ee648fe46fb71ca7 Mon Sep 17 00:00:00 2001 From: caitengwen Date: Mon, 14 Oct 2024 19:49:53 +0800 Subject: [PATCH 15/15] update ten_runtime_python version --- agents/ten_packages/extension/fashionai/manifest.json | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/agents/ten_packages/extension/fashionai/manifest.json b/agents/ten_packages/extension/fashionai/manifest.json index 21b67662..521a7efb 100644 --- a/agents/ten_packages/extension/fashionai/manifest.json +++ b/agents/ten_packages/extension/fashionai/manifest.json @@ -6,7 +6,7 @@ { "type": "system", "name": "ten_runtime_python", - "version": "0.1.0" + "version": "0.2" } ], "package": {