diff --git a/pkg/kgo/config.go b/pkg/kgo/config.go
index f10bea64..649a8ded 100644
--- a/pkg/kgo/config.go
+++ b/pkg/kgo/config.go
@@ -164,6 +164,8 @@ type cfg struct {
 
 	adjustOffsetsBeforeAssign func(ctx context.Context, offsets map[string]map[int32]Offset) (map[string]map[int32]Offset, error)
 
+	blockRebalanceOnPoll bool
+
 	setAssigned bool
 	setRevoked  bool
 	setLost     bool
@@ -1383,6 +1385,43 @@ func RequireStableFetchOffsets() GroupOpt {
 	return groupOpt{func(cfg *cfg) { cfg.requireStable = true }}
 }
 
+// BlockRebalanceOnPoll switches the client to block rebalances whenever you
+// poll until you explicitly call AllowRebalance. This option also ensures that
+// any OnPartitions{Assigned,Revoked,Lost} callbacks are only called when you
+// allow rebalances; they cannot be called if you have polled and are
+// processing records.
+//
+// By default, a consumer group is managed completely independently of
+// consuming. A rebalance may occur at any moment. If you poll records, and
+// then a rebalance happens, and then you commit, you may be committing to
+// partitions you no longer own. This will result in duplicates. In the worst
+// case, you could rewind commits that a different member has already made
+// (risking duplicates if another rebalance were to happen before that other
+// member commits again).
+//
+// By blocking rebalancing after you poll until you call AllowRebalance, you
+// can be sure that you commit records that your member currently owns.
+// However, the big tradeoff is that by blocking rebalances, you put your group
+// member at risk of waiting so long that the group member is kicked from the
+// group because it exceeded the rebalance timeout. To compare clients, Sarama
+// takes the default choice of blocking rebalancing; this option makes kgo more
+// similar to Sarama.
+//
+// If you use this option, you should ensure that you always process records
+// quickly, and that your OnPartitions{Assigned,Revoked,Lost} callbacks are
+// fast. It is recommended you also use PollRecords rather than PollFetches so
+// that you can bound how many records you process at once. You must always
+// call AllowRebalance when you are done processing the records you received.
+// Only rebalances that lose partitions are blocked; rebalances that are
+// strictly net additions or non-modifications do not block (the On callbacks
+// are always blocked so that you can ensure their serialization).
+//
+// This option can largely replace any commit logic you may want to do in
+// OnPartitionsRevoked.
+func BlockRebalanceOnPoll() GroupOpt {
+	return groupOpt{func(cfg *cfg) { cfg.blockRebalanceOnPoll = true }}
+}
+
 // AdjustFetchOffsetsFn sets the function to be called when a group is joined
 // after offsets are fetched for those partitions so that a user can adjust them
 // before consumption begins.
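To make the intended call pattern concrete for reviewers, here is a minimal sketch (not part of this patch) of a consume loop that pairs BlockRebalanceOnPoll with PollRecords, a commit, and AllowRebalance. The broker address, topic, group name, and processing step are placeholders.

```go
package main

import (
	"context"
	"log"

	"github.com/twmb/franz-go/pkg/kgo"
)

func main() {
	cl, err := kgo.NewClient(
		kgo.SeedBrokers("localhost:9092"), // placeholder broker
		kgo.ConsumeTopics("my-topic"),     // placeholder topic
		kgo.ConsumerGroup("my-group"),     // placeholder group
		kgo.DisableAutoCommit(),
		kgo.BlockRebalanceOnPoll(), // rebalances wait until AllowRebalance
	)
	if err != nil {
		log.Fatal(err)
	}
	defer cl.Close()

	ctx := context.Background()
	for {
		// Bound the work per iteration so we stay well under the
		// rebalance timeout.
		fetches := cl.PollRecords(ctx, 1000)
		fetches.EachRecord(func(r *kgo.Record) {
			// process r (application logic goes here)
		})
		// We still own every partition we just polled, so this commit
		// cannot rewind another member's progress.
		if err := cl.CommitUncommittedOffsets(ctx); err != nil {
			log.Printf("commit failed: %v", err)
		}
		cl.AllowRebalance() // let any blocked rebalance proceed
	}
}
```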
@@ -1410,7 +1449,10 @@ func AdjustFetchOffsetsFn(adjustOffsetsBeforeAssign func(context.Context, map[st
 // only canceled if the client is closed.
 //
 // This function is not called concurrent with any other On callback, and this
-// function is given a new map that the user is free to modify.
+// function is given a new map that the user is free to modify. This function
+// can be called at any time you are polling or processing records. If you want
+// to ensure this function is called serially with processing, consider the
+// BlockRebalanceOnPoll option.
 func OnPartitionsAssigned(onAssigned func(context.Context, *Client, map[string][]int32)) GroupOpt {
 	return groupOpt{func(cfg *cfg) { cfg.onAssigned, cfg.setAssigned = onAssigned, true }}
 }
@@ -1438,7 +1480,10 @@ func OnPartitionsAssigned(onAssigned func(context.Context, *Client, map[string][
 // OnPartitionsRevoked.
 //
 // This function is not called concurrent with any other On callback, and this
-// function is given a new map that the user is free to modify.
+// function is given a new map that the user is free to modify. This function
+// can be called at any time you are polling or processing records. If you want
+// to ensure this function is called serially with processing, consider the
+// BlockRebalanceOnPoll option.
 func OnPartitionsRevoked(onRevoked func(context.Context, *Client, map[string][]int32)) GroupOpt {
 	return groupOpt{func(cfg *cfg) { cfg.onRevoked, cfg.setRevoked = onRevoked, true }}
 }
@@ -1454,7 +1499,10 @@ func OnPartitionsRevoked(onRevoked func(context.Context, *Client, map[string][]i
 // lost and revoked, you can use OnPartitionsLostAsRevoked as a shortcut.
 //
 // This function is not called concurrent with any other On callback, and this
-// function is given a new map that the user is free to modify.
+// function is given a new map that the user is free to modify. This function
+// can be called at any time you are polling or processing records. If you want
+// to ensure this function is called serially with processing, consider the
+// BlockRebalanceOnPoll option.
 func OnPartitionsLost(onLost func(context.Context, *Client, map[string][]int32)) GroupOpt {
 	return groupOpt{func(cfg *cfg) { cfg.onLost, cfg.setLost = onLost, true }}
 }
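The doc updates above point at BlockRebalanceOnPoll as one way to serialize callbacks with processing; the other route they mention is committing from OnPartitionsRevoked itself. Here is a hedged sketch of that alternative (not from this patch; broker, topic, and group names are placeholders):

```go
package kgoexample

import (
	"context"
	"log"

	"github.com/twmb/franz-go/pkg/kgo"
)

// newClientCommittingOnRevoke shows the OnPartitionsRevoked alternative: flush
// offsets before partitions move, rather than blocking rebalances on poll.
func newClientCommittingOnRevoke() (*kgo.Client, error) {
	return kgo.NewClient(
		kgo.SeedBrokers("localhost:9092"), // placeholder broker
		kgo.ConsumeTopics("my-topic"),     // placeholder topic
		kgo.ConsumerGroup("my-group"),     // placeholder group
		kgo.DisableAutoCommit(),
		kgo.OnPartitionsRevoked(func(ctx context.Context, cl *kgo.Client, _ map[string][]int32) {
			// This callback is never concurrent with the other On
			// callbacks, so committing here serializes with them.
			if err := cl.CommitUncommittedOffsets(ctx); err != nil {
				log.Printf("commit in revoke failed: %v", err)
			}
		}),
	)
}
```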
diff --git a/pkg/kgo/consumer.go b/pkg/kgo/consumer.go
index df3da14b..896f984d 100644
--- a/pkg/kgo/consumer.go
+++ b/pkg/kgo/consumer.go
@@ -166,12 +166,74 @@ type consumer struct {
 	sourcesReadyCond        *sync.Cond
 	sourcesReadyForDraining []*source
 	fakeReadyForDraining    []Fetch
+
+	pollWaitMu    sync.Mutex
+	pollWaitC     *sync.Cond
+	pollWaitState uint64 // 0 == nothing, low 32 bits: # pollers, high 32: # waiting rebalances
 }
 
 func (c *consumer) loadPaused() pausedTopics   { return c.paused.Load().(pausedTopics) }
 func (c *consumer) clonePaused() pausedTopics  { return c.paused.Load().(pausedTopics).clone() }
 func (c *consumer) storePaused(p pausedTopics) { c.paused.Store(p) }
 
+func (c *consumer) waitAndAddPoller() {
+	if !c.cl.cfg.blockRebalanceOnPoll {
+		return
+	}
+	c.pollWaitMu.Lock()
+	defer c.pollWaitMu.Unlock()
+	for c.pollWaitState>>32 != 0 {
+		c.pollWaitC.Wait()
+	}
+	// Rebalance always takes priority, but if there are no active
+	// rebalances, our poll blocks rebalances.
+	c.pollWaitState++
+}
+
+func (c *consumer) unaddPoller() {
+	if !c.cl.cfg.blockRebalanceOnPoll {
+		return
+	}
+	c.pollWaitMu.Lock()
+	defer c.pollWaitMu.Unlock()
+	c.pollWaitState--
+	c.pollWaitC.Broadcast()
+}
+
+func (c *consumer) allowRebalance() {
+	if !c.cl.cfg.blockRebalanceOnPoll {
+		return
+	}
+	c.pollWaitMu.Lock()
+	defer c.pollWaitMu.Unlock()
+	// When allowing rebalances, the user is explicitly saying all pollers
+	// are done. We mask them out.
+	c.pollWaitState &= math.MaxUint32 << 32
+	c.pollWaitC.Broadcast()
+}
+
+func (c *consumer) waitAndAddRebalance() {
+	if !c.cl.cfg.blockRebalanceOnPoll {
+		return
+	}
+	c.pollWaitMu.Lock()
+	defer c.pollWaitMu.Unlock()
+	c.pollWaitState += 1 << 32
+	for c.pollWaitState&math.MaxUint32 != 0 {
+		c.pollWaitC.Wait()
+	}
+}
+
+func (c *consumer) unaddRebalance() {
+	if !c.cl.cfg.blockRebalanceOnPoll {
+		return
+	}
+	c.pollWaitMu.Lock()
+	defer c.pollWaitMu.Unlock()
+	c.pollWaitState -= 1 << 32
+	c.pollWaitC.Broadcast()
+}
+
 // BufferedFetchRecords returns the number of records currently buffered from
 // fetching within the client.
 //
@@ -198,6 +260,7 @@ func (c *consumer) init(cl *Client) {
 	c.cl = cl
 	c.paused.Store(make(pausedTopics))
 	c.sourcesReadyCond = sync.NewCond(&c.sourcesReadyMu)
+	c.pollWaitC = sync.NewCond(&c.pollWaitMu)
 
 	if len(cl.cfg.topics) == 0 && len(cl.cfg.partitions) == 0 {
 		return // not consuming
@@ -253,6 +316,12 @@ func (c *consumer) addFakeReadyForDraining(topic string, partition int32, err er
 // has no topic, a partition of 0, and a partition error of ErrClientClosed.
 // This can be used to detect if the client is closing and to break out of a
 // poll loop.
+//
+// If you are group consuming, a rebalance can happen under the hood while you
+// process the returned fetches. This can result in duplicate work, and you may
+// accidentally commit to partitions that you no longer own. You can prevent
+// this by using BlockRebalanceOnPoll, but this comes with different tradeoffs.
+// See the documentation on BlockRebalanceOnPoll for more information.
 func (cl *Client) PollFetches(ctx context.Context) Fetches {
 	return cl.PollRecords(ctx, 0)
 }
@@ -273,6 +342,12 @@ func (cl *Client) PollFetches(ctx context.Context) Fetches {
 // has no topic, a partition of 0, and a partition error of ErrClientClosed.
 // This can be used to detect if the client is closing and to break out of a
 // poll loop.
+//
+// If you are group consuming, a rebalance can happen under the hood while you
+// process the returned fetches. This can result in duplicate work, and you may
+// accidentally commit to partitions that you no longer own. You can prevent
+// this by using BlockRebalanceOnPoll, but this comes with different tradeoffs.
+// See the documentation on BlockRebalanceOnPoll for more information.
 func (cl *Client) PollRecords(ctx context.Context, maxPollRecords int) Fetches {
 	if maxPollRecords == 0 {
 		maxPollRecords = -1
@@ -283,6 +358,15 @@ func (cl *Client) PollRecords(ctx context.Context, maxPollRecords int) Fetches {
 
 	var fetches Fetches
 	fill := func() {
+		if c.cl.cfg.blockRebalanceOnPoll {
+			c.waitAndAddPoller()
+			defer func() {
+				if len(fetches) == 0 {
+					c.unaddPoller()
+				}
+			}()
+		}
+
 		// A group can grab the consumer lock then the group mu and
 		// assign partitions. The group mu is grabbed to update its
 		// uncommitted map. Assigning partitions clears sources ready
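The PollFetches/PollRecords docs above note that a closed client injects a fetch whose partition error is ErrClientClosed, which you can use to break out of a poll loop. A hedged sketch of that check (not part of this patch; the process callback is assumed):

```go
package kgoexample

import (
	"context"
	"errors"

	"github.com/twmb/franz-go/pkg/kgo"
)

// pollUntilClosed loops until the client is closed, detecting the
// ErrClientClosed sentinel that the poll docs describe.
func pollUntilClosed(ctx context.Context, cl *kgo.Client, process func(*kgo.Record)) {
	for {
		fetches := cl.PollFetches(ctx)
		closed := false
		fetches.EachError(func(_ string, _ int32, err error) {
			if errors.Is(err, kgo.ErrClientClosed) {
				closed = true
			}
		})
		if closed {
			return
		}
		fetches.EachRecord(process)
	}
}
```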
@@ -387,6 +471,19 @@ func (cl *Client) PollRecords(ctx context.Context, maxPollRecords int) Fetches {
 	return fetches
 }
 
+// AllowRebalance allows a consumer group to rebalance if it was blocked by you
+// polling records in tandem with the BlockRebalanceOnPoll option.
+//
+// You can poll many times before calling this function; this function
+// internally resets the poll count and allows any blocked rebalances to
+// continue. Rebalances take priority: if a rebalance is blocked, and you allow
+// rebalances and then immediately poll, your poll will be blocked until the
+// rebalance completes. Internally, this function simply waits for lost
+// partitions to stop being fetched before allowing you to poll again.
+func (cl *Client) AllowRebalance() {
+	cl.consumer.allowRebalance()
+}
+
 // PauseFetchTopics sets the client to no longer fetch the given topics and
 // returns all currently paused topics. Paused topics persist until resumed.
 // You can call this function with no topics to simply receive the list of
@@ -546,6 +643,9 @@ func (c *consumer) purgeTopics(topics []string) {
 		purgeAssignments[topic] = nil
 	}
 
+	c.waitAndAddRebalance()
+	defer c.unaddRebalance()
+
 	c.mu.Lock()
 	defer c.mu.Unlock()
 
diff --git a/pkg/kgo/consumer_group.go b/pkg/kgo/consumer_group.go
index c65b3052..c21b9f4a 100644
--- a/pkg/kgo/consumer_group.go
+++ b/pkg/kgo/consumer_group.go
@@ -152,10 +152,12 @@ func (cl *Client) LeaveGroup() {
 		return
 	}
 
+	c.waitAndAddRebalance()
 	c.mu.Lock() // lock for assign
 	c.assignPartitions(nil, assignInvalidateAll, noTopicsPartitions, "invalidating all assignments in LeaveGroup")
 	wait := c.g.leave()
 	c.mu.Unlock()
+	c.unaddRebalance()
 
 	wait() // wait after we unlock
 }
@@ -279,13 +281,9 @@ func (g *groupConsumer) manage() {
 		}
 		joinWhy = "rejoining after we previously errored and backed off"
 
-		hook := func() {
-			g.cfg.hooks.each(func(h Hook) {
-				if h, ok := h.(HookGroupManageError); ok {
-					h.OnGroupManageError(err)
-				}
-			})
-		}
+		// If the user has BlockRebalanceOnPoll enabled, we have to
+		// block around the onLost and assigning.
+		g.c.waitAndAddRebalance()
 
 		if errors.Is(err, context.Canceled) && g.cfg.onRevoked != nil {
 			// The cooperative consumer does not revoke everything
@@ -308,10 +306,21 @@ func (g *groupConsumer) manage() {
 			if g.cfg.onLost != nil {
 				g.cfg.onLost(g.cl.ctx, g.cl, g.nowAssigned.read())
 			}
-			hook()
+			g.cfg.hooks.each(func(h Hook) {
+				if h, ok := h.(HookGroupManageError); ok {
+					h.OnGroupManageError(err)
+				}
+			})
 		}
 
-		// We need to invalidate everything from an error return.
+		// If we are eager, we should have invalidated everything
+		// before getting here, but we do so doubly just in case.
+		//
+		// If we are cooperative, the join and sync could have failed
+		// during the cooperative rebalance where we were still
+		// consuming. We need to invalidate everything. Waiting to
+		// resume from poll is necessary, but the user will likely be
+		// unable to commit.
 		{
 			g.c.mu.Lock()
 			g.c.assignPartitions(nil, assignInvalidateAll, nil, "clearing assignment at end of group management session")
@@ -328,6 +337,10 @@ func (g *groupConsumer) manage() {
 			g.resetExternal()
 		}
 
+		// Unblock polling now that we have called onLost and
+		// re-assigned.
+		g.c.unaddRebalance()
+
 		if errors.Is(err, context.Canceled) { // context was canceled, quit now
 			return
 		}
@@ -480,6 +493,9 @@ const (
 // Lastly, for cooperative consumers, this must selectively delete what was
 // lost from the uncommitted map.
 func (g *groupConsumer) revoke(stage revokeStage, lost map[string][]int32, leaving bool) {
+	g.c.waitAndAddRebalance()
+	defer g.c.unaddRebalance()
+
 	if !g.cooperative || leaving { // stage == revokeThisSession if not cooperative
 		// If we are an eager consumer, we stop fetching all of our
 		// current partitions as we will be revoking them.
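One practical consequence of the LeaveGroup change above: with BlockRebalanceOnPoll enabled, LeaveGroup now waits for outstanding pollers, so a shutdown path should presumably allow rebalances first. A hedged sketch (not part of this patch):

```go
package kgoexample

import "github.com/twmb/franz-go/pkg/kgo"

// shutdown releases any poll that is still blocking rebalances before leaving
// the group and closing the client.
func shutdown(cl *kgo.Client) {
	cl.AllowRebalance() // otherwise LeaveGroup can wait on our own blocked poll
	cl.LeaveGroup()
	cl.Close()
}
```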
@@ -648,6 +664,10 @@ func (s *assignRevokeSession) assign(g *groupConsumer, newAssigned map[string][]
 			// We always call on assigned, even if nothing new is
 			// assigned. This allows consumers to know that
 			// assignment is done and do setup logic.
+			//
+			// If configured, we have to block polling.
+			g.c.waitAndAddRebalance()
+			defer g.c.unaddRebalance()
 			g.cfg.onAssigned(g.cl.ctx, g.cl, newAssigned)
 		}
 	}()
@@ -722,21 +742,8 @@ func (g *groupConsumer) setupAssignedAndHeartbeat() (string, error) {
 		added = g.adjustCooperativeFetchOffsets(added, lost)
 	}
 
-	if len(added) > 0 {
-		go func() {
-			defer close(fetchDone)
-			defer close(fetchErrCh)
-			fetchErrCh <- g.fetchOffsets(ctx, added)
-		}()
-	} else {
-		close(fetchDone)
-		close(fetchErrCh)
-	}
-
-	// Before we return, we also want to ensure that the user's onAssign is
-	// done.
-	//
-	// Ensuring assigning is done ensures two things:
+	// Before we fetch offsets, we wait for the user's onAssign callback to
+	// be done. This ensures a few things:
 	//
 	// * that we wait for for prerevoking to be done, which updates the
 	// uncommitted field. Waiting for that ensures that a rejoin and poll
 	//
 	// * that our onLost will not be concurrent with onAssign
 	//
+	// * that the user can start up any per-partition processors necessary
+	// before we begin consuming that partition.
+	//
 	// We especially need to wait here because heartbeating may not
 	// necessarily run onRevoke before returning (because of a fatal
 	// error).
 	s.assign(g, added)
-	defer func() { <-s.assignDone }()
+	<-s.assignDone
+
+	if len(added) > 0 {
+		go func() {
+			defer close(fetchDone)
+			defer close(fetchErrCh)
+			fetchErrCh <- g.fetchOffsets(ctx, added)
+		}()
+	} else {
+		close(fetchDone)
+		close(fetchErrCh)
+	}
 
 	// Finally, we simply return whatever the heartbeat error is. This will
 	// be the fetch offset error if that function is what killed this.
@@ -2020,7 +2041,9 @@ func (g *groupConsumer) getUncommittedLocked(head, dirty bool) map[string]map[in
 // group rebalance occurs before or while this function is being executed. You
 // can avoid this scenario by calling CommitRecords in a custom OnRevoked, but
 // for most workloads, a small bit of potential duplicate processing is fine.
-// See the documentation on DisableAutoCommit for more details.
+// See the documentation on DisableAutoCommit for more details. You can also
+// avoid this problem by using BlockRebalanceOnPoll, but that option comes
+// with its own tradeoffs (refer to its documentation).
 //
 // It is recommended to always commit records in order (per partition). If you
 // call this function twice with record for partition 0 at offset 999
@@ -2144,7 +2167,9 @@ func (cl *Client) MarkCommitRecords(rs ...*Record) {
 // group rebalance occurs before or while this function is being executed. You
 // can avoid this scenario by calling CommitRecords in a custom OnRevoked, but
 // for most workloads, a small bit of potential duplicate processing is fine.
-// See the documentation on DisableAutoCommit for more details.
+// See the documentation on DisableAutoCommit for more details. You can also
+// avoid this problem by using BlockRebalanceOnPoll, but that option comes
+// with its own tradeoffs (refer to its documentation).
 //
 // The recommended pattern for using this function is to have a poll / process
 // / commit loop. First PollFetches, then process every record, then call
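The commit docs above (cut off at the hunk boundary) describe the recommended poll / process / commit loop. A hedged sketch of that default, non-blocking pattern, assuming auto-committing is disabled and with close/error handling omitted:

```go
package kgoexample

import (
	"context"
	"log"

	"github.com/twmb/franz-go/pkg/kgo"
)

// pollProcessCommit follows the recommended pattern: poll, process every
// record, then commit exactly what was processed.
func pollProcessCommit(ctx context.Context, cl *kgo.Client, process func(*kgo.Record)) {
	for {
		fetches := cl.PollFetches(ctx) // poll
		fetches.EachRecord(process)    // process every record
		// Commit the records we just processed. Without
		// BlockRebalanceOnPoll, a rebalance during processing can still
		// make this a small source of duplicates, as the docs note.
		if err := cl.CommitRecords(ctx, fetches.Records()...); err != nil {
			log.Printf("commit failed: %v", err)
		}
	}
}
```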