diff --git a/server/jetstream_cluster.go b/server/jetstream_cluster.go
index 6928d3199ec..f090428f21f 100644
--- a/server/jetstream_cluster.go
+++ b/server/jetstream_cluster.go
@@ -3466,6 +3466,7 @@ func (js *jetStream) processStreamAssignment(sa *streamAssignment) bool {
 	} else if mset, _ := acc.lookupStream(sa.Config.Name); mset != nil {
 		// We have one here even though we are not a member. This can happen on re-assignment.
 		s.removeStream(ourID, mset, sa)
+		didRemove = true
 	}
 
 	// If this stream assignment does not have a sync subject (bug) set that the meta-leader should check when elected.
diff --git a/server/jetstream_cluster_4_test.go b/server/jetstream_cluster_4_test.go
index 2311638d576..eeb95924cd8 100644
--- a/server/jetstream_cluster_4_test.go
+++ b/server/jetstream_cluster_4_test.go
@@ -21,8 +21,11 @@ import (
 	"encoding/json"
 	"errors"
 	"fmt"
+	"io"
+	"io/fs"
 	"math/rand"
 	"os"
+	"path"
 	"path/filepath"
 	"runtime"
 	"slices"
@@ -4159,3 +4162,78 @@ func TestJetStreamClusterDesyncAfterCatchupTooManyRetries(t *testing.T) {
 	newStreamLeaderServer := c.streamLeader(globalAccountName, "TEST")
 	require_Equal(t, newStreamLeaderServer.Name(), clusterResetServerName)
 }
+
+func TestJetStreamClusterHardKillAfterStreamAdd(t *testing.T) {
+	c := createJetStreamClusterExplicit(t, "R3S", 3)
+	defer c.shutdown()
+
+	nc, js := jsClientConnect(t, c.randomServer())
+	defer nc.Close()
+
+	_, err := js.AddStream(&nats.StreamConfig{
+		Name:     "TEST",
+		Subjects: []string{"foo"},
+		Replicas: 3,
+	})
+	require_NoError(t, err)
+
+	copyDir := func(dst, src string) error {
+		srcFS := os.DirFS(src)
+		return fs.WalkDir(srcFS, ".", func(p string, d os.DirEntry, err error) error {
+			if err != nil {
+				return err
+			}
+			newPath := path.Join(dst, p)
+			if d.IsDir() {
+				return os.MkdirAll(newPath, defaultDirPerms)
+			}
+			r, err := srcFS.Open(p)
+			if err != nil {
+				return err
+			}
+			defer r.Close()
+
+			w, err := os.OpenFile(newPath, os.O_CREATE|os.O_WRONLY, defaultFilePerms)
+			if err != nil {
+				return err
+			}
+			defer w.Close()
+			_, err = io.Copy(w, r)
+			return err
+		})
+	}
+
+	// Simulate being hard killed by:
+	// 1. copy directories before shutdown
+	copyToSrcMap := make(map[string]string)
+	for _, s := range c.servers {
+		sd := s.StoreDir()
+		copySd := path.Join(t.TempDir(), JetStreamStoreDir)
+		err = copyDir(copySd, sd)
+		require_NoError(t, err)
+		copyToSrcMap[copySd] = sd
+	}
+
+	// 2. stop all
+	nc.Close()
+	c.stopAll()
+
+	// 3. revert directories to before shutdown
+	for cp, dest := range copyToSrcMap {
+		err = os.RemoveAll(dest)
+		require_NoError(t, err)
+		err = copyDir(dest, cp)
+		require_NoError(t, err)
+	}
+
+	// 4. restart
+	c.restartAll()
+	c.waitOnAllCurrent()
+
+	nc, js = jsClientConnect(t, c.randomServer())
+	defer nc.Close()
+
+	// Stream should exist still and not be removed after hard killing all servers, so expect no error.
+	_, err = js.StreamInfo("TEST")
+	require_NoError(t, err)
+}
diff --git a/server/raft.go b/server/raft.go
index ebbe845c42d..8bc60eaac2d 100644
--- a/server/raft.go
+++ b/server/raft.go
@@ -471,6 +471,8 @@ func (s *Server) startRaftNode(accName string, cfg *RaftConfig, labels pprofLabe
 			truncateAndErr(index)
 			break
 		}
+		// ae.commit must be equal to index, otherwise applyCommit will not be run.
+		ae.commit = index
 		n.processAppendEntry(ae, nil)
 		// Check how much we have queued up so far to determine if we should pause.
 		for _, e := range ae.entries {
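
Why the raft.go change matters, as a minimal sketch: entries replayed from the WAL only reach `applyCommit` once the commit index covers them, and an entry written just before a hard kill (such as the stream assignment in the test above) carries a commit value that lags its own index. Without bumping `ae.commit`, the tail of the WAL is stored but never applied on restart. The `appendEntry` and `node` types below are simplified stand-ins for illustration, not the server's real structs:

```go
package main

import "fmt"

// appendEntry is a toy stand-in for the server's append entry; only the
// two fields needed for this sketch are modeled.
type appendEntry struct {
	index  uint64 // WAL position of this entry
	commit uint64 // commit index recorded when the entry was written
}

// node models the apply side: entries are applied only up to the commit
// point, analogous to applyCommit in the real replay loop.
type node struct {
	commit  uint64
	applied []uint64
}

func (n *node) processAppendEntry(ae appendEntry) {
	for n.commit < ae.commit {
		n.commit++
		n.applied = append(n.applied, n.commit) // applyCommit analogue
	}
}

func main() {
	// WAL as written just before a hard kill: the last entry was stored
	// with a commit index that trails its own index.
	wal := []appendEntry{{index: 1}, {index: 2, commit: 1}, {index: 3, commit: 2}}

	for _, fix := range []bool{false, true} {
		n := &node{}
		for _, ae := range wal {
			if fix {
				ae.commit = ae.index // the one-line raft.go change above
			}
			n.processAppendEntry(ae)
		}
		fmt.Printf("fix=%v applied=%v\n", fix, n.applied)
	}
}
```

Running the sketch prints `fix=false applied=[1 2]` and `fix=true applied=[1 2 3]`: without the fix the final WAL entry is never applied after the restart, which is how the freshly added stream could disappear from all replicas.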