Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 4 additions & 4 deletions PLAN.md
Original file line number Diff line number Diff line change
Expand Up @@ -101,9 +101,9 @@ For batch failures (via `ISenderCallback`), the exception needs to be passed thr

Add `IWolverineRuntime` to the `SendingAgent` constructor (needed for `SendingEnvelopeLifecycle`).

### Step 7: Create `LatchSenderContinuation`
### Step 7: Create `PauseSendingContinuation`

**New file**: `src/Wolverine/ErrorHandling/LatchSenderContinuation.cs`
**New file**: `src/Wolverine/ErrorHandling/PauseSendingContinuation.cs`

Create a continuation that latches (pauses) the sender, similar to `PauseListenerContinuation`:
- `ExecuteAsync()` calls `agent.LatchAndDrainAsync()` on the sending agent
Expand All @@ -112,7 +112,7 @@ Create a continuation that latches (pauses) the sender, similar to `PauseListene
### Step 8: Add sending-specific actions to the fluent interface

Extend the sending failure fluent API to include:
- **`LatchSender()`** / **`AndLatchSender()`** — standalone and composable latch action
- **`PauseSending()`** / **`AndPauseSending()`** — standalone and composable latch action
- The existing actions (`Discard()`, `MoveToErrorQueue()`, `RetryNow()`, `ScheduleRetry()`, `CustomAction()`) work as-is through `IFailureActions`/`PolicyExpression` reuse

### Step 9: Integrate into `InlineSendingAgent`
Expand Down Expand Up @@ -156,7 +156,7 @@ When building sending agents, resolve the combined `SendingFailurePolicies` (glo
|------|---------|
| `src/Wolverine/ErrorHandling/SendingFailurePolicies.cs` | Policy collection + `IWithFailurePolicies` for sending |
| `src/Wolverine/Transports/Sending/SendingEnvelopeLifecycle.cs` | `IEnvelopeLifecycle` adapter for outgoing envelopes |
| `src/Wolverine/ErrorHandling/LatchSenderContinuation.cs` | Continuation to pause/latch a sender |
| `src/Wolverine/ErrorHandling/PauseSendingContinuation.cs` | Continuation to pause/latch a sender |
| `src/Wolverine/Transports/MessageTooLargeException.cs` | Exception for oversized messages |

## Files to Modify
Expand Down
3 changes: 2 additions & 1 deletion docs/.vitepress/config.mts
Original file line number Diff line number Diff line change
Expand Up @@ -203,7 +203,8 @@ const config: UserConfig<DefaultTheme.Config> = {
{text: 'Endpoint Specific Operations', link: '/guide/messaging/endpoint-operations'},
{text: 'Broadcast to a Specific Topic', link: '/guide/messaging/broadcast-to-topic'},
{text: 'Message Expiration', link: '/guide/messaging/expiration'},
{text: 'Endpoint Policies', link: '/guide/messaging/policies'}
{text: 'Endpoint Policies', link: '/guide/messaging/policies'},
{text: 'Sending Error Handling', link: '/guide/messaging/sending-error-handling'}
]
},
{
Expand Down
155 changes: 155 additions & 0 deletions docs/guide/messaging/sending-error-handling.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,155 @@
# Sending Error Handling

Wolverine's existing [error handling](/guide/handlers/error-handling) policies apply to failures that happen while *processing* incoming messages in handlers. But what about failures that happen while *sending* outgoing messages to external transports?

When an outgoing message fails to send — maybe the message broker is temporarily unavailable, a message is too large for the transport, or a network error occurs — Wolverine's default behavior is to retry and eventually trip a circuit breaker on the sending endpoint. Starting in Wolverine 5.x, you can now configure **sending failure policies** to take fine-grained action on these outgoing send failures, using the same fluent API you already know from handler error handling.

## Why Use Sending Failure Policies?

Without sending failure policies, all send failures follow the same path: retry a few times, then trip the circuit breaker, which pauses all sending on that endpoint. This is often fine, but sometimes you need more control:

* **Oversized messages**: If a message is too large for the transport's batch size, retrying will never succeed. You want to discard or dead-letter it immediately.
* **Permanent failures**: Some exceptions indicate the message can never be delivered (e.g., invalid routing, serialization issues). Retrying wastes resources.
* **Custom notification**: You may want to publish a compensating event when a send fails.
* **Selective pausing**: You may want to pause sending only for certain exception types, then automatically resume after a cooldown period.

## Configuring Global Sending Failure Policies

Use `WolverineOptions.SendingFailure` to configure policies that apply to all outgoing endpoints:

```csharp
using var host = await Host.CreateDefaultBuilder()
.UseWolverine(opts =>
{
// Discard messages that are too large for any transport batch
opts.SendingFailure
.OnException<MessageTooLargeException>()
.Discard();

// Retry sending up to 3 times, then move to dead letter storage
opts.SendingFailure
.OnException<TimeoutException>()
.RetryTimes(3).Then.MoveToErrorQueue();

// Schedule retries with exponential backoff
opts.SendingFailure
.OnException<IOException>()
.ScheduleRetry(1.Seconds(), 5.Seconds(), 30.Seconds());
}).StartAsync();
```

::: tip
If no sending failure policy matches the exception, Wolverine falls through to the existing retry and circuit breaker behavior. Your existing applications are completely unaffected unless you explicitly configure sending failure policies.
:::

## Per-Endpoint Sending Failure Policies

You can also configure sending failure policies on a per-endpoint basis using the `ConfigureSending()` method on any subscriber configuration. Per-endpoint rules take priority over global rules:

```csharp
using var host = await Host.CreateDefaultBuilder()
.UseWolverine(opts =>
{
// Global default: retry 3 times then dead letter
opts.SendingFailure
.OnException<Exception>()
.RetryTimes(3).Then.MoveToErrorQueue();

// Override for a specific endpoint: just discard on any failure
opts.PublishAllMessages().ToRabbitQueue("low-priority")
.ConfigureSending(sending =>
{
sending.OnException<Exception>().Discard();
});
}).StartAsync();
```

## Available Actions

Sending failure policies support the same actions as handler error handling:

| Action | Description |
|----------------------|-------------------------------------------------------------------------------------------------|
| Retry | Immediately retry the send inline |
| Retry with Cooldown | Wait a short time, then retry inline |
| Schedule Retry | Schedule the message to be retried at a certain time |
| Discard | Log and discard the message without further send attempts |
| Move to Error Queue | Move the message to dead letter storage |
| Pause Sending | Pause the sending agent for a duration, then automatically resume |
| Custom Action | Execute arbitrary logic, including publishing compensating messages |

## Oversized Message Detection

Wolverine can detect messages that are too large to ever fit in a transport batch. When a message fails to be added to an *empty* batch (meaning even a single message exceeds the maximum batch size), Wolverine throws a `MessageTooLargeException`. You can handle this with a sending failure policy:

```csharp
using var host = await Host.CreateDefaultBuilder()
.UseWolverine(opts =>
{
// Immediately discard messages that are too large for the broker
opts.SendingFailure
.OnException<MessageTooLargeException>()
.Discard();
}).StartAsync();
```

This is currently supported for the Azure Service Bus transport, and will be extended to other transports over time.

## Pausing the Sender

Similar to pausing a listener, you can pause the sending agent when a certain failure condition is detected. Unlike a permanent latch, pausing automatically resumes sending after the specified duration:

```csharp
using var host = await Host.CreateDefaultBuilder()
.UseWolverine(opts =>
{
// On a catastrophic broker failure, pause sending for 30 seconds
opts.SendingFailure
.OnException<BrokerUnreachableException>()
.PauseSending(30.Seconds());

// Or combine with another action: dead letter the message, then pause
opts.SendingFailure
.OnException<BrokerUnreachableException>()
.MoveToErrorQueue().AndPauseSending(1.Minutes());
}).StartAsync();
```

When paused, the sending agent drains any in-flight messages and stops accepting new ones. After the specified duration elapses, Wolverine automatically attempts to resume the sender. If the resume attempt fails (e.g., the broker is still unreachable), Wolverine falls back to its built-in circuit watcher which will keep retrying periodically.

## Custom Actions

You can define custom logic to execute when a send failure occurs. This is useful for publishing compensating events or logging to external systems:

```csharp
using var host = await Host.CreateDefaultBuilder()
.UseWolverine(opts =>
{
opts.SendingFailure
.OnException<Exception>()
.CustomAction(async (runtime, lifecycle, ex) =>
{
// Publish a notification about the send failure
await lifecycle.PublishAsync(new SendingFailed(
lifecycle.Envelope!.Id,
ex.Message
));
}, "Notify on send failure");
}).StartAsync();
```

## Send Attempts Tracking

Wolverine tracks sending attempts separately from handler processing attempts through the `Envelope.SendAttempts` property. This counter is incremented each time a sending failure policy is evaluated, and is used internally by the failure rule infrastructure to determine which action slot to execute (e.g., retry twice, then move to error queue on the third failure).

## How It Works

Sending failure policies are evaluated *before* the existing circuit breaker logic in the sending agent. The evaluation flow is:

1. An outgoing message fails to send, producing an exception
2. `Envelope.SendAttempts` is incremented
3. Sending failure policies are evaluated against the exception and envelope
4. If a matching policy is found, its continuation is executed (discard, dead letter, retry, custom action, etc.)
5. If no policy matches, the existing retry/circuit breaker behavior proceeds as before

This means sending failure policies are purely additive — they only change behavior when explicitly configured and when a rule matches.
184 changes: 184 additions & 0 deletions src/Testing/CoreTests/ErrorHandling/SendingFailurePoliciesTests.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,184 @@
using JasperFx.Core;
using Shouldly;
using Wolverine.ComplianceTests;
using Wolverine.ErrorHandling;
using Xunit;

namespace CoreTests.ErrorHandling;

public class SendingFailurePoliciesTests
{
private readonly SendingFailurePolicies _policies = new();

[Fact]
public void has_no_rules_by_default()
{
_policies.HasAnyRules.ShouldBeFalse();
}

[Fact]
public void has_rules_after_configuration()
{
_policies.OnException<DivideByZeroException>().Discard();
_policies.HasAnyRules.ShouldBeTrue();
}

[Fact]
public void returns_null_when_no_rules_match()
{
_policies.OnException<DivideByZeroException>().Discard();

var envelope = ObjectMother.Envelope();
envelope.SendAttempts = 1;

var continuation = _policies.DetermineAction(new InvalidOperationException(), envelope);
continuation.ShouldBeNull();
}

[Fact]
public void returns_continuation_when_rule_matches()
{
_policies.OnException<DivideByZeroException>().Discard();

var envelope = ObjectMother.Envelope();
envelope.SendAttempts = 1;

var continuation = _policies.DetermineAction(new DivideByZeroException(), envelope);
continuation.ShouldNotBeNull();
continuation.ShouldBeOfType<DiscardEnvelope>();
}

[Fact]
public void uses_send_attempts_not_handler_attempts()
{
// Configure: retry once, then discard
_policies.OnException<DivideByZeroException>()
.RetryOnce().Then.Discard();

var envelope = ObjectMother.Envelope();

// First send attempt → should get retry
envelope.SendAttempts = 1;
envelope.Attempts = 42; // handler attempts should be irrelevant
var continuation = _policies.DetermineAction(new DivideByZeroException(), envelope);
continuation.ShouldBeOfType<RetryInlineContinuation>();

// Handler attempts should be restored after evaluation
envelope.Attempts.ShouldBe(42);

// Second send attempt → should get discard
envelope.SendAttempts = 2;
continuation = _policies.DetermineAction(new DivideByZeroException(), envelope);
continuation.ShouldBeOfType<DiscardEnvelope>();

// Handler attempts should still be restored
envelope.Attempts.ShouldBe(42);
}

[Fact]
public void combine_with_parent_policies()
{
// Local policy handles DivideByZeroException
_policies.OnException<DivideByZeroException>().Discard();

// Parent policy handles InvalidOperationException
var parent = new SendingFailurePolicies();
parent.OnException<InvalidOperationException>().MoveToErrorQueue();

var combined = _policies.CombineWith(parent);

var envelope = ObjectMother.Envelope();
envelope.SendAttempts = 1;

// Local rule should match
combined.DetermineAction(new DivideByZeroException(), envelope)
.ShouldBeOfType<DiscardEnvelope>();

// Parent rule should match
combined.DetermineAction(new InvalidOperationException(), envelope)
.ShouldNotBeNull();

// Neither should match
combined.DetermineAction(new BadImageFormatException(), envelope)
.ShouldBeNull();
}

[Fact]
public void local_policies_take_priority_over_parent()
{
// Local: discard on Exception
_policies.OnException<Exception>().Discard();

// Parent: move to error queue on Exception
var parent = new SendingFailurePolicies();
parent.OnException<Exception>().MoveToErrorQueue();

var combined = _policies.CombineWith(parent);

var envelope = ObjectMother.Envelope();
envelope.SendAttempts = 1;

// Local discard should win over parent's MoveToErrorQueue
combined.DetermineAction(new InvalidOperationException(), envelope)
.ShouldBeOfType<DiscardEnvelope>();
}

[Fact]
public void pause_sending_action_via_fluent_api()
{
_policies.OnException<DivideByZeroException>()
.PauseSending(30.Seconds());

var envelope = ObjectMother.Envelope();
envelope.SendAttempts = 1;

var continuation = _policies.DetermineAction(new DivideByZeroException(), envelope);
continuation.ShouldNotBeNull();
continuation.ShouldBeOfType<PauseSendingContinuation>()
.PauseTime.ShouldBe(30.Seconds());
}

[Fact]
public void and_pause_sending_as_additional_action()
{
_policies.OnException<DivideByZeroException>()
.MoveToErrorQueue().AndPauseSending(1.Minutes());

var envelope = ObjectMother.Envelope();
envelope.SendAttempts = 1;

var continuation = _policies.DetermineAction(new DivideByZeroException(), envelope);
continuation.ShouldNotBeNull();

// Should be a composite continuation containing both actions
var composite = continuation.ShouldBeOfType<CompositeContinuation>();
composite.Inner.OfType<PauseSendingContinuation>().ShouldHaveSingleItem()
.PauseTime.ShouldBe(1.Minutes());
}

[Fact]
public void schedule_retry_then_pause_sending()
{
_policies.OnException<IOException>()
.ScheduleRetry(5.Seconds(), 30.Seconds()).Then
.PauseSending(2.Minutes());

var envelope = ObjectMother.Envelope();

// First attempt → schedule retry at 5s
envelope.SendAttempts = 1;
var continuation = _policies.DetermineAction(new IOException(), envelope);
continuation.ShouldBeOfType<ScheduledRetryContinuation>();

// Second attempt → schedule retry at 30s
envelope.SendAttempts = 2;
continuation = _policies.DetermineAction(new IOException(), envelope);
continuation.ShouldBeOfType<ScheduledRetryContinuation>();

// Third attempt → pause sending
envelope.SendAttempts = 3;
continuation = _policies.DetermineAction(new IOException(), envelope);
continuation.ShouldBeOfType<PauseSendingContinuation>()
.PauseTime.ShouldBe(2.Minutes());
}
}
4 changes: 2 additions & 2 deletions src/Testing/CoreTests/Runtime/WolverineTracingTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ public void should_set_the_tenant_id()
public void should_set_the_otel_conversation_id_to_correlation_id()
{
theActivity.GetTagItem(WolverineTracing.MessagingConversationId)
.ShouldBe(theEnvelope.ConversationId);
.ShouldBe(theEnvelope.CorrelationId);
}

[Fact]
Expand Down Expand Up @@ -103,6 +103,6 @@ public void should_set_the_payload_size_bytes_when_it_exists()
public void trace_the_conversation_id()
{
theActivity.GetTagItem(WolverineTracing.MessagingConversationId)
.ShouldBe(theEnvelope.ConversationId);
.ShouldBe(theEnvelope.CorrelationId);
}
}
Loading
Loading