Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions docs/guide/messaging/transports/kafka.md
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,10 @@ To use [Kafka](https://www.confluent.io/what-is-apache-kafka/) as a messaging tr
dotnet add WolverineFx.Kafka
```

```warning
The configuration in `ConfigureConsumer()` for each topic completely overwrites any previous configuration
```

To connect to Kafka, use this syntax:

<!-- snippet: sample_bootstrapping_with_kafka -->
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,8 @@ public static async Task configure()

// Override the consumer configuration for only this
// topic
// This is NOT combinatorial with the ConfigureConsumers() call above
// and completely replaces the parent configuration
.ConfigureConsumer(config =>
{
// This will also set the Envelope.GroupId for any
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
using JasperFx.Core;
using JasperFx.Resources;
using Microsoft.Extensions.Hosting;
using Shouldly;
using Wolverine.Tracking;
using Xunit.Abstractions;

namespace Wolverine.Kafka.Tests;

public class configuration_precedence
{
private readonly ITestOutputHelper _output;

public configuration_precedence(ITestOutputHelper output)
{
_output = output;
}

[Fact]
public async Task explicit_configuration_wins()
{
using var host = await Host.CreateDefaultBuilder()
.UseWolverine(opts =>
{
opts.UseKafka("localhost:9092").ConfigureConsumers( x => x.GroupId = "Conventional").AutoProvision();

opts.ListenToKafkaTopic("General").Named("General");

opts.ListenToKafkaTopic("ResponseMessages")
.ConfigureConsumer(x => x.GroupId = "Specific").Named("Specific"); // Not working as expected

opts.Services.AddResourceSetupOnStartup();
}).StartAsync();

var runtime = host.GetRuntime();

foreach (var agent in runtime.Endpoints.ActiveListeners())
{
_output.WriteLine(agent.Uri.ToString());
}

var general = runtime.Endpoints.EndpointFor("kafka://topic/General".ToUri()).ShouldBeOfType<KafkaTopic>();
general.ConsumerConfig.ShouldBeNull();

general.Parent.ConsumerConfig.GroupId.ShouldBe("Conventional");

var specific = runtime.Endpoints.EndpointFor("kafka://topic/ResponseMessages".ToUri()).ShouldBeOfType<KafkaTopic>();
specific.ConsumerConfig.GroupId.ShouldBe("Specific");
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,8 @@ public KafkaListenerConfiguration ReceiveRawJson(Type messageType, JsonSerialize

/// <summary>
/// Configure the consumer config for only this topic. This overrides the default
/// settings at the transport level
/// settings at the transport level. This is not combinatorial with the parent configuration
/// and overwrites all ConsumerConfig from the parent
/// </summary>
/// <param name="configuration"></param>
/// <returns></returns>
Expand Down
3 changes: 2 additions & 1 deletion src/Transports/Kafka/Wolverine.Kafka/KafkaTopic.cs
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,8 @@ public static string TopicNameForUri(Uri uri)

public override ValueTask<IListener> BuildListenerAsync(IWolverineRuntime runtime, IReceiver receiver)
{
var listener = new KafkaListener(this, ConsumerConfig ?? Parent.ConsumerConfig,
var config = ConsumerConfig ?? Parent.ConsumerConfig;
var listener = new KafkaListener(this, config,
Parent.CreateConsumer(ConsumerConfig), receiver, runtime.LoggerFactory.CreateLogger<KafkaListener>());
return ValueTask.FromResult((IListener)listener);
}
Expand Down
Loading