Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion lib/kube/proxy/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -449,7 +449,7 @@ func (t *TLSServer) Shutdown(ctx context.Context) error {
func (t *TLSServer) close(ctx context.Context) error {
var errs []error
for _, kubeCluster := range t.fwd.kubeClusters() {
errs = append(errs, t.unregisterKubeCluster(ctx, kubeCluster))
errs = append(errs, t.unregisterKubeCluster(ctx, kubeCluster, true))
}
errs = append(errs, t.fwd.Close(), t.Server.Close())

Expand Down
16 changes: 7 additions & 9 deletions lib/kube/proxy/watcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -139,7 +139,7 @@ func (s *TLSServer) onUpdate(ctx context.Context, cluster, _ types.KubeCluster)
}

func (s *TLSServer) onDelete(ctx context.Context, cluster types.KubeCluster) error {
return s.unregisterKubeCluster(ctx, cluster)
return s.unregisterKubeCluster(ctx, cluster, false)
}

func (s *TLSServer) matcher(cluster types.KubeCluster) bool {
Expand Down Expand Up @@ -213,7 +213,7 @@ func (s *TLSServer) updateKubeCluster(ctx context.Context, cluster types.KubeClu
// unregisterKubeCluster unregisters the proxied Kube Cluster from the agent.
// This function is called when the dynamic cluster is deleted/no longer match
// the agent's resource matcher or when the agent is shutting down.
func (s *TLSServer) unregisterKubeCluster(ctx context.Context, cluster types.KubeCluster) error {
func (s *TLSServer) unregisterKubeCluster(ctx context.Context, cluster types.KubeCluster, isShutdown bool) error {
var errs []error

if err := s.stopHeartbeatAndHealthCheck(cluster); err != nil {
Expand All @@ -222,22 +222,20 @@ func (s *TLSServer) unregisterKubeCluster(ctx context.Context, cluster types.Kub
clusterName := cluster.GetName()
s.fwd.removeKubeDetails(clusterName)

shouldDeleteCluster := services.ShouldDeleteServerHeartbeatsOnShutdown(ctx)
// A child process can be forked to upgrade the Teleport binary. The child
// will take over the heartbeats so do NOT delete them in that case.
shouldDeleteOnShutdown := services.ShouldDeleteServerHeartbeatsOnShutdown(ctx)
sender, ok := s.TLSServerConfig.InventoryHandle.GetSender()
if ok {
// Manual deletion per cluster is only required if the auth server
// doesn't support actively cleaning up database resources when the
// inventory control stream is terminated during shutdown.
if capabilities := sender.Hello().Capabilities; capabilities != nil {
shouldDeleteCluster = shouldDeleteCluster && !capabilities.KubernetesCleanup
shouldDeleteOnShutdown = shouldDeleteOnShutdown && !capabilities.KubernetesCleanup
}
}

// A child process can be forked to upgrade the Teleport binary. The child
// will take over the heartbeats so do NOT delete them in that case.
// When unregistering a dynamic cluster, the context is empty and the
// decision will be to delete the kubernetes server.
if shouldDeleteCluster {
if !isShutdown || shouldDeleteOnShutdown {
if err := s.deleteKubernetesServer(ctx, clusterName); err != nil {
errs = append(errs, err)
}
Expand Down
37 changes: 37 additions & 0 deletions lib/kube/proxy/watcher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ package proxy

import (
"context"
"fmt"
"maps"
"sort"
"strings"
Expand All @@ -33,10 +34,32 @@ import (
clientcmdapi "k8s.io/client-go/tools/clientcmd/api"

"github.com/gravitational/teleport/api/types"
"github.com/gravitational/teleport/lib/auth/authclient"
testingkubemock "github.com/gravitational/teleport/lib/kube/proxy/testing/kube_server"
"github.com/gravitational/teleport/lib/services"
)

type mockAuthClient struct {
authclient.ClientI
deleteCh chan string
}

func newMockAuthClient(client authclient.ClientI) *mockAuthClient {
return &mockAuthClient{
ClientI: client,
deleteCh: make(chan string, 1),
}
}

func (m *mockAuthClient) DeleteKubernetesServer(ctx context.Context, hostID, name string) error {
select {
case m.deleteCh <- name:
case <-time.After(5 * time.Second):
return fmt.Errorf("failed to signal kube server deletion")
}
return m.ClientI.DeleteKubernetesServer(ctx, hostID, name)
}

// TestWatcher verifies that kubernetes agent properly detects and applies
// changes to kube_cluster resources.
func TestWatcher(t *testing.T) {
Expand Down Expand Up @@ -64,6 +87,8 @@ func TestWatcher(t *testing.T) {
}
},
})
authClient := newMockAuthClient(testCtx.KubeServer.AuthClient)
testCtx.KubeServer.AuthClient = authClient

require.Len(t, testCtx.KubeServer.fwd.kubeClusters(), 1)
kube0 := testCtx.KubeServer.fwd.kubeClusters()[0]
Expand Down Expand Up @@ -180,6 +205,12 @@ func TestWatcher(t *testing.T) {
case <-time.After(time.Second):
t.Fatal("Didn't receive reconcile event after 1s.")
}
select {
case deleted := <-authClient.deleteCh:
require.Equal(t, kube1.GetName(), deleted)
case <-time.After(time.Second):
t.Fatal("Kube server wasn't deleted after 1s.")
}

// Remove kube2.
err = testCtx.AuthServer.DeleteKubernetesCluster(ctx, kube2.GetName())
Expand All @@ -194,6 +225,12 @@ func TestWatcher(t *testing.T) {
case <-time.After(time.Second):
t.Fatal("Didn't receive reconcile event after 1s.")
}
select {
case deleted := <-authClient.deleteCh:
require.Equal(t, kube2.GetName(), deleted)
case <-time.After(time.Second):
t.Fatal("Kube server wasn't deleted after 1s.")
}
}

func makeDynamicKubeCluster(t *testing.T, name, url string, labels map[string]string) (*types.KubernetesClusterV3, error) {
Expand Down
Loading