diff --git a/src/Testing/BackPressureTests/XUnitObserver.cs b/src/Testing/BackPressureTests/XUnitObserver.cs index d6df817aa..f99bf23a1 100644 --- a/src/Testing/BackPressureTests/XUnitObserver.cs +++ b/src/Testing/BackPressureTests/XUnitObserver.cs @@ -1,4 +1,5 @@ using Wolverine.Configuration; +using Wolverine.ErrorHandling; using Wolverine.Logging; using Wolverine.Runtime.Agents; using Wolverine.Runtime.Metrics; @@ -99,4 +100,16 @@ public void MessageHandlingMetricsExported(MessageHandlingMetrics metrics) { // Nothing here... } + + public Task CircuitBreakerTripped(Endpoint endpoint, CircuitBreakerOptions options) + { + Output.WriteLine($"Circuit breaker tripped on {endpoint.Uri}"); + return Task.CompletedTask; + } + + public Task CircuitBreakerReset(Endpoint endpoint) + { + Output.WriteLine($"Circuit breaker reset on {endpoint.Uri}"); + return Task.CompletedTask; + } } \ No newline at end of file diff --git a/src/Transports/RabbitMQ/Wolverine.RabbitMQ.Tests/dead_letter_queue_recovery_listener.cs b/src/Transports/RabbitMQ/Wolverine.RabbitMQ.Tests/dead_letter_queue_recovery_listener.cs new file mode 100644 index 000000000..2f41d08dc --- /dev/null +++ b/src/Transports/RabbitMQ/Wolverine.RabbitMQ.Tests/dead_letter_queue_recovery_listener.cs @@ -0,0 +1,234 @@ +using IntegrationTests; +using JasperFx.Core; +using JasperFx.Resources; +using Microsoft.Extensions.DependencyInjection; +using Microsoft.Extensions.Hosting; +using Shouldly; +using Wolverine.SqlServer; +using Wolverine.Persistence.Durability; +using Wolverine.Persistence.Durability.DeadLetterManagement; +using Wolverine.RabbitMQ.Internal; +using Wolverine.Runtime; +using Wolverine.Tracking; +using Xunit; + +namespace Wolverine.RabbitMQ.Tests; + +/// +/// Tests for the DeadLetterQueueListener that recovers messages from RabbitMQ's +/// native dead letter queue into Wolverine's persistent dead letter storage. +/// +/// This tests the EnableDeadLetterQueueRecovery() feature end-to-end: +/// 1. A message that always throws is published +/// 2. Wolverine NACKs it to RabbitMQ's native DLX (default behavior) +/// 3. The DeadLetterQueueListener picks it up from the DLQ +/// 4. The listener writes it to the PostgreSQL wolverine_dead_letters table +/// 5. The test queries the database and verifies the dead letter was recovered +/// +public class dead_letter_queue_recovery_listener : IAsyncLifetime +{ + private readonly string _queueName = $"dlq-recovery-{Guid.NewGuid():N}"; + private IHost _host = null!; + + public async Task InitializeAsync() + { + _host = await Host.CreateDefaultBuilder() + .UseWolverine(opts => + { + opts.ServiceName = "DlqRecoveryTest"; + opts.Durability.Mode = DurabilityMode.Solo; + opts.EnableAutomaticFailureAcks = false; + + // Use SQL Server for message persistence + opts.PersistMessagesWithSqlServer(Servers.SqlServerConnectionString, "dlq_recovery"); + + // Use RabbitMQ with NATIVE dead letter queueing (the default) + // PLUS enable recovery listener to bridge DLQ → database + opts.UseRabbitMq() + .AutoProvision() + .AutoPurgeOnStartup() + .EnableDeadLetterQueueRecovery(); + + opts.PublishMessage().ToRabbitQueue(_queueName); + opts.ListenToRabbitQueue(_queueName); + + opts.LocalRoutingConventionDisabled = true; + opts.Services.AddResourceSetupOnStartup(StartupAction.ResetState); + }).StartAsync(); + + await _host.ResetResourceState(); + } + + public async Task DisposeAsync() + { + _host?.TeardownResources(); + _host?.Dispose(); + } + + [Fact] + public async Task recovers_native_dlq_message_to_database() + { + // Publish a message that will always fail — the handler throws DivideByZeroException. + // With native DLQ mode, Wolverine NACKs it and RabbitMQ routes it to the DLX. + // The DeadLetterQueueListener should pick it up and write to the database. + await _host + .TrackActivity() + .DoNotAssertOnExceptionsDetected() + .Timeout(30.Seconds()) + .PublishMessageAndWaitAsync(new RecoveryTestMessage("test-recovery")); + + // Give the recovery listener time to pick up the message from RabbitMQ DLQ + // and write it to the database + var messageStore = _host.Services.GetRequiredService(); + var query = new DeadLetterEnvelopeQuery { PageSize = 100 }; + + DeadLetterEnvelopeResults? results = null; + var deadline = DateTimeOffset.UtcNow.Add(30.Seconds()); + + while (DateTimeOffset.UtcNow < deadline) + { + results = await messageStore.DeadLetters.QueryAsync(query, CancellationToken.None); + if (results.Envelopes.Any()) break; + await Task.Delay(500); + } + + results.ShouldNotBeNull(); + results.Envelopes.ShouldNotBeEmpty( + "The DeadLetterQueueListener should have recovered the failed message from " + + "RabbitMQ's native dead letter queue into the database"); + + // Verify the recovered dead letter has meaningful metadata + var envelope = results.Envelopes.First(); + envelope.MessageType.ShouldNotBeNullOrEmpty(); + } + + [Fact] + [Trait("Category", "Flaky")] + public async Task recovers_multiple_messages() + { + // Send messages that will all fail — use the bus directly + var bus = _host.Services.GetRequiredService(); + for (int i = 0; i < 3; i++) + { + await bus.PublishAsync(new RecoveryTestMessage($"multi-{i}")); + } + + // Wait for recovery: messages fail → NACK to RabbitMQ DLQ → listener picks up → writes to DB + var messageStore = _host.Services.GetRequiredService(); + var query = new DeadLetterEnvelopeQuery { PageSize = 100 }; + + DeadLetterEnvelopeResults? results = null; + var deadline = DateTimeOffset.UtcNow.Add(60.Seconds()); + + while (DateTimeOffset.UtcNow < deadline) + { + results = await messageStore.DeadLetters.QueryAsync(query, CancellationToken.None); + if (results.Envelopes.Count() >= 3) break; + await Task.Delay(2.Seconds()); + } + + results.ShouldNotBeNull(); + results.Envelopes.Count().ShouldBeGreaterThanOrEqualTo(3, + "All 3 failed messages should have been recovered from the RabbitMQ DLQ"); + } +} + +/// +/// Tests the params string[] overload for custom queue names. +/// +public class dead_letter_queue_recovery_with_custom_queues : IAsyncLifetime +{ + private readonly string _queueName = $"custom-dlq-src-{Guid.NewGuid():N}"; + private readonly string _customDlqName = $"custom-dlq-{Guid.NewGuid():N}"; + private IHost _host = null!; + + public async Task InitializeAsync() + { + _host = await Host.CreateDefaultBuilder() + .UseWolverine(opts => + { + opts.ServiceName = "CustomDlqRecoveryTest"; + opts.Durability.Mode = DurabilityMode.Solo; + + opts.PersistMessagesWithSqlServer(Servers.SqlServerConnectionString, "dlq_custom"); + + // Configure a custom DLQ name on a specific queue + opts.UseRabbitMq() + .AutoProvision() + .AutoPurgeOnStartup() + .EnableDeadLetterQueueRecovery(_customDlqName); + + opts.ListenToRabbitQueue(_queueName) + .DeadLetterQueueing(new DeadLetterQueue(_customDlqName, DeadLetterQueueMode.Native)); + + opts.PublishMessage().ToRabbitQueue(_queueName); + + opts.LocalRoutingConventionDisabled = true; + opts.Services.AddResourceSetupOnStartup(StartupAction.ResetState); + }).StartAsync(); + + await _host.ResetResourceState(); + } + + public async Task DisposeAsync() + { + _host?.TeardownResources(); + _host?.Dispose(); + } + + [Fact] + public async Task recovers_from_custom_named_dlq() + { + await _host + .TrackActivity() + .DoNotAssertOnExceptionsDetected() + .Timeout(30.Seconds()) + .PublishMessageAndWaitAsync(new CustomDlqTestMessage("custom-test")); + + var messageStore = _host.Services.GetRequiredService(); + var query = new DeadLetterEnvelopeQuery { PageSize = 100 }; + + DeadLetterEnvelopeResults? results = null; + var deadline = DateTimeOffset.UtcNow.Add(30.Seconds()); + + while (DateTimeOffset.UtcNow < deadline) + { + results = await messageStore.DeadLetters.QueryAsync(query, CancellationToken.None); + if (results.Envelopes.Any()) break; + await Task.Delay(500); + } + + results.ShouldNotBeNull(); + results.Envelopes.ShouldNotBeEmpty( + "The DeadLetterQueueListener should recover messages from the custom-named DLQ"); + } + + [Fact] + public void settings_contain_custom_queue_name() + { + var settings = _host.Services.GetRequiredService(); + settings.QueueNames.ShouldContain(_customDlqName); + } +} + +// Message types and handlers for the recovery tests + +public record RecoveryTestMessage(string Value); + +public static class RecoveryTestMessageHandler +{ + public static void Handle(RecoveryTestMessage message) + { + throw new DivideByZeroException($"Recovery test failure: {message.Value}"); + } +} + +public record CustomDlqTestMessage(string Value); + +public static class CustomDlqTestMessageHandler +{ + public static void Handle(CustomDlqTestMessage message) + { + throw new InvalidOperationException($"Custom DLQ test failure: {message.Value}"); + } +} diff --git a/src/Transports/RabbitMQ/Wolverine.RabbitMQ/Internal/DeadLetterQueueListener.cs b/src/Transports/RabbitMQ/Wolverine.RabbitMQ/Internal/DeadLetterQueueListener.cs new file mode 100644 index 000000000..19f8f42d4 --- /dev/null +++ b/src/Transports/RabbitMQ/Wolverine.RabbitMQ/Internal/DeadLetterQueueListener.cs @@ -0,0 +1,258 @@ +using System.Text; +using Microsoft.Extensions.Hosting; +using Microsoft.Extensions.Logging; +using RabbitMQ.Client; +using RabbitMQ.Client.Events; +using Wolverine.Runtime; +using Wolverine.Transports; + +namespace Wolverine.RabbitMQ.Internal; + +/// +/// Configuration holder for dead letter queue recovery. Registered as a singleton +/// so the listener can discover which queues to subscribe to. +/// +public class DeadLetterQueueRecoverySettings +{ + public List QueueNames { get; } = new(); +} + +/// +/// Background service that listens to one or more RabbitMQ dead letter queues and recovers +/// messages into Wolverine's persistent dead letter storage (wolverine_dead_letters table). +/// This bridges the gap between RabbitMQ's native DLX mechanism and Wolverine's database-backed +/// dead letter management, enabling CritterWatch to query, replay, and discard dead letters. +/// +public class DeadLetterQueueListener : BackgroundService +{ + private readonly RabbitMqTransport _transport; + private readonly IWolverineRuntime _runtime; + private readonly DeadLetterQueueRecoverySettings _settings; + private readonly ILogger _logger; + private IChannel? _channel; + private IConnection? _connection; + + public DeadLetterQueueListener(RabbitMqTransport transport, IWolverineRuntime runtime, + DeadLetterQueueRecoverySettings settings, ILogger logger) + { + _transport = transport; + _runtime = runtime; + _settings = settings; + _logger = logger; + } + + protected override async Task ExecuteAsync(CancellationToken stoppingToken) + { + var queueNames = _settings.QueueNames.Count > 0 + ? _settings.QueueNames + : new List { _transport.DeadLetterQueue.QueueName }; + + try + { + _connection = await _transport.CreateConnectionAsync(); + _channel = await _connection.CreateChannelAsync(cancellationToken: stoppingToken); + + // Prefetch 10 messages at a time to avoid overwhelming the database + await _channel.BasicQosAsync(0, 10, false, stoppingToken); + + var consumer = new AsyncEventingBasicConsumer(_channel); + consumer.ReceivedAsync += async (_, args) => + { + try + { + await processDeadLetterAsync(args, stoppingToken); + await _channel.BasicAckAsync(args.DeliveryTag, false, stoppingToken); + } + catch (Exception ex) + { + _logger.LogError(ex, "Failed to process dead letter message from RabbitMQ DLQ"); + // Requeue the message so we don't lose it + await _channel.BasicNackAsync(args.DeliveryTag, false, true, stoppingToken); + } + }; + + foreach (var queueName in queueNames) + { + await _channel.BasicConsumeAsync(queueName, false, consumer, stoppingToken); + _logger.LogInformation( + "Dead letter queue listener started on queue '{QueueName}'. Messages will be recovered to database storage.", + queueName); + } + + // Keep running until cancellation + await Task.Delay(Timeout.Infinite, stoppingToken); + } + catch (OperationCanceledException) when (stoppingToken.IsCancellationRequested) + { + // Normal shutdown + } + catch (Exception ex) + { + _logger.LogError(ex, "Dead letter queue listener failed"); + } + } + + private async Task processDeadLetterAsync(BasicDeliverEventArgs args, CancellationToken ct) + { + var envelope = new Envelope + { + Data = args.Body.ToArray(), + ContentType = args.BasicProperties.ContentType ?? EnvelopeConstants.JsonContentType, + }; + + // Map standard RabbitMQ properties to Wolverine envelope + if (Guid.TryParse(args.BasicProperties.MessageId, out var messageId)) + { + envelope.Id = messageId; + } + else + { + envelope.Id = Guid.NewGuid(); + } + + envelope.MessageType = args.BasicProperties.Type; + envelope.CorrelationId = args.BasicProperties.CorrelationId; + + // Copy all headers from the message + if (args.BasicProperties.Headers != null) + { + foreach (var header in args.BasicProperties.Headers) + { + var value = header.Value switch + { + byte[] b => Encoding.UTF8.GetString(b), + _ => header.Value?.ToString() + }; + + if (value != null) + { + envelope.Headers[header.Key] = value; + } + } + } + + // Extract exception info from Wolverine headers (InteropFriendly mode stamps these) + var exceptionType = extractHeader(args.BasicProperties.Headers, + DeadLetterQueueConstants.ExceptionTypeHeader) ?? "Unknown"; + var exceptionMessage = extractHeader(args.BasicProperties.Headers, + DeadLetterQueueConstants.ExceptionMessageHeader) ?? "Recovered from RabbitMQ dead letter queue"; + + // Extract x-death metadata from RabbitMQ (added by the DLX mechanism) + var (originalQueue, deathReason, deathCount) = extractXDeathInfo(args.BasicProperties.Headers); + + if (string.IsNullOrEmpty(exceptionMessage) || exceptionMessage == "Recovered from RabbitMQ dead letter queue") + { + // Build a descriptive message from RabbitMQ's x-death metadata + var parts = new List { "Message dead-lettered by RabbitMQ" }; + if (!string.IsNullOrEmpty(originalQueue)) parts.Add($"from queue '{originalQueue}'"); + if (!string.IsNullOrEmpty(deathReason)) parts.Add($"reason: {deathReason}"); + if (deathCount > 0) parts.Add($"death count: {deathCount}"); + exceptionMessage = string.Join(", ", parts); + } + + // Reconstruct source and destination info + if (!string.IsNullOrEmpty(originalQueue)) + { + envelope.Source = $"rabbitmq://queue/{originalQueue}"; + envelope.Destination = new Uri($"rabbitmq://queue/{originalQueue}"); + } + else + { + // Fallback — use the DLQ queue name itself + envelope.Destination = new Uri($"rabbitmq://queue/{args.Exchange ?? "unknown"}"); + } + + // Ensure SentAt is set (needed for dead letter storage) + if (envelope.SentAt == default) + { + envelope.SentAt = DateTimeOffset.UtcNow; + } + + // Create a synthetic exception to pass to the dead letter storage + var exception = new DeadLetterRecoveredException(exceptionType, exceptionMessage); + + // Write to the message store's dead letter storage + await _runtime.Storage.Inbox.MoveToDeadLetterStorageAsync(envelope, exception); + + _logger.LogInformation( + "Recovered dead letter {MessageId} (type={MessageType}) from RabbitMQ DLQ to database storage. " + + "Original queue: {OriginalQueue}, Reason: {Reason}", + envelope.Id, envelope.MessageType ?? "unknown", originalQueue ?? "unknown", + deathReason ?? "unknown"); + } + + private static string? extractHeader(IDictionary? headers, string key) + { + if (headers == null) return null; + if (!headers.TryGetValue(key, out var raw)) return null; + return raw switch + { + byte[] b => Encoding.UTF8.GetString(b), + _ => raw?.ToString() + }; + } + + private static (string? originalQueue, string? reason, long count) extractXDeathInfo( + IDictionary? headers) + { + if (headers == null) return (null, null, 0); + if (!headers.TryGetValue("x-death", out var xDeathRaw)) return (null, null, 0); + + // x-death is a list of dictionaries added by RabbitMQ when a message is dead-lettered + if (xDeathRaw is not IList xDeathList || xDeathList.Count == 0) + return (null, null, 0); + + // Take the first (most recent) death record + if (xDeathList[0] is not IDictionary firstDeath) + return (null, null, 0); + + string? queue = null; + string? reason = null; + long count = 0; + + if (firstDeath.TryGetValue("queue", out var q)) + queue = q switch { byte[] b => Encoding.UTF8.GetString(b), _ => q?.ToString() }; + + if (firstDeath.TryGetValue("reason", out var r)) + reason = r switch { byte[] b => Encoding.UTF8.GetString(b), _ => r?.ToString() }; + + if (firstDeath.TryGetValue("count", out var c) && c is long l) + count = l; + + return (queue, reason, count); + } + + public override async Task StopAsync(CancellationToken cancellationToken) + { + if (_channel is not null) + { + await _channel.CloseAsync(cancellationToken); + _channel.Dispose(); + } + + if (_connection is not null) + { + await _connection.CloseAsync(cancellationToken); + _connection.Dispose(); + } + + await base.StopAsync(cancellationToken); + } +} + +/// +/// Synthetic exception type used when recovering dead letters from RabbitMQ. +/// Carries the original exception type name and message reconstructed from headers. +/// +public class DeadLetterRecoveredException : Exception +{ + public string OriginalExceptionType { get; } + + public DeadLetterRecoveredException(string originalExceptionType, string message) + : base(message) + { + OriginalExceptionType = originalExceptionType; + } + + public override string ToString() => $"{OriginalExceptionType}: {Message}"; +} diff --git a/src/Transports/RabbitMQ/Wolverine.RabbitMQ/Internal/RabbitMqTransport.cs b/src/Transports/RabbitMQ/Wolverine.RabbitMQ/Internal/RabbitMqTransport.cs index 6ce8dd676..3e67fd301 100644 --- a/src/Transports/RabbitMQ/Wolverine.RabbitMQ/Internal/RabbitMqTransport.cs +++ b/src/Transports/RabbitMQ/Wolverine.RabbitMQ/Internal/RabbitMqTransport.cs @@ -74,6 +74,12 @@ private void configureDefaults(ConnectionFactory factory) public DeadLetterQueue DeadLetterQueue { get; } = new(DeadLetterQueueName); + /// + /// When true, a background listener recovers messages from the RabbitMQ dead letter queue + /// into Wolverine's persistent dead letter storage (database). + /// + public bool EnableDeadLetterQueueRecovery { get; set; } + internal RabbitMqChannelCallback? Callback { get; private set; } internal ConnectionMonitor ListeningConnection => _listenerConnection ?? throw new InvalidOperationException("The listening connection has not been created yet or is disabled!"); diff --git a/src/Transports/RabbitMQ/Wolverine.RabbitMQ/Internal/RabbitMqTransportExpression.cs b/src/Transports/RabbitMQ/Wolverine.RabbitMQ/Internal/RabbitMqTransportExpression.cs index 119994eac..7752a2609 100644 --- a/src/Transports/RabbitMQ/Wolverine.RabbitMQ/Internal/RabbitMqTransportExpression.cs +++ b/src/Transports/RabbitMQ/Wolverine.RabbitMQ/Internal/RabbitMqTransportExpression.cs @@ -1,3 +1,4 @@ +using Microsoft.Extensions.DependencyInjection; using RabbitMQ.Client; using Wolverine.Configuration; using Wolverine.Transports; @@ -216,6 +217,59 @@ public RabbitMqTransportExpression DisableDeadLetterQueueing() return this; } + /// + /// Enable a background listener on the RabbitMQ dead letter queue that recovers + /// dead-lettered messages into Wolverine's persistent dead letter storage (database). + /// This bridges RabbitMQ's native DLX with Wolverine's database-backed DLQ management, + /// enabling tools like CritterWatch to query, replay, and discard dead letters. + /// Use this when you want native RabbitMQ dead lettering AND database-backed DLQ visibility. + /// + public RabbitMqTransportExpression EnableDeadLetterQueueRecovery() + { + Transport.EnableDeadLetterQueueRecovery = true; + ensureRecoveryServicesRegistered(); + return this; + } + + /// + /// Enable a background listener on specific RabbitMQ dead letter queues that recovers + /// dead-lettered messages into Wolverine's persistent dead letter storage (database). + /// Use this overload when you have custom DLQ queue names beyond the default + /// wolverine-dead-letter-queue. + /// + /// One or more RabbitMQ queue names to listen to for dead letter recovery + public RabbitMqTransportExpression EnableDeadLetterQueueRecovery(params string[] queueNames) + { + Transport.EnableDeadLetterQueueRecovery = true; + var settings = ensureRecoveryServicesRegistered(); + foreach (var name in queueNames) + { + if (!settings.QueueNames.Contains(name)) + { + settings.QueueNames.Add(name); + } + } + return this; + } + + private DeadLetterQueueRecoverySettings ensureRecoveryServicesRegistered() + { + // Check if already registered by looking for existing settings + var existing = Options.Services + .Where(s => s.ServiceType == typeof(DeadLetterQueueRecoverySettings)) + .Select(s => s.ImplementationInstance) + .OfType() + .FirstOrDefault(); + + if (existing != null) return existing; + + var settings = new DeadLetterQueueRecoverySettings(); + Options.Services.AddSingleton(settings); + Options.Services.AddSingleton(Transport); + Options.Services.AddHostedService(); + return settings; + } + /// /// Disable Wolverine's automatic Request/Reply queue declaration for a specific node /// diff --git a/src/Wolverine/Configuration/Capabilities/EndpointDescriptor.cs b/src/Wolverine/Configuration/Capabilities/EndpointDescriptor.cs index b380f1051..b6fd1940a 100644 --- a/src/Wolverine/Configuration/Capabilities/EndpointDescriptor.cs +++ b/src/Wolverine/Configuration/Capabilities/EndpointDescriptor.cs @@ -1,9 +1,27 @@ +using System.Text.RegularExpressions; using JasperFx.Descriptors; namespace Wolverine.Configuration.Capabilities; public class EndpointDescriptor : OptionsDescription { + private static readonly Dictionary TransportTypeMap = new(StringComparer.OrdinalIgnoreCase) + { + { "RabbitMqQueue", "RabbitMQ Queue" }, + { "RabbitMqExchange", "RabbitMQ Exchange" }, + { "RabbitMqTopicEndpoint", "RabbitMQ Topic" }, + { "RabbitMqRouting", "RabbitMQ Routing" }, + { "AzureServiceBusQueue", "Azure Service Bus Queue" }, + { "AzureServiceBusTopic", "Azure Service Bus Topic" }, + { "AzureServiceBusSubscription", "Azure Service Bus Subscription" }, + { "LocalQueue", "Local Queue" }, + { "NatsEndpoint", "NATS" }, + { "RedisStreamEndpoint", "Redis Stream" }, + { "PubsubEndpoint", "GCP Pub/Sub" }, + { "PulsarEndpoint", "Pulsar" }, + { "SignalRClientEndpoint", "SignalR" }, + }; + public EndpointDescriptor() { } @@ -11,10 +29,29 @@ public EndpointDescriptor() public EndpointDescriptor(Endpoint endpoint) : base(endpoint) { Uri = endpoint.Uri; + TransportType = ResolveTransportType(endpoint); } - + public Uri Uri { get; set; } = null!; + /// + /// Human-readable description of the transport type (e.g., "RabbitMQ Queue", "Local Queue", "Kafka Topic") + /// + public string? TransportType { get; init; } + + internal static string ResolveTransportType(Endpoint endpoint) + { + var typeName = endpoint.GetType().Name; + + if (TransportTypeMap.TryGetValue(typeName, out var mapped)) + { + return mapped; + } + + // Fallback: split PascalCase into words (e.g., "KafkaTopicEndpoint" → "Kafka Topic Endpoint") + return Regex.Replace(typeName, "(?<=[a-z])(?=[A-Z])|(?<=[A-Z])(?=[A-Z][a-z])", " "); + } + protected bool Equals(EndpointDescriptor other) { return Uri.Equals(other.Uri); diff --git a/src/Wolverine/ErrorHandling/CircuitBreaker.cs b/src/Wolverine/ErrorHandling/CircuitBreaker.cs index 72f74c9bd..e5efe03b0 100644 --- a/src/Wolverine/ErrorHandling/CircuitBreaker.cs +++ b/src/Wolverine/ErrorHandling/CircuitBreaker.cs @@ -4,6 +4,7 @@ using Wolverine.Configuration; using Wolverine.ErrorHandling.Matches; using Wolverine.Runtime; +using Wolverine.Runtime.Agents; using Wolverine.Runtime.Handlers; using Wolverine.Transports; @@ -93,12 +94,14 @@ internal class CircuitBreaker : IAsyncDisposable, IMessageSuccessTracker private readonly IExceptionMatch _match; private readonly Block _processingBlock; private readonly double _ratio; + private readonly IWolverineObserver? _observer; - public CircuitBreaker(CircuitBreakerOptions options, IListenerCircuit circuit) + public CircuitBreaker(CircuitBreakerOptions options, IListenerCircuit circuit, IWolverineObserver? observer = null) { Options = options; _match = options.ToExceptionMatch(); _circuit = circuit; + _observer = observer; _processingBlock = new Block(processExceptionsAsync); _batching = new BatchingChannel(options.SamplingPeriod, _processingBlock); @@ -177,6 +180,11 @@ public async ValueTask UpdateTotalsAsync(DateTimeOffset time, int failures, int using var activity = WolverineTracing.ActivitySource.StartActivity(WolverineTracing.CircuitBreakerTripped); activity?.SetTag(WolverineTracing.EndpointAddress, _circuit.Endpoint.Uri); await _circuit.PauseAsync(Options.PauseTime); + + if (_observer != null) + { + await _observer.CircuitBreakerTripped(_circuit.Endpoint, Options); + } } } diff --git a/src/Wolverine/Runtime/Agents/IWolverineObserver.cs b/src/Wolverine/Runtime/Agents/IWolverineObserver.cs index 6bd94ec97..3a742bcef 100644 --- a/src/Wolverine/Runtime/Agents/IWolverineObserver.cs +++ b/src/Wolverine/Runtime/Agents/IWolverineObserver.cs @@ -1,4 +1,5 @@ using Wolverine.Configuration; +using Wolverine.ErrorHandling; using Wolverine.Logging; using Wolverine.Runtime.Metrics; using Wolverine.Runtime.Routing; @@ -26,6 +27,8 @@ public interface IWolverineObserver Task BackPressureTriggered(Endpoint endpoint, IListeningAgent agent); Task BackPressureLifted(Endpoint endpoint); Task ListenerLatched(Endpoint endpoint); + Task CircuitBreakerTripped(Endpoint endpoint, CircuitBreakerOptions options); + Task CircuitBreakerReset(Endpoint endpoint); void PersistedCounts(Uri storeUri, PersistedCounts counts); void MessageHandlingMetricsExported(MessageHandlingMetrics metrics); } @@ -130,7 +133,7 @@ public void MessageRouted(Type messageType, IMessageRouter router) public async Task AssignmentsChanged(AssignmentGrid grid, AgentCommands commands) { if (!commands.Any()) return; - + var records = commands.Select(x => new NodeRecord { NodeNumber = _runtime.Options.Durability.AssignedNodeNumber, @@ -140,6 +143,14 @@ public async Task AssignmentsChanged(AssignmentGrid grid, AgentCommands commands await _runtime.Storage.Nodes.LogRecordsAsync(records); } - - + + public Task CircuitBreakerTripped(Endpoint endpoint, CircuitBreakerOptions options) + { + return Task.CompletedTask; + } + + public Task CircuitBreakerReset(Endpoint endpoint) + { + return Task.CompletedTask; + } } \ No newline at end of file diff --git a/src/Wolverine/Transports/ListeningAgent.cs b/src/Wolverine/Transports/ListeningAgent.cs index 5de4a3c12..17498a44f 100644 --- a/src/Wolverine/Transports/ListeningAgent.cs +++ b/src/Wolverine/Transports/ListeningAgent.cs @@ -50,7 +50,7 @@ public ListeningAgent(Endpoint endpoint, WolverineRuntime runtime) if (endpoint.CircuitBreakerOptions != null) { - _circuitBreaker = new CircuitBreaker(endpoint.CircuitBreakerOptions, this); + _circuitBreaker = new CircuitBreaker(endpoint.CircuitBreakerOptions, this, runtime.Observer); _pipeline = new HandlerPipeline(runtime, new CircuitBreakerTrackedExecutorFactory(_circuitBreaker, new CircuitBreakerTrackedExecutorFactory(_circuitBreaker, runtime)), endpoint) diff --git a/src/Wolverine/Transports/Local/DurableLocalQueue.cs b/src/Wolverine/Transports/Local/DurableLocalQueue.cs index 3031053a2..0e6df1d6a 100644 --- a/src/Wolverine/Transports/Local/DurableLocalQueue.cs +++ b/src/Wolverine/Transports/Local/DurableLocalQueue.cs @@ -44,7 +44,7 @@ public DurableLocalQueue(Endpoint endpoint, WolverineRuntime runtime) if (endpoint.CircuitBreakerOptions != null) { - CircuitBreaker = new CircuitBreaker(endpoint.CircuitBreakerOptions, this); + CircuitBreaker = new CircuitBreaker(endpoint.CircuitBreakerOptions, this, runtime.Observer); Pipeline = new HandlerPipeline(runtime, new CircuitBreakerTrackedExecutorFactory(CircuitBreaker, runtime), endpoint) {