diff --git a/server/consumer.go b/server/consumer.go
index 61d6420bd37..a30fb31dc8a 100644
--- a/server/consumer.go
+++ b/server/consumer.go
@@ -1637,6 +1637,16 @@ func (o *consumer) updateDeliveryInterest(localInterest bool) bool {
 	return false
 }
 
+const (
+	defaultConsumerNotActiveStartInterval = 30 * time.Second
+	defaultConsumerNotActiveMaxInterval   = 5 * time.Minute
+)
+
+var (
+	consumerNotActiveStartInterval = defaultConsumerNotActiveStartInterval
+	consumerNotActiveMaxInterval   = defaultConsumerNotActiveMaxInterval
+)
+
 func (o *consumer) deleteNotActive() {
 	o.mu.Lock()
 	if o.mset == nil {
@@ -1702,12 +1712,8 @@ func (o *consumer) deleteNotActive() {
 	// Check to make sure we went away.
 	// Don't think this needs to be a monitored go routine.
 	go func() {
-		const (
-			startInterval = 30 * time.Second
-			maxInterval   = 5 * time.Minute
-		)
-		jitter := time.Duration(rand.Int63n(int64(startInterval)))
-		interval := startInterval + jitter
+		jitter := time.Duration(rand.Int63n(int64(consumerNotActiveStartInterval)))
+		interval := consumerNotActiveStartInterval + jitter
 		ticker := time.NewTicker(interval)
 		defer ticker.Stop()
 		for range ticker.C {
@@ -1722,7 +1728,7 @@ func (o *consumer) deleteNotActive() {
 			if nca != nil && nca == ca {
 				s.Warnf("Consumer assignment for '%s > %s > %s' not cleaned up, retrying", acc, stream, name)
 				meta.ForwardProposal(removeEntry)
-				if interval < maxInterval {
+				if interval < consumerNotActiveMaxInterval {
 					interval *= 2
 					ticker.Reset(interval)
 				}
diff --git a/server/jetstream_cluster.go b/server/jetstream_cluster.go
index 6928d3199ec..e43869c371b 100644
--- a/server/jetstream_cluster.go
+++ b/server/jetstream_cluster.go
@@ -1406,10 +1406,6 @@ func (js *jetStream) monitorCluster() {
 			aq.recycle(&ces)
 
 		case isLeader = <-lch:
-			// For meta layer synchronize everyone to our state on becoming leader.
-			if isLeader && n.ApplyQ().len() == 0 {
-				n.SendSnapshot(js.metaSnapshot())
-			}
 			// Process the change.
 			js.processLeaderChange(isLeader)
 			if isLeader {
diff --git a/server/jetstream_cluster_3_test.go b/server/jetstream_cluster_3_test.go
index 045aa5c1c79..001b114c9ee 100644
--- a/server/jetstream_cluster_3_test.go
+++ b/server/jetstream_cluster_3_test.go
@@ -1600,6 +1600,11 @@ func TestJetStreamClusterParallelConsumerCreation(t *testing.T) {
 }
 
 func TestJetStreamClusterGhostEphemeralsAfterRestart(t *testing.T) {
+	consumerNotActiveStartInterval = time.Second * 5
+	defer func() {
+		consumerNotActiveStartInterval = defaultConsumerNotActiveStartInterval
+	}()
+
 	c := createJetStreamClusterExplicit(t, "R3S", 3)
 	defer c.shutdown()
 
@@ -1632,6 +1637,7 @@ func TestJetStreamClusterGhostEphemeralsAfterRestart(t *testing.T) {
 	time.Sleep(2 * time.Second)
 
 	// Restart first and wait so that we know it will try cleanup without a metaleader.
+	// It will fail as there's no metaleader at that time, it should keep retrying on an interval.
 	c.restartServer(rs)
 	time.Sleep(time.Second)
 
@@ -1643,8 +1649,9 @@ func TestJetStreamClusterGhostEphemeralsAfterRestart(t *testing.T) {
 	defer nc.Close()
 
 	subj := fmt.Sprintf(JSApiConsumerListT, "TEST")
-	checkFor(t, 10*time.Second, 200*time.Millisecond, func() error {
-		m, err := nc.Request(subj, nil, time.Second)
+	checkFor(t, 20*time.Second, 200*time.Millisecond, func() error {
+		// Request will take at most 4 seconds if some consumers can't be found.
+		m, err := nc.Request(subj, nil, 5*time.Second)
 		if err != nil {
 			return err
 		}
diff --git a/server/jetstream_cluster_4_test.go b/server/jetstream_cluster_4_test.go
index 2311638d576..eeb95924cd8 100644
--- a/server/jetstream_cluster_4_test.go
+++ b/server/jetstream_cluster_4_test.go
@@ -21,8 +21,11 @@ import (
 	"encoding/json"
 	"errors"
 	"fmt"
+	"io"
+	"io/fs"
 	"math/rand"
 	"os"
+	"path"
 	"path/filepath"
 	"runtime"
 	"slices"
@@ -4159,3 +4162,78 @@ func TestJetStreamClusterDesyncAfterCatchupTooManyRetries(t *testing.T) {
 	newStreamLeaderServer := c.streamLeader(globalAccountName, "TEST")
 	require_Equal(t, newStreamLeaderServer.Name(), clusterResetServerName)
 }
+
+func TestJetStreamClusterHardKillAfterStreamAdd(t *testing.T) {
+	c := createJetStreamClusterExplicit(t, "R3S", 3)
+	defer c.shutdown()
+
+	nc, js := jsClientConnect(t, c.randomServer())
+	defer nc.Close()
+
+	_, err := js.AddStream(&nats.StreamConfig{
+		Name:     "TEST",
+		Subjects: []string{"foo"},
+		Replicas: 3,
+	})
+	require_NoError(t, err)
+
+	copyDir := func(dst, src string) error {
+		srcFS := os.DirFS(src)
+		return fs.WalkDir(srcFS, ".", func(p string, d os.DirEntry, err error) error {
+			if err != nil {
+				return err
+			}
+			newPath := path.Join(dst, p)
+			if d.IsDir() {
+				return os.MkdirAll(newPath, defaultDirPerms)
+			}
+			r, err := srcFS.Open(p)
+			if err != nil {
+				return err
+			}
+			defer r.Close()
+
+			w, err := os.OpenFile(newPath, os.O_CREATE|os.O_WRONLY, defaultFilePerms)
+			if err != nil {
+				return err
+			}
+			defer w.Close()
+			_, err = io.Copy(w, r)
+			return err
+		})
+	}
+
+	// Simulate being hard killed by:
+	// 1. copy directories before shutdown
+	copyToSrcMap := make(map[string]string)
+	for _, s := range c.servers {
+		sd := s.StoreDir()
+		copySd := path.Join(t.TempDir(), JetStreamStoreDir)
+		err = copyDir(copySd, sd)
+		require_NoError(t, err)
+		copyToSrcMap[copySd] = sd
+	}
+
+	// 2. stop all
+	nc.Close()
+	c.stopAll()
+
+	// 3. revert directories to before shutdown
+	for cp, dest := range copyToSrcMap {
+		err = os.RemoveAll(dest)
+		require_NoError(t, err)
+		err = copyDir(dest, cp)
+		require_NoError(t, err)
+	}
+
+	// 4. restart
+	c.restartAll()
+	c.waitOnAllCurrent()
+
+	nc, js = jsClientConnect(t, c.randomServer())
+	defer nc.Close()
+
+	// Stream should exist still and not be removed after hard killing all servers, so expect no error.
+	_, err = js.StreamInfo("TEST")
+	require_NoError(t, err)
+}
diff --git a/server/norace_test.go b/server/norace_test.go
index 5e01b6b9ed0..98d29cb2bc0 100644
--- a/server/norace_test.go
+++ b/server/norace_test.go
@@ -6577,6 +6577,11 @@ func TestNoRaceJetStreamConsumerCreateTimeNumPending(t *testing.T) {
 }
 
 func TestNoRaceJetStreamClusterGhostConsumers(t *testing.T) {
+	consumerNotActiveStartInterval = time.Second * 5
+	defer func() {
+		consumerNotActiveStartInterval = defaultConsumerNotActiveStartInterval
+	}()
+
 	c := createJetStreamClusterExplicit(t, "GHOST", 3)
 	defer c.shutdown()
 
@@ -6670,22 +6675,17 @@ func TestNoRaceJetStreamClusterGhostConsumers(t *testing.T) {
 	time.Sleep(5 * time.Second)
 	cancel()
 
-	getMissing := func() []string {
-		m, err := nc.Request("$JS.API.CONSUMER.LIST.TEST", nil, time.Second*10)
-		require_NoError(t, err)
-
+	checkFor(t, 30*time.Second, time.Second, func() error {
+		m, err := nc.Request("$JS.API.CONSUMER.LIST.TEST", nil, time.Second)
+		if err != nil {
+			return err
+		}
 		var resp JSApiConsumerListResponse
-		err = json.Unmarshal(m.Data, &resp)
-		require_NoError(t, err)
-		return resp.Missing
-	}
-
-	checkFor(t, 10*time.Second, 500*time.Millisecond, func() error {
-		missing := getMissing()
-		if len(missing) == 0 {
+		require_NoError(t, json.Unmarshal(m.Data, &resp))
+		if len(resp.Missing) == 0 {
 			return nil
 		}
-		return fmt.Errorf("Still have missing: %+v", missing)
+		return fmt.Errorf("Still have missing: %+v", resp.Missing)
 	})
 }