diff --git a/api/types/constants.go b/api/types/constants.go index d5bbc958a6ddf..7ee59490cd32a 100644 --- a/api/types/constants.go +++ b/api/types/constants.go @@ -628,6 +628,11 @@ const ( // AlertLicenseExpired is an internal label that indicates that the license has expired. AlertLicenseExpired = TeleportInternalLabelPrefix + "license-expired-warning" + + // TeleportInternalDiscoveryGroupName is the label used to store the name of the discovery group + // that the discovered resource is owned by. It is used to differentiate resources + // that belong to different discovery services that operate on different sets of resources. + TeleportInternalDiscoveryGroupName = TeleportInternalLabelPrefix + "discovery-group-name" ) // RequestableResourceKinds lists all Teleport resource kinds users can request access to. diff --git a/lib/config/configuration.go b/lib/config/configuration.go index 94295290bf3d2..41af97166c87b 100644 --- a/lib/config/configuration.go +++ b/lib/config/configuration.go @@ -1215,6 +1215,7 @@ func getInstallerProxyAddr(matcher AzureMatcher, fc *FileConfig) string { func applyDiscoveryConfig(fc *FileConfig, cfg *servicecfg.Config) error { cfg.Discovery.Enabled = fc.Discovery.Enabled() + cfg.Discovery.DiscoveryGroup = fc.Discovery.DiscoveryGroup for _, matcher := range fc.Discovery.AWSMatchers { installParams, err := matcher.InstallParams.Parse() if err != nil { diff --git a/lib/config/fileconf.go b/lib/config/fileconf.go index 0b013a6b3dda7..c2916453d3e3d 100644 --- a/lib/config/fileconf.go +++ b/lib/config/fileconf.go @@ -1516,6 +1516,15 @@ type Discovery struct { // GCPMatchers are used to match GCP resources. GCPMatchers []GCPMatcher `yaml:"gcp,omitempty"` + + // DiscoveryGroup is the name of the discovery group that the current + // discovery service is a part of. + // It is used to filter out discovered resources that belong to another + // discovery services. 
When running in high availability mode and the agents + // have access to the same cloud resources, this field value must be the same + // for all discovery services. If different agents are used to discover different + // sets of cloud resources, this field must be different for each set of agents. + DiscoveryGroup string `yaml:"discovery_group,omitempty"` } // GCPMatcher matches GCP resources. diff --git a/lib/service/discovery.go b/lib/service/discovery.go index b66184a997a6f..ccce5d93c6232 100644 --- a/lib/service/discovery.go +++ b/lib/service/discovery.go @@ -56,12 +56,13 @@ func (process *TeleportProcess) initDiscoveryService() error { } discoveryService, err := discovery.New(process.ExitContext(), &discovery.Config{ - AWSMatchers: process.Config.Discovery.AWSMatchers, - AzureMatchers: process.Config.Discovery.AzureMatchers, - GCPMatchers: process.Config.Discovery.GCPMatchers, - Emitter: asyncEmitter, - AccessPoint: accessPoint, - Log: process.log, + AWSMatchers: process.Config.Discovery.AWSMatchers, + AzureMatchers: process.Config.Discovery.AzureMatchers, + GCPMatchers: process.Config.Discovery.GCPMatchers, + DiscoveryGroup: process.Config.Discovery.DiscoveryGroup, + Emitter: asyncEmitter, + AccessPoint: accessPoint, + Log: process.log, }) if err != nil { return trace.Wrap(err) diff --git a/lib/service/servicecfg/discovery.go b/lib/service/servicecfg/discovery.go index 80e417c227ab8..a5c0ea549218c 100644 --- a/lib/service/servicecfg/discovery.go +++ b/lib/service/servicecfg/discovery.go @@ -24,6 +24,14 @@ type DiscoveryConfig struct { AzureMatchers []services.AzureMatcher // GCPMatchers are used to match GCP resources for auto discovery. GCPMatchers []services.GCPMatcher + // DiscoveryGroup is the name of the discovery group that the current + // discovery service is a part of. + // It is used to filter out discovered resources that belong to another + // discovery services. 
When running in high availability mode and the agents + // have access to the same cloud resources, this field value must be the same + // for all discovery services. If different agents are used to discover different + // sets of cloud resources, this field must be different for each set of agents. + DiscoveryGroup string } // IsEmpty validates if the Discovery Service config has no cloud matchers. diff --git a/lib/srv/db/watcher_test.go b/lib/srv/db/watcher_test.go index f8cf06edb3758..b377f4141d27a 100644 --- a/lib/srv/db/watcher_test.go +++ b/lib/srv/db/watcher_test.go @@ -207,6 +207,17 @@ func TestWatcherDynamicResource(t *testing.T) { }) } +func setDiscoveryGroupLabel(r types.ResourceWithLabels, discoveryGroup string) { + staticLabels := r.GetStaticLabels() + if staticLabels == nil { + staticLabels = make(map[string]string) + } + if discoveryGroup != "" { + staticLabels[types.TeleportInternalDiscoveryGroupName] = discoveryGroup + } + r.SetStaticLabels(staticLabels) +} + // TestWatcherCloudFetchers tests usage of discovery database fetchers by the // database service. func TestWatcherCloudFetchers(t *testing.T) { @@ -216,10 +227,10 @@ func TestWatcherCloudFetchers(t *testing.T) { redshiftServerlessDatabase, err := services.NewDatabaseFromRedshiftServerlessWorkgroup(redshiftServerlessWorkgroup, nil) require.NoError(t, err) redshiftServerlessDatabase.SetStatusAWS(redshiftServerlessDatabase.GetAWS()) - + setDiscoveryGroupLabel(redshiftServerlessDatabase, "") // Test an Azure fetcher. 
azSQLServer, azSQLServerDatabase := makeAzureSQLServer(t, "discovery-azure", "group") - + setDiscoveryGroupLabel(azSQLServerDatabase, "") ctx := context.Background() testCtx := setupTestContext(ctx, t) @@ -270,7 +281,6 @@ func assertReconciledResource(t *testing.T, ch chan types.Databases, databases t case <-time.After(time.Second): t.Fatal("Didn't receive reconcile event after 1s.") } - } func makeStaticDatabase(name string, labels map[string]string, opts ...makeDatabaseOpt) (*types.DatabaseV3, error) { diff --git a/lib/srv/discovery/common/watcher.go b/lib/srv/discovery/common/watcher.go index e4b5c916d8a17..5e225aa549d51 100644 --- a/lib/srv/discovery/common/watcher.go +++ b/lib/srv/discovery/common/watcher.go @@ -43,6 +43,14 @@ type WatcherConfig struct { Log logrus.FieldLogger // Clock is used to control time. Clock clockwork.Clock + // DiscoveryGroup is the name of the discovery group that the current + // discovery service is a part of. + // It is used to filter out discovered resources that belong to another + // discovery services. When running in high availability mode and the agents + // have access to the same cloud resources, this field value must be the same + // for all discovery services. If different agents are used to discover different + // sets of cloud resources, this field must be different for each set of agents. + DiscoveryGroup string } // CheckAndSetDefaults validates the config. @@ -126,6 +134,18 @@ func (w *Watcher) fetchAndSend() { // never return the error otherwise it will impact other watchers. return nil } + if w.cfg.DiscoveryGroup != "" { + // Add the discovery group name to the static labels of each resource. 
+ for _, r := range resources { + staticLabels := r.GetStaticLabels() + if staticLabels == nil { + staticLabels = make(map[string]string) + } + staticLabels[types.TeleportInternalDiscoveryGroupName] = w.cfg.DiscoveryGroup + r.SetStaticLabels(staticLabels) + } + } + fetchersLock.Lock() newFetcherResources = append(newFetcherResources, resources...) fetchersLock.Unlock() diff --git a/lib/srv/discovery/common/watcher_test.go b/lib/srv/discovery/common/watcher_test.go index eab00fe1654da..9c6f8125aefbd 100644 --- a/lib/srv/discovery/common/watcher_test.go +++ b/lib/srv/discovery/common/watcher_test.go @@ -98,9 +98,11 @@ func (m *mockFetcher) Get(ctx context.Context) (types.ResourcesWithLabels, error } return m.resources, nil } + func (m *mockFetcher) ResourceType() string { return m.resourceType } + func (m *mockFetcher) Cloud() string { return m.cloud } diff --git a/lib/srv/discovery/database_watcher.go b/lib/srv/discovery/database_watcher.go index a33ac3c53d037..2ca259fe51e59 100644 --- a/lib/srv/discovery/database_watcher.go +++ b/lib/srv/discovery/database_watcher.go @@ -57,8 +57,9 @@ func (s *Server) startDatabaseWatchers() error { } watcher, err := common.NewWatcher(s.ctx, common.WatcherConfig{ - Fetchers: s.databaseFetchers, - Log: s.Log.WithField("kind", types.KindDatabase), + Fetchers: s.databaseFetchers, + Log: s.Log.WithField("kind", types.KindDatabase), + DiscoveryGroup: s.DiscoveryGroup, }) if err != nil { return trace.Wrap(err) @@ -94,7 +95,7 @@ func (s *Server) getCurrentDatabases() types.ResourcesWithLabelsMap { return nil } - return types.Databases(filterResourcesByOrigin(databases, types.OriginCloud)).AsResources().ToMap() + return types.Databases(filterResources(databases, types.OriginCloud, s.DiscoveryGroup)).AsResources().ToMap() } func (s *Server) onDatabaseCreate(ctx context.Context, resource types.ResourceWithLabels) error { @@ -103,7 +104,18 @@ func (s *Server) onDatabaseCreate(ctx context.Context, resource types.ResourceWi return 
trace.BadParameter("invalid type received; expected types.Database, received %T", database) } s.Log.Debugf("Creating database %s.", database.GetName()) - return trace.Wrap(s.AccessPoint.CreateDatabase(ctx, database)) + err := s.AccessPoint.CreateDatabase(ctx, database) + // If the resource already exists, it means that the resource was created + // by a previous discovery_service instance that didn't support the discovery + // group feature or the discovery group was changed. + // In this case, we need to update the resource with the + // discovery group label to ensure the user doesn't have to manually delete + // the resource. + // TODO(tigrato): DELETE on 14.0.0 + if trace.IsAlreadyExists(err) { + return trace.Wrap(s.onDatabaseUpdate(ctx, resource)) + } + return trace.Wrap(err) } func (s *Server) onDatabaseUpdate(ctx context.Context, resource types.ResourceWithLabels) error { @@ -124,11 +136,14 @@ func (s *Server) onDatabaseDelete(ctx context.Context, resource types.ResourceWi return trace.Wrap(s.AccessPoint.DeleteDatabase(ctx, database.GetName())) } -func filterResourcesByOrigin[T types.ResourceWithOrigin, S ~[]T](all S, wantOrigin string) (filtered S) { +func filterResources[T types.ResourceWithLabels, S ~[]T](all S, wantOrigin, wantResourceGroup string) (filtered S) { for _, resource := range all { - if resource.Origin() == wantOrigin { - filtered = append(filtered, resource) + resourceDiscoveryGroup, _ := resource.GetLabel(types.TeleportInternalDiscoveryGroupName) + if resource.Origin() != wantOrigin || resourceDiscoveryGroup != wantResourceGroup { + continue } + filtered = append(filtered, resource) + } return } diff --git a/lib/srv/discovery/discovery.go b/lib/srv/discovery/discovery.go index e92fd13eb5b3f..dac9dfee997b1 100644 --- a/lib/srv/discovery/discovery.go +++ b/lib/srv/discovery/discovery.go @@ -62,6 +62,14 @@ type Config struct { Log logrus.FieldLogger // onDatabaseReconcile is called after each database resource reconciliation. 
onDatabaseReconcile func() + // DiscoveryGroup is the name of the discovery group that the current + // discovery service is a part of. + // It is used to filter out discovered resources that belong to another + // discovery services. When running in high availability mode and the agents + // have access to the same cloud resources, this field value must be the same + // for all discovery services. If different agents are used to discover different + // sets of cloud resources, this field must be different for each set of agents. + DiscoveryGroup string } func (c *Config) CheckAndSetDefaults() error { @@ -407,7 +415,6 @@ func (s *Server) handleEC2Discovery() { } else { s.Log.WithError(err).Error("Failed to enroll discovered EC2 instances.") } - } case <-s.ctx.Done(): s.ec2Watcher.Stop() diff --git a/lib/srv/discovery/discovery_test.go b/lib/srv/discovery/discovery_test.go index 6d7e667d45ee8..367785b7e5427 100644 --- a/lib/srv/discovery/discovery_test.go +++ b/lib/srv/discovery/discovery_test.go @@ -46,6 +46,7 @@ import ( "github.com/google/go-cmp/cmp" "github.com/google/go-cmp/cmp/cmpopts" "github.com/google/uuid" + "github.com/gravitational/trace" "github.com/sirupsen/logrus" "github.com/stretchr/testify/require" @@ -408,6 +409,10 @@ func TestDiscoveryServer(t *testing.T) { } func TestDiscoveryKube(t *testing.T) { + const ( + mainDiscoveryGroup = "main" + otherDiscoveryGroup = "other" + ) t.Parallel() tcs := []struct { name string @@ -429,8 +434,8 @@ func TestDiscoveryKube(t *testing.T) { }, }, expectedClustersToExistInAuth: []types.KubeCluster{ - mustConvertEKSToKubeCluster(t, eksMockClusters[0]), - mustConvertEKSToKubeCluster(t, eksMockClusters[1]), + mustConvertEKSToKubeCluster(t, eksMockClusters[0], mainDiscoveryGroup), + mustConvertEKSToKubeCluster(t, eksMockClusters[1], mainDiscoveryGroup), }, }, { @@ -444,14 +449,14 @@ func TestDiscoveryKube(t *testing.T) { }, }, expectedClustersToExistInAuth: []types.KubeCluster{ - mustConvertEKSToKubeCluster(t, 
eksMockClusters[2]), - mustConvertEKSToKubeCluster(t, eksMockClusters[3]), + mustConvertEKSToKubeCluster(t, eksMockClusters[2], mainDiscoveryGroup), + mustConvertEKSToKubeCluster(t, eksMockClusters[3], mainDiscoveryGroup), }, }, { name: "1 cluster in auth server not updated + import 1 prod cluster from EKS", existingKubeClusters: []types.KubeCluster{ - mustConvertEKSToKubeCluster(t, eksMockClusters[0]), + mustConvertEKSToKubeCluster(t, eksMockClusters[0], mainDiscoveryGroup), }, awsMatchers: []services.AWSMatcher{ { @@ -461,15 +466,15 @@ func TestDiscoveryKube(t *testing.T) { }, }, expectedClustersToExistInAuth: []types.KubeCluster{ - mustConvertEKSToKubeCluster(t, eksMockClusters[0]), - mustConvertEKSToKubeCluster(t, eksMockClusters[1]), + mustConvertEKSToKubeCluster(t, eksMockClusters[0], mainDiscoveryGroup), + mustConvertEKSToKubeCluster(t, eksMockClusters[1], mainDiscoveryGroup), }, clustersNotUpdated: []string{"eks-cluster1"}, }, { - name: "1 cluster in auth that no longer matches (deleted) + import 2 prod clusters from EKS", + name: "1 cluster in auth that belongs the same discovery group but has unmatched labels + import 2 prod clusters from EKS", existingKubeClusters: []types.KubeCluster{ - mustConvertEKSToKubeCluster(t, eksMockClusters[3]), + mustConvertEKSToKubeCluster(t, eksMockClusters[3], mainDiscoveryGroup), }, awsMatchers: []services.AWSMatcher{ { @@ -479,16 +484,35 @@ func TestDiscoveryKube(t *testing.T) { }, }, expectedClustersToExistInAuth: []types.KubeCluster{ - mustConvertEKSToKubeCluster(t, eksMockClusters[0]), - mustConvertEKSToKubeCluster(t, eksMockClusters[1]), + mustConvertEKSToKubeCluster(t, eksMockClusters[0], mainDiscoveryGroup), + mustConvertEKSToKubeCluster(t, eksMockClusters[1], mainDiscoveryGroup), }, clustersNotUpdated: []string{}, }, { - name: "1 cluster in auth that matches but must be updated + import 1 prod clusters from EKS", + name: "1 cluster in auth that belongs to a different discovery group + import 2 prod clusters from 
EKS", + existingKubeClusters: []types.KubeCluster{ + mustConvertEKSToKubeCluster(t, eksMockClusters[3], otherDiscoveryGroup), + }, + awsMatchers: []services.AWSMatcher{ + { + Types: []string{"eks"}, + Regions: []string{"eu-west-1"}, + Tags: map[string]utils.Strings{"env": {"prod"}}, + }, + }, + expectedClustersToExistInAuth: []types.KubeCluster{ + mustConvertEKSToKubeCluster(t, eksMockClusters[3], otherDiscoveryGroup), + mustConvertEKSToKubeCluster(t, eksMockClusters[0], mainDiscoveryGroup), + mustConvertEKSToKubeCluster(t, eksMockClusters[1], mainDiscoveryGroup), + }, + clustersNotUpdated: []string{}, + }, + { + name: "1 cluster in auth that must be updated + import 1 prod clusters from EKS", existingKubeClusters: []types.KubeCluster{ // add an extra static label to force update in auth server - modifyKubeCluster(mustConvertEKSToKubeCluster(t, eksMockClusters[1])), + modifyKubeCluster(mustConvertEKSToKubeCluster(t, eksMockClusters[1], mainDiscoveryGroup)), }, awsMatchers: []services.AWSMatcher{ { @@ -498,8 +522,8 @@ func TestDiscoveryKube(t *testing.T) { }, }, expectedClustersToExistInAuth: []types.KubeCluster{ - mustConvertEKSToKubeCluster(t, eksMockClusters[0]), - mustConvertEKSToKubeCluster(t, eksMockClusters[1]), + mustConvertEKSToKubeCluster(t, eksMockClusters[0], mainDiscoveryGroup), + mustConvertEKSToKubeCluster(t, eksMockClusters[1], mainDiscoveryGroup), }, clustersNotUpdated: []string{}, }, @@ -507,8 +531,8 @@ func TestDiscoveryKube(t *testing.T) { name: "2 clusters in auth that matches but one must be updated + import 2 prod clusters, 1 from EKS and other from AKS", existingKubeClusters: []types.KubeCluster{ // add an extra static label to force update in auth server - modifyKubeCluster(mustConvertEKSToKubeCluster(t, eksMockClusters[1])), - mustConvertAKSToKubeCluster(t, aksMockClusters["group1"][0]), + modifyKubeCluster(mustConvertEKSToKubeCluster(t, eksMockClusters[1], mainDiscoveryGroup)), + mustConvertAKSToKubeCluster(t, aksMockClusters["group1"][0], 
mainDiscoveryGroup), }, awsMatchers: []services.AWSMatcher{ { @@ -527,10 +551,10 @@ func TestDiscoveryKube(t *testing.T) { }, }, expectedClustersToExistInAuth: []types.KubeCluster{ - mustConvertEKSToKubeCluster(t, eksMockClusters[0]), - mustConvertEKSToKubeCluster(t, eksMockClusters[1]), - mustConvertAKSToKubeCluster(t, aksMockClusters["group1"][0]), - mustConvertAKSToKubeCluster(t, aksMockClusters["group1"][1]), + mustConvertEKSToKubeCluster(t, eksMockClusters[0], mainDiscoveryGroup), + mustConvertEKSToKubeCluster(t, eksMockClusters[1], mainDiscoveryGroup), + mustConvertAKSToKubeCluster(t, aksMockClusters["group1"][0], mainDiscoveryGroup), + mustConvertAKSToKubeCluster(t, aksMockClusters["group1"][1], mainDiscoveryGroup), }, clustersNotUpdated: []string{"aks-cluster1"}, }, @@ -546,8 +570,8 @@ func TestDiscoveryKube(t *testing.T) { }, }, expectedClustersToExistInAuth: []types.KubeCluster{ - mustConvertGKEToKubeCluster(t, gkeMockClusters[0]), - mustConvertGKEToKubeCluster(t, gkeMockClusters[1]), + mustConvertGKEToKubeCluster(t, gkeMockClusters[0], mainDiscoveryGroup), + mustConvertGKEToKubeCluster(t, gkeMockClusters[1], mainDiscoveryGroup), }, }, } @@ -621,13 +645,14 @@ func TestDiscoveryKube(t *testing.T) { discServer, err := New( ctx, &Config{ - Clients: &testClients, - AccessPoint: tlsServer.Auth(), - AWSMatchers: tc.awsMatchers, - AzureMatchers: tc.azureMatchers, - GCPMatchers: tc.gcpMatchers, - Emitter: authClient, - Log: logger, + Clients: &testClients, + AccessPoint: tlsServer.Auth(), + AWSMatchers: tc.awsMatchers, + AzureMatchers: tc.azureMatchers, + GCPMatchers: tc.gcpMatchers, + Emitter: authClient, + Log: logger, + DiscoveryGroup: mainDiscoveryGroup, }) require.NoError(t, err) @@ -825,15 +850,17 @@ var eksMockClusters = []*eks.Cluster{ }, } -func mustConvertEKSToKubeCluster(t *testing.T, eksCluster *eks.Cluster) types.KubeCluster { +func mustConvertEKSToKubeCluster(t *testing.T, eksCluster *eks.Cluster, discoveryGroup string) types.KubeCluster { cluster, 
err := services.NewKubeClusterFromAWSEKS(eksCluster) require.NoError(t, err) + cluster.GetStaticLabels()[types.TeleportInternalDiscoveryGroupName] = discoveryGroup return cluster } -func mustConvertAKSToKubeCluster(t *testing.T, azureCluster *azure.AKSCluster) types.KubeCluster { +func mustConvertAKSToKubeCluster(t *testing.T, azureCluster *azure.AKSCluster, discoveryGroup string) types.KubeCluster { cluster, err := services.NewKubeClusterFromAzureAKS(azureCluster) require.NoError(t, err) + cluster.GetStaticLabels()[types.TeleportInternalDiscoveryGroupName] = discoveryGroup return cluster } @@ -903,9 +930,10 @@ var gkeMockClusters = []gcp.GKECluster{ }, } -func mustConvertGKEToKubeCluster(t *testing.T, gkeCluster gcp.GKECluster) types.KubeCluster { +func mustConvertGKEToKubeCluster(t *testing.T, gkeCluster gcp.GKECluster, discoveryGroup string) types.KubeCluster { cluster, err := services.NewKubeClusterFromGCPGKE(gkeCluster) require.NoError(t, err) + cluster.GetStaticLabels()[types.TeleportInternalDiscoveryGroupName] = discoveryGroup return cluster } @@ -919,9 +947,12 @@ func (m *mockGKEAPI) ListClusters(ctx context.Context, projectID string, locatio } func TestDiscoveryDatabase(t *testing.T) { - awsRedshiftResource, awsRedshiftDB := makeRedshiftCluster(t, "aws-redshift", "us-east-1") - awsRDSInstance, awsRDSDB := makeRDSInstance(t, "aws-rds", "us-west-1") - azRedisResource, azRedisDB := makeAzureRedisServer(t, "az-redis", "sub1", "group1", "East US") + const ( + mainDiscoveryGroup = "main" + ) + awsRedshiftResource, awsRedshiftDB := makeRedshiftCluster(t, "aws-redshift", "us-east-1", mainDiscoveryGroup) + awsRDSInstance, awsRDSDB := makeRDSInstance(t, "aws-rds", "us-west-1", mainDiscoveryGroup) + azRedisResource, azRedisDB := makeAzureRedisServer(t, "az-redis", "sub1", "group1", "East US", mainDiscoveryGroup) role := services.AssumeRole{RoleARN: "arn:aws:iam::123456789012:role/test-role", ExternalID: "test123"} awsRDSDBWithRole := awsRDSDB.Copy() @@ -991,10 
+1022,15 @@ func TestDiscoveryDatabase(t *testing.T) { mustNewDatabase(t, types.Metadata{ Name: "aws-redshift", Description: "should be updated", - Labels: map[string]string{types.OriginLabel: types.OriginCloud}, + Labels: map[string]string{types.OriginLabel: types.OriginCloud, types.TeleportInternalDiscoveryGroupName: mainDiscoveryGroup}, }, types.DatabaseSpecV3{ Protocol: "redis", URI: "should.be.updated.com:12345", + AWS: types.AWS{ + Redshift: types.Redshift{ + ClusterID: "aws-redshift", + }, + }, }), }, awsMatchers: []services.AWSMatcher{{ @@ -1010,7 +1046,7 @@ func TestDiscoveryDatabase(t *testing.T) { mustNewDatabase(t, types.Metadata{ Name: "aws-rds", Description: "should be updated", - Labels: map[string]string{types.OriginLabel: types.OriginCloud}, + Labels: map[string]string{types.OriginLabel: types.OriginCloud, types.TeleportInternalDiscoveryGroupName: mainDiscoveryGroup}, }, types.DatabaseSpecV3{ Protocol: "postgres", URI: "should.be.updated.com:12345", @@ -1029,11 +1065,11 @@ func TestDiscoveryDatabase(t *testing.T) { existingDatabases: []types.Database{ mustNewDatabase(t, types.Metadata{ Name: "aws-redshift", - Description: "should be deleted", + Description: "should not be deleted", Labels: map[string]string{types.OriginLabel: types.OriginCloud}, }, types.DatabaseSpecV3{ Protocol: "redis", - URI: "should.be.deleted.com:12345", + URI: "should.not.be.deleted.com:12345", }), }, awsMatchers: []services.AWSMatcher{{ @@ -1041,7 +1077,16 @@ func TestDiscoveryDatabase(t *testing.T) { Tags: map[string]utils.Strings{"do-not-match": {"do-not-match"}}, Regions: []string{"us-east-1"}, }}, - expectDatabases: []types.Database{}, + expectDatabases: []types.Database{ + mustNewDatabase(t, types.Metadata{ + Name: "aws-redshift", + Description: "should not be deleted", + Labels: map[string]string{types.OriginLabel: types.OriginCloud}, + }, types.DatabaseSpecV3{ + Protocol: "redis", + URI: "should.not.be.deleted.com:12345", + }), + }, }, { name: "skip self-hosted 
database", @@ -1114,6 +1159,7 @@ func TestDiscoveryDatabase(t *testing.T) { onDatabaseReconcile: func() { waitForReconcile <- struct{}{} }, + DiscoveryGroup: mainDiscoveryGroup, }) require.NoError(t, err) @@ -1136,7 +1182,7 @@ func TestDiscoveryDatabase(t *testing.T) { } } -func makeRDSInstance(t *testing.T, name, region string) (*rds.DBInstance, types.Database) { +func makeRDSInstance(t *testing.T, name, region string, discoveryGroup string) (*rds.DBInstance, types.Database) { instance := &rds.DBInstance{ DBInstanceArn: aws.String(fmt.Sprintf("arn:aws:rds:%v:123456789012:db:%v", region, name)), DBInstanceIdentifier: aws.String(name), @@ -1150,10 +1196,13 @@ func makeRDSInstance(t *testing.T, name, region string) (*rds.DBInstance, types. } database, err := services.NewDatabaseFromRDSInstance(instance) require.NoError(t, err) + staticLabels := database.GetStaticLabels() + staticLabels[types.TeleportInternalDiscoveryGroupName] = discoveryGroup + database.SetStaticLabels(staticLabels) return instance, database } -func makeRedshiftCluster(t *testing.T, name, region string) (*redshift.Cluster, types.Database) { +func makeRedshiftCluster(t *testing.T, name, region string, discoveryGroup string) (*redshift.Cluster, types.Database) { t.Helper() cluster := &redshift.Cluster{ ClusterIdentifier: aws.String(name), @@ -1168,10 +1217,13 @@ func makeRedshiftCluster(t *testing.T, name, region string) (*redshift.Cluster, database, err := services.NewDatabaseFromRedshiftCluster(cluster) require.NoError(t, err) database.SetOrigin(types.OriginCloud) + staticLabels := database.GetStaticLabels() + staticLabels[types.TeleportInternalDiscoveryGroupName] = discoveryGroup + database.SetStaticLabels(staticLabels) return cluster, database } -func makeAzureRedisServer(t *testing.T, name, subscription, group, region string) (*armredis.ResourceInfo, types.Database) { +func makeAzureRedisServer(t *testing.T, name, subscription, group, region string, discoveryGroup string) (*armredis.ResourceInfo, 
types.Database) { t.Helper() resourceInfo := &armredis.ResourceInfo{ Name: to.Ptr(name), @@ -1187,6 +1239,9 @@ func makeAzureRedisServer(t *testing.T, name, subscription, group, region string database, err := services.NewDatabaseFromAzureRedis(resourceInfo) require.NoError(t, err) database.SetOrigin(types.OriginCloud) + staticLabels := database.GetStaticLabels() + staticLabels[types.TeleportInternalDiscoveryGroupName] = discoveryGroup + database.SetStaticLabels(staticLabels) return resourceInfo, database } @@ -1428,3 +1483,79 @@ func TestAzureVMDiscovery(t *testing.T) { } } + +// TestServer_onCreate tests the update of the discovery_group of a resource +// when it differs from the one in the database. +// TODO(tigrato): DELETE in 14.0.0 +func TestServer_onCreate(t *testing.T) { + _, awsRedshiftDB := makeRedshiftCluster(t, "aws-redshift", "us-east-1", "test") + accessPoint := &fakeAccessPoint{} + s := &Server{ + Config: &Config{ + AccessPoint: accessPoint, + Log: logrus.New(), + }, + } + type args struct { + resource types.ResourceWithLabels + onCreate func(context.Context, types.ResourceWithLabels) error + } + tests := []struct { + name string + args args + verify func(t *testing.T, accessPoint *fakeAccessPoint) + }{ + { + name: "onCreate update kube", + args: args{ + resource: mustConvertEKSToKubeCluster(t, eksMockClusters[0], "test-cluster"), + onCreate: s.onKubeCreate, + }, + verify: func(t *testing.T, accessPoint *fakeAccessPoint) { + require.True(t, accessPoint.updateKube) + }, + }, + { + name: "onCreate update database", + args: args{ + resource: awsRedshiftDB, + onCreate: s.onDatabaseCreate, + }, + verify: func(t *testing.T, accessPoint *fakeAccessPoint) { + require.True(t, accessPoint.updateDatabase) + }, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + err := tt.args.onCreate(context.Background(), tt.args.resource) + require.NoError(t, err) + tt.verify(t, accessPoint) + }) + } +} + +type fakeAccessPoint struct { + 
auth.DiscoveryAccessPoint + updateKube bool + updateDatabase bool +} + +func (f *fakeAccessPoint) CreateDatabase(ctx context.Context, database types.Database) error { + return trace.AlreadyExists("already exists") +} + +func (f *fakeAccessPoint) UpdateDatabase(ctx context.Context, database types.Database) error { + f.updateDatabase = true + return nil +} + +func (f *fakeAccessPoint) CreateKubernetesCluster(ctx context.Context, cluster types.KubeCluster) error { + return trace.AlreadyExists("already exists") +} + +// UpdateKubernetesCluster updates existing kubernetes cluster resource. +func (f *fakeAccessPoint) UpdateKubernetesCluster(ctx context.Context, cluster types.KubeCluster) error { + f.updateKube = true + return nil +} diff --git a/lib/srv/discovery/fetchers/aks.go b/lib/srv/discovery/fetchers/aks.go index 53835ce9afc1a..a9eb2af088c4c 100644 --- a/lib/srv/discovery/fetchers/aks.go +++ b/lib/srv/discovery/fetchers/aks.go @@ -88,7 +88,12 @@ func (a *aksFetcher) Get(ctx context.Context) (types.ResourcesWithLabels, error) a.Log.Debugf("Cluster region %q does not match with allowed values.", cluster.Location) continue } - if match, reason, err := services.MatchLabels(a.FilterLabels, cluster.Tags); err != nil { + kubeCluster, err := services.NewKubeClusterFromAzureAKS(cluster) + if err != nil { + a.Log.WithError(err).Warn("Unable to create Kubernetes cluster from azure.AKSCluster.") + continue + } + if match, reason, err := services.MatchLabels(a.FilterLabels, kubeCluster.GetAllLabels()); err != nil { a.Log.WithError(err).Warn("Unable to match AKS cluster labels against match labels.") continue } else if !match { @@ -96,11 +101,6 @@ func (a *aksFetcher) Get(ctx context.Context) (types.ResourcesWithLabels, error) continue } - kubeCluster, err := services.NewKubeClusterFromAzureAKS(cluster) - if err != nil { - a.Log.WithError(err).Warn("Unable to create Kubernetes cluster from azure.AKSCluster.") - continue - } kubeClusters = append(kubeClusters, kubeCluster) } 
return kubeClusters.AsResources(), nil diff --git a/lib/srv/discovery/fetchers/db/azure_dbserver.go b/lib/srv/discovery/fetchers/db/azure_dbserver.go index fa14af8709bc7..8247527c2fff9 100644 --- a/lib/srv/discovery/fetchers/db/azure_dbserver.go +++ b/lib/srv/discovery/fetchers/db/azure_dbserver.go @@ -37,8 +37,7 @@ func newAzurePostgresFetcher(config azureFetcherConfig) (common.Fetcher, error) } // azureDBServerPlugin implements azureFetcherPlugin for MySQL and PostgresSQL. -type azureDBServerPlugin struct { -} +type azureDBServerPlugin struct{} func (p *azureDBServerPlugin) GetListClient(cfg *azureFetcherConfig, subID string) (azure.DBServersClient, error) { switch cfg.Type { diff --git a/lib/srv/discovery/fetchers/db/azure_managed_sql.go b/lib/srv/discovery/fetchers/db/azure_managed_sql.go index 140180e9a9acf..04fb1c0588a8b 100644 --- a/lib/srv/discovery/fetchers/db/azure_managed_sql.go +++ b/lib/srv/discovery/fetchers/db/azure_managed_sql.go @@ -32,8 +32,7 @@ func newAzureManagedSQLServerFetcher(config azureFetcherConfig) (common.Fetcher, // azureManagedSQLServerFetcher implements azureFetcherPlugin for Azure Managed // SQL Servers. -type azureManagedSQLServerFetcher struct { -} +type azureManagedSQLServerFetcher struct{} func (f *azureManagedSQLServerFetcher) GetListClient(cfg *azureFetcherConfig, subID string) (azure.ManagedSQLServerClient, error) { client, err := cfg.AzureClients.GetAzureManagedSQLServerClient(subID) diff --git a/lib/srv/discovery/fetchers/db/azure_mysql_flex.go b/lib/srv/discovery/fetchers/db/azure_mysql_flex.go index 0082c5d259d28..8dfbda55fb0ef 100644 --- a/lib/srv/discovery/fetchers/db/azure_mysql_flex.go +++ b/lib/srv/discovery/fetchers/db/azure_mysql_flex.go @@ -31,8 +31,7 @@ func newAzureMySQLFlexServerFetcher(config azureFetcherConfig) (common.Fetcher, } // azureMySQLFlexServerFetcher implements azureFetcherPlugin for Azure MySQL Flexible server. 
-type azureMySQLFlexServerFetcher struct { -} +type azureMySQLFlexServerFetcher struct{} // GetListClient returns a server-listing client for Azure MySQL Flexible server. func (f *azureMySQLFlexServerFetcher) GetListClient(cfg *azureFetcherConfig, subID string) (azure.MySQLFlexServersClient, error) { diff --git a/lib/srv/discovery/fetchers/db/azure_postgres_flex.go b/lib/srv/discovery/fetchers/db/azure_postgres_flex.go index 9ea8d03e8e603..1c2ea9ec4403d 100644 --- a/lib/srv/discovery/fetchers/db/azure_postgres_flex.go +++ b/lib/srv/discovery/fetchers/db/azure_postgres_flex.go @@ -31,8 +31,7 @@ func newAzurePostgresFlexServerFetcher(config azureFetcherConfig) (common.Fetche } // newAzurePostgresFlexServerFetcher implements azureFetcherPlugin for Azure PostgreSQL Flexible server. -type azurePostgresFlexServerFetcher struct { -} +type azurePostgresFlexServerFetcher struct{} // GetListClient returns a server-listing client for Azure PostgreSQL Flexible server. func (f *azurePostgresFlexServerFetcher) GetListClient(cfg *azureFetcherConfig, subID string) (azure.PostgresFlexServersClient, error) { diff --git a/lib/srv/discovery/fetchers/db/azure_redis.go b/lib/srv/discovery/fetchers/db/azure_redis.go index 3bc8b6701a76e..067b1fb0e7b9c 100644 --- a/lib/srv/discovery/fetchers/db/azure_redis.go +++ b/lib/srv/discovery/fetchers/db/azure_redis.go @@ -33,8 +33,7 @@ func newAzureRedisFetcher(config azureFetcherConfig) (common.Fetcher, error) { } // azureRedisPlugin implements azureFetcherPlugin for Azure Redis. 
-type azureRedisPlugin struct { -} +type azureRedisPlugin struct{} func (p *azureRedisPlugin) GetListClient(cfg *azureFetcherConfig, subID string) (azure.RedisClient, error) { client, err := cfg.AzureClients.GetAzureRedisClient(subID) diff --git a/lib/srv/discovery/fetchers/db/azure_redis_enterprise.go b/lib/srv/discovery/fetchers/db/azure_redis_enterprise.go index 9cc33c0c5fbbd..ef1996514b407 100644 --- a/lib/srv/discovery/fetchers/db/azure_redis_enterprise.go +++ b/lib/srv/discovery/fetchers/db/azure_redis_enterprise.go @@ -32,8 +32,7 @@ func newAzureRedisEnterpriseFetcher(config azureFetcherConfig) (common.Fetcher, return newAzureFetcher[*azure.RedisEnterpriseDatabase, azure.RedisEnterpriseClient](config, &azureRedisEnterprisePlugin{}) } -type azureRedisEnterprisePlugin struct { -} +type azureRedisEnterprisePlugin struct{} func (p *azureRedisEnterprisePlugin) GetListClient(cfg *azureFetcherConfig, subID string) (azure.RedisEnterpriseClient, error) { client, err := cfg.AzureClients.GetAzureRedisEnterpriseClient(subID) diff --git a/lib/srv/discovery/fetchers/db/azure_sql.go b/lib/srv/discovery/fetchers/db/azure_sql.go index ab0ec99ee0a0c..79d8262a69d35 100644 --- a/lib/srv/discovery/fetchers/db/azure_sql.go +++ b/lib/srv/discovery/fetchers/db/azure_sql.go @@ -31,8 +31,7 @@ func newAzureSQLServerFetcher(config azureFetcherConfig) (common.Fetcher, error) } // azureSQLServerFetcher implements azureFetcherPlugin for Azure SQL Servers. 
-type azureSQLServerFetcher struct { -} +type azureSQLServerFetcher struct{} func (f *azureSQLServerFetcher) GetListClient(cfg *azureFetcherConfig, subID string) (azure.SQLServerClient, error) { client, err := cfg.AzureClients.GetAzureSQLServerClient(subID) diff --git a/lib/srv/discovery/fetchers/eks.go b/lib/srv/discovery/fetchers/eks.go index e5d66d77b2afa..c0c0c0c42bcc5 100644 --- a/lib/srv/discovery/fetchers/eks.go +++ b/lib/srv/discovery/fetchers/eks.go @@ -148,19 +148,6 @@ func (a *eksFetcher) String() string { a.Region, a.FilterLabels) } -// awsEKSTagsToLabels converts EKS tags to a labels map. -func (a *eksFetcher) awsEKSTagsToLabels(tags map[string]*string) map[string]string { - labels := make(map[string]string) - for key, val := range tags { - if types.IsValidLabelKey(key) { - labels[key] = aws.StringValue(val) - } else { - a.Log.Debugf("Skipping EKS tag %q, not a valid label key.", key) - } - } - return labels -} - // getMatchingKubeCluster extracts EKS cluster Tags and cluster status from EKS and checks if the cluster matches // the AWS matcher filtering labels. It also excludes EKS clusters that are not ready. 
// If any cluster does not match the filtering criteria, this function returns a `trace.CompareFailed` error @@ -183,15 +170,16 @@ func (a *eksFetcher) getMatchingKubeCluster(ctx context.Context, clusterName str return nil, trace.CompareFailed("EKS cluster %q not enrolled due to its current status: %s", clusterName, st) } - if match, reason, err := services.MatchLabels(a.FilterLabels, a.awsEKSTagsToLabels(rsp.Cluster.Tags)); err != nil { + cluster, err := services.NewKubeClusterFromAWSEKS(rsp.Cluster) + if err != nil { + return nil, trace.WrapWithMessage(err, "Unable to convert eks.Cluster cluster into types.KubernetesClusterV3.") + } + + if match, reason, err := services.MatchLabels(a.FilterLabels, cluster.GetAllLabels()); err != nil { return nil, trace.WrapWithMessage(err, "Unable to match EKS cluster labels against match labels.") } else if !match { return nil, trace.CompareFailed("EKS cluster %q labels does not match the selector: %s", clusterName, reason) } - cluster, err := services.NewKubeClusterFromAWSEKS(rsp.Cluster) - if err != nil { - return nil, trace.WrapWithMessage(err, "Unable to convert eks.Cluster cluster into types.KubernetesClusterV3.") - } return cluster, nil } diff --git a/lib/srv/discovery/fetchers/gke.go b/lib/srv/discovery/fetchers/gke.go index 937909efa96a4..acccc17835347 100644 --- a/lib/srv/discovery/fetchers/gke.go +++ b/lib/srv/discovery/fetchers/gke.go @@ -121,27 +121,17 @@ func (a *gkeFetcher) String() string { a.ProjectID, a.Location, a.FilterLabels) } -// gcpLabelsToTeleportLabels converts GKE labels to a labels map. -func (a *gkeFetcher) gcpLabelsToTeleportLabels(tags map[string]string) map[string]string { - labels := make(map[string]string) - for key, val := range tags { - if types.IsValidLabelKey(key) { - labels[key] = val - } else { - a.Log.Debugf("Skipping GKE tag %q, not a valid label key.", key) - } - } - return labels -} - // getMatchingKubeCluster checks if the GKE cluster tags matches the GCP matcher // filtering labels. 
It also excludes GKE clusters that are not Running/Degraded/Reconciling. // If any cluster does not match the filtering criteria, this function returns // a `trace.CompareFailed` error to distinguish filtering and operational errors. func (a *gkeFetcher) getMatchingKubeCluster(gkeCluster gcp.GKECluster) (types.KubeCluster, error) { - gkeCluster.Labels = a.gcpLabelsToTeleportLabels(gkeCluster.Labels) + cluster, err := services.NewKubeClusterFromGCPGKE(gkeCluster) + if err != nil { + return nil, trace.WrapWithMessage(err, "Unable to create types.KubernetesClusterV3 cluster from gcp.GKECluster.") + } - if match, reason, err := services.MatchLabels(a.FilterLabels, gkeCluster.Labels); err != nil { + if match, reason, err := services.MatchLabels(a.FilterLabels, cluster.GetAllLabels()); err != nil { return nil, trace.WrapWithMessage(err, "Unable to match GKE cluster labels against match labels.") } else if !match { return nil, trace.CompareFailed("GKE cluster %q labels does not match the selector: %s", gkeCluster.Name, reason) @@ -153,9 +143,5 @@ func (a *gkeFetcher) getMatchingKubeCluster(gkeCluster gcp.GKECluster) (types.Ku return nil, trace.CompareFailed("GKE cluster %q not enrolled due to its current status: %s", gkeCluster.Name, st) } - cluster, err := services.NewKubeClusterFromGCPGKE(gkeCluster) - if err != nil { - return nil, trace.WrapWithMessage(err, "Unable to create types.KubernetesClusterV3 cluster from gcp.GKECluster.") - } return cluster, nil } diff --git a/lib/srv/discovery/kube_watcher.go b/lib/srv/discovery/kube_watcher.go index dd762bf28cc0b..1a5482e86d587 100644 --- a/lib/srv/discovery/kube_watcher.go +++ b/lib/srv/discovery/kube_watcher.go @@ -46,16 +46,7 @@ func (s *Server) startKubeWatchers() error { return nil } - // filter only discover clusters. 
- var kubeClusters types.KubeClusters - for _, kc := range kcs { - if kc.Origin() != types.OriginCloud { - continue - } - kubeClusters = append(kubeClusters, kc) - } - - return kubeClusters.AsResources().ToMap() + return types.KubeClusters(filterResources(kcs, types.OriginCloud, s.DiscoveryGroup)).AsResources().ToMap() }, GetNewResources: func() types.ResourcesWithLabelsMap { mu.Lock() @@ -73,8 +64,9 @@ func (s *Server) startKubeWatchers() error { } watcher, err := common.NewWatcher(s.ctx, common.WatcherConfig{ - Fetchers: s.kubeFetchers, - Log: s.Log.WithField("kind", types.KindKubernetesCluster), + Fetchers: s.kubeFetchers, + Log: s.Log.WithField("kind", types.KindKubernetesCluster), + DiscoveryGroup: s.DiscoveryGroup, }) if err != nil { return trace.Wrap(err) @@ -107,7 +99,18 @@ func (s *Server) onKubeCreate(ctx context.Context, rwl types.ResourceWithLabels) return trace.BadParameter("invalid type received; expected types.KubeCluster, received %T", kubeCluster) } s.Log.Debugf("Creating kube_cluster %s.", kubeCluster.GetName()) - return trace.Wrap(s.AccessPoint.CreateKubernetesCluster(ctx, kubeCluster)) + err := s.AccessPoint.CreateKubernetesCluster(ctx, kubeCluster) + // If the resource already exists, it means that the resource was created + // by a previous discovery_service instance that didn't support the discovery + // group feature or the discovery group was changed. + // In this case, we need to update the resource with the + // discovery group label to ensure the user doesn't have to manually delete + // the resource. + // TODO(tigrato): DELETE on 14.0.0 + if trace.IsAlreadyExists(err) { + return trace.Wrap(s.onKubeUpdate(ctx, rwl)) + } + return trace.Wrap(err) } func (s *Server) onKubeUpdate(ctx context.Context, rwl types.ResourceWithLabels) error {