Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -62,4 +62,10 @@ public static TBuilder WithClientIdFormatter<TBuilder>(this TBuilder builder, Fu
return builder;
}

public static TBuilder WithAutoCommit<TBuilder>(this TBuilder builder, bool enabled = true)
where TBuilder : IKafkaConventionBuilder
{
builder.WithSingle(new AutoCommitMetaDataAttribute(enabled));
return builder;
}
}
11 changes: 9 additions & 2 deletions src/MinimalKafka/Helpers/Logging.cs
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,8 @@ internal static partial class Logging

[LoggerMessage(
EventId = 1,
Level = LogLevel.Critical,
Message = "Consumer with GroupId: '{GroupId}' and ClientId: '{ClientId}', Operation cancelled return Empty Context."
Level = LogLevel.Information,
Message = "Consumer with GroupId: '{GroupId}' and ClientId: '{ClientId}' was closed, Operation cancelled."
)]
public static partial void OperatonCanceled(this ILogger logger, string groupId, string clientId);

Expand Down Expand Up @@ -67,4 +67,11 @@ internal static partial class Logging
Message = "Consumer with GroupId: '{GroupId}' and ClientId: '{ClientId}' already closed."
)]
public static partial void ConsumerAlreadyClosed(this ILogger logger, string groupId, string clientId);

[LoggerMessage(
EventId = 9,
Level = LogLevel.Information,
Message = "Consumer with GroupId: '{GroupId}' and ClientId: '{ClientId}' committing offset."
)]
public static partial void Committing(this ILogger logger, string groupId, string clientId);
}
36 changes: 26 additions & 10 deletions src/MinimalKafka/KafkaConsumer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ public abstract class KafkaConsumer
{
public abstract ILogger Logger { get; }
public abstract void Subscribe();
public abstract KafkaContext Consume(CancellationToken cancellationToken);
public abstract Task Consume(KafkaDelegate kafkaDelegate, CancellationToken cancellationToken);

public abstract void Close();

Expand All @@ -32,9 +32,9 @@ public override void Close()
{
}

public override KafkaContext Consume(CancellationToken cancellationToken)
public override Task Consume(KafkaDelegate kafkaDelegate, CancellationToken cancellationToken)
{
return KafkaContext.Empty;
return Task.CompletedTask;
}

public override void Subscribe()
Expand All @@ -60,28 +60,41 @@ public class KafkaConsumer<TKey, TValue>(KafkaConsumerOptions options) : KafkaCo

public override ILogger Logger => options.KafkaLogger;

public override KafkaContext Consume(CancellationToken cancellationToken)
public override async Task Consume(KafkaDelegate kafkaDelegate, CancellationToken cancellationToken)
{
try
{
var scope = _serviceProvider.CreateScope();
using var scope = _serviceProvider.CreateScope();
var result = _consumer.Consume(cancellationToken);

if (++_recordsConsumed % _consumeReportInterval == 0)
{
Logger.RecordsConsumed(options.Metadata.GroupId(), options.Metadata.ClientId(), _recordsConsumed, result.Topic);
}

return KafkaContext.Create(result, scope.ServiceProvider, options.Metadata);
var context = KafkaContext.Create(result, scope.ServiceProvider, options.Metadata);

if (context is EmptyKafkaContext)
{
return;
}

await kafkaDelegate.Invoke(context);

if (options.Metadata.IsAutoCommitEnabled())
{
return;
}

Logger.Committing(options.Metadata.GroupId(), options.Metadata.ClientId());

_consumer.StoreOffset(result);
_consumer.Commit();
}
catch (OperationCanceledException ex)
when (ex.CancellationToken == cancellationToken)
{
Logger.OperatonCanceled(options.Metadata.GroupId(), options.Metadata.ClientId());

_consumer.Close();
_consumer.Dispose();
return KafkaContext.Empty;
}
}

Expand Down Expand Up @@ -123,6 +136,9 @@ public static string GroupId(this IReadOnlyList<object> metadata)

private static T? GetMetaData<T>(this IReadOnlyList<object> metaData)
=> metaData.OfType<T>().FirstOrDefault();

public static bool IsAutoCommitEnabled(this IReadOnlyList<object> metaData)
=> metaData.GetMetaData<IAutoCommitMetaData>()?.Enabled ?? false;
}


Expand Down
1 change: 1 addition & 0 deletions src/MinimalKafka/KafkaExtensions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ public static IServiceCollection AddMinimalKafka(this IServiceCollection service

configBuilder.WithClientId(AppDomain.CurrentDomain.FriendlyName);
configBuilder.WithGroupId(AppDomain.CurrentDomain.FriendlyName);
configBuilder.WithAutoCommit(false);
configBuilder.WithKeyDeserializer(typeof(JsonTextSerializer<>));
configBuilder.WithValueDeserializer(typeof(JsonTextSerializer<>));
configBuilder.WithTopicFormatter(topic => topic);
Expand Down
15 changes: 2 additions & 13 deletions src/MinimalKafka/KafkaProcess.cs
Original file line number Diff line number Diff line change
Expand Up @@ -37,17 +37,7 @@ public async Task Start(CancellationToken cancellationToken)
{
while (!cancellationToken.IsCancellationRequested)
{
var context = _consumer.Consume(cancellationToken);

if (context is EmptyKafkaContext)
{

_consumer.Logger.EmptyContext();

continue;
}

await _handler.Invoke(context);
await _consumer.Consume(_handler, cancellationToken);
}
}
catch(Exception ex)
Expand All @@ -58,9 +48,8 @@ public async Task Start(CancellationToken cancellationToken)
finally
{
_consumer.Logger.DropOutOfConsumeLoop();
_consumer.Close();
}

}

public async Task Stop()
Expand Down
23 changes: 18 additions & 5 deletions src/MinimalKafka/KafkaService.cs
Original file line number Diff line number Diff line change
Expand Up @@ -9,15 +9,28 @@ public IEnumerable<IKafkaProcess> Processes

private readonly List<Task> _runningTasks = [];


protected override async Task ExecuteAsync(CancellationToken stoppingToken)
{
using var cts = CancellationTokenSource.CreateLinkedTokenSource(stoppingToken);
foreach (var process in Processes)
{
var task = Task.Run(() => process.Start(stoppingToken), stoppingToken);
{
var task = Task.Run(async () =>
{
try
{
await process.Start(cts.Token);
}
catch (KafkaProcesException)
{
await cts.CancelAsync();
throw;
}

}
, cts.Token);
_runningTasks.Add(task);
}

}
await Task.WhenAll(_runningTasks);
}

Expand Down
19 changes: 19 additions & 0 deletions src/MinimalKafka/Metadata/AutoCommitMetaData.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
using Confluent.Kafka;

namespace MinimalKafka.Metadata;
public interface IAutoCommitMetaData : IConsumerConfigMetadata
{
bool Enabled { get; }
}

[AttributeUsage(AttributeTargets.Method, Inherited = false, AllowMultiple = false)]
internal class AutoCommitMetaDataAttribute(bool enabled) : Attribute, IAutoCommitMetaData
{
public bool Enabled { get; } = enabled;

public void Set(ClientConfig config)
{
config.Set("enable.auto.commit", Enabled ? "true" : "false");
config.Set("enable.auto.offset.store", Enabled ? "true" : "false");
}
}
65 changes: 6 additions & 59 deletions test/MinimalKafka.Tests/KafkaProcessTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -50,18 +50,8 @@ public async Task KafkaProcess_Start_ShouldInvokeSubscribeMethodOnce()
public async Task KafkaProcess_Start_ShouldInvokeHandlerWithValidContext()
{
// Arrange
var validContext = new TestKafkaContext();
var delay = false;
_consumer.Consume(Arg.Any<CancellationToken>()).Returns(x =>
{
if (delay)
{
Task.Delay(200).GetAwaiter().GetResult();
return KafkaContext.Empty;
}
delay = true;
return validContext;
}) ;
_consumer.Consume(Arg.Any<KafkaDelegate>(), Arg.Any<CancellationToken>())
.Returns(callInfo => callInfo.Arg<KafkaDelegate>().Invoke(new TestKafkaContext()));

var task = Task.Run(() => _kafkaProcess.Start(_cancellationTokenSource.Token));

Expand All @@ -70,70 +60,28 @@ public async Task KafkaProcess_Start_ShouldInvokeHandlerWithValidContext()
await Task.Delay(100);

// Assert
await _handler.Received(1).Invoke(validContext);
await _handler.ReceivedWithAnyArgs().Invoke(Arg.Any<KafkaContext>());
}

[Fact]
public async Task KafkaProcess_Start_ShouldLogErrorOnEmptyContext()
public async Task KafkaProcess_Start_ShouldLogErrorOnException()
{
// Arrange
var delay = false;

var logger = Substitute.For<ILogger>();

_consumer.Logger.Returns(logger);

_consumer.Consume(Arg.Any<CancellationToken>()).Returns(x =>
{
if (delay)
{
Task.Delay(200).GetAwaiter().GetResult();
return KafkaContext.Empty;
}
delay = true;
return KafkaContext.Empty;
});

var process = KafkaProcess.Create(new()
{
Consumer = _consumer,
Delegate = (c) =>
{
return Task.CompletedTask;
}
});

var task = Task.Run(() => process.Start(_cancellationTokenSource.Token));

// Act
_cancellationTokenSource.CancelAfter(100); // Stop the task after a short delay
await Task.Delay(100);

// Assert
logger.Received(1).EmptyContext();
await _handler.DidNotReceive().Invoke(Arg.Any<KafkaContext>());
}

[Fact]
public async Task KafkaProcess_Start_ShouldLogErrorOnException()
{
// Arrange
var logger = Substitute.For<ILogger>();

_consumer.Logger.Returns(logger);

_consumer.Consume(Arg.Any<CancellationToken>()).Returns(x =>
_consumer.Consume(Arg.Any<KafkaDelegate>(), Arg.Any<CancellationToken>()).Returns(x =>
{
throw new NotImplementedException();
});

var process = KafkaProcess.Create(new()
{
Consumer = _consumer,
Delegate = (c) =>
{
throw new NotImplementedException();
}
Delegate = (c) => Task.CompletedTask
});

// Act
Expand All @@ -144,7 +92,6 @@ public async Task KafkaProcess_Start_ShouldLogErrorOnException()

// Assert
logger.Received(1).UnknownProcessException(new NotImplementedException().Message);
await _handler.DidNotReceive().Invoke(Arg.Any<KafkaContext>());
}

[Fact]
Expand Down
9 changes: 7 additions & 2 deletions test/MinimalKafka.Tests/RegistrationTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,7 @@ static void config(IAddKafkaBuilder builder) =>
}

[Fact]
public void AddMinimalKafka_Should_Set_ClientId_And_GroupId_To_Default()
public void AddMinimalKafka_Should_Set_Default_Config()
{
var services = new ServiceCollection();

Expand All @@ -117,15 +117,20 @@ static void config(IAddKafkaBuilder builder) =>
.Should()
.ContainSingle(x => x is IClientIdMetadata)
.And
.ContainSingle(x => x is IGroupIdMetadata);
.ContainSingle(x => x is IGroupIdMetadata)
.And
.ContainSingle(x => x is IAutoCommitMetaData);

// Verify the actual values
var clientId = kafkaBuilder.MetaData
.OfType<IClientIdMetadata>().Single().ClientId;
var groupId = kafkaBuilder.MetaData
.OfType<IGroupIdMetadata>().Single().GroupId;
var autoCommit = kafkaBuilder.MetaData
.OfType<IAutoCommitMetaData>().Single().Enabled;

clientId.Should().Be(AppDomain.CurrentDomain.FriendlyName);
groupId.Should().Be(AppDomain.CurrentDomain.FriendlyName);
autoCommit.Should().BeFalse();
}
}