Skip to content

Commit

Permalink
all: switch all docs, all non-public APIs from retriable to retryable
Browse files Browse the repository at this point in the history
Retryable is more common and after seeing it a lot, I've come to prefer
it. Unfortunately, the public APIs cannot be switched. They can be
deprecated but it isn't really worth it. There are three public APIs
that cannot change:

func kerr.IsRetriable
field kerr.Error.Retriable
func kgo.Broker.RetriableRequest
  • Loading branch information
twmb committed Mar 8, 2023
1 parent db50e8d commit 335cd47
Show file tree
Hide file tree
Showing 14 changed files with 87 additions and 100 deletions.
2 changes: 1 addition & 1 deletion generate/definitions/09_offset_fetch
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ OffsetFetchRequest => key 9, max version 8, flexible v6+, group coordinator
Topic: string
Partitions: [int32]
// RequireStable signifies whether the broker should wait on returning
// unstable offsets, instead setting a retriable error on the relevant
// unstable offsets, instead setting a retryable error on the relevant
// unstable partitions (UNSTABLE_OFFSET_COMMIT). See KIP-447 for more
// details.
RequireStable: bool // v7+
Expand Down
6 changes: 3 additions & 3 deletions pkg/kgo/broker.go
Original file line number Diff line number Diff line change
Expand Up @@ -290,7 +290,7 @@ start:
// It is rare, but it is possible that the broker has
// an immediate issue on a new connection. We retry
// once.
if isRetriableBrokerErr(err) && !retriedOnNewConnection {
if isRetryableBrokerErr(err) && !retriedOnNewConnection {
retriedOnNewConnection = true
goto start
}
Expand Down Expand Up @@ -672,7 +672,7 @@ func (cxn *brokerCxn) init(isProduceCxn bool) error {
if !hasVersions {
if cxn.b.cl.cfg.maxVersions == nil || cxn.b.cl.cfg.maxVersions.HasKey(18) {
if err := cxn.requestAPIVersions(); err != nil {
if !errors.Is(err, ErrClientClosed) && !isRetriableBrokerErr(err) {
if !errors.Is(err, ErrClientClosed) && !isRetryableBrokerErr(err) {
cxn.cl.cfg.logger.Log(LogLevelError, "unable to request api versions", "broker", logID(cxn.b.meta.NodeID), "err", err)
}
return err
Expand All @@ -685,7 +685,7 @@ func (cxn *brokerCxn) init(isProduceCxn bool) error {
}

if err := cxn.sasl(); err != nil {
if !errors.Is(err, ErrClientClosed) && !isRetriableBrokerErr(err) {
if !errors.Is(err, ErrClientClosed) && !isRetryableBrokerErr(err) {
cxn.cl.cfg.logger.Log(LogLevelError, "unable to initialize sasl", "broker", logID(cxn.b.meta.NodeID), "err", err)
}
return err
Expand Down
60 changes: 30 additions & 30 deletions pkg/kgo/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -579,7 +579,7 @@ func (cl *Client) fetchMetadataForTopics(ctx context.Context, all bool, topics [
}

func (cl *Client) fetchMetadata(ctx context.Context, req *kmsg.MetadataRequest, limitRetries bool) (*broker, *kmsg.MetadataResponse, error) {
r := cl.retriable()
r := cl.retryable()

// We limit retries for internal metadata refreshes, because these do
// not need to retry forever and are usually blocking *other* requests.
Expand Down Expand Up @@ -731,7 +731,7 @@ func (cl *Client) Close() {
}

// Request issues a request to Kafka, waiting for and returning the response.
// If a retriable network error occurs, or if a retriable group / transaction
// If a retryable network error occurs, or if a retryable group / transaction
// coordinator error occurs, the request is retried. All other errors are
// returned.
//
Expand Down Expand Up @@ -799,23 +799,23 @@ func (cl *Client) Request(ctx context.Context, req kmsg.Request) (kmsg.Response,
return merge(resps)
}

func (cl *Client) retriable() *retriable {
return cl.retriableBrokerFn(func() (*broker, error) { return cl.broker(), nil })
func (cl *Client) retryable() *retryable {
return cl.retryableBrokerFn(func() (*broker, error) { return cl.broker(), nil })
}

func (cl *Client) retriableBrokerFn(fn func() (*broker, error)) *retriable {
return &retriable{cl: cl, br: fn}
func (cl *Client) retryableBrokerFn(fn func() (*broker, error)) *retryable {
return &retryable{cl: cl, br: fn}
}

func (cl *Client) shouldRetry(tries int, err error) bool {
return (kerr.IsRetriable(err) || isRetriableBrokerErr(err)) && int64(tries) < cl.cfg.retries
return (kerr.IsRetriable(err) || isRetryableBrokerErr(err)) && int64(tries) < cl.cfg.retries
}

func (cl *Client) shouldRetryNext(tries int, err error) bool {
return isSkippableBrokerErr(err) && int64(tries) < cl.cfg.retries
}

type retriable struct {
type retryable struct {
cl *Client
br func() (*broker, error)
last *broker
Expand Down Expand Up @@ -848,7 +848,7 @@ func (d *failDial) isRepeatedDialFail(err error) bool {
return false
}

func (r *retriable) Request(ctx context.Context, req kmsg.Request) (kmsg.Response, error) {
func (r *retryable) Request(ctx context.Context, req kmsg.Request) (kmsg.Response, error) {
tries := 0
tryStart := time.Now()
retryTimeout := r.cl.cfg.retryTimeout(req.Key())
Expand All @@ -871,8 +871,8 @@ start:
if r.limitRetries == 0 || tries < r.limitRetries {
backoff := r.cl.cfg.retryBackoff(tries)
if retryTimeout == 0 || time.Now().Add(backoff).Sub(tryStart) <= retryTimeout {
// If this broker / request had a retriable error, we can
// just retry now. If the error is *not* retriable but
// If this broker / request had a retryable error, we can
// just retry now. If the error is *not* retryable but
// is a broker-specific network error, and the next
// broker is different than the current, we also retry.
if r.cl.shouldRetry(tries, err) || r.cl.shouldRetry(tries, retryErr) {
Expand Down Expand Up @@ -1033,8 +1033,8 @@ func (cl *Client) shardedRequest(ctx context.Context, req kmsg.Request) ([]Respo
}

// All other requests not handled above can be issued to any broker
// with the default retriable logic.
r := cl.retriable()
// with the default retryable logic.
r := cl.retryable()
resp, err := r.Request(ctx, req)
return shards(shard(r.last, req, resp, err)), nil
}
Expand Down Expand Up @@ -1346,7 +1346,7 @@ func (cl *Client) handleAdminReq(ctx context.Context, req kmsg.Request) Response
// Loading a controller can perform some wait; we accept that and do
// not account for the retries or the time to load the controller as
// part of the retries / time to issue the req.
r := cl.retriableBrokerFn(func() (*broker, error) {
r := cl.retryableBrokerFn(func() (*broker, error) {
return cl.controller(ctx)
})

Expand Down Expand Up @@ -1442,7 +1442,7 @@ func (cl *Client) handleCoordinatorReq(ctx context.Context, req kmsg.Request) Re
}
// InitProducerID can go to any broker if the transactional ID
// is nil. By using handleReqWithCoordinator, we get the
// retriable-error parsing, even though we are not actually
// retryable-error parsing, even though we are not actually
// using a defined txn coordinator. This is fine; by passing no
// names, we delete no coordinator.
coordinator, resp, err := cl.handleReqWithCoordinator(ctx, func() (*broker, error) { return cl.broker(), nil }, coordinatorTypeTxn, "", req)
Expand Down Expand Up @@ -1478,7 +1478,7 @@ func (cl *Client) handleCoordinatorReq(ctx context.Context, req kmsg.Request) Re
// handleCoordinatorReqSimple issues a request that contains a single group or
// txn to its coordinator.
//
// The error is inspected to see if it is a retriable error and, if so, the
// The error is inspected to see if it is a retryable error and, if so, the
// coordinator is deleted.
func (cl *Client) handleCoordinatorReqSimple(ctx context.Context, typ int8, name string, req kmsg.Request) ResponseShard {
coordinator, resp, err := cl.handleReqWithCoordinator(ctx, func() (*broker, error) {
Expand All @@ -1498,7 +1498,7 @@ func (cl *Client) handleReqWithCoordinator(
name string, // group ID or the transactional id
req kmsg.Request,
) (*broker, kmsg.Response, error) {
r := cl.retriableBrokerFn(coordinator)
r := cl.retryableBrokerFn(coordinator)
var d failDial
r.parseRetryErr = func(resp kmsg.Response, err error) error {
if err != nil {
Expand Down Expand Up @@ -1647,7 +1647,7 @@ func (b *Broker) Request(ctx context.Context, req kmsg.Request) (kmsg.Response,
}

// RetriableRequest issues a request to a broker the same as Broker, but
// retries in the face of retriable broker connection errors. This does not
// retries in the face of retryable broker connection errors. This does not
// retry on response internal errors.
func (b *Broker) RetriableRequest(ctx context.Context, req kmsg.Request) (kmsg.Response, error) {
return b.request(ctx, true, req)
Expand All @@ -1670,7 +1670,7 @@ func (b *Broker) request(ctx context.Context, retry bool, req kmsg.Request) (kms
resp, err = br.waitResp(ctx, req)
}
} else {
resp, err = b.cl.retriableBrokerFn(func() (*broker, error) {
resp, err = b.cl.retryableBrokerFn(func() (*broker, error) {
return b.cl.brokerOrErr(ctx, b.id, errUnknownBroker)
}).Request(ctx, req)
}
Expand Down Expand Up @@ -1713,7 +1713,7 @@ type sharder interface {
// is some pre-loading that needs to happen. If an error is returned,
// the request that was intended for splitting is failed wholesale.
//
// Due to sharded requests not being retriable if a response is
// Due to sharded requests not being retryable if a response is
// received, to avoid stale coordinator errors, this function should
// not use any previously cached metadata.
//
Expand Down Expand Up @@ -1878,7 +1878,7 @@ func (cl *Client) handleShardedReq(ctx context.Context, req kmsg.Request) ([]Res

// If we failed to issue the request, we *maybe* will retry.
// We could have failed to even issue the request or receive
// a response, which is retriable.
// a response, which is retryable.
backoff := cl.cfg.retryBackoff(tries)
if err != nil &&
(retryTimeout == 0 || time.Now().Add(backoff).Sub(start) < retryTimeout) &&
Expand All @@ -1896,7 +1896,7 @@ func (cl *Client) handleShardedReq(ctx context.Context, req kmsg.Request) ([]Res
return
}

addShard(shard(broker, myUnderlyingReq, resp, err)) // the error was not retriable
addShard(shard(broker, myUnderlyingReq, resp, err)) // the error was not retryable
}()
}
}
Expand All @@ -1907,13 +1907,13 @@ func (cl *Client) handleShardedReq(ctx context.Context, req kmsg.Request) ([]Res
return shards, sharder.merge
}

// For sharded errors, we prefer to keep retriable errors rather than
// non-retriable errors. We keep the non-retriable if everything is
// non-retriable.
// For sharded errors, we prefer to keep retryable errors rather than
// non-retryable errors. We keep the non-retryable if everything is
// non-retryable.
//
// We favor retriable because retriable means we used a stale cache value; we
// We favor retryable because retryable means we used a stale cache value; we
// clear the stale entries on failure and the retry uses fresh data. The
// request will be split and remapped, and the non-retriable errors will be
// request will be split and remapped, and the non-retryable errors will be
// encountered again.
func onRespShardErr(err *error, newKerr error) {
if newKerr == nil || *err != nil && kerr.IsRetriable(*err) {
Expand Down Expand Up @@ -3175,7 +3175,7 @@ func (*describeConfigsSharder) shard(_ context.Context, kreq kmsg.Request, _ err
return issues, false, nil // this is not reshardable, but the any block can go anywhere
}

func (*describeConfigsSharder) onResp(kmsg.Request, kmsg.Response) error { return nil } // configs: topics not mapped, nothing retriable
func (*describeConfigsSharder) onResp(kmsg.Request, kmsg.Response) error { return nil } // configs: topics not mapped, nothing retryable

func (*describeConfigsSharder) merge(sresps []ResponseShard) (kmsg.Response, error) {
merged := kmsg.NewPtrDescribeConfigsResponse()
Expand Down Expand Up @@ -3238,7 +3238,7 @@ func (*alterConfigsSharder) shard(_ context.Context, kreq kmsg.Request, _ error)
return issues, false, nil // this is not reshardable, but the any block can go anywhere
}

func (*alterConfigsSharder) onResp(kmsg.Request, kmsg.Response) error { return nil } // configs: topics not mapped, nothing retriable
func (*alterConfigsSharder) onResp(kmsg.Request, kmsg.Response) error { return nil } // configs: topics not mapped, nothing retryable

func (*alterConfigsSharder) merge(sresps []ResponseShard) (kmsg.Response, error) {
merged := kmsg.NewPtrAlterConfigsResponse()
Expand Down Expand Up @@ -3647,7 +3647,7 @@ func (*incrementalAlterConfigsSharder) shard(_ context.Context, kreq kmsg.Reques
return issues, false, nil // this is not reshardable, but the any block can go anywhere
}

func (*incrementalAlterConfigsSharder) onResp(kmsg.Request, kmsg.Response) error { return nil } // configs: topics not mapped, nothing retriable
func (*incrementalAlterConfigsSharder) onResp(kmsg.Request, kmsg.Response) error { return nil } // configs: topics not mapped, nothing retryable

func (*incrementalAlterConfigsSharder) merge(sresps []ResponseShard) (kmsg.Response, error) {
merged := kmsg.NewPtrIncrementalAlterConfigsResponse()
Expand Down
2 changes: 1 addition & 1 deletion pkg/kgo/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -670,7 +670,7 @@ func RetryBackoffFn(backoff func(int) time.Duration) Opt {
return clientOpt{func(cfg *cfg) { cfg.retryBackoff = backoff }}
}

// RequestRetries sets the number of tries that retriable requests are allowed,
// RequestRetries sets the number of tries that retryable requests are allowed,
// overriding the default of 20.
//
// This option does not apply to produce requests; to limit produce request
Expand Down
12 changes: 6 additions & 6 deletions pkg/kgo/consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -1603,14 +1603,14 @@ func (s *consumerSession) listOrEpoch(waiting listOrEpochLoads, immediate bool,
// Called within a consumer session, this function handles results from list
// offsets or epoch loads and returns any loads that should be retried.
//
// To us, all errors are reloadable. We either have request level retriable
// errors (unknown partition, etc) or non-retriable errors (auth), or we have
// To us, all errors are reloadable. We either have request level retryable
// errors (unknown partition, etc) or non-retryable errors (auth), or we have
// request issuing errors (no dial, connection cut repeatedly).
//
// For retriable request errors, we may as well back off a little bit to allow
// For retryable request errors, we may as well back off a little bit to allow
// Kafka to harmonize if the topic exists / etc.
//
// For non-retriable request errors, we may as well retry to both (a) allow the
// For non-retryable request errors, we may as well retry to both (a) allow the
// user more signals about a problem that they can maybe fix within Kafka (i.e.
// the auth), and (b) force the user to notice errors.
//
Expand Down Expand Up @@ -1676,7 +1676,7 @@ func (s *consumerSession) handleListOrEpochResults(loaded loadedOffsets) (reload

default: // from ErrorCode in a response
reloads.addLoad(load.topic, load.partition, loaded.loadType, load.request)
if !kerr.IsRetriable(load.err) && !isRetriableBrokerErr(load.err) && !isDialErr(load.err) { // non-retriable response error; signal such in a response
if !kerr.IsRetriable(load.err) && !isRetryableBrokerErr(load.err) && !isDialErr(load.err) { // non-retryable response error; signal such in a response
s.c.addFakeReadyForDraining(load.topic, load.partition, load.err)
}

Expand Down Expand Up @@ -1758,7 +1758,7 @@ type loadedOffset struct {
leaderEpoch int32

// Any error encountered for loading this partition, or for epoch
// loading, potentially ErrDataLoss. If this error is not retriable, we
// loading, potentially ErrDataLoss. If this error is not retryable, we
// avoid reloading the offset and instead inject a fake partition for
// PollFetches containing this error.
err error
Expand Down
16 changes: 8 additions & 8 deletions pkg/kgo/consumer_group.go
Original file line number Diff line number Diff line change
Expand Up @@ -1470,7 +1470,7 @@ start:
return ctx.Err()
}
if err != nil {
g.cfg.logger.Log(LogLevelError, "fetch offsets failed with non-retriable error", "group", g.cfg.group, "err", err)
g.cfg.logger.Log(LogLevelError, "fetch offsets failed with non-retryable error", "group", g.cfg.group, "err", err)
return err
}

Expand Down Expand Up @@ -2204,8 +2204,8 @@ func PreCommitFnContext(ctx context.Context, fn func(*kmsg.OffsetCommitRequest)
}

// CommitRecords issues a synchronous offset commit for the offsets contained
// within rs. Retriable errors are retried up to the configured retry limit,
// and any unretriable error is returned.
// within rs. Retryable errors are retried up to the configured retry limit,
// and any unretryable error is returned.
//
// This function is useful as a simple way to commit offsets if you have
// disabled autocommitting. As an alternative if you always want to commit
Expand Down Expand Up @@ -2257,7 +2257,7 @@ func (cl *Client) CommitRecords(ctx context.Context, rs ...*Record) error {
var rerr error // return error

// Our client retries an OffsetCommitRequest as necessary if the first
// response partition has a retriable group error (group coordinator
// response partition has a retryable group error (group coordinator
// loading, etc), so any partition error is fatal.
cl.CommitOffsetsSync(ctx, offsets, func(_ *Client, _ *kmsg.OffsetCommitRequest, resp *kmsg.OffsetCommitResponse, err error) {
if err != nil {
Expand Down Expand Up @@ -2328,8 +2328,8 @@ func (cl *Client) MarkCommitRecords(rs ...*Record) {

// CommitUncommittedOffsets issues a synchronous offset commit for any
// partition that has been consumed from that has uncommitted offsets.
// Retriable errors are retried up to the configured retry limit, and any
// unretriable error is returned.
// Retryable errors are retried up to the configured retry limit, and any
// unretryable error is returned.
//
// The recommended pattern for using this function is to have a poll / process
// / commit loop. First PollFetches, then process every record, then call
Expand All @@ -2343,8 +2343,8 @@ func (cl *Client) CommitUncommittedOffsets(ctx context.Context) error {

// CommitMarkedOffsets issues a synchronous offset commit for any
// partition that has been consumed from that has marked offsets.
// Retriable errors are retried up to the configured retry limit, and any
// unretriable error is returned.
// Retryable errors are retried up to the configured retry limit, and any
// unretryable error is returned.
//
// This function is useful if you have marked offsets with MarkCommitRecords
// when using AutoCommitMarks.
Expand Down
10 changes: 5 additions & 5 deletions pkg/kgo/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,11 +9,11 @@ import (
"os"
)

func isRetriableBrokerErr(err error) bool {
func isRetryableBrokerErr(err error) bool {
// The error could be nil if we are evaluating multiple errors at once,
// and only one is non-nil. The intent of this function is to evaluate
// whether an **error** is retriable, not a non-error. We return that
// nil is not retriable -- the calling code evaluating multiple errors
// whether an **error** is retryable, not a non-error. We return that
// nil is not retryable -- the calling code evaluating multiple errors
// at once would not call into this function if all errors were nil.
if err == nil {
return false
Expand Down Expand Up @@ -51,7 +51,7 @@ func isRetriableBrokerErr(err error) bool {
if isNetClosedErr(err) || errors.Is(err, io.EOF) {
return true
}
// We could have a retriable producer ID failure, which then bubbled up
// We could have a retryable producer ID failure, which then bubbled up
// as errProducerIDLoadFail so as to be retried later.
if errors.Is(err, errProducerIDLoadFail) {
return true
Expand Down Expand Up @@ -94,7 +94,7 @@ func isDialErr(err error) bool {
}

func isSkippableBrokerErr(err error) bool {
// Some broker errors are not retriable for the given broker itself,
// Some broker errors are not retryable for the given broker itself,
// but we *could* skip the broker and try again on the next broker. For
// example, if the user input an invalid address and a valid address
// for seeds, when we fail dialing the first seed, we cannot retry that
Expand Down
4 changes: 2 additions & 2 deletions pkg/kgo/metadata.go
Original file line number Diff line number Diff line change
Expand Up @@ -603,7 +603,7 @@ func (cl *Client) fetchTopicMetadata(all bool, reqTopics []string) (map[string]*
// mergeTopicPartitions merges a new topicPartition into an old and returns
// whether the metadata update that caused this merge needs to be retried.
//
// Retries are necessary if the topic or any partition has a retriable error.
// Retries are necessary if the topic or any partition has a retryable error.
func (cl *Client) mergeTopicPartitions(
topic string,
l *topicPartitions,
Expand Down Expand Up @@ -693,7 +693,7 @@ func (cl *Client) mergeTopicPartitions(

// Like above for the entire topic, an individual partition
// can have a load error. Unlike for the topic, individual
// partition errors are always retriable.
// partition errors are always retryable.
//
// If the load errored, we keep all old information minus the
// load error itself (the new load will have no information).
Expand Down
Loading

0 comments on commit 335cd47

Please sign in to comment.