Skip to content

Commit

Permalink
(minor) breaking api change: BlockingCommitOffsets => CommitOffsetsSync
Browse files Browse the repository at this point in the history
After the prior comits adding CommitRecords and
CommitUncommittedOffsets, there is a much more valid reason for this
change: consistency & docs.

ProduceSync was added a bit ago, and I've been well aware of the
inconsistency between BlockingCommitOffsets and ProduceSync. Ideally,
they would have the same type of name. I think I initially started with
CommitOffsetsSync, but thought Blocking was better. I'm not sure that is
the case anymore.

Second, now that we have four (4) ways to commit, I think it would be
very good if all four methods are next to each other in documentation.
Although only two methods are recommended (CommitRecords and, better,
CommitUncommittedOffsets), it is great now to have all methods start
with Commit for documentation proximity (since we cannot control godoc's
alphabetical ordering).

Lastly, a side benefit of breaking this is that now anybody using it
will be forced to notice the change and then potentially see that an
even better API was added: CommitUncommittedOffsets.
  • Loading branch information
twmb committed Jun 10, 2021
1 parent 922f4b8 commit 8cf3e5a
Show file tree
Hide file tree
Showing 3 changed files with 39 additions and 38 deletions.
5 changes: 4 additions & 1 deletion pkg/kgo/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -1209,7 +1209,10 @@ func OnAssigned(onAssigned func(context.Context, *Client, map[string][]int32)) G
// completes.
//
// The OnRevoked function is passed the group's context, which is only canceled
// if the group is left or the client is closed.
// if the group is left or the client is closed. Since OnRevoked is called when
// leaving a group, you likely want to commit before leaving, and to ignore
// context.Canceled / return early if your handling in OnRevoked fails due to
// the context being canceled.
//
// OnRevoked function is called at the end of a group session even if there are
// no partitions being revoked.
Expand Down
54 changes: 27 additions & 27 deletions pkg/kgo/consumer_group.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,11 +31,11 @@ type groupConsumer struct {

reSeen map[string]bool // topics we evaluated against regex, and whether we want them or not

// Full lock grabbed in BlockingCommitOffsets, read lock grabbed in
// CommitOffsets, this lock ensures that only one blocking commit can
// Full lock grabbed in CommitOffsetsSync, read lock grabbed in
// CommitOffsets, this lock ensures that only one sync commit can
// happen at once, and if it is happening, no other commit can be
// happening.
blockingCommitMu sync.RWMutex
syncCommitMu sync.RWMutex

rejoinCh chan struct{} // cap 1; sent to if subscription changes (regex)

Expand Down Expand Up @@ -103,7 +103,7 @@ type groupConsumer struct {
commitCancel func()
commitDone chan struct{}

// blockAuto is set and cleared in {,Blocking}CommitOffsets to block
// blockAuto is set and cleared in CommitOffsets{,Sync} to block
// autocommitting if autocommitting is active. This ensures that an
// autocommit does not cancel the user's manual commit.
blockAuto bool
Expand Down Expand Up @@ -427,7 +427,7 @@ func (g *groupConsumer) revoke(stage revokeStage, lost map[string][]int32, leavi
// After nilling uncommitted here, nothing should recreate
// uncommitted until a future fetch after the group is
// rejoined. This _can_ be broken with a manual SetOffsets or
// with {,Blocking}CommitOffsets but we explicitly document not
// with CommitOffsets{,Sync} but we explicitly document not
// to do that outside the context of a live group session.
g.mu.Lock()
g.uncommitted = nil
Expand Down Expand Up @@ -494,7 +494,7 @@ func (g *groupConsumer) revoke(stage revokeStage, lost map[string][]int32, leavi

// The block below deletes everything lost from our uncommitted map.
// All commits should be **completed** by the time this runs. An async
// commit can undo what we do below. The default revoke runs a blocking
// commit can undo what we do below. The default revoke runs a sync
// commit.
g.mu.Lock()
defer g.mu.Unlock()
Expand Down Expand Up @@ -1616,7 +1616,7 @@ func (g *groupConsumer) getUncommittedLocked(head bool) map[string]map[int32]Epo
return uncommitted
}

// CommitRecords issues a blocking offset commit for the offsets contained
// CommitRecords issues a synchronous offset commit for the offsets contained
// within rs. Retriable errors are retried up to the configured retry limit,
// and any unretriable error is returned.
//
Expand Down Expand Up @@ -1669,7 +1669,7 @@ func (cl *Client) CommitRecords(ctx context.Context, rs ...*Record) error {
// Our client retries an OffsetCommitRequest as necessary if the first
// response partition has a retriable group error (group coordinator
// loading, etc), so any partition error is fatal.
cl.BlockingCommitOffsets(ctx, offsets, func(_ *Client, _ *kmsg.OffsetCommitRequest, resp *kmsg.OffsetCommitResponse, err error) {
cl.CommitOffsetsSync(ctx, offsets, func(_ *Client, _ *kmsg.OffsetCommitRequest, resp *kmsg.OffsetCommitResponse, err error) {
if err != nil {
rerr = err
return
Expand All @@ -1688,10 +1688,10 @@ func (cl *Client) CommitRecords(ctx context.Context, rs ...*Record) error {
return rerr
}

// CommitUncommittedOffsets issues a blocking offset commit for any partition
// that has been consumed from that has uncommitted offsets. Retriable errors
// are retried up to the configured retry limit, and any unretriable error is
// returned.
// CommitUncommittedOffsets issues a synchronous offset commit for any
// partition that has been consumed from that has uncommitted offsets.
// Retriable errors are retried up to the configured retry limit, and any
// unretriable error is returned.
//
// This function is useful as a simple way to commit offsets if you have
// disabled autocommitting. As an alternative if you want to commit specific
Expand All @@ -1712,7 +1712,7 @@ func (cl *Client) CommitRecords(ctx context.Context, rs ...*Record) error {
func (cl *Client) CommitUncommittedOffsets(ctx context.Context) error {
// This function is just the tail end of CommitRecords just above.
var rerr error
cl.BlockingCommitOffsets(ctx, cl.UncommittedOffsets(), func(_ *Client, _ *kmsg.OffsetCommitRequest, resp *kmsg.OffsetCommitResponse, err error) {
cl.CommitOffsetsSync(ctx, cl.UncommittedOffsets(), func(_ *Client, _ *kmsg.OffsetCommitRequest, resp *kmsg.OffsetCommitResponse, err error) {
if err != nil {
rerr = err
return
Expand All @@ -1730,7 +1730,7 @@ func (cl *Client) CommitUncommittedOffsets(ctx context.Context) error {
return rerr
}

// BlockingCommitOffsets cancels any active CommitOffsets, begins a commit that
// CommitOffsetsSync cancels any active CommitOffsets, begins a commit that
// cannot be canceled, and waits for that commit to complete. This function
// will not return until the commit is done and the onDone callback is
// complete.
Expand All @@ -1744,7 +1744,7 @@ func (cl *Client) CommitUncommittedOffsets(ctx context.Context) error {
//
// For more information about committing and committing asynchronously, see
// CommitOffsets.
func (cl *Client) BlockingCommitOffsets(
func (cl *Client) CommitOffsetsSync(
ctx context.Context,
uncommitted map[string]map[int32]EpochOffset,
onDone func(*Client, *kmsg.OffsetCommitRequest, *kmsg.OffsetCommitResponse, error),
Expand All @@ -1762,29 +1762,29 @@ func (cl *Client) BlockingCommitOffsets(
onDone(cl, new(kmsg.OffsetCommitRequest), new(kmsg.OffsetCommitResponse), nil)
return
}
g.blockingCommitOffsets(ctx, uncommitted, onDone)
g.commitOffsetsSync(ctx, uncommitted, onDone)
}

func (g *groupConsumer) blockingCommitOffsets(
func (g *groupConsumer) commitOffsetsSync(
ctx context.Context,
uncommitted map[string]map[int32]EpochOffset,
onDone func(*Client, *kmsg.OffsetCommitRequest, *kmsg.OffsetCommitResponse, error),
) {
done := make(chan struct{})
defer func() { <-done }()

g.cl.cfg.logger.Log(LogLevelDebug, "in BlockingCommitOffsets", "with", uncommitted)
defer g.cl.cfg.logger.Log(LogLevelDebug, "left BlockingCommitOffsets")
g.cl.cfg.logger.Log(LogLevelDebug, "in CommitOffsetsSync", "with", uncommitted)
defer g.cl.cfg.logger.Log(LogLevelDebug, "left CommitOffsetsSync")

if onDone == nil {
onDone = func(*Client, *kmsg.OffsetCommitRequest, *kmsg.OffsetCommitResponse, error) {}
}

g.blockingCommitMu.Lock() // block all other concurrent commits until our OnDone is done.
g.syncCommitMu.Lock() // block all other concurrent commits until our OnDone is done.

unblockCommits := func(cl *Client, req *kmsg.OffsetCommitRequest, resp *kmsg.OffsetCommitResponse, err error) {
defer close(done)
defer g.blockingCommitMu.Unlock()
defer g.syncCommitMu.Unlock()
onDone(cl, req, resp, err)
}

Expand Down Expand Up @@ -1835,9 +1835,9 @@ func (g *groupConsumer) blockingCommitOffsets(
// event.
//
// Do not use this async CommitOffsets in OnRevoked, instead use
// BlockingCommitOffsets. If you commit async, the rebalance will proceed
// before this function executes, and you will commit offsets for partitions
// that have moved to a different consumer.
// CommitOffsetsSync. If you commit async, the rebalance will proceed before
// this function executes, and you will commit offsets for partitions that have
// moved to a different consumer.
func (cl *Client) CommitOffsets(
ctx context.Context,
uncommitted map[string]map[int32]EpochOffset,
Expand All @@ -1859,10 +1859,10 @@ func (cl *Client) CommitOffsets(
return
}

g.blockingCommitMu.RLock() // block BlockingCommit, but allow other concurrent Commit to cancel us
g.syncCommitMu.RLock() // block sync commit, but allow other concurrent Commit to cancel us

unblockSyncCommit := func(cl *Client, req *kmsg.OffsetCommitRequest, resp *kmsg.OffsetCommitResponse, err error) {
defer g.blockingCommitMu.RUnlock()
defer g.syncCommitMu.RUnlock()
onDone(cl, req, resp, err)
}

Expand Down Expand Up @@ -1893,7 +1893,7 @@ func (g *groupConsumer) defaultRevoke(context.Context, *Client, map[string][]int
// We use the client's context rather than the group context,
// because this could come from the group being left. The group
// context will already be canceled.
g.blockingCommitOffsets(g.cl.ctx, g.getUncommitted(), g.cfg.commitCallback)
g.commitOffsetsSync(g.cl.ctx, g.getUncommitted(), g.cfg.commitCallback)
}
}

Expand Down
18 changes: 8 additions & 10 deletions pkg/kgo/group_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -135,11 +135,11 @@ func (c *testConsumer) etl(etlsBeforeQuit int) {
opts = append(opts,
DisableAutoCommit(),
OnRevoked(func(ctx context.Context, cl *Client, _ map[string][]int32) {
cl.BlockingCommitOffsets(
ctx,
cl.UncommittedOffsets(),
nil,
)
// context.Canceled means the client left the
// group, so we ignore that.
if err := cl.CommitUncommittedOffsets(ctx); err != nil && err != context.Canceled {
c.errCh <- fmt.Errorf("unable to commit: %v", err)
}
}),
OnLost(func(context.Context, *Client, map[string][]int32) {}),
)
Expand All @@ -165,11 +165,9 @@ func (c *testConsumer) etl(etlsBeforeQuit int) {
// see above). To do so, we commit every time _before_ we poll.
// Thus, the final poll will remain uncommitted.
if etlsBeforeQuit > 0 {
cl.BlockingCommitOffsets(
context.Background(),
cl.UncommittedOffsets(),
nil,
)
if err := cl.CommitUncommittedOffsets(context.Background()); err != nil {
c.errCh <- fmt.Errorf("unable to commit: %v", err)
}
}

// We poll with a short timeout so that we do not hang waiting
Expand Down

0 comments on commit 8cf3e5a

Please sign in to comment.