Skip to content

Commit

Permalink
nfd-master: use separate k8s api clients for each updater
Browse files Browse the repository at this point in the history
Sharing the same client between updater threads virtually serializes
access, in practice making the effective parallelism close to 1.

With this patch, in my bench cluster of 300 nodes, the time taken by
updating all nodes drops from ~2 minutes to ~12 seconds (with the
default parallelism of 10 node updater threads). This demonstrates the
10-fold increased parallelism from ~1 to 10.

There might be other solutions that could be explored, e.g. caching
nodes with an indexer/lister but otoh nfd doesn't necessarily need/want
to watch every little change in each node. We only need to get the node
when something in our own CRDs change (we don't react to any changes in
the node object itself). Using multiple clients was the most obvious
choice to solve the problem for now.
  • Loading branch information
marquiz committed Apr 4, 2024
1 parent 1ed50d2 commit 983af1b
Show file tree
Hide file tree
Showing 4 changed files with 48 additions and 33 deletions.
4 changes: 2 additions & 2 deletions pkg/nfd-master/nfd-master-internal_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -157,7 +157,7 @@ func TestUpdateNodeObject(t *testing.T) {
fakeMaster := newFakeMaster(fakeCli)

Convey("When I successfully update the node with feature labels", func() {
err := fakeMaster.updateNodeObject(testNode, featureLabels, featureAnnotations, featureExtResources, nil)
err := fakeMaster.updateNodeObject(fakeCli, testNode, featureLabels, featureAnnotations, featureExtResources, nil)
Convey("Error is nil", func() {
So(err, ShouldBeNil)
})
Expand Down Expand Up @@ -189,7 +189,7 @@ func TestUpdateNodeObject(t *testing.T) {
fakeCli.CoreV1().(*fakecorev1client.FakeCoreV1).PrependReactor("patch", "nodes", func(action clienttesting.Action) (handled bool, ret runtime.Object, err error) {
return true, &v1.Node{}, errors.New("Fake error when patching node")
})
err := fakeMaster.updateNodeObject(testNode, nil, featureAnnotations, ExtendedResources{"": ""}, nil)
err := fakeMaster.updateNodeObject(fakeCli, testNode, nil, featureAnnotations, ExtendedResources{"": ""}, nil)

Convey("Error is produced", func() {
So(err, ShouldBeError)
Expand Down
57 changes: 30 additions & 27 deletions pkg/nfd-master/nfd-master.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ import (
k8sLabels "k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/types"
k8sclient "k8s.io/client-go/kubernetes"
restclient "k8s.io/client-go/rest"
"k8s.io/client-go/tools/leaderelection"
"k8s.io/client-go/tools/leaderelection/resourcelock"
"k8s.io/klog/v2"
Expand Down Expand Up @@ -150,6 +151,7 @@ type nfdMaster struct {
healthServer *grpc.Server
stop chan struct{}
ready chan bool
kubeconfig *restclient.Config
k8sClient k8sclient.Interface
nodeUpdaterPool *nodeUpdaterPool
deniedNs
Expand Down Expand Up @@ -494,7 +496,7 @@ func (m *nfdMaster) prune() error {
return nil
}

nodes, err := m.getNodes()
nodes, err := getNodes(m.k8sClient)

Check warning on line 499 in pkg/nfd-master/nfd-master.go

View check run for this annotation

Codecov / codecov/patch

pkg/nfd-master/nfd-master.go#L499

Added line #L499 was not covered by tests
if err != nil {
return err
}
Expand All @@ -503,14 +505,14 @@ func (m *nfdMaster) prune() error {
klog.InfoS("pruning node...", "nodeName", node.Name)

// Prune labels and extended resources
err := m.updateNodeObject(&node, Labels{}, Annotations{}, ExtendedResources{}, []corev1.Taint{})
err := m.updateNodeObject(m.k8sClient, &node, Labels{}, Annotations{}, ExtendedResources{}, []corev1.Taint{})

Check warning on line 508 in pkg/nfd-master/nfd-master.go

View check run for this annotation

Codecov / codecov/patch

pkg/nfd-master/nfd-master.go#L508

Added line #L508 was not covered by tests
if err != nil {
nodeUpdateFailures.Inc()
return fmt.Errorf("failed to prune node %q: %v", node.Name, err)
}

// Prune annotations
node, err := m.getNode(node.Name)
node, err := getNode(m.k8sClient, node.Name)

Check warning on line 515 in pkg/nfd-master/nfd-master.go

View check run for this annotation

Codecov / codecov/patch

pkg/nfd-master/nfd-master.go#L515

Added line #L515 was not covered by tests
if err != nil {
return err
}
Expand All @@ -530,7 +532,7 @@ func (m *nfdMaster) prune() error {
// "nfd.node.kubernetes.io/master.version" annotation, if it exists.
// TODO: Drop when nfdv1alpha1.MasterVersionAnnotation is removed.
func (m *nfdMaster) updateMasterNode() error {
node, err := m.getNode(m.nodeName)
node, err := getNode(m.k8sClient, m.nodeName)
if err != nil {
return err
}
Expand All @@ -541,7 +543,7 @@ func (m *nfdMaster) updateMasterNode() error {
nil,
"/metadata/annotations")

err = m.patchNode(node.Name, p)
err = patchNode(m.k8sClient, node.Name, p)
if err != nil {
return fmt.Errorf("failed to patch node annotations: %w", err)
}
Expand Down Expand Up @@ -693,12 +695,12 @@ func (m *nfdMaster) SetLabels(c context.Context, r *pb.SetLabelsRequest) (*pb.Se
}
if !m.config.NoPublish {
// Fetch the node object.
node, err := m.getNode(r.NodeName)
node, err := getNode(m.k8sClient, r.NodeName)
if err != nil {
return &pb.SetLabelsReply{}, err
}
// Create labels et al
if err := m.refreshNodeFeatures(node, r.GetLabels(), r.GetFeatures()); err != nil {
if err := m.refreshNodeFeatures(m.k8sClient, node, r.GetLabels(), r.GetFeatures()); err != nil {
nodeUpdateFailures.Inc()
return &pb.SetLabelsReply{}, err
}
Expand All @@ -709,7 +711,7 @@ func (m *nfdMaster) SetLabels(c context.Context, r *pb.SetLabelsRequest) (*pb.Se
func (m *nfdMaster) nfdAPIUpdateAllNodes() error {
klog.InfoS("will process all nodes in the cluster")

nodes, err := m.getNodes()
nodes, err := getNodes(m.k8sClient)

Check warning on line 714 in pkg/nfd-master/nfd-master.go

View check run for this annotation

Codecov / codecov/patch

pkg/nfd-master/nfd-master.go#L714

Added line #L714 was not covered by tests
if err != nil {
return err
}
Expand All @@ -721,7 +723,7 @@ func (m *nfdMaster) nfdAPIUpdateAllNodes() error {
return nil
}

func (m *nfdMaster) nfdAPIUpdateOneNode(node *corev1.Node) error {
func (m *nfdMaster) nfdAPIUpdateOneNode(cli k8sclient.Interface, node *corev1.Node) error {

Check warning on line 726 in pkg/nfd-master/nfd-master.go

View check run for this annotation

Codecov / codecov/patch

pkg/nfd-master/nfd-master.go#L726

Added line #L726 was not covered by tests
if m.nfdController == nil || m.nfdController.featureLister == nil {
return nil
}
Expand Down Expand Up @@ -776,7 +778,7 @@ func (m *nfdMaster) nfdAPIUpdateOneNode(node *corev1.Node) error {
// Update node labels et al. This may also mean removing all NFD-owned
// labels (et al.), for example in the case no NodeFeature objects are
// present.
if err := m.refreshNodeFeatures(node, features.Labels, &features.Features); err != nil {
if err := m.refreshNodeFeatures(cli, node, features.Labels, &features.Features); err != nil {

Check warning on line 781 in pkg/nfd-master/nfd-master.go

View check run for this annotation

Codecov / codecov/patch

pkg/nfd-master/nfd-master.go#L781

Added line #L781 was not covered by tests
return err
}

Expand Down Expand Up @@ -821,7 +823,7 @@ func filterExtendedResource(name, value string, features *nfdv1alpha1.Features)
return filteredValue, nil
}

func (m *nfdMaster) refreshNodeFeatures(node *corev1.Node, labels map[string]string, features *nfdv1alpha1.Features) error {
func (m *nfdMaster) refreshNodeFeatures(cli k8sclient.Interface, node *corev1.Node, labels map[string]string, features *nfdv1alpha1.Features) error {
if m.config.AutoDefaultNs {
labels = addNsToMapKeys(labels, nfdv1alpha1.FeatureLabelNs)
} else if labels == nil {
Expand Down Expand Up @@ -855,7 +857,7 @@ func (m *nfdMaster) refreshNodeFeatures(node *corev1.Node, labels map[string]str
return nil
}

Check warning on line 858 in pkg/nfd-master/nfd-master.go

View check run for this annotation

Codecov / codecov/patch

pkg/nfd-master/nfd-master.go#L856-L858

Added lines #L856 - L858 were not covered by tests

err := m.updateNodeObject(node, labels, annotations, extendedResources, taints)
err := m.updateNodeObject(cli, node, labels, annotations, extendedResources, taints)
if err != nil {
klog.ErrorS(err, "failed to update node", "nodeName", node.Name)
return err
Expand All @@ -867,7 +869,7 @@ func (m *nfdMaster) refreshNodeFeatures(node *corev1.Node, labels map[string]str
// setTaints sets node taints and annotations based on the taints passed via
// nodeFeatureRule custom resorce. If empty list of taints is passed, currently
// NFD owned taints and annotations are removed from the node.
func (m *nfdMaster) setTaints(taints []corev1.Taint, node *corev1.Node) error {
func setTaints(cli k8sclient.Interface, taints []corev1.Taint, node *corev1.Node) error {
// De-serialize the taints annotation into corev1.Taint type for comparision below.
var err error
oldTaints := []corev1.Taint{}
Expand Down Expand Up @@ -906,7 +908,7 @@ func (m *nfdMaster) setTaints(taints []corev1.Taint, node *corev1.Node) error {
}

if taintsUpdated {
if err := controller.PatchNodeTaints(context.TODO(), m.k8sClient, node.Name, node, newNode); err != nil {
if err := controller.PatchNodeTaints(context.TODO(), cli, node.Name, node, newNode); err != nil {

Check warning on line 911 in pkg/nfd-master/nfd-master.go

View check run for this annotation

Codecov / codecov/patch

pkg/nfd-master/nfd-master.go#L911

Added line #L911 was not covered by tests
return fmt.Errorf("failed to patch the node %v", node.Name)
}
klog.InfoS("updated node taints", "nodeName", node.Name)
Expand All @@ -926,7 +928,7 @@ func (m *nfdMaster) setTaints(taints []corev1.Taint, node *corev1.Node) error {

patches := createPatches([]string{nfdv1alpha1.NodeTaintsAnnotation}, node.Annotations, newAnnotations, "/metadata/annotations")
if len(patches) > 0 {
if err := m.patchNode(node.Name, patches); err != nil {
if err := patchNode(cli, node.Name, patches); err != nil {

Check warning on line 931 in pkg/nfd-master/nfd-master.go

View check run for this annotation

Codecov / codecov/patch

pkg/nfd-master/nfd-master.go#L931

Added line #L931 was not covered by tests
return fmt.Errorf("error while patching node object: %w", err)
}
klog.V(1).InfoS("patched node annotations for taints", "nodeName", node.Name)
Expand Down Expand Up @@ -1023,7 +1025,7 @@ func (m *nfdMaster) processNodeFeatureRule(nodeName string, features *nfdv1alpha
// updateNodeObject ensures the Kubernetes node object is up to date,
// creating new labels and extended resources where necessary and removing
// outdated ones. Also updates the corresponding annotations.
func (m *nfdMaster) updateNodeObject(node *corev1.Node, labels Labels, featureAnnotations Annotations, extendedResources ExtendedResources, taints []corev1.Taint) error {
func (m *nfdMaster) updateNodeObject(cli k8sclient.Interface, node *corev1.Node, labels Labels, featureAnnotations Annotations, extendedResources ExtendedResources, taints []corev1.Taint) error {
annotations := make(Annotations)

// Store names of labels in an annotation
Expand Down Expand Up @@ -1076,13 +1078,13 @@ func (m *nfdMaster) updateNodeObject(node *corev1.Node, labels Labels, featureAn

// patch node status with extended resource changes
statusPatches := m.createExtendedResourcePatches(node, extendedResources)
err := m.patchNodeStatus(node.Name, statusPatches)
err := patchNodeStatus(cli, node.Name, statusPatches)
if err != nil {
return fmt.Errorf("error while patching extended resources: %w", err)
}

// Patch the node object in the apiserver
err = m.patchNode(node.Name, patches)
err = patchNode(cli, node.Name, patches)
if err != nil {
return fmt.Errorf("error while patching node object: %w", err)
}
Expand All @@ -1095,7 +1097,7 @@ func (m *nfdMaster) updateNodeObject(node *corev1.Node, labels Labels, featureAn
}

// Set taints
err = m.setTaints(taints, node)
err = setTaints(cli, taints, node)
if err != nil {
return err
}
Expand Down Expand Up @@ -1234,6 +1236,7 @@ func (m *nfdMaster) configure(filepath string, overrides string) error {
if err != nil {
return err
}

Check warning on line 1238 in pkg/nfd-master/nfd-master.go

View check run for this annotation

Codecov / codecov/patch

pkg/nfd-master/nfd-master.go#L1237-L1238

Added lines #L1237 - L1238 were not covered by tests
m.kubeconfig = kubeconfig
cli, err := k8sclient.NewForConfig(kubeconfig)
if err != nil {
return err

Check warning on line 1242 in pkg/nfd-master/nfd-master.go

View check run for this annotation

Codecov / codecov/patch

pkg/nfd-master/nfd-master.go#L1242

Added line #L1242 was not covered by tests
Expand Down Expand Up @@ -1396,25 +1399,25 @@ func (m *nfdMaster) filterFeatureAnnotations(annotations map[string]string) map[
return outAnnotations
}

func (m *nfdMaster) getNode(nodeName string) (*corev1.Node, error) {
return m.k8sClient.CoreV1().Nodes().Get(context.TODO(), nodeName, metav1.GetOptions{})
func getNode(cli k8sclient.Interface, nodeName string) (*corev1.Node, error) {
return cli.CoreV1().Nodes().Get(context.TODO(), nodeName, metav1.GetOptions{})
}

func (m *nfdMaster) getNodes() (*corev1.NodeList, error) {
return m.k8sClient.CoreV1().Nodes().List(context.TODO(), metav1.ListOptions{})
func getNodes(cli k8sclient.Interface) (*corev1.NodeList, error) {
return cli.CoreV1().Nodes().List(context.TODO(), metav1.ListOptions{})

Check warning on line 1407 in pkg/nfd-master/nfd-master.go

View check run for this annotation

Codecov / codecov/patch

pkg/nfd-master/nfd-master.go#L1406-L1407

Added lines #L1406 - L1407 were not covered by tests
}

func (m *nfdMaster) patchNode(nodeName string, patches []utils.JsonPatch, subresources ...string) error {
func patchNode(cli k8sclient.Interface, nodeName string, patches []utils.JsonPatch, subresources ...string) error {
if len(patches) == 0 {
return nil
}
data, err := json.Marshal(patches)
if err == nil {
_, err = m.k8sClient.CoreV1().Nodes().Patch(context.TODO(), nodeName, types.JSONPatchType, data, metav1.PatchOptions{}, subresources...)
_, err = cli.CoreV1().Nodes().Patch(context.TODO(), nodeName, types.JSONPatchType, data, metav1.PatchOptions{}, subresources...)
}
return err
}

func (m *nfdMaster) patchNodeStatus(nodeName string, patches []utils.JsonPatch) error {
return m.patchNode(nodeName, patches, "status")
func patchNodeStatus(cli k8sclient.Interface, nodeName string, patches []utils.JsonPatch) error {
return patchNode(cli, nodeName, patches, "status")
}
10 changes: 6 additions & 4 deletions pkg/nfd-master/node-updater-pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (

"golang.org/x/time/rate"
apierrors "k8s.io/apimachinery/pkg/api/errors"
k8sclient "k8s.io/client-go/kubernetes"
"k8s.io/client-go/util/workqueue"
"k8s.io/klog/v2"
)
Expand All @@ -41,7 +42,7 @@ func newNodeUpdaterPool(nfdMaster *nfdMaster) *nodeUpdaterPool {
}
}

func (u *nodeUpdaterPool) processNodeUpdateRequest(queue workqueue.RateLimitingInterface) bool {
func (u *nodeUpdaterPool) processNodeUpdateRequest(cli k8sclient.Interface, queue workqueue.RateLimitingInterface) bool {
n, quit := queue.Get()
if quit {
return false
Expand All @@ -53,9 +54,9 @@ func (u *nodeUpdaterPool) processNodeUpdateRequest(queue workqueue.RateLimitingI
nodeUpdateRequests.Inc()

// Check if node exists
if node, err := u.nfdMaster.getNode(nodeName); apierrors.IsNotFound(err) {
if node, err := getNode(cli, nodeName); apierrors.IsNotFound(err) {
klog.InfoS("node not found, skip update", "nodeName", nodeName)
} else if err := u.nfdMaster.nfdAPIUpdateOneNode(node); err != nil {
} else if err := u.nfdMaster.nfdAPIUpdateOneNode(cli, node); err != nil {

Check warning on line 59 in pkg/nfd-master/node-updater-pool.go

View check run for this annotation

Codecov / codecov/patch

pkg/nfd-master/node-updater-pool.go#L59

Added line #L59 was not covered by tests
if n := queue.NumRequeues(nodeName); n < 15 {
klog.InfoS("retrying node update", "nodeName", nodeName, "lastError", err, "numRetries", n)
} else {
Expand All @@ -71,7 +72,8 @@ func (u *nodeUpdaterPool) processNodeUpdateRequest(queue workqueue.RateLimitingI
}

func (u *nodeUpdaterPool) runNodeUpdater(queue workqueue.RateLimitingInterface) {
for u.processNodeUpdateRequest(queue) {
cli := k8sclient.NewForConfigOrDie(u.nfdMaster.kubeconfig)
for u.processNodeUpdateRequest(cli, queue) {
}
u.wg.Done()
}
Expand Down
10 changes: 10 additions & 0 deletions pkg/nfd-master/node-updater-pool_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (

. "github.com/smartystreets/goconvey/convey"
fakek8sclient "k8s.io/client-go/kubernetes/fake"
"k8s.io/client-go/tools/clientcmd"
fakenfdclient "sigs.k8s.io/node-feature-discovery/pkg/generated/clientset/versioned/fake"
)

Expand All @@ -35,6 +36,9 @@ func newFakeNodeUpdaterPool(nfdMaster *nfdMaster) *nodeUpdaterPool {

func TestNodeUpdaterStart(t *testing.T) {
fakeMaster := newFakeMaster(nil)
kubeconfig, _ := clientcmd.BuildConfigFromFlags(".", "")
fakeMaster.kubeconfig = kubeconfig

nodeUpdaterPool := newFakeNodeUpdaterPool(fakeMaster)

Convey("When starting the node updater pool", t, func() {
Expand All @@ -54,6 +58,9 @@ func TestNodeUpdaterStart(t *testing.T) {

func TestNodeUpdaterStop(t *testing.T) {
fakeMaster := newFakeMaster(nil)
kubeconfig, _ := clientcmd.BuildConfigFromFlags(".", "")
fakeMaster.kubeconfig = kubeconfig

nodeUpdaterPool := newFakeNodeUpdaterPool(fakeMaster)

nodeUpdaterPool.start(10)
Expand All @@ -71,7 +78,10 @@ func TestNodeUpdaterStop(t *testing.T) {

func TestRunNodeUpdater(t *testing.T) {
fakeMaster := newFakeMaster(fakek8sclient.NewSimpleClientset())
kubeconfig, _ := clientcmd.BuildConfigFromFlags(".", "")
fakeMaster.kubeconfig = kubeconfig
fakeMaster.nfdController = newFakeNfdAPIController(fakenfdclient.NewSimpleClientset())

nodeUpdaterPool := newFakeNodeUpdaterPool(fakeMaster)

nodeUpdaterPool.start(10)
Expand Down

0 comments on commit 983af1b

Please sign in to comment.