From ea289e1736c1b95a6a0ca9c7070909858291c2a0 Mon Sep 17 00:00:00 2001 From: "Jeremy D. Miller" Date: Fri, 14 Nov 2025 15:20:22 -0600 Subject: [PATCH] Consumer only mode for the Kafka transport. Closes GH-1800 --- docs/guide/messaging/transports/kafka.md | 7 ++++++ .../DocumentationSamples.cs | 20 +++++++++++++++++ .../KafkaTransportTests.cs | 8 +++++++ .../disable_requeueing.cs | 22 +++++++++++++++++++ .../Internals/KafkaTransport.cs | 9 ++++++++ .../Kafka/Wolverine.Kafka/KafkaTopic.cs | 3 +++ .../KafkaTransportExpression.cs | 12 ++++++++++ 7 files changed, 81 insertions(+) create mode 100644 src/Transports/Kafka/Wolverine.Kafka.Tests/disable_requeueing.cs diff --git a/docs/guide/messaging/transports/kafka.md b/docs/guide/messaging/transports/kafka.md index 606f4aaf5..8a4f640d8 100644 --- a/docs/guide/messaging/transports/kafka.md +++ b/docs/guide/messaging/transports/kafka.md @@ -234,3 +234,10 @@ using var host = await Host.CreateDefaultBuilder() Note that the `Uri` scheme within Wolverine for any endpoints from a "named" Kafka broker is the name that you supply for the broker. So in the example above, you might see `Uri` values for `emea://colors` or `americas://red`. + +## Disabling all Sending + +Hey, you might have an application that only consumes Kafka messages, but there are a *few* diagnostics in Wolverine that +try to send messages. To completely eliminate that, you can disable all message sending in Wolverine like this: + +snippet: sample_disable_all_kafka_sending \ No newline at end of file diff --git a/src/Transports/Kafka/Wolverine.Kafka.Tests/DocumentationSamples.cs b/src/Transports/Kafka/Wolverine.Kafka.Tests/DocumentationSamples.cs index 09f5520da..d2349d64c 100644 --- a/src/Transports/Kafka/Wolverine.Kafka.Tests/DocumentationSamples.cs +++ b/src/Transports/Kafka/Wolverine.Kafka.Tests/DocumentationSamples.cs @@ -100,6 +100,26 @@ public static async Task configure() #endregion } + public static async Task disable_producing() + { + #region sample_disable_all_kafka_sending + + using var host = await Host.CreateDefaultBuilder() + .UseWolverine(opts => + { + opts + .UseKafka("localhost:9092") + + // Tell Wolverine that this application will never + // produce messages to turn off any diagnostics that might + // try to "ping" a topic and result in errors + .ConsumeOnly(); + + }).StartAsync(); + + #endregion + } + public static async Task use_named_brokers() { #region sample_using_multiple_kafka_brokers diff --git a/src/Transports/Kafka/Wolverine.Kafka.Tests/KafkaTransportTests.cs b/src/Transports/Kafka/Wolverine.Kafka.Tests/KafkaTransportTests.cs index f8dafa436..75b976a47 100644 --- a/src/Transports/Kafka/Wolverine.Kafka.Tests/KafkaTransportTests.cs +++ b/src/Transports/Kafka/Wolverine.Kafka.Tests/KafkaTransportTests.cs @@ -32,4 +32,12 @@ public void endpoint_name_is_topic_name() new KafkaTopic(transport, "one.two", EndpointRole.Application) .EndpointName.ShouldBe("one.two"); } + + [Fact] + public void produce_and_consume_by_default() + { + new KafkaTransport().Usage.ShouldBe(KafkaUsage.ProduceAndConsume); + } + + } \ No newline at end of file diff --git a/src/Transports/Kafka/Wolverine.Kafka.Tests/disable_requeueing.cs b/src/Transports/Kafka/Wolverine.Kafka.Tests/disable_requeueing.cs new file mode 100644 index 000000000..eb4470b8d --- /dev/null +++ b/src/Transports/Kafka/Wolverine.Kafka.Tests/disable_requeueing.cs @@ -0,0 +1,22 @@ +using Microsoft.Extensions.Hosting; +using Shouldly; +using Wolverine.Kafka.Internals; +using Wolverine.Tracking; + +namespace Wolverine.Kafka.Tests; + +public class disable_requeueing +{ + [Fact] + public async Task can_disable() + { + using var host = await Host.CreateDefaultBuilder() + .UseWolverine(opts => + { + opts.UseKafka("").ConsumeOnly(); + }).StartAsync(); + + host.GetRuntime().Options.Transports.GetOrCreate() + .Usage.ShouldBe(KafkaUsage.ConsumeOnly); + } +} \ No newline at end of file diff --git a/src/Transports/Kafka/Wolverine.Kafka/Internals/KafkaTransport.cs b/src/Transports/Kafka/Wolverine.Kafka/Internals/KafkaTransport.cs index c3bd1b50b..87fbb1298 100644 --- a/src/Transports/Kafka/Wolverine.Kafka/Internals/KafkaTransport.cs +++ b/src/Transports/Kafka/Wolverine.Kafka/Internals/KafkaTransport.cs @@ -7,6 +7,13 @@ namespace Wolverine.Kafka.Internals; +public enum KafkaUsage +{ + ProduceAndConsume, + ProduceOnly, + ConsumeOnly +} + public class KafkaTransport : BrokerTransport { public Cache Topics { get; } @@ -30,6 +37,8 @@ public KafkaTransport(string protocol) : base(protocol, "Kafka Topics") Topics = new Cache(topicName => new KafkaTopic(this, topicName, EndpointRole.Application)); } + public KafkaUsage Usage { get; set; } = KafkaUsage.ProduceAndConsume; + public override Uri ResourceUri { get diff --git a/src/Transports/Kafka/Wolverine.Kafka/KafkaTopic.cs b/src/Transports/Kafka/Wolverine.Kafka/KafkaTopic.cs index f6c6cb5c0..7055f774e 100644 --- a/src/Transports/Kafka/Wolverine.Kafka/KafkaTopic.cs +++ b/src/Transports/Kafka/Wolverine.Kafka/KafkaTopic.cs @@ -73,6 +73,9 @@ protected override ISender CreateSender(IWolverineRuntime runtime) public async ValueTask CheckAsync() { + // Can't do anything about this + if (Parent.Usage == KafkaUsage.ConsumeOnly) return true; + if (TopicName == WolverineTopicsName) return true; // don't care, this is just a marker try { diff --git a/src/Transports/Kafka/Wolverine.Kafka/KafkaTransportExpression.cs b/src/Transports/Kafka/Wolverine.Kafka/KafkaTransportExpression.cs index 54ef95ad6..e2941ac44 100644 --- a/src/Transports/Kafka/Wolverine.Kafka/KafkaTransportExpression.cs +++ b/src/Transports/Kafka/Wolverine.Kafka/KafkaTransportExpression.cs @@ -115,4 +115,16 @@ protected override KafkaSubscriberConfiguration createSubscriberExpression(Kafka { return new KafkaSubscriberConfiguration(subscriberEndpoint); } + + /// + /// In normal usage Wolverine will try to create a producer for each topic it listens to just for + /// "Requeue" actions. In the case of an application that only consumes messages from Kafka, use + /// this setting to disable that behavior and eliminate any error messages from that behavior + /// + /// + public KafkaTransportExpression ConsumeOnly() + { + _transport.Usage = KafkaUsage.ConsumeOnly; + return this; + } }