Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions v2/pkg/engine/datasource/httpclient/nethttpclient.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,13 @@ func InjectResponseContext(ctx context.Context) (context.Context, *ResponseConte
return context.WithValue(ctx, responseContextKey{}, value), value
}

// GetResponseContext retrieves the ResponseContext previously injected into ctx
// via InjectResponseContext. Returns nil if no ResponseContext is present.
func GetResponseContext(ctx context.Context) *ResponseContext {
value, _ := ctx.Value(responseContextKey{}).(*ResponseContext)
return value
}

func setRequest(ctx context.Context, request *http.Request) {
if value, ok := ctx.Value(responseContextKey{}).(*ResponseContext); ok {
value.Request = request
Expand Down
39 changes: 39 additions & 0 deletions v2/pkg/engine/resolve/context.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,43 @@ type Context struct {
subgraphErrors map[string]error

SubgraphHeadersBuilder SubgraphHeadersBuilder

// GetDeduplicationData is called after the leader of an inbound singleflight request
// finishes resolving. It extracts data from the leader's context (e.g. accumulated
// response headers) that should be shared with all follower requests.
// The returned value is stored on the InflightRequest and passed to each follower's
// SetDeduplicationData callback before the follower writes its response.
// Use SetDeduplicationCallbacks to set both callbacks with type safety.
GetDeduplicationData func(ctx context.Context) any
// SetDeduplicationData is called for each follower of an inbound singleflight request,
// before the response body is written to the client. The data argument is the value
// returned by the leader's GetDeduplicationData call.
// Typical use: copy response header propagation state from the leader into the
// follower's context so that the response writer can set the correct HTTP headers.
// Use SetDeduplicationCallbacks to set both callbacks with type safety.
SetDeduplicationData func(ctx context.Context, data any)
}

// SetDeduplicationCallbacks is a generic helper that configures both GetDeduplicationData
// and SetDeduplicationData on a Context with compile-time type safety.
// The resolve package stores the data as "any" internally, but callers get typed callbacks:
//
// resolve.SetDeduplicationCallbacks(ctx,
// func(ctx context.Context) *MyHeaders { return extractHeaders(ctx) },
// func(ctx context.Context, h *MyHeaders) { applyHeaders(ctx, h) },
// )
//
// The get and set callbacks must use the same concrete type T. If the value returned by
// get cannot be asserted to T when passed to set, the set callback will be skipped.
func SetDeduplicationCallbacks[T any](c *Context, get func(ctx context.Context) T, set func(ctx context.Context, data T)) {
c.GetDeduplicationData = func(ctx context.Context) any {
return get(ctx)
}
c.SetDeduplicationData = func(ctx context.Context, data any) {
if typed, ok := data.(T); ok {
set(ctx, typed)
}
}
}

// SubgraphHeadersBuilder allows the user of the engine to "define" the headers for a subgraph request
Expand Down Expand Up @@ -276,6 +313,8 @@ func (c *Context) Free() {
c.subgraphErrors = nil
c.authorizer = nil
c.LoaderHooks = nil
c.GetDeduplicationData = nil
c.SetDeduplicationData = nil
}

type traceStartKey struct{}
Expand Down
12 changes: 8 additions & 4 deletions v2/pkg/engine/resolve/inbound_request_singleflight.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,8 +38,13 @@ func NewRequestSingleFlight(shardCount int) *InboundRequestSingleFlight {
type InflightRequest struct {
Done chan struct{}
Data []byte
Err error
ID uint64
// SharedData carries opaque state from the leader to followers (e.g. accumulated
// response headers). Set by the leader via Context.GetDeduplicationData, read by
// followers via Context.SetDeduplicationData. Typed as "any" because the resolve
// package is data-agnostic — the caller decides the concrete type.
SharedData any
Err error
ID uint64

HasFollowers bool
Mu sync.Mutex
Expand Down Expand Up @@ -95,8 +100,7 @@ func (r *InboundRequestSingleFlight) GetOrCreate(ctx *Context, response *GraphQL
}
return request, nil
case <-ctx.ctx.Done():
request.Err = ctx.ctx.Err()
return nil, request.Err
return nil, ctx.ctx.Err()
}
}

Expand Down
112 changes: 112 additions & 0 deletions v2/pkg/engine/resolve/inbound_request_singleflight_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,112 @@
package resolve

import (
"context"
"sync"
"testing"

"github.com/wundergraph/graphql-go-tools/v2/pkg/ast"
)

// TestInboundSingleFlight_ConcurrentFollowerTimeout exercises the scenario where
// multiple followers time out concurrently. Before the fix, each follower wrote
// its context error to the shared request.Err field without synchronization,
// causing a data race. After the fix, followers return ctx.Err() directly
// without mutating shared state. Run with -race to verify.
func TestInboundSingleFlight_ConcurrentFollowerTimeout(t *testing.T) {
sf := NewRequestSingleFlight(1)
response := &GraphQLResponse{
Info: &GraphQLResponseInfo{
OperationType: ast.OperationTypeQuery,
},
}

// Leader creates the inflight request
leaderCtx := NewContext(context.Background())
leaderCtx.Request.ID = 1
inflight, err := sf.GetOrCreate(leaderCtx, response)
if err != nil {
t.Fatalf("leader GetOrCreate: %v", err)
}
if inflight == nil {
t.Fatal("expected inflight request from leader")
}

const numFollowers = 10
var wg sync.WaitGroup
wg.Add(numFollowers)

for i := 0; i < numFollowers; i++ {
go func() {
defer wg.Done()
ctx, cancel := context.WithCancel(context.Background())
followerCtx := NewContext(ctx)
followerCtx.Request.ID = 1

// Cancel immediately so the follower's context is done
cancel()

_, followerErr := sf.GetOrCreate(followerCtx, response)
if followerErr == nil {
t.Error("expected error from timed-out follower")
}
}()
}

wg.Wait()

// Clean up: finish the leader request
sf.FinishOk(inflight, []byte("ok"))
}

func TestInboundSingleFlight_FollowerReceivesLeaderError(t *testing.T) {
sf := NewRequestSingleFlight(1)
response := &GraphQLResponse{
Info: &GraphQLResponseInfo{
OperationType: ast.OperationTypeQuery,
},
}

leaderCtx := NewContext(context.Background())
leaderCtx.Request.ID = 2
inflight, err := sf.GetOrCreate(leaderCtx, response)
if err != nil {
t.Fatalf("leader GetOrCreate: %v", err)
}

// The follower calls GetOrCreate which blocks on inflight.Done.
// We wait for HasFollowers to be set before calling FinishErr.
followerReady := make(chan struct{})
var wg sync.WaitGroup
wg.Add(1)

go func() {
defer wg.Done()
followerCtx := NewContext(context.Background())
followerCtx.Request.ID = 2

// Signal that we're about to enter GetOrCreate. HasFollowers will be
// set inside GetOrCreate before the select blocks, so closing
// followerReady here is slightly early, but we poll HasFollowers below.
close(followerReady)

_, followerErr := sf.GetOrCreate(followerCtx, response)
if followerErr == nil {
t.Error("expected error from follower after leader FinishErr")
}
}()

<-followerReady
// Spin until the follower has actually registered (set HasFollowers)
for {
inflight.Mu.Lock()
ready := inflight.HasFollowers
inflight.Mu.Unlock()
if ready {
break
}
}

sf.FinishErr(inflight, context.DeadlineExceeded)
wg.Wait()
}
23 changes: 23 additions & 0 deletions v2/pkg/engine/resolve/loader.go
Original file line number Diff line number Diff line change
Expand Up @@ -1717,6 +1717,21 @@ func (l *Loader) loadByContext(ctx context.Context, source DataSource, fetchItem
}

res.out = item.response
// Populate the ResponseContext that was injected by executeSourceLoad.
// This is the same pointer that executeSourceLoad reads when it assigns
// res.statusCode and res.httpResponseContext, so the follower's result
// fields will be set correctly even though no HTTP call was made.
if rc := httpclient.GetResponseContext(ctx); rc != nil {
rc.StatusCode = item.statusCode
if item.responseHeaders != nil {
// Minimal synthetic http.Response carrying only status and headers.
// Clone headers so each concurrent follower gets an independent copy.
rc.Response = &http.Response{
StatusCode: item.statusCode,
Header: item.responseHeaders.Clone(),
}
}
}
return nil
}

Expand All @@ -1733,6 +1748,14 @@ func (l *Loader) loadByContext(ctx context.Context, source DataSource, fetchItem
}

item.response = res.out
// Capture the leader's HTTP response metadata so followers can reuse it.
// The ResponseContext was populated by the HTTP client during loadByContextDirect.
if rc := httpclient.GetResponseContext(ctx); rc != nil {
item.statusCode = rc.StatusCode
if rc.Response != nil && rc.Response.Header != nil {
item.responseHeaders = rc.Response.Header.Clone()
}
}
return nil
}

Expand Down
13 changes: 13 additions & 0 deletions v2/pkg/engine/resolve/resolve.go
Original file line number Diff line number Diff line change
Expand Up @@ -362,6 +362,11 @@ func (r *Resolver) ArenaResolveGraphQLResponse(ctx *Context, response *GraphQLRe

if inflight != nil && inflight.Data != nil { // follower
resp.ResolveDeduplicated = true
// Apply the leader's shared state (e.g. response headers) to this follower's context
// before writing the response, so the response writer can propagate headers correctly.
if ctx.SetDeduplicationData != nil && inflight.SharedData != nil {
ctx.SetDeduplicationData(ctx.ctx, inflight.SharedData)
}
_, err = writer.Write(inflight.Data)
return resp, err
}
Expand Down Expand Up @@ -412,6 +417,14 @@ func (r *Resolver) ArenaResolveGraphQLResponse(ctx *Context, response *GraphQLRe
// as such, it can take some time
// which is why we split the arenas and released the first one
_, err = writer.Write(buf.Bytes())
// Extract data from the leader's context to share with singleflight followers.
// This runs after the leader has fully resolved and written its response, so all
// subgraph response headers have been accumulated on the leader's context.
// SharedData MUST be set BEFORE FinishOk, which closes the Done channel and
// unblocks followers. Otherwise followers could read SharedData before it is set.
if inflight != nil && ctx.GetDeduplicationData != nil {
inflight.SharedData = ctx.GetDeduplicationData(ctx.ctx)
}
r.inboundRequestSingleFlight.FinishOk(inflight, buf.Bytes())
// all data is written to the client
// we're safe to release our buffer
Expand Down
Loading