From 143c8669cd6cff31cc25404dfcb813466c3a6829 Mon Sep 17 00:00:00 2001 From: Derek Collison Date: Wed, 24 Jul 2024 23:07:25 -0700 Subject: [PATCH] When checking consumer state for interest policy streams, make sure we match if we are filtered. Signed-off-by: Derek Collison --- server/consumer.go | 26 +++++++++++++- server/stream.go | 85 +--------------------------------------------- 2 files changed, 26 insertions(+), 85 deletions(-) diff --git a/server/consumer.go b/server/consumer.go index 308fa0e6bed..bc09a790360 100644 --- a/server/consumer.go +++ b/server/consumer.go @@ -2990,6 +2990,28 @@ func (o *consumer) isFiltered() bool { return false } +// Check if we would have matched and needed an ack for this store seq. +// This is called for interest based retention streams to remove messages. +func (o *consumer) matchAck(sseq uint64) bool { + o.mu.RLock() + defer o.mu.RUnlock() + + // Check if we are filtered, and if so check if this is even applicable to us. + if o.isFiltered() { + if o.mset == nil { + return false + } + var svp StoreMsg + if _, err := o.mset.store.LoadMsg(sseq, &svp); err != nil { + return false + } + if !o.isFilteredMatch(svp.subj) { + return false + } + } + return true +} + // Check if we need an ack for this store seq. // This is called for interest based retention streams to remove messages. func (o *consumer) needAck(sseq uint64, subj string) bool { @@ -5601,7 +5623,9 @@ func (o *consumer) checkStateForInterestStream() error { } for seq := ss.FirstSeq; asflr > 0 && seq <= asflr; seq++ { - mset.ackMsg(o, seq) + if o.matchAck(seq) { + mset.ackMsg(o, seq) + } } o.mu.RLock() diff --git a/server/stream.go b/server/stream.go index 75cd2581733..40244f20d81 100644 --- a/server/stream.go +++ b/server/stream.go @@ -5542,98 +5542,15 @@ func (mset *stream) checkInterestState() { return } - var zeroAcks []*consumer - var lowAckFloor uint64 = math.MaxUint64 - for _, o := range mset.getConsumers() { o.checkStateForInterestStream() - - o.mu.Lock() - if o.isLeader() { - // We need to account for consumers with ack floor of zero. - // We will collect them and see if we need to check pending below. - if o.asflr == 0 { - zeroAcks = append(zeroAcks, o) - } else if o.asflr < lowAckFloor { - lowAckFloor = o.asflr - } - } else { - // We are a follower so only have the store state, so read that in. - state, err := o.store.State() - if err != nil { - // On error we will not have enough information to process correctly so bail. - o.mu.Unlock() - return - } - // We need to account for consumers with ack floor of zero. - if state.AckFloor.Stream == 0 { - zeroAcks = append(zeroAcks, o) - } else if state.AckFloor.Stream < lowAckFloor { - lowAckFloor = state.AckFloor.Stream - } - // We are a follower here but if we detect a drift from when we were previous leader correct here. - if o.asflr > state.AckFloor.Stream || o.sseq > state.Delivered.Stream+1 { - o.applyState(state) - } - } - o.mu.Unlock() - } - - // If nothing was set we can bail. - if lowAckFloor == math.MaxUint64 { - return - } - - // Capture our current state. - // ok to do so without lock. - var state StreamState - mset.store.FastState(&state) - - if lowAckFloor <= state.FirstSeq { - return - } - - // Do not want to hold stream lock if calculating numPending. - // Check if we had any zeroAcks, we will need to check them. - for _, o := range zeroAcks { - var np uint64 - o.mu.RLock() - if o.isLeader() { - np = uint64(o.numPending()) - } else { - np, _ = o.calculateNumPending() - } - o.mu.RUnlock() - // This means we have pending and can not remove anything at this time. - if np > 0 { - return - } } - - mset.mu.Lock() - defer mset.mu.Unlock() - - // Check which purge we need to perform. - if lowAckFloor <= state.LastSeq || state.Msgs == 0 { - // Purge the stream to lowest ack floor + 1 - mset.store.PurgeEx(_EMPTY_, lowAckFloor+1, 0) - } else { - // Here we have a low ack floor higher then our last seq. - // So we will just do normal purge. - mset.store.Purge() - } - - // Make sure to reset our local lseq. - mset.store.FastState(&state) - mset.lseq = state.LastSeq - // Also make sure we clear any pending acks. - mset.clearAllPreAcksBelowFloor(state.FirstSeq) } func (mset *stream) isInterestRetention() bool { mset.mu.RLock() defer mset.mu.RUnlock() - return mset.cfg.Retention == InterestPolicy + return mset.cfg.Retention != LimitsPolicy } // NumConsumers reports on number of active consumers for this stream.