diff --git a/pkg/kgo/config.go b/pkg/kgo/config.go
index f10bea64..8fd3b8de 100644
--- a/pkg/kgo/config.go
+++ b/pkg/kgo/config.go
@@ -164,6 +164,8 @@ type cfg struct {
 
 	adjustOffsetsBeforeAssign func(ctx context.Context, offsets map[string]map[int32]Offset) (map[string]map[int32]Offset, error)
 
+	blockRebalanceOnPoll bool
+
 	setAssigned bool
 	setRevoked  bool
 	setLost     bool
@@ -1383,6 +1385,36 @@ func RequireStableFetchOffsets() GroupOpt {
 	return groupOpt{func(cfg *cfg) { cfg.requireStable = true }}
 }
 
+// BlockRebalanceOnPoll switches the client to block rebalances whenever you
+// poll until you explicitly call AllowRebalance.
+//
+// By default, a consumer group is managed completely independently of
+// consuming. A rebalance may occur at any moment. If you poll records, and
+// then a rebalance happens, and then you commit, you may be committing to
+// partitions you no longer own. This will result in duplicates. In the worst
+// case, you could rewind commits that a different member has already made
+// (risking duplicates if another rebalance were to happen before that other
+// member commits again).
+//
+// By blocking rebalancing after you poll until you call AllowRebalance, you
+// can be sure that you commit records that your member currently owns.
+// However, the big tradeoff is that by blocking rebalances, you put your group
+// member at risk of waiting so long that the member is kicked from the group
+// for exceeding the rebalance timeout. For comparison, Sarama blocks
+// rebalancing by default; this option makes kgo behave more similarly to
+// Sarama.
+//
+// If you use this option, ensure that you always process records quickly. It
+// is recommended to use PollRecords rather than PollFetches so that you can
+// bound how many records you process at once. You must always call
+// AllowRebalance when you are done processing the records you received.
+//
+// This option can largely replace any commit logic you may want to perform
+// in OnPartitionsRevoked.
+func BlockRebalanceOnPoll() GroupOpt {
+	return groupOpt{func(cfg *cfg) { cfg.blockRebalanceOnPoll = true }}
+}
+
 // AdjustFetchOffsetsFn sets the function to be called when a group is joined
 // after offsets are fetched for those partitions so that a user can adjust them
 // before consumption begins.
diff --git a/pkg/kgo/consumer.go b/pkg/kgo/consumer.go
index df3da14b..629b8ce7 100644
--- a/pkg/kgo/consumer.go
+++ b/pkg/kgo/consumer.go
@@ -166,12 +166,59 @@ type consumer struct {
 	sourcesReadyCond        *sync.Cond
 	sourcesReadyForDraining []*source
 	fakeReadyForDraining    []Fetch
+
+	pollWaitMu    sync.Mutex
+	pollWaitC     *sync.Cond
+	pollWaitState uint64 // 0 == nothing, low 32 bits: # pollers, high 32: # waiting rebalances
 }
 
 func (c *consumer) loadPaused() pausedTopics   { return c.paused.Load().(pausedTopics) }
 func (c *consumer) clonePaused() pausedTopics  { return c.paused.Load().(pausedTopics).clone() }
 func (c *consumer) storePaused(p pausedTopics) { c.paused.Store(p) }
 
+func (c *consumer) waitAndAddPoller() {
+	c.pollWaitMu.Lock()
+	defer c.pollWaitMu.Unlock()
+	for c.pollWaitState>>32 != 0 {
+		c.pollWaitC.Wait()
+	}
+	// Rebalance always takes priority, but if there are no active
+	// rebalances, our poll blocks rebalances.
+	c.pollWaitState++
+}
+
+func (c *consumer) unaddPoller() {
+	c.pollWaitMu.Lock()
+	defer c.pollWaitMu.Unlock()
+	c.pollWaitState--
+	c.pollWaitC.Broadcast()
+}
+
+func (c *consumer) allowRebalance() {
+	c.pollWaitMu.Lock()
+	defer c.pollWaitMu.Unlock()
+	// When allowing rebalances, the user is explicitly saying all pollers
+	// are done. We mask them out.
+	c.pollWaitState &= math.MaxUint32 << 32
+	c.pollWaitC.Broadcast()
+}
+
+func (c *consumer) waitAndAddRebalance() {
+	c.pollWaitMu.Lock()
+	defer c.pollWaitMu.Unlock()
+	c.pollWaitState += 1 << 32
+	for c.pollWaitState&math.MaxUint32 != 0 {
+		c.pollWaitC.Wait()
+	}
+}
+
+func (c *consumer) unaddRebalance() {
+	c.pollWaitMu.Lock()
+	defer c.pollWaitMu.Unlock()
+	c.pollWaitState -= 1 << 32
+	c.pollWaitC.Broadcast()
+}
+
 // BufferedFetchRecords returns the number of records currently buffered from
 // fetching within the client.
 //
@@ -198,6 +245,7 @@ func (c *consumer) init(cl *Client) {
 	c.cl = cl
 	c.paused.Store(make(pausedTopics))
 	c.sourcesReadyCond = sync.NewCond(&c.sourcesReadyMu)
+	c.pollWaitC = sync.NewCond(&c.pollWaitMu)
 
 	if len(cl.cfg.topics) == 0 && len(cl.cfg.partitions) == 0 {
 		return // not consuming
@@ -240,6 +288,14 @@ func (c *consumer) addFakeReadyForDraining(topic string, partition int32, err er
 	c.sourcesReadyCond.Broadcast()
 }
 
+func (c *consumer) pollWait(fn func()) {
+	if c.cl.cfg.blockRebalanceOnPoll {
+		c.waitAndAddRebalance()
+		defer c.unaddRebalance()
+	}
+	fn()
+}
+
 // PollFetches waits for fetches to be available, returning as soon as any
 // broker returns a fetch. If the context quits, this function quits. If the
 // context is nil or is already canceled, this function will return immediately
@@ -253,6 +309,12 @@ func (c *consumer) addFakeReadyForDraining(topic string, partition int32, err er
 // has no topic, a partition of 0, and a partition error of ErrClientClosed.
 // This can be used to detect if the client is closing and to break out of a
 // poll loop.
+//
+// If you are group consuming, a rebalance can happen under the hood while you
+// process the returned fetches. This can result in duplicate work, and you may
+// accidentally commit to partitions that you no longer own. You can prevent
+// this by using BlockRebalanceOnPoll, but this comes with different tradeoffs.
+// See the documentation on BlockRebalanceOnPoll for more information.
 func (cl *Client) PollFetches(ctx context.Context) Fetches {
 	return cl.PollRecords(ctx, 0)
 }
@@ -273,6 +335,12 @@ func (cl *Client) PollFetches(ctx context.Context) Fetches {
 // has no topic, a partition of 0, and a partition error of ErrClientClosed.
 // This can be used to detect if the client is closing and to break out of a
 // poll loop.
+//
+// If you are group consuming, a rebalance can happen under the hood while you
+// process the returned fetches. This can result in duplicate work, and you may
+// accidentally commit to partitions that you no longer own. You can prevent
+// this by using BlockRebalanceOnPoll, but this comes with different tradeoffs.
+// See the documentation on BlockRebalanceOnPoll for more information.
 func (cl *Client) PollRecords(ctx context.Context, maxPollRecords int) Fetches {
 	if maxPollRecords == 0 {
 		maxPollRecords = -1
@@ -283,6 +351,15 @@ func (cl *Client) PollRecords(ctx context.Context, maxPollRecords int) Fetches {
 
 	var fetches Fetches
 	fill := func() {
+		if c.cl.cfg.blockRebalanceOnPoll {
+			c.waitAndAddPoller()
+			defer func() {
+				if len(fetches) == 0 {
+					c.unaddPoller()
+				}
+			}()
+		}
+
 		// A group can grab the consumer lock then the group mu and
 		// assign partitions. The group mu is grabbed to update its
 		// uncommitted map. Assigning partitions clears sources ready
@@ -387,6 +464,19 @@ func (cl *Client) PollRecords(ctx context.Context, maxPollRecords int) Fetches {
 	return fetches
 }
 
+// AllowRebalance allows a consumer group to rebalance if it was blocked by you
+// polling records in tandem with the BlockRebalanceOnPoll option.
+//
+// You can poll many times before calling this function; this function
+// internally resets the poll count and allows any blocked rebalances to
+// continue. Rebalances take priority: if a rebalance is blocked, and you allow
+// rebalances and then immediately poll, your poll will be blocked until the
+// rebalance completes. Internally, the subsequent poll simply waits for any
+// lost partitions to stop being fetched before returning records to you.
+func (cl *Client) AllowRebalance() {
+	cl.consumer.allowRebalance()
+}
+
 // PauseFetchTopics sets the client to no longer fetch the given topics and
 // returns all currently paused topics. Paused topics persist until resumed.
 // You can call this function with no topics to simply receive the list of
@@ -546,29 +636,31 @@ func (c *consumer) purgeTopics(topics []string) {
 		purgeAssignments[topic] = nil
 	}
 
-	c.mu.Lock()
-	defer c.mu.Unlock()
+	c.pollWait(func() {
+		c.mu.Lock()
+		defer c.mu.Unlock()
 
-	// The difference for groups is we need to lock the group and there is
-	// a slight type difference in g.using vs d.using.
-	if c.g != nil {
-		c.g.mu.Lock()
-		defer c.g.mu.Unlock()
-		c.g.tps.purgeTopics(topics)
-		c.assignPartitions(purgeAssignments, assignPurgeMatching, c.g.tps, fmt.Sprintf("purge of %v requested", topics))
-		for _, topic := range topics {
-			delete(c.g.using, topic)
-			delete(c.g.reSeen, topic)
-		}
-		c.g.rejoin("rejoin from PurgeFetchTopics")
-	} else {
-		c.d.tps.purgeTopics(topics)
-		c.assignPartitions(purgeAssignments, assignPurgeMatching, c.d.tps, fmt.Sprintf("purge of %v requested", topics))
-		for _, topic := range topics {
-			delete(c.d.using, topic)
-			delete(c.d.reSeen, topic)
+		// The difference for groups is we need to lock the group and there is
+		// a slight type difference in g.using vs d.using.
+		if c.g != nil {
+			c.g.mu.Lock()
+			defer c.g.mu.Unlock()
+			c.g.tps.purgeTopics(topics)
+			c.assignPartitions(purgeAssignments, assignPurgeMatching, c.g.tps, fmt.Sprintf("purge of %v requested", topics))
+			for _, topic := range topics {
+				delete(c.g.using, topic)
+				delete(c.g.reSeen, topic)
+			}
+			c.g.rejoin("rejoin from PurgeFetchTopics")
+		} else {
+			c.d.tps.purgeTopics(topics)
+			c.assignPartitions(purgeAssignments, assignPurgeMatching, c.d.tps, fmt.Sprintf("purge of %v requested", topics))
+			for _, topic := range topics {
+				delete(c.d.using, topic)
+				delete(c.d.reSeen, topic)
+			}
 		}
-	}
+	})
 }
 
 // AddConsumeTopics adds new topics to be consumed. This function is a no-op if
diff --git a/pkg/kgo/consumer_group.go b/pkg/kgo/consumer_group.go
index c65b3052..a2b208a5 100644
--- a/pkg/kgo/consumer_group.go
+++ b/pkg/kgo/consumer_group.go
@@ -152,11 +152,13 @@ func (cl *Client) LeaveGroup() {
 		return
 	}
 
-	c.mu.Lock() // lock for assign
-	c.assignPartitions(nil, assignInvalidateAll, noTopicsPartitions, "invalidating all assignments in LeaveGroup")
-	wait := c.g.leave()
-	c.mu.Unlock()
-
+	var wait func()
+	c.pollWait(func() {
+		c.mu.Lock() // lock for assign
+		defer c.mu.Unlock()
+		c.assignPartitions(nil, assignInvalidateAll, noTopicsPartitions, "invalidating all assignments in LeaveGroup")
+		wait = c.g.leave()
+	})
 	wait() // wait after we unlock
 }
 
@@ -311,14 +313,23 @@ func (g *groupConsumer) manage() {
 			hook()
 		}
 
-		// We need to invalidate everything from an error return.
+		// If we are eager, we should have invalidated everything
+		// before getting here, but we do so doubly just in case.
+		//
+		// If we are cooperative, the join and sync could have failed
+		// during the cooperative rebalance where we were still
+		// consuming. We need to invalidate everything. Waiting to
+		// resume from poll is necessary, but the user will likely be
+		// unable to commit.
 		{
-			g.c.mu.Lock()
-			g.c.assignPartitions(nil, assignInvalidateAll, nil, "clearing assignment at end of group management session")
-			g.mu.Lock()     // before allowing poll to touch uncommitted, lock the group
-			g.c.mu.Unlock() // now part of poll can continue
-			g.uncommitted = nil
-			g.mu.Unlock()
+			g.c.pollWait(func() {
+				g.c.mu.Lock()
+				g.c.assignPartitions(nil, assignInvalidateAll, nil, "clearing assignment at end of group management session")
+				g.mu.Lock()     // before allowing poll to touch uncommitted, lock the group
+				g.c.mu.Unlock() // now part of poll can continue
+				g.uncommitted = nil
+				g.mu.Unlock()
+			})
 
 			g.nowAssigned.store(nil)
 			g.lastAssigned = nil
@@ -483,13 +494,15 @@ func (g *groupConsumer) revoke(stage revokeStage, lost map[string][]int32, leavi
 	if !g.cooperative || leaving { // stage == revokeThisSession if not cooperative
 		// If we are an eager consumer, we stop fetching all of our
 		// current partitions as we will be revoking them.
-		g.c.mu.Lock()
-		if leaving {
-			g.c.assignPartitions(nil, assignInvalidateAll, nil, "revoking all assignments because we are leaving the group")
-		} else {
-			g.c.assignPartitions(nil, assignInvalidateAll, nil, "revoking all assignments because we are not cooperative")
-		}
-		g.c.mu.Unlock()
+		g.c.pollWait(func() {
+			g.c.mu.Lock()
+			defer g.c.mu.Unlock()
+			if leaving {
+				g.c.assignPartitions(nil, assignInvalidateAll, nil, "revoking all assignments because we are leaving the group")
+			} else {
+				g.c.assignPartitions(nil, assignInvalidateAll, nil, "revoking all assignments because we are not cooperative")
+			}
+		})
 
 		if !g.cooperative {
 			g.cfg.logger.Log(LogLevelInfo, "eager consumer revoking prior assigned partitions", "group", g.cfg.group, "revoking", g.nowAssigned.read())
@@ -562,9 +575,11 @@ func (g *groupConsumer) revoke(stage revokeStage, lost map[string][]int32, leavi
 		// for the final polled offsets. We do not want to allow the
 		// logical race of allowing fetches for revoked partitions
 		// after a revoke but before an invalidation.
-		g.c.mu.Lock()
-		g.c.assignPartitions(lostOffsets, assignInvalidateMatching, g.tps, "revoking assignments from cooperative consuming")
-		g.c.mu.Unlock()
+		g.c.pollWait(func() {
+			g.c.mu.Lock()
+			defer g.c.mu.Unlock()
+			g.c.assignPartitions(lostOffsets, assignInvalidateMatching, g.tps, "revoking assignments from cooperative consuming")
+		})
 	}
 
 	if len(lost) > 0 || stage == revokeThisSession {
@@ -2020,7 +2035,9 @@ func (g *groupConsumer) getUncommittedLocked(head, dirty bool) map[string]map[in
 // group rebalance occurs before or while this function is being executed. You
 // can avoid this scenario by calling CommitRecords in a custom OnRevoked, but
 // for most workloads, a small bit of potential duplicate processing is fine.
-// See the documentation on DisableAutoCommit for more details.
+// See the documentation on DisableAutoCommit for more details. You can also
+// avoid this problem by using BlockRebalanceOnPoll, but that option comes
+// with its own tradeoffs (refer to its documentation).
 //
 // It is recommended to always commit records in order (per partition). If you
 // call this function twice with record for partition 0 at offset 999
@@ -2144,7 +2161,9 @@ func (cl *Client) MarkCommitRecords(rs ...*Record) {
 // group rebalance occurs before or while this function is being executed. You
 // can avoid this scenario by calling CommitRecords in a custom OnRevoked, but
 // for most workloads, a small bit of potential duplicate processing is fine.
-// See the documentation on DisableAutoCommit for more details.
+// See the documentation on DisableAutoCommit for more details. You can also
+// avoid this problem by using BlockRebalanceOnPoll, but that option comes
+// with its own tradeoffs (refer to its documentation).
 //
 // The recommended pattern for using this function is to have a poll / process
 // / commit loop. First PollFetches, then process every record, then call
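
For illustration only (not part of the patch), a minimal consumer loop using the new option and AllowRebalance could look like the following sketch. The broker address, group name, topic name, and process function are placeholders; the commit call here is CommitUncommittedOffsets with autocommit disabled, which is just one of several valid commit strategies.

package main

import (
	"context"
	"errors"
	"log"

	"github.com/twmb/franz-go/pkg/kgo"
)

func main() {
	// Placeholder broker, group, and topic names.
	cl, err := kgo.NewClient(
		kgo.SeedBrokers("localhost:9092"),
		kgo.ConsumerGroup("my-group"),
		kgo.ConsumeTopics("my-topic"),
		kgo.BlockRebalanceOnPoll(), // rebalances now wait between a poll and AllowRebalance
		kgo.DisableAutoCommit(),
	)
	if err != nil {
		log.Fatal(err)
	}
	defer cl.Close()

	ctx := context.Background()
	for {
		// Bound the work per iteration so a blocked rebalance is not
		// held past the rebalance timeout.
		fetches := cl.PollRecords(ctx, 1000)
		for _, fetchErr := range fetches.Errors() {
			if errors.Is(fetchErr.Err, kgo.ErrClientClosed) {
				return
			}
			log.Printf("fetch error on %s[%d]: %v", fetchErr.Topic, fetchErr.Partition, fetchErr.Err)
		}
		fetches.EachRecord(func(r *kgo.Record) {
			process(r) // hypothetical per-record processing
		})
		// Rebalances are blocked, so these offsets belong to partitions
		// this member still owns.
		if err := cl.CommitUncommittedOffsets(ctx); err != nil {
			log.Printf("commit failed: %v", err)
		}
		cl.AllowRebalance() // unblock any rebalance that queued up while processing
	}
}

func process(*kgo.Record) {}

The ordering constraint that matters is that the commit finishes before AllowRebalance, so the committed offsets are guaranteed to be for partitions the member still owned at poll time.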
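The pollWaitState packing can be confusing at a glance. The following standalone snippet is an illustration of the counter layout, not library code: the low 32 bits count active pollers, the high 32 bits count rebalances waiting to proceed, and allowRebalance clears the entire poller half at once rather than decrementing it.

package main

import (
	"fmt"
	"math"
)

func main() {
	// Two counters packed into one uint64: low 32 bits = active pollers,
	// high 32 bits = rebalances waiting for pollers to finish.
	var state uint64

	state++          // a poll starts: one active poller
	state += 1 << 32 // a rebalance arrives and must wait for pollers

	fmt.Println("pollers:", state&math.MaxUint32) // 1
	fmt.Println("rebalances:", state>>32)         // 1

	// AllowRebalance does not decrement pollers one by one; it masks out
	// the entire low half, declaring every outstanding poll finished.
	state &= math.MaxUint32 << 32

	fmt.Println("pollers:", state&math.MaxUint32) // 0: the rebalance can proceed
	fmt.Println("rebalances:", state>>32)         // 1
}

A new poll (waitAndAddPoller) refuses to increment the poller count while the high 32 bits are nonzero, which is what gives rebalances priority over subsequent polls.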