diff --git a/admin.go b/admin.go index a88fe6b06..1d58bc68f 100644 --- a/admin.go +++ b/admin.go @@ -6,8 +6,10 @@ import ( "io" "maps" "math/rand" + "net" "strconv" "sync" + "syscall" "time" ) @@ -211,16 +213,23 @@ func (ca *clusterAdmin) refreshController() (*Broker, error) { } // isRetriableControllerError returns `true` if the given error type unwraps to -// an `ErrNotController` or `EOF` response from Kafka +// an `ErrNotController`, `EOF` response from Kafka, or a network-level error. func isRetriableControllerError(err error) bool { - return errors.Is(err, ErrNotController) || errors.Is(err, io.EOF) + return errors.Is(err, ErrNotController) || errors.Is(err, io.EOF) || isNetworkError(err) } // isRetriableGroupCoordinatorError returns `true` if the given error type // unwraps to an `ErrNotCoordinatorForConsumer`, -// `ErrConsumerCoordinatorNotAvailable` or `EOF` response from Kafka +// `ErrConsumerCoordinatorNotAvailable`, `EOF` response from Kafka, or a network-level error func isRetriableGroupCoordinatorError(err error) bool { - return errors.Is(err, ErrNotCoordinatorForConsumer) || errors.Is(err, ErrConsumerCoordinatorNotAvailable) || errors.Is(err, io.EOF) + return errors.Is(err, ErrNotCoordinatorForConsumer) || errors.Is(err, ErrConsumerCoordinatorNotAvailable) || errors.Is(err, io.EOF) || isNetworkError(err) +} + +// isNetworkError categorizes transient network failures that should trigger +// a reconnect/retry path (e.g. broken pipe, connection reset/timeout). +func isNetworkError(err error) bool { + var ne net.Error + return errors.Is(err, io.ErrUnexpectedEOF) || errors.Is(err, syscall.EPIPE) || errors.As(err, &ne) } // retryOnError will repeatedly call the given (error-returning) func in the @@ -234,6 +243,13 @@ func (ca *clusterAdmin) retryOnError(retryable func(error) bool, fn func() error if err == nil || attemptsRemaining <= 0 || !retryable(err) { return err } + if isNetworkError(err) { + // Close the cached controller connection so the next attempt will reopen it. + // Ref: https://github.com/IBM/sarama/issues/1162 — admin calls can get stuck on a broken controller connection. + if ctrl, e := ca.client.Controller(); e == nil { + _ = ctrl.Close() + } + } Logger.Printf( "admin/request retrying after %dms... (%d attempts remaining)\n", ca.conf.Admin.Retry.Backoff/time.Millisecond, attemptsRemaining)