Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions cmd/sriov-network-config-daemon/start.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,9 @@ import (
"k8s.io/client-go/rest"
"k8s.io/client-go/tools/clientcmd"
"k8s.io/client-go/util/connrotation"

mcfgv1 "github.com/openshift/machine-config-operator/pkg/apis/machineconfiguration.openshift.io/v1"
mcclientset "github.com/openshift/machine-config-operator/pkg/generated/clientset/versioned"
)

var (
Expand Down Expand Up @@ -124,9 +127,11 @@ func runStartCmd(cmd *cobra.Command, args []string) {
}

sriovnetworkv1.AddToScheme(scheme.Scheme)
mcfgv1.AddToScheme(scheme.Scheme)

snclient := snclientset.NewForConfigOrDie(config)
kubeclient := kubernetes.NewForConfigOrDie(config)
mcclient := mcclientset.NewForConfigOrDie(config)

config.Timeout = 5 * time.Second
writerclient := snclientset.NewForConfigOrDie(config)
Expand Down Expand Up @@ -161,6 +166,7 @@ func runStartCmd(cmd *cobra.Command, args []string) {
startOpts.nodeName,
snclient,
kubeclient,
mcclient,
exitCh,
stopCh,
syncCh,
Expand Down
3 changes: 3 additions & 0 deletions deploy/clusterrole.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -48,3 +48,6 @@ rules:
- apiGroups: [""]
resources: ["pods/eviction"]
verbs: ["create"]
- apiGroups: ["machineconfiguration.openshift.io"]
Comment thread
SchSeba marked this conversation as resolved.
resources: ["*"]
verbs: ["*"]
154 changes: 140 additions & 14 deletions pkg/daemon/daemon.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,9 @@ import (
snclientset "github.com/k8snetworkplumbingwg/sriov-network-operator/pkg/client/clientset/versioned"
sninformer "github.com/k8snetworkplumbingwg/sriov-network-operator/pkg/client/informers/externalversions"
"github.com/k8snetworkplumbingwg/sriov-network-operator/pkg/utils"
mcfgv1 "github.com/openshift/machine-config-operator/pkg/apis/machineconfiguration.openshift.io/v1"
mcclientset "github.com/openshift/machine-config-operator/pkg/generated/clientset/versioned"
mcfginformers "github.com/openshift/machine-config-operator/pkg/generated/informers/externalversions"
)

const (
Expand Down Expand Up @@ -69,6 +72,8 @@ type Daemon struct {
// kubeClient allows interaction with Kubernetes, including the node we are running on.
kubeClient *kubernetes.Clientset

mcClient *mcclientset.Clientset

nodeState *sriovnetworkv1.SriovNetworkNodeState

LoadedPlugins map[string]VendorPlugin
Expand Down Expand Up @@ -98,6 +103,8 @@ type Daemon struct {
nodeLister listerv1.NodeLister

workqueue workqueue.RateLimitingInterface

mcpName string
}

type workItem struct {
Expand All @@ -110,6 +117,7 @@ const (
annoKey = "sriovnetwork.openshift.io/state"
annoIdle = "Idle"
annoDraining = "Draining"
annoMcpPaused = "Draining_MCP_Paused"
)

var namespace = os.Getenv("NAMESPACE")
Expand All @@ -130,6 +138,7 @@ func New(
nodeName string,
client snclientset.Interface,
kubeClient *kubernetes.Clientset,
mcClient *mcclientset.Clientset,
exitCh chan<- error,
stopCh <-chan struct{},
syncCh <-chan struct{},
Expand All @@ -141,11 +150,11 @@ func New(
platform: platformType,
client: client,
kubeClient: kubeClient,
mcClient: mcClient,
exitCh: exitCh,
stopCh: stopCh,
syncCh: syncCh,
refreshCh: refreshCh,
drainable: true,
nodeState: &sriovnetworkv1.SriovNetworkNodeState{},
drainer: &drain.Helper{
Client: kubeClient,
Expand Down Expand Up @@ -413,13 +422,11 @@ func (dn *Daemon) nodeUpdateHandler(old, new interface{}) {
return
}
for _, node := range nodes {
if node.GetName() != dn.name && node.Annotations[annoKey] == annoDraining {
glog.V(2).Infof("nodeUpdateHandler(): node %s is draining", node.Name)
if node.GetName() != dn.name && (node.Annotations[annoKey] == annoDraining || node.Annotations[annoKey] == annoMcpPaused) {
dn.drainable = false
return
}
}
glog.V(2).Infof("nodeUpdateHandler(): no other node is draining")
dn.drainable = true
}

Expand Down Expand Up @@ -522,6 +529,10 @@ func (dn *Daemon) nodeStateSyncHandler(generation int64) error {
}
return nil
}
if err = dn.getNodeMachinePool(); err != nil {
return err
}

if reqDrain && !dn.disableDrain {
glog.Info("nodeStateSyncHandler(): drain node")
if err := dn.drainNode(dn.name); err != nil {
Expand Down Expand Up @@ -553,7 +564,7 @@ func (dn *Daemon) nodeStateSyncHandler(generation int64) error {
}
}

if anno, ok := dn.node.Annotations[annoKey]; ok && anno == annoDraining {
if anno, ok := dn.node.Annotations[annoKey]; ok && (anno == annoDraining || anno == annoMcpPaused) {
if err := dn.completeDrain(); err != nil {
glog.Errorf("nodeStateSyncHandler(): failed to complete draining: %v", err)
return err
Expand All @@ -580,8 +591,17 @@ func (dn *Daemon) completeDrain() error {
return err
}

if utils.ClusterType == utils.ClusterTypeOpenshift {
glog.Infof("completeDrain(): resume MCP %s", dn.mcpName)

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Making sure I understand this properly, the completeDrain() function gets called after the reboot correct? To uncordon the node?

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes. It will be executed after reboot or after draining without a reboot.

pausePatch := []byte("{\"spec\":{\"paused\":false}}")
if _, err := dn.mcClient.MachineconfigurationV1().MachineConfigPools().Patch(context.Background(), dn.mcpName, types.MergePatchType, pausePatch, metav1.PatchOptions{}); err != nil {
Comment thread
pliurh marked this conversation as resolved.
glog.Errorf("completeDrain(): failed to resume MCP %s: %v", dn.mcpName, err)
return err
}
}

if err := dn.annotateNode(dn.name, annoIdle); err != nil {
glog.Errorf("drainNode(): failed to annotate node: %v", err)
glog.Errorf("completeDrain(): failed to annotate node: %v", err)
return err
}
return nil
Expand Down Expand Up @@ -747,21 +767,127 @@ func (dn *Daemon) annotateNode(node, value string) error {
return nil
}

func (dn *Daemon) getNodeMachinePool() error {
Comment thread
SchSeba marked this conversation as resolved.
mcpList, err := dn.mcClient.MachineconfigurationV1().MachineConfigPools().List(context.TODO(), metav1.ListOptions{})
if err != nil {
glog.Errorf("getNodeMachinePool(): Failed to list Machine Config Pools: %v", err)
return err
}
var mcp mcfgv1.MachineConfigPool
for _, mcp = range mcpList.Items {
selector, err := metav1.LabelSelectorAsSelector(mcp.Spec.NodeSelector)
if err != nil {
glog.Errorf("getNodeMachinePool(): Machine Config Pool %s invalid label selector: %v", mcp.GetName(), err)
return err
}

if selector.Matches(labels.Set(dn.node.Labels)) {
dn.mcpName = mcp.GetName()
glog.Infof("getNodeMachinePool(): find node in MCP %s", dn.mcpName)
return nil
}
}
return fmt.Errorf("getNodeMachinePool(): Failed to find the MCP of the node")
}

func (dn *Daemon) drainNode(name string) error {
glog.Info("drainNode(): Update prepared")
var err error

ctx, cancel := context.WithCancel(context.TODO())
defer cancel()
// wait a random time to avoid all the nodes drain at the same time
wait.PollUntil(time.Duration(rand.Intn(15)+1)*time.Second, func() (bool, error) {
time.Sleep(wait.Jitter(3*time.Second, 3))
wait.JitterUntil(func() {
if !dn.drainable {
glog.Info("drainNode(): other node is draining, waiting...")
glog.V(2).Info("drainNode(): other node is draining")
return
}
glog.V(2).Info("drainNode(): no other node is draining")
err = dn.annotateNode(dn.name, annoDraining)
if err != nil {
glog.Errorf("drainNode(): Failed to annotate node: %v", err)
return
}
cancel()
}, 3*time.Second, 3, true, ctx.Done())

if utils.ClusterType == utils.ClusterTypeOpenshift {
mcpInformerFactory := mcfginformers.NewSharedInformerFactory(dn.mcClient,
time.Second*30,
)
mcpInformer := mcpInformerFactory.Machineconfiguration().V1().MachineConfigPools().Informer()

ctx, cancel := context.WithCancel(context.TODO())
defer cancel()
paused := dn.node.Annotations[annoKey] == annoMcpPaused

mcpEventHandler := func(obj interface{}) {
mcp := obj.(*mcfgv1.MachineConfigPool)
if mcp.GetName() != dn.mcpName {
return
}
// Always get the latest object
newMcp, err := dn.mcClient.MachineconfigurationV1().MachineConfigPools().Get(ctx, dn.mcpName, metav1.GetOptions{})
if err != nil {
glog.V(2).Infof("drainNode(): Failed to get MCP %s: %v", dn.mcpName, err)
return
}
if mcfgv1.IsMachineConfigPoolConditionFalse(newMcp.Status.Conditions, mcfgv1.MachineConfigPoolDegraded) &&

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

By checking poolDegraded, it makes sure pool is not paused by user, right?

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@yuqi-zhang @sinnykumari, do you think the checks here can 100% guarantee there is no race condition between MCO and SNO trying to start an update simultaneously?

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

By checking poolDegraded, it makes sure pool is not paused by user, right?

No, this is to check MCP is not in 'Degraded' state.

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pausing and degraded are separate concepts. I think the SRIOV operator shouldn't do anything if a pool is degraded (which means the pool is in an error state), as you have here. I think the checks (not degraded, not updating, updated) should be good.

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

When node gets rebooted by sriov config daemon, will the drainNode be called again (in which case MCP is degraded due to pause:true)?

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pausing an MCP will not make it 'Degraded'.

mcfgv1.IsMachineConfigPoolConditionTrue(newMcp.Status.Conditions, mcfgv1.MachineConfigPoolUpdated) &&
mcfgv1.IsMachineConfigPoolConditionFalse(newMcp.Status.Conditions, mcfgv1.MachineConfigPoolUpdating) {
glog.V(2).Infof("drainNode(): MCP %s is ready", dn.mcpName)
if paused {
glog.V(2).Info("drainNode(): stop MCP informer", dn.mcpName)
cancel()
return
}
if newMcp.Spec.Paused {
Comment thread
SchSeba marked this conversation as resolved.
glog.V(2).Infof("drainNode(): MCP %s was paused by other, wait...", dn.mcpName)
return
}
glog.Infof("drainNode(): pause MCP %s", dn.mcpName)
pausePatch := []byte("{\"spec\":{\"paused\":true}}")
_, err = dn.mcClient.MachineconfigurationV1().MachineConfigPools().Patch(context.Background(), dn.mcpName, types.MergePatchType, pausePatch, metav1.PatchOptions{})
if err != nil {
glog.V(2).Infof("drainNode(): Failed to pause MCP %s: %v", dn.mcpName, err)
return
}
err = dn.annotateNode(dn.name, annoMcpPaused)
if err != nil {
glog.V(2).Infof("drainNode(): Failed to annotate node: %v", err)
return
}
paused = true
return
}
if paused {
glog.Infof("drainNode(): MCP is processing, resume MCP %s", dn.mcpName)
pausePatch := []byte("{\"spec\":{\"paused\":false}}")

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For my understanding, can you please explain the condition in which we are unpausing the pool here.

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The MCP informer will not be stoped right after we pause the MCP. So the mcpEventHandler will be invoked once more as we updated the MCD. In this cycle, we check if the MCP is processing, if yes and the MCP was paused during the last cycle (indicated by the variable 'paused' here). We resume the MCP and wait until it finishes.

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is to address #93 (comment)

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

got it, makes sense.

_, err = dn.mcClient.MachineconfigurationV1().MachineConfigPools().Patch(context.Background(), dn.mcpName, types.MergePatchType, pausePatch, metav1.PatchOptions{})
if err != nil {
glog.V(2).Infof("drainNode(): fail to resume MCP %s: %v", dn.mcpName, err)
return
}
err = dn.annotateNode(dn.name, annoDraining)
if err != nil {
glog.V(2).Infof("drainNode(): Failed to annotate node: %v", err)
return
}
paused = false
}
glog.Infof("drainNode():MCP %s is not ready: %v, wait...", newMcp.GetName(), newMcp.Status.Conditions)
}
return dn.drainable, nil
}, dn.stopCh)

err = dn.annotateNode(dn.name, annoDraining)
if err != nil {
glog.Errorf("drainNode(): Failed to annotate node: %v", err)
return err
mcpInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: can we add a check if this is the MCP that the node is part of even before we send it to the mcpEventHandler function?

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hmm, I don't know how we can do that. Normally, the informer watches all the objects. I think it's sufficient that we check it in the mcpEventHandler.

AddFunc: mcpEventHandler,
UpdateFunc: func(old, new interface{}) {
mcpEventHandler(new)
},
})
mcpInformerFactory.Start(ctx.Done())
mcpInformerFactory.WaitForCacheSync(ctx.Done())
<-ctx.Done()
}

backoff := wait.Backoff{
Expand Down
20 changes: 11 additions & 9 deletions pkg/plugins/generic/generic_plugin.go
Original file line number Diff line number Diff line change
Expand Up @@ -190,15 +190,17 @@ func needDrainNode(desired sriovnetworkv1.Interfaces, current sriovnetworkv1.Int
// ignore swichdev device
break
}
if iface.NumVfs != ifaceStatus.NumVfs && ifaceStatus.NumVfs != 0 {
glog.V(2).Infof("generic-plugin needDrainNode(): need drain, expect NumVfs %v, current NumVfs %v", iface.NumVfs, ifaceStatus.NumVfs)
needDrain = true
return
}
if iface.Mtu != 0 && iface.Mtu != ifaceStatus.Mtu {
glog.V(2).Infof("generic-plugin needDrainNode(): need drain, expect MTU %v, current MTU %v", iface.Mtu, ifaceStatus.Mtu)
needDrain = true
return
if ifaceStatus.NumVfs != 0 {
if iface.NumVfs != ifaceStatus.NumVfs {
glog.V(2).Infof("generic-plugin needDrainNode(): need drain, expect NumVfs %v, current NumVfs %v", iface.NumVfs, ifaceStatus.NumVfs)
needDrain = true
return
}
if iface.Mtu != 0 && iface.Mtu != ifaceStatus.Mtu {
glog.V(2).Infof("generic-plugin needDrainNode(): need drain, expect MTU %v, current MTU %v", iface.Mtu, ifaceStatus.Mtu)
needDrain = true
return
}
}
}
}
Expand Down