Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions server/jetstream_cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -3466,6 +3466,7 @@ func (js *jetStream) processStreamAssignment(sa *streamAssignment) bool {
} else if mset, _ := acc.lookupStream(sa.Config.Name); mset != nil {
// We have one here even though we are not a member. This can happen on re-assignment.
s.removeStream(ourID, mset, sa)
didRemove = true
}

// If this stream assignment does not have a sync subject (bug) set that the meta-leader should check when elected.
Expand Down
78 changes: 78 additions & 0 deletions server/jetstream_cluster_4_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,11 @@ import (
"encoding/json"
"errors"
"fmt"
"io"
"io/fs"
"math/rand"
"os"
"path"
"path/filepath"
"runtime"
"slices"
Expand Down Expand Up @@ -4159,3 +4162,78 @@ func TestJetStreamClusterDesyncAfterCatchupTooManyRetries(t *testing.T) {
newStreamLeaderServer := c.streamLeader(globalAccountName, "TEST")
require_Equal(t, newStreamLeaderServer.Name(), clusterResetServerName)
}

// TestJetStreamClusterHardKillAfterStreamAdd simulates a hard kill of every
// server right after a replicated stream has been added. It snapshots each
// server's store directory while the cluster is still running, stops the
// cluster, restores the snapshots over the store directories, and restarts.
// The stream must still be reported by StreamInfo afterwards.
func TestJetStreamClusterHardKillAfterStreamAdd(t *testing.T) {
	c := createJetStreamClusterExplicit(t, "R3S", 3)
	defer c.shutdown()

	nc, js := jsClientConnect(t, c.randomServer())
	defer nc.Close()

	_, err := js.AddStream(&nats.StreamConfig{
		Name:     "TEST",
		Subjects: []string{"foo"},
		Replicas: 3,
	})
	require_NoError(t, err)

	// copyDir recursively copies the tree rooted at src into dst, creating
	// directories with defaultDirPerms and files with defaultFilePerms.
	copyDir := func(dst, src string) error {
		srcFS := os.DirFS(src)
		return fs.WalkDir(srcFS, ".", func(p string, d fs.DirEntry, err error) error {
			if err != nil {
				return err
			}
			newPath := path.Join(dst, p)
			if d.IsDir() {
				return os.MkdirAll(newPath, defaultDirPerms)
			}
			r, err := srcFS.Open(p)
			if err != nil {
				return err
			}
			defer r.Close()

			w, err := os.OpenFile(newPath, os.O_CREATE|os.O_WRONLY, defaultFilePerms)
			if err != nil {
				return err
			}
			if _, err = io.Copy(w, r); err != nil {
				// The copy error is the one worth reporting; the close
				// here only releases the descriptor.
				w.Close()
				return err
			}
			// Report a failed close on the written file instead of dropping
			// it, so an incomplete copy is not mistaken for a good snapshot.
			return w.Close()
		})
	}

	// Simulate being hard killed by:
	// 1. copy directories before shutdown
	copyToSrcMap := make(map[string]string)
	for _, s := range c.servers {
		sd := s.StoreDir()
		copySd := path.Join(t.TempDir(), JetStreamStoreDir)
		err = copyDir(copySd, sd)
		require_NoError(t, err)
		copyToSrcMap[copySd] = sd
	}

	// 2. stop all
	nc.Close()
	c.stopAll()

	// 3. revert directories to before shutdown
	for cp, dest := range copyToSrcMap {
		// Drop whatever the shutdown left behind before restoring the copy.
		err = os.RemoveAll(dest)
		require_NoError(t, err)
		err = copyDir(dest, cp)
		require_NoError(t, err)
	}

	// 4. restart
	c.restartAll()
	c.waitOnAllCurrent()

	nc, js = jsClientConnect(t, c.randomServer())
	defer nc.Close()

	// Stream should exist still and not be removed after hard killing all servers, so expect no error.
	_, err = js.StreamInfo("TEST")
	require_NoError(t, err)
}
2 changes: 2 additions & 0 deletions server/raft.go
Original file line number Diff line number Diff line change
Expand Up @@ -471,6 +471,8 @@ func (s *Server) startRaftNode(accName string, cfg *RaftConfig, labels pprofLabe
truncateAndErr(index)
break
}
// ae.commit must be equal to index, otherwise applyCommit will not be run.
ae.commit = index
n.processAppendEntry(ae, nil)
// Check how much we have queued up so far to determine if we should pause.
for _, e := range ae.entries {
Expand Down