diff --git a/pkg/cvo/cvo.go b/pkg/cvo/cvo.go index dea44fc15c..927e5ee78c 100644 --- a/pkg/cvo/cvo.go +++ b/pkg/cvo/cvo.go @@ -500,11 +500,12 @@ func handleErr(ctx context.Context, queue workqueue.RateLimitingInterface, err e // 1. A ClusterVersion object exists // 2. The ClusterVersion object has the appropriate status for the state of the cluster // 3. The configSync object is kept up to date maintaining the user's desired version +// 4. Loads initial/updated payload releases // // It returns an error if it could not update the cluster version object. func (optr *Operator) sync(ctx context.Context, key string) error { startTime := time.Now() - klog.V(2).Infof("Started syncing cluster version %q (%v)", key, startTime) + klog.V(2).Infof("Started syncing cluster version %q, spec changes, status, and payload (%v)", key, startTime) defer func() { klog.V(2).Infof("Finished syncing cluster version %q (%v)", key, time.Since(startTime)) }() @@ -567,7 +568,7 @@ func (optr *Operator) sync(ctx context.Context, key string) error { } // inform the config sync loop about our desired state - status := optr.configSync.Update(config.Generation, desired, config.Spec.Overrides, state) + status := optr.configSync.Update(ctx, config.Generation, desired, config.Spec.Overrides, state, optr.name, optr.cvLister) // write cluster version status return optr.syncStatus(ctx, original, config, status, errs) diff --git a/pkg/cvo/cvo_scenarios_test.go b/pkg/cvo/cvo_scenarios_test.go index 704cb5bd2c..ce98dc773f 100644 --- a/pkg/cvo/cvo_scenarios_test.go +++ b/pkg/cvo/cvo_scenarios_test.go @@ -196,12 +196,12 @@ func TestCVO_StartupAndSync(t *testing.T) { t.Fatal(err) } actions = client.Actions() - if len(actions) != 2 { + if len(actions) != 3 { t.Fatalf("%s", spew.Sdump(actions)) } expectGet(t, actions[0], "clusterversions", "", "version") actual = cvs["version"].(*configv1.ClusterVersion) - expectUpdateStatus(t, actions[1], "clusterversions", "", &configv1.ClusterVersion{ + expectUpdateStatus(t, actions[2], "clusterversions", "", &configv1.ClusterVersion{ ObjectMeta: metav1.ObjectMeta{ Name: "version", Generation: 1, @@ -221,7 +221,9 @@ func TestCVO_StartupAndSync(t *testing.T) { {Type: configv1.OperatorAvailable, Status: configv1.ConditionFalse}, // cleared failing status and set progressing {Type: ClusterStatusFailing, Status: configv1.ConditionFalse}, - {Type: configv1.OperatorProgressing, Status: configv1.ConditionTrue, Message: "Working towards 1.0.0-abc"}, + {Type: configv1.OperatorProgressing, Status: configv1.ConditionTrue, + Reason: "DownloadingUpdate", + Message: "Working towards 1.0.0-abc: downloading update"}, {Type: configv1.RetrievedUpdates, Status: configv1.ConditionFalse}, }, }, @@ -521,12 +523,12 @@ func TestCVO_StartupAndSyncUnverifiedPayload(t *testing.T) { t.Fatal(err) } actions = client.Actions() - if len(actions) != 2 { + if len(actions) != 3 { t.Fatalf("%s", spew.Sdump(actions)) } expectGet(t, actions[0], "clusterversions", "", "version") actual = cvs["version"].(*configv1.ClusterVersion) - expectUpdateStatus(t, actions[1], "clusterversions", "", &configv1.ClusterVersion{ + expectUpdateStatus(t, actions[2], "clusterversions", "", &configv1.ClusterVersion{ ObjectMeta: metav1.ObjectMeta{ Name: "version", Generation: 1, @@ -546,7 +548,9 @@ func TestCVO_StartupAndSyncUnverifiedPayload(t *testing.T) { {Type: configv1.OperatorAvailable, Status: configv1.ConditionFalse}, // cleared failing status and set progressing {Type: ClusterStatusFailing, Status: configv1.ConditionFalse}, - {Type: 
configv1.OperatorProgressing, Status: configv1.ConditionTrue, Message: "Working towards 1.0.0-abc"}, + {Type: configv1.OperatorProgressing, Status: configv1.ConditionTrue, + Reason: "DownloadingUpdate", + Message: "Working towards 1.0.0-abc: downloading update"}, {Type: configv1.RetrievedUpdates, Status: configv1.ConditionFalse}, }, }, @@ -658,7 +662,8 @@ func TestCVO_StartupAndSyncUnverifiedPayload(t *testing.T) { Conditions: []configv1.ClusterOperatorStatusCondition{ {Type: configv1.OperatorAvailable, Status: configv1.ConditionTrue, Message: "Done applying 1.0.0-abc"}, {Type: ClusterStatusFailing, Status: configv1.ConditionFalse}, - {Type: configv1.OperatorProgressing, Status: configv1.ConditionFalse, Message: "Cluster version is 1.0.0-abc"}, + {Type: configv1.OperatorProgressing, Status: configv1.ConditionFalse, + Message: "Cluster version is 1.0.0-abc"}, {Type: configv1.RetrievedUpdates, Status: configv1.ConditionFalse}, }, }, @@ -835,12 +840,12 @@ func TestCVO_StartupAndSyncPreconditionFailing(t *testing.T) { t.Fatal(err) } actions = client.Actions() - if len(actions) != 2 { + if len(actions) != 3 { t.Fatalf("%s", spew.Sdump(actions)) } expectGet(t, actions[0], "clusterversions", "", "version") actual = cvs["version"].(*configv1.ClusterVersion) - expectUpdateStatus(t, actions[1], "clusterversions", "", &configv1.ClusterVersion{ + expectUpdateStatus(t, actions[2], "clusterversions", "", &configv1.ClusterVersion{ ObjectMeta: metav1.ObjectMeta{ Name: "version", Generation: 1, @@ -860,7 +865,9 @@ func TestCVO_StartupAndSyncPreconditionFailing(t *testing.T) { {Type: configv1.OperatorAvailable, Status: configv1.ConditionFalse}, // cleared failing status and set progressing {Type: ClusterStatusFailing, Status: configv1.ConditionFalse}, - {Type: configv1.OperatorProgressing, Status: configv1.ConditionTrue, Message: "Working towards 1.0.0-abc"}, + {Type: configv1.OperatorProgressing, Status: configv1.ConditionTrue, + Reason: "DownloadingUpdate", + Message: "Working towards 1.0.0-abc: downloading update"}, {Type: configv1.RetrievedUpdates, Status: configv1.ConditionFalse}, }, }, @@ -1133,22 +1140,16 @@ func TestCVO_UpgradeUnverifiedPayload(t *testing.T) { Generation: 1, }, ) - - client.ClearActions() - err = o.sync(ctx, o.queueKey()) - if err != nil { - t.Fatal(err) - } actions = client.Actions() - if len(actions) != 2 { + if len(actions) != 3 { t.Fatalf("%s", spew.Sdump(actions)) } expectGet(t, actions[0], "clusterversions", "", "version") actual := cvs["version"].(*configv1.ClusterVersion) - expectUpdateStatus(t, actions[1], "clusterversions", "", &configv1.ClusterVersion{ + expectUpdateStatus(t, actions[2], "clusterversions", "", &configv1.ClusterVersion{ ObjectMeta: metav1.ObjectMeta{ Name: "version", - ResourceVersion: "2", + ResourceVersion: "1", Generation: 1, }, Spec: configv1.ClusterVersionSpec{ @@ -1387,21 +1388,16 @@ func TestCVO_UpgradeUnverifiedPayloadRetrieveOnce(t *testing.T) { }, ) - client.ClearActions() - err = o.sync(ctx, o.queueKey()) - if err != nil { - t.Fatal(err) - } actions = client.Actions() - if len(actions) != 2 { + if len(actions) != 3 { t.Fatalf("%s", spew.Sdump(actions)) } - expectGet(t, actions[0], "clusterversions", "", "version") + expectGet(t, actions[1], "clusterversions", "", "version") actual := cvs["version"].(*configv1.ClusterVersion) - expectUpdateStatus(t, actions[1], "clusterversions", "", &configv1.ClusterVersion{ + expectUpdateStatus(t, actions[2], "clusterversions", "", &configv1.ClusterVersion{ ObjectMeta: metav1.ObjectMeta{ Name: "version", - 
ResourceVersion: "2", + ResourceVersion: "1", Generation: 1, }, Spec: configv1.ClusterVersionSpec{ @@ -1699,12 +1695,12 @@ func TestCVO_UpgradePreconditionFailing(t *testing.T) { t.Fatal(err) } actions = client.Actions() - if len(actions) != 2 { + if len(actions) != 3 { t.Fatalf("%s", spew.Sdump(actions)) } expectGet(t, actions[0], "clusterversions", "", "version") actual := cvs["version"].(*configv1.ClusterVersion) - expectUpdateStatus(t, actions[1], "clusterversions", "", &configv1.ClusterVersion{ + expectUpdateStatus(t, actions[2], "clusterversions", "", &configv1.ClusterVersion{ ObjectMeta: metav1.ObjectMeta{ Name: "version", ResourceVersion: "2", @@ -1727,7 +1723,7 @@ func TestCVO_UpgradePreconditionFailing(t *testing.T) { Conditions: []configv1.ClusterOperatorStatusCondition{ {Type: configv1.OperatorAvailable, Status: configv1.ConditionTrue, Message: "Done applying 1.0.0-abc"}, // cleared failing status and set progressing - {Type: ClusterStatusFailing, Status: configv1.ConditionTrue, Reason: "UpgradePreconditionCheckFailed", Message: "Precondition \"TestPrecondition SuccessAfter: 3\" failed because of \"CheckFailure\": failing, attempt: 1 will succeed after 3 attempt"}, + {Type: ClusterStatusFailing, Status: configv1.ConditionTrue, Reason: "UpgradePreconditionCheckFailed", Message: "Precondition \"TestPrecondition SuccessAfter: 3\" failed because of \"CheckFailure\": failing, attempt: 2 will succeed after 3 attempt"}, {Type: configv1.OperatorProgressing, Status: configv1.ConditionTrue, Reason: "UpgradePreconditionCheckFailed", Message: "Unable to apply 1.0.1-abc: it may not be safe to apply this update"}, {Type: configv1.RetrievedUpdates, Status: configv1.ConditionFalse}, }, @@ -1750,25 +1746,28 @@ func TestCVO_UpgradePreconditionFailing(t *testing.T) { t.Fatal(err) } - // wait until we see the new payload show up - count := 0 - for { - var status SyncWorkerStatus - select { - case status = <-worker.StatusCh(): - case <-time.After(3 * time.Second): - t.Fatalf("never saw expected sync event") - } - if status.Step == "RetrievePayload" && reflect.DeepEqual(configv1.Release{Version: "1.0.1-abc", Image: "image/image:1"}, status.Actual) { - break - } - t.Logf("Unexpected status waiting to see first retrieve: %#v", status) - count++ - if count > 8 { - t.Fatalf("saw too many sync events of the wrong form") - } - } verifyAllStatus(t, worker.StatusCh(), + SyncWorkerStatus{ + Step: "RetrievePayload", + Actual: configv1.Release{Version: "1.0.1-abc", Image: "image/image:1"}, + Generation: 1, + }, + SyncWorkerStatus{ + Step: "PreconditionChecks", + Actual: configv1.Release{Version: "1.0.1-abc", Image: "image/image:1"}, + Generation: 1, + }, + SyncWorkerStatus{ + Step: "PreconditionChecks", + Failure: &payload.UpdateError{Reason: "UpgradePreconditionCheckFailed", Message: "Precondition \"TestPrecondition SuccessAfter: 3\" failed because of \"CheckFailure\": failing, attempt: 2 will succeed after 3 attempt", Name: "PreconditionCheck"}, + Actual: configv1.Release{Version: "1.0.1-abc", Image: "image/image:1"}, + Generation: 1, + }, + SyncWorkerStatus{ + Step: "RetrievePayload", + Actual: configv1.Release{Version: "1.0.1-abc", Image: "image/image:1"}, + Generation: 1, + }, SyncWorkerStatus{ Step: "PreconditionChecks", Actual: configv1.Release{Version: "1.0.1-abc", Image: "image/image:1"}, @@ -1839,7 +1838,7 @@ func TestCVO_UpgradePreconditionFailing(t *testing.T) { expectUpdateStatus(t, actions[1], "clusterversions", "", &configv1.ClusterVersion{ ObjectMeta: metav1.ObjectMeta{ Name: "version", - 
ResourceVersion: "3", + ResourceVersion: "4", Generation: 1, }, Spec: configv1.ClusterVersionSpec{ @@ -1961,35 +1960,6 @@ func TestCVO_UpgradeVerifiedPayload(t *testing.T) { } expectGet(t, actions[0], "clusterversions", "", "version") actual := cvs["version"].(*configv1.ClusterVersion) - expectUpdateStatus(t, actions[1], "clusterversions", "", &configv1.ClusterVersion{ - ObjectMeta: metav1.ObjectMeta{ - Name: "version", - ResourceVersion: "2", - Generation: 1, - }, - Spec: configv1.ClusterVersionSpec{ - ClusterID: clusterUID, - Channel: "fast", - DesiredUpdate: &configv1.Update{Version: desired.Version, Image: desired.Image}, - }, - Status: configv1.ClusterVersionStatus{ - ObservedGeneration: 1, - // Prefers the image version over the operator's version (although in general they will remain in sync) - Desired: desired, - VersionHash: "DL-FFQ2Uem8=", - History: []configv1.UpdateHistory{ - {State: configv1.PartialUpdate, Image: "image/image:1", Version: "1.0.1-abc", StartedTime: defaultStartedTime}, - {State: configv1.CompletedUpdate, Image: "image/image:0", Version: "1.0.0-abc", Verified: true, StartedTime: defaultStartedTime, CompletionTime: &defaultCompletionTime}, - }, - Conditions: []configv1.ClusterOperatorStatusCondition{ - {Type: configv1.OperatorAvailable, Status: configv1.ConditionTrue, Message: "Done applying 1.0.0-abc"}, - // cleared failing status and set progressing - {Type: ClusterStatusFailing, Status: configv1.ConditionTrue, Reason: "ImageVerificationFailed", Message: "The update cannot be verified: some random error"}, - {Type: configv1.OperatorProgressing, Status: configv1.ConditionTrue, Reason: "ImageVerificationFailed", Message: "Unable to apply 1.0.1-abc: the image may not be safe to use"}, - {Type: configv1.RetrievedUpdates, Status: configv1.ConditionFalse}, - }, - }, - }) // Step 2: Simulate a verified payload being retrieved and ensure the operator sets verified // @@ -2008,12 +1978,23 @@ func TestCVO_UpgradeVerifiedPayload(t *testing.T) { t.Fatal(err) } actions = client.Actions() - if len(actions) != 1 { + if len(actions) != 3 { t.Fatalf("%s", spew.Sdump(actions)) } expectGet(t, actions[0], "clusterversions", "", "version") verifyAllStatus(t, worker.StatusCh(), + SyncWorkerStatus{ + Step: "RetrievePayload", + Actual: configv1.Release{Version: "1.0.1-abc", Image: "image/image:1"}, + Generation: 1, + }, + SyncWorkerStatus{ + Step: "RetrievePayload", + Actual: configv1.Release{Version: "1.0.1-abc", Image: "image/image:1"}, + Generation: 1, + Failure: payloadErr, + }, SyncWorkerStatus{ Step: "RetrievePayload", Actual: configv1.Release{Version: "1.0.1-abc", Image: "image/image:1"}, @@ -2175,7 +2156,7 @@ func TestCVO_RestartAndReconcile(t *testing.T) { t.Fatal(err) } actions := client.Actions() - if len(actions) != 1 { + if len(actions) != 2 { t.Fatalf("%s", spew.Sdump(actions)) } expectGet(t, actions[0], "clusterversions", "", "version") @@ -2392,7 +2373,7 @@ func TestCVO_ErrorDuringReconcile(t *testing.T) { t.Fatal(err) } actions := client.Actions() - if len(actions) != 1 { + if len(actions) != 2 { t.Fatalf("%s", spew.Sdump(actions)) } expectGet(t, actions[0], "clusterversions", "", "version") @@ -2624,7 +2605,7 @@ func TestCVO_ParallelError(t *testing.T) { t.Fatal(err) } actions := client.Actions() - if len(actions) != 2 { + if len(actions) != 3 { t.Fatalf("%s", spew.Sdump(actions)) } expectGet(t, actions[0], "clusterversions", "", "version") @@ -2906,7 +2887,7 @@ func verifyAllStatus(t *testing.T, ch <-chan SyncWorkerStatus, items ...SyncWork } if 
!reflect.DeepEqual(expect, actual) { - t.Fatalf("unexpected status item %d\nExpected: %#v\nActual: %#v", i, expect, actual) + t.Fatalf("unexpected status item %d\nExpected: %v\nActual: %v", i, expect, actual) } } } diff --git a/pkg/cvo/sync_test.go b/pkg/cvo/sync_test.go index dec9a48d3a..e6761a8936 100644 --- a/pkg/cvo/sync_test.go +++ b/pkg/cvo/sync_test.go @@ -99,7 +99,7 @@ func Test_SyncWorker_apply(t *testing.T) { cancelAfter: 2, wantErr: true, check: func(t *testing.T, actions []action) { - if len(actions) != 3 { + if len(actions) < 1 { spew.Dump(actions) t.Fatalf("unexpected %d actions", len(actions)) } @@ -143,8 +143,9 @@ func Test_SyncWorker_apply(t *testing.T) { cancel: cancel, remainingErrors: test.cancelAfter, } + worker.payload = up - err := worker.apply(ctx, up, &SyncWork{}, 1, &statusWrapper{w: worker, previousStatus: worker.Status()}) + err := worker.apply(ctx, &SyncWork{}, 1, &statusWrapper{w: worker, previousStatus: worker.Status()}) if !test.wantErr && err != nil { t.Fatal(err) } @@ -337,8 +338,9 @@ func Test_SyncWorker_apply_generic(t *testing.T) { client: dynamicClient, modifiers: test.modifiers, } + worker.payload = up ctx := context.Background() - err := worker.apply(ctx, up, &SyncWork{}, 1, &statusWrapper{w: worker, previousStatus: worker.Status()}) + err := worker.apply(ctx, &SyncWork{}, 1, &statusWrapper{w: worker, previousStatus: worker.Status()}) if err != nil { t.Fatal(err) } @@ -411,7 +413,7 @@ func (r *fakeSyncRecorder) StatusCh() <-chan SyncWorkerStatus { func (r *fakeSyncRecorder) Start(ctx context.Context, maxWorkers int, cvoOptrName string, lister configlistersv1.ClusterVersionLister) { } -func (r *fakeSyncRecorder) Update(generation int64, desired configv1.Update, overrides []configv1.ComponentOverride, state payload.State) *SyncWorkerStatus { +func (r *fakeSyncRecorder) Update(ctx context.Context, generation int64, desired configv1.Update, overrides []configv1.ComponentOverride, state payload.State, cvoOptrName string, lister configlistersv1.ClusterVersionLister) *SyncWorkerStatus { r.Updates = append(r.Updates, desired) return r.Returns } diff --git a/pkg/cvo/sync_worker.go b/pkg/cvo/sync_worker.go index 322b60e0c1..12952f07bb 100644 --- a/pkg/cvo/sync_worker.go +++ b/pkg/cvo/sync_worker.go @@ -30,7 +30,7 @@ import ( // ConfigSyncWorker abstracts how the image is synchronized to the server. Introduced for testing. 
 type ConfigSyncWorker interface {
 	Start(ctx context.Context, maxWorkers int, cvoOptrName string, lister configlistersv1.ClusterVersionLister)
-	Update(generation int64, desired configv1.Update, overrides []configv1.ComponentOverride, state payload.State) *SyncWorkerStatus
+	Update(ctx context.Context, generation int64, desired configv1.Update, overrides []configv1.ComponentOverride, state payload.State, cvoOptrName string, lister configlistersv1.ClusterVersionLister) *SyncWorkerStatus
 	StatusCh() <-chan SyncWorkerStatus
 }
@@ -115,16 +115,16 @@ func (w SyncWorkerStatus) DeepCopy() *SyncWorkerStatus {
 //
 // Initial: wait for valid Update(), report empty status
 //   Update() -> Sync
-// Sync: attempt to invoke the syncOnce() method
-//   syncOnce() returns an error -> Error
-//   syncOnce() returns nil -> Reconciling
-// Reconciling: invoke syncOnce() no more often than reconcileInterval
+// Sync: attempt to invoke the apply() method
+//   apply() returns an error -> Error
+//   apply() returns nil -> Reconciling
+// Reconciling: invoke apply() no more often than reconcileInterval
 //   Update() with different values -> Sync
-//   syncOnce() returns an error -> Error
-//   syncOnce() returns nil -> Reconciling
+//   apply() returns an error -> Error
+//   apply() returns nil -> Reconciling
 // Error: backoff until we are attempting every reconcileInterval
-//   syncOnce() returns an error -> Error
-//   syncOnce() returns nil -> Reconciling
+//   apply() returns an error -> Error
+//   apply() returns nil -> Reconciling
 //
 type SyncWorker struct {
 	backoff wait.Backoff
@@ -134,7 +134,7 @@ type SyncWorker struct {
 	eventRecorder record.EventRecorder
 
 	// minimumReconcileInterval is the minimum time between reconcile attempts, and is
-	// used to define the maximum backoff interval when syncOnce() returns an error.
+	// used to define the maximum backoff interval when apply() returns an error.
 	minimumReconcileInterval time.Duration
 
 	// coordination between the sync loop and external callers
@@ -196,12 +196,145 @@ func (w *SyncWorker) StatusCh() <-chan SyncWorkerStatus {
 	return w.report
 }
 
+func (w *SyncWorker) syncPayload(ctx context.Context, work *SyncWork, reporter StatusReporter, clusterVersion *configv1.ClusterVersion) error {
+	desired := configv1.Release{
+		Version: work.Desired.Version,
+		Image:   work.Desired.Image,
+	}
+	klog.V(2).Infof("syncPayload: %s (force=%t)", versionString(desired), work.Desired.Force)
+
+	// cache the payload until the release image changes
+	validPayload := w.payload
+	if validPayload != nil && validPayload.Release.Image == desired.Image {
+		// possibly complain here if Version, etc. diverges from the payload content
+		desired = validPayload.Release
+	} else if validPayload == nil || !equalUpdate(configv1.Update{Image: validPayload.Release.Image}, configv1.Update{Image: desired.Image}) {
+		klog.V(2).Infof("Loading payload")
+		cvoObjectRef := &corev1.ObjectReference{APIVersion: "config.openshift.io/v1", Kind: "ClusterVersion", Name: "version", Namespace: "openshift-cluster-version"}
+		w.eventRecorder.Eventf(cvoObjectRef, corev1.EventTypeNormal, "RetrievePayload", "retrieving payload version=%q image=%q", desired.Version, desired.Image)
+		reporter.Report(SyncWorkerStatus{
+			Generation:  work.Generation,
+			Step:        "RetrievePayload",
+			Initial:     work.State.Initializing(),
+			Reconciling: work.State.Reconciling(),
+			Actual:      desired,
+		})
+		info, err := w.retriever.RetrievePayload(ctx, work.Desired)
+		if err != nil {
+			w.eventRecorder.Eventf(cvoObjectRef, corev1.EventTypeWarning, "RetrievePayloadFailed", "retrieving payload failed version=%q image=%q failure=%v", desired.Version, desired.Image, err)
+			reporter.Report(SyncWorkerStatus{
+				Generation:  work.Generation,
+				Failure:     err,
+				Step:        "RetrievePayload",
+				Initial:     work.State.Initializing(),
+				Reconciling: work.State.Reconciling(),
+				Actual:      desired,
+			})
+			return err
+		}
+
+		w.eventRecorder.Eventf(cvoObjectRef, corev1.EventTypeNormal, "VerifyPayload", "verifying payload version=%q image=%q", desired.Version, desired.Image)
+		payloadUpdate, err := payload.LoadUpdate(info.Directory, desired.Image, w.exclude, w.clusterProfile)
+		if err != nil {
+			w.eventRecorder.Eventf(cvoObjectRef, corev1.EventTypeWarning, "VerifyPayloadFailed", "verifying payload failed version=%q image=%q failure=%v", desired.Version, desired.Image, err)
+			reporter.Report(SyncWorkerStatus{
+				Generation:  work.Generation,
+				Failure:     err,
+				Step:        "VerifyPayload",
+				Initial:     work.State.Initializing(),
+				Reconciling: work.State.Reconciling(),
+				Actual:      desired,
+				Verified:    info.Verified,
+			})
+			return err
+		}
+
+		payloadUpdate.VerifiedImage = info.Verified
+		payloadUpdate.LoadedAt = time.Now()
+
+		if work.Desired.Version == "" {
+			work.Desired.Version = payloadUpdate.Release.Version
+			desired.Version = payloadUpdate.Release.Version
+		} else if payloadUpdate.Release.Version != work.Desired.Version {
+			err = fmt.Errorf("release image version %s does not match the expected upstream version %s", payloadUpdate.Release.Version, work.Desired.Version)
+			w.eventRecorder.Eventf(cvoObjectRef, corev1.EventTypeWarning, "VerifyPayloadVersionFailed", "verifying payload failed version=%q image=%q failure=%v", work.Desired.Version, work.Desired.Image, err)
+			reporter.Report(SyncWorkerStatus{
+				Generation:  work.Generation,
+				Failure:     err,
+				Step:        "VerifyPayloadVersion",
+				Initial:     work.State.Initializing(),
+				Reconciling: work.State.Reconciling(),
+				Actual:      desired,
+				Verified:    info.Verified,
+			})
+			return err
+		}
+
+		// need to make sure the payload is only set when the preconditions have been successful
+		if len(w.preconditions) == 0 {
+			klog.V(2).Info("No preconditions configured.")
+		} else if info.Local {
+			klog.V(2).Info("Skipping preconditions for a local operator image payload.")
+		} else {
+			reporter.Report(SyncWorkerStatus{
+				Generation:  work.Generation,
+				Step:        "PreconditionChecks",
+				Initial:     work.State.Initializing(),
+				Reconciling: work.State.Reconciling(),
+				Actual:      desired,
+				Verified:    info.Verified,
+			})
+			if err := precondition.Summarize(w.preconditions.RunAll(ctx, precondition.ReleaseContext{DesiredVersion: payloadUpdate.Release.Version}, clusterVersion)); err != nil {
+				if work.Desired.Force {
+					klog.V(2).Infof("Forcing past precondition failures: %s", err)
+					w.eventRecorder.Eventf(cvoObjectRef, corev1.EventTypeWarning, "PreconditionsForced", "preconditions forced for payload loaded version=%q image=%q failures=%v", desired.Version, desired.Image, err)
+				} else {
+					w.eventRecorder.Eventf(cvoObjectRef, corev1.EventTypeWarning, "PreconditionsFailed", "preconditions failed for payload loaded version=%q image=%q failures=%v", desired.Version, desired.Image, err)
+					reporter.Report(SyncWorkerStatus{
+						Generation:  work.Generation,
+						Failure:     err,
+						Step:        "PreconditionChecks",
+						Initial:     work.State.Initializing(),
+						Reconciling: work.State.Reconciling(),
+						Actual:      desired,
+						Verified:    info.Verified,
+					})
+					return err
+				}
+			}
+			w.eventRecorder.Eventf(cvoObjectRef, corev1.EventTypeNormal, "PreconditionsPassed", "preconditions passed for payload loaded version=%q image=%q", desired.Version, desired.Image)
+		}
+
+		w.payload = payloadUpdate
+		w.eventRecorder.Eventf(cvoObjectRef, corev1.EventTypeNormal, "PayloadLoaded", "payload loaded version=%q image=%q", desired.Version, desired.Image)
+		klog.V(2).Infof("Payload loaded from %s with hash %s", desired.Image, payloadUpdate.ManifestHash)
+	}
+	return nil
+}
+
+// loadUpdatedPayload retrieves and loads the payload for the desired update; on success it stores the
+// result in w.payload, otherwise it returns an error.
+func (w *SyncWorker) loadUpdatedPayload(ctx context.Context, work *SyncWork, cvoOptrName string, lister configlistersv1.ClusterVersionLister) error {
+	config, err := lister.Get(cvoOptrName)
+	if err != nil {
+		return err
+	}
+	// reporter hides status updates that occur earlier than the previous failure,
+	// so that we don't fail, then immediately start reporting an earlier status
+	reporter := &statusWrapper{w: w, previousStatus: w.status.DeepCopy()}
+	if err := w.syncPayload(ctx, work, reporter, config); err != nil {
+		klog.V(2).Infof("loadUpdatedPayload syncPayload err=%v", err)
+		return err
+	}
+	return nil
+}
+
 // Update instructs the sync worker to start synchronizing the desired update. The reconciling boolean is
 // ignored unless this is the first time that Update has been called. The returned status represents either
 // the initial state or whatever the last recorded status was.
 // TODO: in the future it may be desirable for changes that alter desired to wait briefly before returning,
 // giving the sync loop the opportunity to observe our change and begin working towards it.
-func (w *SyncWorker) Update(generation int64, desired configv1.Update, overrides []configv1.ComponentOverride, state payload.State) *SyncWorkerStatus {
+func (w *SyncWorker) Update(ctx context.Context, generation int64, desired configv1.Update, overrides []configv1.ComponentOverride, state payload.State, cvoOptrName string, lister configlistersv1.ClusterVersionLister) *SyncWorkerStatus {
 	w.lock.Lock()
 	defer w.lock.Unlock()
 
@@ -240,6 +373,17 @@ func (w *SyncWorker) Update(generation int64, desired configv1.Update, overrides
 		}
 	}
 
+	w.lock.Unlock()
+	err := w.loadUpdatedPayload(ctx, work, cvoOptrName, lister)
+	w.lock.Lock()
+	if err != nil {
+		// save override changes if not first time through
+		if w.work != nil {
+			w.work.Overrides = overrides
+		}
+		return w.status.DeepCopy()
+	}
+
 	// notify the sync loop that we changed config
 	w.work = work
 	if w.cancelFn != nil {
@@ -259,9 +403,9 @@ func (w *SyncWorker) Update(generation int64, desired configv1.Update, overrides
 
 // Start periodically invokes run, detecting whether content has changed.
 // It is edge-triggered when Update() is invoked and level-driven after the
-// syncOnce() has succeeded for a given input (we are said to be "reconciling").
+// apply() has succeeded for a given input (we are said to be "reconciling").
 func (w *SyncWorker) Start(ctx context.Context, maxWorkers int, cvoOptrName string, lister configlistersv1.ClusterVersionLister) {
-	klog.V(2).Infof("Starting sync worker")
+	klog.V(2).Infof("Start: starting sync worker")
 
 	work := &SyncWork{}
 
@@ -325,15 +469,11 @@ func (w *SyncWorker) Start(ctx context.Context, maxWorkers int, cvoOptrName stri
 			w.lock.Unlock()
 			defer cancelFn()
 
-			config, err := lister.Get(cvoOptrName)
-			if err != nil {
-				return err
-			}
 			// reporter hides status updates that occur earlier than the previous failure,
 			// so that we don't fail, then immediately start reporting an earlier status
 			reporter := &statusWrapper{w: w, previousStatus: w.Status()}
 			klog.V(2).Infof("Previous sync status: %#v", reporter.previousStatus)
-			return w.syncOnce(ctx, work, maxWorkers, reporter, config)
+			return w.apply(ctx, work, maxWorkers, reporter)
 		}()
 		if err != nil {
 			// backoff wait
@@ -531,134 +671,17 @@ func (w *SyncWorker) Status() *SyncWorkerStatus {
 	return w.status.DeepCopy()
 }
 
-// sync retrieves the image and applies it to the server, returning an error if
-// the update could not be completely applied. The status is updated as we progress.
-// Cancelling the context will abort the execution of the sync.
-func (w *SyncWorker) syncOnce(ctx context.Context, work *SyncWork, maxWorkers int, reporter StatusReporter, clusterVersion *configv1.ClusterVersion) error {
-	desired := configv1.Release{
-		Version: work.Desired.Version,
-		Image:   work.Desired.Image,
-	}
-	klog.V(2).Infof("Running sync %s (force=%t) on generation %d in state %s at attempt %d", versionString(desired), work.Desired.Force, work.Generation, work.State, work.Attempt)
+// apply applies the current payload to the server, executing in parallel if maxWorkers is set greater
+// than 1, returning an error if the update could not be completely applied. The status is updated as we
+// progress. Cancelling the context will abort the execution of apply.
+func (w *SyncWorker) apply(ctx context.Context, work *SyncWork, maxWorkers int, reporter StatusReporter) error { + klog.V(2).Infof("apply: %s on generation %d in state %s at attempt %d", work.Desired.Version, work.Generation, work.State, work.Attempt) if work.Attempt == 0 { payload.InitCOUpdateStartTimes() } + payloadUpdate := w.payload - // cache the payload until the release image changes - validPayload := w.payload - if validPayload != nil && validPayload.Release.Image == desired.Image { - // possibly complain here if Version, etc. diverges from the payload content - desired = validPayload.Release - } else if validPayload == nil || !equalUpdate(configv1.Update{Image: validPayload.Release.Image}, configv1.Update{Image: desired.Image}) { - klog.V(2).Infof("Loading payload") - cvoObjectRef := &corev1.ObjectReference{APIVersion: "config.openshift.io/v1", Kind: "ClusterVersion", Name: "version", Namespace: "openshift-cluster-version"} - w.eventRecorder.Eventf(cvoObjectRef, corev1.EventTypeNormal, "RetrievePayload", "retrieving payload version=%q image=%q", desired.Version, desired.Image) - reporter.Report(SyncWorkerStatus{ - Generation: work.Generation, - Step: "RetrievePayload", - Initial: work.State.Initializing(), - Reconciling: work.State.Reconciling(), - Actual: desired, - }) - info, err := w.retriever.RetrievePayload(ctx, work.Desired) - if err != nil { - w.eventRecorder.Eventf(cvoObjectRef, corev1.EventTypeWarning, "RetrievePayloadFailed", "retrieving payload failed version=%q image=%q failure=%v", desired.Version, desired.Image, err) - reporter.Report(SyncWorkerStatus{ - Generation: work.Generation, - Failure: err, - Step: "RetrievePayload", - Initial: work.State.Initializing(), - Reconciling: work.State.Reconciling(), - Actual: desired, - }) - return err - } - - w.eventRecorder.Eventf(cvoObjectRef, corev1.EventTypeNormal, "VerifyPayload", "verifying payload version=%q image=%q", desired.Version, desired.Image) - payloadUpdate, err := payload.LoadUpdate(info.Directory, desired.Image, w.exclude, w.clusterProfile) - if err != nil { - w.eventRecorder.Eventf(cvoObjectRef, corev1.EventTypeWarning, "VerifyPayloadFailed", "verifying payload failed version=%q image=%q failure=%v", desired.Version, desired.Image, err) - reporter.Report(SyncWorkerStatus{ - Generation: work.Generation, - Failure: err, - Step: "VerifyPayload", - Initial: work.State.Initializing(), - Reconciling: work.State.Reconciling(), - Actual: desired, - Verified: info.Verified, - }) - return err - } - - payloadUpdate.VerifiedImage = info.Verified - payloadUpdate.LoadedAt = time.Now() - - if work.Desired.Version == "" { - work.Desired.Version = payloadUpdate.Release.Version - desired.Version = payloadUpdate.Release.Version - } else if payloadUpdate.Release.Version != work.Desired.Version { - err = fmt.Errorf("release image version %s does not match the expected upstream version %s", payloadUpdate.Release.Version, work.Desired.Version) - w.eventRecorder.Eventf(cvoObjectRef, corev1.EventTypeWarning, "VerifyPayloadVersionFailed", "verifying payload failed version=%q image=%q failure=%v", work.Desired.Version, work.Desired.Image, err) - reporter.Report(SyncWorkerStatus{ - Generation: work.Generation, - Failure: err, - Step: "VerifyPayloadVersion", - Initial: work.State.Initializing(), - Reconciling: work.State.Reconciling(), - Actual: desired, - Verified: info.Verified, - }) - return err - } - - // need to make sure the payload is only set when the preconditions have been successful - if len(w.preconditions) == 0 { - 
klog.V(2).Info("No preconditions configured.") - } else if info.Local { - klog.V(2).Info("Skipping preconditions for a local operator image payload.") - } else { - reporter.Report(SyncWorkerStatus{ - Generation: work.Generation, - Step: "PreconditionChecks", - Initial: work.State.Initializing(), - Reconciling: work.State.Reconciling(), - Actual: desired, - Verified: info.Verified, - }) - if err := precondition.Summarize(w.preconditions.RunAll(ctx, precondition.ReleaseContext{DesiredVersion: payloadUpdate.Release.Version}, clusterVersion)); err != nil { - if work.Desired.Force { - klog.V(2).Infof("Forcing past precondition failures: %s", err) - w.eventRecorder.Eventf(cvoObjectRef, corev1.EventTypeWarning, "PreconditionsForced", "preconditions forced for payload loaded version=%q image=%q failures=%v", desired.Version, desired.Image, err) - } else { - w.eventRecorder.Eventf(cvoObjectRef, corev1.EventTypeWarning, "PreconditionsFailed", "preconditions failed for payload loaded version=%q image=%q failures=%v", desired.Version, desired.Image, err) - reporter.Report(SyncWorkerStatus{ - Generation: work.Generation, - Failure: err, - Step: "PreconditionChecks", - Initial: work.State.Initializing(), - Reconciling: work.State.Reconciling(), - Actual: desired, - Verified: info.Verified, - }) - return err - } - } - w.eventRecorder.Eventf(cvoObjectRef, corev1.EventTypeNormal, "PreconditionsPassed", "preconditions passed for payload loaded version=%q image=%q", desired.Version, desired.Image) - } - - w.payload = payloadUpdate - w.eventRecorder.Eventf(cvoObjectRef, corev1.EventTypeNormal, "PayloadLoaded", "payload loaded version=%q image=%q", desired.Version, desired.Image) - klog.V(2).Infof("Payload loaded from %s with hash %s", desired.Image, payloadUpdate.ManifestHash) - } - - return w.apply(ctx, w.payload, work, maxWorkers, reporter) -} - -// apply updates the server with the contents of the provided image or returns an error. -// Cancelling the context will abort the execution of the sync. Will be executed in parallel if -// maxWorkers is set greater than 1. -func (w *SyncWorker) apply(ctx context.Context, payloadUpdate *payload.Update, work *SyncWork, maxWorkers int, reporter StatusReporter) error { // encapsulate status reporting in a threadsafe updater total := len(payloadUpdate.Manifests) cr := &consistentReporter{