Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions execution/engine/config_factory_federation.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ type SubgraphCachingConfig struct {
EntityCaching plan.EntityCacheConfigurations // Caching config for entity types in this subgraph
RootFieldCaching plan.RootFieldCacheConfigurations // Caching config for root fields in this subgraph
SubscriptionEntityPopulation plan.SubscriptionEntityPopulationConfigurations // Caching config for subscription entity population/invalidation
MutationCacheInvalidation plan.MutationCacheInvalidationConfigurations // Caching config for mutation-triggered cache invalidation
}

// SubgraphCachingConfigs is a list of per-subgraph caching configurations.
Expand Down Expand Up @@ -489,6 +490,7 @@ func (f *FederationEngineConfigFactory) dataSourceMetaData(in *nodev1.DataSource
out.FederationMetaData.EntityCaching = subgraphCachingConfig.EntityCaching
out.FederationMetaData.RootFieldCaching = subgraphCachingConfig.RootFieldCaching
out.FederationMetaData.SubscriptionEntityPopulation = subgraphCachingConfig.SubscriptionEntityPopulation
out.FederationMetaData.MutationCacheInvalidation = subgraphCachingConfig.MutationCacheInvalidation
}

return out
Expand Down
143 changes: 143 additions & 0 deletions execution/engine/federation_caching_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6543,6 +6543,149 @@ func TestMutationImpactE2E(t *testing.T) {
})
}

func TestMutationCacheInvalidationE2E(t *testing.T) {
accounts.ResetUsers()
t.Cleanup(accounts.ResetUsers)

// Configure entity caching for User AND mutation invalidation for updateUsername
subgraphCachingConfigs := engine.SubgraphCachingConfigs{
{
SubgraphName: "accounts",
EntityCaching: plan.EntityCacheConfigurations{
{TypeName: "User", CacheName: "default", TTL: 30 * time.Second},
},
MutationCacheInvalidation: plan.MutationCacheInvalidationConfigurations{
{FieldName: "updateUsername"},
},
},
}

// Query that triggers entity caching for User via authorWithoutProvides (no @provides)
entityQuery := `query { topProducts { name reviews { body authorWithoutProvides { username } } } }`
mutationQuery := `mutation { updateUsername(id: "1234", newUsername: "UpdatedMe") { id username } }`

t.Run("mutation deletes L2 cache entry", func(t *testing.T) {
accounts.ResetUsers()
defaultCache := NewFakeLoaderCache()
caches := map[string]resolve.LoaderCache{"default": defaultCache}

tracker := newSubgraphCallTracker(http.DefaultTransport)
trackingClient := &http.Client{Transport: tracker}

setup := federationtesting.NewFederationSetup(addCachingGateway(
withCachingEnableART(false),
withCachingLoaderCache(caches),
withHTTPClient(trackingClient),
withCachingOptionsFunc(resolve.CachingOptions{EnableL2Cache: true}),
withSubgraphEntityCachingConfigs(subgraphCachingConfigs),
))
t.Cleanup(setup.Close)

gqlClient := NewGraphqlClient(http.DefaultClient)
ctx, cancel := context.WithCancel(context.Background())
t.Cleanup(cancel)

accountsHost := mustParseHost(setup.AccountsUpstreamServer.URL)

// Request 1: Query to populate L2 cache with User entity
tracker.Reset()
defaultCache.ClearLog()
resp := gqlClient.QueryString(ctx, setup.GatewayServer.URL, entityQuery, nil, t)
assert.Contains(t, string(resp), `"username":"Me"`)
assert.Equal(t, 1, tracker.GetCount(accountsHost), "should call accounts subgraph once to populate cache")

// Request 2: Same query — should hit L2 cache, no accounts call
tracker.Reset()
defaultCache.ClearLog()
resp = gqlClient.QueryString(ctx, setup.GatewayServer.URL, entityQuery, nil, t)
assert.Contains(t, string(resp), `"username":"Me"`)
assert.Equal(t, 0, tracker.GetCount(accountsHost), "should NOT call accounts subgraph (L2 hit)")

// Request 3: Mutation — should delete the L2 cache entry
tracker.Reset()
defaultCache.ClearLog()
respMut := gqlClient.QueryString(ctx, setup.GatewayServer.URL, mutationQuery, nil, t)
assert.Contains(t, string(respMut), `"UpdatedMe"`)

// Verify the cache log contains a delete operation
mutationLog := defaultCache.GetLog()
hasDelete := false
for _, entry := range mutationLog {
if entry.Operation == "delete" {
hasDelete = true
assert.Equal(t, 1, len(entry.Keys), "delete should have exactly 1 key")
assert.Contains(t, entry.Keys[0], `"__typename":"User"`)
assert.Contains(t, entry.Keys[0], `"id":"1234"`)
}
}
assert.True(t, hasDelete, "mutation should trigger a cache delete operation")

// Request 4: Same query again — should miss L2 (entry deleted), re-fetch from subgraph
tracker.Reset()
defaultCache.ClearLog()
resp = gqlClient.QueryString(ctx, setup.GatewayServer.URL, entityQuery, nil, t)
assert.Contains(t, string(resp), `"username":"UpdatedMe"`)
assert.Equal(t, 1, tracker.GetCount(accountsHost), "should call accounts subgraph again (L2 entry was deleted)")
})

t.Run("mutation without invalidation config does not delete", func(t *testing.T) {
accounts.ResetUsers()
defaultCache := NewFakeLoaderCache()
caches := map[string]resolve.LoaderCache{"default": defaultCache}

// Config WITHOUT MutationCacheInvalidation
noInvalidationConfigs := engine.SubgraphCachingConfigs{
{
SubgraphName: "accounts",
EntityCaching: plan.EntityCacheConfigurations{
{TypeName: "User", CacheName: "default", TTL: 30 * time.Second},
},
// No MutationCacheInvalidation — mutation should NOT delete cache
},
}

tracker := newSubgraphCallTracker(http.DefaultTransport)
trackingClient := &http.Client{Transport: tracker}

setup := federationtesting.NewFederationSetup(addCachingGateway(
withCachingEnableART(false),
withCachingLoaderCache(caches),
withHTTPClient(trackingClient),
withCachingOptionsFunc(resolve.CachingOptions{EnableL2Cache: true}),
withSubgraphEntityCachingConfigs(noInvalidationConfigs),
))
t.Cleanup(setup.Close)

gqlClient := NewGraphqlClient(http.DefaultClient)
ctx, cancel := context.WithCancel(context.Background())
t.Cleanup(cancel)

accountsHost := mustParseHost(setup.AccountsUpstreamServer.URL)

// Request 1: Query to populate L2 cache
tracker.Reset()
resp := gqlClient.QueryString(ctx, setup.GatewayServer.URL, entityQuery, nil, t)
assert.Contains(t, string(resp), `"username":"Me"`)

// Request 2: Mutation — should NOT delete L2 cache entry
tracker.Reset()
defaultCache.ClearLog()
respMut := gqlClient.QueryString(ctx, setup.GatewayServer.URL, mutationQuery, nil, t)
assert.Contains(t, string(respMut), `"UpdatedMe"`)

// Verify no delete operation in cache log
mutationLog := defaultCache.GetLog()
for _, entry := range mutationLog {
assert.NotEqual(t, "delete", entry.Operation, "should not have any delete operations without invalidation config")
}

// Request 3: Same query — should still hit L2 cache (stale but not deleted)
tracker.Reset()
_ = gqlClient.QueryString(ctx, setup.GatewayServer.URL, entityQuery, nil, t)
assert.Equal(t, 0, tracker.GetCount(accountsHost), "should NOT call accounts subgraph (L2 entry still present)")
})
}

func mustParseHost(rawURL string) string {
parsed, err := url.Parse(rawURL)
if err != nil {
Expand Down
4 changes: 4 additions & 0 deletions v2/pkg/engine/plan/datasource_configuration.go
Original file line number Diff line number Diff line change
Expand Up @@ -351,6 +351,10 @@ func (d *dataSourceConfiguration[T]) RootFieldCacheConfig(typeName, fieldName st
return d.FederationMetaData.RootFieldCacheConfig(typeName, fieldName)
}

func (d *dataSourceConfiguration[T]) MutationCacheInvalidationConfig(fieldName string) *MutationCacheInvalidationConfiguration {
return d.FederationMetaData.MutationCacheInvalidationConfig(fieldName)
}

func (d *dataSourceConfiguration[T]) Hash() DSHash {
return d.hash
}
Expand Down
32 changes: 32 additions & 0 deletions v2/pkg/engine/plan/federation_metadata.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ type FederationMetaData struct {
EntityCaching EntityCacheConfigurations
RootFieldCaching RootFieldCacheConfigurations
SubscriptionEntityPopulation SubscriptionEntityPopulationConfigurations
MutationCacheInvalidation MutationCacheInvalidationConfigurations

entityTypeNames map[string]struct{}
}
Expand All @@ -31,6 +32,7 @@ type FederationInfo interface {
EntityInterfaceNames() []string
EntityCacheConfig(typeName string) *EntityCacheConfiguration
RootFieldCacheConfig(typeName, fieldName string) *RootFieldCacheConfiguration
MutationCacheInvalidationConfig(fieldName string) *MutationCacheInvalidationConfiguration
}

func (d *FederationMetaData) HasKeyRequirement(typeName, requiresFields string) bool {
Expand Down Expand Up @@ -237,6 +239,30 @@ func (c SubscriptionEntityPopulationConfigurations) FindByTypeName(typeName stri
return nil
}

// MutationCacheInvalidationConfiguration defines which mutation fields should
// invalidate (delete) L2 cache entries for the entity they return.
type MutationCacheInvalidationConfiguration struct {
// FieldName is the mutation field name (e.g., "updateUser", "deleteUser").
FieldName string `json:"field_name"`
// EntityTypeName is the return entity type (e.g., "User").
// If empty, it is inferred from the mutation return type at plan time.
EntityTypeName string `json:"entity_type_name,omitempty"`
}

// MutationCacheInvalidationConfigurations is a collection of mutation cache invalidation configurations.
type MutationCacheInvalidationConfigurations []MutationCacheInvalidationConfiguration

// FindByFieldName returns the invalidation config for the given mutation field.
// Returns nil if no configuration exists (no invalidation for this field).
func (c MutationCacheInvalidationConfigurations) FindByFieldName(fieldName string) *MutationCacheInvalidationConfiguration {
for i := range c {
if c[i].FieldName == fieldName {
return &c[i]
}
}
return nil
}

// EntityCacheConfig returns the cache configuration for the given entity type.
// Returns nil if no configuration exists (caching should be disabled for this entity).
func (d *FederationMetaData) EntityCacheConfig(typeName string) *EntityCacheConfiguration {
Expand All @@ -249,6 +275,12 @@ func (d *FederationMetaData) RootFieldCacheConfig(typeName, fieldName string) *R
return d.RootFieldCaching.FindByTypeAndField(typeName, fieldName)
}

// MutationCacheInvalidationConfig returns the invalidation config for the given mutation field.
// Returns nil if no configuration exists (no invalidation for this field).
func (d *FederationMetaData) MutationCacheInvalidationConfig(fieldName string) *MutationCacheInvalidationConfiguration {
return d.MutationCacheInvalidation.FindByFieldName(fieldName)
}

type FederationFieldConfiguration struct {
TypeName string `json:"type_name"` // TypeName is the name of the Entity the Fragment is for
FieldName string `json:"field_name,omitempty"` // FieldName is empty for key requirements, otherwise, it is the name of the field that has requires or provides directive
Expand Down
7 changes: 7 additions & 0 deletions v2/pkg/engine/plan/visitor.go
Original file line number Diff line number Diff line change
Expand Up @@ -2410,6 +2410,13 @@ func (v *Visitor) configureMutationEntityImpact(internal *objectFetchConfigurati
CacheName: entityCacheConfig.CacheName,
IncludeSubgraphHeaderPrefix: entityCacheConfig.IncludeSubgraphHeaderPrefix,
}

// Check if this specific mutation field is configured for cache invalidation
if len(internal.rootFields) > 0 {
if fedConfig.MutationCacheInvalidationConfig(internal.rootFields[0].FieldName) != nil {
result.MutationEntityImpactConfig.InvalidateCache = true
}
}
}

// resolveMutationReturnType resolves the return type name of a mutation field definition.
Expand Down
3 changes: 3 additions & 0 deletions v2/pkg/engine/resolve/fetch.go
Original file line number Diff line number Diff line change
Expand Up @@ -362,6 +362,9 @@ type MutationEntityImpactConfig struct {
KeyFields []KeyField // [{Name: "id"}]
CacheName string // "default"
IncludeSubgraphHeaderPrefix bool
// InvalidateCache when true causes the L2 cache entry for this entity to be deleted
// after the mutation completes. Configured per mutation field via MutationCacheInvalidationConfiguration.
InvalidateCache bool
}

// FetchDependency explains how a GraphCoordinate depends on other GraphCoordinates from other fetches
Expand Down
19 changes: 15 additions & 4 deletions v2/pkg/engine/resolve/loader_cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -945,19 +945,20 @@ func (l *Loader) compareShadowValues(res *result, info *FetchInfo) {
}

// detectMutationEntityImpact checks if a mutation response contains a cached entity
// and compares it with the L2 cache to detect staleness.
// and either invalidates (deletes) the L2 cache entry or compares it for staleness analytics.
// Called from mergeResult on the main thread after the mutation fetch completes.
func (l *Loader) detectMutationEntityImpact(res *result, info *FetchInfo, responseData *astjson.Value) {
if info == nil || info.OperationType != ast.OperationTypeMutation {
return
}
if !l.ctx.cacheAnalyticsEnabled() {
return
}
cfg := res.cacheConfig.MutationEntityImpactConfig
if cfg == nil {
return
}
// Proceed if invalidation is configured or analytics is enabled
if !cfg.InvalidateCache && !l.ctx.cacheAnalyticsEnabled() {
return
}
if info.ProvidesData == nil || len(info.RootFields) == 0 {
return
}
Expand Down Expand Up @@ -994,6 +995,16 @@ func (l *Loader) detectMutationEntityImpact(res *result, info *FetchInfo, respon
return
}

// Invalidate L2 cache entry if configured
if cfg.InvalidateCache {
_ = cache.Delete(l.ctx.ctx, []string{cacheKey})
}

// Analytics comparison requires cacheAnalytics to be enabled
if !l.ctx.cacheAnalyticsEnabled() {
return
}
Comment on lines +998 to +1006
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major

Read-before-delete for analytics, and handle delete failure explicitly.

With current ordering, when InvalidateCache and analytics are both enabled, Line 1000 deletes first and Line 1020 reads afterward, so mutation impact analytics loses prior cached-state comparison. Also, delete errors are silently ignored, which can leave stale entries without any signal.

💡 Proposed fix
-	// Invalidate L2 cache entry if configured
-	if cfg.InvalidateCache {
-		_ = cache.Delete(l.ctx.ctx, []string{cacheKey})
-	}
-
-	// Analytics comparison requires cacheAnalytics to be enabled
-	if !l.ctx.cacheAnalyticsEnabled() {
-		return
-	}
+	analyticsEnabled := l.ctx.cacheAnalyticsEnabled()
+	var entries []*CacheEntry
+	var err error
+	if analyticsEnabled {
+		// Read first so analytics can compare against the pre-invalidation value.
+		entries, err = cache.Get(l.ctx.ctx, []string{cacheKey})
+	}
+
+	// Invalidate L2 cache entry if configured.
+	if cfg.InvalidateCache {
+		if delErr := cache.Delete(l.ctx.ctx, []string{cacheKey}); delErr != nil {
+			return
+		}
+	}
+
+	if !analyticsEnabled {
+		return
+	}
@@
-	// Look up L2 cache
-	entries, err := cache.Get(l.ctx.ctx, []string{cacheKey})
+	// Compare against pre-invalidation L2 value
 	hadCachedValue := err == nil && len(entries) > 0 && entries[0] != nil && len(entries[0].Value) > 0

Also applies to: 1020-1021

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@v2/pkg/engine/resolve/loader_cache.go` around lines 998 - 1006, The code
currently deletes cache (cache.Delete) before doing analytics
(l.ctx.cacheAnalyticsEnabled()/analytics read), which prevents
read-before-delete comparisons and also ignores delete errors; change the flow
in loader_cache.go so that when cfg.InvalidateCache is true AND
l.ctx.cacheAnalyticsEnabled() is true you first read the existing entry for
analytics (use the same cacheKey read used for analytics comparison), then
perform cache.Delete(cacheKey) and check its returned error instead of
discarding it (log via the existing logger or return the error), and if
analytics is disabled keep the original delete behavior; update both locations
mentioned (around cfg.InvalidateCache / cache.Delete and the similar block at
the later occurrence) to follow this read-then-delete-and-handle-error pattern.


// Build display key (without prefix) for analytics
displayKey := l.buildMutationEntityDisplayKey(cfg, entityData)

Expand Down