Skip to content

Commit

Permalink
kgo: avoid counting pinReq version failures against retries
Browse files Browse the repository at this point in the history
Technically, a request that goes to the sharding logic that is pinned to
a minimum version that fails due to errBrokerTooOld has never been
issued. Currently, if a request is enqueued for a long time due to the
client being throttled, it can fail with errBrokerTooOld even though it
was never issued.

Instead, if the request is pinned and the error is errBrokerTooOld, we
always retry immediately (up to 3 times, for safety).
  • Loading branch information
twmb committed Apr 13, 2023
1 parent c3c8c69 commit 88fa883
Showing 1 changed file with 7 additions and 2 deletions.
9 changes: 7 additions & 2 deletions pkg/kgo/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -2151,10 +2151,15 @@ func (cl *Client) handleShardedReq(ctx context.Context, req kmsg.Request) ([]Res
// If we failed to issue the request, we *maybe* will retry.
// We could have failed to even issue the request or receive
// a response, which is retryable.
//
// If a pinned req fails with errBrokerTooOld, we always retry
// immediately. The request was not even issued. However, as a
// safety, we only do this 3 times to avoid some super weird
// pathological spin loop.
backoff := cl.cfg.retryBackoff(tries)
if err != nil &&
(retryTimeout == 0 || time.Now().Add(backoff).Sub(start) < retryTimeout) &&
(reshardable && isPinned && errors.Is(err, errBrokerTooOld) || cl.shouldRetry(tries, err) && cl.waitTries(ctx, backoff)) {
(reshardable && isPinned && errors.Is(err, errBrokerTooOld) && tries <= 3) ||
(retryTimeout == 0 || time.Now().Add(backoff).Sub(start) < retryTimeout) && cl.shouldRetry(tries, err) && cl.waitTries(ctx, backoff) {
// Non-reshardable re-requests just jump back to the
// top where the broker is loaded. This is the case on
// requests where the original request is split to
Expand Down

0 comments on commit 88fa883

Please sign in to comment.