diff --git a/cmd/machine-config-controller/start.go b/cmd/machine-config-controller/start.go
index 324127f387..07bf828a64 100644
--- a/cmd/machine-config-controller/start.go
+++ b/cmd/machine-config-controller/start.go
@@ -135,6 +135,7 @@ func createControllers(ctx *ctrlcommon.ControllerContext) []ctrlcommon.Controlle
         ),
         // The node controller consumes data written by the above
         node.New(
+            ctx.InformerFactory.Machineconfiguration().V1().ControllerConfigs(),
             ctx.InformerFactory.Machineconfiguration().V1().MachineConfigPools(),
             ctx.KubeInformerFactory.Core().V1().Nodes(),
             ctx.ConfigInformerFactory.Config().V1().Schedulers(),
diff --git a/manifests/controllerconfig.crd.yaml b/manifests/controllerconfig.crd.yaml
index 038eac257a..59c8492a92 100644
--- a/manifests/controllerconfig.crd.yaml
+++ b/manifests/controllerconfig.crd.yaml
@@ -175,6 +175,14 @@ spec:
                 like the web console to tell users where to find the Kubernetes
                 API.
               type: string
+            controlPlaneTopology:
+              description: controlPlaneTopology expresses the expectations for
+                operands that normally run on control nodes. The default is
+                HighlyAvailable, which represents the behavior operators have
+                in a normal cluster. The SingleReplica mode will be used in
+                single-node deployments and the operators should not configure
+                the operand for highly-available operation.
+              type: string
             etcdDiscoveryDomain:
               description: 'etcdDiscoveryDomain is the domain used to fetch the
                 SRV records for discovering etcd servers and clients.
diff --git a/pkg/controller/node/node_controller.go b/pkg/controller/node/node_controller.go
index 30b431357e..37b6b030eb 100644
--- a/pkg/controller/node/node_controller.go
+++ b/pkg/controller/node/node_controller.go
@@ -80,9 +80,11 @@ type Controller struct {
     syncHandler              func(mcp string) error
     enqueueMachineConfigPool func(*mcfgv1.MachineConfigPool)

+    ccLister   mcfglistersv1.ControllerConfigLister
     mcpLister  mcfglistersv1.MachineConfigPoolLister
     nodeLister corelisterv1.NodeLister

+    ccListerSynced   cache.InformerSynced
     mcpListerSynced  cache.InformerSynced
     nodeListerSynced cache.InformerSynced

@@ -94,6 +96,7 @@ type Controller struct {

 // New returns a new node controller.
 func New(
+    ccInformer mcfginformersv1.ControllerConfigInformer,
     mcpInformer mcfginformersv1.MachineConfigPoolInformer,
     nodeInformer coreinformersv1.NodeInformer,
     schedulerInformer cligoinformersv1.SchedulerInformer,
@@ -129,8 +132,10 @@ func New(
     ctrl.syncHandler = ctrl.syncMachineConfigPool
     ctrl.enqueueMachineConfigPool = ctrl.enqueueDefault

+    ctrl.ccLister = ccInformer.Lister()
     ctrl.mcpLister = mcpInformer.Lister()
     ctrl.nodeLister = nodeInformer.Lister()
+    ctrl.ccListerSynced = ccInformer.Informer().HasSynced
     ctrl.mcpListerSynced = mcpInformer.Informer().HasSynced
     ctrl.nodeListerSynced = nodeInformer.Informer().HasSynced

@@ -145,7 +150,7 @@ func (ctrl *Controller) Run(workers int, stopCh <-chan struct{}) {
     defer utilruntime.HandleCrash()
     defer ctrl.queue.ShutDown()

-    if !cache.WaitForCacheSync(stopCh, ctrl.mcpListerSynced, ctrl.nodeListerSynced, ctrl.schedulerListerSynced) {
+    if !cache.WaitForCacheSync(stopCh, ctrl.ccListerSynced, ctrl.mcpListerSynced, ctrl.nodeListerSynced, ctrl.schedulerListerSynced) {
         return
     }

@@ -758,6 +763,10 @@ func (ctrl *Controller) syncMachineConfigPool(key string) error {
         return err
     }

+    if err := ctrl.setClusterConfigAnnotation(nodes); err != nil {
+        return goerrs.Wrapf(err, "error setting clusterConfig annotation for nodes in pool %q", pool.Name)
+    }
+
     candidates, capacity := getAllCandidateMachines(pool, nodes, maxunavail)
     if len(candidates) > 0 {
         ctrl.logPool(pool, "%d candidate nodes for update, capacity: %d", len(candidates), capacity)
@@ -800,6 +809,30 @@ func (ctrl *Controller) getNodesForPool(pool *mcfgv1.MachineConfigPool) ([]*core
     return nodes, nil
 }

+// setClusterConfigAnnotation reads the cluster configuration captured in the
+// ControllerConfig and adds or updates the derived node annotations, such as
+// ControlPlaneTopology taken from the infrastructure object.
+func (ctrl *Controller) setClusterConfigAnnotation(nodes []*corev1.Node) error {
+    cc, err := ctrl.ccLister.Get(ctrlcommon.ControllerConfigName)
+    if err != nil {
+        return err
+    }
+
+    for _, node := range nodes {
+        if node.Annotations[daemonconsts.ClusterControlPlaneTopologyAnnotationKey] != string(cc.Spec.Infra.Status.ControlPlaneTopology) {
+            oldAnn := node.Annotations[daemonconsts.ClusterControlPlaneTopologyAnnotationKey]
+            _, err := internal.UpdateNodeRetry(ctrl.kubeClient.CoreV1().Nodes(), ctrl.nodeLister, node.Name, func(node *corev1.Node) {
+                node.Annotations[daemonconsts.ClusterControlPlaneTopologyAnnotationKey] = string(cc.Spec.Infra.Status.ControlPlaneTopology)
+            })
+            if err != nil {
+                return err
+            }
+            glog.Infof("Updated controlPlaneTopology annotation of node %s from %s to %s", node.Name, oldAnn, string(cc.Spec.Infra.Status.ControlPlaneTopology))
+        }
+    }
+    return nil
+}
+
 func (ctrl *Controller) setDesiredMachineConfigAnnotation(nodeName, currentConfig string) error {
     return clientretry.RetryOnConflict(nodeUpdateBackoff, func() error {
         oldNode, err := ctrl.kubeClient.CoreV1().Nodes().Get(context.TODO(), nodeName, metav1.GetOptions{})
@@ -815,6 +848,7 @@ func (ctrl *Controller) setDesiredMachineConfigAnnotation(nodeName, currentConfi
         if newNode.Annotations == nil {
             newNode.Annotations = map[string]string{}
         }
+
         if newNode.Annotations[daemonconsts.DesiredMachineConfigAnnotationKey] == currentConfig {
             return nil
         }
diff --git a/pkg/controller/node/node_controller_test.go b/pkg/controller/node/node_controller_test.go
index c9c6e97405..b3f5258adc 100644
--- a/pkg/controller/node/node_controller_test.go
+++ b/pkg/controller/node/node_controller_test.go
@@ -22,14 +22,18 @@ import (
     "k8s.io/client-go/tools/record"

     apicfgv1 "github.com/openshift/api/config/v1"
+    configv1 "github.com/openshift/api/config/v1"
     fakeconfigv1client "github.com/openshift/client-go/config/clientset/versioned/fake"
     configv1informer "github.com/openshift/client-go/config/informers/externalversions"
     mcfgv1 "github.com/openshift/machine-config-operator/pkg/apis/machineconfiguration.openshift.io/v1"
+    ctrlcommon "github.com/openshift/machine-config-operator/pkg/controller/common"
     daemonconsts "github.com/openshift/machine-config-operator/pkg/daemon/constants"
     "github.com/openshift/machine-config-operator/pkg/generated/clientset/versioned/fake"
     informers "github.com/openshift/machine-config-operator/pkg/generated/informers/externalversions"
+    "github.com/openshift/machine-config-operator/pkg/version"
     "github.com/openshift/machine-config-operator/test/helpers"
     "github.com/stretchr/testify/assert"
+    utilrand "k8s.io/apimachinery/pkg/util/rand"
 )

 var (
@@ -44,6 +48,7 @@ type fixture struct {
     kubeclient      *k8sfake.Clientset
     schedulerClient *fakeconfigv1client.Clientset

+    ccLister   []*mcfgv1.ControllerConfig
     mcpLister  []*mcfgv1.MachineConfigPool
     nodeLister []*corev1.Node

@@ -72,9 +77,10 @@ func (f *fixture) newController() *Controller {
     i := informers.NewSharedInformerFactory(f.client, noResyncPeriodFunc())
     k8sI := kubeinformers.NewSharedInformerFactory(f.kubeclient, noResyncPeriodFunc())
     ci := configv1informer.NewSharedInformerFactory(f.schedulerClient, noResyncPeriodFunc())
-    c := New(i.Machineconfiguration().V1().MachineConfigPools(), k8sI.Core().V1().Nodes(),
+    c := New(i.Machineconfiguration().V1().ControllerConfigs(), i.Machineconfiguration().V1().MachineConfigPools(), k8sI.Core().V1().Nodes(),
         ci.Config().V1().Schedulers(), f.kubeclient, f.client)

+    c.ccListerSynced = alwaysReady
     c.mcpListerSynced = alwaysReady
     c.nodeListerSynced = alwaysReady
     c.schedulerListerSynced = alwaysReady
@@ -87,6 +93,9 @@ func (f *fixture) newController() *Controller {
     k8sI.Start(stopCh)
     k8sI.WaitForCacheSync(stopCh)

+    for _, c := range f.ccLister {
+        i.Machineconfiguration().V1().ControllerConfigs().Informer().GetIndexer().Add(c)
+    }
     for _, c := range f.mcpLister {
         i.Machineconfiguration().V1().MachineConfigPools().Informer().GetIndexer().Add(c)
     }
@@ -97,6 +106,7 @@ func (f *fixture) newController() *Controller {
     for _, c := range f.schedulerLister {
         ci.Config().V1().Schedulers().Informer().GetIndexer().Add(c)
     }
+
     return c
 }

@@ -148,6 +158,20 @@ func (f *fixture) runController(pool string, expectError bool) {
     }
 }

+func newControllerConfig(name string, topology configv1.TopologyMode) *mcfgv1.ControllerConfig {
+    return &mcfgv1.ControllerConfig{
+        TypeMeta:   metav1.TypeMeta{APIVersion: mcfgv1.SchemeGroupVersion.String()},
+        ObjectMeta: metav1.ObjectMeta{Annotations: map[string]string{daemonconsts.GeneratedByVersionAnnotationKey: version.Raw}, Name: name, UID: types.UID(utilrand.String(5))},
+        Spec: mcfgv1.ControllerConfigSpec{
+            Infra: &configv1.Infrastructure{
+                Status: configv1.InfrastructureStatus{
+                    ControlPlaneTopology: topology,
+                },
+            },
+        },
+    }
+}
+
 // checkAction verifies that expected and actual actions are equal and both have
 // same attached resources
 func checkAction(expected, actual core.Action, t *testing.T) {
@@ -188,6 +212,8 @@ func filterInformerActions(actions []core.Action) []core.Action {
         if len(action.GetNamespace()) == 0 &&
             (action.Matches("list", "machineconfigpools") ||
                 action.Matches("watch", "machineconfigpools") ||
+                action.Matches("list", "controllerconfigs") ||
+                action.Matches("watch", "controllerconfigs") ||
                 action.Matches("list", "nodes") ||
                 action.Matches("watch", "nodes")) {
             continue
@@ -758,14 +784,17 @@ func TestSetDesiredMachineConfigAnnotation(t *testing.T) {

 func TestShouldMakeProgress(t *testing.T) {
     f := newFixture(t)
+    cc := newControllerConfig(ctrlcommon.ControllerConfigName, configv1.TopologyMode(""))
     mcp := helpers.NewMachineConfigPool("test-cluster-infra", nil, helpers.InfraSelector, "v1")
     mcpWorker := helpers.NewMachineConfigPool("worker", nil, helpers.WorkerSelector, "v1")
     mcp.Spec.MaxUnavailable = intStrPtr(intstr.FromInt(1))
+
     nodes := []*corev1.Node{
         newNodeWithLabel("node-0", "v1", "v1", map[string]string{"node-role/worker": "", "node-role/infra": ""}),
         newNodeWithLabel("node-1", "v0", "v0", map[string]string{"node-role/worker": "", "node-role/infra": ""}),
     }

+    f.ccLister = append(f.ccLister, cc)
     f.mcpLister = append(f.mcpLister, mcp, mcpWorker)
     f.objects = append(f.objects, mcp, mcpWorker)
     f.nodeLister = append(f.nodeLister, nodes...)
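Aside: the reconcile step these fixture changes exercise boils down to a compare-then-write on a node annotation, so an unchanged annotation never triggers a node update. A minimal, self-contained sketch of that pattern (the helper name applyTopologyAnnotation and the hard-coded key are ours for illustration; the patch itself routes the write through internal.UpdateNodeRetry):

    package main

    import (
        "fmt"

        corev1 "k8s.io/api/core/v1"
        metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
    )

    // applyTopologyAnnotation mirrors the compare-then-write step: it reports
    // whether a write was needed, so repeated syncs stay idempotent.
    func applyTopologyAnnotation(node *corev1.Node, topology string) bool {
        const key = "machineconfiguration.openshift.io/controlPlaneTopology"
        if node.Annotations == nil {
            node.Annotations = map[string]string{}
        }
        if node.Annotations[key] == topology {
            return false
        }
        node.Annotations[key] = topology
        return true
    }

    func main() {
        node := &corev1.Node{ObjectMeta: metav1.ObjectMeta{Name: "node-0"}}
        fmt.Println(applyTopologyAnnotation(node, "SingleReplica")) // true: annotation written
        fmt.Println(applyTopologyAnnotation(node, "SingleReplica")) // false: already up to date
    }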
@@ -799,8 +828,11 @@ func TestShouldMakeProgress(t *testing.T) {

 func TestEmptyCurrentMachineConfig(t *testing.T) {
     f := newFixture(t)
+    cc := newControllerConfig(ctrlcommon.ControllerConfigName, configv1.TopologyMode(""))
     mcp := helpers.NewMachineConfigPool("test-cluster-master", nil, helpers.MasterSelector, "")
     mcp.Spec.MaxUnavailable = intStrPtr(intstr.FromInt(1))
+
+    f.ccLister = append(f.ccLister, cc)
     f.mcpLister = append(f.mcpLister, mcp)
     f.objects = append(f.objects, mcp)
     f.run(getKey(mcp, t))
@@ -834,6 +866,7 @@ func TestPaused(t *testing.T) {

 func TestShouldUpdateStatusOnlyUpdated(t *testing.T) {
     f := newFixture(t)
+    cc := newControllerConfig(ctrlcommon.ControllerConfigName, configv1.TopologyMode(""))
     mcp := helpers.NewMachineConfigPool("test-cluster-infra", nil, helpers.InfraSelector, "v1")
     mcpWorker := helpers.NewMachineConfigPool("worker", nil, helpers.WorkerSelector, "v1")
     mcp.Spec.MaxUnavailable = intStrPtr(intstr.FromInt(1))
@@ -842,6 +875,7 @@ func TestShouldUpdateStatusOnlyUpdated(t *testing.T) {
         newNodeWithLabel("node-1", "v1", "v1", map[string]string{"node-role/worker": "", "node-role/infra": ""}),
     }

+    f.ccLister = append(f.ccLister, cc)
     f.mcpLister = append(f.mcpLister, mcp, mcpWorker)
     f.objects = append(f.objects, mcp, mcpWorker)
     f.nodeLister = append(f.nodeLister, nodes...)
@@ -859,6 +893,7 @@ func TestShouldUpdateStatusOnlyNoProgress(t *testing.T) {
     f := newFixture(t)
+    cc := newControllerConfig(ctrlcommon.ControllerConfigName, configv1.TopologyMode(""))
     mcp := helpers.NewMachineConfigPool("test-cluster-infra", nil, helpers.InfraSelector, "v1")
     mcpWorker := helpers.NewMachineConfigPool("worker", nil, helpers.WorkerSelector, "v1")
     mcp.Spec.MaxUnavailable = intStrPtr(intstr.FromInt(1))
@@ -867,6 +902,7 @@ func TestShouldUpdateStatusOnlyNoProgress(t *testing.T) {
         newNodeWithLabel("node-1", "v0", "v1", map[string]string{"node-role/worker": "", "node-role/infra": ""}),
     }

+    f.ccLister = append(f.ccLister, cc)
     f.mcpLister = append(f.mcpLister, mcp, mcpWorker)
     f.objects = append(f.objects, mcp, mcpWorker)
     f.nodeLister = append(f.nodeLister, nodes...)
@@ -884,6 +920,7 @@ func TestShouldDoNothing(t *testing.T) {
     f := newFixture(t)
+    cc := newControllerConfig(ctrlcommon.ControllerConfigName, configv1.TopologyMode(""))
     mcp := helpers.NewMachineConfigPool("test-cluster-infra", nil, helpers.InfraSelector, "v1")
     mcpWorker := helpers.NewMachineConfigPool("worker", nil, helpers.WorkerSelector, "v1")
     mcp.Spec.MaxUnavailable = intStrPtr(intstr.FromInt(1))
@@ -894,6 +931,7 @@ func TestShouldDoNothing(t *testing.T) {
     status := calculateStatus(mcp, nodes)
     mcp.Status = status

+    f.ccLister = append(f.ccLister, cc)
     f.mcpLister = append(f.mcpLister, mcp, mcpWorker)
     f.objects = append(f.objects, mcp, mcpWorker)
     f.nodeLister = append(f.nodeLister, nodes...)
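Aside: TestControlPlaneTopology below seeds nodes with the SingleReplica annotation; the daemon-side decision it ultimately feeds is small enough to sketch standalone. Here drainRequired is a simplified stand-in for the method added in pkg/daemon/drain.go (the real one goes through getControlPlaneTopology on the Daemon):

    package main

    import (
        "fmt"

        configv1 "github.com/openshift/api/config/v1"
    )

    // drainRequired condenses the daemon-side decision: a SingleReplica control
    // plane has no peer node to receive evicted workloads, so drain is skipped.
    func drainRequired(annotation string) bool {
        return configv1.TopologyMode(annotation) != configv1.SingleReplicaTopologyMode
    }

    func main() {
        fmt.Println(drainRequired("SingleReplica"))   // false: cordon only, no drain
        fmt.Println(drainRequired("HighlyAvailable")) // true
        fmt.Println(drainRequired(""))                // true: unknown values fall back to HA behavior
    }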
@@ -904,6 +942,46 @@ func TestShouldDoNothing(t *testing.T) {
     f.run(getKey(mcp, t))
 }

+func TestControlPlaneTopology(t *testing.T) {
+    f := newFixture(t)
+    cc := newControllerConfig(ctrlcommon.ControllerConfigName, configv1.SingleReplicaTopologyMode)
+    mcp := helpers.NewMachineConfigPool("test-cluster-infra", nil, helpers.InfraSelector, "v1")
+    mcpWorker := helpers.NewMachineConfigPool("worker", nil, helpers.WorkerSelector, "v1")
+    mcp.Spec.MaxUnavailable = intStrPtr(intstr.FromInt(1))
+    annotations := map[string]string{daemonconsts.ClusterControlPlaneTopologyAnnotationKey: "SingleReplica"}
+
+    nodes := []*corev1.Node{
+        newNodeWithLabel("node-0", "v1", "v1", map[string]string{"node-role/worker": "", "node-role/infra": ""}),
+        newNodeWithLabel("node-1", "v1", "v1", map[string]string{"node-role/worker": "", "node-role/infra": ""}),
+    }
+
+    for _, node := range nodes {
+        addNodeAnnotations(node, annotations)
+    }
+    status := calculateStatus(mcp, nodes)
+    mcp.Status = status
+
+    f.ccLister = append(f.ccLister, cc)
+    f.mcpLister = append(f.mcpLister, mcp, mcpWorker)
+    f.objects = append(f.objects, mcp, mcpWorker)
+    f.nodeLister = append(f.nodeLister, nodes...)
+    for idx := range nodes {
+        f.kubeobjects = append(f.kubeobjects, nodes[idx])
+    }
+
+    f.run(getKey(mcp, t))
+}
+
+// addNodeAnnotations sets the given annotations on the node
+func addNodeAnnotations(node *corev1.Node, annotations map[string]string) {
+    if node.Annotations == nil {
+        node.Annotations = map[string]string{}
+    }
+    for k, v := range annotations {
+        node.Annotations[k] = v
+    }
+}
+
 func getKey(config *mcfgv1.MachineConfigPool, t *testing.T) string {
     key, err := cache.DeletionHandlingMetaNamespaceKeyFunc(config)
     if err != nil {
diff --git a/pkg/daemon/constants/constants.go b/pkg/daemon/constants/constants.go
index d0de172b9a..9ac01dca50 100644
--- a/pkg/daemon/constants/constants.go
+++ b/pkg/daemon/constants/constants.go
@@ -14,6 +14,9 @@ const (
     DesiredMachineConfigAnnotationKey = "machineconfiguration.openshift.io/desiredConfig"
     // MachineConfigDaemonStateAnnotationKey is used to fetch the state of the daemon on the machine.
     MachineConfigDaemonStateAnnotationKey = "machineconfiguration.openshift.io/state"
+    // ClusterControlPlaneTopologyAnnotationKey is set by the node controller, which reads the value
+    // from controllerConfig. The MCD uses the annotation value to decide the drain action on the node.
+    ClusterControlPlaneTopologyAnnotationKey = "machineconfiguration.openshift.io/controlPlaneTopology"

     // OpenShiftOperatorManagedLabel is used to filter out kube objects that don't need to be synced by the MCO
     OpenShiftOperatorManagedLabel = "openshift.io/operator-managed"
diff --git a/pkg/daemon/daemon.go b/pkg/daemon/daemon.go
index 18d50ee257..c5a2c7467f 100644
--- a/pkg/daemon/daemon.go
+++ b/pkg/daemon/daemon.go
@@ -39,6 +39,7 @@ import (
     "k8s.io/client-go/util/workqueue"
     "k8s.io/kubectl/pkg/drain"

+    configv1 "github.com/openshift/api/config/v1"
     "github.com/openshift/machine-config-operator/lib/resourceread"
     mcfgv1 "github.com/openshift/machine-config-operator/pkg/apis/machineconfiguration.openshift.io/v1"
     ctrlcommon "github.com/openshift/machine-config-operator/pkg/controller/common"
@@ -1010,7 +1011,7 @@ func (dn *Daemon) checkStateOnFirstRun() error {
     // take a stab at that and re-run the drain+reboot routine
     if state.pendingConfig != nil && bootID == dn.bootID {
         dn.logSystem("drain interrupted, retrying")
-        if err := dn.drain(); err != nil {
+        if err := dn.performDrain(); err != nil {
             return err
         }
         if err := dn.finalizeBeforeReboot(state.pendingConfig); err != nil {
@@ -1150,7 +1151,7 @@ func (dn *Daemon) updateConfigAndState(state *stateAndConfigs) (bool, error) {
         // Great, we've successfully rebooted for the desired config,
         // let's mark it done!
         glog.Infof("Completing pending config %s", state.pendingConfig.GetName())
-        if err := dn.completeUpdate(dn.node, state.pendingConfig.GetName()); err != nil {
+        if err := dn.completeUpdate(state.pendingConfig.GetName()); err != nil {
             MCDUpdateState.WithLabelValues("", err.Error()).SetToCurrentTime()
             return inDesiredConfig, err
         }
@@ -1282,8 +1283,8 @@ func (dn *Daemon) prepUpdateFromCluster() (*mcfgv1.MachineConfig, *mcfgv1.Machin
 // completeUpdate marks the node as schedulable again, then deletes the
 // "transient state" file, which signifies that all of those prior steps have
 // been completed.
-func (dn *Daemon) completeUpdate(node *corev1.Node, desiredConfigName string) error {
-    if err := drain.RunCordonOrUncordon(dn.drainer, node, false); err != nil {
+func (dn *Daemon) completeUpdate(desiredConfigName string) error {
+    if err := dn.cordonOrUncordonNode(false); err != nil {
         return err
     }

@@ -1617,3 +1618,23 @@ func (dn *Daemon) senseAndLoadOnceFrom(onceFrom string) (interface{}, onceFromOr

     return nil, onceFromUnknownConfig, fmt.Errorf("unable to decipher onceFrom config type: %v", err)
 }
+
+func isSingleNodeTopology(topology configv1.TopologyMode) bool {
+    return topology == configv1.SingleReplicaTopologyMode
+}
+
+// getControlPlaneTopology reads the node annotation and returns the
+// controlPlaneTopology value set for the cluster. If the annotation value
+// is unrecognized, the cluster is treated as highly available.
+func (dn *Daemon) getControlPlaneTopology() configv1.TopologyMode {
+    controlPlaneTopology := dn.node.Annotations[constants.ClusterControlPlaneTopologyAnnotationKey]
+    switch configv1.TopologyMode(controlPlaneTopology) {
+    case configv1.SingleReplicaTopologyMode:
+        return configv1.SingleReplicaTopologyMode
+    case configv1.HighlyAvailableTopologyMode:
+        return configv1.HighlyAvailableTopologyMode
+    default:
+        // for any unhandled case, default to HighlyAvailableTopologyMode
+        return configv1.HighlyAvailableTopologyMode
+    }
+}
diff --git a/pkg/daemon/drain.go b/pkg/daemon/drain.go
new file mode 100644
index 0000000000..fb494dbcd6
--- /dev/null
+++ b/pkg/daemon/drain.go
@@ -0,0 +1,113 @@
+package daemon
+
+import (
+    "fmt"
+    "time"
+
+    "github.com/golang/glog"
+    "github.com/pkg/errors"
+    corev1 "k8s.io/api/core/v1"
+    "k8s.io/apimachinery/pkg/util/wait"
+    "k8s.io/kubectl/pkg/drain"
+)
+
+func (dn *Daemon) drainRequired() bool {
+    // Draining is not useful on a single-node cluster: there is no other
+    // node where workloads protected by a PodDisruptionBudget could be
+    // rescheduled, so the drain could block indefinitely. These clusters
+    // can take advantage of the graceful node shutdown feature instead.
+    // https://kubernetes.io/docs/concepts/architecture/nodes/#graceful-node-shutdown
+    return !isSingleNodeTopology(dn.getControlPlaneTopology())
+}
+
+func (dn *Daemon) cordonOrUncordonNode(desired bool) error {
+    backoff := wait.Backoff{
+        Steps:    5,
+        Duration: 10 * time.Second,
+        Factor:   2,
+    }
+    var lastErr error
+    if err := wait.ExponentialBackoff(backoff, func() (bool, error) {
+        err := drain.RunCordonOrUncordon(dn.drainer, dn.node, desired)
+        if err != nil {
+            lastErr = err
+            glog.Infof("cordon/uncordon failed with: %v, retrying", err)
+            return false, nil
+        }
+        return true, nil
+    }); err != nil {
+        if err == wait.ErrWaitTimeout {
+            return errors.Wrapf(lastErr, "failed to cordon/uncordon node (%d tries): %v", backoff.Steps, err)
+        }
+        return errors.Wrap(err, "failed to cordon/uncordon node")
+    }
+    return nil
+}
+
+func (dn *Daemon) drain() error {
+    backoff := wait.Backoff{
+        Steps:    5,
+        Duration: 10 * time.Second,
+        Factor:   2,
+    }
+    var lastErr error
+    if err := wait.ExponentialBackoff(backoff, func() (bool, error) {
+        err := drain.RunNodeDrain(dn.drainer, dn.node.Name)
+        if err != nil {
+            lastErr = err
+            glog.Infof("Draining failed with: %v, retrying", err)
+            return false, nil
+        }
+        return true, nil
+
+    }); err != nil {
+        if err == wait.ErrWaitTimeout {
+            failMsg := fmt.Sprintf("%d tries: %v", backoff.Steps, lastErr)
+            MCDDrainErr.WithLabelValues(dn.node.Name, "WaitTimeout").Set(float64(backoff.Steps))
+            dn.recorder.Eventf(getNodeRef(dn.node), corev1.EventTypeWarning, "FailedToDrain", failMsg)
+            return errors.Wrapf(lastErr, "failed to drain node (%d tries): %v", backoff.Steps, err)
+        }
+        MCDDrainErr.WithLabelValues(dn.node.Name, "UnknownError").Set(float64(backoff.Steps))
+        dn.recorder.Eventf(getNodeRef(dn.node), corev1.EventTypeWarning, "FailedToDrain", err.Error())
+        return errors.Wrap(err, "failed to drain node")
+    }
+
+    return nil
+}
+
+func (dn *Daemon) performDrain() error {
+    // Skip the drain process when we're not cluster driven
+    if dn.kubeClient == nil {
+        return nil
+    }
+
+    if err := dn.cordonOrUncordonNode(true); err != nil {
+        return err
+    }
+    dn.logSystem("Node has been successfully cordoned")
+    dn.recorder.Eventf(getNodeRef(dn.node), corev1.EventTypeNormal, "Cordon", "Cordoned node to apply update")
+
+    if !dn.drainRequired() {
+        dn.logSystem("Drain not required, skipping")
skipping") + dn.recorder.Eventf(getNodeRef(dn.node), corev1.EventTypeNormal, "Drain", "Drain not required, skipping") + return nil + } + + // We are here, that means we need to cordon and drain node + MCDDrainErr.WithLabelValues(dn.node.Name, "").Set(0) + dn.logSystem("Update prepared; beginning drain") + startTime := time.Now() + + dn.recorder.Eventf(getNodeRef(dn.node), corev1.EventTypeNormal, "Drain", "Draining node to update config.") + + if err := dn.drain(); err != nil { + return err + } + + dn.logSystem("drain complete") + t := time.Since(startTime).Seconds() + glog.Infof("Successful drain took %v seconds", t) + MCDDrainErr.WithLabelValues(dn.node.Name, "").Set(0) + + return nil +} diff --git a/pkg/daemon/update.go b/pkg/daemon/update.go index 7b8eab3c5f..f5225f42c0 100644 --- a/pkg/daemon/update.go +++ b/pkg/daemon/update.go @@ -25,8 +25,6 @@ import ( metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/util/uuid" - "k8s.io/apimachinery/pkg/util/wait" - "k8s.io/kubectl/pkg/drain" mcfgv1 "github.com/openshift/machine-config-operator/pkg/apis/machineconfiguration.openshift.io/v1" ctrlcommon "github.com/openshift/machine-config-operator/pkg/controller/common" @@ -185,58 +183,6 @@ func (dn *Daemon) finalizeBeforeReboot(newConfig *mcfgv1.MachineConfig) (retErr return nil } -func (dn *Daemon) drain() error { - // Skip draining of the node when we're not cluster driven - if dn.kubeClient == nil { - return nil - } - MCDDrainErr.WithLabelValues(dn.node.Name, "").Set(0) - - dn.logSystem("Update prepared; beginning drain") - startTime := time.Now() - - dn.recorder.Eventf(getNodeRef(dn.node), corev1.EventTypeNormal, "Drain", "Draining node to update config.") - - backoff := wait.Backoff{ - Steps: 5, - Duration: 10 * time.Second, - Factor: 2, - } - var lastErr error - if err := wait.ExponentialBackoff(backoff, func() (bool, error) { - err := drain.RunCordonOrUncordon(dn.drainer, dn.node, true) - if err != nil { - lastErr = err - glog.Infof("Cordon failed with: %v, retrying", err) - return false, nil - } - err = drain.RunNodeDrain(dn.drainer, dn.node.Name) - if err == nil { - return true, nil - } - lastErr = err - glog.Infof("Draining failed with: %v, retrying", err) - return false, nil - }); err != nil { - if err == wait.ErrWaitTimeout { - failMsg := fmt.Sprintf("%d tries: %v", backoff.Steps, lastErr) - MCDDrainErr.WithLabelValues(dn.node.Name, "WaitTimeout").Set(float64(backoff.Steps)) - dn.recorder.Eventf(getNodeRef(dn.node), corev1.EventTypeWarning, "FailedToDrain", failMsg) - return errors.Wrapf(lastErr, "failed to drain node (%d tries): %v", backoff.Steps, err) - } - MCDDrainErr.WithLabelValues(dn.node.Name, "UnknownError").Set(float64(backoff.Steps)) - dn.recorder.Eventf(getNodeRef(dn.node), corev1.EventTypeWarning, "FailedToDrain", err.Error()) - return errors.Wrap(err, "failed to drain node") - } - - dn.logSystem("drain complete") - t := time.Since(startTime).Seconds() - glog.Infof("Successful drain took %v seconds", t) - MCDDrainErr.WithLabelValues(dn.node.Name, "").Set(0) - - return nil -} - var errUnreconcilable = errors.New("unreconcilable") func canonicalizeEmptyMC(config *mcfgv1.MachineConfig) *mcfgv1.MachineConfig { @@ -621,7 +567,7 @@ func (dn *Daemon) update(oldConfig, newConfig *mcfgv1.MachineConfig) (retErr err // Drain if we need to reboot or reload crio configuration if ctrlcommon.InSlice(postConfigChangeActionReboot, actions) || ctrlcommon.InSlice(postConfigChangeActionReloadCrio, actions) { - if err := dn.drain(); err != 
+        if err := dn.performDrain(); err != nil {
             return err
         }
     } else {
diff --git a/pkg/operator/assets/bindata.go b/pkg/operator/assets/bindata.go
index 5501ae7969..b726ef2e73 100644
--- a/pkg/operator/assets/bindata.go
+++ b/pkg/operator/assets/bindata.go
@@ -343,6 +343,14 @@ spec:
                 like the web console to tell users where to find the Kubernetes
                 API.
               type: string
+            controlPlaneTopology:
+              description: controlPlaneTopology expresses the expectations for
+                operands that normally run on control nodes. The default is
+                HighlyAvailable, which represents the behavior operators have
+                in a normal cluster. The SingleReplica mode will be used in
+                single-node deployments and the operators should not configure
+                the operand for highly-available operation.
+              type: string
             etcdDiscoveryDomain:
               description: 'etcdDiscoveryDomain is the domain used to fetch the
                 SRV records for discovering etcd servers and clients.
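Aside: cordonOrUncordonNode and drain in the new pkg/daemon/drain.go share one retry shape, which the old monolithic drain() above also used. A minimal, self-contained sketch of that wait.ExponentialBackoff pattern with the same Steps/Duration/Factor (the simulated transient failure and attempt counter are ours):

    package main

    import (
        "errors"
        "fmt"
        "time"

        "k8s.io/apimachinery/pkg/util/wait"
    )

    func main() {
        // Up to 5 attempts, with delays of roughly 10s, 20s, 40s, and 80s between them.
        backoff := wait.Backoff{Steps: 5, Duration: 10 * time.Second, Factor: 2}

        attempts := 0
        var lastErr error
        err := wait.ExponentialBackoff(backoff, func() (bool, error) {
            attempts++
            if attempts < 3 { // simulate two transient failures
                lastErr = errors.New("transient apiserver error")
                return false, nil // false, nil means: retry after the next backoff interval
            }
            return true, nil // done, stop retrying
        })
        if err == wait.ErrWaitTimeout {
            // all Steps exhausted: surface the last real error, as the patch does via errors.Wrapf
            fmt.Println("gave up:", lastErr)
            return
        }
        fmt.Println("succeeded after", attempts, "attempts")
    }

Keeping lastErr in a captured variable matters because wait.ExponentialBackoff itself only returns wait.ErrWaitTimeout when the condition never succeeds; the underlying cordon or drain failure would otherwise be lost.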