From 02560c7b49cadacdb9104b90fafd9b3a34f29295 Mon Sep 17 00:00:00 2001 From: Travis Bischel Date: Thu, 17 Feb 2022 03:00:44 -0700 Subject: [PATCH] consumer group: bugfix fetch offsets spanning rebalance 4f2e7fe3, addressing #98, was not complete: if, after a rebalance, the member was assigned no new added partitions, then fetchOffsets would not run. This prevented the logic in 4f2e7fe3, which was to resume fetching for prior partitions that the client kept. We now add back in the previously-fetching partitions _before_ fetchOffsets. This was found by testing running AddConsumeTopics and PurgeConsumeTopics concurrent against each other, each spin looping 1000x, where we purged 'foo' and 'bar' and added 'foo' and 'bar' and 'biz'. Turns out 'biz' was not being consumed after the end; after this commit, 'biz' is consumed. --- pkg/kgo/consumer_group.go | 29 ++++++++++++++++------------- 1 file changed, 16 insertions(+), 13 deletions(-) diff --git a/pkg/kgo/consumer_group.go b/pkg/kgo/consumer_group.go index 7741649f..c3e51a67 100644 --- a/pkg/kgo/consumer_group.go +++ b/pkg/kgo/consumer_group.go @@ -672,11 +672,18 @@ func (g *groupConsumer) setupAssignedAndHeartbeat() error { // is specifically used for this function's return. fetchDone := make(chan struct{}) defer func() { <-fetchDone }() + + // If cooperative consuming, we may have to resume fetches. See the + // comment on adjustCooperativeFetchOffsets. + if g.cooperative { + added = g.adjustCooperativeFetchOffsets(added, lost) + } + if len(added) > 0 { go func() { defer close(fetchDone) defer close(fetchErrCh) - fetchErrCh <- g.fetchOffsets(ctx, added, lost) + fetchErrCh <- g.fetchOffsets(ctx, added) }() } else { close(fetchDone) @@ -1156,18 +1163,14 @@ func (g *groupConsumer) adjustCooperativeFetchOffsets(added, lost map[string][]i // fetchOffsets is issued once we join a group to see what the prior commits // were for the partitions we were assigned. -func (g *groupConsumer) fetchOffsets(ctx context.Context, added, lost map[string][]int32) (rerr error) { // we must use "rerr"! see introducing commit - // If cooperative consuming, we may have to resume fetches. See the - // comment on adjustCooperativeFetchOffsets. If we successfully fetch, - // we clear what we were fetching. - if g.cooperative { - added = g.adjustCooperativeFetchOffsets(added, lost) - defer func() { - if rerr == nil { - g.fetching = nil - } - }() - } +func (g *groupConsumer) fetchOffsets(ctx context.Context, added map[string][]int32) (rerr error) { // we must use "rerr"! see introducing commit + // If we fetch successfully, we can clear the cross-group-cycle + // fetching tracking. + defer func() { + if rerr == nil { + g.fetching = nil + } + }() // Our client maps the v0 to v7 format to v8+ when sharding this // request, if we are only requesting one group, as well as maps the