Skip to content

Commit

Permalink
bugfix source: advance offsets even if we return no records
Browse files Browse the repository at this point in the history
Commit 058f692 changed things to not buffer a fetch if the fetch
contained no records. Unfortunately, if a fetch has a small MaxBytes and
can only consume one record, and that one record is a control batch,
then we would never advance offsets.

We now advance offsets internally if we process a fetch successfully,
even if we do not buffer a fetch to be returned.
  • Loading branch information
twmb committed Aug 31, 2021
1 parent 7eb02f1 commit a478251
Showing 1 changed file with 19 additions and 10 deletions.
29 changes: 19 additions & 10 deletions pkg/kgo/source.go
Original file line number Diff line number Diff line change
Expand Up @@ -263,8 +263,8 @@ func (os usedOffsets) eachOffset(fn func(*cursorOffsetNext)) {
}
}

func (os usedOffsets) finishUsingAllWith(fn func(*cursorOffsetNext)) {
os.eachOffset(func(o *cursorOffsetNext) { fn(o); o.from.allowUsable() })
func (os usedOffsets) finishUsingAllWithSet() {
os.eachOffset(func(o *cursorOffsetNext) { o.from.setOffset(o.cursorOffset); o.from.allowUsable() })
}

func (os usedOffsets) finishUsingAll() {
Expand Down Expand Up @@ -328,11 +328,7 @@ func (s *source) hook(f *Fetch, buffered, polled bool) {

// takeBuffered drains a buffered fetch and updates offsets.
func (s *source) takeBuffered() Fetch {
return s.takeBufferedFn(true, func(usedOffsets usedOffsets) {
usedOffsets.finishUsingAllWith(func(o *cursorOffsetNext) {
o.from.setOffset(o.cursorOffset)
})
})
return s.takeBufferedFn(true, usedOffsets.finishUsingAllWithSet)
}

func (s *source) discardBuffered() {
Expand Down Expand Up @@ -547,12 +543,19 @@ func (s *source) fetch(consumerSession *consumerSession, doneFetch chan<- struct

// For all returns, if we do not buffer our fetch, then we want to
// ensure our used offsets are usable again.
var alreadySentToDoneFetch bool
var buffered bool
var (
alreadySentToDoneFetch bool
setOffsets bool
buffered bool
)
defer func() {
if !buffered {
if req.numOffsets > 0 {
req.usedOffsets.finishUsingAll()
if setOffsets {
req.usedOffsets.finishUsingAllWithSet()
} else {
req.usedOffsets.finishUsingAll()
}
}
if !alreadySentToDoneFetch {
doneFetch <- struct{}{}
Expand Down Expand Up @@ -689,6 +692,12 @@ func (s *source) fetch(consumerSession *consumerSession, doneFetch chan<- struct
return
}

// At this point, we have successfully processed the response. Even if
// the response contains no records, we want to keep any offset
// advancements (we could have consumed only control records, we must
// advance past them).
setOffsets = true

if resp.Version < 7 {
// If the version is less than 7, we cannot use fetch sessions,
// so we kill them on the first response.
Expand Down

0 comments on commit a478251

Please sign in to comment.