Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions lib/resourcebuilder/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,7 @@ const (
UpdatingMode Mode = iota
ReconcilingMode
InitializingMode
PrecreatingMode
)

type Interface interface {
Expand Down
8 changes: 7 additions & 1 deletion pkg/cvo/cvo.go
Original file line number Diff line number Diff line change
Expand Up @@ -663,7 +663,11 @@ func (b *resourceBuilder) builderFor(m *lib.Manifest, state payload.State) (reso
}

if b.clusterOperators != nil && m.GVK == configv1.SchemeGroupVersion.WithKind("ClusterOperator") {
return cvointernal.NewClusterOperatorBuilder(b.clusterOperators, *m), nil
client, err := clientset.NewForConfig(config)
if err != nil {
return nil, err
}
return cvointernal.NewClusterOperatorBuilder(b.clusterOperators, client.ConfigV1().ClusterOperators(), *m), nil
}
if resourcebuilder.Mapper.Exists(m.GVK) {
return resourcebuilder.New(resourcebuilder.Mapper, config, *m)
Expand Down Expand Up @@ -694,6 +698,8 @@ func stateToMode(state payload.State) resourcebuilder.Mode {
return resourcebuilder.UpdatingMode
case payload.ReconcilingPayload:
return resourcebuilder.ReconcilingMode
case payload.PrecreatingPayload:
return resourcebuilder.PrecreatingMode
default:
panic(fmt.Sprintf("unexpected payload state %d", int(state)))
}
Expand Down
42 changes: 32 additions & 10 deletions pkg/cvo/internal/operatorstatus.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"time"
"unicode"

kerrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/serializer"
Expand Down Expand Up @@ -48,16 +49,16 @@ func readClusterOperatorV1OrDie(objBytes []byte) *configv1.ClusterOperator {
}

type clusterOperatorBuilder struct {
client ClusterOperatorsGetter
raw []byte
modifier resourcebuilder.MetaV1ObjectModifierFunc
mode resourcebuilder.Mode
client ClusterOperatorsGetter
createClient configclientv1.ClusterOperatorInterface
raw []byte
modifier resourcebuilder.MetaV1ObjectModifierFunc
mode resourcebuilder.Mode
}

func newClusterOperatorBuilder(config *rest.Config, m lib.Manifest) resourcebuilder.Interface {
return NewClusterOperatorBuilder(clientClusterOperatorsGetter{
getter: configclientv1.NewForConfigOrDie(config).ClusterOperators(),
}, m)
client := configclientv1.NewForConfigOrDie(config).ClusterOperators()
return NewClusterOperatorBuilder(clientClusterOperatorsGetter{getter: client}, client, m)
}

// ClusterOperatorsGetter abstracts object access with a client or a cache lister.
Expand All @@ -75,10 +76,11 @@ func (g clientClusterOperatorsGetter) Get(name string) (*configv1.ClusterOperato

// NewClusterOperatorBuilder accepts the ClusterOperatorsGetter interface which may be implemented by a
// client or a lister cache.
func NewClusterOperatorBuilder(client ClusterOperatorsGetter, m lib.Manifest) resourcebuilder.Interface {
func NewClusterOperatorBuilder(client ClusterOperatorsGetter, createClient configclientv1.ClusterOperatorInterface, m lib.Manifest) resourcebuilder.Interface {
return &clusterOperatorBuilder{
client: client,
raw: m.Raw,
client: client,
createClient: createClient,
raw: m.Raw,
}
}

Expand All @@ -97,6 +99,26 @@ func (b *clusterOperatorBuilder) Do(ctx context.Context) error {
if b.modifier != nil {
b.modifier(os)
}

// create the object, and if we successfully created, update the status
if b.mode == resourcebuilder.PrecreatingMode {
clusterOperator, err := b.createClient.Create(os)
if err != nil {
if kerrors.IsAlreadyExists(err) {
return nil
}
return err
}
clusterOperator.Status.RelatedObjects = os.Status.DeepCopy().RelatedObjects
if _, err := b.createClient.UpdateStatus(clusterOperator); err != nil {
if kerrors.IsConflict(err) {
return nil
}
return err
}
return nil
}

return waitForOperatorStatusToBeDone(ctx, 1*time.Second, b.client, os, b.mode)
}

Expand Down
24 changes: 24 additions & 0 deletions pkg/cvo/sync_worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -585,12 +585,14 @@ func (w *SyncWorker) apply(ctx context.Context, payloadUpdate *payload.Update, w
}
graph := payload.NewTaskGraph(tasks)
graph.Split(payload.SplitOnJobs)
var precreateObjects bool
switch work.State {
case payload.InitializingPayload:
// Create every component in parallel to maximize reaching steady
// state.
graph.Parallelize(payload.FlattenByNumberAndComponent)
maxWorkers = len(graph.Nodes)
precreateObjects = true
case payload.ReconcilingPayload:
// Run the graph in random order during reconcile so that we don't
// hang on any particular component - we seed from the number of
Expand All @@ -608,6 +610,28 @@ func (w *SyncWorker) apply(ctx context.Context, payloadUpdate *payload.Update, w
// perform an orderly roll out by payload order, using some parallelization
// but avoiding out of order creation so components have some base
graph.Parallelize(payload.ByNumberAndComponent)
precreateObjects = true
}

// in specific modes, attempt to precreate a set of known types (currently ClusterOperator) without
// retries
if precreateObjects {
payload.RunGraph(ctx, graph, 8, func(ctx context.Context, tasks []*payload.Task) error {
for _, task := range tasks {
if contextIsCancelled(ctx) {
return cr.CancelError()
}
if task.Manifest.GVK != configv1.SchemeGroupVersion.WithKind("ClusterOperator") {
continue
}
if err := w.builder.Apply(ctx, task.Manifest, payload.PrecreatingPayload); err != nil {
klog.V(2).Infof("Unable to precreate resource %s: %v", task, err)
continue
}
klog.V(4).Infof("Precreated resource %s", task)
}
return nil
})
}

// update each object
Expand Down
5 changes: 5 additions & 0 deletions pkg/payload/payload.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,11 @@ const (
// Our goal is to get the entire payload created, even if some
// operators are still converging.
InitializingPayload
// PrecreatingPayload indicates we are selectively creating
// specific resources during a first pass of the payload to
// provide better visibility during install and upgrade of
// error conditions.
PrecreatingPayload
)

// Initializing is true if the state is InitializingPayload.
Expand Down