Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
47 changes: 20 additions & 27 deletions pkg/cvo/sync_worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -626,8 +626,8 @@ func (w *SyncWorker) apply(ctx context.Context, payloadUpdate *payload.Update, w
if precreateObjects {
payload.RunGraph(ctx, graph, 8, func(ctx context.Context, tasks []*payload.Task) error {
for _, task := range tasks {
if contextIsCancelled(ctx) {
return cr.CancelError()
if err := ctx.Err(); err != nil {
return cr.ContextError(err)
}
if task.Manifest.GVK != configv1.SchemeGroupVersion.WithKind("ClusterOperator") {
continue
Expand All @@ -645,8 +645,8 @@ func (w *SyncWorker) apply(ctx context.Context, payloadUpdate *payload.Update, w
// update each object
errs := payload.RunGraph(ctx, graph, maxWorkers, func(ctx context.Context, tasks []*payload.Task) error {
for _, task := range tasks {
if contextIsCancelled(ctx) {
return cr.CancelError()
if err := ctx.Err(); err != nil {
return cr.ContextError(err)
}
cr.Update()

Expand All @@ -668,8 +668,10 @@ func (w *SyncWorker) apply(ctx context.Context, payloadUpdate *payload.Update, w
return nil
})
if len(errs) > 0 {
err := cr.Errors(errs)
return err
if err := cr.Errors(errs); err != nil {
return err
}
return errs[0]
}

// update the status
Expand All @@ -690,11 +692,11 @@ func init() {
)
}

type errCanceled struct {
type errContext struct {
err error
}

func (e errCanceled) Error() string { return e.err.Error() }
func (e errContext) Error() string { return e.err.Error() }

// consistentReporter hides the details of calculating the status based on the progress
// of the graph runner.
Expand Down Expand Up @@ -731,7 +733,7 @@ func (r *consistentReporter) Error(err error) {
copied := r.status
copied.Step = "ApplyResources"
copied.Fraction = float32(r.done) / float32(r.total)
if !isCancelledError(err) {
if !isContextError(err) {
copied.Failure = err
}
r.reporter.Report(copied)
Expand All @@ -752,10 +754,10 @@ func (r *consistentReporter) Errors(errs []error) error {
return err
}

func (r *consistentReporter) CancelError() error {
func (r *consistentReporter) ContextError(err error) error {
r.lock.Lock()
defer r.lock.Unlock()
return errCanceled{fmt.Errorf("update was cancelled at %d of %d", r.done, r.total)}
return errContext{fmt.Errorf("update %s at %d of %d", err, r.done, r.total)}
}

func (r *consistentReporter) Complete() {
Expand All @@ -771,11 +773,11 @@ func (r *consistentReporter) Complete() {
r.reporter.Report(copied)
}

func isCancelledError(err error) bool {
func isContextError(err error) bool {
if err == nil {
return false
}
_, ok := err.(errCanceled)
_, ok := err.(errContext)
return ok
}

Expand All @@ -796,11 +798,12 @@ func isImageVerificationError(err error) bool {
// not truly an error (cancellation).
// TODO: take into account install vs upgrade
func summarizeTaskGraphErrors(errs []error) error {
// we ignore cancellation errors since they don't provide good feedback to users and are an internal
// detail of the server
err := errors.FilterOut(errors.NewAggregate(errs), isCancelledError)
// we ignore context errors (canceled or timed out) since they don't
// provide good feedback to users and are an internal detail of the
// server
err := errors.FilterOut(errors.NewAggregate(errs), isContextError)
if err == nil {
klog.V(4).Infof("All errors were cancellation errors: %v", errs)
klog.V(4).Infof("All errors were context errors: %v", errs)
return nil
}
agg, ok := err.(errors.Aggregate)
Expand Down Expand Up @@ -967,16 +970,6 @@ func ownerRefModifier(config *configv1.ClusterVersion) resourcebuilder.MetaV1Obj
}
}

// contextIsCancelled returns true if the provided context is cancelled.
func contextIsCancelled(ctx context.Context) bool {
select {
case <-ctx.Done():
return true
default:
return false
}
}

// runThrottledStatusNotifier invokes fn every time ch is updated, but no more often than once
// every interval. If bucket is non-zero then the channel is throttled like a rate limiter bucket.
func runThrottledStatusNotifier(stopCh <-chan struct{}, interval time.Duration, bucket int, ch <-chan SyncWorkerStatus, fn func()) {
Expand Down