From 2cd01871a90efbbb4d89cd4c53177361986ed196 Mon Sep 17 00:00:00 2001 From: Maurice van Veen Date: Wed, 16 Jul 2025 18:01:54 +0200 Subject: [PATCH] [FIXED] Panic when selectMsgBlock returns nil Signed-off-by: Maurice van Veen --- server/filestore.go | 35 +++++++++++++++++++---------------- server/filestore_test.go | 39 +++++++++++++++++++++++++++++++++++++++ 2 files changed, 58 insertions(+), 16 deletions(-) diff --git a/server/filestore.go b/server/filestore.go index b32bd623075..734b6d58b0b 100644 --- a/server/filestore.go +++ b/server/filestore.go @@ -1917,31 +1917,34 @@ func (fs *fileStore) recoverTTLState() error { defer fs.resetAgeChk(0) if fs.state.Msgs > 0 && ttlseq <= fs.state.LastSeq { fs.warn("TTL state is outdated; attempting to recover using linear scan (seq %d to %d)", ttlseq, fs.state.LastSeq) - var sm StoreMsg - mb := fs.selectMsgBlock(ttlseq) - if mb == nil { - return nil - } - mblseq := atomic.LoadUint64(&mb.last.seq) + var ( + mb *msgBlock + sm StoreMsg + mblseq uint64 + ) for seq := ttlseq; seq <= fs.state.LastSeq; seq++ { retry: + if mb == nil { + if mb = fs.selectMsgBlock(seq); mb == nil { + // Selecting the message block should return a block that contains this sequence, + // or a later block if it can't be found. + // It's an error if we can't find any block within the bounds of first and last seq. + fs.warn("Error loading msg block with seq %d for recovering TTL: %s", seq) + continue + } + seq = atomic.LoadUint64(&mb.first.seq) + mblseq = atomic.LoadUint64(&mb.last.seq) + } if mb.ttls == 0 { // None of the messages in the block have message TTLs so don't // bother doing anything further with this block, skip to the end. seq = atomic.LoadUint64(&mb.last.seq) + 1 } if seq > mblseq { - // We've reached the end of the loaded block, see if we can continue - // by loading the next one. + // We've reached the end of the loaded block, so let's go back to the + // beginning and process the next block. mb.tryForceExpireCache() - if mb = fs.selectMsgBlock(seq); mb == nil { - // TODO(nat): Deal with gaps properly. Right now this will be - // probably expensive on CPU. - continue - } - mblseq = atomic.LoadUint64(&mb.last.seq) - // At this point we've loaded another block, so let's go back to the - // beginning and see if we need to skip this one too. + mb = nil goto retry } msg, _, err := mb.fetchMsgNoCopy(seq, &sm) diff --git a/server/filestore_test.go b/server/filestore_test.go index 112437c02cc..bdcf4660d1a 100644 --- a/server/filestore_test.go +++ b/server/filestore_test.go @@ -9726,3 +9726,42 @@ func TestFileStoreFirstMatchingMultiExpiry(t *testing.T) { require_True(t, didLoad) // last message, should expire }) } + +func TestFileStoreNoPanicOnRecoverTTLWithCorruptBlocks(t *testing.T) { + testFileStoreAllPermutations(t, func(t *testing.T, fcfg FileStoreConfig) { + fs, err := newFileStoreWithCreated(fcfg, StreamConfig{Name: "zzz", Storage: FileStorage, AllowMsgTTL: true}, time.Now(), prf(&fcfg), nil) + require_NoError(t, err) + defer fs.Stop() + + hdr := genHeader(nil, JSMessageTTL, "1") + for i := range 3 { + if i > 0 { + _, err = fs.newMsgBlockForWrite() + require_NoError(t, err) + } + _, _, err = fs.StoreMsg("foo", hdr, []byte("A"), 1) + require_NoError(t, err) + } + + fs.mu.Lock() + if blks := len(fs.blks); blks != 3 { + fs.mu.Unlock() + t.Fatalf("Expected 3 blocks, got %d", blks) + } + + // Manually corrupt the blocks by removing the second and changing the + // sequence range for the last to that of the first. + fmb := fs.blks[0] + smb := fs.blks[1] + lmb := fs.lmb + fseq, lseq := atomic.LoadUint64(&fmb.first.seq), atomic.LoadUint64(&fmb.last.seq) + smb.mu.Lock() + fs.removeMsgBlock(smb) + smb.mu.Unlock() + fs.mu.Unlock() + atomic.StoreUint64(&lmb.first.seq, fseq) + atomic.StoreUint64(&lmb.last.seq, lseq) + + require_NoError(t, fs.recoverTTLState()) + }) +}