From 847aeb918ebc35a059cb2977424f16272c4e81bd Mon Sep 17 00:00:00 2001 From: Travis Bischel Date: Wed, 22 Sep 2021 10:39:44 -0600 Subject: [PATCH] bugfix consumer: do not buffer partitions we reload There are two cases where we delete a used offset from our usedOffsets map: when the response indicated a partition needs to be fetched on a different replica, and when a response indicated the partition has either the OffsetOutOfRange error or FencedLeaderEpoch error. In the first case, we already avoided processing the response partition & keeping it. There was nothing to process. In the second case, we always kept the partition and returned it to users. We would clear the error, which we handled internally, but returning the partition potentially allowed giving the user some high / low watermarks. Odds are that the watermarks actually were not usable because of the partition error, and keeping the partition resulted in problems with PollRecords. If we return the partition to users, but deleted it from our own used offsets, then takeNBuffered would panic. takeNBuffered would look up the taken partition in the used offsets and blindly use the offset, but the offset would be nil. We now keep the partition & return it to users only if it has no error or an error we are not internally handling. This means less waste for users to process, and no bug in takeNBuffered. I've tested this locally, causing the panic by internally setting kerr.OffsetOutOfRange, and then ensuring this patch fixes it. Closes #87. 
--- pkg/kgo/source.go | 18 +++++++++--------- 1 file changed, 9 insertions(+), 9 deletions(-) diff --git a/pkg/kgo/source.go b/pkg/kgo/source.go index e9144c9a..1ba3c462 100644 --- a/pkg/kgo/source.go +++ b/pkg/kgo/source.go @@ -362,7 +362,6 @@ func (s *source) takeNBuffered(n int) (Fetch, int, bool) { rt.Partitions = append(rt.Partitions, *p) rp := &rt.Partitions[len(rt.Partitions)-1] - rp.Records = nil take := n if take > len(p.Records) { @@ -799,10 +798,12 @@ func (s *source) handleReqResp(br *broker, req *fetchRequest, resp *kmsg.FetchRe continue } - fetchTopic.Partitions = append(fetchTopic.Partitions, partOffset.processRespPartition(br, resp.Version, rp, s.cl.decompressor, s.cl.cfg.hooks)) - fp := &fetchTopic.Partitions[len(fetchTopic.Partitions)-1] + fp := partOffset.processRespPartition(br, resp.Version, rp, s.cl.decompressor, s.cl.cfg.hooks) updateMeta = updateMeta || fp.Err != nil + // We only keep the partition if it has no error, or an + // error we do not internally retry. + var keep bool switch fp.Err { default: // - bad auth @@ -810,6 +811,7 @@ func (s *source) handleReqResp(br *broker, req *fetchRequest, resp *kmsg.FetchRe // - unsupported message version // - unknown error // - or, no error + keep = true case kerr.UnknownTopicOrPartition, kerr.NotLeaderForPartition, @@ -818,11 +820,7 @@ func (s *source) handleReqResp(br *broker, req *fetchRequest, resp *kmsg.FetchRe kerr.UnknownLeaderEpoch, // our meta is newer than broker we fetched from kerr.OffsetNotAvailable: // fetched from out of sync replica or a behind in-sync one (KIP-392: case 1 and case 2) - fp.Err = nil // recoverable with client backoff; hide the error - case kerr.OffsetOutOfRange: - fp.Err = nil - // If we are out of range, we reset to what we can. 
// With Kafka >= 2.1.0, we should only get offset out // of range if we fetch before the start, but a user @@ -876,8 +874,6 @@ func (s *source) handleReqResp(br *broker, req *fetchRequest, resp *kmsg.FetchRe } case kerr.FencedLeaderEpoch: - fp.Err = nil - // With fenced leader epoch, we notify an error only // if necessary after we find out if loss occurred. // If we have consumed nothing, then we got unlucky @@ -897,6 +893,10 @@ func (s *source) handleReqResp(br *broker, req *fetchRequest, resp *kmsg.FetchRe }) } } + + if keep { + fetchTopic.Partitions = append(fetchTopic.Partitions, fp) + } } if len(fetchTopic.Partitions) > 0 {