Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
60 changes: 60 additions & 0 deletions docs/guide/messaging/transports/kafka.md
Original file line number Diff line number Diff line change
Expand Up @@ -401,6 +401,66 @@ using var host = await Host.CreateDefaultBuilder()

The DLQ topic is shared across all listeners on the same Kafka transport that have native DLQ enabled. When `AutoProvision` is enabled, the DLQ topic will be automatically created.

## Multi-Topic Listening <Badge type="tip" text="5.18" />

By default, each call to `ListenToKafkaTopic()` creates a separate Kafka consumer. If you have many topics that share
the same logical workload, this can lead to excessive consumer group rebalances and slower startup times.

Wolverine supports subscribing to multiple Kafka topics with a single consumer using `ListenToKafkaTopics()`:

```csharp
using var host = await Host.CreateDefaultBuilder()
.UseWolverine(opts =>
{
opts.UseKafka("localhost:9092").AutoProvision();

// Subscribe to multiple topics with a single consumer
opts.ListenToKafkaTopics("orders", "invoices", "shipments")
.ProcessInline();
}).StartAsync();
```

This creates a single `KafkaTopicGroup` endpoint that subscribes to all listed topics using one Kafka consumer.
The endpoint name defaults to the topic names joined by underscores (e.g. `orders_invoices_shipments`), and the
URI follows the pattern `kafka://topic/orders_invoices_shipments`.

### Consumer Configuration

You can override the consumer configuration for a topic group just like for individual topics:

```csharp
opts.ListenToKafkaTopics("orders", "invoices")
.ConfigureConsumer(config =>
{
config.GroupId = "order-processing";
config.AutoOffsetReset = AutoOffsetReset.Earliest;
});
```

### Dead Letter Queue Support

Multi-topic listeners support the same native dead letter queue as individual topic listeners:

```csharp
opts.ListenToKafkaTopics("orders", "invoices")
.ProcessInline()
.EnableNativeDeadLetterQueue();
```

### When to Use Multi-Topic Listening

Use `ListenToKafkaTopics()` when:

- Multiple topics feed into the same handler pipeline
- You want to reduce the number of Kafka consumer connections
- You need faster startup and fewer consumer group rebalances

Use individual `ListenToKafkaTopic()` calls when:

- Topics need different consumer configurations (e.g. different `GroupId` values)
- Topics need different processing modes (inline vs buffered vs durable)
- You want independent scaling or error handling per topic

## Disabling all Sending

Hey, you might have an application that only consumes Kafka messages, but there are a *few* diagnostics in Wolverine that
Expand Down
144 changes: 144 additions & 0 deletions src/Transports/Kafka/Wolverine.Kafka.Tests/multi_topic_listening.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,144 @@
using System.Diagnostics;
using Confluent.Kafka;
using JasperFx.Core;
using JasperFx.Resources;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Hosting;
using Microsoft.Extensions.Logging;
using Shouldly;
using Wolverine.Attributes;
using Wolverine.ComplianceTests;
using Wolverine.Runtime;
using Wolverine.Tracking;
using Xunit.Abstractions;

namespace Wolverine.Kafka.Tests;

/// <summary>
/// Integration tests proving that ListenToKafkaTopics() receives messages from
/// multiple Kafka topics through a single consumer endpoint (KafkaTopicGroup).
/// Requires a Kafka broker on localhost:9092.
/// </summary>
public class multi_topic_listening : IAsyncLifetime
{
    private readonly ITestOutputHelper _output;
    private IHost _sender;
    private IHost _receiver;

    public multi_topic_listening(ITestOutputHelper output)
    {
        _output = output;
    }

    /// <summary>
    /// Spins up a receiving host that listens to both topics with one consumer,
    /// and a sending host that routes messages to Kafka topics by [Topic] attribute.
    /// </summary>
    public async Task InitializeAsync()
    {
        _receiver = await Host.CreateDefaultBuilder()
            .UseWolverine(opts =>
            {
                opts.UseKafka("localhost:9092")
                    .AutoProvision()
                    .ConfigureConsumers(c =>
                    {
                        // Earliest offset so messages published before the consumer
                        // joins the group are still picked up by the test
                        c.AutoOffsetReset = AutoOffsetReset.Earliest;
                        c.GroupId = "multi-topic-test";
                    });

                // Listen to both topics with a single consumer
                opts.ListenToKafkaTopics("multi-alpha", "multi-beta")
                    .ProcessInline();

                opts.ServiceName = "multi-topic-receiver";

                opts.Discovery.IncludeAssembly(GetType().Assembly);

                opts.Services.AddResourceSetupOnStartup();
                opts.Services.AddSingleton<ILoggerProvider>(new OutputLoggerProvider(_output));
            }).StartAsync();

        _sender = await Host.CreateDefaultBuilder()
            .UseWolverine(opts =>
            {
                opts.UseKafka("localhost:9092").AutoProvision();

                // Force the messages across the wire rather than short-circuiting
                // through local in-process routing
                opts.Policies.DisableConventionalLocalRouting();

                opts.PublishAllMessages().ToKafkaTopics().SendInline();

                opts.ServiceName = "multi-topic-sender";

                opts.Services.AddResourceSetupOnStartup();
                opts.Services.AddSingleton<ILoggerProvider>(new OutputLoggerProvider(_output));
            }).StartAsync();
    }

    [Fact]
    public async Task receive_from_multiple_topics_with_single_consumer()
    {
        // RunContinuationsAsynchronously so awaiting test code does not resume
        // inline on the Kafka consumer thread when the handler sets the result
        MultiTopicAlphaHandler.Received =
            new TaskCompletionSource<bool>(TaskCreationOptions.RunContinuationsAsynchronously);
        MultiTopicBetaHandler.Received =
            new TaskCompletionSource<bool>(TaskCreationOptions.RunContinuationsAsynchronously);

        await _sender
            .TrackActivity()
            .AlsoTrack(_receiver)
            .Timeout(60.Seconds())
            .WaitForMessageToBeReceivedAt<AlphaMessage>(_receiver)
            .PublishMessageAndWaitAsync(new AlphaMessage("hello"));

        await _sender
            .TrackActivity()
            .AlsoTrack(_receiver)
            .Timeout(60.Seconds())
            .WaitForMessageToBeReceivedAt<BetaMessage>(_receiver)
            .PublishMessageAndWaitAsync(new BetaMessage("world"));

        // Both handlers must have fired, proving both topics reached the
        // single group consumer
        await MultiTopicAlphaHandler.Received.Task.TimeoutAfterAsync(30000);
        await MultiTopicBetaHandler.Received.Task.TimeoutAfterAsync(30000);
    }

    [Fact]
    public async Task topic_group_uri_uses_concatenated_names()
    {
        var runtime = _receiver.Services.GetRequiredService<IWolverineRuntime>();
        var endpoints = runtime.Options.Transports.SelectMany(t => t.Endpoints()).ToArray();

        // Should find the topic group endpoint with concatenated name
        var groupEndpoint = endpoints
            .OfType<KafkaTopicGroup>()
            .FirstOrDefault();

        groupEndpoint.ShouldNotBeNull();
        groupEndpoint.TopicNames.ShouldContain("multi-alpha");
        groupEndpoint.TopicNames.ShouldContain("multi-beta");
        groupEndpoint.Uri.ToString().ShouldContain("topic/");
    }

    public async Task DisposeAsync()
    {
        await _sender.StopAsync();
        _sender.Dispose();
        await _receiver.StopAsync();
        _receiver.Dispose();
    }
}

// Test message routed to the "multi-alpha" Kafka topic via Wolverine's [Topic] attribute
[Topic("multi-alpha")]
public record AlphaMessage(string Text);

// Test message routed to the "multi-beta" Kafka topic via Wolverine's [Topic] attribute
[Topic("multi-beta")]
public record BetaMessage(string Text);

/// <summary>
/// Wolverine handler for <see cref="AlphaMessage"/>; signals the test via a
/// TaskCompletionSource when a message arrives.
/// </summary>
public static class MultiTopicAlphaHandler
{
    // RunContinuationsAsynchronously prevents the awaiting test continuation
    // from running inline on the message-processing thread when the result is set
    public static TaskCompletionSource<bool> Received { get; set; }
        = new(TaskCreationOptions.RunContinuationsAsynchronously);

    public static void Handle(AlphaMessage message)
    {
        Debug.WriteLine("Got alpha: " + message.Text);

        // TrySetResult because a redelivered message could attempt a second completion
        Received.TrySetResult(true);
    }
}

/// <summary>
/// Wolverine handler for <see cref="BetaMessage"/>; signals the test via a
/// TaskCompletionSource when a message arrives.
/// </summary>
public static class MultiTopicBetaHandler
{
    // RunContinuationsAsynchronously prevents the awaiting test continuation
    // from running inline on the message-processing thread when the result is set
    public static TaskCompletionSource<bool> Received { get; set; }
        = new(TaskCreationOptions.RunContinuationsAsynchronously);

    public static void Handle(BetaMessage message)
    {
        Debug.WriteLine("Got beta: " + message.Text);

        // TrySetResult because a redelivered message could attempt a second completion
        Received.TrySetResult(true);
    }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,168 @@
using System.Text;
using Confluent.Kafka;
using JasperFx.Core;
using Microsoft.Extensions.Logging;
using Wolverine.Runtime;
using Wolverine.Transports;
using Wolverine.Util;

namespace Wolverine.Kafka.Internals;

/// <summary>
/// Wolverine listener that subscribes a single Kafka consumer to all topics of a
/// <see cref="KafkaTopicGroup"/>. Messages from every subscribed topic are mapped
/// to envelopes and pushed through one shared receiver pipeline. Supports Kafka's
/// native dead letter queue topic via <see cref="ISupportDeadLetterQueue"/>.
/// </summary>
public class KafkaTopicGroupListener : IListener, IDisposable, ISupportDeadLetterQueue
{
    private readonly KafkaTopicGroup _endpoint;
    private readonly IConsumer<string, byte[]> _consumer;
    private readonly CancellationTokenSource _cancellation = new();
    private readonly Task _runner;
    private readonly IReceiver _receiver;
    private readonly ILogger _logger;

    /// <summary>
    /// Starts the background consume loop immediately upon construction.
    /// </summary>
    /// <param name="endpoint">The topic group endpoint describing the topics and envelope mapping</param>
    /// <param name="config">The Confluent consumer configuration in effect for this consumer</param>
    /// <param name="consumer">An already-built Kafka consumer; this listener takes ownership and disposes it</param>
    /// <param name="receiver">Wolverine receiver that executes the message-handling pipeline</param>
    /// <param name="logger">Logger for mapping/commit failures</param>
    public KafkaTopicGroupListener(KafkaTopicGroup endpoint, ConsumerConfig config,
        IConsumer<string, byte[]> consumer, IReceiver receiver,
        ILogger<KafkaTopicGroupListener> logger)
    {
        _endpoint = endpoint;
        _logger = logger;
        Address = endpoint.Uri;
        _consumer = consumer;

        // NOTE(review): assumes EnvelopeMapper is always non-null by the time a
        // listener is built for the endpoint — confirm against endpoint compilation
        var mapper = endpoint.EnvelopeMapper;

        Config = config;
        _receiver = receiver;

        _runner = Task.Run(async () =>
        {
            // Subscribe to all topics at once — single consumer, multiple topics
            _consumer.Subscribe(endpoint.TopicNames);
            try
            {
                while (!_cancellation.IsCancellationRequested)
                {
                    try
                    {
                        var result = _consumer.Consume(_cancellation.Token);
                        var message = result.Message;

                        // result.Topic tells the mapper which topic the message
                        // actually came from within the group
                        var envelope = mapper!.CreateEnvelope(result.Topic, message);
                        envelope.Offset = result.Offset.Value;
                        envelope.GroupId = config.GroupId;

                        await receiver.ReceivedAsync(this, envelope);
                    }
                    catch (OperationCanceledException)
                    {
                        // we're done here!
                    }
                    catch (Exception e)
                    {
                        // Might be a poison pill message, try to get out of here
                        // by committing past the current position
                        try
                        {
                            _consumer.Commit();
                        }
                        // ReSharper disable once EmptyGeneralCatchClause
                        catch (Exception)
                        {
                        }

                        logger.LogError(e, "Error trying to map Kafka message to a Wolverine envelope");
                    }
                }
            }
            catch (OperationCanceledException)
            {
                // Shutting down
            }
            finally
            {
                // Leave the consumer group cleanly so partitions rebalance promptly
                _consumer.Close();
            }
        }, _cancellation.Token);
    }

    /// <summary>
    /// The Confluent consumer configuration this listener was started with
    /// </summary>
    public ConsumerConfig Config { get; }

    public IHandlerPipeline? Pipeline => _receiver.Pipeline;

    /// <summary>
    /// Commits the consumer's current offsets. Commit failures are treated as
    /// best effort (e.g. nothing consumed yet, or a rebalance in flight), but
    /// are logged rather than silently dropped.
    /// </summary>
    public ValueTask CompleteAsync(Envelope envelope)
    {
        try
        {
            _consumer.Commit();
        }
        catch (Exception e)
        {
            _logger.LogDebug(e, "Error committing Kafka offsets while completing envelope {EnvelopeId}",
                envelope.Id);
        }

        return ValueTask.CompletedTask;
    }

    /// <summary>
    /// Re-queues the envelope through the receiver pipeline for another attempt
    /// </summary>
    public ValueTask DeferAsync(Envelope envelope)
    {
        return _receiver.ReceivedAsync(this, envelope);
    }

    public ValueTask DisposeAsync()
    {
        Dispose();
        return ValueTask.CompletedTask;
    }

    /// <summary>
    /// The endpoint Uri of the owning KafkaTopicGroup
    /// </summary>
    public Uri Address { get; }

    /// <summary>
    /// Signals the consume loop to stop and waits for it to exit (which also
    /// closes the consumer via the loop's finally block)
    /// </summary>
    public async ValueTask StopAsync()
    {
        _cancellation.Cancel();
        await _runner;
    }

    public bool NativeDeadLetterQueueEnabled => _endpoint.NativeDeadLetterQueueEnabled;

    /// <summary>
    /// Publishes the failed envelope to the transport's shared dead letter topic,
    /// stamping exception metadata into the Kafka headers, then commits the offset
    /// so the poison message is not re-consumed. Rethrows if the DLQ publish itself fails.
    /// </summary>
    public async Task MoveToErrorsAsync(Envelope envelope, Exception exception)
    {
        var transport = _endpoint.Parent;
        var dlqTopicName = transport.DeadLetterQueueTopicName;

        try
        {
            var message = await _endpoint.EnvelopeMapper!.CreateMessage(envelope);

            message.Headers ??= new Headers();
            message.Headers.Add(DeadLetterQueueConstants.ExceptionTypeHeader, Encoding.UTF8.GetBytes(exception.GetType().FullName ?? "Unknown"));
            message.Headers.Add(DeadLetterQueueConstants.ExceptionMessageHeader, Encoding.UTF8.GetBytes(exception.Message));
            message.Headers.Add(DeadLetterQueueConstants.ExceptionStackHeader, Encoding.UTF8.GetBytes(exception.StackTrace ?? ""));
            message.Headers.Add(DeadLetterQueueConstants.FailedAtHeader, Encoding.UTF8.GetBytes(DateTimeOffset.UtcNow.ToUnixTimeMilliseconds().ToString()));

            using var producer = transport.CreateProducer(transport.ProducerConfig);
            await producer.ProduceAsync(dlqTopicName, message);

            // Flush to guarantee the DLQ record is actually on the broker before
            // we commit the source offset below
            producer.Flush();

            _logger.LogInformation(
                "Moved envelope {EnvelopeId} to dead letter queue topic {DlqTopic}. Exception: {ExceptionType}: {ExceptionMessage}",
                envelope.Id, dlqTopicName, exception.GetType().Name, exception.Message);

            try
            {
                _consumer.Commit();
            }
            catch (Exception commitEx)
            {
                _logger.LogWarning(commitEx,
                    "Error committing offset after moving envelope {EnvelopeId} to dead letter queue",
                    envelope.Id);
            }
        }
        catch (Exception ex)
        {
            _logger.LogError(ex,
                "Failed to move envelope {EnvelopeId} to dead letter queue topic {DlqTopic}",
                envelope.Id, dlqTopicName);
            throw;
        }
    }

    public void Dispose()
    {
        // Make sure the consume loop is winding down before tearing down the consumer
        _cancellation.Cancel();

        _consumer.SafeDispose();

        // Task.Dispose() throws InvalidOperationException for a task that has not
        // completed, so only dispose the runner once the loop has actually finished
        if (_runner.IsCompleted)
        {
            _runner.Dispose();
        }
    }
}
Loading
Loading