Commit 8274516

Refactors Kafka consumer and adds features
This commit refactors the Kafka consumer implementation for improved modularity and testability. It introduces a KafkaConsumerConfig object that centralizes consumer configuration, and it also adds consumer handlers, offset management, and report-interval functionality.
1 parent 309cc73 commit 8274516
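At its core, the refactor collapses KafkaConsumer's long constructor parameter list into a single KafkaConsumerConfig and lets the DI container supply the remaining dependencies. A minimal sketch of the new wiring, using the types introduced in the diffs below:

```csharp
// New construction path (see KafkaConsumerBuilder.cs below).
// KafkaConsumerConfig bundles the consumer key, delegates, and metadata;
// ActivatorUtilities binds `conf` to the KafkaConsumerConfig parameter and
// resolves IKafkaProducer, KafkaTopicFormatter, ILogger<KafkaConsumer>, etc. from DI.
var conf = KafkaConsumerConfig.Create(key, Delegates, builder.MetaData);

IKafkaConsumer consumer = ActivatorUtilities.CreateInstance<KafkaConsumer>(
    builder.ServiceProvider,
    conf);
```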

File tree

14 files changed: +321 -130 lines

examples/Examples/Aggregate/TestAggregate.cs

Lines changed: 26 additions & 3 deletions
```diff
@@ -1,14 +1,18 @@
-using Microsoft.AspNetCore.Mvc.TagHelpers.Cache;
-
-namespace Examples.Aggregate;
+namespace Examples.Aggregate;
 
 public class TestCommands : IAggregateCommands<Guid>
 {
     public Guid Id { get; init; } = Guid.NewGuid();
     public required int Version { get; init; }
     public required string CommandName { get; init; }
 
+    public SetCounter? SetCounter { get; set; }
+
 }
+
+public record SetCounter(int Counter);
+
+
 public record Test : IAggregate<Guid, Test, TestCommands>
 {
     public Guid Id { get; init; }
@@ -25,6 +29,7 @@ public static Result<Test> Apply(Test state, TestCommands command)
         nameof(Create) => Create(command),
         nameof(Increment) => state.Increment(),
         nameof(Decrement) => state.Decrement(),
+        nameof(SetCounter) => state.SetCounter(command.SetCounter!),
         _ => Result.Failed(state, "Unknown command: " + command.CommandName)
     };
 
@@ -61,4 +66,22 @@ public Result<Test> Decrement()
             Counter = Counter - 1
         };
     }
+
+    public Result<Test> SetCounter(SetCounter cmd)
+    {
+        if (cmd.Counter < 0)
+        {
+            return Result.Failed(this, "Counter cannot be less than 0.");
+        }
+
+        if (cmd.Counter > 100)
+        {
+            return Result.Failed(this, "Counter cannot be more than 100.");
+        }
+
+        return this with
+        {
+            Counter = cmd.Counter
+        };
+    }
 }
```
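For context, a hedged sketch of how the new command flows through the nameof-based dispatch in Apply; the TestCommands shape comes from the diff above, while the surrounding state instance and version value are assumed:

```csharp
// Hypothetical dispatch of the new SetCounter command.
// `state` is assumed to be an existing Test aggregate instance.
var result = Test.Apply(state, new TestCommands
{
    Version = 1,                        // assumed value; required by TestCommands
    CommandName = nameof(SetCounter),   // routes to state.SetCounter(...)
    SetCounter = new SetCounter(42)     // accepted: 0 <= 42 <= 100
});
// A Counter outside [0, 100] returns Result.Failed with the validation message.
```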

examples/Examples/Program.cs

Lines changed: 4 additions & 3 deletions
```diff
@@ -1,3 +1,4 @@
+using Confluent.Kafka;
 using Examples;
 using Examples.Aggregate;
 using MinimalKafka;
@@ -9,12 +10,12 @@
 {
     config
         .WithConfiguration(builder.Configuration.GetSection("Kafka"))
-        //.WithBootstrapServers("nas:9092")
+        .WithBootstrapServers("nas:9092")
         .WithGroupId(AppDomain.CurrentDomain.FriendlyName)
         .WithClientId(AppDomain.CurrentDomain.FriendlyName)
         .WithTransactionalId(AppDomain.CurrentDomain.FriendlyName)
-        //.WithOffsetReset(AutoOffsetReset.Earliest)
-        //.WithPartitionAssignedHandler((_, p) => p.Select(tp => new TopicPartitionOffset(tp, Offset.Beginning)))
+        .WithOffsetReset(AutoOffsetReset.Earliest)
+        .WithPartitionAssignedHandler((_, p) => p.Select(tp => new TopicPartitionOffset(tp, Offset.Beginning)))
         .WithJsonSerializers()
         .UseRocksDB(x =>
         {
```
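Note the effect of the newly enabled lines: WithOffsetReset(AutoOffsetReset.Earliest) only applies when the group has no committed offset, whereas the partition-assigned handler rewinds every assigned partition to Offset.Beginning, so the topic is replayed from the start on every rebalance. In raw Confluent.Kafka terms the handler corresponds roughly to this sketch (consumerConfig is an assumed ConsumerConfig instance):

```csharp
using System.Linq;
using Confluent.Kafka;

// Roughly what the MinimalKafka handler above maps to on the underlying builder:
// return Offset.Beginning for every partition handed to this consumer.
var consumer = new ConsumerBuilder<byte[], byte[]>(consumerConfig)
    .SetPartitionsAssignedHandler((_, partitions) =>
        partitions.Select(tp => new TopicPartitionOffset(tp, Offset.Beginning)))
    .Build();
```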

src/MinimalKafka/Builders/KafkaConsumerBuilder.cs

Lines changed: 5 additions & 48 deletions
```diff
@@ -1,9 +1,6 @@
 using Confluent.Kafka;
 using Microsoft.Extensions.DependencyInjection;
-using Microsoft.Extensions.Logging;
 using MinimalKafka.Internals;
-using MinimalKafka.Metadata;
-using MinimalKafka.Metadata.Internals;
 
 namespace MinimalKafka.Builders;
 
@@ -13,55 +10,15 @@ internal class KafkaConsumerBuilder(KafkaConsumerKey key, IKafkaBuilder builder)
 
     public IKafkaConsumer Build()
     {
-        var consumer = CreateConsumer();
-        var producer = builder.ServiceProvider.GetRequiredService<IKafkaProducer>();
-        var logger = builder.ServiceProvider.GetRequiredService<ILogger<KafkaConsumer>>();
-        var formatter = builder.ServiceProvider.GetRequiredService<KafkaTopicFormatter>();
-
-        return new KafkaConsumer(
-            key,
-            true,
-            consumer,
-            producer,
-            [.. Delegates],
-            formatter,
-            [.. builder.MetaData],
+        var conf = KafkaConsumerConfig.Create(key, Delegates, builder.MetaData);
+
+        return ActivatorUtilities.CreateInstance<KafkaConsumer>(
             builder.ServiceProvider,
-            logger
+            conf
         );
     }
 
-    private IConsumer<byte[], byte[]> CreateConsumer()
-    {
-        var config = builder.MetaData.OfType<IConfigMetadata>().First();
-        var handlers = builder.MetaData.OfType<IConsumerHandlerMetadata>().FirstOrDefault() ??
-            new ConsumerHandlerMetadata();
-
-        return new ConsumerBuilder<byte[], byte[]>(config.ConsumerConfig.AsEnumerable())
-            .SetKeyDeserializer(Deserializers.ByteArray)
-            .SetValueDeserializer(Deserializers.ByteArray)
-            .SetStatisticsHandler(handlers?.StatisticsHandler)
-            .SetErrorHandler(handlers?.ErrorHandler)
-            .SetLogHandler(handlers?.LogHandler)
-            .SetPartitionsAssignedHandler(handlers?.PartitionsAssignedHandler)
-            .SetPartitionsLostHandler(handlers?.PartitionsLostHandler)
-            .SetPartitionsRevokedHandler((c, partitions) =>
-            {
-                Console.WriteLine($"Partitions revoked: [{string.Join(", ", partitions)}]");
-                try
-                {
-                    handlers?.PartitionsRevokedHandler?.Invoke(c, partitions);
-
-                    //c.Commit(); // commit offsets before losing partitions
-                    Console.WriteLine("Offsets committed on revoke.");
-                }
-                catch (KafkaException e) when (e.Error.Code == ErrorCode.Local_State)
-                {
-                    Console.WriteLine($"Skip commit on revoke: {e.Error.Reason}");
-                }
-            })
-            .Build();
-    }
+
 
     internal void AddDelegate(KafkaDelegate del)
     {
```
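Build() now relies on ActivatorUtilities.CreateInstance, which binds the explicitly supplied arguments by type and resolves every remaining constructor parameter from the service provider. A small self-contained sketch of that pattern, with hypothetical types not from this repo:

```csharp
using System;
using Microsoft.Extensions.DependencyInjection;

var services = new ServiceCollection()
    .AddSingleton<IGreeter, Greeter>()    // resolved from DI
    .BuildServiceProvider();

// `new Settings("demo")` is bound to the Settings parameter by type;
// IGreeter is resolved from the provider. Unresolvable parameters throw at runtime.
var worker = ActivatorUtilities.CreateInstance<Worker>(services, new Settings("demo"));
Console.WriteLine(worker.Run());          // "hi demo"

public interface IGreeter { string Greet(); }
public class Greeter : IGreeter { public string Greet() => "hi"; }
public record Settings(string Name);
public class Worker(Settings settings, IGreeter greeter)
{
    public string Run() => $"{greeter.Greet()} {settings.Name}";
}
```

One practical consequence: the builder no longer resolves IKafkaProducer, KafkaTopicFormatter, and the logger by hand, so adding a constructor dependency to KafkaConsumer no longer requires touching the builder.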

src/MinimalKafka/Internals/KafkaConsumer.cs

Lines changed: 58 additions & 20 deletions
```diff
@@ -5,22 +5,38 @@
 
 namespace MinimalKafka.Internals;
 
+internal class KafkaConsumerConfig
+{
+    public required KafkaConsumerKey Key { get; init; }
+    public required IReadOnlyList<KafkaDelegate> Delegates { get; init; }
+    public required IReadOnlyList<object> Metadata { get; init; }
+
+    internal static KafkaConsumerConfig Create(KafkaConsumerKey key, List<KafkaDelegate> delegates, List<object> metaData)
+        => new()
+        {
+            Key = key,
+            Delegates = [.. delegates],
+            Metadata = [.. metaData]
+        };
+}
+
 internal class KafkaConsumer(
-    KafkaConsumerKey consumerKey,
-    bool autoCommitEnabled,
-    IConsumer<byte[], byte[]> consumer,
+    KafkaConsumerConfig config,
     IKafkaProducer producer,
-    KafkaDelegate[] kafkaDelegates,
     KafkaTopicFormatter topicFormatter,
-    IReadOnlyList<object> metadata,
     IServiceProvider serviceProvider,
     ILogger<KafkaConsumer> logger) : IKafkaConsumer
 {
+    private long _recordsConsumed;
+    private readonly int _reportInterval = config.Metadata.ReportInterval();
+    private readonly bool _autoCommitEnabled = config.Metadata.AutoCommitEnabled();
+    private readonly IConsumer<byte[], byte[]> _consumer = CreateConsumer(config.Metadata);
+
     public void Subscribe()
     {
-        var topic = topicFormatter(consumerKey.TopicName);
-        consumer.Subscribe(topic);
-        logger.Subscribed(consumerKey.GroupId, consumerKey.ClientId, topic);
+        var topic = topicFormatter(config.Key.TopicName);
+        _consumer.Subscribe(topic);
+        logger.Subscribed(config.Key.GroupId, config.Key.ClientId, topic);
     }
 
     public async Task Consume(CancellationToken cancellationToken)
@@ -29,15 +45,20 @@ public async Task Consume(CancellationToken cancellationToken)
     {
         await using var scope = serviceProvider.CreateAsyncScope();
 
-        var result = consumer.Consume(cancellationToken);
+        var result = _consumer.Consume(cancellationToken);
 
-        var context = KafkaContext.Create(consumerKey, result.Message, scope.ServiceProvider, metadata);
+        if (++_recordsConsumed % _reportInterval == 0)
+        {
+            logger.RecordsConsumed(config.Key.GroupId, config.Key.ClientId, _recordsConsumed, result.Topic);
+        }
+
+        var context = KafkaContext.Create(config, result.Message, scope.ServiceProvider);
 
         var store = context.GetTopicStore();
 
         await store.AddOrUpdate(context.Key, context.Value);
 
-        foreach (var kafkaDelegate in kafkaDelegates)
+        foreach (var kafkaDelegate in config.Delegates)
         {
             await kafkaDelegate.Invoke(context);
         }
@@ -50,37 +71,54 @@ public async Task Consume(CancellationToken cancellationToken)
         when(ex.CancellationToken == cancellationToken)
         {
 
-            logger.OperatonCanceled(consumerKey.GroupId, consumerKey.ClientId);
+            logger.OperatonCanceled(config.Key.GroupId, config.Key.ClientId);
         }
     }
 
     public void Close()
     {
         if (_isClosed)
         {
-            logger.ConsumerAlreadyClosed(consumerKey.GroupId, consumerKey.ClientId);
+            logger.ConsumerAlreadyClosed(config.Key.GroupId, config.Key.ClientId);
             return;
         }
 
         _isClosed = true;
 
-        consumer.Close();
-        consumer.Dispose();
-        logger.ConsumerClosed(consumerKey.GroupId, consumerKey.ClientId);
+        _consumer.Close();
+        _consumer.Dispose();
+        logger.ConsumerClosed(config.Key.GroupId, config.Key.ClientId);
     }
 
     private bool _isClosed;
 
    private void Commit(ConsumeResult<byte[], byte[]> result)
    {
-        if (!autoCommitEnabled)
+        if (!_autoCommitEnabled)
        {
-            logger.Committing(consumerKey.GroupId, consumerKey.ClientId);
+            logger.Committing(config.Key.GroupId, config.Key.ClientId);
 
-            consumer.StoreOffset(result);
-            consumer.Commit();
+            _consumer.StoreOffset(result);
+            _consumer.Commit();
        }
    }
+
+    private static IConsumer<byte[], byte[]> CreateConsumer(IReadOnlyList<object> metadata)
+    {
+        var config = metadata.ConsumerConfig();
+        var handlers = metadata.ConsumerHandlers();
+
+        return new ConsumerBuilder<byte[], byte[]>(config)
+            .SetKeyDeserializer(Deserializers.ByteArray)
+            .SetValueDeserializer(Deserializers.ByteArray)
+            .SetStatisticsHandler(handlers?.StatisticsHandler)
+            .SetErrorHandler(handlers?.ErrorHandler)
+            .SetLogHandler(handlers?.LogHandler)
+            .SetPartitionsAssignedHandler(handlers?.PartitionsAssignedHandler)
+            .SetPartitionsLostHandler(handlers?.PartitionsLostHandler)
+            .SetPartitionsRevokedHandler(handlers?.PartitionsRevokedHandler)
+            .Build();
+    }
 }
```
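The report-interval feature is a straightforward modulo counter over consumed records. A minimal standalone sketch of the pattern (the ReportInterval() metadata extension comes from this commit's other files; the default of 1,000 here is assumed, and the interval must be positive, since x % 0 throws a DivideByZeroException):

```csharp
using System;

// Log progress every Nth record instead of on every record.
long recordsConsumed = 0;
const int reportInterval = 1_000; // assumed default; must be > 0

void OnRecord(string topic)
{
    if (++recordsConsumed % reportInterval == 0)
    {
        Console.WriteLine($"Consumed {recordsConsumed} records from {topic}.");
    }
}

for (var i = 0; i < 2_500; i++) OnRecord("demo-topic"); // prints at 1000 and 2000
```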
src/MinimalKafka/Internals/KafkaContextProducer.cs

Lines changed: 2 additions & 1 deletion
```diff
@@ -58,7 +58,8 @@ public async Task ProduceAsync(KafkaContext ctx, CancellationToken ct)
 
     public async Task ProduceAsync<TKey, TValue>(string topic, TKey key, TValue value, Dictionary<string, string>? header = null)
     {
-        var context = KafkaContext.Create(KafkaConsumerKey.Random(Guid.NewGuid().ToString()), new() { Key = [], Value = [] }, _serviceProvider, []);
+        var config = KafkaConsumerConfig.Create(KafkaConsumerKey.Random(Guid.NewGuid().ToString()), [], []);
+        var context = KafkaContext.Create(config, new() { Key = [], Value = [] }, _serviceProvider);
         await context.ProduceAsync(topic, key, value, header);
         await ProduceAsync(context, CancellationToken.None);
     }
```
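The producer-side change mirrors the consumer refactor: producing outside a consume loop now builds a synthetic KafkaConsumerConfig (random consumer key, no delegates, no metadata) to back the temporary KafkaContext. Call sites are unchanged; a hedged usage example with a hypothetical topic and payload:

```csharp
// `producer` is an IKafkaProducer resolved from DI; topic and payload are illustrative.
await producer.ProduceAsync("test-topic", key: Guid.NewGuid(), value: new { Counter = 42 });
```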
