From 9ac6c9707c04fa424ccd3489f2656399cf5ab04d Mon Sep 17 00:00:00 2001 From: Travis Bischel Date: Mon, 7 Nov 2022 23:34:14 -0700 Subject: [PATCH] kgo: support forward & backward batch requests for FindCoordinator, OffsetFetch Previously, the client was only forward compatible. Opting into batching was at your own risk. We now default to batching and split to the old single behavior as needed. To support this, we have a new internal request version pinner. We always pin to batched version, and if we get errBrokerTooOld, we split. loadCoordinators is a good bit more complex now, and we basically delete loadCoordinator. This is a bit complicated to describe in a single commit message. --- pkg/kgo/broker.go | 37 ++- pkg/kgo/client.go | 571 ++++++++++++++++++++++++++++++---------------- 2 files changed, 406 insertions(+), 202 deletions(-) diff --git a/pkg/kgo/broker.go b/pkg/kgo/broker.go index 74895945..ef0dce8d 100644 --- a/pkg/kgo/broker.go +++ b/pkg/kgo/broker.go @@ -22,6 +22,24 @@ import ( "github.com/twmb/franz-go/pkg/sasl" ) +type pinReq struct { + kmsg.Request + min int16 + max int16 + pinMin bool + pinMax bool +} + +func (p *pinReq) SetVersion(v int16) { + if p.pinMin && v < p.min { + v = p.min + } + if p.pinMax && v > p.max { + v = p.max + } + p.Request.SetVersion(v) +} + type promisedReq struct { ctx context.Context req kmsg.Request @@ -303,18 +321,33 @@ func (b *broker) handleReq(pr promisedReq) { version = brokerMax } + minVersion := int16(-1) + // If the version now (after potential broker downgrading) is // lower than we desire, we fail the request for the broker is // too old. if b.cl.cfg.minVersions != nil { - minVersion, minVersionExists := b.cl.cfg.minVersions.LookupMaxKeyVersion(req.Key()) - if minVersionExists && version < minVersion { + minVersion, _ = b.cl.cfg.minVersions.LookupMaxKeyVersion(req.Key()) + if minVersion > -1 && version < minVersion { pr.promise(nil, errBrokerTooOld) return } } req.SetVersion(version) // always go for highest version + setVersion := req.GetVersion() + if minVersion > -1 && setVersion < minVersion { + pr.promise(nil, fmt.Errorf("request key %d version returned %d below the user defined min of %d", req.Key(), setVersion, minVersion)) + return + } + if version < setVersion { + // If we want to set an old version, but the request is pinned + // high, we need to fail with errBrokerTooOld. The broker wants + // an old version, we want a high version. We rely on this + // error in backcompat request sharding. + pr.promise(nil, errBrokerTooOld) + return + } for reauthentications := 1; !cxn.expiry.IsZero() && time.Now().After(cxn.expiry); reauthentications++ { // We allow 15 reauths, which is a lot. If a new lifetime is diff --git a/pkg/kgo/client.go b/pkg/kgo/client.go index 90159a3b..25858a89 100644 --- a/pkg/kgo/client.go +++ b/pkg/kgo/client.go @@ -478,8 +478,7 @@ func (cl *Client) waitTries(ctx context.Context, backoff time.Duration) bool { // case, such as when a person explicitly assigns offsets with epochs, but we // catch a few areas that would be returned from a broker itself. // -// This function is always used *after* at least one request has been issued, -// so we do not check ensurePinged. +// This function is always used *after* at least one request has been issued. // // NOTE: This is a weak check; we check if any broker in the cluster supports // the request. We use this function in three locations: @@ -503,7 +502,6 @@ func (cl *Client) supportsOffsetForLeaderEpoch() bool { // A broker may not support some requests we want to make. 
This function checks // support. This should only be used *after* at least one successful response. -// To absolutely ensure a response has been received, use ensurePinged. func (cl *Client) supportsKeyVersion(key, version int16) bool { cl.brokersMu.RLock() defer cl.brokersMu.RUnlock() @@ -742,6 +740,7 @@ func (cl *Client) Close() { // // ListOffsets // OffsetFetch (if using v8+ for Kafka 3.0+) +// FindCoordinator (if using v4+ for Kafka 3.0+) // DescribeGroups // ListGroups // DeleteRecords @@ -757,10 +756,9 @@ func (cl *Client) Close() { // ListTransactions // // Kafka 3.0 introduced batch OffsetFetch and batch FindCoordinator requests. -// This function is forward-compatible for the old, singular OffsetFetch and -// FindCoordinator requests, but is not backward-compatible for batched -// requests. It is recommended to only use the old format unless you know you -// are speaking to Kafka 3.0+. +// This function is forward and backward compatible: old requests will be +// batched as necessary, and batched requests will be split as necessary. It is +// recommended to always use batch requests for simplicity. // // In short, this method tries to do the correct thing depending on what type // of request is being issued. @@ -963,6 +961,7 @@ func (cl *Client) shardedRequest(ctx context.Context, req kmsg.Request) ([]Respo switch t := req.(type) { case *kmsg.ListOffsetsRequest, // key 2 *kmsg.OffsetFetchRequest, // key 9 + *kmsg.FindCoordinatorRequest, // key 10 *kmsg.DescribeGroupsRequest, // key 15 *kmsg.ListGroupsRequest, // key 16 *kmsg.DeleteRecordsRequest, // key 21 @@ -978,24 +977,19 @@ func (cl *Client) shardedRequest(ctx context.Context, req kmsg.Request) ([]Respo *kmsg.ListTransactionsRequest: // key 66 return cl.handleShardedReq(ctx, req) - // We support being forward-compatible with FindCoordinator, so we need - // to use our special hijack function that batches a singular key. - case *kmsg.FindCoordinatorRequest: - last, resp, err := cl.findCoordinator(ctx, t) - return shards(shard(last, req, resp, err)), nil - } - - switch t := req.(type) { case *kmsg.MetadataRequest: // We hijack any metadata request so as to populate our // own brokers and controller ID. br, resp, err := cl.fetchMetadata(ctx, t, false) return shards(shard(br, req, resp, err)), nil + case kmsg.AdminRequest: return shards(cl.handleAdminReq(ctx, t)), nil + case kmsg.GroupCoordinatorRequest, kmsg.TxnCoordinatorRequest: return shards(cl.handleCoordinatorReq(ctx, t)), nil + case *kmsg.ApiVersionsRequest: // As of v3, software name and version are required. // If they are missing, we use the config options. @@ -1121,108 +1115,172 @@ type coordinatorKey struct { } type coordinatorLoad struct { - done chan struct{} - node int32 - err error + loadWait chan struct{} + node int32 + err error +} + +func (cl *Client) loadCoordinator(ctx context.Context, typ int8, key string) (*broker, error) { + berr := cl.loadCoordinators(ctx, typ, key)[key] + return berr.b, berr.err } -// findCoordinator is allows FindCoordinator request to be forward compatible, -// by duplicating a top level request into a single-element batch request, and -// downconverting the response. 
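
As a usage-level illustration of the new compatibility guarantee: the sketch below issues a batched (v4+) FindCoordinatorRequest through the client. With this patch the client pins the batched version and, if a broker only speaks v3 or older, transparently re-issues single-key requests and merges the results. The broker address and group names are placeholders.

    package main

    import (
        "context"
        "fmt"

        "github.com/twmb/franz-go/pkg/kgo"
        "github.com/twmb/franz-go/pkg/kmsg"
    )

    func main() {
        cl, err := kgo.NewClient(kgo.SeedBrokers("localhost:9092")) // placeholder broker
        if err != nil {
            panic(err)
        }
        defer cl.Close()

        req := kmsg.NewPtrFindCoordinatorRequest()
        req.CoordinatorType = 0                              // 0 = group coordinator
        req.CoordinatorKeys = []string{"group-a", "group-b"} // batched (v4+) form

        // The client pins v4+ for the batched form; against pre-3.0 brokers it
        // retries as per-key v3 requests and merges the responses back together.
        resp, err := req.RequestWith(context.Background(), cl)
        if err != nil {
            panic(err)
        }
        for _, c := range resp.Coordinators {
            fmt.Println(c.Key, "->", c.NodeID, c.Host, c.Port)
        }
    }
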
-func (cl *Client) findCoordinator(ctx context.Context, req *kmsg.FindCoordinatorRequest) (*broker, *kmsg.FindCoordinatorResponse, error) { - var compat bool - if len(req.CoordinatorKeys) == 0 { - req.CoordinatorKeys = []string{req.CoordinatorKey} - compat = true +func (cl *Client) loadCoordinators(ctx context.Context, typ int8, keys ...string) map[string]brokerOrErr { + m := make(map[string]brokerOrErr, len(keys)) + if len(keys) == 0 { + return m } - r := cl.retriable() - resp, err := req.RequestWith(ctx, r) - if resp != nil { - if compat && resp.Version >= 4 { - if l := len(resp.Coordinators); l != 1 { - return r.last, resp, fmt.Errorf("unexpectedly received %d coordinators when requesting 1", l) - } - first := resp.Coordinators[0] - resp.ErrorCode = first.ErrorCode - resp.ErrorMessage = first.ErrorMessage - resp.NodeID = first.NodeID - resp.Host = first.Host - resp.Port = first.Port + toRequest := make(map[string]bool, len(keys)) // true == bypass the cache + for _, key := range keys { + if len(key) > 0 { + toRequest[key] = false } } - return r.last, resp, err -} -func (cl *Client) deleteStaleCoordinatorIfEqual(key coordinatorKey, current *coordinatorLoad) { - cl.coordinatorsMu.Lock() - defer cl.coordinatorsMu.Unlock() - if existing, ok := cl.coordinators[key]; ok && current == existing { - delete(cl.coordinators, key) - } -} + // For each of these keys, we have two cases: + // + // 1) The key is cached. It is either loading or loaded. We do not + // request the key ourselves; we wait for the load to finish. + // + // 2) The key is not cached, and we request it. + // + // If a key is cached but the coordinator no longer exists for us, we + // re-request to refresh the coordinator by setting toRequest[key] to + // true (bypass cache). + // + // If we ever request a key ourselves, we do not request it again. We + // ensure this by deleting from toRequest. We also delete if the key + // was cached with no error. + // + // We could have some keys cached and some that need to be requested. + // We issue a request but do not request what is cached. + // + // Lastly, we only ever trigger one metadata update, which happens if + // we have an unknown coordinator after we load coordinators. + var hasLoadedBrokers bool + for len(toRequest) > 0 { + var loadWait chan struct{} + load2key := make(map[*coordinatorLoad][]string) -// loadController returns the group/txn coordinator for the given key, retrying -// as necessary. Any non-retriable error does not cache the coordinator. 
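
The request pinning added in broker.go above is the hinge for this fallback. A minimal standalone sketch of the clamping semantics follows; the clampVersion function is illustrative only, not part of the client:

    // clampVersion mirrors what pinReq.SetVersion does: the version negotiated
    // with the broker is clamped into the pinned window before being applied.
    func clampVersion(negotiated, min, max int16, pinMin, pinMax bool) int16 {
        v := negotiated
        if pinMin && v < min {
            v = min // e.g. force OffsetFetch v8+ for the batched Groups form
        }
        if pinMax && v > max {
            v = max // e.g. cap FindCoordinator at v3 for the single-key form
        }
        return v
    }

If the clamp raises the version above what was negotiated for the broker, handleReq sees version < setVersion and fails the shard with errBrokerTooOld; that error is what the new sharders key off of to fall back to the pre-batch request form.
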
-func (cl *Client) loadCoordinator(ctx context.Context, key coordinatorKey) (*broker, error) { - var restarted bool -start: - cl.coordinatorsMu.Lock() - c, ok := cl.coordinators[key] + cl.coordinatorsMu.Lock() + for key, bypassCache := range toRequest { + c, ok := cl.coordinators[coordinatorKey{key, typ}] + if !ok || bypassCache { + if loadWait == nil { + loadWait = make(chan struct{}) + } + c = &coordinatorLoad{ + loadWait: loadWait, + err: errors.New("coordinator was not returned in broker response"), + } + cl.coordinators[coordinatorKey{key, typ}] = c + } + load2key[c] = append(load2key[c], key) + } + cl.coordinatorsMu.Unlock() - if !ok { - c = &coordinatorLoad{ - done: make(chan struct{}), // all requests for the same coordinator get collapsed into one + if loadWait == nil { // all coordinators were cached + hasLoadedBrokers = cl.waitCoordinatorLoad(ctx, typ, load2key, !hasLoadedBrokers, toRequest, m) + continue } - defer func() { - // If our load fails, we avoid caching the coordinator, - // but only if something else has not already replaced - // our pointer. - if c.err != nil { - cl.deleteStaleCoordinatorIfEqual(key, c) + + key2load := make(map[string]*coordinatorLoad) + req := kmsg.NewPtrFindCoordinatorRequest() + req.CoordinatorType = typ + for c, keys := range load2key { + if c.loadWait == loadWait { // if this is our wait, this is ours to request + req.CoordinatorKeys = append(req.CoordinatorKeys, keys...) + for _, key := range keys { + key2load[key] = c + delete(toRequest, key) + } } - close(c.done) - }() - cl.coordinators[key] = c - } - cl.coordinatorsMu.Unlock() + } - if ok { - <-c.done - if c.err != nil { - return nil, c.err + shards := cl.RequestSharded(ctx, req) + + for _, shard := range shards { + if shard.Err != nil { + req := shard.Req.(*kmsg.FindCoordinatorRequest) + for _, key := range req.CoordinatorKeys { + c, ok := key2load[key] + if ok { + c.err = shard.Err + } + } + } else { + resp, _ := shard.Resp.(*kmsg.FindCoordinatorResponse) + for _, rc := range resp.Coordinators { + c, ok := key2load[rc.Key] + if ok { + c.err = kerr.ErrorForCode(rc.ErrorCode) + c.node = resp.NodeID + } + } + } } - // If brokerOrErr returns an error, then our cached coordinator - // is using metadata that has updated and removed knowledge of - // that coordinator. We delete the stale coordinator here and - // retry once. The retry will force a coordinator reload, and - // everything will be fresh. Any errors after that we keep. - b, err := cl.brokerOrErr(nil, c.node, &errUnknownCoordinator{c.node, key}) - if err != nil && !restarted { - restarted = true - cl.deleteStaleCoordinatorIfEqual(key, c) - goto start + // For anything we loaded, if it has a load failure (including + // not being replied to), we remove the key from the cache. We + // do not want to cache erroring values. + // + // We range key2load, which contains only coordinators we are + // responsible for loading. 
+ cl.coordinatorsMu.Lock() + for key, c := range key2load { + if c.err != nil { + ck := coordinatorKey{key, typ} + if loading, ok := cl.coordinators[ck]; ok && loading == c { + delete(cl.coordinators, ck) + } + } } - return b, err - } + cl.coordinatorsMu.Unlock() - var resp *kmsg.FindCoordinatorResponse - req := kmsg.NewPtrFindCoordinatorRequest() - req.CoordinatorKey = key.name - req.CoordinatorType = key.typ - _, resp, c.err = cl.findCoordinator(ctx, req) - if c.err != nil { - return nil, c.err - } - if c.err = kerr.ErrorForCode(resp.ErrorCode); c.err != nil { - return nil, c.err + close(loadWait) + hasLoadedBrokers = cl.waitCoordinatorLoad(ctx, typ, load2key, !hasLoadedBrokers, toRequest, m) } - c.node = resp.NodeID + return m +} - var b *broker - b, c.err = cl.brokerOrErr(ctx, c.node, &errUnknownCoordinator{c.node, key}) - return b, c.err +// After some prep work, we wait for coordinators to load. We update toRequest +// values with true if the caller should bypass cache and re-load these +// coordinators. +// +// This returns if we load brokers, and populates m with results. +func (cl *Client) waitCoordinatorLoad(ctx context.Context, typ int8, load2key map[*coordinatorLoad][]string, shouldLoadBrokers bool, toRequest map[string]bool, m map[string]brokerOrErr) bool { + var loadedBrokers bool + for c, keys := range load2key { + <-c.loadWait + for _, key := range keys { + if c.err != nil { + delete(toRequest, key) + m[key] = brokerOrErr{nil, c.err} + continue + } + + var brokerCtx context.Context + if shouldLoadBrokers && !loadedBrokers { + brokerCtx = ctx + loadedBrokers = true + } + + b, err := cl.brokerOrErr(brokerCtx, c.node, &errUnknownCoordinator{c.node, coordinatorKey{key, typ}}) + if err != nil { + if _, exists := toRequest[key]; exists { + toRequest[key] = true + continue + } + // If the key does not exist, we just loaded this + // coordinator and also the brokers. We do not + // re-request. + } + delete(toRequest, key) + m[key] = brokerOrErr{b, err} + } + } + return loadedBrokers } func (cl *Client) maybeDeleteStaleCoordinator(name string, typ int8, err error) bool { @@ -1247,37 +1305,6 @@ type brokerOrErr struct { err error } -// loadCoordinators does a concurrent load of many coordinators. -func (cl *Client) loadCoordinators(typ int8, names ...string) map[string]brokerOrErr { - uniq := make(map[string]struct{}) - for _, name := range names { - uniq[name] = struct{}{} - } - - var mu sync.Mutex - m := make(map[string]brokerOrErr) - - var wg sync.WaitGroup - for uniqName := range uniq { - myName := uniqName - wg.Add(1) - go func() { - defer wg.Done() - coordinator, err := cl.loadCoordinator(cl.ctx, coordinatorKey{ - name: myName, - typ: typ, - }) - - mu.Lock() - defer mu.Unlock() - m[myName] = brokerOrErr{coordinator, err} - }() - } - wg.Wait() - - return m -} - func (cl *Client) handleAdminReq(ctx context.Context, req kmsg.Request) ResponseShard { // Loading a controller can perform some wait; we accept that and do // not account for the retries or the time to load the controller as @@ -1411,10 +1438,7 @@ func (cl *Client) handleCoordinatorReq(ctx context.Context, req kmsg.Request) Re // coordinator is deleted. 
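
The caching scheme above collapses concurrent loads of the same coordinator behind a shared channel and evicts failed loads so the next caller retries fresh. A simplified, self-contained sketch of that pattern is below; the cache type and loader signature are illustrative, not the client's internals:

    package coordcache // illustrative package, not part of kgo

    import "sync"

    type cachedLoad struct {
        done chan struct{} // closed once node/err are set
        node int32
        err  error
    }

    type cache struct {
        mu sync.Mutex
        m  map[string]*cachedLoad
    }

    func newCache() *cache { return &cache{m: make(map[string]*cachedLoad)} }

    // get returns the coordinator node for key, collapsing concurrent callers
    // onto a single load and evicting the cached entry on failure.
    func (c *cache) get(key string, load func(string) (int32, error)) (int32, error) {
        c.mu.Lock()
        if l, ok := c.m[key]; ok {
            c.mu.Unlock()
            <-l.done // another goroutine is (or was) loading; wait for its result
            return l.node, l.err
        }
        l := &cachedLoad{done: make(chan struct{})}
        c.m[key] = l
        c.mu.Unlock()

        l.node, l.err = load(key)
        if l.err != nil {
            c.mu.Lock()
            if cur, ok := c.m[key]; ok && cur == l { // only delete our own stale entry
                delete(c.m, key)
            }
            c.mu.Unlock()
        }
        close(l.done)
        return l.node, l.err
    }
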
func (cl *Client) handleCoordinatorReqSimple(ctx context.Context, typ int8, name string, req kmsg.Request) ResponseShard { coordinator, resp, err := cl.handleReqWithCoordinator(ctx, func() (*broker, error) { - return cl.loadCoordinator(ctx, coordinatorKey{ - name: name, - typ: typ, - }) + return cl.loadCoordinator(ctx, typ, name) }, typ, name, req) return shard(coordinator, req, resp, err) } @@ -1597,6 +1621,14 @@ type issueShard struct { // sharder splits a request. type sharder interface { + // If a request originally was not batched, then the protocol switched + // to being batched, we always try batched first then fallback. + // + // Requests that make this switch should always return pinReq requests, + // and we must unpack the pinReq to return to end users / use + // internally. + unpackPinReq() bool + // shard splits a request and returns the requests to issue tied to the // brokers to issue the requests to. This can return an error if there // is some pre-loading that needs to happen. If an error is returned, @@ -1605,7 +1637,10 @@ type sharder interface { // Due to sharded requests not being retriable if a response is // received, to avoid stale coordinator errors, this function should // not use any previously cached metadata. - shard(context.Context, kmsg.Request) ([]issueShard, bool, error) + // + // This takes the last error if the request is being retried, which is + // currently only useful for errBrokerTooOld. + shard(context.Context, kmsg.Request, error) ([]issueShard, bool, error) // onResp is called on a successful response to investigate the // response and potentially perform cleanup, and potentially returns an @@ -1628,6 +1663,8 @@ func (cl *Client) handleShardedReq(ctx context.Context, req kmsg.Request) ([]Res sharder = &listOffsetsSharder{cl} case *kmsg.OffsetFetchRequest: sharder = &offsetFetchSharder{cl} + case *kmsg.FindCoordinatorRequest: + sharder = &findCoordinatorSharder{cl} case *kmsg.DescribeGroupsRequest: sharder = &describeGroupsSharder{cl} case *kmsg.ListGroupsRequest: @@ -1660,8 +1697,9 @@ func (cl *Client) handleShardedReq(ctx context.Context, req kmsg.Request) ([]Res // again). reqTry tracks how many total tries a request piece has had; // we quit at either the max configured tries or max configured time. type reqTry struct { - tries int - req kmsg.Request + tries int + req kmsg.Request + lastErr error } var ( @@ -1687,8 +1725,9 @@ func (cl *Client) handleShardedReq(ctx context.Context, req kmsg.Request) ([]Res // issue is called to progressively split and issue requests. // // This recursively calls itself if a request fails and can be retried. + // We avoid stack problems because this calls itself in a goroutine. 
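
The shape of the batch-first fallback, pulled out of the sharding machinery as a standalone sketch (shardKeys is hypothetical and written as if it lived inside pkg/kgo; the real sharders build concrete kmsg requests and wrap them in pinReq):

    // On the first pass lastErr is nil, so we issue one batched request pinned
    // to the batched protocol version. If a broker rejects it with
    // errBrokerTooOld, the retry passes that error back in and we split into
    // per-key requests pinned to the pre-batch version.
    func shardKeys(
        keys []string,
        lastErr error,
        batched func(keys []string) kmsg.Request, // builds the pinMin (new-version) request
        single func(key string) kmsg.Request,     // builds the pinMax (old-version) request
    ) []kmsg.Request {
        if !errors.Is(lastErr, errBrokerTooOld) {
            return []kmsg.Request{batched(keys)}
        }
        reqs := make([]kmsg.Request, 0, len(keys))
        for _, key := range keys {
            reqs = append(reqs, single(key))
        }
        return reqs
    }
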
issue = func(try reqTry) { - issues, reshardable, err := sharder.shard(ctx, try.req) + issues, reshardable, err := sharder.shard(ctx, try.req, try.lastErr) if err != nil { l.Log(LogLevelDebug, "unable to shard request", "previous_tries", try.tries, "err", err) addShard(shard(nil, try.req, nil, err)) // failure to shard means data loading failed; this request is failed @@ -1722,9 +1761,13 @@ func (cl *Client) handleShardedReq(ctx context.Context, req kmsg.Request) ([]Res for i := range issues { myIssue := issues[i] + myUnderlyingReq := myIssue.req + if sharder.unpackPinReq() { + myUnderlyingReq = myIssue.req.(*pinReq).Request + } if myIssue.err != nil { - addShard(shard(nil, myIssue.req, nil, myIssue.err)) + addShard(shard(nil, myUnderlyingReq, nil, myIssue.err)) continue } @@ -1741,20 +1784,22 @@ func (cl *Client) handleShardedReq(ctx context.Context, req kmsg.Request) ([]Res broker, err = cl.brokerOrErr(ctx, myIssue.broker, errUnknownBroker) } if err != nil { - addShard(shard(nil, myIssue.req, nil, err)) // failure to load a broker is a failure to issue a request + addShard(shard(nil, myUnderlyingReq, nil, err)) // failure to load a broker is a failure to issue a request return } resp, err := broker.waitResp(ctx, myIssue.req) if err == nil { - err = sharder.onResp(myIssue.req, resp) // perform some potential cleanup, and potentially receive an error to retry + err = sharder.onResp(myUnderlyingReq, resp) // perform some potential cleanup, and potentially receive an error to retry } // If we failed to issue the request, we *maybe* will retry. // We could have failed to even issue the request or receive // a response, which is retriable. backoff := cl.cfg.retryBackoff(tries) - if err != nil && (retryTimeout == 0 || time.Now().Add(backoff).Sub(start) < retryTimeout) && cl.shouldRetry(tries, err) && cl.waitTries(ctx, backoff) { + if err != nil && + (retryTimeout == 0 || time.Now().Add(backoff).Sub(start) < retryTimeout) && + (reshardable && sharder.unpackPinReq() && errors.Is(err, errBrokerTooOld) || cl.shouldRetry(tries, err) && cl.waitTries(ctx, backoff)) { // Non-reshardable re-requests just jump back to the // top where the broker is loaded. 
This is the case on // requests where the original request is split to @@ -1764,16 +1809,16 @@ func (cl *Client) handleShardedReq(ctx context.Context, req kmsg.Request) ([]Res goto start } l.Log(LogLevelDebug, "sharded request failed, resharding and reissuing", "time_since_start", time.Since(start), "tries", try.tries, "err", err) - issue(reqTry{tries, myIssue.req}) + issue(reqTry{tries, myUnderlyingReq, err}) return } - addShard(shard(broker, myIssue.req, resp, err)) // the error was not retriable + addShard(shard(broker, myUnderlyingReq, resp, err)) // the error was not retriable }() } } - issue(reqTry{0, req}) + issue(reqTry{0, req, nil}) wg.Wait() return shards, sharder.merge @@ -2031,7 +2076,9 @@ func (l *unknownErrShards) collect(mkreq, mergeParts interface{}) []issueShard { // handles sharding ListOffsetsRequest type listOffsetsSharder struct{ *Client } -func (cl *listOffsetsSharder) shard(ctx context.Context, kreq kmsg.Request) ([]issueShard, bool, error) { +func (*listOffsetsSharder) unpackPinReq() bool { return false } + +func (cl *listOffsetsSharder) shard(ctx context.Context, kreq kmsg.Request, _ error) ([]issueShard, bool, error) { req := kreq.(*kmsg.ListOffsetsRequest) // For listing offsets, we need the broker leader for each partition we @@ -2157,6 +2204,8 @@ func (*listOffsetsSharder) merge(sresps []ResponseShard) (kmsg.Response, error) // handles sharding OffsetFetchRequest type offsetFetchSharder struct{ *Client } +func (*offsetFetchSharder) unpackPinReq() bool { return true } // batch first, single group fallback + func offsetFetchReqToGroup(req *kmsg.OffsetFetchRequest) kmsg.OffsetFetchRequestGroup { g := kmsg.NewOffsetFetchRequestGroup() g.Group = req.Group @@ -2169,6 +2218,19 @@ func offsetFetchReqToGroup(req *kmsg.OffsetFetchRequest) kmsg.OffsetFetchRequest return g } +func offsetFetchGroupToReq(requireStable bool, group kmsg.OffsetFetchRequestGroup) *kmsg.OffsetFetchRequest { + req := kmsg.NewPtrOffsetFetchRequest() + req.RequireStable = requireStable + req.Group = group.Group + for _, topic := range group.Topics { + reqTopic := kmsg.NewOffsetFetchRequestTopic() + reqTopic.Topic = topic.Topic + reqTopic.Partitions = topic.Partitions + req.Topics = append(req.Topics, reqTopic) + } + return req +} + func offsetFetchRespToGroup(req *kmsg.OffsetFetchRequest, resp *kmsg.OffsetFetchResponse) kmsg.OffsetFetchResponseGroup { g := kmsg.NewOffsetFetchResponseGroup() g.Group = req.Group @@ -2209,46 +2271,26 @@ func offsetFetchRespGroupIntoResp(g kmsg.OffsetFetchResponseGroup, into *kmsg.Of } } -func (cl *offsetFetchSharder) shard(_ context.Context, kreq kmsg.Request) ([]issueShard, bool, error) { +func (cl *offsetFetchSharder) shard(ctx context.Context, kreq kmsg.Request, lastErr error) ([]issueShard, bool, error) { req := kreq.(*kmsg.OffsetFetchRequest) - groups := make([]string, 0, len(req.Groups)+1) - if len(req.Groups) == 0 { // v0-v7 - groups = append(groups, req.Group) - } - for i := range req.Groups { // v8+ - groups = append(groups, req.Groups[i].Group) - } - - coordinators := cl.loadCoordinators(coordinatorTypeGroup, groups...) + // We always try batching and only split at the end if lastErr + // indicates too old. We convert to batching immediately. + dup := *req + req = &dup - // If there is only the top level group, then we simply return our - // request mapped to its specific broker. 
For forward compatibility, we - // also embed the top level request into the Groups list: this allows - // operators of old request versions (v0-v7) to issue a v8 request - // appropriately. On response, if the length of groups is 1, we merge - // the first item back to the top level. if len(req.Groups) == 0 { - berr := coordinators[req.Group] - if berr.err != nil { - return []issueShard{{ //nolint:nilerr // error is returned in the struct - req: req, - err: berr.err, - }}, false, nil // not reshardable, because this is an error + req.Groups = append(req.Groups, offsetFetchReqToGroup(req)) + } + groups := make([]string, 0, len(req.Groups)) + for i := range req.Groups { + if g := req.Groups[i].Group; len(g) > 0 { + groups = append(groups, req.Groups[i].Group) } - - dup := *req - brokerReq := &dup - brokerReq.Groups = append(brokerReq.Groups, offsetFetchReqToGroup(req)) - - return []issueShard{{ - req: brokerReq, - broker: berr.b.meta.NodeID, - }}, false, nil // reshardable to reload correct coordinator } - // v8+ behavior: we have multiple groups. - // + coordinators := cl.loadCoordinators(ctx, coordinatorTypeGroup, groups...) + // Loading coordinators can have each group fail with its unique error, // or with a kerr.Error that can be merged. Unique errors get their own // failure shard, while kerr.Error's get merged. @@ -2287,12 +2329,24 @@ func (cl *offsetFetchSharder) shard(_ context.Context, kreq kmsg.Request) ([]iss } } + splitReq := errors.Is(lastErr, errBrokerTooOld) + var issues []issueShard for id, req := range brokerReqs { - issues = append(issues, issueShard{ - req: req, - broker: id, - }) + if splitReq { + for _, group := range req.Groups { + req := offsetFetchGroupToReq(req.RequireStable, group) + issues = append(issues, issueShard{ + req: &pinReq{Request: req, pinMax: true, max: 7}, + broker: id, + }) + } + } else { + issues = append(issues, issueShard{ + req: &pinReq{Request: req, pinMin: true, min: 8}, + broker: id, + }) + } } for _, unkerr := range unkerrs { issues = append(issues, issueShard{ @@ -2311,7 +2365,7 @@ func (cl *offsetFetchSharder) shard(_ context.Context, kreq kmsg.Request) ([]iss } func (cl *offsetFetchSharder) onResp(kreq kmsg.Request, kresp kmsg.Response) error { - req := kreq.(*kmsg.OffsetFetchRequest) + req := kreq.(*kmsg.OffsetFetchRequest) // we always issue pinned requests resp := kresp.(*kmsg.OffsetFetchResponse) switch len(resp.Groups) { @@ -2365,13 +2419,106 @@ func (*offsetFetchSharder) merge(sresps []ResponseShard) (kmsg.Response, error) }) } +// handles sharding FindCoordinatorRequest +type findCoordinatorSharder struct{ *Client } + +func (*findCoordinatorSharder) unpackPinReq() bool { return true } // batch first, single key fallback + +func findCoordinatorRespCoordinatorIntoResp(c kmsg.FindCoordinatorResponseCoordinator, into *kmsg.FindCoordinatorResponse) { + into.NodeID = c.NodeID + into.Host = c.Host + into.Port = c.Port + into.ErrorCode = c.ErrorCode + into.ErrorMessage = c.ErrorMessage +} + +func (*findCoordinatorSharder) shard(_ context.Context, kreq kmsg.Request, lastErr error) ([]issueShard, bool, error) { + req := kreq.(*kmsg.FindCoordinatorRequest) + + // We always try batching and only split at the end if lastErr + // indicates too old. We convert to batching immediately. 
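
From the caller's side, the old single-group OffsetFetch keeps working: it is embedded into the v8+ Groups batch, pinned, and split back into per-group v7 requests when a broker is too old, with the sharder normalizing between the two response shapes. A small usage sketch, assuming the franz-go kgo and kmsg imports; the helper name and group are placeholders:

    // fetchCommitted issues an old-style single-group OffsetFetch; the client
    // batches it internally and splits it again only if a broker requires it.
    func fetchCommitted(ctx context.Context, cl *kgo.Client, group string) (*kmsg.OffsetFetchResponse, error) {
        req := kmsg.NewPtrOffsetFetchRequest()
        req.Group = group // v0-v7 style single group
        return req.RequestWith(ctx, cl)
    }
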
+ dup := *req + req = &dup + + uniq := make(map[string]struct{}, len(req.CoordinatorKeys)) + uniq[req.CoordinatorKey] = struct{}{} + for _, key := range req.CoordinatorKeys { + uniq[key] = struct{}{} + } + req.CoordinatorKeys = req.CoordinatorKeys[:0] + for key := range uniq { + if len(key) > 0 { + req.CoordinatorKeys = append(req.CoordinatorKeys, key) + } + } + + splitReq := errors.Is(lastErr, errBrokerTooOld) + if !splitReq { + return []issueShard{{ + req: &pinReq{Request: req, pinMin: true, min: 4}, + any: true, + }}, true, nil // this is "reshardable", in that we will split the request next + } + + var issues []issueShard + for _, key := range req.CoordinatorKeys { + sreq := kmsg.NewPtrFindCoordinatorRequest() + sreq.CoordinatorType = req.CoordinatorType + sreq.CoordinatorKey = key + issues = append(issues, issueShard{ + req: &pinReq{Request: sreq, pinMax: true, max: 3}, + any: true, + }) + } + return issues, false, nil // not reshardable +} + +func (*findCoordinatorSharder) onResp(kreq kmsg.Request, kresp kmsg.Response) error { + req := kreq.(*kmsg.FindCoordinatorRequest) // we always issue pinned requests + resp := kresp.(*kmsg.FindCoordinatorResponse) + + switch len(resp.Coordinators) { + case 0: + // Convert v3 and prior to v4+ + rc := kmsg.NewFindCoordinatorResponseCoordinator() + rc.Key = req.CoordinatorKey + rc.NodeID = resp.NodeID + rc.Host = resp.Host + rc.Port = resp.Port + rc.ErrorCode = resp.ErrorCode + rc.ErrorMessage = resp.ErrorMessage + resp.Coordinators = append(resp.Coordinators, rc) + case 1: + // Convert v4 to v3 and prior + findCoordinatorRespCoordinatorIntoResp(resp.Coordinators[0], resp) + } + + return nil +} + +func (*findCoordinatorSharder) merge(sresps []ResponseShard) (kmsg.Response, error) { + merged := kmsg.NewPtrFindCoordinatorResponse() + return merged, firstErrMerger(sresps, func(kresp kmsg.Response) { + resp := kresp.(*kmsg.FindCoordinatorResponse) + merged.Version = resp.Version + merged.ThrottleMillis = resp.ThrottleMillis + merged.Coordinators = append(merged.Coordinators, resp.Coordinators...) + + if len(resp.Coordinators) == 1 { + findCoordinatorRespCoordinatorIntoResp(resp.Coordinators[0], merged) + } + }) +} + // handles sharding DescribeGroupsRequest type describeGroupsSharder struct{ *Client } -func (cl *describeGroupsSharder) shard(_ context.Context, kreq kmsg.Request) ([]issueShard, bool, error) { +func (*describeGroupsSharder) unpackPinReq() bool { return false } + +func (cl *describeGroupsSharder) shard(ctx context.Context, kreq kmsg.Request, _ error) ([]issueShard, bool, error) { req := kreq.(*kmsg.DescribeGroupsRequest) - coordinators := cl.loadCoordinators(coordinatorTypeGroup, req.Groups...) + coordinators := cl.loadCoordinators(ctx, coordinatorTypeGroup, req.Groups...) 
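
Sharders such as DescribeGroups now resolve all of their coordinators through the batched loadCoordinators above, so a many-group request costs a single FindCoordinator round trip on modern clusters (and falls back to per-key lookups on old ones). A usage sketch, assuming the kgo and kmsg imports; group names are placeholders:

    func describeGroups(ctx context.Context, cl *kgo.Client, groups ...string) {
        req := kmsg.NewPtrDescribeGroupsRequest()
        req.Groups = groups
        for _, shard := range cl.RequestSharded(ctx, req) {
            if shard.Err != nil {
                fmt.Println("shard failed:", shard.Err)
                continue
            }
            resp := shard.Resp.(*kmsg.DescribeGroupsResponse)
            for _, g := range resp.Groups {
                fmt.Println(g.Group, g.State)
            }
        }
    }
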
type unkerr struct { err error group string @@ -2455,7 +2602,9 @@ func (*describeGroupsSharder) merge(sresps []ResponseShard) (kmsg.Response, erro // handles sharding ListGroupsRequest type listGroupsSharder struct{ *Client } -func (cl *listGroupsSharder) shard(ctx context.Context, kreq kmsg.Request) ([]issueShard, bool, error) { +func (*listGroupsSharder) unpackPinReq() bool { return false } + +func (cl *listGroupsSharder) shard(ctx context.Context, kreq kmsg.Request, _ error) ([]issueShard, bool, error) { req := kreq.(*kmsg.ListGroupsRequest) return cl.allBrokersShardedReq(ctx, func() kmsg.Request { dup := *req @@ -2484,7 +2633,9 @@ func (*listGroupsSharder) merge(sresps []ResponseShard) (kmsg.Response, error) { // handle sharding DeleteRecordsRequest type deleteRecordsSharder struct{ *Client } -func (cl *deleteRecordsSharder) shard(ctx context.Context, kreq kmsg.Request) ([]issueShard, bool, error) { +func (*deleteRecordsSharder) unpackPinReq() bool { return false } + +func (cl *deleteRecordsSharder) shard(ctx context.Context, kreq kmsg.Request, _ error) ([]issueShard, bool, error) { req := kreq.(*kmsg.DeleteRecordsRequest) var need []string @@ -2601,7 +2752,9 @@ func (*deleteRecordsSharder) merge(sresps []ResponseShard) (kmsg.Response, error // handle sharding OffsetForLeaderEpochRequest type offsetForLeaderEpochSharder struct{ *Client } -func (cl *offsetForLeaderEpochSharder) shard(ctx context.Context, kreq kmsg.Request) ([]issueShard, bool, error) { +func (*offsetForLeaderEpochSharder) unpackPinReq() bool { return false } + +func (cl *offsetForLeaderEpochSharder) shard(ctx context.Context, kreq kmsg.Request, _ error) ([]issueShard, bool, error) { req := kreq.(*kmsg.OffsetForLeaderEpochRequest) var need []string @@ -2718,7 +2871,9 @@ func (*offsetForLeaderEpochSharder) merge(sresps []ResponseShard) (kmsg.Response // handle sharding DescribeConfigsRequest type describeConfigsSharder struct{ *Client } -func (*describeConfigsSharder) shard(_ context.Context, kreq kmsg.Request) ([]issueShard, bool, error) { +func (*describeConfigsSharder) unpackPinReq() bool { return false } + +func (*describeConfigsSharder) shard(_ context.Context, kreq kmsg.Request, _ error) ([]issueShard, bool, error) { req := kreq.(*kmsg.DescribeConfigsRequest) brokerReqs := make(map[int32][]kmsg.DescribeConfigsRequestResource) @@ -2783,7 +2938,9 @@ func (*describeConfigsSharder) merge(sresps []ResponseShard) (kmsg.Response, err // handle sharding AlterConfigsRequest type alterConfigsSharder struct{ *Client } -func (*alterConfigsSharder) shard(_ context.Context, kreq kmsg.Request) ([]issueShard, bool, error) { +func (*alterConfigsSharder) unpackPinReq() bool { return false } + +func (*alterConfigsSharder) shard(_ context.Context, kreq kmsg.Request, _ error) ([]issueShard, bool, error) { req := kreq.(*kmsg.AlterConfigsRequest) brokerReqs := make(map[int32][]kmsg.AlterConfigsRequestResource) @@ -2846,7 +3003,9 @@ func (*alterConfigsSharder) merge(sresps []ResponseShard) (kmsg.Response, error) // handles sharding AlterReplicaLogDirsRequest type alterReplicaLogDirsSharder struct{ *Client } -func (cl *alterReplicaLogDirsSharder) shard(ctx context.Context, kreq kmsg.Request) ([]issueShard, bool, error) { +func (*alterReplicaLogDirsSharder) unpackPinReq() bool { return false } + +func (cl *alterReplicaLogDirsSharder) shard(ctx context.Context, kreq kmsg.Request, _ error) ([]issueShard, bool, error) { req := kreq.(*kmsg.AlterReplicaLogDirsRequest) needMap := make(map[string]struct{}) @@ -2991,7 +3150,9 @@ func 
(*alterReplicaLogDirsSharder) merge(sresps []ResponseShard) (kmsg.Response, // handles sharding DescribeLogDirsRequest type describeLogDirsSharder struct{ *Client } -func (cl *describeLogDirsSharder) shard(ctx context.Context, kreq kmsg.Request) ([]issueShard, bool, error) { +func (*describeLogDirsSharder) unpackPinReq() bool { return false } + +func (cl *describeLogDirsSharder) shard(ctx context.Context, kreq kmsg.Request, _ error) ([]issueShard, bool, error) { req := kreq.(*kmsg.DescribeLogDirsRequest) // If req.Topics is nil, the request is to describe all logdirs. Thus, @@ -3106,10 +3267,12 @@ func (*describeLogDirsSharder) merge(sresps []ResponseShard) (kmsg.Response, err // handles sharding DeleteGroupsRequest type deleteGroupsSharder struct{ *Client } -func (cl *deleteGroupsSharder) shard(_ context.Context, kreq kmsg.Request) ([]issueShard, bool, error) { +func (*deleteGroupsSharder) unpackPinReq() bool { return false } + +func (cl *deleteGroupsSharder) shard(ctx context.Context, kreq kmsg.Request, _ error) ([]issueShard, bool, error) { req := kreq.(*kmsg.DeleteGroupsRequest) - coordinators := cl.loadCoordinators(coordinatorTypeGroup, req.Groups...) + coordinators := cl.loadCoordinators(ctx, coordinatorTypeGroup, req.Groups...) type unkerr struct { err error group string @@ -3192,7 +3355,9 @@ func (*deleteGroupsSharder) merge(sresps []ResponseShard) (kmsg.Response, error) // handle sharding IncrementalAlterConfigsRequest type incrementalAlterConfigsSharder struct{ *Client } -func (*incrementalAlterConfigsSharder) shard(_ context.Context, kreq kmsg.Request) ([]issueShard, bool, error) { +func (*incrementalAlterConfigsSharder) unpackPinReq() bool { return false } + +func (*incrementalAlterConfigsSharder) shard(_ context.Context, kreq kmsg.Request, _ error) ([]issueShard, bool, error) { req := kreq.(*kmsg.IncrementalAlterConfigsRequest) brokerReqs := make(map[int32][]kmsg.IncrementalAlterConfigsRequestResource) @@ -3255,7 +3420,9 @@ func (*incrementalAlterConfigsSharder) merge(sresps []ResponseShard) (kmsg.Respo // handle sharding DescribeProducersRequest type describeProducersSharder struct{ *Client } -func (cl *describeProducersSharder) shard(ctx context.Context, kreq kmsg.Request) ([]issueShard, bool, error) { +func (*describeProducersSharder) unpackPinReq() bool { return false } + +func (cl *describeProducersSharder) shard(ctx context.Context, kreq kmsg.Request, _ error) ([]issueShard, bool, error) { req := kreq.(*kmsg.DescribeProducersRequest) var need []string @@ -3363,10 +3530,12 @@ func (*describeProducersSharder) merge(sresps []ResponseShard) (kmsg.Response, e // handles sharding DescribeTransactionsRequest type describeTransactionsSharder struct{ *Client } -func (cl *describeTransactionsSharder) shard(_ context.Context, kreq kmsg.Request) ([]issueShard, bool, error) { +func (*describeTransactionsSharder) unpackPinReq() bool { return false } + +func (cl *describeTransactionsSharder) shard(ctx context.Context, kreq kmsg.Request, _ error) ([]issueShard, bool, error) { req := kreq.(*kmsg.DescribeTransactionsRequest) - coordinators := cl.loadCoordinators(coordinatorTypeTxn, req.TransactionalIDs...) + coordinators := cl.loadCoordinators(ctx, coordinatorTypeTxn, req.TransactionalIDs...) 
type unkerr struct { err error txnID string @@ -3449,7 +3618,9 @@ func (*describeTransactionsSharder) merge(sresps []ResponseShard) (kmsg.Response // handles sharding ListTransactionsRequest type listTransactionsSharder struct{ *Client } -func (cl *listTransactionsSharder) shard(ctx context.Context, kreq kmsg.Request) ([]issueShard, bool, error) { +func (*listTransactionsSharder) unpackPinReq() bool { return false } + +func (cl *listTransactionsSharder) shard(ctx context.Context, kreq kmsg.Request, _ error) ([]issueShard, bool, error) { req := kreq.(*kmsg.ListTransactionsRequest) return cl.allBrokersShardedReq(ctx, func() kmsg.Request { dup := *req
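
One way to exercise the backward-compatible path end to end is to cap the client's negotiated versions below Kafka 3.0: the pinned batched requests then fail internally with errBrokerTooOld and are split into the single-key form. A sketch of that, assuming kgo.MaxVersions and the kversion package's V2_8_0 pin; the broker address is a placeholder:

    package main

    import (
        "context"
        "fmt"

        "github.com/twmb/franz-go/pkg/kgo"
        "github.com/twmb/franz-go/pkg/kmsg"
        "github.com/twmb/franz-go/pkg/kversion"
    )

    func main() {
        // Capping at 2.8 means FindCoordinator negotiates at most v3, so the
        // batched (v4+) request is split into single-key requests internally.
        cl, err := kgo.NewClient(
            kgo.SeedBrokers("localhost:9092"), // placeholder broker
            kgo.MaxVersions(kversion.V2_8_0()),
        )
        if err != nil {
            panic(err)
        }
        defer cl.Close()

        req := kmsg.NewPtrFindCoordinatorRequest()
        req.CoordinatorType = 0
        req.CoordinatorKeys = []string{"group-a", "group-b"}
        resp, err := req.RequestWith(context.Background(), cl)
        if err != nil {
            panic(err)
        }
        fmt.Println(len(resp.Coordinators), "coordinators returned")
    }
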