Skip to content
Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 20 additions & 4 deletions admin.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,10 @@ import (
"io"
"maps"
"math/rand"
"net"
"strconv"
"sync"
"syscall"
"time"
)

Expand Down Expand Up @@ -211,16 +213,23 @@ func (ca *clusterAdmin) refreshController() (*Broker, error) {
}

// isRetriableControllerError returns `true` if the given error type unwraps to
// an `ErrNotController` or `EOF` response from Kafka
// an `ErrNotController`, `EOF` response from Kafka, or a network-level error.
func isRetriableControllerError(err error) bool {
return errors.Is(err, ErrNotController) || errors.Is(err, io.EOF)
return errors.Is(err, ErrNotController) || errors.Is(err, io.EOF) || isNetworkError(err)
}

// isRetriableGroupCoordinatorError returns `true` if the given error type
// unwraps to an `ErrNotCoordinatorForConsumer`,
// `ErrConsumerCoordinatorNotAvailable` or `EOF` response from Kafka
// `ErrConsumerCoordinatorNotAvailable`, `EOF` response from Kafka, or a network-level error
func isRetriableGroupCoordinatorError(err error) bool {
return errors.Is(err, ErrNotCoordinatorForConsumer) || errors.Is(err, ErrConsumerCoordinatorNotAvailable) || errors.Is(err, io.EOF)
return errors.Is(err, ErrNotCoordinatorForConsumer) || errors.Is(err, ErrConsumerCoordinatorNotAvailable) || errors.Is(err, io.EOF) || isNetworkError(err)
}

// isNetworkError categorizes transient network failures that should trigger
// a reconnect/retry path (e.g. broken pipe, connection reset/timeout).
func isNetworkError(err error) bool {
var ne net.Error
return errors.Is(err, io.ErrUnexpectedEOF) || errors.Is(err, syscall.EPIPE) || errors.As(err, &ne)
}

// retryOnError will repeatedly call the given (error-returning) func in the
Expand All @@ -234,6 +243,13 @@ func (ca *clusterAdmin) retryOnError(retryable func(error) bool, fn func() error
if err == nil || attemptsRemaining <= 0 || !retryable(err) {
return err
}
if isNetworkError(err) {
// Close the cached controller connection so the next attempt will reopen it.
// Ref: https://github.com/IBM/sarama/issues/1162 — admin calls can get stuck on a broken controller connection.
if ctrl, e := ca.client.Controller(); e == nil {
_ = ctrl.Close()
}
}
Logger.Printf(
"admin/request retrying after %dms... (%d attempts remaining)\n",
ca.conf.Admin.Retry.Backoff/time.Millisecond, attemptsRemaining)
Expand Down