From 870333fb5071c73818ca9bcf7a19d8d3c3d855c5 Mon Sep 17 00:00:00 2001 From: Andrew Burke Date: Thu, 20 Nov 2025 16:01:04 -0600 Subject: [PATCH] Fix kube servers not deleting on cluster delete --- lib/kube/proxy/server.go | 2 +- lib/kube/proxy/watcher.go | 16 +++++++-------- lib/kube/proxy/watcher_test.go | 37 ++++++++++++++++++++++++++++++++++ 3 files changed, 45 insertions(+), 10 deletions(-) diff --git a/lib/kube/proxy/server.go b/lib/kube/proxy/server.go index 22e3b3086cd60..9d29af67c9c06 100644 --- a/lib/kube/proxy/server.go +++ b/lib/kube/proxy/server.go @@ -449,7 +449,7 @@ func (t *TLSServer) Shutdown(ctx context.Context) error { func (t *TLSServer) close(ctx context.Context) error { var errs []error for _, kubeCluster := range t.fwd.kubeClusters() { - errs = append(errs, t.unregisterKubeCluster(ctx, kubeCluster)) + errs = append(errs, t.unregisterKubeCluster(ctx, kubeCluster, true)) } errs = append(errs, t.fwd.Close(), t.Server.Close()) diff --git a/lib/kube/proxy/watcher.go b/lib/kube/proxy/watcher.go index 204509f39941b..368d0860671c4 100644 --- a/lib/kube/proxy/watcher.go +++ b/lib/kube/proxy/watcher.go @@ -139,7 +139,7 @@ func (s *TLSServer) onUpdate(ctx context.Context, cluster, _ types.KubeCluster) } func (s *TLSServer) onDelete(ctx context.Context, cluster types.KubeCluster) error { - return s.unregisterKubeCluster(ctx, cluster) + return s.unregisterKubeCluster(ctx, cluster, false) } func (s *TLSServer) matcher(cluster types.KubeCluster) bool { @@ -213,7 +213,7 @@ func (s *TLSServer) updateKubeCluster(ctx context.Context, cluster types.KubeClu // unregisterKubeCluster unregisters the proxied Kube Cluster from the agent. // This function is called when the dynamic cluster is deleted/no longer match // the agent's resource matcher or when the agent is shutting down. -func (s *TLSServer) unregisterKubeCluster(ctx context.Context, cluster types.KubeCluster) error { +func (s *TLSServer) unregisterKubeCluster(ctx context.Context, cluster types.KubeCluster, isShutdown bool) error { var errs []error if err := s.stopHeartbeatAndHealthCheck(cluster); err != nil { @@ -222,22 +222,20 @@ func (s *TLSServer) unregisterKubeCluster(ctx context.Context, cluster types.Kub clusterName := cluster.GetName() s.fwd.removeKubeDetails(clusterName) - shouldDeleteCluster := services.ShouldDeleteServerHeartbeatsOnShutdown(ctx) + // A child process can be forked to upgrade the Teleport binary. The child + // will take over the heartbeats so do NOT delete them in that case. + shouldDeleteOnShutdown := services.ShouldDeleteServerHeartbeatsOnShutdown(ctx) sender, ok := s.TLSServerConfig.InventoryHandle.GetSender() if ok { // Manual deletion per cluster is only required if the auth server // doesn't support actively cleaning up database resources when the // inventory control stream is terminated during shutdown. if capabilities := sender.Hello().Capabilities; capabilities != nil { - shouldDeleteCluster = shouldDeleteCluster && !capabilities.KubernetesCleanup + shouldDeleteOnShutdown = shouldDeleteOnShutdown && !capabilities.KubernetesCleanup } } - // A child process can be forked to upgrade the Teleport binary. The child - // will take over the heartbeats so do NOT delete them in that case. - // When unregistering a dynamic cluster, the context is empty and the - // decision will be to delete the kubernetes server. - if shouldDeleteCluster { + if !isShutdown || shouldDeleteOnShutdown { if err := s.deleteKubernetesServer(ctx, clusterName); err != nil { errs = append(errs, err) } diff --git a/lib/kube/proxy/watcher_test.go b/lib/kube/proxy/watcher_test.go index f666f7906d879..93c68e69ce9a5 100644 --- a/lib/kube/proxy/watcher_test.go +++ b/lib/kube/proxy/watcher_test.go @@ -20,6 +20,7 @@ package proxy import ( "context" + "fmt" "maps" "sort" "strings" @@ -33,10 +34,32 @@ import ( clientcmdapi "k8s.io/client-go/tools/clientcmd/api" "github.com/gravitational/teleport/api/types" + "github.com/gravitational/teleport/lib/auth/authclient" testingkubemock "github.com/gravitational/teleport/lib/kube/proxy/testing/kube_server" "github.com/gravitational/teleport/lib/services" ) +type mockAuthClient struct { + authclient.ClientI + deleteCh chan string +} + +func newMockAuthClient(client authclient.ClientI) *mockAuthClient { + return &mockAuthClient{ + ClientI: client, + deleteCh: make(chan string, 1), + } +} + +func (m *mockAuthClient) DeleteKubernetesServer(ctx context.Context, hostID, name string) error { + select { + case m.deleteCh <- name: + case <-time.After(5 * time.Second): + return fmt.Errorf("failed to signal kube server deletion") + } + return m.ClientI.DeleteKubernetesServer(ctx, hostID, name) +} + // TestWatcher verifies that kubernetes agent properly detects and applies // changes to kube_cluster resources. func TestWatcher(t *testing.T) { @@ -64,6 +87,8 @@ func TestWatcher(t *testing.T) { } }, }) + authClient := newMockAuthClient(testCtx.KubeServer.AuthClient) + testCtx.KubeServer.AuthClient = authClient require.Len(t, testCtx.KubeServer.fwd.kubeClusters(), 1) kube0 := testCtx.KubeServer.fwd.kubeClusters()[0] @@ -180,6 +205,12 @@ func TestWatcher(t *testing.T) { case <-time.After(time.Second): t.Fatal("Didn't receive reconcile event after 1s.") } + select { + case deleted := <-authClient.deleteCh: + require.Equal(t, kube1.GetName(), deleted) + case <-time.After(time.Second): + t.Fatal("Kube server wasn't deleted after 1s.") + } // Remove kube2. err = testCtx.AuthServer.DeleteKubernetesCluster(ctx, kube2.GetName()) @@ -194,6 +225,12 @@ func TestWatcher(t *testing.T) { case <-time.After(time.Second): t.Fatal("Didn't receive reconcile event after 1s.") } + select { + case deleted := <-authClient.deleteCh: + require.Equal(t, kube2.GetName(), deleted) + case <-time.After(time.Second): + t.Fatal("Kube server wasn't deleted after 1s.") + } } func makeDynamicKubeCluster(t *testing.T, name, url string, labels map[string]string) (*types.KubernetesClusterV3, error) {