diff --git a/.devcontainer/devcontainer.json b/.devcontainer/devcontainer.json index 0482866c..ddab0fa2 100644 --- a/.devcontainer/devcontainer.json +++ b/.devcontainer/devcontainer.json @@ -3,29 +3,21 @@ { "name": "astra", "image": "ghcr.io/rte-design/astra_agents_build", - "customizations": { "vscode": { - "extensions": ["golang.go"] + "extensions": [ + "golang.go" + ] } }, - - "workspaceMount": "source=${localWorkspaceFolder},target=/app,type=bind", + "workspaceMount": "source=${localWorkspaceFolder},target=/app,type=bind", "workspaceFolder": "/app", - // Use 'forwardPorts' to make a list of ports inside the container available locally. - "forwardPorts": [8080], - + "forwardPorts": [ + 8080 + ], // Features to add to the dev container. More info: https://containers.dev/features. "features": { "ghcr.io/devcontainers/features/git:1": {} } - - // Uncomment the next line to run commands after the container is created. - // "postCreateCommand": "" - - // Configure tool-specific properties. - // "customizations": {}, - // Uncomment to connect as an existing user other than the container default. More info: https://aka.ms/dev-containers-non-root. 
- // "remoteUser": "devcontainer" } \ No newline at end of file diff --git a/.env.example b/.env.example new file mode 100644 index 00000000..71ab8930 --- /dev/null +++ b/.env.example @@ -0,0 +1,64 @@ +# ------------------------------ +# Environment Variables for server & worker +# ------------------------------ + +# ------------------------------ +# Server Configuration +# ------------------------------ + +# Log path +LOG_PATH=/tmp +# Server port +SERVER_PORT=8080 +# Maximum number of workers +WORKERS_MAX=100 +# Worker quit timeout in seconds +WORKER_QUIT_TIMEOUT_SECONDES=60 + +# Agora App ID and Agora App Certificate +# required: AGORA_APP_ID must be set; AGORA_APP_CERTIFICATE is optional +AGORA_APP_ID= +AGORA_APP_CERTIFICATE= + +# ------------------------------ +# Worker Configuration +# ------------------------------ + +# Extension: bedrock_llm +AWS_ACCESS_KEY_ID= +AWS_SECRET_ACCESS_KEY= +AWS_BEDROCK_MODEL= +AWS_REGION= + +# Extension: agora_rtc +# Azure STT key and region +AZURE_STT_KEY= +AZURE_STT_REGION= + +# Extension: azure_tts +# Azure TTS key and region +AZURE_TTS_KEY= +AZURE_TTS_REGION= + +# Extension: cosy_tts +# Cosy TTS key +COSY_TTS_KEY= + +# Extension: elevenlabs_tts +# ElevenLabs TTS key +ELEVENLABS_TTS_KEY= + +# Extension: openai_chatgpt +# OpenAI API key +OPENAI_API_KEY= +# OpenAI base URL +# if using OpenAI, keep default. 
if using another OpenAI-compatible provider, set it to that provider's address +OPENAI_BASE_URL= +# OpenAI Model +OPENAI_MODEL=gpt-4o-mini +# OpenAI proxy URL +OPENAI_PROXY_URL= + +# Extension: qwen_llm +# Qwen API key +QWEN_API_KEY= diff --git a/.gitignore b/.gitignore index b7455c79..8a1ca581 100644 --- a/.gitignore +++ b/.gitignore @@ -13,7 +13,6 @@ bin/ core crash_context_v1 .deps/ -docker-compose.yml .DS_Store .env /.gn @@ -22,9 +21,6 @@ include/ interface/ lib/ lib64 -agents/manifest.elevenlabs.json -agents/manifest.cn.json -agents/manifest.en.json node_modules/ /out/ *.pcm @@ -35,4 +31,4 @@ xdump_config speechsdk/ SpeechSDK-Linux.tar.gz pyvenv.cfg -xdump_config \ No newline at end of file +xdump_config diff --git a/Dockerfile b/Dockerfile index 621b9f6d..c3ca836c 100644 --- a/Dockerfile +++ b/Dockerfile @@ -5,11 +5,10 @@ ARG SESSION_CONTROL_CONF=session_control.conf WORKDIR /app COPY . . -COPY agents/manifest.json.example agents/manifest.json -COPY agents/manifest.json.elevenlabs.example agents/manifest.elevenlabs.json +COPY agents/property.json.example agents/property.json COPY agents/${SESSION_CONTROL_CONF} agents/session_control.conf -RUN make build && \ +RUN make clean && make build && \ cd agents && ./scripts/package.sh FROM ubuntu:22.04 diff --git a/agents/property.json.graphs_examples b/agents/property.json.example similarity index 50% rename from agents/property.json.graphs_examples rename to agents/property.json.example index bcf2b706..a1942b63 100644 --- a/agents/property.json.graphs_examples +++ b/agents/property.json.example @@ -2,7 +2,7 @@ "rte": { "predefined_graphs": [ { - "name": "astra_agents", + "name": "astra_agents_cn", "auto_start": true, "nodes": [ { @@ -230,6 +230,231 @@ ] } ] + }, + { + "name": "astra_agents_en", + "auto_start": false, + "nodes": [ + { + "type": "extension", + "extension_group": "default", + "addon": "agora_rtc", + "name": "agora_rtc", + "property": { + "app_id": "", + "token": "", + "channel": 
"astra_agents_test", + "stream_id": 1234, + "remote_stream_id": 123, + "subscribe_audio": true, + "publish_audio": true, + "publish_data": true, + "enable_agora_asr": true, + "agora_asr_vendor_name": "microsoft", + "agora_asr_language": "en-US", + "agora_asr_vendor_key": "", + "agora_asr_vendor_region": "", + "agora_asr_session_control_file_path": "session_control.conf" + } + }, + { + "type": "extension", + "extension_group": "default", + "addon": "interrupt_detector", + "name": "interrupt_detector" + }, + { + "type": "extension", + "extension_group": "chatgpt", + "addon": "openai_chatgpt", + "name": "openai_chatgpt", + "property": { + "base_url": "", + "api_key": "", + "frequency_penalty": 0.9, + "model": "gpt-3.5-turbo", + "max_tokens": 512, + "prompt": "", + "proxy_url": "", + "greeting": "ASTRA agent connected. How can i help you today?", + "max_memory_length": 10 + } + }, + { + "type": "extension", + "extension_group": "tts", + "addon": "elevenlabs_tts", + "name": "elevenlabs_tts", + "property": { + "api_key": "", + "model_id": "eleven_multilingual_v2", + "optimize_streaming_latency": 0, + "request_timeout_seconds": 30, + "similarity_boost": 0.75, + "speaker_boost": false, + "stability": 0.5, + "style": 0.0, + "voice_id": "pNInz6obpgDQGcFmaJgB" + } + }, + { + "type": "extension", + "extension_group": "transcriber", + "addon": "chat_transcriber", + "name": "chat_transcriber" + }, + { + "type": "extension_group", + "addon": "default_extension_group", + "name": "default" + }, + { + "type": "extension_group", + "addon": "default_extension_group", + "name": "chatgpt" + }, + { + "type": "extension_group", + "addon": "default_extension_group", + "name": "tts" + }, + { + "type": "extension_group", + "addon": "default_extension_group", + "name": "transcriber" + } + ], + "connections": [ + { + "extension_group": "default", + "extension": "agora_rtc", + "data": [ + { + "name": "text_data", + "dest": [ + { + "extension_group": "default", + "extension": 
"interrupt_detector" + }, + { + "extension_group": "chatgpt", + "extension": "openai_chatgpt" + }, + { + "extension_group": "transcriber", + "extension": "chat_transcriber" + } + ] + } + ] + }, + { + "extension_group": "chatgpt", + "extension": "openai_chatgpt", + "data": [ + { + "name": "text_data", + "dest": [ + { + "extension_group": "tts", + "extension": "elevenlabs_tts" + }, + { + "extension_group": "transcriber", + "extension": "chat_transcriber", + "cmd_conversions": [ + { + "cmd": { + "type": "per_property", + "keep_original": true, + "rules": [ + { + "path": "is_final", + "type": "fixed_value", + "value": "bool(true)" + }, + { + "path": "stream_id", + "type": "fixed_value", + "value": "uint32(999)" + } + ] + } + } + ] + } + ] + } + ], + "cmd": [ + { + "name": "flush", + "dest": [ + { + "extension_group": "tts", + "extension": "elevenlabs_tts" + } + ] + } + ] + }, + { + "extension_group": "tts", + "extension": "elevenlabs_tts", + "pcm_frame": [ + { + "name": "pcm_frame", + "dest": [ + { + "extension_group": "default", + "extension": "agora_rtc" + } + ] + } + ], + "cmd": [ + { + "name": "flush", + "dest": [ + { + "extension_group": "default", + "extension": "agora_rtc" + } + ] + } + ] + }, + { + "extension_group": "transcriber", + "extension": "chat_transcriber", + "data": [ + { + "name": "data", + "dest": [ + { + "extension_group": "default", + "extension": "agora_rtc" + } + ] + } + ] + }, + { + "extension_group": "default", + "extension": "interrupt_detector", + "cmd": [ + { + "name": "flush", + "dest": [ + { + "extension_group": "chatgpt", + "extension": "openai_chatgpt" + } + ] + } + ] + } + ] } ] } diff --git a/agents/scripts/package.sh b/agents/scripts/package.sh index 4c3ba1d0..eedb734d 100755 --- a/agents/scripts/package.sh +++ b/agents/scripts/package.sh @@ -1,6 +1,9 @@ #!/usr/bin/env bash -APP_HOME=$(cd $(dirname $0)/..; pwd) +APP_HOME=$( + cd $(dirname $0)/.. 
+ pwd +) cd $APP_HOME @@ -8,49 +11,46 @@ rm -rf .release mkdir .release copy_extension() { - local extension=$1 - mkdir -p .release/addon/extension/$extension + local extension=$1 + mkdir -p .release/addon/extension/$extension - if [[ -d addon/extension/$extension/lib ]]; then - cp -r addon/extension/$extension/lib .release/addon/extension/$extension/ - fi + if [[ -d addon/extension/$extension/lib ]]; then + cp -r addon/extension/$extension/lib .release/addon/extension/$extension/ + fi + + if [[ -f addon/extension/$extension/manifest.json ]]; then + cp addon/extension/$extension/manifest.json .release/addon/extension/$extension/ - if [[ -f addon/extension/$extension/manifest.json ]]; then - cp addon/extension/$extension/manifest.json .release/addon/extension/$extension/ + # package .py for python extensions + EXTENSION_LANGUAGE=$(jq -r '.language' addon/extension/$extension/manifest.json) + if [[ $EXTENSION_LANGUAGE == "python" ]]; then + # TODO: package 'publish' contents only + cp addon/extension/$extension/*.py .release/addon/extension/$extension/ + if [[ -f addon/extension/$extension/requirements.txt ]]; then + cp addon/extension/$extension/requirements.txt .release/addon/extension/$extension/ + fi - # package .py for python extensions - EXTENSION_LANGUAGE=$(jq -r '.language' addon/extension/$extension/manifest.json) - if [[ $EXTENSION_LANGUAGE == "python" ]]; then - # TODO: package 'publish' contents only - cp addon/extension/$extension/*.py .release/addon/extension/$extension/ - if [[ -f addon/extension/$extension/requirements.txt ]]; then - cp addon/extension/$extension/requirements.txt .release/addon/extension/$extension/ - fi - - # TODO: copy specific contents - if [[ -d addon/extension/$extension/pb ]]; then - cp -r addon/extension/$extension/pb .release/addon/extension/$extension/ - fi + # TODO: copy specific contents + if [[ -d addon/extension/$extension/pb ]]; then + cp -r addon/extension/$extension/pb .release/addon/extension/$extension/ + fi + fi fi 
- fi - if [[ -f addon/extension/$extension/property.json ]]; then - cp addon/extension/$extension/property.json .release/addon/extension/$extension/ - fi + if [[ -f addon/extension/$extension/property.json ]]; then + cp addon/extension/$extension/property.json .release/addon/extension/$extension/ + fi } cp -r bin .release cp -r lib .release cp manifest.json .release -#cp manifest.elevenlabs.json .release -cp manifest.cn.json .release -cp manifest.en.json .release cp property.json .release # python deps if [[ -d interface/rte ]]; then - mkdir -p .release/interface - cp -r interface/rte .release/interface + mkdir -p .release/interface + cp -r interface/rte .release/interface fi # extension group @@ -59,12 +59,11 @@ cp -r addon/extension_group .release/addon/ # extensions mkdir -p .release/addon/extension -for extension in addon/extension/* -do - extension_name=$(basename $extension) - copy_extension $extension_name +for extension in addon/extension/*; do + extension_name=$(basename $extension) + copy_extension $extension_name done if [[ -f session_control.conf ]]; then - cp -r session_control.conf .release/ + cp -r session_control.conf .release/ fi diff --git a/docker-compose.yml b/docker-compose.yml new file mode 100644 index 00000000..b20a1f16 --- /dev/null +++ b/docker-compose.yml @@ -0,0 +1,40 @@ +version: "3.8" + +services: + astra_agents_dev: + image: ghcr.io/rte-design/astra_agents_build:latest + container_name: astra_agents_dev + platform: linux/amd64 + tty: true + stdin_open: true + restart: always + ports: + - "${SERVER_PORT}:${SERVER_PORT}" + volumes: + - ./:/app + - ${LOG_PATH}:${LOG_PATH} + working_dir: /app + environment: + AGORA_APP_ID: ${AGORA_APP_ID} + AGORA_APP_CERTIFICATE: ${AGORA_APP_CERTIFICATE} + AWS_ACCESS_KEY_ID: ${AWS_ACCESS_KEY_ID} + AWS_SECRET_ACCESS_KEY: ${AWS_SECRET_ACCESS_KEY} + AWS_BEDROCK_MODEL: ${AWS_BEDROCK_MODEL} + AWS_REGION: ${AWS_REGION} + AZURE_STT_KEY: ${AZURE_STT_KEY} + AZURE_STT_REGION: ${AZURE_STT_REGION} + AZURE_TTS_KEY: 
${AZURE_TTS_KEY} + AZURE_TTS_REGION: ${AZURE_TTS_REGION} + COSY_TTS_KEY: ${COSY_TTS_KEY} + ELEVENLABS_TTS_KEY: ${ELEVENLABS_TTS_KEY} + OPENAI_API_KEY: ${OPENAI_API_KEY} + OPENAI_BASE_URL: ${OPENAI_BASE_URL} + OPENAI_MODEL: ${OPENAI_MODEL} + OPENAI_PROXY_URL: ${OPENAI_PROXY_URL} + QWEN_API_KEY: ${QWEN_API_KEY} + astra_playground: + image: agoraio/astra_playground:latest + container_name: astra_playground + restart: always + ports: + - "3000:3000" diff --git a/docker-compose.yml.example b/docker-compose.yml.example deleted file mode 100644 index d548cffb..00000000 --- a/docker-compose.yml.example +++ /dev/null @@ -1,34 +0,0 @@ -version: "3.8" - -services: - astra_agents_server: - image: agoraio/astra_agents_server:latest - container_name: astra_agents_server - restart: always - ports: - - "8080:8080" - volumes: - - /tmp:/tmp - environment: - # Agora App ID and Agora App Certificate - AGORA_APP_ID: - AGORA_APP_CERTIFICATE: - # Azure STT key and region - AZURE_STT_KEY: - AZURE_STT_REGION: - # OpenAI API key - OPENAI_API_KEY: - # TTS vendor: azure/elevenlabs - TTS_VENDOR_CHINESE: azure - TTS_VENDOR_ENGLISH: azure - # If you choose azure, you need to provide the following Azure STT key and region - AZURE_TTS_KEY: - AZURE_TTS_REGION: - # If you choose elevenlabs, you need to provide the following Elevenlabs TTS key - ELEVENLABS_TTS_KEY: - astra_playground: - image: agoraio/astra_playground:latest - container_name: astra_playground - restart: always - ports: - - "3000:3000" diff --git a/server/go.mod b/server/go.mod index 088a6b4b..c2324acc 100644 --- a/server/go.mod +++ b/server/go.mod @@ -7,6 +7,7 @@ require ( github.com/gin-gonic/gin v1.9.1 github.com/gogf/gf v1.16.9 github.com/google/uuid v1.6.0 + github.com/joho/godotenv v1.5.1 github.com/tidwall/gjson v1.17.1 github.com/tidwall/sjson v1.2.5 ) diff --git a/server/go.sum b/server/go.sum index 55cfe725..5be9e17a 100644 --- a/server/go.sum +++ b/server/go.sum @@ -59,6 +59,8 @@ github.com/gorilla/websocket v1.5.0 
h1:PPwGk2jz7EePpoHN/+ClbZu8SPxiqlu12wZP/3sWm github.com/gorilla/websocket v1.5.0/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE= github.com/grokify/html-strip-tags-go v0.0.1 h1:0fThFwLbW7P/kOiTBs03FsJSV9RM2M/Q/MOnCQxKMo0= github.com/grokify/html-strip-tags-go v0.0.1/go.mod h1:2Su6romC5/1VXOQMaWL2yb618ARB8iVo6/DR99A6d78= +github.com/joho/godotenv v1.5.1 h1:7eLL/+HRGLY0ldzfGMeQkb7vMd0as4CfYvUVzLqw0N0= +github.com/joho/godotenv v1.5.1/go.mod h1:f4LDr5Voq0i2e/R5DDNOoa2zzDfwtkZa6DnEwAbqwq4= github.com/json-iterator/go v1.1.12 h1:PV8peI4a0ysnczrg+LtxykD8LfKY9ML6u2jnxaEnrnM= github.com/json-iterator/go v1.1.12/go.mod h1:e30LSqwooZae/UwlEbR2852Gd8hjQvJoHmT4TnhNGBo= github.com/klauspost/cpuid/v2 v2.0.9/go.mod h1:FInQzS24/EEf25PyTYn52gqo7WaD8xa0213Md/qVLRg= diff --git a/server/internal/code.go b/server/internal/code.go index e3f56e42..dfb6f6ca 100644 --- a/server/internal/code.go +++ b/server/internal/code.go @@ -16,7 +16,7 @@ var ( codeErrChannelEmpty = NewCode("10004", "channel empty") codeErrGenerateTokenFailed = NewCode("10005", "generate token failed") - codeErrProcessManifestFailed = NewCode("10100", "process manifest json failed") + codeErrProcessPropertyFailed = NewCode("10100", "process property json failed") codeErrStartWorkerFailed = NewCode("10101", "start worker failed") codeErrStopWorkerFailed = NewCode("10102", "stop worker failed") ) diff --git a/server/internal/config.go b/server/internal/config.go new file mode 100644 index 00000000..ea14f8de --- /dev/null +++ b/server/internal/config.go @@ -0,0 +1,131 @@ +package internal + +import "log/slog" + +type Prop struct { + ExtensionName string + Property string +} + +const ( + // Extension name + extensionNameAgoraRTC = "agora_rtc" + extensionNameBedrockLLM = "bedrock_llm" + extensionNameAzureTTS = "azure_tts" + extensionNameCosyTTS = "cosy_tts" + extensionNameElevenlabsTTS = "elevenlabs_tts" + extensionNameOpenaiChatgpt = "openai_chatgpt" + extensionNameQwenLLM = "qwen_llm" + + // Language + 
languageChinese = "zh-CN" + languageEnglish = "en-US" + // Default graph name + graphNameDefault = "astra_agents_cn" + // Property json + PropertyJsonFile = "./agents/property.json" + // Token expire time + tokenExpirationInSeconds = uint32(86400) + // Voice type + voiceTypeMale = "male" + voiceTypeFemale = "female" +) + +var ( + logTag = slog.String("service", "HTTP_SERVER") + + // Retrieve configuration information from environment variables and map it to the property.json file + EnvPropMap = map[string][]Prop{ + "AGORA_APP_ID": { + {ExtensionName: extensionNameAgoraRTC, Property: "app_id"}, + }, + "AWS_ACCESS_KEY_ID": { + {ExtensionName: extensionNameBedrockLLM, Property: "access_key"}, + }, + "AWS_SECRET_ACCESS_KEY": { + {ExtensionName: extensionNameBedrockLLM, Property: "secret_key"}, + }, + "AWS_BEDROCK_MODEL": { + {ExtensionName: extensionNameBedrockLLM, Property: "model"}, + }, + "AWS_REGION": { + {ExtensionName: extensionNameBedrockLLM, Property: "region"}, + }, + "AZURE_STT_KEY": { + {ExtensionName: extensionNameAgoraRTC, Property: "agora_asr_vendor_key"}, + }, + "AZURE_STT_REGION": { + {ExtensionName: extensionNameAgoraRTC, Property: "agora_asr_vendor_region"}, + }, + "AZURE_TTS_KEY": { + {ExtensionName: extensionNameAzureTTS, Property: "azure_subscription_key"}, + }, + "AZURE_TTS_REGION": { + {ExtensionName: extensionNameAzureTTS, Property: "azure_subscription_region"}}, + "COSY_TTS_KEY": { + {ExtensionName: extensionNameCosyTTS, Property: "api_key"}, + }, + "ELEVENLABS_TTS_KEY": { + {ExtensionName: extensionNameElevenlabsTTS, Property: "api_key"}, + }, + "OPENAI_API_KEY": { + {ExtensionName: extensionNameOpenaiChatgpt, Property: "api_key"}, + }, + "OPENAI_BASE_URL": { + {ExtensionName: extensionNameOpenaiChatgpt, Property: "base_url"}, + }, + "OPENAI_MODEL": { + {ExtensionName: extensionNameOpenaiChatgpt, Property: "model"}, + }, + "OPENAI_PROXY_URL": { + {ExtensionName: extensionNameOpenaiChatgpt, Property: "proxy_url"}, + }, + "QWEN_API_KEY": { + 
{ExtensionName: extensionNameQwenLLM, Property: "api_key"}, + }, + } + + // Retrieve parameters from the request and map them to the property.json file + startPropMap = map[string][]Prop{ + "AgoraAsrLanguage": { + {ExtensionName: extensionNameAgoraRTC, Property: "agora_asr_language"}, + }, + "ChannelName": { + {ExtensionName: extensionNameAgoraRTC, Property: "channel"}, + }, + "RemoteStreamId": { + {ExtensionName: extensionNameAgoraRTC, Property: "remote_stream_id"}, + }, + "Token": { + {ExtensionName: extensionNameAgoraRTC, Property: "token"}, + }, + "VoiceType": { + {ExtensionName: extensionNameAzureTTS, Property: "azure_synthesis_voice_name"}, + {ExtensionName: extensionNameElevenlabsTTS, Property: "voice_id"}, + }, + } + + // Map the voice name to the voice type + voiceNameMap = map[string]map[string]map[string]string{ + languageChinese: { + extensionNameAzureTTS: { + voiceTypeMale: "zh-CN-YunxiNeural", + voiceTypeFemale: "zh-CN-XiaoxiaoNeural", + }, + extensionNameElevenlabsTTS: { + voiceTypeMale: "pNInz6obpgDQGcFmaJgB", // Adam + voiceTypeFemale: "Xb7hH8MSUJpSbSDYk0k2", // Alice + }, + }, + languageEnglish: { + extensionNameAzureTTS: { + voiceTypeMale: "en-US-BrianNeural", + voiceTypeFemale: "en-US-JaneNeural", + }, + extensionNameElevenlabsTTS: { + voiceTypeMale: "pNInz6obpgDQGcFmaJgB", // Adam + voiceTypeFemale: "Xb7hH8MSUJpSbSDYk0k2", // Alice + }, + }, + } +) diff --git a/server/internal/http_server.go b/server/internal/http_server.go index 3158edd3..13461156 100644 --- a/server/internal/http_server.go +++ b/server/internal/http_server.go @@ -5,9 +5,6 @@ * Copyright (c) 2024 Agora IO. All rights reserved. * */ -// Note that this is just an example extension written in the GO programming -// language, so the package name does not equal to the containing directory -// name. However, it is not common in Go. 
package internal import ( @@ -22,7 +19,6 @@ import ( "github.com/gin-gonic/gin" "github.com/gin-gonic/gin/binding" "github.com/gogf/gf/crypto/gmd5" - "github.com/tidwall/gjson" "github.com/tidwall/sjson" ) @@ -33,7 +29,8 @@ type HttpServer struct { type HttpServerConfig struct { AppId string AppCertificate string - ManifestJsonFile string + LogPath string + PropertyJsonFile string Port string TTSVendorChinese string TTSVendorEnglish string @@ -42,98 +39,37 @@ type HttpServerConfig struct { } type PingReq struct { - RequestId string `form:"request_id,omitempty" json:"request_id,omitempty"` - ChannelName string `form:"channel_name,omitempty" json:"channel_name,omitempty"` + RequestId string `json:"request_id,omitempty"` + ChannelName string `json:"channel_name,omitempty"` } type StartReq struct { - RequestId string `form:"request_id,omitempty" json:"request_id,omitempty"` - AgoraAsrLanguage string `form:"agora_asr_language,omitempty" json:"agora_asr_language,omitempty"` - ChannelName string `form:"channel_name,omitempty" json:"channel_name,omitempty"` - RemoteStreamId uint32 `form:"remote_stream_id,omitempty" json:"remote_stream_id,omitempty"` - VoiceType string `form:"voice_type,omitempty" json:"voice_type,omitempty"` + RequestId string `json:"request_id,omitempty"` + AgoraAsrLanguage string `json:"agora_asr_language,omitempty"` + ChannelName string `json:"channel_name,omitempty"` + GraphName string `json:"graph_name,omitempty"` + RemoteStreamId uint32 `json:"remote_stream_id,omitempty"` + Token string `json:"token,omitempty"` + VoiceType string `json:"voice_type,omitempty"` } type StopReq struct { - RequestId string `form:"request_id,omitempty" json:"request_id,omitempty"` - ChannelName string `form:"channel_name,omitempty" json:"channel_name,omitempty"` + RequestId string `json:"request_id,omitempty"` + ChannelName string `json:"channel_name,omitempty"` } type GenerateTokenReq struct { - RequestId string `form:"request_id,omitempty" json:"request_id,omitempty"` - 
ChannelName string `form:"channel_name,omitempty" json:"channel_name,omitempty"` - Uid uint32 `form:"uid,omitempty" json:"uid,omitempty"` + RequestId string `json:"request_id,omitempty"` + ChannelName string `json:"channel_name,omitempty"` + Uid uint32 `json:"uid,omitempty"` } -const ( - privilegeExpirationInSeconds = uint32(86400) - tokenExpirationInSeconds = uint32(86400) - - languageChinese = "zh-CN" - languageEnglish = "en-US" - - ManifestJsonFile = "./agents/manifest.json" - ManifestJsonFileElevenlabs = "./agents/manifest.elevenlabs.json" - - TTSVendorAzure = "azure" - TTSVendorElevenlabs = "elevenlabs" - - voiceTypeMale = "male" - voiceTypeFemale = "female" -) - -var ( - voiceNameMap = map[string]map[string]map[string]string{ - languageChinese: { - TTSVendorAzure: { - voiceTypeMale: "zh-CN-YunxiNeural", - voiceTypeFemale: "zh-CN-XiaoxiaoNeural", - }, - TTSVendorElevenlabs: { - voiceTypeMale: "pNInz6obpgDQGcFmaJgB", // Adam - voiceTypeFemale: "Xb7hH8MSUJpSbSDYk0k2", // Alice - }, - }, - languageEnglish: { - TTSVendorAzure: { - voiceTypeMale: "en-US-BrianNeural", - voiceTypeFemale: "en-US-JaneNeural", - }, - TTSVendorElevenlabs: { - voiceTypeMale: "pNInz6obpgDQGcFmaJgB", // Adam - voiceTypeFemale: "Xb7hH8MSUJpSbSDYk0k2", // Alice - }, - }, - } - - logTag = slog.String("service", "HTTP_SERVER") -) - func NewHttpServer(httpServerConfig *HttpServerConfig) *HttpServer { return &HttpServer{ config: httpServerConfig, } } -func (s *HttpServer) getManifestJsonFile(language string) (manifestJsonFile string) { - ttsVendor := s.getTtsVendor(language) - manifestJsonFile = ManifestJsonFile - - if ttsVendor == TTSVendorElevenlabs { - manifestJsonFile = ManifestJsonFileElevenlabs - } - - return -} - -func (s *HttpServer) getTtsVendor(language string) string { - if language == languageChinese { - return s.config.TTSVendorChinese - } - - return s.config.TTSVendorEnglish -} - func (s *HttpServer) handlerHealth(c *gin.Context) { slog.Debug("handlerHealth", logTag) s.output(c, 
codeOk, nil) @@ -200,14 +136,14 @@ func (s *HttpServer) handlerStart(c *gin.Context) { return } - manifestJsonFile, logFile, err := s.processManifest(&req) + propertyJsonFile, logFile, err := s.processProperty(&req) if err != nil { - slog.Error("handlerStart process manifest", "channelName", req.ChannelName, "requestId", req.RequestId, logTag) - s.output(c, codeErrProcessManifestFailed, http.StatusInternalServerError) + slog.Error("handlerStart process property", "channelName", req.ChannelName, "requestId", req.RequestId, logTag) + s.output(c, codeErrProcessPropertyFailed, http.StatusInternalServerError) return } - worker := newWorker(req.ChannelName, logFile, manifestJsonFile) + worker := newWorker(req.ChannelName, logFile, propertyJsonFile) worker.QuitTimeoutSeconds = s.config.WorkerQuitTimeoutSeconds if err := worker.start(&req); err != nil { slog.Error("handlerStart start worker failed", "err", err, "requestId", req.RequestId, logTag) @@ -276,7 +212,7 @@ func (s *HttpServer) handlerGenerateToken(c *gin.Context) { return } - token, err := rtctokenbuilder.BuildTokenWithUid(s.config.AppId, s.config.AppCertificate, req.ChannelName, req.Uid, rtctokenbuilder.RolePublisher, tokenExpirationInSeconds, privilegeExpirationInSeconds) + token, err := rtctokenbuilder.BuildTokenWithUid(s.config.AppId, s.config.AppCertificate, req.ChannelName, req.Uid, rtctokenbuilder.RolePublisher, tokenExpirationInSeconds, tokenExpirationInSeconds) if err != nil { slog.Error("handlerGenerateToken generate token failed", "err", err, "requestId", req.RequestId, logTag) s.output(c, codeErrGenerateTokenFailed, http.StatusBadRequest) @@ -295,59 +231,52 @@ func (s *HttpServer) output(c *gin.Context, code *Code, data any, httpStatus ... 
c.JSON(httpStatus[0], gin.H{"code": code.code, "msg": code.msg, "data": data}) } -func (s *HttpServer) processManifest(req *StartReq) (manifestJsonFile string, logFile string, err error) { - manifestJsonFile = s.getManifestJsonFile(req.AgoraAsrLanguage) - content, err := os.ReadFile(manifestJsonFile) +func (s *HttpServer) processProperty(req *StartReq) (propertyJsonFile string, logFile string, err error) { + content, err := os.ReadFile(PropertyJsonFile) if err != nil { - slog.Error("handlerStart read manifest.json failed", "err", err, "manifestJsonFile", manifestJsonFile, "requestId", req.RequestId, logTag) + slog.Error("handlerStart read property.json failed", "err", err, "propertyJsonFile", propertyJsonFile, "requestId", req.RequestId, logTag) return } - manifestJson := string(content) + propertyJson := string(content) - if s.config.AppId != "" { - manifestJson, _ = sjson.Set(manifestJson, `predefined_graphs.0.nodes.#(name=="agora_rtc").property.app_id`, s.config.AppId) + // Get graph name + graphName := req.GraphName + if graphName == "" { + graphName = graphNameDefault } - appId := gjson.Get(manifestJson, `predefined_graphs.0.nodes.#(name=="agora_rtc").property.app_id`).String() // Generate token - token := appId + req.Token = s.config.AppId if s.config.AppCertificate != "" { - token, err = rtctokenbuilder.BuildTokenWithUid(appId, s.config.AppCertificate, req.ChannelName, 0, rtctokenbuilder.RoleSubscriber, tokenExpirationInSeconds, privilegeExpirationInSeconds) + req.Token, err = rtctokenbuilder.BuildTokenWithUid(s.config.AppId, s.config.AppCertificate, req.ChannelName, 0, rtctokenbuilder.RoleSubscriber, tokenExpirationInSeconds, tokenExpirationInSeconds) if err != nil { slog.Error("handlerStart generate token failed", "err", err, "requestId", req.RequestId, logTag) return } } - manifestJson, _ = sjson.Set(manifestJson, `predefined_graphs.0.nodes.#(name=="agora_rtc").property.token`, token) - if req.AgoraAsrLanguage != "" { - manifestJson, _ = 
sjson.Set(manifestJson, `predefined_graphs.0.nodes.#(name=="agora_rtc").property.agora_asr_language`, req.AgoraAsrLanguage) - } - if req.ChannelName != "" { - manifestJson, _ = sjson.Set(manifestJson, `predefined_graphs.0.nodes.#(name=="agora_rtc").property.channel`, req.ChannelName) - } - if req.RemoteStreamId != 0 { - manifestJson, _ = sjson.Set(manifestJson, `predefined_graphs.0.nodes.#(name=="agora_rtc").property.remote_stream_id`, req.RemoteStreamId) - } - - language := gjson.Get(manifestJson, `predefined_graphs.0.nodes.#(name=="agora_rtc").property.agora_asr_language`).String() - - ttsVendor := s.getTtsVendor(language) - voiceName := voiceNameMap[language][ttsVendor][req.VoiceType] - if voiceName != "" { - if ttsVendor == TTSVendorAzure { - manifestJson, _ = sjson.Set(manifestJson, `predefined_graphs.0.nodes.#(name=="azure_tts").property.azure_synthesis_voice_name`, voiceName) - } else if ttsVendor == TTSVendorElevenlabs { - manifestJson, _ = sjson.Set(manifestJson, `predefined_graphs.0.nodes.#(name=="elevenlabs_tts").property.voice_id`, voiceName) + graph := fmt.Sprintf(`rte.predefined_graphs.#(name=="%s")`, graphName) + // Automatically start on launch + propertyJson, _ = sjson.Set(propertyJson, fmt.Sprintf(`%s.auto_start`, graph), true) + + // Set parameters from the request to property.json + for key, props := range startPropMap { + if val := getFieldValue(req, key); val != "" { + for _, prop := range props { + if key == "VoiceType" { + val = voiceNameMap[req.AgoraAsrLanguage][prop.ExtensionName][req.VoiceType] + } + propertyJson, _ = sjson.Set(propertyJson, fmt.Sprintf(`%s.nodes.#(name=="%s").property.%s`, graph, prop.ExtensionName, prop.Property), val) + } } } channelNameMd5 := gmd5.MustEncryptString(req.ChannelName) ts := time.Now().UnixNano() - manifestJsonFile = fmt.Sprintf("/tmp/manifest-%s-%d.json", channelNameMd5, ts) - logFile = fmt.Sprintf("/tmp/app-%s-%d.log", channelNameMd5, ts) - os.WriteFile(manifestJsonFile, []byte(manifestJson), 0644) + 
propertyJsonFile = fmt.Sprintf("%s/property-%s-%d.json", s.config.LogPath, channelNameMd5, ts) + logFile = fmt.Sprintf("%s/app-%s-%d.log", s.config.LogPath, channelNameMd5, ts) + os.WriteFile(propertyJsonFile, []byte(propertyJson), 0644) return } @@ -366,5 +295,5 @@ func (s *HttpServer) Start() { slog.Info("server start", "port", s.config.Port, logTag) go cleanWorker() - r.Run(s.config.Port) + r.Run(fmt.Sprintf(":%s", s.config.Port)) } diff --git a/server/internal/utils.go b/server/internal/utils.go new file mode 100644 index 00000000..0342c36d --- /dev/null +++ b/server/internal/utils.go @@ -0,0 +1,39 @@ +package internal + +import ( + "reflect" +) + +func getFieldValue(req any, fieldName string) any { + v := reflect.ValueOf(req) + if v.Kind() == reflect.Ptr { + v = v.Elem() + } + + field := v.FieldByName(fieldName) + + if field.IsValid() { + switch field.Kind() { + case reflect.Bool: + return field.Bool() + case reflect.Float32: + return float32(field.Float()) + case reflect.Float64: + return field.Float() + case reflect.Int: + return field.Int() + case reflect.Int32: + return int(field.Int()) + case reflect.Int64: + return field.Int() + case reflect.Uint32: + return field.Uint() + case reflect.Uint64: + return field.Uint() + case reflect.String: + return field.String() + } + } + + return nil +} diff --git a/server/internal/worker.go b/server/internal/worker.go index 788649f1..c250c296 100644 --- a/server/internal/worker.go +++ b/server/internal/worker.go @@ -16,7 +16,7 @@ import ( type Worker struct { ChannelName string LogFile string - ManifestJsonFile string + PropertyJsonFile string Pid int QuitTimeoutSeconds int CreateTs int64 @@ -32,11 +32,11 @@ var ( workers = gmap.New(true) ) -func newWorker(channelName string, logFile string, manifestJsonFile string) *Worker { +func newWorker(channelName string, logFile string, propertyJsonFile string) *Worker { return &Worker{ ChannelName: channelName, LogFile: logFile, - ManifestJsonFile: manifestJsonFile, + 
PropertyJsonFile: propertyJsonFile, QuitTimeoutSeconds: 60, CreateTs: time.Now().Unix(), UpdateTs: time.Now().Unix(), @@ -44,14 +44,14 @@ func newWorker(channelName string, logFile string, manifestJsonFile string) *Wor } func (w *Worker) start(req *StartReq) (err error) { - shell := fmt.Sprintf("cd /app/agents && nohup %s --manifest %s > %s 2>&1 &", workerExec, w.ManifestJsonFile, w.LogFile) + shell := fmt.Sprintf("cd /app/agents && nohup %s --property %s > %s 2>&1 &", workerExec, w.PropertyJsonFile, w.LogFile) slog.Info("Worker start", "requestId", req.RequestId, "shell", shell, logTag) if _, err = exec.Command("sh", "-c", shell).CombinedOutput(); err != nil { slog.Error("Worker start failed", "err", err, "requestId", req.RequestId, logTag) return } - shell = fmt.Sprintf("ps aux | grep %s | grep -v grep | awk '{print $2}'", w.ManifestJsonFile) + shell = fmt.Sprintf("ps aux | grep %s | grep -v grep | awk '{print $2}'", w.PropertyJsonFile) slog.Info("Worker get pid", "requestId", req.RequestId, "shell", shell, logTag) output, err := exec.Command("sh", "-c", shell).CombinedOutput() if err != nil { diff --git a/server/main.go b/server/main.go index 2c3274d9..442e6506 100644 --- a/server/main.go +++ b/server/main.go @@ -1,146 +1,86 @@ package main import ( - "flag" + "fmt" "log/slog" "os" "strconv" + "github.com/joho/godotenv" + "github.com/tidwall/gjson" "github.com/tidwall/sjson" "app/internal" ) func main() { - httpServerConfig := &internal.HttpServerConfig{} - - ttsVendorChinese := os.Getenv("TTS_VENDOR_CHINESE") - if len(ttsVendorChinese) == 0 { - ttsVendorChinese = internal.TTSVendorAzure + // Load .env + err := godotenv.Load() + if err != nil { + slog.Warn("load .env file failed", "err", err) } - ttsVendorEnglish := os.Getenv("TTS_VENDOR_ENGLISH") - if len(ttsVendorEnglish) == 0 { - ttsVendorEnglish = internal.TTSVendorAzure + // Check environment + agoraAppId := os.Getenv("AGORA_APP_ID") + if len(agoraAppId) != 32 { + slog.Error("environment AGORA_APP_ID 
invalid") + os.Exit(1) } workersMax, err := strconv.Atoi(os.Getenv("WORKERS_MAX")) if err != nil || workersMax <= 0 { - workersMax = 2 + slog.Error("environment WORKERS_MAX invalid") + os.Exit(1) } workerQuitTimeoutSeconds, err := strconv.Atoi(os.Getenv("WORKER_QUIT_TIMEOUT_SECONDES")) if err != nil || workerQuitTimeoutSeconds <= 0 { - workerQuitTimeoutSeconds = 60 + slog.Error("environment WORKER_QUIT_TIMEOUT_SECONDES invalid") + os.Exit(1) } - flag.StringVar(&httpServerConfig.AppId, "appId", os.Getenv("AGORA_APP_ID"), "agora appid") - flag.StringVar(&httpServerConfig.AppCertificate, "appCertificate", os.Getenv("AGORA_APP_CERTIFICATE"), "agora certificate") - flag.StringVar(&httpServerConfig.Port, "port", ":8080", "http server port") - flag.StringVar(&httpServerConfig.TTSVendorChinese, "ttsVendorChinese", ttsVendorChinese, "tts vendor for chinese") - flag.StringVar(&httpServerConfig.TTSVendorEnglish, "ttsVendorEnglish", ttsVendorEnglish, "tts vendor for english") - flag.IntVar(&httpServerConfig.WorkersMax, "workersMax", workersMax, "workers max") - flag.IntVar(&httpServerConfig.WorkerQuitTimeoutSeconds, "workerQuitTimeoutSeconds", workerQuitTimeoutSeconds, "worker quit timeout seconds") - flag.Parse() - - slog.Info("server config", "ttsVendorChinese", httpServerConfig.TTSVendorChinese, "ttsVendorEnglish", httpServerConfig.TTSVendorEnglish, - "workersMax", httpServerConfig.WorkersMax, "workerQuitTimeoutSeconds", httpServerConfig.WorkerQuitTimeoutSeconds) + // Process property.json + if err = processProperty(internal.PropertyJsonFile); err != nil { + slog.Error("process property.json failed", "err", err) + os.Exit(1) + } - processManifest(internal.ManifestJsonFile) - processManifest(internal.ManifestJsonFileElevenlabs) + // Start server + httpServerConfig := &internal.HttpServerConfig{ + AppId: agoraAppId, + AppCertificate: os.Getenv("AGORA_APP_CERTIFICATE"), + LogPath: os.Getenv("LOG_PATH"), + Port: os.Getenv("SERVER_PORT"), + WorkersMax: workersMax, + 
WorkerQuitTimeoutSeconds: workerQuitTimeoutSeconds, + } httpServer := internal.NewHttpServer(httpServerConfig) httpServer.Start() } -func processManifest(manifestJsonFile string) (err error) { - content, err := os.ReadFile(manifestJsonFile) +func processProperty(propertyJsonFile string) (err error) { + content, err := os.ReadFile(propertyJsonFile) if err != nil { - slog.Error("read manifest.json failed", "err", err, "manifestJsonFile", manifestJsonFile) + slog.Error("read property.json failed", "err", err, "propertyJsonFile", propertyJsonFile) return } - manifestJson := string(content) - - appId := os.Getenv("AGORA_APP_ID") - if appId != "" { - manifestJson, _ = sjson.Set(manifestJson, `predefined_graphs.0.nodes.#(name=="agora_rtc").property.app_id`, appId) - } - - azureSttKey := os.Getenv("AZURE_STT_KEY") - if azureSttKey != "" { - manifestJson, _ = sjson.Set(manifestJson, `predefined_graphs.0.nodes.#(name=="agora_rtc").property.agora_asr_vendor_key`, azureSttKey) - } - - azureSttRegion := os.Getenv("AZURE_STT_REGION") - if azureSttRegion != "" { - manifestJson, _ = sjson.Set(manifestJson, `predefined_graphs.0.nodes.#(name=="agora_rtc").property.agora_asr_vendor_region`, azureSttRegion) - } - - openaiBaseUrl := os.Getenv("OPENAI_BASE_URL") - if openaiBaseUrl != "" { - manifestJson, _ = sjson.Set(manifestJson, `predefined_graphs.0.nodes.#(name=="openai_chatgpt").property.base_url`, openaiBaseUrl) - } - - openaiApiKey := os.Getenv("OPENAI_API_KEY") - if openaiApiKey != "" { - manifestJson, _ = sjson.Set(manifestJson, `predefined_graphs.0.nodes.#(name=="openai_chatgpt").property.api_key`, openaiApiKey) - } - - openaiModel := os.Getenv("OPENAI_MODEL") - if openaiModel != "" { - manifestJson, _ = sjson.Set(manifestJson, `predefined_graphs.0.nodes.#(name=="openai_chatgpt").property.model`, openaiModel) - } - - proxyUrl := os.Getenv("PROXY_URL") - if proxyUrl != "" { - manifestJson, _ = sjson.Set(manifestJson, 
`predefined_graphs.0.nodes.#(name=="openai_chatgpt").property.proxy_url`, proxyUrl) - } - - azureTtsKey := os.Getenv("AZURE_TTS_KEY") - if azureTtsKey != "" { - manifestJson, _ = sjson.Set(manifestJson, `predefined_graphs.0.nodes.#(name=="azure_tts").property.azure_subscription_key`, azureTtsKey) - } - - azureTtsRegion := os.Getenv("AZURE_TTS_REGION") - if azureTtsRegion != "" { - manifestJson, _ = sjson.Set(manifestJson, `predefined_graphs.0.nodes.#(name=="azure_tts").property.azure_subscription_region`, azureTtsRegion) - } - - awsRegion := os.Getenv("AWS_REGION") - if awsRegion != "" { - manifestJson, _ = sjson.Set(manifestJson, `predefined_graphs.0.nodes.#(name=="bedrock_llm").property.region`, awsRegion) - } - - awsAccessKey := os.Getenv("AWS_ACCESS_KEY_ID") - if awsAccessKey != "" { - manifestJson, _ = sjson.Set(manifestJson, `predefined_graphs.0.nodes.#(name=="bedrock_llm").property.access_key`, awsAccessKey) - } - - awsSecretKey := os.Getenv("AWS_SECRET_ACCESS_KEY") - if awsSecretKey != "" { - manifestJson, _ = sjson.Set(manifestJson, `predefined_graphs.0.nodes.#(name=="bedrock_llm").property.secret_key`, awsSecretKey) - } - - bedrockModel := os.Getenv("AWS_BEDROCK_MODEL") - if bedrockModel != "" { - manifestJson, _ = sjson.Set(manifestJson, `predefined_graphs.0.nodes.#(name=="bedrock_llm").property.model`, bedrockModel) - } - - elevenlabsTtsKey := os.Getenv("ELEVENLABS_TTS_KEY") - if elevenlabsTtsKey != "" { - manifestJson, _ = sjson.Set(manifestJson, `predefined_graphs.0.nodes.#(name=="elevenlabs_tts").property.api_key`, elevenlabsTtsKey) - } - - cosyTtsKey := os.Getenv("COSY_TTS_KEY") - if cosyTtsKey != "" { - manifestJson, _ = sjson.Set(manifestJson, `predefined_graphs.0.nodes.#(name=="cosy_tts").property.api_key`, cosyTtsKey) - } + propertyJson := string(content) + for i := range gjson.Get(propertyJson, "rte.predefined_graphs").Array() { + graph := fmt.Sprintf("rte.predefined_graphs.%d", i) + // Shut down all auto-starting Graphs + propertyJson, _ = 
sjson.Set(propertyJson, fmt.Sprintf(`%s.auto_start`, graph), false) - qwenApiKey := os.Getenv("QWEN_API_KEY") - if qwenApiKey != "" { - manifestJson, _ = sjson.Set(manifestJson, `predefined_graphs.0.nodes.#(name=="qwen_llm").property.api_key`, qwenApiKey) + // Set environment variable values to property.json + for envKey, envProps := range internal.EnvPropMap { + if envVal := os.Getenv(envKey); envVal != "" { + for _, envProp := range envProps { + propertyJson, _ = sjson.Set(propertyJson, fmt.Sprintf(`%s.nodes.#(name=="%s").property.%s`, graph, envProp.ExtensionName, envProp.Property), envVal) + } + } + } } - err = os.WriteFile(manifestJsonFile, []byte(manifestJson), 0644) + err = os.WriteFile(propertyJsonFile, []byte(propertyJson), 0644) return }