diff --git a/lib/srv/discovery/discovery.go b/lib/srv/discovery/discovery.go index 674aee3a7bceb..d1070c04272fb 100644 --- a/lib/srv/discovery/discovery.go +++ b/lib/srv/discovery/discovery.go @@ -629,6 +629,7 @@ func (s *Server) initAWSWatchers(matchers []types.AWSMatcher) error { } lr, err := newLabelReconciler(&labelReconcilerConfig{ + clock: s.clock, log: s.Log, accessPoint: s.AccessPoint, }) @@ -1963,6 +1964,7 @@ func (s *Server) initTeleportNodeWatcher() (err error) { Logger: s.Log, Client: s.AccessPoint, MaxStaleness: time.Minute, + Clock: s.clock, }, NodesGetter: s.AccessPoint, }) diff --git a/lib/srv/discovery/discovery_eks_test.go b/lib/srv/discovery/discovery_eks_test.go index c2d94945a114c..eb2f83c60db7b 100644 --- a/lib/srv/discovery/discovery_eks_test.go +++ b/lib/srv/discovery/discovery_eks_test.go @@ -23,6 +23,7 @@ import ( "iter" "maps" "slices" + "sync" "testing" "time" @@ -315,8 +316,42 @@ type mockAuthServer struct { enrollEKSClusters func(context.Context, *integrationpb.EnrollEKSClustersRequest, ...grpc.CallOption) (*integrationpb.EnrollEKSClustersResponse, error) } +type mockWatcher struct { + done chan struct{} + events chan types.Event + doneOnce sync.Once +} + +func (w *mockWatcher) Events() <-chan types.Event { + return w.events +} + +func (w *mockWatcher) Close() error { + w.doneOnce.Do(func() { + close(w.done) + close(w.events) + }) + return nil +} + +func (w *mockWatcher) Error() error { + return nil +} + +func (w *mockWatcher) Done() <-chan struct{} { + return w.done +} + func (m *mockAuthServer) NewWatcher(ctx context.Context, watch types.Watch) (types.Watcher, error) { - return m.events.NewWatcher(ctx, watch) + w := &mockWatcher{ + done: make(chan struct{}), + events: make(chan types.Event, 1), + } + + w.events <- types.Event{ + Type: types.OpInit, + } + return w, nil } func (m *mockAuthServer) Ping(context.Context) (proto.PingResponse, error) { @@ -374,6 +409,22 @@ func (m *mockAuthServer) RangeDatabases(ctx context.Context, start, end string) return stream.Empty[types.Database]() } +func (m *mockAuthServer) CreateApp(ctx context.Context, _ types.Application) error { + return nil +} + +func (m *mockAuthServer) GetApps(ctx context.Context) ([]types.Application, error) { + return nil, nil +} + +func (m *mockAuthServer) ListApps(ctx context.Context, limit int, startKey string) ([]types.Application, string, error) { + return nil, "", nil +} + +func (m *mockAuthServer) RangeApps(ctx context.Context, start, end string) iter.Seq2[types.Application, error] { + return func(yield func(types.Application, error) bool) {} +} + func (m *mockAuthServer) GetNodes(ctx context.Context, namespace string) ([]types.Server, error) { return nil, nil } diff --git a/lib/srv/discovery/discovery_test.go b/lib/srv/discovery/discovery_test.go index 70f611072dfa5..068e94e93aaf3 100644 --- a/lib/srv/discovery/discovery_test.go +++ b/lib/srv/discovery/discovery_test.go @@ -83,6 +83,7 @@ import ( "github.com/gravitational/teleport/lib/auth/authclient" "github.com/gravitational/teleport/lib/auth/authtest" "github.com/gravitational/teleport/lib/authz" + "github.com/gravitational/teleport/lib/backend/memory" "github.com/gravitational/teleport/lib/cloud/awsconfig" "github.com/gravitational/teleport/lib/cloud/azure" "github.com/gravitational/teleport/lib/cloud/cloudtest" @@ -93,12 +94,14 @@ import ( libstream "github.com/gravitational/teleport/lib/itertools/stream" "github.com/gravitational/teleport/lib/modules" "github.com/gravitational/teleport/lib/services" + "github.com/gravitational/teleport/lib/services/local" "github.com/gravitational/teleport/lib/srv/discovery/common" "github.com/gravitational/teleport/lib/srv/discovery/fetchers" "github.com/gravitational/teleport/lib/srv/discovery/fetchers/db" "github.com/gravitational/teleport/lib/srv/server" usagereporter "github.com/gravitational/teleport/lib/usagereporter/teleport" "github.com/gravitational/teleport/lib/utils/log/logtest" + "github.com/gravitational/teleport/lib/utils/testutils/synctest" ) func TestMain(m *testing.M) { @@ -871,7 +874,7 @@ func TestDiscoveryServer(t *testing.T) { AWSConfigProvider: fakeConfigProvider, }, ClusterFeatures: func() proto.Features { return proto.Features{} }, - KubernetesClient: fake.NewSimpleClientset(), + KubernetesClient: fake.NewClientset(), AccessPoint: getDiscoveryAccessPointWithEKSEnroller(tlsServer.Auth(), authClient, authClient.IntegrationAWSOIDCClient()), Matchers: tc.staticMatchers, Emitter: tc.emitter, @@ -1043,7 +1046,7 @@ func TestDiscoveryServerConcurrency(t *testing.T) { server1, err := New(authz.ContextWithUser(ctx, identity.I), &Config{ GetEC2Client: getEC2Client, ClusterFeatures: func() proto.Features { return proto.Features{} }, - KubernetesClient: fake.NewSimpleClientset(), + KubernetesClient: fake.NewClientset(), AccessPoint: getDiscoveryAccessPoint(tlsServer.Auth(), authClient), Matchers: staticMatcher, Emitter: emitter, @@ -1056,7 +1059,7 @@ func TestDiscoveryServerConcurrency(t *testing.T) { server2, err := New(authz.ContextWithUser(ctx, identity.I), &Config{ GetEC2Client: getEC2Client, ClusterFeatures: func() proto.Features { return proto.Features{} }, - KubernetesClient: fake.NewSimpleClientset(), + KubernetesClient: fake.NewClientset(), AccessPoint: getDiscoveryAccessPoint(tlsServer.Auth(), authClient), Matchers: staticMatcher, Emitter: emitter, @@ -1258,7 +1261,7 @@ func TestDiscoveryKubeServices(t *testing.T) { ctx, &Config{ ClusterFeatures: func() proto.Features { return proto.Features{} }, - KubernetesClient: fake.NewSimpleClientset(objects...), + KubernetesClient: fake.NewClientset(objects...), AccessPoint: getDiscoveryAccessPoint(tlsServer.Auth(), authClient), Matchers: Matchers{ Kubernetes: tt.kubernetesMatchers, @@ -1270,9 +1273,7 @@ func TestDiscoveryKubeServices(t *testing.T) { require.NoError(t, err) - t.Cleanup(func() { - discServer.Stop() - }) + t.Cleanup(discServer.Stop) go discServer.Start() require.EventuallyWithT(t, func(t *assert.CollectT) { @@ -1295,6 +1296,75 @@ func TestDiscoveryKubeServices(t *testing.T) { } } +// TestDiscoveryKubeServicesInterval tests that the poll interval is honored for Kube App Discovery. +func TestDiscoveryKubeServicesInterval(t *testing.T) { + const mainDiscoveryGroup = "main" + + mockKubeServices := []*corev1.Service{ + newMockKubeService("service1", "ns1", "", + map[string]string{"test-label": "testval"}, + nil, + []corev1.ServicePort{{Port: 42, Name: "http", Protocol: corev1.ProtocolTCP}}), + newMockKubeService("service2", "ns2", "", + map[string]string{"test-label": "testval"}, + nil, + []corev1.ServicePort{{Port: 42, Name: "http", Protocol: corev1.ProtocolTCP}}), + } + + // Kubernetes app discovery matcher + kubernetesMatchers := []types.KubernetesMatcher{ + { + Types: []string{"app"}, + Namespaces: []string{types.Wildcard}, + Labels: map[string]utils.Strings{"test-label": {"testval"}}, + }, + } + + synctest.Test(t, func(t *testing.T) { + ctx := t.Context() + + fakeKubeClient := fake.NewClientset(mockKubeServices[0]) + + bk, err := memory.New(memory.Config{}) + require.NoError(t, err) + mockAccessPoint := &mockAuthServer{ + events: local.NewEventsService(bk), + } + + const pollInterval = 1 * time.Minute + discServer, err := New(ctx, + &Config{ + ClusterFeatures: func() proto.Features { return proto.Features{} }, + KubernetesClient: fakeKubeClient, + AccessPoint: mockAccessPoint, + Matchers: Matchers{ + Kubernetes: kubernetesMatchers, + }, + Emitter: &mockEmitter{}, + DiscoveryGroup: mainDiscoveryGroup, + PollInterval: pollInterval, + }) + require.NoError(t, err) + + t.Cleanup(discServer.Stop) + go discServer.Start() + + // Wait for discovery server to complete one iteration of discovering resources + synctest.Wait() + + // Mock access point implementation does not actually create the app resource, but usage event is still logged + require.Len(t, mockAccessPoint.usageEvents, 1) + + // Create new Kube service to be discovered by discServer + fakeKubeClient.CoreV1().Services("ns2").Create(ctx, mockKubeServices[1], v1.CreateOptions{}) + + time.Sleep(pollInterval) + synctest.Wait() + + require.Len(t, mockAccessPoint.usageEvents, 2) + }) +} + func TestDiscoveryInCloudKube(t *testing.T) { t.Parallel() @@ -1609,7 +1679,7 @@ func TestDiscoveryInCloudKube(t *testing.T) { gcpClients: gcpClients, AWSFetchersClients: mockedClients, ClusterFeatures: func() proto.Features { return proto.Features{} }, - KubernetesClient: fake.NewSimpleClientset(), + KubernetesClient: fake.NewClientset(), AccessPoint: getDiscoveryAccessPoint(tlsServer.Auth(), authClient), Matchers: Matchers{ AWS: tc.awsMatchers, @@ -2563,7 +2633,7 @@ func TestDiscoveryDatabase(t *testing.T) { }, azureClients: azureClients, ClusterFeatures: func() proto.Features { return proto.Features{} }, - KubernetesClient: fake.NewSimpleClientset(), + KubernetesClient: fake.NewClientset(), AccessPoint: getDiscoveryAccessPoint(tlsServer.Auth(), authClient), AWSDatabaseFetcherFactory: dbFetcherFactory, AWSConfigProvider: fakeConfigProvider, @@ -2697,7 +2767,7 @@ func TestDiscoveryDatabaseRemovingDiscoveryConfigs(t *testing.T) { AWSConfigProvider: fakeConfigProvider, AWSDatabaseFetcherFactory: dbFetcherFactory, ClusterFeatures: func() proto.Features { return proto.Features{} }, - KubernetesClient: fake.NewSimpleClientset(), + KubernetesClient: fake.NewClientset(), AccessPoint: getDiscoveryAccessPoint(tlsServer.Auth(), authClient), Matchers: Matchers{}, Emitter: authClient, @@ -3137,7 +3207,7 @@ func TestAzureVMDiscovery(t *testing.T) { server, err := New(authz.ContextWithUser(context.Background(), identity.I), &Config{ azureClients: testAzureClients, ClusterFeatures: func() proto.Features { return proto.Features{} }, - KubernetesClient: fake.NewSimpleClientset(), + KubernetesClient: fake.NewClientset(), AccessPoint: getDiscoveryAccessPoint(tlsServer.Auth(), authClient), Matchers: tc.staticMatchers, Emitter: emitter, @@ -3445,7 +3515,7 @@ func TestGCPVMDiscovery(t *testing.T) { server, err := New(authz.ContextWithUser(context.Background(), identity.I), &Config{ gcpClients: gcpClients, ClusterFeatures: func() proto.Features { return proto.Features{} }, - KubernetesClient: fake.NewSimpleClientset(), + KubernetesClient: fake.NewClientset(), AccessPoint: getDiscoveryAccessPoint(tlsServer.Auth(), authClient), Matchers: tc.staticMatchers, Emitter: emitter, diff --git a/lib/srv/discovery/kube_services_watcher.go b/lib/srv/discovery/kube_services_watcher.go index 9940734248588..83b88d8cf8b72 100644 --- a/lib/srv/discovery/kube_services_watcher.go +++ b/lib/srv/discovery/kube_services_watcher.go @@ -21,7 +21,6 @@ package discovery import ( "context" "sync" - "time" "github.com/gravitational/trace" @@ -73,10 +72,11 @@ func (s *Server) startKubeAppsWatchers() error { watcher, err := common.NewWatcher(s.ctx, common.WatcherConfig{ FetchersFn: common.StaticFetchers(s.kubeAppsFetchers), - Interval: 5 * time.Minute, Logger: s.Log.With("kind", types.KindApp), DiscoveryGroup: s.DiscoveryGroup, + Interval: s.PollInterval, Origin: types.OriginDiscoveryKubernetes, + Clock: s.clock, }) if err != nil { return trace.Wrap(err) diff --git a/lib/srv/discovery/kube_watcher.go b/lib/srv/discovery/kube_watcher.go index 20847f50f069e..8db90ccb07589 100644 --- a/lib/srv/discovery/kube_watcher.go +++ b/lib/srv/discovery/kube_watcher.go @@ -80,6 +80,7 @@ func (s *Server) startKubeWatchers() error { DiscoveryGroup: s.DiscoveryGroup, Interval: s.PollInterval, Origin: types.OriginCloud, + Clock: s.clock, }) if err != nil { return trace.Wrap(err)