diff --git a/pkg/kgo/config.go b/pkg/kgo/config.go index 1338d3b6..143e7840 100644 --- a/pkg/kgo/config.go +++ b/pkg/kgo/config.go @@ -164,6 +164,7 @@ type cfg struct { autocommitDisable bool // true if autocommit was disabled or we are transactional autocommitGreedy bool + autocommitMarks bool autocommitInterval time.Duration commitCallback func(*Client, *kmsg.OffsetCommitRequest, *kmsg.OffsetCommitResponse, error) } @@ -338,6 +339,19 @@ func (cfg *cfg) validate() error { } } + if cfg.autocommitDisable && cfg.autocommitGreedy { + return errors.New("cannot both disable autocommitting and enable greedy autocommitting") + } + if cfg.autocommitDisable && cfg.autocommitMarks { + return errors.New("cannot both disable autocommitting and enable marked autocommitting") + } + if cfg.autocommitGreedy && cfg.autocommitMarks { + return errors.New("cannot enable both greedy autocommitting and marked autocommitting") + } + if cfg.autocommitGreedy || cfg.autocommitDisable || cfg.autocommitMarks && len(cfg.group) == 0 { + return errors.New("invalid autocommit options specified when a group was not specified") + } + return nil } @@ -1325,6 +1339,18 @@ func AutoCommitInterval(interval time.Duration) GroupOpt { return groupOpt{func(cfg *cfg) { cfg.autocommitInterval = interval }} } +// AutoCommitMarks switches the autocommitting behavior to only commit "marked" +// records, which can be done with the MarkCommitRecords method. +// +// This option is basically a halfway point between autocommitting and manually +// committing. If you have slow batch processing of polls, then you can +// manually mark records to be autocommitted before you poll again. This way, +// if you usually take a long time between polls, your partial work can still +// be automatically checkpointed through autocommitting. +func AutoCommitMarks() GroupOpt { + return groupOpt{func(cfg *cfg) { cfg.autocommitMarks = true }} +} + // InstanceID sets the group consumer's instance ID, switching the group member // from "dynamic" to "static". // diff --git a/pkg/kgo/consumer.go b/pkg/kgo/consumer.go index 63a37979..903edc0d 100644 --- a/pkg/kgo/consumer.go +++ b/pkg/kgo/consumer.go @@ -278,9 +278,7 @@ func (cl *Client) PollRecords(ctx context.Context, maxPollRecords int) Fetches { } c := &cl.consumer - if c.g != nil && !cl.cfg.autocommitDisable && !cl.cfg.autocommitGreedy { - c.g.undirtyUncommitted() - } + c.g.undirtyUncommitted() var fetches Fetches fill := func() { diff --git a/pkg/kgo/consumer_group.go b/pkg/kgo/consumer_group.go index 56000566..f7d49e4e 100644 --- a/pkg/kgo/consumer_group.go +++ b/pkg/kgo/consumer_group.go @@ -1295,6 +1295,10 @@ type EpochOffset struct { Offset int64 } +func (e EpochOffset) less(o EpochOffset) bool { + return e.Epoch < o.Epoch || e.Epoch == o.Epoch && e.Offset < o.Offset +} + type uncommitted map[string]map[int32]uncommit // updateUncommitted sets the latest uncommitted offset. @@ -1302,6 +1306,9 @@ func (g *groupConsumer) updateUncommitted(fetches Fetches) { var b bytes.Buffer debug := g.cfg.logger.Level() >= LogLevelDebug + // We set the head offset if autocommitting is disabled (because we + // only use head / committed in that case), or if we are greedily + // autocommitting (so that the latest head is available to autocommit). setHead := g.cfg.autocommitDisable || g.cfg.autocommitGreedy g.mu.Lock() @@ -1375,6 +1382,25 @@ func (g *groupConsumer) updateUncommitted(fetches Fetches) { // not committing greedily, this ensures that when we enter poll, everything // previously consumed is a candidate for autocommitting. func (g *groupConsumer) undirtyUncommitted() { + if g == nil { + return + } + // Disabling autocommit means we do not use the dirty offset: we always + // update head, and then manual commits use that. + if g.cfg.autocommitDisable { + return + } + // Greedy autocommitting does not use dirty offsets, because we always + // just set head to the latest. + if g.cfg.autocommitGreedy { + return + } + // If we are autocommitting marked records only, then we do not + // automatically un-dirty our offsets. + if g.cfg.autocommitMarks { + return + } + g.mu.Lock() defer g.mu.Unlock() @@ -1474,9 +1500,7 @@ func (g *groupConsumer) updateCommitted( &uncommit.head, &uncommit.committed, } { - if set.Epoch > next.Epoch || - set.Epoch == next.Epoch && - set.Offset > next.Offset { + if next.less(set) { *next = set } } @@ -1782,6 +1806,51 @@ func (cl *Client) CommitRecords(ctx context.Context, rs ...*Record) error { return rerr } +// MarkCommitRecords marks records to be available for autocommitting. This +// function is only useful if you use the AutoCommitMarks config option, see +// the documentation on that option for more details. +func (cl *Client) MarkCommitRecords(rs ...*Record) { + g := cl.consumer.g + if g == nil || !cl.cfg.autocommitMarks { + return + } + + sort.Slice(rs, func(i, j int) bool { + return rs[i].Topic < rs[j].Topic || + rs[i].Topic == rs[j].Topic && rs[i].Partition < rs[j].Partition + }) + + if g.uncommitted == nil { + g.uncommitted = make(uncommitted) + } + var curTopic string + var curPartitions map[int32]uncommit + for _, r := range rs { + if curPartitions == nil || r.Topic != curTopic { + curPartitions = g.uncommitted[r.Topic] + if curPartitions == nil { + curPartitions = make(map[int32]uncommit) + g.uncommitted[r.Topic] = curPartitions + } + curTopic = r.Topic + } + + set := EpochOffset{ + r.LeaderEpoch, + r.Offset + 1, + } + + next := curPartitions[r.Partition] + if next.head.less(set) { + next.head = set + } + if next.dirty.less(set) { // for sanity, but this should not happen + next.dirty = set + } + curPartitions[r.Partition] = next + } +} + // CommitUncommittedOffsets issues a synchronous offset commit for any // partition that has been consumed from that has uncommitted offsets. // Retriable errors are retried up to the configured retry limit, and any