diff --git a/src/Transports/Kafka/Wolverine.Kafka.Tests/DeadLetterQueueTests.cs b/src/Transports/Kafka/Wolverine.Kafka.Tests/DeadLetterQueueTests.cs index a3e573781..25d5dd344 100644 --- a/src/Transports/Kafka/Wolverine.Kafka.Tests/DeadLetterQueueTests.cs +++ b/src/Transports/Kafka/Wolverine.Kafka.Tests/DeadLetterQueueTests.cs @@ -168,6 +168,103 @@ public async Task DisposeAsync() } } +public class BufferedDeadLetterQueueTests : IAsyncLifetime +{ + private IHost _host = null!; + private readonly string _topicName; + private readonly string _dlqTopicName; + + public BufferedDeadLetterQueueTests() + { + _topicName = $"dlq-buffered-test-{Guid.NewGuid():N}"; + _dlqTopicName = $"dlq-buffered-verify-{Guid.NewGuid():N}"; + } + + public async Task InitializeAsync() + { + _host = await Host.CreateDefaultBuilder() + .UseWolverine(opts => + { + opts.UseKafka(KafkaContainerFixture.ConnectionString) + .AutoProvision() + .DeadLetterQueueTopicName(_dlqTopicName) + .ConfigureConsumers(c => c.AutoOffsetReset = AutoOffsetReset.Earliest); + + opts.ListenToKafkaTopic(_topicName) + .BufferedInMemory() + .EnableNativeDeadLetterQueue(); + + opts.PublishMessage() + .ToKafkaTopic(_topicName); + + opts.Policies.OnException().MoveToErrorQueue(); + + opts.Discovery.IncludeAssembly(GetType().Assembly); + + opts.Services.AddResourceSetupOnStartup(); + }).StartAsync(); + } + + private ConsumeResult? ConsumeFromTopic(string topic, TimeSpan timeout) + { + var config = new ConsumerConfig + { + BootstrapServers = KafkaContainerFixture.ConnectionString, + GroupId = $"dlq-buffered-verify-{Guid.NewGuid():N}", + AutoOffsetReset = AutoOffsetReset.Earliest, + EnableAutoCommit = true + }; + + using var consumer = new ConsumerBuilder(config).Build(); + consumer.Subscribe(topic); + + var deadline = DateTime.UtcNow.Add(timeout); + while (DateTime.UtcNow < deadline) + { + try + { + var result = consumer.Consume(TimeSpan.FromSeconds(5)); + if (result != null) return result; + } + catch (ConsumeException) + { + // Retry on transient errors + } + } + + return null; + } + + [Fact] + public async Task buffered_failed_message_lands_on_dlq_topic_not_source_topic() + { + // Regression test: the DLQ sender used to read envelope.TopicName (set to the + // source topic on inbound messages) and produce failed messages back to the + // source topic instead of the DLQ topic — so nothing ever reached the DLQ + // under BufferedInMemory mode. See InlineKafkaSender.fixedDestination. + + await _host.TrackActivity() + .IncludeExternalTransports() + .DoNotAssertOnExceptionsDetected() + .Timeout(30.Seconds()) + .ExecuteAndWaitAsync(ctx => ctx.PublishAsync(new DlqTestMessage("buffered-fail-me"))); + + var result = ConsumeFromTopic(_dlqTopicName, 30.Seconds()); + result.ShouldNotBeNull("Expected failed message to land on the DLQ Kafka topic under BufferedInMemory mode"); + result.Topic.ShouldBe(_dlqTopicName); + result.Message.Value.ShouldNotBeNull(); + } + + public async Task DisposeAsync() + { + if (_host != null) + { + await _host.StopAsync(); + _host.Dispose(); + } + } +} + public record DlqTestMessage(string Id); public class AlwaysFailException : Exception diff --git a/src/Transports/Kafka/Wolverine.Kafka/Internals/InlineKafkaSender.cs b/src/Transports/Kafka/Wolverine.Kafka/Internals/InlineKafkaSender.cs index bf6525e2b..ce27e8f9a 100644 --- a/src/Transports/Kafka/Wolverine.Kafka/Internals/InlineKafkaSender.cs +++ b/src/Transports/Kafka/Wolverine.Kafka/Internals/InlineKafkaSender.cs @@ -7,10 +7,12 @@ public class InlineKafkaSender : ISender, IDisposable { private readonly KafkaTopic _topic; private readonly IProducer _producer; + private readonly bool _fixedDestination; - public InlineKafkaSender(KafkaTopic topic) + public InlineKafkaSender(KafkaTopic topic, bool fixedDestination = false) { _topic = topic; + _fixedDestination = fixedDestination; Destination = topic.Uri; Config = topic.GetEffectiveProducerConfig(); _producer = _topic.Parent.CreateProducer(Config); @@ -37,7 +39,8 @@ public async ValueTask SendAsync(Envelope envelope) { var message = await _topic.EnvelopeMapper!.CreateMessage(envelope); - await _producer.ProduceAsync(envelope.TopicName ?? _topic.TopicName, message); + var topicName = _fixedDestination ? _topic.TopicName : envelope.TopicName ?? _topic.TopicName; + await _producer.ProduceAsync(topicName, message); _producer.Flush(); } diff --git a/src/Transports/Kafka/Wolverine.Kafka/KafkaTopic.cs b/src/Transports/Kafka/Wolverine.Kafka/KafkaTopic.cs index b35eda901..ade091f64 100644 --- a/src/Transports/Kafka/Wolverine.Kafka/KafkaTopic.cs +++ b/src/Transports/Kafka/Wolverine.Kafka/KafkaTopic.cs @@ -129,7 +129,7 @@ public override bool TryBuildDeadLetterSender(IWolverineRuntime runtime, out ISe { var dlqTopic = Parent.Topics[Parent.DeadLetterQueueTopicName]; dlqTopic.EnvelopeMapper ??= dlqTopic.BuildMapper(runtime); - deadLetterSender = new InlineKafkaSender(dlqTopic); + deadLetterSender = new InlineKafkaSender(dlqTopic, fixedDestination: true); return true; } diff --git a/src/Transports/Kafka/Wolverine.Kafka/KafkaTopicGroup.cs b/src/Transports/Kafka/Wolverine.Kafka/KafkaTopicGroup.cs index 116b5029b..d46e60e23 100644 --- a/src/Transports/Kafka/Wolverine.Kafka/KafkaTopicGroup.cs +++ b/src/Transports/Kafka/Wolverine.Kafka/KafkaTopicGroup.cs @@ -64,7 +64,7 @@ public override bool TryBuildDeadLetterSender(IWolverineRuntime runtime, out ISe { var dlqTopic = Parent.Topics[Parent.DeadLetterQueueTopicName]; dlqTopic.EnvelopeMapper ??= dlqTopic.BuildMapper(runtime); - deadLetterSender = new InlineKafkaSender(dlqTopic); + deadLetterSender = new InlineKafkaSender(dlqTopic, fixedDestination: true); return true; }