From b1fa4b9c2f1a1640a271c465298c52985c236f22 Mon Sep 17 00:00:00 2001 From: sunshinexcode <24xinhui@163.com> Date: Tue, 12 Nov 2024 14:04:26 +0000 Subject: [PATCH 1/3] feat(): add minimax tts extension --- .env.example | 5 + agents/property.json | 182 ++++++++++ .../ten_packages/extension/minimax_tts/go.mod | 12 + .../ten_packages/extension/minimax_tts/go.sum | 5 + .../extension/minimax_tts/manifest.json | 59 ++++ .../extension/minimax_tts/minimax_tts.go | 150 +++++++++ .../minimax_tts/minimax_tts_extension.go | 316 ++++++++++++++++++ .../ten_packages/extension/minimax_tts/pcm.go | 102 ++++++ .../extension/minimax_tts/property.json | 1 + 9 files changed, 832 insertions(+) create mode 100644 agents/ten_packages/extension/minimax_tts/go.mod create mode 100644 agents/ten_packages/extension/minimax_tts/go.sum create mode 100644 agents/ten_packages/extension/minimax_tts/manifest.json create mode 100644 agents/ten_packages/extension/minimax_tts/minimax_tts.go create mode 100644 agents/ten_packages/extension/minimax_tts/minimax_tts_extension.go create mode 100644 agents/ten_packages/extension/minimax_tts/pcm.go create mode 100644 agents/ten_packages/extension/minimax_tts/property.json diff --git a/.env.example b/.env.example index cea81be18..a5c809306 100644 --- a/.env.example +++ b/.env.example @@ -91,6 +91,11 @@ GEMINI_API_KEY= # AWS_REGION_NAME= LITELLM_MODEL=gpt-4o-mini +# Extension: minimax_tts +# Minimax TTS key +MINIMAX_TTS_API_KEY= +MINIMAX_TTS_GROUP_ID= + # Extension: openai_chatgpt # OpenAI API key OPENAI_API_BASE=https://api.openai.com/v1 diff --git a/agents/property.json b/agents/property.json index 0c921e84a..5b12dd7dc 100644 --- a/agents/property.json +++ b/agents/property.json @@ -4575,6 +4575,188 @@ ] } ] + }, + { + "name": "va_openai_minimax", + "auto_start": false, + "nodes": [ + { + "type": "extension", + "extension_group": "default", + "addon": "agora_rtc", + "name": "agora_rtc", + "property": { + "app_id": "${env:AGORA_APP_ID}", + "token": "", + "channel": "ten_agents_test", + "stream_id": 1234, + "remote_stream_id": 123, + "subscribe_audio": true, + "publish_audio": true, + "publish_data": true, + "enable_agora_asr": true, + "agora_asr_vendor_name": "microsoft", + "agora_asr_language": "en-US", + "agora_asr_vendor_key": "${env:AZURE_STT_KEY}", + "agora_asr_vendor_region": "${env:AZURE_STT_REGION}", + "agora_asr_session_control_file_path": "session_control.conf" + } + }, + { + "type": "extension", + "extension_group": "default", + "addon": "interrupt_detector", + "name": "interrupt_detector" + }, + { + "type": "extension", + "extension_group": "chatgpt", + "addon": "openai_chatgpt", + "name": "openai_chatgpt", + "property": { + "base_url": "${env:OPENAI_API_BASE}", + "api_key": "${env:OPENAI_API_KEY}", + "frequency_penalty": 0.9, + "model": "${env:OPENAI_MODEL}", + "max_tokens": 512, + "prompt": "", + "proxy_url": "${env:OPENAI_PROXY_URL}", + "greeting": "TEN Agent connected. How can I help you today?", + "max_memory_length": 10 + } + }, + { + "type": "extension", + "extension_group": "transcriber", + "addon": "message_collector", + "name": "message_collector" + }, + { + "type": "extension", + "extension_group": "tts", + "addon": "minimax_tts", + "name": "minimax_tts", + "property": { + "api_key": "${env:MINIMAX_TTS_API_KEY}", + "group_id": "${env:MINIMAX_TTS_GROUP_ID}", + "model": "speech-01-turbo", + "request_timeout_seconds": 30, + "url": "https://api.minimax.chat/v1/t2a_v2", + "voice_id": "male-qn-qingse" + } + } + ], + "connections": [ + { + "extension_group": "default", + "extension": "agora_rtc", + "data": [ + { + "name": "text_data", + "dest": [ + { + "extension_group": "default", + "extension": "interrupt_detector" + }, + { + "extension_group": "chatgpt", + "extension": "openai_chatgpt" + }, + { + "extension_group": "transcriber", + "extension": "message_collector" + } + ] + } + ] + }, + { + "extension_group": "chatgpt", + "extension": "openai_chatgpt", + "data": [ + { + "name": "text_data", + "dest": [ + { + "extension_group": "tts", + "extension": "minimax_tts" + }, + { + "extension_group": "transcriber", + "extension": "message_collector" + } + ] + } + ], + "cmd": [ + { + "name": "flush", + "dest": [ + { + "extension_group": "tts", + "extension": "minimax_tts" + } + ] + } + ] + }, + { + "extension_group": "tts", + "extension": "minimax_tts", + "audio_frame": [ + { + "name": "pcm_frame", + "dest": [ + { + "extension_group": "default", + "extension": "agora_rtc" + } + ] + } + ], + "cmd": [ + { + "name": "flush", + "dest": [ + { + "extension_group": "default", + "extension": "agora_rtc" + } + ] + } + ] + }, + { + "extension_group": "transcriber", + "extension": "message_collector", + "data": [ + { + "name": "data", + "dest": [ + { + "extension_group": "default", + "extension": "agora_rtc" + } + ] + } + ] + }, + { + "extension_group": "default", + "extension": "interrupt_detector", + "cmd": [ + { + "name": "flush", + "dest": [ + { + "extension_group": "chatgpt", + "extension": "openai_chatgpt" + } + ] + } + ] + } + ] } ] } diff --git a/agents/ten_packages/extension/minimax_tts/go.mod b/agents/ten_packages/extension/minimax_tts/go.mod new file mode 100644 index 000000000..82c072d98 --- /dev/null +++ b/agents/ten_packages/extension/minimax_tts/go.mod @@ -0,0 +1,12 @@ +module minimax_tts + +go 1.20 + +replace ten_framework => ../../system/ten_runtime_go/interface + +require ( + github.com/go-resty/resty/v2 v2.16.0 + ten_framework v0.0.0-00010101000000-000000000000 +) + +require golang.org/x/net v0.27.0 // indirect diff --git a/agents/ten_packages/extension/minimax_tts/go.sum b/agents/ten_packages/extension/minimax_tts/go.sum new file mode 100644 index 000000000..6ceffb66d --- /dev/null +++ b/agents/ten_packages/extension/minimax_tts/go.sum @@ -0,0 +1,5 @@ +github.com/go-resty/resty/v2 v2.16.0 h1:qpKalHWI2bpp9BIKlyT8TYWEJXOk1NuKbfiT3RRnzWc= +github.com/go-resty/resty/v2 v2.16.0/go.mod h1:0fHAoK7JoBy/Ch36N8VFeMsK7xQOHhvWaC3iOktwmIU= +golang.org/x/net v0.27.0 h1:5K3Njcw06/l2y9vpGCSdcxWOYHOUk3dVNGDXN+FvAys= +golang.org/x/net v0.27.0/go.mod h1:dDi0PyhWNoiUOrAS8uXv/vnScO4wnHQO4mj9fn/RytE= +golang.org/x/time v0.6.0 h1:eTDhh4ZXt5Qf0augr54TN6suAUudPcawVZeIAPU7D4U= diff --git a/agents/ten_packages/extension/minimax_tts/manifest.json b/agents/ten_packages/extension/minimax_tts/manifest.json new file mode 100644 index 000000000..5dd27f7b9 --- /dev/null +++ b/agents/ten_packages/extension/minimax_tts/manifest.json @@ -0,0 +1,59 @@ +{ + "type": "extension", + "name": "minimax_tts", + "version": "0.4.0", + "dependencies": [ + { + "type": "system", + "name": "ten_runtime_go", + "version": "0.3" + } + ], + "api": { + "property": { + "api_key": { + "type": "string" + }, + "group_id": { + "type": "string" + }, + "model": { + "type": "string" + }, + "request_timeout_seconds": { + "type": "int64" + }, + "url": { + "type": "string" + }, + "voice_id": { + "type": "string" + } + }, + "data_in": [ + { + "name": "text_data", + "property": { + "text": { + "type": "string" + } + } + } + ], + "cmd_in": [ + { + "name": "flush" + } + ], + "cmd_out": [ + { + "name": "flush" + } + ], + "audio_frame_out": [ + { + "name": "pcm_frame" + } + ] + } +} \ No newline at end of file diff --git a/agents/ten_packages/extension/minimax_tts/minimax_tts.go b/agents/ten_packages/extension/minimax_tts/minimax_tts.go new file mode 100644 index 000000000..33b5249d8 --- /dev/null +++ b/agents/ten_packages/extension/minimax_tts/minimax_tts.go @@ -0,0 +1,150 @@ +/** + * + * Agora Real Time Engagement + * Created by XinHui Li in 2024. + * Copyright (c) 2024 Agora IO. All rights reserved. + * + */ +// An extension written by Go for TTS +package extension + +import ( + "bufio" + "bytes" + "encoding/hex" + "encoding/json" + "fmt" + "io" + "log/slog" + "net/http" + "time" + + "github.com/go-resty/resty/v2" +) + +type minimaxTTS struct { + client *resty.Client + config minimaxTTSConfig +} + +type minimaxTTSConfig struct { + ApiKey string + Model string + RequestTimeoutSeconds int + GroupId string + Url string + VoiceId string +} + +func defaultMinimaxTTSConfig() minimaxTTSConfig { + return minimaxTTSConfig{ + ApiKey: "", + Model: "speech-01-turbo", + RequestTimeoutSeconds: 30, + GroupId: "", + Url: "https://api.minimax.chat/v1/t2a_v2", + VoiceId: "male-qn-qingse", + } +} + +func newMinimaxTTS(config minimaxTTSConfig) (*minimaxTTS, error) { + return &minimaxTTS{ + config: config, + client: resty.New(). + SetRetryCount(0). + SetTimeout(time.Duration(config.RequestTimeoutSeconds) * time.Second), + }, nil +} + +func (e *minimaxTTS) textToSpeechStream(streamWriter io.Writer, text string) (err error) { + slog.Info("textToSpeechStream start tts", "text", text) + + payload := map[string]any{ + "audio_setting": map[string]any{ + "channel": 1, + "format": "pcm", + "sample_rate": 32000, + }, + "model": e.config.Model, + "pronunciation_dict": map[string]any{ + "tone": []string{}, + }, + "stream": true, + "text": text, + "voice_setting": map[string]any{ + "pitch": 0, + "speed": 1.0, + "voice_id": e.config.VoiceId, + "vol": 1.0, + }, + } + + resp, err := e.client.R(). + SetHeader("Content-Type", "application/json"). + SetHeader("Authorization", "Bearer "+e.config.ApiKey). + SetDoNotParseResponse(true). + SetBody(payload). + Post(fmt.Sprintf("%s?GroupId=%s", e.config.Url, e.config.GroupId)) + + if err != nil { + slog.Error("request failed", "err", err, "text", text) + return fmt.Errorf("textToSpeechStream failed, err: %v", err) + } + + defer func() { + resp.RawBody().Close() + + slog.Info("textToSpeechStream close response", "err", err, "text", text) + }() + + // Check the response status code + if resp.StatusCode() != http.StatusOK { + slog.Error("unexpected response status", "status", resp.StatusCode()) + return fmt.Errorf("unexpected response status: %d", resp.StatusCode()) + } + + scanner := bufio.NewScanner(resp.RawBody()) + for scanner.Scan() { + line := scanner.Bytes() + + if len(line) <= 5 || !bytes.HasPrefix(line, []byte("data:")) { + slog.Info("textToSpeechStream drop chunk", "text", text, "line", line) + continue + } + + var chunk struct { + Data struct { + Audio string `json:"audio"` + Status int `json:"status"` + } `json:"data"` + TraceId string `json:"trace_id"` + BaseResp struct { + StatusCode int `json:"status_code"` + StatusMsg string `json:"status_msg"` + } `json:"base_resp"` + } + + if err = json.Unmarshal(line[5:], &chunk); err != nil { + slog.Error("failed to decode JSON chunk", "err", err) + return + } + + if chunk.Data.Status == 2 { + continue + } + + audioData, err := hex.DecodeString(chunk.Data.Audio) + if err != nil { + slog.Error("failed to decode audio data", "err", err, "traceId", chunk.TraceId, "BaseResp", chunk.BaseResp) + break + } + + _, err = streamWriter.Write(audioData) + if err != nil { + slog.Error("failed to write to streamWriter", "err", err, "traceId", chunk.TraceId, "BaseResp", chunk.BaseResp) + break + } + } + + return +} diff --git a/agents/ten_packages/extension/minimax_tts/minimax_tts_extension.go b/agents/ten_packages/extension/minimax_tts/minimax_tts_extension.go new file mode 100644 index 000000000..696a34bbc --- /dev/null +++ b/agents/ten_packages/extension/minimax_tts/minimax_tts_extension.go @@ -0,0 +1,316 @@ +/** + * + * Agora Real Time Engagement + * Created by XinHui Li in 2024. + * Copyright (c) 2024 Agora IO. All rights reserved. + * + */ +// An extension written by Go for TTS +package extension + +import ( + "fmt" + "io" + "log/slog" + "sync" + "sync/atomic" + "time" + + "ten_framework/ten" +) + +const ( + cmdInFlush = "flush" + cmdOutFlush = "flush" + dataInTextDataPropertyText = "text" + + propertyApiKey = "api_key" // Required + propertyGroupId = "group_id" // Required + propertyModel = "model" // Optional + propertyRequestTimeoutSeconds = "request_timeout_seconds" // Optional + propertyUrl = "url" // Optional + propertyVoiceId = "voice_id" // Optional +) + +const ( + textChanMax = 1024 +) + +var ( + logTag = slog.String("extension", "MINIMAX_TTS_EXTENSION") + + outdateTs atomic.Int64 + textChan chan *message + wg sync.WaitGroup +) + +type minimaxTTSExtension struct { + ten.DefaultExtension + minimaxTTS *minimaxTTS +} + +type message struct { + text string + receivedTs int64 +} + +func newMinimaxTTSExtension(name string) ten.Extension { + return &minimaxTTSExtension{} +} + +// OnStart will be called when the extension is starting, +// properies can be read here to initialize and start the extension. +// current supported properties: +// - api_key (required) +// - group_id (required) +// - model +// - request_timeout_seconds +// - url +// - voice_id +func (e *minimaxTTSExtension) OnStart(ten ten.TenEnv) { + slog.Info("OnStart", logTag) + + // prepare configuration + minimaxTTSConfig := defaultMinimaxTTSConfig() + + if apiKey, err := ten.GetPropertyString(propertyApiKey); err != nil { + slog.Error(fmt.Sprintf("GetProperty required %s failed, err: %v", propertyApiKey, err), logTag) + return + } else { + minimaxTTSConfig.ApiKey = apiKey + } + + if groupId, err := ten.GetPropertyString(propertyGroupId); err != nil { + slog.Error(fmt.Sprintf("GetProperty required %s failed, err: %v", propertyGroupId, err), logTag) + return + } else { + minimaxTTSConfig.GroupId = groupId + } + + if model, err := ten.GetPropertyString(propertyModel); err != nil { + slog.Warn(fmt.Sprintf("GetProperty optional %s failed, err: %v", propertyModel, err), logTag) + } else { + if len(model) > 0 { + minimaxTTSConfig.Model = model + } + } + + if requestTimeoutSeconds, err := ten.GetPropertyInt64(propertyRequestTimeoutSeconds); err != nil { + slog.Warn(fmt.Sprintf("GetProperty optional %s failed, err: %v", propertyRequestTimeoutSeconds, err), logTag) + } else { + if requestTimeoutSeconds > 0 { + minimaxTTSConfig.RequestTimeoutSeconds = int(requestTimeoutSeconds) + } + } + + if url, err := ten.GetPropertyString(propertyUrl); err != nil { + slog.Warn(fmt.Sprintf("GetProperty optional %s failed, err: %v", propertyUrl, err), logTag) + } else { + if len(url) > 0 { + minimaxTTSConfig.Url = url + } + } + + if voiceId, err := ten.GetPropertyString(propertyVoiceId); err != nil { + slog.Warn(fmt.Sprintf("GetProperty optional %s failed, err: %v", propertyVoiceId, err), logTag) + } else { + minimaxTTSConfig.VoiceId = voiceId + } + + // create minimaxTTS instance + minimaxTTS, err := newMinimaxTTS(minimaxTTSConfig) + if err != nil { + slog.Error(fmt.Sprintf("newMinimaxTTS failed, err: %v", err), logTag) + return + } + + slog.Info(fmt.Sprintf("newMinimaxTTS succeed with Model: %s", minimaxTTSConfig.Model), logTag) + + // set minimaxTTS instance + e.minimaxTTS = minimaxTTS + + // create pcm instance + pcm := newPcm(defaultPcmConfig()) + pcmFrameSize := pcm.getPcmFrameSize() + + // init chan + textChan = make(chan *message, textChanMax) + + go func() { + slog.Info("process textChan", logTag) + + for msg := range textChan { + if msg.receivedTs < outdateTs.Load() { // Check whether to interrupt + slog.Info(fmt.Sprintf("textChan interrupt and flushing for input text: [%s], receivedTs: %d, outdateTs: %d", + msg.text, msg.receivedTs, outdateTs.Load()), logTag) + continue + } + + wg.Add(1) + slog.Info(fmt.Sprintf("textChan text: [%s]", msg.text), logTag) + + r, w := io.Pipe() + startTime := time.Now() + + go func() { + defer wg.Done() + defer w.Close() + + slog.Info(fmt.Sprintf("textToSpeechStream text: [%s]", msg.text), logTag) + err = e.minimaxTTS.textToSpeechStream(w, msg.text) + slog.Info(fmt.Sprintf("textToSpeechStream result: [%v]", err), logTag) + if err != nil { + slog.Error(fmt.Sprintf("textToSpeechStream failed, err: %v", err), logTag) + return + } + }() + + slog.Info(fmt.Sprintf("read pcm stream, text:[%s], pcmFrameSize:%d", msg.text, pcmFrameSize), logTag) + + var ( + firstFrameLatency int64 + n int + pcmFrameRead int + readBytes int + sentFrames int + ) + buf := pcm.newBuf() + + // read pcm stream + for { + if msg.receivedTs < outdateTs.Load() { // Check whether to interrupt + slog.Info(fmt.Sprintf("read pcm stream interrupt and flushing for input text: [%s], receivedTs: %d, outdateTs: %d", + msg.text, msg.receivedTs, outdateTs.Load()), logTag) + break + } + + n, err = r.Read(buf[pcmFrameRead:]) + readBytes += n + pcmFrameRead += n + + if err != nil { + if err == io.EOF { + slog.Info("read pcm stream EOF", logTag) + break + } + + slog.Error(fmt.Sprintf("read pcm stream failed, err: %v", err), logTag) + break + } + + if pcmFrameRead != pcmFrameSize { + slog.Debug(fmt.Sprintf("the number of bytes read is [%d] inconsistent with pcm frame size", pcmFrameRead), logTag) + continue + } + + pcm.send(ten, buf) + // clear buf + buf = pcm.newBuf() + pcmFrameRead = 0 + sentFrames++ + + if firstFrameLatency == 0 { + firstFrameLatency = time.Since(startTime).Milliseconds() + slog.Info(fmt.Sprintf("first frame available for text: [%s], receivedTs: %d, firstFrameLatency: %dms", msg.text, msg.receivedTs, firstFrameLatency), logTag) + } + + slog.Debug(fmt.Sprintf("sending pcm data, text: [%s]", msg.text), logTag) + } + + if pcmFrameRead > 0 { + pcm.send(ten, buf) + sentFrames++ + slog.Info(fmt.Sprintf("sending pcm remain data, text: [%s], pcmFrameRead: %d", msg.text, pcmFrameRead), logTag) + } + + r.Close() + slog.Info(fmt.Sprintf("send pcm data finished, text: [%s], receivedTs: %d, readBytes: %d, sentFrames: %d, firstFrameLatency: %dms, finishLatency: %dms", + msg.text, msg.receivedTs, readBytes, sentFrames, firstFrameLatency, time.Since(startTime).Milliseconds()), logTag) + } + }() + + ten.OnStartDone() +} + +// OnCmd receives cmd from ten graph. +// current supported cmd: +// - name: flush +// example: +// {"name": "flush"} +func (e *minimaxTTSExtension) OnCmd( + tenEnv ten.TenEnv, + cmd ten.Cmd, +) { + cmdName, err := cmd.GetName() + if err != nil { + slog.Error(fmt.Sprintf("OnCmd get name failed, err: %v", err), logTag) + cmdResult, _ := ten.NewCmdResult(ten.StatusCodeError) + tenEnv.ReturnResult(cmdResult, cmd) + return + } + + slog.Info(fmt.Sprintf("OnCmd %s", cmdInFlush), logTag) + + switch cmdName { + case cmdInFlush: + outdateTs.Store(time.Now().UnixMicro()) + + // send out + outCmd, err := ten.NewCmd(cmdOutFlush) + if err != nil { + slog.Error(fmt.Sprintf("new cmd %s failed, err: %v", cmdOutFlush, err), logTag) + cmdResult, _ := ten.NewCmdResult(ten.StatusCodeError) + tenEnv.ReturnResult(cmdResult, cmd) + return + } + + if err := tenEnv.SendCmd(outCmd, nil); err != nil { + slog.Error(fmt.Sprintf("send cmd %s failed, err: %v", cmdOutFlush, err), logTag) + cmdResult, _ := ten.NewCmdResult(ten.StatusCodeError) + tenEnv.ReturnResult(cmdResult, cmd) + return + } else { + slog.Info(fmt.Sprintf("cmd %s sent", cmdOutFlush), logTag) + } + } + + cmdResult, _ := ten.NewCmdResult(ten.StatusCodeOk) + tenEnv.ReturnResult(cmdResult, cmd) +} + +// OnData receives data from ten graph. +// current supported data: +// - name: text_data +// example: +// {name: text_data, properties: {text: "hello"} +func (e *minimaxTTSExtension) OnData( + tenEnv ten.TenEnv, + data ten.Data, +) { + text, err := data.GetPropertyString(dataInTextDataPropertyText) + if err != nil { + slog.Warn(fmt.Sprintf("OnData GetProperty %s failed, err: %v", dataInTextDataPropertyText, err), logTag) + return + } + + if len(text) == 0 { + slog.Debug("OnData text is empty, ignored", logTag) + return + } + + slog.Info(fmt.Sprintf("OnData input text: [%s]", text), logTag) + + go func() { + textChan <- &message{text: text, receivedTs: time.Now().UnixMicro()} + }() +} + +func init() { + slog.Info("minimax_tts extension init", logTag) + + // Register addon + ten.RegisterAddonAsExtension( + "minimax_tts", + ten.NewDefaultExtensionAddon(newMinimaxTTSExtension), + ) +} diff --git a/agents/ten_packages/extension/minimax_tts/pcm.go b/agents/ten_packages/extension/minimax_tts/pcm.go new file mode 100644 index 000000000..bb7072efc --- /dev/null +++ b/agents/ten_packages/extension/minimax_tts/pcm.go @@ -0,0 +1,102 @@ +/** + * + * Agora Real Time Engagement + * Created by XinHui Li in 2024. + * Copyright (c) 2024 Agora IO. All rights reserved. + * + */ +// An extension written by Go for TTS +package extension + +import ( + "fmt" + "log/slog" + + "ten_framework/ten" +) + +type pcm struct { + config *pcmConfig +} + +type pcmConfig struct { + BytesPerSample int32 + Channel int32 + ChannelLayout uint64 + Name string + SampleRate int32 + SamplesPerChannel int32 + Timestamp int64 +} + +func defaultPcmConfig() *pcmConfig { + return &pcmConfig{ + BytesPerSample: 2, + Channel: 1, + ChannelLayout: 1, + Name: "pcm_frame", + SampleRate: 32000, + SamplesPerChannel: 32000 / 100, + Timestamp: 0, + } +} + +func newPcm(config *pcmConfig) *pcm { + return &pcm{ + config: config, + } +} + +func (p *pcm) getPcmFrame(buf []byte) (pcmFrame ten.AudioFrame, err error) { + pcmFrame, err = ten.NewAudioFrame(p.config.Name) + if err != nil { + slog.Error(fmt.Sprintf("NewAudioFrame failed, err: %v", err), logTag) + return + } + + // set pcm frame + pcmFrame.SetBytesPerSample(p.config.BytesPerSample) + pcmFrame.SetSampleRate(p.config.SampleRate) + pcmFrame.SetChannelLayout(p.config.ChannelLayout) + pcmFrame.SetNumberOfChannels(p.config.Channel) + pcmFrame.SetTimestamp(p.config.Timestamp) + pcmFrame.SetDataFmt(ten.AudioFrameDataFmtInterleave) + pcmFrame.SetSamplesPerChannel(p.config.SamplesPerChannel) + pcmFrame.AllocBuf(p.getPcmFrameSize()) + + borrowedBuf, err := pcmFrame.LockBuf() + if err != nil { + slog.Error(fmt.Sprintf("LockBuf failed, err: %v", err), logTag) + return + } + + // copy data + copy(borrowedBuf, buf) + + pcmFrame.UnlockBuf(&borrowedBuf) + return +} + +func (p *pcm) getPcmFrameSize() int { + return int(p.config.SamplesPerChannel * p.config.Channel * p.config.BytesPerSample) +} + +func (p *pcm) newBuf() []byte { + return make([]byte, p.getPcmFrameSize()) +} + +func (p *pcm) send(tenEnv ten.TenEnv, buf []byte) (err error) { + pcmFrame, err := p.getPcmFrame(buf) + if err != nil { + slog.Error(fmt.Sprintf("getPcmFrame failed, err: %v", err), logTag) + return + } + + // send pcm + if err = tenEnv.SendAudioFrame(pcmFrame); err != nil { + slog.Error(fmt.Sprintf("SendAudioFrame failed, err: %v", err), logTag) + return + } + + return +} diff --git a/agents/ten_packages/extension/minimax_tts/property.json b/agents/ten_packages/extension/minimax_tts/property.json new file mode 100644 index 000000000..9e26dfeeb --- /dev/null +++ b/agents/ten_packages/extension/minimax_tts/property.json @@ -0,0 +1 @@ +{} \ No newline at end of file From b2847d3b7bd76e8a95d51215b22d35c605b9f606 Mon Sep 17 00:00:00 2001 From: sunshinexcode <24xinhui@163.com> Date: Wed, 13 Nov 2024 03:20:18 +0000 Subject: [PATCH 2/3] fix(): fix the way of reading streaming data --- .../extension/minimax_tts/minimax_tts.go | 26 ++++++++++++------- 1 file changed, 17 insertions(+), 9 deletions(-) diff --git a/agents/ten_packages/extension/minimax_tts/minimax_tts.go b/agents/ten_packages/extension/minimax_tts/minimax_tts.go index 33b5249d8..321ca3ba3 100644 --- a/agents/ten_packages/extension/minimax_tts/minimax_tts.go +++ b/agents/ten_packages/extension/minimax_tts/minimax_tts.go @@ -57,7 +57,7 @@ func newMinimaxTTS(config minimaxTTSConfig) (*minimaxTTS, error) { } func (e *minimaxTTS) textToSpeechStream(streamWriter io.Writer, text string) (err error) { - slog.Info("textToSpeechStream start tts", "text", text) + slog.Debug("textToSpeechStream start tts", "text", text) payload := map[string]any{ "audio_setting": map[string]any{ @@ -94,7 +94,7 @@ func (e *minimaxTTS) textToSpeechStream(streamWriter io.Writer, text string) (er defer func() { resp.RawBody().Close() - slog.Info("textToSpeechStream close response", "err", err, "text", text) + slog.Debug("textToSpeechStream close response", "err", err, "text", text) }() // Check the response status code @@ -103,12 +103,20 @@ func (e *minimaxTTS) textToSpeechStream(streamWriter io.Writer, text string) (er return fmt.Errorf("unexpected response status: %d", resp.StatusCode()) } - scanner := bufio.NewScanner(resp.RawBody()) - for scanner.Scan() { - line := scanner.Bytes() + reader := bufio.NewReader(resp.RawBody()) + for { + line, err := reader.ReadBytes('\n') + if err != nil { + if err == io.EOF { + break + } + + slog.Error("failed to read line", "error", err) + return err + } - if len(line) <= 5 || !bytes.HasPrefix(line, []byte("data:")) { - slog.Info("textToSpeechStream drop chunk", "text", text, "line", line) + if !bytes.HasPrefix(line, []byte("data:")) { + slog.Debug("drop chunk", "text", text, "line", line) continue } @@ -126,11 +134,11 @@ func (e *minimaxTTS) textToSpeechStream(streamWriter io.Writer, text string) (er if err = json.Unmarshal(line[5:], &chunk); err != nil { slog.Error("failed to decode JSON chunk", "err", err) - return + break } if chunk.Data.Status == 2 { - continue + break } audioData, err := hex.DecodeString(chunk.Data.Audio) From 3df22f5a63256104cbf9ff545c0c2295546ccf92 Mon Sep 17 00:00:00 2001 From: sunshinexcode <24xinhui@163.com> Date: Wed, 13 Nov 2024 13:20:57 +0000 Subject: [PATCH 3/3] feat(): add minimax tts extension for python --- agents/property.json | 7 +- .../extension/minimax_tts/manifest.json | 5 +- .../extension/minimax_tts/minimax_tts.go | 10 +- .../minimax_tts/minimax_tts_extension.go | 15 +- .../extension/minimax_tts_python/__init__.py | 11 + .../extension/minimax_tts_python/addon.py | 22 ++ .../extension/minimax_tts_python/extension.py | 293 ++++++++++++++++++ .../extension/minimax_tts_python/log.py | 19 ++ .../minimax_tts_python/manifest.json | 62 ++++ .../minimax_tts_python/property.json | 1 + .../minimax_tts_python/requirements.txt | 1 + 11 files changed, 437 insertions(+), 9 deletions(-) create mode 100644 agents/ten_packages/extension/minimax_tts_python/__init__.py create mode 100644 agents/ten_packages/extension/minimax_tts_python/addon.py create mode 100644 agents/ten_packages/extension/minimax_tts_python/extension.py create mode 100644 agents/ten_packages/extension/minimax_tts_python/log.py create mode 100644 agents/ten_packages/extension/minimax_tts_python/manifest.json create mode 100644 agents/ten_packages/extension/minimax_tts_python/property.json create mode 100644 agents/ten_packages/extension/minimax_tts_python/requirements.txt diff --git a/agents/property.json b/agents/property.json index 5b12dd7dc..8ba3c80d0 100644 --- a/agents/property.json +++ b/agents/property.json @@ -4634,13 +4634,14 @@ { "type": "extension", "extension_group": "tts", - "addon": "minimax_tts", + "addon": "minimax_tts_python", "name": "minimax_tts", "property": { "api_key": "${env:MINIMAX_TTS_API_KEY}", "group_id": "${env:MINIMAX_TTS_GROUP_ID}", "model": "speech-01-turbo", - "request_timeout_seconds": 30, + "request_timeout_seconds": 10, + "sample_rate": 32000, "url": "https://api.minimax.chat/v1/t2a_v2", "voice_id": "male-qn-qingse" } @@ -4760,4 +4761,4 @@ } ] } -} +} \ No newline at end of file diff --git a/agents/ten_packages/extension/minimax_tts/manifest.json b/agents/ten_packages/extension/minimax_tts/manifest.json index 5dd27f7b9..e4870308d 100644 --- a/agents/ten_packages/extension/minimax_tts/manifest.json +++ b/agents/ten_packages/extension/minimax_tts/manifest.json @@ -1,7 +1,7 @@ { "type": "extension", "name": "minimax_tts", - "version": "0.4.0", + "version": "0.1.0", "dependencies": [ { "type": "system", @@ -23,6 +23,9 @@ "request_timeout_seconds": { "type": "int64" }, + "sample_rate": { + "type": "int64" + }, "url": { "type": "string" }, diff --git a/agents/ten_packages/extension/minimax_tts/minimax_tts.go b/agents/ten_packages/extension/minimax_tts/minimax_tts.go index 321ca3ba3..003f57788 100644 --- a/agents/ten_packages/extension/minimax_tts/minimax_tts.go +++ b/agents/ten_packages/extension/minimax_tts/minimax_tts.go @@ -29,9 +29,10 @@ type minimaxTTS struct { type minimaxTTSConfig struct { ApiKey string + GroupId string Model string RequestTimeoutSeconds int - GroupId string + SampleRate int32 Url string VoiceId string } @@ -39,9 +40,10 @@ type minimaxTTSConfig struct { func defaultMinimaxTTSConfig() minimaxTTSConfig { return minimaxTTSConfig{ ApiKey: "", - Model: "speech-01-turbo", - RequestTimeoutSeconds: 30, GroupId: "", + Model: "speech-01-turbo", + RequestTimeoutSeconds: 10, + SampleRate: 32000, Url: "https://api.minimax.chat/v1/t2a_v2", VoiceId: "male-qn-qingse", } @@ -63,7 +65,7 @@ func (e *minimaxTTS) textToSpeechStream(streamWriter io.Writer, text string) (er "audio_setting": map[string]any{ "channel": 1, "format": "pcm", - "sample_rate": 32000, + "sample_rate": e.config.SampleRate, }, "model": e.config.Model, "pronunciation_dict": map[string]any{ diff --git a/agents/ten_packages/extension/minimax_tts/minimax_tts_extension.go b/agents/ten_packages/extension/minimax_tts/minimax_tts_extension.go index 696a34bbc..867bcedb3 100644 --- a/agents/ten_packages/extension/minimax_tts/minimax_tts_extension.go +++ b/agents/ten_packages/extension/minimax_tts/minimax_tts_extension.go @@ -28,6 +28,7 @@ const ( propertyGroupId = "group_id" // Required propertyModel = "model" // Optional propertyRequestTimeoutSeconds = "request_timeout_seconds" // Optional + propertySampleRate = "sample_rate" // Optional propertyUrl = "url" // Optional propertyVoiceId = "voice_id" // Optional ) @@ -65,6 +66,7 @@ func newMinimaxTTSExtension(name string) ten.Extension { // - group_id (required) // - model // - request_timeout_seconds +// - sample_rate // - url // - voice_id func (e *minimaxTTSExtension) OnStart(ten ten.TenEnv) { @@ -103,6 +105,14 @@ func (e *minimaxTTSExtension) OnStart(ten ten.TenEnv) { } } + if sampleRate, err := ten.GetPropertyInt64(propertySampleRate); err != nil { + slog.Warn(fmt.Sprintf("GetProperty optional %s failed, err: %v", propertySampleRate, err), logTag) + } else { + if sampleRate > 0 { + minimaxTTSConfig.SampleRate = int32(sampleRate) + } + } + if url, err := ten.GetPropertyString(propertyUrl); err != nil { slog.Warn(fmt.Sprintf("GetProperty optional %s failed, err: %v", propertyUrl, err), logTag) } else { @@ -130,7 +140,10 @@ func (e *minimaxTTSExtension) OnStart(ten ten.TenEnv) { e.minimaxTTS = minimaxTTS // create pcm instance - pcm := newPcm(defaultPcmConfig()) + pcmConfig := defaultPcmConfig() + pcmConfig.SampleRate = minimaxTTSConfig.SampleRate + pcmConfig.SamplesPerChannel = minimaxTTSConfig.SampleRate / 100 + pcm := newPcm(pcmConfig) pcmFrameSize := pcm.getPcmFrameSize() // init chan diff --git a/agents/ten_packages/extension/minimax_tts_python/__init__.py b/agents/ten_packages/extension/minimax_tts_python/__init__.py new file mode 100644 index 000000000..ada529452 --- /dev/null +++ b/agents/ten_packages/extension/minimax_tts_python/__init__.py @@ -0,0 +1,11 @@ +# +# +# Agora Real Time Engagement +# Created by Tomas Liu/XinHui Li in 2024. +# Copyright (c) 2024 Agora IO. All rights reserved. +# +# +from . import addon +from .log import logger + +logger.info("minimax_tts_python extension loaded") diff --git a/agents/ten_packages/extension/minimax_tts_python/addon.py b/agents/ten_packages/extension/minimax_tts_python/addon.py new file mode 100644 index 000000000..2274dd496 --- /dev/null +++ b/agents/ten_packages/extension/minimax_tts_python/addon.py @@ -0,0 +1,22 @@ +# +# +# Agora Real Time Engagement +# Created by Tomas Liu/XinHui Li in 2024. +# Copyright (c) 2024 Agora IO. All rights reserved. +# +# +from ten import ( + Addon, + register_addon_as_extension, + TenEnv, +) +from .extension import MinimaxTTSExtension +from .log import logger + + +@register_addon_as_extension("minimax_tts_python") +class MinimaxTTSExtensionAddon(Addon): + + def on_create_instance(self, ten_env: TenEnv, name: str, context) -> None: + logger.info("on_create_instance") + ten_env.on_create_instance_done(MinimaxTTSExtension(name), context) diff --git a/agents/ten_packages/extension/minimax_tts_python/extension.py b/agents/ten_packages/extension/minimax_tts_python/extension.py new file mode 100644 index 000000000..6a09cd1a3 --- /dev/null +++ b/agents/ten_packages/extension/minimax_tts_python/extension.py @@ -0,0 +1,293 @@ +# +# +# Agora Real Time Engagement +# Created by Tomas Liu/XinHui Li in 2024. +# Copyright (c) 2024 Agora IO. All rights reserved. +# +# +import threading +from datetime import datetime +import requests +import json + +from queue import Queue +from typing import Iterator + +from ten import ( + AudioFrame, + AudioFrameDataFmt, + VideoFrame, + Extension, + TenEnv, + Cmd, + StatusCode, + CmdResult, + Data, +) +from .log import logger + +PROPERTY_API_KEY = "api_key" +PROPERTY_GROUP_ID = "group_id" +PROPERTY_MODEL = "model" +PROPERTY_REQUEST_TIMEOUT_SECONDS = "request_timeout_seconds" +PROPERTY_SAMPLE_RATE = "sample_rate" +PROPERTY_URL = "url" +PROPERTY_VOICE_ID = "voice_id" + + +class MinimaxTTSExtension(Extension): + ten_env: TenEnv = None + + api_key: str = "" + dump: bool = False + group_id: str = "" + model: str = "speech-01-turbo" + request_timeout_seconds: int = 10 + sample_rate: int = 32000 + url: str = "https://api.minimax.chat/v1/t2a_v2" + voice_id: str = "male-qn-qingse" + + thread: threading.Thread = None + queue = Queue() + + stopped: bool = False + outdate_ts = datetime.now() + mutex = threading.Lock() + + def on_init(self, ten_env: TenEnv) -> None: + logger.info("MinimaxTTSExtension on_init") + self.ten_env = ten_env + ten_env.on_init_done() + + def on_start(self, ten_env: TenEnv) -> None: + logger.info("MinimaxTTSExtension on_start") + + try: + self.api_key = ten_env.get_property_string(PROPERTY_API_KEY) + except Exception as err: + logger.info(f"GetProperty required {PROPERTY_API_KEY} failed, err: {err}") + + try: + self.group_id = ten_env.get_property_string(PROPERTY_GROUP_ID) + except Exception as err: + logger.info(f"GetProperty required {PROPERTY_GROUP_ID} failed, err: {err}") + return + + try: + self.model = ten_env.get_property_string(PROPERTY_MODEL) + except Exception as err: + logger.info(f"GetProperty optional {PROPERTY_MODEL} failed, err: {err}") + + try: + self.sample_rate = ten_env.get_property_int(PROPERTY_SAMPLE_RATE) + except Exception as err: + logger.info(f"GetProperty optional {PROPERTY_SAMPLE_RATE} failed, err: {err}") + + try: + self.request_timeout_seconds = ten_env.get_property_int(PROPERTY_REQUEST_TIMEOUT_SECONDS) + except Exception as err: + logger.info(f"GetProperty optional {PROPERTY_REQUEST_TIMEOUT_SECONDS} failed, err: {err}") + + try: + self.voice_id = ten_env.get_property_string(PROPERTY_VOICE_ID) + except Exception as err: + logger.info(f"GetProperty optional {PROPERTY_VOICE_ID} failed, err: {err}") + + self.thread = threading.Thread(target=self.loop) + self.thread.start() + + ten_env.on_start_done() + + def on_stop(self, ten_env: TenEnv) -> None: + logger.info("MinimaxTTSExtension on_stop") + + self.stopped = True + self._flush() + self.queue.put(None) + if self.thread: + self.thread.join() + self.thread = None + + ten_env.on_stop_done() + + def loop(self) -> None: + while not self.stopped: + entry = self.queue.get() + if entry is None: + return + + try: + ts, text = entry + if self._need_interrupt(ts): + continue + self._call_tts_stream(ts, text) + except Exception as e: + logger.exception(f"Failed to handle entry, err {e}") + + def on_cmd(self, ten_env: TenEnv, cmd: Cmd) -> None: + cmd_name = cmd.get_name() + logger.info("on_cmd name {}".format(cmd_name)) + + if cmd_name == "flush": + self._flush() + + out_cmd = Cmd.create("flush") + ten_env.send_cmd( + out_cmd, lambda ten, result: logger.info( + "send_cmd flush done"), + ) + + cmd_result = CmdResult.create(StatusCode.OK) + ten_env.return_result(cmd_result, cmd) + + def on_data(self, ten_env: TenEnv, data: Data) -> None: + logger.debug("on_data") + + try: + text = data.get_property_string("text") + except Exception as e: + logger.warning(f"on_data get_property_string text error: {e}") + return + + if len(text) == 0: + logger.debug("on_data text is empty, ignored") + return + + logger.info(f"OnData input text: [{text}]") + + self.queue.put((datetime.now(), text)) + + def on_audio_frame(self, ten_env: TenEnv, audio_frame: AudioFrame) -> None: + pass + + def on_video_frame(self, ten_env: TenEnv, video_frame: VideoFrame) -> None: + pass + + def _need_interrupt(self, ts: datetime.time) -> bool: + with self.mutex: + return self.outdate_ts > ts + + def _call_tts_stream(self, ts: datetime, text: str) -> Iterator[bytes]: + payload = { + "model": self.model, + "text": text, + "stream": True, + "voice_setting": { + "voice_id": self.voice_id, + "speed": 1.0, + "vol": 1.0, + "pitch": 0 + }, + "pronunciation_dict": { + "tone": [] + }, + "audio_setting": { + "sample_rate": self.sample_rate, + "format": "pcm", + "channel": 1 + } + } + + url = "%s?GroupId=%s" % (self.url, self.group_id) + headers = { + 'accept': 'application/json, text/plain, */*', + "Authorization": f"Bearer {self.api_key}", + "Content-Type": "application/json" + } + + start_time = datetime.now() + logger.info(f"start request, url: {self.url}, text: {text}") + ttfb = None + try: + with requests.request("POST", url, stream=True, headers=headers, data=json.dumps(payload), timeout=self.request_timeout_seconds) as response: + trace_id = "" + alb_receive_time = "" + + try: + trace_id = response.headers.get("Trace-Id") + except: + logger.warning("get response, no Trace-Id") + try: + alb_receive_time = response.headers.get("alb_receive_time") + except: + logger.warning("get response, no alb_receive_time") + + logger.info(f"get response trace-id: {trace_id}, alb_receive_time: {alb_receive_time}, cost_time {self._duration_in_ms_since(start_time)}ms") + + response.raise_for_status() + + for chunk in (response.raw): + if self._need_interrupt(ts): + logger.warning(f"trace-id: {trace_id}, interrupted") + break + + if not chunk: + continue + if chunk[:5] != b'data:': + logger.debug(f"invalid chunk data {data}") + continue + + logger.debug(f"chunk len {len(chunk)}") + data = json.loads(chunk[5:]) + + if "extra_info" in data: + break + + if "data" not in data: + logger.warning(f"invalid chunk data {data}") + continue + + if "audio" not in data["data"]: + logger.warning(f"invalid chunk data {data}") + continue + + audio = data["data"]['audio'] + if audio is not None and audio != '\n': + decoded_hex = bytes.fromhex(audio) + if len(decoded_hex) > 0: + self._send_audio_out(decoded_hex) + + if not ttfb: + ttfb = self._duration_in_ms_since(start_time) + logger.info(f"trace-id: {trace_id}, ttfb {ttfb}ms") + except Exception as e: + logger.warning(f"unknown err {e}") + finally: + logger.info(f"http loop done, cost_time {self._duration_in_ms_since(start_time)}ms") + + def _send_audio_out(self, audio_data: bytearray) -> None: + self._dump_audio_if_need(audio_data, "out") + + try: + f = AudioFrame.create("pcm_frame") + f.set_sample_rate(self.sample_rate) + f.set_bytes_per_sample(2) + f.set_number_of_channels(1) + f.set_data_fmt(AudioFrameDataFmt.INTERLEAVE) + f.set_samples_per_channel(len(audio_data) // 2) + f.alloc_buf(len(audio_data)) + buff = f.lock_buf() + buff[:] = audio_data + f.unlock_buf(buff) + self.ten_env.send_audio_frame(f) + except Exception as e: + logger.exception("error send audio frame, {e}") + + def _flush(self) -> None: + with self.mutex: + self.outdate_ts = datetime.now() + while not self.queue.empty(): + self.queue.get() + + def _dump_audio_if_need(self, buf: bytearray, suffix: str) -> None: + if not self.dump: + return + + with open("{}_{}.pcm".format("minimax_tts", suffix), "ab") as dump_file: + dump_file.write(buf) + + def _duration_in_ms(self, start: datetime, end: datetime) -> int: + return int((end - start).total_seconds() * 1000) + + def _duration_in_ms_since(self, start: datetime) -> int: + return self._duration_in_ms(start, datetime.now()) diff --git a/agents/ten_packages/extension/minimax_tts_python/log.py b/agents/ten_packages/extension/minimax_tts_python/log.py new file mode 100644 index 000000000..2aeb786bc --- /dev/null +++ b/agents/ten_packages/extension/minimax_tts_python/log.py @@ -0,0 +1,19 @@ +# +# +# Agora Real Time Engagement +# Created by Tomas Liu/XinHui Li in 2024. +# Copyright (c) 2024 Agora IO. All rights reserved. +# +# +import logging + +logger = logging.getLogger("minimax_tts_python") +logger.setLevel(logging.INFO) + +formatter_str = ("%(asctime)s - %(name)s - %(levelname)s - %(process)d - [%(filename)s:%(lineno)d] - %(message)s") +formatter = logging.Formatter(formatter_str) + +console_handler = logging.StreamHandler() +console_handler.setFormatter(formatter) + +logger.addHandler(console_handler) diff --git a/agents/ten_packages/extension/minimax_tts_python/manifest.json b/agents/ten_packages/extension/minimax_tts_python/manifest.json new file mode 100644 index 000000000..5cf3a82fb --- /dev/null +++ b/agents/ten_packages/extension/minimax_tts_python/manifest.json @@ -0,0 +1,62 @@ +{ + "type": "extension", + "name": "minimax_tts_python", + "version": "0.1.0", + "dependencies": [ + { + "type": "system", + "name": "ten_runtime_python", + "version": "0.3" + } + ], + "api": { + "property": { + "api_key": { + "type": "string" + }, + "group_id": { + "type": "string" + }, + "model": { + "type": "string" + }, + "request_timeout_seconds": { + "type": "int64" + }, + "sample_rate": { + "type": "int64" + }, + "url": { + "type": "string" + }, + "voice_id": { + "type": "string" + } + }, + "data_in": [ + { + "name": "text_data", + "property": { + "text": { + "type": "string" + } + } + } + ], + "cmd_in": [ + { + "name": "flush" + } + ], + "cmd_out": [ + { + "name": "flush" + } + ], + "audio_frame_out": [ + { + "name": "pcm_frame" + } + ] + } +} \ No newline at end of file diff --git a/agents/ten_packages/extension/minimax_tts_python/property.json b/agents/ten_packages/extension/minimax_tts_python/property.json new file mode 100644 index 000000000..9e26dfeeb --- /dev/null +++ b/agents/ten_packages/extension/minimax_tts_python/property.json @@ -0,0 +1 @@ +{} \ No newline at end of file diff --git a/agents/ten_packages/extension/minimax_tts_python/requirements.txt b/agents/ten_packages/extension/minimax_tts_python/requirements.txt new file mode 100644 index 000000000..ef487e06e --- /dev/null +++ b/agents/ten_packages/extension/minimax_tts_python/requirements.txt @@ -0,0 +1 @@ +requests==2.32.3 \ No newline at end of file