Skip to content

Commit

Permalink
fetchOffsets: fix data race
Browse files Browse the repository at this point in the history
Using a named return here is normal, especially if we want to react to
what the return is in a deferred function.

Unfortunately, we use `err`. We set `err` in a goroutine, the goroutine
that is issuing our request.

If we return early due to a context cancelation, we return `ctx.Err()`,
which also kills our request that is actively being issued, which sets
`err`. Returning `ctx.Err()` also coincidentally sets err, meaning this
code is bugged by doing a concurrent write to the same return value.

Instead, we will use `err` only in the concurrent function, and will
name our return `rerr`.
  • Loading branch information
twmb committed Feb 15, 2022
1 parent 4156e9f commit b39ca31
Showing 1 changed file with 3 additions and 2 deletions.
5 changes: 3 additions & 2 deletions pkg/kgo/consumer_group.go
Original file line number Diff line number Diff line change
Expand Up @@ -1156,14 +1156,14 @@ func (g *groupConsumer) adjustCooperativeFetchOffsets(added, lost map[string][]i

// fetchOffsets is issued once we join a group to see what the prior commits
// were for the partitions we were assigned.
func (g *groupConsumer) fetchOffsets(ctx context.Context, added, lost map[string][]int32) (err error) {
func (g *groupConsumer) fetchOffsets(ctx context.Context, added, lost map[string][]int32) (rerr error) { // we must use "rerr"! see introducing commit
// If cooperative consuming, we may have to resume fetches. See the
// comment on adjustCooperativeFetchOffsets. If we successfully fetch,
// we clear what we were fetching.
if g.cooperative {
added = g.adjustCooperativeFetchOffsets(added, lost)
defer func() {
if err == nil {
if rerr == nil {
g.fetching = nil
}
}()
Expand All @@ -1184,6 +1184,7 @@ start:
}

var resp *kmsg.OffsetFetchResponse
var err error

fetchDone := make(chan struct{})
go func() {
Expand Down

0 comments on commit b39ca31

Please sign in to comment.