diff --git a/pkg/kgo/config.go b/pkg/kgo/config.go index b63ae9d6..4c0572c6 100644 --- a/pkg/kgo/config.go +++ b/pkg/kgo/config.go @@ -1237,8 +1237,10 @@ func MaxConcurrentFetches(n int) ConsumerOpt { // from. For group consumers, this is the offset that partitions begin to // consume from if a partition has no commits. If partitions have commits, the // commit offset is used. While fetching, if OffsetOutOfRange is encountered, -// the partition resets to ConsumeResetOffset. Conversely, using NoResetOffset -// stops consuming a partition if the client encounters OffsetOutOfRange. +// the partition resets to ConsumeResetOffset. Using [NoResetOffset] stops +// consuming a partition if the client encounters OffsetOutOfRange. Using +// [Offset.AtCommitted] prevents consuming a partition in a group if the +// partition has no prior commits. // // If you use an exact offset or relative offsets and the offset ends up out of // range, the client chooses the nearest of either the log start offset or the @@ -1254,6 +1256,16 @@ func MaxConcurrentFetches(n int) ConsumerOpt { // reset relative? => the above, + / - the relative amount // reset exact or relative out of bounds? => nearest boundary (start or end) // reset after millisec? => high watermark, or first offset after millisec if one exists +// +// To match Kafka's auto.offset.reset, +// +// NewOffset().AtStart() == auto.offset.reset "earliest" +// NewOffset().AtEnd() == auto.offset.reset "latest" +// NewOffset().AtCommitted() == auto.offset.reset "none" +// +// With the above, make sure to use NoResetOffset() if you want to stop +// consuming when you encounter OffsetOutOfRange. It is highly recommended +// to read the docs for all Offset methods to see a few other alternatives. func ConsumeResetOffset(offset Offset) ConsumerOpt { return consumerOpt{func(cfg *cfg) { cfg.resetOffset = offset }} } diff --git a/pkg/kgo/consumer.go b/pkg/kgo/consumer.go index 10e0fb44..7ba893d5 100644 --- a/pkg/kgo/consumer.go +++ b/pkg/kgo/consumer.go @@ -27,6 +27,9 @@ type Offset struct { afterMilli bool } +// Random negative, only significant within this package. +const atCommitted = -999 + // MarshalJSON implements json.Marshaler. func (o Offset) MarshalJSON() ([]byte, error) { if o.relative == 0 { @@ -54,8 +57,8 @@ func (o Offset) EpochOffset() EpochOffset { } } -// NewOffset creates and returns an offset to use in ConsumePartitions or -// ConsumeResetOffset. +// NewOffset creates and returns an offset to use in [ConsumePartitions] or +// [ConsumeResetOffset]. // // The default offset begins at the end. func NewOffset() Offset { @@ -66,24 +69,26 @@ func NewOffset() Offset { } // NoResetOffset returns an offset that can be used as a "none" option for the -// ConsumeResetOffset option. By default, NoResetOffset starts consuming from +// [ConsumeResetOffset] option. By default, NoResetOffset starts consuming from // the beginning of partitions (similar to NewOffset().AtStart()). This can be // changed with AtEnd, Relative, etc. // // Using this offset will make it such that if OffsetOutOfRange is ever // encountered while consuming, rather than trying to recover, the client will -// return the error to the user and enter a fatal state (for the partition). +// return the error to the user and enter a fatal state (for the affected +// partition). func NoResetOffset() Offset { return Offset{ - at: math.MinInt64, - relative: 0, - noReset: true, + at: -1, + epoch: -1, + noReset: true, } } // AfterMilli returns an offset that consumes from the first offset after a -// given timestamp. This option is not compatible with At/Relative/WithEpoch; -// using any of those will clear the special millisecond state. +// given timestamp. This option is *not* compatible with any At options (nor +// Relative nor WithEpoch); using any of those will clear the special +// millisecond state. // // This option can be used to consume at the end of existing partitions, but at // the start of any new partitions that are created later: @@ -93,7 +98,7 @@ func NoResetOffset() Offset { // By default when using this offset, if consuming encounters an // OffsetOutOfRange error, consuming will reset to the first offset after this // timestamp. You can use NoResetOffset().AfterMilli(...) to instead switch the -// client to a fatal state. +// client to a fatal state (for the affected partition). func (o Offset) AfterMilli(millisec int64) Offset { o.at = millisec o.relative = 0 @@ -102,36 +107,49 @@ func (o Offset) AfterMilli(millisec int64) Offset { return o } -// AtStart returns a copy of the calling offset, changing the returned offset -// to begin at the beginning of a partition. +// AtStart copies 'o' and returns an offset starting at the beginning of a +// partition. func (o Offset) AtStart() Offset { o.afterMilli = false o.at = -2 return o } -// AtEnd returns a copy of the calling offset, changing the returned offset to -// begin at the end of a partition. If you want to consume at the end of the -// topic as it exists right now, but at the beginning of new partitions as they -// are added to the topic later, check out AfterMilli. +// AtEnd copies 'o' and returns an offset starting at the end of a partition. +// If you want to consume at the end of the topic as it exists right now, but +// at the beginning of new partitions as they are added to the topic later, +// check out AfterMilli. func (o Offset) AtEnd() Offset { o.afterMilli = false o.at = -1 return o } -// Relative returns a copy of the calling offset, changing the returned offset -// to be n relative to what it currently is. If the offset is beginning at the -// end, Relative(-100) will begin 100 before the end. +// AtCommitted copies 'o' and returns an offset that is used *only if* +// there is an existing commit. This is only useful for group consumers. +// If a partition being consumed does not have a commit, the partition will +// enter a fatal state and return an error from PollFetches. +// +// Using this function automatically opts into [NoResetOffset]. +func (o Offset) AtCommitted() Offset { + o.noReset = true + o.afterMilli = false + o.at = atCommitted + return o +} + +// Relative copies 'o' and returns an offset that starts 'n' relative to what +// 'o' currently is. If 'o' is at the end (from [AtEnd]), Relative(-100) will +// begin 100 before the end. func (o Offset) Relative(n int64) Offset { o.afterMilli = false o.relative = n return o } -// WithEpoch returns a copy of the calling offset, changing the returned offset -// to use the given epoch. This epoch is used for truncation detection; the -// default of -1 implies no truncation detection. +// WithEpoch copies 'o' and returns an offset with the given epoch. to use the +// given epoch. This epoch is used for truncation detection; the default of -1 +// implies no truncation detection. func (o Offset) WithEpoch(e int32) Offset { o.afterMilli = false if e < 0 { @@ -1129,6 +1147,14 @@ func (c *consumer) assignPartitions(assignments map[string]map[int32]Offset, how continue } + // If the offset is atCommitted, then no offset was + // loaded from FetchOffsets. We inject an error and + // avoid using this partition. + if offset.at == atCommitted { + c.addFakeReadyForDraining(topic, partition, errNoCommittedOffset, "notification of uncommitted partition") + continue + } + loadOffsets.addLoad(topic, partition, loadTypeList, offsetLoad{ replica: -1, Offset: offset, diff --git a/pkg/kgo/errors.go b/pkg/kgo/errors.go index 8d574b90..37c4f8fc 100644 --- a/pkg/kgo/errors.go +++ b/pkg/kgo/errors.go @@ -173,6 +173,8 @@ var ( errMissingMetadataPartition = errors.New("metadata update is missing a partition that we were previously using") + errNoCommittedOffset = errors.New("partition has no prior committed offset") + ////////////// // EXTERNAL // //////////////