diff --git a/server/consumer.go b/server/consumer.go index c22b16c3382..9826b39fe0e 100644 --- a/server/consumer.go +++ b/server/consumer.go @@ -1077,7 +1077,7 @@ func (mset *stream) addConsumerWithAssignment(config *ConsumerConfig, oname stri // If we have multiple filter subjects, create a sublist which we will use // in calling store.LoadNextMsgMulti. if len(o.cfg.FilterSubjects) > 0 { - o.filters = NewSublistWithCache() + o.filters = NewSublistNoCache() for _, filter := range o.cfg.FilterSubjects { o.filters.Insert(&subscription{subject: []byte(filter)}) } @@ -2202,7 +2202,7 @@ func (o *consumer) updateConfig(cfg *ConsumerConfig) error { if len(o.subjf) == 1 { o.filters = nil } else { - o.filters = NewSublistWithCache() + o.filters = NewSublistNoCache() for _, filter := range o.subjf { o.filters.Insert(&subscription{subject: []byte(filter.subject)}) } @@ -4061,7 +4061,7 @@ func (o *consumer) getNextMsg() (*jsPubMsg, uint64, error) { // Check if we are multi-filtered or not. if filters != nil { sm, sseq, err = store.LoadNextMsgMulti(filters, fseq, &pmsg.StoreMsg) - } else if subjf != nil { // Means single filtered subject since o.filters means > 1. + } else if len(subjf) > 0 { // Means single filtered subject since o.filters means > 1. filter, wc := subjf[0].subject, subjf[0].hasWildcard sm, sseq, err = store.LoadNextMsg(filter, wc, fseq, &pmsg.StoreMsg) } else { @@ -4730,37 +4730,15 @@ func (o *consumer) calculateNumPending() (npc, npf uint64) { } isLastPerSubject := o.cfg.DeliverPolicy == DeliverLastPerSubject + filters, subjf := o.filters, o.subjf - // Deliver Last Per Subject calculates num pending differently. - if isLastPerSubject { - // Consumer without filters. - if o.subjf == nil { - return o.mset.store.NumPending(o.sseq, _EMPTY_, isLastPerSubject) - } - // Consumer with filters. - for _, filter := range o.subjf { - lnpc, lnpf := o.mset.store.NumPending(o.sseq, filter.subject, isLastPerSubject) - npc += lnpc - if lnpf > npf { - npf = lnpf // Always last - } - } - return npc, npf - } - // Every other Delivery Policy is handled here. - // Consumer without filters. - if o.subjf == nil { - return o.mset.store.NumPending(o.sseq, _EMPTY_, false) - } - // Consumer with filters. - for _, filter := range o.subjf { - lnpc, lnpf := o.mset.store.NumPending(o.sseq, filter.subject, false) - npc += lnpc - if lnpf > npf { - npf = lnpf // Always last - } + if filters != nil { + return o.mset.store.NumPendingMulti(o.sseq, filters, isLastPerSubject) + } else if len(subjf) > 0 { + filter := subjf[0].subject + return o.mset.store.NumPending(o.sseq, filter, isLastPerSubject) } - return npc, npf + return o.mset.store.NumPending(o.sseq, _EMPTY_, isLastPerSubject) } func convertToHeadersOnly(pmsg *jsPubMsg) { diff --git a/server/filestore.go b/server/filestore.go index c23e6ce9bee..0a04a7c4913 100644 --- a/server/filestore.go +++ b/server/filestore.go @@ -2932,7 +2932,9 @@ func (fs *fileStore) NumPending(sseq uint64, filter string, lastPerSubject bool) _tsa, _fsa := [32]string{}, [32]string{} tsa, fsa := _tsa[:0], _fsa[:0] - fsa = tokenizeSubjectIntoSlice(fsa[:0], filter) + if wc { + fsa = tokenizeSubjectIntoSlice(fsa[:0], filter) + } isMatch := func(subj string) bool { if isAll { @@ -3026,7 +3028,6 @@ func (fs *fileStore) NumPending(sseq uint64, filter string, lastPerSubject bool) mb := fs.blks[i] // Hold write lock in case we need to load cache. mb.mu.Lock() - var t uint64 if isAll && sseq <= atomic.LoadUint64(&mb.first.seq) { total += mb.msgs mb.mu.Unlock() @@ -3041,6 +3042,7 @@ func (fs *fileStore) NumPending(sseq uint64, filter string, lastPerSubject bool) // Mark fss activity. mb.lsts = time.Now().UnixNano() + var t uint64 var havePartial bool mb.fss.Match(stringToBytes(filter), func(bsubj []byte, ss *SimpleState) { if havePartial { @@ -3068,8 +3070,12 @@ func (fs *fileStore) NumPending(sseq uint64, filter string, lastPerSubject bool) } // Clear on partial. t = 0 + start := sseq + if fseq := atomic.LoadUint64(&mb.first.seq); fseq > start { + start = fseq + } var smv StoreMsg - for seq, lseq := sseq, atomic.LoadUint64(&mb.last.seq); seq <= lseq; seq++ { + for seq, lseq := start, atomic.LoadUint64(&mb.last.seq); seq <= lseq; seq++ { if sm, _ := mb.cacheLookup(seq, &smv); sm != nil && isMatch(sm.subj) { t++ } @@ -3174,6 +3180,300 @@ func (fs *fileStore) NumPending(sseq uint64, filter string, lastPerSubject bool) return total, validThrough } +// NumPending will return the number of pending messages matching any subject in the sublist starting at sequence. +// Optimized for stream num pending calculations for consumers with lots of filtered subjects. +// Subjects should not overlap, this property is held when doing multi-filtered consumers. +func (fs *fileStore) NumPendingMulti(sseq uint64, sl *Sublist, lastPerSubject bool) (total, validThrough uint64) { + fs.mu.RLock() + defer fs.mu.RUnlock() + + // This can always be last for these purposes. + validThrough = fs.state.LastSeq + + if fs.state.Msgs == 0 || sseq > fs.state.LastSeq { + return 0, validThrough + } + + // If sseq is less then our first set to first. + if sseq < fs.state.FirstSeq { + sseq = fs.state.FirstSeq + } + // Track starting for both block for the sseq and staring block that matches any subject. + var seqStart int + // See if we need to figure out starting block per sseq. + if sseq > fs.state.FirstSeq { + // This should not, but can return -1, so make sure we check to avoid panic below. + if seqStart, _ = fs.selectMsgBlockWithIndex(sseq); seqStart < 0 { + seqStart = 0 + } + } + + isAll := sl == nil + + // See if filter was provided but its the only subject. + if !isAll && fs.psim.Size() == 1 { + fs.psim.Iter(func(subject []byte, _ *psi) bool { + isAll = sl.HasInterest(bytesToString(subject)) + return true + }) + } + // If we are isAll and have no deleted we can do a simpler calculation. + if !lastPerSubject && isAll && (fs.state.LastSeq-fs.state.FirstSeq+1) == fs.state.Msgs { + if sseq == 0 { + return fs.state.Msgs, validThrough + } + return fs.state.LastSeq - sseq + 1, validThrough + } + // Setup the isMatch function. + isMatch := func(subj string) bool { + if isAll { + return true + } + return sl.HasInterest(subj) + } + + // Handle last by subject a bit differently. + // We will scan PSIM since we accurately track the last block we have seen the subject in. This + // allows us to only need to load at most one block now. + // For the last block, we need to track the subjects that we know are in that block, and track seen + // while in the block itself, but complexity there worth it. + if lastPerSubject { + // If we want all and our start sequence is equal or less than first return number of subjects. + if isAll && sseq <= fs.state.FirstSeq { + return uint64(fs.psim.Size()), validThrough + } + // If we are here we need to scan. We are going to scan the PSIM looking for lblks that are >= seqStart. + // This will build up a list of all subjects from the selected block onward. + lbm := make(map[string]bool) + mb := fs.blks[seqStart] + bi := mb.index + + subs := make([]*subscription, 0, sl.Count()) + sl.All(&subs) + for _, sub := range subs { + fs.psim.Match(sub.subject, func(subj []byte, psi *psi) { + // If the select blk start is greater than entry's last blk skip. + if bi > psi.lblk { + return + } + total++ + // We will track the subjects that are an exact match to the last block. + // This is needed for last block processing. + if psi.lblk == bi { + lbm[string(subj)] = true + } + }) + } + + // Now check if we need to inspect the seqStart block. + // Grab write lock in case we need to load in msgs. + mb.mu.Lock() + var shouldExpire bool + // We need to walk this block to correct accounting from above. + if sseq > mb.first.seq { + // Track the ones we add back in case more than one. + seen := make(map[string]bool) + // We need to discount the total by subjects seen before sseq, but also add them right back in if they are >= sseq for this blk. + // This only should be subjects we know have the last blk in this block. + if mb.cacheNotLoaded() { + mb.loadMsgsWithLock() + shouldExpire = true + } + var smv StoreMsg + for seq, lseq := atomic.LoadUint64(&mb.first.seq), atomic.LoadUint64(&mb.last.seq); seq <= lseq; seq++ { + sm, _ := mb.cacheLookup(seq, &smv) + if sm == nil || sm.subj == _EMPTY_ || !lbm[sm.subj] { + continue + } + if isMatch(sm.subj) { + // If less than sseq adjust off of total as long as this subject matched the last block. + if seq < sseq { + if !seen[sm.subj] { + total-- + seen[sm.subj] = true + } + } else if seen[sm.subj] { + // This is equal or more than sseq, so add back in. + total++ + // Make sure to not process anymore. + delete(seen, sm.subj) + } + } + } + } + // If we loaded the block try to force expire. + if shouldExpire { + mb.tryForceExpireCacheLocked() + } + mb.mu.Unlock() + return total, validThrough + } + + // If we would need to scan more from the beginning, revert back to calculating directly here. + if seqStart >= (len(fs.blks) / 2) { + for i := seqStart; i < len(fs.blks); i++ { + var shouldExpire bool + mb := fs.blks[i] + // Hold write lock in case we need to load cache. + mb.mu.Lock() + if isAll && sseq <= atomic.LoadUint64(&mb.first.seq) { + total += mb.msgs + mb.mu.Unlock() + continue + } + // If we are here we need to at least scan the subject fss. + // Make sure we have fss loaded. + if mb.fssNotLoaded() { + mb.loadMsgsWithLock() + shouldExpire = true + } + // Mark fss activity. + mb.lsts = time.Now().UnixNano() + + var t uint64 + var havePartial bool + mb.fss.Iter(func(bsubj []byte, ss *SimpleState) bool { + subj := bytesToString(bsubj) + if havePartial || !sl.HasInterest(subj) { + // If we already found a partial then don't do anything else. + return !havePartial + } + if ss.firstNeedsUpdate { + mb.recalculateFirstForSubj(subj, ss.First, ss) + } + if sseq <= ss.First { + t += ss.Msgs + } else if sseq <= ss.Last { + // We matched but its a partial. + havePartial = true + } + return !havePartial + }) + + // See if we need to scan msgs here. + if havePartial { + // Make sure we have the cache loaded. + if mb.cacheNotLoaded() { + mb.loadMsgsWithLock() + shouldExpire = true + } + // Clear on partial. + t = 0 + start := sseq + if fseq := atomic.LoadUint64(&mb.first.seq); fseq > start { + start = fseq + } + var smv StoreMsg + for seq, lseq := start, atomic.LoadUint64(&mb.last.seq); seq <= lseq; seq++ { + if sm, _ := mb.cacheLookup(seq, &smv); sm != nil && isMatch(sm.subj) { + t++ + } + } + } + // If we loaded this block for this operation go ahead and expire it here. + if shouldExpire { + mb.tryForceExpireCacheLocked() + } + mb.mu.Unlock() + total += t + } + return total, validThrough + } + + // If we are here it's better to calculate totals from psim and adjust downward by scanning less blocks. + start := uint32(math.MaxUint32) + subs := make([]*subscription, 0, sl.Count()) + sl.All(&subs) + for _, sub := range subs { + fs.psim.Match(sub.subject, func(_ []byte, psi *psi) { + total += psi.total + // Keep track of start index for this subject. + if psi.fblk < start { + start = psi.fblk + } + }) + } + // See if we were asked for all, if so we are done. + if sseq <= fs.state.FirstSeq { + return total, validThrough + } + + // If we are here we need to calculate partials for the first blocks. + firstSubjBlk := fs.bim[start] + var firstSubjBlkFound bool + // Adjust in case not found. + if firstSubjBlk == nil { + firstSubjBlkFound = true + } + + // Track how many we need to adjust against the total. + var adjust uint64 + for i := 0; i <= seqStart; i++ { + mb := fs.blks[i] + // We can skip blks if we know they are below the first one that has any subject matches. + if !firstSubjBlkFound { + if firstSubjBlkFound = (mb == firstSubjBlk); !firstSubjBlkFound { + continue + } + } + // We need to scan this block. + var shouldExpire bool + mb.mu.Lock() + // Check if we should include all of this block in adjusting. If so work with metadata. + if sseq > atomic.LoadUint64(&mb.last.seq) { + if isAll { + adjust += mb.msgs + } else { + // We need to adjust for all matches in this block. + // Make sure we have fss loaded. This loads whole block now. + if mb.fssNotLoaded() { + mb.loadMsgsWithLock() + shouldExpire = true + } + // Mark fss activity. + mb.lsts = time.Now().UnixNano() + mb.fss.Iter(func(bsubj []byte, ss *SimpleState) bool { + if sl.HasInterest(bytesToString(bsubj)) { + adjust += ss.Msgs + } + return true + }) + } + } else { + // This is the last block. We need to scan per message here. + if mb.cacheNotLoaded() { + mb.loadMsgsWithLock() + shouldExpire = true + } + var last = atomic.LoadUint64(&mb.last.seq) + if sseq < last { + last = sseq + } + // We need to walk all messages in this block + var smv StoreMsg + for seq := atomic.LoadUint64(&mb.first.seq); seq < last; seq++ { + sm, _ := mb.cacheLookup(seq, &smv) + if sm == nil || sm.subj == _EMPTY_ { + continue + } + // Check if it matches our filter. + if sm.seq < sseq && isMatch(sm.subj) { + adjust++ + } + } + } + // If we loaded the block try to force expire. + if shouldExpire { + mb.tryForceExpireCacheLocked() + } + mb.mu.Unlock() + } + // Make final adjustment. + total -= adjust + + return total, validThrough +} + // SubjectsTotal return message totals per subject. func (fs *fileStore) SubjectsTotals(filter string) map[string]uint64 { fs.mu.RLock() diff --git a/server/filestore_test.go b/server/filestore_test.go index cda2b72ea95..fbd6a9ddfe8 100644 --- a/server/filestore_test.go +++ b/server/filestore_test.go @@ -40,6 +40,7 @@ import ( "time" "github.com/klauspost/compress/s2" + "github.com/nats-io/nuid" ) func testFileStoreAllPermutations(t *testing.T, fn func(t *testing.T, fcfg FileStoreConfig)) { @@ -8245,3 +8246,52 @@ func TestFileStoreRecoverFullStateDetectCorruptState(t *testing.T) { err = fs.recoverFullState() require_Error(t, err, errCorruptState) } + +func TestFileStoreNumPendingMulti(t *testing.T) { + fs, err := newFileStore( + FileStoreConfig{StoreDir: t.TempDir()}, + StreamConfig{Name: "zzz", Subjects: []string{"ev.*"}, Storage: FileStorage}) + require_NoError(t, err) + defer fs.Stop() + + totalMsgs := 100_000 + totalSubjects := 10_000 + numFiltered := 5000 + startSeq := uint64(5_000 + rand.Intn(90_000)) + + subjects := make([]string, 0, totalSubjects) + for i := 0; i < totalSubjects; i++ { + subjects = append(subjects, fmt.Sprintf("ev.%s", nuid.Next())) + } + + // Put in 100k msgs with random subjects. + msg := bytes.Repeat([]byte("ZZZ"), 333) + for i := 0; i < totalMsgs; i++ { + _, _, err = fs.StoreMsg(subjects[rand.Intn(totalSubjects)], nil, msg) + require_NoError(t, err) + } + + // Now we want to do a calculate NumPendingMulti. + filters := NewSublistNoCache() + for filters.Count() < uint32(numFiltered) { + filter := subjects[rand.Intn(totalSubjects)] + if !filters.HasInterest(filter) { + filters.Insert(&subscription{subject: []byte(filter)}) + } + } + + // Use new function. + total, _ := fs.NumPendingMulti(startSeq, filters, false) + + // Check our results. + var checkTotal uint64 + var smv StoreMsg + for seq := startSeq; seq <= uint64(totalMsgs); seq++ { + sm, err := fs.LoadMsg(seq, &smv) + require_NoError(t, err) + if filters.HasInterest(sm.subj) { + checkTotal++ + } + } + require_Equal(t, total, checkTotal) +} diff --git a/server/memstore.go b/server/memstore.go index 39008f10fcb..55e98882195 100644 --- a/server/memstore.go +++ b/server/memstore.go @@ -360,15 +360,13 @@ func (ms *memStore) FilteredState(sseq uint64, subj string) SimpleState { } func (ms *memStore) filteredStateLocked(sseq uint64, filter string, lastPerSubject bool) SimpleState { - var ss SimpleState - if sseq < ms.state.FirstSeq { sseq = ms.state.FirstSeq } // If past the end no results. if sseq > ms.state.LastSeq { - return ss + return SimpleState{} } if filter == _EMPTY_ { @@ -392,9 +390,10 @@ func (ms *memStore) filteredStateLocked(sseq uint64, filter string, lastPerSubje _tsa, _fsa := [32]string{}, [32]string{} tsa, fsa := _tsa[:0], _fsa[:0] - fsa = tokenizeSubjectIntoSlice(fsa[:0], filter) wc := subjectHasWildcard(filter) - + if wc { + fsa = tokenizeSubjectIntoSlice(fsa[:0], filter) + } // 1. See if we match any subs from fss. // 2. If we match and the sseq is past ss.Last then we can use meta only. // 3. If we match we need to do a partial, break and clear any totals and do a full scan like num pending. @@ -410,6 +409,7 @@ func (ms *memStore) filteredStateLocked(sseq uint64, filter string, lastPerSubje return isSubsetMatchTokenized(tsa, fsa) } + var ss SimpleState update := func(fss *SimpleState) { msgs, first, last := fss.Msgs, fss.First, fss.Last if lastPerSubject { @@ -425,6 +425,7 @@ func (ms *memStore) filteredStateLocked(sseq uint64, filter string, lastPerSubje } var havePartial bool + var totalSkipped uint64 // We will track start and end sequences as we go. ms.fss.Match(stringToBytes(filter), func(subj []byte, fss *SimpleState) { if fss.firstNeedsUpdate { @@ -437,6 +438,8 @@ func (ms *memStore) filteredStateLocked(sseq uint64, filter string, lastPerSubje havePartial = true // Don't break here, we will update to keep tracking last. update(fss) + } else { + totalSkipped += fss.Msgs } }) @@ -493,6 +496,7 @@ func (ms *memStore) filteredStateLocked(sseq uint64, filter string, lastPerSubje } else { // We will adjust from the totals above by scanning what we need to exclude. ss.First = first + ss.Msgs += totalSkipped var adjust uint64 var tss *SimpleState @@ -679,6 +683,158 @@ func (ms *memStore) NumPending(sseq uint64, filter string, lastPerSubject bool) return ss.Msgs, ms.state.LastSeq } +// NumPending will return the number of pending messages matching any subject in the sublist starting at sequence. +func (ms *memStore) NumPendingMulti(sseq uint64, sl *Sublist, lastPerSubject bool) (total, validThrough uint64) { + if sl == nil { + return ms.NumPending(sseq, fwcs, lastPerSubject) + } + + // This needs to be a write lock, as we can mutate the per-subject state. + ms.mu.Lock() + defer ms.mu.Unlock() + + var ss SimpleState + if sseq < ms.state.FirstSeq { + sseq = ms.state.FirstSeq + } + // If past the end no results. + if sseq > ms.state.LastSeq { + return 0, ms.state.LastSeq + } + + update := func(fss *SimpleState) { + msgs, first, last := fss.Msgs, fss.First, fss.Last + if lastPerSubject { + msgs, first = 1, last + } + ss.Msgs += msgs + if ss.First == 0 || first < ss.First { + ss.First = first + } + if last > ss.Last { + ss.Last = last + } + } + + var havePartial bool + var totalSkipped uint64 + // We will track start and end sequences as we go. + ms.fss.Iter(func(subj []byte, fss *SimpleState) bool { + if !sl.HasInterest(bytesToString(subj)) { + return true + } + if fss.firstNeedsUpdate { + ms.recalculateFirstForSubj(bytesToString(subj), fss.First, fss) + } + if sseq <= fss.First { + update(fss) + } else if sseq <= fss.Last { + // We matched but it is a partial. + havePartial = true + // Don't break here, we will update to keep tracking last. + update(fss) + } else { + totalSkipped += fss.Msgs + } + return true + }) + + // If we did not encounter any partials we can return here. + if !havePartial { + return ss.Msgs, ms.state.LastSeq + } + + // If we are here we need to scan the msgs. + // Capture first and last sequences for scan and then clear what we had. + first, last := ss.First, ss.Last + // To track if we decide to exclude we need to calculate first. + if first < sseq { + first = sseq + } + + // Now we want to check if it is better to scan inclusive and recalculate that way + // or leave and scan exclusive and adjust our totals. + // ss.Last is always correct here. + toScan, toExclude := last-first, first-ms.state.FirstSeq+ms.state.LastSeq-ss.Last + var seen map[string]bool + if lastPerSubject { + seen = make(map[string]bool) + } + if toScan < toExclude { + ss.Msgs, ss.First = 0, 0 + + update := func(sm *StoreMsg) { + ss.Msgs++ + if ss.First == 0 { + ss.First = sm.seq + } + if seen != nil { + seen[sm.subj] = true + } + } + // Check if easier to just scan msgs vs the sequence range. + // This can happen with lots of interior deletes. + if last-first > uint64(len(ms.msgs)) { + for _, sm := range ms.msgs { + if sm.seq >= first && sm.seq <= last && !seen[sm.subj] && sl.HasInterest(sm.subj) { + update(sm) + } + } + } else { + for seq := first; seq <= last; seq++ { + if sm, ok := ms.msgs[seq]; ok && !seen[sm.subj] && sl.HasInterest(sm.subj) { + update(sm) + } + } + } + } else { + // We will adjust from the totals above by scanning what we need to exclude. + ss.First = first + ss.Msgs += totalSkipped + var adjust uint64 + var tss *SimpleState + + update := func(sm *StoreMsg) { + if lastPerSubject { + tss, _ = ms.fss.Find(stringToBytes(sm.subj)) + } + // If we are last per subject, make sure to only adjust if all messages are before our first. + if tss == nil || tss.Last < first { + adjust++ + } + if seen != nil { + seen[sm.subj] = true + } + } + // Check if easier to just scan msgs vs the sequence range. + if first-ms.state.FirstSeq > uint64(len(ms.msgs)) { + for _, sm := range ms.msgs { + if sm.seq < first && !seen[sm.subj] && sl.HasInterest(sm.subj) { + update(sm) + } + } + } else { + for seq := ms.state.FirstSeq; seq < first; seq++ { + if sm, ok := ms.msgs[seq]; ok && !seen[sm.subj] && sl.HasInterest(sm.subj) { + update(sm) + } + } + } + // Now do range at end. + for seq := last + 1; seq < ms.state.LastSeq; seq++ { + if sm, ok := ms.msgs[seq]; ok && !seen[sm.subj] && sl.HasInterest(sm.subj) { + adjust++ + if seen != nil { + seen[sm.subj] = true + } + } + } + ss.Msgs -= adjust + } + + return ss.Msgs, ms.state.LastSeq +} + // Will check the msg limit for this tracked subject. // Lock should be held. func (ms *memStore) enforcePerSubjectLimit(subj string, ss *SimpleState) { diff --git a/server/memstore_test.go b/server/memstore_test.go index 867249a5549..cfc2aa95bd5 100644 --- a/server/memstore_test.go +++ b/server/memstore_test.go @@ -24,6 +24,8 @@ import ( "reflect" "testing" "time" + + "github.com/nats-io/nuid" ) func TestMemStoreBasics(t *testing.T) { @@ -1077,6 +1079,88 @@ func TestMemStoreDeleteAllFirstSequenceCheck(t *testing.T) { require_Equal(t, state.Msgs, 0) } +func TestMemStoreNumPendingMulti(t *testing.T) { + cfg := &StreamConfig{ + Name: "zzz", + Subjects: []string{"ev.*"}, + Storage: MemoryStorage, + } + ms, err := newMemStore(cfg) + require_NoError(t, err) + defer ms.Stop() + + totalMsgs := 100_000 + totalSubjects := 10_000 + numFiltered := 5000 + startSeq := uint64(5_000 + rand.Intn(90_000)) + + subjects := make([]string, 0, totalSubjects) + for i := 0; i < totalSubjects; i++ { + subjects = append(subjects, fmt.Sprintf("ev.%s", nuid.Next())) + } + + // Put in 100k msgs with random subjects. + msg := bytes.Repeat([]byte("ZZZ"), 333) + for i := 0; i < totalMsgs; i++ { + _, _, err = ms.StoreMsg(subjects[rand.Intn(totalSubjects)], nil, msg) + require_NoError(t, err) + } + + // Now we want to do a calculate NumPendingMulti. + filters := NewSublistNoCache() + for filters.Count() < uint32(numFiltered) { + filter := subjects[rand.Intn(totalSubjects)] + if !filters.HasInterest(filter) { + filters.Insert(&subscription{subject: []byte(filter)}) + } + } + + // Use new function. + total, _ := ms.NumPendingMulti(startSeq, filters, false) + + // Check our results. + var checkTotal uint64 + var smv StoreMsg + for seq := startSeq; seq <= uint64(totalMsgs); seq++ { + sm, err := ms.LoadMsg(seq, &smv) + require_NoError(t, err) + if filters.HasInterest(sm.subj) { + checkTotal++ + } + } + require_Equal(t, total, checkTotal) +} + +func TestMemStoreNumPendingBug(t *testing.T) { + cfg := &StreamConfig{ + Name: "zzz", + Subjects: []string{"foo.*"}, + Storage: MemoryStorage, + } + ms, err := newMemStore(cfg) + require_NoError(t, err) + defer ms.Stop() + + // 12 msgs total + for _, subj := range []string{"foo.foo", "foo.bar", "foo.baz", "foo.zzz"} { + ms.StoreMsg("foo.aaa", nil, nil) + ms.StoreMsg(subj, nil, nil) + ms.StoreMsg(subj, nil, nil) + } + total, _ := ms.NumPending(4, "foo.*", false) + + var checkTotal uint64 + var smv StoreMsg + for seq := 4; seq <= 12; seq++ { + sm, err := ms.LoadMsg(uint64(seq), &smv) + require_NoError(t, err) + if subjectIsSubsetMatch(sm.subj, "foo.*") { + checkTotal++ + } + } + require_Equal(t, total, checkTotal) +} + /////////////////////////////////////////////////////////////////////////// // Benchmarks /////////////////////////////////////////////////////////////////////////// diff --git a/server/store.go b/server/store.go index f31cf220833..6d894b28fb3 100644 --- a/server/store.go +++ b/server/store.go @@ -104,6 +104,7 @@ type StreamStore interface { SubjectsTotals(filterSubject string) map[string]uint64 MultiLastSeqs(filters []string, maxSeq uint64, maxAllowed int) ([]uint64, error) NumPending(sseq uint64, filter string, lastPerSubject bool) (total, validThrough uint64) + NumPendingMulti(sseq uint64, sl *Sublist, lastPerSubject bool) (total, validThrough uint64) State() StreamState FastState(*StreamState) EncodedStreamState(failed uint64) (enc []byte, err error)