Skip to content

Commit

Permalink
errors: make dial errors non-retriable
Browse files Browse the repository at this point in the history
Although we can continue retrying dialing a lot of the time, dial errors
outright are often not retriable. We split dial errors from
isRetriableBrokerErr.

We now:
- fail quicker in client.Request for dial errors
- update metadata if producing fails due to dial errors

We continue:
- to swallow dial errors when consuming on epoch / list load errors
- to swallow dial errors when AddPartitionsToTxn fails
  • Loading branch information
twmb committed Oct 6, 2021
1 parent 1667757 commit f494301
Show file tree
Hide file tree
Showing 3 changed files with 17 additions and 8 deletions.
2 changes: 1 addition & 1 deletion pkg/kgo/consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -1339,7 +1339,7 @@ func (s *consumerSession) handleListOrEpochResults(loaded loadedOffsets) (reload

default: // from ErrorCode in a response
reloads.addLoad(load.topic, load.partition, loaded.loadType, load.request)
if !kerr.IsRetriable(load.err) && !isRetriableBrokerErr(load.err) { // non-retriable response error; signal such in a response
if !kerr.IsRetriable(load.err) && !isRetriableBrokerErr(load.err) && !isDialErr(load.err) { // non-retriable response error; signal such in a response
s.c.addFakeReadyForDraining(load.topic, load.partition, load.err)
}

Expand Down
13 changes: 10 additions & 3 deletions pkg/kgo/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,9 +40,11 @@ func isRetriableBrokerErr(err error) bool {
// We favor testing os.SyscallError first, because net.OpError _always_
// implements Temporary, so if we test that first, it'll return false
// in many cases when we want to return true from os.SyscallError.
var se *os.SyscallError
if errors.As(err, &se) {
return true
if se := (*os.SyscallError)(nil); errors.As(err, &se) {
// If a dial fails, potentially we could retry if the resolver
// had a temporary hiccup, but we will err on the side of this
// being a slightly less temporary error.
return !isDialErr(err)
}
// EOF can be returned if a broker kills a connection unexpectedly, and
// we can retry that. Same for ErrClosed.
Expand All @@ -66,6 +68,11 @@ func isRetriableBrokerErr(err error) bool {
return false
}

func isDialErr(err error) bool {
var ne *net.OpError
return errors.As(err, &ne) && ne.Op == "dial"
}

func isSkippableBrokerErr(err error) bool {
// Some broker errors are not retriable for the given broker itself,
// but we *could* skip the broker and try again on the next broker. For
Expand Down
10 changes: 6 additions & 4 deletions pkg/kgo/sink.go
Original file line number Diff line number Diff line change
Expand Up @@ -311,7 +311,7 @@ func (s *sink) produce(sem <-chan struct{}) bool {
// it was set on, since producer id recovery resets the flag.
if err := s.doTxnReq(req, txnReq); err != nil {
switch {
case isRetriableBrokerErr(err):
case isRetriableBrokerErr(err) || isDialErr(err):
s.cl.bumpRepeatedLoadErr(err)
s.cl.cfg.logger.Log(LogLevelWarn, "unable to AddPartitionsToTxn due to retriable broker err, bumping client's buffered record load errors by 1 and retrying", "err", err)
return moreToDrain || len(req.batches) > 0
Expand Down Expand Up @@ -504,10 +504,12 @@ func (s *sink) handleReqClientErr(req *produceRequest, err error) {

case err == errChosenBrokerDead,
err == errUnknownBroker,
isDialErr(err),
isRetriableBrokerErr(err):
// A dead / unknown broker means the broker may have migrated,
// so we retry to force a metadata reload.
updateMeta := err == errUnknownBroker
// so we retry to force a metadata reload. As well, if this is
// not a retriable broker error, we should reload.
updateMeta := err == errUnknownBroker || !isRetriableBrokerErr(err)
s.handleRetryBatches(req.batches, req.backoffSeq, updateMeta, false)

case err == ErrClientClosed:
Expand Down Expand Up @@ -1162,7 +1164,7 @@ func (recBuf *recBuf) bumpRepeatedLoadErr(err error) {
batch0 := recBuf.batches[0]
batch0.tries++
failErr := batch0.maybeFailErr(&recBuf.cl.cfg)
if (!recBuf.cl.idempotent() || batch0.canFailFromLoadErrs) && (failErr != nil || !isRetriableBrokerErr(err) && !kerr.IsRetriable(err)) {
if (!recBuf.cl.idempotent() || batch0.canFailFromLoadErrs) && (failErr != nil || !isRetriableBrokerErr(err) && !isDialErr(err) && !kerr.IsRetriable(err)) {
recBuf.failAllRecords(err)
}
}
Expand Down

0 comments on commit f494301

Please sign in to comment.