Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 12 additions & 0 deletions src/Transports/Kafka/Wolverine.Kafka.Tests/DocumentationSamples.cs
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,18 @@ public static async Task configure()
spec.ReplicationFactor = 3;
})

// OR, you can completely control topic creation through this:
.TopicCreation(async (client, topic) =>
{
topic.Specification.NumPartitions = 8;
topic.Specification.ReplicationFactor = 2;

// You do have full access to the IAdminClient to do
// whatever you need to do

await client.CreateTopicsAsync([topic.Specification]);
})

// Override the producer configuration for just this topic
.ConfigureProducer(config =>
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,11 @@ public async Task InitializeAsync()
// Or explicitly make subscription rules
opts.PublishMessage<ColorMessage>()
.ToKafkaTopic("colors")
.TopicCreation(async (c, t) =>
{
t.Specification.NumPartitions = 4;
await c.CreateTopicsAsync([t.Specification]);
})

// Override the producer configuration for just this topic
.ConfigureProducer(config =>
Expand Down
19 changes: 19 additions & 0 deletions src/Transports/Kafka/Wolverine.Kafka/KafkaListenerConfiguration.cs
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,25 @@ public KafkaListenerConfiguration Specification(Action<TopicSpecification> confi
add(topic => configure(topic.Specification));
return this;
}

/// <summary>
/// If you need to do anything "special" to create topics at runtime with Wolverine,
/// this overrides the simple logic that Wolverine uses and replaces
/// it with whatever you need to do having full access to the Kafka IAdminClient
/// and the Wolverine KafkaTopic configuration
/// </summary>
/// <param name="creation"></param>
/// <returns></returns>
public KafkaListenerConfiguration TopicCreation(Func<IAdminClient, KafkaTopic, Task> creation)
{
if (creation == null)
{
throw new ArgumentNullException(nameof(creation));
}

add(topic => topic.CreateTopicFunc = creation);
return this;
}

/// <summary>
/// Configure this endpoint to receive messages of type T from
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,25 @@ public class KafkaSubscriberConfiguration : InteroperableSubscriberConfiguration
internal KafkaSubscriberConfiguration(KafkaTopic endpoint) : base(endpoint)
{
}

/// <summary>
/// If you need to do anything "special" to create topics at runtime with Wolverine,
/// this overrides the simple logic that Wolverine uses and replaces
/// it with whatever you need to do having full access to the Kafka IAdminClient
/// and the Wolverine KafkaTopic configuration
/// </summary>
/// <param name="creation"></param>
/// <returns></returns>
public KafkaSubscriberConfiguration TopicCreation(Func<IAdminClient, KafkaTopic, Task> creation)
{
if (creation == null)
{
throw new ArgumentNullException(nameof(creation));
}

add(topic => topic.CreateTopicFunc = creation);
return this;
}

/// <summary>
/// Fine tune the TopicSpecification for this Kafka Topic if it is being created by Wolverine
Expand Down
13 changes: 7 additions & 6 deletions src/Transports/Kafka/Wolverine.Kafka/KafkaTopic.cs
Original file line number Diff line number Diff line change
Expand Up @@ -111,15 +111,11 @@ public async ValueTask SetupAsync(ILogger logger)
if (TopicName == WolverineTopicsName) return; // don't care, this is just a marker

using var adminClient = Parent.CreateAdminClient();
Specification.Name = TopicName;

try
{
Specification.Name = TopicName;

await adminClient.CreateTopicsAsync(
[
Specification
]);
await CreateTopicFunc(adminClient, this);

logger.LogInformation("Created Kafka topic {Topic}", TopicName);
}
Expand All @@ -129,6 +125,11 @@ await adminClient.CreateTopicsAsync(
throw;
}
}

/// <summary>
/// Override how this Kafka topic is created
/// </summary>
public Func<IAdminClient, KafkaTopic, Task> CreateTopicFunc { get; internal set; } = (c, t) => c.CreateTopicsAsync([t.Specification]);
}

public enum QualityOfService
Expand Down
Loading