diff --git a/src/Mocha/examples/ExceptionPolicies/ExceptionPolicies.csproj b/src/Mocha/examples/ExceptionPolicies/ExceptionPolicies.csproj new file mode 100644 index 00000000000..5719a995f08 --- /dev/null +++ b/src/Mocha/examples/ExceptionPolicies/ExceptionPolicies.csproj @@ -0,0 +1,13 @@ + + + Exe + net10.0 + enable + + + + + + + + diff --git a/src/Mocha/examples/ExceptionPolicies/Exceptions/Exceptions.cs b/src/Mocha/examples/ExceptionPolicies/Exceptions/Exceptions.cs new file mode 100644 index 00000000000..4154a60a0ee --- /dev/null +++ b/src/Mocha/examples/ExceptionPolicies/Exceptions/Exceptions.cs @@ -0,0 +1,44 @@ +namespace ExceptionPolicies.Exceptions; + +/// +/// Transient database failure — worth retrying because it usually resolves quickly. +/// +public class TransientDatabaseException(string message) : Exception(message); + +/// +/// The message payload is malformed — retrying will never help. +/// +public class MessageValidationException(string message) : Exception(message); + +/// +/// The message was already processed — expected in at-least-once delivery. +/// +public class DuplicateMessageException(string message) : Exception(message); + +/// +/// Payment gateway returned an error — flaky but usually recovers. +/// +public class PaymentGatewayException(string message) : Exception(message); + +/// +/// Auth token expired — immediate retry is pointless, need to wait for refresh. +/// +public class AuthTokenExpiredException(string message) : Exception(message); + +/// +/// External service is completely unavailable — needs time to recover. +/// +public class ExternalServiceUnavailableException(string message) : Exception(message); + +/// +/// HTTP-level failure with a status code for conditional policy matching. +/// +public class HttpServiceException(string message, int statusCode) : Exception(message) +{ + public int StatusCode { get; } = statusCode; +} + +/// +/// Corrupt or unparseable message payload — a poison message. 
+/// +public class PoisonMessageException(string message) : Exception(message); diff --git a/src/Mocha/examples/ExceptionPolicies/Handlers/Handlers.cs b/src/Mocha/examples/ExceptionPolicies/Handlers/Handlers.cs new file mode 100644 index 00000000000..7b48de3e008 --- /dev/null +++ b/src/Mocha/examples/ExceptionPolicies/Handlers/Handlers.cs @@ -0,0 +1,148 @@ +using ExceptionPolicies.Exceptions; +using ExceptionPolicies.Messages; +using Mocha; + +namespace ExceptionPolicies.Handlers; + +/// +/// Simulates a flaky payment gateway that succeeds after 3 failures. +/// Policy: Retry 5x with exponential backoff. +/// +public class ProcessPaymentHandler : IEventHandler +{ + private static int _attempts; + + public ValueTask HandleAsync(ProcessPayment message, CancellationToken cancellationToken) + { + var attempt = Interlocked.Increment(ref _attempts); + Console.WriteLine($"[Payment] Attempt {attempt} for order {message.OrderId}"); + + if (attempt <= 3) + { + throw new PaymentGatewayException("Gateway timeout"); + } + + Console.WriteLine($"[Payment] Successfully processed order {message.OrderId}"); + return ValueTask.CompletedTask; + } +} + +/// +/// Receives a message with an invalid payload. +/// Policy: DeadLetter immediately — the message is permanently bad. +/// +public class ValidateOrderHandler : IEventHandler +{ + public ValueTask HandleAsync(ValidateOrder message, CancellationToken cancellationToken) + { + Console.WriteLine($"[Validate] Validating order {message.OrderId}"); + throw new MessageValidationException($"Order {message.OrderId} has invalid schema"); + } +} + +/// +/// Detects a duplicate message that was already processed. +/// Policy: Discard silently — no retry, no dead-letter. 
+/// +public class DeduplicateMessageHandler : IEventHandler +{ + public ValueTask HandleAsync(DeduplicateMessage message, CancellationToken cancellationToken) + { + Console.WriteLine($"[Dedup] Message {message.MessageId} already processed"); + throw new DuplicateMessageException($"Message {message.MessageId} is a duplicate"); + } +} + +/// +/// Calls an external API that is completely down. +/// Policy: Retry 5x aggressively, then redeliver with increasing intervals, then dead-letter. +/// +public class CallExternalApiHandler : IEventHandler +{ + private static int _attempts; + + public ValueTask HandleAsync(CallExternalApi message, CancellationToken cancellationToken) + { + var attempt = Interlocked.Increment(ref _attempts); + Console.WriteLine($"[ExternalApi] Attempt {attempt} for {message.Url}"); + throw new ExternalServiceUnavailableException($"Service at {message.Url} is down"); + } +} + +/// +/// Service with an expired auth token. +/// Policy: Redeliver only (skip retry) — immediate retry won't help. +/// +public class RefreshAuthTokenHandler : IEventHandler +{ + private static int _attempts; + + public ValueTask HandleAsync(RefreshAuthToken message, CancellationToken cancellationToken) + { + var attempt = Interlocked.Increment(ref _attempts); + Console.WriteLine($"[Auth] Attempt {attempt} for service {message.Service}"); + throw new AuthTokenExpiredException($"Token for {message.Service} expired"); + } +} + +/// +/// Transient database failure during batch processing. +/// Policy: Retry 3x quickly, then escalate to redelivery. 
+/// +public class ProcessBatchHandler : IEventHandler +{ + private static int _attempts; + + public ValueTask HandleAsync(ProcessBatch message, CancellationToken cancellationToken) + { + var attempt = Interlocked.Increment(ref _attempts); + Console.WriteLine($"[Batch] Attempt {attempt} for batch {message.BatchId}"); + + if (attempt <= 4) + { + throw new TransientDatabaseException("Connection pool exhausted"); + } + + Console.WriteLine($"[Batch] Successfully processed batch {message.BatchId}"); + return ValueTask.CompletedTask; + } +} + +/// +/// Ingests telemetry data from a device, encountering various HTTP errors. +/// Policy: Conditional — different behavior for 404, 429, and 503 status codes. +/// +public class IngestTelemetryHandler : IEventHandler +{ + private static int _attempts; + + public ValueTask HandleAsync(IngestTelemetry message, CancellationToken cancellationToken) + { + var attempt = Interlocked.Increment(ref _attempts); + Console.WriteLine($"[Telemetry] Attempt {attempt} for device {message.DeviceId}"); + + // Rotate through different HTTP errors to demonstrate conditional policies + throw (attempt % 3) switch + { + 0 => new HttpServiceException("Not Found", 404), + 1 => new HttpServiceException("Too Many Requests", 429), + _ => new HttpServiceException("Service Unavailable", 503) + }; + } +} + +/// +/// Handles a corrupt/unparseable message. +/// Policy: Retry once (in case of transient parse issue), then dead-letter immediately. 
+/// +public class HandlePoisonMessageHandler : IEventHandler +{ + private static int _attempts; + + public ValueTask HandleAsync(HandlePoisonMessage message, CancellationToken cancellationToken) + { + var attempt = Interlocked.Increment(ref _attempts); + Console.WriteLine($"[Poison] Attempt {attempt} for data: {message.Data}"); + throw new PoisonMessageException("Payload cannot be deserialized"); + } +} diff --git a/src/Mocha/examples/ExceptionPolicies/Messages/Messages.cs b/src/Mocha/examples/ExceptionPolicies/Messages/Messages.cs new file mode 100644 index 00000000000..be0f9243ac0 --- /dev/null +++ b/src/Mocha/examples/ExceptionPolicies/Messages/Messages.cs @@ -0,0 +1,17 @@ +namespace ExceptionPolicies.Messages; + +public record ProcessPayment(string OrderId, decimal Amount); + +public record ValidateOrder(string OrderId); + +public record DeduplicateMessage(string MessageId); + +public record CallExternalApi(string Url); + +public record RefreshAuthToken(string Service); + +public record ProcessBatch(string BatchId); + +public record IngestTelemetry(string DeviceId); + +public record HandlePoisonMessage(string Data); diff --git a/src/Mocha/examples/ExceptionPolicies/Program.cs b/src/Mocha/examples/ExceptionPolicies/Program.cs new file mode 100644 index 00000000000..27892d97516 --- /dev/null +++ b/src/Mocha/examples/ExceptionPolicies/Program.cs @@ -0,0 +1,125 @@ +using ExceptionPolicies.Exceptions; +using ExceptionPolicies.Handlers; +using Microsoft.Extensions.DependencyInjection; +using Microsoft.Extensions.Hosting; +using Mocha; +using Mocha.Transport.InMemory; + +// --------------------------------------------------------------------------- +// Exception Policies Demo +// +// Demonstrates all per-exception policy configurations available in Mocha. +// Uses the InMemory transport for simplicity — no external dependencies. 
+// --------------------------------------------------------------------------- + +var builder = Host.CreateApplicationBuilder(args); + +builder.Services + .AddMessageBus() + + // ----------------------------------------------------------------------- + // Exception Policies — the main showcase + // + // Per-exception rules are configured in a single AddExceptionPolicy call. + // The On() catch-all provides global retry/redelivery defaults. + // ----------------------------------------------------------------------- + .AddExceptionPolicy(policy => + { + // --- Terminal: DeadLetter --- + // Validation errors are permanent — the message payload is bad. + // Skip retry and redelivery entirely; route straight to the error endpoint. + policy.On().DeadLetter(); + + // --- Terminal: Discard --- + // Duplicate messages are expected in at-least-once delivery systems. + // Silently drop them — no retry, no redelivery, no error endpoint. + policy.On().Discard(); + + // --- Retry only (skip redelivery) --- + // Payment gateway is flaky but usually recovers within a few attempts. + // Retry 5 times with exponential backoff, then dead-letter on exhaustion. + policy.On() + .Retry( + attempts: 5, + delay: TimeSpan.FromMilliseconds(200), + backoff: RetryBackoffType.Exponential); + + // --- Redeliver only (skip retry) --- + // Auth token expired — immediate retry is pointless because the token + // won't refresh in milliseconds. Wait for redelivery instead. + policy.On().Redeliver(); + + // --- Escalation: Retry then Redeliver --- + // Transient DB errors — try a few times quickly (connection hiccup), + // then back off with redelivery if the database is truly struggling. + policy.On() + .Retry(attempts: 3) + .ThenRedeliver(); + + // --- Escalation: Retry then DeadLetter (skip redelivery) --- + // Poison messages — try once in case it was a transient parse glitch, + // then give up immediately. Redelivery won't fix a corrupt payload. 
+ policy.On() + .Retry(attempts: 1) + .ThenDeadLetter(); + + // --- Full chain: Retry -> Redeliver -> DeadLetter --- + // External service completely down — aggressive retry first, then + // patient redelivery with increasing intervals, then dead-letter + // as the last resort so operators can investigate. + policy.On() + .Retry(attempts: 5, delay: TimeSpan.FromMilliseconds(500)) + .ThenRedeliver( + [ + TimeSpan.FromSeconds(10), + TimeSpan.FromSeconds(30), + TimeSpan.FromSeconds(60) + ]) + .ThenDeadLetter(); + + // --- Conditional: Different policies for the same exception type --- + // HTTP 404 = resource is gone permanently, dead-letter it. + policy.On(ex => ex.StatusCode == 404) + .DeadLetter(); + + // HTTP 429 = rate limited, back off with redelivery. + policy.On(ex => ex.StatusCode == 429) + .Redeliver( + [ + TimeSpan.FromSeconds(5), + TimeSpan.FromSeconds(15), + TimeSpan.FromSeconds(30) + ]); + + // HTTP 503 = service unavailable, retry quickly then redeliver. + policy.On(ex => ex.StatusCode == 503) + .Retry(attempts: 3) + .ThenRedeliver(); + + // --- Catch-all: Default for unmatched exceptions --- + // Most-specific-type-wins means this only fires for exceptions + // not matched by any of the rules above. 
+ policy.On() + .Retry(attempts: 2) + .ThenRedeliver(); + }) + + // ----------------------------------------------------------------------- + // Register handlers + // ----------------------------------------------------------------------- + .AddEventHandler() + .AddEventHandler() + .AddEventHandler() + .AddEventHandler() + .AddEventHandler() + .AddEventHandler() + .AddEventHandler() + .AddEventHandler() + + // ----------------------------------------------------------------------- + // Transport — InMemory for this demo (no external dependencies) + // ----------------------------------------------------------------------- + .AddInMemory(); + +var app = builder.Build(); +await app.RunAsync(); diff --git a/src/Mocha/src/Demo/Demo.Billing/Program.cs b/src/Mocha/src/Demo/Demo.Billing/Program.cs index 98757e06b61..50959616b33 100644 --- a/src/Mocha/src/Demo/Demo.Billing/Program.cs +++ b/src/Mocha/src/Demo/Demo.Billing/Program.cs @@ -31,6 +31,7 @@ builder .Services.AddMessageBus() .AddInstrumentation() + .AddResilience() .AddBilling() .AddBatchHandler(opts => { diff --git a/src/Mocha/src/Demo/Demo.Catalog/Program.cs b/src/Mocha/src/Demo/Demo.Catalog/Program.cs index f6c2583b767..ed8e6181c62 100644 --- a/src/Mocha/src/Demo/Demo.Catalog/Program.cs +++ b/src/Mocha/src/Demo/Demo.Catalog/Program.cs @@ -31,6 +31,7 @@ builder .Services.AddMessageBus() .AddInstrumentation() + .AddResilience() .AddCatalog() .AddEntityFramework(p => { diff --git a/src/Mocha/src/Demo/Demo.Shipping/Program.cs b/src/Mocha/src/Demo/Demo.Shipping/Program.cs index eb08a0c7865..0d136e3976d 100644 --- a/src/Mocha/src/Demo/Demo.Shipping/Program.cs +++ b/src/Mocha/src/Demo/Demo.Shipping/Program.cs @@ -28,6 +28,7 @@ builder .Services.AddMessageBus() .AddInstrumentation() + .AddResilience() .AddShipping() .AddEntityFramework(p => { diff --git a/src/Mocha/src/Mocha/Builder/MessageBusBuilderExtensions.cs b/src/Mocha/src/Mocha/Builder/MessageBusBuilderExtensions.cs index efb387b97db..3fe7171fa2d 100644 --- 
a/src/Mocha/src/Mocha/Builder/MessageBusBuilderExtensions.cs +++ b/src/Mocha/src/Mocha/Builder/MessageBusBuilderExtensions.cs @@ -30,6 +30,7 @@ public static IMessageBusBuilder ConfigureFeature( internal static void AddDefaults(this MessageBusBuilder builder) { + builder.UseConsume(ConsumerMiddlewares.Retry, before: "Instrumentation"); builder.UseConsume(ConsumerMiddlewares.Instrumentation); builder.UseReceive(ReceiveMiddlewares.TransportCircuitBreaker); @@ -37,6 +38,7 @@ internal static void AddDefaults(this MessageBusBuilder builder) builder.UseReceive(ReceiveMiddlewares.Instrumentation); builder.UseReceive(ReceiveMiddlewares.DeadLetter); builder.UseReceive(ReceiveMiddlewares.Fault); + builder.UseReceive(ReceiveMiddlewares.Redelivery, after: "Fault"); builder.UseReceive(ReceiveMiddlewares.CircuitBreaker); builder.UseReceive(ReceiveMiddlewares.Expiry); builder.UseReceive(ReceiveMiddlewares.MessageTypeSelection); diff --git a/src/Mocha/src/Mocha/Extensions/IMessageBusHostBuilderExtensions.cs b/src/Mocha/src/Mocha/Extensions/IMessageBusHostBuilderExtensions.cs index 62045a64ad4..e282d6ba9e9 100644 --- a/src/Mocha/src/Mocha/Extensions/IMessageBusHostBuilderExtensions.cs +++ b/src/Mocha/src/Mocha/Extensions/IMessageBusHostBuilderExtensions.cs @@ -29,6 +29,26 @@ public static IMessageBusHostBuilder AddEventHandler< return builder; } + /// + /// Registers an event handler with the message bus and adds it to the service collection, + /// with additional consumer configuration. + /// + /// The event handler type. + /// The host builder. + /// The action to configure the consumer descriptor. + /// The builder for method chaining. 
+ public static IMessageBusHostBuilder AddEventHandler< + [DynamicallyAccessedMembers(DynamicallyAccessedMemberTypes.PublicConstructors)] THandler>( + this IMessageBusHostBuilder builder, + Action configure) + where THandler : class, IEventHandler + { + builder.Services.TryAddScoped(); + builder.ConfigureMessageBus(h => h.AddHandler(configure)); + + return builder; + } + /// /// Registers a handler using pre-built configuration from the source generator. /// @@ -94,6 +114,25 @@ public static IMessageBusHostBuilder AddRequestHandler(this IMessageBu return builder; } + /// + /// Registers a request handler with the message bus and adds it to the service collection, + /// with additional consumer configuration. + /// + /// The request handler type. + /// The host builder. + /// The action to configure the consumer descriptor. + /// The builder for method chaining. + public static IMessageBusHostBuilder AddRequestHandler( + this IMessageBusHostBuilder builder, + Action configure) + where THandler : class, IEventRequestHandler + { + builder.Services.TryAddScoped(); + builder.ConfigureMessageBus(h => h.AddHandler(configure)); + + return builder; + } + /// /// Registers a consumer with the message bus and adds it to the service collection. /// @@ -111,6 +150,26 @@ public static IMessageBusHostBuilder AddConsumer< return builder; } + /// + /// Registers a consumer with the message bus and adds it to the service collection, + /// with additional consumer configuration. + /// + /// The consumer type implementing . + /// The host builder. + /// The action to configure the consumer descriptor. + /// The builder for method chaining. 
+ public static IMessageBusHostBuilder AddConsumer< + [DynamicallyAccessedMembers(DynamicallyAccessedMemberTypes.PublicConstructors)] TConsumer>( + this IMessageBusHostBuilder builder, + Action configure) + where TConsumer : class, IConsumer + { + builder.Services.TryAddScoped(); + builder.ConfigureMessageBus(h => h.AddHandler(configure)); + + return builder; + } + /// /// Registers additional services into the internal service collection used by the message bus. /// This is the message bus equivalent of Hot Chocolate's ConfigureSchemaServices. diff --git a/src/Mocha/src/Mocha/Headers/MessageHeaders.cs b/src/Mocha/src/Mocha/Headers/MessageHeaders.cs index 6b22af15f64..ce37e775144 100644 --- a/src/Mocha/src/Mocha/Headers/MessageHeaders.cs +++ b/src/Mocha/src/Mocha/Headers/MessageHeaders.cs @@ -20,6 +20,17 @@ internal static class MessageHeaders /// public static readonly ContextDataKey MessageKind = new("message-kind"); + /// + /// Defines header keys used by the retry infrastructure. + /// + public static class Retry + { + /// + /// The header key for tracking the number of delayed redelivery attempts. + /// + public static readonly ContextDataKey DelayedRetryCount = new("delayed-retry-count"); + } + /// /// Defines header keys for fault information attached to messages that failed processing. /// diff --git a/src/Mocha/src/Mocha/Middlewares/Consume/ConsumerMiddlewares.cs b/src/Mocha/src/Mocha/Middlewares/Consume/ConsumerMiddlewares.cs index 01dda0cc482..de1e4af845b 100644 --- a/src/Mocha/src/Mocha/Middlewares/Consume/ConsumerMiddlewares.cs +++ b/src/Mocha/src/Mocha/Middlewares/Consume/ConsumerMiddlewares.cs @@ -7,6 +7,11 @@ namespace Mocha; /// public static class ConsumerMiddlewares { + /// + /// The retry middleware configuration that retries failed handler invocations with configurable backoff. 
+ /// + public static readonly ConsumerMiddlewareConfiguration Retry = ConsumerRetryMiddleware.Create(); + /// /// The instrumentation middleware configuration that emits telemetry for consumer operations. /// diff --git a/src/Mocha/src/Mocha/Middlewares/Consume/Retry/ConsumerRetryMiddleware.cs b/src/Mocha/src/Mocha/Middlewares/Consume/Retry/ConsumerRetryMiddleware.cs new file mode 100644 index 00000000000..5a55bf4ac41 --- /dev/null +++ b/src/Mocha/src/Mocha/Middlewares/Consume/Retry/ConsumerRetryMiddleware.cs @@ -0,0 +1,71 @@ +using System.Collections.Immutable; +using Microsoft.Extensions.DependencyInjection; +using Mocha.Features; + +namespace Mocha; + +/// +/// A consumer middleware that implements in-process retry with configurable backoff strategies +/// when transient failures occur. +/// +internal sealed class ConsumerRetryMiddleware(ImmutableArray exceptionPolicyRules) +{ + public async ValueTask InvokeAsync(IConsumeContext context, ConsumerDelegate next) + { + // Read delayed retry count from headers (set by redelivery middleware). + var delayedRetryCount = 0; + + if (context.Headers.TryGetValue(MessageHeaders.Retry.DelayedRetryCount.Key, out var headerValue)) + { + delayedRetryCount = RedeliveryExecutor.ParseDelayedRetryCount(headerValue); + } + + // Expose retry state to handlers via features. + var retryState = context.Features.GetOrSet(); + retryState.DelayedRetryCount = delayedRetryCount; + retryState.ImmediateRetryCount = 0; + + await RetryExecutor.ExecuteAsync( + exceptionPolicyRules, + (next, context, retryState), + static (s) => s.next(s.context), + static (s, attempts) => s.retryState.ImmediateRetryCount = attempts, + context.CancellationToken); + } + + public static ConsumerMiddlewareConfiguration Create() + => new( + static (context, next) => + { + var feature = context.GetExceptionPolicyFeature(); + + if (feature is null) + { + // No exception policy configured - skip retry middleware entirely. 
+ return next; + } + + var middleware = new ConsumerRetryMiddleware(feature.Rules.ToImmutableArray()); + + return ctx => middleware.InvokeAsync(ctx, next); + }, + "Retry"); +} + +file static class Extensions +{ + /// + /// Resolves the bus-level exception policy feature, if configured. + /// + public static ExceptionPolicyFeature? GetExceptionPolicyFeature(this ConsumerMiddlewareFactoryContext context) + { + var busFeatures = context.Services.GetRequiredService(); + + if (busFeatures.TryGet(out ExceptionPolicyFeature? busFeature)) + { + return busFeature; + } + + return null; + } +} diff --git a/src/Mocha/src/Mocha/Middlewares/Consume/Retry/ExceptionPolicyBuilder.cs b/src/Mocha/src/Mocha/Middlewares/Consume/Retry/ExceptionPolicyBuilder.cs new file mode 100644 index 00000000000..902c0bda118 --- /dev/null +++ b/src/Mocha/src/Mocha/Middlewares/Consume/Retry/ExceptionPolicyBuilder.cs @@ -0,0 +1,194 @@ +namespace Mocha; + +internal sealed class ExceptionPolicyBuilder(ExceptionPolicyRule rule, List rules) + : IExceptionPolicyBuilder + , IAfterRetryBuilder + , IAfterRedeliveryBuilder where TException : Exception +{ + private bool _committed; + + public void Discard() + { + EnsureCommitted(); + + rule.Terminal = TerminalAction.Discard; + } + + public void DeadLetter() + { + EnsureCommitted(); + + rule.Terminal = TerminalAction.DeadLetter; + } + + public IAfterRetryBuilder Retry() + { + EnsureCommitted(); + + rule.Retry = new RetryPolicyConfig + { + Attempts = RetryPolicyDefaults.Attempts, + Delay = RetryPolicyDefaults.Delay, + Backoff = RetryPolicyDefaults.Backoff, + UseJitter = RetryPolicyDefaults.UseJitter, + MaxDelay = RetryPolicyDefaults.MaxDelay + }; + rule.Redelivery = new RedeliveryPolicyConfig { Enabled = false }; + return this; + } + + public IAfterRetryBuilder Retry(int attempts) + { + EnsureCommitted(); + + rule.Retry = new RetryPolicyConfig + { + Attempts = attempts, + Delay = RetryPolicyDefaults.Delay, + Backoff = RetryPolicyDefaults.Backoff, + UseJitter = 
RetryPolicyDefaults.UseJitter, + MaxDelay = RetryPolicyDefaults.MaxDelay + }; + rule.Redelivery = new RedeliveryPolicyConfig { Enabled = false }; + + return this; + } + + public IAfterRetryBuilder Retry( + int attempts, + TimeSpan delay, + RetryBackoffType backoff = RetryBackoffType.Exponential) + { + EnsureCommitted(); + + rule.Retry = new RetryPolicyConfig + { + Attempts = attempts, + Delay = delay, + Backoff = backoff, + UseJitter = RetryPolicyDefaults.UseJitter, + MaxDelay = RetryPolicyDefaults.MaxDelay + }; + + rule.Redelivery = new RedeliveryPolicyConfig { Enabled = false }; + + return this; + } + + public IAfterRetryBuilder Retry(TimeSpan[] intervals) + { + EnsureCommitted(); + + rule.Retry = new RetryPolicyConfig { Intervals = [.. intervals], Attempts = intervals.Length }; + rule.Redelivery = new RedeliveryPolicyConfig { Enabled = false }; + + return this; + } + + public IAfterRedeliveryBuilder Redeliver() + { + EnsureCommitted(); + + rule.Retry = new RetryPolicyConfig { Enabled = false }; + rule.Redelivery = new RedeliveryPolicyConfig + { + Intervals = RedeliveryPolicyDefaults.Intervals, + Attempts = RedeliveryPolicyDefaults.Intervals.Length, + UseJitter = RedeliveryPolicyDefaults.UseJitter, + MaxDelay = RedeliveryPolicyDefaults.MaxDelay + }; + + return this; + } + + public IAfterRedeliveryBuilder Redeliver(int attempts, TimeSpan baseDelay) + { + EnsureCommitted(); + + rule.Retry = new RetryPolicyConfig { Enabled = false }; + rule.Redelivery = new RedeliveryPolicyConfig + { + Attempts = attempts, + BaseDelay = baseDelay, + UseJitter = RedeliveryPolicyDefaults.UseJitter, + MaxDelay = RedeliveryPolicyDefaults.MaxDelay + }; + + return this; + } + + public IAfterRedeliveryBuilder Redeliver(TimeSpan[] intervals) + { + EnsureCommitted(); + + rule.Retry = new RetryPolicyConfig { Enabled = false }; + rule.Redelivery = new RedeliveryPolicyConfig + { + Intervals = [.. 
intervals], + Attempts = intervals.Length, + UseJitter = RedeliveryPolicyDefaults.UseJitter, + MaxDelay = RedeliveryPolicyDefaults.MaxDelay + }; + return this; + } + + public IAfterRedeliveryBuilder ThenRedeliver() + { + rule.Redelivery = new RedeliveryPolicyConfig + { + Intervals = RedeliveryPolicyDefaults.Intervals, + Attempts = RedeliveryPolicyDefaults.Intervals.Length, + UseJitter = RedeliveryPolicyDefaults.UseJitter, + MaxDelay = RedeliveryPolicyDefaults.MaxDelay + }; + return this; + } + + public IAfterRedeliveryBuilder ThenRedeliver(int attempts, TimeSpan baseDelay) + { + rule.Redelivery = new RedeliveryPolicyConfig + { + Attempts = attempts, + BaseDelay = baseDelay, + UseJitter = RedeliveryPolicyDefaults.UseJitter, + MaxDelay = RedeliveryPolicyDefaults.MaxDelay + }; + return this; + } + + public IAfterRedeliveryBuilder ThenRedeliver(TimeSpan[] intervals) + { + rule.Redelivery = new RedeliveryPolicyConfig + { + Intervals = [.. intervals], + Attempts = intervals.Length, + UseJitter = RedeliveryPolicyDefaults.UseJitter, + MaxDelay = RedeliveryPolicyDefaults.MaxDelay + }; + return this; + } + + public void ThenDeadLetter() + { + rule.Terminal = TerminalAction.DeadLetter; + } + + private void EnsureCommitted() + { + if (!_committed) + { + // Replace any existing rule for the same exception type and predicate. 
+ for (var i = rules.Count - 1; i >= 0; i--) + { + if (rules[i].ExceptionType == rule.ExceptionType + && rules[i].Predicate == rule.Predicate) + { + rules.RemoveAt(i); + } + } + + rules.Add(rule); + _committed = true; + } + } +} diff --git a/src/Mocha/src/Mocha/Middlewares/Consume/Retry/ExceptionPolicyFeature.cs b/src/Mocha/src/Mocha/Middlewares/Consume/Retry/ExceptionPolicyFeature.cs new file mode 100644 index 00000000000..5c84cedd03e --- /dev/null +++ b/src/Mocha/src/Mocha/Middlewares/Consume/Retry/ExceptionPolicyFeature.cs @@ -0,0 +1,40 @@ +using Mocha.Features; + +namespace Mocha; + +/// +/// A feature that exposes the per-exception policy configuration. +/// +public sealed class ExceptionPolicyFeature : ISealable +{ + private readonly List _rules = []; + + /// + public bool IsReadOnly { get; private set; } + + /// + /// Gets the configured exception policy rules. + /// + public IReadOnlyList Rules => _rules; + + /// + public void Seal() + { + IsReadOnly = true; + } + + /// + /// Applies configuration to the exception policy options. + /// + /// An action that modifies the exception policy options. + /// Thrown if the feature has been sealed. + public void Configure(Action configure) + { + if (IsReadOnly) + { + throw ThrowHelper.FeatureIsReadOnly(); + } + + configure(new ExceptionPolicyOptions(_rules)); + } +} diff --git a/src/Mocha/src/Mocha/Middlewares/Consume/Retry/ExceptionPolicyMatcher.cs b/src/Mocha/src/Mocha/Middlewares/Consume/Retry/ExceptionPolicyMatcher.cs new file mode 100644 index 00000000000..38ad7b6f963 --- /dev/null +++ b/src/Mocha/src/Mocha/Middlewares/Consume/Retry/ExceptionPolicyMatcher.cs @@ -0,0 +1,54 @@ +using System.Collections.Immutable; + +namespace Mocha; + +/// +/// Evaluates exception policy rules to find the best matching rule for an exception. +/// Most-specific-type-wins: if both DbException and NpgsqlException rules exist, +/// NpgsqlException rule takes priority for NpgsqlException instances. 
+/// +internal static class ExceptionPolicyMatcher +{ + /// + /// Finds the best matching exception policy rule for the given exception. + /// + /// The list of exception policy rules to evaluate. + /// The exception to match against. + /// The best matching rule, or null if no rule matches. + public static ExceptionPolicyRule? Match(ImmutableArray rules, Exception exception) + { + ExceptionPolicyRule? bestMatch = null; + var bestDepth = int.MaxValue; + + foreach (var rule in rules) + { + if (!rule.ExceptionType.IsInstanceOfType(exception)) + { + continue; + } + + if (rule.Predicate is not null && !rule.Predicate(exception)) + { + continue; + } + + // Calculate inheritance depth: most specific type = smallest depth + var depth = 0; + var type = exception.GetType(); + + while (type != null && type != rule.ExceptionType) + { + depth++; + type = type.BaseType; + } + + if (depth < bestDepth) + { + bestDepth = depth; + bestMatch = rule; + } + } + + return bestMatch; + } +} diff --git a/src/Mocha/src/Mocha/Middlewares/Consume/Retry/ExceptionPolicyOptions.cs b/src/Mocha/src/Mocha/Middlewares/Consume/Retry/ExceptionPolicyOptions.cs new file mode 100644 index 00000000000..3674db06f89 --- /dev/null +++ b/src/Mocha/src/Mocha/Middlewares/Consume/Retry/ExceptionPolicyOptions.cs @@ -0,0 +1,45 @@ +namespace Mocha; + +/// +/// Options for configuring exception policies with per-exception rules. +/// +/// +/// Initializes a new instance of the class. +/// +/// The shared list of exception policy rules to populate. +public class ExceptionPolicyOptions(List rules) +{ + private readonly List _rules = rules; + + /// + /// Configures the default behavior for all exceptions that don't match a more specific rule. + /// Equivalent to On<Exception>(). + /// + /// A builder for configuring the default exception behavior. + public IExceptionPolicyBuilder Default() => On(null); + + /// + /// Configures behavior for a specific exception type. + /// + /// The exception type to configure. 
+ /// A builder for configuring the exception behavior. + public IExceptionPolicyBuilder On() where TException : Exception => On(null); + + /// + /// Configures behavior for a specific exception type matching a predicate. + /// + /// The exception type to configure. + /// An optional predicate to further filter the exception. + /// A builder for configuring the exception behavior. + public IExceptionPolicyBuilder On(Func? predicate) + where TException : Exception + { + Func? wrappedPredicate = predicate is not null + ? ex => ex is TException typed && predicate(typed) + : null; + + var rule = new ExceptionPolicyRule { ExceptionType = typeof(TException), Predicate = wrappedPredicate }; + + return new ExceptionPolicyBuilder(rule, _rules); + } +} diff --git a/src/Mocha/src/Mocha/Middlewares/Consume/Retry/ExceptionPolicyRule.cs b/src/Mocha/src/Mocha/Middlewares/Consume/Retry/ExceptionPolicyRule.cs new file mode 100644 index 00000000000..3e50fbff1ce --- /dev/null +++ b/src/Mocha/src/Mocha/Middlewares/Consume/Retry/ExceptionPolicyRule.cs @@ -0,0 +1,48 @@ +namespace Mocha; + +/// +/// Represents a single per-exception policy rule with optional retry, redelivery, and terminal actions. +/// +public sealed class ExceptionPolicyRule +{ + /// + /// Gets the exception type this rule applies to. + /// + public required Type ExceptionType { get; init; } + + /// + /// Gets the optional predicate to further filter the exception. + /// + public required Func? Predicate { get; init; } + + /// + /// Gets or sets the retry configuration for this exception. + /// + public RetryPolicyConfig? Retry { get; set; } + + /// + /// Gets or sets the redelivery configuration for this exception. + /// + public RedeliveryPolicyConfig? Redelivery { get; set; } + + /// + /// Gets or sets the terminal action for this exception. + /// + public TerminalAction? Terminal { get; set; } +} + +/// +/// Specifies the terminal action to take for an exception. 
+/// +public enum TerminalAction +{ + /// + /// Routes the message to the error endpoint. + /// + DeadLetter, + + /// + /// Swallows the exception; the message disappears. + /// + Discard +} diff --git a/src/Mocha/src/Mocha/Middlewares/Consume/Retry/IAfterRedeliveryBuilder.cs b/src/Mocha/src/Mocha/Middlewares/Consume/Retry/IAfterRedeliveryBuilder.cs new file mode 100644 index 00000000000..ed037b58212 --- /dev/null +++ b/src/Mocha/src/Mocha/Middlewares/Consume/Retry/IAfterRedeliveryBuilder.cs @@ -0,0 +1,13 @@ +namespace Mocha; + +/// +/// Builder for chaining actions after redelivery configuration. +/// If nothing is chained, the default behavior (dead-letter on exhaustion) applies. +/// +public interface IAfterRedeliveryBuilder +{ + /// + /// Routes the message to the error endpoint after redelivery exhaustion. + /// + void ThenDeadLetter(); +} diff --git a/src/Mocha/src/Mocha/Middlewares/Consume/Retry/IAfterRetryBuilder.cs b/src/Mocha/src/Mocha/Middlewares/Consume/Retry/IAfterRetryBuilder.cs new file mode 100644 index 00000000000..7fc4659c43a --- /dev/null +++ b/src/Mocha/src/Mocha/Middlewares/Consume/Retry/IAfterRetryBuilder.cs @@ -0,0 +1,34 @@ +namespace Mocha; + +/// +/// Builder for chaining actions after retry configuration. +/// If nothing is chained, retry-only behavior applies and redelivery is skipped. +/// +public interface IAfterRetryBuilder +{ + /// + /// Chains redelivery after retry exhaustion with default settings. + /// + /// A builder for chaining additional actions after redelivery. + IAfterRedeliveryBuilder ThenRedeliver(); + + /// + /// Chains redelivery after retry exhaustion with the specified attempts and base delay. + /// + /// The number of redelivery attempts. + /// The base delay for redelivery. + /// A builder for chaining additional actions after redelivery. + IAfterRedeliveryBuilder ThenRedeliver(int attempts, TimeSpan baseDelay); + + /// + /// Chains redelivery after retry exhaustion with explicit intervals. 
+ /// + /// The explicit intervals between redeliveries. + /// A builder for chaining additional actions after redelivery. + IAfterRedeliveryBuilder ThenRedeliver(TimeSpan[] intervals); + + /// + /// Routes the message to the error endpoint after retry exhaustion. + /// + void ThenDeadLetter(); +} diff --git a/src/Mocha/src/Mocha/Middlewares/Consume/Retry/IExceptionPolicyBuilder.cs b/src/Mocha/src/Mocha/Middlewares/Consume/Retry/IExceptionPolicyBuilder.cs new file mode 100644 index 00000000000..d20a8996a96 --- /dev/null +++ b/src/Mocha/src/Mocha/Middlewares/Consume/Retry/IExceptionPolicyBuilder.cs @@ -0,0 +1,73 @@ +namespace Mocha; + +/// +/// Fluent builder for configuring per-exception retry/redelivery behavior. +/// +/// The exception type to configure behavior for. +public interface IExceptionPolicyBuilder where TException : Exception +{ + /// + /// Discards the message when this exception occurs. The exception is swallowed. + /// + void Discard(); + + /// + /// Routes the message to the error endpoint when this exception occurs. + /// Retry and redelivery are skipped. + /// + void DeadLetter(); + + /// + /// Retries the handler invocation with default settings. + /// Redelivery is disabled unless chained with . + /// + /// A builder for chaining additional actions after retry. + IAfterRetryBuilder Retry(); + + /// + /// Retries the handler invocation with the specified number of attempts. + /// Redelivery is disabled unless chained with . + /// + /// The number of retry attempts. + /// A builder for chaining additional actions after retry. + IAfterRetryBuilder Retry(int attempts); + + /// + /// Retries the handler invocation with full configuration. + /// Redelivery is disabled unless chained with . + /// + /// The number of retry attempts. + /// The base delay between retries. + /// The backoff strategy. + /// A builder for chaining additional actions after retry. 
+ IAfterRetryBuilder Retry(int attempts, TimeSpan delay, RetryBackoffType backoff = RetryBackoffType.Exponential); + + /// + /// Retries the handler invocation with explicit intervals. + /// Redelivery is disabled unless chained with . + /// + /// The explicit intervals between retries. + /// A builder for chaining additional actions after retry. + IAfterRetryBuilder Retry(TimeSpan[] intervals); + + /// + /// Redelivers the message with default settings, skipping retry. + /// + /// A builder for chaining additional actions after redelivery. + IAfterRedeliveryBuilder Redeliver(); + + /// + /// Redelivers the message with the specified attempts and base delay, skipping retry. + /// + /// The number of redelivery attempts. + /// The base delay for redelivery. + /// A builder for chaining additional actions after redelivery. + IAfterRedeliveryBuilder Redeliver(int attempts, TimeSpan baseDelay); + + /// + /// Redelivers the message with explicit intervals, skipping retry. + /// + /// The explicit intervals between redeliveries. + /// A builder for chaining additional actions after redelivery. + IAfterRedeliveryBuilder Redeliver(TimeSpan[] intervals); +} diff --git a/src/Mocha/src/Mocha/Middlewares/Consume/Retry/RedeliveryPolicyConfig.cs b/src/Mocha/src/Mocha/Middlewares/Consume/Retry/RedeliveryPolicyConfig.cs new file mode 100644 index 00000000000..407f985bb82 --- /dev/null +++ b/src/Mocha/src/Mocha/Middlewares/Consume/Retry/RedeliveryPolicyConfig.cs @@ -0,0 +1,39 @@ +using System.Collections.Immutable; + +namespace Mocha; + +/// +/// Per-exception redelivery configuration overrides. +/// +public sealed class RedeliveryPolicyConfig +{ + /// + /// Gets whether redelivery is enabled for this exception. Defaults to true. + /// + public bool Enabled { get; init; } = true; + + /// + /// Gets the number of redelivery attempts, or null to use global defaults. + /// + public int? 
Attempts { get; init; } + + /// + /// Gets the base delay for redelivery, or null to use global defaults. + /// + public TimeSpan? BaseDelay { get; init; } + + /// + /// Gets the maximum delay cap, or null to use global defaults. + /// + public TimeSpan? MaxDelay { get; init; } + + /// + /// Gets whether jitter is enabled, or null to use global defaults. + /// + public bool? UseJitter { get; init; } + + /// + /// Gets the explicit redelivery intervals, or null to use global defaults. + /// + public ImmutableArray? Intervals { get; init; } +} diff --git a/src/Mocha/src/Mocha/Middlewares/Consume/Retry/RedeliveryPolicyDefaults.cs b/src/Mocha/src/Mocha/Middlewares/Consume/Retry/RedeliveryPolicyDefaults.cs new file mode 100644 index 00000000000..bb4df697960 --- /dev/null +++ b/src/Mocha/src/Mocha/Middlewares/Consume/Retry/RedeliveryPolicyDefaults.cs @@ -0,0 +1,29 @@ +using System.Collections.Immutable; + +namespace Mocha; + +/// +/// Provides the built-in default values for redelivery policy configuration. +/// +internal static class RedeliveryPolicyDefaults +{ + /// + /// The default redelivery intervals. Value: 5min, 15min, 30min. + /// + public static readonly ImmutableArray Intervals = + [ + TimeSpan.FromMinutes(5), + TimeSpan.FromMinutes(15), + TimeSpan.FromMinutes(30) + ]; + + /// + /// The default jitter setting. Value: true. + /// + public const bool UseJitter = true; + + /// + /// The default maximum delay cap. Value: 1 hour. 
+ /// + public static readonly TimeSpan MaxDelay = TimeSpan.FromHours(1); +} diff --git a/src/Mocha/src/Mocha/Middlewares/Consume/Retry/ResilienceConfigurationExtensions.cs b/src/Mocha/src/Mocha/Middlewares/Consume/Retry/ResilienceConfigurationExtensions.cs new file mode 100644 index 00000000000..a37f1bf3632 --- /dev/null +++ b/src/Mocha/src/Mocha/Middlewares/Consume/Retry/ResilienceConfigurationExtensions.cs @@ -0,0 +1,112 @@ +using Mocha.Features; + +namespace Mocha; + +/// +/// Provides extension methods for configuring exception policies including retry, redelivery, +/// and per-exception rules on message bus builders, host builders, and descriptors. +/// +public static class ResilienceConfigurationExtensions +{ + /// + /// Adds exception policy configuration to the message bus with default settings. + /// Registers a catch-all rule for with default retry and redelivery. + /// + /// The message bus builder. + /// The builder for method chaining. + public static IMessageBusBuilder AddResilience(this IMessageBusBuilder builder) + { + builder.ConfigureFeature(f => f.GetOrSet().Configure(p => p.AddDefaultPolicy())); + + return builder; + } + + /// + /// Adds exception policy configuration to the message bus. + /// + /// The message bus builder. + /// The action to configure exception policy options. + /// The builder for method chaining. + public static IMessageBusBuilder AddResilience( + this IMessageBusBuilder builder, + Action configure) + { + builder.ConfigureFeature(f => + { + var feature = f.GetOrSet(); + feature.Configure(p => p.AddDefaultPolicy()); + feature.Configure(configure); + }); + + return builder; + } + + /// + /// Adds exception policy configuration to the host-level message bus with default settings. + /// + /// The host builder. + /// The builder for method chaining. 
+ public static IMessageBusHostBuilder AddResilience(this IMessageBusHostBuilder builder) + { + builder.ConfigureMessageBus(x => x.AddResilience()); + return builder; + } + + /// + /// Adds exception policy configuration to the host-level message bus. + /// + /// The host builder. + /// The action to configure exception policy options. + /// The builder for method chaining. + public static IMessageBusHostBuilder AddResilience( + this IMessageBusHostBuilder builder, + Action configure) + { + builder.ConfigureMessageBus(x => x.AddResilience(configure)); + return builder; + } + + /// + /// Adds exception policy configuration to a specific descriptor (e.g., receive endpoint or transport) + /// with default settings. + /// Registers a catch-all rule for with default retry and redelivery. + /// + /// The descriptor type that supports receive middleware. + /// The descriptor to configure. + /// The descriptor for method chaining. + public static TDescriptor AddResilience(this TDescriptor descriptor) + where TDescriptor : IReceiveMiddlewareProvider + { + descriptor + .Extend() + .Configuration.Features.GetOrSet() + .Configure(p => p.AddDefaultPolicy()); + + return descriptor; + } + + /// + /// Adds exception policy configuration to a specific descriptor (e.g., receive endpoint or transport). + /// + /// The descriptor type that supports receive middleware. + /// The descriptor to configure. + /// The action to configure exception policy options. + /// The descriptor for method chaining. 
+ public static TDescriptor AddResilience( + this TDescriptor descriptor, + Action configure) + where TDescriptor : IReceiveMiddlewareProvider + { + var feature = descriptor.Extend().Configuration.Features.GetOrSet(); + feature.Configure(p => p.AddDefaultPolicy()); + feature.Configure(configure); + + return descriptor; + } + + private static ExceptionPolicyOptions AddDefaultPolicy(this ExceptionPolicyOptions options) + { + options.Default().Retry().ThenRedeliver(); + return options; + } +} diff --git a/src/Mocha/src/Mocha/Middlewares/Consume/Retry/RetryBackoffType.cs b/src/Mocha/src/Mocha/Middlewares/Consume/Retry/RetryBackoffType.cs new file mode 100644 index 00000000000..d670e13b5cc --- /dev/null +++ b/src/Mocha/src/Mocha/Middlewares/Consume/Retry/RetryBackoffType.cs @@ -0,0 +1,22 @@ +namespace Mocha; + +/// +/// Specifies the backoff strategy for delay calculations between retry attempts. +/// +public enum RetryBackoffType +{ + /// + /// Constant delay between retries. Every attempt waits the same base delay. + /// + Constant, + + /// + /// Linearly increasing delay. Delay = baseDelay * attempt. + /// + Linear, + + /// + /// Exponentially increasing delay. Delay = baseDelay * 2^(attempt-1). + /// + Exponential +} diff --git a/src/Mocha/src/Mocha/Middlewares/Consume/Retry/RetryExecutor.cs b/src/Mocha/src/Mocha/Middlewares/Consume/Retry/RetryExecutor.cs new file mode 100644 index 00000000000..6e2afa3cf8d --- /dev/null +++ b/src/Mocha/src/Mocha/Middlewares/Consume/Retry/RetryExecutor.cs @@ -0,0 +1,125 @@ +using System.Collections.Immutable; + +namespace Mocha; + +/// +/// Executes an action with retry logic based on exception policy rules. +/// Returns normally on success or discard. Throws on dead-letter, no match, or exhausted retries. +/// +internal static class RetryExecutor +{ + public static ValueTask ExecuteAsync( + ImmutableArray rules, + TState state, + Func action, + Action? 
onRetry, + CancellationToken cancellationToken) + { + return ExecuteAsync(rules, state, action, onRetry, TimeProvider.System, cancellationToken); + } + + public static async ValueTask ExecuteAsync( + ImmutableArray rules, + TState state, + Func action, + Action? onRetry, + TimeProvider timeProvider, + CancellationToken cancellationToken) + { + var attempts = 0; + + while (true) + { + try + { + await action(state); + return; + } + catch (Exception ex) + { + // Match exception against policy rules. + var rule = ExceptionPolicyMatcher.Match(rules, ex); + + // No matching rule - no policy for this exception, let it propagate. + if (rule is null) + { + throw; + } + + // Discard: swallow the exception (always immediate, no retry chaining exists). + if (rule.Terminal == TerminalAction.Discard) + { + return; + } + + // No retry configured for this rule, or retry explicitly disabled. + // Terminal (e.g. DeadLetter) is metadata for the fault middleware downstream. + if (rule.Retry is null or { Enabled: false }) + { + throw; + } + + attempts++; + + // Use the rule's retry config (fully populated by the builder). + var retryConfig = rule.Retry; + + if (attempts > (retryConfig.Attempts ?? RetryPolicyDefaults.Attempts)) + { + throw; + } + + // Calculate delay. + var delay = CalculateDelay(attempts, retryConfig); + + // Notify caller of retry attempt. + onRetry?.Invoke(state, attempts); + + if (delay > TimeSpan.Zero) + { + await Task.Delay(delay, timeProvider, cancellationToken).ConfigureAwait(false); + } + } + } + } + + internal static TimeSpan CalculateDelay(int attempt, RetryPolicyConfig config) + { + // Explicit intervals take precedence. + if (config.Intervals is { Length: > 0 } intervals) + { + var index = Math.Min(attempt - 1, intervals.Length - 1); + return intervals[index]; + } + + // Calculate based on backoff type. + var baseDelay = config.Delay ?? RetryPolicyDefaults.Delay; + var backoff = config.Backoff ?? 
RetryPolicyDefaults.Backoff;
+        var maxDelay = config.MaxDelay ?? RetryPolicyDefaults.MaxDelay;
+        var useJitter = config.UseJitter ?? RetryPolicyDefaults.UseJitter;
+
+        // Growth factor applied to the base delay for this attempt. Exponential is also
+        // the fallback for unrecognized backoff values, as in the previous switch.
+        double factor = backoff switch
+        {
+            RetryBackoffType.Constant => 1d,
+            RetryBackoffType.Linear => attempt,
+            _ => Math.Pow(2, attempt - 1)
+        };
+
+        // Scale and cap in the tick domain. Multiplying a TimeSpan by a large factor throws
+        // OverflowException once the product leaves the TimeSpan range, which exponential
+        // backoff reaches at high attempt counts. Comparing the raw product against the cap
+        // first turns an oversized value into maxDelay instead of an exception.
+        // Math.Round mirrors the rounding TimeSpan multiplication applies.
+        var scaledTicks = Math.Round(baseDelay.Ticks * factor);
+        var delay = scaledTicks >= maxDelay.Ticks ? maxDelay : TimeSpan.FromTicks((long)scaledTicks);
+
+        // Add jitter: +/- 25%.
+        if (useJitter)
+        {
+            var jitterRange = delay.TotalMilliseconds * 0.25;
+            var jitter = ((Random.Shared.NextDouble() * 2) - 1) * jitterRange;
+            delay = TimeSpan.FromMilliseconds(Math.Max(0, delay.TotalMilliseconds + jitter));
+        }
+
+        return delay;
+    }
+}
diff --git a/src/Mocha/src/Mocha/Middlewares/Consume/Retry/RetryFeature.cs b/src/Mocha/src/Mocha/Middlewares/Consume/Retry/RetryFeature.cs
new file mode 100644
index 00000000000..03af7aa0955
--- /dev/null
+++ b/src/Mocha/src/Mocha/Middlewares/Consume/Retry/RetryFeature.cs
@@ -0,0 +1,21 @@
+namespace Mocha;
+
+///
+/// Provides retry state information to message handlers via
+/// context.Features.Get&lt;RetryFeature&gt;().
+/// Null if retry is not configured via AddResilience.
+///
+public sealed class RetryFeature
+{
+    ///
+    /// Number of immediate retries already attempted for this delivery round.
+    /// 0 on the first (original) attempt.
+    ///
+    public int ImmediateRetryCount { get; internal set; }
+
+    ///
+    /// Number of delayed redeliveries already attempted.
+    /// Read from the delayed-retry-count header.
+ /// + public int DelayedRetryCount { get; internal set; } +} diff --git a/src/Mocha/src/Mocha/Middlewares/Consume/Retry/RetryPolicyConfig.cs b/src/Mocha/src/Mocha/Middlewares/Consume/Retry/RetryPolicyConfig.cs new file mode 100644 index 00000000000..5c9f2d74f5f --- /dev/null +++ b/src/Mocha/src/Mocha/Middlewares/Consume/Retry/RetryPolicyConfig.cs @@ -0,0 +1,44 @@ +using System.Collections.Immutable; + +namespace Mocha; + +/// +/// Per-exception retry configuration. +/// +public sealed class RetryPolicyConfig +{ + /// + /// Gets whether retry is enabled for this exception. Defaults to true. + /// + public bool Enabled { get; init; } = true; + + /// + /// Gets the number of retry attempts. + /// + public int? Attempts { get; init; } + + /// + /// Gets the base delay between retries. + /// + public TimeSpan? Delay { get; init; } + + /// + /// Gets the backoff strategy. + /// + public RetryBackoffType? Backoff { get; init; } + + /// + /// Gets the maximum delay cap. + /// + public TimeSpan? MaxDelay { get; init; } + + /// + /// Gets whether jitter is enabled. + /// + public bool? UseJitter { get; init; } + + /// + /// Gets the explicit retry intervals. + /// + public ImmutableArray? Intervals { get; init; } +} diff --git a/src/Mocha/src/Mocha/Middlewares/Consume/Retry/RetryPolicyDefaults.cs b/src/Mocha/src/Mocha/Middlewares/Consume/Retry/RetryPolicyDefaults.cs new file mode 100644 index 00000000000..91582f0da18 --- /dev/null +++ b/src/Mocha/src/Mocha/Middlewares/Consume/Retry/RetryPolicyDefaults.cs @@ -0,0 +1,32 @@ +namespace Mocha; + +/// +/// Provides the built-in default values for retry policy configuration. +/// +internal static class RetryPolicyDefaults +{ + /// + /// The default number of retry attempts. Value: 3. + /// + public const int Attempts = 3; + + /// + /// The default base delay between retries. Value: 200ms. + /// + public static readonly TimeSpan Delay = TimeSpan.FromMilliseconds(200); + + /// + /// The default backoff strategy. Value: . 
+ /// + public const RetryBackoffType Backoff = RetryBackoffType.Exponential; + + /// + /// The default jitter setting. Value: true. + /// + public const bool UseJitter = true; + + /// + /// The default maximum delay cap. Value: 30 seconds. + /// + public static readonly TimeSpan MaxDelay = TimeSpan.FromSeconds(30); +} diff --git a/src/Mocha/src/Mocha/Middlewares/Receive/ReceiveMiddlewares.cs b/src/Mocha/src/Mocha/Middlewares/Receive/ReceiveMiddlewares.cs index 629e336a64f..68f256fc943 100644 --- a/src/Mocha/src/Mocha/Middlewares/Receive/ReceiveMiddlewares.cs +++ b/src/Mocha/src/Mocha/Middlewares/Receive/ReceiveMiddlewares.cs @@ -49,6 +49,11 @@ public static class ReceiveMiddlewares public static readonly ReceiveMiddlewareConfiguration MessageTypeSelection = MessageTypeSelectionMiddleware.Create(); + /// + /// The redelivery middleware configuration that reschedules failed messages for later delivery. + /// + public static readonly ReceiveMiddlewareConfiguration Redelivery = ReceiveRedeliveryMiddleware.Create(); + /// /// The routing middleware configuration that dispatches messages to the appropriate consumer. /// diff --git a/src/Mocha/src/Mocha/Middlewares/Receive/Redelivery/ReceiveRedeliveryMiddleware.cs b/src/Mocha/src/Mocha/Middlewares/Receive/Redelivery/ReceiveRedeliveryMiddleware.cs new file mode 100644 index 00000000000..78cf4df766a --- /dev/null +++ b/src/Mocha/src/Mocha/Middlewares/Receive/Redelivery/ReceiveRedeliveryMiddleware.cs @@ -0,0 +1,159 @@ +using System.Collections.Immutable; +using Microsoft.Extensions.DependencyInjection; +using Mocha.Features; +using Mocha.Middlewares; + +namespace Mocha; + +/// +/// A receive middleware that reschedules failed messages for later delivery, releasing the +/// concurrency slot while the message waits for its next attempt. +/// +/// +/// This middleware implements Tier 2 (delayed redelivery) of the retry model. 
On failure it +/// increments the delayed-retry-count header and dispatches the original envelope back +/// to the same endpoint with a scheduled delivery time. Request/reply messages are excluded +/// because the caller would time out waiting for a response. +/// +internal sealed class ReceiveRedeliveryMiddleware( + ImmutableArray exceptionPolicyRules, + TimeProvider timeProvider, + IMessagingPools pools) +{ + public async ValueTask InvokeAsync(IReceiveContext context, ReceiveDelegate next) + { + // Read the current delayed retry count from headers. + var delayedRetryCount = 0; + + if (context.Headers.TryGetValue(MessageHeaders.Retry.DelayedRetryCount.Key, out var headerValue)) + { + delayedRetryCount = RedeliveryExecutor.ParseDelayedRetryCount(headerValue); + } + + try + { + await next(context); + } + catch (Exception ex) + { + // Request/reply messages must not be redelivered - the caller is waiting. + if (context.Envelope?.ResponseAddress is not null) + { + throw; + } + + var decision = RedeliveryExecutor.Evaluate(exceptionPolicyRules, ex, delayedRetryCount); + + switch (decision.Action) + { + case RedeliveryAction.Discard: + context.Features.GetOrSet().MessageConsumed = true; + return; + + case RedeliveryAction.Redeliver: + var scheduledTime = timeProvider.GetUtcNow().Add(decision.Delay); + await DispatchRedeliveryAsync(context, delayedRetryCount, scheduledTime); + context.Features.GetOrSet().MessageConsumed = true; + return; + + default: + throw; + } + } + } + + private async ValueTask DispatchRedeliveryAsync( + IReceiveContext context, + int delayedRetryCount, + DateTimeOffset scheduledTime) + { + var envelope = + context.Envelope + ?? 
throw new InvalidOperationException("Cannot redeliver because the receive context has no envelope."); + + if (envelope.Headers is null) + { + throw new InvalidOperationException( + "Cannot increment delayed retry count because the envelope has no headers collection."); + } + + envelope.Headers.Set(MessageHeaders.Retry.DelayedRetryCount.Key, delayedRetryCount + 1); + + // Dispatch the envelope back to the same endpoint with the scheduled time. + // Use the Source address (queue/topic) rather than the endpoint address, because + // transports resolve dispatch endpoints from the topology resource address. + var dispatchEndpoint = context.Runtime.GetDispatchEndpoint(context.Endpoint.Source.Address); + var dispatchContext = pools.DispatchContext.Get(); + + try + { + dispatchContext.Initialize( + context.Services, + dispatchEndpoint, + context.Runtime, + context.MessageType, + context.CancellationToken); + + dispatchContext.Envelope = envelope; + dispatchContext.ScheduledTime = scheduledTime; + + await dispatchEndpoint.ExecuteAsync(dispatchContext); + } + finally + { + pools.DispatchContext.Return(dispatchContext); + } + } + + public static ReceiveMiddlewareConfiguration Create() + => new( + static (context, next) => + { + // Resolve exception policy feature from the most specific scope. + var feature = context.GetExceptionPolicyFeature(); + + if (feature is null) + { + // No exception policy configured - skip redelivery middleware entirely. + return next; + } + + var timeProvider = context.Services.GetRequiredService(); + var pools = context.Services.GetRequiredService(); + + var middleware = new ReceiveRedeliveryMiddleware(feature.Rules.ToImmutableArray(), timeProvider, pools); + + return ctx => middleware.InvokeAsync(ctx, next); + }, + "Redelivery"); +} + +file static class Extensions +{ + /// + /// Resolves exception policy feature with the most specific scope taking precedence. + /// Endpoint -> Transport -> Bus. + /// + public static ExceptionPolicyFeature? 
GetExceptionPolicyFeature(this ReceiveMiddlewareFactoryContext context) + { + var busFeatures = context.Services.GetRequiredService(); + + // Endpoint -> Transport -> Bus (most specific first). + if (context.Endpoint.Features.TryGet(out ExceptionPolicyFeature? endpointFeature)) + { + return endpointFeature; + } + + if (context.Transport.Features.TryGet(out ExceptionPolicyFeature? transportFeature)) + { + return transportFeature; + } + + if (busFeatures.TryGet(out ExceptionPolicyFeature? busFeature)) + { + return busFeature; + } + + return null; + } +} diff --git a/src/Mocha/src/Mocha/Middlewares/Receive/Redelivery/RedeliveryAction.cs b/src/Mocha/src/Mocha/Middlewares/Receive/Redelivery/RedeliveryAction.cs new file mode 100644 index 00000000000..91205429ae8 --- /dev/null +++ b/src/Mocha/src/Mocha/Middlewares/Receive/Redelivery/RedeliveryAction.cs @@ -0,0 +1,22 @@ +namespace Mocha; + +/// +/// Specifies the action the redelivery middleware should take for a failed message. +/// +internal enum RedeliveryAction +{ + /// + /// Re-throw the exception; let outer middleware handle it. + /// + Rethrow, + + /// + /// Discard the message; swallow the exception. + /// + Discard, + + /// + /// Redeliver the message after a delay. + /// + Redeliver +} diff --git a/src/Mocha/src/Mocha/Middlewares/Receive/Redelivery/RedeliveryDecision.cs b/src/Mocha/src/Mocha/Middlewares/Receive/Redelivery/RedeliveryDecision.cs new file mode 100644 index 00000000000..e613bc92343 --- /dev/null +++ b/src/Mocha/src/Mocha/Middlewares/Receive/Redelivery/RedeliveryDecision.cs @@ -0,0 +1,41 @@ +namespace Mocha; + +/// +/// The result of evaluating exception policy rules for a redelivery decision. +/// +internal readonly struct RedeliveryDecision +{ + private RedeliveryDecision(RedeliveryAction action, TimeSpan delay = default) + { + Action = action; + Delay = delay; + } + + /// + /// Gets the action to take. + /// + public RedeliveryAction Action { get; } + + /// + /// Gets the delay before redelivery. 
Only meaningful when is + /// . + /// + public TimeSpan Delay { get; } + + /// + /// A decision to re-throw the exception. + /// + public static readonly RedeliveryDecision Rethrow = new(RedeliveryAction.Rethrow); + + /// + /// A decision to discard the message. + /// + public static readonly RedeliveryDecision Discard = new(RedeliveryAction.Discard); + + /// + /// Creates a decision to redeliver the message after the specified delay. + /// + /// The delay before redelivery. + /// A redeliver decision. + public static RedeliveryDecision Redeliver(TimeSpan delay) => new(RedeliveryAction.Redeliver, delay); +} diff --git a/src/Mocha/src/Mocha/Middlewares/Receive/Redelivery/RedeliveryExecutor.cs b/src/Mocha/src/Mocha/Middlewares/Receive/Redelivery/RedeliveryExecutor.cs new file mode 100644 index 00000000000..c2897fb69f9 --- /dev/null +++ b/src/Mocha/src/Mocha/Middlewares/Receive/Redelivery/RedeliveryExecutor.cs @@ -0,0 +1,116 @@ +using System.Collections.Immutable; + +namespace Mocha; + +/// +/// Provides delay calculation and exception evaluation logic for the redelivery middleware. +/// +internal static class RedeliveryExecutor +{ + /// + /// Evaluates exception policy rules and determines whether to rethrow, discard, or redeliver. + /// + /// The exception policy rules to evaluate. + /// The exception that was thrown. + /// The current delayed retry count. + /// A indicating the action to take. + internal static RedeliveryDecision Evaluate( + ImmutableArray rules, + Exception exception, + int delayedRetryCount) + { + // Match exception against policy rules. + var rule = ExceptionPolicyMatcher.Match(rules, exception); + + // No matching rule — no policy for this exception. + if (rule is null) + { + return RedeliveryDecision.Rethrow; + } + + // Discard: swallow at receive level. + if (rule.Terminal == TerminalAction.Discard) + { + return RedeliveryDecision.Discard; + } + + // DeadLetter: skip redelivery, let fault middleware handle. 
+        if (rule.Terminal == TerminalAction.DeadLetter)
+        {
+            return RedeliveryDecision.Rethrow;
+        }
+
+        // No redelivery configured for this rule, or redelivery explicitly disabled.
+        if (rule.Redelivery is null or { Enabled: false })
+        {
+            return RedeliveryDecision.Rethrow;
+        }
+
+        var redeliveryConfig = rule.Redelivery;
+
+        // Check if redelivery attempts remain.
+        var maxAttempts = redeliveryConfig.Attempts ?? redeliveryConfig.Intervals?.Length ?? 0;
+
+        if (delayedRetryCount >= maxAttempts)
+        {
+            return RedeliveryDecision.Rethrow;
+        }
+
+        // Calculate the delay for this redelivery attempt.
+        var delay = CalculateDelay(delayedRetryCount, redeliveryConfig);
+        return RedeliveryDecision.Redeliver(delay);
+    }
+
+    /// <summary>
+    /// Parses a delayed retry count from a message header value.
+    /// </summary>
+    /// <remarks>
+    /// Accepts <see cref="int"/>, <see cref="long"/>, <see cref="double"/> (fraction truncated)
+    /// and integer strings. The result is saturated to the range 0..<see cref="int.MaxValue"/>,
+    /// so a corrupt or oversized header can never yield a negative count, which callers
+    /// would otherwise use as an array index and as the "attempts so far" counter.
+    /// </remarks>
+    /// <param name="headerValue">The raw header value.</param>
+    /// <returns>The parsed non-negative count, or 0 if the value cannot be parsed.</returns>
+    internal static int ParseDelayedRetryCount(object? headerValue)
+    {
+        long count = headerValue switch
+        {
+            int i => i,
+            long l => l,
+            // NaN has no meaningful count and falls through to 0; infinities saturate via the clamp.
+            double d when !double.IsNaN(d) => (long)Math.Clamp(d, 0d, int.MaxValue),
+            // Header values may arrive as text; parse culture-independently.
+            string s when long.TryParse(
+                s,
+                System.Globalization.NumberStyles.Integer,
+                System.Globalization.CultureInfo.InvariantCulture,
+                out var parsed) => parsed,
+            _ => 0L
+        };
+
+        // Saturate instead of wrapping: an unchecked (int) cast of a large long can go negative.
+        return (int)Math.Clamp(count, 0L, int.MaxValue);
+    }
+
+    internal static TimeSpan CalculateDelay(int attempt, RedeliveryPolicyConfig config)
+    {
+        TimeSpan baseDelay;
+
+        if (config.Intervals is { Length: > 0 } intervals)
+        {
+            // Explicit intervals: use array index, clamp to last.
+            baseDelay = intervals[Math.Min(attempt, intervals.Length - 1)];
+        }
+        else
+        {
+            // Calculated: BaseDelay * (attempt + 1).
+            var configuredBaseDelay = config.BaseDelay ?? RedeliveryPolicyDefaults.Intervals[0];
+            baseDelay = configuredBaseDelay * (attempt + 1);
+        }
+
+        // Cap by MaxDelay.
+        var maxDelay = config.MaxDelay ?? RedeliveryPolicyDefaults.MaxDelay;
+
+        if (baseDelay > maxDelay)
+        {
+            baseDelay = maxDelay;
+        }
+
+        // Add jitter: +/- 25%.
+        var useJitter = config.UseJitter ??
RedeliveryPolicyDefaults.UseJitter; + + if (useJitter) + { + var jitterRange = baseDelay.TotalMilliseconds * 0.25; + var jitter = ((Random.Shared.NextDouble() * 2) - 1) * jitterRange; + baseDelay = TimeSpan.FromMilliseconds(Math.Max(0, baseDelay.TotalMilliseconds + jitter)); + } + + return baseDelay; + } +} diff --git a/src/Mocha/test/Mocha.Tests/Middlewares/Consume/Retry/ExceptionPolicyMatcherTests.cs b/src/Mocha/test/Mocha.Tests/Middlewares/Consume/Retry/ExceptionPolicyMatcherTests.cs new file mode 100644 index 00000000000..8396879e4a4 --- /dev/null +++ b/src/Mocha/test/Mocha.Tests/Middlewares/Consume/Retry/ExceptionPolicyMatcherTests.cs @@ -0,0 +1,148 @@ +using System.Collections.Immutable; + +namespace Mocha.Tests.Middlewares.Consume.Retry; + +public sealed class ExceptionPolicyMatcherTests +{ + [Fact] + public void Match_Should_ReturnNull_When_RulesListIsEmpty() + { + // arrange + var rules = ImmutableArray.Empty; + var exception = new InvalidOperationException("test"); + + // act + var result = ExceptionPolicyMatcher.Match(rules, exception); + + // assert + Assert.Null(result); + } + + [Fact] + public void Match_Should_ReturnNull_When_NoRuleMatchesExceptionType() + { + // arrange + var rules = ImmutableArray.Create(CreateRule()); + var exception = new InvalidOperationException("test"); + + // act + var result = ExceptionPolicyMatcher.Match(rules, exception); + + // assert + Assert.Null(result); + } + + [Fact] + public void Match_Should_ReturnRule_When_ExceptionTypeMatchesExactly() + { + // arrange + var rule = CreateRule(); + var rules = ImmutableArray.Create(rule); + var exception = new InvalidOperationException("test"); + + // act + var result = ExceptionPolicyMatcher.Match(rules, exception); + + // assert + Assert.Same(rule, result); + } + + [Fact] + public void Match_Should_ReturnRule_When_ExceptionIsDerivedType() + { + // arrange + var rule = CreateRule(); + var rules = ImmutableArray.Create(rule); + var exception = new InvalidOperationException("test"); 
+ + // act + var result = ExceptionPolicyMatcher.Match(rules, exception); + + // assert + Assert.Same(rule, result); + } + + [Fact] + public void Match_Should_ReturnMostSpecificRule_When_MultipleRulesMatchAtDifferentDepths() + { + // arrange + var baseRule = CreateRule(); + var specificRule = CreateRule(); + var rules = ImmutableArray.Create(baseRule, specificRule); + var exception = new InvalidOperationException("test"); + + // act + var result = ExceptionPolicyMatcher.Match(rules, exception); + + // assert + Assert.Same(specificRule, result); + } + + [Fact] + public void Match_Should_ReturnNull_When_TypeMatchesButPredicateReturnsFalse() + { + // arrange + var rule = CreateRule(predicate: _ => false); + var rules = ImmutableArray.Create(rule); + var exception = new InvalidOperationException("test"); + + // act + var result = ExceptionPolicyMatcher.Match(rules, exception); + + // assert + Assert.Null(result); + } + + [Fact] + public void Match_Should_ReturnRule_When_TypeMatchesAndPredicateReturnsTrue() + { + // arrange + var rule = CreateRule(predicate: _ => true); + var rules = ImmutableArray.Create(rule); + var exception = new InvalidOperationException("test"); + + // act + var result = ExceptionPolicyMatcher.Match(rules, exception); + + // assert + Assert.Same(rule, result); + } + + [Fact] + public void Match_Should_ReturnMoreSpecificRule_When_LessSpecificRuleHasPredicate() + { + // arrange + var baseRuleWithPredicate = CreateRule(predicate: _ => true); + var specificRule = CreateRule(); + var rules = ImmutableArray.Create(baseRuleWithPredicate, specificRule); + var exception = new InvalidOperationException("test"); + + // act + var result = ExceptionPolicyMatcher.Match(rules, exception); + + // assert + Assert.Same(specificRule, result); + } + + [Fact] + public void Match_Should_ReturnPassingRule_When_TwoRulesForSameTypeAndOnePredicateFails() + { + // arrange + var failingRule = CreateRule(predicate: _ => false); + var passingRule = CreateRule(predicate: _ => 
true); + var rules = ImmutableArray.Create(failingRule, passingRule); + var exception = new ArgumentNullException("param"); + + // act + var result = ExceptionPolicyMatcher.Match(rules, exception); + + // assert + Assert.Same(passingRule, result); + } + + private static ExceptionPolicyRule CreateRule(Func? predicate = null) + where TException : Exception + { + return new ExceptionPolicyRule { ExceptionType = typeof(TException), Predicate = predicate }; + } +} diff --git a/src/Mocha/test/Mocha.Tests/Middlewares/Consume/Retry/RetryExecutorTests.cs b/src/Mocha/test/Mocha.Tests/Middlewares/Consume/Retry/RetryExecutorTests.cs new file mode 100644 index 00000000000..2e5de6b8f33 --- /dev/null +++ b/src/Mocha/test/Mocha.Tests/Middlewares/Consume/Retry/RetryExecutorTests.cs @@ -0,0 +1,917 @@ +using System.Collections.Immutable; +using System.Diagnostics; + +namespace Mocha.Tests.Middlewares.Consume.Retry; + +public sealed class RetryExecutorTests +{ + [Fact] + public async Task ExecuteAsync_Should_Succeed_When_ActionDoesNotThrow() + { + // arrange + var rules = BuildRules(p => p.On().Retry()); + var counter = new Counter(); + + // act + await RetryExecutor.ExecuteAsync( + rules, + counter, + static (s) => + { + s.Increment(); + return ValueTask.CompletedTask; + }, + onRetry: null, + cancellationToken: default); + + // assert + Assert.Equal(1, counter.Count); + } + + [Fact] + public async Task ExecuteAsync_Should_Throw_When_NoRuleMatchesException() + { + // arrange + var rules = BuildRules(p => p.On().Retry()); + + // act & assert + await Assert.ThrowsAsync(() => + RetryExecutor + .ExecuteAsync( + rules, + 0, + static (_) => throw new InvalidOperationException("no match"), + onRetry: null, + cancellationToken: default) + .AsTask() + ); + } + + [Fact] + public async Task ExecuteAsync_Should_Return_When_TerminalIsDiscard() + { + // arrange + var rules = BuildRules(p => p.On().Discard()); + + // act - should not throw + await RetryExecutor.ExecuteAsync( + rules, + 0, + static 
(_) => throw new InvalidOperationException("discard me"), + onRetry: null, + cancellationToken: default); + } + + [Fact] + public async Task ExecuteAsync_Should_Throw_When_TerminalIsDeadLetter() + { + // arrange + var rules = BuildRules(p => p.On().DeadLetter()); + + // act & assert + await Assert.ThrowsAsync(() => + RetryExecutor + .ExecuteAsync( + rules, + 0, + static (_) => throw new InvalidOperationException("dead letter"), + onRetry: null, + cancellationToken: default) + .AsTask() + ); + } + + [Fact] + public async Task ExecuteAsync_Should_Throw_When_RetryIsDisabled() + { + // arrange - Redeliver() sets Retry.Enabled = false + var rules = BuildRules(p => p.On().Redeliver()); + + // act & assert + await Assert.ThrowsAsync(() => + RetryExecutor + .ExecuteAsync( + rules, + 0, + static (_) => throw new InvalidOperationException("no retry"), + onRetry: null, + cancellationToken: default) + .AsTask() + ); + } + + [Fact] + public async Task ExecuteAsync_Should_RetryAndSucceed_When_ActionFailsThenSucceeds() + { + // arrange + var rules = BuildRules(p => + p.On().Retry(3, TimeSpan.Zero, RetryBackoffType.Constant) + ); + var counter = new Counter(); + + // act + await RetryExecutor.ExecuteAsync( + rules, + counter, + static (s) => + { + s.Increment(); + + if (s.Count == 1) + { + throw new InvalidOperationException("transient"); + } + + return ValueTask.CompletedTask; + }, + onRetry: null, + cancellationToken: default); + + // assert + Assert.Equal(2, counter.Count); + } + + [Fact] + public async Task ExecuteAsync_Should_Throw_When_AllRetriesExhausted() + { + // arrange + var rules = BuildRules(p => + p.On().Retry(2, TimeSpan.Zero, RetryBackoffType.Constant) + ); + var counter = new Counter(); + + // act & assert + await Assert.ThrowsAsync(() => + RetryExecutor + .ExecuteAsync( + rules, + counter, + static (s) => + { + s.Increment(); + throw new InvalidOperationException("always fails"); + }, + onRetry: null, + cancellationToken: default) + .AsTask() + ); + + 
Assert.Equal(3, counter.Count); + } + + [Fact] + public async Task ExecuteAsync_Should_InvokeOnRetry_When_Retrying() + { + // arrange + var rules = BuildRules(p => + p.On().Retry(3, TimeSpan.Zero, RetryBackoffType.Constant) + ); + var counter = new Counter(); + var retryAttempts = new List(); + + // act + await RetryExecutor.ExecuteAsync( + rules, + (Counter: counter, Attempts: retryAttempts), + static (s) => + { + s.Counter.Increment(); + + if (s.Counter.Count <= 2) + { + throw new InvalidOperationException("transient"); + } + + return ValueTask.CompletedTask; + }, + onRetry: static (s, attempt) => s.Attempts.Add(attempt), + cancellationToken: default); + + // assert + Assert.Equal(3, counter.Count); + Assert.Equal(2, retryAttempts.Count); + Assert.Equal(1, retryAttempts[0]); + Assert.Equal(2, retryAttempts[1]); + } + + [Fact] + public async Task ExecuteAsync_Should_NotInvokeOnRetry_When_ActionSucceeds() + { + // arrange + var rules = BuildRules(p => + p.On().Retry(3, TimeSpan.Zero, RetryBackoffType.Constant) + ); + + // act + await RetryExecutor.ExecuteAsync( + rules, + 0, + static (_) => ValueTask.CompletedTask, + onRetry: static (_, _) => throw new InvalidOperationException("should not be called"), + cancellationToken: default); + } + + // -- CalculateDelay tests -- + + [Theory] + [InlineData(1)] + [InlineData(2)] + [InlineData(3)] + [InlineData(5)] + public void CalculateDelay_Should_ReturnConstantDelay_When_BackoffIsConstant(int attempt) + { + // arrange + var config = new RetryPolicyConfig + { + Backoff = RetryBackoffType.Constant, + Delay = TimeSpan.FromMilliseconds(100), + UseJitter = false, + MaxDelay = TimeSpan.FromSeconds(30) + }; + + // act + var delay = RetryExecutor.CalculateDelay(attempt, config); + + // assert + Assert.Equal(TimeSpan.FromMilliseconds(100), delay); + } + + [Fact] + public void CalculateDelay_Should_ReturnLinearDelay_When_BackoffIsLinear() + { + // arrange + var config = new RetryPolicyConfig + { + Backoff = RetryBackoffType.Linear, 
+ Delay = TimeSpan.FromMilliseconds(100), + UseJitter = false, + MaxDelay = TimeSpan.FromSeconds(30) + }; + + // act & assert + Assert.Equal(TimeSpan.FromMilliseconds(100), RetryExecutor.CalculateDelay(1, config)); + Assert.Equal(TimeSpan.FromMilliseconds(200), RetryExecutor.CalculateDelay(2, config)); + Assert.Equal(TimeSpan.FromMilliseconds(300), RetryExecutor.CalculateDelay(3, config)); + } + + [Fact] + public void CalculateDelay_Should_ReturnExponentialDelay_When_BackoffIsExponential() + { + // arrange + var config = new RetryPolicyConfig + { + Backoff = RetryBackoffType.Exponential, + Delay = TimeSpan.FromMilliseconds(100), + UseJitter = false, + MaxDelay = TimeSpan.FromSeconds(30) + }; + + // act & assert + Assert.Equal(TimeSpan.FromMilliseconds(100), RetryExecutor.CalculateDelay(1, config)); + Assert.Equal(TimeSpan.FromMilliseconds(200), RetryExecutor.CalculateDelay(2, config)); + Assert.Equal(TimeSpan.FromMilliseconds(400), RetryExecutor.CalculateDelay(3, config)); + } + + [Fact] + public void CalculateDelay_Should_CapAtMaxDelay_When_DelayExceedsMax() + { + // arrange + var config = new RetryPolicyConfig + { + Backoff = RetryBackoffType.Exponential, + Delay = TimeSpan.FromSeconds(1), + MaxDelay = TimeSpan.FromSeconds(2), + UseJitter = false + }; + + // act + var delay = RetryExecutor.CalculateDelay(10, config); + + // assert + Assert.Equal(TimeSpan.FromSeconds(2), delay); + } + + [Fact] + public void CalculateDelay_Should_UseExplicitIntervals_When_IntervalsProvided() + { + // arrange + var config = new RetryPolicyConfig + { + Intervals = [TimeSpan.FromMilliseconds(10), TimeSpan.FromMilliseconds(50), TimeSpan.FromMilliseconds(100)], + UseJitter = false + }; + + // act & assert + Assert.Equal(TimeSpan.FromMilliseconds(10), RetryExecutor.CalculateDelay(1, config)); + Assert.Equal(TimeSpan.FromMilliseconds(50), RetryExecutor.CalculateDelay(2, config)); + Assert.Equal(TimeSpan.FromMilliseconds(100), RetryExecutor.CalculateDelay(3, config)); + } + + [Fact] + 
public void CalculateDelay_Should_ClampToLastInterval_When_AttemptExceedsIntervalCount() + { + // arrange + var config = new RetryPolicyConfig + { + Intervals = [TimeSpan.FromMilliseconds(10), TimeSpan.FromMilliseconds(50)], + UseJitter = false + }; + + // act + var delay = RetryExecutor.CalculateDelay(3, config); + + // assert + Assert.Equal(TimeSpan.FromMilliseconds(50), delay); + } + + // -- Multi-rule policy scenarios -- + + [Fact] + public async Task ExecuteAsync_Should_Discard_When_SpecificExceptionMatchesDiscardRule() + { + // arrange + var rules = BuildRules(p => + { + p.On().Retry(3, TimeSpan.Zero, RetryBackoffType.Constant); + p.On().Discard(); + }); + var counter = new Counter(); + + // act + await RetryExecutor.ExecuteAsync( + rules, + counter, + static (s) => + { + s.Increment(); + throw new ArgumentNullException("param"); + }, + onRetry: null, + cancellationToken: default); + + // assert + Assert.Equal(1, counter.Count); + } + + [Fact] + public async Task ExecuteAsync_Should_DeadLetter_When_SpecificExceptionMatchesDeadLetterRule() + { + // arrange + var rules = BuildRules(p => + { + p.On().Retry(3, TimeSpan.Zero, RetryBackoffType.Constant); + p.On().DeadLetter(); + }); + var counter = new Counter(); + + // act & assert + await Assert.ThrowsAsync(() => + RetryExecutor + .ExecuteAsync( + rules, + counter, + static (s) => + { + s.Increment(); + throw new InvalidOperationException("dead letter"); + }, + onRetry: null, + cancellationToken: default) + .AsTask() + ); + + Assert.Equal(1, counter.Count); + } + + [Fact] + public async Task ExecuteAsync_Should_RetryWithBaseRule_When_DerivedExceptionHasNoSpecificRule() + { + // arrange + var rules = BuildRules(p => p.On().Retry(2, TimeSpan.Zero, RetryBackoffType.Constant)); + var counter = new Counter(); + + // act & assert + await Assert.ThrowsAsync(() => + RetryExecutor + .ExecuteAsync( + rules, + counter, + static (s) => + { + s.Increment(); + throw new ArgumentException("derived"); + }, + onRetry: null, + 
cancellationToken: default) + .AsTask() + ); + + Assert.Equal(3, counter.Count); + } + + [Fact] + public async Task ExecuteAsync_Should_UseMostSpecificRule_When_MultipleRulesMatch() + { + // arrange + var rules = BuildRules(p => + { + p.On().Retry(5, TimeSpan.Zero, RetryBackoffType.Constant); + p.On().Retry(1, TimeSpan.Zero, RetryBackoffType.Constant); + }); + var counter = new Counter(); + + // act & assert + await Assert.ThrowsAsync(() => + RetryExecutor + .ExecuteAsync( + rules, + counter, + static (s) => + { + s.Increment(); + throw new InvalidOperationException("specific"); + }, + onRetry: null, + cancellationToken: default) + .AsTask() + ); + + Assert.Equal(2, counter.Count); + } + + [Fact] + public async Task ExecuteAsync_Should_Discard_When_PredicateMatches() + { + // arrange + var rules = BuildRules(p => + p.On(static ex => ex.Message.Contains("transient")).Discard() + ); + + // act - should not throw + await RetryExecutor.ExecuteAsync( + rules, + 0, + static (_) => throw new InvalidOperationException("transient failure"), + onRetry: null, + cancellationToken: default); + } + + [Fact] + public async Task ExecuteAsync_Should_Throw_When_PredicateDoesNotMatch() + { + // arrange + var rules = BuildRules(p => + p.On(static ex => ex.Message.Contains("transient")).Discard() + ); + + // act & assert + await Assert.ThrowsAsync(() => + RetryExecutor + .ExecuteAsync( + rules, + 0, + static (_) => throw new InvalidOperationException("permanent failure"), + onRetry: null, + cancellationToken: default) + .AsTask() + ); + } + + [Fact] + public async Task ExecuteAsync_Should_RetryThenDiscard_When_DifferentExceptionsThrown() + { + // arrange + var rules = BuildRules(p => + { + p.On().Retry(3, TimeSpan.Zero, RetryBackoffType.Constant); + p.On().Discard(); + }); + var counter = new Counter(); + + // act + await RetryExecutor.ExecuteAsync( + rules, + counter, + static (s) => + { + s.Increment(); + + if (s.Count == 1) + { + throw new InvalidOperationException("retry this"); + 
} + + throw new ArgumentException("discard this"); + }, + onRetry: null, + cancellationToken: default); + + // assert + Assert.Equal(2, counter.Count); + } + + [Fact] + public async Task ExecuteAsync_Should_RetryThenSucceed_When_ExceptionChangesOnRetry() + { + // arrange + var rules = BuildRules(p => + p.On().Retry(3, TimeSpan.Zero, RetryBackoffType.Constant) + ); + var counter = new Counter(); + + // act + await RetryExecutor.ExecuteAsync( + rules, + counter, + static (s) => + { + s.Increment(); + + if (s.Count <= 2) + { + throw new InvalidOperationException("transient"); + } + + return ValueTask.CompletedTask; + }, + onRetry: null, + cancellationToken: default); + + // assert + Assert.Equal(3, counter.Count); + } + + [Fact] + public async Task ExecuteAsync_Should_Throw_When_RulesListIsEmpty() + { + // arrange + var rules = BuildRules(_ => { }); + + // act & assert + await Assert.ThrowsAsync(() => + RetryExecutor + .ExecuteAsync( + rules, + 0, + static (_) => throw new InvalidOperationException("no rules"), + onRetry: null, + cancellationToken: default) + .AsTask() + ); + } + + [Fact] + public async Task ExecuteAsync_Should_RetryExactlyConfiguredAttempts_When_AlwaysFailing() + { + // arrange + var rules = BuildRules(p => p.On().Retry(5, TimeSpan.Zero, RetryBackoffType.Constant)); + var counter = new Counter(); + + // act & assert + await Assert.ThrowsAsync(() => + RetryExecutor + .ExecuteAsync( + rules, + counter, + static (s) => + { + s.Increment(); + throw new InvalidOperationException("always fails"); + }, + onRetry: null, + cancellationToken: default) + .AsTask() + ); + + Assert.Equal(6, counter.Count); + } + + [Fact] + public async Task ExecuteAsync_Should_TrackCorrectAttemptNumbers_When_Retrying() + { + // arrange + var rules = BuildRules(p => p.On().Retry(4, TimeSpan.Zero, RetryBackoffType.Constant)); + var counter = new Counter(); + var retryAttempts = new List(); + + // act & assert + await Assert.ThrowsAsync(() => + RetryExecutor + .ExecuteAsync( + 
rules, + (Counter: counter, Attempts: retryAttempts), + static (s) => + { + s.Counter.Increment(); + throw new InvalidOperationException("always fails"); + }, + onRetry: static (s, attempt) => s.Attempts.Add(attempt), + cancellationToken: default) + .AsTask() + ); + + Assert.Equal([1, 2, 3, 4], retryAttempts); + } + + [Fact] + public async Task ExecuteAsync_Should_UseDefaultAttempts_When_ParameterlessRetry() + { + // arrange - parameterless Retry() uses RetryPolicyDefaults.Attempts (3) + var rules = BuildRules(p => p.On().Retry()); + var counter = new Counter(); + + // act & assert + await Assert.ThrowsAsync(() => + RetryExecutor + .ExecuteAsync( + rules, + counter, + static (s) => + { + s.Increment(); + throw new InvalidOperationException("always fails"); + }, + onRetry: null, + cancellationToken: default) + .AsTask() + ); + + // RetryPolicyDefaults.Attempts is 3, so 1 original + 3 retries = 4 + Assert.Equal(4, counter.Count); + } + + [Fact] + public async Task ExecuteAsync_Should_ThrowOperationCanceled_When_CancellationRequestedDuringDelay() + { + // arrange - use a short real delay with pre-cancelled token to verify cancellation propagation + var rules = BuildRules(p => p.On().Retry(3, TimeSpan.FromSeconds(1), RetryBackoffType.Constant)); + using var cts = new CancellationTokenSource(); + var counter = new Counter(); + + // Pre-cancel so the delay will throw OperationCanceledException immediately + cts.Cancel(); + + // act & assert + await Assert.ThrowsAnyAsync(() => + RetryExecutor + .ExecuteAsync( + rules, + counter, + static (s) => + { + s.Increment(); + throw new InvalidOperationException("fail"); + }, + onRetry: null, + cts.Token) + .AsTask() + ); + + Assert.Equal(1, counter.Count); + } + + [Fact] + public async Task ExecuteAsync_Should_WaitForDelay_When_RetryDelayIsConfigured() + { + // arrange + var rules = BuildRules(p => + p.On().Retry(1, TimeSpan.FromMilliseconds(200), RetryBackoffType.Constant) + ); + var counter = new Counter(); + var sw = 
Stopwatch.StartNew(); + + // act & assert - the executor should wait for the retry delay before the second attempt + await Assert.ThrowsAsync(() => + RetryExecutor + .ExecuteAsync( + rules, + counter, + static (s) => + { + s.Increment(); + throw new InvalidOperationException("fail"); + }, + onRetry: null, + cancellationToken: default) + .AsTask() + ); + + sw.Stop(); + + // assert - delay of at least 200ms should have been applied + Assert.Equal(2, counter.Count); + Assert.True(sw.Elapsed >= TimeSpan.FromMilliseconds(150), + $"Expected delay of ~200ms but elapsed was {sw.Elapsed.TotalMilliseconds}ms"); + } + + [Fact] + public async Task ExecuteAsync_Should_RetryWithExplicitIntervals_When_IntervalsConfigured() + { + // arrange + var rules = BuildRules(p => p.On().Retry([TimeSpan.Zero, TimeSpan.Zero, TimeSpan.Zero])); + var counter = new Counter(); + + // act + await RetryExecutor.ExecuteAsync( + rules, + counter, + static (s) => + { + s.Increment(); + + if (s.Count <= 2) + { + throw new InvalidOperationException("transient"); + } + + return ValueTask.CompletedTask; + }, + onRetry: null, + cancellationToken: default); + + // assert + Assert.Equal(3, counter.Count); + } + + [Fact] + public async Task ExecuteAsync_Should_PropagateOriginalException_When_RetriesExhausted() + { + // arrange + var rules = BuildRules(p => p.On().Retry(1, TimeSpan.Zero, RetryBackoffType.Constant)); + + // act & assert + var ex = await Assert.ThrowsAsync(() => + RetryExecutor + .ExecuteAsync( + rules, + 0, + static (_) => throw new InvalidOperationException("specific message"), + onRetry: null, + cancellationToken: default) + .AsTask() + ); + + Assert.Equal("specific message", ex.Message); + } + + [Fact] + public async Task ExecuteAsync_Should_UseDefaultPolicy_When_DefaultConfigured() + { + // arrange - Default() is equivalent to On() + var rules = BuildRules(p => p.Default().Retry(2, TimeSpan.Zero, RetryBackoffType.Constant)); + var counter = new Counter(); + + // act & assert + await 
Assert.ThrowsAsync(() => + RetryExecutor + .ExecuteAsync( + rules, + counter, + static (s) => + { + s.Increment(); + throw new ArgumentException("caught by default"); + }, + onRetry: null, + cancellationToken: default) + .AsTask() + ); + + Assert.Equal(3, counter.Count); + } + + [Fact] + public async Task ExecuteAsync_Should_OverrideDefaultWithSpecific_When_BothConfigured() + { + // arrange + var rules = BuildRules(p => + { + p.Default().Retry(5, TimeSpan.Zero, RetryBackoffType.Constant); + p.On().Discard(); + }); + var counter = new Counter(); + + // act - InvalidOperationException should be discarded, not retried + await RetryExecutor.ExecuteAsync( + rules, + counter, + static (s) => + { + s.Increment(); + throw new InvalidOperationException("discarded despite default retry"); + }, + onRetry: null, + cancellationToken: default); + + Assert.Equal(1, counter.Count); + } + + [Fact] + public async Task ExecuteAsync_Should_FallThroughToDefault_When_NoSpecificRuleMatches() + { + // arrange + var rules = BuildRules(p => + { + p.On().Discard(); + p.Default().Retry(2, TimeSpan.Zero, RetryBackoffType.Constant); + }); + var counter = new Counter(); + + // act & assert - ArgumentException falls through to Default (Exception) rule + await Assert.ThrowsAsync(() => + RetryExecutor + .ExecuteAsync( + rules, + counter, + static (s) => + { + s.Increment(); + throw new ArgumentException("falls to default"); + }, + onRetry: null, + cancellationToken: default) + .AsTask() + ); + + Assert.Equal(3, counter.Count); + } + + [Fact] + public async Task ExecuteAsync_Should_RetryThenThrow_When_RetryWithThenDeadLetter() + { + // arrange - ThenDeadLetter() is metadata for fault middleware, + // the executor should still retry before propagating. 
+ var rules = BuildRules(p => + p.On().Retry(2, TimeSpan.Zero, RetryBackoffType.Constant).ThenDeadLetter() + ); + var counter = new Counter(); + + // act & assert + await Assert.ThrowsAsync(() => + RetryExecutor + .ExecuteAsync( + rules, + counter, + static (s) => + { + s.Increment(); + throw new InvalidOperationException("exhaust then dead letter"); + }, + onRetry: null, + cancellationToken: default) + .AsTask() + ); + + // 1 original + 2 retries = 3 + Assert.Equal(3, counter.Count); + } + + [Fact] + public async Task ExecuteAsync_Should_RetryThenThrow_When_FullChainConfigured() + { + // arrange - Retry(2).ThenRedeliver().ThenDeadLetter() + // The executor only handles retry; redelivery and terminal are for other layers. + var rules = BuildRules(p => + p.On() + .Retry(2, TimeSpan.Zero, RetryBackoffType.Constant) + .ThenRedeliver() + .ThenDeadLetter() + ); + var counter = new Counter(); + + // act & assert + await Assert.ThrowsAsync(() => + RetryExecutor + .ExecuteAsync( + rules, + counter, + static (s) => + { + s.Increment(); + throw new InvalidOperationException("full chain"); + }, + onRetry: null, + cancellationToken: default) + .AsTask() + ); + + // 1 original + 2 retries = 3 + Assert.Equal(3, counter.Count); + } + + // -- Helpers -- + + private static ImmutableArray BuildRules(Action configure) + { + var feature = new ExceptionPolicyFeature(); + feature.Configure(configure); + return [.. 
feature.Rules]; + } + + private sealed class Counter + { + public int Count { get; private set; } + + public void Increment() => Count++; + } +} diff --git a/src/Mocha/test/Mocha.Tests/Middlewares/Receive/Redelivery/RedeliveryExecutorTests.cs b/src/Mocha/test/Mocha.Tests/Middlewares/Receive/Redelivery/RedeliveryExecutorTests.cs new file mode 100644 index 00000000000..cd5141b0560 --- /dev/null +++ b/src/Mocha/test/Mocha.Tests/Middlewares/Receive/Redelivery/RedeliveryExecutorTests.cs @@ -0,0 +1,501 @@ +using System.Collections.Immutable; + +namespace Mocha.Tests.Middlewares.Receive.Redelivery; + +public sealed class RedeliveryExecutorTests +{ + [Fact] + public void CalculateDelay_Should_ReturnCorrectInterval_When_ExplicitIntervalsProvided() + { + // arrange + var config = new RedeliveryPolicyConfig + { + Intervals = ImmutableArray.Create( + TimeSpan.FromMinutes(5), + TimeSpan.FromMinutes(15), + TimeSpan.FromMinutes(30)), + UseJitter = false + }; + + // act & assert + Assert.Equal(TimeSpan.FromMinutes(5), RedeliveryExecutor.CalculateDelay(0, config)); + Assert.Equal(TimeSpan.FromMinutes(15), RedeliveryExecutor.CalculateDelay(1, config)); + Assert.Equal(TimeSpan.FromMinutes(30), RedeliveryExecutor.CalculateDelay(2, config)); + } + + [Fact] + public void CalculateDelay_Should_ClampToLastInterval_When_AttemptExceedsIntervalCount() + { + // arrange + var config = new RedeliveryPolicyConfig + { + Intervals = ImmutableArray.Create(TimeSpan.FromMinutes(5), TimeSpan.FromMinutes(15)), + UseJitter = false + }; + + // act + var delay = RedeliveryExecutor.CalculateDelay(5, config); + + // assert + Assert.Equal(TimeSpan.FromMinutes(15), delay); + } + + [Theory] + [InlineData(0, 10)] // BaseDelay * (0 + 1) = 10min + [InlineData(1, 20)] // BaseDelay * (1 + 1) = 20min + [InlineData(2, 30)] // BaseDelay * (2 + 1) = 30min + public void CalculateDelay_Should_ReturnLinearDelay_When_NoIntervalsProvided(int attempt, int expectedMinutes) + { + // arrange + var config = new 
RedeliveryPolicyConfig + { + BaseDelay = TimeSpan.FromMinutes(10), + UseJitter = false, + MaxDelay = TimeSpan.FromHours(2) + }; + + // act + var delay = RedeliveryExecutor.CalculateDelay(attempt, config); + + // assert + Assert.Equal(TimeSpan.FromMinutes(expectedMinutes), delay); + } + + [Fact] + public void CalculateDelay_Should_UseDefaultBaseDelay_When_NoBaseDelayAndNoIntervals() + { + // arrange - no BaseDelay, no Intervals => uses RedeliveryPolicyDefaults.Intervals[0] (5 min) + var config = new RedeliveryPolicyConfig { UseJitter = false, MaxDelay = TimeSpan.FromHours(2) }; + + // act + var delay = RedeliveryExecutor.CalculateDelay(0, config); + + // assert - 5min * (0 + 1) = 5min + Assert.Equal(TimeSpan.FromMinutes(5), delay); + } + + [Fact] + public void CalculateDelay_Should_ScaleDefaultBaseDelay_When_AttemptIncreases() + { + // arrange + var config = new RedeliveryPolicyConfig { UseJitter = false, MaxDelay = TimeSpan.FromHours(2) }; + + // act & assert - 5min * (attempt + 1) + Assert.Equal(TimeSpan.FromMinutes(5), RedeliveryExecutor.CalculateDelay(0, config)); + Assert.Equal(TimeSpan.FromMinutes(10), RedeliveryExecutor.CalculateDelay(1, config)); + Assert.Equal(TimeSpan.FromMinutes(15), RedeliveryExecutor.CalculateDelay(2, config)); + } + + [Fact] + public void CalculateDelay_Should_CapAtMaxDelay_When_ComputedDelayExceedsMax() + { + // arrange + var config = new RedeliveryPolicyConfig + { + BaseDelay = TimeSpan.FromMinutes(30), + MaxDelay = TimeSpan.FromMinutes(45), + UseJitter = false + }; + + // act - 30min * (1 + 1) = 60min, capped at 45min + var delay = RedeliveryExecutor.CalculateDelay(1, config); + + // assert + Assert.Equal(TimeSpan.FromMinutes(45), delay); + } + + [Fact] + public void CalculateDelay_Should_UseDefaultMaxDelay_When_NoMaxDelayConfigured() + { + // arrange - no MaxDelay => uses RedeliveryPolicyDefaults.MaxDelay (1 hour) + var config = new RedeliveryPolicyConfig { BaseDelay = TimeSpan.FromMinutes(30), UseJitter = false }; + + // act - 
30min * (5 + 1) = 180min, capped at 60min (default) + var delay = RedeliveryExecutor.CalculateDelay(5, config); + + // assert + Assert.Equal(TimeSpan.FromHours(1), delay); + } + + [Fact] + public void CalculateDelay_Should_ReturnExactDelay_When_JitterDisabled() + { + // arrange + var config = new RedeliveryPolicyConfig + { + BaseDelay = TimeSpan.FromMinutes(10), + UseJitter = false, + MaxDelay = TimeSpan.FromHours(2) + }; + + // act - run multiple times to confirm determinism + var delay1 = RedeliveryExecutor.CalculateDelay(0, config); + var delay2 = RedeliveryExecutor.CalculateDelay(0, config); + var delay3 = RedeliveryExecutor.CalculateDelay(0, config); + + // assert + Assert.Equal(TimeSpan.FromMinutes(10), delay1); + Assert.Equal(delay1, delay2); + Assert.Equal(delay2, delay3); + } + + [Fact] + public void CalculateDelay_Should_ReturnDelayWithinJitterBounds_When_JitterEnabled() + { + // arrange + var config = new RedeliveryPolicyConfig + { + BaseDelay = TimeSpan.FromMinutes(10), + UseJitter = true, + MaxDelay = TimeSpan.FromHours(2) + }; + + var baseExpected = TimeSpan.FromMinutes(10); // 10min * (0 + 1) + var lowerBound = baseExpected.TotalMilliseconds * 0.75; + var upperBound = baseExpected.TotalMilliseconds * 1.25; + + // act - run multiple times to increase confidence + for (var i = 0; i < 100; i++) + { + var delay = RedeliveryExecutor.CalculateDelay(0, config); + + // assert + Assert.InRange(delay.TotalMilliseconds, lowerBound, upperBound); + } + } + + [Fact] + public void CalculateDelay_Should_ApplyJitterByDefault_When_UseJitterIsNull() + { + // arrange - UseJitter not set => defaults to RedeliveryPolicyDefaults.UseJitter (true) + var config = new RedeliveryPolicyConfig + { + BaseDelay = TimeSpan.FromMinutes(10), + MaxDelay = TimeSpan.FromHours(2) + }; + + var baseExpected = TimeSpan.FromMinutes(10); + var lowerBound = baseExpected.TotalMilliseconds * 0.75; + var upperBound = baseExpected.TotalMilliseconds * 1.25; + var hasVariation = false; + TimeSpan? 
firstDelay = null; + + // act - run multiple times; at least one should differ (proving jitter is active) + for (var i = 0; i < 100; i++) + { + var delay = RedeliveryExecutor.CalculateDelay(0, config); + + Assert.InRange(delay.TotalMilliseconds, lowerBound, upperBound); + + firstDelay ??= delay; + + if (delay != firstDelay) + { + hasVariation = true; + } + } + + // assert - with jitter, we expect variation across 100 runs + Assert.True(hasVariation, "Expected jitter to produce varying delays, but all 100 values were identical."); + } + + [Fact] + public void CalculateDelay_Should_CapExplicitIntervalAtMaxDelay_When_IntervalExceedsMax() + { + // arrange + var config = new RedeliveryPolicyConfig + { + Intervals = ImmutableArray.Create(TimeSpan.FromMinutes(5), TimeSpan.FromHours(2)), + MaxDelay = TimeSpan.FromHours(1), + UseJitter = false + }; + + // act + var delay = RedeliveryExecutor.CalculateDelay(1, config); + + // assert + Assert.Equal(TimeSpan.FromHours(1), delay); + } + + [Fact] + public void ParseDelayedRetryCount_Should_ReturnIntValue_When_HeaderValueIsInt() + { + // act + var result = RedeliveryExecutor.ParseDelayedRetryCount(42); + + // assert + Assert.Equal(42, result); + } + + [Fact] + public void ParseDelayedRetryCount_Should_ReturnConvertedValue_When_HeaderValueIsLong() + { + // act + var result = RedeliveryExecutor.ParseDelayedRetryCount(7L); + + // assert + Assert.Equal(7, result); + } + + [Fact] + public void ParseDelayedRetryCount_Should_ReturnConvertedValue_When_HeaderValueIsDouble() + { + // act + var result = RedeliveryExecutor.ParseDelayedRetryCount(3.0); + + // assert + Assert.Equal(3, result); + } + + [Fact] + public void ParseDelayedRetryCount_Should_ReturnZero_When_HeaderValueIsString() + { + // act + var result = RedeliveryExecutor.ParseDelayedRetryCount("not a number"); + + // assert + Assert.Equal(0, result); + } + + [Fact] + public void ParseDelayedRetryCount_Should_ReturnZero_When_HeaderValueIsNull() + { + // act + var result = 
RedeliveryExecutor.ParseDelayedRetryCount(null); + + // assert + Assert.Equal(0, result); + } + + [Fact] + public void Evaluate_Should_ReturnRethrow_When_NoRuleMatchesException() + { + // arrange - rules for ArgumentException, but we throw InvalidOperationException + var rules = BuildRules(p => p.On().Redeliver()); + + // act + var decision = RedeliveryExecutor.Evaluate(rules, new InvalidOperationException("no match"), 0); + + // assert + Assert.Equal(RedeliveryAction.Rethrow, decision.Action); + } + + [Fact] + public void Evaluate_Should_ReturnDiscard_When_TerminalIsDiscard() + { + // arrange + var rules = BuildRules(p => p.On().Discard()); + + // act + var decision = RedeliveryExecutor.Evaluate(rules, new InvalidOperationException("discard me"), 0); + + // assert + Assert.Equal(RedeliveryAction.Discard, decision.Action); + } + + [Fact] + public void Evaluate_Should_ReturnRethrow_When_TerminalIsDeadLetter() + { + // arrange + var rules = BuildRules(p => p.On().DeadLetter()); + + // act + var decision = RedeliveryExecutor.Evaluate(rules, new InvalidOperationException("dead letter"), 0); + + // assert + Assert.Equal(RedeliveryAction.Rethrow, decision.Action); + } + + [Fact] + public void Evaluate_Should_ReturnRethrow_When_RedeliveryIsDisabled() + { + // arrange - Retry() sets Redelivery.Enabled = false + var rules = BuildRules(p => p.On().Retry()); + + // act + var decision = RedeliveryExecutor.Evaluate(rules, new InvalidOperationException("no redelivery"), 0); + + // assert + Assert.Equal(RedeliveryAction.Rethrow, decision.Action); + } + + [Fact] + public void Evaluate_Should_ReturnRethrow_When_RedeliveryIsNull() + { + // arrange - manually build a rule with no redelivery config + var rules = ImmutableArray.Create( + new ExceptionPolicyRule + { + ExceptionType = typeof(Exception), + Predicate = null, + Redelivery = null + }); + + // act + var decision = RedeliveryExecutor.Evaluate(rules, new InvalidOperationException("no config"), 0); + + // assert + 
Assert.Equal(RedeliveryAction.Rethrow, decision.Action); + } + + [Fact] + public void Evaluate_Should_ReturnRedeliver_When_RedeliveryConfigured() + { + // arrange - Redeliver() uses defaults (3 intervals, jitter enabled) + var rules = BuildRules(p => p.On().Redeliver()); + + // act + var decision = RedeliveryExecutor.Evaluate(rules, new InvalidOperationException("redeliver me"), 0); + + // assert + Assert.Equal(RedeliveryAction.Redeliver, decision.Action); + Assert.True(decision.Delay > TimeSpan.Zero); + } + + [Fact] + public void Evaluate_Should_ReturnRethrow_When_AllRedeliveryAttemptsExhausted() + { + // arrange - 2 attempts configured, delayedRetryCount already at 2 + var rules = BuildRules(p => p.On().Redeliver(2, TimeSpan.FromMinutes(5))); + + // act + var decision = RedeliveryExecutor.Evaluate(rules, new InvalidOperationException("exhausted"), 2); + + // assert + Assert.Equal(RedeliveryAction.Rethrow, decision.Action); + } + + [Fact] + public void Evaluate_Should_ReturnRedeliver_When_AttemptsRemain() + { + // arrange - 3 attempts configured, delayedRetryCount at 1 + var rules = BuildRules(p => p.On().Redeliver(3, TimeSpan.FromMinutes(5))); + + // act + var decision = RedeliveryExecutor.Evaluate(rules, new InvalidOperationException("retry"), 1); + + // assert + Assert.Equal(RedeliveryAction.Redeliver, decision.Action); + Assert.True(decision.Delay > TimeSpan.Zero); + } + + [Fact] + public void Evaluate_Should_ReturnDiscard_When_MostSpecificRuleIsDiscard() + { + // arrange - base rule redelivers, but more specific rule discards + var rules = BuildRules(p => + { + p.On().Redeliver(); + p.On().Discard(); + }); + + // act + var decision = RedeliveryExecutor.Evaluate(rules, new InvalidOperationException("specific discard"), 0); + + // assert + Assert.Equal(RedeliveryAction.Discard, decision.Action); + } + + [Fact] + public void Evaluate_Should_ReturnRethrow_When_RulesListIsEmpty() + { + // arrange + var rules = BuildRules(_ => { }); + + // act + var decision = 
RedeliveryExecutor.Evaluate(rules, new InvalidOperationException("no rules"), 0); + + // assert + Assert.Equal(RedeliveryAction.Rethrow, decision.Action); + } + + [Fact] + public void Evaluate_Should_ReturnRethrow_When_PredicateDoesNotMatch() + { + // arrange + var rules = BuildRules(p => + p.On(static ex => ex.Message.Contains("transient")).Discard() + ); + + // act + var decision = RedeliveryExecutor.Evaluate(rules, new InvalidOperationException("permanent failure"), 0); + + // assert + Assert.Equal(RedeliveryAction.Rethrow, decision.Action); + } + + [Fact] + public void Evaluate_Should_ReturnDiscard_When_PredicateMatches() + { + // arrange + var rules = BuildRules(p => + p.On(static ex => ex.Message.Contains("transient")).Discard() + ); + + // act + var decision = RedeliveryExecutor.Evaluate(rules, new InvalidOperationException("transient failure"), 0); + + // assert + Assert.Equal(RedeliveryAction.Discard, decision.Action); + } + + [Fact] + public void Evaluate_Should_UseIntervalLengthAsMaxAttempts_When_AttemptsNotSet() + { + // arrange - 3 explicit intervals, no Attempts property + var rules = BuildRules(p => + p.On() + .Redeliver([TimeSpan.FromMinutes(5), TimeSpan.FromMinutes(15), TimeSpan.FromMinutes(30)]) + ); + + // act - attempt 2 (0-based), which is the 3rd interval → should still redeliver + var decision2 = RedeliveryExecutor.Evaluate(rules, new InvalidOperationException("retry"), 2); + + // act - attempt 3 (0-based), all 3 intervals exhausted → should rethrow + var decision3 = RedeliveryExecutor.Evaluate(rules, new InvalidOperationException("retry"), 3); + + // assert + Assert.Equal(RedeliveryAction.Redeliver, decision2.Action); + Assert.Equal(RedeliveryAction.Rethrow, decision3.Action); + } + + [Fact] + public void Evaluate_Should_ReturnRethrow_When_MaxAttemptsIsZero() + { + // arrange - rule with Redelivery having no Attempts and no Intervals → maxAttempts = 0 + var rules = ImmutableArray.Create( + new ExceptionPolicyRule + { + ExceptionType = 
typeof(Exception), + Predicate = null, + Redelivery = new RedeliveryPolicyConfig { Enabled = true, UseJitter = false } + }); + + // act + var decision = RedeliveryExecutor.Evaluate(rules, new InvalidOperationException("no attempts"), 0); + + // assert + Assert.Equal(RedeliveryAction.Rethrow, decision.Action); + } + + [Fact] + public void Evaluate_Should_ReturnRedeliver_When_DefaultRuleFallsThrough() + { + // arrange - Default() matches all exceptions, including derived types + var rules = BuildRules(p => p.Default().Redeliver()); + + // act - throw a derived exception + var decision = RedeliveryExecutor.Evaluate(rules, new InvalidOperationException("derived"), 0); + + // assert + Assert.Equal(RedeliveryAction.Redeliver, decision.Action); + Assert.True(decision.Delay > TimeSpan.Zero); + } + + private static ImmutableArray BuildRules(Action configure) + { + var feature = new ExceptionPolicyFeature(); + feature.Configure(configure); + return [.. feature.Rules]; + } +} diff --git a/src/Mocha/test/Mocha.Transport.InMemory.Tests/Behaviors/RedeliveryTests.cs b/src/Mocha/test/Mocha.Transport.InMemory.Tests/Behaviors/RedeliveryTests.cs new file mode 100644 index 00000000000..482d9f6bd4b --- /dev/null +++ b/src/Mocha/test/Mocha.Transport.InMemory.Tests/Behaviors/RedeliveryTests.cs @@ -0,0 +1,273 @@ +using System.Collections.Concurrent; +using Microsoft.Extensions.DependencyInjection; +using Mocha.Transport.InMemory.Tests.Helpers; + +namespace Mocha.Transport.InMemory.Tests.Behaviors; + +public sealed class RedeliveryTests +{ + private static readonly TimeSpan s_timeout = TimeSpan.FromSeconds(10); + + [Fact] + public async Task Redelivery_Should_ScheduleRedelivery_When_HandlerFails() + { + // arrange + var counter = new InvocationCounter(); + var recorder = new MessageRecorder(); + + await using var provider = await new ServiceCollection() + .AddSingleton(counter) + .AddSingleton(recorder) + .AddMessageBus() + .AddResilience(p => + { + p.On().Redeliver( + [ + 
TimeSpan.FromMilliseconds(1), + TimeSpan.FromMilliseconds(1), + TimeSpan.FromMilliseconds(1) + ]); + }) + .AddEventHandler() + .AddInMemory() + .BuildServiceProvider(); + + using var scope = provider.CreateScope(); + var bus = scope.ServiceProvider.GetRequiredService(); + + // act + await bus.PublishAsync(new OrderCreated { OrderId = "ORD-REDELIVER" }, CancellationToken.None); + + // assert - handler fails on first delivery, succeeds on redelivery + Assert.True( + await recorder.WaitAsync(s_timeout), + "Handler did not record the message after redelivery"); + + Assert.Equal(2, counter.Count); + } + + [Fact] + public async Task Redelivery_Should_SkipRedelivery_When_ExceptionIsIgnored() + { + // arrange + var counter = new InvocationCounter(); + + await using var provider = await new ServiceCollection() + .AddSingleton(counter) + .AddMessageBus() + .AddResilience(p => + { + p.On().Redeliver([TimeSpan.FromMilliseconds(1)]); + p.On().DeadLetter(); + }) + .AddEventHandler() + .AddInMemory() + .BuildServiceProvider(); + + using var scope = provider.CreateScope(); + var bus = scope.ServiceProvider.GetRequiredService(); + + // act + await bus.PublishAsync(new OrderCreated { OrderId = "ORD-IGNORED" }, CancellationToken.None); + + // assert - only 1 invocation, exception propagates without redelivery + await Task.Delay(500); + Assert.Equal(1, counter.Count); + } + + [Fact] + public async Task Redelivery_Should_PassThrough_When_Disabled() + { + // arrange — no exception policy configured, so retry and redelivery are both no-ops + var counter = new InvocationCounter(); + + await using var provider = await new ServiceCollection() + .AddSingleton(counter) + .AddMessageBus() + .AddEventHandler() + .AddInMemory() + .BuildServiceProvider(); + + using var scope = provider.CreateScope(); + var bus = scope.ServiceProvider.GetRequiredService(); + + // act + await bus.PublishAsync(new OrderCreated { OrderId = "ORD-DISABLED" }, CancellationToken.None); + + // assert - no exception 
policy: only 1 invocation, no retry or redelivery + await Task.Delay(500); + Assert.Equal(1, counter.Count); + } + + [Fact] + public async Task Redelivery_Should_PropagateToFault_When_AllAttemptsExhausted() + { + // arrange - 2 redelivery intervals = 2 redeliveries max, 3 total attempts + var counter = new InvocationCounter(); + + await using var provider = await new ServiceCollection() + .AddSingleton(counter) + .AddMessageBus() + .AddResilience(p => + { + p.On().Redeliver( + [ + TimeSpan.FromMilliseconds(1), + TimeSpan.FromMilliseconds(1) + ]); + }) + .AddEventHandler() + .AddInMemory() + .BuildServiceProvider(); + + using var scope = provider.CreateScope(); + var bus = scope.ServiceProvider.GetRequiredService(); + + // act + await bus.PublishAsync(new OrderCreated { OrderId = "ORD-EXHAUST" }, CancellationToken.None); + + // assert - 1 original + 2 redeliveries = 3 total invocations + Assert.True( + await counter.WaitForCountAsync(3, s_timeout), + $"Expected 3 invocations (1 original + 2 redeliveries), but got {counter.Count}"); + } + + [Fact] + public async Task Redelivery_Should_UseEndpointOverride_When_EndpointConfigured() + { + // arrange - bus-level: redeliver once, but the transport-level policy overrides it with DeadLetter + var counter = new InvocationCounter(); + var builder = new ServiceCollection() + .AddSingleton(counter) + .AddScoped() + .AddMessageBus() + .AddResilience(p => + p.On().Redeliver([TimeSpan.FromMilliseconds(1)])); + + // Register the handler, then override at transport level to dead-letter all exceptions. 
+ builder.ConfigureMessageBus(b => b.AddHandler()); + + await using var provider = await builder + .AddInMemory(t => t.AddResilience(p => + p.On().DeadLetter())) + .BuildServiceProvider(); + + using var scope = provider.CreateScope(); + var bus = scope.ServiceProvider.GetRequiredService(); + + // act + await bus.PublishAsync(new OrderCreated { OrderId = "ORD-OVERRIDE" }, CancellationToken.None); + + // assert - redelivery disabled at transport level via DeadLetter: only 1 invocation + await Task.Delay(500); + Assert.Equal(1, counter.Count); + } + + [Fact] + public async Task Redelivery_Should_UseDefaults_When_ParameterlessAddResilience() + { + // arrange - defaults: 3 redelivery intervals from RedeliveryPolicyDefaults + var counter = new InvocationCounter(); + + await using var provider = await new ServiceCollection() + .AddSingleton(counter) + .AddMessageBus() + .AddResilience() + .AddEventHandler() + .AddInMemory() + .BuildServiceProvider(); + + using var scope = provider.CreateScope(); + var bus = scope.ServiceProvider.GetRequiredService(); + + // act + await bus.PublishAsync(new OrderCreated { OrderId = "ORD-DEFAULT" }, CancellationToken.None); + + // assert - default is 3 retries + 3 redeliveries: 1 original + 3 retries = 4 on first delivery, + // then 3 redeliveries each with 1 original + 3 retries = 4, total = 4 + 3*4 = 16 + // Wait for at least the first 4 (initial delivery with retries) + Assert.True( + await counter.WaitForCountAsync(4, s_timeout), + $"Expected at least 4 invocations (1 original + 3 default retries), but got {counter.Count}"); + } + + // ============================================================ + // Test Helpers + // ============================================================ + + private sealed class InvocationCounter + { + private int _count; + private readonly SemaphoreSlim _semaphore = new(0); + + public int Count => _count; + + public void Increment() + { + Interlocked.Increment(ref _count); + _semaphore.Release(); + } + + public 
async Task WaitForCountAsync(int targetCount, TimeSpan timeout) + { + for (var i = 0; i < targetCount; i++) + { + if (!await _semaphore.WaitAsync(timeout)) + { + return false; + } + } + + return true; + } + } + + // ============================================================ + // Test Handlers + // ============================================================ + + /// + /// Throws on the first invocation, succeeds on subsequent invocations. + /// + private sealed class ThrowThenSucceedHandler(InvocationCounter counter, MessageRecorder recorder) + : IEventHandler + { + public ValueTask HandleAsync(OrderCreated message, CancellationToken cancellationToken) + { + var invocation = counter.Count; + counter.Increment(); + + if (invocation == 0) + { + throw new InvalidOperationException("Transient failure"); + } + + recorder.Record(message); + return default; + } + } + + /// + /// Always throws an InvalidOperationException. + /// + private sealed class AlwaysThrowingHandler(InvocationCounter counter) : IEventHandler + { + public ValueTask HandleAsync(OrderCreated message, CancellationToken cancellationToken) + { + counter.Increment(); + throw new InvalidOperationException("Always fails"); + } + } + + /// + /// Always throws an InvalidOperationException (for the Ignore test). 
+ /// + private sealed class ThrowInvalidOperationHandler(InvocationCounter counter) : IEventHandler + { + public ValueTask HandleAsync(OrderCreated message, CancellationToken cancellationToken) + { + counter.Increment(); + throw new InvalidOperationException("Should be ignored"); + } + } +} diff --git a/src/Mocha/test/Mocha.Transport.InMemory.Tests/Behaviors/RetryTests.cs b/src/Mocha/test/Mocha.Transport.InMemory.Tests/Behaviors/RetryTests.cs new file mode 100644 index 00000000000..369e6e71ceb --- /dev/null +++ b/src/Mocha/test/Mocha.Transport.InMemory.Tests/Behaviors/RetryTests.cs @@ -0,0 +1,423 @@ +using System.Collections.Concurrent; +using Microsoft.Extensions.DependencyInjection; +using Mocha.Transport.InMemory.Tests.Helpers; + +namespace Mocha.Transport.InMemory.Tests.Behaviors; + +public sealed class RetryTests +{ + private static readonly TimeSpan s_timeout = TimeSpan.FromSeconds(10); + + [Fact] + public async Task Retry_Should_RetryHandler_When_HandlerThrowsTransientException() + { + // arrange + var counter = new RetryInvocationCounter(); + var recorder = new MessageRecorder(); + await using var provider = await new ServiceCollection() + .AddSingleton(counter) + .AddSingleton(recorder) + .AddMessageBus() + .AddResilience(p => + { + p.On() + .Retry(3, TimeSpan.FromMilliseconds(1), RetryBackoffType.Constant); + }) + .AddEventHandler() + .AddInMemory() + .BuildServiceProvider(); + + using var scope = provider.CreateScope(); + var bus = scope.ServiceProvider.GetRequiredService(); + + // act + await bus.PublishAsync(new OrderCreated { OrderId = "ORD-1" }, CancellationToken.None); + + // assert - handler succeeds on 2nd attempt, so the message is recorded + Assert.True( + await recorder.WaitAsync(s_timeout), + "Handler did not record the message after retry"); + + Assert.Equal(2, counter.Count); + } + + [Fact] + public async Task Retry_Should_PropagateToFault_When_AllRetriesExhausted() + { + // arrange + var counter = new RetryInvocationCounter(); + await 
using var provider = await new ServiceCollection() + .AddSingleton(counter) + .AddMessageBus() + .AddResilience(p => + { + p.On() + .Retry(3, TimeSpan.FromMilliseconds(1), RetryBackoffType.Constant); + }) + .AddEventHandler() + .AddInMemory() + .BuildServiceProvider(); + + using var scope = provider.CreateScope(); + var bus = scope.ServiceProvider.GetRequiredService(); + + // act + await bus.PublishAsync(new OrderCreated { OrderId = "ORD-FAIL" }, CancellationToken.None); + + // assert - 1 original + 3 retries = 4 total invocations + Assert.True( + await counter.WaitForCountAsync(4, s_timeout), + $"Expected 4 invocations (1 original + 3 retries), but got {counter.Count}"); + } + + [Fact] + public async Task Retry_Should_SkipRetry_When_ExceptionIsIgnored() + { + // arrange + var counter = new RetryInvocationCounter(); + await using var provider = await new ServiceCollection() + .AddSingleton(counter) + .AddMessageBus() + .AddResilience(p => + { + p.On() + .Retry(3, TimeSpan.FromMilliseconds(1), RetryBackoffType.Constant); + p.On().DeadLetter(); + }) + .AddEventHandler() + .AddInMemory() + .BuildServiceProvider(); + + using var scope = provider.CreateScope(); + var bus = scope.ServiceProvider.GetRequiredService(); + + // act + await bus.PublishAsync(new OrderCreated { OrderId = "ORD-IGNORED" }, CancellationToken.None); + + // assert - only 1 invocation, exception propagates without retry + await Task.Delay(500); + Assert.Equal(1, counter.Count); + } + + [Fact] + public async Task Retry_Should_SkipRetry_When_PredicateMatchesIgnoredException() + { + // arrange + var matchingCounter = new RetryInvocationCounter(); + var nonMatchingCounter = new RetryInvocationCounter(); + + // Test 1: matching predicate (ParamName == "test") - should NOT retry + await using var matchingProvider = await new ServiceCollection() + .AddSingleton(matchingCounter) + .AddMessageBus() + .AddResilience(p => + { + p.On() + .Retry(3, TimeSpan.FromMilliseconds(1), RetryBackoffType.Constant); + 
p.On(ex => ex.ParamName == "test").DeadLetter(); + }) + .AddEventHandler() + .AddInMemory() + .BuildServiceProvider(); + + using var matchingScope = matchingProvider.CreateScope(); + var matchingBus = matchingScope.ServiceProvider.GetRequiredService(); + + await matchingBus.PublishAsync(new OrderCreated { OrderId = "ORD-MATCH" }, CancellationToken.None); + + // assert - matching predicate: no retry, only 1 invocation + await Task.Delay(500); + Assert.Equal(1, matchingCounter.Count); + + // Test 2: non-matching predicate (ParamName == "other") - SHOULD retry + await using var nonMatchingProvider = await new ServiceCollection() + .AddSingleton(nonMatchingCounter) + .AddMessageBus() + .AddResilience(p => + { + p.On() + .Retry(3, TimeSpan.FromMilliseconds(1), RetryBackoffType.Constant); + p.On(ex => ex.ParamName == "other").DeadLetter(); + }) + .AddEventHandler() + .AddInMemory() + .BuildServiceProvider(); + + using var nonMatchingScope = nonMatchingProvider.CreateScope(); + var nonMatchingBus = nonMatchingScope.ServiceProvider.GetRequiredService(); + + await nonMatchingBus.PublishAsync(new OrderCreated { OrderId = "ORD-NOMATCH" }, CancellationToken.None); + + // assert - non-matching predicate: should retry, 4 total invocations + Assert.True( + await nonMatchingCounter.WaitForCountAsync(4, s_timeout), + $"Expected 4 invocations for non-matching predicate, but got {nonMatchingCounter.Count}"); + } + + [Fact] + public async Task Retry_Should_ExposeRetryState_When_HandlerAccessesFeatures() + { + // arrange + var stateCapture = new RetryStateCapture(); + var builder = new ServiceCollection() + .AddSingleton(stateCapture) + .AddScoped() + .AddMessageBus() + .AddResilience(p => + { + p.On() + .Retry(2, TimeSpan.FromMilliseconds(1), RetryBackoffType.Constant); + }); + + builder.ConfigureMessageBus(b => b.AddHandler()); + + await using var provider = await builder.AddInMemory().BuildServiceProvider(); + + using var scope = provider.CreateScope(); + var bus = 
scope.ServiceProvider.GetRequiredService(); + + // act + await bus.PublishAsync(new OrderCreated { OrderId = "ORD-STATE" }, CancellationToken.None); + + // assert - 3 invocations (1 original + 2 retries), all fail + Assert.True( + await stateCapture.WaitForCountAsync(3, s_timeout), + $"Expected 3 invocations, but got {stateCapture.CapturedStates.Count}"); + + var states = stateCapture.CapturedStates.OrderBy(s => s).ToList(); + Assert.Equal(0, states[0]); // first attempt + Assert.Equal(1, states[1]); // first retry + Assert.Equal(2, states[2]); // second retry + } + + [Fact] + public async Task Retry_Should_UseExplicitIntervals_When_IntervalsConfigured() + { + // arrange + var counter = new RetryInvocationCounter(); + await using var provider = await new ServiceCollection() + .AddSingleton(counter) + .AddMessageBus() + .AddResilience(p => + { + p.On().Retry( + [ + TimeSpan.FromMilliseconds(10), + TimeSpan.FromMilliseconds(20), + TimeSpan.FromMilliseconds(30) + ]); + }) + .AddEventHandler() + .AddInMemory() + .BuildServiceProvider(); + + using var scope = provider.CreateScope(); + var bus = scope.ServiceProvider.GetRequiredService(); + + // act + await bus.PublishAsync(new OrderCreated { OrderId = "ORD-INTERVALS" }, CancellationToken.None); + + // assert - Intervals.Length = 3 retries, so 4 total invocations + Assert.True( + await counter.WaitForCountAsync(4, s_timeout), + $"Expected 4 invocations (1 original + 3 interval-based retries), but got {counter.Count}"); + } + + [Fact] + public async Task Retry_Should_RespectInheritance_When_BaseExceptionIgnored() + { + // arrange - ignore ArgumentException, handler throws ArgumentNullException (subclass) + var counter = new RetryInvocationCounter(); + await using var provider = await new ServiceCollection() + .AddSingleton(counter) + .AddMessageBus() + .AddResilience(p => + { + p.On() + .Retry(3, TimeSpan.FromMilliseconds(1), RetryBackoffType.Constant); + p.On().DeadLetter(); + }) + .AddEventHandler() + .AddInMemory() + 
.BuildServiceProvider(); + + using var scope = provider.CreateScope(); + var bus = scope.ServiceProvider.GetRequiredService(); + + // act + await bus.PublishAsync(new OrderCreated { OrderId = "ORD-INHERIT" }, CancellationToken.None); + + // assert - ArgumentNullException is a subclass of ArgumentException, so it's ignored: only 1 invocation + await Task.Delay(500); + Assert.Equal(1, counter.Count); + } + + [Fact] + public async Task Retry_Should_UseDefaults_When_ParameterlessAddResilience() + { + // arrange - default: 3 retries (from RetryPolicyDefaults.Attempts) + var counter = new RetryInvocationCounter(); + await using var provider = await new ServiceCollection() + .AddSingleton(counter) + .AddMessageBus() + .AddResilience() + .AddEventHandler() + .AddInMemory() + .BuildServiceProvider(); + + using var scope = provider.CreateScope(); + var bus = scope.ServiceProvider.GetRequiredService(); + + // act + await bus.PublishAsync(new OrderCreated { OrderId = "ORD-DEFAULT" }, CancellationToken.None); + + // assert - default is 3 retries: 1 original + 3 retries = 4 total + Assert.True( + await counter.WaitForCountAsync(4, s_timeout), + $"Expected 4 invocations (1 original + 3 default retries), but got {counter.Count}"); + } + + // ============================================================ + // Test Helpers + // ============================================================ + + private sealed class RetryInvocationCounter + { + private int _count; + private readonly SemaphoreSlim _semaphore = new(0); + + public int Count => _count; + + public void Increment() + { + Interlocked.Increment(ref _count); + _semaphore.Release(); + } + + public async Task WaitForCountAsync(int targetCount, TimeSpan timeout) + { + for (var i = 0; i < targetCount; i++) + { + if (!await _semaphore.WaitAsync(timeout)) + { + return false; + } + } + + return true; + } + } + + private sealed class RetryStateCapture + { + private readonly SemaphoreSlim _semaphore = new(0); + + public ConcurrentBag 
CapturedStates { get; } = []; + + public void Record(int immediateRetryCount) + { + CapturedStates.Add(immediateRetryCount); + _semaphore.Release(); + } + + public async Task WaitForCountAsync(int targetCount, TimeSpan timeout) + { + for (var i = 0; i < targetCount; i++) + { + if (!await _semaphore.WaitAsync(timeout)) + { + return false; + } + } + + return true; + } + } + + // ============================================================ + // Test Handlers + // ============================================================ + + /// + /// Throws on the first invocation, succeeds on subsequent invocations. + /// + private sealed class ThrowOnceHandler(RetryInvocationCounter counter, MessageRecorder recorder) + : IEventHandler + { + public ValueTask HandleAsync(OrderCreated message, CancellationToken cancellationToken) + { + var invocation = counter.Count; + counter.Increment(); + + if (invocation == 0) + { + throw new InvalidOperationException("Transient failure"); + } + + recorder.Record(message); + return default; + } + } + + /// + /// Always throws an InvalidOperationException. + /// + private sealed class AlwaysThrowingHandler(RetryInvocationCounter counter) : IEventHandler + { + public ValueTask HandleAsync(OrderCreated message, CancellationToken cancellationToken) + { + counter.Increment(); + throw new InvalidOperationException("Always fails"); + } + } + + /// + /// Always throws an InvalidOperationException (for the Ignore test). + /// + private sealed class ThrowInvalidOperationHandler(RetryInvocationCounter counter) : IEventHandler + { + public ValueTask HandleAsync(OrderCreated message, CancellationToken cancellationToken) + { + counter.Increment(); + throw new InvalidOperationException("Should be ignored"); + } + } + + /// + /// Always throws an ArgumentException with ParamName = "test". 
+ /// + private sealed class ThrowMatchingArgumentHandler(RetryInvocationCounter counter) : IEventHandler + { + public ValueTask HandleAsync(OrderCreated message, CancellationToken cancellationToken) + { + counter.Increment(); + throw new ArgumentException("Argument error", "test"); + } + } + + /// + /// Always throws an ArgumentNullException (subclass of ArgumentException). + /// + private sealed class ThrowArgumentNullHandler(RetryInvocationCounter counter) : IEventHandler + { + public ValueTask HandleAsync(OrderCreated message, CancellationToken cancellationToken) + { + counter.Increment(); + throw new ArgumentNullException("param", "Null argument"); + } + } + + /// + /// Consumer that captures RetryState from the context features on each invocation, + /// then always throws to force retries. + /// + private sealed class RetryStateCapturingConsumer(RetryStateCapture capture) : IConsumer + { + public ValueTask ConsumeAsync(IConsumeContext context) + { + var retryState = context.Features.Get(); + capture.Record(retryState?.ImmediateRetryCount ?? -1); + throw new InvalidOperationException("Fail to trigger retry"); + } + } +} diff --git a/website/src/docs/mocha/v1/exception-policies.md b/website/src/docs/mocha/v1/exception-policies.md new file mode 100644 index 00000000000..c5ce18f2fa6 --- /dev/null +++ b/website/src/docs/mocha/v1/exception-policies.md @@ -0,0 +1,433 @@ +--- +title: "Exception Policies" +description: "Configure per-exception handling with composable retry, redelivery, and terminal actions." +--- + +Not every exception deserves the same treatment. A database deadlock might resolve on immediate retry. A downstream service outage needs minutes to recover. A validation error will never succeed no matter how many times you retry. Exception policies let you define per-exception handling strategies - retry, redeliver, dead-letter, or discard - as composable escalation chains in a single `AddResilience` call. 
+ +```csharp +builder.Services + .AddMessageBus() + .AddResilience(policy => + { + // Validation errors are permanent - route straight to the error endpoint + policy.On().DeadLetter(); + + // Duplicate messages are safe to drop + policy.On().Discard(); + + // Database deadlocks resolve quickly - retry then redeliver + policy.On(ex => ex.IsTransient) + .Retry(5, TimeSpan.FromMilliseconds(200)) + .ThenRedeliver(); + + // Everything else: retry 3 times, then redeliver on a schedule, then dead-letter + policy.Default() + .Retry() + .ThenRedeliver() + .ThenDeadLetter(); + }) + .AddEventHandler() + .AddRabbitMQ(); +``` + +# How exception handling works + +When a handler throws an exception, Mocha evaluates exception policies to determine what happens next. The decision flows through two pipeline stages - retry in the consumer pipeline and redelivery in the receive pipeline - before reaching the fault middleware as a last resort. + +```mermaid +flowchart TD + A[Handler throws exception] --> B{Exception policy\nmatches?} + B -->|Match found| D{Terminal action?} + B -->|No match| M[Route to error endpoint] + + D -->|Discard| E[Swallow exception\nMessage disappears] + D -->|DeadLetter| F[Skip retry and redelivery\nRoute to error endpoint] + D -->|No terminal| G{Retry configured?} + + G -->|Yes| H[Retry with configured settings] + G -->|Disabled| K{Redelivery configured?} + + H -->|Success| I[Done] + H -->|All retries exhausted| K + + K -->|Yes| L[Schedule redelivery\nMessage re-enters pipeline later] + K -->|Disabled| M + + L -->|Success on redelivery| I + L -->|All redeliveries exhausted| M +``` + +Each exception policy rule targets a specific exception type and defines an escalation chain. The chain controls which stages the message passes through and with what settings. + +Exception matching respects inheritance. A policy on `NpgsqlException` also matches any subclass. When multiple rules could match, the most specific type wins - the same precedence as C# `catch` blocks. 
+ +# Configure exception policies + +`AddResilience` is the single entry point for all exception handling configuration. There is no separate `AddRetry` or `AddRedelivery` call - retry and redelivery settings are configured per-exception within the policy. + +:::note Replacement semantics +Calling `On<T>()` for the same exception type replaces the previous rule for that type - last write wins. If you call `On<T>()` twice with the same `T` and without a predicate, the second call overwrites the first. The same applies to `Default()`: calling it again replaces the previous default rule. For example, the parameterless `AddResilience()` registers `Default().Retry().ThenRedeliver()`. If you later call `AddResilience(p => p.Default().Retry(5))`, the new default replaces the one registered by the parameterless overload. +::: + +## Parameterless defaults + +The parameterless overload registers a catch-all `Default()` rule with both retry and redelivery enabled using built-in defaults: + +```csharp +builder.Services + .AddMessageBus() + .AddResilience() + .AddEventHandler() + .AddRabbitMQ(); +``` + +This is equivalent to: + +```csharp +.AddResilience(policy => +{ + policy.Default().Retry().ThenRedeliver(); +}) +``` + +## Default() and On&lt;T&gt;() + +`ExceptionPolicyOptions` exposes two methods for creating rules, `Default()` and `On<T>()`; `On<T>()` also has an overload that takes a predicate: + +- **`Default()`** - shorthand for `On<Exception>()`. Configures the catch-all behavior for any exception that does not match a more specific rule. +- **`On<T>()`** - configures behavior for a specific exception type. +- **`On<T>(predicate)`** - configures behavior for a specific exception type when a predicate matches. + +```csharp +.AddResilience(policy => +{ + policy.On().Retry(5).ThenRedeliver(); + policy.On().Retry(3); + policy.Default().Retry().ThenRedeliver(); +}) +``` + +## Bus-level policies + +Bus-level policies apply to all endpoints and all consumers across the entire message bus. 
+ +```csharp +builder.Services + .AddMessageBus() + .AddResilience(policy => + { + policy.On().DeadLetter(); + policy.On().Discard(); + policy.Default().Retry().ThenRedeliver(); + }) + .AddEventHandler() + .AddRabbitMQ(); +``` + +## Transport-level policies + +Override bus-level policies for a specific transport. Transport-level policies replace the bus-level policies entirely for all endpoints on that transport - they are not merged. + +```csharp +builder.Services + .AddMessageBus() + .AddResilience(policy => + { + policy.Default().Retry().ThenRedeliver(); + }) + .AddRabbitMQ(transport => + { + transport.AddResilience(policy => + { + policy.On().DeadLetter(); + }); + }); +``` + +## Consumer-level policies + +Override policies for a specific consumer. Consumer-level policies replace the bus-level and transport-level policies for that consumer. + +```csharp +builder.Services + .AddMessageBus() + .AddResilience(policy => + { + policy.On() + .Retry(2, TimeSpan.FromMilliseconds(100), RetryBackoffType.Constant); + }); + +builder.Services.ConfigureMessageBus(bus => +{ + bus.AddHandler(consumer => + { + consumer.AddResilience(policy => + { + policy.On() + .Retry(5, TimeSpan.FromMilliseconds(500), RetryBackoffType.Exponential); + }); + }); +}); +``` + +The `PaymentHandler` gets 5 retries with exponential backoff. All other consumers get the bus-level policy of 2 retries. + +## Scope hierarchy + +Exception policies resolve at four levels. The most specific scope wins, and replacement is atomic - the entire set of rules is replaced, not individual rules. 
+ +| Scope | Applies to | Configured on | +| --------- | ------------------------------------- | ---------------------------- | +| Bus | All endpoints and consumers | `IMessageBusBuilder` | +| Host | All message buses on the host | `IMessageBusHostBuilder` | +| Transport | All endpoints on a specific transport | `IReceiveMiddlewareProvider` | +| Consumer | A single consumer | `IConsumerDescriptor` | + +```text +Consumer policies → Transport policies → Bus policies → Host policies + (highest priority) (lowest priority) +``` + +If a consumer defines exception policies, the bus-level and transport-level policies are ignored for that consumer. If you need a bus-level rule to also apply at the consumer level, include it in the consumer-level configuration. + +# Terminal actions + +Terminal actions end the message's lifecycle immediately. No retry, no redelivery - the message is either routed to the error endpoint or discarded. + +## DeadLetter + +`DeadLetter()` routes the message to the error endpoint, skipping both retry and redelivery. Use this for exceptions that are permanent - retrying will never succeed and you want the message preserved for inspection. + +```csharp +policy.On().DeadLetter(); +policy.On().DeadLetter(); +policy.On().DeadLetter(); +``` + +**When to use:** Malformed messages, authorization failures, schema violations, business rule violations that require manual intervention. + +## Discard + +`Discard()` swallows the exception and the message disappears. No error endpoint, no fault headers, no trace beyond logging. + +```csharp +policy.On().Discard(); +policy.On().Discard(); +``` + +**When to use:** Messages that are safe to lose - duplicates you have already processed, stale events that no longer matter. Use with caution: discarded messages leave no audit trail in the error endpoint. + +# Retry policies + +Retry re-runs the handler in-process using immediate retries. 
The message stays in memory, the concurrency slot is held, and the handler is invoked again after a short delay. Use retry for transient failures that resolve in milliseconds to seconds. + +When you call `.Retry()` without chaining `.ThenRedeliver()`, redelivery is disabled for that exception type. If all retries are exhausted, the message routes to the error endpoint. + +## Retry with defaults + +```csharp +policy.On().Retry(); +``` + +| Setting | Default value | +| --------- | ------------- | +| Attempts | 3 | +| Delay | 200 ms | +| Backoff | Exponential | +| Jitter | Enabled | +| Max delay | 30 seconds | + +## Retry with custom attempts + +```csharp +policy.On(ex => ex.IsTransient).Retry(5); +``` + +Overrides the number of retry attempts. Delay, backoff strategy, jitter, and max delay use the built-in defaults. + +## Retry with full configuration + +```csharp +policy.On() + .Retry(3, TimeSpan.FromMilliseconds(500), RetryBackoffType.Exponential); +``` + +Overrides attempts, base delay, and backoff strategy for this exception type. + +## Retry with explicit intervals + +```csharp +policy.On().Retry( +[ + TimeSpan.FromMilliseconds(100), + TimeSpan.FromMilliseconds(500), + TimeSpan.FromSeconds(2) +]); +``` + +Specifies the exact delay before each retry attempt. The array length determines the number of retries. + +## Backoff strategies + +| Strategy | Behavior | +| ------------- | ----------------------------------- | +| `Constant` | Same delay every attempt | +| `Linear` | `delay * attempt` | +| `Exponential` | `delay * 2^(attempt-1)` _(default)_ | + +All strategies apply jitter by default to prevent thundering herd effects. + +# Redelivery policies + +Redelivery schedules the message for later delivery through the transport. The concurrency slot is released, the message re-enters the full receive pipeline on each redelivery attempt, and fresh retry cycles run on each delivery. 
Use redelivery for failures that need minutes or hours to resolve - a downstream service recovering from an outage, a rate limit resetting, or a database completing a failover. + +When you call `.Redeliver()` directly (without `.Retry()` first), retry is disabled for that exception type. The handler failure goes straight to redelivery scheduling. + +## Redeliver with defaults + +```csharp +policy.On().Redeliver(); +``` + +| Setting | Default value | +| --------- | --------------------- | +| Intervals | 5 min, 15 min, 30 min | +| Jitter | Enabled | +| Max delay | 1 hour | + +## Redeliver with custom attempts and delay + +```csharp +policy.On().Redeliver(5, TimeSpan.FromMinutes(2)); +``` + +Overrides the number of attempts and base delay. + +## Redeliver with explicit intervals + +```csharp +policy.On().Redeliver( +[ + TimeSpan.FromSeconds(30), + TimeSpan.FromMinutes(5), + TimeSpan.FromMinutes(30) +]); +``` + +Specifies the exact delay before each redelivery attempt. The array length determines the number of redelivery attempts. + +# Escalation chains + +The fluent API composes retry, redelivery, and terminal actions into escalation chains. The interface design enforces valid chains at compile time - you cannot chain `.ThenRedeliver()` after `.Redeliver()`, or `.Retry()` after `.ThenRedeliver()`. + +## Retry then redeliver + +```csharp +policy.On(ex => ex.IsTransient) + .Retry(3) + .ThenRedeliver(); +``` + +Try 3 immediate retries. If all fail, schedule redelivery with defaults (5, 15, 30 minutes). Each redelivery attempt runs a fresh cycle of 3 retries. + +## Retry then redeliver with custom settings + +```csharp +policy.On() + .Retry(5, TimeSpan.FromMilliseconds(500)) + .ThenRedeliver( + [ + TimeSpan.FromMinutes(5), + TimeSpan.FromMinutes(15), + TimeSpan.FromMinutes(30) + ]) + .ThenDeadLetter(); +``` + +Try 5 immediate retries with 500 ms exponential backoff. If exhausted, schedule redeliveries at 5, 15, and 30 minutes. 
If all redeliveries are exhausted, route to the error endpoint. + +## Retry then dead-letter + +```csharp +policy.On() + .Retry(3) + .ThenDeadLetter(); +``` + +Try 3 immediate retries. If all fail, skip redelivery and route to the error endpoint immediately. + +## Chain behavior reference + +| Chain | Retry | Redelivery | Terminal | +| -------------------------------------------- | ---------- | ---------- | ---------- | +| `.Discard()` | - | - | Discard | +| `.DeadLetter()` | - | - | DeadLetter | +| `.Retry()` | Enabled | Disabled | Default | +| `.Retry(3)` | 3 attempts | Disabled | Default | +| `.Redeliver()` | Disabled | Enabled | Default | +| `.Retry(3).ThenRedeliver()` | 3 attempts | Enabled | Default | +| `.Retry(3).ThenDeadLetter()` | 3 attempts | Disabled | DeadLetter | +| `.Retry(3).ThenRedeliver().ThenDeadLetter()` | 3 attempts | Enabled | DeadLetter | + +**Default terminal behavior:** When no terminal action is specified, exhausted messages route to the error endpoint through the fault middleware. `.ThenDeadLetter()` makes this intent explicit but does not change the behavior. + +**Disabled vs. default:** "Disabled" means that tier is skipped for this exception type. A dash (-) means the tier is not configured and does not apply. + +# Conditional policies + +Use predicate overloads to apply policies only when an exception matches specific conditions. The predicate receives the typed exception instance. 
+ +## Filter by exception property + +```csharp +// Only retry transient database errors +policy.On(ex => ex.IsTransient) + .Retry(5); + +// Dead-letter non-transient database errors +policy.On(ex => !ex.IsTransient) + .DeadLetter(); +``` + +## Filter by HTTP status code + +```csharp +policy.On(ex => + ex.StatusCode == System.Net.HttpStatusCode.TooManyRequests) + .Redeliver( + [ + TimeSpan.FromSeconds(30), + TimeSpan.FromMinutes(2), + TimeSpan.FromMinutes(10) + ]); + +policy.On(ex => + ex.StatusCode == System.Net.HttpStatusCode.BadRequest) + .DeadLetter(); +``` + +## Filter by inner exception + +```csharp +policy.On(ex => + ex.InnerException is TimeoutException) + .Retry(3); +``` + +## Multiple rules for the same type + +You can define multiple rules for the same exception type with different predicates. When an exception is thrown, the first matching rule wins. + +```csharp +policy.On(ex => + ex.StatusCode == System.Net.HttpStatusCode.ServiceUnavailable) + .Retry(3).ThenRedeliver(); + +policy.On(ex => + ex.StatusCode == System.Net.HttpStatusCode.BadRequest) + .DeadLetter(); + +// Catch-all for other HTTP errors +policy.On().Retry(3); +``` diff --git a/website/src/docs/mocha/v1/reliability.md b/website/src/docs/mocha/v1/reliability.md index 7bca2410f75..830902871e1 100644 --- a/website/src/docs/mocha/v1/reliability.md +++ b/website/src/docs/mocha/v1/reliability.md @@ -1,6 +1,6 @@ --- title: "Reliability" -description: "Configure fault handling, dead-letter routing, message expiry, concurrency limits, circuit breakers, the transactional outbox, and the idempotent inbox in Mocha to build resilient messaging pipelines." +description: "Configure exception policies, retry, delayed redelivery, fault handling, dead-letter routing, message expiry, concurrency limits, circuit breakers, the transactional outbox, and the idempotent inbox in Mocha to build resilient messaging pipelines." --- Messaging systems fail. 
Handlers throw exceptions, brokers go offline, databases lock up, and messages arrive faster than consumers can process them. Mocha's reliability features handle these failures at the infrastructure level so your handler code stays focused on business logic. @@ -8,6 +8,11 @@ Messaging systems fail. Handlers throw exceptions, brokers go offline, databases ```csharp builder.Services .AddMessageBus() + .AddResilience(policy => + { + policy.On().DeadLetter(); + policy.Default().Retry().ThenRedeliver(); + }) .AddCircuitBreaker(opts => { opts.FailureRatio = 0.5; @@ -24,7 +29,7 @@ builder.Services .AddRabbitMQ(); ``` -That configuration adds circuit breaking, concurrency limiting, transactional outbox, idempotent inbox, and database transaction wrapping - all as middleware in the receive and dispatch pipelines. +That configuration adds per-exception retry and redelivery policies, circuit breaking, concurrency limiting, transactional outbox, idempotent inbox, and database transaction wrapping - all as middleware in the receive and dispatch pipelines. # Delivery guarantees @@ -48,15 +53,17 @@ TransportCircuitBreaker -> Instrumentation -> DeadLetter -> Fault - -> CircuitBreaker - -> Expiry - -> MessageTypeSelection - -> Routing - -> Consumer pipeline - -> Transaction middleware (BEGIN) - -> Inbox (claim inside transaction) - -> Your handler - -> Transaction middleware (COMMIT/ROLLBACK) + -> Redelivery <- schedules for later if retries exhausted + -> CircuitBreaker + -> Expiry + -> MessageTypeSelection + -> Routing + -> Consumer pipeline + -> Retry <- immediate in-process retries + -> Transaction middleware (BEGIN) + -> Inbox (claim inside transaction) + -> Your handler + -> Transaction middleware (COMMIT/ROLLBACK) ``` Each middleware can intercept failures from downstream, transform them, or short-circuit the pipeline. The reliability middlewares - dead-letter, fault, circuit breaker, expiry, inbox, and concurrency limiter - are all enabled by default with sensible defaults. 
You tune them when the defaults do not match your workload. @@ -146,6 +153,420 @@ await bus.SendAsync( For time-sensitive commands, a short expiry prevents stale operations from executing after their validity window. +# Retry failed messages + +When a handler throws a transient exception - a database timeout, an HTTP 503, temporary lock contention - retrying the same operation a few hundred milliseconds later often succeeds. The retry middleware re-runs the handler in-process without releasing the concurrency slot or leaving the consumer pipeline. + +Retry lives in the **consumer pipeline**, not the receive pipeline. This matters for two reasons: + +1. **Only handler failures are retried.** Deserialization errors, unknown message types, and routing failures are almost always permanent. Retrying them wastes time. +2. **Multi-consumer correctness.** When one endpoint routes a message to multiple consumers, only the failing consumer retries. The others are unaffected. + +## Add retry with defaults + +```csharp +builder.Services + .AddMessageBus() + .AddResilience() + .AddEventHandler() + .AddRabbitMQ(); +``` + +The parameterless `AddResilience()` registers a catch-all `Default()` rule with both retry and redelivery enabled. Retry defaults to 3 attempts, 200 ms base delay, exponential backoff, jitter enabled, and a 30-second maximum delay. These defaults handle the majority of transient failures without tuning. + +## Customize retry behavior + +```csharp +builder.Services + .AddMessageBus() + .AddResilience(policy => + { + policy.Default() + .Retry(5, TimeSpan.FromSeconds(1), RetryBackoffType.Exponential) + .ThenRedeliver(); + }) + .AddEventHandler() + .AddRabbitMQ(); +``` + +## Use explicit retry intervals + +When you need precise control over each delay, pass an array of intervals. The array length determines the number of retries. 
+ +```csharp +builder.Services + .AddMessageBus() + .AddResilience(policy => + { + policy.Default().Retry( + [ + TimeSpan.FromMilliseconds(100), + TimeSpan.FromMilliseconds(500), + TimeSpan.FromSeconds(2) + ]).ThenRedeliver(); + }) + .AddEventHandler() + .AddRabbitMQ(); +``` + +## Handle different exceptions differently + +Not every exception should be retried. Validation errors, authorization failures, and other permanent errors waste retry budget. Use `AddResilience` to define per-exception handling strategies. + +```csharp +builder.Services + .AddMessageBus() + .AddResilience(policy => + { + // Never retry validation failures - route to error endpoint + policy.On().DeadLetter(); + + // Never retry non-transient database errors + policy.On<DbException>(ex => !ex.IsTransient).DeadLetter(); + + // Transient database errors get more retries + policy.On<DbException>(ex => ex.IsTransient) + .Retry(5, TimeSpan.FromMilliseconds(200)) + .ThenRedeliver(); + + // Everything else: retry then redeliver + policy.Default().Retry().ThenRedeliver(); + }) + .AddEventHandler() + .AddRabbitMQ(); +``` + +Exception matching respects inheritance. `On<DbException>()` also matches `NpgsqlException` and any other `DbException` subclass. When multiple rules match, the most specific type wins - the same precedence as C# `catch` blocks. + +For the full API including predicates, conditional policies, and escalation chains, see [Exception Policies](/docs/mocha/v1/exception-policies). + +## Override retry per consumer + +The bus-level exception policy applies to all consumers. Override it for specific consumers that need different behavior. See [Handlers and Consumers](/docs/mocha/v1/handlers-and-consumers) for the full consumer configuration API. 
+ +```csharp +builder.Services + .AddMessageBus() + .AddResilience() // bus-level: default retry + redelivery for all consumers + .AddEventHandler(consumer => + { + // This consumer: 10 retries with longer delay, then redeliver + consumer.AddResilience(policy => + { + policy.Default() + .Retry(10, TimeSpan.FromSeconds(2)) + .ThenRedeliver(); + }); + }) + .AddEventHandler(consumer => + { + // This consumer: skip retry, redeliver only + consumer.AddResilience(policy => + { + policy.Default().Redeliver(); + }); + }) + .AddRabbitMQ(); +``` + +Consumer-level policies replace the bus-level policies entirely for that consumer - they are not merged. If you need a bus-level rule to also apply at the consumer level, include it in the consumer-level configuration. + +## Retry defaults reference + +| Setting | Default value | +| --------- | ------------- | +| Attempts | 3 | +| Delay | 200 ms | +| Backoff | Exponential | +| Jitter | Enabled | +| Max delay | 30 seconds | + +### RetryBackoffType values + +| Value | Formula | Example (Delay = 200 ms) | +| ------------- | ------------------------- | ------------------------ | +| `Constant` | `Delay` | 200 ms, 200 ms, 200 ms | +| `Linear` | `Delay * attempt` | 200 ms, 400 ms, 600 ms | +| `Exponential` | `Delay * 2^(attempt - 1)` | 200 ms, 400 ms, 800 ms | + +# Redeliver failed messages + +Retry handles transient blips that resolve in milliseconds. Some failures take longer - a downstream service is deploying, a database is recovering from a failover, an external API is rate-limiting. For these, you need **redelivery**: reschedule the message for later delivery through the transport. + +Redelivery lives in the **receive pipeline**, not the consumer pipeline. This matters for two reasons: + +1. **Single decision point.** When an endpoint has multiple consumers, you want one redelivery decision per message, not one per consumer. +2. **Releases the concurrency slot.** Retry holds the slot while waiting. 
Redelivery returns the message to the transport and frees the slot for other messages. + +Redelivery uses Mocha's [scheduling infrastructure](/docs/mocha/v1/scheduling) to schedule the message for future delivery. The message re-enters the full receive pipeline when it arrives - fresh routing, fresh consumer invocations, fresh retry attempts. + +```text +Receive pipeline: + + Fault + -> Redelivery + -> CircuitBreaker + -> ... (rest of pipeline) + +All retries exhausted -> exception propagates to Redelivery + -> redelivery attempts remaining -> schedule for later delivery + -> redelivery exhausted -> exception propagates to Fault -> error endpoint +``` + +## Add redelivery with defaults + +Redelivery is included when you call `AddResilience()` without arguments or when you chain `.ThenRedeliver()` in an escalation chain. + +```csharp +builder.Services + .AddMessageBus() + .AddResilience() // Default() rule enables both retry and redelivery + .AddEventHandler() + .AddRabbitMQ(); +``` + +The default redelivery schedule is three attempts at 5 minutes, 15 minutes, and 30 minutes with jitter enabled and a 1-hour maximum delay. + +## Use custom redelivery intervals + +```csharp +builder.Services + .AddMessageBus() + .AddResilience(policy => + { + policy.Default().Retry().ThenRedeliver( + [ + TimeSpan.FromSeconds(30), + TimeSpan.FromMinutes(5), + TimeSpan.FromMinutes(30) + ]); + }) + .AddEventHandler() + .AddRabbitMQ(); +``` + +The array length determines the number of redelivery attempts. Each element is the delay before that attempt. 
+ +## Use calculated redelivery delays + +Instead of explicit intervals, configure a number of attempts and a base delay: + +```csharp +builder.Services + .AddMessageBus() + .AddResilience(policy => + { + policy.Default() + .Retry() + .ThenRedeliver(5, TimeSpan.FromMinutes(2)); + }) + .AddEventHandler() + .AddRabbitMQ(); +``` + +## Redeliver without retry + +To skip retry entirely and go straight to redelivery, use `.Redeliver()` directly: + +```csharp +builder.Services + .AddMessageBus() + .AddResilience(policy => + { + policy.Default().Redeliver(); + }) + .AddEventHandler() + .AddRabbitMQ(); +``` + +When `.Redeliver()` is called without `.Retry()` first, retry is disabled for that rule. The handler failure goes straight to redelivery scheduling. + +## Override redelivery per endpoint + +Override exception policies at the transport level for more granular control. Transport-level policies replace the bus-level policies entirely for all endpoints on that transport. + +```csharp +builder.Services + .AddMessageBus() + .AddResilience() // bus-level default + .AddRabbitMQ(transport => + { + transport.AddResilience(policy => + { + // Override for this transport: longer redelivery intervals + policy.Default().Retry().ThenRedeliver( + [ + TimeSpan.FromMinutes(10), + TimeSpan.FromMinutes(30), + TimeSpan.FromHours(1) + ]); + }); + }); +``` + +Disable redelivery for a specific transport by configuring retry-only: + +```csharp +transport.AddResilience(policy => +{ + policy.Default().Retry(); +}); +``` + +## Redelivery defaults reference + +| Setting | Default value | +| --------- | --------------------- | +| Intervals | 5 min, 15 min, 30 min | +| Jitter | Enabled | +| Max delay | 1 hour | + +# Combine retry and redelivery + +The two tiers compose naturally through escalation chains. When both are configured, a message goes through immediate retry first. 
If all retries are exhausted, the exception propagates up from the consumer pipeline to the receive pipeline, where redelivery catches it and schedules the message for later delivery. On the next delivery round, the full retry cycle runs again. + +```text +Message arrives + | + +- Consumer pipeline: Retry + | +- Attempt 1 -> handler throws -> retry + | +- Attempt 2 -> handler throws -> retry + | +- Attempt 3 -> handler throws -> retry + | +- Attempt 4 -> handler throws -> retries exhausted, exception propagates + | + +- Receive pipeline: Redelivery + | +- Schedule message for delivery in 5 minutes + | + +- ... 5 minutes later, message re-enters pipeline ... + | + +- Consumer pipeline: Retry + | +- Attempt 1 -> handler throws -> retry + | +- ... + | +- Attempt 4 -> retries exhausted, exception propagates + | + +- Receive pipeline: Redelivery + | +- Schedule message for delivery in 15 minutes + | + +- ... continues until redelivery attempts exhausted ... + | + +- Fault middleware -> error endpoint (dead letter) +``` + +The total number of handler invocations before a message reaches the error endpoint: + +```text +Total attempts = (retry attempts + 1) x (redelivery attempts + 1) +``` + +With the defaults (3 retries, 3 redeliveries): `(3 + 1) x (3 + 1) = 16` total handler invocations. + +```csharp +builder.Services + .AddMessageBus() + .AddResilience(policy => + { + policy.Default() + .Retry(5, TimeSpan.FromSeconds(1), RetryBackoffType.Exponential) + .ThenRedeliver( + [ + TimeSpan.FromMinutes(5), + TimeSpan.FromMinutes(15), + TimeSpan.FromMinutes(30) + ]); + }) + .AddEventHandler() + .AddRabbitMQ(); +``` + +With this configuration: `(5 + 1) x (3 + 1) = 24` total handler invocations before dead-letter. + +:::note Request/reply messages skip redelivery +Redelivery does not apply to request/reply messages. The caller is waiting synchronously for a response - scheduling the message for delivery minutes later would cause a timeout. 
When a request/reply handler fails after retry exhaustion, the exception propagates directly to the fault middleware, which sends a `NotAcknowledgedEvent` back to the caller. Immediate retry still applies to request/reply handlers. +::: + +# Inspect retry state in handlers + +Handlers can access the current retry state through the context features. This is useful for graceful degradation - trying the primary path on the first few attempts and falling back to an alternative once repeated retries have failed. + +```csharp +public class PaymentConsumer(ILogger logger) : IConsumer +{ + public async ValueTask ConsumeAsync(IConsumeContext context) + { + var state = context.Features.Get<RetryFeature>(); + + if (state is { ImmediateRetryCount: > 2 }) + { + logger.LogWarning( + "Primary gateway failed after {Retries} retries, using fallback", + state.ImmediateRetryCount); + await ProcessViaFallbackGatewayAsync(context.Message); + return; + } + + if (state is { DelayedRetryCount: > 0 }) + { + logger.LogWarning( + "Processing after {Redeliveries} redeliveries", + state.DelayedRetryCount); + } + + await ProcessPaymentAsync(context.Message); + } +} +``` + +`Get<RetryFeature>()` returns `null` when `AddResilience()` is not configured. When present, the feature exposes two properties: + +| Property | Type | Description | +| --------------------- | ----- | ------------------------------------------------------------------------------------------ | +| `ImmediateRetryCount` | `int` | Number of immediate retries so far in this delivery round. `0` on the initial attempt. | +| `DelayedRetryCount` | `int` | Number of redelivery rounds completed. Read from the `delayed-retry-count` message header. | + +## Retry and sagas + +[Sagas](/docs/mocha/v1/sagas) are consumers, so retry and redelivery apply automatically - no special configuration needed. Each saga handler invocation runs inside a saga transaction. If the handler throws, the transaction is never committed and all state changes are discarded. 
On the next retry attempt, the saga state is loaded fresh from the store. + +This means retry is safe for sagas by default. You do not need to worry about partial state mutations leaking between retry attempts. + +Redelivery is also safe. When a redelivered message arrives, a new transaction starts and the saga state is loaded exactly as the last successful commit left it. + +# Troubleshoot retry and redelivery + +## "My handler runs more times than expected" + +Check both retry and redelivery. With both configured, the total handler invocations are `(retry attempts + 1) x (redelivery attempts + 1)`. Three retries with three redeliveries means 16 total invocations, not 6. Use the [formula above](#combine-retry-and-redelivery) to calculate the exact count. + +## "Retry does not work for my consumer" + +`AddResilience()` must be called at the bus level or on the specific consumer to register the retry middleware in the consumer pipeline. If neither call is present, the middleware is not in the pipeline and handler exceptions are never retried. + +## "Redelivery fails at startup" + +Redelivery uses Mocha's [scheduling infrastructure](/docs/mocha/v1/scheduling). If your transport does not support scheduling or the scheduling store is not configured, redelivery cannot schedule messages for later delivery. Check that your transport is configured with scheduling support. + +## "Validation exceptions are being retried" + +Use `AddResilience` to route permanent failures directly to the error endpoint: + +```csharp +builder.Services + .AddMessageBus() + .AddResilience(policy => + { + policy.On().DeadLetter(); + policy.Default().Retry().ThenRedeliver(); + }) + .AddEventHandler() + .AddRabbitMQ(); +``` + +See [Exception Policies](/docs/mocha/v1/exception-policies) for the full per-exception configuration API, including predicates, terminal actions, and escalation chains. 
+ +## "Request/reply messages are not redelivered" + +This is by design. Redelivery skips request/reply messages because the caller is waiting for a synchronous response. Immediate retry still applies. See the [note above](#combine-retry-and-redelivery) for details. + # Limit concurrency The concurrency limiter middleware restricts how many messages a receive endpoint processes in parallel. @@ -551,8 +972,11 @@ The inbox cleanup worker is a background hosted service (`IHostedService`). It r # Next steps -Your messaging pipeline now handles failures, limits concurrency, breaks circuits on sustained errors, guarantees delivery through the outbox, and deduplicates messages through the inbox. To monitor your messaging system, see [Observability](/docs/mocha/v1/observability). +Your messaging pipeline now handles exceptions per-type with retry and redelivery policies, limits concurrency, breaks circuits on repeated failures, guarantees delivery through the outbox, and deduplicates messages through the inbox. To monitor your messaging system, see [Observability](/docs/mocha/v1/observability). +- [**Exception Policies**](/docs/mocha/v1/exception-policies) - Configure per-exception handling with composable retry, redelivery, and terminal actions. +- [**Handlers and Consumers**](/docs/mocha/v1/handlers-and-consumers) - Configure per-consumer exception policy overrides and understand handler exception behavior. +- [**Scheduling**](/docs/mocha/v1/scheduling) - Configure the scheduling infrastructure that redelivery uses for delayed message delivery. - [**Middleware and Pipelines**](/docs/mocha/v1/middleware-and-pipelines) - Write custom middleware, control pipeline ordering, and understand the three pipeline stages. - [**Sagas**](/docs/mocha/v1/sagas) - Coordinate multi-step workflows with state machine sagas that use compensation when steps fail. 
- [**Observability**](/docs/mocha/v1/observability) - Trace message flows across services and monitor pipeline health with OpenTelemetry. diff --git a/website/src/docs/mocha/v1/scheduling.md b/website/src/docs/mocha/v1/scheduling.md index 040d68c0062..06806c6aba1 100644 --- a/website/src/docs/mocha/v1/scheduling.md +++ b/website/src/docs/mocha/v1/scheduling.md @@ -300,66 +300,6 @@ For automatic saga timeouts that cancel themselves on completion, see [Timeouts] See [Sagas](/docs/mocha/v1/sagas) for the full saga configuration guide. -# API reference - -## Scheduling methods on `IMessageBus` - -| Method | Parameters | Returns | Description | -| ----------------------------- | ----------------------------------------------------------------------------------------- | ----------------------------- | ------------------------------------------------------------------------------------------------------------------------- | -| `SchedulePublishAsync` | `T message, DateTimeOffset scheduledTime, CancellationToken ct` | `ValueTask` | Publishes a message for delivery at an absolute time. | -| `SchedulePublishAsync` | `T message, DateTimeOffset scheduledTime, PublishOptions options, CancellationToken ct` | `ValueTask` | Publishes a message with scheduling and publish options. | -| `ScheduleSendAsync` | `object message, DateTimeOffset scheduledTime, CancellationToken ct` | `ValueTask` | Sends a message for delivery at an absolute time. | -| `ScheduleSendAsync` | `object message, DateTimeOffset scheduledTime, SendOptions options, CancellationToken ct` | `ValueTask` | Sends a message with scheduling and send options. | -| `CancelScheduledMessageAsync` | `string token, CancellationToken ct` | `ValueTask` | Cancels a scheduled message. Returns `true` if cancelled, `false` if already dispatched, already cancelled, or not found. 
| - -## `SchedulingResult` - -| Property | Type | Description | -| --------------- | ---------------- | ----------------------------------------------------------------------------------------- | -| `Token` | `string?` | An opaque token for cancelling this message, or `null` if cancellation is not supported. | -| `ScheduledTime` | `DateTimeOffset` | The time at which the message is scheduled for delivery. | -| `IsCancellable` | `bool` | `true` when the scheduling infrastructure supports cancellation and a token was assigned. | - -## Scheduling properties on options - -| Struct | Property | Type | Default | Description | -| ---------------- | --------------- | ----------------- | ------- | --------------------------------------------------------- | -| `PublishOptions` | `ScheduledTime` | `DateTimeOffset?` | `null` | Scheduled delivery time. `null` means immediate delivery. | -| `SendOptions` | `ScheduledTime` | `DateTimeOffset?` | `null` | Scheduled delivery time. `null` means immediate delivery. | - -`PublishOptions` and `SendOptions` have additional properties for expiration, headers, and other dispatch behavior. `ScheduledTime` can be combined with any of them. - -## Saga extensions - -| Method | Available on | Parameters | Description | -| ------------------ | ------------------------------------------------------- | ------------------------------------------------ | ------------------------------------------- | -| `ScheduledPublish` | `ISagaTransitionDescriptor`, `ISagaLifeCycleDescriptor` | `TimeSpan delay, Func factory` | Publishes a message with a scheduled delay. | -| `ScheduledSend` | `ISagaTransitionDescriptor`, `ISagaLifeCycleDescriptor` | `TimeSpan delay, Func factory` | Sends a message with a scheduled delay. | - -## `ScheduledMessage` entity columns - -| Column | Type | Description | -| ---------------- | ----------- | --------------------------------------------------------------------- | -| `id` | `uuid` | Primary key. 
| -| `envelope` | `json` | Serialized message envelope with headers and payload. | -| `scheduled_time` | `timestamp` | UTC time when the message becomes eligible for dispatch. | -| `times_sent` | `integer` | Number of dispatch attempts. | -| `max_attempts` | `integer` | Maximum dispatch attempts before the message is dropped. Default: 10. | -| `last_error` | `jsonb` | Last dispatch error (exception type, message, stack trace). | -| `created_at` | `timestamp` | UTC time when the scheduled message was created. | - -## EF Core model builder - -| Method | Description | -| --------------------------------------------- | ---------------------------------------------------------------------------------------- | -| `modelBuilder.AddPostgresScheduledMessages()` | Applies the `ScheduledMessage` entity configuration with default table and column names. | - -## Scheduling service registration - -| Method | Description | -| ------------------------- | ------------------------------------------------------------------------------------------------------- | -| `UsePostgresScheduling()` | Registers the Postgres scheduling pipeline: message store, background worker, and EF Core interceptors. | - # Troubleshooting **Scheduled messages are not being delivered.**