Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions docs/guide/messaging/transports/kafka.md
Original file line number Diff line number Diff line change
Expand Up @@ -234,3 +234,10 @@ using var host = await Host.CreateDefaultBuilder()

Note that the `Uri` scheme within Wolverine for any endpoints from a "named" Kafka broker is the name that you supply
for the broker. So in the example above, you might see `Uri` values for `emea://colors` or `americas://red`.

## Disabling all Sending

Hey, you might have an application that only consumes Kafka messages, but there are a *few* diagnostics in Wolverine that
try to send messages. To completely eliminate that, you can disable all message sending in Wolverine like this:

snippet: sample_disable_all_kafka_sending
20 changes: 20 additions & 0 deletions src/Transports/Kafka/Wolverine.Kafka.Tests/DocumentationSamples.cs
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,26 @@ public static async Task configure()
#endregion
}

public static async Task disable_producing()
{
#region sample_disable_all_kafka_sending

using var host = await Host.CreateDefaultBuilder()
.UseWolverine(opts =>
{
opts
.UseKafka("localhost:9092")

// Tell Wolverine that this application will never
// produce messages to turn off any diagnostics that might
// try to "ping" a topic and result in errors
.ConsumeOnly();

}).StartAsync();

#endregion
}

public static async Task use_named_brokers()
{
#region sample_using_multiple_kafka_brokers
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,4 +32,12 @@ public void endpoint_name_is_topic_name()
new KafkaTopic(transport, "one.two", EndpointRole.Application)
.EndpointName.ShouldBe("one.two");
}

[Fact]
public void produce_and_consume_by_default()
{
new KafkaTransport().Usage.ShouldBe(KafkaUsage.ProduceAndConsume);
}


}
22 changes: 22 additions & 0 deletions src/Transports/Kafka/Wolverine.Kafka.Tests/disable_requeueing.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
using Microsoft.Extensions.Hosting;
using Shouldly;
using Wolverine.Kafka.Internals;
using Wolverine.Tracking;

namespace Wolverine.Kafka.Tests;

public class disable_requeueing
{
[Fact]
public async Task can_disable()
{
using var host = await Host.CreateDefaultBuilder()
.UseWolverine(opts =>
{
opts.UseKafka("").ConsumeOnly();
}).StartAsync();

host.GetRuntime().Options.Transports.GetOrCreate<KafkaTransport>()
.Usage.ShouldBe(KafkaUsage.ConsumeOnly);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,13 @@

namespace Wolverine.Kafka.Internals;

public enum KafkaUsage
{
ProduceAndConsume,
ProduceOnly,
ConsumeOnly
}

public class KafkaTransport : BrokerTransport<KafkaTopic>
{
public Cache<string, KafkaTopic> Topics { get; }
Expand All @@ -30,6 +37,8 @@ public KafkaTransport(string protocol) : base(protocol, "Kafka Topics")
Topics = new Cache<string, KafkaTopic>(topicName => new KafkaTopic(this, topicName, EndpointRole.Application));
}

public KafkaUsage Usage { get; set; } = KafkaUsage.ProduceAndConsume;

public override Uri ResourceUri
{
get
Expand Down
3 changes: 3 additions & 0 deletions src/Transports/Kafka/Wolverine.Kafka/KafkaTopic.cs
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,9 @@ protected override ISender CreateSender(IWolverineRuntime runtime)

public async ValueTask<bool> CheckAsync()
{
// Can't do anything about this
if (Parent.Usage == KafkaUsage.ConsumeOnly) return true;

if (TopicName == WolverineTopicsName) return true; // don't care, this is just a marker
try
{
Expand Down
12 changes: 12 additions & 0 deletions src/Transports/Kafka/Wolverine.Kafka/KafkaTransportExpression.cs
Original file line number Diff line number Diff line change
Expand Up @@ -115,4 +115,16 @@ protected override KafkaSubscriberConfiguration createSubscriberExpression(Kafka
{
return new KafkaSubscriberConfiguration(subscriberEndpoint);
}

/// <summary>
/// In normal usage Wolverine will try to create a producer for each topic it listens to just for
/// "Requeue" actions. In the case of an application that only consumes messages from Kafka, use
/// this setting to disable that behavior and eliminate any error messages from that behavior
/// </summary>
/// <returns></returns>
public KafkaTransportExpression ConsumeOnly()
{
_transport.Usage = KafkaUsage.ConsumeOnly;
return this;
}
}
Loading