diff --git a/server/filestore.go b/server/filestore.go index 9f7f32f4a58..c607617e453 100644 --- a/server/filestore.go +++ b/server/filestore.go @@ -270,7 +270,6 @@ type cache struct { buf []byte wp int idx []uint32 - lrl uint32 fseq uint64 nra bool } @@ -5071,7 +5070,10 @@ func (fs *fileStore) removeMsg(seq uint64, secure, viaLimits, needFSLock bool) ( // If erase but block is empty, we can simply remove the block later. if secure && !isEmpty { // Grab record info, but use the pre-computed record length. - ri, _, _, _ := mb.slotInfo(int(seq - mb.cache.fseq)) + ri, _, _, err := mb.slotInfo(int(seq - mb.cache.fseq)) + if err != nil { + return false, err + } if err := mb.eraseMsg(seq, int(ri), int(msz), isLastBlock); err != nil { mb.finishedWithCache() return false, err @@ -5316,7 +5318,14 @@ func (mb *msgBlock) compactWithFloor(floor uint64) { // Grab info from a slot. // Lock should be held. func (mb *msgBlock) slotInfo(slot int) (uint32, uint32, bool, error) { - if slot < 0 || mb.cache == nil || slot >= len(mb.cache.idx) { + switch { + case mb.cache == nil: // Shouldn't be possible, but check it anyway. + return 0, 0, false, errNoCache + case slot < 0: + mb.fs.warn("Partial cache: offset slot index %d is less zero", slot) + return 0, 0, false, errPartialCache + case slot >= len(mb.cache.idx): + mb.fs.warn("Partial cache: offset slot index %d is greater than index len %d", slot, len(mb.cache.idx)) return 0, 0, false, errPartialCache } @@ -5330,24 +5339,20 @@ func (mb *msgBlock) slotInfo(slot int) (uint32, uint32, bool, error) { // Determine record length var rl uint32 - if slot >= len(mb.cache.idx) { - rl = mb.cache.lrl - } else { - // Need to account for dbit markers in idx. - // So we will walk until we find valid idx slot to calculate rl. - for i := 1; slot+i < len(mb.cache.idx); i++ { - ni := mb.cache.idx[slot+i] &^ cbit - if ni == dbit { - continue - } - rl = ni - ri - break - } - // check if we had all trailing dbits. - // If so use len of cache buf minus ri. - if rl == 0 { - rl = uint32(len(mb.cache.buf)) - ri + // Need to account for dbit markers in idx. + // So we will walk until we find valid idx slot to calculate rl. + for i := 1; slot+i < len(mb.cache.idx); i++ { + ni := mb.cache.idx[slot+i] &^ cbit + if ni == dbit { + continue } + rl = ni - ri + break + } + // check if we had all trailing dbits. + // If so use len of cache buf minus ri. + if rl == 0 { + rl = uint32(len(mb.cache.buf)) - ri } if rl < msgHdrSize { return 0, 0, false, errBadMsg{mb.mfn, fmt.Sprintf("length too short for slot %d", slot)} @@ -5772,10 +5777,10 @@ func (mb *msgBlock) tryForceExpireCache() { // We will attempt to force expire this by temporarily clearing the last load time. func (mb *msgBlock) tryForceExpireCacheLocked() { - llts := mb.llts - mb.llts = 0 + llts, lwts := mb.llts, mb.lwts + mb.llts, mb.lwts = 0, 0 mb.expireCacheLocked() - mb.llts = llts + mb.llts, mb.lwts = llts, lwts } // This is for expiration of the write cache, which will be partial with fip. @@ -5850,6 +5855,7 @@ func (mb *msgBlock) expireCacheLocked() { recycleMsgBlockBuf(mb.cache.buf) } mb.cache.buf = nil + mb.cache.idx = mb.cache.idx[:0] mb.cache.wp = 0 } @@ -6342,7 +6348,6 @@ func (mb *msgBlock) writeMsgRecordLocked(rl, seq uint64, subj string, mhdr, msg // Update write through cache. // Write to msg record. mb.cache.buf = append(mb.cache.buf, checksum...) - mb.cache.lrl = uint32(rl) // Set cache timestamp for last store. mb.lwts = ts @@ -7051,7 +7056,6 @@ func (mb *msgBlock) indexCacheBuf(buf []byte) error { } // Add to our index. idx = append(idx, index) - mb.cache.lrl = uint32(rl) // Adjust if we guessed wrong. if seq != 0 && seq < fseq { fseq = seq @@ -7599,7 +7603,7 @@ func (mb *msgBlock) cacheLookupEx(seq uint64, sm *StoreMsg, doCopy bool) (*Store } // Check partial cache status. if seq < mb.cache.fseq { - mb.fs.warn("Cache lookup detected partial cache: seq %d vs cache fseq %d", seq, mb.cache.fseq) + mb.fs.warn("Partial cache: seq %d is less than cache fseq %d", seq, mb.cache.fseq) return nil, errPartialCache } @@ -7613,6 +7617,7 @@ func (mb *msgBlock) cacheLookupEx(seq uint64, sm *StoreMsg, doCopy bool) (*Store li := int(bi) if li >= len(mb.cache.buf) { + mb.fs.warn("Partial cache: slot index %d is less than cache buffer len %d", li, len(mb.cache.buf)) return nil, errPartialCache } buf := mb.cache.buf[li:] @@ -7635,8 +7640,7 @@ func (mb *msgBlock) cacheLookupEx(seq uint64, sm *StoreMsg, doCopy bool) (*Store } if seq != fsm.seq { // See TestFileStoreInvalidIndexesRebuilt. - recycleMsgBlockBuf(mb.cache.buf) - mb.cache.buf = nil + mb.tryForceExpireCacheLocked() return nil, fmt.Errorf("sequence numbers for cache load did not match, %d vs %d", seq, fsm.seq) } diff --git a/server/filestore_test.go b/server/filestore_test.go index 42b3f059ff9..7bbeef720e9 100644 --- a/server/filestore_test.go +++ b/server/filestore_test.go @@ -1784,7 +1784,7 @@ func TestFileStoreInvalidIndexesRebuilt(t *testing.T) { // to discard the cache. _, err = mb.cacheLookupEx(1, nil, false) require_Error(t, err) - require_True(t, mb.cache.buf == nil) + require_True(t, mb.ecache.Value() == nil) // Now fetchMsg should notice and rebuild the index with the // correct sequence from disk.