Skip to content
Merged
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion router-tests/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ require (
github.com/wundergraph/cosmo/demo/pkg/subgraphs/projects v0.0.0-20250715110703-10f2e5f9c79e
github.com/wundergraph/cosmo/router v0.0.0-20260213130455-6e3277e7b850
github.com/wundergraph/cosmo/router-plugin v0.0.0-20250808194725-de123ba1c65e
github.com/wundergraph/graphql-go-tools/v2 v2.0.0-rc.250
github.com/wundergraph/graphql-go-tools/v2 v2.0.0-rc.251
go.opentelemetry.io/otel v1.36.0
go.opentelemetry.io/otel/sdk v1.36.0
go.opentelemetry.io/otel/sdk/metric v1.36.0
Expand Down
4 changes: 2 additions & 2 deletions router-tests/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -354,8 +354,8 @@ github.com/wundergraph/astjson v1.0.0 h1:rETLJuQkMWWW03HCF6WBttEBOu8gi5vznj5KEUP
github.com/wundergraph/astjson v1.0.0/go.mod h1:h12D/dxxnedtLzsKyBLK7/Oe4TAoGpRVC9nDpDrZSWw=
github.com/wundergraph/go-arena v1.1.0 h1:9+wSRkJAkA2vbYHp6s8tEGhPViRGQNGXqPHT0QzhdIc=
github.com/wundergraph/go-arena v1.1.0/go.mod h1:ROOysEHWJjLQ8FSfNxZCziagb7Qw2nXY3/vgKRh7eWw=
github.com/wundergraph/graphql-go-tools/v2 v2.0.0-rc.250 h1:dggpvV2VI+2zMVXRm/1NsasT7Nxne6SRYR+GSshl5P8=
github.com/wundergraph/graphql-go-tools/v2 v2.0.0-rc.250/go.mod h1:MFbY0QI8ncF60DHs7yyyiyyhWyld0WE1JokiyTVY8j4=
github.com/wundergraph/graphql-go-tools/v2 v2.0.0-rc.251 h1:avZIXjYGTLliqS+RCZXlnFAaJdEr9HizulP1qhNLR1U=
github.com/wundergraph/graphql-go-tools/v2 v2.0.0-rc.251/go.mod h1:MFbY0QI8ncF60DHs7yyyiyyhWyld0WE1JokiyTVY8j4=
github.com/xrash/smetrics v0.0.0-20250705151800-55b8f293f342 h1:FnBeRrxr7OU4VvAzt5X7s6266i6cSVkkFPS0TuXWbIg=
github.com/xrash/smetrics v0.0.0-20250705151800-55b8f293f342/go.mod h1:Ohn+xnUBiLI6FVj/9LpzZWtj1/D6lUovWYBkxHVV3aM=
github.com/yosida95/uritemplate/v3 v3.0.2 h1:Ed3Oyj9yrmi9087+NczuL5BwkIc4wvTb5zIM+UJPGz4=
Expand Down
291 changes: 291 additions & 0 deletions router-tests/singleflight_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -869,4 +869,295 @@ func TestSingleFlight(t *testing.T) {
require.Equal(t, numOfOperations, globalRequests)
})
})
t.Run("different variables with warm cache should not collide in inbound dedup", func(t *testing.T) {
t.Parallel()
testenv.Run(t, &testenv.Config{
Subgraphs: testenv.SubgraphsConfig{
GlobalDelay: time.Millisecond * 100,
},
RouterOptions: []core.Option{
core.WithEngineExecutionConfig(config.EngineExecutionConfiguration{
EnableSingleFlight: true,
ForceEnableSingleFlight: false,
EnableInboundRequestDeduplication: true,
MaxConcurrentResolvers: 0,
}),
},
}, func(t *testing.T, xEnv *testenv.Environment) {
variableValues := []int{1, 2, 3, 4, 5}

// Phase 1: Warm the variables normalization cache with sequential requests
for _, id := range variableValues {
res := xEnv.MakeGraphQLRequestOK(testenv.GraphQLRequest{
Query: `query($id: Int!) { employee(id: $id) { id details { forename } } }`,
Variables: []byte(fmt.Sprintf(`{"id": %d}`, id)),
})
v, err := astjson.Parse(res.Body)
require.NoError(t, err)
emp := v.Get("data", "employee")
require.NotNil(t, emp, "expected employee object in response for id=%d", id)
require.Equal(t, id, emp.GetInt("id"))
}

// Phase 2: Send concurrent requests with different variables (cache is now warm)
// Without the fix, all cache-hit requests get VariablesHash=0, causing them to
// collide in the inbound singleflight and return wrong data.
numPerVariable := 2
total := numPerVariable * len(variableValues)
var (
ready, done sync.WaitGroup
)
ready.Add(total)
done.Add(total)
trigger := make(chan struct{})

type result struct {
body string
requested int
}
results := make([]result, total)

idx := 0
for _, id := range variableValues {
for j := 0; j < numPerVariable; j++ {
go func(slot, varVal int) {
ready.Done()
defer done.Done()
<-trigger
res := xEnv.MakeGraphQLRequestOK(testenv.GraphQLRequest{
Query: `query($id: Int!) { employee(id: $id) { id details { forename } } }`,
Variables: []byte(fmt.Sprintf(`{"id": %d}`, varVal)),
})
results[slot] = result{body: res.Body, requested: varVal}
}(idx, id)
idx++
Comment thread
jensneuse marked this conversation as resolved.
Outdated
}
}
ready.Wait()
close(trigger)
done.Wait()

// Verify each response matches its requested variable (no cross-contamination)
for _, r := range results {
v, err := astjson.Parse(r.body)
require.NoError(t, err)
emp := v.Get("data", "employee")
require.NotNil(t, emp, "expected employee object in response for id=%d", r.requested)
actualID := emp.GetInt("id")
require.Equal(t, r.requested, actualID,
"response for variable id=%d returned employee id=%d (cross-contamination)", r.requested, actualID)
}
})
})
t.Run("response header set rule with singleflight followers", func(t *testing.T) {
t.Parallel()
testenv.Run(t, &testenv.Config{
Subgraphs: testenv.SubgraphsConfig{
GlobalDelay: time.Millisecond * 100,
},
RouterOptions: []core.Option{
core.WithEngineExecutionConfig(config.EngineExecutionConfiguration{
EnableSingleFlight: true,
MaxConcurrentResolvers: 0,
}),
core.WithHeaderRules(config.HeaderRules{
All: &config.GlobalHeaderRule{
Response: []*config.ResponseHeaderRule{
{
Operation: config.HeaderRuleOperationSet,
Name: "X-Custom-Header",
Value: "test-value",
},
},
},
}),
},
}, func(t *testing.T, xEnv *testenv.Environment) {
responses := runConcurrentSingleflightRequests(t, xEnv, `{ employee(id: 1) { id } }`, 5)
for i, res := range responses {
require.Equal(t, `{"data":{"employee":{"id":1}}}`, res.Body)
require.Equal(t, "test-value", res.Response.Header.Get("X-Custom-Header"),
"response %d missing X-Custom-Header", i)
}
})
})
t.Run("cache control propagation with singleflight followers", func(t *testing.T) {
t.Parallel()
testenv.Run(t, &testenv.Config{
CacheControlPolicy: config.CacheControlPolicy{
Enabled: true,
Value: "max-age=300",
},
Subgraphs: testenv.SubgraphsConfig{
GlobalDelay: time.Millisecond * 100,
Employees: testenv.SubgraphConfig{
Middleware: func(handler http.Handler) http.Handler {
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
w.Header().Set("Cache-Control", "max-age=120")
handler.ServeHTTP(w, r)
})
},
},
},
RouterOptions: []core.Option{
core.WithEngineExecutionConfig(config.EngineExecutionConfiguration{
EnableSingleFlight: true,
MaxConcurrentResolvers: 0,
}),
},
}, func(t *testing.T, xEnv *testenv.Environment) {
// Verify single request works
res := xEnv.MakeGraphQLRequestOK(testenv.GraphQLRequest{
Query: `{ employee(id: 1) { id } }`,
})
require.Equal(t, "max-age=120", res.Response.Header.Get("Cache-Control"), "single request should have Cache-Control")

responses := runConcurrentSingleflightRequests(t, xEnv, `{ employee(id: 1) { id } }`, 5)
for i, res := range responses {
require.Equal(t, "max-age=120", res.Response.Header.Get("Cache-Control"),
"response %d has wrong Cache-Control header", i)
}
})
})
t.Run("multiple response set rules with singleflight followers", func(t *testing.T) {
t.Parallel()
testenv.Run(t, &testenv.Config{
Subgraphs: testenv.SubgraphsConfig{
GlobalDelay: time.Millisecond * 100,
},
RouterOptions: []core.Option{
core.WithEngineExecutionConfig(config.EngineExecutionConfiguration{
EnableSingleFlight: true,
MaxConcurrentResolvers: 0,
}),
core.WithHeaderRules(config.HeaderRules{
All: &config.GlobalHeaderRule{
Response: []*config.ResponseHeaderRule{
{
Operation: config.HeaderRuleOperationSet,
Name: "X-Header-A",
Value: "value-a",
},
{
Operation: config.HeaderRuleOperationSet,
Name: "X-Header-B",
Value: "value-b",
},
},
},
}),
Comment thread
jensneuse marked this conversation as resolved.
},
}, func(t *testing.T, xEnv *testenv.Environment) {
// Verify single request works
res := xEnv.MakeGraphQLRequestOK(testenv.GraphQLRequest{
Query: `{ employee(id: 1) { id } }`,
})
require.Equal(t, "value-a", res.Response.Header.Get("X-Header-A"), "single request should have X-Header-A")
require.Equal(t, "value-b", res.Response.Header.Get("X-Header-B"), "single request should have X-Header-B")

responses := runConcurrentSingleflightRequests(t, xEnv, `{ employee(id: 1) { id } }`, 5)
for i, res := range responses {
require.Equal(t, "value-a", res.Response.Header.Get("X-Header-A"),
"response %d missing X-Header-A", i)
require.Equal(t, "value-b", res.Response.Header.Get("X-Header-B"),
"response %d missing X-Header-B", i)
}
})
})
t.Run("multi-subgraph response header propagation with singleflight", func(t *testing.T) {
t.Parallel()
testenv.Run(t, &testenv.Config{
Subgraphs: testenv.SubgraphsConfig{
GlobalDelay: time.Millisecond * 100,
},
RouterOptions: []core.Option{
core.WithEngineExecutionConfig(config.EngineExecutionConfiguration{
EnableSingleFlight: true,
MaxConcurrentResolvers: 0,
}),
core.WithHeaderRules(config.HeaderRules{
All: &config.GlobalHeaderRule{
Response: []*config.ResponseHeaderRule{
{
Operation: config.HeaderRuleOperationSet,
Name: "X-Custom-Header",
Value: "multi-subgraph-value",
},
},
},
}),
},
}, func(t *testing.T, xEnv *testenv.Environment) {
// This query fans out to multiple subgraphs: employees, family, availability, mood
query := `{ employee(id: 1) { id details { forename surname } isAvailable currentMood } }`

responses := runConcurrentSingleflightRequests(t, xEnv, query, 5)
for i, res := range responses {
require.Contains(t, res.Body, `"employee"`)
require.Equal(t, "multi-subgraph-value", res.Response.Header.Get("X-Custom-Header"),
"response %d missing X-Custom-Header from multi-subgraph query", i)
}
})
})
t.Run("subgraph-specific response header rule with singleflight", func(t *testing.T) {
t.Parallel()
testenv.Run(t, &testenv.Config{
Subgraphs: testenv.SubgraphsConfig{
GlobalDelay: time.Millisecond * 100,
},
RouterOptions: []core.Option{
core.WithEngineExecutionConfig(config.EngineExecutionConfiguration{
EnableSingleFlight: true,
MaxConcurrentResolvers: 0,
}),
core.WithHeaderRules(config.HeaderRules{
Subgraphs: map[string]*config.GlobalHeaderRule{
"employees": {
Response: []*config.ResponseHeaderRule{
{
Operation: config.HeaderRuleOperationSet,
Name: "X-Subgraph-Header",
Value: "employees-value",
},
},
},
},
}),
},
}, func(t *testing.T, xEnv *testenv.Environment) {
// This query hits the employees subgraph, so the subgraph-specific rule should apply
responses := runConcurrentSingleflightRequests(t, xEnv, `{ employees { id } }`, 5)
for i, res := range responses {
require.Equal(t, `{"data":{"employees":[{"id":1},{"id":2},{"id":3},{"id":4},{"id":5},{"id":7},{"id":8},{"id":10},{"id":11},{"id":12}]}}`, res.Body)
require.Equal(t, "employees-value", res.Response.Header.Get("X-Subgraph-Header"),
"response %d missing subgraph-specific X-Subgraph-Header", i)
}
})
})
}

// runConcurrentSingleflightRequests sends n identical GraphQL requests concurrently,
// using a barrier to maximize overlap and trigger singleflight deduplication.
func runConcurrentSingleflightRequests(t *testing.T, xEnv *testenv.Environment, query string, n int) []*testenv.TestResponse {
t.Helper()
var ready, done sync.WaitGroup
ready.Add(n)
done.Add(n)
trigger := make(chan struct{})
responses := make([]*testenv.TestResponse, n)
for i := 0; i < n; i++ {
go func(idx int) {
ready.Done()
defer done.Done()
<-trigger
resp, err := xEnv.MakeGraphQLRequest(testenv.GraphQLRequest{Query: query})
require.NoError(t, err)
require.Equal(t, http.StatusOK, resp.Response.StatusCode)
responses[idx] = resp
}(i)
Comment thread
jensneuse marked this conversation as resolved.
Outdated
}
ready.Wait()
close(trigger)
done.Wait()
return responses
}
9 changes: 9 additions & 0 deletions router/core/engine_loader_hooks.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ type engineLoaderHooks struct {
metricAttributeExpressions *attributeExpressions

storeSubgraphResponseBody bool
headerPropagation *HeaderPropagation
}

type engineLoaderHooksRequestContext struct {
Expand All @@ -61,6 +62,7 @@ func NewEngineRequestHooks(
telemetryAttributes *attributeExpressions,
metricAttributes *attributeExpressions,
storeSubgraphResponseBody bool,
headerPropagation *HeaderPropagation,
) resolve.LoaderHooks {
var tracer trace.Tracer
if tracerProvider != nil {
Expand All @@ -83,6 +85,7 @@ func NewEngineRequestHooks(
metricAttributeExpressions: metricAttributes,
accessLogger: logger,
storeSubgraphResponseBody: storeSubgraphResponseBody,
headerPropagation: headerPropagation,
}
}

Expand Down Expand Up @@ -142,6 +145,12 @@ func (f *engineLoaderHooks) OnFinished(ctx context.Context, ds resolve.DataSourc
responseInfo = &resolve.ResponseInfo{}
}

// Apply response header propagation rules for this subgraph fetch.
// This handles both singleflight leaders and followers uniformly.
if f.headerPropagation != nil && responseInfo.ResponseHeaders != nil {
f.headerPropagation.ApplyResponseHeaderRules(ctx, responseInfo.ResponseHeaders, ds.Name, responseInfo.StatusCode, responseInfo.Request)
}

commonAttrs := []attribute.KeyValue{
semconv.HTTPStatusCode(responseInfo.StatusCode),
rotel.WgSubgraphID.String(ds.ID),
Expand Down
1 change: 1 addition & 0 deletions router/core/graph_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -1417,6 +1417,7 @@ func (s *graphServer) buildGraphMux(
telemetryAttExpressions,
metricAttExpressions,
exprManager.VisitorManager.IsSubgraphResponseBodyUsedInExpressions(),
s.headerPropagation,
)

handlerOpts := HandlerOptions{
Expand Down
22 changes: 22 additions & 0 deletions router/core/graphql_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -184,6 +184,28 @@ func (h *GraphQLHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {

if h.enableResponseHeaderPropagation {
resolveCtx = WithResponseHeaderPropagation(resolveCtx)
resolve.SetDeduplicationCallbacks(resolveCtx,
func(ctx context.Context) http.Header {
propagation := getResponseHeaderPropagation(ctx)
if propagation == nil {
return nil
}
propagation.m.Lock()
defer propagation.m.Unlock()
return propagation.header.Clone()
},
func(ctx context.Context, headers http.Header) {
propagation := getResponseHeaderPropagation(ctx)
if propagation == nil {
return
}
propagation.m.Lock()
defer propagation.m.Unlock()
for k, v := range headers {
propagation.header[k] = v
}
},
)
}

defer propagateSubgraphErrors(resolveCtx)
Expand Down
Loading
Loading