diff --git a/server/consumer.go b/server/consumer.go index 0ffd72af57a..1322384e600 100644 --- a/server/consumer.go +++ b/server/consumer.go @@ -1821,6 +1821,8 @@ var ( consumerNotActiveMaxInterval = defaultConsumerNotActiveMaxInterval ) +// deleteNotActive must only be called from time.AfterFunc or in its own +// goroutine, as it can block on clean-up. func (o *consumer) deleteNotActive() { o.mu.Lock() if o.mset == nil { @@ -1863,6 +1865,16 @@ func (o *consumer) deleteNotActive() { acc, stream, name, isDirect := o.acc.Name, o.stream, o.name, o.cfg.Direct o.mu.Unlock() + // Useful for pprof. + setGoRoutineLabels(pprofLabels{ + "account": acc, + "stream": stream, + "consumer": name, + }) + + // We will delete locally regardless. + defer o.delete() + // If we are clustered, check if we still have this consumer assigned. // If we do forward a proposal to delete ourselves to the metacontroller leader. if !isDirect && s.JetStreamIsClustered() { @@ -1885,38 +1897,33 @@ func (o *consumer) deleteNotActive() { if ca != nil && cc != nil { // Check to make sure we went away. // Don't think this needs to be a monitored go routine. - go func() { - jitter := time.Duration(rand.Int63n(int64(consumerNotActiveStartInterval))) - interval := consumerNotActiveStartInterval + jitter - ticker := time.NewTicker(interval) - defer ticker.Stop() - for range ticker.C { - js.mu.RLock() - if js.shuttingDown { - js.mu.RUnlock() - return - } - nca := js.consumerAssignment(acc, stream, name) + jitter := time.Duration(rand.Int63n(int64(consumerNotActiveStartInterval))) + interval := consumerNotActiveStartInterval + jitter + ticker := time.NewTicker(interval) + defer ticker.Stop() + for range ticker.C { + js.mu.RLock() + if js.shuttingDown { js.mu.RUnlock() - // Make sure this is not a new consumer with the same name. - if nca != nil && nca == ca { - s.Warnf("Consumer assignment for '%s > %s > %s' not cleaned up, retrying", acc, stream, name) - meta.ForwardProposal(removeEntry) - if interval < consumerNotActiveMaxInterval { - interval *= 2 - ticker.Reset(interval) - } - continue - } - // We saw that consumer has been removed, all done. return } - }() + nca := js.consumerAssignment(acc, stream, name) + js.mu.RUnlock() + // Make sure this is not a new consumer with the same name. + if nca != nil && nca == ca { + s.Warnf("Consumer assignment for '%s > %s > %s' not cleaned up, retrying", acc, stream, name) + meta.ForwardProposal(removeEntry) + if interval < consumerNotActiveMaxInterval { + interval *= 2 + ticker.Reset(interval) + } + continue + } + // We saw that consumer has been removed, all done. + return + } } } - - // We will delete here regardless. - o.delete() } func (o *consumer) watchGWinterest() {