diff --git a/pkg/kgo/source.go b/pkg/kgo/source.go index e9144c9a..1ba3c462 100644 --- a/pkg/kgo/source.go +++ b/pkg/kgo/source.go @@ -362,7 +362,6 @@ func (s *source) takeNBuffered(n int) (Fetch, int, bool) { rt.Partitions = append(rt.Partitions, *p) rp := &rt.Partitions[len(rt.Partitions)-1] - rp.Records = nil take := n if take > len(p.Records) { @@ -799,10 +798,12 @@ func (s *source) handleReqResp(br *broker, req *fetchRequest, resp *kmsg.FetchRe continue } - fetchTopic.Partitions = append(fetchTopic.Partitions, partOffset.processRespPartition(br, resp.Version, rp, s.cl.decompressor, s.cl.cfg.hooks)) - fp := &fetchTopic.Partitions[len(fetchTopic.Partitions)-1] + fp := partOffset.processRespPartition(br, resp.Version, rp, s.cl.decompressor, s.cl.cfg.hooks) updateMeta = updateMeta || fp.Err != nil + // We only keep the partition if it has no error, or an + // error we do not internally retry. + var keep bool switch fp.Err { default: // - bad auth @@ -810,6 +811,7 @@ func (s *source) handleReqResp(br *broker, req *fetchRequest, resp *kmsg.FetchRe // - unsupported message version // - unknown error // - or, no error + keep = true case kerr.UnknownTopicOrPartition, kerr.NotLeaderForPartition, @@ -818,11 +820,7 @@ func (s *source) handleReqResp(br *broker, req *fetchRequest, resp *kmsg.FetchRe kerr.UnknownLeaderEpoch, // our meta is newer than broker we fetched from kerr.OffsetNotAvailable: // fetched from out of sync replica or a behind in-sync one (KIP-392: case 1 and case 2) - fp.Err = nil // recoverable with client backoff; hide the error - case kerr.OffsetOutOfRange: - fp.Err = nil - // If we are out of range, we reset to what we can. // With Kafka >= 2.1.0, we should only get offset out // of range if we fetch before the start, but a user @@ -876,8 +874,6 @@ func (s *source) handleReqResp(br *broker, req *fetchRequest, resp *kmsg.FetchRe } case kerr.FencedLeaderEpoch: - fp.Err = nil - // With fenced leader epoch, we notify an error only // if necessary after we find out if loss occurred. // If we have consumed nothing, then we got unlucky @@ -897,6 +893,10 @@ func (s *source) handleReqResp(br *broker, req *fetchRequest, resp *kmsg.FetchRe }) } } + + if keep { + fetchTopic.Partitions = append(fetchTopic.Partitions, fp) + } } if len(fetchTopic.Partitions) > 0 {