From 169891c16aca38d0bf9838d61d8727340c292630 Mon Sep 17 00:00:00 2001 From: Jens Neuse Date: Thu, 19 Feb 2026 01:23:42 +0100 Subject: [PATCH 1/7] fix(resolve): fix flaky singleflight deduplication tests The tests used a followersEntered channel that signaled before followers actually called GetOrCreate/LoadOrStore. Under the race detector, the leader could finish and delete the singleflight key before followers entered, causing them to start fresh instead of deduplicating. Add followerCount atomic counter to InflightRequest and poll it in tests to confirm all followers have registered before releasing the data source. Co-Authored-By: Claude Opus 4.6 --- .../resolve/inbound_request_singleflight.go | 7 +- .../inbound_request_singleflight_test.go | 26 +++---- v2/pkg/engine/resolve/resolve_test.go | 71 +++++++++++-------- 3 files changed, 59 insertions(+), 45 deletions(-) diff --git a/v2/pkg/engine/resolve/inbound_request_singleflight.go b/v2/pkg/engine/resolve/inbound_request_singleflight.go index a67d8deccc..5d2f1e1fe3 100644 --- a/v2/pkg/engine/resolve/inbound_request_singleflight.go +++ b/v2/pkg/engine/resolve/inbound_request_singleflight.go @@ -3,6 +3,7 @@ package resolve import ( "encoding/binary" "sync" + "sync/atomic" "github.com/wundergraph/graphql-go-tools/v2/pkg/pool" ) @@ -46,8 +47,9 @@ type InflightRequest struct { Err error ID uint64 - HasFollowers bool - Mu sync.Mutex + HasFollowers bool + Mu sync.Mutex + followerCount atomic.Int32 } // GetOrCreate creates a new InflightRequest or returns an existing (shared) one @@ -90,6 +92,7 @@ func (r *InboundRequestSingleFlight) GetOrCreate(ctx *Context, response *GraphQL inflight, shared := shard.m.LoadOrStore(key, request) if shared { request = inflight.(*InflightRequest) + request.followerCount.Add(1) request.Mu.Lock() request.HasFollowers = true request.Mu.Unlock() diff --git a/v2/pkg/engine/resolve/inbound_request_singleflight_test.go b/v2/pkg/engine/resolve/inbound_request_singleflight_test.go index a82d372d4f..76468be79f 100644 --- a/v2/pkg/engine/resolve/inbound_request_singleflight_test.go +++ b/v2/pkg/engine/resolve/inbound_request_singleflight_test.go @@ -2,8 +2,10 @@ package resolve import ( "context" + "runtime" "sync" "testing" + "time" "github.com/wundergraph/graphql-go-tools/v2/pkg/ast" ) @@ -75,8 +77,7 @@ func TestInboundSingleFlight_FollowerReceivesLeaderError(t *testing.T) { } // The follower calls GetOrCreate which blocks on inflight.Done. - // We wait for HasFollowers to be set before calling FinishErr. - followerReady := make(chan struct{}) + // We wait for followerCount to confirm it has entered before calling FinishErr. var wg sync.WaitGroup wg.Add(1) @@ -85,25 +86,20 @@ func TestInboundSingleFlight_FollowerReceivesLeaderError(t *testing.T) { followerCtx := NewContext(context.Background()) followerCtx.Request.ID = 2 - // Signal that we're about to enter GetOrCreate. HasFollowers will be - // set inside GetOrCreate before the select blocks, so closing - // followerReady here is slightly early, but we poll HasFollowers below. - close(followerReady) - _, followerErr := sf.GetOrCreate(followerCtx, response) if followerErr == nil { t.Error("expected error from follower after leader FinishErr") } }() - <-followerReady - // Spin until the follower has actually registered (set HasFollowers) - for { - inflight.Mu.Lock() - ready := inflight.HasFollowers - inflight.Mu.Unlock() - if ready { - break + // Poll until the follower has actually registered inside GetOrCreate. + deadline := time.After(time.Second) + for inflight.followerCount.Load() < 1 { + select { + case <-deadline: + t.Fatal("timeout waiting for follower to enter singleflight") + default: + runtime.Gosched() } } diff --git a/v2/pkg/engine/resolve/resolve_test.go b/v2/pkg/engine/resolve/resolve_test.go index 287ab72a63..bd7a23373b 100644 --- a/v2/pkg/engine/resolve/resolve_test.go +++ b/v2/pkg/engine/resolve/resolve_test.go @@ -8,6 +8,7 @@ import ( "io" "net" "net/http" + "runtime" "sync" "sync/atomic" "testing" @@ -149,6 +150,40 @@ func (w *blockingWriter) String() string { return w.buf.String() } +// findAnyInflight iterates through all singleflight shards and returns +// the first inflight request found. Used in tests to poll followerCount. +func findAnyInflight(r *Resolver) *InflightRequest { + for i := range r.inboundRequestSingleFlight.shards { + var found *InflightRequest + r.inboundRequestSingleFlight.shards[i].m.Range(func(_, value any) bool { + found = value.(*InflightRequest) + return false + }) + if found != nil { + return found + } + } + return nil +} + +// waitForFollowerCount polls until the inflight request has at least count followers registered. +func waitForFollowerCount(t *testing.T, r *Resolver, count int32) { + t.Helper() + deadline := time.After(time.Second) + for { + inflight := findAnyInflight(r) + if inflight != nil && inflight.followerCount.Load() >= count { + return + } + select { + case <-deadline: + t.Fatal("timeout waiting for followers to enter singleflight") + default: + runtime.Gosched() + } + } +} + type TestErrorWriter struct { } @@ -4694,30 +4729,20 @@ func TestResolver_ArenaResolveGraphQLResponse_RequestDeduplication(t *testing.T) t.Fatalf("timeout waiting for leader data source load") } - startFollowers := make(chan struct{}) - followersEntered := make(chan struct{}, requestCount-1) - for i := 1; i < requestCount; i++ { go func(i int) { defer wg.Done() ctx := ctxTemplate - <-startFollowers - followersEntered <- struct{}{} buf := &bytes.Buffer{} info, err := r.ArenaResolveGraphQLResponse(&ctx, response, buf) results[i] = result{info: info, output: buf.String(), err: err} }(i) } - close(startFollowers) - - for i := 1; i < requestCount; i++ { - select { - case <-followersEntered: - case <-time.After(time.Second): - t.Fatalf("timeout waiting for follower %d to start", i) - } - } + // Wait until all followers have entered the singleflight (called LoadOrStore) + // before releasing the data source. This guarantees they join the leader's + // inflight request rather than creating their own. + waitForFollowerCount(t, r, int32(requestCount-1)) ds.Release() @@ -4823,9 +4848,6 @@ func TestResolver_ArenaResolveGraphQLResponse_RequestDeduplication_SharedData(t t.Fatalf("timeout waiting for leader data source load") } - startFollowers := make(chan struct{}) - followersEntered := make(chan struct{}, requestCount-1) - for i := 1; i < requestCount; i++ { go func(i int) { defer wg.Done() @@ -4838,23 +4860,16 @@ func TestResolver_ArenaResolveGraphQLResponse_RequestDeduplication_SharedData(t followerData.Store(i, data) }, ) - <-startFollowers - followersEntered <- struct{}{} buf := &bytes.Buffer{} info, err := r.ArenaResolveGraphQLResponse(&ctx, response, buf) results[i] = result{info: info, output: buf.String(), err: err} }(i) } - close(startFollowers) - - for i := 1; i < requestCount; i++ { - select { - case <-followersEntered: - case <-time.After(time.Second): - t.Fatalf("timeout waiting for follower %d to start", i) - } - } + // Wait until all followers have entered the singleflight (called LoadOrStore) + // before releasing the data source. This guarantees they join the leader's + // inflight request rather than creating their own. + waitForFollowerCount(t, r, int32(requestCount-1)) ds.Release() From 4456d67a058c39f2ca4a83e25cd31718abaf24e8 Mon Sep 17 00:00:00 2001 From: Jens Neuse Date: Thu, 19 Feb 2026 01:29:29 +0100 Subject: [PATCH 2/7] refactor(resolve): remove redundant HasFollowers/Mu from InflightRequest The followerCount atomic.Int32 already tracks follower presence. Replace the mutex-guarded HasFollowers bool with followerCount.Load() > 0, removing two struct fields and ~10 lines of lock/unlock code with identical correctness. Co-Authored-By: Claude Opus 4.6 --- v2/pkg/engine/resolve/inbound_request_singleflight.go | 10 +--------- 1 file changed, 1 insertion(+), 9 deletions(-) diff --git a/v2/pkg/engine/resolve/inbound_request_singleflight.go b/v2/pkg/engine/resolve/inbound_request_singleflight.go index 5d2f1e1fe3..54678b7668 100644 --- a/v2/pkg/engine/resolve/inbound_request_singleflight.go +++ b/v2/pkg/engine/resolve/inbound_request_singleflight.go @@ -47,8 +47,6 @@ type InflightRequest struct { Err error ID uint64 - HasFollowers bool - Mu sync.Mutex followerCount atomic.Int32 } @@ -93,9 +91,6 @@ func (r *InboundRequestSingleFlight) GetOrCreate(ctx *Context, response *GraphQL if shared { request = inflight.(*InflightRequest) request.followerCount.Add(1) - request.Mu.Lock() - request.HasFollowers = true - request.Mu.Unlock() select { case <-request.Done: if request.Err != nil { @@ -116,10 +111,7 @@ func (r *InboundRequestSingleFlight) FinishOk(req *InflightRequest, data []byte) } shard := r.shardFor(req.ID) shard.m.Delete(req.ID) - req.Mu.Lock() - hasFollowers := req.HasFollowers - req.Mu.Unlock() - if hasFollowers { + if req.followerCount.Load() > 0 { // optimization to only copy when we actually have to req.Data = make([]byte, len(data)) copy(req.Data, data) From 085c0b97064e8f9e32c9a416e260a0559be92619 Mon Sep 17 00:00:00 2001 From: Jens Neuse Date: Thu, 19 Feb 2026 10:39:04 +0100 Subject: [PATCH 3/7] fix(resolve): fix flaky subscription test for dual-subscriber trigger The "two subscriptions to the same trigger" test was flaky because the data source's emitting goroutine could send counter=0 before sub2's addSubscription event was processed on the unbuffered events channel. Gate the data source start via the onStart callback until sub2 is registered. Co-Authored-By: Claude Opus 4.6 --- v2/pkg/engine/resolve/resolve_test.go | 12 ++++++++++++ 1 file changed, 12 insertions(+) diff --git a/v2/pkg/engine/resolve/resolve_test.go b/v2/pkg/engine/resolve/resolve_test.go index bd7a23373b..5458f34c63 100644 --- a/v2/pkg/engine/resolve/resolve_test.go +++ b/v2/pkg/engine/resolve/resolve_test.go @@ -6441,10 +6441,21 @@ func TestResolver_ResolveGraphQLSubscription(t *testing.T) { c, cancel := context.WithCancel(context.Background()) defer cancel() + // sub2Ready gates the data source goroutine so that it doesn't start + // emitting before sub2 has been registered on the trigger. Without this, + // the emitting goroutine's first triggerUpdate can race sub2's + // addSubscription on the unbuffered events channel, causing sub2 to + // miss counter=0. + sub2Ready := make(chan struct{}) fakeStream := createFakeStream(func(counter int) (message string, done bool) { return fmt.Sprintf(`{"data":{"counter":%d}}`, counter), counter == 100 }, 1*time.Millisecond, func(input []byte) { assert.Equal(t, `{"method":"POST","url":"http://localhost:4000","body":{"query":"subscription { counter }"}}`, string(input)) + // Block the data source goroutine until sub2 is registered. + // onStart runs inside the goroutine that calls Start(), not the + // event loop, so blocking here is safe — the event loop remains + // free to process sub2's addSubscription event. + <-sub2Ready }, func(ctx StartupHookContext, input []byte) (err error) { return nil }) @@ -6466,6 +6477,7 @@ func TestResolver_ResolveGraphQLSubscription(t *testing.T) { err2 := resolver1.AsyncResolveGraphQLSubscription(ctx2, plan1, recorder2, id2) assert.NoError(t, err2) + close(sub2Ready) // complete is called only on the last recorder recorder1.AwaitComplete(t, defaultTimeout) From c7a8188e31e787ef344c771cf5ce9b71934cf1f8 Mon Sep 17 00:00:00 2001 From: Jens Neuse Date: Thu, 19 Feb 2026 10:45:32 +0100 Subject: [PATCH 4/7] fix(subscription): fix goroutine outliving test in TestHandler_Handle MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Go 1.24+ panics when t.Fail() is called from a goroutine after the test has completed. Two sendChatMutation calls were launched in goroutines that could outlive their subtests. Call them synchronously instead — the HTTP request completes quickly and doesn't need to be async. Co-Authored-By: Claude Opus 4.6 --- execution/subscription/legacy_handler_test.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/execution/subscription/legacy_handler_test.go b/execution/subscription/legacy_handler_test.go index 8ccb631711..1ca4933258 100644 --- a/execution/subscription/legacy_handler_test.go +++ b/execution/subscription/legacy_handler_test.go @@ -417,7 +417,7 @@ func TestHandler_Handle(t *testing.T) { time.Sleep(10 * time.Millisecond) cancelFunc() - go sendChatMutation(t, chatServer.URL) + sendChatMutation(t, chatServer.URL) require.Eventually(t, func() bool { return client.hasMoreMessagesThan(0) @@ -481,7 +481,7 @@ func TestHandler_Handle(t *testing.T) { time.Sleep(10 * time.Millisecond) cancelFunc() - go sendChatMutation(t, chatServer.URL) + sendChatMutation(t, chatServer.URL) require.Eventually(t, func() bool { return client.hasMoreMessagesThan(0) From fcdd773d9b20ef6a5d2a51e4dfa27f689afb9c42 Mon Sep 17 00:00:00 2001 From: Jens Neuse Date: Thu, 19 Feb 2026 12:04:22 +0100 Subject: [PATCH 5/7] fix(resolve): address PR review feedback on polling loops Replace runtime.Gosched() with time.Sleep(10ms) to reduce CPU usage in test polling loops. Increase deadline from 1s to 3s to accommodate slow CI runners. Co-Authored-By: Claude Opus 4.6 --- v2/pkg/engine/resolve/inbound_request_singleflight_test.go | 5 ++--- v2/pkg/engine/resolve/resolve_test.go | 5 ++--- 2 files changed, 4 insertions(+), 6 deletions(-) diff --git a/v2/pkg/engine/resolve/inbound_request_singleflight_test.go b/v2/pkg/engine/resolve/inbound_request_singleflight_test.go index 76468be79f..b11d80c989 100644 --- a/v2/pkg/engine/resolve/inbound_request_singleflight_test.go +++ b/v2/pkg/engine/resolve/inbound_request_singleflight_test.go @@ -2,7 +2,6 @@ package resolve import ( "context" - "runtime" "sync" "testing" "time" @@ -93,13 +92,13 @@ func TestInboundSingleFlight_FollowerReceivesLeaderError(t *testing.T) { }() // Poll until the follower has actually registered inside GetOrCreate. - deadline := time.After(time.Second) + deadline := time.After(3 * time.Second) for inflight.followerCount.Load() < 1 { select { case <-deadline: t.Fatal("timeout waiting for follower to enter singleflight") default: - runtime.Gosched() + time.Sleep(10 * time.Millisecond) } } diff --git a/v2/pkg/engine/resolve/resolve_test.go b/v2/pkg/engine/resolve/resolve_test.go index 5458f34c63..56619c9035 100644 --- a/v2/pkg/engine/resolve/resolve_test.go +++ b/v2/pkg/engine/resolve/resolve_test.go @@ -8,7 +8,6 @@ import ( "io" "net" "net/http" - "runtime" "sync" "sync/atomic" "testing" @@ -169,7 +168,7 @@ func findAnyInflight(r *Resolver) *InflightRequest { // waitForFollowerCount polls until the inflight request has at least count followers registered. func waitForFollowerCount(t *testing.T, r *Resolver, count int32) { t.Helper() - deadline := time.After(time.Second) + deadline := time.After(3 * time.Second) for { inflight := findAnyInflight(r) if inflight != nil && inflight.followerCount.Load() >= count { @@ -179,7 +178,7 @@ func waitForFollowerCount(t *testing.T, r *Resolver, count int32) { case <-deadline: t.Fatal("timeout waiting for followers to enter singleflight") default: - runtime.Gosched() + time.Sleep(10 * time.Millisecond) } } } From 42ee9a0c2675443af9ce80ab8a5b47db4bbfb000 Mon Sep 17 00:00:00 2001 From: Jens Neuse Date: Thu, 19 Feb 2026 12:56:29 +0100 Subject: [PATCH 6/7] refactor(resolve): add AddFollower/HasFollowers methods to InflightRequest Wrap the atomic followerCount operations behind AddFollower() and HasFollowers() methods to keep production code cleaner. Co-Authored-By: Claude Opus 4.6 --- .../engine/resolve/inbound_request_singleflight.go | 12 ++++++++++-- .../resolve/inbound_request_singleflight_test.go | 2 +- v2/pkg/engine/resolve/resolve_test.go | 1 + 3 files changed, 12 insertions(+), 3 deletions(-) diff --git a/v2/pkg/engine/resolve/inbound_request_singleflight.go b/v2/pkg/engine/resolve/inbound_request_singleflight.go index 54678b7668..5affdb4d36 100644 --- a/v2/pkg/engine/resolve/inbound_request_singleflight.go +++ b/v2/pkg/engine/resolve/inbound_request_singleflight.go @@ -50,6 +50,14 @@ type InflightRequest struct { followerCount atomic.Int32 } +func (r *InflightRequest) AddFollower() { + r.followerCount.Add(1) +} + +func (r *InflightRequest) HasFollowers() bool { + return r.followerCount.Load() > 0 +} + // GetOrCreate creates a new InflightRequest or returns an existing (shared) one // The first caller to create an InflightRequest for a given key is a leader, everyone else a follower // GetOrCreate blocks until ctx.ctx.Done() returns or InflightRequest.Done is closed @@ -90,7 +98,7 @@ func (r *InboundRequestSingleFlight) GetOrCreate(ctx *Context, response *GraphQL inflight, shared := shard.m.LoadOrStore(key, request) if shared { request = inflight.(*InflightRequest) - request.followerCount.Add(1) + request.AddFollower() select { case <-request.Done: if request.Err != nil { @@ -111,7 +119,7 @@ func (r *InboundRequestSingleFlight) FinishOk(req *InflightRequest, data []byte) } shard := r.shardFor(req.ID) shard.m.Delete(req.ID) - if req.followerCount.Load() > 0 { + if req.HasFollowers() { // optimization to only copy when we actually have to req.Data = make([]byte, len(data)) copy(req.Data, data) diff --git a/v2/pkg/engine/resolve/inbound_request_singleflight_test.go b/v2/pkg/engine/resolve/inbound_request_singleflight_test.go index b11d80c989..8198b8723d 100644 --- a/v2/pkg/engine/resolve/inbound_request_singleflight_test.go +++ b/v2/pkg/engine/resolve/inbound_request_singleflight_test.go @@ -93,7 +93,7 @@ func TestInboundSingleFlight_FollowerReceivesLeaderError(t *testing.T) { // Poll until the follower has actually registered inside GetOrCreate. deadline := time.After(3 * time.Second) - for inflight.followerCount.Load() < 1 { + for !inflight.HasFollowers() { select { case <-deadline: t.Fatal("timeout waiting for follower to enter singleflight") diff --git a/v2/pkg/engine/resolve/resolve_test.go b/v2/pkg/engine/resolve/resolve_test.go index 56619c9035..80349bf0c6 100644 --- a/v2/pkg/engine/resolve/resolve_test.go +++ b/v2/pkg/engine/resolve/resolve_test.go @@ -183,6 +183,7 @@ func waitForFollowerCount(t *testing.T, r *Resolver, count int32) { } } + type TestErrorWriter struct { } From 13d26a65309edb854630e11a0ebfe80cbf9ee334 Mon Sep 17 00:00:00 2001 From: Jens Neuse Date: Thu, 19 Feb 2026 13:47:07 +0100 Subject: [PATCH 7/7] fix(resolve): remove extra blank line to fix gci linter Co-Authored-By: Claude Opus 4.6 --- v2/pkg/engine/resolve/resolve_test.go | 1 - 1 file changed, 1 deletion(-) diff --git a/v2/pkg/engine/resolve/resolve_test.go b/v2/pkg/engine/resolve/resolve_test.go index 80349bf0c6..56619c9035 100644 --- a/v2/pkg/engine/resolve/resolve_test.go +++ b/v2/pkg/engine/resolve/resolve_test.go @@ -183,7 +183,6 @@ func waitForFollowerCount(t *testing.T, r *Resolver, count int32) { } } - type TestErrorWriter struct { }