diff --git a/src/Testing/CoreTests/Transports/GloballyLatchedListenerTests.cs b/src/Testing/CoreTests/Transports/GloballyLatchedListenerTests.cs index 299aceaa2..453aded47 100644 --- a/src/Testing/CoreTests/Transports/GloballyLatchedListenerTests.cs +++ b/src/Testing/CoreTests/Transports/GloballyLatchedListenerTests.cs @@ -13,25 +13,32 @@ namespace CoreTests.Transports; -public class GloballyLatchedListenerTests : IDisposable +public class GloballyLatchedListenerTests : IAsyncLifetime { private readonly int _port; - private readonly IHost _host; + private IHost _host = null!; public GloballyLatchedListenerTests() { _port = PortFinder.GetAvailablePort(); + } - _host = WolverineHost.For(opts => + public async Task InitializeAsync() + { + _host = await WolverineHost.ForAsync(opts => { opts.Durability.Mode = DurabilityMode.Solo; opts.ListenAtPort(_port).Named("latching-test"); }); } - public void Dispose() + public async Task DisposeAsync() { - _host?.Dispose(); + if (_host != null) + { + await _host.StopAsync(); + _host.Dispose(); + } } [Fact] diff --git a/src/Transports/AWS/Wolverine.AmazonSqs/Internal/AmazonSqsTransport.cs b/src/Transports/AWS/Wolverine.AmazonSqs/Internal/AmazonSqsTransport.cs index 5a48455e1..01861cbd4 100644 --- a/src/Transports/AWS/Wolverine.AmazonSqs/Internal/AmazonSqsTransport.cs +++ b/src/Transports/AWS/Wolverine.AmazonSqs/Internal/AmazonSqsTransport.cs @@ -145,6 +145,11 @@ public override async ValueTask ConnectAsync(IWolverineRuntime runtime) } } + public WolverineTransportHealthCheck BuildHealthCheck(IWolverineRuntime runtime) + { + return new SqsHealthCheck(this); + } + internal async Task CleanupOrphanedSystemQueuesAsync(IWolverineRuntime runtime) { var logger = runtime.LoggerFactory.CreateLogger(); diff --git a/src/Transports/AWS/Wolverine.AmazonSqs/Internal/SqsHealthCheck.cs b/src/Transports/AWS/Wolverine.AmazonSqs/Internal/SqsHealthCheck.cs new file mode 100644 index 000000000..32207d9bd --- /dev/null +++ b/src/Transports/AWS/Wolverine.AmazonSqs/Internal/SqsHealthCheck.cs @@ -0,0 +1,81 @@ +using Wolverine.Transports; + +namespace Wolverine.AmazonSqs.Internal; + +internal class SqsHealthCheck : WolverineTransportHealthCheck +{ + private readonly AmazonSqsTransport _transport; + + public SqsHealthCheck(AmazonSqsTransport transport) => _transport = transport; + + public override string TransportName => "Amazon SQS"; + public override string Protocol => "sqs"; + + public override async Task CheckHealthAsync(CancellationToken cancellationToken = default) + { + var status = TransportHealthStatus.Healthy; + string? message = null; + var data = new Dictionary(); + + var client = _transport.Client; + data["HasClient"] = client != null; + + if (client == null) + { + return new TransportHealthResult(TransportName, Protocol, TransportHealthStatus.Degraded, + "SQS client not yet initialized", DateTimeOffset.UtcNow, data); + } + + try + { + // Lightweight probe: list queues with a limit + var response = await client.ListQueuesAsync("wolverine", cancellationToken); + data["QueueCount"] = response.QueueUrls?.Count ?? 0; + } + catch (Exception ex) + { + status = TransportHealthStatus.Unhealthy; + message = $"SQS connectivity check failed: {ex.Message}"; + } + + return new TransportHealthResult(TransportName, Protocol, status, message, + DateTimeOffset.UtcNow, data); + } + + public override async Task GetBrokerQueueDepthAsync(Uri endpointUri, + CancellationToken cancellationToken = default) + { + if (endpointUri.Scheme != "sqs") return null; + + var client = _transport.Client; + if (client == null) return null; + + var queueName = endpointUri.Segments.LastOrDefault()?.TrimEnd('/'); + if (string.IsNullOrEmpty(queueName)) return null; + + try + { + // Find the queue URL + var endpoint = _transport.Queues[queueName]; + var queueUrl = endpoint?.QueueUrl; + if (queueUrl == null) return null; + + var response = await client.GetQueueAttributesAsync( + queueUrl, + new List { "ApproximateNumberOfMessages" }, + cancellationToken); + + if (response.Attributes.TryGetValue("ApproximateNumberOfMessages", out var countStr) && + long.TryParse(countStr, out var count)) + { + return count; + } + } + catch + { + // Queue may not exist or access denied + } + + return null; + } +} diff --git a/src/Transports/Azure/Wolverine.AzureServiceBus/AzureServiceBusTransport.cs b/src/Transports/Azure/Wolverine.AzureServiceBus/AzureServiceBusTransport.cs index f835cacbe..def0db8fd 100644 --- a/src/Transports/Azure/Wolverine.AzureServiceBus/AzureServiceBusTransport.cs +++ b/src/Transports/Azure/Wolverine.AzureServiceBus/AzureServiceBusTransport.cs @@ -286,6 +286,11 @@ public override ValueTask ConnectAsync(IWolverineRuntime runtime) return ValueTask.CompletedTask; } + public WolverineTransportHealthCheck BuildHealthCheck(IWolverineRuntime runtime) + { + return new AzureServiceBusHealthCheck(this); + } + public override IEnumerable DiagnosticColumns() { yield return new PropertyColumn("Queue", "Name"); diff --git a/src/Transports/Azure/Wolverine.AzureServiceBus/Internal/AzureServiceBusHealthCheck.cs b/src/Transports/Azure/Wolverine.AzureServiceBus/Internal/AzureServiceBusHealthCheck.cs new file mode 100644 index 000000000..92ca9136e --- /dev/null +++ b/src/Transports/Azure/Wolverine.AzureServiceBus/Internal/AzureServiceBusHealthCheck.cs @@ -0,0 +1,60 @@ +using Wolverine.Transports; + +namespace Wolverine.AzureServiceBus.Internal; + +internal class AzureServiceBusHealthCheck : WolverineTransportHealthCheck +{ + private readonly AzureServiceBusTransport _transport; + + public AzureServiceBusHealthCheck(AzureServiceBusTransport transport) => _transport = transport; + + public override string TransportName => "Azure Service Bus"; + public override string Protocol => "asb"; + + public override async Task CheckHealthAsync(CancellationToken cancellationToken = default) + { + var status = TransportHealthStatus.Healthy; + string? message = null; + var data = new Dictionary(); + + try + { + var managementClient = _transport.ManagementClient; + data["HasManagementClient"] = true; + + // Lightweight probe: check namespace properties + var nsProperties = await managementClient.GetNamespacePropertiesAsync(cancellationToken); + data["Namespace"] = nsProperties?.Value?.Name ?? "unknown"; + } + catch (Exception ex) + { + status = TransportHealthStatus.Unhealthy; + message = $"Azure Service Bus connectivity check failed: {ex.Message}"; + data["HasManagementClient"] = false; + } + + return new TransportHealthResult(TransportName, Protocol, status, message, + DateTimeOffset.UtcNow, data); + } + + public override async Task GetBrokerQueueDepthAsync(Uri endpointUri, + CancellationToken cancellationToken = default) + { + if (endpointUri.Scheme != "asb") return null; + + var queueName = endpointUri.Segments.LastOrDefault()?.TrimEnd('/'); + if (string.IsNullOrEmpty(queueName)) return null; + + try + { + var managementClient = _transport.ManagementClient; + var runtimeProps = await managementClient.GetQueueRuntimePropertiesAsync(queueName, cancellationToken); + return runtimeProps.Value.ActiveMessageCount; + } + catch + { + // Queue may not exist, or it's a topic/subscription endpoint + return null; + } + } +} diff --git a/src/Transports/Kafka/Wolverine.Kafka/Internals/KafkaHealthCheck.cs b/src/Transports/Kafka/Wolverine.Kafka/Internals/KafkaHealthCheck.cs new file mode 100644 index 000000000..0636671f3 --- /dev/null +++ b/src/Transports/Kafka/Wolverine.Kafka/Internals/KafkaHealthCheck.cs @@ -0,0 +1,45 @@ +using Confluent.Kafka; +using Wolverine.Transports; + +namespace Wolverine.Kafka.Internals; + +internal class KafkaHealthCheck : WolverineTransportHealthCheck +{ + private readonly KafkaTransport _transport; + + public KafkaHealthCheck(KafkaTransport transport) => _transport = transport; + + public override string TransportName => "Kafka"; + public override string Protocol => "kafka"; + + public override Task CheckHealthAsync(CancellationToken cancellationToken = default) + { + var status = TransportHealthStatus.Healthy; + string? message = null; + var data = new Dictionary(); + + try + { + // Create a short-lived admin client to check broker connectivity + using var adminClient = _transport.CreateAdminClient(); + var metadata = adminClient.GetMetadata(TimeSpan.FromSeconds(5)); + + data["BrokerCount"] = metadata.Brokers.Count; + data["TopicCount"] = metadata.Topics.Count; + + if (metadata.Brokers.Count == 0) + { + status = TransportHealthStatus.Unhealthy; + message = "No Kafka brokers available"; + } + } + catch (Exception ex) + { + status = TransportHealthStatus.Unhealthy; + message = $"Kafka connectivity check failed: {ex.Message}"; + } + + return Task.FromResult(new TransportHealthResult(TransportName, Protocol, status, message, + DateTimeOffset.UtcNow, data)); + } +} diff --git a/src/Transports/Kafka/Wolverine.Kafka/Internals/KafkaTransport.cs b/src/Transports/Kafka/Wolverine.Kafka/Internals/KafkaTransport.cs index c475827ea..e3e5aea7e 100644 --- a/src/Transports/Kafka/Wolverine.Kafka/Internals/KafkaTransport.cs +++ b/src/Transports/Kafka/Wolverine.Kafka/Internals/KafkaTransport.cs @@ -112,6 +112,11 @@ public override ValueTask ConnectAsync(IWolverineRuntime runtime) return ValueTask.CompletedTask; } + public WolverineTransportHealthCheck BuildHealthCheck(IWolverineRuntime runtime) + { + return new KafkaHealthCheck(this); + } + public override IEnumerable DiagnosticColumns() { yield break; diff --git a/src/Transports/MQTT/Wolverine.MQTT/Internals/MqttHealthCheck.cs b/src/Transports/MQTT/Wolverine.MQTT/Internals/MqttHealthCheck.cs new file mode 100644 index 000000000..325790465 --- /dev/null +++ b/src/Transports/MQTT/Wolverine.MQTT/Internals/MqttHealthCheck.cs @@ -0,0 +1,41 @@ +using Wolverine.Transports; + +namespace Wolverine.MQTT.Internals; + +internal class MqttHealthCheck : WolverineTransportHealthCheck +{ + private readonly MqttTransport _transport; + + public MqttHealthCheck(MqttTransport transport) => _transport = transport; + + public override string TransportName => "MQTT"; + public override string Protocol => "mqtt"; + + public override Task CheckHealthAsync(CancellationToken cancellationToken = default) + { + var status = TransportHealthStatus.Healthy; + string? message = null; + var data = new Dictionary(); + + try + { + var client = _transport.Client; + data["IsConnected"] = client.IsConnected; + + if (!client.IsConnected) + { + status = TransportHealthStatus.Unhealthy; + message = "MQTT client is not connected"; + } + } + catch + { + status = TransportHealthStatus.Degraded; + message = "MQTT transport not yet initialized"; + } + + // MQTT is pub/sub — no broker-side queue depth concept + return Task.FromResult(new TransportHealthResult(TransportName, Protocol, status, message, + DateTimeOffset.UtcNow, data)); + } +} diff --git a/src/Transports/MQTT/Wolverine.MQTT/Internals/MqttTransport.cs b/src/Transports/MQTT/Wolverine.MQTT/Internals/MqttTransport.cs index 984db0c85..fc07c20d2 100644 --- a/src/Transports/MQTT/Wolverine.MQTT/Internals/MqttTransport.cs +++ b/src/Transports/MQTT/Wolverine.MQTT/Internals/MqttTransport.cs @@ -80,6 +80,11 @@ public override async ValueTask InitializeAsync(IWolverineRuntime runtime) } } + public WolverineTransportHealthCheck BuildHealthCheck(IWolverineRuntime runtime) + { + return new MqttHealthCheck(this); + } + private void startSubscribing() { if (_subscribed) return; diff --git a/src/Transports/NATS/Wolverine.Nats/Internal/NatsHealthCheck.cs b/src/Transports/NATS/Wolverine.Nats/Internal/NatsHealthCheck.cs new file mode 100644 index 000000000..57cb2d811 --- /dev/null +++ b/src/Transports/NATS/Wolverine.Nats/Internal/NatsHealthCheck.cs @@ -0,0 +1,43 @@ +using Wolverine.Transports; + +namespace Wolverine.Nats.Internal; + +internal class NatsHealthCheck : WolverineTransportHealthCheck +{ + private readonly NatsTransport _transport; + + public NatsHealthCheck(NatsTransport transport) => _transport = transport; + + public override string TransportName => "NATS"; + public override string Protocol => "nats"; + + public override Task CheckHealthAsync(CancellationToken cancellationToken = default) + { + var status = TransportHealthStatus.Healthy; + string? message = null; + var data = new Dictionary(); + + try + { + var connection = _transport.Connection; + var serverInfo = connection.ServerInfo; + data["HasConnection"] = true; + data["ServerVersion"] = serverInfo?.Version ?? "unknown"; + + if (serverInfo == null) + { + status = TransportHealthStatus.Degraded; + message = "NATS connected but no server info available"; + } + } + catch + { + status = TransportHealthStatus.Degraded; + message = "NATS transport not yet initialized"; + data["HasConnection"] = false; + } + + return Task.FromResult(new TransportHealthResult(TransportName, Protocol, status, message, + DateTimeOffset.UtcNow, data)); + } +} diff --git a/src/Transports/NATS/Wolverine.Nats/Internal/NatsTransport.cs b/src/Transports/NATS/Wolverine.Nats/Internal/NatsTransport.cs index 0b3809adc..3f758aa2e 100644 --- a/src/Transports/NATS/Wolverine.Nats/Internal/NatsTransport.cs +++ b/src/Transports/NATS/Wolverine.Nats/Internal/NatsTransport.cs @@ -112,6 +112,11 @@ public override async ValueTask ConnectAsync(IWolverineRuntime runtime) } } + public WolverineTransportHealthCheck BuildHealthCheck(IWolverineRuntime runtime) + { + return new NatsHealthCheck(this); + } + public override IEnumerable DiagnosticColumns() { yield return new PropertyColumn("Subject", "header"); diff --git a/src/Transports/Pulsar/Wolverine.Pulsar/PulsarHealthCheck.cs b/src/Transports/Pulsar/Wolverine.Pulsar/PulsarHealthCheck.cs new file mode 100644 index 000000000..9a12f8123 --- /dev/null +++ b/src/Transports/Pulsar/Wolverine.Pulsar/PulsarHealthCheck.cs @@ -0,0 +1,32 @@ +using Wolverine.Transports; + +namespace Wolverine.Pulsar; + +internal class PulsarHealthCheck : WolverineTransportHealthCheck +{ + private readonly PulsarTransport _transport; + + public PulsarHealthCheck(PulsarTransport transport) => _transport = transport; + + public override string TransportName => "Pulsar"; + public override string Protocol => "pulsar"; + + public override Task CheckHealthAsync(CancellationToken cancellationToken = default) + { + var status = TransportHealthStatus.Healthy; + string? message = null; + var data = new Dictionary(); + + var client = _transport.Client; + data["HasClient"] = client != null; + + if (client == null) + { + status = TransportHealthStatus.Degraded; + message = "Pulsar client not yet initialized"; + } + + return Task.FromResult(new TransportHealthResult(TransportName, Protocol, status, message, + DateTimeOffset.UtcNow, data)); + } +} diff --git a/src/Transports/Pulsar/Wolverine.Pulsar/PulsarTransport.cs b/src/Transports/Pulsar/Wolverine.Pulsar/PulsarTransport.cs index 721808cd1..1604b9b52 100644 --- a/src/Transports/Pulsar/Wolverine.Pulsar/PulsarTransport.cs +++ b/src/Transports/Pulsar/Wolverine.Pulsar/PulsarTransport.cs @@ -88,7 +88,10 @@ public override ValueTask InitializeAsync(IWolverineRuntime runtime) return ValueTask.CompletedTask; } - + public WolverineTransportHealthCheck BuildHealthCheck(IWolverineRuntime runtime) + { + return new PulsarHealthCheck(this); + } public PulsarEndpoint EndpointFor(string topicPath) { diff --git a/src/Transports/Redis/Wolverine.Redis/Internal/RedisHealthCheck.cs b/src/Transports/Redis/Wolverine.Redis/Internal/RedisHealthCheck.cs new file mode 100644 index 000000000..ebe761562 --- /dev/null +++ b/src/Transports/Redis/Wolverine.Redis/Internal/RedisHealthCheck.cs @@ -0,0 +1,62 @@ +using Wolverine.Transports; + +namespace Wolverine.Redis.Internal; + +internal class RedisHealthCheck : WolverineTransportHealthCheck +{ + private readonly RedisTransport _transport; + + public RedisHealthCheck(RedisTransport transport) => _transport = transport; + + public override string TransportName => "Redis"; + public override string Protocol => "redis"; + + public override Task CheckHealthAsync(CancellationToken cancellationToken = default) + { + var status = TransportHealthStatus.Healthy; + string? message = null; + var data = new Dictionary(); + + try + { + var connection = _transport.GetConnection(); + data["IsConnected"] = connection.IsConnected; + + if (!connection.IsConnected) + { + status = TransportHealthStatus.Unhealthy; + message = "Redis connection is not connected"; + } + } + catch + { + status = TransportHealthStatus.Degraded; + message = "Redis transport not yet initialized"; + } + + return Task.FromResult(new TransportHealthResult(TransportName, Protocol, status, message, + DateTimeOffset.UtcNow, data)); + } + + public override async Task GetBrokerQueueDepthAsync(Uri endpointUri, + CancellationToken cancellationToken = default) + { + if (endpointUri.Scheme != "redis") return null; + + var streamKey = endpointUri.Segments.LastOrDefault()?.TrimEnd('/'); + if (string.IsNullOrEmpty(streamKey)) return null; + + try + { + var connection = _transport.GetConnection(); + if (!connection.IsConnected) return null; + + var db = connection.GetDatabase(); + return await db.StreamLengthAsync(streamKey); + } + catch + { + return null; + } + } +} diff --git a/src/Transports/Redis/Wolverine.Redis/Internal/RedisTransport.cs b/src/Transports/Redis/Wolverine.Redis/Internal/RedisTransport.cs index 6b1d556e6..4e7de2cee 100644 --- a/src/Transports/Redis/Wolverine.Redis/Internal/RedisTransport.cs +++ b/src/Transports/Redis/Wolverine.Redis/Internal/RedisTransport.cs @@ -116,6 +116,11 @@ public override async ValueTask ConnectAsync(IWolverineRuntime runtime) } } + public WolverineTransportHealthCheck BuildHealthCheck(IWolverineRuntime runtime) + { + return new RedisHealthCheck(this); + } + public override IEnumerable DiagnosticColumns() { yield return new PropertyColumn("Stream Key", "streamKey"); diff --git a/src/Wolverine/Transports/Tcp/SocketListener.cs b/src/Wolverine/Transports/Tcp/SocketListener.cs index 2a45c31d3..311ce21ba 100644 --- a/src/Wolverine/Transports/Tcp/SocketListener.cs +++ b/src/Wolverine/Transports/Tcp/SocketListener.cs @@ -58,7 +58,15 @@ public async ValueTask DisposeAsync() if (_receivingLoop != null) { - await _receivingLoop; + try + { + await _receivingLoop; + } + catch (OperationCanceledException) + { + // Expected during disposal — the receiving loop was cancelled + } + _receivingLoop.Dispose(); _receivingLoop = null; } diff --git a/src/Wolverine/Transports/WolverineTransportHealthCheckAdapter.cs b/src/Wolverine/Transports/WolverineTransportHealthCheckAdapter.cs new file mode 100644 index 000000000..c9f54c053 --- /dev/null +++ b/src/Wolverine/Transports/WolverineTransportHealthCheckAdapter.cs @@ -0,0 +1,48 @@ +using Microsoft.Extensions.Diagnostics.HealthChecks; + +namespace Wolverine.Transports; + +/// +/// Wraps a as an ASP.NET Core +/// for integration with the standard /health endpoint. +/// +internal class WolverineTransportHealthCheckAdapter : IHealthCheck +{ + private readonly WolverineTransportHealthCheck _inner; + private readonly DateTimeOffset _startedAt = DateTimeOffset.UtcNow; + + /// + /// Grace period after startup during which the health check reports Healthy + /// even if the transport hasn't connected yet. Prevents false Unhealthy reports + /// during application initialization. + /// + public static TimeSpan StartupGracePeriod { get; set; } = TimeSpan.FromSeconds(30); + + public WolverineTransportHealthCheckAdapter(WolverineTransportHealthCheck inner) + { + _inner = inner; + } + + public async Task CheckHealthAsync(HealthCheckContext context, + CancellationToken cancellationToken = default) + { + var result = await _inner.CheckHealthAsync(cancellationToken); + + // During startup grace period, report Healthy even if Degraded/Unhealthy + if (result.Status != TransportHealthStatus.Healthy && + DateTimeOffset.UtcNow - _startedAt < StartupGracePeriod) + { + return HealthCheckResult.Healthy( + $"Startup grace period ({StartupGracePeriod.TotalSeconds}s) — {result.Message ?? "initializing"}", + result.Data); + } + + return result.Status switch + { + TransportHealthStatus.Healthy => HealthCheckResult.Healthy(result.Message, result.Data), + TransportHealthStatus.Degraded => HealthCheckResult.Degraded(result.Message, data: result.Data), + TransportHealthStatus.Unhealthy => HealthCheckResult.Unhealthy(result.Message, data: result.Data), + _ => HealthCheckResult.Unhealthy($"Unknown status: {result.Status}") + }; + } +}