From 83f0dbed0d5a2ebcc014ae6b605a4b8b3a3958ac Mon Sep 17 00:00:00 2001
From: Travis Bischel
Date: Sat, 26 Nov 2022 21:33:38 -0700
Subject: [PATCH] kgo: add support for sharding WriteTxnMarkers

For aborting transactions, per KIP-664
---
 pkg/kgo/client.go | 190 ++++++++++++++++++++++++++++++++++++++++++++++
 1 file changed, 190 insertions(+)

diff --git a/pkg/kgo/client.go b/pkg/kgo/client.go
index 25858a89..2d70a157 100644
--- a/pkg/kgo/client.go
+++ b/pkg/kgo/client.go
@@ -1673,6 +1673,8 @@ func (cl *Client) handleShardedReq(ctx context.Context, req kmsg.Request) ([]Res
 		sharder = &deleteRecordsSharder{cl}
 	case *kmsg.OffsetForLeaderEpochRequest:
 		sharder = &offsetForLeaderEpochSharder{cl}
+	case *kmsg.WriteTxnMarkersRequest:
+		sharder = &writeTxnMarkersSharder{cl}
 	case *kmsg.DescribeConfigsRequest:
 		sharder = &describeConfigsSharder{cl}
 	case *kmsg.AlterConfigsRequest:
@@ -2868,6 +2870,194 @@ func (*offsetForLeaderEpochSharder) merge(sresps []ResponseShard) (kmsg.Response
 	return merged, firstErr
 }
 
+// handle sharding WriteTxnMarkersRequest
+type writeTxnMarkersSharder struct{ *Client }
+
+func (*writeTxnMarkersSharder) unpackPinReq() bool { return false }
+
+func (cl *writeTxnMarkersSharder) shard(ctx context.Context, kreq kmsg.Request, _ error) ([]issueShard, bool, error) {
+	req := kreq.(*kmsg.WriteTxnMarkersRequest)
+
+	var need []string
+	for _, marker := range req.Markers {
+		for _, topic := range marker.Topics {
+			need = append(need, topic.Topic)
+		}
+	}
+	mapping, err := cl.fetchMappedMetadata(ctx, need, true)
+	if err != nil {
+		return nil, false, err
+	}
+
+	type pidEpochCommit struct {
+		pid    int64
+		epoch  int16
+		commit bool
+	}
+
+	brokerReqs := make(map[int32]map[pidEpochCommit]map[string][]int32)
+	unknown := make(map[error]map[pidEpochCommit]map[string][]int32) // err => pec => topic => partitions
+
+	addreq := func(b int32, pec pidEpochCommit, t string, p int32) {
+		pecs := brokerReqs[b]
+		if pecs == nil {
+			pecs = make(map[pidEpochCommit]map[string][]int32)
+			brokerReqs[b] = pecs
+		}
+		ts := pecs[pec]
+		if ts == nil {
+			ts = make(map[string][]int32)
+			pecs[pec] = ts
+		}
+		ts[t] = append(ts[t], p)
+	}
+	addunk := func(err error, pec pidEpochCommit, t string, p int32) {
+		pecs := unknown[err]
+		if pecs == nil {
+			pecs = make(map[pidEpochCommit]map[string][]int32)
+			unknown[err] = pecs
+		}
+		ts := pecs[pec]
+		if ts == nil {
+			ts = make(map[string][]int32)
+			pecs[pec] = ts
+		}
+		ts[t] = append(ts[t], p)
+	}
+
+	for _, marker := range req.Markers {
+		pec := pidEpochCommit{
+			marker.ProducerID,
+			marker.ProducerEpoch,
+			marker.Committed,
+		}
+		for _, topic := range marker.Topics {
+			t := topic.Topic
+			tmapping, exists := mapping[t]
+			if err := unknownOrCode(exists, tmapping.t.ErrorCode); err != nil {
+				for _, partition := range topic.Partitions {
+					addunk(err, pec, t, partition)
+				}
+				continue
+			}
+			for _, partition := range topic.Partitions {
+				p, exists := tmapping.ps[partition]
+				if err := unknownOrCode(exists, p.ErrorCode); err != nil {
+					addunk(err, pec, t, partition)
+					continue
+				}
+				if err := noLeader(p.Leader); err != nil {
+					addunk(err, pec, t, partition)
+					continue
+				}
+				addreq(p.Leader, pec, t, partition)
+			}
+		}
+	}
+
+	mkreq := kmsg.NewPtrWriteTxnMarkersRequest
+
+	var issues []issueShard
+	for brokerID, brokerReq := range brokerReqs {
+		req := mkreq()
+		for pec, topics := range brokerReq {
+			rm := kmsg.NewWriteTxnMarkersRequestMarker()
+			rm.ProducerID = pec.pid
+			rm.ProducerEpoch = pec.epoch
+			rm.Committed = pec.commit
+			for topic, parts := range topics {
+				rt := kmsg.NewWriteTxnMarkersRequestMarkerTopic()
+				rt.Topic = topic
+				rt.Partitions = parts
+				rm.Topics = append(rm.Topics, rt)
+			}
+			req.Markers = append(req.Markers, rm)
+		}
+		issues = append(issues, issueShard{
+			req:    req,
+			broker: brokerID,
+		})
+	}
+
+	for err, errReq := range unknown {
+		req := mkreq()
+		for pec, topics := range errReq {
+			rm := kmsg.NewWriteTxnMarkersRequestMarker()
+			rm.ProducerID = pec.pid
+			rm.ProducerEpoch = pec.epoch
+			rm.Committed = pec.commit
+			for topic, parts := range topics {
+				rt := kmsg.NewWriteTxnMarkersRequestMarkerTopic()
+				rt.Topic = topic
+				rt.Partitions = parts
+				rm.Topics = append(rm.Topics, rt)
+			}
+			req.Markers = append(req.Markers, rm)
+		}
+		issues = append(issues, issueShard{
+			req: req,
+			err: err,
+		})
+	}
+	return issues, true, nil // this is reshardable
+}
+
+func (cl *writeTxnMarkersSharder) onResp(_ kmsg.Request, kresp kmsg.Response) error {
+	resp := kresp.(*kmsg.WriteTxnMarkersResponse)
+	var del []string
+	var retErr error
+	for i := range resp.Markers {
+		m := &resp.Markers[i]
+		for j := range m.Topics {
+			t := &m.Topics[j]
+			for k := range t.Partitions {
+				p := &t.Partitions[k]
+				err := kerr.ErrorForCode(p.ErrorCode)
+				if err == kerr.UnknownTopicOrPartition || err == kerr.NotLeaderForPartition {
+					del = append(del, t.Topic)
+				}
+				onRespShardErr(&retErr, err)
+			}
+		}
+	}
+	if cl.maybeDeleteMappedMetadata(del...) {
+		return retErr
+	}
+	return nil
+}
+
+func (*writeTxnMarkersSharder) merge(sresps []ResponseShard) (kmsg.Response, error) {
+	merged := kmsg.NewPtrWriteTxnMarkersResponse()
+	markers := make(map[int64]map[string][]kmsg.WriteTxnMarkersResponseMarkerTopicPartition)
+
+	firstErr := firstErrMerger(sresps, func(kresp kmsg.Response) {
+		resp := kresp.(*kmsg.WriteTxnMarkersResponse)
+		merged.Version = resp.Version
+		for _, marker := range resp.Markers {
+			topics := markers[marker.ProducerID]
+			if topics == nil {
+				topics = make(map[string][]kmsg.WriteTxnMarkersResponseMarkerTopicPartition)
+				markers[marker.ProducerID] = topics
+			}
+			for _, topic := range marker.Topics {
+				topics[topic.Topic] = append(topics[topic.Topic], topic.Partitions...)
+			}
+		}
+	})
+	for pid, topics := range markers {
+		respMarker := kmsg.NewWriteTxnMarkersResponseMarker()
+		respMarker.ProducerID = pid
+		for topic, partitions := range topics {
+			respTopic := kmsg.NewWriteTxnMarkersResponseMarkerTopic()
+			respTopic.Topic = topic
+			respTopic.Partitions = append(respTopic.Partitions, partitions...)
+			respMarker.Topics = append(respMarker.Topics, respTopic)
+		}
+		merged.Markers = append(merged.Markers, respMarker)
+	}
+	return merged, firstErr
+}
+
 // handle sharding DescribeConfigsRequest
 type describeConfigsSharder struct{ *Client }
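
Usage sketch (illustrative only, not part of the diff): with the sharder in place, a WriteTxnMarkersRequest issued through the client is split per partition leader. The seed broker, producer ID, epoch, topic, and partition below are hypothetical placeholders; per KIP-664 these values would normally come from the hanging-transaction tooling (DescribeProducers / ListTransactions) before writing an abort marker.

package main

import (
	"context"
	"fmt"

	"github.com/twmb/franz-go/pkg/kgo"
	"github.com/twmb/franz-go/pkg/kmsg"
)

func main() {
	cl, err := kgo.NewClient(kgo.SeedBrokers("localhost:9092")) // placeholder seed broker
	if err != nil {
		panic(err)
	}
	defer cl.Close()

	// Build one abort marker for a hypothetical hanging transaction.
	marker := kmsg.NewWriteTxnMarkersRequestMarker()
	marker.ProducerID = 42    // hypothetical producer ID
	marker.ProducerEpoch = 0  // hypothetical producer epoch
	marker.Committed = false  // false writes an abort marker
	topic := kmsg.NewWriteTxnMarkersRequestMarkerTopic()
	topic.Topic = "orders" // hypothetical topic
	topic.Partitions = []int32{0}
	marker.Topics = append(marker.Topics, topic)

	req := kmsg.NewPtrWriteTxnMarkersRequest()
	req.Markers = append(req.Markers, marker)

	// RequestSharded returns one response per broker the request was split to.
	for _, shard := range cl.RequestSharded(context.Background(), req) {
		if shard.Err != nil {
			fmt.Println("shard error:", shard.Err)
			continue
		}
		resp := shard.Resp.(*kmsg.WriteTxnMarkersResponse)
		fmt.Printf("broker %d: %+v\n", shard.Meta.NodeID, resp.Markers)
	}
}

RequestSharded is used here so the per-broker split is visible; a plain Request would run the same sharder and return the single response produced by the merge function above.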