From 0ff08dade21e691be6ddb3cf73c6bbe20efbcd6f Mon Sep 17 00:00:00 2001 From: Travis Bischel Date: Thu, 28 Jan 2021 14:54:01 -0700 Subject: [PATCH] consumer: allow list offsets v0 to work On Kafka v0.10.0 and under, list offsets v0 is used and we need to look at OldStyleOffsets. --- pkg/kgo/consumer.go | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/pkg/kgo/consumer.go b/pkg/kgo/consumer.go index c45afff2..7453c721 100644 --- a/pkg/kgo/consumer.go +++ b/pkg/kgo/consumer.go @@ -990,6 +990,9 @@ func (cl *Client) listOffsetsForBrokerLoad(ctx context.Context, broker *broker, } offset := rPartition.Offset + loadPart.relative + if len(rPartition.OldStyleOffsets) > 0 { // if we have any, we used list offsets v0 + offset = rPartition.OldStyleOffsets[0] + loadPart.relative + } if loadPart.at >= 0 { offset = loadPart.at + loadPart.relative // we obey exact requests, even if they end up past the end } @@ -1104,6 +1107,7 @@ func (o offsetLoadMap) buildListReq(isolationLevel int8) *kmsg.ListOffsetsReques Partition: partition, CurrentLeaderEpoch: offset.currentEpoch, // KIP-320 Timestamp: offset.at, + MaxNumOffsets: 1, }) } req.Topics = append(req.Topics, kmsg.ListOffsetsRequestTopic{