Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
70 changes: 70 additions & 0 deletions execution/federationtesting/skipped_fetch_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
package federationtesting

import (
"io"
"net/http"
"net/http/httptest"
"sync/atomic"
"testing"

"github.com/jensneuse/abstractlogger"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

"github.com/wundergraph/graphql-go-tools/execution/engine"
"github.com/wundergraph/graphql-go-tools/execution/graphql"
"github.com/wundergraph/graphql-go-tools/v2/pkg/engine/resolve"
)

func TestSkippedFetchOnNullParent(t *testing.T) {
// Users subgraph: returns null for the "user" field.
usersServer := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
w.Header().Set("Content-Type", "application/json")
_, _ = io.ReadAll(r.Body)
_, _ = w.Write([]byte(`{"data":{"user":null}}`))
}))
t.Cleanup(usersServer.Close)

// Reviews subgraph: tracks all requests. Should never be called at query time
// because the user is null and the entity fetch must be skipped.
var reviewsCalls atomic.Int64
reviewsServer := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
reviewsCalls.Add(1)
w.Header().Set("Content-Type", "application/json")
_, _ = io.ReadAll(r.Body)
_, _ = w.Write([]byte(`{"data":{"_entities":[]}}`))
}))
t.Cleanup(reviewsServer.Close)

const usersSDL = `type Query { user(id: ID!): User } type User @key(fields: "id") { id: ID! name: String! }`
const reviewsSDL = `type User @key(fields: "id") { id: ID! @external reviews: [Review] } type Review { body: String! }`

ctx := t.Context()
factory := engine.NewFederationEngineConfigFactory(ctx, []engine.SubgraphConfiguration{
{Name: "users", URL: usersServer.URL, SDL: usersSDL},
{Name: "reviews", URL: reviewsServer.URL, SDL: reviewsSDL},
})

engineConfig, err := factory.BuildEngineConfiguration()
require.NoError(t, err)

eng, err := engine.NewExecutionEngine(ctx, abstractlogger.NoopLogger, engineConfig, resolve.ResolverOptions{
MaxConcurrency: 1024,
})
require.NoError(t, err)

gqlRequest := &graphql.Request{
Query: `{ user(id: "1") { id name reviews { body } } }`,
}

resultWriter := graphql.NewEngineResultWriter()
err = eng.Execute(ctx, gqlRequest, &resultWriter)
require.NoError(t, err)

// The user is null, so the response should reflect that without panic.
assert.Equal(t, `{"data":{"user":null}}`, resultWriter.String())

// The reviews subgraph must NOT have been called — the entity fetch was skipped
// because the parent user is null.
assert.Equal(t, int64(0), reviewsCalls.Load(), "reviews subgraph should not be called when parent user is null")
}
40 changes: 19 additions & 21 deletions v2/pkg/engine/resolve/loader.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,9 +56,12 @@ func stringValueOnArena(a arena.Arena, s string) *astjson.Value {
}

type LoaderHooks interface {
// OnLoad is called before the fetch is executed
// OnLoad is called before a fetch is executed.
// The returned context is passed to OnFinished after the fetch completes.
// OnLoad is not called when the fetch is skipped (e.g. null parent data, auth rejection).
OnLoad(ctx context.Context, ds DataSourceInfo) context.Context
// OnFinished is called after the fetch has been executed and the response has been processed and merged
// OnFinished is called after a fetch has been executed and the response has been processed and merged.
// It is only called when OnLoad was called, i.e. when the fetch was not skipped.
OnFinished(ctx context.Context, ds DataSourceInfo, info *ResponseInfo)
}

Expand Down Expand Up @@ -139,8 +142,9 @@ type result struct {
rateLimitRejected bool
rateLimitRejectedReason string

// loaderHookContext used to share data between the OnLoad and OnFinished hooks
// It should be valid even when OnLoad isn't called
// loaderHookContext is set by OnLoad during fetch execution.
// It is nil when the fetch was skipped (e.g. null parent data, auth rejection),
// in which case OnFinished must not be called.
loaderHookContext context.Context

httpResponseContext *httpclient.ResponseContext
Expand Down Expand Up @@ -270,20 +274,14 @@ func (l *Loader) resolveParallel(nodes []*FetchTreeNode) error {
if results[i].nestedMergeItems != nil {
for j := range results[i].nestedMergeItems {
err = l.mergeResult(nodes[i].Item, results[i].nestedMergeItems[j], itemsItems[i][j:j+1])
if l.ctx.LoaderHooks != nil && results[i].nestedMergeItems[j].loaderHookContext != nil {
l.ctx.LoaderHooks.OnFinished(results[i].nestedMergeItems[j].loaderHookContext,
results[i].nestedMergeItems[j].ds,
newResponseInfo(results[i].nestedMergeItems[j], l.ctx.subgraphErrors))
}
l.callOnFinished(results[i].nestedMergeItems[j])
if err != nil {
return errors.WithStack(err)
}
}
} else {
err = l.mergeResult(nodes[i].Item, results[i], itemsItems[i])
if l.ctx.LoaderHooks != nil {
l.ctx.LoaderHooks.OnFinished(results[i].loaderHookContext, results[i].ds, newResponseInfo(results[i], l.ctx.subgraphErrors))
}
l.callOnFinished(results[i])
if err != nil {
return errors.WithStack(err)
}
Expand Down Expand Up @@ -316,9 +314,7 @@ func (l *Loader) resolveSingle(item *FetchItem) error {
return err
}
err = l.mergeResult(item, res, items)
if l.ctx.LoaderHooks != nil {
l.ctx.LoaderHooks.OnFinished(res.loaderHookContext, res.ds, newResponseInfo(res, l.ctx.subgraphErrors))
}
l.callOnFinished(res)
return err
case *BatchEntityFetch:
res := &result{}
Expand All @@ -328,9 +324,7 @@ func (l *Loader) resolveSingle(item *FetchItem) error {
return errors.WithStack(err)
}
err = l.mergeResult(item, res, items)
if l.ctx.LoaderHooks != nil {
l.ctx.LoaderHooks.OnFinished(res.loaderHookContext, res.ds, newResponseInfo(res, l.ctx.subgraphErrors))
}
l.callOnFinished(res)
return err
case *EntityFetch:
res := &result{}
Expand All @@ -339,15 +333,19 @@ func (l *Loader) resolveSingle(item *FetchItem) error {
return errors.WithStack(err)
}
err = l.mergeResult(item, res, items)
if l.ctx.LoaderHooks != nil {
l.ctx.LoaderHooks.OnFinished(res.loaderHookContext, res.ds, newResponseInfo(res, l.ctx.subgraphErrors))
}
l.callOnFinished(res)
return err
default:
return nil
}
}

func (l *Loader) callOnFinished(res *result) {
if l.ctx.LoaderHooks != nil && res.loaderHookContext != nil {
l.ctx.LoaderHooks.OnFinished(res.loaderHookContext, res.ds, newResponseInfo(res, l.ctx.subgraphErrors))
}
}

func (l *Loader) selectItemsForPath(path []FetchItemPathElement) []*astjson.Value {
// Use arena allocation for the initial items slice
items := arena.AllocateSlice[*astjson.Value](l.jsonArena, 1, 1)
Expand Down
Loading