-
-
Notifications
You must be signed in to change notification settings - Fork 522
/
Copy pathMartenEventPublisher.cs
90 lines (80 loc) · 3.12 KB
/
MartenEventPublisher.cs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
using Core.Events;
using Core.OpenTelemetry;
using Marten;
using Marten.Events.Daemon;
using Marten.Events.Daemon.Internals;
using Marten.Subscriptions;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Logging;
namespace Core.Marten.Subscriptions;
/// <summary>
/// Marten subscription that republishes every event of a processed range
/// through the application's <see cref="IEventBus"/>, one event at a time.
/// </summary>
/// <remarks>
/// Each event is published inside its own dependency-injection scope and its own
/// activity started through <see cref="IActivityScope"/>. The activity's parent is
/// the trace context extracted from the event headers.
/// </remarks>
public class MartenEventPublisher(
    IServiceProvider serviceProvider,
    IActivityScope activityScope,
    ILogger<MartenEventPublisher> logger
): SubscriptionBase
{
    /// <summary>
    /// Publishes all events from <paramref name="eventRange"/> to the event bus in order.
    /// </summary>
    /// <param name="eventRange">Range of events handed over for processing.</param>
    /// <param name="subscriptionController">
    /// Controller that is notified about a critical failure together with the
    /// sequence number of the last event that was published successfully.
    /// </param>
    /// <param name="operations">Not used by this subscription.</param>
    /// <param name="token">Cancellation token passed on to the activity scope.</param>
    /// <returns><see cref="NullChangeListener.Instance"/> when every event was published.</returns>
    /// <remarks>
    /// Any exception is logged, reported through
    /// <paramref name="subscriptionController"/> and then rethrown.
    /// </remarks>
    public override async Task<IChangeListener> ProcessEventsAsync(
        EventRange eventRange,
        ISubscriptionController subscriptionController,
        IDocumentOperations operations,
        CancellationToken token
    )
    {
        // Starts at the range floor and advances after every successful publish,
        // so a failure report points at the last event that actually went through.
        var lastProcessed = eventRange.SequenceFloor;

        try
        {
            foreach (var @event in eventRange.Events)
            {
                var parentContext =
                    TelemetryPropagator.Extract(@event.Headers, ExtractTraceContextFromEventMetadata);

                await activityScope.Run($"{nameof(MartenEventPublisher)}/{nameof(ProcessEventsAsync)}",
                    async (_, ct) =>
                    {
                        // A fresh scope per event, so scoped services are not shared between events.
                        using var scope = serviceProvider.CreateScope();
                        var eventBus = scope.ServiceProvider.GetRequiredService<IEventBus>();

                        var eventMetadata = new EventMetadata(
                            @event.Id.ToString(),
                            (ulong)@event.Version,
                            (ulong)@event.Sequence,
                            parentContext
                        );

                        await eventBus.Publish(EventEnvelope.From(@event.Data, eventMetadata), ct)
                            .ConfigureAwait(false);

                        // TODO: failures could also be differentiated by exception type here,
                        // e.g. by catching them and recording a dead letter event through
                        // subscriptionController instead of failing the whole range.
                    },
                    new StartActivityOptions
                    {
                        Tags = { { TelemetryTags.EventHandling.Event, @event.Data.GetType() } },
                        Parent = parentContext.ActivityContext
                    },
                    token
                ).ConfigureAwait(false);

                lastProcessed = @event.Sequence;
            }

            return NullChangeListener.Instance;
        }
        catch (Exception exc)
        {
            // Pass the exception itself so the stack trace and inner exceptions are logged.
            logger.LogError(exc, "Error while processing Marten Subscription: {ExceptionMessage}", exc.Message);
            await subscriptionController.ReportCriticalFailureAsync(exc, lastProcessed).ConfigureAwait(false);
            throw;
        }
    }

    /// <summary>
    /// Getter handed to <c>TelemetryPropagator.Extract</c>: reads a single header value
    /// and returns it as a string sequence.
    /// </summary>
    /// <param name="headers">Event headers; may be <c>null</c>.</param>
    /// <param name="key">Header name to look up.</param>
    /// <returns>
    /// A one-element sequence with the header value converted to a string, or an empty
    /// sequence when the headers are missing, the key is absent, the value has no
    /// string representation, or the lookup fails.
    /// </returns>
    private IEnumerable<string> ExtractTraceContextFromEventMetadata(Dictionary<string, object>? headers, string key)
    {
        try
        {
            // Missing headers or a missing key are not errors: there is simply nothing to extract.
            if (headers is null || !headers.TryGetValue(key, out var value))
            {
                return [];
            }

            var stringValue = value?.ToString();

            return stringValue is not null ? [stringValue] : [];
        }
        catch (Exception ex)
        {
            // Best effort: a broken header must not stop event publishing.
            logger.LogError(ex, "Failed to extract trace context");
            return [];
        }
    }
}