From 8fc13bfd5379b30a7e5e756b7481345ebae4dcfb Mon Sep 17 00:00:00 2001 From: "Jeremy D. Miller" Date: Sat, 6 Dec 2025 11:20:22 -0500 Subject: [PATCH] Fine grained control over the Kafka TopicSpecification for Wolverine controlled topic creation. Closes GH-1795 --- .../DocumentationSamples.cs | 14 ++++++++++++++ .../Wolverine.Kafka.Tests.csproj | 1 - .../KafkaListenerConfiguration.cs | 18 ++++++++++++++++++ .../KafkaSubscriberConfiguration.cs | 18 ++++++++++++++++++ .../Kafka/Wolverine.Kafka/KafkaTopic.cs | 11 +++++++---- 5 files changed, 57 insertions(+), 5 deletions(-) diff --git a/src/Transports/Kafka/Wolverine.Kafka.Tests/DocumentationSamples.cs b/src/Transports/Kafka/Wolverine.Kafka.Tests/DocumentationSamples.cs index d2349d64c..445e35081 100644 --- a/src/Transports/Kafka/Wolverine.Kafka.Tests/DocumentationSamples.cs +++ b/src/Transports/Kafka/Wolverine.Kafka.Tests/DocumentationSamples.cs @@ -61,6 +61,13 @@ public static async Task configure() opts.PublishMessage() .ToKafkaTopic("colors") + // Fine tune how the Kafka Topic is declared by Wolverine + .Specification(spec => + { + spec.NumPartitions = 6; + spec.ReplicationFactor = 3; + }) + // Override the producer configuration for just this topic .ConfigureProducer(config => { @@ -85,6 +92,13 @@ public static async Task configure() config.BootstrapServers = "localhost:9092"; // Other configuration + }) + + // Fine tune how the Kafka Topic is declared by Wolverine + .Specification(spec => + { + spec.NumPartitions = 6; + spec.ReplicationFactor = 3; }); opts.ListenToKafkaTopic("green") diff --git a/src/Transports/Kafka/Wolverine.Kafka.Tests/Wolverine.Kafka.Tests.csproj b/src/Transports/Kafka/Wolverine.Kafka.Tests/Wolverine.Kafka.Tests.csproj index 6f39bc226..a297245f2 100644 --- a/src/Transports/Kafka/Wolverine.Kafka.Tests/Wolverine.Kafka.Tests.csproj +++ b/src/Transports/Kafka/Wolverine.Kafka.Tests/Wolverine.Kafka.Tests.csproj @@ -3,7 +3,6 @@ false true - net8.0;net9.0 diff --git a/src/Transports/Kafka/Wolverine.Kafka/KafkaListenerConfiguration.cs b/src/Transports/Kafka/Wolverine.Kafka/KafkaListenerConfiguration.cs index 3a146fb68..b4dcc6271 100644 --- a/src/Transports/Kafka/Wolverine.Kafka/KafkaListenerConfiguration.cs +++ b/src/Transports/Kafka/Wolverine.Kafka/KafkaListenerConfiguration.cs @@ -1,5 +1,6 @@ using System.Text.Json; using Confluent.Kafka; +using Confluent.Kafka.Admin; using Wolverine.Configuration; using Wolverine.Kafka.Internals; @@ -15,6 +16,23 @@ public KafkaListenerConfiguration(Func source) : base(source) { } + /// + /// Fine tune the TopicSpecification for this Kafka Topic if it is being created by Wolverine + /// + /// + /// + /// + public KafkaListenerConfiguration Specification(Action configure) + { + if (configure == null) + { + throw new ArgumentNullException(nameof(configure)); + } + + add(topic => configure(topic.Specification)); + return this; + } + /// /// Configure this endpoint to receive messages of type T from /// JSON message bodies. This option maybe be necessary to receive diff --git a/src/Transports/Kafka/Wolverine.Kafka/KafkaSubscriberConfiguration.cs b/src/Transports/Kafka/Wolverine.Kafka/KafkaSubscriberConfiguration.cs index 6efe3f2fc..b75673196 100644 --- a/src/Transports/Kafka/Wolverine.Kafka/KafkaSubscriberConfiguration.cs +++ b/src/Transports/Kafka/Wolverine.Kafka/KafkaSubscriberConfiguration.cs @@ -1,5 +1,6 @@ using System.Text.Json; using Confluent.Kafka; +using Confluent.Kafka.Admin; using Wolverine.Configuration; using Wolverine.Kafka.Internals; @@ -11,6 +12,23 @@ internal KafkaSubscriberConfiguration(KafkaTopic endpoint) : base(endpoint) { } + /// + /// Fine tune the TopicSpecification for this Kafka Topic if it is being created by Wolverine + /// + /// + /// + /// + public KafkaSubscriberConfiguration Specification(Action configure) + { + if (configure == null) + { + throw new ArgumentNullException(nameof(configure)); + } + + add(topic => configure(topic.Specification)); + return this; + } + /// /// Publish only the raw, serialized JSON representation of messages to the downstream /// Kafka subscribers diff --git a/src/Transports/Kafka/Wolverine.Kafka/KafkaTopic.cs b/src/Transports/Kafka/Wolverine.Kafka/KafkaTopic.cs index 7055f774e..b2cf9b3f8 100644 --- a/src/Transports/Kafka/Wolverine.Kafka/KafkaTopic.cs +++ b/src/Transports/Kafka/Wolverine.Kafka/KafkaTopic.cs @@ -22,6 +22,8 @@ public KafkaTopic(KafkaTransport parent, string topicName, EndpointRole role) : Parent = parent; EndpointName = topicName; TopicName = topicName; + + Specification.Name = topicName; } protected override KafkaEnvelopeMapper buildMapper(IWolverineRuntime runtime) @@ -34,6 +36,8 @@ public override bool AutoStartSendingAgent() return true; } + public TopicSpecification Specification { get; } = new(); + public string TopicName { get; } /// @@ -110,12 +114,11 @@ public async ValueTask SetupAsync(ILogger logger) try { + Specification.Name = TopicName; + await adminClient.CreateTopicsAsync( [ - new TopicSpecification - { - Name = TopicName - } + Specification ]); logger.LogInformation("Created Kafka topic {Topic}", TopicName);