Skip to content

Commit d035ddf

Browse files
committed
kgo groups: block join&sync while a commit is inflight
See comments. This is also a different and arguably better approach to the old issue #137. The implementation has to be a bit insane because we want to obey the context while _also_ trying to grab a lock, which cannot be canceled. Closes #409. Closes #137.
1 parent 1a59c2d commit d035ddf

File tree

3 files changed

+93
-94
lines changed

3 files changed

+93
-94
lines changed

Diff for: pkg/kgo/config.go

+12-1
Original file line numberDiff line numberDiff line change
@@ -1471,6 +1471,13 @@ func RequireStableFetchOffsets() GroupOpt {
14711471
//
14721472
// This function can largely replace any commit logic you may want to do in
14731473
// OnPartitionsRevoked.
1474+
//
1475+
// Lastly, note that this actually blocks any rebalance from calling
1476+
// OnPartitions{Assigned,Revoked,Lost}. If you are using a cooperative
1477+
// rebalancer such as CooperativeSticky, a rebalance can begin right before you
1478+
// poll, and you will still receive records because no partitions are lost yet.
1479+
// The in-progress rebalance only blocks if you are assigned new partitions or
1480+
// if any of your partitions are revoked.
14741481
func BlockRebalanceOnPoll() GroupOpt {
14751482
return groupOpt{func(cfg *cfg) { cfg.blockRebalanceOnPoll = true }}
14761483
}
@@ -1674,5 +1681,9 @@ func GroupProtocol(protocol string) GroupOpt {
16741681
// AutoCommitCallback sets the callback to use if autocommitting is enabled.
16751682
// This overrides the default callback that logs errors and continues.
16761683
func AutoCommitCallback(fn func(*Client, *kmsg.OffsetCommitRequest, *kmsg.OffsetCommitResponse, error)) GroupOpt {
1677-
return groupOpt{func(cfg *cfg) { cfg.commitCallback, cfg.setCommitCallback = fn, true }}
1684+
return groupOpt{func(cfg *cfg) {
1685+
if fn != nil {
1686+
cfg.commitCallback, cfg.setCommitCallback = fn, true
1687+
}
1688+
}}
16781689
}

Diff for: pkg/kgo/consumer_group.go

+79-93
Original file line numberDiff line numberDiff line change
@@ -97,6 +97,11 @@ type groupConsumer struct {
9797
// We store this as a pointer for address comparisons.
9898
external atomic.Value // *groupExternal
9999

100+
// See the big comment on `commit`. If we allow committing between
101+
// join&sync, we occasionally see RebalanceInProgress or
102+
// IllegalGeneration errors while cooperative consuming.
103+
noCommitDuringJoinAndSync sync.RWMutex
104+
100105
//////////////
101106
// mu block //
102107
//////////////
@@ -991,6 +996,9 @@ func (g *groupConsumer) rejoin(why string) {
991996
// Joins and then syncs, issuing the two slow requests in goroutines to allow
992997
// for group cancelation to return early.
993998
func (g *groupConsumer) joinAndSync(joinWhy string) error {
999+
g.noCommitDuringJoinAndSync.Lock()
1000+
defer g.noCommitDuringJoinAndSync.Unlock()
1001+
9941002
g.cfg.logger.Log(LogLevelInfo, "joining group", "group", g.cfg.group)
9951003
g.leader.Store(false)
9961004
g.getAndResetExternalRejoin()
@@ -2037,10 +2045,16 @@ func (g *groupConsumer) loopCommit() {
20372045
// We always commit only the head. If we are autocommitting
20382046
// dirty, then updateUncommitted updates the head to dirty
20392047
// offsets.
2048+
g.noCommitDuringJoinAndSync.RLock()
20402049
g.mu.Lock()
20412050
if !g.blockAuto {
20422051
g.cfg.logger.Log(LogLevelDebug, "autocommitting", "group", g.cfg.group)
2043-
g.commit(g.ctx, g.getUncommittedLocked(true, false), g.cfg.commitCallback)
2052+
g.commit(g.ctx, g.getUncommittedLocked(true, false), func(cl *Client, req *kmsg.OffsetCommitRequest, resp *kmsg.OffsetCommitResponse, err error) {
2053+
g.noCommitDuringJoinAndSync.RUnlock()
2054+
g.cfg.commitCallback(cl, req, resp, err)
2055+
})
2056+
} else {
2057+
g.noCommitDuringJoinAndSync.RUnlock()
20442058
}
20452059
g.mu.Unlock()
20462060
}
@@ -2426,6 +2440,42 @@ func (cl *Client) CommitOffsetsSync(
24262440
g.commitOffsetsSync(ctx, uncommitted, onDone)
24272441
}
24282442

2443+
// waitJoinSyncMu is a rather insane way to try to grab a lock, but also return
2444+
// early if we have to wait and the context is canceled.
2445+
func (g *groupConsumer) waitJoinSyncMu(ctx context.Context) error {
2446+
if g.noCommitDuringJoinAndSync.TryRLock() {
2447+
return nil
2448+
}
2449+
2450+
var (
2451+
blockJoinSyncCh = make(chan struct{})
2452+
mu sync.Mutex
2453+
returned bool
2454+
maybeRUnlock = func() {
2455+
mu.Lock()
2456+
defer mu.Unlock()
2457+
if returned {
2458+
g.noCommitDuringJoinAndSync.RUnlock()
2459+
}
2460+
returned = true
2461+
}
2462+
)
2463+
2464+
go func() {
2465+
g.noCommitDuringJoinAndSync.RLock()
2466+
close(blockJoinSyncCh)
2467+
maybeRUnlock()
2468+
}()
2469+
2470+
select {
2471+
case <-blockJoinSyncCh:
2472+
return nil
2473+
case <-ctx.Done():
2474+
maybeRUnlock()
2475+
return ctx.Err()
2476+
}
2477+
}
2478+
24292479
func (g *groupConsumer) commitOffsetsSync(
24302480
ctx context.Context,
24312481
uncommitted map[string]map[int32]EpochOffset,
@@ -2443,7 +2493,12 @@ func (g *groupConsumer) commitOffsetsSync(
24432493

24442494
g.syncCommitMu.Lock() // block all other concurrent commits until our OnDone is done.
24452495

2496+
if err := g.waitJoinSyncMu(ctx); err != nil {
2497+
onDone(g.cl, nil, nil, err)
2498+
return
2499+
}
24462500
unblockCommits := func(cl *Client, req *kmsg.OffsetCommitRequest, resp *kmsg.OffsetCommitResponse, err error) {
2501+
g.noCommitDuringJoinAndSync.RUnlock()
24472502
defer close(done)
24482503
defer g.syncCommitMu.Unlock()
24492504
onDone(cl, req, resp, err)
@@ -2519,8 +2574,12 @@ func (cl *Client) CommitOffsets(
25192574
}
25202575

25212576
g.syncCommitMu.RLock() // block sync commit, but allow other concurrent Commit to cancel us
2522-
2577+
if err := g.waitJoinSyncMu(ctx); err != nil {
2578+
onDone(g.cl, nil, nil, err)
2579+
return
2580+
}
25232581
unblockSyncCommit := func(cl *Client, req *kmsg.OffsetCommitRequest, resp *kmsg.OffsetCommitResponse, err error) {
2582+
g.noCommitDuringJoinAndSync.RUnlock()
25242583
defer g.syncCommitMu.RUnlock()
25252584
onDone(cl, req, resp, err)
25262585
}
@@ -2554,7 +2613,24 @@ func (g *groupConsumer) defaultRevoke(context.Context, *Client, map[string][]int
25542613
}
25552614
}
25562615

2557-
// commit is the first step of actually committing; see doc below.
2616+
// The actual logic to commit. This is called under two locks:
2617+
// - g.noCommitDuringJoinAndSync.RLock()
2618+
// - g.mu.Lock()
2619+
//
2620+
// By blocking the JoinGroup from being issued, or blocking the commit on join
2621+
// & sync finishing, we avoid RebalanceInProgress and IllegalGeneration. The
2622+
// former error happens if a commit arrives to the broker between the two, the
2623+
// latter error happens when a commit arrives to the broker with the old
2624+
// generation (it was in flight before sync finished).
2625+
//
2626+
// Practically, what this means is that a user's commits will be blocked if
2627+
// they try to commit between join and sync.
2628+
//
2629+
// For eager consuming, the user should not have any partitions to commit
2630+
// anyway. For cooperative consuming, a rebalance can happen after at any
2631+
// moment. We block only revokation aspects of rebalances with
2632+
// BlockRebalanceOnPoll; we want to allow the cooperative part of rebalancing
2633+
// to occur.
25582634
func (g *groupConsumer) commit(
25592635
ctx context.Context,
25602636
uncommitted map[string]map[int32]EpochOffset,
@@ -2587,103 +2663,13 @@ func (g *groupConsumer) commit(
25872663
uncommitted = dup
25882664
}
25892665

2590-
g.commitAcrossRebalance(ctx, uncommitted, onDone, 1)
2591-
}
2592-
2593-
// commitAcrossRebalances, called under the group mu, actually issues a commit.
2594-
// This function retries committing up to *once*. In standard mode of
2595-
// consuming, if a cooperative rebalance happens, a user may commit records
2596-
// while the client is rebalancing. This can cause ILLEGAL_GENERATION or
2597-
// REBALANCE_IN_PROGRESS errors. If we see either of those errors (once, to
2598-
// prevent spin looping), we re-issue the commit. See #137 for an example.
2599-
//
2600-
// We only try this logic for a cooperative group. Non-cooperative groups give
2601-
// up all their partitions on rebalance and do not continue to consume during
2602-
// the rebalancing.
2603-
func (g *groupConsumer) commitAcrossRebalance(
2604-
ctx context.Context,
2605-
uncommitted map[string]map[int32]EpochOffset,
2606-
onDone func(*Client, *kmsg.OffsetCommitRequest, *kmsg.OffsetCommitResponse, error),
2607-
tries int8,
2608-
) {
2609-
if onDone == nil { // note we must always call onDone
2610-
onDone = func(*Client, *kmsg.OffsetCommitRequest, *kmsg.OffsetCommitResponse, error) {}
2611-
}
26122666
if len(uncommitted) == 0 { // only empty if called thru autocommit / default revoke
26132667
// We have to do this concurrently because the expectation is
26142668
// that commit itself does not block.
26152669
go onDone(g.cl, kmsg.NewPtrOffsetCommitRequest(), kmsg.NewPtrOffsetCommitResponse(), nil)
26162670
return
26172671
}
26182672

2619-
// We retry four times, for five tries total: cooperative rebalancing
2620-
// uses two back to back rebalances, and the commit could
2621-
// pathologically end during both.
2622-
if g.cooperative.Load() && tries < 5 {
2623-
origDone := onDone
2624-
onDone = func(cl *Client, req *kmsg.OffsetCommitRequest, resp *kmsg.OffsetCommitResponse, err error) {
2625-
retry := err == nil
2626-
var retryErr error
2627-
2628-
// Per package docs: if all partitions indicate rebalancing
2629-
// or illegal generation, we re-issue the commit.
2630-
if retry {
2631-
checkErr:
2632-
for i := range resp.Topics {
2633-
t := &resp.Topics[i]
2634-
for j := range t.Partitions {
2635-
p := &t.Partitions[j]
2636-
retryErr = kerr.ErrorForCode(p.ErrorCode)
2637-
retry = retry && (retryErr == kerr.RebalanceInProgress || retryErr == kerr.IllegalGeneration)
2638-
if !retry {
2639-
break checkErr
2640-
}
2641-
}
2642-
}
2643-
}
2644-
2645-
if retry {
2646-
// All errors are generation or rebalance. We now check
2647-
// if we are still assigned everything in the commit.
2648-
nowAssigned := g.nowAssigned.read()
2649-
mps := make(map[int32]struct{})
2650-
checkAssign:
2651-
for i := range resp.Topics {
2652-
t := &resp.Topics[i]
2653-
ps, exists := nowAssigned[t.Topic]
2654-
if retry = exists; !exists {
2655-
break checkAssign // no longer assigned this topic
2656-
}
2657-
for p := range mps {
2658-
delete(mps, p)
2659-
}
2660-
for _, p := range ps {
2661-
mps[p] = struct{}{}
2662-
}
2663-
for j := range t.Partitions {
2664-
p := &t.Partitions[j]
2665-
_, exists := mps[p.Partition]
2666-
if retry = exists; !exists {
2667-
break checkAssign // no longer assigned this partition
2668-
}
2669-
}
2670-
}
2671-
}
2672-
2673-
if retry {
2674-
go func() {
2675-
g.cl.cfg.logger.Log(LogLevelInfo, "CommitOffsets spanned a rebalance, we are cooperative and did not lose any partition we were trying to commit, recommitting", "err", retryErr)
2676-
time.Sleep(10 * time.Millisecond)
2677-
g.mu.Lock()
2678-
defer g.mu.Unlock()
2679-
g.commitAcrossRebalance(ctx, uncommitted, origDone, tries+1)
2680-
}()
2681-
} else {
2682-
origDone(cl, req, resp, err)
2683-
}
2684-
}
2685-
}
2686-
26872673
priorCancel := g.commitCancel
26882674
priorDone := g.commitDone
26892675

Diff for: pkg/kgo/txn.go

+2
Original file line numberDiff line numberDiff line change
@@ -1077,6 +1077,8 @@ func (cl *Client) commitTransactionOffsets(
10771077

10781078
unlockTxn()
10791079

1080+
g.noCommitDuringJoinAndSync.RLock()
1081+
defer g.noCommitDuringJoinAndSync.RUnlock()
10801082
g.mu.Lock()
10811083
defer g.mu.Unlock()
10821084

0 commit comments

Comments
 (0)