Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -131,6 +131,7 @@ type Operator struct {
clientFactory clients.Factory
muInstallPlan sync.Mutex
resolverSourceProvider *resolver.RegistrySourceProvider
operatorCacheProvider resolvercache.OperatorCacheProvider
}

type CatalogSourceSyncFunc func(logger *logrus.Entry, in *v1alpha1.CatalogSource) (out *v1alpha1.CatalogSource, continueSync bool, syncError error)
Expand Down Expand Up @@ -217,8 +218,9 @@ func NewOperator(ctx context.Context, kubeconfigPath string, clock utilclock.Clo
}
op.sources = grpc.NewSourceStore(logger, 10*time.Second, 10*time.Minute, op.syncSourceState)
op.resolverSourceProvider = resolver.SourceProviderFromRegistryClientProvider(op.sources, lister.OperatorsV1alpha1().CatalogSourceLister(), logger)
op.operatorCacheProvider = resolver.NewOperatorCacheProvider(lister, crClient, op.resolverSourceProvider, logger)
op.reconciler = reconciler.NewRegistryReconcilerFactory(lister, opClient, configmapRegistryImage, op.now, ssaClient, workloadUserID, opmImage, utilImage)
res := resolver.NewOperatorStepResolver(lister, crClient, operatorNamespace, op.resolverSourceProvider, logger)
res := resolver.NewOperatorStepResolver(lister, crClient, operatorNamespace, op.operatorCacheProvider, logger)
op.resolver = resolver.NewInstrumentedResolver(res, metrics.RegisterDependencyResolutionSuccess, metrics.RegisterDependencyResolutionFailure)

// Wire OLM CR sharedIndexInformers
Expand Down Expand Up @@ -360,7 +362,7 @@ func NewOperator(ctx context.Context, kubeconfigPath string, clock utilclock.Clo
subscription.WithAppendedReconcilers(subscription.ReconcilerFromLegacySyncHandler(op.syncSubscriptions)),
subscription.WithRegistryReconcilerFactory(op.reconciler),
subscription.WithGlobalCatalogNamespace(op.namespace),
subscription.WithSourceProvider(op.resolverSourceProvider),
subscription.WithOperatorCacheProvider(op.operatorCacheProvider),
)
if err != nil {
return nil, err
Expand Down Expand Up @@ -781,6 +783,7 @@ func (o *Operator) syncSourceState(state grpc.SourceState) {

o.logger.Infof("state.Key.Namespace=%s state.Key.Name=%s state.State=%s", state.Key.Namespace, state.Key.Name, state.State.String())
metrics.RegisterCatalogSourceState(state.Key.Name, state.Key.Namespace, state.State)
metrics.RegisterCatalogSourceSnapshotsTotal(state.Key.Name, state.Key.Namespace)

switch state.State {
case connectivity.Ready:
Expand Down Expand Up @@ -896,6 +899,7 @@ func (o *Operator) handleCatSrcDeletion(obj interface{}) {
o.logger.WithField("source", sourceKey).Info("removed client for deleted catalogsource")

metrics.DeleteCatalogSourceStateMetric(catsrc.GetName(), catsrc.GetNamespace())
metrics.DeleteCatalogSourceSnapshotsTotal(catsrc.GetName(), catsrc.GetNamespace())
}

func validateSourceType(logger *logrus.Entry, in *v1alpha1.CatalogSource) (out *v1alpha1.CatalogSource, continueSync bool, _ error) {
Expand All @@ -914,6 +918,7 @@ func validateSourceType(logger *logrus.Entry, in *v1alpha1.CatalogSource) (out *
err = fmt.Errorf("unknown sourcetype: %s", sourceType)
}
if err != nil {
logger.WithError(err).Error("error validating catalog source type")
out.SetError(v1alpha1.CatalogSourceSpecInvalidError, err)
return
}
Expand All @@ -925,7 +930,6 @@ func validateSourceType(logger *logrus.Entry, in *v1alpha1.CatalogSource) (out *
}
}
continueSync = true

return
}

Expand All @@ -938,27 +942,22 @@ func (o *Operator) syncConfigMap(logger *logrus.Entry, in *v1alpha1.CatalogSourc

out = in.DeepCopy()

logger = logger.WithFields(logrus.Fields{
"configmap.namespace": in.Namespace,
"configmap.name": in.Spec.ConfigMap,
})
logger.Info("checking catsrc configmap state")

var updateLabel bool
// Get the catalog source's config map
configMap, err := o.lister.CoreV1().ConfigMapLister().ConfigMaps(in.GetNamespace()).Get(in.Spec.ConfigMap)
// Attempt to look up the CM via api call if there is a cache miss
if apierrors.IsNotFound(err) {
// TODO: Don't reach out via live client if its not found in the cache (https://github.com/operator-framework/operator-lifecycle-manager/issues/3415)
configMap, err = o.opClient.KubernetesInterface().CoreV1().ConfigMaps(in.GetNamespace()).Get(context.TODO(), in.Spec.ConfigMap, metav1.GetOptions{})
// Found cm in the cluster, add managed label to configmap
if err == nil {
labels := configMap.GetLabels()
if labels == nil {
labels = make(map[string]string)
cmLabels := configMap.GetLabels()
if cmLabels == nil {
cmLabels = make(map[string]string)
}

labels[install.OLMManagedLabelKey] = "false"
configMap.SetLabels(labels)
cmLabels[install.OLMManagedLabelKey] = "false"
configMap.SetLabels(cmLabels)
updateLabel = true
}
}
Expand All @@ -975,12 +974,9 @@ func (o *Operator) syncConfigMap(logger *logrus.Entry, in *v1alpha1.CatalogSourc
out.SetError(v1alpha1.CatalogSourceConfigMapError, syncError)
return
}

logger.Info("adopted configmap")
}

if in.Status.ConfigMapResource == nil || !in.Status.ConfigMapResource.IsAMatch(&configMap.ObjectMeta) {
logger.Info("updating catsrc configmap state")
// configmap ref nonexistent or updated, write out the new configmap ref to status and exit
out.Status.ConfigMapResource = &v1alpha1.ConfigMapResourceReference{
Name: configMap.GetName(),
Expand All @@ -1000,7 +996,6 @@ func (o *Operator) syncConfigMap(logger *logrus.Entry, in *v1alpha1.CatalogSourc
func (o *Operator) syncRegistryServer(logger *logrus.Entry, in *v1alpha1.CatalogSource) (out *v1alpha1.CatalogSource, continueSync bool, syncError error) {
out = in.DeepCopy()

logger.Info("synchronizing registry server")
sourceKey := registry.CatalogKey{Name: in.GetName(), Namespace: in.GetNamespace()}
srcReconciler := o.reconciler.ReconcilerForSource(in)
if srcReconciler == nil {
Expand All @@ -1017,21 +1012,15 @@ func (o *Operator) syncRegistryServer(logger *logrus.Entry, in *v1alpha1.Catalog
return
}

logger.WithField("health", healthy).Infof("checked registry server health")

if healthy && in.Status.RegistryServiceStatus != nil {
logger.Info("registry state good")
continueSync = true
// return here if catalog does not have polling enabled
if !out.Poll() {
logger.Info("polling not enabled, nothing more to do")
return
}
}

// Registry pod hasn't been created or hasn't been updated since the last configmap update, recreate it
logger.Info("ensuring registry server")

err = srcReconciler.EnsureRegistryServer(logger, out)
if err != nil {
if _, ok := err.(reconciler.UpdateNotReadyErr); ok {
Expand All @@ -1044,8 +1033,6 @@ func (o *Operator) syncRegistryServer(logger *logrus.Entry, in *v1alpha1.Catalog
return
}

logger.Info("ensured registry server")

// requeue the catalog sync based on the polling interval, for accurate syncs of catalogs with polling enabled
if out.Spec.UpdateStrategy != nil && out.Spec.UpdateStrategy.RegistryPoll != nil {
if out.Spec.UpdateStrategy.Interval == nil {
Expand All @@ -1054,16 +1041,17 @@ func (o *Operator) syncRegistryServer(logger *logrus.Entry, in *v1alpha1.Catalog
return
}
if out.Spec.UpdateStrategy.RegistryPoll.ParsingError != "" && out.Status.Reason != v1alpha1.CatalogSourceIntervalInvalidError {
out.SetError(v1alpha1.CatalogSourceIntervalInvalidError, errors.New(out.Spec.UpdateStrategy.RegistryPoll.ParsingError))
err := errors.New(out.Spec.UpdateStrategy.RegistryPoll.ParsingError)
logger.WithError(err).Error("registry server sync error: failed to parse registry poll interval")
out.SetError(v1alpha1.CatalogSourceIntervalInvalidError, err)
}
logger.Infof("requeuing registry server sync based on polling interval %s", out.Spec.UpdateStrategy.Interval.Duration.String())
resyncPeriod := reconciler.SyncRegistryUpdateInterval(out, time.Now())
o.catsrcQueueSet.RequeueAfter(out.GetNamespace(), out.GetName(), queueinformer.ResyncWithJitter(resyncPeriod, 0.1)())
return
}

if err := o.sources.Remove(sourceKey); err != nil {
o.logger.WithError(err).Debug("error closing client connection")
o.logger.WithError(err).Error("registry server sync error: error closing client connection")
}

return
Expand Down Expand Up @@ -1154,7 +1142,6 @@ func (o *Operator) syncCatalogSources(obj interface{}) (syncError error) {
"catalogsource.name": catsrc.Name,
"id": queueinformer.NewLoopID(),
})
logger.Info("syncing catalog source")

syncFunc := func(in *v1alpha1.CatalogSource, chain []CatalogSourceSyncFunc) (out *v1alpha1.CatalogSource, syncErr error) {
out = in
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ type syncerConfig struct {
reconcilers kubestate.ReconcilerChain
registryReconcilerFactory reconciler.RegistryReconcilerFactory
globalCatalogNamespace string
sourceProvider resolverCache.SourceProvider
operatorCacheProvider resolverCache.OperatorCacheProvider
}

// SyncerOption is a configuration option for a subscription syncer.
Expand Down Expand Up @@ -131,9 +131,9 @@ func WithGlobalCatalogNamespace(namespace string) SyncerOption {
}
}

func WithSourceProvider(provider resolverCache.SourceProvider) SyncerOption {
func WithOperatorCacheProvider(provider resolverCache.OperatorCacheProvider) SyncerOption {
return func(config *syncerConfig) {
config.sourceProvider = provider
config.operatorCacheProvider = provider
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,8 @@ type catalogHealthReconciler struct {
catalogLister listers.CatalogSourceLister
registryReconcilerFactory reconciler.RegistryReconcilerFactory
globalCatalogNamespace string
sourceProvider cache.SourceProvider
operatorCacheProvider cache.OperatorCacheProvider
logger logrus.StdLogger
}

// Reconcile reconciles subscription catalog health conditions.
Expand Down Expand Up @@ -126,21 +127,16 @@ func (c *catalogHealthReconciler) Reconcile(ctx context.Context, in kubestate.St
// updateDeprecatedStatus adds deprecation status conditions to the subscription when present in the cache entry then
// returns a bool value of true if any changes to the existing subscription have occurred.
func (c *catalogHealthReconciler) updateDeprecatedStatus(ctx context.Context, sub *v1alpha1.Subscription) (bool, error) {
if c.sourceProvider == nil {
if c.operatorCacheProvider == nil {
return false, nil
}
source, ok := c.sourceProvider.Sources(sub.Spec.CatalogSourceNamespace)[cache.SourceKey{

entries := c.operatorCacheProvider.Namespaced(sub.Spec.CatalogSourceNamespace).Catalog(cache.SourceKey{
Name: sub.Spec.CatalogSource,
Namespace: sub.Spec.CatalogSourceNamespace,
}]
if !ok {
return false, nil
}
snapshot, err := source.Snapshot(ctx)
if err != nil {
return false, err
}
if len(snapshot.Entries) == 0 {
}).Find(cache.PkgPredicate(sub.Spec.Package), cache.ChannelPredicate(sub.Spec.Channel))

if len(entries) == 0 {
return false, nil
}

Expand All @@ -149,12 +145,9 @@ func (c *catalogHealthReconciler) updateDeprecatedStatus(ctx context.Context, su
var deprecations *cache.Deprecations

found := false
for _, entry := range snapshot.Entries {
for _, entry := range entries {
// Find the cache entry that matches this subscription
if entry.SourceInfo == nil || entry.Package() != sub.Spec.Package {
continue
}
if sub.Spec.Channel != "" && entry.Channel() != sub.Spec.Channel {
if entry.SourceInfo == nil {
continue
}
if sub.Status.InstalledCSV != entry.Name {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@ import (
"github.com/operator-framework/api/pkg/operators/install"
"github.com/operator-framework/api/pkg/operators/v1alpha1"
listers "github.com/operator-framework/operator-lifecycle-manager/pkg/api/client/listers/operators/v1alpha1"
resolverCache "github.com/operator-framework/operator-lifecycle-manager/pkg/controller/registry/resolver/cache"
"github.com/operator-framework/operator-lifecycle-manager/pkg/lib/kubestate"
"github.com/operator-framework/operator-lifecycle-manager/pkg/lib/ownerutil"
"github.com/operator-framework/operator-lifecycle-manager/pkg/metrics"
Expand All @@ -38,7 +37,6 @@ type subscriptionSyncer struct {
installPlanLister listers.InstallPlanLister
globalCatalogNamespace string
notify kubestate.NotifyFunc
sourceProvider resolverCache.SourceProvider
}

// now returns the Syncer's current time.
Expand Down Expand Up @@ -218,7 +216,6 @@ func newSyncerWithConfig(ctx context.Context, config *syncerConfig) (kubestate.S
reconcilers: config.reconcilers,
subscriptionCache: config.subscriptionInformer.GetIndexer(),
installPlanLister: config.lister.OperatorsV1alpha1().InstallPlanLister(),
sourceProvider: config.sourceProvider,
notify: func(event types.NamespacedName) {
// Notify Subscriptions by enqueuing to the Subscription queue.
config.subscriptionQueue.Add(event)
Expand Down Expand Up @@ -256,7 +253,8 @@ func newSyncerWithConfig(ctx context.Context, config *syncerConfig) (kubestate.S
catalogLister: config.lister.OperatorsV1alpha1().CatalogSourceLister(),
registryReconcilerFactory: config.registryReconcilerFactory,
globalCatalogNamespace: config.globalCatalogNamespace,
sourceProvider: config.sourceProvider,
operatorCacheProvider: config.operatorCacheProvider,
logger: config.logger,
},
}
s.reconcilers = append(defaultReconcilers, s.reconcilers...)
Expand Down
Loading