Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
145 changes: 145 additions & 0 deletions docs/guide/logging.md
Original file line number Diff line number Diff line change
Expand Up @@ -265,6 +265,151 @@ This will extend your log entries to like this:
[09:41:00 INFO] Starting to process IAccountMessage ("018761ad-8ed2-4bc9-bde5-c3cbb643f9f3") with AccountId: "c446fa0b-7496-42a5-b6c8-dd53c65c96c8"
```

## Wire Tap <Badge type="tip" text="5.13" />

Wolverine supports the [Wire Tap](https://www.enterpriseintegrationpatterns.com/patterns/messaging/WireTap.html) pattern
from the Enterprise Integration Patterns book. A wire tap lets you record a copy of every message flowing through
configured endpoints for auditing, compliance, analytics, or monitoring purposes — without affecting the primary
message processing pipeline.

### Defining a Wire Tap

Implement the `IWireTap` interface:

```csharp
public class AuditWireTap : IWireTap
{
private readonly IAuditStore _store;

public AuditWireTap(IAuditStore store)
{
_store = store;
}

public async ValueTask RecordSuccessAsync(Envelope envelope)
{
await _store.RecordAsync(new AuditEntry
{
MessageId = envelope.Id,
MessageType = envelope.MessageType,
Destination = envelope.Destination?.ToString(),
Timestamp = DateTimeOffset.UtcNow,
Succeeded = true
});
}

public async ValueTask RecordFailureAsync(Envelope envelope, Exception exception)
{
await _store.RecordAsync(new AuditEntry
{
MessageId = envelope.Id,
MessageType = envelope.MessageType,
Destination = envelope.Destination?.ToString(),
Timestamp = DateTimeOffset.UtcNow,
Succeeded = false,
ExceptionType = exception.GetType().Name,
ExceptionMessage = exception.Message
});
}
}
```

::: warning
**Implementations must never allow exceptions to escape.** Wolverine wraps wire tap calls in a safety-net `try/catch`,
but if your wire tap throws, the exception will only be logged — it will *not* retry or affect message processing.
Your implementation should handle all errors internally (e.g., log and swallow) to avoid polluting application logs
with wire tap noise.
:::

::: tip
For production wire taps that write to a database or external system, consider using `System.Threading.Channels`
(specifically Wolverine's built-in `BatchingChannel`) to batch the recording operations. This keeps the wire tap
mechanics off the hot path of message handling, improving throughput while batching database writes for efficiency.
:::

### Registering a Wire Tap

Register your `IWireTap` in the IoC container. **Singleton lifetime is strongly recommended** since wire taps are
resolved once per endpoint at startup:

```csharp
using var host = await Host.CreateDefaultBuilder()
.UseWolverine(opts =>
{
// Register a singleton wire tap
opts.Services.AddSingleton<IWireTap, AuditWireTap>();
}).StartAsync();
```

### Enabling Wire Taps on Endpoints

Wire taps must be explicitly enabled on each endpoint — there is no global "enable everywhere" switch. This is
intentional: you should deliberately choose which endpoints need auditing.

```csharp
using var host = await Host.CreateDefaultBuilder()
.UseWolverine(opts =>
{
opts.Services.AddSingleton<IWireTap, AuditWireTap>();

// Enable on a specific listener
opts.ListenToRabbitQueue("incoming").UseWireTap();

// Enable on a specific sender
opts.PublishAllMessages().ToRabbitExchange("outgoing").UseWireTap();

// Enable on a specific local queue
opts.LocalQueue("important").UseWireTap();

// Enable across all external listeners (excludes local queues)
opts.Policies.AllListeners(x => x.UseWireTap());

// Enable across all local queues separately
opts.Policies.AllLocalQueues(x => x.UseWireTap());

// Enable across all sender endpoints
opts.Policies.AllSenders(x => x.UseWireTap());
}).StartAsync();
```

### Using Keyed Wire Taps

If different endpoints need different wire tap implementations (e.g., one endpoint writes to a compliance database
while another sends to a monitoring service), use keyed services:

```csharp
using var host = await Host.CreateDefaultBuilder()
.UseWolverine(opts =>
{
// Register multiple wire tap implementations
opts.Services.AddSingleton<IWireTap, ComplianceWireTap>();
opts.Services.AddKeyedSingleton<IWireTap>("monitoring", new MonitoringWireTap());

// Default wire tap (uses the non-keyed registration)
opts.ListenToRabbitQueue("orders").UseWireTap();

// Specific wire tap by service key
opts.ListenToRabbitQueue("payments").UseWireTap("monitoring");
}).StartAsync();
```

### What Gets Recorded

- **`RecordSuccessAsync`** is called when:
- A message has been successfully handled at a listening endpoint
- A message has been successfully sent from a sending endpoint
- **`RecordFailureAsync`** is called when:
- Message handling fails at a listening endpoint after exhausting all error handling policies (moved to dead letter queue)

### Auditing and Compliance Considerations

For systems with regulatory auditing requirements (SOC 2, HIPAA, PCI-DSS, GDPR):

- Wire taps provide a natural integration point for recording message flow for audit trails
- Combine with Wolverine's [contextual logging and audited members](#contextual-logging-with-audited-members) to include business identifiers in your audit records
- The `Envelope` passed to wire tap methods includes correlation IDs, tenant IDs, and message metadata useful for compliance reporting
- Consider separate wire tap implementations per compliance domain using keyed services

## Open Telemetry

Wolverine also supports the [Open Telemetry](https://opentelemetry.io/docs/instrumentation/net/) standard for distributed tracing. To enable
Expand Down
175 changes: 175 additions & 0 deletions src/Testing/CoreTests/Configuration/wire_tap_configuration.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,175 @@
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Hosting;
using Wolverine.Configuration;
using Wolverine.Tracking;
using Xunit;

namespace CoreTests.Configuration;

public class wire_tap_configuration : IAsyncLifetime
{
private IHost _host = default!;
private readonly RecordingWireTap _wireTap = new();

public async Task InitializeAsync()
{
_host = await Host.CreateDefaultBuilder()
.UseWolverine(opts =>
{
opts.Discovery.DisableConventionalDiscovery()
.IncludeType<WireTapMessageHandler>();

opts.Services.AddSingleton<IWireTap>(_wireTap);

opts.Policies.AllLocalQueues(x => x.UseWireTap());
}).StartAsync();
}

public async Task DisposeAsync()
{
await _host.StopAsync();
_host.Dispose();
}

[Fact]
public async Task wire_tap_records_success_on_message_handled()
{
await _host.SendMessageAndWaitAsync(new WireTapMessage("hello"));

// Give async fire-and-forget wire tap a moment
await Task.Delay(500);

_wireTap.Successes.ShouldContain(e => e.Message is WireTapMessage);
}

[Fact]
public async Task wire_tap_not_called_without_configuration()
{
// Build a second host without wire tap configured
var tap = new RecordingWireTap();
using var host = await Host.CreateDefaultBuilder()
.UseWolverine(opts =>
{
opts.Discovery.DisableConventionalDiscovery()
.IncludeType<WireTapMessageHandler>();

opts.Services.AddSingleton<IWireTap>(tap);
// Notably: no UseWireTap() call
}).StartAsync();

await host.SendMessageAndWaitAsync(new WireTapMessage("no-tap"));
await Task.Delay(500);

tap.Successes.ShouldBeEmpty();
}

[Fact]
public void wire_tap_not_set_on_endpoint_without_configuration()
{
var endpoint = new TestEndpoint(EndpointRole.Application);
endpoint.WireTap.ShouldBeNull();
}

[Fact]
public void use_wire_tap_sets_flag()
{
var endpoint = new TestEndpoint(EndpointRole.Application);
endpoint.UseWireTap = true;
endpoint.UseWireTap.ShouldBeTrue();
}
}

public class keyed_wire_tap_configuration : IAsyncLifetime
{
private IHost _host = default!;
private readonly RecordingWireTap _defaultTap = new();
private readonly RecordingWireTap _specialTap = new();

public async Task InitializeAsync()
{
_host = await Host.CreateDefaultBuilder()
.UseWolverine(opts =>
{
opts.Discovery.DisableConventionalDiscovery()
.IncludeType<WireTapMessageHandler>();

opts.Services.AddSingleton<IWireTap>(_defaultTap);
opts.Services.AddKeyedSingleton<IWireTap>("special", _specialTap);

opts.PublishMessage<WireTapMessage>().ToLocalQueue("default-tapped");
opts.LocalQueue("default-tapped").UseWireTap();

opts.PublishMessage<KeyedWireTapMessage>().ToLocalQueue("special-tapped");
opts.LocalQueue("special-tapped").UseWireTap("special");
}).StartAsync();
}

public async Task DisposeAsync()
{
await _host.StopAsync();
_host.Dispose();
}

[Fact]
public async Task default_wire_tap_is_used_without_key()
{
await _host.SendMessageAndWaitAsync(new WireTapMessage("default"));

await Task.Delay(500);

_defaultTap.Successes.ShouldContain(e => e.Message is WireTapMessage);
}

[Fact]
public async Task keyed_wire_tap_is_used_with_service_key()
{
await _host.SendMessageAndWaitAsync(new KeyedWireTapMessage("special"));

await Task.Delay(500);

_specialTap.Successes.ShouldContain(e => e.Message is KeyedWireTapMessage);
_defaultTap.Successes.ShouldNotContain(e => e.Message is KeyedWireTapMessage);
}
}

public record WireTapMessage(string Text);

public record WireTapFailingMessage;

public record KeyedWireTapMessage(string Text);

public class WireTapMessageHandler
{
public static void Handle(WireTapMessage message)
{
// No-op
}

public static void Handle(KeyedWireTapMessage message)
{
// No-op
}

public static void Handle(WireTapFailingMessage message)
{
throw new InvalidOperationException("Intentional failure for wire tap testing");
}
}

public class RecordingWireTap : IWireTap
{
public List<Envelope> Successes { get; } = new();
public List<(Envelope envelope, Exception exception)> Failures { get; } = new();

public ValueTask RecordSuccessAsync(Envelope envelope)
{
Successes.Add(envelope);
return ValueTask.CompletedTask;
}

public ValueTask RecordFailureAsync(Envelope envelope, Exception exception)
{
Failures.Add((envelope, exception));
return ValueTask.CompletedTask;
}
}
36 changes: 36 additions & 0 deletions src/Wolverine/Configuration/Endpoint.cs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
using JasperFx.Core;
using JasperFx.Core.Reflection;
using JasperFx.Descriptors;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Logging;
using Wolverine.ErrorHandling;
using Wolverine.Runtime;
Expand Down Expand Up @@ -283,6 +284,25 @@ public EndpointMode Mode

internal IWolverineRuntime? Runtime { get; set; }

/// <summary>
/// When true, this endpoint will resolve an <see cref="IWireTap"/> from the IoC container
/// to record message success/failure for auditing purposes.
/// </summary>
internal bool UseWireTap { get; set; }

/// <summary>
/// Optional keyed service key for resolving a specific <see cref="IWireTap"/>
/// implementation from the IoC container. When null, the default (non-keyed)
/// IWireTap registration is used.
/// </summary>
internal string? WireTapServiceKey { get; set; }

/// <summary>
/// The resolved wire tap instance, populated during <see cref="Compile"/>.
/// </summary>
[IgnoreDescription]
internal IWireTap? WireTap { get; set; }

/// <summary>
/// Get or override the default message serializer for just this endpoint
/// </summary>
Expand Down Expand Up @@ -444,9 +464,25 @@ public void Compile(IWolverineRuntime runtime)

DefaultSerializer ??= runtime.Options.DefaultSerializer;

if (UseWireTap)
{
WireTap = ResolveWireTap(runtime);
}

_hasCompiled = true;
}

private IWireTap? ResolveWireTap(IWolverineRuntime runtime)
{
var services = runtime.Services;
if (WireTapServiceKey != null)
{
return services.GetKeyedService<IWireTap>(WireTapServiceKey);
}

return services.GetService<IWireTap>();
}

internal bool ShouldSendMessage(Type messageType)
{
return Subscriptions.Any(x => x.Matches(messageType));
Expand Down
Loading
Loading