From 475e0ab2d9f332961745dda7f5c31055e81553c9 Mon Sep 17 00:00:00 2001
From: Ethan Zhang
Date: Sun, 11 Aug 2024 14:01:58 +0800
Subject: [PATCH] feat: support vision for openai (#137)

* feat: support vision for openai

* feat: add tool support

* updates

* feat: update property.json.example

* feat: finalize camera video feature

* feat: update property.json.example

* feat: finalize camera change

* fix: avoid duplicate memory appending

* fix: adjust comments
---
 .../openai_chatgpt_python/manifest.json       |  10 +-
 .../openai_chatgpt_python/openai_chatgpt.py   |   3 +-
 .../openai_chatgpt_extension.py               | 296 ++++++++++++------
 .../openai_chatgpt_python/requirements.txt    |   4 +-
 agents/property.json.example                  | 233 ++++++++++++++
 5 files changed, 442 insertions(+), 104 deletions(-)

diff --git a/agents/addon/extension/openai_chatgpt_python/manifest.json b/agents/addon/extension/openai_chatgpt_python/manifest.json
index 065f9a94..8193a637 100644
--- a/agents/addon/extension/openai_chatgpt_python/manifest.json
+++ b/agents/addon/extension/openai_chatgpt_python/manifest.json
@@ -47,6 +47,9 @@
     },
     "max_memory_length": {
       "type": "int64"
+    },
+    "enable_tools": {
+      "type": "bool"
     }
   },
   "data_in": [
@@ -78,6 +81,11 @@
     {
       "name": "flush"
     }
+  ],
+  "image_frame_in": [
+    {
+      "name": "image_frame"
+    }
   ]
 }
-}
\ No newline at end of file
+}
diff --git a/agents/addon/extension/openai_chatgpt_python/openai_chatgpt.py b/agents/addon/extension/openai_chatgpt_python/openai_chatgpt.py
index 5ad5b6cc..7f2a659c 100644
--- a/agents/addon/extension/openai_chatgpt_python/openai_chatgpt.py
+++ b/agents/addon/extension/openai_chatgpt_python/openai_chatgpt.py
@@ -65,7 +65,7 @@ def __init__(self, config: OpenAIChatGPTConfig):
             self.session.proxies.update(proxies)
             self.client.session = self.session
 
-    def get_chat_completions_stream(self, messages):
+    def get_chat_completions_stream(self, messages, tools=None):
         req = {
             "model": self.config.model,
             "messages": [
@@ -75,6 +75,7 @@ def get_chat_completions_stream(self, messages):
                 },
                 *messages,
             ],
+            "tools": tools,
             "temperature": self.config.temperature,
             "top_p": self.config.top_p,
             "presence_penalty": self.config.presence_penalty,
diff --git a/agents/addon/extension/openai_chatgpt_python/openai_chatgpt_extension.py b/agents/addon/extension/openai_chatgpt_python/openai_chatgpt_extension.py
index 7dec2073..e8f670e9 100644
--- a/agents/addon/extension/openai_chatgpt_python/openai_chatgpt_extension.py
+++ b/agents/addon/extension/openai_chatgpt_python/openai_chatgpt_extension.py
@@ -5,6 +5,8 @@
 # Copyright (c) 2024 Agora IO. All rights reserved.
 #
 #
+import traceback
+from rte.image_frame import ImageFrame
 from .openai_chatgpt import OpenAIChatGPT, OpenAIChatGPTConfig
 from datetime import datetime
 from threading import Thread
@@ -20,6 +22,10 @@
     MetadataInfo,
 )
 from .log import logger
+from base64 import b64encode
+import numpy as np
+from io import BytesIO
+from PIL import Image
 
 
 CMD_IN_FLUSH = "flush"
@@ -39,6 +45,7 @@
 PROPERTY_TOP_P = "top_p"  # Optional
 PROPERTY_MAX_TOKENS = "max_tokens"  # Optional
 PROPERTY_GREETING = "greeting"  # Optional
+PROPERTY_ENABLE_TOOLS = "enable_tools"  # Optional
 PROPERTY_PROXY_URL = "proxy_url"  # Optional
 PROPERTY_MAX_MEMORY_LENGTH = "max_memory_length"  # Optional
@@ -72,12 +79,81 @@ def parse_sentence(sentence, content):
 
     return sentence, remain, found_punc
 
 
+def rgb2base64jpeg(rgb_data, width, height):
+    # The incoming frame buffer is RGBA; load it and drop the alpha channel,
+    # since JPEG does not support transparency
+    pil_image = Image.frombytes('RGBA', (width, height), bytes(rgb_data))
+    pil_image = pil_image.convert('RGB')
+
+    # Resize the image while maintaining its aspect ratio
+    pil_image = resize_image_keep_aspect(pil_image, 320)
+
+    # Save the image to a BytesIO object in JPEG format
+    buffered = BytesIO()
+    pil_image.save(buffered, format="JPEG")
+    # Debug aid: uncomment to dump the frame to disk
+    # pil_image.save("test.jpg", format="JPEG")
+
+    # Get the byte data of the JPEG image
+    jpeg_image_data = buffered.getvalue()
+
+    # Convert the JPEG byte data to a Base64 encoded string
+    base64_encoded_image = b64encode(jpeg_image_data).decode('utf-8')
+
+    # Create the data URL
+    mime_type = 'image/jpeg'
+    base64_url = f"data:{mime_type};base64,{base64_encoded_image}"
+    return base64_url
+
+
+def resize_image_keep_aspect(image, max_size=512):
+    """
+    Resize an image while maintaining its aspect ratio, ensuring the larger dimension is max_size.
+    If both dimensions are smaller than max_size, the image is not resized.
+
+    :param image: A PIL Image object
+    :param max_size: The maximum size for the larger dimension (width or height)
+    :return: A PIL Image object (resized or original)
+    """
+    # Get current width and height
+    width, height = image.size
+
+    # If both dimensions are already smaller than max_size, return the original image
+    if width <= max_size and height <= max_size:
+        return image
+
+    # Calculate the aspect ratio
+    aspect_ratio = width / height
+
+    # Determine the new dimensions
+    if width > height:
+        new_width = max_size
+        new_height = int(max_size / aspect_ratio)
+    else:
+        new_height = max_size
+        new_width = int(max_size * aspect_ratio)
+
+    # Resize the image with the new dimensions
+    resized_image = image.resize((new_width, new_height))
+
+    return resized_image
+
 
 class OpenAIChatGPTExtension(Extension):
     memory = []
     max_memory_length = 10
     outdate_ts = 0
     openai_chatgpt = None
+    enable_tools = False
+    image_data = None
+    image_width = 0
+    image_height = 0
+
+    available_tools = [{
+        "type": "function",
+        "function": {
+            # Use gpt-4o or a later model for image recognition; gpt-4o-mini does not handle this case well
+            "name": "get_vision_image",
+            "description": "Get the image from camera. Call this whenever you need to understand the input camera image like you have vision capability, for example when user asks 'What can you see?' 
or 'Can you see me?'", + }, + "strict": True, + }] def on_start(self, rte: RteEnv) -> None: logger.info("OpenAIChatGPTExtension on_start") @@ -162,6 +238,11 @@ def on_start(self, rte: RteEnv) -> None: except Exception as err: logger.info(f"GetProperty optional {PROPERTY_GREETING} failed, err: {err}") + try: + self.enable_tools = rte.get_property_bool(PROPERTY_ENABLE_TOOLS) + except Exception as err: + logger.info(f"GetProperty optional {PROPERTY_ENABLE_TOOLS} failed, err: {err}") + try: prop_max_memory_length = rte.get_property_int(PROPERTY_MAX_MEMORY_LENGTH) if prop_max_memory_length > 0: @@ -200,6 +281,11 @@ def on_stop(self, rte: RteEnv) -> None: logger.info("OpenAIChatGPTExtension on_stop") rte.on_stop_done() + def append_memory(self, message): + if len(self.memory) > self.max_memory_length: + self.memory.pop(0) + self.memory.append(message) + def on_cmd(self, rte: RteEnv, cmd: Cmd) -> None: logger.info("OpenAIChatGPTExtension on_cmd") cmd_json = cmd.to_json() @@ -223,6 +309,13 @@ def on_cmd(self, rte: RteEnv, cmd: Cmd) -> None: cmd_result.set_property_string("detail", "success") rte.return_result(cmd_result, cmd) + def on_image_frame(self, rte_env: RteEnv, image_frame: ImageFrame) -> None: + # logger.info(f"OpenAIChatGPTExtension on_image_frame {image_frame.get_width()} {image_frame.get_height()}") + self.image_data = image_frame.get_buf() + self.image_width = image_frame.get_width() + self.image_height = image_frame.get_height() + return + def on_data(self, rte: RteEnv, data: Data) -> None: """ on_data receives data from rte graph. @@ -258,108 +351,8 @@ def on_data(self, rte: RteEnv, data: Data) -> None: ) return - # Prepare memory - if len(self.memory) > self.max_memory_length: - self.memory.pop(0) - self.memory.append({"role": "user", "content": input_text}) - def chat_completions_stream_worker(start_time, input_text, memory): - try: - logger.info( - f"GetChatCompletionsStream for input text: [{input_text}] memory: {memory}" - ) - - # Get result from AI - resp = self.openai_chatgpt.get_chat_completions_stream(memory) - if resp is None: - logger.info( - f"GetChatCompletionsStream for input text: [{input_text}] failed" - ) - return - - sentence = "" - full_content = "" - first_sentence_sent = False - - for chat_completions in resp: - if start_time < self.outdate_ts: - logger.info( - f"GetChatCompletionsStream recv interrupt and flushing for input text: [{input_text}], startTs: {start_time}, outdateTs: {self.outdate_ts}" - ) - break - - if ( - len(chat_completions.choices) > 0 - and chat_completions.choices[0].delta.content is not None - ): - content = chat_completions.choices[0].delta.content - else: - content = "" - - full_content += content - - while True: - sentence, content, sentence_is_final = parse_sentence( - sentence, content - ) - if len(sentence) == 0 or not sentence_is_final: - logger.info(f"sentence {sentence} is empty or not final") - break - logger.info( - f"GetChatCompletionsStream recv for input text: [{input_text}] got sentence: [{sentence}]" - ) - - # send sentence - try: - output_data = Data.create("text_data") - output_data.set_property_string( - DATA_OUT_TEXT_DATA_PROPERTY_TEXT, sentence - ) - output_data.set_property_bool( - DATA_OUT_TEXT_DATA_PROPERTY_TEXT_END_OF_SEGMENT, False - ) - rte.send_data(output_data) - logger.info( - f"GetChatCompletionsStream recv for input text: [{input_text}] sent sentence [{sentence}]" - ) - except Exception as err: - logger.info( - f"GetChatCompletionsStream recv for input text: [{input_text}] send sentence [{sentence}] failed, 
err: {err}" - ) - break - - sentence = "" - if not first_sentence_sent: - first_sentence_sent = True - logger.info( - f"GetChatCompletionsStream recv for input text: [{input_text}] first sentence sent, first_sentence_latency {get_current_time() - start_time}ms" - ) - - # remember response as assistant content in memory - memory.append({"role": "assistant", "content": full_content}) - - # send end of segment - try: - output_data = Data.create("text_data") - output_data.set_property_string( - DATA_OUT_TEXT_DATA_PROPERTY_TEXT, sentence - ) - output_data.set_property_bool( - DATA_OUT_TEXT_DATA_PROPERTY_TEXT_END_OF_SEGMENT, True - ) - rte.send_data(output_data) - logger.info( - f"GetChatCompletionsStream for input text: [{input_text}] end of segment with sentence [{sentence}] sent" - ) - except Exception as err: - logger.info( - f"GetChatCompletionsStream for input text: [{input_text}] end of segment with sentence [{sentence}] send failed, err: {err}" - ) - - except Exception as e: - logger.info( - f"GetChatCompletionsStream for input text: [{input_text}] failed, err: {e}" - ) + self.chat_completion(rte, start_time, input_text, memory) # Start thread to request and read responses from OpenAI start_time = get_current_time() @@ -370,6 +363,107 @@ def chat_completions_stream_worker(start_time, input_text, memory): thread.start() logger.info(f"OpenAIChatGPTExtension on_data end") + def send_data(self, rte, sentence, end_of_segment, input_text): + try: + output_data = Data.create("text_data") + output_data.set_property_string(DATA_OUT_TEXT_DATA_PROPERTY_TEXT, sentence) + output_data.set_property_bool(DATA_OUT_TEXT_DATA_PROPERTY_TEXT_END_OF_SEGMENT, end_of_segment) + rte.send_data(output_data) + logger.info(f"for input text: [{input_text}] {'end of segment ' if end_of_segment else ''}sent sentence [{sentence}]") + except Exception as err: + logger.info(f"for input text: [{input_text}] send sentence [{sentence}] failed, err: {err}") + + def process_completions(self, chat_completions, rte, start_time, input_text, memory): + sentence = "" + full_content = "" + first_sentence_sent = False + + for chat_completion in chat_completions: + content = "" + if start_time < self.outdate_ts: + logger.info(f"recv interrupt and flushing for input text: [{input_text}], startTs: {start_time}, outdateTs: {self.outdate_ts}") + break + + # content = chat_completion.choices[0].delta.content if len(chat_completion.choices) > 0 and chat_completion.choices[0].delta.content is not None else "" + if ( + len(chat_completion.choices) > 0 + ): + if chat_completion.choices[0].delta.tool_calls is not None: + for tool_call in chat_completion.choices[0].delta.tool_calls: + logger.info(f"tool_call: {tool_call}") + if tool_call.function.name == "get_vision_image": + if full_content is "": + # if no text content, send a message to ask user to wait + self.send_data(rte, "Let me take a look...", True, input_text) + # for get_vision_image, re-run the completion with vision, memory should not be affected + self.chat_completion_with_vision(rte, start_time, input_text, memory) + return + elif chat_completion.choices[0].delta.content is not None: + content = chat_completion.choices[0].delta.content + else: + content = "" + + full_content += content + + while True: + sentence, content, sentence_is_final = parse_sentence(sentence, content) + if len(sentence) == 0 or not sentence_is_final: + logger.info(f"sentence {sentence} is empty or not final") + break + logger.info(f"recv for input text: [{input_text}] got sentence: [{sentence}]") + 
self.send_data(rte, sentence, False, input_text)
+                sentence = ""
+
+                if not first_sentence_sent:
+                    first_sentence_sent = True
+                    logger.info(f"recv for input text: [{input_text}] first sentence sent, first_sentence_latency {get_current_time() - start_time}ms")
+
+        # memory is appended only once, after the stream ends, as a single
+        # user/assistant pair; the vision re-run path returns above without
+        # touching memory, which avoids duplicate appends
+        self.append_memory({"role": "user", "content": input_text})
+        self.append_memory({"role": "assistant", "content": full_content})
+        self.send_data(rte, sentence, True, input_text)
+
+    def chat_completion_with_vision(self, rte: RteEnv, start_time, input_text, memory):
+        try:
+            logger.info(f"for input text: [{input_text}] memory: {memory}")
+            message = {"role": "user", "content": input_text}
+
+            if self.image_data is not None:
+                url = rgb2base64jpeg(self.image_data, self.image_width, self.image_height)
+                message = {"role": "user", "content": [
+                    {"type": "text", "text": input_text},
+                    {"type": "image_url", "image_url": {"url": url}}
+                ]}
+                logger.info(f"msg: {message}")
+
+            # tools are deliberately omitted here so the vision pass cannot trigger get_vision_image again
+            resp = self.openai_chatgpt.get_chat_completions_stream(memory + [message])
+            if resp is None:
+                logger.error(f"get_chat_completions_stream Response is None: {input_text}")
+                return
+
+            self.process_completions(resp, rte, start_time, input_text, memory)
+
+        except Exception:
+            logger.error(f"err: {traceback.format_exc()}: {input_text}")
+
+    def chat_completion(self, rte: RteEnv, start_time, input_text, memory):
+        try:
+            logger.info(f"for input text: [{input_text}] memory: {memory}")
+            message = {"role": "user", "content": input_text}
+
+            tools = self.available_tools if self.enable_tools else None
+            logger.info(f"chat_completion tools: {tools}")
+            resp = self.openai_chatgpt.get_chat_completions_stream(memory + [message], tools)
+            if resp is None:
+                logger.error(f"get_chat_completions_stream Response is None: {input_text}")
+                return
+
+            self.process_completions(resp, rte, start_time, input_text, memory)
+
+        except Exception:
+            logger.error(f"err: {traceback.format_exc()}: {input_text}")
+
 
 @register_addon_as_extension("openai_chatgpt_python")
 class OpenAIChatGPTExtensionAddon(Addon):
     def on_create_instance(self, rte: RteEnv, addon_name: str, context) -> None:
diff --git a/agents/addon/extension/openai_chatgpt_python/requirements.txt b/agents/addon/extension/openai_chatgpt_python/requirements.txt
index de1e7f46..288b987d 100644
--- a/agents/addon/extension/openai_chatgpt_python/requirements.txt
+++ b/agents/addon/extension/openai_chatgpt_python/requirements.txt
@@ -1,2 +1,4 @@
 openai==1.35.13
-requests==2.32.3
\ No newline at end of file
+requests==2.32.3
+numpy==2.0.1
+pillow==10.4.0
\ No newline at end of file
diff --git a/agents/property.json.example b/agents/property.json.example
index 323c648d..7a7a80b7 100644
--- a/agents/property.json.example
+++ b/agents/property.json.example
@@ -1449,6 +1449,239 @@
         ]
       }
     ]
+  },
+  {
+    "name": "camera.va.openai.azure",
+    "auto_start": true,
+    "nodes": [
+      {
+        "type": "extension",
+        "extension_group": "default",
+        "addon": "agora_rtc",
+        "name": "agora_rtc",
+        "property": {
+          "app_id": "",
+          "token": "",
+          "channel": "astra_agents_test",
+          "stream_id": 1234,
+          "remote_stream_id": 123,
+          "subscribe_audio": true,
+          "subscribe_video": true,
+          "publish_audio": true,
+          "publish_data": true,
+          "enable_agora_asr": true,
+          "agora_asr_vendor_name": "microsoft",
+          "agora_asr_language": "en-US",
+          "agora_asr_vendor_key": "",
+          "agora_asr_vendor_region": "",
+          "agora_asr_session_control_file_path": "session_control.conf",
+          
"subscribe_video_pix_fmt": 4 + } + }, + { + "type": "extension", + "extension_group": "default", + "addon": "interrupt_detector", + "name": "interrupt_detector" + }, + { + "type": "extension", + "extension_group": "chatgpt", + "addon": "openai_chatgpt_python", + "name": "openai_chatgpt", + "property": { + "base_url": "", + "api_key": "", + "frequency_penalty": 0.9, + "model": "gpt-4o-mini", + "max_tokens": 512, + "prompt": "", + "proxy_url": "", + "greeting": "ASTRA agent connected. How can i help you today?", + "max_memory_length": 10, + "enable_tools": true + } + }, + { + "type": "extension", + "extension_group": "tts", + "addon": "azure_tts", + "name": "azure_tts", + "property": { + "azure_subscription_key": "", + "azure_subscription_region": "", + "azure_synthesis_voice_name": "en-US-JaneNeural" + } + }, + { + "type": "extension", + "extension_group": "transcriber", + "addon": "chat_transcriber", + "name": "chat_transcriber" + }, + { + "type": "extension_group", + "addon": "default_extension_group", + "name": "default" + }, + { + "type": "extension_group", + "addon": "default_extension_group", + "name": "chatgpt" + }, + { + "type": "extension_group", + "addon": "default_extension_group", + "name": "tts" + }, + { + "type": "extension_group", + "addon": "default_extension_group", + "name": "transcriber" + } + ], + "connections": [ + { + "extension_group": "default", + "extension": "agora_rtc", + "data": [ + { + "name": "text_data", + "dest": [ + { + "extension_group": "default", + "extension": "interrupt_detector" + }, + { + "extension_group": "chatgpt", + "extension": "openai_chatgpt" + }, + { + "extension_group": "transcriber", + "extension": "chat_transcriber" + } + ] + } + ], + "image_frame": [ + { + "name": "image_frame", + "dest": [ + { + "extension_group": "chatgpt", + "extension": "openai_chatgpt" + } + ] + } + ] + }, + { + "extension_group": "chatgpt", + "extension": "openai_chatgpt", + "data": [ + { + "name": "text_data", + "dest": [ + { + "extension_group": "tts", + "extension": "azure_tts" + }, + { + "extension_group": "transcriber", + "extension": "chat_transcriber", + "cmd_conversions": [ + { + "cmd": { + "type": "per_property", + "keep_original": true, + "rules": [ + { + "path": "is_final", + "type": "fixed_value", + "value": "bool(true)" + }, + { + "path": "stream_id", + "type": "fixed_value", + "value": "uint32(999)" + } + ] + } + } + ] + } + ] + } + ], + "cmd": [ + { + "name": "flush", + "dest": [ + { + "extension_group": "tts", + "extension": "azure_tts" + } + ] + } + ] + }, + { + "extension_group": "tts", + "extension": "azure_tts", + "pcm_frame": [ + { + "name": "pcm_frame", + "dest": [ + { + "extension_group": "default", + "extension": "agora_rtc" + } + ] + } + ], + "cmd": [ + { + "name": "flush", + "dest": [ + { + "extension_group": "default", + "extension": "agora_rtc" + } + ] + } + ] + }, + { + "extension_group": "transcriber", + "extension": "chat_transcriber", + "data": [ + { + "name": "data", + "dest": [ + { + "extension_group": "default", + "extension": "agora_rtc" + } + ] + } + ] + }, + { + "extension_group": "default", + "extension": "interrupt_detector", + "cmd": [ + { + "name": "flush", + "dest": [ + { + "extension_group": "chatgpt", + "extension": "openai_chatgpt" + } + ] + } + ] + } + ] } ] }