diff --git a/src/Wolverine/DurabilitySettings.cs b/src/Wolverine/DurabilitySettings.cs index 14181db7e..ee2ce7d38 100644 --- a/src/Wolverine/DurabilitySettings.cs +++ b/src/Wolverine/DurabilitySettings.cs @@ -201,6 +201,12 @@ internal set /// public TimeSpan NodeEventRecordExpirationTime { get; set; } = 5.Days(); + /// + /// When this option is enabled retry block used in InlineSendingAgent will synchronously wait on sending task to assure the message is send. + /// When set to default behavior is used so InlineSendingAgent agent will try to send a message and when failed it will give control to caller and retry on other thread in async manner + /// + public bool UseSyncRetryBlock { get; set; } + /// /// Get or set the logical Wolverine service name. By default, this is /// derived from the name of a custom WolverineOptions diff --git a/src/Wolverine/Transports/Sending/InlineSendingAgent.cs b/src/Wolverine/Transports/Sending/InlineSendingAgent.cs index 1cc065e17..da1c07b2e 100644 --- a/src/Wolverine/Transports/Sending/InlineSendingAgent.cs +++ b/src/Wolverine/Transports/Sending/InlineSendingAgent.cs @@ -9,7 +9,7 @@ namespace Wolverine.Transports.Sending; public class InlineSendingAgent : ISendingAgent, IDisposable { private readonly IMessageTracker _messageLogger; - private readonly RetryBlock _sending; + private readonly IRetryBlock _sending; private readonly DurabilitySettings _settings; public InlineSendingAgent(ILogger logger, ISender sender, Endpoint endpoint, IMessageTracker messageLogger, @@ -20,16 +20,18 @@ public InlineSendingAgent(ILogger logger, ISender sender, Endpoint endpoint, IMe _settings = settings; Endpoint = endpoint; - if (endpoint.TelemetryEnabled) + if (settings.UseSyncRetryBlock) { - _sending = new RetryBlock(sendWithTracing, logger, _settings.Cancellation); + _sending = new RetryBlockSync(RetryHandlerResolver(endpoint), logger, _settings.Cancellation); } else { - _sending = new RetryBlock(sendWithOutTracing, logger, _settings.Cancellation); + _sending = new RetryBlock(RetryHandlerResolver(endpoint), logger, _settings.Cancellation); } } + private Func RetryHandlerResolver(Endpoint endpoint) => endpoint.TelemetryEnabled ? sendWithTracing : sendWithOutTracing; + public ISender Sender { get; } private async Task sendWithTracing(Envelope e, CancellationToken cancellationToken) @@ -37,6 +39,7 @@ private async Task sendWithTracing(Envelope e, CancellationToken cancellationTok using var activity = WolverineTracing.StartSending(e); try { + //TODO: What about cancellationToken?? await Sender.SendAsync(e); _messageLogger.Sent(e); } @@ -58,7 +61,7 @@ private async Task sendWithOutTracing(Envelope e, CancellationToken cancellation public void Dispose() { - _sending.Dispose(); + (_sending as IDisposable)?.Dispose(); } public Uri Destination => Sender.Destination; diff --git a/src/Wolverine/Util/Dataflow/IRetryBlock.cs b/src/Wolverine/Util/Dataflow/IRetryBlock.cs new file mode 100644 index 000000000..364b1c3b5 --- /dev/null +++ b/src/Wolverine/Util/Dataflow/IRetryBlock.cs @@ -0,0 +1,15 @@ +namespace Wolverine.Util.Dataflow; + +/// +/// Abstract a way we can retry on message +/// +/// +internal interface IRetryBlock +{ + /// + /// Send message in async manner + /// + /// + /// + Task PostAsync(T message); +} diff --git a/src/Wolverine/Util/Dataflow/RetryBlock.cs b/src/Wolverine/Util/Dataflow/RetryBlock.cs index c34cf46d2..9dfff772e 100644 --- a/src/Wolverine/Util/Dataflow/RetryBlock.cs +++ b/src/Wolverine/Util/Dataflow/RetryBlock.cs @@ -1,6 +1,7 @@ -using System.Threading.Tasks.Dataflow; +using JasperFx.Blocks; using JasperFx.Core; using Microsoft.Extensions.Logging; +using System.Threading.Tasks.Dataflow; namespace Wolverine.Util.Dataflow; @@ -24,7 +25,7 @@ public Task ExecuteAsync(T message, CancellationToken cancellation) } } -public class RetryBlock : IDisposable +public class RetryBlock : IRetryBlock, IDisposable { private readonly ActionBlock _block; private readonly CancellationToken _cancellationToken; diff --git a/src/Wolverine/Util/Dataflow/RetryBlockSync.cs b/src/Wolverine/Util/Dataflow/RetryBlockSync.cs new file mode 100644 index 000000000..4ec06e81c --- /dev/null +++ b/src/Wolverine/Util/Dataflow/RetryBlockSync.cs @@ -0,0 +1,53 @@ +using JasperFx.Core; +using Microsoft.Extensions.Logging; + +namespace Wolverine.Util.Dataflow; + +/// +/// Synchrous version of RetryBlock - this version is retrying on the same thread context so execution os sending message is waiting for information that message was sent to the broker or there was a problem +/// +/// +public class RetryBlockSync(Func handler, ILogger logger, CancellationToken cancellationToken) : IRetryBlock +{ + private readonly CancellationToken _cancellationToken = cancellationToken; + private readonly Func _handler = handler; + private readonly ILogger _logger = logger; + public TimeSpan[] Pauses { get; set; } = [0.Milliseconds(), 50.Milliseconds(), 100.Milliseconds(), 250.Milliseconds()]; + + public async Task PostAsync(T message) + { + if (_cancellationToken.IsCancellationRequested) return; + + for (var attempt = 1; attempt <= Pauses.Length; attempt++) + { + var delay = Pauses[attempt-1]; + try + { + if (attempt > 1 && delay.TotalMilliseconds > 0) + { + await Task.Delay(delay, _cancellationToken).ConfigureAwait(false); + } + + await _handler(message, _cancellationToken).ConfigureAwait(false); + _logger.LogDebug("Completed {Item}", message); + } + catch (OperationCanceledException) + { + _logger.LogWarning("Operation canceled for message {Message} on attempt {Attempts}", message, attempt); + throw; + } + catch (Exception e) + { + if (attempt < Pauses.Length) + { + _logger.LogInformation(e, "Retrying message {Message} after {Attempts} attempts", message, attempt); + } + else + { + _logger.LogError(e, "Error proccessing message {Message} on attempt {Attempts}", message, attempt); + throw; + } + } + } + } +} \ No newline at end of file