Skip to content

Commit

Permalink
Handle ProcessedMessageStatus in KafkaConsumer (#747)
Browse files Browse the repository at this point in the history
Co-authored-by: Sina.Keller <[email protected]>
  • Loading branch information
rngcntr and svkeller authored Feb 7, 2023
1 parent ba0a5bf commit 87c93ee
Show file tree
Hide file tree
Showing 21 changed files with 247 additions and 73 deletions.
1 change: 0 additions & 1 deletion examples/ConsumeAndPublishNATS/Model/IncomingMessage.cs
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
using System;
using System.Text.Json.Serialization;

namespace ConsumeAndPublishNATS.Model;

Expand Down
1 change: 0 additions & 1 deletion examples/ConsumeAndPublishNATS/Model/OutgoingMessage.cs
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
using System;
using System.Text.Json.Serialization;

namespace ConsumeAndPublishNATS.Model;

Expand Down
2 changes: 0 additions & 2 deletions examples/OpenTelemetryExample/SingleOutputService.cs
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,7 @@
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using CloudNative.CloudEvents.Extensions;
using Motor.Extensions.Diagnostics.Telemetry;
using Motor.Extensions.Diagnostics.Tracing;
using OpenTelemetryExample.Model;
using Motor.Extensions.Hosting.Abstractions;
using Motor.Extensions.Hosting.CloudEvents;
Expand Down
2 changes: 1 addition & 1 deletion shared.csproj
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
<Project>

<PropertyGroup>
<Version>0.11.0</Version>
<Version>0.11.1</Version>
<LangVersion>11</LangVersion>
<Nullable>enable</Nullable>
<WarningsAsErrors>CS8600;CS8602;CS8625;CS8618;CS8604;CS8601</WarningsAsErrors>
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
using System;
using System.Runtime.InteropServices;
using Microsoft.Extensions.Options;
using Motor.Extensions.Hosting.Abstractions;
using Motor.Extensions.Hosting.CloudEvents;
using Prometheus.Client;
using Prometheus.Client.AspNetCore;
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,5 @@
using System.Collections.Generic;
using System.Runtime.CompilerServices;
using System.Threading;
using System.Threading.Tasks;
using Motor.Extensions.Hosting.CloudEvents;

namespace Motor.Extensions.Hosting.Abstractions;
Expand Down
107 changes: 64 additions & 43 deletions src/Motor.Extensions.Hosting.Kafka/KafkaMessageConsumer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -6,12 +6,14 @@
using System.Threading.Tasks;
using CloudNative.CloudEvents;
using Confluent.Kafka;
using Microsoft.Extensions.Hosting;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;
using Motor.Extensions.Diagnostics.Metrics.Abstractions;
using Motor.Extensions.Hosting.Abstractions;
using Motor.Extensions.Hosting.CloudEvents;
using Motor.Extensions.Hosting.Kafka.Options;
using Polly;
using Prometheus.Client;

namespace Motor.Extensions.Hosting.Kafka;
Expand All @@ -24,16 +26,19 @@ public sealed class KafkaMessageConsumer<TData> : IMessageConsumer<TData>, IDisp
private readonly IMetricFamily<IGauge>? _consumerLagGauge;
private readonly IMetricFamily<ISummary>? _consumerLagSummary;
private readonly ILogger<KafkaMessageConsumer<TData>> _logger;
private readonly IHostApplicationLifetime _applicationLifetime;
private IConsumer<string?, byte[]>? _consumer;
private readonly SemaphoreSlim _messageSemaphore;

public KafkaMessageConsumer(ILogger<KafkaMessageConsumer<TData>> logger,
IOptions<KafkaConsumerOptions<TData>> config,
IHostApplicationLifetime applicationLifetime,
IMetricsFactory<KafkaMessageConsumer<TData>>? metricsFactory,
IApplicationNameService applicationNameService,
CloudEventFormatter cloudEventFormatter)
{
_logger = logger ?? throw new ArgumentNullException(nameof(logger));
_applicationLifetime = applicationLifetime;
_applicationNameService = applicationNameService ?? throw new ArgumentNullException(nameof(config));
_cloudEventFormatter = cloudEventFormatter;
_options = config.Value ?? throw new ArgumentNullException(nameof(config));
Expand Down Expand Up @@ -76,23 +81,23 @@ await Task.Run(async () =>
try
{
var msg = _consumer?.Consume(token);
if (msg != null && !msg.IsPartitionEOF)
if (msg is { IsPartitionEOF: false })
{
SingleMessageHandling(token, msg);
SingleMessageHandlingAsync(msg, token);
}
else
{
_logger.LogDebug("No messages received");
_logger.LogDebug(LogEvents.NoMessageReceived, "No messages received");
}
}
catch (OperationCanceledException)
{
_logger.LogInformation("Terminating Kafka listener...");
_logger.LogInformation(LogEvents.TerminatingKafkaListener, "Terminating Kafka listener...");
break;
}
catch (Exception e)
{
_logger.LogError(e, "Failed to receive message.", e);
_logger.LogError(LogEvents.MessageReceivedFailure, e, "Failed to receive message.");
}
}
}, token).ConfigureAwait(false);
Expand Down Expand Up @@ -162,47 +167,63 @@ private void WriteStatistics(string json)
}
}

private void SingleMessageHandling(CancellationToken token, ConsumeResult<string?, byte[]> msg)
private async Task SingleMessageHandlingAsync(ConsumeResult<string?, byte[]> msg, CancellationToken token)
{
_logger.LogDebug(
$"Received message from topic '{msg.Topic}:{msg.Partition}' with offset: '{msg.Offset}[{msg.TopicPartitionOffset}]'");
var cloudEvent = KafkaMessageToCloudEvent(msg.Message);

var taskAwaiter = ConsumeCallbackAsync?.Invoke(cloudEvent, token).GetAwaiter();
taskAwaiter?.OnCompleted(() =>
try
{
var processedMessageStatus = taskAwaiter?.GetResult();
_messageSemaphore.Release();
switch (processedMessageStatus)
{
case ProcessedMessageStatus.Success:
break;
case ProcessedMessageStatus.TemporaryFailure:
break;
case ProcessedMessageStatus.InvalidInput:
break;
case ProcessedMessageStatus.CriticalFailure:
break;
case ProcessedMessageStatus.Failure:
break;
default:
throw new ArgumentOutOfRangeException();
}
if (msg.Offset % _options.CommitPeriod != 0)
{
return;
}
_logger.LogDebug(LogEvents.ReceivedMessage,
"Received message from topic '{Topic}:{Partition}' with offset: '{Offset}[{TopicPartitionOffset}]'",
msg.Topic, msg.Partition, msg.Offset, msg.TopicPartitionOffset);
var cloudEvent = KafkaMessageToCloudEvent(msg.Message);

var retryPolicy = Policy
.HandleResult<ProcessedMessageStatus>(status => status == ProcessedMessageStatus.TemporaryFailure)
.WaitAndRetryAsync(_options.RetriesOnTemporaryFailure,
retryAttempt => TimeSpan.FromSeconds(Math.Pow(2, retryAttempt)));
var status = await retryPolicy.ExecuteAsync(() => ConsumeCallbackAsync!.Invoke(cloudEvent, token));
HandleMessageStatus(msg, status);
}
catch (Exception e)
{
_logger.LogCritical(LogEvents.MessageHandlingUnexpectedException, e,
"Unexpected exception in message handling");
_applicationLifetime.StopApplication();
}
}

try
{
_consumer?.Commit(msg);
}
catch (KafkaException e)
{
_logger.LogError($"Commit error: {e.Error.Reason}");
}
});
private void HandleMessageStatus(ConsumeResult<string?, byte[]> msg, ProcessedMessageStatus? status)
{
switch (status)
{
case ProcessedMessageStatus.Success:
case ProcessedMessageStatus.InvalidInput:
case ProcessedMessageStatus.Failure:
if (msg.Offset.Value % _options.CommitPeriod == 0)
{
try
{
_consumer?.Commit(msg);
}
catch (KafkaException e)
{
_logger.LogError(LogEvents.CommitError, e, "Commit error: {Reason}", e.Error.Reason);
}
}
_messageSemaphore.Release();
break;
case ProcessedMessageStatus.TemporaryFailure:
_logger.LogWarning(LogEvents.FailureDespiteRetrying,
"Message consume fails despite retrying");
_applicationLifetime.StopApplication();
break;
case ProcessedMessageStatus.CriticalFailure:
_logger.LogWarning(LogEvents.CriticalFailureOnConsume,
"Message consume fails with critical failure");
_applicationLifetime.StopApplication();
break;
default:
throw new ArgumentOutOfRangeException(nameof(status), status, "Unhandled ProcessedMessageStatus");
}
}

public MotorCloudEvent<byte[]> KafkaMessageToCloudEvent(Message<string?, byte[]> msg)
Expand Down
17 changes: 17 additions & 0 deletions src/Motor.Extensions.Hosting.Kafka/LogEvents.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
using Microsoft.Extensions.Logging;

namespace Motor.Extensions.Hosting.Kafka;

public static class LogEvents
{
public static readonly EventId CriticalFailureOnConsume = new(0, nameof(CriticalFailureOnConsume));
public static readonly EventId FailureDespiteRetrying = new(1, nameof(FailureDespiteRetrying));
public static readonly EventId CommitError = new(2, nameof(CommitError));
public static readonly EventId NoMessageReceived = new(3, nameof(NoMessageReceived));
public static readonly EventId TerminatingKafkaListener = new(4, nameof(TerminatingKafkaListener));
public static readonly EventId MessageReceivedFailure = new(5, nameof(MessageReceivedFailure));
public static readonly EventId ReceivedMessage = new(6, nameof(ReceivedMessage));

public static readonly EventId MessageHandlingUnexpectedException =
new(7, nameof(MessageHandlingUnexpectedException));
}
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
<PackageReference Include="CloudNative.CloudEvents.SystemTextJson" Version="2.3.0" />
<PackageReference Include="Microsoft.Extensions.Options" Version="7.0.0" />
<PackageReference Include="Microsoft.Extensions.Options.ConfigurationExtensions" Version="7.0.0" />
<PackageReference Include="Polly" Version="7.2.3" />
<PackageReference Include="System.Text.Json" Version="7.0.1" />
</ItemGroup>

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,4 +12,5 @@ public KafkaConsumerOptions()
public string? Topic { get; set; }
public int CommitPeriod { get; set; } = 1000;
public int MaxConcurrentMessages { get; set; } = 1000;
public int RetriesOnTemporaryFailure { get; set; } = 10;
}
Original file line number Diff line number Diff line change
@@ -1,7 +1,5 @@
using System;
using Motor.Extensions.ContentEncoding.Abstractions;
using Motor.Extensions.Hosting.CloudEvents;
using RabbitMQ.Client;
using RabbitMQ.Client.Events;

namespace Motor.Extensions.Hosting.RabbitMQ;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@
using System.Text;
using CloudNative.CloudEvents;
using Motor.Extensions.ContentEncoding.Abstractions;
using Motor.Extensions.Hosting.Abstractions;
using Motor.Extensions.Hosting.CloudEvents;
using Motor.Extensions.Hosting.RabbitMQ.Options;
using RabbitMQ.Client;
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
using System;
using System.ComponentModel.DataAnnotations;
using System.Net.Security;
using System.Security.Authentication;
using System.Threading;
using Motor.Extensions.Hosting.RabbitMQ.Options;
Expand Down
4 changes: 0 additions & 4 deletions src/Motor.Extensions.TestUtilities/MotorCloudEvent.cs
Original file line number Diff line number Diff line change
@@ -1,8 +1,4 @@
using System;
using System.Collections.Generic;
using System.Linq;
using CloudNative.CloudEvents;
using Motor.Extensions.Hosting.Abstractions;
using Motor.Extensions.Hosting.CloudEvents;

namespace Motor.Extensions.TestUtilities;
Expand Down
1 change: 0 additions & 1 deletion src/Motor.Extensions.Utilities/MotorHost.cs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@
using Motor.Extensions.Diagnostics.Metrics;
using Motor.Extensions.Diagnostics.Telemetry;
using Motor.Extensions.Hosting;
using Motor.Extensions.Hosting.Abstractions;
using Motor.Extensions.Hosting.CloudEvents;
using Motor.Extensions.Http;
using Motor.Extensions.Utilities.Abstractions;
Expand Down
Loading

0 comments on commit 87c93ee

Please sign in to comment.