diff --git a/pkg/kgo/consumer_group.go b/pkg/kgo/consumer_group.go
index 84fe9672..f1a004ab 100644
--- a/pkg/kgo/consumer_group.go
+++ b/pkg/kgo/consumer_group.go
@@ -51,6 +51,15 @@ type groupConsumer struct {
 	lastAssigned map[string][]int32 // only updated in join&sync loop
 	nowAssigned  map[string][]int32 // only updated in join&sync loop
 
+	// Fetching ensures we continue fetching offsets across cooperative
+	// rebalance if an offset fetch returns early due to an immediate
+	// rebalance. See the large comment on adjustCooperativeFetchOffsets
+	// for more details.
+	//
+	// This is modified only in that function, or in the manage loop on a
+	// hard error once the heartbeat/fetch has returned.
+	fetching map[string]map[int32]struct{}
+
 	// leader is whether we are the leader right now. This is set to false
 	//
 	// - set to false at the beginning of a join group session
@@ -284,6 +293,7 @@ func (g *groupConsumer) manage() {
 
 			g.nowAssigned = nil
 			g.lastAssigned = nil
+			g.fetching = nil
 
 			g.leader.set(false)
 		}
@@ -653,7 +663,7 @@ func (g *groupConsumer) setupAssignedAndHeartbeat() error {
 			defer close(fetchDone)
 			defer close(fetchErrCh)
 			g.cfg.logger.Log(LogLevelInfo, "fetching offsets for added partitions", "group", g.cfg.group, "added", added)
-			fetchErrCh <- g.fetchOffsets(ctx, added)
+			fetchErrCh <- g.fetchOffsets(ctx, added, lost)
 		}()
 	} else {
 		close(fetchDone)
@@ -1062,9 +1072,85 @@ func (g *groupConsumer) joinGroupProtocols() []kmsg.JoinGroupRequestProtocol {
 	return protos
 }
 
+// If we are cooperatively consuming, we have a potential problem: if fetch
+// offsets is canceled due to an immediate rebalance, when we resume, we will
+// not re-fetch offsets for partitions we were previously assigned and are
+// still assigned. We will only fetch offsets for new assignments.
+//
+// To work around that issue, we track everything we are fetching in g.fetching
+// and only clear g.fetching if fetchOffsets returns with no error.
+//
+// Now, if fetching returns early due to an error, when we rejoin and re-fetch,
+// we will resume fetching what we were previously:
+//
+//   * first we remove what was lost
+//   * then we add anything new
+//   * then we translate our total set into the "added" list to be fetched on return
+//
+// Any time a group is completely lost, the manage loop clears fetching. When
+// cooperative consuming, a hard error is basically losing the entire state and
+// rejoining from scratch.
+func (g *groupConsumer) adjustCooperativeFetchOffsets(added, lost map[string][]int32) map[string][]int32 {
+	if g.fetching != nil {
+		// We were fetching previously: remove anything lost.
+		for topic, partitions := range lost {
+			ft := g.fetching[topic]
+			if ft == nil {
+				continue // we were not fetching this topic
+			}
+			for _, partition := range partitions {
+				delete(ft, partition)
+			}
+			if len(ft) == 0 {
+				delete(g.fetching, topic)
+			}
+		}
+	} else {
+		// We were not fetching previously: start a new map for what we
+		// are adding.
+		g.fetching = make(map[string]map[int32]struct{})
+	}
+
+	// Merge everything we are newly fetching to our fetching map.
+	for topic, partitions := range added {
+		ft := g.fetching[topic]
+		if ft == nil {
+			ft = make(map[int32]struct{}, len(partitions))
+			g.fetching[topic] = ft
+		}
+		for _, partition := range partitions {
+			ft[partition] = struct{}{}
+		}
+	}
+
+	// Now translate our full set (previously fetching ++ newly fetching --
+	// lost) into a new "added" map to be fetched.
+	added = make(map[string][]int32, len(g.fetching))
+	for topic, partitions := range g.fetching {
+		ps := make([]int32, 0, len(partitions))
+		for partition := range partitions {
+			ps = append(ps, partition)
+		}
+		added[topic] = ps
+	}
+	return added
+}
+
 // fetchOffsets is issued once we join a group to see what the prior commits
 // were for the partitions we were assigned.
-func (g *groupConsumer) fetchOffsets(ctx context.Context, newAssigned map[string][]int32) error {
+func (g *groupConsumer) fetchOffsets(ctx context.Context, added, lost map[string][]int32) (err error) {
+	// If cooperative consuming, we may have to resume fetches. See the
+	// comment on adjustCooperativeFetchOffsets. If we successfully fetch,
+	// we clear what we were fetching.
+	if g.cooperative {
+		added = g.adjustCooperativeFetchOffsets(added, lost)
+		defer func() {
+			if err == nil {
+				g.fetching = nil
+			}
+		}()
+	}
+
 	// Our client maps the v0 to v7 format to v8+ when sharding this
 	// request, if we are only requesting one group, as well as maps the
 	// response back, so we do not need to worry about v8+ here.
@@ -1072,17 +1158,14 @@ start:
 	req := kmsg.NewPtrOffsetFetchRequest()
 	req.Group = g.cfg.group
 	req.RequireStable = g.cfg.requireStable
-	for topic, partitions := range newAssigned {
+	for topic, partitions := range added {
 		reqTopic := kmsg.NewOffsetFetchRequestTopic()
 		reqTopic.Topic = topic
 		reqTopic.Partitions = partitions
 		req.Topics = append(req.Topics, reqTopic)
 	}
 
-	var (
-		resp *kmsg.OffsetFetchResponse
-		err  error
-	)
+	var resp *kmsg.OffsetFetchResponse
 
 	fetchDone := make(chan struct{})
 	go func() {
@@ -1092,7 +1175,7 @@ start:
 	select {
 	case <-fetchDone:
 	case <-ctx.Done():
-		g.cfg.logger.Log(LogLevelError, "fetch offsets failed due to context cancelation", "group", g.cfg.group)
+		g.cfg.logger.Log(LogLevelInfo, "fetch offsets failed due to context cancelation", "group", g.cfg.group)
 		return ctx.Err()
 	}
 	if err != nil {
@@ -1126,6 +1209,12 @@ start:
 					goto start
 				}
 			}
+			g.cfg.logger.Log(LogLevelError, "fetch offsets failed",
+				"group", g.cfg.group,
+				"topic", rTopic.Topic,
+				"partition", rPartition.Partition,
+				"err", err,
+			)
 			return err
 		}
 		offset := Offset{
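For illustration, a rough standalone sketch of the resume behavior this diff is after (not the client's code; the adjustFetching helper and the topic/partition numbers below are made up for demonstration). The full remaining set, not just the new delta, is returned as the next "added" list because the interrupted fetch never completed, so still-assigned partitions must be fetched again:

package main

import "fmt"

// adjustFetching mirrors the merge described in the diff's comment: drop lost
// partitions from the in-progress fetch set, add the newly assigned ones, and
// return the whole remaining set as the next list of offsets to fetch.
func adjustFetching(fetching map[string]map[int32]struct{}, added, lost map[string][]int32) (map[string]map[int32]struct{}, map[string][]int32) {
	if fetching == nil {
		fetching = make(map[string]map[int32]struct{})
	}
	for topic, partitions := range lost {
		ft := fetching[topic]
		if ft == nil {
			continue // we were not fetching this topic
		}
		for _, p := range partitions {
			delete(ft, p)
		}
		if len(ft) == 0 {
			delete(fetching, topic)
		}
	}
	for topic, partitions := range added {
		ft := fetching[topic]
		if ft == nil {
			ft = make(map[int32]struct{}, len(partitions))
			fetching[topic] = ft
		}
		for _, p := range partitions {
			ft[p] = struct{}{}
		}
	}
	next := make(map[string][]int32, len(fetching))
	for topic, partitions := range fetching {
		for p := range partitions {
			next[topic] = append(next[topic], p)
		}
	}
	return fetching, next
}

func main() {
	// First join: assigned foo[0 1 2]; the offset fetch is interrupted by an
	// immediate cooperative rebalance, so nothing is cleared.
	fetching, next := adjustFetching(nil, map[string][]int32{"foo": {0, 1, 2}}, nil)
	fmt.Println(next) // map[foo:[0 1 2]] (partition order not guaranteed)

	// Rejoin: foo[2] was revoked, bar[0] was newly assigned. The next fetch
	// covers the still-unfetched foo[0 1] plus the new bar[0].
	_, next = adjustFetching(fetching, map[string][]int32{"bar": {0}}, map[string][]int32{"foo": {2}})
	fmt.Println(next) // map[bar:[0] foo:[0 1]] (partition order not guaranteed)
}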