Skip to content

Commit

Permalink
kgo.Offset: add AtCommitted()
Browse files Browse the repository at this point in the history
This adds an Offset method that allows ConsumeResetOffset to exactly
mirror Kafka's auto.offset.reset = none
  • Loading branch information
twmb committed May 26, 2024
1 parent 2bc29e6 commit a7caf20
Show file tree
Hide file tree
Showing 3 changed files with 64 additions and 24 deletions.
16 changes: 14 additions & 2 deletions pkg/kgo/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -1237,8 +1237,10 @@ func MaxConcurrentFetches(n int) ConsumerOpt {
// from. For group consumers, this is the offset that partitions begin to
// consume from if a partition has no commits. If partitions have commits, the
// commit offset is used. While fetching, if OffsetOutOfRange is encountered,
// the partition resets to ConsumeResetOffset. Conversely, using NoResetOffset
// stops consuming a partition if the client encounters OffsetOutOfRange.
// the partition resets to ConsumeResetOffset. Using [NoResetOffset] stops
// consuming a partition if the client encounters OffsetOutOfRange. Using
// [Offset.AtCommitted] prevents consuming a partition in a group if the
// partition has no prior commits.
//
// If you use an exact offset or relative offsets and the offset ends up out of
// range, the client chooses the nearest of either the log start offset or the
Expand All @@ -1254,6 +1256,16 @@ func MaxConcurrentFetches(n int) ConsumerOpt {
// reset relative? => the above, + / - the relative amount
// reset exact or relative out of bounds? => nearest boundary (start or end)
// reset after millisec? => high watermark, or first offset after millisec if one exists
//
// To match Kafka's auto.offset.reset,
//
// NewOffset().AtStart() == auto.offset.reset "earliest"
// NewOffset().AtEnd() == auto.offset.reset "latest"
// NewOffset().AtCommitted() == auto.offset.reset "none"
//
// With the above, make sure to use NoResetOffset() if you want to stop
// consuming when you encounter OffsetOutOfRange. It is highly recommended
// to read the docs for all Offset methods to see a few other alternatives.
func ConsumeResetOffset(offset Offset) ConsumerOpt {
return consumerOpt{func(cfg *cfg) { cfg.resetOffset = offset }}
}
Expand Down
70 changes: 48 additions & 22 deletions pkg/kgo/consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,9 @@ type Offset struct {
afterMilli bool
}

// Random negative, only significant within this package.
const atCommitted = -999

// MarshalJSON implements json.Marshaler.
func (o Offset) MarshalJSON() ([]byte, error) {
if o.relative == 0 {
Expand Down Expand Up @@ -54,8 +57,8 @@ func (o Offset) EpochOffset() EpochOffset {
}
}

// NewOffset creates and returns an offset to use in ConsumePartitions or
// ConsumeResetOffset.
// NewOffset creates and returns an offset to use in [ConsumePartitions] or
// [ConsumeResetOffset].
//
// The default offset begins at the end.
func NewOffset() Offset {
Expand All @@ -66,24 +69,26 @@ func NewOffset() Offset {
}

// NoResetOffset returns an offset that can be used as a "none" option for the
// ConsumeResetOffset option. By default, NoResetOffset starts consuming from
// [ConsumeResetOffset] option. By default, NoResetOffset starts consuming from
// the beginning of partitions (similar to NewOffset().AtStart()). This can be
// changed with AtEnd, Relative, etc.
//
// Using this offset will make it such that if OffsetOutOfRange is ever
// encountered while consuming, rather than trying to recover, the client will
// return the error to the user and enter a fatal state (for the partition).
// return the error to the user and enter a fatal state (for the affected
// partition).
func NoResetOffset() Offset {
return Offset{
at: math.MinInt64,
relative: 0,
noReset: true,
at: -1,
epoch: -1,
noReset: true,
}
}

// AfterMilli returns an offset that consumes from the first offset after a
// given timestamp. This option is not compatible with At/Relative/WithEpoch;
// using any of those will clear the special millisecond state.
// given timestamp. This option is *not* compatible with any At options (nor
// Relative nor WithEpoch); using any of those will clear the special
// millisecond state.
//
// This option can be used to consume at the end of existing partitions, but at
// the start of any new partitions that are created later:
Expand All @@ -93,7 +98,7 @@ func NoResetOffset() Offset {
// By default when using this offset, if consuming encounters an
// OffsetOutOfRange error, consuming will reset to the first offset after this
// timestamp. You can use NoResetOffset().AfterMilli(...) to instead switch the
// client to a fatal state.
// client to a fatal state (for the affected partition).
func (o Offset) AfterMilli(millisec int64) Offset {
o.at = millisec
o.relative = 0
Expand All @@ -102,36 +107,49 @@ func (o Offset) AfterMilli(millisec int64) Offset {
return o
}

// AtStart returns a copy of the calling offset, changing the returned offset
// to begin at the beginning of a partition.
// AtStart copies 'o' and returns an offset starting at the beginning of a
// partition.
func (o Offset) AtStart() Offset {
o.afterMilli = false
o.at = -2
return o
}

// AtEnd returns a copy of the calling offset, changing the returned offset to
// begin at the end of a partition. If you want to consume at the end of the
// topic as it exists right now, but at the beginning of new partitions as they
// are added to the topic later, check out AfterMilli.
// AtEnd copies 'o' and returns an offset starting at the end of a partition.
// If you want to consume at the end of the topic as it exists right now, but
// at the beginning of new partitions as they are added to the topic later,
// check out AfterMilli.
func (o Offset) AtEnd() Offset {
o.afterMilli = false
o.at = -1
return o
}

// Relative returns a copy of the calling offset, changing the returned offset
// to be n relative to what it currently is. If the offset is beginning at the
// end, Relative(-100) will begin 100 before the end.
// AtCommitted copies 'o' and returns an offset that is used *only if*
// there is an existing commit. This is only useful for group consumers.
// If a partition being consumed does not have a commit, the partition will
// enter a fatal state and return an error from PollFetches.
//
// Using this function automatically opts into [NoResetOffset].
func (o Offset) AtCommitted() Offset {
o.noReset = true
o.afterMilli = false
o.at = atCommitted
return o
}

// Relative copies 'o' and returns an offset that starts 'n' relative to what
// 'o' currently is. If 'o' is at the end (from [AtEnd]), Relative(-100) will
// begin 100 before the end.
func (o Offset) Relative(n int64) Offset {
o.afterMilli = false
o.relative = n
return o
}

// WithEpoch returns a copy of the calling offset, changing the returned offset
// to use the given epoch. This epoch is used for truncation detection; the
// default of -1 implies no truncation detection.
// WithEpoch copies 'o' and returns an offset with the given epoch. to use the
// given epoch. This epoch is used for truncation detection; the default of -1
// implies no truncation detection.
func (o Offset) WithEpoch(e int32) Offset {
o.afterMilli = false
if e < 0 {
Expand Down Expand Up @@ -1129,6 +1147,14 @@ func (c *consumer) assignPartitions(assignments map[string]map[int32]Offset, how
continue
}

// If the offset is atCommitted, then no offset was
// loaded from FetchOffsets. We inject an error and
// avoid using this partition.
if offset.at == atCommitted {
c.addFakeReadyForDraining(topic, partition, errNoCommittedOffset, "notification of uncommitted partition")
continue
}

loadOffsets.addLoad(topic, partition, loadTypeList, offsetLoad{
replica: -1,
Offset: offset,
Expand Down
2 changes: 2 additions & 0 deletions pkg/kgo/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -173,6 +173,8 @@ var (

errMissingMetadataPartition = errors.New("metadata update is missing a partition that we were previously using")

errNoCommittedOffset = errors.New("partition has no prior committed offset")

//////////////
// EXTERNAL //
//////////////
Expand Down

0 comments on commit a7caf20

Please sign in to comment.