Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 14 additions & 0 deletions src/Transports/Kafka/Wolverine.Kafka.Tests/DocumentationSamples.cs
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,13 @@ public static async Task configure()
opts.PublishMessage<ColorMessage>()
.ToKafkaTopic("colors")

// Fine tune how the Kafka Topic is declared by Wolverine
.Specification(spec =>
{
spec.NumPartitions = 6;
spec.ReplicationFactor = 3;
})

// Override the producer configuration for just this topic
.ConfigureProducer(config =>
{
Expand All @@ -85,6 +92,13 @@ public static async Task configure()
config.BootstrapServers = "localhost:9092";

// Other configuration
})

// Fine tune how the Kafka Topic is declared by Wolverine
.Specification(spec =>
{
spec.NumPartitions = 6;
spec.ReplicationFactor = 3;
});

opts.ListenToKafkaTopic("green")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@
<PropertyGroup>
<IsPackable>false</IsPackable>
<IsTestProject>true</IsTestProject>
<TargetFrameworks>net8.0;net9.0</TargetFrameworks>
</PropertyGroup>

<ItemGroup>
Expand Down
18 changes: 18 additions & 0 deletions src/Transports/Kafka/Wolverine.Kafka/KafkaListenerConfiguration.cs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
using System.Text.Json;
using Confluent.Kafka;
using Confluent.Kafka.Admin;
using Wolverine.Configuration;
using Wolverine.Kafka.Internals;

Expand All @@ -15,6 +16,23 @@ public KafkaListenerConfiguration(Func<KafkaTopic> source) : base(source)
{
}

/// <summary>
/// Fine tune the TopicSpecification for this Kafka Topic if it is being created by Wolverine
/// </summary>
/// <param name="configure"></param>
/// <returns></returns>
/// <exception cref="ArgumentNullException"></exception>
public KafkaListenerConfiguration Specification(Action<TopicSpecification> configure)
{
if (configure == null)
{
throw new ArgumentNullException(nameof(configure));
}

add(topic => configure(topic.Specification));
return this;
}

/// <summary>
/// Configure this endpoint to receive messages of type T from
/// JSON message bodies. This option maybe be necessary to receive
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
using System.Text.Json;
using Confluent.Kafka;
using Confluent.Kafka.Admin;
using Wolverine.Configuration;
using Wolverine.Kafka.Internals;

Expand All @@ -11,6 +12,23 @@ internal KafkaSubscriberConfiguration(KafkaTopic endpoint) : base(endpoint)
{
}

/// <summary>
/// Fine tune the TopicSpecification for this Kafka Topic if it is being created by Wolverine
/// </summary>
/// <param name="configure"></param>
/// <returns></returns>
/// <exception cref="ArgumentNullException"></exception>
public KafkaSubscriberConfiguration Specification(Action<TopicSpecification> configure)
{
if (configure == null)
{
throw new ArgumentNullException(nameof(configure));
}

add(topic => configure(topic.Specification));
return this;
}

/// <summary>
/// Publish only the raw, serialized JSON representation of messages to the downstream
/// Kafka subscribers
Expand Down
11 changes: 7 additions & 4 deletions src/Transports/Kafka/Wolverine.Kafka/KafkaTopic.cs
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@ public KafkaTopic(KafkaTransport parent, string topicName, EndpointRole role) :
Parent = parent;
EndpointName = topicName;
TopicName = topicName;

Specification.Name = topicName;
}

protected override KafkaEnvelopeMapper buildMapper(IWolverineRuntime runtime)
Expand All @@ -34,6 +36,8 @@ public override bool AutoStartSendingAgent()
return true;
}

public TopicSpecification Specification { get; } = new();

public string TopicName { get; }

/// <summary>
Expand Down Expand Up @@ -110,12 +114,11 @@ public async ValueTask SetupAsync(ILogger logger)

try
{
Specification.Name = TopicName;

await adminClient.CreateTopicsAsync(
[
new TopicSpecification
{
Name = TopicName
}
Specification
]);

logger.LogInformation("Created Kafka topic {Topic}", TopicName);
Expand Down
Loading