diff --git a/cmd/main.go b/cmd/main.go
index a11b054c3..141479892 100644
--- a/cmd/main.go
+++ b/cmd/main.go
@@ -5,9 +5,6 @@ import (
 
     "github.com/spf13/cobra"
     "k8s.io/klog/v2"
-
-    _ "github.com/openshift/cluster-version-operator/pkg/clusterconditions/always"
-    _ "github.com/openshift/cluster-version-operator/pkg/clusterconditions/promql"
 )
 
 var (
diff --git a/install/0000_00_cluster-version-operator_03_deployment.yaml b/install/0000_00_cluster-version-operator_03_deployment.yaml
index 7b608829c..d8a32bd69 100644
--- a/install/0000_00_cluster-version-operator_03_deployment.yaml
+++ b/install/0000_00_cluster-version-operator_03_deployment.yaml
@@ -66,7 +66,9 @@ spec:
               fieldPath: spec.nodeName
         - name: CLUSTER_PROFILE
           value: {{ .ClusterProfile }}
-      dnsPolicy: ClusterFirstWithHostNet
+      # This pod is hostNetwork and, when possible, uses the internal load balancer's DNS name, which the kubelet also uses.
+      # The Default dnsPolicy gives the pod the same dnsConfig as the kubelet, without requiring access to read that config ourselves.
+      dnsPolicy: Default
       hostNetwork: true
       nodeSelector:
         node-role.kubernetes.io/master: ""
diff --git a/pkg/cincinnati/cincinnati.go b/pkg/cincinnati/cincinnati.go
index 0c866c2c4..24893f2f8 100644
--- a/pkg/cincinnati/cincinnati.go
+++ b/pkg/cincinnati/cincinnati.go
@@ -37,14 +37,17 @@ type Client struct {
     // requests. If empty, the User-Agent header will not be
     // populated.
     userAgent string
+
+    conditionRegistry clusterconditions.ConditionRegistry
 }
 
 // NewClient creates a new Cincinnati client with the given client identifier.
-func NewClient(id uuid.UUID, transport *http.Transport, userAgent string) Client {
+func NewClient(id uuid.UUID, transport *http.Transport, userAgent string, conditionRegistry clusterconditions.ConditionRegistry) Client {
     return Client{
-        id:        id,
-        transport: transport,
-        userAgent: userAgent,
+        id:                id,
+        transport:         transport,
+        userAgent:         userAgent,
+        conditionRegistry: conditionRegistry,
     }
 }
 
@@ -246,7 +249,7 @@ func (c Client) GetUpdates(ctx context.Context, uri *url.URL, desiredArch, curre
 
     for i := len(conditionalUpdates) - 1; i >= 0; i-- {
         for j, risk := range conditionalUpdates[i].Risks {
-            conditionalUpdates[i].Risks[j].MatchingRules, err = clusterconditions.PruneInvalid(ctx, risk.MatchingRules)
+            conditionalUpdates[i].Risks[j].MatchingRules, err = c.conditionRegistry.PruneInvalid(ctx, risk.MatchingRules)
             if len(conditionalUpdates[i].Risks[j].MatchingRules) == 0 {
                 klog.Warningf("Conditional update to %s, risk %q, has empty pruned matchingRules; dropping this target to avoid rejections when pushing to the Kubernetes API server. Pruning results: %s", conditionalUpdates[i].Release.Version, risk.Name, err)
                 conditionalUpdates = append(conditionalUpdates[:i], conditionalUpdates[i+1:]...)
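A note on the pruning loop above: GetUpdates removes entries from conditionalUpdates while iterating over it, which is only safe because it walks the slice backwards. A minimal standalone sketch of that pattern, with hypothetical data rather than code from this repository:

    package main

    import "fmt"

    func main() {
        updates := []string{"4.10.1", "4.10.2", "4.10.3"}
        drop := map[string]bool{"4.10.2": true} // hypothetical set of releases to prune

        // Iterating in reverse means removing index i only shifts elements that
        // have already been visited, so no element is skipped or seen twice.
        for i := len(updates) - 1; i >= 0; i-- {
            if drop[updates[i]] {
                updates = append(updates[:i], updates[i+1:]...)
            }
        }
        fmt.Println(updates) // [4.10.1 4.10.3]
    }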
diff --git a/pkg/cincinnati/cincinnati_test.go b/pkg/cincinnati/cincinnati_test.go
index aea8e4a94..8b9c05146 100644
--- a/pkg/cincinnati/cincinnati_test.go
+++ b/pkg/cincinnati/cincinnati_test.go
@@ -10,11 +10,11 @@ import (
     "reflect"
     "testing"
 
+    "github.com/openshift/cluster-version-operator/pkg/clusterconditions/standard"
+
     "github.com/blang/semver/v4"
     "github.com/google/uuid"
     configv1 "github.com/openshift/api/config/v1"
-    _ "github.com/openshift/cluster-version-operator/pkg/clusterconditions/always"
-    _ "github.com/openshift/cluster-version-operator/pkg/clusterconditions/promql"
     _ "k8s.io/klog/v2" // integration tests set glog flags.
 )
@@ -745,7 +745,7 @@ func TestGetUpdates(t *testing.T) {
     ts := httptest.NewServer(http.HandlerFunc(handler))
     defer ts.Close()
 
-    c := NewClient(clientID, nil, "")
+    c := NewClient(clientID, nil, "", standard.NewConditionRegistry(nil))
 
     uri, err := url.Parse(ts.URL)
     if err != nil {
diff --git a/pkg/clusterconditions/always/always.go b/pkg/clusterconditions/always/always.go
index 6267761eb..ca24f0a3e 100644
--- a/pkg/clusterconditions/always/always.go
+++ b/pkg/clusterconditions/always/always.go
@@ -8,14 +8,11 @@ import (
     "errors"
 
     configv1 "github.com/openshift/api/config/v1"
-    "github.com/openshift/cluster-version-operator/pkg/clusterconditions"
 )
 
 // Always implements a cluster condition that always matches.
 type Always struct{}
 
-var always = &Always{}
-
 // Valid returns an error if the condition contains any properties
 // besides 'type'.
 func (a *Always) Valid(ctx context.Context, condition *configv1.ClusterCondition) error {
@@ -30,7 +27,3 @@ func (a *Always) Valid(ctx context.Context, condition *configv1.ClusterCondition
 func (a *Always) Match(ctx context.Context, condition *configv1.ClusterCondition) (bool, error) {
     return true, nil
 }
-
-func init() {
-    clusterconditions.Register("Always", always)
-}
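With the init()-based registration gone, callers now opt in explicitly. A sketch of the Register contract as defined in pkg/clusterconditions below: re-registering the same value is a no-op, while a different value under the same name panics.

    package main

    import (
        "github.com/openshift/cluster-version-operator/pkg/clusterconditions"
        "github.com/openshift/cluster-version-operator/pkg/clusterconditions/always"
    )

    func main() {
        registry := clusterconditions.NewConditionRegistry()

        a := &always.Always{}
        registry.Register("Always", a)
        registry.Register("Always", a) // same value again: allowed, no-op

        // registry.Register("Always", &always.Always{}) // different value: would panic
    }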
diff --git a/pkg/clusterconditions/clusterconditions.go b/pkg/clusterconditions/clusterconditions.go
index e532547cb..d961d6737 100644
--- a/pkg/clusterconditions/clusterconditions.go
+++ b/pkg/clusterconditions/clusterconditions.go
@@ -25,28 +25,51 @@ type Condition interface {
     Match(ctx context.Context, condition *configv1.ClusterCondition) (bool, error)
 }
 
-// Registry is a registry of implemented condition types.
-var Registry map[string]Condition
+type ConditionRegistry interface {
+    // Register registers a condition type, and panics on any name collisions.
+    Register(conditionType string, condition Condition)
+
+    // PruneInvalid returns a new slice with recognized, valid conditions.
+    // The error complains about any unrecognized or invalid conditions.
+    PruneInvalid(ctx context.Context, matchingRules []configv1.ClusterCondition) ([]configv1.ClusterCondition, error)
+
+    // Match returns whether the cluster matches the given rules (true),
+    // does not match (false), or the rules fail to evaluate (error).
+    Match(ctx context.Context, matchingRules []configv1.ClusterCondition) (bool, error)
+}
+
+type conditionRegistry struct {
+    // registry is a registry of implemented condition types.
+    registry map[string]Condition
+}
+
+func NewConditionRegistry() ConditionRegistry {
+    ret := &conditionRegistry{
+        registry: map[string]Condition{},
+    }
+
+    return ret
+}
 
 // Register registers a condition type, and panics on any name collisions.
-func Register(conditionType string, condition Condition) {
-    if Registry == nil {
-        Registry = make(map[string]Condition, 1)
+func (r *conditionRegistry) Register(conditionType string, condition Condition) {
+    if r.registry == nil {
+        r.registry = make(map[string]Condition, 1)
     }
-    if existing, ok := Registry[conditionType]; ok && condition != existing {
+    if existing, ok := r.registry[conditionType]; ok && condition != existing {
         panic(fmt.Sprintf("cluster condition %q already registered", conditionType))
     }
-    Registry[conditionType] = condition
+    r.registry[conditionType] = condition
 }
 
 // PruneInvalid returns a new slice with recognized, valid conditions.
 // The error complains about any unrecognized or invalid conditions.
-func PruneInvalid(ctx context.Context, matchingRules []configv1.ClusterCondition) ([]configv1.ClusterCondition, error) {
+func (r *conditionRegistry) PruneInvalid(ctx context.Context, matchingRules []configv1.ClusterCondition) ([]configv1.ClusterCondition, error) {
     var valid []configv1.ClusterCondition
     var errs []error
     for _, config := range matchingRules {
-        condition, ok := Registry[config.Type]
+        condition, ok := r.registry[config.Type]
         if !ok {
             errs = append(errs, fmt.Errorf("Skipping unrecognized cluster condition type %q", config.Type))
             continue
@@ -63,11 +86,11 @@ func PruneInvalid(ctx context.Context, matchingRules []configv1.ClusterCondition
 
 // Match returns whether the cluster matches the given rules (true),
 // does not match (false), or the rules fail to evaluate (error).
-func Match(ctx context.Context, matchingRules []configv1.ClusterCondition) (bool, error) {
+func (r *conditionRegistry) Match(ctx context.Context, matchingRules []configv1.ClusterCondition) (bool, error) {
     var errs []error
     for _, config := range matchingRules {
-        condition, ok := Registry[config.Type]
+        condition, ok := r.registry[config.Type]
         if !ok {
             klog.V(2).Infof("Skipping unrecognized cluster condition type %q", config.Type)
             continue
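To make the new interface concrete, here is a hedged sketch of a caller exercising PruneInvalid and then Match; the "SomeFutureType" rule is hypothetical and exists only to trigger the pruning path:

    package main

    import (
        "context"
        "fmt"

        configv1 "github.com/openshift/api/config/v1"
        "github.com/openshift/cluster-version-operator/pkg/clusterconditions"
        "github.com/openshift/cluster-version-operator/pkg/clusterconditions/always"
    )

    func main() {
        ctx := context.Background()
        registry := clusterconditions.NewConditionRegistry()
        registry.Register("Always", &always.Always{})

        rules := []configv1.ClusterCondition{
            {Type: "SomeFutureType"}, // unrecognized: pruned, and reported in err
            {Type: "Always"},
        }

        valid, err := registry.PruneInvalid(ctx, rules)
        fmt.Println(len(valid), err) // 1 valid rule, plus an error naming the pruned type

        match, err := registry.Match(ctx, valid)
        fmt.Println(match, err) // expected: true <nil>
    }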
diff --git a/pkg/clusterconditions/clusterconditions_test.go b/pkg/clusterconditions/clusterconditions_test.go
index 4acb9b065..d00541306 100644
--- a/pkg/clusterconditions/clusterconditions_test.go
+++ b/pkg/clusterconditions/clusterconditions_test.go
@@ -8,10 +8,7 @@ import (
     "testing"
 
     configv1 "github.com/openshift/api/config/v1"
-
-    "github.com/openshift/cluster-version-operator/pkg/clusterconditions"
-    _ "github.com/openshift/cluster-version-operator/pkg/clusterconditions/always"
-    _ "github.com/openshift/cluster-version-operator/pkg/clusterconditions/promql"
+    "github.com/openshift/cluster-version-operator/pkg/clusterconditions/standard"
 )
 
 // Error implements a cluster condition that always errors.
@@ -33,6 +30,7 @@ func (e *Error) Match(ctx context.Context, condition *configv1.ClusterCondition)
 
 func TestPruneInvalid(t *testing.T) {
     ctx := context.Background()
+    registry := standard.NewConditionRegistry(nil)
 
     for _, testCase := range []struct {
         name       string
@@ -100,7 +98,7 @@
         },
     } {
         t.Run(testCase.name, func(t *testing.T) {
-            valid, err := clusterconditions.PruneInvalid(ctx, testCase.conditions)
+            valid, err := registry.PruneInvalid(ctx, testCase.conditions)
             if !reflect.DeepEqual(valid, testCase.expectedValid) {
                 t.Errorf("got valid %v but expected %v", valid, testCase.expectedValid)
             }
@@ -117,7 +115,8 @@
 
 func TestMatch(t *testing.T) {
     ctx := context.Background()
-    clusterconditions.Register("Error", &Error{})
+    registry := standard.NewConditionRegistry(nil)
+    registry.Register("Error", &Error{})
 
     for _, testCase := range []struct {
         name       string
@@ -181,7 +180,7 @@
         },
     } {
         t.Run(testCase.name, func(t *testing.T) {
-            match, err := clusterconditions.Match(ctx, testCase.conditions)
+            match, err := registry.Match(ctx, testCase.conditions)
             if match != testCase.expectedMatch {
                 t.Errorf("got match %t but expected %t", match, testCase.expectedMatch)
             }
@@ -194,6 +193,4 @@
             }
         })
     }
-
-    delete(clusterconditions.Registry, "Error")
 }
diff --git a/pkg/clusterconditions/promql/promql.go b/pkg/clusterconditions/promql/promql.go
index 5b8406999..7a75046e8 100644
--- a/pkg/clusterconditions/promql/promql.go
+++ b/pkg/clusterconditions/promql/promql.go
@@ -7,6 +7,7 @@ import (
     "context"
     "errors"
     "fmt"
+    "net"
     "time"
 
     configv1 "github.com/openshift/api/config/v1"
@@ -14,16 +15,16 @@ import (
     prometheusv1 "github.com/prometheus/client_golang/api/prometheus/v1"
     "github.com/prometheus/common/config"
     "github.com/prometheus/common/model"
+    metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+    "k8s.io/client-go/kubernetes"
     "k8s.io/klog/v2"
 
-    "github.com/openshift/cluster-version-operator/pkg/clusterconditions"
     "github.com/openshift/cluster-version-operator/pkg/clusterconditions/cache"
 )
 
 // PromQL implements a cluster condition that matches based on PromQL.
 type PromQL struct {
-    // Address holds the Prometheus query URI.
-    Address string
+    kubeClient kubernetes.Interface
 
     // HTTPClientConfig holds the client configuration for connecting to the Prometheus service.
     HTTPClientConfig config.HTTPClientConfig
@@ -32,23 +33,40 @@ type PromQL struct {
     QueryTimeout time.Duration
 }
 
-var promql = &cache.Cache{
-    Condition: &PromQL{
-        Address: "https://thanos-querier.openshift-monitoring.svc.cluster.local:9091",
-        HTTPClientConfig: config.HTTPClientConfig{
-            Authorization: &config.Authorization{
-                Type:            "Bearer",
-                CredentialsFile: "/var/run/secrets/kubernetes.io/serviceaccount/token",
-            },
-            TLSConfig: config.TLSConfig{
-                CAFile: "/etc/tls/service-ca/service-ca.crt",
-            },
-        },
-        QueryTimeout: 5 * time.Minute,
-    },
-    MinBetweenMatches: 10 * time.Minute,
-    MinForCondition:   time.Hour,
-    Expiration:        24 * time.Hour,
+func NewPromQL(kubeClient kubernetes.Interface) *cache.Cache {
+    return &cache.Cache{
+        Condition: &PromQL{
+            kubeClient: kubeClient,
+            HTTPClientConfig: config.HTTPClientConfig{
+                Authorization: &config.Authorization{
+                    Type:            "Bearer",
+                    CredentialsFile: "/var/run/secrets/kubernetes.io/serviceaccount/token",
+                },
+                TLSConfig: config.TLSConfig{
+                    CAFile: "/etc/tls/service-ca/service-ca.crt",
+                    // ServerName is used to verify the name of the service when we connect to it by IP.
+                    ServerName: "thanos-querier.openshift-monitoring.svc.cluster.local",
+                },
+            },
+            QueryTimeout: 5 * time.Minute,
+        },
+        MinBetweenMatches: 10 * time.Minute,
+        MinForCondition:   time.Hour,
+        Expiration:        24 * time.Hour,
+    }
+}
+
+// Address determines the address of the thanos-querier service, to avoid requiring service DNS resolution.
+// We do this so that our host-network pod can use the node's resolv.conf to resolve the internal load-balancer
+// name before the DNS pods and the service network are available. The side effect is that the CVO cannot
+// resolve service DNS names.
+func (p *PromQL) Address(ctx context.Context) (string, error) {
+    svc, err := p.kubeClient.CoreV1().Services("openshift-monitoring").Get(ctx, "thanos-querier", metav1.GetOptions{})
+    if err != nil {
+        return "", err
+    }
+
+    return fmt.Sprintf("https://%s", net.JoinHostPort(svc.Spec.ClusterIP, "9091")), nil
 }
 
 // Valid returns an error if the condition contains any properties
@@ -69,7 +87,13 @@ func (p *PromQL) Valid(ctx context.Context, condition *configv1.ClusterCondition
 // false when the PromQL evaluates to 0, and an error if the PromQL
 // returns no time series or returns a value besides 0 or 1.
 func (p *PromQL) Match(ctx context.Context, condition *configv1.ClusterCondition) (bool, error) {
-    clientConfig := api.Config{Address: p.Address}
+    // Look up the address on every attempt, in case the service IP changes. This can happen when the
+    // thanos-querier service is deleted and recreated.
+    address, err := p.Address(ctx)
+    if err != nil {
+        return false, fmt.Errorf("failed to determine the thanos-querier IP: %w", err)
+    }
+    clientConfig := api.Config{Address: address}
 
     if roundTripper, err := config.NewRoundTripperFromConfig(p.HTTPClientConfig, "cluster-conditions"); err == nil {
         clientConfig.RoundTripper = roundTripper
@@ -122,7 +146,3 @@ func (p *PromQL) Match(ctx context.Context, condition *configv1.ClusterCondition
     }
     return false, fmt.Errorf("invalid PromQL result (must be 0 or 1): %v", sample.Value)
 }
-
-func init() {
-    clusterconditions.Register("PromQL", promql)
-}
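The ServerName plus ClusterIP combination above is the standard Go TLS technique for dialing by IP while still verifying the service's certificate. A self-contained sketch using crypto/tls; the IP and endpoint are hypothetical, and a real client would also need the service CA bundle and bearer token configured above:

    package main

    import (
        "crypto/tls"
        "fmt"
        "net/http"
    )

    func main() {
        client := &http.Client{
            Transport: &http.Transport{
                TLSClientConfig: &tls.Config{
                    // Verify the certificate against the service DNS name even
                    // though we dial a raw IP, so no service DNS lookup is needed.
                    ServerName: "thanos-querier.openshift-monitoring.svc.cluster.local",
                },
            },
        }

        resp, err := client.Get("https://172.30.0.10:9091/api/v1/query?query=vector(1)") // hypothetical ClusterIP
        if err != nil {
            fmt.Println("query failed:", err)
            return
        }
        defer resp.Body.Close()
        fmt.Println("status:", resp.Status)
    }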
diff --git a/pkg/clusterconditions/standard/standard.go b/pkg/clusterconditions/standard/standard.go
new file mode 100644
index 000000000..97c2814f8
--- /dev/null
+++ b/pkg/clusterconditions/standard/standard.go
@@ -0,0 +1,16 @@
+package standard
+
+import (
+    "github.com/openshift/cluster-version-operator/pkg/clusterconditions"
+    "github.com/openshift/cluster-version-operator/pkg/clusterconditions/always"
+    "github.com/openshift/cluster-version-operator/pkg/clusterconditions/promql"
+    "k8s.io/client-go/kubernetes"
+)
+
+func NewConditionRegistry(kubeClient kubernetes.Interface) clusterconditions.ConditionRegistry {
+    conditionRegistry := clusterconditions.NewConditionRegistry()
+    conditionRegistry.Register("Always", &always.Always{})
+    conditionRegistry.Register("PromQL", promql.NewPromQL(kubeClient))
+
+    return conditionRegistry
+}
diff --git a/pkg/cvo/availableupdates.go b/pkg/cvo/availableupdates.go
index bd50d3f8b..317f21b6c 100644
--- a/pkg/cvo/availableupdates.go
+++ b/pkg/cvo/availableupdates.go
@@ -69,7 +69,7 @@ func (optr *Operator) syncAvailableUpdates(ctx context.Context, config *configv1
     userAgent := optr.getUserAgent()
 
     current, updates, conditionalUpdates, condition := calculateAvailableUpdatesStatus(ctx, string(config.Spec.ClusterID),
-        transport, userAgent, upstream, desiredArch, currentArch, channel, optr.release.Version)
+        transport, userAgent, upstream, desiredArch, currentArch, channel, optr.release.Version, optr.conditionRegistry)
 
     if usedDefaultUpstream {
         upstream = ""
@@ -82,6 +82,7 @@ func (optr *Operator) syncAvailableUpdates(ctx context.Context, config *configv1
         Current:            current,
         Updates:            updates,
         ConditionalUpdates: conditionalUpdates,
+        ConditionRegistry:  optr.conditionRegistry,
         Condition:          condition,
     }
 
@@ -116,6 +117,7 @@ type availableUpdates struct {
     Current            configv1.Release
     Updates            []configv1.Release
     ConditionalUpdates []configv1.ConditionalUpdate
+    ConditionRegistry  clusterconditions.ConditionRegistry
     Condition          configv1.ClusterOperatorStatusCondition
 }
 
@@ -213,7 +215,7 @@ func (optr *Operator) getDesiredArchitecture(update *configv1.Update) string {
 }
 
 func calculateAvailableUpdatesStatus(ctx context.Context, clusterID string, transport *http.Transport, userAgent, upstream, desiredArch,
-    currentArch, channel, version string) (configv1.Release, []configv1.Release, []configv1.ConditionalUpdate,
+    currentArch, channel, version string, conditionRegistry clusterconditions.ConditionRegistry) (configv1.Release, []configv1.Release, []configv1.ConditionalUpdate,
     configv1.ClusterOperatorStatusCondition) {
 
     var cvoCurrent configv1.Release
@@ -270,7 +272,7 @@ func calculateAvailableUpdatesStatus(ctx context.Context, clusterID string, tran
         }
     }
 
-    current, updates, conditionalUpdates, err := cincinnati.NewClient(uuid, transport, userAgent).GetUpdates(ctx, upstreamURI, desiredArch,
+    current, updates, conditionalUpdates, err := cincinnati.NewClient(uuid, transport, userAgent, conditionRegistry).GetUpdates(ctx, upstreamURI, desiredArch,
         currentArch, channel, currentVersion)
 
     if err != nil {
@@ -308,7 +310,7 @@ func (u *availableUpdates) evaluateConditionalUpdates(ctx context.Context) {
     })
 
     for i, conditionalUpdate := range u.ConditionalUpdates {
-        if errorCondition := evaluateConditionalUpdate(ctx, &conditionalUpdate); errorCondition != nil {
+        if errorCondition := evaluateConditionalUpdate(ctx, &conditionalUpdate, u.ConditionRegistry); errorCondition != nil {
             meta.SetStatusCondition(&conditionalUpdate.Conditions, *errorCondition)
             u.removeUpdate(conditionalUpdate.Release.Image)
         } else {
@@ -333,13 +335,13 @@ func (u *availableUpdates) removeUpdate(image string) {
     }
 }
 
-func evaluateConditionalUpdate(ctx context.Context, conditionalUpdate *configv1.ConditionalUpdate) *metav1.Condition {
+func evaluateConditionalUpdate(ctx context.Context, conditionalUpdate *configv1.ConditionalUpdate, conditionRegistry clusterconditions.ConditionRegistry) *metav1.Condition {
     recommended := &metav1.Condition{
         Type: "Recommended",
     }
     messages := []string{}
     for _, risk := range conditionalUpdate.Risks {
-        if match, err := clusterconditions.Match(ctx, risk.MatchingRules); err != nil {
+        if match, err := conditionRegistry.Match(ctx, risk.MatchingRules); err != nil {
             if recommended.Status != metav1.ConditionFalse {
                 recommended.Status = metav1.ConditionUnknown
             }
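For context, the matchingRules that evaluateConditionalUpdate hands to Match are configv1.ClusterCondition values. A hedged sketch of what such rules look like; the PromQL expression is illustrative rather than taken from this change, and unrecognized types are skipped as the Match loop above shows:

    package main

    import (
        "fmt"

        configv1 "github.com/openshift/api/config/v1"
    )

    func main() {
        // A hypothetical risk that applies only when the cluster runs on AWS,
        // with "Always" as a conservative fallback rule.
        rules := []configv1.ClusterCondition{
            {
                Type: "PromQL",
                PromQL: &configv1.PromQLClusterCondition{
                    PromQL: `group(cluster_infrastructure_provider{type="AWS"}) or 0 * group(cluster_infrastructure_provider)`,
                },
            },
            {Type: "Always"},
        }
        fmt.Println("rules:", len(rules))
    }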
"github.com/openshift/cluster-version-operator/pkg/cvo/internal" "github.com/openshift/cluster-version-operator/pkg/cvo/internal/dynamicclient" "github.com/openshift/cluster-version-operator/pkg/internal" @@ -130,6 +132,9 @@ type Operator struct { // synchronization upgradeableCheckIntervals upgradeableCheckIntervals + // conditionRegistry is used to evaluate whether a particular condition is risky or not. + conditionRegistry clusterconditions.ConditionRegistry + // verifier, if provided, will be used to check an update before it is executed. // Any error will prevent an update payload from being accessed. verifier verify.Interface @@ -205,6 +210,7 @@ func New( exclude: exclude, requiredFeatureSet: requiredFeatureSet, clusterProfile: clusterProfile, + conditionRegistry: standard.NewConditionRegistry(kubeClient), } cvInformer.Informer().AddEventHandler(optr.clusterVersionEventHandler())