Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions server/jetstream_cluster_1_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8495,6 +8495,9 @@ func TestJetStreamClusterSnapshotStreamAssetOnShutdown(t *testing.T) {
// Publish, so we have something new to snapshot.
_, err = js.Publish("foo", nil)
require_NoError(t, err)
checkFor(t, 2*time.Second, 200*time.Millisecond, func() error {
return checkState(t, c, globalAccountName, "TEST")
})

// Shutdown servers, and check if all made stream snapshots.
for _, s := range c.servers {
Expand Down
12 changes: 10 additions & 2 deletions server/raft.go
Original file line number Diff line number Diff line change
Expand Up @@ -188,7 +188,7 @@ type raft struct {
isSysAcc atomic.Bool // Are we utilizing the system account?
maybeLeader bool // The group had a preferred leader. And is maybe already acting as leader prior to scale up.

observer bool // The node is observing, i.e. not participating in voting
observer bool // The node is observing, i.e. not able to become leader

extSt extensionState // Extension state

Expand Down Expand Up @@ -3015,6 +3015,12 @@ func (n *raft) trackResponse(ar *appendEntryResponse) {

n.Lock()

// Check state under lock, we might not be leader anymore.
if n.State() != Leader {
n.Unlock()
return
}

// Update peer's last index.
if ps := n.peers[ar.peer]; ps != nil && ar.index > ps.li {
ps.li = ar.index
Expand Down Expand Up @@ -3675,8 +3681,10 @@ CONTINUE:
}
}

// Only ever respond to new entries.
// Never respond to catchup messages, because providing quorum based on this is unsafe.
var ar *appendEntryResponse
if sub != nil {
if sub != nil && isNew {
ar = newAppendEntryResponse(n.pterm, n.pindex, n.id, true)
}
n.Unlock()
Expand Down
8 changes: 5 additions & 3 deletions server/raft_helpers_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ package server

import (
"encoding/binary"
"errors"
"fmt"
"math/rand"
"sync"
Expand Down Expand Up @@ -329,14 +330,15 @@ func (a *stateAdder) snapshot(t *testing.T) {
func (rg smGroup) waitOnTotal(t *testing.T, expected int64) {
t.Helper()
checkFor(t, 5*time.Second, 200*time.Millisecond, func() error {
var err error
for _, sm := range rg {
asm := sm.(*stateAdder)
if total := asm.total(); total != expected {
return fmt.Errorf("Adder on %v has wrong total: %d vs %d",
asm.server(), total, expected)
err = errors.Join(err, fmt.Errorf("Adder on %v has wrong total: %d vs %d",
asm.server(), total, expected))
}
}
return nil
return err
})
}

Expand Down
92 changes: 92 additions & 0 deletions server/raft_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"os"
"path"
"path/filepath"
"strings"
"testing"
"time"

Expand Down Expand Up @@ -1566,6 +1567,7 @@ func TestNRGSnapshotAndTruncateToApplied(t *testing.T) {
n.Applied(1)

// Send heartbeat, which commits the second message.
n.switchToLeader()
n.processAppendEntryResponse(&appendEntryResponse{
term: aeHeartbeat1.term,
index: aeHeartbeat1.pindex,
Expand Down Expand Up @@ -1597,6 +1599,7 @@ func TestNRGSnapshotAndTruncateToApplied(t *testing.T) {
require_Equal(t, entry.leader, nats0)

// Receive heartbeat from new leader, should not lose commits.
n.stepdown(noLeader)
n.processAppendEntry(aeHeartbeat2, n.aesub)
require_Equal(t, n.wal.State().Msgs, 0)
require_Equal(t, n.commit, 2)
Expand Down Expand Up @@ -2363,6 +2366,95 @@ func TestNRGSignalLeadChangeFalseIfCampaignImmediately(t *testing.T) {
}
}

func TestNRGCatchupDontCountTowardQuorum(t *testing.T) {
n, cleanup := initSingleMemRaftNode(t)
defer cleanup()

// Create a sample entry, the content doesn't matter, just that it's stored.
esm := encodeStreamMsgAllowCompress("foo", "_INBOX.foo", nil, nil, 0, 0, true)
entries := []*Entry{newEntry(EntryNormal, esm)}

nats0 := "S1Nunr6R" // "nats-0"

aeReply := "$TEST"
nc, err := nats.Connect(n.s.ClientURL(), nats.UserInfo("admin", "s3cr3t!"))
require_NoError(t, err)
defer nc.Close()

sub, err := nc.SubscribeSync(aeReply)
require_NoError(t, err)
defer sub.Drain()

// Timeline
aeMissedMsg := encode(t, &appendEntry{leader: nats0, term: 1, commit: 0, pterm: 0, pindex: 0, entries: entries, reply: aeReply})
ae := appendEntry{leader: nats0, term: 1, commit: 0, pterm: 1, pindex: 1, entries: entries, reply: aeReply}
aeCatchupTrigger := encode(t, &ae)
aeHeartbeat := encode(t, &appendEntry{leader: nats0, term: 1, commit: 0, pterm: 1, pindex: 2, entries: nil, reply: aeReply})

// Simulate we missed all messages up to this point.
n.processAppendEntry(aeCatchupTrigger, n.aesub)
require_True(t, n.catchup != nil)
require_Equal(t, n.catchup.pterm, 0) // n.pterm
require_Equal(t, n.catchup.pindex, 0) // n.pindex
require_Equal(t, n.catchup.cterm, ae.pterm)
require_Equal(t, n.catchup.cindex, ae.pindex)

// Should reply we require catchup.
msg, err := sub.NextMsg(time.Second)
require_NoError(t, err)
ar := n.decodeAppendEntryResponse(msg.Data)
require_Equal(t, ar.index, 0)
require_False(t, ar.success)
require_True(t, strings.HasPrefix(msg.Reply, "$NRG.CR"))

// Should NEVER respond to catchup messages.
n.processAppendEntry(aeMissedMsg, n.catchup.sub)
_, err = sub.NextMsg(time.Second)
require_Error(t, err, nats.ErrTimeout)

n.processAppendEntry(aeCatchupTrigger, n.catchup.sub)
_, err = sub.NextMsg(time.Second)
require_Error(t, err, nats.ErrTimeout)

// Now we've received all messages, stop catchup, and respond success to new message.
n.processAppendEntry(aeHeartbeat, n.aesub)
msg, err = sub.NextMsg(time.Second)
require_NoError(t, err)
ar = n.decodeAppendEntryResponse(msg.Data)
require_Equal(t, ar.index, aeHeartbeat.pindex)
require_True(t, ar.success)
require_Equal(t, msg.Reply, _EMPTY_)
}

func TestNRGIgnoreTrackResponseWhenNotLeader(t *testing.T) {
n, cleanup := initSingleMemRaftNode(t)
defer cleanup()

// Create a sample entry, the content doesn't matter, just that it's stored.
esm := encodeStreamMsgAllowCompress("foo", "_INBOX.foo", nil, nil, 0, 0, true)
entries := []*Entry{newEntry(EntryNormal, esm)}

// Switch this node to leader, and send two entries. The first will get quorum, the second will not.
n.term++
n.switchToLeader()
require_Equal(t, n.term, 1)
require_Equal(t, n.pindex, 0)
n.sendAppendEntry(entries)
require_Equal(t, n.pindex, 1)
require_Equal(t, n.pterm, 1)
require_Equal(t, n.commit, 0)

// Step down
n.stepdown(noLeader)
require_Equal(t, n.pindex, 1)
require_Equal(t, n.pterm, 1)
require_Equal(t, n.commit, 0)

// Normally would commit the entry, but since we're not leader anymore we should ignore it.
n.trackResponse(&appendEntryResponse{1, 1, "peer", _EMPTY_, true})
require_Equal(t, n.commit, 0)
}

// This is a RaftChainOfBlocks test where a block is proposed and then we wait for all replicas to apply it before
// proposing the next one.
// The test may fail if:
Expand Down
Loading