From 3191842a81033342e8d37a529bd0a1b3d190fd9f Mon Sep 17 00:00:00 2001
From: Travis Bischel
Date: Wed, 31 Aug 2022 21:51:18 -0600
Subject: [PATCH] consumer: bugfix fetch concurrency loop

This loop has been a bit problematic...

The desire fetch channel needed to be unbuffered. By being buffered, we
could race towards a deadlock:

* source sends to desireFetchCh, is buffered
* source sees context canceled, tries sending to cancelFetchCh
* session concurrently sees context canceled
* session has not drained desireFetchCh, sees activeFetches is 0
* session exits
* source permanently hangs sending to desireFetchCh

We also theoretically could have made activeFetches negative if the
cancel was seen before the desire. By forcing the desire channel to be
unbuffered, we ensure that the desire is tracked properly in the
wantFetch slice.

The logic here could be cleaned up a little bit -- particularly, we
could return earlier if active fetches is zero, rather than continue
spinning through desired fetches and starting them only for them to
die -- but this is a small optimization that would save only a bit of
CPU. I'm happy to keep the logic as is for now and focus on the bugfix.

Closes #198.
---
 pkg/kgo/consumer.go | 19 ++++++++++++++++++-
 1 file changed, 18 insertions(+), 1 deletion(-)

diff --git a/pkg/kgo/consumer.go b/pkg/kgo/consumer.go
index ea08dad2..3a97d755 100644
--- a/pkg/kgo/consumer.go
+++ b/pkg/kgo/consumer.go
@@ -1253,7 +1253,24 @@ func (c *consumer) newConsumerSession(tps *topicsPartitions) *consumerSession {
 
 		tps: tps,
 
-		desireFetchCh: make(chan chan chan struct{}, 8),
+		// NOTE: This channel must be unbuffered. If it is buffered,
+		// then we can exit manageFetchConcurrency when we should not
+		// and have a deadlock:
+		//
+		// * source sends to desireFetchCh, is buffered
+		// * source sees context canceled, tries sending to cancelFetchCh
+		// * session concurrently sees context canceled
+		// * session has not drained desireFetchCh, sees activeFetches is 0
+		// * session exits
+		// * source permanently hangs sending to desireFetchCh
+		//
+		// By having desireFetchCh unbuffered, we *ensure* that if the
+		// source indicates it wants a fetch, the session knows it and
+		// tracks it in wantFetch.
+		//
+		// See #198.
+		desireFetchCh: make(chan chan chan struct{}),
+
 		cancelFetchCh: make(chan chan chan struct{}, 4),
 		allowedFetches: c.cl.cfg.maxConcurrentFetches,
 	}
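
The channel property the NOTE relies on can be shown in isolation. The following is a minimal standalone sketch, not code from pkg/kgo: the helper name sendWithoutReceiver is hypothetical, and the element type is simplified to struct{} instead of the real chan chan struct{}. It only illustrates that a buffered send can complete before any receiver has seen the value, while an unbuffered send cannot.

```go
package main

import "fmt"

// sendWithoutReceiver (hypothetical helper) reports whether a send on ch can
// complete while no goroutine is receiving. This models the source announcing
// a desired fetch while the session is busy elsewhere or about to exit.
func sendWithoutReceiver(ch chan struct{}) bool {
	select {
	case ch <- struct{}{}:
		// The value is parked in the buffer; no receiver has observed it.
		return true
	default:
		// An unbuffered send needs a receiver that is ready right now.
		return false
	}
}

func main() {
	buffered := make(chan struct{}, 8) // capacity 8, as before the patch
	unbuffered := make(chan struct{})  // no capacity, as after the patch

	fmt.Println("buffered send completes unseen:", sendWithoutReceiver(buffered))
	fmt.Println("desires parked but untracked:", len(buffered))
	fmt.Println("unbuffered send completes unseen:", sendWithoutReceiver(unbuffered))
}
```

With the buffered channel, the sender proceeds as if its desire were registered even though the receiving side has not observed it, which is the window the commit message describes. With the unbuffered channel, a completed send implies the receiver took the value, so the session can track it before deciding to exit.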