Skip to content

Commit

Permalink
Added EventEnvelopeExtensions for Kafka message deserialisation and a…
Browse files Browse the repository at this point in the history
…ligned with EventStoreDB subscription to all to ignore messages that's not able to deserialise
  • Loading branch information
oskardudycz committed May 17, 2022
1 parent db4e782 commit 89c7d1d
Show file tree
Hide file tree
Showing 6 changed files with 53 additions and 17 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -77,9 +77,9 @@ private async Task HandleEvent(StreamSubscription subscription, ResolvedEvent re
{
if (IsEventWithEmptyData(resolvedEvent) || IsCheckpointEvent(resolvedEvent)) return;

var streamEvent = resolvedEvent.ToEventEnvelope();
var eventEnvelope = resolvedEvent.ToEventEnvelope();

if (streamEvent == null)
if (eventEnvelope == null)
{
// That can happen if we're sharing database between modules.
// If we're subscribing to all and not filtering out events from other modules,
Expand All @@ -97,7 +97,7 @@ private async Task HandleEvent(StreamSubscription subscription, ResolvedEvent re
}

// publish event to internal event bus
await eventBus.Publish(streamEvent, ct);
await eventBus.Publish(eventEnvelope, ct);

await checkpointRepository.Store(SubscriptionId, resolvedEvent.Event.Position.CommitPosition, ct);
}
Expand Down
27 changes: 18 additions & 9 deletions Core.Kafka/Consumers/KafkaConsumer.cs
Original file line number Diff line number Diff line change
@@ -1,12 +1,9 @@
using Confluent.Kafka;
using Core.Events;
using Core.Events.External;
using Core.Reflection;
using Core.Serialization.Newtonsoft;
using Core.Kafka.Events;
using Microsoft.Extensions.Configuration;
using Microsoft.Extensions.Logging;
using Newtonsoft.Json;
using IEventBus = Core.Events.IEventBus;

namespace Core.Kafka.Consumers;

Expand Down Expand Up @@ -68,15 +65,27 @@ private async Task ConsumeNextEvent(IConsumer<string, string> consumer, Cancella
var message = consumer.Consume(cancellationToken);

// get event type from name stored in message.Key
var eventType = TypeProvider.GetTypeFromAnyReferencingAssembly(message.Message.Key)!;
var eventEnvelope = message.ToEventEnvelope();

var eventEnvelopeType = typeof(EventEnvelope<>).MakeGenericType(eventType);
if (eventEnvelope == null)
{
// That can happen if we're sharing database between modules.
// If we're subscribing to all and not filtering out events from other modules,
// then we might get events that are from other module and we might not be able to deserialize them.
// In that case it's safe to ignore deserialization error.
// You may add more sophisticated logic checking if it should be ignored or not.
logger.LogWarning("Couldn't deserialize event of type: {EventType}", message.Message.Key);

if (!config.IgnoreDeserializationErrors)
throw new InvalidOperationException(
$"Unable to deserialize event {message.Message.Key}"
);

// deserialize event
var @event = (IEventEnvelope)message.Message.Value.FromJson(eventEnvelopeType);
return;
}

// publish event to internal event bus
await eventBus.Publish(@event, cancellationToken);
await eventBus.Publish(eventEnvelope, cancellationToken);

consumer.Commit();
}
Expand Down
2 changes: 2 additions & 0 deletions Core.Kafka/Consumers/KafkaConsumerConfig.cs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@ public class KafkaConsumerConfig
{
public ConsumerConfig? ConsumerConfig { get; set; }
public string[]? Topics { get; set; }

public bool IgnoreDeserializationErrors { get; set; } = true;
}

public static class KafkaConsumerConfigExtensions
Expand Down
22 changes: 22 additions & 0 deletions Core.Kafka/Events/EventEnvelopeExtensions.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
using Confluent.Kafka;
using Core.Events;
using Core.Reflection;
using Core.Serialization.Newtonsoft;

namespace Core.Kafka.Events;

public static class EventEnvelopeExtensions
{
public static IEventEnvelope? ToEventEnvelope(this ConsumeResult<string, string> message)
{
var eventType = TypeProvider.GetTypeFromAnyReferencingAssembly(message.Message.Key);

if (eventType == null)
return null;

var eventEnvelopeType = typeof(EventEnvelope<>).MakeGenericType(eventType);

// deserialize event
return message.Message.Value.FromJson(eventEnvelopeType) as IEventEnvelope;
}
}
4 changes: 2 additions & 2 deletions Core.Serialization/Newtonsoft/SerializationExtensions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -40,10 +40,10 @@ public static T FromJson<T>(this string json)
/// <param name="json">json string</param>
/// <param name="type">object type</param>
/// <returns>deserialized object</returns>
public static object FromJson(this string json, Type type)
public static object? FromJson(this string json, Type type)
{
return JsonConvert.DeserializeObject(json, type,
new JsonSerializerSettings().WithNonDefaultConstructorContractResolver())!;
new JsonSerializerSettings().WithNonDefaultConstructorContractResolver());
}

/// <summary>
Expand Down
9 changes: 6 additions & 3 deletions Core.Testing/ApiFixture.cs
Original file line number Diff line number Diff line change
Expand Up @@ -40,13 +40,16 @@ public IReadOnlyCollection<TCommand> PublishedExternalCommandOfType<TCommand>()
return externalCommandBus.SentCommands.OfType<TCommand>().ToList();
}

public async Task PublishInternalEvent<TEvent>(TEvent @event, CancellationToken ct = default) where TEvent : notnull
public Task PublishInternalEvent<TEvent>(TEvent @event, CancellationToken ct = default) where TEvent : notnull =>
PublishInternalEvent(
new EventEnvelope<TEvent>(@event, new EventMetadata(Guid.NewGuid().ToString(), 0, 0, null)), ct);

public async Task PublishInternalEvent<TEvent>(EventEnvelope<TEvent> eventEnvelope, CancellationToken ct = default)
where TEvent : notnull
{
using var scope = Server.Host.Services.CreateScope();
var eventBus = scope.ServiceProvider.GetRequiredService<IEventBus>();

//TODO: metadata should be taken by event bus internally
var eventEnvelope = new EventEnvelope<TEvent>(@event, new EventMetadata(Guid.NewGuid().ToString(), 0, 0, null));
await eventBus.Publish(eventEnvelope, ct);
}

Expand Down

0 comments on commit 89c7d1d

Please sign in to comment.