Skip to content

Commit

Permalink
chore(): add api for /vector/document (#193)
Browse files Browse the repository at this point in the history
* chore(): add api for /vector/document

* fix(): fix mapping
  • Loading branch information
sunshinexcode authored Aug 12, 2024
1 parent bd29cfa commit f175b8c
Show file tree
Hide file tree
Showing 7 changed files with 171 additions and 37 deletions.
18 changes: 17 additions & 1 deletion .env.example
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,12 @@
# ------------------------------

# Log path
LOG_PATH=/tmp
LOG_PATH=/tmp/astra
# Graph designer server port
GRAPH_DESIGNER_SERVER_PORT=49483
# Graph name used for each language when a request does not specify one (ZH: zh-CN, EN: en-US)
GRAPH_NAME_ZH=va.openai.azure
GRAPH_NAME_EN=va.openai.azure
# Server port
SERVER_PORT=8080
# Maximum number of workers
Expand All @@ -26,6 +29,19 @@ AGORA_APP_CERTIFICATE=
# Worker Configuration
# ------------------------------

# Extension: aliyun_analyticdb_vector_storage
ALIBABA_CLOUD_ACCESS_KEY_ID=
ALIBABA_CLOUD_ACCESS_KEY_SECRET=
ALIYUN_ANALYTICDB_ACCOUNT=
ALIYUN_ANALYTICDB_ACCOUNT_PASSWORD=
ALIYUN_ANALYTICDB_INSTANCE_ID=
ALIYUN_ANALYTICDB_INSTANCE_REGION=cn-shanghai
ALIYUN_ANALYTICDB_NAMESPACE=
ALIYUN_ANALYTICDB_NAMESPACE_PASSWORD=

# Extension: aliyun_text_embedding
ALIYUN_TEXT_EMBEDDING_API_KEY=

# Extension: bedrock_llm
# Extension: polly_tts
AWS_ACCESS_KEY_ID=
Expand Down
14 changes: 7 additions & 7 deletions agents/property.json.example
Original file line number Diff line number Diff line change
Expand Up @@ -1465,7 +1465,7 @@
"type": "extension",
"extension_group": "embedding",
"addon": "aliyun_text_embedding",
"name": "embedding",
"name": "aliyun_text_embedding",
"property": {
"api_key": "<aliyun_text_embedding_api_key>",
"model": "text-embedding-v3"
Expand All @@ -1475,7 +1475,7 @@
"type": "extension",
"extension_group": "vector_storage",
"addon": "aliyun_analyticdb_vector_storage",
"name": "vector_storage",
"name": "aliyun_analyticdb_vector_storage",
"property": {
"alibaba_cloud_access_key_id": "<alibaba_cloud_access_key_id>",
"alibaba_cloud_access_key_secret": "<alibaba_cloud_access_key_secret>",
Expand Down Expand Up @@ -1617,7 +1617,7 @@
"dest": [
{
"extension_group": "embedding",
"extension": "embedding"
"extension": "aliyun_text_embedding"
}
]
},
Expand All @@ -1626,7 +1626,7 @@
"dest": [
{
"extension_group": "vector_storage",
"extension": "vector_storage"
"extension": "aliyun_analyticdb_vector_storage"
}
]
}
Expand Down Expand Up @@ -1701,7 +1701,7 @@
"dest": [
{
"extension_group": "embedding",
"extension": "embedding"
"extension": "aliyun_text_embedding"
}
]
},
Expand All @@ -1710,7 +1710,7 @@
"dest": [
{
"extension_group": "vector_storage",
"extension": "vector_storage"
"extension": "aliyun_analyticdb_vector_storage"
}
]
},
Expand All @@ -1719,7 +1719,7 @@
"dest": [
{
"extension_group": "vector_storage",
"extension": "vector_storage"
"extension": "aliyun_analyticdb_vector_storage"
}
]
},
Expand Down
1 change: 1 addition & 0 deletions server/internal/code.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ var (
codeErrChannelEmpty = NewCode("10004", "channel empty")
codeErrGenerateTokenFailed = NewCode("10005", "generate token failed")
codeErrSaveFileFailed = NewCode("10006", "save file failed")
codeErrParseJsonFailed = NewCode("10007", "parse json failed")

codeErrProcessPropertyFailed = NewCode("10100", "process property json failed")
codeErrStartWorkerFailed = NewCode("10101", "start worker failed")
Expand Down
62 changes: 49 additions & 13 deletions server/internal/config.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
package internal

import "log/slog"
import (
"log/slog"
"os"
)

type Prop struct {
ExtensionName string
Expand All @@ -9,22 +12,22 @@ type Prop struct {

const (
// Extension name
extensionNameAgoraRTC = "agora_rtc"
extensionNameAzureTTS = "azure_tts"
extensionNameBedrockLLM = "bedrock_llm"
extensionNameCosyTTS = "cosy_tts"
extensionNameElevenlabsTTS = "elevenlabs_tts"
extensionNameHttpServer = "http_server"
extensionNameLiteLLM = "litellm"
extensionNameOpenaiChatgpt = "openai_chatgpt"
extensionNamePollyTTS = "polly_tts"
extensionNameQwenLLM = "qwen_llm"
extensionNameAgoraRTC = "agora_rtc"
extensionNameAliyunAnalyticdbVectorStorage = "aliyun_analyticdb_vector_storage"
extensionNameAliyunTextEmbedding = "aliyun_text_embedding"
extensionNameAzureTTS = "azure_tts"
extensionNameBedrockLLM = "bedrock_llm"
extensionNameCosyTTS = "cosy_tts"
extensionNameElevenlabsTTS = "elevenlabs_tts"
extensionNameHttpServer = "http_server"
extensionNameLiteLLM = "litellm"
extensionNameOpenaiChatgpt = "openai_chatgpt"
extensionNamePollyTTS = "polly_tts"
extensionNameQwenLLM = "qwen_llm"

// Language
languageChinese = "zh-CN"
languageEnglish = "en-US"
// Default graph name
graphNameDefault = "va.openai.azure"
// Property json
PropertyJsonFile = "./agents/property.json"
// Token expire time
Expand All @@ -42,6 +45,33 @@ var (
"AGORA_APP_ID": {
{ExtensionName: extensionNameAgoraRTC, Property: "app_id"},
},
"ALIBABA_CLOUD_ACCESS_KEY_ID": {
{ExtensionName: extensionNameAliyunAnalyticdbVectorStorage, Property: "alibaba_cloud_access_key_id"},
},
"ALIBABA_CLOUD_ACCESS_KEY_SECRET": {
{ExtensionName: extensionNameAliyunAnalyticdbVectorStorage, Property: "alibaba_cloud_access_key_secret"},
},
"ALIYUN_ANALYTICDB_ACCOUNT": {
{ExtensionName: extensionNameAliyunAnalyticdbVectorStorage, Property: "adbpg_account"},
},
"ALIYUN_ANALYTICDB_ACCOUNT_PASSWORD": {
{ExtensionName: extensionNameAliyunAnalyticdbVectorStorage, Property: "adbpg_account_password"},
},
"ALIYUN_ANALYTICDB_INSTANCE_ID": {
{ExtensionName: extensionNameAliyunAnalyticdbVectorStorage, Property: "adbpg_instance_id"},
},
"ALIYUN_ANALYTICDB_INSTANCE_REGION": {
{ExtensionName: extensionNameAliyunAnalyticdbVectorStorage, Property: "adbpg_instance_region"},
},
"ALIYUN_ANALYTICDB_NAMESPACE": {
{ExtensionName: extensionNameAliyunAnalyticdbVectorStorage, Property: "adbpg_namespace"},
},
"ALIYUN_ANALYTICDB_NAMESPACE_PASSWORD": {
{ExtensionName: extensionNameAliyunAnalyticdbVectorStorage, Property: "adbpg_namespace_password"},
},
"ALIYUN_TEXT_EMBEDDING_API_KEY": {
{ExtensionName: extensionNameAliyunTextEmbedding, Property: "api_key"},
},
"AWS_ACCESS_KEY_ID": {
{ExtensionName: extensionNameBedrockLLM, Property: "access_key"},
{ExtensionName: extensionNamePollyTTS, Property: "access_key"},
Expand Down Expand Up @@ -99,6 +129,12 @@ var (
},
}

// graphNameMap maps a request language to the graph name that
// processProperty falls back to when the request carries no graph name.
//
// The values are read from GRAPH_NAME_ZH / GRAPH_NAME_EN while this
// package-level variable is being initialized, so they are captured once and
// later changes to the environment are not picked up.
// NOTE(review): if these variables are only supplied through a .env file
// that is loaded after package initialization, both entries would be empty —
// confirm the load order.
// Looking up a language that is not a key here, or whose variable is unset,
// yields the empty string.
graphNameMap = map[string]string{
languageChinese: os.Getenv("GRAPH_NAME_ZH"),
languageEnglish: os.Getenv("GRAPH_NAME_EN"),
}

// Retrieve parameters from the request and map them to the property.json file
startPropMap = map[string][]Prop{
"AgoraAsrLanguage": {
Expand Down
100 changes: 85 additions & 15 deletions server/internal/http_server.go
Original file line number Diff line number Diff line change
@@ -1,13 +1,14 @@
/**
*
* Agora Real Time Engagement
* Created by Xinhui Li in 2024.
* Created by XinHui Li in 2024.
* Copyright (c) 2024 Agora IO. All rights reserved.
*
*/
package internal

import (
"encoding/json"
"fmt"
"log/slog"
"mime/multipart"
Expand Down Expand Up @@ -67,11 +68,17 @@ type GenerateTokenReq struct {
Uid uint32 `json:"uid,omitempty"`
}

type UploadReq struct {
// VectorDocumentUpdate is the request payload bound by
// handlerVectorDocumentUpdate. Its fields are forwarded to the worker
// registered for ChannelName as an "update_querying_collection" command.
type VectorDocumentUpdate struct {
// RequestId is included in the handler's log entries and passed on to the
// worker.
RequestId string `json:"request_id,omitempty"`
// ChannelName selects the worker; the handler rejects the request when no
// worker is registered under this name.
ChannelName string `json:"channel_name,omitempty"`
// Collection is passed to the worker unchanged.
// NOTE(review): presumably a collection name as returned by the upload
// endpoint — confirm.
Collection string `json:"collection,omitempty"`
// FileName is passed to the worker unchanged. The key here is "file_name",
// whereas WorkerUpdateReq tags the same value as "filename".
FileName string `json:"file_name,omitempty"`
}

type VectorDocumentUpload struct {
RequestId string `form:"request_id,omitempty" json:"request_id,omitempty"`
ChannelName string `form:"channel_name,omitempty" json:"channel_name,omitempty"`
File *multipart.FileHeader `form:"file" binding:"required"`
Uid uint32 `form:"uid,omitempty" json:"uid,omitempty"`
}

func NewHttpServer(httpServerConfig *HttpServerConfig) *HttpServer {
Expand Down Expand Up @@ -235,50 +242,111 @@ func (s *HttpServer) handlerGenerateToken(c *gin.Context) {
s.output(c, codeSuccess, map[string]any{"appId": s.config.AppId, "token": token, "channel_name": req.ChannelName, "uid": req.Uid})
}

func (s *HttpServer) handlerUpload(c *gin.Context) {
var req UploadReq
// handlerVectorDocumentPresetList responds with the preset vector documents
// configured through the VECTOR_DOCUMENT_PRESET_LIST environment variable.
//
// The variable is decoded as a JSON list of objects. When it is unset or
// empty the handler responds with an empty list; when decoding fails it logs
// the error and responds with codeErrParseJsonFailed.
func (s *HttpServer) handlerVectorDocumentPresetList(c *gin.Context) {
	// Start from a non-nil, empty slice so the "nothing configured" case
	// still hands a list value to output.
	presets := []map[string]any{}

	raw := os.Getenv("VECTOR_DOCUMENT_PRESET_LIST")
	if raw == "" {
		s.output(c, codeSuccess, presets)
		return
	}

	if err := json.Unmarshal([]byte(raw), &presets); err != nil {
		slog.Error("handlerVectorDocumentPresetList parse json failed", "err", err, logTag)
		s.output(c, codeErrParseJsonFailed, http.StatusBadRequest)
		return
	}

	s.output(c, codeSuccess, presets)
}

func (s *HttpServer) handlerVectorDocumentUpdate(c *gin.Context) {
var req VectorDocumentUpdate

if err := c.ShouldBind(&req); err != nil {
slog.Error("handlerUpload params invalid", "err", err, "channelName", req.ChannelName, "requestId", req.RequestId, logTag)
slog.Error("handlerVectorDocumentUpdate params invalid", "err", err, "channelName", req.ChannelName, "requestId", req.RequestId, logTag)
s.output(c, codeErrParamsInvalid, http.StatusBadRequest)
return
}

if !workers.Contains(req.ChannelName) {
slog.Error("handlerUpload channel not existed", "channelName", req.ChannelName, "requestId", req.RequestId, logTag)
slog.Error("handlerVectorDocumentUpdate channel not existed", "channelName", req.ChannelName, "requestId", req.RequestId, logTag)
s.output(c, codeErrChannelNotExisted, http.StatusBadRequest)
return
}

slog.Info("handlerUpload start", "channelName", req.ChannelName, "requestId", req.RequestId, logTag)
slog.Info("handlerVectorDocumentUpdate start", "channelName", req.ChannelName, "requestId", req.RequestId, logTag)

// update worker
worker := workers.Get(req.ChannelName).(*Worker)
err := worker.update(&WorkerUpdateReq{
RequestId: req.RequestId,
ChannelName: req.ChannelName,
Collection: req.Collection,
FileName: req.FileName,
Rte: &WorkerUpdateReqRte{
Name: "update_querying_collection",
Type: "cmd",
},
})
if err != nil {
slog.Error("handlerVectorDocumentUpdate update worker failed", "err", err, "channelName", req.ChannelName, "Collection", req.Collection, "FileName", req.FileName, "requestId", req.RequestId, logTag)
s.output(c, codeErrUpdateWorkerFailed, http.StatusBadRequest)
return
}

slog.Info("handlerVectorDocumentUpdate end", "channelName", req.ChannelName, "Collection", req.Collection, "FileName", req.FileName, "requestId", req.RequestId, logTag)
s.output(c, codeSuccess, map[string]any{"channel_name": req.ChannelName})
}

func (s *HttpServer) handlerVectorDocumentUpload(c *gin.Context) {
var req VectorDocumentUpload

if err := c.ShouldBind(&req); err != nil {
slog.Error("handlerVectorDocumentUpload params invalid", "err", err, "channelName", req.ChannelName, "requestId", req.RequestId, logTag)
s.output(c, codeErrParamsInvalid, http.StatusBadRequest)
return
}

if !workers.Contains(req.ChannelName) {
slog.Error("handlerVectorDocumentUpload channel not existed", "channelName", req.ChannelName, "requestId", req.RequestId, logTag)
s.output(c, codeErrChannelNotExisted, http.StatusBadRequest)
return
}

slog.Info("handlerVectorDocumentUpload start", "channelName", req.ChannelName, "requestId", req.RequestId, logTag)

file := req.File
uploadFile := fmt.Sprintf("%s/file-%s-%d%s", s.config.LogPath, gmd5.MustEncryptString(req.ChannelName), time.Now().UnixNano(), filepath.Ext(file.Filename))
if err := c.SaveUploadedFile(file, uploadFile); err != nil {
slog.Error("handlerUpload save file failed", "err", err, "channelName", req.ChannelName, "requestId", req.RequestId, logTag)
slog.Error("handlerVectorDocumentUpload save file failed", "err", err, "channelName", req.ChannelName, "requestId", req.RequestId, logTag)
s.output(c, codeErrSaveFileFailed, http.StatusBadRequest)
return
}

// Generate collection
collection := fmt.Sprintf("%s_%d", gmd5.MustEncryptString(req.ChannelName), time.Now().UnixNano())
fileName := filepath.Base(file.Filename)

// update worker
worker := workers.Get(req.ChannelName).(*Worker)
err := worker.update(&WorkerUpdateReq{
RequestId: req.RequestId,
ChannelName: req.ChannelName,
Collection: collection,
FileName: fileName,
Path: uploadFile,
Rte: &WorkerUpdateReqRte{
Name: "file_download",
Name: "file_chunk",
Type: "cmd",
},
})
if err != nil {
slog.Error("handlerUpload update worker failed", "err", err, "channelName", req.ChannelName, "requestId", req.RequestId, logTag)
slog.Error("handlerVectorDocumentUpload update worker failed", "err", err, "channelName", req.ChannelName, "requestId", req.RequestId, logTag)
s.output(c, codeErrUpdateWorkerFailed, http.StatusBadRequest)
return
}

slog.Info("handlerUpload end", "channelName", req.ChannelName, "uploadFile", uploadFile, "requestId", req.RequestId, logTag)
s.output(c, codeSuccess, map[string]any{"channel_name": req.ChannelName, "uid": req.Uid})
slog.Info("handlerVectorDocumentUpload end", "channelName", req.ChannelName, "collection", collection, "uploadFile", uploadFile, "requestId", req.RequestId, logTag)
s.output(c, codeSuccess, map[string]any{"channel_name": req.ChannelName, "collection": collection, "file_name": fileName})
}

func (s *HttpServer) output(c *gin.Context, code *Code, data any, httpStatus ...int) {
Expand All @@ -301,7 +369,7 @@ func (s *HttpServer) processProperty(req *StartReq) (propertyJsonFile string, lo
// Get graph name
graphName := req.GraphName
if graphName == "" {
graphName = graphNameDefault
graphName = graphNameMap[req.AgoraAsrLanguage]
}

// Generate token
Expand Down Expand Up @@ -349,7 +417,9 @@ func (s *HttpServer) Start() {
r.POST("/start", s.handlerStart)
r.POST("/stop", s.handlerStop)
r.POST("/token/generate", s.handlerGenerateToken)
r.POST("/upload", s.handlerUpload)
r.GET("/vector/document/preset/list", s.handlerVectorDocumentPresetList)
r.POST("/vector/document/update", s.handlerVectorDocumentUpdate)
r.POST("/vector/document/upload", s.handlerVectorDocumentUpload)

slog.Info("server start", "port", s.config.Port, logTag)

Expand Down
2 changes: 2 additions & 0 deletions server/internal/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,8 @@ type Worker struct {
// WorkerUpdateReq describes an update handed to a worker via Worker.update.
// The HTTP handlers build it from the incoming request plus the command
// (Rte) the worker should execute.
type WorkerUpdateReq struct {
// RequestId is copied from the originating HTTP request.
RequestId string `form:"request_id,omitempty" json:"request_id,omitempty"`
// ChannelName is the channel whose worker receives the update.
ChannelName string `form:"channel_name,omitempty" json:"channel_name,omitempty"`
// Collection is the collection name: generated by the upload handler, or
// taken from the request by the update handler.
Collection string `form:"collection,omitempty" json:"collection,omitempty"`
// FileName is the document's file name. The key is "filename" here, not
// "file_name" as in the HTTP request/response payloads.
FileName string `form:"filename,omitempty" json:"filename,omitempty"`
// Path is the location of the saved upload; only the upload handler sets it.
Path string `form:"path,omitempty" json:"path,omitempty"`
// Rte names the command to run and its type (the handlers use type "cmd").
Rte *WorkerUpdateReqRte `form:"rte,omitempty" json:"rte,omitempty"`
}
Expand Down
Loading

0 comments on commit f175b8c

Please sign in to comment.