Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
47 changes: 20 additions & 27 deletions pkg/cvo/sync_worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -626,8 +626,8 @@ func (w *SyncWorker) apply(ctx context.Context, payloadUpdate *payload.Update, w
if precreateObjects {
payload.RunGraph(ctx, graph, 8, func(ctx context.Context, tasks []*payload.Task) error {
for _, task := range tasks {
if contextIsCancelled(ctx) {
return cr.CancelError()
if err := ctx.Err(); err != nil {
return cr.ContextError(err)
}
if task.Manifest.GVK != configv1.SchemeGroupVersion.WithKind("ClusterOperator") {
continue
Expand All @@ -645,8 +645,8 @@ func (w *SyncWorker) apply(ctx context.Context, payloadUpdate *payload.Update, w
// update each object
errs := payload.RunGraph(ctx, graph, maxWorkers, func(ctx context.Context, tasks []*payload.Task) error {
for _, task := range tasks {
if contextIsCancelled(ctx) {
return cr.CancelError()
if err := ctx.Err(); err != nil {
return cr.ContextError(err)
}
cr.Update()

Expand All @@ -668,8 +668,10 @@ func (w *SyncWorker) apply(ctx context.Context, payloadUpdate *payload.Update, w
return nil
})
if len(errs) > 0 {
err := cr.Errors(errs)
return err
if err := cr.Errors(errs); err != nil {
return err
}
return errs[0]
}

// update the status
Expand All @@ -690,11 +692,11 @@ func init() {
)
}

type errCanceled struct {
type errContext struct {
err error
}

func (e errCanceled) Error() string { return e.err.Error() }
func (e errContext) Error() string { return e.err.Error() }

// consistentReporter hides the details of calculating the status based on the progress
// of the graph runner.
Expand Down Expand Up @@ -731,7 +733,7 @@ func (r *consistentReporter) Error(err error) {
copied := r.status
copied.Step = "ApplyResources"
copied.Fraction = float32(r.done) / float32(r.total)
if !isCancelledError(err) {
if !isContextError(err) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Replace this with errors.Is ?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I dunno if we have support for Unwrap, which is what Is uses. This commit is mostly about adjusting the naming to reflect the current implementation more precisely. Can we punt implementation improvements to future work?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Which errors.Is we are talking about here.

https://blog.golang.org/go1.13-errors

copied.Failure = err
}
r.reporter.Report(copied)
Expand All @@ -752,10 +754,10 @@ func (r *consistentReporter) Errors(errs []error) error {
return err
}

func (r *consistentReporter) CancelError() error {
func (r *consistentReporter) ContextError(err error) error {
r.lock.Lock()
defer r.lock.Unlock()
return errCanceled{fmt.Errorf("update was cancelled at %d of %d", r.done, r.total)}
return errContext{fmt.Errorf("update %s at %d of %d", err, r.done, r.total)}
}

func (r *consistentReporter) Complete() {
Expand All @@ -771,11 +773,11 @@ func (r *consistentReporter) Complete() {
r.reporter.Report(copied)
}

func isCancelledError(err error) bool {
func isContextError(err error) bool {
if err == nil {
return false
}
_, ok := err.(errCanceled)
_, ok := err.(errContext)
return ok
}

Expand All @@ -796,11 +798,12 @@ func isImageVerificationError(err error) bool {
// not truly an error (cancellation).
// TODO: take into account install vs upgrade
func summarizeTaskGraphErrors(errs []error) error {
// we ignore cancellation errors since they don't provide good feedback to users and are an internal
// detail of the server
err := errors.FilterOut(errors.NewAggregate(errs), isCancelledError)
// we ignore context errors (canceled or timed out) since they don't
// provide good feedback to users and are an internal detail of the
// server
err := errors.FilterOut(errors.NewAggregate(errs), isContextError)
if err == nil {
klog.V(4).Infof("All errors were cancellation errors: %v", errs)
klog.V(4).Infof("All errors were context errors: %v", errs)
return nil
}
agg, ok := err.(errors.Aggregate)
Expand Down Expand Up @@ -971,16 +974,6 @@ func ownerRefModifier(config *configv1.ClusterVersion) resourcebuilder.MetaV1Obj
}
}

// contextIsCancelled returns true if the provided context is cancelled.
func contextIsCancelled(ctx context.Context) bool {
select {
case <-ctx.Done():
return true
default:
return false
}
}

// runThrottledStatusNotifier invokes fn every time ch is updated, but no more often than once
// every interval. If bucket is non-zero then the channel is throttled like a rate limiter bucket.
func runThrottledStatusNotifier(stopCh <-chan struct{}, interval time.Duration, bucket int, ch <-chan SyncWorkerStatus, fn func()) {
Expand Down