fix(router): fix singleflight deduplication bugs and propagate response headers#2522
fix(router): fix singleflight deduplication bugs and propagate response headers#2522
Conversation
|
Note Reviews pausedIt looks like this branch is under active development. To avoid overwhelming you with review comments due to an influx of new commits, CodeRabbit has automatically paused this review. You can configure this behavior by changing the Use the following commands to manage reviews:
Use the checkboxes below for quick actions:
WalkthroughAdds header-propagation wiring and runtime application via engine loader hooks, preserves a variablesHash in the variables normalization cache to avoid inbound singleflight collisions, adds warmed-cache singleflight tests, and bumps github.com/wundergraph/graphql-go-tools/v2 dependency. Changes
Estimated code review effort🎯 3 (Moderate) | ⏱️ ~25 minutes Possibly related PRs
🚥 Pre-merge checks | ✅ 2 | ❌ 1❌ Failed checks (1 warning)
✅ Passed checks (2 passed)
✏️ Tip: You can configure your own custom pre-merge checks in the settings. Comment |
❌ Internal Query Planner CI checks failedThe Internal Query Planner CI checks failed in the celestial repository, and this is going to stop the merge of this PR. |
There was a problem hiding this comment.
🧹 Nitpick comments (2)
router/core/header_rule_engine.go (1)
423-449: Consider carrying the subgraph status code into response‑rule application.
applyResponseRuleMostRestrictiveCacheControlusesres.StatusCode, but the synthetic response here leaves it at 0. Passing the actual status code (e.g., fromresolve.ResponseInfo.StatusCode) will make cache‑control evaluation more faithful for singleflight followers.Proposed adjustment
-func (h *HeaderPropagation) ApplyResponseHeaderRules(ctx context.Context, headers http.Header, subgraphName string) { +func (h *HeaderPropagation) ApplyResponseHeaderRules(ctx context.Context, headers http.Header, statusCode int, subgraphName string) { propagation := getResponseHeaderPropagation(ctx) if propagation == nil { return } // Create a synthetic response to pass to applyResponseRule, which expects *http.Response resp := &http.Response{ - Header: headers, + Header: headers, + StatusCode: statusCode, } resp.Request = (&http.Request{}).WithContext(ctx)🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@router/core/header_rule_engine.go` around lines 423 - 449, ApplyResponseHeaderRules currently creates a synthetic *http.Response with StatusCode == 0, breaking applyResponseRuleMostRestrictiveCacheControl which reads res.StatusCode; update ApplyResponseHeaderRules to extract the actual status code from the resolve response info in context (e.g. resolve.ResponseInfo.StatusCode or similar value available via ctx/getResponseHeaderPropagation) and assign it to resp.StatusCode before calling h.applyResponseRule for global and per-subgraph rules, falling back to 0 only if the resolved status code is unavailable.router-tests/singleflight_test.go (1)
1069-1092: Optional: usesync.WaitGroup.Gofor the worker goroutines.
Go 1.25’sWaitGroup.Goremoves the manualAdd/Donepairing and keeps the helper lean while retaining the readiness barrier.Possible refactor
- ready.Add(n) - done.Add(n) + ready.Add(n) trigger := make(chan struct{}) responses := make([]*testenv.TestResponse, n) for i := 0; i < n; i++ { - go func(idx int) { - ready.Done() - defer done.Done() - <-trigger - resp, err := xEnv.MakeGraphQLRequest(testenv.GraphQLRequest{Query: query}) - require.NoError(t, err) - require.Equal(t, http.StatusOK, resp.Response.StatusCode) - responses[idx] = resp - }(i) + idx := i + done.Go(func() { + ready.Done() + <-trigger + resp, err := xEnv.MakeGraphQLRequest(testenv.GraphQLRequest{Query: query}) + require.NoError(t, err) + require.Equal(t, http.StatusOK, resp.Response.StatusCode) + responses[idx] = resp + }) }Based on learnings: In Go code (Go 1.25+), prefer using sync.WaitGroup.Go(func()) to run a function in a new goroutine, letting the WaitGroup manage Add/Done automatically. Avoid manual wg.Add(1) followed by go func() { defer wg.Done(); ... }() patterns. Apply this guidance across all Go files in the wundergraph/cosmo repository where concurrency is used.
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@router-tests/singleflight_test.go` around lines 1069 - 1092, The runConcurrentSingleflightRequests helper currently manually manages ready.Add/ready.Done and done.Add/done.Done with go func; replace those manual Add/Done patterns by using sync.WaitGroup.Go to spawn the worker goroutines: remove ready.Add(n) and done.Add(n), and in the loop call ready.Go (or done.Go) to start each worker so the WaitGroup manages Add/Done automatically; inside the worker keep the barrier receive on trigger and the test assertions (resp, err checks) and update responses[idx] as before—reference runConcurrentSingleflightRequests, ready and done WaitGroup variables, and the anonymous worker goroutine to locate the change.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Nitpick comments:
In `@router-tests/singleflight_test.go`:
- Around line 1069-1092: The runConcurrentSingleflightRequests helper currently
manually manages ready.Add/ready.Done and done.Add/done.Done with go func;
replace those manual Add/Done patterns by using sync.WaitGroup.Go to spawn the
worker goroutines: remove ready.Add(n) and done.Add(n), and in the loop call
ready.Go (or done.Go) to start each worker so the WaitGroup manages Add/Done
automatically; inside the worker keep the barrier receive on trigger and the
test assertions (resp, err checks) and update responses[idx] as before—reference
runConcurrentSingleflightRequests, ready and done WaitGroup variables, and the
anonymous worker goroutine to locate the change.
In `@router/core/header_rule_engine.go`:
- Around line 423-449: ApplyResponseHeaderRules currently creates a synthetic
*http.Response with StatusCode == 0, breaking
applyResponseRuleMostRestrictiveCacheControl which reads res.StatusCode; update
ApplyResponseHeaderRules to extract the actual status code from the resolve
response info in context (e.g. resolve.ResponseInfo.StatusCode or similar value
available via ctx/getResponseHeaderPropagation) and assign it to resp.StatusCode
before calling h.applyResponseRule for global and per-subgraph rules, falling
back to 0 only if the resolved status code is unavailable.
Codecov Report❌ Patch coverage is Additional details and impacted files@@ Coverage Diff @@
## main #2522 +/- ##
==========================================
+ Coverage 61.40% 61.85% +0.44%
==========================================
Files 231 231
Lines 24157 24218 +61
==========================================
+ Hits 14833 14979 +146
+ Misses 8061 7985 -76
+ Partials 1263 1254 -9
🚀 New features to boost your workflow:
|
There was a problem hiding this comment.
🧹 Nitpick comments (1)
router-tests/singleflight_test.go (1)
1139-1163: Consider usingsync.WaitGroup.Gofor cleaner goroutine management.The manual
wg.Add(n)followed bygo func() { defer wg.Done(); ... }()pattern works correctly but can be simplified usingsync.WaitGroup.Go(func())available in Go 1.25+, which manages Add/Done automatically.♻️ Suggested refactor
func runConcurrentSingleflightRequests(t *testing.T, xEnv *testenv.Environment, query string, n int) []*testenv.TestResponse { t.Helper() - var ready, done sync.WaitGroup - ready.Add(n) - done.Add(n) + var ready sync.WaitGroup trigger := make(chan struct{}) responses := make([]*testenv.TestResponse, n) + ready.Add(n) + var done sync.WaitGroup for i := 0; i < n; i++ { - go func(idx int) { + done.Go(func() { + idx := i // capture loop variable ready.Done() - defer done.Done() <-trigger resp, err := xEnv.MakeGraphQLRequest(testenv.GraphQLRequest{Query: query}) require.NoError(t, err) require.Equal(t, http.StatusOK, resp.Response.StatusCode) responses[idx] = resp - }(i) + }) } ready.Wait() close(trigger) done.Wait() return responses }Based on learnings: "In Go code (Go 1.25+), prefer using sync.WaitGroup.Go(func()) to run a function in a new goroutine, letting the WaitGroup manage Add/Done automatically."
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@router-tests/singleflight_test.go` around lines 1139 - 1163, The test uses manual done.Add(n)/defer done.Done() for goroutine lifecycle but can be simplified with sync.WaitGroup.Go; remove done.Add(n), replace the go func launch with done.Go(func() { ready.Done(); <-trigger; resp, err := xEnv.MakeGraphQLRequest(...); require.NoError(...); require.Equal(...); responses[idx] = resp }) so the done WaitGroup auto-Add/Done is handled by Go, keep ready as the barrier (keep ready.Add(n) and ready.Wait()) and keep the existing ready.Done() call inside the goroutine to signal readiness.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Nitpick comments:
In `@router-tests/singleflight_test.go`:
- Around line 1139-1163: The test uses manual done.Add(n)/defer done.Done() for
goroutine lifecycle but can be simplified with sync.WaitGroup.Go; remove
done.Add(n), replace the go func launch with done.Go(func() { ready.Done();
<-trigger; resp, err := xEnv.MakeGraphQLRequest(...); require.NoError(...);
require.Equal(...); responses[idx] = resp }) so the done WaitGroup auto-Add/Done
is handled by Go, keep ready as the barrier (keep ready.Add(n) and ready.Wait())
and keep the existing ready.Done() call inside the goroutine to signal
readiness.
…nse headers - Fix VariablesHash not restored from normalization cache, causing different-variable requests to collide in inbound singleflight - Add ApplyResponseHeaderRules for singleflight followers whose OnOriginResponse never fires - Wire GetDeduplicationData/SetDeduplicationData via generic SetDeduplicationCallbacks to share response headers from leader to follower inbound requests - Update graphql-go-tools to c28d5e183ea3 (PR #1389) - Add 4 new singleflight tests (variable collision, header Set rule, Cache-Control, multiple Set rules) Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
- Replace silent if-guard with require.NotNil in variable collision test - Use exact Cache-Control value assertions instead of NotEmpty - Add inline comment explaining VariablesHash cache-hit fix - Document double-application behavior for singleflight leaders Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
- Add mutex around propagation.header.Set() in applyResponseRule to prevent data race when concurrent subgraph responses apply Set rules - Extract runConcurrentSingleflightRequests helper to deduplicate barrier+goroutine boilerplate across 3 singleflight header tests Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
…ight leaders Track which subgraphs had OnOriginResponse fire in responseHeaderPropagation.originResponseApplied. ApplyResponseHeaderRules (called from OnFinished for all fetches) skips subgraphs already handled by the leader's OnOriginResponse, preventing double-application of rules like Append or MostRestrictiveCacheControl. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
…iginResponse Remove response header rule application from OnOriginResponse transport post-handler and apply rules only in OnFinished (engine loader hooks). This ensures both singleflight leaders and followers are handled uniformly without the flawed originResponseApplied tracking map. Add tests for multi-subgraph and subgraph-specific response header rules with singleflight. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2c36f52 to
ac919fa
Compare
There was a problem hiding this comment.
🧹 Nitpick comments (1)
router-tests/singleflight_test.go (1)
872-949: Prefersync.WaitGroup.Gofor goroutine lifecycle management.The
doneWaitGroup in this block can useGo()to avoid manual Add/Done calls, keeping goroutine lifecycle handling consistent with the preferred pattern in Go 1.25+.♻️ Suggested refactor
- done.Add(total) trigger := make(chan struct{}) ... for _, id := range variableValues { + id := id for j := 0; j < numPerVariable; j++ { + slot := idx + idx++ - go func(slot, varVal int) { + done.Go(func() { ready.Done() - defer done.Done() <-trigger res := xEnv.MakeGraphQLRequestOK(testenv.GraphQLRequest{ Query: `query($id: Int!) { employee(id: $id) { id details { forename } } }`, Variables: []byte(fmt.Sprintf(`{"id": %d}`, varVal)), }) - results[slot] = result{body: res.Body, requested: varVal} - }(idx, id) - idx++ + results[slot] = result{body: res.Body, requested: id} + }) } }🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@router-tests/singleflight_test.go` around lines 872 - 949, The test "different variables with warm cache should not collide in inbound dedup" uses done.Add(total) and manual defer done.Done() inside spawned goroutines; replace that manual lifecycle with the sync.WaitGroup Go method: remove done.Add(total) and wrap each goroutine body with done.Go(func() { ... }) (keep the ready.Done / ready.Wait logic as-is so readiness coordination is unchanged), ensuring you still pass slot and varVal into the closure to avoid capture issues and eliminating the deferred done.Done() calls.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Nitpick comments:
In `@router-tests/singleflight_test.go`:
- Around line 872-949: The test "different variables with warm cache should not
collide in inbound dedup" uses done.Add(total) and manual defer done.Done()
inside spawned goroutines; replace that manual lifecycle with the sync.WaitGroup
Go method: remove done.Add(total) and wrap each goroutine body with
done.Go(func() { ... }) (keep the ready.Done / ready.Wait logic as-is so
readiness coordination is unchanged), ensuring you still pass slot and varVal
into the closure to avoid capture issues and eliminating the deferred
done.Done() calls.
… returns Move header propagation to the top of OnFinished so it runs for ALL fetches including entity resolution. The tracing/metrics guards (reqContext, hookCtx) may not pass for entity resolution contexts, causing subgraph-specific response header rules to be silently skipped. Add test combining global All + per-subgraph rules with a multi-subgraph query that triggers entity resolution (employees + family). Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
There was a problem hiding this comment.
🧹 Nitpick comments (1)
router-tests/singleflight_test.go (1)
1200-1216: Usesync.WaitGroup.Go()instead of manual Add/Done for goroutine tracking.The project targets Go 1.25, which includes the
WaitGroup.Go()helper for spawning goroutines with automatic Add/Done management. This pattern is cleaner and safer than the current manual approach. Note: thereadyWaitGroup serves as a synchronization barrier (requiring explicitAdd/Donecalls), but thedoneWaitGroup can be converted to use.Go().♻️ Proposed refactor
func runConcurrentSingleflightRequests(t *testing.T, xEnv *testenv.Environment, query string, n int) []*testenv.TestResponse { t.Helper() var ready, done sync.WaitGroup ready.Add(n) - done.Add(n) trigger := make(chan struct{}) responses := make([]*testenv.TestResponse, n) for i := 0; i < n; i++ { - go func(idx int) { + idx := i + done.Go(func() { ready.Done() - defer done.Done() <-trigger resp, err := xEnv.MakeGraphQLRequest(testenv.GraphQLRequest{Query: query}) require.NoError(t, err) require.Equal(t, http.StatusOK, resp.Response.StatusCode) responses[idx] = resp - }(i) + }) } ready.Wait() close(trigger) done.Wait() return responses }🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@router-tests/singleflight_test.go` around lines 1200 - 1216, In runConcurrentSingleflightRequests replace the manual done.Add(n)/defer done.Done() pattern by using the WaitGroup.Go helper: keep ready as the explicit barrier (ready.Add(n)/ready.Done() stays), remove done.Add(n), and spawn each worker with done.Go(func() { <-trigger; resp, err := xEnv.MakeGraphQLRequest(...); require.NoError(t, err); require.Equal(t, http.StatusOK, resp.Response.StatusCode); responses[idx] = resp }) so the WaitGroup.Go handles the Add/Done automatically; ensure you still pass idx into the closure (go func(idx int) {...}(i) style) to avoid loop-variable capture.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Nitpick comments:
In `@router-tests/singleflight_test.go`:
- Around line 1200-1216: In runConcurrentSingleflightRequests replace the manual
done.Add(n)/defer done.Done() pattern by using the WaitGroup.Go helper: keep
ready as the explicit barrier (ready.Add(n)/ready.Done() stays), remove
done.Add(n), and spawn each worker with done.Go(func() { <-trigger; resp, err :=
xEnv.MakeGraphQLRequest(...); require.NoError(t, err); require.Equal(t,
http.StatusOK, resp.Response.StatusCode); responses[idx] = resp }) so the
WaitGroup.Go handles the Add/Done automatically; ensure you still pass idx into
the closure (go func(idx int) {...}(i) style) to avoid loop-variable capture.
…bgraph cache control Covers three previously untested code paths: - Propagate Named rule forwarding actual subgraph response headers under singleflight - MostRestrictiveCacheControl merging across two subgraphs under singleflight - Subgraph-specific Propagate rule with entity resolution under singleflight Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Use sync.WaitGroup.Go() instead of manual Add/Done, remove orphaned comment in router.go. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Summary by CodeRabbit
Bug Fixes
Tests
Fixes two singleflight deduplication bugs:
VariablesHashwas always 0 on cache hits, causing inbound singleflight to collapse requests with different variables into the same key and return wrong data.OnOriginResponsefire, so response header rules (Set, Cache-Control propagation) and inbound dedup header copying were silently dropped.Updates
graphql-go-toolstoc28d5e183ea3(PR #1389), which providesresponseHeaderssharing inSingleFlightItemandSetDeduplicationCallbacksonresolve.Context. The router wires these mechanisms viaApplyResponseHeaderRulesinOnFinished(subgraph SF followers) and typedSetDeduplicationCallbacksclosures in the GraphQL handler (inbound SF followers). Also fixes a pre-existing data race inapplyResponseRuleforSetoperations.Checklist