diff --git a/server/jetstream_cluster.go b/server/jetstream_cluster.go index e5de479eb9c..c54ab05cf4b 100644 --- a/server/jetstream_cluster.go +++ b/server/jetstream_cluster.go @@ -2415,7 +2415,6 @@ func (js *jetStream) monitorStream(mset *stream, sa *streamAssignment, sendSnaps compactInterval = 2 * time.Minute compactSizeMin = 8 * 1024 * 1024 compactNumMin = 65536 - minSnapDelta = 10 * time.Second ) // Spread these out for large numbers on server restart. @@ -2439,16 +2438,15 @@ func (js *jetStream) monitorStream(mset *stream, sa *streamAssignment, sendSnaps // a complete and detailed state which could be costly in terms of memory, cpu and GC. // This only entails how many messages, and the first and last sequence of the stream. // This is all that is needed to detect a change, and we can get this from FilteredState() - // with and empty filter. + // with an empty filter. var lastState SimpleState - var lastSnapTime time.Time // Don't allow the upper layer to install snapshots until we have // fully recovered from disk. isRecovering := true doSnapshot := func() { - if mset == nil || isRecovering || isRestore || time.Since(lastSnapTime) < minSnapDelta { + if mset == nil || isRecovering || isRestore { return } @@ -2466,7 +2464,7 @@ func (js *jetStream) monitorStream(mset *stream, sa *streamAssignment, sendSnaps } if err := n.InstallSnapshot(mset.stateSnapshot()); err == nil { - lastState, lastSnapTime = curState, time.Now() + lastState = curState } else if err != errNoSnapAvailable && err != errNodeClosed && err != errCatchupsRunning { s.RateLimitWarnf("Failed to install snapshot for '%s > %s' [%s]: %v", mset.acc.Name, mset.name(), n.Group(), err) } diff --git a/server/jetstream_cluster_1_test.go b/server/jetstream_cluster_1_test.go index 49c35befff6..cd1c15642e0 100644 --- a/server/jetstream_cluster_1_test.go +++ b/server/jetstream_cluster_1_test.go @@ -6945,45 +6945,6 @@ func TestJetStreamClusterConsumerInfoAfterCreate(t *testing.T) { require_NoError(t, err) } -func TestJetStreamClusterDontSnapshotTooOften(t *testing.T) { - c := createJetStreamClusterExplicit(t, "R3S", 3) - defer c.shutdown() - - nc, js := jsClientConnect(t, c.randomServer()) - defer nc.Close() - - _, err := js.AddStream(&nats.StreamConfig{ - Name: "TEST", - Subjects: []string{"foo"}, - Replicas: 3, - }) - require_NoError(t, err) - - // We force the snapshot compact size to hit multiple times. - // But, we should not be making snapshots too often since that would degrade performance. - data := make([]byte, 1024*1024) // 1MB payload - _, err = crand.Read(data) - require_NoError(t, err) - for i := 0; i < 50; i++ { - // We do synchronous publishes so we're more likely to have entries pass through the apply queue. - _, err = js.Publish("foo", data) - require_NoError(t, err) - } - - for _, s := range c.servers { - acc, err := s.lookupAccount(globalAccountName) - require_NoError(t, err) - mset, err := acc.lookupStream("TEST") - require_NoError(t, err) - snap, err := mset.node.(*raft).loadLastSnapshot() - require_NoError(t, err) - // This measure is not exact and more of a side effect. - // We expect one snapshot to be made pretty soon and to be on cooldown after. - // So no snapshots should be made after that. - require_LessThan(t, snap.lastIndex, 20) - } -} - // // DO NOT ADD NEW TESTS IN THIS FILE (unless to balance test times) // Add at the end of jetstream_cluster__test.go, with being the highest value.