diff --git a/pkg/cincinnati/cincinnati.go b/pkg/cincinnati/cincinnati.go
index 795a2281cc..6d09c3032c 100644
--- a/pkg/cincinnati/cincinnati.go
+++ b/pkg/cincinnati/cincinnati.go
@@ -1,12 +1,14 @@
 package cincinnati
 
 import (
+	"context"
 	"crypto/tls"
 	"encoding/json"
 	"fmt"
 	"io/ioutil"
 	"net/http"
 	"net/url"
+	"time"
 
 	"github.com/blang/semver/v4"
 	"github.com/google/uuid"
@@ -16,6 +18,9 @@ const (
 	// GraphMediaType is the media-type specified in the HTTP Accept header
 	// of requests sent to the Cincinnati-v1 Graph API.
 	GraphMediaType = "application/json"
+
+	// Timeout when calling upstream Cincinnati stack.
+	getUpdatesTimeout = time.Minute * 60
 )
 
 // Client is a Cincinnati client which can be used to fetch update graphs from
@@ -58,7 +63,7 @@ func (err *Error) Error() string {
 // finding all of the children. These children are the available updates for
 // the current version and their payloads indicate from where the actual update
 // image can be downloaded.
-func (c Client) GetUpdates(uri *url.URL, arch string, channel string, version semver.Version) ([]Update, error) {
+func (c Client) GetUpdates(ctx context.Context, uri *url.URL, arch string, channel string, version semver.Version) ([]Update, error) {
 	transport := http.Transport{}
 	// Prepare parametrized cincinnati query.
 	queryParams := uri.Query()
@@ -83,7 +88,9 @@ func (c Client) GetUpdates(uri *url.URL, arch string, channel string, version se
 	}
 
 	client := http.Client{Transport: &transport}
-	resp, err := client.Do(req)
+	timeoutCtx, cancel := context.WithTimeout(ctx, getUpdatesTimeout)
+	defer cancel()
+	resp, err := client.Do(req.WithContext(timeoutCtx))
 	if err != nil {
 		return nil, &Error{Reason: "RemoteFailed", Message: err.Error(), cause: err}
 	}
diff --git a/pkg/cincinnati/cincinnati_test.go b/pkg/cincinnati/cincinnati_test.go
index d91cf4eb66..66a22604e5 100644
--- a/pkg/cincinnati/cincinnati_test.go
+++ b/pkg/cincinnati/cincinnati_test.go
@@ -1,6 +1,7 @@
 package cincinnati
 
 import (
+	"context"
 	"crypto/tls"
 	"encoding/json"
 	"fmt"
@@ -132,7 +133,7 @@ func TestGetUpdates(t *testing.T) {
 			t.Fatal(err)
 		}
 
-		updates, err := c.GetUpdates(uri, arch, channelName, semver.MustParse(test.version))
+		updates, err := c.GetUpdates(context.Background(), uri, arch, channelName, semver.MustParse(test.version))
 		if test.err == "" {
 			if err != nil {
 				t.Fatalf("expected nil error, got: %v", err)
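The cincinnati.go hunks above bound the upstream HTTP call: GetUpdates now accepts the caller's context and derives a 60-minute timeout from it, so a hung Cincinnati server can no longer stall the operator indefinitely. Below is a minimal, self-contained sketch of that request-scoped timeout pattern; fetchGraph, the example URL, and the one-minute budget are illustrative stand-ins, not part of the patch.

package main

import (
	"context"
	"fmt"
	"net/http"
	"time"
)

func fetchGraph(ctx context.Context, url string) (*http.Response, error) {
	// Derive a bounded child context; cancel() releases the timer
	// even on the success path.
	timeoutCtx, cancel := context.WithTimeout(ctx, time.Minute)
	defer cancel()

	req, err := http.NewRequest("GET", url, nil)
	if err != nil {
		return nil, err
	}
	// Attaching the context lets the transport abort the request mid-flight
	// when the timeout fires or the parent context is cancelled.
	return http.DefaultClient.Do(req.WithContext(timeoutCtx))
}

func main() {
	resp, err := fetchGraph(context.Background(), "https://example.com/graph")
	if err != nil {
		fmt.Println("request failed:", err)
		return
	}
	defer resp.Body.Close()
	fmt.Println("status:", resp.Status)
}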
diff --git a/pkg/cvo/availableupdates.go b/pkg/cvo/availableupdates.go
index c9612c8383..d1c2d91758 100644
--- a/pkg/cvo/availableupdates.go
+++ b/pkg/cvo/availableupdates.go
@@ -1,6 +1,7 @@
 package cvo
 
 import (
+	"context"
 	"crypto/tls"
 	"fmt"
 	"net/url"
@@ -23,7 +24,7 @@ const noChannel string = "NoChannel"
 // syncAvailableUpdates attempts to retrieve the latest updates and update the status of the ClusterVersion
 // object. It will set the RetrievedUpdates condition. Updates are only checked if it has been more than
 // the minimumUpdateCheckInterval since the last check.
-func (optr *Operator) syncAvailableUpdates(config *configv1.ClusterVersion) error {
+func (optr *Operator) syncAvailableUpdates(ctx context.Context, config *configv1.ClusterVersion) error {
 	usedDefaultUpstream := false
 	upstream := string(config.Spec.Upstream)
 	if len(upstream) == 0 {
@@ -45,7 +46,7 @@ func (optr *Operator) syncAvailableUpdates(config *configv1.ClusterVersion) erro
 		return err
 	}
 
-	updates, condition := calculateAvailableUpdatesStatus(string(config.Spec.ClusterID), proxyURL, tlsConfig, upstream, arch, channel, optr.releaseVersion)
+	updates, condition := calculateAvailableUpdatesStatus(ctx, string(config.Spec.ClusterID), proxyURL, tlsConfig, upstream, arch, channel, optr.releaseVersion)
 
 	if usedDefaultUpstream {
 		upstream = ""
@@ -139,7 +140,7 @@ func (optr *Operator) getAvailableUpdates() *availableUpdates {
 	return optr.availableUpdates
 }
 
-func calculateAvailableUpdatesStatus(clusterID string, proxyURL *url.URL, tlsConfig *tls.Config, upstream, arch, channel, version string) ([]configv1.Update, configv1.ClusterOperatorStatusCondition) {
+func calculateAvailableUpdatesStatus(ctx context.Context, clusterID string, proxyURL *url.URL, tlsConfig *tls.Config, upstream, arch, channel, version string) ([]configv1.Update, configv1.ClusterOperatorStatusCondition) {
 	if len(upstream) == 0 {
 		return nil, configv1.ClusterOperatorStatusCondition{
 			Type: configv1.RetrievedUpdates, Status: configv1.ConditionFalse, Reason: "NoUpstream",
@@ -193,7 +194,7 @@ func calculateAvailableUpdatesStatus(clusterID string, proxyURL *url.URL, tlsCon
 		}
 	}
 
-	updates, err := cincinnati.NewClient(uuid, proxyURL, tlsConfig).GetUpdates(upstreamURI, arch, channel, currentVersion)
+	updates, err := cincinnati.NewClient(uuid, proxyURL, tlsConfig).GetUpdates(ctx, upstreamURI, arch, channel, currentVersion)
 	if err != nil {
 		klog.V(2).Infof("Upstream server %s could not return available updates: %v", upstream, err)
 		if updateError, ok := err.(*cincinnati.Error); ok {
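The availableupdates.go hunks are pure plumbing: ctx now flows from syncAvailableUpdates through calculateAvailableUpdatesStatus into GetUpdates, and the existing error handling is untouched. For orientation, here is a small runnable sketch of how a typed error such as *cincinnati.Error carries a machine-readable Reason the caller can copy into the RetrievedUpdates condition; the Error type is re-declared locally to keep the sketch self-contained, and getUpdates is an illustrative stand-in for the real client call.

package main

import (
	"context"
	"fmt"
)

// Error mirrors the exported Reason/Message fields of cincinnati.Error seen above.
type Error struct {
	Reason  string
	Message string
}

func (e *Error) Error() string { return fmt.Sprintf("%s: %s", e.Reason, e.Message) }

// getUpdates fails fast with a typed error once its context is done.
func getUpdates(ctx context.Context) error {
	if err := ctx.Err(); err != nil {
		return &Error{Reason: "RemoteFailed", Message: err.Error()}
	}
	return nil
}

func main() {
	ctx, cancel := context.WithCancel(context.Background())
	cancel() // simulate the operator shutting down before the call

	if err := getUpdates(ctx); err != nil {
		// The same assertion the operator uses to surface Reason/Message in a condition.
		if updateError, ok := err.(*Error); ok {
			fmt.Println("reason:", updateError.Reason, "message:", updateError.Message)
		}
	}
}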
diff --git a/pkg/cvo/cvo.go b/pkg/cvo/cvo.go
index c9f3db1f9e..f8670b6aa2 100644
--- a/pkg/cvo/cvo.go
+++ b/pkg/cvo/cvo.go
@@ -326,19 +326,19 @@ func (optr *Operator) Run(ctx context.Context, workers int) {
 	optr.queue.Add(optr.queueKey())
 
 	// start the config sync loop, and have it notify the queue when new status is detected
-	go runThrottledStatusNotifier(stopCh, optr.statusInterval, 2, optr.configSync.StatusCh(), func() { optr.queue.Add(optr.queueKey()) })
+	go runThrottledStatusNotifier(ctx, optr.statusInterval, 2, optr.configSync.StatusCh(), func() { optr.queue.Add(optr.queueKey()) })
 	go optr.configSync.Start(ctx, 16, optr.name, optr.cvLister)
-	go wait.Until(func() { optr.worker(ctx, optr.availableUpdatesQueue, optr.availableUpdatesSync) }, time.Second, stopCh)
-	go wait.Until(func() { optr.worker(ctx, optr.upgradeableQueue, optr.upgradeableSync) }, time.Second, stopCh)
-	go wait.Until(func() {
+	go wait.UntilWithContext(ctx, func(ctx context.Context) { optr.worker(ctx, optr.availableUpdatesQueue, optr.availableUpdatesSync) }, time.Second)
+	go wait.UntilWithContext(ctx, func(ctx context.Context) { optr.worker(ctx, optr.upgradeableQueue, optr.upgradeableSync) }, time.Second)
+	go wait.UntilWithContext(ctx, func(ctx context.Context) {
 		defer close(workerStopCh)
 
 		// run the worker, then when the queue is closed sync one final time to flush any pending status
-		optr.worker(ctx, optr.queue, func(key string) error { return optr.sync(ctx, key) })
+		optr.worker(ctx, optr.queue, func(ctx context.Context, key string) error { return optr.sync(ctx, key) })
 		if err := optr.sync(ctx, optr.queueKey()); err != nil {
 			utilruntime.HandleError(fmt.Errorf("unable to perform final sync: %v", err))
 		}
-	}, time.Second, stopCh)
+	}, time.Second)
 	if optr.signatureStore != nil {
 		go optr.signatureStore.Run(ctx, optr.minimumUpdateCheckInterval*2)
 	}
@@ -375,21 +375,21 @@ func (optr *Operator) eventHandler() cache.ResourceEventHandler {
 	}
 }
 
-func (optr *Operator) worker(ctx context.Context, queue workqueue.RateLimitingInterface, syncHandler func(string) error) {
+func (optr *Operator) worker(ctx context.Context, queue workqueue.RateLimitingInterface, syncHandler func(context.Context, string) error) {
 	for processNextWorkItem(ctx, queue, syncHandler, optr.syncFailingStatus) {
 	}
 }
 
 type syncFailingStatusFunc func(ctx context.Context, config *configv1.ClusterVersion, err error) error
 
-func processNextWorkItem(ctx context.Context, queue workqueue.RateLimitingInterface, syncHandler func(string) error, syncFailingStatus syncFailingStatusFunc) bool {
+func processNextWorkItem(ctx context.Context, queue workqueue.RateLimitingInterface, syncHandler func(context.Context, string) error, syncFailingStatus syncFailingStatusFunc) bool {
 	key, quit := queue.Get()
 	if quit {
 		return false
 	}
 	defer queue.Done(key)
 
-	err := syncHandler(key.(string))
+	err := syncHandler(ctx, key.(string))
 	handleErr(ctx, queue, err, key, syncFailingStatus)
 	return true
 }
@@ -486,7 +486,7 @@ func (optr *Operator) sync(ctx context.Context, key string) error {
 
 // availableUpdatesSync is triggered on cluster version change (and periodic requeues) to
 // sync available updates. It only modifies cluster version.
-func (optr *Operator) availableUpdatesSync(key string) error {
+func (optr *Operator) availableUpdatesSync(ctx context.Context, key string) error {
 	startTime := time.Now()
 	klog.V(4).Infof("Started syncing available updates %q (%v)", key, startTime)
 	defer func() {
@@ -503,13 +503,12 @@ func (optr *Operator) availableUpdatesSync(key string) error {
 	if errs := validation.ValidateClusterVersion(config); len(errs) > 0 {
 		return nil
 	}
-
-	return optr.syncAvailableUpdates(config)
+	return optr.syncAvailableUpdates(ctx, config)
 }
 
 // upgradeableSync is triggered on cluster version change (and periodic requeues) to
 // sync upgradeableCondition. It only modifies cluster version.
-func (optr *Operator) upgradeableSync(key string) error {
+func (optr *Operator) upgradeableSync(ctx context.Context, key string) error {
 	startTime := time.Now()
 	klog.V(4).Infof("Started syncing upgradeable %q (%v)", key, startTime)
 	defer func() {
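The cvo.go hunks replace wait.Until loops driven by a stop channel with wait.UntilWithContext, so every worker shares the operator's root context for shutdown, and the sync handlers gain a context parameter end to end. A minimal runnable sketch of the k8s.io/apimachinery wait.UntilWithContext pattern follows; the three-second deadline and the loop body are illustrative.

package main

import (
	"context"
	"fmt"
	"time"

	"k8s.io/apimachinery/pkg/util/wait"
)

func main() {
	ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second)
	defer cancel()

	// UntilWithContext runs the body immediately, then once per period,
	// and returns as soon as ctx is done; no separate stop channel is needed.
	wait.UntilWithContext(ctx, func(ctx context.Context) {
		fmt.Println("worker tick")
	}, time.Second)

	fmt.Println("context done, loop exited")
}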
diff --git a/pkg/cvo/cvo_test.go b/pkg/cvo/cvo_test.go
index 6005f659f3..125ac350c4 100644
--- a/pkg/cvo/cvo_test.go
+++ b/pkg/cvo/cvo_test.go
@@ -256,7 +256,6 @@ func (c *fakeApiExtClient) Patch(ctx context.Context, name string, pt types.Patc
 }
 
 func TestOperator_sync(t *testing.T) {
-	ctx := context.Background()
 	id := uuid.Must(uuid.NewRandom()).String()
 
 	tests := []struct {
@@ -2271,6 +2270,7 @@ func TestOperator_sync(t *testing.T) {
 			}
 			optr.eventRecorder = record.NewFakeRecorder(100)
 
+			ctx := context.Background()
 			err := optr.sync(ctx, optr.queueKey())
 			if err != nil && tt.wantErr == nil {
 				t.Fatalf("Operator.sync() unexpected error: %v", err)
@@ -2651,7 +2651,8 @@ func TestOperator_availableUpdatesSync(t *testing.T) {
 			}
 			old := optr.availableUpdates
 
-			err := optr.availableUpdatesSync(optr.queueKey())
+			ctx := context.Background()
+			err := optr.availableUpdatesSync(ctx, optr.queueKey())
 			if err != nil && tt.wantErr == nil {
 				t.Fatalf("Operator.sync() unexpected error: %v", err)
 			}
@@ -3143,7 +3144,8 @@ func TestOperator_upgradeableSync(t *testing.T) {
 			optr.upgradeableChecks = optr.defaultUpgradeableChecks()
 			optr.eventRecorder = record.NewFakeRecorder(100)
 
-			err := optr.upgradeableSync(optr.queueKey())
+			ctx := context.Background()
+			err := optr.upgradeableSync(ctx, optr.queueKey())
 			if err != nil && tt.wantErr == nil {
 				t.Fatalf("Operator.sync() unexpected error: %v", err)
 			}
diff --git a/pkg/cvo/sync_worker.go b/pkg/cvo/sync_worker.go
index 4d77739668..4a60a3e5b2 100644
--- a/pkg/cvo/sync_worker.go
+++ b/pkg/cvo/sync_worker.go
@@ -988,21 +988,20 @@ func ownerRefModifier(config *configv1.ClusterVersion) resourcebuilder.MetaV1Obj
 
 // runThrottledStatusNotifier invokes fn every time ch is updated, but no more often than once
 // every interval. If bucket is non-zero then the channel is throttled like a rate limiter bucket.
-func runThrottledStatusNotifier(stopCh <-chan struct{}, interval time.Duration, bucket int, ch <-chan SyncWorkerStatus, fn func()) {
+func runThrottledStatusNotifier(ctx context.Context, interval time.Duration, bucket int, ch <-chan SyncWorkerStatus, fn func()) {
 	// notify the status change function fairly infrequently to avoid updating
 	// the caller status more frequently than is needed
 	throttle := rate.NewLimiter(rate.Every(interval), bucket)
-	wait.Until(func() {
-		ctx := context.Background()
+	wait.UntilWithContext(ctx, func(ctx context.Context) {
 		var last SyncWorkerStatus
 		for {
 			select {
-			case <-stopCh:
+			case <-ctx.Done():
 				return
 			case next := <-ch:
 				// only throttle if we aren't on an edge
 				if next.Generation == last.Generation && next.Actual == last.Actual && next.Reconciling == last.Reconciling && (next.Failure != nil) == (last.Failure != nil) {
-					if err := throttle.Wait(ctx); err != nil {
+					if err := throttle.Wait(ctx); err != nil && err != context.Canceled && err != context.DeadlineExceeded {
 						utilruntime.HandleError(fmt.Errorf("unable to throttle status notification: %v", err))
 					}
 				}
@@ -1011,5 +1010,5 @@ func runThrottledStatusNotifier(stopCh <-chan struct{}, interval time.Duration,
 			fn()
 		}
 	}
-	}, 1*time.Second, stopCh)
+	}, 1*time.Second)
 }
diff --git a/pkg/cvo/sync_worker_test.go b/pkg/cvo/sync_worker_test.go
index 423833ec1e..54285a9d97 100644
--- a/pkg/cvo/sync_worker_test.go
+++ b/pkg/cvo/sync_worker_test.go
@@ -1,6 +1,7 @@
 package cvo
 
 import (
+	"context"
 	"fmt"
 	"testing"
 	"time"
@@ -146,12 +147,11 @@ func Test_statusWrapper_ReportGeneration(t *testing.T) {
 	}
 }
 func Test_runThrottledStatusNotifier(t *testing.T) {
-	stopCh := make(chan struct{})
-	defer close(stopCh)
 	in := make(chan SyncWorkerStatus)
 	out := make(chan struct{}, 100)
 
-	go runThrottledStatusNotifier(stopCh, 30*time.Second, 1, in, func() { out <- struct{}{} })
+	ctx := context.Background()
+	go runThrottledStatusNotifier(ctx, 30*time.Second, 1, in, func() { out <- struct{}{} })
 
 	in <- SyncWorkerStatus{Actual: configv1.Update{Image: "test"}}
 	select {
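runThrottledStatusNotifier keeps its structure but now stops via ctx.Done() and treats context.Canceled and context.DeadlineExceeded from the limiter as a clean shutdown rather than an error worth logging. The sketch below is a simplified, runnable version of that select-plus-rate.Limiter shape; unlike the real notifier it throttles every event instead of only non-edge updates, and the channel payload and timings are illustrative.

package main

import (
	"context"
	"fmt"
	"time"

	"golang.org/x/time/rate"
)

func main() {
	ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
	defer cancel()

	events := make(chan int)
	go func() {
		for i := 0; ; i++ {
			select {
			case events <- i:
			case <-ctx.Done():
				return
			}
		}
	}()

	// One notification per 500ms with a burst of one, like the notifier's limiter.
	throttle := rate.NewLimiter(rate.Every(500*time.Millisecond), 1)
	for {
		select {
		case <-ctx.Done():
			// Cancellation is an expected shutdown path, not an error.
			fmt.Println("shutting down")
			return
		case n := <-events:
			if err := throttle.Wait(ctx); err != nil {
				if err != context.Canceled && err != context.DeadlineExceeded {
					fmt.Println("throttle error:", err)
				}
				continue
			}
			fmt.Println("notified for event", n)
		}
	}
}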