Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 12 additions & 5 deletions src/Testing/CoreTests/Transports/GloballyLatchedListenerTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -13,25 +13,32 @@

namespace CoreTests.Transports;

public class GloballyLatchedListenerTests : IDisposable
public class GloballyLatchedListenerTests : IAsyncLifetime
{
private readonly int _port;
private readonly IHost _host;
private IHost _host = null!;

public GloballyLatchedListenerTests()
{
_port = PortFinder.GetAvailablePort();
}

_host = WolverineHost.For(opts =>
public async Task InitializeAsync()
{
_host = await WolverineHost.ForAsync(opts =>
{
opts.Durability.Mode = DurabilityMode.Solo;
opts.ListenAtPort(_port).Named("latching-test");
});
}

public void Dispose()
public async Task DisposeAsync()
{
_host?.Dispose();
if (_host != null)
{
await _host.StopAsync();
_host.Dispose();
}
}

[Fact]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -145,6 +145,11 @@ public override async ValueTask ConnectAsync(IWolverineRuntime runtime)
}
}

public WolverineTransportHealthCheck BuildHealthCheck(IWolverineRuntime runtime)
{
return new SqsHealthCheck(this);
}

internal async Task CleanupOrphanedSystemQueuesAsync(IWolverineRuntime runtime)
{
var logger = runtime.LoggerFactory.CreateLogger<AmazonSqsTransport>();
Expand Down
81 changes: 81 additions & 0 deletions src/Transports/AWS/Wolverine.AmazonSqs/Internal/SqsHealthCheck.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
using Wolverine.Transports;

namespace Wolverine.AmazonSqs.Internal;

internal class SqsHealthCheck : WolverineTransportHealthCheck
{
private readonly AmazonSqsTransport _transport;

public SqsHealthCheck(AmazonSqsTransport transport) => _transport = transport;

public override string TransportName => "Amazon SQS";
public override string Protocol => "sqs";

public override async Task<TransportHealthResult> CheckHealthAsync(CancellationToken cancellationToken = default)
{
var status = TransportHealthStatus.Healthy;
string? message = null;
var data = new Dictionary<string, object>();

var client = _transport.Client;
data["HasClient"] = client != null;

if (client == null)
{
return new TransportHealthResult(TransportName, Protocol, TransportHealthStatus.Degraded,
"SQS client not yet initialized", DateTimeOffset.UtcNow, data);
}

try
{
// Lightweight probe: list queues with a limit
var response = await client.ListQueuesAsync("wolverine", cancellationToken);
data["QueueCount"] = response.QueueUrls?.Count ?? 0;
}
catch (Exception ex)
{
status = TransportHealthStatus.Unhealthy;
message = $"SQS connectivity check failed: {ex.Message}";
}

return new TransportHealthResult(TransportName, Protocol, status, message,
DateTimeOffset.UtcNow, data);
}

public override async Task<long?> GetBrokerQueueDepthAsync(Uri endpointUri,
CancellationToken cancellationToken = default)
{
if (endpointUri.Scheme != "sqs") return null;

var client = _transport.Client;
if (client == null) return null;

var queueName = endpointUri.Segments.LastOrDefault()?.TrimEnd('/');
if (string.IsNullOrEmpty(queueName)) return null;

try
{
// Find the queue URL
var endpoint = _transport.Queues[queueName];
var queueUrl = endpoint?.QueueUrl;
if (queueUrl == null) return null;

var response = await client.GetQueueAttributesAsync(
queueUrl,
new List<string> { "ApproximateNumberOfMessages" },
cancellationToken);

if (response.Attributes.TryGetValue("ApproximateNumberOfMessages", out var countStr) &&
long.TryParse(countStr, out var count))
{
return count;
}
}
catch
{
// Queue may not exist or access denied
}

return null;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -286,6 +286,11 @@ public override ValueTask ConnectAsync(IWolverineRuntime runtime)
return ValueTask.CompletedTask;
}

public WolverineTransportHealthCheck BuildHealthCheck(IWolverineRuntime runtime)
{
return new AzureServiceBusHealthCheck(this);
}

public override IEnumerable<PropertyColumn> DiagnosticColumns()
{
yield return new PropertyColumn("Queue", "Name");
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
using Wolverine.Transports;

namespace Wolverine.AzureServiceBus.Internal;

internal class AzureServiceBusHealthCheck : WolverineTransportHealthCheck
{
private readonly AzureServiceBusTransport _transport;

public AzureServiceBusHealthCheck(AzureServiceBusTransport transport) => _transport = transport;

public override string TransportName => "Azure Service Bus";
public override string Protocol => "asb";

public override async Task<TransportHealthResult> CheckHealthAsync(CancellationToken cancellationToken = default)
{
var status = TransportHealthStatus.Healthy;
string? message = null;
var data = new Dictionary<string, object>();

try
{
var managementClient = _transport.ManagementClient;
data["HasManagementClient"] = true;

// Lightweight probe: check namespace properties
var nsProperties = await managementClient.GetNamespacePropertiesAsync(cancellationToken);
data["Namespace"] = nsProperties?.Value?.Name ?? "unknown";
}
catch (Exception ex)
{
status = TransportHealthStatus.Unhealthy;
message = $"Azure Service Bus connectivity check failed: {ex.Message}";
data["HasManagementClient"] = false;
}

return new TransportHealthResult(TransportName, Protocol, status, message,
DateTimeOffset.UtcNow, data);
}

public override async Task<long?> GetBrokerQueueDepthAsync(Uri endpointUri,
CancellationToken cancellationToken = default)
{
if (endpointUri.Scheme != "asb") return null;

var queueName = endpointUri.Segments.LastOrDefault()?.TrimEnd('/');
if (string.IsNullOrEmpty(queueName)) return null;

try
{
var managementClient = _transport.ManagementClient;
var runtimeProps = await managementClient.GetQueueRuntimePropertiesAsync(queueName, cancellationToken);
return runtimeProps.Value.ActiveMessageCount;
}
catch
{
// Queue may not exist, or it's a topic/subscription endpoint
return null;
}
}
}
45 changes: 45 additions & 0 deletions src/Transports/Kafka/Wolverine.Kafka/Internals/KafkaHealthCheck.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
using Confluent.Kafka;
using Wolverine.Transports;

namespace Wolverine.Kafka.Internals;

internal class KafkaHealthCheck : WolverineTransportHealthCheck
{
private readonly KafkaTransport _transport;

public KafkaHealthCheck(KafkaTransport transport) => _transport = transport;

public override string TransportName => "Kafka";
public override string Protocol => "kafka";

public override Task<TransportHealthResult> CheckHealthAsync(CancellationToken cancellationToken = default)
{
var status = TransportHealthStatus.Healthy;
string? message = null;
var data = new Dictionary<string, object>();

try
{
// Create a short-lived admin client to check broker connectivity
using var adminClient = _transport.CreateAdminClient();
var metadata = adminClient.GetMetadata(TimeSpan.FromSeconds(5));

data["BrokerCount"] = metadata.Brokers.Count;
data["TopicCount"] = metadata.Topics.Count;

if (metadata.Brokers.Count == 0)
{
status = TransportHealthStatus.Unhealthy;
message = "No Kafka brokers available";
}
}
catch (Exception ex)
{
status = TransportHealthStatus.Unhealthy;
message = $"Kafka connectivity check failed: {ex.Message}";
}

return Task.FromResult(new TransportHealthResult(TransportName, Protocol, status, message,
DateTimeOffset.UtcNow, data));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,11 @@ public override ValueTask ConnectAsync(IWolverineRuntime runtime)
return ValueTask.CompletedTask;
}

public WolverineTransportHealthCheck BuildHealthCheck(IWolverineRuntime runtime)
{
return new KafkaHealthCheck(this);
}

public override IEnumerable<PropertyColumn> DiagnosticColumns()
{
yield break;
Expand Down
41 changes: 41 additions & 0 deletions src/Transports/MQTT/Wolverine.MQTT/Internals/MqttHealthCheck.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
using Wolverine.Transports;

namespace Wolverine.MQTT.Internals;

internal class MqttHealthCheck : WolverineTransportHealthCheck
{
private readonly MqttTransport _transport;

public MqttHealthCheck(MqttTransport transport) => _transport = transport;

public override string TransportName => "MQTT";
public override string Protocol => "mqtt";

public override Task<TransportHealthResult> CheckHealthAsync(CancellationToken cancellationToken = default)
{
var status = TransportHealthStatus.Healthy;
string? message = null;
var data = new Dictionary<string, object>();

try
{
var client = _transport.Client;
data["IsConnected"] = client.IsConnected;

if (!client.IsConnected)
{
status = TransportHealthStatus.Unhealthy;
message = "MQTT client is not connected";
}
}
catch
{
status = TransportHealthStatus.Degraded;
message = "MQTT transport not yet initialized";
}

// MQTT is pub/sub — no broker-side queue depth concept
return Task.FromResult(new TransportHealthResult(TransportName, Protocol, status, message,
DateTimeOffset.UtcNow, data));
}
}
5 changes: 5 additions & 0 deletions src/Transports/MQTT/Wolverine.MQTT/Internals/MqttTransport.cs
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,11 @@ public override async ValueTask InitializeAsync(IWolverineRuntime runtime)
}
}

public WolverineTransportHealthCheck BuildHealthCheck(IWolverineRuntime runtime)
{
return new MqttHealthCheck(this);
}

private void startSubscribing()
{
if (_subscribed) return;
Expand Down
43 changes: 43 additions & 0 deletions src/Transports/NATS/Wolverine.Nats/Internal/NatsHealthCheck.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
using Wolverine.Transports;

namespace Wolverine.Nats.Internal;

internal class NatsHealthCheck : WolverineTransportHealthCheck
{
private readonly NatsTransport _transport;

public NatsHealthCheck(NatsTransport transport) => _transport = transport;

public override string TransportName => "NATS";
public override string Protocol => "nats";

public override Task<TransportHealthResult> CheckHealthAsync(CancellationToken cancellationToken = default)
{
var status = TransportHealthStatus.Healthy;
string? message = null;
var data = new Dictionary<string, object>();

try
{
var connection = _transport.Connection;
var serverInfo = connection.ServerInfo;
data["HasConnection"] = true;
data["ServerVersion"] = serverInfo?.Version ?? "unknown";

if (serverInfo == null)
{
status = TransportHealthStatus.Degraded;
message = "NATS connected but no server info available";
}
}
catch
{
status = TransportHealthStatus.Degraded;
message = "NATS transport not yet initialized";
data["HasConnection"] = false;
}

return Task.FromResult(new TransportHealthResult(TransportName, Protocol, status, message,
DateTimeOffset.UtcNow, data));
}
}
5 changes: 5 additions & 0 deletions src/Transports/NATS/Wolverine.Nats/Internal/NatsTransport.cs
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,11 @@ public override async ValueTask ConnectAsync(IWolverineRuntime runtime)
}
}

public WolverineTransportHealthCheck BuildHealthCheck(IWolverineRuntime runtime)
{
return new NatsHealthCheck(this);
}

public override IEnumerable<PropertyColumn> DiagnosticColumns()
{
yield return new PropertyColumn("Subject", "header");
Expand Down
Loading
Loading