diff --git a/server/jetstream_cluster.go b/server/jetstream_cluster.go index aa3431ad793..5c4107e2921 100644 --- a/server/jetstream_cluster.go +++ b/server/jetstream_cluster.go @@ -1283,10 +1283,10 @@ func (js *jetStream) monitorCluster() { } else if nb > compactSizeMin && time.Since(lastSnapTime) > minSnapDelta { doSnapshot() } - ce.ReturnToPool() } else { s.Warnf("Error applying JetStream cluster entries: %v", err) } + ce.ReturnToPool() } aq.recycle(&ces) @@ -2460,6 +2460,8 @@ func (js *jetStream) monitorStream(mset *stream, sa *streamAssignment, sendSnaps ne, nb = n.Applied(ce.Index) ce.ReturnToPool() } else { + // Make sure to clean up. + ce.ReturnToPool() // Our stream was closed out from underneath of us, simply return here. if err == errStreamClosed || err == errCatchupStreamStopped || err == ErrServerNotRunning { aq.recycle(&ces) @@ -4886,13 +4888,14 @@ func (js *jetStream) monitorConsumer(o *consumer, ca *consumerAssignment) { if n.NeedSnapshot() { doSnapshot(true) } - } else if err := js.applyConsumerEntries(o, ce, isLeader); err == nil { + continue + } + if err := js.applyConsumerEntries(o, ce, isLeader); err == nil { var ne, nb uint64 // We can't guarantee writes are flushed while we're shutting down. Just rely on replay during recovery. if !js.isShuttingDown() { ne, nb = n.Applied(ce.Index) } - ce.ReturnToPool() // If we have at least min entries to compact, go ahead and snapshot/compact. if nb > 0 && ne >= compactNumMin || nb > compactSizeMin { doSnapshot(false) @@ -4900,6 +4903,7 @@ func (js *jetStream) monitorConsumer(o *consumer, ca *consumerAssignment) { } else if err != errConsumerClosed { s.Warnf("Error applying consumer entries to '%s > %s'", ca.Client.serviceAccount(), ca.Name) } + ce.ReturnToPool() } aq.recycle(&ces)