diff --git a/src/Persistence/MySql/Wolverine.MySql/MySqlAdvisoryLock.cs b/src/Persistence/MySql/Wolverine.MySql/MySqlAdvisoryLock.cs
index 2bf8d1b0d..fa943901e 100644
--- a/src/Persistence/MySql/Wolverine.MySql/MySqlAdvisoryLock.cs
+++ b/src/Persistence/MySql/Wolverine.MySql/MySqlAdvisoryLock.cs
@@ -28,7 +28,41 @@ public MySqlAdvisoryLock(MySqlDataSource source, ILogger logger, string database
     public bool HasLock(int lockId)
     {
-        return _conn is not { State: ConnectionState.Closed } && _locks.Contains(lockId);
+        if (_conn is null) return false;
+        if (!_locks.Contains(lockId)) return false;
+
+        // MySQL named locks (GET_LOCK / RELEASE_LOCK) are session-scoped,
+        // so the lock evaporates the moment the connection's MySQL session
+        // dies — KILL CONNECTION, network drop, idle-cull. MySqlConnection
+        // doesn't surface that immediately, so we ping. Without this,
+        // HasLock keeps returning true after the lock has been transferred
+        // and two nodes race as leader. See GH-2602.
+        try
+        {
+            using var cmd = _conn.CreateCommand();
+            cmd.CommandText = "select 1";
+            cmd.CommandTimeout = 2;
+            cmd.ExecuteScalar();
+            return true;
+        }
+        catch (Exception e)
+        {
+            _logger.LogWarning(e,
+                "Lost advisory-lock connection for database {Database}; clearing held lock ids {Locks}",
+                _databaseName, _locks);
+
+            _locks.Clear();
+            try
+            {
+                _conn.Dispose();
+            }
+            catch
+            {
+                // Already broken; nothing to do.
+            }
+            _conn = null;
+            return false;
+        }
     }
 
     public async Task<bool> TryAttainLockAsync(int lockId, CancellationToken token)
diff --git a/src/Persistence/Oracle/Wolverine.Oracle/OracleAdvisoryLock.cs b/src/Persistence/Oracle/Wolverine.Oracle/OracleAdvisoryLock.cs
index aeffac073..98ad603e9 100644
--- a/src/Persistence/Oracle/Wolverine.Oracle/OracleAdvisoryLock.cs
+++ b/src/Persistence/Oracle/Wolverine.Oracle/OracleAdvisoryLock.cs
@@ -28,7 +28,49 @@ public OracleAdvisoryLock(OracleDataSource source, ILogger logger, string schema
     public bool HasLock(int lockId)
     {
-        return _locks.Contains(lockId);
+        if (!_locks.Contains(lockId)) return false;
+        if (!_heldLocks.TryGetValue(lockId, out var held)) return false;
+
+        // Oracle row-level FOR UPDATE locks are tied to the transaction
+        // that took them, which is in turn tied to the holding connection.
+        // If the connection died (network drop, RAC failover, manual KILL
+        // SESSION), the row lock evaporates server-side but our in-memory
+        // state still claims it. Ping the held connection so we can detect
+        // a broken backend and self-clean. See GH-2602.
+        try
+        {
+            using var cmd = held.conn.CreateCommand();
+            cmd.CommandText = "select 1 from dual";
+            cmd.CommandTimeout = 2;
+            cmd.ExecuteScalar();
+            return true;
+        }
+        catch (Exception e)
+        {
+            _logger.LogWarning(e,
+                "Lost advisory-lock connection for lock {LockId} in schema {Schema}; clearing held state",
+                lockId, _schemaName);
+
+            _locks.Remove(lockId);
+            _heldLocks.Remove(lockId);
+            try
+            {
+                held.tx.Dispose();
+            }
+            catch
+            {
+                // already broken
+            }
+            try
+            {
+                held.conn.Dispose();
+            }
+            catch
+            {
+                // already broken
+            }
+            return false;
+        }
     }
 
     public async Task<bool> TryAttainLockAsync(int lockId, CancellationToken token)
diff --git a/src/Persistence/PostgresqlTests/Bugs/Bug_split_brain_advisory_lock_state_divergence.cs b/src/Persistence/PostgresqlTests/Bugs/Bug_split_brain_advisory_lock_state_divergence.cs
new file mode 100644
index 000000000..5c84d44e9
--- /dev/null
+++ b/src/Persistence/PostgresqlTests/Bugs/Bug_split_brain_advisory_lock_state_divergence.cs
@@ -0,0 +1,105 @@
+using IntegrationTests;
+using Microsoft.Extensions.Logging.Abstractions;
+using Npgsql;
+using Shouldly;
+using Xunit;
+using AdvisoryLock = Wolverine.Postgresql.AdvisoryLock;
+
+namespace PostgresqlTests.Bugs;
+
+/// <summary>
+/// Reproducer for https://github.com/JasperFx/wolverine/issues/2602.
+///
+/// In DurabilityMode.Balanced with the PostgreSQL persistence,
+/// leader election relies on a session-level Postgres advisory lock.
+/// When the holder's underlying Postgres backend is terminated server-side
+/// (network blip, idle-connection cull, Postgres failover, Azure flexserver
+/// maintenance, pg_terminate_backend, etc.), Postgres releases the
+/// advisory lock and another node legitimately acquires it — but the
+/// original leader's in-process bookkeeping in AdvisoryLock.HasLock
+/// continues to return true. Result: two nodes simultaneously
+/// believe they are the leader, both run EvaluateAssignmentsAsync,
+/// and both dispatch AssignAgent commands.
+/// </summary>
+public class Bug_split_brain_advisory_lock_state_divergence : PostgresqlContext
+{
+    [Fact]
+    public async Task has_lock_returns_true_after_postgres_releases_it_due_to_backend_termination()
+    {
+        const int lockId = unchecked((int)0xB16B00B5);
+
+        var dataSource = NpgsqlDataSource.Create(Servers.PostgresConnectionString);
+        try
+        {
+            var holder = new AdvisoryLock(dataSource, NullLogger.Instance, "split-brain-test");
+
+            // 1. Stale leader acquires the leadership lock.
+            (await holder.TryAttainLockAsync(lockId, CancellationToken.None)).ShouldBeTrue();
+            holder.HasLock(lockId).ShouldBeTrue();
+
+            // 2. Find the backend PID currently holding the lock and terminate it
+            // from a separate session — simulates a network blip, pg restart,
+            // idle-connection cull, or Azure flexserver maintenance.
+            var holderPid = await findAdvisoryLockHolderPidAsync(lockId);
+            holderPid.ShouldNotBeNull("Expected to find a backend holding the advisory lock");
+            await terminateBackendAsync(holderPid!.Value);
+
+            // 3. From a different connection, prove that Postgres really did
+            // release the lock — a contender now acquires it cleanly.
+            var contender = new AdvisoryLock(dataSource, NullLogger.Instance, "contender");
+            try
+            {
+                (await contender.TryAttainLockAsync(lockId, CancellationToken.None))
+                    .ShouldBeTrue("Postgres should have released the lock when the holder's backend was terminated");
+            }
+            finally
+            {
+                await contender.ReleaseLockAsync(lockId);
+                await contender.DisposeAsync();
+            }
+
+            // 4. THE BUG: holder.HasLock used to still return true even though
+            // it is no longer the lock holder server-side. NodeAgentController
+            // would then happily call EvaluateAssignmentsAsync on this stale
+            // leader. With the fix in this PR, HasLock now pings the held
+            // connection and detects the broken backend, drops the in-memory
+            // state, and returns false.
+            holder.HasLock(lockId)
+                .ShouldBeFalse(
+                    "AdvisoryLock.HasLock returned true even though Postgres has released the session-level lock and another session has acquired it.");
+
+            await holder.DisposeAsync();
+        }
+        finally
+        {
+            await dataSource.DisposeAsync();
+        }
+    }
+
+    private static async Task<int?> findAdvisoryLockHolderPidAsync(int lockId)
+    {
+        await using var conn = new NpgsqlConnection(Servers.PostgresConnectionString);
+        await conn.OpenAsync();
+        await using var cmd = conn.CreateCommand();
+        cmd.CommandText = @"
+            select pid from pg_locks
+            where locktype = 'advisory'
+            and granted = true
+            and ((classid::bigint << 32) | (objid::bigint & x'FFFFFFFF'::bigint)) = @lockId
+            limit 1";
+        cmd.Parameters.AddWithValue("@lockId", (long)lockId);
+        var raw = await cmd.ExecuteScalarAsync();
+        return raw is int pid ? pid : null;
+    }
+
+    private static async Task terminateBackendAsync(int pid)
+    {
+        await using var conn = new NpgsqlConnection(Servers.PostgresConnectionString);
+        await conn.OpenAsync();
+        await using var cmd = conn.CreateCommand();
+        cmd.CommandText = "select pg_terminate_backend(@pid)";
+        cmd.Parameters.AddWithValue("@pid", pid);
+        await cmd.ExecuteScalarAsync();
+        await Task.Delay(250);
+    }
+}
diff --git a/src/Persistence/Wolverine.Postgresql/PostgresqlNodePersistence.cs b/src/Persistence/Wolverine.Postgresql/PostgresqlNodePersistence.cs
index bc1c324f1..543898a50 100644
--- a/src/Persistence/Wolverine.Postgresql/PostgresqlNodePersistence.cs
+++ b/src/Persistence/Wolverine.Postgresql/PostgresqlNodePersistence.cs
@@ -393,7 +393,42 @@ public AdvisoryLock(NpgsqlDataSource source, ILogger logger, string databaseName
     public bool HasLock(int lockId)
     {
-        return _conn is not { State: ConnectionState.Closed } && _locks.Contains(lockId);
+        if (_conn is null) return false;
+        if (!_locks.Contains(lockId)) return false;
+
+        // Postgres releases session-level advisory locks the moment the
+        // backend session ends — network blip, idle-connection cull,
+        // pg_terminate_backend, Postgres failover, Azure flexserver
+        // maintenance. Npgsql's NpgsqlConnection.State stays Open until
+        // we actually try to use it, so without this ping HasLock keeps
+        // claiming the lock long after another session has acquired it,
+        // and two nodes both believe they're the leader. See GH-2602.
+        try
+        {
+            using var cmd = _conn.CreateCommand();
+            cmd.CommandText = "select 1";
+            cmd.CommandTimeout = 2;
+            cmd.ExecuteScalar();
+            return true;
+        }
+        catch (Exception e)
+        {
+            _logger.LogWarning(e,
+                "Lost advisory-lock connection for database {Database}; clearing held lock ids {Locks}",
+                _databaseName, _locks);
+
+            _locks.Clear();
+            try
+            {
+                _conn.Dispose();
+            }
+            catch
+            {
+                // Already broken; nothing to do.
+            }
+            _conn = null;
+            return false;
+        }
     }
 
     public async Task<bool> TryAttainLockAsync(int lockId, CancellationToken token)
diff --git a/src/Persistence/Wolverine.SqlServer/Persistence/SqlServerAdvisoryLock.cs b/src/Persistence/Wolverine.SqlServer/Persistence/SqlServerAdvisoryLock.cs
new file mode 100644
index 000000000..5591f20ac
--- /dev/null
+++ b/src/Persistence/Wolverine.SqlServer/Persistence/SqlServerAdvisoryLock.cs
@@ -0,0 +1,197 @@
+using System.Data;
+using JasperFx.Core;
+using Microsoft.Data.SqlClient;
+using Microsoft.Extensions.Logging;
+using Weasel.Core;
+using Weasel.SqlServer;
+
+namespace Wolverine.SqlServer.Persistence;
+
+/// <summary>
+/// Wolverine-owned <see cref="IAdvisoryLock" /> for SQL Server. Equivalent to
+/// Weasel.SqlServer.AdvisoryLock but with a server-side liveness ping
+/// in <see cref="HasLock" /> so a stale leader whose SQL Server
+/// session has been killed (KILL SPID, AlwaysOn failover, idle-connection
+/// drop, NAT gateway reuse, etc.) detects the lock loss instead of forever
+/// claiming to be the leader.
+///
+/// See https://github.com/JasperFx/wolverine/issues/2602.
+/// </summary>
+internal class SqlServerAdvisoryLock : IAdvisoryLock
+{
+    private readonly Func<SqlConnection> _source;
+    private readonly ILogger _logger;
+    private readonly string _databaseName;
+    private readonly List<int> _locks = new();
+    private SqlConnection? _conn;
+
+    public SqlServerAdvisoryLock(Func<SqlConnection> source, ILogger logger, string databaseName)
+    {
+        _source = source;
+        _logger = logger;
+        _databaseName = databaseName;
+    }
+
+    public bool HasLock(int lockId)
+    {
+        if (_conn is null) return false;
+        if (!_locks.Contains(lockId)) return false;
+
+        // SQL Server session-scoped application locks (sp_getapplock /
+        // sp_releaseapplock) are released the instant the SQL session ends —
+        // KILL SPID, network drop, AlwaysOn failover, AAD token expiry on
+        // managed identity. SqlConnection.State stays Open until we use it,
+        // so without this ping HasLock keeps reporting the lock held long
+        // after another session has acquired it. See GH-2602.
+        try
+        {
+            using var cmd = _conn.CreateCommand();
+            cmd.CommandText = "select 1";
+            cmd.CommandTimeout = 2;
+            cmd.ExecuteScalar();
+            return true;
+        }
+        catch (Exception e)
+        {
+            _logger.LogWarning(e,
+                "Lost advisory-lock connection for database {Database}; clearing held lock ids {Locks}",
+                _databaseName, _locks);
+
+            _locks.Clear();
+            try
+            {
+                _conn.Dispose();
+            }
+            catch
+            {
+                // Already broken; nothing to do.
+            }
+            _conn = null;
+            return false;
+        }
+    }
+
+    public async Task<bool> TryAttainLockAsync(int lockId, CancellationToken token)
+    {
+        if (_conn == null)
+        {
+            _conn = _source();
+            await _conn.OpenAsync(token).ConfigureAwait(false);
+        }
+
+        if (_conn.State == ConnectionState.Closed)
+        {
+            try
+            {
+                await _conn.DisposeAsync().ConfigureAwait(false);
+            }
+            catch (Exception e)
+            {
+                _logger.LogError(e, "Error trying to clean up and restart an advisory lock connection");
+            }
+            finally
+            {
+                _conn = null;
+            }
+
+            return false;
+        }
+
+        var attained = await _conn.TryGetGlobalLock(lockId.ToString(), cancellation: token).ConfigureAwait(false);
+        if (attained)
+        {
+            _locks.Add(lockId);
+            return true;
+        }
+
+        return false;
+    }
+
+    public async Task ReleaseLockAsync(int lockId)
+    {
+        if (!_locks.Contains(lockId)) return;
+
+        if (_conn == null || _conn.State == ConnectionState.Closed)
+        {
+            _locks.Remove(lockId);
+            return;
+        }
+
+        try
+        {
+            var cancellation = new CancellationTokenSource();
+            cancellation.CancelAfter(1.Seconds());
+
+            await _conn.ReleaseGlobalLock(lockId.ToString(), cancellation: cancellation.Token).ConfigureAwait(false);
+        }
+        catch (Exception e)
+        {
+            _logger.LogDebug(e,
+                "Error trying to release advisory lock {LockId} for database {Identifier}",
+                lockId, _databaseName);
+        }
+
+        _locks.Remove(lockId);
+
+        if (!_locks.Any())
+        {
+            await safeCloseConnectionAsync().ConfigureAwait(false);
+        }
+    }
+
+    public async ValueTask DisposeAsync()
+    {
+        if (_conn == null) return;
+
+        try
+        {
+            if (_conn.State == ConnectionState.Open)
+            {
+                foreach (var i in _locks)
+                {
+                    try
+                    {
+                        await _conn.ReleaseGlobalLock(i.ToString(), CancellationToken.None).ConfigureAwait(false);
+                    }
+                    catch (Exception e)
+                    {
+                        _logger.LogDebug(e,
+                            "Error trying to release advisory lock {LockId} during dispose for database {Identifier}",
+                            i, _databaseName);
+                    }
+                }
+            }
+
+            await safeCloseConnectionAsync().ConfigureAwait(false);
+        }
+        catch (Exception e)
+        {
+            _logger.LogDebug(e, "Error trying to dispose of advisory locks for database {Identifier}",
+                _databaseName);
+        }
+    }
+
+    private async Task safeCloseConnectionAsync()
+    {
+        if (_conn == null) return;
+
+        try
+        {
+            if (_conn.State == ConnectionState.Open)
+            {
+                await _conn.CloseAsync().ConfigureAwait(false);
+            }
+
+            await _conn.DisposeAsync().ConfigureAwait(false);
+        }
+        catch (Exception e)
+        {
+            _logger.LogDebug(e, "Error trying to close advisory lock connection for database {Identifier}",
+                _databaseName);
+        }
+        finally
+        {
+            _conn = null;
+        }
+    }
+}
diff --git a/src/Persistence/Wolverine.SqlServer/Persistence/SqlServerMessageStore.cs b/src/Persistence/Wolverine.SqlServer/Persistence/SqlServerMessageStore.cs
index db79250fa..c81f1b910 100644
--- a/src/Persistence/Wolverine.SqlServer/Persistence/SqlServerMessageStore.cs
+++ b/src/Persistence/Wolverine.SqlServer/Persistence/SqlServerMessageStore.cs
@@ -45,7 +45,10 @@ public SqlServerMessageStore(DatabaseSettings database, DurabilitySettings setti
             $"select top (@limit) {DatabaseConstants.IncomingFields} from {database.SchemaName}.{DatabaseConstants.IncomingTable} where owner_id = {TransportConstants.AnyNode} and status = '{EnvelopeStatus.Incoming}' and {DatabaseConstants.ReceivedAt} = @address";
 
         _scheduledLockId = "Wolverine:Scheduled:" + database.ScheduledJobLockId.ToString();
 
-        AdvisoryLock = new AdvisoryLock(() => new SqlConnection(database.ConnectionString),
+        // Use the Wolverine-owned SqlServerAdvisoryLock (not Weasel.SqlServer.AdvisoryLock)
+        // so HasLock pings the held SQL session and detects KILL SPID / AlwaysOn
+        // failover / network drops. See GH-2602.
+        AdvisoryLock = new SqlServerAdvisoryLock(() => new SqlConnection(database.ConnectionString),
             logger, Identifier);
 
         foreach (var sagaTableDefinition in sagaTypes)
diff --git a/src/Persistence/Wolverine.Sqlite/SqliteAdvisoryLock.cs b/src/Persistence/Wolverine.Sqlite/SqliteAdvisoryLock.cs
index 7d55cafdb..66f1259d2 100644
--- a/src/Persistence/Wolverine.Sqlite/SqliteAdvisoryLock.cs
+++ b/src/Persistence/Wolverine.Sqlite/SqliteAdvisoryLock.cs
@@ -24,7 +24,40 @@ public SqliteAdvisoryLock(DbDataSource dataSource, ILogger logger, string databa
     public bool HasLock(int lockId)
     {
-        return _conn is not { State: ConnectionState.Closed } && _locks.Contains(lockId);
+        if (_conn is null) return false;
+        if (!_locks.Contains(lockId)) return false;
+
+        // SQLite advisory locks are table rows; in single-process tests
+        // the connection is unlikely to die out from under us, but for
+        // parity with the Postgres / MySQL fix and to detect any held
+        // connection that has gone bad (e.g. file deleted under us),
+        // ping before reporting the lock as held. See GH-2602.
+        try
+        {
+            using var cmd = _conn.CreateCommand();
+            cmd.CommandText = "select 1";
+            cmd.CommandTimeout = 2;
+            cmd.ExecuteScalar();
+            return true;
+        }
+        catch (Exception e)
+        {
+            _logger.LogWarning(e,
+                "Lost advisory-lock connection for database {Database}; clearing held lock ids {Locks}",
+                _databaseName, _locks);
+
+            _locks.Clear();
+            try
+            {
+                _conn.Dispose();
+            }
+            catch
+            {
+                // Already broken; nothing to do.
+            }
+            _conn = null;
+            return false;
+        }
     }
 
     public async Task<bool> TryAttainLockAsync(int lockId, CancellationToken token)
diff --git a/src/Testing/CoreTests/Runtime/Agents/duplicate_agent_split_brain_detection.cs b/src/Testing/CoreTests/Runtime/Agents/duplicate_agent_split_brain_detection.cs
new file mode 100644
index 000000000..3c5cf3fe4
--- /dev/null
+++ b/src/Testing/CoreTests/Runtime/Agents/duplicate_agent_split_brain_detection.cs
@@ -0,0 +1,58 @@
+using Shouldly;
+using Wolverine.Runtime.Agents;
+using Xunit;
+
+namespace CoreTests.Runtime.Agents;
+
+/// <summary>
+/// Locks down the Layer 3 fix for GH-2602. When two <see cref="WolverineNode" />
+/// rows both list the same agent in their ActiveAgents — the residue of
+/// a brief leader split-brain — the assignment grid must record the duplicate
+/// so the (now-correct) leader can heal it via StopRemoteAgent. Before
+/// the fix, the dictionary write in Node.Running silently overwrote
+/// the first node's entry and the duplicate became invisible to FindDelta.
+/// </summary>
+public class duplicate_agent_split_brain_detection
+{
+    private readonly Uri agent1 = new Uri("blue://1");
+    private readonly Uri agent2 = new Uri("blue://2");
+
+    [Fact]
+    public void records_duplicate_when_same_agent_runs_on_two_nodes()
+    {
+        var grid = new AssignmentGrid();
+
+        var node1 = grid.WithNode(1, Guid.NewGuid()).Running(agent1);
+        var node2 = grid.WithNode(2, Guid.NewGuid()).Running(agent1);
+
+        grid.DuplicateAgentReports.Count.ShouldBe(1);
+
+        var report = grid.DuplicateAgentReports.Single();
+        report.AgentUri.ShouldBe(agent1);
+        report.ExistingNode.ShouldBe(node1);
+        report.NewNode.ShouldBe(node2);
+    }
+
+    [Fact]
+    public void no_report_when_each_agent_runs_on_only_one_node()
+    {
+        var grid = new AssignmentGrid();
+
+        grid.WithNode(1, Guid.NewGuid()).Running(agent1);
+        grid.WithNode(2, Guid.NewGuid()).Running(agent2);
+
+        grid.DuplicateAgentReports.ShouldBeEmpty();
+    }
+
+    [Fact]
+    public void no_report_when_same_node_lists_agent_twice()
+    {
+        // A single Running call with duplicates within the same node — odd
+        // input, but not the split-brain we're trying to detect.
+        var grid = new AssignmentGrid();
+
+        grid.WithNode(1, Guid.NewGuid()).Running(agent1, agent1);
+
+        grid.DuplicateAgentReports.ShouldBeEmpty();
+    }
+}
diff --git a/src/Wolverine/Runtime/Agents/AssignmentGrid.Node.cs b/src/Wolverine/Runtime/Agents/AssignmentGrid.Node.cs
index 463bdb65a..ee257ea13 100644
--- a/src/Wolverine/Runtime/Agents/AssignmentGrid.Node.cs
+++ b/src/Wolverine/Runtime/Agents/AssignmentGrid.Node.cs
@@ -68,6 +68,19 @@ public Node Running(params Uri[] agentUris)
         {
             foreach (var agentUri in agentUris)
             {
+                // Detect split-brain residue: another node already reported
+                // this agent as running, so we have a duplicate. Record it so
+                // the leader can emit a StopRemoteAgent against the existing
+                // copy. Without this, the dictionary write below silently
+                // overwrites the first node's entry and the duplicate becomes
+                // invisible to FindDelta. See GH-2602.
+                if (_parent._agents.TryGetValue(agentUri, out var existing) &&
+                    existing.OriginalNode != null &&
+                    !ReferenceEquals(existing.OriginalNode, this))
+                {
+                    _parent.RecordDuplicateAgent(agentUri, existing.OriginalNode, this);
+                }
+
                 var agent = new Agent(agentUri, this);
                 _parent._agents[agentUri] = agent;
diff --git a/src/Wolverine/Runtime/Agents/AssignmentGrid.cs b/src/Wolverine/Runtime/Agents/AssignmentGrid.cs
index 8f63aa02d..938cf4b4e 100644
--- a/src/Wolverine/Runtime/Agents/AssignmentGrid.cs
+++ b/src/Wolverine/Runtime/Agents/AssignmentGrid.cs
@@ -13,6 +13,21 @@ public partial class AssignmentGrid
 {
     private readonly Dictionary<Uri, Agent> _agents = new();
     private readonly List<Node> _nodes = new();
+    private readonly List<DuplicateAgentReport> _duplicateAgentReports = new();
+
+    /// <summary>
+    /// Reports of the same agent URI being claimed-as-running by more than
+    /// one <see cref="Node" /> when this grid was assembled. The
+    /// leader uses these to emit StopRemoteAgent commands so the
+    /// duplicates are healed even after a stale-leader split-brain. See
+    /// GH-2602.
+    /// </summary>
+    internal IReadOnlyList<DuplicateAgentReport> DuplicateAgentReports => _duplicateAgentReports;
+
+    internal void RecordDuplicateAgent(Uri agentUri, Node existingNode, Node newNode)
+    {
+        _duplicateAgentReports.Add(new DuplicateAgentReport(agentUri, existingNode, newNode));
+    }
 
     /// <summary>
     ///     The identity of all currently unassigned agents
diff --git a/src/Wolverine/Runtime/Agents/DuplicateAgentReport.cs b/src/Wolverine/Runtime/Agents/DuplicateAgentReport.cs
new file mode 100644
index 000000000..658ff52a8
--- /dev/null
+++ b/src/Wolverine/Runtime/Agents/DuplicateAgentReport.cs
@@ -0,0 +1,15 @@
+namespace Wolverine.Runtime.Agents;
+
+/// <summary>
+/// Recorded by <see cref="AssignmentGrid" /> when a single agent URI shows up
+/// in more than one <see cref="AssignmentGrid.Node" /> list during a
+/// heartbeat tick. Symptom of a leader split-brain (GH-2602): two leaders
+/// dispatched AssignAgent for the same agent in close succession.
+/// The (now-correct) leader uses these reports to emit a
+/// StopRemoteAgent targeting <see cref="ExistingNode" />, leaving the
+/// freshest copy on <see cref="NewNode" /> running.
+/// </summary>
+internal sealed record DuplicateAgentReport(
+    Uri AgentUri,
+    AssignmentGrid.Node ExistingNode,
+    AssignmentGrid.Node NewNode);
diff --git a/src/Wolverine/Runtime/Agents/IWolverineObserver.cs b/src/Wolverine/Runtime/Agents/IWolverineObserver.cs
index f805fdfe4..9b6375c37 100644
--- a/src/Wolverine/Runtime/Agents/IWolverineObserver.cs
+++ b/src/Wolverine/Runtime/Agents/IWolverineObserver.cs
@@ -10,6 +10,15 @@ namespace Wolverine.Runtime.Agents;
 public interface IWolverineObserver
 {
     Task AssumedLeadership();
+
+    /// <summary>
+    /// The node was leader, detected that its underlying advisory lock had
+    /// been released server-side, and has stepped down. Default no-op so
+    /// existing custom <see cref="IWolverineObserver" /> implementations are
+    /// unaffected. See GH-2602.
+    /// </summary>
+    Task LostLeadership() => Task.CompletedTask;
+
     Task NodeStarted();
     Task NodeStopped();
     Task AgentStarted(Uri agentUri);
@@ -90,6 +99,19 @@ await _runtime.Storage.Nodes.LogRecordsAsync(NodeRecord.For(_runtime.Options,
             NodeRecordType.LeadershipAssumed, NodeAgentController.LeaderUri));
     }
 
+    public async Task LostLeadership()
+    {
+        try
+        {
+            await _runtime.Storage.Nodes.LogRecordsAsync(NodeRecord.For(_runtime.Options,
+                NodeRecordType.LeadershipLost, NodeAgentController.LeaderUri));
+        }
+        catch (NotSupportedException)
+        {
+            // NullMessageStore does not support node persistence; nothing to log.
+        }
+    }
+
     public async Task NodeStarted()
     {
         await _runtime.Storage.Nodes.LogRecordsAsync(NodeRecord.For(_runtime.Options, NodeRecordType.NodeStarted));
diff --git a/src/Wolverine/Runtime/Agents/NodeAgentController.EvaluateAssignments.cs b/src/Wolverine/Runtime/Agents/NodeAgentController.EvaluateAssignments.cs
index bab6ff180..b98c6dea6 100644
--- a/src/Wolverine/Runtime/Agents/NodeAgentController.EvaluateAssignments.cs
+++ b/src/Wolverine/Runtime/Agents/NodeAgentController.EvaluateAssignments.cs
@@ -64,6 +64,22 @@ public async Task<AgentCommands> EvaluateAssignmentsAsync(
             }
         }
 
+        // Heal split-brain residue: if any agent is reported running on more
+        // than one node — most likely because a stale leader and the current
+        // leader both dispatched AssignAgent for the same agent in close
+        // succession before the stale leader stepped down — emit a
+        // StopRemoteAgent for the older copy. The freshest copy stays
+        // running; the next assignment cycle will rebalance if needed. See
+        // GH-2602.
+        foreach (var report in grid.DuplicateAgentReports)
+        {
+            _logger.LogWarning(
+                "Detected duplicate agent {AgentUri} reported running on Node {ExistingNode} and Node {NewNode} — sending StopRemoteAgent to the older copy to heal split-brain residue.",
+                report.AgentUri, report.ExistingNode.AssignedId, report.NewNode.AssignedId);
+
+            commands.Add(new StopRemoteAgent(report.AgentUri, report.ExistingNode.ToDestination()));
+        }
+
         await _observer.AssignmentsChanged(grid, commands);
 
         LastAssignments = grid;
diff --git a/src/Wolverine/Runtime/Agents/NodeAgentController.HeartBeat.cs b/src/Wolverine/Runtime/Agents/NodeAgentController.HeartBeat.cs
index 125171f16..9063666ae 100644
--- a/src/Wolverine/Runtime/Agents/NodeAgentController.HeartBeat.cs
+++ b/src/Wolverine/Runtime/Agents/NodeAgentController.HeartBeat.cs
@@ -49,6 +49,19 @@ public async Task<AgentCommands> DoHealthChecksAsync()
         // Do it no matter what
         await ejectStaleNodes(staleNodes);
 
+        // Detect lost leadership: we *thought* we were the leader (from a
+        // previous heartbeat tick) but our underlying advisory lock has been
+        // released server-side. With the AdvisoryLock.HasLock liveness ping
+        // (Layer 1) this branch is reachable; without it, the bug from
+        // GH-2602 manifests as two nodes simultaneously claiming leadership.
+        // Step down cleanly here, then fall through to the normal election
+        // path so this same tick either re-attains the lock (if no one else
+        // has taken it) or peacefully becomes a follower.
+        if (IsLeader && !_persistence.HasLeadershipLock())
+        {
+            await stepDownAsync("the leadership advisory lock was released server-side");
+        }
+
         if (_persistence.HasLeadershipLock())
         {
             IsLeader = true;
@@ -70,6 +83,59 @@ public async Task<AgentCommands> DoHealthChecksAsync()
         return AgentCommands.Empty;
     }
 
+    /// <summary>
+    /// Reverse the local effects of assuming leadership when
+    /// the node discovers it is no longer the leader server-side. Stops the
+    /// local leader agent so this node
+    /// stops dispatching AssignAgent / ReassignAgent commands,
+    /// best-effort releases the persistence-layer leadership lock, and
+    /// notifies observers. The caller falls through to the normal election
+    /// path so a fresh leadership election happens on the same tick. See
+    /// GH-2602.
+    /// </summary>
+    internal async Task stepDownAsync(string reason)
+    {
+        _logger.LogWarning(
+            "Node {NodeNumber} stepping down from leadership: {Reason}. Triggering a new leadership election.",
+            _runtime.Options.Durability.AssignedNodeNumber, reason);
+
+        IsLeader = false;
+
+        try
+        {
+            if (Agents.ContainsKey(LeaderUri))
+            {
+                await StopAgentAsync(LeaderUri);
+            }
+        }
+        catch (Exception e)
+        {
+            _logger.LogError(e,
+                "Error stopping the local leader agent during leadership step-down on node {NodeNumber}",
+                _runtime.Options.Durability.AssignedNodeNumber);
+        }
+
+        try
+        {
+            await _persistence.ReleaseLeadershipLockAsync();
+        }
+        catch (Exception e)
+        {
+            _logger.LogDebug(e,
+                "Error trying to release the leadership lock during step-down (non-fatal)");
+        }
+
+        try
+        {
+            await _observer.LostLeadership();
+        }
+        catch (Exception e)
+        {
+            _logger.LogDebug(e,
+                "Observer.LostLeadership threw during step-down (non-fatal)");
+        }
+    }
+
     private async Task tryStartLeadershipAsync(IReadOnlyList<WolverineNode> nodes, AgentRestrictions restrictions)
     {
diff --git a/src/Wolverine/Runtime/Agents/NodeRecordType.cs b/src/Wolverine/Runtime/Agents/NodeRecordType.cs
index 62415ddb9..2aa637dca 100644
--- a/src/Wolverine/Runtime/Agents/NodeRecordType.cs
+++ b/src/Wolverine/Runtime/Agents/NodeRecordType.cs
@@ -11,7 +11,15 @@ public enum NodeRecordType
     DormantNodeEjected,
     AssignmentChanged,
     LeadershipAssumed,
-    ListenerLatched
+    ListenerLatched,
+
+    /// <summary>
+    /// A node that thought it was the leader detected that its underlying
+    /// advisory lock had been released server-side (network blip, idle-cull,
+    /// pg_terminate_backend, AlwaysOn failover, etc.) and stepped down so a
+    /// new leadership election could happen. See GH-2602.
+    /// </summary>
+    LeadershipLost
 }
 
 // This is marked as ISerializable so that it can go to CritterWatch w/o