Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions src/Persistence/PostgresqlTests/PostgresqlTests.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
<FrameworkReference Include="Microsoft.AspNetCore.App"/>
<PackageReference Include="Microsoft.NET.Test.Sdk" />
<PackageReference Include="GitHubActionsTestLogger" PrivateAssets="All" />
<PackageReference Include="NSubstitute"/>
<PackageReference Include="xunit"/>
<PackageReference Include="xunit.runner.visualstudio">
<PrivateAssets>all</PrivateAssets>
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,179 @@
using IntegrationTests;
using JasperFx;
using Microsoft.Extensions.Diagnostics.HealthChecks;
using NSubstitute;
using Npgsql;
using Shouldly;
using Wolverine;
using Wolverine.Postgresql.Transport;
using Wolverine.Runtime;
using Xunit;

namespace PostgresqlTests.Transport;

// Coverage for the StickyPostgresqlQueueListenerAgent health-check enrichment
// added in #2647. The agent layers per-tenant DB reachability, listener-latch
// state, and per-tenant queue depth on top of the default Status-based check.
//
// Pure logic (status precedence, no-endpoint case) is covered by the
// no-runtime test class. The reachability + queue-depth signals genuinely need
// a Postgres connection, so those live in the [Collection("postgresql")] class
// using PostgresqlContext.
public class sticky_listener_health_tests_no_runtime
{
private static StickyPostgresqlQueueListenerAgent agent_with_status(AgentStatus status)
{
var runtime = Substitute.For<IWolverineRuntime>();
runtime.Options.Returns(new WolverineOptions());

var agent = new StickyPostgresqlQueueListenerAgent(runtime, "messages", "tenant1");
agent.Status = status;
return agent;
}

[Fact]
public async Task unhealthy_when_status_is_not_running()
{
var agent = agent_with_status(AgentStatus.Stopped);

var result = await agent.CheckHealthAsync(new HealthCheckContext(), CancellationToken.None);

result.Status.ShouldBe(HealthStatus.Unhealthy);
result.Description.ShouldContain("Stopped");
}

[Fact]
public async Task healthy_when_running_and_endpoint_not_yet_built()
{
// Before StartAsync wires up _tenantEndpoint there's nothing to ping; the
// agent should report Healthy rather than crash on a null deref.
var agent = agent_with_status(AgentStatus.Running);

var result = await agent.CheckHealthAsync(new HealthCheckContext(), CancellationToken.None);

result.Status.ShouldBe(HealthStatus.Healthy);
agent.ConsecutiveDbFailureCount.ShouldBe(0);
}

[Fact]
public void agent_description_mentions_tenant_database_and_queue_name()
{
var agent = agent_with_status(AgentStatus.Running);

agent.Description.ShouldContain("tenant1");
agent.Description.ShouldContain("messages");
}
}

[Collection("postgresql")]
public class sticky_listener_health_db_tests : PostgresqlContext, IAsyncLifetime
{
private NpgsqlDataSource _dataSource = null!;
private TenantedPostgresqlQueue _endpoint = null!;
private PostgresqlQueue _parentQueue = null!;
private PostgresqlTransport _transport = null!;
private string _tableName = null!;

public async Task InitializeAsync()
{
_transport = new PostgresqlTransport();
_parentQueue = new PostgresqlQueue("stickyhealth", _transport);

_dataSource = NpgsqlDataSource.Create(Servers.PostgresConnectionString);
_endpoint = new TenantedPostgresqlQueue(_parentQueue, _dataSource, "tenantA");

_tableName = _parentQueue.QueueTable.Identifier.QualifiedName;

// Drop+recreate so each test starts from a known empty state.
await using var conn = await _dataSource.OpenConnectionAsync();
try
{
var drop = conn.CreateCommand();
drop.CommandText = $"DROP TABLE IF EXISTS {_tableName} CASCADE";
await drop.ExecuteNonQueryAsync();
}
finally
{
await conn.CloseAsync();
}

await _parentQueue.EnsureSchemaExists("tenantA", _dataSource);
}

public async Task DisposeAsync()
{
await using var conn = await _dataSource.OpenConnectionAsync();
try
{
var drop = conn.CreateCommand();
drop.CommandText = $"DROP TABLE IF EXISTS {_tableName} CASCADE";
await drop.ExecuteNonQueryAsync();
}
finally
{
await conn.CloseAsync();
}

await _dataSource.DisposeAsync();
}

[Fact]
public async Task ping_database_succeeds_against_real_postgres()
{
// Smoke — the SELECT 1 ping that the sticky-listener agent uses for the
// per-tenant DB reachability signal must not throw against a healthy DB.
await Should.NotThrowAsync(() => _endpoint.PingDatabaseAsync(CancellationToken.None));
}

[Fact]
public async Task get_queue_depth_returns_zero_for_empty_table()
{
(await _endpoint.GetQueueDepthAsync(CancellationToken.None)).ShouldBe(0);
}

[Fact]
public async Task get_queue_depth_reflects_inserted_rows()
{
await using (var conn = await _dataSource.OpenConnectionAsync())
{
try
{
for (var i = 0; i < 7; i++)
{
var insert = conn.CreateCommand();
insert.CommandText =
$"INSERT INTO {_tableName} (id, body, message_type, keep_until) " +
"VALUES (gen_random_uuid(), '\\x00'::bytea, 'TestMessage', null)";
await insert.ExecuteNonQueryAsync();
}
}
finally
{
await conn.CloseAsync();
}
}

(await _endpoint.GetQueueDepthAsync(CancellationToken.None)).ShouldBe(7);
}

[Fact]
public async Task ping_database_throws_against_unreachable_database()
{
// Point at a deliberately broken connection string. A failure here is the
// signal source the sticky-listener health check turns into Degraded /
// Unhealthy via its consecutive-failure counter.
var unreachable = NpgsqlDataSource.Create(
"Host=127.0.0.1;Port=1;Database=nope;Username=nope;password=nope;Timeout=2;Command Timeout=2;Pooling=false");

try
{
var endpoint = new TenantedPostgresqlQueue(_parentQueue, unreachable, "missing");
await Should.ThrowAsync<Exception>(
() => endpoint.PingDatabaseAsync(CancellationToken.None));
}
finally
{
await unreachable.DisposeAsync();
}
}
}
Original file line number Diff line number Diff line change
@@ -1,15 +1,24 @@
using JasperFx;
using Microsoft.Extensions.Diagnostics.HealthChecks;
using Wolverine.Runtime;
using Wolverine.Runtime.Agents;
using Wolverine.Transports;

namespace Wolverine.Postgresql.Transport;

internal class StickyPostgresqlQueueListenerAgent : IAgent
{
// Per-tenant DB failures stay Degraded for the first few cycles and only escalate to
// Unhealthy once they persist. Mirrors the consecutive-failure semantics in
// DurabilityHealthSignals (#2646). Hard-coded here rather than pulling from
// DurabilitySettings so #2647 doesn't depend on #2646 landing first.
private const int ConsecutiveDbFailureUnhealthyThreshold = 3;

private readonly IWolverineRuntime _runtime;
private readonly string _queue;
private readonly string _databaseName;
private TenantedPostgresqlQueue? _tenantEndpoint;
private int _consecutiveDbFailures;

public StickyPostgresqlQueueListenerAgent(IWolverineRuntime runtime, string queue, string databaseName)
{
Expand All @@ -19,7 +28,7 @@ public StickyPostgresqlQueueListenerAgent(IWolverineRuntime runtime, string queu

Uri = new Uri($"{StickyPostgresqlQueueListenerAgentFamily.StickyListenerSchema}://{_queue}/{_databaseName}");
}

public AgentStatus Status { get; set; } = AgentStatus.Running;

public async Task StartAsync(CancellationToken cancellationToken)
Expand Down Expand Up @@ -58,4 +67,114 @@ public async Task StopAsync(CancellationToken cancellationToken)
/// <see cref="IAgent.Description"/>.
/// </summary>
public string Description => $"Sticky Postgres queue listener — pinned to the per-tenant database '{_databaseName}' for queue '{_queue}'. Only one node listens to each tenant database to avoid duplicate consumption.";
}

/// <summary>
/// Per-tenant health-check enrichments for the sticky Postgres queue listener (see #2647).
/// Layers three signals on top of the agent <see cref="Status"/>:
///
/// <list type="number">
/// <item><b>Per-tenant database reachability</b> — a `SELECT 1` against the assigned
/// <see cref="NpgsqlDataSource"/>. One failure ⇒ Degraded; <see cref="ConsecutiveDbFailureUnhealthyThreshold"/>
/// consecutive failures ⇒ Unhealthy. The underlying error is surfaced as the
/// description so operators see the specific node + tenant pair that's misbehaving.</item>
///
/// <item><b>Listener latch state</b> — mirrors what <c>ExclusiveListenerAgent</c> already
/// does: ask the runtime for the underlying <see cref="IListeningAgent"/> and translate
/// <see cref="ListeningStatus.TooBusy"/> ⇒ Degraded, <see cref="ListeningStatus.GloballyLatched"/>
/// ⇒ Unhealthy. Sticky listeners are pinned to specific nodes, so this localizes the
/// degradation to the affected node + tenant.</item>
///
/// <item><b>Per-tenant queue depth</b> — counts rows in the queue table for the
/// assigned database. If depth ≥ the parent endpoint's <see cref="BufferingLimits.Maximum"/>
/// threshold (when configured), Degraded. Skipped silently when the listener has no
/// buffering limits set.</item>
/// </list>
/// </summary>
public async Task<HealthCheckResult> CheckHealthAsync(HealthCheckContext context,
CancellationToken cancellationToken = default)
{
if (Status != AgentStatus.Running)
{
return HealthCheckResult.Unhealthy($"Agent {Uri} is {Status}");
}

var degraded = new List<string>(capacity: 4);
string? unhealthyReason = null;

// 1) Per-tenant database reachability — SELECT 1 ping
if (_tenantEndpoint is not null)
{
try
{
await _tenantEndpoint.PingDatabaseAsync(cancellationToken);
_consecutiveDbFailures = 0;
}
catch (Exception e)
{
_consecutiveDbFailures++;
if (_consecutiveDbFailures >= ConsecutiveDbFailureUnhealthyThreshold)
{
unhealthyReason =
$"Per-tenant database '{_databaseName}' unreachable for {_consecutiveDbFailures} consecutive checks: {e.Message}";
}
else
{
degraded.Add(
$"Per-tenant database '{_databaseName}' poll failed: {e.Message}");
}
}

// 2) Listener latch state — mirror ExclusiveListenerAgent
var listeningAgent = _runtime.Endpoints.FindListeningAgent(_tenantEndpoint.Uri);
if (listeningAgent is not null)
{
switch (listeningAgent.Status)
{
case ListeningStatus.TooBusy:
degraded.Add($"Listener {_queue}/{_databaseName} is too busy");
break;
case ListeningStatus.GloballyLatched:
unhealthyReason ??= $"Listener {_queue}/{_databaseName} is globally latched";
break;
}
}

// 3) Per-tenant queue depth — only when the parent endpoint sets a buffering ceiling
var bufferingLimits = _tenantEndpoint.BufferingLimits;
if (bufferingLimits is { Maximum: > 0 } && unhealthyReason is null)
{
try
{
var depth = await _tenantEndpoint.GetQueueDepthAsync(cancellationToken);
if (depth >= bufferingLimits.Maximum)
{
degraded.Add(
$"Queue {_queue}/{_databaseName} depth ({depth}) is at or above the buffering threshold ({bufferingLimits.Maximum})");
}
}
catch
{
// The DB-reachability ping above already captured the connection issue.
// Avoid double-counting toward _consecutiveDbFailures here.
}
}
}

if (unhealthyReason is not null)
{
return HealthCheckResult.Unhealthy(
degraded.Count == 0
? unhealthyReason
: $"{unhealthyReason}; {string.Join("; ", degraded)}");
}

return degraded.Count == 0
? HealthCheckResult.Healthy()
: HealthCheckResult.Degraded(string.Join("; ", degraded));
}

/// <summary>
/// Test-only window into the sticky-listener's consecutive-DB-failure tracker.
/// </summary>
internal int ConsecutiveDbFailureCount => _consecutiveDbFailures;
}
Original file line number Diff line number Diff line change
Expand Up @@ -38,4 +38,50 @@ public Task ScheduleRetryAsync(Envelope envelope, CancellationToken cancellation
{
return _sender.ScheduleRetryAsync(envelope, cancellation);
}

/// <summary>
/// Cheap connectivity ping against the per-tenant database. Used by
/// <see cref="StickyPostgresqlQueueListenerAgent.CheckHealthAsync"/> to surface
/// per-tenant DB reachability as a health signal.
/// </summary>
internal async Task PingDatabaseAsync(CancellationToken cancellationToken)
{
await using var conn = await _dataSource.OpenConnectionAsync(cancellationToken);
try
{
var cmd = conn.CreateCommand();
cmd.CommandText = "SELECT 1";
await cmd.ExecuteScalarAsync(cancellationToken);
}
finally
{
await conn.CloseAsync();
}
}

/// <summary>
/// Returns the row count of the parent queue table on this tenant's database. Used
/// by <see cref="StickyPostgresqlQueueListenerAgent.CheckHealthAsync"/> to surface
/// per-tenant queue depth as a health signal.
/// </summary>
internal async Task<long> GetQueueDepthAsync(CancellationToken cancellationToken)
{
await using var conn = await _dataSource.OpenConnectionAsync(cancellationToken);
try
{
var cmd = conn.CreateCommand();
cmd.CommandText = $"select count(*) from {_parent.QueueTable.Identifier}";
var raw = await cmd.ExecuteScalarAsync(cancellationToken);
return raw switch
{
long l => l,
int i => i,
_ => Convert.ToInt64(raw)
};
}
finally
{
await conn.CloseAsync();
}
}
}
Loading