diff --git a/server/jetstream.go b/server/jetstream.go index 2213a476a74..e9f040939be 100644 --- a/server/jetstream.go +++ b/server/jetstream.go @@ -1023,11 +1023,11 @@ func (s *Server) shutdownJetStream() { js.accounts = nil var qch chan struct{} - + var stopped chan struct{} if cc := js.cluster; cc != nil { if cc.qch != nil { - qch = cc.qch - cc.qch = nil + qch, stopped = cc.qch, cc.stopped + cc.qch, cc.stopped = nil, nil } js.stopUpdatesSub() if cc.c != nil { @@ -1044,14 +1044,11 @@ func (s *Server) shutdownJetStream() { // We will wait for a bit for it to close. // Do this without the lock. if qch != nil { + close(qch) // Must be close() to signal *all* listeners select { - case qch <- struct{}{}: - select { - case <-qch: - case <-time.After(2 * time.Second): - s.Warnf("Did not receive signal for successful shutdown of cluster routine") - } - default: + case <-stopped: + case <-time.After(10 * time.Second): + s.Warnf("Did not receive signal for successful shutdown of cluster routine") } } } diff --git a/server/jetstream_cluster.go b/server/jetstream_cluster.go index 87ad2c60956..c7f7f27b73e 100644 --- a/server/jetstream_cluster.go +++ b/server/jetstream_cluster.go @@ -70,6 +70,8 @@ type jetStreamCluster struct { peerStreamCancelMove *subscription // To pop out the monitorCluster before the raft layer. qch chan struct{} + // To notify others that monitorCluster has actually stopped. + stopped chan struct{} // Track last meta snapshot time and duration for monitoring. lastMetaSnapTime int64 // Unix nanoseconds lastMetaSnapDuration int64 // Duration in nanoseconds @@ -954,6 +956,7 @@ func (js *jetStream) setupMetaGroup() error { s: s, c: c, qch: make(chan struct{}), + stopped: make(chan struct{}), } atomic.StoreInt32(&js.clustered, 1) c.registerWithAccount(sysAcc) @@ -1190,6 +1193,16 @@ func (js *jetStream) clusterQuitC() chan struct{} { return nil } +// Return the cluster stopped chan. +func (js *jetStream) clusterStoppedC() chan struct{} { + js.mu.RLock() + defer js.mu.RUnlock() + if js.cluster != nil { + return js.cluster.stopped + } + return nil +} + // Mark that the meta layer is recovering. func (js *jetStream) setMetaRecovering() { js.mu.Lock() @@ -1346,9 +1359,10 @@ func (js *jetStream) checkForOrphans() { func (js *jetStream) monitorCluster() { s, n := js.server(), js.getMetaGroup() - qch, rqch, lch, aq := js.clusterQuitC(), n.QuitC(), n.LeadChangeC(), n.ApplyQ() + qch, stopped, rqch, lch, aq := js.clusterQuitC(), js.clusterStoppedC(), n.QuitC(), n.LeadChangeC(), n.ApplyQ() defer s.grWG.Done() + defer close(stopped) s.Debugf("Starting metadata monitor") defer s.Debugf("Exiting metadata monitor") @@ -1445,8 +1459,6 @@ func (js *jetStream) monitorCluster() { case <-qch: // Clean signal from shutdown routine so do best effort attempt to snapshot meta layer. doSnapshot(false) - // Return the signal back since shutdown will be waiting. - close(qch) return case <-aq.ch: ces := aq.pop()