From 89c7d1da1e5fee413c05575cf5dcffc9789b6067 Mon Sep 17 00:00:00 2001 From: Oskar Dudycz Date: Tue, 17 May 2022 18:29:35 +0200 Subject: [PATCH] Added EventEnvelopeExtensions for Kafka message deserialisation and aligned with EventStoreDB subscription to all to ignore messages that's not able to deserialise --- .../EventStoreDBSubscriptionToAll.cs | 6 ++--- Core.Kafka/Consumers/KafkaConsumer.cs | 27 ++++++++++++------- Core.Kafka/Consumers/KafkaConsumerConfig.cs | 2 ++ Core.Kafka/Events/EventEnvelopeExtensions.cs | 22 +++++++++++++++ .../Newtonsoft/SerializationExtensions.cs | 4 +-- Core.Testing/ApiFixture.cs | 9 ++++--- 6 files changed, 53 insertions(+), 17 deletions(-) create mode 100644 Core.Kafka/Events/EventEnvelopeExtensions.cs diff --git a/Core.EventStoreDB/Subscriptions/EventStoreDBSubscriptionToAll.cs b/Core.EventStoreDB/Subscriptions/EventStoreDBSubscriptionToAll.cs index 38f7637c9..dea1dd590 100644 --- a/Core.EventStoreDB/Subscriptions/EventStoreDBSubscriptionToAll.cs +++ b/Core.EventStoreDB/Subscriptions/EventStoreDBSubscriptionToAll.cs @@ -77,9 +77,9 @@ private async Task HandleEvent(StreamSubscription subscription, ResolvedEvent re { if (IsEventWithEmptyData(resolvedEvent) || IsCheckpointEvent(resolvedEvent)) return; - var streamEvent = resolvedEvent.ToEventEnvelope(); + var eventEnvelope = resolvedEvent.ToEventEnvelope(); - if (streamEvent == null) + if (eventEnvelope == null) { // That can happen if we're sharing database between modules. // If we're subscribing to all and not filtering out events from other modules, @@ -97,7 +97,7 @@ private async Task HandleEvent(StreamSubscription subscription, ResolvedEvent re } // publish event to internal event bus - await eventBus.Publish(streamEvent, ct); + await eventBus.Publish(eventEnvelope, ct); await checkpointRepository.Store(SubscriptionId, resolvedEvent.Event.Position.CommitPosition, ct); } diff --git a/Core.Kafka/Consumers/KafkaConsumer.cs b/Core.Kafka/Consumers/KafkaConsumer.cs index 2a75f960f..440d39248 100644 --- a/Core.Kafka/Consumers/KafkaConsumer.cs +++ b/Core.Kafka/Consumers/KafkaConsumer.cs @@ -1,12 +1,9 @@ using Confluent.Kafka; using Core.Events; using Core.Events.External; -using Core.Reflection; -using Core.Serialization.Newtonsoft; +using Core.Kafka.Events; using Microsoft.Extensions.Configuration; using Microsoft.Extensions.Logging; -using Newtonsoft.Json; -using IEventBus = Core.Events.IEventBus; namespace Core.Kafka.Consumers; @@ -68,15 +65,27 @@ private async Task ConsumeNextEvent(IConsumer consumer, Cancella var message = consumer.Consume(cancellationToken); // get event type from name stored in message.Key - var eventType = TypeProvider.GetTypeFromAnyReferencingAssembly(message.Message.Key)!; + var eventEnvelope = message.ToEventEnvelope(); - var eventEnvelopeType = typeof(EventEnvelope<>).MakeGenericType(eventType); + if (eventEnvelope == null) + { + // That can happen if we're sharing database between modules. + // If we're subscribing to all and not filtering out events from other modules, + // then we might get events that are from other module and we might not be able to deserialize them. + // In that case it's safe to ignore deserialization error. + // You may add more sophisticated logic checking if it should be ignored or not. + logger.LogWarning("Couldn't deserialize event of type: {EventType}", message.Message.Key); + + if (!config.IgnoreDeserializationErrors) + throw new InvalidOperationException( + $"Unable to deserialize event {message.Message.Key}" + ); - // deserialize event - var @event = (IEventEnvelope)message.Message.Value.FromJson(eventEnvelopeType); + return; + } // publish event to internal event bus - await eventBus.Publish(@event, cancellationToken); + await eventBus.Publish(eventEnvelope, cancellationToken); consumer.Commit(); } diff --git a/Core.Kafka/Consumers/KafkaConsumerConfig.cs b/Core.Kafka/Consumers/KafkaConsumerConfig.cs index f386efe9c..08fd92fee 100644 --- a/Core.Kafka/Consumers/KafkaConsumerConfig.cs +++ b/Core.Kafka/Consumers/KafkaConsumerConfig.cs @@ -7,6 +7,8 @@ public class KafkaConsumerConfig { public ConsumerConfig? ConsumerConfig { get; set; } public string[]? Topics { get; set; } + + public bool IgnoreDeserializationErrors { get; set; } = true; } public static class KafkaConsumerConfigExtensions diff --git a/Core.Kafka/Events/EventEnvelopeExtensions.cs b/Core.Kafka/Events/EventEnvelopeExtensions.cs new file mode 100644 index 000000000..b2f944008 --- /dev/null +++ b/Core.Kafka/Events/EventEnvelopeExtensions.cs @@ -0,0 +1,22 @@ +using Confluent.Kafka; +using Core.Events; +using Core.Reflection; +using Core.Serialization.Newtonsoft; + +namespace Core.Kafka.Events; + +public static class EventEnvelopeExtensions +{ + public static IEventEnvelope? ToEventEnvelope(this ConsumeResult message) + { + var eventType = TypeProvider.GetTypeFromAnyReferencingAssembly(message.Message.Key); + + if (eventType == null) + return null; + + var eventEnvelopeType = typeof(EventEnvelope<>).MakeGenericType(eventType); + + // deserialize event + return message.Message.Value.FromJson(eventEnvelopeType) as IEventEnvelope; + } +} diff --git a/Core.Serialization/Newtonsoft/SerializationExtensions.cs b/Core.Serialization/Newtonsoft/SerializationExtensions.cs index 19f41b5be..4e8154040 100644 --- a/Core.Serialization/Newtonsoft/SerializationExtensions.cs +++ b/Core.Serialization/Newtonsoft/SerializationExtensions.cs @@ -40,10 +40,10 @@ public static T FromJson(this string json) /// json string /// object type /// deserialized object - public static object FromJson(this string json, Type type) + public static object? FromJson(this string json, Type type) { return JsonConvert.DeserializeObject(json, type, - new JsonSerializerSettings().WithNonDefaultConstructorContractResolver())!; + new JsonSerializerSettings().WithNonDefaultConstructorContractResolver()); } /// diff --git a/Core.Testing/ApiFixture.cs b/Core.Testing/ApiFixture.cs index 704932056..134a55014 100644 --- a/Core.Testing/ApiFixture.cs +++ b/Core.Testing/ApiFixture.cs @@ -40,13 +40,16 @@ public IReadOnlyCollection PublishedExternalCommandOfType() return externalCommandBus.SentCommands.OfType().ToList(); } - public async Task PublishInternalEvent(TEvent @event, CancellationToken ct = default) where TEvent : notnull + public Task PublishInternalEvent(TEvent @event, CancellationToken ct = default) where TEvent : notnull => + PublishInternalEvent( + new EventEnvelope(@event, new EventMetadata(Guid.NewGuid().ToString(), 0, 0, null)), ct); + + public async Task PublishInternalEvent(EventEnvelope eventEnvelope, CancellationToken ct = default) + where TEvent : notnull { using var scope = Server.Host.Services.CreateScope(); var eventBus = scope.ServiceProvider.GetRequiredService(); - //TODO: metadata should be taken by event bus internally - var eventEnvelope = new EventEnvelope(@event, new EventMetadata(Guid.NewGuid().ToString(), 0, 0, null)); await eventBus.Publish(eventEnvelope, ct); }