From bc6810dde25cde27d6c442afc5380cfcfa844a4d Mon Sep 17 00:00:00 2001 From: Travis Bischel Date: Mon, 28 Nov 2022 21:12:51 -0700 Subject: [PATCH] broker: hide retryable errors *once* --- pkg/kgo/broker.go | 11 +++++++++-- 1 file changed, 9 insertions(+), 2 deletions(-) diff --git a/pkg/kgo/broker.go b/pkg/kgo/broker.go index 292c8973..d26c0aeb 100644 --- a/pkg/kgo/broker.go +++ b/pkg/kgo/broker.go @@ -287,6 +287,13 @@ start: { var err error if cxn, err = b.loadConnection(pr.ctx, req); err != nil { + // It is rare, but it is possible that the broker has + // an immediate issue on a new connection. We retry + // once. + if isRetriableBrokerErr(err) && !retriedOnNewConnection { + retriedOnNewConnection = true + goto start + } pr.promise(nil, err) return } @@ -664,7 +671,7 @@ func (cxn *brokerCxn) init(isProduceCxn bool) error { if !hasVersions { if cxn.b.cl.cfg.maxVersions == nil || cxn.b.cl.cfg.maxVersions.HasKey(18) { if err := cxn.requestAPIVersions(); err != nil { - if !errors.Is(err, ErrClientClosed) { + if !errors.Is(err, ErrClientClosed) && !isRetriableBrokerErr(err) { cxn.cl.cfg.logger.Log(LogLevelError, "unable to request api versions", "broker", logID(cxn.b.meta.NodeID), "err", err) } return err @@ -677,7 +684,7 @@ func (cxn *brokerCxn) init(isProduceCxn bool) error { } if err := cxn.sasl(); err != nil { - if !errors.Is(err, ErrClientClosed) { + if !errors.Is(err, ErrClientClosed) && !isRetriableBrokerErr(err) { cxn.cl.cfg.logger.Log(LogLevelError, "unable to initialize sasl", "broker", logID(cxn.b.meta.NodeID), "err", err) } return err