Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Initialize RAG #175

Merged
merged 12 commits into from
Aug 9, 2024
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
# Package entry point for the aliyun_analyticdb_vector_storage extension.
# vector_storage_addon is imported but not referenced here, so it is loaded
# for its import-time side effects (presumably addon registration with the
# runtime -- TODO confirm in vector_storage_addon).
from . import vector_storage_addon
from .log import logger

# Log a marker once the package import has completed.
logger.info("aliyun_analyticdb_vector_storage extension loaded")
98 changes: 98 additions & 0 deletions agents/addon/extension/aliyun_analyticdb_vector_storage/client.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,98 @@
# -*- coding: utf-8 -*-

try:
from .log import logger
except ImportError:
from log import logger
import asyncio
import threading
from typing import Coroutine
from concurrent.futures import Future


from alibabacloud_gpdb20160503.client import Client as gpdb20160503Client
from alibabacloud_tea_openapi import models as open_api_models


# maybe need multiple clients
class AliGPDBClient:
    """Wrap one GPDB SDK client together with a background asyncio worker.

    The constructor builds the SDK client and starts a thread that runs
    its own event loop via ``asyncio.run``.  Coroutines handed to
    ``submit_task`` are queued, executed on that loop, and their outcome
    is delivered through the ``concurrent.futures.Future`` returned to the
    submitter.  ``submit_task_with_new_thread`` is an alternative that
    runs a single coroutine on a dedicated throw-away thread and loop.

    Cross-thread note: ``stopEvent`` and ``tasks`` are asyncio primitives
    and are touched from more than one thread.  This class only uses
    their non-blocking operations (``set``/``is_set``, ``put_nowait``/
    ``get_nowait``/``empty``) and the worker polls instead of awaiting
    them, so nothing ever waits on them from a foreign loop.
    """

    def __init__(self, access_key_id, access_key_secret, endpoint):
        """Create the SDK client and start the worker thread.

        Args:
            access_key_id: Access key id passed to the SDK config.
            access_key_secret: Access key secret passed to the SDK config.
            endpoint: Service endpoint passed to the SDK config.
        """
        self.stopEvent = asyncio.Event()
        # Set by the worker thread once its event loop is running.
        self.loop = None
        # Queue of (coroutine, Future) pairs waiting to be scheduled.
        self.tasks = asyncio.Queue()
        self.access_key_id = access_key_id
        self.access_key_secret = access_key_secret
        self.endpoint = endpoint
        # Build the client before starting the thread so a failing
        # constructor does not leave a worker running.
        self.client = self.create_client()
        self.thread = threading.Thread(
            target=asyncio.run, args=(self.__thread_routine(),)
        )
        self.thread.start()

    async def stop_thread(self):
        """Request the worker loop to stop (coroutine form of the stop flag)."""
        self.stopEvent.set()

    def create_client(self) -> "gpdb20160503Client":
        """Build a new SDK client from the stored credentials and endpoint."""
        config = open_api_models.Config(
            access_key_id=self.access_key_id,
            access_key_secret=self.access_key_secret,
            endpoint=self.endpoint,
        )
        return gpdb20160503Client(config)

    def get(self) -> "gpdb20160503Client":
        """Return the SDK client created in the constructor."""
        return self.client

    def close(self):
        """Stop the worker loop and wait for its thread to finish.

        The stop flag is set unconditionally: the worker polls it, so this
        also stops a worker that has not published ``self.loop`` yet (the
        original guard skipped shutdown entirely in that window, leaving a
        non-daemon thread alive).
        """
        self.stopEvent.set()
        # Joining the current thread raises RuntimeError, so skip the join
        # when close() is invoked from the worker thread itself.
        if self.thread.is_alive() and self.thread is not threading.current_thread():
            self.thread.join()

    @staticmethod
    def _settle(future, task):
        """Copy the outcome of a finished asyncio *task* onto *future*."""
        if future.done():
            # Already resolved or cancelled by the submitter; setting it
            # again would raise InvalidStateError.
            return
        if task.cancelled():
            future.cancel()
            return
        error = task.exception()
        if error is not None:
            logger.error(f"task exception: {error}")
            future.set_exception(error)
        else:
            future.set_result(task.result())

    async def __thread_routine(self):
        """Worker loop: schedule queued coroutines until the stop flag is set.

        Each dequeued coroutine becomes an asyncio task whose done-callback
        resolves the Future it was submitted with.  The loop polls the
        queue and the stop flag every 0.1 s, so new submissions and stop
        requests are noticed even while earlier tasks are still running.
        """
        logger.info("client __thread_routine start")
        self.loop = asyncio.get_running_loop()
        # Strong references keep in-flight tasks from being garbage
        # collected before they finish.
        running = set()
        while not self.stopEvent.is_set():
            if self.tasks.empty():
                await asyncio.sleep(0.1)
                continue
            coro, future = self.tasks.get_nowait()
            try:
                task = asyncio.create_task(coro)
            except Exception as e:
                future.set_exception(e)
                continue
            running.add(task)
            task.add_done_callback(running.discard)
            # Bind this task's own future as a default argument; a plain
            # closure would see whichever future was dequeued last.
            task.add_done_callback(
                lambda done, fut=future: self._settle(fut, done)
            )
        # Work that was queued but never started cannot complete any more:
        # cancel its futures so callers blocked in result() are released.
        # Tasks still running are cancelled by asyncio.run() on exit and
        # reach their futures through _settle.
        while not self.tasks.empty():
            coro, future = self.tasks.get_nowait()
            discard = getattr(coro, "close", None)
            if callable(discard):
                discard()  # avoids a "coroutine was never awaited" warning
            future.cancel()
        logger.info("client __thread_routine end")

    async def submit_task(self, coro: Coroutine) -> Future:
        """Queue *coro* for the worker loop.

        Args:
            coro: Coroutine to run on the worker's event loop.

        Returns:
            A ``concurrent.futures.Future`` that receives the coroutine's
            result or exception, or is cancelled if the worker stops
            before the coroutine finishes.
        """
        future = Future()
        # The queue is unbounded, so this never blocks or raises QueueFull.
        self.tasks.put_nowait((coro, future))
        return future

    def submit_task_with_new_thread(self, coro: Coroutine) -> Future:
        """Run *coro* to completion on a dedicated thread and event loop.

        Args:
            coro: Coroutine to execute.

        Returns:
            A ``concurrent.futures.Future`` resolved with the coroutine's
            result, or with whatever exception it raised.
        """
        future = Future()

        def run_coro_in_new_thread():
            loop = asyncio.new_event_loop()
            asyncio.set_event_loop(loop)
            try:
                result = loop.run_until_complete(coro)
                future.set_result(result)
            except BaseException as e:
                # BaseException on purpose: asyncio.CancelledError and
                # similar are not Exception subclasses, and leaving the
                # future pending would hang the caller forever.
                future.set_exception(e)
            finally:
                loop.close()

        thread = threading.Thread(target=run_coro_in_new_thread)
        thread.start()
        return future
13 changes: 13 additions & 0 deletions agents/addon/extension/aliyun_analyticdb_vector_storage/log.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
import logging

# Console logging for the extension: every record carries a timestamp, the
# logger name, level, process id and the emitting file/line.
formatter = logging.Formatter(
    "%(asctime)s - %(name)s - %(levelname)s - %(process)d - [%(filename)s:%(lineno)d] - %(message)s"
)

# Stream handler (stderr by default) that renders records with the
# formatter above.
console_handler = logging.StreamHandler()
console_handler.setFormatter(formatter)

# Shared logger imported by the other modules of this package; INFO and
# above are emitted.
logger = logging.getLogger("aliyun_analyticdb_vector_storage")
logger.addHandler(console_handler)
logger.setLevel(logging.INFO)
122 changes: 122 additions & 0 deletions agents/addon/extension/aliyun_analyticdb_vector_storage/manifest.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,122 @@
{
"type": "extension",
"name": "aliyun_analyticdb_vector_storage",
"version": "0.4.0",
"language": "python",
"dependencies": [
{
"type": "system",
"name": "rte_runtime_python",
"version": "0.4"
}
],
"api": {
"property": {
"alibaba_cloud_access_key_id": {
"type": "string"
},
"alibaba_cloud_access_key_secret": {
"type": "string"
},
"adbpg_instance_id": {
"type": "string"
},
"adbpg_instance_region": {
"type": "string"
},
"adbpg_account": {
"type": "string"
},
"adbpg_account_password": {
"type": "string"
},
"adbpg_namespace": {
"type": "string"
},
"adbpg_namespace_password": {
"type": "string"
}
},
"cmd_in": [
{
"name": "upsert_vector",
"property": {
"collection_name": {
"type": "string"
},
"file_name": {
"type": "string"
},
"content": {
"type": "string"
}
}
},
{
"name": "query_vector",
"property": {
"collection_name": {
"type": "string"
},
"top_k": {
"type": "int64"
},
"embedding": {
"type": "array",
"items": {
"type": "float64"
}
}
},
"required": [
"collection_name",
"top_k",
"embedding"
],
"result": {
"property": {
"response": {
"type": "array",
"items": {
"type": "object",
"properties": {
"content": {
"type": "string"
},
"score": {
"type": "float64"
}
}
}
}
}
}
},
{
"name": "create_collection",
"property": {
"collection_name": {
"type": "string"
},
"dimension": {
"type": "int32"
}
},
"required": [
"collection_name"
]
},
{
"name": "delete_collection",
"property": {
"collection_name": {
"type": "string"
}
},
"required": [
"collection_name"
]
}
]
}
}
Loading