Skip to content

Commit

Permalink
kgo: fix race between client closing and purging
Browse files Browse the repository at this point in the history
Purging topics can actually restart consuming.
This can race with killing fetch sessions while closing.
Purging internally calls assignPartitions with the consumer topics.
In general, we need to prevent anything resuming consuming no matter
what, so we add a bool field to the client that forcibly prevents
consuming.
  • Loading branch information
twmb committed Jul 10, 2023
1 parent ee392e1 commit 8c785fa
Show file tree
Hide file tree
Showing 4 changed files with 20 additions and 12 deletions.
11 changes: 6 additions & 5 deletions pkg/kgo/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -947,18 +947,19 @@ func (cl *Client) Close() {
})

c := &cl.consumer
c.kill.Store(true)
if c.g != nil {
cl.LeaveGroup()
} else if c.d != nil {
c.mu.Lock() // lock for assign
c.assignPartitions(nil, assignInvalidateAll, noTopicsPartitions, "") // we do not use a log message when not in a group
c.mu.Lock() // lock for assign
c.assignPartitions(nil, assignInvalidateAll, nil, "") // we do not use a log message when not in a group
c.mu.Unlock()
}

// After the above, consumers cannot consume anymore. LeaveGroup
// internally assigns noTopicsPartitions, which uses noConsumerSession,
// which prevents loopFetch from starting. Assigning also waits for the
// prior session to be complete, meaning loopFetch cannot be running.
// internally assigns nil, which uses noConsumerSession, which prevents
// loopFetch from starting. Assigning also waits for the prior session
// to be complete, meaning loopFetch cannot be running.

sessCloseCtx, sessCloseCancel := context.WithTimeout(cl.ctx, time.Second)
var wg sync.WaitGroup
Expand Down
17 changes: 13 additions & 4 deletions pkg/kgo/consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -196,6 +196,7 @@ type consumer struct {
sessionChangeMu sync.Mutex

session atomic.Value // *consumerSession
kill atomic.Bool

usingCursors usedCursors

Expand Down Expand Up @@ -927,8 +928,13 @@ func (f fmtAssignment) String() string {
return sb.String()
}

// assignPartitions, called under the consumer's mu, is used to set new
// cursors or add to the existing cursors.
// assignPartitions, called under the consumer's mu, is used to set new cursors
// or add to the existing cursors.
//
// We do not need to pass tps when we are bumping the session or when we are
// invalidating all. All other cases, we want the tps -- the logic below does
// not fully differentiate needing to start a new session vs. just reusing the
// old (third if case below)
func (c *consumer) assignPartitions(assignments map[string]map[int32]Offset, how assignHow, tps *topicsPartitions, why string) {
if c.mu.TryLock() {
c.mu.Unlock()
Expand Down Expand Up @@ -1058,7 +1064,7 @@ func (c *consumer) assignPartitions(assignments map[string]map[int32]Offset, how

// This assignment could contain nothing (for the purposes of
// invalidating active fetches), so we only do this if needed.
if len(assignments) == 0 || how == assignInvalidateMatching || how == assignPurgeMatching || how == assignSetMatching || how == assignBumpSession {
if len(assignments) == 0 || how != assignWithoutInvalidating {
return
}

Expand Down Expand Up @@ -1565,7 +1571,7 @@ func (c *consumer) stopSession() (listOrEpochLoads, *topicsPartitions) {
session := c.loadSession()

if session == noConsumerSession {
return listOrEpochLoads{}, noTopicsPartitions // we had no session
return listOrEpochLoads{}, nil // we had no session
}

// Before storing noConsumerSession, cancel our old. This pairs
Expand Down Expand Up @@ -1621,6 +1627,9 @@ func (c *consumer) stopSession() (listOrEpochLoads, *topicsPartitions) {
// 1 worker allows for initialization work to prevent the session from being
// immediately stopped.
func (c *consumer) startNewSession(tps *topicsPartitions) *consumerSession {
if c.kill.Load() {
tps = nil
}
session := c.newConsumerSession(tps)
c.session.Store(session)

Expand Down
2 changes: 1 addition & 1 deletion pkg/kgo/consumer_group.go
Original file line number Diff line number Diff line change
Expand Up @@ -174,7 +174,7 @@ func (cl *Client) LeaveGroup() {

c.waitAndAddRebalance()
c.mu.Lock() // lock for assign
c.assignPartitions(nil, assignInvalidateAll, noTopicsPartitions, "invalidating all assignments in LeaveGroup")
c.assignPartitions(nil, assignInvalidateAll, nil, "invalidating all assignments in LeaveGroup")
wait := c.g.leave()
c.mu.Unlock()
c.unaddRebalance()
Expand Down
2 changes: 0 additions & 2 deletions pkg/kgo/topics_and_partitions.go
Original file line number Diff line number Diff line change
Expand Up @@ -250,8 +250,6 @@ type topicPartitions struct {

func (t *topicPartitions) load() *topicPartitionsData { return t.v.Load().(*topicPartitionsData) }

var noTopicsPartitions = newTopicsPartitions()

func newTopicsPartitions() *topicsPartitions {
var t topicsPartitions
t.v.Store(make(topicsPartitionsData))
Expand Down

0 comments on commit 8c785fa

Please sign in to comment.