Skip to content

Commit

Permalink
consuming: add NoResetOffset
Browse files Browse the repository at this point in the history
This matches Kafka's "none" for the consume reset offset option: when
OffsetOutOfRange is encountered, the partition being consumed enters an
unrecoverable state.
  • Loading branch information
twmb committed Mar 1, 2022
1 parent 4e0e1d7 commit c763c9b
Show file tree
Hide file tree
Showing 3 changed files with 47 additions and 15 deletions.
3 changes: 3 additions & 0 deletions pkg/kgo/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -1177,6 +1177,9 @@ func MaxConcurrentFetches(n int) ConsumerOpt {
// offset is before the partition's log start offset or after the high
// watermark, this will reset to the start offset or end offset, respectively.
// Relative offsets are only obeyed if they fall within bounds.
//
// You can use the NoResetOffset to change the behavior of the client to enter
// a fatal state when OffsetOutOfRange is encountered.
func ConsumeResetOffset(offset Offset) ConsumerOpt {
return consumerOpt{func(cfg *cfg) { cfg.resetOffset = offset }}
}
Expand Down
21 changes: 21 additions & 0 deletions pkg/kgo/consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"errors"
"fmt"
"math"
"sort"
"strings"
"sync"
Expand Down Expand Up @@ -52,6 +53,26 @@ func NewOffset() Offset {
}
}

var noResetOffset = Offset{
at: math.MinInt64,
relative: math.MinInt64,
epoch: math.MinInt32,
currentEpoch: math.MinInt32,
}

// NoResetOffset returns an offset that can be used as a "none" option for the
// ConsumeResetOffset option. The returned offset should not be modified; if it
// is, it will no longer be the no reset offset.
//
// Using this offset will make it such that if OffsetOutOfRange is ever
// encountered while consuming, rather than trying to recover, the client will
// return the error to the user. Since the client does not record, the error
// will forever be encountered and the partition is effectively in a fatal
// state.
func NoResetOffset() Offset {
return noResetOffset
}

// AtStart returns a copy of the calling offset, changing the returned offset
// to begin at the beginning of a partition.
func (o Offset) AtStart() Offset {
Expand Down
38 changes: 23 additions & 15 deletions pkg/kgo/source.go
Original file line number Diff line number Diff line change
Expand Up @@ -849,18 +849,29 @@ func (s *source) handleReqResp(br *broker, req *fetchRequest, resp *kmsg.FetchRe
// rare". Rather than falling back to listing offsets,
// we stay in a cycle of validating the leader epoch
// until the follower has caught up.
//
// In all cases except case 4, we also have to check if
// no reset offset was configured. If so, we ignore
// trying to reset and instead keep our failed partition.
addList := func(replica int32) {
if s.cl.cfg.resetOffset == noResetOffset {
keep = true
} else {
reloadOffsets.addLoad(topic, partition, loadTypeList, offsetLoad{
replica: replica,
Offset: s.cl.cfg.resetOffset,
})
}
}

if s.nodeID == partOffset.from.leader { // non KIP-392 case
reloadOffsets.addLoad(topic, partition, loadTypeList, offsetLoad{
replica: -1,
Offset: s.cl.cfg.resetOffset,
})
} else if partOffset.offset < fp.LogStartOffset { // KIP-392 case 3
reloadOffsets.addLoad(topic, partition, loadTypeList, offsetLoad{
replica: s.nodeID,
Offset: s.cl.cfg.resetOffset,
})
} else { // partOffset.offset > fp.HighWatermark, KIP-392 case 4
switch {
case s.nodeID == partOffset.from.leader: // non KIP-392 case
addList(-1)

case partOffset.offset < fp.LogStartOffset: // KIP-392 case 3
addList(s.nodeID)

default: // partOffset.offset > fp.HighWatermark, KIP-392 case 4
if kip320 {
reloadOffsets.addLoad(topic, partition, loadTypeEpoch, offsetLoad{
replica: -1,
Expand All @@ -873,10 +884,7 @@ func (s *source) handleReqResp(br *broker, req *fetchRequest, resp *kmsg.FetchRe
// If the broker does not support offset for leader epoch but
// does support follower fetching for some reason, we have to
// fallback to listing.
reloadOffsets.addLoad(topic, partition, loadTypeList, offsetLoad{
replica: -1,
Offset: s.cl.cfg.resetOffset,
})
addList(-1)
}
}

Expand Down

0 comments on commit c763c9b

Please sign in to comment.