From 2eb27c653a4cdb815bf366894a2b87a3555ee50b Mon Sep 17 00:00:00 2001
From: Travis Bischel
Date: Sun, 6 Sep 2020 23:46:04 -0600
Subject: [PATCH] client.Request: shard DeleteRecords, {AlterReplica,Describe}LogDirs

Turns out, these requests need to be sharded too.

This properly fixes issuing these requests after the minor break
introduced in 4ea4c4297e.
---
 pkg/kgo/client.go | 330 +++++++++++++++++++++++++++++++++++++++++++---
 1 file changed, 313 insertions(+), 17 deletions(-)

diff --git a/pkg/kgo/client.go b/pkg/kgo/client.go
index 03782416..90024b7c 100644
--- a/pkg/kgo/client.go
+++ b/pkg/kgo/client.go
@@ -429,9 +429,11 @@ func (cl *Client) Close() {
 // coordinator. However, if the request is an init producer ID request and the
 // request has no transactional ID, the request goes to any broker.
 //
-// If the request is a ListOffsets request or OffsetForLeaderEpoch request,
-// this will properly split the request to send partitions to the appropriate
-// broker.
+// If the request is a ListOffsets, DeleteRecords, OffsetForLeaderEpoch,
+// AlterReplicaLogDirs, or DescribeLogDirs request, this will properly split
+// the request to send partitions to the appropriate broker. If you want to
+// ensure the request is not split and instead sent directly to a single
+// broker, use the Broker function.
 //
 // If the request is a ListGroups request, this will send ListGroups to every
 // known broker after a broker metadata lookup. The first error code of any
@@ -493,14 +495,19 @@ start:
 		resp, err = cl.handleCoordinatorReq(ctx, groupReq, coordinatorTypeGroup)
 	} else if txnReq, isTxnReq := req.(kmsg.TxnCoordinatorRequest); isTxnReq {
 		resp, err = cl.handleCoordinatorReq(ctx, txnReq, coordinatorTypeTxn)
-	} else if listReq, ok := req.(*kmsg.ListOffsetsRequest); ok {
-		resp, err = cl.handleListOrEpochReq(ctx, listReq)
-	} else if offsetEpochReq, ok := req.(*kmsg.OffsetForLeaderEpochRequest); ok {
-		resp, err = cl.handleListOrEpochReq(ctx, offsetEpochReq)
-	} else if listGroupsReq, ok := req.(*kmsg.ListGroupsRequest); ok {
-		resp, err = cl.handleListGroupsReq(ctx, listGroupsReq)
-	} else {
-		resp, err = cl.broker().waitResp(ctx, req)
+	} else {
+		switch t := req.(type) {
+		case *kmsg.ListOffsetsRequest,
+			*kmsg.DeleteRecordsRequest,
+			*kmsg.OffsetForLeaderEpochRequest,
+			*kmsg.AlterReplicaLogDirsRequest,
+			*kmsg.DescribeLogDirsRequest:
+			resp, err = cl.handleShardedReq(ctx, req)
+		case *kmsg.ListGroupsRequest:
+			resp, err = cl.handleListGroupsReq(ctx, t)
+		default:
+			resp, err = cl.broker().waitResp(ctx, req)
+		}
 	}
 
 	if err != nil {
@@ -1022,11 +1029,17 @@ func (cl *Client) handleListGroupsReq(ctx context.Context, req *kmsg.ListGroupsR
 	return &mergeResp, nil
 }
 
-// handleListOrEpochReq is simple-in-theory function that is long due to types.
-// This simply sends all partitions of a list offset request or offset for
-// leader epoch request to the appropriate brokers and then merges the
-// response.
-func (cl *Client) handleListOrEpochReq(ctx context.Context, req kmsg.Request) (kmsg.Response, error) {
+// handleShardedReq is a simple-in-theory function that is long due to types.
+// This simply sends all partitions of a broker-sharded request to the
+// appropriate brokers and then merges the response.
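+// Partitions are grouped by their current leader, each leader gets its own
+// sub-request, the sub-requests are issued concurrently, and the partial
+// responses are merged back into one response.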
+// +// Handled: +// - list offsets (key 2) +// - delete records request (key 21) +// - offset for leader epoch (key 23) +// - alter replica log dirs (key 34) +// - describe log dirs (key 35) +func (cl *Client) handleShardedReq(ctx context.Context, req kmsg.Request) (kmsg.Response, error) { // First, pull out the topics from either request and set them as // topics we need to load metadata for. var needTopics []string @@ -1035,10 +1048,24 @@ func (cl *Client) handleListOrEpochReq(ctx context.Context, req kmsg.Request) (k for _, topic := range t.Topics { needTopics = append(needTopics, topic.Topic) } + case *kmsg.DeleteRecordsRequest: + for _, topic := range t.Topics { + needTopics = append(needTopics, topic.Topic) + } case *kmsg.OffsetForLeaderEpochRequest: for _, topic := range t.Topics { needTopics = append(needTopics, topic.Topic) } + case *kmsg.AlterReplicaLogDirsRequest: + for _, dir := range t.Dirs { + for _, topic := range dir.Topics { + needTopics = append(needTopics, topic.Topic) + } + } + case *kmsg.DescribeLogDirsRequest: + for _, topic := range t.Topics { + needTopics = append(needTopics, topic.Topic) + } } cl.topicsMu.Lock() topics := cl.cloneTopics() @@ -1085,9 +1112,13 @@ func (cl *Client) handleListOrEpochReq(ctx context.Context, req kmsg.Request) (k reqParts := make(map[*broker]map[string][]kmsg.ListOffsetsRequestTopicPartition) respParts := make(map[string][]kmsg.ListOffsetsResponseTopicPartition) + // Over all the req topics, for _, topic := range t.Topics { topicPartitions := topics[topic.Topic].load() + // Over each topic's partitions, for _, partition := range topic.Partitions { + // if we could not load the metadata for this partition, we save + // in the resp UnknownTopicOrPartition, topicPartition, exists := topicPartitions.all[partition.Partition] if !exists { respParts[topic.Topic] = append(respParts[topic.Topic], kmsg.ListOffsetsResponseTopicPartition{ @@ -1097,6 +1128,8 @@ func (cl *Client) handleListOrEpochReq(ctx context.Context, req kmsg.Request) (k continue } + // or, if we could load the metadata, but the load err is non-nil + // or the broker is nil, we save an error, broker := brokers[topicPartition.leader] if topicPartition.loadErr != nil || broker == nil { errCode := kerr.UnknownServerError.Code @@ -1112,6 +1145,7 @@ func (cl *Client) handleListOrEpochReq(ctx context.Context, req kmsg.Request) (k continue } + // otherwise, for this broker, we ask for this partition. brokerReqParts := reqParts[broker] if brokerReqParts == nil { brokerReqParts = make(map[string][]kmsg.ListOffsetsRequestTopicPartition) @@ -1121,6 +1155,8 @@ func (cl *Client) handleListOrEpochReq(ctx context.Context, req kmsg.Request) (k } } + // Now, over each req part (per broker request), we initialize our + // request for the broker. for broker, brokerReqParts := range reqParts { req := &kmsg.ListOffsetsRequest{ ReplicaID: t.ReplicaID, @@ -1134,6 +1170,9 @@ func (cl *Client) handleListOrEpochReq(ctx context.Context, req kmsg.Request) (k } broker2req[broker] = req } + + // Merging the responses is pretty simple. We always keep the final version / throttle, + // but we expect both to be identical across all brokers. merge = func(newKResp kmsg.Response) { newResp := newKResp.(*kmsg.ListOffsetsResponse) resp.Version = newResp.Version @@ -1144,6 +1183,8 @@ func (cl *Client) handleListOrEpochReq(ctx context.Context, req kmsg.Request) (k } } + // To finalize the response, we take our per-topic respParts and merge + // them into our final response. 
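+		// Note that respParts also holds the partitions we flagged with an
+		// error while splitting the request; those are returned alongside
+		// whatever the brokers answered.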
finalize = func() { for topic, parts := range respParts { resp.Topics = append(resp.Topics, kmsg.ListOffsetsResponseTopic{ @@ -1153,8 +1194,81 @@ func (cl *Client) handleListOrEpochReq(ctx context.Context, req kmsg.Request) (k } } - // Outside of type swapping, this case is the same as the last - case *kmsg.OffsetForLeaderEpochRequest: + case *kmsg.DeleteRecordsRequest: // similar to above, except types + resp := new(kmsg.DeleteRecordsResponse) + kresp = resp + + reqParts := make(map[*broker]map[string][]kmsg.DeleteRecordsRequestTopicPartition) + respParts := make(map[string][]kmsg.DeleteRecordsResponseTopicPartition) + + for _, topic := range t.Topics { + topicPartitions := topics[topic.Topic].load() + for _, partition := range topic.Partitions { + topicPartition, exists := topicPartitions.all[partition.Partition] + if !exists { + respParts[topic.Topic] = append(respParts[topic.Topic], kmsg.DeleteRecordsResponseTopicPartition{ + Partition: partition.Partition, + ErrorCode: kerr.UnknownTopicOrPartition.Code, + }) + continue + } + + broker := brokers[topicPartition.leader] + if topicPartition.loadErr != nil || broker == nil { + errCode := kerr.UnknownServerError.Code + if topicPartition.loadErr != nil { + if ke, ok := topicPartition.loadErr.(*kerr.Error); ok { + errCode = ke.Code + } + } + respParts[topic.Topic] = append(respParts[topic.Topic], kmsg.DeleteRecordsResponseTopicPartition{ + Partition: partition.Partition, + ErrorCode: errCode, + }) + continue + } + + brokerReqParts := reqParts[broker] + if brokerReqParts == nil { + brokerReqParts = make(map[string][]kmsg.DeleteRecordsRequestTopicPartition) + reqParts[broker] = brokerReqParts + } + brokerReqParts[topic.Topic] = append(brokerReqParts[topic.Topic], partition) + } + } + + for broker, brokerReqParts := range reqParts { + req := &kmsg.DeleteRecordsRequest{ + TimeoutMillis: t.TimeoutMillis, + } + for topic, parts := range brokerReqParts { + req.Topics = append(req.Topics, kmsg.DeleteRecordsRequestTopic{ + Topic: topic, + Partitions: parts, + }) + } + broker2req[broker] = req + } + merge = func(newKResp kmsg.Response) { + newResp := newKResp.(*kmsg.DeleteRecordsResponse) + resp.Version = newResp.Version + resp.ThrottleMillis = newResp.ThrottleMillis + + for _, topic := range newResp.Topics { + respParts[topic.Topic] = append(respParts[topic.Topic], topic.Partitions...) 
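+				// (the same topic can show up in several broker responses,
+				// since its partitions may be spread across brokers, hence
+				// the append)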
+ } + } + + finalize = func() { + for topic, parts := range respParts { + resp.Topics = append(resp.Topics, kmsg.DeleteRecordsResponseTopic{ + Topic: topic, + Partitions: parts, + }) + } + } + + case *kmsg.OffsetForLeaderEpochRequest: // similar to above, except types resp := new(kmsg.OffsetForLeaderEpochResponse) kresp = resp @@ -1227,10 +1341,192 @@ func (cl *Client) handleListOrEpochReq(ctx context.Context, req kmsg.Request) (k }) } } + + case *kmsg.AlterReplicaLogDirsRequest: // similar to above, except types + resp := new(kmsg.AlterReplicaLogDirsResponse) + kresp = resp + + reqParts := make(map[*broker]map[string]map[string][]int32) // broker => dir => topic => partitions + respParts := make(map[string][]kmsg.AlterReplicaLogDirsResponseTopicPartition) + + for _, dir := range t.Dirs { + for _, topic := range dir.Topics { + topicPartitions := topics[topic.Topic].load() + for _, partition := range topic.Partitions { + topicPartition, exists := topicPartitions.all[partition] + if !exists { + respParts[topic.Topic] = append(respParts[topic.Topic], kmsg.AlterReplicaLogDirsResponseTopicPartition{ + Partition: partition, + ErrorCode: kerr.UnknownTopicOrPartition.Code, + }) + continue + } + + broker := brokers[topicPartition.leader] + if topicPartition.loadErr != nil || broker == nil { + errCode := kerr.UnknownServerError.Code + if topicPartition.loadErr != nil { + if ke, ok := topicPartition.loadErr.(*kerr.Error); ok { + errCode = ke.Code + } + } + respParts[topic.Topic] = append(respParts[topic.Topic], kmsg.AlterReplicaLogDirsResponseTopicPartition{ + Partition: partition, + ErrorCode: errCode, + }) + continue + } + + brokerReqParts := reqParts[broker] + if brokerReqParts == nil { + brokerReqParts = make(map[string]map[string][]int32) + reqParts[broker] = brokerReqParts + } + brokerDirReqParts := brokerReqParts[dir.Dir] + if brokerDirReqParts == nil { + brokerDirReqParts = make(map[string][]int32) + brokerReqParts[dir.Dir] = brokerDirReqParts + } + brokerDirReqParts[topic.Topic] = append(brokerDirReqParts[topic.Topic], partition) + } + } + } + + for broker, brokerReqParts := range reqParts { + req := new(kmsg.AlterReplicaLogDirsRequest) + for dir, topics := range brokerReqParts { + dirReq := kmsg.AlterReplicaLogDirsRequestDir{ + Dir: dir, + } + for topic, parts := range topics { + dirReq.Topics = append(dirReq.Topics, kmsg.AlterReplicaLogDirsRequestDirTopic{ + Topic: topic, + Partitions: parts, + }) + } + req.Dirs = append(req.Dirs, dirReq) + } + broker2req[broker] = req + } + merge = func(newKResp kmsg.Response) { + newResp := newKResp.(*kmsg.AlterReplicaLogDirsResponse) + resp.Version = newResp.Version + resp.ThrottleMillis = newResp.ThrottleMillis + + for _, topic := range newResp.Topics { + respParts[topic.Topic] = append(respParts[topic.Topic], topic.Partitions...) + } + } + + finalize = func() { + for topic, parts := range respParts { + resp.Topics = append(resp.Topics, kmsg.AlterReplicaLogDirsResponseTopic{ + Topic: topic, + Partitions: parts, + }) + } + } + + // This case is similar to above, but because the resp is per _dir_, we + // do not make up a fake directory when we cannot load the topic. + // Types are a bit weirder here too. 
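+	// Partitions whose metadata we cannot load, or whose leader we do not
+	// know, are simply dropped from the sharded request.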
+ case *kmsg.DescribeLogDirsRequest: + resp := new(kmsg.DescribeLogDirsResponse) + kresp = resp + + reqParts := make(map[*broker]map[string][]int32) // broker => topic => partitions + + type respDir struct { + errCode int16 + topics map[string][]kmsg.DescribeLogDirsResponseDirTopicPartition // topic => partitions + } + respParts := make(map[string]respDir) + + for _, topic := range t.Topics { + topicPartitions := topics[topic.Topic].load() + for _, partition := range topic.Partitions { + topicPartition, exists := topicPartitions.all[partition] + if !exists { + continue + } + + broker := brokers[topicPartition.leader] + if topicPartition.loadErr != nil || broker == nil { + continue + } + + brokerReqParts := reqParts[broker] + if brokerReqParts == nil { + brokerReqParts = make(map[string][]int32) + reqParts[broker] = brokerReqParts + } + brokerReqParts[topic.Topic] = append(brokerReqParts[topic.Topic], partition) + } + } + + for broker, brokerReqParts := range reqParts { + req := new(kmsg.DescribeLogDirsRequest) + for topic, parts := range brokerReqParts { + req.Topics = append(req.Topics, kmsg.DescribeLogDirsRequestTopic{ + Topic: topic, + Partitions: parts, + }) + } + broker2req[broker] = req + } + + // If the request has nil topics, that means describe all. + // We need to fan that out. + if t.Topics == nil { + for _, broker := range brokers { + if broker.id < 0 { // do not use seed brokers + continue + } + broker2req[broker] = new(kmsg.DescribeLogDirsRequest) + } + } + + merge = func(newKResp kmsg.Response) { + newResp := newKResp.(*kmsg.DescribeLogDirsResponse) + resp.Version = newResp.Version + resp.ThrottleMillis = newResp.ThrottleMillis + + for _, dir := range newResp.Dirs { + existing := respParts[dir.Dir] + if existing.topics == nil { + existing.topics = make(map[string][]kmsg.DescribeLogDirsResponseDirTopicPartition) + } + if existing.errCode == 0 { + existing.errCode = dir.ErrorCode + } + respParts[dir.Dir] = existing + for _, topic := range dir.Topics { + existing.topics[topic.Topic] = append(existing.topics[topic.Topic], topic.Partitions...) + } + } + } + + finalize = func() { + for dir, inner := range respParts { + dirResp := kmsg.DescribeLogDirsResponseDir{ + ErrorCode: inner.errCode, + Dir: dir, + } + for topic, parts := range inner.topics { + dirResp.Topics = append(dirResp.Topics, kmsg.DescribeLogDirsResponseDirTopic{ + Topic: topic, + Partitions: parts, + }) + } + resp.Dirs = append(resp.Dirs, dirResp) + } + } } cl.brokersMu.RUnlock() + // Now with everything setup, we concurrently request all brokers, + // merge the responses, and finalize! var ( mergeMu sync.Mutex wg sync.WaitGroup