Skip to content
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ public static TBuilder WithClientIdFormatter<TBuilder>(this TBuilder builder, Fu
return builder;
}

public static TBuilder WithPartitionHandler<TBuilder>(this TBuilder builder, Func<object, List<TopicPartition>, IEnumerable<TopicPartitionOffset>> handler)
public static TBuilder WithPartitionAssignedHandler<TBuilder>(this TBuilder builder, Func<object, List<TopicPartition>, IEnumerable<TopicPartitionOffset>> handler)
where TBuilder : IKafkaConventionBuilder
{

Expand All @@ -82,6 +82,47 @@ public static TBuilder WithPartitionHandler<TBuilder>(this TBuilder builder, Fun

return builder;
}

public static TBuilder WithPartitionRevokedHandler<TBuilder>(this TBuilder builder, Func<object, List<TopicPartitionOffset>, IEnumerable<TopicPartitionOffset>> handler)
where TBuilder : IKafkaConventionBuilder
{

builder.Add(b =>
{
if (!b.MetaData.OfType<ConsumerHandlerMetadata>().Any())
{
b.MetaData.Add(new ConsumerHandlerMetadata());
}

foreach (var item in b.MetaData.OfType<ConsumerHandlerMetadata>())
{
item.PartitionsRevokedHandler = handler;
}
});

return builder;
}

public static TBuilder WithPartitionLostHandler<TBuilder>(this TBuilder builder, Func<object, List<TopicPartitionOffset>, IEnumerable<TopicPartitionOffset>> handler)
where TBuilder : IKafkaConventionBuilder
{

builder.Add(b =>
{
if (!b.MetaData.OfType<ConsumerHandlerMetadata>().Any())
{
b.MetaData.Add(new ConsumerHandlerMetadata());
}

foreach (var item in b.MetaData.OfType<ConsumerHandlerMetadata>())
{
item.PartitionsLostHandler = handler;
}
});

return builder;
}

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue

Same delegate-type mismatch for Partitions Lost handler

Analogous to the previous comment, WithPartitionLostHandler should take Action<object, List<TopicPartitionOffset>>. Please amend the method and metadata property types consistently.

🤖 Prompt for AI Agents
In src/MinimalKafka/Extension/KafkaConsumerConfigMetadataExtensions.cs around
lines 106 to 125, the WithPartitionLostHandler method currently accepts a Func
delegate returning IEnumerable<TopicPartitionOffset>, but it should accept an
Action delegate with parameters object and List<TopicPartitionOffset> to match
the expected handler signature. Update the method parameter type and the
corresponding ConsumerHandlerMetadata property PartitionsLostHandler to use
Action<object, List<TopicPartitionOffset>> consistently.

public static TBuilder WithErrorHandler<TBuilder>(this TBuilder builder, Action<object, Error> handler)
where TBuilder : IKafkaConventionBuilder
{
Expand Down Expand Up @@ -122,6 +163,26 @@ public static TBuilder WithStatisticsHandler<TBuilder>(this TBuilder builder, Ac
return builder;
}

public static TBuilder WithLogHandler<TBuilder>(this TBuilder builder, Action<object, LogMessage> handler)
where TBuilder : IKafkaConventionBuilder
{

builder.Add(b =>
{
if (!b.MetaData.OfType<ConsumerHandlerMetadata>().Any())
{
b.MetaData.Add(new ConsumerHandlerMetadata());
}

foreach (var item in b.MetaData.OfType<ConsumerHandlerMetadata>())
{
item.LogHandler = handler;
}
});

return builder;
}

public static TBuilder WithAutoCommit<TBuilder>(this TBuilder builder, bool enabled = true)
where TBuilder : IKafkaConventionBuilder
{
Expand Down
7 changes: 5 additions & 2 deletions src/MinimalKafka/KafkaConsumerBuilder.cs
Original file line number Diff line number Diff line change
Expand Up @@ -58,17 +58,20 @@ private bool GetMetaData<T>([NotNullWhen(true)] out T? metadata)
public IConsumer<TKey, TValue> Build()
{
SetDeserializers(_consumerBuilder);
SetPartitionsAssignedHandler(_consumerBuilder);
SetHandlers(_consumerBuilder);
return _consumerBuilder.Build();
}

private void SetPartitionsAssignedHandler(ConsumerBuilder<TKey, TValue> consumerBuilder)
private void SetHandlers(ConsumerBuilder<TKey, TValue> consumerBuilder)
{
if(GetMetaData<IConsumerHandlerMetadata>(out var handlers))
{
if (handlers.PartitionsAssignedHandler is not null) consumerBuilder.SetPartitionsAssignedHandler(handlers.PartitionsAssignedHandler);
if (handlers.PartitionsRevokedHandler is not null) consumerBuilder.SetPartitionsRevokedHandler(handlers.PartitionsRevokedHandler);
if (handlers.PartitionsLostHandler is not null) consumerBuilder.SetPartitionsLostHandler(handlers.PartitionsLostHandler);
if (handlers.StatisticsHandler is not null) consumerBuilder.SetStatisticsHandler(handlers.StatisticsHandler);
if (handlers.ErrorHandler is not null) consumerBuilder.SetErrorHandler(handlers.ErrorHandler);
if (handlers.LogHandler is not null) consumerBuilder.SetLogHandler(handlers.LogHandler);
}


Expand Down
13 changes: 9 additions & 4 deletions src/MinimalKafka/Metadata/IConsumerHandlerMetadata.cs
Original file line number Diff line number Diff line change
Expand Up @@ -4,15 +4,20 @@ namespace MinimalKafka.Metadata;

public interface IConsumerHandlerMetadata
{
Func<object, List<TopicPartition>, IEnumerable<TopicPartitionOffset>>? PartitionsAssignedHandler { get; set; }

Action<object, string>? StatisticsHandler { get; set;}
Action<object, Error>? ErrorHandler { get; set; }
Func<object, List<TopicPartition>, IEnumerable<TopicPartitionOffset>>? PartitionsAssignedHandler { get; }
Func<object, List<TopicPartitionOffset>, IEnumerable<TopicPartitionOffset>>? PartitionsLostHandler { get; }
Func<object, List<TopicPartitionOffset>, IEnumerable<TopicPartitionOffset>>? PartitionsRevokedHandler { get; }
Action<object, string>? StatisticsHandler { get; }
Action<object, Error>? ErrorHandler { get; }
Action<object, LogMessage>? LogHandler { get; }
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue

Delegate type mismatch – these should be Action, not Func

For PartitionsLostHandler and PartitionsRevokedHandler the interface currently declares

Func<object, List<TopicPartitionOffset>, IEnumerable<TopicPartitionOffset>>?

The underlying Confluent handlers are void-returning, so exposing them as Func<…, IEnumerable<…>> is misleading and will cause compilation errors when the delegate is passed through unchanged.

-Func<object, List<TopicPartitionOffset>, IEnumerable<TopicPartitionOffset>>? PartitionsLostHandler { get; }
-Func<object, List<TopicPartitionOffset>, IEnumerable<TopicPartitionOffset>>? PartitionsRevokedHandler { get; }
+Action<object, List<TopicPartitionOffset>>? PartitionsLostHandler { get; }
+Action<object, List<TopicPartitionOffset>>? PartitionsRevokedHandler { get; }

Please update both interface and extension methods accordingly.

🤖 Prompt for AI Agents
In src/MinimalKafka/Metadata/IConsumerHandlerMetadata.cs around lines 7 to 12,
the PartitionsLostHandler and PartitionsRevokedHandler delegates are incorrectly
declared as Func returning IEnumerable, but they should be void-returning
Actions to match the underlying Confluent handlers. Change their types from
Func<object, List<TopicPartitionOffset>, IEnumerable<TopicPartitionOffset>>? to
Action<object, List<TopicPartitionOffset>>? in the interface and update any
related extension methods to use Action delegates accordingly.

}

internal class ConsumerHandlerMetadata : IConsumerHandlerMetadata
{
public Func<object, List<TopicPartition>, IEnumerable<TopicPartitionOffset>>? PartitionsAssignedHandler { get; set; }
public Func<object, List<TopicPartitionOffset>, IEnumerable<TopicPartitionOffset>>? PartitionsLostHandler { get; set; }
public Func<object, List<TopicPartitionOffset>, IEnumerable<TopicPartitionOffset>>? PartitionsRevokedHandler { get; set; }
public Action<object, string>? StatisticsHandler { get; set; }
public Action<object, Error>? ErrorHandler { get; set; }
public Action<object, LogMessage>? LogHandler { get; set; }
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue

Keep implementation in sync after changing interface

After adjusting the interface (previous comment), remember to change the implementation properties to Action<…> as well:

-public Action<object, List<TopicPartitionOffset>, IEnumerable<TopicPartitionOffset>>? PartitionsLostHandler { get; set; }
-public Action<object, List<TopicPartitionOffset>, IEnumerable<TopicPartitionOffset>>? PartitionsRevokedHandler { get; set; }
+public Action<object, List<TopicPartitionOffset>>? PartitionsLostHandler { get; set; }
+public Action<object, List<TopicPartitionOffset>>? PartitionsRevokedHandler { get; set; }

Failing to do so will leave the class unable to satisfy the (corrected) interface.

Committable suggestion skipped: line range outside the PR's diff.

🤖 Prompt for AI Agents
In src/MinimalKafka/Metadata/IConsumerHandlerMetadata.cs around lines 17 to 23,
the interface properties PartitionsAssignedHandler, PartitionsLostHandler, and
PartitionsRevokedHandler are declared as Func types but should be changed to
Action types to match the updated interface. Update these properties to use
Action delegates with the appropriate parameters instead of Func to ensure the
class correctly implements the interface.

Loading