Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
97 changes: 97 additions & 0 deletions src/Transports/Kafka/Wolverine.Kafka.Tests/DeadLetterQueueTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -168,6 +168,103 @@ public async Task DisposeAsync()
}
}

public class BufferedDeadLetterQueueTests : IAsyncLifetime
{
private IHost _host = null!;
private readonly string _topicName;
private readonly string _dlqTopicName;

public BufferedDeadLetterQueueTests()
{
_topicName = $"dlq-buffered-test-{Guid.NewGuid():N}";
_dlqTopicName = $"dlq-buffered-verify-{Guid.NewGuid():N}";
}

public async Task InitializeAsync()
{
_host = await Host.CreateDefaultBuilder()
.UseWolverine(opts =>
{
opts.UseKafka(KafkaContainerFixture.ConnectionString)
.AutoProvision()
.DeadLetterQueueTopicName(_dlqTopicName)
.ConfigureConsumers(c => c.AutoOffsetReset = AutoOffsetReset.Earliest);

opts.ListenToKafkaTopic(_topicName)
.BufferedInMemory()
.EnableNativeDeadLetterQueue();

opts.PublishMessage<DlqTestMessage>()
.ToKafkaTopic(_topicName);

opts.Policies.OnException<AlwaysFailException>().MoveToErrorQueue();

opts.Discovery.IncludeAssembly(GetType().Assembly);

opts.Services.AddResourceSetupOnStartup();
}).StartAsync();
}

private ConsumeResult<string, byte[]>? ConsumeFromTopic(string topic, TimeSpan timeout)
{
var config = new ConsumerConfig
{
BootstrapServers = KafkaContainerFixture.ConnectionString,
GroupId = $"dlq-buffered-verify-{Guid.NewGuid():N}",
AutoOffsetReset = AutoOffsetReset.Earliest,
EnableAutoCommit = true
};

using var consumer = new ConsumerBuilder<string, byte[]>(config).Build();
consumer.Subscribe(topic);

var deadline = DateTime.UtcNow.Add(timeout);
while (DateTime.UtcNow < deadline)
{
try
{
var result = consumer.Consume(TimeSpan.FromSeconds(5));
if (result != null) return result;
}
catch (ConsumeException)
{
// Retry on transient errors
}
}

return null;
}

[Fact]
public async Task buffered_failed_message_lands_on_dlq_topic_not_source_topic()
{
// Regression test: the DLQ sender used to read envelope.TopicName (set to the
// source topic on inbound messages) and produce failed messages back to the
// source topic instead of the DLQ topic — so nothing ever reached the DLQ
// under BufferedInMemory mode. See InlineKafkaSender.fixedDestination.

await _host.TrackActivity()
.IncludeExternalTransports()
.DoNotAssertOnExceptionsDetected()
.Timeout(30.Seconds())
.ExecuteAndWaitAsync(ctx => ctx.PublishAsync(new DlqTestMessage("buffered-fail-me")));

var result = ConsumeFromTopic(_dlqTopicName, 30.Seconds());
result.ShouldNotBeNull("Expected failed message to land on the DLQ Kafka topic under BufferedInMemory mode");
result.Topic.ShouldBe(_dlqTopicName);
result.Message.Value.ShouldNotBeNull();
}

public async Task DisposeAsync()
{
if (_host != null)
{
await _host.StopAsync();
_host.Dispose();
}
}
}

public record DlqTestMessage(string Id);

public class AlwaysFailException : Exception
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,12 @@ public class InlineKafkaSender : ISender, IDisposable
{
private readonly KafkaTopic _topic;
private readonly IProducer<string, byte[]> _producer;
private readonly bool _fixedDestination;

public InlineKafkaSender(KafkaTopic topic)
public InlineKafkaSender(KafkaTopic topic, bool fixedDestination = false)
{
_topic = topic;
_fixedDestination = fixedDestination;
Destination = topic.Uri;
Config = topic.GetEffectiveProducerConfig();
_producer = _topic.Parent.CreateProducer(Config);
Expand All @@ -37,7 +39,8 @@ public async ValueTask SendAsync(Envelope envelope)
{
var message = await _topic.EnvelopeMapper!.CreateMessage(envelope);

await _producer.ProduceAsync(envelope.TopicName ?? _topic.TopicName, message);
var topicName = _fixedDestination ? _topic.TopicName : envelope.TopicName ?? _topic.TopicName;
await _producer.ProduceAsync(topicName, message);
_producer.Flush();
}

Expand Down
2 changes: 1 addition & 1 deletion src/Transports/Kafka/Wolverine.Kafka/KafkaTopic.cs
Original file line number Diff line number Diff line change
Expand Up @@ -129,7 +129,7 @@ public override bool TryBuildDeadLetterSender(IWolverineRuntime runtime, out ISe
{
var dlqTopic = Parent.Topics[Parent.DeadLetterQueueTopicName];
dlqTopic.EnvelopeMapper ??= dlqTopic.BuildMapper(runtime);
deadLetterSender = new InlineKafkaSender(dlqTopic);
deadLetterSender = new InlineKafkaSender(dlqTopic, fixedDestination: true);
return true;
}

Expand Down
2 changes: 1 addition & 1 deletion src/Transports/Kafka/Wolverine.Kafka/KafkaTopicGroup.cs
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ public override bool TryBuildDeadLetterSender(IWolverineRuntime runtime, out ISe
{
var dlqTopic = Parent.Topics[Parent.DeadLetterQueueTopicName];
dlqTopic.EnvelopeMapper ??= dlqTopic.BuildMapper(runtime);
deadLetterSender = new InlineKafkaSender(dlqTopic);
deadLetterSender = new InlineKafkaSender(dlqTopic, fixedDestination: true);
return true;
}

Expand Down
Loading