bugfix consumer: do not buffer partitions we reload
There are two cases where we delete a used offset from our usedOffsets
map: when the response indicated a partition needs to be fetched from a
different replica, and when a response indicated the partition has
either the OffsetOutOfRange or the FencedLeaderEpoch error.

In the first case, we already avoided processing the response partition
& keeping it. There was nothing to process.

In the second case, we always kept the partition and returned it to
users. We would clear the error, which we handled internally, but
returning the partition potentially gave the user some high / low
watermarks. Odds are that the watermarks were not actually usable
because of the partition error, and keeping the partition resulted in
problems with PollRecords.

If we returned the partition to users but deleted it from our own used
offsets, then takeNBuffered would panic: it would look up the taken
partition in the used offsets and blindly use the offset, but the
offset would be nil.
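
For illustration, a minimal, self-contained sketch of that failure mode
(the map shape and cursorOffset type here are simplified stand-ins, not
the client's actual structures):

package main

import "fmt"

// cursorOffset is a hypothetical stand-in for the real per-partition cursor.
type cursorOffset struct{ offset int64 }

func main() {
	// A buffered fetch still references topic "events" partition 0 ...
	usedOffsets := map[string]map[int32]*cursorOffset{
		"events": {0: {offset: 42}},
	}

	// ... but an internal reload (e.g. for OffsetOutOfRange) deletes the
	// partition's used offset.
	delete(usedOffsets["events"], 0)

	// A takeNBuffered-style lookup now returns nil, and blindly using the
	// offset panics with a nil pointer dereference.
	off := usedOffsets["events"][0]
	fmt.Println(off.offset)
}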

We now keep the partition & return it to users only if it has no error
or an error we are not internally handling. This means no wasted
partitions for users to process, and no bug in takeNBuffered.
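
A distilled, runnable sketch of the new gating; the error sentinels and
the fetchPartition struct below are placeholders for the real kerr /
kgo types, not the actual implementation:

package main

import (
	"errors"
	"fmt"
)

// Placeholder sentinels for errors the client reloads internally.
var (
	errOffsetOutOfRange  = errors.New("OFFSET_OUT_OF_RANGE")
	errFencedLeaderEpoch = errors.New("FENCED_LEADER_EPOCH")
)

// fetchPartition is a placeholder for a processed response partition.
type fetchPartition struct {
	Partition int32
	Err       error
}

func main() {
	processed := []fetchPartition{
		{Partition: 0, Err: nil},                                      // keep: no error
		{Partition: 1, Err: errOffsetOutOfRange},                      // drop: reloaded internally
		{Partition: 2, Err: errors.New("TOPIC_AUTHORIZATION_FAILED")}, // keep: the user must see this
	}

	var kept []fetchPartition
	for _, fp := range processed {
		var keep bool
		switch fp.Err {
		default:
			keep = true // no error, or an error we do not internally retry
		case errOffsetOutOfRange, errFencedLeaderEpoch:
			// handled internally; returning fp would leave a buffered
			// partition whose used offset we just deleted
		}
		if keep {
			kept = append(kept, fp)
		}
	}
	fmt.Println(kept) // only partitions 0 and 2 remain
}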

I've tested this locally, causing the panic by internally setting
kerr.OffsetOutOfRange, and then ensuring this patch fixes it.

Closes #87.
twmb committed Sep 22, 2021
1 parent 631cabe commit 847aeb9
Showing 1 changed file with 9 additions and 9 deletions.
18 changes: 9 additions & 9 deletions pkg/kgo/source.go
@@ -362,7 +362,6 @@ func (s *source) takeNBuffered(n int) (Fetch, int, bool) {
 
 			rt.Partitions = append(rt.Partitions, *p)
 			rp := &rt.Partitions[len(rt.Partitions)-1]
-			rp.Records = nil
 
 			take := n
 			if take > len(p.Records) {
@@ -799,17 +798,20 @@ func (s *source) handleReqResp(br *broker, req *fetchRequest, resp *kmsg.FetchRe
 				continue
 			}
 
-			fetchTopic.Partitions = append(fetchTopic.Partitions, partOffset.processRespPartition(br, resp.Version, rp, s.cl.decompressor, s.cl.cfg.hooks))
-			fp := &fetchTopic.Partitions[len(fetchTopic.Partitions)-1]
+			fp := partOffset.processRespPartition(br, resp.Version, rp, s.cl.decompressor, s.cl.cfg.hooks)
 			updateMeta = updateMeta || fp.Err != nil
 
+			// We only keep the partition if it has no error, or an
+			// error we do not internally retry.
+			var keep bool
 			switch fp.Err {
 			default:
 				// - bad auth
 				// - unsupported compression
 				// - unsupported message version
 				// - unknown error
 				// - or, no error
+				keep = true
 
 			case kerr.UnknownTopicOrPartition,
 				kerr.NotLeaderForPartition,
@@ -818,11 +820,7 @@ func (s *source) handleReqResp(br *broker, req *fetchRequest, resp *kmsg.FetchRe
 				kerr.UnknownLeaderEpoch, // our meta is newer than broker we fetched from
 				kerr.OffsetNotAvailable: // fetched from out of sync replica or a behind in-sync one (KIP-392: case 1 and case 2)
 
-				fp.Err = nil // recoverable with client backoff; hide the error
-
 			case kerr.OffsetOutOfRange:
-				fp.Err = nil
-
 				// If we are out of range, we reset to what we can.
 				// With Kafka >= 2.1.0, we should only get offset out
 				// of range if we fetch before the start, but a user
@@ -876,8 +874,6 @@ func (s *source) handleReqResp(br *broker, req *fetchRequest, resp *kmsg.FetchRe
 				}
 
 			case kerr.FencedLeaderEpoch:
-				fp.Err = nil
-
 				// With fenced leader epoch, we notify an error only
 				// if necessary after we find out if loss occurred.
 				// If we have consumed nothing, then we got unlucky
@@ -897,6 +893,10 @@ func (s *source) handleReqResp(br *broker, req *fetchRequest, resp *kmsg.FetchRe
 					})
 				}
 			}
+
+			if keep {
+				fetchTopic.Partitions = append(fetchTopic.Partitions, fp)
+			}
 		}
 
 		if len(fetchTopic.Partitions) > 0 {
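
For context on the user-visible effect, a sketch of typical
consumer-side handling (the broker address and topic are placeholders):
partitions whose errors the client reloads internally no longer show up
in polled fetches, while errors the user must act on still surface.

package main

import (
	"context"
	"log"

	"github.com/twmb/franz-go/pkg/kgo"
)

func main() {
	cl, err := kgo.NewClient(
		kgo.SeedBrokers("localhost:9092"), // placeholder broker
		kgo.ConsumeTopics("events"),       // placeholder topic
	)
	if err != nil {
		log.Fatal(err)
	}
	defer cl.Close()

	fetches := cl.PollRecords(context.Background(), 1000)

	// Errors the client does not handle internally (bad auth, unknown
	// errors, ...) are still returned and show up here.
	for _, fe := range fetches.Errors() {
		log.Printf("fetch error on %s[%d]: %v", fe.Topic, fe.Partition, fe.Err)
	}

	// Partitions being reloaded internally (OffsetOutOfRange,
	// FencedLeaderEpoch) are no longer buffered, so no stray watermarks or
	// nil used offsets reach the poll loop.
	fetches.EachRecord(func(r *kgo.Record) {
		log.Printf("topic=%s partition=%d offset=%d", r.Topic, r.Partition, r.Offset)
	})
}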
