From ec0c9925f4e2b3ed5d053bc5de96ff82221448a6 Mon Sep 17 00:00:00 2001
From: Travis Bischel
Date: Wed, 18 Aug 2021 00:40:41 -0600
Subject: [PATCH] client improvement: retry more when input brokers are invalid

If a user passed two seeds and one is invalid, we can still operate with
some degree of success. Previously, we would fail a request immediately
if it was attempted on the invalid seed. However, if the next seed is
valid, we should be able to recover, eventually load metadata, and
always be successful.

We will now retry on broker-specific network errors if they are not
context related, and if the next broker we would retry on is different.
---
 pkg/kgo/client.go | 32 +++++++++++++++++++++++++-------
 pkg/kgo/errors.go | 20 ++++++++++++++++++++
 2 files changed, 45 insertions(+), 7 deletions(-)

diff --git a/pkg/kgo/client.go b/pkg/kgo/client.go
index 34a33019..6c5c7e19 100644
--- a/pkg/kgo/client.go
+++ b/pkg/kgo/client.go
@@ -580,6 +580,10 @@ func (cl *Client) shouldRetry(tries int, err error) bool {
 	return (kerr.IsRetriable(err) || isRetriableBrokerErr(err)) && int64(tries) < cl.cfg.retries
 }
 
+func (cl *Client) shouldRetryNext(tries int, err error) bool {
+	return isSkippableBrokerErr(err) && int64(tries) < cl.cfg.retries
+}
+
 type retriable struct {
 	cl *Client
 	br func() (*broker, error)
@@ -601,9 +605,11 @@ func (r *retriable) Request(ctx context.Context, req kmsg.Request) (kmsg.Respons
 	tries := 0
 	tryStart := time.Now()
 	retryTimeout := r.cl.cfg.retryTimeout(req.Key())
+
+	next, nextErr := r.br()
 start:
 	tries++
-	br, err := r.br()
+	br, err := next, nextErr
 	r.last = br
 	var resp kmsg.Response
 	var retryErr error
@@ -613,13 +619,25 @@ start:
 			retryErr = r.parseRetryErr(resp)
 		}
 	}
-	if err != nil || retryErr != nil {
-		if retryTimeout == 0 || time.Since(tryStart) <= retryTimeout {
-			if (r.cl.shouldRetry(tries, err) || r.cl.shouldRetry(tries, retryErr)) &&
-				(r.limitRetries == 0 || tries < r.limitRetries) &&
-				r.cl.waitTries(ctx, tries) {
-				goto start
+	if err != nil || retryErr != nil {
+		if r.limitRetries == 0 || tries < r.limitRetries {
+			if retryTimeout == 0 || time.Since(tryStart) <= retryTimeout {
+				// If this broker / request had a retriable error, we can
+				// just retry now. If the error is *not* retriable but
+				// is a broker-specific network error, and the next
+				// broker is different than the current, we also retry.
+				if r.cl.shouldRetry(tries, err) || r.cl.shouldRetry(tries, retryErr) {
+					if r.cl.waitTries(ctx, tries) {
+						next, nextErr = r.br()
+						goto start
+					}
+				} else if r.cl.shouldRetryNext(tries, err) {
+					next, nextErr = r.br()
+					if next != br && r.cl.waitTries(ctx, tries) {
+						goto start
+					}
+				}
 			}
 		}
 	}
diff --git a/pkg/kgo/errors.go b/pkg/kgo/errors.go
index c8117d98..9e9397a7 100644
--- a/pkg/kgo/errors.go
+++ b/pkg/kgo/errors.go
@@ -1,8 +1,10 @@
 package kgo
 
 import (
+	"context"
 	"errors"
 	"fmt"
+	"net"
 	"os"
 )
 
@@ -45,6 +47,24 @@ func isRetriableBrokerErr(err error) bool {
 	return false
 }
 
+func isSkippableBrokerErr(err error) bool {
+	// Some broker errors are not retriable for the given broker itself,
+	// but we *could* skip the broker and try again on the next broker. For
+	// example, if the user input an invalid address and a valid address
+	// for seeds, when we fail dialing the first seed, we cannot retry that
+	// broker, but we can skip to the next.
+	//
+	// We take anything that returns an OpError that *is not* a context
+	// error deep inside.
+	var ne *net.OpError
+	if errors.As(err, &ne) &&
+		!errors.Is(err, context.Canceled) &&
+		!errors.Is(err, context.DeadlineExceeded) {
+		return true
+	}
+	return false
+}
+
 var (
 	//////////////
 	// INTERNAL // -- when used multiple times or checked in different areas of the client