Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 25 additions & 1 deletion server/consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -2990,6 +2990,28 @@ func (o *consumer) isFiltered() bool {
return false
}

// Check if we would have matched and needed an ack for this store seq.
// This is called for interest based retention streams to remove messages.
func (o *consumer) matchAck(sseq uint64) bool {
o.mu.RLock()
defer o.mu.RUnlock()

// Check if we are filtered, and if so check if this is even applicable to us.
if o.isFiltered() {
if o.mset == nil {
return false
}
var svp StoreMsg
if _, err := o.mset.store.LoadMsg(sseq, &svp); err != nil {
return false
}
if !o.isFilteredMatch(svp.subj) {
return false
}
}
return true
}

// Check if we need an ack for this store seq.
// This is called for interest based retention streams to remove messages.
func (o *consumer) needAck(sseq uint64, subj string) bool {
Expand Down Expand Up @@ -5601,7 +5623,9 @@ func (o *consumer) checkStateForInterestStream() error {
}

for seq := ss.FirstSeq; asflr > 0 && seq <= asflr; seq++ {
mset.ackMsg(o, seq)
if o.matchAck(seq) {
mset.ackMsg(o, seq)
}
}

o.mu.RLock()
Expand Down
85 changes: 1 addition & 84 deletions server/stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -5542,98 +5542,15 @@ func (mset *stream) checkInterestState() {
return
}

var zeroAcks []*consumer
var lowAckFloor uint64 = math.MaxUint64

for _, o := range mset.getConsumers() {
o.checkStateForInterestStream()

o.mu.Lock()
if o.isLeader() {
// We need to account for consumers with ack floor of zero.
// We will collect them and see if we need to check pending below.
if o.asflr == 0 {
zeroAcks = append(zeroAcks, o)
} else if o.asflr < lowAckFloor {
lowAckFloor = o.asflr
}
} else {
// We are a follower so only have the store state, so read that in.
state, err := o.store.State()
if err != nil {
// On error we will not have enough information to process correctly so bail.
o.mu.Unlock()
return
}
// We need to account for consumers with ack floor of zero.
if state.AckFloor.Stream == 0 {
zeroAcks = append(zeroAcks, o)
} else if state.AckFloor.Stream < lowAckFloor {
lowAckFloor = state.AckFloor.Stream
}
// We are a follower here but if we detect a drift from when we were previous leader correct here.
if o.asflr > state.AckFloor.Stream || o.sseq > state.Delivered.Stream+1 {
o.applyState(state)
}
}
o.mu.Unlock()
}

// If nothing was set we can bail.
if lowAckFloor == math.MaxUint64 {
return
}

// Capture our current state.
// ok to do so without lock.
var state StreamState
mset.store.FastState(&state)

if lowAckFloor <= state.FirstSeq {
return
}

// Do not want to hold stream lock if calculating numPending.
// Check if we had any zeroAcks, we will need to check them.
for _, o := range zeroAcks {
var np uint64
o.mu.RLock()
if o.isLeader() {
np = uint64(o.numPending())
} else {
np, _ = o.calculateNumPending()
}
o.mu.RUnlock()
// This means we have pending and can not remove anything at this time.
if np > 0 {
return
}
}

mset.mu.Lock()
defer mset.mu.Unlock()

// Check which purge we need to perform.
if lowAckFloor <= state.LastSeq || state.Msgs == 0 {
// Purge the stream to lowest ack floor + 1
mset.store.PurgeEx(_EMPTY_, lowAckFloor+1, 0)
} else {
// Here we have a low ack floor higher then our last seq.
// So we will just do normal purge.
mset.store.Purge()
}

// Make sure to reset our local lseq.
mset.store.FastState(&state)
mset.lseq = state.LastSeq
// Also make sure we clear any pending acks.
mset.clearAllPreAcksBelowFloor(state.FirstSeq)
}

func (mset *stream) isInterestRetention() bool {
mset.mu.RLock()
defer mset.mu.RUnlock()
return mset.cfg.Retention == InterestPolicy
return mset.cfg.Retention != LimitsPolicy
}

// NumConsumers reports on number of active consumers for this stream.
Expand Down