Skip to content

Commit

Permalink
consumer: fix data race
Browse files Browse the repository at this point in the history
If two cursors independently had retriable errors from a fetch response,
they would both trigger the listOrEpoch logic (the other three triggers
are two with a stopped session, which avoids the scenario in this
commit, and a retry from listOrEpoch, which could also trigger this).

If the two failing cursors come from different sources (brokers), they
will likely trigger the listOrEpoch logic at different times and may
(depending on broker response timings) run handleListOrEpochResults
concurrently.

The concurrency could cause a race when both results try to modify
usingCursors at the same time.

To fix this, we just guard the entire handle function with
listOrEpochMu.

Closes #36.
  • Loading branch information
twmb committed May 10, 2021
1 parent bc6cc8a commit 1aaa1ef
Showing 1 changed file with 11 additions and 2 deletions.
13 changes: 11 additions & 2 deletions pkg/kgo/consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -1093,15 +1093,24 @@ func (s *consumerSession) handleListOrEpochResults(loaded loadedOffsets) {
// returns when the retry limit is hit. We will backoff 1s and then
// allow RequestWith to continue requesting and backing off.
var reloads listOrEpochLoads

// In the defer below, we need to guard listOrEpochLoadsLoading, which
// can be modified with a concurent listOrEpoch.
//
// In the use closure below, we need to guard usingCursors to prevent
// it from a concurrent handleListOrEpochResults.
//
// So, we just guard this entire function, which executes quickly.
s.listOrEpochMu.Lock()
defer s.listOrEpochMu.Unlock()

defer func() {
// When we are done handling results, we have finished loading
// all the topics and partitions. We remove them from tracking
// in our session.
s.listOrEpochMu.Lock()
for _, load := range loaded.loaded {
s.listOrEpochLoadsLoading.removeLoad(load.topic, load.partition)
}
s.listOrEpochMu.Unlock()

if !reloads.isEmpty() {
s.incWorker()
Expand Down

0 comments on commit 1aaa1ef

Please sign in to comment.