From b39ca311e5c1775250e981bbf28f0e06084f2eff Mon Sep 17 00:00:00 2001 From: Travis Bischel Date: Mon, 14 Feb 2022 20:55:16 -0700 Subject: [PATCH] fetchOffsets: fix data race Using a named return here is normal, especially if we want to react to what the return is in a deferred function. Unfortunately, we use `err`. We set `err` in a goroutine, the goroutine that is issuing our request. If we return early due to a context cancelation, we return `ctx.Err()`, which also kills our request that is actively being issued, which sets `err`. Returning `ctx.Err()` also coincidentally sets err, meaning this code is bugged by doing a concurrent write to the same return value. Instead, we will use `err` only in the concurrent function, and will name our return `rerr`. --- pkg/kgo/consumer_group.go | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/pkg/kgo/consumer_group.go b/pkg/kgo/consumer_group.go index 850d33ac..7741649f 100644 --- a/pkg/kgo/consumer_group.go +++ b/pkg/kgo/consumer_group.go @@ -1156,14 +1156,14 @@ func (g *groupConsumer) adjustCooperativeFetchOffsets(added, lost map[string][]i // fetchOffsets is issued once we join a group to see what the prior commits // were for the partitions we were assigned. -func (g *groupConsumer) fetchOffsets(ctx context.Context, added, lost map[string][]int32) (err error) { +func (g *groupConsumer) fetchOffsets(ctx context.Context, added, lost map[string][]int32) (rerr error) { // we must use "rerr"! see introducing commit // If cooperative consuming, we may have to resume fetches. See the // comment on adjustCooperativeFetchOffsets. If we successfully fetch, // we clear what we were fetching. if g.cooperative { added = g.adjustCooperativeFetchOffsets(added, lost) defer func() { - if err == nil { + if rerr == nil { g.fetching = nil } }() @@ -1184,6 +1184,7 @@ start: } var resp *kmsg.OffsetFetchResponse + var err error fetchDone := make(chan struct{}) go func() {