Skip to content

Conversation

@pmdevers
Copy link
Owner

@pmdevers pmdevers commented Jun 16, 2025

Summary by CodeRabbit

  • Enhancements
    • Expanded consumer configuration to support additional event handlers for partition revocation, partition loss, and logging.
    • Renamed and clarified partition assignment handler methods for better usability and consistency.

@coderabbitai
Copy link
Contributor

coderabbitai bot commented Jun 16, 2025

Warning

Rate limit exceeded

@pmdevers has exceeded the limit for the number of commits or files that can be reviewed per hour. Please wait 3 minutes and 2 seconds before requesting another review.

⌛ How to resolve this issue?

After the wait time has elapsed, a review can be triggered using the @coderabbitai review command as a PR comment. Alternatively, push new commits to this PR.

We recommend that you space out your commits to avoid hitting the rate limit.

🚦 How do rate limits work?

CodeRabbit enforces hourly rate limits for each developer per organization.

Our paid plans have higher rate limits than the trial, open-source and free plans. In all cases, we re-allow further reviews after a brief timeout.

Please see our FAQ for further information.

📥 Commits

Reviewing files that changed from the base of the PR and between ccf1d45 and 69afb1f.

📒 Files selected for processing (2)
  • src/MinimalKafka/Extension/KafkaConsumerConfigMetadataExtensions.cs (2 hunks)
  • src/MinimalKafka/KafkaConsumerBuilder.cs (2 hunks)

Walkthrough

The KafkaConsumerBuilder class was updated to generalize and expand its handler assignment logic. The method previously responsible for setting only the partitions assigned handler was renamed and enhanced to conditionally set multiple handlers—including partitions assigned, revoked, lost, statistics, error, and log handlers—based on available metadata. Corresponding extension methods and interface properties were added or updated to support these handlers.

Changes

File Change Summary
src/MinimalKafka/KafkaConsumerBuilder.cs Renamed and expanded handler assignment method to set multiple handlers (assigned, revoked, lost, statistics, error, log).
src/MinimalKafka/Extension/KafkaConsumerConfigMetadataExtensions.cs Renamed WithPartitionHandler to WithPartitionAssignedHandler; added extension methods for revoked, lost, and log handlers.
src/MinimalKafka/Metadata/IConsumerHandlerMetadata.cs Added PartitionsLostHandler, PartitionsRevokedHandler, and LogHandler properties to interface; made existing handlers getter-only; updated implementing class accordingly.
examples/Examples/Program.cs Renamed call from .WithPartitionHandler to .WithPartitionAssignedHandler in Kafka configuration builder.

Sequence Diagram(s)

sequenceDiagram
    participant Builder as KafkaConsumerBuilder
    participant Metadata as IConsumerHandlerMetadata
    participant Consumer as ConsumerBuilder

    Builder->>Metadata: Check for handler delegates
    alt PartitionsAssignedHandler exists
        Builder->>Consumer: SetPartitionsAssignedHandler
    end
    alt PartitionsRevokedHandler exists
        Builder->>Consumer: SetPartitionsRevokedHandler
    end
    alt PartitionsLostHandler exists
        Builder->>Consumer: SetPartitionsLostHandler
    end
    alt StatisticsHandler exists
        Builder->>Consumer: SetStatisticsHandler
    end
    alt ErrorHandler exists
        Builder->>Consumer: SetErrorHandler
    end
    alt LogHandler exists
        Builder->>Consumer: SetLogHandler
    end
    Builder->>Consumer: Build consumer with assigned handlers
Loading

Possibly related PRs

  • Allow to set Consumer Handlers #82: Refactored and expanded the handler-setting logic in KafkaConsumerBuilder, directly building on the logic for assigning consumer handlers via metadata.

Poem

🐇 A builder once set just one hand,
Now many more handlers take their stand!
Assigned, revoked, and lost partitions too,
With logs and errors, the handlers grew.
Kafka consumers, now wise and aware,
With every event handled with care! 🎉

✨ Finishing Touches
  • 📝 Generate Docstrings

Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out.

❤️ Share
🪧 Tips

Chat

There are 3 ways to chat with CodeRabbit:

  • Review comments: Directly reply to a review comment made by CodeRabbit. Example:
    • I pushed a fix in commit <commit_id>, please review it.
    • Explain this complex logic.
    • Open a follow-up GitHub issue for this discussion.
  • Files and specific lines of code (under the "Files changed" tab): Tag @coderabbitai in a new review comment at the desired location with your query. Examples:
    • @coderabbitai explain this code block.
    • @coderabbitai modularize this function.
  • PR comments: Tag @coderabbitai in a new PR comment to ask questions about the PR branch. For the best results, please provide a very specific query, as very limited context is provided in this mode. Examples:
    • @coderabbitai gather interesting stats about this repository and render them as a table. Additionally, render a pie chart showing the language distribution in the codebase.
    • @coderabbitai read src/utils.ts and explain its main purpose.
    • @coderabbitai read the files in the src/scheduler package and generate a class diagram using mermaid and a README in the markdown format.
    • @coderabbitai help me debug CodeRabbit configuration file.

Support

Need help? Create a ticket on our support page for assistance with any issues or questions.

Note: Be mindful of the bot's finite context window. It's strongly recommended to break down tasks such as reading entire modules into smaller chunks. For a focused discussion, use review comments to chat about specific files and their changes, instead of using the PR comments.

CodeRabbit Commands (Invoked using PR comments)

  • @coderabbitai pause to pause the reviews on a PR.
  • @coderabbitai resume to resume the paused reviews.
  • @coderabbitai review to trigger an incremental review. This is useful when automatic reviews are disabled for the repository.
  • @coderabbitai full review to do a full review from scratch and review all the files again.
  • @coderabbitai summary to regenerate the summary of the PR.
  • @coderabbitai generate docstrings to generate docstrings for this PR.
  • @coderabbitai generate sequence diagram to generate a sequence diagram of the changes in this PR.
  • @coderabbitai resolve resolve all the CodeRabbit review comments.
  • @coderabbitai configuration to show the current CodeRabbit configuration for the repository.
  • @coderabbitai help to get help.

Other keywords and placeholders

  • Add @coderabbitai ignore anywhere in the PR description to prevent this PR from being reviewed.
  • Add @coderabbitai summary to generate the high-level summary at a specific location in the PR description.
  • Add @coderabbitai anywhere in the PR title to generate the title automatically.

CodeRabbit Configuration File (.coderabbit.yaml)

  • You can programmatically configure CodeRabbit by adding a .coderabbit.yaml file to the root of your repository.
  • Please see the configuration documentation for more information.
  • If your editor has YAML language server enabled, you can add the path at the top of this file to enable auto-completion and validation: # yaml-language-server: $schema=https://coderabbit.ai/integrations/schema.v2.json

Documentation and Community

  • Visit our Documentation for detailed information on how to use CodeRabbit.
  • Join our Discord Community to get help, request features, and share feedback.
  • Follow us on X/Twitter for updates and announcements.

Copy link
Contributor

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 0

🧹 Nitpick comments (1)
src/MinimalKafka/KafkaConsumerBuilder.cs (1)

65-72: Consider covering the full Confluent-Kafka handler surface.

The new helper registers PartitionsAssigned, Statistics, and Error handlers, but there are still commonly-used hooks (e.g. PartitionsRevokedHandler, LogHandler) that might be present on the same metadata object. Expanding this block (or adding a follow-up helper) would future-proof the builder and avoid having multiple metadata types for different handler sets.

 if (handlers.PartitionsAssignedHandler is not null)
     consumerBuilder.SetPartitionsAssignedHandler(handlers.PartitionsAssignedHandler);
+if (handlers.PartitionsRevokedHandler is not null)
+    consumerBuilder.SetPartitionsRevokedHandler(handlers.PartitionsRevokedHandler);
 if (handlers.StatisticsHandler is not null)
     consumerBuilder.SetStatisticsHandler(handlers.StatisticsHandler);
 if (handlers.ErrorHandler is not null)
     consumerBuilder.SetErrorHandler(handlers.ErrorHandler);
📜 Review details

Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 66cabab and 6d96a73.

📒 Files selected for processing (1)
  • src/MinimalKafka/KafkaConsumerBuilder.cs (1 hunks)
⏰ Context from checks skipped due to timeout of 90000ms (1)
  • GitHub Check: Analyze
🔇 Additional comments (1)
src/MinimalKafka/KafkaConsumerBuilder.cs (1)

61-62: Renamed helper improves clarity – looks good.

Switching to SetHandlers makes the intent of registering multiple handlers explicit and keeps Build() tidy.

Copy link
Contributor

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 4

🧹 Nitpick comments (2)
src/MinimalKafka/Extension/KafkaConsumerConfigMetadataExtensions.cs (2)

66-85: Re-use pattern looks copy-pasted – extract a helper to avoid repetition

The body of every With*Handler method is identical except for the lambda that assigns a different property. Duplicating this block six times needlessly inflates the file and increases the chance that a future edit updates one branch but forgets the others.

Consider a small local helper:

private static void EnsureConsumerHandlerMetadata(this IKafkaConventionBuilder b,
                                                  Action<ConsumerHandlerMetadata> assign)
{
    if (!b.MetaData.OfType<ConsumerHandlerMetadata>().Any())
        b.MetaData.Add(new ConsumerHandlerMetadata());

    foreach (var ch in b.MetaData.OfType<ConsumerHandlerMetadata>())
        assign(ch);
}

and then:

builder.Add(b => b.EnsureConsumerHandlerMetadata(ch => 
    ch.PartitionsAssignedHandler = handler));

This single extract removes the six nearly-identical blocks and centralises the “ensure metadata” logic.


166-185: Minor: consider aligning WithLogHandler parameter order with Confluent’s LogHandler

Confluent’s LogHandler signature is Action<IConsumer<,>, LogMessage>. Your wrapper uses Action<object, LogMessage>, which is fine but diverges from the underlying API. If you don’t need to stay completely generic you might pass through the ­typed consumer – it can be handy for correlation / diagnostics.

Nothing blocking, just something to think about when finalising the public API.

📜 Review details

Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 6d96a73 and 0cb52f9.

📒 Files selected for processing (3)
  • src/MinimalKafka/Extension/KafkaConsumerConfigMetadataExtensions.cs (3 hunks)
  • src/MinimalKafka/KafkaConsumerBuilder.cs (1 hunks)
  • src/MinimalKafka/Metadata/IConsumerHandlerMetadata.cs (1 hunks)
🚧 Files skipped from review as they are similar to previous changes (1)
  • src/MinimalKafka/KafkaConsumerBuilder.cs
⏰ Context from checks skipped due to timeout of 90000ms (1)
  • GitHub Check: Analyze

Comment on lines 106 to 125
public static TBuilder WithPartitionLostHandler<TBuilder>(this TBuilder builder, Func<object, List<TopicPartitionOffset>, IEnumerable<TopicPartitionOffset>> handler)
where TBuilder : IKafkaConventionBuilder
{

builder.Add(b =>
{
if (!b.MetaData.OfType<ConsumerHandlerMetadata>().Any())
{
b.MetaData.Add(new ConsumerHandlerMetadata());
}

foreach (var item in b.MetaData.OfType<ConsumerHandlerMetadata>())
{
item.PartitionsLostHandler = handler;
}
});

return builder;
}

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue

Same delegate-type mismatch for Partitions Lost handler

Analogous to the previous comment, WithPartitionLostHandler should take Action<object, List<TopicPartitionOffset>>. Please amend the method and metadata property types consistently.

🤖 Prompt for AI Agents
In src/MinimalKafka/Extension/KafkaConsumerConfigMetadataExtensions.cs around
lines 106 to 125, the WithPartitionLostHandler method currently accepts a Func
delegate returning IEnumerable<TopicPartitionOffset>, but it should accept an
Action delegate with parameters object and List<TopicPartitionOffset> to match
the expected handler signature. Update the method parameter type and the
corresponding ConsumerHandlerMetadata property PartitionsLostHandler to use
Action<object, List<TopicPartitionOffset>> consistently.

Comment on lines 7 to 12
Func<object, List<TopicPartition>, IEnumerable<TopicPartitionOffset>>? PartitionsAssignedHandler { get; }
Func<object, List<TopicPartitionOffset>, IEnumerable<TopicPartitionOffset>>? PartitionsLostHandler { get; }
Func<object, List<TopicPartitionOffset>, IEnumerable<TopicPartitionOffset>>? PartitionsRevokedHandler { get; }
Action<object, string>? StatisticsHandler { get; }
Action<object, Error>? ErrorHandler { get; }
Action<object, LogMessage>? LogHandler { get; }
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue

Delegate type mismatch – these should be Action, not Func

For PartitionsLostHandler and PartitionsRevokedHandler the interface currently declares

Func<object, List<TopicPartitionOffset>, IEnumerable<TopicPartitionOffset>>?

The underlying Confluent handlers are void-returning, so exposing them as Func<…, IEnumerable<…>> is misleading and will cause compilation errors when the delegate is passed through unchanged.

-Func<object, List<TopicPartitionOffset>, IEnumerable<TopicPartitionOffset>>? PartitionsLostHandler { get; }
-Func<object, List<TopicPartitionOffset>, IEnumerable<TopicPartitionOffset>>? PartitionsRevokedHandler { get; }
+Action<object, List<TopicPartitionOffset>>? PartitionsLostHandler { get; }
+Action<object, List<TopicPartitionOffset>>? PartitionsRevokedHandler { get; }

Please update both interface and extension methods accordingly.

🤖 Prompt for AI Agents
In src/MinimalKafka/Metadata/IConsumerHandlerMetadata.cs around lines 7 to 12,
the PartitionsLostHandler and PartitionsRevokedHandler delegates are incorrectly
declared as Func returning IEnumerable, but they should be void-returning
Actions to match the underlying Confluent handlers. Change their types from
Func<object, List<TopicPartitionOffset>, IEnumerable<TopicPartitionOffset>>? to
Action<object, List<TopicPartitionOffset>>? in the interface and update any
related extension methods to use Action delegates accordingly.

Comment on lines 17 to 23
public Func<object, List<TopicPartition>, IEnumerable<TopicPartitionOffset>>? PartitionsAssignedHandler { get; set; }
public Func<object, List<TopicPartitionOffset>, IEnumerable<TopicPartitionOffset>>? PartitionsLostHandler { get; set; }
public Func<object, List<TopicPartitionOffset>, IEnumerable<TopicPartitionOffset>>? PartitionsRevokedHandler { get; set; }
public Action<object, string>? StatisticsHandler { get; set; }
public Action<object, Error>? ErrorHandler { get; set; }
public Action<object, LogMessage>? LogHandler { get; set; }
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue

Keep implementation in sync after changing interface

After adjusting the interface (previous comment), remember to change the implementation properties to Action<…> as well:

-public Action<object, List<TopicPartitionOffset>, IEnumerable<TopicPartitionOffset>>? PartitionsLostHandler { get; set; }
-public Action<object, List<TopicPartitionOffset>, IEnumerable<TopicPartitionOffset>>? PartitionsRevokedHandler { get; set; }
+public Action<object, List<TopicPartitionOffset>>? PartitionsLostHandler { get; set; }
+public Action<object, List<TopicPartitionOffset>>? PartitionsRevokedHandler { get; set; }

Failing to do so will leave the class unable to satisfy the (corrected) interface.

Committable suggestion skipped: line range outside the PR's diff.

🤖 Prompt for AI Agents
In src/MinimalKafka/Metadata/IConsumerHandlerMetadata.cs around lines 17 to 23,
the interface properties PartitionsAssignedHandler, PartitionsLostHandler, and
PartitionsRevokedHandler are declared as Func types but should be changed to
Action types to match the updated interface. Update these properties to use
Action delegates with the appropriate parameters instead of Func to ensure the
class correctly implements the interface.

Copy link
Contributor

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 0

🧹 Nitpick comments (1)
examples/Examples/Program.cs (1)

16-18: Confirm the need to rewind every assignment to Offset.Beginning

WithPartitionAssignedHandler is now correctly invoked after the rename.
The supplied lambda, however, always seeks newly-assigned partitions to the beginning, bypassing any committed offsets.
Given that the group-id is random (Guid.NewGuid()), this is consistent with a “replay everything on each run” strategy, but it will also:

• Re-process the entire topic on every rebalance within the same process
• Inflate processing time and load on the broker for large topics

If the intent is simply “start from earliest when there is no commit yet”, consider checking WatermarkOffsets or using the default behaviour and letting AutoOffsetReset.Earliest handle the first assignment instead.

No action needed if the behaviour is deliberate—just flagging it for confirmation.

📜 Review details

Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 0cb52f9 and ccf1d45.

📒 Files selected for processing (3)
  • examples/Examples/Program.cs (1 hunks)
  • src/MinimalKafka/Extension/KafkaConsumerConfigMetadataExtensions.cs (3 hunks)
  • src/MinimalKafka/Metadata/IConsumerHandlerMetadata.cs (1 hunks)
🚧 Files skipped from review as they are similar to previous changes (2)
  • src/MinimalKafka/Extension/KafkaConsumerConfigMetadataExtensions.cs
  • src/MinimalKafka/Metadata/IConsumerHandlerMetadata.cs
⏰ Context from checks skipped due to timeout of 90000ms (1)
  • GitHub Check: Analyze

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants