diff --git a/hack/cluster-version-util/task_graph.go b/hack/cluster-version-util/task_graph.go index a0f8e304a3..7f04e1eae7 100644 --- a/hack/cluster-version-util/task_graph.go +++ b/hack/cluster-version-util/task_graph.go @@ -30,7 +30,7 @@ func newTaskGraphCmd() *cobra.Command { func runTaskGraphCmd(cmd *cobra.Command, args []string) error { manifestDir := args[0] - release, err := payload.LoadUpdate(manifestDir, "") + release, err := payload.LoadUpdate(manifestDir, "", "") if err != nil { return err } diff --git a/lib/resourcebuilder/interface.go b/lib/resourcebuilder/interface.go index fb4598ee6c..7566ae0291 100644 --- a/lib/resourcebuilder/interface.go +++ b/lib/resourcebuilder/interface.go @@ -71,6 +71,7 @@ const ( UpdatingMode Mode = iota ReconcilingMode InitializingMode + PrecreatingMode ) type Interface interface { diff --git a/pkg/cvo/cvo.go b/pkg/cvo/cvo.go index a2ab841700..9d094022d9 100644 --- a/pkg/cvo/cvo.go +++ b/pkg/cvo/cvo.go @@ -236,7 +236,7 @@ func (vcb *verifyClientBuilder) HTTPClient() (*http.Client, error) { // controller that loads and applies content to the cluster. It returns an error if the payload appears to // be in error rather than continuing. func (optr *Operator) InitializeFromPayload(restConfig *rest.Config, burstRestConfig *rest.Config) error { - update, err := payload.LoadUpdate(optr.defaultPayloadDir(), optr.releaseImage) + update, err := payload.LoadUpdate(optr.defaultPayloadDir(), optr.releaseImage, optr.exclude) if err != nil { return fmt.Errorf("the local release contents are invalid - no current version can be determined from disk: %v", err) } @@ -663,7 +663,11 @@ func (b *resourceBuilder) builderFor(m *lib.Manifest, state payload.State) (reso } if b.clusterOperators != nil && m.GVK == configv1.SchemeGroupVersion.WithKind("ClusterOperator") { - return cvointernal.NewClusterOperatorBuilder(b.clusterOperators, *m), nil + client, err := clientset.NewForConfig(config) + if err != nil { + return nil, err + } + return cvointernal.NewClusterOperatorBuilder(b.clusterOperators, client.ConfigV1().ClusterOperators(), *m), nil } if resourcebuilder.Mapper.Exists(m.GVK) { return resourcebuilder.New(resourcebuilder.Mapper, config, *m) @@ -694,6 +698,8 @@ func stateToMode(state payload.State) resourcebuilder.Mode { return resourcebuilder.UpdatingMode case payload.ReconcilingPayload: return resourcebuilder.ReconcilingMode + case payload.PrecreatingPayload: + return resourcebuilder.PrecreatingMode default: panic(fmt.Sprintf("unexpected payload state %d", int(state))) } diff --git a/pkg/cvo/cvo_scenarios_test.go b/pkg/cvo/cvo_scenarios_test.go index 053c23666d..ed2def9d68 100644 --- a/pkg/cvo/cvo_scenarios_test.go +++ b/pkg/cvo/cvo_scenarios_test.go @@ -76,6 +76,7 @@ func setupCVOTest(payloadDir string) (*Operator, map[string]runtime.Object, *fak queue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "cvo-loop-test"), client: client, cvLister: &clientCVLister{client: client}, + exclude: "exclude-test", } dynamicScheme := runtime.NewScheme() @@ -90,7 +91,7 @@ func setupCVOTest(payloadDir string) (*Operator, map[string]runtime.Object, *fak wait.Backoff{ Steps: 1, }, - "", + "exclude-test", ) o.configSync = worker diff --git a/pkg/cvo/internal/operatorstatus.go b/pkg/cvo/internal/operatorstatus.go index cda58cd74b..5ae3d65b1b 100644 --- a/pkg/cvo/internal/operatorstatus.go +++ b/pkg/cvo/internal/operatorstatus.go @@ -8,6 +8,7 @@ import ( "time" "unicode" + kerrors "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/runtime/serializer" @@ -48,16 +49,16 @@ func readClusterOperatorV1OrDie(objBytes []byte) *configv1.ClusterOperator { } type clusterOperatorBuilder struct { - client ClusterOperatorsGetter - raw []byte - modifier resourcebuilder.MetaV1ObjectModifierFunc - mode resourcebuilder.Mode + client ClusterOperatorsGetter + createClient configclientv1.ClusterOperatorInterface + raw []byte + modifier resourcebuilder.MetaV1ObjectModifierFunc + mode resourcebuilder.Mode } func newClusterOperatorBuilder(config *rest.Config, m lib.Manifest) resourcebuilder.Interface { - return NewClusterOperatorBuilder(clientClusterOperatorsGetter{ - getter: configclientv1.NewForConfigOrDie(config).ClusterOperators(), - }, m) + client := configclientv1.NewForConfigOrDie(config).ClusterOperators() + return NewClusterOperatorBuilder(clientClusterOperatorsGetter{getter: client}, client, m) } // ClusterOperatorsGetter abstracts object access with a client or a cache lister. @@ -75,10 +76,11 @@ func (g clientClusterOperatorsGetter) Get(name string) (*configv1.ClusterOperato // NewClusterOperatorBuilder accepts the ClusterOperatorsGetter interface which may be implemented by a // client or a lister cache. -func NewClusterOperatorBuilder(client ClusterOperatorsGetter, m lib.Manifest) resourcebuilder.Interface { +func NewClusterOperatorBuilder(client ClusterOperatorsGetter, createClient configclientv1.ClusterOperatorInterface, m lib.Manifest) resourcebuilder.Interface { return &clusterOperatorBuilder{ - client: client, - raw: m.Raw, + client: client, + createClient: createClient, + raw: m.Raw, } } @@ -97,6 +99,26 @@ func (b *clusterOperatorBuilder) Do(ctx context.Context) error { if b.modifier != nil { b.modifier(os) } + + // create the object, and if we successfully created, update the status + if b.mode == resourcebuilder.PrecreatingMode { + clusterOperator, err := b.createClient.Create(os) + if err != nil { + if kerrors.IsAlreadyExists(err) { + return nil + } + return err + } + clusterOperator.Status.RelatedObjects = os.Status.DeepCopy().RelatedObjects + if _, err := b.createClient.UpdateStatus(clusterOperator); err != nil { + if kerrors.IsConflict(err) { + return nil + } + return err + } + return nil + } + return waitForOperatorStatusToBeDone(ctx, 1*time.Second, b.client, os, b.mode) } diff --git a/pkg/cvo/sync_worker.go b/pkg/cvo/sync_worker.go index ae57f76a7e..86e6612b13 100644 --- a/pkg/cvo/sync_worker.go +++ b/pkg/cvo/sync_worker.go @@ -495,7 +495,7 @@ func (w *SyncWorker) syncOnce(ctx context.Context, work *SyncWork, maxWorkers in return err } - payloadUpdate, err := payload.LoadUpdate(info.Directory, update.Image) + payloadUpdate, err := payload.LoadUpdate(info.Directory, update.Image, w.exclude) if err != nil { reporter.Report(SyncWorkerStatus{ Generation: work.Generation, @@ -585,12 +585,14 @@ func (w *SyncWorker) apply(ctx context.Context, payloadUpdate *payload.Update, w } graph := payload.NewTaskGraph(tasks) graph.Split(payload.SplitOnJobs) + var precreateObjects bool switch work.State { case payload.InitializingPayload: // Create every component in parallel to maximize reaching steady // state. graph.Parallelize(payload.FlattenByNumberAndComponent) maxWorkers = len(graph.Nodes) + precreateObjects = true case payload.ReconcilingPayload: // Run the graph in random order during reconcile so that we don't // hang on any particular component - we seed from the number of @@ -608,6 +610,28 @@ func (w *SyncWorker) apply(ctx context.Context, payloadUpdate *payload.Update, w // perform an orderly roll out by payload order, using some parallelization // but avoiding out of order creation so components have some base graph.Parallelize(payload.ByNumberAndComponent) + precreateObjects = true + } + + // in specific modes, attempt to precreate a set of known types (currently ClusterOperator) without + // retries + if precreateObjects { + payload.RunGraph(ctx, graph, 8, func(ctx context.Context, tasks []*payload.Task) error { + for _, task := range tasks { + if err := ctx.Err(); err != nil { + return cr.ContextError(err) + } + if task.Manifest.GVK != configv1.SchemeGroupVersion.WithKind("ClusterOperator") { + continue + } + if err := w.builder.Apply(ctx, task.Manifest, payload.PrecreatingPayload); err != nil { + klog.V(2).Infof("Unable to precreate resource %s: %v", task, err) + continue + } + klog.V(4).Infof("Precreated resource %s", task) + } + return nil + }) } // update each object @@ -621,7 +645,7 @@ func (w *SyncWorker) apply(ctx context.Context, payloadUpdate *payload.Update, w klog.V(4).Infof("Running sync for %s", task) klog.V(5).Infof("Manifest: %s", string(task.Manifest.Raw)) - ov, ok := getOverrideForManifest(work.Overrides, w.exclude, task.Manifest) + ov, ok := getOverrideForManifest(work.Overrides, task.Manifest) if ok && ov.Unmanaged { klog.V(4).Infof("Skipping %s as unmanaged", task) continue @@ -916,7 +940,7 @@ func newMultipleError(errs []error) error { } // getOverrideForManifest returns the override and true when override exists for manifest. -func getOverrideForManifest(overrides []configv1.ComponentOverride, excludeIdentifier string, manifest *lib.Manifest) (configv1.ComponentOverride, bool) { +func getOverrideForManifest(overrides []configv1.ComponentOverride, manifest *lib.Manifest) (configv1.ComponentOverride, bool) { for idx, ov := range overrides { kind, namespace, name := manifest.GVK.Kind, manifest.Object().GetNamespace(), manifest.Object().GetName() if ov.Kind == kind && @@ -925,10 +949,6 @@ func getOverrideForManifest(overrides []configv1.ComponentOverride, excludeIdent return overrides[idx], true } } - excludeAnnotation := fmt.Sprintf("exclude.release.openshift.io/%s", excludeIdentifier) - if annotations := manifest.Object().GetAnnotations(); annotations != nil && annotations[excludeAnnotation] == "true" { - return configv1.ComponentOverride{Unmanaged: true}, true - } return configv1.ComponentOverride{}, false } diff --git a/pkg/cvo/testdata/payloadtest/release-manifests/0000_20_a_exclude.yml b/pkg/cvo/testdata/payloadtest/release-manifests/0000_20_a_exclude.yml new file mode 100644 index 0000000000..22ba8b5f91 --- /dev/null +++ b/pkg/cvo/testdata/payloadtest/release-manifests/0000_20_a_exclude.yml @@ -0,0 +1,6 @@ +kind: Test +apiVersion: v1 +metadata: + name: file-20-yml + annotations: + exclude.release.openshift.io/exclude-test: "true" diff --git a/pkg/payload/payload.go b/pkg/payload/payload.go index c2c880c8de..b5a3eec31c 100644 --- a/pkg/payload/payload.go +++ b/pkg/payload/payload.go @@ -56,6 +56,11 @@ const ( // Our goal is to get the entire payload created, even if some // operators are still converging. InitializingPayload + // PrecreatingPayload indicates we are selectively creating + // specific resources during a first pass of the payload to + // provide better visibility during install and upgrade of + // error conditions. + PrecreatingPayload ) // Initializing is true if the state is InitializingPayload. @@ -102,7 +107,7 @@ type Update struct { Manifests []lib.Manifest } -func LoadUpdate(dir, releaseImage string) (*Update, error) { +func LoadUpdate(dir, releaseImage, excludeIdentifier string) (*Update, error) { payload, tasks, err := loadUpdatePayloadMetadata(dir, releaseImage) if err != nil { return nil, err @@ -149,6 +154,15 @@ func LoadUpdate(dir, releaseImage string) (*Update, error) { errs = append(errs, errors.Wrapf(err, "error parsing %s", file.Name())) continue } + // Filter out manifests that should be excluded based on annotation + filteredMs := []lib.Manifest{} + for _, manifest := range ms { + if shouldExclude(excludeIdentifier, &manifest) { + continue + } + filteredMs = append(filteredMs, manifest) + } + ms = filteredMs for i := range ms { ms[i].OriginalFilename = filepath.Base(file.Name()) } @@ -174,6 +188,12 @@ func LoadUpdate(dir, releaseImage string) (*Update, error) { return payload, nil } +func shouldExclude(excludeIdentifier string, manifest *lib.Manifest) bool { + excludeAnnotation := fmt.Sprintf("exclude.release.openshift.io/%s", excludeIdentifier) + annotations := manifest.Object().GetAnnotations() + return annotations != nil && annotations[excludeAnnotation] == "true" +} + // ValidateDirectory checks if a directory can be a candidate update by // looking for known files. It returns an error if the directory cannot // be an update. diff --git a/pkg/payload/payload_test.go b/pkg/payload/payload_test.go index 376d3f1586..40d64f49ac 100644 --- a/pkg/payload/payload_test.go +++ b/pkg/payload/payload_test.go @@ -104,7 +104,7 @@ func Test_loadUpdatePayload(t *testing.T) { } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - got, err := LoadUpdate(tt.args.dir, tt.args.releaseImage) + got, err := LoadUpdate(tt.args.dir, tt.args.releaseImage, "exclude-test") if (err != nil) != tt.wantErr { t.Errorf("loadUpdatePayload() error = %v, wantErr %v", err, tt.wantErr) return diff --git a/pkg/payload/task_graph_test.go b/pkg/payload/task_graph_test.go index 3ae17549b5..e0230b4ec7 100644 --- a/pkg/payload/task_graph_test.go +++ b/pkg/payload/task_graph_test.go @@ -487,7 +487,7 @@ func Test_TaskGraph_real(t *testing.T) { if len(path) == 0 { t.Skip("TEST_GRAPH_PATH unset") } - p, err := LoadUpdate(path, "arbitrary/image:1") + p, err := LoadUpdate(path, "arbitrary/image:1", "") if err != nil { t.Fatal(err) }