From 9467a951d71e511c28180239336093225082896c Mon Sep 17 00:00:00 2001 From: Travis Bischel Date: Mon, 9 Nov 2020 14:52:22 -0700 Subject: [PATCH] kgo.Client: add explicit ShardedRequest; .Broker: add RetriableRequest The prior Request always merged responses, which was not perfect all of the time. If one of many broker requests failed, we would only have a partial response, and it would not be clear that the response was partial. The new ShardedRequest returns _all_ issued request/response/error pairs and the broker that the requests went to, or an unknown broker if the request could not be issued. This is a rather large change that has a few other side effects: - the admin controller is un-cached whenever a NOT_CONTROLLER error is returned from an admin request. Previously, we would continue using the cached controller until a metadata reload. - this adds sharding for DescribeConfigs, AlterConfigs, and IncrementalAlterConfigs - for AlterReplicaLogDirs and DescribeLogDirs, requests are issued to all replicas, rather than just the leader - some requests in the Request flow are now retried rather than failing (such as when we cannot load metadata for a broker while looking up where to send the request) Since the retrying logic has been abstracted into a helper, this also makes it easy to retry an individual, broker-dedicated request on the Broker type. --- pkg/kgo/broker.go | 4 + pkg/kgo/client.go | 2316 ++++++++++++++++++++++++++----------------- pkg/kgo/metadata.go | 2 +- 3 files changed, 1437 insertions(+), 885 deletions(-) diff --git a/pkg/kgo/broker.go b/pkg/kgo/broker.go index 090f2db6..bcdb936c 100644 --- a/pkg/kgo/broker.go +++ b/pkg/kgo/broker.go @@ -63,6 +63,10 @@ type waitingResp struct { err error } +var unknownMetadata = BrokerMetadata{ + NodeID: -1, +} + // BrokerMetadata is metadata for a broker. // // This struct mirrors kmsg.MetadataResponseBroker. diff --git a/pkg/kgo/client.go b/pkg/kgo/client.go index a8ea8352..176a6962 100644 --- a/pkg/kgo/client.go +++ b/pkg/kgo/client.go @@ -52,7 +52,17 @@ type Client struct { bufPool bufPool // for to brokers to share underlying reusable request buffers - controllerID int32 // atomic + controllerIDMu sync.Mutex + controllerID int32 + + // The following two ensure that we only have one fetchBrokerMetadata + // at once. This avoids unnecessary broker metadata requests and + // metadata trampling. + fetchingBrokersMu sync.Mutex + fetchingBrokers *struct { + done chan struct{} + err error + } producer producer consumer consumer @@ -276,48 +286,50 @@ func (cl *Client) waitTries(ctx context.Context, tries int) bool { // fetchBrokerMetadata issues a metadata request solely for broker information.
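As a quick usage sketch of the new ShardedRequest API this patch introduces (the import paths assume the current github.com/twmb/franz-go module layout and the seed address is a placeholder), listing groups and inspecting each per-broker shard could look like the following:

    package main

    import (
        "context"
        "fmt"
        "log"

        "github.com/twmb/franz-go/pkg/kgo"
        "github.com/twmb/franz-go/pkg/kmsg"
    )

    func main() {
        cl, err := kgo.NewClient(kgo.SeedBrokers("localhost:9092"))
        if err != nil {
            log.Fatal(err)
        }
        defer cl.Close()

        // ListGroups is one of the split requests: ShardedRequest issues it
        // to every known broker and returns one ShardedResponse per broker
        // rather than a single merged response.
        for _, shard := range cl.ShardedRequest(context.Background(), new(kmsg.ListGroupsRequest)) {
            if shard.Err != nil {
                // Only this broker's piece failed; other shards are still usable.
                fmt.Printf("broker %d failed: %v\n", shard.Meta.NodeID, shard.Err)
                continue
            }
            resp := shard.Resp.(*kmsg.ListGroupsResponse)
            fmt.Printf("broker %d returned %d groups\n", shard.Meta.NodeID, len(resp.Groups))
        }
    }
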
func (cl *Client) fetchBrokerMetadata(ctx context.Context) error { - _, err := cl.fetchMetadata(ctx, false, nil) - return err + cl.fetchingBrokersMu.Lock() + wait := cl.fetchingBrokers + if wait != nil { + cl.fetchingBrokersMu.Unlock() + <-wait.done + return wait.err + } + wait = &struct { + done chan struct{} + err error + }{done: make(chan struct{})} + cl.fetchingBrokers = wait + cl.fetchingBrokersMu.Unlock() + + defer close(wait.done) + + _, _, wait.err = cl.fetchMetadata(ctx, false, nil) + return wait.err } -func (cl *Client) fetchMetadata(ctx context.Context, all bool, topics []string) (*kmsg.MetadataResponse, error) { - if all { - topics = nil - } else if len(topics) == 0 { - topics = []string{} - } - tries := 0 - tryStart := time.Now() - retryTimeout := cl.cfg.retryTimeout(3) // 3 is metadata request key -start: - tries++ - broker := cl.broker() +func (cl *Client) fetchMetadata(ctx context.Context, all bool, topics []string) (*broker, *kmsg.MetadataResponse, error) { req := &kmsg.MetadataRequest{ AllowAutoTopicCreation: cl.cfg.allowAutoTopicCreation, - // DO NOT preallocate topics, since nil is significant } for _, topic := range topics { req.Topics = append(req.Topics, kmsg.MetadataRequestTopic{Topic: topic}) } - kresp, err := broker.waitResp(ctx, req) - if err != nil { - if retryTimeout > 0 && time.Since(tryStart) > retryTimeout { - return nil, err - } - if err == ErrConnDead && tries < cl.cfg.brokerConnDeadRetries || (kerr.IsRetriable(err) || isRetriableBrokerErr(err)) && tries < cl.cfg.retries { - if ok := cl.waitTries(ctx, tries); ok { - goto start - } - return nil, err - } - return nil, err + if all { + req.Topics = nil + } else if len(topics) == 0 { + req.Topics = []kmsg.MetadataRequestTopic{} } - meta := kresp.(*kmsg.MetadataResponse) - if meta.ControllerID >= 0 { - atomic.StoreInt32(&cl.controllerID, meta.ControllerID) + + r := cl.retriable() + meta, err := req.RequestWith(ctx, r) + if err == nil { + if meta.ControllerID >= 0 { + cl.controllerIDMu.Lock() + cl.controllerID = meta.ControllerID + cl.controllerIDMu.Unlock() + } + cl.updateBrokers(meta.Brokers) } - cl.updateBrokers(meta.Brokers) - return meta, err + return r.last, meta, err } // updateBrokers is called with the broker portion of every metadata response. @@ -417,26 +429,28 @@ func (cl *Client) Close() { // If the request is a group or transaction coordinator request, this will // issue the request to the appropriate group or transaction coordinator. // -// For group coordinator requests, if the request contains multiple groups -// (delete groups, describe groups), the request is split into one request per -// broker containing the groups that broker can respond to. Thus, you do not -// have to worry about maxing groups that different brokers are coordinators -// for. All responses are merged. Only if all requests error is an error -// returned. -// // For transaction requests, the request is issued to the transaction // coordinator. However, if the request is an init producer ID request and the // request has no transactional ID, the request goes to any broker. // -// If the request is an of a ListOffsets, DeleteRecords, OffsetForLeaderEpoch, -// AlterReplicaLogDirs, or DescribeLogDirs request, this will properly split -// the request to send partitions to the appropriate broker. If you want to -// ensure the request is not split and instead sent directly to a single -// broker, use the Broker function. +// Some requests need to be split and sent to many brokers. 
For these requests, +// it is *highly* recommended to use ShardedRequest. Not all responses from +// many brokers can be cleanly merged. However, for the requests that are +// split, this does attempt to merge them in a sane way. // -// If the request is a ListGroups request, this will send ListGroups to every -// known broker after a broker metadata lookup. The first error code of any -// response is kept, and all responded groups are merged. +// The following requests are split: +// +// ListOffsets +// DescribeGroups +// ListGroups +// DeleteRecords +// OffsetForLeaderEpoch +// DescribeConfigs +// AlterConfigs +// AlterReplicaLogDirs +// DescribeLogDirs +// DeleteGroups +// IncrementalAlterConfigs // // In short, this method tries to do the correct thing depending on what type // of request is being issued. @@ -446,34 +460,133 @@ func (cl *Client) Close() { // just end up canceling and not receiving the response to what Kafka // inevitably does. func (cl *Client) Request(ctx context.Context, req kmsg.Request) (kmsg.Response, error) { - ctx, cancel := context.WithCancel(ctx) - defer cancel() - var resp kmsg.Response - var err error - done := make(chan struct{}) - go func() { - defer close(done) - resp, err = cl.request(ctx, req) - }() - select { - case <-done: - return resp, err - case <-ctx.Done(): - return nil, ctx.Err() - case <-cl.ctx.Done(): - return nil, cl.ctx.Err() + resps, merge := cl.shardedRequest(ctx, req) + // If there is no merge function, only one request was issued directly + // to a broker. Return the resp and err directly. + if merge == nil { + return resps[0].Resp, resps[0].Err } + return merge(resps) } -// request is the logic for Request. -func (cl *Client) request(ctx context.Context, req kmsg.Request) (kmsg.Response, error) { - var resp kmsg.Response - var err error +func (cl *Client) retriable() *retriable { + return cl.retriableBrokerFn(func() (*broker, error) { return cl.broker(), nil }) +} + +func (cl *Client) retriableBrokerFn(fn func() (*broker, error)) *retriable { + return &retriable{cl: cl, br: fn} +} + +func (cl *Client) shouldRetry(tries int, err error) bool { + return err == ErrConnDead && tries < cl.cfg.brokerConnDeadRetries || (kerr.IsRetriable(err) || isRetriableBrokerErr(err)) && tries < cl.cfg.retries +} + +type retriable struct { + cl *Client + br func() (*broker, error) + last *broker + parseErr func(kmsg.Response) error +} + +func (r *retriable) Request(ctx context.Context, req kmsg.Request) (kmsg.Response, error) { tries := 0 tryStart := time.Now() - retryTimeout := cl.cfg.retryTimeout(req.Key()) + retryTimeout := r.cl.cfg.retryTimeout(req.Key()) start: tries++ + br, err := r.br() + r.last = br + if err != nil { + return nil, err + } + resp, err := r.last.waitResp(ctx, req) + if err == nil && r.parseErr != nil { + err = r.parseErr(resp) + } + if err != nil { + if retryTimeout > 0 && time.Since(tryStart) > retryTimeout { + return nil, err + } + if r.cl.shouldRetry(tries, err) && r.cl.waitTries(ctx, tries) { + goto start + } + return nil, err + } + return resp, err +} + +// ShardedResponse ties together a request with either the response it received +// or an error that prevented a response from being received. +type ShardedResponse struct { + // Meta contains the broker that this request was issued to, or an + // unknown (node ID -1) metadata if the request could not be issued. + // + // Requests can fail to even be issued if an appropriate broker cannot + // be loaded of if the client cannot understand the request. 
+ Meta BrokerMetadata + + // Req is the request that was issued to this broker. + Req kmsg.Request + + // Resp is the response received from the broker, if any. + Resp kmsg.Response + + // Err, if non-nil, is the error that prevented a response from being + // received or the request from being issued. + Err error +} + +// ShardedRequest performs the same logic as Request, but returns all responses +// from any broker that the request was split to. This always returns at least +// one shard. +// +// There are only a few requests that are strongly recommended to explicitly +// use ShardedRequest; the rest can by default use Request. These few requests +// are mentioned in the documentation for Request. +// +// If, in the process of splitting a request, some topics or partitions are +// found to not exist, or Kafka replies that a request should go to a broker +// that does not exist, all those non-existent pieces are grouped into one +// request to the first seed broker. This will show up as a seed broker node ID +// (min int32) and the response will likely contain purely errors. +func (cl *Client) ShardedRequest(ctx context.Context, req kmsg.Request) []ShardedResponse { + resps, _ := cl.shardedRequest(ctx, req) + return resps +} + +type shardMerge func([]ShardedResponse) (kmsg.Response, error) + +func (cl *Client) shardedRequest(ctx context.Context, req kmsg.Request) ([]ShardedResponse, shardMerge) { + ctx, cancel := context.WithCancel(ctx) + done := make(chan struct{}) + defer close(done) + go func() { + defer cancel() + select { + case <-done: + case <-ctx.Done(): + case <-cl.ctx.Done(): + } + }() + + // First, handle any sharded request. This comes before the conditional + // below because this handles two group requests, which we do not want + // to fall into the handleCoordinatorReq logic. + switch req.(type) { + case *kmsg.ListOffsetsRequest, // key 2 + *kmsg.DescribeGroupsRequest, // key 15 + *kmsg.ListGroupsRequest, // key 16 + *kmsg.DeleteRecordsRequest, // key 21 + *kmsg.OffsetForLeaderEpochRequest, // key 23 + *kmsg.DescribeConfigsRequest, // key 32 + *kmsg.AlterConfigsRequest, // key 33 + *kmsg.AlterReplicaLogDirsRequest, // key 34 + *kmsg.DescribeLogDirsRequest, // key 35 + *kmsg.DeleteGroupsRequest, // key 42 + *kmsg.IncrementalAlterConfigsRequest: // key 44 + return cl.handleShardedReq(ctx, req) + } + if metaReq, isMetaReq := req.(*kmsg.MetadataRequest); isMetaReq { // We hijack any metadata request so as to populate our // own brokers and controller ID. @@ -481,64 +594,69 @@ start: for _, topic := range metaReq.Topics { topics = append(topics, topic.Topic) } - // fetchMetadata does its own retrying, so we do not go - // into the retrying logic below. - return cl.fetchMetadata(ctx, metaReq.Topics == nil, topics) - } else if _, admin := req.(kmsg.AdminRequest); admin { - var controller *broker - if controller, err = cl.controller(ctx); err == nil { - resp, err = controller.waitResp(ctx, req) - } + // fetchMetadata does its own retrying, so we do not do + // retrying here. 
+ br, resp, err := cl.fetchMetadata(ctx, metaReq.Topics == nil, topics) + return shards(shard(br, req, resp, err)), nil + + } else if adminReq, admin := req.(kmsg.AdminRequest); admin { + return shards(cl.handleAdminReq(ctx, adminReq)), nil + } else if groupReq, isGroupReq := req.(kmsg.GroupCoordinatorRequest); isGroupReq { - resp, err = cl.handleCoordinatorReq(ctx, groupReq, coordinatorTypeGroup) + return shards(cl.handleCoordinatorReq(ctx, groupReq, coordinatorTypeGroup)), nil + } else if txnReq, isTxnReq := req.(kmsg.TxnCoordinatorRequest); isTxnReq { - resp, err = cl.handleCoordinatorReq(ctx, txnReq, coordinatorTypeTxn) - } else { - switch t := req.(type) { - case *kmsg.ListOffsetsRequest, - *kmsg.DeleteRecordsRequest, - *kmsg.OffsetForLeaderEpochRequest, - *kmsg.AlterReplicaLogDirsRequest, - *kmsg.DescribeLogDirsRequest: - resp, err = cl.handleShardedReq(ctx, req) - case *kmsg.ListGroupsRequest: - resp, err = cl.handleListGroupsReq(ctx, t) - case *kmsg.ApiVersionsRequest: - // As of v3, software name and version are required. - // If they are missing, we use the config options. - if t.ClientSoftwareName == "" && t.ClientSoftwareVersion == "" { - dup := *t - dup.ClientSoftwareName = cl.cfg.softwareName - dup.ClientSoftwareVersion = cl.cfg.softwareVersion - req = &dup - } - resp, err = cl.broker().waitResp(ctx, req) - default: - resp, err = cl.broker().waitResp(ctx, req) + return shards(cl.handleCoordinatorReq(ctx, txnReq, coordinatorTypeTxn)), nil + + } else if apiVersReq, isApiVersReq := req.(*kmsg.ApiVersionsRequest); isApiVersReq { + // As of v3, software name and version are required. + // If they are missing, we use the config options. + if apiVersReq.ClientSoftwareName == "" && apiVersReq.ClientSoftwareVersion == "" { + dup := *apiVersReq + dup.ClientSoftwareName = cl.cfg.softwareName + dup.ClientSoftwareVersion = cl.cfg.softwareVersion + req = &dup } } - if err != nil { - if retryTimeout > 0 && time.Since(tryStart) > retryTimeout { - return nil, err - } - if err == ErrConnDead && tries < cl.cfg.brokerConnDeadRetries || (kerr.IsRetriable(err) || isRetriableBrokerErr(err)) && tries < cl.cfg.retries { - if ok := cl.waitTries(ctx, tries); ok { - goto start - } - return nil, err - } + // All other requests not handled above can be issued to any broker + // with the default retriable logic. + r := cl.retriable() + resp, err := r.Request(ctx, req) + return shards(shard(r.last, req, resp, err)), nil +} + +func shard(br *broker, req kmsg.Request, resp kmsg.Response, err error) ShardedResponse { + if br == nil { // the broker could be nil if loading the broker failed. + return ShardedResponse{unknownMetadata, req, resp, err} } - return resp, err + return ShardedResponse{br.meta, req, resp, err} +} + +func shards(shard ...ShardedResponse) []ShardedResponse { + return shard } // brokerOrErr returns the broker for ID or the error if the broker does not // exist. -func (cl *Client) brokerOrErr(id int32, err error) (*broker, error) { +// +// If tryLoad is true and the broker does not exist, this attempts a broker +// metadata load once before failing. If the metadata load fails, this returns +// that error. 
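For callers that want Request-style merging but with visibility into each shard, the merge can also be approximated by hand on top of ShardedRequest; below is a minimal sketch for ListOffsets (mergeListOffsets is an illustrative helper, not part of the package) that regroups partitions by topic and keeps the first shard error, the same shape as the client's internal merge:

    package example

    import (
        "github.com/twmb/franz-go/pkg/kgo"
        "github.com/twmb/franz-go/pkg/kmsg"
    )

    // mergeListOffsets regroups per-broker ListOffsets shards into a single
    // response keyed by topic, keeping the first shard error seen.
    func mergeListOffsets(shards []kgo.ShardedResponse) (*kmsg.ListOffsetsResponse, error) {
        merged := new(kmsg.ListOffsetsResponse)
        byTopic := make(map[string][]kmsg.ListOffsetsResponseTopicPartition)
        var firstErr error
        for _, s := range shards {
            if s.Err != nil {
                if firstErr == nil {
                    firstErr = s.Err
                }
                continue
            }
            resp := s.Resp.(*kmsg.ListOffsetsResponse)
            merged.Version = resp.Version
            merged.ThrottleMillis = resp.ThrottleMillis
            for _, topic := range resp.Topics {
                byTopic[topic.Topic] = append(byTopic[topic.Topic], topic.Partitions...)
            }
        }
        for topic, partitions := range byTopic {
            merged.Topics = append(merged.Topics, kmsg.ListOffsetsResponseTopic{
                Topic:      topic,
                Partitions: partitions,
            })
        }
        return merged, firstErr
    }
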
+func (cl *Client) brokerOrErr(ctx context.Context, id int32, err error) (*broker, error) { + tryLoad := ctx != nil +start: cl.brokersMu.RLock() broker := cl.brokers[id] cl.brokersMu.RUnlock() + if broker == nil { + if tryLoad { + if loadErr := cl.fetchBrokerMetadata(ctx); loadErr != nil { + return nil, loadErr + } + goto start + } return nil, err } return broker, nil @@ -547,17 +665,33 @@ func (cl *Client) brokerOrErr(id int32, err error) (*broker, error) { // controller returns the controller broker, forcing a broker load if // necessary. func (cl *Client) controller(ctx context.Context) (*broker, error) { + get := func() int32 { + cl.controllerIDMu.Lock() + defer cl.controllerIDMu.Unlock() + return cl.controllerID + } + var id int32 - if id = atomic.LoadInt32(&cl.controllerID); id < 0 { + if id = get(); id < 0 { if err := cl.fetchBrokerMetadata(ctx); err != nil { return nil, err } - if id = atomic.LoadInt32(&cl.controllerID); id < 0 { + if id = get(); id < 0 { return nil, &errUnknownController{id} } } - return cl.brokerOrErr(id, &errUnknownController{id}) + return cl.brokerOrErr(nil, id, &errUnknownController{id}) +} + +// forgetControllerID is called once an admin request sees NOT_CONTROLLER. +func (cl *Client) forgetControllerID(id int32) { + cl.controllerIDMu.Lock() + defer cl.controllerIDMu.Unlock() + + if cl.controllerID == id { + cl.controllerID = unknownControllerID + } } const ( @@ -571,51 +705,22 @@ type coordinatorKey struct { } // loadController returns the group/txn coordinator for the given key, retrying -// as necessary. -func (cl *Client) loadCoordinator(ctx context.Context, key coordinatorKey) (*broker, error) { - // If there is no controller, we have never loaded brokers. We will - // need the brokers after we know which one owns this key, so force - // a load of the brokers now. - if atomic.LoadInt32(&cl.controllerID) < 0 { - if _, err := cl.controller(ctx); err != nil { - return nil, err - } - } - - tries := 0 - tryStart := time.Now() - retryTimeout := cl.cfg.retryTimeout(10) // 10 is find coordinator key -start: +// as necessary. If reload is true, this does not use a cached coordinator.
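Because admin requests issued through Request are routed to the controller, and a NOT_CONTROLLER response now un-caches the controller before retrying, a caller can simply issue admin requests against the client; a hedged sketch creating a topic (the topic name, partition count, and replication factor are placeholders):

    package example

    import (
        "context"
        "fmt"

        "github.com/twmb/franz-go/pkg/kerr"
        "github.com/twmb/franz-go/pkg/kgo"
        "github.com/twmb/franz-go/pkg/kmsg"
    )

    // createTopic issues CreateTopics through the client, which sends it to
    // the controller broker and re-resolves the controller if the response
    // says NOT_CONTROLLER.
    func createTopic(ctx context.Context, cl *kgo.Client, topic string) error {
        req := &kmsg.CreateTopicsRequest{
            Topics: []kmsg.CreateTopicsRequestTopic{{
                Topic:             topic,
                NumPartitions:     3, // placeholder partition count
                ReplicationFactor: 1, // placeholder replication factor
            }},
        }
        kresp, err := cl.Request(ctx, req)
        if err != nil {
            return err
        }
        resp := kresp.(*kmsg.CreateTopicsResponse)
        for _, t := range resp.Topics {
            if err := kerr.ErrorForCode(t.ErrorCode); err != nil {
                return fmt.Errorf("create %q: %w", t.Topic, err)
            }
        }
        return nil
    }
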
+func (cl *Client) loadCoordinator(reload bool, ctx context.Context, key coordinatorKey) (*broker, error) { cl.coordinatorsMu.Lock() coordinator, ok := cl.coordinators[key] cl.coordinatorsMu.Unlock() - if ok { - return cl.brokerOrErr(coordinator, &errUnknownCoordinator{coordinator, key}) + if !reload && ok { + return cl.brokerOrErr(nil, coordinator, &errUnknownCoordinator{coordinator, key}) } - tries++ - kresp, err := cl.broker().waitResp(ctx, &kmsg.FindCoordinatorRequest{ + resp, err := (&kmsg.FindCoordinatorRequest{ CoordinatorKey: key.name, CoordinatorType: key.typ, - }) - - var resp *kmsg.FindCoordinatorResponse - if err == nil { - resp = kresp.(*kmsg.FindCoordinatorResponse) - err = kerr.ErrorForCode(resp.ErrorCode) - } + }).RequestWith(ctx, cl.retriable()) if err != nil { - if retryTimeout > 0 && time.Since(tryStart) > retryTimeout { - return nil, err - } - if err == ErrConnDead && tries < cl.cfg.brokerConnDeadRetries || (kerr.IsRetriable(err) || isRetriableBrokerErr(err)) && tries < cl.cfg.retries { - if ok := cl.waitTries(ctx, tries); ok { - goto start - } - return nil, err - } return nil, err } @@ -624,11 +729,28 @@ start: cl.coordinators[key] = coordinator cl.coordinatorsMu.Unlock() - return cl.brokerOrErr(coordinator, &errUnknownCoordinator{coordinator, key}) + return cl.brokerOrErr(ctx, coordinator, &errUnknownCoordinator{coordinator, key}) +} + +func (cl *Client) maybeDeleteStaleCoordinator(name string, typ int8, err error) bool { + switch err { + case kerr.CoordinatorNotAvailable, + kerr.CoordinatorLoadInProgress, + kerr.NotCoordinator: + + cl.coordinatorsMu.Lock() + delete(cl.coordinators, coordinatorKey{ + name: name, + typ: typ, + }) + cl.coordinatorsMu.Unlock() + return true + } + return false } // loadCoordinators does a concurrent load of many coordinators. -func (cl *Client) loadCoordinators(typ int8, names ...string) (map[string]*broker, error) { +func (cl *Client) loadCoordinators(reload bool, typ int8, names ...string) (map[string]*broker, error) { ctx, cancel := context.WithCancel(cl.ctx) defer cancel() @@ -642,7 +764,7 @@ func (cl *Client) loadCoordinators(typ int8, names ...string) (map[string]*broke wg.Add(1) go func() { defer wg.Done() - coordinator, err := cl.loadCoordinator(ctx, coordinatorKey{ + coordinator, err := cl.loadCoordinator(reload, ctx, coordinatorKey{ name: myName, typ: typ, }) @@ -665,34 +787,74 @@ func (cl *Client) loadCoordinators(typ int8, names ...string) (map[string]*broke return m, errQuit } -// handleCoordinatorEq issues group or txn requests. -// -// The logic for group requests is mildly convoluted; a single request can -// contain multiple groups which could go to multiple brokers due to the group -// coordinators being different. -// -// All transaction requests are simple. -// -// Most requests go to one coordinator; those are simple and we issue those -// simply. -// -// Requests that go to multiple have the groups split into individual requests -// containing a single group. We only return err if all requests error. -func (cl *Client) handleCoordinatorReq(ctx context.Context, req kmsg.Request, typ int8) (kmsg.Response, error) { - // If we have to split requests, the following four variables are - // used for splitting and then merging responses. 
- var ( - broker2req map[*broker]kmsg.Request - names []string - kresp kmsg.Response - merge func(kmsg.Response) - ) +func (cl *Client) handleAdminReq(ctx context.Context, req kmsg.Request) ShardedResponse { + // Loading a controller can perform some wait; we accept that and do + // not account for the retries or the time to load the controller as + // part of the retries / time to issue the req. + r := cl.retriableBrokerFn(func() (*broker, error) { + return cl.controller(ctx) + }) + + r.parseErr = func(resp kmsg.Response) error { + var code int16 + switch t := resp.(type) { + case *kmsg.CreateTopicsResponse: + if len(t.Topics) > 0 { + code = t.Topics[0].ErrorCode + } + case *kmsg.DeleteTopicsResponse: + if len(t.Topics) > 0 { + code = t.Topics[0].ErrorCode + } + case *kmsg.CreatePartitionsResponse: + if len(t.Topics) > 0 { + code = t.Topics[0].ErrorCode + } + case *kmsg.ElectLeadersResponse: + if len(t.Topics) > 0 && len(t.Topics[0].Partitions) > 0 { + code = t.Topics[0].Partitions[0].ErrorCode + } + case *kmsg.AlterPartitionAssignmentsResponse: + code = t.ErrorCode + case *kmsg.ListPartitionReassignmentsResponse: + code = t.ErrorCode + case *kmsg.AlterUserSCRAMCredentialsResponse: + if len(t.Results) > 0 { + code = t.Results[0].ErrorCode + } + case *kmsg.VoteResponse: + code = t.ErrorCode + case *kmsg.BeginQuorumEpochResponse: + code = t.ErrorCode + case *kmsg.EndQuorumEpochResponse: + code = t.ErrorCode + case *kmsg.DescribeQuorumResponse: + code = t.ErrorCode + case *kmsg.AlterISRResponse: + code = t.ErrorCode + case *kmsg.UpdateFeaturesResponse: + code = t.ErrorCode + } + err := kerr.ErrorForCode(code) + if err == kerr.NotController { + // There must be a last broker if we were able to issue + // the request and get a response. + cl.forgetControllerID(r.last.meta.NodeID) + } + return err + } + + resp, err := r.Request(ctx, req) + return shard(r.last, req, resp, err) +} +// handleCoordinatorEq issues simple (non-shardable) group or txn requests. +func (cl *Client) handleCoordinatorReq(ctx context.Context, req kmsg.Request, typ int8) ShardedResponse { switch t := req.(type) { default: // All group requests should be listed below, so if it isn't, // then we do not know what this request is. - return nil, ErrClientTooOld + return shard(nil, req, nil, ErrClientTooOld) ///////// // TXN // -- all txn reqs are simple @@ -707,7 +869,8 @@ func (cl *Client) handleCoordinatorReq(ctx context.Context, req kmsg.Request, ty // retriable-error parsing, even though we are not actually // using a defined txn coordinator. This is fine; by passing no // names, we delete no coordinator. - return cl.handleReqWithCoordinator(ctx, cl.broker(), coordinatorTypeTxn, nil, req) + coordinator, resp, err := cl.handleReqWithCoordinator(ctx, func() (*broker, error) { return cl.broker(), nil }, coordinatorTypeTxn, "", req) + return shard(coordinator, req, resp, err) case *kmsg.AddPartitionsToTxnRequest: return cl.handleCoordinatorReqSimple(ctx, coordinatorTypeTxn, t.TransactionalID, req) case *kmsg.AddOffsetsToTxnRequest: @@ -733,204 +896,87 @@ func (cl *Client) handleCoordinatorReq(ctx context.Context, req kmsg.Request, ty return cl.handleCoordinatorReqSimple(ctx, coordinatorTypeGroup, t.Group, req) case *kmsg.SyncGroupRequest: return cl.handleCoordinatorReqSimple(ctx, coordinatorTypeGroup, t.Group, req) - - case *kmsg.DescribeGroupsRequest: - names = append(names, t.Groups...) - coordinators, err := cl.loadCoordinators(coordinatorTypeGroup, names...) 
- if err != nil { - return nil, err - } - broker2req = make(map[*broker]kmsg.Request) - - for _, group := range t.Groups { - broker := coordinators[group] - if broker2req[broker] == nil { - broker2req[broker] = &kmsg.DescribeGroupsRequest{ - IncludeAuthorizedOperations: t.IncludeAuthorizedOperations, - } - } - req := broker2req[broker].(*kmsg.DescribeGroupsRequest) - req.Groups = append(req.Groups, group) - } - - resp := new(kmsg.DescribeGroupsResponse) - kresp = resp - merge = func(newKResp kmsg.Response) { - newResp := newKResp.(*kmsg.DescribeGroupsResponse) - resp.Version = newResp.Version - resp.ThrottleMillis = newResp.ThrottleMillis - resp.Groups = append(resp.Groups, newResp.Groups...) - } - - case *kmsg.DeleteGroupsRequest: - names = append(names, t.Groups...) - coordinators, err := cl.loadCoordinators(coordinatorTypeGroup, names...) - if err != nil { - return nil, err - } - broker2req = make(map[*broker]kmsg.Request) - - for _, group := range t.Groups { - broker := coordinators[group] - if broker2req[broker] == nil { - broker2req[broker] = new(kmsg.DeleteGroupsRequest) - } - req := broker2req[broker].(*kmsg.DeleteGroupsRequest) - req.Groups = append(req.Groups, group) - } - - resp := new(kmsg.DeleteGroupsResponse) - kresp = resp - merge = func(newKResp kmsg.Response) { - newResp := newKResp.(*kmsg.DeleteGroupsResponse) - resp.Version = newResp.Version - resp.ThrottleMillis = newResp.ThrottleMillis - resp.Groups = append(resp.Groups, newResp.Groups...) - } - } - - var ( - mergeMu sync.Mutex - wg sync.WaitGroup - firstErr error - errs int - ) - for broker, req := range broker2req { - wg.Add(1) - myBroker, myReq := broker, req - go func() { - defer wg.Done() - resp, err := cl.handleReqWithCoordinator(ctx, myBroker, typ, names, myReq) - - mergeMu.Lock() - defer mergeMu.Unlock() - - if err != nil { - errs++ - if firstErr == nil { - firstErr = err - } - return - } - merge(resp) - }() - } - wg.Wait() - - if errs == len(broker2req) { - return kresp, firstErr } - return kresp, nil } // handleCoordinatorReqSimple issues a request that contains a single group or // txn to its coordinator. // // The error is inspected to see if it is a retriable error and, if so, the -// coordinator is deleted. That is, we only retry on coordinator errors, which -// would be common on all partitions. Thus, if the response contains many -// errors due to many partitions, only the first partition needs to be -// investigated. -func (cl *Client) handleCoordinatorReqSimple(ctx context.Context, typ int8, name string, req kmsg.Request) (kmsg.Response, error) { - coordinator, err := cl.loadCoordinator(ctx, coordinatorKey{ - name: name, - typ: typ, - }) - if err != nil { - return nil, err - } - return cl.handleReqWithCoordinator(ctx, coordinator, typ, []string{name}, req) +// coordinator is deleted. +func (cl *Client) handleCoordinatorReqSimple(ctx context.Context, typ int8, name string, req kmsg.Request) ShardedResponse { + coordinator, resp, err := cl.handleReqWithCoordinator(ctx, func() (*broker, error) { + return cl.loadCoordinator(false, ctx, coordinatorKey{ + name: name, + typ: typ, + }) + }, typ, name, req) + return shard(coordinator, req, resp, err) } // handleReqWithCoordinator actually issues a request to a coordinator and -// does retry error parsing. +// does retry handling. +// +// This avoids retries on the two group requests that need to be sharded. 
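Only the recognized coordinator error codes are retried here; anything else is surfaced in the response, so callers issuing coordinator requests through Request may still want to check response-level error codes themselves. A small sketch (the group name is a placeholder, and nil Topics assumes a broker version whose OffsetFetch accepts a null topic list):

    package example

    import (
        "context"
        "fmt"

        "github.com/twmb/franz-go/pkg/kerr"
        "github.com/twmb/franz-go/pkg/kgo"
        "github.com/twmb/franz-go/pkg/kmsg"
    )

    // fetchCommitted issues OffsetFetch for one group through the client's
    // coordinator routing and surfaces any response-level error code.
    func fetchCommitted(ctx context.Context, cl *kgo.Client, group string) (*kmsg.OffsetFetchResponse, error) {
        req := &kmsg.OffsetFetchRequest{Group: group} // nil Topics asks for all committed topics
        kresp, err := cl.Request(ctx, req)
        if err != nil {
            return nil, err
        }
        resp := kresp.(*kmsg.OffsetFetchResponse)
        if err := kerr.ErrorForCode(resp.ErrorCode); err != nil {
            return nil, fmt.Errorf("offset fetch for group %q: %w", group, err)
        }
        return resp, nil
    }
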
func (cl *Client) handleReqWithCoordinator( ctx context.Context, - coordinator *broker, + coordinator func() (*broker, error), typ int8, - names []string, // group IDs or the transactional id + name string, // group ID or the transactional id req kmsg.Request, -) (kmsg.Response, error) { - kresp, err := coordinator.waitResp(ctx, req) - if err != nil { - return kresp, err - } - - var errCode int16 - switch t := kresp.(type) { - - ///////// - // TXN // - ///////// - - case *kmsg.InitProducerIDResponse: - errCode = t.ErrorCode - case *kmsg.AddPartitionsToTxnResponse: - if len(t.Topics) > 0 { - if len(t.Topics[0].Partitions) > 0 { - errCode = t.Topics[0].Partitions[0].ErrorCode +) (*broker, kmsg.Response, error) { + + r := cl.retriableBrokerFn(coordinator) + r.parseErr = func(resp kmsg.Response) error { + var code int16 + switch t := resp.(type) { + + // TXN + case *kmsg.InitProducerIDResponse: + code = t.ErrorCode + case *kmsg.AddPartitionsToTxnResponse: + if len(t.Topics) > 0 && len(t.Topics[0].Partitions) > 0 { + code = t.Topics[0].Partitions[0].ErrorCode } - } - case *kmsg.AddOffsetsToTxnResponse: - errCode = t.ErrorCode - case *kmsg.EndTxnResponse: - errCode = t.ErrorCode - - /////////// - // GROUP // - /////////// - - case *kmsg.OffsetCommitResponse: - if len(t.Topics) > 0 && len(t.Topics[0].Partitions) > 0 { - errCode = t.Topics[0].Partitions[0].ErrorCode - } - case *kmsg.TxnOffsetCommitResponse: - if len(t.Topics) > 0 { - if len(t.Topics[0].Partitions) > 0 { - errCode = t.Topics[0].Partitions[0].ErrorCode + case *kmsg.AddOffsetsToTxnResponse: + code = t.ErrorCode + case *kmsg.EndTxnResponse: + code = t.ErrorCode + + // GROUP + case *kmsg.OffsetCommitResponse: + if len(t.Topics) > 0 && len(t.Topics[0].Partitions) > 0 { + code = t.Topics[0].Partitions[0].ErrorCode } - } - case *kmsg.OffsetFetchResponse: - if t.Version >= 2 { - errCode = t.ErrorCode - } else if len(t.Topics) > 0 && len(t.Topics[0].Partitions) > 0 { - errCode = t.Topics[0].Partitions[0].ErrorCode - } - case *kmsg.JoinGroupResponse: - errCode = t.ErrorCode - case *kmsg.HeartbeatResponse: - errCode = t.ErrorCode - case *kmsg.LeaveGroupResponse: - errCode = t.ErrorCode - case *kmsg.SyncGroupResponse: - errCode = t.ErrorCode - case *kmsg.DescribeGroupsResponse: - if len(t.Groups) > 0 { - errCode = t.Groups[0].ErrorCode - } - case *kmsg.DeleteGroupsResponse: - if len(t.Groups) > 0 { - errCode = t.Groups[0].ErrorCode - } - } - - switch retriableErr := kerr.ErrorForCode(errCode); retriableErr { - case kerr.CoordinatorNotAvailable, - kerr.CoordinatorLoadInProgress, - kerr.NotCoordinator: - err = retriableErr + case *kmsg.TxnOffsetCommitResponse: + if len(t.Topics) > 0 && len(t.Topics[0].Partitions) > 0 { + code = t.Topics[0].Partitions[0].ErrorCode + } + case *kmsg.OffsetFetchResponse: + if t.Version >= 2 { + code = t.ErrorCode + } else if len(t.Topics) > 0 && len(t.Topics[0].Partitions) > 0 { + code = t.Topics[0].Partitions[0].ErrorCode + } + case *kmsg.JoinGroupResponse: + code = t.ErrorCode + case *kmsg.HeartbeatResponse: + code = t.ErrorCode + case *kmsg.LeaveGroupResponse: + code = t.ErrorCode + case *kmsg.SyncGroupResponse: + code = t.ErrorCode - cl.coordinatorsMu.Lock() - for _, name := range names { - delete(cl.coordinators, coordinatorKey{ - name: name, - typ: typ, - }) } - cl.coordinatorsMu.Unlock() + // Describe and Delete handled in sharding. 
+ + err := kerr.ErrorForCode(code) + cl.maybeDeleteStaleCoordinator(name, typ, err) + return err } - return kresp, err + resp, err := r.Request(ctx, req) + return r.last, resp, err } // Broker returns a handle to a specific broker to directly issue requests to. @@ -978,654 +1024,1156 @@ func (cl *Client) SeedBrokers() []*Broker { } } -// handleListGroupsReq issues a list group request to every broker following a -// metadata update. We do no retries unless everything fails, at which point -// the calling function will retry. -func (cl *Client) handleListGroupsReq(ctx context.Context, req *kmsg.ListGroupsRequest) (kmsg.Response, error) { - if err := cl.fetchBrokerMetadata(ctx); err != nil { - return nil, err - } - - var wg sync.WaitGroup - type respErr struct { - resp kmsg.Response - err error - } - cl.brokersMu.RLock() - respErrs := make(chan respErr, len(cl.brokers)) - var numReqs int - for _, br := range cl.brokers { - if br.meta.NodeID < 0 { - continue // we skip seed brokers - } - wg.Add(1) - numReqs++ - myReq := *req - go func(br *broker) { - defer wg.Done() - resp, err := br.waitResp(ctx, &myReq) - respErrs <- respErr{resp, err} - }(br) - } - cl.brokersMu.RUnlock() - wg.Wait() - close(respErrs) +// Broker pairs a broker ID with a client to directly issue requests to a +// specific broker. +type Broker struct { + id int32 + cl *Client +} - var mergeResp kmsg.ListGroupsResponse - var firstErr error - var errs int - for re := range respErrs { - if re.err != nil { - if firstErr == nil { - firstErr = re.err - errs++ - } - continue - } - resp := re.resp.(*kmsg.ListGroupsResponse) - if mergeResp.ErrorCode == 0 { - mergeResp.ErrorCode = resp.ErrorCode - } - mergeResp.Version = resp.Version - mergeResp.ThrottleMillis = resp.ThrottleMillis - mergeResp.Groups = append(mergeResp.Groups, resp.Groups...) - } +// Request issues a request to a broker. If the broker does not exist in the +// client, this returns ErrUnknownBroker. Requests are not retried. +// +// The passed context can be used to cancel a request and return early. +// Note that if the request is not canceled before it is written to Kafka, +// you may just end up canceling and not receiving the response to what Kafka +// inevitably does. +// +// It is more beneficial to always use RetriableRequest. +func (b *Broker) Request(ctx context.Context, req kmsg.Request) (kmsg.Response, error) { + return b.request(false, ctx, req) +} - if errs == numReqs { - return nil, firstErr +// RetriableRequest issues a request to a broker the same as Broker, but +// retries in the face of retriable broker connection errors. This does not +// retry on response internal errors. 
+func (b *Broker) RetriableRequest(ctx context.Context, req kmsg.Request) (kmsg.Response, error) { + return b.request(true, ctx, req) +} + +func (b *Broker) request(retry bool, ctx context.Context, req kmsg.Request) (kmsg.Response, error) { + ctx, cancel := context.WithCancel(ctx) + defer cancel() + var resp kmsg.Response + var err error + done := make(chan struct{}) + + go func() { + defer close(done) + + if !retry { + var br *broker + br, err = b.cl.brokerOrErr(ctx, b.id, ErrUnknownBroker) + if err == nil { + resp, err = br.waitResp(ctx, req) + } + } else { + resp, err = b.cl.retriableBrokerFn(func() (*broker, error) { + return b.cl.brokerOrErr(ctx, b.id, ErrUnknownBroker) + }).Request(ctx, req) + } + }() + + select { + case <-done: + return resp, err + case <-ctx.Done(): + return nil, ctx.Err() + case <-b.cl.ctx.Done(): + return nil, b.cl.ctx.Err() } - return &mergeResp, nil } -// handleShardedReq is simple-in-theory function that is long due to types. -// This simply sends all partitions of a broker-sharded request to the -// appropriate brokers and then merges the response. -// -// Handled: -// - list offsets (key 2) -// - delete records request (key 21) -// - offset for leader epoch (key 23) -// - alter replica log dirs (key 34) -// - describe log dirs (key 35) -func (cl *Client) handleShardedReq(ctx context.Context, req kmsg.Request) (kmsg.Response, error) { - // First, pull out the topics from either request and set them as - // topics we need to load metadata for. - var needTopics []string - switch t := req.(type) { +////////////////////// +// REQUEST SHARDING // +////////////////////// + +// Below here lies all logic to handle requests that need to be split and sent +// to many brokers. A lot of the logic for each sharding function is very +// similar, but each sharding function uses slightly different types. + +// issueShard is a request that has been split and is ready to be sent to the +// given broker ID. +type issueShard struct { + req kmsg.Request + broker int32 + any bool +} + +// sharder splits a request. +type sharder interface { + // shard splits a request and returns the requests to issue tied to the + // brokers to issue the requests to. This can return an error if there + // is some pre-loading that needs to happen. If an error is returned, + // the request that was intended for splitting is failed wholesale. + // + // Due to sharded requests not being retriable if a response is + // received, to avoid stale coordinator errors, this function should + // not use any previously cached metadata. + shard(context.Context, kmsg.Request) ([]issueShard, bool, error) + + // onResp is called on a successful response to investigate the + // response and potentially perform cleanup. + // + // We cannot retry responses that have retriable errors inside of them; + // doing so would require a very manual and tedious process: + // - pair all request partitions to the response partition (maybe the + // response is missing some pieces because of a buggy kafka) + // - split non-retriable pieces of the request & response: + // - any missing response pieces have a request piece that is not + // retriable + // - any matching piece can be retriable if the response piece err + // is retriable + // - return the non-retriable request & response piece, and the retriable + // request piece and err. 
+ // + // Because the pairing is manual and tedious, and because the shard + // function above loads fresh metadata, we expect to not fall into + // stale metadata / coordinators before we issue the sharded requests. + // + // As well, for group describing or deleting, we force a load of the + // coordinators on every shard request. Thus, we do not expect the + // coordinators to be stale. + onResp(kmsg.Response) + + // merge is a function that can be used to merge sharded responses into + // one response. This is used by the client.Request method. + merge([]ShardedResponse) (kmsg.Response, error) +} + +// handleShardedReq splits and issues requests to brokers, recursively +// splitting as necessary if requests fail and need remapping. +func (cl *Client) handleShardedReq(ctx context.Context, req kmsg.Request) ([]ShardedResponse, shardMerge) { + + // First, determine our sharder. + var sharder sharder + switch req.(type) { case *kmsg.ListOffsetsRequest: - for _, topic := range t.Topics { - needTopics = append(needTopics, topic.Topic) - } + sharder = &listOffsetsSharder{cl} + case *kmsg.DescribeGroupsRequest: + sharder = &describeGroupsSharder{cl} + case *kmsg.ListGroupsRequest: + sharder = &listGroupsSharder{cl} case *kmsg.DeleteRecordsRequest: - for _, topic := range t.Topics { - needTopics = append(needTopics, topic.Topic) - } + sharder = &deleteRecordsSharder{cl} case *kmsg.OffsetForLeaderEpochRequest: - for _, topic := range t.Topics { - needTopics = append(needTopics, topic.Topic) - } + sharder = &offsetForLeaderEpochSharder{cl} + case *kmsg.DescribeConfigsRequest: + sharder = &describeConfigsSharder{cl} + case *kmsg.AlterConfigsRequest: + sharder = &alterConfigsSharder{cl} case *kmsg.AlterReplicaLogDirsRequest: - for _, dir := range t.Dirs { - for _, topic := range dir.Topics { - needTopics = append(needTopics, topic.Topic) - } - } + sharder = &alterReplicaLogDirsSharder{cl} case *kmsg.DescribeLogDirsRequest: - for _, topic := range t.Topics { - needTopics = append(needTopics, topic.Topic) - } - } - cl.topicsMu.Lock() - topics := cl.cloneTopics() - for _, topic := range needTopics { - if _, exists := topics[topic]; !exists { - topics[topic] = newTopicPartitions(topic) - } - } - cl.topics.Store(topics) - cl.topicsMu.Unlock() - - // While we have not loaded metadata for *all* of the topics, force - // load metadata. Ideally, this will only wait for one metadata. - needLoad := true - for needLoad && ctx.Err() == nil { - cl.waitmeta(ctx, 5*time.Second) - needLoad = false - topics = cl.loadTopics() - for _, topic := range needTopics { - topicPartitions := topics[topic].load() - if len(topicPartitions.all) == 0 && topicPartitions.loadErr == nil { - needLoad = true - } - } + sharder = &describeLogDirsSharder{cl} + case *kmsg.DeleteGroupsRequest: + sharder = &deleteGroupsSharder{cl} + case *kmsg.IncrementalAlterConfigsRequest: + sharder = &incrementalAlterConfigsSharder{cl} + } + + // If a request fails, we re-shard it (in case it needs to be split + // again). reqTry tracks how many total tries a request piece has had; + // we quit at either the max configured tries or max configured time. + type reqTry struct { + tries int + req kmsg.Request } - // Now, we split the incoming request by broker that handles the - // request's partitions. 
- broker2req := make(map[*broker]kmsg.Request) - var kresp kmsg.Response - var merge func(kmsg.Response) // serially called - var finalize func() + var ( + shardsMu sync.Mutex + shards []ShardedResponse - // We hold the brokers mu while determining what to split by - // so that we can look up leader partitions. - cl.brokersMu.RLock() - brokers := cl.brokers + addShard = func(shard ShardedResponse) { + shardsMu.Lock() + defer shardsMu.Unlock() + shards = append(shards, shard) + } - switch t := req.(type) { - case *kmsg.ListOffsetsRequest: - resp := new(kmsg.ListOffsetsResponse) - kresp = resp + start = time.Now() + retryTimeout = cl.cfg.retryTimeout(req.Key()) - reqParts := make(map[*broker]map[string][]kmsg.ListOffsetsRequestTopicPartition) - respParts := make(map[string][]kmsg.ListOffsetsResponseTopicPartition) + wg sync.WaitGroup + issue func(reqs []reqTry) + ) - // Over all the req topics, - for _, topic := range t.Topics { - topicPartitions := topics[topic.Topic].load() - // Over each topic's partitions, - for _, partition := range topic.Partitions { - // if we could not load the metadata for this partition, we save - // in the resp UnknownTopicOrPartition, - topicPartition, exists := topicPartitions.all[partition.Partition] - if !exists { - respParts[topic.Topic] = append(respParts[topic.Topic], kmsg.ListOffsetsResponseTopicPartition{ - Partition: partition.Partition, - ErrorCode: kerr.UnknownTopicOrPartition.Code, - }) - continue - } + // issue is called to progressively split and issue requests. + // + // This recursively calls itself if a request fails and can be retried. + issue = func(reqTries []reqTry) { + defer wg.Done() + + var rereqs []reqTry + defer func() { + if len(rereqs) > 0 { + wg.Add(1) + issue(rereqs) + } + }() - // or, if we could load the metadata, but the load err is non-nil - // or the broker is nil, we save an error, - broker := brokers[topicPartition.leader] - if topicPartition.loadErr != nil || broker == nil { - errCode := kerr.UnknownServerError.Code - if topicPartition.loadErr != nil { - if ke, ok := topicPartition.loadErr.(*kerr.Error); ok { - errCode = ke.Code + for i := range reqTries { + try := reqTries[i] + issues, reshardable, err := sharder.shard(ctx, try.req) + if err != nil { + addShard(shard(nil, try.req, nil, err)) // failure to shard means data loading failed; this request is failed + continue + } + + for i := range issues { + wg.Add(1) + tries := try.tries + issue := issues[i] + go func() { + defer wg.Done() + start: + tries++ + + broker := cl.broker() + var err error + if !issue.any { + broker, err = cl.brokerOrErr(ctx, issue.broker, ErrUnknownBroker) + } + if err != nil { + addShard(shard(nil, issue.req, nil, err)) // failure to load a broker is a failure to issue a request + return + } + + resp, err := broker.waitResp(ctx, issue.req) + if err == nil { + // Successful responses may need to perform some + // response internal error checking cleanup. + // So, we call onResp, then keep the response. + sharder.onResp(resp) + addShard(shard(broker, issue.req, resp, nil)) + return + } + + // If we failed to issue the request, we *maybe* will retry. + // We could have failed to even issue the request or receive + // a response, which is retriable. + if err != nil && (retryTimeout == 0 || time.Since(start) < retryTimeout) && cl.shouldRetry(tries, err) && cl.waitTries(ctx, tries) { + // Non-reshardable re-requests just jump back to the + // top where the broker is loaded. 
This is the case on + // requests where the original request is split to + // dedicated brokers; we do not want to re-shard that. + if !reshardable { + goto start } + rereqs = append(rereqs, reqTry{tries, issue.req}) + return } - respParts[topic.Topic] = append(respParts[topic.Topic], kmsg.ListOffsetsResponseTopicPartition{ - Partition: partition.Partition, - ErrorCode: errCode, - }) - continue - } - // otherwise, for this broker, we ask for this partition. - brokerReqParts := reqParts[broker] - if brokerReqParts == nil { - brokerReqParts = make(map[string][]kmsg.ListOffsetsRequestTopicPartition) - reqParts[broker] = brokerReqParts - } - brokerReqParts[topic.Topic] = append(brokerReqParts[topic.Topic], partition) + addShard(shard(broker, issue.req, nil, err)) // the error was not retriable + }() } } + } - // Now, over each req part (per broker request), we initialize our - // request for the broker. - for broker, brokerReqParts := range reqParts { - req := &kmsg.ListOffsetsRequest{ - ReplicaID: t.ReplicaID, - IsolationLevel: t.IsolationLevel, - } - for topic, parts := range brokerReqParts { - req.Topics = append(req.Topics, kmsg.ListOffsetsRequestTopic{ - Topic: topic, - Partitions: parts, - }) + wg.Add(1) + issue([]reqTry{{0, req}}) + wg.Wait() + + return shards, sharder.merge +} + +// a convenience function for saving the first ShardedResponse error. +func firstErrMerger(sresps []ShardedResponse, merge func(kresp kmsg.Response)) error { + var firstErr error + for _, sresp := range sresps { + if sresp.Err != nil { + if firstErr == nil { + firstErr = sresp.Err } - broker2req[broker] = req + continue + } + merge(sresp.Resp) + } + return firstErr +} + +type mappedMetadataTopic struct { + topic kmsg.MetadataResponseTopic + mapping map[int32]kmsg.MetadataResponseTopicPartition +} + +// fetchMappedMetadata provides a convenience type of working with metadata; +// this is garbage heavy, so it is only used in one off requests in this +// package. +func (cl *Client) fetchMappedMetadata(ctx context.Context, topics []string) (map[string]mappedMetadataTopic, error) { + _, meta, err := cl.fetchMetadata(ctx, false, topics) + if err != nil { + return nil, err + } + mapping := make(map[string]mappedMetadataTopic) + for _, topic := range meta.Topics { + t := mappedMetadataTopic{ + topic: topic, + mapping: make(map[int32]kmsg.MetadataResponseTopicPartition), + } + mapping[topic.Topic] = t + for _, partition := range topic.Partitions { + t.mapping[partition.Partition] = partition } + } + return mapping, nil +} + +// handles sharding ListOffsetsRequest +type listOffsetsSharder struct{ *Client } - // Merging the responses is pretty simple. We always keep the final version / throttle, - // but we expect both to be identical across all brokers. - merge = func(newKResp kmsg.Response) { - newResp := newKResp.(*kmsg.ListOffsetsResponse) - resp.Version = newResp.Version - resp.ThrottleMillis = newResp.ThrottleMillis +func (cl *listOffsetsSharder) shard(ctx context.Context, kreq kmsg.Request) ([]issueShard, bool, error) { + req := kreq.(*kmsg.ListOffsetsRequest) - for _, topic := range newResp.Topics { - respParts[topic.Topic] = append(respParts[topic.Topic], topic.Partitions...) + // For listing offsets, we need the broker leader for each partition we + // are listing. Thus, we first load metadata for the topics. + // + // Metadata loading performs retries; if we fail here, the we do not + // issue sharded requests. 
+ var need []string + for _, topic := range req.Topics { + need = append(need, topic.Topic) + } + mapping, err := cl.fetchMappedMetadata(ctx, need) + if err != nil { + return nil, false, err + } + + brokerReqs := make(map[int32]map[string][]kmsg.ListOffsetsRequestTopicPartition) + unknowns := make(map[string][]kmsg.ListOffsetsRequestTopicPartition) + + // For any topic or partition that had an error load, we blindly issue + // a load to the first seed broker. We expect the list to fail, but it + // is the best we could do. + for _, topic := range req.Topics { + tmapping, exists := mapping[topic.Topic] + if err := kerr.ErrorForCode(tmapping.topic.ErrorCode); err != nil || !exists { + for _, partition := range topic.Partitions { + unknowns[topic.Topic] = append(unknowns[topic.Topic], partition) } + continue } + for _, partition := range topic.Partitions { + p, exists := tmapping.mapping[partition.Partition] + if !exists || kerr.ErrorForCode(p.ErrorCode) != nil { + unknowns[topic.Topic] = append(unknowns[topic.Topic], partition) + continue + } - // To finalize the response, we take our per-topic respParts and merge - // them into our final response. - finalize = func() { - for topic, parts := range respParts { - resp.Topics = append(resp.Topics, kmsg.ListOffsetsResponseTopic{ - Topic: topic, - Partitions: parts, - }) + brokerReq := brokerReqs[p.Leader] + if brokerReq == nil { + brokerReq = make(map[string][]kmsg.ListOffsetsRequestTopicPartition) + brokerReqs[p.Leader] = brokerReq } + brokerReq[topic.Topic] = append(brokerReq[topic.Topic], partition) } + } - case *kmsg.DeleteRecordsRequest: // similar to above, except types - resp := new(kmsg.DeleteRecordsResponse) - kresp = resp + if len(unknowns) > 0 { + brokerReqs[unknownSeedID(0)] = unknowns + } - reqParts := make(map[*broker]map[string][]kmsg.DeleteRecordsRequestTopicPartition) - respParts := make(map[string][]kmsg.DeleteRecordsResponseTopicPartition) + var issues []issueShard + for brokerID, brokerReq := range brokerReqs { + req := &kmsg.ListOffsetsRequest{ + ReplicaID: req.ReplicaID, + IsolationLevel: req.IsolationLevel, + } + for topic, parts := range brokerReq { + req.Topics = append(req.Topics, kmsg.ListOffsetsRequestTopic{ + Topic: topic, + Partitions: parts, + }) + } - for _, topic := range t.Topics { - topicPartitions := topics[topic.Topic].load() - for _, partition := range topic.Partitions { - topicPartition, exists := topicPartitions.all[partition.Partition] - if !exists { - respParts[topic.Topic] = append(respParts[topic.Topic], kmsg.DeleteRecordsResponseTopicPartition{ - Partition: partition.Partition, - ErrorCode: kerr.UnknownTopicOrPartition.Code, - }) - continue - } + issues = append(issues, issueShard{ + req: req, + broker: brokerID, + }) + } - broker := brokers[topicPartition.leader] - if topicPartition.loadErr != nil || broker == nil { - errCode := kerr.UnknownServerError.Code - if topicPartition.loadErr != nil { - if ke, ok := topicPartition.loadErr.(*kerr.Error); ok { - errCode = ke.Code - } - } - respParts[topic.Topic] = append(respParts[topic.Topic], kmsg.DeleteRecordsResponseTopicPartition{ - Partition: partition.Partition, - ErrorCode: errCode, - }) - continue - } + return issues, true, nil // this is reshardable +} - brokerReqParts := reqParts[broker] - if brokerReqParts == nil { - brokerReqParts = make(map[string][]kmsg.DeleteRecordsRequestTopicPartition) - reqParts[broker] = brokerReqParts - } - brokerReqParts[topic.Topic] = append(brokerReqParts[topic.Topic], partition) - } +func (cl *listOffsetsSharder) 
onResp(kreq kmsg.Response) {} // metadata could be stale, but no cleanup we can do + +func (cl *listOffsetsSharder) merge(sresps []ShardedResponse) (kmsg.Response, error) { + merged := new(kmsg.ListOffsetsResponse) + topics := make(map[string][]kmsg.ListOffsetsResponseTopicPartition) + + firstErr := firstErrMerger(sresps, func(kresp kmsg.Response) { + resp := kresp.(*kmsg.ListOffsetsResponse) + merged.Version = resp.Version + merged.ThrottleMillis = resp.ThrottleMillis + + for _, topic := range resp.Topics { + topics[topic.Topic] = append(topics[topic.Topic], topic.Partitions...) } + }) + for topic, partitions := range topics { + merged.Topics = append(merged.Topics, kmsg.ListOffsetsResponseTopic{ + Topic: topic, + Partitions: partitions, + }) + } + return merged, firstErr +} - for broker, brokerReqParts := range reqParts { - req := &kmsg.DeleteRecordsRequest{ - TimeoutMillis: t.TimeoutMillis, - } - for topic, parts := range brokerReqParts { - req.Topics = append(req.Topics, kmsg.DeleteRecordsRequestTopic{ - Topic: topic, - Partitions: parts, - }) +// handles sharding DescribeGroupsRequest +type describeGroupsSharder struct{ *Client } + +func (cl *describeGroupsSharder) shard(ctx context.Context, kreq kmsg.Request) ([]issueShard, bool, error) { + req := kreq.(*kmsg.DescribeGroupsRequest) + + coordinators, err := cl.loadCoordinators(true, coordinatorTypeGroup, req.Groups...) + if err != nil { + return nil, false, err + } + + brokerReqs := make(map[int32]*kmsg.DescribeGroupsRequest) + + for _, group := range req.Groups { + broker := coordinators[group] + brokerReq := brokerReqs[broker.meta.NodeID] + if brokerReq == nil { + brokerReq = &kmsg.DescribeGroupsRequest{ + IncludeAuthorizedOperations: req.IncludeAuthorizedOperations, } - broker2req[broker] = req + brokerReqs[broker.meta.NodeID] = brokerReq } - merge = func(newKResp kmsg.Response) { - newResp := newKResp.(*kmsg.DeleteRecordsResponse) - resp.Version = newResp.Version - resp.ThrottleMillis = newResp.ThrottleMillis + brokerReq.Groups = append(brokerReq.Groups, group) + } - for _, topic := range newResp.Topics { - respParts[topic.Topic] = append(respParts[topic.Topic], topic.Partitions...) - } + var issues []issueShard + for id, req := range brokerReqs { + issues = append(issues, issueShard{ + req: req, + broker: id, + }) + } + return issues, true, nil // this is reshardable +} + +func (cl *describeGroupsSharder) onResp(kresp kmsg.Response) { // cleanup any stale groups + resp := kresp.(*kmsg.DescribeGroupsResponse) + for i := range resp.Groups { + group := &resp.Groups[i] + err := kerr.ErrorForCode(group.ErrorCode) + cl.maybeDeleteStaleCoordinator(group.Group, coordinatorTypeGroup, err) + } +} + +func (cl *describeGroupsSharder) merge(sresps []ShardedResponse) (kmsg.Response, error) { + merged := new(kmsg.DescribeGroupsResponse) + + return merged, firstErrMerger(sresps, func(kresp kmsg.Response) { + resp := kresp.(*kmsg.DescribeGroupsResponse) + merged.Version = resp.Version + merged.ThrottleMillis = resp.ThrottleMillis + merged.Groups = append(merged.Groups, resp.Groups...) 
+ }) +} + +// handles sharding ListGroupsRequest +type listGroupsSharder struct{ *Client } + +func (cl *listGroupsSharder) shard(ctx context.Context, kreq kmsg.Request) ([]issueShard, bool, error) { + if err := cl.fetchBrokerMetadata(ctx); err != nil { + return nil, false, err + } + + req := kreq.(*kmsg.ListGroupsRequest) + + var issues []issueShard + cl.brokersMu.RLock() + for _, broker := range cl.brokers { + if broker.meta.NodeID < 0 { + continue // we skip seed brokers } + myReq := *req + issues = append(issues, issueShard{ + req: &myReq, + broker: broker.meta.NodeID, + }) + } + cl.brokersMu.RUnlock() - finalize = func() { - for topic, parts := range respParts { - resp.Topics = append(resp.Topics, kmsg.DeleteRecordsResponseTopic{ - Topic: topic, - Partitions: parts, - }) - } + return issues, false, nil // we do NOT re-shard this request +} + +func (cl *listGroupsSharder) onResp(kresp kmsg.Response) {} // nothing to be done here + +func (cl *listGroupsSharder) merge(sresps []ShardedResponse) (kmsg.Response, error) { + merged := new(kmsg.ListGroupsResponse) + + return merged, firstErrMerger(sresps, func(kresp kmsg.Response) { + resp := kresp.(*kmsg.ListGroupsResponse) + merged.Version = resp.Version + merged.ThrottleMillis = resp.ThrottleMillis + if merged.ErrorCode == 0 { + merged.ErrorCode = resp.ErrorCode } + merged.Groups = append(merged.Groups, resp.Groups...) + }) +} - case *kmsg.OffsetForLeaderEpochRequest: // similar to above, except types - resp := new(kmsg.OffsetForLeaderEpochResponse) - kresp = resp +// handle sharding DeleteRecordsRequest +type deleteRecordsSharder struct{ *Client } - reqParts := make(map[*broker]map[string][]kmsg.OffsetForLeaderEpochRequestTopicPartition) - respParts := make(map[string][]kmsg.OffsetForLeaderEpochResponseTopicPartition) +func (cl *deleteRecordsSharder) shard(ctx context.Context, kreq kmsg.Request) ([]issueShard, bool, error) { + req := kreq.(*kmsg.DeleteRecordsRequest) - for _, topic := range t.Topics { - topicPartitions := topics[topic.Topic].load() - for _, partition := range topic.Partitions { - topicPartition, exists := topicPartitions.all[partition.Partition] - if !exists { - respParts[topic.Topic] = append(respParts[topic.Topic], kmsg.OffsetForLeaderEpochResponseTopicPartition{ - Partition: partition.Partition, - ErrorCode: kerr.UnknownTopicOrPartition.Code, - }) - continue - } + var need []string + for _, topic := range req.Topics { + need = append(need, topic.Topic) + } + mapping, err := cl.fetchMappedMetadata(ctx, need) + if err != nil { + return nil, false, err + } - broker := brokers[topicPartition.leader] - if topicPartition.loadErr != nil || broker == nil { - errCode := kerr.UnknownServerError.Code - if topicPartition.loadErr != nil { - if ke, ok := topicPartition.loadErr.(*kerr.Error); ok { - errCode = ke.Code - } - } - respParts[topic.Topic] = append(respParts[topic.Topic], kmsg.OffsetForLeaderEpochResponseTopicPartition{ - Partition: partition.Partition, - ErrorCode: errCode, - }) - continue - } + brokerReqs := make(map[int32]map[string][]kmsg.DeleteRecordsRequestTopicPartition) + unknowns := make(map[string][]kmsg.DeleteRecordsRequestTopicPartition) - brokerReqParts := reqParts[broker] - if brokerReqParts == nil { - brokerReqParts = make(map[string][]kmsg.OffsetForLeaderEpochRequestTopicPartition) - reqParts[broker] = brokerReqParts - } - brokerReqParts[topic.Topic] = append(brokerReqParts[topic.Topic], partition) + for _, topic := range req.Topics { + tmapping, exists := mapping[topic.Topic] + if err := 
kerr.ErrorForCode(tmapping.topic.ErrorCode); err != nil || !exists { + for _, partition := range topic.Partitions { + unknowns[topic.Topic] = append(unknowns[topic.Topic], partition) } + continue } - - for broker, brokerReqParts := range reqParts { - req := &kmsg.OffsetForLeaderEpochRequest{ - ReplicaID: t.ReplicaID, + for _, partition := range topic.Partitions { + p, exists := tmapping.mapping[partition.Partition] + if !exists || kerr.ErrorForCode(p.ErrorCode) != nil { + unknowns[topic.Topic] = append(unknowns[topic.Topic], partition) + continue } - for topic, parts := range brokerReqParts { - req.Topics = append(req.Topics, kmsg.OffsetForLeaderEpochRequestTopic{ - Topic: topic, - Partitions: parts, - }) + + brokerReq := brokerReqs[p.Leader] + if brokerReq == nil { + brokerReq = make(map[string][]kmsg.DeleteRecordsRequestTopicPartition) + brokerReqs[p.Leader] = brokerReq } - broker2req[broker] = req + brokerReq[topic.Topic] = append(brokerReq[topic.Topic], partition) } - merge = func(newKResp kmsg.Response) { - newResp := newKResp.(*kmsg.OffsetForLeaderEpochResponse) - resp.Version = newResp.Version - resp.ThrottleMillis = newResp.ThrottleMillis + } - for _, topic := range newResp.Topics { - respParts[topic.Topic] = append(respParts[topic.Topic], topic.Partitions...) - } + if len(unknowns) > 0 { + brokerReqs[unknownSeedID(0)] = unknowns + } + + var issues []issueShard + for brokerID, brokerReq := range brokerReqs { + req := &kmsg.DeleteRecordsRequest{ + TimeoutMillis: req.TimeoutMillis, + } + for topic, parts := range brokerReq { + req.Topics = append(req.Topics, kmsg.DeleteRecordsRequestTopic{ + Topic: topic, + Partitions: parts, + }) } - finalize = func() { - for topic, parts := range respParts { - resp.Topics = append(resp.Topics, kmsg.OffsetForLeaderEpochResponseTopic{ - Topic: topic, - Partitions: parts, - }) - } + issues = append(issues, issueShard{ + req: req, + broker: brokerID, + }) + } + + return issues, true, nil // this is reshardable +} + +func (cl *deleteRecordsSharder) onResp(kmsg.Response) {} // nothing to be done here + +func (cl *deleteRecordsSharder) merge(sresps []ShardedResponse) (kmsg.Response, error) { + merged := new(kmsg.DeleteRecordsResponse) + topics := make(map[string][]kmsg.DeleteRecordsResponseTopicPartition) + + firstErr := firstErrMerger(sresps, func(kresp kmsg.Response) { + resp := kresp.(*kmsg.DeleteRecordsResponse) + merged.Version = resp.Version + merged.ThrottleMillis = resp.ThrottleMillis + + for _, topic := range resp.Topics { + topics[topic.Topic] = append(topics[topic.Topic], topic.Partitions...) 
} + }) + for topic, partitions := range topics { + merged.Topics = append(merged.Topics, kmsg.DeleteRecordsResponseTopic{ + Topic: topic, + Partitions: partitions, + }) + } + return merged, firstErr +} - case *kmsg.AlterReplicaLogDirsRequest: // similar to above, except types - resp := new(kmsg.AlterReplicaLogDirsResponse) - kresp = resp +// handle sharding OffsetForLeaderEpochRequest +type offsetForLeaderEpochSharder struct{ *Client } - reqParts := make(map[*broker]map[string]map[string][]int32) // broker => dir => topic => partitions - respParts := make(map[string][]kmsg.AlterReplicaLogDirsResponseTopicPartition) +func (cl *offsetForLeaderEpochSharder) shard(ctx context.Context, kreq kmsg.Request) ([]issueShard, bool, error) { + req := kreq.(*kmsg.OffsetForLeaderEpochRequest) - for _, dir := range t.Dirs { - for _, topic := range dir.Topics { - topicPartitions := topics[topic.Topic].load() - for _, partition := range topic.Partitions { - topicPartition, exists := topicPartitions.all[partition] - if !exists { - respParts[topic.Topic] = append(respParts[topic.Topic], kmsg.AlterReplicaLogDirsResponseTopicPartition{ - Partition: partition, - ErrorCode: kerr.UnknownTopicOrPartition.Code, - }) - continue - } + var need []string + for _, topic := range req.Topics { + need = append(need, topic.Topic) + } + mapping, err := cl.fetchMappedMetadata(ctx, need) + if err != nil { + return nil, false, err + } - broker := brokers[topicPartition.leader] - if topicPartition.loadErr != nil || broker == nil { - errCode := kerr.UnknownServerError.Code - if topicPartition.loadErr != nil { - if ke, ok := topicPartition.loadErr.(*kerr.Error); ok { - errCode = ke.Code - } - } - respParts[topic.Topic] = append(respParts[topic.Topic], kmsg.AlterReplicaLogDirsResponseTopicPartition{ - Partition: partition, - ErrorCode: errCode, - }) - continue - } + brokerReqs := make(map[int32]map[string][]kmsg.OffsetForLeaderEpochRequestTopicPartition) + unknowns := make(map[string][]kmsg.OffsetForLeaderEpochRequestTopicPartition) - brokerReqParts := reqParts[broker] - if brokerReqParts == nil { - brokerReqParts = make(map[string]map[string][]int32) - reqParts[broker] = brokerReqParts - } - brokerDirReqParts := brokerReqParts[dir.Dir] - if brokerDirReqParts == nil { - brokerDirReqParts = make(map[string][]int32) - brokerReqParts[dir.Dir] = brokerDirReqParts - } - brokerDirReqParts[topic.Topic] = append(brokerDirReqParts[topic.Topic], partition) - } + for _, topic := range req.Topics { + tmapping, exists := mapping[topic.Topic] + if err := kerr.ErrorForCode(tmapping.topic.ErrorCode); err != nil || !exists { + for _, partition := range topic.Partitions { + unknowns[topic.Topic] = append(unknowns[topic.Topic], partition) } + continue } + for _, partition := range topic.Partitions { + p, exists := tmapping.mapping[partition.Partition] + if !exists || kerr.ErrorForCode(p.ErrorCode) != nil { + unknowns[topic.Topic] = append(unknowns[topic.Topic], partition) + continue + } - for broker, brokerReqParts := range reqParts { - req := new(kmsg.AlterReplicaLogDirsRequest) - for dir, topics := range brokerReqParts { - dirReq := kmsg.AlterReplicaLogDirsRequestDir{ - Dir: dir, - } - for topic, parts := range topics { - dirReq.Topics = append(dirReq.Topics, kmsg.AlterReplicaLogDirsRequestDirTopic{ - Topic: topic, - Partitions: parts, - }) - } - req.Dirs = append(req.Dirs, dirReq) + brokerReq := brokerReqs[p.Leader] + if brokerReq == nil { + brokerReq = make(map[string][]kmsg.OffsetForLeaderEpochRequestTopicPartition) + brokerReqs[p.Leader] = 
brokerReq } - broker2req[broker] = req + brokerReq[topic.Topic] = append(brokerReq[topic.Topic], partition) } - merge = func(newKResp kmsg.Response) { - newResp := newKResp.(*kmsg.AlterReplicaLogDirsResponse) - resp.Version = newResp.Version - resp.ThrottleMillis = newResp.ThrottleMillis + } - for _, topic := range newResp.Topics { - respParts[topic.Topic] = append(respParts[topic.Topic], topic.Partitions...) - } + if len(unknowns) > 0 { + brokerReqs[unknownSeedID(0)] = unknowns + } + + var issues []issueShard + for brokerID, brokerReq := range brokerReqs { + req := &kmsg.OffsetForLeaderEpochRequest{ + ReplicaID: req.ReplicaID, + } + for topic, parts := range brokerReq { + req.Topics = append(req.Topics, kmsg.OffsetForLeaderEpochRequestTopic{ + Topic: topic, + Partitions: parts, + }) } - finalize = func() { - for topic, parts := range respParts { - resp.Topics = append(resp.Topics, kmsg.AlterReplicaLogDirsResponseTopic{ - Topic: topic, - Partitions: parts, - }) - } + issues = append(issues, issueShard{ + req: req, + broker: brokerID, + }) + } + + return issues, true, nil // this is reshardable +} + +func (cl *offsetForLeaderEpochSharder) onResp(kmsg.Response) {} + +func (cl *offsetForLeaderEpochSharder) merge(sresps []ShardedResponse) (kmsg.Response, error) { + merged := new(kmsg.OffsetForLeaderEpochResponse) + topics := make(map[string][]kmsg.OffsetForLeaderEpochResponseTopicPartition) + + firstErr := firstErrMerger(sresps, func(kresp kmsg.Response) { + resp := kresp.(*kmsg.OffsetForLeaderEpochResponse) + merged.Version = resp.Version + merged.ThrottleMillis = resp.ThrottleMillis + + for _, topic := range resp.Topics { + topics[topic.Topic] = append(topics[topic.Topic], topic.Partitions...) } + }) + for topic, partitions := range topics { + merged.Topics = append(merged.Topics, kmsg.OffsetForLeaderEpochResponseTopic{ + Topic: topic, + Partitions: partitions, + }) + } + return merged, firstErr +} - // This case is similar to above, but because the resp is per _dir_, we - // do not make up a fake directory when we cannot load the topic. - // Types are a bit weirder here too. 
- case *kmsg.DescribeLogDirsRequest: - resp := new(kmsg.DescribeLogDirsResponse) - kresp = resp +// handle sharding DescribeConfigsRequest +type describeConfigsSharder struct{ *Client } + +func (cl *describeConfigsSharder) shard(ctx context.Context, kreq kmsg.Request) ([]issueShard, bool, error) { + req := kreq.(*kmsg.DescribeConfigsRequest) - reqParts := make(map[*broker]map[string][]int32) // broker => topic => partitions + brokerReqs := make(map[int32][]kmsg.DescribeConfigsRequestResource) + var any []kmsg.DescribeConfigsRequestResource - type respDir struct { - errCode int16 - topics map[string][]kmsg.DescribeLogDirsResponseDirTopicPartition // topic => partitions + for i := range req.Resources { + resource := req.Resources[i] + switch resource.ResourceType { + case kmsg.ConfigResourceTypeBroker: + case kmsg.ConfigResourceTypeBrokerLogger: + default: + any = append(any, resource) + continue + } + id, err := strconv.ParseInt(resource.ResourceName, 10, 32) + if err != nil || id < 0 { + any = append(any, resource) + continue } - respParts := make(map[string]respDir) + brokerReqs[int32(id)] = append(brokerReqs[int32(id)], resource) + } - for _, topic := range t.Topics { - topicPartitions := topics[topic.Topic].load() - for _, partition := range topic.Partitions { - topicPartition, exists := topicPartitions.all[partition] - if !exists { - continue - } + var issues []issueShard + for brokerID, brokerReq := range brokerReqs { + req := &kmsg.DescribeConfigsRequest{ + Resources: brokerReq, + IncludeSynonyms: req.IncludeSynonyms, + IncludeDocumentation: req.IncludeDocumentation, + } + + issues = append(issues, issueShard{ + req: req, + broker: brokerID, + }) + } + + if len(any) > 0 { + issues = append(issues, issueShard{ + req: &kmsg.DescribeConfigsRequest{ + Resources: any, + IncludeSynonyms: req.IncludeSynonyms, + IncludeDocumentation: req.IncludeDocumentation, + }, + any: true, + }) + } + + return issues, false, nil // this is not reshardable, but the any block can go anywhere +} + +func (cl *describeConfigsSharder) onResp(kmsg.Response) {} + +func (cl *describeConfigsSharder) merge(sresps []ShardedResponse) (kmsg.Response, error) { + merged := new(kmsg.DescribeConfigsResponse) + + return merged, firstErrMerger(sresps, func(kresp kmsg.Response) { + resp := kresp.(*kmsg.DescribeConfigsResponse) + merged.Version = resp.Version + merged.ThrottleMillis = resp.ThrottleMillis + merged.Resources = append(merged.Resources, resp.Resources...) 
+ }) +} + +// handle sharding AlterConfigsRequest +type alterConfigsSharder struct{ *Client } + +func (cl *alterConfigsSharder) shard(ctx context.Context, kreq kmsg.Request) ([]issueShard, bool, error) { + req := kreq.(*kmsg.AlterConfigsRequest) + + brokerReqs := make(map[int32][]kmsg.AlterConfigsRequestResource) + var any []kmsg.AlterConfigsRequestResource + + for i := range req.Resources { + resource := req.Resources[i] + switch resource.ResourceType { + case kmsg.ConfigResourceTypeBroker: + case kmsg.ConfigResourceTypeBrokerLogger: + default: + any = append(any, resource) + continue + } + id, err := strconv.ParseInt(resource.ResourceName, 10, 32) + if err != nil || id < 0 { + any = append(any, resource) + continue + } + brokerReqs[int32(id)] = append(brokerReqs[int32(id)], resource) + } - broker := brokers[topicPartition.leader] - if topicPartition.loadErr != nil || broker == nil { + var issues []issueShard + for brokerID, brokerReq := range brokerReqs { + req := &kmsg.AlterConfigsRequest{ + Resources: brokerReq, + ValidateOnly: req.ValidateOnly, + } + + issues = append(issues, issueShard{ + req: req, + broker: brokerID, + }) + } + + if len(any) > 0 { + issues = append(issues, issueShard{ + req: &kmsg.AlterConfigsRequest{ + Resources: any, + ValidateOnly: req.ValidateOnly, + }, + any: true, + }) + } + + return issues, false, nil // this is not reshardable, but the any block can go anywhere +} + +func (cl *alterConfigsSharder) onResp(kmsg.Response) {} + +func (cl *alterConfigsSharder) merge(sresps []ShardedResponse) (kmsg.Response, error) { + merged := new(kmsg.AlterConfigsResponse) + + return merged, firstErrMerger(sresps, func(kresp kmsg.Response) { + resp := kresp.(*kmsg.AlterConfigsResponse) + merged.Version = resp.Version + merged.ThrottleMillis = resp.ThrottleMillis + merged.Resources = append(merged.Resources, resp.Resources...) 
+ }) +} + +// handles sharding AlterReplicaLogDirsRequest +type alterReplicaLogDirsSharder struct{ *Client } + +func (cl *alterReplicaLogDirsSharder) shard(ctx context.Context, kreq kmsg.Request) ([]issueShard, bool, error) { + req := kreq.(*kmsg.AlterReplicaLogDirsRequest) + + needMap := make(map[string]struct{}) + for _, dir := range req.Dirs { + for _, topic := range dir.Topics { + needMap[topic.Topic] = struct{}{} + } + } + var need []string + for topic := range needMap { + need = append(need, topic) + } + mapping, err := cl.fetchMappedMetadata(ctx, need) + if err != nil { + return nil, false, err + } + + brokerReqs := make(map[int32]map[string]map[string][]int32) // broker => dir => topic => partitions + unknowns := make(map[string]map[string][]int32) // dir => topic => partitions + + addBroker := func(broker int32, dir, topic string, partition int32) { + brokerDirs := brokerReqs[broker] + if brokerDirs == nil { + brokerDirs = make(map[string]map[string][]int32) + brokerReqs[broker] = brokerDirs + } + dirTopics := brokerDirs[dir] + if dirTopics == nil { + dirTopics = make(map[string][]int32) + brokerDirs[dir] = dirTopics + } + dirTopics[topic] = append(dirTopics[topic], partition) + } + + addUnknown := func(dir, topic string, partition int32) { + dirTopics := unknowns[dir] + if dirTopics == nil { + dirTopics = make(map[string][]int32) + unknowns[dir] = dirTopics + } + dirTopics[topic] = append(dirTopics[topic], partition) + } + + for _, dir := range req.Dirs { + for _, topic := range dir.Topics { + tmapping, exists := mapping[topic.Topic] + if err := kerr.ErrorForCode(tmapping.topic.ErrorCode); err != nil || !exists { + for _, partition := range topic.Partitions { + addUnknown(dir.Dir, topic.Topic, partition) + } + continue + } + for _, partition := range topic.Partitions { + p, exists := tmapping.mapping[partition] + if !exists || kerr.ErrorForCode(p.ErrorCode) != nil { + addUnknown(dir.Dir, topic.Topic, partition) continue } - brokerReqParts := reqParts[broker] - if brokerReqParts == nil { - brokerReqParts = make(map[string][]int32) - reqParts[broker] = brokerReqParts + for _, replica := range p.Replicas { + addBroker(replica, dir.Dir, topic.Topic, partition) } - brokerReqParts[topic.Topic] = append(brokerReqParts[topic.Topic], partition) } } + } + + if len(unknowns) > 0 { + brokerReqs[unknownSeedID(0)] = unknowns + } - for broker, brokerReqParts := range reqParts { - req := new(kmsg.DescribeLogDirsRequest) - for topic, parts := range brokerReqParts { - req.Topics = append(req.Topics, kmsg.DescribeLogDirsRequestTopic{ + var issues []issueShard + for brokerID, brokerReq := range brokerReqs { + req := new(kmsg.AlterReplicaLogDirsRequest) + for dir, topics := range brokerReq { + rd := kmsg.AlterReplicaLogDirsRequestDir{ + Dir: dir, + } + for topic, partitions := range topics { + rd.Topics = append(rd.Topics, kmsg.AlterReplicaLogDirsRequestDirTopic{ Topic: topic, - Partitions: parts, + Partitions: partitions, }) } - broker2req[broker] = req + req.Dirs = append(req.Dirs, rd) } - // If the request has nil topics, that means describe all. - // We need to fan that out. 
- if t.Topics == nil {
- for _, broker := range brokers {
- if broker.meta.NodeID < 0 { // do not use seed brokers
- continue
- }
- broker2req[broker] = new(kmsg.DescribeLogDirsRequest)
- }
+ issues = append(issues, issueShard{
+ req: req,
+ broker: brokerID,
+ })
+ }
+
+ return issues, true, nil // this is reshardable
+}
+
+func (cl *alterReplicaLogDirsSharder) onResp(kmsg.Response) {}
+
+// merge does not make sense for this function, but we provide one anyway.
+func (cl *alterReplicaLogDirsSharder) merge(sresps []ShardedResponse) (kmsg.Response, error) {
+ merged := new(kmsg.AlterReplicaLogDirsResponse)
+ topics := make(map[string][]kmsg.AlterReplicaLogDirsResponseTopicPartition)
+
+ firstErr := firstErrMerger(sresps, func(kresp kmsg.Response) {
+ resp := kresp.(*kmsg.AlterReplicaLogDirsResponse)
+ merged.Version = resp.Version
+ merged.ThrottleMillis = resp.ThrottleMillis
+
+ for _, topic := range resp.Topics {
+ topics[topic.Topic] = append(topics[topic.Topic], topic.Partitions...)
 }
+ })
+ for topic, partitions := range topics {
+ merged.Topics = append(merged.Topics, kmsg.AlterReplicaLogDirsResponseTopic{
+ Topic: topic,
+ Partitions: partitions,
+ })
+ }
+ return merged, firstErr
+}

- merge = func(newKResp kmsg.Response) {
- newResp := newKResp.(*kmsg.DescribeLogDirsResponse)
- resp.Version = newResp.Version
- resp.ThrottleMillis = newResp.ThrottleMillis
+// handles sharding DescribeLogDirsRequest
+type describeLogDirsSharder struct{ *Client }

- for _, dir := range newResp.Dirs {
- existing := respParts[dir.Dir]
- if existing.topics == nil {
- existing.topics = make(map[string][]kmsg.DescribeLogDirsResponseDirTopicPartition)
- }
- if existing.errCode == 0 {
- existing.errCode = dir.ErrorCode
- }
- respParts[dir.Dir] = existing
- for _, topic := range dir.Topics {
- existing.topics[topic.Topic] = append(existing.topics[topic.Topic], topic.Partitions...)
- } +func (cl *describeLogDirsSharder) shard(ctx context.Context, kreq kmsg.Request) ([]issueShard, bool, error) { + req := kreq.(*kmsg.DescribeLogDirsRequest) + + var need []string + for _, topic := range req.Topics { + need = append(need, topic.Topic) + } + mapping, err := cl.fetchMappedMetadata(ctx, need) + if err != nil { + return nil, false, err + } + + brokerReqs := make(map[int32]map[string][]int32) + unknowns := make(map[string][]int32) + + for _, topic := range req.Topics { + tmapping, exists := mapping[topic.Topic] + if err := kerr.ErrorForCode(tmapping.topic.ErrorCode); err != nil || !exists { + for _, partition := range topic.Partitions { + unknowns[topic.Topic] = append(unknowns[topic.Topic], partition) } + continue } + for _, partition := range topic.Partitions { + p, exists := tmapping.mapping[partition] + if !exists || kerr.ErrorForCode(p.ErrorCode) != nil { + unknowns[topic.Topic] = append(unknowns[topic.Topic], partition) + continue + } - finalize = func() { - for dir, inner := range respParts { - dirResp := kmsg.DescribeLogDirsResponseDir{ - ErrorCode: inner.errCode, - Dir: dir, - } - for topic, parts := range inner.topics { - dirResp.Topics = append(dirResp.Topics, kmsg.DescribeLogDirsResponseDirTopic{ - Topic: topic, - Partitions: parts, - }) + for _, replica := range p.Replicas { + brokerReq := brokerReqs[replica] + if brokerReq == nil { + brokerReq = make(map[string][]int32) + brokerReqs[replica] = brokerReq } - resp.Dirs = append(resp.Dirs, dirResp) + brokerReq[topic.Topic] = append(brokerReq[topic.Topic], partition) } } } - cl.brokersMu.RUnlock() + if len(unknowns) > 0 { + brokerReqs[unknownSeedID(0)] = unknowns + } - // Now with everything setup, we concurrently request all brokers, - // merge the responses, and finalize! - var ( - mergeMu sync.Mutex - wg sync.WaitGroup - firstErr error - errs int - ) - for broker, req := range broker2req { - wg.Add(1) - myBroker, myReq := broker, req - go func() { - defer wg.Done() + var issues []issueShard + for brokerID, brokerReq := range brokerReqs { + req := new(kmsg.DescribeLogDirsRequest) + for topic, parts := range brokerReq { + req.Topics = append(req.Topics, kmsg.DescribeLogDirsRequestTopic{ + Topic: topic, + Partitions: parts, + }) + } - resp, err := myBroker.waitResp(ctx, myReq) + issues = append(issues, issueShard{ + req: req, + broker: brokerID, + }) + } - mergeMu.Lock() - defer mergeMu.Unlock() + return issues, true, nil // this is reshardable +} - if err != nil { - errs++ - if firstErr == nil { - firstErr = err - } - return +func (cl *describeLogDirsSharder) onResp(kmsg.Response) {} + +// merge does not make sense for this function, but we provide one anyway. +// We lose the error code for directories. +func (cl *describeLogDirsSharder) merge(sresps []ShardedResponse) (kmsg.Response, error) { + merged := new(kmsg.DescribeLogDirsResponse) + dirs := make(map[string]map[string][]kmsg.DescribeLogDirsResponseDirTopicPartition) + + firstErr := firstErrMerger(sresps, func(kresp kmsg.Response) { + resp := kresp.(*kmsg.DescribeLogDirsResponse) + merged.Version = resp.Version + merged.ThrottleMillis = resp.ThrottleMillis + + for _, dir := range resp.Dirs { + mergeDir := dirs[dir.Dir] + if mergeDir == nil { + mergeDir = make(map[string][]kmsg.DescribeLogDirsResponseDirTopicPartition) + dirs[dir.Dir] = mergeDir } - merge(resp) - }() + for _, topic := range dir.Topics { + mergeDir[topic.Topic] = append(mergeDir[topic.Topic], topic.Partitions...) 
+ } + } + }) + for dir, topics := range dirs { + md := kmsg.DescribeLogDirsResponseDir{ + Dir: dir, + } + for topic, partitions := range topics { + md.Topics = append(md.Topics, kmsg.DescribeLogDirsResponseDirTopic{ + Topic: topic, + Partitions: partitions, + }) + } + merged.Dirs = append(merged.Dirs, md) } - wg.Wait() + return merged, firstErr +} - if errs == len(broker2req) { - return kresp, firstErr +// handles sharding DeleteGroupsRequest +type deleteGroupsSharder struct{ *Client } + +func (cl *deleteGroupsSharder) shard(ctx context.Context, kreq kmsg.Request) ([]issueShard, bool, error) { + req := kreq.(*kmsg.DeleteGroupsRequest) + + coordinators, err := cl.loadCoordinators(true, coordinatorTypeGroup, req.Groups...) + if err != nil { + return nil, false, err } - finalize() + brokerReqs := make(map[int32]*kmsg.DeleteGroupsRequest) - return kresp, nil -} + for _, group := range req.Groups { + broker := coordinators[group] + brokerReq := brokerReqs[broker.meta.NodeID] + if brokerReq == nil { + brokerReq = new(kmsg.DeleteGroupsRequest) + brokerReqs[broker.meta.NodeID] = brokerReq + } + brokerReq.Groups = append(brokerReq.Groups, group) + } -// Broker pairs a broker ID with a client to directly issue requests to a -// specific broker. -type Broker struct { - id int32 - cl *Client + var issues []issueShard + for id, req := range brokerReqs { + issues = append(issues, issueShard{ + req: req, + broker: id, + }) + } + return issues, true, nil // this is reshardable } -// Request issues a request to a broker. If the broker does not exist in the -// client, this returns ErrUnknownBroker. Requests are not retried. -// -// The passed context can be used to cancel a request and return early. -// Note that if the request is not canceled before it is written to Kafka, -// you may just end up canceling and not receiving the response to what Kafka -// inevitably does. -func (b *Broker) Request(ctx context.Context, req kmsg.Request) (kmsg.Response, error) { - ctx, cancel := context.WithCancel(ctx) - defer cancel() - var resp kmsg.Response - var err error - done := make(chan struct{}) - go func() { - defer close(done) - resp, err = b.request(ctx, req) - }() - select { - case <-done: - return resp, err - case <-ctx.Done(): - return nil, ctx.Err() - case <-b.cl.ctx.Done(): - return nil, b.cl.ctx.Err() +func (cl *deleteGroupsSharder) onResp(kresp kmsg.Response) { + resp := kresp.(*kmsg.DeleteGroupsResponse) + for i := range resp.Groups { + group := &resp.Groups[i] + err := kerr.ErrorForCode(group.ErrorCode) + cl.maybeDeleteStaleCoordinator(group.Group, coordinatorTypeGroup, err) } } -// request is the logic for Request. -func (b *Broker) request(ctx context.Context, req kmsg.Request) (kmsg.Response, error) { - b.cl.brokersMu.RLock() - br, exists := b.cl.brokers[b.id] - b.cl.brokersMu.RUnlock() +func (cl *deleteGroupsSharder) merge(sresps []ShardedResponse) (kmsg.Response, error) { + merged := new(kmsg.DeleteGroupsResponse) - if !exists { - // If the broker does not exist, we try once to update brokers. - if err := b.cl.fetchBrokerMetadata(ctx); err == nil { - b.cl.brokersMu.RLock() - br, exists = b.cl.brokers[b.id] - b.cl.brokersMu.RUnlock() - if !exists { - return nil, ErrUnknownBroker - } - } else { - return nil, ErrUnknownBroker + return merged, firstErrMerger(sresps, func(kresp kmsg.Response) { + resp := kresp.(*kmsg.DeleteGroupsResponse) + merged.Version = resp.Version + merged.ThrottleMillis = resp.ThrottleMillis + merged.Groups = append(merged.Groups, resp.Groups...) 
+ }) +} + +// handle sharding IncrementalAlterConfigsRequest +type incrementalAlterConfigsSharder struct{ *Client } + +func (cl *incrementalAlterConfigsSharder) shard(ctx context.Context, kreq kmsg.Request) ([]issueShard, bool, error) { + req := kreq.(*kmsg.IncrementalAlterConfigsRequest) + + brokerReqs := make(map[int32][]kmsg.IncrementalAlterConfigsRequestResource) + var any []kmsg.IncrementalAlterConfigsRequestResource + + for i := range req.Resources { + resource := req.Resources[i] + switch resource.ResourceType { + case kmsg.ConfigResourceTypeBroker: + case kmsg.ConfigResourceTypeBrokerLogger: + default: + any = append(any, resource) + continue } + id, err := strconv.ParseInt(resource.ResourceName, 10, 32) + if err != nil || id < 0 { + any = append(any, resource) + continue + } + brokerReqs[int32(id)] = append(brokerReqs[int32(id)], resource) } - return br.waitResp(ctx, req) + var issues []issueShard + for brokerID, brokerReq := range brokerReqs { + req := &kmsg.IncrementalAlterConfigsRequest{ + Resources: brokerReq, + ValidateOnly: req.ValidateOnly, + } + + issues = append(issues, issueShard{ + req: req, + broker: brokerID, + }) + } + + if len(any) > 0 { + issues = append(issues, issueShard{ + req: &kmsg.IncrementalAlterConfigsRequest{ + Resources: any, + ValidateOnly: req.ValidateOnly, + }, + any: true, + }) + } + + return issues, false, nil // this is not reshardable, but the any block can go anywhere +} + +func (cl *incrementalAlterConfigsSharder) onResp(kmsg.Response) {} + +func (cl *incrementalAlterConfigsSharder) merge(sresps []ShardedResponse) (kmsg.Response, error) { + merged := new(kmsg.IncrementalAlterConfigsResponse) + + return merged, firstErrMerger(sresps, func(kresp kmsg.Response) { + resp := kresp.(*kmsg.IncrementalAlterConfigsResponse) + merged.Version = resp.Version + merged.ThrottleMillis = resp.ThrottleMillis + merged.Resources = append(merged.Resources, resp.Resources...) + }) } diff --git a/pkg/kgo/metadata.go b/pkg/kgo/metadata.go index 03b6ae3e..8cbb01ff 100644 --- a/pkg/kgo/metadata.go +++ b/pkg/kgo/metadata.go @@ -223,7 +223,7 @@ func (cl *Client) fetchTopicMetadata(reqTopics []string) (map[string]*topicParti all := cl.consumer.typ == consumerTypeDirect && cl.consumer.direct.regexTopics || cl.consumer.typ == consumerTypeGroup && cl.consumer.group.regexTopics cl.consumer.mu.Unlock() - meta, err := cl.fetchMetadata(cl.ctx, all, reqTopics) + _, meta, err := cl.fetchMetadata(cl.ctx, all, reqTopics) if err != nil { return nil, all, err }
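
Note on the sharding pattern above: every new sharder implements the same three-step contract. shard() splits one request into per-broker sub-requests, onResp() optionally inspects each individual response (for example, to delete a stale cached coordinator), and merge() folds all shard responses back into one response while surfacing a first per-shard error (as the firstErrMerger name suggests) instead of discarding partial results. The following self-contained sketch only mirrors that contract with toy types; the real issueShard, ShardedResponse, and firstErrMerger live in pkg/kgo and are not fully shown in this diff, so the names and fields here (toyShard, toyShardedResponse, shardByCoordinator, firstErrMerge, coordinatorFor) are illustrative assumptions, not the library's API.

package main

import (
	"errors"
	"fmt"
)

// toyShard pairs a per-broker sub-request with the broker it should go to.
type toyShard struct {
	broker int32
	groups []string
}

// toyShardedResponse pairs one shard's result with the error it got, if any.
type toyShardedResponse struct {
	groups []string // present on success
	err    error    // non-nil if this shard failed
}

// shardByCoordinator mimics how describeGroupsSharder splits groups by their
// coordinator: one sub-request per coordinator broker.
func shardByCoordinator(groups []string, coordinatorFor func(string) int32) []toyShard {
	byBroker := make(map[int32][]string)
	for _, group := range groups {
		b := coordinatorFor(group)
		byBroker[b] = append(byBroker[b], group)
	}
	var shards []toyShard
	for broker, gs := range byBroker {
		shards = append(shards, toyShard{broker: broker, groups: gs})
	}
	return shards
}

// firstErrMerge mirrors how the merge methods appear to use firstErrMerger:
// every successful response is folded into the merged result via fn, and the
// first per-shard error (if any) is returned alongside the merge.
func firstErrMerge(sresps []toyShardedResponse, fn func(toyShardedResponse)) error {
	var firstErr error
	for _, sresp := range sresps {
		if sresp.err != nil {
			if firstErr == nil {
				firstErr = sresp.err
			}
			continue
		}
		fn(sresp)
	}
	return firstErr
}

func main() {
	// Two groups coordinated by broker 1, one by broker 2.
	coordinatorFor := func(g string) int32 {
		if g == "payments" {
			return 2
		}
		return 1
	}
	shards := shardByCoordinator([]string{"orders", "payments", "audit"}, coordinatorFor)
	fmt.Println("issued shards:", shards)

	// Pretend broker 2 failed; the merge still keeps broker 1's results.
	sresps := []toyShardedResponse{
		{groups: []string{"orders", "audit"}},
		{err: errors.New("broker 2: connection refused")},
	}
	var merged []string
	err := firstErrMerge(sresps, func(s toyShardedResponse) {
		merged = append(merged, s.groups...)
	})
	fmt.Println("merged:", merged, "first error:", err)
}

In the sketch, one failed shard still leaves the other shards' results in the merged output, which is why the merge methods above return a merged response together with a first-error value rather than an error alone.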