From f494301dc44fe3872562ff107735ce3bc30f5acf Mon Sep 17 00:00:00 2001 From: Travis Bischel Date: Tue, 5 Oct 2021 22:47:11 -0600 Subject: [PATCH] errors: make dial errors non-retriable Although we can continue retrying dialing a lot of the time, dial errors outright are often not retriable. We split dial errors from isRetriableBrokerErr. We now: - fail quicker in client.Request for dial errors - update metadata if producing fails due to dial errors We continue: - to swallow dial errors when consuming on epoch / list load errors - to swallow dial errors when AddPartitionsToTxn fails --- pkg/kgo/consumer.go | 2 +- pkg/kgo/errors.go | 13 ++++++++++--- pkg/kgo/sink.go | 10 ++++++---- 3 files changed, 17 insertions(+), 8 deletions(-) diff --git a/pkg/kgo/consumer.go b/pkg/kgo/consumer.go index ef8c957a..34238adb 100644 --- a/pkg/kgo/consumer.go +++ b/pkg/kgo/consumer.go @@ -1339,7 +1339,7 @@ func (s *consumerSession) handleListOrEpochResults(loaded loadedOffsets) (reload default: // from ErrorCode in a response reloads.addLoad(load.topic, load.partition, loaded.loadType, load.request) - if !kerr.IsRetriable(load.err) && !isRetriableBrokerErr(load.err) { // non-retriable response error; signal such in a response + if !kerr.IsRetriable(load.err) && !isRetriableBrokerErr(load.err) && !isDialErr(load.err) { // non-retriable response error; signal such in a response s.c.addFakeReadyForDraining(load.topic, load.partition, load.err) } diff --git a/pkg/kgo/errors.go b/pkg/kgo/errors.go index 6c78e8bc..c3b77c9b 100644 --- a/pkg/kgo/errors.go +++ b/pkg/kgo/errors.go @@ -40,9 +40,11 @@ func isRetriableBrokerErr(err error) bool { // We favor testing os.SyscallError first, because net.OpError _always_ // implements Temporary, so if we test that first, it'll return false // in many cases when we want to return true from os.SyscallError. - var se *os.SyscallError - if errors.As(err, &se) { - return true + if se := (*os.SyscallError)(nil); errors.As(err, &se) { + // If a dial fails, potentially we could retry if the resolver + // had a temporary hiccup, but we will err on the side of this + // being a slightly less temporary error. + return !isDialErr(err) } // EOF can be returned if a broker kills a connection unexpectedly, and // we can retry that. Same for ErrClosed. @@ -66,6 +68,11 @@ func isRetriableBrokerErr(err error) bool { return false } +func isDialErr(err error) bool { + var ne *net.OpError + return errors.As(err, &ne) && ne.Op == "dial" +} + func isSkippableBrokerErr(err error) bool { // Some broker errors are not retriable for the given broker itself, // but we *could* skip the broker and try again on the next broker. For diff --git a/pkg/kgo/sink.go b/pkg/kgo/sink.go index c82ac85e..2b9ded1c 100644 --- a/pkg/kgo/sink.go +++ b/pkg/kgo/sink.go @@ -311,7 +311,7 @@ func (s *sink) produce(sem <-chan struct{}) bool { // it was set on, since producer id recovery resets the flag. if err := s.doTxnReq(req, txnReq); err != nil { switch { - case isRetriableBrokerErr(err): + case isRetriableBrokerErr(err) || isDialErr(err): s.cl.bumpRepeatedLoadErr(err) s.cl.cfg.logger.Log(LogLevelWarn, "unable to AddPartitionsToTxn due to retriable broker err, bumping client's buffered record load errors by 1 and retrying", "err", err) return moreToDrain || len(req.batches) > 0 @@ -504,10 +504,12 @@ func (s *sink) handleReqClientErr(req *produceRequest, err error) { case err == errChosenBrokerDead, err == errUnknownBroker, + isDialErr(err), isRetriableBrokerErr(err): // A dead / unknown broker means the broker may have migrated, - // so we retry to force a metadata reload. - updateMeta := err == errUnknownBroker + // so we retry to force a metadata reload. As well, if this is + // not a retriable broker error, we should reload. + updateMeta := err == errUnknownBroker || !isRetriableBrokerErr(err) s.handleRetryBatches(req.batches, req.backoffSeq, updateMeta, false) case err == ErrClientClosed: @@ -1162,7 +1164,7 @@ func (recBuf *recBuf) bumpRepeatedLoadErr(err error) { batch0 := recBuf.batches[0] batch0.tries++ failErr := batch0.maybeFailErr(&recBuf.cl.cfg) - if (!recBuf.cl.idempotent() || batch0.canFailFromLoadErrs) && (failErr != nil || !isRetriableBrokerErr(err) && !kerr.IsRetriable(err)) { + if (!recBuf.cl.idempotent() || batch0.canFailFromLoadErrs) && (failErr != nil || !isRetriableBrokerErr(err) && !isDialErr(err) && !kerr.IsRetriable(err)) { recBuf.failAllRecords(err) } }