diff --git a/src/Transports/Kafka/Wolverine.Kafka.Tests/DocumentationSamples.cs b/src/Transports/Kafka/Wolverine.Kafka.Tests/DocumentationSamples.cs index 445e35081..0c6cac00e 100644 --- a/src/Transports/Kafka/Wolverine.Kafka.Tests/DocumentationSamples.cs +++ b/src/Transports/Kafka/Wolverine.Kafka.Tests/DocumentationSamples.cs @@ -68,6 +68,18 @@ public static async Task configure() spec.ReplicationFactor = 3; }) + // OR, you can completely control topic creation through this: + .TopicCreation(async (client, topic) => + { + topic.Specification.NumPartitions = 8; + topic.Specification.ReplicationFactor = 2; + + // You do have full access to the IAdminClient to do + // whatever you need to do + + await client.CreateTopicsAsync([topic.Specification]); + }) + // Override the producer configuration for just this topic .ConfigureProducer(config => { diff --git a/src/Transports/Kafka/Wolverine.Kafka.Tests/configure_consumers_and_publishers.cs b/src/Transports/Kafka/Wolverine.Kafka.Tests/configure_consumers_and_publishers.cs index ee3a1939c..4d5227cc9 100644 --- a/src/Transports/Kafka/Wolverine.Kafka.Tests/configure_consumers_and_publishers.cs +++ b/src/Transports/Kafka/Wolverine.Kafka.Tests/configure_consumers_and_publishers.cs @@ -43,6 +43,11 @@ public async Task InitializeAsync() // Or explicitly make subscription rules opts.PublishMessage() .ToKafkaTopic("colors") + .TopicCreation(async (c, t) => + { + t.Specification.NumPartitions = 4; + await c.CreateTopicsAsync([t.Specification]); + }) // Override the producer configuration for just this topic .ConfigureProducer(config => diff --git a/src/Transports/Kafka/Wolverine.Kafka/KafkaListenerConfiguration.cs b/src/Transports/Kafka/Wolverine.Kafka/KafkaListenerConfiguration.cs index b4dcc6271..fb3dad49c 100644 --- a/src/Transports/Kafka/Wolverine.Kafka/KafkaListenerConfiguration.cs +++ b/src/Transports/Kafka/Wolverine.Kafka/KafkaListenerConfiguration.cs @@ -32,6 +32,25 @@ public KafkaListenerConfiguration Specification(Action confi add(topic => configure(topic.Specification)); return this; } + + /// + /// If you need to do anything "special" to create topics at runtime with Wolverine, + /// this overrides the simple logic that Wolverine uses and replaces + /// it with whatever you need to do having full access to the Kafka IAdminClient + /// and the Wolverine KafkaTopic configuration + /// + /// + /// + public KafkaListenerConfiguration TopicCreation(Func creation) + { + if (creation == null) + { + throw new ArgumentNullException(nameof(creation)); + } + + add(topic => topic.CreateTopicFunc = creation); + return this; + } /// /// Configure this endpoint to receive messages of type T from diff --git a/src/Transports/Kafka/Wolverine.Kafka/KafkaSubscriberConfiguration.cs b/src/Transports/Kafka/Wolverine.Kafka/KafkaSubscriberConfiguration.cs index b75673196..a36d844d9 100644 --- a/src/Transports/Kafka/Wolverine.Kafka/KafkaSubscriberConfiguration.cs +++ b/src/Transports/Kafka/Wolverine.Kafka/KafkaSubscriberConfiguration.cs @@ -11,6 +11,25 @@ public class KafkaSubscriberConfiguration : InteroperableSubscriberConfiguration internal KafkaSubscriberConfiguration(KafkaTopic endpoint) : base(endpoint) { } + + /// + /// If you need to do anything "special" to create topics at runtime with Wolverine, + /// this overrides the simple logic that Wolverine uses and replaces + /// it with whatever you need to do having full access to the Kafka IAdminClient + /// and the Wolverine KafkaTopic configuration + /// + /// + /// + public KafkaSubscriberConfiguration TopicCreation(Func creation) + { + if (creation == null) + { + throw new ArgumentNullException(nameof(creation)); + } + + add(topic => topic.CreateTopicFunc = creation); + return this; + } /// /// Fine tune the TopicSpecification for this Kafka Topic if it is being created by Wolverine diff --git a/src/Transports/Kafka/Wolverine.Kafka/KafkaTopic.cs b/src/Transports/Kafka/Wolverine.Kafka/KafkaTopic.cs index b2cf9b3f8..7f3b15a75 100644 --- a/src/Transports/Kafka/Wolverine.Kafka/KafkaTopic.cs +++ b/src/Transports/Kafka/Wolverine.Kafka/KafkaTopic.cs @@ -111,15 +111,11 @@ public async ValueTask SetupAsync(ILogger logger) if (TopicName == WolverineTopicsName) return; // don't care, this is just a marker using var adminClient = Parent.CreateAdminClient(); + Specification.Name = TopicName; try { - Specification.Name = TopicName; - - await adminClient.CreateTopicsAsync( - [ - Specification - ]); + await CreateTopicFunc(adminClient, this); logger.LogInformation("Created Kafka topic {Topic}", TopicName); } @@ -129,6 +125,11 @@ await adminClient.CreateTopicsAsync( throw; } } + + /// + /// Override how this Kafka topic is created + /// + public Func CreateTopicFunc { get; internal set; } = (c, t) => c.CreateTopicsAsync([t.Specification]); } public enum QualityOfService