diff --git a/server/jetstream_cluster_1_test.go b/server/jetstream_cluster_1_test.go index a38430e0901..340970dfef3 100644 --- a/server/jetstream_cluster_1_test.go +++ b/server/jetstream_cluster_1_test.go @@ -8495,6 +8495,9 @@ func TestJetStreamClusterSnapshotStreamAssetOnShutdown(t *testing.T) { // Publish, so we have something new to snapshot. _, err = js.Publish("foo", nil) require_NoError(t, err) + checkFor(t, 2*time.Second, 200*time.Millisecond, func() error { + return checkState(t, c, globalAccountName, "TEST") + }) // Shutdown servers, and check if all made stream snapshots. for _, s := range c.servers { diff --git a/server/raft.go b/server/raft.go index e90248d3d71..79623e09b34 100644 --- a/server/raft.go +++ b/server/raft.go @@ -188,7 +188,7 @@ type raft struct { isSysAcc atomic.Bool // Are we utilizing the system account? maybeLeader bool // The group had a preferred leader. And is maybe already acting as leader prior to scale up. - observer bool // The node is observing, i.e. not participating in voting + observer bool // The node is observing, i.e. not able to become leader extSt extensionState // Extension state @@ -3015,6 +3015,12 @@ func (n *raft) trackResponse(ar *appendEntryResponse) { n.Lock() + // Check state under lock, we might not be leader anymore. + if n.State() != Leader { + n.Unlock() + return + } + // Update peer's last index. if ps := n.peers[ar.peer]; ps != nil && ar.index > ps.li { ps.li = ar.index @@ -3675,8 +3681,10 @@ CONTINUE: } } + // Only ever respond to new entries. + // Never respond to catchup messages, because providing quorum based on this is unsafe. 
var ar *appendEntryResponse - if sub != nil { + if sub != nil && isNew { ar = newAppendEntryResponse(n.pterm, n.pindex, n.id, true) } n.Unlock() diff --git a/server/raft_helpers_test.go b/server/raft_helpers_test.go index eaf8acc0512..7b583f4ff69 100644 --- a/server/raft_helpers_test.go +++ b/server/raft_helpers_test.go @@ -18,6 +18,7 @@ package server import ( "encoding/binary" + "errors" "fmt" "math/rand" "sync" @@ -329,14 +330,15 @@ func (a *stateAdder) snapshot(t *testing.T) { func (rg smGroup) waitOnTotal(t *testing.T, expected int64) { t.Helper() checkFor(t, 5*time.Second, 200*time.Millisecond, func() error { + var err error for _, sm := range rg { asm := sm.(*stateAdder) if total := asm.total(); total != expected { - return fmt.Errorf("Adder on %v has wrong total: %d vs %d", - asm.server(), total, expected) + err = errors.Join(err, fmt.Errorf("Adder on %v has wrong total: %d vs %d", + asm.server(), total, expected)) } } - return nil + return err }) } diff --git a/server/raft_test.go b/server/raft_test.go index 2f6015fd3b1..ac972f12ca2 100644 --- a/server/raft_test.go +++ b/server/raft_test.go @@ -23,6 +23,7 @@ import ( "os" "path" "path/filepath" + "strings" "testing" "time" @@ -1566,6 +1567,7 @@ func TestNRGSnapshotAndTruncateToApplied(t *testing.T) { n.Applied(1) // Send heartbeat, which commits the second message. + n.switchToLeader() n.processAppendEntryResponse(&appendEntryResponse{ term: aeHeartbeat1.term, index: aeHeartbeat1.pindex, @@ -1597,6 +1599,7 @@ func TestNRGSnapshotAndTruncateToApplied(t *testing.T) { require_Equal(t, entry.leader, nats0) // Receive heartbeat from new leader, should not lose commits. 
+ n.stepdown(noLeader) n.processAppendEntry(aeHeartbeat2, n.aesub) require_Equal(t, n.wal.State().Msgs, 0) require_Equal(t, n.commit, 2) @@ -2363,6 +2366,95 @@ func TestNRGSignalLeadChangeFalseIfCampaignImmediately(t *testing.T) { } } +func TestNRGCatchupDontCountTowardQuorum(t *testing.T) { + n, cleanup := initSingleMemRaftNode(t) + defer cleanup() + + // Create a sample entry, the content doesn't matter, just that it's stored. + esm := encodeStreamMsgAllowCompress("foo", "_INBOX.foo", nil, nil, 0, 0, true) + entries := []*Entry{newEntry(EntryNormal, esm)} + + nats0 := "S1Nunr6R" // "nats-0" + + aeReply := "$TEST" + nc, err := nats.Connect(n.s.ClientURL(), nats.UserInfo("admin", "s3cr3t!")) + require_NoError(t, err) + defer nc.Close() + + sub, err := nc.SubscribeSync(aeReply) + require_NoError(t, err) + defer sub.Drain() + + // Timeline + aeMissedMsg := encode(t, &appendEntry{leader: nats0, term: 1, commit: 0, pterm: 0, pindex: 0, entries: entries, reply: aeReply}) + ae := appendEntry{leader: nats0, term: 1, commit: 0, pterm: 1, pindex: 1, entries: entries, reply: aeReply} + aeCatchupTrigger := encode(t, &ae) + aeHeartbeat := encode(t, &appendEntry{leader: nats0, term: 1, commit: 0, pterm: 1, pindex: 2, entries: nil, reply: aeReply}) + + // Simulate we missed all messages up to this point. + n.processAppendEntry(aeCatchupTrigger, n.aesub) + require_True(t, n.catchup != nil) + require_Equal(t, n.catchup.pterm, 0) // n.pterm + require_Equal(t, n.catchup.pindex, 0) // n.pindex + require_Equal(t, n.catchup.cterm, ae.pterm) + require_Equal(t, n.catchup.cindex, ae.pindex) + + // Should reply we require catchup. + msg, err := sub.NextMsg(time.Second) + require_NoError(t, err) + ar := n.decodeAppendEntryResponse(msg.Data) + require_Equal(t, ar.index, 0) + require_False(t, ar.success) + require_True(t, strings.HasPrefix(msg.Reply, "$NRG.CR")) + + // Should NEVER respond to catchup messages. 
+ n.processAppendEntry(aeMissedMsg, n.catchup.sub) + _, err = sub.NextMsg(time.Second) + require_Error(t, err, nats.ErrTimeout) + + n.processAppendEntry(aeCatchupTrigger, n.catchup.sub) + _, err = sub.NextMsg(time.Second) + require_Error(t, err, nats.ErrTimeout) + + // Now we've received all messages, stop catchup, and respond success to new message. + n.processAppendEntry(aeHeartbeat, n.aesub) + msg, err = sub.NextMsg(time.Second) + require_NoError(t, err) + ar = n.decodeAppendEntryResponse(msg.Data) + require_Equal(t, ar.index, aeHeartbeat.pindex) + require_True(t, ar.success) + require_Equal(t, msg.Reply, _EMPTY_) +} + +func TestNRGIgnoreTrackResponseWhenNotLeader(t *testing.T) { + n, cleanup := initSingleMemRaftNode(t) + defer cleanup() + + // Create a sample entry, the content doesn't matter, just that it's stored. + esm := encodeStreamMsgAllowCompress("foo", "_INBOX.foo", nil, nil, 0, 0, true) + entries := []*Entry{newEntry(EntryNormal, esm)} + + // Switch this node to leader, and send a single entry. It is stored but not yet committed (no quorum). + n.term++ + n.switchToLeader() + require_Equal(t, n.term, 1) + require_Equal(t, n.pindex, 0) + n.sendAppendEntry(entries) + require_Equal(t, n.pindex, 1) + require_Equal(t, n.pterm, 1) + require_Equal(t, n.commit, 0) + + // Step down, so we are no longer leader when the response below is tracked. + n.stepdown(noLeader) + require_Equal(t, n.pindex, 1) + require_Equal(t, n.pterm, 1) + require_Equal(t, n.commit, 0) + + // Normally would commit the entry, but since we're not leader anymore we should ignore it. + n.trackResponse(&appendEntryResponse{1, 1, "peer", _EMPTY_, true}) + require_Equal(t, n.commit, 0) +} + // This is a RaftChainOfBlocks test where a block is proposed and then we wait for all replicas to apply it before // proposing the next one. // The test may fail if: