Skip to content
Merged
128 changes: 93 additions & 35 deletions pkg/remoteresolution/resolver/framework/cache/cache_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -481,24 +481,39 @@ func TestCacheConcurrentWrites(t *testing.T) {

// Launch 500 concurrent writers
numWriters := 500
entriesPerWriter := 10
done := make(chan bool, numWriters)

// Track all entries written for later verification
type entryInfo struct {
params []pipelinev1.Param
expectedData string
}
allEntries := make([]entryInfo, numWriters*entriesPerWriter)

for i := range numWriters {
go func(writerID int) {
defer func() { done <- true }()

// Each writer adds 10 unique entries
for j := range 10 {
for j := range entriesPerWriter {
params := []pipelinev1.Param{
{Name: "bundle", Value: pipelinev1.ParamValue{
Type: pipelinev1.ParamTypeString,
StringVal: fmt.Sprintf("registry.io/writer%d-entry%d@sha256:%064d", writerID, j, writerID*100+j),
}},
}
expectedData := fmt.Sprintf("writer-%d-data-%d", writerID, j)
mockResource := &mockResolvedResource{
data: []byte(fmt.Sprintf("writer-%d-data-%d", writerID, j)),
data: []byte(expectedData),
}
cache.Add(resolverType, params, mockResource)

// Record this entry for verification
allEntries[writerID*entriesPerWriter+j] = entryInfo{
params: params,
expectedData: expectedData,
}
}
}(i)
}
Expand All @@ -508,45 +523,39 @@ func TestCacheConcurrentWrites(t *testing.T) {
<-done
}

// After all concurrent writes, add some entries synchronously
// These are guaranteed to be the most recent entries in the cache
numSyncEntries := 20
syncEntries := make([][]pipelinev1.Param, numSyncEntries)
for i := range numSyncEntries {
params := []pipelinev1.Param{
{Name: "bundle", Value: pipelinev1.ParamValue{
Type: pipelinev1.ParamTypeString,
StringVal: fmt.Sprintf("registry.io/sync-entry%d@sha256:%064d", i, 999000+i),
}},
}
syncEntries[i] = params
mockResource := &mockResolvedResource{
data: []byte(fmt.Sprintf("sync-data-%d", i)),
}
cache.Add(resolverType, params, mockResource)
}

// Verify all synchronous entries are retrievable
// Since they were written most recently, they should all be in cache
// Verify that concurrent writes are retrievable and have correct data
// With 5000 entries (500 writers * 10 entries each) and cache size of 1000,
// we expect many entries to be evicted due to LRU. We verify that:
// 1. Entries that ARE in cache have the correct data
// 2. We get a reasonable hit rate
cachedCount := 0
for i, params := range syncEntries {
cached, ok := cache.Get(resolverType, params)
if !ok || cached == nil {
t.Errorf("Expected cache hit for sync entry %d, but got miss", i)
} else {
wrongDataCount := 0

for _, entry := range allEntries {
cached, ok := cache.Get(resolverType, entry.params)
if ok {
cachedCount++
// Verify the data is correct
expectedData := fmt.Sprintf("sync-data-%d", i)
if string(cached.Data()) != expectedData {
t.Errorf("Expected data '%s' for sync entry %d, got '%s'", expectedData, i, string(cached.Data()))
if string(cached.Data()) != entry.expectedData {
wrongDataCount++
t.Errorf("Expected data '%s', got '%s'", entry.expectedData, string(cached.Data()))
}
}
}

// All synchronous entries should be in cache since they were written most recently
if cachedCount != numSyncEntries {
t.Errorf("Expected all %d synchronous entries to be cached, but only found %d", numSyncEntries, cachedCount)
// We should have no entries with wrong data
if wrongDataCount > 0 {
t.Errorf("Found %d entries with wrong data", wrongDataCount)
}

// We should have some reasonable number of entries cached
// With 5000 entries and cache size 1000, we expect close to 1000 entries to be cached
// Using a lower bound of 500 to account for concurrent evictions and timing
if cachedCount < 500 {
t.Errorf("Expected at least 500 entries to be cached, but only found %d out of %d total", cachedCount, len(allEntries))
}

t.Logf("Concurrent write test passed: %d entries cached out of %d written", cachedCount, len(allEntries))
}

func TestCacheConcurrentReadWrite(t *testing.T) {
Expand Down Expand Up @@ -574,9 +583,17 @@ func TestCacheConcurrentReadWrite(t *testing.T) {
// Launch 300 readers and 300 writers concurrently
numReaders := 300
numWriters := 300
entriesPerWriter := 5
totalGoroutines := numReaders + numWriters
done := make(chan bool, totalGoroutines)

// Track entries written by concurrent writers for later verification
type entryInfo struct {
params []pipelinev1.Param
expectedData string
}
writerEntries := make([]entryInfo, numWriters*entriesPerWriter)

// Start readers
for i := range numReaders {
go func(readerID int) {
Expand All @@ -594,17 +611,24 @@ func TestCacheConcurrentReadWrite(t *testing.T) {
go func(writerID int) {
defer func() { done <- true }()

for j := range 5 {
for j := range entriesPerWriter {
params := []pipelinev1.Param{
{Name: "bundle", Value: pipelinev1.ParamValue{
Type: pipelinev1.ParamTypeString,
StringVal: fmt.Sprintf("registry.io/concurrent-writer%d-entry%d@sha256:%064d", writerID, j, writerID*10+j),
}},
}
expectedData := fmt.Sprintf("concurrent-writer-%d-data-%d", writerID, j)
mockResource := &mockResolvedResource{
data: []byte(fmt.Sprintf("concurrent-writer-%d-data-%d", writerID, j)),
data: []byte(expectedData),
}
cache.Add(resolverType, params, mockResource)

// Record this entry for verification
writerEntries[writerID*entriesPerWriter+j] = entryInfo{
params: params,
expectedData: expectedData,
}
}
}(i)
}
Expand All @@ -613,6 +637,40 @@ func TestCacheConcurrentReadWrite(t *testing.T) {
for range totalGoroutines {
<-done
}

// Verify that concurrent writes are retrievable and have correct data
// With 1500 new entries (300 writers * 5 entries each) plus 100 initial entries,
// and cache size of 500, we expect some entries to be evicted. We verify that:
// 1. Entries that ARE in cache have the correct data
// 2. We get a reasonable hit rate for the concurrent writes
cachedCount := 0
wrongDataCount := 0

for _, entry := range writerEntries {
cached, ok := cache.Get(resolverType, entry.params)
if ok {
cachedCount++
// Verify the data is correct
if string(cached.Data()) != entry.expectedData {
wrongDataCount++
t.Errorf("Expected data '%s', got '%s'", entry.expectedData, string(cached.Data()))
}
}
}

// We should have no entries with wrong data
if wrongDataCount > 0 {
t.Errorf("Found %d entries with wrong data", wrongDataCount)
}

// We should have some reasonable number of entries cached
// With 1500 concurrent write entries and cache size 500, accounting for initial entries
// and concurrent evictions, we expect at least 200 of the writer entries to be cached
if cachedCount < 200 {
t.Errorf("Expected at least 200 writer entries to be cached, but only found %d out of %d total", cachedCount, len(writerEntries))
}

t.Logf("Concurrent read/write test passed: %d writer entries cached out of %d written", cachedCount, len(writerEntries))
}

func TestCacheConcurrentEviction(t *testing.T) {
Expand Down
35 changes: 20 additions & 15 deletions pkg/remoteresolution/resolver/framework/cache/configstore.go
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This didn't change any behavior; it only makes the cacheConfig type and its properties public. @waveywaves

Original file line number Diff line number Diff line change
Expand Up @@ -48,9 +48,10 @@ var (

type cacheConfigKey struct{}

type cacheConfig struct {
maxSize int
ttl time.Duration
// Config holds the configuration for the resolver cache
type Config struct {
MaxSize int
TTL time.Duration
}

type CacheConfigStore struct {
Expand All @@ -65,7 +66,7 @@ func NewCacheConfigStore(cacheConfigName string, logger configmap.Logger) *Cache
defaultConfigMapName,
logger,
configmap.Constructors{
getCacheConfigName(): parseCacheConfigMap,
getCacheConfigName(): NewConfigFromConfigMap,
},
onCacheConfigChanged,
),
Expand All @@ -78,13 +79,16 @@ func (store *CacheConfigStore) WatchConfigs(w configmap.Watcher) {
})
}

func (store *CacheConfigStore) GetResolverConfig() cacheConfig {
func (store *CacheConfigStore) GetResolverConfig() *Config {
untypedConf := store.untyped.UntypedLoad(store.cacheConfigName)
if cacheConf, ok := untypedConf.(cacheConfig); ok {
if cacheConf, ok := untypedConf.(*Config); ok {
return cacheConf
}

return cacheConfig{}
return &Config{
MaxSize: defaultCacheSize,
TTL: defaultExpiration,
}
}

// ToContext returns a new context with the cache's configuration
Expand All @@ -103,10 +107,11 @@ func getCacheConfigName() string {
return defaultConfigMapName
}

func parseCacheConfigMap(cm *corev1.ConfigMap) (*cacheConfig, error) {
config := &cacheConfig{
maxSize: defaultCacheSize,
ttl: defaultExpiration,
// NewConfigFromConfigMap creates a Config from a ConfigMap
func NewConfigFromConfigMap(cm *corev1.ConfigMap) (*Config, error) {
config := &Config{
MaxSize: defaultCacheSize,
TTL: defaultExpiration,
}

if cm == nil {
Expand All @@ -115,27 +120,27 @@ func parseCacheConfigMap(cm *corev1.ConfigMap) (*cacheConfig, error) {

if maxSizeStr, ok := cm.Data[maxSizeConfigMapKey]; ok {
if parsed, err := strconv.Atoi(maxSizeStr); err == nil && parsed > 0 {
config.maxSize = parsed
config.MaxSize = parsed
}
}

if ttlStr, ok := cm.Data[ttlConfigMapKey]; ok {
if parsed, err := time.ParseDuration(ttlStr); err == nil && parsed > 0 {
config.ttl = parsed
config.TTL = parsed
}
}

return config, nil
}

func onCacheConfigChanged(_ string, value any) {
config, ok := value.(*cacheConfig)
config, ok := value.(*Config)
if !ok {
return
}

cacheMu.Lock()
defer cacheMu.Unlock()

sharedCache = newResolverCache(config.maxSize, config.ttl)
sharedCache = newResolverCache(config.MaxSize, config.TTL)
}
32 changes: 16 additions & 16 deletions pkg/remoteresolution/resolver/framework/cache/configstore_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -181,17 +181,17 @@ func TestParseCacheConfigMap(t *testing.T) {

for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
config, err := parseCacheConfigMap(tt.configMap)
config, err := NewConfigFromConfigMap(tt.configMap)
if err != nil {
t.Fatalf("parseCacheConfigMap() returned error: %v", err)
t.Fatalf("NewConfigFromConfigMap() returned error: %v", err)
}

if config.maxSize != tt.expectedMaxSize {
t.Errorf("maxSize = %d, want %d", config.maxSize, tt.expectedMaxSize)
if config.MaxSize != tt.expectedMaxSize {
t.Errorf("MaxSize = %d, want %d", config.MaxSize, tt.expectedMaxSize)
}

if config.ttl != tt.expectedTTL {
t.Errorf("ttl = %v, want %v", config.ttl, tt.expectedTTL)
if config.TTL != tt.expectedTTL {
t.Errorf("TTL = %v, want %v", config.TTL, tt.expectedTTL)
}
})
}
Expand All @@ -204,9 +204,9 @@ func TestOnCacheConfigChanged(t *testing.T) {
_ = Get(ctx)

// Test that onCacheConfigChanged updates the shared cache with new config values
config := &cacheConfig{
maxSize: 500,
ttl: 10 * time.Minute,
config := &Config{
MaxSize: 500,
TTL: 10 * time.Minute,
}

// Call onCacheConfigChanged to update the shared cache
Expand All @@ -219,21 +219,21 @@ func TestOnCacheConfigChanged(t *testing.T) {
}

// Verify TTL was applied
if cache.TTL() != config.ttl {
t.Errorf("Expected TTL to be %v, got %v", config.ttl, cache.TTL())
if cache.TTL() != config.TTL {
t.Errorf("Expected TTL to be %v, got %v", config.TTL, cache.TTL())
}

// Verify MaxSize was applied
if cache.MaxSize() != config.maxSize {
t.Errorf("Expected MaxSize to be %d, got %d", config.maxSize, cache.MaxSize())
if cache.MaxSize() != config.MaxSize {
t.Errorf("Expected MaxSize to be %d, got %d", config.MaxSize, cache.MaxSize())
}
}

func TestOnCacheConfigChangedWithInvalidType(t *testing.T) {
// First, set up a known good config
goodConfig := &cacheConfig{
maxSize: defaultCacheSize,
ttl: defaultExpiration,
goodConfig := &Config{
MaxSize: defaultCacheSize,
TTL: defaultExpiration,
}
onCacheConfigChanged("test-config", goodConfig)

Expand Down
Loading