diff --git a/.env.example b/.env.example index 17141464..cbd296eb 100644 --- a/.env.example +++ b/.env.example @@ -7,9 +7,12 @@ # ------------------------------ # Log path -LOG_PATH=/tmp +LOG_PATH=/tmp/astra # Graph designer server port GRAPH_DESIGNER_SERVER_PORT=49483 +# The corresponding graph name based on the language +GRAPH_NAME_ZH=va.openai.azure +GRAPH_NAME_EN=va.openai.azure # Server port SERVER_PORT=8080 # Maximum number of workers @@ -26,6 +29,19 @@ AGORA_APP_CERTIFICATE= # Worker Configuration # ------------------------------ +# Extension: aliyun_analyticdb_vector_storage +ALIBABA_CLOUD_ACCESS_KEY_ID= +ALIBABA_CLOUD_ACCESS_KEY_SECRET= +ALIYUN_ANALYTICDB_ACCOUNT= +ALIYUN_ANALYTICDB_ACCOUNT_PASSWORD= +ALIYUN_ANALYTICDB_INSTANCE_ID= +ALIYUN_ANALYTICDB_INSTANCE_REGION=cn-shanghai +ALIYUN_ANALYTICDB_NAMESPACE= +ALIYUN_ANALYTICDB_NAMESPACE_PASSWORD= + +# Extension: aliyun_text_embedding +ALIYUN_TEXT_EMBEDDING_API_KEY= + # Extension: bedrock_llm # Extension: polly_tts AWS_ACCESS_KEY_ID= diff --git a/agents/property.json.example b/agents/property.json.example index c63abb40..10907ba6 100644 --- a/agents/property.json.example +++ b/agents/property.json.example @@ -1465,7 +1465,7 @@ "type": "extension", "extension_group": "embedding", "addon": "aliyun_text_embedding", - "name": "embedding", + "name": "aliyun_text_embedding", "property": { "api_key": "", "model": "text-embedding-v3" @@ -1475,7 +1475,7 @@ "type": "extension", "extension_group": "vector_storage", "addon": "aliyun_analyticdb_vector_storage", - "name": "vector_storage", + "name": "aliyun_analyticdb_vector_storage", "property": { "alibaba_cloud_access_key_id": "", "alibaba_cloud_access_key_secret": "", @@ -1617,7 +1617,7 @@ "dest": [ { "extension_group": "embedding", - "extension": "embedding" + "extension": "aliyun_text_embedding" } ] }, @@ -1626,7 +1626,7 @@ "dest": [ { "extension_group": "vector_storage", - "extension": "vector_storage" + "extension": "aliyun_analyticdb_vector_storage" } ] } @@ -1701,7 +1701,7 @@ "dest": [ { "extension_group": "embedding", - "extension": "embedding" + "extension": "aliyun_text_embedding" } ] }, @@ -1710,7 +1710,7 @@ "dest": [ { "extension_group": "vector_storage", - "extension": "vector_storage" + "extension": "aliyun_analyticdb_vector_storage" } ] }, @@ -1719,7 +1719,7 @@ "dest": [ { "extension_group": "vector_storage", - "extension": "vector_storage" + "extension": "aliyun_analyticdb_vector_storage" } ] }, diff --git a/server/internal/code.go b/server/internal/code.go index 1a583ee0..8a4b61b2 100644 --- a/server/internal/code.go +++ b/server/internal/code.go @@ -16,6 +16,7 @@ var ( codeErrChannelEmpty = NewCode("10004", "channel empty") codeErrGenerateTokenFailed = NewCode("10005", "generate token failed") codeErrSaveFileFailed = NewCode("10006", "save file failed") + codeErrParseJsonFailed = NewCode("10007", "parse json failed") codeErrProcessPropertyFailed = NewCode("10100", "process property json failed") codeErrStartWorkerFailed = NewCode("10101", "start worker failed") diff --git a/server/internal/config.go b/server/internal/config.go index 9297e300..f1cf6dca 100644 --- a/server/internal/config.go +++ b/server/internal/config.go @@ -1,6 +1,9 @@ package internal -import "log/slog" +import ( + "log/slog" + "os" +) type Prop struct { ExtensionName string @@ -9,22 +12,22 @@ type Prop struct { const ( // Extension name - extensionNameAgoraRTC = "agora_rtc" - extensionNameAzureTTS = "azure_tts" - extensionNameBedrockLLM = "bedrock_llm" - extensionNameCosyTTS = "cosy_tts" - extensionNameElevenlabsTTS = "elevenlabs_tts" - extensionNameHttpServer = "http_server" - extensionNameLiteLLM = "litellm" - extensionNameOpenaiChatgpt = "openai_chatgpt" - extensionNamePollyTTS = "polly_tts" - extensionNameQwenLLM = "qwen_llm" + extensionNameAgoraRTC = "agora_rtc" + extensionNameAliyunAnalyticdbVectorStorage = "aliyun_analyticdb_vector_storage" + extensionNameAliyunTextEmbedding = "aliyun_text_embedding" + extensionNameAzureTTS = "azure_tts" + extensionNameBedrockLLM = "bedrock_llm" + extensionNameCosyTTS = "cosy_tts" + extensionNameElevenlabsTTS = "elevenlabs_tts" + extensionNameHttpServer = "http_server" + extensionNameLiteLLM = "litellm" + extensionNameOpenaiChatgpt = "openai_chatgpt" + extensionNamePollyTTS = "polly_tts" + extensionNameQwenLLM = "qwen_llm" // Language languageChinese = "zh-CN" languageEnglish = "en-US" - // Default graph name - graphNameDefault = "va.openai.azure" // Property json PropertyJsonFile = "./agents/property.json" // Token expire time @@ -42,6 +45,33 @@ var ( "AGORA_APP_ID": { {ExtensionName: extensionNameAgoraRTC, Property: "app_id"}, }, + "ALIBABA_CLOUD_ACCESS_KEY_ID": { + {ExtensionName: extensionNameAliyunAnalyticdbVectorStorage, Property: "alibaba_cloud_access_key_id"}, + }, + "ALIBABA_CLOUD_ACCESS_KEY_SECRET": { + {ExtensionName: extensionNameAliyunAnalyticdbVectorStorage, Property: "alibaba_cloud_access_key_secret"}, + }, + "ALIYUN_ANALYTICDB_ACCOUNT": { + {ExtensionName: extensionNameAliyunAnalyticdbVectorStorage, Property: "adbpg_account"}, + }, + "ALIYUN_ANALYTICDB_ACCOUNT_PASSWORD": { + {ExtensionName: extensionNameAliyunAnalyticdbVectorStorage, Property: "adbpg_account_password"}, + }, + "ALIYUN_ANALYTICDB_INSTANCE_ID": { + {ExtensionName: extensionNameAliyunAnalyticdbVectorStorage, Property: "adbpg_instance_id"}, + }, + "ALIYUN_ANALYTICDB_INSTANCE_REGION": { + {ExtensionName: extensionNameAliyunAnalyticdbVectorStorage, Property: "adbpg_instance_region"}, + }, + "ALIYUN_ANALYTICDB_NAMESPACE": { + {ExtensionName: extensionNameAliyunAnalyticdbVectorStorage, Property: "adbpg_namespace"}, + }, + "ALIYUN_ANALYTICDB_NAMESPACE_PASSWORD": { + {ExtensionName: extensionNameAliyunAnalyticdbVectorStorage, Property: "adbpg_namespace_password"}, + }, + "ALIYUN_TEXT_EMBEDDING_API_KEY": { + {ExtensionName: extensionNameAliyunTextEmbedding, Property: "api_key"}, + }, "AWS_ACCESS_KEY_ID": { {ExtensionName: extensionNameBedrockLLM, Property: "access_key"}, {ExtensionName: extensionNamePollyTTS, Property: "access_key"}, @@ -99,6 +129,12 @@ var ( }, } + // The corresponding graph name based on the language + graphNameMap = map[string]string{ + languageChinese: os.Getenv("GRAPH_NAME_ZH"), + languageEnglish: os.Getenv("GRAPH_NAME_EN"), + } + // Retrieve parameters from the request and map them to the property.json file startPropMap = map[string][]Prop{ "AgoraAsrLanguage": { diff --git a/server/internal/http_server.go b/server/internal/http_server.go index 7cbf158d..75d6fb8c 100644 --- a/server/internal/http_server.go +++ b/server/internal/http_server.go @@ -1,13 +1,14 @@ /** * * Agora Real Time Engagement - * Created by Xinhui Li in 2024. + * Created by XinHui Li in 2024. * Copyright (c) 2024 Agora IO. All rights reserved. * */ package internal import ( + "encoding/json" "fmt" "log/slog" "mime/multipart" @@ -67,11 +68,17 @@ type GenerateTokenReq struct { Uid uint32 `json:"uid,omitempty"` } -type UploadReq struct { +type VectorDocumentUpdate struct { + RequestId string `json:"request_id,omitempty"` + ChannelName string `json:"channel_name,omitempty"` + Collection string `json:"collection,omitempty"` + FileName string `json:"file_name,omitempty"` +} + +type VectorDocumentUpload struct { RequestId string `form:"request_id,omitempty" json:"request_id,omitempty"` ChannelName string `form:"channel_name,omitempty" json:"channel_name,omitempty"` File *multipart.FileHeader `form:"file" binding:"required"` - Uid uint32 `form:"uid,omitempty" json:"uid,omitempty"` } func NewHttpServer(httpServerConfig *HttpServerConfig) *HttpServer { @@ -235,50 +242,111 @@ func (s *HttpServer) handlerGenerateToken(c *gin.Context) { s.output(c, codeSuccess, map[string]any{"appId": s.config.AppId, "token": token, "channel_name": req.ChannelName, "uid": req.Uid}) } -func (s *HttpServer) handlerUpload(c *gin.Context) { - var req UploadReq +func (s *HttpServer) handlerVectorDocumentPresetList(c *gin.Context) { + presetList := []map[string]any{} + vectorDocumentPresetList := os.Getenv("VECTOR_DOCUMENT_PRESET_LIST") + + if vectorDocumentPresetList != "" { + err := json.Unmarshal([]byte(vectorDocumentPresetList), &presetList) + if err != nil { + slog.Error("handlerVectorDocumentPresetList parse json failed", "err", err, logTag) + s.output(c, codeErrParseJsonFailed, http.StatusBadRequest) + return + } + } + + s.output(c, codeSuccess, presetList) +} + +func (s *HttpServer) handlerVectorDocumentUpdate(c *gin.Context) { + var req VectorDocumentUpdate if err := c.ShouldBind(&req); err != nil { - slog.Error("handlerUpload params invalid", "err", err, "channelName", req.ChannelName, "requestId", req.RequestId, logTag) + slog.Error("handlerVectorDocumentUpdate params invalid", "err", err, "channelName", req.ChannelName, "requestId", req.RequestId, logTag) s.output(c, codeErrParamsInvalid, http.StatusBadRequest) return } if !workers.Contains(req.ChannelName) { - slog.Error("handlerUpload channel not existed", "channelName", req.ChannelName, "requestId", req.RequestId, logTag) + slog.Error("handlerVectorDocumentUpdate channel not existed", "channelName", req.ChannelName, "requestId", req.RequestId, logTag) s.output(c, codeErrChannelNotExisted, http.StatusBadRequest) return } - slog.Info("handlerUpload start", "channelName", req.ChannelName, "requestId", req.RequestId, logTag) + slog.Info("handlerVectorDocumentUpdate start", "channelName", req.ChannelName, "requestId", req.RequestId, logTag) + + // update worker + worker := workers.Get(req.ChannelName).(*Worker) + err := worker.update(&WorkerUpdateReq{ + RequestId: req.RequestId, + ChannelName: req.ChannelName, + Collection: req.Collection, + FileName: req.FileName, + Rte: &WorkerUpdateReqRte{ + Name: "update_querying_collection", + Type: "cmd", + }, + }) + if err != nil { + slog.Error("handlerVectorDocumentUpdate update worker failed", "err", err, "channelName", req.ChannelName, "Collection", req.Collection, "FileName", req.FileName, "requestId", req.RequestId, logTag) + s.output(c, codeErrUpdateWorkerFailed, http.StatusBadRequest) + return + } + + slog.Info("handlerVectorDocumentUpdate end", "channelName", req.ChannelName, "Collection", req.Collection, "FileName", req.FileName, "requestId", req.RequestId, logTag) + s.output(c, codeSuccess, map[string]any{"channel_name": req.ChannelName}) +} + +func (s *HttpServer) handlerVectorDocumentUpload(c *gin.Context) { + var req VectorDocumentUpload + + if err := c.ShouldBind(&req); err != nil { + slog.Error("handlerVectorDocumentUpload params invalid", "err", err, "channelName", req.ChannelName, "requestId", req.RequestId, logTag) + s.output(c, codeErrParamsInvalid, http.StatusBadRequest) + return + } + + if !workers.Contains(req.ChannelName) { + slog.Error("handlerVectorDocumentUpload channel not existed", "channelName", req.ChannelName, "requestId", req.RequestId, logTag) + s.output(c, codeErrChannelNotExisted, http.StatusBadRequest) + return + } + + slog.Info("handlerVectorDocumentUpload start", "channelName", req.ChannelName, "requestId", req.RequestId, logTag) file := req.File uploadFile := fmt.Sprintf("%s/file-%s-%d%s", s.config.LogPath, gmd5.MustEncryptString(req.ChannelName), time.Now().UnixNano(), filepath.Ext(file.Filename)) if err := c.SaveUploadedFile(file, uploadFile); err != nil { - slog.Error("handlerUpload save file failed", "err", err, "channelName", req.ChannelName, "requestId", req.RequestId, logTag) + slog.Error("handlerVectorDocumentUpload save file failed", "err", err, "channelName", req.ChannelName, "requestId", req.RequestId, logTag) s.output(c, codeErrSaveFileFailed, http.StatusBadRequest) return } + // Generate collection + collection := fmt.Sprintf("%s_%d", gmd5.MustEncryptString(req.ChannelName), time.Now().UnixNano()) + fileName := filepath.Base(file.Filename) + // update worker worker := workers.Get(req.ChannelName).(*Worker) err := worker.update(&WorkerUpdateReq{ RequestId: req.RequestId, ChannelName: req.ChannelName, + Collection: collection, + FileName: fileName, Path: uploadFile, Rte: &WorkerUpdateReqRte{ - Name: "file_download", + Name: "file_chunk", Type: "cmd", }, }) if err != nil { - slog.Error("handlerUpload update worker failed", "err", err, "channelName", req.ChannelName, "requestId", req.RequestId, logTag) + slog.Error("handlerVectorDocumentUpload update worker failed", "err", err, "channelName", req.ChannelName, "requestId", req.RequestId, logTag) s.output(c, codeErrUpdateWorkerFailed, http.StatusBadRequest) return } - slog.Info("handlerUpload end", "channelName", req.ChannelName, "uploadFile", uploadFile, "requestId", req.RequestId, logTag) - s.output(c, codeSuccess, map[string]any{"channel_name": req.ChannelName, "uid": req.Uid}) + slog.Info("handlerVectorDocumentUpload end", "channelName", req.ChannelName, "collection", collection, "uploadFile", uploadFile, "requestId", req.RequestId, logTag) + s.output(c, codeSuccess, map[string]any{"channel_name": req.ChannelName, "collection": collection, "file_name": fileName}) } func (s *HttpServer) output(c *gin.Context, code *Code, data any, httpStatus ...int) { @@ -301,7 +369,7 @@ func (s *HttpServer) processProperty(req *StartReq) (propertyJsonFile string, lo // Get graph name graphName := req.GraphName if graphName == "" { - graphName = graphNameDefault + graphName = graphNameMap[req.AgoraAsrLanguage] } // Generate token @@ -349,7 +417,9 @@ func (s *HttpServer) Start() { r.POST("/start", s.handlerStart) r.POST("/stop", s.handlerStop) r.POST("/token/generate", s.handlerGenerateToken) - r.POST("/upload", s.handlerUpload) + r.GET("/vector/document/preset/list", s.handlerVectorDocumentPresetList) + r.POST("/vector/document/update", s.handlerVectorDocumentUpdate) + r.POST("/vector/document/upload", s.handlerVectorDocumentUpload) slog.Info("server start", "port", s.config.Port, logTag) diff --git a/server/internal/worker.go b/server/internal/worker.go index c87e6981..583e5aa7 100644 --- a/server/internal/worker.go +++ b/server/internal/worker.go @@ -30,6 +30,8 @@ type Worker struct { type WorkerUpdateReq struct { RequestId string `form:"request_id,omitempty" json:"request_id,omitempty"` ChannelName string `form:"channel_name,omitempty" json:"channel_name,omitempty"` + Collection string `form:"collection,omitempty" json:"collection,omitempty"` + FileName string `form:"filename,omitempty" json:"filename,omitempty"` Path string `form:"path,omitempty" json:"path,omitempty"` Rte *WorkerUpdateReqRte `form:"rte,omitempty" json:"rte,omitempty"` } diff --git a/server/main.go b/server/main.go index 442e6506..d590b89d 100644 --- a/server/main.go +++ b/server/main.go @@ -20,6 +20,15 @@ func main() { slog.Warn("load .env file failed", "err", err) } + // Check if the directory exists + logPath := os.Getenv("LOG_PATH") + if _, err := os.Stat(logPath); os.IsNotExist(err) { + if err := os.MkdirAll(logPath, os.ModePerm); err != nil { + slog.Error("create log directory failed", "err", err) + os.Exit(1) + } + } + // Check environment agoraAppId := os.Getenv("AGORA_APP_ID") if len(agoraAppId) != 32 { @@ -49,7 +58,7 @@ func main() { httpServerConfig := &internal.HttpServerConfig{ AppId: agoraAppId, AppCertificate: os.Getenv("AGORA_APP_CERTIFICATE"), - LogPath: os.Getenv("LOG_PATH"), + LogPath: logPath, Port: os.Getenv("SERVER_PORT"), WorkersMax: workersMax, WorkerQuitTimeoutSeconds: workerQuitTimeoutSeconds,