Skip to content

Commit 9dae366

Browse files
committed
kgo: allow retries on dial timeouts
I saw a dial failing recently because the OS was very overloaded and then slow -- dial timeouts are fine. Every other dial failure indicates a bigger problem.
1 parent 00e4e76 commit 9dae366

File tree

4 files changed

+12
-7
lines changed

4 files changed

+12
-7
lines changed

pkg/kgo/client.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -1122,7 +1122,7 @@ type failDial struct{ fails int8 }
11221122
// repeatedly fails, we need to forget our cache to force a re-load: the broker
11231123
// may have completely died.
11241124
func (d *failDial) isRepeatedDialFail(err error) bool {
1125-
if isDialErr(err) {
1125+
if isAnyDialErr(err) {
11261126
d.fails++
11271127
if d.fails == 3 {
11281128
d.fails = 0

pkg/kgo/consumer.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -1780,7 +1780,7 @@ func (s *consumerSession) handleListOrEpochResults(loaded loadedOffsets) (reload
17801780

17811781
default: // from ErrorCode in a response, or broker request err, or request is canceled as our session is ending
17821782
reloads.addLoad(load.topic, load.partition, loaded.loadType, load.request)
1783-
if !kerr.IsRetriable(load.err) && !isRetryableBrokerErr(load.err) && !isDialErr(load.err) && !isContextErr(load.err) { // non-retryable response error; signal such in a response
1783+
if !kerr.IsRetriable(load.err) && !isRetryableBrokerErr(load.err) && !isDialNonTimeoutErr(load.err) && !isContextErr(load.err) { // non-retryable response error; signal such in a response
17841784
s.c.addFakeReadyForDraining(load.topic, load.partition, load.err, fmt.Sprintf("notification of non-retryable error from %s request", loaded.loadType))
17851785
}
17861786

pkg/kgo/errors.go

+7-2
Original file line numberDiff line numberDiff line change
@@ -44,7 +44,7 @@ func isRetryableBrokerErr(err error) bool {
4444
// If a dial fails, potentially we could retry if the resolver
4545
// had a temporary hiccup, but we will err on the side of this
4646
// being a slightly less temporary error.
47-
return !isDialErr(err)
47+
return !isDialNonTimeoutErr(err)
4848
}
4949
// EOF can be returned if a broker kills a connection unexpectedly, and
5050
// we can retry that. Same for ErrClosed.
@@ -88,7 +88,12 @@ func isRetryableBrokerErr(err error) bool {
8888
return false
8989
}
9090

91-
func isDialErr(err error) bool {
91+
func isDialNonTimeoutErr(err error) bool {
92+
var ne *net.OpError
93+
return errors.As(err, &ne) && ne.Op == "dial" && !ne.Timeout()
94+
}
95+
96+
func isAnyDialErr(err error) bool {
9297
var ne *net.OpError
9398
return errors.As(err, &ne) && ne.Op == "dial"
9499
}

pkg/kgo/sink.go

+3-3
Original file line numberDiff line numberDiff line change
@@ -326,7 +326,7 @@ func (s *sink) produce(sem <-chan struct{}) bool {
326326
batchesStripped, err := s.doTxnReq(req, txnReq)
327327
if err != nil {
328328
switch {
329-
case isRetryableBrokerErr(err) || isDialErr(err):
329+
case isRetryableBrokerErr(err) || isDialNonTimeoutErr(err):
330330
s.cl.bumpRepeatedLoadErr(err)
331331
s.cl.cfg.logger.Log(LogLevelWarn, "unable to AddPartitionsToTxn due to retryable broker err, bumping client's buffered record load errors by 1 and retrying", "err", err)
332332
s.cl.triggerUpdateMetadata(false, "attempting to refresh broker list due to failed AddPartitionsToTxn requests")
@@ -532,7 +532,7 @@ func (s *sink) handleReqClientErr(req *produceRequest, err error) {
532532
fallthrough
533533

534534
case errors.Is(err, errUnknownBroker),
535-
isDialErr(err),
535+
isDialNonTimeoutErr(err),
536536
isRetryableBrokerErr(err):
537537
updateMeta := !isRetryableBrokerErr(err)
538538
if updateMeta {
@@ -1269,7 +1269,7 @@ func (recBuf *recBuf) bumpRepeatedLoadErr(err error) {
12691269
var (
12701270
canFail = !recBuf.cl.idempotent() || batch0.canFailFromLoadErrs // we can only fail if we are not idempotent or if we have no outstanding requests
12711271
batch0Fail = batch0.maybeFailErr(&recBuf.cl.cfg) != nil // timeout, retries, or aborting
1272-
netErr = isRetryableBrokerErr(err) || isDialErr(err) // we can fail if this is *not* a network error
1272+
netErr = isRetryableBrokerErr(err) || isDialNonTimeoutErr(err) // we can fail if this is *not* a network error
12731273
retryableKerr = kerr.IsRetriable(err) // we fail if this is not a retryable kerr,
12741274
isUnknownLimit = recBuf.checkUnknownFailLimit(err) // or if it is, but it is UnknownTopicOrPartition and we are at our limit
12751275

0 commit comments

Comments
 (0)