Skip to content

Commit

Permalink
consumer: bugfix fetch concurrency loop
Browse files Browse the repository at this point in the history
This loop has been a bit problematic...

The desire fetch channel needed to be unbuffered. By being buffered, we
could race towards a deadlock

* source sends to desireFetchCh, is buffered
* source seeds context canceled, tries sending to cancelFetchCh
* session concurrently sees context canceled
* session has not drained desireFetchCh, sees activeFetches is 0
* session exits
* source permanently hangs sending to desireFetchCh

We also theoretically could have made activeFetches negative if the
cancel was seen before the desire. By forcing the desire channel to be
unbuffered, we ensure that the desire is tracked properly in the
wantFetch slice.

The logic here could be cleaned up a little bit -- particularly, we
could return earlier if active fetches is zero, rather than continue
spinning through desired fetches and starting them only for them to die
-- but this is a small optimization that would save only a bit of CPU.
I'm happy to keep the logic as is for now and focus on the bugfix.

Closes #198.
  • Loading branch information
twmb committed Sep 1, 2022
1 parent 5f24fae commit 3191842
Showing 1 changed file with 18 additions and 1 deletion.
19 changes: 18 additions & 1 deletion pkg/kgo/consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -1253,7 +1253,24 @@ func (c *consumer) newConsumerSession(tps *topicsPartitions) *consumerSession {

tps: tps,

desireFetchCh: make(chan chan chan struct{}, 8),
// NOTE: This channel must be unbuffered. If it is buffered,
// then we can exit manageFetchConcurrency when we should not
// and have a deadlock:
//
// * source sends to desireFetchCh, is buffered
// * source seeds context canceled, tries sending to cancelFetchCh
// * session concurrently sees context canceled
// * session has not drained desireFetchCh, sees activeFetches is 0
// * session exits
// * source permanently hangs sending to desireFetchCh
//
// By having desireFetchCh unbuffered, we *ensure* that if the
// source indicates it wants a fetch, the session knows it and
// tracks it in wantFetch.
//
// See #198.
desireFetchCh: make(chan chan chan struct{}),

cancelFetchCh: make(chan chan chan struct{}, 4),
allowedFetches: c.cl.cfg.maxConcurrentFetches,
}
Expand Down

0 comments on commit 3191842

Please sign in to comment.