Skip to content

Commit

Permalink
MarkCommitRecords: forbid rewinds
Browse files Browse the repository at this point in the history
v1.3.3 changed MarkCommitRecords to allow rewinds.

After using this further, allowing rewinds in an API that under the hood
is periodic is a bit odd.

This reinstates the original behavior of not moving marked records
backwards.
  • Loading branch information
twmb committed May 5, 2022
1 parent 41284b3 commit ff5a3ed
Showing 1 changed file with 11 additions and 14 deletions.
25 changes: 11 additions & 14 deletions pkg/kgo/consumer_group.go
Original file line number Diff line number Diff line change
Expand Up @@ -2154,13 +2154,8 @@ func (cl *Client) CommitRecords(ctx context.Context, rs ...*Record) error {

// MarkCommitRecords marks records to be available for autocommitting. This
// function is only useful if you use the AutoCommitMarks config option, see
// the documentation on that option for more details.
//
// This function blindly sets the "head" per partition that will be committed
// on the next autocommit. This can be used to rewind partitions if necessary,
// however it is strongly not recommended to use autocommitting + marks to
// rewind commits, and depending on timing, the autocommit may undo a mark
// rewind.
// the documentation on that option for more details. This function does not
// allow rewinds.
func (cl *Client) MarkCommitRecords(rs ...*Record) {
g := cl.consumer.g
if g == nil || !cl.cfg.autocommitMarks {
Expand Down Expand Up @@ -2192,13 +2187,15 @@ func (cl *Client) MarkCommitRecords(rs ...*Record) {
}

current := curPartitions[r.Partition]
curPartitions[r.Partition] = uncommit{
dirty: current.dirty,
committed: current.committed,
head: EpochOffset{
r.LeaderEpoch,
r.Offset + 1,
},
if newHead := (EpochOffset{
r.LeaderEpoch,
r.Offset + 1,
}); current.head.less(newHead) {
curPartitions[r.Partition] = uncommit{
dirty: current.dirty,
committed: current.committed,
head: newHead,
}
}
}
}
Expand Down

0 comments on commit ff5a3ed

Please sign in to comment.