From c763c9b18df6ff0b77bed3ca839a720cd39ab2d6 Mon Sep 17 00:00:00 2001 From: Travis Bischel Date: Sun, 20 Feb 2022 22:33:37 -0700 Subject: [PATCH] consuming: add NoResetOffset This matches Kafka's "none" for the consume reset offset option: when OffsetOutOfRange is encountered, the partition being consumed enters an unrecoverable state. --- pkg/kgo/config.go | 3 +++ pkg/kgo/consumer.go | 21 +++++++++++++++++++++ pkg/kgo/source.go | 38 +++++++++++++++++++++++--------------- 3 files changed, 47 insertions(+), 15 deletions(-) diff --git a/pkg/kgo/config.go b/pkg/kgo/config.go index 24aec308..20389735 100644 --- a/pkg/kgo/config.go +++ b/pkg/kgo/config.go @@ -1177,6 +1177,9 @@ func MaxConcurrentFetches(n int) ConsumerOpt { // offset is before the partition's log start offset or after the high // watermark, this will reset to the start offset or end offset, respectively. // Relative offsets are only obeyed if they fall within bounds. +// +// You can use the NoResetOffset to change the behavior of the client to enter +// a fatal state when OffsetOutOfRange is encountered. func ConsumeResetOffset(offset Offset) ConsumerOpt { return consumerOpt{func(cfg *cfg) { cfg.resetOffset = offset }} } diff --git a/pkg/kgo/consumer.go b/pkg/kgo/consumer.go index 53994940..df3da14b 100644 --- a/pkg/kgo/consumer.go +++ b/pkg/kgo/consumer.go @@ -4,6 +4,7 @@ import ( "context" "errors" "fmt" + "math" "sort" "strings" "sync" @@ -52,6 +53,26 @@ func NewOffset() Offset { } } +var noResetOffset = Offset{ + at: math.MinInt64, + relative: math.MinInt64, + epoch: math.MinInt32, + currentEpoch: math.MinInt32, +} + +// NoResetOffset returns an offset that can be used as a "none" option for the +// ConsumeResetOffset option. The returned offset should not be modified; if it +// is, it will no longer be the no reset offset. +// +// Using this offset will make it such that if OffsetOutOfRange is ever +// encountered while consuming, rather than trying to recover, the client will +// return the error to the user. Since the client does not record, the error +// will forever be encountered and the partition is effectively in a fatal +// state. +func NoResetOffset() Offset { + return noResetOffset +} + // AtStart returns a copy of the calling offset, changing the returned offset // to begin at the beginning of a partition. func (o Offset) AtStart() Offset { diff --git a/pkg/kgo/source.go b/pkg/kgo/source.go index e7727194..8c2ebc7d 100644 --- a/pkg/kgo/source.go +++ b/pkg/kgo/source.go @@ -849,18 +849,29 @@ func (s *source) handleReqResp(br *broker, req *fetchRequest, resp *kmsg.FetchRe // rare". Rather than falling back to listing offsets, // we stay in a cycle of validating the leader epoch // until the follower has caught up. + // + // In all cases except case 4, we also have to check if + // no reset offset was configured. If so, we ignore + // trying to reset and instead keep our failed partition. + addList := func(replica int32) { + if s.cl.cfg.resetOffset == noResetOffset { + keep = true + } else { + reloadOffsets.addLoad(topic, partition, loadTypeList, offsetLoad{ + replica: replica, + Offset: s.cl.cfg.resetOffset, + }) + } + } - if s.nodeID == partOffset.from.leader { // non KIP-392 case - reloadOffsets.addLoad(topic, partition, loadTypeList, offsetLoad{ - replica: -1, - Offset: s.cl.cfg.resetOffset, - }) - } else if partOffset.offset < fp.LogStartOffset { // KIP-392 case 3 - reloadOffsets.addLoad(topic, partition, loadTypeList, offsetLoad{ - replica: s.nodeID, - Offset: s.cl.cfg.resetOffset, - }) - } else { // partOffset.offset > fp.HighWatermark, KIP-392 case 4 + switch { + case s.nodeID == partOffset.from.leader: // non KIP-392 case + addList(-1) + + case partOffset.offset < fp.LogStartOffset: // KIP-392 case 3 + addList(s.nodeID) + + default: // partOffset.offset > fp.HighWatermark, KIP-392 case 4 if kip320 { reloadOffsets.addLoad(topic, partition, loadTypeEpoch, offsetLoad{ replica: -1, @@ -873,10 +884,7 @@ func (s *source) handleReqResp(br *broker, req *fetchRequest, resp *kmsg.FetchRe // If the broker does not support offset for leader epoch but // does support follower fetching for some reason, we have to // fallback to listing. - reloadOffsets.addLoad(topic, partition, loadTypeList, offsetLoad{ - replica: -1, - Offset: s.cl.cfg.resetOffset, - }) + addList(-1) } }