diff --git a/execution/engine/federation_caching_ext_invalidation_helpers_test.go b/execution/engine/federation_caching_ext_invalidation_helpers_test.go new file mode 100644 index 0000000000..a3d32ecebb --- /dev/null +++ b/execution/engine/federation_caching_ext_invalidation_helpers_test.go @@ -0,0 +1,313 @@ +package engine_test + +import ( + "context" + "encoding/json" + "maps" + "net/http" + "net/http/httptest" + "strconv" + "sync" + "testing" + "time" + + "github.com/stretchr/testify/require" + + "github.com/wundergraph/graphql-go-tools/execution/engine" + "github.com/wundergraph/graphql-go-tools/execution/federationtesting" + accounts "github.com/wundergraph/graphql-go-tools/execution/federationtesting/accounts/graph" + products "github.com/wundergraph/graphql-go-tools/execution/federationtesting/products/graph" + reviews "github.com/wundergraph/graphql-go-tools/execution/federationtesting/reviews/graph" + "github.com/wundergraph/graphql-go-tools/v2/pkg/engine/plan" + "github.com/wundergraph/graphql-go-tools/v2/pkg/engine/resolve" +) + +// Standard queries and keys used by all extensions cache invalidation tests. +const ( + extInvEntityQuery = `query { topProducts { name reviews { body authorWithoutProvides { username } } } }` + extInvMutationQuery = `mutation { updateUsername(id: "1234", newUsername: "UpdatedMe") { id username } }` + extInvUserKey = `{"__typename":"User","key":{"id":"1234"}}` + + // Expected gateway responses (exact). + entityResponseMe = `{"data":{"topProducts":[{"name":"Trilby","reviews":[{"body":"A highly effective form of birth control.","authorWithoutProvides":{"username":"Me"}}]},{"name":"Fedora","reviews":[{"body":"Fedoras are one of the most fashionable hats around and can look great with a variety of outfits.","authorWithoutProvides":{"username":"Me"}}]}]}}` + entityResponseUpdated = `{"data":{"topProducts":[{"name":"Trilby","reviews":[{"body":"A highly effective form of birth control.","authorWithoutProvides":{"username":"UpdatedMe"}}]},{"name":"Fedora","reviews":[{"body":"Fedoras are one of the most fashionable hats around and can look great with a variety of outfits.","authorWithoutProvides":{"username":"UpdatedMe"}}]}]}}` + mutationResponse = `{"data":{"updateUsername":{"id":"1234","username":"UpdatedMe"}}}` + entitiesSubgraphRespMe = `{"data":{"_entities":[{"__typename":"User","username":"Me"}]}}` +) + +// injectCacheInvalidation injects a raw JSON cacheInvalidation object into a subgraph +// response's extensions field and returns the modified response body. +func injectCacheInvalidation(t *testing.T, body []byte, cacheInvalidationJSON string) []byte { + t.Helper() + var resp map[string]json.RawMessage + require.NoError(t, json.Unmarshal(body, &resp)) + resp["extensions"] = json.RawMessage(`{"cacheInvalidation":` + cacheInvalidationJSON + `}`) + modified, err := json.Marshal(resp) + require.NoError(t, err) + return modified +} + +// injectErrorsAndCacheInvalidation injects both errors and cacheInvalidation extensions +// into a subgraph response body. Used to test that invalidation runs even when errors are present. +func injectErrorsAndCacheInvalidation(t *testing.T, body []byte, errorsJSON string, cacheInvalidationJSON string) []byte { + t.Helper() + var resp map[string]json.RawMessage + require.NoError(t, json.Unmarshal(body, &resp)) + resp["errors"] = json.RawMessage(errorsJSON) + resp["extensions"] = json.RawMessage(`{"cacheInvalidation":` + cacheInvalidationJSON + `}`) + modified, err := json.Marshal(resp) + require.NoError(t, err) + return modified +} + +// subgraphResponseInterceptor wraps a subgraph HTTP handler and applies a modifier +// function to every response body when set. When modifier is nil, responses pass through. +type subgraphResponseInterceptor struct { + handler http.Handler + mu sync.RWMutex + modifier func(body []byte) []byte +} + +func newSubgraphResponseInterceptor(handler http.Handler) *subgraphResponseInterceptor { + return &subgraphResponseInterceptor{handler: handler} +} + +func (s *subgraphResponseInterceptor) SetModifier(fn func(body []byte) []byte) { + s.mu.Lock() + defer s.mu.Unlock() + s.modifier = fn +} + +func (s *subgraphResponseInterceptor) ClearModifier() { + s.mu.Lock() + defer s.mu.Unlock() + s.modifier = nil +} + +func (s *subgraphResponseInterceptor) ServeHTTP(w http.ResponseWriter, r *http.Request) { + s.mu.RLock() + mod := s.modifier + s.mu.RUnlock() + + if mod == nil { + s.handler.ServeHTTP(w, r) + return + } + + rec := httptest.NewRecorder() + s.handler.ServeHTTP(rec, r) + + modified := mod(rec.Body.Bytes()) + + maps.Copy(w.Header(), rec.Header()) + w.Header().Set("Content-Length", strconv.Itoa(len(modified))) + w.WriteHeader(rec.Code) + _, _ = w.Write(modified) +} + +// newFederationSetupWithInterceptor creates a FederationSetup where the accounts subgraph +// is wrapped with the response interceptor. +func newFederationSetupWithInterceptor( + interceptor *subgraphResponseInterceptor, + gatewayFn func(*federationtesting.FederationSetup) *httptest.Server, +) *federationtesting.FederationSetup { + accountsServer := httptest.NewServer(interceptor) + productsServer := httptest.NewServer(products.GraphQLEndpointHandler(products.TestOptions)) + reviewsServer := httptest.NewServer(reviews.GraphQLEndpointHandler(reviews.TestOptions)) + + setup := &federationtesting.FederationSetup{ + AccountsUpstreamServer: accountsServer, + ProductsUpstreamServer: productsServer, + ReviewsUpstreamServer: reviewsServer, + } + + setup.GatewayServer = gatewayFn(setup) + return setup +} + +// --------------------------------------------------------------------------- +// extInvalidationEnv — test environment for extensions cache invalidation tests +// --------------------------------------------------------------------------- + +type extInvalidationOption func(*extInvalidationConfig) + +type extInvalidationConfig struct { + mutationCacheInvalidationField string + headerPrefixHash uint64 + useHeaderPrefix bool + l2KeyInterceptor func(ctx context.Context, key string, info resolve.L2CacheKeyInterceptorInfo) string + enableAnalytics bool +} + +// withMutationCacheInvalidation enables the config-based MutationCacheInvalidation +// mechanism for the given mutation field (e.g. "updateUsername"). +func withMutationCacheInvalidation(fieldName string) extInvalidationOption { + return func(c *extInvalidationConfig) { + c.mutationCacheInvalidationField = fieldName + } +} + +// withHeaderPrefix enables IncludeSubgraphHeaderPrefix on the User entity config +// and sets up a mockSubgraphHeadersBuilder with the given hash for "accounts". +func withHeaderPrefix(hash uint64) extInvalidationOption { + return func(c *extInvalidationConfig) { + c.useHeaderPrefix = true + c.headerPrefixHash = hash + } +} + +// withExtInvAnalytics enables cache analytics collection on the gateway, +// allowing tests to assert on MutationEvent and other analytics data. +func withExtInvAnalytics() extInvalidationOption { + return func(c *extInvalidationConfig) { + c.enableAnalytics = true + } +} + +// withL2KeyInterceptor sets an L2CacheKeyInterceptor on the caching options. +func withExtInvL2KeyInterceptor(fn func(ctx context.Context, key string, info resolve.L2CacheKeyInterceptorInfo) string) extInvalidationOption { + return func(c *extInvalidationConfig) { + c.l2KeyInterceptor = fn + } +} + +type extInvalidationEnv struct { + t *testing.T + cache *FakeLoaderCache + tracker *subgraphCallTracker + interceptor *subgraphResponseInterceptor + setup *federationtesting.FederationSetup + gqlClient *GraphqlClient + accountsHost string + ctx context.Context +} + +// newExtInvalidationEnv creates a fully wired test environment for extensions +// cache invalidation E2E tests. All boilerplate (cache, tracker, interceptor, +// federation setup, gateway, cleanup) is handled here. +func newExtInvalidationEnv(t *testing.T, opts ...extInvalidationOption) *extInvalidationEnv { + t.Helper() + + accounts.ResetUsers() + t.Cleanup(accounts.ResetUsers) + + var cfg extInvalidationConfig + for _, opt := range opts { + opt(&cfg) + } + + // Build entity cache config. + entityCfg := plan.EntityCacheConfiguration{ + TypeName: "User", + CacheName: "default", + TTL: 30 * time.Second, + IncludeSubgraphHeaderPrefix: cfg.useHeaderPrefix, + } + + subgraphCfg := engine.SubgraphCachingConfig{ + SubgraphName: "accounts", + EntityCaching: plan.EntityCacheConfigurations{entityCfg}, + } + if cfg.mutationCacheInvalidationField != "" { + subgraphCfg.MutationCacheInvalidation = plan.MutationCacheInvalidationConfigurations{ + {FieldName: cfg.mutationCacheInvalidationField}, + } + } + + cachingOpts := resolve.CachingOptions{EnableL2Cache: true} + if cfg.enableAnalytics { + cachingOpts.EnableCacheAnalytics = true + } + if cfg.l2KeyInterceptor != nil { + cachingOpts.L2CacheKeyInterceptor = cfg.l2KeyInterceptor + } + + cache := NewFakeLoaderCache() + caches := map[string]resolve.LoaderCache{"default": cache} + tracker := newSubgraphCallTracker(http.DefaultTransport) + trackingClient := &http.Client{Transport: tracker} + interceptor := newSubgraphResponseInterceptor(accounts.GraphQLEndpointHandler(accounts.TestOptions)) + + gatewayOpts := []cachingGatewayOptionsToFunc{ + withCachingEnableART(false), + withCachingLoaderCache(caches), + withHTTPClient(trackingClient), + withCachingOptionsFunc(cachingOpts), + withSubgraphEntityCachingConfigs(engine.SubgraphCachingConfigs{subgraphCfg}), + } + if cfg.useHeaderPrefix { + gatewayOpts = append(gatewayOpts, withSubgraphHeadersBuilder(&mockSubgraphHeadersBuilder{ + hashes: map[string]uint64{"accounts": cfg.headerPrefixHash}, + })) + } + + setup := newFederationSetupWithInterceptor(interceptor, addCachingGateway(gatewayOpts...)) + t.Cleanup(setup.Close) + + return &extInvalidationEnv{ + t: t, + cache: cache, + tracker: tracker, + interceptor: interceptor, + setup: setup, + gqlClient: NewGraphqlClient(http.DefaultClient), + accountsHost: mustParseHost(setup.AccountsUpstreamServer.URL), + ctx: t.Context(), + } +} + +// resetCounters resets the subgraph call tracker and clears the cache operation log. +func (e *extInvalidationEnv) resetCounters() { + e.tracker.Reset() + e.cache.ClearLog() +} + +// queryEntity sends the standard entity query, resets counters first. +func (e *extInvalidationEnv) queryEntity() string { + e.t.Helper() + e.resetCounters() + return string(e.gqlClient.QueryString(e.ctx, e.setup.GatewayServer.URL, extInvEntityQuery, nil, e.t)) +} + +// mutate sends the standard mutation, resets counters first. +func (e *extInvalidationEnv) mutate() string { + e.t.Helper() + e.resetCounters() + return string(e.gqlClient.QueryString(e.ctx, e.setup.GatewayServer.URL, extInvMutationQuery, nil, e.t)) +} + +// mutateWithHeaders sends the standard mutation and returns both the response body +// and HTTP headers (for cache analytics inspection). Resets counters first. +func (e *extInvalidationEnv) mutateWithHeaders() (string, http.Header) { + e.t.Helper() + e.resetCounters() + resp, headers := e.gqlClient.QueryStringWithHeaders(e.ctx, e.setup.GatewayServer.URL, extInvMutationQuery, nil, e.t) + return string(resp), headers +} + +// onAccountsResponse sets a modifier on the accounts subgraph interceptor. +func (e *extInvalidationEnv) onAccountsResponse(fn func(body []byte) []byte) { + e.interceptor.SetModifier(fn) +} + +// clearModifier removes the interceptor modifier. +func (e *extInvalidationEnv) clearModifier() { + e.interceptor.ClearModifier() +} + +// cacheLog returns the current cache log with keys sorted for deterministic comparison. +func (e *extInvalidationEnv) cacheLog() []CacheLogEntry { + return sortCacheLogKeys(e.cache.GetLog()) +} + +// accountsCalls returns the number of HTTP calls made to the accounts subgraph. +func (e *extInvalidationEnv) accountsCalls() int { + return e.tracker.GetCount(e.accountsHost) +} + +// deleteFromCache manually deletes keys from the L2 cache. +func (e *extInvalidationEnv) deleteFromCache(keys ...string) { + e.t.Helper() + err := e.cache.Delete(e.ctx, keys) + require.NoError(e.t, err) +} diff --git a/execution/engine/federation_caching_ext_invalidation_test.go b/execution/engine/federation_caching_ext_invalidation_test.go new file mode 100644 index 0000000000..00eaac1dc1 --- /dev/null +++ b/execution/engine/federation_caching_ext_invalidation_test.go @@ -0,0 +1,447 @@ +package engine_test + +import ( + "context" + "testing" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + + "github.com/wundergraph/graphql-go-tools/v2/pkg/engine/resolve" +) + +func TestFederationCaching_ExtensionsInvalidation(t *testing.T) { + t.Run("mutation with extensions invalidation clears L2 cache", func(t *testing.T) { + // Verify that a mutation response with cacheInvalidation extensions + // deletes the corresponding L2 cache entry, forcing a re-fetch. + env := newExtInvalidationEnv(t) + + // Step 1: Query populates L2 cache. + resp := env.queryEntity() + assert.Equal(t, entityResponseMe, resp) + assert.Equal(t, 1, env.accountsCalls(), "first request fetches from accounts") + assert.Equal(t, sortCacheLogKeys([]CacheLogEntry{ + {Operation: "get", Keys: []string{extInvUserKey}, Hits: []bool{false}}, // L2 empty on first request + {Operation: "set", Keys: []string{extInvUserKey}}, // populate L2 after fetch + }), env.cacheLog()) + + // Step 2: Same query — L2 hit, no subgraph call. + resp = env.queryEntity() + assert.Equal(t, entityResponseMe, resp) + assert.Equal(t, 0, env.accountsCalls(), "L2 cache hit") + assert.Equal(t, sortCacheLogKeys([]CacheLogEntry{ + {Operation: "get", Keys: []string{extInvUserKey}, Hits: []bool{true}}, // L2 hit from Step 1 + }), env.cacheLog()) + + // Step 3: Mutation with cacheInvalidation extensions deletes User:1234. + env.onAccountsResponse(func(body []byte) []byte { + assert.Equal(t, mutationResponse, string(body)) + return injectCacheInvalidation(t, body, + `{"keys":[{"typename":"User","key":{"id":"1234"}}]}`) + }) + mutResp := env.mutate() + assert.Equal(t, mutationResponse, mutResp) + env.clearModifier() + assert.Equal(t, sortCacheLogKeys([]CacheLogEntry{ + {Operation: "delete", Keys: []string{extInvUserKey}}, // extensions-based invalidation + }), env.cacheLog()) + + // Step 4: Re-query — L2 miss after invalidation, fetches updated username. + resp = env.queryEntity() + assert.Equal(t, entityResponseUpdated, resp) + assert.Equal(t, 1, env.accountsCalls(), "re-fetched after invalidation") + assert.Equal(t, sortCacheLogKeys([]CacheLogEntry{ + {Operation: "get", Keys: []string{extInvUserKey}, Hits: []bool{false}}, // L2 miss because Step 3 deleted it + {Operation: "set", Keys: []string{extInvUserKey}}, // re-populate L2 after re-fetch + }), env.cacheLog()) + }) + + t.Run("invalidation of entity not in cache is a no-op", func(t *testing.T) { + // Invalidating a different entity (User:9999) should not affect + // the cached entity (User:1234). + env := newExtInvalidationEnv(t) + + // Populate cache with User:1234. + env.queryEntity() + + // Mutation invalidates User:9999 (never cached). + user9999Key := `{"__typename":"User","key":{"id":"9999"}}` + env.onAccountsResponse(func(body []byte) []byte { + assert.Equal(t, mutationResponse, string(body)) + return injectCacheInvalidation(t, body, + `{"keys":[{"typename":"User","key":{"id":"9999"}}]}`) + }) + mutResp := env.mutate() + assert.Equal(t, mutationResponse, mutResp) + env.clearModifier() + assert.Equal(t, sortCacheLogKeys([]CacheLogEntry{ + {Operation: "delete", Keys: []string{user9999Key}}, // delete called even though entry doesn't exist + }), env.cacheLog()) + + // User:1234 should still be cached (unaffected by User:9999 invalidation). + resp := env.queryEntity() + assert.Equal(t, entityResponseMe, resp) + assert.Equal(t, 0, env.accountsCalls(), "User:1234 still cached") + assert.Equal(t, sortCacheLogKeys([]CacheLogEntry{ + {Operation: "get", Keys: []string{extInvUserKey}, Hits: []bool{true}}, // User:1234 still in L2 + }), env.cacheLog()) + }) + + t.Run("multiple entities invalidated in single response", func(t *testing.T) { + // A single mutation response can invalidate multiple entities at once. + env := newExtInvalidationEnv(t) + + // Populate cache with User:1234. + env.queryEntity() + + // Mutation invalidates both User:1234 and User:2345 in one response. + env.onAccountsResponse(func(body []byte) []byte { + assert.Equal(t, mutationResponse, string(body)) + return injectCacheInvalidation(t, body, + `{"keys":[{"typename":"User","key":{"id":"1234"}},{"typename":"User","key":{"id":"2345"}}]}`) + }) + env.mutate() + env.clearModifier() + assert.Equal(t, sortCacheLogKeys([]CacheLogEntry{ + {Operation: "delete", Keys: []string{ + `{"__typename":"User","key":{"id":"1234"}}`, + `{"__typename":"User","key":{"id":"2345"}}`, + }}, // both entities deleted in single batch + }), env.cacheLog()) + + // User:1234 must be re-fetched after invalidation. + env.queryEntity() + assert.Equal(t, 1, env.accountsCalls(), "re-fetched after invalidation") + assert.Equal(t, sortCacheLogKeys([]CacheLogEntry{ + {Operation: "get", Keys: []string{extInvUserKey}, Hits: []bool{false}}, // L2 miss because mutation deleted it + {Operation: "set", Keys: []string{extInvUserKey}}, // re-populate L2 + }), env.cacheLog()) + }) + + t.Run("mutation without extensions does not delete", func(t *testing.T) { + // A mutation without cacheInvalidation extensions should not + // trigger any cache deletes — cached data survives. + env := newExtInvalidationEnv(t) + + // Populate cache. + env.queryEntity() + + // Verify cache hit. + resp := env.queryEntity() + assert.Equal(t, entityResponseMe, resp) + assert.Equal(t, 0, env.accountsCalls(), "L2 cache hit") + + // Mutation WITHOUT extensions — no cache operations. + env.mutate() + assert.Equal(t, sortCacheLogKeys([]CacheLogEntry{}), env.cacheLog(), "no cache operations for mutation without extensions") + + // Cache should still be valid. + resp = env.queryEntity() + assert.Equal(t, entityResponseMe, resp) + assert.Equal(t, 0, env.accountsCalls(), "cache still valid") + assert.Equal(t, sortCacheLogKeys([]CacheLogEntry{ + {Operation: "get", Keys: []string{extInvUserKey}, Hits: []bool{true}}, // L2 still valid + }), env.cacheLog()) + }) + + t.Run("coexistence with detectMutationEntityImpact", func(t *testing.T) { + // When BOTH config-based MutationCacheInvalidation AND extensions-based + // invalidation target the same key, the delete should be deduplicated + // to a single cache.Delete() call. + env := newExtInvalidationEnv(t, withMutationCacheInvalidation("updateUsername")) + + // Populate cache. + env.queryEntity() + assert.Equal(t, 1, env.accountsCalls()) + + // Verify cache hit. + env.queryEntity() + assert.Equal(t, 0, env.accountsCalls(), "L2 cache hit") + + // Mutation triggers BOTH mechanisms on User:1234. + env.onAccountsResponse(func(body []byte) []byte { + assert.Equal(t, mutationResponse, string(body)) + return injectCacheInvalidation(t, body, + `{"keys":[{"typename":"User","key":{"id":"1234"}}]}`) + }) + env.mutate() + env.clearModifier() + assert.Equal(t, sortCacheLogKeys([]CacheLogEntry{ + {Operation: "delete", Keys: []string{extInvUserKey}}, // deduplicated: detectMutationEntityImpact fires, extensions-based skipped + }), env.cacheLog(), "single delete despite both mechanisms targeting same key") + + // Cache invalidated — query should re-fetch. + env.queryEntity() + assert.Equal(t, 1, env.accountsCalls(), "re-fetched after combined invalidation") + }) + + t.Run("query response triggers invalidation", func(t *testing.T) { + // Cache invalidation via extensions is NOT restricted to mutations. + // A query (e.g. _entities) response can also carry invalidation extensions. + env := newExtInvalidationEnv(t) + + // Step 1: Populate L2 cache. + resp := env.queryEntity() + assert.Equal(t, entityResponseMe, resp) + assert.Equal(t, 1, env.accountsCalls()) + + // Step 2: Verify cache hit. + env.queryEntity() + assert.Equal(t, 0, env.accountsCalls(), "L2 cache hit") + + // Step 3: Manually delete cache entry, then inject invalidation into the + // _entities query response. This proves invalidation works on queries too. + env.deleteFromCache(extInvUserKey) + env.onAccountsResponse(func(body []byte) []byte { + assert.Equal(t, entitiesSubgraphRespMe, string(body)) + return injectCacheInvalidation(t, body, + `{"keys":[{"typename":"User","key":{"id":"1234"}}]}`) + }) + + resp = env.queryEntity() + assert.Equal(t, entityResponseMe, resp) + assert.Equal(t, 1, env.accountsCalls(), "re-fetched after manual delete") + env.clearModifier() + + // Extensions-based delete is skipped because updateL2Cache will set the same + // key with fresh data — only get(miss) + set remain. + assert.Equal(t, sortCacheLogKeys([]CacheLogEntry{ + {Operation: "get", Keys: []string{extInvUserKey}, Hits: []bool{false}}, // L2 miss because we manually deleted it + {Operation: "set", Keys: []string{extInvUserKey}}, // re-populate L2 (delete skipped: same key about to be set) + }), env.cacheLog()) + }) + + t.Run("with subgraph header prefix", func(t *testing.T) { + // When IncludeSubgraphHeaderPrefix is enabled, cache keys include a + // hash prefix (e.g. "55555:"). Invalidation must use the same prefix. + env := newExtInvalidationEnv(t, withHeaderPrefix(55555)) + prefixedKey := `55555:` + extInvUserKey + + // Populate cache (keys include header prefix). + env.queryEntity() + assert.Equal(t, 1, env.accountsCalls()) + assert.Equal(t, sortCacheLogKeys([]CacheLogEntry{ + {Operation: "get", Keys: []string{prefixedKey}, Hits: []bool{false}}, // L2 miss, prefixed key + {Operation: "set", Keys: []string{prefixedKey}}, // populate L2 with prefixed key + }), env.cacheLog()) + + // Verify cache hit. + env.queryEntity() + assert.Equal(t, 0, env.accountsCalls(), "L2 cache hit") + assert.Equal(t, sortCacheLogKeys([]CacheLogEntry{ + {Operation: "get", Keys: []string{prefixedKey}, Hits: []bool{true}}, // L2 hit with prefixed key + }), env.cacheLog()) + + // Mutation with extensions invalidation. + env.onAccountsResponse(func(body []byte) []byte { + assert.Equal(t, mutationResponse, string(body)) + return injectCacheInvalidation(t, body, + `{"keys":[{"typename":"User","key":{"id":"1234"}}]}`) + }) + env.mutate() + env.clearModifier() + assert.Equal(t, sortCacheLogKeys([]CacheLogEntry{ + {Operation: "delete", Keys: []string{prefixedKey}}, // delete key includes header prefix + }), env.cacheLog()) + + // Cache invalidated — re-fetch. + env.queryEntity() + assert.Equal(t, 1, env.accountsCalls(), "re-fetched after invalidation") + assert.Equal(t, sortCacheLogKeys([]CacheLogEntry{ + {Operation: "get", Keys: []string{prefixedKey}, Hits: []bool{false}}, // L2 miss after delete + {Operation: "set", Keys: []string{prefixedKey}}, // re-populate L2 + }), env.cacheLog()) + }) + + t.Run("with L2CacheKeyInterceptor", func(t *testing.T) { + // When an L2CacheKeyInterceptor is configured, cache keys are transformed + // (e.g. "tenant-X:" prefix). Invalidation must use the same transformation. + env := newExtInvalidationEnv(t, withExtInvL2KeyInterceptor( + func(_ context.Context, key string, _ resolve.L2CacheKeyInterceptorInfo) string { + return "tenant-X:" + key + }, + )) + interceptedKey := `tenant-X:` + extInvUserKey + + // Populate cache (keys include interceptor prefix). + env.queryEntity() + assert.Equal(t, 1, env.accountsCalls()) + assert.Equal(t, sortCacheLogKeys([]CacheLogEntry{ + {Operation: "get", Keys: []string{interceptedKey}, Hits: []bool{false}}, // L2 miss, intercepted key + {Operation: "set", Keys: []string{interceptedKey}}, // populate L2 with intercepted key + }), env.cacheLog()) + + // Verify cache hit. + env.queryEntity() + assert.Equal(t, 0, env.accountsCalls(), "L2 cache hit") + assert.Equal(t, sortCacheLogKeys([]CacheLogEntry{ + {Operation: "get", Keys: []string{interceptedKey}, Hits: []bool{true}}, // L2 hit with intercepted key + }), env.cacheLog()) + + // Mutation with extensions invalidation. + env.onAccountsResponse(func(body []byte) []byte { + assert.Equal(t, mutationResponse, string(body)) + return injectCacheInvalidation(t, body, + `{"keys":[{"typename":"User","key":{"id":"1234"}}]}`) + }) + env.mutate() + env.clearModifier() + assert.Equal(t, sortCacheLogKeys([]CacheLogEntry{ + {Operation: "delete", Keys: []string{interceptedKey}}, // delete key includes interceptor prefix + }), env.cacheLog()) + + // Cache invalidated — re-fetch. + env.queryEntity() + assert.Equal(t, 1, env.accountsCalls(), "re-fetched after invalidation") + assert.Equal(t, sortCacheLogKeys([]CacheLogEntry{ + {Operation: "get", Keys: []string{interceptedKey}, Hits: []bool{false}}, // L2 miss after delete + {Operation: "set", Keys: []string{interceptedKey}}, // re-populate L2 + }), env.cacheLog()) + }) + + // ------------------------------------------------------------------------- + // Error handling: cache invalidation must run even when errors are present. + // ------------------------------------------------------------------------- + + t.Run("error response with invalidation extensions still invalidates cache", func(t *testing.T) { + // When a mutation returns BOTH errors AND extensions.cacheInvalidation, + // the cache invalidation should still run despite the errors. + env := newExtInvalidationEnv(t) + + // Populate L2 cache. + resp := env.queryEntity() + assert.Equal(t, entityResponseMe, resp) + assert.Equal(t, 1, env.accountsCalls()) + + // Verify cache hit. + resp = env.queryEntity() + assert.Equal(t, entityResponseMe, resp) + assert.Equal(t, 0, env.accountsCalls(), "L2 cache hit") + + // Mutation returns errors alongside cacheInvalidation extensions. + env.onAccountsResponse(func(body []byte) []byte { + return injectErrorsAndCacheInvalidation(t, body, + `[{"message":"partial error"}]`, + `{"keys":[{"typename":"User","key":{"id":"1234"}}]}`) + }) + env.mutate() + env.clearModifier() + + // Cache should be invalidated despite errors in response. + assert.Equal(t, sortCacheLogKeys([]CacheLogEntry{ + {Operation: "delete", Keys: []string{extInvUserKey}}, // invalidation runs despite errors + }), env.cacheLog()) + + // Re-query — L2 miss after invalidation, re-fetches updated data. + resp = env.queryEntity() + assert.Equal(t, entityResponseUpdated, resp) + assert.Equal(t, 1, env.accountsCalls(), "re-fetched after invalidation") + }) + + // ------------------------------------------------------------------------- + // Analytics: MutationEvent correctness with cache invalidation. + // ------------------------------------------------------------------------- + + t.Run("coexistence with analytics reports correct staleness", func(t *testing.T) { + // When both config-based and extensions-based invalidation target the same + // entity, analytics should correctly report the entity was cached and stale. + env := newExtInvalidationEnv(t, + withMutationCacheInvalidation("updateUsername"), + withExtInvAnalytics(), + ) + + // Populate L2 cache with User:1234 (username="Me"). + env.queryEntity() + assert.Equal(t, 1, env.accountsCalls()) + + // Mutation with BOTH mechanisms targeting User:1234. + env.onAccountsResponse(func(body []byte) []byte { + assert.Equal(t, mutationResponse, string(body)) + return injectCacheInvalidation(t, body, + `{"keys":[{"typename":"User","key":{"id":"1234"}}]}`) + }) + mutResp, headers := env.mutateWithHeaders() + assert.Equal(t, mutationResponse, mutResp) + env.clearModifier() + + // Analytics should report correct staleness detection. + snap := normalizeSnapshot(parseCacheAnalytics(t, headers)) + require.Equal(t, 1, len(snap.MutationEvents), "should have exactly 1 mutation impact event") + + event := snap.MutationEvents[0] + assert.Equal(t, normalizeSnapshot(resolve.CacheAnalyticsSnapshot{ + FieldHashes: []resolve.EntityFieldHash{ + // Hash of "UpdatedMe" (post-mutation username) + {EntityType: "User", FieldName: "username", FieldHash: 16932466035575627600, KeyRaw: `{"id":"1234"}`}, + }, + EntityTypes: []resolve.EntityTypeInfo{ + {TypeName: "User", Count: 1, UniqueKeys: 1}, // Mutation returned 1 User entity + }, + MutationEvents: []resolve.MutationEvent{ + { + MutationRootField: "updateUsername", + EntityType: "User", + EntityCacheKey: extInvUserKey, + HadCachedValue: true, // L2 had cached value from prior query + IsStale: true, // Cached "Me" differs from fresh "UpdatedMe" + CachedHash: event.CachedHash, + FreshHash: event.FreshHash, + CachedBytes: event.CachedBytes, + FreshBytes: event.FreshBytes, + }, + }, + }), snap) + + // Verify dedup still works — single delete despite both mechanisms. + assert.Equal(t, sortCacheLogKeys([]CacheLogEntry{ + {Operation: "get", Keys: []string{extInvUserKey}, Hits: []bool{true}}, // analytics reads cached value before delete + {Operation: "delete", Keys: []string{extInvUserKey}}, // config-based delete (extensions-based skipped via dedup) + }), env.cacheLog(), "analytics read before delete, single delete despite both mechanisms") + }) + + t.Run("analytics without prior cache reports no-cache event", func(t *testing.T) { + // When mutation triggers invalidation but entity was never cached, + // MutationEvent should show HadCachedValue=false, IsStale=false. + env := newExtInvalidationEnv(t, + withMutationCacheInvalidation("updateUsername"), + withExtInvAnalytics(), + ) + + // No prior query — L2 cache is empty. + // Mutation with extensions invalidation targeting User:1234. + env.onAccountsResponse(func(body []byte) []byte { + assert.Equal(t, mutationResponse, string(body)) + return injectCacheInvalidation(t, body, + `{"keys":[{"typename":"User","key":{"id":"1234"}}]}`) + }) + mutResp, headers := env.mutateWithHeaders() + assert.Equal(t, mutationResponse, mutResp) + env.clearModifier() + + // Analytics should report no cached value. + snap := normalizeSnapshot(parseCacheAnalytics(t, headers)) + require.Equal(t, 1, len(snap.MutationEvents), "should have exactly 1 mutation impact event") + + event := snap.MutationEvents[0] + assert.Equal(t, normalizeSnapshot(resolve.CacheAnalyticsSnapshot{ + FieldHashes: []resolve.EntityFieldHash{ + // Hash of "UpdatedMe" (post-mutation username) + {EntityType: "User", FieldName: "username", FieldHash: 16932466035575627600, KeyRaw: `{"id":"1234"}`}, + }, + EntityTypes: []resolve.EntityTypeInfo{ + {TypeName: "User", Count: 1, UniqueKeys: 1}, // Mutation returned 1 User entity + }, + MutationEvents: []resolve.MutationEvent{ + { + MutationRootField: "updateUsername", + EntityType: "User", + EntityCacheKey: extInvUserKey, + HadCachedValue: false, // No prior query, L2 cache was empty + IsStale: false, // Cannot be stale without a cached value to compare + FreshHash: event.FreshHash, + FreshBytes: event.FreshBytes, + }, + }, + }), snap) + }) +} diff --git a/execution/federationtesting/gateway/gateway.go b/execution/federationtesting/gateway/gateway.go index fa98add19a..6d3664f979 100644 --- a/execution/federationtesting/gateway/gateway.go +++ b/execution/federationtesting/gateway/gateway.go @@ -79,6 +79,31 @@ func WithSubgraphEntityCachingConfigs(configs engine.SubgraphCachingConfigs) Gat } } +// buildEntityCacheConfigs converts SubgraphCachingConfigs into the runtime lookup map +// needed by the resolver for extensions-based cache invalidation. +// Only EntityCaching entries are processed — RootFieldCaching uses a different key format +// and is not eligible for extensions-based invalidation. +func buildEntityCacheConfigs(configs engine.SubgraphCachingConfigs) map[string]map[string]*resolve.EntityCacheInvalidationConfig { + if len(configs) == 0 { + return nil + } + result := make(map[string]map[string]*resolve.EntityCacheInvalidationConfig, len(configs)) + for _, sc := range configs { + if len(sc.EntityCaching) == 0 { + continue + } + entityMap := make(map[string]*resolve.EntityCacheInvalidationConfig, len(sc.EntityCaching)) + for _, ec := range sc.EntityCaching { + entityMap[ec.TypeName] = &resolve.EntityCacheInvalidationConfig{ + CacheName: ec.CacheName, + IncludeSubgraphHeaderPrefix: ec.IncludeSubgraphHeaderPrefix, + } + } + result[sc.SubgraphName] = entityMap + } + return result +} + func (g *Gateway) ServeHTTP(w http.ResponseWriter, r *http.Request) { g.mu.Lock() handler := g.gqlHandler @@ -110,8 +135,9 @@ func (g *Gateway) UpdateDataSources(subgraphsConfigs []engine.SubgraphConfigurat } executionEngine, err := engine.NewExecutionEngine(ctx, g.logger, engineConfig, resolve.ResolverOptions{ - MaxConcurrency: 1024, - Caches: g.loaderCaches, + MaxConcurrency: 1024, + Caches: g.loaderCaches, + EntityCacheConfigs: buildEntityCacheConfigs(g.subgraphEntityCachingConfigs), }) if err != nil { g.logger.Error("create engine: %v", log.Error(err)) diff --git a/v2/pkg/engine/resolve/extensions_cache_invalidation_helpers_test.go b/v2/pkg/engine/resolve/extensions_cache_invalidation_helpers_test.go new file mode 100644 index 0000000000..a3a90f5975 --- /dev/null +++ b/v2/pkg/engine/resolve/extensions_cache_invalidation_helpers_test.go @@ -0,0 +1,289 @@ +package resolve + +import ( + "context" + "net/http" + "testing" + "time" + + "github.com/golang/mock/gomock" + "github.com/stretchr/testify/require" + + "github.com/wundergraph/go-arena" + + "github.com/wundergraph/graphql-go-tools/v2/pkg/ast" + "github.com/wundergraph/graphql-go-tools/v2/pkg/fastjsonext" +) + +// --------------------------------------------------------------------------- +// Schema building blocks for User entity tests +// --------------------------------------------------------------------------- + +// newUserCacheKeyTemplate returns a cache key template for User entities with @key(fields: "id"). +func newUserCacheKeyTemplate() *EntityQueryCacheKeyTemplate { + return &EntityQueryCacheKeyTemplate{ + Keys: NewResolvableObjectVariable(&Object{ + Fields: []*Field{ + {Name: []byte("__typename"), Value: &String{Path: []string{"__typename"}}}, + {Name: []byte("id"), Value: &String{Path: []string{"id"}}}, + }, + }), + } +} + +// newUserProvidesData describes the fields provided by a User entity fetch. +func newUserProvidesData() *Object { + return &Object{ + Fields: []*Field{ + {Name: []byte("id"), Value: &Scalar{Path: []string{"id"}, Nullable: false}}, + {Name: []byte("username"), Value: &Scalar{Path: []string{"username"}, Nullable: false}}, + }, + } +} + +// newUserEntityFetchSegments returns the input template segments for a User _entities fetch. +func newUserEntityFetchSegments() []TemplateSegment { + return []TemplateSegment{ + { + Data: []byte(`{"method":"POST","url":"http://accounts.service","body":{"query":"query($representations: [_Any!]!){_entities(representations: $representations){__typename ... on User {id username}}}","variables":{"representations":[`), + SegmentType: StaticSegmentType, + }, + { + SegmentType: VariableSegmentType, + VariableKind: ResolvableObjectVariableKind, + Renderer: NewGraphQLVariableResolveRenderer(&Object{ + Fields: []*Field{ + {Name: []byte("__typename"), Value: &String{Path: []string{"__typename"}}}, + {Name: []byte("id"), Value: &String{Path: []string{"id"}}}, + }, + }), + }, + { + Data: []byte(`]}}}`), + SegmentType: StaticSegmentType, + }, + } +} + +// --------------------------------------------------------------------------- +// extInvOption — functional options for extInvEnv configuration +// --------------------------------------------------------------------------- + +type extInvOption func(*extInvConfig) + +type extInvConfig struct { + enableHeaderPrefix bool + headerHash uint64 + l2KeyInterceptor func(context.Context, string, L2CacheKeyInterceptorInfo) string + disableL2 bool +} + +// withExtInvHeaderPrefix enables IncludeSubgraphHeaderPrefix on the entity cache config +// and fetch configuration, and sets up a mockSubgraphHeadersBuilder with the given hash. +func withExtInvHeaderPrefix(hash uint64) extInvOption { + return func(c *extInvConfig) { + c.enableHeaderPrefix = true + c.headerHash = hash + } +} + +// withExtInvInterceptor sets an L2CacheKeyInterceptor on the caching options. +func withExtInvInterceptor(fn func(context.Context, string, L2CacheKeyInterceptorInfo) string) extInvOption { + return func(c *extInvConfig) { + c.l2KeyInterceptor = fn + } +} + +// withExtInvL2Disabled disables L2 caching. +func withExtInvL2Disabled() extInvOption { + return func(c *extInvConfig) { + c.disableL2 = true + } +} + +// --------------------------------------------------------------------------- +// extInvEnv — test environment for extensions cache invalidation unit tests +// --------------------------------------------------------------------------- + +// extInvEnv encapsulates all test infrastructure for a single invalidation test. +// Tests only need to specify the entity response (with/without extensions) and +// any configuration options — all boilerplate is handled here. +type extInvEnv struct { + t *testing.T + loader *Loader + ctx *Context + response *GraphQLResponse + cache *FakeLoaderCache +} + +// newExtInvEnv creates a standard test environment: one root fetch returning +// User:1, one entity fetch returning the given entityResponse. +func newExtInvEnv(t *testing.T, entityResponse string, opts ...extInvOption) *extInvEnv { + t.Helper() + + var cfg extInvConfig + for _, opt := range opts { + opt(&cfg) + } + + ctrl := gomock.NewController(t) + t.Cleanup(ctrl.Finish) + + cache := NewFakeLoaderCache() + + rootDS := NewMockDataSource(ctrl) + rootDS.EXPECT(). + Load(gomock.Any(), gomock.Any(), gomock.Any()). + DoAndReturn(func(_ context.Context, _ any, _ []byte) ([]byte, error) { + return []byte(`{"data":{"user":{"__typename":"User","id":"1"}}}`), nil + }).Times(1) + + entityDS := NewMockDataSource(ctrl) + entityDS.EXPECT(). + Load(gomock.Any(), gomock.Any(), gomock.Any()). + DoAndReturn(func(_ context.Context, _ any, _ []byte) ([]byte, error) { + return []byte(entityResponse), nil + }).Times(1) + + response := &GraphQLResponse{ + Info: &GraphQLResponseInfo{OperationType: ast.OperationTypeQuery}, + Fetches: Sequence( + SingleWithPath(&SingleFetch{ + FetchConfiguration: FetchConfiguration{ + DataSource: rootDS, + PostProcessing: PostProcessingConfiguration{ + SelectResponseDataPath: []string{"data"}, + }, + }, + InputTemplate: InputTemplate{ + Segments: []TemplateSegment{ + {Data: []byte(`{"method":"POST","url":"http://root.service","body":{"query":"{user {__typename id}}"}}`), SegmentType: StaticSegmentType}, + }, + }, + DataSourceIdentifier: []byte("graphql_datasource.Source"), + }, "query"), + SingleWithPath(&SingleFetch{ + FetchConfiguration: FetchConfiguration{ + DataSource: entityDS, + PostProcessing: PostProcessingConfiguration{ + SelectResponseDataPath: []string{"data", "_entities", "0"}, + }, + Caching: FetchCacheConfiguration{ + Enabled: true, + CacheName: "default", + TTL: 30 * time.Second, + CacheKeyTemplate: newUserCacheKeyTemplate(), + UseL1Cache: true, + IncludeSubgraphHeaderPrefix: cfg.enableHeaderPrefix, + }, + }, + InputTemplate: InputTemplate{Segments: newUserEntityFetchSegments()}, + Info: &FetchInfo{ + DataSourceID: "accounts", + DataSourceName: "accounts", + OperationType: ast.OperationTypeQuery, + ProvidesData: newUserProvidesData(), + }, + DataSourceIdentifier: []byte("graphql_datasource.Source"), + }, "query.user", ObjectPath("user")), + ), + Data: &Object{ + Fields: []*Field{ + { + Name: []byte("user"), + Value: &Object{ + Path: []string{"user"}, + Fields: []*Field{ + {Name: []byte("id"), Value: &String{Path: []string{"id"}}}, + {Name: []byte("username"), Value: &String{Path: []string{"username"}}}, + }, + }, + }, + }, + }, + } + + loader := &Loader{ + caches: map[string]LoaderCache{"default": cache}, + entityCacheConfigs: map[string]map[string]*EntityCacheInvalidationConfig{ + "accounts": { + "User": {CacheName: "default", IncludeSubgraphHeaderPrefix: cfg.enableHeaderPrefix}, + }, + }, + } + + ctx := NewContext(t.Context()) + ctx.ExecutionOptions.DisableSubgraphRequestDeduplication = true + ctx.ExecutionOptions.Caching.EnableL1Cache = true + ctx.ExecutionOptions.Caching.EnableL2Cache = !cfg.disableL2 + + if cfg.enableHeaderPrefix { + ctx.SubgraphHeadersBuilder = &mockSubgraphHeadersBuilder{ + hashes: map[string]uint64{"accounts": cfg.headerHash}, + } + } + if cfg.l2KeyInterceptor != nil { + ctx.ExecutionOptions.Caching.L2CacheKeyInterceptor = cfg.l2KeyInterceptor + } + + return &extInvEnv{ + t: t, + loader: loader, + ctx: ctx, + response: response, + cache: cache, + } +} + +// run executes the loader and returns the GraphQL response string. +func (e *extInvEnv) run() string { + e.t.Helper() + ar := arena.NewMonotonicArena(arena.WithMinBufferSize(1024)) + resolvable := NewResolvable(ar, ResolvableOptions{}) + err := resolvable.Init(e.ctx, nil, ast.OperationTypeQuery) + require.NoError(e.t, err) + + err = e.loader.LoadGraphQLResponseData(e.ctx, e.response, resolvable) + require.NoError(e.t, err) + + return fastjsonext.PrintGraphQLResponse(resolvable.data, resolvable.errors) +} + +// deleteKeys returns all keys that were passed to cache.Delete() calls. +func (e *extInvEnv) deleteKeys() []string { + var keys []string + for _, entry := range e.cache.GetLog() { + if entry.Operation == "delete" { + keys = append(keys, entry.Keys...) + } + } + return keys +} + +// hasDeletes returns true if any cache.Delete() calls were recorded. +func (e *extInvEnv) hasDeletes() bool { + for _, entry := range e.cache.GetLog() { + if entry.Operation == "delete" { + return true + } + } + return false +} + +// --------------------------------------------------------------------------- +// mockSubgraphHeadersBuilder — test mock for SubgraphHeadersBuilder +// --------------------------------------------------------------------------- + +type mockSubgraphHeadersBuilder struct { + hashes map[string]uint64 +} + +func (m *mockSubgraphHeadersBuilder) HeadersForSubgraph(subgraphName string) (http.Header, uint64) { + return nil, m.hashes[subgraphName] +} + +func (m *mockSubgraphHeadersBuilder) HashAll() uint64 { + return 0 +} + +var _ SubgraphHeadersBuilder = (*mockSubgraphHeadersBuilder)(nil) diff --git a/v2/pkg/engine/resolve/extensions_cache_invalidation_test.go b/v2/pkg/engine/resolve/extensions_cache_invalidation_test.go new file mode 100644 index 0000000000..439897ca56 --- /dev/null +++ b/v2/pkg/engine/resolve/extensions_cache_invalidation_test.go @@ -0,0 +1,196 @@ +package resolve + +import ( + "context" + "testing" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +func TestExtensionsCacheInvalidation(t *testing.T) { + // ------------------------------------------------------------------------- + // Delete-before-set optimization: when the invalidated entity is the SAME + // entity being fetched, the L2 delete is skipped because updateL2Cache + // will immediately set it with fresh data. + // ------------------------------------------------------------------------- + + t.Run("same entity fetched and invalidated — delete skipped", func(t *testing.T) { + // User:1 is fetched AND invalidated in the same response. + // updateL2Cache will set User:1, so the delete is redundant and skipped. + env := newExtInvEnv(t, + `{"data":{"_entities":[{"__typename":"User","id":"1","username":"Alice"}]},"extensions":{"cacheInvalidation":{"keys":[{"typename":"User","key":{"id":"1"}}]}}}`, + ) + env.run() + assert.False(t, env.hasDeletes(), "delete skipped — same key about to be set by updateL2Cache") + }) + + t.Run("same entity with header prefix — delete still skipped", func(t *testing.T) { + // Same optimization applies even when keys are prefixed (e.g. "33333:User:1"). + // Both the invalidation key and the L2 set key go through the same prefix transform. + env := newExtInvEnv(t, + `{"data":{"_entities":[{"__typename":"User","id":"1","username":"Alice"}]},"extensions":{"cacheInvalidation":{"keys":[{"typename":"User","key":{"id":"1"}}]}}}`, + withExtInvHeaderPrefix(33333), + ) + env.run() + assert.False(t, env.hasDeletes(), "delete skipped — prefixed key also about to be set") + }) + + t.Run("same entity with L2CacheKeyInterceptor — delete still skipped", func(t *testing.T) { + // Same optimization applies when keys are transformed by an interceptor. + env := newExtInvEnv(t, + `{"data":{"_entities":[{"__typename":"User","id":"1","username":"Alice"}]},"extensions":{"cacheInvalidation":{"keys":[{"typename":"User","key":{"id":"1"}}]}}}`, + withExtInvInterceptor(func(_ context.Context, key string, _ L2CacheKeyInterceptorInfo) string { + return "tenant-X:" + key + }), + ) + env.run() + assert.False(t, env.hasDeletes(), "delete skipped — intercepted key also about to be set") + }) + + t.Run("same entity with both prefix and interceptor — delete still skipped", func(t *testing.T) { + // Both transforms applied: prefix + interceptor. Delete is still redundant. + env := newExtInvEnv(t, + `{"data":{"_entities":[{"__typename":"User","id":"1","username":"Alice"}]},"extensions":{"cacheInvalidation":{"keys":[{"typename":"User","key":{"id":"1"}}]}}}`, + withExtInvHeaderPrefix(33333), + withExtInvInterceptor(func(_ context.Context, key string, _ L2CacheKeyInterceptorInfo) string { + return "tenant-X:" + key + }), + ) + env.run() + assert.False(t, env.hasDeletes(), "delete skipped — both prefix and interceptor applied, key still about to be set") + }) + + // ------------------------------------------------------------------------- + // Different entity invalidated: the delete MUST happen because the key + // being invalidated is NOT the same key being set by updateL2Cache. + // ------------------------------------------------------------------------- + + t.Run("different entity invalidated — only that entity deleted", func(t *testing.T) { + // Invalidation targets User:1 (same as fetched → skipped) AND User:2 (different → deleted). + // This proves the optimization is per-key, not all-or-nothing. + env := newExtInvEnv(t, + `{"data":{"_entities":[{"__typename":"User","id":"1","username":"Alice"}]},"extensions":{"cacheInvalidation":{"keys":[{"typename":"User","key":{"id":"1"}},{"typename":"User","key":{"id":"2"}}]}}}`, + ) + env.run() + + deleteKeys := env.deleteKeys() + require.Len(t, deleteKeys, 1, "User:1 skipped (about to be set), User:2 deleted") + assert.Equal(t, `{"__typename":"User","key":{"id":"2"}}`, deleteKeys[0]) + }) + + t.Run("composite key fields — different key shape is not skipped", func(t *testing.T) { + // Invalidation key has composite fields {id:"1", orgId:"42"} which differs + // from the fetched entity key {id:"1"}. No match → delete happens. + env := newExtInvEnv(t, + `{"data":{"_entities":[{"__typename":"User","id":"1","username":"Alice"}]},"extensions":{"cacheInvalidation":{"keys":[{"typename":"User","key":{"id":"1","orgId":"42"}}]}}}`, + ) + env.run() + + deleteKeys := env.deleteKeys() + require.Len(t, deleteKeys, 1, "composite key differs from fetch key — delete not skipped") + assert.Equal(t, `{"__typename":"User","key":{"id":"1","orgId":"42"}}`, deleteKeys[0]) + }) + + // ------------------------------------------------------------------------- + // No-op cases: various scenarios where no delete should happen. + // ------------------------------------------------------------------------- + + t.Run("no extensions in response — no delete", func(t *testing.T) { + // Response has no extensions at all. Nothing to invalidate. + env := newExtInvEnv(t, + `{"data":{"_entities":[{"__typename":"User","id":"1","username":"Alice"}]}}`, + ) + env.run() + assert.False(t, env.hasDeletes(), "no extensions → no invalidation") + }) + + t.Run("extensions without cacheInvalidation key — no delete", func(t *testing.T) { + // Extensions present but contain only tracing data, not cacheInvalidation. + env := newExtInvEnv(t, + `{"data":{"_entities":[{"__typename":"User","id":"1","username":"Alice"}]},"extensions":{"tracing":{"version":1}}}`, + ) + env.run() + assert.False(t, env.hasDeletes(), "no cacheInvalidation key → no invalidation") + }) + + t.Run("empty keys array — no delete", func(t *testing.T) { + // cacheInvalidation present but keys array is empty. + env := newExtInvEnv(t, + `{"data":{"_entities":[{"__typename":"User","id":"1","username":"Alice"}]},"extensions":{"cacheInvalidation":{"keys":[]}}}`, + ) + env.run() + assert.False(t, env.hasDeletes(), "empty keys array → no invalidation") + }) + + t.Run("unknown typename — silently skipped, no delete", func(t *testing.T) { + // Typename "UnknownType" has no entity cache config → skipped. + env := newExtInvEnv(t, + `{"data":{"_entities":[{"__typename":"User","id":"1","username":"Alice"}]},"extensions":{"cacheInvalidation":{"keys":[{"typename":"UnknownType","key":{"id":"1"}}]}}}`, + ) + env.run() + assert.False(t, env.hasDeletes(), "unknown typename has no cache config → skipped") + }) + + t.Run("L2 cache disabled — no delete", func(t *testing.T) { + // With L2 disabled, processExtensionsCacheInvalidation returns early. + env := newExtInvEnv(t, + `{"data":{"_entities":[{"__typename":"User","id":"1","username":"Alice"}]},"extensions":{"cacheInvalidation":{"keys":[{"typename":"User","key":{"id":"1"}}]}}}`, + withExtInvL2Disabled(), + ) + env.run() + assert.False(t, env.hasDeletes(), "L2 disabled → invalidation skipped entirely") + }) + + // ------------------------------------------------------------------------- + // Malformed extensions: gracefully handled, no panics, no deletes. + // ------------------------------------------------------------------------- + + t.Run("malformed — keys not an array", func(t *testing.T) { + env := newExtInvEnv(t, + `{"data":{"_entities":[{"__typename":"User","id":"1","username":"Alice"}]},"extensions":{"cacheInvalidation":{"keys":"invalid"}}}`, + ) + env.run() + assert.False(t, env.hasDeletes(), "malformed keys field → gracefully ignored") + }) + + t.Run("malformed — entry missing typename", func(t *testing.T) { + env := newExtInvEnv(t, + `{"data":{"_entities":[{"__typename":"User","id":"1","username":"Alice"}]},"extensions":{"cacheInvalidation":{"keys":[{"key":{"id":"1"}}]}}}`, + ) + env.run() + assert.False(t, env.hasDeletes(), "missing typename → entry skipped") + }) + + t.Run("malformed — entry missing key", func(t *testing.T) { + env := newExtInvEnv(t, + `{"data":{"_entities":[{"__typename":"User","id":"1","username":"Alice"}]},"extensions":{"cacheInvalidation":{"keys":[{"typename":"User"}]}}}`, + ) + env.run() + assert.False(t, env.hasDeletes(), "missing key → entry skipped") + }) + + // ------------------------------------------------------------------------- + // Interceptor metadata: verify the L2CacheKeyInterceptor receives correct + // SubgraphName and CacheName for both regular cache operations and + // invalidation key construction. + // ------------------------------------------------------------------------- + + t.Run("interceptor receives correct SubgraphName and CacheName", func(t *testing.T) { + // The interceptor is called twice: once for the L2 cache set (regular flow) + // and once for the invalidation key construction. + var capturedInfos []L2CacheKeyInterceptorInfo + env := newExtInvEnv(t, + `{"data":{"_entities":[{"__typename":"User","id":"1","username":"Alice"}]},"extensions":{"cacheInvalidation":{"keys":[{"typename":"User","key":{"id":"1"}}]}}}`, + withExtInvInterceptor(func(_ context.Context, key string, info L2CacheKeyInterceptorInfo) string { + capturedInfos = append(capturedInfos, info) + return key + }), + ) + env.run() + + require.Len(t, capturedInfos, 2, "interceptor called for L2 set + invalidation key") + assert.Equal(t, L2CacheKeyInterceptorInfo{SubgraphName: "accounts", CacheName: "default"}, capturedInfos[0]) + assert.Equal(t, L2CacheKeyInterceptorInfo{SubgraphName: "accounts", CacheName: "default"}, capturedInfos[1]) + }) +} diff --git a/v2/pkg/engine/resolve/loader.go b/v2/pkg/engine/resolve/loader.go index fe0af7aade..ec4c2dd689 100644 --- a/v2/pkg/engine/resolve/loader.go +++ b/v2/pkg/engine/resolve/loader.go @@ -216,6 +216,10 @@ type Loader struct { caches map[string]LoaderCache + // entityCacheConfigs maps subgraphName → entityTypeName → config. + // Used by processExtensionsCacheInvalidation to look up cache settings at runtime. + entityCacheConfigs map[string]map[string]*EntityCacheInvalidationConfig + propagateSubgraphErrors bool propagateSubgraphStatusCodes bool subgraphErrorPropagationMode SubgraphErrorPropagationMode @@ -727,6 +731,10 @@ func (l *Loader) mergeResult(fetchItem *FetchItem, res *result, items []*astjson return l.renderErrorsFailedToFetch(fetchItem, res, invalidGraphQLResponse) } + // Extract cache invalidation signal from subgraph response extensions. + // This is not restricted to mutations — any subgraph response can signal invalidation. + cacheInvalidation := response.Get("extensions", "cacheInvalidation") + var responseData *astjson.Value if res.postProcessing.SelectResponseDataPath != nil { responseData = response.Get(res.postProcessing.SelectResponseDataPath...) @@ -795,9 +803,11 @@ func (l *Loader) mergeResult(fetchItem *FetchItem, res *result, items []*astjson return l.renderErrorsFailedToFetch(fetchItem, res, invalidGraphQLResponseShape) } l.resolvable.data = responseData + // Always run invalidation, even on partial-error responses. + l.runCacheInvalidation(fetchItem, res, responseData, cacheInvalidation) // Only populate caches on success (no errors) if !hasErrors { - l.populateCachesAfterFetch(fetchItem, res, items, responseData) + l.populateCachesAfterFetch(fetchItem, res, items, responseData, cacheInvalidation) } return nil } @@ -820,10 +830,12 @@ func (l *Loader) mergeResult(fetchItem *FetchItem, res *result, items []*astjson if len(res.l2CacheKeys) > 0 && res.l2CacheKeys[0] != nil { res.l2CacheKeys[0].Item = items[0] } + // Always run invalidation, even on partial-error responses. + l.runCacheInvalidation(fetchItem, res, responseData, cacheInvalidation) // Only populate caches on success (no errors) if !hasErrors { defer func() { - l.populateCachesAfterFetch(fetchItem, res, items, responseData) + l.populateCachesAfterFetch(fetchItem, res, items, responseData, cacheInvalidation) }() } return nil @@ -877,9 +889,11 @@ func (l *Loader) mergeResult(fetchItem *FetchItem, res *result, items []*astjson } } } + // Always run invalidation, even on partial-error responses. + l.runCacheInvalidation(fetchItem, res, responseData, cacheInvalidation) // Only populate caches on success (no errors) if !hasErrors { - l.populateCachesAfterFetch(fetchItem, res, items, responseData) + l.populateCachesAfterFetch(fetchItem, res, items, responseData, cacheInvalidation) } return nil } @@ -909,18 +923,32 @@ func (l *Loader) mergeResult(fetchItem *FetchItem, res *result, items []*astjson } } + // Always run invalidation, even on partial-error responses. + l.runCacheInvalidation(fetchItem, res, responseData, cacheInvalidation) // Only populate caches on success (no errors) if !hasErrors { - l.populateCachesAfterFetch(fetchItem, res, items, responseData) + l.populateCachesAfterFetch(fetchItem, res, items, responseData, cacheInvalidation) } return nil } -// populateCachesAfterFetch runs shadow comparison, mutation impact detection, -// and L1/L2 cache population. Called after a successful (error-free) fetch merge. -func (l *Loader) populateCachesAfterFetch(fetchItem *FetchItem, res *result, items []*astjson.Value, responseData *astjson.Value) { - l.compareShadowValues(res, getFetchInfo(fetchItem.Fetch)) - l.detectMutationEntityImpact(res, getFetchInfo(fetchItem.Fetch), responseData) +// runCacheInvalidation runs mutation entity impact detection and extensions-based +// cache invalidation. It is intentionally separated from populateCachesAfterFetch +// so it can be called unconditionally, even when the subgraph response contains errors. +func (l *Loader) runCacheInvalidation(fetchItem *FetchItem, res *result, responseData *astjson.Value, cacheInvalidation *astjson.Value) { + info := getFetchInfo(fetchItem.Fetch) + deletedKeys := l.detectMutationEntityImpact(res, info, responseData) + l.processExtensionsCacheInvalidation(res, cacheInvalidation, deletedKeys) +} + +// populateCachesAfterFetch runs shadow comparison and L1/L2 cache population. +// Called after a successful (error-free) fetch merge. +// +// Invalidation (detectMutationEntityImpact + processExtensionsCacheInvalidation) is +// called via runCacheInvalidation at each call site unconditionally before this function. +func (l *Loader) populateCachesAfterFetch(fetchItem *FetchItem, res *result, items []*astjson.Value, responseData *astjson.Value, cacheInvalidation *astjson.Value) { + info := getFetchInfo(fetchItem.Fetch) + l.compareShadowValues(res, info) l.populateL1Cache(fetchItem, res, items) l.updateL2Cache(res) } diff --git a/v2/pkg/engine/resolve/loader_cache.go b/v2/pkg/engine/resolve/loader_cache.go index d6cd903572..f801e3614c 100644 --- a/v2/pkg/engine/resolve/loader_cache.go +++ b/v2/pkg/engine/resolve/loader_cache.go @@ -20,6 +20,15 @@ type CacheEntry struct { RemainingTTL time.Duration // remaining TTL from cache (0 = unknown/not supported) } +// EntityCacheInvalidationConfig holds the minimal cache settings needed to build +// invalidation keys for a specific entity type on a specific subgraph. +// Separate from plan.EntityCacheConfiguration to avoid a resolve → plan dependency; +// only CacheName and IncludeSubgraphHeaderPrefix are needed at invalidation time. +type EntityCacheInvalidationConfig struct { + CacheName string + IncludeSubgraphHeaderPrefix bool +} + type LoaderCache interface { Get(ctx context.Context, keys []string) ([]*CacheEntry, error) Set(ctx context.Context, entries []*CacheEntry, ttl time.Duration) error @@ -966,29 +975,29 @@ func (l *Loader) compareShadowValues(res *result, info *FetchInfo) { // detectMutationEntityImpact checks if a mutation response contains a cached entity // and either invalidates (deletes) the L2 cache entry or compares it for staleness analytics. // Called from mergeResult on the main thread after the mutation fetch completes. -func (l *Loader) detectMutationEntityImpact(res *result, info *FetchInfo, responseData *astjson.Value) { +func (l *Loader) detectMutationEntityImpact(res *result, info *FetchInfo, responseData *astjson.Value) map[string]struct{} { if info == nil || info.OperationType != ast.OperationTypeMutation { - return + return nil } cfg := res.cacheConfig.MutationEntityImpactConfig if cfg == nil { - return + return nil } // Proceed if invalidation is configured or analytics is enabled if !cfg.InvalidateCache && !l.ctx.cacheAnalyticsEnabled() { - return + return nil } if info.ProvidesData == nil || len(info.RootFields) == 0 { - return + return nil } // Get the LoaderCache for this entity's cache name if l.caches == nil { - return + return nil } cache := l.caches[cfg.CacheName] if cache == nil { - return + return nil } mutationFieldName := info.RootFields[0].FieldName @@ -997,7 +1006,7 @@ func (l *Loader) detectMutationEntityImpact(res *result, info *FetchInfo, respon // For root mutation: responseData = {"updateUsername": {"id":"1234","username":"UpdatedMe"}} entityData := responseData.Get(mutationFieldName) if entityData == nil || entityData.Type() != astjson.TypeObject { - return + return nil } // Navigate ProvidesData to the entity level. @@ -1005,23 +1014,31 @@ func (l *Loader) detectMutationEntityImpact(res *result, info *FetchInfo, respon // We need the inner Object that describes the entity's fields. entityProvidesData := navigateProvidesDataToField(info.ProvidesData, mutationFieldName) if entityProvidesData == nil { - return + return nil } // Build L2 cache key for lookup cacheKey := l.buildMutationEntityCacheKey(cfg, entityData, info) if cacheKey == "" { - return + return nil + } + + // Read cached value for analytics BEFORE deleting, so analytics sees the real pre-delete value. + var analyticsEntries []*CacheEntry + if l.ctx.cacheAnalyticsEnabled() { + analyticsEntries, _ = cache.Get(l.ctx.ctx, []string{cacheKey}) } // Invalidate L2 cache entry if configured + var deletedKeys map[string]struct{} if cfg.InvalidateCache { _ = cache.Delete(l.ctx.ctx, []string{cacheKey}) + deletedKeys = map[string]struct{}{cacheKey: {}} } // Analytics comparison requires cacheAnalytics to be enabled if !l.ctx.cacheAnalyticsEnabled() { - return + return deletedKeys } // Build display key (without prefix) for analytics @@ -1035,9 +1052,8 @@ func (l *Loader) detectMutationEntityImpact(res *result, info *FetchInfo, respon _, _ = xxh.Write(freshBytes) freshHash := xxh.Sum64() - // Look up L2 cache - entries, err := cache.Get(l.ctx.ctx, []string{cacheKey}) - hadCachedValue := err == nil && len(entries) > 0 && entries[0] != nil && len(entries[0].Value) > 0 + // Use the pre-delete cached value for analytics comparison + hadCachedValue := len(analyticsEntries) > 0 && analyticsEntries[0] != nil && len(analyticsEntries[0].Value) > 0 if !hadCachedValue { // No cached value — record event showing entity was returned but not previously cached @@ -1050,13 +1066,13 @@ func (l *Loader) detectMutationEntityImpact(res *result, info *FetchInfo, respon FreshHash: freshHash, FreshBytes: len(freshBytes), }) - return + return deletedKeys } // Parse cached value and compare - cachedValue, parseErr := astjson.ParseBytesWithArena(l.jsonArena, entries[0].Value) + cachedValue, parseErr := astjson.ParseBytesWithArena(l.jsonArena, analyticsEntries[0].Value) if parseErr != nil { - return + return deletedKeys } cachedProvides := l.shallowCopyProvidedFields(cachedValue, entityProvidesData) @@ -1076,6 +1092,7 @@ func (l *Loader) detectMutationEntityImpact(res *result, info *FetchInfo, respon CachedBytes: len(cachedBytes), FreshBytes: len(freshBytes), }) + return deletedKeys } // buildMutationEntityCacheKey builds the L2 cache key for a mutation-returned entity. @@ -1134,6 +1151,182 @@ func buildEntityKeyValue(a arena.Arena, data *astjson.Value, keyFields []KeyFiel return obj } +// processExtensionsCacheInvalidation handles cache invalidation signals from subgraph response extensions. +// +// Subgraphs can signal cache invalidation by including an extensions field in their response: +// +// {"extensions": {"cacheInvalidation": {"keys": [{"typename": "User", "key": {"id": "1"}}]}}} +// +// This function parses the keys array and deletes the corresponding L2 cache entries. +// Works for both query and mutation responses — not restricted to mutations. +// +// The cache key construction pipeline mirrors the storage pipeline: +// +// typename + key fields → build JSON → apply header prefix → apply interceptor → cache.Delete() +func (l *Loader) processExtensionsCacheInvalidation(res *result, cacheInvalidation *astjson.Value, deletedKeys map[string]struct{}) { + // No invalidation data in the response extensions. + if cacheInvalidation == nil { + return + } + // Extensions-based invalidation only applies when L2 caching is enabled, + // since L2 is the cross-request cache that benefits from explicit invalidation. + if !l.ctx.ExecutionOptions.Caching.EnableL2Cache { + return + } + // entityCacheConfigs maps subgraph name → entity type → config (CacheName, IncludeSubgraphHeaderPrefix). + // Without this mapping, we don't know which cache to delete from or how to build the key. + if l.entityCacheConfigs == nil || l.caches == nil { + return + } + + // Extract the "keys" array from the cacheInvalidation object. + // Each entry has {"typename": "User", "key": {"id": "1"}}. + keysArray := cacheInvalidation.GetArray("keys") + if len(keysArray) == 0 { + return + } + + // Look up the entity cache config for the responding subgraph. + // The subgraph that sent the invalidation signal is the same one whose entity configs we use, + // because in federation, the subgraph that caches an entity is the one that resolves it. + subgraphName := res.ds.Name + subgraphConfigs := l.entityCacheConfigs[subgraphName] + if subgraphConfigs == nil { + return + } + + // Build set of L2 keys that updateL2Cache will set after this function returns. + // Deleting a key that's about to be re-set with fresh data is redundant. + keysAboutToBeSet := l.l2KeysAboutToBeSet(res) + + // Group invalidation keys by cache name so we can batch-delete per cache instance. + type cacheDeleteBatch struct { + cache LoaderCache + keys []string + } + batches := map[string]*cacheDeleteBatch{} + + for _, entry := range keysArray { + // Skip malformed entries (must be JSON objects). + if entry == nil || entry.Type() != astjson.TypeObject { + continue + } + + // Extract "typename" (string) and "key" (JSON object) from each invalidation entry. + typenameVal := entry.Get("typename") + keyVal := entry.Get("key") + if typenameVal == nil || keyVal == nil || keyVal.Type() != astjson.TypeObject { + continue + } + typename := string(typenameVal.GetStringBytes()) + if typename == "" { + continue + } + + // Look up the entity cache config for this typename from the responding subgraph. + // This tells us which cache instance to use and whether to apply header prefix. + // Unknown typenames are silently skipped — the subgraph may send invalidation + // for types that aren't configured for caching on this router. + entityConfig := subgraphConfigs[typename] + if entityConfig == nil { + continue + } + + // Resolve the cache instance by name. + cache := l.caches[entityConfig.CacheName] + if cache == nil { + continue + } + + // Build the base cache key JSON matching the format used during cache population: + // {"__typename":"User","key":{"id":"1"}} + // The "key" value is taken directly from the extensions — it's already a JSON object + // with the entity's @key field values. + keyObj := astjson.ObjectValue(l.jsonArena) + keyObj.Set(l.jsonArena, "__typename", astjson.StringValue(l.jsonArena, typename)) + keyObj.Set(l.jsonArena, "key", keyVal) + baseKey := string(keyObj.MarshalTo(nil)) + cacheKey := baseKey + + // Apply subgraph header prefix if configured for this entity type. + // This mirrors prepareCacheKeys() which prefixes L2 keys with a hash of the + // HTTP headers sent to the subgraph, enabling per-tenant cache isolation. + // Result: "55555:{"__typename":"User","key":{"id":"1"}}" + if entityConfig.IncludeSubgraphHeaderPrefix && l.ctx.SubgraphHeadersBuilder != nil { + _, headersHash := l.ctx.SubgraphHeadersBuilder.HeadersForSubgraph(subgraphName) + var buf [20]byte + b := strconv.AppendUint(buf[:0], headersHash, 10) + cacheKey = string(b) + ":" + cacheKey + } + + // Apply user-provided L2 cache key interceptor if set. + // This allows user-defined key transformations (e.g., tenant isolation prefixes) + // and mirrors the same interceptor applied during cache population. + if interceptor := l.ctx.ExecutionOptions.Caching.L2CacheKeyInterceptor; interceptor != nil { + cacheKey = interceptor(l.ctx.ctx, cacheKey, L2CacheKeyInterceptorInfo{ + SubgraphName: subgraphName, + CacheName: entityConfig.CacheName, + }) + } + + // Skip L2 delete if: + // - already deleted by detectMutationEntityImpact (deduplication) + // - about to be re-set by updateL2Cache (redundant delete before set) + if _, alreadyDone := deletedKeys[cacheKey]; alreadyDone { + continue + } + if _, aboutToBeSet := keysAboutToBeSet[cacheKey]; aboutToBeSet { + continue + } + + // Accumulate the key into the batch for this cache name. + batch, ok := batches[entityConfig.CacheName] + if !ok { + batch = &cacheDeleteBatch{cache: cache} + batches[entityConfig.CacheName] = batch + } + batch.keys = append(batch.keys, cacheKey) + } + + // Execute batched L2 cache deletes — one Delete call per cache instance. + for _, batch := range batches { + _ = batch.cache.Delete(l.ctx.ctx, batch.keys) + } +} + +// l2KeysAboutToBeSet returns the set of L2 cache keys that updateL2Cache will store +// after the current fetch. Returns nil if updateL2Cache won't run (e.g., mutations +// without explicit L2 population, or no cache misses to populate). +func (l *Loader) l2KeysAboutToBeSet(res *result) map[string]struct{} { + // updateL2Cache skips for mutations unless L2 population is explicitly enabled. + if l.info != nil && l.info.OperationType == ast.OperationTypeMutation && + !l.enableMutationL2CachePopulation { + return nil + } + if res.cache == nil || !res.cacheMustBeUpdated { + return nil + } + keys := res.l2CacheKeys + if len(keys) == 0 { + keys = res.l1CacheKeys + } + if len(keys) == 0 { + return nil + } + set := make(map[string]struct{}, len(keys)) + for _, ck := range keys { + // Skip keys whose Item is nil — updateL2Cache won't store them + // (can happen if an entity failed to merge during batch processing). + if ck == nil || ck.Item == nil { + continue + } + for _, k := range ck.Keys { + set[k] = struct{}{} + } + } + return set +} + // navigateProvidesDataToField finds the Object within ProvidesData that corresponds // to a specific field name. For root mutations, ProvidesData describes the full response // (e.g., {updateUsername: {id, username}}) and we need the inner Object for comparison. diff --git a/v2/pkg/engine/resolve/resolve.go b/v2/pkg/engine/resolve/resolve.go index 258de42a81..789cedf5ac 100644 --- a/v2/pkg/engine/resolve/resolve.go +++ b/v2/pkg/engine/resolve/resolve.go @@ -192,6 +192,10 @@ type ResolverOptions struct { Caches map[string]LoaderCache + // EntityCacheConfigs maps subgraphName → entityTypeName → config. + // Used by extensions-based cache invalidation to look up cache settings at runtime. + EntityCacheConfigs map[string]map[string]*EntityCacheInvalidationConfig + // SubgraphRequestDeduplicationShardCount defines the number of shards to use for subgraph request deduplication SubgraphRequestDeduplicationShardCount int // InboundRequestDeduplicationShardCount defines the number of shards to use for inbound request deduplication @@ -326,6 +330,7 @@ func newTools(options ResolverOptions, allowedExtensionFields map[string]struct{ singleFlight: sf, jsonArena: a, caches: options.Caches, + entityCacheConfigs: options.EntityCacheConfigs, }, } }