Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 13 additions & 4 deletions pkg/cvo/cvo.go
Original file line number Diff line number Diff line change
Expand Up @@ -677,16 +677,25 @@ func (optr *Operator) sync(ctx context.Context, key string) error {
config := validation.ClearInvalidFields(original, errs)

// identify the desired next version
desired, ok := findUpdateFromConfig(config, optr.getArchitecture())
if ok {
klog.V(2).Infof("Desired version from spec is %#v", desired)
desired, found := findUpdateFromConfig(config, optr.getArchitecture())
initialized := optr.configSync.Initialized()
if found && initialized {
klog.V(2).Infof("Desired version from spec is %#v after initialization", desired)
} else {
pendingDesired := desired
currentVersion := optr.currentVersion()
desired = configv1.Update{
Version: currentVersion.Version,
Image: currentVersion.Image,
}
klog.V(2).Infof("Desired version from operator is %#v", desired)
if !initialized {
klog.V(2).Infof("Desired version from operator is %#v with user's request to go to %#v. "+
"We are currently initializing the worker and will evaluate the request later", desired, pendingDesired)
// enqueue to trigger a reconciliation on ClusterVersion
optr.queue.Add(optr.queueKey())
} else {
klog.V(2).Infof("Desired version from operator is %#v", desired)
}
}

// handle the case of a misconfigured CVO by doing nothing
Expand Down
110 changes: 109 additions & 1 deletion pkg/cvo/cvo_scenarios_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1398,6 +1398,7 @@ func TestCVO_UpgradeUnverifiedPayload(t *testing.T) {
t.Fatal("not the correct error type")
}
worker := o.configSync.(*SyncWorker)
worker.initializedFunc = func() bool { return true }
retriever := worker.retriever.(*fakeDirectoryRetriever)
retriever.Set(PayloadInfo{}, payloadErr)

Expand Down Expand Up @@ -1653,7 +1654,7 @@ func TestCVO_ResetPayloadLoadStatus(t *testing.T) {
t.Fatal("not the correct error type")
}
worker := o.configSync.(*SyncWorker)

worker.initializedFunc = func() bool { return true }
// checked by SyncWorker.syncPayload
worker.payload = &payload.Update{Release: o.release}

Expand Down Expand Up @@ -1904,6 +1905,7 @@ func TestCVO_UpgradeFailedPayloadLoadWithCapsChanges(t *testing.T) {
t.Fatal("not the correct error type")
}
worker := o.configSync.(*SyncWorker)
worker.initializedFunc = func() bool { return true }
retriever := worker.retriever.(*fakeDirectoryRetriever)
retriever.Set(PayloadInfo{}, payloadErr)

Expand Down Expand Up @@ -2022,6 +2024,7 @@ func TestCVO_InitImplicitlyEnabledCaps(t *testing.T) {

defer shutdownFn()
worker := o.configSync.(*SyncWorker)
worker.initializedFunc = func() bool { return true }

go worker.Start(ctx, 1)

Expand Down Expand Up @@ -2188,6 +2191,7 @@ func TestCVO_UpgradeUnverifiedPayloadRetrieveOnce(t *testing.T) {
t.Fatal("not the correct error type")
}
worker := o.configSync.(*SyncWorker)
worker.initializedFunc = func() bool { return true }
retriever := worker.retriever.(*fakeDirectoryRetriever)
retriever.Set(PayloadInfo{}, payloadErr)

Expand Down Expand Up @@ -2476,6 +2480,7 @@ func TestCVO_UpgradePreconditionFailing(t *testing.T) {
defer shutdownFn()

worker := o.configSync.(*SyncWorker)
worker.initializedFunc = func() bool { return true }
worker.preconditions = []precondition.Precondition{&testPrecondition{SuccessAfter: 3}}

go worker.Start(ctx, 1)
Expand Down Expand Up @@ -2750,6 +2755,7 @@ func TestCVO_UpgradePreconditionFailingAcceptedRisks(t *testing.T) {
defer shutdownFn()

worker := o.configSync.(*SyncWorker)
worker.initializedFunc = func() bool { return true }
worker.preconditions = []precondition.Precondition{&testPreconditionAlwaysFail{PreConditionName: "PreCondition1"}, &testPreconditionAlwaysFail{PreConditionName: "PreCondition2"}}

go worker.Start(ctx, 1)
Expand Down Expand Up @@ -2852,6 +2858,107 @@ func TestCVO_UpgradePreconditionFailingAcceptedRisks(t *testing.T) {
})
}

func TestCVO_UpgradePayloadStillInitializing(t *testing.T) {
o, cvs, client, _, shutdownFn := setupCVOTest("testdata/payloadtest")

// Setup: an upgrade request from user to a new image and the operator at the same image as before
//
o.release.Image = "image/image:0"
o.release.Version = "1.0.0-abc"
desired := configv1.Release{Version: "1.0.1-abc", Image: "image/image:1"}
uid, _ := uuid.NewRandom()
clusterUID := configv1.ClusterID(uid.String())
cvs["version"] = &configv1.ClusterVersion{
ObjectMeta: metav1.ObjectMeta{
Name: "version",
ResourceVersion: "1",
Generation: 1,
},
Spec: configv1.ClusterVersionSpec{
ClusterID: clusterUID,
Channel: "fast",
DesiredUpdate: &configv1.Update{Version: desired.Version, Image: desired.Image},
},
Status: configv1.ClusterVersionStatus{
// Prefers the image version over the operator's version (although in general they will remain in sync)
Desired: desired,
VersionHash: "DL-FFQ2Uem8=",
History: []configv1.UpdateHistory{
{State: configv1.CompletedUpdate, Image: "image/image:0", Version: "1.0.0-abc", Verified: true, StartedTime: defaultStartedTime, CompletionTime: &defaultCompletionTime},
},
Conditions: []configv1.ClusterOperatorStatusCondition{
{Type: ImplicitlyEnabledCapabilities, Status: "False", Reason: "AsExpected", Message: "Capabilities match configured spec"},
{Type: configv1.OperatorAvailable, Status: configv1.ConditionTrue, Message: "Done applying 1.0.0-abc"},
{Type: ClusterStatusFailing, Status: configv1.ConditionFalse},
{Type: configv1.OperatorProgressing, Status: configv1.ConditionFalse, Message: "Cluster version is 1.0.0-abc"},
{Type: configv1.RetrievedUpdates, Status: configv1.ConditionFalse},
},
},
}

ctx, cancel := context.WithCancel(context.Background())
defer cancel()

defer shutdownFn()

worker := o.configSync.(*SyncWorker)
retriever := worker.retriever.(*fakeDirectoryRetriever)
retriever.Set(PayloadInfo{Directory: "testdata/payloadtest", Verified: true}, nil)

go worker.Start(ctx, 1)

// Step 1: Simulate a payload being retrieved while the sync worker is not initialized
// and ensure the desired version from the operator is taken from the operator and a reconciliation is enqueued
client.ClearActions()
err := o.sync(ctx, o.queueKey())
if err != nil {
t.Fatal(err)
}
actions := client.Actions()
if len(actions) != 2 {
t.Fatalf("%s", spew.Sdump(actions))
}
expectGet(t, actions[0], "clusterversions", "", "version")
expectUpdateStatus(t, actions[1], "clusterversions", "", &configv1.ClusterVersion{
ObjectMeta: metav1.ObjectMeta{
Name: "version",
ResourceVersion: "1",
Generation: 1,
},
Spec: configv1.ClusterVersionSpec{
ClusterID: clusterUID,
Channel: "fast",
DesiredUpdate: &configv1.Update{Version: desired.Version, Image: desired.Image},
},
Status: configv1.ClusterVersionStatus{
ObservedGeneration: 1,
// Prefers the operator's version
Desired: configv1.Release{Version: o.release.Version, Image: o.release.Image},
VersionHash: "DL-FFQ2Uem8=",
History: []configv1.UpdateHistory{
{State: configv1.CompletedUpdate, Image: "image/image:0", Version: "1.0.0-abc", Verified: true, StartedTime: defaultStartedTime, CompletionTime: &defaultCompletionTime},
},
Capabilities: configv1.ClusterVersionCapabilitiesStatus{
EnabledCapabilities: sortedCaps,
KnownCapabilities: sortedKnownCaps,
},
Conditions: []configv1.ClusterOperatorStatusCondition{
{Type: ImplicitlyEnabledCapabilities, Status: "False", Reason: "AsExpected", Message: "Capabilities match configured spec"},
{Type: configv1.OperatorAvailable, Status: configv1.ConditionTrue, Message: "Done applying 1.0.0-abc"},
{Type: ClusterStatusFailing, Status: configv1.ConditionFalse},
{Type: configv1.OperatorProgressing, Status: configv1.ConditionFalse, Message: "Cluster version is 1.0.0-abc"},
{Type: configv1.RetrievedUpdates, Status: configv1.ConditionFalse},
{Type: DesiredReleaseAccepted, Status: configv1.ConditionTrue, Reason: "PayloadLoaded",
Message: `Payload loaded version="1.0.0-abc" image="image/image:0" architecture="` + architecture + `"`},
},
},
})
if l := o.queue.Len(); l != 1 {
t.Errorf("expecting queue length is 1 but got %d", l)
}

}

func TestCVO_UpgradeVerifiedPayload(t *testing.T) {
o, cvs, client, _, shutdownFn := setupCVOTest("testdata/payloadtest-2")

Expand Down Expand Up @@ -2905,6 +3012,7 @@ func TestCVO_UpgradeVerifiedPayload(t *testing.T) {
t.Fatal("not the correct error type")
}
worker := o.configSync.(*SyncWorker)
worker.initializedFunc = func() bool { return true }
retriever := worker.retriever.(*fakeDirectoryRetriever)
retriever.Set(PayloadInfo{}, payloadErr)
retriever.Set(PayloadInfo{Directory: "testdata/payloadtest-2", Verified: true}, nil)
Expand Down
9 changes: 7 additions & 2 deletions pkg/cvo/sync_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -471,8 +471,13 @@ func newAction(gvk schema.GroupVersionKind, namespace, name string) action {
}

type fakeSyncRecorder struct {
Returns *SyncWorkerStatus
Updates []configv1.Update
Returns *SyncWorkerStatus
Updates []configv1.Update
initializedFunc func() bool
}

func (r *fakeSyncRecorder) Initialized() bool {
return r.initializedFunc == nil || r.initializedFunc()
}

func (r *fakeSyncRecorder) StatusCh() <-chan SyncWorkerStatus {
Expand Down
12 changes: 12 additions & 0 deletions pkg/cvo/sync_worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,8 @@ type ConfigSyncWorker interface {

// NotifyAboutManagedResourceActivity informs the sync worker about activity for a managed resource.
NotifyAboutManagedResourceActivity(msg string)
// Initialized returns true if the worker has work to do already
Initialized() bool
}

// PayloadInfo returns details about the payload when it was retrieved.
Expand Down Expand Up @@ -186,6 +188,9 @@ type SyncWorker struct {
// always be implicitly enabled.
// This contributes to whether or not some manifests are included for reconciliation.
alwaysEnableCapabilities []configv1.ClusterVersionCapability

// initializedFunc is only for the unit-test purpose
initializedFunc func() bool
}

// NewSyncWorker initializes a ConfigSyncWorker that will retrieve payloads to disk, apply them via builder
Expand Down Expand Up @@ -227,6 +232,13 @@ func (w *SyncWorker) StatusCh() <-chan SyncWorkerStatus {
return w.report
}

func (w *SyncWorker) Initialized() bool {
if w.initializedFunc != nil {
return w.initializedFunc()
}
return w.work != nil
}

// NotifyAboutManagedResourceActivity informs the sync worker about activity for a managed resource.
func (w *SyncWorker) NotifyAboutManagedResourceActivity(message string) {
select {
Expand Down