diff --git a/pkg/kgo/client.go b/pkg/kgo/client.go
index d5d6dc0d..ac0b068d 100644
--- a/pkg/kgo/client.go
+++ b/pkg/kgo/client.go
@@ -645,6 +645,7 @@ func (cl *Client) shardedRequest(ctx context.Context, req kmsg.Request) ([]Respo
 	// to fall into the handleCoordinatorReq logic.
 	switch req.(type) {
 	case *kmsg.ListOffsetsRequest, // key 2
+		*kmsg.OffsetFetchRequest, // key 9
 		*kmsg.DescribeGroupsRequest, // key 15
 		*kmsg.ListGroupsRequest, // key 16
 		*kmsg.DeleteRecordsRequest, // key 21
@@ -787,19 +788,18 @@ type coordinatorLoad struct {
 }
 
 // loadCoordinator returns the group/txn coordinator for the given key, retrying
-// as necessary. If reload is true, this does not used a cache coordinator.
-func (cl *Client) loadCoordinator(reload bool, ctx context.Context, key coordinatorKey) (*broker, error) {
+// as necessary. A load that fails with a non-retriable error is not cached.
+func (cl *Client) loadCoordinator(ctx context.Context, key coordinatorKey) (*broker, error) {
 	cl.coordinatorsMu.Lock()
 	c, ok := cl.coordinators[key]
-	if reload || !ok {
+	if !ok {
 		c = &coordinatorLoad{
 			done: make(chan struct{}), // all requests for the same coordinator get collapsed into one
 		}
 		defer func() {
 			// If our load fails, we avoid caching the coordinator,
 			// but only if something else has not already replaced
-			// our pointer. We could be overwritten by a function
-			// setting reload to true.
+			// our pointer.
 			if c.err != nil {
 				cl.coordinatorsMu.Lock()
 				if existing, ok := cl.coordinators[key]; ok && c == existing {
@@ -813,7 +813,7 @@ func (cl *Client) loadCoordinator(reload bool, ctx context.Context, key coordina
 	}
 	cl.coordinatorsMu.Unlock()
 
-	if !reload && ok {
+	if ok {
 		<-c.done
 		if c.err != nil {
 			return nil, c.err
@@ -871,42 +871,40 @@ func (cl *Client) maybeDeleteStaleCoordinator(name string, typ int8, err error) 
 	return false
 }
 
+type brokerOrErr struct {
+	b   *broker
+	err error
+}
+
 // loadCoordinators does a concurrent load of many coordinators.
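+// Every requested name is present in the returned map, holding either the
+// loaded coordinator or the error the load failed with. As a hypothetical
+// usage sketch, a caller shards per name roughly like so:
+//
+//	coordinators := cl.loadCoordinators(coordinatorTypeGroup, groups...)
+//	for _, group := range groups {
+//		if berr := coordinators[group]; berr.err != nil {
+//			// give this group its own failure shard
+//		} else {
+//			// map this group to broker berr.b
+//		}
+//	}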
-func (cl *Client) loadCoordinators(reload bool, typ int8, names ...string) (map[string]*broker, error) {
-	ctx, cancel := context.WithCancel(cl.ctx)
-	defer cancel()
+func (cl *Client) loadCoordinators(typ int8, names ...string) map[string]brokerOrErr {
+	uniq := make(map[string]struct{})
+	for _, name := range names {
+		uniq[name] = struct{}{}
+	}
 
 	var mu sync.Mutex
-	m := make(map[string]*broker)
-	var firstErr error
+	m := make(map[string]brokerOrErr)
 	var wg sync.WaitGroup
-	for _, name := range names {
+	for name := range uniq {
 		myName := name
 		wg.Add(1)
 		go func() {
 			defer wg.Done()
-			coordinator, err := cl.loadCoordinator(reload, ctx, coordinatorKey{
+			coordinator, err := cl.loadCoordinator(cl.ctx, coordinatorKey{
 				name: myName,
 				typ:  typ,
 			})
 			mu.Lock()
 			defer mu.Unlock()
-
-			if err != nil {
-				if firstErr == nil {
-					firstErr = err
-					cancel()
-				}
-				return
-			}
-			m[myName] = coordinator
+			m[myName] = brokerOrErr{coordinator, err}
 		}()
 	}
 	wg.Wait()
 
-	return m, firstErr
+	return m
 }
 
 func (cl *Client) handleAdminReq(ctx context.Context, req kmsg.Request) ResponseShard {
@@ -1010,8 +1008,6 @@ func (cl *Client) handleCoordinatorReq(ctx context.Context, req kmsg.Request, ty
 		return cl.handleCoordinatorReqSimple(ctx, coordinatorTypeGroup, t.Group, req)
 	case *kmsg.TxnOffsetCommitRequest:
 		return cl.handleCoordinatorReqSimple(ctx, coordinatorTypeGroup, t.Group, req)
-	case *kmsg.OffsetFetchRequest:
-		return cl.handleCoordinatorReqSimple(ctx, coordinatorTypeGroup, t.Group, req)
 	case *kmsg.JoinGroupRequest:
 		return cl.handleCoordinatorReqSimple(ctx, coordinatorTypeGroup, t.Group, req)
 	case *kmsg.HeartbeatRequest:
@@ -1032,7 +1028,7 @@
 // coordinator is deleted.
 func (cl *Client) handleCoordinatorReqSimple(ctx context.Context, typ int8, name string, req kmsg.Request) ResponseShard {
 	coordinator, resp, err := cl.handleReqWithCoordinator(ctx, func() (*broker, error) {
-		return cl.loadCoordinator(false, ctx, coordinatorKey{
+		return cl.loadCoordinator(ctx, coordinatorKey{
 			name: name,
 			typ:  typ,
 		})
@@ -1078,12 +1074,6 @@ func (cl *Client) handleReqWithCoordinator(
 		if len(t.Topics) > 0 && len(t.Topics[0].Partitions) > 0 {
 			code = t.Topics[0].Partitions[0].ErrorCode
 		}
-	case *kmsg.OffsetFetchResponse:
-		if t.Version >= 2 {
-			code = t.ErrorCode
-		} else if len(t.Topics) > 0 && len(t.Topics[0].Partitions) > 0 {
-			code = t.Topics[0].Partitions[0].ErrorCode
-		}
 	case *kmsg.JoinGroupResponse:
 		code = t.ErrorCode
 	case *kmsg.HeartbeatResponse:
@@ -1094,7 +1084,7 @@
 		code = t.ErrorCode
 	}
 
-	// Describe and Delete handled in sharding.
+	// List, OffsetFetch, Describe, and Delete handled in sharding.
 
 	if err := kerr.ErrorForCode(code); cl.maybeDeleteStaleCoordinator(name, typ, err) {
 		return err
@@ -1244,10 +1234,24 @@ type sharder interface {
 	shard(context.Context, kmsg.Request) ([]issueShard, bool, error)
 
 	// onResp is called on a successful response to investigate the
-	// response and potentially perform cleanup.
+	// response and potentially perform cleanup, and may return an
+	// error signifying that the request should be retried.
+	//
+	// We consider a request entirely retriable if it has at least one
+	// retriable error and every other error is either retriable or absent.
+	// Any non-retriable error makes the whole request un-retriable.
+	//
+	// Generally we only perform this logic for group requests, because for
+	// non-group requests (topic / partition based requests), we load
+	// metadata immediately before issuing the request and thus we expect
+	// how we originally mapped the request to still be valid. For group
+	// requests, we use cached coordinators, so we may receive
+	// NOT_COORDINATOR errors once, after which we delete the stale
+	// coordinator and remap.
 	//
-	// We cannot retry responses that have retriable errors inside of them;
-	// doing so would require a very manual and tedious process:
+	// The most thorough approach would be to split the original request
+	// into retriable pieces and unretriable pieces, but this gets complicated
+	// fast. We would have to:
 	// - pair all request partitions to the response partition (maybe the
 	// response is missing some pieces because of a buggy kafka)
 	// - split non-retriable pieces of the request & response:
@@ -1257,15 +1261,7 @@ type sharder interface {
 	// is retriable
 	// - return the non-retriable request & response piece, and the retriable
 	// request piece and err.
-	//
-	// Because the pairing is manual and tedious, and because the shard
-	// function above loads fresh metadata, we expect to not fall into
-	// stale metadata / coordinators before we issue the sharded requests.
-	//
-	// As well, for group describing or deleting, we force a load of the
-	// coordinators on every shard request. Thus, we do not expect the
-	// coordinators to be stale.
-	onResp(kmsg.Response)
+	onResp(kmsg.Request, kmsg.Response) error
 
 	// merge is a function that can be used to merge sharded responses into
 	// one response. This is used by the client.Request method.
@@ -1280,6 +1276,8 @@ func (cl *Client) handleShardedReq(ctx context.Context, req kmsg.Request) ([]Res
 	switch req.(type) {
 	case *kmsg.ListOffsetsRequest:
 		sharder = &listOffsetsSharder{cl}
+	case *kmsg.OffsetFetchRequest:
+		sharder = &offsetFetchSharder{cl}
 	case *kmsg.DescribeGroupsRequest:
 		sharder = &describeGroupsSharder{cl}
 	case *kmsg.ListGroupsRequest:
@@ -1399,12 +1397,7 @@ func (cl *Client) handleShardedReq(ctx context.Context, req kmsg.Request) ([]Res
 
 			resp, err := broker.waitResp(ctx, myIssue.req)
 			if err == nil {
-				// Successful responses may need to perform some
-				// response internal error checking cleanup.
-				// So, we call onResp, then keep the response.
-				sharder.onResp(resp)
-				addShard(shard(broker, myIssue.req, resp, nil))
-				return
+				err = sharder.onResp(myIssue.req, resp) // potentially perform cleanup; may return an error to retry
 			}
 
 			// If we failed to issue the request, we *maybe* will retry.
@@ -1424,7 +1417,7 @@ func (cl *Client) handleShardedReq(ctx context.Context, req kmsg.Request) ([]Res
 				return
 			}
 
-			addShard(shard(broker, myIssue.req, nil, err)) // the error was not retriable
+			addShard(shard(broker, myIssue.req, resp, err)) // the error was not retriable
 		}()
 	}
 }
@@ -1435,6 +1428,15 @@ func (cl *Client) handleShardedReq(ctx context.Context, req kmsg.Request) ([]Res
 	return shards, sharder.merge
 }
 
+// Sets *err to a retriable error any number of times, but once a
+// non-retriable error is set, it sticks and is never overwritten.
+func onRespShardErr(err *error, newKerr error) {
+	if newKerr == nil || *err != nil && !kerr.IsRetriable(*err) {
+		return
+	}
+	*err = newKerr
+}
+
 // a convenience function for when a request needs to be issued identically to
 // all brokers.
func (cl *Client) allBrokersShardedReq(ctx context.Context, fn func() kmsg.Request) ([]issueShard, bool, error) {
@@ -1669,7 +1671,7 @@ func (cl *listOffsetsSharder) shard(ctx context.Context, kreq kmsg.Request) ([]i
 	})...), true, nil // this is reshardable
 }
 
-func (cl *listOffsetsSharder) onResp(kreq kmsg.Response) {} // metadata could be stale, but no cleanup we can do
+func (cl *listOffsetsSharder) onResp(kmsg.Request, kmsg.Response) error { return nil } // topic / partitions: not retried
 
 func (cl *listOffsetsSharder) merge(sresps []ResponseShard) (kmsg.Response, error) {
 	merged := new(kmsg.ListOffsetsResponse)
@@ -1693,29 +1695,254 @@ func (cl *listOffsetsSharder) merge(sresps []ResponseShard) (kmsg.Response, erro
 	return merged, firstErr
 }
 
+// handles sharding OffsetFetchRequest
+type offsetFetchSharder struct{ *Client }
+
+func offsetFetchReqToGroup(req *kmsg.OffsetFetchRequest) kmsg.OffsetFetchRequestGroup {
+	g := kmsg.NewOffsetFetchRequestGroup()
+	g.Group = req.Group
+	for _, topic := range req.Topics {
+		g.Topics = append(g.Topics, kmsg.OffsetFetchRequestGroupTopic{
+			Topic:      topic.Topic,
+			Partitions: topic.Partitions,
+		})
+	}
+	return g
+}
+
+func offsetFetchRespToGroup(req *kmsg.OffsetFetchRequest, resp *kmsg.OffsetFetchResponse) kmsg.OffsetFetchResponseGroup {
+	g := kmsg.OffsetFetchResponseGroup{
+		Group:     req.Group,
+		ErrorCode: resp.ErrorCode,
+	}
+	for _, topic := range resp.Topics {
+		t := kmsg.OffsetFetchResponseGroupTopic{
+			Topic: topic.Topic,
+		}
+		for _, partition := range topic.Partitions {
+			t.Partitions = append(t.Partitions, kmsg.OffsetFetchResponseGroupTopicPartition{
+				Partition:   partition.Partition,
+				Offset:      partition.Offset,
+				LeaderEpoch: partition.LeaderEpoch,
+				Metadata:    partition.Metadata,
+				ErrorCode:   partition.ErrorCode,
+			})
+		}
+		g.Topics = append(g.Topics, t)
+	}
+	return g
+}
+
+func offsetFetchRespGroupIntoResp(g kmsg.OffsetFetchResponseGroup, into *kmsg.OffsetFetchResponse) {
+	into.ErrorCode = g.ErrorCode
+	into.Topics = into.Topics[:0]
+	for _, topic := range g.Topics {
+		t := kmsg.OffsetFetchResponseTopic{
+			Topic: topic.Topic,
+		}
+		for _, partition := range topic.Partitions {
+			t.Partitions = append(t.Partitions, kmsg.OffsetFetchResponseTopicPartition{
+				Partition:   partition.Partition,
+				Offset:      partition.Offset,
+				LeaderEpoch: partition.LeaderEpoch,
+				Metadata:    partition.Metadata,
+				ErrorCode:   partition.ErrorCode,
+			})
+		}
+		into.Topics = append(into.Topics, t)
+	}
+}
+
+func (cl *offsetFetchSharder) shard(ctx context.Context, kreq kmsg.Request) ([]issueShard, bool, error) {
+	req := kreq.(*kmsg.OffsetFetchRequest)
+
+	groups := make([]string, 0, len(req.Groups)+1)
+	if len(req.Groups) == 0 { // v0-v7
+		groups = append(groups, req.Group)
+	}
+	for i := range req.Groups { // v8+
+		groups = append(groups, req.Groups[i].Group)
+	}
+
+	coordinators := cl.loadCoordinators(coordinatorTypeGroup, groups...)
+
+	// If there is only the top level group, then we simply return our
+	// request mapped to its specific broker. For forward compatibility, we
+	// also embed the top level request into the Groups list: this allows
+	// a request built in the old style (v0-v7) to be issued as a v8
+	// request appropriately. On response, if the length of groups is 1,
+	// we merge the first item back to the top level.
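+	//
+	// For illustration (hypothetical values), the v0-v7 style request
+	//
+	//	{Group: "g", Topics: [{Topic: "t", Partitions: [0]}]}
+	//
+	// is sent to g's coordinator as the v8-compatible
+	//
+	//	{Groups: [{Group: "g", Topics: [{Topic: "t", Partitions: [0]}]}]}
+	//
+	// and a single response group is merged back into the top level fields.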
+	if len(req.Groups) == 0 {
+		berr := coordinators[req.Group]
+		if berr.err != nil {
+			return []issueShard{{
+				req: req,
+				err: berr.err,
+			}}, false, nil // not reshardable, because this is an error
+		}
+
+		dup := *req
+		brokerReq := &dup
+		brokerReq.Groups = append(brokerReq.Groups, offsetFetchReqToGroup(req))
+
+		return []issueShard{{
+			req:    brokerReq,
+			broker: berr.b.meta.NodeID,
+		}}, true, nil // reshardable to reload correct coordinator
+	}
+
+	// v8+ behavior: we have multiple groups.
+	//
+	// Loading coordinators can have each group fail with its unique error,
+	// or with a kerr.Error that can be merged. Unique errors get their own
+	// failure shard, while groups that share the same kerr.Error are
+	// merged into one failure shard.
+	type unkerr struct {
+		err   error
+		group kmsg.OffsetFetchRequestGroup
+	}
+	var (
+		brokerReqs = make(map[int32]*kmsg.OffsetFetchRequest)
+		kerrs      = make(map[*kerr.Error][]kmsg.OffsetFetchRequestGroup)
+		unkerrs    []unkerr
+	)
+
+	newReq := func(groups ...kmsg.OffsetFetchRequestGroup) *kmsg.OffsetFetchRequest {
+		return &kmsg.OffsetFetchRequest{
+			RequireStable: req.RequireStable,
+			Groups:        groups,
+		}
+	}
+
+	for _, group := range req.Groups {
+		berr := coordinators[group.Group]
+		var ke *kerr.Error
+		switch {
+		case berr.err == nil:
+			brokerReq := brokerReqs[berr.b.meta.NodeID]
+			if brokerReq == nil {
+				brokerReq = newReq()
+				brokerReqs[berr.b.meta.NodeID] = brokerReq
+			}
+			brokerReq.Groups = append(brokerReq.Groups, group)
+		case errors.As(berr.err, &ke):
+			kerrs[ke] = append(kerrs[ke], group)
+		default:
+			unkerrs = append(unkerrs, unkerr{berr.err, group})
+		}
+	}
+
+	var issues []issueShard
+	for id, req := range brokerReqs {
+		issues = append(issues, issueShard{
+			req:    req,
+			broker: id,
+		})
+	}
+	for _, unkerr := range unkerrs {
+		issues = append(issues, issueShard{
+			req: newReq(unkerr.group),
+			err: unkerr.err,
+		})
+	}
+	for kerr, groups := range kerrs {
+		issues = append(issues, issueShard{
+			req: newReq(groups...),
+			err: kerr,
+		})
+	}
+
+	return issues, true, nil // reshardable to load correct coordinators
+}
+
+func (cl *offsetFetchSharder) onResp(kreq kmsg.Request, kresp kmsg.Response) error {
+	req := kreq.(*kmsg.OffsetFetchRequest)
+	resp := kresp.(*kmsg.OffsetFetchResponse)
+
+	switch len(resp.Groups) {
+	case 0:
+		resp.Groups = append(resp.Groups, offsetFetchRespToGroup(req, resp))
+	case 1:
+		offsetFetchRespGroupIntoResp(resp.Groups[0], resp)
+	default:
+	}
+
+	var retErr error
+	for i := range resp.Groups {
+		group := &resp.Groups[i]
+		err := kerr.ErrorForCode(group.ErrorCode)
+		cl.maybeDeleteStaleCoordinator(group.Group, coordinatorTypeGroup, err)
+		onRespShardErr(&retErr, err)
+	}
+
+	// For a final bit of extra fun, v0 and v1 do not have a top level
+	// error code but instead a per-partition error code. If the
+	// coordinator is loading, etc., then all per-partition error codes
+	// are the same, so we only need to look at the first partition.
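+	// (For example, if the group's coordinator just moved, every
+	// partition would carry NOT_COORDINATOR, so inspecting
+	// resp.Topics[0].Partitions[0] suffices.)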
+ if resp.Version < 2 && len(resp.Topics) > 0 && len(resp.Topics[0].Partitions) > 0 { + code := resp.Topics[0].Partitions[0].ErrorCode + err := kerr.ErrorForCode(code) + cl.maybeDeleteStaleCoordinator(req.Group, coordinatorTypeGroup, err) + onRespShardErr(&retErr, err) + } + + return retErr +} + +func (cl *offsetFetchSharder) merge(sresps []ResponseShard) (kmsg.Response, error) { + merged := new(kmsg.OffsetFetchResponse) + + return merged, firstErrMerger(sresps, func(kresp kmsg.Response) { + resp := kresp.(*kmsg.OffsetFetchResponse) + merged.Version = resp.Version + merged.ThrottleMillis = resp.ThrottleMillis + merged.Groups = append(merged.Groups, resp.Groups...) + + if len(resp.Groups) == 1 { + offsetFetchRespGroupIntoResp(resp.Groups[0], merged) + } + }) +} + // handles sharding DescribeGroupsRequest type describeGroupsSharder struct{ *Client } func (cl *describeGroupsSharder) shard(ctx context.Context, kreq kmsg.Request) ([]issueShard, bool, error) { req := kreq.(*kmsg.DescribeGroupsRequest) - coordinators, err := cl.loadCoordinators(true, coordinatorTypeGroup, req.Groups...) - if err != nil { - return nil, false, err + coordinators := cl.loadCoordinators(coordinatorTypeGroup, req.Groups...) + type unkerr struct { + err error + group string } + var ( + brokerReqs = make(map[int32]*kmsg.DescribeGroupsRequest) + kerrs = make(map[*kerr.Error][]string) + unkerrs []unkerr + ) - brokerReqs := make(map[int32]*kmsg.DescribeGroupsRequest) + newReq := func(groups ...string) *kmsg.DescribeGroupsRequest { + return &kmsg.DescribeGroupsRequest{ + IncludeAuthorizedOperations: req.IncludeAuthorizedOperations, + Groups: groups, + } + } for _, group := range req.Groups { - broker := coordinators[group] - brokerReq := brokerReqs[broker.meta.NodeID] - if brokerReq == nil { - brokerReq = &kmsg.DescribeGroupsRequest{ - IncludeAuthorizedOperations: req.IncludeAuthorizedOperations, + berr := coordinators[group] + var ke *kerr.Error + switch { + case berr.err == nil: + brokerReq := brokerReqs[berr.b.meta.NodeID] + if brokerReq == nil { + brokerReq = newReq() + brokerReqs[berr.b.meta.NodeID] = brokerReq } - brokerReqs[broker.meta.NodeID] = brokerReq + brokerReq.Groups = append(brokerReq.Groups, group) + case errors.As(berr.err, &ke): + kerrs[ke] = append(kerrs[ke], group) + default: + unkerrs = append(unkerrs, unkerr{berr.err, group}) } - brokerReq.Groups = append(brokerReq.Groups, group) } var issues []issueShard @@ -1725,16 +1952,32 @@ func (cl *describeGroupsSharder) shard(ctx context.Context, kreq kmsg.Request) ( broker: id, }) } - return issues, true, nil // this is reshardable + for _, unkerr := range unkerrs { + issues = append(issues, issueShard{ + req: newReq(unkerr.group), + err: unkerr.err, + }) + } + for kerr, groups := range kerrs { + issues = append(issues, issueShard{ + req: newReq(groups...), + err: kerr, + }) + } + + return issues, true, nil // reshardable to load correct coordinators } -func (cl *describeGroupsSharder) onResp(kresp kmsg.Response) { // cleanup any stale groups +func (cl *describeGroupsSharder) onResp(_ kmsg.Request, kresp kmsg.Response) error { // cleanup any stale groups resp := kresp.(*kmsg.DescribeGroupsResponse) + var retErr error for i := range resp.Groups { group := &resp.Groups[i] err := kerr.ErrorForCode(group.ErrorCode) cl.maybeDeleteStaleCoordinator(group.Group, coordinatorTypeGroup, err) + onRespShardErr(&retErr, err) } + return retErr } func (cl *describeGroupsSharder) merge(sresps []ResponseShard) (kmsg.Response, error) { @@ -1759,7 +2002,10 @@ func (cl 
*listGroupsSharder) shard(ctx context.Context, kreq kmsg.Request) ([]is
 	})
 }
 
-func (cl *listGroupsSharder) onResp(kresp kmsg.Response) {} // nothing to be done here
+func (cl *listGroupsSharder) onResp(_ kmsg.Request, kresp kmsg.Response) error {
+	resp := kresp.(*kmsg.ListGroupsResponse)
+	return kerr.ErrorForCode(resp.ErrorCode)
+}
 
 func (cl *listGroupsSharder) merge(sresps []ResponseShard) (kmsg.Response, error) {
 	merged := new(kmsg.ListGroupsResponse)
@@ -1845,7 +2091,7 @@ func (cl *deleteRecordsSharder) shard(ctx context.Context, kreq kmsg.Request) ([
 	})...), true, nil // this is reshardable
 }
 
-func (cl *deleteRecordsSharder) onResp(kmsg.Response) {} // nothing to be done here
+func (cl *deleteRecordsSharder) onResp(kmsg.Request, kmsg.Response) error { return nil } // topic / partitions: not retried
 
 func (cl *deleteRecordsSharder) merge(sresps []ResponseShard) (kmsg.Response, error) {
 	merged := new(kmsg.DeleteRecordsResponse)
@@ -1939,7 +2185,7 @@ func (cl *offsetForLeaderEpochSharder) shard(ctx context.Context, kreq kmsg.Requ
 	})...), true, nil // this is reshardable
 }
 
-func (cl *offsetForLeaderEpochSharder) onResp(kmsg.Response) {}
+func (cl *offsetForLeaderEpochSharder) onResp(kmsg.Request, kmsg.Response) error { return nil } // topic / partitions: not retried
 
 func (cl *offsetForLeaderEpochSharder) merge(sresps []ResponseShard) (kmsg.Response, error) {
 	merged := new(kmsg.OffsetForLeaderEpochResponse)
@@ -2017,7 +2263,7 @@ func (cl *describeConfigsSharder) shard(ctx context.Context, kreq kmsg.Request) 
 	return issues, false, nil // this is not reshardable, but the any block can go anywhere
 }
 
-func (cl *describeConfigsSharder) onResp(kmsg.Response) {}
+func (cl *describeConfigsSharder) onResp(kmsg.Request, kmsg.Response) error { return nil } // configs: nothing retriable
 
 func (cl *describeConfigsSharder) merge(sresps []ResponseShard) (kmsg.Response, error) {
 	merged := new(kmsg.DescribeConfigsResponse)
@@ -2082,7 +2328,7 @@ func (cl *alterConfigsSharder) shard(ctx context.Context, kreq kmsg.Request) ([]
 	return issues, false, nil // this is not reshardable, but the any block can go anywhere
 }
 
-func (cl *alterConfigsSharder) onResp(kmsg.Response) {}
+func (cl *alterConfigsSharder) onResp(kmsg.Request, kmsg.Response) error { return nil } // configs: nothing retriable
 
 func (cl *alterConfigsSharder) merge(sresps []ResponseShard) (kmsg.Response, error) {
 	merged := new(kmsg.AlterConfigsResponse)
@@ -2217,7 +2463,7 @@ func (cl *alterReplicaLogDirsSharder) shard(ctx context.Context, kreq kmsg.Reque
 	return issues, true, nil // this is reshardable
 }
 
-func (cl *alterReplicaLogDirsSharder) onResp(kmsg.Response) {}
+func (cl *alterReplicaLogDirsSharder) onResp(kmsg.Request, kmsg.Response) error { return nil } // topic / partitions: not retried
 
 // merge does not make sense for this function, but we provide one anyway.
 func (cl *alterReplicaLogDirsSharder) merge(sresps []ResponseShard) (kmsg.Response, error) {
@@ -2321,7 +2567,7 @@ func (cl *describeLogDirsSharder) shard(ctx context.Context, kreq kmsg.Request) 
 	})...), true, nil // this is reshardable
 }
 
-func (cl *describeLogDirsSharder) onResp(kmsg.Response) {}
+func (cl *describeLogDirsSharder) onResp(kmsg.Request, kmsg.Response) error { return nil } // topic / partitions: not retried
 
 // merge does not make sense for this function, but we provide one anyway.
 // We lose the error code for directories.
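To make the onResp retry policy concrete, here is a minimal standalone sketch (an illustration, not part of the patch; it assumes only franz-go's kerr package) of how onRespShardErr accumulates per-item errors: retriable errors may overwrite each other, but the first non-retriable error sticks and makes the whole shard un-retriable.

package main

import (
	"fmt"

	"github.com/twmb/franz-go/pkg/kerr"
)

// Mirrors onRespShardErr in the patch: retriable errors can be set any
// number of times, but a non-retriable error, once set, is never overwritten.
func onRespShardErr(err *error, newKerr error) {
	if newKerr == nil || *err != nil && !kerr.IsRetriable(*err) {
		return
	}
	*err = newKerr
}

func main() {
	var retErr error
	onRespShardErr(&retErr, kerr.NotCoordinator)           // retriable: sets the error
	onRespShardErr(&retErr, kerr.GroupAuthorizationFailed) // non-retriable: overwrites
	onRespShardErr(&retErr, kerr.NotCoordinator)           // retriable: ignored, non-retriable sticks
	fmt.Println(kerr.IsRetriable(retErr))                  // false: this shard is not retried
}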
@@ -2366,21 +2612,39 @@ type deleteGroupsSharder struct{ *Client } func (cl *deleteGroupsSharder) shard(ctx context.Context, kreq kmsg.Request) ([]issueShard, bool, error) { req := kreq.(*kmsg.DeleteGroupsRequest) - coordinators, err := cl.loadCoordinators(true, coordinatorTypeGroup, req.Groups...) - if err != nil { - return nil, false, err + coordinators := cl.loadCoordinators(coordinatorTypeGroup, req.Groups...) + type unkerr struct { + err error + group string } + var ( + brokerReqs = make(map[int32]*kmsg.DeleteGroupsRequest) + kerrs = make(map[*kerr.Error][]string) + unkerrs []unkerr + ) - brokerReqs := make(map[int32]*kmsg.DeleteGroupsRequest) + newReq := func(groups ...string) *kmsg.DeleteGroupsRequest { + return &kmsg.DeleteGroupsRequest{ + Groups: groups, + } + } for _, group := range req.Groups { - broker := coordinators[group] - brokerReq := brokerReqs[broker.meta.NodeID] - if brokerReq == nil { - brokerReq = new(kmsg.DeleteGroupsRequest) - brokerReqs[broker.meta.NodeID] = brokerReq + berr := coordinators[group] + var ke *kerr.Error + switch { + case berr.err == nil: + brokerReq := brokerReqs[berr.b.meta.NodeID] + if brokerReq == nil { + brokerReq = newReq() + brokerReqs[berr.b.meta.NodeID] = brokerReq + } + brokerReq.Groups = append(brokerReq.Groups, group) + case errors.As(berr.err, &ke): + kerrs[ke] = append(kerrs[ke], group) + default: + unkerrs = append(unkerrs, unkerr{berr.err, group}) } - brokerReq.Groups = append(brokerReq.Groups, group) } var issues []issueShard @@ -2390,16 +2654,32 @@ func (cl *deleteGroupsSharder) shard(ctx context.Context, kreq kmsg.Request) ([] broker: id, }) } - return issues, true, nil // this is reshardable + for _, unkerr := range unkerrs { + issues = append(issues, issueShard{ + req: newReq(unkerr.group), + err: unkerr.err, + }) + } + for kerr, groups := range kerrs { + issues = append(issues, issueShard{ + req: newReq(groups...), + err: kerr, + }) + } + + return issues, true, nil // reshardable to load correct coordinators } -func (cl *deleteGroupsSharder) onResp(kresp kmsg.Response) { +func (cl *deleteGroupsSharder) onResp(_ kmsg.Request, kresp kmsg.Response) error { resp := kresp.(*kmsg.DeleteGroupsResponse) + var retErr error for i := range resp.Groups { group := &resp.Groups[i] err := kerr.ErrorForCode(group.ErrorCode) cl.maybeDeleteStaleCoordinator(group.Group, coordinatorTypeGroup, err) + onRespShardErr(&retErr, err) } + return retErr } func (cl *deleteGroupsSharder) merge(sresps []ResponseShard) (kmsg.Response, error) { @@ -2465,7 +2745,7 @@ func (cl *incrementalAlterConfigsSharder) shard(ctx context.Context, kreq kmsg.R return issues, false, nil // this is not reshardable, but the any block can go anywhere } -func (cl *incrementalAlterConfigsSharder) onResp(kmsg.Response) {} +func (cl *incrementalAlterConfigsSharder) onResp(kmsg.Request, kmsg.Response) error { return nil } // config: nothing retriable func (cl *incrementalAlterConfigsSharder) merge(sresps []ResponseShard) (kmsg.Response, error) { merged := new(kmsg.IncrementalAlterConfigsResponse) @@ -2546,7 +2826,7 @@ func (cl *describeProducersSharder) shard(ctx context.Context, kreq kmsg.Request })...), true, nil // this is reshardable } -func (cl *describeProducersSharder) onResp(kmsg.Response) {} +func (cl *describeProducersSharder) onResp(kmsg.Request, kmsg.Response) error { return nil } // topic / partitions: not retriable func (cl *describeProducersSharder) merge(sresps []ResponseShard) (kmsg.Response, error) { merged := new(kmsg.DescribeProducersResponse) @@ -2576,21 +2856,39 
@@ type describeTransactionsSharder struct{ *Client } func (cl *describeTransactionsSharder) shard(ctx context.Context, kreq kmsg.Request) ([]issueShard, bool, error) { req := kreq.(*kmsg.DescribeTransactionsRequest) - coordinators, err := cl.loadCoordinators(true, coordinatorTypeTxn, req.TransactionalIDs...) - if err != nil { - return nil, false, err + coordinators := cl.loadCoordinators(coordinatorTypeTxn, req.TransactionalIDs...) + type unkerr struct { + err error + txnID string } + var ( + brokerReqs = make(map[int32]*kmsg.DescribeTransactionsRequest) + kerrs = make(map[*kerr.Error][]string) + unkerrs []unkerr + ) - brokerReqs := make(map[int32]*kmsg.DescribeTransactionsRequest) + newReq := func(txnIDs ...string) *kmsg.DescribeTransactionsRequest { + return &kmsg.DescribeTransactionsRequest{ + TransactionalIDs: txnIDs, + } + } for _, txnID := range req.TransactionalIDs { - broker := coordinators[txnID] - brokerReq := brokerReqs[broker.meta.NodeID] - if brokerReq == nil { - brokerReq = &kmsg.DescribeTransactionsRequest{} - brokerReqs[broker.meta.NodeID] = brokerReq + berr := coordinators[txnID] + var ke *kerr.Error + switch { + case berr.err == nil: + brokerReq := brokerReqs[berr.b.meta.NodeID] + if brokerReq == nil { + brokerReq = newReq() + brokerReqs[berr.b.meta.NodeID] = brokerReq + } + brokerReq.TransactionalIDs = append(brokerReq.TransactionalIDs, txnID) + case errors.As(berr.err, &ke): + kerrs[ke] = append(kerrs[ke], txnID) + default: + unkerrs = append(unkerrs, unkerr{berr.err, txnID}) } - brokerReq.TransactionalIDs = append(brokerReq.TransactionalIDs, txnID) } var issues []issueShard @@ -2600,16 +2898,32 @@ func (cl *describeTransactionsSharder) shard(ctx context.Context, kreq kmsg.Requ broker: id, }) } - return issues, true, nil // this is reshardable + for _, unkerr := range unkerrs { + issues = append(issues, issueShard{ + req: newReq(unkerr.txnID), + err: unkerr.err, + }) + } + for kerr, txnIDs := range kerrs { + issues = append(issues, issueShard{ + req: newReq(txnIDs...), + err: kerr, + }) + } + + return issues, true, nil // reshardable to load correct coordinators } -func (cl *describeTransactionsSharder) onResp(kresp kmsg.Response) { // cleanup any stale coordinators +func (cl *describeTransactionsSharder) onResp(_ kmsg.Request, kresp kmsg.Response) error { // cleanup any stale coordinators resp := kresp.(*kmsg.DescribeTransactionsResponse) + var retErr error for i := range resp.TransactionStates { txnState := &resp.TransactionStates[i] err := kerr.ErrorForCode(txnState.ErrorCode) cl.maybeDeleteStaleCoordinator(txnState.TransactionalID, coordinatorTypeTxn, err) + onRespShardErr(&retErr, err) } + return retErr } func (cl *describeTransactionsSharder) merge(sresps []ResponseShard) (kmsg.Response, error) { @@ -2634,7 +2948,10 @@ func (cl *listTransactionsSharder) shard(ctx context.Context, kreq kmsg.Request) }) } -func (cl *listTransactionsSharder) onResp(kresp kmsg.Response) {} // nothing to do +func (cl *listTransactionsSharder) onResp(_ kmsg.Request, kresp kmsg.Response) error { + resp := kresp.(*kmsg.ListTransactionsResponse) + return kerr.ErrorForCode(resp.ErrorCode) +} func (cl *listTransactionsSharder) merge(sresps []ResponseShard) (kmsg.Response, error) { merged := new(kmsg.ListTransactionsResponse) diff --git a/pkg/kgo/consumer_group.go b/pkg/kgo/consumer_group.go index 7e77d0dd..3b0695de 100644 --- a/pkg/kgo/consumer_group.go +++ b/pkg/kgo/consumer_group.go @@ -1041,6 +1041,9 @@ func (g *groupConsumer) joinGroupProtocols() []kmsg.JoinGroupRequestProtocol { // 
fetchOffsets is issued once we join a group to see what the prior commits
// were for the partitions we were assigned.
func (g *groupConsumer) fetchOffsets(ctx context.Context, newAssigned map[string][]int32) error {
+	// Our client maps the v0-v7 request format to v8+ when sharding this
+	// request (when only one group is requested), and maps the response
+	// back, so we do not need to worry about v8+ here.
 start:
 	req := kmsg.OffsetFetchRequest{
 		Group: g.cfg.group,
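For reference, a minimal standalone sketch of the v0-v7 to v8+ mapping that the comment above relies on (an illustration, not part of the patch; it assumes only the kmsg package, and the group/topic names are hypothetical):

package main

import (
	"fmt"

	"github.com/twmb/franz-go/pkg/kmsg"
)

func main() {
	// A v0-v7 style request: one group in the top level fields.
	req := kmsg.NewPtrOffsetFetchRequest()
	req.Group = "my-group" // hypothetical group
	t := kmsg.NewOffsetFetchRequestTopic()
	t.Topic = "my-topic" // hypothetical topic
	t.Partitions = []int32{0, 1}
	req.Topics = append(req.Topics, t)

	// Mirror what offsetFetchReqToGroup does in client.go: embed the top
	// level fields as a v8+ group so the one request serializes correctly
	// for both old and new protocol versions.
	g := kmsg.NewOffsetFetchRequestGroup()
	g.Group = req.Group
	for _, topic := range req.Topics {
		gt := kmsg.NewOffsetFetchRequestGroupTopic()
		gt.Topic = topic.Topic
		gt.Partitions = topic.Partitions
		g.Topics = append(g.Topics, gt)
	}
	req.Groups = append(req.Groups, g)

	fmt.Printf("group %q embedded as %d v8+ group entry\n", req.Group, len(req.Groups))
}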