From 7c3c8ba9ba32717bcce3066b8b713cb00d43230e Mon Sep 17 00:00:00 2001
From: "Jeremy D. Miller"
Date: Mon, 27 Apr 2026 12:47:44 -0500
Subject: [PATCH] Fix #2602: leader split-brain via stale advisory-lock state
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

In DurabilityMode.Balanced with the PostgreSQL persistence (and the same
pattern in SQL Server / MySQL / Oracle / SQLite), leader election relied on a
session-level advisory lock. When the holder's underlying database backend was
terminated server-side — network blip, idle-connection cull,
pg_terminate_backend, AlwaysOn / RAC failover, Azure flexserver maintenance —
the database released the lock and another node legitimately acquired it, but
the original leader's in-process AdvisoryLock.HasLock kept returning true. Two
nodes simultaneously believed they were the leader, both ran
EvaluateAssignmentsAsync, both dispatched AssignAgent commands, and the same
agent ended up running twice.

Three layered fixes:

1. Server-side liveness check in AdvisoryLock.HasLock. Each call now pings the
   held connection (`SELECT 1`, 2-second timeout). On failure the in-memory
   _locks list is cleared and the broken connection is disposed. Applied
   across all five Wolverine-owned implementations:

   - Wolverine.Postgresql.AdvisoryLock
   - Wolverine.MySql.MySqlAdvisoryLock
   - Wolverine.Oracle.OracleAdvisoryLock (per-lock connection ping)
   - Wolverine.Sqlite.SqliteAdvisoryLock
   - Wolverine.SqlServer.SqlServerAdvisoryLock (NEW; Wolverine-owned
     replacement for Weasel.SqlServer.AdvisoryLock so we can ship the fix
     without a Weasel coordination round-trip — Marten / other Weasel
     consumers should land the same fix upstream)

2. Heartbeat step-down + re-election. NodeAgentController.HeartBeat now
   detects "I was the leader last tick, but the lock is gone now" and calls a
   new stepDownAsync(reason): clears IsLeader, stops the local LeaderUri agent
   so this node stops dispatching assignments, best-effort releases the
   persistence-layer lock, notifies the observer (new
   IWolverineObserver.LostLeadership() with a default no-op so third-party
   observers are unaffected), and falls through to the normal
   TryAttainLeadershipLockAsync path so the same tick triggers a fresh
   leadership election. Logs a NodeRecordType.LeadershipLost record.

3. AssignmentGrid duplicate detection + heal. AssignmentGrid.Node.Running used
   to silently overwrite the dictionary when two nodes both reported the same
   agent in their ActiveAgents — the smoking gun for split-brain that
   FindDelta could never see. The grid now records each duplicate in
   DuplicateAgentReports, and EvaluateAssignmentsAsync emits a StopRemoteAgent
   for the older copy on each detected duplicate so the split-brain residue
   self-heals on the next leadership tick even if Layers 1 and 2 had a hole.

Tests:

- PostgresqlTests.Bugs.Bug_split_brain_advisory_lock_state_divergence (the
  reporter's exact reproducer, dropped in verbatim) — fails on main, passes
  with Layer 1.
- CoreTests.Runtime.Agents.duplicate_agent_split_brain_detection — 3 tests
  covering Layer 3.
- Full CoreTests: 1367/1367 pass.
- Postgres agent / leader / advisory-lock tests: 21/21.
- SqlServer agent / leader / advisory-lock tests: 17/17.
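For reference, a distilled sketch of the Layer 1 pattern follows. It is an
illustration only, not the shipped code: the type and member names
(StaleAwareLockState, Record, IsStillHeld) are invented for the sketch, and the
real implementations are the provider-specific AdvisoryLock classes in the diff
below (Oracle pings the per-lock connection; the other providers ping the one
shared lock connection).

    // Sketch: before trusting in-memory lock bookkeeping, ping the connection
    // that owns the session-scoped lock. If the server has already killed that
    // session, the lock is gone, so drop the stale state and report it lost.
    using System.Collections.Generic;
    using System.Data.Common;

    public sealed class StaleAwareLockState
    {
        private readonly HashSet<int> _locks = new();
        private DbConnection? _conn;

        public StaleAwareLockState(DbConnection openConnection) => _conn = openConnection;

        public void Record(int lockId) => _locks.Add(lockId);

        public bool IsStillHeld(int lockId)
        {
            if (_conn is null || !_locks.Contains(lockId)) return false;

            try
            {
                // Cheap round trip with a short timeout; a dead backend throws
                // here long before the provider flips Connection.State on its own.
                using var cmd = _conn.CreateCommand();
                cmd.CommandText = "select 1";
                cmd.CommandTimeout = 2;
                cmd.ExecuteScalar();
                return true;
            }
            catch
            {
                // The session is gone, so the session-scoped lock is gone too.
                // Clear the bookkeeping so the caller steps down instead of
                // acting as a phantom leader.
                _locks.Clear();
                _conn.Dispose();
                _conn = null;
                return false;
            }
        }
    }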
Co-Authored-By: Claude Opus 4.7 (1M context) --- .../Wolverine.MySql/MySqlAdvisoryLock.cs | 36 +++- .../Wolverine.Oracle/OracleAdvisoryLock.cs | 44 +++- ...it_brain_advisory_lock_state_divergence.cs | 105 ++++++++++ .../PostgresqlNodePersistence.cs | 37 +++- .../Persistence/SqlServerAdvisoryLock.cs | 197 ++++++++++++++++++ .../Persistence/SqlServerMessageStore.cs | 5 +- .../Wolverine.Sqlite/SqliteAdvisoryLock.cs | 35 +++- .../duplicate_agent_split_brain_detection.cs | 58 ++++++ .../Runtime/Agents/AssignmentGrid.Node.cs | 13 ++ .../Runtime/Agents/AssignmentGrid.cs | 15 ++ .../Runtime/Agents/DuplicateAgentReport.cs | 15 ++ .../Runtime/Agents/IWolverineObserver.cs | 22 ++ ...NodeAgentController.EvaluateAssignments.cs | 16 ++ .../Agents/NodeAgentController.HeartBeat.cs | 66 ++++++ .../Runtime/Agents/NodeRecordType.cs | 10 +- 15 files changed, 668 insertions(+), 6 deletions(-) create mode 100644 src/Persistence/PostgresqlTests/Bugs/Bug_split_brain_advisory_lock_state_divergence.cs create mode 100644 src/Persistence/Wolverine.SqlServer/Persistence/SqlServerAdvisoryLock.cs create mode 100644 src/Testing/CoreTests/Runtime/Agents/duplicate_agent_split_brain_detection.cs create mode 100644 src/Wolverine/Runtime/Agents/DuplicateAgentReport.cs diff --git a/src/Persistence/MySql/Wolverine.MySql/MySqlAdvisoryLock.cs b/src/Persistence/MySql/Wolverine.MySql/MySqlAdvisoryLock.cs index 2bf8d1b0d..fa943901e 100644 --- a/src/Persistence/MySql/Wolverine.MySql/MySqlAdvisoryLock.cs +++ b/src/Persistence/MySql/Wolverine.MySql/MySqlAdvisoryLock.cs @@ -28,7 +28,41 @@ public MySqlAdvisoryLock(MySqlDataSource source, ILogger logger, string database public bool HasLock(int lockId) { - return _conn is not { State: ConnectionState.Closed } && _locks.Contains(lockId); + if (_conn is null) return false; + if (!_locks.Contains(lockId)) return false; + + // MySQL named locks (GET_LOCK / RELEASE_LOCK) are session-scoped, + // so the lock evaporates the moment the connection's MySQL session + // dies — KILL CONNECTION, network drop, idle-cull. MySqlConnection + // doesn't surface that immediately, so we ping. Without this, + // HasLock keeps returning true after the lock has been transferred + // and two nodes race as leader. See GH-2602. + try + { + using var cmd = _conn.CreateCommand(); + cmd.CommandText = "select 1"; + cmd.CommandTimeout = 2; + cmd.ExecuteScalar(); + return true; + } + catch (Exception e) + { + _logger.LogWarning(e, + "Lost advisory-lock connection for database {Database}; clearing held lock ids {Locks}", + _databaseName, _locks); + + _locks.Clear(); + try + { + _conn.Dispose(); + } + catch + { + // Already broken; nothing to do. + } + _conn = null; + return false; + } } public async Task TryAttainLockAsync(int lockId, CancellationToken token) diff --git a/src/Persistence/Oracle/Wolverine.Oracle/OracleAdvisoryLock.cs b/src/Persistence/Oracle/Wolverine.Oracle/OracleAdvisoryLock.cs index aeffac073..98ad603e9 100644 --- a/src/Persistence/Oracle/Wolverine.Oracle/OracleAdvisoryLock.cs +++ b/src/Persistence/Oracle/Wolverine.Oracle/OracleAdvisoryLock.cs @@ -28,7 +28,49 @@ public OracleAdvisoryLock(OracleDataSource source, ILogger logger, string schema public bool HasLock(int lockId) { - return _locks.Contains(lockId); + if (!_locks.Contains(lockId)) return false; + if (!_heldLocks.TryGetValue(lockId, out var held)) return false; + + // Oracle row-level FOR UPDATE locks are tied to the transaction + // that took them, which is in turn tied to the holding connection. 
+ // If the connection died (network drop, RAC failover, manual KILL + // SESSION), the row lock evaporates server-side but our in-memory + // state still claims it. Ping the held connection so we can detect + // a broken backend and self-clean. See GH-2602. + try + { + using var cmd = held.conn.CreateCommand(); + cmd.CommandText = "select 1 from dual"; + cmd.CommandTimeout = 2; + cmd.ExecuteScalar(); + return true; + } + catch (Exception e) + { + _logger.LogWarning(e, + "Lost advisory-lock connection for lock {LockId} in schema {Schema}; clearing held state", + lockId, _schemaName); + + _locks.Remove(lockId); + _heldLocks.Remove(lockId); + try + { + held.tx.Dispose(); + } + catch + { + // already broken + } + try + { + held.conn.Dispose(); + } + catch + { + // already broken + } + return false; + } } public async Task TryAttainLockAsync(int lockId, CancellationToken token) diff --git a/src/Persistence/PostgresqlTests/Bugs/Bug_split_brain_advisory_lock_state_divergence.cs b/src/Persistence/PostgresqlTests/Bugs/Bug_split_brain_advisory_lock_state_divergence.cs new file mode 100644 index 000000000..5c84d44e9 --- /dev/null +++ b/src/Persistence/PostgresqlTests/Bugs/Bug_split_brain_advisory_lock_state_divergence.cs @@ -0,0 +1,105 @@ +using IntegrationTests; +using Microsoft.Extensions.Logging.Abstractions; +using Npgsql; +using Shouldly; +using Xunit; +using AdvisoryLock = Wolverine.Postgresql.AdvisoryLock; + +namespace PostgresqlTests.Bugs; + +/// +/// Reproducer for https://github.com/JasperFx/wolverine/issues/2602. +/// +/// In DurabilityMode.Balanced with the PostgreSQL persistence, +/// leader election relies on a session-level Postgres advisory lock. +/// When the holder's underlying Postgres backend is terminated server-side +/// (network blip, idle-connection cull, Postgres failover, Azure flexserver +/// maintenance, pg_terminate_backend, etc.), Postgres releases the +/// advisory lock and another node legitimately acquires it — but the +/// original leader's in-process bookkeeping in AdvisoryLock.HasLock +/// continues to return true. Result: two nodes simultaneously +/// believe they are the leader, both run EvaluateAssignmentsAsync, +/// and both dispatch AssignAgent commands. +/// +public class Bug_split_brain_advisory_lock_state_divergence : PostgresqlContext +{ + [Fact] + public async Task has_lock_returns_true_after_postgres_releases_it_due_to_backend_termination() + { + const int lockId = unchecked((int)0xB16B00B5); + + var dataSource = NpgsqlDataSource.Create(Servers.PostgresConnectionString); + try + { + var holder = new AdvisoryLock(dataSource, NullLogger.Instance, "split-brain-test"); + + // 1. Stale leader acquires the leadership lock. + (await holder.TryAttainLockAsync(lockId, CancellationToken.None)).ShouldBeTrue(); + holder.HasLock(lockId).ShouldBeTrue(); + + // 2. Find the backend PID currently holding the lock and terminate it + // from a separate session — simulates a network blip, pg restart, + // idle-connection cull, or Azure flexserver maintenance. + var holderPid = await findAdvisoryLockHolderPidAsync(lockId); + holderPid.ShouldNotBeNull("Expected to find a backend holding the advisory lock"); + await terminateBackendAsync(holderPid!.Value); + + // 3. From a different connection, prove that Postgres really did + // release the lock — a contender now acquires it cleanly. 
+            var contender = new AdvisoryLock(dataSource, NullLogger.Instance, "contender");
+            try
+            {
+                (await contender.TryAttainLockAsync(lockId, CancellationToken.None))
+                    .ShouldBeTrue("Postgres should have released the lock when the holder's backend was terminated");
+            }
+            finally
+            {
+                await contender.ReleaseLockAsync(lockId);
+                await contender.DisposeAsync();
+            }
+
+            // 4. THE BUG: holder.HasLock used to still return true even though
+            // it is no longer the lock holder server-side. NodeAgentController
+            // would then happily call EvaluateAssignmentsAsync on this stale
+            // leader. With the fix in this PR, HasLock now pings the held
+            // connection and detects the broken backend, drops the in-memory
+            // state, and returns false.
+            holder.HasLock(lockId)
+                .ShouldBeFalse(
+                    "AdvisoryLock.HasLock returned true even though Postgres has released the session-level lock and another session has acquired it.");
+
+            await holder.DisposeAsync();
+        }
+        finally
+        {
+            await dataSource.DisposeAsync();
+        }
+    }
+
+    private static async Task<int?> findAdvisoryLockHolderPidAsync(int lockId)
+    {
+        await using var conn = new NpgsqlConnection(Servers.PostgresConnectionString);
+        await conn.OpenAsync();
+        await using var cmd = conn.CreateCommand();
+        cmd.CommandText = @"
+            select pid from pg_locks
+            where locktype = 'advisory'
+              and granted = true
+              and ((classid::bigint << 32) | (objid::bigint & x'FFFFFFFF'::bigint)) = @lockId
+            limit 1";
+        cmd.Parameters.AddWithValue("@lockId", (long)lockId);
+        var raw = await cmd.ExecuteScalarAsync();
+        return raw is int pid ? pid : null;
+    }
+
+    private static async Task terminateBackendAsync(int pid)
+    {
+        await using var conn = new NpgsqlConnection(Servers.PostgresConnectionString);
+        await conn.OpenAsync();
+        await using var cmd = conn.CreateCommand();
+        cmd.CommandText = "select pg_terminate_backend(@pid)";
+        cmd.Parameters.AddWithValue("@pid", pid);
+        await cmd.ExecuteScalarAsync();
+        await Task.Delay(250);
+    }
+}
diff --git a/src/Persistence/Wolverine.Postgresql/PostgresqlNodePersistence.cs b/src/Persistence/Wolverine.Postgresql/PostgresqlNodePersistence.cs
index bc1c324f1..543898a50 100644
--- a/src/Persistence/Wolverine.Postgresql/PostgresqlNodePersistence.cs
+++ b/src/Persistence/Wolverine.Postgresql/PostgresqlNodePersistence.cs
@@ -393,7 +393,42 @@ public AdvisoryLock(NpgsqlDataSource source, ILogger logger, string databaseName
     public bool HasLock(int lockId)
     {
-        return _conn is not { State: ConnectionState.Closed } && _locks.Contains(lockId);
+        if (_conn is null) return false;
+        if (!_locks.Contains(lockId)) return false;
+
+        // Postgres releases session-level advisory locks the moment the
+        // backend session ends — network blip, idle-connection cull,
+        // pg_terminate_backend, Postgres failover, Azure flexserver
+        // maintenance. Npgsql's NpgsqlConnection.State stays Open until
+        // we actually try to use it, so without this ping HasLock keeps
+        // claiming the lock long after another session has acquired it,
+        // and two nodes both believe they're the leader. See GH-2602.
+        try
+        {
+            using var cmd = _conn.CreateCommand();
+            cmd.CommandText = "select 1";
+            cmd.CommandTimeout = 2;
+            cmd.ExecuteScalar();
+            return true;
+        }
+        catch (Exception e)
+        {
+            _logger.LogWarning(e,
+                "Lost advisory-lock connection for database {Database}; clearing held lock ids {Locks}",
+                _databaseName, _locks);
+
+            _locks.Clear();
+            try
+            {
+                _conn.Dispose();
+            }
+            catch
+            {
+                // Already broken; nothing to do.
+            }
+            _conn = null;
+            return false;
+        }
     }
 
     public async Task TryAttainLockAsync(int lockId, CancellationToken token)
diff --git a/src/Persistence/Wolverine.SqlServer/Persistence/SqlServerAdvisoryLock.cs b/src/Persistence/Wolverine.SqlServer/Persistence/SqlServerAdvisoryLock.cs
new file mode 100644
index 000000000..5591f20ac
--- /dev/null
+++ b/src/Persistence/Wolverine.SqlServer/Persistence/SqlServerAdvisoryLock.cs
@@ -0,0 +1,197 @@
+using System.Data;
+using JasperFx.Core;
+using Microsoft.Data.SqlClient;
+using Microsoft.Extensions.Logging;
+using Weasel.Core;
+using Weasel.SqlServer;
+
+namespace Wolverine.SqlServer.Persistence;
+
+///
+/// Wolverine-owned <see cref="IAdvisoryLock"/> for SQL Server. Equivalent to
+/// Weasel.SqlServer.AdvisoryLock but with a server-side liveness ping
+/// in <see cref="HasLock"/> so a stale leader whose SQL Server
+/// session has been killed (KILL SPID, AlwaysOn failover, idle-connection
+/// drop, NAT gateway reuse, etc.) detects the lock loss instead of forever
+/// claiming to be the leader.
+///
+/// See https://github.com/JasperFx/wolverine/issues/2602.
+///
+internal class SqlServerAdvisoryLock : IAdvisoryLock
+{
+    private readonly Func<SqlConnection> _source;
+    private readonly ILogger _logger;
+    private readonly string _databaseName;
+    private readonly List<int> _locks = new();
+    private SqlConnection? _conn;
+
+    public SqlServerAdvisoryLock(Func<SqlConnection> source, ILogger logger, string databaseName)
+    {
+        _source = source;
+        _logger = logger;
+        _databaseName = databaseName;
+    }
+
+    public bool HasLock(int lockId)
+    {
+        if (_conn is null) return false;
+        if (!_locks.Contains(lockId)) return false;
+
+        // SQL Server session-scoped application locks (sp_getapplock /
+        // sp_releaseapplock) are released the instant the SQL session ends —
+        // KILL SPID, network drop, AlwaysOn failover, AAD token expiry on
+        // managed identity. SqlConnection.State stays Open until we use it,
+        // so without this ping HasLock keeps reporting the lock held long
+        // after another session has acquired it. See GH-2602.
+        try
+        {
+            using var cmd = _conn.CreateCommand();
+            cmd.CommandText = "select 1";
+            cmd.CommandTimeout = 2;
+            cmd.ExecuteScalar();
+            return true;
+        }
+        catch (Exception e)
+        {
+            _logger.LogWarning(e,
+                "Lost advisory-lock connection for database {Database}; clearing held lock ids {Locks}",
+                _databaseName, _locks);
+
+            _locks.Clear();
+            try
+            {
+                _conn.Dispose();
+            }
+            catch
+            {
+                // Already broken; nothing to do.
+ } + _conn = null; + return false; + } + } + + public async Task TryAttainLockAsync(int lockId, CancellationToken token) + { + if (_conn == null) + { + _conn = _source(); + await _conn.OpenAsync(token).ConfigureAwait(false); + } + + if (_conn.State == ConnectionState.Closed) + { + try + { + await _conn.DisposeAsync().ConfigureAwait(false); + } + catch (Exception e) + { + _logger.LogError(e, "Error trying to clean up and restart an advisory lock connection"); + } + finally + { + _conn = null; + } + + return false; + } + + var attained = await _conn.TryGetGlobalLock(lockId.ToString(), cancellation: token).ConfigureAwait(false); + if (attained) + { + _locks.Add(lockId); + return true; + } + + return false; + } + + public async Task ReleaseLockAsync(int lockId) + { + if (!_locks.Contains(lockId)) return; + + if (_conn == null || _conn.State == ConnectionState.Closed) + { + _locks.Remove(lockId); + return; + } + + try + { + var cancellation = new CancellationTokenSource(); + cancellation.CancelAfter(1.Seconds()); + + await _conn.ReleaseGlobalLock(lockId.ToString(), cancellation: cancellation.Token).ConfigureAwait(false); + } + catch (Exception e) + { + _logger.LogDebug(e, + "Error trying to release advisory lock {LockId} for database {Identifier}", + lockId, _databaseName); + } + + _locks.Remove(lockId); + + if (!_locks.Any()) + { + await safeCloseConnectionAsync().ConfigureAwait(false); + } + } + + public async ValueTask DisposeAsync() + { + if (_conn == null) return; + + try + { + if (_conn.State == ConnectionState.Open) + { + foreach (var i in _locks) + { + try + { + await _conn.ReleaseGlobalLock(i.ToString(), CancellationToken.None).ConfigureAwait(false); + } + catch (Exception e) + { + _logger.LogDebug(e, + "Error trying to release advisory lock {LockId} during dispose for database {Identifier}", + i, _databaseName); + } + } + } + + await safeCloseConnectionAsync().ConfigureAwait(false); + } + catch (Exception e) + { + _logger.LogDebug(e, "Error trying to dispose of advisory locks for database {Identifier}", + _databaseName); + } + } + + private async Task safeCloseConnectionAsync() + { + if (_conn == null) return; + + try + { + if (_conn.State == ConnectionState.Open) + { + await _conn.CloseAsync().ConfigureAwait(false); + } + + await _conn.DisposeAsync().ConfigureAwait(false); + } + catch (Exception e) + { + _logger.LogDebug(e, "Error trying to close advisory lock connection for database {Identifier}", + _databaseName); + } + finally + { + _conn = null; + } + } +} diff --git a/src/Persistence/Wolverine.SqlServer/Persistence/SqlServerMessageStore.cs b/src/Persistence/Wolverine.SqlServer/Persistence/SqlServerMessageStore.cs index db79250fa..c81f1b910 100644 --- a/src/Persistence/Wolverine.SqlServer/Persistence/SqlServerMessageStore.cs +++ b/src/Persistence/Wolverine.SqlServer/Persistence/SqlServerMessageStore.cs @@ -45,7 +45,10 @@ public SqlServerMessageStore(DatabaseSettings database, DurabilitySettings setti $"select top (@limit) {DatabaseConstants.IncomingFields} from {database.SchemaName}.{DatabaseConstants.IncomingTable} where owner_id = {TransportConstants.AnyNode} and status = '{EnvelopeStatus.Incoming}' and {DatabaseConstants.ReceivedAt} = @address"; _scheduledLockId = "Wolverine:Scheduled:" + database.ScheduledJobLockId.ToString(); - AdvisoryLock = new AdvisoryLock(() => new SqlConnection(database.ConnectionString), + // Use the Wolverine-owned SqlServerAdvisoryLock (not Weasel.SqlServer.AdvisoryLock) + // so HasLock pings the held SQL session and detects KILL SPID / AlwaysOn + 
// failover / network drops. See GH-2602. + AdvisoryLock = new SqlServerAdvisoryLock(() => new SqlConnection(database.ConnectionString), logger, Identifier); foreach (var sagaTableDefinition in sagaTypes) diff --git a/src/Persistence/Wolverine.Sqlite/SqliteAdvisoryLock.cs b/src/Persistence/Wolverine.Sqlite/SqliteAdvisoryLock.cs index 7d55cafdb..66f1259d2 100644 --- a/src/Persistence/Wolverine.Sqlite/SqliteAdvisoryLock.cs +++ b/src/Persistence/Wolverine.Sqlite/SqliteAdvisoryLock.cs @@ -24,7 +24,40 @@ public SqliteAdvisoryLock(DbDataSource dataSource, ILogger logger, string databa public bool HasLock(int lockId) { - return _conn is not { State: ConnectionState.Closed } && _locks.Contains(lockId); + if (_conn is null) return false; + if (!_locks.Contains(lockId)) return false; + + // SQLite advisory locks are table rows; in single-process tests + // the connection is unlikely to die out from under us, but for + // parity with the Postgres / MySQL fix and to detect any held + // connection that has gone bad (e.g. file deleted under us), + // ping before reporting the lock as held. See GH-2602. + try + { + using var cmd = _conn.CreateCommand(); + cmd.CommandText = "select 1"; + cmd.CommandTimeout = 2; + cmd.ExecuteScalar(); + return true; + } + catch (Exception e) + { + _logger.LogWarning(e, + "Lost advisory-lock connection for database {Database}; clearing held lock ids {Locks}", + _databaseName, _locks); + + _locks.Clear(); + try + { + _conn.Dispose(); + } + catch + { + // Already broken; nothing to do. + } + _conn = null; + return false; + } } public async Task TryAttainLockAsync(int lockId, CancellationToken token) diff --git a/src/Testing/CoreTests/Runtime/Agents/duplicate_agent_split_brain_detection.cs b/src/Testing/CoreTests/Runtime/Agents/duplicate_agent_split_brain_detection.cs new file mode 100644 index 000000000..3c5cf3fe4 --- /dev/null +++ b/src/Testing/CoreTests/Runtime/Agents/duplicate_agent_split_brain_detection.cs @@ -0,0 +1,58 @@ +using Shouldly; +using Wolverine.Runtime.Agents; +using Xunit; + +namespace CoreTests.Runtime.Agents; + +/// +/// Locks down the Layer 3 fix for GH-2602. When two +/// rows both list the same agent in their ActiveAgents — the residue of +/// a brief leader split-brain — the assignment grid must record the duplicate +/// so the (now-correct) leader can heal it via StopRemoteAgent. Before +/// the fix, the dictionary write in Node.Running silently overwrote +/// the first node's entry and the duplicate became invisible to FindDelta. 
+///
+public class duplicate_agent_split_brain_detection
+{
+    private readonly Uri agent1 = new Uri("blue://1");
+    private readonly Uri agent2 = new Uri("blue://2");
+
+    [Fact]
+    public void records_duplicate_when_same_agent_runs_on_two_nodes()
+    {
+        var grid = new AssignmentGrid();
+
+        var node1 = grid.WithNode(1, Guid.NewGuid()).Running(agent1);
+        var node2 = grid.WithNode(2, Guid.NewGuid()).Running(agent1);
+
+        grid.DuplicateAgentReports.Count.ShouldBe(1);
+
+        var report = grid.DuplicateAgentReports.Single();
+        report.AgentUri.ShouldBe(agent1);
+        report.ExistingNode.ShouldBe(node1);
+        report.NewNode.ShouldBe(node2);
+    }
+
+    [Fact]
+    public void no_report_when_each_agent_runs_on_only_one_node()
+    {
+        var grid = new AssignmentGrid();
+
+        grid.WithNode(1, Guid.NewGuid()).Running(agent1);
+        grid.WithNode(2, Guid.NewGuid()).Running(agent2);
+
+        grid.DuplicateAgentReports.ShouldBeEmpty();
+    }
+
+    [Fact]
+    public void no_report_when_same_node_lists_agent_twice()
+    {
+        // A single Running call with duplicates within the same node — odd
+        // input, but not the split-brain we're trying to detect.
+        var grid = new AssignmentGrid();
+
+        grid.WithNode(1, Guid.NewGuid()).Running(agent1, agent1);
+
+        grid.DuplicateAgentReports.ShouldBeEmpty();
+    }
+}
diff --git a/src/Wolverine/Runtime/Agents/AssignmentGrid.Node.cs b/src/Wolverine/Runtime/Agents/AssignmentGrid.Node.cs
index 463bdb65a..ee257ea13 100644
--- a/src/Wolverine/Runtime/Agents/AssignmentGrid.Node.cs
+++ b/src/Wolverine/Runtime/Agents/AssignmentGrid.Node.cs
@@ -68,6 +68,19 @@ public Node Running(params Uri[] agentUris)
     {
         foreach (var agentUri in agentUris)
         {
+            // Detect split-brain residue: another node already reported
+            // this agent as running, so we have a duplicate. Record it so
+            // the leader can emit a StopRemoteAgent against the existing
+            // copy. Without this, the dictionary write below silently
+            // overwrites the first node's entry and the duplicate becomes
+            // invisible to FindDelta. See GH-2602.
+            if (_parent._agents.TryGetValue(agentUri, out var existing) &&
+                existing.OriginalNode != null &&
+                !ReferenceEquals(existing.OriginalNode, this))
+            {
+                _parent.RecordDuplicateAgent(agentUri, existing.OriginalNode, this);
+            }
+
             var agent = new Agent(agentUri, this);
             _parent._agents[agentUri] = agent;
diff --git a/src/Wolverine/Runtime/Agents/AssignmentGrid.cs b/src/Wolverine/Runtime/Agents/AssignmentGrid.cs
index 8f63aa02d..938cf4b4e 100644
--- a/src/Wolverine/Runtime/Agents/AssignmentGrid.cs
+++ b/src/Wolverine/Runtime/Agents/AssignmentGrid.cs
@@ -13,6 +13,21 @@ public partial class AssignmentGrid
 {
     private readonly Dictionary<Uri, Agent> _agents = new();
     private readonly List<Node> _nodes = new();
+    private readonly List<DuplicateAgentReport> _duplicateAgentReports = new();
+
+    ///
+    /// Reports of the same agent URI being claimed-as-running by more than
+    /// one <see cref="Node"/> when this grid was assembled. The
+    /// leader uses these to emit StopRemoteAgent commands so the
+    /// duplicates are healed even after a stale-leader split-brain. See
+    /// GH-2602.
+    ///
+    internal IReadOnlyList<DuplicateAgentReport> DuplicateAgentReports => _duplicateAgentReports;
+
+    internal void RecordDuplicateAgent(Uri agentUri, Node existingNode, Node newNode)
+    {
+        _duplicateAgentReports.Add(new DuplicateAgentReport(agentUri, existingNode, newNode));
+    }
 
     ///
     /// The identity of all currently unassigned agents
diff --git a/src/Wolverine/Runtime/Agents/DuplicateAgentReport.cs b/src/Wolverine/Runtime/Agents/DuplicateAgentReport.cs
new file mode 100644
index 000000000..658ff52a8
--- /dev/null
+++ b/src/Wolverine/Runtime/Agents/DuplicateAgentReport.cs
@@ -0,0 +1,15 @@
+namespace Wolverine.Runtime.Agents;
+
+///
+/// Recorded by <see cref="AssignmentGrid"/> when a single agent URI shows up
+/// in more than one node's ActiveAgents list during a
+/// heartbeat tick. Symptom of a leader split-brain (GH-2602): two leaders
+/// dispatched AssignAgent for the same agent in close succession.
+/// The (now-correct) leader uses these reports to emit a
+/// StopRemoteAgent targeting <see cref="ExistingNode"/>, leaving the
+/// freshest copy on <see cref="NewNode"/> running.
+///
+internal sealed record DuplicateAgentReport(
+    Uri AgentUri,
+    AssignmentGrid.Node ExistingNode,
+    AssignmentGrid.Node NewNode);
diff --git a/src/Wolverine/Runtime/Agents/IWolverineObserver.cs b/src/Wolverine/Runtime/Agents/IWolverineObserver.cs
index f805fdfe4..9b6375c37 100644
--- a/src/Wolverine/Runtime/Agents/IWolverineObserver.cs
+++ b/src/Wolverine/Runtime/Agents/IWolverineObserver.cs
@@ -10,6 +10,15 @@ namespace Wolverine.Runtime.Agents;
 public interface IWolverineObserver
 {
     Task AssumedLeadership();
+
+    ///
+    /// The node was leader, detected that its underlying advisory lock had
+    /// been released server-side, and has stepped down. Default no-op so
+    /// existing custom <see cref="IWolverineObserver"/> implementations are
+    /// unaffected. See GH-2602.
+    ///
+    Task LostLeadership() => Task.CompletedTask;
+
     Task NodeStarted();
     Task NodeStopped();
     Task AgentStarted(Uri agentUri);
@@ -90,6 +99,19 @@ await _runtime.Storage.Nodes.LogRecordsAsync(NodeRecord.For(_runtime.Options,
             NodeRecordType.LeadershipAssumed, NodeAgentController.LeaderUri));
     }
 
+    public async Task LostLeadership()
+    {
+        try
+        {
+            await _runtime.Storage.Nodes.LogRecordsAsync(NodeRecord.For(_runtime.Options,
+                NodeRecordType.LeadershipLost, NodeAgentController.LeaderUri));
+        }
+        catch (NotSupportedException)
+        {
+            // NullMessageStore does not support node persistence; nothing to log.
+        }
+    }
+
     public async Task NodeStarted()
     {
         await _runtime.Storage.Nodes.LogRecordsAsync(NodeRecord.For(_runtime.Options, NodeRecordType.NodeStarted));
diff --git a/src/Wolverine/Runtime/Agents/NodeAgentController.EvaluateAssignments.cs b/src/Wolverine/Runtime/Agents/NodeAgentController.EvaluateAssignments.cs
index bab6ff180..b98c6dea6 100644
--- a/src/Wolverine/Runtime/Agents/NodeAgentController.EvaluateAssignments.cs
+++ b/src/Wolverine/Runtime/Agents/NodeAgentController.EvaluateAssignments.cs
@@ -64,6 +64,22 @@ public async Task EvaluateAssignmentsAsync(
             }
         }
 
+        // Heal split-brain residue: if any agent is reported running on more
+        // than one node — most likely because a stale leader and the current
+        // leader both dispatched AssignAgent for the same agent in close
+        // succession before the stale leader stepped down — emit a
+        // StopRemoteAgent for the older copy. The freshest copy stays
+        // running; the next assignment cycle will rebalance if needed. See
+        // GH-2602.
+ foreach (var report in grid.DuplicateAgentReports) + { + _logger.LogWarning( + "Detected duplicate agent {AgentUri} reported running on Node {ExistingNode} and Node {NewNode} — sending StopRemoteAgent to the older copy to heal split-brain residue.", + report.AgentUri, report.ExistingNode.AssignedId, report.NewNode.AssignedId); + + commands.Add(new StopRemoteAgent(report.AgentUri, report.ExistingNode.ToDestination())); + } + await _observer.AssignmentsChanged(grid, commands); LastAssignments = grid; diff --git a/src/Wolverine/Runtime/Agents/NodeAgentController.HeartBeat.cs b/src/Wolverine/Runtime/Agents/NodeAgentController.HeartBeat.cs index 125171f16..9063666ae 100644 --- a/src/Wolverine/Runtime/Agents/NodeAgentController.HeartBeat.cs +++ b/src/Wolverine/Runtime/Agents/NodeAgentController.HeartBeat.cs @@ -49,6 +49,19 @@ public async Task DoHealthChecksAsync() // Do it no matter what await ejectStaleNodes(staleNodes); + // Detect lost leadership: we *thought* we were the leader (from a + // previous heartbeat tick) but our underlying advisory lock has been + // released server-side. With the AdvisoryLock.HasLock liveness ping + // (Layer 1) this branch is reachable; without it, the bug from + // GH-2602 manifests as two nodes simultaneously claiming leadership. + // Step down cleanly here, then fall through to the normal election + // path so this same tick either re-attains the lock (if no one else + // has taken it) or peacefully becomes a follower. + if (IsLeader && !_persistence.HasLeadershipLock()) + { + await stepDownAsync("the leadership advisory lock was released server-side"); + } + if (_persistence.HasLeadershipLock()) { IsLeader = true; @@ -70,6 +83,59 @@ public async Task DoHealthChecksAsync() return AgentCommands.Empty; } + /// + /// Reverse the local effects of when + /// the node discovers it is no longer the leader server-side. Stops the + /// local agent so this node + /// stops dispatching AssignAgent / ReassignAgent commands, + /// best-effort releases the persistence-layer leadership lock, and + /// notifies observers. The caller falls through to the normal election + /// path so a fresh leadership election happens on the same tick. See + /// GH-2602. + /// + internal async Task stepDownAsync(string reason) + { + _logger.LogWarning( + "Node {NodeNumber} stepping down from leadership: {Reason}. 
Triggering a new leadership election.", + _runtime.Options.Durability.AssignedNodeNumber, reason); + + IsLeader = false; + + try + { + if (Agents.ContainsKey(LeaderUri)) + { + await StopAgentAsync(LeaderUri); + } + } + catch (Exception e) + { + _logger.LogError(e, + "Error stopping the local leader agent during leadership step-down on node {NodeNumber}", + _runtime.Options.Durability.AssignedNodeNumber); + } + + try + { + await _persistence.ReleaseLeadershipLockAsync(); + } + catch (Exception e) + { + _logger.LogDebug(e, + "Error trying to release the leadership lock during step-down (non-fatal)"); + } + + try + { + await _observer.LostLeadership(); + } + catch (Exception e) + { + _logger.LogDebug(e, + "Observer.LostLeadership threw during step-down (non-fatal)"); + } + } + private async Task tryStartLeadershipAsync(IReadOnlyList nodes, AgentRestrictions restrictions) { diff --git a/src/Wolverine/Runtime/Agents/NodeRecordType.cs b/src/Wolverine/Runtime/Agents/NodeRecordType.cs index 62415ddb9..2aa637dca 100644 --- a/src/Wolverine/Runtime/Agents/NodeRecordType.cs +++ b/src/Wolverine/Runtime/Agents/NodeRecordType.cs @@ -11,7 +11,15 @@ public enum NodeRecordType DormantNodeEjected, AssignmentChanged, LeadershipAssumed, - ListenerLatched + ListenerLatched, + + /// + /// A node that thought it was the leader detected that its underlying + /// advisory lock had been released server-side (network blip, idle-cull, + /// pg_terminate_backend, AlwaysOn failover, etc.) and stepped down so a + /// new leadership election could happen. See GH-2602. + /// + LeadershipLost } // This is marked as ISerializable so that it can go to CritterWatch w/o