diff --git a/pkg/kgo/errors.go b/pkg/kgo/errors.go index 9af0c81c..74913254 100644 --- a/pkg/kgo/errors.go +++ b/pkg/kgo/errors.go @@ -319,3 +319,18 @@ func (e *ErrGroupSession) Error() string { } func (e *ErrGroupSession) Unwrap() error { return e.Err } + +type errDecompress struct { + err error +} + +func (e *errDecompress) Error() string { + return fmt.Sprintf("unable to decompress batch: %v", e.err) +} + +func (e *errDecompress) Unwrap() error { return e.err } + +func isDecompressErr(err error) bool { + var ed *errDecompress + return errors.As(err, &ed) +} diff --git a/pkg/kgo/source.go b/pkg/kgo/source.go index 12732e90..4ac26b0c 100644 --- a/pkg/kgo/source.go +++ b/pkg/kgo/source.go @@ -1389,6 +1389,19 @@ func (o *cursorOffsetNext) processRespPartition(br *broker, rp *kmsg.FetchRespon h.OnFetchBatchRead(br.meta, o.from.topic, o.from.partition, m) } }) + + // If we encounter a decompression error BUT we have successfully decompressed + // one batch, it is likely that we have received a partial batch. Kafka returns + // UP TO the requested max partition bytes, sometimes truncating data at the end. + // It returns at least one valid batch, but everything after is copied as is + // (i.e. a quick slab copy). We set the error to nil and return what we have. + // + // If we have a decompression error immediately, we keep it and bubble it up. + // The client cannot progress, and the end user needs visibility. + if isDecompressErr(fp.Err) && len(fp.Records) > 0 { + fp.Err = nil + break + } } return fp @@ -1476,6 +1489,7 @@ func (o *cursorOffsetNext) processRecordBatch( if compression := byte(batch.Attributes & 0x0007); compression != 0 { var err error if rawRecords, err = decompressor.decompress(rawRecords, compression); err != nil { + fp.Err = &errDecompress{err} return 0, 0 // truncated batch } } @@ -1542,6 +1556,7 @@ func (o *cursorOffsetNext) processV1OuterMessage( rawInner, err := decompressor.decompress(message.Value, compression) if err != nil { + fp.Err = &errDecompress{err} return 0, 0 // truncated batch } @@ -1653,6 +1668,7 @@ func (o *cursorOffsetNext) processV0OuterMessage( rawInner, err := decompressor.decompress(message.Value, compression) if err != nil { + fp.Err = &errDecompress{err} return 0, 0 // truncated batch }