diff --git a/execution/engine/federation_caching_test.go b/execution/engine/federation_caching_test.go index ae95a7ac01..57ea5572f5 100644 --- a/execution/engine/federation_caching_test.go +++ b/execution/engine/federation_caching_test.go @@ -4654,6 +4654,130 @@ func TestL1CacheRootFieldEntityListPopulation(t *testing.T) { }) } +func TestL1CacheRootFieldNonEntityWithNestedEntities(t *testing.T) { + // This test verifies L1 cache behavior when a root field returns a NON-entity type + // (Review) that contains nested entities (User via authorWithoutProvides). + // + // Key difference from TestL1CacheRootFieldEntityListPopulation: + // - That test starts with topProducts -> [Product] where Product IS an entity (@key(fields: "upc")) + // - This test starts with topReviews -> [Review] where Review is NOT an entity (no @key) + // - Both prove L1 entity caching works for nested User entities + // + // Query flow: + // 1. topReviews -> reviews subgraph (root query, returns [Review] — NOT an entity) + // 2. authorWithoutProvides -> accounts subgraph (entity fetch for Users, stored in L1) + // 3. sameUserReviewers -> reviews subgraph (after username resolved via @requires) + // 4. Entity resolution for sameUserReviewers -> accounts subgraph + // - All Users are 100% L1 HITs (already fetched in step 2) + // - THE ENTIRE ACCOUNTS CALL IS SKIPPED! + + query := `query { + topReviews { + body + authorWithoutProvides { + id + username + sameUserReviewers { + id + username + } + } + } + }` + + expectedResponse := `{"data":{"topReviews":[{"body":"A highly effective form of birth control.","authorWithoutProvides":{"id":"1234","username":"Me","sameUserReviewers":[{"id":"1234","username":"Me"}]}},{"body":"Fedoras are one of the most fashionable hats around and can look great with a variety of outfits.","authorWithoutProvides":{"id":"1234","username":"Me","sameUserReviewers":[{"id":"1234","username":"Me"}]}},{"body":"This is the last straw. Hat you will wear. 11/10","authorWithoutProvides":{"id":"7777","username":"User 7777","sameUserReviewers":[{"id":"7777","username":"User 7777"}]}},{"body":"Perfect summer hat.","authorWithoutProvides":{"id":"5678","username":"User 5678","sameUserReviewers":[{"id":"5678","username":"User 5678"}]}},{"body":"A bit too fancy for my taste.","authorWithoutProvides":{"id":"8888","username":"User 8888","sameUserReviewers":[{"id":"8888","username":"User 8888"}]}}]}}` + + t.Run("L1 enabled - sameUserReviewers fetch skipped via L1 cache", func(t *testing.T) { + tracker := newSubgraphCallTracker(http.DefaultTransport) + trackingClient := &http.Client{Transport: tracker} + + cachingOpts := resolve.CachingOptions{ + EnableL1Cache: true, + EnableL2Cache: false, + } + + setup := federationtesting.NewFederationSetup(addCachingGateway( + withCachingEnableART(false), + withHTTPClient(trackingClient), + withCachingOptionsFunc(cachingOpts), + )) + t.Cleanup(setup.Close) + + gqlClient := NewGraphqlClient(http.DefaultClient) + ctx, cancel := context.WithCancel(context.Background()) + t.Cleanup(cancel) + + // Extract hostnames + reviewsURLParsed, _ := url.Parse(setup.ReviewsUpstreamServer.URL) + accountsURLParsed, _ := url.Parse(setup.AccountsUpstreamServer.URL) + reviewsHost := reviewsURLParsed.Host + accountsHost := accountsURLParsed.Host + + tracker.Reset() + out, _ := gqlClient.QueryStringWithHeaders(ctx, setup.GatewayServer.URL, query, nil, t) + + assert.Equal(t, expectedResponse, string(out)) + + // Query flow with L1 enabled: + // 1. reviews subgraph: topReviews root query (Review is NOT an entity) + // 2. accounts subgraph: User entity fetch for authorWithoutProvides (Users stored in L1) + // 3. reviews subgraph: sameUserReviewers (returns [User] references) + // 4. sameUserReviewers entity resolution: all Users are L1 HITs → accounts call SKIPPED! + reviewsCalls := tracker.GetCount(reviewsHost) + accountsCalls := tracker.GetCount(accountsHost) + + assert.Equal(t, 2, reviewsCalls, "Should call reviews subgraph twice (topReviews + sameUserReviewers)") + // KEY ASSERTION: Only 1 accounts call! sameUserReviewers entity resolution skipped via L1. + assert.Equal(t, 1, accountsCalls, + "With L1 enabled: only 1 accounts call (sameUserReviewers entity fetch skipped via L1)") + }) + + t.Run("L1 disabled - more accounts calls without L1 optimization", func(t *testing.T) { + tracker := newSubgraphCallTracker(http.DefaultTransport) + trackingClient := &http.Client{Transport: tracker} + + cachingOpts := resolve.CachingOptions{ + EnableL1Cache: false, + EnableL2Cache: false, + } + + setup := federationtesting.NewFederationSetup(addCachingGateway( + withCachingEnableART(false), + withHTTPClient(trackingClient), + withCachingOptionsFunc(cachingOpts), + )) + t.Cleanup(setup.Close) + + gqlClient := NewGraphqlClient(http.DefaultClient) + ctx, cancel := context.WithCancel(context.Background()) + t.Cleanup(cancel) + + // Extract hostnames + reviewsURLParsed, _ := url.Parse(setup.ReviewsUpstreamServer.URL) + accountsURLParsed, _ := url.Parse(setup.AccountsUpstreamServer.URL) + reviewsHost := reviewsURLParsed.Host + accountsHost := accountsURLParsed.Host + + tracker.Reset() + out, _ := gqlClient.QueryStringWithHeaders(ctx, setup.GatewayServer.URL, query, nil, t) + + assert.Equal(t, expectedResponse, string(out)) + + // Query flow with L1 disabled: + // 1. reviews subgraph: topReviews root query + // 2. accounts subgraph: User entity fetch for authorWithoutProvides + // 3. reviews subgraph: sameUserReviewers + // 4. accounts subgraph: User entity fetch for sameUserReviewers (no L1 → must fetch again!) + reviewsCalls := tracker.GetCount(reviewsHost) + accountsCalls := tracker.GetCount(accountsHost) + + assert.Equal(t, 2, reviewsCalls, "Should call reviews subgraph twice") + // KEY ASSERTION: 2 accounts calls without L1 optimization + assert.Equal(t, 2, accountsCalls, + "With L1 disabled: 2 accounts calls (sameUserReviewers requires separate fetch)") + }) +} + // ============================================================================= // CACHE ERROR HANDLING TESTS // ============================================================================= diff --git a/execution/federationtesting/reviews/graph/generated/generated.go b/execution/federationtesting/reviews/graph/generated/generated.go index 032b4e8ae2..790a860169 100644 --- a/execution/federationtesting/reviews/graph/generated/generated.go +++ b/execution/federationtesting/reviews/graph/generated/generated.go @@ -83,6 +83,7 @@ type ComplexityRoot struct { Cat func(childComplexity int) int Me func(childComplexity int) int ReviewWithError func(childComplexity int) int + TopReviews func(childComplexity int) int __resolve__service func(childComplexity int) int __resolve_entities func(childComplexity int, representations []map[string]any) int } @@ -142,6 +143,7 @@ type QueryResolver interface { Me(ctx context.Context) (*model.User, error) Cat(ctx context.Context) (*model.Cat, error) ReviewWithError(ctx context.Context) (*model.Review, error) + TopReviews(ctx context.Context) ([]*model.Review, error) } type ReviewResolver interface { AuthorWithoutProvides(ctx context.Context, obj *model.Review) (*model.User, error) @@ -281,6 +283,13 @@ func (e *executableSchema) Complexity(ctx context.Context, typeName, field strin return e.complexity.Query.ReviewWithError(childComplexity), true + case "Query.topReviews": + if e.complexity.Query.TopReviews == nil { + break + } + + return e.complexity.Query.TopReviews(childComplexity), true + case "Query._service": if e.complexity.Query.__resolve__service == nil { break @@ -564,6 +573,10 @@ var sources = []*ast.Source{ # reviewWithError returns a review whose author (error-user) triggers an error in accounts subgraph. # Used for testing cache error handling - caches should NOT be populated on errors. reviewWithError: Review + # topReviews returns all reviews. Review is NOT an entity (no @key), + # but contains entities (author: User, product: Product). + # Used for testing L1 cache with non-entity root fields containing nested entities. + topReviews: [Review] } type Cat { @@ -1661,6 +1674,61 @@ func (ec *executionContext) fieldContext_Query_reviewWithError(_ context.Context return fc, nil } +func (ec *executionContext) _Query_topReviews(ctx context.Context, field graphql.CollectedField) (ret graphql.Marshaler) { + fc, err := ec.fieldContext_Query_topReviews(ctx, field) + if err != nil { + return graphql.Null + } + ctx = graphql.WithFieldContext(ctx, fc) + defer func() { + if r := recover(); r != nil { + ec.Error(ctx, ec.Recover(ctx, r)) + ret = graphql.Null + } + }() + resTmp, err := ec.ResolverMiddleware(ctx, func(rctx context.Context) (any, error) { + ctx = rctx // use context from middleware stack in children + return ec.resolvers.Query().TopReviews(rctx) + }) + if err != nil { + ec.Error(ctx, err) + return graphql.Null + } + if resTmp == nil { + return graphql.Null + } + res := resTmp.([]*model.Review) + fc.Result = res + return ec.marshalOReview2ᚕᚖgithubᚗcomᚋwundergraphᚋgraphqlᚑgoᚑtoolsᚋexecutionᚋfederationtestingᚋreviewsᚋgraphᚋmodelᚐReview(ctx, field.Selections, res) +} + +func (ec *executionContext) fieldContext_Query_topReviews(_ context.Context, field graphql.CollectedField) (fc *graphql.FieldContext, err error) { + fc = &graphql.FieldContext{ + Object: "Query", + Field: field, + IsMethod: true, + IsResolver: true, + Child: func(ctx context.Context, field graphql.CollectedField) (*graphql.FieldContext, error) { + switch field.Name { + case "body": + return ec.fieldContext_Review_body(ctx, field) + case "author": + return ec.fieldContext_Review_author(ctx, field) + case "authorWithoutProvides": + return ec.fieldContext_Review_authorWithoutProvides(ctx, field) + case "product": + return ec.fieldContext_Review_product(ctx, field) + case "attachments": + return ec.fieldContext_Review_attachments(ctx, field) + case "comment": + return ec.fieldContext_Review_comment(ctx, field) + } + return nil, fmt.Errorf("no field named %q was found under type Review", field.Name) + }, + } + return fc, nil +} + func (ec *executionContext) _Query__entities(ctx context.Context, field graphql.CollectedField) (ret graphql.Marshaler) { fc, err := ec.fieldContext_Query__entities(ctx, field) if err != nil { @@ -5408,6 +5476,25 @@ func (ec *executionContext) _Query(ctx context.Context, sel ast.SelectionSet) gr func(ctx context.Context) graphql.Marshaler { return innerFunc(ctx, out) }) } + out.Concurrently(i, func(ctx context.Context) graphql.Marshaler { return rrm(innerCtx) }) + case "topReviews": + field := field + + innerFunc := func(ctx context.Context, _ *graphql.FieldSet) (res graphql.Marshaler) { + defer func() { + if r := recover(); r != nil { + ec.Error(ctx, ec.Recover(ctx, r)) + } + }() + res = ec._Query_topReviews(ctx, field) + return res + } + + rrm := func(ctx context.Context) graphql.Marshaler { + return ec.OperationContext.RootResolverMiddleware(ctx, + func(ctx context.Context) graphql.Marshaler { return innerFunc(ctx, out) }) + } + out.Concurrently(i, func(ctx context.Context) graphql.Marshaler { return rrm(innerCtx) }) case "_entities": field := field diff --git a/execution/federationtesting/reviews/graph/schema.graphqls b/execution/federationtesting/reviews/graph/schema.graphqls index be74180b87..6530f5fabc 100644 --- a/execution/federationtesting/reviews/graph/schema.graphqls +++ b/execution/federationtesting/reviews/graph/schema.graphqls @@ -4,6 +4,10 @@ type Query { # reviewWithError returns a review whose author (error-user) triggers an error in accounts subgraph. # Used for testing cache error handling - caches should NOT be populated on errors. reviewWithError: Review + # topReviews returns all reviews. Review is NOT an entity (no @key), + # but contains entities (author: User, product: Product). + # Used for testing L1 cache with non-entity root fields containing nested entities. + topReviews: [Review] } type Cat { diff --git a/execution/federationtesting/reviews/graph/schema.resolvers.go b/execution/federationtesting/reviews/graph/schema.resolvers.go index 77c1718801..3ee63e3a73 100644 --- a/execution/federationtesting/reviews/graph/schema.resolvers.go +++ b/execution/federationtesting/reviews/graph/schema.resolvers.go @@ -66,6 +66,14 @@ func (r *queryResolver) ReviewWithError(ctx context.Context) (*model.Review, err return errorReview, nil } +// TopReviews is the resolver for the topReviews field. +// Returns all reviews. Review is NOT an entity (no @key), but contains +// entities (author: User, product: Product). Used for L1 cache testing +// with non-entity root fields containing nested entities. +func (r *queryResolver) TopReviews(ctx context.Context) ([]*model.Review, error) { + return r.reviews, nil +} + // AuthorWithoutProvides is the resolver for the authorWithoutProvides field. // Returns the same Author as the regular author field, but without @provides directive // in the schema. This forces the gateway to fetch username from accounts subgraph. diff --git a/execution/go.mod b/execution/go.mod index cb6ba61f57..7ee66eab77 100644 --- a/execution/go.mod +++ b/execution/go.mod @@ -14,7 +14,7 @@ require ( github.com/sebdah/goldie/v2 v2.7.1 github.com/stretchr/testify v1.11.1 github.com/vektah/gqlparser/v2 v2.5.30 - github.com/wundergraph/astjson v1.0.0 + github.com/wundergraph/astjson v1.1.0 github.com/wundergraph/cosmo/composition-go v0.0.0-20241020204711-78f240a77c99 github.com/wundergraph/cosmo/router v0.0.0-20251013094319-c611abf26b17 github.com/wundergraph/graphql-go-tools/v2 v2.0.0-rc.231 diff --git a/execution/go.sum b/execution/go.sum index 5ccbc08129..33cc7ad592 100644 --- a/execution/go.sum +++ b/execution/go.sum @@ -155,8 +155,8 @@ github.com/urfave/cli/v2 v2.27.7 h1:bH59vdhbjLv3LAvIu6gd0usJHgoTTPhCFib8qqOwXYU= github.com/urfave/cli/v2 v2.27.7/go.mod h1:CyNAG/xg+iAOg0N4MPGZqVmv2rCoP267496AOXUZjA4= github.com/vektah/gqlparser/v2 v2.5.30 h1:EqLwGAFLIzt1wpx1IPpY67DwUujF1OfzgEyDsLrN6kE= github.com/vektah/gqlparser/v2 v2.5.30/go.mod h1:D1/VCZtV3LPnQrcPBeR/q5jkSQIPti0uYCP/RI0gIeo= -github.com/wundergraph/astjson v1.0.0 h1:rETLJuQkMWWW03HCF6WBttEBOu8gi5vznj5KEUPVV2Q= -github.com/wundergraph/astjson v1.0.0/go.mod h1:h12D/dxxnedtLzsKyBLK7/Oe4TAoGpRVC9nDpDrZSWw= +github.com/wundergraph/astjson v1.1.0 h1:xORDosrZ87zQFJwNGe/HIHXqzpdHOFmqWgykCLVL040= +github.com/wundergraph/astjson v1.1.0/go.mod h1:h12D/dxxnedtLzsKyBLK7/Oe4TAoGpRVC9nDpDrZSWw= github.com/wundergraph/cosmo/composition-go v0.0.0-20241020204711-78f240a77c99 h1:TGXDYfDhwFLFTuNuCwkuqXT5aXGz47zcurXLfTBS9w4= github.com/wundergraph/cosmo/composition-go v0.0.0-20241020204711-78f240a77c99/go.mod h1:fUuOAUAXUFB/mlSkAaImGeE4A841AKR5dTMWhV4ibxI= github.com/wundergraph/cosmo/router v0.0.0-20251013094319-c611abf26b17 h1:GjO2E8LTf3U5JiQJCY4MmlRcAjVt7IvAbWFSgEjQdl8= diff --git a/v2/pkg/engine/resolve/caching.go b/v2/pkg/engine/resolve/caching.go index 2f2b58ca82..d8ae11fb8a 100644 --- a/v2/pkg/engine/resolve/caching.go +++ b/v2/pkg/engine/resolve/caching.go @@ -58,13 +58,15 @@ func (r *RootQueryCacheKeyTemplate) RenderCacheKeys(a arena.Arena, ctx *Context, if len(r.RootFields) == 0 { return nil, nil } - // Estimate capacity: one CacheKey per item - cacheKeys := arena.AllocateSlice[*CacheKey](a, 0, len(items)) + // Use heap slices for pointer-containing types (*CacheKey, string) because + // arena memory is backed by []byte (noscan) — GC cannot trace pointers stored + // in arena memory, which can cause premature collection of heap objects. + cacheKeys := make([]*CacheKey, 0, len(items)) jsonBytes := arena.AllocateSlice[byte](a, 0, 64) for _, item := range items { // Create KeyEntry for each root field - keyEntries := arena.AllocateSlice[string](a, 0, len(r.RootFields)) + keyEntries := make([]string, 0, len(r.RootFields)) for _, field := range r.RootFields { if len(r.EntityKeyMappings) > 0 { // Entity key mapping configured: use entity key format INSTEAD of root field key @@ -72,7 +74,7 @@ func (r *RootQueryCacheKeyTemplate) RenderCacheKeys(a arena.Arena, ctx *Context, entityKey, jsonBytesOut := r.renderDerivedEntityKey(a, ctx, jsonBytes, mapping, prefix) jsonBytes = jsonBytesOut if entityKey != "" { - keyEntries = arena.SliceAppend(a, keyEntries, entityKey) + keyEntries = append(keyEntries, entityKey) } // If entityKey is empty (missing arg), keyEntries stays empty → no caching } @@ -86,12 +88,12 @@ func (r *RootQueryCacheKeyTemplate) RenderCacheKeys(a arena.Arena, ctx *Context, tmp = arena.SliceAppend(a, tmp, unsafebytes.StringToBytes(prefix)...) tmp = arena.SliceAppend(a, tmp, []byte(`:`)...) tmp = arena.SliceAppend(a, tmp, unsafebytes.StringToBytes(key)...) - key = unsafebytes.BytesToString(tmp) + key = string(tmp) } - keyEntries = arena.SliceAppend(a, keyEntries, key) + keyEntries = append(keyEntries, key) } } - cacheKeys = arena.SliceAppend(a, cacheKeys, &CacheKey{ + cacheKeys = append(cacheKeys, &CacheKey{ Item: item, Keys: keyEntries, }) @@ -138,7 +140,7 @@ func (r *RootQueryCacheKeyTemplate) renderDerivedEntityKey(a arena.Arena, ctx *C slice = arena.SliceAppend(a, slice, []byte(`:`)...) } slice = arena.SliceAppend(a, slice, jsonBytes...) - return unsafebytes.BytesToString(slice), jsonBytes + return string(slice), jsonBytes } // renderField renders a single field cache key as JSON @@ -202,7 +204,7 @@ func (r *RootQueryCacheKeyTemplate) renderField(a arena.Arena, ctx *Context, ite jsonBytes = keyObj.MarshalTo(jsonBytes[:0]) slice := arena.AllocateSlice[byte](a, len(jsonBytes), len(jsonBytes)) copy(slice, jsonBytes) - return unsafebytes.BytesToString(slice), jsonBytes + return string(slice), jsonBytes } type EntityQueryCacheKeyTemplate struct { @@ -252,7 +254,9 @@ func (e *EntityQueryCacheKeyTemplate) RenderCacheKeys(a arena.Arena, ctx *Contex // Returns one cache key per item for entity queries with keys nested under "key". func (e *EntityQueryCacheKeyTemplate) renderCacheKeys(a arena.Arena, ctx *Context, items []*astjson.Value, keysTemplate *ResolvableObjectVariable, prefix string) ([]*CacheKey, error) { jsonBytes := arena.AllocateSlice[byte](a, 0, 64) - cacheKeys := arena.AllocateSlice[*CacheKey](a, 0, len(items)) + // Use heap slices for pointer-containing types — arena memory is noscan, + // so GC cannot trace pointers stored there, risking premature collection. + cacheKeys := make([]*CacheKey, 0, len(items)) for _, item := range items { if item == nil { @@ -308,10 +312,9 @@ func (e *EntityQueryCacheKeyTemplate) renderCacheKeys(a arena.Arena, ctx *Contex slice = arena.SliceAppend(a, slice, jsonBytes...) // Create KeyEntry with empty path for entity queries - keyEntries := arena.AllocateSlice[string](a, 0, 1) - keyEntries = arena.SliceAppend(a, keyEntries, unsafebytes.BytesToString(slice)) + keyEntries := []string{string(slice)} - cacheKeys = arena.SliceAppend(a, cacheKeys, &CacheKey{ + cacheKeys = append(cacheKeys, &CacheKey{ Item: item, Keys: keyEntries, }) diff --git a/v2/pkg/engine/resolve/loader.go b/v2/pkg/engine/resolve/loader.go index e552e9e476..895fc3c201 100644 --- a/v2/pkg/engine/resolve/loader.go +++ b/v2/pkg/engine/resolve/loader.go @@ -258,7 +258,7 @@ type Loader struct { // Value: *astjson.Value pointer to entity in jsonArena // Thread-safe via sync.Map for parallel fetch support. // Only used for entity fetches, NOT root fetches (root fields have no prior entity data). - l1Cache sync.Map + l1Cache *sync.Map } func (l *Loader) Free() { @@ -266,6 +266,8 @@ func (l *Loader) Free() { l.ctx = nil l.resolvable = nil l.taintedObjs = nil + l.l1Cache = nil + l.jsonArena = nil } func (l *Loader) LoadGraphQLResponseData(ctx *Context, response *GraphQLResponse, resolvable *Resolvable) (err error) { @@ -273,6 +275,7 @@ func (l *Loader) LoadGraphQLResponseData(ctx *Context, response *GraphQLResponse l.ctx = ctx l.info = response.Info l.taintedObjs = make(taintedObjects) + l.l1Cache = &sync.Map{} ctx.initCacheAnalytics() return l.resolveFetchNode(response.Fetches) } diff --git a/v2/pkg/engine/resolve/loader_arena_gc_test.go b/v2/pkg/engine/resolve/loader_arena_gc_test.go index 9e16e8e8a2..50e9af5517 100644 --- a/v2/pkg/engine/resolve/loader_arena_gc_test.go +++ b/v2/pkg/engine/resolve/loader_arena_gc_test.go @@ -8,6 +8,14 @@ import ( "net/http" "runtime" "testing" + "time" + + "github.com/golang/mock/gomock" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + + "github.com/wundergraph/astjson" + "github.com/wundergraph/go-arena" "github.com/wundergraph/graphql-go-tools/v2/pkg/ast" "github.com/wundergraph/graphql-go-tools/v2/pkg/engine/datasource/httpclient" @@ -299,6 +307,109 @@ func Benchmark_ArenaGCSafety(b *testing.B) { return resp }, }, + { + // Codepath: L1 cache population — entity fetch with UseL1Cache stores + // arena-allocated *astjson.Value pointers in Loader.l1Cache (sync.Map). + // After ArenaResolveGraphQLResponse releases the arena, those pointers + // become dangling. runtime.GC() should detect them. + name: "l1CacheDanglingPointers", + resolverOpts: func() ResolverOptions { + return ResolverOptions{ + MaxConcurrency: 1024, + } + }, + setupCtx: func() *Context { + ctx := NewContext(context.Background()) + ctx.ExecutionOptions.Caching.EnableL1Cache = true + ctx.ExecutionOptions.DisableSubgraphRequestDeduplication = true + return ctx + }, + setupResp: func() *GraphQLResponse { + productCacheKeyTemplate := &EntityQueryCacheKeyTemplate{ + Keys: NewResolvableObjectVariable(&Object{ + Fields: []*Field{ + {Name: []byte("__typename"), Value: &String{Path: []string{"__typename"}}}, + {Name: []byte("id"), Value: &String{Path: []string{"id"}}}, + }, + }), + } + providesData := &Object{ + Fields: []*Field{ + {Name: []byte("id"), Value: &Scalar{Path: []string{"id"}}}, + {Name: []byte("name"), Value: &Scalar{Path: []string{"name"}}}, + }, + } + return &GraphQLResponse{ + Info: &GraphQLResponseInfo{OperationType: ast.OperationTypeQuery}, + Fetches: Sequence( + // Root fetch + SingleWithPath(&SingleFetch{ + FetchConfiguration: FetchConfiguration{ + DataSource: FakeDataSource(`{"data":{"product":{"__typename":"Product","id":"prod-1"}}}`), + PostProcessing: PostProcessingConfiguration{ + SelectResponseDataPath: []string{"data"}, + }, + }, + InputTemplate: InputTemplate{ + Segments: []TemplateSegment{ + {Data: []byte(`{"method":"POST","url":"http://root.service","body":{"query":"{product {__typename id}}"}}`), SegmentType: StaticSegmentType}, + }, + }, + DataSourceIdentifier: []byte("graphql_datasource.Source"), + }, "query"), + // Entity fetch — populates L1 cache with arena-allocated pointers + SingleWithPath(&SingleFetch{ + FetchConfiguration: FetchConfiguration{ + DataSource: FakeDataSource(`{"data":{"_entities":[{"__typename":"Product","id":"prod-1","name":"Product One"}]}}`), + PostProcessing: PostProcessingConfiguration{ + SelectResponseDataPath: []string{"data", "_entities", "0"}, + }, + Caching: FetchCacheConfiguration{ + Enabled: true, + CacheName: "default", + TTL: 30 * time.Second, + CacheKeyTemplate: productCacheKeyTemplate, + UseL1Cache: true, + }, + }, + InputTemplate: InputTemplate{ + Segments: []TemplateSegment{ + {Data: []byte(`{"method":"POST","url":"http://products.service","body":{"query":"...","variables":{"representations":[`), SegmentType: StaticSegmentType}, + {SegmentType: VariableSegmentType, VariableKind: ResolvableObjectVariableKind, Renderer: NewGraphQLVariableResolveRenderer(&Object{ + Fields: []*Field{ + {Name: []byte("__typename"), Value: &String{Path: []string{"__typename"}}}, + {Name: []byte("id"), Value: &String{Path: []string{"id"}}}, + }, + })}, + {Data: []byte(`]}}}`), SegmentType: StaticSegmentType}, + }, + }, + Info: &FetchInfo{ + DataSourceID: "products", + DataSourceName: "products", + OperationType: ast.OperationTypeQuery, + ProvidesData: providesData, + }, + DataSourceIdentifier: []byte("graphql_datasource.Source"), + }, "query.product", ObjectPath("product")), + ), + Data: &Object{ + Fields: []*Field{ + { + Name: []byte("product"), + Value: &Object{ + Path: []string{"product"}, + Fields: []*Field{ + {Name: []byte("id"), Value: &String{Path: []string{"id"}}}, + {Name: []byte("name"), Value: &String{Path: []string{"name"}}}, + }, + }, + }, + }, + }, + } + }, + }, } for _, tc := range cases { @@ -330,3 +441,197 @@ func Benchmark_ArenaGCSafety(b *testing.B) { }) } } + +// TestL1CacheStalePointersAfterArenaReset deterministically proves that L1 cache +// entries become stale when the arena is reset and reused. This is the root cause +// of the CI crash "found pointer to free object": the Loader's l1Cache (sync.Map) +// holds *astjson.Value pointers into arena memory that becomes invalid after +// resolveArenaPool.Release() resets the arena. +func TestL1CacheStalePointersAfterArenaReset(t *testing.T) { + // Shared entity fetch setup — same as l1_cache_test.go + productCacheKeyTemplate := &EntityQueryCacheKeyTemplate{ + Keys: NewResolvableObjectVariable(&Object{ + Fields: []*Field{ + {Name: []byte("__typename"), Value: &String{Path: []string{"__typename"}}}, + {Name: []byte("id"), Value: &String{Path: []string{"id"}}}, + }, + }), + } + providesData := &Object{ + Fields: []*Field{ + {Name: []byte("id"), Value: &Scalar{Path: []string{"id"}}}, + {Name: []byte("name"), Value: &Scalar{Path: []string{"name"}}}, + }, + } + + // buildResponse creates a GraphQLResponse with a root fetch + entity fetch that populates L1 cache. + buildResponse := func(rootDS, entityDS DataSource) *GraphQLResponse { + return &GraphQLResponse{ + Info: &GraphQLResponseInfo{OperationType: ast.OperationTypeQuery}, + Fetches: Sequence( + SingleWithPath(&SingleFetch{ + FetchConfiguration: FetchConfiguration{ + DataSource: rootDS, + PostProcessing: PostProcessingConfiguration{ + SelectResponseDataPath: []string{"data"}, + }, + }, + InputTemplate: InputTemplate{ + Segments: []TemplateSegment{ + {Data: []byte(`{"method":"POST","url":"http://root.service","body":{"query":"{product {__typename id}}"}}`), SegmentType: StaticSegmentType}, + }, + }, + DataSourceIdentifier: []byte("graphql_datasource.Source"), + }, "query"), + SingleWithPath(&SingleFetch{ + FetchConfiguration: FetchConfiguration{ + DataSource: entityDS, + PostProcessing: PostProcessingConfiguration{ + SelectResponseDataPath: []string{"data", "_entities", "0"}, + }, + Caching: FetchCacheConfiguration{ + Enabled: true, + CacheName: "default", + TTL: 30 * time.Second, + CacheKeyTemplate: productCacheKeyTemplate, + UseL1Cache: true, + }, + }, + InputTemplate: InputTemplate{ + Segments: []TemplateSegment{ + {Data: []byte(`{"method":"POST","url":"http://products.service","body":{"query":"...","variables":{"representations":[`), SegmentType: StaticSegmentType}, + {SegmentType: VariableSegmentType, VariableKind: ResolvableObjectVariableKind, Renderer: NewGraphQLVariableResolveRenderer(&Object{ + Fields: []*Field{ + {Name: []byte("__typename"), Value: &String{Path: []string{"__typename"}}}, + {Name: []byte("id"), Value: &String{Path: []string{"id"}}}, + }, + })}, + {Data: []byte(`]}}}`), SegmentType: StaticSegmentType}, + }, + }, + Info: &FetchInfo{ + DataSourceID: "products", + DataSourceName: "products", + OperationType: ast.OperationTypeQuery, + ProvidesData: providesData, + }, + DataSourceIdentifier: []byte("graphql_datasource.Source"), + }, "query.product", ObjectPath("product")), + ), + Data: &Object{ + Fields: []*Field{ + { + Name: []byte("product"), + Value: &Object{ + Path: []string{"product"}, + Fields: []*Field{ + {Name: []byte("id"), Value: &String{Path: []string{"id"}}}, + {Name: []byte("name"), Value: &String{Path: []string{"name"}}}, + }, + }, + }, + }, + }, + } + } + + t.Run("stale pointers after arena reset", func(t *testing.T) { + ctrl := gomock.NewController(t) + defer ctrl.Finish() + + rootDS := NewMockDataSource(ctrl) + rootDS.EXPECT().Load(gomock.Any(), gomock.Any(), gomock.Any()). + Return([]byte(`{"data":{"product":{"__typename":"Product","id":"prod-1"}}}`), nil).Times(1) + + entityDS := NewMockDataSource(ctrl) + entityDS.EXPECT().Load(gomock.Any(), gomock.Any(), gomock.Any()). + Return([]byte(`{"data":{"_entities":[{"__typename":"Product","id":"prod-1","name":"Product One"}]}}`), nil).Times(1) + + response := buildResponse(rootDS, entityDS) + + ar := arena.NewMonotonicArena(arena.WithMinBufferSize(4096)) + loader := &Loader{jsonArena: ar} + + ctx := NewContext(context.Background()) + ctx.ExecutionOptions.DisableSubgraphRequestDeduplication = true + ctx.ExecutionOptions.Caching.EnableL1Cache = true + + resolvable := NewResolvable(ar, ResolvableOptions{}) + err := resolvable.Init(ctx, nil, ast.OperationTypeQuery) + require.NoError(t, err) + + err = loader.LoadGraphQLResponseData(ctx, response, resolvable) + require.NoError(t, err) + + // Verify L1 cache was populated with correct data + var cacheCount int + var originalBytes []byte + loader.l1Cache.Range(func(key, value any) bool { + cacheCount++ + originalBytes = value.(*astjson.Value).MarshalTo(nil) + return true + }) + require.Equal(t, 1, cacheCount, "entity fetch should populate exactly 1 L1 cache entry") + assert.Contains(t, string(originalBytes), `Product One`) + + // Simulate arena reuse after resolveArenaPool.Release(): + // Reset zeroes the offset (same as Pool.Release → Arena.Reset) + ar.Reset() + // A subsequent request reuses the arena, overwriting old allocations + _, _ = astjson.ParseBytesWithArena(ar, []byte(`{"__typename":"Product","id":"STALE","name":"CORRUPTED DATA"}`)) + + // The l1Cache still holds pointers into the arena buffer. + // Those pointers now reference the overwritten memory → stale data. + var staleBytes []byte + loader.l1Cache.Range(func(key, value any) bool { + staleBytes = value.(*astjson.Value).MarshalTo(nil) + return true + }) + assert.NotEqual(t, string(originalBytes), string(staleBytes), + "L1 cache entries should be stale after arena reset+reuse — "+ + "this proves the bug: l1Cache holds dangling pointers into reused arena memory") + }) + + t.Run("Free prevents stale pointer access", func(t *testing.T) { + ctrl := gomock.NewController(t) + defer ctrl.Finish() + + rootDS := NewMockDataSource(ctrl) + rootDS.EXPECT().Load(gomock.Any(), gomock.Any(), gomock.Any()). + Return([]byte(`{"data":{"product":{"__typename":"Product","id":"prod-1"}}}`), nil).Times(1) + + entityDS := NewMockDataSource(ctrl) + entityDS.EXPECT().Load(gomock.Any(), gomock.Any(), gomock.Any()). + Return([]byte(`{"data":{"_entities":[{"__typename":"Product","id":"prod-1","name":"Product One"}]}}`), nil).Times(1) + + response := buildResponse(rootDS, entityDS) + + ar := arena.NewMonotonicArena(arena.WithMinBufferSize(4096)) + loader := &Loader{jsonArena: ar} + + ctx := NewContext(context.Background()) + ctx.ExecutionOptions.DisableSubgraphRequestDeduplication = true + ctx.ExecutionOptions.Caching.EnableL1Cache = true + + resolvable := NewResolvable(ar, ResolvableOptions{}) + err := resolvable.Init(ctx, nil, ast.OperationTypeQuery) + require.NoError(t, err) + + err = loader.LoadGraphQLResponseData(ctx, response, resolvable) + require.NoError(t, err) + + // Verify L1 cache was populated + var cacheCount int + loader.l1Cache.Range(func(key, value any) bool { + cacheCount++ + return true + }) + require.Equal(t, 1, cacheCount, "entity fetch should populate exactly 1 L1 cache entry") + + // The fix: Free() nils l1Cache before arena release + loader.Free() + assert.Nil(t, loader.l1Cache, + "Free() must nil l1Cache to sever all references to arena-allocated values — "+ + "this prevents the GC crash when the arena is released and reused") + }) +} diff --git a/v2/pkg/engine/resolve/loader_cache.go b/v2/pkg/engine/resolve/loader_cache.go index ef212bb717..351500d737 100644 --- a/v2/pkg/engine/resolve/loader_cache.go +++ b/v2/pkg/engine/resolve/loader_cache.go @@ -44,7 +44,7 @@ func (l *Loader) extractCacheKeysStrings(a arena.Arena, cacheKeys []*CacheKey) [ keyLen := len(keyStr) key := arena.AllocateSlice[byte](a, 0, keyLen) key = arena.SliceAppend(a, key, unsafebytes.StringToBytes(keyStr)...) - out = arena.SliceAppend(a, out, unsafebytes.BytesToString(key)) + out = arena.SliceAppend(a, out, string(key)) } } return out @@ -75,7 +75,9 @@ func (l *Loader) populateFromCache(a arena.Arena, cacheKeys []*CacheKey, entries // For each CacheKey, creates entries for all its KeyEntries with the same value // If includePrefix is true and subgraphName is provided, keys are prefixed with the subgraph header hash. func (l *Loader) cacheKeysToEntries(a arena.Arena, cacheKeys []*CacheKey) ([]*CacheEntry, error) { - out := arena.AllocateSlice[*CacheEntry](a, 0, len(cacheKeys)) + // Use heap slice for []*CacheEntry — arena memory is noscan, so GC cannot + // trace *CacheEntry pointers stored there, risking premature collection. + out := make([]*CacheEntry, 0, len(cacheKeys)) buf := arena.AllocateSlice[byte](a, 64, 64) seen := make(map[string]struct{}, len(cacheKeys)) for i := range cacheKeys { @@ -102,7 +104,7 @@ func (l *Loader) cacheKeysToEntries(a arena.Arena, cacheKeys []*CacheKey) ([]*Ca Value: arena.AllocateSlice[byte](a, len(buf), len(buf)), } copy(entry.Value, buf) - out = arena.SliceAppend(a, out, entry) + out = append(out, entry) } } return out, nil diff --git a/v2/pkg/engine/resolve/resolve.go b/v2/pkg/engine/resolve/resolve.go index 44bdc483f8..258de42a81 100644 --- a/v2/pkg/engine/resolve/resolve.go +++ b/v2/pkg/engine/resolve/resolve.go @@ -403,10 +403,16 @@ func (r *Resolver) ArenaResolveGraphQLResponse(ctx *Context, response *GraphQLRe // we're intentionally not using defer Release to have more control over the timing (see below) t := newTools(r.options, r.allowedErrorExtensionFields, r.allowedErrorFields, r.subgraphRequestSingleFlight, resolveArena.Arena) + releaseResolveArena := func() { + t.resolvable.Reset() + t.loader.Free() + r.resolveArenaPool.Release(resolveArena) + } + err = t.resolvable.Init(ctx, nil, response.Info.OperationType) if err != nil { r.inboundRequestSingleFlight.FinishErr(inflight, err) - r.resolveArenaPool.Release(resolveArena) + releaseResolveArena() return nil, err } @@ -414,7 +420,7 @@ func (r *Resolver) ArenaResolveGraphQLResponse(ctx *Context, response *GraphQLRe err = t.loader.LoadGraphQLResponseData(ctx, response, t.resolvable) if err != nil { r.inboundRequestSingleFlight.FinishErr(inflight, err) - r.resolveArenaPool.Release(resolveArena) + releaseResolveArena() return nil, err } } @@ -425,7 +431,7 @@ func (r *Resolver) ArenaResolveGraphQLResponse(ctx *Context, response *GraphQLRe err = t.resolvable.Resolve(ctx.ctx, response.Data, response.Fetches, buf) if err != nil { r.inboundRequestSingleFlight.FinishErr(inflight, err) - r.resolveArenaPool.Release(resolveArena) + releaseResolveArena() r.responseBufferPool.Release(responseArena) return nil, err } @@ -433,7 +439,7 @@ func (r *Resolver) ArenaResolveGraphQLResponse(ctx *Context, response *GraphQLRe // first release resolverArena // all data is resolved and written into the response arena - r.resolveArenaPool.Release(resolveArena) + releaseResolveArena() // next we write back to the client // this includes flushing and syscalls // as such, it can take some time @@ -601,9 +607,13 @@ func (r *Resolver) executeSubscriptionUpdate(resolveCtx *Context, sub *sub, shar resolveArena := r.resolveArenaPool.Acquire(resolveCtx.Request.ID) t := newTools(r.options, r.allowedErrorExtensionFields, r.allowedErrorFields, r.subgraphRequestSingleFlight, resolveArena.Arena) + defer func() { + t.resolvable.Reset() + t.loader.Free() + r.resolveArenaPool.Release(resolveArena) + }() if err := t.resolvable.InitSubscription(resolveCtx, input, sub.resolve.Trigger.PostProcessing); err != nil { - r.resolveArenaPool.Release(resolveArena) r.asyncErrorWriter.WriteError(resolveCtx, err, sub.resolve.Response, sub.writer) if r.options.Debug { fmt.Printf("resolver:trigger:subscription:init:failed:%d\n", sub.id.SubscriptionID) @@ -615,7 +625,6 @@ func (r *Resolver) executeSubscriptionUpdate(resolveCtx *Context, sub *sub, shar } if err := t.loader.LoadGraphQLResponseData(resolveCtx, sub.resolve.Response, t.resolvable); err != nil { - r.resolveArenaPool.Release(resolveArena) r.asyncErrorWriter.WriteError(resolveCtx, err, sub.resolve.Response, sub.writer) if r.options.Debug { fmt.Printf("resolver:trigger:subscription:load:failed:%d\n", sub.id.SubscriptionID) @@ -627,7 +636,6 @@ func (r *Resolver) executeSubscriptionUpdate(resolveCtx *Context, sub *sub, shar } if err := t.resolvable.Resolve(resolveCtx.ctx, sub.resolve.Response.Data, sub.resolve.Response.Fetches, sub.writer); err != nil { - r.resolveArenaPool.Release(resolveArena) r.asyncErrorWriter.WriteError(resolveCtx, err, sub.resolve.Response, sub.writer) if r.options.Debug { fmt.Printf("resolver:trigger:subscription:resolve:failed:%d\n", sub.id.SubscriptionID) @@ -638,8 +646,6 @@ func (r *Resolver) executeSubscriptionUpdate(resolveCtx *Context, sub *sub, shar return } - r.resolveArenaPool.Release(resolveArena) - if err := sub.writer.Flush(); err != nil { // If flush fails (e.g. client disconnected), remove the subscription. _ = r.AsyncUnsubscribeSubscription(sub.id) @@ -714,9 +720,12 @@ func (r *Resolver) handleTriggerEntityCache(config *triggerEntityCacheConfig, da // We need a temporary resolvable to parse the subscription data and extract entity items. resolveArena := r.resolveArenaPool.Acquire(config.resolveCtx.Request.ID) - defer r.resolveArenaPool.Release(resolveArena) - t := newTools(r.options, r.allowedErrorExtensionFields, r.allowedErrorFields, r.subgraphRequestSingleFlight, resolveArena.Arena) + defer func() { + t.resolvable.Reset() + t.loader.Free() + r.resolveArenaPool.Release(resolveArena) + }() if err := t.resolvable.InitSubscription(config.resolveCtx, data, config.postProcess); err != nil { return }