diff --git a/pkg/cvo/cvo.go b/pkg/cvo/cvo.go index 1f7865f9e..631d36fbd 100644 --- a/pkg/cvo/cvo.go +++ b/pkg/cvo/cvo.go @@ -507,11 +507,12 @@ func handleErr(ctx context.Context, queue workqueue.RateLimitingInterface, err e // 1. A ClusterVersion object exists // 2. The ClusterVersion object has the appropriate status for the state of the cluster // 3. The configSync object is kept up to date maintaining the user's desired version +// 4. Loads initial/updated payload releases // // It returns an error if it could not update the cluster version object. func (optr *Operator) sync(ctx context.Context, key string) error { startTime := time.Now() - klog.V(2).Infof("Started syncing cluster version %q (%v)", key, startTime) + klog.V(2).Infof("Started syncing cluster version %q, spec changes, status, and payload (%v)", key, startTime) defer func() { klog.V(2).Infof("Finished syncing cluster version %q (%v)", key, time.Since(startTime)) }() @@ -574,7 +575,7 @@ func (optr *Operator) sync(ctx context.Context, key string) error { } // inform the config sync loop about our desired state - status := optr.configSync.Update(config.Generation, desired, config.Spec.Overrides, state) + status := optr.configSync.Update(ctx, config.Generation, desired, config.Spec.Overrides, state, optr.name, optr.cvLister) // write cluster version status return optr.syncStatus(ctx, original, config, status, errs) diff --git a/pkg/cvo/cvo_scenarios_test.go b/pkg/cvo/cvo_scenarios_test.go index 1d60b9f25..dd6ff09f0 100644 --- a/pkg/cvo/cvo_scenarios_test.go +++ b/pkg/cvo/cvo_scenarios_test.go @@ -204,12 +204,12 @@ func TestCVO_StartupAndSync(t *testing.T) { t.Fatal(err) } actions = client.Actions() - if len(actions) != 2 { + if len(actions) != 3 { t.Fatalf("%s", spew.Sdump(actions)) } expectGet(t, actions[0], "clusterversions", "", "version") actual = cvs["version"].(*configv1.ClusterVersion) - expectUpdateStatus(t, actions[1], "clusterversions", "", &configv1.ClusterVersion{ + expectUpdateStatus(t, actions[2], "clusterversions", "", &configv1.ClusterVersion{ ObjectMeta: metav1.ObjectMeta{ Name: "version", Generation: 1, @@ -231,20 +231,36 @@ func TestCVO_StartupAndSync(t *testing.T) { {Type: ClusterStatusFailing, Status: configv1.ConditionFalse}, {Type: configv1.OperatorProgressing, Status: configv1.ConditionTrue, Message: "Working towards 1.0.0-abc"}, {Type: configv1.RetrievedUpdates, Status: configv1.ConditionFalse}, + {Type: DesiredReleaseAccepted, Status: configv1.ConditionTrue, Reason: "PayloadLoaded", + Message: "Payload loaded version=\"1.0.0-abc\" image=\"image/image:1\""}, }, }, }) verifyAllStatus(t, worker.StatusCh(), SyncWorkerStatus{ Generation: 1, - Step: "RetrievePayload", - Initial: true, Actual: configv1.Release{Version: "1.0.0-abc", Image: "image/image:1"}, + loadPayloadStatus: LoadPayloadStatus{ + Step: "RetrievePayload", + Message: "Retrieving and verifying payload version=\"1.0.0-abc\" image=\"image/image:1\"", + LastTransitionTime: time.Unix(1, 0), + Release: configv1.Release{Version: "1.0.0-abc", Image: "image/image:1"}, + }, + }, + SyncWorkerStatus{ + Generation: 1, + Actual: configv1.Release{Version: "1.0.0-abc", Image: "image/image:1"}, + LastProgress: time.Unix(1, 0), + loadPayloadStatus: LoadPayloadStatus{ + Step: "PayloadLoaded", + Message: "Payload loaded version=\"1.0.0-abc\" image=\"image/image:1\"", + LastTransitionTime: time.Unix(2, 0), + Release: configv1.Release{Version: "1.0.0-abc", Image: "image/image:1"}, + }, }, SyncWorkerStatus{ Generation: 1, Total: 3, - Step: "ApplyResources", Initial: true, VersionHash: "DL-FFQ2Uem8=", Actual: configv1.Release{ @@ -253,12 +269,18 @@ func TestCVO_StartupAndSync(t *testing.T) { URL: configv1.URL("https://example.com/v1.0.0-abc"), Channels: []string{"channel-a", "channel-b", "channel-c"}, }, + LastProgress: time.Unix(2, 0), + loadPayloadStatus: LoadPayloadStatus{ + Step: "PayloadLoaded", + Message: "Payload loaded version=\"1.0.0-abc\" image=\"image/image:1\"", + LastTransitionTime: time.Unix(3, 0), + Release: configv1.Release{Version: "1.0.0-abc", Image: "image/image:1"}, + }, }, SyncWorkerStatus{ Generation: 1, Done: 1, Total: 3, - Step: "ApplyResources", Initial: true, VersionHash: "DL-FFQ2Uem8=", Actual: configv1.Release{ @@ -267,14 +289,19 @@ func TestCVO_StartupAndSync(t *testing.T) { URL: configv1.URL("https://example.com/v1.0.0-abc"), Channels: []string{"channel-a", "channel-b", "channel-c"}, }, - LastProgress: time.Unix(1, 0), + LastProgress: time.Unix(3, 0), + loadPayloadStatus: LoadPayloadStatus{ + Step: "PayloadLoaded", + Message: "Payload loaded version=\"1.0.0-abc\" image=\"image/image:1\"", + LastTransitionTime: time.Unix(4, 0), + Release: configv1.Release{Version: "1.0.0-abc", Image: "image/image:1"}, + }, }, SyncWorkerStatus{ Generation: 1, Done: 2, Total: 3, Initial: true, - Step: "ApplyResources", VersionHash: "DL-FFQ2Uem8=", Actual: configv1.Release{ Version: "1.0.0-abc", @@ -282,7 +309,13 @@ func TestCVO_StartupAndSync(t *testing.T) { URL: configv1.URL("https://example.com/v1.0.0-abc"), Channels: []string{"channel-a", "channel-b", "channel-c"}, }, - LastProgress: time.Unix(2, 0), + LastProgress: time.Unix(4, 0), + loadPayloadStatus: LoadPayloadStatus{ + Step: "PayloadLoaded", + Message: "Payload loaded version=\"1.0.0-abc\" image=\"image/image:1\"", + LastTransitionTime: time.Unix(5, 0), + Release: configv1.Release{Version: "1.0.0-abc", Image: "image/image:1"}, + }, }, SyncWorkerStatus{ Generation: 1, @@ -297,7 +330,13 @@ func TestCVO_StartupAndSync(t *testing.T) { URL: configv1.URL("https://example.com/v1.0.0-abc"), Channels: []string{"channel-a", "channel-b", "channel-c"}, }, - LastProgress: time.Unix(3, 0), + LastProgress: time.Unix(5, 0), + loadPayloadStatus: LoadPayloadStatus{ + Step: "PayloadLoaded", + Message: "Payload loaded version=\"1.0.0-abc\" image=\"image/image:1\"", + LastTransitionTime: time.Unix(6, 0), + Release: configv1.Release{Version: "1.0.0-abc", Image: "image/image:1"}, + }, }, ) @@ -343,6 +382,8 @@ func TestCVO_StartupAndSync(t *testing.T) { {Type: ClusterStatusFailing, Status: configv1.ConditionFalse}, {Type: configv1.OperatorProgressing, Status: configv1.ConditionFalse, Message: "Cluster version is 1.0.0-abc"}, {Type: configv1.RetrievedUpdates, Status: configv1.ConditionFalse}, + {Type: DesiredReleaseAccepted, Status: configv1.ConditionTrue, Reason: "PayloadLoaded", + Message: "Payload loaded version=\"1.0.0-abc\" image=\"image/image:1\""}, }, }, }) @@ -354,7 +395,6 @@ func TestCVO_StartupAndSync(t *testing.T) { Generation: 1, Reconciling: true, Total: 3, - Step: "ApplyResources", VersionHash: "DL-FFQ2Uem8=", Actual: configv1.Release{ Version: "1.0.0-abc", @@ -362,13 +402,18 @@ func TestCVO_StartupAndSync(t *testing.T) { URL: configv1.URL("https://example.com/v1.0.0-abc"), Channels: []string{"channel-a", "channel-b", "channel-c"}, }, + loadPayloadStatus: LoadPayloadStatus{ + Step: "PayloadLoaded", + Message: "Payload loaded version=\"1.0.0-abc\" image=\"image/image:1\"", + LastTransitionTime: time.Unix(1, 0), + Release: configv1.Release{Version: "1.0.0-abc", Image: "image/image:1"}, + }, }, SyncWorkerStatus{ Generation: 1, Reconciling: true, Done: 1, Total: 3, - Step: "ApplyResources", VersionHash: "DL-FFQ2Uem8=", Actual: configv1.Release{ Version: "1.0.0-abc", @@ -376,13 +421,19 @@ func TestCVO_StartupAndSync(t *testing.T) { URL: configv1.URL("https://example.com/v1.0.0-abc"), Channels: []string{"channel-a", "channel-b", "channel-c"}, }, + LastProgress: time.Unix(1, 0), + loadPayloadStatus: LoadPayloadStatus{ + Step: "PayloadLoaded", + Message: "Payload loaded version=\"1.0.0-abc\" image=\"image/image:1\"", + LastTransitionTime: time.Unix(2, 0), + Release: configv1.Release{Version: "1.0.0-abc", Image: "image/image:1"}, + }, }, SyncWorkerStatus{ Generation: 1, Reconciling: true, Done: 2, Total: 3, - Step: "ApplyResources", VersionHash: "DL-FFQ2Uem8=", Actual: configv1.Release{ Version: "1.0.0-abc", @@ -390,6 +441,13 @@ func TestCVO_StartupAndSync(t *testing.T) { URL: configv1.URL("https://example.com/v1.0.0-abc"), Channels: []string{"channel-a", "channel-b", "channel-c"}, }, + LastProgress: time.Unix(2, 0), + loadPayloadStatus: LoadPayloadStatus{ + Step: "PayloadLoaded", + Message: "Payload loaded version=\"1.0.0-abc\" image=\"image/image:1\"", + LastTransitionTime: time.Unix(3, 0), + Release: configv1.Release{Version: "1.0.0-abc", Image: "image/image:1"}, + }, }, SyncWorkerStatus{ Generation: 1, @@ -404,7 +462,13 @@ func TestCVO_StartupAndSync(t *testing.T) { URL: configv1.URL("https://example.com/v1.0.0-abc"), Channels: []string{"channel-a", "channel-b", "channel-c"}, }, - LastProgress: time.Unix(1, 0), + LastProgress: time.Unix(3, 0), + loadPayloadStatus: LoadPayloadStatus{ + Step: "PayloadLoaded", + Message: "Payload loaded version=\"1.0.0-abc\" image=\"image/image:1\"", + LastTransitionTime: time.Unix(4, 0), + Release: configv1.Release{Version: "1.0.0-abc", Image: "image/image:1"}, + }, }, ) @@ -529,12 +593,12 @@ func TestCVO_StartupAndSyncUnverifiedPayload(t *testing.T) { t.Fatal(err) } actions = client.Actions() - if len(actions) != 2 { + if len(actions) != 3 { t.Fatalf("%s", spew.Sdump(actions)) } expectGet(t, actions[0], "clusterversions", "", "version") actual = cvs["version"].(*configv1.ClusterVersion) - expectUpdateStatus(t, actions[1], "clusterversions", "", &configv1.ClusterVersion{ + expectUpdateStatus(t, actions[2], "clusterversions", "", &configv1.ClusterVersion{ ObjectMeta: metav1.ObjectMeta{ Name: "version", Generation: 1, @@ -556,33 +620,38 @@ func TestCVO_StartupAndSyncUnverifiedPayload(t *testing.T) { {Type: ClusterStatusFailing, Status: configv1.ConditionFalse}, {Type: configv1.OperatorProgressing, Status: configv1.ConditionTrue, Message: "Working towards 1.0.0-abc"}, {Type: configv1.RetrievedUpdates, Status: configv1.ConditionFalse}, + {Type: DesiredReleaseAccepted, Status: configv1.ConditionTrue, Reason: "PayloadLoaded", + Message: "Payload loaded version=\"1.0.0-abc\" image=\"image/image:1\""}, }, }, }) verifyAllStatus(t, worker.StatusCh(), SyncWorkerStatus{ - Step: "RetrievePayload", - Initial: true, Actual: configv1.Release{Version: "1.0.0-abc", Image: "image/image:1"}, Generation: 1, + loadPayloadStatus: LoadPayloadStatus{ + Step: "RetrievePayload", + Message: "Retrieving and verifying payload version=\"1.0.0-abc\" image=\"image/image:1\"", + LastTransitionTime: time.Unix(1, 0), + Release: configv1.Release{Version: "1.0.0-abc", Image: "image/image:1"}, + }, }, SyncWorkerStatus{ - Total: 3, - Step: "ApplyResources", - Initial: true, - VersionHash: "DL-FFQ2Uem8=", Actual: configv1.Release{ - Version: "1.0.0-abc", - Image: "image/image:1", - URL: configv1.URL("https://example.com/v1.0.0-abc"), - Channels: []string{"channel-a", "channel-b", "channel-c"}, + Version: "1.0.0-abc", + Image: "image/image:1", + }, + LastProgress: time.Unix(1, 0), + Generation: 1, + loadPayloadStatus: LoadPayloadStatus{ + Step: "PayloadLoaded", + Message: "Payload loaded version=\"1.0.0-abc\" image=\"image/image:1\"", + LastTransitionTime: time.Unix(2, 0), + Release: configv1.Release{Version: "1.0.0-abc", Image: "image/image:1"}, }, - Generation: 1, }, SyncWorkerStatus{ - Done: 1, Total: 3, - Step: "ApplyResources", Initial: true, VersionHash: "DL-FFQ2Uem8=", Actual: configv1.Release{ @@ -591,14 +660,19 @@ func TestCVO_StartupAndSyncUnverifiedPayload(t *testing.T) { URL: configv1.URL("https://example.com/v1.0.0-abc"), Channels: []string{"channel-a", "channel-b", "channel-c"}, }, - LastProgress: time.Unix(1, 0), + LastProgress: time.Unix(2, 0), Generation: 1, + loadPayloadStatus: LoadPayloadStatus{ + Step: "PayloadLoaded", + Message: "Payload loaded version=\"1.0.0-abc\" image=\"image/image:1\"", + LastTransitionTime: time.Unix(3, 0), + Release: configv1.Release{Version: "1.0.0-abc", Image: "image/image:1"}, + }, }, SyncWorkerStatus{ - Done: 2, + Done: 1, Total: 3, Initial: true, - Step: "ApplyResources", VersionHash: "DL-FFQ2Uem8=", Actual: configv1.Release{ Version: "1.0.0-abc", @@ -606,13 +680,18 @@ func TestCVO_StartupAndSyncUnverifiedPayload(t *testing.T) { URL: configv1.URL("https://example.com/v1.0.0-abc"), Channels: []string{"channel-a", "channel-b", "channel-c"}, }, - LastProgress: time.Unix(2, 0), + LastProgress: time.Unix(3, 0), Generation: 1, + loadPayloadStatus: LoadPayloadStatus{ + Step: "PayloadLoaded", + Message: "Payload loaded version=\"1.0.0-abc\" image=\"image/image:1\"", + LastTransitionTime: time.Unix(4, 0), + Release: configv1.Release{Version: "1.0.0-abc", Image: "image/image:1"}, + }, }, SyncWorkerStatus{ - Reconciling: true, - Completed: 1, - Done: 3, + Initial: true, + Done: 2, Total: 3, VersionHash: "DL-FFQ2Uem8=", Actual: configv1.Release{ @@ -621,11 +700,20 @@ func TestCVO_StartupAndSyncUnverifiedPayload(t *testing.T) { URL: configv1.URL("https://example.com/v1.0.0-abc"), Channels: []string{"channel-a", "channel-b", "channel-c"}, }, - LastProgress: time.Unix(3, 0), + LastProgress: time.Unix(4, 0), Generation: 1, + loadPayloadStatus: LoadPayloadStatus{ + Step: "PayloadLoaded", + Message: "Payload loaded version=\"1.0.0-abc\" image=\"image/image:1\"", + LastTransitionTime: time.Unix(5, 0), + Release: configv1.Release{Version: "1.0.0-abc", Image: "image/image:1"}, + }, }, ) + // wait for status to reflect sync of new payload + waitForStatusCompleted(t, worker) + // Step 4: Now that sync is complete, verify status is updated to represent image contents // client.ClearActions() @@ -633,7 +721,9 @@ func TestCVO_StartupAndSyncUnverifiedPayload(t *testing.T) { if err != nil { t.Fatal(err) } + actions = client.Actions() + if len(actions) != 2 { t.Fatalf("%s", spew.Sdump(actions)) } @@ -668,6 +758,8 @@ func TestCVO_StartupAndSyncUnverifiedPayload(t *testing.T) { {Type: ClusterStatusFailing, Status: configv1.ConditionFalse}, {Type: configv1.OperatorProgressing, Status: configv1.ConditionFalse, Message: "Cluster version is 1.0.0-abc"}, {Type: configv1.RetrievedUpdates, Status: configv1.ConditionFalse}, + {Type: DesiredReleaseAccepted, Status: configv1.ConditionTrue, Reason: "PayloadLoaded", + Message: "Payload loaded version=\"1.0.0-abc\" image=\"image/image:1\""}, }, }, }) @@ -678,7 +770,6 @@ func TestCVO_StartupAndSyncUnverifiedPayload(t *testing.T) { SyncWorkerStatus{ Reconciling: true, Total: 3, - Step: "ApplyResources", VersionHash: "DL-FFQ2Uem8=", Actual: configv1.Release{ Version: "1.0.0-abc", @@ -687,12 +778,17 @@ func TestCVO_StartupAndSyncUnverifiedPayload(t *testing.T) { Channels: []string{"channel-a", "channel-b", "channel-c"}, }, Generation: 1, + loadPayloadStatus: LoadPayloadStatus{ + Step: "PayloadLoaded", + Message: "Payload loaded version=\"1.0.0-abc\" image=\"image/image:1\"", + LastTransitionTime: time.Unix(1, 0), + Release: configv1.Release{Version: "1.0.0-abc", Image: "image/image:1"}, + }, }, SyncWorkerStatus{ Reconciling: true, Done: 1, Total: 3, - Step: "ApplyResources", VersionHash: "DL-FFQ2Uem8=", Actual: configv1.Release{ Version: "1.0.0-abc", @@ -700,13 +796,19 @@ func TestCVO_StartupAndSyncUnverifiedPayload(t *testing.T) { URL: configv1.URL("https://example.com/v1.0.0-abc"), Channels: []string{"channel-a", "channel-b", "channel-c"}, }, - Generation: 1, + Generation: 1, + LastProgress: time.Unix(1, 0), + loadPayloadStatus: LoadPayloadStatus{ + Step: "PayloadLoaded", + Message: "Payload loaded version=\"1.0.0-abc\" image=\"image/image:1\"", + LastTransitionTime: time.Unix(2, 0), + Release: configv1.Release{Version: "1.0.0-abc", Image: "image/image:1"}, + }, }, SyncWorkerStatus{ Reconciling: true, Done: 2, Total: 3, - Step: "ApplyResources", VersionHash: "DL-FFQ2Uem8=", Actual: configv1.Release{ Version: "1.0.0-abc", @@ -714,7 +816,14 @@ func TestCVO_StartupAndSyncUnverifiedPayload(t *testing.T) { URL: configv1.URL("https://example.com/v1.0.0-abc"), Channels: []string{"channel-a", "channel-b", "channel-c"}, }, - Generation: 1, + Generation: 1, + LastProgress: time.Unix(2, 0), + loadPayloadStatus: LoadPayloadStatus{ + Step: "PayloadLoaded", + Message: "Payload loaded version=\"1.0.0-abc\" image=\"image/image:1\"", + LastTransitionTime: time.Unix(3, 0), + Release: configv1.Release{Version: "1.0.0-abc", Image: "image/image:1"}, + }, }, SyncWorkerStatus{ Reconciling: true, @@ -728,8 +837,14 @@ func TestCVO_StartupAndSyncUnverifiedPayload(t *testing.T) { URL: configv1.URL("https://example.com/v1.0.0-abc"), Channels: []string{"channel-a", "channel-b", "channel-c"}, }, - LastProgress: time.Unix(1, 0), + LastProgress: time.Unix(3, 0), Generation: 1, + loadPayloadStatus: LoadPayloadStatus{ + Step: "PayloadLoaded", + Message: "Payload loaded version=\"1.0.0-abc\" image=\"image/image:1\"", + LastTransitionTime: time.Unix(4, 0), + Release: configv1.Release{Version: "1.0.0-abc", Image: "image/image:1"}, + }, }, ) @@ -843,12 +958,12 @@ func TestCVO_StartupAndSyncPreconditionFailing(t *testing.T) { t.Fatal(err) } actions = client.Actions() - if len(actions) != 2 { + if len(actions) != 3 { t.Fatalf("%s", spew.Sdump(actions)) } expectGet(t, actions[0], "clusterversions", "", "version") actual = cvs["version"].(*configv1.ClusterVersion) - expectUpdateStatus(t, actions[1], "clusterversions", "", &configv1.ClusterVersion{ + expectUpdateStatus(t, actions[2], "clusterversions", "", &configv1.ClusterVersion{ ObjectMeta: metav1.ObjectMeta{ Name: "version", Generation: 1, @@ -870,33 +985,38 @@ func TestCVO_StartupAndSyncPreconditionFailing(t *testing.T) { {Type: ClusterStatusFailing, Status: configv1.ConditionFalse}, {Type: configv1.OperatorProgressing, Status: configv1.ConditionTrue, Message: "Working towards 1.0.0-abc"}, {Type: configv1.RetrievedUpdates, Status: configv1.ConditionFalse}, + {Type: DesiredReleaseAccepted, Status: configv1.ConditionTrue, Reason: "PayloadLoaded", + Message: "Payload loaded version=\"1.0.0-abc\" image=\"image/image:1\""}, }, }, }) verifyAllStatus(t, worker.StatusCh(), SyncWorkerStatus{ - Step: "RetrievePayload", - Initial: true, Actual: configv1.Release{Version: "1.0.0-abc", Image: "image/image:1"}, Generation: 1, + loadPayloadStatus: LoadPayloadStatus{ + Step: "RetrievePayload", + Message: "Retrieving and verifying payload version=\"1.0.0-abc\" image=\"image/image:1\"", + LastTransitionTime: time.Unix(1, 0), + Release: configv1.Release{Version: "1.0.0-abc", Image: "image/image:1"}, + }, }, SyncWorkerStatus{ - Total: 3, - Step: "ApplyResources", - Initial: true, - VersionHash: "DL-FFQ2Uem8=", Actual: configv1.Release{ - Version: "1.0.0-abc", - Image: "image/image:1", - URL: configv1.URL("https://example.com/v1.0.0-abc"), - Channels: []string{"channel-a", "channel-b", "channel-c"}, + Version: "1.0.0-abc", + Image: "image/image:1", + }, + Generation: 1, + LastProgress: time.Unix(1, 0), + loadPayloadStatus: LoadPayloadStatus{ + Step: "PayloadLoaded", + Message: "Payload loaded version=\"1.0.0-abc\" image=\"image/image:1\"", + LastTransitionTime: time.Unix(2, 0), + Release: configv1.Release{Version: "1.0.0-abc", Image: "image/image:1"}, }, - Generation: 1, }, SyncWorkerStatus{ - Done: 1, Total: 3, - Step: "ApplyResources", Initial: true, VersionHash: "DL-FFQ2Uem8=", Actual: configv1.Release{ @@ -905,14 +1025,19 @@ func TestCVO_StartupAndSyncPreconditionFailing(t *testing.T) { URL: configv1.URL("https://example.com/v1.0.0-abc"), Channels: []string{"channel-a", "channel-b", "channel-c"}, }, - LastProgress: time.Unix(1, 0), + LastProgress: time.Unix(2, 0), Generation: 1, + loadPayloadStatus: LoadPayloadStatus{ + Step: "PayloadLoaded", + Message: "Payload loaded version=\"1.0.0-abc\" image=\"image/image:1\"", + LastTransitionTime: time.Unix(3, 0), + Release: configv1.Release{Version: "1.0.0-abc", Image: "image/image:1"}, + }, }, SyncWorkerStatus{ - Done: 2, + Done: 1, Total: 3, Initial: true, - Step: "ApplyResources", VersionHash: "DL-FFQ2Uem8=", Actual: configv1.Release{ Version: "1.0.0-abc", @@ -920,14 +1045,19 @@ func TestCVO_StartupAndSyncPreconditionFailing(t *testing.T) { URL: configv1.URL("https://example.com/v1.0.0-abc"), Channels: []string{"channel-a", "channel-b", "channel-c"}, }, - LastProgress: time.Unix(2, 0), + LastProgress: time.Unix(3, 0), Generation: 1, + loadPayloadStatus: LoadPayloadStatus{ + Step: "PayloadLoaded", + Message: "Payload loaded version=\"1.0.0-abc\" image=\"image/image:1\"", + LastTransitionTime: time.Unix(4, 0), + Release: configv1.Release{Version: "1.0.0-abc", Image: "image/image:1"}, + }, }, SyncWorkerStatus{ - Reconciling: true, - Completed: 1, - Done: 3, + Done: 2, Total: 3, + Initial: true, VersionHash: "DL-FFQ2Uem8=", Actual: configv1.Release{ Version: "1.0.0-abc", @@ -935,11 +1065,20 @@ func TestCVO_StartupAndSyncPreconditionFailing(t *testing.T) { URL: configv1.URL("https://example.com/v1.0.0-abc"), Channels: []string{"channel-a", "channel-b", "channel-c"}, }, - LastProgress: time.Unix(3, 0), + LastProgress: time.Unix(4, 0), Generation: 1, + loadPayloadStatus: LoadPayloadStatus{ + Step: "PayloadLoaded", + Message: "Payload loaded version=\"1.0.0-abc\" image=\"image/image:1\"", + LastTransitionTime: time.Unix(5, 0), + Release: configv1.Release{Version: "1.0.0-abc", Image: "image/image:1"}, + }, }, ) + // wait for status to reflect sync of new payload + waitForStatusCompleted(t, worker) + // Step 4: Now that sync is complete, verify status is updated to represent image contents // client.ClearActions() @@ -947,6 +1086,7 @@ func TestCVO_StartupAndSyncPreconditionFailing(t *testing.T) { if err != nil { t.Fatal(err) } + actions = client.Actions() if len(actions) != 2 { t.Fatalf("%s", spew.Sdump(actions)) @@ -975,6 +1115,7 @@ func TestCVO_StartupAndSyncPreconditionFailing(t *testing.T) { }, VersionHash: "DL-FFQ2Uem8=", History: []configv1.UpdateHistory{ + //{State: configv1.PartialUpdate, Image: "image/image:1", Version: "1.0.0-abc", StartedTime: defaultStartedTime}, {State: configv1.CompletedUpdate, Image: "image/image:1", Version: "1.0.0-abc", StartedTime: defaultStartedTime, CompletionTime: &defaultCompletionTime}, }, Conditions: []configv1.ClusterOperatorStatusCondition{ @@ -982,6 +1123,8 @@ func TestCVO_StartupAndSyncPreconditionFailing(t *testing.T) { {Type: ClusterStatusFailing, Status: configv1.ConditionFalse}, {Type: configv1.OperatorProgressing, Status: configv1.ConditionFalse, Message: "Cluster version is 1.0.0-abc"}, {Type: configv1.RetrievedUpdates, Status: configv1.ConditionFalse}, + {Type: DesiredReleaseAccepted, Status: configv1.ConditionTrue, Reason: "PayloadLoaded", + Message: "Payload loaded version=\"1.0.0-abc\" image=\"image/image:1\""}, }, }, }) @@ -992,7 +1135,6 @@ func TestCVO_StartupAndSyncPreconditionFailing(t *testing.T) { SyncWorkerStatus{ Reconciling: true, Total: 3, - Step: "ApplyResources", VersionHash: "DL-FFQ2Uem8=", Actual: configv1.Release{ Version: "1.0.0-abc", @@ -1001,12 +1143,17 @@ func TestCVO_StartupAndSyncPreconditionFailing(t *testing.T) { Channels: []string{"channel-a", "channel-b", "channel-c"}, }, Generation: 1, + loadPayloadStatus: LoadPayloadStatus{ + Step: "PayloadLoaded", + Message: "Payload loaded version=\"1.0.0-abc\" image=\"image/image:1\"", + LastTransitionTime: time.Unix(1, 0), + Release: configv1.Release{Version: "1.0.0-abc", Image: "image/image:1"}, + }, }, SyncWorkerStatus{ Reconciling: true, Done: 1, Total: 3, - Step: "ApplyResources", VersionHash: "DL-FFQ2Uem8=", Actual: configv1.Release{ Version: "1.0.0-abc", @@ -1014,13 +1161,19 @@ func TestCVO_StartupAndSyncPreconditionFailing(t *testing.T) { URL: configv1.URL("https://example.com/v1.0.0-abc"), Channels: []string{"channel-a", "channel-b", "channel-c"}, }, - Generation: 1, + LastProgress: time.Unix(1, 0), + Generation: 1, + loadPayloadStatus: LoadPayloadStatus{ + Step: "PayloadLoaded", + Message: "Payload loaded version=\"1.0.0-abc\" image=\"image/image:1\"", + LastTransitionTime: time.Unix(2, 0), + Release: configv1.Release{Version: "1.0.0-abc", Image: "image/image:1"}, + }, }, SyncWorkerStatus{ Reconciling: true, Done: 2, Total: 3, - Step: "ApplyResources", VersionHash: "DL-FFQ2Uem8=", Actual: configv1.Release{ Version: "1.0.0-abc", @@ -1028,7 +1181,14 @@ func TestCVO_StartupAndSyncPreconditionFailing(t *testing.T) { URL: configv1.URL("https://example.com/v1.0.0-abc"), Channels: []string{"channel-a", "channel-b", "channel-c"}, }, - Generation: 1, + LastProgress: time.Unix(2, 0), + Generation: 1, + loadPayloadStatus: LoadPayloadStatus{ + Step: "PayloadLoaded", + Message: "Payload loaded version=\"1.0.0-abc\" image=\"image/image:1\"", + LastTransitionTime: time.Unix(3, 0), + Release: configv1.Release{Version: "1.0.0-abc", Image: "image/image:1"}, + }, }, SyncWorkerStatus{ Reconciling: true, @@ -1042,8 +1202,14 @@ func TestCVO_StartupAndSyncPreconditionFailing(t *testing.T) { URL: configv1.URL("https://example.com/v1.0.0-abc"), Channels: []string{"channel-a", "channel-b", "channel-c"}, }, - LastProgress: time.Unix(1, 0), + LastProgress: time.Unix(3, 0), Generation: 1, + loadPayloadStatus: LoadPayloadStatus{ + Step: "PayloadLoaded", + Message: "Payload loaded version=\"1.0.0-abc\" image=\"image/image:1\"", + LastTransitionTime: time.Unix(4, 0), + Release: configv1.Release{Version: "1.0.0-abc", Image: "image/image:1"}, + }, }, ) @@ -1089,12 +1255,6 @@ func TestCVO_UpgradeUnverifiedPayload(t *testing.T) { History: []configv1.UpdateHistory{ {State: configv1.CompletedUpdate, Image: "image/image:0", Version: "1.0.0-abc", Verified: true, StartedTime: defaultStartedTime, CompletionTime: &defaultCompletionTime}, }, - Conditions: []configv1.ClusterOperatorStatusCondition{ - {Type: configv1.OperatorAvailable, Status: configv1.ConditionTrue, Message: "Done applying 1.0.0-abc"}, - {Type: ClusterStatusFailing, Status: configv1.ConditionFalse}, - {Type: configv1.OperatorProgressing, Status: configv1.ConditionFalse, Message: "Cluster version is 1.0.0-abc"}, - {Type: configv1.RetrievedUpdates, Status: configv1.ConditionFalse}, - }, }, } @@ -1130,33 +1290,38 @@ func TestCVO_UpgradeUnverifiedPayload(t *testing.T) { verifyAllStatus(t, worker.StatusCh(), SyncWorkerStatus{ - Step: "RetrievePayload", Actual: configv1.Release{Version: "1.0.1-abc", Image: "image/image:1"}, Generation: 1, + loadPayloadStatus: LoadPayloadStatus{ + Step: "RetrievePayload", + Message: "Retrieving and verifying payload version=\"1.0.1-abc\" image=\"image/image:1\"", + LastTransitionTime: time.Unix(1, 0), + Release: configv1.Release{Version: "1.0.1-abc", Image: "image/image:1"}, + }, }, SyncWorkerStatus{ - Step: "RetrievePayload", - Failure: payloadErr, - Actual: configv1.Release{Version: "1.0.1-abc", Image: "image/image:1"}, - Generation: 1, + Actual: configv1.Release{Version: "1.0.1-abc", Image: "image/image:1"}, + Generation: 1, + LastProgress: time.Unix(1, 0), + loadPayloadStatus: LoadPayloadStatus{ + Step: "RetrievePayload", + Message: "Retrieving payload failed version=\"1.0.1-abc\" image=\"image/image:1\" failure=The update cannot be verified: some random error", + LastTransitionTime: time.Unix(2, 0), + Release: configv1.Release{Version: "1.0.1-abc", Image: "image/image:1"}, + Failure: payloadErr, + }, }, ) - - client.ClearActions() - err = o.sync(ctx, o.queueKey()) - if err != nil { - t.Fatal(err) - } actions = client.Actions() - if len(actions) != 2 { + if len(actions) != 3 { t.Fatalf("%s", spew.Sdump(actions)) } expectGet(t, actions[0], "clusterversions", "", "version") actual := cvs["version"].(*configv1.ClusterVersion) - expectUpdateStatus(t, actions[1], "clusterversions", "", &configv1.ClusterVersion{ + expectUpdateStatus(t, actions[2], "clusterversions", "", &configv1.ClusterVersion{ ObjectMeta: metav1.ObjectMeta{ Name: "version", - ResourceVersion: "2", + ResourceVersion: "1", Generation: 1, }, Spec: configv1.ClusterVersionSpec{ @@ -1174,10 +1339,11 @@ func TestCVO_UpgradeUnverifiedPayload(t *testing.T) { {State: configv1.CompletedUpdate, Image: "image/image:0", Version: "1.0.0-abc", Verified: true, StartedTime: defaultStartedTime, CompletionTime: &defaultCompletionTime}, }, Conditions: []configv1.ClusterOperatorStatusCondition{ - {Type: configv1.OperatorAvailable, Status: configv1.ConditionTrue, Message: "Done applying 1.0.0-abc"}, - // cleared failing status and set progressing - {Type: ClusterStatusFailing, Status: configv1.ConditionTrue, Reason: "ImageVerificationFailed", Message: "The update cannot be verified: some random error"}, - {Type: configv1.OperatorProgressing, Status: configv1.ConditionTrue, Reason: "ImageVerificationFailed", Message: "Unable to apply 1.0.1-abc: the image may not be safe to use"}, + {Type: DesiredReleaseAccepted, Status: configv1.ConditionFalse, Reason: "RetrievePayload", + Message: "Retrieving payload failed version=\"1.0.1-abc\" image=\"image/image:1\" failure=The update cannot be verified: some random error"}, + {Type: "Available", Status: "False"}, + {Type: "Failing", Status: "False"}, + {Type: "Progressing", Status: "True", Message: "Working towards 1.0.1-abc"}, {Type: configv1.RetrievedUpdates, Status: configv1.ConditionFalse}, }, }, @@ -1207,9 +1373,9 @@ func TestCVO_UpgradeUnverifiedPayload(t *testing.T) { select { case status = <-worker.StatusCh(): case <-time.After(3 * time.Second): - t.Fatalf("never saw expected sync event") + t.Fatalf("never saw expected retrieve payload event") } - if status.Step == "RetrievePayload" && reflect.DeepEqual(configv1.Release{Version: "1.0.1-abc", Image: "image/image:1"}, status.Actual) { + if reflect.DeepEqual(configv1.Release{Version: "1.0.1-abc", Image: "image/image:1"}, status.Actual) { break } t.Logf("Unexpected status waiting to see first retrieve: %#v", status) @@ -1218,10 +1384,27 @@ func TestCVO_UpgradeUnverifiedPayload(t *testing.T) { t.Fatalf("saw too many sync events of the wrong form") } } + // wait until the new payload is applied + count = 0 + for { + var status SyncWorkerStatus + select { + case status = <-worker.StatusCh(): + case <-time.After(3 * time.Second): + t.Fatalf("never saw expected apply event") + } + if status.loadPayloadStatus.Step == "PayloadLoaded" { + break + } + t.Log("Waiting to see step PayloadLoaded") + count++ + if count > 8 { + t.Fatalf("saw too many sync events of the wrong form") + } + } verifyAllStatus(t, worker.StatusCh(), SyncWorkerStatus{ Total: 3, - Step: "ApplyResources", VersionHash: "DL-FFQ2Uem8=", Actual: configv1.Release{ Version: "1.0.1-abc", @@ -1229,48 +1412,18 @@ func TestCVO_UpgradeUnverifiedPayload(t *testing.T) { URL: configv1.URL("https://example.com/v1.0.1-abc"), }, Generation: 1, - }, - SyncWorkerStatus{ - Done: 1, - Total: 3, - Step: "ApplyResources", - VersionHash: "DL-FFQ2Uem8=", - Actual: configv1.Release{ - Version: "1.0.1-abc", - Image: "image/image:1", - URL: configv1.URL("https://example.com/v1.0.1-abc"), - }, - LastProgress: time.Unix(1, 0), - Generation: 1, - }, - SyncWorkerStatus{ - Done: 2, - Total: 3, - Step: "ApplyResources", - VersionHash: "DL-FFQ2Uem8=", - Actual: configv1.Release{ - Version: "1.0.1-abc", - Image: "image/image:1", - URL: configv1.URL("https://example.com/v1.0.1-abc"), - }, - LastProgress: time.Unix(2, 0), - Generation: 1, - }, - SyncWorkerStatus{ - Reconciling: true, - Completed: 1, - Done: 3, - Total: 3, - VersionHash: "DL-FFQ2Uem8=", - Actual: configv1.Release{ - Version: "1.0.1-abc", - Image: "image/image:1", - URL: configv1.URL("https://example.com/v1.0.1-abc"), + loadPayloadStatus: LoadPayloadStatus{ + Step: "PayloadLoaded", + Message: "Payload loaded version=\"1.0.1-abc\" image=\"image/image:1\"", + LastTransitionTime: time.Unix(1, 0), + Release: configv1.Release{Version: "1.0.1-abc", Image: "image/image:1"}, }, - LastProgress: time.Unix(3, 0), - Generation: 1, }, ) + + // wait for status to reflect sync of new payload + waitForStatusCompleted(t, worker) + client.ClearActions() err = o.sync(ctx, o.queueKey()) if err != nil { @@ -1305,6 +1458,8 @@ func TestCVO_UpgradeUnverifiedPayload(t *testing.T) { {State: configv1.CompletedUpdate, Image: "image/image:0", Version: "1.0.0-abc", Verified: true, StartedTime: defaultStartedTime, CompletionTime: &defaultCompletionTime}, }, Conditions: []configv1.ClusterOperatorStatusCondition{ + {Type: DesiredReleaseAccepted, Status: configv1.ConditionTrue, Reason: "PayloadLoaded", + Message: "Payload loaded version=\"1.0.1-abc\" image=\"image/image:1\""}, {Type: configv1.OperatorAvailable, Status: configv1.ConditionTrue, Message: "Done applying 1.0.1-abc"}, {Type: ClusterStatusFailing, Status: configv1.ConditionFalse}, {Type: configv1.OperatorProgressing, Status: configv1.ConditionFalse, Message: "Cluster version is 1.0.1-abc"}, @@ -1383,33 +1538,38 @@ func TestCVO_UpgradeUnverifiedPayloadRetrieveOnce(t *testing.T) { verifyAllStatus(t, worker.StatusCh(), SyncWorkerStatus{ - Step: "RetrievePayload", Actual: configv1.Release{Version: "1.0.1-abc", Image: "image/image:1"}, Generation: 1, + loadPayloadStatus: LoadPayloadStatus{ + Step: "RetrievePayload", + Message: "Retrieving and verifying payload version=\"1.0.1-abc\" image=\"image/image:1\"", + LastTransitionTime: time.Unix(1, 0), + Release: configv1.Release{Version: "1.0.1-abc", Image: "image/image:1"}, + }, }, SyncWorkerStatus{ - Step: "RetrievePayload", - Failure: payloadErr, - Actual: configv1.Release{Version: "1.0.1-abc", Image: "image/image:1"}, - Generation: 1, + Actual: configv1.Release{Version: "1.0.1-abc", Image: "image/image:1"}, + Generation: 1, + LastProgress: time.Unix(1, 0), + loadPayloadStatus: LoadPayloadStatus{ + Step: "RetrievePayload", + Message: "Retrieving payload failed version=\"1.0.1-abc\" image=\"image/image:1\" failure=The update cannot be verified: some random error", + LastTransitionTime: time.Unix(2, 0), + Release: configv1.Release{Version: "1.0.1-abc", Image: "image/image:1"}, + Failure: payloadErr, + }, }, ) - - client.ClearActions() - err = o.sync(ctx, o.queueKey()) - if err != nil { - t.Fatal(err) - } actions = client.Actions() - if len(actions) != 2 { + if len(actions) != 3 { t.Fatalf("%s", spew.Sdump(actions)) } expectGet(t, actions[0], "clusterversions", "", "version") actual := cvs["version"].(*configv1.ClusterVersion) - expectUpdateStatus(t, actions[1], "clusterversions", "", &configv1.ClusterVersion{ + expectUpdateStatus(t, actions[2], "clusterversions", "", &configv1.ClusterVersion{ ObjectMeta: metav1.ObjectMeta{ Name: "version", - ResourceVersion: "2", + ResourceVersion: "1", Generation: 1, }, Spec: configv1.ClusterVersionSpec{ @@ -1429,9 +1589,11 @@ func TestCVO_UpgradeUnverifiedPayloadRetrieveOnce(t *testing.T) { Conditions: []configv1.ClusterOperatorStatusCondition{ {Type: configv1.OperatorAvailable, Status: configv1.ConditionTrue, Message: "Done applying 1.0.0-abc"}, // cleared failing status and set progressing - {Type: ClusterStatusFailing, Status: configv1.ConditionTrue, Reason: "ImageVerificationFailed", Message: "The update cannot be verified: some random error"}, - {Type: configv1.OperatorProgressing, Status: configv1.ConditionTrue, Reason: "ImageVerificationFailed", Message: "Unable to apply 1.0.1-abc: the image may not be safe to use"}, + {Type: ClusterStatusFailing, Status: configv1.ConditionFalse}, + {Type: configv1.OperatorProgressing, Status: configv1.ConditionTrue, Message: "Working towards 1.0.1-abc"}, {Type: configv1.RetrievedUpdates, Status: configv1.ConditionFalse}, + {Type: DesiredReleaseAccepted, Status: configv1.ConditionFalse, Reason: "RetrievePayload", + Message: "Retrieving payload failed version=\"1.0.1-abc\" image=\"image/image:1\" failure=The update cannot be verified: some random error"}, }, }, }) @@ -1460,9 +1622,9 @@ func TestCVO_UpgradeUnverifiedPayloadRetrieveOnce(t *testing.T) { select { case status = <-worker.StatusCh(): case <-time.After(3 * time.Second): - t.Fatalf("never saw expected sync event") + t.Fatalf("never saw expected retrieve payload event") } - if status.Step == "RetrievePayload" && reflect.DeepEqual(configv1.Release{Version: "1.0.1-abc", Image: "image/image:1"}, status.Actual) { + if reflect.DeepEqual(configv1.Release{Version: "1.0.1-abc", Image: "image/image:1"}, status.Actual) { break } t.Logf("Unexpected status waiting to see first retrieve: %#v", status) @@ -1471,59 +1633,47 @@ func TestCVO_UpgradeUnverifiedPayloadRetrieveOnce(t *testing.T) { t.Fatalf("saw too many sync events of the wrong form") } } + // wait until the new payload is applied + count = 0 + for { + var status SyncWorkerStatus + select { + case status = <-worker.StatusCh(): + case <-time.After(3 * time.Second): + t.Fatalf("never saw expected apply event") + } + if status.loadPayloadStatus.Step == "PayloadLoaded" { + break + } + t.Log("Waiting to see step PayloadLoaded") + count++ + if count > 8 { + t.Fatalf("saw too many sync events of the wrong form") + } + } verifyAllStatus(t, worker.StatusCh(), SyncWorkerStatus{ Total: 3, - Step: "ApplyResources", VersionHash: "DL-FFQ2Uem8=", Actual: configv1.Release{ Version: "1.0.1-abc", Image: "image/image:1", URL: configv1.URL("https://example.com/v1.0.1-abc"), }, + //LastProgress: time.Unix(1, 0), Generation: 1, - }, - SyncWorkerStatus{ - Done: 1, - Total: 3, - Step: "ApplyResources", - VersionHash: "DL-FFQ2Uem8=", - Actual: configv1.Release{ - Version: "1.0.1-abc", - Image: "image/image:1", - URL: configv1.URL("https://example.com/v1.0.1-abc"), - }, - LastProgress: time.Unix(1, 0), - Generation: 1, - }, - SyncWorkerStatus{ - Done: 2, - Total: 3, - Step: "ApplyResources", - VersionHash: "DL-FFQ2Uem8=", - Actual: configv1.Release{ - Version: "1.0.1-abc", - Image: "image/image:1", - URL: configv1.URL("https://example.com/v1.0.1-abc"), + loadPayloadStatus: LoadPayloadStatus{ + Step: "PayloadLoaded", + Message: "Payload loaded version=\"1.0.1-abc\" image=\"image/image:1\"", + LastTransitionTime: time.Unix(1, 0), + Release: configv1.Release{Version: "1.0.1-abc", Image: "image/image:1"}, }, - LastProgress: time.Unix(2, 0), - Generation: 1, - }, - SyncWorkerStatus{ - Reconciling: true, - Completed: 1, - Done: 3, - Total: 3, - VersionHash: "DL-FFQ2Uem8=", - Actual: configv1.Release{ - Version: "1.0.1-abc", - Image: "image/image:1", - URL: configv1.URL("https://example.com/v1.0.1-abc"), - }, - LastProgress: time.Unix(3, 0), - Generation: 1, }, ) + + // wait for status to reflect sync of new payload + waitForStatusCompleted(t, worker) + client.ClearActions() err = o.sync(ctx, o.queueKey()) if err != nil { @@ -1563,6 +1713,8 @@ func TestCVO_UpgradeUnverifiedPayloadRetrieveOnce(t *testing.T) { {Type: ClusterStatusFailing, Status: configv1.ConditionFalse}, {Type: configv1.OperatorProgressing, Status: configv1.ConditionFalse, Message: "Cluster version is 1.0.1-abc"}, {Type: configv1.RetrievedUpdates, Status: configv1.ConditionFalse}, + {Type: DesiredReleaseAccepted, Status: configv1.ConditionTrue, Reason: "PayloadLoaded", + Message: "Payload loaded version=\"1.0.1-abc\" image=\"image/image:1\""}, }, }, }) @@ -1573,7 +1725,6 @@ func TestCVO_UpgradeUnverifiedPayloadRetrieveOnce(t *testing.T) { SyncWorkerStatus{ Reconciling: true, Total: 3, - Step: "ApplyResources", VersionHash: "DL-FFQ2Uem8=", Actual: configv1.Release{ Version: "1.0.1-abc", @@ -1581,32 +1732,50 @@ func TestCVO_UpgradeUnverifiedPayloadRetrieveOnce(t *testing.T) { URL: configv1.URL("https://example.com/v1.0.1-abc"), }, Generation: 1, + loadPayloadStatus: LoadPayloadStatus{ + Step: "PayloadLoaded", + Message: "Payload loaded version=\"1.0.1-abc\" image=\"image/image:1\"", + LastTransitionTime: time.Unix(1, 0), + Release: configv1.Release{Version: "1.0.1-abc", Image: "image/image:1"}, + }, }, SyncWorkerStatus{ Reconciling: true, Done: 1, Total: 3, - Step: "ApplyResources", VersionHash: "DL-FFQ2Uem8=", Actual: configv1.Release{ Version: "1.0.1-abc", Image: "image/image:1", URL: configv1.URL("https://example.com/v1.0.1-abc"), }, - Generation: 1, + Generation: 1, + LastProgress: time.Unix(1, 0), + loadPayloadStatus: LoadPayloadStatus{ + Step: "PayloadLoaded", + Message: "Payload loaded version=\"1.0.1-abc\" image=\"image/image:1\"", + LastTransitionTime: time.Unix(2, 0), + Release: configv1.Release{Version: "1.0.1-abc", Image: "image/image:1"}, + }, }, SyncWorkerStatus{ Reconciling: true, Done: 2, Total: 3, - Step: "ApplyResources", VersionHash: "DL-FFQ2Uem8=", Actual: configv1.Release{ Version: "1.0.1-abc", Image: "image/image:1", URL: configv1.URL("https://example.com/v1.0.1-abc"), }, - Generation: 1, + Generation: 1, + LastProgress: time.Unix(2, 0), + loadPayloadStatus: LoadPayloadStatus{ + Step: "PayloadLoaded", + Message: "Payload loaded version=\"1.0.1-abc\" image=\"image/image:1\"", + LastTransitionTime: time.Unix(3, 0), + Release: configv1.Release{Version: "1.0.1-abc", Image: "image/image:1"}, + }, }, SyncWorkerStatus{ Reconciling: true, @@ -1619,8 +1788,14 @@ func TestCVO_UpgradeUnverifiedPayloadRetrieveOnce(t *testing.T) { Image: "image/image:1", URL: configv1.URL("https://example.com/v1.0.1-abc"), }, - LastProgress: time.Unix(1, 0), + LastProgress: time.Unix(3, 0), Generation: 1, + loadPayloadStatus: LoadPayloadStatus{ + Step: "PayloadLoaded", + Message: "Payload loaded version=\"1.0.1-abc\" image=\"image/image:1\"", + LastTransitionTime: time.Unix(4, 0), + Release: configv1.Release{Version: "1.0.1-abc", Image: "image/image:1"}, + }, }, ) } @@ -1684,20 +1859,26 @@ func TestCVO_UpgradePreconditionFailing(t *testing.T) { verifyAllStatus(t, worker.StatusCh(), SyncWorkerStatus{ - Step: "RetrievePayload", - Actual: configv1.Release{Version: "1.0.1-abc", Image: "image/image:1"}, - Generation: 1, - }, - SyncWorkerStatus{ - Step: "PreconditionChecks", Actual: configv1.Release{Version: "1.0.1-abc", Image: "image/image:1"}, Generation: 1, + loadPayloadStatus: LoadPayloadStatus{ + Step: "RetrievePayload", + Message: "Retrieving and verifying payload version=\"1.0.1-abc\" image=\"image/image:1\"", + LastTransitionTime: time.Unix(1, 0), + Release: configv1.Release{Version: "1.0.1-abc", Image: "image/image:1"}, + }, }, SyncWorkerStatus{ - Step: "PreconditionChecks", - Failure: &payload.UpdateError{Reason: "UpgradePreconditionCheckFailed", Message: "Precondition \"TestPrecondition SuccessAfter: 3\" failed because of \"CheckFailure\": failing, attempt: 1 will succeed after 3 attempt", Name: "PreconditionCheck"}, - Actual: configv1.Release{Version: "1.0.1-abc", Image: "image/image:1"}, - Generation: 1, + Actual: configv1.Release{Version: "1.0.1-abc", Image: "image/image:1"}, + Generation: 1, + LastProgress: time.Unix(1, 0), + loadPayloadStatus: LoadPayloadStatus{ + Step: "PreconditionChecks", + Message: "Preconditions failed for payload loaded version=\"1.0.1-abc\" image=\"image/image:1\": Precondition \"TestPrecondition SuccessAfter: 3\" failed because of \"CheckFailure\": failing, attempt: 1 will succeed after 3 attempt", + LastTransitionTime: time.Unix(2, 0), + Release: configv1.Release{Version: "1.0.1-abc", Image: "image/image:1"}, + Failure: &payload.UpdateError{Reason: "UpgradePreconditionCheckFailed", Message: "Precondition \"TestPrecondition SuccessAfter: 3\" failed because of \"CheckFailure\": failing, attempt: 1 will succeed after 3 attempt", Name: "PreconditionCheck"}, + }, }, ) @@ -1707,40 +1888,11 @@ func TestCVO_UpgradePreconditionFailing(t *testing.T) { t.Fatal(err) } actions = client.Actions() - if len(actions) != 2 { + if len(actions) != 1 { t.Fatalf("%s", spew.Sdump(actions)) } expectGet(t, actions[0], "clusterversions", "", "version") actual := cvs["version"].(*configv1.ClusterVersion) - expectUpdateStatus(t, actions[1], "clusterversions", "", &configv1.ClusterVersion{ - ObjectMeta: metav1.ObjectMeta{ - Name: "version", - ResourceVersion: "2", - Generation: 1, - }, - Spec: configv1.ClusterVersionSpec{ - ClusterID: clusterUID, - Channel: "fast", - DesiredUpdate: &configv1.Update{Version: desired.Version, Image: desired.Image}, - }, - Status: configv1.ClusterVersionStatus{ - // Prefers the image version over the operator's version (although in general they will remain in sync) - ObservedGeneration: 1, - Desired: desired, - VersionHash: "DL-FFQ2Uem8=", - History: []configv1.UpdateHistory{ - {State: configv1.PartialUpdate, Image: "image/image:1", Version: "1.0.1-abc", StartedTime: defaultStartedTime}, - {State: configv1.CompletedUpdate, Image: "image/image:0", Version: "1.0.0-abc", Verified: true, StartedTime: defaultStartedTime, CompletionTime: &defaultCompletionTime}, - }, - Conditions: []configv1.ClusterOperatorStatusCondition{ - {Type: configv1.OperatorAvailable, Status: configv1.ConditionTrue, Message: "Done applying 1.0.0-abc"}, - // cleared failing status and set progressing - {Type: ClusterStatusFailing, Status: configv1.ConditionTrue, Reason: "UpgradePreconditionCheckFailed", Message: "Precondition \"TestPrecondition SuccessAfter: 3\" failed because of \"CheckFailure\": failing, attempt: 1 will succeed after 3 attempt"}, - {Type: configv1.OperatorProgressing, Status: configv1.ConditionTrue, Reason: "UpgradePreconditionCheckFailed", Message: "Unable to apply 1.0.1-abc: it may not be safe to apply this update"}, - {Type: configv1.RetrievedUpdates, Status: configv1.ConditionFalse}, - }, - }, - }) // Step 2: Set allowUnverifiedImages to true, trigger a sync and the operator should apply the payload // @@ -1765,9 +1917,9 @@ func TestCVO_UpgradePreconditionFailing(t *testing.T) { select { case status = <-worker.StatusCh(): case <-time.After(3 * time.Second): - t.Fatalf("never saw expected sync event") + t.Fatalf("never saw expected retrieve payload event") } - if status.Step == "RetrievePayload" && reflect.DeepEqual(configv1.Release{Version: "1.0.1-abc", Image: "image/image:1"}, status.Actual) { + if reflect.DeepEqual(configv1.Release{Version: "1.0.1-abc", Image: "image/image:1"}, status.Actual) { break } t.Logf("Unexpected status waiting to see first retrieve: %#v", status) @@ -1776,15 +1928,27 @@ func TestCVO_UpgradePreconditionFailing(t *testing.T) { t.Fatalf("saw too many sync events of the wrong form") } } + // wait until the new payload is applied + count = 0 + for { + var status SyncWorkerStatus + select { + case status = <-worker.StatusCh(): + case <-time.After(3 * time.Second): + t.Fatalf("never saw expected apply event") + } + if status.loadPayloadStatus.Step == "PayloadLoaded" { + break + } + t.Log("Waiting to see step PayloadLoaded") + count++ + if count > 8 { + t.Fatalf("saw too many sync events of the wrong form") + } + } verifyAllStatus(t, worker.StatusCh(), - SyncWorkerStatus{ - Step: "PreconditionChecks", - Actual: configv1.Release{Version: "1.0.1-abc", Image: "image/image:1"}, - Generation: 1, - }, SyncWorkerStatus{ Total: 3, - Step: "ApplyResources", VersionHash: "DL-FFQ2Uem8=", Actual: configv1.Release{ Version: "1.0.1-abc", @@ -1792,11 +1956,16 @@ func TestCVO_UpgradePreconditionFailing(t *testing.T) { URL: configv1.URL("https://example.com/v1.0.1-abc"), }, Generation: 1, + loadPayloadStatus: LoadPayloadStatus{ + Step: "PayloadLoaded", + Message: "Payload loaded version=\"1.0.1-abc\" image=\"image/image:1\"", + LastTransitionTime: time.Unix(1, 0), + Release: configv1.Release{Version: "1.0.1-abc", Image: "image/image:1"}, + }, }, SyncWorkerStatus{ Done: 1, Total: 3, - Step: "ApplyResources", VersionHash: "DL-FFQ2Uem8=", Actual: configv1.Release{ Version: "1.0.1-abc", @@ -1805,11 +1974,16 @@ func TestCVO_UpgradePreconditionFailing(t *testing.T) { }, LastProgress: time.Unix(1, 0), Generation: 1, + loadPayloadStatus: LoadPayloadStatus{ + Step: "PayloadLoaded", + Message: "Payload loaded version=\"1.0.1-abc\" image=\"image/image:1\"", + LastTransitionTime: time.Unix(2, 0), + Release: configv1.Release{Version: "1.0.1-abc", Image: "image/image:1"}, + }, }, SyncWorkerStatus{ Done: 2, Total: 3, - Step: "ApplyResources", VersionHash: "DL-FFQ2Uem8=", Actual: configv1.Release{ Version: "1.0.1-abc", @@ -1818,27 +1992,24 @@ func TestCVO_UpgradePreconditionFailing(t *testing.T) { }, LastProgress: time.Unix(2, 0), Generation: 1, - }, - SyncWorkerStatus{ - Reconciling: true, - Completed: 1, - Done: 3, - Total: 3, - VersionHash: "DL-FFQ2Uem8=", - Actual: configv1.Release{ - Version: "1.0.1-abc", - Image: "image/image:1", - URL: configv1.URL("https://example.com/v1.0.1-abc"), + loadPayloadStatus: LoadPayloadStatus{ + Step: "PayloadLoaded", + Message: "Payload loaded version=\"1.0.1-abc\" image=\"image/image:1\"", + LastTransitionTime: time.Unix(3, 0), + Release: configv1.Release{Version: "1.0.1-abc", Image: "image/image:1"}, }, - LastProgress: time.Unix(3, 0), - Generation: 1, }, ) + + // wait for status to reflect sync of new payload + waitForStatusCompleted(t, worker) + client.ClearActions() err = o.sync(ctx, o.queueKey()) if err != nil { t.Fatal(err) } + actions = client.Actions() if len(actions) != 2 { t.Fatalf("%s", spew.Sdump(actions)) @@ -1872,6 +2043,8 @@ func TestCVO_UpgradePreconditionFailing(t *testing.T) { {Type: ClusterStatusFailing, Status: configv1.ConditionFalse}, {Type: configv1.OperatorProgressing, Status: configv1.ConditionFalse, Message: "Cluster version is 1.0.1-abc"}, {Type: configv1.RetrievedUpdates, Status: configv1.ConditionFalse}, + {Type: DesiredReleaseAccepted, Status: configv1.ConditionTrue, Reason: "PayloadLoaded", + Message: "Payload loaded version=\"1.0.1-abc\" image=\"image/image:1\""}, }, }, }) @@ -1931,10 +2104,11 @@ func TestCVO_UpgradeVerifiedPayload(t *testing.T) { worker := o.configSync.(*SyncWorker) retriever := worker.retriever.(*fakeDirectoryRetriever) retriever.Set(PayloadInfo{}, payloadErr) + retriever.Set(PayloadInfo{Directory: "testdata/payloadtest-2", Verified: true}, nil) go worker.Start(ctx, 1, o.name, o.cvLister) - // Step 1: The operator should report that it is blocked on unverified content + // Step 1: Simulate a verified payload being retrieved and ensure the operator sets verified // client.ClearActions() err := o.sync(ctx, o.queueKey()) @@ -1942,37 +2116,14 @@ func TestCVO_UpgradeVerifiedPayload(t *testing.T) { t.Fatal(err) } actions := client.Actions() - verifyCVSingleUpdate(t, actions) - - verifyAllStatus(t, worker.StatusCh(), - SyncWorkerStatus{ - Step: "RetrievePayload", - Actual: configv1.Release{Version: "1.0.1-abc", Image: "image/image:1"}, - Generation: 1, - }, - SyncWorkerStatus{ - Step: "RetrievePayload", - Failure: payloadErr, - Actual: configv1.Release{Version: "1.0.1-abc", Image: "image/image:1"}, - Generation: 1, - }, - ) - - client.ClearActions() - err = o.sync(ctx, o.queueKey()) - if err != nil { - t.Fatal(err) - } - actions = client.Actions() - if len(actions) != 2 { + if len(actions) != 3 { t.Fatalf("%s", spew.Sdump(actions)) } expectGet(t, actions[0], "clusterversions", "", "version") - actual := cvs["version"].(*configv1.ClusterVersion) - expectUpdateStatus(t, actions[1], "clusterversions", "", &configv1.ClusterVersion{ + expectUpdateStatus(t, actions[2], "clusterversions", "", &configv1.ClusterVersion{ ObjectMeta: metav1.ObjectMeta{ Name: "version", - ResourceVersion: "2", + ResourceVersion: "1", Generation: 1, }, Spec: configv1.ClusterVersionSpec{ @@ -1992,139 +2143,16 @@ func TestCVO_UpgradeVerifiedPayload(t *testing.T) { Conditions: []configv1.ClusterOperatorStatusCondition{ {Type: configv1.OperatorAvailable, Status: configv1.ConditionTrue, Message: "Done applying 1.0.0-abc"}, // cleared failing status and set progressing - {Type: ClusterStatusFailing, Status: configv1.ConditionTrue, Reason: "ImageVerificationFailed", Message: "The update cannot be verified: some random error"}, - {Type: configv1.OperatorProgressing, Status: configv1.ConditionTrue, Reason: "ImageVerificationFailed", Message: "Unable to apply 1.0.1-abc: the image may not be safe to use"}, - {Type: configv1.RetrievedUpdates, Status: configv1.ConditionFalse}, - }, - }, - }) - - // Step 2: Simulate a verified payload being retrieved and ensure the operator sets verified - // - // set an update - copied := configv1.Update{ - Version: desired.Version, - Image: desired.Image, - } - actual.ObjectMeta.Generation = 2 - actual.Spec.DesiredUpdate = &copied - retriever.Set(PayloadInfo{Directory: "testdata/payloadtest-2", Verified: true}, nil) - // - client.ClearActions() - err = o.sync(ctx, o.queueKey()) - if err != nil { - t.Fatal(err) - } - actions = client.Actions() - if len(actions) != 1 { - t.Fatalf("%s", spew.Sdump(actions)) - } - expectGet(t, actions[0], "clusterversions", "", "version") - - verifyAllStatus(t, worker.StatusCh(), - SyncWorkerStatus{ - Step: "RetrievePayload", - Actual: configv1.Release{Version: "1.0.1-abc", Image: "image/image:1"}, - Generation: 2, - }, - SyncWorkerStatus{ - Total: 3, - Step: "ApplyResources", - VersionHash: "DL-FFQ2Uem8=", - Actual: configv1.Release{ - Version: "1.0.1-abc", - Image: "image/image:1", - URL: configv1.URL("https://example.com/v1.0.1-abc"), - }, - Verified: true, - Generation: 2, - }, - SyncWorkerStatus{ - Done: 1, - Total: 3, - Step: "ApplyResources", - VersionHash: "DL-FFQ2Uem8=", - Actual: configv1.Release{ - Version: "1.0.1-abc", - Image: "image/image:1", - URL: configv1.URL("https://example.com/v1.0.1-abc"), - }, - LastProgress: time.Unix(1, 0), - Verified: true, - Generation: 2, - }, - SyncWorkerStatus{ - Done: 2, - Total: 3, - Step: "ApplyResources", - VersionHash: "DL-FFQ2Uem8=", - Actual: configv1.Release{ - Version: "1.0.1-abc", - Image: "image/image:1", - URL: configv1.URL("https://example.com/v1.0.1-abc"), - }, - LastProgress: time.Unix(2, 0), - Verified: true, - Generation: 2, - }, - SyncWorkerStatus{ - Reconciling: true, - Completed: 1, - Done: 3, - Total: 3, - VersionHash: "DL-FFQ2Uem8=", - Actual: configv1.Release{ - Version: "1.0.1-abc", - Image: "image/image:1", - URL: configv1.URL("https://example.com/v1.0.1-abc"), - }, - LastProgress: time.Unix(3, 0), - Verified: true, - Generation: 2, - }, - ) - client.ClearActions() - err = o.sync(ctx, o.queueKey()) - if err != nil { - t.Fatal(err) - } - actions = client.Actions() - if len(actions) != 2 { - t.Fatalf("%s", spew.Sdump(actions)) - } - expectGet(t, actions[0], "clusterversions", "", "version") - expectUpdateStatus(t, actions[1], "clusterversions", "", &configv1.ClusterVersion{ - ObjectMeta: metav1.ObjectMeta{ - Name: "version", - ResourceVersion: "3", - Generation: 2, - }, - Spec: configv1.ClusterVersionSpec{ - ClusterID: actual.Spec.ClusterID, - Channel: "fast", - DesiredUpdate: &copied, - }, - Status: configv1.ClusterVersionStatus{ - ObservedGeneration: 2, - Desired: configv1.Release{ - Version: "1.0.1-abc", - Image: "image/image:1", - URL: configv1.URL("https://example.com/v1.0.1-abc"), - }, - VersionHash: "DL-FFQ2Uem8=", - History: []configv1.UpdateHistory{ - {State: configv1.CompletedUpdate, Image: "image/image:1", Version: "1.0.1-abc", Verified: true, StartedTime: defaultStartedTime, CompletionTime: &defaultCompletionTime}, - {State: configv1.CompletedUpdate, Image: "image/image:0", Version: "1.0.0-abc", Verified: true, StartedTime: defaultStartedTime, CompletionTime: &defaultCompletionTime}, - }, - Conditions: []configv1.ClusterOperatorStatusCondition{ - {Type: configv1.OperatorAvailable, Status: configv1.ConditionTrue, Message: "Done applying 1.0.1-abc"}, {Type: ClusterStatusFailing, Status: configv1.ConditionFalse}, - {Type: configv1.OperatorProgressing, Status: configv1.ConditionFalse, Message: "Cluster version is 1.0.1-abc"}, + {Type: configv1.OperatorProgressing, Status: configv1.ConditionTrue, Message: "Working towards 1.0.1-abc"}, {Type: configv1.RetrievedUpdates, Status: configv1.ConditionFalse}, + {Type: DesiredReleaseAccepted, Status: configv1.ConditionTrue, Reason: "PayloadLoaded", + Message: "Payload loaded version=\"1.0.1-abc\" image=\"image/image:1\""}, }, }, }) } + func TestCVO_RestartAndReconcile(t *testing.T) { o, cvs, client, _, shutdownFn := setupCVOTest("testdata/payloadtest") @@ -2183,7 +2211,7 @@ func TestCVO_RestartAndReconcile(t *testing.T) { t.Fatal(err) } actions := client.Actions() - if len(actions) != 1 { + if len(actions) != 3 { t.Fatalf("%s", spew.Sdump(actions)) } expectGet(t, actions[0], "clusterversions", "", "version") @@ -2204,26 +2232,31 @@ func TestCVO_RestartAndReconcile(t *testing.T) { verifyAllStatus(t, worker.StatusCh(), SyncWorkerStatus{ Reconciling: true, - Step: "RetrievePayload", Actual: configv1.Release{Version: "1.0.0-abc", Image: "image/image:1"}, + loadPayloadStatus: LoadPayloadStatus{ + Step: "RetrievePayload", + Message: "Retrieving and verifying payload version=\"1.0.0-abc\" image=\"image/image:1\"", + LastTransitionTime: time.Unix(1, 0), + Release: configv1.Release{Version: "1.0.0-abc", Image: "image/image:1"}, + }, }, SyncWorkerStatus{ Reconciling: true, - Total: 3, - Step: "ApplyResources", - VersionHash: "DL-FFQ2Uem8=", Actual: configv1.Release{ - Version: "1.0.0-abc", - Image: "image/image:1", - URL: configv1.URL("https://example.com/v1.0.0-abc"), - Channels: []string{"channel-a", "channel-b", "channel-c"}, + Version: "1.0.0-abc", + Image: "image/image:1", + }, + LastProgress: time.Unix(1, 0), + loadPayloadStatus: LoadPayloadStatus{ + Step: "PayloadLoaded", + Message: "Payload loaded version=\"1.0.0-abc\" image=\"image/image:1\"", + LastTransitionTime: time.Unix(2, 0), + Release: configv1.Release{Version: "1.0.0-abc", Image: "image/image:1"}, }, }, SyncWorkerStatus{ Reconciling: true, - Done: 1, Total: 3, - Step: "ApplyResources", VersionHash: "DL-FFQ2Uem8=", Actual: configv1.Release{ Version: "1.0.0-abc", @@ -2231,13 +2264,18 @@ func TestCVO_RestartAndReconcile(t *testing.T) { URL: configv1.URL("https://example.com/v1.0.0-abc"), Channels: []string{"channel-a", "channel-b", "channel-c"}, }, - LastProgress: time.Unix(1, 0), + LastProgress: time.Unix(2, 0), + loadPayloadStatus: LoadPayloadStatus{ + Step: "PayloadLoaded", + Message: "Payload loaded version=\"1.0.0-abc\" image=\"image/image:1\"", + LastTransitionTime: time.Unix(3, 0), + Release: configv1.Release{Version: "1.0.0-abc", Image: "image/image:1"}, + }, }, SyncWorkerStatus{ Reconciling: true, - Done: 2, + Done: 1, Total: 3, - Step: "ApplyResources", VersionHash: "DL-FFQ2Uem8=", Actual: configv1.Release{ Version: "1.0.0-abc", @@ -2245,12 +2283,17 @@ func TestCVO_RestartAndReconcile(t *testing.T) { URL: configv1.URL("https://example.com/v1.0.0-abc"), Channels: []string{"channel-a", "channel-b", "channel-c"}, }, - LastProgress: time.Unix(2, 0), + LastProgress: time.Unix(3, 0), + loadPayloadStatus: LoadPayloadStatus{ + Step: "PayloadLoaded", + Message: "Payload loaded version=\"1.0.0-abc\" image=\"image/image:1\"", + LastTransitionTime: time.Unix(4, 0), + Release: configv1.Release{Version: "1.0.0-abc", Image: "image/image:1"}, + }, }, SyncWorkerStatus{ Reconciling: true, - Completed: 1, - Done: 3, + Done: 2, Total: 3, VersionHash: "DL-FFQ2Uem8=", Actual: configv1.Release{ @@ -2259,7 +2302,13 @@ func TestCVO_RestartAndReconcile(t *testing.T) { URL: configv1.URL("https://example.com/v1.0.0-abc"), Channels: []string{"channel-a", "channel-b", "channel-c"}, }, - LastProgress: time.Unix(3, 0), + LastProgress: time.Unix(4, 0), + loadPayloadStatus: LoadPayloadStatus{ + Step: "PayloadLoaded", + Message: "Payload loaded version=\"1.0.0-abc\" image=\"image/image:1\"", + LastTransitionTime: time.Unix(5, 0), + Release: configv1.Release{Version: "1.0.0-abc", Image: "image/image:1"}, + }, }, ) client.ClearActions() @@ -2280,8 +2329,9 @@ func TestCVO_RestartAndReconcile(t *testing.T) { // note that the image is not retrieved a second time SyncWorkerStatus{ Reconciling: true, + Completed: 1, + Done: 3, Total: 3, - Step: "ApplyResources", VersionHash: "DL-FFQ2Uem8=", Actual: configv1.Release{ Version: "1.0.0-abc", @@ -2289,12 +2339,17 @@ func TestCVO_RestartAndReconcile(t *testing.T) { URL: configv1.URL("https://example.com/v1.0.0-abc"), Channels: []string{"channel-a", "channel-b", "channel-c"}, }, + LastProgress: time.Unix(1, 0), + loadPayloadStatus: LoadPayloadStatus{ + Step: "PayloadLoaded", + Message: "Payload loaded version=\"1.0.0-abc\" image=\"image/image:1\"", + LastTransitionTime: time.Unix(1, 0), + Release: configv1.Release{Version: "1.0.0-abc", Image: "image/image:1"}, + }, }, SyncWorkerStatus{ Reconciling: true, - Done: 1, Total: 3, - Step: "ApplyResources", VersionHash: "DL-FFQ2Uem8=", Actual: configv1.Release{ Version: "1.0.0-abc", @@ -2302,12 +2357,18 @@ func TestCVO_RestartAndReconcile(t *testing.T) { URL: configv1.URL("https://example.com/v1.0.0-abc"), Channels: []string{"channel-a", "channel-b", "channel-c"}, }, + LastProgress: time.Unix(2, 0), + loadPayloadStatus: LoadPayloadStatus{ + Step: "PayloadLoaded", + Message: "Payload loaded version=\"1.0.0-abc\" image=\"image/image:1\"", + LastTransitionTime: time.Unix(2, 0), + Release: configv1.Release{Version: "1.0.0-abc", Image: "image/image:1"}, + }, }, SyncWorkerStatus{ Reconciling: true, - Done: 2, + Done: 1, Total: 3, - Step: "ApplyResources", VersionHash: "DL-FFQ2Uem8=", Actual: configv1.Release{ Version: "1.0.0-abc", @@ -2315,11 +2376,17 @@ func TestCVO_RestartAndReconcile(t *testing.T) { URL: configv1.URL("https://example.com/v1.0.0-abc"), Channels: []string{"channel-a", "channel-b", "channel-c"}, }, + LastProgress: time.Unix(3, 0), + loadPayloadStatus: LoadPayloadStatus{ + Step: "PayloadLoaded", + Message: "Payload loaded version=\"1.0.0-abc\" image=\"image/image:1\"", + LastTransitionTime: time.Unix(3, 0), + Release: configv1.Release{Version: "1.0.0-abc", Image: "image/image:1"}, + }, }, SyncWorkerStatus{ Reconciling: true, - Completed: 2, - Done: 3, + Done: 2, Total: 3, VersionHash: "DL-FFQ2Uem8=", Actual: configv1.Release{ @@ -2328,7 +2395,13 @@ func TestCVO_RestartAndReconcile(t *testing.T) { URL: configv1.URL("https://example.com/v1.0.0-abc"), Channels: []string{"channel-a", "channel-b", "channel-c"}, }, - LastProgress: time.Unix(1, 0), + LastProgress: time.Unix(4, 0), + loadPayloadStatus: LoadPayloadStatus{ + Step: "PayloadLoaded", + Message: "Payload loaded version=\"1.0.0-abc\" image=\"image/image:1\"", + LastTransitionTime: time.Unix(4, 0), + Release: configv1.Release{Version: "1.0.0-abc", Image: "image/image:1"}, + }, }, ) client.ClearActions() @@ -2400,7 +2473,7 @@ func TestCVO_ErrorDuringReconcile(t *testing.T) { t.Fatal(err) } actions := client.Actions() - if len(actions) != 1 { + if len(actions) != 3 { t.Fatalf("%s", spew.Sdump(actions)) } expectGet(t, actions[0], "clusterversions", "", "version") @@ -2418,26 +2491,33 @@ func TestCVO_ErrorDuringReconcile(t *testing.T) { verifyAllStatus(t, worker.StatusCh(), SyncWorkerStatus{ Reconciling: true, - Step: "RetrievePayload", Actual: configv1.Release{Version: "1.0.0-abc", Image: "image/image:1"}, + loadPayloadStatus: LoadPayloadStatus{ + Step: "RetrievePayload", + Message: "Retrieving and verifying payload version=\"1.0.0-abc\" image=\"image/image:1\"", + LastTransitionTime: time.Unix(1, 0), + Release: configv1.Release{Version: "1.0.0-abc", Image: "image/image:1"}, + }, }, SyncWorkerStatus{ Reconciling: true, - Total: 3, - Step: "ApplyResources", - VersionHash: "DL-FFQ2Uem8=", Actual: configv1.Release{ - Version: "1.0.0-abc", - Image: "image/image:1", - URL: configv1.URL("https://example.com/v1.0.0-abc"), - Channels: []string{"channel-a", "channel-b", "channel-c"}, + Version: "1.0.0-abc", + Image: "image/image:1", + }, + LastProgress: time.Unix(1, 0), + loadPayloadStatus: LoadPayloadStatus{ + Step: "PayloadLoaded", + Message: "Payload loaded version=\"1.0.0-abc\" image=\"image/image:1\"", + LastTransitionTime: time.Unix(2, 0), + Release: configv1.Release{Version: "1.0.0-abc", Image: "image/image:1"}, }, }, ) // verify we haven't observed any other events verifyAllStatus(t, worker.StatusCh()) - // Step 3: Simulate a sync being triggered while we are partway through our first + // Step 2: Simulate a sync being triggered while we are partway through our first // reconcile sync and verify status is not updated // client.ClearActions() @@ -2445,13 +2525,14 @@ func TestCVO_ErrorDuringReconcile(t *testing.T) { if err != nil { t.Fatal(err) } + actions = client.Actions() if len(actions) != 1 { t.Fatalf("%s", spew.Sdump(actions)) } expectGet(t, actions[0], "clusterversions", "", "version") - // Step 4: Unblock the first item from being applied + // Step 3: Unblock the first item from being applied // b.Send(nil) // @@ -2459,9 +2540,7 @@ func TestCVO_ErrorDuringReconcile(t *testing.T) { verifyAllStatus(t, worker.StatusCh(), SyncWorkerStatus{ Reconciling: true, - Done: 1, Total: 3, - Step: "ApplyResources", VersionHash: "DL-FFQ2Uem8=", Actual: configv1.Release{ Version: "1.0.0-abc", @@ -2469,22 +2548,28 @@ func TestCVO_ErrorDuringReconcile(t *testing.T) { URL: configv1.URL("https://example.com/v1.0.0-abc"), Channels: []string{"channel-a", "channel-b", "channel-c"}, }, - LastProgress: time.Unix(1, 0), + loadPayloadStatus: LoadPayloadStatus{ + Step: "PayloadLoaded", + Message: "Payload loaded version=\"1.0.0-abc\" image=\"image/image:1\"", + LastTransitionTime: time.Unix(1, 0), + Release: configv1.Release{Version: "1.0.0-abc", Image: "image/image:1"}, + }, }, ) - verifyAllStatus(t, worker.StatusCh()) + clearAllStatus(t, worker.StatusCh()) - // Step 5: Unblock the first item from being applied + // Step 4: Unblock the first item from being applied // b.Send(nil) // - // verify we observe the remaining changes in the first sync - verifyAllStatus(t, worker.StatusCh(), + // Verify we observe the remaining changes in the first sync. Since timing is + // non-deterministic, use this instead of verifyAllStatus when don't know or + // care how many are done. + verifyAllStatusOptionalDone(t, true, worker.StatusCh(), SyncWorkerStatus{ Reconciling: true, Done: 2, Total: 3, - Step: "ApplyResources", VersionHash: "DL-FFQ2Uem8=", Actual: configv1.Release{ Version: "1.0.0-abc", @@ -2493,11 +2578,17 @@ func TestCVO_ErrorDuringReconcile(t *testing.T) { Channels: []string{"channel-a", "channel-b", "channel-c"}, }, LastProgress: time.Unix(1, 0), + loadPayloadStatus: LoadPayloadStatus{ + Step: "PayloadLoaded", + Message: "Payload loaded version=\"1.0.0-abc\" image=\"image/image:1\"", + LastTransitionTime: time.Unix(1, 0), + Release: configv1.Release{Version: "1.0.0-abc", Image: "image/image:1"}, + }, }, ) - verifyAllStatus(t, worker.StatusCh()) + clearAllStatus(t, worker.StatusCh()) - // Step 6: Send an error, then verify it shows up in status + // Step 5: Send an error, then verify it shows up in status // b.Send(fmt.Errorf("unable to proceed")) @@ -2516,7 +2607,6 @@ func TestCVO_ErrorDuringReconcile(t *testing.T) { verifyAllStatus(t, worker.StatusCh(), SyncWorkerStatus{ Reconciling: true, - Step: "ApplyResources", Done: 2, Total: 3, VersionHash: "DL-FFQ2Uem8=", @@ -2533,6 +2623,12 @@ func TestCVO_ErrorDuringReconcile(t *testing.T) { Channels: []string{"channel-a", "channel-b", "channel-c"}, }, LastProgress: time.Unix(1, 0), + loadPayloadStatus: LoadPayloadStatus{ + Step: "PayloadLoaded", + Message: "Payload loaded version=\"1.0.0-abc\" image=\"image/image:1\"", + LastTransitionTime: time.Unix(1, 0), + Release: configv1.Release{Version: "1.0.0-abc", Image: "image/image:1"}, + }, }, ) client.ClearActions() @@ -2548,7 +2644,7 @@ func TestCVO_ErrorDuringReconcile(t *testing.T) { expectUpdateStatus(t, actions[1], "clusterversions", "", &configv1.ClusterVersion{ ObjectMeta: metav1.ObjectMeta{ Name: "version", - ResourceVersion: "1", + ResourceVersion: "2", }, Spec: configv1.ClusterVersionSpec{ ClusterID: clusterUID, @@ -2571,6 +2667,8 @@ func TestCVO_ErrorDuringReconcile(t *testing.T) { {Type: ClusterStatusFailing, Status: configv1.ConditionTrue, Reason: "UpdatePayloadFailed", Message: "Could not update test \"file-yml\" (3 of 3)"}, {Type: configv1.OperatorProgressing, Status: configv1.ConditionFalse, Reason: "UpdatePayloadFailed", Message: "Error while reconciling 1.0.0-abc: the update could not be applied"}, {Type: configv1.RetrievedUpdates, Status: configv1.ConditionFalse}, + {Type: DesiredReleaseAccepted, Status: configv1.ConditionTrue, Reason: "PayloadLoaded", + Message: "Payload loaded version=\"1.0.0-abc\" image=\"image/image:1\""}, }, }, }) @@ -2632,7 +2730,7 @@ func TestCVO_ParallelError(t *testing.T) { t.Fatal(err) } actions := client.Actions() - if len(actions) != 2 { + if len(actions) != 3 { t.Fatalf("%s", spew.Sdump(actions)) } expectGet(t, actions[0], "clusterversions", "", "version") @@ -2645,28 +2743,30 @@ func TestCVO_ParallelError(t *testing.T) { t.Fatalf("The worker should be reconciling: %v", worker.work) } - // Step 2: Start the sync worker and verify the sequence of events + // Step 2: Start the sync worker and wait for the payload to be loaded // cancellable, cancel := context.WithCancel(ctx) defer cancel() go worker.Start(cancellable, 1, o.name, o.cvLister) - // - verifyAllStatus(t, worker.StatusCh(), - SyncWorkerStatus{ - Initial: true, - Step: "RetrievePayload", - Actual: configv1.Release{Version: "1.0.0-abc", Image: "image/image:1"}, - Generation: 1, - }, - SyncWorkerStatus{ - Initial: true, - Total: 3, - Step: "ApplyResources", - VersionHash: "Gyh2W6qcDO4=", - Actual: configv1.Release{Version: "1.0.0-abc", Image: "image/image:1"}, - Generation: 1, - }, - ) + + // wait until the new payload is applied + count := 0 + for { + var status SyncWorkerStatus + select { + case status = <-worker.StatusCh(): + case <-time.After(3 * time.Second): + t.Fatalf("never saw expected apply event") + } + if status.loadPayloadStatus.Step == "PayloadLoaded" { + break + } + t.Log("Waiting to see step PayloadLoaded") + count++ + if count > 8 { + t.Fatalf("saw too many sync events of the wrong form") + } + } // Step 3: Cancel after we've accumulated 2/3 errors // @@ -2681,11 +2781,16 @@ func TestCVO_ParallelError(t *testing.T) { Initial: true, Done: status.Done, Total: 3, - Step: "ApplyResources", VersionHash: "Gyh2W6qcDO4=", Actual: configv1.Release{Version: "1.0.0-abc", Image: "image/image:1"}, LastProgress: status.LastProgress, Generation: 1, + loadPayloadStatus: LoadPayloadStatus{ + Step: "PayloadLoaded", + Message: "Payload loaded version=\"1.0.0-abc\" image=\"image/image:1\"", + LastTransitionTime: status.loadPayloadStatus.LastTransitionTime, + Release: configv1.Release{Version: "1.0.0-abc", Image: "image/image:1"}, + }, }) { t.Fatalf("unexpected status: %v", status) } @@ -2705,11 +2810,16 @@ func TestCVO_ParallelError(t *testing.T) { Failure: err, Done: 1, Total: 3, - Step: "ApplyResources", VersionHash: "Gyh2W6qcDO4=", Actual: configv1.Release{Version: "1.0.0-abc", Image: "image/image:1"}, LastProgress: status.LastProgress, Generation: 1, + loadPayloadStatus: LoadPayloadStatus{ + Step: "PayloadLoaded", + Message: "Payload loaded version=\"1.0.0-abc\" image=\"image/image:1\"", + LastTransitionTime: status.loadPayloadStatus.LastTransitionTime, + Release: configv1.Release{Version: "1.0.0-abc", Image: "image/image:1"}, + }, }) { t.Fatalf("unexpected final: %v", status) } @@ -2746,6 +2856,8 @@ func TestCVO_ParallelError(t *testing.T) { {State: configv1.PartialUpdate, Image: "image/image:1", Version: "1.0.0-abc", StartedTime: defaultStartedTime}, }, Conditions: []configv1.ClusterOperatorStatusCondition{ + {Type: DesiredReleaseAccepted, Status: configv1.ConditionTrue, Reason: "PayloadLoaded", + Message: "Payload loaded version=\"1.0.0-abc\" image=\"image/image:1\""}, {Type: configv1.OperatorAvailable, Status: configv1.ConditionFalse}, {Type: ClusterStatusFailing, Status: configv1.ConditionFalse}, {Type: configv1.OperatorProgressing, Status: configv1.ConditionTrue, Reason: "ClusterOperatorsNotAvailable", Message: "Working towards 1.0.0-abc: 1 of 3 done (33% complete), waiting on operator-1, operator-2"}, @@ -2891,6 +3003,12 @@ func verifyCVSingleUpdate(t *testing.T, actions []clientgotesting.Action) { } func verifyAllStatus(t *testing.T, ch <-chan SyncWorkerStatus, items ...SyncWorkerStatus) { + verifyAllStatusOptionalDone(t, false, ch, items...) +} + +// Since timing can be non-deterministic, use this instead of verifyAllStatus when +// don't know or care how many are done. +func verifyAllStatusOptionalDone(t *testing.T, ignoreDone bool, ch <-chan SyncWorkerStatus, items ...SyncWorkerStatus) { t.Helper() if len(items) == 0 { if len(ch) > 0 { @@ -2900,6 +3018,7 @@ func verifyAllStatus(t *testing.T, ch <-chan SyncWorkerStatus, items ...SyncWork } var lastTime time.Time count := int64(1) + count2 := int64(1) for i, expect := range items { actual, ok := <-ch if !ok { @@ -2913,6 +3032,17 @@ func verifyAllStatus(t *testing.T, ch <-chan SyncWorkerStatus, items ...SyncWork actual.LastProgress = time.Unix(count, 0) } + lastTime = time.Unix(0, 0) + if nextTime := actual.loadPayloadStatus.LastTransitionTime; !nextTime.Equal(lastTime) { + actual.loadPayloadStatus.LastTransitionTime = time.Unix(count2, 0) + count2++ + } else if !lastTime.IsZero() { + actual.loadPayloadStatus.LastTransitionTime = time.Unix(count2, 0) + } + if ignoreDone { + expect.Done = actual.Done + } + if !reflect.DeepEqual(expect, actual) { t.Fatalf("unexpected status item %d\nExpected: %#v\nActual: %#v", i, expect, actual) } @@ -2949,3 +3079,39 @@ func (b *errorResourceBuilder) Apply(ctx context.Context, m *manifest.Manifest, } return fmt.Errorf("unknown file %s", m.OriginalFilename) } + +// wait for status completed +func waitForStatusCompleted(t *testing.T, worker *SyncWorker) { + count := 0 + for { + var status SyncWorkerStatus + select { + case status = <-worker.StatusCh(): + case <-time.After(3 * time.Second): + t.Fatalf("never saw status Completed > 0") + } + if status.Completed > 0 { + break + } + t.Log("Waiting for Completed > 0") + count++ + if count > 8 { + t.Fatalf("saw too many sync events of the wrong form") + } + } +} + +func clearAllStatus(t *testing.T, ch <-chan SyncWorkerStatus) { + count := 0 + for { + if len(ch) <= 0 { + break + } + <-ch + t.Log("Waiting for SyncWorkerStatus to clear") + count++ + if count > 8 { + t.Fatalf("Waited too long for SyncWorkerStatus to clear") + } + } +} diff --git a/pkg/cvo/cvo_test.go b/pkg/cvo/cvo_test.go index 6342326d5..a66a0a46c 100644 --- a/pkg/cvo/cvo_test.go +++ b/pkg/cvo/cvo_test.go @@ -222,7 +222,6 @@ func TestOperator_sync(t *testing.T) { { name: "progressing and previously failed, not reconciling", syncStatus: &SyncWorkerStatus{ - Step: "Moving", Reconciling: false, Actual: configv1.Release{Version: "0.0.1-abc", Image: "image/image:v4.0.1"}, Failure: &payload.UpdateError{ @@ -303,7 +302,6 @@ func TestOperator_sync(t *testing.T) { name: "default", configSync: &fakeSyncRecorder{ Returns: &SyncWorkerStatus{ - Step: "Moving", Reconciling: true, Actual: configv1.Release{Version: "0.0.1-abc", Image: "image/image:v4.0.1"}, Failure: &payload.UpdateError{ @@ -378,7 +376,6 @@ func TestOperator_sync(t *testing.T) { name: "default", configSync: &fakeSyncRecorder{ Returns: &SyncWorkerStatus{ - Step: "Moving", Reconciling: true, Completed: 2, Actual: configv1.Release{Version: "0.0.1-abc", Image: "image/image:v4.0.1"}, @@ -454,7 +451,6 @@ func TestOperator_sync(t *testing.T) { name: "default", configSync: &fakeSyncRecorder{ Returns: &SyncWorkerStatus{ - Step: "Moving", Actual: configv1.Release{Version: "0.0.1-abc", Image: "image/image:v4.0.1"}, Failure: fmt.Errorf("injected error"), VersionHash: "foo", @@ -572,7 +568,6 @@ func TestOperator_sync(t *testing.T) { { name: "invalid image while progressing preserves progressing order and partial history", syncStatus: &SyncWorkerStatus{ - Step: "Working", Done: 600, Total: 1000, Failure: os.ErrNotExist, @@ -1009,7 +1004,6 @@ func TestOperator_sync(t *testing.T) { name: "report partial retrieved version", syncStatus: &SyncWorkerStatus{ Actual: configv1.Release{Image: "image/image:v4.0.1"}, - Step: "RetrievePayload", }, optr: &Operator{ release: configv1.Release{ @@ -1099,7 +1093,7 @@ func TestOperator_sync(t *testing.T) { {Type: configv1.OperatorAvailable, Status: configv1.ConditionFalse}, {Type: ClusterStatusFailing, Status: configv1.ConditionFalse}, // we correct the message that was incorrect from the previous state - {Type: configv1.OperatorProgressing, Status: configv1.ConditionTrue, Reason: "DownloadingUpdate", Message: "Working towards image/image:v4.0.1: downloading update"}, + {Type: configv1.OperatorProgressing, Status: configv1.ConditionTrue, Message: "Working towards image/image:v4.0.1"}, {Type: configv1.RetrievedUpdates, Status: configv1.ConditionFalse}, }, }, @@ -3993,6 +3987,7 @@ func expectMutation(t *testing.T, a ktesting.Action, verb string, resource, subr t.Fatalf("unexpected action: %#v", at) } if !reflect.DeepEqual(expect, actual) { + t.Logf("%#v", actual) t.Fatalf("unexpected object: %s", diff.ObjectReflectDiff(expect, actual)) } default: diff --git a/pkg/cvo/status.go b/pkg/cvo/status.go index b96894edc..0fdb83fda 100644 --- a/pkg/cvo/status.go +++ b/pkg/cvo/status.go @@ -155,10 +155,14 @@ func pruneStatusHistory(config *configv1.ClusterVersion, maxHistory int) { // condition is set. const ClusterVersionInvalid configv1.ClusterStatusConditionType = "Invalid" +// DesiredReleaseAccepted indicates whether the requested (desired) release payload was successfully loaded +// and no failures occurred during image verification and precondition checking. +const DesiredReleaseAccepted configv1.ClusterStatusConditionType = "ReleaseAccepted" + // syncStatus calculates the new status of the ClusterVersion based on the current sync state and any // validation errors found. We allow the caller to pass the original object to avoid DeepCopying twice. func (optr *Operator) syncStatus(ctx context.Context, original, config *configv1.ClusterVersion, status *SyncWorkerStatus, validationErrs field.ErrorList) error { - klog.V(2).Infof("Synchronizing errs=%#v status=%#v", validationErrs, status) + klog.V(2).Infof("Synchronizing status errs=%#v status=%#v", validationErrs, status) cvUpdated := false // update the config with the latest available updates @@ -222,13 +226,15 @@ func (optr *Operator) syncStatus(ctx context.Context, original, config *configv1 resourcemerge.RemoveOperatorStatusCondition(&config.Status.Conditions, ClusterVersionInvalid) } + // set the desired release accepted condition + setDesiredReleaseAcceptedCondition(config, status.loadPayloadStatus, now) + // set the available condition if status.Completed > 0 { resourcemerge.SetOperatorStatusCondition(&config.Status.Conditions, configv1.ClusterOperatorStatusCondition{ - Type: configv1.OperatorAvailable, - Status: configv1.ConditionTrue, - Message: fmt.Sprintf("Done applying %s", version), - + Type: configv1.OperatorAvailable, + Status: configv1.ConditionTrue, + Message: fmt.Sprintf("Done applying %s", version), LastTransitionTime: now, }) } @@ -313,11 +319,6 @@ func (optr *Operator) syncStatus(ctx context.Context, original, config *configv1 case fractionComplete > 0: message = fmt.Sprintf("Working towards %s: %d of %d done (%.0f%% complete)", version, status.Done, status.Total, math.Trunc(float64(fractionComplete*100))) - case status.Step == "RetrievePayload": - if len(reason) == 0 { - reason = "DownloadingUpdate" - } - message = fmt.Sprintf("Working towards %s: downloading update", version) case skipFailure: reason = progressReason message = fmt.Sprintf("Working towards %s: %s", version, progressMessage) @@ -351,6 +352,36 @@ func (optr *Operator) syncStatus(ctx context.Context, original, config *configv1 return err } +func setDesiredReleaseAcceptedCondition(config *configv1.ClusterVersion, status LoadPayloadStatus, now metav1.Time) { + if status.Step == "PayloadLoaded" { + resourcemerge.SetOperatorStatusCondition(&config.Status.Conditions, configv1.ClusterOperatorStatusCondition{ + Type: DesiredReleaseAccepted, + Status: configv1.ConditionTrue, + Reason: status.Step, + Message: status.Message, + LastTransitionTime: now, + }) + } else if status.Step != "" { + if status.Failure != nil { + resourcemerge.SetOperatorStatusCondition(&config.Status.Conditions, configv1.ClusterOperatorStatusCondition{ + Type: DesiredReleaseAccepted, + Status: configv1.ConditionFalse, + Reason: status.Step, + Message: status.Message, + LastTransitionTime: now, + }) + } else { + resourcemerge.SetOperatorStatusCondition(&config.Status.Conditions, configv1.ClusterOperatorStatusCondition{ + Type: DesiredReleaseAccepted, + Status: configv1.ConditionUnknown, + Reason: status.Step, + Message: status.Message, + LastTransitionTime: now, + }) + } + } +} + // convertErrorToProgressing returns true if the provided status indicates a failure condition can be interpreted as // still making internal progress. The general error we try to suppress is an operator or operators still being // unavailable AND the general payload task making progress towards its goal. The error's UpdateEffect determines diff --git a/pkg/cvo/sync_test.go b/pkg/cvo/sync_test.go index f960d0da5..92df47d10 100644 --- a/pkg/cvo/sync_test.go +++ b/pkg/cvo/sync_test.go @@ -99,7 +99,7 @@ func Test_SyncWorker_apply(t *testing.T) { cancelAfter: 2, wantErr: true, check: func(t *testing.T, actions []action) { - if len(actions) != 3 { + if len(actions) < 1 { spew.Dump(actions) t.Fatalf("unexpected %d actions", len(actions)) } @@ -143,8 +143,9 @@ func Test_SyncWorker_apply(t *testing.T) { cancel: cancel, remainingErrors: test.cancelAfter, } + worker.payload = up - err := worker.apply(ctx, up, &SyncWork{}, 1, &statusWrapper{w: worker, previousStatus: worker.Status()}) + err := worker.apply(ctx, &SyncWork{}, 1, &statusWrapper{w: worker, previousStatus: worker.Status()}) if !test.wantErr && err != nil { t.Fatal(err) } @@ -337,8 +338,9 @@ func Test_SyncWorker_apply_generic(t *testing.T) { client: dynamicClient, modifiers: test.modifiers, } + worker.payload = up ctx := context.Background() - err := worker.apply(ctx, up, &SyncWork{}, 1, &statusWrapper{w: worker, previousStatus: worker.Status()}) + err := worker.apply(ctx, &SyncWork{}, 1, &statusWrapper{w: worker, previousStatus: worker.Status()}) if err != nil { t.Fatal(err) } @@ -411,7 +413,7 @@ func (r *fakeSyncRecorder) StatusCh() <-chan SyncWorkerStatus { func (r *fakeSyncRecorder) Start(ctx context.Context, maxWorkers int, cvoOptrName string, lister configlistersv1.ClusterVersionLister) { } -func (r *fakeSyncRecorder) Update(generation int64, desired configv1.Update, overrides []configv1.ComponentOverride, state payload.State) *SyncWorkerStatus { +func (r *fakeSyncRecorder) Update(ctx context.Context, generation int64, desired configv1.Update, overrides []configv1.ComponentOverride, state payload.State, cvoOptrName string, lister configlistersv1.ClusterVersionLister) *SyncWorkerStatus { r.Updates = append(r.Updates, desired) return r.Returns } diff --git a/pkg/cvo/sync_worker.go b/pkg/cvo/sync_worker.go index df633b814..77d982bba 100644 --- a/pkg/cvo/sync_worker.go +++ b/pkg/cvo/sync_worker.go @@ -30,7 +30,7 @@ import ( // ConfigSyncWorker abstracts how the image is synchronized to the server. Introduced for testing. type ConfigSyncWorker interface { Start(ctx context.Context, maxWorkers int, cvoOptrName string, lister configlistersv1.ClusterVersionLister) - Update(generation int64, desired configv1.Update, overrides []configv1.ComponentOverride, state payload.State) *SyncWorkerStatus + Update(ctx context.Context, generation int64, desired configv1.Update, overrides []configv1.ComponentOverride, state payload.State, cvoOptrName string, lister configlistersv1.ClusterVersionLister) *SyncWorkerStatus StatusCh() <-chan SyncWorkerStatus } @@ -59,6 +59,8 @@ type PayloadRetriever interface { // StatusReporter abstracts how status is reported by the worker run method. Introduced for testing. type StatusReporter interface { Report(status SyncWorkerStatus) + ReportPayload(payLoadStatus LoadPayloadStatus) + ValidPayloadStatus(release configv1.Release) bool } // SyncWork represents the work that should be done in a sync iteration. @@ -80,11 +82,19 @@ func (w SyncWork) Empty() bool { return len(w.Desired.Image) == 0 } +type LoadPayloadStatus struct { + Step string + Message string + Failure error + Release configv1.Release + Verified bool + LastTransitionTime time.Time +} + // SyncWorkerStatus is the status of the sync worker at a given time. type SyncWorkerStatus struct { Generation int64 - Step string Failure error Done int @@ -97,8 +107,12 @@ type SyncWorkerStatus struct { LastProgress time.Time - Actual configv1.Release + Actual configv1.Release + + // indicates if actual (current) release was verified Verified bool + + loadPayloadStatus LoadPayloadStatus } // DeepCopy copies the worker status. @@ -115,16 +129,16 @@ func (w SyncWorkerStatus) DeepCopy() *SyncWorkerStatus { // // Initial: wait for valid Update(), report empty status // Update() -> Sync -// Sync: attempt to invoke the syncOnce() method -// syncOnce() returns an error -> Error -// syncOnce() returns nil -> Reconciling -// Reconciling: invoke syncOnce() no more often than reconcileInterval +// Sync: attempt to invoke the apply() method +// apply() returns an error -> Error +// apply() returns nil -> Reconciling +// Reconciling: invoke apply() no more often than reconcileInterval // Update() with different values -> Sync -// syncOnce() returns an error -> Error -// syncOnce() returns nil -> Reconciling +// apply() returns an error -> Error +// apply() returns nil -> Reconciling // Error: backoff until we are attempting every reconcileInterval -// syncOnce() returns an error -> Error -// syncOnce() returns nil -> Reconciling +// apply() returns an error -> Error +// apply() returns nil -> Reconciling // type SyncWorker struct { backoff wait.Backoff @@ -134,7 +148,7 @@ type SyncWorker struct { eventRecorder record.EventRecorder // minimumReconcileInterval is the minimum time between reconcile attempts, and is - // used to define the maximum backoff interval when syncOnce() returns an error. + // used to define the maximum backoff interval when apply() returns an error. minimumReconcileInterval time.Duration // coordination between the sync loop and external callers @@ -200,12 +214,159 @@ func (w *SyncWorker) StatusCh() <-chan SyncWorkerStatus { return w.report } +func (w *SyncWorker) syncPayload(ctx context.Context, work *SyncWork, reporter StatusReporter, clusterVersion *configv1.ClusterVersion) error { + desired := configv1.Release{ + Version: work.Desired.Version, + Image: work.Desired.Image, + } + klog.V(2).Infof("syncPayload: %s (force=%t)", versionString(desired), work.Desired.Force) + + // cache the payload until the release image changes + validPayload := w.payload + if validPayload != nil && validPayload.Release.Image == desired.Image { + + // clear payload status if it no longer applies to desired target + if !reporter.ValidPayloadStatus(desired) { + klog.V(2).Info("Resetting payload status to currently loaded payload.") + reporter.ReportPayload(LoadPayloadStatus{ + Failure: nil, + Step: "PayloadLoaded", + Message: fmt.Sprintf("Payload loaded version=%q image=%q", desired.Version, desired.Image), + Verified: w.status.Verified, + Release: desired, + LastTransitionTime: time.Now(), + }) + } + // possibly complain here if Version, etc. diverges from the payload content + return nil + } else if validPayload == nil || !equalUpdate(configv1.Update{Image: validPayload.Release.Image}, configv1.Update{Image: desired.Image}) { + cvoObjectRef := &corev1.ObjectReference{APIVersion: "config.openshift.io/v1", Kind: "ClusterVersion", Name: "version", Namespace: "openshift-cluster-version"} + msg := fmt.Sprintf("Retrieving and verifying payload version=%q image=%q", desired.Version, desired.Image) + w.eventRecorder.Eventf(cvoObjectRef, corev1.EventTypeNormal, "RetrievePayload", msg) + reporter.ReportPayload(LoadPayloadStatus{ + Step: "RetrievePayload", + Message: msg, + Release: desired, + LastTransitionTime: time.Now(), + }) + info, err := w.retriever.RetrievePayload(ctx, work.Desired) + if err != nil { + msg := fmt.Sprintf("Retrieving payload failed version=%q image=%q failure=%v", desired.Version, desired.Image, err) + w.eventRecorder.Eventf(cvoObjectRef, corev1.EventTypeWarning, "RetrievePayloadFailed", msg) + reporter.ReportPayload(LoadPayloadStatus{ + Failure: err, + Step: "RetrievePayload", + Message: msg, + Release: desired, + LastTransitionTime: time.Now(), + }) + return err + } + + w.eventRecorder.Eventf(cvoObjectRef, corev1.EventTypeNormal, "LoadPayload", "Loading payload version=%q image=%q", desired.Version, desired.Image) + payloadUpdate, err := payload.LoadUpdate(info.Directory, desired.Image, w.exclude, w.includeTechPreview, w.clusterProfile) + if err != nil { + msg := fmt.Sprintf("Loading payload failed version=%q image=%q failure=%v", desired.Version, desired.Image, err) + w.eventRecorder.Eventf(cvoObjectRef, corev1.EventTypeWarning, "LoadPayloadFailed", msg) + reporter.ReportPayload(LoadPayloadStatus{ + Failure: err, + Step: "LoadPayload", + Message: msg, + Verified: info.Verified, + Release: desired, + LastTransitionTime: time.Now(), + }) + return err + } + + payloadUpdate.VerifiedImage = info.Verified + payloadUpdate.LoadedAt = time.Now() + + if work.Desired.Version == "" { + work.Desired.Version = payloadUpdate.Release.Version + desired.Version = payloadUpdate.Release.Version + } else if payloadUpdate.Release.Version != work.Desired.Version { + err = fmt.Errorf("release image version %s does not match the expected upstream version %s", payloadUpdate.Release.Version, work.Desired.Version) + msg := fmt.Sprintf("Verifying payload failed version=%q image=%q failure=%v", work.Desired.Version, work.Desired.Image, err) + w.eventRecorder.Eventf(cvoObjectRef, corev1.EventTypeWarning, "VerifyPayloadVersionFailed", msg) + reporter.ReportPayload(LoadPayloadStatus{ + Failure: err, + Step: "VerifyPayloadVersion", + Message: msg, + Verified: info.Verified, + Release: desired, + LastTransitionTime: time.Now(), + }) + return err + } + + // need to make sure the payload is only set when the preconditions have been successful + if len(w.preconditions) == 0 { + klog.V(2).Info("No preconditions configured.") + } else if info.Local { + klog.V(2).Info("Skipping preconditions for a local operator image payload.") + } else { + if block, err := precondition.Summarize(w.preconditions.RunAll(ctx, precondition.ReleaseContext{ + DesiredVersion: payloadUpdate.Release.Version, + }), work.Desired.Force); err != nil { + klog.V(2).Infof("Precondition error (force %t, block %t): %v", work.Desired.Force, block, err) + if block { + msg := fmt.Sprintf("Preconditions failed for payload loaded version=%q image=%q: %v", desired.Version, desired.Image, err) + w.eventRecorder.Eventf(cvoObjectRef, corev1.EventTypeWarning, "PreconditionBlock", msg) + reporter.ReportPayload(LoadPayloadStatus{ + Failure: err, + Step: "PreconditionChecks", + Message: msg, + Verified: info.Verified, + Release: desired, + LastTransitionTime: time.Now(), + }) + return err + } else { + w.eventRecorder.Eventf(cvoObjectRef, corev1.EventTypeWarning, "PreconditionWarn", "precondition warning for payload loaded version=%q image=%q: %v", desired.Version, desired.Image, err) + } + } + w.eventRecorder.Eventf(cvoObjectRef, corev1.EventTypeNormal, "PreconditionsPassed", "preconditions passed for payload loaded version=%q image=%q", desired.Version, desired.Image) + } + + w.payload = payloadUpdate + msg = fmt.Sprintf("Payload loaded version=%q image=%q", desired.Version, desired.Image) + w.eventRecorder.Eventf(cvoObjectRef, corev1.EventTypeNormal, "PayloadLoaded", msg) + reporter.ReportPayload(LoadPayloadStatus{ + Failure: nil, + Step: "PayloadLoaded", + Message: msg, + Verified: info.Verified, + Release: desired, + LastTransitionTime: time.Now(), + }) + klog.V(2).Infof("Payload loaded from %s with hash %s", desired.Image, payloadUpdate.ManifestHash) + } + return nil +} + +// loadUpdatedPayload retrieves the image. If successfully retrieved it updates payload otherwise it returns an error. +func (w *SyncWorker) loadUpdatedPayload(ctx context.Context, work *SyncWork, cvoOptrName string, lister configlistersv1.ClusterVersionLister) error { + config, err := lister.Get(cvoOptrName) + if err != nil { + return err + } + // reporter hides status updates that occur earlier than the previous failure, + // so that we don't fail, then immediately start reporting an earlier status + reporter := &statusWrapper{w: w, previousStatus: w.status.DeepCopy()} + if err := w.syncPayload(ctx, work, reporter, config); err != nil { + klog.V(2).Infof("loadUpdatedPayload syncPayload err=%v", err) + return err + } + return nil +} + // Update instructs the sync worker to start synchronizing the desired update. The reconciling boolean is // ignored unless this is the first time that Update has been called. The returned status represents either // the initial state or whatever the last recorded status was. // TODO: in the future it may be desirable for changes that alter desired to wait briefly before returning, // giving the sync loop the opportunity to observe our change and begin working towards it. -func (w *SyncWorker) Update(generation int64, desired configv1.Update, overrides []configv1.ComponentOverride, state payload.State) *SyncWorkerStatus { +func (w *SyncWorker) Update(ctx context.Context, generation int64, desired configv1.Update, overrides []configv1.ComponentOverride, state payload.State, cvoOptrName string, lister configlistersv1.ClusterVersionLister) *SyncWorkerStatus { w.lock.Lock() defer w.lock.Unlock() @@ -261,6 +422,13 @@ func (w *SyncWorker) Update(generation int64, desired configv1.Update, overrides } } + w.lock.Unlock() + err := w.loadUpdatedPayload(ctx, work, cvoOptrName, lister) + w.lock.Lock() + if err != nil { + return w.status.DeepCopy() + } + // notify the sync loop that we changed config if w.cancelFn != nil { klog.V(2).Info("Cancel the sync worker's current loop") @@ -279,9 +447,9 @@ func (w *SyncWorker) Update(generation int64, desired configv1.Update, overrides // Start periodically invokes run, detecting whether content has changed. // It is edge-triggered when Update() is invoked and level-driven after the -// syncOnce() has succeeded for a given input (we are said to be "reconciling"). +// apply() has succeeded for a given input (we are said to be "reconciling"). func (w *SyncWorker) Start(ctx context.Context, maxWorkers int, cvoOptrName string, lister configlistersv1.ClusterVersionLister) { - klog.V(2).Infof("Starting sync worker") + klog.V(2).Infof("Start: starting sync worker") work := &SyncWork{} @@ -345,15 +513,11 @@ func (w *SyncWorker) Start(ctx context.Context, maxWorkers int, cvoOptrName stri w.lock.Unlock() defer cancelFn() - config, err := lister.Get(cvoOptrName) - if err != nil { - return err - } // reporter hides status updates that occur earlier than the previous failure, // so that we don't fail, then immediately start reporting an earlier status reporter := &statusWrapper{w: w, previousStatus: w.Status()} klog.V(2).Infof("Previous sync status: %#v", reporter.previousStatus) - return w.syncOnce(ctx, work, maxWorkers, reporter, config) + return w.apply(ctx, work, maxWorkers, reporter) }() if err != nil { // backoff wait @@ -393,8 +557,19 @@ type statusWrapper struct { previousStatus *SyncWorkerStatus } +func (w *statusWrapper) ValidPayloadStatus(release configv1.Release) bool { + return w.previousStatus.loadPayloadStatus.Release.Image == release.Image +} + +func (w *statusWrapper) ReportPayload(payloadStatus LoadPayloadStatus) { + status := w.previousStatus + status.loadPayloadStatus = payloadStatus + w.w.updateStatus(*status) +} + func (w *statusWrapper) Report(status SyncWorkerStatus) { p := w.previousStatus + status.loadPayloadStatus = p.loadPayloadStatus var fractionComplete float32 if status.Total > 0 { fractionComplete = float32(status.Done) / float32(status.Total) @@ -548,136 +723,17 @@ func (w *SyncWorker) Status() *SyncWorkerStatus { return w.status.DeepCopy() } -// sync retrieves the image and applies it to the server, returning an error if -// the update could not be completely applied. The status is updated as we progress. -// Cancelling the context will abort the execution of the sync. -func (w *SyncWorker) syncOnce(ctx context.Context, work *SyncWork, maxWorkers int, reporter StatusReporter, clusterVersion *configv1.ClusterVersion) error { - desired := configv1.Release{ - Version: work.Desired.Version, - Image: work.Desired.Image, - } - klog.V(2).Infof("Running sync %s (force=%t) on generation %d in state %s at attempt %d", versionString(desired), work.Desired.Force, work.Generation, work.State, work.Attempt) +// apply applies the current payload to the server, executing in parallel if maxWorkers is set greater +// than 1, returning an error if the update could not be completely applied. The status is updated as we +// progress. Cancelling the context will abort the execution of apply. +func (w *SyncWorker) apply(ctx context.Context, work *SyncWork, maxWorkers int, reporter StatusReporter) error { + klog.V(2).Infof("apply: %s on generation %d in state %s at attempt %d", work.Desired.Version, work.Generation, work.State, work.Attempt) if work.Attempt == 0 { payload.InitCOUpdateStartTimes() } + payloadUpdate := w.payload - // cache the payload until the release image changes - validPayload := w.payload - if validPayload != nil && validPayload.Release.Image == desired.Image { - // possibly complain here if Version, etc. diverges from the payload content - desired = validPayload.Release - } else if validPayload == nil || !equalUpdate(configv1.Update{Image: validPayload.Release.Image}, configv1.Update{Image: desired.Image}) { - klog.V(2).Infof("Loading payload") - cvoObjectRef := &corev1.ObjectReference{APIVersion: "config.openshift.io/v1", Kind: "ClusterVersion", Name: "version", Namespace: "openshift-cluster-version"} - w.eventRecorder.Eventf(cvoObjectRef, corev1.EventTypeNormal, "RetrievePayload", "retrieving payload version=%q image=%q", desired.Version, desired.Image) - reporter.Report(SyncWorkerStatus{ - Generation: work.Generation, - Step: "RetrievePayload", - Initial: work.State.Initializing(), - Reconciling: work.State.Reconciling(), - Actual: desired, - }) - info, err := w.retriever.RetrievePayload(ctx, work.Desired) - if err != nil { - w.eventRecorder.Eventf(cvoObjectRef, corev1.EventTypeWarning, "RetrievePayloadFailed", "retrieving payload failed version=%q image=%q failure=%v", desired.Version, desired.Image, err) - reporter.Report(SyncWorkerStatus{ - Generation: work.Generation, - Failure: err, - Step: "RetrievePayload", - Initial: work.State.Initializing(), - Reconciling: work.State.Reconciling(), - Actual: desired, - }) - return err - } - - w.eventRecorder.Eventf(cvoObjectRef, corev1.EventTypeNormal, "VerifyPayload", "verifying payload version=%q image=%q", desired.Version, desired.Image) - payloadUpdate, err := payload.LoadUpdate(info.Directory, desired.Image, w.exclude, w.includeTechPreview, w.clusterProfile) - if err != nil { - w.eventRecorder.Eventf(cvoObjectRef, corev1.EventTypeWarning, "VerifyPayloadFailed", "verifying payload failed version=%q image=%q failure=%v", desired.Version, desired.Image, err) - reporter.Report(SyncWorkerStatus{ - Generation: work.Generation, - Failure: err, - Step: "VerifyPayload", - Initial: work.State.Initializing(), - Reconciling: work.State.Reconciling(), - Actual: desired, - Verified: info.Verified, - }) - return err - } - - payloadUpdate.VerifiedImage = info.Verified - payloadUpdate.LoadedAt = time.Now() - - if work.Desired.Version == "" { - work.Desired.Version = payloadUpdate.Release.Version - desired.Version = payloadUpdate.Release.Version - } else if payloadUpdate.Release.Version != work.Desired.Version { - err = fmt.Errorf("release image version %s does not match the expected upstream version %s", payloadUpdate.Release.Version, work.Desired.Version) - w.eventRecorder.Eventf(cvoObjectRef, corev1.EventTypeWarning, "VerifyPayloadVersionFailed", "verifying payload failed version=%q image=%q failure=%v", work.Desired.Version, work.Desired.Image, err) - reporter.Report(SyncWorkerStatus{ - Generation: work.Generation, - Failure: err, - Step: "VerifyPayloadVersion", - Initial: work.State.Initializing(), - Reconciling: work.State.Reconciling(), - Actual: desired, - Verified: info.Verified, - }) - return err - } - - // need to make sure the payload is only set when the preconditions have been successful - if len(w.preconditions) == 0 { - klog.V(2).Info("No preconditions configured.") - } else if info.Local { - klog.V(2).Info("Skipping preconditions for a local operator image payload.") - } else { - reporter.Report(SyncWorkerStatus{ - Generation: work.Generation, - Step: "PreconditionChecks", - Initial: work.State.Initializing(), - Reconciling: work.State.Reconciling(), - Actual: desired, - Verified: info.Verified, - }) - if block, err := precondition.Summarize(w.preconditions.RunAll(ctx, precondition.ReleaseContext{ - DesiredVersion: payloadUpdate.Release.Version, - }), work.Desired.Force); err != nil { - klog.V(2).Infof("Precondition error (force %t, block %t): %v", work.Desired.Force, block, err) - if block { - w.eventRecorder.Eventf(cvoObjectRef, corev1.EventTypeWarning, "PreconditionBlock", "preconditions failed for payload loaded version=%q image=%q: %v", desired.Version, desired.Image, err) - reporter.Report(SyncWorkerStatus{ - Generation: work.Generation, - Failure: err, - Step: "PreconditionChecks", - Initial: work.State.Initializing(), - Reconciling: work.State.Reconciling(), - Actual: desired, - Verified: info.Verified, - }) - return err - } else { - w.eventRecorder.Eventf(cvoObjectRef, corev1.EventTypeWarning, "PreconditionWarn", "precondition warning for payload loaded version=%q image=%q: %v", desired.Version, desired.Image, err) - } - } - w.eventRecorder.Eventf(cvoObjectRef, corev1.EventTypeNormal, "PreconditionsPassed", "preconditions passed for payload loaded version=%q image=%q", desired.Version, desired.Image) - } - - w.payload = payloadUpdate - w.eventRecorder.Eventf(cvoObjectRef, corev1.EventTypeNormal, "PayloadLoaded", "payload loaded version=%q image=%q", desired.Version, desired.Image) - klog.V(2).Infof("Payload loaded from %s with hash %s", desired.Image, payloadUpdate.ManifestHash) - } - - return w.apply(ctx, w.payload, work, maxWorkers, reporter) -} - -// apply updates the server with the contents of the provided image or returns an error. -// Cancelling the context will abort the execution of the sync. Will be executed in parallel if -// maxWorkers is set greater than 1. -func (w *SyncWorker) apply(ctx context.Context, payloadUpdate *payload.Update, work *SyncWork, maxWorkers int, reporter StatusReporter) error { // encapsulate status reporting in a threadsafe updater total := len(payloadUpdate.Manifests) cr := &consistentReporter{ @@ -840,7 +896,6 @@ func (r *consistentReporter) Update() { metricPayload.WithLabelValues(r.version, "pending").Set(float64(r.total - r.done)) metricPayload.WithLabelValues(r.version, "applied").Set(float64(r.done)) copied := r.status - copied.Step = "ApplyResources" copied.Done = r.done copied.Total = r.total r.reporter.Report(copied) @@ -850,7 +905,6 @@ func (r *consistentReporter) Error(err error) { r.lock.Lock() defer r.lock.Unlock() copied := r.status - copied.Step = "ApplyResources" copied.Done = r.done copied.Total = r.total if !isContextError(err) { @@ -865,7 +919,6 @@ func (r *consistentReporter) Errors(errs []error) error { r.lock.Lock() defer r.lock.Unlock() copied := r.status - copied.Step = "ApplyResources" copied.Done = r.done copied.Total = r.total if err != nil { diff --git a/pkg/cvo/sync_worker_test.go b/pkg/cvo/sync_worker_test.go index ddce260d0..8a3de52a3 100644 --- a/pkg/cvo/sync_worker_test.go +++ b/pkg/cvo/sync_worker_test.go @@ -113,16 +113,15 @@ func Test_statusWrapper_ReportGeneration(t *testing.T) { next SyncWorkerStatus want int64 }{{ - previous: SyncWorkerStatus{Generation: 1, Step: "Apply", Done: 10, Total: 100}, - next: SyncWorkerStatus{Step: "RetreivePayload"}, + previous: SyncWorkerStatus{Generation: 1, Done: 10, Total: 100}, want: 1, }, { - previous: SyncWorkerStatus{Generation: 1, Step: "Apply", Done: 10, Total: 100}, - next: SyncWorkerStatus{Generation: 2, Step: "Apply", Done: 50, Total: 100}, + previous: SyncWorkerStatus{Generation: 1, Done: 10, Total: 100}, + next: SyncWorkerStatus{Generation: 2, Done: 50, Total: 100}, want: 2, }, { - previous: SyncWorkerStatus{Generation: 5, Step: "Apply", Done: 70, Total: 100}, - next: SyncWorkerStatus{Generation: 2, Step: "Apply", Done: 50, Total: 100}, + previous: SyncWorkerStatus{Generation: 5, Done: 70, Total: 100}, + next: SyncWorkerStatus{Generation: 2, Done: 50, Total: 100}, want: 2, }} for _, tt := range tests {