diff --git a/pkg/kgo/source.go b/pkg/kgo/source.go index 3c7c4c9d..b2c50def 100644 --- a/pkg/kgo/source.go +++ b/pkg/kgo/source.go @@ -863,6 +863,7 @@ func (o *cursorOffsetNext) processRespPartition(version int16, rp *kmsg.FetchRes in = rp.RecordBatches r readerFrom + kind string length int32 lengthField *int32 crcField *int32 @@ -870,7 +871,11 @@ func (o *cursorOffsetNext) processRespPartition(version int16, rp *kmsg.FetchRes crcAt int check = func() bool { + // If we call into check, we know we have a valid + // length, so we should be at least able to parse our + // top level struct and validate the length and CRC. if err := r.ReadFrom(in[:length]); err != nil { + fp.Err = fmt.Errorf("unable to read %s, not enough data", kind) return false } if length := int32(len(in[12:length])); length != *lengthField { @@ -886,6 +891,7 @@ func (o *cursorOffsetNext) processRespPartition(version int16, rp *kmsg.FetchRes ) for len(in) > 17 && fp.Err == nil { + offset := int64(binary.BigEndian.Uint64(in)) length = int32(binary.BigEndian.Uint32(in[8:])) length += 12 // for the int64 offset we skipped and int32 length field itself if len(in) < int(length) { @@ -895,6 +901,7 @@ func (o *cursorOffsetNext) processRespPartition(version int16, rp *kmsg.FetchRes switch magic := in[16]; magic { case 0: m := new(kmsg.MessageV0) + kind = "message v0" lengthField = &m.MessageSize crcField = &m.CRC crcTable = crc32.IEEETable @@ -902,6 +909,7 @@ func (o *cursorOffsetNext) processRespPartition(version int16, rp *kmsg.FetchRes r = m case 1: m := new(kmsg.MessageV1) + kind = "message v1" lengthField = &m.MessageSize crcField = &m.CRC crcTable = crc32.IEEETable @@ -909,12 +917,19 @@ func (o *cursorOffsetNext) processRespPartition(version int16, rp *kmsg.FetchRes r = m case 2: rb := new(kmsg.RecordBatch) + kind = "record batch" lengthField = &rb.Length crcField = &rb.CRC crcTable = crc32c crcAt = 21 r = rb + default: + fp.Err = fmt.Errorf("unknown magic %d; message offset is %d and length is %d, skipping and setting to next offset", magic, offset, length) + if next := offset + 1; next > o.offset { + o.offset = next + } + return fp } if !check() { @@ -978,22 +993,22 @@ func (a aborter) trackAbortedPID(producerID int64) { // processing records to fetch part // ////////////////////////////////////// -// readRawRecords reads n records from in and returns them, returning -// kbin.ErrNotEnoughData if in does not contain enough data. -func readRawRecords(n int, in []byte) ([]kmsg.Record, error) { +// readRawRecords reads n records from in and returns them, returning early if +// there were partial records. +func readRawRecords(n int, in []byte) []kmsg.Record { rs := make([]kmsg.Record, n) for i := 0; i < n; i++ { length, used := kbin.Varint(in) total := used + int(length) if used == 0 || length < 0 || len(in) < total { - return nil, kbin.ErrNotEnoughData + return rs[:i] } if err := (&rs[i]).ReadFrom(in[:total]); err != nil { - return nil, err + return rs[:i] } in = in[total:] } - return rs, nil + return rs } func (o *cursorOffsetNext) processRecordBatch( @@ -1006,15 +1021,6 @@ func (o *cursorOffsetNext) processRecordBatch( fp.Err = fmt.Errorf("unknown batch magic %d", batch.Magic) return } - rawRecords := batch.Records - if compression := byte(batch.Attributes & 0x0007); compression != 0 { - var err error - if rawRecords, err = decompressor.decompress(rawRecords, compression); err != nil { - fp.Err = fmt.Errorf("unable to decompress batch: %v", err) - return - } - } - lastOffset := batch.FirstOffset + int64(batch.LastOffsetDelta) if lastOffset < o.offset { // If the last offset in this batch is less than what we asked @@ -1023,23 +1029,34 @@ func (o *cursorOffsetNext) processRecordBatch( return } + rawRecords := batch.Records + if compression := byte(batch.Attributes & 0x0007); compression != 0 { + var err error + if rawRecords, err = decompressor.decompress(rawRecords, compression); err != nil { + return // truncated batch + } + } + + numRecords := int(batch.NumRecords) + krecords := readRawRecords(numRecords, rawRecords) + + // KAFKA-5443: compacted topics preserve the last offset in a batch, + // even if the last record is removed, meaning that using offsets from + // records alone may not get us to the next offset we need to ask for. + // + // We only perform this logic if we did not consume a truncated batch. + // If we consume a truncated batch, then what was truncated could have + // been an offset we are interested in consuming. Even if our fetch did + // not advance this partition at all, we will eventually fetch from the + // partition and not have a truncated response, at which point we will + // either advance offsets or will set to nextAskOffset. nextAskOffset := lastOffset + 1 defer func() { - if o.offset < nextAskOffset { - // KAFKA-5443: compacted topics preserve the last offset in a - // batch, even if the last record is removed, meaning that - // using offsets from records alone may not get us to the next - // offset we need to ask for. + if numRecords == len(krecords) && o.offset < nextAskOffset { o.offset = nextAskOffset } }() - krecords, err := readRawRecords(int(batch.NumRecords), rawRecords) - if err != nil { - fp.Err = fmt.Errorf("invalid record batch: %v, on offset %d, asking next for offset %d", err, o.offset, nextAskOffset) - return - } - abortBatch := aborter.shouldAbortBatch(batch) for i := range krecords { record := recordToRecord( @@ -1079,14 +1096,14 @@ func (o *cursorOffsetNext) processV1OuterMessage( rawInner, err := decompressor.decompress(message.Value, compression) if err != nil { - fp.Err = fmt.Errorf("unable to decompress messages: %v", err) - return + return // truncated batch } var innerMessages []readerFrom +out: for len(rawInner) > 17 { // magic at byte 17 length := int32(binary.BigEndian.Uint32(rawInner[8:])) - length += 12 // skip offset and length fields + length += 12 // offset and length fields if len(rawInner) < int(length) { break } @@ -1113,10 +1130,11 @@ func (o *cursorOffsetNext) processV1OuterMessage( default: fp.Err = fmt.Errorf("message set v1 has inner message with invalid magic %d", magic) - break + break out } if err := msg.ReadFrom(rawInner[:length]); err != nil { + fp.Err = fmt.Errorf("unable to read message v%d, not enough data", magic) break } if length := int32(len(rawInner[12:length])); length != *lengthField { @@ -1184,19 +1202,19 @@ func (o *cursorOffsetNext) processV0OuterMessage( rawInner, err := decompressor.decompress(message.Value, compression) if err != nil { - fp.Err = fmt.Errorf("unable to decompress messages: %v", err) - return + return // truncated batch } var innerMessages []kmsg.MessageV0 for len(rawInner) > 17 { // magic at byte 17 length := int32(binary.BigEndian.Uint32(rawInner[8:])) - length += 12 // skip offset and length fields + length += 12 // offset and length fields if len(rawInner) < int(length) { - break + break // truncated batch } var m kmsg.MessageV0 if err := m.ReadFrom(rawInner[:length]); err != nil { + fp.Err = fmt.Errorf("unable to read message v0, not enough data") break } if length := int32(len(rawInner[12:length])); length != m.MessageSize {