diff --git a/src/Transports/RabbitMQ/Wolverine.RabbitMQ/Internal/RabbitMqTransport.cs b/src/Transports/RabbitMQ/Wolverine.RabbitMQ/Internal/RabbitMqTransport.cs index e177cafeb..41e9d8dc8 100644 --- a/src/Transports/RabbitMQ/Wolverine.RabbitMQ/Internal/RabbitMqTransport.cs +++ b/src/Transports/RabbitMQ/Wolverine.RabbitMQ/Internal/RabbitMqTransport.cs @@ -157,17 +157,26 @@ public override async ValueTask ConnectAsync(IWolverineRuntime runtime) new ConnectionFactory { HostName = "localhost" }; configureDefaults(ConnectionFactory); - - if (_listenerConnection == null && !UseSenderConnectionOnly) + + try { - _listenerConnection = BuildConnection(ConnectionRole.Listening); - await _listenerConnection.ConnectAsync(); - } + if (_listenerConnection == null && !UseSenderConnectionOnly) + { + _listenerConnection = BuildConnection(ConnectionRole.Listening); + await _listenerConnection.ConnectAsync(); + } - if (_sendingConnection == null && !UseListenerConnectionOnly) + if (_sendingConnection == null && !UseListenerConnectionOnly) + { + _sendingConnection = BuildConnection(ConnectionRole.Sending); + await _sendingConnection.ConnectAsync(); + } + } + catch (Exception) { - _sendingConnection = BuildConnection(ConnectionRole.Sending); - await _sendingConnection.ConnectAsync(); + _listenerConnection = null; + _sendingConnection = null; + throw; } foreach (var tenant in Tenants) diff --git a/src/Wolverine/Transports/BrokerTransport.cs b/src/Wolverine/Transports/BrokerTransport.cs index 1dd62e174..3276a7d60 100644 --- a/src/Wolverine/Transports/BrokerTransport.cs +++ b/src/Wolverine/Transports/BrokerTransport.cs @@ -91,6 +91,32 @@ public sealed override async ValueTask InitializeAsync(IWolverineRuntime runtime tryBuildSystemEndpoints(runtime); + var attempts = 1; + + for (int i = 0; i < 20; i++) + { + try + { + await startupAsync(runtime); + return; + } + catch (Exception e) + { + runtime.Logger.LogError(e, "Error trying to start message broker {Broker} on Attempt {Attempt} of 20", Protocol, i + 1); + if (i < 19) + { + runtime.Logger.LogInformation("Will retry to start broker {Broker} in 5 seconds", Protocol); + await Task.Delay(5.Seconds()); + } + } + } + + throw new BrokerInitializationException(this); + + } + + private async ValueTask startupAsync(IWolverineRuntime runtime) + { await ConnectAsync(runtime); foreach (var endpoint in endpoints()) @@ -118,5 +144,13 @@ protected virtual IEnumerable explicitEndpoints() /// protected virtual void tryBuildSystemEndpoints(IWolverineRuntime runtime) { + } +} + +public class BrokerInitializationException : Exception +{ + public BrokerInitializationException(IBrokerTransport transport) : base($"Unable to initialize the Broker {transport.Protocol} in time") + { + } } \ No newline at end of file