Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

nfd-master: use separate k8s api clients for each updater #1653

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions pkg/nfd-master/nfd-master-internal_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -167,7 +167,7 @@ func TestUpdateNodeObject(t *testing.T) {
fakeMaster := newFakeMaster(WithKubernetesClient(fakeCli))

Convey("When I successfully update the node with feature labels", func() {
err := fakeMaster.updateNodeObject(testNode, featureLabels, featureAnnotations, featureExtResources, nil)
err := fakeMaster.updateNodeObject(fakeCli, testNode, featureLabels, featureAnnotations, featureExtResources, nil)
Convey("Error is nil", func() {
So(err, ShouldBeNil)
})
Expand Down Expand Up @@ -199,7 +199,7 @@ func TestUpdateNodeObject(t *testing.T) {
fakeCli.CoreV1().(*fakecorev1client.FakeCoreV1).PrependReactor("patch", "nodes", func(action clienttesting.Action) (handled bool, ret runtime.Object, err error) {
return true, &v1.Node{}, errors.New("Fake error when patching node")
})
err := fakeMaster.updateNodeObject(testNode, nil, featureAnnotations, ExtendedResources{"": ""}, nil)
err := fakeMaster.updateNodeObject(fakeCli, testNode, nil, featureAnnotations, ExtendedResources{"": ""}, nil)

Convey("Error is produced", func() {
So(err, ShouldBeError)
Expand Down
57 changes: 30 additions & 27 deletions pkg/nfd-master/nfd-master.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@
k8sLabels "k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/types"
k8sclient "k8s.io/client-go/kubernetes"
restclient "k8s.io/client-go/rest"
"k8s.io/client-go/tools/leaderelection"
"k8s.io/client-go/tools/leaderelection/resourcelock"
"k8s.io/klog/v2"
Expand Down Expand Up @@ -150,6 +151,7 @@
healthServer *grpc.Server
stop chan struct{}
ready chan struct{}
kubeconfig *restclient.Config
k8sClient k8sclient.Interface
nodeUpdaterPool *nodeUpdaterPool
deniedNs
Expand Down Expand Up @@ -200,6 +202,7 @@
if err != nil {
return nfd, err
}
nfd.kubeconfig = kubeconfig

Check warning on line 205 in pkg/nfd-master/nfd-master.go

View check run for this annotation

Codecov / codecov/patch

pkg/nfd-master/nfd-master.go#L205

Added line #L205 was not covered by tests
cli, err := k8sclient.NewForConfig(kubeconfig)
if err != nil {
return nfd, err
Expand Down Expand Up @@ -528,7 +531,7 @@
return nil
}

nodes, err := m.getNodes()
nodes, err := getNodes(m.k8sClient)

Check warning on line 534 in pkg/nfd-master/nfd-master.go

View check run for this annotation

Codecov / codecov/patch

pkg/nfd-master/nfd-master.go#L534

Added line #L534 was not covered by tests
if err != nil {
return err
}
Expand All @@ -537,14 +540,14 @@
klog.InfoS("pruning node...", "nodeName", node.Name)

// Prune labels and extended resources
err := m.updateNodeObject(&node, Labels{}, Annotations{}, ExtendedResources{}, []corev1.Taint{})
err := m.updateNodeObject(m.k8sClient, &node, Labels{}, Annotations{}, ExtendedResources{}, []corev1.Taint{})

Check warning on line 543 in pkg/nfd-master/nfd-master.go

View check run for this annotation

Codecov / codecov/patch

pkg/nfd-master/nfd-master.go#L543

Added line #L543 was not covered by tests
if err != nil {
nodeUpdateFailures.Inc()
return fmt.Errorf("failed to prune node %q: %v", node.Name, err)
}

// Prune annotations
node, err := m.getNode(node.Name)
node, err := getNode(m.k8sClient, node.Name)

Check warning on line 550 in pkg/nfd-master/nfd-master.go

View check run for this annotation

Codecov / codecov/patch

pkg/nfd-master/nfd-master.go#L550

Added line #L550 was not covered by tests
if err != nil {
return err
}
Expand All @@ -564,7 +567,7 @@
// "nfd.node.kubernetes.io/master.version" annotation, if it exists.
// TODO: Drop when nfdv1alpha1.MasterVersionAnnotation is removed.
func (m *nfdMaster) updateMasterNode() error {
node, err := m.getNode(m.nodeName)
node, err := getNode(m.k8sClient, m.nodeName)
if err != nil {
return err
}
Expand All @@ -575,7 +578,7 @@
nil,
"/metadata/annotations")

err = m.patchNode(node.Name, p)
err = patchNode(m.k8sClient, node.Name, p)
if err != nil {
return fmt.Errorf("failed to patch node annotations: %w", err)
}
Expand Down Expand Up @@ -727,12 +730,12 @@
}
if !m.config.NoPublish {
// Fetch the node object.
node, err := m.getNode(r.NodeName)
node, err := getNode(m.k8sClient, r.NodeName)
if err != nil {
return &pb.SetLabelsReply{}, err
}
// Create labels et al
if err := m.refreshNodeFeatures(node, r.GetLabels(), r.GetFeatures()); err != nil {
if err := m.refreshNodeFeatures(m.k8sClient, node, r.GetLabels(), r.GetFeatures()); err != nil {
nodeUpdateFailures.Inc()
return &pb.SetLabelsReply{}, err
}
Expand All @@ -743,7 +746,7 @@
func (m *nfdMaster) nfdAPIUpdateAllNodes() error {
klog.InfoS("will process all nodes in the cluster")

nodes, err := m.getNodes()
nodes, err := getNodes(m.k8sClient)

Check warning on line 749 in pkg/nfd-master/nfd-master.go

View check run for this annotation

Codecov / codecov/patch

pkg/nfd-master/nfd-master.go#L749

Added line #L749 was not covered by tests
if err != nil {
return err
}
Expand All @@ -755,7 +758,7 @@
return nil
}

func (m *nfdMaster) nfdAPIUpdateOneNode(node *corev1.Node) error {
func (m *nfdMaster) nfdAPIUpdateOneNode(cli k8sclient.Interface, node *corev1.Node) error {

Check warning on line 761 in pkg/nfd-master/nfd-master.go

View check run for this annotation

Codecov / codecov/patch

pkg/nfd-master/nfd-master.go#L761

Added line #L761 was not covered by tests
if m.nfdController == nil || m.nfdController.featureLister == nil {
return nil
}
Expand Down Expand Up @@ -810,7 +813,7 @@
// Update node labels et al. This may also mean removing all NFD-owned
// labels (et al.), for example in the case no NodeFeature objects are
// present.
if err := m.refreshNodeFeatures(node, features.Labels, &features.Features); err != nil {
if err := m.refreshNodeFeatures(cli, node, features.Labels, &features.Features); err != nil {

Check warning on line 816 in pkg/nfd-master/nfd-master.go

View check run for this annotation

Codecov / codecov/patch

pkg/nfd-master/nfd-master.go#L816

Added line #L816 was not covered by tests
return err
}

Expand Down Expand Up @@ -855,7 +858,7 @@
return filteredValue, nil
}

func (m *nfdMaster) refreshNodeFeatures(node *corev1.Node, labels map[string]string, features *nfdv1alpha1.Features) error {
func (m *nfdMaster) refreshNodeFeatures(cli k8sclient.Interface, node *corev1.Node, labels map[string]string, features *nfdv1alpha1.Features) error {
if m.config.AutoDefaultNs {
labels = addNsToMapKeys(labels, nfdv1alpha1.FeatureLabelNs)
} else if labels == nil {
Expand Down Expand Up @@ -889,7 +892,7 @@
return nil
}

err := m.updateNodeObject(node, labels, annotations, extendedResources, taints)
err := m.updateNodeObject(cli, node, labels, annotations, extendedResources, taints)
if err != nil {
klog.ErrorS(err, "failed to update node", "nodeName", node.Name)
return err
Expand All @@ -901,7 +904,7 @@
// setTaints sets node taints and annotations based on the taints passed via
// nodeFeatureRule custom resorce. If empty list of taints is passed, currently
// NFD owned taints and annotations are removed from the node.
func (m *nfdMaster) setTaints(taints []corev1.Taint, node *corev1.Node) error {
func setTaints(cli k8sclient.Interface, taints []corev1.Taint, node *corev1.Node) error {
// De-serialize the taints annotation into corev1.Taint type for comparision below.
var err error
oldTaints := []corev1.Taint{}
Expand Down Expand Up @@ -940,7 +943,7 @@
}

if taintsUpdated {
if err := controller.PatchNodeTaints(context.TODO(), m.k8sClient, node.Name, node, newNode); err != nil {
if err := controller.PatchNodeTaints(context.TODO(), cli, node.Name, node, newNode); err != nil {

Check warning on line 946 in pkg/nfd-master/nfd-master.go

View check run for this annotation

Codecov / codecov/patch

pkg/nfd-master/nfd-master.go#L946

Added line #L946 was not covered by tests
return fmt.Errorf("failed to patch the node %v", node.Name)
}
klog.InfoS("updated node taints", "nodeName", node.Name)
Expand All @@ -960,7 +963,7 @@

patches := createPatches([]string{nfdv1alpha1.NodeTaintsAnnotation}, node.Annotations, newAnnotations, "/metadata/annotations")
if len(patches) > 0 {
if err := m.patchNode(node.Name, patches); err != nil {
if err := patchNode(cli, node.Name, patches); err != nil {

Check warning on line 966 in pkg/nfd-master/nfd-master.go

View check run for this annotation

Codecov / codecov/patch

pkg/nfd-master/nfd-master.go#L966

Added line #L966 was not covered by tests
return fmt.Errorf("error while patching node object: %w", err)
}
klog.V(1).InfoS("patched node annotations for taints", "nodeName", node.Name)
Expand Down Expand Up @@ -1057,7 +1060,7 @@
// updateNodeObject ensures the Kubernetes node object is up to date,
// creating new labels and extended resources where necessary and removing
// outdated ones. Also updates the corresponding annotations.
func (m *nfdMaster) updateNodeObject(node *corev1.Node, labels Labels, featureAnnotations Annotations, extendedResources ExtendedResources, taints []corev1.Taint) error {
func (m *nfdMaster) updateNodeObject(cli k8sclient.Interface, node *corev1.Node, labels Labels, featureAnnotations Annotations, extendedResources ExtendedResources, taints []corev1.Taint) error {
annotations := make(Annotations)

// Store names of labels in an annotation
Expand Down Expand Up @@ -1110,13 +1113,13 @@

// patch node status with extended resource changes
statusPatches := m.createExtendedResourcePatches(node, extendedResources)
err := m.patchNodeStatus(node.Name, statusPatches)
err := patchNodeStatus(cli, node.Name, statusPatches)
if err != nil {
return fmt.Errorf("error while patching extended resources: %w", err)
}

// Patch the node object in the apiserver
err = m.patchNode(node.Name, patches)
err = patchNode(cli, node.Name, patches)
if err != nil {
return fmt.Errorf("error while patching node object: %w", err)
}
Expand All @@ -1129,7 +1132,7 @@
}

// Set taints
err = m.setTaints(taints, node)
err = setTaints(cli, taints, node)
if err != nil {
return err
}
Expand Down Expand Up @@ -1420,25 +1423,25 @@
return outAnnotations
}

func (m *nfdMaster) getNode(nodeName string) (*corev1.Node, error) {
return m.k8sClient.CoreV1().Nodes().Get(context.TODO(), nodeName, metav1.GetOptions{})
func getNode(cli k8sclient.Interface, nodeName string) (*corev1.Node, error) {
return cli.CoreV1().Nodes().Get(context.TODO(), nodeName, metav1.GetOptions{})
}

func (m *nfdMaster) getNodes() (*corev1.NodeList, error) {
return m.k8sClient.CoreV1().Nodes().List(context.TODO(), metav1.ListOptions{})
func getNodes(cli k8sclient.Interface) (*corev1.NodeList, error) {
return cli.CoreV1().Nodes().List(context.TODO(), metav1.ListOptions{})

Check warning on line 1431 in pkg/nfd-master/nfd-master.go

View check run for this annotation

Codecov / codecov/patch

pkg/nfd-master/nfd-master.go#L1430-L1431

Added lines #L1430 - L1431 were not covered by tests
}

func (m *nfdMaster) patchNode(nodeName string, patches []utils.JsonPatch, subresources ...string) error {
func patchNode(cli k8sclient.Interface, nodeName string, patches []utils.JsonPatch, subresources ...string) error {
if len(patches) == 0 {
return nil
}
data, err := json.Marshal(patches)
if err == nil {
_, err = m.k8sClient.CoreV1().Nodes().Patch(context.TODO(), nodeName, types.JSONPatchType, data, metav1.PatchOptions{}, subresources...)
_, err = cli.CoreV1().Nodes().Patch(context.TODO(), nodeName, types.JSONPatchType, data, metav1.PatchOptions{}, subresources...)
}
return err
}

func (m *nfdMaster) patchNodeStatus(nodeName string, patches []utils.JsonPatch) error {
return m.patchNode(nodeName, patches, "status")
func patchNodeStatus(cli k8sclient.Interface, nodeName string, patches []utils.JsonPatch) error {
return patchNode(cli, nodeName, patches, "status")
}
17 changes: 13 additions & 4 deletions pkg/nfd-master/node-updater-pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@

"golang.org/x/time/rate"
apierrors "k8s.io/apimachinery/pkg/api/errors"
k8sclient "k8s.io/client-go/kubernetes"
"k8s.io/client-go/util/workqueue"
"k8s.io/klog/v2"
)
Expand All @@ -41,7 +42,7 @@
}
}

func (u *nodeUpdaterPool) processNodeUpdateRequest(queue workqueue.RateLimitingInterface) bool {
func (u *nodeUpdaterPool) processNodeUpdateRequest(cli k8sclient.Interface, queue workqueue.RateLimitingInterface) bool {
n, quit := queue.Get()
if quit {
return false
Expand All @@ -53,9 +54,9 @@
nodeUpdateRequests.Inc()

// Check if node exists
if node, err := u.nfdMaster.getNode(nodeName); apierrors.IsNotFound(err) {
if node, err := getNode(cli, nodeName); apierrors.IsNotFound(err) {
klog.InfoS("node not found, skip update", "nodeName", nodeName)
} else if err := u.nfdMaster.nfdAPIUpdateOneNode(node); err != nil {
} else if err := u.nfdMaster.nfdAPIUpdateOneNode(cli, node); err != nil {
if n := queue.NumRequeues(nodeName); n < 15 {
klog.InfoS("retrying node update", "nodeName", nodeName, "lastError", err, "numRetries", n)
} else {
Expand All @@ -71,7 +72,15 @@
}

func (u *nodeUpdaterPool) runNodeUpdater(queue workqueue.RateLimitingInterface) {
for u.processNodeUpdateRequest(queue) {
var cli k8sclient.Interface
if u.nfdMaster.kubeconfig != nil {
marquiz marked this conversation as resolved.
Show resolved Hide resolved
// For normal execution, initialize a separate api client for each updater
cli = k8sclient.NewForConfigOrDie(u.nfdMaster.kubeconfig)

Check warning on line 78 in pkg/nfd-master/node-updater-pool.go

View check run for this annotation

Codecov / codecov/patch

pkg/nfd-master/node-updater-pool.go#L77-L78

Added lines #L77 - L78 were not covered by tests
} else {
// For tests, re-use the api client from nfd-master
cli = u.nfdMaster.k8sClient
}
for u.processNodeUpdateRequest(cli, queue) {
}
u.wg.Done()
}
Expand Down