diff --git a/pkg/cvo/cvo.go b/pkg/cvo/cvo.go index 12fd828ff3..f8ef8ecfac 100644 --- a/pkg/cvo/cvo.go +++ b/pkg/cvo/cvo.go @@ -262,6 +262,7 @@ func (optr *Operator) InitializeFromPayload(restConfig *rest.Config, burstRestCo Duration: time.Second * 10, Factor: 1.3, Steps: 3, + Cap: time.Second * 15, }, optr.exclude, optr.eventRecorder, diff --git a/pkg/cvo/cvo_scenarios_test.go b/pkg/cvo/cvo_scenarios_test.go index 704cb5bd2c..284163c1bc 100644 --- a/pkg/cvo/cvo_scenarios_test.go +++ b/pkg/cvo/cvo_scenarios_test.go @@ -2662,9 +2662,6 @@ func TestCVO_ParallelError(t *testing.T) { // Step 3: Cancel after we've accumulated 2/3 errors // - time.Sleep(100 * time.Millisecond) - cancel() - // // verify we observe the remaining changes in the first sync for status := range worker.StatusCh() { if status.Failure == nil { @@ -2707,6 +2704,7 @@ func TestCVO_ParallelError(t *testing.T) { } break } + cancel() verifyAllStatus(t, worker.StatusCh()) client.ClearActions() diff --git a/pkg/cvo/internal/operatorstatus.go b/pkg/cvo/internal/operatorstatus.go index cf7cb8057d..759cd298ac 100644 --- a/pkg/cvo/internal/operatorstatus.go +++ b/pkg/cvo/internal/operatorstatus.go @@ -127,7 +127,7 @@ func checkOperatorHealth(ctx context.Context, client ClusterOperatorsGetter, exp if len(expected.Status.Versions) == 0 { return &payload.UpdateError{ UpdateEffect: payload.UpdateEffectFail, - Reason: "ClusterOperatorNotAvailable", + Reason: "ClusterOperatorNoVersions", Message: fmt.Sprintf("Cluster operator %s does not declare expected versions", expected.Name), Name: expected.Name, } @@ -135,13 +135,7 @@ func checkOperatorHealth(ctx context.Context, client ClusterOperatorsGetter, exp actual, err := client.Get(ctx, expected.Name) if err != nil { - return &payload.UpdateError{ - Nested: err, - UpdateEffect: payload.UpdateEffectNone, - Reason: "ClusterOperatorNotAvailable", - Message: fmt.Sprintf("Cluster operator %s has not yet reported success", expected.Name), - Name: expected.Name, - } + return err } // undone is a sorted slice of transition messages for incomplete operands. diff --git a/pkg/cvo/internal/operatorstatus_test.go b/pkg/cvo/internal/operatorstatus_test.go index 8877a2ecd0..47b373e09e 100644 --- a/pkg/cvo/internal/operatorstatus_test.go +++ b/pkg/cvo/internal/operatorstatus_test.go @@ -42,13 +42,7 @@ func Test_checkOperatorHealth(t *testing.T) { }}, }, }, - expErr: &payload.UpdateError{ - Nested: apierrors.NewNotFound(schema.GroupResource{Resource: "clusteroperator"}, "test-co"), - UpdateEffect: payload.UpdateEffectNone, - Reason: "ClusterOperatorNotAvailable", - Message: "Cluster operator test-co has not yet reported success", - Name: "test-co", - }, + expErr: apierrors.NewNotFound(schema.GroupResource{Resource: "clusteroperator"}, "test-co"), }, { name: "cluster operator reporting available=true and degraded=false, but no versions", actual: &configv1.ClusterOperator{ diff --git a/pkg/cvo/sync_test.go b/pkg/cvo/sync_test.go index dec9a48d3a..28b9c9dcca 100644 --- a/pkg/cvo/sync_test.go +++ b/pkg/cvo/sync_test.go @@ -35,6 +35,7 @@ import ( func Test_SyncWorker_apply(t *testing.T) { tests := []struct { + name string manifests []string reactors map[action]error cancelAfter int @@ -42,6 +43,7 @@ func Test_SyncWorker_apply(t *testing.T) { check func(*testing.T, []action) wantErr bool }{{ + name: "successful creation", manifests: []string{ `{ "apiVersion": "test.cvo.io/v1", @@ -75,6 +77,7 @@ func Test_SyncWorker_apply(t *testing.T) { } }, }, { + name: "unknown resource failures", manifests: []string{ `{ "apiVersion": "test.cvo.io/v1", @@ -99,18 +102,21 @@ func Test_SyncWorker_apply(t *testing.T) { cancelAfter: 2, wantErr: true, check: func(t *testing.T, actions []action) { - if len(actions) != 3 { + if len(actions) != 2 { spew.Dump(actions) t.Fatalf("unexpected %d actions", len(actions)) } - if got, exp := actions[0], (newAction(schema.GroupVersionKind{Group: "test.cvo.io", Version: "v1", Kind: "TestA"}, "default", "testa")); !reflect.DeepEqual(got, exp) { - t.Fatalf("%s", diff.ObjectReflectDiff(exp, got)) + exp := newAction(schema.GroupVersionKind{Group: "test.cvo.io", Version: "v1", Kind: "TestA"}, "default", "testa") + for i, got := range actions { + if !reflect.DeepEqual(got, exp) { + t.Fatalf("unexpected action %d: %s", i, diff.ObjectReflectDiff(exp, got)) + } } }, }} - for idx, test := range tests { - t.Run(fmt.Sprintf("test#%d", idx), func(t *testing.T) { + for _, test := range tests { + t.Run(test.name, func(t *testing.T) { var manifests []manifest.Manifest for _, s := range test.manifests { m := manifest.Manifest{} @@ -134,6 +140,7 @@ func Test_SyncWorker_apply(t *testing.T) { testMapper.AddToMap(resourcebuilder.Mapper) worker := &SyncWorker{eventRecorder: record.NewFakeRecorder(100)} + worker.backoff.Steps = 2 worker.builder = NewResourceBuilder(nil, nil, nil, nil) ctx, cancel := context.WithCancel(context.Background()) diff --git a/pkg/cvo/sync_worker.go b/pkg/cvo/sync_worker.go index 38be867064..7758c43a23 100644 --- a/pkg/cvo/sync_worker.go +++ b/pkg/cvo/sync_worker.go @@ -678,8 +678,11 @@ func (w *SyncWorker) apply(ctx context.Context, payloadUpdate *payload.Update, w var tasks []*payload.Task backoff := w.backoff + if backoff.Steps == 0 { + return fmt.Errorf("SyncWorker requires at least one backoff step to apply any manifests") + } if backoff.Steps > 1 && work.State == payload.InitializingPayload { - backoff = wait.Backoff{Steps: 4, Factor: 2, Duration: time.Second} + backoff = wait.Backoff{Steps: 4, Factor: 2, Duration: time.Second, Cap: 15 * time.Second} } for i := range payloadUpdate.Manifests { tasks = append(tasks, &payload.Task{ diff --git a/pkg/payload/task.go b/pkg/payload/task.go index 91bc3110a2..83ed7f6c9f 100644 --- a/pkg/payload/task.go +++ b/pkg/payload/task.go @@ -95,51 +95,46 @@ func (st *Task) String() string { return fmt.Sprintf("%s \"%s/%s\" (%d of %d)", strings.ToLower(st.Manifest.GVK.Kind), ns, name, st.Index, st.Total) } -// Run attempts to create the provided object until it succeeds or context is cancelled. It returns the -// last error if context is cancelled. +// Run attempts to create the provided object until it: +// +// * Succeeds, or +// * Fails with an UpdateError, because these are unlikely to improve quickly on retry, or +// * The context is canceled, in which case it returns the most recent error. func (st *Task) Run(ctx context.Context, version string, builder ResourceBuilder, state State) error { var lastErr error backoff := st.Backoff - maxDuration := 15 * time.Second // TODO: fold back into Backoff in 1.13 - for { - // attempt the apply, waiting as long as necessary - err := builder.Apply(ctx, st.Manifest, state) + err := wait.ExponentialBackoffWithContext(ctx, backoff, func() (done bool, err error) { + err = builder.Apply(ctx, st.Manifest, state) if err == nil { - return nil + return true, nil } - lastErr = err utilruntime.HandleError(errors.Wrapf(err, "error running apply for %s", st)) metricPayloadErrors.WithLabelValues(version).Inc() - - // TODO: this code will become easier in Kube 1.13 because Backoff now supports max - d := time.Duration(float64(backoff.Duration) * backoff.Factor) - if d > maxDuration { - d = maxDuration - } - d = wait.Jitter(d, backoff.Jitter) - - // sleep or wait for cancellation - select { - case <-time.After(d): - continue - case <-ctx.Done(): - if uerr, ok := lastErr.(*UpdateError); ok { - uerr.Task = st.Copy() - return uerr - } - reason, cause := reasonForPayloadSyncError(lastErr) - if len(cause) > 0 { - cause = ": " + cause - } - return &UpdateError{ - Nested: lastErr, - Reason: reason, - Message: fmt.Sprintf("Could not update %s%s", st, cause), - - Task: st.Copy(), - } + if updateErr, ok := err.(*UpdateError); ok { + updateErr.Task = st.Copy() + return false, updateErr // failing fast for UpdateError } + return false, nil + }) + if err == nil { + return nil + } + if lastErr != nil { + err = lastErr + } + if _, ok := err.(*UpdateError); ok { + return err + } + reason, cause := reasonForPayloadSyncError(err) + if len(cause) > 0 { + cause = ": " + cause + } + return &UpdateError{ + Nested: err, + Reason: reason, + Message: fmt.Sprintf("Could not update %s%s", st, cause), + Task: st.Copy(), } } @@ -177,7 +172,7 @@ func (e *UpdateError) Cause() error { return e.Nested } -// reasonForUpdateError provides a succint explanation of a known error type for use in a human readable +// reasonForPayloadSyncError provides a succint explanation of a known error type for use in a human readable // message during update. Since all objects in the image should be successfully applied, messages // should direct the reader (likely a cluster administrator) to a possible cause in their own config. func reasonForPayloadSyncError(err error) (string, string) { @@ -249,11 +244,16 @@ func SummaryForReason(reason, name string) string { return "a cluster operator is degraded" case "ClusterOperatorNotAvailable": if len(name) > 0 { - return fmt.Sprintf("the cluster operator %s has not yet successfully rolled out", name) + return fmt.Sprintf("the cluster operator %s is not available", name) } - return "a cluster operator has not yet rolled out" + return "a cluster operator is not available" case "ClusterOperatorsNotAvailable": - return "some cluster operators have not yet rolled out" + return "some cluster operators are not available" + case "ClusterOperatorNoVersions": + if len(name) > 0 { + return fmt.Sprintf("the cluster operator %s does not declare expected versions", name) + } + return "a cluster operator does not declare expected versions" case "WorkloadNotAvailable": if len(name) > 0 { return fmt.Sprintf("the workload %s has not yet successfully rolled out", name)