From 1473778f73e803eb1d311c0bd972e9c938400379 Mon Sep 17 00:00:00 2001 From: Travis Bischel Date: Tue, 7 Jan 2025 18:37:53 -0700 Subject: [PATCH] kgo: return decompression errors while consuming Kafka can return partial batches, so decompression errors are common. If I ask for at most 100 bytes, and the broker has two 60 byte batches, I will receive one valid 60 byte batch and then a partial 40 byte batch. The second, partial batch will fail to decompress. This is the reason I previously never returned decompression errors. However, if a client truly does produce somewhat-valid compressed data that *some* decompressors can process, but *others* (Go's) cannot, then the first batch received could fail to decompress. The client would fail processing, return an empty batch, and try consuming at the same spot. The client would spin-loop trying to consume, and the end user would never be aware. Now, if the first error received is a decompression error, we bubble it up to the end user. This is hard to test internally, so this was manually tested with some hacks. Scenario one: * I changed the code to ignore crc errors, since that just got in the way * I ran a local kfake where the first five bytes of a RecordBatch.Records was overwritten with "aaaaa" * I consumed _before_ this patch -- the client spin-looped, never progressing and never printing anything. * I consumed _after_ this patch -- the client immediately received the error. Scenario two: * Same crc ignoring * I ran a local kfake where, when consuming, all batches AFTER the first had their RecordBatch.Records overwritten with "aaaaa". * I consumed before and after this patch -- in both cases, the client progressed to the end of the partition and no errors were printed. * To double verify the decompression error was being encountered, I added a println in kgo where the decompression error is generated -- the println was always encountered. Closes #854. 
--- pkg/kgo/errors.go | 15 +++++++++++++++ pkg/kgo/source.go | 16 ++++++++++++++++ 2 files changed, 31 insertions(+) diff --git a/pkg/kgo/errors.go b/pkg/kgo/errors.go index 9af0c81c..74913254 100644 --- a/pkg/kgo/errors.go +++ b/pkg/kgo/errors.go @@ -319,3 +319,18 @@ func (e *ErrGroupSession) Error() string { } func (e *ErrGroupSession) Unwrap() error { return e.Err } + +type errDecompress struct { + err error +} + +func (e *errDecompress) Error() string { + return fmt.Sprintf("unable to decompress batch: %v", e.err) +} + +func (e *errDecompress) Unwrap() error { return e.err } + +func isDecompressErr(err error) bool { + var ed *errDecompress + return errors.As(err, &ed) +} diff --git a/pkg/kgo/source.go b/pkg/kgo/source.go index 12732e90..4ac26b0c 100644 --- a/pkg/kgo/source.go +++ b/pkg/kgo/source.go @@ -1389,6 +1389,19 @@ func (o *cursorOffsetNext) processRespPartition(br *broker, rp *kmsg.FetchRespon h.OnFetchBatchRead(br.meta, o.from.topic, o.from.partition, m) } }) + + // If we encounter a decompression error BUT we have successfully decompressed + // one batch, it is likely that we have received a partial batch. Kafka returns + // UP TO the requested max partition bytes, sometimes truncating data at the end. + // It returns at least one valid batch, but everything after is copied as is + // (i.e. a quick slab copy). We set the error to nil and return what we have. + // + // If we have a decompression error immediately, we keep it and bubble it up. + // The client cannot progress, and the end user needs visibility. 
+ if isDecompressErr(fp.Err) && len(fp.Records) > 0 { + fp.Err = nil + break + } } return fp @@ -1476,6 +1489,7 @@ func (o *cursorOffsetNext) processRecordBatch( if compression := byte(batch.Attributes & 0x0007); compression != 0 { var err error if rawRecords, err = decompressor.decompress(rawRecords, compression); err != nil { + fp.Err = &errDecompress{err} return 0, 0 // truncated batch } } @@ -1542,6 +1556,7 @@ func (o *cursorOffsetNext) processV1OuterMessage( rawInner, err := decompressor.decompress(message.Value, compression) if err != nil { + fp.Err = &errDecompress{err} return 0, 0 // truncated batch } @@ -1653,6 +1668,7 @@ func (o *cursorOffsetNext) processV0OuterMessage( rawInner, err := decompressor.decompress(message.Value, compression) if err != nil { + fp.Err = &errDecompress{err} return 0, 0 // truncated batch }