From 1ac5ba54c138ea63d88b99949617d2aaf10d6476 Mon Sep 17 00:00:00 2001 From: Maurice van Veen Date: Tue, 4 Nov 2025 18:44:38 +0100 Subject: [PATCH] NRG: Install leader snapshot on scaleup Signed-off-by: Maurice van Veen --- server/jetstream_cluster_3_test.go | 2 +- server/raft.go | 11 ++++++++- server/raft_test.go | 39 ++++++++++++++++++++++++++++++ 3 files changed, 50 insertions(+), 2 deletions(-) diff --git a/server/jetstream_cluster_3_test.go b/server/jetstream_cluster_3_test.go index a9851482f7f..912f041fe31 100644 --- a/server/jetstream_cluster_3_test.go +++ b/server/jetstream_cluster_3_test.go @@ -4744,7 +4744,7 @@ func TestJetStreamClusterSnapshotAndRestoreWithHealthz(t *testing.T) { checkHealth := func() { t.Helper() - checkFor(t, 2*time.Second, 200*time.Millisecond, func() error { + checkFor(t, 10*time.Second, 200*time.Millisecond, func() error { for _, s := range c.servers { status := s.healthz(nil) if status.Error != _EMPTY_ { diff --git a/server/raft.go b/server/raft.go index 163d3e6e061..547be9076a7 100644 --- a/server/raft.go +++ b/server/raft.go @@ -1118,11 +1118,11 @@ func (n *raft) ResumeApply() { func (n *raft) DrainAndReplaySnapshot() bool { n.Lock() defer n.Unlock() - n.warn("Draining and replaying snapshot") snap, err := n.loadLastSnapshot() if err != nil { return false } + n.warn("Draining and replaying snapshot") n.pauseApplyLocked() n.apply.drain() n.commit = snap.lastIndex @@ -3069,6 +3069,15 @@ func (n *raft) applyCommit(index uint64) error { committed = append(committed, newEntry(EntrySnapshot, e.Data)) case EntrySnapshot: committed = append(committed, e) + // If we have no snapshot, install the leader's snapshot as our own. + if len(ae.entries) == 1 && n.snapfile == _EMPTY_ && ae.commit > 0 { + n.installSnapshot(&snapshot{ + lastTerm: ae.pterm, + lastIndex: ae.commit, + peerstate: encodePeerState(&peerState{n.peerNames(), n.csz, n.extSt}), + data: e.Data, + }) + } case EntryPeerState: if n.State() != Leader { if ps, err := decodePeerState(e.Data); err == nil { diff --git a/server/raft_test.go b/server/raft_test.go index 2ae33c73b6f..076ca623149 100644 --- a/server/raft_test.go +++ b/server/raft_test.go @@ -3764,6 +3764,45 @@ func TestNRGReportLeaderAfterNoopEntry(t *testing.T) { require_True(t, n.Leader()) } +func TestNRGSendSnapshotInstallsSnapshot(t *testing.T) { + n, cleanup := initSingleMemRaftNode(t) + defer cleanup() + + require_Equal(t, n.pindex, 0) + require_Equal(t, n.snapfile, _EMPTY_) + + // Switch to candidate, to become leader. + require_Equal(t, n.term, 0) + n.switchToCandidate() + require_Equal(t, n.term, 1) + + // When switching to leader a NOOP-entry is sent. + n.switchToLeader() + require_Equal(t, n.pindex, 1) + require_Equal(t, n.snapfile, _EMPTY_) + require_NoError(t, n.applyCommit(1)) + require_Equal(t, n.snapfile, _EMPTY_) + + // On scaleup, we send a snapshot. + require_NoError(t, n.SendSnapshot([]byte("snapshot_data"))) + require_Equal(t, n.pindex, 2) + require_Equal(t, n.snapfile, _EMPTY_) + + // When applying the entry, the sent snapshot should be installed. + require_NoError(t, n.applyCommit(2)) + require_NotEqual(t, n.snapfile, _EMPTY_) + + snap, err := n.loadLastSnapshot() + require_NoError(t, err) + require_NotNil(t, snap) + require_Equal(t, snap.lastTerm, 1) + require_Equal(t, snap.lastIndex, 1) + require_Equal(t, string(snap.data), "snapshot_data") + + // Draining and replaying the snapshot should work. + require_True(t, n.DrainAndReplaySnapshot()) +} + // This is a RaftChainOfBlocks test where a block is proposed and then we wait for all replicas to apply it before // proposing the next one. // The test may fail if: