Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 7 additions & 7 deletions server/consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -4882,14 +4882,14 @@ func (o *consumer) loopAndGatherMsgs(qch chan struct{}) {
}
if err == ErrStoreMsgNotFound || err == errDeletedMsg || err == ErrStoreEOF || err == errMaxAckPending {
goto waitForMsgs
} else if err == errPartialCache {
s.Warnf("Unexpected partial cache error looking up message for consumer '%s > %s > %s'",
o.mset.acc, stream, o.cfg.Name)
goto waitForMsgs

} else {
s.Errorf("Received an error looking up message for consumer '%s > %s > %s': %v",
o.mset.acc, stream, o.cfg.Name, err)
if pmsg != nil {
s.Errorf("Received an error looking up message with sequence %d for consumer '%s > %s > %s': %v",
pmsg.seq, o.mset.acc, stream, o.cfg.Name, err)
} else {
s.Errorf("Received an error looking up message for consumer '%s > %s > %s': %v",
o.mset.acc, stream, o.cfg.Name, err)
}
goto waitForMsgs
}
}
Expand Down
42 changes: 26 additions & 16 deletions server/filestore.go
Original file line number Diff line number Diff line change
Expand Up @@ -1521,7 +1521,7 @@ func (mb *msgBlock) rebuildStateLocked() (*LostStreamData, []uint64, error) {
// Do some quick sanity checks here.
if dlen < 0 || slen > (dlen-recordHashSize) || dlen > int(rl) || index+rl > lbuf || rl > rlBadThresh {
truncate(index)
return gatherLost(lbuf - index), tombstones, errBadMsg
return gatherLost(lbuf - index), tombstones, errBadMsg{mb.mfn, fmt.Sprintf("sanity check failed (dlen %d slen %d rl %d index %d lbuf %d)", dlen, slen, rl, index, lbuf)}
}

// Check for checksum failures before additional processing.
Expand All @@ -1539,7 +1539,7 @@ func (mb *msgBlock) rebuildStateLocked() (*LostStreamData, []uint64, error) {
checksum := hh.Sum(nil)
if !bytes.Equal(checksum, data[len(data)-recordHashSize:]) {
truncate(index)
return gatherLost(lbuf - index), tombstones, errBadMsg
return gatherLost(lbuf - index), tombstones, errBadMsg{mb.mfn, "invalid checksum"}
}
copy(mb.lchk[0:], checksum)
}
Expand Down Expand Up @@ -5291,7 +5291,7 @@ func (mb *msgBlock) slotInfo(slot int) (uint32, uint32, bool, error) {
}
}
if rl < msgHdrSize {
return 0, 0, false, errBadMsg
return 0, 0, false, errBadMsg{mb.mfn, fmt.Sprintf("length too short for slot %d", slot)}
}
return uint32(ri), rl, hashChecked, nil
}
Expand Down Expand Up @@ -6840,7 +6840,7 @@ func (mb *msgBlock) indexCacheBuf(buf []byte) error {

// Sanity check here since we calculate size to allocate based on this.
if mbFirstSeq > (mbLastSeq + 1) { // Purged state first == last + 1
mb.fs.warn("indexCacheBuf corrupt state: mb.first %d mb.last %d", mbFirstSeq, mbLastSeq)
mb.fs.warn("indexCacheBuf corrupt state in %s: mb.first %d mb.last %d", mb.mfn, mbFirstSeq, mbLastSeq)
// This would cause idxSz to wrap.
return errCorruptState
}
Expand Down Expand Up @@ -6905,7 +6905,7 @@ func (mb *msgBlock) indexCacheBuf(buf []byte) error {

// Do some quick sanity checks here.
if dlen < 0 || slen > (dlen-recordHashSize) || dlen > int(rl) || index+rl > lbuf || rl > rlBadThresh {
mb.fs.warn("indexCacheBuf corrupt record state: dlen %d slen %d index %d rl %d lbuf %d", dlen, slen, index, rl, lbuf)
mb.fs.warn("indexCacheBuf corrupt record state in %s: dlen %d slen %d index %d rl %d lbuf %d", mb.mfn, dlen, slen, index, rl, lbuf)
// This means something is off.
// TODO(dlc) - Add into bad list?
return errCorruptState
Expand Down Expand Up @@ -7423,7 +7423,6 @@ func (mb *msgBlock) fetchMsgEx(seq uint64, sm *StoreMsg, doCopy bool) (*StoreMsg

var (
errNoCache = errors.New("no message cache")
errBadMsg = errors.New("malformed or corrupt message")
errDeletedMsg = errors.New("deleted message")
errPartialCache = errors.New("partial cache")
errNoPending = errors.New("message block does not have pending data")
Expand All @@ -7441,6 +7440,17 @@ var (
errStateTooBig = errors.New("store state too big for optional write")
)

type (
errBadMsg struct{ fn, detail string }
)

func (e errBadMsg) Error() string {
if e.detail != _EMPTY_ {
return fmt.Sprintf("malformed or corrupt message in %s: %s", filepath.Base(e.fn), e.detail)
}
return fmt.Sprintf("malformed or corrupt message in %s", filepath.Base(e.fn))
}

const (
// "Checksum bit" is used in "mb.cache.idx" for marking messages that have had their checksums checked.
cbit = 1 << 31
Expand Down Expand Up @@ -7644,7 +7654,7 @@ func (mb *msgBlock) msgFromBufNoCopy(buf []byte, sm *StoreMsg, hh hash.Hash64) (
// Lock should be held.
func (mb *msgBlock) msgFromBufEx(buf []byte, sm *StoreMsg, hh hash.Hash64, doCopy bool) (*StoreMsg, error) {
if len(buf) < emptyRecordLen {
return nil, errBadMsg
return nil, errBadMsg{mb.mfn, "record too short"}
}
var le = binary.LittleEndian

Expand All @@ -7656,7 +7666,7 @@ func (mb *msgBlock) msgFromBufEx(buf []byte, sm *StoreMsg, hh hash.Hash64, doCop
slen := int(le.Uint16(hdr[20:]))
// Simple sanity check.
if dlen < 0 || slen > (dlen-recordHashSize) || dlen > int(rl) || int(rl) > len(buf) || rl > rlBadThresh {
return nil, errBadMsg
return nil, errBadMsg{mb.mfn, fmt.Sprintf("sanity check failed (dlen %d slen %d rl %d buf %d)", dlen, slen, rl, buf)}
}
data := buf[msgHdrSize : msgHdrSize+dlen]
// Do checksum tests here if requested.
Expand All @@ -7670,7 +7680,7 @@ func (mb *msgBlock) msgFromBufEx(buf []byte, sm *StoreMsg, hh hash.Hash64, doCop
hh.Write(data[slen : dlen-recordHashSize])
}
if !bytes.Equal(hh.Sum(nil), data[len(data)-8:]) {
return nil, errBadMsg
return nil, errBadMsg{mb.mfn, "invalid checksum"}
}
}
seq := le.Uint64(hdr[4:])
Expand All @@ -7689,18 +7699,18 @@ func (mb *msgBlock) msgFromBufEx(buf []byte, sm *StoreMsg, hh hash.Hash64, doCop
// layers and for us to be safe to expire, and recycle, the large msgBlocks.
end := dlen - 8
if len(data) < end {
return nil, errBadMsg
return nil, errBadMsg{mb.mfn, "invalid data length"}
}

if hasHeaders {
if slen+4 > len(data) {
return nil, errBadMsg
return nil, errBadMsg{mb.mfn, "invalid subject length greataer than data length"}
}
hl := le.Uint32(data[slen:])
bi := slen + 4
li := bi + int(hl)
if bi > end {
return nil, errBadMsg
return nil, errBadMsg{mb.mfn, "invalid buffer index"}
}
if doCopy {
sm.buf = append(sm.buf, data[bi:end]...)
Expand All @@ -7709,13 +7719,13 @@ func (mb *msgBlock) msgFromBufEx(buf []byte, sm *StoreMsg, hh hash.Hash64, doCop
}
li, end = li-bi, end-bi
if li > len(sm.buf) || end > len(sm.buf) {
return nil, errBadMsg
return nil, errBadMsg{mb.mfn, "invalid message length or end greater than buffer length"}
}
sm.hdr = sm.buf[0:li:li]
sm.msg = sm.buf[li:end]
} else {
if slen > end {
return nil, errBadMsg
return nil, errBadMsg{mb.mfn, "invalid subject length greater than end"}
}
if doCopy {
sm.buf = append(sm.buf, data[slen:end]...)
Expand All @@ -7724,14 +7734,14 @@ func (mb *msgBlock) msgFromBufEx(buf []byte, sm *StoreMsg, hh hash.Hash64, doCop
}
mlen := end - slen
if mlen > len(sm.buf) {
return nil, errBadMsg
return nil, errBadMsg{mb.mfn, "invalid message length greater than buffer length"}
}
sm.msg = sm.buf[0:mlen]
}
sm.seq, sm.ts = seq, ts
if slen > 0 {
if slen > len(data) {
return nil, errBadMsg
return nil, errBadMsg{mb.mfn, "invalid subject length greater than data length"}
}
if doCopy {
// Make a copy since sm.subj lifetime may last longer.
Expand Down
Loading