Skip to content

Commit

Permalink
consumer group: bugfix fetch offsets spanning rebalance
Browse files Browse the repository at this point in the history
4f2e7fe, addressing #98, was not complete: if, after a rebalance, the
member was assigned no new added partitions, then fetchOffsets would not
run. This prevented the logic in 4f2e7fe, which was to resume fetching
for prior partitions that the client kept.

We now add back in the previously-fetching partitions _before_
fetchOffsets.

This was found by testing running AddConsumeTopics and
PurgeConsumeTopics concurrent against each other, each spin looping
1000x, where we purged 'foo' and 'bar' and added 'foo' and 'bar' and
'biz'. Turns out 'biz' was not being consumed after the end; after this
commit, 'biz' is consumed.
  • Loading branch information
twmb committed Feb 22, 2022
1 parent aa2550d commit 02560c7
Showing 1 changed file with 16 additions and 13 deletions.
29 changes: 16 additions & 13 deletions pkg/kgo/consumer_group.go
Original file line number Diff line number Diff line change
Expand Up @@ -672,11 +672,18 @@ func (g *groupConsumer) setupAssignedAndHeartbeat() error {
// is specifically used for this function's return.
fetchDone := make(chan struct{})
defer func() { <-fetchDone }()

// If cooperative consuming, we may have to resume fetches. See the
// comment on adjustCooperativeFetchOffsets.
if g.cooperative {
added = g.adjustCooperativeFetchOffsets(added, lost)
}

if len(added) > 0 {
go func() {
defer close(fetchDone)
defer close(fetchErrCh)
fetchErrCh <- g.fetchOffsets(ctx, added, lost)
fetchErrCh <- g.fetchOffsets(ctx, added)
}()
} else {
close(fetchDone)
Expand Down Expand Up @@ -1156,18 +1163,14 @@ func (g *groupConsumer) adjustCooperativeFetchOffsets(added, lost map[string][]i

// fetchOffsets is issued once we join a group to see what the prior commits
// were for the partitions we were assigned.
func (g *groupConsumer) fetchOffsets(ctx context.Context, added, lost map[string][]int32) (rerr error) { // we must use "rerr"! see introducing commit
// If cooperative consuming, we may have to resume fetches. See the
// comment on adjustCooperativeFetchOffsets. If we successfully fetch,
// we clear what we were fetching.
if g.cooperative {
added = g.adjustCooperativeFetchOffsets(added, lost)
defer func() {
if rerr == nil {
g.fetching = nil
}
}()
}
func (g *groupConsumer) fetchOffsets(ctx context.Context, added map[string][]int32) (rerr error) { // we must use "rerr"! see introducing commit
// If we fetch successfully, we can clear the cross-group-cycle
// fetching tracking.
defer func() {
if rerr == nil {
g.fetching = nil
}
}()

// Our client maps the v0 to v7 format to v8+ when sharding this
// request, if we are only requesting one group, as well as maps the
Expand Down

0 comments on commit 02560c7

Please sign in to comment.