From 0a4f2ec891ad2c34ab16156a99b3d18a6cd3ae84 Mon Sep 17 00:00:00 2001 From: Travis Bischel Date: Sat, 30 Apr 2022 22:03:05 -0600 Subject: [PATCH] kgo: add PreCommitContextFn, enabling pre-commit interceptors for metadata Today, there is no way to add metadata to commits. I originally figured the use cases were too advanced and niche, however, there are use cases. The PreCommitContextFn allows one to hook into *just* before a commit is issued to annotate any part of it as desired, or to abort the commit altogether. This basically enables associating commits with any metadata, or other advanced unknown use cases. --- pkg/kgo/consumer_group.go | 20 ++++++++++++++++++++ 1 file changed, 20 insertions(+) diff --git a/pkg/kgo/consumer_group.go b/pkg/kgo/consumer_group.go index 41d91e85..a0fc0337 100644 --- a/pkg/kgo/consumer_group.go +++ b/pkg/kgo/consumer_group.go @@ -2029,6 +2029,19 @@ func (g *groupConsumer) getUncommittedLocked(head, dirty bool) map[string]map[in return uncommitted } +type commitContextFnT struct{} + +var commitContextFn commitContextFnT + +// PreCommitContextFn attaches fn to the context through WithValue. Using the +// context while committing allows fn to be called just before the commit is +// issued. This can be used to modify the actual commit, such as by associating +// metadata with partitions. If fn returns an error, the commit is not +// attempted. +func PreCommitContextFn(ctx context.Context, fn func(*kmsg.OffsetCommitRequest) error) context.Context { + return context.WithValue(ctx, commitContextFn, fn) +} + // CommitRecords issues a synchronous offset commit for the offsets contained // within rs. Retriable errors are retried up to the configured retry limit, // and any unretriable error is returned. @@ -2518,6 +2531,13 @@ func (g *groupConsumer) commitAcrossRebalance( req.Topics = append(req.Topics, reqTopic) } + if fn, ok := ctx.Value(commitContextFn).(func(*kmsg.OffsetCommitRequest) error); ok { + if err := fn(req); err != nil { + onDone(g.cl, req, nil, err) + return + } + } + resp, err := req.RequestWith(commitCtx, g.cl) if err != nil { onDone(g.cl, req, nil, err)