From 4206f11756616a9eea136b78a12fe4163c4befa9 Mon Sep 17 00:00:00 2001 From: Clayton Coleman Date: Fri, 7 Feb 2020 14:03:38 -0500 Subject: [PATCH 1/2] cvo: When installing or upgrading, fast-fill cluster-operators The must-gather and insights operator depend on cluster operators and related objects in order to identify resources to create. Because cluster operators are delegated to the operator, install and upgrade failures of new operators can fail to gather the requisite info if the cluster degrades before those steps. Add a new selective Precreating install mode and do a single pass over all cluster operators in the payload without retries at the beginning of an initializing or upgrading sync pass to attempt to create the ClusterOperators if they don't exist. If we succeed at creating the object, try exactly once to update status so that relatedObjects can be set. --- lib/resourcebuilder/interface.go | 1 + pkg/cvo/cvo.go | 8 +++++- pkg/cvo/internal/operatorstatus.go | 42 +++++++++++++++++++++++------- pkg/cvo/sync_worker.go | 24 +++++++++++++++++ pkg/payload/payload.go | 5 ++++ 5 files changed, 69 insertions(+), 11 deletions(-) diff --git a/lib/resourcebuilder/interface.go b/lib/resourcebuilder/interface.go index fb4598ee6..7566ae029 100644 --- a/lib/resourcebuilder/interface.go +++ b/lib/resourcebuilder/interface.go @@ -71,6 +71,7 @@ const ( UpdatingMode Mode = iota ReconcilingMode InitializingMode + PrecreatingMode ) type Interface interface { diff --git a/pkg/cvo/cvo.go b/pkg/cvo/cvo.go index a2ab84170..94a466dc8 100644 --- a/pkg/cvo/cvo.go +++ b/pkg/cvo/cvo.go @@ -663,7 +663,11 @@ func (b *resourceBuilder) builderFor(m *lib.Manifest, state payload.State) (reso } if b.clusterOperators != nil && m.GVK == configv1.SchemeGroupVersion.WithKind("ClusterOperator") { - return cvointernal.NewClusterOperatorBuilder(b.clusterOperators, *m), nil + client, err := clientset.NewForConfig(config) + if err != nil { + return nil, err + } + return 
cvointernal.NewClusterOperatorBuilder(b.clusterOperators, client.ConfigV1().ClusterOperators(), *m), nil } if resourcebuilder.Mapper.Exists(m.GVK) { return resourcebuilder.New(resourcebuilder.Mapper, config, *m) @@ -694,6 +698,8 @@ func stateToMode(state payload.State) resourcebuilder.Mode { return resourcebuilder.UpdatingMode case payload.ReconcilingPayload: return resourcebuilder.ReconcilingMode + case payload.PrecreatingPayload: + return resourcebuilder.PrecreatingMode default: panic(fmt.Sprintf("unexpected payload state %d", int(state))) } diff --git a/pkg/cvo/internal/operatorstatus.go b/pkg/cvo/internal/operatorstatus.go index cda58cd74..5ae3d65b1 100644 --- a/pkg/cvo/internal/operatorstatus.go +++ b/pkg/cvo/internal/operatorstatus.go @@ -8,6 +8,7 @@ import ( "time" "unicode" + kerrors "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/runtime/serializer" @@ -48,16 +49,16 @@ func readClusterOperatorV1OrDie(objBytes []byte) *configv1.ClusterOperator { } type clusterOperatorBuilder struct { - client ClusterOperatorsGetter - raw []byte - modifier resourcebuilder.MetaV1ObjectModifierFunc - mode resourcebuilder.Mode + client ClusterOperatorsGetter + createClient configclientv1.ClusterOperatorInterface + raw []byte + modifier resourcebuilder.MetaV1ObjectModifierFunc + mode resourcebuilder.Mode } func newClusterOperatorBuilder(config *rest.Config, m lib.Manifest) resourcebuilder.Interface { - return NewClusterOperatorBuilder(clientClusterOperatorsGetter{ - getter: configclientv1.NewForConfigOrDie(config).ClusterOperators(), - }, m) + client := configclientv1.NewForConfigOrDie(config).ClusterOperators() + return NewClusterOperatorBuilder(clientClusterOperatorsGetter{getter: client}, client, m) } // ClusterOperatorsGetter abstracts object access with a client or a cache lister. 
@@ -75,10 +76,11 @@ func (g clientClusterOperatorsGetter) Get(name string) (*configv1.ClusterOperato // NewClusterOperatorBuilder accepts the ClusterOperatorsGetter interface which may be implemented by a // client or a lister cache. -func NewClusterOperatorBuilder(client ClusterOperatorsGetter, m lib.Manifest) resourcebuilder.Interface { +func NewClusterOperatorBuilder(client ClusterOperatorsGetter, createClient configclientv1.ClusterOperatorInterface, m lib.Manifest) resourcebuilder.Interface { return &clusterOperatorBuilder{ - client: client, - raw: m.Raw, + client: client, + createClient: createClient, + raw: m.Raw, } } @@ -97,6 +99,26 @@ func (b *clusterOperatorBuilder) Do(ctx context.Context) error { if b.modifier != nil { b.modifier(os) } + + // create the object, and if we successfully created, update the status + if b.mode == resourcebuilder.PrecreatingMode { + clusterOperator, err := b.createClient.Create(os) + if err != nil { + if kerrors.IsAlreadyExists(err) { + return nil + } + return err + } + clusterOperator.Status.RelatedObjects = os.Status.DeepCopy().RelatedObjects + if _, err := b.createClient.UpdateStatus(clusterOperator); err != nil { + if kerrors.IsConflict(err) { + return nil + } + return err + } + return nil + } + return waitForOperatorStatusToBeDone(ctx, 1*time.Second, b.client, os, b.mode) } diff --git a/pkg/cvo/sync_worker.go b/pkg/cvo/sync_worker.go index ae57f76a7..85d5d871e 100644 --- a/pkg/cvo/sync_worker.go +++ b/pkg/cvo/sync_worker.go @@ -585,12 +585,14 @@ func (w *SyncWorker) apply(ctx context.Context, payloadUpdate *payload.Update, w } graph := payload.NewTaskGraph(tasks) graph.Split(payload.SplitOnJobs) + var precreateObjects bool switch work.State { case payload.InitializingPayload: // Create every component in parallel to maximize reaching steady // state. 
graph.Parallelize(payload.FlattenByNumberAndComponent) maxWorkers = len(graph.Nodes) + precreateObjects = true case payload.ReconcilingPayload: // Run the graph in random order during reconcile so that we don't // hang on any particular component - we seed from the number of @@ -608,6 +610,28 @@ func (w *SyncWorker) apply(ctx context.Context, payloadUpdate *payload.Update, w // perform an orderly roll out by payload order, using some parallelization // but avoiding out of order creation so components have some base graph.Parallelize(payload.ByNumberAndComponent) + precreateObjects = true + } + + // in specific modes, attempt to precreate a set of known types (currently ClusterOperator) without + // retries + if precreateObjects { + payload.RunGraph(ctx, graph, 8, func(ctx context.Context, tasks []*payload.Task) error { + for _, task := range tasks { + if contextIsCancelled(ctx) { + return cr.CancelError() + } + if task.Manifest.GVK != configv1.SchemeGroupVersion.WithKind("ClusterOperator") { + continue + } + if err := w.builder.Apply(ctx, task.Manifest, payload.PrecreatingPayload); err != nil { + klog.V(2).Infof("Unable to precreate resource %s: %v", task, err) + continue + } + klog.V(4).Infof("Precreated resource %s", task) + } + return nil + }) } // update each object diff --git a/pkg/payload/payload.go b/pkg/payload/payload.go index c2c880c8d..b8e170c11 100644 --- a/pkg/payload/payload.go +++ b/pkg/payload/payload.go @@ -56,6 +56,11 @@ const ( // Our goal is to get the entire payload created, even if some // operators are still converging. InitializingPayload + // PrecreatingPayload indicates we are selectively creating + // specific resources during a first pass of the payload to + // provide better visibility during install and upgrade of + // error conditions. + PrecreatingPayload ) // Initializing is true if the state is InitializingPayload. 
From 44b7cb9606da2139945688061a8e62143b738582 Mon Sep 17 00:00:00 2001 From: Cesar Wong Date: Wed, 27 May 2020 18:27:02 -0400 Subject: [PATCH 2/2] Avoid pre-creating clusteroperators that should be excluded --- hack/cluster-version-util/task_graph.go | 2 +- pkg/cvo/cvo.go | 2 +- pkg/cvo/cvo_scenarios_test.go | 3 ++- pkg/cvo/sync_worker.go | 14 +++++--------- .../release-manifests/0000_20_a_exclude.yml | 6 ++++++ pkg/payload/payload.go | 17 ++++++++++++++++- pkg/payload/payload_test.go | 2 +- pkg/payload/task_graph_test.go | 2 +- 8 files changed, 33 insertions(+), 15 deletions(-) create mode 100644 pkg/cvo/testdata/payloadtest/release-manifests/0000_20_a_exclude.yml diff --git a/hack/cluster-version-util/task_graph.go b/hack/cluster-version-util/task_graph.go index a0f8e304a..7f04e1eae 100644 --- a/hack/cluster-version-util/task_graph.go +++ b/hack/cluster-version-util/task_graph.go @@ -30,7 +30,7 @@ func newTaskGraphCmd() *cobra.Command { func runTaskGraphCmd(cmd *cobra.Command, args []string) error { manifestDir := args[0] - release, err := payload.LoadUpdate(manifestDir, "") + release, err := payload.LoadUpdate(manifestDir, "", "") if err != nil { return err } diff --git a/pkg/cvo/cvo.go b/pkg/cvo/cvo.go index 94a466dc8..9d094022d 100644 --- a/pkg/cvo/cvo.go +++ b/pkg/cvo/cvo.go @@ -236,7 +236,7 @@ func (vcb *verifyClientBuilder) HTTPClient() (*http.Client, error) { // controller that loads and applies content to the cluster. It returns an error if the payload appears to // be in error rather than continuing. 
func (optr *Operator) InitializeFromPayload(restConfig *rest.Config, burstRestConfig *rest.Config) error { - update, err := payload.LoadUpdate(optr.defaultPayloadDir(), optr.releaseImage) + update, err := payload.LoadUpdate(optr.defaultPayloadDir(), optr.releaseImage, optr.exclude) if err != nil { return fmt.Errorf("the local release contents are invalid - no current version can be determined from disk: %v", err) } diff --git a/pkg/cvo/cvo_scenarios_test.go b/pkg/cvo/cvo_scenarios_test.go index 053c23666..ed2def9d6 100644 --- a/pkg/cvo/cvo_scenarios_test.go +++ b/pkg/cvo/cvo_scenarios_test.go @@ -76,6 +76,7 @@ func setupCVOTest(payloadDir string) (*Operator, map[string]runtime.Object, *fak queue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "cvo-loop-test"), client: client, cvLister: &clientCVLister{client: client}, + exclude: "exclude-test", } dynamicScheme := runtime.NewScheme() @@ -90,7 +91,7 @@ func setupCVOTest(payloadDir string) (*Operator, map[string]runtime.Object, *fak wait.Backoff{ Steps: 1, }, - "", + "exclude-test", ) o.configSync = worker diff --git a/pkg/cvo/sync_worker.go b/pkg/cvo/sync_worker.go index 85d5d871e..86e6612b1 100644 --- a/pkg/cvo/sync_worker.go +++ b/pkg/cvo/sync_worker.go @@ -495,7 +495,7 @@ func (w *SyncWorker) syncOnce(ctx context.Context, work *SyncWork, maxWorkers in return err } - payloadUpdate, err := payload.LoadUpdate(info.Directory, update.Image) + payloadUpdate, err := payload.LoadUpdate(info.Directory, update.Image, w.exclude) if err != nil { reporter.Report(SyncWorkerStatus{ Generation: work.Generation, @@ -618,8 +618,8 @@ func (w *SyncWorker) apply(ctx context.Context, payloadUpdate *payload.Update, w if precreateObjects { payload.RunGraph(ctx, graph, 8, func(ctx context.Context, tasks []*payload.Task) error { for _, task := range tasks { - if contextIsCancelled(ctx) { - return cr.CancelError() + if err := ctx.Err(); err != nil { + return cr.ContextError(err) } if task.Manifest.GVK != 
configv1.SchemeGroupVersion.WithKind("ClusterOperator") { continue @@ -645,7 +645,7 @@ func (w *SyncWorker) apply(ctx context.Context, payloadUpdate *payload.Update, w klog.V(4).Infof("Running sync for %s", task) klog.V(5).Infof("Manifest: %s", string(task.Manifest.Raw)) - ov, ok := getOverrideForManifest(work.Overrides, w.exclude, task.Manifest) + ov, ok := getOverrideForManifest(work.Overrides, task.Manifest) if ok && ov.Unmanaged { klog.V(4).Infof("Skipping %s as unmanaged", task) continue @@ -940,7 +940,7 @@ func newMultipleError(errs []error) error { } // getOverrideForManifest returns the override and true when override exists for manifest. -func getOverrideForManifest(overrides []configv1.ComponentOverride, excludeIdentifier string, manifest *lib.Manifest) (configv1.ComponentOverride, bool) { +func getOverrideForManifest(overrides []configv1.ComponentOverride, manifest *lib.Manifest) (configv1.ComponentOverride, bool) { for idx, ov := range overrides { kind, namespace, name := manifest.GVK.Kind, manifest.Object().GetNamespace(), manifest.Object().GetName() if ov.Kind == kind && @@ -949,10 +949,6 @@ func getOverrideForManifest(overrides []configv1.ComponentOverride, excludeIdent return overrides[idx], true } } - excludeAnnotation := fmt.Sprintf("exclude.release.openshift.io/%s", excludeIdentifier) - if annotations := manifest.Object().GetAnnotations(); annotations != nil && annotations[excludeAnnotation] == "true" { - return configv1.ComponentOverride{Unmanaged: true}, true - } return configv1.ComponentOverride{}, false } diff --git a/pkg/cvo/testdata/payloadtest/release-manifests/0000_20_a_exclude.yml b/pkg/cvo/testdata/payloadtest/release-manifests/0000_20_a_exclude.yml new file mode 100644 index 000000000..22ba8b5f9 --- /dev/null +++ b/pkg/cvo/testdata/payloadtest/release-manifests/0000_20_a_exclude.yml @@ -0,0 +1,6 @@ +kind: Test +apiVersion: v1 +metadata: + name: file-20-yml + annotations: + exclude.release.openshift.io/exclude-test: "true" diff --git 
a/pkg/payload/payload.go b/pkg/payload/payload.go index b8e170c11..b5a3eec31 100644 --- a/pkg/payload/payload.go +++ b/pkg/payload/payload.go @@ -107,7 +107,7 @@ type Update struct { Manifests []lib.Manifest } -func LoadUpdate(dir, releaseImage string) (*Update, error) { +func LoadUpdate(dir, releaseImage, excludeIdentifier string) (*Update, error) { payload, tasks, err := loadUpdatePayloadMetadata(dir, releaseImage) if err != nil { return nil, err @@ -154,6 +154,15 @@ func LoadUpdate(dir, releaseImage string) (*Update, error) { errs = append(errs, errors.Wrapf(err, "error parsing %s", file.Name())) continue } + // Filter out manifests that should be excluded based on annotation + filteredMs := []lib.Manifest{} + for _, manifest := range ms { + if shouldExclude(excludeIdentifier, &manifest) { + continue + } + filteredMs = append(filteredMs, manifest) + } + ms = filteredMs for i := range ms { ms[i].OriginalFilename = filepath.Base(file.Name()) } @@ -179,6 +188,12 @@ func LoadUpdate(dir, releaseImage string) (*Update, error) { return payload, nil } +func shouldExclude(excludeIdentifier string, manifest *lib.Manifest) bool { + excludeAnnotation := fmt.Sprintf("exclude.release.openshift.io/%s", excludeIdentifier) + annotations := manifest.Object().GetAnnotations() + return annotations != nil && annotations[excludeAnnotation] == "true" +} + // ValidateDirectory checks if a directory can be a candidate update by // looking for known files. It returns an error if the directory cannot // be an update. 
diff --git a/pkg/payload/payload_test.go b/pkg/payload/payload_test.go index 376d3f158..40d64f49a 100644 --- a/pkg/payload/payload_test.go +++ b/pkg/payload/payload_test.go @@ -104,7 +104,7 @@ func Test_loadUpdatePayload(t *testing.T) { } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - got, err := LoadUpdate(tt.args.dir, tt.args.releaseImage) + got, err := LoadUpdate(tt.args.dir, tt.args.releaseImage, "exclude-test") if (err != nil) != tt.wantErr { t.Errorf("loadUpdatePayload() error = %v, wantErr %v", err, tt.wantErr) return diff --git a/pkg/payload/task_graph_test.go b/pkg/payload/task_graph_test.go index 3ae17549b..e0230b4ec 100644 --- a/pkg/payload/task_graph_test.go +++ b/pkg/payload/task_graph_test.go @@ -487,7 +487,7 @@ func Test_TaskGraph_real(t *testing.T) { if len(path) == 0 { t.Skip("TEST_GRAPH_PATH unset") } - p, err := LoadUpdate(path, "arbitrary/image:1") + p, err := LoadUpdate(path, "arbitrary/image:1", "") if err != nil { t.Fatal(err) }