Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 13 additions & 0 deletions src/Mocha/examples/ExceptionPolicies/ExceptionPolicies.csproj
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
<Project Sdk="Microsoft.NET.Sdk">
<PropertyGroup>
<OutputType>Exe</OutputType>
<TargetFramework>net10.0</TargetFramework>
<ImplicitUsings>enable</ImplicitUsings>
</PropertyGroup>
<ItemGroup>
<ProjectReference Include="..\..\src\Mocha\Mocha.csproj" />
<ProjectReference Include="..\..\src\Mocha.Abstractions\Mocha.Abstractions.csproj" />
<ProjectReference Include="..\..\src\Mocha.Transport.InMemory\Mocha.Transport.InMemory.csproj" />
<ProjectReference Include="..\..\src\Mocha.Hosting\Mocha.Hosting.csproj" />
</ItemGroup>
</Project>
44 changes: 44 additions & 0 deletions src/Mocha/examples/ExceptionPolicies/Exceptions/Exceptions.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
namespace ExceptionPolicies.Exceptions;

/// <summary>
/// Transient database failure — worth retrying because it usually resolves quickly.
/// </summary>
public class TransientDatabaseException(string message) : Exception(message);

/// <summary>
/// The message payload is malformed — retrying will never help.
/// </summary>
public class MessageValidationException(string message) : Exception(message);

/// <summary>
/// The message was already processed — expected in at-least-once delivery.
/// </summary>
public class DuplicateMessageException(string message) : Exception(message);

/// <summary>
/// Payment gateway returned an error — flaky but usually recovers.
/// </summary>
public class PaymentGatewayException(string message) : Exception(message);

/// <summary>
/// Auth token expired — immediate retry is pointless, need to wait for refresh.
/// </summary>
public class AuthTokenExpiredException(string message) : Exception(message);

/// <summary>
/// External service is completely unavailable — needs time to recover.
/// </summary>
public class ExternalServiceUnavailableException(string message) : Exception(message);

/// <summary>
/// HTTP-level failure with a status code for conditional policy matching.
/// </summary>
public class HttpServiceException(string message, int statusCode) : Exception(message)
{
public int StatusCode { get; } = statusCode;
}

/// <summary>
/// Corrupt or unparseable message payload — a poison message.
/// </summary>
public class PoisonMessageException(string message) : Exception(message);
148 changes: 148 additions & 0 deletions src/Mocha/examples/ExceptionPolicies/Handlers/Handlers.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,148 @@
using ExceptionPolicies.Exceptions;
using ExceptionPolicies.Messages;
using Mocha;

namespace ExceptionPolicies.Handlers;

/// <summary>
/// Simulates a flaky payment gateway that succeeds after 3 failures.
/// Policy: Retry 5x with exponential backoff.
/// </summary>
public class ProcessPaymentHandler : IEventHandler<ProcessPayment>
{
private static int _attempts;

public ValueTask HandleAsync(ProcessPayment message, CancellationToken cancellationToken)
{
var attempt = Interlocked.Increment(ref _attempts);
Console.WriteLine($"[Payment] Attempt {attempt} for order {message.OrderId}");

if (attempt <= 3)
{
throw new PaymentGatewayException("Gateway timeout");
}

Console.WriteLine($"[Payment] Successfully processed order {message.OrderId}");
return ValueTask.CompletedTask;
}
}

/// <summary>
/// Receives a message with an invalid payload.
/// Policy: DeadLetter immediately — the message is permanently bad.
/// </summary>
public class ValidateOrderHandler : IEventHandler<ValidateOrder>
{
public ValueTask HandleAsync(ValidateOrder message, CancellationToken cancellationToken)
{
Console.WriteLine($"[Validate] Validating order {message.OrderId}");
throw new MessageValidationException($"Order {message.OrderId} has invalid schema");
}
}

/// <summary>
/// Detects a duplicate message that was already processed.
/// Policy: Discard silently — no retry, no dead-letter.
/// </summary>
public class DeduplicateMessageHandler : IEventHandler<DeduplicateMessage>
{
public ValueTask HandleAsync(DeduplicateMessage message, CancellationToken cancellationToken)
{
Console.WriteLine($"[Dedup] Message {message.MessageId} already processed");
throw new DuplicateMessageException($"Message {message.MessageId} is a duplicate");
}
}

/// <summary>
/// Calls an external API that is completely down.
/// Policy: Retry 5x aggressively, then redeliver with increasing intervals, then dead-letter.
/// </summary>
public class CallExternalApiHandler : IEventHandler<CallExternalApi>
{
private static int _attempts;

public ValueTask HandleAsync(CallExternalApi message, CancellationToken cancellationToken)
{
var attempt = Interlocked.Increment(ref _attempts);
Console.WriteLine($"[ExternalApi] Attempt {attempt} for {message.Url}");
throw new ExternalServiceUnavailableException($"Service at {message.Url} is down");
}
}

/// <summary>
/// Service with an expired auth token.
/// Policy: Redeliver only (skip retry) — immediate retry won't help.
/// </summary>
public class RefreshAuthTokenHandler : IEventHandler<RefreshAuthToken>
{
private static int _attempts;

public ValueTask HandleAsync(RefreshAuthToken message, CancellationToken cancellationToken)
{
var attempt = Interlocked.Increment(ref _attempts);
Console.WriteLine($"[Auth] Attempt {attempt} for service {message.Service}");
throw new AuthTokenExpiredException($"Token for {message.Service} expired");
}
}

/// <summary>
/// Transient database failure during batch processing.
/// Policy: Retry 3x quickly, then escalate to redelivery.
/// </summary>
public class ProcessBatchHandler : IEventHandler<ProcessBatch>
{
private static int _attempts;

public ValueTask HandleAsync(ProcessBatch message, CancellationToken cancellationToken)
{
var attempt = Interlocked.Increment(ref _attempts);
Console.WriteLine($"[Batch] Attempt {attempt} for batch {message.BatchId}");

if (attempt <= 4)
{
throw new TransientDatabaseException("Connection pool exhausted");
}

Console.WriteLine($"[Batch] Successfully processed batch {message.BatchId}");
return ValueTask.CompletedTask;
}
}

/// <summary>
/// Ingests telemetry data from a device, encountering various HTTP errors.
/// Policy: Conditional — different behavior for 404, 429, and 503 status codes.
/// </summary>
public class IngestTelemetryHandler : IEventHandler<IngestTelemetry>
{
private static int _attempts;

public ValueTask HandleAsync(IngestTelemetry message, CancellationToken cancellationToken)
{
var attempt = Interlocked.Increment(ref _attempts);
Console.WriteLine($"[Telemetry] Attempt {attempt} for device {message.DeviceId}");

// Rotate through different HTTP errors to demonstrate conditional policies
throw (attempt % 3) switch
{
0 => new HttpServiceException("Not Found", 404),
1 => new HttpServiceException("Too Many Requests", 429),
_ => new HttpServiceException("Service Unavailable", 503)
};
}
}

/// <summary>
/// Handles a corrupt/unparseable message.
/// Policy: Retry once (in case of transient parse issue), then dead-letter immediately.
/// </summary>
public class HandlePoisonMessageHandler : IEventHandler<HandlePoisonMessage>
{
private static int _attempts;

public ValueTask HandleAsync(HandlePoisonMessage message, CancellationToken cancellationToken)
{
var attempt = Interlocked.Increment(ref _attempts);
Console.WriteLine($"[Poison] Attempt {attempt} for data: {message.Data}");
throw new PoisonMessageException("Payload cannot be deserialized");
}
}
17 changes: 17 additions & 0 deletions src/Mocha/examples/ExceptionPolicies/Messages/Messages.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
namespace ExceptionPolicies.Messages;

public record ProcessPayment(string OrderId, decimal Amount);

public record ValidateOrder(string OrderId);

public record DeduplicateMessage(string MessageId);

public record CallExternalApi(string Url);

public record RefreshAuthToken(string Service);

public record ProcessBatch(string BatchId);

public record IngestTelemetry(string DeviceId);

public record HandlePoisonMessage(string Data);
125 changes: 125 additions & 0 deletions src/Mocha/examples/ExceptionPolicies/Program.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,125 @@
using ExceptionPolicies.Exceptions;
using ExceptionPolicies.Handlers;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Hosting;
using Mocha;
using Mocha.Transport.InMemory;

// ---------------------------------------------------------------------------
// Exception Policies Demo
//
// Demonstrates all per-exception policy configurations available in Mocha.
// Uses the InMemory transport for simplicity — no external dependencies.
// ---------------------------------------------------------------------------

var builder = Host.CreateApplicationBuilder(args);

builder.Services
.AddMessageBus()

// -----------------------------------------------------------------------
// Exception Policies — the main showcase
//
// Per-exception rules are configured in a single AddExceptionPolicy call.
// The On<Exception>() catch-all provides global retry/redelivery defaults.
// -----------------------------------------------------------------------
.AddExceptionPolicy(policy =>
{
// --- Terminal: DeadLetter ---
// Validation errors are permanent — the message payload is bad.
// Skip retry and redelivery entirely; route straight to the error endpoint.
policy.On<MessageValidationException>().DeadLetter();

// --- Terminal: Discard ---
// Duplicate messages are expected in at-least-once delivery systems.
// Silently drop them — no retry, no redelivery, no error endpoint.
policy.On<DuplicateMessageException>().Discard();

// --- Retry only (skip redelivery) ---
// Payment gateway is flaky but usually recovers within a few attempts.
// Retry 5 times with exponential backoff, then dead-letter on exhaustion.
policy.On<PaymentGatewayException>()
.Retry(
attempts: 5,
delay: TimeSpan.FromMilliseconds(200),
backoff: RetryBackoffType.Exponential);

// --- Redeliver only (skip retry) ---
// Auth token expired — immediate retry is pointless because the token
// won't refresh in milliseconds. Wait for redelivery instead.
policy.On<AuthTokenExpiredException>().Redeliver();

// --- Escalation: Retry then Redeliver ---
// Transient DB errors — try a few times quickly (connection hiccup),
// then back off with redelivery if the database is truly struggling.
policy.On<TransientDatabaseException>()
.Retry(attempts: 3)
.ThenRedeliver();

// --- Escalation: Retry then DeadLetter (skip redelivery) ---
// Poison messages — try once in case it was a transient parse glitch,
// then give up immediately. Redelivery won't fix a corrupt payload.
policy.On<PoisonMessageException>()
.Retry(attempts: 1)
.ThenDeadLetter();

// --- Full chain: Retry -> Redeliver -> DeadLetter ---
// External service completely down — aggressive retry first, then
// patient redelivery with increasing intervals, then dead-letter
// as the last resort so operators can investigate.
policy.On<ExternalServiceUnavailableException>()
.Retry(attempts: 5, delay: TimeSpan.FromMilliseconds(500))
.ThenRedeliver(
[
TimeSpan.FromSeconds(10),
TimeSpan.FromSeconds(30),
TimeSpan.FromSeconds(60)
])
.ThenDeadLetter();

// --- Conditional: Different policies for the same exception type ---
// HTTP 404 = resource is gone permanently, dead-letter it.
policy.On<HttpServiceException>(ex => ex.StatusCode == 404)
.DeadLetter();

// HTTP 429 = rate limited, back off with redelivery.
policy.On<HttpServiceException>(ex => ex.StatusCode == 429)
.Redeliver(
[
TimeSpan.FromSeconds(5),
TimeSpan.FromSeconds(15),
TimeSpan.FromSeconds(30)
]);

// HTTP 503 = service unavailable, retry quickly then redeliver.
policy.On<HttpServiceException>(ex => ex.StatusCode == 503)
.Retry(attempts: 3)
.ThenRedeliver();

// --- Catch-all: Default for unmatched exceptions ---
// Most-specific-type-wins means this only fires for exceptions
// not matched by any of the rules above.
policy.On<Exception>()
.Retry(attempts: 2)
.ThenRedeliver();
})

// -----------------------------------------------------------------------
// Register handlers
// -----------------------------------------------------------------------
.AddEventHandler<ProcessPaymentHandler>()
.AddEventHandler<ValidateOrderHandler>()
.AddEventHandler<DeduplicateMessageHandler>()
.AddEventHandler<CallExternalApiHandler>()
.AddEventHandler<RefreshAuthTokenHandler>()
.AddEventHandler<ProcessBatchHandler>()
.AddEventHandler<IngestTelemetryHandler>()
.AddEventHandler<HandlePoisonMessageHandler>()

// -----------------------------------------------------------------------
// Transport — InMemory for this demo (no external dependencies)
// -----------------------------------------------------------------------
.AddInMemory();

var app = builder.Build();
await app.RunAsync();
1 change: 1 addition & 0 deletions src/Mocha/src/Demo/Demo.Billing/Program.cs
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
builder
.Services.AddMessageBus()
.AddInstrumentation()
.AddResilience()
.AddBilling()
.AddBatchHandler<OrderPlacedBatchHandler>(opts =>
{
Expand Down
1 change: 1 addition & 0 deletions src/Mocha/src/Demo/Demo.Catalog/Program.cs
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
builder
.Services.AddMessageBus()
.AddInstrumentation()
.AddResilience()
.AddCatalog()
.AddEntityFramework<CatalogDbContext>(p =>
{
Expand Down
1 change: 1 addition & 0 deletions src/Mocha/src/Demo/Demo.Shipping/Program.cs
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
builder
.Services.AddMessageBus()
.AddInstrumentation()
.AddResilience()
.AddShipping()
.AddEntityFramework<ShippingDbContext>(p =>
{
Expand Down
2 changes: 2 additions & 0 deletions src/Mocha/src/Mocha/Builder/MessageBusBuilderExtensions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -30,13 +30,15 @@ public static IMessageBusBuilder ConfigureFeature<TFeature>(

internal static void AddDefaults(this MessageBusBuilder builder)
{
builder.UseConsume(ConsumerMiddlewares.Retry, before: "Instrumentation");
builder.UseConsume(ConsumerMiddlewares.Instrumentation);

builder.UseReceive(ReceiveMiddlewares.TransportCircuitBreaker);
builder.UseReceive(ReceiveMiddlewares.ConcurrencyLimiter);
builder.UseReceive(ReceiveMiddlewares.Instrumentation);
builder.UseReceive(ReceiveMiddlewares.DeadLetter);
builder.UseReceive(ReceiveMiddlewares.Fault);
builder.UseReceive(ReceiveMiddlewares.Redelivery, after: "Fault");
builder.UseReceive(ReceiveMiddlewares.CircuitBreaker);
builder.UseReceive(ReceiveMiddlewares.Expiry);
builder.UseReceive(ReceiveMiddlewares.MessageTypeSelection);
Expand Down
Loading
Loading