From bcfbfb8b5a75d38e24a2d5313299be91340110db Mon Sep 17 00:00:00 2001 From: Maurice van Veen Date: Mon, 6 Oct 2025 19:31:16 +0200 Subject: [PATCH] [FIXED] Filestore detect delete gap with last SkipMsg Signed-off-by: Maurice van Veen --- server/filestore.go | 32 ++++++++++++++++++------------ server/filestore_test.go | 43 ++++++++++++++++++++++++++++++++++++++++ 2 files changed, 62 insertions(+), 13 deletions(-) diff --git a/server/filestore.go b/server/filestore.go index c607617e453..3096bdadc62 100644 --- a/server/filestore.go +++ b/server/filestore.go @@ -1527,6 +1527,23 @@ func (mb *msgBlock) rebuildStateLocked() (*LostStreamData, []uint64, error) { var last uint64 var hb [highwayhash.Size64]byte + updateLast := func(seq uint64, ts int64) { + // The sequence needs to only ever move up. + if seq <= last { + return + } + + // Check for any gaps from compaction, meaning no ebit entry. + if last > 0 && seq != last+1 { + for dseq := last + 1; dseq < seq; dseq++ { + addToDmap(dseq) + } + } + last = seq + atomic.StoreUint64(&mb.last.seq, last) + mb.last.ts = ts + } + for index, lbuf := uint32(0), uint32(len(buf)); index < lbuf; { if index+msgHdrSize > lbuf { truncate(index) @@ -1591,8 +1608,7 @@ func (mb *msgBlock) rebuildStateLocked() (*LostStreamData, []uint64, error) { if seq == 0 || seq&ebit != 0 || seq < fseq { seq = seq &^ ebit if seq >= fseq { - atomic.StoreUint64(&mb.last.seq, seq) - mb.last.ts = ts + updateLast(seq, ts) if mb.msgs == 0 { atomic.StoreUint64(&mb.first.seq, seq+1) mb.first.ts = 0 @@ -1640,17 +1656,7 @@ func (mb *msgBlock) rebuildStateLocked() (*LostStreamData, []uint64, error) { } } - // Check for any gaps from compaction, meaning no ebit entry. - if last > 0 && seq != last+1 { - for dseq := last + 1; dseq < seq; dseq++ { - addToDmap(dseq) - } - } - - // Always set last - last = seq - atomic.StoreUint64(&mb.last.seq, last) - mb.last.ts = ts + updateLast(seq, ts) // Advance to next record. index += rl diff --git a/server/filestore_test.go b/server/filestore_test.go index 7bbeef720e9..29bb77a9693 100644 --- a/server/filestore_test.go +++ b/server/filestore_test.go @@ -10852,3 +10852,46 @@ func TestFileStoreTombstonesSelectNextFirstCleanupOnRecovery(t *testing.T) { } }) } + +func TestFileStoreDetectDeleteGapWithLastSkipMsg(t *testing.T) { + testFileStoreAllPermutations(t, func(t *testing.T, fcfg FileStoreConfig) { + cfg := StreamConfig{Name: "zzz", Subjects: []string{"foo"}, Storage: FileStorage} + created := time.Now() + fs, err := newFileStoreWithCreated(fcfg, cfg, created, prf(&fcfg), nil) + require_NoError(t, err) + defer fs.Stop() + + _, _, err = fs.StoreMsg("foo", nil, nil, 0) + require_NoError(t, err) + + // Skip a message at a sequence such that a gap is created. + // The gap should be detected later on as deleted messages. + require_NoError(t, fs.SkipMsgs(2, 3)) + + // We should have 3 deletes, one is the skip msg, the other two is the gap. + before := fs.State() + require_Equal(t, before.Msgs, 1) + require_Equal(t, before.FirstSeq, 1) + require_Equal(t, before.LastSeq, 4) + require_Equal(t, before.NumDeleted, 3) + + // Make sure we can recover properly with no index.db present. + fs.Stop() + os.Remove(filepath.Join(fs.fcfg.StoreDir, msgDir, streamStreamStateFile)) + + fs, err = newFileStoreWithCreated(fcfg, cfg, created, prf(&fcfg), nil) + require_NoError(t, err) + defer fs.Stop() + + if state := fs.State(); !reflect.DeepEqual(state, before) { + t.Fatalf("Expected state of %+v, got %+v", before, state) + } + + mb := fs.getFirstBlock() + mb.mu.RLock() + defer mb.mu.RUnlock() + require_Equal(t, atomic.LoadUint64(&mb.first.seq), 1) + require_Equal(t, atomic.LoadUint64(&mb.last.seq), 4) + require_Len(t, mb.dmap.Size(), 3) + }) +}