From a3c2a5cd9f1ab748d5cf87cffb9fd93d727b19f2 Mon Sep 17 00:00:00 2001
From: Travis Bischel
Date: Wed, 21 Apr 2021 22:17:06 -0600
Subject: [PATCH] consumer: backoff when list offsets or load epoch has any err

We used to loop immediately to retry if we had any retriable error. We
may as well just always back off to ensure we have no request
spinloops.
---
 pkg/kgo/consumer.go       | 42 ++++++++++++++++++++-------------------
 pkg/kgo/consumer_group.go |  2 ++
 2 files changed, 24 insertions(+), 20 deletions(-)

diff --git a/pkg/kgo/consumer.go b/pkg/kgo/consumer.go
index 714faf95..edc51283 100644
--- a/pkg/kgo/consumer.go
+++ b/pkg/kgo/consumer.go
@@ -1032,16 +1032,24 @@ func (s *consumerSession) listOrEpoch(waiting listOrEpochLoads, immediate bool)
 // Called within a consumer session, this function handles results from list
 // offsets or epoch loads.
 func (s *consumerSession) handleListOrEpochResults(loaded loadedOffsets) {
-	var (
-		// Retriable errors are retried immediately, while
-		// non-retriable errors are retried after a 1s backoff. It is
-		// unlikely that the client will be able to recover, but we may
-		// as well fetch every second to (a) force the user to notice
-		// errors, and (b) allow the user to auth the client at
-		// runtime.
-		reloads     listOrEpochLoads
-		slowReloads listOrEpochLoads
-	)
+	// All errors are retried after a 1s backoff. We either have
+	// request-level retriable errors (unknown partition, etc.) or
+	// non-retriable errors (auth), or we have request issuing errors
+	// (no dial, connection cut repeatedly).
+	//
+	// For retriable request errors, we may as well back off a little bit
+	// to allow Kafka to harmonize if the topic exists / etc.
+	//
+	// For non-retriable request errors, we may as well retry to both (a)
+	// allow the user more signals about a problem that they can maybe fix
+	// within Kafka (i.e., the auth), and (b) force the user to notice
+	// errors.
+	//
+	// For request issuing errors, we may as well continue to retry because
+	// there is not much else we can do. RequestWith already retries, but
+	// returns when the retry limit is hit. We will back off 1s and then
+	// allow RequestWith to continue requesting and backing off.
+	var reloads listOrEpochLoads
 	defer func() {
 		// When we are done handling results, we have finished loading
 		// all the topics and partitions. We remove them from tracking
@@ -1052,21 +1060,17 @@ func (s *consumerSession) handleListOrEpochResults(loaded loadedOffsets) {
 		}
 		s.listOrEpochMu.Unlock()
 
-		// We now add our immediate reloads back to the session. We are
-		// still in the context of the live session itself because this
-		// handling function is run with a session worker.
-		reloads.loadWithSession(s)
-		if !slowReloads.isEmpty() {
+		if !reloads.isEmpty() {
 			s.incWorker()
 			go func() {
-				// Before we dec our worker, we must add the slow
+				// Before we dec our worker, we must add the
 				// reloads back into the session's waiting loads.
 				// Doing so allows a concurrent stopSession to
 				// track the waiting loads, whereas if we did not
 				// add things back to the session, we could abandon
 				// loading these offsets and have a stuck cursor.
 				defer s.decWorker()
-				defer slowReloads.loadWithSession(s)
+				defer reloads.loadWithSession(s)
 				after := time.NewTimer(time.Second)
 				defer after.Stop()
 				select {
@@ -1097,12 +1101,10 @@ func (s *consumerSession) handleListOrEpochResults(loaded loadedOffsets) {
 			use()
 		default: // from ErrorCode in a response
+			reloads.addLoad(load.topic, load.partition, loaded.loadType, load.request)
 			if !kerr.IsRetriable(load.err) && !isRetriableBrokerErr(load.err) {
 				// non-retriable response error; signal such in a response
 				s.c.addFakeReadyForDraining(load.topic, load.partition, load.err)
-				slowReloads.addLoad(load.topic, load.partition, loaded.loadType, load.request)
-				continue
 			}
-			reloads.addLoad(load.topic, load.partition, loaded.loadType, load.request)
 		}
 	}
 }
diff --git a/pkg/kgo/consumer_group.go b/pkg/kgo/consumer_group.go
index 334757de..4d3ba513 100644
--- a/pkg/kgo/consumer_group.go
+++ b/pkg/kgo/consumer_group.go
@@ -517,6 +517,8 @@ func (g *groupConsumer) leave() (wait func()) {
 		"group", g.id,
 		"memberID", g.memberID, // lock not needed now since nothing can change it (manageDone)
 	)
+	// If we error when leaving, there is not much
+	// we can do. We may as well just return.
 	(&kmsg.LeaveGroupRequest{
 		Group:    g.id,
 		MemberID: g.memberID,
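
Aside for readers of the diff: below is a minimal, standalone sketch of the
backoff-then-reload pattern the patch settles on. Every name in it (session,
load, retryAfterBackoff, addWaiting) is a hypothetical stand-in, not
franz-go's actual API; the real types are consumerSession and
listOrEpochLoads, with loadWithSession doing the re-registration shown in the
hunks above.

package main

import (
	"context"
	"fmt"
	"sync"
	"time"
)

// load is a hypothetical stand-in for one offset to (re)load.
type load struct {
	topic     string
	partition int32
}

// session is a hypothetical stand-in for kgo's consumerSession.
type session struct {
	ctx     context.Context
	workers sync.WaitGroup

	mu      sync.Mutex
	waiting []load // loads that a concurrent session stop must still track
}

func (s *session) incWorker() { s.workers.Add(1) }
func (s *session) decWorker() { s.workers.Done() }

// addWaiting re-registers loads with the session so that a concurrent
// session stop can still see them (the "no stuck cursor" guarantee).
func (s *session) addWaiting(ls []load) {
	s.mu.Lock()
	defer s.mu.Unlock()
	s.waiting = append(s.waiting, ls...)
}

// retryAfterBackoff retries every failed load after a flat 1s backoff,
// mirroring the patch: one backoff path for all errors instead of an
// immediate reload path plus a slow path.
func (s *session) retryAfterBackoff(failed []load) {
	if len(failed) == 0 {
		return
	}
	s.incWorker()
	go func() {
		// Defers run LIFO: addWaiting fires before decWorker, so the
		// loads are re-registered before this worker disappears.
		defer s.decWorker()
		defer s.addWaiting(failed)
		t := time.NewTimer(time.Second)
		defer t.Stop()
		select {
		case <-t.C: // backoff elapsed; loads get re-issued
		case <-s.ctx.Done(): // session stopping; loads stay tracked
		}
	}()
}

func main() {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	s := &session{ctx: ctx}
	s.retryAfterBackoff([]load{{topic: "foo", partition: 0}})
	s.workers.Wait() // a real session stop would also drain s.waiting
	fmt.Println("loads waiting for retry:", s.waiting)
}

The defer ordering is the subtle part: defers run last-in-first-out, so the
failed loads are re-registered with the session before the worker count is
decremented. That is why the patch keeps "defer reloads.loadWithSession(s)"
after "defer s.decWorker()": a concurrent stopSession either sees a live
worker or sees the waiting loads, and no cursor is left stuck.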