Skip to content
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion examples/Examples/Program.cs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
.WithConfiguration(builder.Configuration.GetSection("Kafka"))
.WithGroupId(Guid.NewGuid().ToString())
.WithOffsetReset(AutoOffsetReset.Earliest)
.WithPartitionHandler((_, p) => {
.WithPartitionAssignedHandler((_, p) => {
return p.Select(tp => new TopicPartitionOffset(tp, Offset.Beginning));
})
.WithJsonSerializers()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ public static TBuilder WithClientIdFormatter<TBuilder>(this TBuilder builder, Fu
return builder;
}

public static TBuilder WithPartitionHandler<TBuilder>(this TBuilder builder, Func<object, List<TopicPartition>, IEnumerable<TopicPartitionOffset>> handler)
public static TBuilder WithPartitionAssignedHandler<TBuilder>(this TBuilder builder, Func<object, List<TopicPartition>, IEnumerable<TopicPartitionOffset>> handler)
where TBuilder : IKafkaConventionBuilder
{

Expand All @@ -82,6 +82,47 @@ public static TBuilder WithPartitionHandler<TBuilder>(this TBuilder builder, Fun

return builder;
}

public static TBuilder WithPartitionRevokedHandler<TBuilder>(this TBuilder builder, Action<object, List<TopicPartitionOffset>> handler)
where TBuilder : IKafkaConventionBuilder
{

builder.Add(b =>
{
if (!b.MetaData.OfType<ConsumerHandlerMetadata>().Any())
{
b.MetaData.Add(new ConsumerHandlerMetadata());
}

foreach (var item in b.MetaData.OfType<ConsumerHandlerMetadata>())
{
item.PartitionsRevokedHandler = handler;
}
});

return builder;
}

public static TBuilder WithPartitionLostHandler<TBuilder>(this TBuilder builder, Func<object, List<TopicPartitionOffset>, IEnumerable<TopicPartitionOffset>> handler)
where TBuilder : IKafkaConventionBuilder
{

builder.Add(b =>
{
if (!b.MetaData.OfType<ConsumerHandlerMetadata>().Any())
{
b.MetaData.Add(new ConsumerHandlerMetadata());
}

foreach (var item in b.MetaData.OfType<ConsumerHandlerMetadata>())
{
item.PartitionsLostHandler = handler;
}
});

return builder;
}

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue

Same delegate-type mismatch for Partitions Lost handler

Analogous to the previous comment, WithPartitionLostHandler should take Action<object, List<TopicPartitionOffset>>. Please amend the method and metadata property types consistently.

🤖 Prompt for AI Agents
In src/MinimalKafka/Extension/KafkaConsumerConfigMetadataExtensions.cs around
lines 106 to 125, the WithPartitionLostHandler method currently accepts a Func
delegate returning IEnumerable<TopicPartitionOffset>, but it should accept an
Action delegate with parameters object and List<TopicPartitionOffset> to match
the expected handler signature. Update the method parameter type and the
corresponding ConsumerHandlerMetadata property PartitionsLostHandler to use
Action<object, List<TopicPartitionOffset>> consistently.

public static TBuilder WithErrorHandler<TBuilder>(this TBuilder builder, Action<object, Error> handler)
where TBuilder : IKafkaConventionBuilder
{
Expand Down Expand Up @@ -122,6 +163,26 @@ public static TBuilder WithStatisticsHandler<TBuilder>(this TBuilder builder, Ac
return builder;
}

public static TBuilder WithLogHandler<TBuilder>(this TBuilder builder, Action<object, LogMessage> handler)
where TBuilder : IKafkaConventionBuilder
{

builder.Add(b =>
{
if (!b.MetaData.OfType<ConsumerHandlerMetadata>().Any())
{
b.MetaData.Add(new ConsumerHandlerMetadata());
}

foreach (var item in b.MetaData.OfType<ConsumerHandlerMetadata>())
{
item.LogHandler = handler;
}
});

return builder;
}

public static TBuilder WithAutoCommit<TBuilder>(this TBuilder builder, bool enabled = true)
where TBuilder : IKafkaConventionBuilder
{
Expand Down
7 changes: 5 additions & 2 deletions src/MinimalKafka/KafkaConsumerBuilder.cs
Original file line number Diff line number Diff line change
Expand Up @@ -58,17 +58,20 @@ private bool GetMetaData<T>([NotNullWhen(true)] out T? metadata)
public IConsumer<TKey, TValue> Build()
{
SetDeserializers(_consumerBuilder);
SetPartitionsAssignedHandler(_consumerBuilder);
SetHandlers(_consumerBuilder);
return _consumerBuilder.Build();
}

private void SetPartitionsAssignedHandler(ConsumerBuilder<TKey, TValue> consumerBuilder)
private void SetHandlers(ConsumerBuilder<TKey, TValue> consumerBuilder)
{
if(GetMetaData<IConsumerHandlerMetadata>(out var handlers))
{
if (handlers.PartitionsAssignedHandler is not null) consumerBuilder.SetPartitionsAssignedHandler(handlers.PartitionsAssignedHandler);
if (handlers.PartitionsRevokedHandler is not null) consumerBuilder.SetPartitionsRevokedHandler(handlers.PartitionsRevokedHandler);
if (handlers.PartitionsLostHandler is not null) consumerBuilder.SetPartitionsLostHandler(handlers.PartitionsLostHandler);
if (handlers.StatisticsHandler is not null) consumerBuilder.SetStatisticsHandler(handlers.StatisticsHandler);
if (handlers.ErrorHandler is not null) consumerBuilder.SetErrorHandler(handlers.ErrorHandler);
if (handlers.LogHandler is not null) consumerBuilder.SetLogHandler(handlers.LogHandler);
}


Expand Down
13 changes: 9 additions & 4 deletions src/MinimalKafka/Metadata/IConsumerHandlerMetadata.cs
Original file line number Diff line number Diff line change
Expand Up @@ -4,15 +4,20 @@ namespace MinimalKafka.Metadata;

public interface IConsumerHandlerMetadata
{
Func<object, List<TopicPartition>, IEnumerable<TopicPartitionOffset>>? PartitionsAssignedHandler { get; set; }

Action<object, string>? StatisticsHandler { get; set;}
Action<object, Error>? ErrorHandler { get; set; }
Func<object, List<TopicPartition>, IEnumerable<TopicPartitionOffset>>? PartitionsAssignedHandler { get; }
Func<object, List<TopicPartitionOffset>, IEnumerable<TopicPartitionOffset>>? PartitionsLostHandler { get; }
Action<object, List<TopicPartitionOffset>>? PartitionsRevokedHandler { get; }
Action<object, string>? StatisticsHandler { get; }
Action<object, Error>? ErrorHandler { get; }
Action<object, LogMessage>? LogHandler { get; }
}

internal class ConsumerHandlerMetadata : IConsumerHandlerMetadata
{
public Func<object, List<TopicPartition>, IEnumerable<TopicPartitionOffset>>? PartitionsAssignedHandler { get; set; }
public Func<object, List<TopicPartitionOffset>, IEnumerable<TopicPartitionOffset>>? PartitionsLostHandler { get; set; }
public Action<object, List<TopicPartitionOffset>>? PartitionsRevokedHandler { get; set; }
public Action<object, string>? StatisticsHandler { get; set; }
public Action<object, Error>? ErrorHandler { get; set; }
public Action<object, LogMessage>? LogHandler { get; set; }
}