consumer: add BlockRebalancesOnPoll option, AllowRebalances
One strength of Sarama's consumer group is that it is easier to reason
about where record processing logic falls within the group management
lifecycle: if you are processing records, you can be sure that the group
will not rebalance underneath you. This is also a risk, though: if your
processing takes too long, your group member is booted from the group.

This client took the opposite approach of separating the group
management logic from the consuming logic. This essentially eliminated
the risk of long processing booting the member, but made it much more
difficult to reason about when to commit. There are plenty of
disclaimers about potential duplicates, and any non-transactional
consumer should expect duplicates at some point, but if a user wants to
opt for the simpler approach of consuming & processing within one group
generation, we should support that. In fact, if a user's processing loop
is very fast, we really should encourage it.

Helps #137.
twmb committed Mar 1, 2022
1 parent f6f25a5 commit 0c8c1a6
Showing 3 changed files with 190 additions and 45 deletions.
34 changes: 34 additions & 0 deletions pkg/kgo/config.go
@@ -164,6 +164,8 @@ type cfg struct {

adjustOffsetsBeforeAssign func(ctx context.Context, offsets map[string]map[int32]Offset) (map[string]map[int32]Offset, error)

blockRebalanceOnPoll bool

setAssigned bool
setRevoked bool
setLost bool
@@ -1383,6 +1385,38 @@ func RequireStableFetchOffsets() GroupOpt {
return groupOpt{func(cfg *cfg) { cfg.requireStable = true }}
}

// BlockRebalanceOnPoll switches the client to block rebalances whenever you
// poll until you explicitly call AllowRebalance.
//
// By default, a consumer group is managed completely independently of
// consuming. A rebalance may occur at any moment. If you poll records, and
// then a rebalance happens, and then you commit, you may be committing to
// partitions you no longer own. This will result in duplicates. In the worst
// case, you could rewind commits that a different member has already made
// (risking duplicates if another rebalance were to happen before that other
// member commits again).
//
// By blocking rebalancing after you poll until you call AllowRebalance, you
// can be sure that you commit records that your member currently owns.
// However, the big tradeoff is that blocking rebalances puts your group
// member at risk of waiting so long that it is kicked from the group for
// exceeding the rebalance timeout. For comparison, Sarama blocks rebalancing
// by default; this option makes kgo behave more like Sarama.
//
// If you use this option, you should ensure that you always process records
// quickly. It is recommended you also use PollRecords rather than PollFetches
// so that you can bound how many records you process at once. You must
// always call AllowRebalance when you are done processing the records you
// received. Only rebalances that lose partitions are blocked; rebalances that
// are strictly net additions or non-modifications do not block.
//
// This function can largely replace any commit logic you may want to do in
// OnPartitionsRevoked.
func BlockRebalanceOnPoll() GroupOpt {
return groupOpt{func(cfg *cfg) { cfg.blockRebalanceOnPoll = true }}
}

// AdjustFetchOffsetsFn sets the function to be called when a group is joined
// after offsets are fetched for those partitions so that a user can adjust them
// before consumption begins.
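
For reference, a usage sketch of the new option (not from the changed files): a bounded poll / process / commit / allow loop with auto-commit disabled. BlockRebalanceOnPoll and AllowRebalance are the additions from this commit; the other option and method names (NewClient, SeedBrokers, ConsumerGroup, ConsumeTopics, DisableAutoCommit, PollRecords, EachRecord, CommitUncommittedOffsets) are assumed from the existing kgo API, and the broker, group, and topic strings are placeholders.

package main

import (
	"context"
	"log"

	"github.com/twmb/franz-go/pkg/kgo"
)

func main() {
	cl, err := kgo.NewClient(
		kgo.SeedBrokers("localhost:9092"), // placeholder broker
		kgo.ConsumerGroup("my-group"),     // placeholder group
		kgo.ConsumeTopics("my-topic"),     // placeholder topic
		kgo.DisableAutoCommit(),
		kgo.BlockRebalanceOnPoll(), // rebalances that lose partitions wait for AllowRebalance
	)
	if err != nil {
		log.Fatal(err)
	}
	defer cl.Close()

	ctx := context.Background()
	for {
		// Bound the work per iteration so a blocked rebalance never waits
		// longer than the rebalance timeout.
		fetches := cl.PollRecords(ctx, 1000)
		fetches.EachRecord(func(r *kgo.Record) {
			_ = r // ... process the record quickly ...
		})
		// Everything just polled is still owned by this member, so the
		// commit cannot land on partitions another member now owns.
		if err := cl.CommitUncommittedOffsets(ctx); err != nil {
			log.Printf("commit failed: %v", err)
		}
		cl.AllowRebalance() // release any rebalance blocked by the poll above
	}
}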
134 changes: 113 additions & 21 deletions pkg/kgo/consumer.go
@@ -166,12 +166,59 @@ type consumer struct {
sourcesReadyCond *sync.Cond
sourcesReadyForDraining []*source
fakeReadyForDraining []Fetch

pollWaitMu sync.Mutex
pollWaitC *sync.Cond
pollWaitState uint64 // 0 == nothing, low 32 bits: # pollers, high 32: # waiting rebalances
}

func (c *consumer) loadPaused() pausedTopics { return c.paused.Load().(pausedTopics) }
func (c *consumer) clonePaused() pausedTopics { return c.paused.Load().(pausedTopics).clone() }
func (c *consumer) storePaused(p pausedTopics) { c.paused.Store(p) }

func (c *consumer) waitAndAddPoller() {
c.pollWaitMu.Lock()
defer c.pollWaitMu.Unlock()
for c.pollWaitState>>32 != 0 {
c.pollWaitC.Wait()
}
// Rebalance always takes priority, but if there are no active
// rebalances, our poll blocks rebalances.
c.pollWaitState++
}

func (c *consumer) unaddPoller() {
c.pollWaitMu.Lock()
defer c.pollWaitMu.Unlock()
c.pollWaitState--
c.pollWaitC.Broadcast()
}

func (c *consumer) allowRebalance() {
c.pollWaitMu.Lock()
defer c.pollWaitMu.Unlock()
// When allowing rebalances, the user is explicitly saying all pollers
// are done. We mask them out.
c.pollWaitState &= math.MaxUint32 << 32
c.pollWaitC.Broadcast()
}

func (c *consumer) waitAndAddRebalance() {
c.pollWaitMu.Lock()
defer c.pollWaitMu.Unlock()
c.pollWaitState += 1 << 32
for c.pollWaitState&math.MaxUint32 != 0 {
c.pollWaitC.Wait()
}
}

func (c *consumer) unaddRebalance() {
c.pollWaitMu.Lock()
defer c.pollWaitMu.Unlock()
c.pollWaitState -= 1 << 32
c.pollWaitC.Broadcast()
}
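
For reference, a standalone sketch (not from the changed files) of the pollWaitState encoding used by the functions above: the low 32 bits count active pollers, the high 32 bits count rebalances waiting for those pollers, and allowRebalance clears only the poller half in one masking step.

package pollwait

import "math"

// One uint64 packs two counters: low 32 bits = active pollers,
// high 32 bits = rebalances waiting for pollers to finish.
func pollers(state uint64) uint64    { return state & math.MaxUint32 }
func rebalances(state uint64) uint64 { return state >> 32 }

func addPoller(state uint64) uint64    { return state + 1 }     // bumps the low half
func addRebalance(state uint64) uint64 { return state + 1<<32 } // bumps the high half

// clearPollers mirrors allowRebalance: mask away every poller at once so a
// waiting rebalance can proceed.
func clearPollers(state uint64) uint64 { return state & (math.MaxUint32 << 32) }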

// BufferedFetchRecords returns the number of records currently buffered from
// fetching within the client.
//
@@ -198,6 +245,7 @@ func (c *consumer) init(cl *Client) {
c.cl = cl
c.paused.Store(make(pausedTopics))
c.sourcesReadyCond = sync.NewCond(&c.sourcesReadyMu)
c.pollWaitC = sync.NewCond(&c.pollWaitMu)

if len(cl.cfg.topics) == 0 && len(cl.cfg.partitions) == 0 {
return // not consuming
@@ -240,6 +288,14 @@ func (c *consumer) addFakeReadyForDraining(topic string, partition int32, err er
c.sourcesReadyCond.Broadcast()
}

func (c *consumer) pollWait(fn func()) {
if c.cl.cfg.blockRebalanceOnPoll {
c.waitAndAddRebalance()
defer c.unaddRebalance()
}
fn()
}

// PollFetches waits for fetches to be available, returning as soon as any
// broker returns a fetch. If the context quits, this function quits. If the
// context is nil or is already canceled, this function will return immediately
@@ -253,6 +309,12 @@ func (c *consumer) addFakeReadyForDraining(topic string, partition int32, err er
// has no topic, a partition of 0, and a partition error of ErrClientClosed.
// This can be used to detect if the client is closing and to break out of a
// poll loop.
//
// If you are group consuming, a rebalance can happen under the hood while you
// process the returned fetches. This can result in duplicate work, and you may
// accidentally commit to partitions that you no longer own. You can prevent
// this by using BlockRebalanceOnPoll, but this comes with different tradeoffs.
// See the documentation on BlockRebalanceOnPoll for more information.
func (cl *Client) PollFetches(ctx context.Context) Fetches {
return cl.PollRecords(ctx, 0)
}
@@ -273,6 +335,12 @@ func (cl *Client) PollFetches(ctx context.Context) Fetches {
// has no topic, a partition of 0, and a partition error of ErrClientClosed.
// This can be used to detect if the client is closing and to break out of a
// poll loop.
//
// If you are group consuming, a rebalance can happen under the hood while you
// process the returned fetches. This can result in duplicate work, and you may
// accidentally commit to partitions that you no longer own. You can prevent
// this by using BlockRebalanceOnPoll, but this comes with different tradeoffs.
// See the documentation on BlockRebalanceOnPoll for more information.
func (cl *Client) PollRecords(ctx context.Context, maxPollRecords int) Fetches {
if maxPollRecords == 0 {
maxPollRecords = -1
@@ -283,6 +351,15 @@ func (cl *Client) PollRecords(ctx context.Context, maxPollRecords int) Fetches {

var fetches Fetches
fill := func() {
if c.cl.cfg.blockRebalanceOnPoll {
c.waitAndAddPoller()
defer func() {
if len(fetches) == 0 {
c.unaddPoller()
}
}()
}

// A group can grab the consumer lock then the group mu and
// assign partitions. The group mu is grabbed to update its
// uncommitted map. Assigning partitions clears sources ready
@@ -387,6 +464,19 @@ func (cl *Client) PollRecords(ctx context.Context, maxPollRecords int) Fetches {
return fetches
}

// AllowRebalance allows a consumer group to rebalance if it was blocked by you
// polling records in tandem with the BlockRebalanceOnPoll option.
//
// You can poll many times before calling this function; this function
// internally resets the poll count and allows any blocked rebalances to
// continue. Rebalances take priority: if a rebalance is blocked, and you allow
// rebalances and then immediately poll, your poll will be blocked until the
// rebalance completes. Internally, this function simply waits for lost
// partitions to stop being fetched before allowing you to poll again.
func (cl *Client) AllowRebalance() {
cl.consumer.allowRebalance()
}
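
A small sketch of the "poll many times, then allow once" flow described above (not from the changed files; pollBatch is a hypothetical helper, process is a caller-supplied handler, and the batch size and poll bound are arbitrary):

package example

import (
	"context"

	"github.com/twmb/franz-go/pkg/kgo"
)

// pollBatch runs a few bounded polls under one blocked-rebalance window and
// releases the block only after the whole batch has been committed.
func pollBatch(ctx context.Context, cl *kgo.Client, process func(*kgo.Record)) error {
	defer cl.AllowRebalance() // one call releases every poll above
	for i := 0; i < 3; i++ {
		fetches := cl.PollRecords(ctx, 500)
		fetches.EachRecord(process)
	}
	return cl.CommitUncommittedOffsets(ctx) // runs before the deferred AllowRebalance
}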

// PauseFetchTopics sets the client to no longer fetch the given topics and
// returns all currently paused topics. Paused topics persist until resumed.
// You can call this function with no topics to simply receive the list of
@@ -546,29 +636,31 @@ func (c *consumer) purgeTopics(topics []string) {
purgeAssignments[topic] = nil
}

c.mu.Lock()
defer c.mu.Unlock()
c.pollWait(func() {
c.mu.Lock()
defer c.mu.Unlock()

// The difference for groups is we need to lock the group and there is
// a slight type difference in g.using vs d.using.
if c.g != nil {
c.g.mu.Lock()
defer c.g.mu.Unlock()
c.g.tps.purgeTopics(topics)
c.assignPartitions(purgeAssignments, assignPurgeMatching, c.g.tps, fmt.Sprintf("purge of %v requested", topics))
for _, topic := range topics {
delete(c.g.using, topic)
delete(c.g.reSeen, topic)
}
c.g.rejoin("rejoin from PurgeFetchTopics")
} else {
c.d.tps.purgeTopics(topics)
c.assignPartitions(purgeAssignments, assignPurgeMatching, c.d.tps, fmt.Sprintf("purge of %v requested", topics))
for _, topic := range topics {
delete(c.d.using, topic)
delete(c.d.reSeen, topic)
// The difference for groups is we need to lock the group and there is
// a slight type difference in g.using vs d.using.
if c.g != nil {
c.g.mu.Lock()
defer c.g.mu.Unlock()
c.g.tps.purgeTopics(topics)
c.assignPartitions(purgeAssignments, assignPurgeMatching, c.g.tps, fmt.Sprintf("purge of %v requested", topics))
for _, topic := range topics {
delete(c.g.using, topic)
delete(c.g.reSeen, topic)
}
c.g.rejoin("rejoin from PurgeFetchTopics")
} else {
c.d.tps.purgeTopics(topics)
c.assignPartitions(purgeAssignments, assignPurgeMatching, c.d.tps, fmt.Sprintf("purge of %v requested", topics))
for _, topic := range topics {
delete(c.d.using, topic)
delete(c.d.reSeen, topic)
}
}
}
})
}

// AddConsumeTopics adds new topics to be consumed. This function is a no-op if
67 changes: 43 additions & 24 deletions pkg/kgo/consumer_group.go
@@ -152,11 +152,13 @@ func (cl *Client) LeaveGroup() {
return
}

c.mu.Lock() // lock for assign
c.assignPartitions(nil, assignInvalidateAll, noTopicsPartitions, "invalidating all assignments in LeaveGroup")
wait := c.g.leave()
c.mu.Unlock()

var wait func()
c.pollWait(func() {
c.mu.Lock() // lock for assign
defer c.mu.Unlock()
c.assignPartitions(nil, assignInvalidateAll, noTopicsPartitions, "invalidating all assignments in LeaveGroup")
wait = c.g.leave()
})
wait() // wait after we unlock
}

@@ -311,14 +313,23 @@ func (g *groupConsumer) manage() {
hook()
}

// We need to invalidate everything from an error return.
// If we are eager, we should have invalidated everything
// before getting here, but we do so doubly just in case.
//
// If we are cooperative, the join and sync could have failed
// during the cooperative rebalance where we were still
// consuming. We need to invalidate everything. Waiting to
// resume from poll is necessary, but the user will likely be
// unable to commit.
{
g.c.mu.Lock()
g.c.assignPartitions(nil, assignInvalidateAll, nil, "clearing assignment at end of group management session")
g.mu.Lock() // before allowing poll to touch uncommitted, lock the group
g.c.mu.Unlock() // now part of poll can continue
g.uncommitted = nil
g.mu.Unlock()
g.c.pollWait(func() {
g.c.mu.Lock()
g.c.assignPartitions(nil, assignInvalidateAll, nil, "clearing assignment at end of group management session")
g.mu.Lock() // before allowing poll to touch uncommitted, lock the group
g.c.mu.Unlock() // now part of poll can continue
g.uncommitted = nil
g.mu.Unlock()
})

g.nowAssigned.store(nil)
g.lastAssigned = nil
@@ -483,13 +494,15 @@ func (g *groupConsumer) revoke(stage revokeStage, lost map[string][]int32, leavi
if !g.cooperative || leaving { // stage == revokeThisSession if not cooperative
// If we are an eager consumer, we stop fetching all of our
// current partitions as we will be revoking them.
g.c.mu.Lock()
if leaving {
g.c.assignPartitions(nil, assignInvalidateAll, nil, "revoking all assignments because we are leaving the group")
} else {
g.c.assignPartitions(nil, assignInvalidateAll, nil, "revoking all assignments because we are not cooperative")
}
g.c.mu.Unlock()
g.c.pollWait(func() {
g.c.mu.Lock()
defer g.c.mu.Unlock()
if leaving {
g.c.assignPartitions(nil, assignInvalidateAll, nil, "revoking all assignments because we are leaving the group")
} else {
g.c.assignPartitions(nil, assignInvalidateAll, nil, "revoking all assignments because we are not cooperative")
}
})

if !g.cooperative {
g.cfg.logger.Log(LogLevelInfo, "eager consumer revoking prior assigned partitions", "group", g.cfg.group, "revoking", g.nowAssigned.read())
@@ -562,9 +575,11 @@ func (g *groupConsumer) revoke(stage revokeStage, lost map[string][]int32, leavi
// for the final polled offsets. We do not want to allow the
// logical race of allowing fetches for revoked partitions
// after a revoke but before an invalidation.
g.c.mu.Lock()
g.c.assignPartitions(lostOffsets, assignInvalidateMatching, g.tps, "revoking assignments from cooperative consuming")
g.c.mu.Unlock()
g.c.pollWait(func() {
g.c.mu.Lock()
defer g.c.mu.Unlock()
g.c.assignPartitions(lostOffsets, assignInvalidateMatching, g.tps, "revoking assignments from cooperative consuming")
})
}

if len(lost) > 0 || stage == revokeThisSession {
@@ -2020,7 +2035,9 @@ func (g *groupConsumer) getUncommittedLocked(head, dirty bool) map[string]map[in
// group rebalance occurs before or while this function is being executed. You
// can avoid this scenario by calling CommitRecords in a custom OnRevoked, but
// for most workloads, a small bit of potential duplicate processing is fine.
// See the documentation on DisableAutoCommit for more details.
// See the documentation on DisableAutoCommit for more details. You can also
// avoid this problem by using BlockRebalanceOnPoll, but that option comes
// with its own tradeoffs (refer to its documentation).
//
// It is recommended to always commit records in order (per partition). If you
// call this function twice with record for partition 0 at offset 999
@@ -2144,7 +2161,9 @@ func (cl *Client) MarkCommitRecords(rs ...*Record) {
// group rebalance occurs before or while this function is being executed. You
// can avoid this scenario by calling CommitRecords in a custom OnRevoked, but
// for most workloads, a small bit of potential duplicate processing is fine.
// See the documentation on DisableAutoCommit for more details.
// See the documentation on DisableAutoCommit for more details. You can also
// avoid this problem by using BlockRebalanceOnPoll, but that option comes
// with its own tradeoffs (refer to its documentation).
//
// The recommended pattern for using this function is to have a poll / process
// / commit loop. First PollFetches, then process every record, then call
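
For contrast, a sketch (not from the changed files) of the callback-based alternative that the docs above mention: committing inside OnPartitionsRevoked rather than blocking rebalances. The broker, group, and topic strings are placeholders, and the option and method names other than OnPartitionsRevoked are assumed from the existing kgo API.

package main

import (
	"context"
	"log"

	"github.com/twmb/franz-go/pkg/kgo"
)

func main() {
	cl, err := kgo.NewClient(
		kgo.SeedBrokers("localhost:9092"), // placeholder broker
		kgo.ConsumerGroup("my-group"),     // placeholder group
		kgo.ConsumeTopics("my-topic"),     // placeholder topic
		kgo.DisableAutoCommit(),
		// Flush commits before partitions are handed to another member.
		kgo.OnPartitionsRevoked(func(ctx context.Context, cl *kgo.Client, _ map[string][]int32) {
			if err := cl.CommitUncommittedOffsets(ctx); err != nil {
				log.Printf("commit during revoke failed: %v", err)
			}
		}),
	)
	if err != nil {
		log.Fatal(err)
	}
	defer cl.Close()

	ctx := context.Background()
	for {
		fetches := cl.PollFetches(ctx)
		fetches.EachRecord(func(r *kgo.Record) {
			_ = r // ... process; a rebalance may happen while this runs ...
		})
		if err := cl.CommitUncommittedOffsets(ctx); err != nil {
			log.Printf("commit failed: %v", err)
		}
	}
}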
