From 085ad3056116f9d8476c7333afa072d011b5af52 Mon Sep 17 00:00:00 2001 From: Travis Bischel Date: Sun, 6 Jun 2021 00:41:23 -0600 Subject: [PATCH] metadata: limit retries, bump produce load errors on failure Previously, if we could not dial a broker, then we would *not* bump load errors on buffered records. This meant that we would wait forever, even if produce retries was limited. Now, we limit retries for internally-caused metadata requests to 3, and we bump errors on all partitions and any waiting unknown topic on metadat failure. The number 3 keeps the internal retries tolerant to single-broker problems (e.g., cannot dial seed 1), but also limits things to be small enough to not retry for an unreasonable amount of time. This changes two logging messages to better reflect the ambiguity of what is going on. --- pkg/kgo/client.go | 32 +++++++++++++++--- pkg/kgo/metadata.go | 18 +++++++++- pkg/kgo/producer.go | 4 +-- pkg/kgo/sink.go | 2 +- pkg/kgo/topics_and_partitions.go | 57 ++++++++++++++++++++++++++++++++ 5 files changed, 104 insertions(+), 9 deletions(-) diff --git a/pkg/kgo/client.go b/pkg/kgo/client.go index 9f233853..3a057a7c 100644 --- a/pkg/kgo/client.go +++ b/pkg/kgo/client.go @@ -311,7 +311,7 @@ func (cl *Client) fetchBrokerMetadata(ctx context.Context) error { close(wait.done) }() - _, _, wait.err = cl.fetchMetadata(ctx, kmsg.NewPtrMetadataRequest()) + _, _, wait.err = cl.fetchMetadata(ctx, kmsg.NewPtrMetadataRequest(), true) return wait.err } @@ -329,11 +329,25 @@ func (cl *Client) fetchMetadataForTopics(ctx context.Context, all bool, topics [ req.Topics = append(req.Topics, kmsg.MetadataRequestTopic{Topic: &t}) } } - return cl.fetchMetadata(ctx, req) + return cl.fetchMetadata(ctx, req, true) } -func (cl *Client) fetchMetadata(ctx context.Context, req *kmsg.MetadataRequest) (*broker, *kmsg.MetadataResponse, error) { +func (cl *Client) fetchMetadata(ctx context.Context, req *kmsg.MetadataRequest, limitRetries bool) (*broker, *kmsg.MetadataResponse, error) { r := cl.retriable() + + // We limit retries for internal metadata refreshes, because these do + // not need to retry forever and are usually blocking *other* requests. + // e.g., producing bumps load errors when metadata returns, so 3 + // failures here will correspond to 1 bumped error count. To make the + // number more accurate, we should *never* retry here, but this is + // pretty intolerant of immediately-temporary network issues. Rather, + // we use a small count of 3 retries, which with the default backoff, + // will be <500ms of retrying. This is still intolerant of temporary + // failures, but it does allow recovery from a dns issue / bad path. + if limitRetries { + r.limitRetries = 3 + } + meta, err := req.RequestWith(ctx, r) if err == nil { if meta.ControllerID >= 0 { @@ -506,6 +520,11 @@ type retriable struct { br func() (*broker, error) last *broker + // If non-zero, limitRetries may specify a smaller # of retries than + // the client RequestRetries number. This is used for internal requests + // that can fail / do not need to retry forever. + limitRetries int + // parseRetryErr, if non-nil, can parse a retriable error out of the // response and return it. This error is *not* returned from the // request if the req cannot be retried due to timeout or retry limits, @@ -531,7 +550,10 @@ start: } if err != nil || retryErr != nil { if retryTimeout == 0 || time.Since(tryStart) <= retryTimeout { - if (r.cl.shouldRetry(tries, err) || r.cl.shouldRetry(tries, retryErr)) && r.cl.waitTries(ctx, tries) { + if (r.cl.shouldRetry(tries, err) || r.cl.shouldRetry(tries, retryErr)) && + (r.limitRetries == 0 || tries < r.limitRetries) && + r.cl.waitTries(ctx, tries) { + goto start } } @@ -651,7 +673,7 @@ func (cl *Client) shardedRequest(ctx context.Context, req kmsg.Request) ([]Respo if metaReq, isMetaReq := req.(*kmsg.MetadataRequest); isMetaReq { // We hijack any metadata request so as to populate our // own brokers and controller ID. - br, resp, err := cl.fetchMetadata(ctx, metaReq) + br, resp, err := cl.fetchMetadata(ctx, metaReq, false) return shards(shard(br, req, resp, err)), nil } else if adminReq, admin := req.(kmsg.AdminRequest); admin { diff --git a/pkg/kgo/metadata.go b/pkg/kgo/metadata.go index 1fd7e84a..031b5e22 100644 --- a/pkg/kgo/metadata.go +++ b/pkg/kgo/metadata.go @@ -2,6 +2,7 @@ package kgo import ( "context" + "errors" "fmt" "sort" "sync" @@ -242,6 +243,10 @@ func (cl *Client) updateMetadata() (needsRetry bool, err error) { latest, err := cl.fetchTopicMetadata(all, reqTopics) if err != nil { + cl.bumpMetadataFailForTopics( // bump load failures for all topics + tpsProducerLoad, + err, + ) return true, err } @@ -249,7 +254,7 @@ func (cl *Client) updateMetadata() (needsRetry bool, err error) { // may have returned topics the consumer is not yet tracking. We ensure // that we will store the topics at the end of our metadata update. tpsConsumerLoad := tpsConsumer.load() - if all { + if all && len(latest) > 0 { allTopics := make([]string, 0, len(latest)) for topic := range latest { allTopics = append(allTopics, topic) @@ -274,6 +279,7 @@ func (cl *Client) updateMetadata() (needsRetry bool, err error) { } }() + var missingProduceTopics []string for _, m := range []struct { priors map[string]*topicPartitions isProduce bool @@ -284,6 +290,9 @@ func (cl *Client) updateMetadata() (needsRetry bool, err error) { for topic, priorParts := range m.priors { newParts, exists := latest[topic] if !exists { + if m.isProduce { + missingProduceTopics = append(missingProduceTopics, topic) + } continue } needsRetry = needsRetry || cl.mergeTopicPartitions( @@ -297,6 +306,13 @@ func (cl *Client) updateMetadata() (needsRetry bool, err error) { ) } } + if len(missingProduceTopics) > 0 { + cl.bumpMetadataFailForTopics( + tpsProducerLoad, + errors.New("metadata request did not return this topic"), + missingProduceTopics..., + ) + } return needsRetry, nil } diff --git a/pkg/kgo/producer.go b/pkg/kgo/producer.go index 9ba172f0..b6afb284 100644 --- a/pkg/kgo/producer.go +++ b/pkg/kgo/producer.go @@ -636,8 +636,8 @@ func (cl *Client) waitUnknownTopic( } cl.cfg.logger.Log(LogLevelInfo, "unknown topic wait failed, retrying wait", "topic", topic, "err", retriableErr) tries++ - if int64(tries) >= cl.cfg.retries { - err = fmt.Errorf("no partitions available after refreshing metadata %d times, last err: %w", tries, retriableErr) + if int64(tries) >= cl.cfg.produceRetries { + err = fmt.Errorf("no partitions available after attempting to refresh metadata %d times, last err: %w", tries, retriableErr) } } } diff --git a/pkg/kgo/sink.go b/pkg/kgo/sink.go index bda50f6c..6ae326fa 100644 --- a/pkg/kgo/sink.go +++ b/pkg/kgo/sink.go @@ -1105,7 +1105,7 @@ func (recBuf *recBuf) bumpRepeatedLoadErr(err error) { if len(recBuf.batches) == 0 { return } - recBuf.cl.cfg.logger.Log(LogLevelWarn, "produce partition load error, unable to produce on this partition", "broker", recBuf.sink.nodeID, "topic", recBuf.topic, "partition", recBuf.partition, "err", err) + recBuf.cl.cfg.logger.Log(LogLevelWarn, "produce partition load error, bumping error count on first stored batch", "broker", recBuf.sink.nodeID, "topic", recBuf.topic, "partition", recBuf.partition, "err", err) batch0 := recBuf.batches[0] batch0.tries++ failErr := batch0.maybeFailErr(&recBuf.cl.cfg) diff --git a/pkg/kgo/topics_and_partitions.go b/pkg/kgo/topics_and_partitions.go index 4543f265..b52d5d50 100644 --- a/pkg/kgo/topics_and_partitions.go +++ b/pkg/kgo/topics_and_partitions.go @@ -158,6 +158,63 @@ func (cl *Client) storePartitionsUpdate(topic string, l *topicPartitions, lv *to } +// If a metadata request fails after retrying (internally retrying, so only a +// few times), or the metadata request does not return topics that we requested +// (which may also happen additionally consuming via regex), then we need to +// bump errors for topics that were previously loaded, and bump errors for +// topics awaiting load. +// +// This has two modes of operation: +// +// 1) if no topics were missing, then the metadata request failed outright, +// and we need to bump errors on all stored topics and unknown topics. +// +// 2) if topics were missing, then the metadata request was successful but +// had missing data, and we need to bump errors on only what was mising. +// +func (cl *Client) bumpMetadataFailForTopics(requested map[string]*topicPartitions, err error, missingTopics ...string) { + p := &cl.producer + + // mode 1 + if len(missingTopics) == 0 { + for _, topic := range requested { + for _, topicPartition := range topic.load().partitions { + topicPartition.records.bumpRepeatedLoadErr(err) + } + } + } + + // mode 2 + var missing map[string]bool + for _, failTopic := range missingTopics { + if missing == nil { + missing = make(map[string]bool, len(missingTopics)) + } + missing[failTopic] = true + + if topic, exists := requested[failTopic]; exists { + for _, topicPartition := range topic.load().partitions { + topicPartition.records.bumpRepeatedLoadErr(err) + } + } + } + + p.unknownTopicsMu.Lock() + defer p.unknownTopicsMu.Unlock() + + for topic, unknown := range p.unknownTopics { + // if nil, mode 1, else mode 2 + if missing != nil && !missing[topic] { + continue + } + + select { + case unknown.wait <- err: + default: + } + } +} + // topicPartitionsData is the data behind a topicPartitions' v. // // We keep this in an atomic because it is expected to be extremely read heavy,