From e5d8f040cf2755963b6e0c3dbac80794d6f3422b Mon Sep 17 00:00:00 2001
From: Ethan Zhang
Date: Tue, 17 Sep 2024 14:23:58 +0800
Subject: [PATCH] Feat/refactor exts (#276)

* feat: openai extension refactoring

* feat: adding refactor code / asyncio

* feat: fix refactoring bugs

* fix: add manifest.json

* feat: add queue logic

* fix: fix issues
- remove test code
- prevent sending full content again
- add queue logic

* feat: fix parseSentence

* fix: fix end_segment bug

* feat: add chatflow abstraction
- chatflow
- refactor to simplify flow run
- added event emitter for intermediate execution

* feat: refactor openai, support multi data-stream data pack

* feat: finalize openai extension refactoring
- change asyncio.queue to AsyncQueue
- change the way we abstract chatflow
- use eventEmitter for easier tool notification
- use queue to ensure tasks are processed one by one and remain cancellable

* feat: add docs

* feat: don't use private api
---
 agents/manifest-lock.json                     |   3 +-
 .../bak/openai_chatgpt_python/__init__.py     |   4 +
 .../bak/openai_chatgpt_python/log.py          |  13 +
 .../bak/openai_chatgpt_python/manifest.json   |  93 +++++
 .../openai_chatgpt_python/openai_chatgpt.py   |   0
 .../openai_chatgpt_addon.py                   |   0
 .../openai_chatgpt_extension.py               |   0
 .../bak/openai_chatgpt_python/property.json   |   1 +
 .../openai_chatgpt_python/requirements.txt    |   5 +
 .../message_collector/src/extension.py        |  79 ++++-
 .../extension/openai_chatgpt_python/BUILD.gn  |  21 ++
 .../extension/openai_chatgpt_python/README.md |  60 ++++
 .../openai_chatgpt_python/__init__.py         |   9 +-
 .../extension/openai_chatgpt_python/addon.py  |  22 ++
 .../openai_chatgpt_python/extension.py        | 318 ++++++++++++++++++
 .../extension/openai_chatgpt_python/helper.py | 187 ++++++++++
 .../extension/openai_chatgpt_python/log.py    |  13 +-
 .../openai_chatgpt_python/manifest.json       |  12 +-
 .../extension/openai_chatgpt_python/openai.py | 125 +++++++
 .../openai_chatgpt_python/requirements.txt    |   4 +-
 playground/src/manager/rtc/rtc.ts             | 121 ++++---
 21 files changed, 1020 insertions(+), 70 deletions(-)
 create mode 100644 agents/ten_packages/bak/openai_chatgpt_python/__init__.py
 create mode 100644 agents/ten_packages/bak/openai_chatgpt_python/log.py
 create mode 100644 agents/ten_packages/bak/openai_chatgpt_python/manifest.json
 rename agents/ten_packages/{extension => bak}/openai_chatgpt_python/openai_chatgpt.py (100%)
 rename agents/ten_packages/{extension => bak}/openai_chatgpt_python/openai_chatgpt_addon.py (100%)
 rename agents/ten_packages/{extension => bak}/openai_chatgpt_python/openai_chatgpt_extension.py (100%)
 create mode 100644 agents/ten_packages/bak/openai_chatgpt_python/property.json
 create mode 100644 agents/ten_packages/bak/openai_chatgpt_python/requirements.txt
 create mode 100644 agents/ten_packages/extension/openai_chatgpt_python/BUILD.gn
 create mode 100644 agents/ten_packages/extension/openai_chatgpt_python/README.md
 create mode 100644 agents/ten_packages/extension/openai_chatgpt_python/addon.py
 create mode 100644 agents/ten_packages/extension/openai_chatgpt_python/extension.py
 create mode 100644 agents/ten_packages/extension/openai_chatgpt_python/helper.py
 create mode 100644 agents/ten_packages/extension/openai_chatgpt_python/openai.py

diff --git a/agents/manifest-lock.json b/agents/manifest-lock.json
index c4921982..902dca63 100644
--- a/agents/manifest-lock.json
+++ b/agents/manifest-lock.json
@@ -141,8 +141,7 @@
       "type": "system",
       "name": "nlohmann_json",
       "version": "3.11.2",
-      "hash": "72b15822c7ea9deef5e7ad96216ac55e93f11b00466dd1943afd5ee276e99d19",
-      "supports": []
+      
"hash": "72b15822c7ea9deef5e7ad96216ac55e93f11b00466dd1943afd5ee276e99d19" }, { "type": "system", diff --git a/agents/ten_packages/bak/openai_chatgpt_python/__init__.py b/agents/ten_packages/bak/openai_chatgpt_python/__init__.py new file mode 100644 index 00000000..42c4cd12 --- /dev/null +++ b/agents/ten_packages/bak/openai_chatgpt_python/__init__.py @@ -0,0 +1,4 @@ +from . import openai_chatgpt_addon +from .log import logger + +logger.info("openai_chatgpt_python extension loaded") diff --git a/agents/ten_packages/bak/openai_chatgpt_python/log.py b/agents/ten_packages/bak/openai_chatgpt_python/log.py new file mode 100644 index 00000000..fa2202da --- /dev/null +++ b/agents/ten_packages/bak/openai_chatgpt_python/log.py @@ -0,0 +1,13 @@ +import logging + +logger = logging.getLogger("openai_chatgpt_python") +logger.setLevel(logging.INFO) + +formatter = logging.Formatter( + "%(asctime)s - %(name)s - %(levelname)s - %(process)d - [%(filename)s:%(lineno)d] - %(message)s" +) + +console_handler = logging.StreamHandler() +console_handler.setFormatter(formatter) + +logger.addHandler(console_handler) diff --git a/agents/ten_packages/bak/openai_chatgpt_python/manifest.json b/agents/ten_packages/bak/openai_chatgpt_python/manifest.json new file mode 100644 index 00000000..ce872dfe --- /dev/null +++ b/agents/ten_packages/bak/openai_chatgpt_python/manifest.json @@ -0,0 +1,93 @@ +{ + "type": "extension", + "name": "openai_chatgpt_python", + "version": "0.4.0", + "dependencies": [ + { + "type": "system", + "name": "ten_runtime_python", + "version": "0.2" + } + ], + "api": { + "property": { + "api_key": { + "type": "string" + }, + "frequency_penalty": { + "type": "float64" + }, + "presence_penalty": { + "type": "float64" + }, + "temperature": { + "type": "float64" + }, + "top_p": { + "type": "float64" + }, + "model": { + "type": "string" + }, + "max_tokens": { + "type": "int64" + }, + "base_url": { + "type": "string" + }, + "prompt": { + "type": "string" + }, + "greeting": { + "type": "string" + }, + "checking_vision_text_items": { + "type": "string" + }, + "proxy_url": { + "type": "string" + }, + "max_memory_length": { + "type": "int64" + }, + "enable_tools": { + "type": "bool" + } + }, + "data_in": [ + { + "name": "text_data", + "property": { + "text": { + "type": "string" + } + } + } + ], + "data_out": [ + { + "name": "text_data", + "property": { + "text": { + "type": "string" + } + } + } + ], + "cmd_in": [ + { + "name": "flush" + } + ], + "cmd_out": [ + { + "name": "flush" + } + ], + "video_frame_in": [ + { + "name": "video_frame" + } + ] + } +} \ No newline at end of file diff --git a/agents/ten_packages/extension/openai_chatgpt_python/openai_chatgpt.py b/agents/ten_packages/bak/openai_chatgpt_python/openai_chatgpt.py similarity index 100% rename from agents/ten_packages/extension/openai_chatgpt_python/openai_chatgpt.py rename to agents/ten_packages/bak/openai_chatgpt_python/openai_chatgpt.py diff --git a/agents/ten_packages/extension/openai_chatgpt_python/openai_chatgpt_addon.py b/agents/ten_packages/bak/openai_chatgpt_python/openai_chatgpt_addon.py similarity index 100% rename from agents/ten_packages/extension/openai_chatgpt_python/openai_chatgpt_addon.py rename to agents/ten_packages/bak/openai_chatgpt_python/openai_chatgpt_addon.py diff --git a/agents/ten_packages/extension/openai_chatgpt_python/openai_chatgpt_extension.py b/agents/ten_packages/bak/openai_chatgpt_python/openai_chatgpt_extension.py similarity index 100% rename from 
agents/ten_packages/extension/openai_chatgpt_python/openai_chatgpt_extension.py
rename to agents/ten_packages/bak/openai_chatgpt_python/openai_chatgpt_extension.py
diff --git a/agents/ten_packages/bak/openai_chatgpt_python/property.json b/agents/ten_packages/bak/openai_chatgpt_python/property.json
new file mode 100644
index 00000000..9e26dfee
--- /dev/null
+++ b/agents/ten_packages/bak/openai_chatgpt_python/property.json
@@ -0,0 +1 @@
+{}
\ No newline at end of file
diff --git a/agents/ten_packages/bak/openai_chatgpt_python/requirements.txt b/agents/ten_packages/bak/openai_chatgpt_python/requirements.txt
new file mode 100644
index 00000000..5ddef5be
--- /dev/null
+++ b/agents/ten_packages/bak/openai_chatgpt_python/requirements.txt
@@ -0,0 +1,5 @@
+openai
+numpy
+requests
+pillow
+asyncio
\ No newline at end of file
diff --git a/agents/ten_packages/extension/message_collector/src/extension.py b/agents/ten_packages/extension/message_collector/src/extension.py
index 39206a13..7013c432 100644
--- a/agents/ten_packages/extension/message_collector/src/extension.py
+++ b/agents/ten_packages/extension/message_collector/src/extension.py
@@ -7,6 +7,7 @@
 #
 import json
 import time
+import uuid
 from ten import (
     AudioFrame,
     VideoFrame,
@@ -19,7 +20,8 @@
 )
 from .log import logger
 
-
+MAX_SIZE = 800  # cap on the text payload, leaving headroom under the ~1 KB data limit
+OVERHEAD_ESTIMATE = 200  # rough allowance for the JSON metadata around the text
 CMD_NAME_FLUSH = "flush"
@@ -89,16 +91,12 @@ def on_data(self, ten_env: TenEnv, data: Data) -> None:
         try:
             final = data.get_property_bool(TEXT_DATA_FINAL_FIELD)
-        except Exception as e:
-            logger.warning(
-                f"on_data get_property_bool {TEXT_DATA_FINAL_FIELD} error: {e}"
-            )
+        except Exception:
+            pass
 
         try:
             stream_id = data.get_property_int(TEXT_DATA_STREAM_ID_FIELD)
-        except Exception as e:
-            logger.warning(
-                f"on_data get_property_int {TEXT_DATA_STREAM_ID_FIELD} error: {e}"
-            )
+        except Exception:
+            pass
 
         try:
             end_of_segment = data.get_property_bool(TEXT_DATA_END_OF_SEGMENT_FIELD)
@@ -124,19 +122,72 @@ def on_data(self, ten_env: TenEnv, data: Data) -> None:
 
         cached_text_map[stream_id] = text
 
-        msg_data = json.dumps({
-            "text": text,
+        # Generate a unique message ID for this batch of parts
+        message_id = str(uuid.uuid4())
+
+        # Prepare the main JSON structure without the text field
+        base_msg_data = {
             "is_final": end_of_segment,
             "stream_id": stream_id,
+            "message_id": message_id,  # Add message_id to identify the split message
             "data_type": "transcribe",
             "text_ts": int(time.time() * 1000),  # Convert to milliseconds
-        })
+        }
 
         try:
-            # convert the origin text data to the protobuf data and send it to the graph.
-            ten_data = Data.create("data")
-            ten_data.set_property_buf("data", msg_data.encode())
-            ten_env.send_data(ten_data)
+            # Convert the text to UTF-8 bytes
+            text_bytes = text.encode('utf-8')
+
+            # If the text + metadata fits within the size limit, send it directly
+            if len(text_bytes) + OVERHEAD_ESTIMATE <= MAX_SIZE:
+                base_msg_data["text"] = text
+                msg_data = json.dumps(base_msg_data)
+                ten_data = Data.create("data")
+                ten_data.set_property_buf("data", msg_data.encode())
+                ten_env.send_data(ten_data)
+            else:
+                # Split the text bytes into smaller chunks, ensuring safe UTF-8 splitting
+                max_text_size = MAX_SIZE - OVERHEAD_ESTIMATE
+                total_length = len(text_bytes)
+                total_parts = (total_length + max_text_size - 1) // max_text_size  # estimated count; UTF-8 backoff below may yield more parts
+
+                def get_valid_utf8_chunk(start, end):
+                    """Helper function to ensure valid UTF-8 chunks."""
+                    while end > start:
+                        try:
+                            # Decode to check if this chunk is valid UTF-8
+                            text_part = text_bytes[start:end].decode('utf-8')
+                            return text_part, end
+                        except UnicodeDecodeError:
+                            # Reduce the end point to avoid splitting in the middle of a character
+                            end -= 1
+                    # If no valid chunk is found (shouldn't happen with valid UTF-8 input), return an empty string
+                    return "", start
+
+                part_number = 0
+                start_index = 0
+                while start_index < total_length:
+                    part_number += 1
+                    # Get a valid UTF-8 chunk
+                    text_part, end_index = get_valid_utf8_chunk(start_index, min(start_index + max_text_size, total_length))
+
+                    # Prepare the part data with metadata
+                    part_data = base_msg_data.copy()
+                    part_data.update({
+                        "text": text_part,
+                        "part_number": part_number,
+                        "total_parts": total_parts,
+                    })
+
+                    # Send each part
+                    part_msg_data = json.dumps(part_data)
+                    ten_data = Data.create("data")
+                    ten_data.set_property_buf("data", part_msg_data.encode())
+                    ten_env.send_data(ten_data)
+
+                    # Move to the next chunk
+                    start_index = end_index
+
         except Exception as e:
             logger.warning(f"on_data new_data error: {e}")
             return
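Note on the splitting scheme above: slicing UTF-8 bytes at a fixed offset can land in the
middle of a multi-byte character, which is why get_valid_utf8_chunk() backs off to the
nearest character boundary. A self-contained sketch of the same idea (illustrative only,
not part of the patch; assumes max_bytes >= 4 so every character fits in a chunk):

    # Standalone sketch of the UTF-8-safe splitter used above.
    def split_utf8(text: str, max_bytes: int) -> list[str]:
        data = text.encode("utf-8")
        parts, start = [], 0
        while start < len(data):
            end = min(start + max_bytes, len(data))
            while end > start:
                try:
                    parts.append(data[start:end].decode("utf-8"))
                    break
                except UnicodeDecodeError:
                    end -= 1  # back off to the nearest character boundary
            start = end
        return parts

    text = "héllo 世界, " * 40
    parts = split_utf8(text, max_bytes=100)
    assert all(len(p.encode("utf-8")) <= 100 for p in parts)
    assert "".join(parts) == text  # reassembly is lossless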
diff --git a/agents/ten_packages/extension/openai_chatgpt_python/BUILD.gn b/agents/ten_packages/extension/openai_chatgpt_python/BUILD.gn
new file mode 100644
index 00000000..23f06108
--- /dev/null
+++ b/agents/ten_packages/extension/openai_chatgpt_python/BUILD.gn
@@ -0,0 +1,21 @@
+#
+#
+# Agora Real Time Engagement
+# Created by Wei Hu in 2022-11.
+# Copyright (c) 2024 Agora IO. All rights reserved.
+#
+#
+import("//build/feature/ten_package.gni")
+
+ten_package("openai_chatgpt_python") {
+  package_kind = "extension"
+
+  resources = [
+    "__init__.py",
+    "addon.py",
+    "extension.py",
+    "log.py",
+    "manifest.json",
+    "property.json",
+  ]
+}
diff --git a/agents/ten_packages/extension/openai_chatgpt_python/README.md b/agents/ten_packages/extension/openai_chatgpt_python/README.md
new file mode 100644
index 00000000..2a9b1c82
--- /dev/null
+++ b/agents/ten_packages/extension/openai_chatgpt_python/README.md
@@ -0,0 +1,60 @@
+# openai_chatgpt_python
+
+An extension for integrating OpenAI's GPT models (e.g., GPT-4) into your application, providing configurable AI-driven features such as conversational agents, task automation, and tool integration.
+
+## Features
+
+
+
+- OpenAI GPT Integration: Leverage GPT models for text processing and conversational tasks.
+- Configurable: Easily customize API keys, model settings, prompts, temperature, etc.
+- Async Queue Processing: Supports real-time message processing with task cancellation and prioritization, as sketched below.
+- Tool Support: Expose tools to the model, such as fetching the camera image for vision-backed answers.
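+A minimal, self-contained sketch of that queue/worker pattern (simplified: the
+real extension runs the flushable `AsyncQueue` from `helper.py` on a dedicated
+event loop; the names below are illustrative):
+
+```python
+import asyncio
+
+async def run_chatflow(text: str) -> None:
+    await asyncio.sleep(0.1)  # stands in for the streaming LLM call
+    print("done:", text)
+
+async def worker(queue: asyncio.Queue) -> None:
+    while True:
+        text = await queue.get()  # wait for the next user turn
+        task = asyncio.create_task(run_chatflow(text))
+        try:
+            await task  # turns run strictly one at a time
+        except asyncio.CancelledError:
+            print("cancelled:", text)  # a `flush` cmd cancels the active turn
+
+async def main() -> None:
+    queue: asyncio.Queue = asyncio.Queue()
+    worker_task = asyncio.create_task(worker(queue))
+    await queue.put("hello")
+    await queue.put("what can you see?")
+    await asyncio.sleep(0.5)  # let both turns finish
+    worker_task.cancel()
+
+asyncio.run(main())
+```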
+
+## API
+
+Refer to the `api` definition in [manifest.json](manifest.json) and default values in [property.json](property.json).
+
+
+
+| **Property**                 | **Type**   | **Description**                           |
+|------------------------------|------------|-------------------------------------------|
+| `api_key`                    | `string`   | API key for authenticating with OpenAI    |
+| `frequency_penalty`          | `float64`  | Controls how much to penalize new tokens based on their existing frequency in the text so far |
+| `presence_penalty`           | `float64`  | Controls how much to penalize new tokens based on whether they appear in the text so far |
+| `temperature`                | `float64`  | Sampling temperature, higher values mean more randomness |
+| `top_p`                      | `float64`  | Nucleus sampling, chooses tokens with cumulative probability `p` |
+| `model`                      | `string`   | Model identifier (e.g., GPT-3.5, GPT-4)   |
+| `max_tokens`                 | `int64`    | Maximum number of tokens to generate      |
+| `base_url`                   | `string`   | API base URL                              |
+| `prompt`                     | `string`   | Default prompt to send to the model       |
+| `greeting`                   | `string`   | Greeting message to be used               |
+| `checking_vision_text_items` | `string`   | Items for checking vision-based text responses |
+| `proxy_url`                  | `string`   | URL of the proxy server                   |
+| `max_memory_length`          | `int64`    | Maximum number of messages kept in conversation memory |
+| `enable_tools`               | `bool`     | Flag to enable or disable external tools  |
+
+### Data In:
+| **Name**       | **Property** | **Type**   | **Description**               |
+|----------------|--------------|------------|-------------------------------|
+| `text_data`    | `text`       | `string`   | Incoming text data            |
+
+### Data Out:
+| **Name**       | **Property** | **Type**   | **Description**               |
+|----------------|--------------|------------|-------------------------------|
+| `text_data`    | `text`       | `string`   | Outgoing text data            |
+
+### Command In:
+| **Name**       | **Description**                             |
+|----------------|---------------------------------------------|
+| `flush`        | Command to flush the current processing state |
+
+### Command Out:
+| **Name**       | **Description**                             |
+|----------------|---------------------------------------------|
+| `flush`        | Response after flushing the current state   |
+
+### Video Frame In:
+| **Name**         | **Description**                           |
+|------------------|-------------------------------------------|
+| `video_frame`    | Video frame input for vision processing   |
diff --git a/agents/ten_packages/extension/openai_chatgpt_python/__init__.py b/agents/ten_packages/extension/openai_chatgpt_python/__init__.py
index 42c4cd12..09a409ff 100644
--- a/agents/ten_packages/extension/openai_chatgpt_python/__init__.py
+++ b/agents/ten_packages/extension/openai_chatgpt_python/__init__.py
@@ -1,4 +1,11 @@
-from . import openai_chatgpt_addon
+#
+#
+# Agora Real Time Engagement
+# Created by Wei Hu in 2024-08.
+# Copyright (c) 2024 Agora IO. All rights reserved.
+#
+#
+from . import addon
 from .log import logger
 
 logger.info("openai_chatgpt_python extension loaded")
diff --git a/agents/ten_packages/extension/openai_chatgpt_python/addon.py b/agents/ten_packages/extension/openai_chatgpt_python/addon.py
new file mode 100644
index 00000000..ee13b156
--- /dev/null
+++ b/agents/ten_packages/extension/openai_chatgpt_python/addon.py
@@ -0,0 +1,22 @@
+#
+#
+# Agora Real Time Engagement
+# Created by Wei Hu in 2024-08.
+# Copyright (c) 2024 Agora IO. All rights reserved.
+#
+#
+from ten import (
+    Addon,
+    register_addon_as_extension,
+    TenEnv,
+)
+from .extension import OpenAIChatGPTExtension
+from .log import logger
+
+
+@register_addon_as_extension("openai_chatgpt_python")
+class OpenAIChatGPTExtensionAddon(Addon):
+
+    def on_create_instance(self, ten_env: TenEnv, name: str, context) -> None:
+        logger.info("OpenAIChatGPTExtensionAddon on_create_instance")
+        ten_env.on_create_instance_done(OpenAIChatGPTExtension(name), context)
diff --git a/agents/ten_packages/extension/openai_chatgpt_python/extension.py b/agents/ten_packages/extension/openai_chatgpt_python/extension.py
new file mode 100644
index 00000000..5d027267
--- /dev/null
+++ b/agents/ten_packages/extension/openai_chatgpt_python/extension.py
@@ -0,0 +1,318 @@
+#
+#
+# Agora Real Time Engagement
+# Created by Wei Hu in 2024-08.
+# Copyright (c) 2024 Agora IO. All rights reserved.
+#
+#
+import asyncio
+import json
+import threading
+import traceback
+
+from .helper import AsyncEventEmitter, AsyncQueue, get_current_time, get_property_bool, get_property_float, get_property_int, get_property_string, parse_sentences, rgb2base64jpeg
+from .openai import OpenAIChatGPT, OpenAIChatGPTConfig
+from ten import (
+    AudioFrame,
+    VideoFrame,
+    Extension,
+    TenEnv,
+    Cmd,
+    StatusCode,
+    CmdResult,
+    Data,
+)
+from .log import logger
+
+CMD_IN_FLUSH = "flush"
+CMD_OUT_FLUSH = "flush"
+DATA_IN_TEXT_DATA_PROPERTY_TEXT = "text"
+DATA_IN_TEXT_DATA_PROPERTY_IS_FINAL = "is_final"
+DATA_OUT_TEXT_DATA_PROPERTY_TEXT = "text"
+DATA_OUT_TEXT_DATA_PROPERTY_TEXT_END_OF_SEGMENT = "end_of_segment"
+
+PROPERTY_BASE_URL = "base_url"  # Optional
+PROPERTY_API_KEY = "api_key"  # Required
+PROPERTY_MODEL = "model"  # Optional
+PROPERTY_PROMPT = "prompt"  # Optional
+PROPERTY_FREQUENCY_PENALTY = "frequency_penalty"  # Optional
+PROPERTY_PRESENCE_PENALTY = "presence_penalty"  # Optional
+PROPERTY_TEMPERATURE = "temperature"  # Optional
+PROPERTY_TOP_P = "top_p"  # Optional
+PROPERTY_MAX_TOKENS = "max_tokens"  # Optional
+PROPERTY_GREETING = "greeting"  # Optional
+PROPERTY_ENABLE_TOOLS = "enable_tools"  # Optional
+PROPERTY_PROXY_URL = "proxy_url"  # Optional
+PROPERTY_MAX_MEMORY_LENGTH = "max_memory_length"  # Optional
+PROPERTY_CHECKING_VISION_TEXT_ITEMS = "checking_vision_text_items"  # Optional
+
+
+TASK_TYPE_CHAT_COMPLETION = "chat_completion"
+TASK_TYPE_CHAT_COMPLETION_WITH_VISION = "chat_completion_with_vision"
+
+
+class OpenAIChatGPTExtension(Extension):
+    memory = []
+    max_memory_length = 10
+    openai_chatgpt = None
+    enable_tools = False
+    image_data = None
+    image_width = 0
+    image_height = 0
+    checking_vision_text_items = []
+    loop = None
+    sentence_fragment = ""
+    current_task = None  # the task currently being executed, if any
+
+    queue = AsyncQueue()  # queue that serializes message processing
+
+    available_tools = [
+        {
+            "type": "function",
+            "function": {
+                # ensure you use a gpt-4o or later model if you need image recognition; gpt-4o-mini does not work well for this case
+                "name": "get_vision_image",
+                "description": "Get the image from camera. Call this whenever you need to understand the input camera image like you have vision capability, for example when user asks 'What can you see?' 
or 'Can you see me?'", + }, + "strict": True, + } + ] + + def on_init(self, ten_env: TenEnv) -> None: + logger.info("on_init") + ten_env.on_init_done() + + def on_start(self, ten_env: TenEnv) -> None: + logger.info("on_start") + + self.loop = asyncio.new_event_loop() + def start_loop(): + asyncio.set_event_loop(self.loop) + self.loop.run_forever() + threading.Thread(target=start_loop, args=[]).start() + + self.loop.create_task(self._process_queue(ten_env)) + + # Prepare configuration + openai_chatgpt_config = OpenAIChatGPTConfig.default_config() + + # Mandatory properties + openai_chatgpt_config.base_url = get_property_string(ten_env, PROPERTY_BASE_URL) or openai_chatgpt_config.base_url + openai_chatgpt_config.api_key = get_property_string(ten_env, PROPERTY_API_KEY) + if not openai_chatgpt_config.api_key: + logger.info(f"API key is missing, exiting on_start") + return + + # Optional properties + openai_chatgpt_config.model = get_property_string(ten_env, PROPERTY_MODEL) or openai_chatgpt_config.model + openai_chatgpt_config.prompt = get_property_string(ten_env, PROPERTY_PROMPT) or openai_chatgpt_config.prompt + openai_chatgpt_config.frequency_penalty = get_property_float(ten_env, PROPERTY_FREQUENCY_PENALTY) or openai_chatgpt_config.frequency_penalty + openai_chatgpt_config.presence_penalty = get_property_float(ten_env, PROPERTY_PRESENCE_PENALTY) or openai_chatgpt_config.presence_penalty + openai_chatgpt_config.temperature = get_property_float(ten_env, PROPERTY_TEMPERATURE) or openai_chatgpt_config.temperature + openai_chatgpt_config.top_p = get_property_float(ten_env, PROPERTY_TOP_P) or openai_chatgpt_config.top_p + openai_chatgpt_config.max_tokens = get_property_int(ten_env, PROPERTY_MAX_TOKENS) or openai_chatgpt_config.max_tokens + openai_chatgpt_config.proxy_url = get_property_string(ten_env, PROPERTY_PROXY_URL) or openai_chatgpt_config.proxy_url + + # Properties that don't affect openai_chatgpt_config + greeting = get_property_string(ten_env, PROPERTY_GREETING) + self.enable_tools = get_property_bool(ten_env, PROPERTY_ENABLE_TOOLS) + self.max_memory_length = get_property_int(ten_env, PROPERTY_MAX_MEMORY_LENGTH) + checking_vision_text_items_str = get_property_string(ten_env, PROPERTY_CHECKING_VISION_TEXT_ITEMS) + if checking_vision_text_items_str: + try: + self.checking_vision_text_items = json.loads(checking_vision_text_items_str) + except Exception as err: + logger.info(f"Error parsing {PROPERTY_CHECKING_VISION_TEXT_ITEMS}: {err}") + + # Create instance + try: + self.openai_chatgpt = OpenAIChatGPT(openai_chatgpt_config) + logger.info(f"initialized with max_tokens: {openai_chatgpt_config.max_tokens}, model: {openai_chatgpt_config.model}") + except Exception as err: + logger.info(f"Failed to initialize OpenAIChatGPT: {err}") + + # Send greeting if available + if greeting: + try: + output_data = Data.create("text_data") + output_data.set_property_string(DATA_OUT_TEXT_DATA_PROPERTY_TEXT, greeting) + output_data.set_property_bool(DATA_OUT_TEXT_DATA_PROPERTY_TEXT_END_OF_SEGMENT, True) + ten_env.send_data(output_data) + logger.info(f"Greeting [{greeting}] sent") + except Exception as err: + logger.info(f"Failed to send greeting [{greeting}]: {err}") + ten_env.on_start_done() + + def on_stop(self, ten_env: TenEnv) -> None: + logger.info("on_stop") + + # TODO: clean up resources + + ten_env.on_stop_done() + + def on_deinit(self, ten_env: TenEnv) -> None: + logger.info("on_deinit") + ten_env.on_deinit_done() + + def on_cmd(self, ten_env: TenEnv, cmd: Cmd) -> None: + logger.info(f"on_cmd json: 
{cmd.to_json()}") + + cmd_name = cmd.get_name() + + if cmd_name == CMD_IN_FLUSH: + asyncio.run_coroutine_threadsafe(self._flush_queue(), self.loop) + ten_env.send_cmd(Cmd.create(CMD_OUT_FLUSH), None) + logger.info("on_cmd sent flush") + status_code, detail = StatusCode.OK, "success" + else: + logger.info(f"on_cmd unknown cmd: {cmd_name}") + status_code, detail = StatusCode.ERROR, "unknown cmd" + + cmd_result = CmdResult.create(status_code) + cmd_result.set_property_string("detail", detail) + ten_env.return_result(cmd_result, cmd) + + def on_data(self, ten_env: TenEnv, data: Data) -> None: + # Get the necessary properties + is_final = get_property_bool(data, DATA_IN_TEXT_DATA_PROPERTY_IS_FINAL) + input_text = get_property_string(data, DATA_IN_TEXT_DATA_PROPERTY_TEXT) + + if not is_final: + logger.info("ignore non-final input") + return + if not input_text: + logger.info("ignore empty text") + return + + logger.info(f"OnData input text: [{input_text}]") + + # Start an asynchronous task for handling chat completion + asyncio.run_coroutine_threadsafe(self.queue.put([TASK_TYPE_CHAT_COMPLETION, input_text]), self.loop) + + def on_audio_frame(self, ten_env: TenEnv, audio_frame: AudioFrame) -> None: + # TODO: process pcm frame + pass + + def on_video_frame(self, ten_env: TenEnv, video_frame: VideoFrame) -> None: + # logger.info(f"OpenAIChatGPTExtension on_video_frame {frame.get_width()} {frame.get_height()}") + self.image_data = video_frame.get_buf() + self.image_width = video_frame.get_width() + self.image_height = video_frame.get_height() + return + + async def _process_queue(self, ten_env: TenEnv): + """Asynchronously process queue items one by one.""" + while True: + # Wait for an item to be available in the queue + [task_type, message] = await self.queue.get() + try: + # Create a new task for the new message + self.current_task = asyncio.create_task(self._run_chatflow(ten_env, task_type, message, self.memory)) + await self.current_task # Wait for the current task to finish or be cancelled + except asyncio.CancelledError: + logger.info(f"Task cancelled: {message}") + + async def _flush_queue(self): + """Flushes the self.queue and cancels the current task.""" + # Flush the queue using the new flush method + await self.queue.flush() + + # Cancel the current task if one is running + if self.current_task: + logger.info("Cancelling the current task during flush.") + self.current_task.cancel() + + async def _run_chatflow(self, ten_env: TenEnv, task_type:str, input_text: str, memory): + """Run the chatflow asynchronously.""" + memory_cache = [] + try: + logger.info(f"for input text: [{input_text}] memory: {memory}") + message = None + tools = None + + # Prepare the message and tools based on the task type + if task_type == TASK_TYPE_CHAT_COMPLETION: + message = {"role": "user", "content": input_text} + memory_cache = memory_cache + [message, {"role": "assistant", "content": ""}] + tools = self.available_tools if self.enable_tools else None + elif task_type == TASK_TYPE_CHAT_COMPLETION_WITH_VISION: + message = {"role": "user", "content": input_text} + memory_cache = memory_cache + [message, {"role": "assistant", "content": ""}] + tools = self.available_tools if self.enable_tools else None + if self.image_data is not None: + url = rgb2base64jpeg(self.image_data, self.image_width, self.image_height) + message = { + "role": "user", + "content": [ + {"type": "text", "text": input_text}, + {"type": "image_url", "image_url": {"url": url}}, + ], + } + logger.info(f"msg with vision data: {message}") + + + 
self.sentence_fragment = ""
+
+            # Create an asyncio.Event to signal when content is finished
+            content_finished_event = asyncio.Event()
+
+            # Create an async listener to handle tool calls and content updates
+            async def handle_tool_call(tool_call):
+                logger.info(f"tool_call: {tool_call}")
+                if tool_call.function.name == "get_vision_image":
+                    # Re-run the turn as a vision request using the latest camera frame
+                    await self.queue.put([TASK_TYPE_CHAT_COMPLETION_WITH_VISION, input_text], True)
+
+            async def handle_content_update(content:str):
+                # Append the content to the last assistant message
+                for item in reversed(memory_cache):
+                    if item.get('role') == 'assistant':
+                        item['content'] = item['content'] + content
+                        break
+                sentences, self.sentence_fragment = parse_sentences(self.sentence_fragment, content)
+                for s in sentences:
+                    self._send_data(ten_env, s, False)
+
+            async def handle_content_finished(full_content:str):
+                content_finished_event.set()
+
+            listener = AsyncEventEmitter()
+            listener.on("tool_call", handle_tool_call)
+            listener.on("content_update", handle_content_update)
+            listener.on("content_finished", handle_content_finished)
+
+            # Make an async API call to get chat completions
+            await self.openai_chatgpt.get_chat_completions_stream(memory + [message], tools, listener)
+
+            # Wait for the content to be finished
+            await content_finished_event.wait()
+        except asyncio.CancelledError:
+            logger.info(f"Task cancelled: {input_text}")
+        except Exception:
+            logger.error(f"Error in chat_completion: {traceback.format_exc()} for input text: {input_text}")
+        finally:
+            self._send_data(ten_env, "", True)
+            # always append the memory
+            for m in memory_cache:
+                self._append_memory(m)
+
+    def _append_memory(self, message: dict):
+        if len(self.memory) >= self.max_memory_length:
+            self.memory.pop(0)
+        self.memory.append(message)
+
+    def _send_data(self, ten_env: TenEnv, sentence: str, end_of_segment: bool):
+        try:
+            output_data = Data.create("text_data")
+            output_data.set_property_string(DATA_OUT_TEXT_DATA_PROPERTY_TEXT, sentence)
+            output_data.set_property_bool(
+                DATA_OUT_TEXT_DATA_PROPERTY_TEXT_END_OF_SEGMENT, end_of_segment
+            )
+            ten_env.send_data(output_data)
+            logger.info(
+                f"{'end of segment ' if end_of_segment else ''}sent sentence [{sentence}]"
+            )
+        except Exception as err:
+            logger.info(
+                f"send sentence [{sentence}] failed, err: {err}"
+            )
\ No newline at end of file
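The sentence streaming above leans on parse_sentences() from helper.py below: partial
deltas accumulate in sentence_fragment, and a sentence is flushed to TTS as soon as its
terminating punctuation arrives. A condensed, runnable illustration of that behaviour
(simplified copy of the helper; illustrative only):

    # Condensed from helper.py's parse_sentences().
    def parse_sentences(fragment, content):
        sentences, current = [], fragment
        for char in content:
            current += char
            is_punct = char in [",", ",", ".", "。", "?", "?", "!", "!"]
            if is_punct and any(c.isalnum() for c in current):
                sentences.append(current)
                current = ""
        return sentences, current

    fragment = ""
    for delta in ["Hi the", "re! How a", "re you?"]:
        done, fragment = parse_sentences(fragment, delta)
        print(done, "| pending:", repr(fragment))
    # -> [] | pending: 'Hi the'
    # -> ['Hi there!'] | pending: ' How a'
    # -> [' How are you?'] | pending: ''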
diff --git a/agents/ten_packages/extension/openai_chatgpt_python/helper.py b/agents/ten_packages/extension/openai_chatgpt_python/helper.py
new file mode 100644
index 00000000..28c28f19
--- /dev/null
+++ b/agents/ten_packages/extension/openai_chatgpt_python/helper.py
@@ -0,0 +1,187 @@
+#
+#
+# Agora Real Time Engagement
+# Created by Wei Hu in 2024-08.
+# Copyright (c) 2024 Agora IO. All rights reserved.
+#
+#
+import asyncio
+from collections import deque
+from ten.data import Data
+from .log import logger
+from PIL import Image
+from datetime import datetime
+from io import BytesIO
+from base64 import b64encode
+
+
+def get_property_bool(data: Data, property_name: str) -> bool:
+    """Helper to get boolean property from data with error handling."""
+    try:
+        return data.get_property_bool(property_name)
+    except Exception as err:
+        logger.warning(f"GetProperty {property_name} failed: {err}")
+        return False
+
+def get_property_string(data: Data, property_name: str) -> str:
+    """Helper to get string property from data with error handling."""
+    try:
+        return data.get_property_string(property_name)
+    except Exception as err:
+        logger.warning(f"GetProperty {property_name} failed: {err}")
+        return ""
+
+def get_property_int(data: Data, property_name: str) -> int:
+    """Helper to get int property from data with error handling."""
+    try:
+        return data.get_property_int(property_name)
+    except Exception as err:
+        logger.warning(f"GetProperty {property_name} failed: {err}")
+        return 0
+
+def get_property_float(data: Data, property_name: str) -> float:
+    """Helper to get float property from data with error handling."""
+    try:
+        return data.get_property_float(property_name)
+    except Exception as err:
+        logger.warning(f"GetProperty {property_name} failed: {err}")
+        return 0.0
+
+
+def get_current_time():
+    # Get the current time
+    start_time = datetime.now()
+    # Get the number of microseconds since the Unix epoch
+    unix_microseconds = int(start_time.timestamp() * 1_000_000)
+    return unix_microseconds
+
+
+def is_punctuation(char):
+    if char in [",", ",", ".", "。", "?", "?", "!", "!"]:
+        return True
+    return False
+
+
+def parse_sentences(sentence_fragment, content):
+    sentences = []
+    current_sentence = sentence_fragment
+    for char in content:
+        current_sentence += char
+        if is_punctuation(char):
+            # Check if the current sentence contains non-punctuation characters
+            candidate_sentence = current_sentence
+            if any(c.isalnum() for c in candidate_sentence):
+                sentences.append(candidate_sentence)
+                current_sentence = ""  # Reset for the next sentence
+
+    remain = current_sentence  # Any remaining characters form the incomplete sentence
+    return sentences, remain
+
+
+
+def rgb2base64jpeg(rgb_data, width, height):
+    # Wrap the raw RGBA frame in a PIL Image, then drop the alpha channel
+    pil_image = Image.frombytes("RGBA", (width, height), bytes(rgb_data))
+    pil_image = pil_image.convert("RGB")
+
+    # Resize the image while maintaining its aspect ratio
+    pil_image = resize_image_keep_aspect(pil_image, 320)
+
+    # Save the image to a BytesIO object in JPEG format
+    buffered = BytesIO()
+    pil_image.save(buffered, format="JPEG")
+    # pil_image.save("test.jpg", format="JPEG")
+
+    # Get the byte data of the JPEG image
+    jpeg_image_data = buffered.getvalue()
+
+    # Convert the JPEG byte data to a Base64 encoded string
+    base64_encoded_image = b64encode(jpeg_image_data).decode("utf-8")
+
+    # Create the data URL
+    mime_type = "image/jpeg"
+    base64_url = f"data:{mime_type};base64,{base64_encoded_image}"
+    return base64_url
+
+
+def resize_image_keep_aspect(image, max_size=512):
+    """
+    Resize an image while maintaining its aspect ratio, ensuring the larger dimension is max_size.
+    If both dimensions are smaller than max_size, the image is not resized.
+ + :param image: A PIL Image object + :param max_size: The maximum size for the larger dimension (width or height) + :return: A PIL Image object (resized or original) + """ + # Get current width and height + width, height = image.size + + # If both dimensions are already smaller than max_size, return the original image + if width <= max_size and height <= max_size: + return image + + # Calculate the aspect ratio + aspect_ratio = width / height + + # Determine the new dimensions + if width > height: + new_width = max_size + new_height = int(max_size / aspect_ratio) + else: + new_height = max_size + new_width = int(max_size * aspect_ratio) + + # Resize the image with the new dimensions + resized_image = image.resize((new_width, new_height)) + + return resized_image + + +class AsyncEventEmitter: + def __init__(self): + self.listeners = {} + + def on(self, event_name, listener): + """Register an event listener.""" + if event_name not in self.listeners: + self.listeners[event_name] = [] + self.listeners[event_name].append(listener) + + def emit(self, event_name, *args, **kwargs): + """Fire the event without waiting for listeners to finish.""" + if event_name in self.listeners: + for listener in self.listeners[event_name]: + asyncio.create_task(listener(*args, **kwargs)) + + +class AsyncQueue: + def __init__(self): + self._queue = deque() # Use deque for efficient prepend and append + self._condition = asyncio.Condition() # Use Condition to manage access + + async def put(self, item, prepend=False): + """Add an item to the queue (prepend if specified).""" + async with self._condition: + if prepend: + self._queue.appendleft(item) # Prepend item to the front + else: + self._queue.append(item) # Append item to the back + self._condition.notify() + + async def get(self): + """Remove and return an item from the queue.""" + async with self._condition: + while not self._queue: + await self._condition.wait() # Wait until an item is available + return self._queue.popleft() # Pop from the front of the deque + + async def flush(self): + """Flush all items from the queue.""" + async with self._condition: + while self._queue: + self._queue.popleft() # Clear the queue + self._condition.notify_all() # Notify all consumers that the queue is empty + + def __len__(self): + """Return the current size of the queue.""" + return len(self._queue) \ No newline at end of file diff --git a/agents/ten_packages/extension/openai_chatgpt_python/log.py b/agents/ten_packages/extension/openai_chatgpt_python/log.py index fa2202da..1813e965 100644 --- a/agents/ten_packages/extension/openai_chatgpt_python/log.py +++ b/agents/ten_packages/extension/openai_chatgpt_python/log.py @@ -1,11 +1,20 @@ +# +# +# Agora Real Time Engagement +# Created by Wei Hu in 2024-08. +# Copyright (c) 2024 Agora IO. All rights reserved. 
+#
+#
 import logging
 
 logger = logging.getLogger("openai_chatgpt_python")
 logger.setLevel(logging.INFO)
 
-formatter = logging.Formatter(
-    "%(asctime)s - %(name)s - %(levelname)s - %(process)d - [%(filename)s:%(lineno)d] - %(message)s"
+formatter_str = (
+    "%(asctime)s - %(name)s - %(levelname)s - %(process)d - "
+    "[%(filename)s:%(lineno)d] - %(message)s"
 )
+formatter = logging.Formatter(formatter_str)
 
 console_handler = logging.StreamHandler()
 console_handler.setFormatter(formatter)
diff --git a/agents/ten_packages/extension/openai_chatgpt_python/manifest.json b/agents/ten_packages/extension/openai_chatgpt_python/manifest.json
index ce872dfe..4a74e306 100644
--- a/agents/ten_packages/extension/openai_chatgpt_python/manifest.json
+++ b/agents/ten_packages/extension/openai_chatgpt_python/manifest.json
@@ -1,7 +1,7 @@
 {
   "type": "extension",
   "name": "openai_chatgpt_python",
-  "version": "0.4.0",
+  "version": "0.1.0",
   "dependencies": [
     {
       "type": "system",
@@ -9,6 +9,16 @@
       "version": "0.2"
     }
   ],
+  "package": {
+    "include": [
+      "manifest.json",
+      "property.json",
+      "BUILD.gn",
+      "**.tent",
+      "**.py",
+      "README.md"
+    ]
+  },
   "api": {
     "property": {
       "api_key": {
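openai.py below reports streaming progress through three fire-and-forget events --
content_update, tool_call, and content_finished -- instead of returning the full
completion. A minimal, self-contained sketch of how a consumer wires them up, mirroring
what extension.py's _run_chatflow does (the emitter is condensed from helper.py and the
stand-in loop replaces the real API call):

    import asyncio

    class AsyncEventEmitter:
        """Condensed copy of helper.py's emitter: fire-and-forget async listeners."""
        def __init__(self):
            self.listeners = {}

        def on(self, name, listener):
            self.listeners.setdefault(name, []).append(listener)

        def emit(self, name, *args):
            for listener in self.listeners.get(name, []):
                asyncio.create_task(listener(*args))

    async def main():
        finished = asyncio.Event()
        emitter = AsyncEventEmitter()

        async def on_update(delta: str):
            print("delta:", delta)  # extension.py turns these into sentences

        async def on_finished(full: str):
            print("full answer:", full)
            finished.set()  # lets the chatflow task return

        emitter.on("content_update", on_update)
        emitter.on("content_finished", on_finished)

        # Stand-in for get_chat_completions_stream() driving the emitter:
        for delta in ["Hello", ", world", "!"]:
            emitter.emit("content_update", delta)
        emitter.emit("content_finished", "Hello, world!")
        await finished.wait()

    asyncio.run(main())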
diff --git a/agents/ten_packages/extension/openai_chatgpt_python/openai.py b/agents/ten_packages/extension/openai_chatgpt_python/openai.py
new file mode 100644
index 00000000..3449126d
--- /dev/null
+++ b/agents/ten_packages/extension/openai_chatgpt_python/openai.py
@@ -0,0 +1,125 @@
+#
+#
+# Agora Real Time Engagement
+# Created by Wei Hu in 2024-08.
+# Copyright (c) 2024 Agora IO. All rights reserved.
+#
+#
+import random
+import httpx
+from openai import AsyncOpenAI
+from typing import List, Dict, Any, Optional
+from .log import logger
+
+
+class OpenAIChatGPTConfig:
+    def __init__(self,
+                 base_url: str,
+                 api_key: str,
+                 model: str,
+                 prompt: str,
+                 frequency_penalty: float,
+                 presence_penalty: float,
+                 top_p: float,
+                 temperature: float,
+                 max_tokens: int,
+                 seed: Optional[int] = None,
+                 proxy_url: Optional[str] = None):
+        self.base_url = base_url
+        self.api_key = api_key
+        self.model = model
+        self.prompt = prompt
+        self.frequency_penalty = frequency_penalty
+        self.presence_penalty = presence_penalty
+        self.top_p = top_p
+        self.temperature = temperature
+        self.max_tokens = max_tokens
+        self.seed = seed if seed is not None else random.randint(0, 10000)
+        self.proxy_url = proxy_url
+
+    @classmethod
+    def default_config(cls):
+        return cls(
+            base_url="https://api.openai.com/v1",
+            api_key="",
+            model="gpt-4",  # use a vision-capable model (e.g. gpt-4o) when image tools are enabled
+            prompt="You are a voice assistant who talks in a conversational way and can chat with me like my friends. I will speak to you in English or Chinese, and you will answer in the corrected and improved version of my text with the language I use. Don’t talk like a robot, instead I would like you to talk like a real human with emotions. I will use your answer for text-to-speech, so don’t return me any meaningless characters. I want you to be helpful, when I’m asking you for advice, give me precise, practical and useful advice instead of being vague. When giving me a list of options, express the options in a narrative way instead of bullet points.",
+            frequency_penalty=0.9,
+            presence_penalty=0.9,
+            top_p=1.0,
+            temperature=0.1,
+            max_tokens=512,
+            seed=random.randint(0, 10000),
+            proxy_url=""
+        )
+
+
+class OpenAIChatGPT:
+    client = None
+
+    def __init__(self, config: OpenAIChatGPTConfig):
+        self.config = config
+        logger.info("OpenAIChatGPT initialized")  # don't log the api_key
+        # The v1 openai SDK is httpx-based, so a requests.Session attached to
+        # the client is silently ignored; build the proxy into an httpx
+        # AsyncClient instead and hand it to the SDK.
+        http_client = None
+        if config.proxy_url:
+            http_client = httpx.AsyncClient(proxies=config.proxy_url)
+        self.client = AsyncOpenAI(
+            api_key=config.api_key,
+            base_url=config.base_url,
+            http_client=http_client,
+        )
+
+    async def get_chat_completions_stream(self, messages, tools = None, listener = None):
+        req = {
+            "model": self.config.model,
+            "messages": [
+                {
+                    "role": "system",
+                    "content": self.config.prompt,
+                },
+                *messages,
+            ],
+            "tools": tools,
+            "temperature": self.config.temperature,
+            "top_p": self.config.top_p,
+            "presence_penalty": self.config.presence_penalty,
+            "frequency_penalty": self.config.frequency_penalty,
+            "max_tokens": self.config.max_tokens,
+            "seed": self.config.seed,
+            "stream": True,
+        }
+
+        try:
+            response = await self.client.chat.completions.create(**req)
+        except Exception as e:
+            raise RuntimeError(f"CreateChatCompletionStream failed, err: {e}") from e
+
+        full_content = ""
+
+        async for chat_completion in response:
+            choice = chat_completion.choices[0]
+            delta = choice.delta
+
+            content = delta.content if delta and delta.content else ""
+
+            # Emit content update event (fire-and-forget)
+            if listener and content:
+                listener.emit('content_update', content)
+
+            full_content += content
+
+            # Check for tool calls
+            if delta and delta.tool_calls:
+                for tool_call in delta.tool_calls:
+                    logger.info(f"tool_call: {tool_call}")
+
+                    # Emit tool call event (fire-and-forget)
+                    if listener:
+                        listener.emit('tool_call', tool_call)
+
+        # Emit content finished event after the loop completes
+        if listener:
+            listener.emit('content_finished', full_content)
diff --git a/agents/ten_packages/extension/openai_chatgpt_python/requirements.txt b/agents/ten_packages/extension/openai_chatgpt_python/requirements.txt
index ca4978c3..51cdd053 100644
--- a/agents/ten_packages/extension/openai_chatgpt_python/requirements.txt
+++ b/agents/ten_packages/extension/openai_chatgpt_python/requirements.txt
@@ -1,4 +1,4 @@
 openai
 numpy
-requests==2.32.3
-pillow==10.4.0
\ No newline at end of file
+httpx
+pillow
\ No newline at end of file
diff --git a/playground/src/manager/rtc/rtc.ts b/playground/src/manager/rtc/rtc.ts
index 0439be78..4139d89e 100644
--- a/playground/src/manager/rtc/rtc.ts
+++ b/playground/src/manager/rtc/rtc.ts
@@ -103,57 +103,82 @@ export class RtcManager extends AGEventEmitter {
       })
     })
     this.client.on("stream-message", (uid: UID, stream: any) => {
-      this._praseData(stream)
+      this._parseData(stream)
     })
   }
 
-  private _praseData(data: any): ITextItem | void {
-    // @ts-ignore
-    // const textstream = protoRoot.Agora.SpeechToText.lookup("Text").decode(data)
-    // if (!textstream) {
-    //   return console.warn("Prase data failed.")
-    // }
-    let decoder = new TextDecoder('utf-8')
-    let decodedMessage = decoder.decode(data)
-
-    const textstream = JSON.parse(decodedMessage)
-
-    console.log("[test] textstream raw data", JSON.stringify(textstream))
-    const { stream_id, is_final, text, text_ts, data_type } = textstream
-    let textStr: string = ""
-    let isFinal = false
-    const textItem: ITextItem = {} 
as ITextItem - textItem.uid = stream_id - textItem.time = text_ts - // switch (dataType) { - // case "transcribe": - // words.forEach((word: any) => { - // textStr += word.text - // if (word.isFinal) { - // isFinal = true - // } - // }) - textItem.dataType = "transcribe" - // textItem.language = culture - textItem.text = text - textItem.isFinal = is_final - this.emit("textChanged", textItem) - // break - // case "translate": - // if (!trans?.length) { - // return - // } - // trans.forEach((transItem: any) => { - // textStr = transItem.texts.join("") - // isFinal = !!transItem.isFinal - // textItem.dataType = "translate" - // textItem.language = transItem.lang - // textItem.isFinal = isFinal - // textItem.text = textStr - // this.emit("textChanged", textItem) - // }) - // break - // } + private _parseData(data: any): ITextItem | void { + let decoder = new TextDecoder('utf-8'); + let decodedMessage = decoder.decode(data); + const textstream = JSON.parse(decodedMessage); + + console.log("[test] textstream raw data", JSON.stringify(textstream)); + + const { stream_id, is_final, text, text_ts, data_type, message_id, part_number, total_parts } = textstream; + + if (total_parts > 0) { + // If message is split, handle it accordingly + this._handleSplitMessage(message_id, part_number, total_parts, stream_id, is_final, text, text_ts); + } else { + // If there is no message_id, treat it as a complete message + this._handleCompleteMessage(stream_id, is_final, text, text_ts); + } + } + + private messageCache: { [key: string]: { parts: string[], totalParts: number } } = {}; + + /** + * Handle complete messages (not split). + */ + private _handleCompleteMessage(stream_id: number, is_final: boolean, text: string, text_ts: number): void { + const textItem: ITextItem = { + uid: `${stream_id}`, + time: text_ts, + dataType: "transcribe", + text: text, + isFinal: is_final + }; + + if (text.trim().length > 0) { + this.emit("textChanged", textItem); + } + } + + /** + * Handle split messages, track parts, and reassemble once all parts are received. + */ + private _handleSplitMessage( + message_id: string, + part_number: number, + total_parts: number, + stream_id: number, + is_final: boolean, + text: string, + text_ts: number + ): void { + // Ensure the messageCache entry exists for this message_id + if (!this.messageCache[message_id]) { + this.messageCache[message_id] = { parts: [], totalParts: total_parts }; + } + + const cache = this.messageCache[message_id]; + + // Store the received part at the correct index (part_number starts from 1, so we use part_number - 1) + cache.parts[part_number - 1] = text; + + // Check if all parts have been received + const receivedPartsCount = cache.parts.filter(part => part !== undefined).length; + + if (receivedPartsCount === total_parts) { + // All parts have been received, reassemble the message + const fullText = cache.parts.join(''); + + // Now that the message is reassembled, handle it like a complete message + this._handleCompleteMessage(stream_id, is_final, fullText, text_ts); + + // Remove the cached message since it is now fully processed + delete this.messageCache[message_id]; + } }
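End-to-end, the split-message protocol works like this: message_collector tags oversized
transcripts with message_id/part_number/total_parts, and the playground caches parts
until all of them have arrived. For reference, a hypothetical Python mirror of the
TypeScript reassembly above, handy when testing the collector from scripts (field names
follow the JSON payloads in this patch; not part of the patch itself):

    import json
    import uuid

    cache = {}  # message_id -> {"parts": {part_number: text}, "total": int}

    def on_stream_message(raw: bytes):
        msg = json.loads(raw.decode("utf-8"))
        if not msg.get("total_parts"):
            return msg["text"]  # unsplit message: usable immediately
        entry = cache.setdefault(msg["message_id"],
                                 {"parts": {}, "total": msg["total_parts"]})
        entry["parts"][msg["part_number"]] = msg["text"]
        if len(entry["parts"]) < entry["total"]:
            return None  # still waiting for the remaining parts
        del cache[msg["message_id"]]
        return "".join(text for _, text in sorted(entry["parts"].items()))

    # Round-trip demo with two parts arriving out of order
    mid = str(uuid.uuid4())
    p2 = {"message_id": mid, "part_number": 2, "total_parts": 2, "text": "world!"}
    p1 = {"message_id": mid, "part_number": 1, "total_parts": 2, "text": "Hello, "}
    assert on_stream_message(json.dumps(p2).encode()) is None
    assert on_stream_message(json.dumps(p1).encode()) == "Hello, world!"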