Skip to content

Commit

Permalink
kgo: avoid a consumer logic race where the consumer stops consuming
Browse files Browse the repository at this point in the history
  • Loading branch information
twmb committed Jul 17, 2023
1 parent c1bb2be commit 1f696ca
Showing 1 changed file with 17 additions and 0 deletions.
17 changes: 17 additions & 0 deletions pkg/kgo/source.go
Original file line number Diff line number Diff line change
Expand Up @@ -484,6 +484,23 @@ func (s *source) loopFetch() {

if session == noConsumerSession {
s.fetchState.hardFinish()
// It is possible that we were triggered to consume while we
// had no consumer session, and then *after* loopFetch loaded
// noConsumerSession, the session was saved and triggered to
// consume again. If this function is slow the first time
// around, it could still be running and about to hardFinish.
// The second trigger will do nothing, and then we hardFinish
// and block a new session from actually starting consuming.
//
// To guard against this, after we hard finish, we load the
// session again: if it is *not* noConsumerSession, we trigger
// attempting to consume again. Worst case, the trigger is
// useless and it will exit below when it builds an empty
// request.
sessionNow := consumer.loadSession()
if session != sessionNow {
s.maybeConsume()
}
return
}

Expand Down

0 comments on commit 1f696ca

Please sign in to comment.