diff --git a/lib/resourcebuilder/interface.go b/lib/resourcebuilder/interface.go index fb4598ee6c..7566ae0291 100644 --- a/lib/resourcebuilder/interface.go +++ b/lib/resourcebuilder/interface.go @@ -71,6 +71,7 @@ const ( UpdatingMode Mode = iota ReconcilingMode InitializingMode + PrecreatingMode ) type Interface interface { diff --git a/pkg/cvo/cvo.go b/pkg/cvo/cvo.go index a2ab841700..94a466dc8c 100644 --- a/pkg/cvo/cvo.go +++ b/pkg/cvo/cvo.go @@ -663,7 +663,11 @@ func (b *resourceBuilder) builderFor(m *lib.Manifest, state payload.State) (reso } if b.clusterOperators != nil && m.GVK == configv1.SchemeGroupVersion.WithKind("ClusterOperator") { - return cvointernal.NewClusterOperatorBuilder(b.clusterOperators, *m), nil + client, err := clientset.NewForConfig(config) + if err != nil { + return nil, err + } + return cvointernal.NewClusterOperatorBuilder(b.clusterOperators, client.ConfigV1().ClusterOperators(), *m), nil } if resourcebuilder.Mapper.Exists(m.GVK) { return resourcebuilder.New(resourcebuilder.Mapper, config, *m) @@ -694,6 +698,8 @@ func stateToMode(state payload.State) resourcebuilder.Mode { return resourcebuilder.UpdatingMode case payload.ReconcilingPayload: return resourcebuilder.ReconcilingMode + case payload.PrecreatingPayload: + return resourcebuilder.PrecreatingMode default: panic(fmt.Sprintf("unexpected payload state %d", int(state))) } diff --git a/pkg/cvo/internal/operatorstatus.go b/pkg/cvo/internal/operatorstatus.go index cda58cd74b..5ae3d65b1b 100644 --- a/pkg/cvo/internal/operatorstatus.go +++ b/pkg/cvo/internal/operatorstatus.go @@ -8,6 +8,7 @@ import ( "time" "unicode" + kerrors "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/runtime/serializer" @@ -48,16 +49,16 @@ func readClusterOperatorV1OrDie(objBytes []byte) *configv1.ClusterOperator { } type clusterOperatorBuilder struct { - client ClusterOperatorsGetter - raw []byte - modifier resourcebuilder.MetaV1ObjectModifierFunc - mode resourcebuilder.Mode + client ClusterOperatorsGetter + createClient configclientv1.ClusterOperatorInterface + raw []byte + modifier resourcebuilder.MetaV1ObjectModifierFunc + mode resourcebuilder.Mode } func newClusterOperatorBuilder(config *rest.Config, m lib.Manifest) resourcebuilder.Interface { - return NewClusterOperatorBuilder(clientClusterOperatorsGetter{ - getter: configclientv1.NewForConfigOrDie(config).ClusterOperators(), - }, m) + client := configclientv1.NewForConfigOrDie(config).ClusterOperators() + return NewClusterOperatorBuilder(clientClusterOperatorsGetter{getter: client}, client, m) } // ClusterOperatorsGetter abstracts object access with a client or a cache lister. @@ -75,10 +76,11 @@ func (g clientClusterOperatorsGetter) Get(name string) (*configv1.ClusterOperato // NewClusterOperatorBuilder accepts the ClusterOperatorsGetter interface which may be implemented by a // client or a lister cache. -func NewClusterOperatorBuilder(client ClusterOperatorsGetter, m lib.Manifest) resourcebuilder.Interface { +func NewClusterOperatorBuilder(client ClusterOperatorsGetter, createClient configclientv1.ClusterOperatorInterface, m lib.Manifest) resourcebuilder.Interface { return &clusterOperatorBuilder{ - client: client, - raw: m.Raw, + client: client, + createClient: createClient, + raw: m.Raw, } } @@ -97,6 +99,26 @@ func (b *clusterOperatorBuilder) Do(ctx context.Context) error { if b.modifier != nil { b.modifier(os) } + + // create the object, and if we successfully created, update the status + if b.mode == resourcebuilder.PrecreatingMode { + clusterOperator, err := b.createClient.Create(os) + if err != nil { + if kerrors.IsAlreadyExists(err) { + return nil + } + return err + } + clusterOperator.Status.RelatedObjects = os.Status.DeepCopy().RelatedObjects + if _, err := b.createClient.UpdateStatus(clusterOperator); err != nil { + if kerrors.IsConflict(err) { + return nil + } + return err + } + return nil + } + return waitForOperatorStatusToBeDone(ctx, 1*time.Second, b.client, os, b.mode) } diff --git a/pkg/cvo/sync_worker.go b/pkg/cvo/sync_worker.go index cd92d3c23b..9b1cb491b9 100644 --- a/pkg/cvo/sync_worker.go +++ b/pkg/cvo/sync_worker.go @@ -585,12 +585,14 @@ func (w *SyncWorker) apply(ctx context.Context, payloadUpdate *payload.Update, w } graph := payload.NewTaskGraph(tasks) graph.Split(payload.SplitOnJobs) + var precreateObjects bool switch work.State { case payload.InitializingPayload: // Create every component in parallel to maximize reaching steady // state. graph.Parallelize(payload.FlattenByNumberAndComponent) maxWorkers = len(graph.Nodes) + precreateObjects = true case payload.ReconcilingPayload: // Run the graph in random order during reconcile so that we don't // hang on any particular component - we seed from the number of @@ -608,6 +610,28 @@ func (w *SyncWorker) apply(ctx context.Context, payloadUpdate *payload.Update, w // perform an orderly roll out by payload order, using some parallelization // but avoiding out of order creation so components have some base graph.Parallelize(payload.ByNumberAndComponent) + precreateObjects = true + } + + // in specific modes, attempt to precreate a set of known types (currently ClusterOperator) without + // retries + if precreateObjects { + payload.RunGraph(ctx, graph, 8, func(ctx context.Context, tasks []*payload.Task) error { + for _, task := range tasks { + if contextIsCancelled(ctx) { + return cr.CancelError() + } + if task.Manifest.GVK != configv1.SchemeGroupVersion.WithKind("ClusterOperator") { + continue + } + if err := w.builder.Apply(ctx, task.Manifest, payload.PrecreatingPayload); err != nil { + klog.V(2).Infof("Unable to precreate resource %s: %v", task, err) + continue + } + klog.V(4).Infof("Precreated resource %s", task) + } + return nil + }) } // update each object diff --git a/pkg/payload/payload.go b/pkg/payload/payload.go index c2c880c8de..b8e170c111 100644 --- a/pkg/payload/payload.go +++ b/pkg/payload/payload.go @@ -56,6 +56,11 @@ const ( // Our goal is to get the entire payload created, even if some // operators are still converging. InitializingPayload + // PrecreatingPayload indicates we are selectively creating + // specific resources during a first pass of the payload to + // provide better visibility during install and upgrade of + // error conditions. + PrecreatingPayload ) // Initializing is true if the state is InitializingPayload.