diff --git a/KS.Fiks.IO.Client.Tests/Amqp/AmqpHandlerFixture.cs b/KS.Fiks.IO.Client.Tests/Amqp/AmqpHandlerFixture.cs index b0c20b6..16d5819 100644 --- a/KS.Fiks.IO.Client.Tests/Amqp/AmqpHandlerFixture.cs +++ b/KS.Fiks.IO.Client.Tests/Amqp/AmqpHandlerFixture.cs @@ -25,6 +25,7 @@ public class AmqpHandlerFixture private string _token = "testtoken"; private Guid _integrationId = Guid.NewGuid(); private string _integrationPassword = "defaultPassword"; + private IAmqpHandler? _amqpHandler; public AmqpHandlerFixture WhereConnectionFactoryThrowsException() { @@ -56,6 +57,11 @@ public AmqpHandlerFixture WithIntegrationId(Guid id) return this; } + public void SetConnectionToNull() + { + ((AmqpHandler)GetAmqpHandler()).SetConnection(null); + } + public Mock ConnectionFactoryMock { get; } = new Mock(); public Mock ConnectionMock { get; } = new Mock(); @@ -76,7 +82,7 @@ internal async Task CreateSutAsync() { SetupMocks(); var amqpConfiguration = CreateConfiguration(); - var amqpHandler = await AmqpHandler.CreateAsync( + _amqpHandler = await AmqpHandler.CreateAsync( MaskinportenClientMock.Object, SendHandlerMock.Object, DokumentlagerHandlerMock.Object, @@ -87,7 +93,7 @@ internal async Task CreateSutAsync() ConnectionFactoryMock.Object, AmqpConsumerFactoryMock.Object).ConfigureAwait(false); - return amqpHandler; + return _amqpHandler; } private static AmqpConfiguration CreateConfiguration() @@ -159,5 +165,10 @@ private IntegrasjonConfiguration CreateIntegrationConfiguration() { return new IntegrasjonConfiguration(_integrationId, _integrationPassword); } + + private object GetAmqpHandler() + { + return _amqpHandler ?? throw new InvalidOperationException("SUT is not created yet."); + } } } \ No newline at end of file diff --git a/KS.Fiks.IO.Client.Tests/Amqp/AmqpHandlerTests.cs b/KS.Fiks.IO.Client.Tests/Amqp/AmqpHandlerTests.cs index 99dc992..1c08864 100644 --- a/KS.Fiks.IO.Client.Tests/Amqp/AmqpHandlerTests.cs +++ b/KS.Fiks.IO.Client.Tests/Amqp/AmqpHandlerTests.cs @@ -188,6 +188,8 @@ public async Task DisposeAsync_UnsubscribesAllEvents() consumer => consumer.ConsumerCancelledAsync += It.IsAny>(), Times.Once); + _fixture.ConnectionMock.Setup(c => c.IsOpen).Returns(true); + connectionMock.SetupRemove(connection => connection.ConnectionShutdownAsync -= It.IsAny>()); @@ -243,6 +245,116 @@ public async Task DisposeAsync_UnsubscribesAllEvents() connection => connection.ConnectionRecoveryErrorAsync -= It.IsAny>(), Times.AtLeastOnce); + + _fixture.ChannelMock.Verify(c => c.DisposeAsync(), Times.Once); + _fixture.ConnectionMock.Verify(c => c.DisposeAsync(), Times.Once); + } + + [Fact] + public async Task DisposeAsync_HandlesObjectDisposedExceptionWhenUnsubscribingConnectionEvents() + { + var sut = await _fixture.CreateSutAsync(); + + _fixture.ConnectionMock.Setup(c => c.IsOpen).Throws(new ObjectDisposedException("MockedObject")); + _fixture.ConnectionMock.Setup(c => c.DisposeAsync()).Returns(ValueTask.CompletedTask); + _fixture.ChannelMock.Setup(c => c.DisposeAsync()).Returns(ValueTask.CompletedTask); + + var exceptionCaught = false; + + try + { + await sut.DisposeAsync(); + } + catch + { + exceptionCaught = true; + } + + exceptionCaught.ShouldBeFalse("DisposeAsync should handle ObjectDisposedException internally."); + + _fixture.ConnectionMock.Verify(c => c.DisposeAsync(), Times.Once); + _fixture.ChannelMock.Verify(c => c.DisposeAsync(), Times.Once); + } + + [Fact] + public async Task DisposeAsync_CanBeCalledMultipleTimesSafely() + { + var sut = await _fixture.CreateSutAsync(); + + _fixture.ConnectionMock.Setup(c => c.IsOpen).Returns(true); + _fixture.ChannelMock.Setup(c => c.DisposeAsync()).Returns(ValueTask.CompletedTask); + _fixture.ConnectionMock.Setup(c => c.DisposeAsync()).Returns(ValueTask.CompletedTask); + + await sut.DisposeAsync(); + await sut.DisposeAsync(); + + _fixture.ChannelMock.Verify(c => c.DisposeAsync(), Times.Once); + _fixture.ConnectionMock.Verify(c => c.DisposeAsync(), Times.Once); + } + + [Fact] + public async Task DisposeAsync_HandlesNullConnectionGracefully() + { + var sut = await _fixture.CreateSutAsync(); + + _fixture.SetConnectionToNull(); + _fixture.ChannelMock.Setup(c => c.DisposeAsync()).Returns(ValueTask.CompletedTask); + + var exceptionCaught = false; + try + { + await sut.DisposeAsync(); + } + catch + { + exceptionCaught = true; + } + + exceptionCaught.ShouldBeFalse("DisposeAsync should handle null connection gracefully."); + + _fixture.ChannelMock.Verify(c => c.DisposeAsync(), Times.Once); + } + + [Fact] + public async Task DisposeAsync_SkipsEventUnsubscriptionWhenConnectionIsClosed() + { + var sut = await _fixture.CreateSutAsync(); + + _fixture.ConnectionMock.Setup(c => c.IsOpen).Returns(false); + _fixture.ChannelMock.Setup(c => c.DisposeAsync()).Returns(ValueTask.CompletedTask); + _fixture.ConnectionMock.Setup(c => c.DisposeAsync()).Returns(ValueTask.CompletedTask); + + await sut.DisposeAsync(); + + _fixture.ConnectionMock.VerifyRemove( + c => c.ConnectionShutdownAsync -= It.IsAny>(), + Times.Never); + } + + [Fact] + public async Task DisposeAsync_HandlesObjectDisposedExceptionWhenUnsubscribingEvents() + { + var sut = await _fixture.CreateSutAsync(); + + _fixture.ConnectionMock.Setup(c => c.IsOpen).Returns(true); + _fixture.ConnectionMock + .SetupRemove(c => c.ConnectionShutdownAsync -= It.IsAny>()) + .Throws(new ObjectDisposedException("MockedConnection")); + _fixture.ChannelMock.Setup(c => c.DisposeAsync()).Returns(ValueTask.CompletedTask); + _fixture.ConnectionMock.Setup(c => c.DisposeAsync()).Returns(ValueTask.CompletedTask); + + var exceptionCaught = false; + + try + { + await sut.DisposeAsync(); + } + catch + { + exceptionCaught = true; + } + + exceptionCaught.ShouldBeFalse("DisposeAsync should handle ObjectDisposedException internally."); } } } \ No newline at end of file diff --git a/KS.Fiks.IO.Client/Amqp/AmqpHandler.cs b/KS.Fiks.IO.Client/Amqp/AmqpHandler.cs index c2d8dba..a23c635 100644 --- a/KS.Fiks.IO.Client/Amqp/AmqpHandler.cs +++ b/KS.Fiks.IO.Client/Amqp/AmqpHandler.cs @@ -29,6 +29,12 @@ internal class AmqpHandler : IAmqpHandler private IAmqpReceiveConsumer _receiveConsumer; private Func _receivedEvent; private Func _cancelledEvent; + private int _disposed; + + internal void SetConnection(IConnection connection) + { + _connection = connection; + } private AmqpHandler( IMaskinportenClient maskinportenClient, @@ -60,7 +66,8 @@ private AmqpHandler( _amqpWatcher = amqpWatcher ?? new DefaultAmqpWatcher(loggerFactory); - _amqpConsumerFactory = consumerFactory ?? new AmqpConsumerFactory(sendHandler, dokumentlagerHandler, _amqpWatcher, _kontoConfiguration); + _amqpConsumerFactory = consumerFactory ?? new AmqpConsumerFactory( + sendHandler, dokumentlagerHandler, _amqpWatcher, _kontoConfiguration); } public static async Task CreateAsync( @@ -75,11 +82,14 @@ public static async Task CreateAsync( IAmqpConsumerFactory consumerFactory = null, IAmqpWatcher amqpWatcher = null) { - var amqpHandler = new AmqpHandler(maskinportenClient, sendHandler, dokumentlagerHandler, amqpConfiguration, integrasjonConfiguration, kontoConfiguration, loggerFactory, connectionFactory, consumerFactory, amqpWatcher); - await amqpHandler.ConnectAsync(amqpConfiguration).ConfigureAwait(false); + var handler = new AmqpHandler(maskinportenClient, sendHandler, dokumentlagerHandler, + amqpConfiguration, integrasjonConfiguration, kontoConfiguration, + loggerFactory, connectionFactory, consumerFactory, amqpWatcher); + + await handler.ConnectAsync(amqpConfiguration).ConfigureAwait(false); _logger?.LogDebug("AmqpHandler CreateAsync done"); - return amqpHandler; + return handler; } public async Task AddMessageReceivedHandlerAsync( @@ -111,35 +121,50 @@ public Task IsOpenAsync() public async ValueTask DisposeAsync() { + if (Interlocked.Exchange(ref _disposed, 1) == 1) + { + return; + } + RunSafe( + () => + { UnsubscribeConsumerEvents(); + SafelyUnsubscribeConnectionEvents(); + }, "Error during event unsubscription in DisposeAsync"); - UnsubscribeConnectionEvents(); + await DisposeSafeAsync(_channel, "channel").ConfigureAwait(false); + await DisposeSafeAsync(_connection, "connection").ConfigureAwait(false); + } - await _channel.DisposeAsync().ConfigureAwait(false); + private void SafelyUnsubscribeConnectionEvents() + { + if (_connection == null || !_connection.IsOpen) + { + _logger?.LogDebug("Connection is closed, skipping event unsubscription"); + return; + } - await _connection.DisposeAsync().ConfigureAwait(false); + RunSafe(UnsubscribeConnectionEvents, "Error unsubscribing connection events"); } - private async Task ConnectAsync(AmqpConfiguration amqpConfiguration) + private async Task ConnectAsync(AmqpConfiguration config) { - _connection = await CreateConnectionAsync(amqpConfiguration).ConfigureAwait(false); - _channel = await ConnectToChannelAsync(amqpConfiguration).ConfigureAwait(false); + _connection = await CreateConnectionAsync(config).ConfigureAwait(false); + _channel = await ConnectToChannelAsync(config).ConfigureAwait(false); SubscribeConnectionEvents(); - - await Task.CompletedTask.ConfigureAwait(false); } private void SubscribeConnectionEvents() - { - _connection.ConnectionShutdownAsync += _amqpWatcher.HandleConnectionShutdown; - _connection.ConnectionBlockedAsync += _amqpWatcher.HandleConnectionBlocked; - _connection.ConnectionUnblockedAsync += _amqpWatcher.HandleConnectionUnblocked; - _connection.RecoverySucceededAsync += _amqpWatcher.HandleRecoverySucceeded; - _connection.RecoveringConsumerAsync += _amqpWatcher.HandleRecoveringConsumer; - _connection.ConnectionRecoveryErrorAsync += _amqpWatcher.HandleConnectionRecoveryError; - } + { + _connection.ConnectionShutdownAsync += _amqpWatcher.HandleConnectionShutdown; + _connection.ConnectionBlockedAsync += _amqpWatcher.HandleConnectionBlocked; + _connection.ConnectionUnblockedAsync += _amqpWatcher.HandleConnectionUnblocked; + _connection.RecoverySucceededAsync += _amqpWatcher.HandleRecoverySucceeded; + _connection.RecoveringConsumerAsync += _amqpWatcher.HandleRecoveringConsumer; + _connection.ConnectionRecoveryErrorAsync += _amqpWatcher.HandleConnectionRecoveryError; + } private void UnsubscribeConnectionEvents() { @@ -187,5 +212,43 @@ private string GetQueueName() { return $"{QueuePrefix}{_kontoConfiguration.KontoId}"; } + + private void RunSafe(Action action, string warningMessage) + { + try + { + action(); + } + catch (ObjectDisposedException ex) + { + _logger?.LogDebug(ex, $"{warningMessage} (disposed)"); + } + catch (Exception ex) + { + _logger?.LogWarning(ex, warningMessage); + } + } + + private async Task DisposeSafeAsync(T disposable, string name) + where T : class, IAsyncDisposable + { + if (disposable == null) + { + return; + } + + try + { + await disposable.DisposeAsync().ConfigureAwait(false); + } + catch (ObjectDisposedException ex) + { + _logger?.LogDebug(ex, $"{name} already disposed."); + } + catch (Exception ex) + { + _logger?.LogWarning(ex, $"Error disposing {name} in DisposeAsync"); + } + } } } \ No newline at end of file