From 3e6b5f89f38207ed304e18485d72dd3c1bfa6649 Mon Sep 17 00:00:00 2001 From: Joe Lanford Date: Thu, 16 Jan 2025 10:03:31 -0500 Subject: [PATCH 1/3] :bug: use operator cache provider for deprecation updates to limit calls to GRPC server (#3490) * add log and metric to instrument count of catalog source snapshots Signed-off-by: Joe Lanford * use operator cache provider for deprecation updates to limit calls to GRPC server Signed-off-by: Joe Lanford --------- Signed-off-by: Joe Lanford Upstream-repository: operator-lifecycle-manager Upstream-commit: 1274d54d885649786733d0b3fe499e9670f3310d --- .../controller/operators/catalog/operator.go | 8 ++++-- .../operators/catalog/subscription/config.go | 6 ++--- .../catalog/subscription/reconciler.go | 27 +++++++------------ .../operators/catalog/subscription/syncer.go | 6 ++--- .../controller/registry/resolver/resolver.go | 6 ++--- .../registry/resolver/source_registry.go | 4 +++ .../registry/resolver/step_resolver.go | 10 +++++-- .../pkg/metrics/metrics.go | 21 +++++++++++++++ .../controller/operators/catalog/operator.go | 8 ++++-- .../operators/catalog/subscription/config.go | 6 ++--- .../catalog/subscription/reconciler.go | 27 +++++++------------ .../operators/catalog/subscription/syncer.go | 6 ++--- .../controller/registry/resolver/resolver.go | 6 ++--- .../registry/resolver/source_registry.go | 4 +++ .../registry/resolver/step_resolver.go | 10 +++++-- .../pkg/metrics/metrics.go | 21 +++++++++++++++ 16 files changed, 114 insertions(+), 62 deletions(-) diff --git a/staging/operator-lifecycle-manager/pkg/controller/operators/catalog/operator.go b/staging/operator-lifecycle-manager/pkg/controller/operators/catalog/operator.go index 250282cfb3..f9eaeb5eeb 100644 --- a/staging/operator-lifecycle-manager/pkg/controller/operators/catalog/operator.go +++ b/staging/operator-lifecycle-manager/pkg/controller/operators/catalog/operator.go @@ -131,6 +131,7 @@ type Operator struct { clientFactory clients.Factory muInstallPlan sync.Mutex resolverSourceProvider *resolver.RegistrySourceProvider + operatorCacheProvider resolvercache.OperatorCacheProvider } type CatalogSourceSyncFunc func(logger *logrus.Entry, in *v1alpha1.CatalogSource) (out *v1alpha1.CatalogSource, continueSync bool, syncError error) @@ -217,8 +218,9 @@ func NewOperator(ctx context.Context, kubeconfigPath string, clock utilclock.Clo } op.sources = grpc.NewSourceStore(logger, 10*time.Second, 10*time.Minute, op.syncSourceState) op.resolverSourceProvider = resolver.SourceProviderFromRegistryClientProvider(op.sources, lister.OperatorsV1alpha1().CatalogSourceLister(), logger) + op.operatorCacheProvider = resolver.NewOperatorCacheProvider(lister, crClient, op.resolverSourceProvider, logger) op.reconciler = reconciler.NewRegistryReconcilerFactory(lister, opClient, configmapRegistryImage, op.now, ssaClient, workloadUserID, opmImage, utilImage) - res := resolver.NewOperatorStepResolver(lister, crClient, operatorNamespace, op.resolverSourceProvider, logger) + res := resolver.NewOperatorStepResolver(lister, crClient, operatorNamespace, op.operatorCacheProvider, logger) op.resolver = resolver.NewInstrumentedResolver(res, metrics.RegisterDependencyResolutionSuccess, metrics.RegisterDependencyResolutionFailure) // Wire OLM CR sharedIndexInformers @@ -360,7 +362,7 @@ func NewOperator(ctx context.Context, kubeconfigPath string, clock utilclock.Clo subscription.WithAppendedReconcilers(subscription.ReconcilerFromLegacySyncHandler(op.syncSubscriptions)), subscription.WithRegistryReconcilerFactory(op.reconciler), subscription.WithGlobalCatalogNamespace(op.namespace), - subscription.WithSourceProvider(op.resolverSourceProvider), + subscription.WithOperatorCacheProvider(op.operatorCacheProvider), ) if err != nil { return nil, err @@ -781,6 +783,7 @@ func (o *Operator) syncSourceState(state grpc.SourceState) { o.logger.Infof("state.Key.Namespace=%s state.Key.Name=%s state.State=%s", state.Key.Namespace, state.Key.Name, state.State.String()) metrics.RegisterCatalogSourceState(state.Key.Name, state.Key.Namespace, state.State) + metrics.RegisterCatalogSourceSnapshotsTotal(state.Key.Name, state.Key.Namespace) switch state.State { case connectivity.Ready: @@ -896,6 +899,7 @@ func (o *Operator) handleCatSrcDeletion(obj interface{}) { o.logger.WithField("source", sourceKey).Info("removed client for deleted catalogsource") metrics.DeleteCatalogSourceStateMetric(catsrc.GetName(), catsrc.GetNamespace()) + metrics.DeleteCatalogSourceSnapshotsTotal(catsrc.GetName(), catsrc.GetNamespace()) } func validateSourceType(logger *logrus.Entry, in *v1alpha1.CatalogSource) (out *v1alpha1.CatalogSource, continueSync bool, _ error) { diff --git a/staging/operator-lifecycle-manager/pkg/controller/operators/catalog/subscription/config.go b/staging/operator-lifecycle-manager/pkg/controller/operators/catalog/subscription/config.go index 01c8a21899..9b4152c9c1 100644 --- a/staging/operator-lifecycle-manager/pkg/controller/operators/catalog/subscription/config.go +++ b/staging/operator-lifecycle-manager/pkg/controller/operators/catalog/subscription/config.go @@ -28,7 +28,7 @@ type syncerConfig struct { reconcilers kubestate.ReconcilerChain registryReconcilerFactory reconciler.RegistryReconcilerFactory globalCatalogNamespace string - sourceProvider resolverCache.SourceProvider + operatorCacheProvider resolverCache.OperatorCacheProvider } // SyncerOption is a configuration option for a subscription syncer. @@ -131,9 +131,9 @@ func WithGlobalCatalogNamespace(namespace string) SyncerOption { } } -func WithSourceProvider(provider resolverCache.SourceProvider) SyncerOption { +func WithOperatorCacheProvider(provider resolverCache.OperatorCacheProvider) SyncerOption { return func(config *syncerConfig) { - config.sourceProvider = provider + config.operatorCacheProvider = provider } } diff --git a/staging/operator-lifecycle-manager/pkg/controller/operators/catalog/subscription/reconciler.go b/staging/operator-lifecycle-manager/pkg/controller/operators/catalog/subscription/reconciler.go index 1a6741a650..fa9dd79d28 100644 --- a/staging/operator-lifecycle-manager/pkg/controller/operators/catalog/subscription/reconciler.go +++ b/staging/operator-lifecycle-manager/pkg/controller/operators/catalog/subscription/reconciler.go @@ -57,7 +57,8 @@ type catalogHealthReconciler struct { catalogLister listers.CatalogSourceLister registryReconcilerFactory reconciler.RegistryReconcilerFactory globalCatalogNamespace string - sourceProvider cache.SourceProvider + operatorCacheProvider cache.OperatorCacheProvider + logger logrus.StdLogger } // Reconcile reconciles subscription catalog health conditions. @@ -126,21 +127,16 @@ func (c *catalogHealthReconciler) Reconcile(ctx context.Context, in kubestate.St // updateDeprecatedStatus adds deprecation status conditions to the subscription when present in the cache entry then // returns a bool value of true if any changes to the existing subscription have occurred. func (c *catalogHealthReconciler) updateDeprecatedStatus(ctx context.Context, sub *v1alpha1.Subscription) (bool, error) { - if c.sourceProvider == nil { + if c.operatorCacheProvider == nil { return false, nil } - source, ok := c.sourceProvider.Sources(sub.Spec.CatalogSourceNamespace)[cache.SourceKey{ + + entries := c.operatorCacheProvider.Namespaced(sub.Spec.CatalogSourceNamespace).Catalog(cache.SourceKey{ Name: sub.Spec.CatalogSource, Namespace: sub.Spec.CatalogSourceNamespace, - }] - if !ok { - return false, nil - } - snapshot, err := source.Snapshot(ctx) - if err != nil { - return false, err - } - if len(snapshot.Entries) == 0 { + }).Find(cache.PkgPredicate(sub.Spec.Package), cache.ChannelPredicate(sub.Spec.Channel)) + + if len(entries) == 0 { return false, nil } @@ -149,12 +145,9 @@ func (c *catalogHealthReconciler) updateDeprecatedStatus(ctx context.Context, su var deprecations *cache.Deprecations found := false - for _, entry := range snapshot.Entries { + for _, entry := range entries { // Find the cache entry that matches this subscription - if entry.SourceInfo == nil || entry.Package() != sub.Spec.Package { - continue - } - if sub.Spec.Channel != "" && entry.Channel() != sub.Spec.Channel { + if entry.SourceInfo == nil { continue } if sub.Status.InstalledCSV != entry.Name { diff --git a/staging/operator-lifecycle-manager/pkg/controller/operators/catalog/subscription/syncer.go b/staging/operator-lifecycle-manager/pkg/controller/operators/catalog/subscription/syncer.go index b39adc6bc0..564930ef3c 100644 --- a/staging/operator-lifecycle-manager/pkg/controller/operators/catalog/subscription/syncer.go +++ b/staging/operator-lifecycle-manager/pkg/controller/operators/catalog/subscription/syncer.go @@ -16,7 +16,6 @@ import ( "github.com/operator-framework/api/pkg/operators/install" "github.com/operator-framework/api/pkg/operators/v1alpha1" listers "github.com/operator-framework/operator-lifecycle-manager/pkg/api/client/listers/operators/v1alpha1" - resolverCache "github.com/operator-framework/operator-lifecycle-manager/pkg/controller/registry/resolver/cache" "github.com/operator-framework/operator-lifecycle-manager/pkg/lib/kubestate" "github.com/operator-framework/operator-lifecycle-manager/pkg/lib/ownerutil" "github.com/operator-framework/operator-lifecycle-manager/pkg/metrics" @@ -38,7 +37,6 @@ type subscriptionSyncer struct { installPlanLister listers.InstallPlanLister globalCatalogNamespace string notify kubestate.NotifyFunc - sourceProvider resolverCache.SourceProvider } // now returns the Syncer's current time. @@ -218,7 +216,6 @@ func newSyncerWithConfig(ctx context.Context, config *syncerConfig) (kubestate.S reconcilers: config.reconcilers, subscriptionCache: config.subscriptionInformer.GetIndexer(), installPlanLister: config.lister.OperatorsV1alpha1().InstallPlanLister(), - sourceProvider: config.sourceProvider, notify: func(event types.NamespacedName) { // Notify Subscriptions by enqueuing to the Subscription queue. config.subscriptionQueue.Add(event) @@ -256,7 +253,8 @@ func newSyncerWithConfig(ctx context.Context, config *syncerConfig) (kubestate.S catalogLister: config.lister.OperatorsV1alpha1().CatalogSourceLister(), registryReconcilerFactory: config.registryReconcilerFactory, globalCatalogNamespace: config.globalCatalogNamespace, - sourceProvider: config.sourceProvider, + operatorCacheProvider: config.operatorCacheProvider, + logger: config.logger, }, } s.reconcilers = append(defaultReconcilers, s.reconcilers...) diff --git a/staging/operator-lifecycle-manager/pkg/controller/registry/resolver/resolver.go b/staging/operator-lifecycle-manager/pkg/controller/registry/resolver/resolver.go index c19aba9f26..322b581fb4 100644 --- a/staging/operator-lifecycle-manager/pkg/controller/registry/resolver/resolver.go +++ b/staging/operator-lifecycle-manager/pkg/controller/registry/resolver/resolver.go @@ -29,15 +29,15 @@ type constraintProvider interface { } type Resolver struct { - cache *cache.Cache + cache cache.OperatorCacheProvider log logrus.FieldLogger pc *predicateConverter systemConstraintsProvider constraintProvider } -func NewDefaultResolver(rcp cache.SourceProvider, sourcePriorityProvider cache.SourcePriorityProvider, logger logrus.FieldLogger) *Resolver { +func NewDefaultResolver(cacheProvider cache.OperatorCacheProvider, logger logrus.FieldLogger) *Resolver { return &Resolver{ - cache: cache.New(rcp, cache.WithLogger(logger), cache.WithSourcePriorityProvider(sourcePriorityProvider)), + cache: cacheProvider, log: logger, pc: &predicateConverter{ celEnv: constraints.NewCelEnvironment(), diff --git a/staging/operator-lifecycle-manager/pkg/controller/registry/resolver/source_registry.go b/staging/operator-lifecycle-manager/pkg/controller/registry/resolver/source_registry.go index fe193beae8..f20cab0eba 100644 --- a/staging/operator-lifecycle-manager/pkg/controller/registry/resolver/source_registry.go +++ b/staging/operator-lifecycle-manager/pkg/controller/registry/resolver/source_registry.go @@ -12,6 +12,7 @@ import ( v1alpha1listers "github.com/operator-framework/operator-lifecycle-manager/pkg/api/client/listers/operators/v1alpha1" "github.com/operator-framework/operator-lifecycle-manager/pkg/controller/registry" "github.com/operator-framework/operator-lifecycle-manager/pkg/controller/registry/resolver/cache" + "github.com/operator-framework/operator-lifecycle-manager/pkg/metrics" "github.com/operator-framework/operator-registry/pkg/api" "github.com/operator-framework/operator-registry/pkg/client" opregistry "github.com/operator-framework/operator-registry/pkg/registry" @@ -143,6 +144,9 @@ type registrySource struct { } func (s *registrySource) Snapshot(ctx context.Context) (*cache.Snapshot, error) { + s.logger.Printf("requesting snapshot for catalog source %s/%s", s.key.Namespace, s.key.Name) + metrics.IncrementCatalogSourceSnapshotsTotal(s.key.Name, s.key.Namespace) + // Fetching default channels this way makes many round trips // -- may need to either add a new API to fetch all at once, // or embed the information into Bundle. diff --git a/staging/operator-lifecycle-manager/pkg/controller/registry/resolver/step_resolver.go b/staging/operator-lifecycle-manager/pkg/controller/registry/resolver/step_resolver.go index 5d2807bceb..5fb9ab3c0a 100644 --- a/staging/operator-lifecycle-manager/pkg/controller/registry/resolver/step_resolver.go +++ b/staging/operator-lifecycle-manager/pkg/controller/registry/resolver/step_resolver.go @@ -56,7 +56,7 @@ func (pp catsrcPriorityProvider) Priority(key cache.SourceKey) int { return catsrc.Spec.Priority } -func NewOperatorStepResolver(lister operatorlister.OperatorLister, client versioned.Interface, globalCatalogNamespace string, sourceProvider cache.SourceProvider, log logrus.FieldLogger) *OperatorStepResolver { +func NewOperatorCacheProvider(lister operatorlister.OperatorLister, client versioned.Interface, sourceProvider cache.SourceProvider, log logrus.FieldLogger) cache.OperatorCacheProvider { cacheSourceProvider := &mergedSourceProvider{ sps: []cache.SourceProvider{ sourceProvider, @@ -70,13 +70,19 @@ func NewOperatorStepResolver(lister operatorlister.OperatorLister, client versio }, }, } + catSrcPriorityProvider := &catsrcPriorityProvider{lister: lister.OperatorsV1alpha1().CatalogSourceLister()} + + return cache.New(cacheSourceProvider, cache.WithLogger(log), cache.WithSourcePriorityProvider(catSrcPriorityProvider)) +} + +func NewOperatorStepResolver(lister operatorlister.OperatorLister, client versioned.Interface, globalCatalogNamespace string, opCacheProvider cache.OperatorCacheProvider, log logrus.FieldLogger) *OperatorStepResolver { stepResolver := &OperatorStepResolver{ subLister: lister.OperatorsV1alpha1().SubscriptionLister(), csvLister: lister.OperatorsV1alpha1().ClusterServiceVersionLister(), ogLister: lister.OperatorsV1().OperatorGroupLister(), client: client, globalCatalogNamespace: globalCatalogNamespace, - resolver: NewDefaultResolver(cacheSourceProvider, catsrcPriorityProvider{lister: lister.OperatorsV1alpha1().CatalogSourceLister()}, log), + resolver: NewDefaultResolver(opCacheProvider, log), log: log, } diff --git a/staging/operator-lifecycle-manager/pkg/metrics/metrics.go b/staging/operator-lifecycle-manager/pkg/metrics/metrics.go index 7512d87f72..0369a41a24 100644 --- a/staging/operator-lifecycle-manager/pkg/metrics/metrics.go +++ b/staging/operator-lifecycle-manager/pkg/metrics/metrics.go @@ -152,6 +152,14 @@ var ( []string{NamespaceLabel, NameLabel}, ) + catalogSourceSnapshotsTotal = prometheus.NewCounterVec( + prometheus.CounterOpts{ + Name: "catalog_source_snapshots_total", + Help: "The number of times the catalog operator has requested a snapshot of data from a catalog source", + }, + []string{NamespaceLabel, NameLabel}, + ) + // exported since it's not handled by HandleMetrics CSVUpgradeCount = prometheus.NewCounter( prometheus.CounterOpts{ @@ -250,6 +258,7 @@ func RegisterCatalog() { prometheus.MustRegister(subscriptionCount) prometheus.MustRegister(catalogSourceCount) prometheus.MustRegister(catalogSourceReady) + prometheus.MustRegister(catalogSourceSnapshotsTotal) prometheus.MustRegister(SubscriptionSyncCount) prometheus.MustRegister(dependencyResolutionSummary) prometheus.MustRegister(installPlanWarningCount) @@ -272,6 +281,18 @@ func DeleteCatalogSourceStateMetric(name, namespace string) { catalogSourceReady.DeleteLabelValues(namespace, name) } +func RegisterCatalogSourceSnapshotsTotal(name, namespace string) { + catalogSourceSnapshotsTotal.WithLabelValues(namespace, name).Add(0) +} + +func IncrementCatalogSourceSnapshotsTotal(name, namespace string) { + catalogSourceSnapshotsTotal.WithLabelValues(namespace, name).Inc() +} + +func DeleteCatalogSourceSnapshotsTotal(name, namespace string) { + catalogSourceSnapshotsTotal.DeleteLabelValues(namespace, name) +} + func DeleteCSVMetric(oldCSV *operatorsv1alpha1.ClusterServiceVersion) { // Delete the old CSV metrics csvAbnormal.DeleteLabelValues(oldCSV.Namespace, oldCSV.Name, oldCSV.Spec.Version.String(), string(oldCSV.Status.Phase), string(oldCSV.Status.Reason)) diff --git a/vendor/github.com/operator-framework/operator-lifecycle-manager/pkg/controller/operators/catalog/operator.go b/vendor/github.com/operator-framework/operator-lifecycle-manager/pkg/controller/operators/catalog/operator.go index 250282cfb3..f9eaeb5eeb 100644 --- a/vendor/github.com/operator-framework/operator-lifecycle-manager/pkg/controller/operators/catalog/operator.go +++ b/vendor/github.com/operator-framework/operator-lifecycle-manager/pkg/controller/operators/catalog/operator.go @@ -131,6 +131,7 @@ type Operator struct { clientFactory clients.Factory muInstallPlan sync.Mutex resolverSourceProvider *resolver.RegistrySourceProvider + operatorCacheProvider resolvercache.OperatorCacheProvider } type CatalogSourceSyncFunc func(logger *logrus.Entry, in *v1alpha1.CatalogSource) (out *v1alpha1.CatalogSource, continueSync bool, syncError error) @@ -217,8 +218,9 @@ func NewOperator(ctx context.Context, kubeconfigPath string, clock utilclock.Clo } op.sources = grpc.NewSourceStore(logger, 10*time.Second, 10*time.Minute, op.syncSourceState) op.resolverSourceProvider = resolver.SourceProviderFromRegistryClientProvider(op.sources, lister.OperatorsV1alpha1().CatalogSourceLister(), logger) + op.operatorCacheProvider = resolver.NewOperatorCacheProvider(lister, crClient, op.resolverSourceProvider, logger) op.reconciler = reconciler.NewRegistryReconcilerFactory(lister, opClient, configmapRegistryImage, op.now, ssaClient, workloadUserID, opmImage, utilImage) - res := resolver.NewOperatorStepResolver(lister, crClient, operatorNamespace, op.resolverSourceProvider, logger) + res := resolver.NewOperatorStepResolver(lister, crClient, operatorNamespace, op.operatorCacheProvider, logger) op.resolver = resolver.NewInstrumentedResolver(res, metrics.RegisterDependencyResolutionSuccess, metrics.RegisterDependencyResolutionFailure) // Wire OLM CR sharedIndexInformers @@ -360,7 +362,7 @@ func NewOperator(ctx context.Context, kubeconfigPath string, clock utilclock.Clo subscription.WithAppendedReconcilers(subscription.ReconcilerFromLegacySyncHandler(op.syncSubscriptions)), subscription.WithRegistryReconcilerFactory(op.reconciler), subscription.WithGlobalCatalogNamespace(op.namespace), - subscription.WithSourceProvider(op.resolverSourceProvider), + subscription.WithOperatorCacheProvider(op.operatorCacheProvider), ) if err != nil { return nil, err @@ -781,6 +783,7 @@ func (o *Operator) syncSourceState(state grpc.SourceState) { o.logger.Infof("state.Key.Namespace=%s state.Key.Name=%s state.State=%s", state.Key.Namespace, state.Key.Name, state.State.String()) metrics.RegisterCatalogSourceState(state.Key.Name, state.Key.Namespace, state.State) + metrics.RegisterCatalogSourceSnapshotsTotal(state.Key.Name, state.Key.Namespace) switch state.State { case connectivity.Ready: @@ -896,6 +899,7 @@ func (o *Operator) handleCatSrcDeletion(obj interface{}) { o.logger.WithField("source", sourceKey).Info("removed client for deleted catalogsource") metrics.DeleteCatalogSourceStateMetric(catsrc.GetName(), catsrc.GetNamespace()) + metrics.DeleteCatalogSourceSnapshotsTotal(catsrc.GetName(), catsrc.GetNamespace()) } func validateSourceType(logger *logrus.Entry, in *v1alpha1.CatalogSource) (out *v1alpha1.CatalogSource, continueSync bool, _ error) { diff --git a/vendor/github.com/operator-framework/operator-lifecycle-manager/pkg/controller/operators/catalog/subscription/config.go b/vendor/github.com/operator-framework/operator-lifecycle-manager/pkg/controller/operators/catalog/subscription/config.go index 01c8a21899..9b4152c9c1 100644 --- a/vendor/github.com/operator-framework/operator-lifecycle-manager/pkg/controller/operators/catalog/subscription/config.go +++ b/vendor/github.com/operator-framework/operator-lifecycle-manager/pkg/controller/operators/catalog/subscription/config.go @@ -28,7 +28,7 @@ type syncerConfig struct { reconcilers kubestate.ReconcilerChain registryReconcilerFactory reconciler.RegistryReconcilerFactory globalCatalogNamespace string - sourceProvider resolverCache.SourceProvider + operatorCacheProvider resolverCache.OperatorCacheProvider } // SyncerOption is a configuration option for a subscription syncer. @@ -131,9 +131,9 @@ func WithGlobalCatalogNamespace(namespace string) SyncerOption { } } -func WithSourceProvider(provider resolverCache.SourceProvider) SyncerOption { +func WithOperatorCacheProvider(provider resolverCache.OperatorCacheProvider) SyncerOption { return func(config *syncerConfig) { - config.sourceProvider = provider + config.operatorCacheProvider = provider } } diff --git a/vendor/github.com/operator-framework/operator-lifecycle-manager/pkg/controller/operators/catalog/subscription/reconciler.go b/vendor/github.com/operator-framework/operator-lifecycle-manager/pkg/controller/operators/catalog/subscription/reconciler.go index 1a6741a650..fa9dd79d28 100644 --- a/vendor/github.com/operator-framework/operator-lifecycle-manager/pkg/controller/operators/catalog/subscription/reconciler.go +++ b/vendor/github.com/operator-framework/operator-lifecycle-manager/pkg/controller/operators/catalog/subscription/reconciler.go @@ -57,7 +57,8 @@ type catalogHealthReconciler struct { catalogLister listers.CatalogSourceLister registryReconcilerFactory reconciler.RegistryReconcilerFactory globalCatalogNamespace string - sourceProvider cache.SourceProvider + operatorCacheProvider cache.OperatorCacheProvider + logger logrus.StdLogger } // Reconcile reconciles subscription catalog health conditions. @@ -126,21 +127,16 @@ func (c *catalogHealthReconciler) Reconcile(ctx context.Context, in kubestate.St // updateDeprecatedStatus adds deprecation status conditions to the subscription when present in the cache entry then // returns a bool value of true if any changes to the existing subscription have occurred. func (c *catalogHealthReconciler) updateDeprecatedStatus(ctx context.Context, sub *v1alpha1.Subscription) (bool, error) { - if c.sourceProvider == nil { + if c.operatorCacheProvider == nil { return false, nil } - source, ok := c.sourceProvider.Sources(sub.Spec.CatalogSourceNamespace)[cache.SourceKey{ + + entries := c.operatorCacheProvider.Namespaced(sub.Spec.CatalogSourceNamespace).Catalog(cache.SourceKey{ Name: sub.Spec.CatalogSource, Namespace: sub.Spec.CatalogSourceNamespace, - }] - if !ok { - return false, nil - } - snapshot, err := source.Snapshot(ctx) - if err != nil { - return false, err - } - if len(snapshot.Entries) == 0 { + }).Find(cache.PkgPredicate(sub.Spec.Package), cache.ChannelPredicate(sub.Spec.Channel)) + + if len(entries) == 0 { return false, nil } @@ -149,12 +145,9 @@ func (c *catalogHealthReconciler) updateDeprecatedStatus(ctx context.Context, su var deprecations *cache.Deprecations found := false - for _, entry := range snapshot.Entries { + for _, entry := range entries { // Find the cache entry that matches this subscription - if entry.SourceInfo == nil || entry.Package() != sub.Spec.Package { - continue - } - if sub.Spec.Channel != "" && entry.Channel() != sub.Spec.Channel { + if entry.SourceInfo == nil { continue } if sub.Status.InstalledCSV != entry.Name { diff --git a/vendor/github.com/operator-framework/operator-lifecycle-manager/pkg/controller/operators/catalog/subscription/syncer.go b/vendor/github.com/operator-framework/operator-lifecycle-manager/pkg/controller/operators/catalog/subscription/syncer.go index b39adc6bc0..564930ef3c 100644 --- a/vendor/github.com/operator-framework/operator-lifecycle-manager/pkg/controller/operators/catalog/subscription/syncer.go +++ b/vendor/github.com/operator-framework/operator-lifecycle-manager/pkg/controller/operators/catalog/subscription/syncer.go @@ -16,7 +16,6 @@ import ( "github.com/operator-framework/api/pkg/operators/install" "github.com/operator-framework/api/pkg/operators/v1alpha1" listers "github.com/operator-framework/operator-lifecycle-manager/pkg/api/client/listers/operators/v1alpha1" - resolverCache "github.com/operator-framework/operator-lifecycle-manager/pkg/controller/registry/resolver/cache" "github.com/operator-framework/operator-lifecycle-manager/pkg/lib/kubestate" "github.com/operator-framework/operator-lifecycle-manager/pkg/lib/ownerutil" "github.com/operator-framework/operator-lifecycle-manager/pkg/metrics" @@ -38,7 +37,6 @@ type subscriptionSyncer struct { installPlanLister listers.InstallPlanLister globalCatalogNamespace string notify kubestate.NotifyFunc - sourceProvider resolverCache.SourceProvider } // now returns the Syncer's current time. @@ -218,7 +216,6 @@ func newSyncerWithConfig(ctx context.Context, config *syncerConfig) (kubestate.S reconcilers: config.reconcilers, subscriptionCache: config.subscriptionInformer.GetIndexer(), installPlanLister: config.lister.OperatorsV1alpha1().InstallPlanLister(), - sourceProvider: config.sourceProvider, notify: func(event types.NamespacedName) { // Notify Subscriptions by enqueuing to the Subscription queue. config.subscriptionQueue.Add(event) @@ -256,7 +253,8 @@ func newSyncerWithConfig(ctx context.Context, config *syncerConfig) (kubestate.S catalogLister: config.lister.OperatorsV1alpha1().CatalogSourceLister(), registryReconcilerFactory: config.registryReconcilerFactory, globalCatalogNamespace: config.globalCatalogNamespace, - sourceProvider: config.sourceProvider, + operatorCacheProvider: config.operatorCacheProvider, + logger: config.logger, }, } s.reconcilers = append(defaultReconcilers, s.reconcilers...) diff --git a/vendor/github.com/operator-framework/operator-lifecycle-manager/pkg/controller/registry/resolver/resolver.go b/vendor/github.com/operator-framework/operator-lifecycle-manager/pkg/controller/registry/resolver/resolver.go index c19aba9f26..322b581fb4 100644 --- a/vendor/github.com/operator-framework/operator-lifecycle-manager/pkg/controller/registry/resolver/resolver.go +++ b/vendor/github.com/operator-framework/operator-lifecycle-manager/pkg/controller/registry/resolver/resolver.go @@ -29,15 +29,15 @@ type constraintProvider interface { } type Resolver struct { - cache *cache.Cache + cache cache.OperatorCacheProvider log logrus.FieldLogger pc *predicateConverter systemConstraintsProvider constraintProvider } -func NewDefaultResolver(rcp cache.SourceProvider, sourcePriorityProvider cache.SourcePriorityProvider, logger logrus.FieldLogger) *Resolver { +func NewDefaultResolver(cacheProvider cache.OperatorCacheProvider, logger logrus.FieldLogger) *Resolver { return &Resolver{ - cache: cache.New(rcp, cache.WithLogger(logger), cache.WithSourcePriorityProvider(sourcePriorityProvider)), + cache: cacheProvider, log: logger, pc: &predicateConverter{ celEnv: constraints.NewCelEnvironment(), diff --git a/vendor/github.com/operator-framework/operator-lifecycle-manager/pkg/controller/registry/resolver/source_registry.go b/vendor/github.com/operator-framework/operator-lifecycle-manager/pkg/controller/registry/resolver/source_registry.go index fe193beae8..f20cab0eba 100644 --- a/vendor/github.com/operator-framework/operator-lifecycle-manager/pkg/controller/registry/resolver/source_registry.go +++ b/vendor/github.com/operator-framework/operator-lifecycle-manager/pkg/controller/registry/resolver/source_registry.go @@ -12,6 +12,7 @@ import ( v1alpha1listers "github.com/operator-framework/operator-lifecycle-manager/pkg/api/client/listers/operators/v1alpha1" "github.com/operator-framework/operator-lifecycle-manager/pkg/controller/registry" "github.com/operator-framework/operator-lifecycle-manager/pkg/controller/registry/resolver/cache" + "github.com/operator-framework/operator-lifecycle-manager/pkg/metrics" "github.com/operator-framework/operator-registry/pkg/api" "github.com/operator-framework/operator-registry/pkg/client" opregistry "github.com/operator-framework/operator-registry/pkg/registry" @@ -143,6 +144,9 @@ type registrySource struct { } func (s *registrySource) Snapshot(ctx context.Context) (*cache.Snapshot, error) { + s.logger.Printf("requesting snapshot for catalog source %s/%s", s.key.Namespace, s.key.Name) + metrics.IncrementCatalogSourceSnapshotsTotal(s.key.Name, s.key.Namespace) + // Fetching default channels this way makes many round trips // -- may need to either add a new API to fetch all at once, // or embed the information into Bundle. diff --git a/vendor/github.com/operator-framework/operator-lifecycle-manager/pkg/controller/registry/resolver/step_resolver.go b/vendor/github.com/operator-framework/operator-lifecycle-manager/pkg/controller/registry/resolver/step_resolver.go index 5d2807bceb..5fb9ab3c0a 100644 --- a/vendor/github.com/operator-framework/operator-lifecycle-manager/pkg/controller/registry/resolver/step_resolver.go +++ b/vendor/github.com/operator-framework/operator-lifecycle-manager/pkg/controller/registry/resolver/step_resolver.go @@ -56,7 +56,7 @@ func (pp catsrcPriorityProvider) Priority(key cache.SourceKey) int { return catsrc.Spec.Priority } -func NewOperatorStepResolver(lister operatorlister.OperatorLister, client versioned.Interface, globalCatalogNamespace string, sourceProvider cache.SourceProvider, log logrus.FieldLogger) *OperatorStepResolver { +func NewOperatorCacheProvider(lister operatorlister.OperatorLister, client versioned.Interface, sourceProvider cache.SourceProvider, log logrus.FieldLogger) cache.OperatorCacheProvider { cacheSourceProvider := &mergedSourceProvider{ sps: []cache.SourceProvider{ sourceProvider, @@ -70,13 +70,19 @@ func NewOperatorStepResolver(lister operatorlister.OperatorLister, client versio }, }, } + catSrcPriorityProvider := &catsrcPriorityProvider{lister: lister.OperatorsV1alpha1().CatalogSourceLister()} + + return cache.New(cacheSourceProvider, cache.WithLogger(log), cache.WithSourcePriorityProvider(catSrcPriorityProvider)) +} + +func NewOperatorStepResolver(lister operatorlister.OperatorLister, client versioned.Interface, globalCatalogNamespace string, opCacheProvider cache.OperatorCacheProvider, log logrus.FieldLogger) *OperatorStepResolver { stepResolver := &OperatorStepResolver{ subLister: lister.OperatorsV1alpha1().SubscriptionLister(), csvLister: lister.OperatorsV1alpha1().ClusterServiceVersionLister(), ogLister: lister.OperatorsV1().OperatorGroupLister(), client: client, globalCatalogNamespace: globalCatalogNamespace, - resolver: NewDefaultResolver(cacheSourceProvider, catsrcPriorityProvider{lister: lister.OperatorsV1alpha1().CatalogSourceLister()}, log), + resolver: NewDefaultResolver(opCacheProvider, log), log: log, } diff --git a/vendor/github.com/operator-framework/operator-lifecycle-manager/pkg/metrics/metrics.go b/vendor/github.com/operator-framework/operator-lifecycle-manager/pkg/metrics/metrics.go index 7512d87f72..0369a41a24 100644 --- a/vendor/github.com/operator-framework/operator-lifecycle-manager/pkg/metrics/metrics.go +++ b/vendor/github.com/operator-framework/operator-lifecycle-manager/pkg/metrics/metrics.go @@ -152,6 +152,14 @@ var ( []string{NamespaceLabel, NameLabel}, ) + catalogSourceSnapshotsTotal = prometheus.NewCounterVec( + prometheus.CounterOpts{ + Name: "catalog_source_snapshots_total", + Help: "The number of times the catalog operator has requested a snapshot of data from a catalog source", + }, + []string{NamespaceLabel, NameLabel}, + ) + // exported since it's not handled by HandleMetrics CSVUpgradeCount = prometheus.NewCounter( prometheus.CounterOpts{ @@ -250,6 +258,7 @@ func RegisterCatalog() { prometheus.MustRegister(subscriptionCount) prometheus.MustRegister(catalogSourceCount) prometheus.MustRegister(catalogSourceReady) + prometheus.MustRegister(catalogSourceSnapshotsTotal) prometheus.MustRegister(SubscriptionSyncCount) prometheus.MustRegister(dependencyResolutionSummary) prometheus.MustRegister(installPlanWarningCount) @@ -272,6 +281,18 @@ func DeleteCatalogSourceStateMetric(name, namespace string) { catalogSourceReady.DeleteLabelValues(namespace, name) } +func RegisterCatalogSourceSnapshotsTotal(name, namespace string) { + catalogSourceSnapshotsTotal.WithLabelValues(namespace, name).Add(0) +} + +func IncrementCatalogSourceSnapshotsTotal(name, namespace string) { + catalogSourceSnapshotsTotal.WithLabelValues(namespace, name).Inc() +} + +func DeleteCatalogSourceSnapshotsTotal(name, namespace string) { + catalogSourceSnapshotsTotal.DeleteLabelValues(namespace, name) +} + func DeleteCSVMetric(oldCSV *operatorsv1alpha1.ClusterServiceVersion) { // Delete the old CSV metrics csvAbnormal.DeleteLabelValues(oldCSV.Namespace, oldCSV.Name, oldCSV.Spec.Version.String(), string(oldCSV.Status.Phase), string(oldCSV.Status.Reason)) From 66df0604f2a44d2d4abf11bc0b49cda1896623c8 Mon Sep 17 00:00:00 2001 From: Per Goncalves da Silva Date: Thu, 16 Jan 2025 16:57:50 +0100 Subject: [PATCH 2/3] Add more logging around catalog source sync (#3414) Signed-off-by: Per Goncalves da Silva Co-authored-by: Per Goncalves da Silva Upstream-repository: operator-lifecycle-manager Upstream-commit: 4043bab07a49d07ce15e5e9aa82dd32845ca4cbf --- .../controller/operators/catalog/operator.go | 39 ++++++------------- .../controller/registry/reconciler/grpc.go | 35 +++++++++++++++-- .../controller/operators/catalog/operator.go | 39 ++++++------------- .../controller/registry/reconciler/grpc.go | 35 +++++++++++++++-- 4 files changed, 84 insertions(+), 64 deletions(-) diff --git a/staging/operator-lifecycle-manager/pkg/controller/operators/catalog/operator.go b/staging/operator-lifecycle-manager/pkg/controller/operators/catalog/operator.go index f9eaeb5eeb..44f38ef521 100644 --- a/staging/operator-lifecycle-manager/pkg/controller/operators/catalog/operator.go +++ b/staging/operator-lifecycle-manager/pkg/controller/operators/catalog/operator.go @@ -918,6 +918,7 @@ func validateSourceType(logger *logrus.Entry, in *v1alpha1.CatalogSource) (out * err = fmt.Errorf("unknown sourcetype: %s", sourceType) } if err != nil { + logger.WithError(err).Error("error validating catalog source type") out.SetError(v1alpha1.CatalogSourceSpecInvalidError, err) return } @@ -929,7 +930,6 @@ func validateSourceType(logger *logrus.Entry, in *v1alpha1.CatalogSource) (out * } } continueSync = true - return } @@ -942,27 +942,22 @@ func (o *Operator) syncConfigMap(logger *logrus.Entry, in *v1alpha1.CatalogSourc out = in.DeepCopy() - logger = logger.WithFields(logrus.Fields{ - "configmap.namespace": in.Namespace, - "configmap.name": in.Spec.ConfigMap, - }) - logger.Info("checking catsrc configmap state") - var updateLabel bool // Get the catalog source's config map configMap, err := o.lister.CoreV1().ConfigMapLister().ConfigMaps(in.GetNamespace()).Get(in.Spec.ConfigMap) // Attempt to look up the CM via api call if there is a cache miss if apierrors.IsNotFound(err) { + // TODO: Don't reach out via live client if its not found in the cache (https://github.com/operator-framework/operator-lifecycle-manager/issues/3415) configMap, err = o.opClient.KubernetesInterface().CoreV1().ConfigMaps(in.GetNamespace()).Get(context.TODO(), in.Spec.ConfigMap, metav1.GetOptions{}) // Found cm in the cluster, add managed label to configmap if err == nil { - labels := configMap.GetLabels() - if labels == nil { - labels = make(map[string]string) + cmLabels := configMap.GetLabels() + if cmLabels == nil { + cmLabels = make(map[string]string) } - labels[install.OLMManagedLabelKey] = "false" - configMap.SetLabels(labels) + cmLabels[install.OLMManagedLabelKey] = "false" + configMap.SetLabels(cmLabels) updateLabel = true } } @@ -979,12 +974,9 @@ func (o *Operator) syncConfigMap(logger *logrus.Entry, in *v1alpha1.CatalogSourc out.SetError(v1alpha1.CatalogSourceConfigMapError, syncError) return } - - logger.Info("adopted configmap") } if in.Status.ConfigMapResource == nil || !in.Status.ConfigMapResource.IsAMatch(&configMap.ObjectMeta) { - logger.Info("updating catsrc configmap state") // configmap ref nonexistent or updated, write out the new configmap ref to status and exit out.Status.ConfigMapResource = &v1alpha1.ConfigMapResourceReference{ Name: configMap.GetName(), @@ -1004,7 +996,6 @@ func (o *Operator) syncConfigMap(logger *logrus.Entry, in *v1alpha1.CatalogSourc func (o *Operator) syncRegistryServer(logger *logrus.Entry, in *v1alpha1.CatalogSource) (out *v1alpha1.CatalogSource, continueSync bool, syncError error) { out = in.DeepCopy() - logger.Info("synchronizing registry server") sourceKey := registry.CatalogKey{Name: in.GetName(), Namespace: in.GetNamespace()} srcReconciler := o.reconciler.ReconcilerForSource(in) if srcReconciler == nil { @@ -1021,21 +1012,15 @@ func (o *Operator) syncRegistryServer(logger *logrus.Entry, in *v1alpha1.Catalog return } - logger.WithField("health", healthy).Infof("checked registry server health") - if healthy && in.Status.RegistryServiceStatus != nil { - logger.Info("registry state good") continueSync = true // return here if catalog does not have polling enabled if !out.Poll() { - logger.Info("polling not enabled, nothing more to do") return } } // Registry pod hasn't been created or hasn't been updated since the last configmap update, recreate it - logger.Info("ensuring registry server") - err = srcReconciler.EnsureRegistryServer(logger, out) if err != nil { if _, ok := err.(reconciler.UpdateNotReadyErr); ok { @@ -1048,8 +1033,6 @@ func (o *Operator) syncRegistryServer(logger *logrus.Entry, in *v1alpha1.Catalog return } - logger.Info("ensured registry server") - // requeue the catalog sync based on the polling interval, for accurate syncs of catalogs with polling enabled if out.Spec.UpdateStrategy != nil && out.Spec.UpdateStrategy.RegistryPoll != nil { if out.Spec.UpdateStrategy.Interval == nil { @@ -1058,16 +1041,17 @@ func (o *Operator) syncRegistryServer(logger *logrus.Entry, in *v1alpha1.Catalog return } if out.Spec.UpdateStrategy.RegistryPoll.ParsingError != "" && out.Status.Reason != v1alpha1.CatalogSourceIntervalInvalidError { - out.SetError(v1alpha1.CatalogSourceIntervalInvalidError, errors.New(out.Spec.UpdateStrategy.RegistryPoll.ParsingError)) + err := errors.New(out.Spec.UpdateStrategy.RegistryPoll.ParsingError) + logger.WithError(err).Error("registry server sync error: failed to parse registry poll interval") + out.SetError(v1alpha1.CatalogSourceIntervalInvalidError, err) } - logger.Infof("requeuing registry server sync based on polling interval %s", out.Spec.UpdateStrategy.Interval.Duration.String()) resyncPeriod := reconciler.SyncRegistryUpdateInterval(out, time.Now()) o.catsrcQueueSet.RequeueAfter(out.GetNamespace(), out.GetName(), queueinformer.ResyncWithJitter(resyncPeriod, 0.1)()) return } if err := o.sources.Remove(sourceKey); err != nil { - o.logger.WithError(err).Debug("error closing client connection") + o.logger.WithError(err).Error("registry server sync error: error closing client connection") } return @@ -1158,7 +1142,6 @@ func (o *Operator) syncCatalogSources(obj interface{}) (syncError error) { "catalogsource.name": catsrc.Name, "id": queueinformer.NewLoopID(), }) - logger.Info("syncing catalog source") syncFunc := func(in *v1alpha1.CatalogSource, chain []CatalogSourceSyncFunc) (out *v1alpha1.CatalogSource, syncErr error) { out = in diff --git a/staging/operator-lifecycle-manager/pkg/controller/registry/reconciler/grpc.go b/staging/operator-lifecycle-manager/pkg/controller/registry/reconciler/grpc.go index a960a00729..2f32ad8264 100644 --- a/staging/operator-lifecycle-manager/pkg/controller/registry/reconciler/grpc.go +++ b/staging/operator-lifecycle-manager/pkg/controller/registry/reconciler/grpc.go @@ -7,6 +7,7 @@ import ( "time" "github.com/google/go-cmp/cmp" + "github.com/operator-framework/api/pkg/operators/v1alpha1" "github.com/operator-framework/operator-lifecycle-manager/pkg/controller/install" @@ -201,10 +202,9 @@ func (c *GrpcRegistryReconciler) currentUpdatePods(logger *logrus.Entry, source } func (c *GrpcRegistryReconciler) currentPodsWithCorrectImageAndSpec(logger *logrus.Entry, source grpcCatalogSourceDecorator, serviceAccount *corev1.ServiceAccount, defaultPodSecurityConfig v1alpha1.SecurityConfig) ([]*corev1.Pod, error) { - logger.Info("searching for current pods") pods, err := c.Lister.CoreV1().PodLister().Pods(source.GetNamespace()).List(labels.SelectorFromValidatedSet(source.Labels())) if err != nil { - logger.WithError(err).Warn("couldn't find pod in cache") + logger.WithError(err).Warn("error searching for catalog source pods: couldn't find pod in cache") return nil, nil } found := []*corev1.Pod{} @@ -222,7 +222,7 @@ func (c *GrpcRegistryReconciler) currentPodsWithCorrectImageAndSpec(logger *logr if !hash { logger.Infof("pod spec diff: %s", cmp.Diff(p.Spec, newPod.Spec)) } - if correctImages(source, p) && podHashMatch(p, newPod) { + if images && hash { found = append(found, p) } } @@ -252,6 +252,7 @@ func (c *GrpcRegistryReconciler) EnsureRegistryServer(logger *logrus.Entry, cata // if service status is nil, we force create every object to ensure they're created the first time valid, err := isRegistryServiceStatusValid(&source) if err != nil { + logger.WithError(err).Error("error ensuring registry server: could not validate registry service status") return err } overwrite := !valid @@ -262,22 +263,26 @@ func (c *GrpcRegistryReconciler) EnsureRegistryServer(logger *logrus.Entry, cata //TODO: if any of these error out, we should write a status back (possibly set RegistryServiceStatus to nil so they get recreated) sa, err := c.ensureSA(source) if err != nil && !apierrors.IsAlreadyExists(err) { + logger.WithError(err).Error("error ensuring registry server: could not ensure registry service account") return pkgerrors.Wrapf(err, "error ensuring service account: %s", source.GetName()) } sa, err = c.OpClient.GetServiceAccount(sa.GetNamespace(), sa.GetName()) if err != nil { + logger.WithError(err).Error("error ensuring registry server: could not get registry service account") return err } defaultPodSecurityConfig, err := getDefaultPodContextConfig(c.OpClient, catalogSource.GetNamespace()) if err != nil { + logger.WithError(err).Error("error ensuring registry server: could not get default pod security config") return err } // recreate the pod if no existing pod is serving the latest image or correct spec current, err := c.currentPodsWithCorrectImageAndSpec(logger, source, sa, defaultPodSecurityConfig) if err != nil { + logger.WithError(err).Error("error ensuring registry server: could not get current pods with correct image and spec") return err } overwritePod := overwrite || len(current) == 0 @@ -287,22 +292,29 @@ func (c *GrpcRegistryReconciler) EnsureRegistryServer(logger *logrus.Entry, cata pod, err := source.Pod(sa, defaultPodSecurityConfig) if err != nil { + logger.WithError(err).Error("error ensuring registry server: could not create registry pod") return err } if err := c.ensurePod(logger, source, sa, defaultPodSecurityConfig, overwritePod); err != nil { + logger.WithError(err).Error("error ensuring registry server: could not ensure registry pod") return pkgerrors.Wrapf(err, "error ensuring pod: %s", pod.GetName()) } if err := c.ensureUpdatePod(logger, sa, defaultPodSecurityConfig, source); err != nil { + logger.WithError(err).Error("error ensuring registry server: could not ensure update pod") if _, ok := err.(UpdateNotReadyErr); ok { + logger.WithError(err).Error("error ensuring registry server: ensure update pod error is not of type UpdateNotReadyErr") return err } return pkgerrors.Wrapf(err, "error ensuring updated catalog source pod: %s", pod.GetName()) } + service, err := source.Service() if err != nil { + logger.WithError(err).Error("couldn't get service") return err } if err := c.ensureService(source, overwrite); err != nil { + logger.WithError(err).Error("error ensuring registry server: could not ensure service") return pkgerrors.Wrapf(err, "error ensuring service: %s", service.GetName()) } @@ -310,6 +322,7 @@ func (c *GrpcRegistryReconciler) EnsureRegistryServer(logger *logrus.Entry, cata now := c.now() service, err := source.Service() if err != nil { + logger.WithError(err).Error("error ensuring registry server: could not get service") return err } catalogSource.Status.RegistryServiceStatus = &v1alpha1.RegistryServiceStatus{ @@ -603,6 +616,7 @@ func (c *GrpcRegistryReconciler) CheckRegistryServer(logger *logrus.Entry, catal serviceAccount := source.ServiceAccount() serviceAccount, err := c.OpClient.GetServiceAccount(serviceAccount.GetNamespace(), serviceAccount.GetName()) if err != nil { + logger.WithError(err).Error("registry service not healthy: could not get service account") if !apierrors.IsNotFound(err) { return false, err } @@ -611,6 +625,7 @@ func (c *GrpcRegistryReconciler) CheckRegistryServer(logger *logrus.Entry, catal registryPodSecurityConfig, err := getDefaultPodContextConfig(c.OpClient, catalogSource.GetNamespace()) if err != nil { + logger.WithError(err).Error("registry service not healthy: could not get registry pod security config") return false, err } @@ -618,18 +633,30 @@ func (c *GrpcRegistryReconciler) CheckRegistryServer(logger *logrus.Entry, catal // TODO: add gRPC health check service, err := c.currentService(source) if err != nil { + logger.WithError(err).Error("registry service not healthy: could not get current service") return false, err } + currentPods, err := c.currentPodsWithCorrectImageAndSpec(logger, source, serviceAccount, registryPodSecurityConfig) if err != nil { + logger.WithError(err).Error("registry service not healthy: could not get current pods") return false, err } + + currentServiceAccount := c.currentServiceAccount(source) if len(currentPods) < 1 || - service == nil || c.currentServiceAccount(source) == nil { + service == nil || currentServiceAccount == nil { + logger.WithFields(logrus.Fields{ + "numCurrentPods": len(currentPods), + "isServiceNil": service == nil, + "isCurrentServiceAccountNil": currentServiceAccount == nil, + }).Error("registry service not healthy: one or more required resources are missing") return false, nil } + podsAreLive, e := detectAndDeleteDeadPods(logger, c.OpClient, currentPods, source.GetNamespace()) if e != nil { + logger.WithError(e).Error("registry service not healthy: could not detect and delete dead pods") return false, fmt.Errorf("error deleting dead pods: %v", e) } return podsAreLive, nil diff --git a/vendor/github.com/operator-framework/operator-lifecycle-manager/pkg/controller/operators/catalog/operator.go b/vendor/github.com/operator-framework/operator-lifecycle-manager/pkg/controller/operators/catalog/operator.go index f9eaeb5eeb..44f38ef521 100644 --- a/vendor/github.com/operator-framework/operator-lifecycle-manager/pkg/controller/operators/catalog/operator.go +++ b/vendor/github.com/operator-framework/operator-lifecycle-manager/pkg/controller/operators/catalog/operator.go @@ -918,6 +918,7 @@ func validateSourceType(logger *logrus.Entry, in *v1alpha1.CatalogSource) (out * err = fmt.Errorf("unknown sourcetype: %s", sourceType) } if err != nil { + logger.WithError(err).Error("error validating catalog source type") out.SetError(v1alpha1.CatalogSourceSpecInvalidError, err) return } @@ -929,7 +930,6 @@ func validateSourceType(logger *logrus.Entry, in *v1alpha1.CatalogSource) (out * } } continueSync = true - return } @@ -942,27 +942,22 @@ func (o *Operator) syncConfigMap(logger *logrus.Entry, in *v1alpha1.CatalogSourc out = in.DeepCopy() - logger = logger.WithFields(logrus.Fields{ - "configmap.namespace": in.Namespace, - "configmap.name": in.Spec.ConfigMap, - }) - logger.Info("checking catsrc configmap state") - var updateLabel bool // Get the catalog source's config map configMap, err := o.lister.CoreV1().ConfigMapLister().ConfigMaps(in.GetNamespace()).Get(in.Spec.ConfigMap) // Attempt to look up the CM via api call if there is a cache miss if apierrors.IsNotFound(err) { + // TODO: Don't reach out via live client if its not found in the cache (https://github.com/operator-framework/operator-lifecycle-manager/issues/3415) configMap, err = o.opClient.KubernetesInterface().CoreV1().ConfigMaps(in.GetNamespace()).Get(context.TODO(), in.Spec.ConfigMap, metav1.GetOptions{}) // Found cm in the cluster, add managed label to configmap if err == nil { - labels := configMap.GetLabels() - if labels == nil { - labels = make(map[string]string) + cmLabels := configMap.GetLabels() + if cmLabels == nil { + cmLabels = make(map[string]string) } - labels[install.OLMManagedLabelKey] = "false" - configMap.SetLabels(labels) + cmLabels[install.OLMManagedLabelKey] = "false" + configMap.SetLabels(cmLabels) updateLabel = true } } @@ -979,12 +974,9 @@ func (o *Operator) syncConfigMap(logger *logrus.Entry, in *v1alpha1.CatalogSourc out.SetError(v1alpha1.CatalogSourceConfigMapError, syncError) return } - - logger.Info("adopted configmap") } if in.Status.ConfigMapResource == nil || !in.Status.ConfigMapResource.IsAMatch(&configMap.ObjectMeta) { - logger.Info("updating catsrc configmap state") // configmap ref nonexistent or updated, write out the new configmap ref to status and exit out.Status.ConfigMapResource = &v1alpha1.ConfigMapResourceReference{ Name: configMap.GetName(), @@ -1004,7 +996,6 @@ func (o *Operator) syncConfigMap(logger *logrus.Entry, in *v1alpha1.CatalogSourc func (o *Operator) syncRegistryServer(logger *logrus.Entry, in *v1alpha1.CatalogSource) (out *v1alpha1.CatalogSource, continueSync bool, syncError error) { out = in.DeepCopy() - logger.Info("synchronizing registry server") sourceKey := registry.CatalogKey{Name: in.GetName(), Namespace: in.GetNamespace()} srcReconciler := o.reconciler.ReconcilerForSource(in) if srcReconciler == nil { @@ -1021,21 +1012,15 @@ func (o *Operator) syncRegistryServer(logger *logrus.Entry, in *v1alpha1.Catalog return } - logger.WithField("health", healthy).Infof("checked registry server health") - if healthy && in.Status.RegistryServiceStatus != nil { - logger.Info("registry state good") continueSync = true // return here if catalog does not have polling enabled if !out.Poll() { - logger.Info("polling not enabled, nothing more to do") return } } // Registry pod hasn't been created or hasn't been updated since the last configmap update, recreate it - logger.Info("ensuring registry server") - err = srcReconciler.EnsureRegistryServer(logger, out) if err != nil { if _, ok := err.(reconciler.UpdateNotReadyErr); ok { @@ -1048,8 +1033,6 @@ func (o *Operator) syncRegistryServer(logger *logrus.Entry, in *v1alpha1.Catalog return } - logger.Info("ensured registry server") - // requeue the catalog sync based on the polling interval, for accurate syncs of catalogs with polling enabled if out.Spec.UpdateStrategy != nil && out.Spec.UpdateStrategy.RegistryPoll != nil { if out.Spec.UpdateStrategy.Interval == nil { @@ -1058,16 +1041,17 @@ func (o *Operator) syncRegistryServer(logger *logrus.Entry, in *v1alpha1.Catalog return } if out.Spec.UpdateStrategy.RegistryPoll.ParsingError != "" && out.Status.Reason != v1alpha1.CatalogSourceIntervalInvalidError { - out.SetError(v1alpha1.CatalogSourceIntervalInvalidError, errors.New(out.Spec.UpdateStrategy.RegistryPoll.ParsingError)) + err := errors.New(out.Spec.UpdateStrategy.RegistryPoll.ParsingError) + logger.WithError(err).Error("registry server sync error: failed to parse registry poll interval") + out.SetError(v1alpha1.CatalogSourceIntervalInvalidError, err) } - logger.Infof("requeuing registry server sync based on polling interval %s", out.Spec.UpdateStrategy.Interval.Duration.String()) resyncPeriod := reconciler.SyncRegistryUpdateInterval(out, time.Now()) o.catsrcQueueSet.RequeueAfter(out.GetNamespace(), out.GetName(), queueinformer.ResyncWithJitter(resyncPeriod, 0.1)()) return } if err := o.sources.Remove(sourceKey); err != nil { - o.logger.WithError(err).Debug("error closing client connection") + o.logger.WithError(err).Error("registry server sync error: error closing client connection") } return @@ -1158,7 +1142,6 @@ func (o *Operator) syncCatalogSources(obj interface{}) (syncError error) { "catalogsource.name": catsrc.Name, "id": queueinformer.NewLoopID(), }) - logger.Info("syncing catalog source") syncFunc := func(in *v1alpha1.CatalogSource, chain []CatalogSourceSyncFunc) (out *v1alpha1.CatalogSource, syncErr error) { out = in diff --git a/vendor/github.com/operator-framework/operator-lifecycle-manager/pkg/controller/registry/reconciler/grpc.go b/vendor/github.com/operator-framework/operator-lifecycle-manager/pkg/controller/registry/reconciler/grpc.go index a960a00729..2f32ad8264 100644 --- a/vendor/github.com/operator-framework/operator-lifecycle-manager/pkg/controller/registry/reconciler/grpc.go +++ b/vendor/github.com/operator-framework/operator-lifecycle-manager/pkg/controller/registry/reconciler/grpc.go @@ -7,6 +7,7 @@ import ( "time" "github.com/google/go-cmp/cmp" + "github.com/operator-framework/api/pkg/operators/v1alpha1" "github.com/operator-framework/operator-lifecycle-manager/pkg/controller/install" @@ -201,10 +202,9 @@ func (c *GrpcRegistryReconciler) currentUpdatePods(logger *logrus.Entry, source } func (c *GrpcRegistryReconciler) currentPodsWithCorrectImageAndSpec(logger *logrus.Entry, source grpcCatalogSourceDecorator, serviceAccount *corev1.ServiceAccount, defaultPodSecurityConfig v1alpha1.SecurityConfig) ([]*corev1.Pod, error) { - logger.Info("searching for current pods") pods, err := c.Lister.CoreV1().PodLister().Pods(source.GetNamespace()).List(labels.SelectorFromValidatedSet(source.Labels())) if err != nil { - logger.WithError(err).Warn("couldn't find pod in cache") + logger.WithError(err).Warn("error searching for catalog source pods: couldn't find pod in cache") return nil, nil } found := []*corev1.Pod{} @@ -222,7 +222,7 @@ func (c *GrpcRegistryReconciler) currentPodsWithCorrectImageAndSpec(logger *logr if !hash { logger.Infof("pod spec diff: %s", cmp.Diff(p.Spec, newPod.Spec)) } - if correctImages(source, p) && podHashMatch(p, newPod) { + if images && hash { found = append(found, p) } } @@ -252,6 +252,7 @@ func (c *GrpcRegistryReconciler) EnsureRegistryServer(logger *logrus.Entry, cata // if service status is nil, we force create every object to ensure they're created the first time valid, err := isRegistryServiceStatusValid(&source) if err != nil { + logger.WithError(err).Error("error ensuring registry server: could not validate registry service status") return err } overwrite := !valid @@ -262,22 +263,26 @@ func (c *GrpcRegistryReconciler) EnsureRegistryServer(logger *logrus.Entry, cata //TODO: if any of these error out, we should write a status back (possibly set RegistryServiceStatus to nil so they get recreated) sa, err := c.ensureSA(source) if err != nil && !apierrors.IsAlreadyExists(err) { + logger.WithError(err).Error("error ensuring registry server: could not ensure registry service account") return pkgerrors.Wrapf(err, "error ensuring service account: %s", source.GetName()) } sa, err = c.OpClient.GetServiceAccount(sa.GetNamespace(), sa.GetName()) if err != nil { + logger.WithError(err).Error("error ensuring registry server: could not get registry service account") return err } defaultPodSecurityConfig, err := getDefaultPodContextConfig(c.OpClient, catalogSource.GetNamespace()) if err != nil { + logger.WithError(err).Error("error ensuring registry server: could not get default pod security config") return err } // recreate the pod if no existing pod is serving the latest image or correct spec current, err := c.currentPodsWithCorrectImageAndSpec(logger, source, sa, defaultPodSecurityConfig) if err != nil { + logger.WithError(err).Error("error ensuring registry server: could not get current pods with correct image and spec") return err } overwritePod := overwrite || len(current) == 0 @@ -287,22 +292,29 @@ func (c *GrpcRegistryReconciler) EnsureRegistryServer(logger *logrus.Entry, cata pod, err := source.Pod(sa, defaultPodSecurityConfig) if err != nil { + logger.WithError(err).Error("error ensuring registry server: could not create registry pod") return err } if err := c.ensurePod(logger, source, sa, defaultPodSecurityConfig, overwritePod); err != nil { + logger.WithError(err).Error("error ensuring registry server: could not ensure registry pod") return pkgerrors.Wrapf(err, "error ensuring pod: %s", pod.GetName()) } if err := c.ensureUpdatePod(logger, sa, defaultPodSecurityConfig, source); err != nil { + logger.WithError(err).Error("error ensuring registry server: could not ensure update pod") if _, ok := err.(UpdateNotReadyErr); ok { + logger.WithError(err).Error("error ensuring registry server: ensure update pod error is not of type UpdateNotReadyErr") return err } return pkgerrors.Wrapf(err, "error ensuring updated catalog source pod: %s", pod.GetName()) } + service, err := source.Service() if err != nil { + logger.WithError(err).Error("couldn't get service") return err } if err := c.ensureService(source, overwrite); err != nil { + logger.WithError(err).Error("error ensuring registry server: could not ensure service") return pkgerrors.Wrapf(err, "error ensuring service: %s", service.GetName()) } @@ -310,6 +322,7 @@ func (c *GrpcRegistryReconciler) EnsureRegistryServer(logger *logrus.Entry, cata now := c.now() service, err := source.Service() if err != nil { + logger.WithError(err).Error("error ensuring registry server: could not get service") return err } catalogSource.Status.RegistryServiceStatus = &v1alpha1.RegistryServiceStatus{ @@ -603,6 +616,7 @@ func (c *GrpcRegistryReconciler) CheckRegistryServer(logger *logrus.Entry, catal serviceAccount := source.ServiceAccount() serviceAccount, err := c.OpClient.GetServiceAccount(serviceAccount.GetNamespace(), serviceAccount.GetName()) if err != nil { + logger.WithError(err).Error("registry service not healthy: could not get service account") if !apierrors.IsNotFound(err) { return false, err } @@ -611,6 +625,7 @@ func (c *GrpcRegistryReconciler) CheckRegistryServer(logger *logrus.Entry, catal registryPodSecurityConfig, err := getDefaultPodContextConfig(c.OpClient, catalogSource.GetNamespace()) if err != nil { + logger.WithError(err).Error("registry service not healthy: could not get registry pod security config") return false, err } @@ -618,18 +633,30 @@ func (c *GrpcRegistryReconciler) CheckRegistryServer(logger *logrus.Entry, catal // TODO: add gRPC health check service, err := c.currentService(source) if err != nil { + logger.WithError(err).Error("registry service not healthy: could not get current service") return false, err } + currentPods, err := c.currentPodsWithCorrectImageAndSpec(logger, source, serviceAccount, registryPodSecurityConfig) if err != nil { + logger.WithError(err).Error("registry service not healthy: could not get current pods") return false, err } + + currentServiceAccount := c.currentServiceAccount(source) if len(currentPods) < 1 || - service == nil || c.currentServiceAccount(source) == nil { + service == nil || currentServiceAccount == nil { + logger.WithFields(logrus.Fields{ + "numCurrentPods": len(currentPods), + "isServiceNil": service == nil, + "isCurrentServiceAccountNil": currentServiceAccount == nil, + }).Error("registry service not healthy: one or more required resources are missing") return false, nil } + podsAreLive, e := detectAndDeleteDeadPods(logger, c.OpClient, currentPods, source.GetNamespace()) if e != nil { + logger.WithError(e).Error("registry service not healthy: could not detect and delete dead pods") return false, fmt.Errorf("error deleting dead pods: %v", e) } return podsAreLive, nil From a6ec135c995c48f22241cb11c605b11c5ef318c2 Mon Sep 17 00:00:00 2001 From: Joe Lanford Date: Fri, 17 Jan 2025 03:50:35 -0500 Subject: [PATCH 3/3] sort lists of identifiers and conflict messages to reduce SAT solver non-determinism (#3491) Signed-off-by: Joe Lanford Upstream-repository: operator-lifecycle-manager Upstream-commit: 451e775ea03eb9280399ba0abaf19bd237df167c --- .../pkg/controller/registry/resolver/resolver.go | 3 +++ .../pkg/controller/registry/resolver/solver/lit_mapping.go | 5 +++++ .../pkg/controller/registry/resolver/resolver.go | 3 +++ .../pkg/controller/registry/resolver/solver/lit_mapping.go | 5 +++++ 4 files changed, 16 insertions(+) diff --git a/staging/operator-lifecycle-manager/pkg/controller/registry/resolver/resolver.go b/staging/operator-lifecycle-manager/pkg/controller/registry/resolver/resolver.go index 322b581fb4..7f07c711d6 100644 --- a/staging/operator-lifecycle-manager/pkg/controller/registry/resolver/resolver.go +++ b/staging/operator-lifecycle-manager/pkg/controller/registry/resolver/resolver.go @@ -4,6 +4,7 @@ import ( "context" "encoding/json" "fmt" + "slices" "sort" "strings" @@ -513,11 +514,13 @@ func (r *Resolver) addInvariants(namespacedCache cache.MultiCatalogOperatorFinde } for gvk, is := range gvkConflictToVariable { + slices.Sort(is) s := NewSingleAPIProviderVariable(gvk.Group, gvk.Version, gvk.Kind, is) variables[s.Identifier()] = s } for pkg, is := range packageConflictToVariable { + slices.Sort(is) s := NewSinglePackageInstanceVariable(pkg, is) variables[s.Identifier()] = s } diff --git a/staging/operator-lifecycle-manager/pkg/controller/registry/resolver/solver/lit_mapping.go b/staging/operator-lifecycle-manager/pkg/controller/registry/resolver/solver/lit_mapping.go index eb7a739aca..4117da2e1a 100644 --- a/staging/operator-lifecycle-manager/pkg/controller/registry/resolver/solver/lit_mapping.go +++ b/staging/operator-lifecycle-manager/pkg/controller/registry/resolver/solver/lit_mapping.go @@ -1,7 +1,9 @@ package solver import ( + "cmp" "fmt" + "slices" "strings" "github.com/go-air/gini/inter" @@ -203,5 +205,8 @@ func (d *litMapping) Conflicts(g inter.Assumable) []AppliedConstraint { as = append(as, a) } } + slices.SortFunc(as, func(a, b AppliedConstraint) int { + return cmp.Compare(a.String(), b.String()) + }) return as } diff --git a/vendor/github.com/operator-framework/operator-lifecycle-manager/pkg/controller/registry/resolver/resolver.go b/vendor/github.com/operator-framework/operator-lifecycle-manager/pkg/controller/registry/resolver/resolver.go index 322b581fb4..7f07c711d6 100644 --- a/vendor/github.com/operator-framework/operator-lifecycle-manager/pkg/controller/registry/resolver/resolver.go +++ b/vendor/github.com/operator-framework/operator-lifecycle-manager/pkg/controller/registry/resolver/resolver.go @@ -4,6 +4,7 @@ import ( "context" "encoding/json" "fmt" + "slices" "sort" "strings" @@ -513,11 +514,13 @@ func (r *Resolver) addInvariants(namespacedCache cache.MultiCatalogOperatorFinde } for gvk, is := range gvkConflictToVariable { + slices.Sort(is) s := NewSingleAPIProviderVariable(gvk.Group, gvk.Version, gvk.Kind, is) variables[s.Identifier()] = s } for pkg, is := range packageConflictToVariable { + slices.Sort(is) s := NewSinglePackageInstanceVariable(pkg, is) variables[s.Identifier()] = s } diff --git a/vendor/github.com/operator-framework/operator-lifecycle-manager/pkg/controller/registry/resolver/solver/lit_mapping.go b/vendor/github.com/operator-framework/operator-lifecycle-manager/pkg/controller/registry/resolver/solver/lit_mapping.go index eb7a739aca..4117da2e1a 100644 --- a/vendor/github.com/operator-framework/operator-lifecycle-manager/pkg/controller/registry/resolver/solver/lit_mapping.go +++ b/vendor/github.com/operator-framework/operator-lifecycle-manager/pkg/controller/registry/resolver/solver/lit_mapping.go @@ -1,7 +1,9 @@ package solver import ( + "cmp" "fmt" + "slices" "strings" "github.com/go-air/gini/inter" @@ -203,5 +205,8 @@ func (d *litMapping) Conflicts(g inter.Assumable) []AppliedConstraint { as = append(as, a) } } + slices.SortFunc(as, func(a, b AppliedConstraint) int { + return cmp.Compare(a.String(), b.String()) + }) return as }