From 944cc6b5ae4eda0b0a565309d50a1de177bae1bb Mon Sep 17 00:00:00 2001 From: "Jeremy D. Miller" Date: Mon, 21 Jul 2025 17:56:35 -0500 Subject: [PATCH] Doc updates for configuration within Kafka. Closes GH-1554 --- docs/guide/messaging/transports/kafka.md | 4 ++ .../DocumentationSamples.cs | 2 + .../configuration_precedence.cs | 50 +++++++++++++++++++ .../KafkaListenerConfiguration.cs | 3 +- .../Kafka/Wolverine.Kafka/KafkaTopic.cs | 3 +- 5 files changed, 60 insertions(+), 2 deletions(-) create mode 100644 src/Transports/Kafka/Wolverine.Kafka.Tests/configuration_precedence.cs diff --git a/docs/guide/messaging/transports/kafka.md b/docs/guide/messaging/transports/kafka.md index dc7911356..e95fb8c1e 100644 --- a/docs/guide/messaging/transports/kafka.md +++ b/docs/guide/messaging/transports/kafka.md @@ -13,6 +13,10 @@ To use [Kafka](https://www.confluent.io/what-is-apache-kafka/) as a messaging tr dotnet add WolverineFx.Kafka ``` +```warning +The configuration in `ConfigureConsumer()` for each topic completely overwrites any previous configuration +``` + To connect to Kafka, use this syntax: diff --git a/src/Transports/Kafka/Wolverine.Kafka.Tests/DocumentationSamples.cs b/src/Transports/Kafka/Wolverine.Kafka.Tests/DocumentationSamples.cs index d7234780b..0868d6bbb 100644 --- a/src/Transports/Kafka/Wolverine.Kafka.Tests/DocumentationSamples.cs +++ b/src/Transports/Kafka/Wolverine.Kafka.Tests/DocumentationSamples.cs @@ -71,6 +71,8 @@ public static async Task configure() // Override the consumer configuration for only this // topic + // This is NOT combinatorial with the ConfigureConsumers() call above + // and completely replaces the parent configuration .ConfigureConsumer(config => { // This will also set the Envelope.GroupId for any diff --git a/src/Transports/Kafka/Wolverine.Kafka.Tests/configuration_precedence.cs b/src/Transports/Kafka/Wolverine.Kafka.Tests/configuration_precedence.cs new file mode 100644 index 000000000..14b7ea71a --- /dev/null +++ b/src/Transports/Kafka/Wolverine.Kafka.Tests/configuration_precedence.cs @@ -0,0 +1,50 @@ +using JasperFx.Core; +using JasperFx.Resources; +using Microsoft.Extensions.Hosting; +using Shouldly; +using Wolverine.Tracking; +using Xunit.Abstractions; + +namespace Wolverine.Kafka.Tests; + +public class configuration_precedence +{ + private readonly ITestOutputHelper _output; + + public configuration_precedence(ITestOutputHelper output) + { + _output = output; + } + + [Fact] + public async Task explicit_configuration_wins() + { + using var host = await Host.CreateDefaultBuilder() + .UseWolverine(opts => + { + opts.UseKafka("localhost:9092").ConfigureConsumers( x => x.GroupId = "Conventional").AutoProvision(); + + opts.ListenToKafkaTopic("General").Named("General"); + + opts.ListenToKafkaTopic("ResponseMessages") + .ConfigureConsumer(x => x.GroupId = "Specific").Named("Specific"); // Not working as expected + + opts.Services.AddResourceSetupOnStartup(); + }).StartAsync(); + + var runtime = host.GetRuntime(); + + foreach (var agent in runtime.Endpoints.ActiveListeners()) + { + _output.WriteLine(agent.Uri.ToString()); + } + + var general = runtime.Endpoints.EndpointFor("kafka://topic/General".ToUri()).ShouldBeOfType(); + general.ConsumerConfig.ShouldBeNull(); + + general.Parent.ConsumerConfig.GroupId.ShouldBe("Conventional"); + + var specific = runtime.Endpoints.EndpointFor("kafka://topic/ResponseMessages".ToUri()).ShouldBeOfType(); + specific.ConsumerConfig.GroupId.ShouldBe("Specific"); + } +} \ No newline at end of file diff --git a/src/Transports/Kafka/Wolverine.Kafka/KafkaListenerConfiguration.cs b/src/Transports/Kafka/Wolverine.Kafka/KafkaListenerConfiguration.cs index c53d9038f..da0b6f603 100644 --- a/src/Transports/Kafka/Wolverine.Kafka/KafkaListenerConfiguration.cs +++ b/src/Transports/Kafka/Wolverine.Kafka/KafkaListenerConfiguration.cs @@ -60,7 +60,8 @@ public KafkaListenerConfiguration ReceiveRawJson(Type messageType, JsonSerialize /// /// Configure the consumer config for only this topic. This overrides the default - /// settings at the transport level + /// settings at the transport level. This is not combinatorial with the parent configuration + /// and overwrites all ConsumerConfig from the parent /// /// /// diff --git a/src/Transports/Kafka/Wolverine.Kafka/KafkaTopic.cs b/src/Transports/Kafka/Wolverine.Kafka/KafkaTopic.cs index 8d665cc8a..4e08cfaa7 100644 --- a/src/Transports/Kafka/Wolverine.Kafka/KafkaTopic.cs +++ b/src/Transports/Kafka/Wolverine.Kafka/KafkaTopic.cs @@ -52,7 +52,8 @@ public static string TopicNameForUri(Uri uri) public override ValueTask BuildListenerAsync(IWolverineRuntime runtime, IReceiver receiver) { - var listener = new KafkaListener(this, ConsumerConfig ?? Parent.ConsumerConfig, + var config = ConsumerConfig ?? Parent.ConsumerConfig; + var listener = new KafkaListener(this, config, Parent.CreateConsumer(ConsumerConfig), receiver, runtime.LoggerFactory.CreateLogger()); return ValueTask.FromResult((IListener)listener); }