diff --git a/integration/ec2_test.go b/integration/ec2_test.go index 3987c827cc577..bafc938df761d 100644 --- a/integration/ec2_test.go +++ b/integration/ec2_test.go @@ -408,24 +408,10 @@ func TestEC2Labels(t *testing.T) { kubes, err = authServer.GetKubernetesServers(ctx) assert.NoError(t, err) - // dedupClusters is required because GetKubernetesServers returns duplicated servers - // because it lists the KindKubeServer and KindKubeService. - // We must remove this once legacy heartbeat is removed. - // DELETE IN 13.0.0 (tigrato) - var dedupClusters []types.KubeServer - dedup := map[string]struct{}{} - for _, kube := range kubes { - if _, ok := dedup[kube.GetName()]; ok { - continue - } - dedup[kube.GetName()] = struct{}{} - dedupClusters = append(dedupClusters, kube) - } - assert.Len(t, nodes, 1) assert.Len(t, apps, 1) assert.Len(t, databases, 1) - assert.Len(t, dedupClusters, 1) + assert.Len(t, kubes, 1) }, 10*time.Second, time.Second) tagName := fmt.Sprintf("%s/Name", labels.AWSLabelNamespace) diff --git a/lib/auth/api.go b/lib/auth/api.go index 8e26ec9cd9e33..de5153969f27e 100644 --- a/lib/auth/api.go +++ b/lib/auth/api.go @@ -704,6 +704,8 @@ type ReadDiscoveryAccessPoint interface { // GetDatabases returns all database resources. GetDatabases(ctx context.Context) ([]types.Database, error) + // GetDatabase returns a database resource with the given name if it exists. + GetDatabase(ctx context.Context, name string) (types.Database, error) // GetApps returns all application resources. GetApps(context.Context) ([]types.Application, error) diff --git a/lib/srv/discovery/database_watcher.go b/lib/srv/discovery/database_watcher.go index 333fbd130e435..c49b32f4eaeb5 100644 --- a/lib/srv/discovery/database_watcher.go +++ b/lib/srv/discovery/database_watcher.go @@ -138,14 +138,11 @@ func (s *Server) getCurrentDatabases() map[string]types.Database { func (s *Server) onDatabaseCreate(ctx context.Context, database types.Database) error { s.Log.Debugf("Creating database %s.", database.GetName()) err := s.AccessPoint.CreateDatabase(ctx, database) - // If the resource already exists, it means that the resource was created - // by a previous discovery_service instance that didn't support the discovery - // group feature or the discovery group was changed. - // In this case, we need to update the resource with the - // discovery group label to ensure the user doesn't have to manually delete - // the resource. - // TODO(tigrato): DELETE on 15.0.0 - if trace.IsAlreadyExists(err) { + // If the database already exists but has an empty discovery group, update it. + if trace.IsAlreadyExists(err) && s.updatesEmptyDiscoveryGroup( + func() (types.ResourceWithLabels, error) { + return s.AccessPoint.GetDatabase(ctx, database.GetName()) + }) { return trace.Wrap(s.onDatabaseUpdate(ctx, database, nil)) } if err != nil { diff --git a/lib/srv/discovery/discovery.go b/lib/srv/discovery/discovery.go index 2afe677157298..dfa7c6a449089 100644 --- a/lib/srv/discovery/discovery.go +++ b/lib/srv/discovery/discovery.go @@ -195,6 +195,12 @@ kubernetes matchers are present.`) } c.Log = c.Log.WithField(trace.Component, teleport.ComponentDiscovery) + + if c.DiscoveryGroup == "" { + c.Log.Warn("discovery_service.discovery_group is not set. This field is required for the discovery service to work properly.\n" + + "Please set discovery_service.discovery_group according to the documentation: https://goteleport.com/docs/reference/config/#discovery-service") + } + c.Matchers.Azure = services.SimplifyAzureMatchers(c.Matchers.Azure) return nil } @@ -1466,3 +1472,15 @@ func splitMatchers[T types.Matcher](matchers []T, matcherTypeCheck func(string) } return } + +func (s *Server) updatesEmptyDiscoveryGroup(getter func() (types.ResourceWithLabels, error)) bool { + if s.DiscoveryGroup == "" { + return false + } + old, err := getter() + if err != nil { + return false + } + oldDiscoveryGroup, _ := old.GetLabel(types.TeleportInternalDiscoveryGroupName) + return oldDiscoveryGroup == "" +} diff --git a/lib/srv/discovery/discovery_test.go b/lib/srv/discovery/discovery_test.go index 6c08f359208c7..cfbe5e12d2978 100644 --- a/lib/srv/discovery/discovery_test.go +++ b/lib/srv/discovery/discovery_test.go @@ -518,7 +518,6 @@ func TestDiscoveryServer(t *testing.T) { instances := installer.GetInstalledInstances() slices.Sort(instances) return slices.Equal(tc.wantInstalledInstances, instances) && len(tc.wantInstalledInstances) == reporter.ResourceCreateEventCount() - }, 5000*time.Millisecond, 50*time.Millisecond) } else { require.Never(t, func() bool { @@ -570,8 +569,10 @@ func TestDiscoveryKubeServices(t *testing.T) { mockKubeServices := []*corev1.Service{ newMockKubeService("service1", "ns1", "", map[string]string{"test-label": "testval"}, nil, []corev1.ServicePort{{Port: 42, Name: "http", Protocol: corev1.ProtocolTCP}}), - newMockKubeService("service2", "ns2", "", map[string]string{"test-label": "testval", - "test-label2": "testval2"}, nil, []corev1.ServicePort{{Port: 42, Name: "custom", AppProtocol: &appProtocolHTTP, Protocol: corev1.ProtocolTCP}}), + newMockKubeService("service2", "ns2", "", map[string]string{ + "test-label": "testval", + "test-label2": "testval2", + }, nil, []corev1.ServicePort{{Port: 42, Name: "custom", AppProtocol: &appProtocolHTTP, Protocol: corev1.ProtocolTCP}}), } app1 := mustConvertKubeServiceToApp(t, mainDiscoveryGroup, "http", mockKubeServices[0], mockKubeServices[0].Spec.Ports[0]) @@ -718,7 +719,6 @@ func TestDiscoveryKubeServices(t *testing.T) { } } return true - }, 5*time.Second, 200*time.Millisecond) }) } @@ -2599,21 +2599,24 @@ func TestGCPVMDiscovery(t *testing.T) { return len(installer.GetInstalledInstances()) > 0 || reporter.ResourceCreateEventCount() > 0 }, 500*time.Millisecond, 50*time.Millisecond) } - }) } } // TestServer_onCreate tests the update of the discovery_group of a resource -// when it differs from the one in the database. -// TODO(tigrato): DELETE in 15.0.0 +// when a resource already exists with the same name but an empty discovery_group. func TestServer_onCreate(t *testing.T) { _, awsRedshiftDB := makeRedshiftCluster(t, "aws-redshift", "us-east-1", "test") - accessPoint := &fakeAccessPoint{} + _, awsRedshiftDBEmptyDiscoveryGroup := makeRedshiftCluster(t, "aws-redshift", "us-east-1", "" /* empty discovery group */) + accessPoint := &fakeAccessPoint{ + kube: mustConvertEKSToKubeCluster(t, eksMockClusters[0], "" /* empty discovery group */), + database: awsRedshiftDBEmptyDiscoveryGroup, + } s := &Server{ Config: &Config{ - AccessPoint: accessPoint, - Log: logrus.New(), + DiscoveryGroup: "test-cluster", + AccessPoint: accessPoint, + Log: logrus.New(), }, } @@ -2621,12 +2624,28 @@ func TestServer_onCreate(t *testing.T) { err := s.onKubeCreate(context.Background(), mustConvertEKSToKubeCluster(t, eksMockClusters[0], "test-cluster")) require.NoError(t, err) require.True(t, accessPoint.updateKube) + + // Reset the update flag. + accessPoint.updateKube = false + accessPoint.kube = mustConvertEKSToKubeCluster(t, eksMockClusters[0], "nonEmpty") + // Update the kube cluster with non-empty discovery group. + err = s.onKubeCreate(context.Background(), mustConvertEKSToKubeCluster(t, eksMockClusters[0], "test-cluster")) + require.Error(t, err) + require.False(t, accessPoint.updateKube) }) t.Run("onCreate update database", func(t *testing.T) { err := s.onDatabaseCreate(context.Background(), awsRedshiftDB) require.NoError(t, err) require.True(t, accessPoint.updateDatabase) + + // Reset the update flag. + accessPoint.updateDatabase = false + accessPoint.database = awsRedshiftDB + // Update the db with non-empty discovery group. + err = s.onDatabaseCreate(context.Background(), awsRedshiftDB) + require.Error(t, err) + require.False(t, accessPoint.updateDatabase) }) } @@ -2689,9 +2708,10 @@ func TestEmitUsageEvents(t *testing.T) { type fakeAccessPoint struct { auth.DiscoveryAccessPoint - updateKube bool - updateDatabase bool - + updateKube bool + updateDatabase bool + kube types.KubeCluster + database types.Database upsertedServerInfos chan types.ServerInfo } @@ -2701,6 +2721,14 @@ func newFakeAccessPoint() *fakeAccessPoint { } } +func (f *fakeAccessPoint) GetKubernetesCluster(ctx context.Context, name string) (types.KubeCluster, error) { + return f.kube, nil +} + +func (f *fakeAccessPoint) GetDatabase(ctx context.Context, name string) (types.Database, error) { + return f.database, nil +} + func (f *fakeAccessPoint) CreateDatabase(ctx context.Context, database types.Database) error { return trace.AlreadyExists("already exists") } @@ -2729,11 +2757,9 @@ func (f *fakeAccessPoint) NewWatcher(ctx context.Context, watch types.Watch) (ty return newFakeWatcher(), nil } -type fakeWatcher struct { -} +type fakeWatcher struct{} func newFakeWatcher() fakeWatcher { - return fakeWatcher{} } diff --git a/lib/srv/discovery/kube_watcher.go b/lib/srv/discovery/kube_watcher.go index 7d9796ac9b9a5..e81547831a83a 100644 --- a/lib/srv/discovery/kube_watcher.go +++ b/lib/srv/discovery/kube_watcher.go @@ -113,14 +113,11 @@ func (s *Server) startKubeWatchers() error { func (s *Server) onKubeCreate(ctx context.Context, kubeCluster types.KubeCluster) error { s.Log.Debugf("Creating kube_cluster %s.", kubeCluster.GetName()) err := s.AccessPoint.CreateKubernetesCluster(ctx, kubeCluster) - // If the resource already exists, it means that the resource was created - // by a previous discovery_service instance that didn't support the discovery - // group feature or the discovery group was changed. - // In this case, we need to update the resource with the - // discovery group label to ensure the user doesn't have to manually delete - // the resource. - // TODO(tigrato): DELETE on 15.0.0 - if trace.IsAlreadyExists(err) { + // If the kube already exists but has an empty discovery group, update it. + if trace.IsAlreadyExists(err) && s.updatesEmptyDiscoveryGroup( + func() (types.ResourceWithLabels, error) { + return s.AccessPoint.GetKubernetesCluster(ctx, kubeCluster.GetName()) + }) { return trace.Wrap(s.onKubeUpdate(ctx, kubeCluster, nil)) } if err != nil {