Skip to content

Commit

Permalink
client: always retry on NotLeader for sharded requests
Browse files Browse the repository at this point in the history
See the embedded comment.
This improves ListOffsets, OffsetForLeaderEpoch, and the other sharded
requests touched here (DeleteRecords, WriteTxnMarkers, DescribeProducers).
  • Loading branch information
twmb committed Apr 14, 2023
1 parent 9e5db4d commit f72fdaf
Showing 1 changed file with 56 additions and 26 deletions.
82 changes: 56 additions & 26 deletions pkg/kgo/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -1629,7 +1629,7 @@ func (cl *Client) handleAdminReq(ctx context.Context, req kmsg.Request) Response
for i := range t.Topics {
topics = append(topics, t.Topics[i].Topic)
}
cl.maybeDeleteMappedMetadata(topics...)
cl.maybeDeleteMappedMetadata(false, topics...)
}

var d failDial
Expand Down Expand Up @@ -2240,27 +2240,36 @@ type mappedMetadataTopic struct {
when time.Time
}

// For NOT_LEADER_FOR_PARTITION:
// We always delete stale metadata. It's possible that a leader rebalance
// happened immediately after we requested metadata; we should not pin to
// the stale metadata for 1s.
//
// For UNKNOWN_TOPIC_OR_PARTITION:
// We only delete stale metadata if it is older than the min age or 1s,
// whichever is smaller. We use 1s even if min age is larger, because we want
// to encourage larger min age for caching purposes. More obvious would be to
// *always* evict the cache here, but if we *just* requested metadata, then
// evicting the cache would cause churn for a topic that genuinely does not
// exist.
func (cl *Client) maybeDeleteMappedMetadata(ts ...string) (shouldRetry bool) {
func (cl *Client) maybeDeleteMappedMetadata(unknownTopic bool, ts ...string) (shouldRetry bool) {
if len(ts) == 0 {
return
}

min := time.Second
if cl.cfg.metadataMinAge < min {
min = cl.cfg.metadataMinAge
var min time.Duration
if unknownTopic {
min = time.Second
if cl.cfg.metadataMinAge < min {
min = cl.cfg.metadataMinAge
}
}

cl.mappedMetaMu.Lock()
defer cl.mappedMetaMu.Unlock()
for _, t := range ts {
tcached, exists := cl.mappedMeta[t]
if exists && time.Since(tcached.when) > min {
if exists && (min == 0 || time.Since(tcached.when) > min) {
shouldRetry = true
delete(cl.mappedMeta, t)
}
Expand Down Expand Up @@ -2517,21 +2526,26 @@ func (cl *listOffsetsSharder) shard(ctx context.Context, kreq kmsg.Request, _ er
}

func (cl *listOffsetsSharder) onResp(_ kmsg.Request, kresp kmsg.Response) error {
resp := kresp.(*kmsg.ListOffsetsResponse)
var del []string
var retErr error
var (
resp = kresp.(*kmsg.ListOffsetsResponse)
del []string
retErr error
unknownTopic bool
)

for i := range resp.Topics {
t := &resp.Topics[i]
for j := range t.Partitions {
p := &t.Partitions[j]
err := kerr.ErrorForCode(p.ErrorCode)
if err == kerr.UnknownTopicOrPartition || err == kerr.NotLeaderForPartition {
del = append(del, t.Topic)
unknownTopic = unknownTopic || err == kerr.UnknownTopicOrPartition
}
onRespShardErr(&retErr, err)
}
}
if cl.maybeDeleteMappedMetadata(del...) {
if cl.maybeDeleteMappedMetadata(unknownTopic, del...) {
return retErr
}
return nil
Expand Down Expand Up @@ -3054,21 +3068,25 @@ func (cl *deleteRecordsSharder) shard(ctx context.Context, kreq kmsg.Request, _
}

func (cl *deleteRecordsSharder) onResp(_ kmsg.Request, kresp kmsg.Response) error {
resp := kresp.(*kmsg.DeleteRecordsResponse)
var del []string
var retErr error
var (
resp = kresp.(*kmsg.DeleteRecordsResponse)
del []string
retErr error
unknownTopic bool
)
for i := range resp.Topics {
t := &resp.Topics[i]
for j := range t.Partitions {
p := &t.Partitions[j]
err := kerr.ErrorForCode(p.ErrorCode)
if err == kerr.UnknownTopicOrPartition || err == kerr.NotLeaderForPartition {
del = append(del, t.Topic)
unknownTopic = unknownTopic || err == kerr.UnknownTopicOrPartition
}
onRespShardErr(&retErr, err)
}
}
if cl.maybeDeleteMappedMetadata(del...) {
if cl.maybeDeleteMappedMetadata(unknownTopic, del...) {
return retErr
}
return nil
Expand Down Expand Up @@ -3171,21 +3189,25 @@ func (cl *offsetForLeaderEpochSharder) shard(ctx context.Context, kreq kmsg.Requ
}

func (cl *offsetForLeaderEpochSharder) onResp(_ kmsg.Request, kresp kmsg.Response) error {
resp := kresp.(*kmsg.OffsetForLeaderEpochResponse)
var del []string
var retErr error
var (
resp = kresp.(*kmsg.OffsetForLeaderEpochResponse)
del []string
retErr error
unknownTopic bool
)
for i := range resp.Topics {
t := &resp.Topics[i]
for j := range t.Partitions {
p := &t.Partitions[j]
err := kerr.ErrorForCode(p.ErrorCode)
if err == kerr.UnknownTopicOrPartition || err == kerr.NotLeaderForPartition {
del = append(del, t.Topic)
unknownTopic = unknownTopic || err == kerr.UnknownTopicOrPartition
}
onRespShardErr(&retErr, err)
}
}
if cl.maybeDeleteMappedMetadata(del...) {
if cl.maybeDeleteMappedMetadata(unknownTopic, del...) {
return retErr
}
return nil
Expand Down Expand Up @@ -3344,9 +3366,12 @@ func (cl *writeTxnMarkersSharder) shard(ctx context.Context, kreq kmsg.Request,
}

func (cl *writeTxnMarkersSharder) onResp(_ kmsg.Request, kresp kmsg.Response) error {
resp := kresp.(*kmsg.WriteTxnMarkersResponse)
var del []string
var retErr error
var (
resp = kresp.(*kmsg.WriteTxnMarkersResponse)
del []string
retErr error
unknownTopic bool
)
for i := range resp.Markers {
m := &resp.Markers[i]
for j := range m.Topics {
Expand All @@ -3356,12 +3381,13 @@ func (cl *writeTxnMarkersSharder) onResp(_ kmsg.Request, kresp kmsg.Response) er
err := kerr.ErrorForCode(p.ErrorCode)
if err == kerr.UnknownTopicOrPartition || err == kerr.NotLeaderForPartition {
del = append(del, t.Topic)
unknownTopic = unknownTopic || err == kerr.UnknownTopicOrPartition
}
onRespShardErr(&retErr, err)
}
}
}
if cl.maybeDeleteMappedMetadata(del...) {
if cl.maybeDeleteMappedMetadata(unknownTopic, del...) {
return retErr
}
return nil
Expand Down Expand Up @@ -4003,21 +4029,25 @@ func (cl *describeProducersSharder) shard(ctx context.Context, kreq kmsg.Request
}

func (cl *describeProducersSharder) onResp(_ kmsg.Request, kresp kmsg.Response) error {
resp := kresp.(*kmsg.DescribeProducersResponse)
var del []string
var retErr error
var (
resp = kresp.(*kmsg.DescribeProducersResponse)
del []string
retErr error
unknownTopic bool
)
for i := range resp.Topics {
t := &resp.Topics[i]
for j := range t.Partitions {
p := &t.Partitions[j]
err := kerr.ErrorForCode(p.ErrorCode)
if err == kerr.UnknownTopicOrPartition || err == kerr.NotLeaderForPartition {
del = append(del, t.Topic)
unknownTopic = unknownTopic || err == kerr.UnknownTopicOrPartition
}
onRespShardErr(&retErr, err)
}
}
if cl.maybeDeleteMappedMetadata(del...) {
if cl.maybeDeleteMappedMetadata(unknownTopic, del...) {
return retErr
}
return nil
Expand Down

0 comments on commit f72fdaf

Please sign in to comment.