Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 7 additions & 10 deletions server/jetstream.go
Original file line number Diff line number Diff line change
Expand Up @@ -1023,11 +1023,11 @@ func (s *Server) shutdownJetStream() {
js.accounts = nil

var qch chan struct{}

var stopped chan struct{}
if cc := js.cluster; cc != nil {
if cc.qch != nil {
qch = cc.qch
cc.qch = nil
qch, stopped = cc.qch, cc.stopped
cc.qch, cc.stopped = nil, nil
}
js.stopUpdatesSub()
if cc.c != nil {
Expand All @@ -1044,14 +1044,11 @@ func (s *Server) shutdownJetStream() {
// We will wait for a bit for it to close.
// Do this without the lock.
if qch != nil {
close(qch) // Must be close() to signal *all* listeners
select {
case qch <- struct{}{}:
select {
case <-qch:
case <-time.After(2 * time.Second):
s.Warnf("Did not receive signal for successful shutdown of cluster routine")
}
default:
case <-stopped:
case <-time.After(10 * time.Second):
s.Warnf("Did not receive signal for successful shutdown of cluster routine")
}
}
}
Expand Down
18 changes: 15 additions & 3 deletions server/jetstream_cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,8 @@ type jetStreamCluster struct {
peerStreamCancelMove *subscription
// To pop out the monitorCluster before the raft layer.
qch chan struct{}
// To notify others that monitorCluster has actually stopped.
stopped chan struct{}
// Track last meta snapshot time and duration for monitoring.
lastMetaSnapTime int64 // Unix nanoseconds
lastMetaSnapDuration int64 // Duration in nanoseconds
Expand Down Expand Up @@ -954,6 +956,7 @@ func (js *jetStream) setupMetaGroup() error {
s: s,
c: c,
qch: make(chan struct{}),
stopped: make(chan struct{}),
}
atomic.StoreInt32(&js.clustered, 1)
c.registerWithAccount(sysAcc)
Expand Down Expand Up @@ -1190,6 +1193,16 @@ func (js *jetStream) clusterQuitC() chan struct{} {
return nil
}

// Return the cluster stopped chan.
func (js *jetStream) clusterStoppedC() chan struct{} {
js.mu.RLock()
defer js.mu.RUnlock()
if js.cluster != nil {
return js.cluster.stopped
}
return nil
}

// Mark that the meta layer is recovering.
func (js *jetStream) setMetaRecovering() {
js.mu.Lock()
Expand Down Expand Up @@ -1346,9 +1359,10 @@ func (js *jetStream) checkForOrphans() {

func (js *jetStream) monitorCluster() {
s, n := js.server(), js.getMetaGroup()
qch, rqch, lch, aq := js.clusterQuitC(), n.QuitC(), n.LeadChangeC(), n.ApplyQ()
qch, stopped, rqch, lch, aq := js.clusterQuitC(), js.clusterStoppedC(), n.QuitC(), n.LeadChangeC(), n.ApplyQ()

defer s.grWG.Done()
defer close(stopped)

s.Debugf("Starting metadata monitor")
defer s.Debugf("Exiting metadata monitor")
Expand Down Expand Up @@ -1445,8 +1459,6 @@ func (js *jetStream) monitorCluster() {
case <-qch:
// Clean signal from shutdown routine so do best effort attempt to snapshot meta layer.
doSnapshot(false)
// Return the signal back since shutdown will be waiting.
close(qch)
return
case <-aq.ch:
ces := aq.pop()
Expand Down