From 8cf3e5a9dcd495928782c7d70a6204de6f6f94e7 Mon Sep 17 00:00:00 2001 From: Travis Bischel Date: Thu, 10 Jun 2021 11:46:40 -0600 Subject: [PATCH] (minor) breaking api change: BlockingCommitOffsets => CommitOffsetsSync After the prior commits adding CommitRecords and CommitUncommittedOffsets, there is a much more valid reason for this change: consistency & docs. ProduceSync was added a bit ago, and I've been well aware of the inconsistency between BlockingCommitOffsets and ProduceSync. Ideally, they would have the same type of name. I think I initially started with CommitOffsetsSync, but thought Blocking was better. I'm not sure that is the case anymore. Second, now that we have four (4) ways to commit, I think it would be very good if all four methods are next to each other in documentation. Although only two methods are recommended (CommitRecords and, better, CommitUncommittedOffsets), it is great now to have all methods start with Commit for documentation proximity (since we cannot control godoc's alphabetical ordering). Lastly, a side benefit of breaking this is that now anybody using it will be forced to notice the change and then potentially see that an even better API was added: CommitUncommittedOffsets. --- pkg/kgo/config.go | 5 +++- pkg/kgo/consumer_group.go | 54 +++++++++++++++++++-------------------- pkg/kgo/group_test.go | 18 ++++++------- 3 files changed, 39 insertions(+), 38 deletions(-) diff --git a/pkg/kgo/config.go b/pkg/kgo/config.go index 6b76c850..bfc92362 100644 --- a/pkg/kgo/config.go +++ b/pkg/kgo/config.go @@ -1209,7 +1209,10 @@ func OnAssigned(onAssigned func(context.Context, *Client, map[string][]int32)) G // completes. // // The OnRevoked function is passed the group's context, which is only canceled -// if the group is left or the client is closed. 
Since OnRevoked is called when +// leaving a group, you likely want to commit before leaving, and to ignore +// context.Canceled / return early if your handling in OnRevoked fails due to +// the context being canceled. // // OnRevoked function is called at the end of a group session even if there are // no partitions being revoked. diff --git a/pkg/kgo/consumer_group.go b/pkg/kgo/consumer_group.go index 711651a8..e89efec4 100644 --- a/pkg/kgo/consumer_group.go +++ b/pkg/kgo/consumer_group.go @@ -31,11 +31,11 @@ type groupConsumer struct { reSeen map[string]bool // topics we evaluated against regex, and whether we want them or not - // Full lock grabbed in BlockingCommitOffsets, read lock grabbed in - // CommitOffsets, this lock ensures that only one blocking commit can + // Full lock grabbed in CommitOffsetsSync, read lock grabbed in + // CommitOffsets, this lock ensures that only one sync commit can // happen at once, and if it is happening, no other commit can be // happening. - blockingCommitMu sync.RWMutex + syncCommitMu sync.RWMutex rejoinCh chan struct{} // cap 1; sent to if subscription changes (regex) @@ -103,7 +103,7 @@ type groupConsumer struct { commitCancel func() commitDone chan struct{} - // blockAuto is set and cleared in {,Blocking}CommitOffsets to block + // blockAuto is set and cleared in CommitOffsets{,Sync} to block // autocommitting if autocommitting is active. This ensures that an // autocommit does not cancel the user's manual commit. blockAuto bool @@ -427,7 +427,7 @@ func (g *groupConsumer) revoke(stage revokeStage, lost map[string][]int32, leavi // After nilling uncommitted here, nothing should recreate // uncommitted until a future fetch after the group is // rejoined. This _can_ be broken with a manual SetOffsets or - // with {,Blocking}CommitOffsets but we explicitly document not + // with CommitOffsets{,Sync} but we explicitly document not // to do that outside the context of a live group session. 
g.mu.Lock() g.uncommitted = nil @@ -494,7 +494,7 @@ func (g *groupConsumer) revoke(stage revokeStage, lost map[string][]int32, leavi // The block below deletes everything lost from our uncommitted map. // All commits should be **completed** by the time this runs. An async - // commit can undo what we do below. The default revoke runs a blocking + // commit can undo what we do below. The default revoke runs a sync // commit. g.mu.Lock() defer g.mu.Unlock() @@ -1616,7 +1616,7 @@ func (g *groupConsumer) getUncommittedLocked(head bool) map[string]map[int32]Epo return uncommitted } -// CommitRecords issues a blocking offset commit for the offsets contained +// CommitRecords issues a synchronous offset commit for the offsets contained // within rs. Retriable errors are retried up to the configured retry limit, // and any unretriable error is returned. // @@ -1669,7 +1669,7 @@ func (cl *Client) CommitRecords(ctx context.Context, rs ...*Record) error { // Our client retries an OffsetCommitRequest as necessary if the first // response partition has a retriable group error (group coordinator // loading, etc), so any partition error is fatal. - cl.BlockingCommitOffsets(ctx, offsets, func(_ *Client, _ *kmsg.OffsetCommitRequest, resp *kmsg.OffsetCommitResponse, err error) { + cl.CommitOffsetsSync(ctx, offsets, func(_ *Client, _ *kmsg.OffsetCommitRequest, resp *kmsg.OffsetCommitResponse, err error) { if err != nil { rerr = err return @@ -1688,10 +1688,10 @@ func (cl *Client) CommitRecords(ctx context.Context, rs ...*Record) error { return rerr } -// CommitUncommittedOffsets issues a blocking offset commit for any partition -// that has been consumed from that has uncommitted offsets. Retriable errors -// are retried up to the configured retry limit, and any unretriable error is -// returned. +// CommitUncommittedOffsets issues a synchronous offset commit for any +// partition that has been consumed from that has uncommitted offsets. 
+// Retriable errors are retried up to the configured retry limit, and any +// unretriable error is returned. // // This function is useful as a simple way to commit offsets if you have // disabled autocommitting. As an alternative if you want to commit specific @@ -1712,7 +1712,7 @@ func (cl *Client) CommitRecords(ctx context.Context, rs ...*Record) error { func (cl *Client) CommitUncommittedOffsets(ctx context.Context) error { // This function is just the tail end of CommitRecords just above. var rerr error - cl.BlockingCommitOffsets(ctx, cl.UncommittedOffsets(), func(_ *Client, _ *kmsg.OffsetCommitRequest, resp *kmsg.OffsetCommitResponse, err error) { + cl.CommitOffsetsSync(ctx, cl.UncommittedOffsets(), func(_ *Client, _ *kmsg.OffsetCommitRequest, resp *kmsg.OffsetCommitResponse, err error) { if err != nil { rerr = err return @@ -1730,7 +1730,7 @@ func (cl *Client) CommitUncommittedOffsets(ctx context.Context) error { return rerr } -// BlockingCommitOffsets cancels any active CommitOffsets, begins a commit that +// CommitOffsetsSync cancels any active CommitOffsets, begins a commit that // cannot be canceled, and waits for that commit to complete. This function // will not return until the commit is done and the onDone callback is // complete. @@ -1744,7 +1744,7 @@ func (cl *Client) CommitUncommittedOffsets(ctx context.Context) error { // // For more information about committing and committing asynchronously, see // CommitOffsets. 
-func (cl *Client) BlockingCommitOffsets( +func (cl *Client) CommitOffsetsSync( ctx context.Context, uncommitted map[string]map[int32]EpochOffset, onDone func(*Client, *kmsg.OffsetCommitRequest, *kmsg.OffsetCommitResponse, error), @@ -1762,10 +1762,10 @@ func (cl *Client) BlockingCommitOffsets( onDone(cl, new(kmsg.OffsetCommitRequest), new(kmsg.OffsetCommitResponse), nil) return } - g.blockingCommitOffsets(ctx, uncommitted, onDone) + g.commitOffsetsSync(ctx, uncommitted, onDone) } -func (g *groupConsumer) blockingCommitOffsets( +func (g *groupConsumer) commitOffsetsSync( ctx context.Context, uncommitted map[string]map[int32]EpochOffset, onDone func(*Client, *kmsg.OffsetCommitRequest, *kmsg.OffsetCommitResponse, error), @@ -1773,18 +1773,18 @@ func (g *groupConsumer) blockingCommitOffsets( done := make(chan struct{}) defer func() { <-done }() - g.cl.cfg.logger.Log(LogLevelDebug, "in BlockingCommitOffsets", "with", uncommitted) - defer g.cl.cfg.logger.Log(LogLevelDebug, "left BlockingCommitOffsets") + g.cl.cfg.logger.Log(LogLevelDebug, "in CommitOffsetsSync", "with", uncommitted) + defer g.cl.cfg.logger.Log(LogLevelDebug, "left CommitOffsetsSync") if onDone == nil { onDone = func(*Client, *kmsg.OffsetCommitRequest, *kmsg.OffsetCommitResponse, error) {} } - g.blockingCommitMu.Lock() // block all other concurrent commits until our OnDone is done. + g.syncCommitMu.Lock() // block all other concurrent commits until our OnDone is done. unblockCommits := func(cl *Client, req *kmsg.OffsetCommitRequest, resp *kmsg.OffsetCommitResponse, err error) { defer close(done) - defer g.blockingCommitMu.Unlock() + defer g.syncCommitMu.Unlock() onDone(cl, req, resp, err) } @@ -1835,9 +1835,9 @@ func (g *groupConsumer) blockingCommitOffsets( // event. // // Do not use this async CommitOffsets in OnRevoked, instead use -// BlockingCommitOffsets. 
If you commit async, the rebalance will proceed -// before this function executes, and you will commit offsets for partitions -// that have moved to a different consumer. +// CommitOffsetsSync. If you commit async, the rebalance will proceed before +// this function executes, and you will commit offsets for partitions that have +// moved to a different consumer. func (cl *Client) CommitOffsets( ctx context.Context, uncommitted map[string]map[int32]EpochOffset, @@ -1859,10 +1859,10 @@ func (cl *Client) CommitOffsets( return } - g.blockingCommitMu.RLock() // block BlockingCommit, but allow other concurrent Commit to cancel us + g.syncCommitMu.RLock() // block sync commit, but allow other concurrent Commit to cancel us unblockSyncCommit := func(cl *Client, req *kmsg.OffsetCommitRequest, resp *kmsg.OffsetCommitResponse, err error) { - defer g.blockingCommitMu.RUnlock() + defer g.syncCommitMu.RUnlock() onDone(cl, req, resp, err) } @@ -1893,7 +1893,7 @@ func (g *groupConsumer) defaultRevoke(context.Context, *Client, map[string][]int // We use the client's context rather than the group context, // because this could come from the group being left. The group // context will already be canceled. - g.blockingCommitOffsets(g.cl.ctx, g.getUncommitted(), g.cfg.commitCallback) + g.commitOffsetsSync(g.cl.ctx, g.getUncommitted(), g.cfg.commitCallback) } } diff --git a/pkg/kgo/group_test.go b/pkg/kgo/group_test.go index 2673398b..8909f1e0 100644 --- a/pkg/kgo/group_test.go +++ b/pkg/kgo/group_test.go @@ -135,11 +135,11 @@ func (c *testConsumer) etl(etlsBeforeQuit int) { opts = append(opts, DisableAutoCommit(), OnRevoked(func(ctx context.Context, cl *Client, _ map[string][]int32) { - cl.BlockingCommitOffsets( - ctx, - cl.UncommittedOffsets(), - nil, - ) + // context.Canceled means the client left the + // group, so we ignore that. 
+ if err := cl.CommitUncommittedOffsets(ctx); err != nil && err != context.Canceled { + c.errCh <- fmt.Errorf("unable to commit: %v", err) + } }), OnLost(func(context.Context, *Client, map[string][]int32) {}), ) @@ -165,11 +165,9 @@ func (c *testConsumer) etl(etlsBeforeQuit int) { // see above). To do so, we commit every time _before_ we poll. // Thus, the final poll will remain uncommitted. if etlsBeforeQuit > 0 { - cl.BlockingCommitOffsets( - context.Background(), - cl.UncommittedOffsets(), - nil, - ) + if err := cl.CommitUncommittedOffsets(context.Background()); err != nil { + c.errCh <- fmt.Errorf("unable to commit: %v", err) + } } // We poll with a short timeout so that we do not hang waiting