diff --git a/pkg/kgo/consumer_group.go b/pkg/kgo/consumer_group.go
index 6b7b8a68..9908f823 100644
--- a/pkg/kgo/consumer_group.go
+++ b/pkg/kgo/consumer_group.go
@@ -50,8 +50,11 @@ type groupConsumer struct {
 	heartbeatForceCh chan func(error)

 	// The following two are only updated in the manager / join&sync loop
-	lastAssigned map[string][]int32 // only updated in join&sync loop
-	nowAssigned  map[string][]int32 // only updated in join&sync loop
+	// The nowAssigned map is read when commits fail: if the commit fails
+	// with ILLEGAL_GENERATION or REBALANCE_IN_PROGRESS and it contains
+	// only partitions that are in nowAssigned, we re-issue the commit.
+	lastAssigned map[string][]int32
+	nowAssigned  amtps

 	// Fetching ensures we continue fetching offsets across cooperative
 	// rebalance if an offset fetch returns early due to an immediate
@@ -298,12 +301,12 @@ func (g *groupConsumer) manage() {
 				// onRevoked, but since we are handling this case for
 				// the cooperative consumer we may as well just also
 				// include the eager consumer.
-				g.cfg.onRevoked(g.cl.ctx, g.cl, g.nowAssigned)
+				g.cfg.onRevoked(g.cl.ctx, g.cl, g.nowAssigned.read())
 			} else {
 				// Any other error is perceived as a fatal error,
 				// and we go into OnLost as appropriate.
 				if g.cfg.onLost != nil {
-					g.cfg.onLost(g.cl.ctx, g.cl, g.nowAssigned)
+					g.cfg.onLost(g.cl.ctx, g.cl, g.nowAssigned.read())
 				}
 				hook()
 			}
@@ -317,7 +320,7 @@ func (g *groupConsumer) manage() {
 		g.uncommitted = nil
 		g.mu.Unlock()

-		g.nowAssigned = nil
+		g.nowAssigned.store(nil)
 		g.lastAssigned = nil
 		g.fetching = nil
@@ -403,19 +406,20 @@ func (g *groupConsumer) leave() (wait func()) {
 // returns the difference of g.nowAssigned and g.lastAssigned.
 func (g *groupConsumer) diffAssigned() (added, lost map[string][]int32) {
+	nowAssigned := g.nowAssigned.clone()
 	if g.lastAssigned == nil {
-		return g.nowAssigned, nil
+		return nowAssigned, nil
 	}

-	added = make(map[string][]int32, len(g.nowAssigned))
-	lost = make(map[string][]int32, len(g.nowAssigned))
+	added = make(map[string][]int32, len(nowAssigned))
+	lost = make(map[string][]int32, len(nowAssigned))

 	// First, we diff lasts: any topic in last but not now is lost,
 	// otherwise, (1) new partitions are added, (2) common partitions are
 	// ignored, and (3) partitions no longer in now are lost.
 	lasts := make(map[int32]struct{}, 100)
 	for topic, lastPartitions := range g.lastAssigned {
-		nowPartitions, exists := g.nowAssigned[topic]
+		nowPartitions, exists := nowAssigned[topic]
 		if !exists {
 			lost[topic] = lastPartitions
 			continue
@@ -444,7 +448,7 @@ func (g *groupConsumer) diffAssigned() (added, lost map[string][]int32) {
 	}

 	// Finally, any new topics in now assigned are strictly added.
-	for topic, nowPartitions := range g.nowAssigned {
+	for topic, nowPartitions := range nowAssigned {
 		if _, exists := g.lastAssigned[topic]; !exists {
 			added[topic] = nowPartitions
 		}
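Note: the added/lost computation in diffAssigned above is a per-topic set difference between the previous and current assignment. A rough standalone sketch of the same idea (the diffTPS name is hypothetical, not part of this patch):

    package main

    import "fmt"

    // diffTPS is a simplified sketch of diffAssigned: anything in now but not
    // last is added; anything in last but not now is lost.
    func diffTPS(last, now map[string][]int32) (added, lost map[string][]int32) {
        added = make(map[string][]int32)
        lost = make(map[string][]int32)
        for topic, lastPs := range last {
            nowPs, exists := now[topic]
            if !exists {
                lost[topic] = lastPs // whole topic gone
                continue
            }
            nowSet := make(map[int32]struct{}, len(nowPs))
            for _, p := range nowPs {
                nowSet[p] = struct{}{}
            }
            for _, p := range lastPs {
                if _, ok := nowSet[p]; ok {
                    delete(nowSet, p) // common partition: neither added nor lost
                } else {
                    lost[topic] = append(lost[topic], p)
                }
            }
            for p := range nowSet {
                added[topic] = append(added[topic], p) // newly assigned partition
            }
        }
        for topic, nowPs := range now {
            if _, exists := last[topic]; !exists {
                added[topic] = nowPs // entirely new topic
            }
        }
        return added, lost
    }

    func main() {
        last := map[string][]int32{"foo": {0, 1, 2}}
        now := map[string][]int32{"foo": {1, 2, 3}, "bar": {0}}
        added, lost := diffTPS(last, now)
        fmt.Println(added, lost) // map[bar:[0] foo:[3]] map[foo:[0]]
    }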
@@ -488,14 +492,14 @@ func (g *groupConsumer) revoke(stage revokeStage, lost map[string][]int32, leavi
 		g.c.mu.Unlock()

 		if !g.cooperative {
-			g.cfg.logger.Log(LogLevelInfo, "eager consumer revoking prior assigned partitions", "group", g.cfg.group, "revoking", g.nowAssigned)
+			g.cfg.logger.Log(LogLevelInfo, "eager consumer revoking prior assigned partitions", "group", g.cfg.group, "revoking", g.nowAssigned.read())
 		} else {
-			g.cfg.logger.Log(LogLevelInfo, "cooperative consumer revoking prior assigned partitions because leaving group", "group", g.cfg.group, "revoking", g.nowAssigned)
+			g.cfg.logger.Log(LogLevelInfo, "cooperative consumer revoking prior assigned partitions because leaving group", "group", g.cfg.group, "revoking", g.nowAssigned.read())
 		}
 		if g.cfg.onRevoked != nil {
-			g.cfg.onRevoked(g.cl.ctx, g.cl, g.nowAssigned)
+			g.cfg.onRevoked(g.cl.ctx, g.cl, g.nowAssigned.read())
 		}
-		g.nowAssigned = nil
+		g.nowAssigned.store(nil)

 		// After nilling uncommitted here, nothing should recreate
 		// uncommitted until a future fetch after the group is
@@ -521,17 +525,19 @@ func (g *groupConsumer) revoke(stage revokeStage, lost map[string][]int32, leavi
 		// which causes a new metadata request -- in short, this could
 		// be concurrent with a metadata findNewAssignments, so we
 		// lock.
-		g.mu.Lock()
-		for topic, partitions := range g.nowAssigned {
-			if _, exists := g.using[topic]; !exists {
-				if lost == nil {
-					lost = make(map[string][]int32)
+		g.nowAssigned.write(func(nowAssigned map[string][]int32) {
+			g.mu.Lock()
+			for topic, partitions := range nowAssigned {
+				if _, exists := g.using[topic]; !exists {
+					if lost == nil {
+						lost = make(map[string][]int32)
+					}
+					lost[topic] = partitions
+					delete(nowAssigned, topic)
 				}
-				lost[topic] = partitions
-				delete(g.nowAssigned, topic)
 			}
-		}
-		g.mu.Unlock()
+			g.mu.Unlock()
+		})
 	}

 	if len(lost) > 0 {
@@ -685,7 +691,7 @@ func (g *groupConsumer) setupAssignedAndHeartbeat() (string, error) {
 	s := newAssignRevokeSession()
 	added, lost := g.diffAssigned()
-	g.cfg.logger.Log(LogLevelInfo, "new group session begun", "group", g.cfg.group, "added", tpsFmt(added), "lost", tpsFmt(lost))
+	g.cfg.logger.Log(LogLevelInfo, "new group session begun", "group", g.cfg.group, "added", mtps(added), "lost", mtps(lost))
 	s.prerevoke(g, lost) // for cooperative consumers

 	// Since we have joined the group, we immediately begin heartbeating.
@@ -1192,14 +1198,14 @@ func (g *groupConsumer) handleSyncResp(protocol string, resp *kmsg.SyncGroupResp
 		return err
 	}

-	g.cfg.logger.Log(LogLevelInfo, "synced", "group", g.cfg.group, "assigned", tpsFmt(assigned))
+	g.cfg.logger.Log(LogLevelInfo, "synced", "group", g.cfg.group, "assigned", mtps(assigned))

 	// Past this point, we will fall into the setupAssigned prerevoke code,
 	// meaning for cooperative, we will revoke what we need to.
 	if g.cooperative {
-		g.lastAssigned = g.nowAssigned
+		g.lastAssigned = g.nowAssigned.clone()
 	}
-	g.nowAssigned = assigned
+	g.nowAssigned.store(assigned)
 	return nil
 }
@@ -1210,10 +1216,7 @@ func (g *groupConsumer) joinGroupProtocols() []kmsg.JoinGroupRequestProtocol {
 	for topic := range g.using {
 		topics = append(topics, topic)
 	}
-	nowDup := make(map[string][]int32) // deep copy to allow modifications
-	for topic, partitions := range g.nowAssigned {
-		nowDup[topic] = append([]int32(nil), partitions...)
-	}
+	nowDup := g.nowAssigned.clone() // deep copy to allow modifications
 	gen := g.generation
 	g.mu.Unlock()
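Note: handleSyncResp now clones rather than aliasing because assigning a map copies only the map header; a later in-place mutation of nowAssigned would silently change lastAssigned too. A tiny standalone illustration of the aliasing pitfall (not library code):

    package main

    import "fmt"

    func main() {
        now := map[string][]int32{"foo": {0, 1}}
        last := now        // aliases the same underlying map
        delete(now, "foo") // mutating now also mutates last
        fmt.Println(last)  // map[] -- the "previous" assignment is gone

        // Cloning avoids this:
        now = map[string][]int32{"foo": {0, 1}}
        lastClone := make(map[string][]int32, len(now))
        for t, ps := range now {
            lastClone[t] = append([]int32(nil), ps...) // deep copy per topic
        }
        delete(now, "foo")
        fmt.Println(lastClone) // map[foo:[0 1]]
    }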
@@ -2333,13 +2336,30 @@ func (g *groupConsumer) defaultRevoke(context.Context, *Client, map[string][]int
 	}
 }

-// commit is the logic for Commit; see Commit's documentation.
-//
-// This is called under the groupConsumer's lock.
+// commit is the first step of actually committing; see doc below.
 func (g *groupConsumer) commit(
 	ctx context.Context,
 	uncommitted map[string]map[int32]EpochOffset,
 	onDone func(*Client, *kmsg.OffsetCommitRequest, *kmsg.OffsetCommitResponse, error),
+) {
+	g.commitAcrossRebalance(ctx, uncommitted, onDone, 1)
+}
+
+// commitAcrossRebalance, called under the group mu, actually issues a commit.
+// This function retries committing up to twice. In the standard mode of
+// consuming, if a cooperative rebalance happens, a user may commit records
+// while the client is rebalancing. This can cause ILLEGAL_GENERATION or
+// REBALANCE_IN_PROGRESS errors. If we see either of those errors (a bounded
+// number of times, to prevent spin looping), we re-issue the commit. See #137
+// for an example.
+//
+// We only try this logic for a cooperative group. Non-cooperative groups give
+// up all their partitions on rebalance and do not continue to consume during
+// the rebalancing.
+func (g *groupConsumer) commitAcrossRebalance(
+	ctx context.Context,
+	uncommitted map[string]map[int32]EpochOffset,
+	onDone func(*Client, *kmsg.OffsetCommitRequest, *kmsg.OffsetCommitResponse, error),
+	tries int8,
 ) {
 	if onDone == nil { // note we must always call onDone
 		onDone = func(*Client, *kmsg.OffsetCommitRequest, *kmsg.OffsetCommitResponse, error) {}
@@ -2351,6 +2371,69 @@ func (g *groupConsumer) commit(
 		return
 	}

+	// We retry up to twice (three tries total): cooperative rebalancing
+	// uses two back to back rebalances, and the commit could
+	// pathologically end during both. A third failure is unexpected.
+	if g.cooperative && tries < 3 {
+		origDone := onDone
+		onDone = func(cl *Client, req *kmsg.OffsetCommitRequest, resp *kmsg.OffsetCommitResponse, err error) {
+			retry := err == nil
+
+			// Per package docs: if all partitions indicate rebalancing
+			// or illegal generation, we re-issue the commit.
+			if retry {
+			checkErr:
+				for i := range resp.Topics {
+					t := &resp.Topics[i]
+					for j := range t.Partitions {
+						p := &t.Partitions[j]
+						err := kerr.ErrorForCode(p.ErrorCode)
+						retry = retry && (err == kerr.RebalanceInProgress || err == kerr.IllegalGeneration)
+						if !retry {
+							break checkErr
+						}
+					}
+				}
+
+				// All errors are generation or rebalance. We now check
+				// if we are still assigned everything in the commit.
+				nowAssigned := g.nowAssigned.read()
+				mps := make(map[int32]struct{})
+			checkAssign:
+				for i := range resp.Topics {
+					t := &resp.Topics[i]
+					ps, exists := nowAssigned[t.Topic]
+					if retry = exists; !exists {
+						break checkAssign // no longer assigned this topic
+					}
+					for p := range mps {
+						delete(mps, p)
+					}
+					for _, p := range ps {
+						mps[p] = struct{}{}
+					}
+					for j := range t.Partitions {
+						p := &t.Partitions[j]
+						_, exists := mps[p.Partition]
+						if retry = exists; !exists {
+							break checkAssign // no longer assigned this partition
+						}
+					}
+				}
+			}
+
+			if retry {
+				go func() {
+					g.mu.Lock()
+					defer g.mu.Unlock()
+					g.commitAcrossRebalance(ctx, uncommitted, origDone, tries+1)
+				}()
+			} else {
+				origDone(cl, req, resp, err)
+			}
+		}
+	}
+
 	priorCancel := g.commitCancel
 	priorDone := g.commitDone
diff --git a/pkg/kgo/topics_and_partitions.go b/pkg/kgo/topics_and_partitions.go
index 1f7671b2..5872dff3 100644
--- a/pkg/kgo/topics_and_partitions.go
+++ b/pkg/kgo/topics_and_partitions.go
@@ -2,6 +2,7 @@ package kgo

 import (
 	"fmt"
+	"sort"
 	"strings"
 	"sync"
 	"sync/atomic"
@@ -17,15 +18,53 @@ func dupmsi32(m map[string]int32) map[string]int32 {
 	return d
 }

-type tpsFmt map[string][]int32
+// "Atomic map of topic partitions", for lack of a better name at this point.
+type amtps struct {
+	v atomic.Value
+}
+
+func (a *amtps) read() map[string][]int32 {
+	v := a.v.Load()
+	if v == nil {
+		return nil
+	}
+	return v.(map[string][]int32)
+}
+
+func (a *amtps) write(fn func(map[string][]int32)) {
+	dup := a.clone()
+	fn(dup)
+	a.store(dup)
+}

-func (f tpsFmt) String() string {
+func (a *amtps) clone() map[string][]int32 {
+	orig := a.read()
+	dup := make(map[string][]int32, len(orig))
+	for t, ps := range orig {
+		dup[t] = append(dup[t], ps...)
+	}
+	return dup
+}
+
+func (a *amtps) store(m map[string][]int32) { a.v.Store(m) }
+
+type mtps map[string][]int32
+
+func (m mtps) String() string {
 	var sb strings.Builder
 	var topicsWritten int
-	for topic, partitions := range f {
+	ts := make([]string, 0, len(m))
+	var ps []int32
+	for t := range m {
+		ts = append(ts, t)
+	}
+	sort.Strings(ts)
+	for _, t := range ts {
+		ps = append(ps[:0], m[t]...)
+		sort.Slice(ps, func(i, j int) bool { return ps[i] < ps[j] })
 		topicsWritten++
-		fmt.Fprintf(&sb, "%s%v", topic, partitions)
-		if topicsWritten < len(f) {
+		fmt.Fprintf(&sb, "%s%v", t, ps)
+		if topicsWritten < len(m) {
 			sb.WriteString(", ")
 		}
 	}
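Note: the amtps write method is copy-on-write: mutations happen on a private clone that is then atomically published, so concurrent read callers see either the old map or the new one, never a map mid-mutation. The sorted mtps formatter makes log output deterministic regardless of Go's randomized map iteration order. A small standalone sketch of the formatter's behavior (mtpsSketch is a stand-in; it assumes String ends by returning sb.String(), which lies outside the hunk above):

    package main

    import (
        "fmt"
        "sort"
        "strings"
    )

    // mtpsSketch mirrors the mtps formatter: topics sorted, partitions sorted,
    // "topic[partitions]" entries joined by ", ".
    type mtpsSketch map[string][]int32

    func (m mtpsSketch) String() string {
        var sb strings.Builder
        ts := make([]string, 0, len(m))
        for t := range m {
            ts = append(ts, t)
        }
        sort.Strings(ts)
        var ps []int32
        for idx, t := range ts {
            ps = append(ps[:0], m[t]...) // reuse scratch slice across topics
            sort.Slice(ps, func(i, j int) bool { return ps[i] < ps[j] })
            fmt.Fprintf(&sb, "%s%v", t, ps)
            if idx < len(ts)-1 {
                sb.WriteString(", ")
            }
        }
        return sb.String()
    }

    func main() {
        m := mtpsSketch{"foo": {2, 0, 1}, "bar": {1, 0}}
        fmt.Println(m) // bar[0 1], foo[0 1 2]
    }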