Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 12 additions & 4 deletions server/filestore.go
Original file line number Diff line number Diff line change
Expand Up @@ -6849,11 +6849,19 @@ func (mb *msgBlock) indexCacheBuf(buf []byte) error {
dms := uint64(mb.dmap.Size())
idxSz := mbLastSeq - mbFirstSeq + 1

if mb.cache == nil {
if mb.cache == nil || mb.cache.buf == nil {
// Approximation, may adjust below.
fseq = mbFirstSeq
idx = make([]uint32, 0, idxSz)
mb.cache = &cache{}
if mb.cache != nil && mb.cache.idx != nil {
idx = mb.cache.idx[:0]
} else {
idx = make([]uint32, 0, idxSz)
}
if mb.cache == nil {
mb.cache = &cache{}
} else {
*mb.cache = cache{}
}
} else {
fseq = mb.cache.fseq
idx = mb.cache.idx
Expand Down Expand Up @@ -7535,7 +7543,7 @@ func (mb *msgBlock) cacheLookupEx(seq uint64, sm *StoreMsg, doCopy bool) (*Store
return nil, errDeletedMsg
}

if seq != fsm.seq {
if seq != fsm.seq { // See TestFileStoreInvalidIndexesRebuilt.
recycleMsgBlockBuf(mb.cache.buf)
mb.cache.buf = nil
return nil, fmt.Errorf("sequence numbers for cache load did not match, %d vs %d", seq, fsm.seq)
Expand Down
44 changes: 44 additions & 0 deletions server/filestore_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1746,6 +1746,50 @@ func TestFileStorePartialIndexes(t *testing.T) {
})
}

func TestFileStoreInvalidIndexesRebuilt(t *testing.T) {
testFileStoreAllPermutations(t, func(t *testing.T, fcfg FileStoreConfig) {
fs, err := newFileStoreWithCreated(fcfg, StreamConfig{Name: "zzz", Storage: FileStorage}, time.Now(), prf(&fcfg), nil)
require_NoError(t, err)
defer fs.Stop()

toSend := 5
for i := 0; i < toSend; i++ {
fs.StoreMsg("foo", nil, []byte("ok-1"), 0)
}
fs.FlushAllPending()

// Now we're going to mangle the in-memory cache by changing
// the sequence number of the first message. We also need to
// trick the cache into believing the hash is already validated
// so we don't fail on that. This is specifically testing the
// seq != fsm.seq condition.
mb := fs.selectMsgBlock(1)
require_NotNil(t, mb)
require_NoError(t, mb.loadMsgs())
require_True(t, mb.cacheAlreadyLoaded())
ri, rl, _, err := mb.slotInfo(0)
require_NoError(t, err)
require_NotNil(t, mb.cache)
require_NotNil(t, mb.cache.buf)
slot := mb.cache.buf[ri : ri+rl]
require_Equal(t, binary.LittleEndian.Uint64(slot[4:]), 1)
binary.LittleEndian.PutUint64(slot[4:], 12345)
mb.cache.idx[0] = (mb.cache.idx[0] | cbit)

// Expect an error on the first instance and for cacheLookupEx
// to discard the cache.
_, err = mb.cacheLookupEx(1, nil, false)
require_Error(t, err)
require_True(t, mb.cache.buf == nil)

// Now fetchMsg should notice and rebuild the index with the
// correct sequence from disk.
sm, _, err := mb.fetchMsg(1, nil)
require_NoError(t, err)
require_Equal(t, sm.seq, 1)
})
}

func TestFileStoreSnapshot(t *testing.T) {
testFileStoreAllPermutations(t, func(t *testing.T, fcfg FileStoreConfig) {
subj, msg := "foo", []byte("Hello Snappy!")
Expand Down