diff --git a/server/filestore.go b/server/filestore.go index 78a2c9345a5..1a9e42465ae 100644 --- a/server/filestore.go +++ b/server/filestore.go @@ -4602,9 +4602,9 @@ func (fs *fileStore) EraseMsg(seq uint64) (bool, error) { // Convenience function to remove per subject tracking at the filestore level. // Lock should be held. -func (fs *fileStore) removePerSubject(subj string) { +func (fs *fileStore) removePerSubject(subj string) uint64 { if len(subj) == 0 || fs.psim == nil { - return + return 0 } // We do not update sense of fblk here but will do so when we resolve during lookup. bsubj := stringToBytes(subj) @@ -4615,10 +4615,12 @@ func (fs *fileStore) removePerSubject(subj string) { } else if info.total == 0 { if _, ok = fs.psim.Delete(bsubj); ok { fs.tsl -= len(subj) - return + return 0 } } + return info.total } + return 0 } // Remove a message, optionally rewriting the mb file. @@ -5836,6 +5838,14 @@ func (mb *msgBlock) writeTombstone(seq uint64, ts int64) error { return mb.writeMsgRecord(emptyRecordLen, seq|tbit, _EMPTY_, nil, nil, ts, true) } +// Helper function to place a delete tombstone without flush. +// Lock should not be held. +func (mb *msgBlock) writeTombstoneNoFlush(seq uint64, ts int64) error { + mb.mu.Lock() + defer mb.mu.Unlock() + return mb.writeMsgRecordLocked(emptyRecordLen, seq|tbit, _EMPTY_, nil, nil, ts, false, false) +} + // Will write the message record to the underlying message block. // filestore lock will be held. func (mb *msgBlock) writeMsgRecord(rl, seq uint64, subj string, mhdr, msg []byte, ts int64, flush bool) error { @@ -6075,10 +6085,13 @@ func (fs *fileStore) checkLastBlock(rl uint64) (lmb *msgBlock, err error) { lmb = fs.lmb rbytes := lmb.blkSize() if lmb == nil || (rbytes > 0 && rbytes+rl > fs.fcfg.BlockSize) { - if lmb != nil && fs.fcfg.Compression != NoCompression { - // We've now reached the end of this message block, if we want - // to compress blocks then now's the time to do it. 
- go lmb.recompressOnDiskIfNeeded() + if lmb != nil { + lmb.flushPendingMsgs() + if fs.fcfg.Compression != NoCompression { + // We've now reached the end of this message block, if we want + // to compress blocks then now's the time to do it. + go lmb.recompressOnDiskIfNeeded() + } } if lmb, err = fs.newMsgBlockForWrite(); err != nil { return nil, err @@ -6124,18 +6137,12 @@ func (fs *fileStore) writeTombstone(seq uint64, ts int64) error { // This version does not flush contents. // Lock should be held. func (fs *fileStore) writeTombstoneNoFlush(seq uint64, ts int64) error { - // Grab our current last message block. - olmb := fs.lmb lmb, err := fs.checkLastBlock(emptyRecordLen) if err != nil { return err } - // If we swapped out our lmb, flush any pending. - if olmb != lmb { - olmb.flushPendingMsgs() - } // Write tombstone without flush or kick. - return lmb.writeTombstone(seq, ts) + return lmb.writeTombstoneNoFlush(seq, ts) } func (mb *msgBlock) recompressOnDiskIfNeeded() error { @@ -7942,6 +7949,11 @@ func (fs *fileStore) PurgeEx(subject string, sequence, keep uint64) (purged uint } } + // Make sure we do not leave subject empty if we reach this spot. + if subject == _EMPTY_ { + subject = fwcs + } + eq, wc := compareFn(subject), subjectHasWildcard(subject) var firstSeqNeedsUpdate bool var bytes uint64 @@ -7990,6 +8002,8 @@ func (fs *fileStore) PurgeEx(subject string, sequence, keep uint64) (purged uint shouldExpire = true } + var nrg uint64 // Number of remaining messages globally after removal from psim. + for seq, te := f, len(tombs); seq <= l; seq++ { if sm, _ := mb.cacheLookupNoCopy(seq, &smv); sm != nil && eq(sm.subj, subject) { rl := fileStoreMsgSize(sm.subj, sm.hdr, sm.msg) @@ -8013,8 +8027,8 @@ func (fs *fileStore) PurgeEx(subject string, sequence, keep uint64) (purged uint bytes += rl } // PSIM and FSS updates. 
- mb.removeSeqPerSubject(sm.subj, seq) - fs.removePerSubject(sm.subj) + nr := mb.removeSeqPerSubject(sm.subj, seq) + nrg = fs.removePerSubject(sm.subj) // Track tombstones we need to write. tombs = append(tombs, msgId{sm.seq, sm.ts}) @@ -8044,6 +8058,11 @@ func (fs *fileStore) PurgeEx(subject string, sequence, keep uint64) (purged uint if mb.isEmpty() || (maxp > 0 && purged >= maxp) { break } + // Also break if we know we have no more messages matching here. + // This is only applicable for non-wildcarded filters. + if !wc && nr == 0 { + break + } } } // Expire if we were responsible for loading and we do not seem to be doing successive purgeEx calls. @@ -8058,6 +8077,10 @@ func (fs *fileStore) PurgeEx(subject string, sequence, keep uint64) (purged uint if maxp > 0 && purged >= maxp { break } + // Also check if not wildcarded and we have no remaining matches. + if !wc && nrg == 0 { + break + } } if firstSeqNeedsUpdate { fs.selectNextFirst() @@ -8074,6 +8097,7 @@ func (fs *fileStore) PurgeEx(subject string, sequence, keep uint64) (purged uint return purged, err } } + // Flush any pending. If we change blocks the checkLastBlock() will flush any pending for us. if lmb := fs.lmb; lmb != nil { lmb.flushPendingMsgs() } @@ -8727,21 +8751,21 @@ func (mb *msgBlock) dirtyCloseWithRemove(remove bool) error { // Remove a seq from the fss and select new first. // Lock should be held. 
-func (mb *msgBlock) removeSeqPerSubject(subj string, seq uint64) { +func (mb *msgBlock) removeSeqPerSubject(subj string, seq uint64) uint64 { mb.ensurePerSubjectInfoLoaded() if mb.fss == nil { - return + return 0 } bsubj := stringToBytes(subj) ss, ok := mb.fss.Find(bsubj) if !ok || ss == nil { - return + return 0 } mb.fs.sdm.removeSeqAndSubject(seq, subj) if ss.Msgs == 1 { mb.fss.Delete(bsubj) - return + return 0 } ss.Msgs-- @@ -8751,18 +8775,20 @@ func (mb *msgBlock) removeSeqPerSubject(subj string, seq uint64) { if !ss.lastNeedsUpdate && seq != ss.Last { ss.First = ss.Last ss.firstNeedsUpdate = false - return + return 1 } if !ss.firstNeedsUpdate && seq != ss.First { ss.Last = ss.First ss.lastNeedsUpdate = false - return + return 1 } } // We can lazily calculate the first/last sequence when needed. ss.firstNeedsUpdate = seq == ss.First || ss.firstNeedsUpdate ss.lastNeedsUpdate = seq == ss.Last || ss.lastNeedsUpdate + + return ss.Msgs } // Will recalculate the first and/or last sequence for this subject in this block. 
diff --git a/server/filestore_test.go b/server/filestore_test.go index a08654c305c..74665610afd 100644 --- a/server/filestore_test.go +++ b/server/filestore_test.go @@ -9529,7 +9529,7 @@ func TestFileStoreRemoveMsgBlockLast(t *testing.T) { func TestFileStoreAllLastSeqs(t *testing.T) { fs, err := newFileStore( - FileStoreConfig{StoreDir: t.TempDir()}, // Make block size small to test multiblock selections with maxSeq + FileStoreConfig{StoreDir: t.TempDir()}, StreamConfig{Name: "zzz", Subjects: []string{"*.*"}, MaxMsgsPer: 50, Storage: FileStorage}) require_NoError(t, err) defer fs.Stop() diff --git a/server/norace_2_test.go b/server/norace_2_test.go index 12b0c3baf07..ccafb450a3c 100644 --- a/server/norace_2_test.go +++ b/server/norace_2_test.go @@ -3352,3 +3352,46 @@ func TestNoRaceJetStreamClusterConsumerDeleteInterestPolicyUniqueFiltersPerf(t * } expectedStreamMsgs(0) } + +func TestNoRaceFileStorePurgeExAsyncTombstones(t *testing.T) { + fs, err := newFileStore( + FileStoreConfig{StoreDir: t.TempDir()}, + StreamConfig{Name: "zzz", Subjects: []string{"*.*"}, Storage: FileStorage}) + require_NoError(t, err) + defer fs.Stop() + + msg := []byte("zzz") + + fs.StoreMsg("foo.A", nil, msg, 0) + fs.StoreMsg("foo.B", nil, msg, 0) + for i := 0; i < 500; i++ { + fs.StoreMsg("foo.C", nil, msg, 0) + } + fs.StoreMsg("foo.D", nil, msg, 0) + + // Load all blocks to avoid that being a factor in timing. + fs.mu.RLock() + for _, mb := range fs.blks { + mb.loadMsgs() + } + fs.mu.RUnlock() + + // Now purge 1 that is not the first message and take note of time. + // Since we are loaded this should mostly be the time to write / flush tombstones. 
+ start := time.Now() + n, err := fs.PurgeEx("foo.B", 0, 0) + elapsed := time.Since(start) + require_NoError(t, err) + require_Equal(t, n, 1) + + start = time.Now() + n, err = fs.PurgeEx("foo.C", 0, 0) + elapsed2 := time.Since(start) + require_NoError(t, err) + require_Equal(t, n, 500) + + // If we flushed for each tombstone, the second elapsed time would be a much larger multiple of the single message purge time. + // In testing that multiple is >200x. + // With async writes and a single flush for all tombstones it is ~30x. + require_True(t, elapsed*50 > elapsed2) +} diff --git a/server/sdm.go b/server/sdm.go index 4de53582cae..7431479580b 100644 --- a/server/sdm.go +++ b/server/sdm.go @@ -1,3 +1,16 @@ +// Copyright 2025 The NATS Authors +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + package server import "time"