From 31ed46b6ac790f7bb5f5f2b757cd9e2ec236d0b1 Mon Sep 17 00:00:00 2001 From: Travis Bischel Date: Tue, 31 Aug 2021 20:30:21 -0600 Subject: [PATCH] consuming: add autocommitting marked records This is a halfway point between manually committing and autocommitting; this opts in to manually specifying which records should be available to autocommitting. This is useful when you have slower batch processing between polling, and you do not want to lose everything in the event of a shutdown. Closes #73 --- pkg/kgo/config.go | 26 ++++++++++++++ pkg/kgo/consumer.go | 4 +-- pkg/kgo/consumer_group.go | 75 +++++++++++++++++++++++++++++++++++++-- 3 files changed, 99 insertions(+), 6 deletions(-) diff --git a/pkg/kgo/config.go b/pkg/kgo/config.go index 1338d3b6..143e7840 100644 --- a/pkg/kgo/config.go +++ b/pkg/kgo/config.go @@ -164,6 +164,7 @@ type cfg struct { autocommitDisable bool // true if autocommit was disabled or we are transactional autocommitGreedy bool + autocommitMarks bool autocommitInterval time.Duration commitCallback func(*Client, *kmsg.OffsetCommitRequest, *kmsg.OffsetCommitResponse, error) } @@ -338,6 +339,19 @@ func (cfg *cfg) validate() error { } } + if cfg.autocommitDisable && cfg.autocommitGreedy { + return errors.New("cannot both disable autocommitting and enable greedy autocommitting") + } + if cfg.autocommitDisable && cfg.autocommitMarks { + return errors.New("cannot both disable autocommitting and enable marked autocommitting") + } + if cfg.autocommitGreedy && cfg.autocommitMarks { + return errors.New("cannot enable both greedy autocommitting and marked autocommitting") + } + if cfg.autocommitGreedy || cfg.autocommitDisable || cfg.autocommitMarks && len(cfg.group) == 0 { + return errors.New("invalid autocommit options specified when a group was not specified") + } + return nil } @@ -1325,6 +1339,18 @@ func AutoCommitInterval(interval time.Duration) GroupOpt { return groupOpt{func(cfg *cfg) { cfg.autocommitInterval = interval }} } +// AutoCommitMarks switches the autocommitting behavior to only commit "marked" +// records, which can be done with the MarkCommitRecords method. +// +// This option is basically a halfway point between autocommitting and manually +// committing. If you have slow batch processing of polls, then you can +// manually mark records to be autocommitted before you poll again. This way, +// if you usually take a long time between polls, your partial work can still +// be automatically checkpointed through autocommitting. +func AutoCommitMarks() GroupOpt { + return groupOpt{func(cfg *cfg) { cfg.autocommitMarks = true }} +} + // InstanceID sets the group consumer's instance ID, switching the group member // from "dynamic" to "static". // diff --git a/pkg/kgo/consumer.go b/pkg/kgo/consumer.go index 63a37979..903edc0d 100644 --- a/pkg/kgo/consumer.go +++ b/pkg/kgo/consumer.go @@ -278,9 +278,7 @@ func (cl *Client) PollRecords(ctx context.Context, maxPollRecords int) Fetches { } c := &cl.consumer - if c.g != nil && !cl.cfg.autocommitDisable && !cl.cfg.autocommitGreedy { - c.g.undirtyUncommitted() - } + c.g.undirtyUncommitted() var fetches Fetches fill := func() { diff --git a/pkg/kgo/consumer_group.go b/pkg/kgo/consumer_group.go index 56000566..f7d49e4e 100644 --- a/pkg/kgo/consumer_group.go +++ b/pkg/kgo/consumer_group.go @@ -1295,6 +1295,10 @@ type EpochOffset struct { Offset int64 } +func (e EpochOffset) less(o EpochOffset) bool { + return e.Epoch < o.Epoch || e.Epoch == o.Epoch && e.Offset < o.Offset +} + type uncommitted map[string]map[int32]uncommit // updateUncommitted sets the latest uncommitted offset. @@ -1302,6 +1306,9 @@ func (g *groupConsumer) updateUncommitted(fetches Fetches) { var b bytes.Buffer debug := g.cfg.logger.Level() >= LogLevelDebug + // We set the head offset if autocommitting is disabled (because we + // only use head / committed in that case), or if we are greedily + // autocommitting (so that the latest head is available to autocommit). setHead := g.cfg.autocommitDisable || g.cfg.autocommitGreedy g.mu.Lock() @@ -1375,6 +1382,25 @@ func (g *groupConsumer) updateUncommitted(fetches Fetches) { // not committing greedily, this ensures that when we enter poll, everything // previously consumed is a candidate for autocommitting. func (g *groupConsumer) undirtyUncommitted() { + if g == nil { + return + } + // Disabling autocommit means we do not use the dirty offset: we always + // update head, and then manual commits use that. + if g.cfg.autocommitDisable { + return + } + // Greedy autocommitting does not use dirty offsets, because we always + // just set head to the latest. + if g.cfg.autocommitGreedy { + return + } + // If we are autocommitting marked records only, then we do not + // automatically un-dirty our offsets. + if g.cfg.autocommitMarks { + return + } + g.mu.Lock() defer g.mu.Unlock() @@ -1474,9 +1500,7 @@ func (g *groupConsumer) updateCommitted( &uncommit.head, &uncommit.committed, } { - if set.Epoch > next.Epoch || - set.Epoch == next.Epoch && - set.Offset > next.Offset { + if next.less(set) { *next = set } } @@ -1782,6 +1806,51 @@ func (cl *Client) CommitRecords(ctx context.Context, rs ...*Record) error { return rerr } +// MarkCommitRecords marks records to be available for autocommitting. This +// function is only useful if you use the AutoCommitMarks config option, see +// the documentation on that option for more details. +func (cl *Client) MarkCommitRecords(rs ...*Record) { + g := cl.consumer.g + if g == nil || !cl.cfg.autocommitMarks { + return + } + + sort.Slice(rs, func(i, j int) bool { + return rs[i].Topic < rs[j].Topic || + rs[i].Topic == rs[j].Topic && rs[i].Partition < rs[j].Partition + }) + + if g.uncommitted == nil { + g.uncommitted = make(uncommitted) + } + var curTopic string + var curPartitions map[int32]uncommit + for _, r := range rs { + if curPartitions == nil || r.Topic != curTopic { + curPartitions = g.uncommitted[r.Topic] + if curPartitions == nil { + curPartitions = make(map[int32]uncommit) + g.uncommitted[r.Topic] = curPartitions + } + curTopic = r.Topic + } + + set := EpochOffset{ + r.LeaderEpoch, + r.Offset + 1, + } + + next := curPartitions[r.Partition] + if next.head.less(set) { + next.head = set + } + if next.dirty.less(set) { // for sanity, but this should not happen + next.dirty = set + } + curPartitions[r.Partition] = next + } +} + // CommitUncommittedOffsets issues a synchronous offset commit for any // partition that has been consumed from that has uncommitted offsets. // Retriable errors are retried up to the configured retry limit, and any