Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion staging/operator-lifecycle-manager/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ require (
github.com/spf13/cobra v1.7.0
github.com/spf13/pflag v1.0.5
github.com/stretchr/testify v1.8.4
golang.org/x/exp v0.0.0-20231006140011-7918f672742d
golang.org/x/net v0.34.0
golang.org/x/sync v0.10.0
golang.org/x/time v0.3.0
Expand Down Expand Up @@ -216,7 +217,6 @@ require (
go.uber.org/multierr v1.6.0 // indirect
go.uber.org/zap v1.24.0 // indirect
golang.org/x/crypto v0.32.0 // indirect
golang.org/x/exp v0.0.0-20231006140011-7918f672742d // indirect
golang.org/x/lint v0.0.0-20210508222113-6edffad5e616 // indirect
golang.org/x/mod v0.17.0 // indirect
golang.org/x/oauth2 v0.14.0 // indirect
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -129,7 +129,8 @@ type Operator struct {
bundleUnpackTimeout time.Duration
clientFactory clients.Factory
muInstallPlan sync.Mutex
sourceInvalidator *resolver.RegistrySourceProvider
resolverSourceProvider *resolver.RegistrySourceProvider
operatorCacheProvider resolvercache.OperatorCacheProvider
}

type CatalogSourceSyncFunc func(logger *logrus.Entry, in *v1alpha1.CatalogSource) (out *v1alpha1.CatalogSource, continueSync bool, syncError error)
Expand Down Expand Up @@ -215,10 +216,10 @@ func NewOperator(ctx context.Context, kubeconfigPath string, clock utilclock.Clo
clientFactory: clients.NewFactory(validatingConfig),
}
op.sources = grpc.NewSourceStore(logger, 10*time.Second, 10*time.Minute, op.syncSourceState)
op.sourceInvalidator = resolver.SourceProviderFromRegistryClientProvider(op.sources, lister.OperatorsV1alpha1().CatalogSourceLister(), logger)
resolverSourceProvider := NewOperatorGroupToggleSourceProvider(op.sourceInvalidator, logger, op.lister.OperatorsV1().OperatorGroupLister())
op.resolverSourceProvider = resolver.SourceProviderFromRegistryClientProvider(op.sources, lister.OperatorsV1alpha1().CatalogSourceLister(), logger)
op.operatorCacheProvider = resolver.NewOperatorCacheProvider(lister, crClient, op.resolverSourceProvider, logger)
op.reconciler = reconciler.NewRegistryReconcilerFactory(lister, opClient, configmapRegistryImage, op.now, ssaClient, workloadUserID, opmImage, utilImage)
res := resolver.NewOperatorStepResolver(lister, crClient, operatorNamespace, resolverSourceProvider, logger)
res := resolver.NewOperatorStepResolver(lister, crClient, operatorNamespace, op.operatorCacheProvider, logger)
op.resolver = resolver.NewInstrumentedResolver(res, metrics.RegisterDependencyResolutionSuccess, metrics.RegisterDependencyResolutionFailure)

// Wire OLM CR sharedIndexInformers
Expand Down Expand Up @@ -347,7 +348,7 @@ func NewOperator(ctx context.Context, kubeconfigPath string, clock utilclock.Clo
subscription.WithAppendedReconcilers(subscription.ReconcilerFromLegacySyncHandler(op.syncSubscriptions, nil)),
subscription.WithRegistryReconcilerFactory(op.reconciler),
subscription.WithGlobalCatalogNamespace(op.namespace),
subscription.WithSourceProvider(op.sourceInvalidator),
subscription.WithOperatorCacheProvider(op.operatorCacheProvider),
)
if err != nil {
return nil, err
Expand Down Expand Up @@ -763,10 +764,11 @@ func (o *Operator) syncSourceState(state grpc.SourceState) {

o.logger.Infof("state.Key.Namespace=%s state.Key.Name=%s state.State=%s", state.Key.Namespace, state.Key.Name, state.State.String())
metrics.RegisterCatalogSourceState(state.Key.Name, state.Key.Namespace, state.State)
metrics.RegisterCatalogSourceSnapshotsTotal(state.Key.Name, state.Key.Namespace)

switch state.State {
case connectivity.Ready:
o.sourceInvalidator.Invalidate(resolvercache.SourceKey(state.Key))
o.resolverSourceProvider.Invalidate(resolvercache.SourceKey(state.Key))
if o.namespace == state.Key.Namespace {
namespaces, err := index.CatalogSubscriberNamespaces(o.catalogSubscriberIndexer,
state.Key.Name, state.Key.Namespace)
Expand Down Expand Up @@ -880,6 +882,7 @@ func (o *Operator) handleCatSrcDeletion(obj interface{}) {
o.logger.WithField("source", sourceKey).Info("removed client for deleted catalogsource")

metrics.DeleteCatalogSourceStateMetric(catsrc.GetName(), catsrc.GetNamespace())
metrics.DeleteCatalogSourceSnapshotsTotal(catsrc.GetName(), catsrc.GetNamespace())
}

func validateSourceType(logger *logrus.Entry, in *v1alpha1.CatalogSource) (out *v1alpha1.CatalogSource, continueSync bool, _ error) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ type syncerConfig struct {
reconcilers kubestate.ReconcilerChain
registryReconcilerFactory reconciler.RegistryReconcilerFactory
globalCatalogNamespace string
sourceProvider resolverCache.SourceProvider
operatorCacheProvider resolverCache.OperatorCacheProvider
}

// SyncerOption is a configuration option for a subscription syncer.
Expand Down Expand Up @@ -130,9 +130,9 @@ func WithGlobalCatalogNamespace(namespace string) SyncerOption {
}
}

func WithSourceProvider(provider resolverCache.SourceProvider) SyncerOption {
func WithOperatorCacheProvider(provider resolverCache.OperatorCacheProvider) SyncerOption {
return func(config *syncerConfig) {
config.sourceProvider = provider
config.operatorCacheProvider = provider
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,8 @@ type catalogHealthReconciler struct {
catalogLister listers.CatalogSourceLister
registryReconcilerFactory reconciler.RegistryReconcilerFactory
globalCatalogNamespace string
sourceProvider cache.SourceProvider
operatorCacheProvider cache.OperatorCacheProvider
logger logrus.StdLogger
}

// Reconcile reconciles subscription catalog health conditions.
Expand Down Expand Up @@ -130,21 +131,16 @@ func (c *catalogHealthReconciler) Reconcile(ctx context.Context, in kubestate.St
// updateDeprecatedStatus adds deprecation status conditions to the subscription when present in the cache entry then
// returns a bool value of true if any changes to the existing subscription have occurred.
func (c *catalogHealthReconciler) updateDeprecatedStatus(ctx context.Context, sub *v1alpha1.Subscription) (bool, error) {
if c.sourceProvider == nil {
if c.operatorCacheProvider == nil {
return false, nil
}
source, ok := c.sourceProvider.Sources(sub.Spec.CatalogSourceNamespace)[cache.SourceKey{

entries := c.operatorCacheProvider.Namespaced(sub.Spec.CatalogSourceNamespace).Catalog(cache.SourceKey{
Name: sub.Spec.CatalogSource,
Namespace: sub.Spec.CatalogSourceNamespace,
}]
if !ok {
return false, nil
}
snapshot, err := source.Snapshot(ctx)
if err != nil {
return false, err
}
if len(snapshot.Entries) == 0 {
}).Find(cache.PkgPredicate(sub.Spec.Package), cache.ChannelPredicate(sub.Spec.Channel))

if len(entries) == 0 {
return false, nil
}

Expand All @@ -153,12 +149,9 @@ func (c *catalogHealthReconciler) updateDeprecatedStatus(ctx context.Context, su
var deprecations *cache.Deprecations

found := false
for _, entry := range snapshot.Entries {
for _, entry := range entries {
// Find the cache entry that matches this subscription
if entry.SourceInfo == nil || entry.Package() != sub.Spec.Package {
continue
}
if sub.Spec.Channel != "" && entry.Channel() != sub.Spec.Channel {
if entry.SourceInfo == nil {
continue
}
if sub.Status.InstalledCSV != entry.Name {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@ import (
"github.com/operator-framework/api/pkg/operators/install"
"github.com/operator-framework/api/pkg/operators/v1alpha1"
listers "github.com/operator-framework/operator-lifecycle-manager/pkg/api/client/listers/operators/v1alpha1"
resolverCache "github.com/operator-framework/operator-lifecycle-manager/pkg/controller/registry/resolver/cache"
"github.com/operator-framework/operator-lifecycle-manager/pkg/lib/kubestate"
"github.com/operator-framework/operator-lifecycle-manager/pkg/lib/ownerutil"
"github.com/operator-framework/operator-lifecycle-manager/pkg/metrics"
Expand All @@ -35,7 +34,6 @@ type subscriptionSyncer struct {
installPlanLister listers.InstallPlanLister
globalCatalogNamespace string
notify kubestate.NotifyFunc
sourceProvider resolverCache.SourceProvider
}

// now returns the Syncer's current time.
Expand Down Expand Up @@ -216,7 +214,6 @@ func newSyncerWithConfig(ctx context.Context, config *syncerConfig) (kubestate.S
reconcilers: config.reconcilers,
subscriptionCache: config.subscriptionInformer.GetIndexer(),
installPlanLister: config.lister.OperatorsV1alpha1().InstallPlanLister(),
sourceProvider: config.sourceProvider,
notify: func(event kubestate.ResourceEvent) {
// Notify Subscriptions by enqueuing to the Subscription queue.
config.subscriptionQueue.Add(event)
Expand All @@ -237,7 +234,8 @@ func newSyncerWithConfig(ctx context.Context, config *syncerConfig) (kubestate.S
catalogLister: config.lister.OperatorsV1alpha1().CatalogSourceLister(),
registryReconcilerFactory: config.registryReconcilerFactory,
globalCatalogNamespace: config.globalCatalogNamespace,
sourceProvider: config.sourceProvider,
operatorCacheProvider: config.operatorCacheProvider,
logger: config.logger,
},
}
s.reconcilers = append(defaultReconcilers, s.reconcilers...)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@ import (
"github.com/operator-framework/operator-lifecycle-manager/pkg/controller/registry/resolver/solver"
"github.com/operator-framework/operator-registry/pkg/api"
opregistry "github.com/operator-framework/operator-registry/pkg/registry"

"golang.org/x/exp/slices"
)

// constraintProvider knows how to provide solver constraints for a given cache entry.
Expand All @@ -29,15 +31,15 @@ type constraintProvider interface {
}

type Resolver struct {
cache *cache.Cache
cache cache.OperatorCacheProvider
log logrus.FieldLogger
pc *predicateConverter
systemConstraintsProvider constraintProvider
}

func NewDefaultResolver(rcp cache.SourceProvider, sourcePriorityProvider cache.SourcePriorityProvider, logger logrus.FieldLogger) *Resolver {
func NewDefaultResolver(cacheProvider cache.OperatorCacheProvider, logger logrus.FieldLogger) *Resolver {
return &Resolver{
cache: cache.New(rcp, cache.WithLogger(logger), cache.WithSourcePriorityProvider(sourcePriorityProvider)),
cache: cacheProvider,
log: logger,
pc: &predicateConverter{
celEnv: constraints.NewCelEnvironment(),
Expand Down Expand Up @@ -513,11 +515,13 @@ func (r *Resolver) addInvariants(namespacedCache cache.MultiCatalogOperatorFinde
}

for gvk, is := range gvkConflictToVariable {
slices.Sort(is)
s := NewSingleAPIProviderVariable(gvk.Group, gvk.Version, gvk.Kind, is)
variables[s.Identifier()] = s
}

for pkg, is := range packageConflictToVariable {
slices.Sort(is)
s := NewSinglePackageInstanceVariable(pkg, is)
variables[s.Identifier()] = s
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@ import (
"github.com/go-air/gini/inter"
"github.com/go-air/gini/logic"
"github.com/go-air/gini/z"

"golang.org/x/exp/slices"
)

type DuplicateIdentifier Identifier
Expand Down Expand Up @@ -203,5 +205,17 @@ func (d *litMapping) Conflicts(g inter.Assumable) []AppliedConstraint {
as = append(as, a)
}
}
slices.SortFunc(as, func(a, b AppliedConstraint) int {
return strCmp(a.String(), b.String())
})
return as
}

func strCmp(str1, str2 string) int {
if str1 < str2 {
return -1
} else if str1 > str2 {
return 1
}
return 0
}
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
v1alpha1listers "github.com/operator-framework/operator-lifecycle-manager/pkg/api/client/listers/operators/v1alpha1"
"github.com/operator-framework/operator-lifecycle-manager/pkg/controller/registry"
"github.com/operator-framework/operator-lifecycle-manager/pkg/controller/registry/resolver/cache"
"github.com/operator-framework/operator-lifecycle-manager/pkg/metrics"
"github.com/operator-framework/operator-registry/pkg/api"
"github.com/operator-framework/operator-registry/pkg/client"
opregistry "github.com/operator-framework/operator-registry/pkg/registry"
Expand Down Expand Up @@ -143,6 +144,9 @@ type registrySource struct {
}

func (s *registrySource) Snapshot(ctx context.Context) (*cache.Snapshot, error) {
s.logger.Printf("requesting snapshot for catalog source %s/%s", s.key.Namespace, s.key.Name)
metrics.IncrementCatalogSourceSnapshotsTotal(s.key.Name, s.key.Namespace)

// Fetching default channels this way makes many round trips
// -- may need to either add a new API to fetch all at once,
// or embed the information into Bundle.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ func (pp catsrcPriorityProvider) Priority(key cache.SourceKey) int {
return catsrc.Spec.Priority
}

func NewOperatorStepResolver(lister operatorlister.OperatorLister, client versioned.Interface, globalCatalogNamespace string, sourceProvider cache.SourceProvider, log logrus.FieldLogger) *OperatorStepResolver {
func NewOperatorCacheProvider(lister operatorlister.OperatorLister, client versioned.Interface, sourceProvider cache.SourceProvider, log logrus.FieldLogger) cache.OperatorCacheProvider {
cacheSourceProvider := &mergedSourceProvider{
sps: []cache.SourceProvider{
sourceProvider,
Expand All @@ -70,13 +70,19 @@ func NewOperatorStepResolver(lister operatorlister.OperatorLister, client versio
},
},
}
catSrcPriorityProvider := &catsrcPriorityProvider{lister: lister.OperatorsV1alpha1().CatalogSourceLister()}

return cache.New(cacheSourceProvider, cache.WithLogger(log), cache.WithSourcePriorityProvider(catSrcPriorityProvider))
}

func NewOperatorStepResolver(lister operatorlister.OperatorLister, client versioned.Interface, globalCatalogNamespace string, opCacheProvider cache.OperatorCacheProvider, log logrus.FieldLogger) *OperatorStepResolver {
stepResolver := &OperatorStepResolver{
subLister: lister.OperatorsV1alpha1().SubscriptionLister(),
csvLister: lister.OperatorsV1alpha1().ClusterServiceVersionLister(),
ogLister: lister.OperatorsV1().OperatorGroupLister(),
client: client,
globalCatalogNamespace: globalCatalogNamespace,
resolver: NewDefaultResolver(cacheSourceProvider, catsrcPriorityProvider{lister: lister.OperatorsV1alpha1().CatalogSourceLister()}, log),
resolver: NewDefaultResolver(opCacheProvider, log),
log: log,
}

Expand Down
21 changes: 21 additions & 0 deletions staging/operator-lifecycle-manager/pkg/metrics/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -152,6 +152,14 @@ var (
[]string{NamespaceLabel, NameLabel},
)

catalogSourceSnapshotsTotal = prometheus.NewCounterVec(
prometheus.CounterOpts{
Name: "catalog_source_snapshots_total",
Help: "The number of times the catalog operator has requested a snapshot of data from a catalog source",
},
[]string{NamespaceLabel, NameLabel},
)

// exported since it's not handled by HandleMetrics
CSVUpgradeCount = prometheus.NewCounter(
prometheus.CounterOpts{
Expand Down Expand Up @@ -250,6 +258,7 @@ func RegisterCatalog() {
prometheus.MustRegister(subscriptionCount)
prometheus.MustRegister(catalogSourceCount)
prometheus.MustRegister(catalogSourceReady)
prometheus.MustRegister(catalogSourceSnapshotsTotal)
prometheus.MustRegister(SubscriptionSyncCount)
prometheus.MustRegister(dependencyResolutionSummary)
prometheus.MustRegister(installPlanWarningCount)
Expand All @@ -272,6 +281,18 @@ func DeleteCatalogSourceStateMetric(name, namespace string) {
catalogSourceReady.DeleteLabelValues(namespace, name)
}

func RegisterCatalogSourceSnapshotsTotal(name, namespace string) {
catalogSourceSnapshotsTotal.WithLabelValues(namespace, name).Add(0)
}

func IncrementCatalogSourceSnapshotsTotal(name, namespace string) {
catalogSourceSnapshotsTotal.WithLabelValues(namespace, name).Inc()
}

func DeleteCatalogSourceSnapshotsTotal(name, namespace string) {
catalogSourceSnapshotsTotal.DeleteLabelValues(namespace, name)
}

func DeleteCSVMetric(oldCSV *operatorsv1alpha1.ClusterServiceVersion) {
// Delete the old CSV metrics
csvAbnormal.DeleteLabelValues(oldCSV.Namespace, oldCSV.Name, oldCSV.Spec.Version.String(), string(oldCSV.Status.Phase), string(oldCSV.Status.Reason))
Expand Down

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading