metadata: limit retries, bump produce load errors on failure
Previously, if we could not dial a broker, then we would *not* bump load
errors on buffered records. This meant that we would wait forever, even
if produce retries were limited.

Now, we limit retries for internally-caused metadata requests to 3, and
we bump errors on all partitions and any waiting unknown topic on
metadata failure.

The number 3 keeps the internal retries tolerant of single-broker
problems (e.g., being unable to dial seed 1), while keeping the retry
count small enough that we do not retry for an unreasonable amount of
time.

This changes two logging messages to better reflect the ambiguity of
what is going on.
twmb committed Jun 6, 2021
1 parent b26489f commit 085ad30
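
As a rough illustration of the bounded-retry behavior described in the commit message above, here is a minimal standalone Go sketch (the function name, backoff, and limit handling are hypothetical, not franz-go's actual retriable implementation): an internal request gives up after a few tries and surfaces its error, so buffered records can have their load-error counts bumped instead of waiting forever.

package main

import (
	"context"
	"errors"
	"fmt"
	"time"
)

// doWithLimitedRetries retries try() at most limit times, sleeping backoff
// between attempts, and returns the last error instead of looping forever.
func doWithLimitedRetries(ctx context.Context, limit int, backoff func(tries int) time.Duration, try func() error) error {
	for tries := 1; ; tries++ {
		err := try()
		if err == nil {
			return nil
		}
		if tries >= limit { // retry budget exhausted: surface the error
			return err
		}
		select {
		case <-time.After(backoff(tries)):
		case <-ctx.Done():
			return ctx.Err()
		}
	}
}

func main() {
	// Hypothetical small exponential backoff; three tries stay well under a second.
	backoff := func(tries int) time.Duration { return 100 * time.Millisecond << (tries - 1) }
	err := doWithLimitedRetries(context.Background(), 3, backoff, func() error {
		return errors.New("cannot dial broker") // simulate a broker we can never reach
	})
	// After three failed tries the error comes back, and a producer could
	// then bump the error count on its buffered records rather than waiting.
	fmt.Println(err)
}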
Showing 5 changed files with 104 additions and 9 deletions.
32 changes: 27 additions & 5 deletions pkg/kgo/client.go
@@ -311,7 +311,7 @@ func (cl *Client) fetchBrokerMetadata(ctx context.Context) error {
close(wait.done)
}()

_, _, wait.err = cl.fetchMetadata(ctx, kmsg.NewPtrMetadataRequest())
_, _, wait.err = cl.fetchMetadata(ctx, kmsg.NewPtrMetadataRequest(), true)
return wait.err
}

@@ -329,11 +329,25 @@ func (cl *Client) fetchMetadataForTopics(ctx context.Context, all bool, topics [
req.Topics = append(req.Topics, kmsg.MetadataRequestTopic{Topic: &t})
}
}
return cl.fetchMetadata(ctx, req)
return cl.fetchMetadata(ctx, req, true)
}

func (cl *Client) fetchMetadata(ctx context.Context, req *kmsg.MetadataRequest) (*broker, *kmsg.MetadataResponse, error) {
func (cl *Client) fetchMetadata(ctx context.Context, req *kmsg.MetadataRequest, limitRetries bool) (*broker, *kmsg.MetadataResponse, error) {
r := cl.retriable()

// We limit retries for internal metadata refreshes, because these do
// not need to retry forever and are usually blocking *other* requests.
// For example, producing bumps load errors when metadata returns, so 3
// failures here will correspond to 1 bumped error count. To make the
// number more accurate, we should *never* retry here, but that would be
// pretty intolerant of immediately-temporary network issues. Rather,
// we use a small count of 3 retries, which, with the default backoff,
// will be <500ms of retrying. This is still intolerant of longer
// temporary failures, but it does allow recovery from a DNS issue or a
// bad network path.
if limitRetries {
r.limitRetries = 3
}

meta, err := req.RequestWith(ctx, r)
if err == nil {
if meta.ControllerID >= 0 {
@@ -506,6 +520,11 @@ type retriable struct {
br func() (*broker, error)
last *broker

// If non-zero, limitRetries caps the number of retries below the
// client's RequestRetries setting. This is used for internal requests
// that are allowed to fail and do not need to retry forever.
limitRetries int

// parseRetryErr, if non-nil, can parse a retriable error out of the
// response and return it. This error is *not* returned from the
// request if the req cannot be retried due to timeout or retry limits,
@@ -531,7 +550,10 @@ start:
}
if err != nil || retryErr != nil {
if retryTimeout == 0 || time.Since(tryStart) <= retryTimeout {
if (r.cl.shouldRetry(tries, err) || r.cl.shouldRetry(tries, retryErr)) && r.cl.waitTries(ctx, tries) {
if (r.cl.shouldRetry(tries, err) || r.cl.shouldRetry(tries, retryErr)) &&
(r.limitRetries == 0 || tries < r.limitRetries) &&
r.cl.waitTries(ctx, tries) {

goto start
}
}
@@ -651,7 +673,7 @@ func (cl *Client) shardedRequest(ctx context.Context, req kmsg.Request) ([]Respo
if metaReq, isMetaReq := req.(*kmsg.MetadataRequest); isMetaReq {
// We hijack any metadata request so as to populate our
// own brokers and controller ID.
br, resp, err := cl.fetchMetadata(ctx, metaReq)
br, resp, err := cl.fetchMetadata(ctx, metaReq, false)
return shards(shard(br, req, resp, err)), nil

} else if adminReq, admin := req.(kmsg.AdminRequest); admin {
18 changes: 17 additions & 1 deletion pkg/kgo/metadata.go
@@ -2,6 +2,7 @@ package kgo

import (
"context"
"errors"
"fmt"
"sort"
"sync"
@@ -242,14 +243,18 @@ func (cl *Client) updateMetadata() (needsRetry bool, err error) {

latest, err := cl.fetchTopicMetadata(all, reqTopics)
if err != nil {
cl.bumpMetadataFailForTopics( // bump load failures for all topics
tpsProducerLoad,
err,
)
return true, err
}

// If we are consuming with regex and fetched all topics, the metadata
// may have returned topics the consumer is not yet tracking. We ensure
// that we will store the topics at the end of our metadata update.
tpsConsumerLoad := tpsConsumer.load()
if all {
if all && len(latest) > 0 {
allTopics := make([]string, 0, len(latest))
for topic := range latest {
allTopics = append(allTopics, topic)
@@ -274,6 +279,7 @@
}
}()

var missingProduceTopics []string
for _, m := range []struct {
priors map[string]*topicPartitions
isProduce bool
@@ -284,6 +290,9 @@
for topic, priorParts := range m.priors {
newParts, exists := latest[topic]
if !exists {
if m.isProduce {
missingProduceTopics = append(missingProduceTopics, topic)
}
continue
}
needsRetry = needsRetry || cl.mergeTopicPartitions(
@@ -297,6 +306,13 @@
)
}
}
if len(missingProduceTopics) > 0 {
cl.bumpMetadataFailForTopics(
tpsProducerLoad,
errors.New("metadata request did not return this topic"),
missingProduceTopics...,
)
}

return needsRetry, nil
}
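
To tie together the two new failure paths in updateMetadata above, here is a hedged standalone sketch (the helper names and map types are illustrative placeholders, not franz-go's API): an outright fetch failure bumps load errors for every produce topic, while a successful fetch that omits requested topics bumps only the missing ones.

package main

import (
	"errors"
	"fmt"
)

// bumpAll and bumpSome stand in for bumping load errors on buffered records;
// they are placeholders for illustration, not franz-go functions.
func bumpAll(topics map[string]int, err error) { fmt.Println("bump all topics:", err) }
func bumpSome(topics []string, err error)      { fmt.Println("bump", topics, ":", err) }

// refresh sketches the two paths: a failed fetch bumps every produce topic
// (mode 1), while a successful fetch that omits topics bumps only those (mode 2).
func refresh(requested map[string]int, fetch func() (map[string]int, error)) {
	latest, err := fetch()
	if err != nil {
		bumpAll(requested, err) // the metadata request failed outright
		return
	}
	var missing []string
	for topic := range requested {
		if _, exists := latest[topic]; !exists {
			missing = append(missing, topic)
		}
	}
	if len(missing) > 0 { // the response was missing some requested topics
		bumpSome(missing, errors.New("metadata request did not return this topic"))
	}
}

func main() {
	requested := map[string]int{"orders": 3, "audit": 1} // topic -> partition count
	refresh(requested, func() (map[string]int, error) { return nil, errors.New("cannot dial broker") })
	refresh(requested, func() (map[string]int, error) { return map[string]int{"orders": 3}, nil })
}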
4 changes: 2 additions & 2 deletions pkg/kgo/producer.go
@@ -636,8 +636,8 @@ func (cl *Client) waitUnknownTopic(
}
cl.cfg.logger.Log(LogLevelInfo, "unknown topic wait failed, retrying wait", "topic", topic, "err", retriableErr)
tries++
if int64(tries) >= cl.cfg.retries {
err = fmt.Errorf("no partitions available after refreshing metadata %d times, last err: %w", tries, retriableErr)
if int64(tries) >= cl.cfg.produceRetries {
err = fmt.Errorf("no partitions available after attempting to refresh metadata %d times, last err: %w", tries, retriableErr)
}
}
}
2 changes: 1 addition & 1 deletion pkg/kgo/sink.go
@@ -1105,7 +1105,7 @@ func (recBuf *recBuf) bumpRepeatedLoadErr(err error) {
if len(recBuf.batches) == 0 {
return
}
recBuf.cl.cfg.logger.Log(LogLevelWarn, "produce partition load error, unable to produce on this partition", "broker", recBuf.sink.nodeID, "topic", recBuf.topic, "partition", recBuf.partition, "err", err)
recBuf.cl.cfg.logger.Log(LogLevelWarn, "produce partition load error, bumping error count on first stored batch", "broker", recBuf.sink.nodeID, "topic", recBuf.topic, "partition", recBuf.partition, "err", err)
batch0 := recBuf.batches[0]
batch0.tries++
failErr := batch0.maybeFailErr(&recBuf.cl.cfg)
57 changes: 57 additions & 0 deletions pkg/kgo/topics_and_partitions.go
@@ -158,6 +158,63 @@ func (cl *Client) storePartitionsUpdate(topic string, l *topicPartitions, lv *to

}

// If a metadata request fails after its internal retries (so, only a few
// attempts), or the metadata request does not return topics that we requested
// (which can also happen when additionally consuming via regex), then we need
// to bump errors for topics that were previously loaded, and bump errors for
// topics awaiting load.
//
// This has two modes of operation:
//
// 1) if no topics were missing, then the metadata request failed outright,
// and we need to bump errors on all stored topics and unknown topics.
//
// 2) if topics were missing, then the metadata request was successful but
// had missing data, and we need to bump errors on only what was missing.
//
func (cl *Client) bumpMetadataFailForTopics(requested map[string]*topicPartitions, err error, missingTopics ...string) {
p := &cl.producer

// mode 1
if len(missingTopics) == 0 {
for _, topic := range requested {
for _, topicPartition := range topic.load().partitions {
topicPartition.records.bumpRepeatedLoadErr(err)
}
}
}

// mode 2
var missing map[string]bool
for _, failTopic := range missingTopics {
if missing == nil {
missing = make(map[string]bool, len(missingTopics))
}
missing[failTopic] = true

if topic, exists := requested[failTopic]; exists {
for _, topicPartition := range topic.load().partitions {
topicPartition.records.bumpRepeatedLoadErr(err)
}
}
}

p.unknownTopicsMu.Lock()
defer p.unknownTopicsMu.Unlock()

for topic, unknown := range p.unknownTopics {
// if missing is nil, we are in mode 1 (bump everything); else mode 2
if missing != nil && !missing[topic] {
continue
}

select {
case unknown.wait <- err:
default:
}
}
}

// topicPartitionsData is the data behind a topicPartitions' v.
//
// We keep this in an atomic because it is expected to be extremely read heavy,
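
The select/default at the end of bumpMetadataFailForTopics hands the error to any goroutine waiting on an unknown topic without ever blocking the metadata loop. Here is a small standalone sketch of that pattern (the channel layout, buffer size, and retry count are illustrative, not the client's actual producer internals); the waiter counts delivered errors against a produce retry limit, mirroring the producer.go change above.

package main

import (
	"errors"
	"fmt"
)

func main() {
	// Each unknown topic has a small buffered channel its waiter listens on.
	wait := make(chan error, 8)

	// Notifier side: a non-blocking send, so a full buffer (or an absent
	// waiter) never stalls the metadata update loop.
	notify := func(err error) {
		select {
		case wait <- err:
		default:
		}
	}

	// Simulate three failed internal metadata refreshes.
	for i := 0; i < 3; i++ {
		notify(errors.New("cannot dial broker"))
	}
	close(wait)

	// Waiter side: count each delivered error against a produce retry
	// limit instead of waiting forever for the topic to appear. (The real
	// client runs the waiter concurrently; this sketch drains sequentially.)
	const produceRetries = 3
	tries := 0
	for err := range wait {
		tries++
		if tries >= produceRetries {
			fmt.Printf("no partitions available after attempting to refresh metadata %d times, last err: %v\n", tries, err)
			return
		}
	}
}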
