From cffbee735a31ec6662922ea4d1386d5f161c4694 Mon Sep 17 00:00:00 2001 From: Travis Bischel Date: Tue, 1 Mar 2022 01:12:31 -0700 Subject: [PATCH] consumer: add BlockRebalancesOnPoll option, AllowRebalances One strength of Sarama's consumer group is that it is easier to reason about where record processing logic falls within the group management lifecycle: if you are processing records, you can be sure that the group will not rebalance underneath you. This is also a risk, though, if your processing logic is so long that your group member is booted from the group. This client took the opposite approach of separating the group management logic from the consuming logic. This essentially eliminated the risk of long processing booting the member, but made it much more difficult to reason about when to commit. There are plenty of disclaimers about potential duplicates, and any non-transactional consumer should expect duplicates at some point, but if a user wants to opt for the simpler approach of consuming & processing within one group generation, we should support that. In fact, if a user's processing loop is very fast, we really should encourage it. Helps #137. --- pkg/kgo/config.go | 54 ++++++++++++++++++-- pkg/kgo/consumer.go | 100 ++++++++++++++++++++++++++++++++++++++ pkg/kgo/consumer_group.go | 79 ++++++++++++++++++++---------- 3 files changed, 203 insertions(+), 30 deletions(-) diff --git a/pkg/kgo/config.go b/pkg/kgo/config.go index f10bea64..649a8ded 100644 --- a/pkg/kgo/config.go +++ b/pkg/kgo/config.go @@ -164,6 +164,8 @@ type cfg struct { adjustOffsetsBeforeAssign func(ctx context.Context, offsets map[string]map[int32]Offset) (map[string]map[int32]Offset, error) + blockRebalanceOnPoll bool + setAssigned bool setRevoked bool setLost bool @@ -1383,6 +1385,43 @@ func RequireStableFetchOffsets() GroupOpt { return groupOpt{func(cfg *cfg) { cfg.requireStable = true }} } +// BlockRebalanceOnPoll switches the client to block rebalances whenever you +// poll until you explicitly call AllowRebalance. This option also ensures that +// any OnPartitions{Assigned,Revoked,Lost} callbacks are only called when you +// allow rebalances; they cannot be called if you have polled and are +// processing records. +// +// By default, a consumer group is managed completely independently of +// consuming. A rebalance may occur at any moment. If you poll records, and +// then a rebalance happens, and then you commit, you may be committing to +// partitions you no longer own. This will result in duplicates. In the worst +// case, you could rewind commits that a different member has already made +// (risking duplicates if another rebalance were to happen before that other +// member commits again). +// +// By blocking rebalancing after you poll until you call AllowRebalances, you +// can be sure that you commit records that your member currently owns. +// However, the big tradeoff is that by blocking rebalances, you put your group +// member at risk of waiting so long that the group member is kicked from the +// group because it exceeded the rebalance timeout. To compare clients, Sarama +// takes the default choice of blocking rebalancing; this option makes kgo more +// similar to Sarama. +// +// If you use this option, you should ensure that you always process records +// quickly, and that your OnPartitions{Assigned,Revoked,Lost} callbacks are +// fast. It is recommended you also use PollRecords rather than PollFetches so +// that you can bound how many records you process at once. You must always +// AllowRebalances when you are done processing the records you received. Only +// rebalances that lose partitions are blocked; rebalances that are strictly +// net additions or non-modifications do not block (the On callbacks are always +// blocked so that you can ensure their serialization). +// +// This function can largely replace any commit logic you may want to do in +// OnPartitionsRevoked. +func BlockRebalanceOnPoll() GroupOpt { + return groupOpt{func(cfg *cfg) { cfg.blockRebalanceOnPoll = true }} +} + // AdjustFetchOffsetsFn sets the function to be called when a group is joined // after offsets are fetched for those partitions so that a user can adjust them // before consumption begins. @@ -1410,7 +1449,10 @@ func AdjustFetchOffsetsFn(adjustOffsetsBeforeAssign func(context.Context, map[st // only canceled if the client is closed. // // This function is not called concurrent with any other On callback, and this -// function is given a new map that the user is free to modify. +// function is given a new map that the user is free to modify. This function +// can be called at any time you are polling or processing records. If you want +// to ensure this function is called serially with processing, consider the +// BlockRebalanceOnPoll option. func OnPartitionsAssigned(onAssigned func(context.Context, *Client, map[string][]int32)) GroupOpt { return groupOpt{func(cfg *cfg) { cfg.onAssigned, cfg.setAssigned = onAssigned, true }} } @@ -1438,7 +1480,10 @@ func OnPartitionsAssigned(onAssigned func(context.Context, *Client, map[string][ // OnPartitionsRevoked. // // This function is not called concurrent with any other On callback, and this -// function is given a new map that the user is free to modify. +// function is given a new map that the user is free to modify. This function +// can be called at any time you are polling or processing records. If you want +// to ensure this function is called serially with processing, consider the +// BlockRebalanceOnPoll option. func OnPartitionsRevoked(onRevoked func(context.Context, *Client, map[string][]int32)) GroupOpt { return groupOpt{func(cfg *cfg) { cfg.onRevoked, cfg.setRevoked = onRevoked, true }} } @@ -1454,7 +1499,10 @@ func OnPartitionsRevoked(onRevoked func(context.Context, *Client, map[string][]i // lost and revoked, you can use OnPartitionsLostAsRevoked as a shortcut. // // This function is not called concurrent with any other On callback, and this -// function is given a new map that the user is free to modify. +// function is given a new map that the user is free to modify. This function +// can be called at any time you are polling or processing records. If you want +// to ensure this function is called serially with processing, consider the +// BlockRebalanceOnPoll option. func OnPartitionsLost(onLost func(context.Context, *Client, map[string][]int32)) GroupOpt { return groupOpt{func(cfg *cfg) { cfg.onLost, cfg.setLost = onLost, true }} } diff --git a/pkg/kgo/consumer.go b/pkg/kgo/consumer.go index df3da14b..896f984d 100644 --- a/pkg/kgo/consumer.go +++ b/pkg/kgo/consumer.go @@ -166,12 +166,74 @@ type consumer struct { sourcesReadyCond *sync.Cond sourcesReadyForDraining []*source fakeReadyForDraining []Fetch + + pollWaitMu sync.Mutex + pollWaitC *sync.Cond + pollWaitState uint64 // 0 == nothing, low 32 bits: # pollers, high 32: # waiting rebalances } func (c *consumer) loadPaused() pausedTopics { return c.paused.Load().(pausedTopics) } func (c *consumer) clonePaused() pausedTopics { return c.paused.Load().(pausedTopics).clone() } func (c *consumer) storePaused(p pausedTopics) { c.paused.Store(p) } +func (c *consumer) waitAndAddPoller() { + if !c.cl.cfg.blockRebalanceOnPoll { + return + } + c.pollWaitMu.Lock() + defer c.pollWaitMu.Unlock() + for c.pollWaitState>>32 != 0 { + c.pollWaitC.Wait() + } + // Rebalance always takes priority, but if there are no active + // rebalances, our poll blocks rebalances. + c.pollWaitState++ +} + +func (c *consumer) unaddPoller() { + if !c.cl.cfg.blockRebalanceOnPoll { + return + } + c.pollWaitMu.Lock() + defer c.pollWaitMu.Unlock() + c.pollWaitState-- + c.pollWaitC.Broadcast() +} + +func (c *consumer) allowRebalance() { + if !c.cl.cfg.blockRebalanceOnPoll { + return + } + c.pollWaitMu.Lock() + defer c.pollWaitMu.Unlock() + // When allowing rebalances, the user is explicitly saying all pollers + // are done. We mask them out. + c.pollWaitState &= math.MaxUint32 << 32 + c.pollWaitC.Broadcast() +} + +func (c *consumer) waitAndAddRebalance() { + if !c.cl.cfg.blockRebalanceOnPoll { + return + } + c.pollWaitMu.Lock() + defer c.pollWaitMu.Unlock() + c.pollWaitState += 1 << 32 + for c.pollWaitState&math.MaxUint32 != 0 { + c.pollWaitC.Wait() + } +} + +func (c *consumer) unaddRebalance() { + if !c.cl.cfg.blockRebalanceOnPoll { + return + } + c.pollWaitMu.Lock() + defer c.pollWaitMu.Unlock() + c.pollWaitState -= 1 << 32 + c.pollWaitC.Broadcast() +} + // BufferedFetchRecords returns the number of records currently buffered from // fetching within the client. // @@ -198,6 +260,7 @@ func (c *consumer) init(cl *Client) { c.cl = cl c.paused.Store(make(pausedTopics)) c.sourcesReadyCond = sync.NewCond(&c.sourcesReadyMu) + c.pollWaitC = sync.NewCond(&c.pollWaitMu) if len(cl.cfg.topics) == 0 && len(cl.cfg.partitions) == 0 { return // not consuming @@ -253,6 +316,12 @@ func (c *consumer) addFakeReadyForDraining(topic string, partition int32, err er // has no topic, a partition of 0, and a partition error of ErrClientClosed. // This can be used to detect if the client is closing and to break out of a // poll loop. +// +// If you are group consuming, a rebalance can happen under the hood while you +// process the returned fetches. This can result in duplicate work, and you may +// accidentally commit to partitions that you no longer own. You can prevent +// this by using BlockRebalanceOnPoll, but this comes with different tradeoffs. +// See the documentation on BlockRebalanceOnPoll for more information. func (cl *Client) PollFetches(ctx context.Context) Fetches { return cl.PollRecords(ctx, 0) } @@ -273,6 +342,12 @@ func (cl *Client) PollFetches(ctx context.Context) Fetches { // has no topic, a partition of 0, and a partition error of ErrClientClosed. // This can be used to detect if the client is closing and to break out of a // poll loop. +// +// If you are group consuming, a rebalance can happen under the hood while you +// process the returned fetches. This can result in duplicate work, and you may +// accidentally commit to partitions that you no longer own. You can prevent +// this by using BlockRebalanceOnPoll, but this comes with different tradeoffs. +// See the documentation on BlockRebalanceOnPoll for more information. func (cl *Client) PollRecords(ctx context.Context, maxPollRecords int) Fetches { if maxPollRecords == 0 { maxPollRecords = -1 @@ -283,6 +358,15 @@ func (cl *Client) PollRecords(ctx context.Context, maxPollRecords int) Fetches { var fetches Fetches fill := func() { + if c.cl.cfg.blockRebalanceOnPoll { + c.waitAndAddPoller() + defer func() { + if len(fetches) == 0 { + c.unaddPoller() + } + }() + } + // A group can grab the consumer lock then the group mu and // assign partitions. The group mu is grabbed to update its // uncommitted map. Assigning partitions clears sources ready @@ -387,6 +471,19 @@ func (cl *Client) PollRecords(ctx context.Context, maxPollRecords int) Fetches { return fetches } +// AllowRebalance allows a consumer group to rebalance if it was blocked by you +// polling records in tandem with the BlockRebalanceOnPoll option. +// +// You can poll many times before calling this function; this function +// internally resets the poll count and allows any blocked rebalances to +// continue. Rebalances take priority: if a rebalance is blocked, and you allow +// rebalances and then immediately poll, your poll will be blocked until the +// rebalance completes. Internally, this function simply waits for lost +// partitions to stop being fetched before allowing you to poll again. +func (cl *Client) AllowRebalance() { + cl.consumer.allowRebalance() +} + // PauseFetchTopics sets the client to no longer fetch the given topics and // returns all currently paused topics. Paused topics persist until resumed. // You can call this function with no topics to simply receive the list of @@ -546,6 +643,9 @@ func (c *consumer) purgeTopics(topics []string) { purgeAssignments[topic] = nil } + c.waitAndAddRebalance() + defer c.unaddRebalance() + c.mu.Lock() defer c.mu.Unlock() diff --git a/pkg/kgo/consumer_group.go b/pkg/kgo/consumer_group.go index c65b3052..c21b9f4a 100644 --- a/pkg/kgo/consumer_group.go +++ b/pkg/kgo/consumer_group.go @@ -152,10 +152,12 @@ func (cl *Client) LeaveGroup() { return } + c.waitAndAddRebalance() c.mu.Lock() // lock for assign c.assignPartitions(nil, assignInvalidateAll, noTopicsPartitions, "invalidating all assignments in LeaveGroup") wait := c.g.leave() c.mu.Unlock() + c.unaddRebalance() wait() // wait after we unlock } @@ -279,13 +281,9 @@ func (g *groupConsumer) manage() { } joinWhy = "rejoining after we previously errored and backed off" - hook := func() { - g.cfg.hooks.each(func(h Hook) { - if h, ok := h.(HookGroupManageError); ok { - h.OnGroupManageError(err) - } - }) - } + // If the user has BlockPollOnRebalance enabled, we have to + // block around the onLost and assigning. + g.c.waitAndAddRebalance() if errors.Is(err, context.Canceled) && g.cfg.onRevoked != nil { // The cooperative consumer does not revoke everything @@ -308,10 +306,21 @@ func (g *groupConsumer) manage() { if g.cfg.onLost != nil { g.cfg.onLost(g.cl.ctx, g.cl, g.nowAssigned.read()) } - hook() + g.cfg.hooks.each(func(h Hook) { + if h, ok := h.(HookGroupManageError); ok { + h.OnGroupManageError(err) + } + }) } - // We need to invalidate everything from an error return. + // If we are eager, we should have invalidated everything + // before getting here, but we do so doubly just in case. + // + // If we are cooperative, the join and sync could have failed + // during the cooperative rebalance where we were still + // consuming. We need to invalidate everything. Waiting to + // resume from poll is necessary, but the user will likely be + // unable to commit. { g.c.mu.Lock() g.c.assignPartitions(nil, assignInvalidateAll, nil, "clearing assignment at end of group management session") @@ -328,6 +337,10 @@ func (g *groupConsumer) manage() { g.resetExternal() } + // Unblock bolling now that we have called onLost and + // re-assigned. + g.c.unaddRebalance() + if errors.Is(err, context.Canceled) { // context was canceled, quit now return } @@ -480,6 +493,9 @@ const ( // Lastly, for cooperative consumers, this must selectively delete what was // lost from the uncommitted map. func (g *groupConsumer) revoke(stage revokeStage, lost map[string][]int32, leaving bool) { + g.c.waitAndAddRebalance() + defer g.c.unaddRebalance() + if !g.cooperative || leaving { // stage == revokeThisSession if not cooperative // If we are an eager consumer, we stop fetching all of our // current partitions as we will be revoking them. @@ -648,6 +664,10 @@ func (s *assignRevokeSession) assign(g *groupConsumer, newAssigned map[string][] // We always call on assigned, even if nothing new is // assigned. This allows consumers to know that // assignment is done and do setup logic. + // + // If configured, we have to block polling. + g.c.waitAndAddRebalance() + defer g.c.unaddRebalance() g.cfg.onAssigned(g.cl.ctx, g.cl, newAssigned) } }() @@ -722,21 +742,8 @@ func (g *groupConsumer) setupAssignedAndHeartbeat() (string, error) { added = g.adjustCooperativeFetchOffsets(added, lost) } - if len(added) > 0 { - go func() { - defer close(fetchDone) - defer close(fetchErrCh) - fetchErrCh <- g.fetchOffsets(ctx, added) - }() - } else { - close(fetchDone) - close(fetchErrCh) - } - - // Before we return, we also want to ensure that the user's onAssign is - // done. - // - // Ensuring assigning is done ensures two things: + // Before we fetch offsets, we wait for the user's onAssign callback to + // be done. This ensures a few things: // // * that we wait for for prerevoking to be done, which updates the // uncommitted field. Waiting for that ensures that a rejoin and poll @@ -744,11 +751,25 @@ func (g *groupConsumer) setupAssignedAndHeartbeat() (string, error) { // // * that our onLost will not be concurrent with onAssign // + // * that the user can start up any per-partition processors necessary + // before we begin consuming that partition. + // // We especially need to wait here because heartbeating may not // necessarily run onRevoke before returning (because of a fatal // error). s.assign(g, added) - defer func() { <-s.assignDone }() + <-s.assignDone + + if len(added) > 0 { + go func() { + defer close(fetchDone) + defer close(fetchErrCh) + fetchErrCh <- g.fetchOffsets(ctx, added) + }() + } else { + close(fetchDone) + close(fetchErrCh) + } // Finally, we simply return whatever the heartbeat error is. This will // be the fetch offset error if that function is what killed this. @@ -2020,7 +2041,9 @@ func (g *groupConsumer) getUncommittedLocked(head, dirty bool) map[string]map[in // group rebalance occurs before or while this function is being executed. You // can avoid this scenario by calling CommitRecords in a custom OnRevoked, but // for most workloads, a small bit of potential duplicate processing is fine. -// See the documentation on DisableAutoCommit for more details. +// See the documentation on DisableAutoCommit for more details. You can also +// avoid this problem by using BlockRebalanceOnCommit, but that option comes +// with its own tradeoffs (refer to its documentation). // // It is recommended to always commit records in order (per partition). If you // call this function twice with record for partition 0 at offset 999 @@ -2144,7 +2167,9 @@ func (cl *Client) MarkCommitRecords(rs ...*Record) { // group rebalance occurs before or while this function is being executed. You // can avoid this scenario by calling CommitRecords in a custom OnRevoked, but // for most workloads, a small bit of potential duplicate processing is fine. -// See the documentation on DisableAutoCommit for more details. +// See the documentation on DisableAutoCommit for more details. You can also +// avoid this problem by using BlockRebalanceOnCommit, but that option comes +// with its own tradeoffs (refer to its documentation). // // The recommended pattern for using this function is to have a poll / process // / commit loop. First PollFetches, then process every record, then call