diff --git a/Dockerfile b/Dockerfile index 67308e1c01..d1ee9308aa 100644 --- a/Dockerfile +++ b/Dockerfile @@ -11,5 +11,4 @@ COPY --from=builder /go/src/github.com/openshift/cluster-etcd-operator/bindata/b COPY --from=builder /go/src/github.com/openshift/cluster-etcd-operator/cluster-etcd-operator /usr/bin/ COPY --from=builder /go/src/github.com/openshift/cluster-etcd-operator/manifests/ /manifests/ -#TODO uncomment below when we want to be part of CVO -#LABEL io.openshift.release.operator true +LABEL io.openshift.release.operator true diff --git a/Dockerfile.rhel7 b/Dockerfile.rhel7 index b200bb67b4..0bfdf5d9a0 100644 --- a/Dockerfile.rhel7 +++ b/Dockerfile.rhel7 @@ -11,5 +11,4 @@ COPY --from=builder /go/src/github.com/openshift/cluster-etcd-operator/bindata/b COPY --from=builder /go/src/github.com/openshift/cluster-etcd-operator/cluster-etcd-operator /usr/bin/ COPY --from=builder /go/src/github.com/openshift/cluster-etcd-operator/manifests/ /manifests/ -#TODO uncomment below when we want to be part of CVO -#LABEL io.openshift.release.operator true +LABEL io.openshift.release.operator true diff --git a/manifests/0000_12_etcd-operator_01_operator.cr.yaml b/manifests/0000_12_etcd-operator_01_operator.cr.yaml index f191730362..47c7e0a35e 100644 --- a/manifests/0000_12_etcd-operator_01_operator.cr.yaml +++ b/manifests/0000_12_etcd-operator_01_operator.cr.yaml @@ -5,4 +5,4 @@ metadata: annotations: release.openshift.io/create-only: "true" spec: - managementState: Managed + managementState: Unmanaged diff --git a/manifests/0000_12_etcd-operator_06_deployment.yaml b/manifests/0000_12_etcd-operator_06_deployment.yaml index 107103e751..2fe1a1878e 100644 --- a/manifests/0000_12_etcd-operator_06_deployment.yaml +++ b/manifests/0000_12_etcd-operator_06_deployment.yaml @@ -7,6 +7,8 @@ metadata: app: etcd-operator spec: replicas: 1 + strategy: + type: Recreate selector: matchLabels: app: etcd-operator @@ -20,7 +22,7 @@ spec: containers: - name: operator image: 
quay.io/openshift/cluster-etcd-operator:v4.0 - imagePullPolicy: Always + imagePullPolicy: IfNotPresent ports: - containerPort: 8443 name: metrics @@ -70,5 +72,16 @@ spec: secretName: etcd-client nodeSelector: node-role.kubernetes.io/master: "" + priorityClassName: "system-cluster-critical" tolerations: - - operator: Exists + - key: "node-role.kubernetes.io/master" + operator: "Exists" + effect: "NoSchedule" + - key: "node.kubernetes.io/unreachable" + operator: "Exists" + effect: "NoExecute" + tolerationSeconds: 120 + - key: "node.kubernetes.io/not-ready" + operator: "Exists" + effect: "NoExecute" + tolerationSeconds: 120 diff --git a/manifests/0000_12_etcd-operator_06_static_pod_demonset.yaml b/manifests/0000_12_etcd-operator_06_static_pod_demonset.yaml index fea63ee2ce..5ae77030b9 100644 --- a/manifests/0000_12_etcd-operator_06_static_pod_demonset.yaml +++ b/manifests/0000_12_etcd-operator_06_static_pod_demonset.yaml @@ -45,7 +45,7 @@ spec: effect: NoSchedule containers: - image: "quay.io/openshift/cluster-etcd-operator:latest" - imagePullPolicy: Always + imagePullPolicy: IfNotPresent name: etcd-staticpod command: ["/usr/bin/cluster-etcd-operator"] args: diff --git a/manifests/0000_12_etcd-operator_06_static_sync_demonset.yaml b/manifests/0000_12_etcd-operator_06_static_sync_demonset.yaml index 1f4fa19e63..954b0f5520 100644 --- a/manifests/0000_12_etcd-operator_06_static_sync_demonset.yaml +++ b/manifests/0000_12_etcd-operator_06_static_sync_demonset.yaml @@ -45,7 +45,7 @@ spec: effect: NoSchedule containers: - image: "quay.io/openshift/cluster-etcd-operator:latest" - imagePullPolicy: Always + imagePullPolicy: IfNotPresent name: etcd-staticsync command: ["/usr/bin/cluster-etcd-operator"] args: diff --git a/pkg/cmd/staticpodcontroller/staticpodcontroller.go b/pkg/cmd/staticpodcontroller/staticpodcontroller.go index 99d25cbd4c..ddb1a735f6 100644 --- a/pkg/cmd/staticpodcontroller/staticpodcontroller.go +++ b/pkg/cmd/staticpodcontroller/staticpodcontroller.go @@ -23,7 
+23,11 @@ import ( "k8s.io/client-go/util/workqueue" "k8s.io/klog" - operatorclient "github.com/openshift/client-go/operator/clientset/versioned/typed/operator/v1" + operatorv1 "github.com/openshift/api/operator/v1" + operatorversionedclient "github.com/openshift/client-go/operator/clientset/versioned" + operatorv1informers "github.com/openshift/client-go/operator/informers/externalversions" + "github.com/openshift/cluster-etcd-operator/pkg/operator/operatorclient" + "github.com/openshift/library-go/pkg/operator/v1helpers" mcfgclientset "github.com/openshift/machine-config-operator/pkg/generated/clientset/versioned" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" @@ -97,11 +101,16 @@ func (s *podOpts) Run() error { if err != nil { return err } - operatorClient, err := operatorclient.NewForConfig(clientConfig) + operatorConfigClient, err := operatorversionedclient.NewForConfig(clientConfig) if err != nil { return err } - etcdOperatorClient := operatorClient.Etcds() + + operatorConfigInformers := operatorv1informers.NewSharedInformerFactory(operatorConfigClient, 10*time.Minute) + operatorClient := &operatorclient.OperatorClient{ + Informers: operatorConfigInformers, + Client: operatorConfigClient.OperatorV1(), + } kubeInformerFactory := informers.NewFilteredSharedInformerFactory(clientset, 0, etcdNamespace, nil) nodeName := os.Getenv("NODE_NAME") @@ -119,7 +128,7 @@ func (s *podOpts) Run() error { eventRecorder := events.NewKubeRecorder(clientset.CoreV1().Events(etcdNamespace), "static-pod-controller-"+localEtcdName, controllerRef) staticPodController := NewStaticPodController( - etcdOperatorClient, + operatorClient, kubeInformerFactory, clientset, clientmc, @@ -136,7 +145,7 @@ func (s *podOpts) Run() error { } type StaticPodController struct { - etcdOperatorClient operatorclient.EtcdInterface + operatorConfigClient v1helpers.OperatorClient podInformer corev1informer.SecretInformer 
kubeInformersForOpenshiftEtcdNamespace cache.SharedIndexInformer clientset corev1client.Interface @@ -149,7 +158,7 @@ type StaticPodController struct { } func NewStaticPodController( - etcdOperatorClient operatorclient.EtcdInterface, + operatorConfigClient v1helpers.OperatorClient, kubeInformersForOpenshiftEtcdNamespace informers.SharedInformerFactory, clientset corev1client.Interface, clientmc mcfgclientset.Interface, @@ -157,11 +166,11 @@ func NewStaticPodController( eventRecorder events.Recorder, ) *StaticPodController { c := &StaticPodController{ - etcdOperatorClient: etcdOperatorClient, - eventRecorder: eventRecorder.WithComponentSuffix("static-pod-controller-" + localEtcdName), - clientset: clientset, - clientmc: clientmc, - localEtcdName: localEtcdName, + operatorConfigClient: operatorConfigClient, + eventRecorder: eventRecorder.WithComponentSuffix("static-pod-controller-" + localEtcdName), + clientset: clientset, + clientmc: clientmc, + localEtcdName: localEtcdName, queue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "StaticPodController"), } @@ -172,6 +181,21 @@ func NewStaticPodController( } func (c *StaticPodController) sync() error { + operatorSpec, _, _, err := c.operatorConfigClient.GetOperatorState() + if err != nil { + return err + } + switch operatorSpec.ManagementState { + case operatorv1.Managed: + case operatorv1.Unmanaged: + return nil + case operatorv1.Removed: + // TODO should we support removal? 
+ return nil + default: + c.eventRecorder.Warningf("ManagementStateUnknown", "Unrecognized operator management state %q", operatorSpec.ManagementState) + return nil + } pod, err := c.clientset.CoreV1().Pods(etcdNamespace).Get(c.localEtcdName, metav1.GetOptions{}) if err != nil { klog.Infof("No Pod found in %s with name %s", etcdNamespace, c.localEtcdName) @@ -234,13 +258,13 @@ func (c *StaticPodController) IsMemberRemove(name string) bool { func (c *StaticPodController) PendingMemberList() ([]ceoapi.Member, error) { configPath := []string{"cluster", "pending"} - operatorSpec, err := c.etcdOperatorClient.Get("cluster", metav1.GetOptions{}) + operatorSpec, _, _, err := c.operatorConfigClient.GetOperatorState() if err != nil { return nil, err } config := map[string]interface{}{} - if err := json.NewDecoder(bytes.NewBuffer(operatorSpec.Spec.ObservedConfig.Raw)).Decode(&config); err != nil { + if err := json.NewDecoder(bytes.NewBuffer(operatorSpec.ObservedConfig.Raw)).Decode(&config); err != nil { klog.V(4).Infof("decode of existing config failed with error: %v", err) } data, exists, err := unstructured.NestedSlice(config, configPath...) 
diff --git a/pkg/cmd/staticsynccontroller/staticsynccontroller.go b/pkg/cmd/staticsynccontroller/staticsynccontroller.go index f48b55e66d..b0076434c5 100644 --- a/pkg/cmd/staticsynccontroller/staticsynccontroller.go +++ b/pkg/cmd/staticsynccontroller/staticsynccontroller.go @@ -7,8 +7,13 @@ import ( "os" "time" + operatorv1 "github.com/openshift/api/operator/v1" + operatorversionedclient "github.com/openshift/client-go/operator/clientset/versioned" + operatorv1informers "github.com/openshift/client-go/operator/informers/externalversions" + "github.com/openshift/cluster-etcd-operator/pkg/operator/operatorclient" "github.com/openshift/cluster-etcd-operator/pkg/version" "github.com/openshift/library-go/pkg/operator/events" + "github.com/openshift/library-go/pkg/operator/v1helpers" "github.com/spf13/cobra" "github.com/spf13/pflag" "k8s.io/client-go/kubernetes" @@ -79,6 +84,16 @@ func (s *syncOpts) Run() error { if err != nil { return err } + operatorConfigClient, err := operatorversionedclient.NewForConfig(clientConfig) + if err != nil { + return err + } + + operatorConfigInformers := operatorv1informers.NewSharedInformerFactory(operatorConfigClient, 10*time.Minute) + operatorClient := &operatorclient.OperatorClient{ + Informers: operatorConfigInformers, + Client: operatorConfigClient.OperatorV1(), + } //TODO: util v6j controllerRef, err := events.GetControllerReferenceForCurrentPod(clientset, etcdNamespace, nil) @@ -91,6 +106,7 @@ func (s *syncOpts) Run() error { kubeInformerFactory := informers.NewFilteredSharedInformerFactory(clientset, 0, etcdNamespace, nil) staticSyncController := NewStaticSyncController( + operatorClient, kubeInformerFactory, eventRecorder, ) @@ -104,6 +120,7 @@ func (s *syncOpts) Run() error { } type StaticSyncController struct { + operatorConfigClient v1helpers.OperatorClient podInformer corev1informer.SecretInformer kubeInformersForOpenshiftEtcdNamespace cache.SharedIndexInformer @@ -113,12 +130,14 @@ type StaticSyncController struct { } func 
NewStaticSyncController( + operatorConfigClient v1helpers.OperatorClient, kubeInformersForOpenshiftEtcdNamespace informers.SharedInformerFactory, eventRecorder events.Recorder, ) *StaticSyncController { c := &StaticSyncController{ - eventRecorder: eventRecorder.WithComponentSuffix("resource-sync-controller-" + os.Getenv("NODE_NAME")), - queue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "ResourceSyncController"), + operatorConfigClient: operatorConfigClient, + eventRecorder: eventRecorder.WithComponentSuffix("resource-sync-controller-" + os.Getenv("NODE_NAME")), + queue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "ResourceSyncController"), } kubeInformersForOpenshiftEtcdNamespace.Core().V1().Secrets().Informer().AddEventHandler(c.eventHandler()) @@ -126,6 +145,21 @@ func NewStaticSyncController( } func (c *StaticSyncController) sync() error { + operatorSpec, _, _, err := c.operatorConfigClient.GetOperatorState() + if err != nil { + return err + } + switch operatorSpec.ManagementState { + case operatorv1.Managed: + case operatorv1.Unmanaged: + return nil + case operatorv1.Removed: + // TODO should we support removal? 
+ return nil + default: + c.eventRecorder.Warningf("ManagementStateUnknown", "Unrecognized operator management state %q", operatorSpec.ManagementState) + return nil + } // if anything changes we copy assets := [4]string{ "namespace", diff --git a/pkg/operator/bootstrapteardown/teardown.go b/pkg/operator/bootstrapteardown/teardown.go index acd35fbff8..c6e06e5e63 100644 --- a/pkg/operator/bootstrapteardown/teardown.go +++ b/pkg/operator/bootstrapteardown/teardown.go @@ -6,8 +6,10 @@ import ( "k8s.io/client-go/rest" configv1 "github.com/openshift/api/config/v1" + operatorv1 "github.com/openshift/api/operator/v1" "github.com/openshift/cluster-etcd-operator/pkg/operator/clustermembercontroller" cov1helpers "github.com/openshift/library-go/pkg/config/clusteroperator/v1helpers" + "github.com/openshift/library-go/pkg/operator/v1helpers" "k8s.io/apimachinery/pkg/fields" "k8s.io/apimachinery/pkg/watch" "k8s.io/client-go/tools/cache" @@ -18,7 +20,7 @@ import ( ) func TearDownBootstrap(config *rest.Config, - clusterMemberShipController *clustermembercontroller.ClusterMemberController) error { + clusterMemberShipController *clustermembercontroller.ClusterMemberController, operatorClient v1helpers.OperatorClient) error { failing := configv1.ClusterStatusConditionType("Failing") var lastError string var err error @@ -53,12 +55,23 @@ func TearDownBootstrap(config *rest.Config, }, ) + operatorSpec, _, _, operatorErr := operatorClient.GetOperatorState() + if operatorErr != nil { + return operatorErr + } + if err == nil { klog.Infof("clusterversions is available, safe to remove bootstrap") - if clusterMemberShipController.IsMember("etcd-bootstrap") { - return clusterMemberShipController.RemoveBootstrap() + switch operatorSpec.ManagementState { + case operatorv1.Managed: + if clusterMemberShipController.IsMember("etcd-bootstrap") { + return clusterMemberShipController.RemoveBootstrap() + } + break + case operatorv1.Unmanaged: + break } - return nil + // TODO handle default } return nil diff --git 
a/pkg/operator/clustermembercontroller/clustermembercontroller.go b/pkg/operator/clustermembercontroller/clustermembercontroller.go index 749e8945b8..b17b5343fa 100644 --- a/pkg/operator/clustermembercontroller/clustermembercontroller.go +++ b/pkg/operator/clustermembercontroller/clustermembercontroller.go @@ -72,6 +72,45 @@ func NewClusterMemberController( } func (c *ClusterMemberController) sync() error { + operatorSpec, _, _, err := c.operatorConfigClient.GetOperatorState() + if err != nil { + return err + } + switch operatorSpec.ManagementState { + case operatorv1.Managed: + case operatorv1.Unmanaged: + condUpgradable := operatorv1.OperatorCondition{ + Type: operatorv1.OperatorStatusTypeUpgradeable, + Status: operatorv1.ConditionFalse, + } + condProgressing := operatorv1.OperatorCondition{ + Type: operatorv1.OperatorStatusTypeProgressing, + Status: operatorv1.ConditionFalse, + } + condAvailable := operatorv1.OperatorCondition{ + Type: operatorv1.OperatorStatusTypeAvailable, + Status: operatorv1.ConditionTrue, + } + condDegraded := operatorv1.OperatorCondition{ + Type: operatorv1.OperatorStatusTypeDegraded, + Status: operatorv1.ConditionFalse, + } + if _, _, updateError := v1helpers.UpdateStatus(c.operatorConfigClient, + v1helpers.UpdateConditionFn(condUpgradable), + v1helpers.UpdateConditionFn(condProgressing), + v1helpers.UpdateConditionFn(condDegraded), + v1helpers.UpdateConditionFn(condAvailable)); updateError != nil { + return updateError + } + return nil + case operatorv1.Removed: + // TODO should we support removal? 
+ return nil + default: + c.eventRecorder.Warningf("ManagementStateUnknown", "Unrecognized operator management state %q", operatorSpec.ManagementState) + return nil + } + pods, err := c.clientset.CoreV1().Pods("openshift-etcd").List(metav1.ListOptions{LabelSelector: "k8s-app=etcd"}) if err != nil { klog.Infof("No Pod found in openshift-etcd with label k8s-app=etcd") diff --git a/pkg/operator/configobservation/configobservercontroller/observe_config_controller.go b/pkg/operator/configobservation/configobservercontroller/observe_config_controller.go index 70efdbba54..13a0206fa1 100644 --- a/pkg/operator/configobservation/configobservercontroller/observe_config_controller.go +++ b/pkg/operator/configobservation/configobservercontroller/observe_config_controller.go @@ -52,6 +52,7 @@ func NewConfigObserver( ResourceSync: resourceSyncer, PreRunCachesSynced: append(configMapPreRunCacheSynced, + operatorClient.Informer().HasSynced, operatorConfigInformers.Operator().V1().Etcds().Informer().HasSynced, kubeInformersForNamespaces.InformersFor("openshift-etcd").Core().V1().Endpoints().Informer().HasSynced, diff --git a/pkg/operator/etcdcertsigner/etcdcertsignercontroller.go b/pkg/operator/etcdcertsigner/etcdcertsignercontroller.go index 59c303d0e4..6e6fda6ee2 100644 --- a/pkg/operator/etcdcertsigner/etcdcertsignercontroller.go +++ b/pkg/operator/etcdcertsigner/etcdcertsignercontroller.go @@ -19,6 +19,7 @@ import ( apierrors "k8s.io/apimachinery/pkg/api/errors" "k8s.io/apimachinery/pkg/util/sets" + operatorv1 "github.com/openshift/api/operator/v1" "github.com/openshift/library-go/pkg/operator/events" "github.com/openshift/library-go/pkg/operator/v1helpers" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" @@ -110,6 +111,22 @@ func (c *EtcdCertSignerController) processNextWorkItem() bool { } func (c *EtcdCertSignerController) sync() error { + operatorSpec, _, _, err := c.operatorConfigClient.GetOperatorState() + if err != nil { + return err + } + switch operatorSpec.ManagementState 
{ + case operatorv1.Managed: + case operatorv1.Unmanaged: + return nil + case operatorv1.Removed: + // TODO should we support removal? + return nil + default: + c.eventRecorder.Warningf("ManagementStateUnknown", "Unrecognized operator management state %q", operatorSpec.ManagementState) + return nil + } + // TODO: make the namespace and name constants in one of the packages cm, err := c.clientset.CoreV1().ConfigMaps(etcdNamespace).Get("member-config", metav1.GetOptions{}) if err != nil { diff --git a/pkg/operator/hostetcdendpointcontroller/hostendpointcontroller.go b/pkg/operator/hostetcdendpointcontroller/hostendpointcontroller.go index 9f1bbecb4a..496da37ba7 100644 --- a/pkg/operator/hostetcdendpointcontroller/hostendpointcontroller.go +++ b/pkg/operator/hostetcdendpointcontroller/hostendpointcontroller.go @@ -6,6 +6,7 @@ import ( "strconv" "time" + operatorv1 "github.com/openshift/api/operator/v1" "github.com/openshift/cluster-etcd-operator/pkg/operator/clustermembercontroller" "github.com/openshift/library-go/pkg/operator/events" "github.com/openshift/library-go/pkg/operator/v1helpers" @@ -111,6 +112,22 @@ func (h *HostEtcdEndpointController) eventHandler() cache.ResourceEventHandler { } func (h *HostEtcdEndpointController) sync() error { + operatorSpec, _, _, err := h.operatorConfigClient.GetOperatorState() + if err != nil { + return err + } + switch operatorSpec.ManagementState { + case operatorv1.Managed: + case operatorv1.Unmanaged: + return nil + case operatorv1.Removed: + // TODO should we support removal? + return nil + default: + h.eventRecorder.Warningf("ManagementStateUnknown", "Unrecognized operator management state %q", operatorSpec.ManagementState) + return nil + } + ep, err := h.clientset.CoreV1().Endpoints(clustermembercontroller.EtcdEndpointNamespace). 
Get(clustermembercontroller.EtcdHostEndpointName, v1.GetOptions{}) if err != nil { diff --git a/pkg/operator/starter.go b/pkg/operator/starter.go index 74ff626f90..1ffe6d9452 100644 --- a/pkg/operator/starter.go +++ b/pkg/operator/starter.go @@ -2,6 +2,7 @@ package operator import ( "fmt" + "github.com/openshift/library-go/pkg/operator/status" "time" "github.com/openshift/cluster-etcd-operator/pkg/operator/hostetcdendpointcontroller" @@ -19,7 +20,6 @@ import ( operatorv1informers "github.com/openshift/client-go/operator/informers/externalversions" "github.com/openshift/library-go/pkg/controller/controllercmd" - "github.com/openshift/library-go/pkg/operator/status" "github.com/openshift/library-go/pkg/operator/v1helpers" "github.com/openshift/cluster-etcd-operator/pkg/operator/bootstrapteardown" @@ -28,6 +28,7 @@ import ( "github.com/openshift/cluster-etcd-operator/pkg/operator/etcdcertsigner" "github.com/openshift/cluster-etcd-operator/pkg/operator/operatorclient" "github.com/openshift/cluster-etcd-operator/pkg/operator/resourcesynccontroller" + "github.com/openshift/cluster-etcd-operator/pkg/operator/statuscontroller" ) func RunOperator(ctx *controllercmd.ControllerContext) error { @@ -96,7 +97,7 @@ func RunOperator(ctx *controllercmd.ControllerContext) error { versionRecorder.SetVersion("raw-internal", status.VersionForOperatorFromEnv()) versionRecorder.SetVersion("operator", status.VersionForOperatorFromEnv()) - clusterOperatorStatus := status.NewClusterOperatorStatusController( + clusterOperatorStatus := statuscontroller.NewClusterOperatorStatusController( "openshift-etcd", []configv1.ObjectReference{ {Group: "operator.openshift.io", Resource: "etcds", Name: "cluster"}, @@ -138,6 +139,7 @@ func RunOperator(ctx *controllercmd.ControllerContext) error { ctx.EventRecorder, etcdDiscoveryDomain, ) + operatorConfigInformers.Start(ctx.Done()) kubeInformersForNamespaces.Start(ctx.Done()) configInformers.Start(ctx.Done()) @@ -145,11 +147,11 @@ func RunOperator(ctx 
*controllercmd.ControllerContext) error { go etcdCertSignerController.Run(1, ctx.Done()) go hostEtcdEndpointController.Run(1, ctx.Done()) go resourceSyncController.Run(1, ctx.Done()) - go configObserver.Run(1, ctx.Done()) go clusterOperatorStatus.Run(1, ctx.Done()) + go configObserver.Run(1, ctx.Done()) go clusterMemberController.Run(ctx.Done()) go func() { - err := bootstrapteardown.TearDownBootstrap(ctx.KubeConfig, clusterMemberController) + err := bootstrapteardown.TearDownBootstrap(ctx.KubeConfig, clusterMemberController, operatorClient) if err != nil { klog.Fatalf("Error tearing down bootstrap: %#v", err) } diff --git a/pkg/operator/statuscontroller/condition.go b/pkg/operator/statuscontroller/condition.go new file mode 100644 index 0000000000..3db0621d45 --- /dev/null +++ b/pkg/operator/statuscontroller/condition.go @@ -0,0 +1,134 @@ +package statuscontroller + +import ( + "fmt" + "strings" + "time" + + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + + configv1 "github.com/openshift/api/config/v1" + operatorv1 "github.com/openshift/api/operator/v1" +) + +// unionCondition returns a single cluster operator condition that is the union of multiple operator conditions. +func unionCondition(conditionType string, defaultConditionStatus operatorv1.ConditionStatus, allConditions ...operatorv1.OperatorCondition) configv1.ClusterOperatorStatusCondition { + return internalUnionCondition(conditionType, defaultConditionStatus, false, allConditions...) +} + +// unionInertialCondition returns a single cluster operator condition that is the union of multiple operator conditions, +// but resists returning a condition with a status opposite the defaultConditionStatus. +func unionInertialCondition(conditionType string, defaultConditionStatus operatorv1.ConditionStatus, allConditions ...operatorv1.OperatorCondition) configv1.ClusterOperatorStatusCondition { + return internalUnionCondition(conditionType, defaultConditionStatus, true, allConditions...) 
+} + +// internalUnionCondition returns a single cluster operator condition that is the union of multiple operator conditions. +// +// defaultConditionStatus indicates whether you want to merge all Falses or merge all Trues. For instance, Failures merge +// on true, but Available merges on false. Think of it like an anti-default. +// +// If hasInertia, then resist returning a condition with a status opposite the defaultConditionStatus. +func internalUnionCondition(conditionType string, defaultConditionStatus operatorv1.ConditionStatus, hasInertia bool, allConditions ...operatorv1.OperatorCondition) configv1.ClusterOperatorStatusCondition { + var oppositeConditionStatus operatorv1.ConditionStatus + if defaultConditionStatus == operatorv1.ConditionTrue { + oppositeConditionStatus = operatorv1.ConditionFalse + } else { + oppositeConditionStatus = operatorv1.ConditionTrue + } + + interestingConditions := []operatorv1.OperatorCondition{} + badConditions := []operatorv1.OperatorCondition{} + for _, condition := range allConditions { + if strings.HasSuffix(condition.Type, conditionType) { + interestingConditions = append(interestingConditions, condition) + + if condition.Status == oppositeConditionStatus { + badConditions = append(badConditions, condition) + } + } + } + + unionedCondition := operatorv1.OperatorCondition{Type: conditionType, Status: operatorv1.ConditionUnknown} + if len(interestingConditions) == 0 { + unionedCondition.Status = operatorv1.ConditionUnknown + unionedCondition.Reason = "NoData" + return OperatorConditionToClusterOperatorCondition(unionedCondition) + } + + oneMinuteAgo := time.Now().Add(-1 * time.Minute) + earliestBadConditionNotOldEnough := earliestTransitionTime(badConditions).Time.After(oneMinuteAgo) + if len(badConditions) == 0 || (hasInertia && earliestBadConditionNotOldEnough) { + unionedCondition.Status = defaultConditionStatus + unionedCondition.Message = unionMessage(interestingConditions) + unionedCondition.Reason = "AsExpected" + 
unionedCondition.LastTransitionTime = latestTransitionTime(interestingConditions) + + return OperatorConditionToClusterOperatorCondition(unionedCondition) + } + + // at this point we have bad conditions + unionedCondition.Status = oppositeConditionStatus + unionedCondition.Message = unionMessage(badConditions) + unionedCondition.Reason = unionReason(badConditions) + unionedCondition.LastTransitionTime = latestTransitionTime(badConditions) + + return OperatorConditionToClusterOperatorCondition(unionedCondition) +} + +func latestTransitionTime(conditions []operatorv1.OperatorCondition) metav1.Time { + latestTransitionTime := metav1.Time{} + for _, condition := range conditions { + if latestTransitionTime.Before(&condition.LastTransitionTime) { + latestTransitionTime = condition.LastTransitionTime + } + } + return latestTransitionTime +} + +func earliestTransitionTime(conditions []operatorv1.OperatorCondition) metav1.Time { + earliestTransitionTime := metav1.Now() + for _, condition := range conditions { + if !earliestTransitionTime.Before(&condition.LastTransitionTime) { + earliestTransitionTime = condition.LastTransitionTime + } + } + return earliestTransitionTime +} + +func uniq(s []string) []string { + seen := make(map[string]struct{}, len(s)) + j := 0 + for _, v := range s { + if _, ok := seen[v]; ok { + continue + } + seen[v] = struct{}{} + s[j] = v + j++ + } + return s[:j] +} + +func unionMessage(conditions []operatorv1.OperatorCondition) string { + messages := []string{} + for _, condition := range conditions { + if len(condition.Message) == 0 { + continue + } + for _, message := range uniq(strings.Split(condition.Message, "\n")) { + messages = append(messages, fmt.Sprintf("%s: %s", condition.Type, message)) + } + } + return strings.Join(messages, "\n") +} + +func unionReason(conditions []operatorv1.OperatorCondition) string { + if len(conditions) == 1 { + if len(conditions[0].Reason) != 0 { + return conditions[0].Type + conditions[0].Reason + } + return 
conditions[0].Type + } else { + return "MultipleConditionsMatching" + } +} diff --git a/pkg/operator/statuscontroller/statuscontroller.go b/pkg/operator/statuscontroller/statuscontroller.go new file mode 100644 index 0000000000..e56145db57 --- /dev/null +++ b/pkg/operator/statuscontroller/statuscontroller.go @@ -0,0 +1,261 @@ +package statuscontroller + +import ( + "fmt" + "strings" + "time" + + "k8s.io/klog" + + "k8s.io/apimachinery/pkg/api/equality" + apierrors "k8s.io/apimachinery/pkg/api/errors" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + utilruntime "k8s.io/apimachinery/pkg/util/runtime" + "k8s.io/apimachinery/pkg/util/wait" + "k8s.io/client-go/tools/cache" + "k8s.io/client-go/util/workqueue" + + configv1 "github.com/openshift/api/config/v1" + operatorv1 "github.com/openshift/api/operator/v1" + configv1client "github.com/openshift/client-go/config/clientset/versioned/typed/config/v1" + configv1informers "github.com/openshift/client-go/config/informers/externalversions/config/v1" + configv1listers "github.com/openshift/client-go/config/listers/config/v1" + + configv1helpers "github.com/openshift/library-go/pkg/config/clusteroperator/v1helpers" + "github.com/openshift/library-go/pkg/operator/events" + "github.com/openshift/library-go/pkg/operator/resource/resourceapply" + operatorv1helpers "github.com/openshift/library-go/pkg/operator/v1helpers" +) + +// NOTE this controller is only temporary until we merge installer changes. + +var workQueueKey = "instance" + +type VersionGetter interface { + // SetVersion is a way to set the version for an operand. It must be thread-safe + SetVersion(operandName, version string) + // GetVersion is way to get the versions for all operands. 
It must be thread-safe and return an object that doesn't mutate + GetVersions() map[string]string + // VersionChangedChannel is a channel that will get an item whenever SetVersion has been called + VersionChangedChannel() <-chan struct{} +} + +type StatusSyncer struct { + clusterOperatorName string + relatedObjects []configv1.ObjectReference + + versionGetter VersionGetter + operatorClient operatorv1helpers.OperatorClient + clusterOperatorClient configv1client.ClusterOperatorsGetter + clusterOperatorLister configv1listers.ClusterOperatorLister + + cachesToSync []cache.InformerSynced + queue workqueue.RateLimitingInterface + eventRecorder events.Recorder +} + +func NewClusterOperatorStatusController( + name string, + relatedObjects []configv1.ObjectReference, + clusterOperatorClient configv1client.ClusterOperatorsGetter, + clusterOperatorInformer configv1informers.ClusterOperatorInformer, + operatorClient operatorv1helpers.OperatorClient, + versionGetter VersionGetter, + recorder events.Recorder, +) *StatusSyncer { + c := &StatusSyncer{ + clusterOperatorName: name, + relatedObjects: relatedObjects, + versionGetter: versionGetter, + clusterOperatorClient: clusterOperatorClient, + clusterOperatorLister: clusterOperatorInformer.Lister(), + operatorClient: operatorClient, + eventRecorder: recorder.WithComponentSuffix("status-controller"), + + queue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "StatusSyncer_"+strings.Replace(name, "-", "_", -1)), + } + + operatorClient.Informer().AddEventHandler(c.eventHandler()) + clusterOperatorInformer.Informer().AddEventHandler(c.eventHandler()) + + c.cachesToSync = append(c.cachesToSync, operatorClient.Informer().HasSynced) + c.cachesToSync = append(c.cachesToSync, clusterOperatorInformer.Informer().HasSynced) + + return c +} + +// sync reacts to a change in prereqs by finding information that is required to match another value in the cluster. 
This +// must be information that is logically "owned" by another component. +func (c StatusSyncer) sync() error { + detailedSpec, currentDetailedStatus, _, err := c.operatorClient.GetOperatorState() + if apierrors.IsNotFound(err) { + c.eventRecorder.Warningf("StatusNotFound", "Unable to determine current operator status for clusteroperator/%s", c.clusterOperatorName) + if err := c.clusterOperatorClient.ClusterOperators().Delete(c.clusterOperatorName, nil); err != nil && !apierrors.IsNotFound(err) { + return err + } + return nil + } + if err != nil { + return err + } + + originalClusterOperatorObj, err := c.clusterOperatorLister.Get(c.clusterOperatorName) + if err != nil && !apierrors.IsNotFound(err) { + c.eventRecorder.Warningf("StatusFailed", "Unable to get current operator status for clusteroperator/%s: %v", c.clusterOperatorName, err) + return err + } + + // ensure that we have a clusteroperator resource + if originalClusterOperatorObj == nil || apierrors.IsNotFound(err) { + klog.Infof("clusteroperator/%s not found", c.clusterOperatorName) + var createErr error + originalClusterOperatorObj, createErr = c.clusterOperatorClient.ClusterOperators().Create(&configv1.ClusterOperator{ + ObjectMeta: metav1.ObjectMeta{Name: c.clusterOperatorName}, + }) + if apierrors.IsNotFound(createErr) { + // this means that the API isn't present. We did not fail. 
Try again later + klog.Infof("ClusterOperator API not created") + c.queue.AddRateLimited(workQueueKey) + return nil + } + if createErr != nil { + c.eventRecorder.Warningf("StatusCreateFailed", "Failed to create operator status: %v", err) + return createErr + } + } + clusterOperatorObj := originalClusterOperatorObj.DeepCopy() + + clusterOperatorObj.Status.RelatedObjects = c.relatedObjects + if detailedSpec.ManagementState == operatorv1.Unmanaged { + + configv1helpers.SetStatusCondition(&clusterOperatorObj.Status.Conditions, configv1.ClusterOperatorStatusCondition{Type: configv1.OperatorAvailable, Status: configv1.ConditionTrue, Reason: "Unmanaged"}) + configv1helpers.SetStatusCondition(&clusterOperatorObj.Status.Conditions, configv1.ClusterOperatorStatusCondition{Type: configv1.OperatorProgressing, Status: configv1.ConditionFalse, Reason: "Unmanaged"}) + configv1helpers.SetStatusCondition(&clusterOperatorObj.Status.Conditions, configv1.ClusterOperatorStatusCondition{Type: configv1.OperatorDegraded, Status: configv1.ConditionFalse, Reason: "Unmanaged"}) + configv1helpers.SetStatusCondition(&clusterOperatorObj.Status.Conditions, configv1.ClusterOperatorStatusCondition{Type: configv1.OperatorUpgradeable, Status: configv1.ConditionTrue, Reason: "Unmanaged"}) + + versions := c.versionGetter.GetVersions() + for operand, version := range versions { + previousVersion := operatorv1helpers.SetOperandVersion(&clusterOperatorObj.Status.Versions, configv1.OperandVersion{Name: operand, Version: version}) + if previousVersion != version { + // having this message will give us a marker in events when the operator updated compared to when the operand is updated + c.eventRecorder.Eventf("OperatorVersionChanged", "clusteroperator/%s version %q changed from %q to %q", c.clusterOperatorName, operand, previousVersion, version) + } + } + + if equality.Semantic.DeepEqual(clusterOperatorObj, originalClusterOperatorObj) { + return nil + } + if _, updateErr := 
c.clusterOperatorClient.ClusterOperators().UpdateStatus(clusterOperatorObj); err != nil { + return updateErr + } + + c.eventRecorder.Eventf("OperatorStatusChanged", "Status for operator %s changed: %s", c.clusterOperatorName, configv1helpers.GetStatusDiff(originalClusterOperatorObj.Status, clusterOperatorObj.Status)) + return nil + } + + configv1helpers.SetStatusCondition(&clusterOperatorObj.Status.Conditions, unionInertialCondition("Degraded", operatorv1.ConditionFalse, currentDetailedStatus.Conditions...)) + configv1helpers.SetStatusCondition(&clusterOperatorObj.Status.Conditions, unionCondition("Progressing", operatorv1.ConditionFalse, currentDetailedStatus.Conditions...)) + configv1helpers.SetStatusCondition(&clusterOperatorObj.Status.Conditions, unionCondition("Available", operatorv1.ConditionTrue, currentDetailedStatus.Conditions...)) + configv1helpers.SetStatusCondition(&clusterOperatorObj.Status.Conditions, unionCondition("Upgradeable", operatorv1.ConditionTrue, currentDetailedStatus.Conditions...)) + + // TODO work out removal. We don't always know the existing value, so removing early seems like a bad idea. Perhaps a remove flag. 
+ versions := c.versionGetter.GetVersions() + for operand, version := range versions { + previousVersion := operatorv1helpers.SetOperandVersion(&clusterOperatorObj.Status.Versions, configv1.OperandVersion{Name: operand, Version: version}) + if previousVersion != version { + // having this message will give us a marker in events when the operator updated compared to when the operand is updated + c.eventRecorder.Eventf("OperatorVersionChanged", "clusteroperator/%s version %q changed from %q to %q", c.clusterOperatorName, operand, previousVersion, version) + } + } + + // if we have no diff, just return + if equality.Semantic.DeepEqual(clusterOperatorObj, originalClusterOperatorObj) { + return nil + } + klog.V(2).Infof("clusteroperator/%s diff %v", c.clusterOperatorName, resourceapply.JSONPatch(originalClusterOperatorObj, clusterOperatorObj)) + + if _, updateErr := c.clusterOperatorClient.ClusterOperators().UpdateStatus(clusterOperatorObj); err != nil { + return updateErr + } + c.eventRecorder.Eventf("OperatorStatusChanged", "Status for clusteroperator/%s changed: %s", c.clusterOperatorName, configv1helpers.GetStatusDiff(originalClusterOperatorObj.Status, clusterOperatorObj.Status)) + return nil +} + +func OperatorConditionToClusterOperatorCondition(condition operatorv1.OperatorCondition) configv1.ClusterOperatorStatusCondition { + return configv1.ClusterOperatorStatusCondition{ + Type: configv1.ClusterStatusConditionType(condition.Type), + Status: configv1.ConditionStatus(condition.Status), + LastTransitionTime: condition.LastTransitionTime, + Reason: condition.Reason, + Message: condition.Message, + } +} + +func (c *StatusSyncer) Run(workers int, stopCh <-chan struct{}) { + defer utilruntime.HandleCrash() + defer c.queue.ShutDown() + + klog.Infof("Starting StatusSyncer-" + c.clusterOperatorName) + defer klog.Infof("Shutting down StatusSyncer-" + c.clusterOperatorName) + if !cache.WaitForCacheSync(stopCh, c.cachesToSync...) 
{ + return + } + + // start watching for version changes + go c.watchVersionGetter(stopCh) + + // doesn't matter what workers say, only start one. + go wait.Until(c.runWorker, time.Second, stopCh) + + <-stopCh +} + +func (c *StatusSyncer) watchVersionGetter(stopCh <-chan struct{}) { + defer utilruntime.HandleCrash() + + versionCh := c.versionGetter.VersionChangedChannel() + // always kick at least once + c.queue.Add(workQueueKey) + + for { + select { + case <-stopCh: + return + case <-versionCh: + c.queue.Add(workQueueKey) + } + } +} + +func (c *StatusSyncer) runWorker() { + for c.processNextWorkItem() { + } +} + +func (c *StatusSyncer) processNextWorkItem() bool { + dsKey, quit := c.queue.Get() + if quit { + return false + } + defer c.queue.Done(dsKey) + + err := c.sync() + if err == nil { + c.queue.Forget(dsKey) + return true + } + + utilruntime.HandleError(fmt.Errorf("%v failed with : %v", dsKey, err)) + c.queue.AddRateLimited(dsKey) + + return true +} + +// eventHandler queues the operator to check spec and status +func (c *StatusSyncer) eventHandler() cache.ResourceEventHandler { + return cache.ResourceEventHandlerFuncs{ + AddFunc: func(obj interface{}) { c.queue.Add(workQueueKey) }, + UpdateFunc: func(old, new interface{}) { c.queue.Add(workQueueKey) }, + DeleteFunc: func(obj interface{}) { c.queue.Add(workQueueKey) }, + } +}