Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
47 changes: 47 additions & 0 deletions docs/guide/messaging/transports/kafka.md
Original file line number Diff line number Diff line change
Expand Up @@ -264,6 +264,53 @@ using var host = await Host.CreateDefaultBuilder()
Note that the `Uri` scheme within Wolverine for any endpoints from a "named" Kafka broker is the name that you supply
for the broker. So in the example above, you might see `Uri` values for `emea://colors` or `americas://red`.

## Native Dead Letter Queue

Wolverine supports routing failed Kafka messages to a designated dead letter queue (DLQ) Kafka topic instead of relying on database-backed dead letter storage. This is opt-in on a per-listener basis.

### Enabling the Dead Letter Queue

To enable the native DLQ for a Kafka listener, use the `EnableNativeDeadLetterQueue()` method:

```cs
using var host = await Host.CreateDefaultBuilder()
.UseWolverine(opts =>
{
opts.UseKafka("localhost:9092").AutoProvision();

opts.ListenToKafkaTopic("incoming")
.ProcessInline()
.EnableNativeDeadLetterQueue();
}).StartAsync();
```

When a message fails all retry attempts, it will be produced to the DLQ Kafka topic (default: `wolverine-dead-letter-queue`) with the original message body and Wolverine envelope headers intact. The following exception metadata headers are added:

- `exception-type` - The full type name of the exception
- `exception-message` - The exception message
- `exception-stack` - The exception stack trace
- `failed-at` - Unix timestamp in milliseconds when the failure occurred

### Configuring the DLQ Topic Name

The default DLQ topic name is `wolverine-dead-letter-queue`. You can customize this at the transport level:

```cs
using var host = await Host.CreateDefaultBuilder()
.UseWolverine(opts =>
{
opts.UseKafka("localhost:9092")
.AutoProvision()
.DeadLetterQueueTopicName("my-app-dead-letters");

opts.ListenToKafkaTopic("incoming")
.ProcessInline()
.EnableNativeDeadLetterQueue();
}).StartAsync();
```

The DLQ topic is shared across all listeners on the same Kafka transport that have native DLQ enabled. When `AutoProvision` is enabled, the DLQ topic will be automatically created.

## Disabling all Sending

Hey, you might have an application that only consumes Kafka messages, but there are a *few* diagnostics in Wolverine that
Expand Down
186 changes: 186 additions & 0 deletions src/Transports/Kafka/Wolverine.Kafka.Tests/DeadLetterQueueTests.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,186 @@
using System.Text;
using Confluent.Kafka;
using JasperFx.Core;
using JasperFx.Resources;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Hosting;
using Shouldly;
using Wolverine.Configuration;
using Wolverine.ErrorHandling;
using Wolverine.Kafka.Internals;
using Wolverine.Tracking;
using Xunit.Abstractions;

namespace Wolverine.Kafka.Tests;

public class DeadLetterQueueTests : IAsyncLifetime
{
private readonly ITestOutputHelper _output;
private IHost _host;
private readonly string _topicName;
private readonly string _dlqTopicName;

public DeadLetterQueueTests(ITestOutputHelper output)
{
_output = output;
_topicName = $"dlq-test-{Guid.NewGuid():N}";
_dlqTopicName = "wolverine-dead-letter-queue";
}

public async Task InitializeAsync()
{
_host = await Host.CreateDefaultBuilder()
.UseWolverine(opts =>
{
opts.UseKafka("localhost:9092")
.AutoProvision()
.ConfigureConsumers(c => c.AutoOffsetReset = AutoOffsetReset.Earliest);

opts.ListenToKafkaTopic(_topicName)
.ProcessInline()
.EnableNativeDeadLetterQueue();

opts.PublishMessage<DlqTestMessage>()
.ToKafkaTopic(_topicName)
.SendInline();

opts.Policies.OnException<AlwaysFailException>().MoveToErrorQueue();

opts.Discovery.IncludeAssembly(GetType().Assembly);

opts.Services.AddResourceSetupOnStartup();
}).StartAsync();
}

private ConsumeResult<string, byte[]>? ConsumeDlqMessage(TimeSpan timeout)
{
var config = new ConsumerConfig
{
BootstrapServers = "localhost:9092",
GroupId = $"dlq-verify-{Guid.NewGuid():N}",
AutoOffsetReset = AutoOffsetReset.Earliest,
EnableAutoCommit = true
};

using var consumer = new ConsumerBuilder<string, byte[]>(config).Build();
consumer.Subscribe(_dlqTopicName);

var deadline = DateTime.UtcNow.Add(timeout);
while (DateTime.UtcNow < deadline)
{
try
{
var result = consumer.Consume(TimeSpan.FromSeconds(5));
if (result != null) return result;
}
catch (ConsumeException)
{
// Retry on transient errors
}
}

return null;
}

[Fact]
public async Task failed_message_moves_to_dead_letter_queue()
{
var session = await _host.TrackActivity()
.IncludeExternalTransports()
.DoNotAssertOnExceptionsDetected()
.Timeout(30.Seconds())
.ExecuteAndWaitAsync(ctx => ctx.PublishAsync(new DlqTestMessage("fail-me")));

// Verify Wolverine tracked MovedToErrorQueue
var movedRecord = session.AllRecordsInOrder()
.FirstOrDefault(x => x.MessageEventType == MessageEventType.MovedToErrorQueue);
movedRecord.ShouldNotBeNull("Expected message to be moved to error queue");

// Consume from DLQ topic and verify message arrived
var result = ConsumeDlqMessage(30.Seconds());
result.ShouldNotBeNull("Expected message on DLQ Kafka topic");
result.Message.ShouldNotBeNull();
result.Message.Value.ShouldNotBeNull();
}

[Fact]
public async Task dead_letter_message_has_exception_headers()
{
await _host.TrackActivity()
.IncludeExternalTransports()
.DoNotAssertOnExceptionsDetected()
.Timeout(60.Seconds())
.WaitForMessageToBeReceivedAt<DlqTestMessage>(_host)
.PublishMessageAndWaitAsync(new DlqTestMessage("fail-headers"));

var result = ConsumeDlqMessage(30.Seconds());
result.ShouldNotBeNull("Expected message on DLQ Kafka topic");

var headers = result.Message.Headers;
headers.ShouldNotBeNull();

GetHeaderValue(headers, "exception-type").ShouldContain("AlwaysFailException");
GetHeaderValue(headers, "exception-message").ShouldNotBeNullOrEmpty();
GetHeaderValue(headers, "exception-stack").ShouldNotBeNullOrEmpty();
GetHeaderValue(headers, "failed-at").ShouldNotBeNullOrEmpty();
}

[Fact]
public void native_dlq_disabled_by_default()
{
var transport = new KafkaTransport();
var topic = new KafkaTopic(transport, "test-topic", EndpointRole.Application);
topic.NativeDeadLetterQueueEnabled.ShouldBeFalse();
}

[Fact]
public void default_dlq_topic_name()
{
var transport = new KafkaTransport();
transport.DeadLetterQueueTopicName.ShouldBe("wolverine-dead-letter-queue");
}

[Fact]
public void custom_dlq_topic_name()
{
var transport = new KafkaTransport();
transport.DeadLetterQueueTopicName = "my-custom-dlq";
transport.DeadLetterQueueTopicName.ShouldBe("my-custom-dlq");
}

private static string GetHeaderValue(Headers headers, string key)
{
if (headers.TryGetLastBytes(key, out var bytes))
{
return Encoding.UTF8.GetString(bytes);
}

return string.Empty;
}

public async Task DisposeAsync()
{
if (_host != null)
{
await _host.StopAsync();
_host.Dispose();
}
}
}

public record DlqTestMessage(string Id);

public class AlwaysFailException : Exception
{
public AlwaysFailException(string message) : base(message)
{
}
}

public static class DlqTestMessageHandler
{
public static void Handle(DlqTestMessage message)
{
throw new AlwaysFailException($"Intentional failure for DLQ testing: {message.Id}");
}
}
53 changes: 53 additions & 0 deletions src/Transports/Kafka/Wolverine.Kafka.Tests/dlq_compliance.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
using Confluent.Kafka;
using JasperFx.Resources;
using Microsoft.Extensions.DependencyInjection;
using Wolverine.ComplianceTests.Compliance;

namespace Wolverine.Kafka.Tests;

public class BufferedComplianceWithDlqFixture : TransportComplianceFixture, IAsyncLifetime
{
public BufferedComplianceWithDlqFixture() : base(new Uri("kafka://topic/receiver"), 120)
{
}

public async Task InitializeAsync()
{
var receiverTopic = "buffered.dlq.receiver";
var senderTopic = "buffered.dlq.sender";

OutboundAddress = new Uri("kafka://topic/" + receiverTopic);

await ReceiverIs(opts =>
{
opts.UseKafka("localhost:9092").AutoProvision();

opts.ListenToKafkaTopic(receiverTopic)
.Named("receiver")
.BufferedInMemory()
.EnableNativeDeadLetterQueue();

opts.Services.AddResourceSetupOnStartup();
});

await SenderIs(opts =>
{
opts.UseKafka("localhost:9092")
.AutoProvision()
.ConfigureConsumers(x => x.EnableAutoCommit = false);

opts.ListenToKafkaTopic(senderTopic);

opts.PublishAllMessages().ToKafkaTopic(receiverTopic).BufferedInMemory();

opts.Services.AddResourceSetupOnStartup();
});
}

public new Task DisposeAsync()
{
return Task.CompletedTask;
}
}

public class BufferedSendingAndReceivingWithDlqCompliance : TransportCompliance<BufferedComplianceWithDlqFixture>;
52 changes: 51 additions & 1 deletion src/Transports/Kafka/Wolverine.Kafka/Internals/KafkaListener.cs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
using System.Text;
using Confluent.Kafka;
using JasperFx.Core;
using Microsoft.Extensions.Logging;
Expand All @@ -7,19 +8,23 @@

namespace Wolverine.Kafka.Internals;

public class KafkaListener : IListener, IDisposable
public class KafkaListener : IListener, IDisposable, ISupportDeadLetterQueue
{
private readonly KafkaTopic _endpoint;
private readonly IConsumer<string, byte[]> _consumer;
private CancellationTokenSource _cancellation = new();
private readonly Task _runner;
private readonly IReceiver _receiver;
private readonly string? _messageTypeName;
private readonly QualityOfService _qualityOfService;
private readonly ILogger _logger;

public KafkaListener(KafkaTopic topic, ConsumerConfig config,
IConsumer<string, byte[]> consumer, IReceiver receiver,
ILogger<KafkaListener> logger)
{
_endpoint = topic;
_logger = logger;
Address = topic.Uri;
_consumer = consumer;
var mapper = topic.EnvelopeMapper;
Expand Down Expand Up @@ -137,6 +142,51 @@ public async ValueTask StopAsync()
await _runner;
}

public bool NativeDeadLetterQueueEnabled => _endpoint.NativeDeadLetterQueueEnabled;

public async Task MoveToErrorsAsync(Envelope envelope, Exception exception)
{
var transport = _endpoint.Parent;
var dlqTopicName = transport.DeadLetterQueueTopicName;

try
{
var message = await _endpoint.EnvelopeMapper!.CreateMessage(envelope);

message.Headers ??= new Headers();
message.Headers.Add("exception-type", Encoding.UTF8.GetBytes(exception.GetType().FullName ?? "Unknown"));
message.Headers.Add("exception-message", Encoding.UTF8.GetBytes(exception.Message));
message.Headers.Add("exception-stack", Encoding.UTF8.GetBytes(exception.StackTrace ?? ""));
message.Headers.Add("failed-at", Encoding.UTF8.GetBytes(DateTimeOffset.UtcNow.ToUnixTimeMilliseconds().ToString()));

using var producer = transport.CreateProducer(_endpoint.GetEffectiveProducerConfig());
await producer.ProduceAsync(dlqTopicName, message);
producer.Flush();

_logger.LogInformation(
"Moved envelope {EnvelopeId} to dead letter queue topic {DlqTopic}. Exception: {ExceptionType}: {ExceptionMessage}",
envelope.Id, dlqTopicName, exception.GetType().Name, exception.Message);

try
{
_consumer.Commit();
}
catch (Exception commitEx)
{
_logger.LogWarning(commitEx,
"Error committing offset after moving envelope {EnvelopeId} to dead letter queue",
envelope.Id);
}
}
catch (Exception ex)
{
_logger.LogError(ex,
"Failed to move envelope {EnvelopeId} to dead letter queue topic {DlqTopic}",
envelope.Id, dlqTopicName);
throw;
}
}

public void Dispose()
{
_consumer.SafeDispose();
Expand Down
Loading
Loading