From 909a4294d83bd1f60465d930a7499132334cee75 Mon Sep 17 00:00:00 2001 From: Dmytro Pryvedeniuk Date: Sat, 30 May 2026 19:06:49 +0300 Subject: [PATCH] Prevent concurrent health check race causing leadership lock corruption --- .../RavenDbTests.LeaderElection.csproj | 6 +- .../RavenDbTests/leadership_locking.cs | 188 +++++++++++++++++- .../leader_election_self_visibility_tests.cs | 34 +++- .../LeadershipElectionCompliance.cs | 83 +++++++- .../Agents/NodeAgentController.HeartBeat.cs | 27 ++- .../Runtime/Agents/NodeAgentController.cs | 10 +- 6 files changed, 325 insertions(+), 23 deletions(-) diff --git a/src/Persistence/LeaderElection/RavenDbTests.LeaderElection/RavenDbTests.LeaderElection.csproj b/src/Persistence/LeaderElection/RavenDbTests.LeaderElection/RavenDbTests.LeaderElection.csproj index e9ce300e8..07ecf9dd5 100644 --- a/src/Persistence/LeaderElection/RavenDbTests.LeaderElection/RavenDbTests.LeaderElection.csproj +++ b/src/Persistence/LeaderElection/RavenDbTests.LeaderElection/RavenDbTests.LeaderElection.csproj @@ -10,10 +10,10 @@ - + - + all runtime; build; native; contentfiles; analyzers; buildtransitive @@ -21,7 +21,7 @@ - + diff --git a/src/Persistence/RavenDbTests/leadership_locking.cs b/src/Persistence/RavenDbTests/leadership_locking.cs index 56a713b4d..5b928fc19 100644 --- a/src/Persistence/RavenDbTests/leadership_locking.cs +++ b/src/Persistence/RavenDbTests/leadership_locking.cs @@ -1,14 +1,16 @@ using JasperFx.Core.Reflection; +using JasperFx.Core; using Microsoft.Extensions.DependencyInjection; using Microsoft.Extensions.Hosting; using Raven.Client.Documents; using Raven.Client.Documents.Operations.CompareExchange; -using Raven.Embedded; using Shouldly; using Wolverine; using Wolverine.Persistence.Durability; using Wolverine.RavenDb; using Wolverine.RavenDb.Internals; +using Wolverine.Tracking; +using Wolverine.Transports.Tcp; namespace RavenDbTests; @@ -118,7 +120,7 @@ public async Task expired_scheduled_job_lock_from_dead_predecessor_can_be_taken_ public async Task try_attain_renews_the_server_side_lease_when_already_held() { // Calling TryAttain when the lease is already held must refresh the - // server-side entry — that's the contract the heartbeat relies on to + // server-side entry - that's the contract the heartbeat relies on to // keep the lease alive. var store = _host.Services.GetService()!.As(); var lockId = "wolverine/leader/locking"; @@ -138,4 +140,184 @@ public async Task try_attain_renews_the_server_side_lease_when_already_held() await store.Nodes.ReleaseLeadershipLockAsync(); } -} \ No newline at end of file + + [Fact] + public async Task raw_compare_exchange_exclusivity_proof() + { + // Direct RavenDB CompareExchange test - no Wolverine wrapping. + // Proves PutCompareExchangeValueOperation with index=0 is truly exclusive. + var key = "test/exclusivity/" + Guid.NewGuid(); + var lock1 = new DistributedLock { NodeId = Guid.NewGuid(), ExpirationTime = DateTimeOffset.UtcNow.AddMinutes(5) }; + var lock2 = new DistributedLock { NodeId = Guid.NewGuid(), ExpirationTime = DateTimeOffset.UtcNow.AddMinutes(5) }; + + // First acquisition - should succeed (key doesn't exist) + var firstPut = await _store.Operations.SendAsync( + new PutCompareExchangeValueOperation(key, lock1, 0)); + firstPut.Successful.ShouldBeTrue("First acquisition with index=0 must succeed"); + + // Second acquisition with index=0 on same key - must FAIL if CE is exclusive + var secondPut = await _store.Operations.SendAsync( + new PutCompareExchangeValueOperation(key, lock2, 0)); + secondPut.Successful.ShouldBeFalse( + "Second acquisition with index=0 on same key must fail - CompareExchange is exclusive"); + secondPut.Value.ShouldNotBeNull(); + secondPut.Value.NodeId.ShouldBe(lock1.NodeId, "Existing value should still be lock1's node"); + + // Correct-index acquisition (using the index from first put) - should succeed + var correctIndexPut = await _store.Operations.SendAsync( + new PutCompareExchangeValueOperation(key, lock2, firstPut.Index)); + correctIndexPut.Successful.ShouldBeTrue("Acquisition with correct index must succeed"); + + // Wrong-index delete - should FAIL + var wrongIndex = correctIndexPut.Index + 999; + var wrongDelete = await _store.Operations.SendAsync( + new DeleteCompareExchangeValueOperation(key, wrongIndex)); + wrongDelete.Successful.ShouldBeFalse("Delete with wrong index must fail"); + + // Correct-index delete - should succeed + var correctDelete = await _store.Operations.SendAsync( + new DeleteCompareExchangeValueOperation(key, correctIndexPut.Index)); + correctDelete.Successful.ShouldBeTrue("Delete with correct index must succeed"); + } + + [Fact] + public async Task stale_index_causes_lock_renewal_failure() + { + // Proves: when two callers concurrently read the same server-side + // lock index, one renewal succeeds (bumping the index) and the other + // fails (using the stale index). Wolverine interprets the failed + // renewal as "leadership lock lost" and calls stepDownAsync, which + // tries to delete the lock value using the stale index. This delete + // ALSO fails because the index has changed. The lock value survives + // under the winning caller's index - orphaned. This is the exact + // mechanism that causes the split-brain / leadership timeout. + + var key = "test/renewal-race/" + Guid.NewGuid(); + var lockVal1 = new DistributedLock { NodeId = Guid.NewGuid(), ExpirationTime = DateTimeOffset.UtcNow.AddMinutes(5) }; + + // Acquire initially -> index becomes N + var create = await _store.Operations.SendAsync( + new PutCompareExchangeValueOperation(key, lockVal1, 0)); + create.Successful.ShouldBeTrue(); + var expectedIndex = create.Index; + + // Now simulate TWO concurrent callers both reading _lastLockIndex=N + // Caller 1 renews first -> index becomes N+1, _lastLockIndex updated + // Caller 2 uses stale N -> PUT fails + + // - Caller 1 (correct index) succeeds - + var lockVal2 = new DistributedLock { NodeId = Guid.NewGuid(), ExpirationTime = DateTimeOffset.UtcNow.AddMinutes(5) }; + var caller1 = await _store.Operations.SendAsync( + new PutCompareExchangeValueOperation(key, lockVal2, expectedIndex)); + caller1.Successful.ShouldBeTrue("Caller 1 renewal with correct index succeeds"); + var afterCaller1Index = caller1.Index; + + // - Caller 2 (stale index N, but actual index is now N+1) fails - + var lockVal3 = new DistributedLock { NodeId = Guid.NewGuid(), ExpirationTime = DateTimeOffset.UtcNow.AddMinutes(5) }; + var caller2 = await _store.Operations.SendAsync( + new PutCompareExchangeValueOperation(key, lockVal3, expectedIndex)); + caller2.Successful.ShouldBeFalse("Caller 2 with stale index MUST fail - proves race causes renewal failure"); + + // - What Wolverine does: stepDownAsync -> ReleaseLeadershipLockAsync + // which deletes the lock value using its stale _lastLockIndex - + // This delete ALSO fails because index is wrong! + var staleDelete = await _store.Operations.SendAsync( + new DeleteCompareExchangeValueOperation(key, expectedIndex)); + staleDelete.Successful.ShouldBeFalse( + "Delete with stale index fails - lock value remains, so another node can acquire it via take-over"); + + // - Cleanup: delete with correct index - + var cleanDelete = await _store.Operations.SendAsync( + new DeleteCompareExchangeValueOperation(key, afterCaller1Index)); + cleanDelete.Successful.ShouldBeTrue("Cleanup delete with correct index succeeds"); + } + + [Fact] + public async Task concurrent_lock_renewal_race_orphans_the_lock() + { + // Simulates the exact race in the leader election test: + // executeHealthChecks loop and CheckAgentHealth message + // call DoHealthChecksAsync -> TryAttainLeadershipLockAsync concurrently. + // Both callers read the same server-side lock index. The first + // renewal bumps the index; the second uses the stale value and + // fails. Wolverine then calls stepDownAsync -> ReleaseLeadershipLockAsync + // with the stale index, which ALSO fails. The lock value survives + // under the winning index - orphaned. No other node can acquire + // it (index=0 fails because a value already exists). Only + // lease-expiry takeover (5-minute timeout) can recover. + + var key = "test/concurrent-race/" + Guid.NewGuid(); + var owner = Guid.NewGuid(); + var initialLock = new DistributedLock { NodeId = owner, ExpirationTime = DateTimeOffset.UtcNow.AddMinutes(5) }; + + // === Step 1: Host1 acquires the lock (index=N) === + var create = await _store.Operations.SendAsync( + new PutCompareExchangeValueOperation(key, initialLock, 0)); + create.Successful.ShouldBeTrue(); + var sharedIndex = create.Index; // This is like _lastLockIndex on Host1 + Console.WriteLine($"Step 1: Acquired lock, index={sharedIndex}"); + + // === Step 2: TWO concurrent renewal callers both read sharedIndex=N === + // Caller A is the health check loop + // Caller B is CheckAgentHealth message processing + var lockA = new DistributedLock { NodeId = owner, ExpirationTime = DateTimeOffset.UtcNow.AddMinutes(5) }; + var lockB = new DistributedLock { NodeId = owner, ExpirationTime = DateTimeOffset.UtcNow.AddMinutes(5) }; + var resultA = await _store.Operations.SendAsync( + new PutCompareExchangeValueOperation(key, lockA, sharedIndex)); + var resultB = await _store.Operations.SendAsync( + new PutCompareExchangeValueOperation(key, lockB, sharedIndex)); + + // Exactly one succeeds (the one whose request wins the network race) + // The other fails because the lock index was bumped by the first + resultA.Successful.ShouldNotBe(resultB.Successful, + "Exactly one concurrent renewal must succeed - proves the race on the shared index"); + + // The FAILING caller simulates what happens in stepDownAsync: + // it tries to delete the lock value using its stale sharedIndex + // This delete FAILS because the lock value has a new index + var staleDelete = await _store.Operations.SendAsync( + new DeleteCompareExchangeValueOperation(key, sharedIndex)); + staleDelete.Successful.ShouldBeFalse(); + + // === Step 3: Cleanup - delete with actual current index === + var currentIndex = resultA.Successful ? resultA.Index : resultB.Index; + var cleanDelete = await _store.Operations.SendAsync( + new DeleteCompareExchangeValueOperation(key, currentIndex)); + cleanDelete.Successful.ShouldBeTrue("Cleanup delete succeeds"); + } + + [Fact] + public async Task concurrent_DoHealthChecksAsync_guard_prevents_spurious_stepdown() + { + // Wolverine-level race test: the Interlocked guard in + // NodeAgentController.DoHealthChecksAsync must prevent concurrent + // health check execution. Without the guard, two callers race on + // _lastLockIndex in TryAttainLeadershipLockAsync — one renewal + // succeeds, the other fails, triggering stepDownAsync and leadership + // loss. With the guard, the second caller returns Empty immediately + // and the leader stays leader. + + using var balancedHost = await Host.CreateDefaultBuilder() + .UseWolverine(opts => + { + opts.Services.AddSingleton(_store); + opts.Durability.Mode = DurabilityMode.Balanced; + opts.Durability.HealthCheckPollingTime = 10.Minutes(); + opts.Durability.CheckAssignmentPeriod = 10.Minutes(); + opts.Durability.FirstHealthCheckExecution = 10.Minutes(); + opts.ServiceName = "race-test"; + opts.UseRavenDbPersistence(); + opts.UseTcpForControlEndpoint(); + }).StartAsync(); + + var runtime = balancedHost.GetRuntime(); + await runtime.DoHealthChecksAsync(); + runtime.IsLeader().ShouldBeTrue("Host must be leader BEFORE"); + runtime.Storage.Nodes.HasLeadershipLock().ShouldBeTrue("Leadership lock must be true BEFORE"); + + await Task.WhenAll(Enumerable.Repeat(1, 10).Select(_ => runtime.DoHealthChecksAsync())); + + runtime.IsLeader().ShouldBeTrue("Host must be leader AFTER"); + runtime.Storage.Nodes.HasLeadershipLock().ShouldBeTrue("Leadership lock must be true AFTER"); + } +} diff --git a/src/Testing/CoreTests/Runtime/Agents/leader_election_self_visibility_tests.cs b/src/Testing/CoreTests/Runtime/Agents/leader_election_self_visibility_tests.cs index ca636491c..4258bcb25 100644 --- a/src/Testing/CoreTests/Runtime/Agents/leader_election_self_visibility_tests.cs +++ b/src/Testing/CoreTests/Runtime/Agents/leader_election_self_visibility_tests.cs @@ -2,9 +2,7 @@ using JasperFx.Core; using Microsoft.Extensions.Logging.Abstractions; using NSubstitute; -using Shouldly; using Wolverine.Configuration; -using Wolverine.Logging; using Wolverine.Runtime; using Wolverine.Runtime.Agents; using Xunit; @@ -163,4 +161,36 @@ public async Task evaluate_assignments_injects_self_when_caller_passes_non_empty _controller.LastAssignments!.Nodes .ShouldContain(n => n.NodeId == _options.UniqueNodeId); } + + [Fact] + public async Task reentrancy_guard_prevents_concurrent_DoHealthChecksAsync() + { + // Regression coverage for the _lastLockIndex race in lease-based + // backends (RavenDB, CosmosDB). The heartbeat loop and + // CheckAgentHealth message can call DoHealthChecksAsync concurrently. + // The Interlocked.CompareExchange guard must let only one execution + // through; the other calls must return AgentCommands.Empty + // immediately instead of racing on shared mutable state. + + _persistence.HasLeadershipLock().Returns(false); + _persistence.TryAttainLeadershipLockAsync(Arg.Any()) + .Returns(async _ => + { + // Yield so the other concurrent DoHealthChecksAsync calls + // start before this one completes, exercising the guard. + await Task.Yield(); + return true; + }); + _persistence.LoadNodeAgentStateAsync(Arg.Any()) + .Returns(new NodeAgentState( + [SelfRow(DateTimeOffset.UtcNow)], + new AgentRestrictions())); + + await Task.WhenAll(Enumerable.Repeat(1, 10) + .Select(_ => _controller.DoHealthChecksAsync())); + + await _persistence.Received(1) + .TryAttainLeadershipLockAsync(Arg.Any()); + _controller.IsLeader.ShouldBeTrue(); + } } diff --git a/src/Testing/Wolverine.ComplianceTests/LeadershipElectionCompliance.cs b/src/Testing/Wolverine.ComplianceTests/LeadershipElectionCompliance.cs index b7cb74fa0..a68b2170f 100644 --- a/src/Testing/Wolverine.ComplianceTests/LeadershipElectionCompliance.cs +++ b/src/Testing/Wolverine.ComplianceTests/LeadershipElectionCompliance.cs @@ -1,6 +1,5 @@ using System.Collections.Concurrent; using JasperFx.Core; -using JasperFx.Core.Reflection; using Microsoft.Extensions.DependencyInjection; using Microsoft.Extensions.Hosting; using Microsoft.Extensions.Logging; @@ -71,6 +70,11 @@ public IHost FindHostRunning(Uri agentUri) protected abstract void configureNode(WolverineOptions options); protected async Task startHostAsync() + { + return await startHostAsync(null); + } + + protected async Task startHostAsync(Action? configure) { var host = await Host.CreateDefaultBuilder() .UseWolverine(opts => @@ -83,6 +87,7 @@ protected async Task startHostAsync() opts.Services.AddSingleton(); configureNode(opts); + configure?.Invoke(opts); opts.Services.AddSingleton(new OutputLoggerProvider(_output)); opts.Services.AddResourceSetupOnStartup(); @@ -160,16 +165,21 @@ public async Task singular_agent_is_only_running_on_one() _hosts.SelectMany(x => x.RunningAgents()).Where(x => x == uri) .Count().ShouldBe(1); + } + + private static void ConfigureSlowHeartbeat(WolverineOptions o) + { + o.Durability.HealthCheckPollingTime = 10.Minutes(); + o.Durability.StaleNodeTimeout = 15.Minutes(); } - + [Fact] public async Task leader_switchover_between_nodes() { await _originalHost.WaitUntilAssumesLeadershipAsync(5.Seconds()); - var host2 = await startHostAsync(); - var host3 = await startHostAsync(); - var host4 = await startHostAsync(); + var host3 = await startHostAsync(ConfigureSlowHeartbeat); + var host4 = await startHostAsync(ConfigureSlowHeartbeat); await shutdownHostAsync(_originalHost); @@ -177,6 +187,9 @@ public async Task leader_switchover_between_nodes() await shutdownHostAsync(host2); + // host3 has a 10-minute heartbeat, so trigger a health check + // explicitly to acquire the lock after host2 releases it. + await host3.InvokeMessageAndWaitAsync(new CheckAgentHealth()); await host3.WaitUntilAssumesLeadershipAsync(30.Seconds()); } @@ -264,6 +277,48 @@ public async Task take_over_leader_ship_if_leader_becomes_stale() { await _originalHost.WaitUntilAssumesLeadershipAsync(5.Seconds()); + // All remaining nodes use a 10-minute heartbeat interval so no + // background loop races for the leadership lock after host1 is + // disabled. The sole trigger is the CheckAgentHealth message + // sent explicitly to host2 below. + var host2 = await startHostAsync(ConfigureSlowHeartbeat); + var host3 = await startHostAsync(ConfigureSlowHeartbeat); + var host4 = await startHostAsync(ConfigureSlowHeartbeat); + + // This is just to eliminate some errors in test output + await _originalHost.WaitUntilAssignmentsChangeTo(w => + { + w.ExpectRunningAgents(_originalHost, 3); + w.ExpectRunningAgents(host2, 3); + w.ExpectRunningAgents(host3, 3); + w.ExpectRunningAgents(host4, 3); + }, 30.Seconds()); + + await _originalHost.GetRuntime().DisableAgentsAsync(DateTimeOffset.UtcNow.AddHours(-1)); + + // No background heartbeats can interfere - the triggered + // CheckAgentHealth is the only path acquiring the lock. + await host2.InvokeMessageAndWaitAsync(new CheckAgentHealth()); + await host2.WaitUntilAssumesLeadershipAsync(15.Seconds()); + + await host2.WaitUntilAssignmentsChangeTo(w => + { + w.ExpectRunningAgents(host2, 4); + w.ExpectRunningAgents(host3, 4); + w.ExpectRunningAgents(host4, 4); + }, 30.Seconds()); + } + + [Fact] + public async Task take_over_leader_ship_if_leader_becomes_stale_with_racing_nodes() + { + // Realistic scenario: the leader is disabled and the remaining + // nodes compete for leadership through their natural 1-second + // heartbeat loops. No explicit CheckAgentHealth messages. + // Any node may win; we just verify one does and the cluster + // rebalances correctly. + await _originalHost.WaitUntilAssumesLeadershipAsync(5.Seconds()); + var host2 = await startHostAsync(); var host3 = await startHostAsync(); var host4 = await startHostAsync(); @@ -278,10 +333,11 @@ await _originalHost.WaitUntilAssignmentsChangeTo(w => }, 30.Seconds()); await _originalHost.GetRuntime().DisableAgentsAsync(DateTimeOffset.UtcNow.AddHours(-1)); - - await host2.InvokeMessageAndWaitAsync(new CheckAgentHealth()); - await host2.WaitUntilAssumesLeadershipAsync(15.Seconds()); + // Wait for any of the remaining 3 nodes to assume leadership + // via their natural heartbeat loop. With all nodes on a 1s + // interval, at least one should grab the lock before timeout + await WaitForAnyLeadershipAsync([host2, host3, host4], 15.Seconds()); await host2.WaitUntilAssignmentsChangeTo(w => { @@ -291,6 +347,17 @@ await host2.WaitUntilAssignmentsChangeTo(w => }, 30.Seconds()); } + private static async Task WaitForAnyLeadershipAsync(IHost[] hosts, TimeSpan timeout) + { + using var cts = new CancellationTokenSource(timeout); + while (!cts.IsCancellationRequested) + { + if (hosts.Any(h => h.GetRuntime().IsLeader())) return; + await Task.Delay(25.Milliseconds(), cts.Token); + } + throw new TimeoutException($"No node assumed leadership within {timeout}"); + } + [Fact] public async Task persist_and_load_node_records() { diff --git a/src/Wolverine/Runtime/Agents/NodeAgentController.HeartBeat.cs b/src/Wolverine/Runtime/Agents/NodeAgentController.HeartBeat.cs index 88013de23..c08538c69 100644 --- a/src/Wolverine/Runtime/Agents/NodeAgentController.HeartBeat.cs +++ b/src/Wolverine/Runtime/Agents/NodeAgentController.HeartBeat.cs @@ -27,12 +27,31 @@ public partial class NodeAgentController public async Task DoHealthChecksAsync() { if (_cancellation.IsCancellationRequested) - { return AgentCommands.Empty; + + // Re-entrancy guard: the heartbeat loop and a CheckAgentHealth + // message handler can call DoHealthChecksAsync concurrently. + // Without this, two callers race on shared mutable state in + // TryAttainLeadershipLockAsync (_lastLockIndex / _lastLockETag), + // causing one renewal to fail and triggering a spurious stepdown. + // Non-blocking: if a health check is already in flight, skip. + if (Interlocked.CompareExchange(ref _healthCheckGuard, 1, 0) != 0) + return AgentCommands.Empty; + + try + { + return await DoHealthChecksInternalAsync(); } - - using var activity = ShouldTraceHealthCheck() - ? WolverineTracing.ActivitySource.StartActivity("wolverine_node_assignments") + finally + { + Interlocked.Exchange(ref _healthCheckGuard, 0); + } + } + + private async Task DoHealthChecksInternalAsync() + { + using var activity = ShouldTraceHealthCheck() + ? WolverineTracing.ActivitySource.StartActivity("wolverine_node_assignments") : null; // write health check regardless, and due to GH-1232, pass in the whole node so you can do an upsert diff --git a/src/Wolverine/Runtime/Agents/NodeAgentController.cs b/src/Wolverine/Runtime/Agents/NodeAgentController.cs index 50e844e32..ff4871dde 100644 --- a/src/Wolverine/Runtime/Agents/NodeAgentController.cs +++ b/src/Wolverine/Runtime/Agents/NodeAgentController.cs @@ -1,9 +1,6 @@ using System.Collections.Concurrent; using JasperFx; -using JasperFx.Core; using Microsoft.Extensions.Logging; -using Microsoft.Extensions.Options; -using Wolverine.Persistence.Durability; using Wolverine.Transports; namespace Wolverine.Runtime.Agents; @@ -28,6 +25,13 @@ private readonly Dictionary private readonly IWolverineObserver _observer; private DateTimeOffset? _lastNodeAssignmentHealthCheckTrace; + // 0=free, 1=busy; guards against concurrent DoHealthChecksAsync calls + // from the heartbeat loop and a CheckAgentHealth message arriving + // simultaneously. Prevents a race on _lastLockIndex / _lastLockETag in + // lease-based backends (RavenDb, CosmosDb) that would corrupt the + // leadership lock. + private int _healthCheckGuard; + private bool ShouldTraceHealthCheck() { if (!_runtime.DurabilitySettings.NodeAssignmentHealthCheckTracingEnabled)