Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 3 additions & 2 deletions pkg/cvo/cvo.go
Original file line number Diff line number Diff line change
Expand Up @@ -500,11 +500,12 @@ func handleErr(ctx context.Context, queue workqueue.RateLimitingInterface, err e
// 1. A ClusterVersion object exists
// 2. The ClusterVersion object has the appropriate status for the state of the cluster
// 3. The configSync object is kept up to date maintaining the user's desired version
// 4. Loads initial/updated payload releases
//
// It returns an error if it could not update the cluster version object.
func (optr *Operator) sync(ctx context.Context, key string) error {
startTime := time.Now()
klog.V(2).Infof("Started syncing cluster version %q (%v)", key, startTime)
klog.V(2).Infof("Started syncing cluster version %q, spec changes, status, and payload (%v)", key, startTime)
defer func() {
klog.V(2).Infof("Finished syncing cluster version %q (%v)", key, time.Since(startTime))
}()
Expand Down Expand Up @@ -567,7 +568,7 @@ func (optr *Operator) sync(ctx context.Context, key string) error {
}

// inform the config sync loop about our desired state
status := optr.configSync.Update(config.Generation, desired, config.Spec.Overrides, state)
status := optr.configSync.Update(ctx, config.Generation, desired, config.Spec.Overrides, state, optr.name, optr.cvLister)

// write cluster version status
return optr.syncStatus(ctx, original, config, status, errs)
Expand Down
149 changes: 65 additions & 84 deletions pkg/cvo/cvo_scenarios_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -196,12 +196,12 @@ func TestCVO_StartupAndSync(t *testing.T) {
t.Fatal(err)
}
actions = client.Actions()
if len(actions) != 2 {
if len(actions) != 3 {
t.Fatalf("%s", spew.Sdump(actions))
}
expectGet(t, actions[0], "clusterversions", "", "version")
actual = cvs["version"].(*configv1.ClusterVersion)
expectUpdateStatus(t, actions[1], "clusterversions", "", &configv1.ClusterVersion{
expectUpdateStatus(t, actions[2], "clusterversions", "", &configv1.ClusterVersion{
ObjectMeta: metav1.ObjectMeta{
Name: "version",
Generation: 1,
Expand All @@ -221,7 +221,9 @@ func TestCVO_StartupAndSync(t *testing.T) {
{Type: configv1.OperatorAvailable, Status: configv1.ConditionFalse},
// cleared failing status and set progressing
{Type: ClusterStatusFailing, Status: configv1.ConditionFalse},
{Type: configv1.OperatorProgressing, Status: configv1.ConditionTrue, Message: "Working towards 1.0.0-abc"},
{Type: configv1.OperatorProgressing, Status: configv1.ConditionTrue,
Reason: "DownloadingUpdate",
Message: "Working towards 1.0.0-abc: downloading update"},
{Type: configv1.RetrievedUpdates, Status: configv1.ConditionFalse},
},
},
Expand Down Expand Up @@ -521,12 +523,12 @@ func TestCVO_StartupAndSyncUnverifiedPayload(t *testing.T) {
t.Fatal(err)
}
actions = client.Actions()
if len(actions) != 2 {
if len(actions) != 3 {
t.Fatalf("%s", spew.Sdump(actions))
}
expectGet(t, actions[0], "clusterversions", "", "version")
actual = cvs["version"].(*configv1.ClusterVersion)
expectUpdateStatus(t, actions[1], "clusterversions", "", &configv1.ClusterVersion{
expectUpdateStatus(t, actions[2], "clusterversions", "", &configv1.ClusterVersion{
ObjectMeta: metav1.ObjectMeta{
Name: "version",
Generation: 1,
Expand All @@ -546,7 +548,9 @@ func TestCVO_StartupAndSyncUnverifiedPayload(t *testing.T) {
{Type: configv1.OperatorAvailable, Status: configv1.ConditionFalse},
// cleared failing status and set progressing
{Type: ClusterStatusFailing, Status: configv1.ConditionFalse},
{Type: configv1.OperatorProgressing, Status: configv1.ConditionTrue, Message: "Working towards 1.0.0-abc"},
{Type: configv1.OperatorProgressing, Status: configv1.ConditionTrue,
Reason: "DownloadingUpdate",
Message: "Working towards 1.0.0-abc: downloading update"},
{Type: configv1.RetrievedUpdates, Status: configv1.ConditionFalse},
},
},
Expand Down Expand Up @@ -658,7 +662,8 @@ func TestCVO_StartupAndSyncUnverifiedPayload(t *testing.T) {
Conditions: []configv1.ClusterOperatorStatusCondition{
{Type: configv1.OperatorAvailable, Status: configv1.ConditionTrue, Message: "Done applying 1.0.0-abc"},
{Type: ClusterStatusFailing, Status: configv1.ConditionFalse},
{Type: configv1.OperatorProgressing, Status: configv1.ConditionFalse, Message: "Cluster version is 1.0.0-abc"},
{Type: configv1.OperatorProgressing, Status: configv1.ConditionFalse,
Message: "Cluster version is 1.0.0-abc"},
{Type: configv1.RetrievedUpdates, Status: configv1.ConditionFalse},
},
},
Expand Down Expand Up @@ -835,12 +840,12 @@ func TestCVO_StartupAndSyncPreconditionFailing(t *testing.T) {
t.Fatal(err)
}
actions = client.Actions()
if len(actions) != 2 {
if len(actions) != 3 {
t.Fatalf("%s", spew.Sdump(actions))
}
expectGet(t, actions[0], "clusterversions", "", "version")
actual = cvs["version"].(*configv1.ClusterVersion)
expectUpdateStatus(t, actions[1], "clusterversions", "", &configv1.ClusterVersion{
expectUpdateStatus(t, actions[2], "clusterversions", "", &configv1.ClusterVersion{
ObjectMeta: metav1.ObjectMeta{
Name: "version",
Generation: 1,
Expand All @@ -860,7 +865,9 @@ func TestCVO_StartupAndSyncPreconditionFailing(t *testing.T) {
{Type: configv1.OperatorAvailable, Status: configv1.ConditionFalse},
// cleared failing status and set progressing
{Type: ClusterStatusFailing, Status: configv1.ConditionFalse},
{Type: configv1.OperatorProgressing, Status: configv1.ConditionTrue, Message: "Working towards 1.0.0-abc"},
{Type: configv1.OperatorProgressing, Status: configv1.ConditionTrue,
Reason: "DownloadingUpdate",
Message: "Working towards 1.0.0-abc: downloading update"},
{Type: configv1.RetrievedUpdates, Status: configv1.ConditionFalse},
},
},
Expand Down Expand Up @@ -1133,22 +1140,16 @@ func TestCVO_UpgradeUnverifiedPayload(t *testing.T) {
Generation: 1,
},
)

client.ClearActions()
err = o.sync(ctx, o.queueKey())
if err != nil {
t.Fatal(err)
}
actions = client.Actions()
if len(actions) != 2 {
if len(actions) != 3 {
t.Fatalf("%s", spew.Sdump(actions))
}
expectGet(t, actions[0], "clusterversions", "", "version")
actual := cvs["version"].(*configv1.ClusterVersion)
expectUpdateStatus(t, actions[1], "clusterversions", "", &configv1.ClusterVersion{
expectUpdateStatus(t, actions[2], "clusterversions", "", &configv1.ClusterVersion{
ObjectMeta: metav1.ObjectMeta{
Name: "version",
ResourceVersion: "2",
ResourceVersion: "1",
Generation: 1,
},
Spec: configv1.ClusterVersionSpec{
Expand Down Expand Up @@ -1387,21 +1388,16 @@ func TestCVO_UpgradeUnverifiedPayloadRetrieveOnce(t *testing.T) {
},
)

client.ClearActions()
err = o.sync(ctx, o.queueKey())
if err != nil {
t.Fatal(err)
}
actions = client.Actions()
if len(actions) != 2 {
if len(actions) != 3 {
t.Fatalf("%s", spew.Sdump(actions))
}
expectGet(t, actions[0], "clusterversions", "", "version")
expectGet(t, actions[1], "clusterversions", "", "version")
actual := cvs["version"].(*configv1.ClusterVersion)
expectUpdateStatus(t, actions[1], "clusterversions", "", &configv1.ClusterVersion{
expectUpdateStatus(t, actions[2], "clusterversions", "", &configv1.ClusterVersion{
ObjectMeta: metav1.ObjectMeta{
Name: "version",
ResourceVersion: "2",
ResourceVersion: "1",
Generation: 1,
},
Spec: configv1.ClusterVersionSpec{
Expand Down Expand Up @@ -1699,12 +1695,12 @@ func TestCVO_UpgradePreconditionFailing(t *testing.T) {
t.Fatal(err)
}
actions = client.Actions()
if len(actions) != 2 {
if len(actions) != 3 {
t.Fatalf("%s", spew.Sdump(actions))
}
expectGet(t, actions[0], "clusterversions", "", "version")
actual := cvs["version"].(*configv1.ClusterVersion)
expectUpdateStatus(t, actions[1], "clusterversions", "", &configv1.ClusterVersion{
expectUpdateStatus(t, actions[2], "clusterversions", "", &configv1.ClusterVersion{
ObjectMeta: metav1.ObjectMeta{
Name: "version",
ResourceVersion: "2",
Expand All @@ -1727,7 +1723,7 @@ func TestCVO_UpgradePreconditionFailing(t *testing.T) {
Conditions: []configv1.ClusterOperatorStatusCondition{
{Type: configv1.OperatorAvailable, Status: configv1.ConditionTrue, Message: "Done applying 1.0.0-abc"},
// cleared failing status and set progressing
{Type: ClusterStatusFailing, Status: configv1.ConditionTrue, Reason: "UpgradePreconditionCheckFailed", Message: "Precondition \"TestPrecondition SuccessAfter: 3\" failed because of \"CheckFailure\": failing, attempt: 1 will succeed after 3 attempt"},
{Type: ClusterStatusFailing, Status: configv1.ConditionTrue, Reason: "UpgradePreconditionCheckFailed", Message: "Precondition \"TestPrecondition SuccessAfter: 3\" failed because of \"CheckFailure\": failing, attempt: 2 will succeed after 3 attempt"},
{Type: configv1.OperatorProgressing, Status: configv1.ConditionTrue, Reason: "UpgradePreconditionCheckFailed", Message: "Unable to apply 1.0.1-abc: it may not be safe to apply this update"},
{Type: configv1.RetrievedUpdates, Status: configv1.ConditionFalse},
},
Expand All @@ -1750,25 +1746,28 @@ func TestCVO_UpgradePreconditionFailing(t *testing.T) {
t.Fatal(err)
}

// wait until we see the new payload show up
count := 0
for {
var status SyncWorkerStatus
select {
case status = <-worker.StatusCh():
case <-time.After(3 * time.Second):
t.Fatalf("never saw expected sync event")
}
if status.Step == "RetrievePayload" && reflect.DeepEqual(configv1.Release{Version: "1.0.1-abc", Image: "image/image:1"}, status.Actual) {
break
}
t.Logf("Unexpected status waiting to see first retrieve: %#v", status)
count++
if count > 8 {
t.Fatalf("saw too many sync events of the wrong form")
}
}
verifyAllStatus(t, worker.StatusCh(),
SyncWorkerStatus{
Step: "RetrievePayload",
Actual: configv1.Release{Version: "1.0.1-abc", Image: "image/image:1"},
Generation: 1,
},
SyncWorkerStatus{
Step: "PreconditionChecks",
Actual: configv1.Release{Version: "1.0.1-abc", Image: "image/image:1"},
Generation: 1,
},
SyncWorkerStatus{
Step: "PreconditionChecks",
Failure: &payload.UpdateError{Reason: "UpgradePreconditionCheckFailed", Message: "Precondition \"TestPrecondition SuccessAfter: 3\" failed because of \"CheckFailure\": failing, attempt: 2 will succeed after 3 attempt", Name: "PreconditionCheck"},
Actual: configv1.Release{Version: "1.0.1-abc", Image: "image/image:1"},
Generation: 1,
},
SyncWorkerStatus{
Step: "RetrievePayload",
Actual: configv1.Release{Version: "1.0.1-abc", Image: "image/image:1"},
Generation: 1,
},
SyncWorkerStatus{
Step: "PreconditionChecks",
Actual: configv1.Release{Version: "1.0.1-abc", Image: "image/image:1"},
Expand Down Expand Up @@ -1839,7 +1838,7 @@ func TestCVO_UpgradePreconditionFailing(t *testing.T) {
expectUpdateStatus(t, actions[1], "clusterversions", "", &configv1.ClusterVersion{
ObjectMeta: metav1.ObjectMeta{
Name: "version",
ResourceVersion: "3",
ResourceVersion: "4",
Generation: 1,
},
Spec: configv1.ClusterVersionSpec{
Expand Down Expand Up @@ -1961,35 +1960,6 @@ func TestCVO_UpgradeVerifiedPayload(t *testing.T) {
}
expectGet(t, actions[0], "clusterversions", "", "version")
actual := cvs["version"].(*configv1.ClusterVersion)
expectUpdateStatus(t, actions[1], "clusterversions", "", &configv1.ClusterVersion{
ObjectMeta: metav1.ObjectMeta{
Name: "version",
ResourceVersion: "2",
Generation: 1,
},
Spec: configv1.ClusterVersionSpec{
ClusterID: clusterUID,
Channel: "fast",
DesiredUpdate: &configv1.Update{Version: desired.Version, Image: desired.Image},
},
Status: configv1.ClusterVersionStatus{
ObservedGeneration: 1,
// Prefers the image version over the operator's version (although in general they will remain in sync)
Desired: desired,
VersionHash: "DL-FFQ2Uem8=",
History: []configv1.UpdateHistory{
{State: configv1.PartialUpdate, Image: "image/image:1", Version: "1.0.1-abc", StartedTime: defaultStartedTime},
{State: configv1.CompletedUpdate, Image: "image/image:0", Version: "1.0.0-abc", Verified: true, StartedTime: defaultStartedTime, CompletionTime: &defaultCompletionTime},
},
Conditions: []configv1.ClusterOperatorStatusCondition{
{Type: configv1.OperatorAvailable, Status: configv1.ConditionTrue, Message: "Done applying 1.0.0-abc"},
// cleared failing status and set progressing
{Type: ClusterStatusFailing, Status: configv1.ConditionTrue, Reason: "ImageVerificationFailed", Message: "The update cannot be verified: some random error"},
{Type: configv1.OperatorProgressing, Status: configv1.ConditionTrue, Reason: "ImageVerificationFailed", Message: "Unable to apply 1.0.1-abc: the image may not be safe to use"},
{Type: configv1.RetrievedUpdates, Status: configv1.ConditionFalse},
},
},
})

// Step 2: Simulate a verified payload being retrieved and ensure the operator sets verified
//
Expand All @@ -2008,12 +1978,23 @@ func TestCVO_UpgradeVerifiedPayload(t *testing.T) {
t.Fatal(err)
}
actions = client.Actions()
if len(actions) != 1 {
if len(actions) != 3 {
t.Fatalf("%s", spew.Sdump(actions))
}
expectGet(t, actions[0], "clusterversions", "", "version")

verifyAllStatus(t, worker.StatusCh(),
SyncWorkerStatus{
Step: "RetrievePayload",
Actual: configv1.Release{Version: "1.0.1-abc", Image: "image/image:1"},
Generation: 1,
},
SyncWorkerStatus{
Step: "RetrievePayload",
Actual: configv1.Release{Version: "1.0.1-abc", Image: "image/image:1"},
Generation: 1,
Failure: payloadErr,
},
SyncWorkerStatus{
Step: "RetrievePayload",
Actual: configv1.Release{Version: "1.0.1-abc", Image: "image/image:1"},
Expand Down Expand Up @@ -2175,7 +2156,7 @@ func TestCVO_RestartAndReconcile(t *testing.T) {
t.Fatal(err)
}
actions := client.Actions()
if len(actions) != 1 {
if len(actions) != 2 {
t.Fatalf("%s", spew.Sdump(actions))
}
expectGet(t, actions[0], "clusterversions", "", "version")
Expand Down Expand Up @@ -2392,7 +2373,7 @@ func TestCVO_ErrorDuringReconcile(t *testing.T) {
t.Fatal(err)
}
actions := client.Actions()
if len(actions) != 1 {
if len(actions) != 2 {
t.Fatalf("%s", spew.Sdump(actions))
}
expectGet(t, actions[0], "clusterversions", "", "version")
Expand Down Expand Up @@ -2624,7 +2605,7 @@ func TestCVO_ParallelError(t *testing.T) {
t.Fatal(err)
}
actions := client.Actions()
if len(actions) != 2 {
if len(actions) != 3 {
t.Fatalf("%s", spew.Sdump(actions))
}
expectGet(t, actions[0], "clusterversions", "", "version")
Expand Down Expand Up @@ -2906,7 +2887,7 @@ func verifyAllStatus(t *testing.T, ch <-chan SyncWorkerStatus, items ...SyncWork
}

if !reflect.DeepEqual(expect, actual) {
t.Fatalf("unexpected status item %d\nExpected: %#v\nActual: %#v", i, expect, actual)
t.Fatalf("unexpected status item %d\nExpected: %v\nActual: %v", i, expect, actual)
}
}
}
Expand Down
10 changes: 6 additions & 4 deletions pkg/cvo/sync_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,7 @@ func Test_SyncWorker_apply(t *testing.T) {
cancelAfter: 2,
wantErr: true,
check: func(t *testing.T, actions []action) {
if len(actions) != 3 {
if len(actions) < 1 {
spew.Dump(actions)
t.Fatalf("unexpected %d actions", len(actions))
}
Expand Down Expand Up @@ -143,8 +143,9 @@ func Test_SyncWorker_apply(t *testing.T) {
cancel: cancel,
remainingErrors: test.cancelAfter,
}
worker.payload = up

err := worker.apply(ctx, up, &SyncWork{}, 1, &statusWrapper{w: worker, previousStatus: worker.Status()})
err := worker.apply(ctx, &SyncWork{}, 1, &statusWrapper{w: worker, previousStatus: worker.Status()})
if !test.wantErr && err != nil {
t.Fatal(err)
}
Expand Down Expand Up @@ -337,8 +338,9 @@ func Test_SyncWorker_apply_generic(t *testing.T) {
client: dynamicClient,
modifiers: test.modifiers,
}
worker.payload = up
ctx := context.Background()
err := worker.apply(ctx, up, &SyncWork{}, 1, &statusWrapper{w: worker, previousStatus: worker.Status()})
err := worker.apply(ctx, &SyncWork{}, 1, &statusWrapper{w: worker, previousStatus: worker.Status()})
if err != nil {
t.Fatal(err)
}
Expand Down Expand Up @@ -411,7 +413,7 @@ func (r *fakeSyncRecorder) StatusCh() <-chan SyncWorkerStatus {
func (r *fakeSyncRecorder) Start(ctx context.Context, maxWorkers int, cvoOptrName string, lister configlistersv1.ClusterVersionLister) {
}

func (r *fakeSyncRecorder) Update(generation int64, desired configv1.Update, overrides []configv1.ComponentOverride, state payload.State) *SyncWorkerStatus {
func (r *fakeSyncRecorder) Update(ctx context.Context, generation int64, desired configv1.Update, overrides []configv1.ComponentOverride, state payload.State, cvoOptrName string, lister configlistersv1.ClusterVersionLister) *SyncWorkerStatus {
r.Updates = append(r.Updates, desired)
return r.Returns
}
Expand Down
Loading