diff --git a/server/filestore.go b/server/filestore.go index 0faf571b83c..fab64bf65d1 100644 --- a/server/filestore.go +++ b/server/filestore.go @@ -5080,7 +5080,7 @@ func (mb *msgBlock) compactWithFloor(floor uint64) { // Grab info from a slot. // Lock should be held. func (mb *msgBlock) slotInfo(slot int) (uint32, uint32, bool, error) { - if mb.cache == nil || slot >= len(mb.cache.idx) { + if slot < 0 || mb.cache == nil || slot >= len(mb.cache.idx) { return 0, 0, false, errPartialCache } @@ -8682,6 +8682,10 @@ func (fs *fileStore) Truncate(seq uint64) error { return ErrStoreSnapshotInProgress } + // Any existing state file will no longer be applicable. We will force write a new one + // at the end, after we release the lock. + os.Remove(filepath.Join(fs.fcfg.StoreDir, msgDir, streamStreamStateFile)) + var lsm *StoreMsg smb := fs.selectMsgBlock(seq) if smb != nil { @@ -8708,9 +8712,15 @@ func (fs *fileStore) Truncate(seq uint64) error { return err } + // The selected message block needs to be removed if it needs to be fully truncated. + var removeSmb bool + if smb != nil { + removeSmb = atomic.LoadUint64(&smb.first.seq) > seq + } + // If the selected block is not found or the message was deleted, we'll need to write a tombstone // at the truncated sequence so we don't roll backward on our last sequence and timestamp. - if lsm == nil { + if lsm == nil || removeSmb { fs.writeTombstone(seq, lastTime) } @@ -8750,8 +8760,28 @@ func (fs *fileStore) Truncate(seq uint64) error { hasWrittenTombstones := len(tmb.tombs()) > 0 if smb != nil { - // Make sure writeable. smb.mu.Lock() + if removeSmb { + purged += smb.msgs + bytes += smb.bytes + + // We could have tombstones for messages before the truncated sequence. + if tombs := smb.tombsLocked(); len(tombs) > 0 { + // Temporarily unlock while we write tombstones. + smb.mu.Unlock() + for _, tomb := range tombs { + if tomb.seq < seq { + fs.writeTombstone(tomb.seq, tomb.ts) + } + } + smb.mu.Lock() + } + fs.removeMsgBlock(smb) + smb.mu.Unlock() + goto SKIP + } + + // Make sure writeable. if err := smb.enableForWriting(fs.fip); err != nil { smb.mu.Unlock() fs.mu.Unlock() @@ -8782,6 +8812,7 @@ func (fs *fileStore) Truncate(seq uint64) error { smb.mu.Unlock() } +SKIP: // If no tombstones were written, we can remove the block and // purely rely on the selected block as the last block. if !hasWrittenTombstones { @@ -8807,9 +8838,6 @@ func (fs *fileStore) Truncate(seq uint64) error { // Reset our subject lookup info. fs.resetGlobalPerSubjectInfo() - // Any existing state file no longer applicable. We will force write a new one - // after we release the lock. - os.Remove(filepath.Join(fs.fcfg.StoreDir, msgDir, streamStreamStateFile)) fs.dirty++ cb := fs.scb diff --git a/server/filestore_test.go b/server/filestore_test.go index 0b8528f10b3..91ea25ee51c 100644 --- a/server/filestore_test.go +++ b/server/filestore_test.go @@ -10064,6 +10064,63 @@ func TestFileStoreCompressionAfterTruncate(t *testing.T) { } } +func TestFileStoreTruncateRemovedBlock(t *testing.T) { + testFileStoreAllPermutations(t, func(t *testing.T, fcfg FileStoreConfig) { + cfg := StreamConfig{Name: "zzz", Subjects: []string{"foo"}, Storage: FileStorage} + created := time.Now() + fs, err := newFileStoreWithCreated(fcfg, cfg, created, prf(&fcfg), nil) + require_NoError(t, err) + defer fs.Stop() + + for i := range 3 { + if i > 0 { + _, err = fs.newMsgBlockForWrite() + require_NoError(t, err) + } + _, _, err = fs.StoreMsg("foo", nil, nil, 0) + require_NoError(t, err) + } + + fs.mu.RLock() + blks := len(fs.blks) + fs.mu.RUnlock() + require_Len(t, blks, 3) + + state := fs.State() + require_Equal(t, state.Msgs, 3) + require_Equal(t, state.FirstSeq, 1) + require_Equal(t, state.LastSeq, 3) + require_Equal(t, state.NumDeleted, 0) + + removed, err := fs.RemoveMsg(2) + require_NoError(t, err) + require_True(t, removed) + + fs.mu.RLock() + blks = len(fs.blks) + fs.mu.RUnlock() + require_Len(t, blks, 2) + + fs.mu.RLock() + blks = len(fs.blks) + fs.mu.RUnlock() + require_Len(t, blks, 2) + + state = fs.State() + require_Equal(t, state.Msgs, 2) + require_Equal(t, state.FirstSeq, 1) + require_Equal(t, state.LastSeq, 3) + require_Equal(t, state.NumDeleted, 1) + + require_NoError(t, fs.Truncate(2)) + state = fs.State() + require_Equal(t, state.Msgs, 1) + require_Equal(t, state.FirstSeq, 1) + require_Equal(t, state.LastSeq, 2) + require_Equal(t, state.NumDeleted, 1) + }) +} + func TestFileStoreAtomicEraseMsg(t *testing.T) { for _, lmb := range []bool{true, false} { t.Run(fmt.Sprintf("lmb=%v", lmb), func(t *testing.T) { diff --git a/server/raft.go b/server/raft.go index 139ee418927..9870b284d4e 100644 --- a/server/raft.go +++ b/server/raft.go @@ -498,12 +498,14 @@ func (s *Server) initRaftNode(accName string, cfg *RaftConfig, labels pprofLabel ae, err := n.loadEntry(index) if err != nil { n.warn("Could not load %d from WAL [%+v]: %v", index, state, err) - truncateAndErr(index) + // Truncate to the previous correct entry. + truncateAndErr(index - 1) break } if ae.pindex != index-1 { n.warn("Corrupt WAL, will truncate") - truncateAndErr(index) + // Truncate to the previous correct entry. + truncateAndErr(index - 1) break } n.processAppendEntry(ae, nil) diff --git a/server/raft_test.go b/server/raft_test.go index d6a9359b28f..84fcf1a5219 100644 --- a/server/raft_test.go +++ b/server/raft_test.go @@ -3158,6 +3158,101 @@ func TestNRGStepdownWithHighestTermDuringCatchup(t *testing.T) { require_Equal(t, n.pindex, 2) } +func TestNRGTruncateOnStartup(t *testing.T) { + c := createJetStreamClusterExplicit(t, "R3S", 3) + s := c.servers[0] // RunBasicJetStreamServer not available + defer c.shutdown() + + storeDir := t.TempDir() + fcfg := FileStoreConfig{StoreDir: storeDir, BlockSize: defaultMediumBlockSize, AsyncFlush: false, srv: s} + scfg := StreamConfig{Name: "RAFT", Storage: FileStorage} + fs, err := newFileStore(fcfg, scfg) + require_NoError(t, err) + + cfg := &RaftConfig{Name: "TEST", Store: storeDir, Log: fs} + + err = s.bootstrapRaftNode(cfg, nil, false) + require_NoError(t, err) + n, err := s.initRaftNode(globalAccountName, cfg, pprofLabels{}) + require_NoError(t, err) + + // Create a sample entry, the content doesn't matter, just that it's stored. + esm := encodeStreamMsgAllowCompress("foo", "_INBOX.foo", nil, nil, 0, 0, true) + entries := []*Entry{newEntry(EntryNormal, esm)} + + nats0 := "S1Nunr6R" // "nats-0" + aeMsg1 := encode(t, &appendEntry{leader: nats0, term: 1, commit: 0, pterm: 0, pindex: 0, entries: entries}) + aeMsg2 := encode(t, &appendEntry{leader: nats0, term: 1, commit: 0, pterm: 1, pindex: 1, entries: entries}) + + // Store two messages the normal way. + n.processAppendEntry(aeMsg1, n.aesub) + n.processAppendEntry(aeMsg2, n.aesub) + require_Equal(t, n.pindex, 2) + + state := n.wal.State() + require_Equal(t, state.Msgs, 2) + require_Equal(t, state.FirstSeq, 1) + require_Equal(t, state.LastSeq, 2) + require_Equal(t, state.NumDeleted, 0) + + // Simulate a truncation that's only performed halfway, and we got hard killed at some point. + removed, err := n.wal.RemoveMsg(2) + require_True(t, removed) + require_NoError(t, err) + + state = n.wal.State() + require_Equal(t, state.Msgs, 1) + require_Equal(t, state.FirstSeq, 1) + require_Equal(t, state.LastSeq, 2) + require_Equal(t, state.NumDeleted, 1) + + // Restart. + n.Stop() + n.WaitForStop() + require_NoError(t, fs.Stop()) + fs, err = newFileStore(fcfg, scfg) + require_NoError(t, err) + cfg = &RaftConfig{Name: "TEST", Store: storeDir, Log: fs} + n, err = s.initRaftNode(globalAccountName, cfg, pprofLabels{}) + require_NoError(t, err) + + // Should truncate the WAL on startup, the message was removed. + state = n.wal.State() + require_Equal(t, state.Msgs, 1) + require_Equal(t, state.FirstSeq, 1) + require_Equal(t, state.LastSeq, 1) + require_Equal(t, state.NumDeleted, 0) + + // Store an invalid append entry manually. + aeMsg2.pindex = 0 + aeMsg2 = encode(t, aeMsg2) + _, _, err = n.wal.StoreMsg(_EMPTY_, nil, aeMsg2.buf, 0) + require_NoError(t, err) + + state = n.wal.State() + require_Equal(t, state.Msgs, 2) + require_Equal(t, state.FirstSeq, 1) + require_Equal(t, state.LastSeq, 2) + require_Equal(t, state.NumDeleted, 0) + + // Restart. + n.Stop() + n.WaitForStop() + require_NoError(t, fs.Stop()) + fs, err = newFileStore(fcfg, scfg) + require_NoError(t, err) + cfg = &RaftConfig{Name: "TEST", Store: storeDir, Log: fs} + n, err = s.initRaftNode(globalAccountName, cfg, pprofLabels{}) + require_NoError(t, err) + + // Should truncate the WAL on startup, the append entry is invalid. + state = n.wal.State() + require_Equal(t, state.Msgs, 1) + require_Equal(t, state.FirstSeq, 1) + require_Equal(t, state.LastSeq, 1) + require_Equal(t, state.NumDeleted, 0) +} + // This is a RaftChainOfBlocks test where a block is proposed and then we wait for all replicas to apply it before // proposing the next one. // The test may fail if: