1 change: 1 addition & 0 deletions pkg/cvo/cvo.go
@@ -262,6 +262,7 @@ func (optr *Operator) InitializeFromPayload(restConfig *rest.Config, burstRestCo
Duration: time.Second * 10,
Factor: 1.3,
Steps: 3,
Cap: time.Second * 15,
},
optr.exclude,
optr.eventRecorder,
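
The new Cap bounds how far this wait.Backoff can grow its delay. A minimal sketch of that behavior, assuming the standard k8s.io/apimachinery/pkg/util/wait semantics (illustrative only, not part of this PR):

package main

import (
	"fmt"
	"time"

	"k8s.io/apimachinery/pkg/util/wait"
)

func main() {
	// Same shape as the CVO backoff above: 10s base, factor 1.3, 3 steps, capped at 15s.
	b := wait.Backoff{Duration: 10 * time.Second, Factor: 1.3, Steps: 3, Cap: 15 * time.Second}
	for i := 0; i < 3; i++ {
		fmt.Println(b.Step()) // 10s, 13s, 15s -- the third delay is clamped down from 16.9s
	}
}
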
4 changes: 1 addition & 3 deletions pkg/cvo/cvo_scenarios_test.go
@@ -2662,9 +2662,6 @@ func TestCVO_ParallelError(t *testing.T) {

// Step 3: Cancel after we've accumulated 2/3 errors
//
time.Sleep(100 * time.Millisecond)
cancel()
//
// verify we observe the remaining changes in the first sync
for status := range worker.StatusCh() {
if status.Failure == nil {
@@ -2707,6 +2704,7 @@ func TestCVO_ParallelError(t *testing.T) {
}
break
}
cancel()
verifyAllStatus(t, worker.StatusCh())

client.ClearActions()
10 changes: 2 additions & 8 deletions pkg/cvo/internal/operatorstatus.go
@@ -127,21 +127,15 @@ func checkOperatorHealth(ctx context.Context, client ClusterOperatorsGetter, exp
if len(expected.Status.Versions) == 0 {
return &payload.UpdateError{
UpdateEffect: payload.UpdateEffectFail,
Reason: "ClusterOperatorNotAvailable",
Reason: "ClusterOperatorNoVersions",
Message: fmt.Sprintf("Cluster operator %s does not declare expected versions", expected.Name),
Name: expected.Name,
}
}

actual, err := client.Get(ctx, expected.Name)
if err != nil {
return &payload.UpdateError{
Nested: err,
UpdateEffect: payload.UpdateEffectNone,
Reason: "ClusterOperatorNotAvailable",
Message: fmt.Sprintf("Cluster operator %s has not yet reported success", expected.Name),
Name: expected.Name,
}
return err
}

// undone is a sorted slice of transition messages for incomplete operands.
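
With the wrapping removed, checkOperatorHealth now surfaces the Get failure as-is. A hedged sketch of how a caller might still classify it; the helper name and placement below are hypothetical and not part of this PR:

package internal

import (
	apierrors "k8s.io/apimachinery/pkg/api/errors"

	"github.com/openshift/cluster-version-operator/pkg/payload"
)

// classifyHealthError is a hypothetical helper: it tells the raw API error
// (now returned unwrapped) apart from the UpdateErrors checkOperatorHealth builds itself.
func classifyHealthError(err error) string {
	switch {
	case err == nil:
		return "healthy"
	case apierrors.IsNotFound(err):
		return "clusteroperator has not been created yet"
	default:
		if _, ok := err.(*payload.UpdateError); ok {
			return "operator is reporting a problem: " + err.Error()
		}
		return "could not fetch clusteroperator: " + err.Error()
	}
}
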
8 changes: 1 addition & 7 deletions pkg/cvo/internal/operatorstatus_test.go
@@ -42,13 +42,7 @@ func Test_checkOperatorHealth(t *testing.T) {
}},
},
},
expErr: &payload.UpdateError{
Nested: apierrors.NewNotFound(schema.GroupResource{Resource: "clusteroperator"}, "test-co"),
UpdateEffect: payload.UpdateEffectNone,
Reason: "ClusterOperatorNotAvailable",
Message: "Cluster operator test-co has not yet reported success",
Name: "test-co",
},
expErr: apierrors.NewNotFound(schema.GroupResource{Resource: "clusteroperator"}, "test-co"),
}, {
name: "cluster operator reporting available=true and degraded=false, but no versions",
actual: &configv1.ClusterOperator{
17 changes: 12 additions & 5 deletions pkg/cvo/sync_test.go
@@ -35,13 +35,15 @@ import (

func Test_SyncWorker_apply(t *testing.T) {
tests := []struct {
name string
manifests []string
reactors map[action]error
cancelAfter int

check func(*testing.T, []action)
wantErr bool
}{{
name: "successful creation",
manifests: []string{
`{
"apiVersion": "test.cvo.io/v1",
@@ -75,6 +77,7 @@ func Test_SyncWorker_apply(t *testing.T) {
}
},
}, {
name: "unknown resource failures",
manifests: []string{
`{
"apiVersion": "test.cvo.io/v1",
@@ -99,18 +102,21 @@
cancelAfter: 2,
wantErr: true,
check: func(t *testing.T, actions []action) {
if len(actions) != 3 {
if len(actions) != 2 {
spew.Dump(actions)
t.Fatalf("unexpected %d actions", len(actions))
}

if got, exp := actions[0], (newAction(schema.GroupVersionKind{Group: "test.cvo.io", Version: "v1", Kind: "TestA"}, "default", "testa")); !reflect.DeepEqual(got, exp) {
t.Fatalf("%s", diff.ObjectReflectDiff(exp, got))
exp := newAction(schema.GroupVersionKind{Group: "test.cvo.io", Version: "v1", Kind: "TestA"}, "default", "testa")
for i, got := range actions {
if !reflect.DeepEqual(got, exp) {
t.Fatalf("unexpected action %d: %s", i, diff.ObjectReflectDiff(exp, got))
}
}
},
}}
for idx, test := range tests {
t.Run(fmt.Sprintf("test#%d", idx), func(t *testing.T) {
for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
var manifests []manifest.Manifest
for _, s := range test.manifests {
m := manifest.Manifest{}
@@ -134,6 +140,7 @@ func Test_SyncWorker_apply(t *testing.T) {
testMapper.AddToMap(resourcebuilder.Mapper)

worker := &SyncWorker{eventRecorder: record.NewFakeRecorder(100)}
worker.backoff.Steps = 2
worker.builder = NewResourceBuilder(nil, nil, nil, nil)

ctx, cancel := context.WithCancel(context.Background())
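
The expected action count drops from 3 to 2, which lines up with worker.backoff.Steps = 2 now bounding the retries. A small sketch of that bound, assuming the standard apimachinery behavior (illustrative only, not part of this PR):

package main

import (
	"fmt"
	"time"

	"k8s.io/apimachinery/pkg/util/wait"
)

func main() {
	attempts := 0
	// A condition that keeps failing is attempted exactly Steps times.
	_ = wait.ExponentialBackoff(wait.Backoff{Duration: time.Millisecond, Factor: 2, Steps: 2}, func() (bool, error) {
		attempts++ // stands in for the failing Apply in the test above
		return false, nil
	})
	fmt.Println(attempts) // 2
}
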
5 changes: 4 additions & 1 deletion pkg/cvo/sync_worker.go
@@ -678,8 +678,11 @@ func (w *SyncWorker) apply(ctx context.Context, payloadUpdate *payload.Update, w

var tasks []*payload.Task
backoff := w.backoff
if backoff.Steps == 0 {
return fmt.Errorf("SyncWorker requires at least one backoff step to apply any manifests")
}
if backoff.Steps > 1 && work.State == payload.InitializingPayload {
backoff = wait.Backoff{Steps: 4, Factor: 2, Duration: time.Second}
backoff = wait.Backoff{Steps: 4, Factor: 2, Duration: time.Second, Cap: 15 * time.Second}
}
for i := range payloadUpdate.Manifests {
tasks = append(tasks, &payload.Task{
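
The new guard exists because a zero-step backoff would never run the apply at all. A minimal sketch, assuming current apimachinery behavior (not part of this PR):

package main

import (
	"fmt"

	"k8s.io/apimachinery/pkg/util/wait"
)

func main() {
	applies := 0
	err := wait.ExponentialBackoff(wait.Backoff{Steps: 0}, func() (bool, error) {
		applies++ // stands in for builder.Apply
		return true, nil
	})
	fmt.Println(applies, err) // 0 timed out waiting for the condition
}
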
80 changes: 40 additions & 40 deletions pkg/payload/task.go
@@ -95,51 +95,46 @@ func (st *Task) String() string {
return fmt.Sprintf("%s \"%s/%s\" (%d of %d)", strings.ToLower(st.Manifest.GVK.Kind), ns, name, st.Index, st.Total)
}

// Run attempts to create the provided object until it succeeds or context is cancelled. It returns the
// last error if context is cancelled.
// Run attempts to create the provided object until it:
//
// * Succeeds, or
// * Fails with an UpdateError, because these are unlikely to improve quickly on retry, or
// * The context is canceled, in which case it returns the most recent error.
func (st *Task) Run(ctx context.Context, version string, builder ResourceBuilder, state State) error {
var lastErr error
backoff := st.Backoff
maxDuration := 15 * time.Second // TODO: fold back into Backoff in 1.13
for {
// attempt the apply, waiting as long as necessary
err := builder.Apply(ctx, st.Manifest, state)
err := wait.ExponentialBackoffWithContext(ctx, backoff, func() (done bool, err error) {
err = builder.Apply(ctx, st.Manifest, state)
if err == nil {
return nil
return true, nil
}

lastErr = err
utilruntime.HandleError(errors.Wrapf(err, "error running apply for %s", st))
metricPayloadErrors.WithLabelValues(version).Inc()

// TODO: this code will become easier in Kube 1.13 because Backoff now supports max
d := time.Duration(float64(backoff.Duration) * backoff.Factor)
if d > maxDuration {
d = maxDuration
}
d = wait.Jitter(d, backoff.Jitter)

// sleep or wait for cancellation
select {
case <-time.After(d):
continue
case <-ctx.Done():
if uerr, ok := lastErr.(*UpdateError); ok {
uerr.Task = st.Copy()
return uerr
}
reason, cause := reasonForPayloadSyncError(lastErr)
if len(cause) > 0 {
cause = ": " + cause
}
return &UpdateError{
Nested: lastErr,
Reason: reason,
Message: fmt.Sprintf("Could not update %s%s", st, cause),

Task: st.Copy(),
}
if updateErr, ok := err.(*UpdateError); ok {
updateErr.Task = st.Copy()
return false, updateErr // failing fast for UpdateError
}
return false, nil
})
if err == nil {
return nil
}
if lastErr != nil {
err = lastErr
}
if _, ok := err.(*UpdateError); ok {
return err
}
reason, cause := reasonForPayloadSyncError(err)
if len(cause) > 0 {
cause = ": " + cause
}
return &UpdateError{
Nested: err,
Reason: reason,
Message: fmt.Sprintf("Could not update %s%s", st, cause),
Task: st.Copy(),
}
}
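
Run now leans on the ExponentialBackoff contract: a condition that returns (false, nil) is retried after the next delay, while a non-nil error aborts the loop immediately, which is how an *UpdateError fails fast. A minimal sketch of that contract (illustrative, not from this PR; the error value is made up):

package main

import (
	"errors"
	"fmt"
	"time"

	"k8s.io/apimachinery/pkg/util/wait"
)

func main() {
	permanent := errors.New("update error: fail fast")
	attempts := 0
	err := wait.ExponentialBackoff(wait.Backoff{Duration: 10 * time.Millisecond, Factor: 2, Steps: 5}, func() (bool, error) {
		attempts++
		if attempts < 3 {
			return false, nil // transient failure: retry after the next backoff delay
		}
		return false, permanent // permanent failure: stop retrying right away
	})
	fmt.Println(attempts, err) // 3 update error: fail fast
}
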

@@ -177,7 +172,7 @@ func (e *UpdateError) Cause() error {
return e.Nested
}

// reasonForUpdateError provides a succint explanation of a known error type for use in a human readable
// reasonForPayloadSyncError provides a succint explanation of a known error type for use in a human readable
// message during update. Since all objects in the image should be successfully applied, messages
// should direct the reader (likely a cluster administrator) to a possible cause in their own config.
func reasonForPayloadSyncError(err error) (string, string) {
@@ -249,11 +244,16 @@ func SummaryForReason(reason, name string) string {
return "a cluster operator is degraded"
case "ClusterOperatorNotAvailable":
if len(name) > 0 {
return fmt.Sprintf("the cluster operator %s has not yet successfully rolled out", name)
return fmt.Sprintf("the cluster operator %s is not available", name)
}
return "a cluster operator has not yet rolled out"
return "a cluster operator is not available"
case "ClusterOperatorsNotAvailable":
return "some cluster operators have not yet rolled out"
return "some cluster operators are not available"
case "ClusterOperatorNoVersions":
if len(name) > 0 {
return fmt.Sprintf("the cluster operator %s does not declare expected versions", name)
}
return "a cluster operator does not declare expected versions"
case "WorkloadNotAvailable":
if len(name) > 0 {
return fmt.Sprintf("the workload %s has not yet successfully rolled out", name)