diff --git a/server/filestore.go b/server/filestore.go index c6e594e557f..61605837f24 100644 --- a/server/filestore.go +++ b/server/filestore.go @@ -2339,7 +2339,7 @@ func (fs *fileStore) recoverMsgs() error { for _, mb := range emptyBlks { // Need the mb lock here. mb.mu.Lock() - fs.removeMsgBlock(mb) + fs.forceRemoveMsgBlock(mb) mb.mu.Unlock() } } @@ -4669,7 +4669,7 @@ func (fs *fileStore) rebuildFirst() { fmb.mu.RUnlock() if isEmpty { fmb.mu.Lock() - fs.removeMsgBlock(fmb) + fs.forceRemoveMsgBlock(fmb) fmb.mu.Unlock() } fs.selectNextFirst() @@ -5102,8 +5102,8 @@ func (fs *fileStore) removeMsg(seq uint64, secure, viaLimits, needFSLock bool) ( isLastBlock := mb == fs.lmb isEmpty := mb.msgs == 0 - // If erase but block is empty, we can simply remove the block later. - if secure && !isEmpty { + // Must always perform the erase, even if the block is empty as it could contain tombstones. + if secure { // Grab record info, but use the pre-computed record length. ri, _, _, err := mb.slotInfo(int(seq - mb.cache.fseq)) if err != nil { @@ -5222,7 +5222,7 @@ func (mb *msgBlock) shouldCompactSync() bool { // This will compact and rewrite this block. This version will not process any tombstone cleanup. // Write lock needs to be held. func (mb *msgBlock) compact() { - mb.compactWithFloor(0) + mb.compactWithFloor(0, nil) } // This will compact and rewrite this block. This should only be called when we know we want to rewrite this block. @@ -5230,7 +5230,7 @@ func (mb *msgBlock) compact() { // writing new messages. We will silently bail on any issues with the underlying block and let someone else detect. // if fseq > 0 we will attempt to cleanup stale tombstones. // Write lock needs to be held. 
-func (mb *msgBlock) compactWithFloor(floor uint64) { +func (mb *msgBlock) compactWithFloor(floor uint64, fsDmap *avl.SequenceSet) { wasLoaded := mb.cache != nil && mb.cacheAlreadyLoaded() if !wasLoaded { if err := mb.loadMsgsWithLock(); err != nil { @@ -5275,7 +5275,9 @@ func (mb *msgBlock) compactWithFloor(floor uint64) { // If this entry is for a lower seq than ours then keep around. // We also check that it is greater than our floor. Floor is zero on normal // calls to compact. - if seq < fseq && seq >= floor { + // If the global delete map is set, check if a tombstone is still + // referencing a message in another block. If not, it can be removed. + if seq < fseq && seq >= floor && (fsDmap == nil || fsDmap.Exists(seq)) { nbuf = append(nbuf, buf[index:index+rl]...) } } else { @@ -6791,6 +6793,9 @@ func (fs *fileStore) syncBlocks() { fs.firstMoved = false fs.mu.Unlock() + var fsDmapLoaded bool + var fsDmap avl.SequenceSet + var markDirty bool for _, mb := range blks { // Do actual sync. Hold lock for consistency. @@ -6828,9 +6833,16 @@ func (fs *fileStore) syncBlocks() { // Check if we should compact here. // Need to hold fs lock in case we reference psim when loading in the mb and we may remove this block if truly empty. if needsCompact { + // Load a delete map containing only interior deletes. + // This is used when compacting to know if tombstones are still relevant, + // and if not they can be compacted. + if !fsDmapLoaded { + fsDmapLoaded = true + fsDmap = fs.deleteMap() + } fs.mu.RLock() mb.mu.Lock() - mb.compactWithFloor(firstSeq) + mb.compactWithFloor(firstSeq, &fsDmap) // If this compact removed all raw bytes due to tombstone cleanup, schedule to remove. shouldRemove := mb.rbytes == 0 mb.mu.Unlock() @@ -9112,6 +9124,55 @@ func (mb *msgBlock) tombsLocked() []msgId { return tombs } +// Return number of tombstones for messages prior to this msgBlock. +// Both locks should be held. +// Write lock should be held for block. 
+func (mb *msgBlock) numPriorTombsLocked() int { + if mb.cacheNotLoaded() { + if err := mb.loadMsgsWithLock(); err != nil { + return 0 + } + } + defer mb.finishedWithCache() + + var fseq uint64 + var tombs int + var le = binary.LittleEndian + buf := mb.cache.buf + + for index, lbuf := uint32(0), uint32(len(buf)); index < lbuf; { + if index+msgHdrSize > lbuf { + return tombs + } + hdr := buf[index : index+msgHdrSize] + rl, seq := le.Uint32(hdr[0:]), le.Uint64(hdr[4:]) + // Clear any headers bit that could be set. + rl &^= hbit + // Check for tombstones. + if seq&tbit != 0 { + seq = seq &^ tbit + // Tombstones below the global first seq are irrelevant. + // And we only count tombstones below this block's first seq. + if seq >= mb.fs.state.FirstSeq && (fseq == 0 || seq < fseq) { + tombs++ + } + index += rl + continue + } + if seq == 0 || seq&ebit != 0 { + index += rl + continue + } + // Advance to next record. + index += rl + if fseq == 0 { + fseq = seq + } + } + + return tombs +} + // Truncate will truncate a stream store up to seq. Sequence needs to be valid. func (fs *fileStore) Truncate(seq uint64) error { // Check for request to reset. @@ -9360,7 +9421,10 @@ func (fs *fileStore) removeMsgBlock(mb *msgBlock) { fs.writeTombstone(lseq, lts) } // Only delete message block after (potentially) writing a tombstone. - fs.forceRemoveMsgBlock(mb) + // But only if it doesn't contain any tombstones for prior blocks. + if mb.numPriorTombsLocked() == 0 { + fs.forceRemoveMsgBlock(mb) + } } // Removes the msgBlock, without writing tombstones to ensure the last sequence is preserved. @@ -10584,7 +10648,18 @@ func (fs *fileStore) deleteBlocks() DeleteBlocks { // Detect if we have a gap between these blocks. fseq := atomic.LoadUint64(&mb.first.seq) if prevLast > 0 && prevLast+1 != fseq { - dbs = append(dbs, &DeleteRange{First: prevLast + 1, Num: fseq - prevLast - 1}) + var reuseGap bool + if len(dbs) > 0 { + // Detect multiple blocks that only contain large gaps. 
We can simply make + // the previous gap larger to account for these, instead of adding a new range. + if dr, ok := dbs[len(dbs)-1].(*DeleteRange); ok { + dr.Num += fseq - prevLast - 1 + reuseGap = true + } + } + if !reuseGap { + dbs = append(dbs, &DeleteRange{First: prevLast + 1, Num: fseq - prevLast - 1}) + } } if mb.dmap.Size() > 0 { dbs = append(dbs, &mb.dmap) @@ -10594,6 +10669,22 @@ func (fs *fileStore) deleteBlocks() DeleteBlocks { return dbs } +// deleteMap returns all interior deletes for each block based on the mb.dmap. +// Specifically, this will not contain any deletes for blocks that have been removed. +// This is useful to know whether a tombstone is still relevant and marked as deleted by an active block. +// All blocks should be at least read locked. +func (fs *fileStore) deleteMap() (dmap avl.SequenceSet) { + for _, mb := range fs.blks { + if mb.dmap.Size() > 0 { + mb.dmap.Range(func(seq uint64) bool { + dmap.Insert(seq) + return true + }) + } + } + return dmap +} + // SyncDeleted will make sure this stream has same deleted state as dbs. // This will only process deleted state within our current state. func (fs *fileStore) SyncDeleted(dbs DeleteBlocks) { diff --git a/server/filestore_test.go b/server/filestore_test.go index 22f3f2c5323..43e85125f60 100644 --- a/server/filestore_test.go +++ b/server/filestore_test.go @@ -10682,6 +10682,74 @@ func TestFileStoreEraseMsgDoesNotLoseTombstones(t *testing.T) { }) } +func TestFileStoreEraseMsgDoesNotLoseTombstonesInEmptyBlock(t *testing.T) { + testFileStoreAllPermutations(t, func(t *testing.T, fcfg FileStoreConfig) { + cfg := StreamConfig{Name: "zzz", Subjects: []string{"foo"}, Storage: FileStorage} + created := time.Now() + fs, err := newFileStoreWithCreated(fcfg, cfg, created, prf(&fcfg), nil) + require_NoError(t, err) + defer fs.Stop() + + // The first message will remain throughout. 
+ _, _, err = fs.StoreMsg("foo", nil, nil, 0) + require_NoError(t, err) + // The second message will be removed, so a tombstone will be placed. + _, _, err = fs.StoreMsg("foo", nil, nil, 0) + require_NoError(t, err) + + mb, err := fs.newMsgBlockForWrite() + require_NoError(t, err) + + secret := []byte("secret!") + // The third message is secret and will be erased. + _, _, err = fs.StoreMsg("foo", nil, secret, 0) + require_NoError(t, err) + + // Removing the second message places a tombstone. + _, err = fs.RemoveMsg(2) + require_NoError(t, err) + + // Now we erase the third message. + // This erases this message and should not lose the tombstone that comes after it. + // It should do the erase, even if the block would be empty afterward as it could contain tombstones. + _, err = fs.EraseMsg(3) + require_NoError(t, err) + + before := fs.State() + require_Equal(t, before.Msgs, 1) + require_Equal(t, before.FirstSeq, 1) + require_Equal(t, before.LastSeq, 3) + require_True(t, slices.Equal(before.Deleted, []uint64{2, 3})) + + _, err = fs.LoadMsg(2, nil) + require_Error(t, err, errDeletedMsg) + _, err = fs.LoadMsg(3, nil) + require_Error(t, err, ErrStoreMsgNotFound) + + // The message should be erased. + buf, err := mb.loadBlock(nil) + require_NoError(t, err) + require_False(t, bytes.Contains(buf, secret)) + + // Make sure we can recover properly with no index.db present. 
+ fs.Stop() + os.Remove(filepath.Join(fs.fcfg.StoreDir, msgDir, streamStreamStateFile)) + + fs, err = newFileStoreWithCreated(fcfg, cfg, created, prf(&fcfg), nil) + require_NoError(t, err) + defer fs.Stop() + + if state := fs.State(); !reflect.DeepEqual(state, before) { + t.Fatalf("Expected state\n of %+v, \ngot %+v without index.db state", before, state) + } + + _, err = fs.LoadMsg(2, nil) + require_Error(t, err, errDeletedMsg) + _, err = fs.LoadMsg(3, nil) + require_Error(t, err, ErrStoreMsgNotFound) + }) +} + func TestFileStoreTombstonesNoFirstSeqRollback(t *testing.T) { testFileStoreAllPermutations(t, func(t *testing.T, fcfg FileStoreConfig) { fcfg.BlockSize = 10 * 33 // 10 messages per block. @@ -11133,3 +11201,160 @@ func TestFileStoreIdxAccountingForSkipMsgs(t *testing.T) { t.Run("SkipMsg", func(t *testing.T) { test(t, false) }) t.Run("SkipMsgs", func(t *testing.T) { test(t, true) }) } + +func TestFileStoreEmptyBlockContainsPriorTombstones(t *testing.T) { + testFileStoreAllPermutations(t, func(t *testing.T, fcfg FileStoreConfig) { + cfg := StreamConfig{Name: "zzz", Subjects: []string{"foo"}, Storage: FileStorage} + created := time.Now() + fs, err := newFileStoreWithCreated(fcfg, cfg, created, prf(&fcfg), nil) + require_NoError(t, err) + defer fs.Stop() + + // 1.blk + _, _, err = fs.StoreMsg("foo", nil, nil, 0) + require_NoError(t, err) + _, _, err = fs.StoreMsg("foo", nil, nil, 0) + require_NoError(t, err) + + // 2.blk + _, err = fs.newMsgBlockForWrite() + require_NoError(t, err) + _, _, err = fs.StoreMsg("foo", nil, nil, 0) + require_NoError(t, err) + _, err = fs.RemoveMsg(2) + require_NoError(t, err) + _, err = fs.RemoveMsg(3) // Will create a new lmb with this as tombstone. 
+ require_NoError(t, err) + + before := fs.State() + require_Equal(t, before.Msgs, 1) + require_Equal(t, before.FirstSeq, 1) + require_Equal(t, before.LastSeq, 3) + require_True(t, slices.Equal(before.Deleted, []uint64{2, 3})) + + fs.mu.RLock() + lblks := len(fs.blks) + fs.mu.RUnlock() + require_Equal(t, lblks, 3) + + _, err = fs.LoadMsg(2, nil) + require_Error(t, err, errDeletedMsg) + _, err = fs.LoadMsg(3, nil) + require_Error(t, err, ErrStoreMsgNotFound) + + // Make sure we can recover properly with no index.db present. + fs.Stop() + os.Remove(filepath.Join(fs.fcfg.StoreDir, msgDir, streamStreamStateFile)) + + fs, err = newFileStoreWithCreated(fcfg, cfg, created, prf(&fcfg), nil) + require_NoError(t, err) + defer fs.Stop() + + if state := fs.State(); !reflect.DeepEqual(state, before) { + t.Fatalf("Expected state\n of %+v, \ngot %+v without index.db state", before, state) + } + + fs.mu.RLock() + lblks = len(fs.blks) + fs.mu.RUnlock() + require_Equal(t, lblks, 3) + + _, err = fs.LoadMsg(2, nil) + require_Error(t, err, errDeletedMsg) + _, err = fs.LoadMsg(3, nil) + require_Error(t, err, ErrStoreMsgNotFound) + + // 3.blk + _, _, err = fs.StoreMsg("foo", nil, nil, 0) + require_NoError(t, err) + + fs.mu.RLock() + lblks = len(fs.blks) + fs.mu.RUnlock() + require_Equal(t, lblks, 3) + + // Removing the first message moves the first seq up. + // Should also remove blocks without any messages and (invalidated) tombstones. 
+ _, err = fs.RemoveMsg(1) + require_NoError(t, err) + + fs.mu.RLock() + lblks = len(fs.blks) + fs.mu.RUnlock() + require_Equal(t, lblks, 1) + }) +} + +func TestFileStoreCompactTombstonesBelowFirstSeq(t *testing.T) { + testFileStoreAllPermutations(t, func(t *testing.T, fcfg FileStoreConfig) { + cfg := StreamConfig{Name: "zzz", Subjects: []string{"foo"}, Storage: FileStorage} + created := time.Now() + fs, err := newFileStoreWithCreated(fcfg, cfg, created, prf(&fcfg), nil) + require_NoError(t, err) + defer fs.Stop() + + // 1.blk + _, _, err = fs.StoreMsg("foo", nil, nil, 0) + require_NoError(t, err) + _, _, err = fs.StoreMsg("foo", nil, nil, 0) + require_NoError(t, err) + _, _, err = fs.StoreMsg("foo", nil, nil, 0) + require_NoError(t, err) + + // 2.blk + _, err = fs.newMsgBlockForWrite() + require_NoError(t, err) + _, _, err = fs.StoreMsg("foo", nil, nil, 0) + require_NoError(t, err) + _, err = fs.RemoveMsg(3) + require_NoError(t, err) + _, err = fs.RemoveMsg(2) + require_NoError(t, err) + + state := fs.State() + require_Equal(t, state.Msgs, 2) + require_Equal(t, state.FirstSeq, 1) + require_Equal(t, state.LastSeq, 4) + require_True(t, slices.Equal(state.Deleted, []uint64{2, 3})) + + fs.mu.RLock() + lblks := len(fs.blks) + fs.mu.RUnlock() + require_Equal(t, lblks, 2) + + // Block should report the two prior tombstones. + fs.mu.Lock() + lmb := fs.lmb + lmb.mu.Lock() + priorTombs := lmb.numPriorTombsLocked() + lmb.mu.Unlock() + fs.mu.Unlock() + require_Equal(t, priorTombs, 2) + + // The first sequence moves up as a result of the removal. + _, err = fs.RemoveMsg(1) + require_NoError(t, err) + + // Block should now report no prior tombstones, since they are now invalid. + fs.mu.Lock() + lmb.mu.Lock() + priorTombs = lmb.numPriorTombsLocked() + lmb.mu.Unlock() + fs.mu.Unlock() + require_Equal(t, priorTombs, 0) + + // Make sure we have a new last block such that we can compact. 
+ _, err = fs.newMsgBlockForWrite() + require_NoError(t, err) + + lmb.mu.RLock() + rbytes := lmb.rbytes + lmb.mu.RUnlock() + require_True(t, lmb.shouldCompactSync()) + fs.syncBlocks() + + lmb.mu.RLock() + defer lmb.mu.RUnlock() + require_NotEqual(t, lmb.rbytes, rbytes) + }) +} diff --git a/server/norace_2_test.go b/server/norace_2_test.go index e401894c4bd..f2fb148834d 100644 --- a/server/norace_2_test.go +++ b/server/norace_2_test.go @@ -456,7 +456,7 @@ func TestNoRaceFilestoreBinaryStreamSnapshotEncodingLargeGaps(t *testing.T) { storeDir := t.TempDir() fcfg := FileStoreConfig{ StoreDir: storeDir, - BlockSize: 512, // Small on purpose to create alot of blks. + BlockSize: 512, // Small on purpose to create a lot of blks. } fs, err := newFileStore(fcfg, StreamConfig{Name: "zzz", Subjects: []string{"zzz"}, Storage: FileStorage}) require_NoError(t, err) @@ -473,19 +473,22 @@ func TestNoRaceFilestoreBinaryStreamSnapshotEncodingLargeGaps(t *testing.T) { } fs.StoreMsg(subj, nil, msg, 0) + // The tombstones from above will only be cleaned up when syncing blocks. + fs.syncBlocks() + snap, err := fs.EncodedStreamState(0) require_NoError(t, err) - require_True(t, len(snap) < 512) + require_LessThan(t, len(snap), 512) // Now decode the snapshot. ss, err := DecodeStreamState(snap) require_NoError(t, err) - require_True(t, ss.FirstSeq == 1) - require_True(t, ss.LastSeq == 20_000) - require_True(t, ss.Msgs == 2) - require_True(t, len(ss.Deleted) <= 2) - require_True(t, ss.Deleted.NumDeleted() == 19_998) + require_Equal(t, ss.FirstSeq, 1) + require_Equal(t, ss.LastSeq, 20_000) + require_Equal(t, ss.Msgs, 2) + require_Equal(t, len(ss.Deleted), 2) + require_Equal(t, ss.Deleted.NumDeleted(), 19_998) } func TestNoRaceJetStreamClusterStreamSnapshotCatchup(t *testing.T) {