diff --git a/docs/guide/messaging/transports/kafka.md b/docs/guide/messaging/transports/kafka.md index af63b976b..b8444e7c9 100644 --- a/docs/guide/messaging/transports/kafka.md +++ b/docs/guide/messaging/transports/kafka.md @@ -401,6 +401,66 @@ using var host = await Host.CreateDefaultBuilder() The DLQ topic is shared across all listeners on the same Kafka transport that have native DLQ enabled. When `AutoProvision` is enabled, the DLQ topic will be automatically created. +## Multi-Topic Listening + +By default, each call to `ListenToKafkaTopic()` creates a separate Kafka consumer. If you have many topics that share +the same logical workload, this can lead to excessive consumer group rebalances and slower startup times. + +Wolverine supports subscribing to multiple Kafka topics with a single consumer using `ListenToKafkaTopics()`: + +```csharp +using var host = await Host.CreateDefaultBuilder() + .UseWolverine(opts => + { + opts.UseKafka("localhost:9092").AutoProvision(); + + // Subscribe to multiple topics with a single consumer + opts.ListenToKafkaTopics("orders", "invoices", "shipments") + .ProcessInline(); + }).StartAsync(); +``` + +This creates a single `KafkaTopicGroup` endpoint that subscribes to all listed topics using one Kafka consumer. +The endpoint name defaults to the topic names joined by underscores (e.g. `orders_invoices_shipments`), and the +URI follows the pattern `kafka://topic/orders_invoices_shipments`. + +### Consumer Configuration + +You can override the consumer configuration for a topic group just like for individual topics: + +```csharp +opts.ListenToKafkaTopics("orders", "invoices") + .ConfigureConsumer(config => + { + config.GroupId = "order-processing"; + config.AutoOffsetReset = AutoOffsetReset.Earliest; + }); +``` + +### Dead Letter Queue Support + +Multi-topic listeners support the same native dead letter queue as individual topic listeners: + +```csharp +opts.ListenToKafkaTopics("orders", "invoices") + .ProcessInline() + .EnableNativeDeadLetterQueue(); +``` + +### When to Use Multi-Topic Listening + +Use `ListenToKafkaTopics()` when: + +- Multiple topics feed into the same handler pipeline +- You want to reduce the number of Kafka consumer connections +- You need faster startup and fewer consumer group rebalances + +Use individual `ListenToKafkaTopic()` calls when: + +- Topics need different consumer configurations (e.g. different `GroupId` values) +- Topics need different processing modes (inline vs buffered vs durable) +- You want independent scaling or error handling per topic + ## Disabling all Sending Hey, you might have an application that only consumes Kafka messages, but there are a *few* diagnostics in Wolverine that diff --git a/src/Transports/Kafka/Wolverine.Kafka.Tests/multi_topic_listening.cs b/src/Transports/Kafka/Wolverine.Kafka.Tests/multi_topic_listening.cs new file mode 100644 index 000000000..08128223c --- /dev/null +++ b/src/Transports/Kafka/Wolverine.Kafka.Tests/multi_topic_listening.cs @@ -0,0 +1,144 @@ +using System.Diagnostics; +using Confluent.Kafka; +using JasperFx.Core; +using JasperFx.Resources; +using Microsoft.Extensions.DependencyInjection; +using Microsoft.Extensions.Hosting; +using Microsoft.Extensions.Logging; +using Shouldly; +using Wolverine.Attributes; +using Wolverine.ComplianceTests; +using Wolverine.Runtime; +using Wolverine.Tracking; +using Xunit.Abstractions; + +namespace Wolverine.Kafka.Tests; + +public class multi_topic_listening : IAsyncLifetime +{ + private readonly ITestOutputHelper _output; + private IHost _sender; + private IHost _receiver; + + public multi_topic_listening(ITestOutputHelper output) + { + _output = output; + } + + public async Task InitializeAsync() + { + _receiver = await Host.CreateDefaultBuilder() + .UseWolverine(opts => + { + opts.UseKafka("localhost:9092") + .AutoProvision() + .ConfigureConsumers(c => + { + c.AutoOffsetReset = AutoOffsetReset.Earliest; + c.GroupId = "multi-topic-test"; + }); + + // Listen to both topics with a single consumer + opts.ListenToKafkaTopics("multi-alpha", "multi-beta") + .ProcessInline(); + + opts.ServiceName = "multi-topic-receiver"; + + opts.Discovery.IncludeAssembly(GetType().Assembly); + + opts.Services.AddResourceSetupOnStartup(); + opts.Services.AddSingleton(new OutputLoggerProvider(_output)); + }).StartAsync(); + + _sender = await Host.CreateDefaultBuilder() + .UseWolverine(opts => + { + opts.UseKafka("localhost:9092").AutoProvision(); + opts.Policies.DisableConventionalLocalRouting(); + + opts.PublishAllMessages().ToKafkaTopics().SendInline(); + + opts.ServiceName = "multi-topic-sender"; + + opts.Services.AddResourceSetupOnStartup(); + opts.Services.AddSingleton(new OutputLoggerProvider(_output)); + }).StartAsync(); + } + + [Fact] + public async Task receive_from_multiple_topics_with_single_consumer() + { + MultiTopicAlphaHandler.Received = new TaskCompletionSource(); + MultiTopicBetaHandler.Received = new TaskCompletionSource(); + + await _sender + .TrackActivity() + .AlsoTrack(_receiver) + .Timeout(60.Seconds()) + .WaitForMessageToBeReceivedAt(_receiver) + .PublishMessageAndWaitAsync(new AlphaMessage("hello")); + + await _sender + .TrackActivity() + .AlsoTrack(_receiver) + .Timeout(60.Seconds()) + .WaitForMessageToBeReceivedAt(_receiver) + .PublishMessageAndWaitAsync(new BetaMessage("world")); + + await MultiTopicAlphaHandler.Received.Task.TimeoutAfterAsync(30000); + await MultiTopicBetaHandler.Received.Task.TimeoutAfterAsync(30000); + } + + [Fact] + public async Task topic_group_uri_uses_concatenated_names() + { + var runtime = _receiver.Services.GetRequiredService(); + var endpoints = runtime.Options.Transports.SelectMany(t => t.Endpoints()).ToArray(); + + // Should find the topic group endpoint with concatenated name + var groupEndpoint = endpoints + .OfType() + .FirstOrDefault(); + + groupEndpoint.ShouldNotBeNull(); + groupEndpoint.TopicNames.ShouldContain("multi-alpha"); + groupEndpoint.TopicNames.ShouldContain("multi-beta"); + groupEndpoint.Uri.ToString().ShouldContain("topic/"); + } + + public async Task DisposeAsync() + { + await _sender.StopAsync(); + _sender.Dispose(); + await _receiver.StopAsync(); + _receiver.Dispose(); + } +} + +[Topic("multi-alpha")] +public record AlphaMessage(string Text); + +[Topic("multi-beta")] +public record BetaMessage(string Text); + +public static class MultiTopicAlphaHandler +{ + public static TaskCompletionSource Received { get; set; } = new(); + + public static void Handle(AlphaMessage message) + { + Debug.WriteLine("Got alpha: " + message.Text); + Received.TrySetResult(true); + } +} + +public static class MultiTopicBetaHandler +{ + public static TaskCompletionSource Received { get; set; } = new(); + + public static void Handle(BetaMessage message) + { + Debug.WriteLine("Got beta: " + message.Text); + Received.TrySetResult(true); + } +} diff --git a/src/Transports/Kafka/Wolverine.Kafka/Internals/KafkaTopicGroupListener.cs b/src/Transports/Kafka/Wolverine.Kafka/Internals/KafkaTopicGroupListener.cs new file mode 100644 index 000000000..a2263d9e0 --- /dev/null +++ b/src/Transports/Kafka/Wolverine.Kafka/Internals/KafkaTopicGroupListener.cs @@ -0,0 +1,168 @@ +using System.Text; +using Confluent.Kafka; +using JasperFx.Core; +using Microsoft.Extensions.Logging; +using Wolverine.Runtime; +using Wolverine.Transports; +using Wolverine.Util; + +namespace Wolverine.Kafka.Internals; + +public class KafkaTopicGroupListener : IListener, IDisposable, ISupportDeadLetterQueue +{ + private readonly KafkaTopicGroup _endpoint; + private readonly IConsumer _consumer; + private CancellationTokenSource _cancellation = new(); + private readonly Task _runner; + private readonly IReceiver _receiver; + private readonly ILogger _logger; + + public KafkaTopicGroupListener(KafkaTopicGroup endpoint, ConsumerConfig config, + IConsumer consumer, IReceiver receiver, + ILogger logger) + { + _endpoint = endpoint; + _logger = logger; + Address = endpoint.Uri; + _consumer = consumer; + var mapper = endpoint.EnvelopeMapper; + + Config = config; + _receiver = receiver; + + _runner = Task.Run(async () => + { + // Subscribe to all topics at once — single consumer, multiple topics + _consumer.Subscribe(endpoint.TopicNames); + try + { + while (!_cancellation.IsCancellationRequested) + { + try + { + var result = _consumer.Consume(_cancellation.Token); + var message = result.Message; + + var envelope = mapper!.CreateEnvelope(result.Topic, message); + envelope.Offset = result.Offset.Value; + envelope.GroupId = config.GroupId; + + await receiver.ReceivedAsync(this, envelope); + } + catch (OperationCanceledException) + { + // we're done here! + } + catch (Exception e) + { + // Might be a poison pill message, try to get out of here + try + { + _consumer.Commit(); + } + // ReSharper disable once EmptyGeneralCatchClause + catch (Exception) + { + } + + logger.LogError(e, "Error trying to map Kafka message to a Wolverine envelope"); + } + } + } + catch (OperationCanceledException) + { + // Shutting down + } + finally + { + _consumer.Close(); + } + }, _cancellation.Token); + } + + public ConsumerConfig Config { get; } + + public IHandlerPipeline? Pipeline => _receiver.Pipeline; + + public ValueTask CompleteAsync(Envelope envelope) + { + try + { + _consumer.Commit(); + } + catch (Exception) + { + } + return ValueTask.CompletedTask; + } + + public ValueTask DeferAsync(Envelope envelope) + { + return _receiver.ReceivedAsync(this, envelope); + } + + public ValueTask DisposeAsync() + { + Dispose(); + return ValueTask.CompletedTask; + } + + public Uri Address { get; } + + public async ValueTask StopAsync() + { + _cancellation.Cancel(); + await _runner; + } + + public bool NativeDeadLetterQueueEnabled => _endpoint.NativeDeadLetterQueueEnabled; + + public async Task MoveToErrorsAsync(Envelope envelope, Exception exception) + { + var transport = _endpoint.Parent; + var dlqTopicName = transport.DeadLetterQueueTopicName; + + try + { + var message = await _endpoint.EnvelopeMapper!.CreateMessage(envelope); + + message.Headers ??= new Headers(); + message.Headers.Add(DeadLetterQueueConstants.ExceptionTypeHeader, Encoding.UTF8.GetBytes(exception.GetType().FullName ?? "Unknown")); + message.Headers.Add(DeadLetterQueueConstants.ExceptionMessageHeader, Encoding.UTF8.GetBytes(exception.Message)); + message.Headers.Add(DeadLetterQueueConstants.ExceptionStackHeader, Encoding.UTF8.GetBytes(exception.StackTrace ?? "")); + message.Headers.Add(DeadLetterQueueConstants.FailedAtHeader, Encoding.UTF8.GetBytes(DateTimeOffset.UtcNow.ToUnixTimeMilliseconds().ToString())); + + using var producer = transport.CreateProducer(transport.ProducerConfig); + await producer.ProduceAsync(dlqTopicName, message); + producer.Flush(); + + _logger.LogInformation( + "Moved envelope {EnvelopeId} to dead letter queue topic {DlqTopic}. Exception: {ExceptionType}: {ExceptionMessage}", + envelope.Id, dlqTopicName, exception.GetType().Name, exception.Message); + + try + { + _consumer.Commit(); + } + catch (Exception commitEx) + { + _logger.LogWarning(commitEx, + "Error committing offset after moving envelope {EnvelopeId} to dead letter queue", + envelope.Id); + } + } + catch (Exception ex) + { + _logger.LogError(ex, + "Failed to move envelope {EnvelopeId} to dead letter queue topic {DlqTopic}", + envelope.Id, dlqTopicName); + throw; + } + } + + public void Dispose() + { + _consumer.SafeDispose(); + _runner.Dispose(); + } +} diff --git a/src/Transports/Kafka/Wolverine.Kafka/Internals/KafkaTransport.cs b/src/Transports/Kafka/Wolverine.Kafka/Internals/KafkaTransport.cs index 77ee29d4d..c475827ea 100644 --- a/src/Transports/Kafka/Wolverine.Kafka/Internals/KafkaTransport.cs +++ b/src/Transports/Kafka/Wolverine.Kafka/Internals/KafkaTransport.cs @@ -18,6 +18,8 @@ public class KafkaTransport : BrokerTransport { public Cache Topics { get; } + internal List TopicGroups { get; } = new(); + public ProducerConfig ProducerConfig { get; } = new(); public Action> ConfigureProducerBuilders { get; internal set; } = _ => {}; @@ -64,7 +66,8 @@ public override Uri ResourceUri protected override IEnumerable endpoints() { - return Topics; + foreach (var topic in Topics) yield return topic; + foreach (var group in TopicGroups) yield return group; } protected override KafkaTopic findEndpointByUri(Uri uri) @@ -93,6 +96,12 @@ public override ValueTask ConnectAsync(IWolverineRuntime runtime) if (endpoint.NativeDeadLetterQueueEnabled) needsDlqTopic = true; } + foreach (var group in TopicGroups) + { + group.Compile(runtime); + if (group.NativeDeadLetterQueueEnabled) needsDlqTopic = true; + } + // Ensure the DLQ topic is registered and compiled so it gets auto-provisioned if (needsDlqTopic) { diff --git a/src/Transports/Kafka/Wolverine.Kafka/KafkaTopicGroup.cs b/src/Transports/Kafka/Wolverine.Kafka/KafkaTopicGroup.cs new file mode 100644 index 000000000..76a060613 --- /dev/null +++ b/src/Transports/Kafka/Wolverine.Kafka/KafkaTopicGroup.cs @@ -0,0 +1,126 @@ +using System.Text.RegularExpressions; +using Confluent.Kafka; +using Confluent.Kafka.Admin; +using Microsoft.Extensions.Logging; +using Wolverine.Configuration; +using Wolverine.Kafka.Internals; +using Wolverine.Runtime; +using Wolverine.Transports; +using Wolverine.Transports.Sending; + +namespace Wolverine.Kafka; + +/// +/// Represents a single Wolverine listener that subscribes to multiple Kafka topics +/// using one Kafka consumer. This reduces consumer group rebalances and startup time +/// when many topics share the same logical workload. +/// +public class KafkaTopicGroup : KafkaTopic, IBrokerEndpoint +{ + /// + /// The Kafka topic names this endpoint subscribes to + /// + public string[] TopicNames { get; } + + public KafkaTopicGroup(KafkaTransport parent, string[] topicNames, EndpointRole role) + : base(parent, SanitizeGroupName(string.Join("_", topicNames)), role) + { + TopicNames = topicNames; + EndpointName = string.Join("_", topicNames); + } + + internal static string SanitizeGroupName(string name) + { + // Replace any characters that are not valid in a URI path segment + return Regex.Replace(name, @"[^a-zA-Z0-9_\-.]", "_"); + } + + public override bool AutoStartSendingAgent() => false; + + public override ValueTask BuildListenerAsync(IWolverineRuntime runtime, IReceiver receiver) + { + EnvelopeMapper ??= BuildMapper(runtime); + + var config = GetEffectiveConsumerConfig(); + + if (Mode == EndpointMode.Durable) + { + config.EnableAutoCommit = false; + } + + var listener = new KafkaTopicGroupListener(this, config, + Parent.CreateConsumer(config), receiver, runtime.LoggerFactory.CreateLogger()); + return ValueTask.FromResult((IListener)listener); + } + + protected override ISender CreateSender(IWolverineRuntime runtime) + { + throw new NotSupportedException("KafkaTopicGroup is a listen-only endpoint. Use KafkaTopic for publishing."); + } + + public override bool TryBuildDeadLetterSender(IWolverineRuntime runtime, out ISender? deadLetterSender) + { + if (NativeDeadLetterQueueEnabled) + { + var dlqTopic = Parent.Topics[Parent.DeadLetterQueueTopicName]; + dlqTopic.EnvelopeMapper ??= dlqTopic.BuildMapper(runtime); + deadLetterSender = new InlineKafkaSender(dlqTopic); + return true; + } + + deadLetterSender = default; + return false; + } + + // Re-implement IBrokerEndpoint for multi-topic support + + new public async ValueTask CheckAsync() + { + if (Parent.Usage == KafkaUsage.ConsumeOnly) return true; + + try + { + using var client = Parent.CreateProducer(Parent.ProducerConfig); + foreach (var topicName in TopicNames) + { + await client.ProduceAsync(topicName, new Message + { + Key = "ping", + Value = System.Text.Encoding.Default.GetBytes("ping") + }); + } + + return true; + } + catch (Exception) + { + return false; + } + } + + new public async ValueTask TeardownAsync(ILogger logger) + { + using var adminClient = Parent.CreateAdminClient(); + await adminClient.DeleteTopicsAsync(TopicNames); + } + + new public async ValueTask SetupAsync(ILogger logger) + { + using var adminClient = Parent.CreateAdminClient(); + + foreach (var topicName in TopicNames) + { + try + { + var spec = new TopicSpecification { Name = topicName }; + await adminClient.CreateTopicsAsync([spec]); + logger.LogInformation("Created Kafka topic {Topic}", topicName); + } + catch (CreateTopicsException e) + { + if (e.Message.Contains("already exists.")) continue; + throw; + } + } + } +} diff --git a/src/Transports/Kafka/Wolverine.Kafka/KafkaTopicGroupListenerConfiguration.cs b/src/Transports/Kafka/Wolverine.Kafka/KafkaTopicGroupListenerConfiguration.cs new file mode 100644 index 000000000..23d0db832 --- /dev/null +++ b/src/Transports/Kafka/Wolverine.Kafka/KafkaTopicGroupListenerConfiguration.cs @@ -0,0 +1,38 @@ +using Confluent.Kafka; +using Wolverine.Configuration; + +namespace Wolverine.Kafka; + +public class KafkaTopicGroupListenerConfiguration : ListenerConfiguration +{ + public KafkaTopicGroupListenerConfiguration(KafkaTopicGroup endpoint) : base(endpoint) + { + } + + /// + /// Configure the consumer config for this topic group. This overrides the default + /// settings at the transport level. + /// + /// + /// + public KafkaTopicGroupListenerConfiguration ConfigureConsumer(Action configuration) + { + add(group => + { + var config = new ConsumerConfig(); + configuration(config); + group.ConsumerConfig = config; + }); + return this; + } + + /// + /// Enable native dead letter queue support for this listener group. + /// + /// + public KafkaTopicGroupListenerConfiguration EnableNativeDeadLetterQueue() + { + add(group => group.NativeDeadLetterQueueEnabled = true); + return this; + } +} diff --git a/src/Transports/Kafka/Wolverine.Kafka/KafkaTransportExtensions.cs b/src/Transports/Kafka/Wolverine.Kafka/KafkaTransportExtensions.cs index 1a15e38a5..da66f1d4f 100644 --- a/src/Transports/Kafka/Wolverine.Kafka/KafkaTransportExtensions.cs +++ b/src/Transports/Kafka/Wolverine.Kafka/KafkaTransportExtensions.cs @@ -73,6 +73,28 @@ public static KafkaTransportExpression ConfigureKafka(this WolverineOptions opti return new KafkaTransportExpression(transport, options); } + /// + /// Listen for incoming messages from multiple Kafka topics using a single consumer. + /// This reduces consumer group rebalances and startup time compared to creating + /// individual listeners for each topic. + /// + /// + /// The names of the Kafka topics to subscribe to + /// + public static KafkaTopicGroupListenerConfiguration ListenToKafkaTopics(this WolverineOptions endpoints, + params string[] topicNames) + { + if (topicNames.Length == 0) throw new ArgumentException("At least one topic name is required", nameof(topicNames)); + + var transport = endpoints.KafkaTransport(); + + var group = new KafkaTopicGroup(transport, topicNames, EndpointRole.Application); + group.IsListener = true; + transport.TopicGroups.Add(group); + + return new KafkaTopicGroupListenerConfiguration(group); + } + /// /// Listen for incoming messages at the designated Kafka topic name ///