Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 0 additions & 3 deletions cmd/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,6 @@ import (

"github.com/spf13/cobra"
"k8s.io/klog/v2"

_ "github.com/openshift/cluster-version-operator/pkg/clusterconditions/always"
_ "github.com/openshift/cluster-version-operator/pkg/clusterconditions/promql"
)

var (
Expand Down
4 changes: 3 additions & 1 deletion install/0000_00_cluster-version-operator_03_deployment.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,9 @@ spec:
fieldPath: spec.nodeName
- name: CLUSTER_PROFILE
value: {{ .ClusterProfile }}
dnsPolicy: ClusterFirstWithHostNet
# this pod is hostNetwork and uses the internal LB DNS name when possible, which the kubelet also uses.
# this dnsPolicy allows us to use the same dnsConfig as the kubelet, without access to read it ourselves.
dnsPolicy: Default
hostNetwork: true
nodeSelector:
node-role.kubernetes.io/master: ""
Expand Down
13 changes: 8 additions & 5 deletions pkg/cincinnati/cincinnati.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,14 +37,17 @@ type Client struct {
// requests. If empty, the User-Agent header will not be
// populated.
userAgent string

conditionRegistry clusterconditions.ConditionRegistry
}

// NewClient creates a new Cincinnati client with the given client identifier.
func NewClient(id uuid.UUID, transport *http.Transport, userAgent string) Client {
func NewClient(id uuid.UUID, transport *http.Transport, userAgent string, conditionRegistry clusterconditions.ConditionRegistry) Client {
return Client{
id: id,
transport: transport,
userAgent: userAgent,
id: id,
transport: transport,
userAgent: userAgent,
conditionRegistry: conditionRegistry,
}
}

Expand Down Expand Up @@ -246,7 +249,7 @@ func (c Client) GetUpdates(ctx context.Context, uri *url.URL, desiredArch, curre

for i := len(conditionalUpdates) - 1; i >= 0; i-- {
for j, risk := range conditionalUpdates[i].Risks {
conditionalUpdates[i].Risks[j].MatchingRules, err = clusterconditions.PruneInvalid(ctx, risk.MatchingRules)
conditionalUpdates[i].Risks[j].MatchingRules, err = c.conditionRegistry.PruneInvalid(ctx, risk.MatchingRules)
if len(conditionalUpdates[i].Risks[j].MatchingRules) == 0 {
klog.Warningf("Conditional update to %s, risk %q, has empty pruned matchingRules; dropping this target to avoid rejections when pushing to the Kubernetes API server. Pruning results: %s", conditionalUpdates[i].Release.Version, risk.Name, err)
conditionalUpdates = append(conditionalUpdates[:i], conditionalUpdates[i+1:]...)
Expand Down
6 changes: 3 additions & 3 deletions pkg/cincinnati/cincinnati_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,11 +10,11 @@ import (
"reflect"
"testing"

"github.com/openshift/cluster-version-operator/pkg/clusterconditions/standard"

"github.com/blang/semver/v4"
"github.com/google/uuid"
configv1 "github.com/openshift/api/config/v1"
_ "github.com/openshift/cluster-version-operator/pkg/clusterconditions/always"
_ "github.com/openshift/cluster-version-operator/pkg/clusterconditions/promql"
_ "k8s.io/klog/v2" // integration tests set glog flags.
)

Expand Down Expand Up @@ -745,7 +745,7 @@ func TestGetUpdates(t *testing.T) {
ts := httptest.NewServer(http.HandlerFunc(handler))
defer ts.Close()

c := NewClient(clientID, nil, "")
c := NewClient(clientID, nil, "", standard.NewConditionRegistry(nil))

uri, err := url.Parse(ts.URL)
if err != nil {
Expand Down
7 changes: 0 additions & 7 deletions pkg/clusterconditions/always/always.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,14 +8,11 @@ import (
"errors"

configv1 "github.com/openshift/api/config/v1"
"github.com/openshift/cluster-version-operator/pkg/clusterconditions"
)

// Always implements a cluster condition that always matches.
type Always struct{}

var always = &Always{}

// Valid returns an error if the condition contains any properties
// besides 'type'.
func (a *Always) Valid(ctx context.Context, condition *configv1.ClusterCondition) error {
Expand All @@ -30,7 +27,3 @@ func (a *Always) Valid(ctx context.Context, condition *configv1.ClusterCondition
func (a *Always) Match(ctx context.Context, condition *configv1.ClusterCondition) (bool, error) {
return true, nil
}

func init() {
clusterconditions.Register("Always", always)
}
45 changes: 34 additions & 11 deletions pkg/clusterconditions/clusterconditions.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,28 +25,51 @@ type Condition interface {
Match(ctx context.Context, condition *configv1.ClusterCondition) (bool, error)
}

// Registry is a registry of implemented condition types.
var Registry map[string]Condition
type ConditionRegistry interface {
// Register registers a condition type, and panics on any name collisions.
Register(conditionType string, condition Condition)

// PruneInvalid returns a new slice with recognized, valid conditions.
// The error complains about any unrecognized or invalid conditions.
PruneInvalid(ctx context.Context, matchingRules []configv1.ClusterCondition) ([]configv1.ClusterCondition, error)

// Match returns whether the cluster matches the given rules (true),
// does not match (false), or the rules fail to evaluate (error).
Match(ctx context.Context, matchingRules []configv1.ClusterCondition) (bool, error)
}

type conditionRegistry struct {
// registry is a registry of implemented condition types.
registry map[string]Condition
}

func NewConditionRegistry() ConditionRegistry {
ret := &conditionRegistry{
registry: map[string]Condition{},
}

return ret
}

// Register registers a condition type, and panics on any name collisions.
func Register(conditionType string, condition Condition) {
if Registry == nil {
Registry = make(map[string]Condition, 1)
func (r *conditionRegistry) Register(conditionType string, condition Condition) {
if r.registry == nil {
r.registry = make(map[string]Condition, 1)
}
if existing, ok := Registry[conditionType]; ok && condition != existing {
if existing, ok := r.registry[conditionType]; ok && condition != existing {
panic(fmt.Sprintf("cluster condition %q already registered", conditionType))
}
Registry[conditionType] = condition
r.registry[conditionType] = condition
}

// PruneInvalid returns a new slice with recognized, valid conditions.
// The error complains about any unrecognized or invalid conditions.
func PruneInvalid(ctx context.Context, matchingRules []configv1.ClusterCondition) ([]configv1.ClusterCondition, error) {
func (r *conditionRegistry) PruneInvalid(ctx context.Context, matchingRules []configv1.ClusterCondition) ([]configv1.ClusterCondition, error) {
var valid []configv1.ClusterCondition
var errs []error

for _, config := range matchingRules {
condition, ok := Registry[config.Type]
condition, ok := r.registry[config.Type]
if !ok {
errs = append(errs, fmt.Errorf("Skipping unrecognized cluster condition type %q", config.Type))
continue
Expand All @@ -63,11 +86,11 @@ func PruneInvalid(ctx context.Context, matchingRules []configv1.ClusterCondition

// Match returns whether the cluster matches the given rules (true),
// does not match (false), or the rules fail to evaluate (error).
func Match(ctx context.Context, matchingRules []configv1.ClusterCondition) (bool, error) {
func (r *conditionRegistry) Match(ctx context.Context, matchingRules []configv1.ClusterCondition) (bool, error) {
var errs []error

for _, config := range matchingRules {
condition, ok := Registry[config.Type]
condition, ok := r.registry[config.Type]
if !ok {
klog.V(2).Infof("Skipping unrecognized cluster condition type %q", config.Type)
continue
Expand Down
15 changes: 6 additions & 9 deletions pkg/clusterconditions/clusterconditions_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,7 @@ import (
"testing"

configv1 "github.com/openshift/api/config/v1"

"github.com/openshift/cluster-version-operator/pkg/clusterconditions"
_ "github.com/openshift/cluster-version-operator/pkg/clusterconditions/always"
_ "github.com/openshift/cluster-version-operator/pkg/clusterconditions/promql"
"github.com/openshift/cluster-version-operator/pkg/clusterconditions/standard"
)

// Error implements a cluster condition that always errors.
Expand All @@ -33,6 +30,7 @@ func (e *Error) Match(ctx context.Context, condition *configv1.ClusterCondition)

func TestPruneInvalid(t *testing.T) {
ctx := context.Background()
registry := standard.NewConditionRegistry(nil)

for _, testCase := range []struct {
name string
Expand Down Expand Up @@ -100,7 +98,7 @@ func TestPruneInvalid(t *testing.T) {
},
} {
t.Run(testCase.name, func(t *testing.T) {
valid, err := clusterconditions.PruneInvalid(ctx, testCase.conditions)
valid, err := registry.PruneInvalid(ctx, testCase.conditions)
if !reflect.DeepEqual(valid, testCase.expectedValid) {
t.Errorf("got valid %v but expected %v", valid, testCase.expectedValid)
}
Expand All @@ -117,7 +115,8 @@ func TestPruneInvalid(t *testing.T) {

func TestMatch(t *testing.T) {
ctx := context.Background()
clusterconditions.Register("Error", &Error{})
registry := standard.NewConditionRegistry(nil)
registry.Register("Error", &Error{})

for _, testCase := range []struct {
name string
Expand Down Expand Up @@ -181,7 +180,7 @@ func TestMatch(t *testing.T) {
},
} {
t.Run(testCase.name, func(t *testing.T) {
match, err := clusterconditions.Match(ctx, testCase.conditions)
match, err := registry.Match(ctx, testCase.conditions)
if match != testCase.expectedMatch {
t.Errorf("got match %t but expected %t", match, testCase.expectedMatch)
}
Expand All @@ -194,6 +193,4 @@ func TestMatch(t *testing.T) {
}
})
}

delete(clusterconditions.Registry, "Error")
}
66 changes: 43 additions & 23 deletions pkg/clusterconditions/promql/promql.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,23 +7,24 @@ import (
"context"
"errors"
"fmt"
"net"
"time"

configv1 "github.com/openshift/api/config/v1"
"github.com/prometheus/client_golang/api"
prometheusv1 "github.com/prometheus/client_golang/api/prometheus/v1"
"github.com/prometheus/common/config"
"github.com/prometheus/common/model"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/kubernetes"
"k8s.io/klog/v2"

"github.com/openshift/cluster-version-operator/pkg/clusterconditions"
"github.com/openshift/cluster-version-operator/pkg/clusterconditions/cache"
)

// PromQL implements a cluster condition that matches based on PromQL.
type PromQL struct {
// Address holds the Prometheus query URI.
Address string
kubeClient kubernetes.Interface

// HTTPClientConfig holds the client configuration for connecting to the Prometheus service.
HTTPClientConfig config.HTTPClientConfig
Expand All @@ -32,23 +33,40 @@ type PromQL struct {
QueryTimeout time.Duration
}

var promql = &cache.Cache{
Condition: &PromQL{
Address: "https://thanos-querier.openshift-monitoring.svc.cluster.local:9091",
HTTPClientConfig: config.HTTPClientConfig{
Authorization: &config.Authorization{
Type: "Bearer",
CredentialsFile: "/var/run/secrets/kubernetes.io/serviceaccount/token",
},
TLSConfig: config.TLSConfig{
CAFile: "/etc/tls/service-ca/service-ca.crt",
func NewPromQL(kubeClient kubernetes.Interface) *cache.Cache {
return &cache.Cache{
Condition: &PromQL{
kubeClient: kubeClient,
HTTPClientConfig: config.HTTPClientConfig{
Authorization: &config.Authorization{
Type: "Bearer",
CredentialsFile: "/var/run/secrets/kubernetes.io/serviceaccount/token",
},
TLSConfig: config.TLSConfig{
CAFile: "/etc/tls/service-ca/service-ca.crt",
// ServerName is used to verify the name of the service we will connect to using IP.
ServerName: "thanos-querier.openshift-monitoring.svc.cluster.local",
},
},
QueryTimeout: 5 * time.Minute,
},
QueryTimeout: 5 * time.Minute,
},
MinBetweenMatches: 10 * time.Minute,
MinForCondition: time.Hour,
Expiration: 24 * time.Hour,
MinBetweenMatches: 10 * time.Minute,
MinForCondition: time.Hour,
Expiration: 24 * time.Hour,
}
}

// Address determines the address of the thanos-querier to avoid requiring service DNS resolution.
// We do this so that our host-network pod can use the node's resolv.conf to resolve the internal load balancer name
// on the pod before DNS pods are available and before the service network is available. The side effect is that
// the CVO cannot resolve service DNS names.
func (p *PromQL) Address(ctx context.Context) (string, error) {
svc, err := p.kubeClient.CoreV1().Services("openshift-monitoring").Get(ctx, "thanos-querier", metav1.GetOptions{})
if err != nil {
return "", err
}

return fmt.Sprintf("https://%s", net.JoinHostPort(svc.Spec.ClusterIP, "9091")), nil
}

// Valid returns an error if the condition contains any properties
Expand All @@ -69,7 +87,13 @@ func (p *PromQL) Valid(ctx context.Context, condition *configv1.ClusterCondition
// false when the PromQL evaluates to 0, and an error if the PromQL
// returns no time series or returns a value besides 0 or 1.
func (p *PromQL) Match(ctx context.Context, condition *configv1.ClusterCondition) (bool, error) {
clientConfig := api.Config{Address: p.Address}
// Lookup the address every attempt in case the service IP changes. This can happen when the thanos service is
// deleted and recreated.
address, err := p.Address(ctx)
if err != nil {
return false, fmt.Errorf("failure determine thanos IP: %w", err)
}
clientConfig := api.Config{Address: address}

if roundTripper, err := config.NewRoundTripperFromConfig(p.HTTPClientConfig, "cluster-conditions"); err == nil {
clientConfig.RoundTripper = roundTripper
Expand Down Expand Up @@ -122,7 +146,3 @@ func (p *PromQL) Match(ctx context.Context, condition *configv1.ClusterCondition
}
return false, fmt.Errorf("invalid PromQL result (must be 0 or 1): %v", sample.Value)
}

func init() {
clusterconditions.Register("PromQL", promql)
}
16 changes: 16 additions & 0 deletions pkg/clusterconditions/standard/standard.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
package standard

import (
"github.com/openshift/cluster-version-operator/pkg/clusterconditions"
"github.com/openshift/cluster-version-operator/pkg/clusterconditions/always"
"github.com/openshift/cluster-version-operator/pkg/clusterconditions/promql"
"k8s.io/client-go/kubernetes"
)

func NewConditionRegistry(kubeClient kubernetes.Interface) clusterconditions.ConditionRegistry {
conditionRegistry := clusterconditions.NewConditionRegistry()
conditionRegistry.Register("Always", &always.Always{})
conditionRegistry.Register("PromQL", promql.NewPromQL(kubeClient))

return conditionRegistry
}
Loading