diff --git a/examples/Examples/Program.cs b/examples/Examples/Program.cs index 52f5798..126701b 100644 --- a/examples/Examples/Program.cs +++ b/examples/Examples/Program.cs @@ -19,46 +19,12 @@ var app = builder.Build(); -app.MapStream("commands") - .SplitInto(branches => - { - branches.Branch((_, v) => v.Name == "cmd1", (_, _, _) => Task.CompletedTask); - branches.Branch((_, v) => v.Name == "cmd2", (_, _, _) => Task.CompletedTask); - branches.DefaultBranch((_,_,_) => Task.CompletedTask); - }); +app.MapTopic("test", (KafkaContext context) => { + + Console.WriteLine("Test topic received message: " + context.Value); -app.MapStream("left") - .Join("right").On((l, r) => l.RightObjectId == r.Id) - .SplitInto(branches => - { - branches.Branch((_, _) => true, (_, _, _) => Task.CompletedTask); - }); + throw new Exception("Test exception"); -app.MapStream("left") - .Join("right").On((l, r) => l.RightObjectId == r.Id) - .Into(async (c, value) => - { - var (left, right) = value; - var result = new ResultObject(left.Id, right); - Console.WriteLine($"multi into - {left.Id} - {result}"); - await c.ProduceAsync("result", left.Id, new ResultObject(left.Id, right)); - }) - .WithGroupId($"multi-{Guid.NewGuid()}") - .WithClientId("multi"); - -app.MapStream("left") - .Join("right") - .OnKey() - .Into("string"); - - -app.MapStream("left") - .Into((_, k, v) => - { - Console.WriteLine($"single Into - {k} - {v}"); - return Task.CompletedTask; - }) - .WithGroupId($"single-{Guid.NewGuid()}") - .WithClientId("single"); +}); await app.RunAsync(); diff --git a/src/MinimalKafka/Helpers/Logging.cs b/src/MinimalKafka/Helpers/Logging.cs index 5402909..c297b1b 100644 --- a/src/MinimalKafka/Helpers/Logging.cs +++ b/src/MinimalKafka/Helpers/Logging.cs @@ -60,4 +60,11 @@ internal static partial class Logging Message = "Consumer returned an Exception!. {message}" )] public static partial void UnknownProcessException(this ILogger logger, string message); + + [LoggerMessage( + EventId = 8, + Level = LogLevel.Information, + Message = "Consumer with GroupId: '{GroupId}' and ClientId: '{ClientId}' already closed." + )] + public static partial void ConsumerAlreadyClosed(this ILogger logger, string groupId, string clientId); } diff --git a/src/MinimalKafka/KafkaConsumer.cs b/src/MinimalKafka/KafkaConsumer.cs index d3df4a1..4176c05 100644 --- a/src/MinimalKafka/KafkaConsumer.cs +++ b/src/MinimalKafka/KafkaConsumer.cs @@ -1,7 +1,6 @@ using Confluent.Kafka; using Microsoft.Extensions.DependencyInjection; using Microsoft.Extensions.Logging; -using Microsoft.Extensions.Options; using MinimalKafka.Helpers; using MinimalKafka.Metadata; @@ -53,6 +52,7 @@ public class KafkaConsumer(KafkaConsumerOptions options) : KafkaCo private readonly IConsumer _consumer = new KafkaConsumerBuilder(options.Metadata, options.ServiceProvider).Build(); + private bool _isClosed = false; private long _recordsConsumed; private readonly int _consumeReportInterval = options.Metadata.OfType().FirstOrDefault()?.ReportInterval @@ -87,8 +87,14 @@ public override KafkaContext Consume(CancellationToken cancellationToken) public override void Close() { + if(_isClosed) + { + Logger.ConsumerAlreadyClosed(options.Metadata.GroupId(), options.Metadata.ClientId()); + return; + } + + _isClosed = true; Logger.ConsumerClosed(options.Metadata.GroupId(), options.Metadata.ClientId()); - _consumer.Close(); _consumer.Dispose(); } diff --git a/src/MinimalKafka/KafkaDelegateFactory.cs b/src/MinimalKafka/KafkaDelegateFactory.cs index cfc397c..6a43a9a 100644 --- a/src/MinimalKafka/KafkaDelegateFactory.cs +++ b/src/MinimalKafka/KafkaDelegateFactory.cs @@ -84,7 +84,7 @@ private static MethodCallExpression CreateMethodCall(MethodInfo methodInfo, Expr factoryContext.ArgumentTypes = new Type[parameters.Length]; factoryContext.BoxedArgs = new Expression[parameters.Length]; - factoryContext.Parameters = new List(parameters); + factoryContext.Parameters = [.. parameters]; for (var i = 0; i < parameters.Length; i++) { diff --git a/src/MinimalKafka/KafkaProcess.cs b/src/MinimalKafka/KafkaProcess.cs index 2c5534a..f782c03 100644 --- a/src/MinimalKafka/KafkaProcess.cs +++ b/src/MinimalKafka/KafkaProcess.cs @@ -1,11 +1,10 @@ -using Microsoft.Extensions.Logging; -using MinimalKafka.Helpers; +using MinimalKafka.Helpers; namespace MinimalKafka; public interface IKafkaProcess { - void Start(CancellationToken cancellationToken); - void Stop(); + Task Start(CancellationToken cancellationToken); + Task Stop(); } public class KafkaProcessOptions @@ -30,7 +29,7 @@ KafkaDelegate handler public static KafkaProcess Create(KafkaProcessOptions options) => new(options.Consumer, options.Delegate); - public void Start(CancellationToken cancellationToken) + public async Task Start(CancellationToken cancellationToken) { _consumer.Subscribe(); @@ -48,7 +47,7 @@ public void Start(CancellationToken cancellationToken) continue; } - _handler.Invoke(context); + await _handler.Invoke(context); } } catch(Exception ex) @@ -64,9 +63,10 @@ public void Start(CancellationToken cancellationToken) } - public void Stop() + public async Task Stop() { _consumer.Close(); + await Task.CompletedTask; } } diff --git a/src/MinimalKafka/KafkaService.cs b/src/MinimalKafka/KafkaService.cs index 75224d7..08ad32b 100644 --- a/src/MinimalKafka/KafkaService.cs +++ b/src/MinimalKafka/KafkaService.cs @@ -21,12 +21,13 @@ protected override async Task ExecuteAsync(CancellationToken stoppingToken) await Task.WhenAll(_runningTasks); } - public override Task StopAsync(CancellationToken cancellationToken) + public override async Task StopAsync(CancellationToken cancellationToken) { foreach (var process in Processes) { - process.Stop(); + await process.Stop(); } - return base.StopAsync(cancellationToken); + + await base.StopAsync(cancellationToken); } } diff --git a/test/MinimalKafka.Tests/KafkaProcessTests.cs b/test/MinimalKafka.Tests/KafkaProcessTests.cs index d90d79b..055f7ec 100644 --- a/test/MinimalKafka.Tests/KafkaProcessTests.cs +++ b/test/MinimalKafka.Tests/KafkaProcessTests.cs @@ -138,9 +138,9 @@ public async Task KafkaProcess_Start_ShouldLogErrorOnException() // Act - var task = () => process.Start(_cancellationTokenSource.Token); + var task = async () => await process.Start(_cancellationTokenSource.Token); - task.Should().Throw(); + await task.Should().ThrowAsync(); // Assert logger.Received(1).UnknownProcessException(new NotImplementedException().Message); @@ -148,10 +148,10 @@ public async Task KafkaProcess_Start_ShouldLogErrorOnException() } [Fact] - public void KafkaProcess_Stop_ShouldInvokeCloseMethod() + public async Task KafkaProcess_Stop_ShouldInvokeCloseMethod() { // Act - _kafkaProcess.Stop(); + await _kafkaProcess.Stop(); // Assert _consumer.Received(1).Close(); diff --git a/test/MinimalKafka.Tests/KafkaServiceTests.cs b/test/MinimalKafka.Tests/KafkaServiceTests.cs index 35bedcb..876cac6 100644 --- a/test/MinimalKafka.Tests/KafkaServiceTests.cs +++ b/test/MinimalKafka.Tests/KafkaServiceTests.cs @@ -46,7 +46,7 @@ public async Task KafkaService_StartAsync_ShouldStartAllProcesses() // Assert foreach (var process in _processes) { - process.Received(1).Start(Arg.Any()); + await process.Received(1).Start(Arg.Any()); } } @@ -63,7 +63,7 @@ public async Task KafkaService_StopAsync_ShouldStopAllProcesses() // Assert foreach (var process in _processes) { - process.Received(1).Stop(); + await process.Received(1).Stop(); } } diff --git a/tools/docker-compose.yml b/tools/docker-compose.yml new file mode 100644 index 0000000..82be96f --- /dev/null +++ b/tools/docker-compose.yml @@ -0,0 +1,71 @@ +name: RedPanda +services: + redpanda: + image: docker.redpanda.com/redpandadata/redpanda:v24.2.5 + container_name: redpanda + command: + - redpanda + - start + - --kafka-addr internal://0.0.0.0:9092,external://0.0.0.0:19092 + # Address the broker advertises to clients that connect to the Kafka API. + # Use the internal addresses to connect to the Redpanda brokers' + # from inside the same Docker network. + # Use the external addresses to connect to the Redpanda brokers' + # from outside the Docker network. + - --advertise-kafka-addr internal://redpanda:9092,external://localhost:19092 + - --pandaproxy-addr internal://0.0.0.0:8082,external://0.0.0.0:18082 + # Address the broker advertises to clients that connect to the HTTP Proxy. + - --advertise-pandaproxy-addr internal://redpanda:8082,external://localhost:18082 + - --schema-registry-addr internal://0.0.0.0:8081,external://0.0.0.0:18081 + # Redpanda brokers use the RPC API to communicate with each other internally. + - --rpc-addr redpanda:33145 + - --advertise-rpc-addr redpanda:33145 + # Mode dev-container uses well-known configuration properties for development in containers. + - --mode dev-container + # Tells Seastar (the framework Redpanda uses under the hood) to use 1 core on the system. + - --smp 1 + - --default-log-level=info + volumes: + - redpanda:/var/lib/redpanda/data + networks: + - redpanda_network + ports: + - 18081:18081 + - 18082:18082 + - 19092:19092 + - 19644:9644 + console: + container_name: redpanda-console + image: docker.redpanda.com/redpandadata/console:v2.7.2 + networks: + - redpanda_network + entrypoint: /bin/sh + command: -c 'echo "$$CONSOLE_CONFIG_FILE" > /tmp/config.yml; /app/console' + environment: + CONFIG_FILEPATH: /tmp/config.yml + CONSOLE_CONFIG_FILE: | + kafka: + brokers: ["redpanda:9092"] + schemaRegistry: + enabled: true + urls: ["http://redpanda:8081"] + redpanda: + adminApi: + enabled: true + urls: ["http://redpanda:9644"] + ports: + - 8080:8080 + depends_on: + - redpanda + +networks: + redpanda_network: + driver: bridge + +volumes: + redpanda: + driver: local + #driver_opts: + #o: bind + #type: rw + #device: /redpanda_data