This repository was archived by the owner on Oct 6, 2025. It is now read-only.

Commit 093236d

fix: handle deleted namespaces gracefully during sync
Implement automatic detection and removal of deleted namespaces to prevent infinite sync failure loops when namespaces are deleted without removing the managed-by label first.
1 parent 9068f90 · commit 093236d

File tree: 2 files changed (+215 −13 lines)
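
In short, the change adds a namespace-existence probe that runs while each API is processed, records namespaces that have disappeared in a shared `sync.Map`, and prunes them from the cache's namespace list once the sync completes. The sketch below condenses that flow from the diff that follows; names mirror the diff, while the cache's logger and the non-NotFound error branch are omitted.

```go
// Package sketch condenses the logic this commit adds to pkg/cache/cluster.go.
package sketch

import (
	"context"
	"sync"

	apierrors "k8s.io/apimachinery/pkg/api/errors"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/client-go/dynamic"
)

// namespaceExists probes a namespace and records it in deleted when the API
// server reports NotFound. Any other outcome is treated as "still exists".
func namespaceExists(ctx context.Context, nsClient dynamic.NamespaceableResourceInterface, namespace string, deleted *sync.Map) bool {
	if namespace == "" {
		return true // cluster-wide operations don't need namespace validation
	}
	if _, err := nsClient.Get(ctx, namespace, metav1.GetOptions{}); apierrors.IsNotFound(err) {
		deleted.Store(namespace, true)
		return false
	}
	return true
}

// pruneDeleted rebuilds the namespace list after sync, dropping namespaces
// that were marked deleted during parallel API processing.
func pruneDeleted(namespaces []string, deleted *sync.Map) []string {
	var valid []string
	for _, ns := range namespaces {
		if _, gone := deleted.Load(ns); gone {
			continue
		}
		valid = append(valid, ns)
	}
	return valid
}
```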

pkg/cache/cluster.go

Lines changed: 51 additions & 3 deletions
@@ -247,6 +247,24 @@ type clusterCache struct {
 	respectRBAC int
 }
 
+func (c *clusterCache) namespaceExists(ctx context.Context, nsClient dynamic.NamespaceableResourceInterface, namespace string, deletedNamespaces *sync.Map) bool {
+	if namespace == "" {
+		return true // Cluster-wide operations don't need namespace validation
+	}
+
+	_, err := nsClient.Get(ctx, namespace, metav1.GetOptions{})
+	if err != nil {
+		if apierrors.IsNotFound(err) {
+			c.log.Info(fmt.Sprintf("Namespace '%s' no longer exists, marking for removal", namespace))
+			deletedNamespaces.Store(namespace, true)
+			return false
+		}
+		c.log.V(1).Info(fmt.Sprintf("Failed to get namespace '%s' existence: %v", namespace, err))
+	}
+
+	return true
+}
+
 type clusterCacheSync struct {
 	// When using this struct:
 	// 1) 'lock' mutex should be acquired when reading/writing from fields of this struct.
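
A note on the design: `deletedNamespaces` is a `sync.Map` because `sync()` processes APIs in parallel via `kube.RunAllAsync` (see the hunks below), so multiple goroutines may mark namespaces concurrently while a single goroutine filters the list afterwards. A minimal, standalone illustration of that store-then-filter pattern (the namespace names here are made up):

```go
package main

import (
	"fmt"
	"sync"
)

func main() {
	deleted := &sync.Map{}
	namespaces := []string{"default", "deleted-namespace", "production"}

	// Many workers may record deleted namespaces at once, mirroring how
	// parallel API processing shares one map in the cluster cache.
	var wg sync.WaitGroup
	for _, ns := range namespaces {
		wg.Add(1)
		go func(ns string) {
			defer wg.Done()
			if ns == "deleted-namespace" { // pretend the namespace Get returned NotFound
				deleted.Store(ns, true)
			}
		}(ns)
	}
	wg.Wait()

	// A single goroutine then rebuilds the list, as sync() does after RunAllAsync.
	var valid []string
	for _, ns := range namespaces {
		if _, gone := deleted.Load(ns); gone {
			continue
		}
		valid = append(valid, ns)
	}
	fmt.Println(valid) // [default production]
}
```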
@@ -533,14 +551,18 @@ func (c *clusterCache) startMissingWatches() error {
 		return fmt.Errorf("failed to create clientset: %w", err)
 	}
 	namespacedResources := make(map[schema.GroupKind]bool)
+
+	// For watch startup, we don't update namespaces list, so use empty map
+	deletedNamespaces := &sync.Map{}
+
 	for i := range apis {
 		api := apis[i]
 		namespacedResources[api.GroupKind] = api.Meta.Namespaced
 		if _, ok := c.apisMeta[api.GroupKind]; !ok {
 			ctx, cancel := context.WithCancel(context.Background())
 			c.apisMeta[api.GroupKind] = &apiMeta{namespaced: api.Meta.Namespaced, watchCancel: cancel}
 
-			err := c.processApi(client, api, func(resClient dynamic.ResourceInterface, ns string) error {
+			err := c.processApi(client, api, deletedNamespaces, func(resClient dynamic.ResourceInterface, ns string) error {
 				resourceVersion, err := c.loadInitialState(ctx, api, resClient, ns, false) // don't lock here, we are already in a lock before startMissingWatches is called inside watchEvents
 				if err != nil && c.isRestrictedResource(err) {
 					keep := false
@@ -786,15 +808,26 @@ func (c *clusterCache) watchEvents(ctx context.Context, api kube.APIResourceInfo
 // processApi processes all the resources for a given API. First we construct an API client for the given API. Then we
 // call the callback. If we're managing the whole cluster, we call the callback with the client and an empty namespace.
 // If we're managing specific namespaces, we call the callback for each namespace.
-func (c *clusterCache) processApi(client dynamic.Interface, api kube.APIResourceInfo, callback func(resClient dynamic.ResourceInterface, ns string) error) error {
+func (c *clusterCache) processApi(client dynamic.Interface, api kube.APIResourceInfo, deletedNamespaces *sync.Map, callback func(resClient dynamic.ResourceInterface, ns string) error) error {
 	resClient := client.Resource(api.GroupVersionResource)
 	switch {
 	// if manage whole cluster or resource is cluster level and cluster resources enabled
 	case len(c.namespaces) == 0 || (!api.Meta.Namespaced && c.clusterResources):
 		return callback(resClient, "")
 	// if manage some namespaces and resource is namespaced
 	case len(c.namespaces) != 0 && api.Meta.Namespaced:
+		nsClient := client.Resource(schema.GroupVersionResource{
+			Group:    "",
+			Version:  "v1",
+			Resource: "namespaces",
+		})
+
 		for _, ns := range c.namespaces {
+			if !c.namespaceExists(context.Background(), nsClient, ns, deletedNamespaces) {
+				// Namespace was deleted, skip it (deletedNamespaces map tracks it for later cleanup)
+				continue
+			}
+
 			err := callback(resClient.Namespace(ns), ns)
 			if err != nil {
 				return err
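
For context on the `nsClient` built above: the core `namespaces` resource is addressed with an empty group and version `v1`, and a single `Get` distinguishes a deleted namespace (NotFound) from everything else. A hedged, standalone sketch of the same probe against client-go's fake dynamic client (the object set here is illustrative):

```go
package main

import (
	"context"
	"fmt"

	corev1 "k8s.io/api/core/v1"
	apierrors "k8s.io/apimachinery/pkg/api/errors"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/runtime"
	"k8s.io/apimachinery/pkg/runtime/schema"
	dynamicfake "k8s.io/client-go/dynamic/fake"
)

func main() {
	scheme := runtime.NewScheme()
	_ = corev1.AddToScheme(scheme)

	// Fake cluster that only knows the "default" namespace.
	client := dynamicfake.NewSimpleDynamicClient(scheme, &corev1.Namespace{
		TypeMeta:   metav1.TypeMeta{APIVersion: "v1", Kind: "Namespace"},
		ObjectMeta: metav1.ObjectMeta{Name: "default"},
	})

	nsClient := client.Resource(schema.GroupVersionResource{Group: "", Version: "v1", Resource: "namespaces"})

	for _, ns := range []string{"default", "deleted-namespace"} {
		_, err := nsClient.Get(context.Background(), ns, metav1.GetOptions{})
		fmt.Printf("%s exists: %v\n", ns, !apierrors.IsNotFound(err))
	}
}
```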
@@ -882,6 +915,7 @@ func (c *clusterCache) sync() error {
 	c.resources = make(map[kube.ResourceKey]*Resource)
 	c.namespacedResources = make(map[schema.GroupKind]bool)
 	config := c.config
+
 	version, err := c.kubectl.GetServerVersion(config)
 	if err != nil {
 		return fmt.Errorf("failed to get server version: %w", err)
@@ -921,6 +955,9 @@ func (c *clusterCache) sync() error {
 		go c.processEvents()
 	}
 
+	// Track deleted namespaces found during parallel processing
+	deletedNamespaces := &sync.Map{}
+
 	// Each API is processed in parallel, so we need to take out a lock when we update clusterCache fields.
 	lock := sync.Mutex{}
 	err = kube.RunAllAsync(len(apis), func(i int) error {
@@ -933,7 +970,7 @@ func (c *clusterCache) sync() error {
 		c.namespacedResources[api.GroupKind] = api.Meta.Namespaced
 		lock.Unlock()
 
-		return c.processApi(client, api, func(resClient dynamic.ResourceInterface, ns string) error {
+		return c.processApi(client, api, deletedNamespaces, func(resClient dynamic.ResourceInterface, ns string) error {
 			resourceVersion, err := c.listResources(ctx, resClient, func(listPager *pager.ListPager) error {
 				return listPager.EachListItem(context.Background(), metav1.ListOptions{}, func(obj runtime.Object) error {
 					if un, ok := obj.(*unstructured.Unstructured); !ok {
@@ -979,6 +1016,17 @@ func (c *clusterCache) sync() error {
 		return fmt.Errorf("failed to sync cluster %s: %w", c.config.Host, err)
 	}
 
+	// After parallel processing completes, update namespace list by removing deleted ones
+	var validNamespaces []string
+	for _, ns := range c.namespaces {
+		if _, deleted := deletedNamespaces.Load(ns); deleted {
+			c.log.Info(fmt.Sprintf("Namespace '%s' no longer exists, removing from cluster cache", ns))
+			continue
+		}
+		validNamespaces = append(validNamespaces, ns)
+	}
+	c.namespaces = validNamespaces
+
 	c.log.Info("Cluster successfully synced")
 	return nil
 }

pkg/cache/cluster_test.go

Lines changed: 164 additions & 10 deletions
@@ -19,6 +19,7 @@ import (
 	appsv1 "k8s.io/api/apps/v1"
 	corev1 "k8s.io/api/core/v1"
 	extensionsv1beta1 "k8s.io/api/extensions/v1beta1"
+	apierrors "k8s.io/apimachinery/pkg/api/errors"
 	"k8s.io/apimachinery/pkg/api/meta"
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 	"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
@@ -245,6 +246,16 @@ func TestStatefulSetOwnershipInferred(t *testing.T) {
 		},
 	}
 
+	defaultNs := &corev1.Namespace{
+		TypeMeta: metav1.TypeMeta{
+			APIVersion: "v1",
+			Kind:       "Namespace",
+		},
+		ObjectMeta: metav1.ObjectMeta{
+			Name: "default",
+		},
+	}
+
 	tests := []struct {
 		name    string
 		cluster *clusterCache
@@ -254,7 +265,7 @@ func TestStatefulSetOwnershipInferred(t *testing.T) {
 	}{
 		{
 			name:    "STSTemplateNameNotMatching",
-			cluster: newCluster(t, sts),
+			cluster: newCluster(t, sts, defaultNs),
 			pvc: &corev1.PersistentVolumeClaim{
 				TypeMeta:   metav1.TypeMeta{Kind: kube.PersistentVolumeClaimKind},
 				ObjectMeta: metav1.ObjectMeta{Name: "www1-web-0", Namespace: "default"},
@@ -263,7 +274,7 @@ func TestStatefulSetOwnershipInferred(t *testing.T) {
 		},
 		{
 			name:    "MatchingSTSExists",
-			cluster: newCluster(t, sts),
+			cluster: newCluster(t, sts, defaultNs),
 			pvc: &corev1.PersistentVolumeClaim{
 				TypeMeta:   metav1.TypeMeta{Kind: kube.PersistentVolumeClaimKind},
 				ObjectMeta: metav1.ObjectMeta{Name: "www-web-0", Namespace: "default"},
@@ -272,7 +283,7 @@ func TestStatefulSetOwnershipInferred(t *testing.T) {
 		},
 		{
 			name:    "STSTemplateNameNotMatchingWithBatchProcessing",
-			cluster: newClusterWithOptions(t, opts, sts),
+			cluster: newClusterWithOptions(t, opts, sts, defaultNs),
 			pvc: &corev1.PersistentVolumeClaim{
 				TypeMeta:   metav1.TypeMeta{Kind: kube.PersistentVolumeClaimKind},
 				ObjectMeta: metav1.ObjectMeta{Name: "www1-web-0", Namespace: "default"},
@@ -281,7 +292,7 @@ func TestStatefulSetOwnershipInferred(t *testing.T) {
 		},
 		{
 			name:    "MatchingSTSExistsWithBatchProcessing",
-			cluster: newClusterWithOptions(t, opts, sts),
+			cluster: newClusterWithOptions(t, opts, sts, defaultNs),
 			pvc: &corev1.PersistentVolumeClaim{
 				TypeMeta:   metav1.TypeMeta{Kind: kube.PersistentVolumeClaimKind},
 				ObjectMeta: metav1.ObjectMeta{Name: "www-web-0", Namespace: "default"},
@@ -334,7 +345,26 @@ func TestEnsureSyncedSingleNamespace(t *testing.T) {
 		},
 	}
 
-	cluster := newCluster(t, obj1, obj2)
+	ns1 := &corev1.Namespace{
+		TypeMeta: metav1.TypeMeta{
+			APIVersion: "v1",
+			Kind:       "Namespace",
+		},
+		ObjectMeta: metav1.ObjectMeta{
+			Name: "default1",
+		},
+	}
+	ns2 := &corev1.Namespace{
+		TypeMeta: metav1.TypeMeta{
+			APIVersion: "v1",
+			Kind:       "Namespace",
+		},
+		ObjectMeta: metav1.ObjectMeta{
+			Name: "default2",
+		},
+	}
+
+	cluster := newCluster(t, obj1, obj2, ns1, ns2)
 	cluster.namespaces = []string{"default1"}
 	err := cluster.EnsureSynced()
 	require.NoError(t, err)
@@ -420,7 +450,26 @@ metadata:
 }
 
 func TestGetManagedLiveObjsNamespacedModeClusterLevelResource(t *testing.T) {
-	cluster := newCluster(t, testPod1(), testRS(), testDeploy())
+	defaultNs := &corev1.Namespace{
+		TypeMeta: metav1.TypeMeta{
+			APIVersion: "v1",
+			Kind:       "Namespace",
+		},
+		ObjectMeta: metav1.ObjectMeta{
+			Name: "default",
+		},
+	}
+	productionNs := &corev1.Namespace{
+		TypeMeta: metav1.TypeMeta{
+			APIVersion: "v1",
+			Kind:       "Namespace",
+		},
+		ObjectMeta: metav1.ObjectMeta{
+			Name: "production",
+		},
+	}
+
+	cluster := newCluster(t, testPod1(), testRS(), testDeploy(), defaultNs, productionNs)
 	cluster.Invalidate(SetPopulateResourceInfoHandler(func(_ *unstructured.Unstructured, _ bool) (info any, cacheManifest bool) {
 		return nil, true
 	}))
@@ -445,7 +494,26 @@ metadata:
 }
 
 func TestGetManagedLiveObjsNamespacedModeClusterLevelResource_ClusterResourceEnabled(t *testing.T) {
-	cluster := newCluster(t, testPod1(), testRS(), testDeploy())
+	defaultNs := &corev1.Namespace{
+		TypeMeta: metav1.TypeMeta{
+			APIVersion: "v1",
+			Kind:       "Namespace",
+		},
+		ObjectMeta: metav1.ObjectMeta{
+			Name: "default",
+		},
+	}
+	productionNs := &corev1.Namespace{
+		TypeMeta: metav1.TypeMeta{
+			APIVersion: "v1",
+			Kind:       "Namespace",
+		},
+		ObjectMeta: metav1.ObjectMeta{
+			Name: "production",
+		},
+	}
+
+	cluster := newCluster(t, testPod1(), testRS(), testDeploy(), defaultNs, productionNs)
 	cluster.Invalidate(SetPopulateResourceInfoHandler(func(_ *unstructured.Unstructured, _ bool) (info any, cacheManifest bool) {
 		return nil, true
 	}))
@@ -514,7 +582,26 @@ metadata:
 }
 
 func TestGetManagedLiveObjsValidNamespace(t *testing.T) {
-	cluster := newCluster(t, testPod1(), testRS(), testDeploy())
+	defaultNs := &corev1.Namespace{
+		TypeMeta: metav1.TypeMeta{
+			APIVersion: "v1",
+			Kind:       "Namespace",
+		},
+		ObjectMeta: metav1.ObjectMeta{
+			Name: "default",
+		},
+	}
+	productionNs := &corev1.Namespace{
+		TypeMeta: metav1.TypeMeta{
+			APIVersion: "v1",
+			Kind:       "Namespace",
+		},
+		ObjectMeta: metav1.ObjectMeta{
+			Name: "production",
+		},
+	}
+
+	cluster := newCluster(t, testPod1(), testRS(), testDeploy(), defaultNs, productionNs)
 	cluster.Invalidate(SetPopulateResourceInfoHandler(func(_ *unstructured.Unstructured, _ bool) (info any, cacheManifest bool) {
 		return nil, true
 	}))
@@ -542,7 +629,26 @@ metadata:
 }
 
 func TestGetManagedLiveObjsInvalidNamespace(t *testing.T) {
-	cluster := newCluster(t, testPod1(), testRS(), testDeploy())
+	defaultNs := &corev1.Namespace{
+		TypeMeta: metav1.TypeMeta{
+			APIVersion: "v1",
+			Kind:       "Namespace",
+		},
+		ObjectMeta: metav1.ObjectMeta{
+			Name: "default",
+		},
+	}
+	developNs := &corev1.Namespace{
+		TypeMeta: metav1.TypeMeta{
+			APIVersion: "v1",
+			Kind:       "Namespace",
+		},
+		ObjectMeta: metav1.ObjectMeta{
+			Name: "develop",
+		},
+	}
+
+	cluster := newCluster(t, testPod1(), testRS(), testDeploy(), defaultNs, developNs)
 	cluster.Invalidate(SetPopulateResourceInfoHandler(func(_ *unstructured.Unstructured, _ bool) (info any, cacheManifest bool) {
 		return nil, true
 	}))
@@ -595,7 +701,18 @@ func TestGetManagedLiveObjsFailedConversion(t *testing.T) {
 		t.Run(testCaseCopy.name, func(t *testing.T) {
 			err := apiextensions.AddToScheme(scheme.Scheme)
 			require.NoError(t, err)
-			cluster := newCluster(t, testCRD(), testCronTab()).
+
+			defaultNs := &corev1.Namespace{
+				TypeMeta: metav1.TypeMeta{
+					APIVersion: "v1",
+					Kind:       "Namespace",
+				},
+				ObjectMeta: metav1.ObjectMeta{
+					Name: "default",
+				},
+			}
+
+			cluster := newCluster(t, testCRD(), testCronTab(), defaultNs).
 				WithAPIResources([]kube.APIResourceInfo{
 					{
 						GroupKind: schema.GroupKind{Group: cronTabGroup, Kind: "CronTab"},
@@ -1292,3 +1409,40 @@ func BenchmarkIterateHierarchyV2(b *testing.B) {
 		})
 	}
 }
+
+func TestSyncWithDeletedNamespace(t *testing.T) {
+	deletedNamespace := "deleted-namespace"
+	validNamespace := "default"
+	pod := testPod1()
+	pod.SetNamespace(validNamespace)
+	validNs := &corev1.Namespace{
+		TypeMeta: metav1.TypeMeta{
+			APIVersion: "v1",
+			Kind:       "Namespace",
+		},
+		ObjectMeta: metav1.ObjectMeta{
+			Name: validNamespace,
+		},
+	}
+	cluster := newCluster(t, pod, validNs)
+	cluster.namespaces = []string{validNamespace, deletedNamespace}
+	client := cluster.kubectl.(*kubetest.MockKubectlCmd).DynamicClient.(*fake.FakeDynamicClient)
+
+	// Return "not found" error when getting the deleted namespace during validation
+	client.PrependReactor("get", "namespaces", func(action testcore.Action) (handled bool, ret runtime.Object, err error) {
+		getAction := action.(testcore.GetAction)
+		if getAction.GetName() == deletedNamespace {
+			// Simulate namespace not found (deleted)
+			return true, nil, apierrors.NewNotFound(
+				schema.GroupResource{Group: "", Resource: "namespaces"},
+				deletedNamespace)
+		}
+		return false, nil, nil
+	})
+
+	err := cluster.sync()
+
+	assert.NoError(t, err)
+	assert.NotContains(t, cluster.namespaces, deletedNamespace)
+	assert.Contains(t, cluster.namespaces, validNamespace)
+}
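
As a possible follow-up (not part of this commit), the same reactor hook could exercise the other branch of `namespaceExists`, where a non-NotFound error is only logged and the namespace is kept. The fragment below is hypothetical; it reuses the test's `client`, `testcore`, `apierrors`, `runtime`, and `schema` identifiers plus the standard `errors` package, and the namespace name is made up.

```go
// Hypothetical variation: a Forbidden error keeps the namespace in the cache,
// since namespaceExists only removes namespaces on NotFound.
client.PrependReactor("get", "namespaces", func(action testcore.Action) (handled bool, ret runtime.Object, err error) {
	if action.(testcore.GetAction).GetName() == "restricted-namespace" {
		return true, nil, apierrors.NewForbidden(
			schema.GroupResource{Group: "", Resource: "namespaces"},
			"restricted-namespace",
			errors.New("RBAC: access denied"))
	}
	return false, nil, nil
})
```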
