Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion server/jetstream_cluster_3_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4744,7 +4744,7 @@ func TestJetStreamClusterSnapshotAndRestoreWithHealthz(t *testing.T) {

checkHealth := func() {
t.Helper()
checkFor(t, 2*time.Second, 200*time.Millisecond, func() error {
checkFor(t, 10*time.Second, 200*time.Millisecond, func() error {
for _, s := range c.servers {
status := s.healthz(nil)
if status.Error != _EMPTY_ {
Expand Down
11 changes: 10 additions & 1 deletion server/raft.go
Original file line number Diff line number Diff line change
Expand Up @@ -1118,11 +1118,11 @@ func (n *raft) ResumeApply() {
func (n *raft) DrainAndReplaySnapshot() bool {
n.Lock()
defer n.Unlock()
n.warn("Draining and replaying snapshot")
snap, err := n.loadLastSnapshot()
if err != nil {
return false
}
n.warn("Draining and replaying snapshot")
n.pauseApplyLocked()
n.apply.drain()
n.commit = snap.lastIndex
Expand Down Expand Up @@ -3069,6 +3069,15 @@ func (n *raft) applyCommit(index uint64) error {
committed = append(committed, newEntry(EntrySnapshot, e.Data))
case EntrySnapshot:
committed = append(committed, e)
// If we have no snapshot, install the leader's snapshot as our own.
if len(ae.entries) == 1 && n.snapfile == _EMPTY_ && ae.commit > 0 {
n.installSnapshot(&snapshot{
lastTerm: ae.pterm,
lastIndex: ae.commit,
peerstate: encodePeerState(&peerState{n.peerNames(), n.csz, n.extSt}),
data: e.Data,
})
}
case EntryPeerState:
if n.State() != Leader {
if ps, err := decodePeerState(e.Data); err == nil {
Expand Down
39 changes: 39 additions & 0 deletions server/raft_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3764,6 +3764,45 @@ func TestNRGReportLeaderAfterNoopEntry(t *testing.T) {
require_True(t, n.Leader())
}

func TestNRGSendSnapshotInstallsSnapshot(t *testing.T) {
n, cleanup := initSingleMemRaftNode(t)
defer cleanup()

require_Equal(t, n.pindex, 0)
require_Equal(t, n.snapfile, _EMPTY_)

// Switch to candidate, to become leader.
require_Equal(t, n.term, 0)
n.switchToCandidate()
require_Equal(t, n.term, 1)

// When switching to leader a NOOP-entry is sent.
n.switchToLeader()
require_Equal(t, n.pindex, 1)
require_Equal(t, n.snapfile, _EMPTY_)
require_NoError(t, n.applyCommit(1))
require_Equal(t, n.snapfile, _EMPTY_)

// On scaleup, we send a snapshot.
require_NoError(t, n.SendSnapshot([]byte("snapshot_data")))
require_Equal(t, n.pindex, 2)
require_Equal(t, n.snapfile, _EMPTY_)

// When applying the entry, the sent snapshot should be installed.
require_NoError(t, n.applyCommit(2))
require_NotEqual(t, n.snapfile, _EMPTY_)

snap, err := n.loadLastSnapshot()
require_NoError(t, err)
require_NotNil(t, snap)
require_Equal(t, snap.lastTerm, 1)
require_Equal(t, snap.lastIndex, 1)
require_Equal(t, string(snap.data), "snapshot_data")

// Draining and replaying the snapshot should work.
require_True(t, n.DrainAndReplaySnapshot())
}

// This is a RaftChainOfBlocks test where a block is proposed and then we wait for all replicas to apply it before
// proposing the next one.
// The test may fail if:
Expand Down
Loading