diff --git a/pkg/nfd-master/nfd-master.go b/pkg/nfd-master/nfd-master.go index 4d7c3d13c0..2d1431bd93 100644 --- a/pkg/nfd-master/nfd-master.go +++ b/pkg/nfd-master/nfd-master.go @@ -87,14 +87,15 @@ type NfdMaster interface { type nfdMaster struct { *nfdController - args Args - namespace string - nodeName string - server *grpc.Server - stop chan struct{} - ready chan bool - apihelper apihelper.APIHelpers - kubeconfig *restclient.Config + args Args + namespace string + nodeName string + server *grpc.Server + stop chan struct{} + ready chan bool + apihelper apihelper.APIHelpers + kubeconfig *restclient.Config + nrtGarbageCollector *nrtGarbageCollector } // NewNfdMaster creates a new NfdMaster server instance. @@ -152,12 +153,23 @@ func (m *nfdMaster) Run() error { if m.args.Prune { return m.prune() } + kubeconfig, err := m.getKubeconfig() + if err != nil { + return err + } + nrtGC, err := newNRTGarbageCollector(kubeconfig) + if err != nil { + return err + } + + m.nrtGarbageCollector = nrtGC + + klog.Info("starting nfd nrt garbage collector") + if err := m.nrtGarbageCollector.start(); err != nil { + return err + } if m.args.CrdController { - kubeconfig, err := m.getKubeconfig() - if err != nil { - return err - } klog.Info("starting nfd api controller") m.nfdController, err = newNfdController(kubeconfig, !m.args.EnableNodeFeatureApi) if err != nil { @@ -300,6 +312,8 @@ func (m *nfdMaster) Stop() { m.nfdController.stop() } + m.nrtGarbageCollector.stop() + select { case m.stop <- struct{}{}: default: diff --git a/pkg/nfd-master/nfd-nrt-gc.go b/pkg/nfd-master/nfd-nrt-gc.go new file mode 100644 index 0000000000..73c1a5c798 --- /dev/null +++ b/pkg/nfd-master/nfd-nrt-gc.go @@ -0,0 +1,131 @@ +/* +Copyright 2022 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package nfdmaster + +import ( + "time" + + topologyclientset "github.com/k8stopologyawareschedwg/noderesourcetopology-api/pkg/generated/clientset/versioned" + "golang.org/x/net/context" + corev1 "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/api/errors" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/client-go/informers" + "k8s.io/client-go/kubernetes" + restclient "k8s.io/client-go/rest" + "k8s.io/client-go/tools/cache" + "k8s.io/klog/v2" + + "sigs.k8s.io/node-feature-discovery/pkg/apihelper" +) + +type nrtGarbageCollector struct { + stopChan chan struct{} + k8sClient kubernetes.Interface + topoClient topologyclientset.Interface +} + +func newNRTGarbageCollector(config *restclient.Config) (*nrtGarbageCollector, error) { + helper := apihelper.K8sHelpers{Kubeconfig: config} + cli, err := helper.GetTopologyClient() + if err != nil { + return nil, err + } + + clientset := kubernetes.NewForConfigOrDie(config) + stopChan := make(chan struct{}, 1) + + return &nrtGarbageCollector{ + k8sClient: clientset, + topoClient: cli, + stopChan: stopChan, + }, nil +} + +func (n *nrtGarbageCollector) deleteNodeHandler(object interface{}) { + // handle a case when we are starting up and need to clear stale NRT resources + obj := object + if deletedFinalStateUnknown, ok := object.(cache.DeletedFinalStateUnknown); ok { + klog.V(2).Infof("found stale NodeResourceTopology for node: %s ", object) + obj = deletedFinalStateUnknown.Obj + } + + node, ok := obj.(*corev1.Node) + if !ok { + klog.Errorf("cannot convert %v to v1.Node", object) + return + } + klog.Infof("deleting NodeResourceTopology for node: %s", node.GetName()) + if err := n.topoClient.TopologyV1alpha1().NodeResourceTopologies().Delete(context.TODO(), node.GetName(), metav1.DeleteOptions{}); err != nil { + if errors.IsNotFound(err) { + klog.V(2).Infof("NodeResourceTopology %s not found, ommiting deletion", object) + return + } else { + klog.Warningf("failed to delete NodeResourceTopology for node %s: %s", node.GetName(), err.Error()) + return + } + } +} + +// populateNodeIndexer populates cache with NRTs so we know which one to delete on first synchronization +func (n *nrtGarbageCollector) populateNodeIndexer(indexer cache.Indexer) error { + nrts, err := n.topoClient.TopologyV1alpha1().NodeResourceTopologies().List(context.TODO(), metav1.ListOptions{}) + if err != nil { + return err + } + + for _, nrt := range nrts.Items { + node := &corev1.Node{ + ObjectMeta: metav1.ObjectMeta{ + Name: nrt.GetName(), + }, + } + klog.V(2).Infof("adding node %s to indexer", node.GetName()) + err := indexer.Add(node) + if err != nil { + return err + } + + } + + return nil +} + +func (n *nrtGarbageCollector) start() error { + factory := informers.NewSharedInformerFactory(n.k8sClient, 5*time.Minute) + nodeInformer := factory.Core().V1().Nodes().Informer() + + nodeInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{ + DeleteFunc: n.deleteNodeHandler, + }) + + if err := n.populateNodeIndexer(nodeInformer.GetIndexer()); err != nil { + return err + } + + factory.Start(n.stopChan) + factory.WaitForCacheSync(n.stopChan) + + return nil +} + +func (n *nrtGarbageCollector) stop() { + select { + case n.stopChan <- struct{}{}: + default: + } +} diff --git a/pkg/nfd-master/nfd-nrt-gc_test.go b/pkg/nfd-master/nfd-nrt-gc_test.go new file mode 100644 index 0000000000..39c3ca7f61 --- /dev/null +++ b/pkg/nfd-master/nfd-nrt-gc_test.go @@ -0,0 +1,154 @@ +/* +Copyright 2022 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package nfdmaster + +import ( + "testing" + "time" + + nrtapi "github.com/k8stopologyawareschedwg/noderesourcetopology-api/pkg/apis/topology/v1alpha1" + faketopologyv1alpha1 "github.com/k8stopologyawareschedwg/noderesourcetopology-api/pkg/generated/clientset/versioned/fake" + "golang.org/x/net/context" + corev1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + fakek8sclientset "k8s.io/client-go/kubernetes/fake" + + . "github.com/smartystreets/goconvey/convey" +) + +func TestInformer(t *testing.T) { + Convey("When theres is old NRT ", t, func() { + k8sClient := fakek8sclientset.NewSimpleClientset() + + fakeClient := faketopologyv1alpha1.NewSimpleClientset(&nrtapi.NodeResourceTopology{ + ObjectMeta: metav1.ObjectMeta{ + Name: "node1", + }, + }) + + stopChan := make(chan struct{}, 1) + + nrtGC := &nrtGarbageCollector{ + k8sClient: k8sClient, + topoClient: fakeClient, + stopChan: stopChan, + } + + err := nrtGC.start() + So(err, ShouldBeNil) + + nrts, err := fakeClient.TopologyV1alpha1().NodeResourceTopologies().List(context.TODO(), metav1.ListOptions{}) + So(err, ShouldBeNil) + So(nrts.Items, ShouldHaveLength, 0) + }) + Convey("When theres is one old NRT and one up to date", t, func() { + k8sClient := fakek8sclientset.NewSimpleClientset(&corev1.Node{ + ObjectMeta: metav1.ObjectMeta{ + Name: "node1", + }, + }) + + fakeClient := faketopologyv1alpha1.NewSimpleClientset(&nrtapi.NodeResourceTopology{ + ObjectMeta: metav1.ObjectMeta{ + Name: "node1", + }, + }, + &nrtapi.NodeResourceTopology{ + ObjectMeta: metav1.ObjectMeta{ + Name: "node2", + }, + }, + ) + + stopChan := make(chan struct{}, 1) + + nrtGC := &nrtGarbageCollector{ + k8sClient: k8sClient, + topoClient: fakeClient, + stopChan: stopChan, + } + + err := nrtGC.start() + So(err, ShouldBeNil) + + nrts, err := fakeClient.TopologyV1alpha1().NodeResourceTopologies().List(context.TODO(), metav1.ListOptions{}) + So(err, ShouldBeNil) + So(nrts.Items, ShouldHaveLength, 1) + So(nrts.Items[0].GetName(), ShouldEqual, "node1") + + }) + Convey("Should react to delete event", t, func() { + k8sClient := fakek8sclientset.NewSimpleClientset( + &corev1.Node{ + ObjectMeta: metav1.ObjectMeta{ + Name: "node1", + }, + }, + &corev1.Node{ + ObjectMeta: metav1.ObjectMeta{ + Name: "node2", + }, + }, + ) + + fakeClient := faketopologyv1alpha1.NewSimpleClientset( + &nrtapi.NodeResourceTopology{ + ObjectMeta: metav1.ObjectMeta{ + Name: "node1", + }, + }, + &nrtapi.NodeResourceTopology{ + ObjectMeta: metav1.ObjectMeta{ + Name: "node2", + }, + }, + ) + + stopChan := make(chan struct{}, 1) + + nrtGC := &nrtGarbageCollector{ + k8sClient: k8sClient, + topoClient: fakeClient, + stopChan: stopChan, + } + + err := nrtGC.start() + So(err, ShouldBeNil) + + nrts, err := fakeClient.TopologyV1alpha1().NodeResourceTopologies().List(context.TODO(), metav1.ListOptions{}) + So(err, ShouldBeNil) + + So(nrts.Items, ShouldHaveLength, 2) + + err = k8sClient.CoreV1().Nodes().Delete(context.TODO(), "node1", metav1.DeleteOptions{}) + So(err, ShouldBeNil) + // simple sleep with retry loop to make sure indexer will pick up event and trigger deleteNode Function + deleted := false + for i := 0; i < 5; i++ { + nrts, err := fakeClient.TopologyV1alpha1().NodeResourceTopologies().List(context.TODO(), metav1.ListOptions{}) + So(err, ShouldBeNil) + + if len(nrts.Items) == 1 { + deleted = true + break + } + time.Sleep(time.Second) + } + So(deleted, ShouldBeTrue) + }) + +}