Skip to content

Commit

Permalink
broker: hide retryable errors *once*
Browse files Browse the repository at this point in the history
  • Loading branch information
twmb committed Nov 29, 2022
1 parent 20dd08d commit bc6810d
Showing 1 changed file with 9 additions and 2 deletions.
11 changes: 9 additions & 2 deletions pkg/kgo/broker.go
Original file line number Diff line number Diff line change
Expand Up @@ -287,6 +287,13 @@ start:
{
var err error
if cxn, err = b.loadConnection(pr.ctx, req); err != nil {
// It is rare, but it is possible that the broker has
// an immediate issue on a new connection. We retry
// once.
if isRetriableBrokerErr(err) && !retriedOnNewConnection {
retriedOnNewConnection = true
goto start
}
pr.promise(nil, err)
return
}
Expand Down Expand Up @@ -664,7 +671,7 @@ func (cxn *brokerCxn) init(isProduceCxn bool) error {
if !hasVersions {
if cxn.b.cl.cfg.maxVersions == nil || cxn.b.cl.cfg.maxVersions.HasKey(18) {
if err := cxn.requestAPIVersions(); err != nil {
if !errors.Is(err, ErrClientClosed) {
if !errors.Is(err, ErrClientClosed) && !isRetriableBrokerErr(err) {
cxn.cl.cfg.logger.Log(LogLevelError, "unable to request api versions", "broker", logID(cxn.b.meta.NodeID), "err", err)
}
return err
Expand All @@ -677,7 +684,7 @@ func (cxn *brokerCxn) init(isProduceCxn bool) error {
}

if err := cxn.sasl(); err != nil {
if !errors.Is(err, ErrClientClosed) {
if !errors.Is(err, ErrClientClosed) && !isRetriableBrokerErr(err) {
cxn.cl.cfg.logger.Log(LogLevelError, "unable to initialize sasl", "broker", logID(cxn.b.meta.NodeID), "err", err)
}
return err
Expand Down

0 comments on commit bc6810d

Please sign in to comment.