source: more properly ignore truncated partitions
A truncated partition can come at any moment, even before we read the
magic. We must not set the partition error if a read fails from
truncation. In some places, particularly where we validate ahead of
time that we have enough length, we now do set the partition error,
because if we have enough length, we should be able to decode.
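
For illustration, here is a minimal standalone sketch of that distinction, using invented names (errTruncated, batchLength) rather than the library's own:

package main

import (
	"encoding/binary"
	"errors"
	"fmt"
)

// errTruncated marks input cut short by the broker's fetch size limit.
// Truncation is expected and must not become a partition error.
var errTruncated = errors.New("truncated batch")

// batchLength reads the int64 offset / int32 length prefix that heads
// each batch and checks the declared length against the bytes we have.
// With enough bytes in hand, a later decode failure is a real error;
// without them, we quietly stop and refetch.
func batchLength(in []byte) (int32, error) {
	if len(in) < 17 { // need the prefix plus the magic byte at index 16
		return 0, errTruncated
	}
	length := int32(binary.BigEndian.Uint32(in[8:]))
	length += 12 // count the offset and length fields themselves
	if len(in) < int(length) {
		return 0, errTruncated
	}
	return length, nil
}

func main() {
	buf := make([]byte, 17)
	binary.BigEndian.PutUint32(buf[8:], 1000) // header claims a 1000 byte body
	_, err := batchLength(buf)
	fmt.Println(errors.Is(err, errTruncated)) // true: skip quietly, no partition error
}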

We now use LastOffsetDelta only if we properly decoded all records.
Before, we could have failed on reading a truncated batch and then
skipped all of its records anyway.
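
The rule reduces to a small function. This is an illustrative sketch, not the library's API; want and got stand for the declared and decoded record counts:

package main

import "fmt"

// next returns the offset to fetch from after processing a batch. We
// trust FirstOffset+LastOffsetDelta (which survives log compaction even
// when trailing records are removed, KAFKA-5443) only when every
// declared record actually decoded; a short decode means the batch was
// truncated and the missing records must be refetched.
func next(current, firstOffset int64, lastOffsetDelta int32, want, got int) int64 {
	if n := firstOffset + int64(lastOffsetDelta) + 1; got == want && n > current {
		return n
	}
	return current
}

func main() {
	// Complete batch of 5 records at offsets 10..14: ask for 15 next.
	fmt.Println(next(10, 10, 4, 5, 5)) // 15
	// Truncated batch, only 3 of 5 records decoded: stay at 10.
	fmt.Println(next(10, 10, 4, 5, 3)) // 10
}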

Finally, if we decode a magic we do not know, we set an error in our
response and properly skip past the offset.
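
A rough standalone sketch of that fallback, with a placeholder cursor type standing in for the client's internals:

package main

import "fmt"

type cursor struct{ offset int64 }

// skipUnknown records a decode error for the fetch response but still
// advances the cursor past the offending offset, so the client does not
// refetch the same undecodable batch forever.
func (c *cursor) skipUnknown(offset int64, magic byte) error {
	if next := offset + 1; next > c.offset {
		c.offset = next
	}
	return fmt.Errorf("unknown magic %d at offset %d, skipping to next offset", magic, offset)
}

func main() {
	c := cursor{offset: 42}
	err := c.skipUnknown(42, 9)
	fmt.Println(c.offset, err) // 43 unknown magic 9 at offset 42, ...
}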
twmb committed Apr 8, 2021
1 parent 993ef27 commit d4fe91d
Showing 1 changed file with 52 additions and 34 deletions.
86 changes: 52 additions & 34 deletions pkg/kgo/source.go
@@ -863,14 +863,19 @@ func (o *cursorOffsetNext) processRespPartition(version int16, rp *kmsg.FetchRes
 		in = rp.RecordBatches
 
 		r           readerFrom
+		kind        string
 		length      int32
 		lengthField *int32
 		crcField    *int32
 		crcTable    *crc32.Table
 		crcAt       int
 
 		check = func() bool {
+			// If we call into check, we know we have a valid
+			// length, so we should be at least able to parse our
+			// top level struct and validate the length and CRC.
 			if err := r.ReadFrom(in[:length]); err != nil {
+				fp.Err = fmt.Errorf("unable to read %s, not enough data", kind)
 				return false
 			}
 			if length := int32(len(in[12:length])); length != *lengthField {
@@ -886,6 +891,7 @@ func (o *cursorOffsetNext) processRespPartition(version int16, rp *kmsg.FetchRes
 	)
 
 	for len(in) > 17 && fp.Err == nil {
+		offset := int64(binary.BigEndian.Uint64(in))
 		length = int32(binary.BigEndian.Uint32(in[8:]))
 		length += 12 // for the int64 offset we skipped and int32 length field itself
 		if len(in) < int(length) {
@@ -895,26 +901,35 @@ func (o *cursorOffsetNext) processRespPartition(version int16, rp *kmsg.FetchRes
 		switch magic := in[16]; magic {
 		case 0:
 			m := new(kmsg.MessageV0)
+			kind = "message v0"
 			lengthField = &m.MessageSize
 			crcField = &m.CRC
 			crcTable = crc32.IEEETable
 			crcAt = 16
 			r = m
 		case 1:
 			m := new(kmsg.MessageV1)
+			kind = "message v1"
 			lengthField = &m.MessageSize
 			crcField = &m.CRC
 			crcTable = crc32.IEEETable
 			crcAt = 16
 			r = m
 		case 2:
 			rb := new(kmsg.RecordBatch)
+			kind = "record batch"
 			lengthField = &rb.Length
 			crcField = &rb.CRC
 			crcTable = crc32c
 			crcAt = 21
 			r = rb
 
+		default:
+			fp.Err = fmt.Errorf("unknown magic %d; message offset is %d and length is %d, skipping and setting to next offset", magic, offset, length)
+			if next := offset + 1; next > o.offset {
+				o.offset = next
+			}
+			return fp
 		}
 
 		if !check() {
@@ -978,22 +993,22 @@ func (a aborter) trackAbortedPID(producerID int64) {
 // processing records to fetch part //
 //////////////////////////////////////
 
-// readRawRecords reads n records from in and returns them, returning
-// kbin.ErrNotEnoughData if in does not contain enough data.
-func readRawRecords(n int, in []byte) ([]kmsg.Record, error) {
+// readRawRecords reads n records from in and returns them, returning early if
+// there were partial records.
+func readRawRecords(n int, in []byte) []kmsg.Record {
 	rs := make([]kmsg.Record, n)
 	for i := 0; i < n; i++ {
 		length, used := kbin.Varint(in)
 		total := used + int(length)
 		if used == 0 || length < 0 || len(in) < total {
-			return nil, kbin.ErrNotEnoughData
+			return rs[:i]
 		}
 		if err := (&rs[i]).ReadFrom(in[:total]); err != nil {
-			return nil, err
+			return rs[:i]
 		}
 		in = in[total:]
 	}
-	return rs, nil
+	return rs
 }
 
 func (o *cursorOffsetNext) processRecordBatch(
@@ -1006,15 +1021,6 @@ func (o *cursorOffsetNext) processRecordBatch(
 		fp.Err = fmt.Errorf("unknown batch magic %d", batch.Magic)
 		return
 	}
-	rawRecords := batch.Records
-	if compression := byte(batch.Attributes & 0x0007); compression != 0 {
-		var err error
-		if rawRecords, err = decompressor.decompress(rawRecords, compression); err != nil {
-			fp.Err = fmt.Errorf("unable to decompress batch: %v", err)
-			return
-		}
-	}
-
 	lastOffset := batch.FirstOffset + int64(batch.LastOffsetDelta)
 	if lastOffset < o.offset {
 		// If the last offset in this batch is less than what we asked
@@ -1023,23 +1029,34 @@ func (o *cursorOffsetNext) processRecordBatch(
 		return
 	}
 
+	rawRecords := batch.Records
+	if compression := byte(batch.Attributes & 0x0007); compression != 0 {
+		var err error
+		if rawRecords, err = decompressor.decompress(rawRecords, compression); err != nil {
+			return // truncated batch
+		}
+	}
+
+	numRecords := int(batch.NumRecords)
+	krecords := readRawRecords(numRecords, rawRecords)
+
+	// KAFKA-5443: compacted topics preserve the last offset in a batch,
+	// even if the last record is removed, meaning that using offsets from
+	// records alone may not get us to the next offset we need to ask for.
+	//
+	// We only perform this logic if we did not consume a truncated batch.
+	// If we consume a truncated batch, then what was truncated could have
+	// been an offset we are interested in consuming. Even if our fetch did
+	// not advance this partition at all, we will eventually fetch from the
+	// partition and not have a truncated response, at which point we will
+	// either advance offsets or will set to nextAskOffset.
 	nextAskOffset := lastOffset + 1
 	defer func() {
-		if o.offset < nextAskOffset {
-			// KAFKA-5443: compacted topics preserve the last offset in a
-			// batch, even if the last record is removed, meaning that
-			// using offsets from records alone may not get us to the next
-			// offset we need to ask for.
+		if numRecords == len(krecords) && o.offset < nextAskOffset {
 			o.offset = nextAskOffset
 		}
 	}()
 
-	krecords, err := readRawRecords(int(batch.NumRecords), rawRecords)
-	if err != nil {
-		fp.Err = fmt.Errorf("invalid record batch: %v, on offset %d, asking next for offset %d", err, o.offset, nextAskOffset)
-		return
-	}
-
 	abortBatch := aborter.shouldAbortBatch(batch)
 	for i := range krecords {
 		record := recordToRecord(
@@ -1079,14 +1096,14 @@ func (o *cursorOffsetNext) processV1OuterMessage(
 
 	rawInner, err := decompressor.decompress(message.Value, compression)
 	if err != nil {
-		fp.Err = fmt.Errorf("unable to decompress messages: %v", err)
-		return
+		return // truncated batch
 	}
 
 	var innerMessages []readerFrom
+out:
 	for len(rawInner) > 17 { // magic at byte 17
 		length := int32(binary.BigEndian.Uint32(rawInner[8:]))
-		length += 12 // skip offset and length fields
+		length += 12 // offset and length fields
 		if len(rawInner) < int(length) {
 			break
 		}
@@ -1113,10 +1130,11 @@
 
 		default:
 			fp.Err = fmt.Errorf("message set v1 has inner message with invalid magic %d", magic)
-			break
+			break out
 		}
 
 		if err := msg.ReadFrom(rawInner[:length]); err != nil {
+			fp.Err = fmt.Errorf("unable to read message v%d, not enough data", magic)
 			break
 		}
 		if length := int32(len(rawInner[12:length])); length != *lengthField {
@@ -1184,19 +1202,19 @@ func (o *cursorOffsetNext) processV0OuterMessage(
 
 	rawInner, err := decompressor.decompress(message.Value, compression)
 	if err != nil {
-		fp.Err = fmt.Errorf("unable to decompress messages: %v", err)
-		return
+		return // truncated batch
 	}
 
 	var innerMessages []kmsg.MessageV0
 	for len(rawInner) > 17 { // magic at byte 17
 		length := int32(binary.BigEndian.Uint32(rawInner[8:]))
-		length += 12 // skip offset and length fields
+		length += 12 // offset and length fields
 		if len(rawInner) < int(length) {
-			break
+			break // truncated batch
 		}
 		var m kmsg.MessageV0
 		if err := m.ReadFrom(rawInner[:length]); err != nil {
+			fp.Err = fmt.Errorf("unable to read message v0, not enough data")
 			break
 		}
 		if length := int32(len(rawInner[12:length])); length != m.MessageSize {
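
The revised readRawRecords contract (return whatever parsed cleanly, with no error) is easy to exercise on its own. Below is a standalone sketch of the same varint-length-framed parsing built only on the standard library; kbin.Varint is the library's analogue of binary.Varint here:

package main

import (
	"encoding/binary"
	"fmt"
)

// readFrames mimics readRawRecords' contract on plain byte slices:
// parse up to n varint-length-prefixed frames and return early, with
// no error, the moment a frame is cut short.
func readFrames(n int, in []byte) [][]byte {
	frames := make([][]byte, 0, n)
	for i := 0; i < n; i++ {
		length, used := binary.Varint(in)
		total := used + int(length)
		if used <= 0 || length < 0 || len(in) < total {
			return frames // partial frame: truncated input
		}
		frames = append(frames, in[used:total])
		in = in[total:]
	}
	return frames
}

func main() {
	var in []byte
	in = binary.AppendVarint(in, 3)
	in = append(in, 'f', 'o', 'o')
	in = binary.AppendVarint(in, 5)
	in = append(in, 'p', 'a') // second frame cut short
	frames := readFrames(2, in)
	fmt.Printf("%d of 2 frames: %q\n", len(frames), frames) // 1 of 2 frames: ["foo"]
}

The caller can then compare the declared count against len(frames), exactly as processRecordBatch compares numRecords with len(krecords) before trusting LastOffsetDelta.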
