From ae7680da3d7bedde6b9b3332cec7be84bb34587f Mon Sep 17 00:00:00 2001 From: Haiping Chen Date: Mon, 10 Mar 2025 10:10:22 -0500 Subject: [PATCH] Hang-up if waiting AI Response Timeout --- .../Conversations/IConversationService.cs | 7 +- .../BotSharp.Plugin.Twilio.csproj | 3 +- .../Controllers/TwilioVoiceController.cs | 22 +- .../Models/ConversationalVoiceRequest.cs | 8 +- .../Services/TwilioMessageQueueService.cs | 283 +++++++++--------- 5 files changed, 171 insertions(+), 152 deletions(-) diff --git a/src/Infrastructure/BotSharp.Abstraction/Conversations/IConversationService.cs b/src/Infrastructure/BotSharp.Abstraction/Conversations/IConversationService.cs index 6e558b1b7..4bb795ada 100644 --- a/src/Infrastructure/BotSharp.Abstraction/Conversations/IConversationService.cs +++ b/src/Infrastructure/BotSharp.Abstraction/Conversations/IConversationService.cs @@ -32,13 +32,12 @@ public interface IConversationService /// Send message to LLM /// /// - /// + /// + /// /// Received the response from AI Agent - /// This delegate is useful when you want to report progress on UI - /// This delegate is useful when you want to report progress on UI /// Task SendMessage(string agentId, - RoleDialogModel lastDalog, + RoleDialogModel lastDialog, PostbackMessageModel? replyMessage, Func onResponseReceived); diff --git a/src/Plugins/BotSharp.Plugin.Twilio/BotSharp.Plugin.Twilio.csproj b/src/Plugins/BotSharp.Plugin.Twilio/BotSharp.Plugin.Twilio.csproj index 3f820090f..727143886 100644 --- a/src/Plugins/BotSharp.Plugin.Twilio/BotSharp.Plugin.Twilio.csproj +++ b/src/Plugins/BotSharp.Plugin.Twilio/BotSharp.Plugin.Twilio.csproj @@ -1,4 +1,4 @@ - + $(TargetFramework) @@ -6,6 +6,7 @@ $(BotSharpVersion) $(GeneratePackageOnBuild) $(SolutionDir)packages + enable diff --git a/src/Plugins/BotSharp.Plugin.Twilio/Controllers/TwilioVoiceController.cs b/src/Plugins/BotSharp.Plugin.Twilio/Controllers/TwilioVoiceController.cs index 7b041ca69..807d7af8e 100644 --- a/src/Plugins/BotSharp.Plugin.Twilio/Controllers/TwilioVoiceController.cs +++ b/src/Plugins/BotSharp.Plugin.Twilio/Controllers/TwilioVoiceController.cs @@ -142,7 +142,7 @@ public async Task ReceiveCallerMessage(ConversationalVoiceRequest r await messageQueue.EnqueueAsync(callerMessage); response = new VoiceResponse(); - response.Redirect(new Uri($"{_settings.CallbackHost}/twilio/voice/{request.ConversationId}/reply/{request.SeqNum}?{GenerateStatesParameter(request.States)}"), HttpMethod.Post); + response.Redirect(new Uri($"{_settings.CallbackHost}/twilio/voice/{request.ConversationId}/reply/{request.SeqNum}?{GenerateStatesParameter(request.States)}&AIResponseWaitTime=0"), HttpMethod.Post); await HookEmitter.Emit(_services, async hook => { @@ -217,8 +217,22 @@ public async Task ReplyCallerMessage(ConversationalVoiceRequest req var reply = await sessionManager.GetAssistantReplyAsync(request.ConversationId, request.SeqNum); VoiceResponse response; + + if (request.AIResponseWaitTime > 5) + { + // Wait AI Response Timeout + await HookEmitter.Emit(_services, async hook => + { + request.AIResponseErrorMessage = $"AI response timeout: AIResponseWaitTime greater than {request.AIResponseWaitTime}, please check internal error log!"; + await hook.OnAgentHangUp(request); + }, new HookEmitOption + { + OnlyOnce = true + }); - if (reply == null) + response = twilio.HangUp($"twilio/error.mp3"); + } + else if (reply == null) { var indication = await sessionManager.GetReplyIndicationAsync(request.ConversationId, request.SeqNum); if (indication != null) @@ -262,7 +276,7 @@ public async Task ReplyCallerMessage(ConversationalVoiceRequest req var instruction = new ConversationalVoiceResponse { SpeechPaths = speechPaths, - CallbackPath = $"twilio/voice/{request.ConversationId}/reply/{request.SeqNum}?{GenerateStatesParameter(request.States)}", + CallbackPath = $"twilio/voice/{request.ConversationId}/reply/{request.SeqNum}?{GenerateStatesParameter(request.States)}&AIResponseWaitTime={++request.AIResponseWaitTime}", ActionOnEmptyResult = true }; @@ -301,7 +315,7 @@ await HookEmitter.Emit(_services, async hook => var instruction = new ConversationalVoiceResponse { SpeechPaths = instructions, - CallbackPath = $"twilio/voice/{request.ConversationId}/reply/{request.SeqNum}?{GenerateStatesParameter(request.States)}", + CallbackPath = $"twilio/voice/{request.ConversationId}/reply/{request.SeqNum}?{GenerateStatesParameter(request.States)}&AIResponseWaitTime={++request.AIResponseWaitTime}", ActionOnEmptyResult = true }; diff --git a/src/Plugins/BotSharp.Plugin.Twilio/Models/ConversationalVoiceRequest.cs b/src/Plugins/BotSharp.Plugin.Twilio/Models/ConversationalVoiceRequest.cs index e2b6ee13b..33ee76335 100644 --- a/src/Plugins/BotSharp.Plugin.Twilio/Models/ConversationalVoiceRequest.cs +++ b/src/Plugins/BotSharp.Plugin.Twilio/Models/ConversationalVoiceRequest.cs @@ -5,17 +5,19 @@ namespace BotSharp.Plugin.Twilio.Models; public class ConversationalVoiceRequest : VoiceRequest { [FromQuery(Name = "agent-id")] - public string AgentId { get; set; } + public string AgentId { get; set; } = string.Empty; [FromRoute] - public string ConversationId { get; set; } + public string ConversationId { get; set; } = string.Empty; [FromRoute] public int SeqNum { get; set; } public int Attempts { get; set; } = 1; + public int AIResponseWaitTime { get; set; } = 0; + public string? AIResponseErrorMessage { get; set; } = string.Empty; - public string Intent { get; set; } + public string Intent { get; set; } = string.Empty; public List States { get; set; } = []; } diff --git a/src/Plugins/BotSharp.Plugin.Twilio/Services/TwilioMessageQueueService.cs b/src/Plugins/BotSharp.Plugin.Twilio/Services/TwilioMessageQueueService.cs index 2c942aa51..fb992c4a1 100644 --- a/src/Plugins/BotSharp.Plugin.Twilio/Services/TwilioMessageQueueService.cs +++ b/src/Plugins/BotSharp.Plugin.Twilio/Services/TwilioMessageQueueService.cs @@ -9,175 +9,178 @@ using System.Threading; using Task = System.Threading.Tasks.Task; -namespace BotSharp.Plugin.Twilio.Services +namespace BotSharp.Plugin.Twilio.Services; + +public class TwilioMessageQueueService : BackgroundService { - public class TwilioMessageQueueService : BackgroundService - { - private readonly TwilioMessageQueue _queue; - private readonly IServiceProvider _serviceProvider; - private readonly SemaphoreSlim _throttler; + private readonly TwilioMessageQueue _queue; + private readonly IServiceProvider _serviceProvider; + private readonly SemaphoreSlim _throttler; + private readonly ILogger _logger; - public TwilioMessageQueueService( - TwilioMessageQueue queue, - IServiceProvider serviceProvider) - { - _queue = queue; - _serviceProvider = serviceProvider; - _throttler = new SemaphoreSlim(4, 4); - } + public TwilioMessageQueueService( + TwilioMessageQueue queue, + IServiceProvider serviceProvider, + ILogger logger) + { + _queue = queue; + _serviceProvider = serviceProvider; + _throttler = new SemaphoreSlim(10, 10); + _logger = logger; + } - protected override async Task ExecuteAsync(CancellationToken stoppingToken) + protected override async Task ExecuteAsync(CancellationToken stoppingToken) + { + await foreach (var message in _queue.Reader.ReadAllAsync(stoppingToken)) { - await foreach (var message in _queue.Reader.ReadAllAsync(stoppingToken)) + await _throttler.WaitAsync(stoppingToken); + _ = Task.Run(async () => { - await _throttler.WaitAsync(stoppingToken); - _ = Task.Run(async () => + try { - try - { - Console.WriteLine($"Start processing {message}."); - await ProcessUserMessageAsync(message); - } - catch (Exception ex) - { - Console.WriteLine($"Processing {message} failed due to {ex.Message}."); - } - finally - { - _throttler.Release(); - } - }); - } + _logger.LogInformation($"Start processing {message}."); + await ProcessUserMessageAsync(message); + } + catch (Exception ex) + { + _logger.LogError($"Processing {message} failed due to {ex.Message}."); + } + finally + { + _throttler.Release(); + } + }); } + } + + public override async Task StopAsync(CancellationToken cancellationToken) + { + _queue.Stop(); + await base.StopAsync(cancellationToken); + } - public override async Task StopAsync(CancellationToken cancellationToken) + private async Task ProcessUserMessageAsync(CallerMessage message) + { + using var scope = _serviceProvider.CreateScope(); + var sp = scope.ServiceProvider; + + // Clean static HttpContext + var httpContext = sp.GetRequiredService(); + httpContext.HttpContext = new DefaultHttpContext(); + httpContext.HttpContext.User = new ClaimsPrincipal(new ClaimsIdentity()); + foreach (var header in message.RequestHeaders ?? []) { - _queue.Stop(); - await base.StopAsync(cancellationToken); + httpContext.HttpContext.Request.Headers[header.Key] = header.Value; } + httpContext.HttpContext.Request.Headers["X-Twilio-BotSharp"] = "LOST"; - private async Task ProcessUserMessageAsync(CallerMessage message) - { - using var scope = _serviceProvider.CreateScope(); - var sp = scope.ServiceProvider; - - // Clean static HttpContext - var httpContext = sp.GetRequiredService(); - httpContext.HttpContext = new DefaultHttpContext(); - httpContext.HttpContext.User = new ClaimsPrincipal(new ClaimsIdentity()); - foreach (var header in message.RequestHeaders ?? []) + AssistantMessage reply = null; + + var inputMsg = new RoleDialogModel(AgentRole.User, message.Content); + var conv = sp.GetRequiredService(); + var routing = sp.GetRequiredService(); + var config = sp.GetRequiredService(); + var sessionManager = sp.GetRequiredService(); + var progressService = sp.GetRequiredService(); + InitProgressService(message, sessionManager, progressService); + InitConversation(message, inputMsg, conv, routing); + + // Need to consider Inbound and Outbound call + var conversation = await conv.GetConversation(message.ConversationId); + var agentId = string.IsNullOrWhiteSpace(conversation?.AgentId) ? config.AgentId : conversation.AgentId; + + var result = await conv.SendMessage(agentId, + inputMsg, + replyMessage: BuildPostbackMessageModel(conv, message), + async msg => { - httpContext.HttpContext.Request.Headers[header.Key] = header.Value; - } - httpContext.HttpContext.Request.Headers["X-Twilio-BotSharp"] = "LOST"; - - AssistantMessage reply = null; - var inputMsg = new RoleDialogModel(AgentRole.User, message.Content); - var conv = sp.GetRequiredService(); - var routing = sp.GetRequiredService(); - var config = sp.GetRequiredService(); - var sessionManager = sp.GetRequiredService(); - var progressService = sp.GetRequiredService(); - InitProgressService(message, sessionManager, progressService); - InitConversation(message, inputMsg, conv, routing); - - // Need to consider Inbound and Outbound call - var conversation = await conv.GetConversation(message.ConversationId); - var agentId = string.IsNullOrWhiteSpace(conversation?.AgentId) ? config.AgentId : conversation.AgentId; - - var result = await conv.SendMessage(agentId, - inputMsg, - replyMessage: BuildPostbackMessageModel(conv, message), - async msg => + reply = new AssistantMessage() { - reply = new AssistantMessage() - { - ConversationEnd = msg.Instruction?.ConversationEnd ?? false, - HumanIntervationNeeded = string.Equals("human_intervention_needed", msg.FunctionName), - Content = msg.Content, - MessageId = msg.MessageId - }; - } - ); - reply.SpeechFileName = await GetReplySpeechFileName(message.ConversationId, reply, sp); - reply.Hints = GetHints(reply); - reply.Content = null; - await sessionManager.SetAssistantReplyAsync(message.ConversationId, message.SeqNumber, reply); - } + ConversationEnd = msg.Instruction?.ConversationEnd ?? false, + HumanIntervationNeeded = string.Equals("human_intervention_needed", msg.FunctionName), + Content = msg.Content, + MessageId = msg.MessageId + }; + } + ); + reply.SpeechFileName = await GetReplySpeechFileName(message.ConversationId, reply, sp); + reply.Hints = GetHints(reply); + reply.Content = null; + await sessionManager.SetAssistantReplyAsync(message.ConversationId, message.SeqNumber, reply); + } - private PostbackMessageModel BuildPostbackMessageModel(IConversationService conv, CallerMessage message) + private PostbackMessageModel BuildPostbackMessageModel(IConversationService conv, CallerMessage message) + { + var messages = conv.GetDialogHistory(1); + if (!messages.Any()) return null; + var lastMessage = messages[0]; + if (string.IsNullOrEmpty(lastMessage.PostbackFunctionName)) return null; + return new PostbackMessageModel { - var messages = conv.GetDialogHistory(1); - if (!messages.Any()) return null; - var lastMessage = messages[0]; - if (string.IsNullOrEmpty(lastMessage.PostbackFunctionName)) return null; - return new PostbackMessageModel - { - FunctionName = lastMessage.PostbackFunctionName, - ParentId = lastMessage.MessageId, - Payload = message.Digits - }; - } + FunctionName = lastMessage.PostbackFunctionName, + ParentId = lastMessage.MessageId, + Payload = message.Digits + }; + } - private static void InitConversation(CallerMessage message, RoleDialogModel inputMsg, IConversationService conv, IRoutingService routing) + private static void InitConversation(CallerMessage message, RoleDialogModel inputMsg, IConversationService conv, IRoutingService routing) + { + routing.Context.SetMessageId(message.ConversationId, inputMsg.MessageId); + var states = new List { - routing.Context.SetMessageId(message.ConversationId, inputMsg.MessageId); - var states = new List - { - new("channel", ConversationChannel.Phone), - new("channel_id", message.From), - new("calling_phone", message.From) - }; - states.AddRange(message.States.Select(kvp => new MessageState(kvp.Key, kvp.Value))); - conv.SetConversationId(message.ConversationId, states); - } + new("channel", ConversationChannel.Phone), + new("channel_id", message.From), + new("calling_phone", message.From) + }; + states.AddRange(message.States.Select(kvp => new MessageState(kvp.Key, kvp.Value))); + conv.SetConversationId(message.ConversationId, states); + } - private static async Task GetReplySpeechFileName(string conversationId, AssistantMessage reply, IServiceProvider sp) - { - var completion = CompletionProvider.GetAudioCompletion(sp, "openai", "tts-1"); - var fileStorage = sp.GetRequiredService(); - var data = await completion.GenerateAudioFromTextAsync(reply.Content); - var fileName = $"reply_{reply.MessageId}.mp3"; - fileStorage.SaveSpeechFile(conversationId, fileName, data); - return fileName; - } + private static async Task GetReplySpeechFileName(string conversationId, AssistantMessage reply, IServiceProvider sp) + { + var completion = CompletionProvider.GetAudioCompletion(sp, "openai", "tts-1"); + var fileStorage = sp.GetRequiredService(); + var data = await completion.GenerateAudioFromTextAsync(reply.Content); + var fileName = $"reply_{reply.MessageId}.mp3"; + fileStorage.SaveSpeechFile(conversationId, fileName, data); + return fileName; + } - private static string GetHints(AssistantMessage reply) + private static string GetHints(AssistantMessage reply) + { + var phrases = reply.Content.Split(',', StringSplitOptions.RemoveEmptyEntries); + int capcity = 100; + var hints = new List(capcity); + for (int i = phrases.Length - 1; i >= 0; i--) { - var phrases = reply.Content.Split(',', StringSplitOptions.RemoveEmptyEntries); - int capcity = 100; - var hints = new List(capcity); - for (int i = phrases.Length - 1; i >= 0; i--) + var words = phrases[i].Split(' ', StringSplitOptions.RemoveEmptyEntries); + for (int j = words.Length - 1; j >= 0; j--) { - var words = phrases[i].Split(' ', StringSplitOptions.RemoveEmptyEntries); - for (int j = words.Length - 1; j >= 0; j--) - { - hints.Add(words[j]); - if (hints.Count >= capcity) - { - break; - } - } + hints.Add(words[j]); if (hints.Count >= capcity) { break; } } - // add frequency short words - hints.AddRange(["yes", "no", "correct", "right"]); - return string.Join(", ", hints.Select(x => x.ToLower()).Distinct().Reverse()); + if (hints.Count >= capcity) + { + break; + } } + // add frequency short words + hints.AddRange(["yes", "no", "correct", "right"]); + return string.Join(", ", hints.Select(x => x.ToLower()).Distinct().Reverse()); + } - private static void InitProgressService(CallerMessage message, ITwilioSessionManager sessionManager, IConversationProgressService progressService) + private static void InitProgressService(CallerMessage message, ITwilioSessionManager sessionManager, IConversationProgressService progressService) + { + progressService.OnFunctionExecuting = async msg => { - progressService.OnFunctionExecuting = async msg => + if (!string.IsNullOrEmpty(msg.Indication)) { - if (!string.IsNullOrEmpty(msg.Indication)) - { - await sessionManager.SetReplyIndicationAsync(message.ConversationId, message.SeqNumber, msg.Indication); - } - }; - progressService.OnFunctionExecuted = async msg => { }; - } + await sessionManager.SetReplyIndicationAsync(message.ConversationId, message.SeqNumber, msg.Indication); + } + }; + progressService.OnFunctionExecuted = async msg => { }; } }