diff --git a/.env.example b/.env.example index 70a544ae..f78c2141 100644 --- a/.env.example +++ b/.env.example @@ -67,6 +67,10 @@ ELEVENLABS_TTS_KEY= # Gemini API key GEMINI_API_KEY= +# Extension: azure_vision_python +# Azure vision key +AZURE_VISION_KEY= + # Extension: litellm # Using Environment Variables, refer to https://docs.litellm.ai/docs/providers # For example: diff --git a/agents/property.json b/agents/property.json index 12c36d29..aae833f8 100644 --- a/agents/property.json +++ b/agents/property.json @@ -2659,6 +2659,326 @@ ] } ] + }, + { + "name": "va.openai.v2v.vision", + "auto_start": false, + "nodes": [ + { + "type": "extension", + "extension_group": "rtc", + "addon": "agora_rtc", + "name": "agora_rtc", + "property": { + "app_id": "${env:AGORA_APP_ID}", + "token": "", + "channel": "ten_agent_test", + "stream_id": 1234, + "remote_stream_id": 123, + "subscribe_audio": true, + "subscribe_video": true, + "publish_audio": true, + "publish_data": true, + "subscribe_audio_sample_rate": 24000, + "subscribe_video_pix_fmt": 4 + } + }, + { + "type": "extension", + "extension_group": "llm", + "addon": "openai_v2v_python", + "name": "openai_v2v_python", + "property": { + "api_key": "${env:OPENAI_REALTIME_API_KEY}", + "temperature": 0.9, + "model": "gpt-4o-realtime-preview", + "max_tokens": 2048, + "voice": "alloy", + "language": "en-US", + "server_vad": true, + "dump": true, + "history": 10 + } + }, + { + "type": "extension", + "extension_group": "transcriber", + "addon": "message_collector", + "name": "message_collector" + }, + { + "type": "extension", + "extension_group": "tools", + "addon": "weatherapi_tool_python", + "name": "weatherapi_tool_python", + "property": { + "api_key": "${env:WEATHERAPI_API_KEY}" + } + }, + { + "type": "extension", + "extension_group": "tools", + "addon": "bingsearch_tool_python", + "name": "bingsearch_tool_python", + "property": { + "api_key": "${env:BING_API_KEY}" + } + }, + { + "type": "extension", + "extension_group": "tools", + 
"addon": "vision_tool_python", + "name": "vision_tool_python", + "property": { + "frequency_ms": 1000, + "use_llm": false + } + }, + { + "type": "extension", + "extension_group": "azure_vision", + "addon": "azure_vision_python", + "name": "azure_vision_python", + "property": { + "key": "${env:AZURE_VISION_KEY}", + "endpoint": "https://tenagentvision.cognitiveservices.azure.com/" + } + } + ], + "connections": [ + { + "extension_group": "rtc", + "extension": "agora_rtc", + "audio_frame": [ + { + "name": "pcm_frame", + "dest": [ + { + "extension_group": "llm", + "extension": "openai_v2v_python" + } + ] + } + ], + "video_frame": [ + { + "name": "video_frame", + "dest": [ + { + "extension_group": "tools", + "extension": "vision_tool_python" + } + ] + } + ] + }, + { + "extension_group": "tools", + "extension": "vision_tool_python", + "cmd": [ + { + "name": "tool_register", + "dest": [ + { + "extension_group": "llm", + "extension": "openai_v2v_python" + } + ] + }, + { + "name": "image_analyze", + "dest": [ + { + "extension_group": "azure_vision", + "extension": "azure_vision_python" + } + ] + } + ] + }, + { + "extension_group": "tools", + "extension": "weatherapi_tool_python", + "cmd": [ + { + "name": "tool_register", + "dest": [ + { + "extension_group": "llm", + "extension": "openai_v2v_python" + } + ] + } + ] + }, + { + "extension_group": "tools", + "extension": "bingsearch_tool_python", + "cmd": [ + { + "name": "tool_register", + "dest": [ + { + "extension_group": "llm", + "extension": "openai_v2v_python" + } + ] + } + ] + }, + { + "extension_group": "llm", + "extension": "openai_v2v_python", + "audio_frame": [ + { + "name": "pcm_frame", + "dest": [ + { + "extension_group": "rtc", + "extension": "agora_rtc" + } + ] + } + ], + "data": [ + { + "name": "text_data", + "dest": [ + { + "extension_group": "transcriber", + "extension": "message_collector" + } + ] + } + ], + "cmd": [ + { + "name": "flush", + "dest": [ + { + "extension_group": "rtc", + "extension": "agora_rtc" + 
} + ] + }, + { + "name": "tool_call_get_current_weather", + "dest": [ + { + "extension_group": "tools", + "extension": "weatherapi_tool_python", + "msg_conversion": { + "type": "per_property", + "keep_original": true, + "rules": [ + { + "path": "_ten.name", + "conversion_mode": "fixed_value", + "value": "tool_call" + } + ] + } + } + ] + }, + { + "name": "tool_call_get_past_weather", + "dest": [ + { + "extension_group": "tools", + "extension": "weatherapi_tool_python", + "msg_conversion": { + "type": "per_property", + "keep_original": true, + "rules": [ + { + "path": "_ten.name", + "conversion_mode": "fixed_value", + "value": "tool_call" + } + ] + } + } + ] + }, + { + "name": "tool_call_get_future_weather", + "dest": [ + { + "extension_group": "tools", + "extension": "weatherapi_tool_python", + "msg_conversion": { + "type": "per_property", + "keep_original": true, + "rules": [ + { + "path": "_ten.name", + "conversion_mode": "fixed_value", + "value": "tool_call" + } + ] + } + } + ] + }, + { + "name": "tool_call_bing_search", + "dest": [ + { + "extension_group": "tools", + "extension": "bingsearch_tool_python", + "msg_conversion": { + "type": "per_property", + "keep_original": true, + "rules": [ + { + "path": "_ten.name", + "conversion_mode": "fixed_value", + "value": "tool_call" + } + ] + } + } + ] + }, + { + "name": "tool_call_query_single_image", + "dest": [ + { + "extension_group": "tools", + "extension": "vision_tool_python", + "msg_conversion": { + "type": "per_property", + "keep_original": true, + "rules": [ + { + "path": "_ten.name", + "conversion_mode": "fixed_value", + "value": "tool_call" + } + ] + } + } + ] + } + ] + }, + { + "extension_group": "transcriber", + "extension": "message_collector", + "data": [ + { + "name": "data", + "dest": [ + { + "extension_group": "rtc", + "extension": "agora_rtc" + } + ] + } + ] + } + ] } ] } diff --git a/agents/ten_packages/extension/azure_vision_python/BUILD.gn 
b/agents/ten_packages/extension/azure_vision_python/BUILD.gn new file mode 100644 index 00000000..1211cff4 --- /dev/null +++ b/agents/ten_packages/extension/azure_vision_python/BUILD.gn @@ -0,0 +1,21 @@ +# +# +# Agora Real Time Engagement +# Created by Wei Hu in 2022-11. +# Copyright (c) 2024 Agora IO. All rights reserved. +# +# +import("//build/feature/ten_package.gni") + +ten_package("azure_vision_python") { + package_kind = "extension" + + resources = [ + "__init__.py", + "addon.py", + "extension.py", + "log.py", + "manifest.json", + "property.json", + ] +} diff --git a/agents/ten_packages/extension/azure_vision_python/README.md b/agents/ten_packages/extension/azure_vision_python/README.md new file mode 100644 index 00000000..e8cb3ca4 --- /dev/null +++ b/agents/ten_packages/extension/azure_vision_python/README.md @@ -0,0 +1,44 @@ +# azure_vision_python + +This extension calls the Azure AI Vision image analysis service. + +The service documentation is available at: https://learn.microsoft.com/zh-cn/azure/ai-services/computer-vision/overview + +## Properties + +- key +- endpoint + +## Features + +- Only supports analyzing a single image frame per request +- The set of visual features is not customizable +- Every request includes `TAGS`, `CAPTION`, `READ`, `PEOPLE`, `OBJECTS` + +## API + +Refer to `api` definition in [manifest.json] and default values in [property.json](property.json). 
+ +Other extensions can send the `image_analyze` cmd and will receive the full analysis result as a JSON string in the `response` property of the cmd result. The result looks like this: + +``` json +{ + "modelVersion": "2023-10-01", + "captionResult": { + "text": "a group of toys on a table", + "confidence": 0.7558467388153076 + }, + "metadata": { + "width": 320, + "height": 240 + }, + "objectsResult": {}, + "readResult": {}, + "peopleResult": {} +} +``` + +## Misc + +- Video analyze +- Multi-frame analyze \ No newline at end of file diff --git a/agents/ten_packages/extension/azure_vision_python/__init__.py b/agents/ten_packages/extension/azure_vision_python/__init__.py new file mode 100644 index 00000000..e742dc3e --- /dev/null +++ b/agents/ten_packages/extension/azure_vision_python/__init__.py @@ -0,0 +1,11 @@ +# +# +# Agora Real Time Engagement +# Created by Wei Hu in 2024-08. +# Copyright (c) 2024 Agora IO. All rights reserved. +# +# +from . import addon +from .log import logger + +logger.info("azure_vision_python extension loaded") diff --git a/agents/ten_packages/extension/azure_vision_python/addon.py b/agents/ten_packages/extension/azure_vision_python/addon.py new file mode 100644 index 00000000..161ce5c3 --- /dev/null +++ b/agents/ten_packages/extension/azure_vision_python/addon.py @@ -0,0 +1,22 @@ +# +# +# Agora Real Time Engagement +# Created by Wei Hu in 2024-08. +# Copyright (c) 2024 Agora IO. All rights reserved. 
+# +# +from ten import ( + Addon, + register_addon_as_extension, + TenEnv, +) +from .extension import AzureVisionExtension +from .log import logger + + +@register_addon_as_extension("azure_vision_python") +class AzureVisionExtensionAddon(Addon): + + def on_create_instance(self, ten_env: TenEnv, name: str, context) -> None: + logger.info("AzureVisionExtensionAddon on_create_instance") + ten_env.on_create_instance_done(AzureVisionExtension(name), context) diff --git a/agents/ten_packages/extension/azure_vision_python/extension.py b/agents/ten_packages/extension/azure_vision_python/extension.py new file mode 100644 index 00000000..84eeadce --- /dev/null +++ b/agents/ten_packages/extension/azure_vision_python/extension.py @@ -0,0 +1,153 @@ +# +# +# Agora Real Time Engagement +# Created by Wei Hu in 2024-08. +# Copyright (c) 2024 Agora IO. All rights reserved. +# +# +import json + +from typing import Any +from azure.ai.vision.imageanalysis import ImageAnalysisClient +from azure.ai.vision.imageanalysis.models import VisualFeatures +from azure.core.credentials import AzureKeyCredential + +from ten import ( + AudioFrame, + VideoFrame, + Extension, + TenEnv, + Cmd, + StatusCode, + CmdResult, + Data, +) +from .log import logger + +PROPERTY_KEY = "key" +PROPERTY_ENDPOINT = "endpoint" + +CMD_IMAGE_ANALYZE = "image_analyze" + +class AzureVisionExtension(Extension): + key: str = "" + endpoint: str = "https://tenagentvision.cognitiveservices.azure.com/" + + def on_init(self, ten_env: TenEnv) -> None: + logger.info("AzureVisionExtension on_init") + ten_env.on_init_done() + + def on_start(self, ten_env: TenEnv) -> None: + logger.info("AzureVisionExtension on_start") + + try: + self.key = ten_env.get_property_string(PROPERTY_KEY) + except Exception as err: + logger.error(f"GetProperty optional {PROPERTY_KEY} error: {err}") + return + + try: + self.endpoint = ten_env.get_property_string(PROPERTY_ENDPOINT) + except Exception as err: + logger.info(f"GetProperty optional 
{PROPERTY_ENDPOINT} error: {err}") + + ten_env.on_start_done() + + def on_stop(self, ten_env: TenEnv) -> None: + logger.info("AzureVisionExtension on_stop") + ten_env.on_stop_done() + + def on_deinit(self, ten_env: TenEnv) -> None: + logger.info("AzureVisionExtension on_deinit") + ten_env.on_deinit_done() + + def on_cmd(self, ten_env: TenEnv, cmd: Cmd) -> None: + cmd_name = cmd.get_name() + logger.info("on_cmd name {}".format(cmd_name)) + + if cmd_name == CMD_IMAGE_ANALYZE: + try: + image_data = cmd.get_property_buf("image_data") + resp = self._analyze_image(image_data) + cmd_result = CmdResult.create(StatusCode.OK) + cmd_result.set_property_string("response", json.dumps(resp)) + ten_env.return_result(cmd_result, cmd) + return + except: + logger.exception("Failed to handle analyze") + + cmd_result = CmdResult.create(StatusCode.OK) + ten_env.return_result(cmd_result, cmd) + + def on_data(self, ten_env: TenEnv, data: Data) -> None: + pass + + def on_audio_frame(self, ten_env: TenEnv, audio_frame: AudioFrame) -> None: + pass + + def on_video_frame(self, ten_env: TenEnv, video_frame: VideoFrame) -> None: + pass + + def _analyze_image(self, image_data: bytes) -> Any: + client = ImageAnalysisClient( + endpoint=self.endpoint, + credential=AzureKeyCredential(self.key) + ) + + # Get a caption for the image. This will be a synchronously (blocking) call. 
+ result = client.analyze( + image_data=image_data, + visual_features=[VisualFeatures.TAGS, VisualFeatures.CAPTION, VisualFeatures.READ, VisualFeatures.PEOPLE, VisualFeatures.OBJECTS], + gender_neutral_caption=True, + ) + + logger.info(f"before return {result}") + + rst = {} + if result.tags is not None: + tags = [] + for tag in result.tags.list: + tags.append({ + "name": tag.name, + "confidence": tag.confidence + }) + rst["tags"] = tags + + if result.caption is not None: + rst["caption"] = { + "text": result.caption.text, + "confidence": result.caption.confidence + } + + if result.read is not None: + lines = [] + for block in result.read.blocks: + for line in block.lines: + lines.append({ + "text": line.text, + "bounding_box": str(line.bounding_polygon), + }) + rst["read"] = lines + + if result.objects is not None: + objects = [] + for object in result.objects.list: + objects.append({ + "name": object.tags[0].name, + "bounding_box": str(object.bounding_box), + "confidence": object.tags[0].confidence + }) + rst["objects"] = objects + + if result.people is not None: + people = [] + for person in result.people.list: + people.append({ + "bounding_box": str(person.bounding_box), + "confidence": person.confidence + }) + rst["people"] = people + + logger.info(f"after parse {rst}") + + return rst \ No newline at end of file diff --git a/agents/ten_packages/extension/azure_vision_python/log.py b/agents/ten_packages/extension/azure_vision_python/log.py new file mode 100644 index 00000000..dd55cbfd --- /dev/null +++ b/agents/ten_packages/extension/azure_vision_python/log.py @@ -0,0 +1,22 @@ +# +# +# Agora Real Time Engagement +# Created by Wei Hu in 2024-08. +# Copyright (c) 2024 Agora IO. All rights reserved. 
+# +# +import logging + +logger = logging.getLogger("azure_vision_python") +logger.setLevel(logging.INFO) + +formatter_str = ( + "%(asctime)s - %(name)s - %(levelname)s - %(process)d - " + "[%(filename)s:%(lineno)d] - %(message)s" +) +formatter = logging.Formatter(formatter_str) + +console_handler = logging.StreamHandler() +console_handler.setFormatter(formatter) + +logger.addHandler(console_handler) diff --git a/agents/ten_packages/extension/azure_vision_python/manifest.json b/agents/ten_packages/extension/azure_vision_python/manifest.json new file mode 100644 index 00000000..c8b3b8fa --- /dev/null +++ b/agents/ten_packages/extension/azure_vision_python/manifest.json @@ -0,0 +1,52 @@ +{ + "type": "extension", + "name": "azure_vision_python", + "version": "0.1.0", + "dependencies": [ + { + "type": "system", + "name": "ten_runtime_python", + "version": "0.2" + } + ], + "package": { + "include": [ + "manifest.json", + "property.json", + "BUILD.gn", + "**.tent", + "**.py", + "README.md" + ] + }, + "api": { + "property": { + "key": { + "type": "string" + }, + "endpoint": { + "type": "string" + } + }, + "cmd_in": [ + { + "name": "analyze_image", + "property": { + "image_data": { + "type": "buf" + } + }, + "required": [ + "image_data" + ], + "result": { + "property": { + "response": { + "type": "string" + } + } + } + } + ] + } +} \ No newline at end of file diff --git a/agents/ten_packages/extension/azure_vision_python/property.json b/agents/ten_packages/extension/azure_vision_python/property.json new file mode 100644 index 00000000..9e26dfee --- /dev/null +++ b/agents/ten_packages/extension/azure_vision_python/property.json @@ -0,0 +1 @@ +{} \ No newline at end of file diff --git a/agents/ten_packages/extension/azure_vision_python/requirements.txt b/agents/ten_packages/extension/azure_vision_python/requirements.txt new file mode 100644 index 00000000..830deb56 --- /dev/null +++ b/agents/ten_packages/extension/azure_vision_python/requirements.txt @@ -0,0 +1 @@ 
+azure-ai-vision-imageanalysis \ No newline at end of file diff --git a/agents/ten_packages/extension/bedrock_llm_python/bedrock_llm.py b/agents/ten_packages/extension/bedrock_llm_python/bedrock_llm.py index c833f7b7..71dc934c 100644 --- a/agents/ten_packages/extension/bedrock_llm_python/bedrock_llm.py +++ b/agents/ten_packages/extension/bedrock_llm_python/bedrock_llm.py @@ -1,4 +1,6 @@ import boto3 + +from typing import List, Any from .log import logger class BedrockLLMConfig: @@ -71,5 +73,31 @@ def get_converse_stream(self, messages): try: response = self.client.converse_stream(**bedrock_req_params) return response + except Exception as e: + raise Exception(f"GetConverseStream failed, err: {e}") + + def chat_completion_cmd(self, messages: List[Any], stream: bool, is_json: bool): + bedrock_req_params = { + "modelId": self.config.model, + "messages": messages, + "inferenceConfig": { + "temperature": self.config.temperature, + "maxTokens": self.config.max_tokens, + "topP": self.config.top_p, + # "stopSequences": [], + }, + # "additionalModelRequestFields": additional_model_fields, + } + + logger.info(f"before chat {bedrock_req_params}") + + f = self.client.converse_stream + if not stream: + f = self.client.converse + + try: + response = f(**bedrock_req_params) + logger.info(f"after chat {response}") + return response except Exception as e: raise Exception(f"GetConverseStream failed, err: {e}") \ No newline at end of file diff --git a/agents/ten_packages/extension/bedrock_llm_python/bedrock_llm_extension.py b/agents/ten_packages/extension/bedrock_llm_python/bedrock_llm_extension.py index 5e7918a8..cf6e5d52 100644 --- a/agents/ten_packages/extension/bedrock_llm_python/bedrock_llm_extension.py +++ b/agents/ten_packages/extension/bedrock_llm_python/bedrock_llm_extension.py @@ -1,6 +1,14 @@ -from .bedrock_llm import BedrockLLM, BedrockLLMConfig + +import json +import copy + from datetime import datetime from threading import Thread +from queue import Queue +from 
threading import Thread +from typing import List, Any, AnyStr +from base64 import b64decode + from ten import ( Addon, Extension, @@ -12,6 +20,7 @@ CmdResult, ) from .log import logger +from .bedrock_llm import BedrockLLM, BedrockLLMConfig CMD_IN_FLUSH = "flush" @@ -21,6 +30,8 @@ DATA_OUT_TEXT_DATA_PROPERTY_TEXT = "text" DATA_OUT_TEXT_DATA_PROPERTY_TEXT_END_OF_SEGMENT = "end_of_segment" +CMD_IN_CHAT_COMPLETION = "chat_completion" + PROPERTY_REGION = "region" # Optional PROPERTY_ACCESS_KEY = "access_key" # Optional PROPERTY_SECRET_KEY = "secret_key" # Optional @@ -68,10 +79,17 @@ class BedrockLLMExtension(Extension): max_memory_length = 10 outdate_ts = 0 bedrock_llm = None + greeting = "" + + stopped: bool = False + cmd_queue = Queue() + thread: Thread = None + ten_env: TenEnv = None def on_start(self, ten: TenEnv) -> None: logger.info("BedrockLLMExtension on_start") # Prepare configuration + self.ten_env = ten bedrock_llm_config = BedrockLLMConfig.default_config() for optional_str_param in [ @@ -110,7 +128,7 @@ def on_start(self, ten: TenEnv) -> None: ) try: - greeting = ten.get_property_string(PROPERTY_GREETING) + self.greeting = ten.get_property_string(PROPERTY_GREETING) except Exception as err: logger.debug( f"GetProperty optional {PROPERTY_GREETING} failed, err: {err}." 
@@ -134,24 +152,30 @@ def on_start(self, ten: TenEnv) -> None: except Exception as err: logger.exception(f"newBedrockLLM failed, err: {err}") + self.thread = Thread(target=self.loop) + self.thread.start() + # Send greeting if available - if greeting: + if self.greeting: try: output_data = Data.create("text_data") output_data.set_property_string( - DATA_OUT_TEXT_DATA_PROPERTY_TEXT, greeting + DATA_OUT_TEXT_DATA_PROPERTY_TEXT, self.greeting ) output_data.set_property_bool( DATA_OUT_TEXT_DATA_PROPERTY_TEXT_END_OF_SEGMENT, True ) ten.send_data(output_data) - logger.info(f"greeting [{greeting}] sent") + logger.info(f"greeting [{self.greeting}] sent") except Exception as err: - logger.info(f"greeting [{greeting}] send failed, err: {err}") + logger.info(f"greeting [{self.greeting}] send failed, err: {err}") ten.on_start_done() def on_stop(self, ten: TenEnv) -> None: logger.info("BedrockLLMExtension on_stop") + self.stopped = True + self.cmd_queue.put(None) + self.thread.join() ten.on_stop_done() def on_cmd(self, ten: TenEnv, cmd: Cmd) -> None: @@ -166,6 +190,23 @@ def on_cmd(self, ten: TenEnv, cmd: Cmd) -> None: cmd_out = Cmd.create(CMD_OUT_FLUSH) ten.send_cmd(cmd_out, None) logger.info(f"BedrockLLMExtension on_cmd sent flush") + elif cmd_name == CMD_IN_CHAT_COMPLETION: + m_str = cmd.get_property_string("messages") + messages = json.loads(m_str) + stream = False + is_json = False + try: + stream = cmd.get_property_bool("stream") + except: + pass + + try: + is_json = cmd.get_property_bool("json") + except: + pass + + self.cmd_queue.put((messages, stream, is_json, cmd)) + return else: logger.info(f"BedrockLLMExtension on_cmd unknown cmd: {cmd_name}") cmd_result = CmdResult.create(StatusCode.ERROR) @@ -365,6 +406,92 @@ def converse_stream_worker(start_time, input_text, memory): thread.start() logger.info(f"BedrockLLMExtension on_data end") + def loop(self): + logger.info(f"starting loop {self.stopped}") + while not self.stopped: + c = self.cmd_queue.get() + if c is None: + 
break + + try: + messages, stream, is_json, cmd = c + try: + messages = self._convert_messages(messages) + logger.info(f"after convert {messages}") + resp = self.bedrock_llm.chat_completion_cmd(messages=messages, stream=stream, is_json=is_json) + if not stream: + output_message = resp['output']['message'] + cmd_result = CmdResult.create(StatusCode.OK) + cmd_result.set_property_string("response", output_message) + self.ten_env.return_result(cmd_result, cmd) + else: + stream = resp.get("stream") + status_code = StatusCode.OK + for event in stream: + if "contentBlockDelta" in event: + delta_types = event["contentBlockDelta"]["delta"].keys() + # ignore other types of content: e.g toolUse + if "text" in delta_types: + content = event["contentBlockDelta"]["delta"]["text"] + cmd_result = CmdResult.create(StatusCode.OK) + cmd_result.set_property_string("response", content) + cmd_result.set_is_final(False) + self.ten_env.return_result(cmd_result, cmd) + elif ( + "internalServerException" in event + or "modelStreamErrorException" in event + or "throttlingException" in event + or "validationException" in event + ): + logger.error(f"GetConverseStream Error occured: {event}") + status_code = StatusCode.ERROR + break + else: + # ingore other events + continue + + cmd_result = CmdResult.create(status_code) + cmd_result.set_property_string("response", "") + cmd_result.set_is_final(True) + self.ten_env.return_result(cmd_result, cmd) + except: + logger.exception("Failed to handle queue") + except: + logger.exception("failed") + + def _convert_messages(self, messages: List[Any]): + result = [] + logger.info(f"_convert_messages {messages}") + for message in messages: + parts = [] + #if message["role"] == "system": + # result.append(message) + # continue + content = message["content"] + if type(content) == list: + for part in content: + if part["type"] == "image_url": + # rewrite image + # part["type"] = "image" + origin = part["image_url"]["url"] + del part["image_url"] + partial = 
str(origin[23:]) + part["image"] = { + "format": "jpeg", + "source": { + "bytes": b64decode(partial.encode('utf-8')) + } + } + del part["type"] + parts.append(part) + elif type(content) == str: + parts.append({"text": content}) + + del message["content"] + message["content"] = parts + result.append(message) + logger.info(f"after _convert_messages {result}") + return result @register_addon_as_extension("bedrock_llm_python") class BedrockLLMExtensionAddon(Addon): diff --git a/agents/ten_packages/extension/bingsearch_tool_python/README.md b/agents/ten_packages/extension/bingsearch_tool_python/README.md index 581fdf5e..b5b20b98 100644 --- a/agents/ten_packages/extension/bingsearch_tool_python/README.md +++ b/agents/ten_packages/extension/bingsearch_tool_python/README.md @@ -1,29 +1,26 @@ # bingsearch_tool_python - +This is a tool for Bing search; the API documentation is available at: https://learn.microsoft.com/en-us/bing/search-apis/bing-web-search/quickstarts/rest/python + +It is built using the TEN Tool Call Protocol (Beta). ## Features - +It is a Bing search tool that automatically registers itself with any LLM extension. + +The tool description is as follows: -- xxx feature +*Use Bing.com to search for latest information. Call this function if you are not sure about the answer.* ## API Refer to `api` definition in [manifest.json] and default values in [property.json](property.json). 
- - -## Development - -### Build - - - -### Unit test - - +- out: tool_register +- in: tool_call ## Misc - +- use Tool Call Protocol Standard +- support async call +- apply asyncio template diff --git a/agents/ten_packages/extension/bingsearch_tool_python/manifest.json b/agents/ten_packages/extension/bingsearch_tool_python/manifest.json index 3e5a4193..47f22fdd 100644 --- a/agents/ten_packages/extension/bingsearch_tool_python/manifest.json +++ b/agents/ten_packages/extension/bingsearch_tool_python/manifest.json @@ -19,5 +19,55 @@ "README.md" ] }, - "api": {} + "api": { + "property": { + "api_key": { + "type": "string" + } + }, + "cmd_out": [ + { + "name": "tool_register", + "property": { + "name": { + "type": "string" + }, + "description": { + "type": "string" + }, + "parameters": { + "type": "string" + } + }, + "required": [ + "name", + "description", + "parameters" + ], + "result": { + "property": { + "response": { + "type": "string" + } + } + } + } + ], + "cmd_in": [ + { + "name": "tool_call", + "property": { + "name": { + "type": "string" + }, + "args": { + "type": "string" + } + }, + "required": [ + "name" + ] + } + ] + } } \ No newline at end of file diff --git a/agents/ten_packages/extension/openai_chatgpt_python/extension.py b/agents/ten_packages/extension/openai_chatgpt_python/extension.py index 4a5af8b8..802a28d0 100644 --- a/agents/ten_packages/extension/openai_chatgpt_python/extension.py +++ b/agents/ten_packages/extension/openai_chatgpt_python/extension.py @@ -10,6 +10,9 @@ import random import threading import traceback +import functools + +from typing import List, Callable from .helper import AsyncEventEmitter, AsyncQueue, get_current_time, get_property_bool, get_property_float, get_property_int, get_property_string, parse_sentences, rgb2base64jpeg from .openai import OpenAIChatGPT, OpenAIChatGPTConfig @@ -29,6 +32,7 @@ CMD_IN_ON_USER_JOINED = "on_user_joined" CMD_IN_ON_USER_LEFT = "on_user_left" CMD_OUT_FLUSH = "flush" +CMD_IN_CHAT_COMPLETION = 
"chat_completion" DATA_IN_TEXT_DATA_PROPERTY_TEXT = "text" DATA_IN_TEXT_DATA_PROPERTY_IS_FINAL = "is_final" DATA_OUT_TEXT_DATA_PROPERTY_TEXT = "text" @@ -52,7 +56,7 @@ TASK_TYPE_CHAT_COMPLETION = "chat_completion" TASK_TYPE_CHAT_COMPLETION_WITH_VISION = "chat_completion_with_vision" - +TASK_TYPE_CHAT_COMPLETION_CMD = "chat_completion_cmd" class OpenAIChatGPTExtension(Extension): memory = [] @@ -193,6 +197,24 @@ def on_cmd(self, ten_env: TenEnv, cmd: Cmd) -> None: elif cmd_name == CMD_IN_ON_USER_LEFT: self.users_count -= 1 status_code, detail = StatusCode.OK, "success" + elif cmd_name == CMD_IN_CHAT_COMPLETION: + m_str = cmd.get_property_string("messages") + messages = json.loads(m_str) + stream = False + is_json = False + try: + stream = cmd.get_property_bool("stream") + except: + pass + + try: + is_json = cmd.get_property_bool("json") + except: + pass + + asyncio.run_coroutine_threadsafe(self.queue.put( + [TASK_TYPE_CHAT_COMPLETION_CMD, (messages, stream, is_json, cmd)]), self.loop) + return else: logger.info(f"on_cmd unknown cmd: {cmd_name}") status_code, detail = StatusCode.ERROR, "unknown cmd" @@ -237,11 +259,30 @@ async def _process_queue(self, ten_env: TenEnv): [task_type, message] = await self.queue.get() try: # Create a new task for the new message - self.current_task = asyncio.create_task( - self._run_chatflow(ten_env, task_type, message, self.memory)) + if task_type == TASK_TYPE_CHAT_COMPLETION_CMD: + logger.info("on chat completion cmd") + messages, stream, is_json, cmd = message + logger.info(f"on chat completion cmd with params {messages}, {stream}, {is_json}") + def callback(ten_env: TenEnv, cmd: Cmd, delta: str, is_final: bool, status_code = StatusCode.OK) -> None: + logger.info(f"on callback of chat completion {delta} {is_final} {status_code}") + cmd_result = CmdResult.create(status_code) + cmd_result.set_property_string("response", delta) + if is_final: + cmd_result.set_is_final(True) # end of streaming return + else: + 
cmd_result.set_is_final(False) # keep streaming return + ten_env.return_result(cmd_result, cmd) + + self.current_task = asyncio.create_task( + self._run_chat_completion_cmd(ten_env, messages, stream, is_json, functools.partial(callback, ten_env, cmd))) + else: + self.current_task = asyncio.create_task( + self._run_chatflow(ten_env, task_type, message, self.memory)) await self.current_task # Wait for the current task to finish or be cancelled except asyncio.CancelledError: logger.info(f"Task cancelled: {message}") + except: + logger.exception(f"failed to handle queue") async def _flush_queue(self): """Flushes the self.queue and cancels the current task.""" @@ -253,6 +294,45 @@ async def _flush_queue(self): logger.info("Cancelling the current task during flush.") self.current_task.cancel() + async def _run_chat_completion_cmd(self, ten_env: TenEnv, messages: List, stream: bool, is_json: bool, callback: Callable) -> None: + # Create an asyncio.Event to signal when content is finished + content_finished_event = asyncio.Event() + + async def handle_stream_content_update(content: str): + nonlocal callback + callback(content, False) + + async def handle_stream_content_finished(full_content: str): + nonlocal callback + callback("", True) + content_finished_event.set() + + async def handle_content_finished(full_content: str): + nonlocal callback + logger.info(f"handle_content_finished {full_content}") + callback(full_content, True) + content_finished_event.set() + + async def handle_refusal(refusal: str): + nonlocal callback + logger.info(f"handle_refusal {refusal}") + callback(refusal, True, StatusCode.ERROR) + content_finished_event.set() + + listener = AsyncEventEmitter() + if stream: + listener.on("content_update", handle_stream_content_update) + listener.on("content_finished", handle_stream_content_finished) + else: + listener.on("content_finished", handle_content_finished) + listener.on("on_refusal", handle_refusal) + + # Make an async API call to get chat completions 
async def get_chat_completions(self, messages, tools = None, stream = False, is_json = False, listener = None):
    """Run one chat completion and report the outcome through ``listener``.

    The request is built from ``self.config`` (model, temperature, top_p,
    penalties, max_tokens, seed) plus the given ``messages``/``tools``.

    :param messages: chat messages forwarded to the API.
    :param tools: optional tool definitions forwarded unchanged.
    :param stream: when true, the response is consumed chunk by chunk.
    :param is_json: when true, a system message asking for JSON is prepended
        and ``response_format`` is set to ``json_object``.
    :param listener: optional object exposing ``emit(event, payload)``.
        Events emitted here: ``content_update`` (streamed text pieces),
        ``tool_call``, ``on_refusal`` and ``content_finished`` (full text).
    :raises Exception: when the completion request itself fails; the original
        error is chained as the cause.
    :return: None; all results are delivered through ``listener``.
    """
    req = {
        "model": self.config.model,
        "messages": messages,
        "tools": tools,
        "temperature": self.config.temperature,
        "top_p": self.config.top_p,
        "presence_penalty": self.config.presence_penalty,
        "frequency_penalty": self.config.frequency_penalty,
        "max_tokens": self.config.max_tokens,
        "seed": self.config.seed,
        "stream": stream,
    }
    if is_json:
        req["messages"] = [{"role": "system", "content": "You need to response in json format."}, *messages]
        req["response_format"] = { "type": "json_object" }

    # logger.info(f"before request {req}")
    try:
        response = await self.client.chat.completions.create(**req)
    except Exception as e:
        # Chain the cause so the original traceback is not lost.
        raise Exception(f"CreateChatCompletionStream failed, err: {e}") from e

    full_content = ""

    if stream:
        async for chat_completion in response:
            # A chunk may carry no choices at all; indexing [0] would raise.
            if not chat_completion.choices:
                continue

            delta = chat_completion.choices[0].delta
            # Without a delta there is neither text nor tool calls to report.
            if delta is None:
                continue

            content = delta.content or ""

            # Emit content update event (fire-and-forget)
            if listener and content:
                listener.emit('content_update', content)

            full_content += content

            # Check for tool calls
            if delta.tool_calls:
                for tool_call in delta.tool_calls:
                    logger.info(f"tool_call: {tool_call}")

                    # Emit tool call event (fire-and-forget)
                    if listener:
                        listener.emit('tool_call', tool_call)
    else:
        choice = response.choices[0]
        logger.info(f"on response {response}")
        if choice.message.refusal and listener:
            listener.emit('on_refusal', choice.message.refusal)
            # NOTE(review): no 'content_finished' is emitted on refusal; a caller
            # waiting on that event must also react to 'on_refusal' - confirm.
            return

        # The message content can be None (e.g. a reply made only of tool
        # calls); normalise to "" so both paths report a string.
        full_content = choice.message.content or ""

        if choice.message.tool_calls:
            for tool_call in choice.message.tool_calls:
                logger.info(f"tool_call: {tool_call}")

                # Emit tool call event (fire-and-forget)
                if listener:
                    listener.emit('tool_call', tool_call)

    # Emit content finished event after the whole response has been consumed
    if listener:
        listener.emit('content_finished', full_content)
+# +# +import("//build/feature/ten_package.gni") + +ten_package("tsdb_firestore") { + package_kind = "extension" + + resources = [ + "__init__.py", + "addon.py", + "extension.py", + "log.py", + "manifest.json", + "property.json", + ] +} diff --git a/agents/ten_packages/extension/tsdb_firestore/README.md b/agents/ten_packages/extension/tsdb_firestore/README.md new file mode 100644 index 00000000..9fd6d384 --- /dev/null +++ b/agents/ten_packages/extension/tsdb_firestore/README.md @@ -0,0 +1,29 @@ +# tsdb_firestore + + + +## Features + + + +- xxx feature + +## API + +Refer to `api` definition in [manifest.json] and default values in [property.json](property.json). + + + +## Development + +### Build + + + +### Unit test + + + +## Misc + + diff --git a/agents/ten_packages/extension/tsdb_firestore/__init__.py b/agents/ten_packages/extension/tsdb_firestore/__init__.py new file mode 100644 index 00000000..0f296203 --- /dev/null +++ b/agents/ten_packages/extension/tsdb_firestore/__init__.py @@ -0,0 +1,11 @@ +# +# +# Agora Real Time Engagement +# Created by Wei Hu in 2024-08. +# Copyright (c) 2024 Agora IO. All rights reserved. +# +# +from . import addon +from .log import logger + +logger.info("tsdb_firestore extension loaded") diff --git a/agents/ten_packages/extension/tsdb_firestore/addon.py b/agents/ten_packages/extension/tsdb_firestore/addon.py new file mode 100644 index 00000000..b264634f --- /dev/null +++ b/agents/ten_packages/extension/tsdb_firestore/addon.py @@ -0,0 +1,22 @@ +# +# +# Agora Real Time Engagement +# Created by Wei Hu in 2024-08. +# Copyright (c) 2024 Agora IO. All rights reserved. 
@register_addon_as_extension("tsdb_firestore")
class TSDBFirestoreExtensionAddon(Addon):
    """Addon registered under the name ``tsdb_firestore``; builds the extension instance."""

    def on_create_instance(self, ten_env: TenEnv, name: str, context) -> None:
        # Log first, then hand a fresh extension object back through ten_env.
        logger.info("TSDBFirestoreExtensionAddon on_create_instance")
        extension = TSDBFirestoreExtension(name)
        ten_env.on_create_instance_done(extension, context)
def order_by_ts(contents: List[str]) -> List[str]:
    """Sort JSON-encoded records by timestamp and strip the timestamp.

    Each element of ``contents`` is a JSON object string holding the id,
    input and ts fields. The result keeps only id and input, re-encoded as
    JSON strings, in ascending ts order.
    """
    records = [json.loads(raw) for raw in contents]
    records.sort(key=lambda rec: rec[CONTENT_TS_PATH])
    return [
        json.dumps({CONTENT_ID_PATH: rec[CONTENT_ID_PATH], CONTENT_INPUT_PATH: rec[CONTENT_INPUT_PATH]})
        for rec in records
    ]

@firestore.transactional
def update_in_transaction(transaction, doc_ref, content):
    """Apply ``content`` as an update to ``doc_ref`` inside ``transaction``."""
    transaction.update(doc_ref, content)

@firestore.transactional
def read_in_transaction(transaction, doc_ref):
    """Read ``doc_ref`` inside ``transaction``; return its dict, or None if it does not exist."""
    doc = doc_ref.get(transaction=transaction)
    if doc.exists:
        return doc.to_dict()
    return None

class TSDBFirestoreExtension(Extension):
    """Stores final text inputs in one Firestore document and returns them on ``retrieve``.

    Incoming data is queued by ``on_data`` and written by a background thread
    (``async_handle``); the ``retrieve`` cmd reads the stored contents back,
    ordered by timestamp.
    """

    def __init__(self, name: str):
        super().__init__(name)
        self.stopped = False
        self.thread = None
        self.queue = queue.Queue()
        self.credentials_path = ""
        self.channel_name = ""
        self.collection_name = ""
        # None means "no ttl configured"; previously the attribute did not
        # exist at all when the property lookup failed.
        self.ttl = None
        self.client = None
        self.document_ref = None

    def on_init(self, ten_env: TenEnv) -> None:
        logger.info("TSDBFirestoreExtension on_init")
        ten_env.on_init_done()

    def on_start(self, ten_env: TenEnv) -> None:
        """Read the properties, open the Firestore document and start the writer thread."""
        logger.info("TSDBFirestoreExtension on_start")

        try:
            self.credentials_path = ten_env.get_property_string(PROPERTY_CREDENTIALS_PATH)
        except Exception as err:
            logger.info(f"GetProperty required {PROPERTY_CREDENTIALS_PATH} failed, err: {err}")

        try:
            self.channel_name = ten_env.get_property_string(PROPERTY_CHANNEL_NAME)
        except Exception as err:
            logger.info(f"GetProperty required {PROPERTY_CHANNEL_NAME} failed, err: {err}")

        try:
            self.collection_name = ten_env.get_property_string(PROPERTY_COLLECTION_NAME)
        except Exception as err:
            logger.info(f"GetProperty required {PROPERTY_COLLECTION_NAME} failed, err: {err}")

        try:
            self.ttl = ten_env.get_property_int(PROPERTY_TTL)
        except Exception as err:
            logger.info(f"GetProperty required {PROPERTY_TTL} failed, err: {err}")

        # start firestore db
        cred = credentials.Certificate(self.credentials_path)
        firebase_admin.initialize_app(cred)
        self.client = firestore.client()

        self.document_ref = self.client.collection(self.collection_name).document(self.channel_name)

        # update ttl (only when one was configured)
        doc_fields = {}
        if self.ttl is not None:
            # datetime.timezone.utc instead of datetime.UTC: the latter only
            # exists on Python 3.11+.
            doc_fields[DOC_EXPIRE_PATH] = (
                datetime.datetime.now(datetime.timezone.utc) + datetime.timedelta(seconds=self.ttl)
            )

        if self.document_ref.get().exists:
            if doc_fields:
                self.document_ref.update(doc_fields)
        else:
            # not exists yet, set to create one (insert() uses update, which
            # is issued against this document)
            self.document_ref.set(doc_fields)

        self.thread = threading.Thread(target=self.async_handle, args=[ten_env])
        self.thread.start()

        ten_env.on_start_done()

    def async_handle(self, ten_env: TenEnv) -> None:
        """Writer loop: take queued (ts, text, stream_id) tuples and append them to Firestore."""
        while not self.stopped:
            try:
                value = self.queue.get()
                if value is None:
                    logger.info("exit handle loop")
                    break
                ts, input_text, stream_id = value
                # Same three fields that order_by_ts() reads back.
                msg = {
                    CONTENT_ID_PATH: stream_id,
                    CONTENT_INPUT_PATH: input_text,
                    CONTENT_TS_PATH: ts,
                }
                self.insert(ten_env, json.dumps(msg))
            except Exception:
                logger.exception("Failed to store chat contents")

    def on_stop(self, ten_env: TenEnv) -> None:
        """Drop pending items, wake the writer thread with a sentinel and join it."""
        logger.info("TSDBFirestoreExtension on_stop")
        self.stopped = True
        # Non-blocking drain: a blocking get() after an empty() check could
        # hang if the writer thread takes the last item in between.
        try:
            while True:
                self.queue.get_nowait()
        except queue.Empty:
            pass
        self.queue.put(None)
        if self.thread is not None:
            self.thread.join()
            self.thread = None

        ten_env.on_stop_done()

    def on_deinit(self, ten_env: TenEnv) -> None:
        logger.info("TSDBFirestoreExtension on_deinit")
        ten_env.on_deinit_done()

    def on_cmd(self, ten_env: TenEnv, cmd: Cmd) -> None:
        """Dispatch the ``retrieve`` cmd; every other cmd gets an ERROR result."""
        try:
            cmd_name = cmd.get_name()
            logger.info("on_cmd name {}".format(cmd_name))
            if cmd_name == RETRIEVE_CMD:
                self.retrieve(ten_env, cmd)
            else:
                logger.info("unknown cmd name {}".format(cmd_name))
                cmd_result = CmdResult.create(StatusCode.ERROR)
                ten_env.return_result(cmd_result, cmd)
        except Exception:
            # Keep returning ERROR to the caller, but leave a trace of why.
            logger.exception("Failed to handle cmd")
            ten_env.return_result(CmdResult.create(StatusCode.ERROR), cmd)

    def retrieve(self, ten_env: TenEnv, cmd: Cmd):
        """Return the stored contents, ordered by timestamp, as the ``response`` property."""
        doc_dict = read_in_transaction(self.client.transaction(), self.document_ref)
        if doc_dict is None:
            logger.info(f"no corresponding document found for the channel {self.channel_name}")
            ten_env.return_result(CmdResult.create(StatusCode.ERROR), cmd)
            return

        if DOC_CONTENTS_PATH not in doc_dict:
            logger.info(f"no contents for the channel {self.channel_name}")
            ten_env.return_result(CmdResult.create(StatusCode.ERROR), cmd)
            return

        contents = doc_dict[DOC_CONTENTS_PATH]
        ret = CmdResult.create(StatusCode.OK)
        # set_property_from_json takes JSON text, so the list is serialised first.
        ret.set_property_from_json(CMD_OUT_PROPERTY_RESPONSE, json.dumps(order_by_ts(contents)))
        ten_env.return_result(ret, cmd)

    def insert(self, ten_env: TenEnv, content: str):
        """Append ``content`` to the document's contents array inside a transaction."""
        update_in_transaction(
            self.client.transaction(),
            self.document_ref,
            {
                DOC_CONTENTS_PATH: firestore.ArrayUnion([content])
            }
        )
        logger.info(f"append {content} to firestore document {self.channel_name}")

    def on_data(self, ten_env: TenEnv, data: Data) -> None:
        """Queue final, non-empty text together with its stream id and a timestamp."""
        logger.info(f"TSDBFirestoreExtension on_data")

        # Only final results are stored
        try:
            is_final = data.get_property_bool(DATA_IN_TEXT_DATA_PROPERTY_IS_FINAL)
            if not is_final:
                logger.info("ignore non-final input")
                return
        except Exception as err:
            logger.info(
                f"OnData GetProperty {DATA_IN_TEXT_DATA_PROPERTY_IS_FINAL} failed, err: {err}"
            )
            return

        # Get input text
        try:
            input_text = data.get_property_string(DATA_IN_TEXT_DATA_PROPERTY_TEXT)
            if not input_text:
                logger.info("ignore empty text")
                return
            logger.info(f"OnData input text: [{input_text}]")
        except Exception as err:
            logger.info(
                f"OnData GetProperty {DATA_IN_TEXT_DATA_PROPERTY_TEXT} failed, err: {err}"
            )
            return

        # Get stream id
        try:
            stream_id = data.get_property_int(DATA_IN_TEXT_DATA_PROPERTY_STREAM_ID)
            if not stream_id:
                logger.warning("ignore empty stream_id")
                return
        except Exception as err:
            logger.info(
                f"OnData GetProperty {DATA_IN_TEXT_DATA_PROPERTY_STREAM_ID} failed, err: {err}"
            )
            return

        # Call the helper: queuing the function object itself would later
        # fail in json.dumps.
        ts = get_current_time()
        self.queue.put((ts, input_text, stream_id))

    def on_audio_frame(self, ten_env: TenEnv, audio_frame: AudioFrame) -> None:
        pass

    def on_video_frame(self, ten_env: TenEnv, video_frame: VideoFrame) -> None:
        pass
b/agents/ten_packages/extension/tsdb_firestore/property.json @@ -0,0 +1 @@ +{} \ No newline at end of file diff --git a/agents/ten_packages/extension/tsdb_firestore/requirements.txt b/agents/ten_packages/extension/tsdb_firestore/requirements.txt new file mode 100644 index 00000000..4720fc6f --- /dev/null +++ b/agents/ten_packages/extension/tsdb_firestore/requirements.txt @@ -0,0 +1 @@ +firebase-admin \ No newline at end of file diff --git a/agents/ten_packages/extension/vision_tool_python/BUILD.gn b/agents/ten_packages/extension/vision_tool_python/BUILD.gn new file mode 100644 index 00000000..460593d0 --- /dev/null +++ b/agents/ten_packages/extension/vision_tool_python/BUILD.gn @@ -0,0 +1,21 @@ +# +# +# Agora Real Time Engagement +# Created by Wei Hu in 2022-11. +# Copyright (c) 2024 Agora IO. All rights reserved. +# +# +import("//build/feature/ten_package.gni") + +ten_package("vision_tool_python") { + package_kind = "extension" + + resources = [ + "__init__.py", + "addon.py", + "extension.py", + "log.py", + "manifest.json", + "property.json", + ] +} diff --git a/agents/ten_packages/extension/vision_tool_python/README.md b/agents/ten_packages/extension/vision_tool_python/README.md new file mode 100644 index 00000000..74278fb9 --- /dev/null +++ b/agents/ten_packages/extension/vision_tool_python/README.md @@ -0,0 +1,36 @@ +# vision_tool_python + +This is a tool that provides vision ability. Currently there are two patterns: +- use a traditional model +- use a multimodal LLM + +The pattern is switched with the `use_llm` property, which selects a different cmd protocol. + +The tool description is as follows: + +*Query to the latest frame from camera. The camera is always on, always use latest frame to answer user's question. Call this whenever you need to understand the input camera image like you have vision capability, for example when user asks 'What can you see?', 'Can you see me?', 'take a look.'* + +It is built using TEN Tool Call Protocol (Beta). 
+ +## Features + +The tool can accept video frame from rtc extension. + +The tool will only register itself to llm extension as soon as the video frame is received. + +The tool will cache video frame every `frequency_ms` ms. + +## API + +Refer to `api` definition in [manifest.json] and default values in [property.json](property.json). + +- out: `tool_register` +- in: `tool_call` +- out(`use_llm=false`): `analyze_image` +- out(`use_llm=true`): `chat_completion` + +## Misc + +- Multi-frame support +- Movement detection +- Prompt Engineering \ No newline at end of file diff --git a/agents/ten_packages/extension/vision_tool_python/__init__.py b/agents/ten_packages/extension/vision_tool_python/__init__.py new file mode 100644 index 00000000..f8700b05 --- /dev/null +++ b/agents/ten_packages/extension/vision_tool_python/__init__.py @@ -0,0 +1,11 @@ +# +# +# Agora Real Time Engagement +# Created by Wei Hu in 2024-08. +# Copyright (c) 2024 Agora IO. All rights reserved. +# +# +from . import addon +from .log import logger + +logger.info("vision_tool_python extension loaded") diff --git a/agents/ten_packages/extension/vision_tool_python/addon.py b/agents/ten_packages/extension/vision_tool_python/addon.py new file mode 100644 index 00000000..afd8d21b --- /dev/null +++ b/agents/ten_packages/extension/vision_tool_python/addon.py @@ -0,0 +1,22 @@ +# +# +# Agora Real Time Engagement +# Created by Wei Hu in 2024-08. +# Copyright (c) 2024 Agora IO. All rights reserved. 
@register_addon_as_extension("vision_tool_python")
class VisionToolExtensionAddon(Addon):
    """Addon registered under the name ``vision_tool_python``; builds the extension instance."""

    def on_create_instance(self, ten_env: TenEnv, name: str, context) -> None:
        # Log first, then hand a fresh extension object back through ten_env.
        logger.info("VisionToolExtensionAddon on_create_instance")
        extension = VisionToolExtension(name)
        ten_env.on_create_instance_done(extension, context)
def resize_image_keep_aspect(image, max_size=512):
    """
    Resize an image while maintaining its aspect ratio, ensuring the larger dimension is max_size.
    If both dimensions are already within max_size, the image is returned untouched.

    The shorter side is clamped to at least 1 pixel: for very elongated images
    the scaled value truncates to 0, which is not a usable target size.

    :param image: An object exposing ``size`` as (width, height) and ``resize((w, h))``
                  (a PIL Image in this file)
    :param max_size: The maximum size for the larger dimension (width or height)
    :return: The original object when no resize is needed, otherwise the result of ``image.resize``
    """
    width, height = image.size

    # Nothing to do when the image already fits
    if width <= max_size and height <= max_size:
        return image

    aspect_ratio = width / height

    # Pin the longer side to max_size and scale the other one
    if width > height:
        new_width = max_size
        new_height = max(1, int(max_size / aspect_ratio))
    else:
        new_height = max_size
        new_width = max(1, int(max_size * aspect_ratio))

    return image.resize((new_width, new_height))
def rgb2base64jpeg(rgb_data: bytes, width: int, height: int, raw: bool = False) -> Union[bytes, str]:
    """Encode a raw frame as JPEG.

    The buffer is interpreted as "RGBA" pixels of ``width`` x ``height``,
    converted to RGB, scaled so its longer side is at most 320 and saved as
    JPEG.

    :param raw: when true, return the JPEG bytes; otherwise return a
        ``data:image/jpeg;base64,...`` URL string.
    """
    # Convert the RGBA buffer to a PIL Image and drop the alpha channel
    pil_image = Image.frombytes("RGBA", (width, height), bytes(rgb_data))
    pil_image = pil_image.convert("RGB")

    # Resize the image while maintaining its aspect ratio
    pil_image = resize_image_keep_aspect(pil_image, 320)

    # Save the image to a BytesIO object in JPEG format
    buffered = BytesIO()
    pil_image.save(buffered, format="JPEG")
    # pil_image.save("test.jpg", format="JPEG")

    jpeg_image_data = buffered.getvalue()
    if raw:
        return jpeg_image_data

    # Wrap the Base64 encoded JPEG in a data URL
    base64_encoded_image = b64encode(jpeg_image_data).decode("utf-8")
    mime_type = "image/jpeg"
    return f"data:{mime_type};base64,{base64_encoded_image}"

class VisionToolExtension(Extension):
    """Caches recent video frames and exposes them to an LLM as a callable tool.

    The tool is registered (``tool_register`` cmd) when the first video frame
    arrives. Incoming ``tool_call`` cmds are queued and served by a worker
    thread, which forwards the latest frame(s) either as a ``chat_completion``
    cmd (``use_llm`` true) or as an ``image_analyze`` cmd.
    """

    # Defaults, overridden from properties in on_start
    max_history: int = 1
    frequency_ms: int = 60
    use_llm: bool = True

    def __init__(self, name: str):
        super().__init__(name)
        # Mutable state lives on the instance; as class attributes the list,
        # queue and dicts were shared by every instance of the extension.
        self.history: list = []
        self.queue: Queue = Queue()
        self.last_capture = None
        self.llm_tools = {}
        self.tools = {}

        self.thread = None
        self.stopped = False

        self.ten_env = None

    def on_init(self, ten_env: TenEnv) -> None:
        logger.info("VisionToolExtension on_init")

        # Tool table used when use_llm is false (no parameters)
        self.tools = {
            SINGLE_FRAME_TOOL_NAME: {
                TOOL_REGISTER_PROPERTY_NAME: SINGLE_FRAME_TOOL_NAME,
                TOOL_REGISTER_PROPERTY_DESCRIPTON: SINGLE_FRAME_TOOL_DESCRIPTION,
                TOOL_REGISTER_PROPERTY_PARAMETERS: SINGLE_FRAME_TOOL_NON_LLM_PARAMETERS,
                TOOL_CALLBACK: functools.partial(self._ask_to_latest_frames, 1)
            }
        }

        # Tool table used when use_llm is true (takes a "query" parameter)
        self.llm_tools = {
            SINGLE_FRAME_TOOL_NAME: {
                TOOL_REGISTER_PROPERTY_NAME: SINGLE_FRAME_TOOL_NAME,
                TOOL_REGISTER_PROPERTY_DESCRIPTON: SINGLE_FRAME_TOOL_DESCRIPTION,
                TOOL_REGISTER_PROPERTY_PARAMETERS: SINGLE_FRAME_TOOL_PARAMETERS,
                TOOL_CALLBACK: functools.partial(self._ask_to_latest_frames, 1)
            }
        }

        ten_env.on_init_done()

    def on_start(self, ten_env: TenEnv) -> None:
        """Read the optional properties, then start the worker thread."""
        logger.info("VisionToolExtension on_start")

        self.ten_env = ten_env

        try:
            # The value is compared with len(self.history) and used as a slice
            # bound, so it must be read as an int, not a string.
            self.max_history = ten_env.get_property_int(PROPERTY_HISTORY)
        except Exception as err:
            logger.info(f"GetProperty optional {PROPERTY_HISTORY} error: {err}")

        try:
            self.frequency_ms = ten_env.get_property_int(PROPERTY_FREQUENCY_MS)
        except Exception as err:
            logger.info(f"GetProperty optional {PROPERTY_FREQUENCY_MS} error: {err}")

        try:
            self.use_llm = ten_env.get_property_bool(PROPERTY_USE_LLM)
        except Exception as err:
            logger.info(f"GetProperty optional {PROPERTY_USE_LLM} error: {err}")

        # Start the worker only once the configuration is in place
        self.thread = Thread(target=self.loop)
        self.thread.start()

        ten_env.on_start_done()

    def on_stop(self, ten_env: TenEnv) -> None:
        logger.info("VisionToolExtension on_stop")

        self.stopped = True
        # None is the sentinel that makes loop() exit
        self.queue.put(None)
        if self.thread is not None:
            self.thread.join()
            self.thread = None

        ten_env.on_stop_done()

    def on_deinit(self, ten_env: TenEnv) -> None:
        logger.info("VisionToolExtension on_deinit")
        ten_env.on_deinit_done()

    def loop(self) -> None:
        """Worker loop: run queued tool callbacks and return their result for the cmd."""
        while not self.stopped:
            t = self.queue.get()
            if t is None:
                break

            # Bound up front so the error path can test it even when the
            # unpacking below is what failed.
            cmd = None
            try:
                # unpack
                callback, args, cmd = t
                logger.info(f"before callback {args}")
                resp = callback(args)
                logger.info(f"after callback {resp}")
                cmd_result = CmdResult.create(StatusCode.OK)
                cmd_result.set_property_string("response", json.dumps(resp))
                self.ten_env.return_result(cmd_result, cmd)
            except Exception:
                logger.exception(f"Failed to fetch from queue")
                if cmd:
                    cmd_result = CmdResult.create(StatusCode.ERROR)
                    self.ten_env.return_result(cmd_result, cmd)

    def on_cmd(self, ten_env: TenEnv, cmd: Cmd) -> None:
        """Queue a tool call for the worker thread; the result is returned later from loop()."""
        cmd_name = cmd.get_name()
        logger.info("on_cmd name {}".format(cmd_name))

        # FIXME need to handle async
        try:
            name = cmd.get_property_string(CMD_PROPERTY_NAME)
            if name in self.tools:
                try:
                    tool = self.tools[name]
                    args = cmd.get_property_string(CMD_PROPERTY_ARGS)
                    arg_dict = json.loads(args)
                    self.queue.put((tool[TOOL_CALLBACK], arg_dict, cmd))
                    # will return result later
                    return
                except Exception:
                    logger.exception("Failed to callback")
                    cmd_result = CmdResult.create(StatusCode.ERROR)
                    ten_env.return_result(cmd_result, cmd)
                    return
            else:
                logger.error(f"unknown tool name {name}")
        except Exception:
            logger.exception("Failed to get tool name")
            cmd_result = CmdResult.create(StatusCode.ERROR)
            ten_env.return_result(cmd_result, cmd)
            return

        # NOTE(review): an unknown tool name is logged as an error above but
        # still answered with OK here - confirm this is intended.
        cmd_result = CmdResult.create(StatusCode.OK)
        ten_env.return_result(cmd_result, cmd)

    def on_data(self, ten_env: TenEnv, data: Data) -> None:
        pass

    def on_audio_frame(self, ten_env: TenEnv, audio_frame: AudioFrame) -> None:
        pass

    def on_video_frame(self, ten_env: TenEnv, video_frame: VideoFrame) -> None:
        """Register the tool on the first captured frame and cache frames at most every frequency_ms."""
        if self.last_capture is None:
            # Register func after video is captured
            tool_targets = self.llm_tools if self.use_llm else self.tools

            for name, tool in tool_targets.items():
                c = Cmd.create(CMD_TOOL_REGISTER)
                c.set_property_string(TOOL_REGISTER_PROPERTY_NAME, name)
                c.set_property_string(TOOL_REGISTER_PROPERTY_DESCRIPTON, tool[TOOL_REGISTER_PROPERTY_DESCRIPTON])
                c.set_property_string(TOOL_REGISTER_PROPERTY_PARAMETERS, json.dumps(tool[TOOL_REGISTER_PROPERTY_PARAMETERS]))
                ten_env.send_cmd(c, lambda ten, result: logger.info(f"register done, {result}"))

        now = datetime.now()
        if self.frequency_ms and (not self.last_capture or (now - self.last_capture).total_seconds() * 1000 > self.frequency_ms):
            self.history.append((now, video_frame.get_buf(), video_frame.get_width(), video_frame.get_height()))
            self.last_capture = now

            # Number of surplus entries; the previous ">" comparison produced
            # a bool, so at most one entry was ever dropped.
            diff = len(self.history) - self.max_history
            if diff > 0:
                self.history = self.history[diff:]

    def _get_latest_frames(self, count:int = 3, raw: bool = False) -> Tuple[datetime, List[Union[bytes, str]]]:
        """Encode the last ``count`` cached frames; return (oldest timestamp among them, encoded frames)."""
        start = max(len(self.history) - count, 0)

        result = []
        min_ts = None
        for ts, buff, width, height in self.history[start:]:
            if not min_ts or ts < min_ts:
                min_ts = ts
            result.append(rgb2base64jpeg(buff, width, height, raw = raw))

        return min_ts, result

    def _ask_to_latest_frames(self, count:int, args:dict = None) -> Any:
        """Send the latest frame(s) downstream and block until the result arrives.

        :raises Exception: when no frame is cached yet or the downstream cmd
            does not return an OK result.
        :return: the ``response`` string of the downstream result.
        """
        if args is None:
            args = {}

        if not self.history:
            raise Exception("Failed to get frames")

        if self.use_llm:
            min_ts, frames = self._get_latest_frames(count=count)
            ts = min_ts.strftime("%Y-%m-%d %H:%M:%S")
            cmd = self._chat_completion(args, frames, ts)
        else:
            _, frames = self._get_latest_frames(count=1, raw=True)
            cmd = self._analyze_frame(frames[0])

        e = Event()
        rst = None
        failed = True
        def on_result(evt:Event, ten_env: TenEnv, result: CmdResult) -> None:
            nonlocal rst
            nonlocal failed
            try:
                if result.get_status_code() == StatusCode.OK:
                    rst = result.get_property_string("response")
                    # rst = json.loads(resp_str)
                    failed = False
                else:
                    logger.error(f"Failed to get ok result")
                    rst = result.get_property_string("reason")
            except Exception:
                logger.exception(f"Failed to get response")
            finally:
                # Always release the waiting worker, whatever happened above
                evt.set()

        self.ten_env.send_cmd(cmd, functools.partial(on_result, e))
        e.wait()
        if failed:
            raise Exception("Failed to get resp")
        return rst

    def _analyze_frame(self, frame: bytes) -> Cmd:
        """Build the ``image_analyze`` cmd carrying one JPEG frame."""
        cmd = Cmd.create("image_analyze")
        cmd.set_property_buf("image_data", frame)
        # TODO What to analyze
        return cmd

    def _chat_completion(self, args:dict, frames: List[str], ts: str) -> Cmd:
        """Build the ``chat_completion`` cmd: prompt text, the user's query and the frames as image URLs.

        :param ts: capture time already formatted as text.
        :raises Exception: when ``args`` has no "query" entry.
        """
        if "query" not in args:
            raise Exception("Failed to get property")

        query = args["query"]
        contents = [
            {"type": "text", "text": "You need to describe all the object in this image first, and then focus on the user's query. Keep your response short and simple unless the query ask you to."},
            {"type": "text", "text": f"This is the image captured within {self.frequency_ms} ms at {ts}. {query}"}
        ]
        contents.extend({"type": "image_url", "image_url": {"url": f}} for f in frames)

        messages = [{
            "role": "user",
            "content": contents,
        }]

        # logger.debug(f"after prepare message: {messages}")
        # Send message
        cmd = Cmd.create("chat_completion")
        cmd.set_property_string("messages", json.dumps(messages))
        cmd.set_property_bool("stream", False) # this is function call, we need to have complete result
        # cmd.set_property_bool("json", True)
        return cmd
"property": { + "history": { + "type": "int64" + }, + "frequency_ms": { + "type": "int64" + }, + "use_llm": { + "type": "bool" + } + }, + "cmd_out": [ + { + "name": "tool_register", + "property": { + "name": { + "type": "string" + }, + "description": { + "type": "string" + }, + "parameters": { + "type": "string" + } + }, + "required": [ + "name", + "description", + "parameters" + ], + "result": { + "property": { + "response": { + "type": "string" + } + } + } + } + ], + "cmd_in": [ + { + "name": "tool_call", + "property": { + "name": { + "type": "string" + }, + "args": { + "type": "string" + } + }, + "required": [ + "name" + ] + } + ] + } +} \ No newline at end of file diff --git a/agents/ten_packages/extension/vision_tool_python/property.json b/agents/ten_packages/extension/vision_tool_python/property.json new file mode 100644 index 00000000..9e26dfee --- /dev/null +++ b/agents/ten_packages/extension/vision_tool_python/property.json @@ -0,0 +1 @@ +{} \ No newline at end of file diff --git a/agents/ten_packages/extension/weatherapi_tool_python/README.md b/agents/ten_packages/extension/weatherapi_tool_python/README.md index de7c18aa..81808f90 100644 --- a/agents/ten_packages/extension/weatherapi_tool_python/README.md +++ b/agents/ten_packages/extension/weatherapi_tool_python/README.md @@ -1,21 +1,23 @@ # weatherapi_tool_python -This is the tool demo for weather query. +This is the tool for weather query, including current weather, broadcast and history weather check, the document link is as follow: https://www.weatherapi.com/docs/ + +It is built using TEN Tool Call Protocol (Beta). ## Features +For free plan: - Fetch today's weather. -- Search for history weather. +- Search for history weather within 7 days. - Forcast weather in 3 days. -## API - -Refer to `api` definition in [manifest.json] and default values in [property.json](property.json). +You can extend by using other plan in your project. 
-### Out: +https://www.weatherapi.com/pricing.aspx -- `tool_register`: auto register tool to llm +## API -### In: +Refer to `api` definition in [manifest.json] and default values in [property.json](property.json). -- `tool_call`: sync cmd to fetch weather +- out: tool_register +- in: tool_call diff --git a/demo/src/app/api/agents/start/graph.tsx b/demo/src/app/api/agents/start/graph.tsx index f01d6d95..beb5694a 100644 --- a/demo/src/app/api/agents/start/graph.tsx +++ b/demo/src/app/api/agents/start/graph.tsx @@ -105,7 +105,7 @@ export const getGraphProperties = ( "azure_synthesis_voice_name": voiceNameMap[language]["azure"][voiceType] } } - } else if (graphName == "va.openai.v2v") { + } else if (graphName == "va.openai.v2v" || graphName == "va.openai.v2v.vision") { return { "openai_v2v_python": { "model": "gpt-4o-realtime-preview", diff --git a/demo/src/common/constant.ts b/demo/src/common/constant.ts index 00fe177d..bbee7e82 100644 --- a/demo/src/common/constant.ts +++ b/demo/src/common/constant.ts @@ -48,6 +48,10 @@ export const GRAPH_OPTIONS: GraphOptionItem[] = [ { label: "Voice Agent with OpenAI Realtime API (Beta)", value: "va.openai.v2v" + }, + { + label: "Voice Agent with OpenAI Realtime API (Beta) with Vision", + value: "va.openai.v2v.vision" } ]