Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
264 changes: 77 additions & 187 deletions server/jetstream_cluster_4_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,6 @@ import (
"encoding/json"
"errors"
"fmt"
"io"
"io/fs"
"math/rand"
"os"
"path"
Expand Down Expand Up @@ -2798,85 +2796,6 @@ func TestJetStreamClusterKeyValueDesyncAfterHardKill(t *testing.T) {
c.waitOnClusterReady()
c.waitOnAllCurrent()

// getStreamDetails asks the current leader of streamName for its Jsz report
// and returns the StreamDetail entry for that stream inside accountName.
// It returns nil without failing the test when the stream has no leader, and
// marks the test as failed (still returning nil) when no matching entry is
// present in the report.
getStreamDetails := func(t *testing.T, c *cluster, accountName, streamName string) *StreamDetail {
	t.Helper()
	leader := c.streamLeader(accountName, streamName)
	if leader == nil {
		return nil
	}
	opts := &JSzOptions{Accounts: true, Streams: true, Consumer: true}
	report, err := leader.Jsz(opts)
	require_NoError(t, err)
	for _, account := range report.AccountDetails {
		if account.Name != accountName {
			continue
		}
		for _, detail := range account.Streams {
			if detail.Name != streamName {
				continue
			}
			// Hand back a pointer to a private copy of the entry.
			found := detail
			return &found
		}
	}
	t.Error("Could not find account details")
	return nil
}

// checkState compares the stream state reported by the leader of streamName
// against the state held by every other server in the cluster. Each diverging
// field (Msgs, FirstSeq, LastSeq, NumDeleted) produces one error; all of them
// are joined into the returned error. It returns nil when every follower
// agrees with the leader, and an error when the stream has no leader or no
// leader details could be obtained.
checkState := func(t *testing.T, c *cluster, accountName, streamName string) error {
	t.Helper()

	leader := c.streamLeader(accountName, streamName)
	if leader == nil {
		return fmt.Errorf("no leader server found for stream %q", streamName)
	}
	details := getStreamDetails(t, c, accountName, streamName)
	if details == nil {
		return fmt.Errorf("no leader found for stream %q", streamName)
	}
	want := details.State

	var mismatches []error
	for _, follower := range c.servers {
		// The leader is the reference, so it is not compared with itself.
		if follower == leader {
			continue
		}
		account, err := follower.LookupAccount(accountName)
		require_NoError(t, err)
		mset, err := account.lookupStream(streamName)
		require_NoError(t, err)
		got := mset.state()

		if got.Msgs != want.Msgs {
			mismatches = append(mismatches, fmt.Errorf(
				"[%s] Leader %v has %d messages, Follower %v has %d messages",
				streamName, leader, want.Msgs, follower, got.Msgs,
			))
		}
		if got.FirstSeq != want.FirstSeq {
			mismatches = append(mismatches, fmt.Errorf(
				"[%s] Leader %v FirstSeq is %d, Follower %v is at %d",
				streamName, leader, want.FirstSeq, follower, got.FirstSeq,
			))
		}
		if got.LastSeq != want.LastSeq {
			mismatches = append(mismatches, fmt.Errorf(
				"[%s] Leader %v LastSeq is %d, Follower %v is at %d",
				streamName, leader, want.LastSeq, follower, got.LastSeq,
			))
		}
		if got.NumDeleted != want.NumDeleted {
			mismatches = append(mismatches, fmt.Errorf(
				"[%s] Leader %v NumDeleted is %d, Follower %v is at %d",
				streamName, leader, want.NumDeleted, follower, got.NumDeleted,
			))
		}
	}
	if len(mismatches) == 0 {
		return nil
	}
	return errors.Join(mismatches...)
}

err = checkState(t, c, "$G", "KV_inconsistency")
require_NoError(t, err)
}
Expand Down Expand Up @@ -2935,84 +2854,6 @@ func TestJetStreamClusterKeyValueSync(t *testing.T) {
var counter int64
var errorCounter int64

// getStreamDetails fetches the Jsz report from the server currently leading
// streamName and returns the matching StreamDetail for accountName.
// A missing leader yields nil; a report without a matching account/stream
// entry flags the test as failed and also yields nil.
getStreamDetails := func(t *testing.T, c *cluster, accountName, streamName string) *StreamDetail {
	t.Helper()
	leader := c.streamLeader(accountName, streamName)
	if leader == nil {
		return nil
	}
	report, err := leader.Jsz(&JSzOptions{
		Accounts: true,
		Streams:  true,
		Consumer: true,
	})
	require_NoError(t, err)
	for i := range report.AccountDetails {
		account := report.AccountDetails[i]
		if account.Name != accountName {
			continue
		}
		for j := range account.Streams {
			if account.Streams[j].Name == streamName {
				// Return the address of a copy, not of the slice element.
				detail := account.Streams[j]
				return &detail
			}
		}
	}
	t.Error("Could not find account details")
	return nil
}
// checkState verifies that every non-leader server holds the same stream
// state as the leader of streamName. Differences in Msgs, FirstSeq, LastSeq
// and NumDeleted are each reported as a separate error (the NumDeleted report
// also dumps both full states), and the collected errors are joined into the
// return value. A nil result means all followers match the leader.
checkState := func(t *testing.T, c *cluster, accountName, streamName string) error {
	t.Helper()

	leader := c.streamLeader(accountName, streamName)
	if leader == nil {
		return fmt.Errorf("no leader server found for stream %q", streamName)
	}
	details := getStreamDetails(t, c, accountName, streamName)
	if details == nil {
		return fmt.Errorf("no leader found for stream %q", streamName)
	}
	want := details.State

	var mismatches []error
	for _, follower := range c.servers {
		// Only followers are compared against the leader's state.
		if follower == leader {
			continue
		}
		account, err := follower.LookupAccount(accountName)
		require_NoError(t, err)
		mset, err := account.lookupStream(streamName)
		require_NoError(t, err)
		got := mset.state()

		if got.Msgs != want.Msgs {
			mismatches = append(mismatches, fmt.Errorf(
				"[%s] Leader %v has %d messages, Follower %v has %d messages",
				streamName, leader, want.Msgs, follower, got.Msgs,
			))
		}
		if got.FirstSeq != want.FirstSeq {
			mismatches = append(mismatches, fmt.Errorf(
				"[%s] Leader %v FirstSeq is %d, Follower %v is at %d",
				streamName, leader, want.FirstSeq, follower, got.FirstSeq,
			))
		}
		if got.LastSeq != want.LastSeq {
			mismatches = append(mismatches, fmt.Errorf(
				"[%s] Leader %v LastSeq is %d, Follower %v is at %d",
				streamName, leader, want.LastSeq, follower, got.LastSeq,
			))
		}
		if got.NumDeleted != want.NumDeleted {
			mismatches = append(mismatches, fmt.Errorf(
				"[%s] Leader %v NumDeleted is %d, Follower %v is at %d\nSTATE_A: %+v\nSTATE_B: %+v\n",
				streamName, leader, want.NumDeleted, follower, got.NumDeleted, want, got,
			))
		}
	}
	if len(mismatches) == 0 {
		return nil
	}
	return errors.Join(mismatches...)
}

checkMsgsEqual := func(t *testing.T, accountName, streamName string) error {
// Gather all the streams replicas and compare contents.
msets := make(map[*Server]*stream)
Expand Down Expand Up @@ -4218,39 +4059,13 @@ func TestJetStreamClusterHardKillAfterStreamAdd(t *testing.T) {
})
require_NoError(t, err)

// copyDir recursively copies the directory tree rooted at src into dst.
// Directories are created with defaultDirPerms and regular files with
// defaultFilePerms. Existing destination files are truncated before being
// written so no stale trailing bytes survive the copy. It returns the first
// error hit while walking, creating, copying or closing a file.
copyDir := func(dst, src string) error {
	srcFS := os.DirFS(src)
	return fs.WalkDir(srcFS, ".", func(p string, d os.DirEntry, err error) error {
		if err != nil {
			return err
		}
		newPath := path.Join(dst, p)
		if d.IsDir() {
			return os.MkdirAll(newPath, defaultDirPerms)
		}
		r, err := srcFS.Open(p)
		if err != nil {
			return err
		}
		defer r.Close()

		// O_TRUNC ensures a pre-existing, longer destination file does not
		// keep leftover content past the end of the copied data.
		w, err := os.OpenFile(newPath, os.O_CREATE|os.O_WRONLY|os.O_TRUNC, defaultFilePerms)
		if err != nil {
			return err
		}
		if _, err = io.Copy(w, r); err != nil {
			// The copy error is the one worth reporting; the close is best-effort here.
			w.Close()
			return err
		}
		// Surface close errors: a failed close can mean the data never reached disk.
		return w.Close()
	})
}

// Simulate being hard killed by:
// 1. copy directories before shutdown
copyToSrcMap := make(map[string]string)
for _, s := range c.servers {
sd := s.StoreDir()
copySd := path.Join(t.TempDir(), JetStreamStoreDir)
err = copyDir(copySd, sd)
err = copyDir(t, copySd, sd)
require_NoError(t, err)
copyToSrcMap[copySd] = sd
}
Expand All @@ -4263,7 +4078,7 @@ func TestJetStreamClusterHardKillAfterStreamAdd(t *testing.T) {
for cp, dest := range copyToSrcMap {
err = os.RemoveAll(dest)
require_NoError(t, err)
err = copyDir(dest, cp)
err = copyDir(t, dest, cp)
require_NoError(t, err)
}

Expand Down Expand Up @@ -4452,3 +4267,78 @@ func TestJetStreamClusterPreserveWALDuringCatchupWithMatchingTerm(t *testing.T)
}
}
}

// TestJetStreamClusterDesyncAfterRestartReplacesLeaderSnapshot runs the
// following scenario on a 3-server cluster with a 3-replica stream "TEST":
//  1. shut down one follower,
//  2. purge and publish on the stream, then install a snapshot on the leader,
//  3. stop all servers and replace the follower's snapshot directory with a
//     copy of the leader's,
//  4. restart the follower alone and shut it down again,
//  5. restart every server and require that checkState eventually reports no
//     difference between the leader and the followers.
func TestJetStreamClusterDesyncAfterRestartReplacesLeaderSnapshot(t *testing.T) {
c := createJetStreamClusterExplicit(t, "R3S", 3)
defer c.shutdown()

nc, js := jsClientConnect(t, c.randomServer())
defer nc.Close()

_, err := js.AddStream(&nats.StreamConfig{
Name: "TEST",
Subjects: []string{"foo"},
Replicas: 3,
})
require_NoError(t, err)

// Reconnect to the leader.
leader := c.streamLeader(globalAccountName, "TEST")
nc.Close()
nc, js = jsClientConnect(t, leader)
defer nc.Close()

// lookupStream returns the "TEST" stream of the global account on server s,
// failing the test if either the account or the stream lookup errors.
lookupStream := func(s *Server) *stream {
t.Helper()
acc, err := s.lookupAccount(globalAccountName)
require_NoError(t, err)
mset, err := acc.lookupStream("TEST")
require_NoError(t, err)
return mset
}

// Stop one follower so it lags behind.
rs := c.randomNonStreamLeader(globalAccountName, "TEST")
mset := lookupStream(rs)
n := mset.node.(*raft)
// Remember the follower's snapshot directory before it is shut down; it is
// wiped and repopulated from the leader further below.
followerSnapshots := path.Join(n.sd, snapshotsDir)
rs.Shutdown()
rs.WaitForShutdown()

// Move the stream forward so the follower requires a snapshot.
err = js.PurgeStream("TEST", &nats.StreamPurgeRequest{Sequence: 10})
require_NoError(t, err)
_, err = js.Publish("foo", nil)
require_NoError(t, err)

// Install a snapshot on the leader, ensuring RAFT entries are compacted and a snapshot remains.
// From here on mset and n refer to the leader's stream and raft node.
mset = lookupStream(leader)
n = mset.node.(*raft)
err = n.InstallSnapshot(mset.stateSnapshot())
require_NoError(t, err)

c.stopAll()

// Replace follower snapshot with the leader's.
// This simulates the follower coming online, getting a snapshot from the leader after which it goes offline.
leaderSnapshots := path.Join(n.sd, snapshotsDir)
err = os.RemoveAll(followerSnapshots)
require_NoError(t, err)
err = copyDir(t, followerSnapshots, leaderSnapshots)
require_NoError(t, err)

// Start the follower, it will load the snapshot from the leader.
rs = c.restartServer(rs)

// Shutting down must check that the leader's snapshot is not overwritten.
rs.Shutdown()
rs.WaitForShutdown()

// Now start all servers back up.
c.restartAll()
c.waitOnAllCurrent()

// NOTE(review): checkFor presumably retries the callback every 500ms for up
// to 10s until it returns nil — confirm against the checkFor helper.
checkFor(t, 10*time.Second, 500*time.Millisecond, func() error {
return checkState(t, c, globalAccountName, "TEST")
})
}
Loading