diff --git a/src/MinimalKafka/Extension/KafkaConsumerConfigMetadataExtensions.cs b/src/MinimalKafka/Extension/KafkaConsumerConfigMetadataExtensions.cs
index 3616f14..490b172 100644
--- a/src/MinimalKafka/Extension/KafkaConsumerConfigMetadataExtensions.cs
+++ b/src/MinimalKafka/Extension/KafkaConsumerConfigMetadataExtensions.cs
@@ -62,4 +62,10 @@ public static TBuilder WithClientIdFormatter(this TBuilder builder, Fu
         return builder;
     }
 
+    public static TBuilder WithAutoCommit<TBuilder>(this TBuilder builder, bool enabled = true)
+        where TBuilder : IKafkaConventionBuilder
+    {
+        builder.WithSingle(new AutoCommitMetaDataAttribute(enabled));
+        return builder;
+    }
 }
diff --git a/src/MinimalKafka/Helpers/Logging.cs b/src/MinimalKafka/Helpers/Logging.cs
index c297b1b..6db2c04 100644
--- a/src/MinimalKafka/Helpers/Logging.cs
+++ b/src/MinimalKafka/Helpers/Logging.cs
@@ -14,8 +14,8 @@ internal static partial class Logging
 
     [LoggerMessage(
         EventId = 1,
-        Level = LogLevel.Critical,
-        Message = "Consumer with GroupId: '{GroupId}' and ClientId: '{ClientId}', Operation cancelled return Empty Context."
+        Level = LogLevel.Information,
+        Message = "Consumer with GroupId: '{GroupId}' and ClientId: '{ClientId}' was closed, Operation cancelled."
     )]
     public static partial void OperatonCanceled(this ILogger logger, string groupId, string clientId);
 
@@ -67,4 +67,11 @@ internal static partial class Logging
         Message = "Consumer with GroupId: '{GroupId}' and ClientId: '{ClientId}' already closed."
     )]
     public static partial void ConsumerAlreadyClosed(this ILogger logger, string groupId, string clientId);
+
+    [LoggerMessage(
+        EventId = 9,
+        Level = LogLevel.Information,
+        Message = "Consumer with GroupId: '{GroupId}' and ClientId: '{ClientId}' committing offset."
+    )]
+    public static partial void Committing(this ILogger logger, string groupId, string clientId);
 }
diff --git a/src/MinimalKafka/KafkaConsumer.cs b/src/MinimalKafka/KafkaConsumer.cs
index 4176c05..19b74b2 100644
--- a/src/MinimalKafka/KafkaConsumer.cs
+++ b/src/MinimalKafka/KafkaConsumer.cs
@@ -10,7 +10,7 @@ public abstract class KafkaConsumer
 {
     public abstract ILogger Logger { get; }
     public abstract void Subscribe();
-    public abstract KafkaContext Consume(CancellationToken cancellationToken);
+    public abstract Task Consume(KafkaDelegate kafkaDelegate, CancellationToken cancellationToken);
 
     public abstract void Close();
 
@@ -32,9 +32,9 @@ public override void Close()
     {
     }
 
-    public override KafkaContext Consume(CancellationToken cancellationToken)
+    public override Task Consume(KafkaDelegate kafkaDelegate, CancellationToken cancellationToken)
     {
-        return KafkaContext.Empty;
+        return Task.CompletedTask;
     }
 
     public override void Subscribe()
@@ -60,11 +60,11 @@ public class KafkaConsumer(KafkaConsumerOptions options) : KafkaCo
 
     public override ILogger Logger => options.KafkaLogger;
 
-    public override KafkaContext Consume(CancellationToken cancellationToken)
+    public override async Task Consume(KafkaDelegate kafkaDelegate, CancellationToken cancellationToken)
     {
         try
         {
-            var scope = _serviceProvider.CreateScope();
+            using var scope = _serviceProvider.CreateScope();
             var result = _consumer.Consume(cancellationToken);
 
             if (++_recordsConsumed % _consumeReportInterval == 0)
@@ -72,16 +72,29 @@ public override KafkaContext Consume(CancellationToken cancellationToken)
                 Logger.RecordsConsumed(options.Metadata.GroupId(), options.Metadata.ClientId(), _recordsConsumed, result.Topic);
             }
 
-            return KafkaContext.Create(result, scope.ServiceProvider, options.Metadata);
+            var context = KafkaContext.Create(result, scope.ServiceProvider, options.Metadata);
+
+            if (context is EmptyKafkaContext)
+            {
+                return;
+            }
+
+            await kafkaDelegate.Invoke(context);
+
+            if (options.Metadata.IsAutoCommitEnabled())
+            {
+                return;
+            }
+
+            Logger.Committing(options.Metadata.GroupId(), options.Metadata.ClientId());
+
+            _consumer.StoreOffset(result);
+            _consumer.Commit();
         }
         catch (OperationCanceledException ex) when (ex.CancellationToken == cancellationToken)
         {
             Logger.OperatonCanceled(options.Metadata.GroupId(), options.Metadata.ClientId());
-
-            _consumer.Close();
-            _consumer.Dispose();
-            return KafkaContext.Empty;
         }
     }
 
@@ -123,6 +136,9 @@ public static string GroupId(this IReadOnlyList metadata)
 
     private static T? GetMetaData<T>(this IReadOnlyList<object> metaData)
         => metaData.OfType<T>().FirstOrDefault();
+
+    public static bool IsAutoCommitEnabled(this IReadOnlyList<object> metaData)
+        => metaData.GetMetaData<IAutoCommitMetaData>()?.Enabled ?? false;
 }
diff --git a/src/MinimalKafka/KafkaExtensions.cs b/src/MinimalKafka/KafkaExtensions.cs
index 6c7be0a..b1cdbb0 100644
--- a/src/MinimalKafka/KafkaExtensions.cs
+++ b/src/MinimalKafka/KafkaExtensions.cs
@@ -50,6 +50,7 @@ public static IServiceCollection AddMinimalKafka(this IServiceCollection service
 
         configBuilder.WithClientId(AppDomain.CurrentDomain.FriendlyName);
         configBuilder.WithGroupId(AppDomain.CurrentDomain.FriendlyName);
+        configBuilder.WithAutoCommit(false);
         configBuilder.WithKeyDeserializer(typeof(JsonTextSerializer<>));
         configBuilder.WithValueDeserializer(typeof(JsonTextSerializer<>));
         configBuilder.WithTopicFormatter(topic => topic);
diff --git a/src/MinimalKafka/KafkaProcess.cs b/src/MinimalKafka/KafkaProcess.cs
index 6a50022..1ebf7b1 100644
--- a/src/MinimalKafka/KafkaProcess.cs
+++ b/src/MinimalKafka/KafkaProcess.cs
@@ -37,17 +37,7 @@ public async Task Start(CancellationToken cancellationToken)
         {
             while (!cancellationToken.IsCancellationRequested)
             {
-                var context = _consumer.Consume(cancellationToken);
-
-                if (context is EmptyKafkaContext)
-                {
-
-                    _consumer.Logger.EmptyContext();
-
-                    continue;
-                }
-
-                await _handler.Invoke(context);
+                await _consumer.Consume(_handler, cancellationToken);
             }
         }
         catch(Exception ex)
@@ -58,9 +48,8 @@ public async Task Start(CancellationToken cancellationToken)
         finally
         {
             _consumer.Logger.DropOutOfConsumeLoop();
-            _consumer.Close();
         }
-
+
     }
 
     public async Task Stop()
diff --git a/src/MinimalKafka/KafkaService.cs b/src/MinimalKafka/KafkaService.cs
index 08ad32b..6edbb2c 100644
--- a/src/MinimalKafka/KafkaService.cs
+++ b/src/MinimalKafka/KafkaService.cs
@@ -9,15 +9,28 @@ public IEnumerable Processes
 
     private readonly List<Task> _runningTasks = [];
 
-
     protected override async Task ExecuteAsync(CancellationToken stoppingToken)
     {
+        using var cts = CancellationTokenSource.CreateLinkedTokenSource(stoppingToken);
         foreach (var process in Processes)
-        {
-            var task = Task.Run(() => process.Start(stoppingToken), stoppingToken);
+        {
+            var task = Task.Run(async () =>
+            {
+                try
+                {
+                    await process.Start(cts.Token);
+                }
+                catch (KafkaProcesException)
+                {
+                    await cts.CancelAsync();
+                    throw;
+                }
+
+            }
+            , cts.Token);
             _runningTasks.Add(task);
-        }
-
+        }
+
         await Task.WhenAll(_runningTasks);
     }
diff --git a/src/MinimalKafka/Metadata/AutoCommitMetaData.cs b/src/MinimalKafka/Metadata/AutoCommitMetaData.cs
new file mode 100644
index 0000000..04321b8
--- /dev/null
+++ b/src/MinimalKafka/Metadata/AutoCommitMetaData.cs
@@ -0,0 +1,19 @@
+using Confluent.Kafka;
+
+namespace MinimalKafka.Metadata;
+public interface IAutoCommitMetaData : IConsumerConfigMetadata
+{
+    bool Enabled { get; }
+}
+
+[AttributeUsage(AttributeTargets.Method, Inherited = false, AllowMultiple = false)]
+internal class AutoCommitMetaDataAttribute(bool enabled) : Attribute, IAutoCommitMetaData
+{
+    public bool Enabled { get; } = enabled;
+
+    public void Set(ClientConfig config)
+    {
+        config.Set("enable.auto.commit", Enabled ? "true" : "false");
+        config.Set("enable.auto.offset.store", Enabled ? "true" : "false");
+    }
+}
diff --git a/test/MinimalKafka.Tests/KafkaProcessTests.cs b/test/MinimalKafka.Tests/KafkaProcessTests.cs
index 055f7ec..50a26ac 100644
--- a/test/MinimalKafka.Tests/KafkaProcessTests.cs
+++ b/test/MinimalKafka.Tests/KafkaProcessTests.cs
@@ -50,18 +50,8 @@ public async Task KafkaProcess_Start_ShouldInvokeSubscribeMethodOnce()
     public async Task KafkaProcess_Start_ShouldInvokeHandlerWithValidContext()
     {
         // Arrange
-        var validContext = new TestKafkaContext();
-        var delay = false;
-        _consumer.Consume(Arg.Any<CancellationToken>()).Returns(x =>
-        {
-            if (delay)
-            {
-                Task.Delay(200).GetAwaiter().GetResult();
-                return KafkaContext.Empty;
-            }
-            delay = true;
-            return validContext;
-        }) ;
+        _consumer.Consume(Arg.Any<KafkaDelegate>(), Arg.Any<CancellationToken>())
+            .Returns(callInfo => callInfo.Arg<KafkaDelegate>().Invoke(new TestKafkaContext()));
 
         var task = Task.Run(() => _kafkaProcess.Start(_cancellationTokenSource.Token));
 
@@ -70,59 +60,20 @@ public async Task KafkaProcess_Start_ShouldInvokeHandlerWithValidContext()
         await Task.Delay(100);
 
         // Assert
-        await _handler.Received(1).Invoke(validContext);
+        await _handler.ReceivedWithAnyArgs().Invoke(Arg.Any<KafkaContext>());
     }
 
     [Fact]
-    public async Task KafkaProcess_Start_ShouldLogErrorOnEmptyContext()
+    public async Task KafkaProcess_Start_ShouldLogErrorOnException()
    {
         // Arrange
-        var delay = false;
-
         var logger = Substitute.For<ILogger>();
         _consumer.Logger.Returns(logger);
-        _consumer.Consume(Arg.Any<CancellationToken>()).Returns(x =>
-        {
-            if (delay)
-            {
-                Task.Delay(200).GetAwaiter().GetResult();
-                return KafkaContext.Empty;
-            }
-            delay = true;
-            return KafkaContext.Empty;
-        });
-        var process = KafkaProcess.Create(new()
-        {
-            Consumer = _consumer,
-            Delegate = (c) =>
-            {
-                return Task.CompletedTask;
-            }
-        });
-
-        var task = Task.Run(() => process.Start(_cancellationTokenSource.Token));
-
-        // Act
-        _cancellationTokenSource.CancelAfter(100); // Stop the task after a short delay
-        await Task.Delay(100);
-
-        // Assert
-        logger.Received(1).EmptyContext();
-        await _handler.DidNotReceive().Invoke(Arg.Any<KafkaContext>());
-    }
-
-    [Fact]
-    public async Task KafkaProcess_Start_ShouldLogErrorOnException()
-    {
-        // Arrange
-        var logger = Substitute.For<ILogger>();
-
-        _consumer.Logger.Returns(logger);
-        _consumer.Consume(Arg.Any<CancellationToken>()).Returns(x =>
+        _consumer.Consume(Arg.Any<KafkaDelegate>(), Arg.Any<CancellationToken>()).Returns(x =>
         {
             throw new NotImplementedException();
         });
@@ -130,10 +81,7 @@ public async Task KafkaProcess_Start_ShouldLogErrorOnException()
         var process = KafkaProcess.Create(new()
         {
             Consumer = _consumer,
-            Delegate = (c) =>
-            {
-                throw new NotImplementedException();
-            }
+            Delegate = (c) => Task.CompletedTask
         });
 
         // Act
@@ -144,7 +92,6 @@ public async Task KafkaProcess_Start_ShouldLogErrorOnException()
 
         // Assert
         logger.Received(1).UnknownProcessException(new NotImplementedException().Message);
-        await _handler.DidNotReceive().Invoke(Arg.Any<KafkaContext>());
     }
 
     [Fact]
diff --git a/test/MinimalKafka.Tests/RegistrationTests.cs b/test/MinimalKafka.Tests/RegistrationTests.cs
index 9496296..3fbc07e 100644
--- a/test/MinimalKafka.Tests/RegistrationTests.cs
+++ b/test/MinimalKafka.Tests/RegistrationTests.cs
@@ -100,7 +100,7 @@ static void config(IAddKafkaBuilder builder) =>
     }
 
     [Fact]
-    public void AddMinimalKafka_Should_Set_ClientId_And_GroupId_To_Default()
+    public void AddMinimalKafka_Should_Set_Default_Config()
     {
         var services = new ServiceCollection();
 
@@ -117,15 +117,20 @@ static void config(IAddKafkaBuilder builder) =>
             .Should()
             .ContainSingle(x => x is IClientIdMetadata)
             .And
-            .ContainSingle(x => x is IGroupIdMetadata);
+            .ContainSingle(x => x is IGroupIdMetadata)
+            .And
+            .ContainSingle(x => x is IAutoCommitMetaData);
 
         // Verify the actual values
         var clientId = kafkaBuilder.MetaData
            .OfType<IClientIdMetadata>().Single().ClientId;
         var groupId = kafkaBuilder.MetaData
            .OfType<IGroupIdMetadata>().Single().GroupId;
+        var autoCommit = kafkaBuilder.MetaData
+           .OfType<IAutoCommitMetaData>().Single().Enabled;
 
         clientId.Should().Be(AppDomain.CurrentDomain.FriendlyName);
         groupId.Should().Be(AppDomain.CurrentDomain.FriendlyName);
+        autoCommit.Should().BeFalse();
     }
 }
\ No newline at end of file
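
Usage sketch (not part of the patch above). After this change, AddMinimalKafka applies WithAutoCommit(false) by default, so the consumer stores and commits the offset itself only after the handler delegate completes, and a registration can opt back into librdkafka auto-commit with the new WithAutoCommit extension. In the sketch below, AddMinimalKafka, WithGroupId and WithAutoCommit are the APIs touched by this diff; the WebApplication host wiring, the MapTopic call and the handler signature are assumptions about the library's minimal-API surface, not part of this PR.

// Hypothetical wiring only: the host/builder setup, MapTopic and the handler
// signature are assumed; AddMinimalKafka, WithGroupId and WithAutoCommit come
// from the changes in this PR.
var builder = WebApplication.CreateBuilder(args);

builder.Services.AddMinimalKafka(kafka =>
{
    kafka.WithGroupId("orders-consumer");

    // New default from this PR: auto-commit is disabled, so MinimalKafka calls
    // StoreOffset + Commit only after the handler returns without throwing.
    // Uncomment to hand commits back to librdkafka for this registration:
    // kafka.WithAutoCommit();
});

var app = builder.Build();

// Offsets for records handled here are committed only after this delegate completes.
app.MapTopic("orders", (string key, string value) =>
{
    Console.WriteLine($"order {key}: {value}");
    return Task.CompletedTask;
});

app.Run();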