From d11066fdbc8d2b781529175bf7b4ac126aadf440 Mon Sep 17 00:00:00 2001 From: Travis Bischel Date: Sun, 27 Feb 2022 12:06:23 -0700 Subject: [PATCH] committing: internally retry on some errors when cooperative See #137. Cooperative consumers can consume during rebalancing. If they commit at the start of a rebalance that ends after, then the commit will fail with ILLEGAL_GENERATION (or sometimes, REBALANCE_IN_PROGRESS). For cooperative specifically, if the commit fails but the consumer still owns all partitions being committed, we now retry the commit once. This should help alleviate commit errors that well written consumers are currently running into. We retry up to twice because the rebalancing when cooperative results in two rebalances. A third failure is more unexpected. --- pkg/kgo/consumer_group.go | 155 ++++++++++++++++++++++++------- pkg/kgo/topics_and_partitions.go | 49 +++++++++- 2 files changed, 165 insertions(+), 39 deletions(-) diff --git a/pkg/kgo/consumer_group.go b/pkg/kgo/consumer_group.go index 6b7b8a68..c65b3052 100644 --- a/pkg/kgo/consumer_group.go +++ b/pkg/kgo/consumer_group.go @@ -50,8 +50,11 @@ type groupConsumer struct { heartbeatForceCh chan func(error) // The following two are only updated in the manager / join&sync loop - lastAssigned map[string][]int32 // only updated in join&sync loop - nowAssigned map[string][]int32 // only updated in join&sync loop + // The nowAssigned map is read when commits fail: if the commit fails + // with ILLEGAL_GENERATION and it contains only partitions that are in + // nowAssigned, we re-issue. + lastAssigned map[string][]int32 + nowAssigned amtps // Fetching ensures we continue fetching offsets across cooperative // rebalance if an offset fetch returns early due to an immediate @@ -298,12 +301,12 @@ func (g *groupConsumer) manage() { // onRevoked, but since we are handling this case for // the cooperative consumer we may as well just also // include the eager consumer. - g.cfg.onRevoked(g.cl.ctx, g.cl, g.nowAssigned) + g.cfg.onRevoked(g.cl.ctx, g.cl, g.nowAssigned.read()) } else { // Any other error is perceived as a fatal error, // and we go into OnLost as appropriate. if g.cfg.onLost != nil { - g.cfg.onLost(g.cl.ctx, g.cl, g.nowAssigned) + g.cfg.onLost(g.cl.ctx, g.cl, g.nowAssigned.read()) } hook() } @@ -317,7 +320,7 @@ func (g *groupConsumer) manage() { g.uncommitted = nil g.mu.Unlock() - g.nowAssigned = nil + g.nowAssigned.store(nil) g.lastAssigned = nil g.fetching = nil @@ -403,19 +406,20 @@ func (g *groupConsumer) leave() (wait func()) { // returns the difference of g.nowAssigned and g.lastAssigned. func (g *groupConsumer) diffAssigned() (added, lost map[string][]int32) { + nowAssigned := g.nowAssigned.clone() if g.lastAssigned == nil { - return g.nowAssigned, nil + return nowAssigned, nil } - added = make(map[string][]int32, len(g.nowAssigned)) - lost = make(map[string][]int32, len(g.nowAssigned)) + added = make(map[string][]int32, len(nowAssigned)) + lost = make(map[string][]int32, len(nowAssigned)) // First, we diff lasts: any topic in last but not now is lost, // otherwise, (1) new partitions are added, (2) common partitions are // ignored, and (3) partitions no longer in now are lost. lasts := make(map[int32]struct{}, 100) for topic, lastPartitions := range g.lastAssigned { - nowPartitions, exists := g.nowAssigned[topic] + nowPartitions, exists := nowAssigned[topic] if !exists { lost[topic] = lastPartitions continue @@ -444,7 +448,7 @@ func (g *groupConsumer) diffAssigned() (added, lost map[string][]int32) { } // Finally, any new topics in now assigned are strictly added. - for topic, nowPartitions := range g.nowAssigned { + for topic, nowPartitions := range nowAssigned { if _, exists := g.lastAssigned[topic]; !exists { added[topic] = nowPartitions } @@ -488,14 +492,14 @@ func (g *groupConsumer) revoke(stage revokeStage, lost map[string][]int32, leavi g.c.mu.Unlock() if !g.cooperative { - g.cfg.logger.Log(LogLevelInfo, "eager consumer revoking prior assigned partitions", "group", g.cfg.group, "revoking", g.nowAssigned) + g.cfg.logger.Log(LogLevelInfo, "eager consumer revoking prior assigned partitions", "group", g.cfg.group, "revoking", g.nowAssigned.read()) } else { - g.cfg.logger.Log(LogLevelInfo, "cooperative consumer revoking prior assigned partitions because leaving group", "group", g.cfg.group, "revoking", g.nowAssigned) + g.cfg.logger.Log(LogLevelInfo, "cooperative consumer revoking prior assigned partitions because leaving group", "group", g.cfg.group, "revoking", g.nowAssigned.read()) } if g.cfg.onRevoked != nil { - g.cfg.onRevoked(g.cl.ctx, g.cl, g.nowAssigned) + g.cfg.onRevoked(g.cl.ctx, g.cl, g.nowAssigned.read()) } - g.nowAssigned = nil + g.nowAssigned.store(nil) // After nilling uncommitted here, nothing should recreate // uncommitted until a future fetch after the group is @@ -521,17 +525,19 @@ func (g *groupConsumer) revoke(stage revokeStage, lost map[string][]int32, leavi // which causes a new metadata request -- in short, this could // be concurrent with a metadata findNewAssignments, so we // lock. - g.mu.Lock() - for topic, partitions := range g.nowAssigned { - if _, exists := g.using[topic]; !exists { - if lost == nil { - lost = make(map[string][]int32) + g.nowAssigned.write(func(nowAssigned map[string][]int32) { + g.mu.Lock() + for topic, partitions := range nowAssigned { + if _, exists := g.using[topic]; !exists { + if lost == nil { + lost = make(map[string][]int32) + } + lost[topic] = partitions + delete(nowAssigned, topic) } - lost[topic] = partitions - delete(g.nowAssigned, topic) } - } - g.mu.Unlock() + g.mu.Unlock() + }) } if len(lost) > 0 { @@ -685,7 +691,7 @@ func (g *groupConsumer) setupAssignedAndHeartbeat() (string, error) { s := newAssignRevokeSession() added, lost := g.diffAssigned() - g.cfg.logger.Log(LogLevelInfo, "new group session begun", "group", g.cfg.group, "added", tpsFmt(added), "lost", tpsFmt(lost)) + g.cfg.logger.Log(LogLevelInfo, "new group session begun", "group", g.cfg.group, "added", mtps(added), "lost", mtps(lost)) s.prerevoke(g, lost) // for cooperative consumers // Since we have joined the group, we immediately begin heartbeating. @@ -1192,14 +1198,14 @@ func (g *groupConsumer) handleSyncResp(protocol string, resp *kmsg.SyncGroupResp return err } - g.cfg.logger.Log(LogLevelInfo, "synced", "group", g.cfg.group, "assigned", tpsFmt(assigned)) + g.cfg.logger.Log(LogLevelInfo, "synced", "group", g.cfg.group, "assigned", mtps(assigned)) // Past this point, we will fall into the setupAssigned prerevoke code, // meaning for cooperative, we will revoke what we need to. if g.cooperative { - g.lastAssigned = g.nowAssigned + g.lastAssigned = g.nowAssigned.clone() } - g.nowAssigned = assigned + g.nowAssigned.store(assigned) return nil } @@ -1210,10 +1216,7 @@ func (g *groupConsumer) joinGroupProtocols() []kmsg.JoinGroupRequestProtocol { for topic := range g.using { topics = append(topics, topic) } - nowDup := make(map[string][]int32) // deep copy to allow modifications - for topic, partitions := range g.nowAssigned { - nowDup[topic] = append([]int32(nil), partitions...) - } + nowDup := g.nowAssigned.clone() // deep copy to allow modifications gen := g.generation g.mu.Unlock() @@ -2333,13 +2336,30 @@ func (g *groupConsumer) defaultRevoke(context.Context, *Client, map[string][]int } } -// commit is the logic for Commit; see Commit's documentation -// -// This is called under the groupConsumer's lock. +// commit is the first step of actually committing; see doc below. func (g *groupConsumer) commit( ctx context.Context, uncommitted map[string]map[int32]EpochOffset, onDone func(*Client, *kmsg.OffsetCommitRequest, *kmsg.OffsetCommitResponse, error), +) { + g.commitAcrossRebalance(ctx, uncommitted, onDone, 1) +} + +// commitAcrossRebalances, called under the group mu, actually issues a commit. +// This function retries committing up to *once*. In standard mode of +// consuming, if a cooperative rebalance happens, a user may commit records +// while the client is rebalancing. This can cause ILLEGAL_GENERATION or +// REBALANCE_IN_PROGRESS errors. If we see either of those errors (once, to +// prevent spin looping), we re-issue the commit. See #137 for an example. +// +// We only try this logic for a cooperative group. Non-cooperative groups give +// up all their partitions on rebalance and do not continue to consume during +// the rebalancing. +func (g *groupConsumer) commitAcrossRebalance( + ctx context.Context, + uncommitted map[string]map[int32]EpochOffset, + onDone func(*Client, *kmsg.OffsetCommitRequest, *kmsg.OffsetCommitResponse, error), + tries int8, ) { if onDone == nil { // note we must always call onDone onDone = func(*Client, *kmsg.OffsetCommitRequest, *kmsg.OffsetCommitResponse, error) {} @@ -2351,6 +2371,73 @@ func (g *groupConsumer) commit( return } + // We retry up to twice (three tries total): cooperative rebalancing + // uses two back to back rebalances, and the commit could + // pathologically end during both. A third failure is unexpected. + if g.cooperative && tries < 3 { + origDone := onDone + onDone = func(cl *Client, req *kmsg.OffsetCommitRequest, resp *kmsg.OffsetCommitResponse, err error) { + retry := err == nil + var retryErr error + + // Per package docs: if all partitions indicate rebalancing + // or illegal generation, we re-issue the commit. + if retry { + checkErr: + for i := range resp.Topics { + t := &resp.Topics[i] + for j := range t.Partitions { + p := &t.Partitions[j] + retryErr = kerr.ErrorForCode(p.ErrorCode) + retry = retry && (retryErr == kerr.RebalanceInProgress || retryErr == kerr.IllegalGeneration) + if !retry { + break checkErr + } + } + } + } + + if retry { + // All errors are generation or rebalance. We now check + // if we are still assigned everything in the commit. + nowAssigned := g.nowAssigned.read() + mps := make(map[int32]struct{}) + checkAssign: + for i := range resp.Topics { + t := &resp.Topics[i] + ps, exists := nowAssigned[t.Topic] + if retry = exists; !exists { + break checkAssign // no longer assigned this topic + } + for p := range mps { + delete(mps, p) + } + for _, p := range ps { + mps[p] = struct{}{} + } + for j := range t.Partitions { + p := &t.Partitions[j] + _, exists := mps[p.Partition] + if retry = exists; !exists { + break checkAssign // no longer assigned this partition + } + } + } + } + + if retry { + go func() { + g.cl.cfg.logger.Log(LogLevelInfo, "CommitOffsets spanned a rebalance, we are cooperative and did not lose any partition we were trying to commit, recommitting", "err", retryErr) + g.mu.Lock() + defer g.mu.Unlock() + g.commitAcrossRebalance(ctx, uncommitted, origDone, tries+1) + }() + } else { + origDone(cl, req, resp, err) + } + } + } + priorCancel := g.commitCancel priorDone := g.commitDone diff --git a/pkg/kgo/topics_and_partitions.go b/pkg/kgo/topics_and_partitions.go index 1f7671b2..5872dff3 100644 --- a/pkg/kgo/topics_and_partitions.go +++ b/pkg/kgo/topics_and_partitions.go @@ -2,6 +2,7 @@ package kgo import ( "fmt" + "sort" "strings" "sync" "sync/atomic" @@ -17,15 +18,53 @@ func dupmsi32(m map[string]int32) map[string]int32 { return d } -type tpsFmt map[string][]int32 +// "Atomic map of topic partitions", for lack of a better name at this point. +type amtps struct { + v atomic.Value +} + +func (a *amtps) read() map[string][]int32 { + v := a.v.Load() + if v == nil { + return nil + } + return v.(map[string][]int32) +} + +func (a *amtps) write(fn func(map[string][]int32)) { + dup := a.clone() + fn(dup) + a.store(dup) +} -func (f tpsFmt) String() string { +func (a *amtps) clone() map[string][]int32 { + orig := a.read() + dup := make(map[string][]int32, len(orig)) + for t, ps := range orig { + dup[t] = append(dup[t], ps...) + } + return dup +} + +func (a *amtps) store(m map[string][]int32) { a.v.Store(m) } + +type mtps map[string][]int32 + +func (m mtps) String() string { var sb strings.Builder var topicsWritten int - for topic, partitions := range f { + ts := make([]string, 0, len(m)) + var ps []int32 + for t := range m { + ts = append(ts, t) + } + sort.Strings(ts) + for _, t := range ts { + ps = append(ps[:0], m[t]...) + sort.Slice(ps, func(i, j int) bool { return ps[i] < ps[j] }) topicsWritten++ - fmt.Fprintf(&sb, "%s%v", topic, partitions) - if topicsWritten < len(f) { + fmt.Fprintf(&sb, "%s%v", t, ps) + if topicsWritten < len(m) { sb.WriteString(", ") } }