From a478251b3c5730317dd5f76b52a55e57f7cf5221 Mon Sep 17 00:00:00 2001 From: Travis Bischel Date: Mon, 30 Aug 2021 22:25:00 -0600 Subject: [PATCH] bugfix source: advance offsets even if we return no records Commit 058f692 changed things to not buffer a fetch if the fetch contained no records. Unfortunately, if a fetch has a small MaxBytes and can only consume one record, and that one record is a control batch, then we would never advance offsets. We now advance offsets internally if we process a fetch successfully, even if we do not buffer a fetch to be returned. --- pkg/kgo/source.go | 29 +++++++++++++++++++---------- 1 file changed, 19 insertions(+), 10 deletions(-) diff --git a/pkg/kgo/source.go b/pkg/kgo/source.go index 9b775fcf..7d628edb 100644 --- a/pkg/kgo/source.go +++ b/pkg/kgo/source.go @@ -263,8 +263,8 @@ func (os usedOffsets) eachOffset(fn func(*cursorOffsetNext)) { } } -func (os usedOffsets) finishUsingAllWith(fn func(*cursorOffsetNext)) { - os.eachOffset(func(o *cursorOffsetNext) { fn(o); o.from.allowUsable() }) +func (os usedOffsets) finishUsingAllWithSet() { + os.eachOffset(func(o *cursorOffsetNext) { o.from.setOffset(o.cursorOffset); o.from.allowUsable() }) } func (os usedOffsets) finishUsingAll() { @@ -328,11 +328,7 @@ func (s *source) hook(f *Fetch, buffered, polled bool) { // takeBuffered drains a buffered fetch and updates offsets. func (s *source) takeBuffered() Fetch { - return s.takeBufferedFn(true, func(usedOffsets usedOffsets) { - usedOffsets.finishUsingAllWith(func(o *cursorOffsetNext) { - o.from.setOffset(o.cursorOffset) - }) - }) + return s.takeBufferedFn(true, usedOffsets.finishUsingAllWithSet) } func (s *source) discardBuffered() { @@ -547,12 +543,19 @@ func (s *source) fetch(consumerSession *consumerSession, doneFetch chan<- struct // For all returns, if we do not buffer our fetch, then we want to // ensure our used offsets are usable again. - var alreadySentToDoneFetch bool - var buffered bool + var ( + alreadySentToDoneFetch bool + setOffsets bool + buffered bool + ) defer func() { if !buffered { if req.numOffsets > 0 { - req.usedOffsets.finishUsingAll() + if setOffsets { + req.usedOffsets.finishUsingAllWithSet() + } else { + req.usedOffsets.finishUsingAll() + } } if !alreadySentToDoneFetch { doneFetch <- struct{}{} @@ -689,6 +692,12 @@ func (s *source) fetch(consumerSession *consumerSession, doneFetch chan<- struct return } + // At this point, we have successfully processed the response. Even if + // the response contains no records, we want to keep any offset + // advancements (we could have consumed only control records, we must + // advance past them). + setOffsets = true + if resp.Version < 7 { // If the version is less than 7, we cannot use fetch sessions, // so we kill them on the first response.