From 0f905aa4b720f33966e7efde87589cdad0030243 Mon Sep 17 00:00:00 2001 From: zhangqianze Date: Wed, 11 Sep 2024 01:45:57 +0800 Subject: [PATCH 01/13] feat: openai extension refactoring --- agents/manifest-lock.json | 3 +- .../bak/openai_chatgpt_python/__init__.py | 4 + .../bak/openai_chatgpt_python/log.py | 13 + .../bak/openai_chatgpt_python/manifest.json | 93 ++++++ .../openai_chatgpt_python/openai_chatgpt.py | 0 .../openai_chatgpt_addon.py | 0 .../openai_chatgpt_extension.py | 0 .../bak/openai_chatgpt_python/property.json | 1 + .../openai_chatgpt_python/requirements.txt | 4 + .../extension/openai_chatgpt_python/BUILD.gn | 21 ++ .../extension/openai_chatgpt_python/README.md | 29 ++ .../openai_chatgpt_python/__init__.py | 9 +- .../extension/openai_chatgpt_python/addon.py | 22 ++ .../openai_chatgpt_python/extension.py | 282 ++++++++++++++++++ .../extension/openai_chatgpt_python/helper.py | 42 +++ .../extension/openai_chatgpt_python/log.py | 13 +- .../openai_chatgpt_python/manifest.json | 92 +----- .../extension/openai_chatgpt_python/openai.py | 92 ++++++ .../openai_chatgpt_python/requirements.txt | 4 +- 19 files changed, 636 insertions(+), 88 deletions(-) create mode 100644 agents/ten_packages/bak/openai_chatgpt_python/__init__.py create mode 100644 agents/ten_packages/bak/openai_chatgpt_python/log.py create mode 100644 agents/ten_packages/bak/openai_chatgpt_python/manifest.json rename agents/ten_packages/{extension => bak}/openai_chatgpt_python/openai_chatgpt.py (100%) rename agents/ten_packages/{extension => bak}/openai_chatgpt_python/openai_chatgpt_addon.py (100%) rename agents/ten_packages/{extension => bak}/openai_chatgpt_python/openai_chatgpt_extension.py (100%) create mode 100644 agents/ten_packages/bak/openai_chatgpt_python/property.json create mode 100644 agents/ten_packages/bak/openai_chatgpt_python/requirements.txt create mode 100644 agents/ten_packages/extension/openai_chatgpt_python/BUILD.gn create mode 100644 agents/ten_packages/extension/openai_chatgpt_python/README.md create mode 100644 agents/ten_packages/extension/openai_chatgpt_python/addon.py create mode 100644 agents/ten_packages/extension/openai_chatgpt_python/extension.py create mode 100644 agents/ten_packages/extension/openai_chatgpt_python/helper.py create mode 100644 agents/ten_packages/extension/openai_chatgpt_python/openai.py diff --git a/agents/manifest-lock.json b/agents/manifest-lock.json index c4921982..902dca63 100644 --- a/agents/manifest-lock.json +++ b/agents/manifest-lock.json @@ -141,8 +141,7 @@ "type": "system", "name": "nlohmann_json", "version": "3.11.2", - "hash": "72b15822c7ea9deef5e7ad96216ac55e93f11b00466dd1943afd5ee276e99d19", - "supports": [] + "hash": "72b15822c7ea9deef5e7ad96216ac55e93f11b00466dd1943afd5ee276e99d19" }, { "type": "system", diff --git a/agents/ten_packages/bak/openai_chatgpt_python/__init__.py b/agents/ten_packages/bak/openai_chatgpt_python/__init__.py new file mode 100644 index 00000000..42c4cd12 --- /dev/null +++ b/agents/ten_packages/bak/openai_chatgpt_python/__init__.py @@ -0,0 +1,4 @@ +from . 
import openai_chatgpt_addon +from .log import logger + +logger.info("openai_chatgpt_python extension loaded") diff --git a/agents/ten_packages/bak/openai_chatgpt_python/log.py b/agents/ten_packages/bak/openai_chatgpt_python/log.py new file mode 100644 index 00000000..fa2202da --- /dev/null +++ b/agents/ten_packages/bak/openai_chatgpt_python/log.py @@ -0,0 +1,13 @@ +import logging + +logger = logging.getLogger("openai_chatgpt_python") +logger.setLevel(logging.INFO) + +formatter = logging.Formatter( + "%(asctime)s - %(name)s - %(levelname)s - %(process)d - [%(filename)s:%(lineno)d] - %(message)s" +) + +console_handler = logging.StreamHandler() +console_handler.setFormatter(formatter) + +logger.addHandler(console_handler) diff --git a/agents/ten_packages/bak/openai_chatgpt_python/manifest.json b/agents/ten_packages/bak/openai_chatgpt_python/manifest.json new file mode 100644 index 00000000..ce872dfe --- /dev/null +++ b/agents/ten_packages/bak/openai_chatgpt_python/manifest.json @@ -0,0 +1,93 @@ +{ + "type": "extension", + "name": "openai_chatgpt_python", + "version": "0.4.0", + "dependencies": [ + { + "type": "system", + "name": "ten_runtime_python", + "version": "0.2" + } + ], + "api": { + "property": { + "api_key": { + "type": "string" + }, + "frequency_penalty": { + "type": "float64" + }, + "presence_penalty": { + "type": "float64" + }, + "temperature": { + "type": "float64" + }, + "top_p": { + "type": "float64" + }, + "model": { + "type": "string" + }, + "max_tokens": { + "type": "int64" + }, + "base_url": { + "type": "string" + }, + "prompt": { + "type": "string" + }, + "greeting": { + "type": "string" + }, + "checking_vision_text_items": { + "type": "string" + }, + "proxy_url": { + "type": "string" + }, + "max_memory_length": { + "type": "int64" + }, + "enable_tools": { + "type": "bool" + } + }, + "data_in": [ + { + "name": "text_data", + "property": { + "text": { + "type": "string" + } + } + } + ], + "data_out": [ + { + "name": "text_data", + "property": { + "text": { + "type": "string" + } + } + } + ], + "cmd_in": [ + { + "name": "flush" + } + ], + "cmd_out": [ + { + "name": "flush" + } + ], + "video_frame_in": [ + { + "name": "video_frame" + } + ] + } +} \ No newline at end of file diff --git a/agents/ten_packages/extension/openai_chatgpt_python/openai_chatgpt.py b/agents/ten_packages/bak/openai_chatgpt_python/openai_chatgpt.py similarity index 100% rename from agents/ten_packages/extension/openai_chatgpt_python/openai_chatgpt.py rename to agents/ten_packages/bak/openai_chatgpt_python/openai_chatgpt.py diff --git a/agents/ten_packages/extension/openai_chatgpt_python/openai_chatgpt_addon.py b/agents/ten_packages/bak/openai_chatgpt_python/openai_chatgpt_addon.py similarity index 100% rename from agents/ten_packages/extension/openai_chatgpt_python/openai_chatgpt_addon.py rename to agents/ten_packages/bak/openai_chatgpt_python/openai_chatgpt_addon.py diff --git a/agents/ten_packages/extension/openai_chatgpt_python/openai_chatgpt_extension.py b/agents/ten_packages/bak/openai_chatgpt_python/openai_chatgpt_extension.py similarity index 100% rename from agents/ten_packages/extension/openai_chatgpt_python/openai_chatgpt_extension.py rename to agents/ten_packages/bak/openai_chatgpt_python/openai_chatgpt_extension.py diff --git a/agents/ten_packages/bak/openai_chatgpt_python/property.json b/agents/ten_packages/bak/openai_chatgpt_python/property.json new file mode 100644 index 00000000..9e26dfee --- /dev/null +++ b/agents/ten_packages/bak/openai_chatgpt_python/property.json @@ -0,0 +1 @@ +{} \ 
No newline at end of file
diff --git a/agents/ten_packages/bak/openai_chatgpt_python/requirements.txt b/agents/ten_packages/bak/openai_chatgpt_python/requirements.txt
new file mode 100644
index 00000000..ca4978c3
--- /dev/null
+++ b/agents/ten_packages/bak/openai_chatgpt_python/requirements.txt
@@ -0,0 +1,4 @@
+openai
+numpy
+requests==2.32.3
+pillow==10.4.0
\ No newline at end of file
diff --git a/agents/ten_packages/extension/openai_chatgpt_python/BUILD.gn b/agents/ten_packages/extension/openai_chatgpt_python/BUILD.gn
new file mode 100644
index 00000000..23f06108
--- /dev/null
+++ b/agents/ten_packages/extension/openai_chatgpt_python/BUILD.gn
@@ -0,0 +1,21 @@
+#
+#
+# Agora Real Time Engagement
+# Created by Wei Hu in 2022-11.
+# Copyright (c) 2024 Agora IO. All rights reserved.
+#
+#
+import("//build/feature/ten_package.gni")
+
+ten_package("openai_chatgpt_python") {
+  package_kind = "extension"
+
+  resources = [
+    "__init__.py",
+    "addon.py",
+    "extension.py",
+    "log.py",
+    "manifest.json",
+    "property.json",
+  ]
+}
diff --git a/agents/ten_packages/extension/openai_chatgpt_python/README.md b/agents/ten_packages/extension/openai_chatgpt_python/README.md
new file mode 100644
index 00000000..6bb67d8e
--- /dev/null
+++ b/agents/ten_packages/extension/openai_chatgpt_python/README.md
@@ -0,0 +1,29 @@
+# openai_chatgpt_python
+
+## Features
+
+- xxx feature
+
+## API
+
+Refer to `api` definition in [manifest.json](manifest.json) and default values in [property.json](property.json).
+
+## Development
+
+### Build
+
+### Unit test
+
+## Misc
diff --git a/agents/ten_packages/extension/openai_chatgpt_python/__init__.py b/agents/ten_packages/extension/openai_chatgpt_python/__init__.py
index 42c4cd12..09a409ff 100644
--- a/agents/ten_packages/extension/openai_chatgpt_python/__init__.py
+++ b/agents/ten_packages/extension/openai_chatgpt_python/__init__.py
@@ -1,4 +1,11 @@
-from . import openai_chatgpt_addon
+#
+#
+# Agora Real Time Engagement
+# Created by Wei Hu in 2024-08.
+# Copyright (c) 2024 Agora IO. All rights reserved.
+#
+#
+from . import addon
 from .log import logger
 
 logger.info("openai_chatgpt_python extension loaded")
diff --git a/agents/ten_packages/extension/openai_chatgpt_python/addon.py b/agents/ten_packages/extension/openai_chatgpt_python/addon.py
new file mode 100644
index 00000000..ee13b156
--- /dev/null
+++ b/agents/ten_packages/extension/openai_chatgpt_python/addon.py
@@ -0,0 +1,22 @@
+#
+#
+# Agora Real Time Engagement
+# Created by Wei Hu in 2024-08.
+# Copyright (c) 2024 Agora IO. All rights reserved.
+#
+#
+from ten import (
+    Addon,
+    register_addon_as_extension,
+    TenEnv,
+)
+from .extension import OpenAIChatGPTExtension
+from .log import logger
+
+
+@register_addon_as_extension("openai_chatgpt_python")
+class OpenAIChatGPTExtensionAddon(Addon):
+
+    def on_create_instance(self, ten_env: TenEnv, name: str, context) -> None:
+        logger.info("OpenAIChatGPTExtensionAddon on_create_instance")
+        ten_env.on_create_instance_done(OpenAIChatGPTExtension(name), context)
diff --git a/agents/ten_packages/extension/openai_chatgpt_python/extension.py b/agents/ten_packages/extension/openai_chatgpt_python/extension.py
new file mode 100644
index 00000000..f67f6afd
--- /dev/null
+++ b/agents/ten_packages/extension/openai_chatgpt_python/extension.py
@@ -0,0 +1,282 @@
+#
+#
+# Agora Real Time Engagement
+# Created by Wei Hu in 2024-08.
+# Copyright (c) 2024 Agora IO. All rights reserved.
+#
+#
+from base64 import b64encode
+from datetime import datetime
+from io import BytesIO
+import json
+
+from .helper import get_property_bool, get_property_float, get_property_int, get_property_string
+from .openai import OpenAIChatGPT, OpenAIChatGPTConfig
+from ten import (
+    AudioFrame,
+    VideoFrame,
+    Extension,
+    TenEnv,
+    Cmd,
+    StatusCode,
+    CmdResult,
+    Data,
+)
+from PIL import Image
+from .log import logger
+
+CMD_IN_FLUSH = "flush"
+CMD_OUT_FLUSH = "flush"
+DATA_IN_TEXT_DATA_PROPERTY_TEXT = "text"
+DATA_IN_TEXT_DATA_PROPERTY_IS_FINAL = "is_final"
+DATA_OUT_TEXT_DATA_PROPERTY_TEXT = "text"
+DATA_OUT_TEXT_DATA_PROPERTY_TEXT_END_OF_SEGMENT = "end_of_segment"
+
+PROPERTY_BASE_URL = "base_url"  # Optional
+PROPERTY_API_KEY = "api_key"  # Required
+PROPERTY_MODEL = "model"  # Optional
+PROPERTY_PROMPT = "prompt"  # Optional
+PROPERTY_FREQUENCY_PENALTY = "frequency_penalty"  # Optional
+PROPERTY_PRESENCE_PENALTY = "presence_penalty"  # Optional
+PROPERTY_TEMPERATURE = "temperature"  # Optional
+PROPERTY_TOP_P = "top_p"  # Optional
+PROPERTY_MAX_TOKENS = "max_tokens"  # Optional
+PROPERTY_GREETING = "greeting"  # Optional
+PROPERTY_ENABLE_TOOLS = "enable_tools"  # Optional
+PROPERTY_PROXY_URL = "proxy_url"  # Optional
+PROPERTY_MAX_MEMORY_LENGTH = "max_memory_length"  # Optional
+PROPERTY_CHECKING_VISION_TEXT_ITEMS = "checking_vision_text_items"  # Optional
+
+def get_current_time():
+    # Get the current time
+    start_time = datetime.now()
+    # Get the number of microseconds since the Unix epoch
+    unix_microseconds = int(start_time.timestamp() * 1_000_000)
+    return unix_microseconds
+
+
+def is_punctuation(char):
+    if char in [",", "，", ".", "。", "?", "？", "!", "！"]:
+        return True
+    return False
+
+
+def parse_sentence(sentence, content):
+    remain = ""
+    found_punc = False
+
+    for char in content:
+        if not found_punc:
+            sentence += char
+        else:
+            remain += char
+
+        if not found_punc and is_punctuation(char):
+            found_punc = True
+
+    return sentence, remain, found_punc
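(Sketch, not part of the patch: parse_sentence is the seam between token streaming and TTS. It accumulates characters until the first punctuation mark, then returns the finished sentence plus the unconsumed remainder. Below is a minimal sketch of the intended driving loop; the chunk strings are invented for illustration, and the loop shape matches the corrected form that PATCH 07 later adopts in _process_completions.)

sentence = ""
for chunk in ["Hello the", "re! How are ", "you? Fine."]:  # pretend streamed deltas
    content = chunk
    while True:
        sentence, content, found_punc = parse_sentence(sentence, content)
        if not found_punc:
            break  # no punctuation yet; keep accumulating across chunks
        print(sentence)  # emits "Hello there!", then " How are you?", then " Fine."
        sentence = ""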
+
+
+def rgb2base64jpeg(rgb_data, width, height):
+    # Convert the RGB image to a PIL Image
+    pil_image = Image.frombytes("RGBA", (width, height), bytes(rgb_data))
+    pil_image = pil_image.convert("RGB")
+
+    # Resize the image while maintaining its aspect ratio
+    pil_image = resize_image_keep_aspect(pil_image, 320)
+
+    # Save the image to a BytesIO object in JPEG format
+    buffered = BytesIO()
+    pil_image.save(buffered, format="JPEG")
+    # pil_image.save("test.jpg", format="JPEG")
+
+    # Get the byte data of the JPEG image
+    jpeg_image_data = buffered.getvalue()
+
+    # Convert the JPEG byte data to a Base64 encoded string
+    base64_encoded_image = b64encode(jpeg_image_data).decode("utf-8")
+
+    # Create the data URL
+    mime_type = "image/jpeg"
+    base64_url = f"data:{mime_type};base64,{base64_encoded_image}"
+    return base64_url
+
+
+def resize_image_keep_aspect(image, max_size=512):
+    """
+    Resize an image while maintaining its aspect ratio, ensuring the larger dimension is max_size.
+    If both dimensions are smaller than max_size, the image is not resized.
+
+    :param image: A PIL Image object
+    :param max_size: The maximum size for the larger dimension (width or height)
+    :return: A PIL Image object (resized or original)
+    """
+    # Get current width and height
+    width, height = image.size
+
+    # If both dimensions are already smaller than max_size, return the original image
+    if width <= max_size and height <= max_size:
+        return image
+
+    # Calculate the aspect ratio
+    aspect_ratio = width / height
+
+    # Determine the new dimensions
+    if width > height:
+        new_width = max_size
+        new_height = int(max_size / aspect_ratio)
+    else:
+        new_height = max_size
+        new_width = int(max_size * aspect_ratio)
+
+    # Resize the image with the new dimensions
+    resized_image = image.resize((new_width, new_height))
+
+    return resized_image
+
+
+class OpenAIChatGPTExtension(Extension):
+    memory = []
+    max_memory_length = 10
+    outdate_ts = 0
+    openai_chatgpt = None
+    enable_tools = False
+    image_data = None
+    image_width = 0
+    image_height = 0
+    checking_vision_text_items = []
+
+    available_tools = [
+        {
+            "type": "function",
+            "function": {
+                # Use gpt-4o or a later model if you need image recognition; gpt-4o-mini does not handle this case well.
+                "name": "get_vision_image",
+                "description": "Get the image from camera. Call this whenever you need to understand the input camera image like you have vision capability, for example when user asks 'What can you see?' or 'Can you see me?'",
+            },
+            "strict": True,
+        }
+    ]
+
+    def on_init(self, ten_env: TenEnv) -> None:
+        logger.info("on_init")
+        ten_env.on_init_done()
+
+    def on_start(self, ten_env: TenEnv) -> None:
+        logger.info("on_start")
+
+        # Prepare configuration
+        openai_chatgpt_config = OpenAIChatGPTConfig.default_config()
+
+        # Mandatory properties
+        openai_chatgpt_config.base_url = get_property_string(ten_env, PROPERTY_BASE_URL)
+        openai_chatgpt_config.api_key = get_property_string(ten_env, PROPERTY_API_KEY)
+        if not openai_chatgpt_config.api_key:
+            logger.info("API key is missing, exiting on_start")
+            return
+
+        # Optional properties
+        openai_chatgpt_config.model = get_property_string(ten_env, PROPERTY_MODEL)
+        openai_chatgpt_config.prompt = get_property_string(ten_env, PROPERTY_PROMPT)
+        openai_chatgpt_config.frequency_penalty = get_property_float(ten_env, PROPERTY_FREQUENCY_PENALTY)
+        openai_chatgpt_config.presence_penalty = get_property_float(ten_env, PROPERTY_PRESENCE_PENALTY)
+        openai_chatgpt_config.temperature = get_property_float(ten_env, PROPERTY_TEMPERATURE)
+        openai_chatgpt_config.top_p = get_property_float(ten_env, PROPERTY_TOP_P)
+        openai_chatgpt_config.max_tokens = get_property_int(ten_env, PROPERTY_MAX_TOKENS)
+        openai_chatgpt_config.proxy_url = get_property_string(ten_env, PROPERTY_PROXY_URL)
+
+        # Properties that don't affect openai_chatgpt_config
+        greeting = get_property_string(ten_env, PROPERTY_GREETING)
+        self.enable_tools = get_property_bool(ten_env, PROPERTY_ENABLE_TOOLS)
+        self.max_memory_length = get_property_int(ten_env, PROPERTY_MAX_MEMORY_LENGTH)
+        checking_vision_text_items_str = get_property_string(ten_env, PROPERTY_CHECKING_VISION_TEXT_ITEMS)
+        if checking_vision_text_items_str:
+            try:
+                self.checking_vision_text_items = json.loads(checking_vision_text_items_str)
+            except Exception as err:
+                logger.info(f"Error parsing {PROPERTY_CHECKING_VISION_TEXT_ITEMS}: {err}")
+
+        # Create openaiChatGPT instance
+        try:
+            self.openai_chatgpt = OpenAIChatGPT(openai_chatgpt_config)
+            logger.info(f"OpenAIChatGPT initialized with max_tokens: {openai_chatgpt_config.max_tokens}, model: 
{openai_chatgpt_config.model}") + except Exception as err: + logger.info(f"Failed to initialize OpenAIChatGPT: {err}") + + # Send greeting if available + if greeting: + try: + output_data = Data.create("text_data") + output_data.set_property_string(DATA_OUT_TEXT_DATA_PROPERTY_TEXT, greeting) + output_data.set_property_bool(DATA_OUT_TEXT_DATA_PROPERTY_TEXT_END_OF_SEGMENT, True) + ten_env.send_data(output_data) + logger.info(f"Greeting [{greeting}] sent") + except Exception as err: + logger.info(f"Failed to send greeting [{greeting}]: {err}") + ten_env.on_start_done() + + def on_stop(self, ten_env: TenEnv) -> None: + logger.info("on_stop") + + # TODO: clean up resources + + ten_env.on_stop_done() + + def on_deinit(self, ten_env: TenEnv) -> None: + logger.info("on_deinit") + ten_env.on_deinit_done() + + def on_cmd(self, ten_env: TenEnv, cmd: Cmd) -> None: + logger.info(f"on_cmd json: {cmd.to_json()}") + + cmd_name = cmd.get_name() + + if cmd_name == CMD_IN_FLUSH: + self.outdate_ts = get_current_time() + ten_env.send_cmd(Cmd.create(CMD_OUT_FLUSH), None) + logger.info("on_cmd sent flush") + status_code, detail = StatusCode.OK, "success" + else: + logger.info(f"on_cmd unknown cmd: {cmd_name}") + status_code, detail = StatusCode.ERROR, "unknown cmd" + + cmd_result = CmdResult.create(status_code) + cmd_result.set_property_string("detail", detail) + ten_env.return_result(cmd_result, cmd) + + def on_data(self, ten_env: TenEnv, data: Data) -> None: + # TODO: process data + pass + + def on_audio_frame(self, ten_env: TenEnv, audio_frame: AudioFrame) -> None: + # TODO: process pcm frame + pass + + def on_video_frame(self, ten_env: TenEnv, video_frame: VideoFrame) -> None: + # logger.info(f"OpenAIChatGPTExtension on_video_frame {frame.get_width()} {frame.get_height()}") + self.image_data = video_frame.get_buf() + self.image_width = video_frame.get_width() + self.image_height = video_frame.get_height() + return + pass + + def __append_memory(self, message): + if len(self.memory) > self.max_memory_length: + self.memory.pop(0) + self.memory.append(message) + + def __send_data(self, ten, sentence, end_of_segment, input_text): + try: + output_data = Data.create("text_data") + output_data.set_property_string(DATA_OUT_TEXT_DATA_PROPERTY_TEXT, sentence) + output_data.set_property_bool( + DATA_OUT_TEXT_DATA_PROPERTY_TEXT_END_OF_SEGMENT, end_of_segment + ) + ten.send_data(output_data) + logger.info( + f"for input text: [{input_text}] {'end of segment ' if end_of_segment else ''}sent sentence [{sentence}]" + ) + except Exception as err: + logger.info( + f"for input text: [{input_text}] send sentence [{sentence}] failed, err: {err}" + ) diff --git a/agents/ten_packages/extension/openai_chatgpt_python/helper.py b/agents/ten_packages/extension/openai_chatgpt_python/helper.py new file mode 100644 index 00000000..00bc5d13 --- /dev/null +++ b/agents/ten_packages/extension/openai_chatgpt_python/helper.py @@ -0,0 +1,42 @@ +# +# +# Agora Real Time Engagement +# Created by Wei Hu in 2024-08. +# Copyright (c) 2024 Agora IO. All rights reserved. 
+# +# +from ten.data import Data +from .log import logger + + +def get_property_bool(data: Data, property_name: str) -> bool: + """Helper to get boolean property from data with error handling.""" + try: + return data.get_property_bool(property_name) + except Exception as err: + logger.warn(f"GetProperty {property_name} failed: {err}") + return False + +def get_property_string(data: Data, property_name: str) -> str: + """Helper to get string property from data with error handling.""" + try: + return data.get_property_string(property_name) + except Exception as err: + logger.warn(f"GetProperty {property_name} failed: {err}") + return "" + +def get_property_int(data: Data, property_name: str) -> int: + """Helper to get int property from data with error handling.""" + try: + return data.get_property_int(property_name) + except Exception as err: + logger.warn(f"GetProperty {property_name} failed: {err}") + return 0 + +def get_property_float(data: Data, property_name: str) -> float: + """Helper to get float property from data with error handling.""" + try: + return data.get_property_float(property_name) + except Exception as err: + logger.warn(f"GetProperty {property_name} failed: {err}") + return 0.0 \ No newline at end of file diff --git a/agents/ten_packages/extension/openai_chatgpt_python/log.py b/agents/ten_packages/extension/openai_chatgpt_python/log.py index fa2202da..1813e965 100644 --- a/agents/ten_packages/extension/openai_chatgpt_python/log.py +++ b/agents/ten_packages/extension/openai_chatgpt_python/log.py @@ -1,11 +1,20 @@ +# +# +# Agora Real Time Engagement +# Created by Wei Hu in 2024-08. +# Copyright (c) 2024 Agora IO. All rights reserved. +# +# import logging logger = logging.getLogger("openai_chatgpt_python") logger.setLevel(logging.INFO) -formatter = logging.Formatter( - "%(asctime)s - %(name)s - %(levelname)s - %(process)d - [%(filename)s:%(lineno)d] - %(message)s" +formatter_str = ( + "%(asctime)s - %(name)s - %(levelname)s - %(process)d - " + "[%(filename)s:%(lineno)d] - %(message)s" ) +formatter = logging.Formatter(formatter_str) console_handler = logging.StreamHandler() console_handler.setFormatter(formatter) diff --git a/agents/ten_packages/extension/openai_chatgpt_python/manifest.json b/agents/ten_packages/extension/openai_chatgpt_python/manifest.json index ce872dfe..42f5d47f 100644 --- a/agents/ten_packages/extension/openai_chatgpt_python/manifest.json +++ b/agents/ten_packages/extension/openai_chatgpt_python/manifest.json @@ -1,7 +1,7 @@ { "type": "extension", "name": "openai_chatgpt_python", - "version": "0.4.0", + "version": "0.1.0", "dependencies": [ { "type": "system", @@ -9,85 +9,15 @@ "version": "0.2" } ], - "api": { - "property": { - "api_key": { - "type": "string" - }, - "frequency_penalty": { - "type": "float64" - }, - "presence_penalty": { - "type": "float64" - }, - "temperature": { - "type": "float64" - }, - "top_p": { - "type": "float64" - }, - "model": { - "type": "string" - }, - "max_tokens": { - "type": "int64" - }, - "base_url": { - "type": "string" - }, - "prompt": { - "type": "string" - }, - "greeting": { - "type": "string" - }, - "checking_vision_text_items": { - "type": "string" - }, - "proxy_url": { - "type": "string" - }, - "max_memory_length": { - "type": "int64" - }, - "enable_tools": { - "type": "bool" - } - }, - "data_in": [ - { - "name": "text_data", - "property": { - "text": { - "type": "string" - } - } - } - ], - "data_out": [ - { - "name": "text_data", - "property": { - "text": { - "type": "string" - } - } - } - ], - "cmd_in": [ - { - 
"name": "flush" - } - ], - "cmd_out": [ - { - "name": "flush" - } - ], - "video_frame_in": [ - { - "name": "video_frame" - } + "package": { + "include": [ + "manifest.json", + "property.json", + "BUILD.gn", + "**.tent", + "**.py", + "README.md" ] - } + }, + "api": {} } \ No newline at end of file diff --git a/agents/ten_packages/extension/openai_chatgpt_python/openai.py b/agents/ten_packages/extension/openai_chatgpt_python/openai.py new file mode 100644 index 00000000..7f2a659c --- /dev/null +++ b/agents/ten_packages/extension/openai_chatgpt_python/openai.py @@ -0,0 +1,92 @@ +import random +import requests +from openai import OpenAI +from typing import List, Dict, Any, Optional +from .log import logger + + +class OpenAIChatGPTConfig: + def __init__(self, + base_url: str, + api_key: str, + model: str, + prompt: str, + frequency_penalty: float, + presence_penalty: float, + top_p: float, + temperature: float, + max_tokens: int, + seed: Optional[int] = None, + proxy_url: Optional[str] = None): + self.base_url = base_url + self.api_key = api_key + self.model = model + self.prompt = prompt + self.frequency_penalty = frequency_penalty + self.presence_penalty = presence_penalty + self.top_p = top_p + self.temperature = temperature + self.max_tokens = max_tokens + self.seed = seed if seed is not None else random.randint(0, 10000) + self.proxy_url = proxy_url + + @classmethod + def default_config(cls): + return cls( + base_url="https://api.openai.com/v1", + api_key="", + model="gpt-4", # Adjust this to match the equivalent of `openai.GPT4o` in the Python library + prompt="You are a voice assistant who talks in a conversational way and can chat with me like my friends. I will speak to you in English or Chinese, and you will answer in the corrected and improved version of my text with the language I use. Don’t talk like a robot, instead I would like you to talk like a real human with emotions. I will use your answer for text-to-speech, so don’t return me any meaningless characters. I want you to be helpful, when I’m asking you for advice, give me precise, practical and useful advice instead of being vague. 
When giving me a list of options, express the options in a narrative way instead of bullet points.",
+            frequency_penalty=0.9,
+            presence_penalty=0.9,
+            top_p=1.0,
+            temperature=0.1,
+            max_tokens=512,
+            seed=random.randint(0, 10000),
+            proxy_url=""
+        )
+
+
+class OpenAIChatGPT:
+    client = None
+    def __init__(self, config: OpenAIChatGPTConfig):
+        self.config = config
+        logger.info(f"OpenAIChatGPT initialized with config: {config.api_key}")
+        self.client = OpenAI(
+            api_key=config.api_key,
+            base_url=config.base_url
+        )
+        self.session = requests.Session()
+        if config.proxy_url:
+            proxies = {
+                "http": config.proxy_url,
+                "https": config.proxy_url,
+            }
+            self.session.proxies.update(proxies)
+        self.client.session = self.session
+
+    def get_chat_completions_stream(self, messages, tools = None):
+        req = {
+            "model": self.config.model,
+            "messages": [
+                {
+                    "role": "system",
+                    "content": self.config.prompt,
+                },
+                *messages,
+            ],
+            "tools": tools,
+            "temperature": self.config.temperature,
+            "top_p": self.config.top_p,
+            "presence_penalty": self.config.presence_penalty,
+            "frequency_penalty": self.config.frequency_penalty,
+            "max_tokens": self.config.max_tokens,
+            "seed": self.config.seed,
+            "stream": True,
+        }
+
+        try:
+            response = self.client.chat.completions.create(**req)
+            return response
+        except Exception as e:
+            raise Exception(f"CreateChatCompletionStream failed, err: {e}")
\ No newline at end of file
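(Sketch, not part of the patch: how a caller consumes the stream helper above. The API key and user message are placeholders. With stream=True the OpenAI client returns an iterable of ChatCompletionChunk objects whose choices[0].delta carries incremental content; from PATCH 03 below the client becomes AsyncOpenAI, after which the call is awaited and the loop becomes async for.)

config = OpenAIChatGPTConfig.default_config()
config.api_key = "<api-key>"  # placeholder; the extension reads this from its properties
chat = OpenAIChatGPT(config)

reply = ""
for chunk in chat.get_chat_completions_stream([{"role": "user", "content": "Say hi."}]):
    # Guard: a chunk may carry no choices or an empty delta.
    if chunk.choices and chunk.choices[0].delta.content:
        reply += chunk.choices[0].delta.content
print(reply)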
# # -from base64 import b64encode -from datetime import datetime -from io import BytesIO +import asyncio import json +import random +import traceback -from .helper import get_property_bool, get_property_float, get_property_int, get_property_string +from .helper import get_current_time, get_property_bool, get_property_float, get_property_int, get_property_string, parse_sentence, rgb2base64jpeg from .openai import OpenAIChatGPT, OpenAIChatGPTConfig from ten import ( AudioFrame, @@ -22,7 +22,6 @@ CmdResult, Data, ) -from PIL import Image from .log import logger CMD_IN_FLUSH = "flush" @@ -47,93 +46,6 @@ PROPERTY_MAX_MEMORY_LENGTH = "max_memory_length" # Optional PROPERTY_CHECKING_VISION_TEXT_ITEMS = "checking_vision_text_items" # Optional -def get_current_time(): - # Get the current time - start_time = datetime.now() - # Get the number of microseconds since the Unix epoch - unix_microseconds = int(start_time.timestamp() * 1_000_000) - return unix_microseconds - - -def is_punctuation(char): - if char in [",", ",", ".", "。", "?", "?", "!", "!"]: - return True - return False - - -def parse_sentence(sentence, content): - remain = "" - found_punc = False - - for char in content: - if not found_punc: - sentence += char - else: - remain += char - - if not found_punc and is_punctuation(char): - found_punc = True - - return sentence, remain, found_punc - - -def rgb2base64jpeg(rgb_data, width, height): - # Convert the RGB image to a PIL Image - pil_image = Image.frombytes("RGBA", (width, height), bytes(rgb_data)) - pil_image = pil_image.convert("RGB") - - # Resize the image while maintaining its aspect ratio - pil_image = resize_image_keep_aspect(pil_image, 320) - - # Save the image to a BytesIO object in JPEG format - buffered = BytesIO() - pil_image.save(buffered, format="JPEG") - # pil_image.save("test.jpg", format="JPEG") - - # Get the byte data of the JPEG image - jpeg_image_data = buffered.getvalue() - - # Convert the JPEG byte data to a Base64 encoded string - base64_encoded_image = b64encode(jpeg_image_data).decode("utf-8") - - # Create the data URL - mime_type = "image/jpeg" - base64_url = f"data:{mime_type};base64,{base64_encoded_image}" - return base64_url - - -def resize_image_keep_aspect(image, max_size=512): - """ - Resize an image while maintaining its aspect ratio, ensuring the larger dimension is max_size. - If both dimensions are smaller than max_size, the image is not resized. 
- - :param image: A PIL Image object - :param max_size: The maximum size for the larger dimension (width or height) - :return: A PIL Image object (resized or original) - """ - # Get current width and height - width, height = image.size - - # If both dimensions are already smaller than max_size, return the original image - if width <= max_size and height <= max_size: - return image - - # Calculate the aspect ratio - aspect_ratio = width / height - - # Determine the new dimensions - if width > height: - new_width = max_size - new_height = int(max_size / aspect_ratio) - else: - new_height = max_size - new_width = int(max_size * aspect_ratio) - - # Resize the image with the new dimensions - resized_image = image.resize((new_width, new_height)) - - return resized_image - class OpenAIChatGPTExtension(Extension): memory = [] @@ -245,8 +157,22 @@ def on_cmd(self, ten_env: TenEnv, cmd: Cmd) -> None: ten_env.return_result(cmd_result, cmd) def on_data(self, ten_env: TenEnv, data: Data) -> None: - # TODO: process data - pass + # Get the necessary properties + is_final = get_property_bool(data, DATA_IN_TEXT_DATA_PROPERTY_IS_FINAL) + input_text = get_property_string(data, DATA_IN_TEXT_DATA_PROPERTY_TEXT) + + if not is_final: + logger.info("ignore non-final input") + return + if not input_text: + logger.info("ignore empty text") + return + + logger.info(f"OnData input text: [{input_text}]") + + # Start an asynchronous task for handling chat completion + start_time = get_current_time() + asyncio.create_task(self.__async_chat_completion(ten_env, start_time, input_text, self.memory)) def on_audio_frame(self, ten_env: TenEnv, audio_frame: AudioFrame) -> None: # TODO: process pcm frame @@ -258,21 +184,20 @@ def on_video_frame(self, ten_env: TenEnv, video_frame: VideoFrame) -> None: self.image_width = video_frame.get_width() self.image_height = video_frame.get_height() return - pass - def __append_memory(self, message): + def __append_memory(self, message:str): if len(self.memory) > self.max_memory_length: self.memory.pop(0) self.memory.append(message) - def __send_data(self, ten, sentence, end_of_segment, input_text): + def __send_data(self, ten_env: TenEnv, sentence: str, end_of_segment: bool, input_text: str): try: output_data = Data.create("text_data") output_data.set_property_string(DATA_OUT_TEXT_DATA_PROPERTY_TEXT, sentence) output_data.set_property_bool( DATA_OUT_TEXT_DATA_PROPERTY_TEXT_END_OF_SEGMENT, end_of_segment ) - ten.send_data(output_data) + ten_env.send_data(output_data) logger.info( f"for input text: [{input_text}] {'end of segment ' if end_of_segment else ''}sent sentence [{sentence}]" ) @@ -280,3 +205,88 @@ def __send_data(self, ten, sentence, end_of_segment, input_text): logger.info( f"for input text: [{input_text}] send sentence [{sentence}] failed, err: {err}" ) + + async def __async_chat_completion(self, ten: TenEnv, start_time, input_text, memory): + """Async chat completion task to be run from on_data.""" + try: + logger.info(f"for input text: [{input_text}] memory: {memory}") + message = {"role": "user", "content": input_text} + tools = self.available_tools if self.enable_tools else None + + # Make an async API call to get chat completions + resp = await self.openai_chatgpt.get_chat_completions_stream(memory + [message], tools) + if not resp: + logger.error(f"get_chat_completions_stream Response is None: {input_text}") + return + + # Process the completions asynchronously + await self.__process_completions(resp, ten, start_time, input_text, memory) + except Exception as e: + 
logger.error(f"Error in chat_completion: {traceback.format_exc()} for input text: {input_text}") + + async def __async_chat_completion_with_vision(self, ten: TenEnv, start_time, input_text, memory): + """Handles chat completion when a vision-based tool call is invoked.""" + try: + logger.info(f"for input text: [{input_text}] memory: {memory}") + + # Prepare the message with vision content + message = {"role": "user", "content": input_text} + if self.image_data is not None: + url = rgb2base64jpeg(self.image_data, self.image_width, self.image_height) + message = { + "role": "user", + "content": [ + {"type": "text", "text": input_text}, + {"type": "image_url", "image_url": {"url": url}}, + ], + } + logger.info(f"msg with vision data: {message}") + + # Asynchronous request to OpenAI for chat completions with vision + resp = await self.openai_chatgpt.get_chat_completions_stream(memory + [message]) + if not resp: + logger.error(f"get_chat_completions_stream Response is None for input text: {input_text}") + return + + # Process completions asynchronously + await self.__process_completions(resp, ten, start_time, input_text, memory) + + except Exception as e: + logger.error(f"Error in chat_completion_with_vision: {str(e)} for input text: {input_text}") + + + async def __process_completions(self, chat_completions, ten, start_time, input_text, memory): + """Processes completions and sends them asynchronously.""" + full_content = "" + first_sentence_sent = False + + for chat_completion in chat_completions: + if start_time < self.outdate_ts: + logger.info(f"recv interrupt for input text: [{input_text}]") + break + + content = chat_completion.choices[0].delta.content if len(chat_completion.choices) > 0 and chat_completion.choices[0].delta.content else "" + + if chat_completion.choices[0].delta.tool_calls: + for tool_call in chat_completion.choices[0].delta.tool_calls: + logger.info(f"tool_call: {tool_call}") + if tool_call.function.name == "get_vision_image": + if not full_content and self.checking_vision_text_items: + await self.__send_data(ten, random.choice(self.checking_vision_text_items), True, input_text) + await self.__async_chat_completion_with_vision(ten, start_time, input_text, memory) + return + + full_content += content + + sentence, content, sentence_is_final = "", content, False + while sentence_is_final: + sentence, content, sentence_is_final = parse_sentence(sentence, content) + logger.info(f"recv for input text: [{input_text}] got sentence: [{sentence}]") + await self.__send_data(ten, sentence, False, input_text) + if not first_sentence_sent: + first_sentence_sent = True + logger.info(f"first sentence latency: {get_current_time() - start_time}ms") + + self.__append_memory({"role": "user", "content": input_text}) + self.__append_memory({"role": "assistant", "content": full_content}) + await self.__send_data(ten, "", True, input_text) \ No newline at end of file diff --git a/agents/ten_packages/extension/openai_chatgpt_python/helper.py b/agents/ten_packages/extension/openai_chatgpt_python/helper.py index 00bc5d13..cc38ae0a 100644 --- a/agents/ten_packages/extension/openai_chatgpt_python/helper.py +++ b/agents/ten_packages/extension/openai_chatgpt_python/helper.py @@ -7,6 +7,10 @@ # from ten.data import Data from .log import logger +from PIL import Image +from datetime import datetime +from io import BytesIO +from base64 import b64encode def get_property_bool(data: Data, property_name: str) -> bool: @@ -39,4 +43,92 @@ def get_property_float(data: Data, property_name: str) -> float: return 
data.get_property_float(property_name) except Exception as err: logger.warn(f"GetProperty {property_name} failed: {err}") - return 0.0 \ No newline at end of file + return 0.0 + + +def get_current_time(): + # Get the current time + start_time = datetime.now() + # Get the number of microseconds since the Unix epoch + unix_microseconds = int(start_time.timestamp() * 1_000_000) + return unix_microseconds + + +def is_punctuation(char): + if char in [",", ",", ".", "。", "?", "?", "!", "!"]: + return True + return False + + +def parse_sentence(sentence, content): + remain = "" + found_punc = False + + for char in content: + if not found_punc: + sentence += char + else: + remain += char + + if not found_punc and is_punctuation(char): + found_punc = True + + return sentence, remain, found_punc + + +def rgb2base64jpeg(rgb_data, width, height): + # Convert the RGB image to a PIL Image + pil_image = Image.frombytes("RGBA", (width, height), bytes(rgb_data)) + pil_image = pil_image.convert("RGB") + + # Resize the image while maintaining its aspect ratio + pil_image = resize_image_keep_aspect(pil_image, 320) + + # Save the image to a BytesIO object in JPEG format + buffered = BytesIO() + pil_image.save(buffered, format="JPEG") + # pil_image.save("test.jpg", format="JPEG") + + # Get the byte data of the JPEG image + jpeg_image_data = buffered.getvalue() + + # Convert the JPEG byte data to a Base64 encoded string + base64_encoded_image = b64encode(jpeg_image_data).decode("utf-8") + + # Create the data URL + mime_type = "image/jpeg" + base64_url = f"data:{mime_type};base64,{base64_encoded_image}" + return base64_url + + +def resize_image_keep_aspect(image, max_size=512): + """ + Resize an image while maintaining its aspect ratio, ensuring the larger dimension is max_size. + If both dimensions are smaller than max_size, the image is not resized. 
+ + :param image: A PIL Image object + :param max_size: The maximum size for the larger dimension (width or height) + :return: A PIL Image object (resized or original) + """ + # Get current width and height + width, height = image.size + + # If both dimensions are already smaller than max_size, return the original image + if width <= max_size and height <= max_size: + return image + + # Calculate the aspect ratio + aspect_ratio = width / height + + # Determine the new dimensions + if width > height: + new_width = max_size + new_height = int(max_size / aspect_ratio) + else: + new_height = max_size + new_width = int(max_size * aspect_ratio) + + # Resize the image with the new dimensions + resized_image = image.resize((new_width, new_height)) + + return resized_image \ No newline at end of file From b70ad6a92c58b57d685de39ae916b69c747445af Mon Sep 17 00:00:00 2001 From: zhangqianze Date: Wed, 11 Sep 2024 23:05:03 +0800 Subject: [PATCH 03/13] feat: fix refactoring bugs --- .../openai_chatgpt_python/extension.py | 38 ++++++++++++------- .../extension/openai_chatgpt_python/openai.py | 8 ++-- 2 files changed, 28 insertions(+), 18 deletions(-) diff --git a/agents/ten_packages/extension/openai_chatgpt_python/extension.py b/agents/ten_packages/extension/openai_chatgpt_python/extension.py index 2eeda701..f9bbf6a7 100644 --- a/agents/ten_packages/extension/openai_chatgpt_python/extension.py +++ b/agents/ten_packages/extension/openai_chatgpt_python/extension.py @@ -8,6 +8,7 @@ import asyncio import json import random +import threading import traceback from .helper import get_current_time, get_property_bool, get_property_float, get_property_int, get_property_string, parse_sentence, rgb2base64jpeg @@ -46,6 +47,8 @@ PROPERTY_MAX_MEMORY_LENGTH = "max_memory_length" # Optional PROPERTY_CHECKING_VISION_TEXT_ITEMS = "checking_vision_text_items" # Optional +async def test_task(): + logger.info("Test task is running") class OpenAIChatGPTExtension(Extension): memory = [] @@ -57,6 +60,7 @@ class OpenAIChatGPTExtension(Extension): image_width = 0 image_height = 0 checking_vision_text_items = [] + loop = None available_tools = [ { @@ -77,25 +81,31 @@ def on_init(self, ten_env: TenEnv) -> None: def on_start(self, ten_env: TenEnv) -> None: logger.info("on_start") + self.loop = asyncio.new_event_loop() + def start_loop(): + asyncio.set_event_loop(self.loop) + self.loop.run_forever() + threading.Thread(target=start_loop, args=[]).start() + # Prepare configuration openai_chatgpt_config = OpenAIChatGPTConfig.default_config() # Mandatory properties - openai_chatgpt_config.base_url = get_property_string(ten_env, PROPERTY_BASE_URL) + openai_chatgpt_config.base_url = get_property_string(ten_env, PROPERTY_BASE_URL) or openai_chatgpt_config.base_url openai_chatgpt_config.api_key = get_property_string(ten_env, PROPERTY_API_KEY) if not openai_chatgpt_config.api_key: logger.info(f"API key is missing, exiting on_start") return # Optional properties - openai_chatgpt_config.model = get_property_string(ten_env, PROPERTY_MODEL) - openai_chatgpt_config.prompt = get_property_string(ten_env, PROPERTY_PROMPT) - openai_chatgpt_config.frequency_penalty = get_property_float(ten_env, PROPERTY_FREQUENCY_PENALTY) - openai_chatgpt_config.presence_penalty = get_property_float(ten_env, PROPERTY_PRESENCE_PENALTY) - openai_chatgpt_config.temperature = get_property_float(ten_env, PROPERTY_TEMPERATURE) - openai_chatgpt_config.top_p = get_property_float(ten_env, PROPERTY_TOP_P) - openai_chatgpt_config.max_tokens = get_property_int(ten_env, 
PROPERTY_MAX_TOKENS) - openai_chatgpt_config.proxy_url = get_property_string(ten_env, PROPERTY_PROXY_URL) + openai_chatgpt_config.model = get_property_string(ten_env, PROPERTY_MODEL) or openai_chatgpt_config.model + openai_chatgpt_config.prompt = get_property_string(ten_env, PROPERTY_PROMPT) or openai_chatgpt_config.prompt + openai_chatgpt_config.frequency_penalty = get_property_float(ten_env, PROPERTY_FREQUENCY_PENALTY) or openai_chatgpt_config.frequency_penalty + openai_chatgpt_config.presence_penalty = get_property_float(ten_env, PROPERTY_PRESENCE_PENALTY) or openai_chatgpt_config.presence_penalty + openai_chatgpt_config.temperature = get_property_float(ten_env, PROPERTY_TEMPERATURE) or openai_chatgpt_config.temperature + openai_chatgpt_config.top_p = get_property_float(ten_env, PROPERTY_TOP_P) or openai_chatgpt_config.top_p + openai_chatgpt_config.max_tokens = get_property_int(ten_env, PROPERTY_MAX_TOKENS) or openai_chatgpt_config.max_tokens + openai_chatgpt_config.proxy_url = get_property_string(ten_env, PROPERTY_PROXY_URL) or openai_chatgpt_config.proxy_url # Properties that don't affect openai_chatgpt_config greeting = get_property_string(ten_env, PROPERTY_GREETING) @@ -108,10 +118,10 @@ def on_start(self, ten_env: TenEnv) -> None: except Exception as err: logger.info(f"Error parsing {PROPERTY_CHECKING_VISION_TEXT_ITEMS}: {err}") - # Create openaiChatGPT instance + # Create instance try: self.openai_chatgpt = OpenAIChatGPT(openai_chatgpt_config) - logger.info(f"OpenAIChatGPT initialized with max_tokens: {openai_chatgpt_config.max_tokens}, model: {openai_chatgpt_config.model}") + logger.info(f"initialized with max_tokens: {openai_chatgpt_config.max_tokens}, model: {openai_chatgpt_config.model}") except Exception as err: logger.info(f"Failed to initialize OpenAIChatGPT: {err}") @@ -172,7 +182,7 @@ def on_data(self, ten_env: TenEnv, data: Data) -> None: # Start an asynchronous task for handling chat completion start_time = get_current_time() - asyncio.create_task(self.__async_chat_completion(ten_env, start_time, input_text, self.memory)) + asyncio.run_coroutine_threadsafe(self.__async_chat_completion(ten_env, start_time, input_text, self.memory), self.loop) def on_audio_frame(self, ten_env: TenEnv, audio_frame: AudioFrame) -> None: # TODO: process pcm frame @@ -260,7 +270,7 @@ async def __process_completions(self, chat_completions, ten, start_time, input_t full_content = "" first_sentence_sent = False - for chat_completion in chat_completions: + async for chat_completion in chat_completions: if start_time < self.outdate_ts: logger.info(f"recv interrupt for input text: [{input_text}]") break @@ -289,4 +299,4 @@ async def __process_completions(self, chat_completions, ten, start_time, input_t self.__append_memory({"role": "user", "content": input_text}) self.__append_memory({"role": "assistant", "content": full_content}) - await self.__send_data(ten, "", True, input_text) \ No newline at end of file + self.__send_data(ten, full_content, True, input_text) \ No newline at end of file diff --git a/agents/ten_packages/extension/openai_chatgpt_python/openai.py b/agents/ten_packages/extension/openai_chatgpt_python/openai.py index 7f2a659c..11df4072 100644 --- a/agents/ten_packages/extension/openai_chatgpt_python/openai.py +++ b/agents/ten_packages/extension/openai_chatgpt_python/openai.py @@ -1,6 +1,6 @@ import random import requests -from openai import OpenAI +from openai import AsyncOpenAI from typing import List, Dict, Any, Optional from .log import logger @@ -52,7 +52,7 @@ class 
OpenAIChatGPT: def __init__(self, config: OpenAIChatGPTConfig): self.config = config logger.info(f"OpenAIChatGPT initialized with config: {config.api_key}") - self.client = OpenAI( + self.client = AsyncOpenAI( api_key=config.api_key, base_url=config.base_url ) @@ -65,7 +65,7 @@ def __init__(self, config: OpenAIChatGPTConfig): self.session.proxies.update(proxies) self.client.session = self.session - def get_chat_completions_stream(self, messages, tools = None): + async def get_chat_completions_stream(self, messages, tools = None): req = { "model": self.config.model, "messages": [ @@ -86,7 +86,7 @@ def get_chat_completions_stream(self, messages, tools = None): } try: - response = self.client.chat.completions.create(**req) + response = await self.client.chat.completions.create(**req) return response except Exception as e: raise Exception(f"CreateChatCompletionStream failed, err: {e}") \ No newline at end of file From c7019c922e86da9f486929f54d9bc17625c17818 Mon Sep 17 00:00:00 2001 From: zhangqianze Date: Wed, 11 Sep 2024 23:07:06 +0800 Subject: [PATCH 04/13] fix: add manifest.json --- .../openai_chatgpt_python/manifest.json | 82 ++++++++++++++++++- 1 file changed, 81 insertions(+), 1 deletion(-) diff --git a/agents/ten_packages/extension/openai_chatgpt_python/manifest.json b/agents/ten_packages/extension/openai_chatgpt_python/manifest.json index 42f5d47f..4a74e306 100644 --- a/agents/ten_packages/extension/openai_chatgpt_python/manifest.json +++ b/agents/ten_packages/extension/openai_chatgpt_python/manifest.json @@ -19,5 +19,85 @@ "README.md" ] }, - "api": {} + "api": { + "property": { + "api_key": { + "type": "string" + }, + "frequency_penalty": { + "type": "float64" + }, + "presence_penalty": { + "type": "float64" + }, + "temperature": { + "type": "float64" + }, + "top_p": { + "type": "float64" + }, + "model": { + "type": "string" + }, + "max_tokens": { + "type": "int64" + }, + "base_url": { + "type": "string" + }, + "prompt": { + "type": "string" + }, + "greeting": { + "type": "string" + }, + "checking_vision_text_items": { + "type": "string" + }, + "proxy_url": { + "type": "string" + }, + "max_memory_length": { + "type": "int64" + }, + "enable_tools": { + "type": "bool" + } + }, + "data_in": [ + { + "name": "text_data", + "property": { + "text": { + "type": "string" + } + } + } + ], + "data_out": [ + { + "name": "text_data", + "property": { + "text": { + "type": "string" + } + } + } + ], + "cmd_in": [ + { + "name": "flush" + } + ], + "cmd_out": [ + { + "name": "flush" + } + ], + "video_frame_in": [ + { + "name": "video_frame" + } + ] + } } \ No newline at end of file From 2ee7058a95cea0a19b6e3d45341834b7776a9c2e Mon Sep 17 00:00:00 2001 From: zhangqianze Date: Thu, 12 Sep 2024 20:59:11 +0800 Subject: [PATCH 05/13] feat: add queue logic --- .../openai_chatgpt_python/extension.py | 103 +++++++++++------- 1 file changed, 63 insertions(+), 40 deletions(-) diff --git a/agents/ten_packages/extension/openai_chatgpt_python/extension.py b/agents/ten_packages/extension/openai_chatgpt_python/extension.py index f9bbf6a7..6a5d3afe 100644 --- a/agents/ten_packages/extension/openai_chatgpt_python/extension.py +++ b/agents/ten_packages/extension/openai_chatgpt_python/extension.py @@ -53,7 +53,6 @@ async def test_task(): class OpenAIChatGPTExtension(Extension): memory = [] max_memory_length = 10 - outdate_ts = 0 openai_chatgpt = None enable_tools = False image_data = None @@ -61,6 +60,10 @@ class OpenAIChatGPTExtension(Extension): image_height = 0 checking_vision_text_items = [] loop = None + ten_env = 
None + + # Create the queue for message processing + queue = asyncio.Queue() available_tools = [ { @@ -87,6 +90,8 @@ def start_loop(): self.loop.run_forever() threading.Thread(target=start_loop, args=[]).start() + self.loop.create_task(self._process_queue(ten_env)) + # Prepare configuration openai_chatgpt_config = OpenAIChatGPTConfig.default_config() @@ -154,7 +159,7 @@ def on_cmd(self, ten_env: TenEnv, cmd: Cmd) -> None: cmd_name = cmd.get_name() if cmd_name == CMD_IN_FLUSH: - self.outdate_ts = get_current_time() + asyncio.run_coroutine_threadsafe(self._flush_queue(), self.loop) ten_env.send_cmd(Cmd.create(CMD_OUT_FLUSH), None) logger.info("on_cmd sent flush") status_code, detail = StatusCode.OK, "success" @@ -181,8 +186,7 @@ def on_data(self, ten_env: TenEnv, data: Data) -> None: logger.info(f"OnData input text: [{input_text}]") # Start an asynchronous task for handling chat completion - start_time = get_current_time() - asyncio.run_coroutine_threadsafe(self.__async_chat_completion(ten_env, start_time, input_text, self.memory), self.loop) + asyncio.run_coroutine_threadsafe(self.queue.put(input_text), self.loop) def on_audio_frame(self, ten_env: TenEnv, audio_frame: AudioFrame) -> None: # TODO: process pcm frame @@ -195,28 +199,30 @@ def on_video_frame(self, ten_env: TenEnv, video_frame: VideoFrame) -> None: self.image_height = video_frame.get_height() return - def __append_memory(self, message:str): - if len(self.memory) > self.max_memory_length: - self.memory.pop(0) - self.memory.append(message) - - def __send_data(self, ten_env: TenEnv, sentence: str, end_of_segment: bool, input_text: str): - try: - output_data = Data.create("text_data") - output_data.set_property_string(DATA_OUT_TEXT_DATA_PROPERTY_TEXT, sentence) - output_data.set_property_bool( - DATA_OUT_TEXT_DATA_PROPERTY_TEXT_END_OF_SEGMENT, end_of_segment - ) - ten_env.send_data(output_data) - logger.info( - f"for input text: [{input_text}] {'end of segment ' if end_of_segment else ''}sent sentence [{sentence}]" - ) - except Exception as err: - logger.info( - f"for input text: [{input_text}] send sentence [{sentence}] failed, err: {err}" - ) - - async def __async_chat_completion(self, ten: TenEnv, start_time, input_text, memory): + async def _process_queue(self, ten_env: TenEnv): + """Asynchronously process queue items one by one.""" + while True: + # Wait for an item to be available in the queue + message = await self.queue.get() + try: + # Create a new task for the new message + start_time = get_current_time() + self.current_task = asyncio.create_task(self._async_chat_completion(ten_env, start_time, message, self.memory)) + await self.current_task # Wait for the current task to finish or be cancelled + except asyncio.CancelledError: + logger.info(f"Task cancelled: {message}") + finally: + self.queue.task_done() + + async def _flush_queue(self): + """Flushes the self.queue and cancels the current task.""" + while not self.queue.empty(): + self.queue.get_nowait() + self.queue.task_done() + if self.current_task: + self.current_task.cancel() + + async def _async_chat_completion(self, ten: TenEnv, start_time, input_text, memory): """Async chat completion task to be run from on_data.""" try: logger.info(f"for input text: [{input_text}] memory: {memory}") @@ -230,11 +236,11 @@ async def __async_chat_completion(self, ten: TenEnv, start_time, input_text, mem return # Process the completions asynchronously - await self.__process_completions(resp, ten, start_time, input_text, memory) + await self._process_completions(resp, ten, 
start_time, input_text, memory) except Exception as e: logger.error(f"Error in chat_completion: {traceback.format_exc()} for input text: {input_text}") - async def __async_chat_completion_with_vision(self, ten: TenEnv, start_time, input_text, memory): + async def _async_chat_completion_with_vision(self, ten: TenEnv, start_time, input_text, memory): """Handles chat completion when a vision-based tool call is invoked.""" try: logger.info(f"for input text: [{input_text}] memory: {memory}") @@ -259,22 +265,18 @@ async def __async_chat_completion_with_vision(self, ten: TenEnv, start_time, inp return # Process completions asynchronously - await self.__process_completions(resp, ten, start_time, input_text, memory) + await self._process_completions(resp, ten, start_time, input_text, memory) except Exception as e: logger.error(f"Error in chat_completion_with_vision: {str(e)} for input text: {input_text}") - async def __process_completions(self, chat_completions, ten, start_time, input_text, memory): + async def _process_completions(self, chat_completions, ten, start_time, input_text, memory): """Processes completions and sends them asynchronously.""" full_content = "" first_sentence_sent = False async for chat_completion in chat_completions: - if start_time < self.outdate_ts: - logger.info(f"recv interrupt for input text: [{input_text}]") - break - content = chat_completion.choices[0].delta.content if len(chat_completion.choices) > 0 and chat_completion.choices[0].delta.content else "" if chat_completion.choices[0].delta.tool_calls: @@ -282,8 +284,8 @@ async def __process_completions(self, chat_completions, ten, start_time, input_t logger.info(f"tool_call: {tool_call}") if tool_call.function.name == "get_vision_image": if not full_content and self.checking_vision_text_items: - await self.__send_data(ten, random.choice(self.checking_vision_text_items), True, input_text) - await self.__async_chat_completion_with_vision(ten, start_time, input_text, memory) + await self._send_data(ten, random.choice(self.checking_vision_text_items), True, input_text) + await self._async_chat_completion_with_vision(ten, start_time, input_text, memory) return full_content += content @@ -292,11 +294,32 @@ async def __process_completions(self, chat_completions, ten, start_time, input_t while sentence_is_final: sentence, content, sentence_is_final = parse_sentence(sentence, content) logger.info(f"recv for input text: [{input_text}] got sentence: [{sentence}]") - await self.__send_data(ten, sentence, False, input_text) + await self._send_data(ten, sentence, False, input_text) if not first_sentence_sent: first_sentence_sent = True logger.info(f"first sentence latency: {get_current_time() - start_time}ms") - self.__append_memory({"role": "user", "content": input_text}) - self.__append_memory({"role": "assistant", "content": full_content}) - self.__send_data(ten, full_content, True, input_text) \ No newline at end of file + self._append_memory({"role": "user", "content": input_text}) + self._append_memory({"role": "assistant", "content": full_content}) + self._send_data(ten, full_content, True, input_text) + + def _append_memory(self, message:str): + if len(self.memory) > self.max_memory_length: + self.memory.pop(0) + self.memory.append(message) + + def _send_data(self, ten_env: TenEnv, sentence: str, end_of_segment: bool, input_text: str): + try: + output_data = Data.create("text_data") + output_data.set_property_string(DATA_OUT_TEXT_DATA_PROPERTY_TEXT, sentence) + output_data.set_property_bool( + 
DATA_OUT_TEXT_DATA_PROPERTY_TEXT_END_OF_SEGMENT, end_of_segment + ) + ten_env.send_data(output_data) + logger.info( + f"for input text: [{input_text}] {'end of segment ' if end_of_segment else ''}sent sentence [{sentence}]" + ) + except Exception as err: + logger.info( + f"for input text: [{input_text}] send sentence [{sentence}] failed, err: {err}" + ) \ No newline at end of file From eca0d2c2a50493e0e1c5e2c9516e09528592a78a Mon Sep 17 00:00:00 2001 From: zhangqianze Date: Thu, 12 Sep 2024 21:01:23 +0800 Subject: [PATCH 06/13] fix: fix issues - remove test code - prevent sending full content again - add queue logic --- .../ten_packages/extension/openai_chatgpt_python/extension.py | 3 --- 1 file changed, 3 deletions(-) diff --git a/agents/ten_packages/extension/openai_chatgpt_python/extension.py b/agents/ten_packages/extension/openai_chatgpt_python/extension.py index 6a5d3afe..1746e0c3 100644 --- a/agents/ten_packages/extension/openai_chatgpt_python/extension.py +++ b/agents/ten_packages/extension/openai_chatgpt_python/extension.py @@ -47,8 +47,6 @@ PROPERTY_MAX_MEMORY_LENGTH = "max_memory_length" # Optional PROPERTY_CHECKING_VISION_TEXT_ITEMS = "checking_vision_text_items" # Optional -async def test_task(): - logger.info("Test task is running") class OpenAIChatGPTExtension(Extension): memory = [] @@ -301,7 +299,6 @@ async def _process_completions(self, chat_completions, ten, start_time, input_te self._append_memory({"role": "user", "content": input_text}) self._append_memory({"role": "assistant", "content": full_content}) - self._send_data(ten, full_content, True, input_text) def _append_memory(self, message:str): if len(self.memory) > self.max_memory_length: From fdcccb75d1a3ee0d4ee0d758f3a0749bad8c6a3f Mon Sep 17 00:00:00 2001 From: zhangqianze Date: Thu, 12 Sep 2024 21:20:49 +0800 Subject: [PATCH 07/13] feat: fix parseSentence --- .../extension/message_collector/src/extension.py | 8 ++------ .../extension/openai_chatgpt_python/extension.py | 13 ++++++++----- 2 files changed, 10 insertions(+), 11 deletions(-) diff --git a/agents/ten_packages/extension/message_collector/src/extension.py b/agents/ten_packages/extension/message_collector/src/extension.py index 39206a13..07ae315a 100644 --- a/agents/ten_packages/extension/message_collector/src/extension.py +++ b/agents/ten_packages/extension/message_collector/src/extension.py @@ -89,16 +89,12 @@ def on_data(self, ten_env: TenEnv, data: Data) -> None: try: final = data.get_property_bool(TEXT_DATA_FINAL_FIELD) except Exception as e: - logger.warning( - f"on_data get_property_bool {TEXT_DATA_FINAL_FIELD} error: {e}" - ) + pass try: stream_id = data.get_property_int(TEXT_DATA_STREAM_ID_FIELD) except Exception as e: - logger.warning( - f"on_data get_property_int {TEXT_DATA_STREAM_ID_FIELD} error: {e}" - ) + pass try: end_of_segment = data.get_property_bool(TEXT_DATA_END_OF_SEGMENT_FIELD) diff --git a/agents/ten_packages/extension/openai_chatgpt_python/extension.py b/agents/ten_packages/extension/openai_chatgpt_python/extension.py index 1746e0c3..efa1876c 100644 --- a/agents/ten_packages/extension/openai_chatgpt_python/extension.py +++ b/agents/ten_packages/extension/openai_chatgpt_python/extension.py @@ -271,6 +271,7 @@ async def _async_chat_completion_with_vision(self, ten: TenEnv, start_time, inpu async def _process_completions(self, chat_completions, ten, start_time, input_text, memory): """Processes completions and sends them asynchronously.""" + sentence = "" full_content = "" first_sentence_sent = False @@ -288,14 +289,16 @@ async def 
_process_completions(self, chat_completions, ten, start_time, input_te

         full_content += content

-        sentence, content, sentence_is_final = "", content, False
-        while sentence_is_final:
+        while True:
             sentence, content, sentence_is_final = parse_sentence(sentence, content)
-            logger.info(f"recv for input text: [{input_text}] got sentence: [{sentence}]")
-            await self._send_data(ten, sentence, False, input_text)
+            if len(sentence) == 0 or not sentence_is_final:
+                break
+            self._send_data(ten, sentence, False, input_text)
+            sentence = ""
+
         if not first_sentence_sent:
             first_sentence_sent = True
-            logger.info(f"first sentence latency: {get_current_time() - start_time}ms")
+            logger.info(f"first sentence latency: {float(get_current_time() - start_time)/1000}ms")

     self._append_memory({"role": "user", "content": input_text})
     self._append_memory({"role": "assistant", "content": full_content})

From 0b6c775ac36d922100a59bc4d6a420e5d4c3fd38 Mon Sep 17 00:00:00 2001
From: zhangqianze
Date: Thu, 12 Sep 2024 21:27:57 +0800
Subject: [PATCH 08/13] fix: fix end_segment bug

---
 agents/ten_packages/extension/openai_chatgpt_python/extension.py | 1 +
 1 file changed, 1 insertion(+)

diff --git a/agents/ten_packages/extension/openai_chatgpt_python/extension.py b/agents/ten_packages/extension/openai_chatgpt_python/extension.py
index efa1876c..19738c18 100644
--- a/agents/ten_packages/extension/openai_chatgpt_python/extension.py
+++ b/agents/ten_packages/extension/openai_chatgpt_python/extension.py
@@ -302,6 +302,7 @@ async def _process_completions(self, chat_completions, ten, start_time, input_te
     self._append_memory({"role": "user", "content": input_text})
     self._append_memory({"role": "assistant", "content": full_content})
+    self._send_data(ten, sentence, True, input_text)

From 743b53363e14582b80a07aae02902c03a652a0e4 Mon Sep 17 00:00:00 2001
From: zhangqianze
Date: Sat, 14 Sep 2024 04:10:06 +0800
Subject: [PATCH 09/13] feat: add chatflow abstraction

- chatflow
- refactor to simplify flow run
- added event emitter for intermediate execution
---
 .../extension/openai_chatgpt_python/events.py |  18 ++
 .../openai_chatgpt_python/extension.py        | 224 +++++++++++-------
 .../extension/openai_chatgpt_python/helper.py |  23 +-
 .../extension/openai_chatgpt_python/openai.py |  39 ++-
 4 files changed, 204 insertions(+), 100 deletions(-)
 create mode 100644 agents/ten_packages/extension/openai_chatgpt_python/events.py

diff --git a/agents/ten_packages/extension/openai_chatgpt_python/events.py b/agents/ten_packages/extension/openai_chatgpt_python/events.py
new file mode 100644
index 00000000..b063bf8b
--- /dev/null
+++ b/agents/ten_packages/extension/openai_chatgpt_python/events.py
@@ -0,0 +1,18 @@
+import asyncio
+
+
+class AsyncEventEmitter:
+    def __init__(self):
+        self.listeners = {}
+
+    def on(self, event_name, listener):
+        """Register an event listener."""
+        if event_name not in self.listeners:
+            self.listeners[event_name] = []
+        self.listeners[event_name].append(listener)
+
+    def emit(self, event_name, *args, **kwargs):
+        """Fire the event without waiting for listeners to finish."""
+        if event_name in self.listeners:
+            for listener in self.listeners[event_name]:
+                asyncio.create_task(listener(*args, **kwargs))
\ No newline at end of file
diff --git a/agents/ten_packages/extension/openai_chatgpt_python/extension.py b/agents/ten_packages/extension/openai_chatgpt_python/extension.py
index 19738c18..549f28f3 100644
---
a/agents/ten_packages/extension/openai_chatgpt_python/extension.py +++ b/agents/ten_packages/extension/openai_chatgpt_python/extension.py @@ -11,7 +11,8 @@ import threading import traceback -from .helper import get_current_time, get_property_bool, get_property_float, get_property_int, get_property_string, parse_sentence, rgb2base64jpeg +from .events import AsyncEventEmitter +from .helper import get_current_time, get_property_bool, get_property_float, get_property_int, get_property_string, parse_sentences, rgb2base64jpeg from .openai import OpenAIChatGPT, OpenAIChatGPTConfig from ten import ( AudioFrame, @@ -58,7 +59,7 @@ class OpenAIChatGPTExtension(Extension): image_height = 0 checking_vision_text_items = [] loop = None - ten_env = None + sentence_fragment = "" # Create the queue for message processing queue = asyncio.Queue() @@ -204,8 +205,7 @@ async def _process_queue(self, ten_env: TenEnv): message = await self.queue.get() try: # Create a new task for the new message - start_time = get_current_time() - self.current_task = asyncio.create_task(self._async_chat_completion(ten_env, start_time, message, self.memory)) + self.current_task = asyncio.create_task(self._run_chatflow(ten_env, message, self.memory)) await self.current_task # Wait for the current task to finish or be cancelled except asyncio.CancelledError: logger.info(f"Task cancelled: {message}") @@ -220,96 +220,148 @@ async def _flush_queue(self): if self.current_task: self.current_task.cancel() - async def _async_chat_completion(self, ten: TenEnv, start_time, input_text, memory): - """Async chat completion task to be run from on_data.""" + async def _run_chatflow(self, ten_env: TenEnv, input_text: str, memory): + """Run the chatflow asynchronously.""" + completion_finished = False + memory_cache = [] try: - logger.info(f"for input text: [{input_text}] memory: {memory}") - message = {"role": "user", "content": input_text} - tools = self.available_tools if self.enable_tools else None - - # Make an async API call to get chat completions - resp = await self.openai_chatgpt.get_chat_completions_stream(memory + [message], tools) - if not resp: - logger.error(f"get_chat_completions_stream Response is None: {input_text}") - return - - # Process the completions asynchronously - await self._process_completions(resp, ten, start_time, input_text, memory) + while not completion_finished: + logger.info(f"for input text: [{input_text}] memory: {memory}") + message = {"role": "user", "content": input_text} + memory_cache = memory_cache + [message, {"role": "assistant", "content": ""}] + tools = self.available_tools if self.enable_tools else None + + self.sentence_fragment = "" + + # Create an asyncio.Event to signal when content is finished + content_finished_event = asyncio.Event() + + # Create an async listener to handle tool calls and content updates + async def handle_content_update(content:str): + # Append the content to the last assistant message + for item in reversed(memory_cache): + if item.get('role') == 'assistant': + item['content'] = item['content'] + content + break + sentences, self.sentence_fragment = parse_sentences(self.sentence_fragment, content) + for s in sentences: + self._send_data(ten_env, s, False) + + async def handle_content_finished(full_content:str): + nonlocal completion_finished + completion_finished = True + content_finished_event.set() + + listener = AsyncEventEmitter() + listener.on("tool_call", lambda tool_call: logger.info(f"tool_call: {tool_call}")) + listener.on("content_update", handle_content_update) + 
listener.on("content_finished", handle_content_finished) + + # Make an async API call to get chat completions + await self.openai_chatgpt.get_chat_completions_stream(memory + [message], tools, listener) + + # Wait for the content to be finished + await content_finished_event.wait() + except asyncio.CancelledError: + logger.info(f"Task cancelled: {input_text}") except Exception as e: logger.error(f"Error in chat_completion: {traceback.format_exc()} for input text: {input_text}") - - async def _async_chat_completion_with_vision(self, ten: TenEnv, start_time, input_text, memory): - """Handles chat completion when a vision-based tool call is invoked.""" - try: - logger.info(f"for input text: [{input_text}] memory: {memory}") + finally: + self._send_data(ten_env, "", True) + # always append the memory + for m in memory_cache: + self._append_memory(m) + + # async def _async_chat_completion(self, ten: TenEnv, start_time, input_text, memory): + # """Async chat completion task to be run from on_data.""" + # try: + # logger.info(f"for input text: [{input_text}] memory: {memory}") + # message = {"role": "user", "content": input_text} + # tools = self.available_tools if self.enable_tools else None + + # # Make an async API call to get chat completions + # resp = await self.openai_chatgpt.get_chat_completions_stream(memory + [message], tools) + # if not resp: + # logger.error(f"get_chat_completions_stream Response is None: {input_text}") + # return + + # # Process the completions asynchronously + # await self._process_completions(resp, ten, start_time, input_text, memory) + # except Exception as e: + # logger.error(f"Error in chat_completion: {traceback.format_exc()} for input text: {input_text}") + + # async def _async_chat_completion_with_vision(self, ten: TenEnv, start_time, input_text, memory): + # """Handles chat completion when a vision-based tool call is invoked.""" + # try: + # logger.info(f"for input text: [{input_text}] memory: {memory}") - # Prepare the message with vision content - message = {"role": "user", "content": input_text} - if self.image_data is not None: - url = rgb2base64jpeg(self.image_data, self.image_width, self.image_height) - message = { - "role": "user", - "content": [ - {"type": "text", "text": input_text}, - {"type": "image_url", "image_url": {"url": url}}, - ], - } - logger.info(f"msg with vision data: {message}") - - # Asynchronous request to OpenAI for chat completions with vision - resp = await self.openai_chatgpt.get_chat_completions_stream(memory + [message]) - if not resp: - logger.error(f"get_chat_completions_stream Response is None for input text: {input_text}") - return - - # Process completions asynchronously - await self._process_completions(resp, ten, start_time, input_text, memory) - - except Exception as e: - logger.error(f"Error in chat_completion_with_vision: {str(e)} for input text: {input_text}") - - - async def _process_completions(self, chat_completions, ten, start_time, input_text, memory): - """Processes completions and sends them asynchronously.""" - sentence = "" - full_content = "" - first_sentence_sent = False - - async for chat_completion in chat_completions: - content = chat_completion.choices[0].delta.content if len(chat_completion.choices) > 0 and chat_completion.choices[0].delta.content else "" - - if chat_completion.choices[0].delta.tool_calls: - for tool_call in chat_completion.choices[0].delta.tool_calls: - logger.info(f"tool_call: {tool_call}") - if tool_call.function.name == "get_vision_image": - if not full_content and 
self.checking_vision_text_items: - await self._send_data(ten, random.choice(self.checking_vision_text_items), True, input_text) - await self._async_chat_completion_with_vision(ten, start_time, input_text, memory) - return - - full_content += content - - while True: - sentence, content, sentence_is_final = parse_sentence(sentence, content) - if len(sentence) == 0 or not sentence_is_final: - break - self._send_data(ten, sentence, False, input_text) - sentence = "" - - if not first_sentence_sent: - first_sentence_sent = True - logger.info(f"first sentence latency: {float(get_current_time() - start_time)/1000}ms") - - self._append_memory({"role": "user", "content": input_text}) - self._append_memory({"role": "assistant", "content": full_content}) - self._send_data(ten, sentence, True, input_text) + # # Prepare the message with vision content + # message = {"role": "user", "content": input_text} + # if self.image_data is not None: + # url = rgb2base64jpeg(self.image_data, self.image_width, self.image_height) + # message = { + # "role": "user", + # "content": [ + # {"type": "text", "text": input_text}, + # {"type": "image_url", "image_url": {"url": url}}, + # ], + # } + # logger.info(f"msg with vision data: {message}") + + # # Asynchronous request to OpenAI for chat completions with vision + # resp = await self.openai_chatgpt.get_chat_completions_stream(memory + [message]) + # if not resp: + # logger.error(f"get_chat_completions_stream Response is None for input text: {input_text}") + # return + + # # Process completions asynchronously + # await self._process_completions(resp, ten, start_time, input_text, memory) + + # except Exception as e: + # logger.error(f"Error in chat_completion_with_vision: {str(e)} for input text: {input_text}") + + + # async def _process_completions(self, chat_completions, ten, start_time, input_text, memory): + # """Processes completions and sends them asynchronously.""" + # sentence = "" + # full_content = "" + # first_sentence_sent = False + + # async for chat_completion in chat_completions: + # content = chat_completion.choices[0].delta.content if len(chat_completion.choices) > 0 and chat_completion.choices[0].delta.content else "" + + # if chat_completion.choices[0].delta.tool_calls: + # for tool_call in chat_completion.choices[0].delta.tool_calls: + # logger.info(f"tool_call: {tool_call}") + # if tool_call.function.name == "get_vision_image": + # if not full_content and self.checking_vision_text_items: + # await self._send_data(ten, random.choice(self.checking_vision_text_items), True, input_text) + # await self._async_chat_completion_with_vision(ten, start_time, input_text, memory) + # return + + # full_content += content + + # while True: + # sentence, content, sentence_is_final = parse_sentence(sentence, content) + # if len(sentence) == 0 or not sentence_is_final: + # break + # self._send_data(ten, sentence, False, input_text) + # sentence = "" + + # if not first_sentence_sent: + # first_sentence_sent = True + # logger.info(f"first sentence latency: {float(get_current_time() - start_time)/1000}ms") + + # self._append_memory({"role": "user", "content": input_text}) + # self._append_memory({"role": "assistant", "content": full_content}) + # self._send_data(ten, sentence, True, input_text) def _append_memory(self, message:str): if len(self.memory) > self.max_memory_length: self.memory.pop(0) self.memory.append(message) - def _send_data(self, ten_env: TenEnv, sentence: str, end_of_segment: bool, input_text: str): + def _send_data(self, ten_env: TenEnv, sentence: 
str, end_of_segment: bool): try: output_data = Data.create("text_data") output_data.set_property_string(DATA_OUT_TEXT_DATA_PROPERTY_TEXT, sentence) @@ -318,9 +370,9 @@ def _send_data(self, ten_env: TenEnv, sentence: str, end_of_segment: bool, input ) ten_env.send_data(output_data) logger.info( - f"for input text: [{input_text}] {'end of segment ' if end_of_segment else ''}sent sentence [{sentence}]" + f"{'end of segment ' if end_of_segment else ''}sent sentence [{sentence}]" ) except Exception as err: logger.info( - f"for input text: [{input_text}] send sentence [{sentence}] failed, err: {err}" + f"send sentence [{sentence}] failed, err: {err}" ) \ No newline at end of file diff --git a/agents/ten_packages/extension/openai_chatgpt_python/helper.py b/agents/ten_packages/extension/openai_chatgpt_python/helper.py index cc38ae0a..f5c91b57 100644 --- a/agents/ten_packages/extension/openai_chatgpt_python/helper.py +++ b/agents/ten_packages/extension/openai_chatgpt_python/helper.py @@ -60,20 +60,21 @@ def is_punctuation(char): return False -def parse_sentence(sentence, content): - remain = "" - found_punc = False - +def parse_sentences(sentence_fragment, content): + sentences = [] + current_sentence = sentence_fragment for char in content: - if not found_punc: - sentence += char - else: - remain += char + current_sentence += char + if is_punctuation(char): + # Check if the current sentence contains non-punctuation characters + stripped_sentence = current_sentence.strip() + if any(c.isalnum() for c in stripped_sentence): + sentences.append(stripped_sentence) + current_sentence = "" # Reset for the next sentence - if not found_punc and is_punctuation(char): - found_punc = True + remain = current_sentence # Any remaining characters form the incomplete sentence + return sentences, remain - return sentence, remain, found_punc def rgb2base64jpeg(rgb_data, width, height): diff --git a/agents/ten_packages/extension/openai_chatgpt_python/openai.py b/agents/ten_packages/extension/openai_chatgpt_python/openai.py index 11df4072..3449126d 100644 --- a/agents/ten_packages/extension/openai_chatgpt_python/openai.py +++ b/agents/ten_packages/extension/openai_chatgpt_python/openai.py @@ -1,3 +1,10 @@ +# +# +# Agora Real Time Engagement +# Created by Wei Hu in 2024-08. +# Copyright (c) 2024 Agora IO. All rights reserved. 
+# +# import random import requests from openai import AsyncOpenAI @@ -65,7 +72,7 @@ def __init__(self, config: OpenAIChatGPTConfig): self.session.proxies.update(proxies) self.client.session = self.session - async def get_chat_completions_stream(self, messages, tools = None): + async def get_chat_completions_stream(self, messages, tools = None, listener = None): req = { "model": self.config.model, "messages": [ @@ -87,6 +94,32 @@ async def get_chat_completions_stream(self, messages, tools = None): try: response = await self.client.chat.completions.create(**req) - return response except Exception as e: - raise Exception(f"CreateChatCompletionStream failed, err: {e}") \ No newline at end of file + raise Exception(f"CreateChatCompletionStream failed, err: {e}") + + full_content = "" + + async for chat_completion in response: + choice = chat_completion.choices[0] + delta = choice.delta + + content = delta.content if delta and delta.content else "" + + # Emit content update event (fire-and-forget) + if listener and content: + listener.emit('content_update', content) + + full_content += content + + # Check for tool calls + if delta.tool_calls: + for tool_call in delta.tool_calls: + logger.info(f"tool_call: {tool_call}") + + # Emit tool call event (fire-and-forget) + if listener: + listener.emit('tool_call', tool_call) + + # Emit content finished event after the loop completes + if listener: + listener.emit('content_finished', full_content) From 7e1a5aa31d707e035b569170f22426bccc6e1907 Mon Sep 17 00:00:00 2001 From: zhangqianze Date: Sat, 14 Sep 2024 12:38:30 +0800 Subject: [PATCH 10/13] feat: refactor openai, support multi data-stream data pack --- .../message_collector/src/extension.py | 54 ++++++-- .../extension/openai_chatgpt_python/helper.py | 2 +- playground/src/manager/rtc/rtc.ts | 118 +++++++++++------- 3 files changed, 117 insertions(+), 57 deletions(-) diff --git a/agents/ten_packages/extension/message_collector/src/extension.py b/agents/ten_packages/extension/message_collector/src/extension.py index 07ae315a..3ddfcda2 100644 --- a/agents/ten_packages/extension/message_collector/src/extension.py +++ b/agents/ten_packages/extension/message_collector/src/extension.py @@ -7,6 +7,7 @@ # import json import time +import uuid from ten import ( AudioFrame, VideoFrame, @@ -19,7 +20,8 @@ ) from .log import logger - +MAX_SIZE = 1024 # 1 KB limit +OVERHEAD_ESTIMATE = 200 # Estimate for the overhead of metadata in the JSON CMD_NAME_FLUSH = "flush" @@ -120,19 +122,55 @@ def on_data(self, ten_env: TenEnv, data: Data) -> None: cached_text_map[stream_id] = text - msg_data = json.dumps({ - "text": text, + # Generate a unique message ID for this batch of parts + message_id = str(uuid.uuid4()) + + # Prepare the main JSON structure without the text field + base_msg_data = { "is_final": end_of_segment, "stream_id": stream_id, + "message_id": message_id, # Add message_id to identify the split message "data_type": "transcribe", "text_ts": int(time.time() * 1000), # Convert to milliseconds - }) + } try: - # convert the origin text data to the protobuf data and send it to the graph. 
- ten_data = Data.create("data") - ten_data.set_property_buf("data", msg_data.encode()) - ten_env.send_data(ten_data) + # Check if the text + metadata exceeds 1 KB (1024 bytes) + text_bytes = text.encode('utf-8') + if len(text_bytes) + OVERHEAD_ESTIMATE <= MAX_SIZE: + # If it's within the limit, send it directly + base_msg_data["text"] = text + msg_data = json.dumps(base_msg_data) + ten_data = Data.create("data") + ten_data.set_property_buf("data", msg_data.encode()) + ten_env.send_data(ten_data) + else: + # Split the text into smaller chunks + max_text_size = MAX_SIZE - OVERHEAD_ESTIMATE + total_length = len(text_bytes) + total_parts = (total_length + max_text_size - 1) // max_text_size # Calculate the number of parts + logger.info(f"Splitting text into {total_parts} parts") + + for part_number in range(total_parts): + start_index = part_number * max_text_size + end_index = min(start_index + max_text_size, total_length) + text_part = text_bytes[start_index:end_index].decode('utf-8', errors='ignore') + + # Create a message for this part + part_data = base_msg_data.copy() + part_data.update({ + "text": text_part, + "is_final": False, # Parts are not final + "part_number": part_number + 1, + "total_parts": total_parts, + }) + + # Send each part + part_msg_data = json.dumps(part_data) + ten_data = Data.create("data") + ten_data.set_property_buf("data", part_msg_data.encode()) + ten_env.send_data(ten_data) + except Exception as e: logger.warning(f"on_data new_data error: {e}") return diff --git a/agents/ten_packages/extension/openai_chatgpt_python/helper.py b/agents/ten_packages/extension/openai_chatgpt_python/helper.py index f5c91b57..4c762c7d 100644 --- a/agents/ten_packages/extension/openai_chatgpt_python/helper.py +++ b/agents/ten_packages/extension/openai_chatgpt_python/helper.py @@ -67,7 +67,7 @@ def parse_sentences(sentence_fragment, content): current_sentence += char if is_punctuation(char): # Check if the current sentence contains non-punctuation characters - stripped_sentence = current_sentence.strip() + stripped_sentence = current_sentence if any(c.isalnum() for c in stripped_sentence): sentences.append(stripped_sentence) current_sentence = "" # Reset for the next sentence diff --git a/playground/src/manager/rtc/rtc.ts b/playground/src/manager/rtc/rtc.ts index 0439be78..75264e18 100644 --- a/playground/src/manager/rtc/rtc.ts +++ b/playground/src/manager/rtc/rtc.ts @@ -103,57 +103,79 @@ export class RtcManager extends AGEventEmitter { }) }) this.client.on("stream-message", (uid: UID, stream: any) => { - this._praseData(stream) + this._parseData(stream) }) } - private _praseData(data: any): ITextItem | void { - // @ts-ignore - // const textstream = protoRoot.Agora.SpeechToText.lookup("Text").decode(data) - // if (!textstream) { - // return console.warn("Prase data failed.") - // } - let decoder = new TextDecoder('utf-8') - let decodedMessage = decoder.decode(data) - - const textstream = JSON.parse(decodedMessage) - - console.log("[test] textstream raw data", JSON.stringify(textstream)) - const { stream_id, is_final, text, text_ts, data_type } = textstream - let textStr: string = "" - let isFinal = false - const textItem: ITextItem = {} as ITextItem - textItem.uid = stream_id - textItem.time = text_ts - // switch (dataType) { - // case "transcribe": - // words.forEach((word: any) => { - // textStr += word.text - // if (word.isFinal) { - // isFinal = true - // } - // }) - textItem.dataType = "transcribe" - // textItem.language = culture - textItem.text = text - textItem.isFinal = 
is_final
-    this.emit("textChanged", textItem)
-    // break
-    //   case "translate":
-    //     if (!trans?.length) {
-    //       return
-    //     }
-    //     trans.forEach((transItem: any) => {
-    //       textStr = transItem.texts.join("")
-    //       isFinal = !!transItem.isFinal
-    //       textItem.dataType = "translate"
-    //       textItem.language = transItem.lang
-    //       textItem.isFinal = isFinal
-    //       textItem.text = textStr
-    //       this.emit("textChanged", textItem)
-    //     })
-    //     break
-    // }
+  private _parseData(data: any): ITextItem | void {
+    let decoder = new TextDecoder('utf-8');
+    let decodedMessage = decoder.decode(data);
+    const textstream = JSON.parse(decodedMessage);
+
+    console.log("[test] textstream raw data", JSON.stringify(textstream));
+
+    const { stream_id, is_final, text, text_ts, data_type, message_id, part_number, total_parts } = textstream;
+
+    if (total_parts > 0) {
+      // If message is split, handle it accordingly
+      this._handleSplitMessage(message_id, part_number, total_parts, stream_id, is_final, text, text_ts);
+    } else {
+      // Otherwise, treat it as a complete, unsplit message
+      this._handleCompleteMessage(stream_id, is_final, text, text_ts);
+    }
+  }
+
+  private messageCache: { [key: string]: { parts: string[], totalParts: number } } = {};
+
+  /**
+   * Handle complete messages (not split).
+   */
+  private _handleCompleteMessage(stream_id: number, is_final: boolean, text: string, text_ts: number): void {
+    const textItem: ITextItem = {
+      uid: `${stream_id}`,
+      time: text_ts,
+      dataType: "transcribe",
+      text: text,
+      isFinal: is_final
+    };
+    this.emit("textChanged", textItem);
+  }
+
+  /**
+   * Handle split messages, track parts, and reassemble once all parts are received.
+   */
+  private _handleSplitMessage(
+    message_id: string,
+    part_number: number,
+    total_parts: number,
+    stream_id: number,
+    is_final: boolean,
+    text: string,
+    text_ts: number
+  ): void {
+    // Ensure the messageCache entry exists for this message_id
+    if (!this.messageCache[message_id]) {
+      this.messageCache[message_id] = { parts: [], totalParts: total_parts };
+    }
+
+    const cache = this.messageCache[message_id];
+
+    // Store the received part at the correct index (part_number starts from 1, so we use part_number - 1)
+    cache.parts[part_number - 1] = text;
+
+    // Check if all parts have been received
+    const receivedPartsCount = cache.parts.filter(part => part !== undefined).length;
+
+    if (receivedPartsCount === total_parts) {
+      // All parts have been received, reassemble the message
+      const fullText = cache.parts.join('');
+
+      // Now that the message is reassembled, handle it like a complete message
+      this._handleCompleteMessage(stream_id, is_final, fullText, text_ts);
+
+      // Remove the cached message since it is now fully processed
+      delete this.messageCache[message_id];
+    }
   }

From fa608ee919950a35f737f8348deb6f5c9080910b Mon Sep 17 00:00:00 2001
From: zhangqianze
Date: Sat, 14 Sep 2024 22:56:55 +0800
Subject: [PATCH 11/13] feat: finalize openai extension refactoring

- change asyncio.Queue to AsyncQueue
- change the way we abstract chatflow
- use eventEmitter for easier tool notification
- use queue to ensure tasks are processed one by one and are cancellable

---
 .../message_collector/src/extension.py        |  43 ++--
 .../extension/openai_chatgpt_python/events.py |  18 --
 .../openai_chatgpt_python/extension.py        | 201 ++++++------------
 .../extension/openai_chatgpt_python/helper.py |  54 ++++-
 playground/src/manager/rtc/rtc.ts             |   5 +-
 5 files changed, 157 insertions(+), 164 deletions(-)
 delete mode 100644
agents/ten_packages/extension/openai_chatgpt_python/events.py

diff --git a/agents/ten_packages/extension/message_collector/src/extension.py b/agents/ten_packages/extension/message_collector/src/extension.py
index 3ddfcda2..7013c432 100644
--- a/agents/ten_packages/extension/message_collector/src/extension.py
+++ b/agents/ten_packages/extension/message_collector/src/extension.py
@@ -20,7 +20,7 @@ )
 from .log import logger
 
-MAX_SIZE = 1024 # 1 KB limit
+MAX_SIZE = 800 # 800-byte cap, leaving headroom below the 1 KB data packet limit
 OVERHEAD_ESTIMATE = 200 # Estimate for the overhead of metadata in the JSON
 
 CMD_NAME_FLUSH = "flush"
@@ -135,33 +135,47 @@ def on_data(self, ten_env: TenEnv, data: Data) -> None:
         }
 
         try:
-            # Check if the text + metadata exceeds 1 KB (1024 bytes)
+            # Convert the text to UTF-8 bytes
             text_bytes = text.encode('utf-8')
+
+            # If the text + metadata fits within the size limit, send it directly
             if len(text_bytes) + OVERHEAD_ESTIMATE <= MAX_SIZE:
-                # If it's within the limit, send it directly
                 base_msg_data["text"] = text
                 msg_data = json.dumps(base_msg_data)
                 ten_data = Data.create("data")
                 ten_data.set_property_buf("data", msg_data.encode())
                 ten_env.send_data(ten_data)
             else:
-                # Split the text into smaller chunks
+                # Split the text bytes into smaller chunks, ensuring safe UTF-8 splitting
                 max_text_size = MAX_SIZE - OVERHEAD_ESTIMATE
                 total_length = len(text_bytes)
-                total_parts = (total_length + max_text_size - 1) // max_text_size # Calculate the number of parts
-                logger.info(f"Splitting text into {total_parts} parts")
+                total_parts = (total_length + max_text_size - 1) // max_text_size # Calculate number of parts
 
-                for part_number in range(total_parts):
-                    start_index = part_number * max_text_size
-                    end_index = min(start_index + max_text_size, total_length)
-                    text_part = text_bytes[start_index:end_index].decode('utf-8', errors='ignore')
+                def get_valid_utf8_chunk(start, end):
+                    """Helper function to ensure valid UTF-8 chunks."""
+                    while end > start:
+                        try:
+                            # Decode to check if this chunk is valid UTF-8
+                            text_part = text_bytes[start:end].decode('utf-8')
+                            return text_part, end
+                        except UnicodeDecodeError:
+                            # Reduce the end point to avoid splitting in the middle of a character
+                            end -= 1
+                    # If no valid chunk is found (shouldn't happen with valid UTF-8 input), return an empty string
+                    return "", start
+
+                part_number = 0
+                start_index = 0
+                while start_index < total_length:
+                    part_number += 1
+                    # Get a valid UTF-8 chunk
+                    text_part, end_index = get_valid_utf8_chunk(start_index, min(start_index + max_text_size, total_length))
 
-                    # Create a message for this part
+                    # Prepare the part data with metadata
                     part_data = base_msg_data.copy()
                     part_data.update({
                         "text": text_part,
-                        "is_final": False, # Parts are not final
-                        "part_number": part_number + 1,
+                        "part_number": part_number,
                         "total_parts": total_parts,
                     })
 
@@ -171,6 +185,9 @@ def on_data(self, ten_env: TenEnv, data: Data) -> None:
                     ten_data.set_property_buf("data", part_msg_data.encode())
                     ten_env.send_data(ten_data)
 
+                    # Move to the next chunk
+                    start_index = end_index
+
         except Exception as e:
             logger.warning(f"on_data new_data error: {e}")
             return
diff --git a/agents/ten_packages/extension/openai_chatgpt_python/events.py b/agents/ten_packages/extension/openai_chatgpt_python/events.py
deleted file mode 100644
index b063bf8b..00000000
--- a/agents/ten_packages/extension/openai_chatgpt_python/events.py
+++ /dev/null
@@ -1,18 +0,0 @@
-import asyncio
-
-
-class AsyncEventEmitter:
-    def __init__(self):
-        self.listeners = {}
-
-    def on(self, event_name, listener):
-        """Register an event
listener.""" - if event_name not in self.listeners: - self.listeners[event_name] = [] - self.listeners[event_name].append(listener) - - def emit(self, event_name, *args, **kwargs): - """Fire the event without waiting for listeners to finish.""" - if event_name in self.listeners: - for listener in self.listeners[event_name]: - asyncio.create_task(listener(*args, **kwargs)) \ No newline at end of file diff --git a/agents/ten_packages/extension/openai_chatgpt_python/extension.py b/agents/ten_packages/extension/openai_chatgpt_python/extension.py index 549f28f3..7fe67c73 100644 --- a/agents/ten_packages/extension/openai_chatgpt_python/extension.py +++ b/agents/ten_packages/extension/openai_chatgpt_python/extension.py @@ -11,8 +11,7 @@ import threading import traceback -from .events import AsyncEventEmitter -from .helper import get_current_time, get_property_bool, get_property_float, get_property_int, get_property_string, parse_sentences, rgb2base64jpeg +from .helper import AsyncEventEmitter, AsyncQueue, get_current_time, get_property_bool, get_property_float, get_property_int, get_property_string, parse_sentences, rgb2base64jpeg from .openai import OpenAIChatGPT, OpenAIChatGPTConfig from ten import ( AudioFrame, @@ -49,6 +48,9 @@ PROPERTY_CHECKING_VISION_TEXT_ITEMS = "checking_vision_text_items" # Optional +TASK_TYPE_CHAT_COMPLETION = "chat_completion" +TASK_TYPE_CHAT_COMPLETION_WITH_VISION = "chat_completion_with_vision" + class OpenAIChatGPTExtension(Extension): memory = [] max_memory_length = 10 @@ -62,7 +64,7 @@ class OpenAIChatGPTExtension(Extension): sentence_fragment = "" # Create the queue for message processing - queue = asyncio.Queue() + queue = AsyncQueue() available_tools = [ { @@ -185,7 +187,7 @@ def on_data(self, ten_env: TenEnv, data: Data) -> None: logger.info(f"OnData input text: [{input_text}]") # Start an asynchronous task for handling chat completion - asyncio.run_coroutine_threadsafe(self.queue.put(input_text), self.loop) + asyncio.run_coroutine_threadsafe(self.queue.put([TASK_TYPE_CHAT_COMPLETION, input_text]), self.loop) def on_audio_frame(self, ten_env: TenEnv, audio_frame: AudioFrame) -> None: # TODO: process pcm frame @@ -202,66 +204,87 @@ async def _process_queue(self, ten_env: TenEnv): """Asynchronously process queue items one by one.""" while True: # Wait for an item to be available in the queue - message = await self.queue.get() + [task_type, message] = await self.queue.get() try: # Create a new task for the new message - self.current_task = asyncio.create_task(self._run_chatflow(ten_env, message, self.memory)) + self.current_task = asyncio.create_task(self._run_chatflow(ten_env, task_type, message, self.memory)) await self.current_task # Wait for the current task to finish or be cancelled except asyncio.CancelledError: logger.info(f"Task cancelled: {message}") - finally: - self.queue.task_done() async def _flush_queue(self): """Flushes the self.queue and cancels the current task.""" - while not self.queue.empty(): - self.queue.get_nowait() - self.queue.task_done() + # Flush the queue using the new flush method + await self.queue.flush() + + # Cancel the current task if one is running if self.current_task: + logger.info("Cancelling the current task during flush.") self.current_task.cancel() - async def _run_chatflow(self, ten_env: TenEnv, input_text: str, memory): + async def _run_chatflow(self, ten_env: TenEnv, task_type:str, input_text: str, memory): """Run the chatflow asynchronously.""" - completion_finished = False memory_cache = [] try: - while not 
completion_finished: - logger.info(f"for input text: [{input_text}] memory: {memory}") + logger.info(f"for input text: [{input_text}] memory: {memory}") + message = None + tools = None + + # Prepare the message and tools based on the task type + if task_type == TASK_TYPE_CHAT_COMPLETION: message = {"role": "user", "content": input_text} memory_cache = memory_cache + [message, {"role": "assistant", "content": ""}] tools = self.available_tools if self.enable_tools else None - - self.sentence_fragment = "" - - # Create an asyncio.Event to signal when content is finished - content_finished_event = asyncio.Event() - - # Create an async listener to handle tool calls and content updates - async def handle_content_update(content:str): - # Append the content to the last assistant message - for item in reversed(memory_cache): - if item.get('role') == 'assistant': - item['content'] = item['content'] + content - break - sentences, self.sentence_fragment = parse_sentences(self.sentence_fragment, content) - for s in sentences: - self._send_data(ten_env, s, False) - - async def handle_content_finished(full_content:str): - nonlocal completion_finished - completion_finished = True - content_finished_event.set() - - listener = AsyncEventEmitter() - listener.on("tool_call", lambda tool_call: logger.info(f"tool_call: {tool_call}")) - listener.on("content_update", handle_content_update) - listener.on("content_finished", handle_content_finished) - - # Make an async API call to get chat completions - await self.openai_chatgpt.get_chat_completions_stream(memory + [message], tools, listener) - - # Wait for the content to be finished - await content_finished_event.wait() + elif task_type == TASK_TYPE_CHAT_COMPLETION_WITH_VISION: + message = {"role": "user", "content": input_text} + memory_cache = memory_cache + [message, {"role": "assistant", "content": ""}] + tools = self.available_tools if self.enable_tools else None + if self.image_data is not None: + url = rgb2base64jpeg(self.image_data, self.image_width, self.image_height) + message = { + "role": "user", + "content": [ + {"type": "text", "text": input_text}, + {"type": "image_url", "image_url": {"url": url}}, + ], + } + logger.info(f"msg with vision data: {message}") + + + self.sentence_fragment = "" + + # Create an asyncio.Event to signal when content is finished + content_finished_event = asyncio.Event() + + # Create an async listener to handle tool calls and content updates + async def handle_tool_call(tool_call): + logger.info(f"tool_call: {tool_call}") + if tool_call.function.name == "get_vision_image": + self.queue._queue.appendleft([TASK_TYPE_CHAT_COMPLETION_WITH_VISION, input_text]) + + async def handle_content_update(content:str): + # Append the content to the last assistant message + for item in reversed(memory_cache): + if item.get('role') == 'assistant': + item['content'] = item['content'] + content + break + sentences, self.sentence_fragment = parse_sentences(self.sentence_fragment, content) + for s in sentences: + self._send_data(ten_env, s, False) + + async def handle_content_finished(full_content:str): + content_finished_event.set() + + listener = AsyncEventEmitter() + listener.on("tool_call", handle_tool_call) + listener.on("content_update", handle_content_update) + listener.on("content_finished", handle_content_finished) + + # Make an async API call to get chat completions + await self.openai_chatgpt.get_chat_completions_stream(memory + [message], tools, listener) + + # Wait for the content to be finished + await content_finished_event.wait() 
except asyncio.CancelledError: logger.info(f"Task cancelled: {input_text}") except Exception as e: @@ -272,90 +295,6 @@ async def handle_content_finished(full_content:str): for m in memory_cache: self._append_memory(m) - # async def _async_chat_completion(self, ten: TenEnv, start_time, input_text, memory): - # """Async chat completion task to be run from on_data.""" - # try: - # logger.info(f"for input text: [{input_text}] memory: {memory}") - # message = {"role": "user", "content": input_text} - # tools = self.available_tools if self.enable_tools else None - - # # Make an async API call to get chat completions - # resp = await self.openai_chatgpt.get_chat_completions_stream(memory + [message], tools) - # if not resp: - # logger.error(f"get_chat_completions_stream Response is None: {input_text}") - # return - - # # Process the completions asynchronously - # await self._process_completions(resp, ten, start_time, input_text, memory) - # except Exception as e: - # logger.error(f"Error in chat_completion: {traceback.format_exc()} for input text: {input_text}") - - # async def _async_chat_completion_with_vision(self, ten: TenEnv, start_time, input_text, memory): - # """Handles chat completion when a vision-based tool call is invoked.""" - # try: - # logger.info(f"for input text: [{input_text}] memory: {memory}") - - # # Prepare the message with vision content - # message = {"role": "user", "content": input_text} - # if self.image_data is not None: - # url = rgb2base64jpeg(self.image_data, self.image_width, self.image_height) - # message = { - # "role": "user", - # "content": [ - # {"type": "text", "text": input_text}, - # {"type": "image_url", "image_url": {"url": url}}, - # ], - # } - # logger.info(f"msg with vision data: {message}") - - # # Asynchronous request to OpenAI for chat completions with vision - # resp = await self.openai_chatgpt.get_chat_completions_stream(memory + [message]) - # if not resp: - # logger.error(f"get_chat_completions_stream Response is None for input text: {input_text}") - # return - - # # Process completions asynchronously - # await self._process_completions(resp, ten, start_time, input_text, memory) - - # except Exception as e: - # logger.error(f"Error in chat_completion_with_vision: {str(e)} for input text: {input_text}") - - - # async def _process_completions(self, chat_completions, ten, start_time, input_text, memory): - # """Processes completions and sends them asynchronously.""" - # sentence = "" - # full_content = "" - # first_sentence_sent = False - - # async for chat_completion in chat_completions: - # content = chat_completion.choices[0].delta.content if len(chat_completion.choices) > 0 and chat_completion.choices[0].delta.content else "" - - # if chat_completion.choices[0].delta.tool_calls: - # for tool_call in chat_completion.choices[0].delta.tool_calls: - # logger.info(f"tool_call: {tool_call}") - # if tool_call.function.name == "get_vision_image": - # if not full_content and self.checking_vision_text_items: - # await self._send_data(ten, random.choice(self.checking_vision_text_items), True, input_text) - # await self._async_chat_completion_with_vision(ten, start_time, input_text, memory) - # return - - # full_content += content - - # while True: - # sentence, content, sentence_is_final = parse_sentence(sentence, content) - # if len(sentence) == 0 or not sentence_is_final: - # break - # self._send_data(ten, sentence, False, input_text) - # sentence = "" - - # if not first_sentence_sent: - # first_sentence_sent = True - # logger.info(f"first sentence 
latency: {float(get_current_time() - start_time)/1000}ms") - - # self._append_memory({"role": "user", "content": input_text}) - # self._append_memory({"role": "assistant", "content": full_content}) - # self._send_data(ten, sentence, True, input_text) - def _append_memory(self, message:str): if len(self.memory) > self.max_memory_length: self.memory.pop(0) diff --git a/agents/ten_packages/extension/openai_chatgpt_python/helper.py b/agents/ten_packages/extension/openai_chatgpt_python/helper.py index 4c762c7d..12f3a7bf 100644 --- a/agents/ten_packages/extension/openai_chatgpt_python/helper.py +++ b/agents/ten_packages/extension/openai_chatgpt_python/helper.py @@ -5,6 +5,8 @@ # Copyright (c) 2024 Agora IO. All rights reserved. # # +import asyncio +from collections import deque from ten.data import Data from .log import logger from PIL import Image @@ -132,4 +134,54 @@ def resize_image_keep_aspect(image, max_size=512): # Resize the image with the new dimensions resized_image = image.resize((new_width, new_height)) - return resized_image \ No newline at end of file + return resized_image + + +class AsyncEventEmitter: + def __init__(self): + self.listeners = {} + + def on(self, event_name, listener): + """Register an event listener.""" + if event_name not in self.listeners: + self.listeners[event_name] = [] + self.listeners[event_name].append(listener) + + def emit(self, event_name, *args, **kwargs): + """Fire the event without waiting for listeners to finish.""" + if event_name in self.listeners: + for listener in self.listeners[event_name]: + asyncio.create_task(listener(*args, **kwargs)) + + +class AsyncQueue: + def __init__(self): + self._queue = deque() # Use deque for efficient prepend and append + self._condition = asyncio.Condition() # Use Condition to manage access + + async def put(self, item, prepend=False): + """Add an item to the queue (prepend if specified).""" + async with self._condition: + if prepend: + self._queue.appendleft(item) # Prepend item to the front + else: + self._queue.append(item) # Append item to the back + self._condition.notify() # Notify consumers that a new item is available + + async def get(self): + """Remove and return an item from the queue.""" + async with self._condition: + while not self._queue: + await self._condition.wait() # Wait until an item is available + return self._queue.popleft() # Pop from the front of the deque + + async def flush(self): + """Flush all items from the queue.""" + async with self._condition: + while self._queue: + self._queue.popleft() # Clear the queue + self._condition.notify_all() # Notify all consumers that the queue is empty + + def __len__(self): + """Return the current size of the queue.""" + return len(self._queue) \ No newline at end of file diff --git a/playground/src/manager/rtc/rtc.ts b/playground/src/manager/rtc/rtc.ts index 75264e18..4139d89e 100644 --- a/playground/src/manager/rtc/rtc.ts +++ b/playground/src/manager/rtc/rtc.ts @@ -138,7 +138,10 @@ export class RtcManager extends AGEventEmitter { text: text, isFinal: is_final }; - this.emit("textChanged", textItem); + + if (text.trim().length > 0) { + this.emit("textChanged", textItem); + } } /** From a38ef0d10482a105f6263ec5c6252c91de79eac5 Mon Sep 17 00:00:00 2001 From: zhangqianze Date: Sat, 14 Sep 2024 23:02:49 +0800 Subject: [PATCH 12/13] feat: add docs --- .../extension/openai_chatgpt_python/README.md | 61 ++++++++++++++----- 1 file changed, 46 insertions(+), 15 deletions(-) diff --git a/agents/ten_packages/extension/openai_chatgpt_python/README.md 
b/agents/ten_packages/extension/openai_chatgpt_python/README.md
index 6bb67d8e..2a9b1c82 100644
--- a/agents/ten_packages/extension/openai_chatgpt_python/README.md
+++ b/agents/ten_packages/extension/openai_chatgpt_python/README.md
@@ -1,12 +1,15 @@
 # openai_chatgpt_python
 
-
+An extension for integrating OpenAI's GPT models (e.g., GPT-4) into your application, providing configurable AI-driven features such as conversational agents, task automation, and tool integration.
 
 ## Features
 
-- xxx feature
+- OpenAI GPT Integration: Leverage GPT models for text processing and conversational tasks.
+- Configurable: Easily customize API keys, model settings, prompts, temperature, etc.
+- Async Queue Processing: Supports real-time message processing with task cancellation and prioritization.
+- Tool Support: Integrate external tools like image recognition via OpenAI's API.
 
 ## API
 
 Refer to `api` definition in [manifest.json] and default values in [property.json].
 
-## Development
-
-### Build
-
-
-
-### Unit test
-
-
-
-## Misc
-
-
+| **Property** | **Type** | **Description** |
+|----------------------------|------------|-------------------------------------------|
+| `api_key` | `string` | API key for authenticating with OpenAI |
+| `frequency_penalty` | `float64` | Controls how much to penalize new tokens based on their existing frequency in the text so far |
+| `presence_penalty` | `float64` | Controls how much to penalize new tokens based on whether they appear in the text so far |
+| `temperature` | `float64` | Sampling temperature, higher values mean more randomness |
+| `top_p` | `float64` | Nucleus sampling, chooses tokens with cumulative probability `p` |
+| `model` | `string` | Model identifier (e.g., GPT-3.5, GPT-4) |
+| `max_tokens` | `int64` | Maximum number of tokens to generate |
+| `base_url` | `string` | API base URL |
+| `prompt` | `string` | Default prompt to send to the model |
+| `greeting` | `string` | Greeting message to be used |
+| `checking_vision_text_items`| `string` | Items for checking vision-based text responses |
+| `proxy_url` | `string` | URL of the proxy server |
+| `max_memory_length` | `int64` | Maximum memory length for processing |
+| `enable_tools` | `bool` | Flag to enable or disable external tools |
+
+### Data In:
+| **Name** | **Property** | **Type** | **Description** |
+|----------------|--------------|------------|-------------------------------|
+| `text_data` | `text` | `string` | Incoming text data |
+
+### Data Out:
+| **Name** | **Property** | **Type** | **Description** |
+|----------------|--------------|------------|-------------------------------|
+| `text_data` | `text` | `string` | Outgoing text data |
+
+### Command In:
+| **Name** | **Description** |
+|----------------|---------------------------------------------|
+| `flush` | Command to flush the current processing state |
+
+### Command Out:
+| **Name** | **Description** |
+|----------------|---------------------------------------------|
+| `flush` | Response after flushing the current state |
+
+### Video Frame In:
+| **Name** | **Description** |
+|------------------|-------------------------------------------|
+| `video_frame` | Video frame input for vision processing |

From 46bffdac264bf7b9d5a9c7d2a16b10129fcf13ca Mon Sep 17 00:00:00 2001
From: zhangqianze
Date: Sat, 14 Sep 2024 23:11:42 +0800
Subject: [PATCH 13/13] feat: don't use private api

---
 .../ten_packages/extension/openai_chatgpt_python/extension.py | 3 ++-
 agents/ten_packages/extension/openai_chatgpt_python/helper.py | 2 +-
 2 files changed, 3 insertions(+), 2 deletions(-)

diff --git a/agents/ten_packages/extension/openai_chatgpt_python/extension.py b/agents/ten_packages/extension/openai_chatgpt_python/extension.py
index 7fe67c73..5d027267 100644
--- a/agents/ten_packages/extension/openai_chatgpt_python/extension.py
+++ b/agents/ten_packages/extension/openai_chatgpt_python/extension.py
@@ -260,7 +260,8 @@ async def _run_chatflow(self, ten_env: TenEnv, task_type:str, input_text: str, m
         async def handle_tool_call(tool_call):
             logger.info(f"tool_call: {tool_call}")
             if tool_call.function.name == "get_vision_image":
-                self.queue._queue.appendleft([TASK_TYPE_CHAT_COMPLETION_WITH_VISION, input_text])
+                # Re-queue the input as a prioritized vision task via the queue's public API
+                await self.queue.put([TASK_TYPE_CHAT_COMPLETION_WITH_VISION, input_text], True)
diff --git a/agents/ten_packages/extension/openai_chatgpt_python/helper.py b/agents/ten_packages/extension/openai_chatgpt_python/helper.py
index 12f3a7bf..28c28f19 100644
--- a/agents/ten_packages/extension/openai_chatgpt_python/helper.py
+++ b/agents/ten_packages/extension/openai_chatgpt_python/helper.py
@@ -166,7 +166,7 @@ async def put(self, item, prepend=False):
             self._queue.appendleft(item) # Prepend item to the front
         else:
             self._queue.append(item) # Append item to the back
-            self._condition.notify() # Notify consumers that a new item is available
+            self._condition.notify()

    async def get(self):
        """Remove and return an item from the queue."""
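
Taken together, patches 11-13 funnel every request through one consumer: on_data enqueues a [task_type, text] pair, _process_queue pops one item at a time and runs it as a cancellable task, and a get_vision_image tool call re-enters the queue at the front via put(..., True). Below is a minimal, self-contained sketch of that ordering outside the TEN runtime; the AsyncQueue body mirrors helper.py as of patch 13, while the driver (main and the sample payloads) is illustrative only and not part of the change set.

import asyncio
from collections import deque


class AsyncQueue:
    # Condensed restatement of the queue added in helper.py (patches 11/13).
    def __init__(self):
        self._queue = deque()
        self._condition = asyncio.Condition()

    async def put(self, item, prepend=False):
        async with self._condition:
            if prepend:
                self._queue.appendleft(item)  # vision follow-ups jump the line
            else:
                self._queue.append(item)
            self._condition.notify()

    async def get(self):
        async with self._condition:
            while not self._queue:
                await self._condition.wait()
            return self._queue.popleft()

    async def flush(self):
        async with self._condition:
            while self._queue:
                self._queue.popleft()
            self._condition.notify_all()

    def __len__(self):
        return len(self._queue)


async def main():
    queue = AsyncQueue()
    # Two ordinary chat requests, then a prioritized vision follow-up,
    # mirroring handle_tool_call's put(..., True) in patch 13.
    await queue.put(["chat_completion", "hello"])
    await queue.put(["chat_completion", "how are you"])
    await queue.put(["chat_completion_with_vision", "hello"], prepend=True)

    while len(queue):
        task_type, text = await queue.get()
        print(task_type, text)
    # Prints the vision task first, then the two chat tasks in FIFO order.


asyncio.run(main())

Because _flush_queue empties the deque and cancels the in-flight task, a flush command interrupts both queued and running work without tearing down the consumer loop.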