Skip to content

Commit

Permalink
kgo: add support for sharding WriteTxnMarkers
Browse files Browse the repository at this point in the history
For aborting transactions, per KIP-664
  • Loading branch information
twmb committed Nov 29, 2022
1 parent 806ef06 commit 83f0dbe
Showing 1 changed file with 190 additions and 0 deletions.
190 changes: 190 additions & 0 deletions pkg/kgo/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -1673,6 +1673,8 @@ func (cl *Client) handleShardedReq(ctx context.Context, req kmsg.Request) ([]Res
sharder = &deleteRecordsSharder{cl}
case *kmsg.OffsetForLeaderEpochRequest:
sharder = &offsetForLeaderEpochSharder{cl}
case *kmsg.WriteTxnMarkersRequest:
sharder = &writeTxnMarkersSharder{cl}
case *kmsg.DescribeConfigsRequest:
sharder = &describeConfigsSharder{cl}
case *kmsg.AlterConfigsRequest:
Expand Down Expand Up @@ -2868,6 +2870,194 @@ func (*offsetForLeaderEpochSharder) merge(sresps []ResponseShard) (kmsg.Response
return merged, firstErr
}

// handle sharding WriteTxnMarkersRequest
type writeTxnMarkersSharder struct{ *Client }

func (*writeTxnMarkersSharder) unpackPinReq() bool { return false }

func (cl *writeTxnMarkersSharder) shard(ctx context.Context, kreq kmsg.Request, _ error) ([]issueShard, bool, error) {
req := kreq.(*kmsg.WriteTxnMarkersRequest)

var need []string
for _, marker := range req.Markers {
for _, topic := range marker.Topics {
need = append(need, topic.Topic)
}
}
mapping, err := cl.fetchMappedMetadata(ctx, need, true)
if err != nil {
return nil, false, err
}

type pidEpochCommit struct {
pid int64
epoch int16
commit bool
}

brokerReqs := make(map[int32]map[pidEpochCommit]map[string][]int32)
unknown := make(map[error]map[pidEpochCommit]map[string][]int32) // err => pec => topic => partitions

addreq := func(b int32, pec pidEpochCommit, t string, p int32) {
pecs := brokerReqs[b]
if pecs == nil {
pecs = make(map[pidEpochCommit]map[string][]int32)
brokerReqs[b] = pecs
}
ts := pecs[pec]
if ts == nil {
ts = make(map[string][]int32)
pecs[pec] = ts
}
ts[t] = append(ts[t], p)
}
addunk := func(err error, pec pidEpochCommit, t string, p int32) {
pecs := unknown[err]
if pecs == nil {
pecs = make(map[pidEpochCommit]map[string][]int32)
unknown[err] = pecs
}
ts := pecs[pec]
if ts == nil {
ts = make(map[string][]int32)
pecs[pec] = ts
}
ts[t] = append(ts[t], p)
}

for _, marker := range req.Markers {
pec := pidEpochCommit{
marker.ProducerID,
marker.ProducerEpoch,
marker.Committed,
}
for _, topic := range marker.Topics {
t := topic.Topic
tmapping, exists := mapping[t]
if err := unknownOrCode(exists, tmapping.t.ErrorCode); err != nil {
for _, partition := range topic.Partitions {
addunk(err, pec, t, partition)
}
continue
}
for _, partition := range topic.Partitions {
p, exists := tmapping.ps[partition]
if err := unknownOrCode(exists, p.ErrorCode); err != nil {
addunk(err, pec, t, partition)
continue
}
if err := noLeader(p.Leader); err != nil {
addunk(err, pec, t, partition)
continue
}
addreq(p.Leader, pec, t, partition)
}
}
}

mkreq := kmsg.NewPtrWriteTxnMarkersRequest

var issues []issueShard
for brokerID, brokerReq := range brokerReqs {
req := mkreq()
for pec, topics := range brokerReq {
rm := kmsg.NewWriteTxnMarkersRequestMarker()
rm.ProducerID = pec.pid
rm.ProducerEpoch = pec.epoch
rm.Committed = pec.commit
for topic, parts := range topics {
rt := kmsg.NewWriteTxnMarkersRequestMarkerTopic()
rt.Topic = topic
rt.Partitions = parts
rm.Topics = append(rm.Topics, rt)
}
req.Markers = append(req.Markers, rm)
}
issues = append(issues, issueShard{
req: req,
broker: brokerID,
})
}

for err, errReq := range unknown {
req := mkreq()
for pec, topics := range errReq {
rm := kmsg.NewWriteTxnMarkersRequestMarker()
rm.ProducerID = pec.pid
rm.ProducerEpoch = pec.epoch
rm.Committed = pec.commit
for topic, parts := range topics {
rt := kmsg.NewWriteTxnMarkersRequestMarkerTopic()
rt.Topic = topic
rt.Partitions = parts
rm.Topics = append(rm.Topics, rt)
}
req.Markers = append(req.Markers, rm)
}
issues = append(issues, issueShard{
req: req,
err: err,
})
}
return issues, true, nil // this is reshardable
}

func (cl *writeTxnMarkersSharder) onResp(_ kmsg.Request, kresp kmsg.Response) error {
resp := kresp.(*kmsg.WriteTxnMarkersResponse)
var del []string
var retErr error
for i := range resp.Markers {
m := &resp.Markers[i]
for j := range m.Topics {
t := &m.Topics[j]
for k := range t.Partitions {
p := &t.Partitions[k]
err := kerr.ErrorForCode(p.ErrorCode)
if err == kerr.UnknownTopicOrPartition || err == kerr.NotLeaderForPartition {
del = append(del, t.Topic)
}
onRespShardErr(&retErr, err)
}
}
}
if cl.maybeDeleteMappedMetadata(del...) {
return retErr
}
return nil
}

func (*writeTxnMarkersSharder) merge(sresps []ResponseShard) (kmsg.Response, error) {
merged := kmsg.NewPtrWriteTxnMarkersResponse()
markers := make(map[int64]map[string][]kmsg.WriteTxnMarkersResponseMarkerTopicPartition)

firstErr := firstErrMerger(sresps, func(kresp kmsg.Response) {
resp := kresp.(*kmsg.WriteTxnMarkersResponse)
merged.Version = resp.Version
for _, marker := range resp.Markers {
topics := markers[marker.ProducerID]
if topics == nil {
topics = make(map[string][]kmsg.WriteTxnMarkersResponseMarkerTopicPartition)
markers[marker.ProducerID] = topics
}
for _, topic := range marker.Topics {
topics[topic.Topic] = append(topics[topic.Topic], topic.Partitions...)
}
}
})
for pid, topics := range markers {
respMarker := kmsg.NewWriteTxnMarkersResponseMarker()
respMarker.ProducerID = pid
for topic, partitions := range topics {
respTopic := kmsg.NewWriteTxnMarkersResponseMarkerTopic()
respTopic.Topic = topic
respTopic.Partitions = append(respTopic.Partitions, partitions...)
respMarker.Topics = append(respMarker.Topics, respTopic)
}
merged.Markers = append(merged.Markers, respMarker)
}
return merged, firstErr
}

// handle sharding DescribeConfigsRequest
type describeConfigsSharder struct{ *Client }

Expand Down

0 comments on commit 83f0dbe

Please sign in to comment.