diff --git a/pkg/kgo/broker.go b/pkg/kgo/broker.go index 3667be01..292c8973 100644 --- a/pkg/kgo/broker.go +++ b/pkg/kgo/broker.go @@ -282,6 +282,8 @@ start: func (b *broker) handleReq(pr promisedReq) { req := pr.req var cxn *brokerCxn + var retriedOnNewConnection bool +start: { var err error if cxn, err = b.loadConnection(pr.ctx, req); err != nil { @@ -353,11 +355,22 @@ func (b *broker) handleReq(pr promisedReq) { // If we are after the reauth time, try to reauth. We // can only have an expiry if we went the authenticate // flow, so we know we are authenticating again. + // + // Some implementations (AWS) occasionally fail for + // unclear reasons (principals change, somehow). If + // we receive SASL_AUTHENTICATION_FAILED, we retry + // once on a new connection. See #249. + // // For KIP-368. cxn.cl.cfg.logger.Log(LogLevelDebug, "sasl expiry limit reached, reauthenticating", "broker", logID(cxn.b.meta.NodeID)) if err := cxn.sasl(); err != nil { - pr.promise(nil, err) cxn.die() + if errors.Is(err, kerr.SaslAuthenticationFailed) && !retriedOnNewConnection { + cxn.cl.cfg.logger.Log(LogLevelDebug, "sasl reauth failed, retrying once on new connection", "broker", logID(cxn.b.meta.NodeID), "err", err) + retriedOnNewConnection = true + goto start + } + pr.promise(nil, err) return } }