From 6c0abd1f3f25afa7b9c11e220cfe37ea53a079df Mon Sep 17 00:00:00 2001 From: Travis Bischel Date: Wed, 30 Nov 2022 21:07:08 -0700 Subject: [PATCH] source: avoid backoff / session reset when there is no consumed data We use fetch sessions by default. 2b4d7ea8 in 1.9.0 fixed a potential spin-loop, but introduced a different minor bug: if broker returned nothing to consume for our fetch session, the broker would return no partitions and we would (a) backoff, (b) trigger a metadata update, and (c) reset our fetch session. The fix changed the source to back off if all partitions were stripped due to retriable errors, but our check was against the number of partitions in the response. We now check against the number of partitions in the request (numOffsets). --- pkg/kgo/source.go | 11 +++-------- 1 file changed, 3 insertions(+), 8 deletions(-) diff --git a/pkg/kgo/source.go b/pkg/kgo/source.go index c3cf6648..87b00773 100644 --- a/pkg/kgo/source.go +++ b/pkg/kgo/source.go @@ -775,12 +775,9 @@ func (s *source) handleReqResp(br *broker, req *fetchRequest, resp *kmsg.FetchRe ) { f = Fetch{Topics: make([]FetchTopic, 0, len(resp.Topics))} var ( - updateWhy multiUpdateWhy - - numParts int + updateWhy multiUpdateWhy numErrsStripped int - - kip320 = s.cl.supportsOffsetForLeaderEpoch() + kip320 = s.cl.supportsOffsetForLeaderEpoch() ) for _, rt := range resp.Topics { @@ -821,8 +818,6 @@ func (s *source) handleReqResp(br *broker, req *fetchRequest, resp *kmsg.FetchRe continue } - numParts++ - // If we are fetching from the replica already, Kafka replies with a -1 // preferred read replica. If Kafka replies with a preferred replica, // it sends no records. @@ -953,7 +948,7 @@ func (s *source) handleReqResp(br *broker, req *fetchRequest, resp *kmsg.FetchRe } } - return f, reloadOffsets, preferreds, numParts == numErrsStripped, updateMeta, updateWhy.reason("fetch had inner topic errors") + return f, reloadOffsets, preferreds, req.numOffsets == numErrsStripped, updateMeta, updateWhy.reason("fetch had inner topic errors") } // processRespPartition processes all records in all potentially compressed