Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
693 changes: 693 additions & 0 deletions ENTITY_CACHING_ACCEPTANCE_CRITERIA.md

Large diffs are not rendered by default.

6 changes: 6 additions & 0 deletions v2/pkg/engine/plan/federation_metadata.go
Original file line number Diff line number Diff line change
Expand Up @@ -124,6 +124,12 @@ type EntityCacheConfiguration struct {
// Instead, fresh data is always fetched from the subgraph and compared against the cached value
// to detect staleness. L1 cache works normally (not affected by shadow mode).
ShadowMode bool `json:"shadow_mode"`

// NegativeCacheTTL is the TTL for caching null entity results (entity not found).
// When > 0, null responses (entity returned null without errors from _entities) are cached
// as negative sentinels to avoid repeated subgraph lookups for non-existent entities.
// When 0 (default), null entities are not cached and will be re-fetched on every request.
NegativeCacheTTL time.Duration `json:"negative_cache_ttl,omitzero"`
}

// EntityCacheConfigurations is a collection of entity cache configurations.
Expand Down
1 change: 1 addition & 0 deletions v2/pkg/engine/plan/visitor.go
Original file line number Diff line number Diff line change
Expand Up @@ -2368,6 +2368,7 @@ func (v *Visitor) configureFetchCaching(internal *objectFetchConfiguration, exte
HashAnalyticsKeys: cacheConfig.HashAnalyticsKeys,
KeyFields: keyFields,
ShadowMode: cacheConfig.ShadowMode,
NegativeCacheTTL: cacheConfig.NegativeCacheTTL,
}
}

Expand Down
7 changes: 7 additions & 0 deletions v2/pkg/engine/resolve/CLAUDE.md
Original file line number Diff line number Diff line change
Expand Up @@ -554,6 +554,13 @@ L2Reads: []resolve.CacheKeyEvent{

Every `defaultCache.ClearLog()` MUST be followed by `defaultCache.GetLog()` with full assertions BEFORE the next `ClearLog()` or end of test. Never clear a log without verifying its contents.

### Caching Test / AC Sync Rule

**When modifying or adding caching-related tests**, you MUST also update `ENTITY_CACHING_ACCEPTANCE_CRITERIA.md` (in the repo root). Every AC must link to its covering tests with relative paths, line numbers, and test names. This applies to:
- New caching tests (add test links to the relevant AC)
- Changes to existing caching tests that affect which ACs are covered
- New ACs (must have at least one test link)

### Run Tests
```bash
go test -run "TestL1Cache" ./v2/pkg/engine/resolve/... -v
Expand Down
98 changes: 98 additions & 0 deletions v2/pkg/engine/resolve/arena_thread_safety_bench_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,98 @@
package resolve

import (
"strconv"
"sync"
"testing"

"github.com/wundergraph/astjson"
"github.com/wundergraph/go-arena"
)

// cacheLoadAllocs simulates the allocation pattern of tryL2CacheLoad:
// parse cached JSON bytes, create wrapper objects, allocate slices.
func cacheLoadAllocs(a arena.Arena) {
// 1. extractCacheKeysStrings: allocate slice + string bytes
keys := arena.AllocateSlice[string](a, 0, 4)
for range 4 {
buf := arena.AllocateSlice[byte](a, 0, 64)
buf = arena.SliceAppend(a, buf, []byte("cache:entity:Product:id:prod-1234")...)
keys = arena.SliceAppend(a, keys, string(buf))
}
_ = keys

// 2. populateFromCache: parse JSON bytes
v, _ := astjson.ParseBytesWithArena(a, []byte(`{"__typename":"Product","id":"prod-1234","name":"Test Product","price":29.99}`))

// 3. EntityMergePath wrapping: create wrapper objects
obj := astjson.ObjectValue(a)
obj.Set(a, "product", v)
outer := astjson.ObjectValue(a)
outer.Set(a, "data", obj)

// 4. denormalizeFromCache: create new object tree
result := astjson.ObjectValue(a)
result.Set(a, "productName", v.Get("name"))
result.Set(a, "productPrice", v.Get("price"))
}

// BenchmarkConcurrentArena measures Option A: single arena wrapped with NewConcurrentArena.
// All goroutines allocate from the same mutex-protected arena.
func BenchmarkConcurrentArena(b *testing.B) {
for _, goroutines := range []int{1, 4, 8, 16} {
b.Run(goroutineName(goroutines), func(b *testing.B) {
a := arena.NewConcurrentArena(arena.NewMonotonicArena(arena.WithMinBufferSize(64 * 1024)))
b.ResetTimer()
for b.Loop() {
var wg sync.WaitGroup
for range goroutines {
wg.Go(func() {
cacheLoadAllocs(a)
})
}
wg.Wait()
a.Reset()
}
})
}
}

// BenchmarkPerGoroutineArena measures Option B: each goroutine gets its own arena from sync.Pool.
// Zero lock contention on allocations.
func BenchmarkPerGoroutineArena(b *testing.B) {
pool := sync.Pool{
New: func() any {
return arena.NewMonotonicArena(arena.WithMinBufferSize(4096))
},
}

for _, goroutines := range []int{1, 4, 8, 16} {
b.Run(goroutineName(goroutines), func(b *testing.B) {
b.ResetTimer()
for b.Loop() {
arenas := make([]arena.Arena, goroutines)
var wg sync.WaitGroup
for i := range goroutines {
ga := pool.Get().(arena.Arena)
arenas[i] = ga
wg.Go(func() {
cacheLoadAllocs(ga)
})
}
wg.Wait()
for _, ga := range arenas {
ga.Reset()
pool.Put(ga)
}
}
})
}
}

func goroutineName(n int) string {
return "goroutines=" + stringFromInt(n)
}

func stringFromInt(n int) string {
return strconv.Itoa(n)
}
178 changes: 178 additions & 0 deletions v2/pkg/engine/resolve/arena_thread_safety_gc_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,178 @@
package resolve

import (
"runtime"
"runtime/debug"
"testing"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

"github.com/wundergraph/astjson"
"github.com/wundergraph/go-arena"
)

// TestCrossArenaMergeValuesCreatesShallowReferences proves that MergeValues
// links *Value pointers from the source arena into the target arena's tree
// without deep-copying. Resetting the source arena makes the merged values stale.
//
// This is the foundational invariant for AC-THREAD-04: goroutine arenas that
// hold FromCache values must NOT be released before the response is fully rendered.
func TestCrossArenaMergeValuesCreatesShallowReferences(t *testing.T) {
old := debug.SetGCPercent(1)
defer debug.SetGCPercent(old)

mainArena := arena.NewMonotonicArena(arena.WithMinBufferSize(4096))
goroutineArena := arena.NewMonotonicArena(arena.WithMinBufferSize(4096))

// Parse entity data on the "goroutine" arena (simulates populateFromCache)
fromCache, err := astjson.ParseBytesWithArena(goroutineArena, []byte(`{"id":"prod-1","name":"Widget"}`))
require.NoError(t, err)

// Parse the target item on the main arena (simulates the response tree)
item, err := astjson.ParseBytesWithArena(mainArena, []byte(`{"id":"prod-1"}`))
require.NoError(t, err)

// Merge: this splices FromCache nodes into item's object tree
merged, _, err := astjson.MergeValues(mainArena, item, fromCache)
require.NoError(t, err)

// Verify merged result contains data from both arenas
mergedJSON := string(merged.MarshalTo(nil))
assert.Contains(t, mergedJSON, `"name":"Widget"`)
assert.Contains(t, mergedJSON, `"id":"prod-1"`)

// Force GC to stress-test pointer validity — goroutine arena is still alive
runtime.GC()
runtime.GC()

// Values should still be valid since goroutine arena hasn't been reset
postGCJSON := string(merged.MarshalTo(nil))
assert.Equal(t, mergedJSON, postGCJSON,
"merged values should survive GC when goroutine arena is still alive")

// Now reset the goroutine arena — simulates premature release
goroutineArena.Reset()

// Overwrite the freed memory with different data
_, _ = astjson.ParseBytesWithArena(goroutineArena, []byte(`{"id":"STALE","name":"CORRUPTED"}`))

// The merged tree still holds pointers into the (now overwritten) goroutine arena.
// This proves MergeValues is shallow — accessing the stale data may panic or
// return corrupted values.
staleOrPanicked := func() (result string, panicked bool) {
defer func() {
if r := recover(); r != nil {
panicked = true
}
}()
return string(merged.MarshalTo(nil)), false
}
staleJSON, panicked := staleOrPanicked()
assert.True(t, panicked || staleJSON != mergedJSON,
"merged values should be stale or inaccessible after goroutine arena reset — "+
"this proves MergeValues creates cross-arena shallow references")

runtime.KeepAlive(mainArena)
runtime.KeepAlive(goroutineArena)
}

// TestGoroutineArenaLifetimeWithDeferredRelease verifies the correct pattern:
// goroutine arenas survive through the full resolve lifecycle and are only
// released in Free(). This matches the Loader.goroutineArenas design.
func TestGoroutineArenaLifetimeWithDeferredRelease(t *testing.T) {
old := debug.SetGCPercent(1)
defer debug.SetGCPercent(old)

mainArena := arena.NewMonotonicArena(arena.WithMinBufferSize(4096))

// Simulate multiple goroutines, each with their own arena
const numGoroutines = 4
goroutineArenas := make([]arena.Arena, numGoroutines)
fromCacheValues := make([]*astjson.Value, numGoroutines)

for i := range numGoroutines {
goroutineArenas[i] = arena.NewMonotonicArena(arena.WithMinBufferSize(4096))
var err error
fromCacheValues[i], err = astjson.ParseBytesWithArena(
goroutineArenas[i],
[]byte(`{"id":"prod-`+stringFromInt(i+1)+`","name":"Product `+stringFromInt(i+1)+`"}`),
)
require.NoError(t, err)
}

// Phase 4: merge all FromCache values into main arena tree
items := make([]*astjson.Value, numGoroutines)
for i := range numGoroutines {
items[i], _ = astjson.ParseBytesWithArena(mainArena, []byte(`{"id":"prod-`+stringFromInt(i+1)+`"}`))
merged, _, err := astjson.MergeValues(mainArena, items[i], fromCacheValues[i])
require.NoError(t, err)
items[i] = merged
}

// GC pressure — all arenas still alive
runtime.GC()
runtime.GC()

// Verify all merged values are still valid (simulates response rendering)
for i := range numGoroutines {
json := string(items[i].MarshalTo(nil))
assert.Contains(t, json, `"name":"Product `+stringFromInt(i+1)+`"`,
"merged value %d should be readable with goroutine arenas alive", i)
}

// Now release goroutine arenas (simulates Loader.Free())
for _, a := range goroutineArenas {
a.Reset()
}

runtime.KeepAlive(mainArena)
runtime.KeepAlive(goroutineArenas)
}

// Benchmark_CrossArenaGCSafety exercises the goroutine arena pattern under GC
// pressure. Each iteration creates goroutine arenas, merges values, renders the
// result, then releases. runtime.GC() between iterations maximizes pressure on
// any dangling pointers.
func Benchmark_CrossArenaGCSafety(b *testing.B) {
old := debug.SetGCPercent(1)
defer debug.SetGCPercent(old)

entityJSON := []byte(`{"__typename":"Product","id":"prod-1","name":"Widget","price":9.99}`)
itemJSON := []byte(`{"__typename":"Product","id":"prod-1"}`)

b.ResetTimer()
for b.Loop() {
mainArena := arena.NewMonotonicArena(arena.WithMinBufferSize(4096))
goroutineArena := arena.NewMonotonicArena(arena.WithMinBufferSize(4096))

// Simulate goroutine: parse cached entity
fromCache, err := astjson.ParseBytesWithArena(goroutineArena, entityJSON)
if err != nil {
b.Fatal(err)
}

// Simulate Phase 4: merge into response tree
item, err := astjson.ParseBytesWithArena(mainArena, itemJSON)
if err != nil {
b.Fatal(err)
}
merged, _, err := astjson.MergeValues(mainArena, item, fromCache)
if err != nil {
b.Fatal(err)
}

// Simulate response rendering
buf := merged.MarshalTo(nil)
if len(buf) == 0 {
b.Fatal("empty output")
}

// Release (correct order: goroutine arena after rendering)
goroutineArena.Reset()
mainArena.Reset()

// GC pressure between iterations
runtime.GC()
}
}
29 changes: 29 additions & 0 deletions v2/pkg/engine/resolve/cache_analytics.go
Original file line number Diff line number Diff line change
Expand Up @@ -140,6 +140,18 @@ type MutationEvent struct {
FreshBytes int
}

// CacheOperationError records a cache operation (Get/Set/Delete) that returned an error.
// Cache errors are non-fatal (the engine falls back to subgraph fetch), but tracking them
// in analytics allows operators to detect cache infrastructure issues.
type CacheOperationError struct {
Operation string // "get", "set", or "delete"
CacheName string // named cache instance
EntityType string // entity type (empty for root fetches)
DataSource string // subgraph name
Message string // error message (truncated for safety)
ItemCount int // number of keys involved in the failed operation
}

// HeaderImpactEvent records a fresh fetch that wrote to L2 cache with header-prefixed keys.
// A cross-request consumer can aggregate these events: when the same BaseKey appears with
// different HeaderHash values but identical ResponseHash values, the forwarded headers
Expand Down Expand Up @@ -170,6 +182,8 @@ type CacheAnalyticsCollector struct {
shadowComparisons []ShadowComparisonEvent // shadow mode staleness comparison events
mutationEvents []MutationEvent // mutation entity impact events
headerImpactEvents []HeaderImpactEvent // header impact events for L2 writes with header prefix
cacheOpErrors []CacheOperationError // cache operation errors (main thread)
l2CacheOpErrors []CacheOperationError // accumulated in goroutines, merged on main thread
xxh *xxhash.Digest
}

Expand Down Expand Up @@ -322,6 +336,17 @@ func (c *CacheAnalyticsCollector) RecordHeaderImpactEvent(event HeaderImpactEven
c.headerImpactEvents = append(c.headerImpactEvents, event)
}

// RecordCacheOperationError records a cache operation error. Main thread only.
func (c *CacheAnalyticsCollector) RecordCacheOperationError(event CacheOperationError) {
c.cacheOpErrors = append(c.cacheOpErrors, event)
}

// MergeL2CacheOpErrors merges cache operation errors collected in goroutines into the collector.
// Must be called on the main thread.
func (c *CacheAnalyticsCollector) MergeL2CacheOpErrors(events []CacheOperationError) {
c.cacheOpErrors = append(c.cacheOpErrors, events...)
}

// EntitySource returns the source for a given entity instance.
// Returns FieldSourceSubgraph if no record is found (the default).
func (c *CacheAnalyticsCollector) EntitySource(entityType, keyJSON string) FieldSource {
Expand All @@ -347,6 +372,7 @@ func (c *CacheAnalyticsCollector) Snapshot() CacheAnalyticsSnapshot {
ShadowComparisons: deduplicateShadowComparisons(c.shadowComparisons),
MutationEvents: c.mutationEvents,
HeaderImpactEvents: deduplicateHeaderImpactEvents(c.headerImpactEvents),
CacheOpErrors: c.cacheOpErrors,
}

// Split write events into L1 and L2, then deduplicate each
Expand Down Expand Up @@ -487,6 +513,9 @@ type CacheAnalyticsSnapshot struct {

// Header impact events (L2 writes with header-prefixed keys)
HeaderImpactEvents []HeaderImpactEvent

// Cache operation errors (Get/Set/Delete failures)
CacheOpErrors []CacheOperationError
}

// L1HitRate returns the L1 cache hit rate as a float64 in [0, 1].
Expand Down
Loading