
Conversation

@pmdevers (Owner) commented May 29, 2025

Summary by CodeRabbit

  • New Features
    • Introduced support for configuring automatic commit of Kafka consumer offsets, allowing users to enable or disable this behavior.
  • Improvements
    • Kafka consumer processing is now fully asynchronous, improving efficiency and scalability.
    • Enhanced logging for consumer commit actions and operation cancellations.
    • Improved service lifecycle management for more robust handling of process start and stop events.
  • Bug Fixes
    • Addressed issues related to consumer shutdown and error handling during process execution.
  • Tests
    • Updated and streamlined tests to reflect changes in asynchronous consumer processing and error handling.

coderabbitai bot (Contributor) commented May 29, 2025

Warning

Rate limit exceeded

@pmdevers has exceeded the limit for the number of commits or files that can be reviewed per hour. Please wait 3 minutes and 34 seconds before requesting another review.

⌛ How to resolve this issue?

After the wait time has elapsed, a review can be triggered using the @coderabbitai review command as a PR comment. Alternatively, push new commits to this PR.

We recommend that you space out your commits to avoid hitting the rate limit.

🚦 How do rate limits work?

CodeRabbit enforces hourly rate limits for each developer per organization.

Our paid plans have higher rate limits than the trial, open-source and free plans. In all cases, we re-allow further reviews after a brief timeout.

Please see our FAQ for further information.

📥 Commits

Reviewing files that changed from the base of the PR and between ef9b92c and 3665df9.

📒 Files selected for processing (2)
  • src/MinimalKafka/KafkaConsumer.cs (4 hunks)
  • test/MinimalKafka.Tests/RegistrationTests.cs (2 hunks)

Walkthrough

The changes introduce a configurable auto-commit mechanism for Kafka consumers, refactor the consumer's message handling to be fully asynchronous, and update logging and service control flow accordingly. The public API for consuming messages is modified to accept a delegate and cancellation token, with tests and service orchestration updated to match the new asynchronous pattern and auto-commit configuration.
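
From that description, the reshaped consumer API presumably looks something like the following. This is a sketch inferred from the walkthrough only; the exact member names and the parameterless Commit are assumptions:

public abstract class KafkaConsumer
{
    // Consume is now asynchronous and invokes the handler itself,
    // instead of returning a context for the caller to dispatch.
    public abstract Task Consume(
        Func<KafkaContext, Task> handler,
        CancellationToken cancellationToken);

    // Manual commit hook, used when auto-commit is disabled.
    public abstract void Commit();
}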

Changes

File(s) and change summary:

  • src/MinimalKafka/Extension/KafkaConsumerConfigMetadataExtensions.cs: Added a WithAutoCommit<TBuilder> extension method for configuring auto-commit metadata on Kafka consumer builders.
  • src/MinimalKafka/Metadata/AutoCommitMetaData.cs: Introduced the IAutoCommitMetaData interface and AutoCommitMetaDataAttribute for auto-commit configuration; includes logic to set the relevant Kafka config values.
  • src/MinimalKafka/Helpers/Logging.cs: Updated the log level and message for operation cancellation; added a Committing log method for offset commits.
  • src/MinimalKafka/KafkaConsumer.cs, src/MinimalKafka/Helpers/MetadataHelperExtensions.cs: Refactored Consume methods to be asynchronous and accept a delegate; added a HasAutoCommit extension; updated commit logic based on auto-commit metadata.
  • src/MinimalKafka/KafkaProcess.cs: Simplified the consumption loop to delegate message handling to the consumer's new async Consume method; removed the explicit handler invocation and consumer closure in Start.
  • src/MinimalKafka/KafkaExtensions.cs: Explicitly disables auto-commit by default in AddMinimalKafka via WithAutoCommit(false).
  • src/MinimalKafka/KafkaService.cs: Refactored ExecuteAsync to use linked cancellation tokens, wrap process start calls in try-catch for KafkaProcesException, cancel the token source on exception, and await all tasks.
  • test/MinimalKafka.Tests/KafkaProcessTests.cs: Updated tests for the new async Consume signature; removed the test for an empty context; revised exception handling and assertions for the new flow.
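
Putting these pieces together, opting back into auto-commit at registration time would presumably look like this. Only WithAutoCommit and AddMinimalKafka are named in the list above; the other helper names are assumptions for illustration:

var builder = WebApplication.CreateBuilder(args);

// AddMinimalKafka now applies WithAutoCommit(false) by default,
// so auto-commit must be switched on explicitly if it is wanted.
builder.Services.AddMinimalKafka(config => config
    .WithBootstrapServers("localhost:9092")  // assumed helper name
    .WithGroupId("example-group")            // assumed helper name
    .WithAutoCommit(true));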

Sequence Diagram(s)

sequenceDiagram
    participant KafkaService
    participant KafkaProcess
    participant KafkaConsumer
    participant Handler

    KafkaService->>KafkaProcess: Start(ct)
    KafkaProcess->>KafkaConsumer: Consume(handler, ct)
    KafkaConsumer->>KafkaConsumer: Consume message (async)
    KafkaConsumer->>Handler: Invoke handler with KafkaContext
    alt Auto-commit disabled
        KafkaConsumer->>KafkaConsumer: Log commit, store offset, commit consumer
    end
    KafkaConsumer-->>KafkaProcess: Task completes
    KafkaProcess-->>KafkaService: Start completes

Possibly related PRs

  • Make KafkaProcess Async #76: Changes KafkaProcess to be asynchronous by modifying its Start and Stop methods, closely related to this PR's async Consume method and KafkaProcess consumption handling.

Poem

A hop and a skip, the offsets commit,
With auto-commit toggled, we’re nimble and fit!
Async now reigns in our message parade,
Delegates dancing in service we’ve made.
Logs chirp with info, no critical cries—
Kafka’s new warren is clever and wise!
🐇✨

@pmdevers (Owner, Author) commented:

@coderabbitai review

coderabbitai bot (Contributor) commented May 29, 2025

✅ Actions performed

Review triggered.

Note: CodeRabbit is an incremental review system and does not re-review already reviewed commits. This command is applicable only when automatic reviews are paused.

coderabbitai bot (Contributor) left a comment

Actionable comments posted: 3

🧹 Nitpick comments (1)
src/MinimalKafka/Extension/KafkaConsumerConfigMetadataExtensions.cs (1)

5-5: Remove unnecessary using directive.

The using System.Reflection.Emit; directive is not used in this file and should be removed.

-using System.Reflection.Emit;
📜 Review details

Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 755f9eb and c19e105.

📒 Files selected for processing (6)
  • src/MinimalKafka/Extension/KafkaConsumerConfigMetadataExtensions.cs (2 hunks)
  • src/MinimalKafka/KafkaConsumer.cs (5 hunks)
  • src/MinimalKafka/KafkaContext.cs (3 hunks)
  • src/MinimalKafka/KafkaProcess.cs (1 hunks)
  • src/MinimalKafka/Metadata/AutoCommitMetaData.cs (1 hunks)
  • test/MinimalKafka.Tests/KafkaProcessTests.cs (1 hunks)
🧰 Additional context used
🧬 Code Graph Analysis (2)
src/MinimalKafka/KafkaProcess.cs (1)
src/MinimalKafka/KafkaConsumer.cs (3)
  • Commit (15-15)
  • Commit (37-39)
  • Commit (97-108)
src/MinimalKafka/Extension/KafkaConsumerConfigMetadataExtensions.cs (2)
src/MinimalKafka/KafkaExtensions.cs (5)
  • TBuilder (87-99)
  • TBuilder (100-106)
  • TBuilder (107-116)
  • IKafkaConventionBuilder (75-79)
  • IKafkaConventionBuilder (80-86)
src/MinimalKafka/Extension/KafkaConsumerBuilderMetadataExtensions.cs (6)
  • TBuilder (9-14)
  • TBuilder (16-25)
  • TBuilder (27-32)
  • TBuilder (34-39)
  • TBuilder (41-50)
  • TBuilder (52-57)
🔇 Additional comments (11)
src/MinimalKafka/KafkaContext.cs (2)

13-13: LGTM! Well-designed abstraction for offset access.

The abstract Offset property provides a clean contract for accessing Kafka message offsets across different context implementations.


76-77: Clean implementation using ConsumeResult offset.

The implementation correctly delegates to the underlying ConsumeResult.TopicPartitionOffset property, maintaining consistency with Kafka client semantics.

src/MinimalKafka/KafkaProcess.cs (1)

52-52:

✅ Verification successful

Verify EmptyKafkaContext handling with offset commit.

The commit call looks correct and integrates well with the auto-commit metadata system from KafkaConsumer.cs. However, ensure that EmptyKafkaContext never reaches this point since its Offset property throws NotImplementedException.


🏁 Script executed:

#!/bin/bash
# Verify that EmptyKafkaContext is properly filtered out before commit
rg -A 10 -B 5 "EmptyKafkaContext.*continue" --type cs

Length of output: 55


🏁 Script executed:

#!/bin/bash
set -e
# Locate KafkaProcess.cs
KAFKA_PROCESS=$(fd KafkaProcess.cs | head -1)
echo "KafkaProcess.cs path: $KAFKA_PROCESS"

echo "---- Around _consumer.Commit in KafkaProcess.cs ----"
rg -A5 -B5 "_consumer.Commit" "$KAFKA_PROCESS"

echo "---- References to EmptyKafkaContext in codebase ----"
rg -n "EmptyKafkaContext" .

FILE_EMPTY=$(rg -l "class EmptyKafkaContext" --type cs || true)
if [ -n "$FILE_EMPTY" ]; then
  echo "EmptyKafkaContext defined in: $FILE_EMPTY"
  echo "---- Definition of EmptyKafkaContext ----"
  rg -A10 -B0 "class EmptyKafkaContext" "$FILE_EMPTY"
else
  echo "No EmptyKafkaContext definition found."
fi

Length of output: 2305


EmptyKafkaContext is safely filtered before committing offsets

I’ve confirmed that in src/MinimalKafka/KafkaProcess.cs the code checks:

if (context is EmptyKafkaContext)
    continue;

immediately after consuming, so context.Offset is never accessed on an EmptyKafkaContext. The commit call is therefore safe and can be approved.

src/MinimalKafka/Extension/KafkaConsumerConfigMetadataExtensions.cs (1)

66-71: Well-implemented extension method following established patterns.

The WithAutoCommit method correctly follows the same pattern as other metadata extension methods in this file, using WithSingle to add the AutoCommitMetaDataAttribute. This integrates nicely with the auto-commit system implemented across the other files.
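
Given that description, the extension presumably has roughly this shape (a sketch modeled on the described pattern; the WithSingle helper's exact semantics and the generic constraint are assumptions):

public static TBuilder WithAutoCommit<TBuilder>(this TBuilder builder, bool enabled = true)
    where TBuilder : IKafkaConventionBuilder
{
    // WithSingle presumably ensures only one auto-commit metadata
    // entry is registered, so the last call wins.
    builder.WithSingle(new AutoCommitMetaDataAttribute(enabled));
    return builder;
}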

src/MinimalKafka/Metadata/AutoCommitMetaData.cs (2)

5-7: LGTM! Clean interface design.

The marker interface follows good design patterns for metadata configuration.


9-17:

❓ Verification inconclusive

Verify the auto-commit configuration aligns with PR objectives.

The attribute enables both "enable.auto.commit" and "enable.auto.offset.store", but the PR title suggests "Default Disable AutoCommit". Please confirm this behavior aligns with the intended changes.

Additionally, setting both auto-commit flags to true while also implementing manual commit logic in the consumer (as seen in the other file) may create redundancy or conflicts in offset management.


🏁 Script executed:

#!/bin/bash
# Description: Check if there are other auto-commit configurations or default settings
# Expected: Find how auto-commit is configured by default in the codebase

# Search for auto-commit related configurations
rg -i "auto.commit|auto.offset" --type cs

# Search for default consumer configurations
rg -i "enable\.auto\." --type cs

Length of output: 482


Confirm Auto-Commit Behavior

Based on our code-wide search, the only places setting auto-commit flags are here:

• src/MinimalKafka/Metadata/AutoCommitMetaData.cs:
config.Set("enable.auto.commit", "true");
config.Set("enable.auto.offset.store", "true");

No other enable.auto.* settings were found. Since the PR is titled “Default Disable AutoCommit,” please:

  • Verify that enabling both auto-commit flags here matches the intended change.
  • Check whether you’ve implemented any manual commit logic elsewhere (e.g., calls to consumer.Commit()) and ensure these two modes won’t conflict.
src/MinimalKafka/KafkaConsumer.cs (5)

15-15: LGTM! Proper abstraction for commit functionality.

The abstract method declaration correctly establishes the commit contract for all consumer implementations.


37-39: LGTM! Appropriate no-op implementation.

The empty implementation is correct for the NoConsumer placeholder class.


78-78: LGTM! Consistent use of stored consume result.

The changes correctly store the consume result and use it consistently throughout the method.

Also applies to: 82-82, 85-85


149-150: LGTM! Clean extension method implementation.

The extension method provides a clear way to check for auto-commit metadata presence.


71-71:

✅ Verification successful

Consider thread safety for the consume result field.

The _consumeResult field is accessed across multiple methods without synchronization. Since Kafka consumers are typically single-threaded per partition, this might be acceptable, but please verify the threading model.


🏁 Script executed:

#!/bin/bash
# Description: Check how the consumer is used in the codebase to verify threading assumptions
# Expected: Find usage patterns that confirm single-threaded access

# Search for consumer usage patterns
rg -A 5 -B 5 "KafkaConsumer.*Consume" --type cs

# Look for any concurrent access patterns
rg -i "thread|task|parallel|concurrent" --type cs -A 3 -B 3

Length of output: 47727


🏁 Script executed:

#!/bin/bash
# Search for the declaration and usages of _consumeResult in the codebase
rg -n "_consumeResult" --type cs || echo "No direct matches for _consumeResult"
rg -A5 -B5 "_consumeResult" --type cs

Length of output: 3091


No thread-safety concerns for _consumeResult
All references to _consumeResult occur within the single-threaded KafkaProcess.Start loop on each consumer instance. Each KafkaConsumer is used by exactly one Task, and no other code invokes Consume (or accesses _consumeResult) concurrently. No additional synchronization is required.


Comment on the following lines in test/MinimalKafka.Tests/KafkaProcessTests.cs:

public override DateTime Timestamp => TimeProvider.System.GetUtcNow().DateTime;

public override TopicPartitionOffset Offset => new(new("options", Partition.Any), 1);
coderabbitai bot (Contributor) commented:

💡 Verification agent

❓ Verification inconclusive

Good test implementation of the Offset property.

The test context provides a reasonable fixed offset value for testing scenarios. Consider adding specific tests to verify that the commit functionality works correctly with this offset.


🏁 Script executed:

#!/bin/bash
# Check if there are tests that verify the commit behavior
rg -A 5 -B 5 "Commit.*Offset|\.Commit\(" test/MinimalKafka.Tests/ --type cs

Length of output: 77


Missing commit behavior tests

  • I ran a search in test/MinimalKafka.Tests for any Commit or CommitOffset calls and found no existing tests covering commit behavior against the fixed Offset.
  • Please add one or more tests to verify that your commit functionality (e.g. KafkaProcess.Commit(...)) correctly uses the overridden Offset value (new TopicPartitionOffset("options", Partition.Any, 1)).
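
A possible shape for such a test, sketched against the fixture conventions visible in this PR (the substitute fields and the Commit signature are assumptions, not the actual API):

[Fact]
public async Task Start_Commits_Using_The_Overridden_Offset()
{
    var expected = new TopicPartitionOffset("options", Partition.Any, 1);

    await _process.Start(CancellationToken.None);

    // Assumes the consumer substitute exposes Commit taking the offset
    // to commit; adjust to the real signature in KafkaConsumer.cs.
    _consumer.Received().Commit(Arg.Is<TopicPartitionOffset>(o => o.Equals(expected)));
}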

coderabbitai bot (Contributor) left a comment

Actionable comments posted: 1

📜 Review details

Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between c19e105 and ec5210c.

📒 Files selected for processing (1)
  • src/MinimalKafka/KafkaContext.cs (3 hunks)
🧰 Additional context used
🪛 GitHub Check: Analyze
src/MinimalKafka/KafkaContext.cs

[failure] 59-59:
'Partition' does not contain a definition for 'Unassigned'


[failure] 60-60:
'TopicPartitionOffset' does not contain a definition for 'Unset' and no accessible extension method 'Unset' accepting a first argument of type 'TopicPartitionOffset' could be found (are you missing a using directive or an assembly reference?)


[failure] 58-58:
Add a way to break out of this property's recursion. (https://rules.sonarsource.com/csharp/RSPEC-2190)


[failure] 59-59:
'Partition' does not contain a definition for 'Unassigned'


[failure] 60-60:
'TopicPartitionOffset' does not contain a definition for 'Unset' and no accessible extension method 'Unset' accepting a first argument of type 'TopicPartitionOffset' could be found (are you missing a using directive or an assembly reference?)


[failure] 58-58:
Add a way to break out of this property's recursion. (https://rules.sonarsource.com/csharp/RSPEC-2190)

🪛 GitHub Check: Build
src/MinimalKafka/KafkaContext.cs

[failure] 59-59:
'Partition' does not contain a definition for 'Unassigned'


[failure] 60-60:
'TopicPartitionOffset' does not contain a definition for 'Unset' and no accessible extension method 'Unset' accepting a first argument of type 'TopicPartitionOffset' could be found (are you missing a using directive or an assembly reference?)


[failure] 58-58:
Add a way to break out of this property's recursion. (https://rules.sonarsource.com/csharp/RSPEC-2190)


[failure] 59-59:
'Partition' does not contain a definition for 'Unassigned'


[failure] 60-60:
'TopicPartitionOffset' does not contain a definition for 'Unset' and no accessible extension method 'Unset' accepting a first argument of type 'TopicPartitionOffset' could be found (are you missing a using directive or an assembly reference?)


[failure] 58-58:
Add a way to break out of this property's recursion. (https://rules.sonarsource.com/csharp/RSPEC-2190)

🪛 GitHub Actions: Build
src/MinimalKafka/KafkaContext.cs

[error] 59-59: CS0117: 'Partition' does not contain a definition for 'Unassigned'

🪛 GitHub Actions: Code Security Testing
src/MinimalKafka/KafkaContext.cs

[error] 59-59: CS0117: 'Partition' does not contain a definition for 'Unassigned'

🔇 Additional comments (2)
src/MinimalKafka/KafkaContext.cs (2)

13-13: LGTM: Abstract property establishes clear contract.

The addition of the abstract Offset property correctly establishes that all KafkaContext implementations must provide offset information, which aligns with the auto-commit functionality described in the PR objectives.


79-80: LGTM: Correct implementation for generic context.

The implementation correctly returns the offset from the ConsumeResult, providing the actual offset information needed for auto-commit functionality.

Comment on lines 58 to 62
public override TopicPartitionOffset Offset => new TopicPartitionOffset(
new TopicPartition("", Partition.Unassigned),
Offset.Unset
);

coderabbitai bot (Contributor) commented:

⚠️ Potential issue

Fix compilation errors in EmptyKafkaContext.Offset implementation.

The implementation has addressed the previous feedback about returning a sentinel value instead of throwing NotImplementedException, but there are compilation errors that need to be resolved:

  1. Partition.Unassigned doesn't exist in the Confluent.Kafka API
  2. Offset.Unset is causing a naming conflict with the property name

Apply this diff to fix the compilation errors:

-    public override TopicPartitionOffset Offset => new TopicPartitionOffset(
-        new TopicPartition("", Partition.Unassigned),
-        Offset.Unset
-    );
+    public override TopicPartitionOffset Offset => new TopicPartitionOffset(
+        new TopicPartition("", Partition.Any),
+        Confluent.Kafka.Offset.Unset
+    );

The fixes:

  • Replace Partition.Unassigned with Partition.Any (the correct API)
  • Fully qualify Confluent.Kafka.Offset.Unset to avoid naming conflict with the property


@pmdevers changed the title from "Default Disable AutoCommit" to "Allow to to disable auto commit" on May 30, 2025
coderabbitai bot (Contributor) left a comment

Actionable comments posted: 0

🧹 Nitpick comments (1)
src/MinimalKafka/Metadata/AutoCommitMetaData.cs (1)

5-8: Remove redundant access modifier.

The public modifier on interface members is redundant in modern C# as interface members are public by default.

 public interface IAutoCommitMetaData : IConsumerConfigMetadata
 {
-    public bool Enabled { get; }
+    bool Enabled { get; }
 }
📜 Review details

Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between ec5210c and 0e814ef.

📒 Files selected for processing (7)
  • src/MinimalKafka/Extension/KafkaConsumerConfigMetadataExtensions.cs (2 hunks)
  • src/MinimalKafka/Helpers/Logging.cs (1 hunks)
  • src/MinimalKafka/KafkaConsumer.cs (5 hunks)
  • src/MinimalKafka/KafkaContext.cs (2 hunks)
  • src/MinimalKafka/KafkaProcess.cs (1 hunks)
  • src/MinimalKafka/Metadata/AutoCommitMetaData.cs (1 hunks)
  • test/MinimalKafka.Tests/KafkaProcessTests.cs (1 hunks)
✅ Files skipped from review due to trivial changes (3)
  • src/MinimalKafka/KafkaContext.cs
  • test/MinimalKafka.Tests/KafkaProcessTests.cs
  • src/MinimalKafka/Helpers/Logging.cs
🚧 Files skipped from review as they are similar to previous changes (3)
  • src/MinimalKafka/KafkaProcess.cs
  • src/MinimalKafka/Extension/KafkaConsumerConfigMetadataExtensions.cs
  • src/MinimalKafka/KafkaConsumer.cs
⏰ Context from checks skipped due to timeout of 90000ms (1)
  • GitHub Check: Analyze
🔇 Additional comments (1)
src/MinimalKafka/Metadata/AutoCommitMetaData.cs (1)

10-20:

❓ Verification inconclusive

Verify the coupling of auto-commit settings.

The implementation correctly uses C# 12 primary constructor syntax and proper attribute configuration. However, please confirm that enable.auto.commit and enable.auto.offset.store should always have the same value, as these are separate Kafka configurations that could potentially be configured independently.


🌐 Web query:

What is the relationship between enable.auto.commit and enable.auto.offset.store in Kafka consumer configuration? Should they always have the same value?

💡 Result:

In Kafka consumer configuration, enable.auto.commit and enable.auto.offset.store are related to how offsets are managed, but they serve different purposes:

  1. enable.auto.commit:

    • This setting controls whether the consumer automatically commits offsets back to Kafka at regular intervals.
    • When set to true, Kafka consumer will periodically commit the latest offset returned by the poll method to Kafka, which means Kafka remembers the consumer's progress.
    • This helps in fault tolerance so that after a restart, the consumer resumes from the last committed offset.
    • Default value is usually true.
  2. enable.auto.offset.store:

    • This setting governs whether the consumer automatically stores the offset of messages returned by poll() in the consumer's local offset store.
    • The local offset store is what Kafka uses internally to track offsets before they are committed.
    • When set to true, the offsets are automatically stored locally without manual intervention.
    • When set to false, you have to manually call consumer.storeOffsets() or consumer.commitSync() to store or commit offsets.
    • This setting is useful when you want fine-grained control over when offsets are stored with respect to message processing.

Their relationship:

  • enable.auto.offset.store controls storing offsets in the local consumer state; enable.auto.commit controls committing those stored offsets to Kafka.
  • Typically, if enable.auto.commit is true, it makes sense for enable.auto.offset.store to also be true so that offsets are automatically stored and then periodically committed.
  • If you want to manually manage offset storing (e.g., only after processing messages successfully), you might set enable.auto.offset.store to false and call storeOffsets() manually; in that case, you might also disable enable.auto.commit and do manual commits to ensure precise control.

Should they always have the same value?

  • Not necessarily. They can differ depending on your offset management strategy.
  • For simple, automatic offset management, both can be true.
  • For manual, fine-tuned control, you might disable auto offset store to control when offsets are stored, and disable auto commit to control when offsets are committed.

In summary:

  • enable.auto.offset.store: automatically store offsets locally (often set together with auto-commit: yes)
  • enable.auto.commit: automatically commit stored offsets to Kafka (depends on manual vs. automatic control)

If you want full manual control: both might be false.
If you want automatic offset tracking and committing: both might be true.



Confirm coupling of auto-commit and offset-store settings

Kafka’s enable.auto.offset.store (auto-store) and enable.auto.commit (auto-commit) serve distinct roles:

  • auto-store: when true, offsets returned by poll() are stored locally without manual calls to storeOffsets().
  • auto-commit: when true, the consumer periodically commits those stored offsets back to Kafka.

It’s common for simple use cases to set both to true, and for fully manual offset management to set both to false. However, you can also mix them (e.g. auto-store = true but auto-commit = false) to store offsets automatically but commit them only when you decide.

Please confirm that AutoCommitMetaDataAttribute is intended to toggle both settings in lockstep. If the goal is solely to control auto-commit behavior, consider decoupling enable.auto.offset.store or documenting this combined effect clearly.
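
To make the mixed mode concrete, here is a minimal Confluent.Kafka sketch that keeps auto-commit on but stores offsets manually, so only fully processed messages are ever committed (broker, group, and topic names are placeholders):

var config = new ConsumerConfig
{
    BootstrapServers = "localhost:9092",  // placeholder
    GroupId = "example-group",            // placeholder
    EnableAutoCommit = true,              // periodic background commits stay on...
    EnableAutoOffsetStore = false,        // ...but only explicitly stored offsets are committed
};

using var consumer = new ConsumerBuilder<string, string>(config).Build();
consumer.Subscribe("example-topic");

var result = consumer.Consume(CancellationToken.None);
// Process the message first, then mark its offset as safe to commit:
consumer.StoreOffset(result);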

coderabbitai bot (Contributor) left a comment

Actionable comments posted: 3

🔭 Outside diff range comments (1)
src/MinimalKafka/KafkaConsumer.cs (1)

67-77: ⚠️ Potential issue

IServiceScope is never disposed – leads to memory leaks

CreateScope() should be wrapped in a using (or await using if you ever migrate to async disposables).
Without disposal, scoped services (DbContexts, etc.) accumulate indefinitely.

-var scope = _serviceProvider.CreateScope();
-var result = _consumer.Consume(cancellationToken);
+using var scope = _serviceProvider.CreateScope();
+var result = _consumer.Consume(cancellationToken);
🧹 Nitpick comments (5)
src/MinimalKafka/KafkaExtensions.cs (1)

53-53: Defaulting to WithAutoCommit(false) may surprise consumers

Disabling auto-commit before invoking the user-supplied config delegate means every application now has to explicitly enable it to restore the old behaviour.
If backward compatibility is required, consider moving this call after config(configBuilder) or documenting it very visibly.
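
For context, the ordering that makes an override possible is roughly the following (a sketch of the structure described in the summary, not the actual method body):

configBuilder.WithAutoCommit(false);  // library default applied first...
config(configBuilder);                // ...then the user delegate runs, so a
                                      // WithAutoCommit(true) call there still wins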

test/MinimalKafka.Tests/KafkaProcessTests.cs (1)

65-66: Assertion does not verify call count

ReceivedWithAnyArgs() only checks at least one invocation.
For deterministic tests prefer an exact count:

-await _handler.ReceivedWithAnyArgs().Invoke(Arg.Any<KafkaContext>());
+await _handler.Received(1).Invoke(Arg.Any<KafkaContext>());
src/MinimalKafka/KafkaConsumer.cs (3)

84-88: Logic inversion risk: commit when auto-commit is disabled

The method commits offsets only when HasAutoCommit() is false.
This is correct given your metadata semantics but counter-intuitive; a short comment would save the next maintainer some head-scratching.

// Manual commit because auto-commit is disabled

91-93: No exception handling around Commit

_consumer.Commit() can throw (e.g., broker unavailable).
Consider wrapping it so a transient commit failure does not crash the entire process, and potentially retrying; see the sketch after this list.


140-142: Helper name suggests boolean but returns opposite of Kafka’s default

HasAutoCommit() returns Enabled ?? false.
If Enabled is true you skip the manual commit.
A clearer name could be IsAutoCommitEnabled.
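
For the commit-failure nitpick above, a minimal guard might look like this (a sketch; the logging helper is hypothetical, and a real implementation might retry with backoff before surfacing the failure):

try
{
    _consumer.Commit();
}
catch (KafkaException ex) when (!ex.Error.IsFatal)
{
    // Hypothetical logging helper; transient failures are logged and the
    // loop continues, while fatal errors still propagate.
    Logging.CommitFailed(_logger, ex);
}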

📜 Review details

Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between aeacdbc and e2da136.

📒 Files selected for processing (8)
  • src/MinimalKafka/Extension/KafkaConsumerConfigMetadataExtensions.cs (1 hunks)
  • src/MinimalKafka/Helpers/Logging.cs (2 hunks)
  • src/MinimalKafka/KafkaConsumer.cs (5 hunks)
  • src/MinimalKafka/KafkaExtensions.cs (1 hunks)
  • src/MinimalKafka/KafkaProcess.cs (2 hunks)
  • src/MinimalKafka/KafkaService.cs (1 hunks)
  • src/MinimalKafka/Metadata/AutoCommitMetaData.cs (1 hunks)
  • test/MinimalKafka.Tests/KafkaProcessTests.cs (2 hunks)
🚧 Files skipped from review as they are similar to previous changes (4)
  • src/MinimalKafka/Helpers/Logging.cs
  • src/MinimalKafka/KafkaProcess.cs
  • src/MinimalKafka/Extension/KafkaConsumerConfigMetadataExtensions.cs
  • src/MinimalKafka/Metadata/AutoCommitMetaData.cs
🧰 Additional context used
🧬 Code Graph Analysis (3)
src/MinimalKafka/KafkaConsumer.cs (4)
src/MinimalKafka/KafkaProcess.cs (4)
  • Task (6-6)
  • Task (7-7)
  • Task (32-53)
  • Task (55-59)
src/MinimalKafka/KafkaService.cs (2)
  • Task (13-37)
  • Task (39-52)
src/MinimalKafka/KafkaContext.cs (4)
  • KafkaContext (4-40)
  • KafkaContext (15-39)
  • KafkaContext (57-70)
  • EmptyKafkaContext (42-55)
src/MinimalKafka/KafkaConsumerBuilder.cs (1)
  • GetMetaData (46-56)
test/MinimalKafka.Tests/KafkaProcessTests.cs (3)
test/MinimalKafka.Tests/KafkaContextTests.cs (7)
  • Fact (7-19)
  • Fact (21-33)
  • Fact (35-63)
  • Fact (65-74)
  • Fact (76-84)
  • Fact (86-94)
  • Fact (96-121)
src/MinimalKafka/KafkaProcess.cs (7)
  • Task (6-6)
  • Task (7-7)
  • Task (32-53)
  • Task (55-59)
  • KafkaProcess (15-60)
  • KafkaProcess (20-27)
  • KafkaProcess (29-30)
src/MinimalKafka/KafkaConsumer.cs (3)
  • Task (13-13)
  • Task (35-38)
  • Task (63-99)
src/MinimalKafka/KafkaService.cs (1)
src/MinimalKafka/KafkaProcess.cs (5)
  • Task (6-6)
  • Task (7-7)
  • Task (32-53)
  • Task (55-59)
  • KafkaProcesException (62-64)
⏰ Context from checks skipped due to timeout of 90000ms (1)
  • GitHub Check: Analyze
🔇 Additional comments (2)
src/MinimalKafka/KafkaService.cs (1)

26-29:

❌ Incorrect review comment

CancellationTokenSource.CancelAsync() limits target frameworks

CancelAsync() is only available from .NET 8 onward.
If the library still targets LTS TFMs such as net6.0 / net7.0 this will not compile.

-await cts.CancelAsync();
+cts.Cancel();            // synchronous, universally available

🏁 Script executed:

#!/bin/bash
set -e
# Search for TargetFramework/TargetFrameworks in all csproj files
rg -n "<TargetFramework" .

Length of output: 270


CancelAsync is supported—no change needed
All projects target .NET 8.0 (per Directory.Build.props and Examples.csproj), so CancellationTokenSource.CancelAsync() is available. The synchronous Cancel() replacement isn’t required.

Likely an incorrect or invalid review comment.
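
For reference, the linked-token pattern the summary describes presumably resembles the following (a sketch; field and method names are assumptions):

using var cts = CancellationTokenSource.CreateLinkedTokenSource(stoppingToken);

var tasks = _processes.Select(async process =>
{
    try
    {
        await process.Start(cts.Token);
    }
    catch (KafkaProcesException)
    {
        // One failing process cancels the linked source so the others stop too.
        await cts.CancelAsync();
    }
});

await Task.WhenAll(tasks);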

test/MinimalKafka.Tests/KafkaProcessTests.cs (1)

78-97: Verifying extension-method call on ILogger is brittle

UnknownProcessException is a static extension; NSubstitute cannot intercept static calls. The test will always pass regardless of invocation.

Either:

  1. Verify the underlying ILogger.Log call (preferred), or
  2. Expose an interface/wrapper around the logger extensions and substitute that.
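
One way to take option 1 with NSubstitute, since the generic ILogger.Log<TState> method is awkward to match directly, is to inspect the recorded calls (a sketch; the FluentAssertions style is assumed from typical test setups):

var logCalls = _logger.ReceivedCalls()
    .Where(c => c.GetMethodInfo().Name == nameof(ILogger.Log));

// At least one Log call should have been recorded at Error level.
logCalls.Should().Contain(c =>
    c.GetArguments().OfType<LogLevel>().Any(l => l == LogLevel.Error));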

@pmdevers changed the title from "Allow to to disable auto commit" to "AutoCommit Default False" on Jun 13, 2025
@pmdevers (Owner, Author) commented:

@coderabbitai review

coderabbitai bot (Contributor) commented Jun 13, 2025

✅ Actions performed

Review triggered.

Note: CodeRabbit is an incremental review system and does not re-review already reviewed commits. This command is applicable only when automatic reviews are paused.

@JurJean (Contributor) left a comment:

Nice!!!
