Prevent stacked catalog snapshot requests and lengthen the source cache TTL.
cache.go: each new snapshotHeader records requestSentinel = now + cachePopulateTimeout (one minute); Namespaced drops any miss whose existing header still has an active sentinel before it starts new populate goroutines.
source_registry.go: the source invalidator ttl changes from 5 minutes to defaultCacheLifetime (30 minutes).
The vendor/ hunks are identical to the staging/ hunks.

diff --git a/staging/operator-lifecycle-manager/pkg/controller/registry/resolver/cache/cache.go b/staging/operator-lifecycle-manager/pkg/controller/registry/resolver/cache/cache.go index f9b9e03a94..87b9a075d9 100644 --- a/staging/operator-lifecycle-manager/pkg/controller/registry/resolver/cache/cache.go +++ b/staging/operator-lifecycle-manager/pkg/controller/registry/resolver/cache/cache.go @@ -4,6 +4,7 @@ import ( "context" "fmt" "io" + "slices" "sort" "sync" "time" @@ -147,7 +148,7 @@ func (c *NamespacedOperatorCache) Error() error { func (c *Cache) Namespaced(namespaces ...string) MultiCatalogOperatorFinder { const ( - CachePopulateTimeout = time.Minute + cachePopulateTimeout = time.Minute ) sources := c.sp.Sources(namespaces...) @@ -209,19 +210,35 @@ func (c *Cache) Namespaced(namespaces ...string) MultiCatalogOperatorFinder { } misses = misses[found:] + // remove any with a "live" outstanding request + misses = slices.DeleteFunc(misses, func(key SourceKey) bool { + hdr, _ := c.snapshots[key] + + // if we already have a request timestamp, we have an outstanding request, so prevent stacking + // and just send new requests if the previous one has expired + if hdr != nil && hdr.RequestSentinelActive() { + c.logger.Printf("Skipping new request for %s, already in progress", key) + return true + } + return false + }) + + for _, miss := range misses { - ctx, cancel := context.WithTimeout(context.Background(), CachePopulateTimeout) + ctx, cancel := context.WithTimeout(context.Background(), cachePopulateTimeout) hdr := snapshotHeader{ - key: miss, - pop: cancel, - priority: c.sourcePriorityProvider.Priority(miss), + key: miss, + pop: cancel, + priority: c.sourcePriorityProvider.Priority(miss), + requestSentinel: time.Now().Add(cachePopulateTimeout), // set sentinel to prevent stacking requests } hdr.m.Lock() c.snapshots[miss] = &hdr result.snapshots[miss] = &hdr + // don't adjust the request sentinel in the goroutine for any outcome, so that we don't stampede sources + 
// instead, reevaluate the sentinel during the next snapshot go func(ctx context.Context, hdr *snapshotHeader, source Source) { defer hdr.m.Unlock() c.sem <- struct{}{} @@ -294,6 +311,8 @@ type snapshotHeader struct { pop context.CancelFunc err error priority int + + requestSentinel time.Time } func (hdr *snapshotHeader) Cancel() { @@ -314,6 +333,13 @@ func (hdr *snapshotHeader) Valid() bool { return true } +func (hdr *snapshotHeader) RequestSentinelActive() bool { + hdr.m.RLock() + defer hdr.m.RUnlock() + + return time.Now().Before(hdr.requestSentinel) +} + type sortableSnapshots struct { snapshots []*snapshotHeader preferredNamespace string diff --git a/staging/operator-lifecycle-manager/pkg/controller/registry/resolver/source_registry.go b/staging/operator-lifecycle-manager/pkg/controller/registry/resolver/source_registry.go index f20cab0eba..c45a83fca8 100644 --- a/staging/operator-lifecycle-manager/pkg/controller/registry/resolver/source_registry.go +++ b/staging/operator-lifecycle-manager/pkg/controller/registry/resolver/source_registry.go @@ -75,6 +75,8 @@ type RegistrySourceProvider struct { invalidator *sourceInvalidator } +const defaultCacheLifetime time.Duration = 30 * time.Minute + func SourceProviderFromRegistryClientProvider(rcp RegistryClientProvider, catsrcLister v1alpha1listers.CatalogSourceLister, logger logrus.StdLogger) *RegistrySourceProvider { return &RegistrySourceProvider{ rcp: rcp, @@ -82,7 +84,7 @@ func SourceProviderFromRegistryClientProvider(rcp RegistryClientProvider, catsrc catsrcLister: catsrcLister, invalidator: &sourceInvalidator{ validChans: make(map[cache.SourceKey]chan struct{}), - ttl: 5 * time.Minute, + ttl: defaultCacheLifetime, }, } } diff --git a/vendor/github.com/operator-framework/operator-lifecycle-manager/pkg/controller/registry/resolver/cache/cache.go b/vendor/github.com/operator-framework/operator-lifecycle-manager/pkg/controller/registry/resolver/cache/cache.go index f9b9e03a94..87b9a075d9 100644 --- 
a/vendor/github.com/operator-framework/operator-lifecycle-manager/pkg/controller/registry/resolver/cache/cache.go +++ b/vendor/github.com/operator-framework/operator-lifecycle-manager/pkg/controller/registry/resolver/cache/cache.go @@ -4,6 +4,7 @@ import ( "context" "fmt" "io" + "slices" "sort" "sync" "time" @@ -147,7 +148,7 @@ func (c *NamespacedOperatorCache) Error() error { func (c *Cache) Namespaced(namespaces ...string) MultiCatalogOperatorFinder { const ( - CachePopulateTimeout = time.Minute + cachePopulateTimeout = time.Minute ) sources := c.sp.Sources(namespaces...) @@ -209,19 +210,35 @@ func (c *Cache) Namespaced(namespaces ...string) MultiCatalogOperatorFinder { } misses = misses[found:] + // remove any with a "live" outstanding request + misses = slices.DeleteFunc(misses, func(key SourceKey) bool { + hdr, _ := c.snapshots[key] + + // if we already have a request timestamp, we have an outstanding request, so prevent stacking + // and just send new requests if the previous one has expired + if hdr != nil && hdr.RequestSentinelActive() { + c.logger.Printf("Skipping new request for %s, already in progress", key) + return true + } + return false + }) + + for _, miss := range misses { - ctx, cancel := context.WithTimeout(context.Background(), CachePopulateTimeout) + ctx, cancel := context.WithTimeout(context.Background(), cachePopulateTimeout) hdr := snapshotHeader{ - key: miss, - pop: cancel, - priority: c.sourcePriorityProvider.Priority(miss), + key: miss, + pop: cancel, + priority: c.sourcePriorityProvider.Priority(miss), + requestSentinel: time.Now().Add(cachePopulateTimeout), // set sentinel to prevent stacking requests } hdr.m.Lock() c.snapshots[miss] = &hdr result.snapshots[miss] = 
&hdr + // don't adjust the request sentinel in the goroutine for any outcome, so that we don't stampede sources + // instead, reevaluate the sentinel during the next snapshot go func(ctx context.Context, hdr *snapshotHeader, source Source) { defer hdr.m.Unlock() c.sem <- struct{}{} @@ -294,6 +311,8 @@ type snapshotHeader struct { pop context.CancelFunc err error priority int + + requestSentinel time.Time } func (hdr *snapshotHeader) Cancel() { @@ -314,6 +333,13 @@ func (hdr *snapshotHeader) Valid() bool { return true } +func (hdr *snapshotHeader) RequestSentinelActive() bool { + hdr.m.RLock() + defer hdr.m.RUnlock() + + return time.Now().Before(hdr.requestSentinel) +} + type sortableSnapshots struct { snapshots []*snapshotHeader preferredNamespace string diff --git a/vendor/github.com/operator-framework/operator-lifecycle-manager/pkg/controller/registry/resolver/source_registry.go b/vendor/github.com/operator-framework/operator-lifecycle-manager/pkg/controller/registry/resolver/source_registry.go index f20cab0eba..c45a83fca8 100644 --- a/vendor/github.com/operator-framework/operator-lifecycle-manager/pkg/controller/registry/resolver/source_registry.go +++ b/vendor/github.com/operator-framework/operator-lifecycle-manager/pkg/controller/registry/resolver/source_registry.go @@ -75,6 +75,8 @@ type RegistrySourceProvider struct { invalidator *sourceInvalidator } +const defaultCacheLifetime time.Duration = 30 * time.Minute + func SourceProviderFromRegistryClientProvider(rcp RegistryClientProvider, catsrcLister v1alpha1listers.CatalogSourceLister, logger logrus.StdLogger) *RegistrySourceProvider { return &RegistrySourceProvider{ rcp: rcp, @@ -82,7 +84,7 @@ func SourceProviderFromRegistryClientProvider(rcp RegistryClientProvider, catsrc catsrcLister: catsrcLister, invalidator: &sourceInvalidator{ validChans: make(map[cache.SourceKey]chan struct{}), - ttl: 5 * time.Minute, + ttl: defaultCacheLifetime, }, } }