Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions src/Wolverine/DurabilitySettings.cs
Original file line number Diff line number Diff line change
Expand Up @@ -201,6 +201,12 @@ internal set
/// </summary>
public TimeSpan NodeEventRecordExpirationTime { get; set; } = 5.Days();

/// <summary>
/// When this option is enabled retry block used in InlineSendingAgent will synchronously wait on sending task to assure the message is send.
/// When set to <see langword="false"/> default behavior is used so InlineSendingAgent agent will try to send a message and when failed it will give control to caller and retry on other thread in async manner
/// </summary>
public bool UseSyncRetryBlock { get; set; }
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Option to drive sending agent


/// <summary>
/// Get or set the logical Wolverine service name. By default, this is
/// derived from the name of a custom WolverineOptions
Expand Down
13 changes: 8 additions & 5 deletions src/Wolverine/Transports/Sending/InlineSendingAgent.cs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ namespace Wolverine.Transports.Sending;
public class InlineSendingAgent : ISendingAgent, IDisposable
{
private readonly IMessageTracker _messageLogger;
private readonly RetryBlock<Envelope> _sending;
private readonly IRetryBlock<Envelope> _sending;
private readonly DurabilitySettings _settings;

public InlineSendingAgent(ILogger logger, ISender sender, Endpoint endpoint, IMessageTracker messageLogger,
Expand All @@ -20,23 +20,26 @@ public InlineSendingAgent(ILogger logger, ISender sender, Endpoint endpoint, IMe
_settings = settings;
Endpoint = endpoint;

if (endpoint.TelemetryEnabled)
if (settings.UseSyncRetryBlock)
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We use optional property to switch to different behavior so no breaking change is added

{
_sending = new RetryBlock<Envelope>(sendWithTracing, logger, _settings.Cancellation);
_sending = new RetryBlockSync<Envelope>(RetryHandlerResolver(endpoint), logger, _settings.Cancellation);
}
else
{
_sending = new RetryBlock<Envelope>(sendWithOutTracing, logger, _settings.Cancellation);
_sending = new RetryBlock<Envelope>(RetryHandlerResolver(endpoint), logger, _settings.Cancellation);
}
}

private Func<Envelope, CancellationToken, Task> RetryHandlerResolver(Endpoint endpoint) => endpoint.TelemetryEnabled ? sendWithTracing : sendWithOutTracing;

public ISender Sender { get; }

private async Task sendWithTracing(Envelope e, CancellationToken cancellationToken)
{
using var activity = WolverineTracing.StartSending(e);
try
{
//TODO: What about cancellationToken??
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

cancellationToken is not passed to Sender.SendAsync - I think it should be

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Didn't make sense before when it was truly asynchronous

await Sender.SendAsync(e);
_messageLogger.Sent(e);
}
Expand All @@ -58,7 +61,7 @@ private async Task sendWithOutTracing(Envelope e, CancellationToken cancellation

public void Dispose()
{
_sending.Dispose();
(_sending as IDisposable)?.Dispose();
}

public Uri Destination => Sender.Destination;
Expand Down
15 changes: 15 additions & 0 deletions src/Wolverine/Util/Dataflow/IRetryBlock.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
namespace Wolverine.Util.Dataflow;

/// <summary>
/// Abstract a way we can retry on <typeparamref name="T"/> message
/// </summary>
/// <typeparam name="T"></typeparam>
internal interface IRetryBlock<T>
{
/// <summary>
/// Send <typeparamref name="T"/> message in async manner
/// </summary>
/// <param name="message"></param>
/// <returns></returns>
Task PostAsync(T message);
}
5 changes: 3 additions & 2 deletions src/Wolverine/Util/Dataflow/RetryBlock.cs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
using System.Threading.Tasks.Dataflow;
using JasperFx.Blocks;
using JasperFx.Core;
using Microsoft.Extensions.Logging;
using System.Threading.Tasks.Dataflow;

namespace Wolverine.Util.Dataflow;

Expand All @@ -24,7 +25,7 @@ public Task ExecuteAsync(T message, CancellationToken cancellation)
}
}

public class RetryBlock<T> : IDisposable
public class RetryBlock<T> : IRetryBlock<T>, IDisposable
{
private readonly ActionBlock<Item> _block;
private readonly CancellationToken _cancellationToken;
Expand Down
53 changes: 53 additions & 0 deletions src/Wolverine/Util/Dataflow/RetryBlockSync.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
using JasperFx.Core;
using Microsoft.Extensions.Logging;

namespace Wolverine.Util.Dataflow;

/// <summary>
/// Synchrous version of RetryBlock - this version is retrying on the same thread context so execution os sending message is waiting for information that message was sent to the broker or there was a problem
/// </summary>
/// <typeparam name="T"></typeparam>
public class RetryBlockSync<T>(Func<T, CancellationToken, Task> handler, ILogger logger, CancellationToken cancellationToken) : IRetryBlock<T>
{
private readonly CancellationToken _cancellationToken = cancellationToken;
private readonly Func<T, CancellationToken, Task> _handler = handler;
private readonly ILogger _logger = logger;
public TimeSpan[] Pauses { get; set; } = [0.Milliseconds(), 50.Milliseconds(), 100.Milliseconds(), 250.Milliseconds()];
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe move this Pauses to DurabilitySettings so retry logic could be configurable?


public async Task PostAsync(T message)
{
if (_cancellationToken.IsCancellationRequested) return;

for (var attempt = 1; attempt <= Pauses.Length; attempt++)
{
var delay = Pauses[attempt-1];
try
{
if (attempt > 1 && delay.TotalMilliseconds > 0)
{
await Task.Delay(delay, _cancellationToken).ConfigureAwait(false);
}

await _handler(message, _cancellationToken).ConfigureAwait(false);
_logger.LogDebug("Completed {Item}", message);
}
catch (OperationCanceledException)
{
_logger.LogWarning("Operation canceled for message {Message} on attempt {Attempts}", message, attempt);
throw;
}
catch (Exception e)
{
if (attempt < Pauses.Length)
{
_logger.LogInformation(e, "Retrying message {Message} after {Attempts} attempts", message, attempt);
}
else
{
_logger.LogError(e, "Error proccessing message {Message} on attempt {Attempts}", message, attempt);
throw;
}
}
}
}
}
Loading