From c40b96b7e65c2bfe8022641768f8fb5c8e0e3798 Mon Sep 17 00:00:00 2001 From: souravagrawal Date: Wed, 9 Apr 2025 18:29:29 +0530 Subject: [PATCH 1/2] Delete lmb only after successfully creating a new msg block Signed-off-by: souravagrawal --- server/filestore.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/server/filestore.go b/server/filestore.go index 5bbd8df98fe..cf25a19e4fc 100644 --- a/server/filestore.go +++ b/server/filestore.go @@ -8452,8 +8452,6 @@ func (fs *fileStore) removeMsgBlockFromList(mb *msgBlock) { // Removes the msgBlock // Both locks should be held. func (fs *fileStore) removeMsgBlock(mb *msgBlock) { - mb.dirtyCloseWithRemove(true) - fs.removeMsgBlockFromList(mb) // Check for us being last message block if mb == fs.lmb { lseq, lts := atomic.LoadUint64(&mb.last.seq), mb.last.ts @@ -8465,6 +8463,8 @@ func (fs *fileStore) removeMsgBlock(mb *msgBlock) { } mb.mu.Lock() } + mb.dirtyCloseWithRemove(true) + fs.removeMsgBlockFromList(mb) } // Called by purge to simply get rid of the cache and close our fds. From ed4de99736e9dc89564fc253f1b15991acfd5ad9 Mon Sep 17 00:00:00 2001 From: Maurice van Veen Date: Fri, 11 Apr 2025 10:21:41 +0200 Subject: [PATCH 2/2] Test hard kill during removeMsgBlock Signed-off-by: Maurice van Veen --- server/filestore.go | 1 + server/filestore_test.go | 102 +++++++++++++++++++++++++++++++++++++++ 2 files changed, 103 insertions(+) diff --git a/server/filestore.go b/server/filestore.go index cf25a19e4fc..3042453b2a2 100644 --- a/server/filestore.go +++ b/server/filestore.go @@ -8463,6 +8463,7 @@ func (fs *fileStore) removeMsgBlock(mb *msgBlock) { } mb.mu.Lock() } + // Only delete message block after (potentially) writing a new lmb. 
mb.dirtyCloseWithRemove(true) fs.removeMsgBlockFromList(mb) } diff --git a/server/filestore_test.go b/server/filestore_test.go index 4c384af2408..22a37d95c40 100644 --- a/server/filestore_test.go +++ b/server/filestore_test.go @@ -9341,3 +9341,105 @@ func TestFileStoreRecoverWithEmptyMessageBlock(t *testing.T) { } }) } + +func TestFileStoreRemoveMsgBlockFirst(t *testing.T) { + s := RunBasicJetStreamServer(t) + defer s.Shutdown() + + dir := t.TempDir() + + fs, err := newFileStore( + FileStoreConfig{StoreDir: dir, srv: s}, + StreamConfig{Name: "zzz", Subjects: []string{"test"}, Storage: FileStorage}) + require_NoError(t, err) + defer fs.Stop() + + _, _, err = fs.StoreMsg("test", nil, nil, 0) + require_NoError(t, err) + + var ss StreamState + fs.FastState(&ss) + require_Equal(t, ss.Msgs, 1) + require_Equal(t, ss.FirstSeq, 1) + require_Equal(t, ss.LastSeq, 1) + + fs.Stop() + + for _, f := range []string{streamStreamStateFile, "1.blk"} { + fn := filepath.Join(dir, msgDir, f) + require_NoError(t, os.RemoveAll(fn)) + } + + fs, err = newFileStore( + FileStoreConfig{StoreDir: dir, srv: s}, + StreamConfig{Name: "zzz", Subjects: []string{"test"}, Storage: FileStorage, AllowMsgTTL: true}) + require_NoError(t, err) + defer fs.Stop() + + // If the block is removed first, we have nothing to recover. So starting out empty would be expected. 
+ fs.FastState(&ss) + require_Equal(t, ss.Msgs, 0) + require_Equal(t, ss.FirstSeq, 0) + require_Equal(t, ss.LastSeq, 0) +} + +func TestFileStoreRemoveMsgBlockLast(t *testing.T) { + s := RunBasicJetStreamServer(t) + defer s.Shutdown() + + dir := t.TempDir() + + fs, err := newFileStore( + FileStoreConfig{StoreDir: dir, srv: s}, + StreamConfig{Name: "zzz", Subjects: []string{"test"}, Storage: FileStorage}) + require_NoError(t, err) + defer fs.Stop() + + _, _, err = fs.StoreMsg("test", nil, nil, 0) + require_NoError(t, err) + + var ss StreamState + fs.FastState(&ss) + require_Equal(t, ss.Msgs, 1) + require_Equal(t, ss.FirstSeq, 1) + require_Equal(t, ss.LastSeq, 1) + + // Move first block aside so we can put it back later. + ofn := filepath.Join(dir, msgDir, "1.blk") + nfn := filepath.Join(dir, msgDir, "1.blk.cp") + require_NoError(t, os.Rename(ofn, nfn)) + + // Removing the last message will result in '2.blk' being created, and '1.blk' being removed. + _, err = fs.RemoveMsg(1) + require_NoError(t, err) + _, err = os.Stat(filepath.Join(dir, msgDir, "2.blk")) + require_NoError(t, err) + _, err = os.Stat(ofn) + require_True(t, os.IsNotExist(err)) + + fs.Stop() + + // Remove index.db so we need to recover based on blocks. + fn := filepath.Join(dir, msgDir, streamStreamStateFile) + require_NoError(t, os.RemoveAll(fn)) + + // Put back '1.blk' file to simulate being hard killed right + // after creating '2.blk' but before cleaning up '1.blk'. + require_NoError(t, os.Rename(nfn, ofn)) + _, err = os.Stat(ofn) + require_False(t, os.IsNotExist(err)) + + fs, err = newFileStore( + FileStoreConfig{StoreDir: dir, srv: s}, + StreamConfig{Name: "zzz", Subjects: []string{"test"}, Storage: FileStorage, AllowMsgTTL: true}) + require_NoError(t, err) + defer fs.Stop() + + // Should recognize correct state, and remove '1.blk'. 
+ fs.FastState(&ss) + require_Equal(t, ss.Msgs, 0) + require_Equal(t, ss.FirstSeq, 2) + require_Equal(t, ss.LastSeq, 1) + _, err = os.Stat(ofn) + require_True(t, os.IsNotExist(err)) +}