Skip to content

Commit

Permalink
errUnknownBroker: make retriable in sink, skippable in client
Browse files Browse the repository at this point in the history
A broker can become unknown if it is no longer returned from metadata.
For sinks, this means metadata should be refreshed. For general
requesting, the error is retriable if skipping to a different broker.
  • Loading branch information
twmb committed Aug 19, 2021
1 parent ec0c992 commit 0fee54c
Show file tree
Hide file tree
Showing 2 changed files with 11 additions and 7 deletions.
11 changes: 7 additions & 4 deletions pkg/kgo/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,9 @@ func isSkippableBrokerErr(err error) bool {
//
// We take anything that returns an OpError that *is not* a context
// error deep inside.
if err == errUnknownBroker {
return true
}
var ne *net.OpError
if errors.As(err, &ne) &&
!errors.Is(err, context.Canceled) &&
Expand All @@ -70,6 +73,10 @@ var (
// INTERNAL // -- when used multiple times or checked in different areas of the client
//////////////

// Returned when issuing a request to a broker that the client does not
// know about (maybe missing from metadata responses now).
errUnknownBroker = errors.New("unknown broker")

// A temporary error returned when a broker chosen for a request is
// stopped due to a concurrent metadata response.
errChosenBrokerDead = errors.New("the internal broker struct chosen to issue this requesthas died--either the broker id is migrating or no longer exists")
Expand Down Expand Up @@ -101,10 +108,6 @@ var (
// Returned when trying to produce a record outside of a transaction.
errNotInTransaction = errors.New("cannot produce record transactionally if not in a transaction")

// Returned when issuing a request to a broker that the client does not
// know about.
errUnknownBroker = errors.New("unknown broker")

// Returned when records are unable to be produced and they hit the
// configured record timeout limit.
errRecordTimeout = errors.New("records have timed out before they were able to be produced")
Expand Down
7 changes: 4 additions & 3 deletions pkg/kgo/sink.go
Original file line number Diff line number Diff line change
Expand Up @@ -504,9 +504,10 @@ func (s *sink) firstRespCheck(idempotent bool, version int16) {
// produce response.
func (s *sink) handleReqClientErr(req *produceRequest, err error) {
switch {
case err == errChosenBrokerDead:
// A dead broker means the broker may have migrated, so we
// retry to force a metadata reload.
case err == errChosenBrokerDead,
err == errUnknownBroker:
// A dead / unknown broker means the broker may have migrated,
// so we retry to force a metadata reload.
s.handleRetryBatches(req.batches, req.backoffSeq, false, false)

case err == ErrClientClosed:
Expand Down

0 comments on commit 0fee54c

Please sign in to comment.