diff --git a/execution/engine/config_factory_federation.go b/execution/engine/config_factory_federation.go index d3dda53aaa..d8e386b291 100644 --- a/execution/engine/config_factory_federation.go +++ b/execution/engine/config_factory_federation.go @@ -36,6 +36,7 @@ type SubgraphCachingConfig struct { EntityCaching plan.EntityCacheConfigurations // Caching config for entity types in this subgraph RootFieldCaching plan.RootFieldCacheConfigurations // Caching config for root fields in this subgraph SubscriptionEntityPopulation plan.SubscriptionEntityPopulationConfigurations // Caching config for subscription entity population/invalidation + MutationCacheInvalidation plan.MutationCacheInvalidationConfigurations // Caching config for mutation-triggered cache invalidation } // SubgraphCachingConfigs is a list of per-subgraph caching configurations. @@ -489,6 +490,7 @@ func (f *FederationEngineConfigFactory) dataSourceMetaData(in *nodev1.DataSource out.FederationMetaData.EntityCaching = subgraphCachingConfig.EntityCaching out.FederationMetaData.RootFieldCaching = subgraphCachingConfig.RootFieldCaching out.FederationMetaData.SubscriptionEntityPopulation = subgraphCachingConfig.SubscriptionEntityPopulation + out.FederationMetaData.MutationCacheInvalidation = subgraphCachingConfig.MutationCacheInvalidation } return out diff --git a/execution/engine/federation_caching_test.go b/execution/engine/federation_caching_test.go index 57ea5572f5..5386193e50 100644 --- a/execution/engine/federation_caching_test.go +++ b/execution/engine/federation_caching_test.go @@ -6543,6 +6543,149 @@ func TestMutationImpactE2E(t *testing.T) { }) } +func TestMutationCacheInvalidationE2E(t *testing.T) { + accounts.ResetUsers() + t.Cleanup(accounts.ResetUsers) + + // Configure entity caching for User AND mutation invalidation for updateUsername + subgraphCachingConfigs := engine.SubgraphCachingConfigs{ + { + SubgraphName: "accounts", + EntityCaching: plan.EntityCacheConfigurations{ + {TypeName: "User", CacheName: "default", TTL: 30 * time.Second}, + }, + MutationCacheInvalidation: plan.MutationCacheInvalidationConfigurations{ + {FieldName: "updateUsername"}, + }, + }, + } + + // Query that triggers entity caching for User via authorWithoutProvides (no @provides) + entityQuery := `query { topProducts { name reviews { body authorWithoutProvides { username } } } }` + mutationQuery := `mutation { updateUsername(id: "1234", newUsername: "UpdatedMe") { id username } }` + + t.Run("mutation deletes L2 cache entry", func(t *testing.T) { + accounts.ResetUsers() + defaultCache := NewFakeLoaderCache() + caches := map[string]resolve.LoaderCache{"default": defaultCache} + + tracker := newSubgraphCallTracker(http.DefaultTransport) + trackingClient := &http.Client{Transport: tracker} + + setup := federationtesting.NewFederationSetup(addCachingGateway( + withCachingEnableART(false), + withCachingLoaderCache(caches), + withHTTPClient(trackingClient), + withCachingOptionsFunc(resolve.CachingOptions{EnableL2Cache: true}), + withSubgraphEntityCachingConfigs(subgraphCachingConfigs), + )) + t.Cleanup(setup.Close) + + gqlClient := NewGraphqlClient(http.DefaultClient) + ctx, cancel := context.WithCancel(context.Background()) + t.Cleanup(cancel) + + accountsHost := mustParseHost(setup.AccountsUpstreamServer.URL) + + // Request 1: Query to populate L2 cache with User entity + tracker.Reset() + defaultCache.ClearLog() + resp := gqlClient.QueryString(ctx, setup.GatewayServer.URL, entityQuery, nil, t) + assert.Contains(t, string(resp), `"username":"Me"`) + assert.Equal(t, 1, tracker.GetCount(accountsHost), "should call accounts subgraph once to populate cache") + + // Request 2: Same query — should hit L2 cache, no accounts call + tracker.Reset() + defaultCache.ClearLog() + resp = gqlClient.QueryString(ctx, setup.GatewayServer.URL, entityQuery, nil, t) + assert.Contains(t, string(resp), `"username":"Me"`) + assert.Equal(t, 0, tracker.GetCount(accountsHost), "should NOT call accounts subgraph (L2 hit)") + + // Request 3: Mutation — should delete the L2 cache entry + tracker.Reset() + defaultCache.ClearLog() + respMut := gqlClient.QueryString(ctx, setup.GatewayServer.URL, mutationQuery, nil, t) + assert.Contains(t, string(respMut), `"UpdatedMe"`) + + // Verify the cache log contains a delete operation + mutationLog := defaultCache.GetLog() + hasDelete := false + for _, entry := range mutationLog { + if entry.Operation == "delete" { + hasDelete = true + assert.Equal(t, 1, len(entry.Keys), "delete should have exactly 1 key") + assert.Contains(t, entry.Keys[0], `"__typename":"User"`) + assert.Contains(t, entry.Keys[0], `"id":"1234"`) + } + } + assert.True(t, hasDelete, "mutation should trigger a cache delete operation") + + // Request 4: Same query again — should miss L2 (entry deleted), re-fetch from subgraph + tracker.Reset() + defaultCache.ClearLog() + resp = gqlClient.QueryString(ctx, setup.GatewayServer.URL, entityQuery, nil, t) + assert.Contains(t, string(resp), `"username":"UpdatedMe"`) + assert.Equal(t, 1, tracker.GetCount(accountsHost), "should call accounts subgraph again (L2 entry was deleted)") + }) + + t.Run("mutation without invalidation config does not delete", func(t *testing.T) { + accounts.ResetUsers() + defaultCache := NewFakeLoaderCache() + caches := map[string]resolve.LoaderCache{"default": defaultCache} + + // Config WITHOUT MutationCacheInvalidation + noInvalidationConfigs := engine.SubgraphCachingConfigs{ + { + SubgraphName: "accounts", + EntityCaching: plan.EntityCacheConfigurations{ + {TypeName: "User", CacheName: "default", TTL: 30 * time.Second}, + }, + // No MutationCacheInvalidation — mutation should NOT delete cache + }, + } + + tracker := newSubgraphCallTracker(http.DefaultTransport) + trackingClient := &http.Client{Transport: tracker} + + setup := federationtesting.NewFederationSetup(addCachingGateway( + withCachingEnableART(false), + withCachingLoaderCache(caches), + withHTTPClient(trackingClient), + withCachingOptionsFunc(resolve.CachingOptions{EnableL2Cache: true}), + withSubgraphEntityCachingConfigs(noInvalidationConfigs), + )) + t.Cleanup(setup.Close) + + gqlClient := NewGraphqlClient(http.DefaultClient) + ctx, cancel := context.WithCancel(context.Background()) + t.Cleanup(cancel) + + accountsHost := mustParseHost(setup.AccountsUpstreamServer.URL) + + // Request 1: Query to populate L2 cache + tracker.Reset() + resp := gqlClient.QueryString(ctx, setup.GatewayServer.URL, entityQuery, nil, t) + assert.Contains(t, string(resp), `"username":"Me"`) + + // Request 2: Mutation — should NOT delete L2 cache entry + tracker.Reset() + defaultCache.ClearLog() + respMut := gqlClient.QueryString(ctx, setup.GatewayServer.URL, mutationQuery, nil, t) + assert.Contains(t, string(respMut), `"UpdatedMe"`) + + // Verify no delete operation in cache log + mutationLog := defaultCache.GetLog() + for _, entry := range mutationLog { + assert.NotEqual(t, "delete", entry.Operation, "should not have any delete operations without invalidation config") + } + + // Request 3: Same query — should still hit L2 cache (stale but not deleted) + tracker.Reset() + _ = gqlClient.QueryString(ctx, setup.GatewayServer.URL, entityQuery, nil, t) + assert.Equal(t, 0, tracker.GetCount(accountsHost), "should NOT call accounts subgraph (L2 entry still present)") + }) +} + func mustParseHost(rawURL string) string { parsed, err := url.Parse(rawURL) if err != nil { diff --git a/v2/pkg/engine/plan/datasource_configuration.go b/v2/pkg/engine/plan/datasource_configuration.go index e51ea16fbf..c789f90f12 100644 --- a/v2/pkg/engine/plan/datasource_configuration.go +++ b/v2/pkg/engine/plan/datasource_configuration.go @@ -351,6 +351,10 @@ func (d *dataSourceConfiguration[T]) RootFieldCacheConfig(typeName, fieldName st return d.FederationMetaData.RootFieldCacheConfig(typeName, fieldName) } +func (d *dataSourceConfiguration[T]) MutationCacheInvalidationConfig(fieldName string) *MutationCacheInvalidationConfiguration { + return d.FederationMetaData.MutationCacheInvalidationConfig(fieldName) +} + func (d *dataSourceConfiguration[T]) Hash() DSHash { return d.hash } diff --git a/v2/pkg/engine/plan/federation_metadata.go b/v2/pkg/engine/plan/federation_metadata.go index 5fa0908fcf..56a8e98e31 100644 --- a/v2/pkg/engine/plan/federation_metadata.go +++ b/v2/pkg/engine/plan/federation_metadata.go @@ -17,6 +17,7 @@ type FederationMetaData struct { EntityCaching EntityCacheConfigurations RootFieldCaching RootFieldCacheConfigurations SubscriptionEntityPopulation SubscriptionEntityPopulationConfigurations + MutationCacheInvalidation MutationCacheInvalidationConfigurations entityTypeNames map[string]struct{} } @@ -31,6 +32,7 @@ type FederationInfo interface { EntityInterfaceNames() []string EntityCacheConfig(typeName string) *EntityCacheConfiguration RootFieldCacheConfig(typeName, fieldName string) *RootFieldCacheConfiguration + MutationCacheInvalidationConfig(fieldName string) *MutationCacheInvalidationConfiguration } func (d *FederationMetaData) HasKeyRequirement(typeName, requiresFields string) bool { @@ -237,6 +239,30 @@ func (c SubscriptionEntityPopulationConfigurations) FindByTypeName(typeName stri return nil } +// MutationCacheInvalidationConfiguration defines which mutation fields should +// invalidate (delete) L2 cache entries for the entity they return. +type MutationCacheInvalidationConfiguration struct { + // FieldName is the mutation field name (e.g., "updateUser", "deleteUser"). + FieldName string `json:"field_name"` + // EntityTypeName is the return entity type (e.g., "User"). + // If empty, it is inferred from the mutation return type at plan time. + EntityTypeName string `json:"entity_type_name,omitempty"` +} + +// MutationCacheInvalidationConfigurations is a collection of mutation cache invalidation configurations. +type MutationCacheInvalidationConfigurations []MutationCacheInvalidationConfiguration + +// FindByFieldName returns the invalidation config for the given mutation field. +// Returns nil if no configuration exists (no invalidation for this field). +func (c MutationCacheInvalidationConfigurations) FindByFieldName(fieldName string) *MutationCacheInvalidationConfiguration { + for i := range c { + if c[i].FieldName == fieldName { + return &c[i] + } + } + return nil +} + // EntityCacheConfig returns the cache configuration for the given entity type. // Returns nil if no configuration exists (caching should be disabled for this entity). func (d *FederationMetaData) EntityCacheConfig(typeName string) *EntityCacheConfiguration { @@ -249,6 +275,12 @@ func (d *FederationMetaData) RootFieldCacheConfig(typeName, fieldName string) *R return d.RootFieldCaching.FindByTypeAndField(typeName, fieldName) } +// MutationCacheInvalidationConfig returns the invalidation config for the given mutation field. +// Returns nil if no configuration exists (no invalidation for this field). +func (d *FederationMetaData) MutationCacheInvalidationConfig(fieldName string) *MutationCacheInvalidationConfiguration { + return d.MutationCacheInvalidation.FindByFieldName(fieldName) +} + type FederationFieldConfiguration struct { TypeName string `json:"type_name"` // TypeName is the name of the Entity the Fragment is for FieldName string `json:"field_name,omitempty"` // FieldName is empty for key requirements, otherwise, it is the name of the field that has requires or provides directive diff --git a/v2/pkg/engine/plan/visitor.go b/v2/pkg/engine/plan/visitor.go index 452cc30915..eec96e7d58 100644 --- a/v2/pkg/engine/plan/visitor.go +++ b/v2/pkg/engine/plan/visitor.go @@ -2410,6 +2410,13 @@ func (v *Visitor) configureMutationEntityImpact(internal *objectFetchConfigurati CacheName: entityCacheConfig.CacheName, IncludeSubgraphHeaderPrefix: entityCacheConfig.IncludeSubgraphHeaderPrefix, } + + // Check if this specific mutation field is configured for cache invalidation + if len(internal.rootFields) > 0 { + if fedConfig.MutationCacheInvalidationConfig(internal.rootFields[0].FieldName) != nil { + result.MutationEntityImpactConfig.InvalidateCache = true + } + } } // resolveMutationReturnType resolves the return type name of a mutation field definition. diff --git a/v2/pkg/engine/resolve/fetch.go b/v2/pkg/engine/resolve/fetch.go index 32b97dddff..9d1d859038 100644 --- a/v2/pkg/engine/resolve/fetch.go +++ b/v2/pkg/engine/resolve/fetch.go @@ -362,6 +362,9 @@ type MutationEntityImpactConfig struct { KeyFields []KeyField // [{Name: "id"}] CacheName string // "default" IncludeSubgraphHeaderPrefix bool + // InvalidateCache when true causes the L2 cache entry for this entity to be deleted + // after the mutation completes. Configured per mutation field via MutationCacheInvalidationConfiguration. + InvalidateCache bool } // FetchDependency explains how a GraphCoordinate depends on other GraphCoordinates from other fetches diff --git a/v2/pkg/engine/resolve/loader_cache.go b/v2/pkg/engine/resolve/loader_cache.go index 351500d737..b977f25602 100644 --- a/v2/pkg/engine/resolve/loader_cache.go +++ b/v2/pkg/engine/resolve/loader_cache.go @@ -945,19 +945,20 @@ func (l *Loader) compareShadowValues(res *result, info *FetchInfo) { } // detectMutationEntityImpact checks if a mutation response contains a cached entity -// and compares it with the L2 cache to detect staleness. +// and either invalidates (deletes) the L2 cache entry or compares it for staleness analytics. // Called from mergeResult on the main thread after the mutation fetch completes. func (l *Loader) detectMutationEntityImpact(res *result, info *FetchInfo, responseData *astjson.Value) { if info == nil || info.OperationType != ast.OperationTypeMutation { return } - if !l.ctx.cacheAnalyticsEnabled() { - return - } cfg := res.cacheConfig.MutationEntityImpactConfig if cfg == nil { return } + // Proceed if invalidation is configured or analytics is enabled + if !cfg.InvalidateCache && !l.ctx.cacheAnalyticsEnabled() { + return + } if info.ProvidesData == nil || len(info.RootFields) == 0 { return } @@ -994,6 +995,16 @@ func (l *Loader) detectMutationEntityImpact(res *result, info *FetchInfo, respon return } + // Invalidate L2 cache entry if configured + if cfg.InvalidateCache { + _ = cache.Delete(l.ctx.ctx, []string{cacheKey}) + } + + // Analytics comparison requires cacheAnalytics to be enabled + if !l.ctx.cacheAnalyticsEnabled() { + return + } + // Build display key (without prefix) for analytics displayKey := l.buildMutationEntityDisplayKey(cfg, entityData)