From 607849b7f82a9a6201508b00e976f6ec0d76c5e6 Mon Sep 17 00:00:00 2001 From: Derek Collison Date: Wed, 16 Apr 2025 18:13:41 -0400 Subject: [PATCH 1/6] Spelling fixes Signed-off-by: Derek Collison --- server/pse/pse_freebsd_sysctl.go | 4 ++-- server/stream.go | 2 +- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/server/pse/pse_freebsd_sysctl.go b/server/pse/pse_freebsd_sysctl.go index 09010417ee0..f4ea1993409 100644 --- a/server/pse/pse_freebsd_sysctl.go +++ b/server/pse/pse_freebsd_sysctl.go @@ -1,4 +1,4 @@ -// Copyright 2015-2020 The NATS Authors +// Copyright 2015-2025 The NATS Authors // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. // You may obtain a copy of the License at @@ -22,7 +22,7 @@ // We've switched the other implementation to include '_cgo' in the filename, // to show that it's not the default. This isn't an os or arch build tag, // so we have to use explicit build-tags within. -// If lacking CGO support and targetting an unsupported arch, then before the +// If lacking CGO support and targeting an unsupported arch, then before the // change you would have a compile failure for not being able to cross-compile. // After the change, you have a compile failure for not having the symbols // because no source file satisfies them. diff --git a/server/stream.go b/server/stream.go index d46535037e1..a8f9d26fbf3 100644 --- a/server/stream.go +++ b/server/stream.go @@ -2279,7 +2279,7 @@ func (mset *stream) purge(preq *JSApiStreamPurgeRequest) (purged uint64, err err // or consumer filter subject is subset of purged subject, // but not the other way around. 
o.isEqualOrSubsetMatch(preq.Subject) - // Check if a consumer has a wider subject space then what we purged + // Check if a consumer has a wider subject space than what we purged var isWider bool if !doPurge && preq != nil && o.isFilteredMatch(preq.Subject) { doPurge, isWider = true, true From 235b666e50c687fb9b31be39a82fcd1a46649a54 Mon Sep 17 00:00:00 2001 From: Derek Collison Date: Wed, 16 Apr 2025 18:23:03 -0400 Subject: [PATCH 2/6] Optimization to not allocate []byte Signed-off-by: Derek Collison --- server/client.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/server/client.go b/server/client.go index 64c0c3b9429..c6e76a954e2 100644 --- a/server/client.go +++ b/server/client.go @@ -4329,7 +4329,7 @@ func sliceHeader(key string, hdr []byte) []byte { if len(hdr) == 0 { return nil } - index := bytes.Index(hdr, []byte(key)) + index := bytes.Index(hdr, stringToBytes(key)) hdrLen := len(hdr) // Check that we have enough characters, this will handle the -1 case of the key not // being found and will also handle not having enough characters for trailing CRLF. From 6bad97cffd2a786416404ca1b38b9cc36bbe629d Mon Sep 17 00:00:00 2001 From: Derek Collison Date: Wed, 16 Apr 2025 18:30:08 -0400 Subject: [PATCH 3/6] Optimized gathering skip list for last by subject consumers Signed-off-by: Derek Collison --- server/consumer.go | 30 ++++++++---------------------- 1 file changed, 8 insertions(+), 22 deletions(-) diff --git a/server/consumer.go b/server/consumer.go index a441663bf8e..4852c9b59fa 100644 --- a/server/consumer.go +++ b/server/consumer.go @@ -5322,9 +5322,6 @@ func (o *consumer) selectStartingSeqNo() { if mmp == 1 { o.sseq = state.FirstSeq } else { - // A threshold for when we switch from get last msg to subjects state. 
- const numSubjectsThresh = 256 - lss := &lastSeqSkipList{resume: state.LastSeq} var filters []string if o.subjf == nil { filters = append(filters, o.cfg.FilterSubject) @@ -5333,24 +5330,10 @@ func (o *consumer) selectStartingSeqNo() { filters = append(filters, filter.subject) } } - for _, filter := range filters { - if st := o.mset.store.SubjectsTotals(filter); len(st) < numSubjectsThresh { - var smv StoreMsg - for subj := range st { - if sm, err := o.mset.store.LoadLastMsg(subj, &smv); err == nil { - lss.seqs = append(lss.seqs, sm.seq) - } - } - } else if mss := o.mset.store.SubjectsState(filter); len(mss) > 0 { - for _, ss := range mss { - lss.seqs = append(lss.seqs, ss.Last) - } - } - } - // Sort the skip list if needed. - if len(lss.seqs) > 1 { - slices.Sort(lss.seqs) - } + + lss := &lastSeqSkipList{resume: state.LastSeq} + lss.seqs, _ = o.mset.store.MultiLastSeqs(filters, 0, 0) + if len(lss.seqs) == 0 { o.sseq = state.LastSeq } else { @@ -5874,7 +5857,10 @@ func (o *consumer) decStreamPending(sseq uint64, subj string) { // Check if this message was pending. p, wasPending := o.pending[sseq] - rdc := o.deliveryCount(sseq) + var rdc uint64 + if wasPending { + rdc = o.deliveryCount(sseq) + } o.mu.Unlock() From 05d362fff8cff2801eff1bd128e28616045a7fdc Mon Sep 17 00:00:00 2001 From: Derek Collison Date: Wed, 16 Apr 2025 18:32:21 -0400 Subject: [PATCH 4/6] Various optimizations and introduction of AllLastSeqs(). Filestore specific items. 1. Internal calls to fetch a msg now use a NoCopy version to avoid unnecessary allocations. 2. We introduced a last purge time to detect repeated calls to Purge from a client KV PurgeDeletes operation. 3. Access times for cache lifetime management are now from a single ticker updating an atomic every 100ms. 4. filteredPendingLocked now detects non-wildcard filters. 5. AllLastSeqs() is an optimized version to return all last sequences for all subjects. Helpful for KV watchers. 6. 
Optimized MultiLastSeqs() and allow it to call into AllLastSeqs() when appropriate. 7. Allow multiple tombstones to be written async and flushed all at once. Signed-off-by: Derek Collison --- server/filestore.go | 593 +++++++++++++++++++++++++++------------ server/filestore_test.go | 120 +++++++- server/memstore.go | 22 +- server/memstore_test.go | 190 +++++++++++++ server/store.go | 1 + 5 files changed, 738 insertions(+), 188 deletions(-) diff --git a/server/filestore.go b/server/filestore.go index cd62de47815..89dc68614b3 100644 --- a/server/filestore.go +++ b/server/filestore.go @@ -203,6 +203,7 @@ type fileStore struct { firstMoved bool ttls *thw.HashWheel sdm *SDMMeta + lpex time.Time // Last PurgeEx call. } // Represents a message store block and its data. @@ -1909,7 +1910,7 @@ func (fs *fileStore) recoverTTLState() error { // beginning and see if we need to skip this one too. goto retry } - msg, _, err := mb.fetchMsg(seq, &sm) + msg, _, err := mb.fetchMsgNoCopy(seq, &sm) if err != nil { fs.warn("Error loading msg seq %d for recovering TTL: %s", seq, err) continue @@ -2192,7 +2193,7 @@ func (fs *fileStore) expireMsgsOnRecover() error { // Walk messages and remove if expired. fseq, lseq := atomic.LoadUint64(&mb.first.seq), atomic.LoadUint64(&mb.last.seq) for seq := fseq; seq <= lseq; seq++ { - sm, err := mb.cacheLookup(seq, &smv) + sm, err := mb.cacheLookupNoCopy(seq, &smv) // Process interior deleted msgs. if err == errDeletedMsg { // Update dmap. @@ -2337,7 +2338,7 @@ func (fs *fileStore) GetSeqFromTime(t time.Time) uint64 { // Linear search, hence the dumb part.. 
ts := t.UnixNano() for seq := fseq; seq <= lseq; seq++ { - sm, _, _ := mb.fetchMsg(seq, &smv) + sm, _, _ := mb.fetchMsgNoCopy(seq, &smv) if sm != nil && sm.ts >= ts { return sm.seq } @@ -2352,7 +2353,7 @@ func (mb *msgBlock) firstMatchingMulti(sl *Sublist, start uint64, sm *StoreMsg) var updateLLTS bool defer func() { if updateLLTS { - mb.llts = time.Now().UnixNano() + mb.llts = getAccessTime() } mb.mu.Unlock() }() @@ -2467,7 +2468,7 @@ func (mb *msgBlock) firstMatching(filter string, wc bool, start uint64, sm *Stor var updateLLTS bool defer func() { if updateLLTS { - mb.llts = time.Now().UnixNano() + mb.llts = getAccessTime() } mb.mu.Unlock() }() @@ -2481,7 +2482,7 @@ func (mb *msgBlock) firstMatching(filter string, wc bool, start uint64, sm *Stor didLoad = true } // Mark fss activity. - mb.lsts = time.Now().UnixNano() + mb.lsts = getAccessTime() if filter == _EMPTY_ { filter = fwcs @@ -2642,8 +2643,7 @@ func (mb *msgBlock) filteredPendingLocked(filter string, wc bool, sseq uint64) ( } if filter == _EMPTY_ { - filter = fwcs - wc = true + filter, wc = fwcs, true } update := func(ss *SimpleState) { @@ -2659,39 +2659,39 @@ func (mb *msgBlock) filteredPendingLocked(filter string, wc bool, sseq uint64) ( // Make sure we have fss loaded. mb.ensurePerSubjectInfoLoaded() - _tsa, _fsa := [32]string{}, [32]string{} - tsa, fsa := _tsa[:0], _fsa[:0] - fsa = tokenizeSubjectIntoSlice(fsa[:0], filter) - - // 1. See if we match any subs from fss. - // 2. If we match and the sseq is past ss.Last then we can use meta only. - // 3. If we match and we need to do a partial, break and clear any totals and do a full scan like num pending. + var havePartial bool - isMatch := func(subj string) bool { - if !wc { - return subj == filter + // If we are not a wildcard just use Find() here. Avoids allocations. 
+ if !wc { + if ss, ok := mb.fss.Find(stringToBytes(filter)); ok && ss != nil { + if ss.firstNeedsUpdate || ss.lastNeedsUpdate { + mb.recalculateForSubj(filter, ss) + } + if sseq <= ss.First { + update(ss) + } else if sseq <= ss.Last { + // We matched but its a partial. + havePartial = true + } } - tsa = tokenizeSubjectIntoSlice(tsa[:0], subj) - return isSubsetMatchTokenized(tsa, fsa) + } else { + mb.fss.Match(stringToBytes(filter), func(bsubj []byte, ss *SimpleState) { + if havePartial { + // If we already found a partial then don't do anything else. + return + } + if ss.firstNeedsUpdate || ss.lastNeedsUpdate { + mb.recalculateForSubj(bytesToString(bsubj), ss) + } + if sseq <= ss.First { + update(ss) + } else if sseq <= ss.Last { + // We matched but its a partial. + havePartial = true + } + }) } - var havePartial bool - mb.fss.Match(stringToBytes(filter), func(bsubj []byte, ss *SimpleState) { - if havePartial { - // If we already found a partial then don't do anything else. - return - } - if ss.firstNeedsUpdate || ss.lastNeedsUpdate { - mb.recalculateForSubj(bytesToString(bsubj), ss) - } - if sseq <= ss.First { - update(ss) - } else if sseq <= ss.Last { - // We matched but its a partial. - havePartial = true - } - }) - // If we did not encounter any partials we can return here. if !havePartial { return total, first, last @@ -2708,9 +2708,27 @@ func (mb *msgBlock) filteredPendingLocked(filter string, wc bool, sseq uint64) ( shouldExpire = true } + _tsa, _fsa := [32]string{}, [32]string{} + tsa, fsa := _tsa[:0], _fsa[:0] + var isMatch func(subj string) bool + + if !wc { + isMatch = func(subj string) bool { return subj == filter } + } else { + fsa = tokenizeSubjectIntoSlice(fsa[:0], filter) + isMatch = func(subj string) bool { + tsa = tokenizeSubjectIntoSlice(tsa[:0], subj) + return isSubsetMatchTokenized(tsa, fsa) + } + } + + // 1. See if we match any subs from fss. + // 2. If we match and the sseq is past ss.Last then we can use meta only. + // 3. 
If we match and we need to do a partial, break and clear any totals and do a full scan like num pending. + var smv StoreMsg for seq, lseq := sseq, atomic.LoadUint64(&mb.last.seq); seq <= lseq; seq++ { - sm, _ := mb.cacheLookup(seq, &smv) + sm, _ := mb.cacheLookupNoCopy(seq, &smv) if sm == nil { continue } @@ -2978,7 +2996,7 @@ func (fs *fileStore) SubjectsState(subject string) map[string]SimpleState { shouldExpire = true } // Mark fss activity. - mb.lsts = time.Now().UnixNano() + mb.lsts = getAccessTime() mb.fss.Match(stringToBytes(subject), func(bsubj []byte, ss *SimpleState) { subj := string(bsubj) if ss.firstNeedsUpdate || ss.lastNeedsUpdate { @@ -3007,6 +3025,69 @@ func (fs *fileStore) SubjectsState(subject string) map[string]SimpleState { return fss } +// AllLastSeqs will return a sorted list of last sequences for all subjects. +func (fs *fileStore) AllLastSeqs() ([]uint64, error) { + fs.mu.RLock() + defer fs.mu.RUnlock() + + if fs.state.Msgs == 0 || fs.noTrackSubjects() { + return nil, nil + } + + numSubjects := fs.psim.Size() + seqs := make([]uint64, 0, numSubjects) + subs := make(map[string]struct{}, numSubjects) + + for i := len(fs.blks) - 1; i >= 0; i-- { + if len(subs) == numSubjects { + break + } + mb := fs.blks[i] + mb.mu.Lock() + + var shouldExpire bool + if mb.fssNotLoaded() { + // Make sure we have fss loaded. + mb.loadMsgsWithLock() + shouldExpire = true + } + + mb.fss.IterFast(func(bsubj []byte, ss *SimpleState) bool { + // Check if already been processed and accounted. + if _, ok := subs[string(bsubj)]; !ok { + seqs = append(seqs, ss.Last) + subs[string(bsubj)] = struct{}{} + } + return true + }) + if shouldExpire { + // Expire this cache before moving on. + mb.tryForceExpireCacheLocked() + } + mb.mu.Unlock() + } + + slices.Sort(seqs) + return seqs, nil +} + +// Helper to determine if the filter(s) represent all the subjects. +// Most clients send in subjects even if they match the stream's ingest subjects. +// Lock should be held. 
+func (fs *fileStore) filterIsAll(filters []string) bool { + if len(filters) != len(fs.cfg.Subjects) { + return false + } + // Sort so we can compare. + slices.Sort(filters) + for i, subj := range filters { + if !subjectIsSubsetMatch(fs.cfg.Subjects[i], subj) { + return false + } + } + return true +} + // MultiLastSeqs will return a sorted list of sequences that match all subjects presented in filters. // We will not exceed the maxSeq, which if 0 becomes the store's last sequence. func (fs *fileStore) MultiLastSeqs(filters []string, maxSeq uint64, maxAllowed int) ([]uint64, error) { @@ -3017,6 +3098,11 @@ func (fs *fileStore) MultiLastSeqs(filters []string, maxSeq uint64, maxAllowed i return nil, nil } + // See if we can short circuit if we think they are asking for all last sequences and have no maxSeq or maxAllowed set. + if maxSeq == 0 && maxAllowed <= 0 && fs.filterIsAll(filters) { + return fs.AllLastSeqs() + } + lastBlkIndex := len(fs.blks) - 1 lastMB := fs.blks[lastBlkIndex] @@ -3027,7 +3113,7 @@ func (fs *fileStore) MultiLastSeqs(filters []string, maxSeq uint64, maxAllowed i // Udate last mb index if not last seq. lastBlkIndex, lastMB = fs.selectMsgBlockWithIndex(maxSeq) } - //Make sure non-nil + // Make sure non-nil if lastMB == nil { return nil, nil } @@ -3038,26 +3124,24 @@ func (fs *fileStore) MultiLastSeqs(filters []string, maxSeq uint64, maxAllowed i lastMB.mu.RUnlock() subs := make(map[string]*psi) - ltSeen := make(map[string]uint32) + var numLess int + var maxBlk uint32 + for _, filter := range filters { fs.psim.Match(stringToBytes(filter), func(subj []byte, psi *psi) { - s := string(subj) - subs[s] = psi + subs[string(subj)] = psi if psi.lblk < lastMBIndex { - ltSeen[s] = psi.lblk + numLess++ + if psi.lblk > maxBlk { + maxBlk = psi.lblk + } } }) } // If all subjects have a lower last index, select the largest for our walk backwards. 
- if len(ltSeen) == len(subs) { - max := uint32(0) - for _, mbi := range ltSeen { - if mbi > max { - max = mbi - } - } - lastMB = fs.bim[max] + if numLess == len(subs) { + lastMB = fs.bim[maxBlk] } // Collect all sequences needed. @@ -3076,42 +3160,53 @@ func (fs *fileStore) MultiLastSeqs(filters []string, maxSeq uint64, maxAllowed i // We can start properly looking here. mb.mu.Lock() mb.ensurePerSubjectInfoLoaded() - for subj, psi := range subs { - if ss, ok := mb.fss.Find(stringToBytes(subj)); ok && ss != nil { - if ss.Last <= maxSeq { - seqs = append(seqs, ss.Last) - delete(subs, subj) - } else { - // Need to search for it since last is > maxSeq. - if mb.cacheNotLoaded() { - mb.loadMsgsWithLock() - } - var smv StoreMsg - fseq := atomic.LoadUint64(&mb.first.seq) - for seq := maxSeq; seq >= fseq; seq-- { - sm, _ := mb.cacheLookup(seq, &smv) - if sm == nil || sm.subj != subj { - continue - } - seqs = append(seqs, sm.seq) - delete(subs, subj) - break + + // Iterate the fss and check against our subs. We will delete from subs as we add. + // Once len(subs) == 0 we are done. + mb.fss.IterFast(func(bsubj []byte, ss *SimpleState) bool { + // Already been processed and accounted for was not matched in the first place. + if subs[string(bsubj)] == nil { + return true + } + // Check if we need to recalculate. We only care about the last sequence. + if ss.lastNeedsUpdate { + // mb is already loaded into the cache so should be fast-ish. + mb.recalculateForSubj(bytesToString(bsubj), ss) + } + // If we are equal or below just add to seqs slice. + if ss.Last <= maxSeq { + seqs = append(seqs, ss.Last) + delete(subs, string(bsubj)) + } else { + // Need to search for the real last since recorded last is > maxSeq. 
+ if mb.cacheNotLoaded() { + mb.loadMsgsWithLock() + } + var smv StoreMsg + fseq := atomic.LoadUint64(&mb.first.seq) + lseq := atomic.LoadUint64(&mb.last.seq) + if lseq > maxSeq { + lseq = maxSeq + } + ssubj := bytesToString(bsubj) + for seq := lseq; seq >= fseq; seq-- { + sm, _ := mb.cacheLookupNoCopy(seq, &smv) + if sm == nil || sm.subj != ssubj { + continue } + seqs = append(seqs, sm.seq) + delete(subs, ssubj) + break } - } else if mb.index <= psi.fblk { - // Track which subs are no longer applicable, meaning we will not find a valid msg at this point. - delete(subs, subj) } - // TODO(dlc) we could track lblk like above in case some subs are very far apart. - // Not too bad if fss loaded since we will skip over quickly with it loaded, but might be worth it. - } + return true + }) mb.mu.Unlock() // If maxAllowed was sepcified check that we will not exceed that. if maxAllowed > 0 && len(seqs) > maxAllowed { return nil, ErrTooManyResults } - } if len(seqs) == 0 { return nil, nil @@ -3233,7 +3328,7 @@ func (fs *fileStore) NumPending(sseq uint64, filter string, lastPerSubject bool) updateLLTS = true continue } - sm, _ := mb.cacheLookup(seq, &smv) + sm, _ := mb.cacheLookupNoCopy(seq, &smv) if sm == nil || sm.subj == _EMPTY_ || !lbm[sm.subj] { continue } @@ -3259,7 +3354,7 @@ func (fs *fileStore) NumPending(sseq uint64, filter string, lastPerSubject bool) mb.tryForceExpireCacheLocked() } if updateLLTS { - mb.llts = time.Now().UnixNano() + mb.llts = getAccessTime() } mb.mu.Unlock() return total, validThrough @@ -3285,7 +3380,7 @@ func (fs *fileStore) NumPending(sseq uint64, filter string, lastPerSubject bool) shouldExpire = true } // Mark fss activity. 
- mb.lsts = time.Now().UnixNano() + mb.lsts = getAccessTime() var t uint64 var havePartial bool @@ -3321,7 +3416,7 @@ func (fs *fileStore) NumPending(sseq uint64, filter string, lastPerSubject bool) } var smv StoreMsg for seq, lseq := start, atomic.LoadUint64(&mb.last.seq); seq <= lseq; seq++ { - if sm, _ := mb.cacheLookup(seq, &smv); sm != nil && isMatch(sm.subj) { + if sm, _ := mb.cacheLookupNoCopy(seq, &smv); sm != nil && isMatch(sm.subj) { t++ } } @@ -3385,7 +3480,7 @@ func (fs *fileStore) NumPending(sseq uint64, filter string, lastPerSubject bool) shouldExpire = true } // Mark fss activity. - mb.lsts = time.Now().UnixNano() + mb.lsts = getAccessTime() mb.fss.Match(stringToBytes(filter), func(bsubj []byte, ss *SimpleState) { adjust += ss.Msgs @@ -3409,7 +3504,7 @@ func (fs *fileStore) NumPending(sseq uint64, filter string, lastPerSubject bool) updateLLTS = true continue } - sm, _ := mb.cacheLookup(seq, &smv) + sm, _ := mb.cacheLookupNoCopy(seq, &smv) if sm == nil || sm.subj == _EMPTY_ { continue } @@ -3425,7 +3520,7 @@ func (fs *fileStore) NumPending(sseq uint64, filter string, lastPerSubject bool) mb.tryForceExpireCacheLocked() } if updateLLTS { - mb.llts = time.Now().UnixNano() + mb.llts = getAccessTime() } mb.mu.Unlock() } @@ -3542,7 +3637,7 @@ func (fs *fileStore) NumPendingMulti(sseq uint64, sl *Sublist, lastPerSubject bo updateLLTS = true continue } - sm, _ := mb.cacheLookup(seq, &smv) + sm, _ := mb.cacheLookupNoCopy(seq, &smv) if sm == nil || sm.subj == _EMPTY_ || !lbm[sm.subj] { continue } @@ -3568,7 +3663,7 @@ func (fs *fileStore) NumPendingMulti(sseq uint64, sl *Sublist, lastPerSubject bo mb.tryForceExpireCacheLocked() } if updateLLTS { - mb.llts = time.Now().UnixNano() + mb.llts = getAccessTime() } mb.mu.Unlock() return total, validThrough @@ -3593,7 +3688,7 @@ func (fs *fileStore) NumPendingMulti(sseq uint64, sl *Sublist, lastPerSubject bo shouldExpire = true } // Mark fss activity. 
- mb.lsts = time.Now().UnixNano() + mb.lsts = getAccessTime() var t uint64 var havePartial bool @@ -3635,7 +3730,7 @@ func (fs *fileStore) NumPendingMulti(sseq uint64, sl *Sublist, lastPerSubject bo updateLLTS = true continue } - if sm, _ := mb.cacheLookup(seq, &smv); sm != nil && isMatch(sm.subj) { + if sm, _ := mb.cacheLookupNoCopy(seq, &smv); sm != nil && isMatch(sm.subj) { t++ updateLLTS = false // cacheLookup already updated it. } @@ -3646,7 +3741,7 @@ func (fs *fileStore) NumPendingMulti(sseq uint64, sl *Sublist, lastPerSubject bo mb.tryForceExpireCacheLocked() } if updateLLTS { - mb.llts = time.Now().UnixNano() + mb.llts = getAccessTime() } mb.mu.Unlock() total += t @@ -3706,7 +3801,7 @@ func (fs *fileStore) NumPendingMulti(sseq uint64, sl *Sublist, lastPerSubject bo shouldExpire = true } // Mark fss activity. - mb.lsts = time.Now().UnixNano() + mb.lsts = getAccessTime() IntersectStree(mb.fss, sl, func(bsubj []byte, ss *SimpleState) { adjust += ss.Msgs }) @@ -3729,7 +3824,7 @@ func (fs *fileStore) NumPendingMulti(sseq uint64, sl *Sublist, lastPerSubject bo updateLLTS = true continue } - sm, _ := mb.cacheLookup(seq, &smv) + sm, _ := mb.cacheLookupNoCopy(seq, &smv) if sm == nil || sm.subj == _EMPTY_ { continue } @@ -3745,7 +3840,7 @@ func (fs *fileStore) NumPendingMulti(sseq uint64, sl *Sublist, lastPerSubject bo mb.tryForceExpireCacheLocked() } if updateLLTS { - mb.llts = time.Now().UnixNano() + mb.llts = getAccessTime() } mb.mu.Unlock() } @@ -3830,7 +3925,7 @@ func (mb *msgBlock) setupWriteCache(buf []byte) { if fi != nil { mb.cache.off = int(fi.Size()) } - mb.llts = time.Now().UnixNano() + mb.llts = getAccessTime() mb.startCacheExpireTimer() } @@ -3861,8 +3956,7 @@ func (fs *fileStore) newMsgBlockForWrite() (*msgBlock, error) { mb.fss = stree.NewSubjectTree[SimpleState]() // Set cache time to creation time to start. - ts := time.Now().UnixNano() - mb.llts, mb.lwts = 0, ts + mb.llts, mb.lwts = 0, getAccessTime() // Remember our last sequence number. 
atomic.StoreUint64(&mb.first.seq, fs.state.LastSeq+1) atomic.StoreUint64(&mb.last.seq, fs.state.LastSeq) @@ -4120,8 +4214,7 @@ func (mb *msgBlock) skipMsg(seq uint64, now time.Time) { return } var needsRecord bool - - nowts := now.UnixNano() + nowts := getAccessTime() mb.mu.Lock() // If we are empty can just do meta. @@ -4160,21 +4253,13 @@ func (fs *fileStore) SkipMsg() uint64 { defer fs.mu.Unlock() // Grab our current last message block. - mb := fs.lmb - if mb == nil || mb.msgs > 0 && mb.blkSize()+emptyRecordLen > fs.fcfg.BlockSize { - if mb != nil && fs.fcfg.Compression != NoCompression { - // We've now reached the end of this message block, if we want - // to compress blocks then now's the time to do it. - go mb.recompressOnDiskIfNeeded() - } - var err error - if mb, err = fs.newMsgBlockForWrite(); err != nil { - return 0 - } + mb, err := fs.checkLastBlock(emptyRecordLen) + if err != nil { + return 0 } // Grab time and last seq. - now, seq := time.Now().UTC(), fs.state.LastSeq+1 + now, seq := time.Now(), fs.state.LastSeq+1 // Write skip msg. mb.skipMsg(seq, now) @@ -4227,7 +4312,7 @@ func (fs *fileStore) SkipMsgs(seq uint64, num uint64) error { } // Insert into dmap all entries and place last as marker. - now := time.Now().UTC() + now := time.Now() nowts := now.UnixNano() lseq := seq + num - 1 @@ -4321,7 +4406,7 @@ func (fs *fileStore) firstSeqForSubj(subj string) (uint64, error) { shouldExpire = true } // Mark fss activity. 
- mb.lsts = time.Now().UnixNano() + mb.lsts = getAccessTime() bsubj := stringToBytes(subj) if ss, ok := mb.fss.Find(bsubj); ok && ss != nil { @@ -4601,7 +4686,7 @@ func (fs *fileStore) removeMsg(seq uint64, secure, viaLimits, needFSLock bool) ( } var smv StoreMsg - sm, err := mb.cacheLookup(seq, &smv) + sm, err := mb.cacheLookupNoCopy(seq, &smv) if err != nil { mb.mu.Unlock() fsUnlock() @@ -4615,7 +4700,7 @@ func (fs *fileStore) removeMsg(seq uint64, secure, viaLimits, needFSLock bool) ( msz := fileStoreMsgSize(sm.subj, sm.hdr, sm.msg) // Set cache timestamp for last remove. - mb.lrts = time.Now().UnixNano() + mb.lrts = getAccessTime() // Global stats if fs.state.Msgs > 0 { @@ -4948,7 +5033,12 @@ func (fs *fileStore) isClosed() bool { func (mb *msgBlock) spinUpFlushLoop() { mb.mu.Lock() defer mb.mu.Unlock() + mb.spinUpFlushLoopLocked() +} +// Will spin up our flush loop. +// Lock should be held. +func (mb *msgBlock) spinUpFlushLoopLocked() { // Are we already running or closed? if mb.flusher || mb.closed { return @@ -5128,7 +5218,7 @@ func (mb *msgBlock) truncate(sm *StoreMsg) (nmsgs, nbytes uint64, err error) { } } // We should have a valid msg to calculate removal stats. - if m, err := mb.cacheLookup(seq, &smv); err == nil { + if m, err := mb.cacheLookupNoCopy(seq, &smv); err == nil { if mb.msgs > 0 { rl := fileStoreMsgSize(m.subj, m.hdr, m.msg) mb.msgs-- @@ -5248,11 +5338,11 @@ func (mb *msgBlock) selectNextFirst() { // Need to get the timestamp. // We will try the cache direct and fallback if needed. var smv StoreMsg - sm, _ := mb.cacheLookup(seq, &smv) + sm, _ := mb.cacheLookupNoCopy(seq, &smv) if sm == nil { // Slow path, need to unlock. mb.mu.Unlock() - sm, _, _ = mb.fetchMsg(seq, &smv) + sm, _, _ = mb.fetchMsgNoCopy(seq, &smv) mb.mu.Lock() } if sm != nil { @@ -5400,7 +5490,7 @@ func (mb *msgBlock) expireCacheLocked() { } // Grab timestamp to compare. 
- tns := time.Now().UnixNano() + tns := getAccessTime() // For the core buffer of messages, we care about reads and writes, but not removes. bufts := mb.llts @@ -5507,14 +5597,16 @@ func (fs *fileStore) expireMsgs() { // Reason is that we need more information to adjust ack pending in consumers. var smv StoreMsg var sm *StoreMsg + fs.mu.RLock() maxAge := int64(fs.cfg.MaxAge) - minAge := time.Now().UnixNano() - maxAge + minAge := getAccessTime() - maxAge rmcb := fs.rmcb sdmcb := fs.sdmcb sdmTTL := int64(fs.cfg.SubjectDeleteMarkerTTL.Seconds()) sdmEnabled := sdmTTL > 0 fs.mu.RUnlock() + if sdmEnabled && (rmcb == nil || sdmcb == nil) { return } @@ -5525,7 +5617,7 @@ func (fs *fileStore) expireMsgs() { if len(sm.hdr) > 0 { if ttl, err := getMessageTTL(sm.hdr); err == nil && ttl < 0 { // The message has a negative TTL, therefore it must "never expire". - minAge = time.Now().UnixNano() - maxAge + minAge = getAccessTime() - maxAge continue } } @@ -5542,7 +5634,7 @@ func (fs *fileStore) expireMsgs() { fs.mu.Unlock() } // Recalculate in case we are expiring a bunch. - minAge = time.Now().UnixNano() - maxAge + minAge = getAccessTime() - maxAge } } @@ -5736,7 +5828,7 @@ func (mb *msgBlock) enableForWriting(fip bool) error { // Spin up our flusher loop if needed. if !fip { - mb.spinUpFlushLoop() + mb.spinUpFlushLoopLocked() } return nil @@ -5752,10 +5844,16 @@ func (mb *msgBlock) writeTombstone(seq uint64, ts int64) error { func (mb *msgBlock) writeMsgRecord(rl, seq uint64, subj string, mhdr, msg []byte, ts int64, flush bool) error { mb.mu.Lock() defer mb.mu.Unlock() + return mb.writeMsgRecordLocked(rl, seq, subj, mhdr, msg, ts, flush, true) +} +// Will write the message record to the underlying message block. +// filestore lock will be held. +// mb lock should be held. +func (mb *msgBlock) writeMsgRecordLocked(rl, seq uint64, subj string, mhdr, msg []byte, ts int64, flush, kick bool) error { // Enable for writing if our mfd is not open. 
if mb.mfd == nil { - if err := mb.enableForWriting(flush); err != nil { + if err := mb.enableForWriting(flush && kick); err != nil { return err } } @@ -5774,7 +5872,7 @@ func (mb *msgBlock) writeMsgRecord(rl, seq uint64, subj string, mhdr, msg []byte return err } // Mark fss activity. - mb.lsts = time.Now().UnixNano() + mb.lsts = getAccessTime() if ss, ok := mb.fss.Find(stringToBytes(subj)); ok && ss != nil { ss.Msgs++ ss.Last = seq @@ -5867,7 +5965,7 @@ func (mb *msgBlock) writeMsgRecord(rl, seq uint64, subj string, mhdr, msg []byte if err != nil { return err } - } else { + } else if kick { // Kick the flusher here. kickFlusher(fch) } @@ -5941,6 +6039,9 @@ func (mb *msgBlock) bytesPending() ([]byte, error) { // Returns the current blkSize including deleted msgs etc. func (mb *msgBlock) blkSize() uint64 { + if mb == nil { + return 0 + } mb.mu.RLock() nb := mb.rbytes mb.mu.RUnlock() @@ -5970,32 +6071,41 @@ func (mb *msgBlock) updateAccounting(seq uint64, ts int64, rl uint64) { } } +// Helper to check last msg block and create new one if too big. // Lock should be held. -func (fs *fileStore) writeMsgRecord(seq uint64, ts int64, subj string, hdr, msg []byte) (uint64, error) { - var err error +func (fs *fileStore) checkLastBlock(rl uint64) (lmb *msgBlock, err error) { + // Grab our current last message block. + lmb = fs.lmb + rbytes := lmb.blkSize() + if lmb == nil || (rbytes > 0 && rbytes+rl > fs.fcfg.BlockSize) { + if lmb != nil && fs.fcfg.Compression != NoCompression { + // We've now reached the end of this message block, if we want + // to compress blocks then now's the time to do it. + go lmb.recompressOnDiskIfNeeded() + } + if lmb, err = fs.newMsgBlockForWrite(); err != nil { + return nil, err + } + } + return lmb, nil +} +// Lock should be held. +func (fs *fileStore) writeMsgRecord(seq uint64, ts int64, subj string, hdr, msg []byte) (uint64, error) { // Get size for this message. 
rl := fileStoreMsgSize(subj, hdr, msg) if rl&hbit != 0 || rl > rlBadThresh { return 0, ErrMsgTooLarge } // Grab our current last message block. - mb := fs.lmb + mb, err := fs.checkLastBlock(rl) + if err != nil { + return 0, err + } // Mark as dirty for stream state. fs.dirty++ - if mb == nil || mb.msgs > 0 && mb.blkSize()+rl > fs.fcfg.BlockSize { - if mb != nil && fs.fcfg.Compression != NoCompression { - // We've now reached the end of this message block, if we want - // to compress blocks then now's the time to do it. - go mb.recompressOnDiskIfNeeded() - } - if mb, err = fs.newMsgBlockForWrite(); err != nil { - return 0, err - } - } - // Ask msg block to store in write through cache. err = mb.writeMsgRecord(rl, seq, subj, hdr, msg, ts, fs.fip) @@ -6006,19 +6116,28 @@ func (fs *fileStore) writeMsgRecord(seq uint64, ts int64, subj string, hdr, msg // Lock should be held. func (fs *fileStore) writeTombstone(seq uint64, ts int64) error { // Grab our current last message block. - lmb := fs.lmb - var err error + lmb, err := fs.checkLastBlock(emptyRecordLen) + if err != nil { + return err + } + return lmb.writeTombstone(seq, ts) +} - if lmb == nil || lmb.blkSize()+emptyRecordLen > fs.fcfg.BlockSize { - if lmb != nil && fs.fcfg.Compression != NoCompression { - // We've now reached the end of this message block, if we want - // to compress blocks then now's the time to do it. - go lmb.recompressOnDiskIfNeeded() - } - if lmb, err = fs.newMsgBlockForWrite(); err != nil { - return err - } +// For writing tombstones to our lmb. This version will enforce maximum block sizes. +// This version does not flush contents. +// Lock should be held. +func (fs *fileStore) writeTombstoneNoFlush(seq uint64, ts int64) error { + // Grab our current last message block. + olmb := fs.lmb + lmb, err := fs.checkLastBlock(emptyRecordLen) + if err != nil { + return err } + // If we swapped out our lmb, flush any pending. 
+ if olmb != lmb { + olmb.flushPendingMsgs() + } + // Write tombstone without flush or kick. return lmb.writeTombstone(seq, ts) } @@ -6434,12 +6553,13 @@ func (mb *msgBlock) indexCacheBuf(buf []byte) error { popFss = true } // Mark fss activity. - mb.lsts = time.Now().UnixNano() + mb.lsts = getAccessTime() mb.ttls = 0 lbuf := uint32(len(buf)) var seq, ttls uint64 var sm StoreMsg // Used for finding TTL headers + for index < lbuf { if index+msgHdrSize > lbuf { return errCorruptState @@ -6516,8 +6636,8 @@ func (mb *msgBlock) indexCacheBuf(buf []byte) error { // Count how many TTLs we think are in this message block. // TODO(nat): Not terribly optimal... if hasHeaders { - if fsm, err := mb.msgFromBuf(buf[index:], &sm, nil); err == nil && fsm != nil { - if _, err = getMessageTTL(fsm.hdr); err == nil && len(fsm.hdr) > 0 { + if fsm, err := mb.msgFromBufNoCopy(buf[index:], &sm, nil); err == nil && fsm != nil { + if ttl := getHeader(JSMessageTTL, fsm.hdr); len(ttl) > 0 { ttls++ } } @@ -6659,7 +6779,7 @@ func (mb *msgBlock) flushPendingMsgsLocked() (*LostStreamData, error) { // Decide what we want to do with the buffer in hand. If we have load interest // we will hold onto the whole thing, otherwise empty the buffer, possibly reusing it. - if ts := time.Now().UnixNano(); ts < mb.llts || (ts-mb.llts) <= int64(mb.cexp) { + if ts := getAccessTime(); ts < mb.llts || (ts-mb.llts) <= int64(mb.cexp) { mb.cache.wp += lob } else { if cap(mb.cache.buf) <= maxBufReuse { @@ -6822,7 +6942,7 @@ checkCache: return nil } - mb.llts = time.Now().UnixNano() + mb.llts = getAccessTime() // FIXME(dlc) - We could be smarter here. if buf, _ := mb.bytesPending(); len(buf) > 0 { @@ -6896,7 +7016,24 @@ checkCache: // Fetch a message from this block, possibly reading in and caching the messages. // We assume the block was selected and is correct, so we do not do range checks. +// Lock should not be held. 
func (mb *msgBlock) fetchMsg(seq uint64, sm *StoreMsg) (*StoreMsg, bool, error) { + return mb.fetchMsgEx(seq, sm, true) +} + +// Fetch a message from this block, possibly reading in and caching the messages. +// We assume the block was selected and is correct, so we do not do range checks. +// We will not copy the msg data. +// Lock should not be held. +func (mb *msgBlock) fetchMsgNoCopy(seq uint64, sm *StoreMsg) (*StoreMsg, bool, error) { + return mb.fetchMsgEx(seq, sm, false) +} + +// Fetch a message from this block, possibly reading in and caching the messages. +// We assume the block was selected and is correct, so we do not do range checks. +// We will copy the msg data based on doCopy boolean. +// Lock should not be held. +func (mb *msgBlock) fetchMsgEx(seq uint64, sm *StoreMsg, doCopy bool) (*StoreMsg, bool, error) { mb.mu.Lock() defer mb.mu.Unlock() @@ -6923,7 +7060,7 @@ func (mb *msgBlock) fetchMsg(seq uint64, sm *StoreMsg) (*StoreMsg, bool, error) } llseq := mb.llseq - fsm, err := mb.cacheLookup(seq, sm) + fsm, err := mb.cacheLookupEx(seq, sm, doCopy) if err != nil { return nil, false, err } @@ -6965,8 +7102,22 @@ const ( ) // Will do a lookup from cache. +// This will copy the msg from the cache. // Lock should be held. func (mb *msgBlock) cacheLookup(seq uint64, sm *StoreMsg) (*StoreMsg, error) { + return mb.cacheLookupEx(seq, sm, true) +} + +// Will do a lookup from cache. +// This will NOT copy the msg from the cache. +// Lock should be held. +func (mb *msgBlock) cacheLookupNoCopy(seq uint64, sm *StoreMsg) (*StoreMsg, error) { + return mb.cacheLookupEx(seq, sm, false) +} + +// Will do a lookup from cache. +// Lock should be held. 
+func (mb *msgBlock) cacheLookupEx(seq uint64, sm *StoreMsg, doCopy bool) (*StoreMsg, error) { if seq < atomic.LoadUint64(&mb.first.seq) || seq > atomic.LoadUint64(&mb.last.seq) { return nil, ErrStoreMsgNotFound } @@ -6980,7 +7131,7 @@ func (mb *msgBlock) cacheLookup(seq uint64, sm *StoreMsg) (*StoreMsg, error) { // If we have a delete map check it. if mb.dmap.Exists(seq) { - mb.llts = time.Now().UnixNano() + mb.llts = getAccessTime() return nil, errDeletedMsg } @@ -7011,7 +7162,7 @@ func (mb *msgBlock) cacheLookup(seq uint64, sm *StoreMsg) (*StoreMsg, error) { } // Update cache activity. - mb.llts = time.Now().UnixNano() + mb.llts = getAccessTime() li := int(bi) - mb.cache.off if li >= len(mb.cache.buf) { @@ -7026,7 +7177,7 @@ func (mb *msgBlock) cacheLookup(seq uint64, sm *StoreMsg) (*StoreMsg, error) { } // Parse from the raw buffer. - fsm, err := mb.msgFromBuf(buf, sm, hh) + fsm, err := mb.msgFromBufEx(buf, sm, hh, doCopy) if err != nil || fsm == nil { return nil, err } @@ -7059,7 +7210,7 @@ func (fs *fileStore) sizeForSeq(seq uint64) int { } var smv StoreMsg if mb := fs.selectMsgBlock(seq); mb != nil { - if sm, _, _ := mb.fetchMsg(seq, &smv); sm != nil { + if sm, _, _ := mb.fetchMsgNoCopy(seq, &smv); sm != nil { return int(fileStoreMsgSize(sm.subj, sm.hdr, sm.msg)) } } @@ -7067,6 +7218,7 @@ func (fs *fileStore) sizeForSeq(seq uint64) int { } // Will return message for the given sequence number. +// This will be returned to external callers. func (fs *fileStore) msgForSeq(seq uint64, sm *StoreMsg) (*StoreMsg, error) { return fs.msgForSeqLocked(seq, sm, true) } @@ -7117,8 +7269,24 @@ func (fs *fileStore) msgForSeqLocked(seq uint64, sm *StoreMsg, needFSLock bool) } // Internal function to return msg parts from a raw buffer. +// Raw buffer will be copied into sm. // Lock should be held. 
func (mb *msgBlock) msgFromBuf(buf []byte, sm *StoreMsg, hh hash.Hash64) (*StoreMsg, error) { + return mb.msgFromBufEx(buf, sm, hh, true) +} + +// Internal function to return msg parts from a raw buffer. +// Raw buffer will NOT be copied into sm. +// Only use for internal use, any message that is passed to upper layers should use mb.msgFromBuf. +// Lock should be held. +func (mb *msgBlock) msgFromBufNoCopy(buf []byte, sm *StoreMsg, hh hash.Hash64) (*StoreMsg, error) { + return mb.msgFromBufEx(buf, sm, hh, false) +} + +// Internal function to return msg parts from a raw buffer. +// copy boolean will determine if we make a copy or not. +// Lock should be held. +func (mb *msgBlock) msgFromBufEx(buf []byte, sm *StoreMsg, hh hash.Hash64, doCopy bool) (*StoreMsg, error) { if len(buf) < emptyRecordLen { return nil, errBadMsg } @@ -7169,18 +7337,30 @@ func (mb *msgBlock) msgFromBuf(buf []byte, sm *StoreMsg, hh hash.Hash64) (*Store hl := le.Uint32(data[slen:]) bi := slen + 4 li := bi + int(hl) - sm.buf = append(sm.buf, data[bi:end]...) + if doCopy { + sm.buf = append(sm.buf, data[bi:end]...) + } else { + sm.buf = data[bi:end] + } li, end = li-bi, end-bi sm.hdr = sm.buf[0:li:li] sm.msg = sm.buf[li:end] } else { - sm.buf = append(sm.buf, data[slen:end]...) + if doCopy { + sm.buf = append(sm.buf, data[slen:end]...) + } else { + sm.buf = data[slen:end] + } sm.msg = sm.buf[0 : end-slen] } sm.seq, sm.ts = seq, ts if slen > 0 { - // Make a copy since sm.subj lifetime may last longer. - sm.subj = string(data[:slen]) + if doCopy { + // Make a copy since sm.subj lifetime may last longer. + sm.subj = string(data[:slen]) + } else { + sm.subj = bytesToString(data[:slen]) + } } return sm, nil @@ -7243,7 +7423,7 @@ func (fs *fileStore) loadLast(subj string, sm *StoreMsg) (lsm *StoreMsg, err err return nil, err } // Mark fss activity. - mb.lsts = time.Now().UnixNano() + mb.lsts = getAccessTime() var l uint64 // Optimize if subject is not a wildcard. 
@@ -7781,6 +7961,7 @@ func (fs *fileStore) PurgeEx(subject string, sequence, keep uint64) (purged uint var smv StoreMsg var tombs []msgId + var lowSeq uint64 fs.mu.Lock() // We may remove blocks as we purge, so don't range directly on fs.blks @@ -7812,8 +7993,8 @@ func (fs *fileStore) PurgeEx(subject string, sequence, keep uint64) (purged uint shouldExpire = true } - for seq := f; seq <= l; seq++ { - if sm, _ := mb.cacheLookup(seq, &smv); sm != nil && eq(sm.subj, subject) { + for seq, te := f, len(tombs); seq <= l; seq++ { + if sm, _ := mb.cacheLookupNoCopy(seq, &smv); sm != nil && eq(sm.subj, subject) { rl := fileStoreMsgSize(sm.subj, sm.hdr, sm.msg) // Do fast in place remove. // Stats @@ -7837,13 +8018,19 @@ func (fs *fileStore) PurgeEx(subject string, sequence, keep uint64) (purged uint // PSIM and FSS updates. mb.removeSeqPerSubject(sm.subj, seq) fs.removePerSubject(sm.subj) + // Track tombstones we need to write. tombs = append(tombs, msgId{sm.seq, sm.ts}) + if sm.seq < lowSeq || lowSeq == 0 { + lowSeq = sm.seq + } // Check for first message. if seq == atomic.LoadUint64(&mb.first.seq) { mb.selectNextFirst() if mb.isEmpty() { + // Since we are removing this block don't need to write tombstones. + tombs = tombs[:te] fs.removeMsgBlock(mb) i-- // keep flag set, if set previously @@ -7856,14 +8043,15 @@ func (fs *fileStore) PurgeEx(subject string, sequence, keep uint64) (purged uint // Out of order delete. mb.dmap.Insert(seq) } - - if maxp > 0 && purged >= maxp { + // Break if we have emptied this block or if we set a maximum purge count. + if mb.isEmpty() || (maxp > 0 && purged >= maxp) { break } } } - // Expire if we were responsible for loading. - if shouldExpire { + // Expire if we were responsible for loading and we do not seem to be doing successive purgeEx calls. + // On successive calls - most likely from KV purge deletes, we want to keep the data loaded. + if shouldExpire && time.Since(fs.lpex) > time.Second { // Expire this cache before moving on. 
mb.tryForceExpireCacheLocked() } @@ -7878,18 +8066,33 @@ func (fs *fileStore) PurgeEx(subject string, sequence, keep uint64) (purged uint fs.selectNextFirst() } + // Update the last purgeEx call time. + defer func() { fs.lpex = time.Now() }() + // Write any tombstones as needed. - for _, tomb := range tombs { - fs.writeTombstone(tomb.seq, tomb.ts) + // When writing multiple tombstones we will flush at the end. + if len(tombs) > 0 { + for _, tomb := range tombs { + if err := fs.writeTombstoneNoFlush(tomb.seq, tomb.ts); err != nil { + return purged, err + } + } + if lmb := fs.lmb; lmb != nil { + lmb.flushPendingMsgs() + } } - os.Remove(filepath.Join(fs.fcfg.StoreDir, msgDir, streamStreamStateFile)) fs.dirty++ cb := fs.scb fs.mu.Unlock() if cb != nil { - cb(-int64(purged), -int64(bytes), 0, _EMPTY_) + if purged == 1 { + cb(-int64(purged), -int64(bytes), lowSeq, subject) + } else { + // FIXME(dlc) - Since we track lowSeq we could send to upper layer if they dealt with the condition properly. + cb(-int64(purged), -int64(bytes), 0, _EMPTY_) + } } return purged, nil @@ -8064,7 +8267,7 @@ func (fs *fileStore) compact(seq uint64) (uint64, error) { } } for mseq := atomic.LoadUint64(&smb.first.seq); mseq < seq; mseq++ { - sm, err := smb.cacheLookup(mseq, &smv) + sm, err := smb.cacheLookupNoCopy(mseq, &smv) if err == errDeletedMsg { // Update dmap. if !smb.dmap.IsEmpty() { @@ -8331,7 +8534,7 @@ func (fs *fileStore) Truncate(seq uint64) error { fs.mu.Unlock() return ErrInvalidSequence } - lsm, _, _ := nlmb.fetchMsg(seq, nil) + lsm, _, _ := nlmb.fetchMsgNoCopy(seq, nil) if lsm == nil { fs.mu.Unlock() return ErrInvalidSequence @@ -8722,7 +8925,7 @@ func (mb *msgBlock) generatePerSubjectInfo() error { // It gets set later on if the fss is non-empty anyway. continue } - sm, err := mb.cacheLookup(seq, &smv) + sm, err := mb.cacheLookupNoCopy(seq, &smv) if err != nil { // Since we are walking by sequence we can ignore some errors that are benign to rebuilding our state. 
if err == ErrStoreMsgNotFound || err == errDeletedMsg { @@ -8746,9 +8949,9 @@ func (mb *msgBlock) generatePerSubjectInfo() error { if mb.fss.Size() > 0 { // Make sure we run the cache expire timer. - mb.llts = time.Now().UnixNano() - // Mark fss activity. - mb.lsts = time.Now().UnixNano() + mb.llts = getAccessTime() + // Mark fss activity same as load time. + mb.lsts = mb.llts mb.startCacheExpireTimer() } return nil @@ -8760,7 +8963,7 @@ func (mb *msgBlock) ensurePerSubjectInfoLoaded() error { if mb.fss != nil || mb.noTrack { if mb.fss != nil { // Mark fss activity. - mb.lsts = time.Now().UnixNano() + mb.lsts = getAccessTime() } return nil } @@ -10906,3 +11109,27 @@ func writeFileWithSync(name string, data []byte, perm fs.FileMode) error { } return f.Close() } + +// This is to offload UnixNano() processing from timestamp creation for cache management. +var ( + tsOnce sync.Once + accessTime atomic.Int64 +) + +// Update every 100ms. +const accessTimeTickInterval = 100 * time.Millisecond + +// Will load the access time from an atomic. We will also setup the Go routine +// to update this in one place. +func getAccessTime() int64 { + tsOnce.Do(func() { + accessTime.Store(time.Now().UnixNano()) + go func() { + ticker := time.NewTicker(accessTimeTickInterval) + for range ticker.C { + accessTime.Store(time.Now().UnixNano()) + } + }() + }) + return accessTime.Load() +} diff --git a/server/filestore_test.go b/server/filestore_test.go index 22a37d95c40..60fee412dd1 100644 --- a/server/filestore_test.go +++ b/server/filestore_test.go @@ -33,6 +33,7 @@ import ( "os" "path/filepath" "reflect" + "slices" "strconv" "strings" "sync" @@ -3666,6 +3667,9 @@ func TestFileStorePurgeExWithSubject(t *testing.T) { _, _, err = fs.StoreMsg("foo.2", nil, []byte("xxxxxx"), 0) require_NoError(t, err) + // Make sure we have our state file prior to Purge call. 
+ fs.forceWriteFullState() + // This should purge all "foo.1" p, err := fs.PurgeEx("foo.1", 1, 0) require_NoError(t, err) @@ -3675,6 +3679,12 @@ func TestFileStorePurgeExWithSubject(t *testing.T) { require_Equal(t, state.Msgs, 2) require_Equal(t, state.FirstSeq, 1) + // Capture the current index.db file if it exists. + sfile := filepath.Join(fcfg.StoreDir, msgDir, streamStreamStateFile) + buf, err := os.ReadFile(sfile) + require_NoError(t, err) + require_True(t, len(buf) > 0) + // Make sure we can recover same state. fs.Stop() fs, err = newFileStoreWithCreated(fcfg, cfg, created, prf(&fcfg), nil) @@ -3698,6 +3708,79 @@ func TestFileStorePurgeExWithSubject(t *testing.T) { if state := fs.State(); !reflect.DeepEqual(state, before) { t.Fatalf("Expected state of %+v, got %+v without index.db state", before, state) } + + // If we had an index.db from after PurgeEx but before Stop() would rewrite, make sure we + // properly can recover with the old index file. This would be a crash after the PurgeEx() call. 
+ fs.Stop() + err = os.WriteFile(sfile, buf, defaultFilePerms) + require_NoError(t, err) + + fs, err = newFileStoreWithCreated(fcfg, cfg, created, prf(&fcfg), nil) + require_NoError(t, err) + defer fs.Stop() + + if state := fs.State(); !reflect.DeepEqual(state, before) { + t.Fatalf("Expected state of %+v, got %+v with old index.db state", before, state) + } + }) +} + +func TestFileStorePurgeExNoTombsOnBlockRemoval(t *testing.T) { + testFileStoreAllPermutations(t, func(t *testing.T, fcfg FileStoreConfig) { + fcfg.BlockSize = 1000 + cfg := StreamConfig{Name: "TEST", Subjects: []string{"foo.>"}, Storage: FileStorage} + created := time.Now() + fs, err := newFileStoreWithCreated(fcfg, cfg, created, prf(&fcfg), nil) + require_NoError(t, err) + defer fs.Stop() + + payload := make([]byte, 20) + + total := 100 + for i := 0; i < total; i++ { + _, _, err = fs.StoreMsg("foo.1", nil, payload, 0) + require_NoError(t, err) + } + _, _, err = fs.StoreMsg("foo.2", nil, payload, 0) + require_NoError(t, err) + + require_Equal(t, fs.numMsgBlocks(), 6) + + // Make sure we have our state file prior to Purge call. + fs.forceWriteFullState() + + // Capture the current index.db file if it exists. + sfile := filepath.Join(fcfg.StoreDir, msgDir, streamStreamStateFile) + buf, err := os.ReadFile(sfile) + require_NoError(t, err) + require_True(t, len(buf) > 0) + + // This should purge all "foo.1". This will remove the blocks so we want to make sure + // we do not write excessive tombstones here. + p, err := fs.PurgeEx("foo.1", 1, 0) + require_NoError(t, err) + require_Equal(t, p, uint64(total)) + + state := fs.State() + require_Equal(t, state.Msgs, 1) + require_Equal(t, state.FirstSeq, 101) + + // Check that we only have 1 msg block. + require_Equal(t, fs.numMsgBlocks(), 1) + + // Put the old index.db back. We want to make sure without the empty block tombstones that we + // properly recover state. 
+ fs.Stop() + err = os.WriteFile(sfile, buf, defaultFilePerms) + require_NoError(t, err) + + fs, err = newFileStoreWithCreated(fcfg, cfg, created, prf(&fcfg), nil) + require_NoError(t, err) + defer fs.Stop() + + state = fs.State() + require_Equal(t, state.Msgs, 1) + require_Equal(t, state.FirstSeq, 101) }) } @@ -4941,7 +5024,7 @@ func TestFileStoreSkipMsgAndNumBlocks(t *testing.T) { fs.SkipMsg() } fs.StoreMsg(subj, nil, msg, 0) - require_True(t, fs.numMsgBlocks() == 2) + require_Equal(t, fs.numMsgBlocks(), 3) } func TestFileStoreRestoreEncryptedWithNoKeyFuncFails(t *testing.T) { @@ -6457,7 +6540,7 @@ func TestFileStoreFSSMeta(t *testing.T) { fs.StoreMsg("A", nil, msg, 0) // Let cache's expire before PurgeEx which will load them back in. - time.Sleep(250 * time.Millisecond) + time.Sleep(500 * time.Millisecond) p, err := fs.PurgeEx("A", 1, 0) require_NoError(t, err) @@ -6508,7 +6591,7 @@ func TestFileStoreExpireCacheOnLinearWalk(t *testing.T) { } // Let them all expire. This way we load as we walk and can test that we expire all blocks without // needing to worry about last write times blocking forced expiration. 
- time.Sleep(expire) + time.Sleep(expire + accessTimeTickInterval) checkNoCache := func() { t.Helper() @@ -6731,7 +6814,7 @@ func TestFileStoreEraseMsgWithAllTrailingDbitSlots(t *testing.T) { func TestFileStoreMultiLastSeqs(t *testing.T) { fs, err := newFileStore( FileStoreConfig{StoreDir: t.TempDir(), BlockSize: 256}, // Make block size small to test multiblock selections with maxSeq - StreamConfig{Name: "zzz", Subjects: []string{"foo.*"}, Storage: FileStorage}) + StreamConfig{Name: "zzz", Subjects: []string{"foo.*", "bar.*"}, Storage: FileStorage}) require_NoError(t, err) defer fs.Stop() @@ -9443,3 +9526,32 @@ func TestFileStoreRemoveMsgBlockLast(t *testing.T) { _, err = os.Stat(ofn) require_True(t, os.IsNotExist(err)) } + +func TestFileStoreAllLastSeqs(t *testing.T) { + fs, err := newFileStore( + FileStoreConfig{StoreDir: t.TempDir()}, // Make block size small to test multiblock selections with maxSeq + StreamConfig{Name: "zzz", Subjects: []string{"*.*"}, MaxMsgsPer: 50, Storage: FileStorage}) + require_NoError(t, err) + defer fs.Stop() + + subjs := []string{"foo.foo", "foo.bar", "foo.baz", "bar.foo", "bar.bar", "bar.baz"} + msg := []byte("abc") + + for i := 0; i < 100_000; i++ { + subj := subjs[rand.Intn(len(subjs))] + fs.StoreMsg(subj, nil, msg, 0) + } + + expected := make([]uint64, 0, len(subjs)) + var smv StoreMsg + for _, subj := range subjs { + sm, err := fs.LoadLastMsg(subj, &smv) + require_NoError(t, err) + expected = append(expected, sm.seq) + } + slices.Sort(expected) + + seqs, err := fs.AllLastSeqs() + require_NoError(t, err) + require_True(t, reflect.DeepEqual(seqs, expected)) +} diff --git a/server/memstore.go b/server/memstore.go index 884cb139280..24b9400339b 100644 --- a/server/memstore.go +++ b/server/memstore.go @@ -637,6 +637,27 @@ func (ms *memStore) SubjectsState(subject string) map[string]SimpleState { return fss } +// AllLastSeqs will return a sorted list of last sequences for all subjects. 
+func (ms *memStore) AllLastSeqs() ([]uint64, error) { + ms.mu.RLock() + defer ms.mu.RUnlock() + + if len(ms.msgs) == 0 { + return nil, nil + } + + seqs := make([]uint64, 0, ms.fss.Size()) + ms.fss.IterFast(func(subj []byte, ss *SimpleState) bool { + seqs = append(seqs, ss.Last) + return true + }) + + slices.Sort(seqs) + return seqs, nil +} + +// MultiLastSeqs will return a sorted list of sequences that match all subjects presented in filters. +// We will not exceed the maxSeq, which if 0 becomes the store's last sequence. func (ms *memStore) MultiLastSeqs(filters []string, maxSeq uint64, maxAllowed int) ([]uint64, error) { ms.mu.RLock() defer ms.mu.RUnlock() @@ -650,7 +671,6 @@ func (ms *memStore) MultiLastSeqs(filters []string, maxSeq uint64, maxAllowed in maxSeq = ms.state.LastSeq } - //subs := make(map[string]*SimpleState) seqs := make([]uint64, 0, 64) seen := make(map[uint64]struct{}) diff --git a/server/memstore_test.go b/server/memstore_test.go index 1fa04165fc1..c8aa91cae6d 100644 --- a/server/memstore_test.go +++ b/server/memstore_test.go @@ -21,6 +21,7 @@ import ( "fmt" "math/rand" "reflect" + "slices" "testing" "time" @@ -1258,6 +1259,195 @@ func TestMemStoreSubjectDeleteMarkers(t *testing.T) { require_Equal(t, bytesToString(getHeader(JSMessageTTL, im.hdr)), "1s") } +func TestMemStoreSubjectDeleteMarkersOnPurge(t *testing.T) { + t.SkipNow() + + ms, err := newMemStore( + &StreamConfig{ + Name: "zzz", Subjects: []string{"test.*"}, Storage: MemoryStorage, + MaxAge: time.Second, AllowMsgTTL: true, + SubjectDeleteMarkerTTL: time.Second, + }, + ) + require_NoError(t, err) + defer ms.Stop() + + for i := 0; i < 10; i++ { + _, _, err := ms.StoreMsg(fmt.Sprintf("test.%d", i), nil, nil, 0) + require_NoError(t, err) + } + + _, err = ms.Purge() + require_NoError(t, err) + + for i := uint64(0); i < 10; i++ { + sm, err := ms.LoadMsg(11+i, nil) + require_NoError(t, err) + require_Equal(t, sm.subj, fmt.Sprintf("test.%d", i)) + require_Equal(t, 
bytesToString(getHeader(JSMarkerReason, sm.hdr)), JSMarkerReasonPurge) + require_Equal(t, bytesToString(getHeader(JSMessageTTL, sm.hdr)), "1s") + } +} + +func TestMemStoreSubjectDeleteMarkersOnPurgeEx(t *testing.T) { + t.SkipNow() + + ms, err := newMemStore( + &StreamConfig{ + Name: "zzz", Subjects: []string{"test.*"}, Storage: MemoryStorage, + MaxAge: time.Second, AllowMsgTTL: true, + SubjectDeleteMarkerTTL: time.Second, + }, + ) + require_NoError(t, err) + defer ms.Stop() + + for i := 0; i < 10; i++ { + _, _, err := ms.StoreMsg(fmt.Sprintf("test.%d", i), nil, nil, 0) + require_NoError(t, err) + } + + _, err = ms.PurgeEx("test.*", 1, 0) + require_NoError(t, err) + + for i := uint64(0); i < 10; i++ { + sm, err := ms.LoadMsg(11+i, nil) + require_NoError(t, err) + require_Equal(t, sm.subj, fmt.Sprintf("test.%d", i)) + require_Equal(t, bytesToString(getHeader(JSMarkerReason, sm.hdr)), JSMarkerReasonPurge) + require_Equal(t, bytesToString(getHeader(JSMessageTTL, sm.hdr)), "1s") + } +} + +func TestMemStoreSubjectDeleteMarkersOnPurgeExNoMarkers(t *testing.T) { + t.SkipNow() + + ms, err := newMemStore( + &StreamConfig{ + Name: "zzz", Subjects: []string{"test.*"}, Storage: MemoryStorage, + MaxAge: time.Second, AllowMsgTTL: true, + SubjectDeleteMarkerTTL: time.Second, + }, + ) + require_NoError(t, err) + defer ms.Stop() + + for i := 0; i < 10; i++ { + _, _, err := ms.StoreMsg(fmt.Sprintf("test.%d", i), nil, nil, 0) + require_NoError(t, err) + } + + _, err = ms.PurgeEx("test.*", 1, 0) + require_NoError(t, err) + + for i := uint64(0); i < 10; i++ { + _, err := ms.LoadMsg(11+i, nil) + require_Error(t, err) + } +} + +func TestMemStoreSubjectDeleteMarkersOnCompact(t *testing.T) { + t.SkipNow() + + ms, err := newMemStore( + &StreamConfig{ + Name: "zzz", Subjects: []string{"test.*"}, Storage: MemoryStorage, + MaxAge: time.Second, AllowMsgTTL: true, + SubjectDeleteMarkerTTL: time.Second, + }, + ) + require_NoError(t, err) + defer ms.Stop() + + for i := 0; i < 10; i++ { + _, _, err 
:= ms.StoreMsg(fmt.Sprintf("test.%d", i), nil, nil, 0) + require_NoError(t, err) + } + + _, err = ms.Compact(6) + require_NoError(t, err) + + for i := uint64(6); i <= 15; i++ { + sm, err := ms.LoadMsg(i, nil) + require_NoError(t, err) + if i <= 10 { + require_Equal(t, sm.subj, fmt.Sprintf("test.%d", i-1)) + require_Equal(t, bytesToString(getHeader(JSMarkerReason, sm.hdr)), _EMPTY_) + require_Equal(t, bytesToString(getHeader(JSMessageTTL, sm.hdr)), _EMPTY_) + } else { + require_Equal(t, sm.subj, fmt.Sprintf("test.%d", 15-i)) + require_Equal(t, bytesToString(getHeader(JSMarkerReason, sm.hdr)), JSMarkerReasonPurge) + require_Equal(t, bytesToString(getHeader(JSMessageTTL, sm.hdr)), "1s") + } + } +} + +func TestMemStoreSubjectDeleteMarkersOnRemoveMsg(t *testing.T) { + t.SkipNow() + + ms, err := newMemStore( + &StreamConfig{ + Name: "zzz", Subjects: []string{"test"}, Storage: MemoryStorage, + MaxAge: time.Second, AllowMsgTTL: true, + SubjectDeleteMarkerTTL: time.Second, + }, + ) + require_NoError(t, err) + defer ms.Stop() + + _, _, err = ms.StoreMsg("test", nil, nil, 0) + require_NoError(t, err) + + _, err = ms.RemoveMsg(1) + require_NoError(t, err) + + sm, err := ms.LoadMsg(2, nil) + require_NoError(t, err) + require_Equal(t, sm.subj, "test") + require_Equal(t, bytesToString(getHeader(JSMarkerReason, sm.hdr)), JSMarkerReasonRemove) + require_Equal(t, bytesToString(getHeader(JSMessageTTL, sm.hdr)), "1s") + + _, err = ms.RemoveMsg(2) + require_NoError(t, err) + + // The deleted subject marker at seq 2 should not have been replaced. 
+ _, err = ms.LoadMsg(3, nil) + require_Error(t, err) +} + +func TestMemStoreAllLastSeqs(t *testing.T) { + cfg := &StreamConfig{ + Name: "zzz", + Subjects: []string{"*.*"}, + MaxMsgsPer: 50, + Storage: MemoryStorage, + } + ms, err := newMemStore(cfg) + require_NoError(t, err) + defer ms.Stop() + + subjs := []string{"foo.foo", "foo.bar", "foo.baz", "bar.foo", "bar.bar", "bar.baz"} + msg := []byte("abc") + + for i := 0; i < 100_000; i++ { + subj := subjs[rand.Intn(len(subjs))] + ms.StoreMsg(subj, nil, msg, 0) + } + + expected := make([]uint64, 0, len(subjs)) + var smv StoreMsg + for _, subj := range subjs { + sm, err := ms.LoadLastMsg(subj, &smv) + require_NoError(t, err) + expected = append(expected, sm.seq) + } + slices.Sort(expected) + + seqs, err := ms.AllLastSeqs() + require_NoError(t, err) + require_True(t, reflect.DeepEqual(seqs, expected)) +} + /////////////////////////////////////////////////////////////////////////// // Benchmarks /////////////////////////////////////////////////////////////////////////// diff --git a/server/store.go b/server/store.go index ec7c6e8e4a4..308d0b08ee2 100644 --- a/server/store.go +++ b/server/store.go @@ -110,6 +110,7 @@ type StreamStore interface { FilteredState(seq uint64, subject string) SimpleState SubjectsState(filterSubject string) map[string]SimpleState SubjectsTotals(filterSubject string) map[string]uint64 + AllLastSeqs() ([]uint64, error) MultiLastSeqs(filters []string, maxSeq uint64, maxAllowed int) ([]uint64, error) NumPending(sseq uint64, filter string, lastPerSubject bool) (total, validThrough uint64) NumPendingMulti(sseq uint64, sl *Sublist, lastPerSubject bool) (total, validThrough uint64) From ce76f67415c162e9a9cecc18d6cb088ca55e8acd Mon Sep 17 00:00:00 2001 From: Neil Twigg Date: Thu, 17 Apr 2025 10:17:45 +0100 Subject: [PATCH 5/6] Reduce allocations in `MultiLastSeqs` and `indexCacheBuf` Signed-off-by: Neil Twigg --- server/filestore.go | 9 +++------ 1 file changed, 3 insertions(+), 6 deletions(-) diff --git 
a/server/filestore.go b/server/filestore.go index 89dc68614b3..78a2c9345a5 100644 --- a/server/filestore.go +++ b/server/filestore.go @@ -3176,7 +3176,7 @@ func (fs *fileStore) MultiLastSeqs(filters []string, maxSeq uint64, maxAllowed i // If we are equal or below just add to seqs slice. if ss.Last <= maxSeq { seqs = append(seqs, ss.Last) - delete(subs, string(bsubj)) + delete(subs, bytesToString(bsubj)) } else { // Need to search for the real last since recorded last is > maxSeq. if mb.cacheNotLoaded() { @@ -3184,10 +3184,7 @@ func (fs *fileStore) MultiLastSeqs(filters []string, maxSeq uint64, maxAllowed i } var smv StoreMsg fseq := atomic.LoadUint64(&mb.first.seq) - lseq := atomic.LoadUint64(&mb.last.seq) - if lseq > maxSeq { - lseq = maxSeq - } + lseq := min(atomic.LoadUint64(&mb.last.seq), maxSeq) ssubj := bytesToString(bsubj) for seq := lseq; seq >= fseq; seq-- { sm, _ := mb.cacheLookupNoCopy(seq, &smv) @@ -6637,7 +6634,7 @@ func (mb *msgBlock) indexCacheBuf(buf []byte) error { // TODO(nat): Not terribly optimal... if hasHeaders { if fsm, err := mb.msgFromBufNoCopy(buf[index:], &sm, nil); err == nil && fsm != nil { - if ttl := getHeader(JSMessageTTL, fsm.hdr); len(ttl) > 0 { + if ttl := sliceHeader(JSMessageTTL, fsm.hdr); len(ttl) > 0 { ttls++ } } From 852bfc7863d98c8b7339ae896c901353ba82fa91 Mon Sep 17 00:00:00 2001 From: Derek Collison Date: Thu, 17 Apr 2025 06:26:16 -0400 Subject: [PATCH 6/6] Updates based on PR feedback Signed-off-by: Derek Collison --- server/filestore_test.go | 12 +-- server/memstore.go | 22 ++++++ server/memstore_test.go | 158 +-------------------------------------- 3 files changed, 29 insertions(+), 163 deletions(-) diff --git a/server/filestore_test.go b/server/filestore_test.go index 60fee412dd1..a08654c305c 100644 --- a/server/filestore_test.go +++ b/server/filestore_test.go @@ -3670,6 +3670,12 @@ func TestFileStorePurgeExWithSubject(t *testing.T) { // Make sure we have our state file prior to Purge call. 
fs.forceWriteFullState() + // Capture the current index.db file. + sfile := filepath.Join(fcfg.StoreDir, msgDir, streamStreamStateFile) + buf, err := os.ReadFile(sfile) + require_NoError(t, err) + require_True(t, len(buf) > 0) + // This should purge all "foo.1" p, err := fs.PurgeEx("foo.1", 1, 0) require_NoError(t, err) @@ -3679,12 +3685,6 @@ func TestFileStorePurgeExWithSubject(t *testing.T) { require_Equal(t, state.Msgs, 2) require_Equal(t, state.FirstSeq, 1) - // Capture the current index.db file if it exists. - sfile := filepath.Join(fcfg.StoreDir, msgDir, streamStreamStateFile) - buf, err := os.ReadFile(sfile) - require_NoError(t, err) - require_True(t, len(buf) > 0) - // Make sure we can recover same state. fs.Stop() fs, err = newFileStoreWithCreated(fcfg, cfg, created, prf(&fcfg), nil) diff --git a/server/memstore.go b/server/memstore.go index 24b9400339b..708485981b1 100644 --- a/server/memstore.go +++ b/server/memstore.go @@ -656,6 +656,23 @@ func (ms *memStore) AllLastSeqs() ([]uint64, error) { return seqs, nil } +// Helper to determine if the filter(s) represent all the subjects. +// Most clients send in subjects even if they match the stream's ingest subjects. +// Lock should be held. +func (ms *memStore) filterIsAll(filters []string) bool { + if len(filters) != len(ms.cfg.Subjects) { + return false + } + // Sort so we can compare. + slices.Sort(filters) + for i, subj := range filters { + if !subjectIsSubsetMatch(ms.cfg.Subjects[i], subj) { + return false + } + } + return true +} + // MultiLastSeqs will return a sorted list of sequences that match all subjects presented in filters. // We will not exceed the maxSeq, which if 0 becomes the store's last sequence. 
func (ms *memStore) MultiLastSeqs(filters []string, maxSeq uint64, maxAllowed int) ([]uint64, error) { @@ -666,6 +683,11 @@ func (ms *memStore) MultiLastSeqs(filters []string, maxSeq uint64, maxAllowed in return nil, nil } + // See if we can short circuit if we think they are asking for all last sequences and have no maxSeq or maxAllowed set. + if maxSeq == 0 && maxAllowed <= 0 && ms.filterIsAll(filters) { + return ms.AllLastSeqs() + } + // Implied last sequence. if maxSeq == 0 { maxSeq = ms.state.LastSeq diff --git a/server/memstore_test.go b/server/memstore_test.go index c8aa91cae6d..77ab3a2c22c 100644 --- a/server/memstore_test.go +++ b/server/memstore_test.go @@ -921,7 +921,7 @@ func TestMemStoreSkipMsgs(t *testing.T) { func TestMemStoreMultiLastSeqs(t *testing.T) { cfg := &StreamConfig{ Name: "zzz", - Subjects: []string{"foo.*"}, + Subjects: []string{"foo.*", "bar.*"}, Storage: MemoryStorage, } ms, err := newMemStore(cfg) @@ -1259,162 +1259,6 @@ func TestMemStoreSubjectDeleteMarkers(t *testing.T) { require_Equal(t, bytesToString(getHeader(JSMessageTTL, im.hdr)), "1s") } -func TestMemStoreSubjectDeleteMarkersOnPurge(t *testing.T) { - t.SkipNow() - - ms, err := newMemStore( - &StreamConfig{ - Name: "zzz", Subjects: []string{"test.*"}, Storage: MemoryStorage, - MaxAge: time.Second, AllowMsgTTL: true, - SubjectDeleteMarkerTTL: time.Second, - }, - ) - require_NoError(t, err) - defer ms.Stop() - - for i := 0; i < 10; i++ { - _, _, err := ms.StoreMsg(fmt.Sprintf("test.%d", i), nil, nil, 0) - require_NoError(t, err) - } - - _, err = ms.Purge() - require_NoError(t, err) - - for i := uint64(0); i < 10; i++ { - sm, err := ms.LoadMsg(11+i, nil) - require_NoError(t, err) - require_Equal(t, sm.subj, fmt.Sprintf("test.%d", i)) - require_Equal(t, bytesToString(getHeader(JSMarkerReason, sm.hdr)), JSMarkerReasonPurge) - require_Equal(t, bytesToString(getHeader(JSMessageTTL, sm.hdr)), "1s") - } -} - -func TestMemStoreSubjectDeleteMarkersOnPurgeEx(t *testing.T) { - t.SkipNow() - 
- ms, err := newMemStore( - &StreamConfig{ - Name: "zzz", Subjects: []string{"test.*"}, Storage: MemoryStorage, - MaxAge: time.Second, AllowMsgTTL: true, - SubjectDeleteMarkerTTL: time.Second, - }, - ) - require_NoError(t, err) - defer ms.Stop() - - for i := 0; i < 10; i++ { - _, _, err := ms.StoreMsg(fmt.Sprintf("test.%d", i), nil, nil, 0) - require_NoError(t, err) - } - - _, err = ms.PurgeEx("test.*", 1, 0) - require_NoError(t, err) - - for i := uint64(0); i < 10; i++ { - sm, err := ms.LoadMsg(11+i, nil) - require_NoError(t, err) - require_Equal(t, sm.subj, fmt.Sprintf("test.%d", i)) - require_Equal(t, bytesToString(getHeader(JSMarkerReason, sm.hdr)), JSMarkerReasonPurge) - require_Equal(t, bytesToString(getHeader(JSMessageTTL, sm.hdr)), "1s") - } -} - -func TestMemStoreSubjectDeleteMarkersOnPurgeExNoMarkers(t *testing.T) { - t.SkipNow() - - ms, err := newMemStore( - &StreamConfig{ - Name: "zzz", Subjects: []string{"test.*"}, Storage: MemoryStorage, - MaxAge: time.Second, AllowMsgTTL: true, - SubjectDeleteMarkerTTL: time.Second, - }, - ) - require_NoError(t, err) - defer ms.Stop() - - for i := 0; i < 10; i++ { - _, _, err := ms.StoreMsg(fmt.Sprintf("test.%d", i), nil, nil, 0) - require_NoError(t, err) - } - - _, err = ms.PurgeEx("test.*", 1, 0) - require_NoError(t, err) - - for i := uint64(0); i < 10; i++ { - _, err := ms.LoadMsg(11+i, nil) - require_Error(t, err) - } -} - -func TestMemStoreSubjectDeleteMarkersOnCompact(t *testing.T) { - t.SkipNow() - - ms, err := newMemStore( - &StreamConfig{ - Name: "zzz", Subjects: []string{"test.*"}, Storage: MemoryStorage, - MaxAge: time.Second, AllowMsgTTL: true, - SubjectDeleteMarkerTTL: time.Second, - }, - ) - require_NoError(t, err) - defer ms.Stop() - - for i := 0; i < 10; i++ { - _, _, err := ms.StoreMsg(fmt.Sprintf("test.%d", i), nil, nil, 0) - require_NoError(t, err) - } - - _, err = ms.Compact(6) - require_NoError(t, err) - - for i := uint64(6); i <= 15; i++ { - sm, err := ms.LoadMsg(i, nil) - require_NoError(t, 
err) - if i <= 10 { - require_Equal(t, sm.subj, fmt.Sprintf("test.%d", i-1)) - require_Equal(t, bytesToString(getHeader(JSMarkerReason, sm.hdr)), _EMPTY_) - require_Equal(t, bytesToString(getHeader(JSMessageTTL, sm.hdr)), _EMPTY_) - } else { - require_Equal(t, sm.subj, fmt.Sprintf("test.%d", 15-i)) - require_Equal(t, bytesToString(getHeader(JSMarkerReason, sm.hdr)), JSMarkerReasonPurge) - require_Equal(t, bytesToString(getHeader(JSMessageTTL, sm.hdr)), "1s") - } - } -} - -func TestMemStoreSubjectDeleteMarkersOnRemoveMsg(t *testing.T) { - t.SkipNow() - - ms, err := newMemStore( - &StreamConfig{ - Name: "zzz", Subjects: []string{"test"}, Storage: MemoryStorage, - MaxAge: time.Second, AllowMsgTTL: true, - SubjectDeleteMarkerTTL: time.Second, - }, - ) - require_NoError(t, err) - defer ms.Stop() - - _, _, err = ms.StoreMsg("test", nil, nil, 0) - require_NoError(t, err) - - _, err = ms.RemoveMsg(1) - require_NoError(t, err) - - sm, err := ms.LoadMsg(2, nil) - require_NoError(t, err) - require_Equal(t, sm.subj, "test") - require_Equal(t, bytesToString(getHeader(JSMarkerReason, sm.hdr)), JSMarkerReasonRemove) - require_Equal(t, bytesToString(getHeader(JSMessageTTL, sm.hdr)), "1s") - - _, err = ms.RemoveMsg(2) - require_NoError(t, err) - - // The deleted subject marker at seq 2 should not have been replaced. - _, err = ms.LoadMsg(3, nil) - require_Error(t, err) -} - func TestMemStoreAllLastSeqs(t *testing.T) { cfg := &StreamConfig{ Name: "zzz",