From 055b3499c309f84d6004438b459cd9dc269634fc Mon Sep 17 00:00:00 2001 From: Travis Bischel Date: Tue, 4 Apr 2023 16:20:56 -0600 Subject: [PATCH] kgo consumer: do not use the partition epoch when assigning offsets See comment; this is a little bit confusing to explain though. --- pkg/kgo/consumer.go | 11 ++++++++--- 1 file changed, 8 insertions(+), 3 deletions(-) diff --git a/pkg/kgo/consumer.go b/pkg/kgo/consumer.go index 09e6464e..715a964b 100644 --- a/pkg/kgo/consumer.go +++ b/pkg/kgo/consumer.go @@ -1001,8 +1001,13 @@ func (c *consumer) assignPartitions(assignments map[string]map[int32]Offset, how } // If an exact offset is specified and we have loaded - // the partition, we use it. Without an epoch, if it is - // out of bounds, we just reset appropriately. + // the partition, we use it. We have to use epoch -1 + // rather than the latest loaded epoch on the partition + // because the offset being requested to use could be + // from an epoch after OUR loaded epoch. Otherwise, we + // could update the metadata, see the later epoch, + // request the end offset for our prior epoch, and then + // think data loss occurred. // // If an offset is unspecified or we have not loaded // the partition, we list offsets to find out what to @@ -1012,7 +1017,7 @@ func (c *consumer) assignPartitions(assignments map[string]map[int32]Offset, how cursor := part.cursor cursor.setOffset(cursorOffset{ offset: offset.at, - lastConsumedEpoch: part.leaderEpoch, + lastConsumedEpoch: -1, }) cursor.allowUsable() c.usingCursors.use(cursor)