From 4795a13bf65ef4be95da193e6b31952294103d0e Mon Sep 17 00:00:00 2001 From: Lyall Guiney Date: Mon, 30 Mar 2026 15:21:21 +0100 Subject: [PATCH] Add topic creation options for kafka listener group --- .../KafkaTransportTests.cs | 82 +++++++++++++++++++ .../Kafka/Wolverine.Kafka/KafkaTopicGroup.cs | 25 +++++- .../KafkaTopicGroupListenerConfiguration.cs | 42 ++++++++++ 3 files changed, 147 insertions(+), 2 deletions(-) diff --git a/src/Transports/Kafka/Wolverine.Kafka.Tests/KafkaTransportTests.cs b/src/Transports/Kafka/Wolverine.Kafka.Tests/KafkaTransportTests.cs index 75b976a47..7a2b59224 100644 --- a/src/Transports/Kafka/Wolverine.Kafka.Tests/KafkaTransportTests.cs +++ b/src/Transports/Kafka/Wolverine.Kafka.Tests/KafkaTransportTests.cs @@ -1,9 +1,91 @@ +using Confluent.Kafka.Admin; using Shouldly; using Wolverine.Configuration; using Wolverine.Kafka.Internals; namespace Wolverine.Kafka.Tests; +public class KafkaTopicGroupConfigurationTests +{ + private KafkaTopicGroup BuildGroup(params string[] topics) + { + var transport = new KafkaTransport(); + var group = new KafkaTopicGroup(transport, topics, EndpointRole.Application); + return group; + } + + [Fact] + public void specification_uniform_sets_config_on_group() + { + var group = BuildGroup("topic-a", "topic-b"); + new KafkaTopicGroupListenerConfiguration(group) + .Specification(spec => spec.NumPartitions = 12); + + var capturedSpec = new TopicSpecification { Name = "topic-a" }; + group.SpecificationConfig.ShouldNotBeNull(); + group.SpecificationConfig!("topic-a", capturedSpec); + capturedSpec.NumPartitions.ShouldBe(12); + } + + [Fact] + public void specification_per_topic_receives_topic_name() + { + var group = BuildGroup("topic-a", "topic-b"); + new KafkaTopicGroupListenerConfiguration(group) + .Specification((topicName, spec) => spec.NumPartitions = topicName == "topic-a" ? 6 : 24); + + group.SpecificationConfig.ShouldNotBeNull(); + + var specA = new TopicSpecification { Name = "topic-a" }; + group.SpecificationConfig!("topic-a", specA); + specA.NumPartitions.ShouldBe(6); + + var specB = new TopicSpecification { Name = "topic-b" }; + group.SpecificationConfig!("topic-b", specB); + specB.NumPartitions.ShouldBe(24); + } + + [Fact] + public void topic_creation_sets_func_on_group() + { + var group = BuildGroup("topic-a", "topic-b"); + Func func = (_, _) => Task.CompletedTask; + + new KafkaTopicGroupListenerConfiguration(group) + .TopicCreation(func); + + group.CreateTopicFunc.ShouldNotBeNull(); + group.CreateTopicFunc.ShouldBeSameAs(func); + } + + [Fact] + public void specification_null_throws() + { + var group = BuildGroup("topic-a"); + Should.Throw(() => + new KafkaTopicGroupListenerConfiguration(group) + .Specification((Action)null!)); + } + + [Fact] + public void specification_per_topic_null_throws() + { + var group = BuildGroup("topic-a"); + Should.Throw(() => + new KafkaTopicGroupListenerConfiguration(group) + .Specification((Action)null!)); + } + + [Fact] + public void topic_creation_null_throws() + { + var group = BuildGroup("topic-a"); + Should.Throw(() => + new KafkaTopicGroupListenerConfiguration(group) + .TopicCreation(null!)); + } +} + public class KafkaTransportTests { [Theory] diff --git a/src/Transports/Kafka/Wolverine.Kafka/KafkaTopicGroup.cs b/src/Transports/Kafka/Wolverine.Kafka/KafkaTopicGroup.cs index 76a060613..12236e81f 100644 --- a/src/Transports/Kafka/Wolverine.Kafka/KafkaTopicGroup.cs +++ b/src/Transports/Kafka/Wolverine.Kafka/KafkaTopicGroup.cs @@ -104,6 +104,18 @@ public override bool TryBuildDeadLetterSender(IWolverineRuntime runtime, out ISe await adminClient.DeleteTopicsAsync(TopicNames); } + /// + /// Optional action to configure the TopicSpecification for each topic created by this group. + /// The string parameter is the topic name. Applied before topic creation unless is overridden. + /// + public Action? SpecificationConfig { get; set; } + + /// + /// Optional override for topic creation logic. Receives the admin client and topic name. + /// Defaults to creating the topic using if set. + /// + public new Func? CreateTopicFunc { get; set; } + new public async ValueTask SetupAsync(ILogger logger) { using var adminClient = Parent.CreateAdminClient(); @@ -112,8 +124,17 @@ public override bool TryBuildDeadLetterSender(IWolverineRuntime runtime, out ISe { try { - var spec = new TopicSpecification { Name = topicName }; - await adminClient.CreateTopicsAsync([spec]); + if (CreateTopicFunc != null) + { + await CreateTopicFunc(adminClient, topicName); + } + else + { + var spec = new TopicSpecification { Name = topicName }; + SpecificationConfig?.Invoke(topicName, spec); + await adminClient.CreateTopicsAsync([spec]); + } + logger.LogInformation("Created Kafka topic {Topic}", topicName); } catch (CreateTopicsException e) diff --git a/src/Transports/Kafka/Wolverine.Kafka/KafkaTopicGroupListenerConfiguration.cs b/src/Transports/Kafka/Wolverine.Kafka/KafkaTopicGroupListenerConfiguration.cs index 9db1ab176..a7669a790 100644 --- a/src/Transports/Kafka/Wolverine.Kafka/KafkaTopicGroupListenerConfiguration.cs +++ b/src/Transports/Kafka/Wolverine.Kafka/KafkaTopicGroupListenerConfiguration.cs @@ -1,4 +1,5 @@ using Confluent.Kafka; +using Confluent.Kafka.Admin; using Wolverine.Configuration; namespace Wolverine.Kafka; @@ -47,4 +48,45 @@ public KafkaTopicGroupListenerConfiguration EnableNativeDeadLetterQueue() add(group => group.NativeDeadLetterQueueEnabled = true); return this; } + + /// + /// Fine tune the TopicSpecification for all topics in this group if they are being created by Wolverine. + /// Use this to set partition count, replication factor, etc. uniformly across all topics. + /// + /// + /// + public KafkaTopicGroupListenerConfiguration Specification(Action configure) + { + if (configure == null) throw new ArgumentNullException(nameof(configure)); + add(group => group.SpecificationConfig = (_, spec) => configure(spec)); + return this; + } + + /// + /// Fine tune the TopicSpecification per topic in this group if they are being created by Wolverine. + /// The first parameter is the topic name, allowing per-topic configuration such as different partition counts. + /// + /// + /// + public KafkaTopicGroupListenerConfiguration Specification(Action configure) + { + if (configure == null) throw new ArgumentNullException(nameof(configure)); + add(group => group.SpecificationConfig = configure); + return this; + } + + /// + /// If you need to do anything "special" to create topics at runtime with Wolverine, + /// this overrides the simple logic that Wolverine uses and replaces it with whatever + /// you need to do having full access to the Kafka IAdminClient and the topic name. + /// Called once per topic in the group. + /// + /// + /// + public KafkaTopicGroupListenerConfiguration TopicCreation(Func creation) + { + if (creation == null) throw new ArgumentNullException(nameof(creation)); + add(group => group.CreateTopicFunc = creation); + return this; + } }