diff --git a/PLAN.md b/PLAN.md index 60fbdc7a1..a34c32013 100644 --- a/PLAN.md +++ b/PLAN.md @@ -101,9 +101,9 @@ For batch failures (via `ISenderCallback`), the exception needs to be passed thr Add `IWolverineRuntime` to the `SendingAgent` constructor (needed for `SendingEnvelopeLifecycle`). -### Step 7: Create `LatchSenderContinuation` +### Step 7: Create `PauseSendingContinuation` -**New file**: `src/Wolverine/ErrorHandling/LatchSenderContinuation.cs` +**New file**: `src/Wolverine/ErrorHandling/PauseSendingContinuation.cs` Create a continuation that latches (pauses) the sender, similar to `PauseListenerContinuation`: - `ExecuteAsync()` calls `agent.LatchAndDrainAsync()` on the sending agent @@ -112,7 +112,7 @@ Create a continuation that latches (pauses) the sender, similar to `PauseListene ### Step 8: Add sending-specific actions to the fluent interface Extend the sending failure fluent API to include: -- **`LatchSender()`** / **`AndLatchSender()`** — standalone and composable latch action +- **`PauseSending()`** / **`AndPauseSending()`** — standalone and composable latch action - The existing actions (`Discard()`, `MoveToErrorQueue()`, `RetryNow()`, `ScheduleRetry()`, `CustomAction()`) work as-is through `IFailureActions`/`PolicyExpression` reuse ### Step 9: Integrate into `InlineSendingAgent` @@ -156,7 +156,7 @@ When building sending agents, resolve the combined `SendingFailurePolicies` (glo |------|---------| | `src/Wolverine/ErrorHandling/SendingFailurePolicies.cs` | Policy collection + `IWithFailurePolicies` for sending | | `src/Wolverine/Transports/Sending/SendingEnvelopeLifecycle.cs` | `IEnvelopeLifecycle` adapter for outgoing envelopes | -| `src/Wolverine/ErrorHandling/LatchSenderContinuation.cs` | Continuation to pause/latch a sender | +| `src/Wolverine/ErrorHandling/PauseSendingContinuation.cs` | Continuation to pause/latch a sender | | `src/Wolverine/Transports/MessageTooLargeException.cs` | Exception for oversized messages | ## Files to Modify diff --git a/docs/.vitepress/config.mts b/docs/.vitepress/config.mts index bc45bee58..7c4d447b7 100644 --- a/docs/.vitepress/config.mts +++ b/docs/.vitepress/config.mts @@ -203,7 +203,8 @@ const config: UserConfig = { {text: 'Endpoint Specific Operations', link: '/guide/messaging/endpoint-operations'}, {text: 'Broadcast to a Specific Topic', link: '/guide/messaging/broadcast-to-topic'}, {text: 'Message Expiration', link: '/guide/messaging/expiration'}, - {text: 'Endpoint Policies', link: '/guide/messaging/policies'} + {text: 'Endpoint Policies', link: '/guide/messaging/policies'}, + {text: 'Sending Error Handling', link: '/guide/messaging/sending-error-handling'} ] }, { diff --git a/docs/guide/messaging/sending-error-handling.md b/docs/guide/messaging/sending-error-handling.md new file mode 100644 index 000000000..84c5a01f8 --- /dev/null +++ b/docs/guide/messaging/sending-error-handling.md @@ -0,0 +1,155 @@ +# Sending Error Handling + +Wolverine's existing [error handling](/guide/handlers/error-handling) policies apply to failures that happen while *processing* incoming messages in handlers. But what about failures that happen while *sending* outgoing messages to external transports? + +When an outgoing message fails to send — maybe the message broker is temporarily unavailable, a message is too large for the transport, or a network error occurs — Wolverine's default behavior is to retry and eventually trip a circuit breaker on the sending endpoint. Starting in Wolverine 5.x, you can now configure **sending failure policies** to take fine-grained action on these outgoing send failures, using the same fluent API you already know from handler error handling. + +## Why Use Sending Failure Policies? + +Without sending failure policies, all send failures follow the same path: retry a few times, then trip the circuit breaker, which pauses all sending on that endpoint. This is often fine, but sometimes you need more control: + +* **Oversized messages**: If a message is too large for the transport's batch size, retrying will never succeed. You want to discard or dead-letter it immediately. +* **Permanent failures**: Some exceptions indicate the message can never be delivered (e.g., invalid routing, serialization issues). Retrying wastes resources. +* **Custom notification**: You may want to publish a compensating event when a send fails. +* **Selective pausing**: You may want to pause sending only for certain exception types, then automatically resume after a cooldown period. + +## Configuring Global Sending Failure Policies + +Use `WolverineOptions.SendingFailure` to configure policies that apply to all outgoing endpoints: + +```csharp +using var host = await Host.CreateDefaultBuilder() + .UseWolverine(opts => + { + // Discard messages that are too large for any transport batch + opts.SendingFailure + .OnException() + .Discard(); + + // Retry sending up to 3 times, then move to dead letter storage + opts.SendingFailure + .OnException() + .RetryTimes(3).Then.MoveToErrorQueue(); + + // Schedule retries with exponential backoff + opts.SendingFailure + .OnException() + .ScheduleRetry(1.Seconds(), 5.Seconds(), 30.Seconds()); + }).StartAsync(); +``` + +::: tip +If no sending failure policy matches the exception, Wolverine falls through to the existing retry and circuit breaker behavior. Your existing applications are completely unaffected unless you explicitly configure sending failure policies. +::: + +## Per-Endpoint Sending Failure Policies + +You can also configure sending failure policies on a per-endpoint basis using the `ConfigureSending()` method on any subscriber configuration. Per-endpoint rules take priority over global rules: + +```csharp +using var host = await Host.CreateDefaultBuilder() + .UseWolverine(opts => + { + // Global default: retry 3 times then dead letter + opts.SendingFailure + .OnException() + .RetryTimes(3).Then.MoveToErrorQueue(); + + // Override for a specific endpoint: just discard on any failure + opts.PublishAllMessages().ToRabbitQueue("low-priority") + .ConfigureSending(sending => + { + sending.OnException().Discard(); + }); + }).StartAsync(); +``` + +## Available Actions + +Sending failure policies support the same actions as handler error handling: + +| Action | Description | +|----------------------|-------------------------------------------------------------------------------------------------| +| Retry | Immediately retry the send inline | +| Retry with Cooldown | Wait a short time, then retry inline | +| Schedule Retry | Schedule the message to be retried at a certain time | +| Discard | Log and discard the message without further send attempts | +| Move to Error Queue | Move the message to dead letter storage | +| Pause Sending | Pause the sending agent for a duration, then automatically resume | +| Custom Action | Execute arbitrary logic, including publishing compensating messages | + +## Oversized Message Detection + +Wolverine can detect messages that are too large to ever fit in a transport batch. When a message fails to be added to an *empty* batch (meaning even a single message exceeds the maximum batch size), Wolverine throws a `MessageTooLargeException`. You can handle this with a sending failure policy: + +```csharp +using var host = await Host.CreateDefaultBuilder() + .UseWolverine(opts => + { + // Immediately discard messages that are too large for the broker + opts.SendingFailure + .OnException() + .Discard(); + }).StartAsync(); +``` + +This is currently supported for the Azure Service Bus transport, and will be extended to other transports over time. + +## Pausing the Sender + +Similar to pausing a listener, you can pause the sending agent when a certain failure condition is detected. Unlike a permanent latch, pausing automatically resumes sending after the specified duration: + +```csharp +using var host = await Host.CreateDefaultBuilder() + .UseWolverine(opts => + { + // On a catastrophic broker failure, pause sending for 30 seconds + opts.SendingFailure + .OnException() + .PauseSending(30.Seconds()); + + // Or combine with another action: dead letter the message, then pause + opts.SendingFailure + .OnException() + .MoveToErrorQueue().AndPauseSending(1.Minutes()); + }).StartAsync(); +``` + +When paused, the sending agent drains any in-flight messages and stops accepting new ones. After the specified duration elapses, Wolverine automatically attempts to resume the sender. If the resume attempt fails (e.g., the broker is still unreachable), Wolverine falls back to its built-in circuit watcher which will keep retrying periodically. + +## Custom Actions + +You can define custom logic to execute when a send failure occurs. This is useful for publishing compensating events or logging to external systems: + +```csharp +using var host = await Host.CreateDefaultBuilder() + .UseWolverine(opts => + { + opts.SendingFailure + .OnException() + .CustomAction(async (runtime, lifecycle, ex) => + { + // Publish a notification about the send failure + await lifecycle.PublishAsync(new SendingFailed( + lifecycle.Envelope!.Id, + ex.Message + )); + }, "Notify on send failure"); + }).StartAsync(); +``` + +## Send Attempts Tracking + +Wolverine tracks sending attempts separately from handler processing attempts through the `Envelope.SendAttempts` property. This counter is incremented each time a sending failure policy is evaluated, and is used internally by the failure rule infrastructure to determine which action slot to execute (e.g., retry twice, then move to error queue on the third failure). + +## How It Works + +Sending failure policies are evaluated *before* the existing circuit breaker logic in the sending agent. The evaluation flow is: + +1. An outgoing message fails to send, producing an exception +2. `Envelope.SendAttempts` is incremented +3. Sending failure policies are evaluated against the exception and envelope +4. If a matching policy is found, its continuation is executed (discard, dead letter, retry, custom action, etc.) +5. If no policy matches, the existing retry/circuit breaker behavior proceeds as before + +This means sending failure policies are purely additive — they only change behavior when explicitly configured and when a rule matches. diff --git a/src/Testing/CoreTests/ErrorHandling/SendingFailurePoliciesTests.cs b/src/Testing/CoreTests/ErrorHandling/SendingFailurePoliciesTests.cs new file mode 100644 index 000000000..d6ca2b42c --- /dev/null +++ b/src/Testing/CoreTests/ErrorHandling/SendingFailurePoliciesTests.cs @@ -0,0 +1,184 @@ +using JasperFx.Core; +using Shouldly; +using Wolverine.ComplianceTests; +using Wolverine.ErrorHandling; +using Xunit; + +namespace CoreTests.ErrorHandling; + +public class SendingFailurePoliciesTests +{ + private readonly SendingFailurePolicies _policies = new(); + + [Fact] + public void has_no_rules_by_default() + { + _policies.HasAnyRules.ShouldBeFalse(); + } + + [Fact] + public void has_rules_after_configuration() + { + _policies.OnException().Discard(); + _policies.HasAnyRules.ShouldBeTrue(); + } + + [Fact] + public void returns_null_when_no_rules_match() + { + _policies.OnException().Discard(); + + var envelope = ObjectMother.Envelope(); + envelope.SendAttempts = 1; + + var continuation = _policies.DetermineAction(new InvalidOperationException(), envelope); + continuation.ShouldBeNull(); + } + + [Fact] + public void returns_continuation_when_rule_matches() + { + _policies.OnException().Discard(); + + var envelope = ObjectMother.Envelope(); + envelope.SendAttempts = 1; + + var continuation = _policies.DetermineAction(new DivideByZeroException(), envelope); + continuation.ShouldNotBeNull(); + continuation.ShouldBeOfType(); + } + + [Fact] + public void uses_send_attempts_not_handler_attempts() + { + // Configure: retry once, then discard + _policies.OnException() + .RetryOnce().Then.Discard(); + + var envelope = ObjectMother.Envelope(); + + // First send attempt → should get retry + envelope.SendAttempts = 1; + envelope.Attempts = 42; // handler attempts should be irrelevant + var continuation = _policies.DetermineAction(new DivideByZeroException(), envelope); + continuation.ShouldBeOfType(); + + // Handler attempts should be restored after evaluation + envelope.Attempts.ShouldBe(42); + + // Second send attempt → should get discard + envelope.SendAttempts = 2; + continuation = _policies.DetermineAction(new DivideByZeroException(), envelope); + continuation.ShouldBeOfType(); + + // Handler attempts should still be restored + envelope.Attempts.ShouldBe(42); + } + + [Fact] + public void combine_with_parent_policies() + { + // Local policy handles DivideByZeroException + _policies.OnException().Discard(); + + // Parent policy handles InvalidOperationException + var parent = new SendingFailurePolicies(); + parent.OnException().MoveToErrorQueue(); + + var combined = _policies.CombineWith(parent); + + var envelope = ObjectMother.Envelope(); + envelope.SendAttempts = 1; + + // Local rule should match + combined.DetermineAction(new DivideByZeroException(), envelope) + .ShouldBeOfType(); + + // Parent rule should match + combined.DetermineAction(new InvalidOperationException(), envelope) + .ShouldNotBeNull(); + + // Neither should match + combined.DetermineAction(new BadImageFormatException(), envelope) + .ShouldBeNull(); + } + + [Fact] + public void local_policies_take_priority_over_parent() + { + // Local: discard on Exception + _policies.OnException().Discard(); + + // Parent: move to error queue on Exception + var parent = new SendingFailurePolicies(); + parent.OnException().MoveToErrorQueue(); + + var combined = _policies.CombineWith(parent); + + var envelope = ObjectMother.Envelope(); + envelope.SendAttempts = 1; + + // Local discard should win over parent's MoveToErrorQueue + combined.DetermineAction(new InvalidOperationException(), envelope) + .ShouldBeOfType(); + } + + [Fact] + public void pause_sending_action_via_fluent_api() + { + _policies.OnException() + .PauseSending(30.Seconds()); + + var envelope = ObjectMother.Envelope(); + envelope.SendAttempts = 1; + + var continuation = _policies.DetermineAction(new DivideByZeroException(), envelope); + continuation.ShouldNotBeNull(); + continuation.ShouldBeOfType() + .PauseTime.ShouldBe(30.Seconds()); + } + + [Fact] + public void and_pause_sending_as_additional_action() + { + _policies.OnException() + .MoveToErrorQueue().AndPauseSending(1.Minutes()); + + var envelope = ObjectMother.Envelope(); + envelope.SendAttempts = 1; + + var continuation = _policies.DetermineAction(new DivideByZeroException(), envelope); + continuation.ShouldNotBeNull(); + + // Should be a composite continuation containing both actions + var composite = continuation.ShouldBeOfType(); + composite.Inner.OfType().ShouldHaveSingleItem() + .PauseTime.ShouldBe(1.Minutes()); + } + + [Fact] + public void schedule_retry_then_pause_sending() + { + _policies.OnException() + .ScheduleRetry(5.Seconds(), 30.Seconds()).Then + .PauseSending(2.Minutes()); + + var envelope = ObjectMother.Envelope(); + + // First attempt → schedule retry at 5s + envelope.SendAttempts = 1; + var continuation = _policies.DetermineAction(new IOException(), envelope); + continuation.ShouldBeOfType(); + + // Second attempt → schedule retry at 30s + envelope.SendAttempts = 2; + continuation = _policies.DetermineAction(new IOException(), envelope); + continuation.ShouldBeOfType(); + + // Third attempt → pause sending + envelope.SendAttempts = 3; + continuation = _policies.DetermineAction(new IOException(), envelope); + continuation.ShouldBeOfType() + .PauseTime.ShouldBe(2.Minutes()); + } +} diff --git a/src/Testing/CoreTests/Runtime/WolverineTracingTests.cs b/src/Testing/CoreTests/Runtime/WolverineTracingTests.cs index c13fdaabb..a18be451e 100644 --- a/src/Testing/CoreTests/Runtime/WolverineTracingTests.cs +++ b/src/Testing/CoreTests/Runtime/WolverineTracingTests.cs @@ -61,7 +61,7 @@ public void should_set_the_tenant_id() public void should_set_the_otel_conversation_id_to_correlation_id() { theActivity.GetTagItem(WolverineTracing.MessagingConversationId) - .ShouldBe(theEnvelope.ConversationId); + .ShouldBe(theEnvelope.CorrelationId); } [Fact] @@ -103,6 +103,6 @@ public void should_set_the_payload_size_bytes_when_it_exists() public void trace_the_conversation_id() { theActivity.GetTagItem(WolverineTracing.MessagingConversationId) - .ShouldBe(theEnvelope.ConversationId); + .ShouldBe(theEnvelope.CorrelationId); } } \ No newline at end of file diff --git a/src/Transports/Azure/Wolverine.AzureServiceBus/Internal/AzureServiceBusSenderProtocol.cs b/src/Transports/Azure/Wolverine.AzureServiceBus/Internal/AzureServiceBusSenderProtocol.cs index c84bd9ac3..68fc7223f 100644 --- a/src/Transports/Azure/Wolverine.AzureServiceBus/Internal/AzureServiceBusSenderProtocol.cs +++ b/src/Transports/Azure/Wolverine.AzureServiceBus/Internal/AzureServiceBusSenderProtocol.cs @@ -65,6 +65,13 @@ private async Task sendBatches(ISenderCallback callback, List<(Envelope Envelope { if (!serviceBusMessageBatch.TryAddMessage(message)) { + // If the batch is empty and the message still doesn't fit, it's too large for any batch + if (serviceBusMessageBatch.Count == 0) + { + serviceBusMessageBatch.Dispose(); + throw new MessageTooLargeException(envelope, serviceBusMessageBatch.MaxSizeInBytes); + } + _logger.LogInformation("Wolverine had to break up outgoing message batches at {Uri}, you may want to reduce the MaximumMessagesToReceive configuration. No messages were lost, this is strictly informative", _endpoint.Uri); // Send the currently full batch @@ -122,10 +129,17 @@ private async Task sendPartitionedBatches(ISenderCallback callback, List<(Envelo _logger.LogDebug("Processing batch with session id '{SessionId}'", group.Key); - foreach (var (_, message) in group) + foreach (var (envelope, message) in group) { if (!serviceBusMessageBatch.TryAddMessage(message)) { + // If the batch is empty and the message still doesn't fit, it's too large for any batch + if (serviceBusMessageBatch.Count == 0) + { + serviceBusMessageBatch.Dispose(); + throw new MessageTooLargeException(envelope, serviceBusMessageBatch.MaxSizeInBytes); + } + _logger.LogInformation("Wolverine had to break up outgoing message batches at {Uri}, you may want to reduce the MaximumMessagesToReceive configuration. No messages were lost, this is strictly informative", _endpoint.Uri); // Send the currently full batch diff --git a/src/Wolverine/Configuration/Endpoint.cs b/src/Wolverine/Configuration/Endpoint.cs index 840d20fd4..62b5e2a3d 100644 --- a/src/Wolverine/Configuration/Endpoint.cs +++ b/src/Wolverine/Configuration/Endpoint.cs @@ -376,6 +376,12 @@ internal IEnumerable RulesForIncoming() /// public int MaximumEnvelopeRetryStorage { get; set; } = 100; + /// + /// Per-endpoint failure handling policies for outgoing message send failures. + /// When set, these rules take priority over the global SendingFailure policies. + /// + public SendingFailurePolicies? SendingFailure { get; set; } + public virtual IDictionary DescribeProperties() { var dict = new Dictionary diff --git a/src/Wolverine/Configuration/EndpointCollection.cs b/src/Wolverine/Configuration/EndpointCollection.cs index 186654014..f7724a742 100644 --- a/src/Wolverine/Configuration/EndpointCollection.cs +++ b/src/Wolverine/Configuration/EndpointCollection.cs @@ -1,6 +1,7 @@ using ImTools; using JasperFx.Core; using Microsoft.Extensions.Logging; +using Wolverine.ErrorHandling; using Wolverine.Persistence.Durability; using Wolverine.Runtime; using Wolverine.Runtime.Routing; @@ -312,31 +313,57 @@ private ISendingAgent buildSendingAgent(ISender sender, Endpoint endpoint) return a; } + // Resolve combined sending failure policies (endpoint-specific takes priority over global) + var sendingPolicies = resolveSendingFailurePolicies(endpoint); + switch (endpoint.Mode) { case EndpointMode.Durable: var outbox = _runtime.Stores.HasAnyAncillaryStores() ? new DelegatingMessageOutbox(_runtime.Storage.Outbox, _runtime.Stores) : _runtime.Storage.Outbox; - + return new DurableSendingAgent(sender, _options.Durability, _runtime.LoggerFactory.CreateLogger(), _runtime.MessageTracking, - outbox, endpoint); + outbox, endpoint, _runtime, sendingPolicies); case EndpointMode.BufferedInMemory: return new BufferedSendingAgent(_runtime.LoggerFactory.CreateLogger(), _runtime.MessageTracking, sender, _runtime.DurabilitySettings, - endpoint); + endpoint, _runtime, sendingPolicies); case EndpointMode.Inline: return new InlineSendingAgent(_runtime.LoggerFactory.CreateLogger(), sender, endpoint, _runtime.MessageTracking, - _runtime.DurabilitySettings); + _runtime.DurabilitySettings, _runtime, sendingPolicies); } throw new InvalidOperationException(); } + private SendingFailurePolicies? resolveSendingFailurePolicies(Endpoint endpoint) + { + var globalPolicies = _options.SendingFailure; + var endpointPolicies = endpoint.SendingFailure; + + if (endpointPolicies != null && globalPolicies.HasAnyRules) + { + return endpointPolicies.CombineWith(globalPolicies); + } + + if (endpointPolicies != null) + { + return endpointPolicies; + } + + if (globalPolicies.HasAnyRules) + { + return globalPolicies; + } + + return null; + } + private ISendingAgent buildSendingAgent(Uri uri, Action? configureNewEndpoint) { var transport = _options.Transports.ForScheme(uri.Scheme); diff --git a/src/Wolverine/Configuration/ISubscriberConfiguration.cs b/src/Wolverine/Configuration/ISubscriberConfiguration.cs index 199fa7d6c..a72a9e887 100644 --- a/src/Wolverine/Configuration/ISubscriberConfiguration.cs +++ b/src/Wolverine/Configuration/ISubscriberConfiguration.cs @@ -1,3 +1,5 @@ +using Wolverine.ErrorHandling; + namespace Wolverine.Configuration; public interface ISubscriberConfiguration : IEndpointConfiguration where T : ISubscriberConfiguration @@ -71,6 +73,14 @@ public interface ISubscriberConfiguration : IEndpointConfiguration where T /// public T GlobalSender(); + /// + /// Configure failure handling policies for outgoing message send failures + /// on this specific endpoint. These rules take priority over global policies. + /// + /// Action to configure the sending failure policies + /// + T ConfigureSending(Action configure); + public Endpoint Endpoint { get; } } diff --git a/src/Wolverine/Configuration/SubscriberConfiguration.cs b/src/Wolverine/Configuration/SubscriberConfiguration.cs index 9b0f3fda3..288ea82f6 100644 --- a/src/Wolverine/Configuration/SubscriberConfiguration.cs +++ b/src/Wolverine/Configuration/SubscriberConfiguration.cs @@ -1,6 +1,7 @@ using System.Text.Json; using JasperFx.Core.Reflection; using Newtonsoft.Json; +using Wolverine.ErrorHandling; using Wolverine.Runtime; using Wolverine.Runtime.Interop; using Wolverine.Runtime.Serialization; @@ -217,6 +218,16 @@ public T AddOutgoingRule(IEnvelopeRule rule) add(e => e.OutgoingRules.Add(rule)); return this.As(); } + + public T ConfigureSending(Action configure) + { + add(e => + { + e.SendingFailure ??= new SendingFailurePolicies(); + configure(e.SendingFailure); + }); + return this.As(); + } } internal class SubscriberConfiguration : SubscriberConfiguration, diff --git a/src/Wolverine/Envelope.cs b/src/Wolverine/Envelope.cs index 6fc83718d..9ea263e2c 100644 --- a/src/Wolverine/Envelope.cs +++ b/src/Wolverine/Envelope.cs @@ -223,6 +223,13 @@ public object? Message /// public int Attempts { get; set; } + /// + /// Number of times that Wolverine has tried to send this message. + /// This is tracked separately from handler Attempts and is used + /// by sending failure policies. + /// + public int SendAttempts { get; set; } + public DateTimeOffset SentAt { get; set; } = DateTimeOffset.UtcNow; /// diff --git a/src/Wolverine/ErrorHandling/PauseSendingContinuation.cs b/src/Wolverine/ErrorHandling/PauseSendingContinuation.cs new file mode 100644 index 000000000..b71d6aa11 --- /dev/null +++ b/src/Wolverine/ErrorHandling/PauseSendingContinuation.cs @@ -0,0 +1,51 @@ +using System.Diagnostics; +using Microsoft.Extensions.Logging; +using Wolverine.Runtime; +using Wolverine.Transports.Sending; + +namespace Wolverine.ErrorHandling; + +/// +/// Continuation that pauses the sending agent for a specified duration, then automatically +/// resumes sending. Similar to PauseListenerContinuation but for outgoing message senders. +/// +internal class PauseSendingContinuation : IContinuation, IContinuationSource +{ + public PauseSendingContinuation(TimeSpan pauseTime) + { + PauseTime = pauseTime; + } + + public TimeSpan PauseTime { get; } + + public async ValueTask ExecuteAsync(IEnvelopeLifecycle lifecycle, IWolverineRuntime runtime, DateTimeOffset now, + Activity? activity) + { + var envelope = lifecycle.Envelope; + if (envelope?.Destination == null) + { + return; + } + + var agent = runtime.Endpoints.GetOrBuildSendingAgent(envelope.Destination); + if (agent is SendingAgent sendingAgent) + { + activity?.AddEvent(new ActivityEvent(WolverineTracing.SendingPaused)); + + await sendingAgent.PauseAsync(PauseTime); + } + else + { + runtime.Logger.LogInformation( + "Unable to pause sending agent for {Destination}.", + envelope.Destination); + } + } + + public string Description => "Pause sending for " + PauseTime; + + public IContinuation Build(Exception ex, Envelope envelope) + { + return this; + } +} diff --git a/src/Wolverine/ErrorHandling/PolicyExpression.cs b/src/Wolverine/ErrorHandling/PolicyExpression.cs index bca3dcf4d..b698c5ab3 100644 --- a/src/Wolverine/ErrorHandling/PolicyExpression.cs +++ b/src/Wolverine/ErrorHandling/PolicyExpression.cs @@ -82,6 +82,13 @@ public interface IAdditionalActions /// IAdditionalActions AndPauseProcessing(TimeSpan pauseTime); + /// + /// Pause the sending agent for the specified duration, then automatically resume. + /// Only applicable when used with sending failure policies. + /// + /// How long to pause sending before automatically resuming + IAdditionalActions AndPauseSending(TimeSpan pauseTime); + /// /// Perform a user defined action as well as the initial action @@ -150,6 +157,15 @@ public IAdditionalActions AndPauseProcessing(TimeSpan pauseTime) return this; } + /// + /// Pause the sending agent for the specified duration, then automatically resume. + /// Only applicable when used with sending failure policies. + /// + public IAdditionalActions AndPauseSending(TimeSpan pauseTime) + { + return And(new PauseSendingContinuation(pauseTime)); + } + /// /// Take out an additional, user-defined action upon message failures /// @@ -259,6 +275,13 @@ public IAdditionalActions Discard() return this; } + public IAdditionalActions PauseSending(TimeSpan pauseTime) + { + var slot = _rule.AddSlot(new PauseSendingContinuation(pauseTime)); + _slots.Add(slot); + return this; + } + public IAdditionalActions ScheduleRetry(params TimeSpan[] delays) { if (delays.Length == 0) @@ -496,6 +519,13 @@ IAdditionalActions CustomAction(Func IAdditionalActions CustomActionIndefinitely(Func action, string description, InvokeResult? invokeUsage = null); + + /// + /// Pause the sending agent for the specified duration, then automatically resume. + /// Only applicable when used with sending failure policies. + /// + /// How long to pause sending before automatically resuming + IAdditionalActions PauseSending(TimeSpan pauseTime); } public class PolicyExpression : IFailureActions @@ -552,6 +582,15 @@ public IAdditionalActions Discard() return new FailureActions(_match, _parent).Discard(); } + /// + /// Pause the sending agent for the specified duration, then automatically resume. + /// Only applicable when used with sending failure policies. + /// + public IAdditionalActions PauseSending(TimeSpan pauseTime) + { + return new FailureActions(_match, _parent).PauseSending(pauseTime); + } + /// /// Schedule the message for additional attempts with a delay. Use this /// method to effect an "exponential backoff" policy diff --git a/src/Wolverine/ErrorHandling/SendingFailurePolicies.cs b/src/Wolverine/ErrorHandling/SendingFailurePolicies.cs new file mode 100644 index 000000000..c8ccbcc35 --- /dev/null +++ b/src/Wolverine/ErrorHandling/SendingFailurePolicies.cs @@ -0,0 +1,71 @@ +using Wolverine.Runtime; + +namespace Wolverine.ErrorHandling; + +/// +/// Collection of failure handling policies for outgoing message send failures. +/// Unlike handler failure policies, unmatched exceptions return null +/// so the existing retry/circuit-breaker behavior is preserved. +/// +public class SendingFailurePolicies : IWithFailurePolicies +{ + /// + /// Collection of error handling policies for exception handling during the sending of a message + /// + public FailureRuleCollection Failures { get; } = new(); + + /// + /// Determine the continuation action for a sending failure. + /// Returns null if no rule matches, allowing the existing retry/circuit-breaker + /// behavior to proceed unchanged. + /// + public IContinuation? DetermineAction(Exception exception, Envelope envelope) + { + // FailureRule.TryCreateContinuation uses Attempts for slot matching. + // For sending failures, we use SendAttempts instead. + var savedAttempts = envelope.Attempts; + envelope.Attempts = envelope.SendAttempts; + + try + { + foreach (var rule in Failures) + { + if (rule.TryCreateContinuation(exception, envelope, out var continuation)) + { + return continuation; + } + } + + return null; + } + finally + { + envelope.Attempts = savedAttempts; + } + } + + /// + /// Combine this policy set with a parent (global) policy set. + /// Local rules take priority over parent rules. + /// + public SendingFailurePolicies CombineWith(SendingFailurePolicies parent) + { + var combined = new SendingFailurePolicies(); + + // Local rules first (higher priority) + foreach (var rule in Failures) + { + combined.Failures.Add(rule); + } + + // Then parent rules + foreach (var rule in parent.Failures) + { + combined.Failures.Add(rule); + } + + return combined; + } + + internal bool HasAnyRules => Failures.Any(); +} diff --git a/src/Wolverine/Persistence/Durability/DurableSendingAgent.cs b/src/Wolverine/Persistence/Durability/DurableSendingAgent.cs index 9590e4c40..ff295a964 100644 --- a/src/Wolverine/Persistence/Durability/DurableSendingAgent.cs +++ b/src/Wolverine/Persistence/Durability/DurableSendingAgent.cs @@ -1,6 +1,7 @@ using JasperFx.Blocks; using Microsoft.Extensions.Logging; using Wolverine.Configuration; +using Wolverine.ErrorHandling; using Wolverine.Logging; using Wolverine.Runtime; using Wolverine.Transports; @@ -22,7 +23,14 @@ internal class DurableSendingAgent : SendingAgent public DurableSendingAgent(ISender sender, DurabilitySettings settings, ILogger logger, IMessageTracker messageLogger, - IMessageOutbox outbox, Endpoint endpoint) : base(logger, messageLogger, sender, settings, endpoint) + IMessageOutbox outbox, Endpoint endpoint) : this(sender, settings, logger, messageLogger, outbox, endpoint, null, null) + { + } + + public DurableSendingAgent(ISender sender, DurabilitySettings settings, ILogger logger, + IMessageTracker messageLogger, + IMessageOutbox outbox, Endpoint endpoint, IWolverineRuntime? runtime, + SendingFailurePolicies? sendingFailurePolicies) : base(logger, messageLogger, sender, settings, endpoint, runtime, sendingFailurePolicies) { _logger = logger; @@ -47,6 +55,8 @@ public DurableSendingAgent(ISender sender, DurabilitySettings settings, ILogger public override bool IsDurable => true; + protected override IMessageOutbox? resolveOutbox() => _outbox; + protected override async Task drainOtherAsync() { await _deleteOutgoingMany.DrainAsync(); diff --git a/src/Wolverine/Runtime/Batching/IMessageBatcher.cs b/src/Wolverine/Runtime/Batching/IMessageBatcher.cs index 056c86d3e..108ff3730 100644 --- a/src/Wolverine/Runtime/Batching/IMessageBatcher.cs +++ b/src/Wolverine/Runtime/Batching/IMessageBatcher.cs @@ -29,14 +29,16 @@ internal class DefaultMessageBatcher : IMessageBatcher { public IEnumerable Group(IReadOnlyList envelopes) { - // Group by tenant id - var groups = new Dictionary>(); + // Group by tenant id. Use empty string as sentinel for null TenantId + // since Dictionary does not allow null keys + var groups = new Dictionary>(); foreach (var envelope in envelopes) { - if (!groups.TryGetValue(envelope.TenantId, out var list)) + var key = envelope.TenantId ?? string.Empty; + if (!groups.TryGetValue(key, out var list)) { list = new List(); - groups[envelope.TenantId] = list; + groups[key] = list; } list.Add(envelope); @@ -55,7 +57,7 @@ public IEnumerable Group(IReadOnlyList envelopes) yield return new Envelope(messages.ToArray(), group.Value) { - TenantId = group.Key + TenantId = group.Key.Length == 0 ? null : group.Key }; } } diff --git a/src/Wolverine/Transports/MessageTooLargeException.cs b/src/Wolverine/Transports/MessageTooLargeException.cs new file mode 100644 index 000000000..a8258f9d7 --- /dev/null +++ b/src/Wolverine/Transports/MessageTooLargeException.cs @@ -0,0 +1,20 @@ +namespace Wolverine.Transports; + +/// +/// Exception thrown when a message is too large to fit in any transport batch. +/// Use this in sending failure policies to discard or dead-letter oversized messages. +/// +public class MessageTooLargeException : Exception +{ + public MessageTooLargeException(Envelope envelope, long maxSizeInBytes) + : base($"Message {envelope.Id} of type '{envelope.MessageType}' is too large to fit in a batch (max size: {maxSizeInBytes} bytes)") + { + EnvelopeId = envelope.Id; + MessageType = envelope.MessageType; + MaxSizeInBytes = maxSizeInBytes; + } + + public Guid EnvelopeId { get; } + public string? MessageType { get; } + public long MaxSizeInBytes { get; } +} diff --git a/src/Wolverine/Transports/Sending/BufferedSendingAgent.cs b/src/Wolverine/Transports/Sending/BufferedSendingAgent.cs index a87a885fa..576e671e2 100644 --- a/src/Wolverine/Transports/Sending/BufferedSendingAgent.cs +++ b/src/Wolverine/Transports/Sending/BufferedSendingAgent.cs @@ -1,5 +1,6 @@ using Microsoft.Extensions.Logging; using Wolverine.Configuration; +using Wolverine.ErrorHandling; using Wolverine.Logging; using Wolverine.Runtime; @@ -14,6 +15,12 @@ public BufferedSendingAgent(ILogger logger, IMessageTracker messageLogger, ISend { } + public BufferedSendingAgent(ILogger logger, IMessageTracker messageLogger, ISender sender, + DurabilitySettings settings, Endpoint endpoint, IWolverineRuntime? runtime, + SendingFailurePolicies? sendingFailurePolicies) : base(logger, messageLogger, sender, settings, endpoint, runtime, sendingFailurePolicies) + { + } + public override bool IsDurable => false; public override Task EnqueueForRetryAsync(OutgoingMessageBatch batch) diff --git a/src/Wolverine/Transports/Sending/InlineSendingAgent.cs b/src/Wolverine/Transports/Sending/InlineSendingAgent.cs index 3685c42e2..04b2b88fe 100644 --- a/src/Wolverine/Transports/Sending/InlineSendingAgent.cs +++ b/src/Wolverine/Transports/Sending/InlineSendingAgent.cs @@ -1,6 +1,7 @@ using JasperFx.Blocks; using Microsoft.Extensions.Logging; using Wolverine.Configuration; +using Wolverine.ErrorHandling; using Wolverine.Logging; using Wolverine.Runtime; @@ -8,17 +9,28 @@ namespace Wolverine.Transports.Sending; public class InlineSendingAgent : ISendingAgent, IDisposable { + private readonly ILogger _logger; private readonly IMessageTracker _messageLogger; private readonly IRetryBlock _sending; private readonly DurabilitySettings _settings; + private readonly IWolverineRuntime? _runtime; + private readonly SendingFailurePolicies? _sendingFailurePolicies; public InlineSendingAgent(ILogger logger, ISender sender, Endpoint endpoint, IMessageTracker messageLogger, - DurabilitySettings settings) + DurabilitySettings settings) : this(logger, sender, endpoint, messageLogger, settings, null, null) { + } + + public InlineSendingAgent(ILogger logger, ISender sender, Endpoint endpoint, IMessageTracker messageLogger, + DurabilitySettings settings, IWolverineRuntime? runtime, SendingFailurePolicies? sendingFailurePolicies) + { + _logger = logger; Sender = sender; _messageLogger = messageLogger; _settings = settings; Endpoint = endpoint; + _runtime = runtime; + _sendingFailurePolicies = sendingFailurePolicies; if (settings.UseSyncRetryBlock) { @@ -75,6 +87,13 @@ private async Task sendWithTracing(Envelope e, CancellationToken cancellationTok { // ignore it } + catch (Exception ex) + { + if (!await tryHandleSendingFailureAsync(e, ex)) + { + throw; + } + } finally { activity?.Stop(); @@ -83,8 +102,46 @@ private async Task sendWithTracing(Envelope e, CancellationToken cancellationTok private async Task sendWithOutTracing(Envelope e, CancellationToken cancellationToken) { - await Sender.SendAsync(e); - _messageLogger.Sent(e); + try + { + await Sender.SendAsync(e); + _messageLogger.Sent(e); + } + catch (Exception ex) + { + if (!await tryHandleSendingFailureAsync(e, ex)) + { + throw; + } + } + } + + private async Task tryHandleSendingFailureAsync(Envelope envelope, Exception exception) + { + if (_sendingFailurePolicies == null || !_sendingFailurePolicies.HasAnyRules || _runtime == null) + { + return false; + } + + envelope.SendAttempts++; + + var continuation = _sendingFailurePolicies.DetermineAction(exception, envelope); + if (continuation == null) + { + return false; + } + + try + { + var lifecycle = new SendingEnvelopeLifecycle(envelope, _runtime, this, null); + await continuation.ExecuteAsync(lifecycle, _runtime, DateTimeOffset.UtcNow, null); + return true; + } + catch (Exception ex) + { + _logger.LogError(ex, "Error executing sending failure policy for envelope {EnvelopeId}", envelope.Id); + return false; + } } private void setDefaults(Envelope envelope) @@ -93,4 +150,4 @@ private void setDefaults(Envelope envelope) envelope.OwnerId = _settings.AssignedNodeNumber; envelope.ReplyUri ??= ReplyUri; } -} \ No newline at end of file +} diff --git a/src/Wolverine/Transports/Sending/SendingAgent.cs b/src/Wolverine/Transports/Sending/SendingAgent.cs index 51fb04727..2701ebf2d 100644 --- a/src/Wolverine/Transports/Sending/SendingAgent.cs +++ b/src/Wolverine/Transports/Sending/SendingAgent.cs @@ -3,7 +3,9 @@ using JasperFx.Core; using Microsoft.Extensions.Logging; using Wolverine.Configuration; +using Wolverine.ErrorHandling; using Wolverine.Logging; +using Wolverine.Persistence.Durability; using Wolverine.Runtime; namespace Wolverine.Transports.Sending; @@ -20,15 +22,24 @@ public abstract class SendingAgent : ISendingAgent, ISenderCallback, ISenderCirc private CircuitWatcher? _circuitWatcher; private int _failureCount; + private readonly IWolverineRuntime? _runtime; + private readonly SendingFailurePolicies? _sendingFailurePolicies; public SendingAgent(ILogger logger, IMessageTracker messageLogger, ISender sender, DurabilitySettings settings, - Endpoint endpoint) + Endpoint endpoint) : this(logger, messageLogger, sender, settings, endpoint, null, null) + { + } + + public SendingAgent(ILogger logger, IMessageTracker messageLogger, ISender sender, DurabilitySettings settings, + Endpoint endpoint, IWolverineRuntime? runtime, SendingFailurePolicies? sendingFailurePolicies) { _logger = logger; _messageLogger = messageLogger; _sender = sender; _settings = settings; Endpoint = endpoint; + _runtime = runtime; + _sendingFailurePolicies = sendingFailurePolicies; Func senderDelegate = _sender is ISenderRequiresCallback ? sendWithCallbackHandlingAsync @@ -91,7 +102,7 @@ public Task MarkProcessingFailureAsync(OutgoingMessageBatch outgoing, Exception? _logger.LogError(exception, "Failure trying to send a message batch to {Destination}", outgoing.Destination); _logger.OutgoingBatchFailed(outgoing, exception); - return markFailedAsync(outgoing); + return markFailedAsync(outgoing, exception); } Task ISenderCallback.MarkSenderIsLatchedAsync(OutgoingMessageBatch outgoing) @@ -194,6 +205,34 @@ public async Task LatchAndDrainAsync() _logger.CircuitBroken(Destination); } + /// + /// Pause sending for the specified duration, then automatically resume. + /// + public async Task PauseAsync(TimeSpan pauseTime) + { + await LatchAndDrainAsync(); + + _logger.LogInformation("Pausing sending to {Destination} for {PauseTime}", Destination, pauseTime); + + _ = Task.Delay(pauseTime, _settings.Cancellation).ContinueWith(async _ => + { + if (_settings.Cancellation.IsCancellationRequested) return; + + try + { + _logger.LogInformation("Resuming sending to {Destination} after pause", Destination); + await (this as ISenderCircuit).ResumeAsync(_settings.Cancellation); + } + catch (Exception e) + { + _logger.LogError(e, "Error trying to resume sending to {Destination} after pause", Destination); + + // Fall back to the circuit watcher to keep trying + _circuitWatcher ??= new CircuitWatcher(this, _settings.Cancellation); + } + }, TaskScheduler.Default); + } + protected virtual Task drainOtherAsync() { return Task.CompletedTask; @@ -253,8 +292,61 @@ private async Task sendWithExplicitHandlingAsync(Envelope envelope, Cancellation } } - private async Task markFailedAsync(OutgoingMessageBatch batch) + /// + /// Try to apply sending failure policies to each envelope in the batch. + /// Returns true if all envelopes were handled by policies; false if any were not. + /// + private async Task tryApplySendingFailurePoliciesAsync(OutgoingMessageBatch batch, Exception? exception) { + if (_sendingFailurePolicies == null || !_sendingFailurePolicies.HasAnyRules || _runtime == null || exception == null) + { + return false; + } + + var allHandled = true; + + foreach (var envelope in batch.Messages) + { + envelope.SendAttempts++; + + var continuation = _sendingFailurePolicies.DetermineAction(exception, envelope); + if (continuation != null) + { + try + { + var outbox = resolveOutbox(); + var lifecycle = new SendingEnvelopeLifecycle(envelope, _runtime, this, outbox); + await continuation.ExecuteAsync(lifecycle, _runtime, DateTimeOffset.UtcNow, null); + } + catch (Exception ex) + { + _logger.LogError(ex, "Error executing sending failure policy for envelope {EnvelopeId}", envelope.Id); + allHandled = false; + } + } + else + { + allHandled = false; + } + } + + return allHandled; + } + + /// + /// Resolve the outbox for creating SendingEnvelopeLifecycle instances. + /// Overridden in DurableSendingAgent to provide the actual outbox. + /// + protected virtual IMessageOutbox? resolveOutbox() => null; + + private async Task markFailedAsync(OutgoingMessageBatch batch, Exception? exception = null) + { + // Try sending failure policies first + if (exception != null && await tryApplySendingFailurePoliciesAsync(batch, exception)) + { + return; + } + // If it's already latched, just enqueue again if (Latched) { @@ -326,6 +418,6 @@ public Task MarkProcessingFailureAsync(Envelope outgoing, Exception? exception) var batch = new OutgoingMessageBatch(outgoing.Destination, new[] { outgoing }); _logger.OutgoingBatchFailed(batch, exception); - return markFailedAsync(batch); + return markFailedAsync(batch, exception); } -} \ No newline at end of file +} diff --git a/src/Wolverine/Transports/Sending/SendingEnvelopeLifecycle.cs b/src/Wolverine/Transports/Sending/SendingEnvelopeLifecycle.cs new file mode 100644 index 000000000..52640ca55 --- /dev/null +++ b/src/Wolverine/Transports/Sending/SendingEnvelopeLifecycle.cs @@ -0,0 +1,132 @@ +using Wolverine.Persistence.Durability; +using Wolverine.Runtime; + +namespace Wolverine.Transports.Sending; + +/// +/// IEnvelopeLifecycle adapter for outgoing envelopes during send failure handling. +/// Allows existing IContinuation implementations to operate on outgoing envelopes. +/// +internal class SendingEnvelopeLifecycle : IEnvelopeLifecycle +{ + private readonly MessageBus _bus; + private readonly ISendingAgent _agent; + private readonly IMessageOutbox? _outbox; + + public SendingEnvelopeLifecycle(Envelope envelope, IWolverineRuntime runtime, ISendingAgent agent, + IMessageOutbox? outbox) + { + Envelope = envelope; + _bus = new MessageBus(runtime); + _agent = agent; + _outbox = outbox; + } + + public Envelope? Envelope { get; } + + /// + /// For sending failures, "complete" means discard the envelope. + /// Delete from outbox if durable, otherwise just drop it. + /// + public async ValueTask CompleteAsync() + { + if (_outbox != null && Envelope != null) + { + await _outbox.DeleteOutgoingAsync(Envelope); + } + } + + /// + /// Re-enqueue for retry through the sending agent. + /// + public async ValueTask DeferAsync() + { + if (Envelope != null) + { + await _agent.EnqueueOutgoingAsync(Envelope); + } + } + + /// + /// Schedule the envelope for later retry via the outbox. + /// + public async Task ReScheduleAsync(DateTimeOffset scheduledTime) + { + if (Envelope != null) + { + Envelope.ScheduledTime = scheduledTime; + Envelope.Status = EnvelopeStatus.Scheduled; + + if (_outbox != null) + { + await _outbox.StoreOutgoingAsync(Envelope, Envelope.OwnerId); + } + } + } + + /// + /// Move the failed outgoing envelope to dead letter storage. + /// + public async Task MoveToDeadLetterQueueAsync(Exception exception) + { + if (Envelope != null) + { + await _bus.Runtime.Storage.Inbox.MoveToDeadLetterStorageAsync(Envelope, exception); + } + } + + /// + /// Re-enqueue immediately for retry. + /// + public async Task RetryExecutionNowAsync() + { + if (Envelope != null) + { + await _agent.EnqueueOutgoingAsync(Envelope); + } + } + + // Not applicable for outgoing messages + public ValueTask SendAcknowledgementAsync() => ValueTask.CompletedTask; + public ValueTask SendFailureAcknowledgementAsync(string failureDescription) => ValueTask.CompletedTask; + public ValueTask RespondToSenderAsync(object response) => ValueTask.CompletedTask; + + // No transaction context for sending failures, so flushing is a no-op + public Task FlushOutgoingMessagesAsync() => Task.CompletedTask; + + // Delegate IMessageBus methods to inner MessageBus + public string? TenantId + { + get => _bus.TenantId; + set => _bus.TenantId = value; + } + + public Task InvokeAsync(object message, CancellationToken cancellation = default, TimeSpan? timeout = default) + => _bus.InvokeAsync(message, cancellation, timeout); + + public Task InvokeAsync(object message, DeliveryOptions options, CancellationToken cancellation = default, TimeSpan? timeout = default) + => _bus.InvokeAsync(message, options, cancellation, timeout); + + public Task InvokeAsync(object message, CancellationToken cancellation = default, TimeSpan? timeout = default) + => _bus.InvokeAsync(message, cancellation, timeout); + + public Task InvokeAsync(object message, DeliveryOptions options, CancellationToken cancellation = default, TimeSpan? timeout = default) + => _bus.InvokeAsync(message, options, cancellation, timeout); + + public Task InvokeForTenantAsync(string tenantId, object message, CancellationToken cancellation = default, TimeSpan? timeout = default) + => _bus.InvokeForTenantAsync(tenantId, message, cancellation, timeout); + + public Task InvokeForTenantAsync(string tenantId, object message, CancellationToken cancellation = default, TimeSpan? timeout = default) + => _bus.InvokeForTenantAsync(tenantId, message, cancellation, timeout); + + public IDestinationEndpoint EndpointFor(string endpointName) => _bus.EndpointFor(endpointName); + public IDestinationEndpoint EndpointFor(Uri uri) => _bus.EndpointFor(uri); + + public IReadOnlyList PreviewSubscriptions(object message) => _bus.PreviewSubscriptions(message); + public IReadOnlyList PreviewSubscriptions(object message, DeliveryOptions options) => _bus.PreviewSubscriptions(message, options); + + public ValueTask SendAsync(T message, DeliveryOptions? options = null) => _bus.SendAsync(message, options); + public ValueTask PublishAsync(T message, DeliveryOptions? options = null) => _bus.PublishAsync(message, options); + public ValueTask BroadcastToTopicAsync(string topicName, object message, DeliveryOptions? options = null) + => _bus.BroadcastToTopicAsync(topicName, message, options); +} diff --git a/src/Wolverine/WolverineOptions.cs b/src/Wolverine/WolverineOptions.cs index a1c95a3ca..eed0923d4 100644 --- a/src/Wolverine/WolverineOptions.cs +++ b/src/Wolverine/WolverineOptions.cs @@ -145,6 +145,13 @@ public WolverineOptions(string? assemblyName) public MetricsOptions Metrics { get; } = new(); + /// + /// Global failure handling policies for outgoing message send failures. + /// Use this to configure how Wolverine handles exceptions that occur when + /// trying to send messages to external transports. + /// + public SendingFailurePolicies SendingFailure { get; } = new(); + /// /// What is the policy within this application for whether or not it is valid to allow Service Location within /// the generated code for message handlers or HTTP endpoints. Default is AllowedByWarn. Just keep in mind that