diff --git a/lib/resourcebuilder/apiext.go b/lib/resourcebuilder/apiext.go index e345ea276..8ab83d268 100644 --- a/lib/resourcebuilder/apiext.go +++ b/lib/resourcebuilder/apiext.go @@ -30,6 +30,10 @@ func newCRDBuilder(config *rest.Config, m lib.Manifest) Interface { } } +func (b *crdBuilder) WithMode(m Mode) Interface { + return b +} + func (b *crdBuilder) WithModifier(f MetaV1ObjectModifierFunc) Interface { b.modifier = f return b diff --git a/lib/resourcebuilder/apireg.go b/lib/resourcebuilder/apireg.go index da428fd22..cda4d3cee 100644 --- a/lib/resourcebuilder/apireg.go +++ b/lib/resourcebuilder/apireg.go @@ -23,6 +23,10 @@ func newAPIServiceBuilder(config *rest.Config, m lib.Manifest) Interface { } } +func (b *apiServiceBuilder) WithMode(m Mode) Interface { + return b +} + func (b *apiServiceBuilder) WithModifier(f MetaV1ObjectModifierFunc) Interface { b.modifier = f return b diff --git a/lib/resourcebuilder/apps.go b/lib/resourcebuilder/apps.go index f086f6479..79afb9ffb 100644 --- a/lib/resourcebuilder/apps.go +++ b/lib/resourcebuilder/apps.go @@ -31,6 +31,10 @@ func newDeploymentBuilder(config *rest.Config, m lib.Manifest) Interface { } } +func (b *deploymentBuilder) WithMode(m Mode) Interface { + return b +} + func (b *deploymentBuilder) WithModifier(f MetaV1ObjectModifierFunc) Interface { b.modifier = f return b @@ -91,6 +95,10 @@ func newDaemonsetBuilder(config *rest.Config, m lib.Manifest) Interface { } } +func (b *daemonsetBuilder) WithMode(m Mode) Interface { + return b +} + func (b *daemonsetBuilder) WithModifier(f MetaV1ObjectModifierFunc) Interface { b.modifier = f return b diff --git a/lib/resourcebuilder/batch.go b/lib/resourcebuilder/batch.go index 15f922e1f..843215caa 100644 --- a/lib/resourcebuilder/batch.go +++ b/lib/resourcebuilder/batch.go @@ -30,6 +30,10 @@ func newJobBuilder(config *rest.Config, m lib.Manifest) Interface { } } +func (b *jobBuilder) WithMode(m Mode) Interface { + return b +} + func (b *jobBuilder) WithModifier(f MetaV1ObjectModifierFunc) Interface { b.modifier = f return b diff --git a/lib/resourcebuilder/core.go b/lib/resourcebuilder/core.go index f121a66e5..f8f03f0db 100644 --- a/lib/resourcebuilder/core.go +++ b/lib/resourcebuilder/core.go @@ -23,6 +23,10 @@ func newServiceAccountBuilder(config *rest.Config, m lib.Manifest) Interface { } } +func (b *serviceAccountBuilder) WithMode(m Mode) Interface { + return b +} + func (b *serviceAccountBuilder) WithModifier(f MetaV1ObjectModifierFunc) Interface { b.modifier = f return b @@ -50,6 +54,10 @@ func newConfigMapBuilder(config *rest.Config, m lib.Manifest) Interface { } } +func (b *configMapBuilder) WithMode(m Mode) Interface { + return b +} + func (b *configMapBuilder) WithModifier(f MetaV1ObjectModifierFunc) Interface { b.modifier = f return b @@ -77,6 +85,10 @@ func newNamespaceBuilder(config *rest.Config, m lib.Manifest) Interface { } } +func (b *namespaceBuilder) WithMode(m Mode) Interface { + return b +} + func (b *namespaceBuilder) WithModifier(f MetaV1ObjectModifierFunc) Interface { b.modifier = f return b @@ -104,6 +116,10 @@ func newServiceBuilder(config *rest.Config, m lib.Manifest) Interface { } } +func (b *serviceBuilder) WithMode(m Mode) Interface { + return b +} + func (b *serviceBuilder) WithModifier(f MetaV1ObjectModifierFunc) Interface { b.modifier = f return b diff --git a/lib/resourcebuilder/interface.go b/lib/resourcebuilder/interface.go index c3665f43d..a22c8a142 100644 --- a/lib/resourcebuilder/interface.go +++ b/lib/resourcebuilder/interface.go @@ -63,8 +63,18 @@ type MetaV1ObjectModifierFunc func(metav1.Object) // and the Manifest. type NewInteraceFunc func(rest *rest.Config, m lib.Manifest) Interface +// Mode is how this builder is being used. +type Mode int + +const ( + UpdatingMode Mode = iota + ReconcilingMode + InitializingMode +) + type Interface interface { WithModifier(MetaV1ObjectModifierFunc) Interface + WithMode(Mode) Interface Do(context.Context) error } diff --git a/lib/resourcebuilder/rbac.go b/lib/resourcebuilder/rbac.go index 098726be7..b3a0d57a4 100644 --- a/lib/resourcebuilder/rbac.go +++ b/lib/resourcebuilder/rbac.go @@ -23,6 +23,10 @@ func newClusterRoleBuilder(config *rest.Config, m lib.Manifest) Interface { } } +func (b *clusterRoleBuilder) WithMode(m Mode) Interface { + return b +} + func (b *clusterRoleBuilder) WithModifier(f MetaV1ObjectModifierFunc) Interface { b.modifier = f return b @@ -50,6 +54,10 @@ func newClusterRoleBindingBuilder(config *rest.Config, m lib.Manifest) Interface } } +func (b *clusterRoleBindingBuilder) WithMode(m Mode) Interface { + return b +} + func (b *clusterRoleBindingBuilder) WithModifier(f MetaV1ObjectModifierFunc) Interface { b.modifier = f return b @@ -77,6 +85,10 @@ func newRoleBuilder(config *rest.Config, m lib.Manifest) Interface { } } +func (b *roleBuilder) WithMode(m Mode) Interface { + return b +} + func (b *roleBuilder) WithModifier(f MetaV1ObjectModifierFunc) Interface { b.modifier = f return b @@ -104,6 +116,10 @@ func newRoleBindingBuilder(config *rest.Config, m lib.Manifest) Interface { } } +func (b *roleBindingBuilder) WithMode(m Mode) Interface { + return b +} + func (b *roleBindingBuilder) WithModifier(f MetaV1ObjectModifierFunc) Interface { b.modifier = f return b diff --git a/lib/resourcebuilder/security.go b/lib/resourcebuilder/security.go index f89e2dde2..91e2a11b1 100644 --- a/lib/resourcebuilder/security.go +++ b/lib/resourcebuilder/security.go @@ -23,6 +23,10 @@ func newSecurityBuilder(config *rest.Config, m lib.Manifest) Interface { } } +func (b *securityBuilder) WithMode(m Mode) Interface { + return b +} + func (b *securityBuilder) WithModifier(f MetaV1ObjectModifierFunc) Interface { b.modifier = f return b diff --git a/pkg/cvo/cvo.go b/pkg/cvo/cvo.go index 42e58a2b3..56e60e9a6 100644 --- a/pkg/cvo/cvo.go +++ b/pkg/cvo/cvo.go @@ -31,7 +31,6 @@ import ( "github.com/openshift/cluster-version-operator/lib" "github.com/openshift/cluster-version-operator/lib/resourceapply" "github.com/openshift/cluster-version-operator/lib/resourcebuilder" - "github.com/openshift/cluster-version-operator/lib/resourcemerge" "github.com/openshift/cluster-version-operator/lib/validation" "github.com/openshift/cluster-version-operator/pkg/cvo/internal" "github.com/openshift/cluster-version-operator/pkg/cvo/internal/dynamicclient" @@ -157,13 +156,13 @@ func New( kubeClient: kubeClient, eventRecorder: eventBroadcaster.NewRecorder(scheme.Scheme, corev1.EventSource{Component: namespace}), - queue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "clusterversion"), + queue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "clusterversion"), availableUpdatesQueue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "availableupdates"), } optr.configSync = NewSyncWorker( optr.defaultPayloadRetriever(), - NewResourceBuilder(optr.restConfig), + NewResourceBuilder(optr.restConfig, coInformer.Lister()), minimumInterval, wait.Backoff{ Duration: time.Second * 10, @@ -219,7 +218,7 @@ func (optr *Operator) Run(workers int, stopCh <-chan struct{}) { // start the config sync loop, and have it notify the queue when new status is detected go runThrottledStatusNotifier(stopCh, optr.statusInterval, 2, optr.configSync.StatusCh(), func() { optr.queue.Add(optr.queueKey()) }) - go optr.configSync.Start(8, stopCh) + go optr.configSync.Start(16, stopCh) go wait.Until(func() { optr.worker(optr.queue, optr.sync) }, time.Second, stopCh) go wait.Until(func() { optr.worker(optr.availableUpdatesQueue, optr.availableUpdatesSync) }, time.Second, stopCh) @@ -334,10 +333,19 @@ func (optr *Operator) sync(key string) error { }, errs) } + // identify an initial state to inform the sync loop of + var state payload.State + switch { + case hasNeverReachedLevel(config): + state = payload.InitializingPayload + case hasReachedLevel(config, desired): + state = payload.ReconcilingPayload + default: + state = payload.UpdatingPayload + } + // inform the config sync loop about our desired state - reconciling := resourcemerge.IsOperatorStatusConditionTrue(config.Status.Conditions, configv1.OperatorAvailable) && - resourcemerge.IsOperatorStatusConditionFalse(config.Status.Conditions, configv1.OperatorProgressing) - status := optr.configSync.Update(config.Generation, desired, config.Spec.Overrides, reconciling) + status := optr.configSync.Update(config.Generation, desired, config.Spec.Overrides, state) // write cluster version status return optr.syncStatus(original, config, status, errs) @@ -462,14 +470,19 @@ func (optr *Operator) SetSyncWorkerForTesting(worker ConfigSyncWorker) { type resourceBuilder struct { config *rest.Config modifier resourcebuilder.MetaV1ObjectModifierFunc + + clusterOperators internal.ClusterOperatorsGetter } // NewResourceBuilder creates the default resource builder implementation. -func NewResourceBuilder(config *rest.Config) payload.ResourceBuilder { - return &resourceBuilder{config: config} +func NewResourceBuilder(config *rest.Config, clusterOperators configlistersv1.ClusterOperatorLister) payload.ResourceBuilder { + return &resourceBuilder{config: config, clusterOperators: clusterOperators} } func (b *resourceBuilder) BuilderFor(m *lib.Manifest) (resourcebuilder.Interface, error) { + if b.clusterOperators != nil && m.GVK == configv1.SchemeGroupVersion.WithKind("ClusterOperator") { + return internal.NewClusterOperatorBuilder(b.clusterOperators, *m), nil + } if resourcebuilder.Mapper.Exists(m.GVK) { return resourcebuilder.New(resourcebuilder.Mapper, b.config, *m) } @@ -480,7 +493,7 @@ func (b *resourceBuilder) BuilderFor(m *lib.Manifest) (resourcebuilder.Interface return internal.NewGenericBuilder(client, *m) } -func (b *resourceBuilder) Apply(ctx context.Context, m *lib.Manifest) error { +func (b *resourceBuilder) Apply(ctx context.Context, m *lib.Manifest, state payload.State) error { builder, err := b.BuilderFor(m) if err != nil { return err @@ -488,5 +501,38 @@ func (b *resourceBuilder) Apply(ctx context.Context, m *lib.Manifest) error { if b.modifier != nil { builder = builder.WithModifier(b.modifier) } - return builder.Do(ctx) + return builder.WithMode(stateToMode(state)).Do(ctx) +} + +func stateToMode(state payload.State) resourcebuilder.Mode { + switch state { + case payload.InitializingPayload: + return resourcebuilder.InitializingMode + case payload.UpdatingPayload: + return resourcebuilder.UpdatingMode + case payload.ReconcilingPayload: + return resourcebuilder.ReconcilingMode + default: + panic(fmt.Sprintf("unexpected payload state %d", int(state))) + } +} + +func hasNeverReachedLevel(cv *configv1.ClusterVersion) bool { + for _, version := range cv.Status.History { + if version.State == configv1.CompletedUpdate { + return false + } + } + // TODO: check the payload, just in case + return true +} + +func hasReachedLevel(cv *configv1.ClusterVersion, desired configv1.Update) bool { + if len(cv.Status.History) == 0 { + return false + } + if cv.Status.History[0].State != configv1.CompletedUpdate { + return false + } + return desired.Image == cv.Status.History[0].Image } diff --git a/pkg/cvo/cvo_scenarios_test.go b/pkg/cvo/cvo_scenarios_test.go index 8517da386..112b37643 100644 --- a/pkg/cvo/cvo_scenarios_test.go +++ b/pkg/cvo/cvo_scenarios_test.go @@ -206,23 +206,27 @@ func TestCVO_StartupAndSync(t *testing.T) { }) verifyAllStatus(t, worker.StatusCh(), SyncWorkerStatus{ - Step: "RetrievePayload", + Step: "RetrievePayload", + Initial: true, // the desired version is briefly incorrect (user provided) until we retrieve the image Actual: configv1.Update{Version: "4.0.1", Image: "image/image:1"}, }, SyncWorkerStatus{ Step: "ApplyResources", + Initial: true, VersionHash: "6GC9TkkG9PA=", Actual: configv1.Update{Version: "1.0.0-abc", Image: "image/image:1"}, }, SyncWorkerStatus{ Fraction: float32(1) / 3, Step: "ApplyResources", + Initial: true, VersionHash: "6GC9TkkG9PA=", Actual: configv1.Update{Version: "1.0.0-abc", Image: "image/image:1"}, }, SyncWorkerStatus{ Fraction: float32(2) / 3, + Initial: true, Step: "ApplyResources", VersionHash: "6GC9TkkG9PA=", Actual: configv1.Update{Version: "1.0.0-abc", Image: "image/image:1"}, @@ -706,6 +710,6 @@ func (b *blockingResourceBuilder) Send(err error) { b.ch <- err } -func (b *blockingResourceBuilder) Apply(ctx context.Context, m *lib.Manifest) error { +func (b *blockingResourceBuilder) Apply(ctx context.Context, m *lib.Manifest, state payload.State) error { return <-b.ch } diff --git a/pkg/cvo/internal/generic.go b/pkg/cvo/internal/generic.go index efb41c1f8..df2c02d03 100644 --- a/pkg/cvo/internal/generic.go +++ b/pkg/cvo/internal/generic.go @@ -73,6 +73,10 @@ func NewGenericBuilder(client dynamic.ResourceInterface, m lib.Manifest) (resour }, nil } +func (b *genericBuilder) WithMode(m resourcebuilder.Mode) resourcebuilder.Interface { + return b +} + func (b *genericBuilder) WithModifier(f resourcebuilder.MetaV1ObjectModifierFunc) resourcebuilder.Interface { b.modifier = f return b diff --git a/pkg/cvo/internal/operatorstatus.go b/pkg/cvo/internal/operatorstatus.go index a1e0f5047..3a364ee9d 100644 --- a/pkg/cvo/internal/operatorstatus.go +++ b/pkg/cvo/internal/operatorstatus.go @@ -50,18 +50,45 @@ func readClusterOperatorV1OrDie(objBytes []byte) *configv1.ClusterOperator { } type clusterOperatorBuilder struct { - client configclientv1.ConfigV1Interface + client ClusterOperatorsGetter raw []byte modifier resourcebuilder.MetaV1ObjectModifierFunc + mode resourcebuilder.Mode } func newClusterOperatorBuilder(config *rest.Config, m lib.Manifest) resourcebuilder.Interface { + return NewClusterOperatorBuilder(clientClusterOperatorsGetter{ + getter: configclientv1.NewForConfigOrDie(config).ClusterOperators(), + }, m) +} + +// ClusterOperatorsGetter abstracts object access with a client or a cache lister. +type ClusterOperatorsGetter interface { + Get(name string) (*configv1.ClusterOperator, error) +} + +type clientClusterOperatorsGetter struct { + getter configclientv1.ClusterOperatorInterface +} + +func (g clientClusterOperatorsGetter) Get(name string) (*configv1.ClusterOperator, error) { + return g.getter.Get(name, metav1.GetOptions{}) +} + +// NewClusterOperatorBuilder accepts the ClusterOperatorsGetter interface which may be implemented by a +// client or a lister cache. +func NewClusterOperatorBuilder(client ClusterOperatorsGetter, m lib.Manifest) resourcebuilder.Interface { return &clusterOperatorBuilder{ - client: configclientv1.NewForConfigOrDie(config), + client: client, raw: m.Raw, } } +func (b *clusterOperatorBuilder) WithMode(m resourcebuilder.Mode) resourcebuilder.Interface { + b.mode = m + return b +} + func (b *clusterOperatorBuilder) WithModifier(f resourcebuilder.MetaV1ObjectModifierFunc) resourcebuilder.Interface { b.modifier = f return b @@ -74,13 +101,13 @@ func (b *clusterOperatorBuilder) Do(ctx context.Context) error { } ctxWithTimeout, cancel := context.WithTimeout(ctx, 1*time.Minute) defer cancel() - return waitForOperatorStatusToBeDone(ctxWithTimeout, 1*time.Second, b.client, os) + return waitForOperatorStatusToBeDone(ctxWithTimeout, 1*time.Second, b.client, os, b.mode) } -func waitForOperatorStatusToBeDone(ctx context.Context, interval time.Duration, client configclientv1.ClusterOperatorsGetter, expected *configv1.ClusterOperator) error { +func waitForOperatorStatusToBeDone(ctx context.Context, interval time.Duration, client ClusterOperatorsGetter, expected *configv1.ClusterOperator, mode resourcebuilder.Mode) error { var lastErr error err := wait.PollImmediateUntil(interval, func() (bool, error) { - actual, err := client.ClusterOperators().Get(expected.Name, metav1.GetOptions{}) + actual, err := client.Get(expected.Name) if err != nil { lastErr = &payload.UpdateError{ Nested: err, @@ -91,6 +118,24 @@ func waitForOperatorStatusToBeDone(ctx context.Context, interval time.Duration, return false, nil } + switch mode { + case resourcebuilder.ReconcilingMode: + // During reconciliation we want to report unavailability and fail fast. + if c := resourcemerge.FindOperatorStatusCondition(actual.Status.Conditions, configv1.OperatorAvailable); c != nil && c.Status != configv1.ConditionTrue { + message := fmt.Sprintf("Cluster operator %s is unavailable", actual.Name) + if len(c.Message) > 0 { + message = fmt.Sprintf("Cluster operator %s is unavailable: %s", actual.Name, c.Message) + } + return false, &payload.UpdateError{ + Nested: errors.New(lowerFirst(message)), + Reason: "ClusterOperatorNotAvailable", + Message: message, + Name: actual.Name, + } + } + return true, nil + } + // undone is map of operand to tuple of (expected version, actual version) // for incomplete operands. undone := map[string][]string{} @@ -152,11 +197,20 @@ func waitForOperatorStatusToBeDone(ctx context.Context, interval time.Duration, failing = false } } - // if we're at the correct version, and available, and not failing, we are done - // if we're available, not failing, and not progressing, we're also done - // TODO: remove progressing once all cluster operators report expected versions - if available && (!progressing || len(expected.Status.Versions) > 0) && !failing { - return true, nil + + switch mode { + case resourcebuilder.InitializingMode: + // During initialization, if we're available and have reached level (or are not progressing) + // we can continue. We tolerate partial error states as long as the operator says it's available + // since this is the first rollout. + if available && (!progressing || len(expected.Status.Versions) > 0) { + return true, nil + } + default: + // Be pessimistic when upgrading - require that we not be in a degraded error state. + if available && (!progressing || len(expected.Status.Versions) > 0) && !failing { + return true, nil + } } if c := resourcemerge.FindOperatorStatusCondition(actual.Status.Conditions, configv1.OperatorFailing); c != nil && c.Status == configv1.ConditionTrue { diff --git a/pkg/cvo/internal/operatorstatus_test.go b/pkg/cvo/internal/operatorstatus_test.go index 9e1740279..e4a171af8 100644 --- a/pkg/cvo/internal/operatorstatus_test.go +++ b/pkg/cvo/internal/operatorstatus_test.go @@ -16,6 +16,7 @@ import ( configv1 "github.com/openshift/api/config/v1" "github.com/openshift/client-go/config/clientset/versioned/fake" + "github.com/openshift/cluster-version-operator/lib/resourcebuilder" "github.com/openshift/cluster-version-operator/pkg/payload" ) @@ -23,7 +24,7 @@ func Test_waitForOperatorStatusToBeDone(t *testing.T) { tests := []struct { name string actual *configv1.ClusterOperator - + mode resourcebuilder.Mode exp *configv1.ClusterOperator expErr error }{{ @@ -377,6 +378,32 @@ func Test_waitForOperatorStatusToBeDone(t *testing.T) { Message: "Cluster operator test-co is reporting a failure: random error", Name: "test-co", }, + }, { + name: "cluster operator reporting available=true failing=true when Reconciling", + actual: &configv1.ClusterOperator{ + ObjectMeta: metav1.ObjectMeta{Name: "test-co"}, + Status: configv1.ClusterOperatorStatus{ + Versions: []configv1.OperandVersion{{ + Name: "operator", Version: "v1", + }, { + Name: "operand-1", Version: "v1", + }}, + Conditions: []configv1.ClusterOperatorStatusCondition{{Type: configv1.OperatorAvailable, Status: configv1.ConditionTrue}, {Type: configv1.OperatorFailing, Status: configv1.ConditionTrue, Message: "random error"}}, + }, + }, + mode: resourcebuilder.ReconcilingMode, + exp: &configv1.ClusterOperator{ + ObjectMeta: metav1.ObjectMeta{Name: "test-co"}, + Status: configv1.ClusterOperatorStatus{ + Versions: []configv1.OperandVersion{{ + Name: "operator", Version: "v1", + }, { + Name: "operand-1", Version: "v1", + }}, + }, + }, + // we ignore the failing condition + expErr: nil, }, { name: "cluster operator reporting available=true progressing=true failing=true", actual: &configv1.ClusterOperator{ @@ -478,15 +505,12 @@ func Test_waitForOperatorStatusToBeDone(t *testing.T) { ctxWithTimeout, cancel := context.WithTimeout(context.TODO(), 1*time.Millisecond) defer cancel() - err := waitForOperatorStatusToBeDone(ctxWithTimeout, 1*time.Millisecond, client.ConfigV1(), test.exp) - if test.expErr == nil { - if err != nil { - t.Fatalf("expected nil error, got: %v", err) - } - } else { - if !reflect.DeepEqual(test.expErr, err) { - t.Fatalf("unexpected: %s", diff.ObjectReflectDiff(test.expErr, err)) - } + err := waitForOperatorStatusToBeDone(ctxWithTimeout, 1*time.Millisecond, clientClusterOperatorsGetter{getter: client.ConfigV1().ClusterOperators()}, test.exp, test.mode) + if (test.expErr == nil) != (err == nil) { + t.Fatalf("unexpected error: %v", err) + } + if !reflect.DeepEqual(test.expErr, err) { + t.Fatalf("unexpected: %s", diff.ObjectReflectDiff(test.expErr, err)) } }) } diff --git a/pkg/cvo/sync_test.go b/pkg/cvo/sync_test.go index 003c98981..7fdb052d0 100644 --- a/pkg/cvo/sync_test.go +++ b/pkg/cvo/sync_test.go @@ -121,7 +121,7 @@ func Test_SyncWorker_apply(t *testing.T) { worker := &SyncWorker{} worker.backoff.Steps = 3 - worker.builder = NewResourceBuilder(nil) + worker.builder = NewResourceBuilder(nil, nil) ctx := context.Background() worker.apply(ctx, up, &SyncWork{}, 1, &statusWrapper{w: worker, previousStatus: worker.Status()}) test.check(t, r.actions) @@ -300,10 +300,16 @@ type testBuilder struct { *recorder reactors map[action]error modifiers []resourcebuilder.MetaV1ObjectModifierFunc + mode resourcebuilder.Mode m *lib.Manifest } +func (t *testBuilder) WithMode(m resourcebuilder.Mode) resourcebuilder.Interface { + t.mode = m + return t +} + func (t *testBuilder) WithModifier(m resourcebuilder.MetaV1ObjectModifierFunc) resourcebuilder.Interface { t.modifiers = append(t.modifiers, m) return t @@ -353,7 +359,7 @@ func (r *fakeSyncRecorder) StatusCh() <-chan SyncWorkerStatus { func (r *fakeSyncRecorder) Start(maxWorkers int, stopCh <-chan struct{}) {} -func (r *fakeSyncRecorder) Update(generation int64, desired configv1.Update, overrides []configv1.ComponentOverride, reconciling bool) *SyncWorkerStatus { +func (r *fakeSyncRecorder) Update(generation int64, desired configv1.Update, overrides []configv1.ComponentOverride, state payload.State) *SyncWorkerStatus { r.Updates = append(r.Updates, desired) return r.Returns } @@ -392,7 +398,7 @@ type testResourceBuilder struct { modifiers []resourcebuilder.MetaV1ObjectModifierFunc } -func (b *testResourceBuilder) Apply(ctx context.Context, m *lib.Manifest) error { +func (b *testResourceBuilder) Apply(ctx context.Context, m *lib.Manifest, state payload.State) error { ns := m.Object().GetNamespace() fakeGVR := schema.GroupVersionResource{Group: m.GVK.Group, Version: m.GVK.Version, Resource: strings.ToLower(m.GVK.Kind)} client := b.client.Resource(fakeGVR).Namespace(ns) diff --git a/pkg/cvo/sync_worker.go b/pkg/cvo/sync_worker.go index 5d40bbceb..2d0874464 100644 --- a/pkg/cvo/sync_worker.go +++ b/pkg/cvo/sync_worker.go @@ -25,7 +25,7 @@ import ( // ConfigSyncWorker abstracts how the image is synchronized to the server. Introduced for testing. type ConfigSyncWorker interface { Start(maxWorkers int, stopCh <-chan struct{}) - Update(generation int64, desired configv1.Update, overrides []configv1.ComponentOverride, reconciling bool) *SyncWorkerStatus + Update(generation int64, desired configv1.Update, overrides []configv1.ComponentOverride, state payload.State) *SyncWorkerStatus StatusCh() <-chan SyncWorkerStatus } @@ -41,11 +41,11 @@ type StatusReporter interface { // SyncWork represents the work that should be done in a sync iteration. type SyncWork struct { - Generation int64 - Desired configv1.Update - Overrides []configv1.ComponentOverride - Reconciling bool - Completed int + Generation int64 + Desired configv1.Update + Overrides []configv1.ComponentOverride + State payload.State + Completed int } // Empty returns true if the image is empty for this work. @@ -64,6 +64,7 @@ type SyncWorkerStatus struct { Completed int Reconciling bool + Initial bool VersionHash string Actual configv1.Update @@ -147,7 +148,7 @@ func (w *SyncWorker) StatusCh() <-chan SyncWorkerStatus { // the initial state or whatever the last recorded status was. // TODO: in the future it may be desirable for changes that alter desired to wait briefly before returning, // giving the sync loop the opportunity to observe our change and begin working towards it. -func (w *SyncWorker) Update(generation int64, desired configv1.Update, overrides []configv1.ComponentOverride, reconciling bool) *SyncWorkerStatus { +func (w *SyncWorker) Update(generation int64, desired configv1.Update, overrides []configv1.ComponentOverride, state payload.State) *SyncWorkerStatus { w.lock.Lock() defer w.lock.Unlock() @@ -164,12 +165,10 @@ func (w *SyncWorker) Update(generation int64, desired configv1.Update, overrides // initialize the reconciliation flag and the status the first time // update is invoked if w.work == nil { - if reconciling { - work.Reconciling = true - } + work.State = state w.status = SyncWorkerStatus{ Generation: generation, - Reconciling: work.Reconciling, + Reconciling: state.Reconciling(), Actual: work.Desired, } } @@ -202,7 +201,7 @@ func (w *SyncWorker) Start(maxWorkers int, stopCh <-chan struct{}) { var next <-chan time.Time for { - waitingToReconcile := work.Reconciling + waitingToReconcile := work.State == payload.ReconcilingPayload select { case <-stopCh: glog.V(5).Infof("Stopped worker") @@ -261,7 +260,7 @@ func (w *SyncWorker) Start(maxWorkers int, stopCh <-chan struct{}) { glog.V(5).Infof("Sync succeeded, reconciling") work.Completed++ - work.Reconciling = true + work.State = payload.ReconcilingPayload next = time.After(w.minimumReconcileInterval) } }, 10*time.Millisecond, stopCh) @@ -302,14 +301,14 @@ func (w *SyncWorker) calculateNext(work *SyncWork) bool { // if this is the first time through the loop, initialize reconciling to // the state Update() calculated (to allow us to start in reconciling) if work.Empty() { - work.Reconciling = w.work.Reconciling + work.State = w.work.State } else { if changed { - work.Reconciling = false + work.State = payload.UpdatingPayload } } // always clear the completed variable if we are not reconciling - if !work.Reconciling { + if work.State != payload.ReconcilingPayload { work.Completed = 0 } @@ -378,22 +377,22 @@ func (w *SyncWorker) Status() *SyncWorkerStatus { // the update could not be completely applied. The status is updated as we progress. // Cancelling the context will abort the execution of the sync. func (w *SyncWorker) syncOnce(ctx context.Context, work *SyncWork, maxWorkers int, reporter StatusReporter) error { - glog.V(4).Infof("Running sync %s on generation %d", versionString(work.Desired), work.Generation) + glog.V(4).Infof("Running sync %s on generation %d in state %s", versionString(work.Desired), work.Generation, work.State) update := work.Desired // cache the payload until the release image changes validPayload := w.payload if validPayload == nil || !equalUpdate(configv1.Update{Image: validPayload.ReleaseImage}, update) { glog.V(4).Infof("Loading payload") - reporter.Report(SyncWorkerStatus{Step: "RetrievePayload", Reconciling: work.Reconciling, Actual: update}) + reporter.Report(SyncWorkerStatus{Step: "RetrievePayload", Initial: work.State.Initializing(), Reconciling: work.State.Reconciling(), Actual: update}) payloadDir, err := w.retriever.RetrievePayload(ctx, update) if err != nil { - reporter.Report(SyncWorkerStatus{Failure: err, Step: "RetrievePayload", Reconciling: work.Reconciling, Actual: update}) + reporter.Report(SyncWorkerStatus{Failure: err, Step: "RetrievePayload", Initial: work.State.Initializing(), Reconciling: work.State.Reconciling(), Actual: update}) return err } payloadUpdate, err := payload.LoadUpdate(payloadDir, update.Image) if err != nil { - reporter.Report(SyncWorkerStatus{Failure: err, Step: "VerifyPayload", Reconciling: work.Reconciling, Actual: update}) + reporter.Report(SyncWorkerStatus{Failure: err, Step: "VerifyPayload", Initial: work.State.Initializing(), Reconciling: work.State.Reconciling(), Actual: update}) return err } w.payload = payloadUpdate @@ -418,7 +417,8 @@ func (w *SyncWorker) apply(ctx context.Context, payloadUpdate *payload.Update, w cr := &consistentReporter{ status: SyncWorkerStatus{ Generation: work.Generation, - Reconciling: work.Reconciling, + Initial: work.State.Initializing(), + Reconciling: work.State.Reconciling(), VersionHash: payloadUpdate.ManifestHash, Actual: update, }, @@ -458,7 +458,7 @@ func (w *SyncWorker) apply(ctx context.Context, payloadUpdate *payload.Update, w continue } - if err := task.Run(ctx, version, w.builder); err != nil { + if err := task.Run(ctx, version, w.builder, work.State); err != nil { return err } cr.Inc() @@ -471,7 +471,7 @@ func (w *SyncWorker) apply(ctx context.Context, payloadUpdate *payload.Update, w return err } - // update the + // update the status cr.Complete() return nil } @@ -541,6 +541,7 @@ func (r *consistentReporter) Complete() { metricPayload.WithLabelValues(r.version, "applied").Set(float64(r.done)) copied := r.status copied.Completed = r.completed + 1 + copied.Initial = false copied.Reconciling = true copied.Fraction = 1 r.reporter.Report(copied) diff --git a/pkg/payload/payload.go b/pkg/payload/payload.go index 1d9345f72..333ca719c 100644 --- a/pkg/payload/payload.go +++ b/pkg/payload/payload.go @@ -21,6 +21,61 @@ import ( "github.com/openshift/cluster-version-operator/lib/resourceread" ) +// State describes the state of the payload and alters +// how a payload is applied. +type State int + +const ( + // UpdatingPayload indicates we are moving from one state to + // another. + // + // When we are moving to a different payload version, we want to + // be as conservative as possible about ordering of the payload + // and the errors we might encounter. An error in one operator + // should prevent dependent operators from changing. We are + // willing to take longer to roll out an update if it reduces + // the possibility of error. + UpdatingPayload State = iota + // ReconcilingPayload indicates we are attempting to maintain + // our current state. + // + // When the payload has already been applied to the cluster, we + // prioritize ensuring resources are recreated and don't need to + // progress in strict order. We also attempt to reset as many + // resources as possible back to their desired state and report + // errors after the fact. + ReconcilingPayload + // InitializingPayload indicates we are establishing our first + // state. + // + // When we are deploying a payload for the first time we want + // to make progress quickly but in a predictable order to + // minimize retries and crash-loops. We wait for operators + // to report level but tolerate degraded and transient errors. + // Our goal is to get the entire payload created, even if some + // operators are still converging. + InitializingPayload +) + +// Initializing is true if the state is InitializingPayload. +func (s State) Initializing() bool { return s == InitializingPayload } + +// Reconciling is true if the state is ReconcilingPayload. +func (s State) Reconciling() bool { return s == ReconcilingPayload } + +func (s State) String() string { + switch s { + case ReconcilingPayload: + return "Reconciling" + case UpdatingPayload: + return "Updating" + case InitializingPayload: + return "Initializing" + default: + panic(fmt.Sprintf("unrecognized state %d", int(s))) + } +} + const ( DefaultPayloadDir = "/" diff --git a/pkg/payload/task.go b/pkg/payload/task.go index 3089c0258..c28a66abe 100644 --- a/pkg/payload/task.go +++ b/pkg/payload/task.go @@ -31,7 +31,7 @@ func init() { // ResourceBuilder abstracts how a manifest is created on the server. Introduced for testing. type ResourceBuilder interface { - Apply(context.Context, *lib.Manifest) error + Apply(context.Context, *lib.Manifest, State) error } type Task struct { @@ -50,11 +50,11 @@ func (st *Task) String() string { return fmt.Sprintf("%s \"%s/%s\" (%d of %d)", strings.ToLower(st.Manifest.GVK.Kind), ns, st.Manifest.Object().GetName(), st.Index, st.Total) } -func (st *Task) Run(ctx context.Context, version string, builder ResourceBuilder) error { +func (st *Task) Run(ctx context.Context, version string, builder ResourceBuilder, state State) error { var lastErr error if err := wait.ExponentialBackoff(st.Backoff, func() (bool, error) { // run builder for the manifest - if err := builder.Apply(ctx, st.Manifest); err != nil { + if err := builder.Apply(ctx, st.Manifest, state); err != nil { utilruntime.HandleError(errors.Wrapf(err, "error running apply for %s", st)) lastErr = err metricPayloadErrors.WithLabelValues(version).Inc() diff --git a/pkg/start/start.go b/pkg/start/start.go index a2f9dcfe7..b3396ac98 100644 --- a/pkg/start/start.go +++ b/pkg/start/start.go @@ -274,7 +274,7 @@ func newClientBuilder(kubeconfig string) (*ClientBuilder, error) { } func increaseQPS(config *rest.Config) { - config.RateLimiter = flowcontrol.NewTokenBucketRateLimiter(20, 40) + config.RateLimiter = flowcontrol.NewTokenBucketRateLimiter(40, 80) } func useProtobuf(config *rest.Config) { diff --git a/pkg/start/start_integration_test.go b/pkg/start/start_integration_test.go index 5c3f82128..e11b4e39c 100644 --- a/pkg/start/start_integration_test.go +++ b/pkg/start/start_integration_test.go @@ -230,7 +230,7 @@ func TestIntegrationCVO_initializeAndUpgrade(t *testing.T) { options.EnableMetrics = false controllers := options.NewControllerContext(cb) - worker := cvo.NewSyncWorker(retriever, cvo.NewResourceBuilder(cfg), 5*time.Second, wait.Backoff{Steps: 3}).(*cvo.SyncWorker) + worker := cvo.NewSyncWorker(retriever, cvo.NewResourceBuilder(cfg, nil), 5*time.Second, wait.Backoff{Steps: 3}).(*cvo.SyncWorker) controllers.CVO.SetSyncWorkerForTesting(worker) stopCh := make(chan struct{}) @@ -381,7 +381,7 @@ func TestIntegrationCVO_initializeAndHandleError(t *testing.T) { options.EnableMetrics = false controllers := options.NewControllerContext(cb) - worker := cvo.NewSyncWorker(retriever, cvo.NewResourceBuilder(cfg), 5*time.Second, wait.Backoff{Steps: 3}).(*cvo.SyncWorker) + worker := cvo.NewSyncWorker(retriever, cvo.NewResourceBuilder(cfg, nil), 5*time.Second, wait.Backoff{Steps: 3}).(*cvo.SyncWorker) controllers.CVO.SetSyncWorkerForTesting(worker) stopCh := make(chan struct{}) @@ -487,7 +487,7 @@ func TestIntegrationCVO_gracefulStepDown(t *testing.T) { options.EnableMetrics = false controllers := options.NewControllerContext(cb) - worker := cvo.NewSyncWorker(&mapPayloadRetriever{}, cvo.NewResourceBuilder(cfg), 5*time.Second, wait.Backoff{Steps: 3}).(*cvo.SyncWorker) + worker := cvo.NewSyncWorker(&mapPayloadRetriever{}, cvo.NewResourceBuilder(cfg, nil), 5*time.Second, wait.Backoff{Steps: 3}).(*cvo.SyncWorker) controllers.CVO.SetSyncWorkerForTesting(worker) lock, err := createResourceLock(cb, ns, ns) @@ -657,7 +657,7 @@ metadata: options.EnableMetrics = false controllers := options.NewControllerContext(cb) - worker := cvo.NewSyncWorker(retriever, cvo.NewResourceBuilder(cfg), 5*time.Second, wait.Backoff{Steps: 3}).(*cvo.SyncWorker) + worker := cvo.NewSyncWorker(retriever, cvo.NewResourceBuilder(cfg, nil), 5*time.Second, wait.Backoff{Steps: 3}).(*cvo.SyncWorker) controllers.CVO.SetSyncWorkerForTesting(worker) stopCh := make(chan struct{})