Refactor to allow single consumer with multiple handlers #96
Conversation
Improves the structure and organization of the Kafka integration components, moving towards a more streamlined and maintainable design. This includes:
- Removing unused files and consolidating builder patterns.
- Enhancing consumer management and message handling.
- Updating metadata handling for improved configuration and flexibility.
Streamlines the Kafka processing pipeline by introducing a dedicated store factory for managing consumer stores. This change enhances modularity by decoupling consumer creation and management from the builder, and provides a central place for store creation and cleanup. It also introduces a timed in-memory store for persistence and retrieval of message data. Additionally, this commit updates the message handling logic, using spans for efficient memory access and incorporating serializers for key and value deserialization.
Integrates RocksDB as a state store implementation for stateful stream processing, enabling persistence and retrieval of state data. This provides a robust and scalable solution for managing application state. It also refactors the KafkaBuilderExtensions to use IKafkaConfigBuilder instead of IAddKafkaBuilder. The memory stream implementation for the Kafka store is preserved. Removes unused stream store implementation and byte serializers.
Simplifies Kafka configuration by removing unnecessary extensions and centralizing topic formatting. Improves RocksDB store management by using a more robust key generation strategy for column families, preventing potential naming conflicts and ensuring data integrity. Enhances consumer configuration by allowing customization of various event handlers, providing better control and flexibility in handling Kafka events. Updates producer implementation to support transactional message production, improving reliability and consistency.
Walkthrough
This update introduces a significant architectural refactor and feature expansion to the MinimalKafka codebase. It adds a new Aggregates module for domain-driven design, overhauls the Kafka builder and stream store abstractions, removes legacy metadata/configuration interfaces, and unifies serialization with a new serializer interface. The RocksDB integration is migrated to a byte-level store, and configuration is now managed via centralized metadata and DI. Test and example projects are updated accordingly, and dependencies are upgraded.
Sequence Diagram(s)

```mermaid
sequenceDiagram
    participant App as Application
    participant DI as DI Container
    participant KafkaBuilder as IKafkaBuilder
    participant KafkaProcess as IKafkaProcess
    participant KafkaConsumer as IKafkaConsumer
    participant KafkaProducer as IKafkaProducer
    participant Store as IKafkaStore
    App->>DI: Build service provider with AddMinimalKafka()
    App->>KafkaBuilder: Configure topics, aggregates, etc.
    App->>KafkaProcess: Start()
    KafkaProcess->>KafkaConsumer: Subscribe()
    loop Message loop
        KafkaProcess->>KafkaConsumer: Consume()
        KafkaConsumer->>Store: AddOrUpdate(key, value)
        KafkaConsumer->>KafkaBuilder: Resolve delegates
        KafkaConsumer->>KafkaProducer: ProduceAsync(messages)
    end
    App->>KafkaProcess: Stop()
    KafkaProcess->>KafkaConsumer: Close()
```

```mermaid
sequenceDiagram
    participant App as Application
    participant DI as DI Container
    participant KafkaBuilder as IKafkaBuilder
    participant Aggregate as IAggregate
    participant KafkaProcess as IKafkaProcess
    App->>KafkaBuilder: MapAggregate<TAggregate, TKey, TCommand>()
    KafkaBuilder->>KafkaProcess: Create consumer for command topic
    KafkaProcess->>Aggregate: On command, validate version, apply command
    Aggregate-->>KafkaProcess: Result<TAggregate>
    KafkaProcess->>KafkaBuilder: Produce updated aggregate state or error
```
Actionable comments posted: 39
🔭 Outside diff range comments (1)
src/MinimalKafka/KafkaService.cs (1)

8-8: Rename `GetProceses` to `GetProcesses` across all usages.
The method name `GetProceses` (with a typo) is declared and referenced in multiple locations and must be updated consistently when the interface is corrected:
- src/MinimalKafka/IKafkaDataSource.cs
- src/MinimalKafka/KafkaService.cs
- src/MinimalKafka/Builders/KafkaDataSource.cs
- test/MinimalKafka.Tests/KafkaServiceTests.cs

Ensure you update the interface declaration, all implementations, service calls, and tests to use the corrected name `GetProcesses()`.
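A minimal sketch of the corrected declaration; the `AddTopicDelegate` parameter list and the `GetProcesses` return type are assumptions based on types mentioned elsewhere in this review, not the PR's exact signatures:

```csharp
// Hypothetical sketch of the corrected interface; only the renamed member
// is taken from the review, the rest is illustrative.
public interface IKafkaDataSource
{
    IKafkaConventionBuilder AddTopicDelegate(string topicName, Delegate handler);
    IEnumerable<IKafkaProcess> GetProcesses(); // renamed from GetProceses()
}
```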
🧹 Nitpick comments (30)
src/MinimalKafka/Metadata/Internals/ConsumerHandlerMetadata.cs (1)

12-12: Terminate file with a newline for POSIX-friendly tooling.
The closing brace is now directly followed by EOF. Adding a trailing newline avoids "No newline at end of file" warnings in Git diffs and improves compatibility with some linters and concatenation tools.

```diff
 }
+
```

examples/KafkaAdventure/Program.cs (1)

22-25: Consider reverting to expression-bodied lambda for better readability.
The logic is unchanged, but the expression-bodied form was more concise and readable for this simple formatting operation.

```diff
-x.WithTopicFormatter((topic) =>
-{
-    return $"{topic}-{Environment.GetEnvironmentVariable("ASPNETCORE_ENVIRONMENT")?.ToLower()}";
-});
+x.WithTopicFormatter(topic => $"{topic}-{Environment.GetEnvironmentVariable("ASPNETCORE_ENVIRONMENT")?.ToLower()}");
```

src/MinimalKafka/Metadata/IProducerConfigMetadata.cs (2)
5-7: Add meaningful XML documentation.
The XML documentation comments are empty placeholders. Please provide descriptive documentation for the interface and its purpose.

```diff
 /// <summary>
-///
+/// Represents metadata that carries producer configuration for Kafka operations.
 /// </summary>
```

10-13: Add meaningful XML documentation for the Config property.
The XML documentation for the `Config` property is empty. Please provide descriptive documentation.

```diff
 /// <summary>
-///
+/// Gets the producer configuration settings for Kafka operations.
 /// </summary>
```

src/MinimalKafka/Metadata/ITopicFormaterMetadata.cs (2)

3-5: Add meaningful XML documentation.
The XML documentation comments are empty placeholders. Please provide descriptive documentation for the interface.

```diff
 /// <summary>
-///
+/// Represents metadata that carries a topic name formatter function.
 /// </summary>
```

8-11: Add meaningful XML documentation for the TopicFormatter property.
The XML documentation for the `TopicFormatter` property is empty. Please provide descriptive documentation.

```diff
 /// <summary>
-///
+/// Gets the function used to format Kafka topic names.
 /// </summary>
```

src/MinimalKafka/Metadata/Internals/ProducerConfigMetadataAttribute.cs (2)
5-9: Consider the design of non-configurable metadata attribute.
The attribute creates a new `ProducerConfig()` instance but provides no mechanism to configure it. This limits its utility as a metadata carrier. Consider one of these approaches:
- Add constructor parameters to accept configuration values
- Add properties to allow configuration after instantiation
- If this is intentionally a marker attribute, consider documenting this purpose

Example with constructor parameters:

```diff
 [AttributeUsage(AttributeTargets.Method, Inherited = false, AllowMultiple = false)]
-internal class ProducerConfigMetadataAttribute : Attribute, IProducerConfigMetadata
+internal class ProducerConfigMetadataAttribute(string? clientId = null, string? bootstrapServers = null) : Attribute, IProducerConfigMetadata
 {
-    public ProducerConfig Config { get; } = new ProducerConfig();
+    public ProducerConfig Config { get; } = new ProducerConfig
+    {
+        ClientId = clientId,
+        BootstrapServers = bootstrapServers
+    };
 }
```

6-6: Add documentation for internal metadata attribute.
Even internal classes benefit from documentation to help maintainers understand their purpose and usage.

```diff
+/// <summary>
+/// Internal attribute for carrying producer configuration metadata on methods.
+/// </summary>
 internal class ProducerConfigMetadataAttribute : Attribute, IProducerConfigMetadata
```

src/MinimalKafka/Internals/KafkaMessage.cs (1)

15-15: Remove unnecessary trailing semicolon.
The trailing semicolon after the closing brace is unnecessary for record declarations.

```diff
-};
+}
```

src/MinimalKafka/Builders/KafkaDelegateFactory.cs (1)

134-161: Well-implemented serializer-based deserialization.
The refactoring correctly uses the DI container to resolve serializers and properly invokes the Deserialize method. Consider adding try-catch blocks or validation to handle cases where the serializer might not be registered for a specific type.
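A sketch of the suggested guard, assuming serializers are resolved per closed generic type; the helper name and shape are illustrative, not the PR's code:

```csharp
// Illustrative guard: resolve the serializer explicitly and fail with a
// descriptive message instead of an unhandled resolution exception.
object ResolveSerializer(IServiceProvider services, Type valueType)
{
    var serializerType = typeof(IKafkaSerializer<>).MakeGenericType(valueType);
    return services.GetService(serializerType)
        ?? throw new InvalidOperationException(
            $"No IKafkaSerializer<{valueType.Name}> is registered.");
}
```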
src/MinimalKafka/Metadata/Internals/ConsumerConfigMetadataAttribute.cs (1)
12-17: Add XML documentation for the factory method.
The `FromConfig` method lacks XML documentation explaining its purpose, parameters, and return value. Consider adding proper documentation for consistency with the rest of the codebase.

```diff
 /// <summary>
-///
+/// Creates a ConfigMetadataAttribute instance from an IConfiguration source.
 /// </summary>
-/// <param name="configuration"></param>
-/// <returns></returns>
+/// <param name="configuration">The configuration source to extract Kafka settings from.</param>
+/// <returns>A new ConfigMetadataAttribute instance with the extracted configuration.</returns>
```

src/MinimalKafka/Stream/Internals/BranchBuilder.cs (1)
36-38: Simplify constructor syntax.
The empty parentheses in the primary constructor and base constructor call are redundant and can be removed for cleaner code.

```diff
-public class UnhandledBranchException() : Exception()
+public class UnhandledBranchException : Exception
```

src/MinimalKafka/IKafkaStore.cs (1)
13-13: Consider removing ServiceProvider exposure from the store interface.
Exposing `IServiceProvider` directly on a storage interface may violate separation of concerns and create tight coupling. Consider whether the store implementations truly need direct access to the service provider, or if dependencies should be injected during construction.
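A hedged sketch of the constructor-injection alternative; the class and dependency below are illustrative, not from the PR:

```csharp
using System;

// Hypothetical sketch: instead of IKafkaStore exposing IServiceProvider,
// each implementation receives only the services it actually uses.
internal sealed class ExampleKafkaStore
{
    private readonly TimeProvider _timeProvider; // concrete dependency, injected

    public ExampleKafkaStore(TimeProvider timeProvider)
        => _timeProvider = timeProvider;

    public long CurrentTimestamp() => _timeProvider.GetUtcNow().ToUnixTimeMilliseconds();
}
```

src/MinimalKafka/Internals/KafkaProcess.cs (1)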
10-11: Consider removing the redundant static factory method.
The static `Create` method simply calls the constructor without adding any logic. Consider removing it to simplify the API, especially since the class already uses primary constructor syntax.

```diff
-    public static KafkaProcess Create(KafkaConsumer consumer, ILogger<KafkaProcess> logger)
-        => new(consumer, logger);
```

src/MinimalKafka/Stream/Internals/JoinIntoBuilder.cs (1)
30-33: Consider adding ConfigureAwait(false) to the foreach await.
For library code, consider using `ConfigureAwait(false)` to avoid potential deadlocks in sync-over-async scenarios.

```diff
-await foreach (var right in rightStore.FindAsync<V2>(x => on(value, x)))
+await foreach (var right in rightStore.FindAsync<V2>(x => on(value, x)).ConfigureAwait(false))
```

Apply the same pattern to lines 40-42 in `ExecuteRightAsync`.

src/MinimalKafka/Serializers/SystemTextJsonSerializer.cs (1)
25-31: Optimize serialization performance.
Using `MemoryStream` and `ToArray()` creates unnecessary allocations. Consider using `JsonSerializer.SerializeToUtf8Bytes` for better performance.

```diff
-    public byte[] Serialize(T value)
-    {
-        using var stream = new MemoryStream();
-        JsonSerializer.Serialize(stream, value, typeof(T));
-        return stream.ToArray();
-    }
+    public byte[] Serialize(T value)
+    {
+        return JsonSerializer.SerializeToUtf8Bytes(value, _jsonOptions);
+    }
```

src/MinimalKafka/Internals/KafkaContextProducer.cs (3)
5-8: Add documentation for the KafkaTopicFormatter delegate.
The XML documentation comment is empty. Please provide a meaningful description.

```diff
 /// <summary>
-///
+/// Delegate for formatting Kafka topic names based on application-specific rules.
 /// </summary>
+/// <param name="topic">The original topic name to format</param>
+/// <returns>The formatted topic name</returns>
 public delegate string KafkaTopicFormatter(string topic);
```

37-37: Fix typo in variable name.

```diff
-var formmattedTopic = _formatter(msg.Topic);
+var formattedTopic = _formatter(msg.Topic);
```

61-61: Simplify random consumer key generation.
The nested `Guid.NewGuid()` calls are redundant since `KafkaConsumerKey.Random` already generates a unique identifier.

```diff
-var context = KafkaContext.Create(KafkaConsumerKey.Random(Guid.NewGuid().ToString()), new() { Key = [], Value = [] }, _serviceProvider, []);
+var context = KafkaContext.Create(KafkaConsumerKey.Random(topic), new() { Key = [], Value = [] }, _serviceProvider, []);
```

src/MinimalKafka/Internals/KafkaConsumer.cs (1)
49-51: Improve when clause formatting.
The when clause condition should be on the same line for better readability.

```diff
-catch (OperationCanceledException ex)
-when(ex.CancellationToken == cancellationToken)
+catch (OperationCanceledException ex) when (ex.CancellationToken == cancellationToken)
```

src/MinimalKafka/Builders/KafkaConsumerBuilder.cs (2)
55-55: Remove or explain commented code.
The commented `c.Commit()` call should either be removed or have a clear explanation of why it's commented out.

68-68: Remove extra space.

```diff
-Delegates.Add( del );
+Delegates.Add(del);
```

src/MinimalKafka/Internals/KafkaInMemoryStoreFactory.cs (2)
11-23: Simplify store creation logic.
Since you're already under lock, the `TryAdd` call is redundant. Use direct assignment instead.

```diff
 public IKafkaStore GetStore(KafkaConsumerKey consumerKey)
 {
     lock (_lock)
     {
         if (!_stores.TryGetValue(consumerKey, out KafkaInMemoryStore? store))
         {
             store = new KafkaInMemoryStore(serviceProvider);
-            _stores.TryAdd(consumerKey, store);
+            _stores[consumerKey] = store;
         }
         return store;
     }
 }
```

63-66: Remove unnecessary async/await.
The method doesn't need to be async since it's not awaiting any async operations.

```diff
-public async ValueTask<byte[]> FindByIdAsync(byte[] key)
+public ValueTask<byte[]> FindByIdAsync(byte[] key)
 {
-    return await _store.FindByIdAsync(key) ?? [];
+    var result = _store.FindByIdAsync(key);
+    return result.IsCompletedSuccessfully
+        ? ValueTask.FromResult(result.Result ?? [])
+        : HandleAsync(result);
+
+    static async ValueTask<byte[]> HandleAsync(ValueTask<byte[]?> task)
+    {
+        return await task.ConfigureAwait(false) ?? [];
+    }
 }
```

src/MinimalKafka/KafkaContextExtenions.cs (2)
50-65: Avoid unnecessary dictionary allocation.
Pass `headers` directly instead of creating an empty dictionary when null.

```diff
 context.Produce(new()
 {
     Topic = topic,
     Key = keySerializer.Serialize(key),
     Value = valueSerializer.Serialize(value),
-    Headers = headers ?? [],
+    Headers = headers,
     Timestamp = timeprovider.GetTimestamp()
 });
```

78-82: Consider making ProduceAsync truly asynchronous.
The current implementation just wraps the synchronous call. If the underlying `Produce` method could be made async, this would provide better scalability. Consider refactoring the underlying `context.Produce` to support async operations, which would allow for truly asynchronous message production and better throughput under load.
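A sketch of what an async-first surface could look like; the interface name and member are assumptions for illustration, not the PR's API:

```csharp
// Hypothetical async-first shape: a ValueTask-based Produce means
// ProduceAsync no longer has to wrap a blocking call.
public interface IAsyncKafkaProducer
{
    ValueTask ProduceAsync(KafkaMessage message, CancellationToken token = default);
}
```

src/MinimalKafka/Builders/KafkaDataSource.cs (1)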
1-1: Remove unnecessary blank lines.
Multiple unnecessary blank lines affect code readability.
Remove the extra blank lines at lines 1, 71, 79, and 98 to improve code formatting consistency.
Also applies to: 71-71, 79-79, 98-98
src/MinimalKafka/KafkaContext.cs (1)
8-11: Fill in empty XML documentation comments.
Multiple XML documentation comments are empty throughout the file. Please provide meaningful descriptions for these public API elements to improve code documentation. Consider adding descriptions like:

```diff
-/// <summary>
-///
-/// </summary>
+/// <summary>
+/// Represents the context for a Kafka message with associated metadata and services.
+/// </summary>
```

Also applies to: 25-27, 31-33, 36-38, 40-43, 45-47, 49-51, 59-63, 70-73, 80-83, 94-97
src/MinimalKafka/KafkaExtensions.cs (2)
15-16: Complete empty XML documentation comments.
Multiple XML documentation comments are empty. Please provide meaningful descriptions for these public API methods to improve code documentation and IntelliSense support.
Also applies to: 20-21, 79-79, 104-107, 137-143, 167-169, 191-196, 290-291
203-233: Refactor duplicated metadata handling logic.
The `UpdateConsumerConfig` and `UpdateProducerConfig` methods have nearly identical logic for finding or creating `IConfigMetadata`. Consider extracting this into a shared helper method.

```diff
+private static IConfigMetadata GetOrCreateConfigMetadata(IKafkaBuilder builder)
+{
+    var item = builder.MetaData.OfType<IConfigMetadata>().FirstOrDefault();
+    if (item is null)
+    {
+        item = new ConfigMetadataAttribute(new Dictionary<string, string>());
+        builder.MetaData.Add(item);
+    }
+    return item;
+}

 internal static TBuilder UpdateConsumerConfig<TBuilder>(this TBuilder builder, Action<ConsumerConfig> update)
     where TBuilder : IKafkaConventionBuilder
 {
     builder.Add(b => {
-        var item = b.MetaData.OfType<IConfigMetadata>().FirstOrDefault();
-        if (item is null)
-        {
-            item = new ConfigMetadataAttribute(new Dictionary<string, string>());
-            b.MetaData.Add(item);
-        }
-
-        update?.Invoke(item.ConsumerConfig);
+        var item = GetOrCreateConfigMetadata(b);
+        update?.Invoke(item.ConsumerConfig);
     });
     return builder;
 }
```
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (107)
- examples/Examples/Program.cs (4 hunks)
- examples/KafkaAdventure/Extensions/KafkaBuilderExtensions.cs (0 hunks)
- examples/KafkaAdventure/Features/CommandProcessor/ProcessorFeature.cs (3 hunks)
- examples/KafkaAdventure/Features/Input/InputFeature.cs (2 hunks)
- examples/KafkaAdventure/Features/Input/InputHub.cs (2 hunks)
- examples/KafkaAdventure/Features/Locations/LocationsFeature.cs (1 hunks)
- examples/KafkaAdventure/Features/Movement/MovementFeature.cs (3 hunks)
- examples/KafkaAdventure/Features/PlayerLocation/PlayerLocations.cs (2 hunks)
- examples/KafkaAdventure/Program.cs (1 hunks)
- examples/KafkaAdventure/StreamStore.cs (0 hunks)
- src/MinimalKafka.RocksDB/ByteSerializer.cs (0 hunks)
- src/MinimalKafka.RocksDB/IByteSerializer.cs (0 hunks)
- src/MinimalKafka.RocksDB/KafkaBuilderExtensions.cs (2 hunks)
- src/MinimalKafka.RocksDB/RocksDBOptions.cs (0 hunks)
- src/MinimalKafka.RocksDB/RocksDBStreamStore.cs (1 hunks)
- src/MinimalKafka.RocksDB/RocksDBStreamStoreFactory.cs (4 hunks)
- src/MinimalKafka/Builders/Internals/AddKafkaBuilder.cs (0 hunks)
- src/MinimalKafka/Builders/KafkaBuilder.cs (1 hunks)
- src/MinimalKafka/Builders/KafkaConfigConventionBuilder.cs (1 hunks)
- src/MinimalKafka/Builders/KafkaConsumerBuilder.cs (1 hunks)
- src/MinimalKafka/Builders/KafkaConventionBuilder.cs (2 hunks)
- src/MinimalKafka/Builders/KafkaDataSource.cs (3 hunks)
- src/MinimalKafka/Builders/KafkaDelegateFactory.cs (3 hunks)
- src/MinimalKafka/Extension/AddKafkaBuilderExtensions.cs (0 hunks)
- src/MinimalKafka/Extension/KafkaConsumerBuilderMetadataExtensions.cs (0 hunks)
- src/MinimalKafka/Extension/KafkaConsumerConfigMetadataExtensions.cs (0 hunks)
- src/MinimalKafka/Extension/KafkaContextExtensions.cs (0 hunks)
- src/MinimalKafka/Extension/KafkaConventionBuilderExtensions.cs (0 hunks)
- src/MinimalKafka/Extension/KafkaProducerBuilderMetadataExtensions.cs (0 hunks)
- src/MinimalKafka/Extension/MetadataHelperExtensions.cs (0 hunks)
- src/MinimalKafka/FromKeyAttribute.cs (0 hunks)
- src/MinimalKafka/FromValueAttribute.cs (0 hunks)
- src/MinimalKafka/Helpers/DebuggerHelper.cs (0 hunks)
- src/MinimalKafka/Helpers/Logging.cs (0 hunks)
- src/MinimalKafka/IKafkaBuilder.cs (2 hunks)
- src/MinimalKafka/IKafkaConfigBuilder.cs (1 hunks)
- src/MinimalKafka/IKafkaConsumer.cs (1 hunks)
- src/MinimalKafka/IKafkaConsumerBuilder.cs (1 hunks)
- src/MinimalKafka/IKafkaConventionBuilder.cs (2 hunks)
- src/MinimalKafka/IKafkaDataSource.cs (2 hunks)
- src/MinimalKafka/IKafkaProcess.cs (1 hunks)
- src/MinimalKafka/IKafkaProducer.cs (1 hunks)
- src/MinimalKafka/IKafkaStore.cs (1 hunks)
- src/MinimalKafka/Internals/KafkaConsumer.cs (1 hunks)
- src/MinimalKafka/Internals/KafkaContextProducer.cs (1 hunks)
- src/MinimalKafka/Internals/KafkaInMemoryStoreFactory.cs (1 hunks)
- src/MinimalKafka/Internals/KafkaMessage.cs (1 hunks)
- src/MinimalKafka/Internals/KafkaProcess.cs (1 hunks)
- src/MinimalKafka/Internals/TimedConcurrentDictionary.cs (3 hunks)
- src/MinimalKafka/KafkaConsumer.cs (0 hunks)
- src/MinimalKafka/KafkaConsumerBuilder.cs (0 hunks)
- src/MinimalKafka/KafkaConsumerKey.cs (1 hunks)
- src/MinimalKafka/KafkaConsumerOptions.cs (0 hunks)
- src/MinimalKafka/KafkaContext.cs (1 hunks)
- src/MinimalKafka/KafkaContextExtenions.cs (1 hunks)
- src/MinimalKafka/KafkaDelegate.cs (0 hunks)
- src/MinimalKafka/KafkaExtensions.cs (3 hunks)
- src/MinimalKafka/KafkaProcess.cs (0 hunks)
- src/MinimalKafka/KafkaProducerFactory.cs (0 hunks)
- src/MinimalKafka/KafkaService.cs (3 hunks)
- src/MinimalKafka/Metadata/IAutoCommitMetaData.cs (0 hunks)
- src/MinimalKafka/Metadata/IAutoOffsetResetMetadata.cs (0 hunks)
- src/MinimalKafka/Metadata/IBootstrapServersMetadata.cs (0 hunks)
- src/MinimalKafka/Metadata/IClientIdMetadata.cs (0 hunks)
- src/MinimalKafka/Metadata/IConfigurationMetadata.cs (0 hunks)
- src/MinimalKafka/Metadata/IConsumerConfigMetadata.cs (1 hunks)
- src/MinimalKafka/Metadata/IConsumerGroupMetadata.cs (0 hunks)
- src/MinimalKafka/Metadata/IConsumerHandlerMetadata.cs (1 hunks)
- src/MinimalKafka/Metadata/IKeySerializerMetadata.cs (0 hunks)
- src/MinimalKafka/Metadata/IProducerConfigMetadata.cs (1 hunks)
- src/MinimalKafka/Metadata/IReportIntervalMetadata.cs (0 hunks)
- src/MinimalKafka/Metadata/ISerializerMetadata.cs (0 hunks)
- src/MinimalKafka/Metadata/ITopicFormaterMetadata.cs (1 hunks)
- src/MinimalKafka/Metadata/ITopicFormatterMetadata.cs (0 hunks)
- src/MinimalKafka/Metadata/Internals/AutoCommitMetaData.cs (0 hunks)
- src/MinimalKafka/Metadata/Internals/AutoOffsetResetMetadata.cs (0 hunks)
- src/MinimalKafka/Metadata/Internals/BootstrapServersMetadata.cs (0 hunks)
- src/MinimalKafka/Metadata/Internals/ClientIdMetadata.cs (0 hunks)
- src/MinimalKafka/Metadata/Internals/ConfigurationMetadata.cs (0 hunks)
- src/MinimalKafka/Metadata/Internals/ConsumerConfigMetadataAttribute.cs (1 hunks)
- src/MinimalKafka/Metadata/Internals/ConsumerHandlerMetadata.cs (1 hunks)
- src/MinimalKafka/Metadata/Internals/GroupIdMetadata.cs (0 hunks)
- src/MinimalKafka/Metadata/Internals/KeyDeserializerMetadata.cs (0 hunks)
- src/MinimalKafka/Metadata/Internals/KeySerializerMetadata.cs (0 hunks)
- src/MinimalKafka/Metadata/Internals/ProducerConfigMetadataAttribute.cs (1 hunks)
- src/MinimalKafka/Metadata/Internals/ReportIntervalMetadata.cs (0 hunks)
- src/MinimalKafka/Metadata/Internals/TopicFormatterMetadataAttribute.cs (1 hunks)
- src/MinimalKafka/Metadata/Internals/ValueDeserializerMetadata.cs (0 hunks)
- src/MinimalKafka/Metadata/Internals/ValueSerializerMetadata.cs (0 hunks)
- src/MinimalKafka/Serializers/IKafkaSerializer.cs (1 hunks)
- src/MinimalKafka/Serializers/JsonTextSerializer.cs (0 hunks)
- src/MinimalKafka/Serializers/SystemTextJsonSerializer.cs (1 hunks)
- src/MinimalKafka/Serializers/Utf8Constants.cs (1 hunks)
- src/MinimalKafka/Stream/IBranchBuilder.cs (1 hunks)
- src/MinimalKafka/Stream/IIntoBuilder.cs (2 hunks)
- src/MinimalKafka/Stream/IIntoBuilderExtensions.cs (1 hunks)
- src/MinimalKafka/Stream/IJoinBuilder.cs (0 hunks)
- src/MinimalKafka/Stream/IJoinBuilderExtensions.cs (1 hunks)
- src/MinimalKafka/Stream/IStreamBuilder.cs (0 hunks)
- src/MinimalKafka/Stream/IStreamStore.cs (0 hunks)
- src/MinimalKafka/Stream/IStreamStoreFactory.cs (0 hunks)
- src/MinimalKafka/Stream/Internals/BranchBuilder.cs (1 hunks)
- src/MinimalKafka/Stream/Internals/JoinBuilder.cs (1 hunks)
- src/MinimalKafka/Stream/Internals/JoinByKeyIntoBuilder.cs (2 hunks)
- src/MinimalKafka/Stream/Internals/JoinConventionBuilder.cs (1 hunks)
- src/MinimalKafka/Stream/Internals/JoinIntoBuilder.cs (2 hunks)
- src/MinimalKafka/Stream/Internals/StreamBuilder.cs (2 hunks)
⛔ Files not processed due to max files limit (13)
- src/MinimalKafka/Stream/KafkaStoreExtensions.cs
- src/MinimalKafka/Stream/Storage/InMemoryStore.cs
- src/MinimalKafka/Stream/StreamExtensions.cs
- src/MinimalKafka/Stream/UnhandledBranchException.cs
- test/MinimalKafka.RocksDB.Tests/StreamStore_Tests.cs
- test/MinimalKafka.Tests/ConsumerBuilderTest.cs
- test/MinimalKafka.Tests/DelegateFactoryTests.cs
- test/MinimalKafka.Tests/KafkaContextTests.cs
- test/MinimalKafka.Tests/KafkaProcessTests.cs
- test/MinimalKafka.Tests/KafkaServiceTests.cs
- test/MinimalKafka.Tests/RegistrationTests.cs
- test/MinimalKafka.Tests/Serializers/JsonTextSerializersTests.cs
- test/MinimalKafka.Tests/Stream/StreamExtensionsTests.cs
💤 Files with no reviewable changes (49)
- src/MinimalKafka/Stream/IStreamBuilder.cs
- src/MinimalKafka/Helpers/Logging.cs
- src/MinimalKafka/FromKeyAttribute.cs
- src/MinimalKafka/FromValueAttribute.cs
- src/MinimalKafka/Metadata/IConsumerGroupMetadata.cs
- src/MinimalKafka.RocksDB/RocksDBOptions.cs
- src/MinimalKafka/Metadata/IBootstrapServersMetadata.cs
- src/MinimalKafka/Helpers/DebuggerHelper.cs
- src/MinimalKafka/Metadata/IReportIntervalMetadata.cs
- src/MinimalKafka.RocksDB/ByteSerializer.cs
- src/MinimalKafka/Metadata/Internals/ValueDeserializerMetadata.cs
- src/MinimalKafka/Metadata/Internals/ConfigurationMetadata.cs
- src/MinimalKafka/Metadata/Internals/KeySerializerMetadata.cs
- src/MinimalKafka/Metadata/Internals/ReportIntervalMetadata.cs
- src/MinimalKafka/Metadata/IKeySerializerMetadata.cs
- src/MinimalKafka/Stream/IJoinBuilder.cs
- src/MinimalKafka/Metadata/ITopicFormatterMetadata.cs
- examples/KafkaAdventure/Extensions/KafkaBuilderExtensions.cs
- src/MinimalKafka/Metadata/IConfigurationMetadata.cs
- src/MinimalKafka/Metadata/IClientIdMetadata.cs
- src/MinimalKafka/Metadata/Internals/BootstrapServersMetadata.cs
- src/MinimalKafka/Metadata/ISerializerMetadata.cs
- src/MinimalKafka/Metadata/Internals/AutoOffsetResetMetadata.cs
- src/MinimalKafka/Metadata/Internals/GroupIdMetadata.cs
- src/MinimalKafka/Builders/Internals/AddKafkaBuilder.cs
- src/MinimalKafka/Metadata/Internals/ClientIdMetadata.cs
- src/MinimalKafka/Extension/KafkaContextExtensions.cs
- src/MinimalKafka/Metadata/Internals/ValueSerializerMetadata.cs
- src/MinimalKafka/Metadata/Internals/AutoCommitMetaData.cs
- examples/KafkaAdventure/StreamStore.cs
- src/MinimalKafka/KafkaProducerFactory.cs
- src/MinimalKafka/Extension/MetadataHelperExtensions.cs
- src/MinimalKafka/KafkaConsumerOptions.cs
- src/MinimalKafka/KafkaDelegate.cs
- src/MinimalKafka/KafkaProcess.cs
- src/MinimalKafka/Metadata/IAutoOffsetResetMetadata.cs
- src/MinimalKafka/Metadata/IAutoCommitMetaData.cs
- src/MinimalKafka/Serializers/JsonTextSerializer.cs
- src/MinimalKafka/KafkaConsumer.cs
- src/MinimalKafka/Extension/KafkaConsumerBuilderMetadataExtensions.cs
- src/MinimalKafka/Stream/IStreamStoreFactory.cs
- src/MinimalKafka.RocksDB/IByteSerializer.cs
- src/MinimalKafka/Metadata/Internals/KeyDeserializerMetadata.cs
- src/MinimalKafka/Extension/AddKafkaBuilderExtensions.cs
- src/MinimalKafka/Stream/IStreamStore.cs
- src/MinimalKafka/Extension/KafkaProducerBuilderMetadataExtensions.cs
- src/MinimalKafka/KafkaConsumerBuilder.cs
- src/MinimalKafka/Extension/KafkaConventionBuilderExtensions.cs
- src/MinimalKafka/Extension/KafkaConsumerConfigMetadataExtensions.cs
🧰 Additional context used
🧬 Code Graph Analysis (11)
src/MinimalKafka/Stream/Internals/JoinConventionBuilder.cs (4)
- src/MinimalKafka/KafkaExtensions.cs (2): IKafkaConventionBuilder (72-76), IKafkaConventionBuilder (90-95)
- src/MinimalKafka/Stream/Internals/JoinByKeyIntoBuilder.cs (1): IKafkaConventionBuilder (16-24)
- src/MinimalKafka/Stream/IIntoBuilder.cs (2): IKafkaConventionBuilder (15-15), IKafkaConventionBuilder (29-29)
- src/MinimalKafka/Stream/Internals/JoinIntoBuilder.cs (1): IKafkaConventionBuilder (16-24)

src/MinimalKafka/IKafkaBuilder.cs (1)
- src/MinimalKafka/KafkaExtensions.cs (1): IKafkaDataSource (97-101)

src/MinimalKafka/Serializers/IKafkaSerializer.cs (2)
- src/MinimalKafka/KafkaContext.cs (2): T (63-67), T (74-78)
- src/MinimalKafka/Serializers/SystemTextJsonSerializer.cs (2): T (10-23), Serialize (25-30)

src/MinimalKafka/Builders/KafkaBuilder.cs (1)
- src/MinimalKafka/KafkaExtensions.cs (1): IKafkaDataSource (97-101)

src/MinimalKafka/Builders/KafkaConfigConventionBuilder.cs (2)
- src/MinimalKafka/KafkaExtensions.cs (1): IServiceCollection (25-59)
- src/MinimalKafka/Builders/KafkaConventionBuilder.cs (1): KafkaConventionBuilder (2-15)

src/MinimalKafka/IKafkaProducer.cs (1)
- src/MinimalKafka/KafkaContext.cs (3): KafkaContext (12-92), KafkaContext (16-22), KafkaContext (55-56)

src/MinimalKafka.RocksDB/KafkaBuilderExtensions.cs (2)
- src/MinimalKafka.RocksDB/RocksDBOptions.cs (1): RocksDBOptions (8-15)
- src/MinimalKafka.RocksDB/RocksDBStreamStoreFactory.cs (2): RocksDBStreamStoreFactory (5-71), RocksDBStreamStoreFactory (10-45)

src/MinimalKafka/IKafkaStore.cs (4)
- src/MinimalKafka.RocksDB/RocksDBStreamStoreFactory.cs (1): IKafkaStore (55-70)
- src/MinimalKafka/Internals/KafkaInMemoryStoreFactory.cs (1): IKafkaStore (11-23)
- src/MinimalKafka/KafkaContextExtenions.cs (3): IKafkaStore (17-24), IKafkaStore (31-35), IKafkaStoreFactory (37-38)
- src/MinimalKafka/KafkaConsumerKey.cs (1): KafkaConsumerKey (16-17)

src/MinimalKafka/Stream/Internals/JoinIntoBuilder.cs (1)
- src/MinimalKafka/KafkaContext.cs (3): KafkaContext (12-92), KafkaContext (16-22), KafkaContext (55-56)

src/MinimalKafka.RocksDB/RocksDBStreamStore.cs (4)
- src/MinimalKafka.RocksDB/RocksDBStreamStoreFactory.cs (1): IKafkaStore (55-70)
- src/MinimalKafka/IKafkaStore.cs (4): IKafkaStore (48-48), ValueTask (21-21), ValueTask (28-28), IAsyncEnumerable (34-34)
- src/MinimalKafka/Internals/KafkaInMemoryStoreFactory.cs (4): IKafkaStore (11-23), ValueTask (46-56), ValueTask (63-66), IAsyncEnumerable (68-71)
- src/MinimalKafka/Stream/KafkaStoreExtensions.cs (2): ValueTask (7-16), IAsyncEnumerable (18-31)

src/MinimalKafka.RocksDB/RocksDBStreamStoreFactory.cs (5)
- src/MinimalKafka.RocksDB/RocksDBOptions.cs (1): RocksDBOptions (8-15)
- src/MinimalKafka/IKafkaStore.cs (1): IKafkaStore (48-48)
- src/MinimalKafka/Internals/KafkaInMemoryStoreFactory.cs (1): IKafkaStore (11-23)
- src/MinimalKafka/KafkaConsumerKey.cs (1): KafkaConsumerKey (16-17)
- src/MinimalKafka.RocksDB/RocksDBStreamStore.cs (1): RocksDBStreamStore (7-37)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (1)
- GitHub Check: Analyze
🔇 Additional comments (53)
src/MinimalKafka/Metadata/Internals/ConsumerHandlerMetadata.cs (1)

1-11: No functional changes detected – implementation remains solid.
The refactor does not alter any behavior or public surface area. 👍

src/MinimalKafka/Serializers/Utf8Constants.cs (1)

4-4: Verify C# 12 collection-expression support.
`ReadOnlySpan<byte> BOM => [0xEF, 0xBB, 0xBF];` relies on the new collection-expression syntax introduced in C# 12 (.NET 8 SDK). Please ensure the project's `LangVersion`/target framework has already been bumped; otherwise this line will not compile on earlier toolchains.

src/MinimalKafka/Metadata/IConsumerHandlerMetadata.cs (1)

57-58: No functional change – formatting only.
Trailing blank line removal is fine; nothing else to address.

src/MinimalKafka/IKafkaConventionBuilder.cs (1)

1-1: Namespace relocation may break existing `using` directives.
Moving the interface to the root `MinimalKafka` namespace is reasonable, but all downstream code (including external packages) must swap `using MinimalKafka.Builders;` → `using MinimalKafka;`. Please grep the solution to avoid orphan references.

src/MinimalKafka/Builders/KafkaConventionBuilder.cs (1)

1-1: Namespace alignment looks good.
`KafkaConventionBuilder` now lives in `MinimalKafka.Builders`, matching the public interface namespace. No further issues observed.

src/MinimalKafka/Stream/Internals/JoinConventionBuilder.cs (1)

1-5: Namespace change acknowledged.
The switch to `MinimalKafka.Stream.Internals` plus minor whitespace cleanup is harmless and keeps the builder consistent with surrounding types.

examples/KafkaAdventure/Features/Input/InputFeature.cs (2)

1-1: Good cleanup of unused import.
Removing the unused `KafkaAdventure.Extensions` import improves code cleanliness.

14-14: Excellent simplification supporting the PR objectives.
Removing the `.AsFeature("Input")` call decouples the topic mapping from feature-specific identity, which aligns perfectly with the goal of enabling a single consumer to work with multiple handlers.

src/MinimalKafka/Stream/Internals/JoinBuilder.cs (1)

1-1: Good cleanup of unused import and formatting.
Removing the unused `MinimalKafka.Builders` import and cleaning up the file structure improves code organization.

src/MinimalKafka/IKafkaBuilder.cs (2)

1-1: LGTM: Namespace reorganization aligns with broader refactoring.
The namespace change from `MinimalKafka.Builders` to `MinimalKafka` is consistent with the overall refactoring effort to simplify the namespace structure.

16-16: LGTM: Nullable DataSource enables lazy initialization pattern.
Making the `DataSource` property nullable is a good design choice that enables lazy initialization, as evidenced by the `GetOrAddDatasource` extension method in the relevant code snippets. This pattern supports the builder's flexibility.

src/MinimalKafka/Metadata/IProducerConfigMetadata.cs (1)
8-14: LGTM: Clean interface design for producer configuration metadata.
The interface design is clean and follows a clear pattern for exposing Confluent.Kafka configuration objects. This aligns well with the metadata consolidation effort mentioned in the PR summary.

src/MinimalKafka/IKafkaConfigBuilder.cs (2)

3-3: LGTM: Namespace consolidation improves organization.
Moving the interface from `MinimalKafka.Builders` to the root `MinimalKafka` namespace aligns with the broader refactoring effort and simplifies the namespace structure.

9-9: LGTM: Interface rename improves clarity.
Renaming from `IAddKafkaBuilder` to `IKafkaConfigBuilder` makes the interface's purpose much clearer – it's specifically for configuration building rather than just "adding Kafka".

src/MinimalKafka/Metadata/ITopicFormaterMetadata.cs (1)

11-11: LGTM: Good use of delegate for topic formatting.
Using `Func<string, string>` for topic formatting is a flexible and clean approach that aligns with the shift towards function delegates mentioned in the AI summary.

examples/KafkaAdventure/Features/Movement/MovementFeature.cs (3)

2-2: LGTM: Using statement updated for namespace reorganization.
The change from `MinimalKafka.Extension` to `MinimalKafka` is consistent with the broader namespace refactoring effort.

43-43: LGTM: Removal of AsFeature call aligns with refactoring.
The removal of the `.AsFeature("Movement")` call is consistent with the broader refactoring mentioned in the AI summary where AsFeature extension methods were removed.

18-18: Filter on the joined topic is correct – no changes required.
The updated condition intentionally skips any callbacks originating from the `game-player-position` topic (the joined table) so that only primary stream events are processed. examples/KafkaAdventure/Features/Movement/MovementFeature.cs, line 18:

```csharp
if (v.Item1 is null || c.ConsumerKey.TopicName == "game-player-position") { return; }
```

examples/KafkaAdventure/Features/CommandProcessor/ProcessorFeature.cs (2)

1-1: LGTM: Namespace consolidation improves clarity.
The simplification from multiple namespace imports to a single `using MinimalKafka;` aligns with the framework refactoring and improves code readability.

12-48: LGTM: Stream processing logic preserved correctly.
The removal of `.AsFeature("Commands")` aligns with the framework refactoring while preserving all the command processing logic. The stream still properly routes commands to appropriate topics (game-response, game-movement, game-inventory) based on command types.

src/MinimalKafka/IKafkaDataSource.cs (1)

6-14: LGTM: Clean interface design supports multi-handler architecture.
The interface design effectively supports the PR objective of allowing single consumers with multiple handlers. The `AddTopicDelegate` method enables registering multiple handlers for topics, and the return of `IKafkaConventionBuilder` allows for flexible configuration.

src/MinimalKafka/KafkaService.cs (2)
2-2: LGTM: Namespace change aligns with refactoring.
The namespace change from `MinimalKafka.Builders` to `MinimalKafka.Internals` is consistent with the architectural refactoring described in the AI summary.

16-32: LGTM: Improved formatting and error handling.
The formatting improvements make the code more readable while preserving the proper error handling and task management logic.

src/MinimalKafka/Metadata/IConsumerConfigMetadata.cs (1)

8-21: LGTM: Interface design supports configuration consolidation.
The interface effectively consolidates consumer and producer configuration access, which aligns with the refactoring objective to simplify configuration management.

src/MinimalKafka/IKafkaProcess.cs (1)

13-18: LGTM: Clean interface design for process lifecycle management.
The interface provides a clean abstraction for Kafka process lifecycle with proper async/await support and cancellation token handling.

examples/KafkaAdventure/Features/PlayerLocation/PlayerLocations.cs (2)

1-3: LGTM: Namespace cleanup aligns with framework refactoring.
The namespace changes correctly remove obsolete extensions and replace them with the updated MinimalKafka imports, consistent with the framework refactoring described in the PR.

27-27: LGTM: Removal of obsolete fluent call.
The removal of `.AsFeature("PlayerLocations")` is correct and aligns with the framework refactoring that eliminated obsolete extension methods.

src/MinimalKafka/Serializers/IKafkaSerializer.cs (1)
7-22: Well-designed interface with modern .NET patterns.
The interface design is excellent:
- Generic type parameter provides type safety
- `ReadOnlySpan<byte>` parameter in Deserialize is efficient and modern
- Clean, focused interface following single responsibility principle
- Appropriate for dependency injection as shown in the usage examples
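A plausible shape of the interface, inferred from the points above and the `Serialize`/`Deserialize` members referenced in this review; the PR's exact members may differ:

```csharp
// Hedged sketch of IKafkaSerializer<T> per the review's description.
public interface IKafkaSerializer<T>
{
    byte[] Serialize(T value);
    T? Deserialize(ReadOnlySpan<byte> data);
}
```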
src/MinimalKafka/KafkaConsumerKey.cs (1)
9-18: Well-designed record type for consumer identification.
The record design is excellent:
- Immutable by default with records
- Appropriate property types (all strings for Kafka identifiers)
- Convenient factory method for generating random instances
- Clean, focused design for its purpose as a composite key
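A hypothetical sketch matching those points; the member names are assumptions (only `TopicName` and the single-argument `Random` factory appear elsewhere in this review):

```csharp
// Hedged sketch of KafkaConsumerKey: all-string properties plus a factory
// that fabricates unique group/client identifiers for a topic.
public sealed record KafkaConsumerKey(string TopicName, string GroupId, string ClientId)
{
    public static KafkaConsumerKey Random(string topic)
        => new(topic, Guid.NewGuid().ToString(), Guid.NewGuid().ToString());
}
```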
src/MinimalKafka/Internals/TimedConcurrentDictionary.cs (3)
3-3: LGTM: Namespace change aligns with architectural refactoring.
The namespace change from `MinimalKafka.Stream.Storage` to `MinimalKafka.Internals` correctly reflects the class's role as an internal utility in the refactored framework architecture.

14-14: LGTM: Interface removal consistent with framework simplification.
The removal of the `IStreamStore<TKey, TValue>` interface implementation aligns with the framework's shift away from generic stream stores to a unified byte-array store interface, as described in the AI summary.

62-68: LGTM: Minor formatting improvement.
The formatting improvement in the conditional check enhances readability without affecting functionality.

src/MinimalKafka/Builders/KafkaBuilder.cs (1)

9-9: Good refactor to nullable property with lazy initialization.
The change from a non-nullable property with default initialization to a nullable property aligns well with the lazy initialization pattern used in the `GetOrAddDatasource` extension method from src/MinimalKafka/KafkaExtensions.cs (lines 97-101). This avoids creating unnecessary `KafkaDataSource` instances when they might not be needed.

src/MinimalKafka/Stream/Internals/StreamBuilder.cs (1)

25-28: Well-designed record for stream branching functionality.
The `Branch<TKey, TValue>` record provides a clean abstraction for conditional stream processing with proper async support. The combination of a predicate for filtering and a branch action for processing aligns well with stream processing patterns.
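A hedged sketch of what such a record could look like; the delegate shapes are assumptions, not the PR's exact definition:

```csharp
// Hypothetical sketch: a predicate selects matching messages and an async
// action processes them; the PR's actual Branch<TKey, TValue> may differ.
public sealed record Branch<TKey, TValue>(
    Func<TKey, TValue, bool> Predicate,
    Func<KafkaContext, TKey, TValue, Task> BranchAction);
```

src/MinimalKafka/Builders/KafkaConfigConventionBuilder.cs (1)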
5-10: Clean builder pattern implementation.
The class design effectively combines convention management from the base class with service collection access required by `IKafkaConfigBuilder`. The constructor properly delegates to the base class with an empty `finallyConventions` collection, and the `Services` property provides the necessary dependency injection integration.

src/MinimalKafka/Stream/Internals/JoinByKeyIntoBuilder.cs (2)

26-33: LGTM! Clean null handling for join logic.
The null checking correctly implements inner join semantics by requiring both values to be present when `innerJoin` is true.

36-44: GetTopicStore and FindByKey extensions confirmed.
`GetTopicStore(this KafkaContext, string)` and its overload are defined in src/MinimalKafka/KafkaContextExtenions.cs (lines 17, 31). `FindByKey<TKey, TValue>(this IKafkaStore, TKey key)` is implemented in src/MinimalKafka/Stream/KafkaStoreExtensions.cs (line 7) returning `ValueTask<TValue?>`. No further action required.
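A usage sketch of the confirmed extensions; `PlayerPosition` is a hypothetical payload type, and the topic name is borrowed from this PR's example app:

```csharp
// Illustrative lookup using the confirmed extension signatures:
// GetTopicStore(this KafkaContext, string) and
// FindByKey<TKey, TValue>(this IKafkaStore, TKey) -> ValueTask<TValue?>.
public sealed record PlayerPosition(string Room);

public static class StoreLookupExample
{
    public static async Task<PlayerPosition?> LookupAsync(KafkaContext context, string playerId)
    {
        var store = context.GetTopicStore("game-player-position");
        return await store.FindByKey<string, PlayerPosition>(playerId);
    }
}
```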
src/MinimalKafka/Stream/IBranchBuilder.cs (1)
1-24: Excellent interface documentation!
The `IBranchBuilder` interface is well-documented with clear descriptions for the interface, methods, and parameters.

src/MinimalKafka/Metadata/Internals/ConsumerConfigMetadataAttribute.cs (1)

19-21: Consider the implications of converting null values to empty strings.
The factory method converts null configuration values to empty strings. While this prevents null reference exceptions, it might mask configuration issues where a null value should be treated differently than an empty string in Kafka configuration. Consider whether this behavior aligns with your intended configuration validation strategy.
src/MinimalKafka/Stream/Internals/BranchBuilder.cs (1)
3-28: BranchBuilder logic looks correct.
The implementation properly handles branch matching logic and fallback to default behavior. The first-match semantics and composite delegate construction are well-implemented.

examples/Examples/Program.cs (3)

7-20: Configuration simplification looks good.
The explicit configuration of bootstrap servers, group ID, client ID, and transactional ID using `AppDomain.CurrentDomain.FriendlyName` provides a clear example of the new configuration pattern. The commented-out lines suggest features that are now handled differently in the refactored API.

24-41: Verify that duplicate topic handlers are intentional.
Two identical handlers are registered for "my-topic". Given the PR objective to "allow single consumer with multiple handlers," this appears intentional to demonstrate the feature. However, please confirm this is the intended behavior and not an oversight. If this is intentional, consider adding a comment to clarify the demonstration purpose:

```diff
+// Demonstrate multiple handlers for the same topic
 app.MapTopic("my-topic", ([FromKey] string key, [FromValue] string value) =>
```

43-83: Stream processing configuration looks correct.
The join operations and stream transformations demonstrate proper usage of the new API. The left-to-right and right-to-left joins, along with the stream transformation that produces to "left-update", showcase the framework's capabilities effectively.

examples/KafkaAdventure/Features/Input/InputHub.cs (1)

6-8: Good simplification with the new IKafkaProducer interface.
The constructor change from multiple `IProducer` parameters to a single `IKafkaProducer` aligns well with the new abstraction layer and simplifies dependency management.

src/MinimalKafka.RocksDB/KafkaBuilderExtensions.cs (2)
19-28: Good refactoring to align with new builder pattern.
The changes successfully adapt the RocksDB configuration to the new `IKafkaConfigBuilder` interface. The factory delegate pattern (`s => new RocksDBStreamStoreFactory(s, config)`) properly supports dependency injection, and the directory creation ensures the data path exists before RocksDB initialization. The implementation correctly passes the `IServiceProvider` to the factory, which aligns with the `RocksDBStreamStoreFactory` constructor signature shown in the relevant code snippets.

src/MinimalKafka/Internals/KafkaProcess.cs (1)
13-33: LGTM - Good exception handling and cancellation patterns.
The implementation correctly handles cancellation tokens, includes proper exception logging and wrapping, and ensures cleanup logging in the finally block.

src/MinimalKafka/Stream/Internals/JoinIntoBuilder.cs (1)

26-34: LGTM - Simplified store retrieval and correct join logic.
The refactored approach using `context.GetTopicStore()` is cleaner than the previous delegate-based approach. The async enumeration and join predicate application look correct.

src/MinimalKafka/Metadata/Internals/TopicFormatterMetadataAttribute.cs (1)

4-7: LGTM - Good simplification with required formatter.
Making the formatter parameter required (non-nullable) is a good design choice that prevents runtime errors and makes the intent clearer.

src/MinimalKafka/Serializers/SystemTextJsonSerializer.cs (1)

50-57: LGTM - Proper service registration pattern.
The extension method correctly registers both the configured `JsonSerializerOptions` as a singleton and the generic serializer as transient, enabling proper dependency injection.

src/MinimalKafka/Stream/IIntoBuilder.cs (1)

1-30: Well-structured builder interfaces.
The interfaces are properly documented and follow good design patterns. The overloaded interface approach with different generic parameters is a valid pattern, though developers should be aware of potential ambiguity in type inference scenarios.

src/MinimalKafka.RocksDB/RocksDBStreamStore.cs (1)

29-36: Implementation looks good.
The async enumerable implementation correctly iterates through all items and properly disposes of the iterator.

src/MinimalKafka/Builders/KafkaDataSource.cs (1)

31-69: Good architectural improvement with consumer aggregation.
The refactoring to aggregate entries by `KafkaConsumerKey` and consolidate delegates per consumer is a solid architectural improvement that should reduce resource usage and improve consistency.

src/MinimalKafka/KafkaExtensions.cs (1)

157-163: Good implementation of singleton topic formatter.
The method correctly ensures only one `KafkaTopicFormatter` is registered by removing existing ones before adding the new one.
```csharp
    })
    .WithOffsetReset(AutoOffsetReset.Earliest)
    .AsFeature("Locationstest");
});
```
💡 Verification agent
🧩 Analysis chain
Verify impact of removing offset reset configuration.
While removing .AsFeature("Locationstest") aligns with the PR objectives, the removal of .WithOffsetReset(AutoOffsetReset.Earliest) might change the consumer behavior for this stream.
🏁 Script executed:

```bash
#!/bin/bash
# Description: Check if offset reset is configured elsewhere or if default behavior is acceptable
# Expected: Find global offset reset configuration or confirm default behavior is intended

# Search for other offset reset configurations
rg -A 3 -B 3 "OffsetReset|AutoOffsetReset"

# Check for global Kafka consumer configuration
rg -A 5 "AddMinimalKafka|WithConfiguration"
```

Length of output: 11232
🏁 Script executed:

```bash
#!/bin/bash
# Locate WithSingle usage and its implementation context
echo "=== KafkaExtensions.cs: WithSingle ==="
rg "WithSingle" -n src/MinimalKafka/KafkaExtensions.cs -A3 -B1

# Locate ConfigMetadataAttribute and its FromConfig method
echo "=== ConfigMetadataAttribute definition ==="
rg "class ConfigMetadataAttribute" -n src/MinimalKafka -A5 -B1
echo "=== ConfigMetadataAttribute.FromConfig ==="
rg "FromConfig" -n src/MinimalKafka -A3 -B1

# Locate KafkaConfigConventionBuilder to understand how conventions are applied
echo "=== KafkaConfigConventionBuilder declaration ==="
rg "class KafkaConfigConventionBuilder" -n src/MinimalKafka -A5 -B1

# Inspect how conventions are applied in KafkaBuilder (if available)
echo "=== KafkaBuilder apply conventions ==="
rg "conventions" -n src/MinimalKafka -A5 -B1
```

Length of output: 11461
Confirm Offset Reset Behavior
Removing `.WithOffsetReset(AutoOffsetReset.Earliest)` changes the consumer's default from Earliest to Latest, which may cause new instances to skip unprocessed messages on startup.
• File: examples/KafkaAdventure/Features/Locations/LocationsFeature.cs (line 44)
  - Current:
    ```csharp
    MapStream(...)
        .Into((location, ctx) => { … });
    ```
  - Previously:
    ```csharp
    MapStream(...)
        .WithOffsetReset(AutoOffsetReset.Earliest)
        .Into((location, ctx) => { … });
    ```
• Ensure your WithConfiguration(builder.Configuration.GetSection("kafka")) includes auto.offset.reset=earliest if you intend to preserve prior behavior.
• Otherwise, explicitly re-add the offset reset call or document that the new default is Latest.
🤖 Prompt for AI Agents
In examples/KafkaAdventure/Features/Locations/LocationsFeature.cs at line 44,
the removal of .WithOffsetReset(AutoOffsetReset.Earliest) changes the Kafka
consumer's default offset reset behavior from Earliest to Latest, which may
cause new consumer instances to skip unprocessed messages. To fix this, either
explicitly re-add the .WithOffsetReset(AutoOffsetReset.Earliest) call to
preserve the original behavior or ensure that the Kafka configuration section
used in WithConfiguration includes the setting auto.offset.reset=earliest.
Alternatively, document clearly that the offset reset default is now Latest if
that is intended.
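A hedged illustration of explicitly restoring the prior behavior; the topic name and handler body below are placeholders, not the example's actual code:

```csharp
// Restores the pre-PR behavior: new consumer groups start from the earliest
// offset instead of the new default (Latest).
builder.MapStream<string, string>("game-locations")
    .WithOffsetReset(AutoOffsetReset.Earliest)
    .Into((location, ctx) => Task.CompletedTask);
```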
```csharp
/// <summary>
///
/// </summary>
internal interface ITopicFormaterMetadata
```
Fix typo in interface name.
There's a typo in the interface name: ITopicFormaterMetadata should be ITopicFormatterMetadata (missing 't' in "Formatter").
```diff
-internal interface ITopicFormaterMetadata
+internal interface ITopicFormatterMetadata
```

📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
```csharp
internal interface ITopicFormatterMetadata
```
🤖 Prompt for AI Agents
In src/MinimalKafka/Metadata/ITopicFormaterMetadata.cs at line 6, rename the
interface from ITopicFormaterMetadata to ITopicFormatterMetadata to correct the
typo by adding the missing 't' in "Formatter".
```csharp
/// <summary>
///
/// </summary>
public interface IConfigMetadata
{
    /// <summary>
    ///
    /// </summary>
    ConsumerConfig ConsumerConfig { get; }

    /// <summary>
    ///
    /// </summary>
    ProducerConfig ProducerConfig { get; }
}
```
(No newline at end of file.)
🛠️ Refactor suggestion
Add meaningful XML documentation.
The interface has empty XML documentation summaries. Please add proper documentation to improve API discoverability and maintainability.
```diff
 /// <summary>
-///
+/// Provides access to Kafka consumer and producer configuration metadata.
 /// </summary>
 public interface IConfigMetadata
 {
     /// <summary>
-    ///
+    /// Gets the consumer configuration for Kafka operations.
     /// </summary>
     ConsumerConfig ConsumerConfig { get; }

     /// <summary>
-    ///
+    /// Gets the producer configuration for Kafka operations.
     /// </summary>
     ProducerConfig ProducerConfig { get; }
 }
```

🤖 Prompt for AI Agents
In src/MinimalKafka/Metadata/IConsumerConfigMetadata.cs between lines 5 and 21,
the XML documentation summaries for the interface and its properties are empty.
Add meaningful descriptions explaining the purpose of the IConfigMetadata
interface and what the ConsumerConfig and ProducerConfig properties represent to
improve API clarity and maintainability.
Add null checks to prevent runtime exceptions.
Using First() without verification could throw InvalidOperationException if no IConfigMetadata exists in the metadata collection.
```diff
 internal static class MetadataExtensions
 {
     public static string GetClientId(this IKafkaBuilder builder)
     {
-        return builder.MetaData.OfType<IConfigMetadata>().First().ConsumerConfig.ClientId;
+        var configMetadata = builder.MetaData.OfType<IConfigMetadata>().FirstOrDefault()
+            ?? throw new InvalidOperationException("No IConfigMetadata found in builder metadata.");
+        return configMetadata.ConsumerConfig.ClientId;
     }

     public static string GetGroupId(this IKafkaBuilder builder)
     {
-        return builder.MetaData.OfType<IConfigMetadata>().First().ConsumerConfig.GroupId;
+        var configMetadata = builder.MetaData.OfType<IConfigMetadata>().FirstOrDefault()
+            ?? throw new InvalidOperationException("No IConfigMetadata found in builder metadata.");
+        return configMetadata.ConsumerConfig.GroupId;
     }
 }
```

📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
```csharp
internal static class MetadataExtensions
{
    public static string GetClientId(this IKafkaBuilder builder)
    {
        var configMetadata = builder.MetaData
            .OfType<IConfigMetadata>()
            .FirstOrDefault()
            ?? throw new InvalidOperationException("No IConfigMetadata found in builder metadata.");
        return configMetadata.ConsumerConfig.ClientId;
    }

    public static string GetGroupId(this IKafkaBuilder builder)
    {
        var configMetadata = builder.MetaData
            .OfType<IConfigMetadata>()
            .FirstOrDefault()
            ?? throw new InvalidOperationException("No IConfigMetadata found in builder metadata.");
        return configMetadata.ConsumerConfig.GroupId;
    }
}
```
🤖 Prompt for AI Agents
In src/MinimalKafka/Builders/KafkaDataSource.cs around lines 99 to 110, the
methods use First() on the MetaData collection without checking if any
IConfigMetadata exists, which can cause runtime exceptions. Modify the methods
to safely check if any IConfigMetadata is present using FirstOrDefault(), then
verify the result is not null before accessing ConsumerConfig properties. If
null, handle appropriately to avoid exceptions.
```csharp
internal sealed class EmptyServiceProvider : IServiceProvider
{
    public static EmptyServiceProvider Instance { get; } = new EmptyServiceProvider();
    public object? GetService(Type serviceType) => null;
}
```
💡 Verification agent
🧩 Analysis chain
Verify the usage and purpose of EmptyServiceProvider
The EmptyServiceProvider always returns null from GetService. Please verify this is the intended behavior and document its purpose.
🏁 Script executed:
```bash
#!/bin/bash
# Find usages of EmptyServiceProvider to understand its purpose
rg "EmptyServiceProvider" -A 3 -B 3
```

Length of output: 3483
Consolidate and document EmptyServiceProvider null-object fallback
It looks like EmptyServiceProvider is defined twice (in KafkaContext.cs and KafkaDelegateFactory.cs), both implementing IServiceProvider to always return null. To improve clarity and avoid duplication, please:
- Consolidate into a single `internal sealed class EmptyServiceProvider` in a shared namespace.
- Add XML or inline documentation explaining that it's a null-object pattern fallback when no DI container is supplied.
- Verify that returning `null` from `GetService` is the intended behavior in all code paths (e.g. in `KafkaDelegateFactory.Create`).

Locations needing attention:
- src/MinimalKafka/KafkaContext.cs (lines 101–105)
- src/MinimalKafka/Builders/KafkaDelegateFactory.cs (duplicate definition at end of file)
🤖 Prompt for AI Agents
In src/MinimalKafka/KafkaContext.cs lines 101-105 and
src/MinimalKafka/Builders/KafkaDelegateFactory.cs at the end, the
EmptyServiceProvider class is duplicated. Consolidate this class into a single
internal sealed class in a shared namespace accessible to both files. Add XML or
inline documentation to explain that this class implements the null-object
pattern by always returning null from GetService as a fallback when no DI
container is provided. Also, review usages such as KafkaDelegateFactory.Create
to confirm that returning null is appropriate in all cases before finalizing the
consolidation.
src/MinimalKafka/KafkaContext.cs
Outdated
```csharp
public T? GetKey<T>()
{
    var serializer = RequestServices.GetRequiredService<IKafkaSerializer<T>>();
    return serializer.Deserialize(Key);
}

/// <summary>
/// Creates a new <see cref="KafkaContext"/> instance for the specified result, service provider, and metadata.
/// </summary>
/// <param name="result">The consumed Kafka result object.</param>
/// <param name="serviceProvider">The service provider for dependency resolution.</param>
/// <param name="metadata">The metadata associated with the message.</param>
/// <returns>A concrete <see cref="KafkaContext"/> instance or <see cref="Empty"/> if the result is not valid.</returns>
public static KafkaContext Create(object result, IServiceProvider serviceProvider, IReadOnlyList<object> metadata)
{
    var resultType = result.GetType();
    if (!resultType.IsGenericType)
    {
        return Empty;
    }

    if (resultType.GetGenericTypeDefinition() != typeof(ConsumeResult<,>).GetGenericTypeDefinition())
    {
        return Empty;
    }

    var keyType = resultType.GenericTypeArguments[0];
    var valueType = resultType.GenericTypeArguments[1];

    var creator = typeof(KafkaContext<,>).MakeGenericType(keyType, valueType)
        .GetConstructor([resultType, typeof(IServiceProvider), typeof(IReadOnlyList<object>)]);

    return (KafkaContext)(
        creator?.Invoke([result, serviceProvider, metadata]) ??
        Empty
    );
}

/// <summary>
/// </summary>
/// <typeparam name="T"></typeparam>
/// <returns></returns>
public T? GetValue<T>()
{
    var serializer = RequestServices.GetRequiredService<IKafkaSerializer<T>>();
    return serializer.Deserialize(Value);
}
```
💡 Verification agent
🧩 Analysis chain
Handle missing serializer registrations gracefully
The GetKey<T>() and GetValue<T>() methods use GetRequiredService which will throw if the serializer is not registered. Consider adding error handling or documentation about this requirement.
🏁 Script executed:
```bash
#!/bin/bash
# Check if there are any catch blocks or error handling for GetRequiredService calls
rg -A 5 "GetRequiredService.*IKafkaSerializer"
```

Length of output: 2653
Handle missing serializer registrations gracefully
GetKey&lt;T&gt;() and GetValue&lt;T&gt;() in src/MinimalKafka/KafkaContext.cs (lines 63–78) rely on RequestServices.GetRequiredService&lt;IKafkaSerializer&lt;T&gt;&gt;() and will throw if no serializer is registered. The same pattern appears in:
- src/MinimalKafka/KafkaContextExtensions.cs
- src/MinimalKafka/Stream/KafkaStoreExtensions.cs

Consider replacing GetRequiredService with a null check using GetService, or catching and re-throwing with a clearer error. For example:

```diff
- var serializer = RequestServices.GetRequiredService<IKafkaSerializer<T>>();
+ var serializer = RequestServices.GetService<IKafkaSerializer<T>>()
+     ?? throw new InvalidOperationException($"No IKafkaSerializer registered for type {typeof(T).Name}");
```

This will produce a more descriptive error and avoid unexpected runtime failures.
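The failure can also be avoided at the composition root by registering an open-generic serializer, mirroring the `SystemTextJsonSerializer<>` registration that appears later in this PR:

```csharp
// Register an open-generic serializer so IKafkaSerializer<T> resolves for any T.
// SystemTextJsonSerializer<> is the implementation introduced in this PR; swap in
// a custom implementation if JSON is not the desired wire format.
services.AddTransient(typeof(IKafkaSerializer<>), typeof(SystemTextJsonSerializer<>));
```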
📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
```diff
 public T? GetKey<T>()
 {
-    var serializer = RequestServices.GetRequiredService<IKafkaSerializer<T>>();
+    var serializer = RequestServices.GetService<IKafkaSerializer<T>>()
+        ?? throw new InvalidOperationException($"No IKafkaSerializer registered for type {typeof(T).Name}");
     return serializer.Deserialize(Key);
 }

 /// <summary>
 ///
 /// </summary>
 /// <typeparam name="T"></typeparam>
 /// <returns></returns>
 public T? GetValue<T>()
 {
-    var serializer = RequestServices.GetRequiredService<IKafkaSerializer<T>>();
+    var serializer = RequestServices.GetService<IKafkaSerializer<T>>()
+        ?? throw new InvalidOperationException($"No IKafkaSerializer registered for type {typeof(T).Name}");
     return serializer.Deserialize(Value);
 }
```
🤖 Prompt for AI Agents
In src/MinimalKafka/KafkaContext.cs lines 63 to 78, the methods GetKey<T>() and
GetValue<T>() use RequestServices.GetRequiredService<IKafkaSerializer<T>>()
which throws an exception if no serializer is registered. To fix this, replace
GetRequiredService with GetService to allow a null return, then check if the
serializer is null and throw a new exception with a clear, descriptive error
message indicating the missing serializer registration. This approach should
also be applied similarly in src/MinimalKafka/KafkaContextExtensions.cs and
src/MinimalKafka/Stream/KafkaStoreExtensions.cs.
```csharp
services.AddSingleton(sp =>
{
    var builder = sp.GetRequiredService<IKafkaBuilder>();
    var config = builder.MetaData.OfType<IConfigMetadata>().First();
    return new ProducerBuilder<byte[], byte[]>(config.ProducerConfig.AsEnumerable())
        .SetKeySerializer(Confluent.Kafka.Serializers.ByteArray)
        .SetValueSerializer(Confluent.Kafka.Serializers.ByteArray)
        .Build();
});
```
Add null check for metadata retrieval
The code uses First() to retrieve IConfigMetadata which will throw InvalidOperationException if no matching metadata exists. Consider using FirstOrDefault() with appropriate null handling.
```diff
-var config = builder.MetaData.OfType<IConfigMetadata>().First();
+var config = builder.MetaData.OfType<IConfigMetadata>().FirstOrDefault();
+if (config == null)
+{
+    throw new InvalidOperationException("No IConfigMetadata found in builder metadata.");
+}
```

📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
```diff
 services.AddSingleton(sp =>
 {
     var builder = sp.GetRequiredService<IKafkaBuilder>();
-    var config = builder.MetaData.OfType<IConfigMetadata>().First();
+    var config = builder.MetaData.OfType<IConfigMetadata>().FirstOrDefault();
+    if (config == null)
+    {
+        throw new InvalidOperationException("No IConfigMetadata found in builder metadata.");
+    }
     return new ProducerBuilder<byte[], byte[]>(config.ProducerConfig.AsEnumerable())
         .SetKeySerializer(Confluent.Kafka.Serializers.ByteArray)
         .SetValueSerializer(Confluent.Kafka.Serializers.ByteArray)
         .Build();
 });
```
🤖 Prompt for AI Agents
In src/MinimalKafka/KafkaExtensions.cs around lines 46 to 54, replace the use of
First() with FirstOrDefault() when retrieving IConfigMetadata to avoid
InvalidOperationException if no matching metadata exists. Add a null check after
retrieval and handle the case where the metadata is null, such as throwing a
clear exception or returning a default value, to ensure safe and predictable
behavior.
Actionable comments posted: 2
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (5)
- src/MinimalKafka/Internals/KafkaProcess.cs (1 hunks)
- src/MinimalKafka/MinimalKafka.csproj (1 hunks)
- test/MinimalKafka.Tests/DelegateFactoryTests.cs (3 hunks)
- test/MinimalKafka.Tests/KafkaContextTests.cs (3 hunks)
- test/MinimalKafka.Tests/KafkaProcessTests.cs (2 hunks)
✅ Files skipped from review due to trivial changes (1)
- src/MinimalKafka/MinimalKafka.csproj
🚧 Files skipped from review as they are similar to previous changes (1)
- src/MinimalKafka/Internals/KafkaProcess.cs
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (1)
- GitHub Check: Analyze
🔇 Additional comments (12)
test/MinimalKafka.Tests/KafkaContextTests.cs (5)

2-2: Good addition for encoding operations. The `System.Text` namespace import is correctly added to support the UTF-8 encoding operations used throughout the tests.

12-13: Correct transition to byte array handling. The conversion from string literals to byte arrays using `Encoding.UTF8.GetBytes` properly reflects the architectural shift to byte-array-based message handling. This ensures test data matches the new `KafkaContext` requirements. Also applies to: 41-42

32-33: Appropriate byte array equality assertions. Using `SequenceEqual` for byte array comparisons is the correct approach since direct equality (`==`) would compare references rather than content for arrays. Also applies to: 58-59

60-60: Clarify Headers assertion change. The `Headers` property now returns a new `IReadOnlyDictionary<string, byte[]>` (via `_message.Headers.ToDictionary(…)`) rather than exposing the original `Headers` instance. Consequently, testing for instance equality (`BeSameAs`) is no longer valid; asserting emptiness (`BeEmpty`) correctly verifies an empty header collection.

• src/MinimalKafka/KafkaContext.cs:

```csharp
public IReadOnlyDictionary<string, byte[]> Headers => _message.Headers
    .ToDictionary(x => x.Key, y => y.GetValueBytes());
```

If you later need to verify header mapping, add a test where you populate `Message.Headers` and assert that `context.Headers` contains the expected key/value pairs.

28-28: Confirm `KafkaConsumerKey.Random` and empty-array metadata. Please verify that:
- `KafkaConsumerKey.Random("topic")` produces a valid, correctly formatted `KafkaConsumerKey` for tests.
- The final `[]` argument aligns with the `KafkaContext.Create` signature (type and behavior) and yields an empty metadata collection as intended.

Key locations to check:
- test/MinimalKafka.Tests/KafkaContextTests.cs (lines 28, 55)
- test/MinimalKafka.Tests/DelegateFactoryTests.cs (lines 97, 183)
- src/MinimalKafka/Internals/KafkaContextProducer.cs (line 61)

test/MinimalKafka.Tests/KafkaProcessTests.cs (4)

1-2: Updated imports align with refactored dependencies. The import changes reflect the shift to interface-based design and internal abstractions, which supports better testability and maintainability.

7-8: Good transition to interface-based testing. Moving from the concrete `KafkaConsumer` to the `IKafkaConsumer` interface improves testability and follows the dependency inversion principle. The corresponding mock setup is correctly updated. Also applies to: 15-16

60-60: Test focus shifted to subscription verification. The assertion now verifies that `Subscribe` was called, which aligns with the simplified testing approach. This is appropriate if the test's primary concern is lifecycle management rather than message processing details.

18-18: Verify the simplified KafkaProcess.Create signature. We've confirmed that in src/MinimalKafka/Internals/KafkaProcess.cs the `Create` method is now defined as:

```csharp
public static KafkaProcess Create(IKafkaConsumer consumer, ILogger<KafkaProcess> logger)
    => new(consumer, logger);
```

and that the `KafkaProcessOptions` type has been removed from the codebase. Please manually ensure that this new overload still provides all of the configuration and behavior that was previously exposed via `KafkaProcessOptions`.

• Affected test locations: test/MinimalKafka.Tests/KafkaProcessTests.cs (lines 18 and 26)

test/MinimalKafka.Tests/DelegateFactoryTests.cs (3)

2-8: Comprehensive imports support new serialization testing. The additional using statements for dependency injection, serializers, text encoding, and JSON properly support the expanded test functionality for the new serialization approach.

97-97: Correct update to the new KafkaContext API. The change from `KafkaContext.Empty` to `KafkaContext.Create` with proper parameters aligns with the architectural refactoring and ensures tests use the actual context creation path.

170-171: No action needed: test data matches JSON serialization output. The `SystemTextJsonSerializer<T>` uses the default Web settings (`JsonSerializerDefaults.Web`), and JSON-serializing a string always includes the surrounding double quotes. Therefore:
- `Encoding.UTF8.GetBytes("\"testKey\"")`
- `Encoding.UTF8.GetBytes("\"testValue\"")`

correctly reflect the serializer's output for string values.
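A quick way to see why the surrounding quotes are expected is a standalone sketch using System.Text.Json directly:

```csharp
using System.Text;
using System.Text.Json;

// JSON-serializing a string value always emits the quote characters,
// so the UTF-8 bytes include them as well.
var bytes = JsonSerializer.SerializeToUtf8Bytes("testKey");
Console.WriteLine(Encoding.UTF8.GetString(bytes)); // prints "testKey" including the quotes
```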
```csharp
_consumer.Consume(Arg.Any<CancellationToken>())
    .Returns(Task.CompletedTask);
```
🛠️ Refactor suggestion
Simplified mock setup may miss important behaviors.
The Consume method mock now simply returns Task.CompletedTask without any delegate invocation. While this simplifies testing, ensure that important consumption behaviors aren't being overlooked in the test coverage.
Consider adding more comprehensive tests that verify the actual message consumption workflow, especially if delegate invocation logic has been moved elsewhere in the architecture.
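A sketch of a more observable mock, assuming NSubstitute (already used in these tests); the coordination via `TaskCompletionSource` is illustrative, not part of the project's API:

```csharp
// Hypothetical: make the Consume mock observable so the test can assert that
// the consumption path was actually reached, instead of only verifying Subscribe().
var consumed = new TaskCompletionSource();

_consumer.Consume(Arg.Any<CancellationToken>())
    .Returns(callInfo =>
    {
        consumed.TrySetResult();   // record that KafkaProcess pulled a message
        return Task.CompletedTask;
    });

// after starting the process under test:
await consumed.Task;               // hangs (and times out) if consumption never happens
```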
🤖 Prompt for AI Agents
In test/MinimalKafka.Tests/KafkaProcessTests.cs around lines 51 to 52, the mock
setup for _consumer.Consume simply returns Task.CompletedTask, which may omit
important consumption behaviors. Update the mock to simulate actual message
consumption by invoking the delegate or callback logic expected during
consumption, ensuring the test covers the real workflow. Additionally, add or
enhance tests to verify that message consumption and delegate invocations behave
correctly in the overall architecture.
```csharp
[Fact]
public async Task Key_Should_Be_Serialized()
{
    var services = new ServiceCollection();
    services.AddSingleton(JsonSerializerOptions.Default);
    //services.AddTransient(typeof(JsonTextSerializer<>));

    // Arrange
    var serviceProvider = services.BuildServiceProvider();

    var kafkaBuilder = Substitute.For<IKafkaBuilder>();
    var options = new KafkaDelegateFactoryOptions
    {
        ServiceProvider = serviceProvider,
        KafkaBuilder = kafkaBuilder
    };
    Delegate handler = ([FromKey] string key, [FromValue] string value) => {
        key.Should().Be("testKey");
        value.Should().Be("testValue");
        return Task.CompletedTask;
    };

    // Act
    var result = KafkaDelegateFactory.Create(handler, options);

    // Assert
    result.KeyType.Should().Be(typeof(string));
    result.ValueType.Should().Be(typeof(string));

    var key = Encoding.UTF8.GetBytes("\"testKey\"");
    var value = Encoding.UTF8.GetBytes("\"testValue\"");
    var headers = new Headers();
    var consumeResult = new ConsumeResult<byte[], byte[]>
    {
        Message = new Message<byte[], byte[]>
        {
            Key = key,
            Value = value,
            Headers = headers
        }
    };

    var context = KafkaContext.Create(KafkaConsumerKey.Random("topic"), consumeResult.Message, serviceProvider, []);

    try
    {
        await result.Delegate.Invoke(context);
    }
    catch (Exception ex)
    {
        Console.WriteLine(ex.ToString());
    }
}
```
🛠️ Refactor suggestion
Comprehensive serialization test with minor issues.
The new test Key_Should_Be_Serialized provides excellent coverage for the serialization/deserialization functionality. However, there are a few concerns:
- Commented code: Line 146 contains commented-out service registration that should be removed or explained.
- Exception handling: The try-catch block (lines 185-192) catches exceptions but only logs them, potentially masking test failures.
- Test method name: The name suggests only key serialization, but it tests both key and value.
Apply these improvements:
```diff
- //services.AddTransient(typeof(JsonTextSerializer<>));
```

```diff
- try
- {
-     await result.Delegate.Invoke(context);
- } catch (Exception ex)
- {
-     Console.WriteLine(ex.ToString());
- }
+ // Act & Assert - should not throw
+ await result.Delegate.Invoke(context);
```

```diff
- public async Task Key_Should_Be_Serialized()
+ public async Task Key_And_Value_Should_Be_Deserialized()
```

📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
```csharp
[Fact]
public async Task Key_And_Value_Should_Be_Deserialized()
{
    var services = new ServiceCollection();
    services.AddSingleton(JsonSerializerOptions.Default);

    // Arrange
    var serviceProvider = services.BuildServiceProvider();

    var kafkaBuilder = Substitute.For<IKafkaBuilder>();
    var options = new KafkaDelegateFactoryOptions
    {
        ServiceProvider = serviceProvider,
        KafkaBuilder = kafkaBuilder
    };
    Delegate handler = ([FromKey] string key, [FromValue] string value) => {
        key.Should().Be("testKey");
        value.Should().Be("testValue");
        return Task.CompletedTask;
    };

    // Act
    var result = KafkaDelegateFactory.Create(handler, options);

    // Assert
    result.KeyType.Should().Be(typeof(string));
    result.ValueType.Should().Be(typeof(string));

    var key = Encoding.UTF8.GetBytes("\"testKey\"");
    var value = Encoding.UTF8.GetBytes("\"testValue\"");
    var headers = new Headers();
    var consumeResult = new ConsumeResult<byte[], byte[]>
    {
        Message = new Message<byte[], byte[]>
        {
            Key = key,
            Value = value,
            Headers = headers
        }
    };

    var context = KafkaContext.Create(KafkaConsumerKey.Random("topic"), consumeResult.Message, serviceProvider, []);

    // Act & Assert - should not throw
    await result.Delegate.Invoke(context);
}
```
🤖 Prompt for AI Agents
In test/MinimalKafka.Tests/DelegateFactoryTests.cs from lines 141 to 194, remove
the commented-out service registration on line 146 to clean up the code. Replace
the try-catch block around the delegate invocation with a direct await call so
that any exceptions cause the test to fail instead of just logging them. Rename
the test method to reflect that it verifies both key and value serialization,
for example, rename it to KeyAndValue_Should_Be_Serialized.
Adds factory and proxy to allow users to provide custom JsonSerializerOptions for serialization. This enables better control over JSON serialization behavior within the Kafka message processing pipeline.
Updates package versions in Directory.Packages.props to address security vulnerabilities and improve performance. Migrates to AwesomeAssertions from FluentAssertions. Updates xunit runner.
Actionable comments posted: 6
♻️ Duplicate comments (1)
src/MinimalKafka/KafkaExtensions.cs (1)
45-53: The previous review concern about exception handling remains unaddressed. The code still uses `First()`, which can throw `InvalidOperationException` if no `IConfigMetadata` exists, as highlighted in the previous review.

Apply the previously suggested fix:

```diff
-var config = builder.MetaData.OfType<IConfigMetadata>().First();
+var config = builder.MetaData.OfType<IConfigMetadata>().FirstOrDefault();
+if (config == null)
+{
+    throw new InvalidOperationException("No IConfigMetadata found in builder metadata.");
+}
```
🧹 Nitpick comments (1)
src/MinimalKafka/KafkaExtensions.cs (1)
296-315: Review the UseJsonSerializer method implementation. The method registers `SystemTextJsonSerializer<>` directly instead of using the factory pattern established elsewhere in the codebase. This creates an inconsistency with the `ISerializerFactory` approach.

Consider aligning with the factory pattern:

```diff
 public static TBuilder UseJsonSerializer<TBuilder>(this TBuilder builder, Action<JsonSerializerOptions> options)
     where TBuilder : IKafkaConfigBuilder
 {
-    builder.Services.AddTransient(typeof(IKafkaSerializer<>), typeof(SystemTextJsonSerializer<>));
+    var jsonOptions = new JsonSerializerOptions(JsonSerializerDefaults.Web);
+    options?.Invoke(jsonOptions);
+    builder.Services.AddSingleton<ISerializerFactory>(new SystemTextJsonSerializerFactory(jsonOptions));
+    builder.Services.AddTransient(typeof(IKafkaSerializer<>), typeof(KafkaSerializerProxy<>));
     return builder;
 }
```
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (8)
- src/MinimalKafka/KafkaExtensions.cs (2 hunks)
- src/MinimalKafka/Serializers/ISerializerFactory.cs (1 hunks)
- src/MinimalKafka/Serializers/KafkaSerializerProxy.cs (1 hunks)
- src/MinimalKafka/Serializers/SystemTextJsonSerializer.cs (1 hunks)
- src/MinimalKafka/Serializers/SystemTextJsonSerializerFactory.cs (1 hunks)
- test/MinimalKafka.Tests/Serializers/JsonTextSerializersTests.cs (7 hunks)
- test/MinimalKafka.Tests/Stream/StreamBuilderTests.cs (1 hunks)
- test/MinimalKafka.Tests/Stream/StreamExtensionsTests.cs (2 hunks)
✅ Files skipped from review due to trivial changes (2)
- test/MinimalKafka.Tests/Stream/StreamExtensionsTests.cs
- src/MinimalKafka/Serializers/KafkaSerializerProxy.cs
🚧 Files skipped from review as they are similar to previous changes (1)
- src/MinimalKafka/Serializers/SystemTextJsonSerializer.cs
🧰 Additional context used
🪛 GitHub Actions: Build
test/MinimalKafka.Tests/Stream/StreamBuilderTests.cs
[error] 19-19: Test failure in Join_ReturnsJoinBuilder_WithCorrectParams: Expected type to be MinimalKafka.Stream.IJoinBuilder<string, int, long, double>, but found MinimalKafka.Stream.Internals.JoinBuilder<string, int, long, double>.
[error] 30-30: Test failure in InnerJoin_ReturnsJoinBuilder_WithCorrectParams: Expected type to be MinimalKafka.Stream.IJoinBuilder<string, int, long, double>, but found MinimalKafka.Stream.Internals.JoinBuilder<string, int, long, double>.
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (1)
- GitHub Check: Analyze
🔇 Additional comments (3)
test/MinimalKafka.Tests/Stream/StreamBuilderTests.cs (1)
8-10: LGTM: Well-defined test constants. The topic constants provide good test isolation and readability.

src/MinimalKafka/Serializers/SystemTextJsonSerializerFactory.cs (1)

5-13: Well-implemented factory pattern. The implementation correctly follows the factory pattern with appropriate defaults. Using `JsonSerializerDefaults.Web` as the default is a good choice for typical web applications.

test/MinimalKafka.Tests/Serializers/JsonTextSerializersTests.cs (1)

12-99: Test updates correctly reflect the new serializer API. The migration from `JsonTextSerializer` to `SystemTextJsonSerializer` is properly implemented across all test methods. The simplified API (removing `SerializationContext` parameters) indicates better encapsulation while maintaining comprehensive test coverage.
```csharp
public void Into_Calls_Builder_MapTopic_And_Delegates_Handler()
{
    var builder = new KafkaBuilder(EmptyServiceProvider.Instance);

    var streamBuilder = new StreamBuilder<string, int>(builder, _mainTopic);

    static Task Handler(KafkaContext ctx, string key, int value)
    {
        return Task.CompletedTask;
    }

    // Act
    var result = streamBuilder.Into(Handler);
}
```
Complete the test implementation with proper assertions.
The test is incomplete - it sets up the scenario and calls the method but lacks any assertions to verify the expected behavior. The comment suggests there should be an "Assert" section.
```diff
 // Act
 var result = streamBuilder.Into(Handler);
+
+ // Assert
+ result.Should().NotBeNull();
+ // Add additional assertions to verify the handler was registered correctly
```

Consider adding assertions to verify (a sketch follows this list):
- The result is not null
- The builder's MapTopic method was called with correct parameters
- The handler was properly registered
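One way the verification might look, assuming the builder is replaced with an NSubstitute substitute; the exact `MapTopic` signature on `IKafkaBuilder` is an assumption based on the test name, so adjust to the real API:

```csharp
// Hypothetical: substitute the builder so the MapTopic call can be verified.
var builder = Substitute.For<IKafkaBuilder>();
var streamBuilder = new StreamBuilder<string, int>(builder, _mainTopic);

var result = streamBuilder.Into(Handler);

result.Should().NotBeNull();
builder.Received(1).MapTopic(_mainTopic, Arg.Any<Delegate>()); // assumed signature
```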
📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
```csharp
[Fact]
public void Into_Calls_Builder_MapTopic_And_Delegates_Handler()
{
    var builder = new KafkaBuilder(EmptyServiceProvider.Instance);
    var streamBuilder = new StreamBuilder<string, int>(builder, _mainTopic);

    static Task Handler(KafkaContext ctx, string key, int value)
    {
        return Task.CompletedTask;
    }

    // Act
    var result = streamBuilder.Into(Handler);

    // Assert
    result.Should().NotBeNull();
    // Add additional assertions to verify the handler was registered correctly
}
```
🤖 Prompt for AI Agents
In test/MinimalKafka.Tests/Stream/StreamBuilderTests.cs between lines 33 and 49,
the test method Into_Calls_Builder_MapTopic_And_Delegates_Handler is incomplete
as it lacks assertions. Add assertions to verify that the result of
streamBuilder.Into(Handler) is not null, confirm that the builder's MapTopic
method was called with the expected topic and handler parameters, and ensure the
handler was properly registered. Use mocking or appropriate verification
techniques to check these conditions and complete the test.
src/MinimalKafka/KafkaExtensions.cs
Outdated
```csharp
builder.Add(b => {
    var item = b.MetaData.OfType<IConfigMetadata>().FirstOrDefault();
    if (item is null)
    {
        item = new ConfigMetadataAttribute(new Dictionary<string, string>());
        b.MetaData.Add(item);
    }

    update?.Invoke(item.ConsumerConfig);
});
return builder;
```
💡 Verification agent
🧩 Analysis chain
Inconsistent null handling pattern in UpdateConsumerConfig.
The method uses FirstOrDefault() with proper null checking, which is the correct approach. However, this pattern should be consistently applied throughout the codebase, particularly in the producer registration (lines 45-53).
Verify that this pattern is consistently applied across similar methods:
🏁 Script executed:
```bash
#!/bin/bash
# Description: Check for inconsistent usage of First() vs FirstOrDefault() with IConfigMetadata

# Search for First() usage with IConfigMetadata
echo "=== Usage of First() with IConfigMetadata ==="
rg -A 2 -B 2 'OfType<IConfigMetadata>\(\)\.First\('

echo -e "\n=== Usage of FirstOrDefault() with IConfigMetadata ==="
rg -A 2 -B 2 'OfType<IConfigMetadata>\(\)\.FirstOrDefault\('
```

Length of output: 2829
Ensure consistent null‐handling for IConfigMetadata retrieval
The UpdateConsumerConfig extension correctly uses FirstOrDefault() with a null check, but elsewhere we still call .First() unguarded. This can lead to runtime InvalidOperationException if metadata is missing. Please update the following locations to mirror the safe pattern:
• Producer registration (src/MinimalKafka/KafkaExtensions.cs)

```diff
- var config = builder.MetaData.OfType<IConfigMetadata>().First();
+ var config = builder.MetaData.OfType<IConfigMetadata>().FirstOrDefault()
+     ?? throw new InvalidOperationException(
+         "Missing IConfigMetadata; call UpdateConsumerConfig or AddConsumerConfig first.");
```

• GetClientId / GetGroupId (src/MinimalKafka/Builders/KafkaDataSource.cs)

```diff
- return builder.MetaData.OfType<IConfigMetadata>().First().ConsumerConfig.ClientId;
+ return builder.MetaData.OfType<IConfigMetadata>().FirstOrDefault()?.ConsumerConfig.ClientId
+     ?? throw new InvalidOperationException(
+         "Missing IConfigMetadata; ensure consumer config is registered before calling GetClientId().");
```

(Similarly for GroupId.)

• CreateConsumer (src/MinimalKafka/Builders/KafkaConsumerBuilder.cs)

```diff
- var config = builder.MetaData.OfType<IConfigMetadata>().First();
+ var config = builder.MetaData.OfType<IConfigMetadata>().FirstOrDefault()
+     ?? throw new InvalidOperationException(
+         "Missing IConfigMetadata; call UpdateConsumerConfig before building the consumer.");
```

This will align all metadata lookups with the null-safe pattern.
🤖 Prompt for AI Agents
In src/MinimalKafka/KafkaExtensions.cs around lines 207 to 217, the code safely
retrieves IConfigMetadata using FirstOrDefault() with a null check, but other
parts of the codebase still use First() without null checks, risking
InvalidOperationException. Locate all instances in Producer registration (same
file), GetClientId and GetGroupId methods in
src/MinimalKafka/Builders/KafkaDataSource.cs, and CreateConsumer in
src/MinimalKafka/Builders/KafkaConsumerBuilder.cs where IConfigMetadata is
retrieved with First(), and replace those calls with FirstOrDefault() followed
by appropriate null checks before usage to ensure consistent and safe null
handling.
Actionable comments posted: 3
♻️ Duplicate comments (2)
src/MinimalKafka.RocksDB/RocksDBStreamStore.cs (2)
10-14: Error handling for RocksDB operations is still missing. The `db.Put` operation can throw exceptions if the database is closed or corrupted. The past review comment about adding error handling is still valid and should be addressed.

16-20: Missing parameter validation and error handling. The `FindByIdAsync` method lacks parameter validation for the key and doesn't handle potential RocksDB exceptions. The past review comment about adding validation and error handling is still applicable.

🧹 Nitpick comments (13)

test/MinimalKafka.Tests/KafkaServiceTests.cs (2)

14-14: Minor naming convention issue: variable name should follow camelCase. The variable `_datasource` should be `_dataSource` to follow C# naming conventions.

```diff
- var _datasource = Substitute.For<IKafkaDataSource>();
+ var _dataSource = Substitute.For<IKafkaDataSource>();
```

Also update the references on lines 19 and 21:

```diff
- _datasource.GetProceses().Returns(_processes);
+ _dataSource.GetProceses().Returns(_processes);
- _builder.DataSource.Returns(_datasource);
+ _builder.DataSource.Returns(_dataSource);
```

70-70: Fix indentation inconsistency. The test method declaration has inconsistent indentation - it's missing proper spacing.

```diff
- [Fact]
+ [Fact]
```

src/MinimalKafka.RocksDB/RocksDBStreamStore.cs (1)

22-31: Consider removing pragma warnings by making the method truly async. The `GetItems` method uses pragma warnings to suppress async method warnings, but this could indicate a design issue. If the method needs to be async for consistency with the interface, consider yielding asynchronously.

```diff
-#pragma warning disable CS1998 // Async method lacks 'await' operators and will run synchronously
 public async IAsyncEnumerable<byte[]> GetItems()
 {
     using var iterator = db.NewIterator(cfHandle);
     for (iterator.SeekToFirst(); iterator.Valid(); iterator.Next())
     {
+        await Task.Yield(); // Allow other tasks to run
         yield return iterator.Value();
     }
 }
-#pragma warning restore CS1998 // Async method lacks 'await' operators and will run synchronously
```

src/MinimalKafka/KafkaConsumerConfigExtensions.cs (5)

14-15: Complete the XML documentation summary. The class-level XML documentation has an empty summary. This should be completed to describe the purpose of the extension methods.

```diff
 /// <summary>
-///
+/// Provides extension methods for configuring Kafka consumer and producer settings.
 /// </summary>
```

45-51: Complete the XML documentation. The method has incomplete XML documentation with an empty summary and missing parameter descriptions.

```diff
 /// <summary>
-///
+/// Adds group ID metadata to the builder.
 /// </summary>
-/// <typeparam name="TBuilder"></typeparam>
-/// <param name="builder"></param>
-/// <param name="groupId"></param>
-/// <returns></returns>
+/// <typeparam name="TBuilder">The type of the Kafka convention builder.</typeparam>
+/// <param name="builder">The builder to configure.</param>
+/// <param name="groupId">The group ID.</param>
+/// <returns>The same <typeparamref name="TBuilder"/> instance for chaining.</returns>
```

74-80: Complete the XML documentation. The method has incomplete XML documentation with an empty summary and missing parameter descriptions.

```diff
 /// <summary>
-///
+/// Adds bootstrap servers metadata to the builder.
 /// </summary>
-/// <typeparam name="TBuilder"></typeparam>
-/// <param name="builder"></param>
-/// <param name="bootstrapServers"></param>
-/// <returns></returns>
+/// <typeparam name="TBuilder">The type of the Kafka convention builder.</typeparam>
+/// <param name="builder">The builder to configure.</param>
+/// <param name="bootstrapServers">The bootstrap servers.</param>
+/// <returns>The same <typeparamref name="TBuilder"/> instance for chaining.</returns>
```

99-105: Complete the XML documentation. The method has incomplete XML documentation with an empty summary and missing parameter descriptions.

```diff
 /// <summary>
-///
+/// Adds transactional ID metadata to the builder.
 /// </summary>
-/// <typeparam name="TBuilder"></typeparam>
-/// <param name="builder"></param>
-/// <param name="transactionalId"></param>
-/// <returns></returns>
+/// <typeparam name="TBuilder">The type of the Kafka convention builder.</typeparam>
+/// <param name="builder">The builder to configure.</param>
+/// <param name="transactionalId">The transactional ID.</param>
+/// <returns>The same <typeparamref name="TBuilder"/> instance for chaining.</returns>
```

172-178: Consider an atomic operation for the WithSingle method. The `WithSingle` method performs two separate operations (remove then add), which could create race conditions if used concurrently. Consider making this an atomic operation.

```diff
 public static TBuilder WithSingle<TBuilder>(this TBuilder builder, object metadata)
     where TBuilder : IKafkaConventionBuilder
 {
-    builder.RemoveMetaData(metadata);
-    builder.WithMetaData(metadata);
+    builder.Add(b =>
+    {
+        b.MetaData.RemoveAll(x => x.GetType() == metadata.GetType());
+        b.MetaData.Add(metadata);
+    });
     return builder;
 }
```

examples/Examples/Aggregate/Result.cs (1)

5-11: Consider using the params array more efficiently. The `Failed` method creates a new array for the `errorMessage` parameter. Consider using a span or collection expression for better performance.

```csharp
public static Result<TResult> Failed<TResult>(TResult value, params string[] errorMessage)
    => new()
    {
        State = value,
        ErrorMessage = errorMessage,
        IsSuccess = false
    };
```

The current implementation is fine, but if this method is called frequently, consider performance optimizations.

examples/Examples/Program.cs (2)

12-22: Consider removing commented configuration or documenting why it's preserved. The configuration has several commented lines. If these are no longer needed, they should be removed for code cleanliness. If they're kept for reference, consider adding a comment explaining why.

Either remove the commented lines:

```diff
- //.WithBootstrapServers("nas:9092")
- //.WithOffsetReset(AutoOffsetReset.Earliest)
- //.WithPartitionAssignedHandler((_, p) => p.Select(tp => new TopicPartitionOffset(tp, Offset.Beginning)))
```

Or add a comment explaining their purpose:

```diff
+ // Alternative configurations for different environments:
  //.WithBootstrapServers("nas:9092")
```

32-89: Remove commented code if no longer needed. There's a large block of commented code. If this code is no longer needed, it should be removed to improve code readability. If it's kept for reference, consider moving it to documentation or adding a comment explaining its purpose.

Consider removing the commented code entirely or adding a comment explaining why it's preserved:

```diff
+ // Previous implementation examples - kept for reference during migration
  //app.MapTopic("my-topic", ([FromKey] string key, [FromValue] string value) =>
```

examples/Examples/Aggregate/TestAggregate.cs (1)

1-1: Remove unused import. The `Microsoft.AspNetCore.Mvc.TagHelpers.Cache` import is not used in this file and appears to be accidentally included.

```diff
- using Microsoft.AspNetCore.Mvc.TagHelpers.Cache;
```

examples/Examples/Aggregate/AggregateExtensions.cs (1)

46-49: Add a clarifying comment for the early-return logic. The condition for early return when `c.ConsumerKey.TopicName == name` could benefit from a comment explaining why we skip processing when the command comes from the aggregate state topic.

```diff
+ // Skip processing if command is null or if the message is from the state topic (to avoid circular processing)
  if (cmd is null || c.ConsumerKey.TopicName == name)
  {
      return;
  }
```
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (14)
- examples/Examples/Aggregate/AggregateExtensions.cs (1 hunks)
- examples/Examples/Aggregate/IAggregate.cs (1 hunks)
- examples/Examples/Aggregate/IAggregateCommands.cs (1 hunks)
- examples/Examples/Aggregate/Result.cs (1 hunks)
- examples/Examples/Aggregate/TestAggregate.cs (1 hunks)
- examples/Examples/Examples.csproj (0 hunks)
- examples/Examples/Program.cs (2 hunks)
- global.json (1 hunks)
- src/MinimalKafka.RocksDB/RocksDBStreamStore.cs (1 hunks)
- src/MinimalKafka/Internals/KafkaService.cs (3 hunks)
- src/MinimalKafka/KafkaConsumerConfigExtensions.cs (1 hunks)
- src/MinimalKafka/KafkaExtensions.cs (2 hunks)
- src/MinimalKafka/Serializers/SystemTextJsonSerializer.cs (1 hunks)
- test/MinimalKafka.Tests/KafkaServiceTests.cs (1 hunks)
💤 Files with no reviewable changes (1)
- examples/Examples/Examples.csproj
✅ Files skipped from review due to trivial changes (3)
- global.json
- src/MinimalKafka/Internals/KafkaService.cs
- examples/Examples/Aggregate/IAggregateCommands.cs
🚧 Files skipped from review as they are similar to previous changes (2)
- src/MinimalKafka/Serializers/SystemTextJsonSerializer.cs
- src/MinimalKafka/KafkaExtensions.cs
🧰 Additional context used
🧬 Code Graph Analysis (3)
examples/Examples/Program.cs (1)
- examples/Examples/Aggregate/TestAggregate.cs (2): TestCommands (5-11), Test (18-19)

examples/Examples/Aggregate/Result.cs (2)
- examples/Examples/Aggregate/IAggregate.cs (1): Result (8-8)
- examples/Examples/Aggregate/TestAggregate.cs (3): Result (21-37), Result (39-50), Result (52-63)

src/MinimalKafka.RocksDB/RocksDBStreamStore.cs (4)
- src/MinimalKafka/Internals/KafkaInMemoryStoreFactory.cs (4): IKafkaStore (11-23), ValueTask (46-53), ValueTask (60-63), IAsyncEnumerable (65-68)
- src/MinimalKafka/IKafkaStore.cs (4): IKafkaStore (48-48), ValueTask (21-21), ValueTask (28-28), IAsyncEnumerable (34-34)
- src/MinimalKafka.RocksDB/RocksDBStreamStoreFactory.cs (1): IKafkaStore (55-70)
- src/MinimalKafka/Stream/KafkaStoreExtensions.cs (2): ValueTask (7-16), IAsyncEnumerable (18-31)
- GitHub Check: Analyze
🔇 Additional comments (11)
test/MinimalKafka.Tests/KafkaServiceTests.cs (2)

1-1: LGTM: Using directive updated to reflect namespace refactoring. The change from `MinimalKafka.Builders` to `MinimalKafka.Internals` aligns with the internal API reorganization mentioned in the summary, where `KafkaService` was moved to the `MinimalKafka.Internals` namespace.

70-80: Well-designed test for the null-handling scenario. This test correctly validates that `KafkaService` handles the case where `DataSource` is null (due to no mock setup) by returning an empty processes list. The approach of creating a separate builder instance for this specific test case is good practice and aligns with the refactored nullable `DataSource` property mentioned in the summary.

src/MinimalKafka/KafkaConsumerConfigExtensions.cs (1)

115-126: Review thread safety in `builder.Add` metadata manipulation. The lambda passed to `builder.Add` reads and writes the shared `MetaData` collection without any synchronization. If multiple threads invoke this extension method concurrently, you could encounter:
- Race conditions when retrieving or adding the `IConfigMetadata` instance.
- Duplicate or lost updates to the `MetaData` list.

Please verify whether:
- The `IKafkaConventionBuilder.Add` implementation serializes access to `MetaData`, or
- The underlying `MetaData` collection is a thread-safe type.

If it isn't, consider one of the following fixes:
- Switch `MetaData` to a thread-safe collection (e.g., `ConcurrentBag<object>`, `ConcurrentDictionary<,>`).
- Introduce locking around the metadata check/add logic.
- Restrict `Add` to single-threaded use and document this constraint.

Requested next steps:
- Confirm the thread-safety guarantees of both `Add` and `MetaData`.
- Apply synchronization or update the implementation as needed.

examples/Examples/Aggregate/IAggregate.cs (1)

3-10: Well-designed aggregate interface. The interface follows the aggregate pattern effectively with clear separation of concerns. The static abstract methods provide a clean contract for aggregate implementations.

examples/Examples/Aggregate/Result.cs (1)

20-25: Implicit conversion is well-implemented. The implicit conversion from `T` to `Result<T>` provides a clean API for success cases, making the code more readable when returning successful results (see the sketch after this section).

examples/Examples/Program.cs (1)

2-2: Good addition of aggregate namespace. The addition of `using Examples.Aggregate;` aligns with the new aggregate functionality being demonstrated.

examples/Examples/Aggregate/TestAggregate.cs (3)

5-11: LGTM! The `TestCommands` class correctly implements the aggregate command interface with appropriate properties.

39-50: LGTM! The `Increment` method correctly implements validation and immutable state update.

52-63: LGTM! The `Decrement` method correctly implements validation and immutable state update, maintaining consistency with the `Increment` method.

examples/Examples/Aggregate/AggregateExtensions.cs (2)

76-86: LGTM! The `CommandResult` factory class correctly creates typed command results with all necessary properties.

88-92: LGTM! The `CommandResult<TState, TCommand>` class appropriately extends the base result with command information.
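To illustrate the implicit `T` to `Result<T>` conversion praised above: a hypothetical success path can return the state object directly. This sketch assumes `Test` is a record with a `Counter` property, as the validation messages in `TestAggregate` suggest:

```csharp
// Hypothetical handler: on failure an explicit Result.Failed is returned;
// on success the state itself is returned and implicitly wrapped in Result<Test>.
public static Result<Test> HandleIncrement(Test state)
{
    if (state.Counter >= 100)
        return Result.Failed(state, "Counter cannot be more than 100.");

    return state with { Counter = state.Counter + 1 }; // implicit Result<Test>
}
```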
```csharp
.WithGroupId(AppDomain.CurrentDomain.FriendlyName)
.WithClientId(AppDomain.CurrentDomain.FriendlyName)
.WithTransactionalId(AppDomain.CurrentDomain.FriendlyName)
```
🛠️ Refactor suggestion
Using AppDomain.CurrentDomain.FriendlyName for multiple Kafka settings may cause issues.
Using the same value for GroupId, ClientId, and TransactionalId might not be optimal. These should typically be different values to avoid conflicts, especially in distributed environments.
```diff
- .WithGroupId(AppDomain.CurrentDomain.FriendlyName)
- .WithClientId(AppDomain.CurrentDomain.FriendlyName)
- .WithTransactionalId(AppDomain.CurrentDomain.FriendlyName)
+ .WithGroupId($"{AppDomain.CurrentDomain.FriendlyName}-group")
+ .WithClientId($"{AppDomain.CurrentDomain.FriendlyName}-client")
+ .WithTransactionalId($"{AppDomain.CurrentDomain.FriendlyName}-tx")
```

🤖 Prompt for AI Agents
In examples/Examples/Program.cs around lines 13 to 15, the same value from
AppDomain.CurrentDomain.FriendlyName is used for GroupId, ClientId, and
TransactionalId, which can cause conflicts in Kafka settings. Update the code to
assign distinct and meaningful values to each of these properties to avoid
potential issues in distributed environments.
```csharp
var result = command.CommandName switch
{
    nameof(Create) => Create(command),
    nameof(Increment) => state.Increment(),
    nameof(Decrement) => state.Decrement(),
    _ => Result.Failed(state, "Unknown command: " + command.CommandName)
};
```
Fix type mismatch in Create case
The Create case returns Test but the switch expression expects Result<Test>. This will cause a compilation error.
```diff
- nameof(Create) => Create(command),
+ nameof(Create) => Result.Success(Create(command)),
```

📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
```diff
 var result = command.CommandName switch
 {
-    nameof(Create) => Create(command),
+    nameof(Create) => Result.Success(Create(command)),
     nameof(Increment) => state.Increment(),
     nameof(Decrement) => state.Decrement(),
     _ => Result.Failed(state, "Unknown command: " + command.CommandName)
 };
```
🤖 Prompt for AI Agents
In examples/Examples/Aggregate/TestAggregate.cs around lines 23 to 29, the
switch expression expects a Result<Test> return type, but the Create case
returns a Test type causing a type mismatch. Modify the Create case to return a
Result<Test> by wrapping the Create(command) call in a successful Result,
ensuring all switch cases return the same type.
```csharp
/// </summary>
/// <typeparam name="TKey"></typeparam>
/// <typeparam name="TCommand"></typeparam>
/// <typeparam name="TAgrregate"></typeparam>
```
Fix typo in generic parameter name
The generic parameter is misspelled as TAgrregate instead of TAggregate in multiple locations.
```diff
- /// <typeparam name="TAgrregate"></typeparam>
+ /// <typeparam name="TAggregate"></typeparam>
```

```diff
- public static IKafkaConventionBuilder MapAggregate<TKey, TCommand, TAgrregate>(this IApplicationBuilder builder, string topic)
+ public static IKafkaConventionBuilder MapAggregate<TKey, TCommand, TAggregate>(this IApplicationBuilder builder, string topic)
-     where TAgrregate : IAggregate<TKey, TAgrregate, TCommand>
+     where TAggregate : IAggregate<TKey, TAggregate, TCommand>
```

```diff
- return sb.MapAggregate<TKey, TCommand, TAgrregate>(topic);
+ return sb.MapAggregate<TKey, TCommand, TAggregate>(topic);
```

```diff
- public static IKafkaConventionBuilder MapAggregate<TKey, TCommand, TAgrregate>(this IKafkaBuilder builder, string name)
+ public static IKafkaConventionBuilder MapAggregate<TKey, TCommand, TAggregate>(this IKafkaBuilder builder, string name)
-     where TAgrregate : IAggregate<TKey, TAgrregate, TCommand>
+     where TAggregate : IAggregate<TKey, TAggregate, TCommand>
```

```diff
- state ??= TAgrregate.Create(cmd);
+ state ??= TAggregate.Create(cmd);
- var result = TAgrregate.Apply(state, cmd);
+ var result = TAggregate.Apply(state, cmd);
```

Also applies to: 17-17, 35-35, 37-37
🤖 Prompt for AI Agents
In examples/Examples/Aggregate/AggregateExtensions.cs at lines 13, 17, 35, and
37, the generic parameter name is misspelled as TAgrregate. Correct the spelling
to TAggregate consistently in all these locations to fix the typo.
This commit refactors the Kafka consumer implementation for improved modularity and testability. It introduces a configuration object for Kafka consumers, centralizing configuration settings. Also adds handlers, offset management and report interval functionality.
Actionable comments posted: 5
♻️ Duplicate comments (7)
examples/Examples/Aggregate/TestAggregate.cs (1)
27-34: Fix type mismatch in Create caseThe
Createcase returnsTestbut the switch expression expectsResult<Test>. This will cause a compilation error.- nameof(Create) => Create(command), + nameof(Create) => Result.Success(Create(command)),test/MinimalKafka.Tests/DelegateFactoryTests.cs (3)
147-147: Remove commented code.The commented-out service registration should be removed to clean up the code.
- //services.AddTransient(typeof(JsonTextSerializer<>));
187-194: Exception handling masks test failures.The try-catch block catches exceptions but only logs them, potentially masking test failures. Tests should fail when exceptions occur.
- try - { - await result.Delegate.Invoke(context); - } catch (Exception ex) - { - Console.WriteLine(ex.ToString()); - } + // Act & Assert - should not throw + await result.Delegate.Invoke(context);
143-143: Test method name doesn't reflect full functionality.The method name suggests only key serialization, but it tests both key and value deserialization.
- public async Task Key_Should_Be_Serialized() + public async Task Key_And_Value_Should_Be_Deserialized()src/MinimalKafka/Internals/KafkaConsumer.cs (1)
74-74: Fix typo in logger method name.There's a typo in the logger method name "OperatonCanceled" - it should be "OperationCanceled".
-logger.OperatonCanceled(config.Key.GroupId, config.Key.ClientId); +logger.OperationCanceled(config.Key.GroupId, config.Key.ClientId);src/MinimalKafka/KafkaContext.cs (2)
64-79: Handle missing serializer registrations gracefullyThe GetKey() and GetValue() methods use GetRequiredService which will throw if the serializer is not registered.
102-106: Consolidate and document EmptyServiceProvider null-object fallbackThe EmptyServiceProvider class is duplicated across multiple files and lacks documentation.
🧹 Nitpick comments (7)
examples/Examples/Aggregate/TestAggregate.cs (1)
79-79: Fix typos in error messageThe error message contains two typos: "connot" should be "cannot" and "then" should be "than".
- return Result.Failed(this, "Counter connot be more then 100."); + return Result.Failed(this, "Counter cannot be more than 100.");src/MinimalKafka/KafkaMetadataExtensions.cs (1)
7-52: Missing XML documentation content.All XML documentation comments are empty, which reduces code maintainability and developer experience.
Consider adding meaningful documentation:
/// <summary> -/// +/// Provides extension methods for extracting Kafka-related metadata from metadata collections. /// </summary> public static class KafkaMetadataExtensions { /// <summary> - /// + /// Extracts the consumer configuration from the metadata collection. /// </summary> /// <param name="metadata">The metadata collection to search.</param> - /// <returns></returns> + /// <returns>The consumer configuration.</returns> + /// <exception cref="InvalidOperationException">Thrown when no consumer configuration is found.</exception> public static ConsumerConfig ConsumerConfig(this IReadOnlyList<object> metadata)src/MinimalKafka/Internals/KafkaConsumer.cs (1)
93-93: Unusual field placement affects readability.The
_isClosedfield is declared after its usage in theClose()method, which is unusual and can affect code readability.Move the field declaration to the top with other fields:
internal class KafkaConsumer( KafkaConsumerConfig config, IKafkaProducer producer, KafkaTopicFormatter topicFormatter, IServiceProvider serviceProvider, ILogger<KafkaConsumer> logger) : IKafkaConsumer { private long _recordsConsumed; + private bool _isClosed; private readonly int _reportInterval = config.Metadata.ReportInterval(); private readonly bool _autoCommitEnabled = config.Metadata.AutoCommitEnabled(); private readonly IConsumer<byte[], byte[]> _consumer = CreateConsumer(config.Metadata); // ... rest of the code ... - private bool _isClosed;src/MinimalKafka/KafkaContext.cs (2)
9-13: Add missing XML documentation commentsMultiple methods and properties have empty XML documentation comments. Please add meaningful descriptions to improve code documentation and IntelliSense support.
Also applies to: 25-40, 41-49, 50-55, 59-80, 81-92
53-55: Headers property creates a new dictionary on every accessThe Headers property recreates the dictionary every time it's accessed, which could impact performance if called frequently. Consider caching the dictionary or using lazy initialization.
Apply this diff to optimize the Headers property:
+private IReadOnlyDictionary<string, string>? _cachedHeaders;
+
 /// <summary>
 ///
 /// </summary>
-public IReadOnlyDictionary<string, string> Headers => _message.Headers
-    .ToDictionary(x => x.Key, y => Encoding.UTF8.GetString(y.GetValueBytes()));
+public IReadOnlyDictionary<string, string> Headers => _cachedHeaders ??= _message.Headers
+    .ToDictionary(x => x.Key, y => Encoding.UTF8.GetString(y.GetValueBytes()));

src/MinimalKafka/KafkaConsumerConfigExtensions.cs (2)
13-17: Add missing XML documentation comments. The class and several methods have empty XML documentation comments. Please add descriptions to improve API documentation.
Also applies to: 45-52, 74-80, 100-113
32-33: Update documentation to reflect actual implementation. The documentation mentions "Adds client ID metadata" but the implementation directly updates the ConsumerConfig instead of adding metadata.
Apply this diff to fix the documentation:
 /// <summary>
-/// Adds client ID metadata to the builder.
+/// Sets the client ID in the consumer configuration.
 /// </summary>
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (14)
- examples/Examples/Aggregate/TestAggregate.cs (1 hunks)
- examples/Examples/Program.cs (2 hunks)
- src/MinimalKafka/Builders/KafkaConsumerBuilder.cs (1 hunks)
- src/MinimalKafka/Internals/KafkaConsumer.cs (1 hunks)
- src/MinimalKafka/Internals/KafkaContextProducer.cs (1 hunks)
- src/MinimalKafka/KafkaConsumerConfigExtensions.cs (2 hunks)
- src/MinimalKafka/KafkaContext.cs (1 hunks)
- src/MinimalKafka/KafkaMetadataExtensions.cs (1 hunks)
- src/MinimalKafka/Metadata/IReportIntervalMetadata.cs (0 hunks)
- src/MinimalKafka/Metadata/Internals/ReportIntervalMetadata.cs (1 hunks)
- src/MinimalKafka/Stream/Internals/StreamBuilder.cs (2 hunks)
- test/MinimalKafka.Tests/DelegateFactoryTests.cs (3 hunks)
- test/MinimalKafka.Tests/KafkaContextTests.cs (1 hunks)
- test/MinimalKafka.Tests/Stream/StreamBuilderTests.cs (1 hunks)
💤 Files with no reviewable changes (1)
- src/MinimalKafka/Metadata/IReportIntervalMetadata.cs
🚧 Files skipped from review as they are similar to previous changes (5)
- src/MinimalKafka/Builders/KafkaConsumerBuilder.cs
- src/MinimalKafka/Stream/Internals/StreamBuilder.cs
- test/MinimalKafka.Tests/Stream/StreamBuilderTests.cs
- examples/Examples/Program.cs
- src/MinimalKafka/Internals/KafkaContextProducer.cs
🧰 Additional context used
🧬 Code Graph Analysis (2)
src/MinimalKafka/Metadata/Internals/ReportIntervalMetadata.cs (2)
src/MinimalKafka/Metadata/Internals/TopicFormatterMetadataAttribute.cs (1)
AttributeUsage(3-7)

src/MinimalKafka/KafkaMetadataExtensions.cs (1)
ReportInterval(42-43)
src/MinimalKafka/KafkaMetadataExtensions.cs (1)
src/MinimalKafka/Metadata/Internals/ConsumerHandlerMetadata.cs (1)
ConsumerHandlerMetadata(4-12)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (1)
- GitHub Check: Analyze
🔇 Additional comments (14)
examples/Examples/Aggregate/TestAggregate.cs (6)
3-11: LGTM - Clean command implementation. The TestCommands class correctly implements the IAggregateCommands&lt;Guid&gt; interface with appropriate properties and initialization.

13-13: LGTM - Simple record definition. The SetCounter record is well-defined and appropriate for its purpose.

22-23: LGTM - Clean factory method. The Create method correctly initializes a new aggregate instance with version 0.

44-55: LGTM - Correct increment logic with validation. The Increment method properly validates the upper bound and returns appropriate results.

57-68: LGTM - Correct decrement logic with validation. The Decrement method properly validates the lower bound and returns appropriate results.

70-86: LGTM - Correct SetCounter implementation with validation. The SetCounter method properly validates the range (0-100) and returns appropriate results. The validation logic is consistent with the other methods.

test/MinimalKafka.Tests/KafkaContextTests.cs (3)
13-14: Test data preparation looks correct. The use of UTF8 encoding for converting string test data to byte arrays is appropriate for the new byte array-based Kafka context implementation.
19-27: Factory method usage aligns with new API design. The test correctly uses the new KafkaConsumerConfig.Create and KafkaContext.Create factory methods, which properly encapsulate the configuration and context creation logic.
31-32: Correct byte array comparison method. Using SequenceEqual for byte array comparison is the appropriate approach, as direct equality comparison would only check reference equality for arrays.

src/MinimalKafka/Metadata/Internals/ReportIntervalMetadata.cs (1)
4-8: Clean attribute implementation following best practices. The conversion from a regular class to an attribute-based metadata implementation is well-executed:

- Proper AttributeUsage restriction to methods
- Clean constructor parameter injection
- Simple property implementation returning the constructor parameter
- Appropriate inheritance from both Attribute and IReportIntervalMetadata

This design aligns with the broader refactoring toward attribute-based metadata configuration; the sketch below shows the shape.
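A sketch of the pattern being described, reconstructed from the review rather than copied from the source:

```csharp
using System;

// The metadata contract.
public interface IReportIntervalMetadata
{
    int ReportInterval { get; }
}

// Attribute restricted to methods; the constructor parameter is surfaced
// directly as the metadata value.
[AttributeUsage(AttributeTargets.Method)]
public sealed class ReportIntervalMetadataAttribute(int reportInterval)
    : Attribute, IReportIntervalMetadata
{
    public int ReportInterval => reportInterval;
}
```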
src/MinimalKafka/Internals/KafkaConsumer.cs (2)
8-21: Well-designed configuration class. The KafkaConsumerConfig class provides a clean, immutable configuration structure with proper required properties and a factory method. The use of collection expressions and required properties follows modern C# practices.
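An illustrative shape for such a class; the member names are guesses based on usage elsewhere in this PR (e.g. KafkaConsumerConfig.Create(key, [], [])), so treat this as a sketch:

```csharp
using System;
using System.Collections.Generic;

// Sketch: immutable config with required init-only properties and a factory
// method whose callers can pass collection expressions ([]).
internal sealed class KafkaConsumerConfig
{
    public required KafkaConsumerKey Key { get; init; }           // type from this PR
    public required IReadOnlyList<object> Metadata { get; init; }
    public required IReadOnlyList<Delegate> Handlers { get; init; }

    public static KafkaConsumerConfig Create(
        KafkaConsumerKey key,
        IReadOnlyList<object> metadata,
        IReadOnlyList<Delegate> handlers) =>
        new() { Key = key, Metadata = metadata, Handlers = handlers };
}
```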
95-104: Proper conditional offset management. The commit logic correctly respects the auto-commit configuration, only manually committing when auto-commit is disabled. The use of StoreOffset followed by Commit is the correct pattern for manual offset management.
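Spelled out against the Confluent.Kafka API, the pattern is roughly (a sketch; the surrounding consume loop is elided):

```csharp
using Confluent.Kafka;

// After a message is fully handled: store its offset, and commit explicitly
// only when auto-commit is disabled.
static void MarkProcessed(
    IConsumer<byte[], byte[]> consumer,
    ConsumeResult<byte[], byte[]> result,
    bool autoCommitEnabled)
{
    consumer.StoreOffset(result);   // record progress in the offset store
    if (!autoCommitEnabled)
    {
        consumer.Commit();          // commit offsets for the current assignment
    }
}
```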
src/MinimalKafka/KafkaConsumerConfigExtensions.cs (2)

114-129: Well-structured internal configuration update methods. The UpdateConsumerConfig and UpdateProducerConfig methods properly ensure a ConfigMetadata instance exists before applying updates. Good defensive programming practice.
Also applies to: 130-145
217-238: Appropriate JSON serializer configuration with sensible defaults. The WithJsonSerializers method properly configures JSON serialization with Web defaults and allows customization through the options parameter. Good design choice for flexibility.
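The underlying pattern is simple; a minimal sketch (the real extension's signature may differ):

```csharp
using System;
using System.Text.Json;

// Start from Web defaults (camelCase property names, case-insensitive
// matching) and let the caller customize further.
static JsonSerializerOptions BuildJsonOptions(Action<JsonSerializerOptions>? configure = null)
{
    var options = new JsonSerializerOptions(JsonSerializerDefaults.Web);
    configure?.Invoke(options);
    return options;
}
```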
nameof(Create) => Create(command),
nameof(Increment) => state.Increment(),
nameof(Decrement) => state.Decrement(),
nameof(SetCounter) => state.SetCounter(command.SetCounter!),
Add null check for SetCounter command
The null-forgiving operator ! is used on command.SetCounter, but the SetCounter property is nullable. This could cause a NullReferenceException if the property is null when the SetCounter command is processed.
- nameof(SetCounter) => state.SetCounter(command.SetCounter!),
+ nameof(SetCounter) => command.SetCounter is not null
+ ? state.SetCounter(command.SetCounter)
+ : Result.Failed(state, "SetCounter command payload is required"),

Committable suggestion skipped: line range outside the PR's diff.
🤖 Prompt for AI Agents
In examples/Examples/Aggregate/TestAggregate.cs at line 32, the code uses the
null-forgiving operator on the nullable property command.SetCounter, which risks
a NullReferenceException if it is null. Modify the code to add an explicit null
check for command.SetCounter before calling state.SetCounter, and handle the
null case appropriately to avoid exceptions.
var consumeResult = new ConsumeResult<byte[], byte[]>
{
    Message = new Message<byte[], byte[]>
    {
        Key = key,
        Value = value,
        Headers = headers
    }
};

var config = KafkaConsumerConfig.Create(KafkaConsumerKey.Random("topic"), [], []);
var context = KafkaContext.Create(config, new Message<byte[], byte[]>(), serviceProvider);
Test logic flaw: prepared ConsumeResult is not used.
The test creates a ConsumeResult with serialized JSON data (lines 174-182) but then creates a KafkaContext with an empty message (line 185), making the test setup ineffective.
Use the prepared ConsumeResult data in the context creation:
- var config = KafkaConsumerConfig.Create(KafkaConsumerKey.Random("topic"), [], []);
- var context = KafkaContext.Create(config, new Message<byte[], byte[]>(), serviceProvider);
+ var config = KafkaConsumerConfig.Create(KafkaConsumerKey.Random("topic"), [], []);
+ var context = KafkaContext.Create(config, consumeResult.Message, serviceProvider);

📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
var consumeResult = new ConsumeResult<byte[], byte[]>
{
    Message = new Message<byte[], byte[]>
    {
        Key = key,
        Value = value,
        Headers = headers
    }
};

var config = KafkaConsumerConfig.Create(KafkaConsumerKey.Random("topic"), [], []);
var context = KafkaContext.Create(config, consumeResult.Message, serviceProvider);
🤖 Prompt for AI Agents
In test/MinimalKafka.Tests/DelegateFactoryTests.cs around lines 174 to 185, the
test creates a ConsumeResult with specific message data but then initializes
KafkaContext with an empty message, ignoring the prepared ConsumeResult. To fix
this, pass the Message property from the prepared ConsumeResult instance to
KafkaContext.Create instead of creating a new empty message, ensuring the test
uses the intended data setup.
Actionable comments posted: 0
♻️ Duplicate comments (3)
src/MinimalKafka/KafkaContext.cs (2)
9-11: Fix malformed XML documentation comment. The XML comment has an extra forward slash, making it malformed documentation.

Apply this diff to fix the documentation:

 /// <summary>
-///
+/// Represents a Kafka message context containing message data and metadata.
 /// </summary>
76-80: Consolidate and document EmptyServiceProvider null-object fallback. This is a duplicate class definition that was previously identified in the past review comments. The EmptyServiceProvider class appears in multiple locations and should be consolidated.

Please consolidate this into a single internal sealed class EmptyServiceProvider in a shared namespace and add proper documentation explaining its purpose as a null-object pattern fallback.
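A consolidated version could be as small as this sketch:

```csharp
using System;

/// <summary>
/// Null-object fallback used where no real service provider is available;
/// every resolution returns null so optional services degrade gracefully.
/// </summary>
internal sealed class EmptyServiceProvider : IServiceProvider
{
    public static EmptyServiceProvider Instance { get; } = new();

    private EmptyServiceProvider() { }

    public object? GetService(Type serviceType) => null;
}
```

Exposing a shared Instance also avoids allocating a new fallback per context.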
src/MinimalKafka/Builders/KafkaDataSource.cs (1)

99-110: Add null checks to prevent runtime exceptions. Using First() without verification could throw InvalidOperationException if no IConfigMetadata exists in the metadata collection.

 internal static class MetadataExtensions
 {
     public static string GetClientId(this IKafkaBuilder builder)
     {
-        return builder.MetaData.OfType<IConfigMetadata>().First().ConsumerConfig.ClientId;
+        var configMetadata = builder.MetaData.OfType<IConfigMetadata>().FirstOrDefault()
+            ?? throw new InvalidOperationException("No IConfigMetadata found in builder metadata.");
+        return configMetadata.ConsumerConfig.ClientId;
     }

     public static string GetGroupId(this IKafkaBuilder builder)
     {
-        return builder.MetaData.OfType<IConfigMetadata>().First().ConsumerConfig.GroupId;
+        var configMetadata = builder.MetaData.OfType<IConfigMetadata>().FirstOrDefault()
+            ?? throw new InvalidOperationException("No IConfigMetadata found in builder metadata.");
+        return configMetadata.ConsumerConfig.GroupId;
     }
 }
🧹 Nitpick comments (3)
src/MinimalKafka/KafkaContext.cs (3)
15-15: Consider making the message field readonly for immutability. On inspection, the _message field is already declared readonly, so it is correct as implemented and no change is needed.
31-34: Fix typo in XML documentation comment. There's a typo in the XML documentation comment.

 /// <summary>
-/// Thhe service provider.
+/// The service provider.
 /// </summary>
53-54: Consider caching the Headers dictionary for performance. The Headers property creates a new dictionary on every access, which could impact performance if called frequently. Consider caching the result or using a lazy initialization pattern.

+ private IReadOnlyDictionary<string, string>? _cachedHeaders;
+
 /// <summary>
 /// The kafka message headers.
 /// </summary>
-public IReadOnlyDictionary<string, string> Headers => _message.Headers
-    .ToDictionary(x => x.Key, y => Encoding.UTF8.GetString(y.GetValueBytes()));
+public IReadOnlyDictionary<string, string> Headers => _cachedHeaders ??= _message.Headers
+    .ToDictionary(x => x.Key, y => Encoding.UTF8.GetString(y.GetValueBytes()));
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (13)
- examples/Examples/Aggregate/AggregateExtensions.cs (1 hunks)
- examples/Examples/Aggregate/IAggregate.cs (1 hunks)
- examples/Examples/Aggregate/TestAggregate.cs (1 hunks)
- src/MinimalKafka/Builders/KafkaDataSource.cs (1 hunks)
- src/MinimalKafka/IKafkaConsumerBuilder.cs (1 hunks)
- src/MinimalKafka/IKafkaDataSource.cs (2 hunks)
- src/MinimalKafka/Internals/KafkaService.cs (2 hunks)
- src/MinimalKafka/KafkaContext.cs (1 hunks)
- src/MinimalKafka/KafkaMetadataExtensions.cs (1 hunks)
- src/MinimalKafka/Metadata/IConfigMetadata.cs (1 hunks)
- src/MinimalKafka/Metadata/Internals/TopicFormatterMetadataAttribute.cs (0 hunks)
- src/MinimalKafka/Serializers/ISerializerFactory.cs (1 hunks)
- test/MinimalKafka.Tests/KafkaServiceTests.cs (2 hunks)
💤 Files with no reviewable changes (1)
- src/MinimalKafka/Metadata/Internals/TopicFormatterMetadataAttribute.cs
🚧 Files skipped from review as they are similar to previous changes (9)
- test/MinimalKafka.Tests/KafkaServiceTests.cs
- src/MinimalKafka/Internals/KafkaService.cs
- src/MinimalKafka/Serializers/ISerializerFactory.cs
- examples/Examples/Aggregate/IAggregate.cs
- src/MinimalKafka/IKafkaConsumerBuilder.cs
- src/MinimalKafka/IKafkaDataSource.cs
- src/MinimalKafka/KafkaMetadataExtensions.cs
- examples/Examples/Aggregate/TestAggregate.cs
- examples/Examples/Aggregate/AggregateExtensions.cs
🧰 Additional context used
🧬 Code Graph Analysis (1)
src/MinimalKafka/Metadata/IConfigMetadata.cs (1)
src/MinimalKafka/KafkaMetadataExtensions.cs (2)
ConsumerConfig(17-19), ProducerConfig(26-28)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (1)
- GitHub Check: Analyze
🔇 Additional comments (3)
src/MinimalKafka/KafkaContext.cs (2)
17-23: Constructor implementation looks correct. The constructor properly initializes all required fields and follows good practices by making the constructor private and providing a static factory method.
59-65: Verify thread-safety requirements for _messages. I wasn't able to find any evidence that KafkaContext instances are shared or invoked concurrently by multiple threads. Please confirm whether a single KafkaContext can be accessed in parallel (e.g., via DI in a multithreaded consumer or producer loop). If it is, you should make the backing collection thread-safe, either by switching to ConcurrentQueue&lt;KafkaMessage&gt; or ConcurrentBag&lt;KafkaMessage&gt;, or by guarding _messages.Add(message) and AsReadOnly() with a lock; a sketch follows after this list.

• Please verify usage patterns of KafkaContext across your codebase.
• If concurrent use is required, update lines 59-65 in src/MinimalKafka/KafkaContext.cs accordingly.
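If parallel access does turn out to be possible, a minimal thread-safe variant might look like this sketch, where KafkaMessage stands in for the actual message type:

```csharp
using System.Collections.Concurrent;
using System.Collections.Generic;

internal sealed class MessageBuffer
{
    // Lock-free queue replaces the plain List<T>.
    private readonly ConcurrentQueue<KafkaMessage> _messages = new();

    public void Add(KafkaMessage message) => _messages.Enqueue(message);

    // ToArray takes a consistent snapshot even while writers are active.
    public IReadOnlyCollection<KafkaMessage> Snapshot() => _messages.ToArray();
}
```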
src/MinimalKafka/Metadata/IConfigMetadata.cs (1)

1-21: LGTM! Clean interface design. The interface provides a clear contract for accessing Kafka configuration metadata, with appropriate XML documentation and read-only properties.
Actionable comments posted: 1
🧹 Nitpick comments (2)
src/MinimalKafka/Metadata/Internals/ConfigMetadataAttribute.cs (2)
8-10: Consider making properties init-only to prevent post-construction modifications. While the current implementation uses private setters, making the properties init-only would better express the immutable intent and align with modern C# patterns.

- public ConsumerConfig ConsumerConfig { get; private set; } = new ConsumerConfig(config);
- public ProducerConfig ProducerConfig { get; private set; } = new ProducerConfig(config);
+ public ConsumerConfig ConsumerConfig { get; init; } = new ConsumerConfig(config);
+ public ProducerConfig ProducerConfig { get; init; } = new ProducerConfig(config);
12-16: Complete the XML documentation. The XML documentation summary is empty and the parameter description is missing.

 /// <summary>
-///
+/// Creates a ConfigMetadataAttribute instance from the provided configuration.
 /// </summary>
-/// <param name="configuration"></param>
-/// <returns></returns>
+/// <param name="configuration">The configuration object to extract settings from.</param>
+/// <returns>A new ConfigMetadataAttribute instance with the extracted configuration values.</returns>
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (3)
- src/MinimalKafka/KafkaExtensions.cs (2 hunks)
- src/MinimalKafka/KafkaMetadataExtensions.cs (1 hunks)
- src/MinimalKafka/Metadata/Internals/ConfigMetadataAttribute.cs (1 hunks)
🚧 Files skipped from review as they are similar to previous changes (2)
- src/MinimalKafka/KafkaMetadataExtensions.cs
- src/MinimalKafka/KafkaExtensions.cs
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (1)
- GitHub Check: Analyze
🔇 Additional comments (2)
src/MinimalKafka/Metadata/Internals/ConfigMetadataAttribute.cs (2)
6-7: LGTM: Proper attribute configuration. The AttributeUsage configuration correctly restricts the attribute to methods only and prevents inheritance and multiple instances, which aligns with its intended use as configuration metadata.
17-24: Configuration enumeration is correctly scoped to the Kafka section. AsEnumerable(true) operates on the passed IConfiguration instance, which in our KafkaConsumerConfigExtensions is always GetSection("kafka"), so it only collects keys under that section (including nested Kafka settings) and cannot pull in unrelated app settings. No changes required.
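A small self-contained demonstration of that scoping (illustrative; not code from this PR):

```csharp
using System;
using System.Collections.Generic;
using Microsoft.Extensions.Configuration;

var config = new ConfigurationBuilder()
    .AddInMemoryCollection(new Dictionary<string, string?>
    {
        ["kafka:GroupId"] = "my-group",
        ["kafka:BootstrapServers"] = "localhost:9092",
        ["unrelated:Setting"] = "never enumerated below",
    })
    .Build();

// AsEnumerable(makePathsRelative: true) only walks the section it is called
// on, so the unrelated key never appears here.
foreach (var pair in config.GetSection("kafka").AsEnumerable(makePathsRelative: true))
{
    Console.WriteLine($"{pair.Key}={pair.Value}");  // GroupId=..., BootstrapServers=...
}
```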
var value = configuration.AsEnumerable(true)
    .Select(x => new KeyValuePair<string, string>(x.Key, x.Value ?? string.Empty))
    .ToDictionary();
🛠️ Refactor suggestion
Handle potential null values more defensively.
The current implementation uses x.Value ?? string.Empty but doesn't handle the case where x.Key might be null, which could cause issues downstream.
var value = configuration.AsEnumerable(true)
- .Select(x => new KeyValuePair<string, string>(x.Key, x.Value ?? string.Empty))
+ .Where(x => !string.IsNullOrEmpty(x.Key))
+ .Select(x => new KeyValuePair<string, string>(x.Key, x.Value ?? string.Empty))
  .ToDictionary();

📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
var value = configuration.AsEnumerable(true)
    .Where(x => !string.IsNullOrEmpty(x.Key))
    .Select(x => new KeyValuePair<string, string>(x.Key, x.Value ?? string.Empty))
    .ToDictionary();
🤖 Prompt for AI Agents
In src/MinimalKafka/Metadata/Internals/ConfigMetadataAttribute.cs around lines
19 to 21, the code currently handles null values for x.Value but does not check
if x.Key is null, which can cause exceptions. Update the code to filter out or
handle entries where x.Key is null before creating the dictionary to prevent
potential runtime errors.
Introduces an `IAggregate` interface and related extensions to support aggregate command processing. This commit adds a new `MinimalKafka.Aggregates` project, which includes:
- Definitions for `IAggregate`, `IAggregateCommands`, and `Result`
- Extension methods for mapping aggregate command streams to aggregate state streams.
- Command handling, aggregate initialization, version checking, state updates, and error publishing.
The example project is also updated to demonstrate the usage of the new aggregate feature with a simple counter aggregate.
Actionable comments posted: 1
🧹 Nitpick comments (2)
src/MinimalKafka.Aggregates/CommandResult.cs (1)
3-13: Consider adding null validation in the factory method. The Create method doesn't validate input parameters. Consider adding null checks to prevent potential NullReferenceException.

 public static CommandResult<T, TCmd> Create<T, TCmd>(Result<T> result, TCmd command)
-    => new()
+{
+    ArgumentNullException.ThrowIfNull(result);
+    ArgumentNullException.ThrowIfNull(command);
+    return new()
     {
         Command = command,
         State = result.State,
         IsSuccess = result.IsSuccess,
         ErrorMessage = result.ErrorMessage,
     };
+}

src/MinimalKafka.Aggregates/AggregateExtensions.cs (1)
8-11: Empty XML documentation comment. The summary XML documentation is empty. Either provide meaningful documentation or remove the empty tag.

 /// <summary>
-///
+/// Provides extension methods for mapping aggregate command streams to aggregate state streams.
 /// </summary>
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (12)
- MinimalKafka.sln (3 hunks)
- examples/Examples/Aggregate/Test.cs (1 hunks)
- examples/Examples/Aggregate/TestCommands.cs (1 hunks)
- examples/Examples/Examples.csproj (1 hunks)
- examples/Examples/Program.cs (2 hunks)
- src/MinimalKafka.Aggregates/AggregateExtensions.cs (1 hunks)
- src/MinimalKafka.Aggregates/CommandResult.cs (1 hunks)
- src/MinimalKafka.Aggregates/IAggregate.cs (1 hunks)
- src/MinimalKafka.Aggregates/IAggregateCommands.cs (1 hunks)
- src/MinimalKafka.Aggregates/MinimalKafka.Aggregates.csproj (1 hunks)
- src/MinimalKafka.Aggregates/Result.cs (1 hunks)
- src/MinimalKafka.Aggregates/packages.lock.json (1 hunks)
✅ Files skipped from review due to trivial changes (5)
- src/MinimalKafka.Aggregates/MinimalKafka.Aggregates.csproj
- MinimalKafka.sln
- examples/Examples/Aggregate/TestCommands.cs
- src/MinimalKafka.Aggregates/packages.lock.json
- src/MinimalKafka.Aggregates/Result.cs
🚧 Files skipped from review as they are similar to previous changes (2)
- examples/Examples/Examples.csproj
- examples/Examples/Program.cs
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (1)
- GitHub Check: Analyze
🔇 Additional comments (6)
src/MinimalKafka.Aggregates/IAggregateCommands.cs (1)
1-27: LGTM! Well-designed interface for aggregate commands. The interface is properly designed with:

- Appropriate use of covariance (out TKey) since the type parameter only appears in output positions
- Read-only properties ensuring immutability
- Comprehensive XML documentation
src/MinimalKafka.Aggregates/CommandResult.cs (1)
15-19: Good use of required init property. The required modifier ensures the Command property is always initialized, preventing potential null reference issues.

src/MinimalKafka.Aggregates/IAggregate.cs (1)
32-42: Note: Abstract static methods require C# 11 or later. The use of abstract static methods is a modern C# feature that provides a clean aggregate factory pattern implementation. Ensure the project targets C# 11 or later.
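A compact illustration of the feature in this context, with hypothetical names rather than the PR's actual interface:

```csharp
using System;

// C# 11 static abstract members let an interface demand a factory.
public interface IAggregate<TSelf, TKey> where TSelf : IAggregate<TSelf, TKey>
{
    TKey Id { get; }
    long Version { get; }

    static abstract TSelf Create(TKey id);
}

public sealed record Counter(Guid Id, long Version, int Value)
    : IAggregate<Counter, Guid>
{
    // Each aggregate supplies its own creation logic, callable generically
    // as TAggregate.Create(id) inside constrained generic code.
    public static Counter Create(Guid id) => new(id, Version: 0, Value: 0);
}
```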
examples/Examples/Aggregate/Test.cs (1)

58-104: Well-implemented counter operations with proper validation. The increment, decrement, and set operations properly validate bounds and return clear error messages. Good use of immutable updates with the record with syntax.

src/MinimalKafka.Aggregates/AggregateExtensions.cs (2)
78-102: Excellent error handling and version control implementation. The code properly handles:
- Version conflicts with clear error messages
- Command application failures
- Successful state updates
- Routing errors to a dedicated error topic for monitoring and debugging
65-104: Ensure exactly-once state initialization with Kafka transactions. The current logic at line 76 may allow multiple consumers to concurrently initialize and produce the same new aggregate state, leading to duplicate "create" messages.

• The WithTransactionalId extension is present in KafkaExtensions.cs (line 33) and KafkaConsumerConfigExtensions.cs, but merely configuring a transactional ID doesn't automatically wrap your produce calls in a transaction.
• To guarantee exactly-once processing you should:
- Begin a transaction before handling the command (producer.BeginTransaction()).
- Produce state updates and any error messages within that transaction.
- Send consumer offsets to the transaction (producer.SendOffsetsToTransaction(...)).
- Commit the transaction (producer.CommitTransaction()).
- Alternatively, enable idempotent producers (EnableIdempotence = true) to avoid duplicate messages.

Please verify that your consumer/producer pipeline in this aggregate mapping is actually using transactions or idempotence under the hood to prevent duplicate state initialization. A sketch of the transactional flow follows this list.
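A sketch of that flow against the Confluent.Kafka API; it assumes the producer was created with a TransactionalId and InitTransactions was called once at startup, and it trims error handling for brevity:

```csharp
using System;
using Confluent.Kafka;

static void ProcessExactlyOnce(
    IProducer<byte[], byte[]> producer,
    IConsumer<byte[], byte[]> consumer,
    ConsumeResult<byte[], byte[]> result,
    Message<byte[], byte[]> stateUpdate,
    string stateTopic)
{
    producer.BeginTransaction();
    try
    {
        // State updates (and any error messages) ride inside the transaction.
        producer.Produce(stateTopic, stateUpdate);

        // Tie the consumed offset to the same transaction: the input is only
        // marked consumed if the output commits.
        producer.SendOffsetsToTransaction(
            new[] { new TopicPartitionOffset(result.TopicPartition, result.Offset + 1) },
            consumer.ConsumerGroupMetadata,
            TimeSpan.FromSeconds(30));

        producer.CommitTransaction();
    }
    catch
    {
        producer.AbortTransaction();
        throw;
    }
}
```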
Prevents exceptions when the SetCounter command data is null. Returns a failed result with an appropriate message in this case. Improves ConfigMetadataAttribute to use init properties and adds documentation.
To allow consistent data between handlers of the same topic, or between multiple different joins with topics, the data needs to be consistent.
Summary by CodeRabbit
New Features
Improvements
Bug Fixes
Refactor
Chores