Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
30 changes: 30 additions & 0 deletions docs/guide/messaging/transports/rabbitmq/deadletterqueues.md
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,36 @@ using var host = await Host.CreateDefaultBuilder()
<sup><a href='https://github.com/JasperFx/wolverine/blob/main/src/Transports/RabbitMQ/Wolverine.RabbitMQ.Tests/Samples.cs#L435-L458' title='Snippet source file'>snippet source</a> | <a href='#snippet-sample_overriding_rabbit_mq_dead_letter_queue_interop_friendly' title='Start of snippet'>anchor</a></sup>
<!-- endSnippet -->

## Enhanced Dead Lettering with Exception Metadata

By default, Wolverine uses RabbitMQ's native NACK mechanism to move failed messages to the dead letter exchange. While simple, this approach does not include any information about *why* the message failed.

With `EnableEnhancedDeadLettering()`, Wolverine will instead publish failed messages directly to the dead letter queue with exception metadata headers, then ACK the original message. This gives you structured failure information on each dead-lettered message:

| Header | Description |
|--------|-------------|
| `exception-type` | Full type name of the exception |
| `exception-message` | The exception message |
| `exception-stack` | The exception stack trace |
| `failed-at` | Unix timestamp (milliseconds) when the failure occurred |

```cs
using var host = await Host.CreateDefaultBuilder()
.UseWolverine(opts =>
{
opts.UseRabbitMq()
.EnableEnhancedDeadLettering();
}).StartAsync();
```

::: tip
These same metadata headers are automatically included for all other Wolverine transports (SQS, Azure Service Bus, GCP Pub/Sub, NATS, Kafka, Redis, Pulsar) when messages are moved to dead letter queues.
:::

::: warning
Enhanced dead lettering bypasses RabbitMQ's native dead letter exchange (DLX) mechanism. Messages are published to the DLQ by Wolverine rather than being NACK'd. If you rely on native DLX routing or policies, this mode may not be appropriate.
:::

And lastly, if you don't particularly want to have any Rabbit MQ dead letter queues and you quite like the [database backed
dead letter queues](/guide/durability/dead-letter-storage) you get with Wolverine's message durability, you can use the `WolverineStorage` option:

Expand Down
75 changes: 75 additions & 0 deletions src/Testing/CoreTests/Transports/DeadLetterQueueConstantsTests.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
using Shouldly;
using Wolverine.Transports;
using Xunit;

namespace CoreTests.Transports;

public class DeadLetterQueueConstantsTests
{
[Fact]
public void stamp_failure_metadata_sets_all_headers()
{
var envelope = new Envelope();
var exception = new InvalidOperationException("something went wrong");

DeadLetterQueueConstants.StampFailureMetadata(envelope, exception);

envelope.Headers[DeadLetterQueueConstants.ExceptionTypeHeader]
.ShouldBe(typeof(InvalidOperationException).FullName);
envelope.Headers[DeadLetterQueueConstants.ExceptionMessageHeader]
.ShouldBe("something went wrong");
envelope.Headers[DeadLetterQueueConstants.ExceptionStackHeader]
.ShouldNotBeNull();
envelope.Headers[DeadLetterQueueConstants.FailedAtHeader]
.ShouldNotBeNull();

long.TryParse(envelope.Headers[DeadLetterQueueConstants.FailedAtHeader], out var timestamp)
.ShouldBeTrue();
timestamp.ShouldBeGreaterThan(0);
}

[Fact]
public void stamp_failure_metadata_preserves_existing_headers()
{
var envelope = new Envelope();
envelope.Headers["custom-header"] = "custom-value";
envelope.Headers["another"] = "one";

var exception = new ArgumentException("bad arg");

DeadLetterQueueConstants.StampFailureMetadata(envelope, exception);

envelope.Headers["custom-header"].ShouldBe("custom-value");
envelope.Headers["another"].ShouldBe("one");
envelope.Headers[DeadLetterQueueConstants.ExceptionTypeHeader]
.ShouldBe(typeof(ArgumentException).FullName);
}

[Fact]
public void stamp_failure_metadata_handles_null_stack_trace()
{
var envelope = new Envelope();
// Exception created without throwing has null StackTrace
var exception = new Exception("test");

DeadLetterQueueConstants.StampFailureMetadata(envelope, exception);

envelope.Headers[DeadLetterQueueConstants.ExceptionStackHeader].ShouldBe("");
}

[Fact]
public void stamp_failure_metadata_overwrites_previous_failure_headers()
{
var envelope = new Envelope();
var firstException = new InvalidOperationException("first");
DeadLetterQueueConstants.StampFailureMetadata(envelope, firstException);

var secondException = new ArgumentException("second");
DeadLetterQueueConstants.StampFailureMetadata(envelope, secondException);

envelope.Headers[DeadLetterQueueConstants.ExceptionTypeHeader]
.ShouldBe(typeof(ArgumentException).FullName);
envelope.Headers[DeadLetterQueueConstants.ExceptionMessageHeader]
.ShouldBe("second");
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -185,6 +185,7 @@ public async Task<bool> TryRequeueAsync(Envelope envelope)

public Task MoveToErrorsAsync(Envelope envelope, Exception exception)
{
DeadLetterQueueConstants.StampFailureMetadata(envelope, exception);
return _deadLetterBlock!.PostAsync(envelope);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,7 @@ public async Task MoveToErrorsAsync(Envelope envelope, Exception exception)
{
if (envelope is AzureServiceBusEnvelope e)
{
DeadLetterQueueConstants.StampFailureMetadata(envelope, exception);
e.Exception = exception;
await _deadLetter.PostAsync(e);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,7 @@ public async Task MoveToErrorsAsync(Envelope envelope, Exception exception)
{
if (envelope is AzureServiceBusEnvelope e)
{
DeadLetterQueueConstants.StampFailureMetadata(envelope, exception);
e.Exception = exception;
await _deadLetter.PostAsync(e);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,7 @@ public ValueTask DisposeAsync()

public Task MoveToErrorsAsync(Envelope envelope, Exception exception)
{
DeadLetterQueueConstants.StampFailureMetadata(envelope, exception);
return _deadLetter.PostAsync(envelope);
}

Expand Down
6 changes: 1 addition & 5 deletions src/Transports/NATS/Wolverine.Nats/Internal/NatsListener.cs
Original file line number Diff line number Diff line change
Expand Up @@ -97,12 +97,8 @@ await natsEnvelope.JetStreamMsg.AckAsync(
{
envelope.Attempts = (int)(metadata?.NumDelivered ?? 1);

envelope.Headers["x-dlq-reason"] = exception.Message;
envelope.Headers["x-dlq-timestamp"] = DateTimeOffset.UtcNow.ToString("O");
DeadLetterQueueConstants.StampFailureMetadata(envelope, exception);
envelope.Headers["x-dlq-original-subject"] = _endpoint.Subject;
envelope.Headers["x-dlq-attempts"] = envelope.Attempts.ToString();
envelope.Headers["x-dlq-exception-type"] =
exception.GetType().FullName ?? "Unknown";

await _deadLetterSender.SendAsync(envelope);
}
Expand Down
2 changes: 2 additions & 0 deletions src/Transports/Pulsar/Wolverine.Pulsar/PulsarListener.cs
Original file line number Diff line number Diff line change
Expand Up @@ -348,6 +348,8 @@ private MessageMetadata BuildMessageMetadata(Envelope envelope, PulsarEnvelope e
messageMetadata.DeliverAtTimeAsDateTimeOffset = DateTimeOffset.UtcNow;
if (exception != null)
{
DeadLetterQueueConstants.StampFailureMetadata(envelope, exception);

var exceptionText = exception.ToString();
messageMetadata[PulsarEnvelopeConstants.Exception] = exceptionText;
e.Headers[PulsarEnvelopeConstants.Exception] = exceptionText;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ public ValueTask DeferAsync(Envelope envelope)

public async Task MoveToErrorsAsync(Envelope envelope, Exception exception)
{
DeadLetterQueueConstants.StampFailureMetadata(envelope, exception);
await _sendBlock.PostAsync(envelope);
}

Expand Down Expand Up @@ -73,8 +74,11 @@ public RabbitMqListener(IWolverineRuntime runtime,
_transport = transport;
_receiver = receiver ?? throw new ArgumentNullException(nameof(receiver));

_callback = (Queue.DeadLetterQueue != null) &
(Queue.DeadLetterQueue?.Mode == DeadLetterQueueMode.InteropFriendly)
var useEnhancedOrInterop = Queue.DeadLetterQueue != null &&
(Queue.DeadLetterQueue.Mode == DeadLetterQueueMode.InteropFriendly ||
_transport.UseEnhancedDeadLettering);

_callback = useEnhancedOrInterop
? new RabbitMqInteropFriendlyCallback(_transport, _transport.Queues[Queue.DeadLetterQueue!.QueueName],
_runtime)
: _transport.Callback!;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -317,6 +317,14 @@ internal ConnectionMonitor BuildConnection(ConnectionRole role)

public ILogger<RabbitMqTransport> Logger { get; private set; } = NullLogger<RabbitMqTransport>.Instance;

/// <summary>
/// When true, Wolverine will publish failed messages to the dead letter queue
/// with exception metadata headers instead of using RabbitMQ's native NACK-based
/// dead lettering. This enables richer error information at the cost of not using
/// native RabbitMQ dead letter exchange mechanisms.
/// </summary>
public bool UseEnhancedDeadLettering { get; set; }

/// <summary>
/// Opt into making Wolverine "auto ping" new listeners by trying to send a fake Wolverine "ping" message
/// This *might* assist in Wolverine auto-starting rabbit mq connections that have failed on the Rabbit MQ side
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,18 @@ public RabbitMqTransportExpression(RabbitMqTransport transport, WolverineOptions
{
}

/// <summary>
/// When enabled, Wolverine will publish failed messages to the dead letter queue
/// with exception metadata headers (exception type, message, stack trace, timestamp)
/// instead of using RabbitMQ's native NACK-based dead lettering. This enables richer
/// error information at the cost of not using native RabbitMQ dead letter exchange mechanisms.
/// </summary>
public RabbitMqTransportExpression EnableEnhancedDeadLettering()
{
Transport.UseEnhancedDeadLettering = true;
return this;
}

/// <summary>
/// Opt into making Wolverine "auto ping" new listeners by trying to send a fake Wolverine "ping" message
/// This *might* assist in Wolverine auto-starting rabbit mq connections that have failed on the Rabbit MQ side
Expand Down
1 change: 1 addition & 0 deletions src/Wolverine/Runtime/WorkerQueues/BufferedReceiver.cs
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,7 @@
}
}

public IHandlerPipeline? Pipeline { get; }

Check warning on line 111 in src/Wolverine/Runtime/WorkerQueues/BufferedReceiver.cs

View workflow job for this annotation

GitHub Actions / test

Nullability of reference types in return type of 'IHandlerPipeline? BufferedReceiver.Pipeline.get' doesn't match implicitly implemented member 'IHandlerPipeline IReceiver.Pipeline.get' (possibly because of nullability attributes).

Check warning on line 111 in src/Wolverine/Runtime/WorkerQueues/BufferedReceiver.cs

View workflow job for this annotation

GitHub Actions / test

Nullability of reference types in return type of 'IHandlerPipeline? BufferedReceiver.Pipeline.get' doesn't match implicitly implemented member 'IHandlerPipeline IReceiver.Pipeline.get' (possibly because of nullability attributes).

Check warning on line 111 in src/Wolverine/Runtime/WorkerQueues/BufferedReceiver.cs

View workflow job for this annotation

GitHub Actions / test

Nullability of reference types in return type of 'IHandlerPipeline? BufferedReceiver.Pipeline.get' doesn't match implicitly implemented member 'IHandlerPipeline IReceiver.Pipeline.get' (possibly because of nullability attributes).

Check warning on line 111 in src/Wolverine/Runtime/WorkerQueues/BufferedReceiver.cs

View workflow job for this annotation

GitHub Actions / test

Nullability of reference types in return type of 'IHandlerPipeline? BufferedReceiver.Pipeline.get' doesn't match implicitly implemented member 'IHandlerPipeline IReceiver.Pipeline.get' (possibly because of nullability attributes).

Check warning on line 111 in src/Wolverine/Runtime/WorkerQueues/BufferedReceiver.cs

View workflow job for this annotation

GitHub Actions / test

Nullability of reference types in return type of 'IHandlerPipeline? BufferedReceiver.Pipeline.get' doesn't match implicitly implemented member 'IHandlerPipeline IReceiver.Pipeline.get' (possibly because of nullability attributes).

Check warning on line 111 in src/Wolverine/Runtime/WorkerQueues/BufferedReceiver.cs

View workflow job for this annotation

GitHub Actions / test

Nullability of reference types in return type of 'IHandlerPipeline? BufferedReceiver.Pipeline.get' doesn't match implicitly implemented member 'IHandlerPipeline IReceiver.Pipeline.get' (possibly because of nullability attributes).

Check warning on line 111 in src/Wolverine/Runtime/WorkerQueues/BufferedReceiver.cs

View workflow job for this annotation

GitHub Actions / test

Nullability of reference types in return type of 'IHandlerPipeline? BufferedReceiver.Pipeline.get' doesn't match implicitly implemented member 'IHandlerPipeline IReceiver.Pipeline.get' (possibly because of nullability attributes).

Check warning on line 111 in src/Wolverine/Runtime/WorkerQueues/BufferedReceiver.cs

View workflow job for this annotation

GitHub Actions / test

Nullability of reference types in return type of 'IHandlerPipeline? BufferedReceiver.Pipeline.get' doesn't match implicitly implemented member 'IHandlerPipeline IReceiver.Pipeline.get' (possibly because of nullability attributes).

Check warning on line 111 in src/Wolverine/Runtime/WorkerQueues/BufferedReceiver.cs

View workflow job for this annotation

GitHub Actions / test

Nullability of reference types in return type of 'IHandlerPipeline? BufferedReceiver.Pipeline.get' doesn't match implicitly implemented member 'IHandlerPipeline IReceiver.Pipeline.get' (possibly because of nullability attributes).

Check warning on line 111 in src/Wolverine/Runtime/WorkerQueues/BufferedReceiver.cs

View workflow job for this annotation

GitHub Actions / test

Nullability of reference types in return type of 'IHandlerPipeline? BufferedReceiver.Pipeline.get' doesn't match implicitly implemented member 'IHandlerPipeline IReceiver.Pipeline.get' (possibly because of nullability attributes).

public Uri Uri { get; }

Expand Down Expand Up @@ -219,6 +219,7 @@

public Task MoveToErrorsAsync(Envelope envelope, Exception exception)
{
DeadLetterQueueConstants.StampFailureMetadata(envelope, exception);
return _moveToErrors!.PostAsync(envelope);
}

Expand Down
1 change: 1 addition & 0 deletions src/Wolverine/Runtime/WorkerQueues/DurableReceiver.cs
Original file line number Diff line number Diff line change
Expand Up @@ -209,7 +209,7 @@
await EnqueueAsync(envelope);
}

public IHandlerPipeline? Pipeline { get; }

Check warning on line 212 in src/Wolverine/Runtime/WorkerQueues/DurableReceiver.cs

View workflow job for this annotation

GitHub Actions / test

Nullability of reference types in return type of 'IHandlerPipeline? DurableReceiver.Pipeline.get' doesn't match implicitly implemented member 'IHandlerPipeline IReceiver.Pipeline.get' (possibly because of nullability attributes).

Check warning on line 212 in src/Wolverine/Runtime/WorkerQueues/DurableReceiver.cs

View workflow job for this annotation

GitHub Actions / test

Nullability of reference types in return type of 'IHandlerPipeline? DurableReceiver.Pipeline.get' doesn't match implicitly implemented member 'IHandlerPipeline IReceiver.Pipeline.get' (possibly because of nullability attributes).

Check warning on line 212 in src/Wolverine/Runtime/WorkerQueues/DurableReceiver.cs

View workflow job for this annotation

GitHub Actions / test

Nullability of reference types in return type of 'IHandlerPipeline? DurableReceiver.Pipeline.get' doesn't match implicitly implemented member 'IHandlerPipeline IReceiver.Pipeline.get' (possibly because of nullability attributes).

Check warning on line 212 in src/Wolverine/Runtime/WorkerQueues/DurableReceiver.cs

View workflow job for this annotation

GitHub Actions / test

Nullability of reference types in return type of 'IHandlerPipeline? DurableReceiver.Pipeline.get' doesn't match implicitly implemented member 'IHandlerPipeline IReceiver.Pipeline.get' (possibly because of nullability attributes).

Check warning on line 212 in src/Wolverine/Runtime/WorkerQueues/DurableReceiver.cs

View workflow job for this annotation

GitHub Actions / test

Nullability of reference types in return type of 'IHandlerPipeline? DurableReceiver.Pipeline.get' doesn't match implicitly implemented member 'IHandlerPipeline IReceiver.Pipeline.get' (possibly because of nullability attributes).

Check warning on line 212 in src/Wolverine/Runtime/WorkerQueues/DurableReceiver.cs

View workflow job for this annotation

GitHub Actions / test

Nullability of reference types in return type of 'IHandlerPipeline? DurableReceiver.Pipeline.get' doesn't match implicitly implemented member 'IHandlerPipeline IReceiver.Pipeline.get' (possibly because of nullability attributes).

Check warning on line 212 in src/Wolverine/Runtime/WorkerQueues/DurableReceiver.cs

View workflow job for this annotation

GitHub Actions / test

Nullability of reference types in return type of 'IHandlerPipeline? DurableReceiver.Pipeline.get' doesn't match implicitly implemented member 'IHandlerPipeline IReceiver.Pipeline.get' (possibly because of nullability attributes).

Check warning on line 212 in src/Wolverine/Runtime/WorkerQueues/DurableReceiver.cs

View workflow job for this annotation

GitHub Actions / test

Nullability of reference types in return type of 'IHandlerPipeline? DurableReceiver.Pipeline.get' doesn't match implicitly implemented member 'IHandlerPipeline IReceiver.Pipeline.get' (possibly because of nullability attributes).

Check warning on line 212 in src/Wolverine/Runtime/WorkerQueues/DurableReceiver.cs

View workflow job for this annotation

GitHub Actions / test

Nullability of reference types in return type of 'IHandlerPipeline? DurableReceiver.Pipeline.get' doesn't match implicitly implemented member 'IHandlerPipeline IReceiver.Pipeline.get' (possibly because of nullability attributes).

Check warning on line 212 in src/Wolverine/Runtime/WorkerQueues/DurableReceiver.cs

View workflow job for this annotation

GitHub Actions / test

Nullability of reference types in return type of 'IHandlerPipeline? DurableReceiver.Pipeline.get' doesn't match implicitly implemented member 'IHandlerPipeline IReceiver.Pipeline.get' (possibly because of nullability attributes).

public Uri Uri { get; set; }

Expand Down Expand Up @@ -313,6 +313,7 @@
public Task MoveToErrorsAsync(Envelope envelope, Exception exception)
{
envelope.Failure = exception;
DeadLetterQueueConstants.StampFailureMetadata(envelope, exception);

return _moveToErrors.PostAsync(envelope);
}
Expand Down
11 changes: 11 additions & 0 deletions src/Wolverine/Transports/DeadLetterQueueConstants.cs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,17 @@ namespace Wolverine.Transports;

public static class DeadLetterQueueConstants
{
/// <summary>
/// Stamps the envelope headers with standard failure metadata from the given exception.
/// </summary>
public static void StampFailureMetadata(Envelope envelope, Exception exception)
{
envelope.Headers[ExceptionTypeHeader] = exception.GetType().FullName ?? "Unknown";
envelope.Headers[ExceptionMessageHeader] = exception.Message;
envelope.Headers[ExceptionStackHeader] = exception.StackTrace ?? "";
envelope.Headers[FailedAtHeader] = DateTimeOffset.UtcNow.ToUnixTimeMilliseconds().ToString();
}

/// <summary>
/// The default queue/topic name used for dead letter queues across all transports.
/// </summary>
Expand Down
Loading