From 0fee54ca97e586ce252a1e8ade18ef98927d764a Mon Sep 17 00:00:00 2001 From: Travis Bischel Date: Wed, 18 Aug 2021 20:35:04 -0600 Subject: [PATCH] errUnknownBroker: make retriable in sink, skippable in client A broker can become unknown if it is no longer returned from metadata. For sinks, this means metadata should be refreshed. For general requesting, the error is retriable if skipping to a different broker. --- pkg/kgo/errors.go | 11 +++++++---- pkg/kgo/sink.go | 7 ++++--- 2 files changed, 11 insertions(+), 7 deletions(-) diff --git a/pkg/kgo/errors.go b/pkg/kgo/errors.go index 9e9397a7..6622e9b7 100644 --- a/pkg/kgo/errors.go +++ b/pkg/kgo/errors.go @@ -56,6 +56,9 @@ func isSkippableBrokerErr(err error) bool { // // We take anything that returns an OpError that *is not* a context // error deep inside. + if err == errUnknownBroker { + return true + } var ne *net.OpError if errors.As(err, &ne) && !errors.Is(err, context.Canceled) && @@ -70,6 +73,10 @@ var ( // INTERNAL // -- when used multiple times or checked in different areas of the client ////////////// + // Returned when issuing a request to a broker that the client does not + // know about (maybe missing from metadata responses now). + errUnknownBroker = errors.New("unknown broker") + // A temporary error returned when a broker chosen for a request is // stopped due to a concurrent metadata response. errChosenBrokerDead = errors.New("the internal broker struct chosen to issue this requesthas died--either the broker id is migrating or no longer exists") @@ -101,10 +108,6 @@ var ( // Returned when trying to produce a record outside of a transaction. errNotInTransaction = errors.New("cannot produce record transactionally if not in a transaction") - // Returned when issuing a request to a broker that the client does not - // know about. - errUnknownBroker = errors.New("unknown broker") - // Returned when records are unable to be produced and they hit the // configured record timeout limit. errRecordTimeout = errors.New("records have timed out before they were able to be produced") diff --git a/pkg/kgo/sink.go b/pkg/kgo/sink.go index 5f53dac7..d8f68fe6 100644 --- a/pkg/kgo/sink.go +++ b/pkg/kgo/sink.go @@ -504,9 +504,10 @@ func (s *sink) firstRespCheck(idempotent bool, version int16) { // produce response. func (s *sink) handleReqClientErr(req *produceRequest, err error) { switch { - case err == errChosenBrokerDead: - // A dead broker means the broker may have migrated, so we - // retry to force a metadata reload. + case err == errChosenBrokerDead, + err == errUnknownBroker: + // A dead / unknown broker means the broker may have migrated, + // so we retry to force a metadata reload. s.handleRetryBatches(req.batches, req.backoffSeq, false, false) case err == ErrClientClosed: