diff --git a/lib/srv/discovery/discovery.go b/lib/srv/discovery/discovery.go index ad160057ed5aa..54efa877a12ad 100644 --- a/lib/srv/discovery/discovery.go +++ b/lib/srv/discovery/discovery.go @@ -516,6 +516,7 @@ func (s *Server) initAWSWatchers(matchers []types.AWSMatcher) error { } lr, err := newLabelReconciler(&labelReconcilerConfig{ + clock: s.clock, log: s.Log, accessPoint: s.AccessPoint, }) @@ -1835,6 +1836,7 @@ func (s *Server) initTeleportNodeWatcher() (err error) { // Logger: s.Logger, Client: s.AccessPoint, MaxStaleness: time.Minute, + Clock: s.clock, }, NodesGetter: s.AccessPoint, }) diff --git a/lib/srv/discovery/discovery_eks_test.go b/lib/srv/discovery/discovery_eks_test.go new file mode 100644 index 0000000000000..c7bc03587b0c8 --- /dev/null +++ b/lib/srv/discovery/discovery_eks_test.go @@ -0,0 +1,471 @@ +/* + * Teleport + * Copyright (C) 2025 Gravitational, Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +package discovery + +import ( + "context" + "iter" + "maps" + "slices" + "sync" + "testing" + "time" + + "github.com/aws/aws-sdk-go-v2/aws" + "github.com/aws/aws-sdk-go/service/eks" + "github.com/google/uuid" + "github.com/gravitational/trace" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "google.golang.org/grpc" + + "github.com/gravitational/teleport/api" + "github.com/gravitational/teleport/api/client/proto" + integrationpb "github.com/gravitational/teleport/api/gen/proto/go/teleport/integration/v1" + usertasksv1 "github.com/gravitational/teleport/api/gen/proto/go/teleport/usertasks/v1" + "github.com/gravitational/teleport/api/types" + "github.com/gravitational/teleport/api/types/discoveryconfig" + "github.com/gravitational/teleport/api/types/header" + "github.com/gravitational/teleport/api/utils" + "github.com/gravitational/teleport/lib/auth/authclient" + "github.com/gravitational/teleport/lib/backend/memory" + "github.com/gravitational/teleport/lib/cloud" + "github.com/gravitational/teleport/lib/cloud/mocks" + "github.com/gravitational/teleport/lib/itertools/stream" + "github.com/gravitational/teleport/lib/services/local" + libutils "github.com/gravitational/teleport/lib/utils" + "github.com/gravitational/teleport/lib/utils/testutils/synctest" +) + +func discoveryConfigWithAWSMatchers(t *testing.T, discoveryGroup string, m ...types.AWSMatcher) *discoveryconfig.DiscoveryConfig { + t.Helper() + discoveryConfigName := uuid.NewString() + discoveryConfig, err := discoveryconfig.NewDiscoveryConfig( + header.Metadata{Name: discoveryConfigName}, + discoveryconfig.Spec{ + DiscoveryGroup: discoveryGroup, + AWS: m, + }, + ) + require.NoError(t, err) + return discoveryConfig +} + +func awsMatcherForEKS(t *testing.T, integrationName string) types.AWSMatcher { + t.Helper() + return types.AWSMatcher{ + Types: []string{"eks"}, + Regions: []string{"us-west-2"}, + Tags: map[string]utils.Strings{"RunDiscover": {"Please"}}, + Integration: integrationName, + } +} + +func awsMatcherForEKSWithAppDiscovery(t *testing.T, integrationName string) types.AWSMatcher { + t.Helper() + matcher := awsMatcherForEKS(t, integrationName) + matcher.KubeAppDiscovery = true + return matcher +} + +func TestDiscoveryServerEKS(t *testing.T) { + t.Parallel() + integrationName := "my-integration" + defaultDiscoveryGroup := "dc001" + + eksMatcher := awsMatcherForEKS(t, integrationName) + discoveryConfigForUserTaskEKSTest := discoveryConfigWithAWSMatchers(t, defaultDiscoveryGroup, eksMatcher) + + matcherWithAppDiscovery := awsMatcherForEKSWithAppDiscovery(t, integrationName) + matcherWithoutAppDiscovery := awsMatcherForEKS(t, integrationName) + discoveryConfigWithAndWithoutAppDiscovery := discoveryConfigWithAWSMatchers(t, defaultDiscoveryGroup, matcherWithAppDiscovery, matcherWithoutAppDiscovery) + + for _, tt := range []struct { + name string + emitter *mockEmitter + cloudClients *cloud.TestCloudClients + eksEnroller eksClustersEnroller + discoveryConfig *discoveryconfig.DiscoveryConfig + staticMatchers Matchers + wantInstalledInstances []string + wantDiscoveryConfigStatus *discoveryconfig.Status + userTasksDiscoverCheck func(t *testing.T, existingTasks []*usertasksv1.UserTask) + }{ + { + name: "multiple EKS clusters failed to autoenroll and user tasks are created", + cloudClients: &cloud.TestCloudClients{ + STS: &mocks.STSMock{}, + EKS: &mocks.EKSMock{ + Clusters: []*eks.Cluster{ + { + Name: aws.String("cluster01"), + Arn: aws.String("arn:aws:eks:us-west-2:123456789012:cluster/cluster01"), + Status: aws.String(eks.ClusterStatusActive), + Tags: map[string]*string{ + "RunDiscover": aws.String("Please"), + }, + }, + { + Name: aws.String("cluster02"), + Arn: aws.String("arn:aws:eks:us-west-2:123456789012:cluster/cluster02"), + Status: aws.String(eks.ClusterStatusActive), + Tags: map[string]*string{ + "RunDiscover": aws.String("Please"), + }, + }, + }, + }, + }, + eksEnroller: &mockEKSClusterEnroller{ + resp: &integrationpb.EnrollEKSClustersResponse{ + Results: []*integrationpb.EnrollEKSClusterResult{ + { + EksClusterName: "cluster01", + Error: "access endpoint is not reachable", + IssueType: "eks-cluster-unreachable", + }, + { + EksClusterName: "cluster02", + Error: "access endpoint is not reachable", + IssueType: "eks-cluster-unreachable", + }, + }, + }, + err: nil, + }, + emitter: &mockEmitter{}, + staticMatchers: Matchers{}, + discoveryConfig: discoveryConfigForUserTaskEKSTest, + wantInstalledInstances: []string{}, + userTasksDiscoverCheck: func(t *testing.T, existingTasks []*usertasksv1.UserTask) { + require.NotEmpty(t, existingTasks) + existingTask := existingTasks[0] + + require.Equal(t, "OPEN", existingTask.GetSpec().State) + require.Equal(t, integrationName, existingTask.GetSpec().Integration) + require.Equal(t, "eks-cluster-unreachable", existingTask.GetSpec().IssueType) + require.Equal(t, "123456789012", existingTask.GetSpec().GetDiscoverEks().GetAccountId()) + require.Equal(t, "us-west-2", existingTask.GetSpec().GetDiscoverEks().GetRegion()) + + taskClusters := existingTask.GetSpec().GetDiscoverEks().Clusters + require.Contains(t, taskClusters, "cluster01") + taskCluster := taskClusters["cluster01"] + + require.Equal(t, "cluster01", taskCluster.Name) + require.Equal(t, discoveryConfigForUserTaskEKSTest.GetName(), taskCluster.DiscoveryConfig) + require.Equal(t, defaultDiscoveryGroup, taskCluster.DiscoveryGroup) + + require.Contains(t, taskClusters, "cluster02") + taskCluster2 := taskClusters["cluster02"] + + require.Equal(t, "cluster02", taskCluster2.Name) + require.Equal(t, discoveryConfigForUserTaskEKSTest.GetName(), taskCluster2.DiscoveryConfig) + require.Equal(t, defaultDiscoveryGroup, taskCluster2.DiscoveryGroup) + }, + }, + { + name: "multiple EKS clusters with different KubeAppDiscovery setting failed to autoenroll and user tasks are created", + cloudClients: &cloud.TestCloudClients{ + STS: &mocks.STSMock{}, + EKS: &mocks.EKSMock{ + Clusters: []*eks.Cluster{ + { + Name: aws.String("cluster01"), + Arn: aws.String("arn:aws:eks:us-west-2:123456789012:cluster/cluster01"), + Status: aws.String(eks.ClusterStatusActive), + Tags: map[string]*string{ + "RunDiscover": aws.String("Please"), + }, + }, + { + Name: aws.String("cluster02"), + Arn: aws.String("arn:aws:eks:us-west-2:123456789012:cluster/cluster02"), + Status: aws.String(eks.ClusterStatusActive), + Tags: map[string]*string{ + "RunDiscover": aws.String("Please"), + }, + }, + }, + }, + }, + eksEnroller: &mockEKSClusterEnroller{ + resp: &integrationpb.EnrollEKSClustersResponse{ + Results: []*integrationpb.EnrollEKSClusterResult{ + { + EksClusterName: "cluster01", + Error: "access endpoint is not reachable", + IssueType: "eks-cluster-unreachable", + }, + { + EksClusterName: "cluster02", + Error: "access endpoint is not reachable", + IssueType: "eks-cluster-unreachable", + }, + }, + }, + err: nil, + }, + emitter: &mockEmitter{}, + staticMatchers: Matchers{}, + discoveryConfig: discoveryConfigWithAndWithoutAppDiscovery, + wantInstalledInstances: []string{}, + userTasksDiscoverCheck: func(t *testing.T, existingTasks []*usertasksv1.UserTask) { + require.Len(t, existingTasks, 2) + existingTask := existingTasks[0] + if existingTask.Spec.DiscoverEks.AppAutoDiscover == false { + existingTask = existingTasks[1] + } + + require.Equal(t, "OPEN", existingTask.GetSpec().State) + require.Equal(t, integrationName, existingTask.GetSpec().Integration) + require.Equal(t, "eks-cluster-unreachable", existingTask.GetSpec().IssueType) + require.Equal(t, "123456789012", existingTask.GetSpec().GetDiscoverEks().GetAccountId()) + require.Equal(t, "us-west-2", existingTask.GetSpec().GetDiscoverEks().GetRegion()) + + taskClusters := existingTask.GetSpec().GetDiscoverEks().Clusters + require.Contains(t, taskClusters, "cluster01") + taskCluster := taskClusters["cluster01"] + + require.Equal(t, "cluster01", taskCluster.Name) + require.Equal(t, discoveryConfigWithAndWithoutAppDiscovery.GetName(), taskCluster.DiscoveryConfig) + require.Equal(t, defaultDiscoveryGroup, taskCluster.DiscoveryGroup) + }, + }, + } { + t.Run(tt.name, func(t *testing.T) { + t.Parallel() + + synctest.Test(t, func(t *testing.T) { + ctx, cancel := context.WithCancel(t.Context()) + + bk, err := memory.New(memory.Config{}) + require.NoError(t, err) + + mockAccessPoint := &mockAuthServer{ + events: local.NewEventsService(bk), + enrollEKSClusters: tt.eksEnroller.EnrollEKSClusters, + storeDiscoveryConfigs: map[string]*discoveryconfig.DiscoveryConfig{ + tt.discoveryConfig.GetName(): tt.discoveryConfig, + }, + storeUserTasks: map[string]*usertasksv1.UserTask{}, + } + + server, err := New(ctx, &Config{ + CloudClients: tt.cloudClients, + ClusterFeatures: func() proto.Features { return proto.Features{} }, + AccessPoint: mockAccessPoint, + Matchers: Matchers{}, + Emitter: tt.emitter, + DiscoveryGroup: defaultDiscoveryGroup, + Log: libutils.NewSlogLoggerForTests(), + }) + require.NoError(t, err) + + go func() { + assert.NoError(t, server.Start()) + }() + defer t.Cleanup(server.Stop) + + // Wait for the discovery server to complete one iteration of discovering resources + synctest.Wait() + + // Start server shutdown. + cancel() + + // Discovery usage events are reported. + require.NotEmpty(t, mockAccessPoint.usageEvents) + + // Check the UserTasks created by the discovery server. + existingTasks := slices.Collect(maps.Values(mockAccessPoint.storeUserTasks)) + tt.userTasksDiscoverCheck(t, existingTasks) + }) + }) + } +} + +type mockAuthServer struct { + authclient.DiscoveryAccessPoint + + storeDiscoveryConfigs map[string]*discoveryconfig.DiscoveryConfig + storeUserTasks map[string]*usertasksv1.UserTask + + events types.Events + usageEvents []*proto.SubmitUsageEventRequest + + enrollEKSClusters func(context.Context, *integrationpb.EnrollEKSClustersRequest, ...grpc.CallOption) (*integrationpb.EnrollEKSClustersResponse, error) +} + +type mockWatcher struct { + done chan struct{} + events chan types.Event + doneOnce sync.Once +} + +func (w *mockWatcher) Events() <-chan types.Event { + return w.events +} + +func (w *mockWatcher) Close() error { + w.doneOnce.Do(func() { + close(w.done) + close(w.events) + }) + return nil +} + +func (w *mockWatcher) Error() error { + return nil +} + +func (w *mockWatcher) Done() <-chan struct{} { + return w.done +} + +func (m *mockAuthServer) NewWatcher(ctx context.Context, watch types.Watch) (types.Watcher, error) { + w := &mockWatcher{ + done: make(chan struct{}), + events: make(chan types.Event, 1), + } + + w.events <- types.Event{ + Type: types.OpInit, + } + return w, nil +} + +func (m *mockAuthServer) Ping(context.Context) (proto.PingResponse, error) { + return proto.PingResponse{ + ServerVersion: api.SemVer().String(), + }, nil +} + +func (m *mockAuthServer) SubmitUsageEvent(ctx context.Context, req *proto.SubmitUsageEventRequest) error { + m.usageEvents = append(m.usageEvents, req) + return nil +} + +func (m *mockAuthServer) ListDiscoveryConfigs(ctx context.Context, pageSize int, nextKey string) ([]*discoveryconfig.DiscoveryConfig, string, error) { + return slices.Collect(maps.Values(m.storeDiscoveryConfigs)), "", nil +} + +func (m *mockAuthServer) UpdateDiscoveryConfigStatus(ctx context.Context, name string, status discoveryconfig.Status) (*discoveryconfig.DiscoveryConfig, error) { + dc, ok := m.storeDiscoveryConfigs[name] + if !ok { + return nil, trace.NotFound("discovery config %q not found", name) + } + + dc.Status = status + m.storeDiscoveryConfigs[name] = dc + + return dc, nil +} + +func (m *mockAuthServer) GetKubernetesClusters(ctx context.Context) ([]types.KubeCluster, error) { + return nil, nil +} + +func (m *mockAuthServer) ListKubernetesClusters(ctx context.Context, limit int, start string) ([]types.KubeCluster, string, error) { + return nil, "", nil +} + +func (m *mockAuthServer) RangeKubernetesClusters(ctx context.Context, start, end string) iter.Seq2[types.KubeCluster, error] { + return func(yield func(types.KubeCluster, error) bool) {} +} + +func (m *mockAuthServer) GetKubernetesServers(context.Context) ([]types.KubeServer, error) { + return nil, nil +} + +func (m *mockAuthServer) GetDatabases(ctx context.Context) ([]types.Database, error) { + return nil, nil +} + +func (m *mockAuthServer) ListDatabases(ctx context.Context, limit int, startKey string) ([]types.Database, string, error) { + return nil, "", nil +} + +func (m *mockAuthServer) RangeDatabases(ctx context.Context, start, end string) iter.Seq2[types.Database, error] { + return stream.Empty[types.Database]() +} + +func (m *mockAuthServer) CreateApp(ctx context.Context, _ types.Application) error { + return nil +} + +func (m *mockAuthServer) GetApps(ctx context.Context) ([]types.Application, error) { + return nil, nil +} + +func (m *mockAuthServer) ListApps(ctx context.Context, limit int, startKey string) ([]types.Application, string, error) { + return nil, "", nil +} + +func (m *mockAuthServer) RangeApps(ctx context.Context, start, end string) iter.Seq2[types.Application, error] { + return func(yield func(types.Application, error) bool) {} +} + +func (m *mockAuthServer) GetNodes(ctx context.Context, namespace string) ([]types.Server, error) { + return nil, nil +} + +func (m *mockAuthServer) EnrollEKSClusters(ctx context.Context, req *integrationpb.EnrollEKSClustersRequest, opts ...grpc.CallOption) (*integrationpb.EnrollEKSClustersResponse, error) { + return m.enrollEKSClusters(ctx, req, opts...) +} + +func (m *mockAuthServer) AcquireSemaphore(ctx context.Context, params types.AcquireSemaphoreRequest) (*types.SemaphoreLease, error) { + return &types.SemaphoreLease{ + Expires: time.Now().Add(10 * time.Minute), + }, nil +} + +func (m *mockAuthServer) CancelSemaphoreLease(ctx context.Context, lease types.SemaphoreLease) error { + return nil +} + +func (m *mockAuthServer) GetUserTask(ctx context.Context, name string) (*usertasksv1.UserTask, error) { + if task, ok := m.storeUserTasks[name]; ok { + return task, nil + } + return nil, trace.NotFound("user task %q not found", name) +} + +func (m *mockAuthServer) UpsertUserTask(ctx context.Context, req *usertasksv1.UserTask) (*usertasksv1.UserTask, error) { + m.storeUserTasks[req.GetMetadata().GetName()] = req + return req, nil +} + +type mockEKSClusterEnroller struct { + resp *integrationpb.EnrollEKSClustersResponse + err error +} + +func (m *mockEKSClusterEnroller) EnrollEKSClusters(ctx context.Context, req *integrationpb.EnrollEKSClustersRequest, opt ...grpc.CallOption) (*integrationpb.EnrollEKSClustersResponse, error) { + ret := &integrationpb.EnrollEKSClustersResponse{ + Results: []*integrationpb.EnrollEKSClusterResult{}, + } + // Filter out non-requested clusters. + for _, clusterName := range req.EksClusterNames { + for _, mockClusterResult := range m.resp.Results { + if clusterName == mockClusterResult.EksClusterName { + ret.Results = append(ret.Results, mockClusterResult) + } + } + } + return ret, m.err +} diff --git a/lib/srv/discovery/discovery_test.go b/lib/srv/discovery/discovery_test.go index e1c8b0e2a1423..48b78a383a0df 100644 --- a/lib/srv/discovery/discovery_test.go +++ b/lib/srv/discovery/discovery_test.go @@ -80,6 +80,7 @@ import ( "github.com/gravitational/teleport/lib/auth/authclient" "github.com/gravitational/teleport/lib/auth/authtest" "github.com/gravitational/teleport/lib/authz" + "github.com/gravitational/teleport/lib/backend/memory" "github.com/gravitational/teleport/lib/cloud" "github.com/gravitational/teleport/lib/cloud/azure" "github.com/gravitational/teleport/lib/cloud/gcp" @@ -89,10 +90,12 @@ import ( libstream "github.com/gravitational/teleport/lib/itertools/stream" "github.com/gravitational/teleport/lib/modules" "github.com/gravitational/teleport/lib/services" + "github.com/gravitational/teleport/lib/services/local" "github.com/gravitational/teleport/lib/srv/discovery/common" "github.com/gravitational/teleport/lib/srv/server" usagereporter "github.com/gravitational/teleport/lib/usagereporter/teleport" libutils "github.com/gravitational/teleport/lib/utils" + "github.com/gravitational/teleport/lib/utils/testutils/synctest" ) func TestMain(m *testing.M) { @@ -322,46 +325,6 @@ func TestDiscoveryServer(t *testing.T) { ) require.NoError(t, err) - discoveryConfigForUserTaskEKSTestName := uuid.NewString() - discoveryConfigForUserTaskEKSTest, err := discoveryconfig.NewDiscoveryConfig( - header.Metadata{Name: discoveryConfigForUserTaskEKSTestName}, - discoveryconfig.Spec{ - DiscoveryGroup: defaultDiscoveryGroup, - AWS: []types.AWSMatcher{{ - Types: []string{"eks"}, - Regions: []string{"eu-west-2"}, - Tags: map[string]utils.Strings{"RunDiscover": {"Please"}}, - Integration: "my-integration", - }}, - }, - ) - require.NoError(t, err) - - discoveryConfigWithAndWithoutAppDiscoveryTestName := uuid.NewString() - discoveryConfigWithAndWithoutAppDiscovery, err := discoveryconfig.NewDiscoveryConfig( - header.Metadata{Name: discoveryConfigWithAndWithoutAppDiscoveryTestName}, - discoveryconfig.Spec{ - DiscoveryGroup: defaultDiscoveryGroup, - AWS: []types.AWSMatcher{ - { - Types: []string{"eks"}, - Regions: []string{"eu-west-2"}, - Tags: map[string]utils.Strings{"EnableAppDiscovery": {"No"}}, - Integration: "my-integration", - KubeAppDiscovery: false, - }, - { - Types: []string{"eks"}, - Regions: []string{"eu-west-2"}, - Tags: map[string]utils.Strings{"EnableAppDiscovery": {"Yes"}}, - Integration: "my-integration", - KubeAppDiscovery: true, - }, - }, - }, - ) - require.NoError(t, err) - tcs := []struct { name string // presentInstances is a list of servers already present in teleport @@ -369,7 +332,6 @@ func TestDiscoveryServer(t *testing.T) { foundEC2Instances []*ec2.Instance ssm *mockSSMClient emitter *mockEmitter - eksEnroller eksClustersEnroller discoveryConfig *discoveryconfig.DiscoveryConfig staticMatchers Matchers wantInstalledInstances []string @@ -726,149 +688,6 @@ func TestDiscoveryServer(t *testing.T) { require.Equal(t, defaultDiscoveryGroup, taskInstance.DiscoveryGroup) }, }, - { - name: "multiple EKS clusters failed to autoenroll and user tasks are created", - presentInstances: []types.Server{}, - foundEC2Instances: []*ec2.Instance{}, - ssm: &mockSSMClient{}, - cloudClients: &cloud.TestCloudClients{ - STS: &mocks.STSMock{}, - EKS: &mocks.EKSMock{ - Clusters: []*eks.Cluster{ - { - Name: aws.String("cluster01"), - Arn: aws.String("arn:aws:eks:us-west-2:123456789012:cluster/cluster01"), - Status: aws.String(eks.ClusterStatusActive), - Tags: map[string]*string{ - "RunDiscover": aws.String("Please"), - }, - }, - { - Name: aws.String("cluster02"), - Arn: aws.String("arn:aws:eks:us-west-2:123456789012:cluster/cluster02"), - Status: aws.String(eks.ClusterStatusActive), - Tags: map[string]*string{ - "RunDiscover": aws.String("Please"), - }, - }, - }, - }, - }, - eksEnroller: &mockEKSClusterEnroller{ - resp: &integrationpb.EnrollEKSClustersResponse{ - Results: []*integrationpb.EnrollEKSClusterResult{ - { - EksClusterName: "cluster01", - Error: "access endpoint is not reachable", - IssueType: "eks-cluster-unreachable", - }, - { - EksClusterName: "cluster02", - Error: "access endpoint is not reachable", - IssueType: "eks-cluster-unreachable", - }, - }, - }, - err: nil, - }, - emitter: &mockEmitter{}, - staticMatchers: Matchers{}, - discoveryConfig: discoveryConfigForUserTaskEKSTest, - wantInstalledInstances: []string{}, - userTasksDiscoverCheck: func(t *testing.T, userTasksClt services.UserTasks) { - atLeastOneUserTask := 1 - atLeastTwoTaskItems := 2 - existingTasks := fetchAllUserTasks(t, userTasksClt, atLeastOneUserTask, atLeastTwoTaskItems) - existingTask := existingTasks[0] - - require.Equal(t, "OPEN", existingTask.GetSpec().State) - require.Equal(t, "my-integration", existingTask.GetSpec().Integration) - require.Equal(t, "eks-cluster-unreachable", existingTask.GetSpec().IssueType) - require.Equal(t, "123456789012", existingTask.GetSpec().GetDiscoverEks().GetAccountId()) - require.Equal(t, "us-west-2", existingTask.GetSpec().GetDiscoverEks().GetRegion()) - - taskClusters := existingTask.GetSpec().GetDiscoverEks().Clusters - require.Contains(t, taskClusters, "cluster01") - taskCluster := taskClusters["cluster01"] - - require.Equal(t, "cluster01", taskCluster.Name) - require.Equal(t, discoveryConfigForUserTaskEKSTestName, taskCluster.DiscoveryConfig) - require.Equal(t, defaultDiscoveryGroup, taskCluster.DiscoveryGroup) - }, - }, - { - name: "multiple EKS clusters with different KubeAppDiscovery setting failed to autoenroll and user tasks are created", - presentInstances: []types.Server{}, - foundEC2Instances: []*ec2.Instance{}, - ssm: &mockSSMClient{}, - cloudClients: &cloud.TestCloudClients{ - STS: &mocks.STSMock{}, - EKS: &mocks.EKSMock{ - Clusters: []*eks.Cluster{ - { - Name: aws.String("cluster01"), - Arn: aws.String("arn:aws:eks:us-west-2:123456789012:cluster/cluster01"), - Status: aws.String(eks.ClusterStatusActive), - Tags: map[string]*string{ - "EnableAppDiscovery": aws.String("Yes"), - }, - }, - { - Name: aws.String("cluster02"), - Arn: aws.String("arn:aws:eks:us-west-2:123456789012:cluster/cluster02"), - Status: aws.String(eks.ClusterStatusActive), - Tags: map[string]*string{ - "EnableAppDiscovery": aws.String("No"), - }, - }, - }, - }, - }, - eksEnroller: &mockEKSClusterEnroller{ - resp: &integrationpb.EnrollEKSClustersResponse{ - Results: []*integrationpb.EnrollEKSClusterResult{ - { - EksClusterName: "cluster01", - Error: "access endpoint is not reachable", - IssueType: "eks-cluster-unreachable", - }, - { - EksClusterName: "cluster02", - Error: "access endpoint is not reachable", - IssueType: "eks-cluster-unreachable", - }, - }, - }, - err: nil, - }, - emitter: &mockEmitter{}, - staticMatchers: Matchers{}, - discoveryConfig: discoveryConfigWithAndWithoutAppDiscovery, - wantInstalledInstances: []string{}, - userTasksDiscoverCheck: func(t *testing.T, userTasksClt services.UserTasks) { - atLeastOneUserTask := 2 - atLeastTwoTaskItems := 2 - existingTasks := fetchAllUserTasks(t, userTasksClt, atLeastOneUserTask, atLeastTwoTaskItems) - existingTask := existingTasks[0] - if existingTask.Spec.DiscoverEks.AppAutoDiscover == false { - existingTask = existingTasks[1] - } - - require.Equal(t, "OPEN", existingTask.GetSpec().State) - require.Equal(t, "my-integration", existingTask.GetSpec().Integration) - require.Equal(t, "eks-cluster-unreachable", existingTask.GetSpec().IssueType) - require.Equal(t, "123456789012", existingTask.GetSpec().GetDiscoverEks().GetAccountId()) - require.Equal(t, "us-west-2", existingTask.GetSpec().GetDiscoverEks().GetRegion()) - - taskClusters := existingTask.GetSpec().GetDiscoverEks().Clusters - require.Contains(t, taskClusters, "cluster01") - taskCluster := taskClusters["cluster01"] - - require.Equal(t, "cluster01", taskCluster.Name) - require.Equal(t, discoveryConfigWithAndWithoutAppDiscoveryTestName, taskCluster.DiscoveryConfig) - require.Equal(t, defaultDiscoveryGroup, taskCluster.DiscoveryGroup) - }, - }, } for _, tc := range tcs { @@ -928,10 +747,6 @@ func TestDiscoveryServer(t *testing.T) { require.NoError(t, err) } - var eksEnroller eksClustersEnroller = authClient.IntegrationAWSOIDCClient() - if tc.eksEnroller != nil { - eksEnroller = tc.eksEnroller - } if tc.cloudClients != nil { testCloudClients = tc.cloudClients } @@ -939,8 +754,8 @@ func TestDiscoveryServer(t *testing.T) { server, err := New(authz.ContextWithUser(context.Background(), identity.I), &Config{ CloudClients: testCloudClients, ClusterFeatures: func() proto.Features { return proto.Features{} }, - KubernetesClient: fake.NewSimpleClientset(), - AccessPoint: getDiscoveryAccessPointWithEKSEnroller(tlsServer.Auth(), authClient, eksEnroller), + KubernetesClient: fake.NewClientset(), + AccessPoint: getDiscoveryAccessPointWithEKSEnroller(tlsServer.Auth(), authClient, authClient.IntegrationAWSOIDCClient()), Matchers: tc.staticMatchers, Emitter: tc.emitter, Log: logger, @@ -1101,7 +916,7 @@ func TestDiscoveryServerConcurrency(t *testing.T) { server1, err := New(authz.ContextWithUser(ctx, identity.I), &Config{ CloudClients: testCloudClients, ClusterFeatures: func() proto.Features { return proto.Features{} }, - KubernetesClient: fake.NewSimpleClientset(), + KubernetesClient: fake.NewClientset(), AccessPoint: getDiscoveryAccessPoint(tlsServer.Auth(), authClient), Matchers: staticMatcher, Emitter: emitter, @@ -1114,7 +929,7 @@ func TestDiscoveryServerConcurrency(t *testing.T) { server2, err := New(authz.ContextWithUser(ctx, identity.I), &Config{ CloudClients: testCloudClients, ClusterFeatures: func() proto.Features { return proto.Features{} }, - KubernetesClient: fake.NewSimpleClientset(), + KubernetesClient: fake.NewClientset(), AccessPoint: getDiscoveryAccessPoint(tlsServer.Auth(), authClient), Matchers: staticMatcher, Emitter: emitter, @@ -1318,7 +1133,7 @@ func TestDiscoveryKubeServices(t *testing.T) { &Config{ CloudClients: &cloud.TestCloudClients{}, ClusterFeatures: func() proto.Features { return proto.Features{} }, - KubernetesClient: fake.NewSimpleClientset(objects...), + KubernetesClient: fake.NewClientset(objects...), AccessPoint: getDiscoveryAccessPoint(tlsServer.Auth(), authClient), Matchers: Matchers{ Kubernetes: tt.kubernetesMatchers, @@ -1355,6 +1170,75 @@ func TestDiscoveryKubeServices(t *testing.T) { } } +// TestDiscoveryKubeServicesInterval tests that the poll interval is honored for Kube App Discovery. +func TestDiscoveryKubeServicesInterval(t *testing.T) { + const mainDiscoveryGroup = "main" + + mockKubeServices := []*corev1.Service{ + newMockKubeService("service1", "ns1", "", + map[string]string{"test-label": "testval"}, + nil, + []corev1.ServicePort{{Port: 42, Name: "http", Protocol: corev1.ProtocolTCP}}), + newMockKubeService("service2", "ns2", "", + map[string]string{"test-label": "testval"}, + nil, + []corev1.ServicePort{{Port: 42, Name: "http", Protocol: corev1.ProtocolTCP}}), + } + + // Kubernetes app discovery matcher + kubernetesMatchers := []types.KubernetesMatcher{ + { + Types: []string{"app"}, + Namespaces: []string{types.Wildcard}, + Labels: map[string]utils.Strings{"test-label": {"testval"}}, + }, + } + + synctest.Test(t, func(t *testing.T) { + ctx := t.Context() + + fakeKubeClient := fake.NewClientset(mockKubeServices[0]) + + bk, err := memory.New(memory.Config{}) + require.NoError(t, err) + mockAccessPoint := &mockAuthServer{ + events: local.NewEventsService(bk), + } + + const pollInterval = 1 * time.Minute + discServer, err := New(ctx, + &Config{ + ClusterFeatures: func() proto.Features { return proto.Features{} }, + KubernetesClient: fakeKubeClient, + AccessPoint: mockAccessPoint, + Matchers: Matchers{ + Kubernetes: kubernetesMatchers, + }, + Emitter: &mockEmitter{}, + DiscoveryGroup: mainDiscoveryGroup, + PollInterval: pollInterval, + }) + require.NoError(t, err) + + t.Cleanup(discServer.Stop) + go discServer.Start() + + // Wait for discovery server to complete one iteration of discovering resources + synctest.Wait() + + // Mock access point implementation does not actually create the app resource, but usage event is still logged + require.Len(t, mockAccessPoint.usageEvents, 1) + + // Create new Kube service to be discovered by discServer + fakeKubeClient.CoreV1().Services("ns2").Create(ctx, mockKubeServices[1], v1.CreateOptions{}) + + time.Sleep(pollInterval) + synctest.Wait() + + require.Len(t, mockAccessPoint.usageEvents, 2) + }) +} + func TestDiscoveryInCloudKube(t *testing.T) { const ( mainDiscoveryGroup = "main" @@ -1661,7 +1545,7 @@ func TestDiscoveryInCloudKube(t *testing.T) { &Config{ CloudClients: testCloudClients, ClusterFeatures: func() proto.Features { return proto.Features{} }, - KubernetesClient: fake.NewSimpleClientset(), + KubernetesClient: fake.NewClientset(), AccessPoint: getDiscoveryAccessPoint(tlsServer.Auth(), authClient), Matchers: Matchers{ AWS: tc.awsMatchers, @@ -2563,7 +2447,7 @@ func TestDiscoveryDatabase(t *testing.T) { IntegrationOnlyCredentials: integrationOnlyCredential, CloudClients: testCloudClients, ClusterFeatures: func() proto.Features { return proto.Features{} }, - KubernetesClient: fake.NewSimpleClientset(), + KubernetesClient: fake.NewClientset(), AccessPoint: getDiscoveryAccessPoint(tlsServer.Auth(), authClient), Matchers: Matchers{ AWS: tc.awsMatchers, @@ -2687,7 +2571,7 @@ func TestDiscoveryDatabaseRemovingDiscoveryConfigs(t *testing.T) { &Config{ CloudClients: testCloudClients, ClusterFeatures: func() proto.Features { return proto.Features{} }, - KubernetesClient: fake.NewSimpleClientset(), + KubernetesClient: fake.NewClientset(), AccessPoint: getDiscoveryAccessPoint(tlsServer.Auth(), authClient), Matchers: Matchers{}, Emitter: authClient, @@ -3127,7 +3011,7 @@ func TestAzureVMDiscovery(t *testing.T) { server, err := New(authz.ContextWithUser(context.Background(), identity.I), &Config{ CloudClients: testCloudClients, ClusterFeatures: func() proto.Features { return proto.Features{} }, - KubernetesClient: fake.NewSimpleClientset(), + KubernetesClient: fake.NewClientset(), AccessPoint: getDiscoveryAccessPoint(tlsServer.Auth(), authClient), Matchers: tc.staticMatchers, Emitter: emitter, @@ -3435,7 +3319,7 @@ func TestGCPVMDiscovery(t *testing.T) { server, err := New(authz.ContextWithUser(context.Background(), identity.I), &Config{ CloudClients: testCloudClients, ClusterFeatures: func() proto.Features { return proto.Features{} }, - KubernetesClient: fake.NewSimpleClientset(), + KubernetesClient: fake.NewClientset(), AccessPoint: getDiscoveryAccessPoint(tlsServer.Auth(), authClient), Matchers: tc.staticMatchers, Emitter: emitter, @@ -3658,26 +3542,6 @@ type eksClustersEnroller interface { EnrollEKSClusters(context.Context, *integrationpb.EnrollEKSClustersRequest, ...grpc.CallOption) (*integrationpb.EnrollEKSClustersResponse, error) } -type mockEKSClusterEnroller struct { - resp *integrationpb.EnrollEKSClustersResponse - err error -} - -func (m *mockEKSClusterEnroller) EnrollEKSClusters(ctx context.Context, req *integrationpb.EnrollEKSClustersRequest, opt ...grpc.CallOption) (*integrationpb.EnrollEKSClustersResponse, error) { - ret := &integrationpb.EnrollEKSClustersResponse{ - Results: []*integrationpb.EnrollEKSClusterResult{}, - } - // Filter out non-requested clusters. - for _, clusterName := range req.EksClusterNames { - for _, mockClusterResult := range m.resp.Results { - if clusterName == mockClusterResult.EksClusterName { - ret.Results = append(ret.Results, mockClusterResult) - } - } - } - return ret, m.err -} - type combinedDiscoveryClient struct { *auth.Server eksClustersEnroller diff --git a/lib/srv/discovery/kube_services_watcher.go b/lib/srv/discovery/kube_services_watcher.go index f9fc7ad47a16e..b5b6a0617cbd8 100644 --- a/lib/srv/discovery/kube_services_watcher.go +++ b/lib/srv/discovery/kube_services_watcher.go @@ -22,7 +22,6 @@ import ( "context" "log/slog" "sync" - "time" "github.com/gravitational/trace" @@ -75,10 +74,11 @@ func (s *Server) startKubeAppsWatchers() error { watcher, err := common.NewWatcher(s.ctx, common.WatcherConfig{ FetchersFn: common.StaticFetchers(s.kubeAppsFetchers), - Interval: 5 * time.Minute, Log: s.LegacyLogger.WithField("kind", types.KindApp), DiscoveryGroup: s.DiscoveryGroup, + Interval: s.PollInterval, Origin: types.OriginDiscoveryKubernetes, + Clock: s.clock, }) if err != nil { return trace.Wrap(err) diff --git a/lib/srv/discovery/kube_watcher.go b/lib/srv/discovery/kube_watcher.go index d6cc07aa75ae2..34aec2c9949ab 100644 --- a/lib/srv/discovery/kube_watcher.go +++ b/lib/srv/discovery/kube_watcher.go @@ -82,6 +82,7 @@ func (s *Server) startKubeWatchers() error { DiscoveryGroup: s.DiscoveryGroup, Interval: s.PollInterval, Origin: types.OriginCloud, + Clock: s.clock, }) if err != nil { return trace.Wrap(err)