diff --git a/server/consumer.go b/server/consumer.go index 308fa0e6bed..bc09a790360 100644 --- a/server/consumer.go +++ b/server/consumer.go @@ -2990,6 +2990,28 @@ func (o *consumer) isFiltered() bool { return false } +// Check if we would have matched and needed an ack for this store seq. +// This is called for interest based retention streams to remove messages. +func (o *consumer) matchAck(sseq uint64) bool { + o.mu.RLock() + defer o.mu.RUnlock() + + // Check if we are filtered, and if so check if this is even applicable to us. + if o.isFiltered() { + if o.mset == nil { + return false + } + var svp StoreMsg + if _, err := o.mset.store.LoadMsg(sseq, &svp); err != nil { + return false + } + if !o.isFilteredMatch(svp.subj) { + return false + } + } + return true +} + // Check if we need an ack for this store seq. // This is called for interest based retention streams to remove messages. func (o *consumer) needAck(sseq uint64, subj string) bool { @@ -5601,7 +5623,9 @@ func (o *consumer) checkStateForInterestStream() error { } for seq := ss.FirstSeq; asflr > 0 && seq <= asflr; seq++ { - mset.ackMsg(o, seq) + if o.matchAck(seq) { + mset.ackMsg(o, seq) + } } o.mu.RLock() diff --git a/server/jetstream_cluster_4_test.go b/server/jetstream_cluster_4_test.go index 6add85e48a7..c97cd64838a 100644 --- a/server/jetstream_cluster_4_test.go +++ b/server/jetstream_cluster_4_test.go @@ -2446,3 +2446,50 @@ func TestJetStreamClusterConsumerLeak(t *testing.T) { t.Fatalf("Extra memory usage too high: %v", after.HeapInuse-before.HeapInuse) } } + +func TestJetStreamClusterWQRoundRobinSubjectRetention(t *testing.T) { + c := createJetStreamClusterExplicit(t, "R3S", 3) + defer c.shutdown() + + nc, js := jsClientConnect(t, c.randomServer()) + defer nc.Close() + + _, err := js.AddStream(&nats.StreamConfig{ + Name: "wq_stream", + Subjects: []string{"something.>"}, + Storage: nats.FileStorage, + Retention: nats.WorkQueuePolicy, + Replicas: 3, + }) + require_NoError(t, err) + + for i := 0; i < 100; i++ { + n := (i % 5) + 1 + _, err := js.Publish(fmt.Sprintf("something.%d", n), nil) + require_NoError(t, err) + } + + sub, err := js.PullSubscribe( + "something.5", + "wq_consumer_5", + nats.BindStream("wq_stream"), + nats.ConsumerReplicas(3), + ) + require_NoError(t, err) + + for { + msgs, _ := sub.Fetch(5) + if len(msgs) == 0 { + break + } + for _, msg := range msgs { + require_NoError(t, msg.AckSync()) + } + } + + si, err := js.StreamInfo("wq_stream") + require_NoError(t, err) + require_Equal(t, si.State.Msgs, 80) + require_Equal(t, si.State.NumDeleted, 20) + require_Equal(t, si.State.NumSubjects, 4) +} diff --git a/server/stream.go b/server/stream.go index 545a1dac19c..40244f20d81 100644 --- a/server/stream.go +++ b/server/stream.go @@ -5542,92 +5542,9 @@ func (mset *stream) checkInterestState() { return } - var zeroAcks []*consumer - var lowAckFloor uint64 = math.MaxUint64 - for _, o := range mset.getConsumers() { o.checkStateForInterestStream() - - o.mu.Lock() - if o.isLeader() { - // We need to account for consumers with ack floor of zero. - // We will collect them and see if we need to check pending below. - if o.asflr == 0 { - zeroAcks = append(zeroAcks, o) - } else if o.asflr < lowAckFloor { - lowAckFloor = o.asflr - } - } else { - // We are a follower so only have the store state, so read that in. - state, err := o.store.State() - if err != nil { - // On error we will not have enough information to process correctly so bail. - o.mu.Unlock() - return - } - // We need to account for consumers with ack floor of zero. - if state.AckFloor.Stream == 0 { - zeroAcks = append(zeroAcks, o) - } else if state.AckFloor.Stream < lowAckFloor { - lowAckFloor = state.AckFloor.Stream - } - // We are a follower here but if we detect a drift from when we were previous leader correct here. - if o.asflr > state.AckFloor.Stream || o.sseq > state.Delivered.Stream+1 { - o.applyState(state) - } - } - o.mu.Unlock() - } - - // If nothing was set we can bail. - if lowAckFloor == math.MaxUint64 { - return - } - - // Capture our current state. - // ok to do so without lock. - var state StreamState - mset.store.FastState(&state) - - if lowAckFloor <= state.FirstSeq { - return - } - - // Do not want to hold stream lock if calculating numPending. - // Check if we had any zeroAcks, we will need to check them. - for _, o := range zeroAcks { - var np uint64 - o.mu.RLock() - if o.isLeader() { - np = uint64(o.numPending()) - } else { - np, _ = o.calculateNumPending() - } - o.mu.RUnlock() - // This means we have pending and can not remove anything at this time. - if np > 0 { - return - } } - - mset.mu.Lock() - defer mset.mu.Unlock() - - // Check which purge we need to perform. - if lowAckFloor <= state.LastSeq || state.Msgs == 0 { - // Purge the stream to lowest ack floor + 1 - mset.store.PurgeEx(_EMPTY_, lowAckFloor+1, 0) - } else { - // Here we have a low ack floor higher then our last seq. - // So we will just do normal purge. - mset.store.Purge() - } - - // Make sure to reset our local lseq. - mset.store.FastState(&state) - mset.lseq = state.LastSeq - // Also make sure we clear any pending acks. - mset.clearAllPreAcksBelowFloor(state.FirstSeq) } func (mset *stream) isInterestRetention() bool {