diff --git a/pkg/kgo/client.go b/pkg/kgo/client.go index 2d70a157..3dd2c3bc 100644 --- a/pkg/kgo/client.go +++ b/pkg/kgo/client.go @@ -810,11 +810,27 @@ type retriable struct { // that can fail / do not need to retry forever. limitRetries int - // parseRetryErr, if non-nil, can parse a retriable error out of the - // response and return it. This error is *not* returned from the - // request if the req cannot be retried due to timeout or retry limits, - // but it *can* allow a retry if neither limit is hit yet. - parseRetryErr func(kmsg.Response) error + // parseRetryErr, if non-nil, can delete stale cached brokers. We do + // *not* return the error from this function to the caller, but we do + // use it to potentially retry. It is not necessary, but also not + // harmful, to return the input error. + parseRetryErr func(kmsg.Response, error) error +} + +type failDial struct{ fails int8 } + +// The controller and group/txn coordinators are cached. If dialing the broker +// repeatedly fails, we need to forget our cache to force a re-load: the broker +// may have completely died. +func (d *failDial) isRepeatedDialFail(err error) bool { + if isDialErr(err) { + d.fails++ + if d.fails == 3 { + d.fails = 0 + return true + } + } + return false } func (r *retriable) Request(ctx context.Context, req kmsg.Request) (kmsg.Response, error) { @@ -831,8 +847,8 @@ start: var retryErr error if err == nil { resp, err = r.last.waitResp(ctx, req) - if err == nil && r.parseRetryErr != nil { - retryErr = r.parseRetryErr(resp) + if r.parseRetryErr != nil { + retryErr = r.parseRetryErr(resp, err) } } @@ -1098,7 +1114,6 @@ func (cl *Client) controller(ctx context.Context) (*broker, error) { func (cl *Client) forgetControllerID(id int32) { cl.controllerIDMu.Lock() defer cl.controllerIDMu.Unlock() - if cl.controllerID == id { cl.controllerID = unknownControllerID } @@ -1288,18 +1303,21 @@ func (cl *Client) maybeDeleteStaleCoordinator(name string, typ int8, err error) case errors.Is(err, kerr.CoordinatorNotAvailable), errors.Is(err, kerr.CoordinatorLoadInProgress), errors.Is(err, kerr.NotCoordinator): - - cl.coordinatorsMu.Lock() - delete(cl.coordinators, coordinatorKey{ - name: name, - typ: typ, - }) - cl.coordinatorsMu.Unlock() + cl.deleteStaleCoordinator(name, typ) return true } return false } +func (cl *Client) deleteStaleCoordinator(name string, typ int8) { + cl.coordinatorsMu.Lock() + defer cl.coordinatorsMu.Unlock() + delete(cl.coordinators, coordinatorKey{ + name: name, + typ: typ, + }) +} + type brokerOrErr struct { b *broker err error @@ -1325,7 +1343,14 @@ func (cl *Client) handleAdminReq(ctx context.Context, req kmsg.Request) Response cl.maybeDeleteMappedMetadata(topics...) } - r.parseRetryErr = func(resp kmsg.Response) error { + var d failDial + r.parseRetryErr = func(resp kmsg.Response, err error) error { + if err != nil { + if d.isRepeatedDialFail(err) { + cl.forgetControllerID(r.last.meta.NodeID) + } + return err + } var code int16 switch t := resp.(type) { case *kmsg.CreateTopicsResponse: @@ -1455,7 +1480,14 @@ func (cl *Client) handleReqWithCoordinator( req kmsg.Request, ) (*broker, kmsg.Response, error) { r := cl.retriableBrokerFn(coordinator) - r.parseRetryErr = func(resp kmsg.Response) error { + var d failDial + r.parseRetryErr = func(resp kmsg.Response, err error) error { + if err != nil { + if d.isRepeatedDialFail(err) { + cl.deleteStaleCoordinator(name, typ) + } + return err + } var code int16 switch t := resp.(type) { // TXN