Skip to content

Commit

Permalink
consumer: allow list offsets v0 to work
Browse files Browse the repository at this point in the history
On Kafka v0.10.0 and under, list offsets v0 is used and we need to look
at OldStyleOffsets.
  • Loading branch information
twmb committed Jan 29, 2021
1 parent c7caea1 commit 0ff08da
Showing 1 changed file with 4 additions and 0 deletions.
4 changes: 4 additions & 0 deletions pkg/kgo/consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -990,6 +990,9 @@ func (cl *Client) listOffsetsForBrokerLoad(ctx context.Context, broker *broker,
}

offset := rPartition.Offset + loadPart.relative
if len(rPartition.OldStyleOffsets) > 0 { // if we have any, we used list offsets v0
offset = rPartition.OldStyleOffsets[0] + loadPart.relative
}
if loadPart.at >= 0 {
offset = loadPart.at + loadPart.relative // we obey exact requests, even if they end up past the end
}
Expand Down Expand Up @@ -1104,6 +1107,7 @@ func (o offsetLoadMap) buildListReq(isolationLevel int8) *kmsg.ListOffsetsReques
Partition: partition,
CurrentLeaderEpoch: offset.currentEpoch, // KIP-320
Timestamp: offset.at,
MaxNumOffsets: 1,
})
}
req.Topics = append(req.Topics, kmsg.ListOffsetsRequestTopic{
Expand Down

0 comments on commit 0ff08da

Please sign in to comment.