Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
82 changes: 82 additions & 0 deletions src/Transports/Kafka/Wolverine.Kafka.Tests/KafkaTransportTests.cs
Original file line number Diff line number Diff line change
@@ -1,9 +1,91 @@
using Confluent.Kafka.Admin;
using Shouldly;
using Wolverine.Configuration;
using Wolverine.Kafka.Internals;

namespace Wolverine.Kafka.Tests;

public class KafkaTopicGroupConfigurationTests
{
private KafkaTopicGroup BuildGroup(params string[] topics)
{
var transport = new KafkaTransport();
var group = new KafkaTopicGroup(transport, topics, EndpointRole.Application);
return group;
}

[Fact]
public void specification_uniform_sets_config_on_group()
{
var group = BuildGroup("topic-a", "topic-b");
new KafkaTopicGroupListenerConfiguration(group)
.Specification(spec => spec.NumPartitions = 12);

var capturedSpec = new TopicSpecification { Name = "topic-a" };
group.SpecificationConfig.ShouldNotBeNull();
group.SpecificationConfig!("topic-a", capturedSpec);
capturedSpec.NumPartitions.ShouldBe(12);
}

[Fact]
public void specification_per_topic_receives_topic_name()
{
var group = BuildGroup("topic-a", "topic-b");
new KafkaTopicGroupListenerConfiguration(group)
.Specification((topicName, spec) => spec.NumPartitions = topicName == "topic-a" ? 6 : 24);

group.SpecificationConfig.ShouldNotBeNull();

var specA = new TopicSpecification { Name = "topic-a" };
group.SpecificationConfig!("topic-a", specA);
specA.NumPartitions.ShouldBe(6);

var specB = new TopicSpecification { Name = "topic-b" };
group.SpecificationConfig!("topic-b", specB);
specB.NumPartitions.ShouldBe(24);
}

[Fact]
public void topic_creation_sets_func_on_group()
{
var group = BuildGroup("topic-a", "topic-b");
Func<Confluent.Kafka.IAdminClient, string, Task> func = (_, _) => Task.CompletedTask;

new KafkaTopicGroupListenerConfiguration(group)
.TopicCreation(func);

group.CreateTopicFunc.ShouldNotBeNull();
group.CreateTopicFunc.ShouldBeSameAs(func);
}

[Fact]
public void specification_null_throws()
{
var group = BuildGroup("topic-a");
Should.Throw<ArgumentNullException>(() =>
new KafkaTopicGroupListenerConfiguration(group)
.Specification((Action<TopicSpecification>)null!));
}

[Fact]
public void specification_per_topic_null_throws()
{
var group = BuildGroup("topic-a");
Should.Throw<ArgumentNullException>(() =>
new KafkaTopicGroupListenerConfiguration(group)
.Specification((Action<string, TopicSpecification>)null!));
}

[Fact]
public void topic_creation_null_throws()
{
var group = BuildGroup("topic-a");
Should.Throw<ArgumentNullException>(() =>
new KafkaTopicGroupListenerConfiguration(group)
.TopicCreation(null!));
}
}

public class KafkaTransportTests
{
[Theory]
Expand Down
25 changes: 23 additions & 2 deletions src/Transports/Kafka/Wolverine.Kafka/KafkaTopicGroup.cs
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,18 @@ public override bool TryBuildDeadLetterSender(IWolverineRuntime runtime, out ISe
await adminClient.DeleteTopicsAsync(TopicNames);
}

/// <summary>
/// Optional action to configure the TopicSpecification for each topic created by this group.
/// The string parameter is the topic name. Applied before topic creation unless <see cref="CreateTopicFunc"/> is overridden.
/// </summary>
public Action<string, TopicSpecification>? SpecificationConfig { get; set; }

/// <summary>
/// Optional override for topic creation logic. Receives the admin client and topic name.
/// Defaults to creating the topic using <see cref="SpecificationConfig"/> if set.
/// </summary>
public new Func<IAdminClient, string, Task>? CreateTopicFunc { get; set; }

new public async ValueTask SetupAsync(ILogger logger)
{
using var adminClient = Parent.CreateAdminClient();
Expand All @@ -112,8 +124,17 @@ public override bool TryBuildDeadLetterSender(IWolverineRuntime runtime, out ISe
{
try
{
var spec = new TopicSpecification { Name = topicName };
await adminClient.CreateTopicsAsync([spec]);
if (CreateTopicFunc != null)
{
await CreateTopicFunc(adminClient, topicName);
}
else
{
var spec = new TopicSpecification { Name = topicName };
SpecificationConfig?.Invoke(topicName, spec);
await adminClient.CreateTopicsAsync([spec]);
}

logger.LogInformation("Created Kafka topic {Topic}", topicName);
}
catch (CreateTopicsException e)
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
using Confluent.Kafka;
using Confluent.Kafka.Admin;
using Wolverine.Configuration;

namespace Wolverine.Kafka;
Expand Down Expand Up @@ -47,4 +48,45 @@ public KafkaTopicGroupListenerConfiguration EnableNativeDeadLetterQueue()
add(group => group.NativeDeadLetterQueueEnabled = true);
return this;
}

/// <summary>
/// Fine tune the TopicSpecification for all topics in this group if they are being created by Wolverine.
/// Use this to set partition count, replication factor, etc. uniformly across all topics.
/// </summary>
/// <param name="configure"></param>
/// <returns></returns>
public KafkaTopicGroupListenerConfiguration Specification(Action<TopicSpecification> configure)
{
if (configure == null) throw new ArgumentNullException(nameof(configure));
add(group => group.SpecificationConfig = (_, spec) => configure(spec));
return this;
}

/// <summary>
/// Fine tune the TopicSpecification per topic in this group if they are being created by Wolverine.
/// The first parameter is the topic name, allowing per-topic configuration such as different partition counts.
/// </summary>
/// <param name="configure"></param>
/// <returns></returns>
public KafkaTopicGroupListenerConfiguration Specification(Action<string, TopicSpecification> configure)
{
if (configure == null) throw new ArgumentNullException(nameof(configure));
add(group => group.SpecificationConfig = configure);
return this;
}

/// <summary>
/// If you need to do anything "special" to create topics at runtime with Wolverine,
/// this overrides the simple logic that Wolverine uses and replaces it with whatever
/// you need to do having full access to the Kafka IAdminClient and the topic name.
/// Called once per topic in the group.
/// </summary>
/// <param name="creation"></param>
/// <returns></returns>
public KafkaTopicGroupListenerConfiguration TopicCreation(Func<IAdminClient, string, Task> creation)
{
if (creation == null) throw new ArgumentNullException(nameof(creation));
add(group => group.CreateTopicFunc = creation);
return this;
}
}
Loading