Skip to content

Commit

Permalink
consumer group: fix race
Browse files Browse the repository at this point in the history
The the group was rotated, we did not wait for the revoke to be done
before falling back into managing.

Revoking happens in a dedicated goroutine, and managing revokes as well
on return (different type of revoke). These two revokes raced.

We now unconditionally wait for the heartbeat function to finish
revoking.
  • Loading branch information
twmb committed Feb 22, 2021
1 parent 5f0c5c1 commit 0ca274c
Showing 1 changed file with 9 additions and 2 deletions.
11 changes: 9 additions & 2 deletions pkg/kgo/consumer_group.go
Original file line number Diff line number Diff line change
Expand Up @@ -795,6 +795,8 @@ func (g *groupConsumer) heartbeat(fetchErrCh <-chan error, s *assignRevokeSessio
var heartbeat, didMetadone, didRevoke bool
var lastErr error

ctxCh := g.ctx.Done()

for {
var err error
heartbeat = false
Expand All @@ -815,8 +817,13 @@ func (g *groupConsumer) heartbeat(fetchErrCh <-chan error, s *assignRevokeSessio
case <-revoked:
revoked = nil
didRevoke = true
case <-g.ctx.Done():
return errLeftGroup
case <-ctxCh:
// Even if the group is left, we need to wait for our
// revoke to finish before returning, otherwise the
// manage goroutine will race with us setting
// nowAssigned.
ctxCh = nil
err = errLeftGroup
}

if heartbeat {
Expand Down

0 comments on commit 0ca274c

Please sign in to comment.