diff --git a/agents/addon/extension/aliyun_analyticdb_vector_storage/__init__.py b/agents/addon/extension/aliyun_analyticdb_vector_storage/__init__.py new file mode 100644 index 00000000..1af08bf4 --- /dev/null +++ b/agents/addon/extension/aliyun_analyticdb_vector_storage/__init__.py @@ -0,0 +1,4 @@ +from . import vector_storage_addon +from .log import logger + +logger.info("aliyun_analyticdb_vector_storage extension loaded") diff --git a/agents/addon/extension/aliyun_analyticdb_vector_storage/client.py b/agents/addon/extension/aliyun_analyticdb_vector_storage/client.py new file mode 100644 index 00000000..43c4f541 --- /dev/null +++ b/agents/addon/extension/aliyun_analyticdb_vector_storage/client.py @@ -0,0 +1,104 @@ +# -*- coding: utf-8 -*- + +try: + from .log import logger +except ImportError: + from log import logger +import asyncio +import threading +from typing import Coroutine +from concurrent.futures import Future + + +from alibabacloud_gpdb20160503.client import Client as gpdb20160503Client +from alibabacloud_tea_openapi import models as open_api_models + + +# maybe need multiple clients +class AliGPDBClient: + def __init__(self, access_key_id, access_key_secret, endpoint): + self.stopEvent = asyncio.Event() + self.loop = None + self.tasks = asyncio.Queue() + self.access_key_id = access_key_id + self.access_key_secret = access_key_secret + self.endpoint = endpoint + self.client = self.create_client() + self.thread = threading.Thread( + target=asyncio.run, args=(self.__thread_routine(),) + ) + self.thread.start() + + async def stop_thread(self): + self.stopEvent.set() + + def create_client(self) -> gpdb20160503Client: + config = open_api_models.Config( + access_key_id=self.access_key_id, + access_key_secret=self.access_key_secret, + endpoint=self.endpoint, + ) + return gpdb20160503Client(config) + + def get(self) -> gpdb20160503Client: + return self.client + + def close(self): + if (self.loop is not None) and self.thread.is_alive(): + # asyncio.Event is not thread-safe, so set it on the client loop + asyncio.run_coroutine_threadsafe(self.stop_thread(), self.loop) + self.thread.join() + + async def __thread_routine(self): + logger.info("client __thread_routine start") + self.loop = asyncio.get_running_loop() + tasks = set() + while not self.stopEvent.is_set(): + if not self.tasks.empty(): + coro, future = await self.tasks.get() + try: + task = asyncio.create_task(coro) + tasks.add(task) + # bind each future as a default argument so the callback + # resolves its own future, not the last one dequeued + task.add_done_callback( + lambda t, fut=future: fut.set_exception(t.exception()) + if t.exception() + else fut.set_result(t.result()) + ) + except Exception as e: + future.set_exception(e) + elif tasks: + done, tasks = await asyncio.wait( + tasks, return_when=asyncio.FIRST_COMPLETED + ) + for task in done: + # the done callback above has already resolved the future + if task.exception(): + logger.error(f"task exception: {task.exception()}") + else: + await asyncio.sleep(0.1) + logger.info("client __thread_routine end") + + async def submit_task(self, coro: Coroutine) -> Future: + future = Future() + await self.tasks.put((coro, future)) + return future + + def submit_task_with_new_thread(self, coro: Coroutine) -> Future: + future = Future() + + def run_coro_in_new_thread(): + loop = asyncio.new_event_loop() + asyncio.set_event_loop(loop) + try: + result = loop.run_until_complete(coro) + future.set_result(result) + except Exception as e: + future.set_exception(e) + finally: + loop.close() + + thread = threading.Thread(target=run_coro_in_new_thread) + thread.start() + return future diff --git a/agents/addon/extension/aliyun_analyticdb_vector_storage/log.py b/agents/addon/extension/aliyun_analyticdb_vector_storage/log.py new file mode 
100644 index 00000000..0cfa1aaf --- /dev/null +++ b/agents/addon/extension/aliyun_analyticdb_vector_storage/log.py @@ -0,0 +1,13 @@ +import logging + +logger = logging.getLogger("aliyun_analyticdb_vector_storage") +logger.setLevel(logging.INFO) + +formatter = logging.Formatter( + "%(asctime)s - %(name)s - %(levelname)s - %(process)d - [%(filename)s:%(lineno)d] - %(message)s" +) + +console_handler = logging.StreamHandler() +console_handler.setFormatter(formatter) + +logger.addHandler(console_handler) diff --git a/agents/addon/extension/aliyun_analyticdb_vector_storage/manifest.json b/agents/addon/extension/aliyun_analyticdb_vector_storage/manifest.json new file mode 100644 index 00000000..028a2154 --- /dev/null +++ b/agents/addon/extension/aliyun_analyticdb_vector_storage/manifest.json @@ -0,0 +1,122 @@ +{ + "type": "extension", + "name": "aliyun_analyticdb_vector_storage", + "version": "0.4.0", + "language": "python", + "dependencies": [ + { + "type": "system", + "name": "rte_runtime_python", + "version": "0.4" + } + ], + "api": { + "property": { + "alibaba_cloud_access_key_id": { + "type": "string" + }, + "alibaba_cloud_access_key_secret": { + "type": "string" + }, + "adbpg_instance_id": { + "type": "string" + }, + "adbpg_instance_region": { + "type": "string" + }, + "adbpg_account": { + "type": "string" + }, + "adbpg_account_password": { + "type": "string" + }, + "adbpg_namespace": { + "type": "string" + }, + "adbpg_namespace_password": { + "type": "string" + } + }, + "cmd_in": [ + { + "name": "upsert_vector", + "property": { + "collection_name": { + "type": "string" + }, + "file_name": { + "type": "string" + }, + "content": { + "type": "string" + } + } + }, + { + "name": "query_vector", + "property": { + "collection_name": { + "type": "string" + }, + "top_k": { + "type": "int64" + }, + "embedding": { + "type": "array", + "items": { + "type": "float64" + } + } + }, + "required": [ + "collection_name", + "top_k", + "embedding" + ], + "result": { + "property": { + "response": { + "type": "array", + "items": { + "type": "object", + "properties": { + "content": { + "type": "string" + }, + "score": { + "type": "float64" + } + } + } + } + } + } + }, + { + "name": "create_collection", + "property": { + "collection_name": { + "type": "string" + }, + "dimension": { + "type": "int32" + } + }, + "required": [ + "collection_name" + ] + }, + { + "name": "delete_collection", + "property": { + "collection_name": { + "type": "string" + } + }, + "required": [ + "collection_name" + ] + } + ] + } +} \ No newline at end of file diff --git a/agents/addon/extension/aliyun_analyticdb_vector_storage/model.py b/agents/addon/extension/aliyun_analyticdb_vector_storage/model.py new file mode 100644 index 00000000..48365736 --- /dev/null +++ b/agents/addon/extension/aliyun_analyticdb_vector_storage/model.py @@ -0,0 +1,547 @@ +# -*- coding: utf-8 -*- + +from alibabacloud_gpdb20160503 import models as gpdb_20160503_models # type: ignore + +try: + from .log import logger +except ImportError: + from log import logger +import time +import json +from typing import Dict, List, Any, Tuple +from alibabacloud_tea_util import models as util_models + + +class Model: + def __init__(self, region_id, dbinstance_id, client): + self.region_id = region_id + self.dbinstance_id = dbinstance_id + self.client = client + self.read_timeout = 10 * 1000 + self.connect_timeout = 10 * 1000 + + def get_client(self): + return self.client.get() + + def init_vector_database(self, account, account_password) -> None: + try: + request = 
gpdb_20160503_models.InitVectorDatabaseRequest( + region_id=self.region_id, + dbinstance_id=self.dbinstance_id, + manager_account=account, + manager_account_password=account_password, + ) + runtime = util_models.RuntimeOptions( + read_timeout=self.read_timeout, connect_timeout=self.connect_timeout + ) + response = self.get_client().init_vector_database_with_options( + request, runtime + ) + logger.debug( + f"init_vector_database response code: {response.status_code}, body:{response.body}" + ) + except Exception as e: + logger.error(f"Error: {e}") + return e + + async def init_vector_database_async(self, account, account_password) -> None: + try: + request = gpdb_20160503_models.InitVectorDatabaseRequest( + region_id=self.region_id, + dbinstance_id=self.dbinstance_id, + manager_account=account, + manager_account_password=account_password, + ) + runtime = util_models.RuntimeOptions( + read_timeout=self.read_timeout, connect_timeout=self.connect_timeout + ) + response = await self.get_client().init_vector_database_with_options_async( + request, runtime + ) + logger.debug( + f"init_vector_database response code: {response.status_code}, body:{response.body}" + ) + except Exception as e: + logger.error(f"Error: {e}") + return e + + def create_namespace( + self, account, account_password, namespace, namespace_password + ) -> None: + try: + request = gpdb_20160503_models.CreateNamespaceRequest( + region_id=self.region_id, + dbinstance_id=self.dbinstance_id, + manager_account=account, + manager_account_password=account_password, + namespace=namespace, + namespace_password=namespace_password, + ) + runtime = util_models.RuntimeOptions( + read_timeout=self.read_timeout, connect_timeout=self.connect_timeout + ) + response = self.get_client().create_namespace_with_options(request, runtime) + logger.debug( + f"create_namespace response code: {response.status_code}, body:{response.body}" + ) + except Exception as e: + logger.error(f"Error: {e}") + return e + + async def create_namespace_async( + self, account, account_password, namespace, namespace_password + ) -> None: + try: + request = gpdb_20160503_models.CreateNamespaceRequest( + region_id=self.region_id, + dbinstance_id=self.dbinstance_id, + manager_account=account, + manager_account_password=account_password, + namespace=namespace, + namespace_password=namespace_password, + ) + runtime = util_models.RuntimeOptions( + read_timeout=self.read_timeout, connect_timeout=self.connect_timeout + ) + response = await self.get_client().create_namespace_with_options_async( + request, runtime + ) + logger.debug( + f"create_namespace response code: {response.status_code}, body:{response.body}" + ) + except Exception as e: + logger.error(f"Error: {e}") + return e + + def create_collection( + self, + account, + account_password, + namespace, + collection, + parser: str = None, + metrics: str = None, + hnsw_m: int = None, + pq_enable: int = None, + external_storage: int = None, + ) -> None: + try: + metadata = '{"update_ts": "bigint", "file_name": "text", "content": "text"}' + full_text_retrieval_fields = "update_ts,file_name" + request = gpdb_20160503_models.CreateCollectionRequest( + region_id=self.region_id, + dbinstance_id=self.dbinstance_id, + manager_account=account, + manager_account_password=account_password, + namespace=namespace, + collection=collection, + metadata=metadata, + full_text_retrieval_fields=full_text_retrieval_fields, + parser=parser, + metrics=metrics, + hnsw_m=hnsw_m, + pq_enable=pq_enable, + external_storage=external_storage, + ) + 
runtime = util_models.RuntimeOptions( + read_timeout=self.read_timeout, connect_timeout=self.connect_timeout + ) + response = self.get_client().create_collection_with_options( + request, runtime + ) + logger.debug( + f"create_document_collection response code: {response.status_code}, body:{response.body}" + ) + except Exception as e: + logger.error(f"Error: {e}") + return e + + async def create_collection_async( + self, + account, + account_password, + namespace, + collection, + parser: str = None, + metrics: str = None, + hnsw_m: int = None, + pq_enable: int = None, + external_storage: int = None, + ) -> None: + try: + metadata = '{"update_ts": "bigint", "file_name": "text", "content": "text"}' + full_text_retrieval_fields = "update_ts,file_name" + request = gpdb_20160503_models.CreateCollectionRequest( + region_id=self.region_id, + dbinstance_id=self.dbinstance_id, + manager_account=account, + manager_account_password=account_password, + namespace=namespace, + collection=collection, + metadata=metadata, + full_text_retrieval_fields=full_text_retrieval_fields, + parser=parser, + metrics=metrics, + hnsw_m=hnsw_m, + pq_enable=pq_enable, + external_storage=external_storage, + ) + runtime = util_models.RuntimeOptions( + read_timeout=self.read_timeout, connect_timeout=self.connect_timeout + ) + response = await self.get_client().create_collection_with_options_async( + request, runtime + ) + logger.debug( + f"create_document_collection response code: {response.status_code}, body:{response.body}" + ) + except Exception as e: + logger.error(f"Error: {e}") + return e + + def delete_collection(self, namespace, namespace_password, collection) -> None: + try: + request = gpdb_20160503_models.DeleteCollectionRequest( + region_id=self.region_id, + dbinstance_id=self.dbinstance_id, + namespace_password=namespace_password, + namespace=namespace, + collection=collection, + ) + runtime = util_models.RuntimeOptions( + read_timeout=self.read_timeout, connect_timeout=self.connect_timeout + ) + response = self.get_client().delete_collection_with_options( + request, runtime + ) + logger.debug( + f"delete_collection response code: {response.status_code}, body:{response.body}" + ) + except Exception as e: + logger.error(f"Error: {e}") + return e + + async def delete_collection_async( + self, namespace, namespace_password, collection + ) -> None: + try: + request = gpdb_20160503_models.DeleteCollectionRequest( + region_id=self.region_id, + dbinstance_id=self.dbinstance_id, + namespace_password=namespace_password, + namespace=namespace, + collection=collection, + ) + runtime = util_models.RuntimeOptions( + read_timeout=self.read_timeout, connect_timeout=self.connect_timeout + ) + response = await self.get_client().delete_collection_with_options_async( + request, runtime + ) + logger.info( + f"delete_collection response code: {response.status_code}, body:{response.body}" + ) + except Exception as e: + logger.error(f"Error: {e}") + return e + + def upsert_collection_data( + self, + collection, + namespace, + namespace_password, + rows: List[Tuple[str, str, List[float]]] = None, + ) -> None: + try: + request_rows = [] + for row in rows: + file_name = row[0] + content = row[1] + vector = row[2] + metadata = { + "update_ts": int(time.time() * 1000), + "file_name": file_name, + "content": content, + } + request_row = gpdb_20160503_models.UpsertCollectionDataRequestRows( + metadata=metadata, vector=vector + ) + request_rows.append(request_row) + upsert_collection_data_request = ( + 
gpdb_20160503_models.UpsertCollectionDataRequest( + region_id=self.region_id, + dbinstance_id=self.dbinstance_id, + collection=collection, + namespace_password=namespace_password, + namespace=namespace, + rows=request_rows, + ) + ) + runtime = util_models.RuntimeOptions( + read_timeout=self.read_timeout, connect_timeout=self.connect_timeout + ) + response = self.get_client().upsert_collection_data_with_options( + upsert_collection_data_request, runtime + ) + logger.debug( + f"upsert_collection response code: {response.status_code}, body:{response.body}" + ) + except Exception as e: + logger.error(f"Error: {e}") + return e + + async def upsert_collection_data_async( + self, + collection, + namespace, + namespace_password, + rows: List[Tuple[str, str, List[float]]] = None, + ) -> None: + try: + request_rows = [] + for row in rows: + file_name = row[0] + content = row[1] + vector = row[2] + metadata = { + "update_ts": int(time.time() * 1000), + "file_name": file_name, + "content": content, + } + request_row = gpdb_20160503_models.UpsertCollectionDataRequestRows( + metadata=metadata, vector=vector + ) + request_rows.append(request_row) + upsert_collection_data_request = ( + gpdb_20160503_models.UpsertCollectionDataRequest( + region_id=self.region_id, + dbinstance_id=self.dbinstance_id, + collection=collection, + namespace_password=namespace_password, + namespace=namespace, + rows=request_rows, + ) + ) + runtime = util_models.RuntimeOptions( + read_timeout=self.read_timeout, connect_timeout=self.connect_timeout + ) + response = ( + await self.get_client().upsert_collection_data_with_options_async( + upsert_collection_data_request, runtime + ) + ) + logger.debug( + f"upsert_collection response code: {response.status_code}, body:{response.body}" + ) + except Exception as e: + logger.error(f"Error: {e}") + return e + + def query_collection_data( + self, + collection, + namespace, + namespace_password, + vector: List[float] = None, + top_k: int = 10, + content: str = None, + filter: str = None, + hybrid_search: str = None, + hybrid_search_args: Dict[str, dict] = None, + include_metadata_fields: str = None, + include_values: bool = None, + metrics: str = None, + ) -> Tuple[Any, Any]: + try: + query_collection_data_request = ( + gpdb_20160503_models.QueryCollectionDataRequest( + region_id=self.region_id, + dbinstance_id=self.dbinstance_id, + collection=collection, + namespace_password=namespace_password, + namespace=namespace, + vector=vector, + top_k=top_k, + content=content, + filter=filter, + hybrid_search=hybrid_search, + hybrid_search_args=hybrid_search_args, + include_metadata_fields=include_metadata_fields, + include_values=include_values, + metrics=metrics, + ) + ) + runtime = util_models.RuntimeOptions( + read_timeout=self.read_timeout, connect_timeout=self.connect_timeout + ) + response = self.get_client().query_collection_data_with_options( + query_collection_data_request, runtime + ) + # logger.info(f"query_collection response code: {response.status_code}, body:{response.body}") + logger.debug(f"query_collection response code: {response.status_code}") + return response, None + except Exception as e: + logger.error(f"Error: {e}") + return None, e + + async def query_collection_data_async( + self, + collection, + namespace, + namespace_password, + vector: List[float] = None, + top_k: int = 10, + content: str = None, + filter: str = None, + hybrid_search: str = None, + hybrid_search_args: Dict[str, dict] = None, + include_metadata_fields: str = None, + include_values: bool = None, + metrics: 
str = None, + ) -> Tuple[Any, Any]: + try: + query_collection_data_request = ( + gpdb_20160503_models.QueryCollectionDataRequest( + region_id=self.region_id, + dbinstance_id=self.dbinstance_id, + collection=collection, + namespace_password=namespace_password, + namespace=namespace, + vector=vector, + top_k=top_k, + content=content, + filter=filter, + hybrid_search=hybrid_search, + hybrid_search_args=hybrid_search_args, + include_metadata_fields=include_metadata_fields, + include_values=include_values, + metrics=metrics, + ) + ) + runtime = util_models.RuntimeOptions( + read_timeout=self.read_timeout, connect_timeout=self.connect_timeout + ) + response = await self.get_client().query_collection_data_with_options_async( + query_collection_data_request, runtime + ) + logger.debug(f"query_collection response code: {response.status_code}") + return response, None + except Exception as e: + logger.error(f"Error: {e}") + return None, e + + def parse_collection_data( + self, body: gpdb_20160503_models.QueryCollectionDataResponseBody + ) -> str: + try: + matches = body.to_map()["Matches"]["match"] + results = [ + {"content": match["Metadata"]["content"], "score": match["Score"]} + for match in matches + ] + results.sort(key=lambda x: x["score"], reverse=True) + json_str = json.dumps(results) + return json_str + except Exception as e: + logger.error(f"Error: {e}") + return "[]" + + def list_collections(self, namespace, namespace_password) -> Tuple[List[str], Any]: + try: + request = gpdb_20160503_models.ListCollectionsRequest( + region_id=self.region_id, + dbinstance_id=self.dbinstance_id, + namespace=namespace, + namespace_password=namespace_password, + ) + runtime = util_models.RuntimeOptions( + read_timeout=self.read_timeout, connect_timeout=self.connect_timeout + ) + response = self.get_client().list_collections_with_options(request, runtime) + logger.debug( + f"list_collections response code: {response.status_code}, body:{response.body}" + ) + collections = response.body.to_map()["Collections"]["collection"] + return collections, None + except Exception as e: + logger.error(f"Error: {e}") + return [], e + + async def list_collections_async( + self, namespace, namespace_password + ) -> Tuple[List[str], Any]: + try: + request = gpdb_20160503_models.ListCollectionsRequest( + region_id=self.region_id, + dbinstance_id=self.dbinstance_id, + namespace=namespace, + namespace_password=namespace_password, + ) + runtime = util_models.RuntimeOptions( + read_timeout=self.read_timeout, connect_timeout=self.connect_timeout + ) + response = await self.get_client().list_collections_with_options_async( + request, runtime + ) + logger.debug( + f"list_collections response code: {response.status_code}, body:{response.body}" + ) + collections = response.body.to_map()["Collections"]["collection"] + return collections, None + except Exception as e: + logger.error(f"Error: {e}") + return [], e + + def create_vector_index( + self, account, account_password, namespace, collection, dimension + ) -> None: + try: + request = gpdb_20160503_models.CreateVectorIndexRequest( + region_id=self.region_id, + dbinstance_id=self.dbinstance_id, + manager_account=account, + manager_account_password=account_password, + namespace=namespace, + collection=collection, + dimension=dimension, + pq_enable=0, + ) + runtime = util_models.RuntimeOptions( + read_timeout=self.read_timeout, connect_timeout=self.connect_timeout + ) + response = self.get_client().create_vector_index_with_options( + request, runtime + ) + logger.debug( + 
f"create_vector_index response code: {response.status_code}, body:{response.body}" + ) + except Exception as e: + logger.error(f"Error: {e}") + return e + + async def create_vector_index_async( + self, account, account_password, namespace, collection, dimension + ) -> None: + try: + request = gpdb_20160503_models.CreateVectorIndexRequest( + region_id=self.region_id, + dbinstance_id=self.dbinstance_id, + manager_account=account, + manager_account_password=account_password, + namespace=namespace, + collection=collection, + dimension=dimension, + pq_enable=0, + ) + runtime = util_models.RuntimeOptions( + read_timeout=self.read_timeout, connect_timeout=self.connect_timeout + ) + response = await self.get_client().create_vector_index_with_options_async( + request, runtime + ) + logger.debug( + f"create_vector_index response code: {response.status_code}, body:{response.body}" + ) + except Exception as e: + logger.error(f"Error: {e}") + return e diff --git a/agents/addon/extension/aliyun_analyticdb_vector_storage/property.json b/agents/addon/extension/aliyun_analyticdb_vector_storage/property.json new file mode 100644 index 00000000..9e26dfee --- /dev/null +++ b/agents/addon/extension/aliyun_analyticdb_vector_storage/property.json @@ -0,0 +1 @@ +{} \ No newline at end of file diff --git a/agents/addon/extension/aliyun_analyticdb_vector_storage/requirements.txt b/agents/addon/extension/aliyun_analyticdb_vector_storage/requirements.txt new file mode 100644 index 00000000..fa0bed40 --- /dev/null +++ b/agents/addon/extension/aliyun_analyticdb_vector_storage/requirements.txt @@ -0,0 +1 @@ +alibabacloud_gpdb20160503 \ No newline at end of file diff --git a/agents/addon/extension/aliyun_analyticdb_vector_storage/vector_storage_addon.py b/agents/addon/extension/aliyun_analyticdb_vector_storage/vector_storage_addon.py new file mode 100644 index 00000000..6bb26538 --- /dev/null +++ b/agents/addon/extension/aliyun_analyticdb_vector_storage/vector_storage_addon.py @@ -0,0 +1,14 @@ +from rte import ( + Addon, + register_addon_as_extension, + RteEnv, +) +from .log import logger +from .vector_storage_extension import AliPGDBExtension + + +@register_addon_as_extension("aliyun_analyticdb_vector_storage") +class AliPGDBExtensionAddon(Addon): + def on_create_instance(self, rte: RteEnv, addon_name: str, context) -> None: + logger.info("on_create_instance") + rte.on_create_instance_done(AliPGDBExtension(addon_name), context) diff --git a/agents/addon/extension/aliyun_analyticdb_vector_storage/vector_storage_extension.py b/agents/addon/extension/aliyun_analyticdb_vector_storage/vector_storage_extension.py new file mode 100644 index 00000000..5cb53fe2 --- /dev/null +++ b/agents/addon/extension/aliyun_analyticdb_vector_storage/vector_storage_extension.py @@ -0,0 +1,226 @@ +# -*- coding: utf-8 -*- +# + +import asyncio +import os +import json +from .client import AliGPDBClient +from .model import Model +from rte import ( + Extension, + RteEnv, + Cmd, + Data, + StatusCode, + CmdResult, +) + +from typing import List +from .log import logger +import threading +from datetime import datetime + +from alibabacloud_gpdb20160503.client import Client as gpdb20160503Client +from alibabacloud_tea_openapi import models as open_api_models +from alibabacloud_gpdb20160503 import models as gpdb_20160503_models +from alibabacloud_tea_util import models as util_models +from alibabacloud_tea_util.client import Client as UtilClient + + +class AliPGDBExtension(Extension): + def __init__(self, name): + self.stopEvent = asyncio.Event() + 
self.thread = None + self.loop = None + self.access_key_id = os.environ.get("ALIBABA_CLOUD_ACCESS_KEY_ID") + self.access_key_secret = os.environ.get("ALIBABA_CLOUD_ACCESS_KEY_SECRET") + self.region_id = os.environ.get("ADBPG_INSTANCE_REGION") + self.dbinstance_id = os.environ.get("ADBPG_INSTANCE_ID") + self.endpoint = f"gpdb.aliyuncs.com" + self.client = None + self.account = os.environ.get("ADBPG_ACCOUNT") + self.account_password = os.environ.get("ADBPG_ACCOUNT_PASSWORD") + self.namespace = os.environ.get("ADBPG_NAMESPACE") + self.namespace_password = os.environ.get("ADBPG_NAMESPACE_PASSWORD") + + async def __thread_routine(self, rte_env: RteEnv): + logger.info("__thread_routine start") + self.loop = asyncio.get_running_loop() + rte_env.on_start_done() + await self.stopEvent.wait() + + async def stop_thread(self): + self.stopEvent.set() + + def on_start(self, rte: RteEnv) -> None: + logger.info(f"on_start") + self.access_key_id = self.get_property_string( + rte, "ALIBABA_CLOUD_ACCESS_KEY_ID", self.access_key_id + ) + self.access_key_secret = self.get_property_string( + rte, "ALIBABA_CLOUD_ACCESS_KEY_SECRET", self.access_key_secret + ) + self.region_id = self.get_property_string( + rte, "ADBPG_INSTANCE_REGION", self.region_id + ) + self.dbinstance_id = self.get_property_string( + rte, "ADBPG_INSTANCE_ID", self.dbinstance_id + ) + self.account = self.get_property_string(rte, "ADBPG_ACCOUNT", self.account) + self.account_password = self.get_property_string( + rte, "ADBPG_ACCOUNT_PASSWORD", self.account_password + ) + self.namespace = self.get_property_string( + rte, "ADBPG_NAMESPACE", self.namespace + ) + self.namespace_password = self.get_property_string( + rte, "ADBPG_NAMESPACE_PASSWORD", self.namespace_password + ) + + if self.region_id in ( + "cn-beijing", + "cn-hangzhou", + "cn-shanghai", + "cn-shenzhen", + "cn-hongkong", + "ap-southeast-1", + "cn-hangzhou-finance", + "cn-shanghai-finance-1", + "cn-shenzhen-finance-1", + "cn-beijing-finance-1", + ): + self.endpoint = "gpdb.aliyuncs.com" + else: + self.endpoint = f"gpdb.{self.region_id}.aliyuncs.com" + self.client = AliGPDBClient( + self.access_key_id, self.access_key_secret, self.endpoint + ) + self.thread = threading.Thread( + target=asyncio.run, args=(self.__thread_routine(rte),) + ) + + # Then 'on_start_done' will be called in the thread + self.thread.start() + return + + def on_stop(self, rte: RteEnv) -> None: + logger.info("on_stop") + if self.thread is not None and self.thread.is_alive(): + asyncio.run_coroutine_threadsafe(self.stop_thread(), self.loop) + self.thread.join() + self.thread = None + rte.on_stop_done() + return + + def on_data(self, rte: RteEnv, data: Data) -> None: + pass + + def on_cmd(self, rte: RteEnv, cmd: Cmd) -> None: + try: + cmd_name = cmd.get_name() + logger.info(f"on_cmd [{cmd_name}]") + if cmd_name == "create_collection": + asyncio.run_coroutine_threadsafe( + self.async_create_collection(rte, cmd), self.loop + ) + elif cmd_name == "delete_collection": + asyncio.run_coroutine_threadsafe( + self.async_delete_collection(rte, cmd), self.loop + ) + elif cmd_name == "upsert_vector": + asyncio.run_coroutine_threadsafe( + self.async_upsert_vector(rte, cmd), self.loop + ) + elif cmd_name == "query_vector": + asyncio.run_coroutine_threadsafe( + self.async_query_vector(rte, cmd), self.loop + ) + else: + rte.return_result(CmdResult.create(StatusCode.ERROR), cmd) + except Exception as e: + rte.return_result(CmdResult.create(StatusCode.ERROR), cmd) + + async def async_create_collection(self, rte: RteEnv, cmd: Cmd): + m 
= Model(self.region_id, self.dbinstance_id, self.client) + collection = cmd.get_property_string("collection_name") + dimension = 1024 + try: + dimension = cmd.get_property_int("dimension") + except Exception as e: + logger.warning(f"Error: {e}") + + err = await m.create_collection_async( + self.account, self.account_password, self.namespace, collection + ) + if err is None: + await m.create_vector_index_async( + self.account, + self.account_password, + self.namespace, + collection, + dimension, + ) + rte.return_result(CmdResult.create(StatusCode.OK), cmd) + else: + rte.return_result(CmdResult.create(StatusCode.ERROR), cmd) + + async def async_upsert_vector(self, rte: RteEnv, cmd: Cmd): + start_time = datetime.now() + m = Model(self.region_id, self.dbinstance_id, self.client) + collection = cmd.get_property_string("collection_name") + file = cmd.get_property_string("file_name") + content = cmd.get_property_string("content") + obj = json.loads(content) + rows = [(file, item["text"], item["embedding"]) for item in obj] + + err = await m.upsert_collection_data_async( + collection, self.namespace, self.namespace_password, rows + ) + logger.info( + "upsert_vector finished for file {}, collection {}, rows len {}, err {}, cost {}ms".format( + file, + collection, + len(rows), + err, + int((datetime.now() - start_time).total_seconds() * 1000), + ) + ) + if err is None: + rte.return_result(CmdResult.create(StatusCode.OK), cmd) + else: + rte.return_result(CmdResult.create(StatusCode.ERROR), cmd) + + async def async_query_vector(self, rte: RteEnv, cmd: Cmd): + m = Model(self.region_id, self.dbinstance_id, self.client) + collection = cmd.get_property_string("collection_name") + embedding = cmd.get_property_to_json("embedding") + top_k = cmd.get_property_int("top_k") + vector = json.loads(embedding) + response, error = await m.query_collection_data_async( + collection, self.namespace, self.namespace_password, vector, top_k=top_k + ) + if error: + return rte.return_result(CmdResult.create(StatusCode.ERROR), cmd) + else: + body = m.parse_collection_data(response.body) + ret = CmdResult.create(StatusCode.OK) + ret.set_property_from_json("response", body) + rte.return_result(ret, cmd) + + async def async_delete_collection(self, rte: RteEnv, cmd: Cmd): + m = Model(self.region_id, self.dbinstance_id, self.client) + collection = cmd.get_property_string("collection_name") + err = await m.delete_collection_async( + self.account, self.account_password, self.namespace, collection + ) + if err is None: + return rte.return_result(CmdResult.create(StatusCode.OK), cmd) + else: + return rte.return_result(CmdResult.create(StatusCode.ERROR), cmd) + + def get_property_string(self, rte: RteEnv, key: str, default: str) -> str: + try: + return rte.get_property_string(key.lower()) + except Exception as e: + logger.error(f"Error: {e}") + return default diff --git a/agents/addon/extension/aliyun_text_embedding/__init__.py b/agents/addon/extension/aliyun_text_embedding/__init__.py new file mode 100644 index 00000000..b06c946a --- /dev/null +++ b/agents/addon/extension/aliyun_text_embedding/__init__.py @@ -0,0 +1,4 @@ +from . 
import embedding_addon +from .log import logger + +logger.info("aliyun_text_embedding extension loaded") diff --git a/agents/addon/extension/aliyun_text_embedding/embedding_addon.py b/agents/addon/extension/aliyun_text_embedding/embedding_addon.py new file mode 100644 index 00000000..e149bbf6 --- /dev/null +++ b/agents/addon/extension/aliyun_text_embedding/embedding_addon.py @@ -0,0 +1,14 @@ +from rte import ( + Addon, + register_addon_as_extension, + RteEnv, +) +from .log import logger +from .embedding_extension import EmbeddingExtension + + +@register_addon_as_extension("aliyun_text_embedding") +class EmbeddingExtensionAddon(Addon): + def on_create_instance(self, rte: RteEnv, addon_name: str, context) -> None: + logger.info("on_create_instance") + rte.on_create_instance_done(EmbeddingExtension(addon_name), context) diff --git a/agents/addon/extension/aliyun_text_embedding/embedding_extension.py b/agents/addon/extension/aliyun_text_embedding/embedding_extension.py new file mode 100644 index 00000000..284a7918 --- /dev/null +++ b/agents/addon/extension/aliyun_text_embedding/embedding_extension.py @@ -0,0 +1,182 @@ +from rte import ( + Extension, + RteEnv, + Cmd, + StatusCode, + CmdResult, +) + +import dashscope +import json +from typing import Generator, List +from http import HTTPStatus +from .log import logger +import threading, queue +from datetime import datetime + +CMD_EMBED = "embed" +CMD_EMBED_BATCH = "embed_batch" + +FIELD_KEY_EMBEDDING = "embedding" +FIELD_KEY_EMBEDDINGS = "embeddings" +FIELD_KEY_MESSAGE = "message" +FIELD_KEY_CODE = "code" + +DASHSCOPE_MAX_BATCH_SIZE = 6 + + +class EmbeddingExtension(Extension): + def __init__(self, name: str): + super().__init__(name) + self.api_key = "" + self.model = "" + + self.stop = False + self.queue = queue.Queue() + self.threads = [] + + # TODO: workaround to speed up the embedding process, + # should be replaced by https://help.aliyun.com/zh/model-studio/developer-reference/text-embedding-batch-api?spm=a2c4g.11186623.0.0.24cb7453KSjdhC + # once v3 models supported + self.parallel = 10 + + def on_start(self, rte: RteEnv) -> None: + logger.info("on_start") + self.api_key = self.get_property_string(rte, "api_key", self.api_key) + self.model = self.get_property_string(rte, "model", self.model) + + dashscope.api_key = self.api_key + + for i in range(self.parallel): + thread = threading.Thread(target=self.async_handler, args=[i, rte]) + thread.start() + self.threads.append(thread) + + rte.on_start_done() + + def async_handler(self, index: int, rte: RteEnv): + logger.info("async_handler {} started".format(index)) + + while not self.stop: + cmd = self.queue.get() + if cmd is None: + break + + cmd_name = cmd.get_name() + start_time = datetime.now() + logger.info( + "async_handler {} processing cmd {}".format(index, cmd_name)) + + if cmd_name == CMD_EMBED: + cmd_result = self.call_with_str(cmd.get_property_string("input")) + rte.return_result(cmd_result, cmd) + elif cmd_name == CMD_EMBED_BATCH: + inputs = json.loads(cmd.get_property_to_json("inputs")) + cmd_result = self.call_with_strs(inputs) + rte.return_result(cmd_result, cmd) + else: + logger.warning("unknown cmd {}".format(cmd_name)) + + logger.info( + "async_handler {} finished processing cmd {}, cost {}ms".format(index, cmd_name, int((datetime.now() - start_time).total_seconds() * 1000))) + + logger.info("async_handler {} stopped".format(index)) + + def call_with_str(self, message: str) -> CmdResult: + start_time = datetime.now() + response = dashscope.TextEmbedding.call(model=self.model, 
input=message) + logger.info("embedding call finished for input [{}], status_code {}, cost {}ms".format(message, response.status_code, int((datetime.now() - start_time).total_seconds() * 1000))) + + if response.status_code == HTTPStatus.OK: + cmd_result = CmdResult.create(StatusCode.OK) + cmd_result.set_property_from_json(FIELD_KEY_EMBEDDING, json.dumps(response.output["embeddings"][0]["embedding"])) + return cmd_result + else: + cmd_result = CmdResult.create(StatusCode.ERROR) + cmd_result.set_property_string(FIELD_KEY_CODE, str(response.status_code)) + cmd_result.set_property_string(FIELD_KEY_MESSAGE, response.message) + return cmd_result + + def batched( + self, inputs: List, batch_size: int = DASHSCOPE_MAX_BATCH_SIZE + ) -> Generator[List, None, None]: + for i in range(0, len(inputs), batch_size): + yield inputs[i : i + batch_size] + + def call_with_strs(self, messages: List[str]) -> CmdResult: + start_time = datetime.now() + result = None # merge the results. + batch_counter = 0 + for batch in self.batched(messages): + response = dashscope.TextEmbedding.call(model=self.model, input=batch) + # logger.info("%s Received %s", batch, response) + if response.status_code == HTTPStatus.OK: + if result is None: + result = response.output + else: + for emb in response.output["embeddings"]: + emb["text_index"] += batch_counter + result["embeddings"].append(emb) + else: + logger.error("call %s failed, errmsg: %s", batch, response) + batch_counter += len(batch) + + logger.info("embedding call finished for inputs len {}, batch_counter {}, results len {}, cost {}ms ".format(len(messages), batch_counter, len(result["embeddings"]) if result is not None else 0, int((datetime.now() - start_time).total_seconds() * 1000))) + if result is not None: + cmd_result = CmdResult.create(StatusCode.OK) + + # TODO: `set_property_to_json` is too slow, so use `set_property_string` as a workaround + # will be replaced once `set_property_to_json` is improved + cmd_result.set_property_string(FIELD_KEY_EMBEDDINGS, json.dumps(result["embeddings"])) + return cmd_result + else: + cmd_result = CmdResult.create(StatusCode.ERROR) + cmd_result.set_property_string(FIELD_KEY_MESSAGE, "All batch failed") + logger.error("All batch failed") + return cmd_result + + def on_stop(self, rte: RteEnv) -> None: + logger.info("on_stop") + self.stop = True + # clear queue + while not self.queue.empty(): + self.queue.get() + # put enough None sentinels to stop all threads + for thread in self.threads: + self.queue.put(None) + for thread in self.threads: + thread.join() + self.threads = [] + + rte.on_stop_done() + + def on_cmd(self, rte: RteEnv, cmd: Cmd) -> None: + cmd_name = cmd.get_name() + + if cmd_name in [CMD_EMBED, CMD_EMBED_BATCH]: + """ + // embed + { + "name": "embed", + "input": "hello" + } + + // embed_batch + { + "name": "embed_batch", + "inputs": ["hello", ...] 
+ } + """ + + self.queue.put(cmd) + else: + logger.warning("unknown cmd {}".format(cmd_name)) + cmd_result = CmdResult.create(StatusCode.ERROR) + rte.return_result(cmd_result, cmd) + + def get_property_string(self, rte: RteEnv, key, default): + try: + return rte.get_property_string(key) + except Exception as e: + logger.warning(f"err: {e}") + return default diff --git a/agents/addon/extension/aliyun_text_embedding/log.py b/agents/addon/extension/aliyun_text_embedding/log.py new file mode 100644 index 00000000..2e90975a --- /dev/null +++ b/agents/addon/extension/aliyun_text_embedding/log.py @@ -0,0 +1,13 @@ +import logging + +logger = logging.getLogger("aliyun_text_embedding") +logger.setLevel(logging.INFO) + +formatter = logging.Formatter( + "%(asctime)s - %(name)s - %(levelname)s - %(process)d - [%(filename)s:%(lineno)d] - %(message)s" +) + +console_handler = logging.StreamHandler() +console_handler.setFormatter(formatter) + +logger.addHandler(console_handler) diff --git a/agents/addon/extension/aliyun_text_embedding/manifest.json b/agents/addon/extension/aliyun_text_embedding/manifest.json new file mode 100644 index 00000000..2fd19099 --- /dev/null +++ b/agents/addon/extension/aliyun_text_embedding/manifest.json @@ -0,0 +1,79 @@ +{ + "type": "extension", + "name": "aliyun_text_embedding", + "version": "0.4.0", + "language": "python", + "dependencies": [ + { + "type": "system", + "name": "rte_runtime_python", + "version": "0.4" + } + ], + "api": { + "property": { + "api_key": { + "type": "string" + }, + "model": { + "type": "string" + } + }, + "cmd_in": [ + { + "name": "embed", + "property": { + "input": { + "type": "string" + } + }, + "required": [ + "input" + ], + "result": { + "property": { + "embedding": { + "type": "array", + "items": { + "type": "float64" + } + }, + "code": { + "type": "string" + }, + "message": { + "type": "string" + } + } + } + }, + { + "name": "embed_batch", + "property": { + "inputs": { + "type": "array", + "items": { + "type": "string" + } + } + }, + "required": [ + "inputs" + ], + "result": { + "property": { + "embeddings": { + "type": "string" + }, + "code": { + "type": "string" + }, + "message": { + "type": "string" + } + } + } + } + ] + } +} \ No newline at end of file diff --git a/agents/addon/extension/aliyun_text_embedding/property.json b/agents/addon/extension/aliyun_text_embedding/property.json new file mode 100644 index 00000000..9e26dfee --- /dev/null +++ b/agents/addon/extension/aliyun_text_embedding/property.json @@ -0,0 +1 @@ +{} \ No newline at end of file diff --git a/agents/addon/extension/aliyun_text_embedding/requirements.txt b/agents/addon/extension/aliyun_text_embedding/requirements.txt new file mode 100644 index 00000000..5899464f --- /dev/null +++ b/agents/addon/extension/aliyun_text_embedding/requirements.txt @@ -0,0 +1 @@ +dashscope \ No newline at end of file diff --git a/agents/addon/extension/cosy_tts/cosy_tts_extension.py b/agents/addon/extension/cosy_tts/cosy_tts_extension.py index 368d10ea..cd57d406 100644 --- a/agents/addon/extension/cosy_tts/cosy_tts_extension.py +++ b/agents/addon/extension/cosy_tts/cosy_tts_extension.py @@ -15,7 +15,6 @@ Data, StatusCode, CmdResult, - MetadataInfo, ) from typing import List, Any import dashscope @@ -27,17 +26,21 @@ class CosyTTSCallback(ResultCallback): - _player = None - _stream = None - - def __init__(self, rte: RteEnv, sample_rate: int): + def __init__(self, rte: RteEnv, sample_rate: int, need_interrupt_callback): super().__init__() self.rte = rte self.sample_rate = sample_rate 
self.frame_size = int(self.sample_rate * 1 * 2 / 100) - self.canceled = False + self.ts = datetime.now() + self.need_interrupt_callback = need_interrupt_callback self.closed = False + def need_interrupt(self) -> bool: + return self.need_interrupt_callback(self.ts) + + def set_input_ts(self, ts: datetime): + self.ts = ts + def on_open(self): logger.info("websocket is open.") @@ -71,11 +74,8 @@ def get_frame(self, data: bytes) -> PcmFrame: f.unlock_buf(buff) return f - def cancel(self) -> None: - self.canceled = True - def on_data(self, data: bytes) -> None: - if self.canceled: + if self.need_interrupt(): return # logger.info("audio result length: %d, %d", len(data), self.frame_size) @@ -83,13 +83,13 @@ def on_data(self, data: bytes) -> None: chunk = int(len(data) / self.frame_size) offset = 0 for i in range(0, chunk): - if self.canceled: + if self.need_interrupt(): return f = self.get_frame(data[offset : offset + self.frame_size]) self.rte.send_pcm_frame(f) offset += self.frame_size - if self.canceled: + if self.need_interrupt(): return if offset < len(data): size = len(data) - offset @@ -109,15 +109,15 @@ def __init__(self, name: str): self.tts = None self.callback = None self.format = None - self.outdateTs = datetime.now() + + self.outdate_ts = datetime.now() self.stopped = False self.thread = None self.queue = queue.Queue() - self.mutex = threading.Lock() def on_start(self, rte: RteEnv) -> None: - logger.info("CosyTTSExtension on_start") + logger.info("on_start") self.api_key = rte.get_property_string("api_key") self.voice = rte.get_property_string("voice") self.model = rte.get_property_string("model") @@ -138,7 +138,7 @@ def on_start(self, rte: RteEnv) -> None: elif self.sample_rate == 48000: f = AudioFormat.PCM_48000HZ_MONO_16BIT else: - logger.info("unknown sample rate %d", self.sample_rate) + logger.error("unknown sample rate %d", self.sample_rate) exit() self.format = f @@ -148,16 +148,18 @@ def on_start(self, rte: RteEnv) -> None: rte.on_start_done() def on_stop(self, rte: RteEnv) -> None: - logger.info("CosyTTSExtension on_stop") + logger.info("on_stop") self.stopped = True - self.queue.put(None) self.flush() - self.thread.join() + self.queue.put(None) + if self.thread is not None: + self.thread.join() + self.thread = None rte.on_stop_done() def need_interrupt(self, ts: datetime.time) -> bool: - return self.outdateTs > ts and (self.outdateTs - ts).total_seconds() > 1 + return self.outdate_ts > ts def async_handle(self, rte: RteEnv): try: @@ -168,29 +170,33 @@ def async_handle(self, rte: RteEnv): value = self.queue.get() if value is None: break - inputText, ts = value - if len(inputText) == 0: - logger.warning("empty input for interrupt") - if tts is not None: - try: - tts.streaming_cancel() - except Exception as e: - logger.exception(e) - if callback is not None: - callback.cancel() + input_text, ts, end_of_segment = value + + # clear tts if old one is closed already + if callback is not None and callback.closed is True: + tts = None + callback = None + + # cancel last streaming call to avoid unprocessed audio coming back + if ( + callback is not None + and tts is not None + and callback.need_interrupt() + ): + tts.streaming_cancel() tts = None callback = None - continue if self.need_interrupt(ts): + logger.info("drop outdated input") continue - if callback is not None and callback.closed is True: - tts = None - - if tts is None: + # create new tts if needed + if tts is None or callback is None: logger.info("creating tts") - callback = CosyTTSCallback(rte, self.sample_rate) + 
callback = CosyTTSCallback( + rte, self.sample_rate, self.need_interrupt + ) tts = SpeechSynthesizer( model=self.model, voice=self.voice, @@ -198,49 +204,57 @@ def async_handle(self, rte: RteEnv): callback=callback, ) - logger.info("on message [%s]", inputText) - tts.streaming_call(inputText) - tts.streaming_complete() + logger.info( + "on message [{}] ts [{}] end_of_segment [{}]".format( + input_text, ts, end_of_segment + ) + ) + + # make sure new data won't be marked as outdated + callback.set_input_ts(ts) + + if len(input_text) > 0: + # last segment may have empty text but is_end is true + tts.streaming_call(input_text) + + # complete the streaming call to drain remained audio if end_of_segment is true + if end_of_segment: + try: + tts.streaming_complete() + except Exception as e: + logger.warning(e) + tts = None + callback = None except Exception as e: logger.exception(e) logger.exception(traceback.format_exc()) finally: if tts is not None: - tts.streaming_complete() + tts.streaming_cancel() + tts = None + callback = None def flush(self): - logger.info("CosyTTSExtension flush") while not self.queue.empty(): self.queue.get() - self.queue.put(("", datetime.now())) def on_data(self, rte: RteEnv, data: Data) -> None: - logger.info("CosyTTSExtension on_data") inputText = data.get_property_string("text") - if len(inputText) == 0: - logger.info("ignore empty text") - return - - is_end = data.get_property_bool("end_of_segment") + end_of_segment = data.get_property_bool("end_of_segment") - logger.info("on data %s %d", inputText, is_end) - self.queue.put((inputText, datetime.now())) + logger.info("on data {} {}".format(inputText, end_of_segment)) + self.queue.put((inputText, datetime.now(), end_of_segment)) def on_cmd(self, rte: RteEnv, cmd: Cmd) -> None: - logger.info("CosyTTSExtension on_cmd") - cmd_json = cmd.to_json() - logger.info("CosyTTSExtension on_cmd json: %s" + cmd_json) - - cmdName = cmd.get_name() - if cmdName == "flush": - self.outdateTs = datetime.now() + cmd_name = cmd.get_name() + logger.info("on_cmd {}".format(cmd_name)) + if cmd_name == "flush": + self.outdate_ts = datetime.now() self.flush() cmd_out = Cmd.create("flush") - rte.send_cmd( - cmd_out, lambda rte, result: print("DefaultExtension send_cmd done") - ) + rte.send_cmd(cmd_out, lambda rte, result: print("send_cmd flush done")) else: - logger.info("unknown cmd %s", cmdName) + logger.info("unknown cmd {}".format(cmd_name)) cmd_result = CmdResult.create(StatusCode.OK) cmd_result.set_property_string("detail", "success") diff --git a/agents/addon/extension/file_chunker/__init__.py b/agents/addon/extension/file_chunker/__init__.py new file mode 100644 index 00000000..06d98178 --- /dev/null +++ b/agents/addon/extension/file_chunker/__init__.py @@ -0,0 +1,4 @@ +from . 
import file_chunker_addon +from .log import logger + +logger.info("file_chunker extension loaded") diff --git a/agents/addon/extension/file_chunker/file_chunker_addon.py b/agents/addon/extension/file_chunker/file_chunker_addon.py new file mode 100644 index 00000000..b5daf556 --- /dev/null +++ b/agents/addon/extension/file_chunker/file_chunker_addon.py @@ -0,0 +1,14 @@ +from rte import ( + Addon, + register_addon_as_extension, + RteEnv, +) +from .log import logger +from .file_chunker_extension import FileChunkerExtension + + +@register_addon_as_extension("file_chunker") +class FileChunkerExtensionAddon(Addon): + def on_create_instance(self, rte: RteEnv, addon_name: str, context) -> None: + logger.info("on_create_instance") + rte.on_create_instance_done(FileChunkerExtension(addon_name), context) diff --git a/agents/addon/extension/file_chunker/file_chunker_extension.py b/agents/addon/extension/file_chunker/file_chunker_extension.py new file mode 100644 index 00000000..d1fb4e02 --- /dev/null +++ b/agents/addon/extension/file_chunker/file_chunker_extension.py @@ -0,0 +1,232 @@ +# +# +# Agora Real Time Engagement +# Created by Wei Hu in 2024-05. +# Copyright (c) 2024 Agora IO. All rights reserved. +# +# +from rte import ( + Extension, + RteEnv, + Cmd, + StatusCode, + CmdResult, +) +from typing import List, Any +from .log import logger +from llama_index.core import SimpleDirectoryReader +from llama_index.core.node_parser import SentenceSplitter +import json +from datetime import datetime +import uuid, math +import queue, threading + +CMD_FILE_DOWNLOADED = "file_downloaded" +UPSERT_VECTOR_CMD = "upsert_vector" +FILE_CHUNKED_CMD = "file_chunked" + +# TODO: configable +CHUNK_SIZE = 200 +CHUNK_OVERLAP = 20 +BATCH_SIZE = 5 + + +def batch(nodes, size): + batch_texts = [] + for n in nodes: + batch_texts.append(n.text) + if len(batch_texts) == size: + yield batch_texts[:] + batch_texts.clear() + if batch_texts: + yield batch_texts + + +class FileChunkerExtension(Extension): + def __init__(self, name: str): + super().__init__(name) + + self.counters = {} + self.expected = {} + self.new_collection_name = "" + self.file_chunked_event = threading.Event() + + self.thread = None + self.queue = queue.Queue() + self.stop = False + + def generate_collection_name(self) -> str: + """ + follow rules: ^[a-z]+[a-z0-9_]* + """ + + return "coll_" + uuid.uuid1().hex.lower() + + def split(self, path: str) -> List[Any]: + + # load pdf file by path + documents = SimpleDirectoryReader( + input_files=[path], filename_as_id=True + ).load_data() + + # split pdf file into chunks + splitter = SentenceSplitter( + chunk_size=CHUNK_SIZE, + chunk_overlap=CHUNK_OVERLAP, + ) + nodes = splitter.get_nodes_from_documents(documents) + logger.info( + "file {} pages count {}, chunking count {}".format( + path, len(documents), len(nodes) + ) + ) + return nodes + + def create_collection(self, rte: RteEnv, collection_name: str, wait: bool): + cmd_out = Cmd.create("create_collection") + cmd_out.set_property_string("collection_name", collection_name) + + wait_event = threading.Event() + rte.send_cmd( + cmd_out, + lambda rte, result: wait_event.set(), + ) + if wait: + wait_event.wait() + + def embedding(self, rte: RteEnv, path: str, texts: List[str]): + logger.info( + "generate embeddings for the file: {}, with batch size: {}".format( + path, len(texts) + ) + ) + + cmd_out = Cmd.create("embed_batch") + cmd_out.set_property_from_json("inputs", json.dumps(texts)) + rte.send_cmd( + cmd_out, + lambda rte, result: self.vector_store( + rte, path, 
texts, result + ), # TODO: deal with error + ) + + def vector_store(self, rte: RteEnv, path: str, texts: List[str], result: CmdResult): + logger.info("vector store start for one splitting of the file {}".format(path)) + file_name = path.split("/")[-1] + embed_output_json = result.get_property_string("embeddings") + embed_output = json.loads(embed_output_json) + cmd_out = Cmd.create(UPSERT_VECTOR_CMD) + cmd_out.set_property_string("collection_name", self.new_collection_name) + cmd_out.set_property_string("file_name", file_name) + embeddings = [record["embedding"] for record in embed_output] + content = [] + for text, embedding in zip(texts, embeddings): + content.append({"text": text, "embedding": embedding}) + cmd_out.set_property_string("content", json.dumps(content)) + # logger.info(json.dumps(content)) + rte.send_cmd(cmd_out, lambda rte, result: self.file_chunked(rte, path)) + + def file_chunked(self, rte: RteEnv, path: str): + if path in self.counters and path in self.expected: + self.counters[path] += 1 + logger.info( + "complete vector store for one splitting of the file: %s, current counter: %i, expected: %i", + path, + self.counters[path], + self.expected[path], + ) + if self.counters[path] == self.expected[path]: + chunks_count = self.counters[path] + del self.counters[path] + del self.expected[path] + logger.info( + "complete chunk for the file: {}, chunks_count {}".format( + path, + chunks_count, + ) + ) + cmd_out = Cmd.create(FILE_CHUNKED_CMD) + cmd_out.set_property_string("path", path) + cmd_out.set_property_string("collection_name", self.new_collection_name) + rte.send_cmd( + cmd_out, + lambda rte, result: logger.info("send_cmd done"), + ) + self.file_chunked_event.set() + else: + logger.error("missing counter for the file path: %s", path) + + def on_cmd(self, rte: RteEnv, cmd: Cmd) -> None: + cmd_name = cmd.get_name() + if cmd_name == CMD_FILE_DOWNLOADED: + path = cmd.get_property_string("path") + self.queue.put(path) # make sure files are processed in order + else: + logger.info("unknown cmd {}".format(cmd_name)) + + cmd_result = CmdResult.create(StatusCode.OK) + cmd_result.set_property_string("detail", "ok") + rte.return_result(cmd_result, cmd) + + def async_handler(self, rte: RteEnv) -> None: + while not self.stop: + path = self.queue.get() + if path is None: + break + + # start processing the file + start_time = datetime.now() + collection_name = self.generate_collection_name() + logger.info( + "start processing {}, collection_name {}".format(path, collection_name) + ) + + # create collection + self.create_collection(rte, collection_name, True) + logger.info("collection_name {} created".format(collection_name)) + + # split + nodes = self.split(path) + + # reset counters and events + self.new_collection_name = collection_name + self.expected[path] = math.ceil(len(nodes) / BATCH_SIZE) + self.counters[path] = 0 + self.file_chunked_event.clear() + + # trigger embedding and vector storing in parallel + for texts in list(batch(nodes, BATCH_SIZE)): + self.embedding(rte, path, texts) + + # wait for all chunks to be processed + self.file_chunked_event.wait() + + logger.info( + "finished processing {}, collection_name {}, cost {}ms".format( + path, + collection_name, + int((datetime.now() - start_time).total_seconds() * 1000), + ) + ) + + def on_start(self, rte: RteEnv) -> None: + logger.info("on_start") + + self.stop = False + self.thread = threading.Thread(target=self.async_handler, args=[rte]) + self.thread.start() + + rte.on_start_done() + + def on_stop(self, rte: RteEnv) -> 
None: + logger.info("on_stop") + + self.stop = True + if self.thread is not None: + while not self.queue.empty(): + self.queue.get() + self.queue.put(None) + self.thread.join() + self.thread = None + + rte.on_stop_done() diff --git a/agents/addon/extension/file_chunker/log.py b/agents/addon/extension/file_chunker/log.py new file mode 100644 index 00000000..d9cb27d3 --- /dev/null +++ b/agents/addon/extension/file_chunker/log.py @@ -0,0 +1,13 @@ +import logging + +logger = logging.getLogger("file_chunker") +logger.setLevel(logging.INFO) + +formatter = logging.Formatter( + "%(asctime)s - %(name)s - %(levelname)s - %(process)d - [%(filename)s:%(lineno)d] - %(message)s" +) + +console_handler = logging.StreamHandler() +console_handler.setFormatter(formatter) + +logger.addHandler(console_handler) diff --git a/agents/addon/extension/file_chunker/manifest.json b/agents/addon/extension/file_chunker/manifest.json new file mode 100644 index 00000000..248114db --- /dev/null +++ b/agents/addon/extension/file_chunker/manifest.json @@ -0,0 +1,100 @@ +{ + "type": "extension", + "name": "file_chunker", + "version": "0.4.0", + "language": "python", + "dependencies": [ + { + "type": "system", + "name": "rte_runtime_python", + "version": "0.4" + } + ], + "api": { + "property": {}, + "cmd_in": [ + { + "name": "file_downloaded", + "property": { + "path": { + "type": "string" + } + }, + "required": [ + "path" + ] + } + ], + "cmd_out": [ + { + "name": "embed_batch", + "property": { + "inputs": { + "type": "array", + "items": { + "type": "string" + } + } + }, + "required": [ + "inputs" + ], + "result": { + "property": { + "embeddings": { + "type": "string" + } + } + } + }, + { + "name": "upsert_vector", + "property": { + "collection_name": { + "type": "string" + }, + "file_name": { + "type": "string" + }, + "content": { + "type": "string" + } + }, + "required": [ + "collection_name", + "file_name", + "content" + ] + }, + { + "name": "create_collection", + "property": { + "collection_name": { + "type": "string" + }, + "dimension": { + "type": "int32" + } + }, + "required": [ + "collection_name" + ] + }, + { + "name": "file_chunked", + "property": { + "path": { + "type": "string" + }, + "collection_name": { + "type": "string" + } + }, + "required": [ + "path", + "collection_name" + ] + } + ] + } +} \ No newline at end of file diff --git a/agents/addon/extension/file_chunker/property.json b/agents/addon/extension/file_chunker/property.json new file mode 100644 index 00000000..9e26dfee --- /dev/null +++ b/agents/addon/extension/file_chunker/property.json @@ -0,0 +1 @@ +{} \ No newline at end of file diff --git a/agents/addon/extension/file_chunker/requirements.txt b/agents/addon/extension/file_chunker/requirements.txt new file mode 100644 index 00000000..b5a22c29 --- /dev/null +++ b/agents/addon/extension/file_chunker/requirements.txt @@ -0,0 +1,2 @@ +pypdf +llama-index \ No newline at end of file diff --git a/agents/addon/extension/http_server_python/__init__.py b/agents/addon/extension/http_server_python/__init__.py new file mode 100644 index 00000000..0bcc331c --- /dev/null +++ b/agents/addon/extension/http_server_python/__init__.py @@ -0,0 +1,4 @@ +from . 
import http_server_addon +from .log import logger + +logger.info("http_server_python extension loaded") diff --git a/agents/addon/extension/http_server_python/http_server_addon.py b/agents/addon/extension/http_server_python/http_server_addon.py new file mode 100644 index 00000000..b1884e81 --- /dev/null +++ b/agents/addon/extension/http_server_python/http_server_addon.py @@ -0,0 +1,14 @@ +from rte import ( + Addon, + register_addon_as_extension, + RteEnv, +) +from .log import logger +from .http_server_extension import HTTPServerExtension + + +@register_addon_as_extension("http_server_python") +class HTTPServerExtensionAddon(Addon): + def on_create_instance(self, rte: RteEnv, addon_name: str, context): + logger.info("on_create_instance") + rte.on_create_instance_done(HTTPServerExtension(addon_name), context) diff --git a/agents/addon/extension/http_server_python/http_server_extension.py b/agents/addon/extension/http_server_python/http_server_extension.py new file mode 100644 index 00000000..1092231e --- /dev/null +++ b/agents/addon/extension/http_server_python/http_server_extension.py @@ -0,0 +1,89 @@ +from rte import ( + Extension, + RteEnv, + Cmd, + StatusCode, + CmdResult, +) +from .log import logger +from http.server import HTTPServer, BaseHTTPRequestHandler +import threading +from functools import partial + + +class HTTPHandler(BaseHTTPRequestHandler): + def __init__(self, rte, *args, directory=None, **kwargs): + logger.info("new handler: %s %s %s", directory, args, kwargs) + self.rte = rte + super().__init__(*args, **kwargs) + + def do_POST(self): + logger.info("post request incoming %s", self.path) + if self.path == "/cmd": + try: + content_length = int(self.headers["Content-Length"]) + input = self.rfile.read(content_length).decode("utf-8") + logger.info("incoming request %s", input) + self.rte.send_cmd( + Cmd.create_from_json(input), + lambda rte, result: logger.info( + "finish send_cmd from http server %s %s", input, result + ), + ) + self.send_response_only(200) + self.end_headers() + except Exception as e: + logger.warning("failed to handle request, err {}".format(e)) + self.send_response_only(500) + self.end_headers() + else: + logger.warning("invalid path: %s", self.path) + self.send_response_only(404) + self.end_headers() + + +class HTTPServerExtension(Extension): + def __init__(self, name: str): + super().__init__(name) + self.listen_addr = "127.0.0.1" + self.listen_port = 8888 + self.cmd_white_list = None + self.server = None + self.thread = None + + def on_start(self, rte: RteEnv): + self.listen_addr = rte.get_property_string("listen_addr") + self.listen_port = rte.get_property_int("listen_port") + """ + white_list = rte.get_property_string("cmd_white_list") + if len(white_list) > 0: + self.cmd_white_list = white_list.split(",") + """ + + logger.info( + "HTTPServerExtension on_start %s:%d, %s", + self.listen_addr, + self.listen_port, + self.cmd_white_list, + ) + + self.server = HTTPServer( + (self.listen_addr, self.listen_port), partial(HTTPHandler, rte) + ) + self.thread = threading.Thread(target=self.server.serve_forever) + self.thread.start() + + rte.on_start_done() + + def on_stop(self, rte: RteEnv): + logger.info("on_stop") + self.server.shutdown() + self.thread.join() + rte.on_stop_done() + + def on_cmd(self, rte: RteEnv, cmd: Cmd): + cmd_json = cmd.to_json() + logger.info("on_cmd json: " + cmd_json) + cmd_result = CmdResult.create(StatusCode.OK) + cmd_result.set_property_string("detail", "ok") + rte.return_result(cmd_result, cmd) diff --git 
a/agents/addon/extension/http_server_python/log.py b/agents/addon/extension/http_server_python/log.py new file mode 100644 index 00000000..a7b47d79 --- /dev/null +++ b/agents/addon/extension/http_server_python/log.py @@ -0,0 +1,13 @@ +import logging + +logger = logging.getLogger("http_server_python") +logger.setLevel(logging.INFO) + +formatter = logging.Formatter( + "%(asctime)s - %(name)s - %(levelname)s - %(process)d - [%(filename)s:%(lineno)d] - %(message)s" +) + +console_handler = logging.StreamHandler() +console_handler.setFormatter(formatter) + +logger.addHandler(console_handler) diff --git a/agents/addon/extension/http_server_python/manifest.json b/agents/addon/extension/http_server_python/manifest.json new file mode 100644 index 00000000..bfc16037 --- /dev/null +++ b/agents/addon/extension/http_server_python/manifest.json @@ -0,0 +1,30 @@ +{ + "type": "extension", + "name": "http_server_python", + "version": "0.4.0", + "language": "python", + "dependencies": [ + { + "type": "system", + "name": "rte_runtime_python", + "version": "0.4" + } + ], + "publish": { + "include": [ + "manifest.json", + "property.json", + "**.py" + ] + }, + "api": { + "property": { + "listen_addr": { + "type": "string" + }, + "listen_port": { + "type": "int32" + } + } + } +} \ No newline at end of file diff --git a/agents/addon/extension/http_server_python/property.json b/agents/addon/extension/http_server_python/property.json new file mode 100644 index 00000000..27ae7c57 --- /dev/null +++ b/agents/addon/extension/http_server_python/property.json @@ -0,0 +1,4 @@ +{ + "listen_addr": "0.0.0.0", + "listen_port": 8888 +} \ No newline at end of file diff --git a/agents/addon/extension/llama_index_chat_engine/__init__.py b/agents/addon/extension/llama_index_chat_engine/__init__.py new file mode 100644 index 00000000..55408286 --- /dev/null +++ b/agents/addon/extension/llama_index_chat_engine/__init__.py @@ -0,0 +1,4 @@ +from . 
import addon +from .log import logger + +logger.info("llama_index_chat_engine extension loaded") diff --git a/agents/addon/extension/llama_index_chat_engine/addon.py b/agents/addon/extension/llama_index_chat_engine/addon.py new file mode 100644 index 00000000..03586528 --- /dev/null +++ b/agents/addon/extension/llama_index_chat_engine/addon.py @@ -0,0 +1,10 @@ +from rte import Addon, register_addon_as_extension, RteEnv +from .extension import LlamaIndexExtension +from .log import logger + + +@register_addon_as_extension("llama_index_chat_engine") +class LlamaIndexExtensionAddon(Addon): + def on_create_instance(self, rte: RteEnv, addon_name: str, context) -> None: + logger.info("on_create_instance") + rte.on_create_instance_done(LlamaIndexExtension(addon_name), context) diff --git a/agents/addon/extension/llama_index_chat_engine/astra_embedding.py b/agents/addon/extension/llama_index_chat_engine/astra_embedding.py new file mode 100644 index 00000000..fb96da77 --- /dev/null +++ b/agents/addon/extension/llama_index_chat_engine/astra_embedding.py @@ -0,0 +1,65 @@ +from typing import Any, List +import threading +from llama_index.core.embeddings import BaseEmbedding +from .log import logger +import json +from rte import ( + Cmd, + CmdResult, +) + +EMBED_CMD = "embed" + + +def embed_from_resp(cmd_result: CmdResult) -> List[float]: + embedding_output_json = cmd_result.get_property_to_json("embedding") + return json.loads(embedding_output_json) + + +class ASTRAEmbedding(BaseEmbedding): + rte: Any + + def __init__(self, rte): + """Creates a new ASTRA embedding interface.""" + super().__init__() + self.rte = rte + + @classmethod + def class_name(cls) -> str: + return "astra_embedding" + + async def _aget_query_embedding(self, query: str) -> List[float]: + return self._get_query_embedding(query) + + async def _aget_text_embedding(self, text: str) -> List[float]: + return self._get_text_embedding(text) + + def _get_query_embedding(self, query: str) -> List[float]: + logger.info( + "ASTRAEmbedding generate embeddings for the query: {}".format(query) + ) + wait_event = threading.Event() + resp: List[float] + + def callback(_, result): + nonlocal resp + nonlocal wait_event + + logger.debug("ASTRAEmbedding embedding received") + resp = embed_from_resp(result) + wait_event.set() + + cmd_out = Cmd.create(EMBED_CMD) + cmd_out.set_property_string("input", query) + + self.rte.send_cmd(cmd_out, callback) + wait_event.wait() + return resp + + def _get_text_embedding(self, text: str) -> List[float]: + return self._get_query_embedding(text) + + # for texts embedding, will not be called in this module + def _get_text_embeddings(self, texts: List[str]) -> List[List[float]]: + logger.warning("not implemented") + return [] diff --git a/agents/addon/extension/llama_index_chat_engine/astra_llm.py b/agents/addon/extension/llama_index_chat_engine/astra_llm.py new file mode 100644 index 00000000..a642603c --- /dev/null +++ b/agents/addon/extension/llama_index_chat_engine/astra_llm.py @@ -0,0 +1,148 @@ +from typing import Any, Sequence +import json, queue +import threading + +from llama_index.core.base.llms.types import ( + LLMMetadata, + MessageRole, + ChatMessage, + ChatResponse, + CompletionResponse, + ChatResponseGen, + CompletionResponseGen, +) + +from llama_index.core.llms.callbacks import llm_chat_callback, llm_completion_callback + +from llama_index.core.llms.custom import CustomLLM +from .log import logger +from rte import Cmd, StatusCode, CmdResult, RteEnv + + +def chat_from_astra_response(cmd_result: CmdResult) -> 
ChatResponse: + status = cmd_result.get_status_code() + if status != StatusCode.OK: + return None + text_data = cmd_result.get_property_string("text") + return ChatResponse(message=ChatMessage(content=text_data)) + + +def _messages_str_from_chat_messages(messages: Sequence[ChatMessage]) -> str: + messages_list = [] + for message in messages: + messages_list.append( + {"role": message.role, "content": "{}".format(message.content)} + ) + return json.dumps(messages_list, ensure_ascii=False) + + +class ASTRALLM(CustomLLM): + rte: Any + + def __init__(self, rte): + """Creates a new ASTRA model interface.""" + super().__init__() + self.rte = rte + + @property + def metadata(self) -> LLMMetadata: + return LLMMetadata( + # TODO: fix metadata + context_window=1024, + num_output=512, + model_name="astra_llm", + is_chat_model=True, + ) + + @llm_chat_callback() + def chat(self, messages: Sequence[ChatMessage], **kwargs: Any) -> ChatResponse: + logger.debug("ASTRALLM chat start") + + resp: ChatResponse + wait_event = threading.Event() + + def callback(_, result): + logger.debug("ASTRALLM chat callback done") + nonlocal resp + nonlocal wait_event + resp = chat_from_astra_response(result) + wait_event.set() + + messages_str = _messages_str_from_chat_messages(messages) + + cmd = Cmd.create("call_chat") + cmd.set_property_string("messages", messages_str) + cmd.set_property_bool("stream", False) + logger.info( + "ASTRALLM chat send_cmd {}, messages {}".format( + cmd.get_name(), messages_str + ) + ) + + self.rte.send_cmd(cmd, callback) + wait_event.wait() + return resp + + @llm_completion_callback() + def complete( + self, prompt: str, formatted: bool = False, **kwargs: Any + ) -> CompletionResponse: + logger.warning("ASTRALLM complete hasn't been implemented yet") + + @llm_chat_callback() + def stream_chat( + self, messages: Sequence[ChatMessage], **kwargs: Any + ) -> ChatResponseGen: + logger.debug("ASTRALLM stream_chat start") + + cur_tokens = "" + resp_queue = queue.Queue() + + def gen() -> ChatResponseGen: + while True: + delta_text = resp_queue.get() + if delta_text is None: + break + + yield ChatResponse( + message=ChatMessage(content=delta_text, role=MessageRole.ASSISTANT), + delta=delta_text, + ) + + def callback(_, result): + nonlocal cur_tokens + nonlocal resp_queue + + status = result.get_status_code() + if status != StatusCode.OK: + logger.warn("ASTRALLM stream_chat callback status {}".format(status)) + resp_queue.put(None) + return + + cur_tokens = result.get_property_string("text") + logger.debug("ASTRALLM stream_chat callback text [{}]".format(cur_tokens)) + resp_queue.put(cur_tokens) + if result.get_is_final(): + resp_queue.put(None) + + messages_str = _messages_str_from_chat_messages(messages) + + cmd = Cmd.create("call_chat") + cmd.set_property_string("messages", messages_str) + cmd.set_property_bool("stream", True) + logger.info( + "ASTRALLM stream_chat send_cmd {}, messages {}".format( + cmd.get_name(), messages_str + ) + ) + self.rte.send_cmd(cmd, callback) + return gen() + + def stream_complete( + self, prompt: str, formatted: bool = False, **kwargs: Any + ) -> CompletionResponseGen: + logger.warning("ASTRALLM stream_complete hasn't been implemented yet") + + @classmethod + def class_name(cls) -> str: + return "astra_llm" diff --git a/agents/addon/extension/llama_index_chat_engine/astra_retriever.py b/agents/addon/extension/llama_index_chat_engine/astra_retriever.py new file mode 100644 index 00000000..c74dfb99 --- /dev/null +++ 
b/agents/addon/extension/llama_index_chat_engine/astra_retriever.py @@ -0,0 +1,88 @@ +import time, json, threading +from typing import Any, List +from llama_index.core.schema import QueryBundle, TextNode +from llama_index.core.schema import NodeWithScore +from llama_index.core.retrievers import BaseRetriever + +from .log import logger +from .astra_embedding import ASTRAEmbedding +from rte import ( + RteEnv, + Cmd, + StatusCode, + CmdResult, +) + + +def format_node_result(cmd_result: CmdResult) -> List[NodeWithScore]: + logger.info("ASTRARetriever retrieve response {}".format(cmd_result.to_json())) + status = cmd_result.get_status_code() + try: + contents_json = cmd_result.get_property_to_json("response") + except Exception as e: + logger.warning(f"Failed to get response from cmd_result: {e}") + return [ + NodeWithScore( + node=TextNode(), + score=0.0, + ) + ] + contents = json.loads(contents_json) + if status != StatusCode.OK or len(contents) == 0: + return [ + NodeWithScore( + node=TextNode(), + score=0.0, + ) + ] + + nodes = [] + for result in contents: + text_node = TextNode( + text=result["content"], + ) + nodes.append(NodeWithScore(node=text_node, score=result["score"])) + return nodes + + +class ASTRARetriever(BaseRetriever): + rte: Any + embed_model: ASTRAEmbedding + + def __init__(self, rte: RteEnv, coll: str): + super().__init__() + try: + self.rte = rte + self.embed_model = ASTRAEmbedding(rte=rte) + self.collection_name = coll + except Exception as e: + logger.error(f"Failed to initialize ASTRARetriever: {e}") + + def _retrieve(self, query_bundle: QueryBundle) -> List[NodeWithScore]: + logger.info("ASTRARetriever retrieve: {}".format(query_bundle.to_json)) + + wait_event = threading.Event() + resp: List[NodeWithScore] = [] + + def cmd_callback(_, result): + nonlocal resp + nonlocal wait_event + resp = format_node_result(result) + wait_event.set() + logger.debug("ASTRARetriever callback done") + + embedding = self.embed_model.get_query_embedding(query=query_bundle.query_str) + + query_cmd = Cmd.create("query_vector") + query_cmd.set_property_string("collection_name", self.collection_name) + query_cmd.set_property_int("top_k", 3) # TODO: configable + query_cmd.set_property_from_json("embedding", json.dumps(embedding)) + logger.info( + "ASTRARetriever send_cmd, collection_name: {}, embedding len: {}".format( + self.collection_name, len(embedding) + ) + ) + self.rte.send_cmd(query_cmd, cmd_callback) + + wait_event.wait() + return resp diff --git a/agents/addon/extension/llama_index_chat_engine/extension.py b/agents/addon/extension/llama_index_chat_engine/extension.py new file mode 100644 index 00000000..2c189dc0 --- /dev/null +++ b/agents/addon/extension/llama_index_chat_engine/extension.py @@ -0,0 +1,203 @@ +# +# +# Agora Real Time Engagement +# Created by Wei Hu in 2024-05. +# Copyright (c) 2024 Agora IO. All rights reserved. 
+# +# +from rte import ( + Extension, + RteEnv, + Cmd, + Data, + StatusCode, + CmdResult, +) +from .log import logger +from .astra_llm import ASTRALLM +from .astra_retriever import ASTRARetriever +import queue, threading +from datetime import datetime +from llama_index.core.chat_engine import SimpleChatEngine, ContextChatEngine +from llama_index.core.storage.chat_store import SimpleChatStore +from llama_index.core.memory import ChatMemoryBuffer + +PROPERTY_CHAT_MEMORY_TOKEN_LIMIT = "chat_memory_token_limit" +PROPERTY_GREETING = "greeting" + + +class LlamaIndexExtension(Extension): + def __init__(self, name: str): + super().__init__(name) + self.queue = queue.Queue() + self.thread = None + self.stop = False + + self.collection_name = "" + self.outdate_ts = datetime.now() + self.chat_memory_token_limit = 3000 + self.chat_memory = None + + def _send_text_data(self, rte: RteEnv, text: str, end_of_segment: bool): + try: + output_data = Data.create("text_data") + output_data.set_property_string("text", text) + output_data.set_property_bool("end_of_segment", end_of_segment) + rte.send_data(output_data) + logger.info("text [{}] end_of_segment {} sent".format(text, end_of_segment)) + except Exception as err: + logger.info( + "text [{}] end_of_segment {} send failed, err {}".format( + text, end_of_segment, err + ) + ) + + def on_start(self, rte: RteEnv) -> None: + logger.info("on_start") + + greeting = None + try: + greeting = rte.get_property_string(PROPERTY_GREETING) + except Exception as err: + logger.warning(f"get {PROPERTY_GREETING} property failed, err: {err}") + + try: + self.chat_memory_token_limit = rte.get_property_int( + PROPERTY_CHAT_MEMORY_TOKEN_LIMIT + ) + except Exception as err: + logger.warning( + f"get {PROPERTY_CHAT_MEMORY_TOKEN_LIMIT} property failed, err: {err}" + ) + + self.thread = threading.Thread(target=self.async_handle, args=[rte]) + self.thread.start() + + # enable chat memory + self.chat_memory = ChatMemoryBuffer.from_defaults( + token_limit=self.chat_memory_token_limit, + chat_store=SimpleChatStore(), + ) + + # Send greeting if available + if greeting is not None: + self._send_text_data(rte, greeting, True) + + rte.on_start_done() + + def on_stop(self, rte: RteEnv) -> None: + logger.info("on_stop") + + self.stop = True + self.flush() + self.queue.put(None) + if self.thread is not None: + self.thread.join() + self.thread = None + self.chat_memory = None + + rte.on_stop_done() + + def on_cmd(self, rte: RteEnv, cmd: Cmd) -> None: + + cmd_name = cmd.get_name() + logger.info("on_cmd {}".format(cmd_name)) + if cmd_name == "file_chunked": + coll = cmd.get_property_string("collection_name") + self.collection_name = coll + + file_chunked_text = "Your document has been processed. You can now ask questions about it. " + self._send_text_data(rte, file_chunked_text, True) + elif cmd_name == "file_downloaded": + file_downloaded_text = "Your document has been received. Please wait a moment while we process it for you. 
" + self._send_text_data(rte, file_downloaded_text, True) + elif cmd_name == "flush": + self.flush() + rte.send_cmd(Cmd.create("flush"), None) + + cmd_result = CmdResult.create(StatusCode.OK) + cmd_result.set_property_string("detail", "ok") + rte.return_result(cmd_result, cmd) + + def on_data(self, rte: RteEnv, data: Data) -> None: + is_final = data.get_property_bool("is_final") + if not is_final: + logger.info("on_data ignore non final") + return + + inputText = data.get_property_string("text") + if len(inputText) == 0: + logger.info("on_data ignore empty text") + return + + ts = datetime.now() + + logger.info("on_data text [%s], ts [%s]", inputText, ts) + self.queue.put((inputText, ts)) + + def async_handle(self, rte: RteEnv): + logger.info("async_handle started") + while not self.stop: + try: + value = self.queue.get() + if value is None: + break + input_text, ts = value + if ts < self.outdate_ts: + logger.info( + "text [%s] ts [%s] dropped due to outdated", input_text, ts + ) + continue + logger.info("process input text [%s] ts [%s]", input_text, ts) + + # prepare chat engine + chat_engine = None + if len(self.collection_name) > 0: + chat_engine = ContextChatEngine.from_defaults( + llm=ASTRALLM(rte=rte), + retriever=ASTRARetriever(rte=rte, coll=self.collection_name), + memory=self.chat_memory, + system_prompt=( + "You are an expert Q&A system that is trusted around the world.\n" + "Always answer the query using the provided context information, " + "and not prior knowledge.\n" + "Some rules to follow:\n" + "1. Never directly reference the given context in your answer.\n" + "2. Avoid statements like 'Based on the context, ...' or " + "'The context information ...' or anything along " + "those lines." + ), + ) + else: + chat_engine = SimpleChatEngine.from_defaults( + llm=ASTRALLM(rte=rte), + system_prompt="You are an expert Q&A system that is trusted around the world.\n", + memory=self.chat_memory, + ) + + resp = chat_engine.stream_chat(input_text) + for cur_token in resp.response_gen: + if self.stop: + break + if ts < self.outdate_ts: + logger.info( + "stream_chat coming responses dropped due to outdated for input text [%s] ts [%s] ", + input_text, + ts, + ) + break + text = str(cur_token) + + # send out + self._send_text_data(rte, text, False) + + # send out end_of_segment + self._send_text_data(rte, "", True) + except Exception as e: + logger.exception(e) + logger.info("async_handle stoped") + + def flush(self): + self.outdate_ts = datetime.now() + while not self.queue.empty(): + self.queue.get() diff --git a/agents/addon/extension/llama_index_chat_engine/log.py b/agents/addon/extension/llama_index_chat_engine/log.py new file mode 100644 index 00000000..0804a279 --- /dev/null +++ b/agents/addon/extension/llama_index_chat_engine/log.py @@ -0,0 +1,13 @@ +import logging + +logger = logging.getLogger("llama_index_chat_engine") +logger.setLevel(logging.INFO) + +formatter = logging.Formatter( + "%(asctime)s - %(name)s - %(levelname)s - %(process)d - [%(filename)s:%(lineno)d] - %(message)s" +) + +console_handler = logging.StreamHandler() +console_handler.setFormatter(formatter) + +logger.addHandler(console_handler) diff --git a/agents/addon/extension/llama_index_chat_engine/manifest.json b/agents/addon/extension/llama_index_chat_engine/manifest.json new file mode 100644 index 00000000..11ca8973 --- /dev/null +++ b/agents/addon/extension/llama_index_chat_engine/manifest.json @@ -0,0 +1,158 @@ +{ + "type": "extension", + "name": "llama_index_chat_engine", + "version": "0.4.0", + "language": 
"python", + "dependencies": [ + { + "type": "system", + "name": "rte_runtime_python", + "version": "0.4" + } + ], + "api": { + "property": { + "chat_memory_token_limit": { + "type": "int32" + }, + "greeting": { + "type": "string" + } + }, + "data_in": [ + { + "name": "text_data", + "property": { + "text": { + "type": "string" + }, + "is_final": { + "type": "bool" + } + } + } + ], + "data_out": [ + { + "name": "text_data", + "property": { + "text": { + "type": "string" + }, + "end_of_segment": { + "type": "bool" + } + } + } + ], + "cmd_in": [ + { + "name": "flush" + }, + { + "name": "file_downloaded" + }, + { + "name": "file_chunked", + "property": { + "collection_name": { + "type": "string" + } + }, + "required": [ + "collection_name" + ] + } + ], + "cmd_out": [ + { + "name": "flush" + }, + { + "name": "call_chat", + "property": { + "messages": { + "type": "string" + }, + "stream": { + "type": "bool" + } + }, + "required": [ + "messages" + ], + "result": { + "property": { + "text": { + "type": "string" + } + }, + "required": [ + "text" + ] + } + }, + { + "name": "embed", + "property": { + "input": { + "type": "string" + } + }, + "required": [ + "input" + ], + "result": { + "property": { + "embedding": { + "type": "array", + "items": { + "type": "float64" + } + } + } + } + }, + { + "name": "query_vector", + "property": { + "collection_name": { + "type": "string" + }, + "top_k": { + "type": "int64" + }, + "embedding": { + "type": "array", + "items": { + "type": "float64" + } + } + }, + "required": [ + "collection_name", + "top_k", + "embedding" + ], + "result": { + "property": { + "response": { + "type": "array", + "items": { + "type": "object", + "properties": { + "content": { + "type": "string" + }, + "score": { + "type": "float64" + } + } + } + } + } + } + } + ] + } +} \ No newline at end of file diff --git a/agents/addon/extension/llama_index_chat_engine/property.json b/agents/addon/extension/llama_index_chat_engine/property.json new file mode 100644 index 00000000..9e26dfee --- /dev/null +++ b/agents/addon/extension/llama_index_chat_engine/property.json @@ -0,0 +1 @@ +{} \ No newline at end of file diff --git a/agents/addon/extension/llama_index_chat_engine/requirements.txt b/agents/addon/extension/llama_index_chat_engine/requirements.txt new file mode 100644 index 00000000..f1987506 --- /dev/null +++ b/agents/addon/extension/llama_index_chat_engine/requirements.txt @@ -0,0 +1 @@ +llama_index diff --git a/agents/addon/extension/qwen_llm_python/__init__.py b/agents/addon/extension/qwen_llm_python/__init__.py index b5b5c7c2..3d3b6a9c 100644 --- a/agents/addon/extension/qwen_llm_python/__init__.py +++ b/agents/addon/extension/qwen_llm_python/__init__.py @@ -1,3 +1,4 @@ from . 
import qwen_llm_addon +from .log import logger -print("qwen_llm_python extension loaded") +logger.info("qwen_llm_python extension loaded") diff --git a/agents/addon/extension/qwen_llm_python/manifest.json b/agents/addon/extension/qwen_llm_python/manifest.json index 097d87b4..153369c1 100644 --- a/agents/addon/extension/qwen_llm_python/manifest.json +++ b/agents/addon/extension/qwen_llm_python/manifest.json @@ -1,13 +1,13 @@ { "type": "extension", "name": "qwen_llm_python", - "version": "0.1.0", + "version": "0.4.0", "language": "python", "dependencies": [ { "type": "system", "name": "rte_runtime_python", - "version": "0.4.0" + "version": "0.4" } ], "api": { @@ -60,6 +60,30 @@ "cmd_in": [ { "name": "flush" + }, + { + "name": "call_chat", + "property": { + "messages": { + "type": "string" + }, + "stream": { + "type": "bool" + } + }, + "required": [ + "messages" + ], + "result": { + "property": { + "text": { + "type": "string" + } + }, + "required": [ + "text" + ] + } } ], "cmd_out": [ diff --git a/agents/addon/extension/qwen_llm_python/qwen_llm_extension.py b/agents/addon/extension/qwen_llm_python/qwen_llm_extension.py index bcb7612b..2094d4e8 100644 --- a/agents/addon/extension/qwen_llm_python/qwen_llm_extension.py +++ b/agents/addon/extension/qwen_llm_python/qwen_llm_extension.py @@ -12,11 +12,11 @@ Data, StatusCode, CmdResult, - MetadataInfo, ) from typing import List, Any import dashscope import queue +import json from datetime import datetime import threading from http import HTTPStatus @@ -47,8 +47,7 @@ def __init__(self, name: str): self.max_history = 10 self.stopped = False self.thread = None - self.outdateTs = datetime.now() - self.ongoing = "" + self.outdate_ts = datetime.now() self.queue = queue.Queue() self.mutex = threading.Lock() @@ -75,38 +74,63 @@ def get_messages(self) -> List[Any]: return messages def need_interrupt(self, ts: datetime.time) -> bool: - return self.outdateTs > ts and (self.outdateTs - ts).total_seconds() > 1 + return self.outdate_ts > ts and (self.outdate_ts - ts).total_seconds() > 1 - def call(self, messages: List[Any]): - logger.info("before call %s", messages) - response = dashscope.Generation.call( - "qwen-max", - messages=messages, - result_format="message", # set the result to be "message" format. - stream=False, # set streaming output - incremental_output=False, # get streaming output incrementally - ) - if response.status_code == HTTPStatus.OK: - self.on_msg( - response.output.choices[0]["message"]["role"], - response.output.choices[0]["message"]["content"], - ) - logger.info( - "on response %s", response.output.choices[0]["message"]["content"] - ) + def complete_with_history(self, rte: RteEnv, ts: datetime.time, input_text: str): + """ + Complete input_text querying with built-in chat history. + """ + + def callback(text: str, end_of_segment: bool): + d = Data.create("text_data") + d.set_property_string("text", text) + d.set_property_bool("end_of_segment", end_of_segment) + rte.send_data(d) + + messages = self.get_messages() + messages.append({"role": "user", "content": input_text}) + total = self.stream_chat(ts, messages, callback) + self.on_msg("user", input_text) + if len(total) > 0: + self.on_msg("assistant", total) + + def call_chat(self, rte: RteEnv, ts: datetime.time, cmd: Cmd): + """ + Respond to call_chat cmd and return results in streaming. + The incoming 'messages' will contains all the system prompt, chat history and question. 
+ """ + + def callback(text: str, end_of_segment: bool): + cmd_result = CmdResult.create(StatusCode.OK) + cmd_result.set_property_string("text", text) + if end_of_segment: + cmd_result.set_is_final(True) # end of streaming return + else: + cmd_result.set_is_final(False) # keep streaming return + logger.info("call_chat cmd return_result {}".format(cmd_result.to_json())) + rte.return_result(cmd_result, cmd) + + messages_str = cmd.get_property_string("messages") + messages = json.loads(messages_str) + # messages = [{"role": "user", "content": messages_str}] + stream = False + try: + stream = cmd.get_property_bool("stream") + except Exception as e: + logger.warning("stream property not found, default to False") + + if stream: + self.stream_chat(ts, messages, callback) else: - logger.info("Failed to get response %s", response) + total = self.stream_chat(ts, messages, None) + callback(total, True) # callback once until full answer returned + + def stream_chat(self, ts: datetime.time, messages: List[Any], callback): + logger.info("before stream_chat call {} {}".format(messages, ts)) - def call_with_stream( - self, rte: RteEnv, ts: datetime.time, inputText: str, messages: List[Any] - ): if self.need_interrupt(ts): - logger.warning("out of date, %s, %s", self.outdateTs, ts) + logger.warning("out of date, %s, %s", self.outdate_ts, ts) return - if len(self.ongoing) > 0: - messages.append({"role": "assistant", "content": self.ongoing}) - messages.append({"role": "user", "content": inputText}) - logger.info("before call %s %s", messages, ts) responses = dashscope.Generation.call( self.model, @@ -115,54 +139,47 @@ def call_with_stream( stream=True, # set streaming output incremental_output=True, # get streaming output incrementally ) + total = "" partial = "" for response in responses: if self.need_interrupt(ts): - if len(self.ongoing) > 0: - self.on_msg("user", inputText) - self.on_msg("assistant", self.ongoing) - self.ongoing = "" - logger.warning("out of date, %s, %s", self.outdateTs, ts) - return + logger.warning("out of date, %s, %s", self.outdate_ts, ts) + total += partial + partial = "" # discard not sent + break if response.status_code == HTTPStatus.OK: temp = response.output.choices[0]["message"]["content"] if len(temp) == 0: continue partial += temp - self.ongoing += temp if (isEnd(temp) and len(partial) > 10) or len(partial) > 50: - d = Data.create("text_data") - d.set_property_bool("end_of_segment", isEnd(partial)) - d.set_property_string("text", partial) - rte.send_data(d) + if callback is not None: + callback(partial, False) total += partial partial = "" else: - logger.info( - "Request id: %s, Status code: %s, error code: %s, error message: %s" - % ( + logger.warning( + "request_id: {}, status_code: {}, error code: {}, error message: {}".format( response.request_id, response.status_code, response.code, response.message, ) ) - return + break + + if len(total) > 0 or len(partial) > 0: # make sure no empty answer + if callback is not None: + callback(partial, True) if len(partial) > 0: - d = Data.create("text_data") - d.set_property_bool("end_of_segment", True) - d.set_property_string("text", partial) - rte.send_data(d) total += partial partial = "" - self.ongoing = "" - self.on_msg("user", inputText) - self.on_msg("assistant", total) - logger.info("on response %s", total) + logger.info("stream_chat full_answer {}".format(total)) + return total def on_start(self, rte: RteEnv) -> None: - logger.info("QWenLLMExtension on_start") + logger.info("on_start") self.api_key = 
rte.get_property_string("api_key") self.model = rte.get_property_string("model") self.prompt = rte.get_property_string("prompt") @@ -174,34 +191,35 @@ def on_start(self, rte: RteEnv) -> None: rte.on_start_done() def on_stop(self, rte: RteEnv) -> None: - logger.info("QWenLLMExtension on_stop") + logger.info("on_stop") self.stopped = True - self.queue.put(None) self.flush() - self.thread.join() + self.queue.put(None) + if self.thread is not None: + self.thread.join() + self.thread = None rte.on_stop_done() - + def flush(self): - logger.info("QWenLLMExtension flush") + self.outdate_ts = datetime.now() while not self.queue.empty(): self.queue.get() def on_data(self, rte: RteEnv, data: Data) -> None: - logger.info("QWenLLMExtension on_data") + logger.info("on_data") is_final = data.get_property_bool("is_final") if not is_final: logger.info("ignore non final") return - inputText = data.get_property_string("text") - if len(inputText) == 0: + input_text = data.get_property_string("text") + if len(input_text) == 0: logger.info("ignore empty text") return ts = datetime.now() - - logger.info("on data %s, %s", inputText, ts) - self.queue.put((inputText, ts)) + logger.info("on data %s, %s", input_text, ts) + self.queue.put((input_text, ts)) def async_handle(self, rte: RteEnv): while not self.stopped: @@ -209,31 +227,36 @@ def async_handle(self, rte: RteEnv): value = self.queue.get() if value is None: break - inputText, ts = value + input, ts = value if self.need_interrupt(ts): continue - logger.info("fetch from queue %s", inputText) - history = self.get_messages() - self.call_with_stream(rte, ts, inputText, history) + + if isinstance(input, str): + logger.info("fetched from queue {}".format(input)) + self.complete_with_history(rte, ts, input) + else: + logger.info("fetched from queue {}".format(input.get_name())) + self.call_chat(rte, ts, input) except Exception as e: logger.exception(e) def on_cmd(self, rte: RteEnv, cmd: Cmd) -> None: - logger.info("QWenLLMExtension on_cmd") - cmd_json = cmd.to_json() - logger.info("QWenLLMExtension on_cmd json: %s", cmd_json) - - cmdName = cmd.get_name() - if cmdName == "flush": - self.outdateTs = datetime.now() - # self.flush() + ts = datetime.now() + cmd_name = cmd.get_name() + logger.info("on_cmd {}, {}".format(cmd_name, ts)) + + if cmd_name == "flush": + self.flush() cmd_out = Cmd.create("flush") rte.send_cmd( cmd_out, - lambda rte, result: print("QWenLLMExtensionAddon send_cmd done"), + lambda rte, result: logger.info("send_cmd flush done"), ) + elif cmd_name == "call_chat": + self.queue.put((cmd, ts)) + return # cmd_result will be returned once it's processed else: - logger.info("unknown cmd %s", cmdName) + logger.info("unknown cmd {}".format(cmd_name)) cmd_result = CmdResult.create(StatusCode.OK) rte.return_result(cmd_result, cmd) diff --git a/agents/property.json.example b/agents/property.json.example index c84ad926..c63abb40 100644 --- a/agents/property.json.example +++ b/agents/property.json.example @@ -1387,6 +1387,354 @@ ] } ] + }, + { + "name": "va.qwen.rag", + "auto_start": true, + "nodes": [ + { + "type": "extension", + "extension_group": "rtc", + "addon": "agora_rtc", + "name": "agora_rtc", + "property": { + "app_id": "", + "token": "", + "channel": "astra_agents_test", + "stream_id": 1234, + "remote_stream_id": 123, + "subscribe_audio": true, + "publish_audio": true, + "publish_data": true, + "enable_agora_asr": true, + "agora_asr_vendor_name": "microsoft", + "agora_asr_language": "en-US", + "agora_asr_vendor_key": "", + "agora_asr_vendor_region": 
"", + "agora_asr_session_control_file_path": "session_control.conf" + } + }, + { + "type": "extension", + "extension_group": "llm", + "addon": "qwen_llm_python", + "name": "qwen_llm", + "property": { + "api_key": "", + "model": "qwen-max", + "max_tokens": 512, + "prompt": "", + "max_memory_length": 10 + } + }, + { + "type": "extension", + "extension_group": "tts", + "addon": "cosy_tts", + "name": "cosy_tts", + "property": { + "api_key": "", + "model": "cosyvoice-v1", + "voice": "longxiaochun", + "sample_rate": 16000 + } + }, + { + "type": "extension", + "extension_group": "chat_transcriber", + "addon": "chat_transcriber_python", + "name": "chat_transcriber" + }, + { + "type": "extension", + "extension_group": "interrupt_detector", + "addon": "interrupt_detector_python", + "name": "interrupt_detector" + }, + { + "type": "extension", + "extension_group": "http_server", + "addon": "http_server_python", + "name": "http_server", + "property": { + "listen_addr": "127.0.0.1", + "listen_port": 8080 + } + }, + { + "type": "extension", + "extension_group": "embedding", + "addon": "aliyun_text_embedding", + "name": "embedding", + "property": { + "api_key": "", + "model": "text-embedding-v3" + } + }, + { + "type": "extension", + "extension_group": "vector_storage", + "addon": "aliyun_analyticdb_vector_storage", + "name": "vector_storage", + "property": { + "alibaba_cloud_access_key_id": "", + "alibaba_cloud_access_key_secret": "", + "adbpg_instance_id": "", + "adbpg_instance_region": "cn-shanghai", + "adbpg_account": "", + "adbpg_account_password": "", + "adbpg_namespace": "", + "adbpg_namespace_password": "" + } + }, + { + "type": "extension", + "extension_group": "file_chunker", + "addon": "file_chunker", + "name": "file_chunker", + "property": {} + }, + { + "type": "extension", + "extension_group": "llama_index", + "addon": "llama_index_chat_engine", + "name": "llama_index", + "property": { + "greeting": "ASTRA agent connected. 
How can i help you today?", + "chat_memory_token_limit": 3000 + } + } + ], + "connections": [ + { + "extension_group": "rtc", + "extension": "agora_rtc", + "data": [ + { + "name": "text_data", + "dest": [ + { + "extension_group": "interrupt_detector", + "extension": "interrupt_detector" + }, + { + "extension_group": "chat_transcriber", + "extension": "chat_transcriber" + } + ] + } + ] + }, + { + "extension_group": "interrupt_detector", + "extension": "interrupt_detector", + "cmd": [ + { + "name": "flush", + "dest": [ + { + "extension_group": "llama_index", + "extension": "llama_index" + } + ] + } + ], + "data": [ + { + "name": "text_data", + "dest": [ + { + "extension_group": "llama_index", + "extension": "llama_index" + } + ] + } + ] + }, + { + "extension_group": "llama_index", + "extension": "llama_index", + "data": [ + { + "name": "text_data", + "dest": [ + { + "extension_group": "tts", + "extension": "cosy_tts" + }, + { + "extension_group": "chat_transcriber", + "extension": "chat_transcriber", + "cmd_conversions": [ + { + "cmd": { + "type": "per_property", + "keep_original": true, + "rules": [ + { + "path": "is_final", + "type": "fixed_value", + "value": "bool(true)" + }, + { + "path": "stream_id", + "type": "fixed_value", + "value": "uint32(999)" + } + ] + } + } + ] + } + ] + } + ], + "cmd": [ + { + "name": "flush", + "dest": [ + { + "extension_group": "llm", + "extension": "qwen_llm" + }, + { + "extension_group": "tts", + "extension": "cosy_tts" + } + ] + }, + { + "name": "call_chat", + "dest": [ + { + "extension_group": "llm", + "extension": "qwen_llm" + } + ] + }, + { + "name": "embed", + "dest": [ + { + "extension_group": "embedding", + "extension": "embedding" + } + ] + }, + { + "name": "query_vector", + "dest": [ + { + "extension_group": "vector_storage", + "extension": "vector_storage" + } + ] + } + ] + }, + { + "extension_group": "tts", + "extension": "cosy_tts", + "pcm_frame": [ + { + "name": "pcm_frame", + "dest": [ + { + "extension_group": "rtc", + "extension": "agora_rtc" + } + ] + } + ], + "cmd": [ + { + "name": "flush", + "dest": [ + { + "extension_group": "rtc", + "extension": "agora_rtc" + } + ] + } + ] + }, + { + "extension_group": "chat_transcriber", + "extension": "chat_transcriber", + "data": [ + { + "name": "data", + "dest": [ + { + "extension_group": "rtc", + "extension": "agora_rtc" + } + ] + } + ] + }, + { + "extension_group": "http_server", + "extension": "http_server", + "cmd": [ + { + "name": "file_downloaded", + "dest": [ + { + "extension_group": "file_chunker", + "extension": "file_chunker" + }, + { + "extension_group": "llama_index", + "extension": "llama_index" + } + ] + } + ] + }, + { + "extension_group": "file_chunker", + "extension": "file_chunker", + "cmd": [ + { + "name": "embed_batch", + "dest": [ + { + "extension_group": "embedding", + "extension": "embedding" + } + ] + }, + { + "name": "create_collection", + "dest": [ + { + "extension_group": "vector_storage", + "extension": "vector_storage" + } + ] + }, + { + "name": "upsert_vector", + "dest": [ + { + "extension_group": "vector_storage", + "extension": "vector_storage" + } + ] + }, + { + "name": "file_chunked", + "dest": [ + { + "extension_group": "llama_index", + "extension": "llama_index" + } + ] + } + ] + } + ] } ] }
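Taken together, the extensions in this change assemble the va.qwen.rag graph: agora_rtc feeds ASR text through interrupt_detector into llama_index_chat_engine, which delegates chat completion to qwen_llm_python (call_chat), query embedding to the embedding extension (embed), and retrieval to the vector storage extension (query_vector); in parallel, http_server_python forwards file_downloaded cmds to file_chunker, which creates a collection, embeds node batches, upserts vectors, and finally announces file_chunked back to llama_index_chat_engine.

file_chunker handles files strictly one at a time (a worker thread pops paths off a queue) but fans the per-file work out in batches: async_handler records ceil(len(nodes) / BATCH_SIZE) as the number of acknowledgements expected for a path, file_chunked increments a counter per completed upsert_vector round-trip, and the worker blocks on an event until the two match. Below is a minimal sketch of that fan-out/fan-in bookkeeping, assuming a single in-flight file; ChunkTracker is a name invented here for illustration, while batch and BATCH_SIZE stand in for the helpers the extension references.

```python
import math
import threading

BATCH_SIZE = 2  # illustrative; the extension defines its own constant


def batch(items, size):
    # Yield successive fixed-size slices of items.
    for i in range(0, len(items), size):
        yield items[i:i + size]


class ChunkTracker:
    # Hypothetical helper: tracks how many upsert_vector acks a file still needs.
    def __init__(self):
        self.expected = {}  # path -> batches sent
        self.counters = {}  # path -> batches acknowledged
        self.done = threading.Event()

    def start(self, path, num_nodes):
        self.expected[path] = math.ceil(num_nodes / BATCH_SIZE)
        self.counters[path] = 0
        self.done.clear()

    def on_batch_stored(self, path):
        # Called once per completed upsert_vector round-trip.
        self.counters[path] += 1
        if self.counters[path] == self.expected[path]:
            del self.counters[path]
            del self.expected[path]
            self.done.set()  # unblocks the worker; the file is fully chunked


tracker = ChunkTracker()
nodes = list(range(5))
tracker.start("a.pdf", len(nodes))  # ceil(5 / 2) = 3 batches expected
for chunk in batch(nodes, BATCH_SIZE):
    tracker.on_batch_stored("a.pdf")  # pretend each batch was embedded and stored
tracker.done.wait()  # returns immediately: all 3 acks arrived
```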
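http_server_python exposes a single POST /cmd endpoint: the request body is handed verbatim to Cmd.create_from_json and sent into the graph, with 200 returned on acceptance, 404 for any other path, and 500 if the handler raises. A hypothetical client sketch follows; the payload shape is a placeholder, since the cmd JSON schema consumed by Cmd.create_from_json belongs to the RTE runtime and is not shown in this diff. The address and port match the va.qwen.rag graph config rather than the extension's defaults.

```python
import json
from urllib import request

# Placeholder payload: the real schema is whatever Cmd.create_from_json
# expects, which this diff does not define.
payload = json.dumps({"name": "flush"}).encode("utf-8")

req = request.Request(
    "http://127.0.0.1:8080/cmd",  # listen_addr/listen_port from the va.qwen.rag graph
    data=payload,
    headers={"Content-Type": "application/json"},
    method="POST",
)
with request.urlopen(req) as resp:
    print(resp.status)  # 200 once the cmd has been dispatched
```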
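ASTRAEmbedding, ASTRALLM and ASTRARetriever all bridge the same gap: llama_index calls them synchronously, while rte.send_cmd is callback-based. Each wraps the cmd round-trip in a threading.Event so the calling worker thread parks until the callback delivers a result. A condensed sketch of that bridge, with send_cmd standing in for rte.send_cmd; note that, like the originals, it waits without a timeout, so a cmd that never completes would block forever.

```python
import threading


def blocking_cmd(send_cmd, cmd):
    # Run one callback-style cmd round-trip synchronously.
    done = threading.Event()
    holder = {}

    def callback(_rte, result):
        holder["result"] = result  # stash the CmdResult for the waiting caller
        done.set()

    send_cmd(cmd, callback)
    done.wait()  # parks the llama_index worker thread, not the RTE loop
    return holder["result"]
```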
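ASTRALLM.stream_chat returns a generator immediately while streamed CmdResults keep arriving in its callback; tokens cross the thread boundary through a queue.Queue, and a None sentinel, pushed when get_is_final() reports the last chunk, terminates the generator. A self-contained sketch of the pattern, with the callback signature simplified to (text, is_final) in place of the real CmdResult handling:

```python
import queue


def make_stream():
    tokens = queue.Queue()

    def on_result(text: str, is_final: bool):
        # Producer side: called from the cmd callback thread for each chunk.
        tokens.put(text)
        if is_final:
            tokens.put(None)  # sentinel: no more chunks

    def gen():
        # Consumer side: blocks until the next chunk or the sentinel.
        while True:
            text = tokens.get()
            if text is None:
                break
            yield text

    return on_result, gen()


on_result, stream = make_stream()
on_result("Hello, ", False)
on_result("world!", True)
print("".join(stream))  # -> Hello, world!
```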
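Both qwen_llm_python and llama_index_chat_engine implement interruption the same way: each queued request carries its enqueue timestamp, and flush() advances outdate_ts and drains the queue, so the worker drops stale requests before processing and breaks out of a stream mid-response (qwen_llm_python additionally tolerates a one-second grace window in need_interrupt). A minimal sketch of the shared pattern, assuming a single blocking worker:

```python
import queue
from datetime import datetime


class InterruptibleWorker:
    def __init__(self):
        self.queue = queue.Queue()
        self.outdate_ts = datetime.now()

    def submit(self, text: str):
        self.queue.put((text, datetime.now()))

    def flush(self):
        # Everything queued before this instant becomes stale.
        self.outdate_ts = datetime.now()
        while not self.queue.empty():
            self.queue.get()

    def next_request(self):
        # Blocks for the next request; returns None if it was interrupted.
        text, ts = self.queue.get()
        if ts < self.outdate_ts:
            return None  # a flush arrived after this request was queued
        return text


worker = InterruptibleWorker()
worker.submit("tell me a story")
worker.flush()  # e.g. the user started speaking again
worker.submit("what's the weather?")
print(worker.next_request())  # -> what's the weather?
```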