Skip to content

Commit

Permalink
group: fix blocking commit on leave; potential deadlock
Browse files Browse the repository at this point in the history
As it turns out, we were already performing a blocking commit on leave
with the defaultRevoke, but we had a bug that made it never issue:
committing always died if the group's context was canceled, which is the
first thing we did when leaving the group so that we could break out of
the heartbeat loop.

We take a multi pronged fix to this.

First, we now do not kill commits if the group context is closed. This
is a bit odd, but the group context is only closed to break out of the
manage loop, it does not mean that the member has been booted.

Second, we specifically opt in to the revoke function if the
group-manage loop quits with context.Canceled _and_ if the group is
cooperative. There is a big comment detailing why.

Third, we now no longer default onLost to the onRevoked. It's not right
to call revoke on fatal errors, because fatal errors mean it is no
longer possible to commit.

Fourth, we cannot go through the client's BlockingCommitOffsets function
when leaving a group and using the default revoke, because the first
thing that happens when leaving a group is to unset the group consumer,
so the client level BlockingCommitOffsets will not be able to commit. We
now have a dedicated blockingCommitOffsets method on the group consumer;
this is used for default revokes. We also explicitly document that if a
user has disabled autocommitting, they must manually commit before
leaving the group.

Fifth, if a commit contained no offsets to commit, then we could
deadlock, because all calls to `commit` assumed that it was
non-blocking, when a no-offset commit would immediately call onDone. We
now call that in a goroutine.

---

Because of the above, we actually do not need
DisableBlockingCommitOnLeave, because that actually is our default
behavior already with autocommitting enabled.

---

As a few other small included updates, we
- have a small patch of the prior commit
- update docs that mentioned the now removed option
- now use commitCallback in defaultRevoke
  • Loading branch information
twmb committed May 12, 2021
1 parent 3565f3b commit 46cfcb7
Show file tree
Hide file tree
Showing 5 changed files with 90 additions and 70 deletions.
7 changes: 2 additions & 5 deletions docs/producing-and-consuming.md
Original file line number Diff line number Diff line change
Expand Up @@ -155,12 +155,9 @@ First and the most recommended option, you can just rely on the default
autocommiting behavior and the default blocking commit on leave. At most, you
may want to use your own custom commit callback.

Alternatively, you can either disable just the blocking commit on leave with
[`DisableBlockingCommitOnLeave`][18] or disable autocommitting entirely with
[`DisableAutoCommit`][19]. With either of these options, you will need a custom
`OnRevoked`.
Alternatively, you can disable autocommitting with [`DisableAutoCommit`][19]
and instead use a custom `OnRevoked`.

[18]: https://pkg.go.dev/github.com/twmb/franz-go/pkg/kgo#DisableBlockingCommitOnLeave
[19]: https://pkg.go.dev/github.com/twmb/franz-go/pkg/kgo#DisableAutoCommit

In your custom revoke, you can guard a revoked variable with a mutex. Before
Expand Down
3 changes: 3 additions & 0 deletions pkg/kgo/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -397,6 +397,9 @@ func (cl *Client) updateBrokers(brokers []kmsg.MetadataResponseBroker) {
}

// Close leaves any group and closes all connections and goroutines.
//
// If you are group consuming and have overridden the default OnRevoked, you
// must manually commit offsets before closing the client.
func (cl *Client) Close() {
// First, kill the consumer. This waits for the consumer to unset
// gracefully, ensuring we leave groups properly, and then stores the
Expand Down
101 changes: 54 additions & 47 deletions pkg/kgo/consumer_group.go
Original file line number Diff line number Diff line change
Expand Up @@ -215,15 +215,6 @@ func GroupProtocol(protocol string) GroupOpt {
return groupOpt{func(cfg *groupConsumer) { cfg.protocol = protocol }}
}

// DisableBlockingCommitOnLeave disables issuing a blocking commit when leaving
// a group. If autocommitting is disabled, this is automatically disabled.
//
// This can be used if you want to have quick shutdowns and do not care about
// committing the latest offsets.
func DisableBlockingCommitOnLeave() GroupOpt {
return groupOpt{func(cfg *groupConsumer) { cfg.noCommitOnLeave = true }}
}

// CommitCallback sets the callback to use if autocommitting is enabled. This
// overrides the default callback that logs errors and continues.
func CommitCallback(fn func(*kmsg.OffsetCommitRequest, *kmsg.OffsetCommitResponse, error)) GroupOpt {
Expand Down Expand Up @@ -260,7 +251,6 @@ type groupConsumer struct {

autocommitDisable bool // true if autocommit was disabled or we are transactional
autocommitInterval time.Duration
noCommitOnLeave bool
commitCallback func(*kmsg.OffsetCommitRequest, *kmsg.OffsetCommitResponse, error)

///////////////////////
Expand Down Expand Up @@ -364,6 +354,9 @@ type groupConsumer struct {
// also leaves a group, so this is only necessary to call if you plan to leave
// the group and continue using the client.
//
// If you have overridden the default revoke, you must manually commit offsets
// before leaving the group.
//
// If you have configured the group with an InstanceID, this does not leave the
// group. With instance IDs, it is expected that clients will restart and
// re-use the same instance ID. To leave a group using an instance ID, you must
Expand Down Expand Up @@ -428,7 +421,10 @@ func (cl *Client) AssignGroup(group string, opts ...GroupOpt) {
}
if c.cl.cfg.txnID == nil {
g.onRevoked = g.defaultRevoke
g.onLost = g.defaultRevoke
// We do not want to commit in onLost, so we explicitly set
// onLost to an empty function to avoid the fallback to
// onRevoked.
g.onLost = func(context.Context, map[string][]int32) {}
} else {
g.autocommitDisable = true
}
Expand Down Expand Up @@ -490,9 +486,27 @@ func (g *groupConsumer) manage() {
continue
}

if g.onLost != nil {
if g.cooperative && err == context.Canceled && g.onRevoked != nil {
// The cooperative consumer does not revoke everything
// while rebalancing, meaning if our context is
// canceled, we may have uncommitted data. Rather than
// diving into OnLost, we should go into OnRevoked,
// because for the most part, a context cancelation
// means we are leaving the group. Going into OnRevoked
// gives us an opportunity to commit outstanding
// offsets. For the eager consumer, since we always
// revoke before exiting the heartbeat loop, we do not
// really care to differentiate and can fall into
// OnLost just fine.
g.onRevoked(g.ctx, g.nowAssigned)

} else if g.onLost != nil {
// Any other error is perceived as a fatal error,
// and we go into OnLost as appropriate.
g.onLost(g.ctx, g.nowAssigned)

} else if g.onRevoked != nil {
// If OnLost is not specified, we fallback to OnRevoked.
g.onRevoked(g.ctx, g.nowAssigned)
}

Expand Down Expand Up @@ -549,12 +563,7 @@ func (g *groupConsumer) leave() (wait func()) {
go func() {
defer close(done)

if !(g.noCommitOnLeave || g.autocommitDisable) {
g.cl.cfg.logger.Log(LogLevelInfo, "issuing blocking commit due to leaving group")
g.cl.BlockingCommitOffsets(g.ctx, g.cl.UncommittedOffsets(), g.commitCallback)
}

g.cancel() // make sure to cancel **after** the blocking commit
g.cancel()

if wasManaging {
// We want to wait for the manage goroutine to be done
Expand Down Expand Up @@ -1681,6 +1690,11 @@ func (g *groupConsumer) loopCommit() {
return
}

// We use the group context for the default autocommit; revokes
// use the client context so that we can be sure we commit even
// after the group context is canceled (which is the first
// thing that happens so as to quit the manage loop before
// leaving a group).
g.mu.Lock()
if !g.blockAuto {
g.cl.cfg.logger.Log(LogLevelDebug, "autocommitting")
Expand Down Expand Up @@ -1874,27 +1888,36 @@ func (cl *Client) BlockingCommitOffsets(
uncommitted map[string]map[int32]EpochOffset,
onDone func(*kmsg.OffsetCommitRequest, *kmsg.OffsetCommitResponse, error),
) {
cl.cfg.logger.Log(LogLevelDebug, "in BlockingCommitOffsets", "with", uncommitted)
defer cl.cfg.logger.Log(LogLevelDebug, "left BlockingCommitOffsets")

done := make(chan struct{})
defer func() { <-done }()

if onDone == nil {
onDone = func(_ *kmsg.OffsetCommitRequest, _ *kmsg.OffsetCommitResponse, _ error) {}
}

g, ok := cl.consumer.loadGroup()
if !ok {
onDone(new(kmsg.OffsetCommitRequest), new(kmsg.OffsetCommitResponse), errNotGroup)
close(done)
return
}
if len(uncommitted) == 0 {
onDone(new(kmsg.OffsetCommitRequest), new(kmsg.OffsetCommitResponse), nil)
close(done)
return
}
g.blockingCommitOffsets(ctx, uncommitted, onDone)
}

func (g *groupConsumer) blockingCommitOffsets(
ctx context.Context,
uncommitted map[string]map[int32]EpochOffset,
onDone func(*kmsg.OffsetCommitRequest, *kmsg.OffsetCommitResponse, error),
) {
done := make(chan struct{})
defer func() { <-done }()

g.cl.cfg.logger.Log(LogLevelDebug, "in BlockingCommitOffsets", "with", uncommitted)
defer g.cl.cfg.logger.Log(LogLevelDebug, "left BlockingCommitOffsets")

if onDone == nil {
onDone = func(_ *kmsg.OffsetCommitRequest, _ *kmsg.OffsetCommitResponse, _ error) {}
}

g.blockingCommitMu.Lock() // block all other concurrent commits until our OnDone is done.

Expand Down Expand Up @@ -2005,28 +2028,10 @@ func (cl *Client) CommitOffsets(
// before revoking, meaning this truly will commit all polled fetches.
func (g *groupConsumer) defaultRevoke(_ context.Context, _ map[string][]int32) {
if !g.autocommitDisable {
un := g.getUncommitted()
// We use the client's context rather than the group context,
// because this could come from the group being left. The group
// context will already be canceled.
g.cl.BlockingCommitOffsets(g.cl.ctx, un, func(_ *kmsg.OffsetCommitRequest, resp *kmsg.OffsetCommitResponse, err error) {
if err != nil {
if err != errNotGroup && err != context.Canceled {
g.cl.cfg.logger.Log(LogLevelError, "default revoke BlockingCommitOffsets failed", "err", err)
}
return
}
for _, topic := range resp.Topics {
for _, partition := range topic.Partitions {
if err := kerr.ErrorForCode(partition.ErrorCode); err != nil {
g.cl.cfg.logger.Log(LogLevelError, "in revoke: unable to commit offsets for topic partition",
"topic", topic.Topic,
"partition", partition.Partition,
"error", err)
}
}
}
})
g.blockingCommitOffsets(g.cl.ctx, g.getUncommitted(), g.commitCallback)
}
}

Expand All @@ -2042,14 +2047,16 @@ func (g *groupConsumer) commit(
onDone = func(_ *kmsg.OffsetCommitRequest, _ *kmsg.OffsetCommitResponse, _ error) {}
}
if len(uncommitted) == 0 { // only empty if called thru autocommit / default revoke
onDone(new(kmsg.OffsetCommitRequest), new(kmsg.OffsetCommitResponse), nil)
// We have to do this concurrently because the expectation is
// that commit itself does not block.
go onDone(new(kmsg.OffsetCommitRequest), new(kmsg.OffsetCommitResponse), nil)
return
}

priorCancel := g.commitCancel
priorDone := g.commitDone

commitCtx, commitCancel := context.WithCancel(g.ctx) // enable ours to be canceled and waited for
commitCtx, commitCancel := context.WithCancel(ctx) // enable ours to be canceled and waited for
commitDone := make(chan struct{})

g.commitCancel = commitCancel
Expand Down
2 changes: 1 addition & 1 deletion pkg/kgo/group_balancer.go
Original file line number Diff line number Diff line change
Expand Up @@ -265,7 +265,7 @@ func (g *groupConsumer) findBalancer(from, proto string) (GroupBalancer, error)
for _, b := range g.balancers {
ours = append(ours, b.ProtocolName())
}
g.cl.cfg.logger.Log(LogLevelError, fmt.Sprintf("%s could not find Kafka-chosen balancer", from), "err", err, "kafka_choice", proto, "our_set", strings.Join(ours, ", "))
g.cl.cfg.logger.Log(LogLevelError, fmt.Sprintf("%s could not find Kafka-chosen balancer", from), "kafka_choice", proto, "our_set", strings.Join(ours, ", "))
return nil, fmt.Errorf("unable to balance: none of our balancers have a name equal to the balancer chosen for balancing (%s)", proto)
}

Expand Down
47 changes: 30 additions & 17 deletions pkg/kgo/group_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -118,31 +118,44 @@ func (c *testConsumer) etl(etlsBeforeQuit int) {
cl, _ := NewClient(WithLogger(testLogger()))
defer cl.Close()

cl.AssignGroup(c.group,
opts := []GroupOpt{
GroupTopics(c.consumeFrom),
Balancers(c.balancer))
Balancers(c.balancer),
}

defer func() {
defer cl.AssignGroup("")
if etlsBeforeQuit >= 0 {

// If we quit before consuming to the end, the behavior we are
// triggering is to poll a batch and _not_ commit. Thus, if
// we have etlsBeforeQuit, we do _not_ commit on leave.
// triggering is to poll a batch and _not_ commit. Thus, if we
// have etlsBeforeQuit, we do _not_ commit on leave, and so we
// disable autocommitting.
//
// However, we still want to flush to avoid an unnecessary
// dead broker errors for unfinished produces.
// However, we still want to commit on valid rebalances, so we
// set that option, BUT we do not want to commit on lost, which
// triggers when we leave,when want to flush to avoid an unnecessary dead
// broker errors for unfinished produces.

opts = append(opts,
DisableAutoCommit(),
OnRevoked(func(ctx context.Context, _ map[string][]int32) {
cl.BlockingCommitOffsets(
ctx,
cl.UncommittedOffsets(),
nil,
)
}),
OnLost(func(context.Context, map[string][]int32) {}),
)
}

cl.AssignGroup(c.group, opts...)

defer func() {
defer cl.LeaveGroup()

if err := cl.Flush(context.Background()); err != nil {
c.errCh <- fmt.Errorf("unable to flush: %v", err)
}

if etlsBeforeQuit >= 0 {
return
}
cl.BlockingCommitOffsets(
context.Background(),
cl.UncommittedOffsets(),
nil,
)
}()

netls := 0 // for if etlsBeforeQuit is non-negative
Expand Down

0 comments on commit 46cfcb7

Please sign in to comment.