diff --git a/server/consumer.go b/server/consumer.go index a9ad0ebc417..1bf94e0d90c 100644 --- a/server/consumer.go +++ b/server/consumer.go @@ -4901,17 +4901,26 @@ func (o *consumer) hasNoLocalInterest() bool { // This is when the underlying stream has been purged. // sseq is the new first seq for the stream after purge. -// Lock should be held. -func (o *consumer) purge(sseq uint64, slseq uint64) { +// Lock should NOT be held. +func (o *consumer) purge(sseq uint64, slseq uint64, isWider bool) { // Do not update our state unless we know we are the leader. if !o.isLeader() { return } // Signals all have been purged for this consumer. - if sseq == 0 { + if sseq == 0 && !isWider { sseq = slseq + 1 } + var store StreamStore + if isWider { + o.mu.RLock() + if o.mset != nil { + store = o.mset.store + } + o.mu.RUnlock() + } + o.mu.Lock() // Do not go backwards if o.sseq < sseq { @@ -4920,7 +4929,6 @@ func (o *consumer) purge(sseq uint64, slseq uint64) { if o.asflr < sseq { o.asflr = sseq - 1 - // We need to remove those no longer relevant from pending. for seq, p := range o.pending { if seq <= o.asflr { @@ -4934,8 +4942,24 @@ func (o *consumer) purge(sseq uint64, slseq uint64) { delete(o.rdc, seq) // rdq handled below. } + if isWider && store != nil { + // Our filtered subject, which could be all, is wider than the underlying purge. + // We need to check if the pending items left are still valid. + var smv StoreMsg + if _, err := store.LoadMsg(seq, &smv); err == errDeletedMsg || err == ErrStoreMsgNotFound { + if p.Sequence > o.adflr { + o.adflr = p.Sequence + if o.adflr > o.dseq { + o.dseq = o.adflr + } + } + delete(o.pending, seq) + delete(o.rdc, seq) + } + } } } + // This means we can reset everything at this point. if len(o.pending) == 0 { o.pending, o.rdc = nil, nil diff --git a/server/jetstream_test.go b/server/jetstream_test.go index 98d234f7f87..8ed2b3e7331 100644 --- a/server/jetstream_test.go +++ b/server/jetstream_test.go @@ -13534,6 +13534,13 @@ func TestJetStreamPurgeAndFilteredConsumers(t *testing.T) { t.Fatalf("Expected NumPending to be 10, got %d", ci.NumPending) } + // Also check unfiltered with interleaving messages. + _, err = js.AddConsumer("S", &nats.ConsumerConfig{ + Durable: "all", + AckPolicy: nats.AckExplicitPolicy, + }) + require_NoError(t, err) + // Now purge only adam. jr, _ := json.Marshal(&JSApiStreamPurgeRequest{Subject: "FOO.adam"}) _, err = nc.Request(fmt.Sprintf(JSApiStreamPurgeT, "S"), jr, time.Second) @@ -13559,6 +13566,12 @@ func TestJetStreamPurgeAndFilteredConsumers(t *testing.T) { if ci.AckFloor.Stream != 20 { t.Fatalf("Expected AckFloor for stream to be 20, got %d", ci.AckFloor.Stream) } + + ci, err = js.ConsumerInfo("S", "all") + require_NoError(t, err) + if ci.NumPending != 10 { + t.Fatalf("Expected NumPending to be 10, got %d", ci.NumPending) + } } // Issue #2662 @@ -22325,3 +22338,67 @@ func TestJetStreamConsumerNakThenAckFloorMove(t *testing.T) { require_Equal(t, ci.AckFloor.Stream, 11) require_Equal(t, ci.NumAckPending, 0) } + +func TestJetStreamSubjectFilteredPurgeClearsPendingAcks(t *testing.T) { + s := RunBasicJetStreamServer(t) + defer s.Shutdown() + + nc, js := jsClientConnect(t, s) + defer nc.Close() + + _, err := js.AddStream(&nats.StreamConfig{ + Name: "TEST", + Subjects: []string{"foo", "bar"}, + }) + require_NoError(t, err) + + for i := 0; i < 5; i++ { + js.Publish("foo", []byte("OK")) + js.Publish("bar", []byte("OK")) + } + + // Note that there are no subject filters here, this is deliberate + // as previously the purge with filter code was checking for them. + // We want to prove that unfiltered consumers also get purged. + ci, err := js.AddConsumer("TEST", &nats.ConsumerConfig{ + Name: "my_consumer", + AckPolicy: nats.AckExplicitPolicy, + MaxAckPending: 10, + }) + require_NoError(t, err) + require_Equal(t, ci.NumPending, 10) + require_Equal(t, ci.NumAckPending, 0) + + sub, err := js.PullSubscribe(">", "", nats.Bind("TEST", "my_consumer")) + require_NoError(t, err) + + msgs, err := sub.Fetch(10) + require_NoError(t, err) + require_Len(t, len(msgs), 10) + + ci, err = js.ConsumerInfo("TEST", "my_consumer") + require_NoError(t, err) + require_Equal(t, ci.NumPending, 0) + require_Equal(t, ci.NumAckPending, 10) + + require_NoError(t, js.PurgeStream("TEST", &nats.StreamPurgeRequest{ + Subject: "foo", + })) + + ci, err = js.ConsumerInfo("TEST", "my_consumer") + require_NoError(t, err) + require_Equal(t, ci.NumPending, 0) + require_Equal(t, ci.NumAckPending, 5) + + for i := 0; i < 5; i++ { + js.Publish("foo", []byte("OK")) + } + msgs, err = sub.Fetch(5) + require_NoError(t, err) + require_Len(t, len(msgs), 5) + + ci, err = js.ConsumerInfo("TEST", "my_consumer") + require_NoError(t, err) + require_Equal(t, ci.NumPending, 0) + require_Equal(t, ci.NumAckPending, 10) +} diff --git a/server/stream.go b/server/stream.go index df27e4b1a36..5170b58fcd8 100644 --- a/server/stream.go +++ b/server/stream.go @@ -2003,12 +2003,13 @@ func (mset *stream) purge(preq *JSApiStreamPurgeRequest) (purged uint64, err err // Purge consumers. // Check for filtered purge. if preq != nil && preq.Subject != _EMPTY_ { - ss := store.FilteredState(state.FirstSeq, preq.Subject) + ss := store.FilteredState(fseq, preq.Subject) fseq = ss.First } mset.clsMu.RLock() for _, o := range mset.cList { + start := fseq o.mu.RLock() // we update consumer sequences if: // no subject was specified, we can purge all consumers sequences @@ -2018,10 +2019,15 @@ func (mset *stream) purge(preq *JSApiStreamPurgeRequest) (purged uint64, err err // or consumer filter subject is subset of purged subject, // but not the other way around. o.isEqualOrSubsetMatch(preq.Subject) + // Check if a consumer has a wider subject space then what we purged + var isWider bool + if !doPurge && preq != nil && o.isFilteredMatch(preq.Subject) { + doPurge, isWider = true, true + start = state.FirstSeq + } o.mu.RUnlock() if doPurge { - o.purge(fseq, lseq) - + o.purge(start, lseq, isWider) } } mset.clsMu.RUnlock()