diff --git a/docs/guide/messaging/transports/kafka.md b/docs/guide/messaging/transports/kafka.md index b64f485be..70fccb92c 100644 --- a/docs/guide/messaging/transports/kafka.md +++ b/docs/guide/messaging/transports/kafka.md @@ -264,6 +264,53 @@ using var host = await Host.CreateDefaultBuilder() Note that the `Uri` scheme within Wolverine for any endpoints from a "named" Kafka broker is the name that you supply for the broker. So in the example above, you might see `Uri` values for `emea://colors` or `americas://red`. +## Native Dead Letter Queue + +Wolverine supports routing failed Kafka messages to a designated dead letter queue (DLQ) Kafka topic instead of relying on database-backed dead letter storage. This is opt-in on a per-listener basis. + +### Enabling the Dead Letter Queue + +To enable the native DLQ for a Kafka listener, use the `EnableNativeDeadLetterQueue()` method: + +```cs +using var host = await Host.CreateDefaultBuilder() + .UseWolverine(opts => + { + opts.UseKafka("localhost:9092").AutoProvision(); + + opts.ListenToKafkaTopic("incoming") + .ProcessInline() + .EnableNativeDeadLetterQueue(); + }).StartAsync(); +``` + +When a message fails all retry attempts, it will be produced to the DLQ Kafka topic (default: `wolverine-dead-letter-queue`) with the original message body and Wolverine envelope headers intact. The following exception metadata headers are added: + +- `exception-type` - The full type name of the exception +- `exception-message` - The exception message +- `exception-stack` - The exception stack trace +- `failed-at` - Unix timestamp in milliseconds when the failure occurred + +### Configuring the DLQ Topic Name + +The default DLQ topic name is `wolverine-dead-letter-queue`. 
You can customize this at the transport level:
+
+```cs
+using var host = await Host.CreateDefaultBuilder()
+    .UseWolverine(opts =>
+    {
+        opts.UseKafka("localhost:9092")
+            .AutoProvision()
+            .DeadLetterQueueTopicName("my-app-dead-letters");
+
+        opts.ListenToKafkaTopic("incoming")
+            .ProcessInline()
+            .EnableNativeDeadLetterQueue();
+    }).StartAsync();
+```
+
+The DLQ topic is shared across all listeners on the same Kafka transport that have native DLQ enabled. When `AutoProvision` is enabled, the DLQ topic will be automatically created.
+
 ## Disabling all Sending
 
 Hey, you might have an application that only consumes Kafka messages, but there are a *few* diagnostics in Wolverine that
diff --git a/src/Transports/Kafka/Wolverine.Kafka.Tests/DeadLetterQueueTests.cs b/src/Transports/Kafka/Wolverine.Kafka.Tests/DeadLetterQueueTests.cs
new file mode 100644
index 000000000..abf5fec9c
--- /dev/null
+++ b/src/Transports/Kafka/Wolverine.Kafka.Tests/DeadLetterQueueTests.cs
@@ -0,0 +1,204 @@
+using System.Text;
+using Confluent.Kafka;
+using JasperFx.Core;
+using JasperFx.Resources;
+using Microsoft.Extensions.DependencyInjection;
+using Microsoft.Extensions.Hosting;
+using Shouldly;
+using Wolverine.Configuration;
+using Wolverine.ErrorHandling;
+using Wolverine.Kafka.Internals;
+using Wolverine.Tracking;
+using Xunit.Abstractions;
+
+namespace Wolverine.Kafka.Tests;
+
+/// <summary>
+/// Tests for the opt-in native Kafka dead letter queue (DLQ). The integration tests
+/// connect to a Kafka broker at localhost:9092.
+/// </summary>
+public class DeadLetterQueueTests : IAsyncLifetime
+{
+    private readonly ITestOutputHelper _output;
+    private IHost _host = null!;
+    private readonly string _topicName;
+    private readonly string _dlqTopicName;
+
+    public DeadLetterQueueTests(ITestOutputHelper output)
+    {
+        _output = output;
+
+        // Each test instance gets its own DLQ topic. The verification consumer joins with a
+        // brand new group id and reads from the earliest offset, so on a shared topic it
+        // would return whatever dead letter an earlier test (or test run) left behind and
+        // the assertions would pass without this test's message ever arriving.
+        var suffix = Guid.NewGuid().ToString("N");
+        _topicName = $"dlq-test-{suffix}";
+        _dlqTopicName = $"dlq-test-dead-letters-{suffix}";
+    }
+
+    public async Task InitializeAsync()
+    {
+        _host = await Host.CreateDefaultBuilder()
+            .UseWolverine(opts =>
+            {
+                opts.UseKafka("localhost:9092")
+                    .AutoProvision()
+                    .DeadLetterQueueTopicName(_dlqTopicName)
+                    .ConfigureConsumers(c => c.AutoOffsetReset = AutoOffsetReset.Earliest);
+
+                opts.ListenToKafkaTopic(_topicName)
+                    .ProcessInline()
+                    .EnableNativeDeadLetterQueue();
+
+                opts.PublishMessage<DlqTestMessage>()
+                    .ToKafkaTopic(_topicName)
+                    .SendInline();
+
+                opts.Policies.OnException<AlwaysFailException>().MoveToErrorQueue();
+
+                opts.Discovery.IncludeAssembly(GetType().Assembly);
+
+                opts.Services.AddResourceSetupOnStartup();
+            }).StartAsync();
+    }
+
+    /// <summary>
+    /// Reads the first available message from this test's DLQ topic.
+    /// </summary>
+    /// <param name="timeout">Maximum total time to keep polling</param>
+    /// <returns>The consumed record, or null if nothing arrived before the timeout</returns>
+    private ConsumeResult<string, byte[]>? ConsumeDlqMessage(TimeSpan timeout)
+    {
+        var config = new ConsumerConfig
+        {
+            BootstrapServers = "localhost:9092",
+            GroupId = $"dlq-verify-{Guid.NewGuid():N}",
+            AutoOffsetReset = AutoOffsetReset.Earliest,
+            EnableAutoCommit = true
+        };
+
+        using var consumer = new ConsumerBuilder<string, byte[]>(config).Build();
+        consumer.Subscribe(_dlqTopicName);
+
+        var deadline = DateTime.UtcNow.Add(timeout);
+        while (DateTime.UtcNow < deadline)
+        {
+            try
+            {
+                var result = consumer.Consume(TimeSpan.FromSeconds(5));
+                if (result != null) return result;
+            }
+            catch (ConsumeException e)
+            {
+                // Keep polling, but leave a trace in the test output instead of hiding the error
+                _output.WriteLine($"Error consuming from {_dlqTopicName}, retrying: {e.Error.Reason}");
+            }
+        }
+
+        return null;
+    }
+
+    [Fact]
+    public async Task failed_message_moves_to_dead_letter_queue()
+    {
+        var session = await _host.TrackActivity()
+            .IncludeExternalTransports()
+            .DoNotAssertOnExceptionsDetected()
+            .Timeout(30.Seconds())
+            .ExecuteAndWaitAsync(ctx => ctx.PublishAsync(new DlqTestMessage("fail-me")));
+
+        // Verify Wolverine tracked MovedToErrorQueue
+        var movedRecord = session.AllRecordsInOrder()
+            .FirstOrDefault(x => x.MessageEventType == MessageEventType.MovedToErrorQueue);
+        movedRecord.ShouldNotBeNull("Expected message to be moved to error queue");
+
+        // Consume from DLQ topic and verify message arrived
+        var result = ConsumeDlqMessage(30.Seconds());
+        result.ShouldNotBeNull("Expected message on DLQ Kafka topic");
+        result.Message.ShouldNotBeNull();
+        result.Message.Value.ShouldNotBeNull();
+    }
+
+    [Fact]
+    public async Task dead_letter_message_has_exception_headers()
+    {
+        await _host.TrackActivity()
+            .IncludeExternalTransports()
+            .DoNotAssertOnExceptionsDetected()
+            .Timeout(60.Seconds())
+            .WaitForMessageToBeReceivedAt<DlqTestMessage>(_host)
+            .PublishMessageAndWaitAsync(new DlqTestMessage("fail-headers"));
+
+        var result = ConsumeDlqMessage(30.Seconds());
+        result.ShouldNotBeNull("Expected message on DLQ Kafka topic");
+
+        var headers = result.Message.Headers;
+        headers.ShouldNotBeNull();
+
+        GetHeaderValue(headers, "exception-type").ShouldContain("AlwaysFailException");
+        // The handler embeds the message id in the exception text, which ties this dead letter to this test
+        GetHeaderValue(headers, "exception-message").ShouldContain("fail-headers");
+        GetHeaderValue(headers, "exception-stack").ShouldNotBeNullOrEmpty();
+        GetHeaderValue(headers, "failed-at").ShouldNotBeNullOrEmpty();
+    }
+
+    [Fact]
+    public void native_dlq_disabled_by_default()
+    {
+        var transport = new KafkaTransport();
+        var topic = new KafkaTopic(transport, "test-topic", EndpointRole.Application);
+        topic.NativeDeadLetterQueueEnabled.ShouldBeFalse();
+    }
+
+    [Fact]
+    public void default_dlq_topic_name()
+    {
+        var transport = new KafkaTransport();
+        transport.DeadLetterQueueTopicName.ShouldBe("wolverine-dead-letter-queue");
+    }
+
+    [Fact]
+    public void custom_dlq_topic_name()
+    {
+        var transport = new KafkaTransport();
+        transport.DeadLetterQueueTopicName = "my-custom-dlq";
+        transport.DeadLetterQueueTopicName.ShouldBe("my-custom-dlq");
+    }
+
+    private static string GetHeaderValue(Headers headers, string key)
+    {
+        if (headers.TryGetLastBytes(key, out var bytes))
+        {
+            return Encoding.UTF8.GetString(bytes);
+        }
+
+        return string.Empty;
+    }
+
+    public async Task DisposeAsync()
+    {
+        if (_host != null)
+        {
+            await _host.StopAsync();
+            _host.Dispose();
+        }
+    }
+}
+
+public record DlqTestMessage(string Id);
+
+public class AlwaysFailException : Exception
+{
+    public AlwaysFailException(string message) : base(message)
+    {
+    }
+}
+
+public static class DlqTestMessageHandler
+{
+    public static void Handle(DlqTestMessage message)
+    {
+        throw new AlwaysFailException($"Intentional failure for DLQ testing: {message.Id}");
+    }
+}
diff --git
a/src/Transports/Kafka/Wolverine.Kafka.Tests/dlq_compliance.cs b/src/Transports/Kafka/Wolverine.Kafka.Tests/dlq_compliance.cs
new file mode 100644
index 000000000..08ee82336
--- /dev/null
+++ b/src/Transports/Kafka/Wolverine.Kafka.Tests/dlq_compliance.cs
@@ -0,0 +1,57 @@
+using Confluent.Kafka;
+using JasperFx.Resources;
+using Microsoft.Extensions.DependencyInjection;
+using Wolverine.ComplianceTests.Compliance;
+
+namespace Wolverine.Kafka.Tests;
+
+/// <summary>
+/// Compliance fixture whose receiver listens on a buffered Kafka topic with the
+/// native dead letter queue enabled.
+/// </summary>
+public class BufferedComplianceWithDlqFixture : TransportComplianceFixture, IAsyncLifetime
+{
+    public BufferedComplianceWithDlqFixture() : base(new Uri("kafka://topic/receiver"), 120)
+    {
+    }
+
+    public async Task InitializeAsync()
+    {
+        var receiverTopic = "buffered.dlq.receiver";
+        var senderTopic = "buffered.dlq.sender";
+
+        OutboundAddress = new Uri("kafka://topic/" + receiverTopic);
+
+        await ReceiverIs(opts =>
+        {
+            opts.UseKafka("localhost:9092").AutoProvision();
+
+            opts.ListenToKafkaTopic(receiverTopic)
+                .Named("receiver")
+                .BufferedInMemory()
+                .EnableNativeDeadLetterQueue();
+
+            opts.Services.AddResourceSetupOnStartup();
+        });
+
+        await SenderIs(opts =>
+        {
+            opts.UseKafka("localhost:9092")
+                .AutoProvision()
+                .ConfigureConsumers(x => x.EnableAutoCommit = false);
+
+            opts.ListenToKafkaTopic(senderTopic);
+
+            opts.PublishAllMessages().ToKafkaTopic(receiverTopic).BufferedInMemory();
+
+            opts.Services.AddResourceSetupOnStartup();
+        });
+    }
+
+    public new Task DisposeAsync()
+    {
+        return Task.CompletedTask;
+    }
+}
+
+public class BufferedSendingAndReceivingWithDlqCompliance : TransportCompliance<BufferedComplianceWithDlqFixture>;
diff --git a/src/Transports/Kafka/Wolverine.Kafka/Internals/KafkaListener.cs b/src/Transports/Kafka/Wolverine.Kafka/Internals/KafkaListener.cs
index 440b3fd89..7c578d4b4 100644
--- a/src/Transports/Kafka/Wolverine.Kafka/Internals/KafkaListener.cs
+++ b/src/Transports/Kafka/Wolverine.Kafka/Internals/KafkaListener.cs
@@ -1,3 +1,4 @@
+using System.Text;
 using Confluent.Kafka;
 using JasperFx.Core;
 using Microsoft.Extensions.Logging;
@@ -7,19 +8,27 @@
 
 namespace Wolverine.Kafka.Internals;
 
-public class KafkaListener : IListener, IDisposable
+public class KafkaListener : IListener, IDisposable, ISupportDeadLetterQueue
 {
+    private readonly KafkaTopic _endpoint;
     private readonly IConsumer _consumer;
     private CancellationTokenSource _cancellation = new();
     private readonly Task _runner;
     private readonly IReceiver _receiver;
     private readonly string? _messageTypeName;
     private readonly QualityOfService _qualityOfService;
+    private readonly ILogger _logger;
+
+    // Producer for the dead letter topic: built on first use, then reused. Guarded by _deadLetterLock
+    private readonly object _deadLetterLock = new();
+    private IProducer<string, byte[]>? _deadLetterProducer;
 
     public KafkaListener(KafkaTopic topic, ConsumerConfig config,
         IConsumer consumer, IReceiver receiver,
         ILogger logger)
     {
+        _endpoint = topic;
+        _logger = logger;
         Address = topic.Uri;
         _consumer = consumer;
         var mapper = topic.EnvelopeMapper;
@@ -137,6 +146,77 @@ public async ValueTask StopAsync()
         await _runner;
     }
 
+    /// <summary>
+    /// True when the Kafka topic behind this listener has opted into the native dead letter queue
+    /// </summary>
+    public bool NativeDeadLetterQueueEnabled => _endpoint.NativeDeadLetterQueueEnabled;
+
+    /// <summary>
+    /// Produces the failed envelope to the transport's dead letter queue topic with the
+    /// exception-type, exception-message, exception-stack and failed-at headers added, then
+    /// calls Commit() on the consumer. A failure to produce is logged and rethrown; a failure
+    /// to commit afterwards is only logged.
+    /// </summary>
+    /// <param name="envelope">The envelope that failed</param>
+    /// <param name="exception">The exception recorded in the dead letter headers</param>
+    public async Task MoveToErrorsAsync(Envelope envelope, Exception exception)
+    {
+        var dlqTopicName = _endpoint.Parent.DeadLetterQueueTopicName;
+
+        try
+        {
+            var message = await _endpoint.EnvelopeMapper!.CreateMessage(envelope);
+
+            message.Headers ??= new Headers();
+            message.Headers.Add("exception-type", Encoding.UTF8.GetBytes(exception.GetType().FullName ?? "Unknown"));
+            message.Headers.Add("exception-message", Encoding.UTF8.GetBytes(exception.Message));
+            message.Headers.Add("exception-stack", Encoding.UTF8.GetBytes(exception.StackTrace ?? ""));
+
+            // failed-at is machine-readable, so format it culture-independently
+            var failedAt = DateTimeOffset.UtcNow.ToUnixTimeMilliseconds()
+                .ToString(System.Globalization.CultureInfo.InvariantCulture);
+            message.Headers.Add("failed-at", Encoding.UTF8.GetBytes(failedAt));
+
+            // The task returned by ProduceAsync completes with the delivery report, so there is
+            // nothing left to Flush() afterwards
+            await GetOrCreateDeadLetterProducer().ProduceAsync(dlqTopicName, message);
+
+            _logger.LogInformation(
+                "Moved envelope {EnvelopeId} to dead letter queue topic {DlqTopic}. Exception: {ExceptionType}: {ExceptionMessage}",
+                envelope.Id, dlqTopicName, exception.GetType().Name, exception.Message);
+
+            try
+            {
+                _consumer.Commit();
+            }
+            catch (Exception commitEx)
+            {
+                _logger.LogWarning(commitEx,
+                    "Error committing offset after moving envelope {EnvelopeId} to dead letter queue",
+                    envelope.Id);
+            }
+        }
+        catch (Exception ex)
+        {
+            _logger.LogError(ex,
+                "Failed to move envelope {EnvelopeId} to dead letter queue topic {DlqTopic}",
+                envelope.Id, dlqTopicName);
+            throw;
+        }
+    }
+
+    // Returns the producer used for dead letters, building it on first use. The producer is
+    // kept for the lifetime of the listener instead of being created per failed message.
+    private IProducer<string, byte[]> GetOrCreateDeadLetterProducer()
+    {
+        lock (_deadLetterLock)
+        {
+            return _deadLetterProducer ??=
+                _endpoint.Parent.CreateProducer(_endpoint.GetEffectiveProducerConfig());
+        }
+    }
+
     public void Dispose()
     {
         _consumer.SafeDispose();
+        _deadLetterProducer?.SafeDispose();
diff --git a/src/Transports/Kafka/Wolverine.Kafka/Internals/KafkaTransport.cs b/src/Transports/Kafka/Wolverine.Kafka/Internals/KafkaTransport.cs
index 12915ccff..35d93645c 100644
--- a/src/Transports/Kafka/Wolverine.Kafka/Internals/KafkaTransport.cs
+++ b/src/Transports/Kafka/Wolverine.Kafka/Internals/KafkaTransport.cs
@@ -37,6 +37,12 @@ public KafkaTransport(string protocol) : base(protocol, "Kafka Topics", ["kafka"
         Topics = new Cache(topicName => new KafkaTopic(this, topicName, EndpointRole.Application));
     }
 
+    /// <summary>
+    /// The Kafka topic name used for native dead letter queue messages.
+    /// Default is "wolverine-dead-letter-queue".
+    /// </summary>
+    public string DeadLetterQueueTopicName { get; set; } = "wolverine-dead-letter-queue";
+
     public KafkaUsage Usage { get; set; } = KafkaUsage.ProduceAndConsume;
 
     public override Uri ResourceUri
@@ -80,7 +86,19 @@ protected override void tryBuildSystemEndpoints(IWolverineRuntime runtime)
 
     public override ValueTask ConnectAsync(IWolverineRuntime runtime)
     {
-        foreach (var endpoint in Topics) endpoint.Compile(runtime);
+        var needsDlqTopic = false;
+        foreach (var endpoint in Topics)
+        {
+            endpoint.Compile(runtime);
+            if (endpoint.NativeDeadLetterQueueEnabled) needsDlqTopic = true;
+        }
+
+        // Ensure the DLQ topic is registered and compiled so it gets auto-provisioned
+        if (needsDlqTopic)
+        {
+            var dlqTopic = Topics[DeadLetterQueueTopicName];
+            dlqTopic.Compile(runtime);
+        }
 
         return ValueTask.CompletedTask;
     }
diff --git a/src/Transports/Kafka/Wolverine.Kafka/KafkaListenerConfiguration.cs b/src/Transports/Kafka/Wolverine.Kafka/KafkaListenerConfiguration.cs
index fb3dad49c..15aa8e258 100644
--- a/src/Transports/Kafka/Wolverine.Kafka/KafkaListenerConfiguration.cs
+++ b/src/Transports/Kafka/Wolverine.Kafka/KafkaListenerConfiguration.cs
@@ -79,6 +79,31 @@ public KafkaListenerConfiguration ReceiveRawJson(Type messageType, JsonSerialize
         return UseInterop((e, _) => new JsonOnlyMapper(e, options ?? new()));
     }
 
+    /// <summary>
+    /// Enable native dead letter queue support for this Kafka listener.
+    /// Failed messages will be produced to the DLQ Kafka topic
+    /// (default: "wolverine-dead-letter-queue") with exception details
+    /// in Kafka headers.
+    /// </summary>
+    /// <returns></returns>
+    public KafkaListenerConfiguration EnableNativeDeadLetterQueue()
+    {
+        add(topic => topic.NativeDeadLetterQueueEnabled = true);
+        return this;
+    }
+
+    /// <summary>
+    /// Disable native dead letter queue support for this Kafka listener.
+    /// Failed messages will use Wolverine's default dead letter handling
+    /// (database persistence).
+    /// </summary>
+    /// <returns></returns>
+    public KafkaListenerConfiguration DisableNativeDeadLetterQueue()
+    {
+        add(topic => topic.NativeDeadLetterQueueEnabled = false);
+        return this;
+    }
+
     ///
     /// Configure the consumer config for only this topic. This overrides the default
     /// settings at the transport level. This is not combinatorial with the parent configuration
diff --git a/src/Transports/Kafka/Wolverine.Kafka/KafkaTopic.cs b/src/Transports/Kafka/Wolverine.Kafka/KafkaTopic.cs
index 9d944f188..87e2edaa0 100644
--- a/src/Transports/Kafka/Wolverine.Kafka/KafkaTopic.cs
+++ b/src/Transports/Kafka/Wolverine.Kafka/KafkaTopic.cs
@@ -50,6 +50,14 @@ public override bool AutoStartSendingAgent()
     ///
     public ProducerConfig? ProducerConfig { get; internal set; }
 
+    /// <summary>
+    /// Enable native dead letter queue support for this endpoint.
+    /// When enabled, failed messages will be produced to the Kafka DLQ topic
+    /// instead of being moved to database-backed dead letter storage.
+    /// Default is false (opt-in).
+    /// </summary>
+    public bool NativeDeadLetterQueueEnabled { get; set; }
+
     public static string TopicNameForUri(Uri uri)
     {
         return uri.Segments.Last().Trim('/');
@@ -101,6 +109,26 @@ protected override ISender CreateSender(IWolverineRuntime runtime)
             runtime.LoggerFactory.CreateLogger());
     }
 
+    /// <summary>
+    /// When the native dead letter queue is enabled for this topic, builds an inline sender
+    /// that targets the transport's dead letter queue topic, creating that topic's envelope
+    /// mapper first if it does not have one yet.
+    /// </summary>
+    /// <returns>True and the sender when native DLQ is enabled, otherwise false and a null sender</returns>
+    public override bool TryBuildDeadLetterSender(IWolverineRuntime runtime, out ISender? deadLetterSender)
+    {
+        if (NativeDeadLetterQueueEnabled)
+        {
+            var dlqTopic = Parent.Topics[Parent.DeadLetterQueueTopicName];
+            dlqTopic.EnvelopeMapper ??= dlqTopic.BuildMapper(runtime);
+            deadLetterSender = new InlineKafkaSender(dlqTopic);
+            return true;
+        }
+
+        deadLetterSender = default;
+        return false;
+    }
+
     public async ValueTask CheckAsync()
     {
         // Can't do anything about this
diff --git a/src/Transports/Kafka/Wolverine.Kafka/KafkaTransportExpression.cs b/src/Transports/Kafka/Wolverine.Kafka/KafkaTransportExpression.cs
index e2941ac44..8ae0e8f1c 100644
--- a/src/Transports/Kafka/Wolverine.Kafka/KafkaTransportExpression.cs
+++ b/src/Transports/Kafka/Wolverine.Kafka/KafkaTransportExpression.cs
@@ -106,6 +106,24 @@ public KafkaTransportExpression DeleteExistingTopicsOnStartup()
         return this;
     }
 
+    /// <summary>
+    /// Configure the Kafka topic name used for native dead letter queue messages.
+    /// Default is "wolverine-dead-letter-queue".
+    /// </summary>
+    /// <param name="topicName">The Kafka topic name for the dead letter queue</param>
+    /// <returns></returns>
+    /// <exception cref="ArgumentException">The topic name is null, empty or whitespace</exception>
+    public KafkaTransportExpression DeadLetterQueueTopicName(string topicName)
+    {
+        if (string.IsNullOrWhiteSpace(topicName))
+        {
+            throw new ArgumentException("The dead letter queue topic name cannot be null or empty", nameof(topicName));
+        }
+
+        _transport.DeadLetterQueueTopicName = topicName;
+        return this;
+    }
+
     protected override KafkaListenerConfiguration createListenerExpression(KafkaTopic listenerEndpoint)
     {
         return new KafkaListenerConfiguration(listenerEndpoint);