From 65a5efe421020d669304672229bee8f58f7cc495 Mon Sep 17 00:00:00 2001 From: "Jeremy D. Miller" Date: Mon, 6 Apr 2026 10:49:09 -0500 Subject: [PATCH] Add Wire Tap feature for endpoint-level message auditing (#2222) Implements the Wire Tap pattern (EIP) allowing users to record copies of messages flowing through configured endpoints for auditing, compliance, analytics, or monitoring. Wire taps are configured per-endpoint via UseWireTap() on listeners, senders, and local queues, with support for keyed DI services to vary implementations across endpoints. Co-Authored-By: Claude Opus 4.6 --- docs/guide/logging.md | 145 +++++++++++++++ .../Configuration/wire_tap_configuration.cs | 175 ++++++++++++++++++ src/Wolverine/Configuration/Endpoint.cs | 36 ++++ .../Configuration/IListenerConfiguration.cs | 17 ++ .../Configuration/ListenerConfiguration.cs | 16 ++ .../Configuration/SubscriberConfiguration.cs | 16 ++ src/Wolverine/Envelope.Internals.cs | 12 +- src/Wolverine/IWireTap.cs | 27 +++ src/Wolverine/Runtime/DestinationEndpoint.cs | 5 +- src/Wolverine/Runtime/Routing/MessageRoute.cs | 6 +- .../Runtime/WolverineRuntime.Tracking.cs | 32 ++++ .../Runtime/WorkerQueues/BufferedReceiver.cs | 4 +- .../Runtime/WorkerQueues/DurableReceiver.cs | 4 +- .../Runtime/WorkerQueues/InlineReceiver.cs | 2 +- 14 files changed, 487 insertions(+), 10 deletions(-) create mode 100644 src/Testing/CoreTests/Configuration/wire_tap_configuration.cs create mode 100644 src/Wolverine/IWireTap.cs diff --git a/docs/guide/logging.md b/docs/guide/logging.md index 4c4718193..b5326d806 100644 --- a/docs/guide/logging.md +++ b/docs/guide/logging.md @@ -265,6 +265,151 @@ This will extend your log entries to like this: [09:41:00 INFO] Starting to process IAccountMessage ("018761ad-8ed2-4bc9-bde5-c3cbb643f9f3") with AccountId: "c446fa0b-7496-42a5-b6c8-dd53c65c96c8" ``` +## Wire Tap + +Wolverine supports the [Wire Tap](https://www.enterpriseintegrationpatterns.com/patterns/messaging/WireTap.html) pattern +from the Enterprise Integration Patterns book. A wire tap lets you record a copy of every message flowing through +configured endpoints for auditing, compliance, analytics, or monitoring purposes — without affecting the primary +message processing pipeline. + +### Defining a Wire Tap + +Implement the `IWireTap` interface: + +```csharp +public class AuditWireTap : IWireTap +{ + private readonly IAuditStore _store; + + public AuditWireTap(IAuditStore store) + { + _store = store; + } + + public async ValueTask RecordSuccessAsync(Envelope envelope) + { + await _store.RecordAsync(new AuditEntry + { + MessageId = envelope.Id, + MessageType = envelope.MessageType, + Destination = envelope.Destination?.ToString(), + Timestamp = DateTimeOffset.UtcNow, + Succeeded = true + }); + } + + public async ValueTask RecordFailureAsync(Envelope envelope, Exception exception) + { + await _store.RecordAsync(new AuditEntry + { + MessageId = envelope.Id, + MessageType = envelope.MessageType, + Destination = envelope.Destination?.ToString(), + Timestamp = DateTimeOffset.UtcNow, + Succeeded = false, + ExceptionType = exception.GetType().Name, + ExceptionMessage = exception.Message + }); + } +} +``` + +::: warning +**Implementations must never allow exceptions to escape.** Wolverine wraps wire tap calls in a safety-net `try/catch`, +but if your wire tap throws, the exception will only be logged — it will *not* retry or affect message processing. +Your implementation should handle all errors internally (e.g., log and swallow) to avoid polluting application logs +with wire tap noise. +::: + +::: tip +For production wire taps that write to a database or external system, consider using `System.Threading.Channels` +(specifically Wolverine's built-in `BatchingChannel`) to batch the recording operations. This keeps the wire tap +mechanics off the hot path of message handling, improving throughput while batching database writes for efficiency. +::: + +### Registering a Wire Tap + +Register your `IWireTap` in the IoC container. **Singleton lifetime is strongly recommended** since wire taps are +resolved once per endpoint at startup: + +```csharp +using var host = await Host.CreateDefaultBuilder() + .UseWolverine(opts => + { + // Register a singleton wire tap + opts.Services.AddSingleton(); + }).StartAsync(); +``` + +### Enabling Wire Taps on Endpoints + +Wire taps must be explicitly enabled on each endpoint — there is no global "enable everywhere" switch. This is +intentional: you should deliberately choose which endpoints need auditing. + +```csharp +using var host = await Host.CreateDefaultBuilder() + .UseWolverine(opts => + { + opts.Services.AddSingleton(); + + // Enable on a specific listener + opts.ListenToRabbitQueue("incoming").UseWireTap(); + + // Enable on a specific sender + opts.PublishAllMessages().ToRabbitExchange("outgoing").UseWireTap(); + + // Enable on a specific local queue + opts.LocalQueue("important").UseWireTap(); + + // Enable across all external listeners (excludes local queues) + opts.Policies.AllListeners(x => x.UseWireTap()); + + // Enable across all local queues separately + opts.Policies.AllLocalQueues(x => x.UseWireTap()); + + // Enable across all sender endpoints + opts.Policies.AllSenders(x => x.UseWireTap()); + }).StartAsync(); +``` + +### Using Keyed Wire Taps + +If different endpoints need different wire tap implementations (e.g., one endpoint writes to a compliance database +while another sends to a monitoring service), use keyed services: + +```csharp +using var host = await Host.CreateDefaultBuilder() + .UseWolverine(opts => + { + // Register multiple wire tap implementations + opts.Services.AddSingleton(); + opts.Services.AddKeyedSingleton("monitoring", new MonitoringWireTap()); + + // Default wire tap (uses the non-keyed registration) + opts.ListenToRabbitQueue("orders").UseWireTap(); + + // Specific wire tap by service key + opts.ListenToRabbitQueue("payments").UseWireTap("monitoring"); + }).StartAsync(); +``` + +### What Gets Recorded + +- **`RecordSuccessAsync`** is called when: + - A message has been successfully handled at a listening endpoint + - A message has been successfully sent from a sending endpoint +- **`RecordFailureAsync`** is called when: + - Message handling fails at a listening endpoint after exhausting all error handling policies (moved to dead letter queue) + +### Auditing and Compliance Considerations + +For systems with regulatory auditing requirements (SOC 2, HIPAA, PCI-DSS, GDPR): + +- Wire taps provide a natural integration point for recording message flow for audit trails +- Combine with Wolverine's [contextual logging and audited members](#contextual-logging-with-audited-members) to include business identifiers in your audit records +- The `Envelope` passed to wire tap methods includes correlation IDs, tenant IDs, and message metadata useful for compliance reporting +- Consider separate wire tap implementations per compliance domain using keyed services + ## Open Telemetry Wolverine also supports the [Open Telemetry](https://opentelemetry.io/docs/instrumentation/net/) standard for distributed tracing. To enable diff --git a/src/Testing/CoreTests/Configuration/wire_tap_configuration.cs b/src/Testing/CoreTests/Configuration/wire_tap_configuration.cs new file mode 100644 index 000000000..f998f5341 --- /dev/null +++ b/src/Testing/CoreTests/Configuration/wire_tap_configuration.cs @@ -0,0 +1,175 @@ +using Microsoft.Extensions.DependencyInjection; +using Microsoft.Extensions.Hosting; +using Wolverine.Configuration; +using Wolverine.Tracking; +using Xunit; + +namespace CoreTests.Configuration; + +public class wire_tap_configuration : IAsyncLifetime +{ + private IHost _host = default!; + private readonly RecordingWireTap _wireTap = new(); + + public async Task InitializeAsync() + { + _host = await Host.CreateDefaultBuilder() + .UseWolverine(opts => + { + opts.Discovery.DisableConventionalDiscovery() + .IncludeType(); + + opts.Services.AddSingleton(_wireTap); + + opts.Policies.AllLocalQueues(x => x.UseWireTap()); + }).StartAsync(); + } + + public async Task DisposeAsync() + { + await _host.StopAsync(); + _host.Dispose(); + } + + [Fact] + public async Task wire_tap_records_success_on_message_handled() + { + await _host.SendMessageAndWaitAsync(new WireTapMessage("hello")); + + // Give async fire-and-forget wire tap a moment + await Task.Delay(500); + + _wireTap.Successes.ShouldContain(e => e.Message is WireTapMessage); + } + + [Fact] + public async Task wire_tap_not_called_without_configuration() + { + // Build a second host without wire tap configured + var tap = new RecordingWireTap(); + using var host = await Host.CreateDefaultBuilder() + .UseWolverine(opts => + { + opts.Discovery.DisableConventionalDiscovery() + .IncludeType(); + + opts.Services.AddSingleton(tap); + // Notably: no UseWireTap() call + }).StartAsync(); + + await host.SendMessageAndWaitAsync(new WireTapMessage("no-tap")); + await Task.Delay(500); + + tap.Successes.ShouldBeEmpty(); + } + + [Fact] + public void wire_tap_not_set_on_endpoint_without_configuration() + { + var endpoint = new TestEndpoint(EndpointRole.Application); + endpoint.WireTap.ShouldBeNull(); + } + + [Fact] + public void use_wire_tap_sets_flag() + { + var endpoint = new TestEndpoint(EndpointRole.Application); + endpoint.UseWireTap = true; + endpoint.UseWireTap.ShouldBeTrue(); + } +} + +public class keyed_wire_tap_configuration : IAsyncLifetime +{ + private IHost _host = default!; + private readonly RecordingWireTap _defaultTap = new(); + private readonly RecordingWireTap _specialTap = new(); + + public async Task InitializeAsync() + { + _host = await Host.CreateDefaultBuilder() + .UseWolverine(opts => + { + opts.Discovery.DisableConventionalDiscovery() + .IncludeType(); + + opts.Services.AddSingleton(_defaultTap); + opts.Services.AddKeyedSingleton("special", _specialTap); + + opts.PublishMessage().ToLocalQueue("default-tapped"); + opts.LocalQueue("default-tapped").UseWireTap(); + + opts.PublishMessage().ToLocalQueue("special-tapped"); + opts.LocalQueue("special-tapped").UseWireTap("special"); + }).StartAsync(); + } + + public async Task DisposeAsync() + { + await _host.StopAsync(); + _host.Dispose(); + } + + [Fact] + public async Task default_wire_tap_is_used_without_key() + { + await _host.SendMessageAndWaitAsync(new WireTapMessage("default")); + + await Task.Delay(500); + + _defaultTap.Successes.ShouldContain(e => e.Message is WireTapMessage); + } + + [Fact] + public async Task keyed_wire_tap_is_used_with_service_key() + { + await _host.SendMessageAndWaitAsync(new KeyedWireTapMessage("special")); + + await Task.Delay(500); + + _specialTap.Successes.ShouldContain(e => e.Message is KeyedWireTapMessage); + _defaultTap.Successes.ShouldNotContain(e => e.Message is KeyedWireTapMessage); + } +} + +public record WireTapMessage(string Text); + +public record WireTapFailingMessage; + +public record KeyedWireTapMessage(string Text); + +public class WireTapMessageHandler +{ + public static void Handle(WireTapMessage message) + { + // No-op + } + + public static void Handle(KeyedWireTapMessage message) + { + // No-op + } + + public static void Handle(WireTapFailingMessage message) + { + throw new InvalidOperationException("Intentional failure for wire tap testing"); + } +} + +public class RecordingWireTap : IWireTap +{ + public List Successes { get; } = new(); + public List<(Envelope envelope, Exception exception)> Failures { get; } = new(); + + public ValueTask RecordSuccessAsync(Envelope envelope) + { + Successes.Add(envelope); + return ValueTask.CompletedTask; + } + + public ValueTask RecordFailureAsync(Envelope envelope, Exception exception) + { + Failures.Add((envelope, exception)); + return ValueTask.CompletedTask; + } +} diff --git a/src/Wolverine/Configuration/Endpoint.cs b/src/Wolverine/Configuration/Endpoint.cs index 99530a7a3..1bb89773f 100644 --- a/src/Wolverine/Configuration/Endpoint.cs +++ b/src/Wolverine/Configuration/Endpoint.cs @@ -7,6 +7,7 @@ using JasperFx.Core; using JasperFx.Core.Reflection; using JasperFx.Descriptors; +using Microsoft.Extensions.DependencyInjection; using Microsoft.Extensions.Logging; using Wolverine.ErrorHandling; using Wolverine.Runtime; @@ -283,6 +284,25 @@ public EndpointMode Mode internal IWolverineRuntime? Runtime { get; set; } + /// + /// When true, this endpoint will resolve an from the IoC container + /// to record message success/failure for auditing purposes. + /// + internal bool UseWireTap { get; set; } + + /// + /// Optional keyed service key for resolving a specific + /// implementation from the IoC container. When null, the default (non-keyed) + /// IWireTap registration is used. + /// + internal string? WireTapServiceKey { get; set; } + + /// + /// The resolved wire tap instance, populated during . + /// + [IgnoreDescription] + internal IWireTap? WireTap { get; set; } + /// /// Get or override the default message serializer for just this endpoint /// @@ -444,9 +464,25 @@ public void Compile(IWolverineRuntime runtime) DefaultSerializer ??= runtime.Options.DefaultSerializer; + if (UseWireTap) + { + WireTap = ResolveWireTap(runtime); + } + _hasCompiled = true; } + private IWireTap? ResolveWireTap(IWolverineRuntime runtime) + { + var services = runtime.Services; + if (WireTapServiceKey != null) + { + return services.GetKeyedService(WireTapServiceKey); + } + + return services.GetService(); + } + internal bool ShouldSendMessage(Type messageType) { return Subscriptions.Any(x => x.Matches(messageType)); diff --git a/src/Wolverine/Configuration/IListenerConfiguration.cs b/src/Wolverine/Configuration/IListenerConfiguration.cs index db7a18b13..96662cac8 100644 --- a/src/Wolverine/Configuration/IListenerConfiguration.cs +++ b/src/Wolverine/Configuration/IListenerConfiguration.cs @@ -42,6 +42,23 @@ public interface IEndpointConfiguration /// /// T TelemetryEnabled(bool isEnabled); + + /// + /// Enable wire tap auditing on this endpoint using the default + /// registered in the IoC container. The wire tap will record message success and failure + /// for auditing, compliance, or monitoring purposes. + /// + /// + T UseWireTap(); + + /// + /// Enable wire tap auditing on this endpoint using a keyed + /// service from the IoC container. Use this to vary wire tap implementations + /// across different endpoints. + /// + /// The keyed service identifier used to resolve the IWireTap + /// + T UseWireTap(string serviceKey); } public interface IListenerConfiguration : IEndpointConfiguration diff --git a/src/Wolverine/Configuration/ListenerConfiguration.cs b/src/Wolverine/Configuration/ListenerConfiguration.cs index dedb7e315..9f1b8185e 100644 --- a/src/Wolverine/Configuration/ListenerConfiguration.cs +++ b/src/Wolverine/Configuration/ListenerConfiguration.cs @@ -413,4 +413,20 @@ public TSelf DefaultIncomingMessage(Type messageType) add(e => e.MessageType = messageType); return this.As(); } + + public TSelf UseWireTap() + { + add(e => e.UseWireTap = true); + return this.As(); + } + + public TSelf UseWireTap(string serviceKey) + { + add(e => + { + e.UseWireTap = true; + e.WireTapServiceKey = serviceKey; + }); + return this.As(); + } } diff --git a/src/Wolverine/Configuration/SubscriberConfiguration.cs b/src/Wolverine/Configuration/SubscriberConfiguration.cs index 288ea82f6..0a82da5ab 100644 --- a/src/Wolverine/Configuration/SubscriberConfiguration.cs +++ b/src/Wolverine/Configuration/SubscriberConfiguration.cs @@ -228,6 +228,22 @@ public T ConfigureSending(Action configure) }); return this.As(); } + + public T UseWireTap() + { + add(e => e.UseWireTap = true); + return this.As(); + } + + public T UseWireTap(string serviceKey) + { + add(e => + { + e.UseWireTap = true; + e.WireTapServiceKey = serviceKey; + }); + return this.As(); + } } internal class SubscriberConfiguration : SubscriberConfiguration, diff --git a/src/Wolverine/Envelope.Internals.cs b/src/Wolverine/Envelope.Internals.cs index 099e090ef..d8dabe338 100644 --- a/src/Wolverine/Envelope.Internals.cs +++ b/src/Wolverine/Envelope.Internals.cs @@ -121,6 +121,14 @@ internal Envelope(object message, IMessageSerializer writer) [JsonIgnore] internal bool HasBeenAcked { get; set; } + /// + /// The active wire tap for this envelope, set from the endpoint configuration + /// during message receive or send. Used by the message tracker to record + /// success/failure without coupling the tracking infrastructure to endpoint config. + /// + [JsonIgnore] + internal IWireTap? WireTap { get; set; } + internal void StartTiming() { _startTimestamp = Stopwatch.GetTimestamp(); @@ -174,9 +182,11 @@ public void SetMetricsTag(string tagName, object value) _metricHeaders.Add(new KeyValuePair(tagName, value)); } - internal void MarkReceived(IListener listener, DateTimeOffset now, DurabilitySettings settings) + internal void MarkReceived(IListener listener, DateTimeOffset now, DurabilitySettings settings, + IWireTap? wireTap = null) { Listener = listener; + WireTap = wireTap; // If this is a stream with multiple consumers, use the consumer-specific address if (listener is ISupportMultipleConsumers multiConsumerListener) diff --git a/src/Wolverine/IWireTap.cs b/src/Wolverine/IWireTap.cs new file mode 100644 index 000000000..ef3033bfb --- /dev/null +++ b/src/Wolverine/IWireTap.cs @@ -0,0 +1,27 @@ +namespace Wolverine; + +/// +/// Implement this interface to create a "wire tap" that records a copy of every message +/// flowing through configured endpoints. Useful for auditing, compliance, analytics, +/// or feeding monitoring systems. Register implementations in the IoC container, +/// preferably as Singleton lifetime. Use keyed services to vary implementations +/// by endpoint. +/// +/// IMPORTANT: Implementations must never allow exceptions to escape. Wolverine wraps +/// calls in try/catch as a safety net, but implementations should handle their own +/// errors internally. +/// +public interface IWireTap +{ + /// + /// Called when a message has been successfully handled at a listening endpoint, + /// or when a message has been successfully sent from a sending endpoint. + /// + ValueTask RecordSuccessAsync(Envelope envelope); + + /// + /// Called when message handling fails at a listening endpoint after exhausting + /// all error handling policies. + /// + ValueTask RecordFailureAsync(Envelope envelope, Exception exception); +} diff --git a/src/Wolverine/Runtime/DestinationEndpoint.cs b/src/Wolverine/Runtime/DestinationEndpoint.cs index 9fd203327..3da8c3626 100644 --- a/src/Wolverine/Runtime/DestinationEndpoint.cs +++ b/src/Wolverine/Runtime/DestinationEndpoint.cs @@ -35,7 +35,7 @@ public ValueTask SendAsync(T message, DeliveryOptions? options = null) } var route = _endpoint.RouteFor(message.GetType(), _parent.Runtime); - var envelope = new Envelope(message, _endpoint.Agent!); + var envelope = new Envelope(message, _endpoint.Agent!) { WireTap = _endpoint.WireTap }; if (options != null && options.ContentType.IsNotEmpty() && options.ContentType != envelope.ContentType) { envelope.Serializer = _parent.Runtime.Options.FindSerializer(options.ContentType); @@ -74,7 +74,8 @@ public ValueTask SendRawMessageAsync(byte[] data, Type? messageType = null, Acti var envelope = new Envelope { Data = data, - Sender = _parent.Runtime.Endpoints.GetOrBuildSendingAgent(_endpoint.Uri) + Sender = _parent.Runtime.Endpoints.GetOrBuildSendingAgent(_endpoint.Uri), + WireTap = _endpoint.WireTap }; if (messageType != null) diff --git a/src/Wolverine/Runtime/Routing/MessageRoute.cs b/src/Wolverine/Runtime/Routing/MessageRoute.cs index e2e82881e..62e688f81 100644 --- a/src/Wolverine/Runtime/Routing/MessageRoute.cs +++ b/src/Wolverine/Runtime/Routing/MessageRoute.cs @@ -88,7 +88,8 @@ public Envelope CreateForSending(object message, DeliveryOptions? options, ISend { Serializer = Serializer, ContentType = Serializer?.ContentType, - TopicName = topicName + TopicName = topicName, + WireTap = _endpoint.WireTap }; if (Sender.Endpoint is LocalQueue) @@ -179,7 +180,8 @@ internal async Task RemoteInvokeAsync(object message, MessageBus bus, Canc var envelope = new Envelope(message, Sender) { TenantId = options?.TenantId ?? bus.TenantId, - TopicName = topicName + TopicName = topicName, + WireTap = _endpoint.WireTap }; options?.Override(envelope); diff --git a/src/Wolverine/Runtime/WolverineRuntime.Tracking.cs b/src/Wolverine/Runtime/WolverineRuntime.Tracking.cs index b3f1eed88..44c2fb731 100644 --- a/src/Wolverine/Runtime/WolverineRuntime.Tracking.cs +++ b/src/Wolverine/Runtime/WolverineRuntime.Tracking.cs @@ -77,6 +77,8 @@ public void Sent(Envelope envelope) _sent(Logger, envelope.CorrelationId!, envelope.GetMessageTypeName(), envelope.Id, envelope.Destination?.ToString() ?? string.Empty, null); + + fireWireTapSuccess(envelope); } public void Received(Envelope envelope) @@ -148,6 +150,8 @@ public void MessageSucceeded(Envelope envelope) } ActiveSession?.Record(MessageEventType.MessageSucceeded, envelope, _serviceName, _uniqueNodeId); + + fireWireTapSuccess(envelope); } public void MessageFailed(Envelope envelope, Exception ex) @@ -167,6 +171,8 @@ public void MessageFailed(Envelope envelope, Exception ex) } ActiveSession?.Record(MessageEventType.Sent, envelope, _serviceName, _uniqueNodeId, ex); + + fireWireTapFailure(envelope, ex); } public void NoHandlerFor(Envelope envelope) @@ -222,6 +228,32 @@ public void LogStatus(string message) ActiveSession?.LogStatus(message); } + private void fireWireTapSuccess(Envelope envelope) + { + if (envelope.WireTap == null) return; + try + { + _ = envelope.WireTap.RecordSuccessAsync(envelope); + } + catch (Exception ex) + { + Logger.LogError(ex, "Wire tap failed for envelope {EnvelopeId}", envelope.Id); + } + } + + private void fireWireTapFailure(Envelope envelope, Exception exception) + { + if (envelope.WireTap == null) return; + try + { + _ = envelope.WireTap.RecordFailureAsync(envelope, exception); + } + catch (Exception ex) + { + Logger.LogError(ex, "Wire tap failed for envelope {EnvelopeId}", envelope.Id); + } + } + /// /// Returns true if the destination URI belongs to a Wolverine system endpoint /// (e.g. wolverine.response reply queues or local:// queues) that should not diff --git a/src/Wolverine/Runtime/WorkerQueues/BufferedReceiver.cs b/src/Wolverine/Runtime/WorkerQueues/BufferedReceiver.cs index 27dc6ac51..63d8898df 100644 --- a/src/Wolverine/Runtime/WorkerQueues/BufferedReceiver.cs +++ b/src/Wolverine/Runtime/WorkerQueues/BufferedReceiver.cs @@ -191,7 +191,7 @@ async ValueTask IReceiver.ReceivedAsync(IListener listener, Envelope[] messages) foreach (var envelope in messages) { - envelope.MarkReceived(listener, now, _settings); + envelope.MarkReceived(listener, now, _settings, _endpoint.WireTap); if (!envelope.IsExpired()) { await EnqueueAsync(envelope); @@ -206,7 +206,7 @@ async ValueTask IReceiver.ReceivedAsync(IListener listener, Envelope[] messages) public async ValueTask ReceivedAsync(IListener listener, Envelope envelope) { var now = DateTimeOffset.Now; - envelope.MarkReceived(listener, now, _settings); + envelope.MarkReceived(listener, now, _settings, _endpoint.WireTap); if (envelope.IsExpired()) { diff --git a/src/Wolverine/Runtime/WorkerQueues/DurableReceiver.cs b/src/Wolverine/Runtime/WorkerQueues/DurableReceiver.cs index 7ac1777bd..8e89d0b93 100644 --- a/src/Wolverine/Runtime/WorkerQueues/DurableReceiver.cs +++ b/src/Wolverine/Runtime/WorkerQueues/DurableReceiver.cs @@ -295,7 +295,7 @@ public async ValueTask ReceivedAsync(IListener listener, Envelope envelope) try { var now = DateTimeOffset.UtcNow; - envelope.MarkReceived(listener, now, _settings); + envelope.MarkReceived(listener, now, _settings, _endpoint.WireTap); await _receivingOne.PostAsync(envelope); } @@ -548,7 +548,7 @@ public async ValueTask ProcessReceivedMessagesAsync(DateTimeOffset now, IListene throw new OperationCanceledException(); } - foreach (var envelope in envelopes) envelope.MarkReceived(listener, now, _settings); + foreach (var envelope in envelopes) envelope.MarkReceived(listener, now, _settings, _endpoint.WireTap); var batchSucceeded = false; if (ShouldPersistBeforeProcessing) diff --git a/src/Wolverine/Runtime/WorkerQueues/InlineReceiver.cs b/src/Wolverine/Runtime/WorkerQueues/InlineReceiver.cs index a2381458a..3862fdc6e 100644 --- a/src/Wolverine/Runtime/WorkerQueues/InlineReceiver.cs +++ b/src/Wolverine/Runtime/WorkerQueues/InlineReceiver.cs @@ -117,7 +117,7 @@ private async ValueTask ProcessMessageAsync(IListener listener, Envelope envelop try { - envelope.MarkReceived(listener, DateTimeOffset.UtcNow, _settings); + envelope.MarkReceived(listener, DateTimeOffset.UtcNow, _settings, _endpoint.WireTap); await _pipeline.InvokeAsync(envelope, listener, activity!); _logger.IncomingReceived(envelope, listener.Address);