diff --git a/cmd/main.go b/cmd/main.go index a11b054c30..1414798925 100644 --- a/cmd/main.go +++ b/cmd/main.go @@ -5,9 +5,6 @@ import ( "github.com/spf13/cobra" "k8s.io/klog/v2" - - _ "github.com/openshift/cluster-version-operator/pkg/clusterconditions/always" - _ "github.com/openshift/cluster-version-operator/pkg/clusterconditions/promql" ) var ( diff --git a/install/0000_00_cluster-version-operator_01_admingate_configmap.yaml b/install/0000_00_cluster-version-operator_01_admingate_configmap.yaml index a3387fd866..f468c90771 100644 --- a/install/0000_00_cluster-version-operator_01_admingate_configmap.yaml +++ b/install/0000_00_cluster-version-operator_01_admingate_configmap.yaml @@ -1,5 +1,8 @@ apiVersion: v1 kind: ConfigMap +data: + ack-4.12-kube-1.26-api-removals-in-4.13: |- + Kubernetes 1.26 and therefore OpenShift 4.13 remove several APIs which require admin consideration. Please see the knowledge article https://access.redhat.com/articles/6958394 for details and instructions. metadata: name: admin-gates namespace: openshift-config-managed diff --git a/install/0000_00_cluster-version-operator_03_deployment.yaml b/install/0000_00_cluster-version-operator_03_deployment.yaml index 7b608829c7..d8a32bd694 100644 --- a/install/0000_00_cluster-version-operator_03_deployment.yaml +++ b/install/0000_00_cluster-version-operator_03_deployment.yaml @@ -66,7 +66,9 @@ spec: fieldPath: spec.nodeName - name: CLUSTER_PROFILE value: {{ .ClusterProfile }} - dnsPolicy: ClusterFirstWithHostNet + # this pod is hostNetwork and uses the internal LB DNS name when possible, which the kubelet also uses. + # this dnsPolicy allows us to use the same dnsConfig as the kubelet, without access to read it ourselves. + dnsPolicy: Default hostNetwork: true nodeSelector: node-role.kubernetes.io/master: "" diff --git a/lib/resourcemerge/core.go b/lib/resourcemerge/core.go index b6f4df7619..9f3288c6b6 100644 --- a/lib/resourcemerge/core.go +++ b/lib/resourcemerge/core.go @@ -112,13 +112,6 @@ func ensureContainer(modified *bool, existing *corev1.Container, required corev1 func ensureEnvVar(modified *bool, existing *[]corev1.EnvVar, required []corev1.EnvVar) { for envidx := range required { - // Currently only CVO deployment uses this variable to inject internal LB host. - // This may result in an IP address being returned by API so assuming the - // returned value is correct. 
- if required[envidx].Name == "KUBERNETES_SERVICE_HOST" { - ensureEnvVarKubeService(*existing, &required[envidx]) - } - if required[envidx].ValueFrom != nil { ensureEnvVarSourceFieldRefDefault(required[envidx].ValueFrom.FieldRef) } @@ -129,14 +122,6 @@ func ensureEnvVar(modified *bool, existing *[]corev1.EnvVar, required []corev1.E } } -func ensureEnvVarKubeService(existing []corev1.EnvVar, required *corev1.EnvVar) { - for envidx := range existing { - if existing[envidx].Name == required.Name { - required.Value = existing[envidx].Value - } - } -} - func ensureEnvVarSourceFieldRefDefault(required *corev1.ObjectFieldSelector) { if required != nil && required.APIVersion == "" { required.APIVersion = "v1" diff --git a/lib/resourcemerge/core_test.go b/lib/resourcemerge/core_test.go index 4ca8978549..29db68f3ef 100644 --- a/lib/resourcemerge/core_test.go +++ b/lib/resourcemerge/core_test.go @@ -1640,6 +1640,26 @@ func TestEnsureEnvVar(t *testing.T) { }, expectedModified: false, }, + { + name: "CVO can inject LB into ENVVAR", + existing: []corev1.EnvVar{ + {Name: "ENVVAR", Value: "127.0.0.1"}, + }, + input: []corev1.EnvVar{ + {Name: "ENVVAR", Value: "api-int.ci-ln-vqs15yk-72292.gcp-2.ci.openshift.org"}, + }, + expectedModified: true, + }, + { + name: "CVO can inject LB into KUBERNETES_SERVICE_HOST", + existing: []corev1.EnvVar{ + {Name: "KUBERNETES_SERVICE_HOST", Value: "127.0.0.1"}, + }, + input: []corev1.EnvVar{ + {Name: "KUBERNETES_SERVICE_HOST", Value: "api-int.ci-ln-vqs15yk-72292.gcp-2.ci.openshift.org"}, + }, + expectedModified: true, + }, } for _, test := range tests { t.Run(test.name, func(t *testing.T) { diff --git a/pkg/cincinnati/cincinnati.go b/pkg/cincinnati/cincinnati.go index 9cdc00e3eb..d3d41e2aa7 100644 --- a/pkg/cincinnati/cincinnati.go +++ b/pkg/cincinnati/cincinnati.go @@ -30,13 +30,18 @@ const ( // Client is a Cincinnati client which can be used to fetch update graphs from // an upstream Cincinnati stack. type Client struct { - id uuid.UUID - transport *http.Transport + id uuid.UUID + transport *http.Transport + conditionRegistry clusterconditions.ConditionRegistry } // NewClient creates a new Cincinnati client with the given client identifier. -func NewClient(id uuid.UUID, transport *http.Transport) Client { - return Client{id: id, transport: transport} +func NewClient(id uuid.UUID, transport *http.Transport, conditionRegistry clusterconditions.ConditionRegistry) Client { + return Client{ + id: id, + transport: transport, + conditionRegistry: conditionRegistry, + } } // Error is returned when are unable to get updates. @@ -216,7 +221,7 @@ func (c Client) GetUpdates(ctx context.Context, uri *url.URL, arch string, chann for i := len(conditionalUpdates) - 1; i >= 0; i-- { for j, risk := range conditionalUpdates[i].Risks { - conditionalUpdates[i].Risks[j].MatchingRules, err = clusterconditions.PruneInvalid(ctx, risk.MatchingRules) + conditionalUpdates[i].Risks[j].MatchingRules, err = c.conditionRegistry.PruneInvalid(ctx, risk.MatchingRules) if len(conditionalUpdates[i].Risks[j].MatchingRules) == 0 { klog.Warningf("Conditional update to %s, risk %q, has empty pruned matchingRules; dropping this target to avoid rejections when pushing to the Kubernetes API server. Pruning results: %s", conditionalUpdates[i].Release.Version, risk.Name, err) conditionalUpdates = append(conditionalUpdates[:i], conditionalUpdates[i+1:]...) 
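The cincinnati.go hunks above replace the package-level condition registry with a clusterconditions.ConditionRegistry injected into the Client. A minimal wiring sketch, not part of this patch, assuming a kubeClient and transport are already available (the standard registry is the new package added later in this diff):

package example

import (
	"net/http"

	"github.com/google/uuid"
	"k8s.io/client-go/kubernetes"

	"github.com/openshift/cluster-version-operator/pkg/cincinnati"
	"github.com/openshift/cluster-version-operator/pkg/clusterconditions/standard"
)

// newCincinnatiClient is a hypothetical helper showing the new three-argument
// constructor: the condition registry is passed in explicitly instead of being
// populated by init() side effects in the always and promql packages.
func newCincinnatiClient(clusterID uuid.UUID, transport *http.Transport, kubeClient kubernetes.Interface) cincinnati.Client {
	registry := standard.NewConditionRegistry(kubeClient)
	return cincinnati.NewClient(clusterID, transport, registry)
}
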
diff --git a/pkg/cincinnati/cincinnati_test.go b/pkg/cincinnati/cincinnati_test.go index 28b8193cc6..776257371f 100644 --- a/pkg/cincinnati/cincinnati_test.go +++ b/pkg/cincinnati/cincinnati_test.go @@ -10,11 +10,11 @@ import ( "reflect" "testing" + "github.com/openshift/cluster-version-operator/pkg/clusterconditions/standard" + "github.com/blang/semver/v4" "github.com/google/uuid" configv1 "github.com/openshift/api/config/v1" - _ "github.com/openshift/cluster-version-operator/pkg/clusterconditions/always" - _ "github.com/openshift/cluster-version-operator/pkg/clusterconditions/promql" _ "k8s.io/klog/v2" // integration tests set glog flags. ) @@ -604,7 +604,7 @@ func TestGetUpdates(t *testing.T) { ts := httptest.NewServer(http.HandlerFunc(handler)) defer ts.Close() - c := NewClient(clientID, nil) + c := NewClient(clientID, nil, standard.NewConditionRegistry(nil)) uri, err := url.Parse(ts.URL) if err != nil { diff --git a/pkg/clusterconditions/always/always.go b/pkg/clusterconditions/always/always.go index 6267761eb7..ca24f0a3e5 100644 --- a/pkg/clusterconditions/always/always.go +++ b/pkg/clusterconditions/always/always.go @@ -8,14 +8,11 @@ import ( "errors" configv1 "github.com/openshift/api/config/v1" - "github.com/openshift/cluster-version-operator/pkg/clusterconditions" ) // Always implements a cluster condition that always matches. type Always struct{} -var always = &Always{} - // Valid returns an error if the condition contains any properties // besides 'type'. func (a *Always) Valid(ctx context.Context, condition *configv1.ClusterCondition) error { @@ -30,7 +27,3 @@ func (a *Always) Valid(ctx context.Context, condition *configv1.ClusterCondition func (a *Always) Match(ctx context.Context, condition *configv1.ClusterCondition) (bool, error) { return true, nil } - -func init() { - clusterconditions.Register("Always", always) -} diff --git a/pkg/clusterconditions/cache/cache.go b/pkg/clusterconditions/cache/cache.go index ec772d2b24..e19a54d617 100644 --- a/pkg/clusterconditions/cache/cache.go +++ b/pkg/clusterconditions/cache/cache.go @@ -117,6 +117,8 @@ func (c *Cache) Match(ctx context.Context, condition *configv1.ClusterCondition) detail = fmt.Sprintf(" (last evaluated on %s)", thiefResult.When) } klog.V(2).Infof("%s is the most stale cached cluster-condition match entry, but it is too fresh%s. However, we don't have a cached evaluation for %s, so attempt to evaluate that now.", thiefKey, detail, key) + thiefKey = key + targetCondition = condition } // if we ended up stealing this Match call, log that, to make contention more clear diff --git a/pkg/clusterconditions/clusterconditions.go b/pkg/clusterconditions/clusterconditions.go index e532547cbe..d961d67379 100644 --- a/pkg/clusterconditions/clusterconditions.go +++ b/pkg/clusterconditions/clusterconditions.go @@ -25,28 +25,51 @@ type Condition interface { Match(ctx context.Context, condition *configv1.ClusterCondition) (bool, error) } -// Registry is a registry of implemented condition types. -var Registry map[string]Condition +type ConditionRegistry interface { + // Register registers a condition type, and panics on any name collisions. + Register(conditionType string, condition Condition) + + // PruneInvalid returns a new slice with recognized, valid conditions. + // The error complains about any unrecognized or invalid conditions. 
+ PruneInvalid(ctx context.Context, matchingRules []configv1.ClusterCondition) ([]configv1.ClusterCondition, error) + + // Match returns whether the cluster matches the given rules (true), + // does not match (false), or the rules fail to evaluate (error). + Match(ctx context.Context, matchingRules []configv1.ClusterCondition) (bool, error) +} + +type conditionRegistry struct { + // registry is a registry of implemented condition types. + registry map[string]Condition +} + +func NewConditionRegistry() ConditionRegistry { + ret := &conditionRegistry{ + registry: map[string]Condition{}, + } + + return ret +} // Register registers a condition type, and panics on any name collisions. -func Register(conditionType string, condition Condition) { - if Registry == nil { - Registry = make(map[string]Condition, 1) +func (r *conditionRegistry) Register(conditionType string, condition Condition) { + if r.registry == nil { + r.registry = make(map[string]Condition, 1) } - if existing, ok := Registry[conditionType]; ok && condition != existing { + if existing, ok := r.registry[conditionType]; ok && condition != existing { panic(fmt.Sprintf("cluster condition %q already registered", conditionType)) } - Registry[conditionType] = condition + r.registry[conditionType] = condition } // PruneInvalid returns a new slice with recognized, valid conditions. // The error complains about any unrecognized or invalid conditions. -func PruneInvalid(ctx context.Context, matchingRules []configv1.ClusterCondition) ([]configv1.ClusterCondition, error) { +func (r *conditionRegistry) PruneInvalid(ctx context.Context, matchingRules []configv1.ClusterCondition) ([]configv1.ClusterCondition, error) { var valid []configv1.ClusterCondition var errs []error for _, config := range matchingRules { - condition, ok := Registry[config.Type] + condition, ok := r.registry[config.Type] if !ok { errs = append(errs, fmt.Errorf("Skipping unrecognized cluster condition type %q", config.Type)) continue @@ -63,11 +86,11 @@ func PruneInvalid(ctx context.Context, matchingRules []configv1.ClusterCondition // Match returns whether the cluster matches the given rules (true), // does not match (false), or the rules fail to evaluate (error). -func Match(ctx context.Context, matchingRules []configv1.ClusterCondition) (bool, error) { +func (r *conditionRegistry) Match(ctx context.Context, matchingRules []configv1.ClusterCondition) (bool, error) { var errs []error for _, config := range matchingRules { - condition, ok := Registry[config.Type] + condition, ok := r.registry[config.Type] if !ok { klog.V(2).Infof("Skipping unrecognized cluster condition type %q", config.Type) continue diff --git a/pkg/clusterconditions/clusterconditions_test.go b/pkg/clusterconditions/clusterconditions_test.go index 4acb9b0659..d00541306d 100644 --- a/pkg/clusterconditions/clusterconditions_test.go +++ b/pkg/clusterconditions/clusterconditions_test.go @@ -8,10 +8,7 @@ import ( "testing" configv1 "github.com/openshift/api/config/v1" - - "github.com/openshift/cluster-version-operator/pkg/clusterconditions" - _ "github.com/openshift/cluster-version-operator/pkg/clusterconditions/always" - _ "github.com/openshift/cluster-version-operator/pkg/clusterconditions/promql" + "github.com/openshift/cluster-version-operator/pkg/clusterconditions/standard" ) // Error implements a cluster condition that always errors. 
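Register, PruneInvalid, and Match are now methods on a per-instance registry rather than package-level functions over a shared map, so callers that need extra condition types (like the Error condition registered in the test below) build their own registry instead of mutating global state. An illustrative sketch under that assumption, not part of this patch:

package example

import (
	"context"

	configv1 "github.com/openshift/api/config/v1"

	"github.com/openshift/cluster-version-operator/pkg/clusterconditions"
	"github.com/openshift/cluster-version-operator/pkg/clusterconditions/always"
)

// evaluateRules is a hypothetical helper: it prunes rule types this registry
// does not recognize and evaluates the remaining rules.
func evaluateRules(ctx context.Context, rules []configv1.ClusterCondition) (bool, error) {
	registry := clusterconditions.NewConditionRegistry()
	registry.Register("Always", &always.Always{})

	// PruneInvalid returns the recognized, valid subset; its error only
	// describes the entries that were dropped, so it can be ignored here.
	valid, _ := registry.PruneInvalid(ctx, rules)
	return registry.Match(ctx, valid)
}
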
@@ -33,6 +30,7 @@ func (e *Error) Match(ctx context.Context, condition *configv1.ClusterCondition) func TestPruneInvalid(t *testing.T) { ctx := context.Background() + registry := standard.NewConditionRegistry(nil) for _, testCase := range []struct { name string @@ -100,7 +98,7 @@ func TestPruneInvalid(t *testing.T) { }, } { t.Run(testCase.name, func(t *testing.T) { - valid, err := clusterconditions.PruneInvalid(ctx, testCase.conditions) + valid, err := registry.PruneInvalid(ctx, testCase.conditions) if !reflect.DeepEqual(valid, testCase.expectedValid) { t.Errorf("got valid %v but expected %v", valid, testCase.expectedValid) } @@ -117,7 +115,8 @@ func TestPruneInvalid(t *testing.T) { func TestMatch(t *testing.T) { ctx := context.Background() - clusterconditions.Register("Error", &Error{}) + registry := standard.NewConditionRegistry(nil) + registry.Register("Error", &Error{}) for _, testCase := range []struct { name string @@ -181,7 +180,7 @@ func TestMatch(t *testing.T) { }, } { t.Run(testCase.name, func(t *testing.T) { - match, err := clusterconditions.Match(ctx, testCase.conditions) + match, err := registry.Match(ctx, testCase.conditions) if match != testCase.expectedMatch { t.Errorf("got match %t but expected %t", match, testCase.expectedMatch) } @@ -194,6 +193,4 @@ func TestMatch(t *testing.T) { } }) } - - delete(clusterconditions.Registry, "Error") } diff --git a/pkg/clusterconditions/mock/mock.go b/pkg/clusterconditions/mock/mock.go index bede44d735..47c4945c14 100644 --- a/pkg/clusterconditions/mock/mock.go +++ b/pkg/clusterconditions/mock/mock.go @@ -44,7 +44,7 @@ type Mock struct { } // Valid returns an error popped from ValidQueue. -func (m *Mock) Valid(ctx context.Context, condition *configv1.ClusterCondition) error { +func (m *Mock) Valid(_ context.Context, condition *configv1.ClusterCondition) error { m.Calls = append(m.Calls, Call{ When: time.Now(), Method: "Valid", @@ -61,7 +61,7 @@ func (m *Mock) Valid(ctx context.Context, condition *configv1.ClusterCondition) } // Match returns an error popped from MatchQueue. -func (m *Mock) Match(ctx context.Context, condition *configv1.ClusterCondition) (bool, error) { +func (m *Mock) Match(_ context.Context, condition *configv1.ClusterCondition) (bool, error) { m.Calls = append(m.Calls, Call{ When: time.Now(), Method: "Match", diff --git a/pkg/clusterconditions/promql/promql.go b/pkg/clusterconditions/promql/promql.go index 5b84069995..19f20b6897 100644 --- a/pkg/clusterconditions/promql/promql.go +++ b/pkg/clusterconditions/promql/promql.go @@ -7,6 +7,7 @@ import ( "context" "errors" "fmt" + "net" "time" configv1 "github.com/openshift/api/config/v1" @@ -14,16 +15,16 @@ import ( prometheusv1 "github.com/prometheus/client_golang/api/prometheus/v1" "github.com/prometheus/common/config" "github.com/prometheus/common/model" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/client-go/kubernetes" "k8s.io/klog/v2" - "github.com/openshift/cluster-version-operator/pkg/clusterconditions" "github.com/openshift/cluster-version-operator/pkg/clusterconditions/cache" ) // PromQL implements a cluster condition that matches based on PromQL. type PromQL struct { - // Address holds the Prometheus query URI. - Address string + kubeClient kubernetes.Interface // HTTPClientConfig holds the client configuration for connecting to the Prometheus service. 
HTTPClientConfig config.HTTPClientConfig @@ -32,23 +33,40 @@ type PromQL struct { QueryTimeout time.Duration } -var promql = &cache.Cache{ - Condition: &PromQL{ - Address: "https://thanos-querier.openshift-monitoring.svc.cluster.local:9091", - HTTPClientConfig: config.HTTPClientConfig{ - Authorization: &config.Authorization{ - Type: "Bearer", - CredentialsFile: "/var/run/secrets/kubernetes.io/serviceaccount/token", - }, - TLSConfig: config.TLSConfig{ - CAFile: "/etc/tls/service-ca/service-ca.crt", +func NewPromQL(kubeClient kubernetes.Interface) *cache.Cache { + return &cache.Cache{ + Condition: &PromQL{ + kubeClient: kubeClient, + HTTPClientConfig: config.HTTPClientConfig{ + Authorization: &config.Authorization{ + Type: "Bearer", + CredentialsFile: "/var/run/secrets/kubernetes.io/serviceaccount/token", + }, + TLSConfig: config.TLSConfig{ + CAFile: "/etc/tls/service-ca/service-ca.crt", + // ServerName is used to verify the name of the service we will connect to using IP. + ServerName: "thanos-querier.openshift-monitoring.svc.cluster.local", + }, }, + QueryTimeout: 5 * time.Minute, }, - QueryTimeout: 5 * time.Minute, - }, - MinBetweenMatches: 10 * time.Minute, - MinForCondition: time.Hour, - Expiration: 24 * time.Hour, + MinBetweenMatches: 1 * time.Second, + MinForCondition: time.Hour, + Expiration: 24 * time.Hour, + } +} + +// Address determines the address of the thanos-querier to avoid requiring service DNS resolution. +// We do this so that our host-network pod can use the node's resolv.conf to resolve the internal load balancer name +// on the pod before DNS pods are available and before the service network is available. The side effect is that +// the CVO cannot resolve service DNS names. +func (p *PromQL) Address(ctx context.Context) (string, error) { + svc, err := p.kubeClient.CoreV1().Services("openshift-monitoring").Get(ctx, "thanos-querier", metav1.GetOptions{}) + if err != nil { + return "", err + } + + return fmt.Sprintf("https://%s", net.JoinHostPort(svc.Spec.ClusterIP, "9091")), nil } // Valid returns an error if the condition contains any properties @@ -69,7 +87,13 @@ func (p *PromQL) Valid(ctx context.Context, condition *configv1.ClusterCondition // false when the PromQL evaluates to 0, and an error if the PromQL // returns no time series or returns a value besides 0 or 1. func (p *PromQL) Match(ctx context.Context, condition *configv1.ClusterCondition) (bool, error) { - clientConfig := api.Config{Address: p.Address} + // Lookup the address every attempt in case the service IP changes. This can happen when the thanos service is + // deleted and recreated. 
+	address, err := p.Address(ctx)
+	if err != nil {
+		return false, fmt.Errorf("failed to determine thanos IP: %w", err)
+	}
+	clientConfig := api.Config{Address: address}
 
 	if roundTripper, err := config.NewRoundTripperFromConfig(p.HTTPClientConfig, "cluster-conditions"); err == nil {
 		clientConfig.RoundTripper = roundTripper
@@ -122,7 +146,3 @@ func (p *PromQL) Match(ctx context.Context, condition *configv1.ClusterCondition
 	}
 	return false, fmt.Errorf("invalid PromQL result (must be 0 or 1): %v", sample.Value)
 }
-
-func init() {
-	clusterconditions.Register("PromQL", promql)
-}
diff --git a/pkg/clusterconditions/standard/standard.go b/pkg/clusterconditions/standard/standard.go
new file mode 100644
index 0000000000..97c2814f88
--- /dev/null
+++ b/pkg/clusterconditions/standard/standard.go
@@ -0,0 +1,16 @@
+package standard
+
+import (
+	"github.com/openshift/cluster-version-operator/pkg/clusterconditions"
+	"github.com/openshift/cluster-version-operator/pkg/clusterconditions/always"
+	"github.com/openshift/cluster-version-operator/pkg/clusterconditions/promql"
+	"k8s.io/client-go/kubernetes"
+)
+
+func NewConditionRegistry(kubeClient kubernetes.Interface) clusterconditions.ConditionRegistry {
+	conditionRegistry := clusterconditions.NewConditionRegistry()
+	conditionRegistry.Register("Always", &always.Always{})
+	conditionRegistry.Register("PromQL", promql.NewPromQL(kubeClient))
+
+	return conditionRegistry
+}
diff --git a/pkg/cvo/availableupdates.go b/pkg/cvo/availableupdates.go
index 8c24a79a24..36ff71b7d8 100644
--- a/pkg/cvo/availableupdates.go
+++ b/pkg/cvo/availableupdates.go
@@ -40,49 +40,89 @@ func (optr *Operator) syncAvailableUpdates(ctx context.Context, config *configv1
 	arch := optr.getArchitecture()
 
 	// updates are only checked at most once per minimumUpdateCheckInterval or if the generation changes
-	u := optr.getAvailableUpdates()
-	if u == nil {
+	optrAvailableUpdates := optr.getAvailableUpdates()
+	needFreshFetch := true
+	if optrAvailableUpdates == nil {
 		klog.V(2).Info("First attempt to retrieve available updates")
-	} else if !u.RecentlyChanged(optr.minimumUpdateCheckInterval) {
-		klog.V(2).Infof("Retrieving available updates again, because more than %s has elapsed since %s", optr.minimumUpdateCheckInterval, u.LastAttempt)
-	} else if channel != u.Channel {
-		klog.V(2).Infof("Retrieving available updates again, because the channel has changed from %q to %q", u.Channel, channel)
-	} else if arch != u.Architecture {
-		klog.V(2).Infof("Retrieving available updates again, because the architecture has changed from %q to %q",
-			u.Architecture, arch)
-	} else if upstream == u.Upstream || (upstream == optr.defaultUpstreamServer && u.Upstream == "") {
-		klog.V(2).Infof("Available updates were recently retrieved, with less than %s elapsed since %s, will try later.", optr.minimumUpdateCheckInterval, u.LastAttempt)
-		return nil
+		optrAvailableUpdates = &availableUpdates{}
+	} else if !optrAvailableUpdates.RecentlyChanged(optr.minimumUpdateCheckInterval) {
+		klog.V(2).Infof("Retrieving available updates again, because more than %s has elapsed since %s", optr.minimumUpdateCheckInterval, optrAvailableUpdates.LastAttempt.Format(time.RFC3339))
+	} else if channel != optrAvailableUpdates.Channel {
+		klog.V(2).Infof("Retrieving available updates again, because the channel has changed from %q to %q", optrAvailableUpdates.Channel, channel)
+	} else if arch != optrAvailableUpdates.Architecture {
+		klog.V(2).Infof("Retrieving available updates again, because the architecture has changed from %q to 
%q", optrAvailableUpdates.Architecture, arch) + } else if upstream == optrAvailableUpdates.Upstream || (upstream == optr.defaultUpstreamServer && optrAvailableUpdates.Upstream == "") { + needsConditionalUpdateEval := false + for _, conditionalUpdate := range optrAvailableUpdates.ConditionalUpdates { + if recommended := meta.FindStatusCondition(conditionalUpdate.Conditions, "Recommended"); recommended == nil { + needsConditionalUpdateEval = true + break + } else if recommended.Status != metav1.ConditionTrue && recommended.Status != metav1.ConditionFalse { + needsConditionalUpdateEval = true + break + } + } + if !needsConditionalUpdateEval { + klog.V(2).Infof("Available updates were recently retrieved, with less than %s elapsed since %s, will try later.", optr.minimumUpdateCheckInterval, optrAvailableUpdates.LastAttempt.Format(time.RFC3339)) + return nil + } + needFreshFetch = false } else { - klog.V(2).Infof("Retrieving available updates again, because the upstream has changed from %q to %q", u.Upstream, config.Spec.Upstream) + klog.V(2).Infof("Retrieving available updates again, because the upstream has changed from %q to %q", optrAvailableUpdates.Upstream, config.Spec.Upstream) } - transport, err := optr.getTransport() - if err != nil { - return err - } + if needFreshFetch { + transport, err := optr.getTransport() + if err != nil { + return err + } - current, updates, conditionalUpdates, condition := calculateAvailableUpdatesStatus(ctx, string(config.Spec.ClusterID), transport, upstream, arch, channel, optr.release.Version) + current, updates, conditionalUpdates, condition := calculateAvailableUpdatesStatus(ctx, string(config.Spec.ClusterID), + transport, upstream, arch, channel, optr.release.Version, optr.conditionRegistry) - if usedDefaultUpstream { - upstream = "" + // Populate conditions on conditional updates from operator state + for i := range optrAvailableUpdates.ConditionalUpdates { + for j := range conditionalUpdates { + if optrAvailableUpdates.ConditionalUpdates[i].Release.Image == conditionalUpdates[j].Release.Image { + conditionalUpdates[j].Conditions = optrAvailableUpdates.ConditionalUpdates[i].Conditions + break + } + } + } + + if usedDefaultUpstream { + upstream = "" + } + + optrAvailableUpdates.Upstream = upstream + optrAvailableUpdates.Channel = channel + optrAvailableUpdates.Architecture = arch + optrAvailableUpdates.Current = current + optrAvailableUpdates.Updates = updates + optrAvailableUpdates.ConditionalUpdates = conditionalUpdates + optrAvailableUpdates.ConditionRegistry = optr.conditionRegistry + optrAvailableUpdates.Condition = condition } - au := &availableUpdates{ - Upstream: upstream, - Channel: config.Spec.Channel, - Architecture: arch, - Current: current, - Updates: updates, - ConditionalUpdates: conditionalUpdates, - Condition: condition, + optrAvailableUpdates.evaluateConditionalUpdates(ctx) + + queueKey := optr.queueKey() + for _, conditionalUpdate := range optrAvailableUpdates.ConditionalUpdates { + if recommended := meta.FindStatusCondition(conditionalUpdate.Conditions, "Recommended"); recommended == nil { + klog.Warningf("Requeue available-update evaluation, because %q lacks a Recommended condition", conditionalUpdate.Release.Version) + optr.availableUpdatesQueue.AddAfter(queueKey, time.Second) + break + } else if recommended.Status != metav1.ConditionTrue && recommended.Status != metav1.ConditionFalse { + klog.V(2).Infof("Requeue available-update evaluation, because %q is %s=%s: %s: %s", conditionalUpdate.Release.Version, recommended.Type, 
recommended.Status, recommended.Reason, recommended.Message) + optr.availableUpdatesQueue.AddAfter(queueKey, time.Second) + break + } } - au.evaluateConditionalUpdates(ctx) - optr.setAvailableUpdates(au) + optr.setAvailableUpdates(optrAvailableUpdates) - // requeue - optr.queue.Add(optr.queueKey()) + // queue optr.sync() to update ClusterVersion status + optr.queue.Add(queueKey) return nil } @@ -109,6 +149,7 @@ type availableUpdates struct { Current configv1.Release Updates []configv1.Release ConditionalUpdates []configv1.ConditionalUpdate + ConditionRegistry clusterconditions.ConditionRegistry Condition configv1.ClusterOperatorStatusCondition } @@ -181,7 +222,37 @@ func (optr *Operator) setAvailableUpdates(u *availableUpdates) { func (optr *Operator) getAvailableUpdates() *availableUpdates { optr.statusLock.Lock() defer optr.statusLock.Unlock() - return optr.availableUpdates + + if optr.availableUpdates == nil { + return nil + } + + u := &availableUpdates{ + Upstream: optr.availableUpdates.Upstream, + Channel: optr.availableUpdates.Channel, + Architecture: optr.availableUpdates.Architecture, + LastAttempt: optr.availableUpdates.LastAttempt, + LastSyncOrConfigChange: optr.availableUpdates.LastSyncOrConfigChange, + Current: *optr.availableUpdates.Current.DeepCopy(), + ConditionRegistry: optr.availableUpdates.ConditionRegistry, // intentionally not a copy, to preserve cache state + Condition: optr.availableUpdates.Condition, + } + + if optr.availableUpdates.Updates != nil { + u.Updates = make([]configv1.Release, 0, len(optr.availableUpdates.Updates)) + for _, update := range optr.availableUpdates.Updates { + u.Updates = append(u.Updates, *update.DeepCopy()) + } + } + + if optr.availableUpdates.ConditionalUpdates != nil { + u.ConditionalUpdates = make([]configv1.ConditionalUpdate, 0, len(optr.availableUpdates.ConditionalUpdates)) + for _, conditionalUpdate := range optr.availableUpdates.ConditionalUpdates { + u.ConditionalUpdates = append(u.ConditionalUpdates, *conditionalUpdate.DeepCopy()) + } + } + + return u } // getArchitecture returns the currently determined cluster architecture. 
@@ -198,7 +269,8 @@ func (optr *Operator) SetArchitecture(architecture string) { optr.architecture = architecture } -func calculateAvailableUpdatesStatus(ctx context.Context, clusterID string, transport *http.Transport, upstream, arch, channel, version string) (configv1.Release, []configv1.Release, []configv1.ConditionalUpdate, configv1.ClusterOperatorStatusCondition) { +func calculateAvailableUpdatesStatus(ctx context.Context, clusterID string, transport *http.Transport, upstream, arch, channel, version string, conditionRegistry clusterconditions.ConditionRegistry) (configv1.Release, []configv1.Release, []configv1.ConditionalUpdate, configv1.ClusterOperatorStatusCondition) { + var cvoCurrent configv1.Release if len(upstream) == 0 { return cvoCurrent, nil, nil, configv1.ClusterOperatorStatusCondition{ @@ -253,7 +325,7 @@ func calculateAvailableUpdatesStatus(ctx context.Context, clusterID string, tran } } - current, updates, conditionalUpdates, err := cincinnati.NewClient(uuid, transport).GetUpdates(ctx, upstreamURI, arch, channel, currentVersion) + current, updates, conditionalUpdates, err := cincinnati.NewClient(uuid, transport, conditionRegistry).GetUpdates(ctx, upstreamURI, arch, channel, currentVersion) if err != nil { klog.V(2).Infof("Upstream server %s could not return available updates: %v", upstream, err) if updateError, ok := err.(*cincinnati.Error); ok { @@ -282,10 +354,16 @@ func (u *availableUpdates) evaluateConditionalUpdates(ctx context.Context) { return } + sort.Slice(u.ConditionalUpdates, func(i, j int) bool { + vi := semver.MustParse(u.ConditionalUpdates[i].Release.Version) + vj := semver.MustParse(u.ConditionalUpdates[j].Release.Version) + return vi.GTE(vj) + }) + for i, conditionalUpdate := range u.ConditionalUpdates { - if errorCondition := evaluateConditionalUpdate(ctx, &conditionalUpdate); errorCondition != nil { + if errorCondition := evaluateConditionalUpdate(ctx, &conditionalUpdate, u.ConditionRegistry); errorCondition != nil { meta.SetStatusCondition(&conditionalUpdate.Conditions, *errorCondition) - u.removeUpdate(ctx, conditionalUpdate.Release.Image) + u.removeUpdate(conditionalUpdate.Release.Image) } else { meta.SetStatusCondition(&conditionalUpdate.Conditions, metav1.Condition{ Type: "Recommended", @@ -294,13 +372,23 @@ func (u *availableUpdates) evaluateConditionalUpdates(ctx context.Context) { Reason: "AsExpected", Message: "The update is recommended, because none of the conditional update risks apply to this cluster.", }) - u.Updates = append(u.Updates, conditionalUpdate.Release) + u.addUpdate(conditionalUpdate.Release) } u.ConditionalUpdates[i].Conditions = conditionalUpdate.Conditions } } -func (u *availableUpdates) removeUpdate(ctx context.Context, image string) { +func (u *availableUpdates) addUpdate(release configv1.Release) { + for _, update := range u.Updates { + if update.Image == release.Image { + return + } + } + + u.Updates = append(u.Updates, release) +} + +func (u *availableUpdates) removeUpdate(image string) { for i, update := range u.Updates { if update.Image == image { u.Updates = append(u.Updates[:i], u.Updates[i+1:]...) 
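The evaluateConditionalUpdates hunk above sorts conditional updates with a GTE comparator, which yields a newest-first ordering, and semver.MustParse will panic on a malformed version string, so this relies on the update graph serving parseable versions. A small self-contained illustration with made-up version numbers, not part of this patch:

package main

import (
	"fmt"
	"sort"

	"github.com/blang/semver/v4"
)

func main() {
	versions := []string{"4.11.2", "4.12.0", "4.11.10"}
	// Same comparator shape as evaluateConditionalUpdates: GTE as the "less"
	// function produces a descending (newest-first) order.
	sort.Slice(versions, func(i, j int) bool {
		return semver.MustParse(versions[i]).GTE(semver.MustParse(versions[j]))
	})
	fmt.Println(versions) // [4.12.0 4.11.10 4.11.2]
}
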
@@ -308,13 +396,13 @@ func (u *availableUpdates) removeUpdate(ctx context.Context, image string) { } } -func evaluateConditionalUpdate(ctx context.Context, conditionalUpdate *configv1.ConditionalUpdate) *metav1.Condition { +func evaluateConditionalUpdate(ctx context.Context, conditionalUpdate *configv1.ConditionalUpdate, conditionRegistry clusterconditions.ConditionRegistry) *metav1.Condition { recommended := &metav1.Condition{ Type: "Recommended", } messages := []string{} for _, risk := range conditionalUpdate.Risks { - if match, err := clusterconditions.Match(ctx, risk.MatchingRules); err != nil { + if match, err := conditionRegistry.Match(ctx, risk.MatchingRules); err != nil { if recommended.Status != metav1.ConditionFalse { recommended.Status = metav1.ConditionUnknown } diff --git a/pkg/cvo/availableupdates_test.go b/pkg/cvo/availableupdates_test.go new file mode 100644 index 0000000000..117ec21616 --- /dev/null +++ b/pkg/cvo/availableupdates_test.go @@ -0,0 +1,226 @@ +package cvo + +import ( + "context" + "fmt" + "net/http" + "net/http/httptest" + "testing" + "time" + + "github.com/google/go-cmp/cmp" + "github.com/google/go-cmp/cmp/cmpopts" + + corev1 "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/api/errors" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/labels" + "k8s.io/apimachinery/pkg/runtime/schema" + "k8s.io/client-go/util/workqueue" + + configv1 "github.com/openshift/api/config/v1" + "github.com/openshift/cluster-version-operator/pkg/clusterconditions" + "github.com/openshift/cluster-version-operator/pkg/clusterconditions/always" + "github.com/openshift/cluster-version-operator/pkg/clusterconditions/mock" +) + +// notFoundProxyLister is a stub for ProxyLister +type notFoundProxyLister struct{} + +func (n notFoundProxyLister) List(labels.Selector) ([]*configv1.Proxy, error) { + return nil, nil +} + +func (n notFoundProxyLister) Get(name string) (*configv1.Proxy, error) { + return nil, errors.NewNotFound(schema.GroupResource{Group: configv1.GroupName, Resource: "proxy"}, name) +} + +type notFoundConfigMapLister struct{} + +func (n notFoundConfigMapLister) List(labels.Selector) ([]*corev1.ConfigMap, error) { + return nil, nil +} + +func (n notFoundConfigMapLister) Get(name string) (*corev1.ConfigMap, error) { + return nil, errors.NewNotFound(schema.GroupResource{Group: "", Resource: "configmap"}, name) +} + +// osusWithSingleConditionalEdge helper returns: +// 1. mock osus server that serves a simple conditional path between two versions. +// 2. mock condition that always evaluates to match +// 3. expected []ConditionalUpdate data after evaluation of the data served by mock osus server +// (assuming the mock condition (2) was used) +// 4. 
current version of the cluster that would issue the request to the mock osus server +func osusWithSingleConditionalEdge() (*httptest.Server, clusterconditions.Condition, []configv1.ConditionalUpdate, string) { + from := "4.5.5" + to := "4.5.6" + osus := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + fmt.Fprintf(w, `{ + "nodes": [{"version": "%s", "payload": "payload/%s"}, {"version": "%s", "payload": "payload/%s"}], + "conditionalEdges": [ + { + "edges": [{"from": "%s", "to": "%s"}], + "risks": [ + { + "url": "https://example.com/%s", + "name": "FourFiveSix", + "message": "Four Five Five is just fine", + "matchingRules": [{"type": "PromQL", "promql": { "promql": "this is a query"}}] + } + ] + } + ] +} +`, from, from, to, to, from, to, to) + })) + + updates := []configv1.ConditionalUpdate{ + { + Release: configv1.Release{Version: to, Image: "payload/" + to}, + Risks: []configv1.ConditionalUpdateRisk{ + { + URL: "https://example.com/" + to, + Name: "FourFiveSix", + Message: "Four Five Five is just fine", + MatchingRules: []configv1.ClusterCondition{ + { + Type: "PromQL", + PromQL: &configv1.PromQLClusterCondition{PromQL: "this is a query"}, + }, + }, + }, + }, + Conditions: []metav1.Condition{ + { + Type: "Recommended", + Status: metav1.ConditionFalse, + Reason: "FourFiveSix", + Message: "Four Five Five is just fine https://example.com/" + to, + }, + }, + }, + } + mockPromql := &mock.Mock{ + ValidQueue: []error{nil}, + MatchQueue: []mock.MatchResult{{Match: true, Error: nil}}, + } + + return osus, mockPromql, updates, from +} + +func newOperator(url, version string, promqlMock clusterconditions.Condition) (*availableUpdates, *Operator) { + currentRelease := configv1.Release{Version: version, Image: "payload/" + version} + registry := clusterconditions.NewConditionRegistry() + registry.Register("Always", &always.Always{}) + registry.Register("PromQL", promqlMock) + operator := &Operator{ + defaultUpstreamServer: url, + architecture: "amd64", + proxyLister: notFoundProxyLister{}, + cmConfigManagedLister: notFoundConfigMapLister{}, + conditionRegistry: registry, + queue: workqueue.NewRateLimitingQueue(workqueue.DefaultControllerRateLimiter()), + release: currentRelease, + } + availableUpdates := &availableUpdates{ + Architecture: "amd64", + Current: configv1.Release{Version: version, Image: "payload/" + version}, + } + return availableUpdates, operator +} + +var cvFixture = &configv1.ClusterVersion{ + Spec: configv1.ClusterVersionSpec{ + ClusterID: "897f0a22-33ca-4106-a2c4-29b75250255a", + Channel: "channel", + }, +} + +var availableUpdatesCmpOpts = []cmp.Option{ + cmpopts.IgnoreTypes(time.Time{}), + cmpopts.IgnoreInterfaces(struct { + clusterconditions.ConditionRegistry + }{}), +} + +func TestSyncAvailableUpdates(t *testing.T) { + fakeOsus, mockPromql, expectedConditionalUpdates, version := osusWithSingleConditionalEdge() + defer fakeOsus.Close() + expectedAvailableUpdates, optr := newOperator(fakeOsus.URL, version, mockPromql) + expectedAvailableUpdates.ConditionalUpdates = expectedConditionalUpdates + expectedAvailableUpdates.Channel = cvFixture.Spec.Channel + expectedAvailableUpdates.Condition = configv1.ClusterOperatorStatusCondition{ + Type: configv1.RetrievedUpdates, + Status: configv1.ConditionTrue, + } + + err := optr.syncAvailableUpdates(context.Background(), cvFixture) + + if err != nil { + t.Fatalf("syncAvailableUpdates() unexpected error: %v", err) + } + if diff := cmp.Diff(expectedAvailableUpdates, optr.availableUpdates, 
availableUpdatesCmpOpts...); diff != "" { + t.Fatalf("available updates differ from expected:\n%s", diff) + } +} + +func TestSyncAvailableUpdates_ConditionalUpdateRecommendedConditions(t *testing.T) { + testCases := []struct { + name string + modifyOriginalState func(condition *metav1.Condition) + expectTimeChange bool + }{ + { + name: "lastTransitionTime is not updated when nothing changes", + modifyOriginalState: func(condition *metav1.Condition) {}, + }, + { + name: "lastTransitionTime is not updated when changed but status is identical", + modifyOriginalState: func(condition *metav1.Condition) { + condition.Reason = "OldReason" + condition.Message = "This message should be changed to something else" + }, + }, + { + name: "lastTransitionTime is updated when status changes", + modifyOriginalState: func(condition *metav1.Condition) { + condition.Status = metav1.ConditionUnknown + }, + expectTimeChange: true, + }, + } + + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + fakeOsus, mockPromql, conditionalUpdates, version := osusWithSingleConditionalEdge() + defer fakeOsus.Close() + availableUpdates, optr := newOperator(fakeOsus.URL, version, mockPromql) + optr.availableUpdates = availableUpdates + optr.availableUpdates.ConditionalUpdates = conditionalUpdates + expectedConditions := []metav1.Condition{{}} + conditionalUpdates[0].Conditions[0].DeepCopyInto(&expectedConditions[0]) + tc.modifyOriginalState(&optr.availableUpdates.ConditionalUpdates[0].Conditions[0]) + + err := optr.syncAvailableUpdates(context.Background(), cvFixture) + + if err != nil { + t.Fatalf("syncAvailableUpdates() unexpected error: %v", err) + } + if optr.availableUpdates == nil || len(optr.availableUpdates.ConditionalUpdates) == 0 { + t.Fatalf("syncAvailableUpdates() did not properly set available updates") + } + if diff := cmp.Diff(expectedConditions, optr.availableUpdates.ConditionalUpdates[0].Conditions, cmpopts.IgnoreTypes(time.Time{})); diff != "" { + t.Errorf("conditions on conditional updates differ from expected:\n%s", diff) + } + timeBefore := expectedConditions[0].LastTransitionTime + timeAfter := optr.availableUpdates.ConditionalUpdates[0].Conditions[0].LastTransitionTime + + if tc.expectTimeChange && timeBefore == timeAfter { + t.Errorf("lastTransitionTime was not updated as expected: before=%s after=%s", timeBefore, timeAfter) + } + if !tc.expectTimeChange && timeBefore != timeAfter { + t.Errorf("lastTransitionTime was updated but was not expected to: before=%s after=%s", timeBefore, timeAfter) + } + }) + } +} diff --git a/pkg/cvo/cvo.go b/pkg/cvo/cvo.go index 82538be816..f901748e07 100644 --- a/pkg/cvo/cvo.go +++ b/pkg/cvo/cvo.go @@ -33,6 +33,8 @@ import ( "github.com/openshift/cluster-version-operator/lib/capability" "github.com/openshift/cluster-version-operator/lib/resourcebuilder" "github.com/openshift/cluster-version-operator/lib/validation" + "github.com/openshift/cluster-version-operator/pkg/clusterconditions" + "github.com/openshift/cluster-version-operator/pkg/clusterconditions/standard" cvointernal "github.com/openshift/cluster-version-operator/pkg/cvo/internal" "github.com/openshift/cluster-version-operator/pkg/cvo/internal/dynamicclient" "github.com/openshift/cluster-version-operator/pkg/internal" @@ -125,10 +127,12 @@ type Operator struct { upgradeable *upgradeable upgradeableChecks []upgradeableCheck - // minimumUpgradeableCheckInterval is the minimum duration before another - // synchronization of the upgradeable status can happen during failing - // precondition checks. 
- minimumUpgradeableCheckInterval time.Duration + // upgradeableCheckIntervals drives minimal intervals between Upgradeable status + // synchronization + upgradeableCheckIntervals upgradeableCheckIntervals + + // conditionRegistry is used to evaluate whether a particular condition is risky or not. + conditionRegistry clusterconditions.ConditionRegistry // verifier, if provided, will be used to check an update before it is executed. // Any error will prevent an update payload from being accessed. @@ -188,11 +192,11 @@ func New( Image: releaseImage, }, - statusInterval: 15 * time.Second, - minimumUpdateCheckInterval: minimumInterval, - minimumUpgradeableCheckInterval: 15 * time.Second, - payloadDir: overridePayloadDir, - defaultUpstreamServer: "https://api.openshift.com/api/upgrades_info/v1/graph", + statusInterval: 15 * time.Second, + minimumUpdateCheckInterval: minimumInterval, + upgradeableCheckIntervals: defaultUpgradeableCheckIntervals(), + payloadDir: overridePayloadDir, + defaultUpstreamServer: "https://api.openshift.com/api/upgrades_info/v1/graph", client: client, kubeClient: kubeClient, @@ -205,6 +209,7 @@ func New( exclude: exclude, requiredFeatureSet: requiredFeatureSet, clusterProfile: clusterProfile, + conditionRegistry: standard.NewConditionRegistry(kubeClient), } cvInformer.Informer().AddEventHandler(optr.clusterVersionEventHandler()) @@ -496,29 +501,64 @@ func (optr *Operator) clusterVersionEventHandler() cache.ResourceEventHandler { func (optr *Operator) clusterOperatorEventHandler() cache.ResourceEventHandler { return cache.ResourceEventHandlerFuncs{ UpdateFunc: func(old, new interface{}) { + if optr.configSync == nil { + return + } + versionName := "operator" - _, oldVersion := clusterOperatorInterfaceVersionOrDie(old, versionName) - newStruct, newVersion := clusterOperatorInterfaceVersionOrDie(new, versionName) - if optr.configSync != nil && oldVersion != newVersion { + oldStruct := clusterOperatorInterfaceStructOrDie(old) + newStruct := clusterOperatorInterfaceStructOrDie(new) + + oldVersion := clusterOperatorVersion(oldStruct, versionName) + newVersion := clusterOperatorVersion(newStruct, versionName) + if oldVersion != newVersion { msg := fmt.Sprintf("Cluster operator %s changed versions[name=%q] from %q to %q", newStruct.ObjectMeta.Name, versionName, oldVersion, newVersion) optr.configSync.NotifyAboutManagedResourceActivity(new, msg) + return + } + + for _, cond := range []configv1.ClusterStatusConditionType{ + configv1.OperatorAvailable, + configv1.OperatorDegraded, + } { + oldStatus := clusterOperatorConditionStatus(oldStruct, cond) + newStatus := clusterOperatorConditionStatus(newStruct, cond) + if oldStatus != newStatus { + msg := fmt.Sprintf("Cluster operator %s changed %s from %q to %q", newStruct.ObjectMeta.Name, cond, oldStatus, newStatus) + optr.configSync.NotifyAboutManagedResourceActivity(new, msg) + return + } } + }, } } -func clusterOperatorInterfaceVersionOrDie(obj interface{}, name string) (*configv1.ClusterOperator, string) { +func clusterOperatorInterfaceStructOrDie(obj interface{}) *configv1.ClusterOperator { co, ok := obj.(*configv1.ClusterOperator) if !ok { panic(fmt.Sprintf("%v is %T, not a ClusterOperator", obj, obj)) } + return co +} + +func clusterOperatorVersion(co *configv1.ClusterOperator, name string) string { for _, version := range co.Status.Versions { if version.Name == name { - return co, version.Version + return version.Version + } + } + return "" +} + +func clusterOperatorConditionStatus(co *configv1.ClusterOperator, condType 
configv1.ClusterStatusConditionType) configv1.ConditionStatus { + for _, cond := range co.Status.Conditions { + if cond.Type == condType { + return cond.Status } } - return co, "" + return configv1.ConditionUnknown } func (optr *Operator) worker(ctx context.Context, queue workqueue.RateLimitingInterface, syncHandler func(context.Context, string) error) { diff --git a/pkg/cvo/updatepayload.go b/pkg/cvo/updatepayload.go index 7b2d1c79a3..702f195028 100644 --- a/pkg/cvo/updatepayload.go +++ b/pkg/cvo/updatepayload.go @@ -38,14 +38,15 @@ func (optr *Operator) defaultPayloadDir() string { func (optr *Operator) defaultPayloadRetriever() PayloadRetriever { return &payloadRetriever{ - kubeClient: optr.kubeClient, - operatorName: optr.name, - releaseImage: optr.release.Image, - namespace: optr.namespace, - nodeName: optr.nodename, - payloadDir: optr.defaultPayloadDir(), - workingDir: targetUpdatePayloadsDir, - verifier: optr.verifier, + kubeClient: optr.kubeClient, + operatorName: optr.name, + releaseImage: optr.release.Image, + namespace: optr.namespace, + nodeName: optr.nodename, + payloadDir: optr.defaultPayloadDir(), + workingDir: targetUpdatePayloadsDir, + verifier: optr.verifier, + retrieveTimeout: 4 * time.Minute, } } @@ -53,6 +54,12 @@ const ( targetUpdatePayloadsDir = "/etc/cvo/updatepayloads" ) +// downloadFunc downloads the requested update and returns either a path on the local filesystem +// containing extracted manifests or an error +// The type exists so that tests for payloadRetriever.RetrievePayload can mock this functionality +// by setting payloadRetriever.downloader. +type downloadFunc func(context.Context, configv1.Update) (string, error) + type payloadRetriever struct { // releaseImage and payloadDir are the default payload identifiers - updates that point // to releaseImage will always use the contents of payloadDir @@ -68,8 +75,22 @@ type payloadRetriever struct { // verifier guards against invalid remote data being accessed verifier verify.Interface + + // downloader is called to download the requested update to local filesystem. It should only be + // set to non-nil value in tests. When this is nil, payloadRetriever.targetUpdatePayloadDir + // is called as a downloader. + downloader downloadFunc + + // retrieveTimeout limits the time spent in payloadRetriever.RetrievePayload. This timeout is only + // applied when the context passed into that method is unbounded; otherwise the method respects + // the deadline set in the input context. + retrieveTimeout time.Duration } +// RetrievePayload verifies, downloads and extracts to local filesystem the payload image specified +// by update. If the input context has a deadline, it is respected, otherwise r.retrieveTimeout +// applies. When update.Force is true, the verification is still performed, but the method proceeds +// even when the image cannot be verified successfully. func (r *payloadRetriever) RetrievePayload(ctx context.Context, update configv1.Update) (PayloadInfo, error) { if r.releaseImage == update.Image { return PayloadInfo{ @@ -82,28 +103,28 @@ func (r *payloadRetriever) RetrievePayload(ctx context.Context, update configv1. 
return PayloadInfo{}, fmt.Errorf("no payload image has been specified and the contents of the payload cannot be retrieved") } - var info PayloadInfo - // verify the provided payload var releaseDigest string if index := strings.LastIndex(update.Image, "@"); index != -1 { releaseDigest = update.Image[index+1:] } - verifyCtx := ctx - - // if 'force' specified, ensure call to verify payload signature times out well before parent context - // to allow time to perform forced update - if update.Force { - timeout := time.Minute * 2 - if deadline, deadlineSet := ctx.Deadline(); deadlineSet { - timeout = time.Until(deadline) / 2 - } - klog.V(2).Infof("Forced update so reducing payload signature verification timeout to %s", timeout) - var cancel context.CancelFunc - verifyCtx, cancel = context.WithTimeout(ctx, timeout) - defer cancel() + + var downloadCtx context.Context + downloadDeadline, ok := ctx.Deadline() + if ok { + downloadCtx = ctx + } else { + downloadDeadline = time.Now().Add(r.retrieveTimeout) + var downloadCtxCancel context.CancelFunc + downloadCtx, downloadCtxCancel = context.WithDeadline(ctx, downloadDeadline) + defer downloadCtxCancel() } + verifyTimeout := time.Until(downloadDeadline) / 2 + verifyCtx, verifyCtxCancel := context.WithTimeout(ctx, verifyTimeout) + defer verifyCtxCancel() + + var info PayloadInfo if err := r.verifier.Verify(verifyCtx, releaseDigest); err != nil { vErr := &payload.UpdateError{ Reason: "ImageVerificationFailed", @@ -120,9 +141,12 @@ func (r *payloadRetriever) RetrievePayload(ctx context.Context, update configv1. info.Verified = true } + if r.downloader == nil { + r.downloader = r.targetUpdatePayloadDir + } // download the payload to the directory var err error - info.Directory, err = r.targetUpdatePayloadDir(ctx, update) + info.Directory, err = r.downloader(downloadCtx, update) if err != nil { return PayloadInfo{}, &payload.UpdateError{ Reason: "UpdatePayloadRetrievalFailed", diff --git a/pkg/cvo/updatepayload_test.go b/pkg/cvo/updatepayload_test.go new file mode 100644 index 0000000000..a93b463a7d --- /dev/null +++ b/pkg/cvo/updatepayload_test.go @@ -0,0 +1,347 @@ +package cvo + +import ( + "context" + "errors" + "testing" + "time" + + "github.com/google/go-cmp/cmp" + "github.com/google/go-cmp/cmp/cmpopts" + configv1 "github.com/openshift/api/config/v1" + "github.com/openshift/library-go/pkg/verify/store" + + //nolint:staticcheck // verify,Verifier from openshift/library-go uses a type from this deprecated package (needs to be addressed there) + "golang.org/x/crypto/openpgp" + + "github.com/openshift/cluster-version-operator/pkg/payload" +) + +// EquateErrorMessage reports errors to be equal if both are nil +// or both have the same message +var EquateErrorMessage = cmp.FilterValues(func(x, y interface{}) bool { + _, ok1 := x.(error) + _, ok2 := y.(error) + return ok1 && ok2 +}, cmp.Comparer(func(x, y interface{}) bool { + xe := x.(error) + ye := y.(error) + if xe == nil || ye == nil { + return xe == nil && ye == nil + } + return xe.Error() == ye.Error() +})) + +// mockVerifier implements verify.Verifier +type mockVerifier struct { + t *testing.T + + expectNoVerify bool + expectVerifyDigest string + expectVerifyCancel bool + verifyReturns error +} + +func (m *mockVerifier) Verify(ctx context.Context, releaseDigest string) error { + if m.expectNoVerify { + m.t.Errorf("Unexpected call: Verify(releaseDigest=%s)", releaseDigest) + } + if !m.expectNoVerify && m.expectVerifyDigest != releaseDigest { + m.t.Errorf("Verify() called with unexpected value: %v", 
releaseDigest) + } + + timeout, timeoutCancel := context.WithTimeout(context.Background(), backstopDuration) + defer timeoutCancel() + + if m.expectVerifyCancel { + select { + case <-ctx.Done(): + case <-timeout.Done(): + m.t.Errorf("Verify() expected to be cancelled by context but it was not") + } + } else { + select { + case <-ctx.Done(): + m.t.Errorf("Unexpected ctx cancel in Verify()") + default: + } + } + + return m.verifyReturns +} +func (m *mockVerifier) Signatures() map[string][][]byte { return nil } +func (m *mockVerifier) Verifiers() map[string]openpgp.EntityList { return nil } +func (m *mockVerifier) AddStore(_ store.Store) {} + +type downloadMocker struct { + expectCancel bool + duration time.Duration + returnLocation string + returnErr error +} + +const ( + // backstopDuration is a maximum duration for which we wait on a tested operation + backstopDuration = 5 * time.Second + // hangs represents a "hanging" operation, always preempted by backstop + hangs = 2 * backstopDuration +) + +func (d *downloadMocker) make(t *testing.T) downloadFunc { + if d == nil { + return func(_ context.Context, update configv1.Update) (string, error) { + t.Errorf("Unexpected call: downloader(, upddate=%v", update) + return "", nil + } + } + return func(ctx context.Context, _ configv1.Update) (string, error) { + backstopCtx, backstopCancel := context.WithTimeout(context.Background(), backstopDuration) + defer backstopCancel() + + downloadCtx, downloadCancel := context.WithTimeout(context.Background(), d.duration) + defer downloadCancel() + + if d.expectCancel { + select { + case <-backstopCtx.Done(): + t.Errorf("downloader: test backstop hit (expected cancel via ctx)") + return "", errors.New("downloader: test backstop hit (expected cancel via ctx)") + case <-downloadCtx.Done(): + t.Errorf("downloader: download finished (expected cancel via ctx)") + return "/some/location", errors.New("downloader: download finished (expected cancel via ctx)") + case <-ctx.Done(): + } + } else { + select { + case <-backstopCtx.Done(): + t.Errorf("downloader: test backstop hit (expected download to finish)") + return "", errors.New("downloader: test backstop hit (expected download to finish)") + case <-ctx.Done(): + t.Errorf("downloader: unexpected ctx cancel (expected download to finish)") + return "", errors.New("downloader: unexpected ctx cancel (expected download to finish)") + case <-downloadCtx.Done(): + } + } + + return d.returnLocation, d.returnErr + } +} + +func TestPayloadRetrieverRetrievePayload(t *testing.T) { + t.Parallel() + testCases := []struct { + name string + verifier *mockVerifier + downloader *downloadMocker + update configv1.Update + ctxTimeout time.Duration + + expected PayloadInfo + expectedErr error + }{ + { + name: "when desired image matches retriever image then return local payload directory", + verifier: &mockVerifier{expectNoVerify: true}, + update: configv1.Update{Image: "releaseImage"}, + expected: PayloadInfo{ + Directory: "/local/payload/dir", + Local: true, + }, + }, + { + name: "when desired image is empty then return error", + verifier: &mockVerifier{expectNoVerify: true}, + update: configv1.Update{}, + expectedErr: errors.New("no payload image has been specified and the contents of the payload cannot be retrieved"), + }, + { + name: "when desired image is tag pullspec and passes verification but fails to download then return error", + verifier: &mockVerifier{expectVerifyDigest: ""}, + downloader: &downloadMocker{returnErr: errors.New("fails to download")}, + update: 
configv1.Update{Image: "quay.io/openshift-release-dev/ocp-release:failing"}, + expectedErr: errors.New("Unable to download and prepare the update: fails to download"), + }, + { + name: "when desired image is tag pullspec and fails verification then return error", + verifier: &mockVerifier{ + expectVerifyDigest: "", + verifyReturns: errors.New("fails-verification"), + }, + update: configv1.Update{Image: "quay.io/openshift-release-dev/ocp-release:failing"}, + expectedErr: errors.New("The update cannot be verified: fails-verification"), + }, + { + name: "when desired image is sha digest pullspec and passes verification but fails to download then return error", + verifier: &mockVerifier{expectVerifyDigest: "sha256:08ef16270e643a001454410b22864db6246d782298be267688a4433d83f404f4"}, + downloader: &downloadMocker{returnErr: errors.New("fails to download")}, + update: configv1.Update{Image: "quay.io/openshift-release-dev/ocp-release@sha256:08ef16270e643a001454410b22864db6246d782298be267688a4433d83f404f4"}, + expectedErr: errors.New("Unable to download and prepare the update: fails to download"), + }, + { + name: "when sha digest pullspec image fails verification but update is forced then retrieval proceeds then when download fails then return error", + verifier: &mockVerifier{ + expectVerifyDigest: "sha256:08ef16270e643a001454410b22864db6246d782298be267688a4433d83f404f4", + verifyReturns: errors.New("fails-to-verify"), + }, + downloader: &downloadMocker{returnErr: errors.New("fails to download")}, + update: configv1.Update{ + Force: true, + Image: "quay.io/openshift-release-dev/ocp-release@sha256:08ef16270e643a001454410b22864db6246d782298be267688a4433d83f404f4", + }, + expectedErr: errors.New("Unable to download and prepare the update: fails to download"), + }, + { + name: "when sha digest pullspec image is timing out verification with unlimited context and update is forced " + + "then verification times out promptly and retrieval proceeds but download fails then return error", + verifier: &mockVerifier{ + expectVerifyDigest: "sha256:08ef16270e643a001454410b22864db6246d782298be267688a4433d83f404f4", + expectVerifyCancel: true, + verifyReturns: errors.New("fails-to-verify"), + }, + downloader: &downloadMocker{returnErr: errors.New("fails to download")}, + update: configv1.Update{ + Force: true, + Image: "quay.io/openshift-release-dev/ocp-release@sha256:08ef16270e643a001454410b22864db6246d782298be267688a4433d83f404f4", + }, + expectedErr: errors.New("Unable to download and prepare the update: fails to download"), + }, + { + name: "when sha digest pullspec image is timing out verification with long deadline context and update is forced " + + "then verification times out promptly, retrieval proceeds but download fails then return error", + verifier: &mockVerifier{ + expectVerifyDigest: "sha256:08ef16270e643a001454410b22864db6246d782298be267688a4433d83f404f4", + expectVerifyCancel: true, + verifyReturns: errors.New("fails-to-verify"), + }, + downloader: &downloadMocker{returnErr: errors.New("fails to download")}, + update: configv1.Update{ + Force: true, + Image: "quay.io/openshift-release-dev/ocp-release@sha256:08ef16270e643a001454410b22864db6246d782298be267688a4433d83f404f4", + }, + ctxTimeout: backstopDuration - time.Second, + expectedErr: errors.New("Unable to download and prepare the update: fails to download"), + }, + { + name: "when sha digest pullspec image is timing out verification with long deadline context and update is forced " + + "then verification times out promptly, retrieval proceeds 
but download times out then return error", + verifier: &mockVerifier{ + expectVerifyDigest: "sha256:08ef16270e643a001454410b22864db6246d782298be267688a4433d83f404f4", + expectVerifyCancel: true, + verifyReturns: errors.New("fails-to-verify"), + }, + downloader: &downloadMocker{ + expectCancel: true, + duration: backstopDuration - (2 * time.Second), + returnErr: errors.New("fails to download"), + }, + update: configv1.Update{ + Force: true, + Image: "quay.io/openshift-release-dev/ocp-release@sha256:08ef16270e643a001454410b22864db6246d782298be267688a4433d83f404f4", + }, + ctxTimeout: backstopDuration - time.Second, + expectedErr: errors.New("Unable to download and prepare the update: fails to download"), + }, + { + name: "when sha digest pullspec image fails verification but update is forced then retrieval proceeds and download succeeds then return info with location and verification error", + verifier: &mockVerifier{ + expectVerifyDigest: "sha256:08ef16270e643a001454410b22864db6246d782298be267688a4433d83f404f4", + verifyReturns: errors.New("fails-to-verify"), + }, + downloader: &downloadMocker{returnLocation: "/location/of/download"}, + update: configv1.Update{ + Force: true, + Image: "quay.io/openshift-release-dev/ocp-release@sha256:08ef16270e643a001454410b22864db6246d782298be267688a4433d83f404f4", + }, + expected: PayloadInfo{ + Directory: "/location/of/download", + VerificationError: &payload.UpdateError{ + Reason: "ImageVerificationFailed", + Message: `Target release version="" image="quay.io/openshift-release-dev/ocp-release@sha256:08ef16270e643a001454410b22864db6246d782298be267688a4433d83f404f4" cannot be verified, but continuing anyway because the update was forced: fails-to-verify`, + }, + }, + }, + { + name: "when sha digest pullspec image passes and download hangs then it is terminated and returns error (RHBZ#2090680)", + verifier: &mockVerifier{expectVerifyDigest: "sha256:08ef16270e643a001454410b22864db6246d782298be267688a4433d83f404f4"}, + downloader: &downloadMocker{ + duration: hangs, + expectCancel: true, + returnErr: errors.New("download was canceled"), + }, + update: configv1.Update{ + Image: "quay.io/openshift-release-dev/ocp-release@sha256:08ef16270e643a001454410b22864db6246d782298be267688a4433d83f404f4", + }, + expectedErr: errors.New("Unable to download and prepare the update: download was canceled"), + }, + { + name: "when sha digest pullspec image fails to verify until timeout but is forced then it allows enough time for download and it returns successfully", + verifier: &mockVerifier{ + expectVerifyDigest: "sha256:08ef16270e643a001454410b22864db6246d782298be267688a4433d83f404f4", + expectVerifyCancel: true, + verifyReturns: errors.New("fails-to-verify"), + }, + downloader: &downloadMocker{ + duration: 300 * time.Millisecond, + returnLocation: "/location/of/download", + }, + update: configv1.Update{ + Force: true, + Image: "quay.io/openshift-release-dev/ocp-release@sha256:08ef16270e643a001454410b22864db6246d782298be267688a4433d83f404f4", + }, + expected: PayloadInfo{ + Directory: "/location/of/download", + VerificationError: &payload.UpdateError{ + Reason: "ImageVerificationFailed", + Message: `Target release version="" image="quay.io/openshift-release-dev/ocp-release@sha256:08ef16270e643a001454410b22864db6246d782298be267688a4433d83f404f4" cannot be verified, but continuing anyway because the update was forced: fails-to-verify`, + }, + }, + }, + { + name: "when sha digest pullspec image passes and download succeeds then returns location and no error", + verifier: 
&mockVerifier{expectVerifyDigest: "sha256:08ef16270e643a001454410b22864db6246d782298be267688a4433d83f404f4"}, + downloader: &downloadMocker{returnLocation: "/location/of/download"}, + update: configv1.Update{ + Image: "quay.io/openshift-release-dev/ocp-release@sha256:08ef16270e643a001454410b22864db6246d782298be267688a4433d83f404f4", + }, + expected: PayloadInfo{ + Directory: "/location/of/download", + Verified: true, + }, + }, + } + for _, tc := range testCases { + tc := tc // prevent parallel closures from sharing a single tc copy + t.Run(tc.name, func(t *testing.T) { + t.Parallel() + retriever := payloadRetriever{ + releaseImage: "releaseImage", + payloadDir: "/local/payload/dir", + retrieveTimeout: 2 * time.Second, + } + + if tc.verifier != nil { + tc.verifier.t = t + retriever.verifier = tc.verifier + } + + if tc.downloader != nil { + retriever.downloader = tc.downloader.make(t) + } + + ctx := context.Background() + if tc.ctxTimeout > 0 { + var cancel context.CancelFunc + ctx, cancel = context.WithTimeout(ctx, tc.ctxTimeout) + defer cancel() + } + + actual, err := retriever.RetrievePayload(ctx, tc.update) + if diff := cmp.Diff(tc.expectedErr, err, EquateErrorMessage); diff != "" { + t.Errorf("Returned error differs from expected:\n%s", diff) + } + if diff := cmp.Diff(tc.expected, actual, cmpopts.IgnoreFields(payload.UpdateError{}, "Nested")); err == nil && diff != "" { + t.Errorf("Returned PayloadInfo differs from expected:\n%s", diff) + } + }) + } +} diff --git a/pkg/cvo/upgradeable.go b/pkg/cvo/upgradeable.go index d42b7f318c..0074e2a547 100644 --- a/pkg/cvo/upgradeable.go +++ b/pkg/cvo/upgradeable.go @@ -33,15 +33,41 @@ const ( var adminAckGateRegexp = regexp.MustCompile(adminAckGateFmt) -// syncUpgradeable synchronizes the upgradeable status only if it has been more than -// the minimumUpdateCheckInterval since the last synchronization or the precondition -// checks on the payload are failing for less than minimumUpdateCheckInterval, and it has -// been more than the minimumUpgradeableCheckInterval since the last synchronization. -func (optr *Operator) syncUpgradeable(config *configv1.ClusterVersion) error { - u := optr.getUpgradeable() - if u != nil && u.RecentlyChanged(optr.minimumUpdateCheckInterval) && !shouldSyncUpgradeableDueToPreconditionChecks(optr, config, u) { - klog.V(2).Infof("Upgradeable conditions were recently checked, will try later.") - return nil +// upgradeableCheckIntervals holds the time intervals that drive how often CVO checks for upgradeability +type upgradeableCheckIntervals struct { + // min is the base minimum interval between upgradeability checks, applied under normal circumstances + min time.Duration + + // minOnFailedPreconditions is the minimum interval between upgradeability checks when precondition checks are + // failing and recently started failing (see afterPreconditionsFailed). This should be lower than min because we want CVO + // to check upgradeability more often. + minOnFailedPreconditions time.Duration + + // afterPreconditionsFailed is the period of time after preconditions failed when minOnFailedPreconditions is + // applied instead of min. + afterPreconditionsFailed time.Duration +} + +func defaultUpgradeableCheckIntervals() upgradeableCheckIntervals { + return upgradeableCheckIntervals{ + // 2 minutes is the lower bound of the previously nondeterministically chosen interval + // TODO (OTA-860): Investigate our options for reducing this interval.
We will need to investigate + // the API usage patterns of the underlying checks; there is anecdotal evidence that they hit + // the apiserver instead of using the local informer cache + min: 2 * time.Minute, + minOnFailedPreconditions: 15 * time.Second, + afterPreconditionsFailed: 2 * time.Minute, + } +} + +// syncUpgradeable synchronizes the upgradeable status only if sufficient time has passed since its last update. This +// throttling period is dynamic and is driven by upgradeableCheckIntervals. +func (optr *Operator) syncUpgradeable(cv *configv1.ClusterVersion) error { + if u := optr.getUpgradeable(); u != nil { + if u.RecentlyChanged(optr.upgradeableCheckIntervals.throttlePeriod(cv)) { + klog.V(2).Infof("Upgradeable conditions were recently checked, will try later.") + return nil + } } optr.setUpgradeableConditions() @@ -450,21 +476,20 @@ func (optr *Operator) adminGatesEventHandler() cache.ResourceEventHandler { } } -// shouldSyncUpgradeableDueToPreconditionChecks checks if the upgradeable status should -// be synchronized due to the precondition checks. It checks whether the precondition -// checks on the payload are failing for less than minimumUpdateCheckInterval, and it has -// been more than the minimumUpgradeableCheckInterval since the last synchronization. -// This means, upon precondition failure, we will synchronize the upgradeable status -// more frequently at beginning of an upgrade. -// -// shouldSyncUpgradeableDueToPreconditionChecks expects the parameters not to be nil. +// throttlePeriod returns the duration for which the upgradeable status should be considered recent +// enough and unnecessary to update. The baseline duration is min. When the precondition checks +// on the payload have been failing for less than afterPreconditionsFailed, we want to synchronize +// the upgradeable status more frequently at the beginning of an upgrade, so we return +// minOnFailedPreconditions, which is expected to be lower than min. // -// Function returns true if the synchronization should happen, returns false otherwise. -func shouldSyncUpgradeableDueToPreconditionChecks(optr *Operator, config *configv1.ClusterVersion, u *upgradeable) bool { - cond := resourcemerge.FindOperatorStatusCondition(config.Status.Conditions, DesiredReleaseAccepted) - if cond != nil && cond.Reason == "PreconditionChecks" && cond.Status == configv1.ConditionFalse && - hasPassedDurationSinceTime(u.At, optr.minimumUpgradeableCheckInterval) && !hasPassedDurationSinceTime(cond.LastTransitionTime.Time, optr.minimumUpdateCheckInterval) { - return true - } - return false +// The cv parameter is expected to be non-nil. +func (intervals *upgradeableCheckIntervals) throttlePeriod(cv *configv1.ClusterVersion) time.Duration { + if cond := resourcemerge.FindOperatorStatusCondition(cv.Status.Conditions, DesiredReleaseAccepted); cond != nil { + // If preconditions failed recently, re-check upgradeability more frequently.
+ if cond.Reason == "PreconditionChecks" && cond.Status == configv1.ConditionFalse && + !hasPassedDurationSinceTime(cond.LastTransitionTime.Time, intervals.afterPreconditionsFailed) { + return intervals.minOnFailedPreconditions + } + } + return intervals.min } diff --git a/pkg/cvo/upgradeable_test.go b/pkg/cvo/upgradeable_test.go new file mode 100644 index 0000000000..ec9e41264f --- /dev/null +++ b/pkg/cvo/upgradeable_test.go @@ -0,0 +1,59 @@ +package cvo + +import ( + "testing" + "time" + + configv1 "github.com/openshift/api/config/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" +) + +func TestUpgradeableCheckIntervalsThrottlePeriod(t *testing.T) { + intervals := defaultUpgradeableCheckIntervals() + testCases := []struct { + name string + condition *configv1.ClusterOperatorStatusCondition + expected time.Duration + }{ + { + name: "no condition", + expected: intervals.min, + }, + { + name: "passing preconditions", + condition: &configv1.ClusterOperatorStatusCondition{Type: DesiredReleaseAccepted, Reason: "PreconditionChecks", Status: configv1.ConditionTrue, LastTransitionTime: metav1.Now()}, + expected: intervals.min, + }, + { + name: "failing but not precondition", + condition: &configv1.ClusterOperatorStatusCondition{Type: DesiredReleaseAccepted, Reason: "NotPreconditionChecks", Status: configv1.ConditionFalse, LastTransitionTime: metav1.Now()}, + expected: intervals.min, + }, + { + name: "failing preconditions but too long ago", + condition: &configv1.ClusterOperatorStatusCondition{ + Type: DesiredReleaseAccepted, + Reason: "PreconditionChecks", + Status: configv1.ConditionFalse, + LastTransitionTime: metav1.NewTime(time.Now().Add(-(intervals.afterPreconditionsFailed + time.Hour))), + }, + expected: intervals.min, + }, + { + name: "failing preconditions recently", + condition: &configv1.ClusterOperatorStatusCondition{Type: DesiredReleaseAccepted, Reason: "PreconditionChecks", Status: configv1.ConditionFalse, LastTransitionTime: metav1.Now()}, + expected: intervals.minOnFailedPreconditions, + }, + } + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + cv := &configv1.ClusterVersion{Status: configv1.ClusterVersionStatus{Conditions: []configv1.ClusterOperatorStatusCondition{}}} + if tc.condition != nil { + cv.Status.Conditions = append(cv.Status.Conditions, *tc.condition) + } + if actual := intervals.throttlePeriod(cv); actual != tc.expected { + t.Errorf("throttlePeriod() = %v, want %v", actual, tc.expected) + } + }) + } +} diff --git a/pkg/payload/precondition/clusterversion/recommendedupdate.go b/pkg/payload/precondition/clusterversion/recommendedupdate.go index 4821c21de2..8a07ae255c 100644 --- a/pkg/payload/precondition/clusterversion/recommendedupdate.go +++ b/pkg/payload/precondition/clusterversion/recommendedupdate.go @@ -37,10 +37,11 @@ func (ru *RecommendedUpdate) Run(ctx context.Context, releaseContext preconditio } if err != nil { return &precondition.Error{ - Nested: err, - Reason: "UnknownError", - Message: err.Error(), - Name: ru.Name(), + Nested: err, + Reason: "UnknownError", + Message: err.Error(), + Name: ru.Name(), + NonBlockingWarning: true, } } for _, recommended := range clusterVersion.Status.AvailableUpdates { @@ -61,7 +62,8 @@ func (ru *RecommendedUpdate) Run(ctx context.Context, releaseContext preconditio Reason: condition.Reason, Message: fmt.Sprintf("Update from %s to %s is not recommended:\n\n%s", clusterVersion.Status.Desired.Version, releaseContext.DesiredVersion, condition.Message), - Name: ru.Name(), + Name: ru.Name(), + 
NonBlockingWarning: true, } default: return &precondition.Error{ @@ -69,7 +71,8 @@ func (ru *RecommendedUpdate) Run(ctx context.Context, releaseContext preconditio Message: fmt.Sprintf("Update from %s to %s is %s=%s: %s: %s", clusterVersion.Status.Desired.Version, releaseContext.DesiredVersion, condition.Type, condition.Status, condition.Reason, condition.Message), - Name: ru.Name(), + Name: ru.Name(), + NonBlockingWarning: true, } } } @@ -78,7 +81,8 @@ func (ru *RecommendedUpdate) Run(ctx context.Context, releaseContext preconditio Reason: "UnknownConditionType", Message: fmt.Sprintf("Update from %s to %s has a status.conditionalUpdates entry, but no Recommended condition.", clusterVersion.Status.Desired.Version, releaseContext.DesiredVersion), - Name: ru.Name(), + Name: ru.Name(), + NonBlockingWarning: true, } } } @@ -88,13 +92,13 @@ func (ru *RecommendedUpdate) Run(ctx context.Context, releaseContext preconditio Reason: "NoChannel", Message: fmt.Sprintf("Configured channel is unset, so the recommended status of updating from %s to %s is unknown.", clusterVersion.Status.Desired.Version, releaseContext.DesiredVersion), - Name: ru.Name(), + Name: ru.Name(), + NonBlockingWarning: true, } } reason := "UnknownUpdate" msg := "" - if retrieved := resourcemerge.FindOperatorStatusCondition(clusterVersion.Status.Conditions, configv1.RetrievedUpdates); retrieved == nil { msg = fmt.Sprintf("No %s, so the recommended status of updating from %s to %s is unknown.", configv1.RetrievedUpdates, clusterVersion.Status.Desired.Version, releaseContext.DesiredVersion) @@ -108,9 +112,10 @@ func (ru *RecommendedUpdate) Run(ctx context.Context, releaseContext preconditio if msg != "" { return &precondition.Error{ - Reason: reason, - Message: msg, - Name: ru.Name(), + Reason: reason, + Message: msg, + Name: ru.Name(), + NonBlockingWarning: true, } } return nil diff --git a/pkg/payload/precondition/precondition.go b/pkg/payload/precondition/precondition.go index 5daa1d47ec..3725ad84aa 100644 --- a/pkg/payload/precondition/precondition.go +++ b/pkg/payload/precondition/precondition.go @@ -12,10 +12,11 @@ import ( // Error is a wrapper for errors that occur during a precondition check for payload. 
type Error struct { - Nested error - Reason string - Message string - Name string + Nested error + Reason string + Message string + Name string + NonBlockingWarning bool // Some errors should not fail the precondition check, but we still want to surface them as warnings } // Error returns the message @@ -71,12 +72,18 @@ func Summarize(errs []error, force bool) (bool, error) { return false, nil } var msgs []string + var isWarning = true for _, e := range errs { if pferr, ok := e.(*Error); ok { msgs = append(msgs, fmt.Sprintf("Precondition %q failed because of %q: %v", pferr.Name, pferr.Reason, pferr.Error())) + if !pferr.NonBlockingWarning { + isWarning = false + } continue } + isWarning = false msgs = append(msgs, e.Error()) + } msg := "" if len(msgs) == 1 { @@ -85,11 +92,12 @@ msg = fmt.Sprintf("Multiple precondition checks failed:\n* %s", strings.Join(msgs, "\n* ")) } - if force { + if force && !isWarning { msg = fmt.Sprintf("Forced through blocking failures: %s", msg) + isWarning = true } - return !force, &payload.UpdateError{ + return !isWarning, &payload.UpdateError{ Nested: nil, Reason: "UpgradePreconditionCheckFailed", Message: msg, diff --git a/pkg/payload/precondition/precondition_test.go b/pkg/payload/precondition/precondition_test.go index 015a41cfd7..581bf4c07b 100644 --- a/pkg/payload/precondition/precondition_test.go +++ b/pkg/payload/precondition/precondition_test.go @@ -29,6 +29,28 @@ func TestSummarize(t *testing.T) { force: true, expectedBlock: false, expectedError: "Forced through blocking failures: random error", + }, { + name: "unforced warning", + errors: []error{&Error{ + Nested: nil, + Reason: "UnknownUpdate", + Message: "update from A to B is probably neither recommended nor supported.", + NonBlockingWarning: true, + Name: "ClusterVersionRecommendedUpdate", + }}, + expectedBlock: false, + expectedError: `Precondition "ClusterVersionRecommendedUpdate" failed because of "UnknownUpdate": update from A to B is probably neither recommended nor supported.`, + }, { + name: "forced through warning", + errors: []error{&Error{ + Nested: nil, + Reason: "UnknownUpdate", + Message: "update from A to B is probably neither recommended nor supported.", + NonBlockingWarning: true, + Name: "ClusterVersionRecommendedUpdate", + }}, + force: true, + expectedError: `Precondition "ClusterVersionRecommendedUpdate" failed because of "UnknownUpdate": update from A to B is probably neither recommended nor supported.`, }, { name: "single feature-gate error", errors: []error{&Error{ diff --git a/pkg/start/start.go b/pkg/start/start.go index 40309b1d12..6785a83414 100644 --- a/pkg/start/start.go +++ b/pkg/start/start.go @@ -16,6 +16,7 @@ import ( apierrors "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" utilruntime "k8s.io/apimachinery/pkg/util/runtime" + "k8s.io/apimachinery/pkg/util/wait" "k8s.io/client-go/informers" "k8s.io/client-go/kubernetes" "k8s.io/client-go/kubernetes/scheme" @@ -133,19 +134,37 @@ func (o *Options) Run(ctx context.Context) error { // check to see if techpreview should be on or off. If we cannot read the featuregate for any reason, it is assumed // to be off. If this value changes, the binary will shutdown and expect the pod lifecycle to restart it.
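The Summarize rework above blocks only when at least one failure is not marked NonBlockingWarning; warning-only failures are still summarized into an UpgradePreconditionCheckFailed error but no longer block, whether or not the update is forced. Below is a minimal usage sketch of that behavior; it assumes the precondition package import path follows this repository's module layout, and the values simply mirror the new "unforced warning" test case:

```go
package main

import (
	"fmt"

	"github.com/openshift/cluster-version-operator/pkg/payload/precondition"
)

func main() {
	// A single warning-only precondition failure, shaped like the
	// "unforced warning" test case added above.
	errs := []error{&precondition.Error{
		Reason:             "UnknownUpdate",
		Message:            "update from A to B is probably neither recommended nor supported.",
		Name:               "ClusterVersionRecommendedUpdate",
		NonBlockingWarning: true,
	}}

	// block is expected to be false: the failure is reported in err,
	// but a warning-only failure does not block the update.
	block, err := precondition.Summarize(errs, false)
	fmt.Printf("block=%v\nerr=%v\n", block, err)
}
```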
startingFeatureSet := "" - gate, err := cb.ClientOrDie("feature-gate-getter").ConfigV1().FeatureGates().Get(ctx, "cluster", metav1.GetOptions{}) - switch { - case apierrors.IsNotFound(err): - // if we have no featuregates, then the cluster is using the default featureset, which is "". - // This excludes everything that could possibly depend on a different feature set. - startingFeatureSet = "" - case err != nil: - // client-go automatically retries network blip errors on GETs for 30s by default. If we fail longer than that - // the operator won't be able to do work anyway. Return the error and crashloop. - return err - default: - startingFeatureSet = string(gate.Spec.FeatureSet) + // client-go automatically retries some network blip errors on GETs for 30s by default, and we want to + // retry the remaining ones ourselves. If we keep failing for longer than that, the operator won't be able to do work + // anyway. Return the error and crashloop. + // + // We implement the timeout with a context because the timeout in PollImmediateWithContext does not behave + // well when the ConditionFunc takes a long time to execute, as it can here where client-go may retry the GET internally. + fgCtx, fgCancel := context.WithTimeout(ctx, 25*time.Second) + defer fgCancel() + var lastError error + if err := wait.PollImmediateInfiniteWithContext(fgCtx, 2*time.Second, func(ctx context.Context) (bool, error) { + gate, fgErr := cb.ClientOrDie("feature-gate-getter").ConfigV1().FeatureGates().Get(ctx, "cluster", metav1.GetOptions{}) + switch { + case apierrors.IsNotFound(fgErr): + // if we have no featuregates, then the cluster is using the default featureset, which is "". + // This excludes everything that could possibly depend on a different feature set. + startingFeatureSet = "" + return true, nil + case fgErr != nil: + lastError = fgErr + klog.Warningf("Failed to get FeatureGate from cluster: %v", fgErr) + return false, nil + default: + startingFeatureSet = string(gate.Spec.FeatureSet) + return true, nil + } + }); err != nil { + if lastError != nil { + return lastError + } + return err } lock, err := createResourceLock(cb, o.Namespace, o.Name)
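The start.go change bounds the whole FeatureGate lookup with its own context deadline instead of handing a timeout to the poll helper, because a single slow attempt (client-go may retry the GET internally) can overshoot a per-poll timeout check. A condensed, self-contained sketch of the same retry shape follows; fetchFeatureSet is a hypothetical stand-in for the FeatureGates().Get call, and the 25-second window and 2-second interval copy the values used above:

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"time"

	"k8s.io/apimachinery/pkg/util/wait"
)

// attempts counts calls so the stand-in can fail a couple of times before succeeding.
var attempts int

// fetchFeatureSet is a hypothetical stand-in for the FeatureGates().Get call in start.go.
func fetchFeatureSet(ctx context.Context) (string, error) {
	attempts++
	if attempts < 3 {
		return "", errors.New("transient: connection refused")
	}
	return "", nil // "" is the default feature set
}

func main() {
	// Bound the whole retry window with a context, as the patch does; the context is also
	// passed to each attempt, so a context-aware call can be interrupted mid-attempt.
	fgCtx, fgCancel := context.WithTimeout(context.Background(), 25*time.Second)
	defer fgCancel()

	var lastError error
	var startingFeatureSet string
	if err := wait.PollImmediateInfiniteWithContext(fgCtx, 2*time.Second, func(ctx context.Context) (bool, error) {
		fs, fetchErr := fetchFeatureSet(ctx)
		if fetchErr != nil {
			lastError = fetchErr // remember the most recent failure for reporting
			return false, nil    // keep retrying until the context expires
		}
		startingFeatureSet = fs
		return true, nil
	}); err != nil {
		// Prefer the concrete API error over the generic context error, as the patch does.
		if lastError != nil {
			fmt.Println("giving up:", lastError)
			return
		}
		fmt.Println("giving up:", err)
		return
	}
	fmt.Printf("feature set: %q\n", startingFeatureSet)
}
```

Keeping the deadline on the context rather than the poll helper means the last concrete API error, not a bare "context deadline exceeded", is what lands in the crashloop log.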