diff --git a/cmd/sriov-network-config-daemon/start.go b/cmd/sriov-network-config-daemon/start.go index c8dd875944..92742e5609 100644 --- a/cmd/sriov-network-config-daemon/start.go +++ b/cmd/sriov-network-config-daemon/start.go @@ -23,6 +23,9 @@ import ( "k8s.io/client-go/rest" "k8s.io/client-go/tools/clientcmd" "k8s.io/client-go/util/connrotation" + + mcfgv1 "github.com/openshift/machine-config-operator/pkg/apis/machineconfiguration.openshift.io/v1" + mcclientset "github.com/openshift/machine-config-operator/pkg/generated/clientset/versioned" ) var ( @@ -124,9 +127,11 @@ func runStartCmd(cmd *cobra.Command, args []string) { } sriovnetworkv1.AddToScheme(scheme.Scheme) + mcfgv1.AddToScheme(scheme.Scheme) snclient := snclientset.NewForConfigOrDie(config) kubeclient := kubernetes.NewForConfigOrDie(config) + mcclient := mcclientset.NewForConfigOrDie(config) config.Timeout = 5 * time.Second writerclient := snclientset.NewForConfigOrDie(config) @@ -161,6 +166,7 @@ func runStartCmd(cmd *cobra.Command, args []string) { startOpts.nodeName, snclient, kubeclient, + mcclient, exitCh, stopCh, syncCh, diff --git a/deploy/clusterrole.yaml b/deploy/clusterrole.yaml index b7aba3e22c..77869f52c0 100644 --- a/deploy/clusterrole.yaml +++ b/deploy/clusterrole.yaml @@ -48,3 +48,6 @@ rules: - apiGroups: [""] resources: ["pods/eviction"] verbs: ["create"] +- apiGroups: ["machineconfiguration.openshift.io"] + resources: ["*"] + verbs: ["*"] diff --git a/pkg/daemon/daemon.go b/pkg/daemon/daemon.go index 7815f89791..0d534f1f2d 100644 --- a/pkg/daemon/daemon.go +++ b/pkg/daemon/daemon.go @@ -40,6 +40,9 @@ import ( snclientset "github.com/k8snetworkplumbingwg/sriov-network-operator/pkg/client/clientset/versioned" sninformer "github.com/k8snetworkplumbingwg/sriov-network-operator/pkg/client/informers/externalversions" "github.com/k8snetworkplumbingwg/sriov-network-operator/pkg/utils" + mcfgv1 "github.com/openshift/machine-config-operator/pkg/apis/machineconfiguration.openshift.io/v1" + mcclientset "github.com/openshift/machine-config-operator/pkg/generated/clientset/versioned" + mcfginformers "github.com/openshift/machine-config-operator/pkg/generated/informers/externalversions" ) const ( @@ -69,6 +72,8 @@ type Daemon struct { // kubeClient allows interaction with Kubernetes, including the node we are running on. kubeClient *kubernetes.Clientset + mcClient *mcclientset.Clientset + nodeState *sriovnetworkv1.SriovNetworkNodeState LoadedPlugins map[string]VendorPlugin @@ -98,6 +103,8 @@ type Daemon struct { nodeLister listerv1.NodeLister workqueue workqueue.RateLimitingInterface + + mcpName string } type workItem struct { @@ -110,6 +117,7 @@ const ( annoKey = "sriovnetwork.openshift.io/state" annoIdle = "Idle" annoDraining = "Draining" + annoMcpPaused = "Draining_MCP_Paused" ) var namespace = os.Getenv("NAMESPACE") @@ -130,6 +138,7 @@ func New( nodeName string, client snclientset.Interface, kubeClient *kubernetes.Clientset, + mcClient *mcclientset.Clientset, exitCh chan<- error, stopCh <-chan struct{}, syncCh <-chan struct{}, @@ -141,11 +150,11 @@ func New( platform: platformType, client: client, kubeClient: kubeClient, + mcClient: mcClient, exitCh: exitCh, stopCh: stopCh, syncCh: syncCh, refreshCh: refreshCh, - drainable: true, nodeState: &sriovnetworkv1.SriovNetworkNodeState{}, drainer: &drain.Helper{ Client: kubeClient, @@ -413,13 +422,11 @@ func (dn *Daemon) nodeUpdateHandler(old, new interface{}) { return } for _, node := range nodes { - if node.GetName() != dn.name && node.Annotations[annoKey] == annoDraining { - glog.V(2).Infof("nodeUpdateHandler(): node %s is draining", node.Name) + if node.GetName() != dn.name && (node.Annotations[annoKey] == annoDraining || node.Annotations[annoKey] == annoMcpPaused) { dn.drainable = false return } } - glog.V(2).Infof("nodeUpdateHandler(): no other node is draining") dn.drainable = true } @@ -522,6 +529,10 @@ func (dn *Daemon) nodeStateSyncHandler(generation int64) error { } return nil } + if err = dn.getNodeMachinePool(); err != nil { + return err + } + if reqDrain && !dn.disableDrain { glog.Info("nodeStateSyncHandler(): drain node") if err := dn.drainNode(dn.name); err != nil { @@ -553,7 +564,7 @@ func (dn *Daemon) nodeStateSyncHandler(generation int64) error { } } - if anno, ok := dn.node.Annotations[annoKey]; ok && anno == annoDraining { + if anno, ok := dn.node.Annotations[annoKey]; ok && (anno == annoDraining || anno == annoMcpPaused) { if err := dn.completeDrain(); err != nil { glog.Errorf("nodeStateSyncHandler(): failed to complete draining: %v", err) return err @@ -580,8 +591,17 @@ func (dn *Daemon) completeDrain() error { return err } + if utils.ClusterType == utils.ClusterTypeOpenshift { + glog.Infof("completeDrain(): resume MCP %s", dn.mcpName) + pausePatch := []byte("{\"spec\":{\"paused\":false}}") + if _, err := dn.mcClient.MachineconfigurationV1().MachineConfigPools().Patch(context.Background(), dn.mcpName, types.MergePatchType, pausePatch, metav1.PatchOptions{}); err != nil { + glog.Errorf("completeDrain(): failed to resume MCP %s: %v", dn.mcpName, err) + return err + } + } + if err := dn.annotateNode(dn.name, annoIdle); err != nil { - glog.Errorf("drainNode(): failed to annotate node: %v", err) + glog.Errorf("completeDrain(): failed to annotate node: %v", err) return err } return nil @@ -747,21 +767,127 @@ func (dn *Daemon) annotateNode(node, value string) error { return nil } +func (dn *Daemon) getNodeMachinePool() error { + mcpList, err := dn.mcClient.MachineconfigurationV1().MachineConfigPools().List(context.TODO(), metav1.ListOptions{}) + if err != nil { + glog.Errorf("getNodeMachinePool(): Failed to list Machine Config Pools: %v", err) + return err + } + var mcp mcfgv1.MachineConfigPool + for _, mcp = range mcpList.Items { + selector, err := metav1.LabelSelectorAsSelector(mcp.Spec.NodeSelector) + if err != nil { + glog.Errorf("getNodeMachinePool(): Machine Config Pool %s invalid label selector: %v", mcp.GetName(), err) + return err + } + + if selector.Matches(labels.Set(dn.node.Labels)) { + dn.mcpName = mcp.GetName() + glog.Infof("getNodeMachinePool(): find node in MCP %s", dn.mcpName) + return nil + } + } + return fmt.Errorf("getNodeMachinePool(): Failed to find the MCP of the node") +} + func (dn *Daemon) drainNode(name string) error { glog.Info("drainNode(): Update prepared") var err error + + ctx, cancel := context.WithCancel(context.TODO()) + defer cancel() // wait a random time to avoid all the nodes drain at the same time - wait.PollUntil(time.Duration(rand.Intn(15)+1)*time.Second, func() (bool, error) { + time.Sleep(wait.Jitter(3*time.Second, 3)) + wait.JitterUntil(func() { if !dn.drainable { - glog.Info("drainNode(): other node is draining, waiting...") + glog.V(2).Info("drainNode(): other node is draining") + return + } + glog.V(2).Info("drainNode(): no other node is draining") + err = dn.annotateNode(dn.name, annoDraining) + if err != nil { + glog.Errorf("drainNode(): Failed to annotate node: %v", err) + return + } + cancel() + }, 3*time.Second, 3, true, ctx.Done()) + + if utils.ClusterType == utils.ClusterTypeOpenshift { + mcpInformerFactory := mcfginformers.NewSharedInformerFactory(dn.mcClient, + time.Second*30, + ) + mcpInformer := mcpInformerFactory.Machineconfiguration().V1().MachineConfigPools().Informer() + + ctx, cancel := context.WithCancel(context.TODO()) + defer cancel() + paused := dn.node.Annotations[annoKey] == annoMcpPaused + + mcpEventHandler := func(obj interface{}) { + mcp := obj.(*mcfgv1.MachineConfigPool) + if mcp.GetName() != dn.mcpName { + return + } + // Always get the latest object + newMcp, err := dn.mcClient.MachineconfigurationV1().MachineConfigPools().Get(ctx, dn.mcpName, metav1.GetOptions{}) + if err != nil { + glog.V(2).Infof("drainNode(): Failed to get MCP %s: %v", dn.mcpName, err) + return + } + if mcfgv1.IsMachineConfigPoolConditionFalse(newMcp.Status.Conditions, mcfgv1.MachineConfigPoolDegraded) && + mcfgv1.IsMachineConfigPoolConditionTrue(newMcp.Status.Conditions, mcfgv1.MachineConfigPoolUpdated) && + mcfgv1.IsMachineConfigPoolConditionFalse(newMcp.Status.Conditions, mcfgv1.MachineConfigPoolUpdating) { + glog.V(2).Infof("drainNode(): MCP %s is ready", dn.mcpName) + if paused { + glog.V(2).Info("drainNode(): stop MCP informer", dn.mcpName) + cancel() + return + } + if newMcp.Spec.Paused { + glog.V(2).Infof("drainNode(): MCP %s was paused by other, wait...", dn.mcpName) + return + } + glog.Infof("drainNode(): pause MCP %s", dn.mcpName) + pausePatch := []byte("{\"spec\":{\"paused\":true}}") + _, err = dn.mcClient.MachineconfigurationV1().MachineConfigPools().Patch(context.Background(), dn.mcpName, types.MergePatchType, pausePatch, metav1.PatchOptions{}) + if err != nil { + glog.V(2).Infof("drainNode(): Failed to pause MCP %s: %v", dn.mcpName, err) + return + } + err = dn.annotateNode(dn.name, annoMcpPaused) + if err != nil { + glog.V(2).Infof("drainNode(): Failed to annotate node: %v", err) + return + } + paused = true + return + } + if paused { + glog.Infof("drainNode(): MCP is processing, resume MCP %s", dn.mcpName) + pausePatch := []byte("{\"spec\":{\"paused\":false}}") + _, err = dn.mcClient.MachineconfigurationV1().MachineConfigPools().Patch(context.Background(), dn.mcpName, types.MergePatchType, pausePatch, metav1.PatchOptions{}) + if err != nil { + glog.V(2).Infof("drainNode(): fail to resume MCP %s: %v", dn.mcpName, err) + return + } + err = dn.annotateNode(dn.name, annoDraining) + if err != nil { + glog.V(2).Infof("drainNode(): Failed to annotate node: %v", err) + return + } + paused = false + } + glog.Infof("drainNode():MCP %s is not ready: %v, wait...", newMcp.GetName(), newMcp.Status.Conditions) } - return dn.drainable, nil - }, dn.stopCh) - err = dn.annotateNode(dn.name, annoDraining) - if err != nil { - glog.Errorf("drainNode(): Failed to annotate node: %v", err) - return err + mcpInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{ + AddFunc: mcpEventHandler, + UpdateFunc: func(old, new interface{}) { + mcpEventHandler(new) + }, + }) + mcpInformerFactory.Start(ctx.Done()) + mcpInformerFactory.WaitForCacheSync(ctx.Done()) + <-ctx.Done() } backoff := wait.Backoff{ diff --git a/pkg/plugins/generic/generic_plugin.go b/pkg/plugins/generic/generic_plugin.go index ec32bcb9e9..966fdc8e0c 100644 --- a/pkg/plugins/generic/generic_plugin.go +++ b/pkg/plugins/generic/generic_plugin.go @@ -190,15 +190,17 @@ func needDrainNode(desired sriovnetworkv1.Interfaces, current sriovnetworkv1.Int // ignore swichdev device break } - if iface.NumVfs != ifaceStatus.NumVfs && ifaceStatus.NumVfs != 0 { - glog.V(2).Infof("generic-plugin needDrainNode(): need drain, expect NumVfs %v, current NumVfs %v", iface.NumVfs, ifaceStatus.NumVfs) - needDrain = true - return - } - if iface.Mtu != 0 && iface.Mtu != ifaceStatus.Mtu { - glog.V(2).Infof("generic-plugin needDrainNode(): need drain, expect MTU %v, current MTU %v", iface.Mtu, ifaceStatus.Mtu) - needDrain = true - return + if ifaceStatus.NumVfs != 0 { + if iface.NumVfs != ifaceStatus.NumVfs { + glog.V(2).Infof("generic-plugin needDrainNode(): need drain, expect NumVfs %v, current NumVfs %v", iface.NumVfs, ifaceStatus.NumVfs) + needDrain = true + return + } + if iface.Mtu != 0 && iface.Mtu != ifaceStatus.Mtu { + glog.V(2).Infof("generic-plugin needDrainNode(): need drain, expect MTU %v, current MTU %v", iface.Mtu, ifaceStatus.Mtu) + needDrain = true + return + } } } }