Skip to content

Commit

Permalink
consuming: add autocommitting marked records
Browse files Browse the repository at this point in the history
This is a halfway point between manually committing and autocommitting;
this opts in to manually specifying which records should be available to
autocommitting. This is useful when you have slower batch processing
between polling, and you do not want to lose everything in the event of
a shutdown.

Closes #73
  • Loading branch information
twmb committed Sep 1, 2021
1 parent c973268 commit 31ed46b
Show file tree
Hide file tree
Showing 3 changed files with 99 additions and 6 deletions.
26 changes: 26 additions & 0 deletions pkg/kgo/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -164,6 +164,7 @@ type cfg struct {

autocommitDisable bool // true if autocommit was disabled or we are transactional
autocommitGreedy bool
autocommitMarks bool
autocommitInterval time.Duration
commitCallback func(*Client, *kmsg.OffsetCommitRequest, *kmsg.OffsetCommitResponse, error)
}
Expand Down Expand Up @@ -338,6 +339,19 @@ func (cfg *cfg) validate() error {
}
}

if cfg.autocommitDisable && cfg.autocommitGreedy {
return errors.New("cannot both disable autocommitting and enable greedy autocommitting")
}
if cfg.autocommitDisable && cfg.autocommitMarks {
return errors.New("cannot both disable autocommitting and enable marked autocommitting")
}
if cfg.autocommitGreedy && cfg.autocommitMarks {
return errors.New("cannot enable both greedy autocommitting and marked autocommitting")
}
if cfg.autocommitGreedy || cfg.autocommitDisable || cfg.autocommitMarks && len(cfg.group) == 0 {
return errors.New("invalid autocommit options specified when a group was not specified")
}

return nil
}

Expand Down Expand Up @@ -1325,6 +1339,18 @@ func AutoCommitInterval(interval time.Duration) GroupOpt {
return groupOpt{func(cfg *cfg) { cfg.autocommitInterval = interval }}
}

// AutoCommitMarks switches the autocommitting behavior to only commit "marked"
// records, which can be done with the MarkCommitRecords method.
//
// This option is basically a halfway point between autocommitting and manually
// committing. If you have slow batch processing of polls, then you can
// manually mark records to be autocommitted before you poll again. This way,
// if you usually take a long time between polls, your partial work can still
// be automatically checkpointed through autocommitting.
func AutoCommitMarks() GroupOpt {
return groupOpt{func(cfg *cfg) { cfg.autocommitMarks = true }}
}

// InstanceID sets the group consumer's instance ID, switching the group member
// from "dynamic" to "static".
//
Expand Down
4 changes: 1 addition & 3 deletions pkg/kgo/consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -278,9 +278,7 @@ func (cl *Client) PollRecords(ctx context.Context, maxPollRecords int) Fetches {
}
c := &cl.consumer

if c.g != nil && !cl.cfg.autocommitDisable && !cl.cfg.autocommitGreedy {
c.g.undirtyUncommitted()
}
c.g.undirtyUncommitted()

var fetches Fetches
fill := func() {
Expand Down
75 changes: 72 additions & 3 deletions pkg/kgo/consumer_group.go
Original file line number Diff line number Diff line change
Expand Up @@ -1295,13 +1295,20 @@ type EpochOffset struct {
Offset int64
}

func (e EpochOffset) less(o EpochOffset) bool {
return e.Epoch < o.Epoch || e.Epoch == o.Epoch && e.Offset < o.Offset
}

type uncommitted map[string]map[int32]uncommit

// updateUncommitted sets the latest uncommitted offset.
func (g *groupConsumer) updateUncommitted(fetches Fetches) {
var b bytes.Buffer
debug := g.cfg.logger.Level() >= LogLevelDebug

// We set the head offset if autocommitting is disabled (because we
// only use head / committed in that case), or if we are greedily
// autocommitting (so that the latest head is available to autocommit).
setHead := g.cfg.autocommitDisable || g.cfg.autocommitGreedy

g.mu.Lock()
Expand Down Expand Up @@ -1375,6 +1382,25 @@ func (g *groupConsumer) updateUncommitted(fetches Fetches) {
// not committing greedily, this ensures that when we enter poll, everything
// previously consumed is a candidate for autocommitting.
func (g *groupConsumer) undirtyUncommitted() {
if g == nil {
return
}
// Disabling autocommit means we do not use the dirty offset: we always
// update head, and then manual commits use that.
if g.cfg.autocommitDisable {
return
}
// Greedy autocommitting does not use dirty offsets, because we always
// just set head to the latest.
if g.cfg.autocommitGreedy {
return
}
// If we are autocommitting marked records only, then we do not
// automatically un-dirty our offsets.
if g.cfg.autocommitMarks {
return
}

g.mu.Lock()
defer g.mu.Unlock()

Expand Down Expand Up @@ -1474,9 +1500,7 @@ func (g *groupConsumer) updateCommitted(
&uncommit.head,
&uncommit.committed,
} {
if set.Epoch > next.Epoch ||
set.Epoch == next.Epoch &&
set.Offset > next.Offset {
if next.less(set) {
*next = set
}
}
Expand Down Expand Up @@ -1782,6 +1806,51 @@ func (cl *Client) CommitRecords(ctx context.Context, rs ...*Record) error {
return rerr
}

// MarkCommitRecords marks records to be available for autocommitting. This
// function is only useful if you use the AutoCommitMarks config option, see
// the documentation on that option for more details.
func (cl *Client) MarkCommitRecords(rs ...*Record) {
g := cl.consumer.g
if g == nil || !cl.cfg.autocommitMarks {
return
}

sort.Slice(rs, func(i, j int) bool {
return rs[i].Topic < rs[j].Topic ||
rs[i].Topic == rs[j].Topic && rs[i].Partition < rs[j].Partition
})

if g.uncommitted == nil {
g.uncommitted = make(uncommitted)
}
var curTopic string
var curPartitions map[int32]uncommit
for _, r := range rs {
if curPartitions == nil || r.Topic != curTopic {
curPartitions = g.uncommitted[r.Topic]
if curPartitions == nil {
curPartitions = make(map[int32]uncommit)
g.uncommitted[r.Topic] = curPartitions
}
curTopic = r.Topic
}

set := EpochOffset{
r.LeaderEpoch,
r.Offset + 1,
}

next := curPartitions[r.Partition]
if next.head.less(set) {
next.head = set
}
if next.dirty.less(set) { // for sanity, but this should not happen
next.dirty = set
}
curPartitions[r.Partition] = next
}
}

// CommitUncommittedOffsets issues a synchronous offset commit for any
// partition that has been consumed from that has uncommitted offsets.
// Retriable errors are retried up to the configured retry limit, and any
Expand Down

0 comments on commit 31ed46b

Please sign in to comment.