diff --git a/src/Testing/CoreTests/Runtime/Agents/leader_election_self_visibility_tests.cs b/src/Testing/CoreTests/Runtime/Agents/leader_election_self_visibility_tests.cs new file mode 100644 index 000000000..ca636491c --- /dev/null +++ b/src/Testing/CoreTests/Runtime/Agents/leader_election_self_visibility_tests.cs @@ -0,0 +1,166 @@ +using CoreTests.Transports; +using JasperFx.Core; +using Microsoft.Extensions.Logging.Abstractions; +using NSubstitute; +using Shouldly; +using Wolverine.Configuration; +using Wolverine.Logging; +using Wolverine.Runtime; +using Wolverine.Runtime.Agents; +using Xunit; + +namespace CoreTests.Runtime.Agents; + +/// <summary> +/// Regression coverage for GH-2682. Stale snapshot reads (read replica lag, +/// snapshot isolation, GC pause between the heartbeat write and the read, +/// Oracle session-TZ round-trip, an aggressive StaleNodeTimeout) used +/// to fold the current node into the staleNodes filter inside +/// <see cref="NodeAgentController.DoHealthChecksAsync"/>. The tick then crashed +/// in tryStartLeadershipAsync with NRE on +/// self!.AssignAgents([LeaderUri]) — *after* IsLeader had been set +/// true, the leadership lock was held, AssumedLeadership had fired, and the +/// assignment row was written. The cluster ended up half-elected with no +/// agent dispatch. The fix is "we know we're alive because we just wrote our +/// own heartbeat" — never let self into staleNodes, and inject self if the +/// snapshot didn't include it at all. 
+/// </summary> +public class leader_election_self_visibility_tests +{ + private readonly WolverineOptions _options; + private readonly INodeAgentPersistence _persistence; + private readonly IWolverineRuntime _runtime; + private readonly NodeAgentController _controller; + + public leader_election_self_visibility_tests() + { + _options = new WolverineOptions + { + ApplicationAssembly = GetType().Assembly + }; + _options.Transports.NodeControlEndpoint = new FakeEndpoint("fake://self".ToUri(), EndpointRole.System); + _options.Durability.DurabilityAgentEnabled = false; // skip MessageStoreCollection wiring + _options.Durability.StaleNodeTimeout = 30.Seconds(); // matches the bug report + + _runtime = Substitute.For(); + _runtime.Options.Returns(_options); + _runtime.DurabilitySettings.Returns(_options.Durability); + _runtime.Observer.Returns(Substitute.For()); + + _persistence = Substitute.For(); + + _controller = new NodeAgentController( + _runtime, + _persistence, + Array.Empty(), + NullLogger.Instance, + CancellationToken.None); + } + + private WolverineNode SelfRow(DateTimeOffset lastHealthCheck) => new() + { + NodeId = _options.UniqueNodeId, + AssignedNodeNumber = _options.Durability.AssignedNodeNumber, + ControlUri = _options.Transports.NodeControlEndpoint!.Uri, + LastHealthCheck = lastHealthCheck + }; + + private WolverineNode OtherRow(DateTimeOffset lastHealthCheck) => new() + { + NodeId = Guid.NewGuid(), + AssignedNodeNumber = 99, + ControlUri = new Uri("fake://other"), + LastHealthCheck = lastHealthCheck + }; + + private void primeForElection() + { + // The fix focuses on the staleness filter. Drive the controller down + // the tryStartLeadershipAsync branch by handing it the leadership lock. 
+ _persistence.HasLeadershipLock().Returns(false); + _persistence.TryAttainLeadershipLockAsync(Arg.Any()) + .Returns(true); + } + + [Fact] + public async Task does_not_throw_when_self_appears_stale_in_loaded_snapshot() + { + primeForElection(); + + // Self's row in the snapshot is past the StaleNodeTimeout (30s) — the + // exact pathology described in GH-2682. Pre-fix this folded self into + // staleNodes and led to NRE in tryStartLeadershipAsync. + var staleSelf = SelfRow(DateTimeOffset.UtcNow.Subtract(60.Seconds())); + var liveOther = OtherRow(DateTimeOffset.UtcNow); + + _persistence.LoadNodeAgentStateAsync(Arg.Any()) + .Returns(new NodeAgentState(new[] { staleSelf, liveOther }, new AgentRestrictions())); + + await _controller.DoHealthChecksAsync(); + + _controller.IsLeader.ShouldBeTrue(); + _controller.LastAssignments.ShouldNotBeNull(); + _controller.LastAssignments!.Nodes + .ShouldContain(n => n.NodeId == _options.UniqueNodeId, + "self must remain in the assignment grid even when the snapshot reported it as stale"); + } + + [Fact] + public async Task does_not_eject_self_even_when_snapshot_marks_self_stale() + { + primeForElection(); + + var staleSelf = SelfRow(DateTimeOffset.UtcNow.Subtract(60.Seconds())); + _persistence.LoadNodeAgentStateAsync(Arg.Any()) + .Returns(new NodeAgentState(new[] { staleSelf }, new AgentRestrictions())); + + await _controller.DoHealthChecksAsync(); + + // ejectStaleNodes used to skip self by AssignedNodeNumber match (in + // NodeAgentController.HeartBeat.cs); the in-memory list still + // dropped self though. With the fix, self must never even reach the + // staleNodes array, so DeleteAsync must not be called for self. 
+ await _persistence.DidNotReceive() + .DeleteAsync(_options.UniqueNodeId, _options.Durability.AssignedNodeNumber); + } + + [Fact] + public async Task injects_self_when_snapshot_omits_self_entirely() + { + primeForElection(); + + // Snapshot has no row for self at all (read-after-write lag against the + // upsert above, brand-new node still propagating). The defensive + // self-injection branch in DoHealthChecksAsync must keep the election + // path safe. + var liveOther = OtherRow(DateTimeOffset.UtcNow); + _persistence.LoadNodeAgentStateAsync(Arg.Any()) + .Returns(new NodeAgentState(new[] { liveOther }, new AgentRestrictions())); + + await _controller.DoHealthChecksAsync(); + + _controller.IsLeader.ShouldBeTrue(); + _controller.LastAssignments.ShouldNotBeNull(); + _controller.LastAssignments!.Nodes + .ShouldContain(n => n.NodeId == _options.UniqueNodeId, + "self must be injected into the assignment grid when the snapshot omits it"); + } + + [Fact] + public async Task evaluate_assignments_injects_self_when_caller_passes_non_empty_list_without_self() + { + // Defense in depth for EvaluateAssignmentsAsync. The pre-existing guard + // at the top of the method only fired when the list was empty; a stale + // snapshot that filtered self out but kept other nodes slipped through. 
+ var liveOther = OtherRow(DateTimeOffset.UtcNow); + + var commands = await _controller.EvaluateAssignmentsAsync( + new[] { liveOther }, + new AgentRestrictions()); + + commands.ShouldNotBeNull(); + _controller.LastAssignments.ShouldNotBeNull(); + _controller.LastAssignments!.Nodes + .ShouldContain(n => n.NodeId == _options.UniqueNodeId); + } +} diff --git a/src/Wolverine/Runtime/Agents/NodeAgentController.EvaluateAssignments.cs b/src/Wolverine/Runtime/Agents/NodeAgentController.EvaluateAssignments.cs index b98c6dea6..6d4f80369 100644 --- a/src/Wolverine/Runtime/Agents/NodeAgentController.EvaluateAssignments.cs +++ b/src/Wolverine/Runtime/Agents/NodeAgentController.EvaluateAssignments.cs @@ -23,6 +23,18 @@ public async Task EvaluateAssignmentsAsync( nodes = new List { WolverineNode.For(_runtime.Options) }; nodes[0].AssignAgents([LeaderUri]); } + else if (nodes.All(x => x.NodeId != _runtime.Options.UniqueNodeId)) + { + // GH-2682 defense in depth: if the caller hands us a non-empty + // node list that's missing the current node (e.g. a stale snapshot + // read filtered self out), inject self so the assignment grid + // doesn't omit the leader. The injected node is only assigned the + // LeaderUri here when IsLeader is true on this tick — the heartbeat path + // upstream is responsible for actually calling AddAssignmentAsync. 
+ var self = WolverineNode.For(_runtime.Options); + if (IsLeader) self.AssignAgents([LeaderUri]); + nodes = nodes.Concat(new[] { self }).ToList(); + } var grid = new AssignmentGrid(); diff --git a/src/Wolverine/Runtime/Agents/NodeAgentController.HeartBeat.cs b/src/Wolverine/Runtime/Agents/NodeAgentController.HeartBeat.cs index cafa1c4fc..88013de23 100644 --- a/src/Wolverine/Runtime/Agents/NodeAgentController.HeartBeat.cs +++ b/src/Wolverine/Runtime/Agents/NodeAgentController.HeartBeat.cs @@ -39,13 +39,35 @@ public async Task DoHealthChecksAsync() await _persistence.MarkHealthCheckAsync(WolverineNode.For(_runtime.Options), _cancellation.Token); var (nodes, restrictions) = await _persistence.LoadNodeAgentStateAsync(_cancellation.Token); - - // Check for stale nodes that are no longer writing health checks + + // Check for stale nodes that are no longer writing health checks. By + // definition we just wrote our own heartbeat above, so we must never + // consider ourselves stale on this tick — a stale snapshot read (read + // replica lag, snapshot isolation, GC pause between the write and the + // read, Oracle session-TZ round-trip, an aggressive StaleNodeTimeout) + // could otherwise fold us into staleNodes. That path crashes + // tryStartLeadershipAsync with NRE on `self!.AssignAgents([LeaderUri])` + // *after* IsLeader=true, the lock is held, AssumedLeadership has + // fired, and the assignment row is written — leaving the cluster in a + // half-elected state with no agent dispatch. See GH-2682. 
+ var selfNodeId = _runtime.Options.UniqueNodeId; var staleTime = DateTimeOffset.UtcNow.Subtract(_runtime.Options.Durability.StaleNodeTimeout); - var staleNodes = nodes.Where(x => x.LastHealthCheck < staleTime).ToArray(); + var staleNodes = nodes + .Where(x => x.LastHealthCheck < staleTime && x.NodeId != selfNodeId) + .ToArray(); nodes = nodes.Where(x => !staleNodes.Contains(x)).ToList(); + // Defensive: if the snapshot didn't include our own row at all + // (read-after-write lag against the upsert above, brand-new node still + // propagating), inject self so downstream leader-election and + // assignment-evaluation code can find us. We rely on the next tick to + // pick up the persisted row with its full Capabilities / ActiveAgents. + if (nodes.All(x => x.NodeId != selfNodeId)) + { + nodes = nodes.Concat(new[] { WolverineNode.For(_runtime.Options) }).ToList(); + } + // Do it no matter what await ejectStaleNodes(staleNodes);