From f72fdafdf6c97f47fab1b8567a926a989997f0dc Mon Sep 17 00:00:00 2001 From: Travis Bischel Date: Fri, 14 Apr 2023 11:24:18 -0600 Subject: [PATCH] client: always retry on NotLeader for sharded requests See embedded comment; This improves ListOffsets, OffsetForLeaderEpoch, and a few other random requests. --- pkg/kgo/client.go | 82 ++++++++++++++++++++++++++++++++--------------- 1 file changed, 56 insertions(+), 26 deletions(-) diff --git a/pkg/kgo/client.go b/pkg/kgo/client.go index 9e051d00..8a36433c 100644 --- a/pkg/kgo/client.go +++ b/pkg/kgo/client.go @@ -1629,7 +1629,7 @@ func (cl *Client) handleAdminReq(ctx context.Context, req kmsg.Request) Response for i := range t.Topics { topics = append(topics, t.Topics[i].Topic) } - cl.maybeDeleteMappedMetadata(topics...) + cl.maybeDeleteMappedMetadata(false, topics...) } var d failDial @@ -2240,27 +2240,36 @@ type mappedMetadataTopic struct { when time.Time } +// For NOT_LEADER_FOR_PARTITION: +// We always delete stale metadata. It's possible that a leader rebalance +// happened immediately after we requested metadata; we should not pin to +// the stale metadata for 1s. +// +// For UNKNOWN_TOPIC_OR_PARTITION: // We only delete stale metadata if it is older than the min age or 1s, // whichever is smaller. We use 1s even if min age is larger, because we want // to encourage larger min age for caching purposes. More obvious would be to // *always* evict the cache here, but if we *just* requested metadata, then // evicting the cache would cause churn for a topic that genuinely does not // exist. -func (cl *Client) maybeDeleteMappedMetadata(ts ...string) (shouldRetry bool) { +func (cl *Client) maybeDeleteMappedMetadata(unknownTopic bool, ts ...string) (shouldRetry bool) { if len(ts) == 0 { return } - min := time.Second - if cl.cfg.metadataMinAge < min { - min = cl.cfg.metadataMinAge + var min time.Duration + if unknownTopic { + min = time.Second + if cl.cfg.metadataMinAge < min { + min = cl.cfg.metadataMinAge + } } cl.mappedMetaMu.Lock() defer cl.mappedMetaMu.Unlock() for _, t := range ts { tcached, exists := cl.mappedMeta[t] - if exists && time.Since(tcached.when) > min { + if exists && (min == 0 || time.Since(tcached.when) > min) { shouldRetry = true delete(cl.mappedMeta, t) } @@ -2517,9 +2526,13 @@ func (cl *listOffsetsSharder) shard(ctx context.Context, kreq kmsg.Request, _ er } func (cl *listOffsetsSharder) onResp(_ kmsg.Request, kresp kmsg.Response) error { - resp := kresp.(*kmsg.ListOffsetsResponse) - var del []string - var retErr error + var ( + resp = kresp.(*kmsg.ListOffsetsResponse) + del []string + retErr error + unknownTopic bool + ) + for i := range resp.Topics { t := &resp.Topics[i] for j := range t.Partitions { @@ -2527,11 +2540,12 @@ func (cl *listOffsetsSharder) onResp(_ kmsg.Request, kresp kmsg.Response) error err := kerr.ErrorForCode(p.ErrorCode) if err == kerr.UnknownTopicOrPartition || err == kerr.NotLeaderForPartition { del = append(del, t.Topic) + unknownTopic = unknownTopic || err == kerr.UnknownTopicOrPartition } onRespShardErr(&retErr, err) } } - if cl.maybeDeleteMappedMetadata(del...) { + if cl.maybeDeleteMappedMetadata(unknownTopic, del...) { return retErr } return nil @@ -3054,9 +3068,12 @@ func (cl *deleteRecordsSharder) shard(ctx context.Context, kreq kmsg.Request, _ } func (cl *deleteRecordsSharder) onResp(_ kmsg.Request, kresp kmsg.Response) error { - resp := kresp.(*kmsg.DeleteRecordsResponse) - var del []string - var retErr error + var ( + resp = kresp.(*kmsg.DeleteRecordsResponse) + del []string + retErr error + unknownTopic bool + ) for i := range resp.Topics { t := &resp.Topics[i] for j := range t.Partitions { @@ -3064,11 +3081,12 @@ func (cl *deleteRecordsSharder) onResp(_ kmsg.Request, kresp kmsg.Response) erro err := kerr.ErrorForCode(p.ErrorCode) if err == kerr.UnknownTopicOrPartition || err == kerr.NotLeaderForPartition { del = append(del, t.Topic) + unknownTopic = unknownTopic || err == kerr.UnknownTopicOrPartition } onRespShardErr(&retErr, err) } } - if cl.maybeDeleteMappedMetadata(del...) { + if cl.maybeDeleteMappedMetadata(unknownTopic, del...) { return retErr } return nil @@ -3171,9 +3189,12 @@ func (cl *offsetForLeaderEpochSharder) shard(ctx context.Context, kreq kmsg.Requ } func (cl *offsetForLeaderEpochSharder) onResp(_ kmsg.Request, kresp kmsg.Response) error { - resp := kresp.(*kmsg.OffsetForLeaderEpochResponse) - var del []string - var retErr error + var ( + resp = kresp.(*kmsg.OffsetForLeaderEpochResponse) + del []string + retErr error + unknownTopic bool + ) for i := range resp.Topics { t := &resp.Topics[i] for j := range t.Partitions { @@ -3181,11 +3202,12 @@ func (cl *offsetForLeaderEpochSharder) onResp(_ kmsg.Request, kresp kmsg.Respons err := kerr.ErrorForCode(p.ErrorCode) if err == kerr.UnknownTopicOrPartition || err == kerr.NotLeaderForPartition { del = append(del, t.Topic) + unknownTopic = unknownTopic || err == kerr.UnknownTopicOrPartition } onRespShardErr(&retErr, err) } } - if cl.maybeDeleteMappedMetadata(del...) { + if cl.maybeDeleteMappedMetadata(unknownTopic, del...) { return retErr } return nil @@ -3344,9 +3366,12 @@ func (cl *writeTxnMarkersSharder) shard(ctx context.Context, kreq kmsg.Request, } func (cl *writeTxnMarkersSharder) onResp(_ kmsg.Request, kresp kmsg.Response) error { - resp := kresp.(*kmsg.WriteTxnMarkersResponse) - var del []string - var retErr error + var ( + resp = kresp.(*kmsg.WriteTxnMarkersResponse) + del []string + retErr error + unknownTopic bool + ) for i := range resp.Markers { m := &resp.Markers[i] for j := range m.Topics { @@ -3356,12 +3381,13 @@ func (cl *writeTxnMarkersSharder) onResp(_ kmsg.Request, kresp kmsg.Response) er err := kerr.ErrorForCode(p.ErrorCode) if err == kerr.UnknownTopicOrPartition || err == kerr.NotLeaderForPartition { del = append(del, t.Topic) + unknownTopic = unknownTopic || err == kerr.UnknownTopicOrPartition } onRespShardErr(&retErr, err) } } } - if cl.maybeDeleteMappedMetadata(del...) { + if cl.maybeDeleteMappedMetadata(unknownTopic, del...) { return retErr } return nil @@ -4003,9 +4029,12 @@ func (cl *describeProducersSharder) shard(ctx context.Context, kreq kmsg.Request } func (cl *describeProducersSharder) onResp(_ kmsg.Request, kresp kmsg.Response) error { - resp := kresp.(*kmsg.DescribeProducersResponse) - var del []string - var retErr error + var ( + resp = kresp.(*kmsg.DescribeProducersResponse) + del []string + retErr error + unknownTopic bool + ) for i := range resp.Topics { t := &resp.Topics[i] for j := range t.Partitions { @@ -4013,11 +4042,12 @@ func (cl *describeProducersSharder) onResp(_ kmsg.Request, kresp kmsg.Response) err := kerr.ErrorForCode(p.ErrorCode) if err == kerr.UnknownTopicOrPartition || err == kerr.NotLeaderForPartition { del = append(del, t.Topic) + unknownTopic = unknownTopic || err == kerr.UnknownTopicOrPartition } onRespShardErr(&retErr, err) } } - if cl.maybeDeleteMappedMetadata(del...) { + if cl.maybeDeleteMappedMetadata(unknownTopic, del...) { return retErr } return nil