Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 13 additions & 2 deletions KS.Fiks.IO.Client.Tests/Amqp/AmqpHandlerFixture.cs
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ public class AmqpHandlerFixture
private string _token = "testtoken";
private Guid _integrationId = Guid.NewGuid();
private string _integrationPassword = "defaultPassword";
private IAmqpHandler? _amqpHandler;

public AmqpHandlerFixture WhereConnectionFactoryThrowsException()
{
Expand Down Expand Up @@ -56,6 +57,11 @@ public AmqpHandlerFixture WithIntegrationId(Guid id)
return this;
}

public void SetConnectionToNull()
{
((AmqpHandler)GetAmqpHandler()).SetConnection(null);
}

public Mock<IConnectionFactory> ConnectionFactoryMock { get; } = new Mock<IConnectionFactory>();

public Mock<IConnection> ConnectionMock { get; } = new Mock<IConnection>();
Expand All @@ -76,7 +82,7 @@ internal async Task<IAmqpHandler> CreateSutAsync()
{
SetupMocks();
var amqpConfiguration = CreateConfiguration();
var amqpHandler = await AmqpHandler.CreateAsync(
_amqpHandler = await AmqpHandler.CreateAsync(
MaskinportenClientMock.Object,
SendHandlerMock.Object,
DokumentlagerHandlerMock.Object,
Expand All @@ -87,7 +93,7 @@ internal async Task<IAmqpHandler> CreateSutAsync()
ConnectionFactoryMock.Object,
AmqpConsumerFactoryMock.Object).ConfigureAwait(false);

return amqpHandler;
return _amqpHandler;
}

private static AmqpConfiguration CreateConfiguration()
Expand Down Expand Up @@ -159,5 +165,10 @@ private IntegrasjonConfiguration CreateIntegrationConfiguration()
{
return new IntegrasjonConfiguration(_integrationId, _integrationPassword);
}

private object GetAmqpHandler()
{
return _amqpHandler ?? throw new InvalidOperationException("SUT is not created yet.");
}
}
}
112 changes: 112 additions & 0 deletions KS.Fiks.IO.Client.Tests/Amqp/AmqpHandlerTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -188,6 +188,8 @@ public async Task DisposeAsync_UnsubscribesAllEvents()
consumer => consumer.ConsumerCancelledAsync += It.IsAny<Func<ConsumerEventArgs, Task>>(),
Times.Once);

_fixture.ConnectionMock.Setup(c => c.IsOpen).Returns(true);

connectionMock.SetupRemove(connection =>
connection.ConnectionShutdownAsync -= It.IsAny<AsyncEventHandler<ShutdownEventArgs>>());

Expand Down Expand Up @@ -243,6 +245,116 @@ public async Task DisposeAsync_UnsubscribesAllEvents()
connection => connection.ConnectionRecoveryErrorAsync -=
It.IsAny<AsyncEventHandler<ConnectionRecoveryErrorEventArgs>>(),
Times.AtLeastOnce);

_fixture.ChannelMock.Verify(c => c.DisposeAsync(), Times.Once);
_fixture.ConnectionMock.Verify(c => c.DisposeAsync(), Times.Once);
}

[Fact]
public async Task DisposeAsync_HandlesObjectDisposedExceptionWhenUnsubscribingConnectionEvents()
{
var sut = await _fixture.CreateSutAsync();

_fixture.ConnectionMock.Setup(c => c.IsOpen).Throws(new ObjectDisposedException("MockedObject"));
_fixture.ConnectionMock.Setup(c => c.DisposeAsync()).Returns(ValueTask.CompletedTask);
_fixture.ChannelMock.Setup(c => c.DisposeAsync()).Returns(ValueTask.CompletedTask);

var exceptionCaught = false;

try
{
await sut.DisposeAsync();
}
catch
{
exceptionCaught = true;
}

exceptionCaught.ShouldBeFalse("DisposeAsync should handle ObjectDisposedException internally.");

_fixture.ConnectionMock.Verify(c => c.DisposeAsync(), Times.Once);
_fixture.ChannelMock.Verify(c => c.DisposeAsync(), Times.Once);
}

[Fact]
public async Task DisposeAsync_CanBeCalledMultipleTimesSafely()
{
var sut = await _fixture.CreateSutAsync();

_fixture.ConnectionMock.Setup(c => c.IsOpen).Returns(true);
_fixture.ChannelMock.Setup(c => c.DisposeAsync()).Returns(ValueTask.CompletedTask);
_fixture.ConnectionMock.Setup(c => c.DisposeAsync()).Returns(ValueTask.CompletedTask);

await sut.DisposeAsync();
await sut.DisposeAsync();

_fixture.ChannelMock.Verify(c => c.DisposeAsync(), Times.Once);
_fixture.ConnectionMock.Verify(c => c.DisposeAsync(), Times.Once);
}

[Fact]
public async Task DisposeAsync_HandlesNullConnectionGracefully()
{
var sut = await _fixture.CreateSutAsync();

_fixture.SetConnectionToNull();
_fixture.ChannelMock.Setup(c => c.DisposeAsync()).Returns(ValueTask.CompletedTask);

var exceptionCaught = false;
try
{
await sut.DisposeAsync();
}
catch
{
exceptionCaught = true;
}

exceptionCaught.ShouldBeFalse("DisposeAsync should handle null connection gracefully.");

_fixture.ChannelMock.Verify(c => c.DisposeAsync(), Times.Once);
}

[Fact]
public async Task DisposeAsync_SkipsEventUnsubscriptionWhenConnectionIsClosed()
{
var sut = await _fixture.CreateSutAsync();

_fixture.ConnectionMock.Setup(c => c.IsOpen).Returns(false);
_fixture.ChannelMock.Setup(c => c.DisposeAsync()).Returns(ValueTask.CompletedTask);
_fixture.ConnectionMock.Setup(c => c.DisposeAsync()).Returns(ValueTask.CompletedTask);

await sut.DisposeAsync();

_fixture.ConnectionMock.VerifyRemove(
c => c.ConnectionShutdownAsync -= It.IsAny<AsyncEventHandler<ShutdownEventArgs>>(),
Times.Never);
}

[Fact]
public async Task DisposeAsync_HandlesObjectDisposedExceptionWhenUnsubscribingEvents()
{
var sut = await _fixture.CreateSutAsync();

_fixture.ConnectionMock.Setup(c => c.IsOpen).Returns(true);
_fixture.ConnectionMock
.SetupRemove(c => c.ConnectionShutdownAsync -= It.IsAny<AsyncEventHandler<ShutdownEventArgs>>())
.Throws(new ObjectDisposedException("MockedConnection"));
_fixture.ChannelMock.Setup(c => c.DisposeAsync()).Returns(ValueTask.CompletedTask);
_fixture.ConnectionMock.Setup(c => c.DisposeAsync()).Returns(ValueTask.CompletedTask);

var exceptionCaught = false;

try
{
await sut.DisposeAsync();
}
catch
{
exceptionCaught = true;
}

exceptionCaught.ShouldBeFalse("DisposeAsync should handle ObjectDisposedException internally.");
}
}
}
103 changes: 83 additions & 20 deletions KS.Fiks.IO.Client/Amqp/AmqpHandler.cs
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,12 @@ internal class AmqpHandler : IAmqpHandler
private IAmqpReceiveConsumer _receiveConsumer;
private Func<MottattMeldingArgs, Task> _receivedEvent;
private Func<ConsumerEventArgs, Task> _cancelledEvent;
private int _disposed;

internal void SetConnection(IConnection connection)
{
_connection = connection;
}

private AmqpHandler(
IMaskinportenClient maskinportenClient,
Expand Down Expand Up @@ -60,7 +66,8 @@ private AmqpHandler(

_amqpWatcher = amqpWatcher ?? new DefaultAmqpWatcher(loggerFactory);

_amqpConsumerFactory = consumerFactory ?? new AmqpConsumerFactory(sendHandler, dokumentlagerHandler, _amqpWatcher, _kontoConfiguration);
_amqpConsumerFactory = consumerFactory ?? new AmqpConsumerFactory(
sendHandler, dokumentlagerHandler, _amqpWatcher, _kontoConfiguration);
}

public static async Task<IAmqpHandler> CreateAsync(
Expand All @@ -75,11 +82,14 @@ public static async Task<IAmqpHandler> CreateAsync(
IAmqpConsumerFactory consumerFactory = null,
IAmqpWatcher amqpWatcher = null)
{
var amqpHandler = new AmqpHandler(maskinportenClient, sendHandler, dokumentlagerHandler, amqpConfiguration, integrasjonConfiguration, kontoConfiguration, loggerFactory, connectionFactory, consumerFactory, amqpWatcher);
await amqpHandler.ConnectAsync(amqpConfiguration).ConfigureAwait(false);
var handler = new AmqpHandler(maskinportenClient, sendHandler, dokumentlagerHandler,
amqpConfiguration, integrasjonConfiguration, kontoConfiguration,
loggerFactory, connectionFactory, consumerFactory, amqpWatcher);

await handler.ConnectAsync(amqpConfiguration).ConfigureAwait(false);

_logger?.LogDebug("AmqpHandler CreateAsync done");
return amqpHandler;
return handler;
}

public async Task AddMessageReceivedHandlerAsync(
Expand Down Expand Up @@ -111,35 +121,50 @@ public Task<bool> IsOpenAsync()

public async ValueTask DisposeAsync()
{
if (Interlocked.Exchange(ref _disposed, 1) == 1)
{
return;
}

RunSafe(
() =>
{
UnsubscribeConsumerEvents();
SafelyUnsubscribeConnectionEvents();
}, "Error during event unsubscription in DisposeAsync");

UnsubscribeConnectionEvents();
await DisposeSafeAsync(_channel, "channel").ConfigureAwait(false);
await DisposeSafeAsync(_connection, "connection").ConfigureAwait(false);
}

await _channel.DisposeAsync().ConfigureAwait(false);
private void SafelyUnsubscribeConnectionEvents()
{
if (_connection == null || !_connection.IsOpen)
{
_logger?.LogDebug("Connection is closed, skipping event unsubscription");
return;
}

await _connection.DisposeAsync().ConfigureAwait(false);
RunSafe(UnsubscribeConnectionEvents, "Error unsubscribing connection events");
}

private async Task ConnectAsync(AmqpConfiguration amqpConfiguration)
private async Task ConnectAsync(AmqpConfiguration config)
{
_connection = await CreateConnectionAsync(amqpConfiguration).ConfigureAwait(false);
_channel = await ConnectToChannelAsync(amqpConfiguration).ConfigureAwait(false);
_connection = await CreateConnectionAsync(config).ConfigureAwait(false);
_channel = await ConnectToChannelAsync(config).ConfigureAwait(false);

SubscribeConnectionEvents();

await Task.CompletedTask.ConfigureAwait(false);
}

private void SubscribeConnectionEvents()
{
_connection.ConnectionShutdownAsync += _amqpWatcher.HandleConnectionShutdown;
_connection.ConnectionBlockedAsync += _amqpWatcher.HandleConnectionBlocked;
_connection.ConnectionUnblockedAsync += _amqpWatcher.HandleConnectionUnblocked;
_connection.RecoverySucceededAsync += _amqpWatcher.HandleRecoverySucceeded;
_connection.RecoveringConsumerAsync += _amqpWatcher.HandleRecoveringConsumer;
_connection.ConnectionRecoveryErrorAsync += _amqpWatcher.HandleConnectionRecoveryError;
}
{
_connection.ConnectionShutdownAsync += _amqpWatcher.HandleConnectionShutdown;
_connection.ConnectionBlockedAsync += _amqpWatcher.HandleConnectionBlocked;
_connection.ConnectionUnblockedAsync += _amqpWatcher.HandleConnectionUnblocked;
_connection.RecoverySucceededAsync += _amqpWatcher.HandleRecoverySucceeded;
_connection.RecoveringConsumerAsync += _amqpWatcher.HandleRecoveringConsumer;
_connection.ConnectionRecoveryErrorAsync += _amqpWatcher.HandleConnectionRecoveryError;
}

private void UnsubscribeConnectionEvents()
{
Expand Down Expand Up @@ -187,5 +212,43 @@ private string GetQueueName()
{
return $"{QueuePrefix}{_kontoConfiguration.KontoId}";
}

private void RunSafe(Action action, string warningMessage)
{
try
{
action();
}
catch (ObjectDisposedException ex)
{
_logger?.LogDebug(ex, $"{warningMessage} (disposed)");
}
catch (Exception ex)
{
_logger?.LogWarning(ex, warningMessage);
}
}

private async Task DisposeSafeAsync<T>(T disposable, string name)
where T : class, IAsyncDisposable
{
if (disposable == null)
{
return;
}

try
{
await disposable.DisposeAsync().ConfigureAwait(false);
}
catch (ObjectDisposedException ex)
{
_logger?.LogDebug(ex, $"{name} already disposed.");
}
catch (Exception ex)
{
_logger?.LogWarning(ex, $"Error disposing {name} in DisposeAsync");
}
}
}
}