diff --git a/pkg/cvo/cvo.go b/pkg/cvo/cvo.go index fa92bca901..08733b8d9f 100644 --- a/pkg/cvo/cvo.go +++ b/pkg/cvo/cvo.go @@ -677,16 +677,25 @@ func (optr *Operator) sync(ctx context.Context, key string) error { config := validation.ClearInvalidFields(original, errs) // identify the desired next version - desired, ok := findUpdateFromConfig(config, optr.getArchitecture()) - if ok { - klog.V(2).Infof("Desired version from spec is %#v", desired) + desired, found := findUpdateFromConfig(config, optr.getArchitecture()) + initialized := optr.configSync.Initialized() + if found && initialized { + klog.V(2).Infof("Desired version from spec is %#v after initialization", desired) } else { + pendingDesired := desired currentVersion := optr.currentVersion() desired = configv1.Update{ Version: currentVersion.Version, Image: currentVersion.Image, } - klog.V(2).Infof("Desired version from operator is %#v", desired) + if !initialized { + klog.V(2).Infof("Desired version from operator is %#v with user's request to go to %#v. "+ + "We are currently initializing the worker and will evaluate the request later", desired, pendingDesired) + // enqueue to trigger a reconciliation on ClusterVersion + optr.queue.Add(optr.queueKey()) + } else { + klog.V(2).Infof("Desired version from operator is %#v", desired) + } } // handle the case of a misconfigured CVO by doing nothing diff --git a/pkg/cvo/cvo_scenarios_test.go b/pkg/cvo/cvo_scenarios_test.go index dfd8eb32f1..7c75626a37 100644 --- a/pkg/cvo/cvo_scenarios_test.go +++ b/pkg/cvo/cvo_scenarios_test.go @@ -1398,6 +1398,7 @@ func TestCVO_UpgradeUnverifiedPayload(t *testing.T) { t.Fatal("not the correct error type") } worker := o.configSync.(*SyncWorker) + worker.initializedFunc = func() bool { return true } retriever := worker.retriever.(*fakeDirectoryRetriever) retriever.Set(PayloadInfo{}, payloadErr) @@ -1653,7 +1654,7 @@ func TestCVO_ResetPayloadLoadStatus(t *testing.T) { t.Fatal("not the correct error type") } worker := o.configSync.(*SyncWorker) - + worker.initializedFunc = func() bool { return true } // checked by SyncWorker.syncPayload worker.payload = &payload.Update{Release: o.release} @@ -1904,6 +1905,7 @@ func TestCVO_UpgradeFailedPayloadLoadWithCapsChanges(t *testing.T) { t.Fatal("not the correct error type") } worker := o.configSync.(*SyncWorker) + worker.initializedFunc = func() bool { return true } retriever := worker.retriever.(*fakeDirectoryRetriever) retriever.Set(PayloadInfo{}, payloadErr) @@ -2022,6 +2024,7 @@ func TestCVO_InitImplicitlyEnabledCaps(t *testing.T) { defer shutdownFn() worker := o.configSync.(*SyncWorker) + worker.initializedFunc = func() bool { return true } go worker.Start(ctx, 1) @@ -2188,6 +2191,7 @@ func TestCVO_UpgradeUnverifiedPayloadRetrieveOnce(t *testing.T) { t.Fatal("not the correct error type") } worker := o.configSync.(*SyncWorker) + worker.initializedFunc = func() bool { return true } retriever := worker.retriever.(*fakeDirectoryRetriever) retriever.Set(PayloadInfo{}, payloadErr) @@ -2476,6 +2480,7 @@ func TestCVO_UpgradePreconditionFailing(t *testing.T) { defer shutdownFn() worker := o.configSync.(*SyncWorker) + worker.initializedFunc = func() bool { return true } worker.preconditions = []precondition.Precondition{&testPrecondition{SuccessAfter: 3}} go worker.Start(ctx, 1) @@ -2750,6 +2755,7 @@ func TestCVO_UpgradePreconditionFailingAcceptedRisks(t *testing.T) { defer shutdownFn() worker := o.configSync.(*SyncWorker) + worker.initializedFunc = func() bool { return true } worker.preconditions = []precondition.Precondition{&testPreconditionAlwaysFail{PreConditionName: "PreCondition1"}, &testPreconditionAlwaysFail{PreConditionName: "PreCondition2"}} go worker.Start(ctx, 1) @@ -2852,6 +2858,107 @@ func TestCVO_UpgradePreconditionFailingAcceptedRisks(t *testing.T) { }) } +func TestCVO_UpgradePayloadStillInitializing(t *testing.T) { + o, cvs, client, _, shutdownFn := setupCVOTest("testdata/payloadtest") + + // Setup: an upgrade request from user to a new image and the operator at the same image as before + // + o.release.Image = "image/image:0" + o.release.Version = "1.0.0-abc" + desired := configv1.Release{Version: "1.0.1-abc", Image: "image/image:1"} + uid, _ := uuid.NewRandom() + clusterUID := configv1.ClusterID(uid.String()) + cvs["version"] = &configv1.ClusterVersion{ + ObjectMeta: metav1.ObjectMeta{ + Name: "version", + ResourceVersion: "1", + Generation: 1, + }, + Spec: configv1.ClusterVersionSpec{ + ClusterID: clusterUID, + Channel: "fast", + DesiredUpdate: &configv1.Update{Version: desired.Version, Image: desired.Image}, + }, + Status: configv1.ClusterVersionStatus{ + // Prefers the image version over the operator's version (although in general they will remain in sync) + Desired: desired, + VersionHash: "DL-FFQ2Uem8=", + History: []configv1.UpdateHistory{ + {State: configv1.CompletedUpdate, Image: "image/image:0", Version: "1.0.0-abc", Verified: true, StartedTime: defaultStartedTime, CompletionTime: &defaultCompletionTime}, + }, + Conditions: []configv1.ClusterOperatorStatusCondition{ + {Type: ImplicitlyEnabledCapabilities, Status: "False", Reason: "AsExpected", Message: "Capabilities match configured spec"}, + {Type: configv1.OperatorAvailable, Status: configv1.ConditionTrue, Message: "Done applying 1.0.0-abc"}, + {Type: ClusterStatusFailing, Status: configv1.ConditionFalse}, + {Type: configv1.OperatorProgressing, Status: configv1.ConditionFalse, Message: "Cluster version is 1.0.0-abc"}, + {Type: configv1.RetrievedUpdates, Status: configv1.ConditionFalse}, + }, + }, + } + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + defer shutdownFn() + + worker := o.configSync.(*SyncWorker) + retriever := worker.retriever.(*fakeDirectoryRetriever) + retriever.Set(PayloadInfo{Directory: "testdata/payloadtest", Verified: true}, nil) + + go worker.Start(ctx, 1) + + // Step 1: Simulate a payload being retrieved while the sync worker is not initialized + // and ensure the desired version from the operator is taken from the operator and a reconciliation is enqueued + client.ClearActions() + err := o.sync(ctx, o.queueKey()) + if err != nil { + t.Fatal(err) + } + actions := client.Actions() + if len(actions) != 2 { + t.Fatalf("%s", spew.Sdump(actions)) + } + expectGet(t, actions[0], "clusterversions", "", "version") + expectUpdateStatus(t, actions[1], "clusterversions", "", &configv1.ClusterVersion{ + ObjectMeta: metav1.ObjectMeta{ + Name: "version", + ResourceVersion: "1", + Generation: 1, + }, + Spec: configv1.ClusterVersionSpec{ + ClusterID: clusterUID, + Channel: "fast", + DesiredUpdate: &configv1.Update{Version: desired.Version, Image: desired.Image}, + }, + Status: configv1.ClusterVersionStatus{ + ObservedGeneration: 1, + // Prefers the operator's version + Desired: configv1.Release{Version: o.release.Version, Image: o.release.Image}, + VersionHash: "DL-FFQ2Uem8=", + History: []configv1.UpdateHistory{ + {State: configv1.CompletedUpdate, Image: "image/image:0", Version: "1.0.0-abc", Verified: true, StartedTime: defaultStartedTime, CompletionTime: &defaultCompletionTime}, + }, + Capabilities: configv1.ClusterVersionCapabilitiesStatus{ + EnabledCapabilities: sortedCaps, + KnownCapabilities: sortedKnownCaps, + }, + Conditions: []configv1.ClusterOperatorStatusCondition{ + {Type: ImplicitlyEnabledCapabilities, Status: "False", Reason: "AsExpected", Message: "Capabilities match configured spec"}, + {Type: configv1.OperatorAvailable, Status: configv1.ConditionTrue, Message: "Done applying 1.0.0-abc"}, + {Type: ClusterStatusFailing, Status: configv1.ConditionFalse}, + {Type: configv1.OperatorProgressing, Status: configv1.ConditionFalse, Message: "Cluster version is 1.0.0-abc"}, + {Type: configv1.RetrievedUpdates, Status: configv1.ConditionFalse}, + {Type: DesiredReleaseAccepted, Status: configv1.ConditionTrue, Reason: "PayloadLoaded", + Message: `Payload loaded version="1.0.0-abc" image="image/image:0" architecture="` + architecture + `"`}, + }, + }, + }) + if l := o.queue.Len(); l != 1 { + t.Errorf("expecting queue length is 1 but got %d", l) + } + +} + func TestCVO_UpgradeVerifiedPayload(t *testing.T) { o, cvs, client, _, shutdownFn := setupCVOTest("testdata/payloadtest-2") @@ -2905,6 +3012,7 @@ func TestCVO_UpgradeVerifiedPayload(t *testing.T) { t.Fatal("not the correct error type") } worker := o.configSync.(*SyncWorker) + worker.initializedFunc = func() bool { return true } retriever := worker.retriever.(*fakeDirectoryRetriever) retriever.Set(PayloadInfo{}, payloadErr) retriever.Set(PayloadInfo{Directory: "testdata/payloadtest-2", Verified: true}, nil) diff --git a/pkg/cvo/sync_test.go b/pkg/cvo/sync_test.go index 84f1f6e62c..77476ab684 100644 --- a/pkg/cvo/sync_test.go +++ b/pkg/cvo/sync_test.go @@ -471,8 +471,13 @@ func newAction(gvk schema.GroupVersionKind, namespace, name string) action { } type fakeSyncRecorder struct { - Returns *SyncWorkerStatus - Updates []configv1.Update + Returns *SyncWorkerStatus + Updates []configv1.Update + initializedFunc func() bool +} + +func (r *fakeSyncRecorder) Initialized() bool { + return r.initializedFunc == nil || r.initializedFunc() } func (r *fakeSyncRecorder) StatusCh() <-chan SyncWorkerStatus { diff --git a/pkg/cvo/sync_worker.go b/pkg/cvo/sync_worker.go index 8c66c3469c..59ff726458 100644 --- a/pkg/cvo/sync_worker.go +++ b/pkg/cvo/sync_worker.go @@ -35,6 +35,8 @@ type ConfigSyncWorker interface { // NotifyAboutManagedResourceActivity informs the sync worker about activity for a managed resource. NotifyAboutManagedResourceActivity(msg string) + // Initialized returns true if the worker has work to do already + Initialized() bool } // PayloadInfo returns details about the payload when it was retrieved. @@ -186,6 +188,9 @@ type SyncWorker struct { // always be implicitly enabled. // This contributes to whether or not some manifests are included for reconciliation. alwaysEnableCapabilities []configv1.ClusterVersionCapability + + // initializedFunc is only for the unit-test purpose + initializedFunc func() bool } // NewSyncWorker initializes a ConfigSyncWorker that will retrieve payloads to disk, apply them via builder @@ -227,6 +232,13 @@ func (w *SyncWorker) StatusCh() <-chan SyncWorkerStatus { return w.report } +func (w *SyncWorker) Initialized() bool { + if w.initializedFunc != nil { + return w.initializedFunc() + } + return w.work != nil +} + // NotifyAboutManagedResourceActivity informs the sync worker about activity for a managed resource. func (w *SyncWorker) NotifyAboutManagedResourceActivity(message string) { select {