diff --git a/.vscode/settings.json b/.vscode/settings.json
index cb4779f7..6c83cb6a 100644
--- a/.vscode/settings.json
+++ b/.vscode/settings.json
@@ -1,6 +1,7 @@
 {
     "python.analysis.extraPaths": [
         "./agents/ten_packages/system/ten_runtime_python/interface",
+        "./agents/ten_packages/system/ten_ai_base/interface",
     ],
     "C_Cpp.intelliSenseEngine": "disabled",
     "editor.formatOnSave": true,
diff --git a/agents/bin/start b/agents/bin/start
index 475c7f94..51c6f6cd 100755
--- a/agents/bin/start
+++ b/agents/bin/start
@@ -4,6 +4,7 @@ set -e
 
 cd "$(dirname "${BASH_SOURCE[0]}")/.."
 
+export PYTHONPATH=/app/agents/ten_packages/system/ten_ai_base/interface:$PYTHONPATH
 export LD_LIBRARY_PATH=$(pwd)/ten_packages/system/agora_rtc_sdk/lib:$(pwd)/ten_packages/system/azure_speech_sdk/lib
 
 exec bin/worker "$@"
diff --git a/agents/property.json b/agents/property.json
index dbd877b6..18968b0c 100644
--- a/agents/property.json
+++ b/agents/property.json
@@ -1640,7 +1640,7 @@
     },
     {
       "name": "camera_va_openai_azure",
-      "auto_start": false,
+      "auto_start": true,
       "nodes": [
         {
           "type": "extension",
@@ -1649,7 +1649,7 @@
           "name": "agora_rtc",
           "property": {
             "app_id": "${env:AGORA_APP_ID}",
-            "token": "",
+            "token": "",
             "channel": "ten_agent_test",
             "stream_id": 1234,
             "remote_stream_id": 123,
@@ -1681,7 +1681,7 @@
             "base_url": "${env:OPENAI_API_BASE}",
             "api_key": "${env:OPENAI_API_KEY}",
             "frequency_penalty": 0.9,
-            "model": "${env:OPENAI_MODEL}",
+            "model": "gpt-4o",
             "max_tokens": 512,
             "prompt": "",
             "proxy_url": "${env:OPENAI_PROXY_URL}",
@@ -1691,6 +1691,12 @@
             "enable_tools": true
           }
         },
+        {
+          "type": "extension",
+          "extension_group": "chatgpt",
+          "addon": "vision_tool_python",
+          "name": "vision_tool"
+        },
         {
           "type": "extension",
           "extension_group": "tts",
@@ -1738,7 +1744,7 @@
               "dest": [
                 {
                   "extension_group": "chatgpt",
-                  "extension": "openai_chatgpt"
+                  "extension": "vision_tool"
                 }
               ]
             }
@@ -1791,6 +1797,30 @@
                   "extension": "azure_tts"
                 }
               ]
+            },
+            {
+              "name": "tool_call",
+              "dest": [
+                {
+                  "extension_group": "chatgpt",
+                  "extension": "vision_tool"
+                }
+              ]
+            }
+          ]
+        },
+        {
+          "extension_group": "chatgpt",
+          "extension": "vision_tool",
+          "cmd": [
+            {
+              "name": "tool_register",
+              "dest": [
+                {
+                  "extension_group": "chatgpt",
+                  "extension": "openai_chatgpt"
+                }
+              ]
+            }
           ]
         },
@@ -2408,7 +2438,7 @@
     },
     {
       "name": "va_openai_v2v",
-      "auto_start": true,
+      "auto_start": false,
       "nodes": [
         {
           "type": "extension",
diff --git a/agents/scripts/install_deps_and_build.sh b/agents/scripts/install_deps_and_build.sh
index df37ac83..28620af1 100755
--- a/agents/scripts/install_deps_and_build.sh
+++ b/agents/scripts/install_deps_and_build.sh
@@ -65,6 +65,15 @@ install_python_requirements() {
         done
     fi
 
+    # traverse the ten_packages/system directory to find the requirements.txt
+    if [[ -d "ten_packages/system" ]]; then
+        for extension in ten_packages/system/*; do
+            if [[ -f "$extension/requirements.txt" ]]; then
+                pip install -r "$extension/requirements.txt"
+            fi
+        done
+    fi
+
     # pre-import llama-index as it could download additional resources during the first import
    echo "pre-import python modules..."
    python3.10 -c "import llama_index.core;"
python3.10 -c "import llama_index.core;" diff --git a/agents/ten_packages/extension/openai_chatgpt_python/extension.py b/agents/ten_packages/extension/openai_chatgpt_python/extension.py index 15947832..fce1973c 100644 --- a/agents/ten_packages/extension/openai_chatgpt_python/extension.py +++ b/agents/ten_packages/extension/openai_chatgpt_python/extension.py @@ -9,19 +9,21 @@ import json import traceback -from .helper import AsyncEventEmitter, AsyncQueue, get_current_time, get_property_bool, get_property_float, get_property_int, get_property_string, parse_sentences, rgb2base64jpeg +from ten.async_ten_env import AsyncTenEnv +from ten.ten_env import TenEnv +from ten_ai_base.const import CMD_PROPERTY_RESULT, CMD_TOOL_CALL +from ten_ai_base.helper import AsyncEventEmitter, get_properties_int, get_properties_string, get_properties_float, get_property_bool, get_property_int, get_property_string +from ten_ai_base.llm import AsyncLLMBaseExtension +from ten_ai_base.types import LLMCallCompletionArgs, LLMCompletionContentItemAudio, LLMCompletionContentItemImage, LLMCompletionContentItemText, LLMCompletionContentItem, LLMDataCompletionArgs, LLMToolMetadata, LLMToolResult + +from .helper import parse_sentences, rgb2base64jpeg from .openai import OpenAIChatGPT, OpenAIChatGPTConfig from ten import ( - AudioFrame, - VideoFrame, - AsyncExtension, - TenEnv, Cmd, StatusCode, CmdResult, Data, ) -from .log import logger CMD_IN_FLUSH = "flush" CMD_IN_ON_USER_JOINED = "on_user_joined" @@ -42,97 +44,49 @@ PROPERTY_TOP_P = "top_p" # Optional PROPERTY_MAX_TOKENS = "max_tokens" # Optional PROPERTY_GREETING = "greeting" # Optional -PROPERTY_ENABLE_TOOLS = "enable_tools" # Optional PROPERTY_PROXY_URL = "proxy_url" # Optional PROPERTY_MAX_MEMORY_LENGTH = "max_memory_length" # Optional -PROPERTY_CHECKING_VISION_TEXT_ITEMS = "checking_vision_text_items" # Optional - - -TASK_TYPE_CHAT_COMPLETION = "chat_completion" -TASK_TYPE_CHAT_COMPLETION_WITH_VISION = "chat_completion_with_vision" - - -class OpenAIChatGPTExtension(AsyncExtension): - memory = [] - max_memory_length = 10 - openai_chatgpt = None - enable_tools = False - image_data = None - image_width = 0 - image_height = 0 - checking_vision_text_items = [] - loop = None - sentence_fragment = "" - - # Create the queue for message processing - queue = AsyncQueue() - - available_tools = [ - { - "type": "function", - "function": { - # ensure you use gpt-4o or later model if you need image recognition, gpt-4o-mini does not work quite well in this case - "name": "get_vision_image", - "description": "Get the image from camera. Call this whenever you need to understand the input camera image like you have vision capability, for example when user asks 'What can you see?' 
or 'Can you see me?'", - }, - "strict": True, - } - ] - - async def on_init(self, ten_env: TenEnv) -> None: + + +class OpenAIChatGPTExtension(AsyncLLMBaseExtension): + def __init__(self, name: str): + super().__init__(name) + self.memory = [] + self.max_memory_length = 10 + self.openai_chatgpt = None + self.sentence_fragment = "" + + async def on_init(self, ten_env: AsyncTenEnv) -> None: ten_env.log_info("on_init") + await super().on_init(ten_env) ten_env.on_init_done() - async def on_start(self, ten_env: TenEnv) -> None: + async def on_start(self, ten_env: AsyncTenEnv) -> None: ten_env.log_info("on_start") - - self.loop = asyncio.get_event_loop() - self.loop.create_task(self._process_queue(ten_env)) + await super().on_start(ten_env) # Prepare configuration openai_chatgpt_config = OpenAIChatGPTConfig.default_config() # Mandatory properties - openai_chatgpt_config.base_url = get_property_string( - ten_env, PROPERTY_BASE_URL) or openai_chatgpt_config.base_url - openai_chatgpt_config.api_key = get_property_string( - ten_env, PROPERTY_API_KEY) + get_properties_string(ten_env, [PROPERTY_BASE_URL, PROPERTY_API_KEY], lambda name, value: setattr( + openai_chatgpt_config, name, value or getattr(openai_chatgpt_config, name))) if not openai_chatgpt_config.api_key: ten_env.log_info(f"API key is missing, exiting on_start") return # Optional properties - openai_chatgpt_config.model = get_property_string( - ten_env, PROPERTY_MODEL) or openai_chatgpt_config.model - openai_chatgpt_config.prompt = get_property_string( - ten_env, PROPERTY_PROMPT) or openai_chatgpt_config.prompt - openai_chatgpt_config.frequency_penalty = get_property_float( - ten_env, PROPERTY_FREQUENCY_PENALTY) or openai_chatgpt_config.frequency_penalty - openai_chatgpt_config.presence_penalty = get_property_float( - ten_env, PROPERTY_PRESENCE_PENALTY) or openai_chatgpt_config.presence_penalty - openai_chatgpt_config.temperature = get_property_float( - ten_env, PROPERTY_TEMPERATURE) or openai_chatgpt_config.temperature - openai_chatgpt_config.top_p = get_property_float( - ten_env, PROPERTY_TOP_P) or openai_chatgpt_config.top_p - openai_chatgpt_config.max_tokens = get_property_int( - ten_env, PROPERTY_MAX_TOKENS) or openai_chatgpt_config.max_tokens - openai_chatgpt_config.proxy_url = get_property_string( - ten_env, PROPERTY_PROXY_URL) or openai_chatgpt_config.proxy_url + get_properties_string(ten_env, [PROPERTY_MODEL, PROPERTY_PROMPT, PROPERTY_PROXY_URL], lambda name, value: setattr( + openai_chatgpt_config, name, value or getattr(openai_chatgpt_config, name))) + get_properties_float(ten_env, [PROPERTY_FREQUENCY_PENALTY, PROPERTY_PRESENCE_PENALTY, PROPERTY_TEMPERATURE, PROPERTY_TOP_P], lambda name, value: setattr( + openai_chatgpt_config, name, value or getattr(openai_chatgpt_config, name))) + get_properties_int(ten_env, [PROPERTY_MAX_TOKENS], lambda name, value: setattr( + openai_chatgpt_config, name, value or getattr(openai_chatgpt_config, name))) # Properties that don't affect openai_chatgpt_config self.greeting = get_property_string(ten_env, PROPERTY_GREETING) - self.enable_tools = get_property_bool(ten_env, PROPERTY_ENABLE_TOOLS) self.max_memory_length = get_property_int( ten_env, PROPERTY_MAX_MEMORY_LENGTH) - checking_vision_text_items_str = get_property_string( - ten_env, PROPERTY_CHECKING_VISION_TEXT_ITEMS) - if checking_vision_text_items_str: - try: - self.checking_vision_text_items = json.loads( - checking_vision_text_items_str) - except Exception as err: - ten_env.log_info( - f"Error parsing {PROPERTY_CHECKING_VISION_TEXT_ITEMS}: 
{err}") self.users_count = 0 # Create instance @@ -145,147 +99,130 @@ async def on_start(self, ten_env: TenEnv) -> None: ten_env.on_start_done() - async def on_stop(self, ten_env: TenEnv) -> None: + async def on_stop(self, ten_env: AsyncTenEnv) -> None: ten_env.log_info("on_stop") - - # TODO: clean up resources - + await super().on_stop(ten_env) ten_env.on_stop_done() - async def on_deinit(self, ten_env: TenEnv) -> None: + async def on_deinit(self, ten_env: AsyncTenEnv) -> None: ten_env.log_info("on_deinit") + await super().on_deinit(ten_env) ten_env.on_deinit_done() - async def on_cmd(self, ten_env: TenEnv, cmd: Cmd) -> None: + async def on_cmd(self, ten_env: AsyncTenEnv, cmd: Cmd) -> None: cmd_name = cmd.get_name() ten_env.log_info(f"on_cmd name: {cmd_name}") + if cmd_name == CMD_IN_FLUSH: - await self._flush_queue(ten_env) - ten_env.send_cmd(Cmd.create(CMD_OUT_FLUSH), None) + await self.flush_input_items(ten_env) + await ten_env.send_cmd(Cmd.create(CMD_OUT_FLUSH)) ten_env.log_info("on_cmd sent flush") status_code, detail = StatusCode.OK, "success" + cmd_result = CmdResult.create(status_code) + cmd_result.set_property_string("detail", detail) + ten_env.return_result(cmd_result, cmd) elif cmd_name == CMD_IN_ON_USER_JOINED: self.users_count += 1 # Send greeting when first user joined if self.greeting and self.users_count == 1: - try: - output_data = Data.create("text_data") - output_data.set_property_string( - DATA_OUT_TEXT_DATA_PROPERTY_TEXT, self.greeting) - output_data.set_property_bool( - DATA_OUT_TEXT_DATA_PROPERTY_TEXT_END_OF_SEGMENT, True) - ten_env.send_data(output_data) - ten_env.log_info(f"Greeting [{self.greeting}] sent") - except Exception as err: - ten_env.log_info( - f"Failed to send greeting [{self.greeting}]: {err}") + self.send_text_output(ten_env, self.greeting, True) status_code, detail = StatusCode.OK, "success" + cmd_result = CmdResult.create(status_code) + cmd_result.set_property_string("detail", detail) + ten_env.return_result(cmd_result, cmd) elif cmd_name == CMD_IN_ON_USER_LEFT: self.users_count -= 1 status_code, detail = StatusCode.OK, "success" + cmd_result = CmdResult.create(status_code) + cmd_result.set_property_string("detail", detail) + ten_env.return_result(cmd_result, cmd) else: - ten_env.log_info(f"on_cmd unknown cmd: {cmd_name}") - status_code, detail = StatusCode.ERROR, "unknown cmd" + await super().on_cmd(ten_env, cmd) + - cmd_result = CmdResult.create(status_code) - cmd_result.set_property_string("detail", detail) - ten_env.return_result(cmd_result, cmd) + async def on_data(self, ten_env: AsyncTenEnv, data: Data) -> None: + data_name = data.get_name() + ten_env.log_debug("on_data name {}".format(data_name)) - async def on_data(self, ten_env: TenEnv, data: Data) -> None: # Get the necessary properties - is_final = get_property_bool(data, DATA_IN_TEXT_DATA_PROPERTY_IS_FINAL) - input_text = get_property_string(data, DATA_IN_TEXT_DATA_PROPERTY_TEXT) + is_final = get_property_bool(data, "is_final") + input_text = get_property_string(data, "text") if not is_final: - ten_env.log_info("ignore non-final input") + ten_env.log_debug("ignore non-final input") return if not input_text: - ten_env.log_info("ignore empty text") + ten_env.log_warn("ignore empty text") return ten_env.log_info(f"OnData input text: [{input_text}]") # Start an asynchronous task for handling chat completion - await self.queue.put([TASK_TYPE_CHAT_COMPLETION, input_text]) - - async def on_audio_frame(self, ten_env: TenEnv, audio_frame: AudioFrame) -> None: - # TODO: process pcm frame - pass - 
- async def on_video_frame(self, ten_env: TenEnv, video_frame: VideoFrame) -> None: - # ten_env.log_info(f"OpenAIChatGPTExtension on_video_frame {frame.get_width()} {frame.get_height()}") - self.image_data = video_frame.get_buf() - self.image_width = video_frame.get_width() - self.image_height = video_frame.get_height() - return - - async def _process_queue(self, ten_env: TenEnv): - """Asynchronously process queue items one by one.""" - while True: - # Wait for an item to be available in the queue - [task_type, message] = await self.queue.get() - try: - # Create a new task for the new message - self.current_task = asyncio.create_task( - self._run_chatflow(ten_env, task_type, message, self.memory)) - await self.current_task # Wait for the current task to finish or be cancelled - except asyncio.CancelledError: - ten_env.log_info(f"Task cancelled: {message}") - - async def _flush_queue(self, ten_env: TenEnv): - """Flushes the self.queue and cancels the current task.""" - # Flush the queue using the new flush method - await self.queue.flush() - - # Cancel the current task if one is running - if self.current_task: - ten_env.log_info("Cancelling the current task during flush.") - self.current_task.cancel() - - async def _run_chatflow(self, ten_env: TenEnv, task_type: str, input_text: str, memory): + await self.queue_input_item(False, content=[LLMCompletionContentItemText(text=input_text)]) + + async def on_tools_update(self, ten_env: TenEnv, tool: LLMToolMetadata) -> None: + return await super().on_tools_update(ten_env, tool) + + async def on_call_chat_completion(self, ten_env: TenEnv, **kargs: LLMCallCompletionArgs) -> None: + return await super().on_call_chat_completion(ten_env, **kargs) + + async def on_data_chat_completion(self, ten_env: TenEnv, **kargs: LLMDataCompletionArgs) -> None: """Run the chatflow asynchronously.""" + kcontent = kargs.get("content", []) + content = [] + for item in kcontent: + content.append(self._contentItem_to_dict(ten_env, item)) memory_cache = [] + memory = self.memory try: - ten_env.log_info(f"for input text: [{input_text}] memory: {memory}") + ten_env.log_info(f"for input text: [{content}] memory: {memory}") message = None tools = None + no_tool = kargs.get("no_tool", False) + + message = {"role": "user", "content": content} + non_artifact_content = [item for item in content if item.get("type") == "text"] + non_artifact_message = {"role": "user", "content": non_artifact_content} + memory_cache = memory_cache + [non_artifact_message, {"role": "assistant", "content": ""}] + tools = [] if not no_tool and len(self.available_tools) > 0 else None + for tool in self.available_tools: + tools.append({ + "type": "function", + "function": { + "name": tool.name, + "description": tool.description, + }, + "strict": True + }) - # Prepare the message and tools based on the task type - if task_type == TASK_TYPE_CHAT_COMPLETION: - message = {"role": "user", "content": input_text} - memory_cache = memory_cache + \ - [message, {"role": "assistant", "content": ""}] - tools = self.available_tools if self.enable_tools else None - elif task_type == TASK_TYPE_CHAT_COMPLETION_WITH_VISION: - message = {"role": "user", "content": input_text} - memory_cache = memory_cache + \ - [message, {"role": "assistant", "content": ""}] - tools = self.available_tools if self.enable_tools else None - if self.image_data is not None: - url = rgb2base64jpeg( - self.image_data, self.image_width, self.image_height) - message = { - "role": "user", - "content": [ - {"type": "text", "text": input_text}, - 
{"type": "image_url", "image_url": {"url": url}}, - ], - } - ten_env.log_info(f"msg with vision data: {message}") self.sentence_fragment = "" # Create an asyncio.Event to signal when content is finished content_finished_event = asyncio.Event() + # Create a future to track the single tool call task + tool_task_future = None # Create an async listener to handle tool calls and content updates async def handle_tool_call(tool_call): ten_env.log_info(f"tool_call: {tool_call}") - if tool_call.function.name == "get_vision_image": - # Append the vision image to the last assistant message - await self.queue.put([TASK_TYPE_CHAT_COMPLETION_WITH_VISION, input_text], True) + for tool in self.available_tools: + if tool_call.function.name == tool.name: + cmd:Cmd = Cmd.create(CMD_TOOL_CALL) + cmd.set_property_string("name", tool.name) + # cmd.set_property_from_json("arguments", json.dumps(tool_call.arguments)) + cmd.set_property_from_json("arguments", json.dumps([])) + + # Send the command and handle the result through the future + result: CmdResult = await ten_env.send_cmd(cmd) + if result.get_status_code() == StatusCode.OK: + tool_result = LLMToolResult.model_validate_json(json.loads(result.get_property_to_json(CMD_PROPERTY_RESULT))) + ten_env.log_info(f"tool_result: {tool_result}") + await self.queue_input_item(True, content=tool_result.items) + else: + ten_env.log_error(f"Tool call failed: {result.get_property_to_json('error')}") async def handle_content_update(content: str): # Append the content to the last assistant message @@ -296,9 +233,12 @@ async def handle_content_update(content: str): sentences, self.sentence_fragment = parse_sentences( self.sentence_fragment, content) for s in sentences: - self._send_data(ten_env, s, False) + self.send_text_output(ten_env, s, False) async def handle_content_finished(full_content: str): + # Wait for the single tool task to complete (if any) + if tool_task_future: + await tool_task_future content_finished_event.set() listener = AsyncEventEmitter() @@ -312,34 +252,29 @@ async def handle_content_finished(full_content: str): # Wait for the content to be finished await content_finished_event.wait() except asyncio.CancelledError: - ten_env.log_info(f"Task cancelled: {input_text}") + ten_env.log_info(f"Task cancelled: {content}") except Exception as e: - logger.error( - f"Error in chat_completion: {traceback.format_exc()} for input text: {input_text}") + ten_env.log_error( + f"Error in chat_completion: {traceback.format_exc()} for input text: {content}") finally: - self._send_data(ten_env, "", True) + self.send_text_output(ten_env, "", True) # always append the memory for m in memory_cache: self._append_memory(m) + def _contentItem_to_dict(self, ten_env: TenEnv , item: LLMCompletionContentItem): + if isinstance(item, LLMCompletionContentItemText): + return {"type": "text", "text": item.text} + elif isinstance(item, LLMCompletionContentItemImage): + return {"type": "image_url", "image_url": {"url": item.image}} + elif isinstance(item, LLMCompletionContentItemAudio): + return {"type": "audio_url", "audio_url": {"url": item.audio}} + else: + ten_env.log_warn(f"Unknown content item type") + return None + + def _append_memory(self, message: str): if len(self.memory) > self.max_memory_length: self.memory.pop(0) self.memory.append(message) - - def _send_data(self, ten_env: TenEnv, sentence: str, end_of_segment: bool): - try: - output_data = Data.create("text_data") - output_data.set_property_string( - DATA_OUT_TEXT_DATA_PROPERTY_TEXT, sentence) - 
output_data.set_property_bool( - DATA_OUT_TEXT_DATA_PROPERTY_TEXT_END_OF_SEGMENT, end_of_segment - ) - ten_env.send_data(output_data) - ten_env.log_info( - f"{'end of segment ' if end_of_segment else ''}sent sentence [{sentence}]" - ) - except Exception as err: - ten_env.log_info( - f"send sentence [{sentence}] failed, err: {err}" - ) diff --git a/agents/ten_packages/extension/openai_chatgpt_python/helper.py b/agents/ten_packages/extension/openai_chatgpt_python/helper.py index 28c28f19..749916c0 100644 --- a/agents/ten_packages/extension/openai_chatgpt_python/helper.py +++ b/agents/ten_packages/extension/openai_chatgpt_python/helper.py @@ -14,40 +14,6 @@ from io import BytesIO from base64 import b64encode - -def get_property_bool(data: Data, property_name: str) -> bool: - """Helper to get boolean property from data with error handling.""" - try: - return data.get_property_bool(property_name) - except Exception as err: - logger.warn(f"GetProperty {property_name} failed: {err}") - return False - -def get_property_string(data: Data, property_name: str) -> str: - """Helper to get string property from data with error handling.""" - try: - return data.get_property_string(property_name) - except Exception as err: - logger.warn(f"GetProperty {property_name} failed: {err}") - return "" - -def get_property_int(data: Data, property_name: str) -> int: - """Helper to get int property from data with error handling.""" - try: - return data.get_property_int(property_name) - except Exception as err: - logger.warn(f"GetProperty {property_name} failed: {err}") - return 0 - -def get_property_float(data: Data, property_name: str) -> float: - """Helper to get float property from data with error handling.""" - try: - return data.get_property_float(property_name) - except Exception as err: - logger.warn(f"GetProperty {property_name} failed: {err}") - return 0.0 - - def get_current_time(): # Get the current time start_time = datetime.now() @@ -134,54 +100,4 @@ def resize_image_keep_aspect(image, max_size=512): # Resize the image with the new dimensions resized_image = image.resize((new_width, new_height)) - return resized_image - - -class AsyncEventEmitter: - def __init__(self): - self.listeners = {} - - def on(self, event_name, listener): - """Register an event listener.""" - if event_name not in self.listeners: - self.listeners[event_name] = [] - self.listeners[event_name].append(listener) - - def emit(self, event_name, *args, **kwargs): - """Fire the event without waiting for listeners to finish.""" - if event_name in self.listeners: - for listener in self.listeners[event_name]: - asyncio.create_task(listener(*args, **kwargs)) - - -class AsyncQueue: - def __init__(self): - self._queue = deque() # Use deque for efficient prepend and append - self._condition = asyncio.Condition() # Use Condition to manage access - - async def put(self, item, prepend=False): - """Add an item to the queue (prepend if specified).""" - async with self._condition: - if prepend: - self._queue.appendleft(item) # Prepend item to the front - else: - self._queue.append(item) # Append item to the back - self._condition.notify() - - async def get(self): - """Remove and return an item from the queue.""" - async with self._condition: - while not self._queue: - await self._condition.wait() # Wait until an item is available - return self._queue.popleft() # Pop from the front of the deque - - async def flush(self): - """Flush all items from the queue.""" - async with self._condition: - while self._queue: - self._queue.popleft() # Clear the queue 
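
Reviewer note (not part of the patch): the rewrite of `extension.py` above replaces the hard-coded `available_tools` list with tools registered at runtime. A minimal sketch of how a registered `LLMToolMetadata` maps onto the OpenAI `tools` payload built in `on_data_chat_completion`; names come from this diff, the values are illustrative:

```python
from ten_ai_base.types import LLMToolMetadata

# A tool as it arrives via the tool_register command.
tool = LLMToolMetadata(
    name="get_vision_tool",
    description="Get the image from camera.",
    parameters=[],
)

# The shape forwarded to the OpenAI client; note that parameters are not yet
# forwarded in this diff (tool-call arguments are stubbed to []).
openai_tools = [{
    "type": "function",
    "function": {"name": tool.name, "description": tool.description},
    "strict": True,
}]
```
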
diff --git a/agents/ten_packages/extension/openai_chatgpt_python/manifest.json b/agents/ten_packages/extension/openai_chatgpt_python/manifest.json
index 68423776..240dbe72 100644
--- a/agents/ten_packages/extension/openai_chatgpt_python/manifest.json
+++ b/agents/ten_packages/extension/openai_chatgpt_python/manifest.json
@@ -51,17 +51,11 @@
       "greeting": {
         "type": "string"
       },
-      "checking_vision_text_items": {
-        "type": "string"
-      },
       "proxy_url": {
         "type": "string"
       },
       "max_memory_length": {
         "type": "int64"
-      },
-      "enable_tools": {
-        "type": "bool"
       }
     },
     "data_in": [
diff --git a/agents/ten_packages/extension/vision_tool_python/BUILD.gn b/agents/ten_packages/extension/vision_tool_python/BUILD.gn
new file mode 100644
index 00000000..284103e8
--- /dev/null
+++ b/agents/ten_packages/extension/vision_tool_python/BUILD.gn
@@ -0,0 +1,20 @@
+#
+# This file is part of TEN Framework, an open source project.
+# Licensed under the Apache License, Version 2.0.
+# See the LICENSE file for more information.
+#
+import("//build/feature/ten_package.gni")
+
+ten_package("vision_tool_python") {
+  package_kind = "extension"
+
+  resources = [
+    "__init__.py",
+    "addon.py",
+    "extension.py",
+    "log.py",
+    "manifest.json",
+    "property.json",
+    "tests",
+  ]
+}
diff --git a/agents/ten_packages/extension/vision_tool_python/README.md b/agents/ten_packages/extension/vision_tool_python/README.md
new file mode 100644
index 00000000..b00a3151
--- /dev/null
+++ b/agents/ten_packages/extension/vision_tool_python/README.md
@@ -0,0 +1,29 @@
+# vision_tool_python
+
+## Features
+
+- xxx feature
+
+## API
+
+Refer to the `api` definition in [manifest.json](manifest.json) and default values in [property.json](property.json).
+
+## Development
+
+### Build
+
+### Unit test
+
+## Misc
diff --git a/agents/ten_packages/extension/vision_tool_python/__init__.py b/agents/ten_packages/extension/vision_tool_python/__init__.py
new file mode 100644
index 00000000..56edc718
--- /dev/null
+++ b/agents/ten_packages/extension/vision_tool_python/__init__.py
@@ -0,0 +1,9 @@
+#
+# This file is part of TEN Framework, an open source project.
+# Licensed under the Apache License, Version 2.0.
+# See the LICENSE file for more information.
+#
+from . import addon
+from .log import logger
+
+logger.info("vision_tool_python extension loaded")
diff --git a/agents/ten_packages/extension/vision_tool_python/addon.py b/agents/ten_packages/extension/vision_tool_python/addon.py
new file mode 100644
index 00000000..f7cd01a5
--- /dev/null
+++ b/agents/ten_packages/extension/vision_tool_python/addon.py
@@ -0,0 +1,20 @@
+#
+# This file is part of TEN Framework, an open source project.
+# Licensed under the Apache License, Version 2.0.
+# See the LICENSE file for more information.
+#
+from ten import (
+    Addon,
+    register_addon_as_extension,
+    TenEnv,
+)
+from .extension import VisionToolExtension
+from .log import logger
+
+
+@register_addon_as_extension("vision_tool_python")
+class VisionToolExtensionAddon(Addon):
+
+    def on_create_instance(self, ten_env: TenEnv, name: str, context) -> None:
+        logger.info("VisionToolExtensionAddon on_create_instance")
+        ten_env.on_create_instance_done(VisionToolExtension(name), context)
diff --git a/agents/ten_packages/extension/vision_tool_python/extension.py b/agents/ten_packages/extension/vision_tool_python/extension.py
new file mode 100644
index 00000000..aaf1e456
--- /dev/null
+++ b/agents/ten_packages/extension/vision_tool_python/extension.py
@@ -0,0 +1,148 @@
+#
+# This file is part of TEN Framework, an open source project.
+# Licensed under the Apache License, Version 2.0.
+# See the LICENSE file for more information.
+#
+from ten_ai_base.llm_tool import AsyncLLMToolBaseExtension, LLMToolMetadata, LLMToolResult
+from ten import (
+    AudioFrame,
+    VideoFrame,
+    AsyncTenEnv,
+    Cmd,
+    Data,
+)
+from PIL import Image
+from io import BytesIO
+from base64 import b64encode
+
+from ten_ai_base.types import LLMCompletionContentItemImage
+
+
+def rgb2base64jpeg(rgb_data, width, height):
+    # Interpret the raw RGBA frame buffer as a PIL Image, then drop the alpha channel
+    pil_image = Image.frombytes("RGBA", (width, height), bytes(rgb_data))
+    pil_image = pil_image.convert("RGB")
+
+    # Resize the image while maintaining its aspect ratio
+    pil_image = resize_image_keep_aspect(pil_image, 320)
+
+    # Save the image to a BytesIO object in JPEG format
+    buffered = BytesIO()
+    pil_image.save(buffered, format="JPEG")
+
+    # Get the byte data of the JPEG image
+    jpeg_image_data = buffered.getvalue()
+
+    # Convert the JPEG byte data to a Base64 encoded string
+    base64_encoded_image = b64encode(jpeg_image_data).decode("utf-8")
+
+    # Create the data URL
+    mime_type = "image/jpeg"
+    base64_url = f"data:{mime_type};base64,{base64_encoded_image}"
+    return base64_url
+
+
+def resize_image_keep_aspect(image, max_size=512):
+    """
+    Resize an image while maintaining its aspect ratio, ensuring the larger dimension is max_size.
+    If both dimensions are smaller than max_size, the image is not resized.
+
+    :param image: A PIL Image object
+    :param max_size: The maximum size for the larger dimension (width or height)
+    :return: A PIL Image object (resized or original)
+    """
+    # Get current width and height
+    width, height = image.size
+
+    # If both dimensions are already smaller than max_size, return the original image
+    if width <= max_size and height <= max_size:
+        return image
+
+    # Calculate the aspect ratio
+    aspect_ratio = width / height
+
+    # Determine the new dimensions
+    if width > height:
+        new_width = max_size
+        new_height = int(max_size / aspect_ratio)
+    else:
+        new_height = max_size
+        new_width = int(max_size * aspect_ratio)
+
+    # Resize the image with the new dimensions
+    resized_image = image.resize((new_width, new_height))
+
+    return resized_image
+
+
+class VisionToolExtension(AsyncLLMToolBaseExtension):
+    image_data = None
+    image_width = 0
+    image_height = 0
+
+    async def on_init(self, ten_env: AsyncTenEnv) -> None:
+        ten_env.log_debug("on_init")
+        ten_env.on_init_done()
+
+    async def on_start(self, ten_env: AsyncTenEnv) -> None:
+        ten_env.log_debug("on_start")
+        await super().on_start(ten_env)
+        ten_env.on_start_done()
+
+    async def on_stop(self, ten_env: AsyncTenEnv) -> None:
+        ten_env.log_debug("on_stop")
+
+        # TODO: clean up resources
+
+        ten_env.on_stop_done()
+
+    async def on_deinit(self, ten_env: AsyncTenEnv) -> None:
+        ten_env.log_debug("on_deinit")
+        ten_env.on_deinit_done()
+
+    async def on_cmd(self, ten_env: AsyncTenEnv, cmd: Cmd) -> None:
+        cmd_name = cmd.get_name()
+        ten_env.log_debug("on_cmd name {}".format(cmd_name))
+
+        await super().on_cmd(ten_env, cmd)
+
+    async def on_data(self, ten_env: AsyncTenEnv, data: Data) -> None:
+        data_name = data.get_name()
+        ten_env.log_debug("on_data name {}".format(data_name))
+
+        # TODO: process data
+        pass
+
+    async def on_audio_frame(self, ten_env: AsyncTenEnv, audio_frame: AudioFrame) -> None:
+        audio_frame_name = audio_frame.get_name()
+        ten_env.log_debug("on_audio_frame name {}".format(audio_frame_name))
+
+        # TODO: process audio frame
+        pass
+
+    async def on_video_frame(self, ten_env: AsyncTenEnv, video_frame: VideoFrame) -> None:
+        video_frame_name = video_frame.get_name()
+        ten_env.log_debug("on_video_frame name {}".format(video_frame_name))
+
+        # Cache the latest frame so run_tool can serve it on demand
+        self.image_data = video_frame.get_buf()
+        self.image_width = video_frame.get_width()
+        self.image_height = video_frame.get_height()
+
+    def get_tool_metadata(self) -> list[LLMToolMetadata]:
+        return [
+            LLMToolMetadata(
+                name="get_vision_tool",
+                description="Get the image from camera. Call this whenever you need to understand the input camera image like you have vision capability, for example when user asks 'What can you see?' or 'Can you see me?'",
+                parameters=[],
+            )
+        ]
+
+    async def run_tool(self, name: str, args: dict) -> LLMToolResult:
+        if name == "get_vision_tool":
+            if self.image_data is None:
+                raise Exception("No image data available")
+
+            base64_image = rgb2base64jpeg(self.image_data, self.image_width, self.image_height)
+            result = LLMCompletionContentItemImage(image=base64_image)
+            return LLMToolResult(items=[result])
\ No newline at end of file
diff --git a/agents/ten_packages/extension/vision_tool_python/log.py b/agents/ten_packages/extension/vision_tool_python/log.py
new file mode 100644
index 00000000..6e4e495f
--- /dev/null
+++ b/agents/ten_packages/extension/vision_tool_python/log.py
@@ -0,0 +1,20 @@
+#
+# This file is part of TEN Framework, an open source project.
+# Licensed under the Apache License, Version 2.0.
+# See the LICENSE file for more information.
+#
+import logging
+
+logger = logging.getLogger("vision_tool_python")
+logger.setLevel(logging.INFO)
+
+formatter_str = (
+    "%(asctime)s - %(name)s - %(levelname)s - %(process)d - "
+    "[%(filename)s:%(lineno)d] - %(message)s"
+)
+formatter = logging.Formatter(formatter_str)
+
+console_handler = logging.StreamHandler()
+console_handler.setFormatter(formatter)
+
+logger.addHandler(console_handler)
diff --git a/agents/ten_packages/extension/vision_tool_python/manifest.json b/agents/ten_packages/extension/vision_tool_python/manifest.json
new file mode 100644
index 00000000..375ed577
--- /dev/null
+++ b/agents/ten_packages/extension/vision_tool_python/manifest.json
@@ -0,0 +1,24 @@
+{
+  "type": "extension",
+  "name": "vision_tool_python",
+  "version": "0.3.0",
+  "dependencies": [
+    {
+      "type": "system",
+      "name": "ten_runtime_python",
+      "version": "0.3.0"
+    }
+  ],
+  "package": {
+    "include": [
+      "manifest.json",
+      "property.json",
+      "BUILD.gn",
+      "**.tent",
+      "**.py",
+      "README.md",
+      "tests/**"
+    ]
+  },
+  "api": {}
+}
\ No newline at end of file
diff --git a/agents/ten_packages/extension/vision_tool_python/property.json b/agents/ten_packages/extension/vision_tool_python/property.json
new file mode 100644
index 00000000..9e26dfee
--- /dev/null
+++ b/agents/ten_packages/extension/vision_tool_python/property.json
@@ -0,0 +1 @@
+{}
\ No newline at end of file
diff --git a/agents/ten_packages/extension/vision_tool_python/tests/test_basic.py b/agents/ten_packages/extension/vision_tool_python/tests/test_basic.py
new file mode 100644
index 00000000..019740a3
--- /dev/null
+++ b/agents/ten_packages/extension/vision_tool_python/tests/test_basic.py
@@ -0,0 +1,36 @@
+#
+# Copyright © 2024 Agora
+# This file is part of TEN Framework, an open source project.
+# Licensed under the Apache License, Version 2.0, with certain conditions.
+# Refer to the "LICENSE" file in the root directory for more information.
+#
+from pathlib import Path
+from ten import ExtensionTester, TenEnvTester, Cmd, CmdResult, StatusCode
+
+
+class ExtensionTesterBasic(ExtensionTester):
+    def check_hello(self, ten_env: TenEnvTester, result: CmdResult):
+        statusCode = result.get_status_code()
+        print("receive hello_world, status:" + str(statusCode))
+
+        if statusCode == StatusCode.OK:
+            ten_env.stop_test()
+
+    def on_start(self, ten_env: TenEnvTester) -> None:
+        new_cmd = Cmd.create("hello_world")
+
+        print("send hello_world")
+        ten_env.send_cmd(
+            new_cmd,
+            lambda ten_env, result: self.check_hello(ten_env, result),
+        )
+
+        print("tester on_start_done")
+        ten_env.on_start_done()
+
+
+def test_basic():
+    tester = ExtensionTesterBasic()
+    tester.add_addon_base_dir(str(Path(__file__).resolve().parent.parent))
+    tester.set_test_mode_single("default_extension_python")
+    tester.run()
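
Reviewer note (not part of the patch): a sketch of how a result produced by `run_tool` above travels back to the LLM extension. It mirrors the double JSON encoding used by `AsyncLLMToolBaseExtension.on_cmd` (introduced below) and the matching two-step decode in `openai_chatgpt_python`:

```python
import json
from ten_ai_base.types import LLMToolResult, LLMCompletionContentItemText

# Tool side: what gets attached to the CmdResult under "tool_result".
result = LLMToolResult(items=[LLMCompletionContentItemText(text="hello")])
wire = json.dumps(result.model_dump_json())  # a JSON string wrapped in JSON

# LLM side: unwrap in the same two steps before validating.
restored = LLMToolResult.model_validate_json(json.loads(wire))
assert restored == result
```
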
diff --git a/agents/ten_packages/system/ten_ai_base/interface/ten_ai_base/__init__.py b/agents/ten_packages/system/ten_ai_base/interface/ten_ai_base/__init__.py
new file mode 100644
index 00000000..da402faf
--- /dev/null
+++ b/agents/ten_packages/system/ten_ai_base/interface/ten_ai_base/__init__.py
@@ -0,0 +1,5 @@
+#
+# This file is part of TEN Framework, an open source project.
+# Licensed under the Apache License, Version 2.0.
+# See the LICENSE file for more information.
+#
diff --git a/agents/ten_packages/system/ten_ai_base/interface/ten_ai_base/const.py b/agents/ten_packages/system/ten_ai_base/interface/ten_ai_base/const.py
new file mode 100644
index 00000000..85654216
--- /dev/null
+++ b/agents/ten_packages/system/ten_ai_base/interface/ten_ai_base/const.py
@@ -0,0 +1,8 @@
+CMD_TOOL_REGISTER = "tool_register"
+CMD_TOOL_CALL = "tool_call"
+CMD_PROPERTY_TOOL = "tool"
+CMD_PROPERTY_RESULT = "tool_result"
+
+DATA_OUTPUT_NAME = "text_data"
+DATA_OUTPUT_PROPERTY_TEXT = "text"
+DATA_OUTPUT_PROPERTY_END_OF_SEGMENT = "end_of_segment"
\ No newline at end of file
diff --git a/agents/ten_packages/system/ten_ai_base/interface/ten_ai_base/helper.py b/agents/ten_packages/system/ten_ai_base/interface/ten_ai_base/helper.py
new file mode 100644
index 00000000..9bf012d8
--- /dev/null
+++ b/agents/ten_packages/system/ten_ai_base/interface/ten_ai_base/helper.py
@@ -0,0 +1,115 @@
+#
+#
+# Agora Real Time Engagement
+# Created by Wei Hu in 2024-08.
+# Copyright (c) 2024 Agora IO. All rights reserved.
+#
+#
+import asyncio
+from collections import deque
+from typing import Callable
+from ten.async_ten_env import AsyncTenEnv
+
+
+def get_property_bool(ten_env: AsyncTenEnv, property_name: str) -> bool:
+    """Helper to get boolean property from ten_env with error handling."""
+    try:
+        return ten_env.get_property_bool(property_name)
+    except Exception as err:
+        ten_env.log_warn(f"GetProperty {property_name} failed: {err}")
+        return False
+
+
+def get_properties_bool(ten_env: AsyncTenEnv, property_names: list[str], callback: Callable[[str, bool], None]) -> None:
+    """Helper to get boolean properties from ten_env with error handling."""
+    for property_name in property_names:
+        callback(property_name, get_property_bool(ten_env, property_name))
+
+
+def get_property_string(ten_env: AsyncTenEnv, property_name: str) -> str:
+    """Helper to get string property from ten_env with error handling."""
+    try:
+        return ten_env.get_property_string(property_name)
+    except Exception as err:
+        ten_env.log_warn(f"GetProperty {property_name} failed: {err}")
+        return ""
+
+
+def get_properties_string(ten_env: AsyncTenEnv, property_names: list[str], callback: Callable[[str, str], None]) -> None:
+    """Helper to get string properties from ten_env with error handling."""
+    for property_name in property_names:
+        callback(property_name, get_property_string(ten_env, property_name))
+
+
+def get_property_int(ten_env: AsyncTenEnv, property_name: str) -> int:
+    """Helper to get int property from ten_env with error handling."""
+    try:
+        return ten_env.get_property_int(property_name)
+    except Exception as err:
+        ten_env.log_warn(f"GetProperty {property_name} failed: {err}")
+        return 0
+
+
+def get_properties_int(ten_env: AsyncTenEnv, property_names: list[str], callback: Callable[[str, int], None]) -> None:
+    """Helper to get int properties from ten_env with error handling."""
+    for property_name in property_names:
+        callback(property_name, get_property_int(ten_env, property_name))
+
+
+def get_property_float(ten_env: AsyncTenEnv, property_name: str) -> float:
+    """Helper to get float property from ten_env with error handling."""
+    try:
+        return ten_env.get_property_float(property_name)
+    except Exception as err:
+        ten_env.log_warn(f"GetProperty {property_name} failed: {err}")
+        return 0.0
+
+
+def get_properties_float(ten_env: AsyncTenEnv, property_names: list[str], callback: Callable[[str, float], None]) -> None:
+    """Helper to get float properties from ten_env with error handling."""
+    for property_name in property_names:
+        callback(property_name, get_property_float(ten_env, property_name))
+
+
+class AsyncEventEmitter:
+    def __init__(self):
+        self.listeners = {}
+
+    def on(self, event_name, listener):
+        """Register an event listener."""
+        if event_name not in self.listeners:
+            self.listeners[event_name] = []
+        self.listeners[event_name].append(listener)
+
+    def emit(self, event_name, *args, **kwargs):
+        """Fire the event without waiting for listeners to finish."""
+        if event_name in self.listeners:
+            for listener in self.listeners[event_name]:
+                asyncio.create_task(listener(*args, **kwargs))
+
+
+class AsyncQueue:
+    def __init__(self):
+        self._queue = deque()  # Use deque for efficient prepend and append
+        self._condition = asyncio.Condition()  # Use Condition to manage access
+
+    async def put(self, item, prepend=False):
+        """Add an item to the queue (prepend if specified)."""
+        async with self._condition:
+            if prepend:
+                self._queue.appendleft(item)  # Prepend item to the front
+            else:
+                self._queue.append(item)  # Append item to the back
+            self._condition.notify()
+
+    async def get(self):
+        """Remove and return an item from the queue."""
+        async with self._condition:
+            while not self._queue:
+                await self._condition.wait()  # Wait until an item is available
+            return self._queue.popleft()  # Pop from the front of the deque
+
+    async def flush(self):
+        """Flush all items from the queue."""
+        async with self._condition:
+            while self._queue:
+                self._queue.popleft()  # Clear the queue
+            self._condition.notify_all()  # Notify all consumers that the queue is empty
+
+    def __len__(self):
+        """Return the current size of the queue."""
+        return len(self._queue)
\ No newline at end of file
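
Reviewer note (not part of the patch): the `get_properties_*` helpers above are callback-driven so a single call can fold several optional properties into a config object, which is how `openai_chatgpt_python` now loads its settings. A hedged usage sketch (`config` and the property names are placeholders; `ten_env` is supplied by the runtime):

```python
from ten_ai_base.helper import get_properties_string

# Keep the already-configured value when the property is unset ("" is falsy),
# otherwise overwrite the attribute named after the property.
get_properties_string(
    ten_env,
    ["base_url", "api_key"],
    lambda name, value: setattr(config, name, value or getattr(config, name)),
)
```
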
+ """ + # Create the queue for message processing + def __init__(self, name: str): + super().__init__(name) + self.queue = AsyncQueue() + self.available_tools: list[LLMToolMetadata] = [] + self.available_tools_lock = asyncio.Lock() # Lock to ensure thread-safe access + self.current_task = None + self.hit_default_cmd = False + + async def on_init(self, ten_env: TenEnv) -> None: + ten_env.log_debug("on_init") + + async def on_start(self, ten_env: TenEnv) -> None: + ten_env.log_debug("on_start") + + self.loop = asyncio.get_event_loop() + self.loop.create_task(self._process_queue(ten_env)) + + async def on_stop(self, ten_env: TenEnv) -> None: + ten_env.log_debug("on_stop") + + async def on_deinit(self, ten_env: TenEnv) -> None: + ten_env.log_debug("on_deinit") + + async def on_cmd(self, async_ten_env: AsyncTenEnv, cmd: Cmd) -> None: + """ + handle default commands + return True if the command is handled, False otherwise + """ + cmd_name = cmd.get_name() + async_ten_env.log_debug(f"on_cmd name {cmd_name}") + if cmd_name == CMD_TOOL_REGISTER: + try: + tool_metadata_json = json.loads(cmd.get_property_to_json(CMD_PROPERTY_TOOL)) + async_ten_env.log_info(f"register tool: {tool_metadata_json}") + tool_metadata = LLMToolMetadata.model_validate_json(tool_metadata_json) + async with self.available_tools_lock: + self.available_tools.append(tool_metadata) + await self.on_tools_update(async_ten_env, tool_metadata) + async_ten_env.return_result(CmdResult.create(StatusCode.OK), cmd) + except Exception as err: + async_ten_env.log_warn(f"on_cmd failed: {err}") + async_ten_env.return_result(CmdResult.create(StatusCode.ERROR), cmd) + + + async def queue_input_item(self, prepend: bool = False, **kargs: LLMDataCompletionArgs): + """Queues an input item for processing.""" + await self.queue.put(kargs, prepend) + + async def flush_input_items(self, ten_env: TenEnv): + """Flushes the self.queue and cancels the current task.""" + # Flush the queue using the new flush method + await self.queue.flush() + + # Cancel the current task if one is running + if self.current_task: + ten_env.log_info("Cancelling the current task during flush.") + self.current_task.cancel() + + def send_text_output(self, ten_env: TenEnv, sentence: str, end_of_segment: bool): + try: + output_data = Data.create(DATA_OUTPUT_NAME) + output_data.set_property_string( + DATA_OUTPUT_PROPERTY_TEXT, sentence) + output_data.set_property_bool( + DATA_OUTPUT_PROPERTY_END_OF_SEGMENT, end_of_segment + ) + ten_env.send_data(output_data) + ten_env.log_info( + f"{'end of segment ' if end_of_segment else ''}sent sentence [{sentence}]" + ) + except Exception as err: + ten_env.log_warn( + f"send sentence [{sentence}] failed, err: {err}" + ) + + @abstractmethod + async def on_call_chat_completion(self, ten_env: TenEnv, **kargs: LLMCallCompletionArgs) -> None: + """Called when a chat completion is requested by cmd call. Implement this method to process the chat completion.""" + pass + + @abstractmethod + async def on_data_chat_completion(self, ten_env: TenEnv, **kargs: LLMDataCompletionArgs) -> None: + """ + Called when a chat completion is requested by data input. Implement this method to process the chat completion. + Note that this method is stream-based, and it should consider supporting local context caching. + """ + pass + + @abstractmethod + async def on_tools_update(self, ten_env: TenEnv, tool: LLMToolMetadata) -> None: + """Called when a new tool is registered. 
Implement this method to process the new tool.""" + pass + + async def _process_queue(self, ten_env: TenEnv): + """Asynchronously process queue items one by one.""" + while True: + # Wait for an item to be available in the queue + args = await self.queue.get() + try: + ten_env.log_info(f"Processing queue item: {args}") + self.current_task = asyncio.create_task( + self.on_data_chat_completion(ten_env, **args)) + await self.current_task # Wait for the current task to finish or be cancelled + except asyncio.CancelledError: + ten_env.log_info(f"Task cancelled: {args}") + + \ No newline at end of file diff --git a/agents/ten_packages/system/ten_ai_base/interface/ten_ai_base/llm_tool.py b/agents/ten_packages/system/ten_ai_base/interface/ten_ai_base/llm_tool.py new file mode 100644 index 00000000..bf4c76ff --- /dev/null +++ b/agents/ten_packages/system/ten_ai_base/interface/ten_ai_base/llm_tool.py @@ -0,0 +1,82 @@ +from abc import ABC, abstractmethod +import asyncio +from enum import Enum +from typing import Optional, TypedDict +from ten import ( + AsyncExtension, + TenEnv, + Data, +) +from ten.async_ten_env import AsyncTenEnv +from ten.audio_frame import AudioFrame +from ten.cmd import Cmd +from ten.cmd_result import CmdResult, StatusCode +from ten.video_frame import VideoFrame +from .types import LLMToolMetadata, LLMToolResult +from .const import CMD_TOOL_REGISTER, CMD_TOOL_CALL, CMD_PROPERTY_TOOL, CMD_PROPERTY_RESULT +import json + +class AsyncLLMToolBaseExtension(AsyncExtension, ABC): + async def on_start(self, ten_env: AsyncTenEnv) -> None: + ten_env.log_debug("on_start") + tools = self.get_tool_metadata() + for tool in tools: + ten_env.log_info(f"tool: {tool}") + c:Cmd = Cmd.create(CMD_TOOL_REGISTER) + c.set_property_from_json(CMD_PROPERTY_TOOL, json.dumps(tool.model_dump_json())) + await ten_env.send_cmd(c) + ten_env.log_info(f"tool registered, {tool}") + + + async def on_stop(self, ten_env: TenEnv) -> None: + ten_env.log_debug("on_stop") + + async def on_deinit(self, ten_env: TenEnv) -> None: + ten_env.log_debug("on_deinit") + + async def on_cmd(self, ten_env: TenEnv, cmd: Cmd) -> None: + cmd_name = cmd.get_name() + ten_env.log_debug("on_cmd name {}".format(cmd_name)) + + if cmd_name == CMD_TOOL_CALL: + try: + tool_name = cmd.get_property_string("name") + tool_args = json.loads(cmd.get_property_to_json("arguments")) + ten_env.log_debug(f"tool_name: {tool_name}, tool_args: {tool_args}") + result = await self.run_tool(tool_name, tool_args) + cmd_result:CmdResult = CmdResult.create(StatusCode.OK) + cmd_result.set_property_from_json(CMD_PROPERTY_RESULT, json.dumps(result.model_dump_json())) + ten_env.return_result(cmd_result, cmd) + ten_env.log_debug(f"tool result done, {result}") + except Exception as err: + ten_env.log_warn(f"on_cmd failed: {err}") + ten_env.return_result(CmdResult.create(StatusCode.ERROR), cmd) + + async def on_data(self, ten_env: TenEnv, data: Data) -> None: + data_name = data.get_name() + ten_env.log_debug("on_data name {}".format(data_name)) + + # TODO: process data + pass + + async def on_audio_frame(self, ten_env: TenEnv, audio_frame: AudioFrame) -> None: + audio_frame_name = audio_frame.get_name() + ten_env.log_debug("on_audio_frame name {}".format(audio_frame_name)) + + # TODO: process audio frame + pass + + async def on_video_frame(self, ten_env: TenEnv, video_frame: VideoFrame) -> None: + video_frame_name = video_frame.get_name() + ten_env.log_debug("on_video_frame name {}".format(video_frame_name)) + + # TODO: process video frame + pass + + @abstractmethod + def 
get_tool_metadata(self) -> list[LLMToolMetadata]: + pass + + @abstractmethod + async def run_tool(self, name: str, args: dict) -> LLMToolResult: + pass \ No newline at end of file diff --git a/agents/ten_packages/system/ten_ai_base/interface/ten_ai_base/types.py b/agents/ten_packages/system/ten_ai_base/interface/ten_ai_base/types.py new file mode 100644 index 00000000..1131f8f0 --- /dev/null +++ b/agents/ten_packages/system/ten_ai_base/interface/ten_ai_base/types.py @@ -0,0 +1,39 @@ +from typing import TypedDict, Union +from pydantic import BaseModel + +class LLMToolMetadataParameter(BaseModel): + name: str + type: str + description: str + required: bool + +class LLMToolMetadata(BaseModel): + name: str + description: str + parameters: list[LLMToolMetadataParameter] + + +class LLMCompletionContentItemText(BaseModel): + text: str + +class LLMCompletionContentItemImage(BaseModel): + image: str + +class LLMCompletionContentItemAudio(BaseModel): + audio: str + +LLMCompletionContentItem = Union[ + LLMCompletionContentItemText, + LLMCompletionContentItemImage, + LLMCompletionContentItemAudio +] + +class LLMToolResult(BaseModel): + items: list[LLMCompletionContentItem] + +class LLMCallCompletionArgs(TypedDict, total=False): + content: list[LLMCompletionContentItem] + +class LLMDataCompletionArgs(TypedDict, total=False): + content: list[LLMCompletionContentItem] + no_tool: bool = False \ No newline at end of file diff --git a/agents/ten_packages/system/ten_ai_base/requirements.txt b/agents/ten_packages/system/ten_ai_base/requirements.txt new file mode 100644 index 00000000..59cc1e9f --- /dev/null +++ b/agents/ten_packages/system/ten_ai_base/requirements.txt @@ -0,0 +1 @@ +pydantic \ No newline at end of file
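
Reviewer note (not part of the patch): putting the new interfaces together, a custom tool provider only needs to subclass `AsyncLLMToolBaseExtension`; registration via `tool_register` and `tool_call` handling are inherited. A minimal sketch, where everything comes from this diff except the hypothetical `get_current_time` tool itself:

```python
from datetime import datetime

from ten_ai_base.llm_tool import AsyncLLMToolBaseExtension
from ten_ai_base.types import (
    LLMToolMetadata,
    LLMToolResult,
    LLMCompletionContentItemText,
)


class TimeToolExtension(AsyncLLMToolBaseExtension):
    """Hypothetical tool provider built on the new base class."""

    def get_tool_metadata(self) -> list[LLMToolMetadata]:
        return [
            LLMToolMetadata(
                name="get_current_time",  # hypothetical tool name
                description="Get the current local time.",
                parameters=[],
            )
        ]

    async def run_tool(self, name: str, args: dict) -> LLMToolResult:
        if name == "get_current_time":
            now = datetime.now().isoformat()
            return LLMToolResult(items=[LLMCompletionContentItemText(text=now)])
        raise ValueError(f"unknown tool: {name}")
```

On the graph side this would be wired exactly like `vision_tool` in `property.json`: route the extension's `tool_register` cmd to the LLM node, and the LLM node's `tool_call` cmd back to the extension.
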