Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion hack/cluster-version-util/task_graph.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ func newTaskGraphCmd() *cobra.Command {

func runTaskGraphCmd(cmd *cobra.Command, args []string) error {
manifestDir := args[0]
release, err := payload.LoadUpdate(manifestDir, "")
release, err := payload.LoadUpdate(manifestDir, "", "")
if err != nil {
return err
}
Expand Down
1 change: 1 addition & 0 deletions lib/resourcebuilder/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,7 @@ const (
UpdatingMode Mode = iota
ReconcilingMode
InitializingMode
PrecreatingMode
)

type Interface interface {
Expand Down
10 changes: 8 additions & 2 deletions pkg/cvo/cvo.go
Original file line number Diff line number Diff line change
Expand Up @@ -236,7 +236,7 @@ func (vcb *verifyClientBuilder) HTTPClient() (*http.Client, error) {
// controller that loads and applies content to the cluster. It returns an error if the payload appears to
// be in error rather than continuing.
func (optr *Operator) InitializeFromPayload(restConfig *rest.Config, burstRestConfig *rest.Config) error {
update, err := payload.LoadUpdate(optr.defaultPayloadDir(), optr.releaseImage)
update, err := payload.LoadUpdate(optr.defaultPayloadDir(), optr.releaseImage, optr.exclude)
if err != nil {
return fmt.Errorf("the local release contents are invalid - no current version can be determined from disk: %v", err)
}
Expand Down Expand Up @@ -663,7 +663,11 @@ func (b *resourceBuilder) builderFor(m *lib.Manifest, state payload.State) (reso
}

if b.clusterOperators != nil && m.GVK == configv1.SchemeGroupVersion.WithKind("ClusterOperator") {
return cvointernal.NewClusterOperatorBuilder(b.clusterOperators, *m), nil
client, err := clientset.NewForConfig(config)
if err != nil {
return nil, err
}
return cvointernal.NewClusterOperatorBuilder(b.clusterOperators, client.ConfigV1().ClusterOperators(), *m), nil
}
if resourcebuilder.Mapper.Exists(m.GVK) {
return resourcebuilder.New(resourcebuilder.Mapper, config, *m)
Expand Down Expand Up @@ -694,6 +698,8 @@ func stateToMode(state payload.State) resourcebuilder.Mode {
return resourcebuilder.UpdatingMode
case payload.ReconcilingPayload:
return resourcebuilder.ReconcilingMode
case payload.PrecreatingPayload:
return resourcebuilder.PrecreatingMode
default:
panic(fmt.Sprintf("unexpected payload state %d", int(state)))
}
Expand Down
3 changes: 2 additions & 1 deletion pkg/cvo/cvo_scenarios_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,7 @@ func setupCVOTest(payloadDir string) (*Operator, map[string]runtime.Object, *fak
queue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "cvo-loop-test"),
client: client,
cvLister: &clientCVLister{client: client},
exclude: "exclude-test",
}

dynamicScheme := runtime.NewScheme()
Expand All @@ -90,7 +91,7 @@ func setupCVOTest(payloadDir string) (*Operator, map[string]runtime.Object, *fak
wait.Backoff{
Steps: 1,
},
"",
"exclude-test",
)
o.configSync = worker

Expand Down
42 changes: 32 additions & 10 deletions pkg/cvo/internal/operatorstatus.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"time"
"unicode"

kerrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/serializer"
Expand Down Expand Up @@ -48,16 +49,16 @@ func readClusterOperatorV1OrDie(objBytes []byte) *configv1.ClusterOperator {
}

type clusterOperatorBuilder struct {
client ClusterOperatorsGetter
raw []byte
modifier resourcebuilder.MetaV1ObjectModifierFunc
mode resourcebuilder.Mode
client ClusterOperatorsGetter
createClient configclientv1.ClusterOperatorInterface
raw []byte
modifier resourcebuilder.MetaV1ObjectModifierFunc
mode resourcebuilder.Mode
}

func newClusterOperatorBuilder(config *rest.Config, m lib.Manifest) resourcebuilder.Interface {
return NewClusterOperatorBuilder(clientClusterOperatorsGetter{
getter: configclientv1.NewForConfigOrDie(config).ClusterOperators(),
}, m)
client := configclientv1.NewForConfigOrDie(config).ClusterOperators()
return NewClusterOperatorBuilder(clientClusterOperatorsGetter{getter: client}, client, m)
}

// ClusterOperatorsGetter abstracts object access with a client or a cache lister.
Expand All @@ -75,10 +76,11 @@ func (g clientClusterOperatorsGetter) Get(name string) (*configv1.ClusterOperato

// NewClusterOperatorBuilder accepts the ClusterOperatorsGetter interface which may be implemented by a
// client or a lister cache.
func NewClusterOperatorBuilder(client ClusterOperatorsGetter, m lib.Manifest) resourcebuilder.Interface {
func NewClusterOperatorBuilder(client ClusterOperatorsGetter, createClient configclientv1.ClusterOperatorInterface, m lib.Manifest) resourcebuilder.Interface {
return &clusterOperatorBuilder{
client: client,
raw: m.Raw,
client: client,
createClient: createClient,
raw: m.Raw,
}
}

Expand All @@ -97,6 +99,26 @@ func (b *clusterOperatorBuilder) Do(ctx context.Context) error {
if b.modifier != nil {
b.modifier(os)
}

// create the object, and if we successfully created, update the status
if b.mode == resourcebuilder.PrecreatingMode {
clusterOperator, err := b.createClient.Create(os)
if err != nil {
if kerrors.IsAlreadyExists(err) {
return nil
}
return err
}
clusterOperator.Status.RelatedObjects = os.Status.DeepCopy().RelatedObjects
if _, err := b.createClient.UpdateStatus(clusterOperator); err != nil {
if kerrors.IsConflict(err) {
return nil
}
return err
}
return nil
}

return waitForOperatorStatusToBeDone(ctx, 1*time.Second, b.client, os, b.mode)
}

Expand Down
34 changes: 27 additions & 7 deletions pkg/cvo/sync_worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -495,7 +495,7 @@ func (w *SyncWorker) syncOnce(ctx context.Context, work *SyncWork, maxWorkers in
return err
}

payloadUpdate, err := payload.LoadUpdate(info.Directory, update.Image)
payloadUpdate, err := payload.LoadUpdate(info.Directory, update.Image, w.exclude)
if err != nil {
reporter.Report(SyncWorkerStatus{
Generation: work.Generation,
Expand Down Expand Up @@ -585,12 +585,14 @@ func (w *SyncWorker) apply(ctx context.Context, payloadUpdate *payload.Update, w
}
graph := payload.NewTaskGraph(tasks)
graph.Split(payload.SplitOnJobs)
var precreateObjects bool
switch work.State {
case payload.InitializingPayload:
// Create every component in parallel to maximize reaching steady
// state.
graph.Parallelize(payload.FlattenByNumberAndComponent)
maxWorkers = len(graph.Nodes)
precreateObjects = true
case payload.ReconcilingPayload:
// Run the graph in random order during reconcile so that we don't
// hang on any particular component - we seed from the number of
Expand All @@ -608,6 +610,28 @@ func (w *SyncWorker) apply(ctx context.Context, payloadUpdate *payload.Update, w
// perform an orderly roll out by payload order, using some parallelization
// but avoiding out of order creation so components have some base
graph.Parallelize(payload.ByNumberAndComponent)
precreateObjects = true
}

// in specific modes, attempt to precreate a set of known types (currently ClusterOperator) without
// retries
if precreateObjects {
payload.RunGraph(ctx, graph, 8, func(ctx context.Context, tasks []*payload.Task) error {
for _, task := range tasks {
if err := ctx.Err(); err != nil {
return cr.ContextError(err)
}
if task.Manifest.GVK != configv1.SchemeGroupVersion.WithKind("ClusterOperator") {
continue
}
if err := w.builder.Apply(ctx, task.Manifest, payload.PrecreatingPayload); err != nil {
klog.V(2).Infof("Unable to precreate resource %s: %v", task, err)
continue
}
klog.V(4).Infof("Precreated resource %s", task)
}
return nil
})
}

// update each object
Expand All @@ -621,7 +645,7 @@ func (w *SyncWorker) apply(ctx context.Context, payloadUpdate *payload.Update, w
klog.V(4).Infof("Running sync for %s", task)
klog.V(5).Infof("Manifest: %s", string(task.Manifest.Raw))

ov, ok := getOverrideForManifest(work.Overrides, w.exclude, task.Manifest)
ov, ok := getOverrideForManifest(work.Overrides, task.Manifest)
if ok && ov.Unmanaged {
klog.V(4).Infof("Skipping %s as unmanaged", task)
continue
Expand Down Expand Up @@ -916,7 +940,7 @@ func newMultipleError(errs []error) error {
}

// getOverrideForManifest returns the override and true when override exists for manifest.
func getOverrideForManifest(overrides []configv1.ComponentOverride, excludeIdentifier string, manifest *lib.Manifest) (configv1.ComponentOverride, bool) {
func getOverrideForManifest(overrides []configv1.ComponentOverride, manifest *lib.Manifest) (configv1.ComponentOverride, bool) {
for idx, ov := range overrides {
kind, namespace, name := manifest.GVK.Kind, manifest.Object().GetNamespace(), manifest.Object().GetName()
if ov.Kind == kind &&
Expand All @@ -925,10 +949,6 @@ func getOverrideForManifest(overrides []configv1.ComponentOverride, excludeIdent
return overrides[idx], true
}
}
excludeAnnotation := fmt.Sprintf("exclude.release.openshift.io/%s", excludeIdentifier)
if annotations := manifest.Object().GetAnnotations(); annotations != nil && annotations[excludeAnnotation] == "true" {
return configv1.ComponentOverride{Unmanaged: true}, true
}
return configv1.ComponentOverride{}, false
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
kind: Test
apiVersion: v1
metadata:
name: file-20-yml
annotations:
exclude.release.openshift.io/exclude-test: "true"
22 changes: 21 additions & 1 deletion pkg/payload/payload.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,11 @@ const (
// Our goal is to get the entire payload created, even if some
// operators are still converging.
InitializingPayload
// PrecreatingPayload indicates we are selectively creating
// specific resources during a first pass of the payload to
// provide better visibility during install and upgrade of
// error conditions.
PrecreatingPayload
)

// Initializing is true if the state is InitializingPayload.
Expand Down Expand Up @@ -102,7 +107,7 @@ type Update struct {
Manifests []lib.Manifest
}

func LoadUpdate(dir, releaseImage string) (*Update, error) {
func LoadUpdate(dir, releaseImage, excludeIdentifier string) (*Update, error) {
payload, tasks, err := loadUpdatePayloadMetadata(dir, releaseImage)
if err != nil {
return nil, err
Expand Down Expand Up @@ -149,6 +154,15 @@ func LoadUpdate(dir, releaseImage string) (*Update, error) {
errs = append(errs, errors.Wrapf(err, "error parsing %s", file.Name()))
continue
}
// Filter out manifests that should be excluded based on annotation
filteredMs := []lib.Manifest{}
for _, manifest := range ms {
if shouldExclude(excludeIdentifier, &manifest) {
continue
}
filteredMs = append(filteredMs, manifest)
}
ms = filteredMs
for i := range ms {
ms[i].OriginalFilename = filepath.Base(file.Name())
}
Expand All @@ -174,6 +188,12 @@ func LoadUpdate(dir, releaseImage string) (*Update, error) {
return payload, nil
}

func shouldExclude(excludeIdentifier string, manifest *lib.Manifest) bool {
excludeAnnotation := fmt.Sprintf("exclude.release.openshift.io/%s", excludeIdentifier)
annotations := manifest.Object().GetAnnotations()
return annotations != nil && annotations[excludeAnnotation] == "true"
}

// ValidateDirectory checks if a directory can be a candidate update by
// looking for known files. It returns an error if the directory cannot
// be an update.
Expand Down
2 changes: 1 addition & 1 deletion pkg/payload/payload_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,7 @@ func Test_loadUpdatePayload(t *testing.T) {
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
got, err := LoadUpdate(tt.args.dir, tt.args.releaseImage)
got, err := LoadUpdate(tt.args.dir, tt.args.releaseImage, "exclude-test")
if (err != nil) != tt.wantErr {
t.Errorf("loadUpdatePayload() error = %v, wantErr %v", err, tt.wantErr)
return
Expand Down
2 changes: 1 addition & 1 deletion pkg/payload/task_graph_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -487,7 +487,7 @@ func Test_TaskGraph_real(t *testing.T) {
if len(path) == 0 {
t.Skip("TEST_GRAPH_PATH unset")
}
p, err := LoadUpdate(path, "arbitrary/image:1")
p, err := LoadUpdate(path, "arbitrary/image:1", "")
if err != nil {
t.Fatal(err)
}
Expand Down