diff --git a/agents/ten_packages/extension/glue_python_async/BUILD.gn b/agents/ten_packages/extension/glue_python_async/BUILD.gn new file mode 100644 index 00000000..c0ccf4d5 --- /dev/null +++ b/agents/ten_packages/extension/glue_python_async/BUILD.gn @@ -0,0 +1,20 @@ +# +# This file is part of TEN Framework, an open source project. +# Licensed under the Apache License, Version 2.0. +# See the LICENSE file for more information. +# +import("//build/feature/ten_package.gni") + +ten_package("glue_python_async") { + package_kind = "extension" + + resources = [ + "__init__.py", + "addon.py", + "extension.py", + "log.py", + "manifest.json", + "property.json", + "tests", + ] +} diff --git a/agents/ten_packages/extension/glue_python_async/README.md b/agents/ten_packages/extension/glue_python_async/README.md new file mode 100644 index 00000000..e7be443b --- /dev/null +++ b/agents/ten_packages/extension/glue_python_async/README.md @@ -0,0 +1,37 @@ +# glue_python_async + +This is a Python extension for the glue service. The schema of the glue service is attached in `schema.yml`. + +An example of an OpenAI wrapper is also attached in `examples/openai_wrapper.py`. + +## Features + +The extension will record history with a count of `max_history`. + +- `api_url` (must have): the URL of the glue service. +- `token` (must have): the Bearer token used for default auth. + +The extension supports flush, which will close the existing HTTP session. + +## API + +Refer to `api` definition in [manifest.json](manifest.json) and default values in [property.json](property.json). + +- In: + - `text_data` [data]: the ASR result + - `flush` [cmd]: the flush signal +- Out: + - `flush` [cmd]: the flush signal + +## Examples + +You can run the example using the following command, and the wrapper service will listen on port 8000 by default. + +``` +> export API_TOKEN="xxx" && export OPENAI_API_KEY="xxx" && python3 openai_wrapper.py + +INFO: Started server process [162886] +INFO: Waiting for application startup. 
+INFO: Application startup complete. +INFO: Uvicorn running on http://0.0.0.0:8000 (Press CTRL+C to quit) +``` diff --git a/agents/ten_packages/extension/glue_python_async/__init__.py b/agents/ten_packages/extension/glue_python_async/__init__.py new file mode 100644 index 00000000..b5acaf2d --- /dev/null +++ b/agents/ten_packages/extension/glue_python_async/__init__.py @@ -0,0 +1,9 @@ +# +# This file is part of TEN Framework, an open source project. +# Licensed under the Apache License, Version 2.0. +# See the LICENSE file for more information. +# +from . import addon +from .log import logger + +logger.info("glue_python_async extension loaded") diff --git a/agents/ten_packages/extension/glue_python_async/addon.py b/agents/ten_packages/extension/glue_python_async/addon.py new file mode 100644 index 00000000..61d685b6 --- /dev/null +++ b/agents/ten_packages/extension/glue_python_async/addon.py @@ -0,0 +1,20 @@ +# +# This file is part of TEN Framework, an open source project. +# Licensed under the Apache License, Version 2.0. +# See the LICENSE file for more information. 
+# +from ten import ( + Addon, + register_addon_as_extension, + TenEnv, +) +from .extension import AsyncGlueExtension +from .log import logger + + +@register_addon_as_extension("glue_python_async") +class AsyncGlueExtensionAddon(Addon): + + def on_create_instance(self, ten_env: TenEnv, name: str, context) -> None: + logger.info("AsyncGlueExtensionAddon on_create_instance") + ten_env.on_create_instance_done(AsyncGlueExtension(name), context) diff --git a/agents/ten_packages/extension/glue_python_async/examples/openai_wrapper.py b/agents/ten_packages/extension/glue_python_async/examples/openai_wrapper.py new file mode 100644 index 00000000..b9299369 --- /dev/null +++ b/agents/ten_packages/extension/glue_python_async/examples/openai_wrapper.py @@ -0,0 +1,123 @@ +import os +import openai +from openai import AsyncOpenAI +import traceback # Add this import + +from typing import List, Union +from pydantic import BaseModel, HttpUrl + +from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials +from fastapi.responses import StreamingResponse +from fastapi import Depends, FastAPI, HTTPException, Request +import asyncio + +app = FastAPI(title="Chat Completion API", + description="API for streaming chat completions with support for text, image, and audio content", + version="1.0.0") + +# Set your OpenAI API key +openai.api_key = os.getenv("OPENAI_API_KEY") + +class TextContent(BaseModel): + type: str = "text" + text: str + +class ImageContent(BaseModel): + type: str = "image" + image_url: HttpUrl + +class AudioContent(BaseModel): + type: str = "audio" + audio_url: HttpUrl + +class Message(BaseModel): + role: str + content: Union[TextContent, ImageContent, AudioContent, List[Union[TextContent, ImageContent, AudioContent]]] + +class ChatCompletionRequest(BaseModel): + messages: List[Message] + model: str + temperature: float = 1.0 + stream: bool = True + +def format_openai_messages(messages): + formatted_messages = [] + for msg in messages: + if 
isinstance(msg.content, list): + content = [] + for item in msg.content: + if item.type == "text": + content.append({"type": "text", "text": item.text}) + elif item.type == "image": + content.append({"type": "image_url", "image_url": str(item.image_url)}) + elif item.type == "audio": + content.append({"type": "audio_url", "audio_url": str(item.audio_url)}) + else: + if msg.content.type == "text": + content = [{"type": "text", "text": msg.content.text}] + elif msg.content.type == "image": + content = [{"type": "image_url", "image_url": {"url": str(msg.content.image_url)}}] + elif msg.content.type == "audio": + content = [{"type": "audio_url", "audio_url": {"url": str(msg.content.audio_url)}}] + + formatted_messages.append({"role": msg.role, "content": content}) + return formatted_messages + +security = HTTPBearer() + +def verify_token(credentials: HTTPAuthorizationCredentials = Depends(security)): + token = credentials.credentials + if token != os.getenv("API_TOKEN"): + raise HTTPException(status_code=403, detail="Invalid or missing token") + +@app.post("/chat/completions", dependencies=[Depends(verify_token)]) +async def create_chat_completion(request: ChatCompletionRequest, req: Request): + try: + messages = format_openai_messages(request.messages) + client = AsyncOpenAI(api_key=os.getenv("OPENAI_API_KEY")) + response = await client.chat.completions.create( + model=request.model, + messages=messages, + temperature=request.temperature, + stream=request.stream + ) + + async def generate(): + try: + async for chunk in response: + if chunk.choices[0].delta.content is not None: + yield f"data: {chunk.choices[0].delta.content}\n\n" + yield "data: [DONE]\n\n" + except asyncio.CancelledError: + print("Request was cancelled") + raise + + return StreamingResponse(generate(), media_type="text/event-stream") + except asyncio.CancelledError: + print("Request was cancelled") + raise HTTPException(status_code=499, detail="Request was cancelled") + except Exception as e: + 
traceback_str = ''.join(traceback.format_tb(e.__traceback__)) + error_message = f"{str(e)}\n{traceback_str}" + print(error_message) + raise HTTPException(status_code=500, detail=error_message) + +if __name__ == "__main__": + import uvicorn + from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials + from fastapi import Depends + import traceback + + ''' + http_proxy = os.getenv("HTTP_PROXY") + https_proxy = os.getenv("HTTPS_PROXY") + + if http_proxy or https_proxy: + proxies = { + "http": http_proxy, + "https": https_proxy + } + openai.proxy = proxies + ''' + + uvicorn.run(app, host="0.0.0.0", port=8000) diff --git a/agents/ten_packages/extension/glue_python_async/examples/requirements.txt b/agents/ten_packages/extension/glue_python_async/examples/requirements.txt new file mode 100644 index 00000000..0cfcb8dd --- /dev/null +++ b/agents/ten_packages/extension/glue_python_async/examples/requirements.txt @@ -0,0 +1,3 @@ +uvicorn +fastapi +openai \ No newline at end of file diff --git a/agents/ten_packages/extension/glue_python_async/extension.py b/agents/ten_packages/extension/glue_python_async/extension.py new file mode 100644 index 00000000..4936c57f --- /dev/null +++ b/agents/ten_packages/extension/glue_python_async/extension.py @@ -0,0 +1,258 @@ +# +# This file is part of TEN Framework, an open source project. +# Licensed under the Apache License, Version 2.0. +# See the LICENSE file for more information. 
+# +import asyncio +import traceback +import aiohttp + +from datetime import datetime +from typing import List + +from ten import ( + AudioFrame, + VideoFrame, + AsyncExtension, + AsyncTenEnv, + Cmd, + StatusCode, + CmdResult, + Data, +) + +PROPERTY_API_URL = "api_url" +PROPERTY_USER_ID = "user_id" +PROPERTY_PROMPT = "prompt" +PROPERTY_TOKEN = "token" + +DATA_IN_TEXT_DATA_PROPERTY_IS_FINAL = "is_final" +DATA_IN_TEXT_DATA_PROPERTY_TEXT = "text" + +DATA_OUT_TEXT_DATA_PROPERTY_TEXT = "text" +DATA_OUT_TEXT_DATA_PROPERTY_END_OF_SEGMENT = "end_of_segment" + +def is_punctuation(char): + if char in [",", ",", ".", "。", "?", "?", "!", "!"]: + return True + return False + +def parse_sentences(sentence_fragment, content): + sentences = [] + current_sentence = sentence_fragment + for char in content: + current_sentence += char + if is_punctuation(char): + stripped_sentence = current_sentence + if any(c.isalnum() for c in stripped_sentence): + sentences.append(stripped_sentence) + current_sentence = "" + + remain = current_sentence + return sentences, remain + +class AsyncGlueExtension(AsyncExtension): + api_url: str = "http://localhost:8000/chat/completions" + user_id: str = "TenAgent" + prompt: str = "" + token: str = "" + outdate_ts = datetime.now() + sentence_fragment: str = "" + ten_env: AsyncTenEnv = None + loop: asyncio.AbstractEventLoop = None + stopped: bool = False + queue = asyncio.Queue() + history: List[dict] = [] + max_history: int = 10 + session: aiohttp.ClientSession = None + + async def on_init(self, ten_env: AsyncTenEnv) -> None: + ten_env.log_debug("on_init") + ten_env.on_init_done() + + async def on_start(self, ten_env: AsyncTenEnv) -> None: + ten_env.log_debug("on_start") + + self.loop = asyncio.get_event_loop() + + try: + self.api_url = ten_env.get_property_string(PROPERTY_API_URL) + except Exception as err: + ten_env.log_error(f"GetProperty optional {PROPERTY_API_URL} failed, err: {err}") + return + + try: + self.user_id = 
ten_env.get_property_string(PROPERTY_USER_ID) + except Exception as err: + ten_env.log_error(f"GetProperty optional {PROPERTY_USER_ID} failed, err: {err}") + + try: + self.prompt = ten_env.get_property_string(PROPERTY_PROMPT) + except Exception as err: + ten_env.log_error(f"GetProperty optional {PROPERTY_PROMPT} failed, err: {err}") + + try: + self.token = ten_env.get_property_string(PROPERTY_TOKEN) + except Exception as err: + ten_env.log_error(f"GetProperty optional {PROPERTY_TOKEN} failed, err: {err}") + + try: + self.max_history = ten_env.get_property_int("max_memory_length") + except Exception as err: + ten_env.log_error(f"GetProperty optional max_memory_length failed, err: {err}") + + self.ten_env = ten_env + self.loop.create_task(self._consume()) + + ten_env.on_start_done() + + async def on_stop(self, ten_env: AsyncTenEnv) -> None: + ten_env.log_debug("on_stop") + + self.stopped = True + await self.queue.put(None) + await self._flush() + + ten_env.on_stop_done() + + async def on_deinit(self, ten_env: AsyncTenEnv) -> None: + ten_env.log_debug("on_deinit") + ten_env.on_deinit_done() + + async def on_cmd(self, ten_env: AsyncTenEnv, cmd: Cmd) -> None: + cmd_name = cmd.get_name() + ten_env.log_debug("on_cmd name {}".format(cmd_name)) + + if cmd_name == "flush": + try: + await self._flush() + await ten_env.send_cmd(Cmd.create("flush")) + ten_env.log_info("on flush") + except Exception as e: + ten_env.log_error(f"{traceback.format_exc()} \n Failed to handle {e}") + + cmd_result = CmdResult.create(StatusCode.OK) + ten_env.return_result(cmd_result, cmd) + + async def on_data(self, ten_env: AsyncTenEnv, data: Data) -> None: + data_name = data.get_name() + ten_env.log_debug("on_data name {}".format(data_name)) + + is_final = False + input_text = "" + try: + is_final = data.get_property_bool(DATA_IN_TEXT_DATA_PROPERTY_IS_FINAL) + except Exception as err: + ten_env.log_info(f"GetProperty optional {DATA_IN_TEXT_DATA_PROPERTY_IS_FINAL} failed, err: {err}") + + try: + 
input_text = data.get_property_string(DATA_IN_TEXT_DATA_PROPERTY_TEXT) + except Exception as err: + ten_env.log_info(f"GetProperty optional {DATA_IN_TEXT_DATA_PROPERTY_TEXT} failed, err: {err}") + + if not is_final: + ten_env.log_info("ignore non-final input") + return + if not input_text: + ten_env.log_info("ignore empty text") + return + + ten_env.log_info(f"OnData input text: [{input_text}]") + + ts = datetime.now() + await self.queue.put((input_text, ts)) + + async def on_audio_frame(self, ten_env: AsyncTenEnv, audio_frame: AudioFrame) -> None: + pass + + async def on_video_frame(self, ten_env: AsyncTenEnv, video_frame: VideoFrame) -> None: + pass + + async def _flush(self): + self.ten_env.log_info("flush") + self.outdate_ts = datetime.now() + if self.session: + await self.session.close() + self.session = None + + def _need_interrrupt(self, ts: datetime) -> bool: + return self.outdate_ts > ts + + async def _send_text(self, text: str) -> None: + data = Data.create("text_data") + data.set_property_string(DATA_OUT_TEXT_DATA_PROPERTY_TEXT, text) + data.set_property_bool(DATA_OUT_TEXT_DATA_PROPERTY_END_OF_SEGMENT, True) + self.ten_env.send_data(data) + + async def _consume(self) -> None: + self.ten_env.log_info("start async loop") + while not self.stopped: + try: + value = await self.queue.get() + if value is None: + self.ten_env.log_info("async loop exit") + break + input, ts = value + if self._need_interrrupt(ts): + continue + + await self._chat(input, ts) + except Exception as e: + self.ten_env.log_error(f"Failed to handle {e}") + + async def _add_to_history(self, role: str, content: str) -> None: + self.history.append({"role": role, "content": content}) + if len(self.history) > self.max_history: + self.history = self.history[1:] + + async def _get_messages(self) -> List[dict]: + messages = [] + if self.prompt: + messages.append({"role": "system", "content": self.prompt}) + messages.extend(self.history) + return messages + + async def _chat(self, input: str, ts: 
datetime) -> None: + self.session = aiohttp.ClientSession() + try: + messages = await self._get_messages() + messages.append({"role": "user", "content": input}) + await self._add_to_history("user", input) + payload = { + "messages": [{"role": msg["role"], "content": {"type": "text", "text": msg["content"]}} for msg in messages], + "model": "gpt-3.5-turbo", + "temperature": 1.0, + "stream": True + } + self.ten_env.log_info(f"payload before sending: {payload}") + headers = { + "Authorization": f"Bearer {self.token}", + "Content-Type": "application/json" + } + total_output = "" + async with self.session.post(self.api_url, json=payload, headers=headers) as response: + async for line in response.content: + if self._need_interrrupt(ts): + self.ten_env.log_info("interrupted") + total_output += "[interrupted]" + break + + if line: + l = line.decode('utf-8').strip() + if l.startswith("data:"): + content = l[5:].strip() + if content == "[DONE]": + break + self.ten_env.log_info(f"content: {content}") + sentences, self.sentence_fragment = parse_sentences(self.sentence_fragment, content) + for s in sentences: + await self._send_text(s) + total_output += s + self.ten_env.log_info(f"total_output: {total_output}") + await self._add_to_history("assistant", total_output) + except Exception as e: + traceback.print_exc() + self.ten_env.log_error(f"Failed to handle {e}") + finally: + await self.session.close() + self.session = None diff --git a/agents/ten_packages/extension/glue_python_async/log.py b/agents/ten_packages/extension/glue_python_async/log.py new file mode 100644 index 00000000..5e7ec888 --- /dev/null +++ b/agents/ten_packages/extension/glue_python_async/log.py @@ -0,0 +1,20 @@ +# +# This file is part of TEN Framework, an open source project. +# Licensed under the Apache License, Version 2.0. +# See the LICENSE file for more information. 
+# +import logging + +logger = logging.getLogger("glue_python_async") +logger.setLevel(logging.INFO) + +formatter_str = ( + "%(asctime)s - %(name)s - %(levelname)s - %(process)d - " + "[%(filename)s:%(lineno)d] - %(message)s" +) +formatter = logging.Formatter(formatter_str) + +console_handler = logging.StreamHandler() +console_handler.setFormatter(formatter) + +logger.addHandler(console_handler) diff --git a/agents/ten_packages/extension/glue_python_async/manifest.json b/agents/ten_packages/extension/glue_python_async/manifest.json new file mode 100644 index 00000000..34065895 --- /dev/null +++ b/agents/ten_packages/extension/glue_python_async/manifest.json @@ -0,0 +1,69 @@ +{ + "type": "extension", + "name": "glue_python_async", + "version": "0.3.1", + "dependencies": [ + { + "type": "system", + "name": "ten_runtime_python", + "version": "0.3.1" + } + ], + "package": { + "include": [ + "manifest.json", + "property.json", + "BUILD.gn", + "**.tent", + "**.py", + "README.md", + "tests/**" + ] + }, + "api": { + "property": { + "token": { + "type": "string" + }, + "api_url": { + "type": "string" + }, + "user_id": { + "type": "string" + }, + "prompt": { + "type": "string" + } + }, + "data_in": [ + { + "name": "text_data", + "property": { + "text": { + "type": "string" + } + } + } + ], + "data_out": [ + { + "name": "text_data", + "property": { + "text": { + "type": "string" + } + } + } + ], + "cmd_in": [ + { + "name": "flush" + } + ], + "cmd_out": [ + { + "name": "flush" + } + ] + } +} \ No newline at end of file diff --git a/agents/ten_packages/extension/glue_python_async/property.json b/agents/ten_packages/extension/glue_python_async/property.json new file mode 100644 index 00000000..9e26dfee --- /dev/null +++ b/agents/ten_packages/extension/glue_python_async/property.json @@ -0,0 +1 @@ +{} \ No newline at end of file diff --git a/agents/ten_packages/extension/glue_python_async/schema.yml b/agents/ten_packages/extension/glue_python_async/schema.yml new file mode 100644 
index 00000000..ce885c7b --- /dev/null +++ b/agents/ten_packages/extension/glue_python_async/schema.yml @@ -0,0 +1,143 @@ +openapi: 3.0.0 +info: + title: Streaming Chat Completion API with Multimedia Support + version: 1.0.0 + description: API for streaming chat completions with support for text, image, and audio content + +paths: + /chat/completions: + post: + summary: Create a streaming chat completion + description: Streams a chat completion response + operationId: createChatCompletion + requestBody: + required: true + content: + application/json: + schema: + $ref: '#/components/schemas/ChatCompletionRequest' + responses: + '200': + description: Successful response + content: + application/json: + schema: + $ref: '#/components/schemas/ChatCompletionResponse' + x-stream: true + +components: + schemas: + ChatCompletionRequest: + type: object + required: + - messages + - model + properties: + messages: + type: array + items: + $ref: '#/components/schemas/Message' + model: + type: string + example: "gpt-4-vision-preview" + temperature: + type: number + format: float + minimum: 0 + maximum: 2 + default: 1 + stream: + type: boolean + default: true + + Message: + type: object + required: + - role + - content + properties: + role: + type: string + enum: [system, user, assistant] + content: + oneOf: + - $ref: '#/components/schemas/TextContent' + - $ref: '#/components/schemas/ImageContent' + - $ref: '#/components/schemas/AudioContent' + - type: array + items: + oneOf: + - $ref: '#/components/schemas/TextContent' + - $ref: '#/components/schemas/ImageContent' + - $ref: '#/components/schemas/AudioContent' + + TextContent: + type: object + required: + - type + - text + properties: + type: + type: string + enum: [text] + text: + type: string + + ImageContent: + type: object + required: + - type + - image_url + properties: + type: + type: string + enum: [image] + image_url: + type: string + format: uri + + AudioContent: + type: object + required: + - type + - audio_url + 
properties: + type: + type: string + enum: [audio] + audio_url: + type: string + format: uri + + ChatCompletionResponse: + type: object + properties: + id: + type: string + object: + type: string + created: + type: integer + model: + type: string + choices: + type: array + items: + $ref: '#/components/schemas/Choice' + + Choice: + type: object + properties: + delta: + $ref: '#/components/schemas/Delta' + index: + type: integer + finish_reason: + type: string + nullable: true + + Delta: + type: object + properties: + content: + type: string \ No newline at end of file diff --git a/agents/ten_packages/extension/glue_python_async/tests/test_basic.py b/agents/ten_packages/extension/glue_python_async/tests/test_basic.py new file mode 100644 index 00000000..c3755f44 --- /dev/null +++ b/agents/ten_packages/extension/glue_python_async/tests/test_basic.py @@ -0,0 +1,36 @@ +# +# Copyright © 2024 Agora +# This file is part of TEN Framework, an open source project. +# Licensed under the Apache License, Version 2.0, with certain conditions. +# Refer to the "LICENSE" file in the root directory for more information. +# +from pathlib import Path +from ten import ExtensionTester, TenEnvTester, Cmd, CmdResult, StatusCode + + +class ExtensionTesterBasic(ExtensionTester): + def check_hello(self, ten_env: TenEnvTester, result: CmdResult): + statusCode = result.get_status_code() + print("receive hello_world, status:" + str(statusCode)) + + if statusCode == StatusCode.OK: + ten_env.stop_test() + + def on_start(self, ten_env: TenEnvTester) -> None: + new_cmd = Cmd.create("hello_world") + + print("send hello_world") + ten_env.send_cmd( + new_cmd, + lambda ten_env, result: self.check_hello(ten_env, result), + ) + + print("tester on_start_done") + ten_env.on_start_done() + + +def test_basic(): + tester = ExtensionTesterBasic() + tester.add_addon_base_dir(str(Path(__file__).resolve().parent.parent)) + tester.set_test_mode_single("default_async_extension_python") + tester.run()