diff --git a/server/consumer.go b/server/consumer.go index 3a46aaea197..080ec7a1fdc 100644 --- a/server/consumer.go +++ b/server/consumer.go @@ -4882,14 +4882,14 @@ func (o *consumer) loopAndGatherMsgs(qch chan struct{}) { } if err == ErrStoreMsgNotFound || err == errDeletedMsg || err == ErrStoreEOF || err == errMaxAckPending { goto waitForMsgs - } else if err == errPartialCache { - s.Warnf("Unexpected partial cache error looking up message for consumer '%s > %s > %s'", - o.mset.acc, stream, o.cfg.Name) - goto waitForMsgs - } else { - s.Errorf("Received an error looking up message for consumer '%s > %s > %s': %v", - o.mset.acc, stream, o.cfg.Name, err) + if pmsg != nil { + s.Errorf("Received an error looking up message with sequence %d for consumer '%s > %s > %s': %v", + pmsg.seq, o.mset.acc, stream, o.cfg.Name, err) + } else { + s.Errorf("Received an error looking up message for consumer '%s > %s > %s': %v", + o.mset.acc, stream, o.cfg.Name, err) + } goto waitForMsgs } } diff --git a/server/filestore.go b/server/filestore.go index 38b8dcd3449..940784f20e3 100644 --- a/server/filestore.go +++ b/server/filestore.go @@ -1521,7 +1521,7 @@ func (mb *msgBlock) rebuildStateLocked() (*LostStreamData, []uint64, error) { // Do some quick sanity checks here. if dlen < 0 || slen > (dlen-recordHashSize) || dlen > int(rl) || index+rl > lbuf || rl > rlBadThresh { truncate(index) - return gatherLost(lbuf - index), tombstones, errBadMsg + return gatherLost(lbuf - index), tombstones, errBadMsg{mb.mfn, fmt.Sprintf("sanity check failed (dlen %d slen %d rl %d index %d lbuf %d)", dlen, slen, rl, index, lbuf)} } // Check for checksum failures before additional processing. 
@@ -1539,7 +1539,7 @@ func (mb *msgBlock) rebuildStateLocked() (*LostStreamData, []uint64, error) { checksum := hh.Sum(nil) if !bytes.Equal(checksum, data[len(data)-recordHashSize:]) { truncate(index) - return gatherLost(lbuf - index), tombstones, errBadMsg + return gatherLost(lbuf - index), tombstones, errBadMsg{mb.mfn, "invalid checksum"} } copy(mb.lchk[0:], checksum) } @@ -5291,7 +5291,7 @@ func (mb *msgBlock) slotInfo(slot int) (uint32, uint32, bool, error) { } } if rl < msgHdrSize { - return 0, 0, false, errBadMsg + return 0, 0, false, errBadMsg{mb.mfn, fmt.Sprintf("length too short for slot %d", slot)} } return uint32(ri), rl, hashChecked, nil } @@ -6840,7 +6840,7 @@ func (mb *msgBlock) indexCacheBuf(buf []byte) error { // Sanity check here since we calculate size to allocate based on this. if mbFirstSeq > (mbLastSeq + 1) { // Purged state first == last + 1 - mb.fs.warn("indexCacheBuf corrupt state: mb.first %d mb.last %d", mbFirstSeq, mbLastSeq) + mb.fs.warn("indexCacheBuf corrupt state in %s: mb.first %d mb.last %d", mb.mfn, mbFirstSeq, mbLastSeq) // This would cause idxSz to wrap. return errCorruptState } @@ -6905,7 +6905,7 @@ func (mb *msgBlock) indexCacheBuf(buf []byte) error { // Do some quick sanity checks here. if dlen < 0 || slen > (dlen-recordHashSize) || dlen > int(rl) || index+rl > lbuf || rl > rlBadThresh { - mb.fs.warn("indexCacheBuf corrupt record state: dlen %d slen %d index %d rl %d lbuf %d", dlen, slen, index, rl, lbuf) + mb.fs.warn("indexCacheBuf corrupt record state in %s: dlen %d slen %d index %d rl %d lbuf %d", mb.mfn, dlen, slen, index, rl, lbuf) // This means something is off. // TODO(dlc) - Add into bad list? 
return errCorruptState @@ -7423,7 +7423,6 @@ func (mb *msgBlock) fetchMsgEx(seq uint64, sm *StoreMsg, doCopy bool) (*StoreMsg var ( errNoCache = errors.New("no message cache") - errBadMsg = errors.New("malformed or corrupt message") errDeletedMsg = errors.New("deleted message") errPartialCache = errors.New("partial cache") errNoPending = errors.New("message block does not have pending data") @@ -7441,6 +7440,17 @@ var ( errStateTooBig = errors.New("store state too big for optional write") ) +type ( + errBadMsg struct{ fn, detail string } +) + +func (e errBadMsg) Error() string { + if e.detail != _EMPTY_ { + return fmt.Sprintf("malformed or corrupt message in %s: %s", filepath.Base(e.fn), e.detail) + } + return fmt.Sprintf("malformed or corrupt message in %s", filepath.Base(e.fn)) +} + const ( // "Checksum bit" is used in "mb.cache.idx" for marking messages that have had their checksums checked. cbit = 1 << 31 @@ -7644,7 +7654,7 @@ func (mb *msgBlock) msgFromBufNoCopy(buf []byte, sm *StoreMsg, hh hash.Hash64) ( // Lock should be held. func (mb *msgBlock) msgFromBufEx(buf []byte, sm *StoreMsg, hh hash.Hash64, doCopy bool) (*StoreMsg, error) { if len(buf) < emptyRecordLen { - return nil, errBadMsg + return nil, errBadMsg{mb.mfn, "record too short"} } var le = binary.LittleEndian @@ -7656,7 +7666,7 @@ func (mb *msgBlock) msgFromBufEx(buf []byte, sm *StoreMsg, hh hash.Hash64, doCop slen := int(le.Uint16(hdr[20:])) // Simple sanity check. if dlen < 0 || slen > (dlen-recordHashSize) || dlen > int(rl) || int(rl) > len(buf) || rl > rlBadThresh { - return nil, errBadMsg + return nil, errBadMsg{mb.mfn, fmt.Sprintf("sanity check failed (dlen %d slen %d rl %d buf %d)", dlen, slen, rl, len(buf))} } data := buf[msgHdrSize : msgHdrSize+dlen] // Do checksum tests here if requested. 
@@ -7670,7 +7680,7 @@ func (mb *msgBlock) msgFromBufEx(buf []byte, sm *StoreMsg, hh hash.Hash64, doCop hh.Write(data[slen : dlen-recordHashSize]) } if !bytes.Equal(hh.Sum(nil), data[len(data)-8:]) { - return nil, errBadMsg + return nil, errBadMsg{mb.mfn, "invalid checksum"} } } seq := le.Uint64(hdr[4:]) @@ -7689,18 +7699,18 @@ func (mb *msgBlock) msgFromBufEx(buf []byte, sm *StoreMsg, hh hash.Hash64, doCop // layers and for us to be safe to expire, and recycle, the large msgBlocks. end := dlen - 8 if len(data) < end { - return nil, errBadMsg + return nil, errBadMsg{mb.mfn, "invalid data length"} } if hasHeaders { if slen+4 > len(data) { - return nil, errBadMsg + return nil, errBadMsg{mb.mfn, "invalid subject length greater than data length"} } hl := le.Uint32(data[slen:]) bi := slen + 4 li := bi + int(hl) if bi > end { - return nil, errBadMsg + return nil, errBadMsg{mb.mfn, "invalid buffer index"} } if doCopy { sm.buf = append(sm.buf, data[bi:end]...) @@ -7709,13 +7719,13 @@ func (mb *msgBlock) msgFromBufEx(buf []byte, sm *StoreMsg, hh hash.Hash64, doCop } li, end = li-bi, end-bi if li > len(sm.buf) || end > len(sm.buf) { - return nil, errBadMsg + return nil, errBadMsg{mb.mfn, "invalid message length or end greater than buffer length"} } sm.hdr = sm.buf[0:li:li] sm.msg = sm.buf[li:end] } else { if slen > end { - return nil, errBadMsg + return nil, errBadMsg{mb.mfn, "invalid subject length greater than end"} } if doCopy { sm.buf = append(sm.buf, data[slen:end]...) 
@@ -7724,14 +7734,14 @@ func (mb *msgBlock) msgFromBufEx(buf []byte, sm *StoreMsg, hh hash.Hash64, doCop } mlen := end - slen if mlen > len(sm.buf) { - return nil, errBadMsg + return nil, errBadMsg{mb.mfn, "invalid message length greater than buffer length"} } sm.msg = sm.buf[0:mlen] } sm.seq, sm.ts = seq, ts if slen > 0 { if slen > len(data) { - return nil, errBadMsg + return nil, errBadMsg{mb.mfn, "invalid subject length greater than data length"} } if doCopy { // Make a copy since sm.subj lifetime may last longer.