Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions cmd/start.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,5 +39,12 @@ func init() {
cmd.PersistentFlags().StringVar(&opts.ReleaseImage, "release-image", opts.ReleaseImage, "The Openshift release image url.")
cmd.PersistentFlags().StringVar(&opts.ServingCertFile, "serving-cert-file", opts.ServingCertFile, "The X.509 certificate file for serving metrics over HTTPS. You must set both --serving-cert-file and --serving-key-file unless you set --listen empty.")
cmd.PersistentFlags().StringVar(&opts.ServingKeyFile, "serving-key-file", opts.ServingKeyFile, "The X.509 key file for serving metrics over HTTPS. You must set both --serving-cert-file and --serving-key-file unless you set --listen empty.")
cmd.PersistentFlags().StringVar(&opts.PromQLTarget.CABundleFile, "metrics-ca-bundle-file", opts.PromQLTarget.CABundleFile, "The service CA bundle file containing one or more X.509 certificate files for validating certificates generated from the service CA for the respective remote PromQL query service.")
cmd.PersistentFlags().StringVar(&opts.PromQLTarget.BearerTokenFile, "metrics-token-file", opts.PromQLTarget.BearerTokenFile, "The bearer token file used to access the remote PromQL query service.")
cmd.PersistentFlags().StringVar(&opts.PromQLTarget.KubeSvc.Namespace, "metrics-namespace", opts.PromQLTarget.KubeSvc.Namespace, "The name of the namespace where the the remote PromQL query service resides. Must be specified when --use-dns-for-services is disabled.")
cmd.PersistentFlags().StringVar(&opts.PromQLTarget.KubeSvc.Name, "metrics-service", opts.PromQLTarget.KubeSvc.Name, "The name of the remote PromQL query service. Must be specified when --use-dns-for-services is disabled.")
cmd.PersistentFlags().BoolVar(&opts.PromQLTarget.UseDNS, "use-dns-for-services", opts.PromQLTarget.UseDNS, "Configures the CVO to use DNS for resolution of services in the cluster.")
cmd.PersistentFlags().StringVar(&opts.PrometheusURLString, "metrics-url", opts.PrometheusURLString, "The URL used to access the remote PromQL query service.")
cmd.PersistentFlags().BoolVar(&opts.InjectClusterIdIntoPromQL, "hypershift", opts.InjectClusterIdIntoPromQL, "This options indicates whether the CVO is running inside a hosted control plane.")
rootCmd.AddCommand(cmd)
}
3 changes: 2 additions & 1 deletion pkg/cincinnati/cincinnati_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"reflect"
"testing"

"github.com/openshift/cluster-version-operator/pkg/clusterconditions"
"github.com/openshift/cluster-version-operator/pkg/clusterconditions/standard"

"github.com/blang/semver/v4"
Expand Down Expand Up @@ -745,7 +746,7 @@ func TestGetUpdates(t *testing.T) {
ts := httptest.NewServer(http.HandlerFunc(handler))
defer ts.Close()

c := NewClient(clientID, nil, "", standard.NewConditionRegistry(nil))
c := NewClient(clientID, nil, "", standard.NewConditionRegistry(clusterconditions.DefaultPromQLTarget()))

uri, err := url.Parse(ts.URL)
if err != nil {
Expand Down
31 changes: 30 additions & 1 deletion pkg/clusterconditions/clusterconditions.go
Original file line number Diff line number Diff line change
@@ -1,15 +1,18 @@
// Package clusterconditions implements cluster conditions for
// identifying metching clusters.
// identifying matching clusters.
//
// https://github.com/openshift/enhancements/blob/master/enhancements/update/targeted-update-edge-blocking.md#cluster-condition-type-registry
package clusterconditions

import (
"context"
"fmt"
"net/url"

configv1 "github.com/openshift/api/config/v1"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/errors"
"k8s.io/client-go/kubernetes"
"k8s.io/klog/v2"
)

Expand Down Expand Up @@ -104,3 +107,29 @@ func (r *conditionRegistry) Match(ctx context.Context, matchingRules []configv1.

return false, errors.NewAggregate(errs)
}

// PromQLTarget describes how to reach the remote PromQL query service that is
// used to evaluate PromQL cluster conditions.
type PromQLTarget struct {
// KubeClient is used to look up the KubeSvc service (and its cluster IP)
// when UseDNS is false.
KubeClient kubernetes.Interface
// UseDNS selects whether the host in URL is used as-is (true) or is
// replaced by the cluster IP of KubeSvc (false).
UseDNS bool
// URL is the address of the PromQL query service. Its hostname is also
// used as the TLS server name, and its port is reused when connecting to
// the service by cluster IP.
URL *url.URL
// CABundleFile is the path of the CA bundle used to validate the
// certificate presented by the PromQL query service.
CABundleFile string
// BearerTokenFile is the path of the bearer token used to access the
// PromQL query service. When empty, no bearer authorization is configured.
BearerTokenFile string

// The Kubernetes service to be used to resolve the IP address when DNS is not used.
KubeSvc types.NamespacedName
}

// DefaultPromQLTarget returns the PromQLTarget for the thanos-querier service
// in the openshift-monitoring namespace, using the service CA bundle and the
// pod's service account token.
//
// The returned target has UseDNS disabled and a nil KubeClient. Resolving the
// service by cluster IP goes through the client, so callers that keep UseDNS
// disabled are expected to set KubeClient before the target is used for
// queries.
func DefaultPromQLTarget() PromQLTarget {
	// Build the URL as a literal rather than parsing a constant string, so
	// there is no parse error that would have to be discarded.
	promqlURL := &url.URL{
		Scheme: "https",
		Host:   "thanos-querier.openshift-monitoring.svc.cluster.local:9091",
	}
	return PromQLTarget{
		KubeClient: nil,
		UseDNS:     false,
		URL:        promqlURL,
		KubeSvc: types.NamespacedName{
			Name:      "thanos-querier",
			Namespace: "openshift-monitoring",
		},
		CABundleFile:    "/etc/tls/service-ca/service-ca.crt",
		BearerTokenFile: "/var/run/secrets/kubernetes.io/serviceaccount/token",
	}
}
5 changes: 3 additions & 2 deletions pkg/clusterconditions/clusterconditions_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"testing"

configv1 "github.com/openshift/api/config/v1"
"github.com/openshift/cluster-version-operator/pkg/clusterconditions"
"github.com/openshift/cluster-version-operator/pkg/clusterconditions/standard"
)

Expand All @@ -30,7 +31,7 @@ func (e *Error) Match(ctx context.Context, condition *configv1.ClusterCondition)

func TestPruneInvalid(t *testing.T) {
ctx := context.Background()
registry := standard.NewConditionRegistry(nil)
registry := standard.NewConditionRegistry(clusterconditions.DefaultPromQLTarget())

for _, testCase := range []struct {
name string
Expand Down Expand Up @@ -115,7 +116,7 @@ func TestPruneInvalid(t *testing.T) {

func TestMatch(t *testing.T) {
ctx := context.Background()
registry := standard.NewConditionRegistry(nil)
registry := standard.NewConditionRegistry(clusterconditions.DefaultPromQLTarget())
registry.Register("Error", &Error{})

for _, testCase := range []struct {
Expand Down
54 changes: 35 additions & 19 deletions pkg/clusterconditions/promql/promql.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"errors"
"fmt"
"net"
"net/url"
"time"

configv1 "github.com/openshift/api/config/v1"
Expand All @@ -16,15 +17,20 @@ import (
"github.com/prometheus/common/config"
"github.com/prometheus/common/model"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
"k8s.io/client-go/kubernetes"
"k8s.io/klog/v2"

"github.com/openshift/cluster-version-operator/pkg/clusterconditions"
"github.com/openshift/cluster-version-operator/pkg/clusterconditions/cache"
)

// PromQL implements a cluster condition that matches based on PromQL.
type PromQL struct {
kubeClient kubernetes.Interface
useDNS bool
url *url.URL
kubeSvc types.NamespacedName

// HTTPClientConfig holds the client configuration for connecting to the Prometheus service.
HTTPClientConfig config.HTTPClientConfig
Expand All @@ -33,19 +39,25 @@ type PromQL struct {
QueryTimeout time.Duration
}

func NewPromQL(kubeClient kubernetes.Interface) *cache.Cache {
func NewPromQL(promqlTarget clusterconditions.PromQLTarget) *cache.Cache {
var auth *config.Authorization
if promqlTarget.BearerTokenFile != "" {
auth = &config.Authorization{
Type: "Bearer",
CredentialsFile: promqlTarget.BearerTokenFile,
}
}
return &cache.Cache{
Condition: &PromQL{
kubeClient: kubeClient,
kubeClient: promqlTarget.KubeClient,
useDNS: promqlTarget.UseDNS,
url: promqlTarget.URL,
kubeSvc: promqlTarget.KubeSvc,
HTTPClientConfig: config.HTTPClientConfig{
Authorization: &config.Authorization{
Type: "Bearer",
CredentialsFile: "/var/run/secrets/kubernetes.io/serviceaccount/token",
},
Authorization: auth,
TLSConfig: config.TLSConfig{
CAFile: "/etc/tls/service-ca/service-ca.crt",
// ServerName is used to verify the name of the service we will connect to using IP.
ServerName: "thanos-querier.openshift-monitoring.svc.cluster.local",
CAFile: promqlTarget.CABundleFile,
ServerName: promqlTarget.URL.Hostname(),
},
},
QueryTimeout: 5 * time.Minute,
Expand All @@ -56,17 +68,20 @@ func NewPromQL(kubeClient kubernetes.Interface) *cache.Cache {
}
}

// Address determines the address of the thanos-querier to avoid requiring service DNS resolution.
// We do this so that our host-network pod can use the node's resolv.conf to resolve the internal load balancer name
// on the pod before DNS pods are available and before the service network is available. The side effect is that
// the CVO cannot resolve service DNS names.
func (p *PromQL) Address(ctx context.Context) (string, error) {
svc, err := p.kubeClient.CoreV1().Services("openshift-monitoring").Get(ctx, "thanos-querier", metav1.GetOptions{})
// Host determines the host of the thanos-querier to avoid requiring service DNS resolution
// when DNS for services is disabled. We do this so that our host-network pod can use
// the node's resolv.conf to resolve the internal load balancer name on the pod before
// DNS pods are available and before the service network is available.
func (p *PromQL) Host(ctx context.Context) (string, error) {
if p.useDNS {
return p.url.Host, nil
}

svc, err := p.kubeClient.CoreV1().Services(p.kubeSvc.Namespace).Get(ctx, p.kubeSvc.Name, metav1.GetOptions{})
if err != nil {
return "", err
}

return fmt.Sprintf("https://%s", net.JoinHostPort(svc.Spec.ClusterIP, "9091")), nil
return net.JoinHostPort(svc.Spec.ClusterIP, p.url.Port()), nil
}

// Valid returns an error if the condition contains any properties
Expand All @@ -89,11 +104,12 @@ func (p *PromQL) Valid(ctx context.Context, condition *configv1.ClusterCondition
func (p *PromQL) Match(ctx context.Context, condition *configv1.ClusterCondition) (bool, error) {
// Lookup the address every attempt in case the service IP changes. This can happen when the thanos service is
// deleted and recreated.
address, err := p.Address(ctx)
host, err := p.Host(ctx)
if err != nil {
return false, fmt.Errorf("failure determine thanos IP: %w", err)
}
clientConfig := api.Config{Address: address}
p.url.Host = host
clientConfig := api.Config{Address: p.url.String()}

if roundTripper, err := config.NewRoundTripperFromConfig(p.HTTPClientConfig, "cluster-conditions"); err == nil {
clientConfig.RoundTripper = roundTripper
Expand Down
5 changes: 2 additions & 3 deletions pkg/clusterconditions/standard/standard.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,13 +4,12 @@ import (
"github.com/openshift/cluster-version-operator/pkg/clusterconditions"
"github.com/openshift/cluster-version-operator/pkg/clusterconditions/always"
"github.com/openshift/cluster-version-operator/pkg/clusterconditions/promql"
"k8s.io/client-go/kubernetes"
)

func NewConditionRegistry(kubeClient kubernetes.Interface) clusterconditions.ConditionRegistry {
func NewConditionRegistry(promqlTarget clusterconditions.PromQLTarget) clusterconditions.ConditionRegistry {
conditionRegistry := clusterconditions.NewConditionRegistry()
conditionRegistry.Register("Always", &always.Always{})
conditionRegistry.Register("PromQL", promql.NewPromQL(kubeClient))
conditionRegistry.Register("PromQL", promql.NewPromQL(promqlTarget))

return conditionRegistry
}
26 changes: 24 additions & 2 deletions pkg/cvo/availableupdates.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,14 +67,18 @@ func (optr *Operator) syncAvailableUpdates(ctx context.Context, config *configv1
}

userAgent := optr.getUserAgent()

current, updates, conditionalUpdates, condition := calculateAvailableUpdatesStatus(ctx, string(config.Spec.ClusterID),
clusterId := string(config.Spec.ClusterID)
current, updates, conditionalUpdates, condition := calculateAvailableUpdatesStatus(ctx, clusterId,
transport, userAgent, upstream, desiredArch, currentArch, channel, optr.release.Version, optr.conditionRegistry)

if usedDefaultUpstream {
upstream = ""
}

if optr.injectClusterIdIntoPromQL {
conditionalUpdates = injectClusterIdIntoConditionalUpdates(clusterId, conditionalUpdates)
}

au := &availableUpdates{
Upstream: upstream,
Channel: config.Spec.Channel,
Expand Down Expand Up @@ -367,3 +371,21 @@ func evaluateConditionalUpdate(ctx context.Context, conditionalUpdate *configv1.
recommended.Message = strings.Join(messages, "\n\n")
return recommended
}

// injectClusterIdIntoConditionalUpdates rewrites the PromQL expression of every
// matching rule of type "PromQL" in updates by passing it through
// injectIdIntoString with clusterId. Rules of any other type are left alone.
//
// The rewrite happens in place: the elements of updates are modified and the
// same slice is returned.
func injectClusterIdIntoConditionalUpdates(clusterId string, updates []configv1.ConditionalUpdate) []configv1.ConditionalUpdate {
	for u := range updates {
		risks := updates[u].Risks
		for r := range risks {
			rules := risks[r].MatchingRules
			for m := range rules {
				rule := &rules[m]
				if rule.Type != "PromQL" {
					continue
				}
				rule.PromQL.PromQL = injectIdIntoString(clusterId, rule.PromQL.PromQL)
			}
		}
	}
	return updates
}

// injectIdIntoString returns promQL with every occurrence of the empty label
// matcher `_id=""` replaced by `_id="<id>"`. The id is inserted verbatim,
// without any escaping. Input without the placeholder is returned unchanged.
func injectIdIntoString(id string, promQL string) string {
	const placeholder = `_id=""`
	filled := fmt.Sprintf(`_id="%s"`, id)
	return strings.ReplaceAll(promQL, placeholder, filled)
}
17 changes: 13 additions & 4 deletions pkg/cvo/cvo.go
Original file line number Diff line number Diff line change
Expand Up @@ -135,6 +135,12 @@ type Operator struct {
// conditionRegistry is used to evaluate whether a particular condition is risky or not.
conditionRegistry clusterconditions.ConditionRegistry

// injectClusterIdIntoPromQL indicates whether the CVO should inject the cluster ID
// into PromQL queries while evaluating risks from conditional updates. This is needed
// in HyperShift to differentiate between metrics from multiple hosted clusters in
// a single Prometheus instance.
injectClusterIdIntoPromQL bool

// verifier, if provided, will be used to check an update before it is executed.
// Any error will prevent an update payload from being accessed.
verifier verify.Interface
Expand Down Expand Up @@ -180,6 +186,8 @@ func New(
exclude string,
requiredFeatureSet string,
clusterProfile string,
promqlTarget clusterconditions.PromQLTarget,
injectClusterIdIntoPromQL bool,
) (*Operator, error) {
eventBroadcaster := record.NewBroadcaster()
eventBroadcaster.StartLogging(klog.Infof)
Expand Down Expand Up @@ -207,10 +215,11 @@ func New(
availableUpdatesQueue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "availableupdates"),
upgradeableQueue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "upgradeable"),

exclude: exclude,
requiredFeatureSet: requiredFeatureSet,
clusterProfile: clusterProfile,
conditionRegistry: standard.NewConditionRegistry(kubeClient),
exclude: exclude,
requiredFeatureSet: requiredFeatureSet,
clusterProfile: clusterProfile,
conditionRegistry: standard.NewConditionRegistry(promqlTarget),
injectClusterIdIntoPromQL: injectClusterIdIntoPromQL,
}

if _, err := cvInformer.Informer().AddEventHandler(optr.clusterVersionEventHandler()); err != nil {
Expand Down
Loading