Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
36 changes: 35 additions & 1 deletion src/Persistence/MySql/Wolverine.MySql/MySqlAdvisoryLock.cs
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,41 @@ public MySqlAdvisoryLock(MySqlDataSource source, ILogger logger, string database

public bool HasLock(int lockId)
{
return _conn is not { State: ConnectionState.Closed } && _locks.Contains(lockId);
if (_conn is null) return false;
if (!_locks.Contains(lockId)) return false;

// MySQL named locks (GET_LOCK / RELEASE_LOCK) are session-scoped,
// so the lock evaporates the moment the connection's MySQL session
// dies — KILL CONNECTION, network drop, idle-cull. MySqlConnection
// doesn't surface that immediately, so we ping. Without this,
// HasLock keeps returning true after the lock has been transferred
// and two nodes race as leader. See GH-2602.
try
{
using var cmd = _conn.CreateCommand();
cmd.CommandText = "select 1";
cmd.CommandTimeout = 2;
cmd.ExecuteScalar();
return true;
}
catch (Exception e)
{
_logger.LogWarning(e,
"Lost advisory-lock connection for database {Database}; clearing held lock ids {Locks}",
_databaseName, _locks);

_locks.Clear();
try
{
_conn.Dispose();
}
catch
{
// Already broken; nothing to do.
}
_conn = null;
return false;
}
}

public async Task<bool> TryAttainLockAsync(int lockId, CancellationToken token)
Expand Down
44 changes: 43 additions & 1 deletion src/Persistence/Oracle/Wolverine.Oracle/OracleAdvisoryLock.cs
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,49 @@ public OracleAdvisoryLock(OracleDataSource source, ILogger logger, string schema

public bool HasLock(int lockId)
{
return _locks.Contains(lockId);
if (!_locks.Contains(lockId)) return false;
if (!_heldLocks.TryGetValue(lockId, out var held)) return false;

// Oracle row-level FOR UPDATE locks are tied to the transaction
// that took them, which is in turn tied to the holding connection.
// If the connection died (network drop, RAC failover, manual KILL
// SESSION), the row lock evaporates server-side but our in-memory
// state still claims it. Ping the held connection so we can detect
// a broken backend and self-clean. See GH-2602.
try
{
using var cmd = held.conn.CreateCommand();
cmd.CommandText = "select 1 from dual";
cmd.CommandTimeout = 2;
cmd.ExecuteScalar();
return true;
}
catch (Exception e)
{
_logger.LogWarning(e,
"Lost advisory-lock connection for lock {LockId} in schema {Schema}; clearing held state",
lockId, _schemaName);

_locks.Remove(lockId);
_heldLocks.Remove(lockId);
try
{
held.tx.Dispose();
}
catch
{
// already broken
}
try
{
held.conn.Dispose();
}
catch
{
// already broken
}
return false;
}
}

public async Task<bool> TryAttainLockAsync(int lockId, CancellationToken token)
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,105 @@
using IntegrationTests;
using Microsoft.Extensions.Logging.Abstractions;
using Npgsql;
using Shouldly;
using Xunit;
using AdvisoryLock = Wolverine.Postgresql.AdvisoryLock;

namespace PostgresqlTests.Bugs;

/// <summary>
/// Reproducer for https://github.com/JasperFx/wolverine/issues/2602.
///
/// In <c>DurabilityMode.Balanced</c> with the PostgreSQL persistence,
/// leader election relies on a session-level Postgres advisory lock.
/// When the holder's underlying Postgres backend is terminated server-side
/// (network blip, idle-connection cull, Postgres failover, Azure flexserver
/// maintenance, <c>pg_terminate_backend</c>, etc.), Postgres releases the
/// advisory lock and another node legitimately acquires it — but the
/// original leader's in-process bookkeeping in <c>AdvisoryLock.HasLock</c>
/// continues to return <c>true</c>. Result: two nodes simultaneously
/// believe they are the leader, both run <c>EvaluateAssignmentsAsync</c>,
/// and both dispatch <c>AssignAgent</c> commands.
/// </summary>
public class Bug_split_brain_advisory_lock_state_divergence : PostgresqlContext
{
[Fact]
public async Task has_lock_returns_true_after_postgres_releases_it_due_to_backend_termination()
{
const int lockId = unchecked((int)0xB16B00B5);

var dataSource = NpgsqlDataSource.Create(Servers.PostgresConnectionString);
try
{
var holder = new AdvisoryLock(dataSource, NullLogger.Instance, "split-brain-test");

// 1. Stale leader acquires the leadership lock.
(await holder.TryAttainLockAsync(lockId, CancellationToken.None)).ShouldBeTrue();
holder.HasLock(lockId).ShouldBeTrue();

// 2. Find the backend PID currently holding the lock and terminate it
// from a separate session — simulates a network blip, pg restart,
// idle-connection cull, or Azure flexserver maintenance.
var holderPid = await findAdvisoryLockHolderPidAsync(lockId);
holderPid.ShouldNotBeNull("Expected to find a backend holding the advisory lock");
await terminateBackendAsync(holderPid!.Value);

// 3. From a different connection, prove that Postgres really did
// release the lock — a contender now acquires it cleanly.
var contender = new AdvisoryLock(dataSource, NullLogger.Instance, "contender");
try
{
(await contender.TryAttainLockAsync(lockId, CancellationToken.None))
.ShouldBeTrue("Postgres should have released the lock when the holder's backend was terminated");
}
finally
{
await contender.ReleaseLockAsync(lockId);
await contender.DisposeAsync();
}

// 4. THE BUG: holder.HasLock used to still return true even though
// it is no longer the lock holder server-side. NodeAgentController
// would then happily call EvaluateAssignmentsAsync on this stale
// leader. With the fix in this PR, HasLock now pings the held
// connection and detects the broken backend, drops the in-memory
// state, and returns false.
holder.HasLock(lockId)
.ShouldBeFalse(
"AdvisoryLock.HasLock returned true even though Postgres has released the session-level lock and another session has acquired it.");

await holder.DisposeAsync();
}
finally
{
await dataSource.DisposeAsync();
}
}

private static async Task<int?> findAdvisoryLockHolderPidAsync(int lockId)
{
await using var conn = new NpgsqlConnection(Servers.PostgresConnectionString);
await conn.OpenAsync();
await using var cmd = conn.CreateCommand();
cmd.CommandText = @"
select pid from pg_locks
where locktype = 'advisory'
and granted = true
and ((classid::bigint << 32) | (objid::bigint & x'FFFFFFFF'::bigint)) = @lockId
limit 1";
cmd.Parameters.AddWithValue("@lockId", (long)lockId);
var raw = await cmd.ExecuteScalarAsync();
return raw is int pid ? pid : null;
}

private static async Task terminateBackendAsync(int pid)
{
await using var conn = new NpgsqlConnection(Servers.PostgresConnectionString);
await conn.OpenAsync();
await using var cmd = conn.CreateCommand();
cmd.CommandText = "select pg_terminate_backend(@pid)";
cmd.Parameters.AddWithValue("@pid", pid);
await cmd.ExecuteScalarAsync();
await Task.Delay(250);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -393,7 +393,42 @@ public AdvisoryLock(NpgsqlDataSource source, ILogger logger, string databaseName

public bool HasLock(int lockId)
{
return _conn is not { State: ConnectionState.Closed } && _locks.Contains(lockId);
if (_conn is null) return false;
if (!_locks.Contains(lockId)) return false;

// Postgres releases session-level advisory locks the moment the
// backend session ends — network blip, idle-connection cull,
// pg_terminate_backend, Postgres failover, Azure flexserver
// maintenance. Npgsql's NpgsqlConnection.State stays Open until
// we actually try to use it, so without this ping HasLock keeps
// claiming the lock long after another session has acquired it,
// and two nodes both believe they're the leader. See GH-2602.
try
{
using var cmd = _conn.CreateCommand();
cmd.CommandText = "select 1";
cmd.CommandTimeout = 2;
cmd.ExecuteScalar();
return true;
}
catch (Exception e)
{
_logger.LogWarning(e,
"Lost advisory-lock connection for database {Database}; clearing held lock ids {Locks}",
_databaseName, _locks);

_locks.Clear();
try
{
_conn.Dispose();
}
catch
{
// Already broken; nothing to do.
}
_conn = null;
return false;
}
}

public async Task<bool> TryAttainLockAsync(int lockId, CancellationToken token)
Expand Down
Loading
Loading