-
-
Notifications
You must be signed in to change notification settings - Fork 525
/
Copy pathKafkaConsumer.cs
124 lines (110 loc) · 4.76 KB
/
KafkaConsumer.cs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
using System.Diagnostics;
using Confluent.Kafka;
using Core.Events;
using Core.Events.External;
using Core.Kafka.Events;
using Core.OpenTelemetry;
using Microsoft.Extensions.Configuration;
using Microsoft.Extensions.Logging;
using static Core.Extensions.DictionaryExtensions;
namespace Core.Kafka.Consumers;
/// <summary>
/// Consumes external events from Kafka topics (taken from configuration) and forwards
/// them to the internal <see cref="IEventBus"/>. Runs until the provided
/// <see cref="CancellationToken"/> is cancelled.
/// </summary>
public class KafkaConsumer: IExternalEventConsumer
{
    private readonly KafkaConsumerConfig config;
    private readonly IEventBus eventBus;
    private readonly IActivityScope activityScope;
    private readonly ILogger<KafkaConsumer> logger;
    private readonly IConsumer<string, string> consumer;

    /// <param name="configuration">Application configuration; Kafka settings are read from appsettings.json.</param>
    /// <param name="eventBus">Internal bus that consumed events are published to.</param>
    /// <param name="activityScope">OpenTelemetry activity wrapper for tracing event handling.</param>
    /// <param name="logger">Logger for consumer lifecycle and error reporting.</param>
    /// <param name="consumer">Optional pre-built consumer (e.g. for tests); built from config when null.</param>
    public KafkaConsumer(
        IConfiguration configuration,
        IEventBus eventBus,
        IActivityScope activityScope,
        ILogger<KafkaConsumer> logger,
        IConsumer<string, string>? consumer = null
    )
    {
        this.eventBus = eventBus;
        this.activityScope = activityScope;
        this.logger = logger;
        // get configuration from appsettings.json
        config = configuration.GetKafkaConsumerConfig();
        this.consumer = consumer ?? new ConsumerBuilder<string, string>(config.ConsumerConfig).Build();
    }

    /// <summary>
    /// Subscribes to the configured topics and consumes messages in a loop until
    /// <paramref name="cancellationToken"/> is signalled. Always closes the consumer on
    /// exit so it leaves the group cleanly and final offsets are committed.
    /// </summary>
    public async Task StartAsync(CancellationToken cancellationToken)
    {
        logger.LogInformation("Kafka consumer started");

        // subscribe to Kafka topics (taken from config)
        consumer.Subscribe(config.Topics);

        try
        {
            // keep the consumer working until it gets a signal that it should be shut down
            while (!cancellationToken.IsCancellationRequested)
            {
                // consume event from Kafka
                await ConsumeNextEvent(consumer, cancellationToken).ConfigureAwait(false);
            }
        }
        catch (OperationCanceledException)
        {
            // Expected on shutdown — not an error.
            logger.LogInformation("Kafka consumer cancellation requested");
        }
        catch (Exception e)
        {
            // Pass the exception object itself so structured logging captures the full details.
            logger.LogError(e, "Error consuming Kafka message");
        }
        finally
        {
            // Always ensure the consumer leaves the group cleanly and final offsets are
            // committed — previously this only happened on the error path.
            consumer.Close();
        }
    }

    /// <summary>
    /// Consumes a single message, deserializes it into an event envelope and publishes it
    /// to the internal event bus inside a traced activity. Transient errors are logged and
    /// swallowed so the consume loop keeps running; cancellation and deliberate
    /// deserialization failures (when <c>IgnoreDeserializationErrors</c> is false) propagate.
    /// </summary>
    private async Task ConsumeNextEvent(IConsumer<string, string> consumer, CancellationToken token)
    {
        try
        {
            //lol ^_^ - remove this hack when this GH issue is solved: https://github.com/dotnet/extensions/issues/2149#issuecomment-518709751
            await Task.Yield();

            // wait for the upcoming message, consume it when it arrives
            var message = consumer.Consume(token);

            // get event type from name stored in message.Key
            var eventEnvelope = message.ToEventEnvelope();

            if (eventEnvelope == null)
            {
                // That can happen if we're sharing database between modules.
                // If we're subscribing to all and not filtering out events from other modules,
                // then we might get events that are from other module and we might not be able to deserialize them.
                // In that case it's safe to ignore deserialization error.
                // You may add more sophisticated logic checking if it should be ignored or not.
                logger.LogWarning("Couldn't deserialize event of type: {EventType}", message.Message.Key);

                if (!config.IgnoreDeserializationErrors)
                    throw new InvalidOperationException(
                        $"Unable to deserialize event {message.Message.Key}"
                    );

                return;
            }

            await activityScope.Run($"{nameof(KafkaConsumer)}/{nameof(ConsumeNextEvent)}",
                async (_, ct) =>
                {
                    // publish event to internal event bus
                    await eventBus.Publish(eventEnvelope, ct).ConfigureAwait(false);

                    consumer.Commit();
                },
                new StartActivityOptions
                {
                    Tags = Merge(
                        TelemetryTags.Messaging.Kafka.ConsumerTags(
                            config.ConsumerConfig.GroupId,
                            message.Topic,
                            message.Message.Key,
                            message.Partition.Value.ToString(),
                            config.ConsumerConfig.GroupId
                        ),
                        new Dictionary<string, object?>
                        {
                            { TelemetryTags.EventHandling.Event, eventEnvelope.Data.GetType() }
                        }),
                    Parent = eventEnvelope.Metadata.PropagationContext?.ActivityContext,
                    Kind = ActivityKind.Consumer
                },
                token
            ).ConfigureAwait(false);
        }
        // Previously `catch (Exception)` also swallowed cancellation and the deliberate
        // InvalidOperationException thrown above, which made IgnoreDeserializationErrors=false
        // a no-op. The filter lets those propagate while transient errors (e.g. ConsumeException,
        // handler failures) are logged and the loop continues.
        catch (Exception e) when (e is not OperationCanceledException and not InvalidOperationException)
        {
            logger.LogError(e, "Error consuming Kafka message");
        }
    }
}