Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chore(): add api for /vector/document #193

Merged
merged 2 commits into from
Aug 12, 2024
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 17 additions & 1 deletion .env.example
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,12 @@
# ------------------------------

# Log path
LOG_PATH=/tmp
LOG_PATH=/tmp/astra
# Graph designer server port
GRAPH_DESIGNER_SERVER_PORT=49483
# The corresponding graph name based on the language
GRAPH_NAME_ZH=va.openai.azure
GRAPH_NAME_EN=va.openai.azure
# Server port
SERVER_PORT=8080
# Maximum number of workers
Expand All @@ -26,6 +29,19 @@ AGORA_APP_CERTIFICATE=
# Worker Configuration
# ------------------------------

# Extension: aliyun_analyticdb_vector_storage
ALIBABA_CLOUD_ACCESS_KEY_ID=
ALIBABA_CLOUD_ACCESS_KEY_SECRET=
ALIYUN_ANALYTICDB_ACCOUNT=
ALIYUN_ANALYTICDB_ACCOUNT_PASSWORD=
ALIYUN_ANALYTICDB_INSTANCE_ID=
ALIYUN_ANALYTICDB_INSTANCE_REGION=cn-shanghai
ALIYUN_ANALYTICDB_NAMESPACE=
ALIYUN_ANALYTICDB_NAMESPACE_PASSWORD=

# Extension: aliyun_text_embedding
ALIYUN_TEXT_EMBEDDING_API_KEY=

# Extension: bedrock_llm
# Extension: polly_tts
AWS_ACCESS_KEY_ID=
Expand Down
14 changes: 7 additions & 7 deletions agents/property.json.example
Original file line number Diff line number Diff line change
Expand Up @@ -1465,7 +1465,7 @@
"type": "extension",
"extension_group": "embedding",
"addon": "aliyun_text_embedding",
"name": "embedding",
"name": "aliyun_text_embedding",
"property": {
"api_key": "<aliyun_text_embedding_api_key>",
"model": "text-embedding-v3"
Expand All @@ -1475,7 +1475,7 @@
"type": "extension",
"extension_group": "vector_storage",
"addon": "aliyun_analyticdb_vector_storage",
"name": "vector_storage",
"name": "aliyun_analyticdb_vector_storage",
"property": {
"alibaba_cloud_access_key_id": "<alibaba_cloud_access_key_id>",
"alibaba_cloud_access_key_secret": "<alibaba_cloud_access_key_secret>",
Expand Down Expand Up @@ -1617,7 +1617,7 @@
"dest": [
{
"extension_group": "embedding",
"extension": "embedding"
"extension": "aliyun_text_embedding"
}
]
},
Expand All @@ -1626,7 +1626,7 @@
"dest": [
{
"extension_group": "vector_storage",
"extension": "vector_storage"
"extension": "aliyun_analyticdb_vector_storage"
}
]
}
Expand Down Expand Up @@ -1701,7 +1701,7 @@
"dest": [
{
"extension_group": "embedding",
"extension": "embedding"
"extension": "aliyun_text_embedding"
}
]
},
Expand All @@ -1710,7 +1710,7 @@
"dest": [
{
"extension_group": "vector_storage",
"extension": "vector_storage"
"extension": "aliyun_analyticdb_vector_storage"
}
]
},
Expand All @@ -1719,7 +1719,7 @@
"dest": [
{
"extension_group": "vector_storage",
"extension": "vector_storage"
"extension": "aliyun_analyticdb_vector_storage"
}
]
},
Expand Down
1 change: 1 addition & 0 deletions server/internal/code.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ var (
codeErrChannelEmpty = NewCode("10004", "channel empty")
codeErrGenerateTokenFailed = NewCode("10005", "generate token failed")
codeErrSaveFileFailed = NewCode("10006", "save file failed")
codeErrParseJsonFailed = NewCode("10007", "parse json failed")

codeErrProcessPropertyFailed = NewCode("10100", "process property json failed")
codeErrStartWorkerFailed = NewCode("10101", "start worker failed")
Expand Down
62 changes: 49 additions & 13 deletions server/internal/config.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
package internal

import "log/slog"
import (
"log/slog"
"os"
)

type Prop struct {
ExtensionName string
Expand All @@ -9,22 +12,22 @@ type Prop struct {

const (
// Extension name
extensionNameAgoraRTC = "agora_rtc"
extensionNameAzureTTS = "azure_tts"
extensionNameBedrockLLM = "bedrock_llm"
extensionNameCosyTTS = "cosy_tts"
extensionNameElevenlabsTTS = "elevenlabs_tts"
extensionNameHttpServer = "http_server"
extensionNameLiteLLM = "litellm"
extensionNameOpenaiChatgpt = "openai_chatgpt"
extensionNamePollyTTS = "polly_tts"
extensionNameQwenLLM = "qwen_llm"
extensionNameAgoraRTC = "agora_rtc"
extensionNameAliyunAnalyticdbVectorStorage = "aliyun_analyticdb_vector_storage"
extensionNameAliyunTextEmbedding = "aliyun_text_embedding"
extensionNameAzureTTS = "azure_tts"
extensionNameBedrockLLM = "bedrock_llm"
extensionNameCosyTTS = "cosy_tts"
extensionNameElevenlabsTTS = "elevenlabs_tts"
extensionNameHttpServer = "http_server"
extensionNameLiteLLM = "litellm"
extensionNameOpenaiChatgpt = "openai_chatgpt"
extensionNamePollyTTS = "polly_tts"
extensionNameQwenLLM = "qwen_llm"

// Language
languageChinese = "zh-CN"
languageEnglish = "en-US"
// Default graph name
graphNameDefault = "va.openai.azure"
// Property json
PropertyJsonFile = "./agents/property.json"
// Token expire time
Expand All @@ -42,6 +45,33 @@ var (
"AGORA_APP_ID": {
{ExtensionName: extensionNameAgoraRTC, Property: "app_id"},
},
"ALIBABA_CLOUD_ACCESS_KEY_SECRET": {
{ExtensionName: extensionNameAliyunAnalyticdbVectorStorage, Property: "alibaba_cloud_access_key_secret"},
},
"ALIBABA_CLOUD_ACCESS_KEY_ID": {
// Each environment variable populates the extension property of the same name.
{ExtensionName: extensionNameAliyunAnalyticdbVectorStorage, Property: "alibaba_cloud_access_key_id"},
},
"ALIYUN_ANALYTICDB_ACCOUNT": {
{ExtensionName: extensionNameAliyunAnalyticdbVectorStorage, Property: "adbpg_account"},
wangyoucao577 marked this conversation as resolved.
Show resolved Hide resolved
},
"ALIYUN_ANALYTICDB_ACCOUNT_PASSWORD": {
{ExtensionName: extensionNameAliyunAnalyticdbVectorStorage, Property: "adbpg_account_password"},
},
"ALIYUN_ANALYTICDB_INSTANCE_ID": {
{ExtensionName: extensionNameAliyunAnalyticdbVectorStorage, Property: "adbpg_instance_id"},
},
"ALIYUN_ANALYTICDB_INSTANCE_REGION": {
{ExtensionName: extensionNameAliyunAnalyticdbVectorStorage, Property: "adbpg_instance_region"},
},
"ALIYUN_ANALYTICDB_NAMESPACE": {
{ExtensionName: extensionNameAliyunAnalyticdbVectorStorage, Property: "adbpg_namespace"},
},
"ALIYUN_ANALYTICDB_NAMESPACE_PASSWORD": {
{ExtensionName: extensionNameAliyunAnalyticdbVectorStorage, Property: "adbpg_namespace_password"},
},
"ALIYUN_TEXT_EMBEDDING_API_KEY": {
{ExtensionName: extensionNameAliyunTextEmbedding, Property: "api_key"},
},
"AWS_ACCESS_KEY_ID": {
{ExtensionName: extensionNameBedrockLLM, Property: "access_key"},
{ExtensionName: extensionNamePollyTTS, Property: "access_key"},
Expand Down Expand Up @@ -99,6 +129,12 @@ var (
},
}

// The corresponding graph name based on the language
graphNameMap = map[string]string{
languageChinese: os.Getenv("GRAPH_NAME_ZH"),
languageEnglish: os.Getenv("GRAPH_NAME_EN"),
}

// Retrieve parameters from the request and map them to the property.json file
startPropMap = map[string][]Prop{
"AgoraAsrLanguage": {
Expand Down
100 changes: 85 additions & 15 deletions server/internal/http_server.go
Original file line number Diff line number Diff line change
@@ -1,13 +1,14 @@
/**
*
* Agora Real Time Engagement
* Created by Xinhui Li in 2024.
* Created by XinHui Li in 2024.
* Copyright (c) 2024 Agora IO. All rights reserved.
*
*/
package internal

import (
"encoding/json"
"fmt"
"log/slog"
"mime/multipart"
Expand Down Expand Up @@ -67,11 +68,17 @@ type GenerateTokenReq struct {
Uid uint32 `json:"uid,omitempty"`
}

type UploadReq struct {
// VectorDocumentUpdate holds the parameters bound from a
// POST /vector/document/update request by handlerVectorDocumentUpdate.
type VectorDocumentUpdate struct {
// RequestId is echoed into the handler's log entries and forwarded to the worker.
RequestId string `json:"request_id,omitempty"`
// ChannelName selects the worker to update; the handler rejects the
// request when no worker is registered under this name.
ChannelName string `json:"channel_name,omitempty"`
// Collection and FileName are forwarded to the worker together with the
// "update_querying_collection" command.
Collection string `json:"collection,omitempty"`
FileName string `json:"file_name,omitempty"`
}

type VectorDocumentUpload struct {
RequestId string `form:"request_id,omitempty" json:"request_id,omitempty"`
ChannelName string `form:"channel_name,omitempty" json:"channel_name,omitempty"`
File *multipart.FileHeader `form:"file" binding:"required"`
Uid uint32 `form:"uid,omitempty" json:"uid,omitempty"`
}

func NewHttpServer(httpServerConfig *HttpServerConfig) *HttpServer {
Expand Down Expand Up @@ -235,50 +242,111 @@ func (s *HttpServer) handlerGenerateToken(c *gin.Context) {
s.output(c, codeSuccess, map[string]any{"appId": s.config.AppId, "token": token, "channel_name": req.ChannelName, "uid": req.Uid})
}

func (s *HttpServer) handlerUpload(c *gin.Context) {
var req UploadReq
// handlerVectorDocumentPresetList serves GET /vector/document/preset/list.
//
// It replies with the JSON array stored in the VECTOR_DOCUMENT_PRESET_LIST
// environment variable, decoded into a list of objects. When the variable is
// unset or empty the reply is an empty list. When its content cannot be
// decoded the handler logs the error and replies with codeErrParseJsonFailed.
func (s *HttpServer) handlerVectorDocumentPresetList(c *gin.Context) {
	presets := []map[string]any{}

	// Nothing configured: answer with the empty list without parsing.
	raw := os.Getenv("VECTOR_DOCUMENT_PRESET_LIST")
	if raw == "" {
		s.output(c, codeSuccess, presets)
		return
	}

	if err := json.Unmarshal([]byte(raw), &presets); err != nil {
		slog.Error("handlerVectorDocumentPresetList parse json failed", "err", err, logTag)
		s.output(c, codeErrParseJsonFailed, http.StatusBadRequest)
		return
	}

	s.output(c, codeSuccess, presets)
}

func (s *HttpServer) handlerVectorDocumentUpdate(c *gin.Context) {
var req VectorDocumentUpdate

if err := c.ShouldBind(&req); err != nil {
slog.Error("handlerUpload params invalid", "err", err, "channelName", req.ChannelName, "requestId", req.RequestId, logTag)
slog.Error("handlerVectorDocumentUpdate params invalid", "err", err, "channelName", req.ChannelName, "requestId", req.RequestId, logTag)
s.output(c, codeErrParamsInvalid, http.StatusBadRequest)
return
}

if !workers.Contains(req.ChannelName) {
slog.Error("handlerUpload channel not existed", "channelName", req.ChannelName, "requestId", req.RequestId, logTag)
slog.Error("handlerVectorDocumentUpdate channel not existed", "channelName", req.ChannelName, "requestId", req.RequestId, logTag)
s.output(c, codeErrChannelNotExisted, http.StatusBadRequest)
return
}

slog.Info("handlerUpload start", "channelName", req.ChannelName, "requestId", req.RequestId, logTag)
slog.Info("handlerVectorDocumentUpdate start", "channelName", req.ChannelName, "requestId", req.RequestId, logTag)

// update worker
worker := workers.Get(req.ChannelName).(*Worker)
err := worker.update(&WorkerUpdateReq{
RequestId: req.RequestId,
ChannelName: req.ChannelName,
Collection: req.Collection,
FileName: req.FileName,
Rte: &WorkerUpdateReqRte{
Name: "update_querying_collection",
Type: "cmd",
},
})
if err != nil {
slog.Error("handlerVectorDocumentUpdate update worker failed", "err", err, "channelName", req.ChannelName, "Collection", req.Collection, "FileName", req.FileName, "requestId", req.RequestId, logTag)
s.output(c, codeErrUpdateWorkerFailed, http.StatusBadRequest)
return
}

slog.Info("handlerVectorDocumentUpdate end", "channelName", req.ChannelName, "Collection", req.Collection, "FileName", req.FileName, "requestId", req.RequestId, logTag)
s.output(c, codeSuccess, map[string]any{"channel_name": req.ChannelName})
}

func (s *HttpServer) handlerVectorDocumentUpload(c *gin.Context) {
var req VectorDocumentUpload

if err := c.ShouldBind(&req); err != nil {
slog.Error("handlerVectorDocumentUpload params invalid", "err", err, "channelName", req.ChannelName, "requestId", req.RequestId, logTag)
s.output(c, codeErrParamsInvalid, http.StatusBadRequest)
return
}

if !workers.Contains(req.ChannelName) {
slog.Error("handlerVectorDocumentUpload channel not existed", "channelName", req.ChannelName, "requestId", req.RequestId, logTag)
s.output(c, codeErrChannelNotExisted, http.StatusBadRequest)
return
}

slog.Info("handlerVectorDocumentUpload start", "channelName", req.ChannelName, "requestId", req.RequestId, logTag)

file := req.File
uploadFile := fmt.Sprintf("%s/file-%s-%d%s", s.config.LogPath, gmd5.MustEncryptString(req.ChannelName), time.Now().UnixNano(), filepath.Ext(file.Filename))
if err := c.SaveUploadedFile(file, uploadFile); err != nil {
slog.Error("handlerUpload save file failed", "err", err, "channelName", req.ChannelName, "requestId", req.RequestId, logTag)
slog.Error("handlerVectorDocumentUpload save file failed", "err", err, "channelName", req.ChannelName, "requestId", req.RequestId, logTag)
s.output(c, codeErrSaveFileFailed, http.StatusBadRequest)
return
}

// Generate collection
collection := fmt.Sprintf("%s_%d", gmd5.MustEncryptString(req.ChannelName), time.Now().UnixNano())
fileName := filepath.Base(file.Filename)

// update worker
worker := workers.Get(req.ChannelName).(*Worker)
err := worker.update(&WorkerUpdateReq{
RequestId: req.RequestId,
ChannelName: req.ChannelName,
Collection: collection,
FileName: fileName,
Path: uploadFile,
Rte: &WorkerUpdateReqRte{
Name: "file_download",
Name: "file_chunk",
Type: "cmd",
},
})
if err != nil {
slog.Error("handlerUpload update worker failed", "err", err, "channelName", req.ChannelName, "requestId", req.RequestId, logTag)
slog.Error("handlerVectorDocumentUpload update worker failed", "err", err, "channelName", req.ChannelName, "requestId", req.RequestId, logTag)
s.output(c, codeErrUpdateWorkerFailed, http.StatusBadRequest)
return
}

slog.Info("handlerUpload end", "channelName", req.ChannelName, "uploadFile", uploadFile, "requestId", req.RequestId, logTag)
s.output(c, codeSuccess, map[string]any{"channel_name": req.ChannelName, "uid": req.Uid})
slog.Info("handlerVectorDocumentUpload end", "channelName", req.ChannelName, "collection", collection, "uploadFile", uploadFile, "requestId", req.RequestId, logTag)
s.output(c, codeSuccess, map[string]any{"channel_name": req.ChannelName, "collection": collection, "file_name": fileName})
}

func (s *HttpServer) output(c *gin.Context, code *Code, data any, httpStatus ...int) {
Expand All @@ -301,7 +369,7 @@ func (s *HttpServer) processProperty(req *StartReq) (propertyJsonFile string, lo
// Get graph name
graphName := req.GraphName
if graphName == "" {
graphName = graphNameDefault
graphName = graphNameMap[req.AgoraAsrLanguage]
}

// Generate token
Expand Down Expand Up @@ -349,7 +417,9 @@ func (s *HttpServer) Start() {
r.POST("/start", s.handlerStart)
r.POST("/stop", s.handlerStop)
r.POST("/token/generate", s.handlerGenerateToken)
r.POST("/upload", s.handlerUpload)
r.GET("/vector/document/preset/list", s.handlerVectorDocumentPresetList)
r.POST("/vector/document/update", s.handlerVectorDocumentUpdate)
r.POST("/vector/document/upload", s.handlerVectorDocumentUpload)

slog.Info("server start", "port", s.config.Port, logTag)

Expand Down
2 changes: 2 additions & 0 deletions server/internal/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,8 @@ type Worker struct {
// WorkerUpdateReq is the payload the HTTP handlers hand to Worker.update.
type WorkerUpdateReq struct {
RequestId string `form:"request_id,omitempty" json:"request_id,omitempty"`
ChannelName string `form:"channel_name,omitempty" json:"channel_name,omitempty"`
// Collection names the vector collection the command refers to.
Collection string `form:"collection,omitempty" json:"collection,omitempty"`
// NOTE(review): FileName is tagged "filename" here, while the HTTP API
// structs and responses use "file_name" - presumably this matches what the
// worker side expects; confirm before unifying.
FileName string `form:"filename,omitempty" json:"filename,omitempty"`
// Path is set by the upload handler to the location of the saved file.
Path string `form:"path,omitempty" json:"path,omitempty"`
// Rte carries the name and type of the command to send to the worker.
Rte *WorkerUpdateReqRte `form:"rte,omitempty" json:"rte,omitempty"`
}
Expand Down
Loading