diff --git a/dotnet/samples/Hello/Program.cs b/dotnet/samples/Hello/Program.cs index ef55511d044f..26df86295eba 100644 --- a/dotnet/samples/Hello/Program.cs +++ b/dotnet/samples/Hello/Program.cs @@ -47,7 +47,6 @@ public async Task Handle(ConversationClosed item) Message = goodbye }.ToCloudEvent(this.AgentId.Key); await PublishEvent(evt).ConfigureAwait(false); - await Task.Delay(60000); await App.ShutdownAsync(); } public async Task SayHello(string ask) diff --git a/dotnet/src/Microsoft.AutoGen.Agents/Client/AgentWorkerRuntime.cs b/dotnet/src/Microsoft.AutoGen.Agents/Client/AgentWorkerRuntime.cs index 5483efd72bc2..8fe2d8b1c381 100644 --- a/dotnet/src/Microsoft.AutoGen.Agents/Client/AgentWorkerRuntime.cs +++ b/dotnet/src/Microsoft.AutoGen.Agents/Client/AgentWorkerRuntime.cs @@ -99,11 +99,21 @@ private async Task RunReadPump() } } } - catch (Exception ex) + catch (OperationCanceledException) + { + // Time to shut down. + break; + } + catch (Exception ex) when (!_shutdownCts.IsCancellationRequested) { _logger.LogError(ex, "Error reading from channel."); channel = RecreateChannel(channel); } + catch + { + // Shutdown requested. + break; + } } } @@ -113,34 +123,39 @@ private async Task RunWritePump() var outboundMessages = _outboundMessagesChannel.Reader; while (!_shutdownCts.IsCancellationRequested) { - await outboundMessages.WaitToReadAsync().ConfigureAwait(false); - - // Read the next message if we don't already have an unsent message - // waiting to be sent. - if (!outboundMessages.TryRead(out var message)) + try { - break; - } + await outboundMessages.WaitToReadAsync().ConfigureAwait(false); - while (!_shutdownCts.IsCancellationRequested) - { - try + // Read the next message if we don't already have an unsent message + // waiting to be sent. + if (!outboundMessages.TryRead(out var message)) { - await channel.RequestStream.WriteAsync(message, _shutdownCts.Token).ConfigureAwait(false); break; } - catch (Exception ex) when (!_shutdownCts.IsCancellationRequested) - { - _logger.LogError(ex, "Error writing to channel."); - channel = RecreateChannel(channel); - continue; - } - catch + + while (!_shutdownCts.IsCancellationRequested) { - // Shutdown requested. + await channel.RequestStream.WriteAsync(message, _shutdownCts.Token).ConfigureAwait(false); break; } } + catch (OperationCanceledException) + { + // Time to shut down. + break; + } + catch (Exception ex) when (!_shutdownCts.IsCancellationRequested) + { + _logger.LogError(ex, "Error writing to channel."); + channel = RecreateChannel(channel); + continue; + } + catch + { + // Shutdown requested. + break; + } } } @@ -286,10 +301,6 @@ void StartCore() public async Task StopAsync(CancellationToken cancellationToken) { _shutdownCts.Cancel(); - lock (_channelLock) - { - _channel?.Dispose(); - } _outboundMessagesChannel.Writer.TryComplete(); @@ -302,6 +313,10 @@ public async Task StopAsync(CancellationToken cancellationToken) { await writeTask.ConfigureAwait(false); } + lock (_channelLock) + { + _channel?.Dispose(); + } } public ValueTask SendRequest(RpcRequest request) diff --git a/dotnet/src/Microsoft.AutoGen.Agents/Client/App.cs b/dotnet/src/Microsoft.AutoGen.Agents/Client/App.cs index 65a4abb327f6..cb25863d6f3b 100644 --- a/dotnet/src/Microsoft.AutoGen.Agents/Client/App.cs +++ b/dotnet/src/Microsoft.AutoGen.Agents/Client/App.cs @@ -49,7 +49,7 @@ public static async ValueTask ShutdownAsync() { throw new InvalidOperationException("Client not started"); } - await RuntimeApp!.StopAsync(); await ClientApp.StopAsync(); + await RuntimeApp!.StopAsync(); } } diff --git a/dotnet/src/Microsoft.AutoGen.Agents/Runtime/WorkerProcessConnection.cs b/dotnet/src/Microsoft.AutoGen.Agents/Runtime/WorkerProcessConnection.cs index b4b7e1e1acc0..23febaaa4df9 100644 --- a/dotnet/src/Microsoft.AutoGen.Agents/Runtime/WorkerProcessConnection.cs +++ b/dotnet/src/Microsoft.AutoGen.Agents/Runtime/WorkerProcessConnection.cs @@ -87,6 +87,9 @@ public async Task RunReadPump() _gateway.OnReceivedMessageAsync(this, message).Ignore(); } } + catch (OperationCanceledException) + { + } finally { _shutdownCancellationToken.Cancel(); @@ -104,6 +107,9 @@ public async Task RunWritePump() await ResponseStream.WriteAsync(message); } } + catch (OperationCanceledException) + { + } finally { _shutdownCancellationToken.Cancel();