60 changes: 32 additions & 28 deletions server/filestore.go
@@ -270,7 +270,6 @@ type cache struct {
buf []byte
wp int
idx []uint32
lrl uint32
fseq uint64
nra bool
}
@@ -5071,7 +5070,10 @@ func (fs *fileStore) removeMsg(seq uint64, secure, viaLimits, needFSLock bool) (
// If erase but block is empty, we can simply remove the block later.
if secure && !isEmpty {
// Grab record info, but use the pre-computed record length.
ri, _, _, _ := mb.slotInfo(int(seq - mb.cache.fseq))
ri, _, _, err := mb.slotInfo(int(seq - mb.cache.fseq))
if err != nil {
return false, err
}
if err := mb.eraseMsg(seq, int(ri), int(msz), isLastBlock); err != nil {
mb.finishedWithCache()
return false, err
@@ -5316,7 +5318,14 @@ func (mb *msgBlock) compactWithFloor(floor uint64) {
// Grab info from a slot.
// Lock should be held.
func (mb *msgBlock) slotInfo(slot int) (uint32, uint32, bool, error) {
if slot < 0 || mb.cache == nil || slot >= len(mb.cache.idx) {
switch {
case mb.cache == nil: // Shouldn't be possible, but check it anyway.
return 0, 0, false, errNoCache
case slot < 0:
mb.fs.warn("Partial cache: offset slot index %d is less zero", slot)
return 0, 0, false, errPartialCache
case slot >= len(mb.cache.idx):
mb.fs.warn("Partial cache: offset slot index %d is greater than index len %d", slot, len(mb.cache.idx))
return 0, 0, false, errPartialCache
}

@@ -5330,24 +5339,20 @@ func (mb *msgBlock) slotInfo(slot int) (uint32, uint32, bool, error) {

// Determine record length
var rl uint32
if slot >= len(mb.cache.idx) {
rl = mb.cache.lrl
} else {
// Need to account for dbit markers in idx.
// So we will walk until we find valid idx slot to calculate rl.
for i := 1; slot+i < len(mb.cache.idx); i++ {
ni := mb.cache.idx[slot+i] &^ cbit
if ni == dbit {
continue
}
rl = ni - ri
break
}
// check if we had all trailing dbits.
// If so use len of cache buf minus ri.
if rl == 0 {
rl = uint32(len(mb.cache.buf)) - ri
// Need to account for dbit markers in idx.
// So we will walk until we find valid idx slot to calculate rl.
for i := 1; slot+i < len(mb.cache.idx); i++ {
ni := mb.cache.idx[slot+i] &^ cbit
if ni == dbit {
continue
}
rl = ni - ri
break
}
// check if we had all trailing dbits.
// If so use len of cache buf minus ri.
if rl == 0 {
rl = uint32(len(mb.cache.buf)) - ri
}
if rl < msgHdrSize {
return 0, 0, false, errBadMsg{mb.mfn, fmt.Sprintf("length too short for slot %d", slot)}
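Aside (not part of the diff): the record-length walk kept by the hunk above can be read in isolation. A minimal, runnable sketch follows; `recordLen` and the `cbit`/`dbit` values are placeholders for illustration, not the server's real constants or helpers.

```go
package main

import "fmt"

// Placeholder flag bits for illustration only; the real filestore defines its own cbit/dbit.
const (
	cbit = 1 << 31 // flag bit masked off an idx entry before using it as an offset
	dbit = 1 << 30 // marker meaning "deleted slot, no record at this index"
)

// recordLen walks idx just past slot, skipping deleted-slot markers, and returns
// the length of the record that starts at that slot's offset. If only dbit markers
// remain, the record is assumed to run to the end of buf.
func recordLen(idx []uint32, buf []byte, slot int) uint32 {
	ri := idx[slot] &^ cbit
	var rl uint32
	for i := 1; slot+i < len(idx); i++ {
		ni := idx[slot+i] &^ cbit
		if ni == dbit {
			continue // deleted slot, keep walking
		}
		rl = ni - ri
		break
	}
	// All trailing slots were dbits: use the remaining buffer length.
	if rl == 0 {
		rl = uint32(len(buf)) - ri
	}
	return rl
}

func main() {
	// Records at offsets 0 and 10, a deleted slot, then a record at 30; 40-byte buffer.
	idx := []uint32{0, 10, dbit, 30}
	buf := make([]byte, 40)
	fmt.Println(recordLen(idx, buf, 0)) // 10
	fmt.Println(recordLen(idx, buf, 1)) // 20, skips the deleted slot
	fmt.Println(recordLen(idx, buf, 3)) // 10, runs to the end of the buffer
}
```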
@@ -5772,10 +5777,10 @@ func (mb *msgBlock) tryForceExpireCache() {

// We will attempt to force expire this by temporarily clearing the last load time.
func (mb *msgBlock) tryForceExpireCacheLocked() {
llts := mb.llts
mb.llts = 0
llts, lwts := mb.llts, mb.lwts
mb.llts, mb.lwts = 0, 0
mb.expireCacheLocked()
mb.llts = llts
mb.llts, mb.lwts = llts, lwts
}

// This is for expiration of the write cache, which will be partial with fip.
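Aside (not part of the diff): the hunk above now clears and restores `lwts` alongside `llts`. A toy sketch of why that matters, assuming the expiration check keeps the buffer while either timestamp looks recent (the real `expireCacheLocked` logic is more involved); the `block` type and `expire`/`forceExpire` helpers are invented for illustration.

```go
package main

import (
	"fmt"
	"time"
)

// Toy cache block: expire frees the buffer only if neither timestamp is recent.
type block struct {
	buf  []byte
	llts int64 // last load time (ns)
	lwts int64 // last write time (ns)
}

func (b *block) expire(maxAge time.Duration) {
	now := time.Now().UnixNano()
	if now-b.llts < int64(maxAge) || now-b.lwts < int64(maxAge) {
		return // still considered active, keep the buffer
	}
	b.buf = nil
}

// forceExpire temporarily zeroes both timestamps so expire sees an idle block,
// then restores them, mirroring the save/clear/restore pattern in the diff.
func (b *block) forceExpire() {
	llts, lwts := b.llts, b.lwts
	b.llts, b.lwts = 0, 0
	b.expire(time.Second)
	b.llts, b.lwts = llts, lwts
}

func main() {
	now := time.Now().UnixNano()
	b := &block{buf: make([]byte, 8), llts: now, lwts: now}
	b.expire(time.Second)
	fmt.Println(b.buf != nil) // true: recently touched, not expired
	b.forceExpire()
	fmt.Println(b.buf == nil) // true: forced expiration released the buffer
}
```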
@@ -5850,6 +5855,7 @@ func (mb *msgBlock) expireCacheLocked() {
recycleMsgBlockBuf(mb.cache.buf)
}
mb.cache.buf = nil
mb.cache.idx = mb.cache.idx[:0]
mb.cache.wp = 0
}

@@ -6342,7 +6348,6 @@ func (mb *msgBlock) writeMsgRecordLocked(rl, seq uint64, subj string, mhdr, msg
// Update write through cache.
// Write to msg record.
mb.cache.buf = append(mb.cache.buf, checksum...)
mb.cache.lrl = uint32(rl)

// Set cache timestamp for last store.
mb.lwts = ts
@@ -7051,7 +7056,6 @@ func (mb *msgBlock) indexCacheBuf(buf []byte) error {
}
// Add to our index.
idx = append(idx, index)
mb.cache.lrl = uint32(rl)
// Adjust if we guessed wrong.
if seq != 0 && seq < fseq {
fseq = seq
@@ -7599,7 +7603,7 @@ func (mb *msgBlock) cacheLookupEx(seq uint64, sm *StoreMsg, doCopy bool) (*Store
}
// Check partial cache status.
if seq < mb.cache.fseq {
mb.fs.warn("Cache lookup detected partial cache: seq %d vs cache fseq %d", seq, mb.cache.fseq)
mb.fs.warn("Partial cache: seq %d is less than cache fseq %d", seq, mb.cache.fseq)
return nil, errPartialCache
}

@@ -7613,6 +7617,7 @@ func (mb *msgBlock) cacheLookupEx(seq uint64, sm *StoreMsg, doCopy bool) (*Store

li := int(bi)
if li >= len(mb.cache.buf) {
mb.fs.warn("Partial cache: slot index %d is less than cache buffer len %d", li, len(mb.cache.buf))
return nil, errPartialCache
}
buf := mb.cache.buf[li:]
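Aside (not part of the diff): the partial-cache guards these cacheLookupEx hunks add warnings for boil down to a seq → slot → byte-offset mapping with bounds checks. A simplified sketch, with `lookupOffset` and the `cbit` value invented for illustration:

```go
package main

import (
	"errors"
	"fmt"
)

// Placeholder flag bit; the real filestore masks its own cbit off each idx entry.
const cbit = 1 << 31

var errPartialCache = errors.New("partial cache")

// lookupOffset maps a sequence to a byte offset in the cache buffer, applying
// the same style of partial-cache guards the hunks above warn about.
func lookupOffset(seq, fseq uint64, idx []uint32, buf []byte) (int, error) {
	if seq < fseq {
		return 0, errPartialCache // cache starts after the requested sequence
	}
	slot := int(seq - fseq)
	if slot >= len(idx) {
		return 0, errPartialCache // slot beyond what was indexed
	}
	li := int(idx[slot] &^ cbit)
	if li >= len(buf) {
		return 0, errPartialCache // offset points past the cached bytes
	}
	return li, nil
}

func main() {
	idx := []uint32{0, 10, 30}
	buf := make([]byte, 40)
	fmt.Println(lookupOffset(11, 10, idx, buf)) // 10 <nil>
	fmt.Println(lookupOffset(5, 10, idx, buf))  // 0 partial cache
}
```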
@@ -7635,8 +7640,7 @@ func (mb *msgBlock) cacheLookupEx(seq uint64, sm *StoreMsg, doCopy bool) (*Store
}

if seq != fsm.seq { // See TestFileStoreInvalidIndexesRebuilt.
recycleMsgBlockBuf(mb.cache.buf)
mb.cache.buf = nil
mb.tryForceExpireCacheLocked()
return nil, fmt.Errorf("sequence numbers for cache load did not match, %d vs %d", seq, fsm.seq)
}

2 changes: 1 addition & 1 deletion server/filestore_test.go
@@ -1784,7 +1784,7 @@ func TestFileStoreInvalidIndexesRebuilt(t *testing.T) {
// to discard the cache.
_, err = mb.cacheLookupEx(1, nil, false)
require_Error(t, err)
require_True(t, mb.cache.buf == nil)
require_True(t, mb.ecache.Value() == nil)

// Now fetchMsg should notice and rebuild the index with the
// correct sequence from disk.