Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 26 additions & 0 deletions src/Persistence/RavenDbTests/leadership_locking.cs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
using JasperFx.Core.Reflection;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Hosting;
using Raven.Client.Documents;
Expand Down Expand Up @@ -112,4 +113,29 @@ public async Task expired_scheduled_job_lock_from_dead_predecessor_can_be_taken_
(await ravenStore.TryAttainScheduledJobLockAsync(CancellationToken.None))
.ShouldBeTrue();
}

[Fact]
public async Task try_attain_renews_the_server_side_lease_when_already_held()
{
// Calling TryAttain when the lease is already held must refresh the
// server-side entry — that's the contract the heartbeat relies on to
// keep the lease alive.
var store = _host.Services.GetService<IMessageStore>()!.As<RavenDbMessageStore>();
var lockId = "wolverine/leader/locking";

(await store.Nodes.TryAttainLeadershipLockAsync(CancellationToken.None)).ShouldBeTrue();
var initial = await _store.Operations.SendAsync(
new GetCompareExchangeValueOperation<DistributedLock>(lockId));

await Task.Delay(10);

(await store.Nodes.TryAttainLeadershipLockAsync(CancellationToken.None)).ShouldBeTrue();
var renewed = await _store.Operations.SendAsync(
new GetCompareExchangeValueOperation<DistributedLock>(lockId));

renewed.Index.ShouldBeGreaterThan(initial.Index);
renewed.Value.ExpirationTime.ShouldBeGreaterThan(initial.Value.ExpirationTime);

await store.Nodes.ReleaseLeadershipLockAsync();
}
}
20 changes: 14 additions & 6 deletions src/Wolverine/Runtime/Agents/NodeAgentController.HeartBeat.cs
Original file line number Diff line number Diff line change
Expand Up @@ -62,18 +62,26 @@ public async Task<AgentCommands> DoHealthChecksAsync()
await stepDownAsync("the leadership advisory lock was released server-side");
}

if (_persistence.HasLeadershipLock())
{
IsLeader = true;
return await EvaluateAssignmentsAsync(nodes, restrictions);
}

// Always call TryAttainLeadershipLockAsync. If we're already leader it
// refreshes the lease so lease-based backends (RavenDb, Cosmos) don't
// age out and trigger a false stepdown; otherwise it's the normal
// election attempt.
try
{
if (await _persistence.TryAttainLeadershipLockAsync(_cancellation.Token))
{
if (IsLeader)
{
return await EvaluateAssignmentsAsync(nodes, restrictions);
}

return await tryStartLeadershipAsync(nodes, restrictions);
}

if (IsLeader)
{
await stepDownAsync("the leadership advisory lock could not be renewed");
}
}
catch (Exception e)
{
Expand Down
Loading