From e88b9d4f70d9abe27b37efe95abf10fe809946eb Mon Sep 17 00:00:00 2001 From: souravagrawal Date: Wed, 9 Apr 2025 18:29:29 +0530 Subject: [PATCH 1/2] Delete lmb only after successfully creating a new msg block Signed-off-by: souravagrawal --- server/filestore.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/server/filestore.go b/server/filestore.go index 67fe6cda67f..cebb96ec35a 100644 --- a/server/filestore.go +++ b/server/filestore.go @@ -8429,8 +8429,6 @@ func (fs *fileStore) removeMsgBlockFromList(mb *msgBlock) { // Removes the msgBlock // Both locks should be held. func (fs *fileStore) removeMsgBlock(mb *msgBlock) { - mb.dirtyCloseWithRemove(true) - fs.removeMsgBlockFromList(mb) // Check for us being last message block if mb == fs.lmb { lseq, lts := atomic.LoadUint64(&mb.last.seq), mb.last.ts @@ -8442,6 +8440,8 @@ func (fs *fileStore) removeMsgBlock(mb *msgBlock) { } mb.mu.Lock() } + mb.dirtyCloseWithRemove(true) + fs.removeMsgBlockFromList(mb) } // Called by purge to simply get rid of the cache and close our fds. From 3b70f6a66aa8ab8ddfa1e5e5fb4e121423d0de4a Mon Sep 17 00:00:00 2001 From: Maurice van Veen Date: Fri, 11 Apr 2025 10:21:41 +0200 Subject: [PATCH 2/2] Test hard kill during removeMsgBlock Signed-off-by: Maurice van Veen --- server/filestore.go | 1 + server/filestore_test.go | 102 +++++++++++++++++++++++++++++++++++++++ 2 files changed, 103 insertions(+) diff --git a/server/filestore.go b/server/filestore.go index cebb96ec35a..a6c482ed584 100644 --- a/server/filestore.go +++ b/server/filestore.go @@ -8440,6 +8440,7 @@ func (fs *fileStore) removeMsgBlock(mb *msgBlock) { } mb.mu.Lock() } + // Only delete message block after (potentially) writing a new lmb. mb.dirtyCloseWithRemove(true) fs.removeMsgBlockFromList(mb) } diff --git a/server/filestore_test.go b/server/filestore_test.go index 5831e268f77..8a4c871354d 100644 --- a/server/filestore_test.go +++ b/server/filestore_test.go @@ -9398,3 +9398,105 @@ func TestFileStoreRecoverWithEmptyMessageBlock(t *testing.T) { } }) } + +func TestFileStoreRemoveMsgBlockFirst(t *testing.T) { + s := RunBasicJetStreamServer(t) + defer s.Shutdown() + + dir := t.TempDir() + + fs, err := newFileStore( + FileStoreConfig{StoreDir: dir, srv: s}, + StreamConfig{Name: "zzz", Subjects: []string{"test"}, Storage: FileStorage}) + require_NoError(t, err) + defer fs.Stop() + + _, _, err = fs.StoreMsg("test", nil, nil, 0) + require_NoError(t, err) + + var ss StreamState + fs.FastState(&ss) + require_Equal(t, ss.Msgs, 1) + require_Equal(t, ss.FirstSeq, 1) + require_Equal(t, ss.LastSeq, 1) + + fs.Stop() + + for _, f := range []string{streamStreamStateFile, "1.blk"} { + fn := filepath.Join(dir, msgDir, f) + require_NoError(t, os.RemoveAll(fn)) + } + + fs, err = newFileStore( + FileStoreConfig{StoreDir: dir, srv: s}, + StreamConfig{Name: "zzz", Subjects: []string{"test"}, Storage: FileStorage, AllowMsgTTL: true}) + require_NoError(t, err) + defer fs.Stop() + + // If the block is removed first, we have nothing to recover. So starting out empty would be expected. + fs.FastState(&ss) + require_Equal(t, ss.Msgs, 0) + require_Equal(t, ss.FirstSeq, 0) + require_Equal(t, ss.LastSeq, 0) +} + +func TestFileStoreRemoveMsgBlockLast(t *testing.T) { + s := RunBasicJetStreamServer(t) + defer s.Shutdown() + + dir := t.TempDir() + + fs, err := newFileStore( + FileStoreConfig{StoreDir: dir, srv: s}, + StreamConfig{Name: "zzz", Subjects: []string{"test"}, Storage: FileStorage}) + require_NoError(t, err) + defer fs.Stop() + + _, _, err = fs.StoreMsg("test", nil, nil, 0) + require_NoError(t, err) + + var ss StreamState + fs.FastState(&ss) + require_Equal(t, ss.Msgs, 1) + require_Equal(t, ss.FirstSeq, 1) + require_Equal(t, ss.LastSeq, 1) + + // Copy first block so we can put it back later. + ofn := filepath.Join(dir, msgDir, "1.blk") + nfn := filepath.Join(dir, msgDir, "1.blk.cp") + require_NoError(t, os.Rename(ofn, nfn)) + + // Removing the last message will result in '2.blk' to be created, and '1.blk' to be removed. + _, err = fs.RemoveMsg(1) + require_NoError(t, err) + _, err = os.Stat(filepath.Join(dir, msgDir, "2.blk")) + require_NoError(t, err) + _, err = os.Stat(ofn) + require_True(t, os.IsNotExist(err)) + + fs.Stop() + + // Remove index.db so we need to recover based on blocks. + fn := filepath.Join(dir, msgDir, streamStreamStateFile) + require_NoError(t, os.RemoveAll(fn)) + + // Put back '1.blk' file to simulate being hard killed right + // after creating '2.blk' but before cleaning up '1.blk'. + require_NoError(t, os.Rename(nfn, ofn)) + _, err = os.Stat(ofn) + require_False(t, os.IsNotExist(err)) + + fs, err = newFileStore( + FileStoreConfig{StoreDir: dir, srv: s}, + StreamConfig{Name: "zzz", Subjects: []string{"test"}, Storage: FileStorage, AllowMsgTTL: true}) + require_NoError(t, err) + defer fs.Stop() + + // Should recognize correct state, and remove '1.blk'. + fs.FastState(&ss) + require_Equal(t, ss.Msgs, 0) + require_Equal(t, ss.FirstSeq, 2) + require_Equal(t, ss.LastSeq, 1) + _, err = os.Stat(ofn) + require_True(t, os.IsNotExist(err)) +}