Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
111 changes: 101 additions & 10 deletions server/filestore.go
Original file line number Diff line number Diff line change
Expand Up @@ -2339,7 +2339,7 @@ func (fs *fileStore) recoverMsgs() error {
for _, mb := range emptyBlks {
// Need the mb lock here.
mb.mu.Lock()
fs.removeMsgBlock(mb)
fs.forceRemoveMsgBlock(mb)
mb.mu.Unlock()
}
}
Expand Down Expand Up @@ -4669,7 +4669,7 @@ func (fs *fileStore) rebuildFirst() {
fmb.mu.RUnlock()
if isEmpty {
fmb.mu.Lock()
fs.removeMsgBlock(fmb)
fs.forceRemoveMsgBlock(fmb)
fmb.mu.Unlock()
}
fs.selectNextFirst()
Expand Down Expand Up @@ -5102,8 +5102,8 @@ func (fs *fileStore) removeMsg(seq uint64, secure, viaLimits, needFSLock bool) (
isLastBlock := mb == fs.lmb
isEmpty := mb.msgs == 0

// If erase but block is empty, we can simply remove the block later.
if secure && !isEmpty {
// Must always perform the erase, even if the block is empty as it could contain tombstones.
if secure {
// Grab record info, but use the pre-computed record length.
ri, _, _, err := mb.slotInfo(int(seq - mb.cache.fseq))
if err != nil {
Expand Down Expand Up @@ -5222,15 +5222,15 @@ func (mb *msgBlock) shouldCompactSync() bool {
// This will compact and rewrite this block. This version will not process any tombstone cleanup.
// Write lock needs to be held.
func (mb *msgBlock) compact() {
mb.compactWithFloor(0)
mb.compactWithFloor(0, nil)
}

// This will compact and rewrite this block. This should only be called when we know we want to rewrite this block.
// This should not be called on the lmb since we will prune tail deleted messages which could cause issues with
// writing new messages. We will silently bail on any issues with the underlying block and let someone else detect.
// if fseq > 0 we will attempt to cleanup stale tombstones.
// Write lock needs to be held.
func (mb *msgBlock) compactWithFloor(floor uint64) {
func (mb *msgBlock) compactWithFloor(floor uint64, fsDmap *avl.SequenceSet) {
wasLoaded := mb.cache != nil && mb.cacheAlreadyLoaded()
if !wasLoaded {
if err := mb.loadMsgsWithLock(); err != nil {
Expand Down Expand Up @@ -5275,7 +5275,9 @@ func (mb *msgBlock) compactWithFloor(floor uint64) {
// If this entry is for a lower seq than ours then keep around.
// We also check that it is greater than our floor. Floor is zero on normal
// calls to compact.
if seq < fseq && seq >= floor {
// If the global delete map is set, check if a tombstone is still
// referencing a message in another block. If not, it can be removed.
if seq < fseq && seq >= floor && (fsDmap == nil || fsDmap.Exists(seq)) {
nbuf = append(nbuf, buf[index:index+rl]...)
}
} else {
Expand Down Expand Up @@ -6791,6 +6793,9 @@ func (fs *fileStore) syncBlocks() {
fs.firstMoved = false
fs.mu.Unlock()

var fsDmapLoaded bool
var fsDmap avl.SequenceSet

var markDirty bool
for _, mb := range blks {
// Do actual sync. Hold lock for consistency.
Expand Down Expand Up @@ -6828,9 +6833,16 @@ func (fs *fileStore) syncBlocks() {
// Check if we should compact here.
// Need to hold fs lock in case we reference psim when loading in the mb and we may remove this block if truly empty.
if needsCompact {
// Load a delete map containing only interior deletes.
// This is used when compacting to know if tombstones are still relevant,
// and if not they can be compacted.
if !fsDmapLoaded {
fsDmapLoaded = true
fsDmap = fs.deleteMap()
}
fs.mu.RLock()
mb.mu.Lock()
mb.compactWithFloor(firstSeq)
mb.compactWithFloor(firstSeq, &fsDmap)
// If this compact removed all raw bytes due to tombstone cleanup, schedule to remove.
shouldRemove := mb.rbytes == 0
mb.mu.Unlock()
Expand Down Expand Up @@ -9112,6 +9124,55 @@ func (mb *msgBlock) tombsLocked() []msgId {
return tombs
}

// Return number of tombstones for messages prior to this msgBlock.
// Both locks should be held.
// Write lock should be held for block.
func (mb *msgBlock) numPriorTombsLocked() int {
if mb.cacheNotLoaded() {
if err := mb.loadMsgsWithLock(); err != nil {
return 0
}
}
defer mb.finishedWithCache()

var fseq uint64
var tombs int
var le = binary.LittleEndian
buf := mb.cache.buf

for index, lbuf := uint32(0), uint32(len(buf)); index < lbuf; {
if index+msgHdrSize > lbuf {
return tombs
}
hdr := buf[index : index+msgHdrSize]
rl, seq := le.Uint32(hdr[0:]), le.Uint64(hdr[4:])
// Clear any headers bit that could be set.
rl &^= hbit
// Check for tombstones.
if seq&tbit != 0 {
seq = seq &^ tbit
// Tombstones below the global first seq are irrelevant.
// And we only count tombstones below this block's first seq.
if seq >= mb.fs.state.FirstSeq && (fseq == 0 || seq < fseq) {
tombs++
}
index += rl
continue
}
if seq == 0 || seq&ebit != 0 {
index += rl
continue
}
// Advance to next record.
index += rl
if fseq == 0 {
fseq = seq
}
}

return tombs
}

// Truncate will truncate a stream store up to seq. Sequence needs to be valid.
func (fs *fileStore) Truncate(seq uint64) error {
// Check for request to reset.
Expand Down Expand Up @@ -9360,7 +9421,10 @@ func (fs *fileStore) removeMsgBlock(mb *msgBlock) {
fs.writeTombstone(lseq, lts)
}
// Only delete message block after (potentially) writing a tombstone.
fs.forceRemoveMsgBlock(mb)
// But only if it doesn't contain any tombstones for prior blocks.
if mb.numPriorTombsLocked() == 0 {
fs.forceRemoveMsgBlock(mb)
}
}

// Removes the msgBlock, without writing tombstones to ensure the last sequence is preserved.
Expand Down Expand Up @@ -10584,7 +10648,18 @@ func (fs *fileStore) deleteBlocks() DeleteBlocks {
// Detect if we have a gap between these blocks.
fseq := atomic.LoadUint64(&mb.first.seq)
if prevLast > 0 && prevLast+1 != fseq {
dbs = append(dbs, &DeleteRange{First: prevLast + 1, Num: fseq - prevLast - 1})
var reuseGap bool
if len(dbs) > 0 {
// Detect multiple blocks that only contain large gaps. We can simply make
// the previous gap larger to account for these, instead of adding a new range.
if dr, ok := dbs[len(dbs)-1].(*DeleteRange); ok {
dr.Num += fseq - prevLast - 1
reuseGap = true
}
}
if !reuseGap {
dbs = append(dbs, &DeleteRange{First: prevLast + 1, Num: fseq - prevLast - 1})
}
}
if mb.dmap.Size() > 0 {
dbs = append(dbs, &mb.dmap)
Expand All @@ -10594,6 +10669,22 @@ func (fs *fileStore) deleteBlocks() DeleteBlocks {
return dbs
}

// deleteMap returns all interior deletes for each block based on the mb.dmap.
// Specifically, this will not contain any deletes for blocks that have been removed.
// This is useful to know whether a tombstone is still relevant and marked as deleted by an active block.
// All blocks should be at least read locked.
func (fs *fileStore) deleteMap() (dmap avl.SequenceSet) {
for _, mb := range fs.blks {
if mb.dmap.Size() > 0 {
mb.dmap.Range(func(seq uint64) bool {
dmap.Insert(seq)
return true
})
}
}
return dmap
}

// SyncDeleted will make sure this stream has same deleted state as dbs.
// This will only process deleted state within our current state.
func (fs *fileStore) SyncDeleted(dbs DeleteBlocks) {
Expand Down
Loading