Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 1 addition & 15 deletions integration/ec2_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -408,24 +408,10 @@ func TestEC2Labels(t *testing.T) {
kubes, err = authServer.GetKubernetesServers(ctx)
assert.NoError(t, err)

// dedupClusters is required because GetKubernetesServers returns duplicated servers
// because it lists the KindKubeServer and KindKubeService.
// We must remove this once legacy heartbeat is removed.
// DELETE IN 13.0.0 (tigrato)
var dedupClusters []types.KubeServer
dedup := map[string]struct{}{}
for _, kube := range kubes {
if _, ok := dedup[kube.GetName()]; ok {
continue
}
dedup[kube.GetName()] = struct{}{}
dedupClusters = append(dedupClusters, kube)
}

assert.Len(t, nodes, 1)
assert.Len(t, apps, 1)
assert.Len(t, databases, 1)
assert.Len(t, dedupClusters, 1)
assert.Len(t, kubes, 1)
}, 10*time.Second, time.Second)

tagName := fmt.Sprintf("%s/Name", labels.AWSLabelNamespace)
Expand Down
2 changes: 2 additions & 0 deletions lib/auth/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -704,6 +704,8 @@ type ReadDiscoveryAccessPoint interface {

// GetDatabases returns all database resources.
GetDatabases(ctx context.Context) ([]types.Database, error)
// GetDatabase returns a database resource with the given name if it exists.
GetDatabase(ctx context.Context, name string) (types.Database, error)
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this a customer-facing API? If so, does the next release need to bump the minor version instead of just patch?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No. This is just an interface extension. the underlying stuff already implements that API


// GetApps returns all application resources.
GetApps(context.Context) ([]types.Application, error)
Expand Down
13 changes: 5 additions & 8 deletions lib/srv/discovery/database_watcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -138,14 +138,11 @@ func (s *Server) getCurrentDatabases() map[string]types.Database {
func (s *Server) onDatabaseCreate(ctx context.Context, database types.Database) error {
s.Log.Debugf("Creating database %s.", database.GetName())
err := s.AccessPoint.CreateDatabase(ctx, database)
// If the resource already exists, it means that the resource was created
// by a previous discovery_service instance that didn't support the discovery
// group feature or the discovery group was changed.
// In this case, we need to update the resource with the
// discovery group label to ensure the user doesn't have to manually delete
// the resource.
// TODO(tigrato): DELETE on 15.0.0
if trace.IsAlreadyExists(err) {
// If the database already exists but has an empty discovery group, update it.
if trace.IsAlreadyExists(err) && s.updatesEmptyDiscoveryGroup(
func() (types.ResourceWithLabels, error) {
return s.AccessPoint.GetDatabase(ctx, database.GetName())
}) {
return trace.Wrap(s.onDatabaseUpdate(ctx, database, nil))
}
if err != nil {
Expand Down
18 changes: 18 additions & 0 deletions lib/srv/discovery/discovery.go
Original file line number Diff line number Diff line change
Expand Up @@ -195,6 +195,12 @@ kubernetes matchers are present.`)
}

c.Log = c.Log.WithField(trace.Component, teleport.ComponentDiscovery)

if c.DiscoveryGroup == "" {
c.Log.Warn("discovery_service.discovery_group is not set. This field is required for the discovery service to work properly.\n" +
"Please set discovery_service.discovery_group according to the documentation: https://goteleport.com/docs/reference/config/#discovery-service")
}

c.Matchers.Azure = services.SimplifyAzureMatchers(c.Matchers.Azure)
return nil
}
Expand Down Expand Up @@ -1466,3 +1472,15 @@ func splitMatchers[T types.Matcher](matchers []T, matcherTypeCheck func(string)
}
return
}

func (s *Server) updatesEmptyDiscoveryGroup(getter func() (types.ResourceWithLabels, error)) bool {
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this function appropriately named? I don't see anything that gets updated, but I may be misunderstanding what this code does.

Maybe isDiscoveryGroupEmpty instead?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this function checks if we are updating a resource with an empty discovery group. It requires going to the DB and pulling the resource version there because if it's empty we allow the update.

I tried to avoid isDiscoveryGroupEmpty because it gives the impression that it checks for the current version (discovery group not empty) while in reality, it checks the discovery group of the existing resource

if s.DiscoveryGroup == "" {
return false
}
old, err := getter()
if err != nil {
return false
}
oldDiscoveryGroup, _ := old.GetLabel(types.TeleportInternalDiscoveryGroupName)
return oldDiscoveryGroup == ""
}
58 changes: 42 additions & 16 deletions lib/srv/discovery/discovery_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -518,7 +518,6 @@ func TestDiscoveryServer(t *testing.T) {
instances := installer.GetInstalledInstances()
slices.Sort(instances)
return slices.Equal(tc.wantInstalledInstances, instances) && len(tc.wantInstalledInstances) == reporter.ResourceCreateEventCount()

}, 5000*time.Millisecond, 50*time.Millisecond)
} else {
require.Never(t, func() bool {
Expand Down Expand Up @@ -570,8 +569,10 @@ func TestDiscoveryKubeServices(t *testing.T) {
mockKubeServices := []*corev1.Service{
newMockKubeService("service1", "ns1", "", map[string]string{"test-label": "testval"}, nil,
[]corev1.ServicePort{{Port: 42, Name: "http", Protocol: corev1.ProtocolTCP}}),
newMockKubeService("service2", "ns2", "", map[string]string{"test-label": "testval",
"test-label2": "testval2"}, nil, []corev1.ServicePort{{Port: 42, Name: "custom", AppProtocol: &appProtocolHTTP, Protocol: corev1.ProtocolTCP}}),
newMockKubeService("service2", "ns2", "", map[string]string{
"test-label": "testval",
"test-label2": "testval2",
}, nil, []corev1.ServicePort{{Port: 42, Name: "custom", AppProtocol: &appProtocolHTTP, Protocol: corev1.ProtocolTCP}}),
}

app1 := mustConvertKubeServiceToApp(t, mainDiscoveryGroup, "http", mockKubeServices[0], mockKubeServices[0].Spec.Ports[0])
Expand Down Expand Up @@ -718,7 +719,6 @@ func TestDiscoveryKubeServices(t *testing.T) {
}
}
return true

}, 5*time.Second, 200*time.Millisecond)
})
}
Expand Down Expand Up @@ -2599,34 +2599,53 @@ func TestGCPVMDiscovery(t *testing.T) {
return len(installer.GetInstalledInstances()) > 0 || reporter.ResourceCreateEventCount() > 0
}, 500*time.Millisecond, 50*time.Millisecond)
}

})
}
}

// TestServer_onCreate tests the update of the discovery_group of a resource
// when it differs from the one in the database.
// TODO(tigrato): DELETE in 15.0.0
// when a resource already exists with the same name but an empty discovery_group.
func TestServer_onCreate(t *testing.T) {
_, awsRedshiftDB := makeRedshiftCluster(t, "aws-redshift", "us-east-1", "test")
accessPoint := &fakeAccessPoint{}
_, awsRedshiftDBEmptyDiscoveryGroup := makeRedshiftCluster(t, "aws-redshift", "us-east-1", "" /* empty discovery group */)
accessPoint := &fakeAccessPoint{
kube: mustConvertEKSToKubeCluster(t, eksMockClusters[0], "" /* empty discovery group */),
database: awsRedshiftDBEmptyDiscoveryGroup,
}
s := &Server{
Config: &Config{
AccessPoint: accessPoint,
Log: logrus.New(),
DiscoveryGroup: "test-cluster",
AccessPoint: accessPoint,
Log: logrus.New(),
},
}

t.Run("onCreate update kube", func(t *testing.T) {
err := s.onKubeCreate(context.Background(), mustConvertEKSToKubeCluster(t, eksMockClusters[0], "test-cluster"))
require.NoError(t, err)
require.True(t, accessPoint.updateKube)

// Reset the update flag.
accessPoint.updateKube = false
accessPoint.kube = mustConvertEKSToKubeCluster(t, eksMockClusters[0], "nonEmpty")
// Update the kube cluster with non-empty discovery group.
err = s.onKubeCreate(context.Background(), mustConvertEKSToKubeCluster(t, eksMockClusters[0], "test-cluster"))
require.Error(t, err)
require.False(t, accessPoint.updateKube)
})

t.Run("onCreate update database", func(t *testing.T) {
err := s.onDatabaseCreate(context.Background(), awsRedshiftDB)
require.NoError(t, err)
require.True(t, accessPoint.updateDatabase)

// Reset the update flag.
accessPoint.updateDatabase = false
accessPoint.database = awsRedshiftDB
// Update the db with non-empty discovery group.
err = s.onDatabaseCreate(context.Background(), awsRedshiftDB)
require.Error(t, err)
require.False(t, accessPoint.updateDatabase)
})
}

Expand Down Expand Up @@ -2689,9 +2708,10 @@ func TestEmitUsageEvents(t *testing.T) {

type fakeAccessPoint struct {
auth.DiscoveryAccessPoint
updateKube bool
updateDatabase bool

updateKube bool
updateDatabase bool
kube types.KubeCluster
database types.Database
upsertedServerInfos chan types.ServerInfo
}

Expand All @@ -2701,6 +2721,14 @@ func newFakeAccessPoint() *fakeAccessPoint {
}
}

func (f *fakeAccessPoint) GetKubernetesCluster(ctx context.Context, name string) (types.KubeCluster, error) {
return f.kube, nil
}

func (f *fakeAccessPoint) GetDatabase(ctx context.Context, name string) (types.Database, error) {
return f.database, nil
}

func (f *fakeAccessPoint) CreateDatabase(ctx context.Context, database types.Database) error {
return trace.AlreadyExists("already exists")
}
Expand Down Expand Up @@ -2729,11 +2757,9 @@ func (f *fakeAccessPoint) NewWatcher(ctx context.Context, watch types.Watch) (ty
return newFakeWatcher(), nil
}

type fakeWatcher struct {
}
type fakeWatcher struct{}

func newFakeWatcher() fakeWatcher {

return fakeWatcher{}
}

Expand Down
13 changes: 5 additions & 8 deletions lib/srv/discovery/kube_watcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -113,14 +113,11 @@ func (s *Server) startKubeWatchers() error {
func (s *Server) onKubeCreate(ctx context.Context, kubeCluster types.KubeCluster) error {
s.Log.Debugf("Creating kube_cluster %s.", kubeCluster.GetName())
err := s.AccessPoint.CreateKubernetesCluster(ctx, kubeCluster)
// If the resource already exists, it means that the resource was created
// by a previous discovery_service instance that didn't support the discovery
// group feature or the discovery group was changed.
// In this case, we need to update the resource with the
// discovery group label to ensure the user doesn't have to manually delete
// the resource.
// TODO(tigrato): DELETE on 15.0.0
if trace.IsAlreadyExists(err) {
// If the kube already exists but has an empty discovery group, update it.
if trace.IsAlreadyExists(err) && s.updatesEmptyDiscoveryGroup(
func() (types.ResourceWithLabels, error) {
return s.AccessPoint.GetKubernetesCluster(ctx, kubeCluster.GetName())
}) {
return trace.Wrap(s.onKubeUpdate(ctx, kubeCluster, nil))
}
if err != nil {
Expand Down