Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
44 changes: 5 additions & 39 deletions examples/Examples/Program.cs
Original file line number Diff line number Diff line change
Expand Up @@ -19,46 +19,12 @@

var app = builder.Build();

app.MapStream<Guid, Command>("commands")
.SplitInto(branches =>
{
branches.Branch((_, v) => v.Name == "cmd1", (_, _, _) => Task.CompletedTask);
branches.Branch((_, v) => v.Name == "cmd2", (_, _, _) => Task.CompletedTask);
branches.DefaultBranch((_,_,_) => Task.CompletedTask);
});
app.MapTopic("test", (KafkaContext context) => {

Console.WriteLine("Test topic received message: " + context.Value);

app.MapStream<Guid, LeftObject>("left")
.Join<Guid, RightObject>("right").On((l, r) => l.RightObjectId == r.Id)
.SplitInto(branches =>
{
branches.Branch((_, _) => true, (_, _, _) => Task.CompletedTask);
});
throw new Exception("Test exception");

app.MapStream<Guid, LeftObject>("left")
.Join<int, RightObject>("right").On((l, r) => l.RightObjectId == r.Id)
.Into(async (c, value) =>
{
var (left, right) = value;
var result = new ResultObject(left.Id, right);
Console.WriteLine($"multi into - {left.Id} - {result}");
await c.ProduceAsync("result", left.Id, new ResultObject(left.Id, right));
})
.WithGroupId($"multi-{Guid.NewGuid()}")
.WithClientId("multi");

app.MapStream<Guid,LeftObject>("left")
.Join<Guid, RightObject>("right")
.OnKey()
.Into("string");


app.MapStream<Guid, LeftObject>("left")
.Into((_, k, v) =>
{
Console.WriteLine($"single Into - {k} - {v}");
return Task.CompletedTask;
})
.WithGroupId($"single-{Guid.NewGuid()}")
.WithClientId("single");
});

await app.RunAsync();
7 changes: 7 additions & 0 deletions src/MinimalKafka/Helpers/Logging.cs
Original file line number Diff line number Diff line change
Expand Up @@ -60,4 +60,11 @@ internal static partial class Logging
Message = "Consumer returned an Exception!. {message}"
)]
public static partial void UnknownProcessException(this ILogger logger, string message);

[LoggerMessage(
EventId = 8,
Level = LogLevel.Information,
Message = "Consumer with GroupId: '{GroupId}' and ClientId: '{ClientId}' already closed."
)]
public static partial void ConsumerAlreadyClosed(this ILogger logger, string groupId, string clientId);
}
10 changes: 8 additions & 2 deletions src/MinimalKafka/KafkaConsumer.cs
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
using Confluent.Kafka;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;
using MinimalKafka.Helpers;
using MinimalKafka.Metadata;

Expand Down Expand Up @@ -53,6 +52,7 @@ public class KafkaConsumer<TKey, TValue>(KafkaConsumerOptions options) : KafkaCo
private readonly IConsumer<TKey, TValue> _consumer =
new KafkaConsumerBuilder<TKey, TValue>(options.Metadata, options.ServiceProvider).Build();

private bool _isClosed = false;
private long _recordsConsumed;
private readonly int _consumeReportInterval =
options.Metadata.OfType<ReportIntervalMetadata>().FirstOrDefault()?.ReportInterval
Expand Down Expand Up @@ -87,8 +87,14 @@ public override KafkaContext Consume(CancellationToken cancellationToken)

public override void Close()
{
if(_isClosed)
{
Logger.ConsumerAlreadyClosed(options.Metadata.GroupId(), options.Metadata.ClientId());
return;
}

_isClosed = true;
Logger.ConsumerClosed(options.Metadata.GroupId(), options.Metadata.ClientId());

_consumer.Close();
_consumer.Dispose();
}
Expand Down
2 changes: 1 addition & 1 deletion src/MinimalKafka/KafkaDelegateFactory.cs
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ private static MethodCallExpression CreateMethodCall(MethodInfo methodInfo, Expr

factoryContext.ArgumentTypes = new Type[parameters.Length];
factoryContext.BoxedArgs = new Expression[parameters.Length];
factoryContext.Parameters = new List<ParameterInfo>(parameters);
factoryContext.Parameters = [.. parameters];

for (var i = 0; i < parameters.Length; i++)
{
Expand Down
14 changes: 7 additions & 7 deletions src/MinimalKafka/KafkaProcess.cs
Original file line number Diff line number Diff line change
@@ -1,11 +1,10 @@
using Microsoft.Extensions.Logging;
using MinimalKafka.Helpers;
using MinimalKafka.Helpers;

namespace MinimalKafka;
public interface IKafkaProcess
{
void Start(CancellationToken cancellationToken);
void Stop();
Task Start(CancellationToken cancellationToken);
Task Stop();
}

public class KafkaProcessOptions
Expand All @@ -30,7 +29,7 @@ KafkaDelegate handler
public static KafkaProcess Create(KafkaProcessOptions options)
=> new(options.Consumer, options.Delegate);

public void Start(CancellationToken cancellationToken)
public async Task Start(CancellationToken cancellationToken)
{
_consumer.Subscribe();

Expand All @@ -48,7 +47,7 @@ public void Start(CancellationToken cancellationToken)
continue;
}

_handler.Invoke(context);
await _handler.Invoke(context);
}
}
catch(Exception ex)
Expand All @@ -64,9 +63,10 @@ public void Start(CancellationToken cancellationToken)

}

public void Stop()
public async Task Stop()
{
_consumer.Close();
await Task.CompletedTask;
}
}

Expand Down
7 changes: 4 additions & 3 deletions src/MinimalKafka/KafkaService.cs
Original file line number Diff line number Diff line change
Expand Up @@ -21,12 +21,13 @@ protected override async Task ExecuteAsync(CancellationToken stoppingToken)
await Task.WhenAll(_runningTasks);
}

public override Task StopAsync(CancellationToken cancellationToken)
public override async Task StopAsync(CancellationToken cancellationToken)
{
foreach (var process in Processes)
{
process.Stop();
await process.Stop();
}
return base.StopAsync(cancellationToken);

await base.StopAsync(cancellationToken);
}
}
8 changes: 4 additions & 4 deletions test/MinimalKafka.Tests/KafkaProcessTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -138,20 +138,20 @@ public async Task KafkaProcess_Start_ShouldLogErrorOnException()

// Act

var task = () => process.Start(_cancellationTokenSource.Token);
var task = async () => await process.Start(_cancellationTokenSource.Token);

task.Should().Throw<KafkaProcesException>();
await task.Should().ThrowAsync<KafkaProcesException>();

// Assert
logger.Received(1).UnknownProcessException(new NotImplementedException().Message);
await _handler.DidNotReceive().Invoke(Arg.Any<KafkaContext>());
}

[Fact]
public void KafkaProcess_Stop_ShouldInvokeCloseMethod()
public async Task KafkaProcess_Stop_ShouldInvokeCloseMethod()
{
// Act
_kafkaProcess.Stop();
await _kafkaProcess.Stop();

// Assert
_consumer.Received(1).Close();
Expand Down
4 changes: 2 additions & 2 deletions test/MinimalKafka.Tests/KafkaServiceTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ public async Task KafkaService_StartAsync_ShouldStartAllProcesses()
// Assert
foreach (var process in _processes)
{
process.Received(1).Start(Arg.Any<CancellationToken>());
await process.Received(1).Start(Arg.Any<CancellationToken>());
}
}

Expand All @@ -63,7 +63,7 @@ public async Task KafkaService_StopAsync_ShouldStopAllProcesses()
// Assert
foreach (var process in _processes)
{
process.Received(1).Stop();
await process.Received(1).Stop();
}
}

Expand Down
71 changes: 71 additions & 0 deletions tools/docker-compose.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
name: RedPanda
services:
redpanda:
image: docker.redpanda.com/redpandadata/redpanda:v24.2.5
container_name: redpanda
command:
- redpanda
- start
- --kafka-addr internal://0.0.0.0:9092,external://0.0.0.0:19092
# Address the broker advertises to clients that connect to the Kafka API.
# Use the internal addresses to connect to the Redpanda brokers'
# from inside the same Docker network.
# Use the external addresses to connect to the Redpanda brokers'
# from outside the Docker network.
- --advertise-kafka-addr internal://redpanda:9092,external://localhost:19092
- --pandaproxy-addr internal://0.0.0.0:8082,external://0.0.0.0:18082
# Address the broker advertises to clients that connect to the HTTP Proxy.
- --advertise-pandaproxy-addr internal://redpanda:8082,external://localhost:18082
- --schema-registry-addr internal://0.0.0.0:8081,external://0.0.0.0:18081
# Redpanda brokers use the RPC API to communicate with each other internally.
- --rpc-addr redpanda:33145
- --advertise-rpc-addr redpanda:33145
# Mode dev-container uses well-known configuration properties for development in containers.
- --mode dev-container
# Tells Seastar (the framework Redpanda uses under the hood) to use 1 core on the system.
- --smp 1
- --default-log-level=info
volumes:
- redpanda:/var/lib/redpanda/data
networks:
- redpanda_network
ports:
- 18081:18081
- 18082:18082
- 19092:19092
- 19644:9644
console:
container_name: redpanda-console
image: docker.redpanda.com/redpandadata/console:v2.7.2
networks:
- redpanda_network
entrypoint: /bin/sh
command: -c 'echo "$$CONSOLE_CONFIG_FILE" > /tmp/config.yml; /app/console'
environment:
CONFIG_FILEPATH: /tmp/config.yml
CONSOLE_CONFIG_FILE: |
kafka:
brokers: ["redpanda:9092"]
schemaRegistry:
enabled: true
urls: ["http://redpanda:8081"]
redpanda:
adminApi:
enabled: true
urls: ["http://redpanda:9644"]
ports:
- 8080:8080
depends_on:
- redpanda

networks:
redpanda_network:
driver: bridge

volumes:
redpanda:
driver: local
#driver_opts:
#o: bind
#type: rw
#device: /redpanda_data