diff --git a/server/jetstream_cluster.go b/server/jetstream_cluster.go index a71b73b0cd3..a40e9590cd1 100644 --- a/server/jetstream_cluster.go +++ b/server/jetstream_cluster.go @@ -2623,11 +2623,19 @@ func (js *jetStream) monitorStream(mset *stream, sa *streamAssignment, sendSnaps return case <-mqch: // Clean signal from shutdown routine so do best effort attempt to snapshot. - doSnapshot() + // Don't snapshot if not shutting down, monitor goroutine could be going away + // on a scale down or a remove for example. + if s.isShuttingDown() { + doSnapshot() + } return case <-qch: // Clean signal from shutdown routine so do best effort attempt to snapshot. - doSnapshot() + // Don't snapshot if not shutting down, Raft node could be going away on a + // scale down or remove for example. + if s.isShuttingDown() { + doSnapshot() + } return case <-aq.ch: var ne, nb uint64 @@ -5521,11 +5529,19 @@ func (js *jetStream) monitorConsumer(o *consumer, ca *consumerAssignment) { return case <-mqch: // Clean signal from shutdown routine so do best effort attempt to snapshot. - doSnapshot(false) + // Don't snapshot if not shutting down, monitor goroutine could be going away + // on a scale down or a remove for example. + if s.isShuttingDown() { + doSnapshot(false) + } return case <-qch: // Clean signal from shutdown routine so do best effort attempt to snapshot. - doSnapshot(false) + // Don't snapshot if not shutting down, Raft node could be going away on a + // scale down or remove for example. + if s.isShuttingDown() { + doSnapshot(false) + } return case <-aq.ch: ces := aq.pop() diff --git a/server/jetstream_cluster_1_test.go b/server/jetstream_cluster_1_test.go index 8b2e54cb168..80257d0f163 100644 --- a/server/jetstream_cluster_1_test.go +++ b/server/jetstream_cluster_1_test.go @@ -9234,13 +9234,11 @@ func TestJetStreamClusterAsyncFlushFileStoreFlushOnSnapshot(t *testing.T) { // Confirm above write is pending. require_Equal(t, lmb.pendingWriteSize(), 33) - // Stop stream monitor routine, which will install a snapshot on shutdown. - mset.mu.Lock() - if mset.mqch != nil { - close(mset.mqch) - mset.mqch = nil - } - mset.mu.Unlock() + // Make the upper layer snapshot by sending leader change signal. + // It doesn't matter that we're already leader, it still gets handled. + // Previously this used the mqch, but that now only snapshots on shutdown. + n.leadc <- true + checkFor(t, 2*time.Second, 200*time.Millisecond, func() error { n.Lock() snap, err := n.loadLastSnapshot()