From 4f2e7fe3f420f0b8e68430fbf826c38070a2c0f4 Mon Sep 17 00:00:00 2001
From: Travis Bischel
Date: Mon, 25 Oct 2021 02:18:40 -0600
Subject: [PATCH] consumer group: bugfix cooperative rebalancing losing offset
 fetches

An immediate rebalance can kill offset fetching but keep assignments, and
when the rebalance completes, the client does not continue fetching offsets
from prior assignments that the group member still has.

This commit fixes that by persisting a bit of state across rebalances for
the cooperative consumer. See the embedded comments for more details.

Closes #98.
---
 pkg/kgo/consumer_group.go | 105 +++++++++++++++++++++++++++++++++++---
 1 file changed, 97 insertions(+), 8 deletions(-)

diff --git a/pkg/kgo/consumer_group.go b/pkg/kgo/consumer_group.go
index 84fe9672..f1a004ab 100644
--- a/pkg/kgo/consumer_group.go
+++ b/pkg/kgo/consumer_group.go
@@ -51,6 +51,15 @@ type groupConsumer struct {
 	lastAssigned map[string][]int32 // only updated in join&sync loop
 	nowAssigned  map[string][]int32 // only updated in join&sync loop

+	// Fetching ensures we continue fetching offsets across cooperative
+	// rebalance if an offset fetch returns early due to an immediate
+	// rebalance. See the large comment on adjustCooperativeFetchOffsets
+	// for more details.
+	//
+	// This is modified only in that function, or in the manage loop on a
+	// hard error once the heartbeat/fetch has returned.
+	fetching map[string]map[int32]struct{}
+
 	// leader is whether we are the leader right now. This is set to false
 	//
 	// - set to false at the beginning of a join group session
@@ -284,6 +293,7 @@ func (g *groupConsumer) manage() {

 			g.nowAssigned = nil
 			g.lastAssigned = nil
+			g.fetching = nil

 			g.leader.set(false)
 		}
@@ -653,7 +663,7 @@ func (g *groupConsumer) setupAssignedAndHeartbeat() error {
 			defer close(fetchDone)
 			defer close(fetchErrCh)
 			g.cfg.logger.Log(LogLevelInfo, "fetching offsets for added partitions", "group", g.cfg.group, "added", added)
-			fetchErrCh <- g.fetchOffsets(ctx, added)
+			fetchErrCh <- g.fetchOffsets(ctx, added, lost)
 		}()
 	} else {
 		close(fetchDone)
@@ -1062,9 +1072,85 @@ func (g *groupConsumer) joinGroupProtocols() []kmsg.JoinGroupRequestProtocol {
 	return protos
 }

+// If we are cooperatively consuming, we have a potential problem: if fetch
+// offsets is canceled due to an immediate rebalance, when we resume, we will
+// not re-fetch offsets for partitions we were previously assigned and are
+// still assigned. We will only fetch offsets for new assignments.
+//
+// To work around that issue, we track everything we are fetching in g.fetching
+// and only clear g.fetching if fetchOffsets returns with no error.
+//
+// Now, if fetching returns early due to an error, when we rejoin and re-fetch,
+// we will resume fetching what we were fetching previously:
+//
+// * first we remove what was lost
+// * then we add anything new
+// * then we translate our total set into the "added" list to be fetched on return
+//
+// Any time a group is completely lost, the manage loop clears fetching. When
+// cooperative consuming, a hard error is basically losing the entire state and
+// rejoining from scratch.
+func (g *groupConsumer) adjustCooperativeFetchOffsets(added, lost map[string][]int32) map[string][]int32 {
+	if g.fetching != nil {
+		// We were fetching previously: remove anything lost.
+		for topic, partitions := range lost {
+			ft := g.fetching[topic]
+			if ft == nil {
+				continue // we were not fetching this topic
+			}
+			for _, partition := range partitions {
+				delete(ft, partition)
+			}
+			if len(ft) == 0 {
+				delete(g.fetching, topic)
+			}
+		}
+	} else {
+		// We were not fetching previously: start a new map for what we
+		// are adding.
+		g.fetching = make(map[string]map[int32]struct{})
+	}
+
+	// Merge everything we are newly fetching to our fetching map.
+	for topic, partitions := range added {
+		ft := g.fetching[topic]
+		if ft == nil {
+			ft = make(map[int32]struct{}, len(partitions))
+			g.fetching[topic] = ft
+		}
+		for _, partition := range partitions {
+			ft[partition] = struct{}{}
+		}
+	}
+
+	// Now translate our full set (previously fetching ++ newly fetching --
+	// lost) into a new "added" map to be fetched.
+	added = make(map[string][]int32, len(g.fetching))
+	for topic, partitions := range g.fetching {
+		ps := make([]int32, 0, len(partitions))
+		for partition := range partitions {
+			ps = append(ps, partition)
+		}
+		added[topic] = ps
+	}
+	return added
+}
+
 // fetchOffsets is issued once we join a group to see what the prior commits
 // were for the partitions we were assigned.
-func (g *groupConsumer) fetchOffsets(ctx context.Context, newAssigned map[string][]int32) error {
+func (g *groupConsumer) fetchOffsets(ctx context.Context, added, lost map[string][]int32) (err error) {
+	// If cooperative consuming, we may have to resume fetches. See the
+	// comment on adjustCooperativeFetchOffsets. If we successfully fetch,
+	// we clear what we were fetching.
+	if g.cooperative {
+		added = g.adjustCooperativeFetchOffsets(added, lost)
+		defer func() {
+			if err == nil {
+				g.fetching = nil
+			}
+		}()
+	}
+
 	// Our client maps the v0 to v7 format to v8+ when sharding this
 	// request, if we are only requesting one group, as well as maps the
 	// response back, so we do not need to worry about v8+ here.
@@ -1072,17 +1158,14 @@ start:
 	req := kmsg.NewPtrOffsetFetchRequest()
 	req.Group = g.cfg.group
 	req.RequireStable = g.cfg.requireStable
-	for topic, partitions := range newAssigned {
+	for topic, partitions := range added {
 		reqTopic := kmsg.NewOffsetFetchRequestTopic()
 		reqTopic.Topic = topic
 		reqTopic.Partitions = partitions
 		req.Topics = append(req.Topics, reqTopic)
 	}

-	var (
-		resp *kmsg.OffsetFetchResponse
-		err  error
-	)
+	var resp *kmsg.OffsetFetchResponse

 	fetchDone := make(chan struct{})
 	go func() {
@@ -1092,7 +1175,7 @@ start:
 	select {
 	case <-fetchDone:
 	case <-ctx.Done():
-		g.cfg.logger.Log(LogLevelError, "fetch offsets failed due to context cancelation", "group", g.cfg.group)
+		g.cfg.logger.Log(LogLevelInfo, "fetch offsets failed due to context cancelation", "group", g.cfg.group)
 		return ctx.Err()
 	}
 	if err != nil {
@@ -1126,6 +1209,12 @@ start:
 				goto start
 			}
 		}
+		g.cfg.logger.Log(LogLevelError, "fetch offsets failed",
+			"group", g.cfg.group,
+			"topic", rTopic.Topic,
+			"partition", rPartition.Partition,
+			"err", err,
+		)
 		return err
 	}
 	offset := Offset{
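
As a reference for reviewers, the set arithmetic in the patch can be exercised in isolation. The standalone sketch below is illustrative only and not part of the diff: mergeFetching, the "foo" topic, and partitions 0-3 are hypothetical names for this demo. The remove-lost / add-new / re-emit steps mirror adjustCooperativeFetchOffsets, and the second call shows why a partition that was neither added nor lost is still re-fetched after an interrupted offset fetch.

package main

import "fmt"

// mergeFetching models the tracking done across cooperative rebalances:
// drop lost partitions from the carried-over fetching set, merge in newly
// added partitions, and return both the updated set and the full list of
// partitions whose offsets must (still) be fetched.
func mergeFetching(
	fetching map[string]map[int32]struct{},
	added, lost map[string][]int32,
) (map[string]map[int32]struct{}, map[string][]int32) {
	if fetching == nil {
		// Nothing was in flight before: start a fresh set.
		fetching = make(map[string]map[int32]struct{})
	}
	// Remove anything lost.
	for topic, partitions := range lost {
		ft := fetching[topic]
		if ft == nil {
			continue // we were not fetching this topic
		}
		for _, p := range partitions {
			delete(ft, p)
		}
		if len(ft) == 0 {
			delete(fetching, topic)
		}
	}
	// Merge in anything newly added.
	for topic, partitions := range added {
		ft := fetching[topic]
		if ft == nil {
			ft = make(map[int32]struct{}, len(partitions))
			fetching[topic] = ft
		}
		for _, p := range partitions {
			ft[p] = struct{}{}
		}
	}
	// Translate the full set into the list to fetch.
	toFetch := make(map[string][]int32, len(fetching))
	for topic, partitions := range fetching {
		for p := range partitions {
			toFetch[topic] = append(toFetch[topic], p)
		}
	}
	return fetching, toFetch
}

func main() {
	// First join: partitions 0-2 of "foo" are newly assigned; the offset
	// fetch is then interrupted by an immediate rebalance, so the set is
	// carried over rather than cleared.
	fetching, toFetch := mergeFetching(nil, map[string][]int32{"foo": {0, 1, 2}}, nil)
	fmt.Println(toFetch) // map[foo:[0 1 2]] (order may vary)

	// Second join: partition 2 was revoked and partition 3 was added. The
	// re-fetch still covers 0 and 1 even though they were not newly added.
	_, toFetch = mergeFetching(fetching,
		map[string][]int32{"foo": {3}},
		map[string][]int32{"foo": {2}},
	)
	fmt.Println(toFetch) // map[foo:[0 1 3]] (order may vary)
}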