Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions api/types/constants.go
Original file line number Diff line number Diff line change
Expand Up @@ -628,6 +628,11 @@ const (

// AlertLicenseExpired is an internal label that indicates that the license has expired.
AlertLicenseExpired = TeleportInternalLabelPrefix + "license-expired-warning"

// TeleportInternalDiscoveryGroupName is the label used to store the name of the discovery group
// that the discovered resource is owned by. It is used to differentiate resources
// that belong to different discovery services that operate on different sets of resources.
TeleportInternalDiscoveryGroupName = TeleportInternalLabelPrefix + "discovery-group-name"
)

// RequestableResourceKinds lists all Teleport resource kinds users can request access to.
Expand Down
1 change: 1 addition & 0 deletions lib/config/configuration.go
Original file line number Diff line number Diff line change
Expand Up @@ -1215,6 +1215,7 @@ func getInstallerProxyAddr(matcher AzureMatcher, fc *FileConfig) string {

func applyDiscoveryConfig(fc *FileConfig, cfg *servicecfg.Config) error {
cfg.Discovery.Enabled = fc.Discovery.Enabled()
cfg.Discovery.DiscoveryGroup = fc.Discovery.DiscoveryGroup
for _, matcher := range fc.Discovery.AWSMatchers {
installParams, err := matcher.InstallParams.Parse()
if err != nil {
Expand Down
9 changes: 9 additions & 0 deletions lib/config/fileconf.go
Original file line number Diff line number Diff line change
Expand Up @@ -1516,6 +1516,15 @@ type Discovery struct {

// GCPMatchers are used to match GCP resources.
GCPMatchers []GCPMatcher `yaml:"gcp,omitempty"`

// DiscoveryGroup is the name of the discovery group that the current
// discovery service is a part of.
// It is used to filter out discovered resources that belong to another
// discovery services. When running in high availability mode and the agents
// have access to the same cloud resources, this field value must be the same
// for all discovery services. If different agents are used to discover different
// sets of cloud resources, this field must be different for each set of agents.
DiscoveryGroup string `yaml:"discovery_group,omitempty"`
}

// GCPMatcher matches GCP resources.
Expand Down
13 changes: 7 additions & 6 deletions lib/service/discovery.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,12 +56,13 @@ func (process *TeleportProcess) initDiscoveryService() error {
}

discoveryService, err := discovery.New(process.ExitContext(), &discovery.Config{
AWSMatchers: process.Config.Discovery.AWSMatchers,
AzureMatchers: process.Config.Discovery.AzureMatchers,
GCPMatchers: process.Config.Discovery.GCPMatchers,
Emitter: asyncEmitter,
AccessPoint: accessPoint,
Log: process.log,
AWSMatchers: process.Config.Discovery.AWSMatchers,
AzureMatchers: process.Config.Discovery.AzureMatchers,
GCPMatchers: process.Config.Discovery.GCPMatchers,
DiscoveryGroup: process.Config.Discovery.DiscoveryGroup,
Emitter: asyncEmitter,
AccessPoint: accessPoint,
Log: process.log,
})
if err != nil {
return trace.Wrap(err)
Expand Down
8 changes: 8 additions & 0 deletions lib/service/servicecfg/discovery.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,14 @@ type DiscoveryConfig struct {
AzureMatchers []services.AzureMatcher
// GCPMatchers are used to match GCP resources for auto discovery.
GCPMatchers []services.GCPMatcher
// DiscoveryGroup is the name of the discovery group that the current
// discovery service is a part of.
// It is used to filter out discovered resources that belong to another
// discovery services. When running in high availability mode and the agents
// have access to the same cloud resources, this field value must be the same
// for all discovery services. If different agents are used to discover different
// sets of cloud resources, this field must be different for each set of agents.
DiscoveryGroup string
}

// IsEmpty validates if the Discovery Service config has no cloud matchers.
Expand Down
16 changes: 13 additions & 3 deletions lib/srv/db/watcher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -207,6 +207,17 @@ func TestWatcherDynamicResource(t *testing.T) {
})
}

func setDiscoveryGroupLabel(r types.ResourceWithLabels, discoveryGroup string) {
staticLabels := r.GetStaticLabels()
if staticLabels == nil {
staticLabels = make(map[string]string)
}
if discoveryGroup != "" {
staticLabels[types.TeleportInternalDiscoveryGroupName] = discoveryGroup
}
r.SetStaticLabels(staticLabels)
}

// TestWatcherCloudFetchers tests usage of discovery database fetchers by the
// database service.
func TestWatcherCloudFetchers(t *testing.T) {
Expand All @@ -216,10 +227,10 @@ func TestWatcherCloudFetchers(t *testing.T) {
redshiftServerlessDatabase, err := services.NewDatabaseFromRedshiftServerlessWorkgroup(redshiftServerlessWorkgroup, nil)
require.NoError(t, err)
redshiftServerlessDatabase.SetStatusAWS(redshiftServerlessDatabase.GetAWS())

setDiscoveryGroupLabel(redshiftServerlessDatabase, "")
// Test an Azure fetcher.
azSQLServer, azSQLServerDatabase := makeAzureSQLServer(t, "discovery-azure", "group")

setDiscoveryGroupLabel(azSQLServerDatabase, "")
ctx := context.Background()
testCtx := setupTestContext(ctx, t)

Expand Down Expand Up @@ -270,7 +281,6 @@ func assertReconciledResource(t *testing.T, ch chan types.Databases, databases t
case <-time.After(time.Second):
t.Fatal("Didn't receive reconcile event after 1s.")
}

}

func makeStaticDatabase(name string, labels map[string]string, opts ...makeDatabaseOpt) (*types.DatabaseV3, error) {
Expand Down
20 changes: 20 additions & 0 deletions lib/srv/discovery/common/watcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,14 @@ type WatcherConfig struct {
Log logrus.FieldLogger
// Clock is used to control time.
Clock clockwork.Clock
// DiscoveryGroup is the name of the discovery group that the current
// discovery service is a part of.
// It is used to filter out discovered resources that belong to another
// discovery services. When running in high availability mode and the agents
// have access to the same cloud resources, this field value must be the same
// for all discovery services. If different agents are used to discover different
// sets of cloud resources, this field must be different for each set of agents.
DiscoveryGroup string
}

// CheckAndSetDefaults validates the config.
Expand Down Expand Up @@ -126,6 +134,18 @@ func (w *Watcher) fetchAndSend() {
// never return the error otherwise it will impact other watchers.
return nil
}
if w.cfg.DiscoveryGroup != "" {
// Add the discovery group name to the static labels of each resource.
for _, r := range resources {
staticLabels := r.GetStaticLabels()
if staticLabels == nil {
staticLabels = make(map[string]string)
}
staticLabels[types.TeleportInternalDiscoveryGroupName] = w.cfg.DiscoveryGroup
r.SetStaticLabels(staticLabels)
}
}

fetchersLock.Lock()
newFetcherResources = append(newFetcherResources, resources...)
fetchersLock.Unlock()
Expand Down
2 changes: 2 additions & 0 deletions lib/srv/discovery/common/watcher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -98,9 +98,11 @@ func (m *mockFetcher) Get(ctx context.Context) (types.ResourcesWithLabels, error
}
return m.resources, nil
}

func (m *mockFetcher) ResourceType() string {
return m.resourceType
}

func (m *mockFetcher) Cloud() string {
return m.cloud
}
29 changes: 22 additions & 7 deletions lib/srv/discovery/database_watcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,8 +57,9 @@ func (s *Server) startDatabaseWatchers() error {
}

watcher, err := common.NewWatcher(s.ctx, common.WatcherConfig{
Fetchers: s.databaseFetchers,
Log: s.Log.WithField("kind", types.KindDatabase),
Fetchers: s.databaseFetchers,
Log: s.Log.WithField("kind", types.KindDatabase),
DiscoveryGroup: s.DiscoveryGroup,
})
if err != nil {
return trace.Wrap(err)
Expand Down Expand Up @@ -94,7 +95,7 @@ func (s *Server) getCurrentDatabases() types.ResourcesWithLabelsMap {
return nil
}

return types.Databases(filterResourcesByOrigin(databases, types.OriginCloud)).AsResources().ToMap()
return types.Databases(filterResources(databases, types.OriginCloud, s.DiscoveryGroup)).AsResources().ToMap()
}

func (s *Server) onDatabaseCreate(ctx context.Context, resource types.ResourceWithLabels) error {
Expand All @@ -103,7 +104,18 @@ func (s *Server) onDatabaseCreate(ctx context.Context, resource types.ResourceWi
return trace.BadParameter("invalid type received; expected types.Database, received %T", database)
}
s.Log.Debugf("Creating database %s.", database.GetName())
return trace.Wrap(s.AccessPoint.CreateDatabase(ctx, database))
err := s.AccessPoint.CreateDatabase(ctx, database)
// If the resource already exists, it means that the resource was created
// by a previous discovery_service instance that didn't support the discovery
// group feature or the discovery group was changed.
// In this case, we need to update the resource with the
// discovery group label to ensure the user doesn't have to manually delete
// the resource.
// TODO(tigrato): DELETE on 14.0.0
if trace.IsAlreadyExists(err) {
return trace.Wrap(s.onDatabaseUpdate(ctx, resource))
}
return trace.Wrap(err)
}

func (s *Server) onDatabaseUpdate(ctx context.Context, resource types.ResourceWithLabels) error {
Expand All @@ -124,11 +136,14 @@ func (s *Server) onDatabaseDelete(ctx context.Context, resource types.ResourceWi
return trace.Wrap(s.AccessPoint.DeleteDatabase(ctx, database.GetName()))
}

func filterResourcesByOrigin[T types.ResourceWithOrigin, S ~[]T](all S, wantOrigin string) (filtered S) {
func filterResources[T types.ResourceWithLabels, S ~[]T](all S, wantOrigin, wantResourceGroup string) (filtered S) {
for _, resource := range all {
if resource.Origin() == wantOrigin {
filtered = append(filtered, resource)
resourceDiscoveryGroup, _ := resource.GetLabel(types.TeleportInternalDiscoveryGroupName)
if resource.Origin() != wantOrigin || resourceDiscoveryGroup != wantResourceGroup {
continue
}
filtered = append(filtered, resource)

}
return
}
9 changes: 8 additions & 1 deletion lib/srv/discovery/discovery.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,14 @@ type Config struct {
Log logrus.FieldLogger
// onDatabaseReconcile is called after each database resource reconciliation.
onDatabaseReconcile func()
// DiscoveryGroup is the name of the discovery group that the current
// discovery service is a part of.
// It is used to filter out discovered resources that belong to another
// discovery services. When running in high availability mode and the agents
// have access to the same cloud resources, this field value must be the same
// for all discovery services. If different agents are used to discover different
// sets of cloud resources, this field must be different for each set of agents.
DiscoveryGroup string
}

func (c *Config) CheckAndSetDefaults() error {
Expand Down Expand Up @@ -407,7 +415,6 @@ func (s *Server) handleEC2Discovery() {
} else {
s.Log.WithError(err).Error("Failed to enroll discovered EC2 instances.")
}

}
case <-s.ctx.Done():
s.ec2Watcher.Stop()
Expand Down
Loading