Skip to content

Commit

Permalink
kgo: add PreCommitContextFn, enabling pre-commit interceptors for met…
Browse files Browse the repository at this point in the history
…adata

Today, there is no way to add metadata to commits. I originally figured
the use cases were too advanced and niche, however, there are use cases.

The PreCommitContextFn allows one to hook into *just* before a commit is
issued to annotate any part of it as desired, or to abort the commit
altogether. This basically enables associating commits with any
metadata, or other advanced unknown use cases.
  • Loading branch information
twmb committed May 1, 2022
1 parent 29363ea commit 0a4f2ec
Showing 1 changed file with 20 additions and 0 deletions.
20 changes: 20 additions & 0 deletions pkg/kgo/consumer_group.go
Original file line number Diff line number Diff line change
Expand Up @@ -2029,6 +2029,19 @@ func (g *groupConsumer) getUncommittedLocked(head, dirty bool) map[string]map[in
return uncommitted
}

type commitContextFnT struct{}

var commitContextFn commitContextFnT

// PreCommitContextFn attaches fn to the context through WithValue. Using the
// context while committing allows fn to be called just before the commit is
// issued. This can be used to modify the actual commit, such as by associating
// metadata with partitions. If fn returns an error, the commit is not
// attempted.
func PreCommitContextFn(ctx context.Context, fn func(*kmsg.OffsetCommitRequest) error) context.Context {
return context.WithValue(ctx, commitContextFn, fn)
}

// CommitRecords issues a synchronous offset commit for the offsets contained
// within rs. Retriable errors are retried up to the configured retry limit,
// and any unretriable error is returned.
Expand Down Expand Up @@ -2518,6 +2531,13 @@ func (g *groupConsumer) commitAcrossRebalance(
req.Topics = append(req.Topics, reqTopic)
}

if fn, ok := ctx.Value(commitContextFn).(func(*kmsg.OffsetCommitRequest) error); ok {
if err := fn(req); err != nil {
onDone(g.cl, req, nil, err)
return
}
}

resp, err := req.RequestWith(commitCtx, g.cl)
if err != nil {
onDone(g.cl, req, nil, err)
Expand Down

0 comments on commit 0a4f2ec

Please sign in to comment.