diff --git a/pkg/kgo/consumer.go b/pkg/kgo/consumer.go index ea08dad2..3a97d755 100644 --- a/pkg/kgo/consumer.go +++ b/pkg/kgo/consumer.go @@ -1253,7 +1253,24 @@ func (c *consumer) newConsumerSession(tps *topicsPartitions) *consumerSession { tps: tps, - desireFetchCh: make(chan chan chan struct{}, 8), + // NOTE: This channel must be unbuffered. If it is buffered, + // then we can exit manageFetchConcurrency when we should not + // and have a deadlock: + // + // * source sends to desireFetchCh, is buffered + // * source seeds context canceled, tries sending to cancelFetchCh + // * session concurrently sees context canceled + // * session has not drained desireFetchCh, sees activeFetches is 0 + // * session exits + // * source permanently hangs sending to desireFetchCh + // + // By having desireFetchCh unbuffered, we *ensure* that if the + // source indicates it wants a fetch, the session knows it and + // tracks it in wantFetch. + // + // See #198. + desireFetchCh: make(chan chan chan struct{}), + cancelFetchCh: make(chan chan chan struct{}, 4), allowedFetches: c.cl.cfg.maxConcurrentFetches, }