Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 3 additions & 2 deletions server/filestore.go
Original file line number Diff line number Diff line change
Expand Up @@ -8429,8 +8429,6 @@ func (fs *fileStore) removeMsgBlockFromList(mb *msgBlock) {
// Removes the msgBlock
// Both locks should be held.
func (fs *fileStore) removeMsgBlock(mb *msgBlock) {
mb.dirtyCloseWithRemove(true)
fs.removeMsgBlockFromList(mb)
// Check for us being last message block
if mb == fs.lmb {
lseq, lts := atomic.LoadUint64(&mb.last.seq), mb.last.ts
Expand All @@ -8442,6 +8440,9 @@ func (fs *fileStore) removeMsgBlock(mb *msgBlock) {
}
mb.mu.Lock()
}
// Only delete message block after (potentially) writing a new lmb.
mb.dirtyCloseWithRemove(true)
fs.removeMsgBlockFromList(mb)
}

// Called by purge to simply get rid of the cache and close our fds.
Expand Down
102 changes: 102 additions & 0 deletions server/filestore_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9398,3 +9398,105 @@ func TestFileStoreRecoverWithEmptyMessageBlock(t *testing.T) {
}
})
}

func TestFileStoreRemoveMsgBlockFirst(t *testing.T) {
s := RunBasicJetStreamServer(t)
defer s.Shutdown()

dir := t.TempDir()

fs, err := newFileStore(
FileStoreConfig{StoreDir: dir, srv: s},
StreamConfig{Name: "zzz", Subjects: []string{"test"}, Storage: FileStorage})
require_NoError(t, err)
defer fs.Stop()

_, _, err = fs.StoreMsg("test", nil, nil, 0)
require_NoError(t, err)

var ss StreamState
fs.FastState(&ss)
require_Equal(t, ss.Msgs, 1)
require_Equal(t, ss.FirstSeq, 1)
require_Equal(t, ss.LastSeq, 1)

fs.Stop()

for _, f := range []string{streamStreamStateFile, "1.blk"} {
fn := filepath.Join(dir, msgDir, f)
require_NoError(t, os.RemoveAll(fn))
}

fs, err = newFileStore(
FileStoreConfig{StoreDir: dir, srv: s},
StreamConfig{Name: "zzz", Subjects: []string{"test"}, Storage: FileStorage, AllowMsgTTL: true})
require_NoError(t, err)
defer fs.Stop()

// If the block is removed first, we have nothing to recover. So starting out empty would be expected.
fs.FastState(&ss)
require_Equal(t, ss.Msgs, 0)
require_Equal(t, ss.FirstSeq, 0)
require_Equal(t, ss.LastSeq, 0)
}

func TestFileStoreRemoveMsgBlockLast(t *testing.T) {
s := RunBasicJetStreamServer(t)
defer s.Shutdown()

dir := t.TempDir()

fs, err := newFileStore(
FileStoreConfig{StoreDir: dir, srv: s},
StreamConfig{Name: "zzz", Subjects: []string{"test"}, Storage: FileStorage})
require_NoError(t, err)
defer fs.Stop()

_, _, err = fs.StoreMsg("test", nil, nil, 0)
require_NoError(t, err)

var ss StreamState
fs.FastState(&ss)
require_Equal(t, ss.Msgs, 1)
require_Equal(t, ss.FirstSeq, 1)
require_Equal(t, ss.LastSeq, 1)

// Copy first block so we can put it back later.
ofn := filepath.Join(dir, msgDir, "1.blk")
nfn := filepath.Join(dir, msgDir, "1.blk.cp")
require_NoError(t, os.Rename(ofn, nfn))

// Removing the last message will result in '2.blk' to be created, and '1.blk' to be removed.
_, err = fs.RemoveMsg(1)
require_NoError(t, err)
_, err = os.Stat(filepath.Join(dir, msgDir, "2.blk"))
require_NoError(t, err)
_, err = os.Stat(ofn)
require_True(t, os.IsNotExist(err))

fs.Stop()

// Remove index.db so we need to recover based on blocks.
fn := filepath.Join(dir, msgDir, streamStreamStateFile)
require_NoError(t, os.RemoveAll(fn))

// Put back '1.blk' file to simulate being hard killed right
// after creating '2.blk' but before cleaning up '1.blk'.
require_NoError(t, os.Rename(nfn, ofn))
_, err = os.Stat(ofn)
require_False(t, os.IsNotExist(err))

fs, err = newFileStore(
FileStoreConfig{StoreDir: dir, srv: s},
StreamConfig{Name: "zzz", Subjects: []string{"test"}, Storage: FileStorage, AllowMsgTTL: true})
require_NoError(t, err)
defer fs.Stop()

// Should recognize correct state, and remove '1.blk'.
fs.FastState(&ss)
require_Equal(t, ss.Msgs, 0)
require_Equal(t, ss.FirstSeq, 2)
require_Equal(t, ss.LastSeq, 1)
_, err = os.Stat(ofn)
require_True(t, os.IsNotExist(err))
}
Loading