diff --git a/server/filestore.go b/server/filestore.go index 62e23a63df9..5de4b91d712 100644 --- a/server/filestore.go +++ b/server/filestore.go @@ -4096,8 +4096,9 @@ func (mb *msgBlock) compact() { if !isDeleted(seq) { // Check for tombstones. if seq&tbit != 0 { - // If we are last mb we should consider to keep these unless the tombstone reflects a seq in this mb. - if mb == mb.fs.lmb && seq < fseq { + seq = seq &^ tbit + // If this entry is for a lower seq than ours then keep around. + if seq < fseq { nbuf = append(nbuf, buf[index:index+rl]...) } } else { @@ -4153,6 +4154,9 @@ func (mb *msgBlock) compact() { return } + // Make sure to sync + mb.needSync = true + // Capture the updated rbytes. mb.rbytes = uint64(len(nbuf)) @@ -6994,6 +6998,9 @@ func (fs *fileStore) Compact(seq uint64) (uint64, error) { if smb != fs.lmb { smb.dirtyCloseWithRemove(true) deleted++ + } else { + // Make sure to sync changes. + smb.needSync = true } // Update fs first here as well. fs.state.FirstSeq = atomic.LoadUint64(&smb.last.seq) + 1 diff --git a/server/filestore_test.go b/server/filestore_test.go index 564512f5756..68dd1f90a4c 100644 --- a/server/filestore_test.go +++ b/server/filestore_test.go @@ -6897,6 +6897,53 @@ func TestFileStoreFSSExpireNumPending(t *testing.T) { fs.mu.RUnlock() } +// We want to ensure that recovery of deleted messages survives no index.db and compactions. +func TestFileStoreRecoverWithRemovesAndNoIndexDB(t *testing.T) { + sd := t.TempDir() + fs, err := newFileStore( + FileStoreConfig{StoreDir: sd, BlockSize: 250}, + StreamConfig{Name: "zzz", Subjects: []string{"foo.*"}, Storage: FileStorage}) + require_NoError(t, err) + defer fs.Stop() + + msg := []byte("abc") + for i := 1; i <= 10; i++ { + fs.StoreMsg(fmt.Sprintf("foo.%d", i), nil, msg) + } + fs.RemoveMsg(1) + fs.RemoveMsg(2) + fs.RemoveMsg(8) + + var ss StreamState + fs.FastState(&ss) + require_Equal(t, ss.FirstSeq, 3) + require_Equal(t, ss.LastSeq, 10) + require_Equal(t, ss.Msgs, 7) + + // Compact last block. + fs.mu.RLock() + lmb := fs.lmb + fs.mu.RUnlock() + lmb.mu.Lock() + lmb.compact() + lmb.mu.Unlock() + // Stop but remove index.db + sfile := filepath.Join(sd, msgDir, streamStreamStateFile) + fs.Stop() + os.Remove(sfile) + + fs, err = newFileStore( + FileStoreConfig{StoreDir: sd}, + StreamConfig{Name: "zzz", Subjects: []string{"foo.*"}, Storage: FileStorage}) + require_NoError(t, err) + defer fs.Stop() + + fs.FastState(&ss) + require_Equal(t, ss.FirstSeq, 3) + require_Equal(t, ss.LastSeq, 10) + require_Equal(t, ss.Msgs, 7) +} + /////////////////////////////////////////////////////////////////////////// // Benchmarks ///////////////////////////////////////////////////////////////////////////