diff --git a/server/filestore.go b/server/filestore.go index 0630083f8a0..f2edebbdb72 100644 --- a/server/filestore.go +++ b/server/filestore.go @@ -6394,10 +6394,17 @@ func (mb *msgBlock) writeMsgRecordLocked(rl, seq uint64, subj string, mhdr, msg // Only update index and do accounting if not a delete tombstone. if seq&tbit == 0 { + last := atomic.LoadUint64(&mb.last.seq) // Accounting, do this before stripping ebit, it is ebit aware. mb.updateAccounting(seq, ts, rl) // Strip ebit if set. seq = seq &^ ebit + // If we have a hole due to skipping many messages, fill it. + if len(mb.cache.idx) > 0 && last+1 < seq { + for dseq := last + 1; dseq < seq; dseq++ { + mb.cache.idx = append(mb.cache.idx, dbit) + } + } // Write index if mb.cache.idx = append(mb.cache.idx, uint32(index)|cbit); len(mb.cache.idx) == 1 { mb.cache.fseq = seq diff --git a/server/filestore_test.go b/server/filestore_test.go index d5e5906b0c3..22f3f2c5323 100644 --- a/server/filestore_test.go +++ b/server/filestore_test.go @@ -11087,3 +11087,49 @@ func TestFileStoreMissingDeletesAfterCompact(t *testing.T) { require_Equal(t, atomic.LoadUint64(&fmb.last.seq), 2) }) } + +func TestFileStoreIdxAccountingForSkipMsgs(t *testing.T) { + test := func(t *testing.T, skipMany bool) { + testFileStoreAllPermutations(t, func(t *testing.T, fcfg FileStoreConfig) { + cfg := StreamConfig{Name: "zzz", Subjects: []string{"foo"}, Storage: FileStorage} + created := time.Now() + fs, err := newFileStoreWithCreated(fcfg, cfg, created, prf(&fcfg), nil) + require_NoError(t, err) + defer fs.Stop() + + _, _, err = fs.StoreMsg("foo", nil, nil, 0) + require_NoError(t, err) + if skipMany { + require_NoError(t, fs.SkipMsgs(2, 10)) + } else { + for i := range 10 { + _, err = fs.SkipMsg(uint64(i + 2)) + require_NoError(t, err) + } + } + _, _, err = fs.StoreMsg("foo", nil, nil, 0) + require_NoError(t, err) + + fmb := fs.getFirstBlock() + fmb.mu.Lock() + defer fmb.mu.Unlock() + + for i := range 12 { + seq := uint64(i + 1) + _, err = fmb.cacheLookupNoCopy(seq, nil) + if seq >= 2 && seq <= 11 { + require_Error(t, err, errDeletedMsg) + } else { + require_NoError(t, err) + } + } + + cache := fmb.cache + require_NotNil(t, cache) + require_Len(t, len(cache.idx), 12) + }) + } + + t.Run("SkipMsg", func(t *testing.T) { test(t, false) }) + t.Run("SkipMsgs", func(t *testing.T) { test(t, true) }) +}