diff --git a/server/filestore.go b/server/filestore.go index 6eebcb63141..177ad0d82a8 100644 --- a/server/filestore.go +++ b/server/filestore.go @@ -1498,7 +1498,7 @@ func (mb *msgBlock) rebuildStateLocked() (*LostStreamData, []uint64, error) { minTombstoneTs int64 ) - // To detect gaps from compaction. + // To detect gaps from compaction, and to ensure the sequence keeps moving up. var last uint64 for index, lbuf := uint32(0), uint32(len(buf)); index < lbuf; { @@ -1587,6 +1587,13 @@ func (mb *msgBlock) rebuildStateLocked() (*LostStreamData, []uint64, error) { firstNeedsSet, mb.first.ts = false, ts } + // The sequence needs to only ever move up. + if seq <= last { + // Advance to next record. + // We've already accounted for this sequence and marked it as deleted. + index += rl + continue + } if !mb.dmap.Exists(seq) { mb.msgs++ mb.bytes += uint64(rl) @@ -6872,6 +6879,9 @@ func (mb *msgBlock) indexCacheBuf(buf []byte) error { var seq, ttls, schedules uint64 var sm StoreMsg // Used for finding TTL headers + // To ensure the sequence keeps moving up. + var last uint64 + for index < lbuf { if index+msgHdrSize > lbuf { return errCorruptState @@ -6903,6 +6913,15 @@ func (mb *msgBlock) indexCacheBuf(buf []byte) error { erased := seq&ebit != 0 seq = seq &^ ebit + // The sequence needs to only ever move up. + if seq <= last { + // Advance to next record. + // We've already accounted for this sequence and marked it as deleted. + index += rl + continue + } + last = seq + // We defer checksum checks to individual msg cache lookups to amortorize costs and // not introduce latency for first message from a newly loaded block. 
if seq >= mbFirstSeq {
diff --git a/server/filestore_test.go b/server/filestore_test.go
index fdbf5fae2f2..475faa4d9b8 100644
--- a/server/filestore_test.go
+++ b/server/filestore_test.go
@@ -10424,3 +10424,67 @@ func TestFileStoreMessageScheduleEncodeDecode(t *testing.T) {
 		require_Equal(t, sched.seq, nsched.seq)
 	}
 }
+
+// TestFileStoreCorruptedNonOrderedSequences writes records whose sequences are
+// out of order or duplicated directly into the last message block, then checks
+// that rebuilding the block state and indexing its buffer both succeed with the
+// expected message and delete-map counts.
+func TestFileStoreCorruptedNonOrderedSequences(t *testing.T) {
+	for _, test := range []struct {
+		title   string
+		seqs    []uint64 // Sequences written to the block, in this order.
+		msgs    uint64   // Expected lmb.msgs after the rebuild.
+		deleted int      // Expected lmb.dmap.Size() after the rebuild.
+	}{
+		{title: "Unordered", seqs: []uint64{1, 3, 2, 4}, msgs: 3, deleted: 1},
+		{title: "Duplicated", seqs: []uint64{1, 2, 2, 3}, msgs: 3, deleted: 0},
+	} {
+		t.Run(test.title, func(t *testing.T) {
+			testFileStoreAllPermutations(t, func(t *testing.T, fcfg FileStoreConfig) {
+				cfg := StreamConfig{Name: "zzz", Subjects: []string{"foo"}, Storage: FileStorage}
+				created := time.Now()
+				fs, err := newFileStoreWithCreated(fcfg, cfg, created, prf(&fcfg), nil)
+				require_NoError(t, err)
+				defer fs.Stop()
+
+				for _, seq := range test.seqs {
+					_, err = fs.writeMsgRecord(seq, 0, _EMPTY_, nil, nil)
+					require_NoError(t, err)
+				}
+
+				fs.mu.RLock()
+				lmb := fs.lmb
+				fs.mu.RUnlock()
+
+				// Everything below mutates the block (rebuild, cache load), so take the
+				// write lock rather than the read lock. Deferred calls run in reverse
+				// order, so the block is unlocked before fs.Stop() runs.
+				lmb.mu.Lock()
+				defer lmb.mu.Unlock()
+
+				// The filestore will not yet know that something was corrupt.
+				require_Equal(t, lmb.msgs, 4)
+				require_Equal(t, lmb.dmap.Size(), 0)
+
+				// Need to reset, otherwise the rebuild will be incorrect.
+				atomic.StoreUint64(&lmb.first.seq, 0)
+
+				// Upon rebuild it should realize and correct.
+				_, _, err = lmb.rebuildStateLocked()
+				require_NoError(t, err)
+				require_Equal(t, lmb.msgs, test.msgs)
+				require_Equal(t, lmb.dmap.Size(), test.deleted)
+
+				// Indexing should also realize and correct.
+				require_True(t, lmb.cacheNotLoaded())
+				buf, err := lmb.loadBlock(nil)
+				require_NoError(t, err)
+				require_NoError(t, lmb.encryptOrDecryptIfNeeded(buf))
+				buf, err = lmb.decompressIfNeeded(buf)
+				require_NoError(t, err)
+				require_NoError(t, lmb.indexCacheBuf(buf))
+				require_True(t, lmb.cacheAlreadyLoaded())
+			})
+		})
+	}
+}