diff --git a/server/consumer.go b/server/consumer.go index 653098302bf..4ad75acbd54 100644 --- a/server/consumer.go +++ b/server/consumer.go @@ -474,6 +474,7 @@ type consumer struct { dthresh time.Duration mch chan struct{} // Message channel qch chan struct{} // Quit channel + mqch chan struct{} // The monitor's quit channel. inch chan bool // Interest change channel sfreq int32 ackEventT string @@ -1125,6 +1126,7 @@ func (mset *stream) addConsumerWithAssignment(config *ConsumerConfig, oname stri outq: mset.outq, active: true, qch: make(chan struct{}), + mqch: make(chan struct{}), uch: make(chan struct{}, 1), mch: make(chan struct{}, 1), sfreq: int32(sampleFreq), @@ -1389,6 +1391,26 @@ func (o *consumer) setConsumerAssignment(ca *consumerAssignment) { } } +func (o *consumer) monitorQuitC() <-chan struct{} { + if o == nil { + return nil + } + o.mu.RLock() + defer o.mu.RUnlock() + return o.mqch +} + +// signalMonitorQuit signals to exit the monitor loop. If there's no Raft node, +// this will be the only way to stop the monitor goroutine. +func (o *consumer) signalMonitorQuit() { + o.mu.Lock() + defer o.mu.Unlock() + if o.mqch != nil { + close(o.mqch) + o.mqch = nil + } +} + func (o *consumer) updateC() <-chan struct{} { o.mu.RLock() defer o.mu.RUnlock() @@ -6000,6 +6022,13 @@ func (o *consumer) stopWithFlags(dflag, sdflag, doSignal, advisory bool) error { } o.closed = true + // Signal to the monitor loop. + // Can't use only qch here, since that's used when stepping down as a leader. + if o.mqch != nil { + close(o.mqch) + o.mqch = nil + } + // Check if we are the leader and are being deleted (as a node). if dflag && o.isLeader() { // If we are clustered and node leader (probable from above), stepdown. diff --git a/server/jetstream_cluster.go b/server/jetstream_cluster.go index 7acc6e14a79..3c0c8c98371 100644 --- a/server/jetstream_cluster.go +++ b/server/jetstream_cluster.go @@ -3146,6 +3146,7 @@ func (mset *stream) resetClusteredState(err error) bool { // Need to do the rest in a separate Go routine. go func() { + mset.signalMonitorQuit() mset.monitorWg.Wait() mset.resetAndWaitOnConsumers() // Stop our stream. @@ -4159,6 +4160,7 @@ func (s *Server) removeStream(mset *stream, nsa *streamAssignment) { if !isShuttingDown { // wait for monitor to be shutdown. + mset.signalMonitorQuit() mset.monitorWg.Wait() } mset.stop(true, false) @@ -4779,6 +4781,7 @@ func (js *jetStream) processClusterDeleteStream(sa *streamAssignment, isMember, n.Delete() } // wait for monitor to be shut down + mset.signalMonitorQuit() mset.monitorWg.Wait() err = mset.stop(true, wasLeader) stopped = true @@ -5520,7 +5523,7 @@ func (js *jetStream) monitorConsumer(o *consumer, ca *consumerAssignment) { // from underneath the one that is running since it will be the same raft node. defer n.Stop() - qch, lch, aq, uch, ourPeerId := n.QuitC(), n.LeadChangeC(), n.ApplyQ(), o.updateC(), meta.ID() + qch, mqch, lch, aq, uch, ourPeerId := n.QuitC(), o.monitorQuitC(), n.LeadChangeC(), n.ApplyQ(), o.updateC(), meta.ID() s.Debugf("Starting consumer monitor for '%s > %s > %s' [%s]", o.acc.Name, ca.Stream, ca.Name, n.Group()) defer s.Debugf("Exiting consumer monitor for '%s > %s > %s' [%s]", o.acc.Name, ca.Stream, ca.Name, n.Group()) @@ -5609,6 +5612,10 @@ func (js *jetStream) monitorConsumer(o *consumer, ca *consumerAssignment) { // Server shutting down, but we might receive this before qch, so try to snapshot. doSnapshot(false) return + case <-mqch: + // Clean signal from shutdown routine so do best effort attempt to snapshot. + doSnapshot(false) + return case <-qch: // Clean signal from shutdown routine so do best effort attempt to snapshot. doSnapshot(false) diff --git a/server/jetstream_cluster_1_test.go b/server/jetstream_cluster_1_test.go index 72e6e677fbe..3ccee326640 100644 --- a/server/jetstream_cluster_1_test.go +++ b/server/jetstream_cluster_1_test.go @@ -10688,6 +10688,114 @@ func TestJetStreamClusterOfflineStreamAndConsumerStrictDecoding(t *testing.T) { require_True(t, bytes.Equal(wca.unsupportedJson, unsupportedJson)) } +func TestJetStreamClusterStreamMonitorShutdownWithoutRaftNode(t *testing.T) { + c := createJetStreamClusterExplicit(t, "R3S", 3) + defer c.shutdown() + + nc, js := jsClientConnect(t, c.randomServer()) + defer nc.Close() + + _, err := js.AddStream(&nats.StreamConfig{ + Name: "TEST", + Subjects: []string{"foo"}, + Replicas: 3, + }) + require_NoError(t, err) + checkFor(t, 2*time.Second, 200*time.Millisecond, func() error { + for _, s := range c.servers { + if !s.JetStreamIsStreamAssigned(globalAccountName, "TEST") { + return fmt.Errorf("stream not assigned on %s", s.Name()) + } + } + return nil + }) + + var nodes []RaftNode + for _, s := range c.servers { + mset, err := s.globalAccount().lookupStream("TEST") + require_NoError(t, err) + // Manually nil-out the node. This shouldn't happen normally, + // but tests we can shut down purely with the monitor goroutine quit channel. + mset.mu.Lock() + n := mset.node + mset.node = nil + mset.mu.Unlock() + require_NotNil(t, n) + nodes = append(nodes, n) + } + for _, n := range nodes { + require_NotEqual(t, n.State(), Closed) + } + + require_NoError(t, js.DeleteStream("TEST")) + checkFor(t, 2*time.Second, 200*time.Millisecond, func() error { + for _, n := range nodes { + if state := n.State(); state != Closed { + return fmt.Errorf("node not closed on %s: %s", n.ID(), state.String()) + } + } + return nil + }) +} + +func TestJetStreamClusterConsumerMonitorShutdownWithoutRaftNode(t *testing.T) { + c := createJetStreamClusterExplicit(t, "R3S", 3) + defer c.shutdown() + + nc, js := jsClientConnect(t, c.randomServer()) + defer nc.Close() + + _, err := js.AddStream(&nats.StreamConfig{ + Name: "TEST", + Subjects: []string{"foo"}, + Replicas: 3, + }) + require_NoError(t, err) + _, err = js.AddConsumer("TEST", &nats.ConsumerConfig{ + Durable: "DURABLE", + Replicas: 3, + }) + require_NoError(t, err) + checkFor(t, 2*time.Second, 200*time.Millisecond, func() error { + for _, s := range c.servers { + _, cc := s.getJetStreamCluster() + if !cc.isConsumerAssigned(s.globalAccount(), "TEST", "DURABLE") { + return fmt.Errorf("consumer not assigned on %s", s.Name()) + } + } + return nil + }) + + var nodes []RaftNode + for _, s := range c.servers { + mset, err := s.globalAccount().lookupStream("TEST") + require_NoError(t, err) + o := mset.lookupConsumer("DURABLE") + require_NotNil(t, o) + // Manually nil-out the node. This shouldn't happen normally, + // but tests we can shut down purely with the monitor goroutine quit channel. + o.mu.Lock() + n := o.node + o.node = nil + o.mu.Unlock() + require_NotNil(t, n) + nodes = append(nodes, n) + } + for _, n := range nodes { + require_NotEqual(t, n.State(), Closed) + } + + require_NoError(t, js.DeleteStream("TEST")) + checkFor(t, 2*time.Second, 200*time.Millisecond, func() error { + for _, n := range nodes { + if state := n.State(); state != Closed { + return fmt.Errorf("node not closed on %s: %s", n.ID(), state.String()) + } + } + return nil + }) +} + // // DO NOT ADD NEW TESTS IN THIS FILE (unless to balance test times) // Add at the end of jetstream_cluster__test.go, with being the highest value. diff --git a/server/stream.go b/server/stream.go index a8201b23ba0..aac2333ae17 100644 --- a/server/stream.go +++ b/server/stream.go @@ -1018,6 +1018,17 @@ func (mset *stream) monitorQuitC() <-chan struct{} { return mset.mqch } +// signalMonitorQuit signals to exit the monitor loop. If there's no Raft node, +// this will be the only way to stop the monitor goroutine. +func (mset *stream) signalMonitorQuit() { + mset.mu.Lock() + defer mset.mu.Unlock() + if mset.mqch != nil { + close(mset.mqch) + mset.mqch = nil + } +} + func (mset *stream) updateC() <-chan struct{} { if mset == nil { return nil @@ -6790,6 +6801,7 @@ func (mset *stream) resetAndWaitOnConsumers() { node.Stop() } if o.isMonitorRunning() { + o.signalMonitorQuit() o.monitorWg.Wait() } } @@ -6881,6 +6893,7 @@ func (mset *stream) stop(deleteFlag, advisory bool) error { // but should we log? o.stopWithFlags(deleteFlag, deleteFlag, false, advisory) if !isShuttingDown { + o.signalMonitorQuit() o.monitorWg.Wait() } }