From 4979b52f67f64d6cccd3a264d7743ed5ed973ab5 Mon Sep 17 00:00:00 2001
From: Travis Bischel
Date: Sun, 14 Feb 2021 14:12:35 -0700
Subject: [PATCH] consuming: bugfix for consuming compacted topics

See KAFKA-5443: a compacted topic can have later records removed from
within a batch, but the last offset delta is preserved. We need to use
the last offset delta for a batch rather than just relying on consumed
records. If we do not, we could ask for 144, get 143 while the batch's
last offset (first offset plus last offset delta) is 145, and never
advance since we discard 143 and do nothing about 145.
---
 pkg/kgo/source.go | 18 ++++++++++++++++++
 1 file changed, 18 insertions(+)

diff --git a/pkg/kgo/source.go b/pkg/kgo/source.go
index ea2240cd..77deaf1f 100644
--- a/pkg/kgo/source.go
+++ b/pkg/kgo/source.go
@@ -861,6 +861,15 @@ func (o *cursorOffsetNext) processRecordBatch(
 			return
 		}
 	}
+
+	lastOffset := batch.FirstOffset + int64(batch.LastOffsetDelta)
+	if lastOffset < o.offset {
+		// If the last offset in this batch is less than what we asked
+		// for, we got a batch that we entirely do not need. We can
+		// avoid all work (although we should not get this batch).
+		return
+	}
+
 	krecords, err := kmsg.ReadRecords(int(batch.NumRecords), rawRecords)
 	if err != nil {
 		fp.Err = fmt.Errorf("invalid record batch: %v", err)
@@ -880,6 +889,15 @@ func (o *cursorOffsetNext) processRecordBatch(
 		o.maybeKeepRecord(fp, record, abortBatch)
 	}
 
+	nextAskOffset := lastOffset + 1
+	if o.offset < nextAskOffset {
+		// KAFKA-5443: compacted topics preserve the last offset in a
+		// batch, even if the last record is removed, meaning that
+		// using offsets from records alone may not get us to the next
+		// offset we need to ask for.
+		o.offset = nextAskOffset
+	}
+
 	if abortBatch && lastRecord != nil && lastRecord.Attrs.IsControl() {
 		aborter.trackAbortedPID(batch.ProducerID)
 	}