diff --git a/lib/srv/discovery/database_watcher.go b/lib/srv/discovery/database_watcher.go index 9ca1f3b5afa6a..d7143e5a87130 100644 --- a/lib/srv/discovery/database_watcher.go +++ b/lib/srv/discovery/database_watcher.go @@ -97,11 +97,11 @@ func (s *Server) startDatabaseWatchers() error { func (s *Server) getAllDatabaseFetchers() []common.Fetcher { allFetchers := make([]common.Fetcher, 0, len(s.databaseFetchers)) - s.muDynamicFetchers.RLock() + s.muDynamicDatabaseFetchers.RLock() for _, fetcherSet := range s.dynamicDatabaseFetchers { allFetchers = append(allFetchers, fetcherSet...) } - s.muDynamicFetchers.RUnlock() + s.muDynamicDatabaseFetchers.RUnlock() return append(allFetchers, s.databaseFetchers...) } diff --git a/lib/srv/discovery/discovery.go b/lib/srv/discovery/discovery.go index 54e168f416c3e..473f96216c0e9 100644 --- a/lib/srv/discovery/discovery.go +++ b/lib/srv/discovery/discovery.go @@ -217,8 +217,26 @@ type Server struct { // dynamicDatabaseFetchers holds the current Database Fetchers for the Dynamic Matchers (those coming from DiscoveryConfig resource). // The key is the DiscoveryConfig name. - dynamicDatabaseFetchers map[string][]common.Fetcher - muDynamicFetchers sync.RWMutex + dynamicDatabaseFetchers map[string][]common.Fetcher + muDynamicDatabaseFetchers sync.RWMutex + + // dynamicServerAWSFetchers holds the current AWS EC2 Fetchers for the Dynamic Matchers (those coming from DiscoveryConfig resource). + // The key is the DiscoveryConfig name. + dynamicServerAWSFetchers map[string][]server.Fetcher + muDynamicServerAWSFetchers sync.RWMutex + staticServerAWSFetchers []server.Fetcher + + // dynamicServerAzureFetchers holds the current Azure VM Fetchers for the Dynamic Matchers (those coming from DiscoveryConfig resource). + // The key is the DiscoveryConfig name. 
+ dynamicServerAzureFetchers map[string][]server.Fetcher + muDynamicServerAzureFetchers sync.RWMutex + staticServerAzureFetchers []server.Fetcher + + // dynamicServerGCPFetchers holds the current GCP VM Fetchers for the Dynamic Matchers (those coming from DiscoveryConfig resource). + // The key is the DiscoveryConfig name. + dynamicServerGCPFetchers map[string][]server.Fetcher + muDynamicServerGCPFetchers sync.RWMutex + staticServerGCPFetchers []server.Fetcher // caRotationCh receives nodes that need to have their CAs rotated. caRotationCh chan []types.Server @@ -240,11 +258,14 @@ func New(ctx context.Context, cfg *Config) (*Server, error) { localCtx, cancelfn := context.WithCancel(ctx) s := &Server{ - Config: cfg, - ctx: localCtx, - cancelfn: cancelfn, - usageEventCache: make(map[string]struct{}), - dynamicDatabaseFetchers: make(map[string][]common.Fetcher), + Config: cfg, + ctx: localCtx, + cancelfn: cancelfn, + usageEventCache: make(map[string]struct{}), + dynamicDatabaseFetchers: make(map[string][]common.Fetcher), + dynamicServerAWSFetchers: make(map[string][]server.Fetcher), + dynamicServerAzureFetchers: make(map[string][]server.Fetcher), + dynamicServerGCPFetchers: make(map[string][]server.Fetcher), } if err := s.startDynamicMatchersWatcher(ctx); err != nil { @@ -316,35 +337,39 @@ func (s *Server) startDynamicMatchersWatcher(ctx context.Context) error { // initAWSWatchers starts AWS resource watchers based on types provided. 
func (s *Server) initAWSWatchers(matchers []types.AWSMatcher) error { + var err error + ec2Matchers, otherMatchers := splitMatchers(matchers, func(matcherType string) bool { return matcherType == types.AWSMatcherEC2 }) - // start ec2 watchers - var err error - if len(ec2Matchers) > 0 { - s.caRotationCh = make(chan []types.Server) - s.ec2Watcher, err = server.NewEC2Watcher(s.ctx, ec2Matchers, s.CloudClients, s.caRotationCh, server.WithPollInterval(s.PollInterval)) - if err != nil { - return trace.Wrap(err) - } + s.staticServerAWSFetchers, err = server.MatchersToEC2InstanceFetchers(s.ctx, ec2Matchers, s.CloudClients) + if err != nil { + return trace.Wrap(err) + } - if s.ec2Installer == nil { - s.ec2Installer = server.NewSSMInstaller(server.SSMInstallerConfig{ - Emitter: s.Emitter, - }) - } + s.caRotationCh = make(chan []types.Server) + + s.ec2Watcher, err = server.NewEC2Watcher(s.ctx, s.getAllAWSServerFetchers, s.caRotationCh, server.WithPollInterval(s.PollInterval)) + if err != nil { + return trace.Wrap(err) + } - lr, err := newLabelReconciler(&labelReconcilerConfig{ - log: s.Log, - accessPoint: s.AccessPoint, + if s.ec2Installer == nil { + s.ec2Installer = server.NewSSMInstaller(server.SSMInstallerConfig{ + Emitter: s.Emitter, }) - if err != nil { - return trace.Wrap(err) - } - s.reconciler = lr } + lr, err := newLabelReconciler(&labelReconcilerConfig{ + log: s.Log, + accessPoint: s.AccessPoint, + }) + if err != nil { + return trace.Wrap(err) + } + s.reconciler = lr + // Database fetchers were added in databaseFetchersFromMatchers. _, otherMatchers = splitMatchers(otherMatchers, db.IsAWSMatcherType) @@ -421,6 +446,49 @@ func (s *Server) initKubeAppWatchers(matchers []types.KubernetesMatcher) error { return nil } +// awsServerFetchersFromMatchers converts Matchers into a set of AWS EC2 Fetchers. 
+func (s *Server) awsServerFetchersFromMatchers(ctx context.Context, matchers []types.AWSMatcher) ([]server.Fetcher, error) { + serverMatchers, _ := splitMatchers(matchers, func(matcherType string) bool { + return matcherType == types.AWSMatcherEC2 + }) + + fetchers, err := server.MatchersToEC2InstanceFetchers(ctx, serverMatchers, s.CloudClients) + if err != nil { + return nil, trace.Wrap(err) + } + + return fetchers, nil +} + +// azureServerFetchersFromMatchers converts Matchers into a set of Azure Servers Fetchers. +func (s *Server) azureServerFetchersFromMatchers(matchers []types.AzureMatcher) []server.Fetcher { + serverMatchers, _ := splitMatchers(matchers, func(matcherType string) bool { + return matcherType == types.AzureMatcherVM + }) + + return server.MatchersToAzureInstanceFetchers(serverMatchers, s.CloudClients) +} + +// gcpServerFetchersFromMatchers converts Matchers into a set of GCP Servers Fetchers. +func (s *Server) gcpServerFetchersFromMatchers(ctx context.Context, matchers []types.GCPMatcher) ([]server.Fetcher, error) { + serverMatchers, _ := splitMatchers(matchers, func(matcherType string) bool { + return matcherType == types.GCPMatcherCompute + }) + + if len(serverMatchers) == 0 { + // We have an early exit here because GetGCPInstancesClient returns an error + // when there are no credentials in the environment. + return nil, nil + } + + client, err := s.CloudClients.GetGCPInstancesClient(ctx) + if err != nil { + return nil, trace.Wrap(err) + } + + return server.MatchersToGCPInstanceFetchers(serverMatchers, client), nil +} + // databaseFetchersFromMatchers converts Matchers into a set of Database Fetchers. 
func (s *Server) databaseFetchersFromMatchers(matchers Matchers) ([]common.Fetcher, error) { var fetchers []common.Fetcher @@ -457,17 +525,17 @@ func (s *Server) initAzureWatchers(ctx context.Context, matchers []types.AzureMa return matcherType == types.AzureMatcherVM }) + s.staticServerAzureFetchers = server.MatchersToAzureInstanceFetchers(vmMatchers, s.CloudClients) + // VM watcher. - if len(vmMatchers) > 0 { - var err error - s.azureWatcher, err = server.NewAzureWatcher(s.ctx, vmMatchers, s.CloudClients, server.WithPollInterval(s.PollInterval)) - if err != nil { - return trace.Wrap(err) - } - if s.azureInstaller == nil { - s.azureInstaller = &server.AzureInstaller{ - Emitter: s.Emitter, - } + var err error + s.azureWatcher, err = server.NewAzureWatcher(s.ctx, s.getAllAzureServerFetchers, server.WithPollInterval(s.PollInterval)) + if err != nil { + return trace.Wrap(err) + } + if s.azureInstaller == nil { + s.azureInstaller = &server.AzureInstaller{ + Emitter: s.Emitter, } } @@ -506,30 +574,46 @@ func (s *Server) initAzureWatchers(ctx context.Context, matchers []types.AzureMa return nil } +func (s *Server) initGCPServerWatcher(ctx context.Context, vmMatchers []types.GCPMatcher) error { + var err error + + s.staticServerGCPFetchers, err = s.gcpServerFetchersFromMatchers(ctx, vmMatchers) + if err != nil { + return trace.Wrap(err) + } + + s.gcpWatcher, err = server.NewGCPWatcher(s.ctx, s.getAllGCPServerFetchers) + if err != nil { + return trace.Wrap(err) + } + + if s.gcpInstaller == nil { + s.gcpInstaller = &server.GCPInstaller{ + Emitter: s.Emitter, + } + } + + return nil +} + // initGCPWatchers starts GCP resource watchers based on types provided. 
func (s *Server) initGCPWatchers(ctx context.Context, matchers []types.GCPMatcher) error { // return early if there are no matchers as GetGCPGKEClient causes // an error if there are no credentials present - if len(matchers) == 0 { - return nil - } vmMatchers, otherMatchers := splitMatchers(matchers, func(matcherType string) bool { return matcherType == types.GCPMatcherCompute }) - // VM watcher. - if len(vmMatchers) > 0 { - var err error - s.gcpWatcher, err = server.NewGCPWatcher(s.ctx, vmMatchers, s.CloudClients) - if err != nil { - return trace.Wrap(err) - } - if s.gcpInstaller == nil { - s.gcpInstaller = &server.GCPInstaller{ - Emitter: s.Emitter, - } - } + if err := s.initGCPServerWatcher(ctx, vmMatchers); err != nil { + return trace.Wrap(err) + } + + // If there's no GCP Client creds in the environment + // and call to GetGCP...Client + // Early exit if there's no Kube Matchers, to prevent the error. + if len(otherMatchers) == 0 { + return nil } kubeClient, err := s.CloudClients.GetGCPGKEClient(ctx) @@ -963,6 +1047,42 @@ func (s *Server) emitUsageEvents(events map[string]*usageeventsv1.ResourceCreate return nil } +func (s *Server) getAllAWSServerFetchers() []server.Fetcher { + allFetchers := make([]server.Fetcher, 0, len(s.staticServerAWSFetchers)) + + s.muDynamicServerAWSFetchers.RLock() + for _, fetcherSet := range s.dynamicServerAWSFetchers { + allFetchers = append(allFetchers, fetcherSet...) + } + s.muDynamicServerAWSFetchers.RUnlock() + + return append(allFetchers, s.staticServerAWSFetchers...) +} + +func (s *Server) getAllAzureServerFetchers() []server.Fetcher { + allFetchers := make([]server.Fetcher, 0, len(s.staticServerAzureFetchers)) + + s.muDynamicServerAzureFetchers.RLock() + for _, fetcherSet := range s.dynamicServerAzureFetchers { + allFetchers = append(allFetchers, fetcherSet...) + } + s.muDynamicServerAzureFetchers.RUnlock() + + return append(allFetchers, s.staticServerAzureFetchers...) 
+} + +func (s *Server) getAllGCPServerFetchers() []server.Fetcher { + allFetchers := make([]server.Fetcher, 0, len(s.staticServerGCPFetchers)) + + s.muDynamicServerGCPFetchers.RLock() + for _, fetcherSet := range s.dynamicServerGCPFetchers { + allFetchers = append(allFetchers, fetcherSet...) + } + s.muDynamicServerGCPFetchers.RUnlock() + + return append(allFetchers, s.staticServerGCPFetchers...) +} + // Start starts the discovery service. func (s *Server) Start() error { if s.ec2Watcher != nil { @@ -1032,9 +1152,7 @@ func (s *Server) startDynamicWatcherUpdater() { // Let's assume there's a DiscoveryConfig DC1 has DiscoveryGroup DG1, which this process is monitoring. // If the user updates the DiscoveryGroup to DG2, then DC1 must be removed from the scope of this process. // We blindly delete it, in the worst case, this is a no-op. - s.muDynamicFetchers.Lock() - delete(s.dynamicDatabaseFetchers, event.Resource.GetName()) - s.muDynamicFetchers.Unlock() + s.deleteDynamicFetchers(event.Resource.GetName()) continue } @@ -1044,9 +1162,7 @@ func (s *Server) startDynamicWatcherUpdater() { } case types.OpDelete: - s.muDynamicFetchers.Lock() - delete(s.dynamicDatabaseFetchers, event.Resource.GetName()) - s.muDynamicFetchers.Unlock() + s.deleteDynamicFetchers(event.Resource.GetName()) default: s.Log.Warnf("Skipping unknown event type %s", event.Type) @@ -1058,6 +1174,24 @@ func (s *Server) startDynamicWatcherUpdater() { } } +func (s *Server) deleteDynamicFetchers(name string) { + s.muDynamicDatabaseFetchers.Lock() + delete(s.dynamicDatabaseFetchers, name) + s.muDynamicDatabaseFetchers.Unlock() + + s.muDynamicServerAWSFetchers.Lock() + delete(s.dynamicServerAWSFetchers, name) + s.muDynamicServerAWSFetchers.Unlock() + + s.muDynamicServerAzureFetchers.Lock() + delete(s.dynamicServerAzureFetchers, name) + s.muDynamicServerAzureFetchers.Unlock() + + s.muDynamicServerGCPFetchers.Lock() + delete(s.dynamicServerGCPFetchers, name) + s.muDynamicServerGCPFetchers.Unlock() +} + // 
upsertDynamicMatchers upserts the internal set of dynamic matchers given a particular discovery config. func (s *Server) upsertDynamicMatchers(dc *discoveryconfig.DiscoveryConfig) error { matchers := Matchers{ @@ -1067,16 +1201,37 @@ func (s *Server) upsertDynamicMatchers(dc *discoveryconfig.DiscoveryConfig) erro Kubernetes: dc.Spec.Kube, } - databaseFetchers, err := s.databaseFetchersFromMatchers(matchers) + awsServerFetchers, err := s.awsServerFetchersFromMatchers(s.ctx, matchers.AWS) if err != nil { return trace.Wrap(err) } + s.muDynamicServerAWSFetchers.Lock() + s.dynamicServerAWSFetchers[dc.GetName()] = awsServerFetchers + s.muDynamicServerAWSFetchers.Unlock() - // TODO(marco): add other matcher types (VMs, Kubes, KubeApps) + azureServerFetchers := s.azureServerFetchersFromMatchers(matchers.Azure) + s.muDynamicServerAzureFetchers.Lock() + s.dynamicServerAzureFetchers[dc.GetName()] = azureServerFetchers + s.muDynamicServerAzureFetchers.Unlock() - s.muDynamicFetchers.Lock() + gcpServerFetchers, err := s.gcpServerFetchersFromMatchers(s.ctx, matchers.GCP) + if err != nil { + return trace.Wrap(err) + } + s.muDynamicServerGCPFetchers.Lock() + s.dynamicServerGCPFetchers[dc.GetName()] = gcpServerFetchers + s.muDynamicServerGCPFetchers.Unlock() + + databaseFetchers, err := s.databaseFetchersFromMatchers(matchers) + if err != nil { + return trace.Wrap(err) + } + + s.muDynamicDatabaseFetchers.Lock() s.dynamicDatabaseFetchers[dc.GetName()] = databaseFetchers - s.muDynamicFetchers.Unlock() + s.muDynamicDatabaseFetchers.Unlock() + + // TODO(marco): add other fetchers: Kube Clusters and Kube Resources (Apps) return nil } diff --git a/lib/srv/discovery/discovery_test.go b/lib/srv/discovery/discovery_test.go index 9ae1b76135171..d57f639a79438 100644 --- a/lib/srv/discovery/discovery_test.go +++ b/lib/srv/discovery/discovery_test.go @@ -188,6 +188,32 @@ func (m *mockSSMInstaller) GetInstalledInstances() []string { func TestDiscoveryServer(t *testing.T) { t.Parallel() + + 
defaultDiscoveryGroup := "dc001" + defaultStaticMatcher := Matchers{ + AWS: []types.AWSMatcher{{ + Types: []string{"ec2"}, + Regions: []string{"eu-central-1"}, + Tags: map[string]utils.Strings{"teleport": {"yes"}}, + SSM: &types.AWSSSM{DocumentName: "document"}, + Params: &types.InstallerParams{ + InstallTeleport: true, + }, + }}, + } + + defaultDiscoveryConfig, err := discoveryconfig.NewDiscoveryConfig( + header.Metadata{Name: uuid.NewString()}, + discoveryconfig.Spec{ + DiscoveryGroup: defaultDiscoveryGroup, + AWS: defaultStaticMatcher.AWS, + Azure: defaultStaticMatcher.Azure, + GCP: defaultStaticMatcher.GCP, + Kube: defaultStaticMatcher.Kubernetes, + }, + ) + require.NoError(t, err) + tcs := []struct { name string // presentInstances is a list of servers already present in teleport @@ -195,6 +221,8 @@ func TestDiscoveryServer(t *testing.T) { foundEC2Instances []*ec2.Instance ssm *mockSSMClient emitter *mockEmitter + discoveryConfig *discoveryconfig.DiscoveryConfig + staticMatchers Matchers wantInstalledInstances []string }{ { @@ -240,6 +268,7 @@ func TestDiscoveryServer(t *testing.T) { }) }, }, + staticMatchers: defaultStaticMatcher, wantInstalledInstances: []string{"instance-id-1"}, }, { @@ -280,7 +309,8 @@ func TestDiscoveryServer(t *testing.T) { ResponseCode: aws.Int64(0), }, }, - emitter: &mockEmitter{}, + staticMatchers: defaultStaticMatcher, + emitter: &mockEmitter{}, }, { name: "nodes present, instance not filtered", @@ -321,6 +351,7 @@ func TestDiscoveryServer(t *testing.T) { }, }, emitter: &mockEmitter{}, + staticMatchers: defaultStaticMatcher, wantInstalledInstances: []string{"instance-id-1"}, }, { @@ -339,8 +370,56 @@ func TestDiscoveryServer(t *testing.T) { }, }, emitter: &mockEmitter{}, + staticMatchers: defaultStaticMatcher, wantInstalledInstances: genEC2InstanceIDs(58), }, + { + name: "no nodes present, 1 found using dynamic matchers", + presentInstances: []types.Server{}, + foundEC2Instances: []*ec2.Instance{ + { + InstanceId: 
aws.String("instance-id-1"), + Tags: []*ec2.Tag{{ + Key: aws.String("env"), + Value: aws.String("dev"), + }}, + State: &ec2.InstanceState{ + Name: aws.String(ec2.InstanceStateNameRunning), + }, + }, + }, + ssm: &mockSSMClient{ + commandOutput: &ssm.SendCommandOutput{ + Command: &ssm.Command{ + CommandId: aws.String("command-id-1"), + }, + }, + invokeOutput: &ssm.GetCommandInvocationOutput{ + Status: aws.String(ssm.CommandStatusSuccess), + ResponseCode: aws.Int64(0), + }, + }, + emitter: &mockEmitter{ + eventHandler: func(t *testing.T, ae events.AuditEvent, server *Server) { + t.Helper() + require.Equal(t, ae, &events.SSMRun{ + Metadata: events.Metadata{ + Type: libevents.SSMRunEvent, + Code: libevents.SSMRunSuccessCode, + }, + CommandID: "command-id-1", + AccountID: "owner", + InstanceID: "instance-id-1", + Region: "eu-central-1", + ExitCode: 0, + Status: ssm.CommandStatusSuccess, + }) + }, + }, + staticMatchers: Matchers{}, + discoveryConfig: defaultDiscoveryConfig, + wantInstalledInstances: []string{"instance-id-1"}, + }, } for _, tc := range tcs { @@ -393,30 +472,27 @@ func TestDiscoveryServer(t *testing.T) { CloudClients: testCloudClients, KubernetesClient: fake.NewSimpleClientset(), AccessPoint: tlsServer.Auth(), - Matchers: Matchers{ - AWS: []types.AWSMatcher{{ - Types: []string{"ec2"}, - Regions: []string{"eu-central-1"}, - Tags: map[string]utils.Strings{"teleport": {"yes"}}, - SSM: &types.AWSSSM{DocumentName: "document"}, - Params: &types.InstallerParams{ - InstallTeleport: true, - }, - }}, - }, - Emitter: tc.emitter, - Log: logger, + Matchers: tc.staticMatchers, + Emitter: tc.emitter, + Log: logger, + DiscoveryGroup: defaultDiscoveryGroup, }) require.NoError(t, err) server.ec2Installer = installer tc.emitter.server = server tc.emitter.t = t - r, w := io.Pipe() - t.Cleanup(func() { - require.NoError(t, r.Close()) - require.NoError(t, w.Close()) - }) + if tc.discoveryConfig != nil { + _, err := 
tlsServer.Auth().DiscoveryConfigClient().CreateDiscoveryConfig(ctx, tc.discoveryConfig) + require.NoError(t, err) + + // Wait for the DiscoveryConfig to be added to the dynamic matchers + require.Eventually(t, func() bool { + server.muDynamicServerAWSFetchers.RLock() + defer server.muDynamicServerAWSFetchers.RUnlock() + return len(server.dynamicServerAWSFetchers) > 0 + }, 1*time.Second, 100*time.Millisecond) + } go server.Start() t.Cleanup(server.Stop) @@ -427,7 +503,7 @@ func TestDiscoveryServer(t *testing.T) { instances := installer.GetInstalledInstances() slices.Sort(instances) return slices.Equal(tc.wantInstalledInstances, instances) && len(tc.wantInstalledInstances) == reporter.EventsCount() - }, 500*time.Millisecond, 50*time.Millisecond) + }, 5000*time.Millisecond, 50*time.Millisecond) } else { require.Never(t, func() bool { return len(installer.GetInstalledInstances()) > 0 || reporter.EventsCount() > 0 @@ -1532,8 +1608,8 @@ func TestDiscoveryDatabase(t *testing.T) { // Wait for the DiscoveryConfig to be added to the dynamic matchers require.Eventually(t, func() bool { - srv.muDynamicFetchers.RLock() - defer srv.muDynamicFetchers.RUnlock() + srv.muDynamicDatabaseFetchers.RLock() + defer srv.muDynamicDatabaseFetchers.RUnlock() return len(srv.dynamicDatabaseFetchers) > 0 }, 1*time.Second, 100*time.Millisecond) } @@ -1659,8 +1735,8 @@ func TestDiscoveryDatabaseRemovingDiscoveryConfigs(t *testing.T) { _, err = tlsServer.Auth().DiscoveryConfigClient().CreateDiscoveryConfig(ctx, dc1) require.NoError(t, err) require.Eventually(t, func() bool { - srv.muDynamicFetchers.RLock() - defer srv.muDynamicFetchers.RUnlock() + srv.muDynamicDatabaseFetchers.RLock() + defer srv.muDynamicDatabaseFetchers.RUnlock() return len(srv.dynamicDatabaseFetchers) == 0 }, 1*time.Second, 100*time.Millisecond) @@ -1696,8 +1772,8 @@ func TestDiscoveryDatabaseRemovingDiscoveryConfigs(t *testing.T) { _, err = tlsServer.Auth().DiscoveryConfigClient().CreateDiscoveryConfig(ctx, dc1) 
require.NoError(t, err) require.Eventually(t, func() bool { - srv.muDynamicFetchers.RLock() - defer srv.muDynamicFetchers.RUnlock() + srv.muDynamicDatabaseFetchers.RLock() + defer srv.muDynamicDatabaseFetchers.RUnlock() return len(srv.dynamicDatabaseFetchers) > 0 }, 1*time.Second, 100*time.Millisecond) @@ -1723,8 +1799,8 @@ func TestDiscoveryDatabaseRemovingDiscoveryConfigs(t *testing.T) { err = tlsServer.Auth().DiscoveryConfigClient().DeleteDiscoveryConfig(ctx, dc1.GetName()) require.NoError(t, err) require.Eventually(t, func() bool { - srv.muDynamicFetchers.RLock() - defer srv.muDynamicFetchers.RUnlock() + srv.muDynamicDatabaseFetchers.RLock() + defer srv.muDynamicDatabaseFetchers.RUnlock() return len(srv.dynamicDatabaseFetchers) == 0 }, 1*time.Second, 100*time.Millisecond) @@ -1862,10 +1938,40 @@ func (m *mockAzureInstaller) GetInstalledInstances() []string { func TestAzureVMDiscovery(t *testing.T) { t.Parallel() + + defaultDiscoveryGroup := "dc001" + + vmMatcherFn := func() Matchers { + return Matchers{ + Azure: []types.AzureMatcher{{ + Types: []string{"vm"}, + Subscriptions: []string{"testsub"}, + ResourceGroups: []string{"testrg"}, + Regions: []string{"westcentralus"}, + ResourceTags: types.Labels{"teleport": {"yes"}}, + }}, + } + } + + vmMatcher := vmMatcherFn() + defaultDiscoveryConfig, err := discoveryconfig.NewDiscoveryConfig( + header.Metadata{Name: uuid.NewString()}, + discoveryconfig.Spec{ + DiscoveryGroup: defaultDiscoveryGroup, + AWS: vmMatcher.AWS, + Azure: vmMatcher.Azure, + GCP: vmMatcher.GCP, + Kube: vmMatcher.Kubernetes, + }, + ) + require.NoError(t, err) + tests := []struct { name string presentVMs []types.Server foundAzureVMs []*armcompute.VirtualMachine + discoveryConfig *discoveryconfig.DiscoveryConfig + staticMatchers Matchers wantInstalledInstances []string }{ { @@ -1888,6 +1994,7 @@ func TestAzureVMDiscovery(t *testing.T) { }, }, }, + staticMatchers: vmMatcherFn(), wantInstalledInstances: []string{"testvm"}, }, { @@ -1905,6 +2012,7 @@ 
func TestAzureVMDiscovery(t *testing.T) { }, }, }, + staticMatchers: vmMatcherFn(), foundAzureVMs: []*armcompute.VirtualMachine{ { ID: aws.String((&arm.ResourceID{ @@ -1937,6 +2045,29 @@ func TestAzureVMDiscovery(t *testing.T) { }, }, }, + staticMatchers: vmMatcherFn(), + foundAzureVMs: []*armcompute.VirtualMachine{ + { + ID: aws.String((&arm.ResourceID{ + SubscriptionID: "testsub", + ResourceGroupName: "rg", + Name: "testvm", + }).String()), + Name: aws.String("testvm"), + Location: aws.String("westcentralus"), + Tags: map[string]*string{ + "teleport": aws.String("yes"), + }, + Properties: &armcompute.VirtualMachineProperties{ + VMID: aws.String("test-vmid"), + }, + }, + }, + wantInstalledInstances: []string{"testvm"}, + }, + { + name: "no nodes present, 1 found using dynamic matchers", + presentVMs: []types.Server{}, foundAzureVMs: []*armcompute.VirtualMachine{ { ID: aws.String((&arm.ResourceID{ @@ -1954,6 +2085,8 @@ func TestAzureVMDiscovery(t *testing.T) { }, }, }, + discoveryConfig: defaultDiscoveryConfig, + staticMatchers: Matchers{}, wantInstalledInstances: []string{"testvm"}, }, } @@ -2001,17 +2134,10 @@ func TestAzureVMDiscovery(t *testing.T) { CloudClients: testCloudClients, KubernetesClient: fake.NewSimpleClientset(), AccessPoint: tlsServer.Auth(), - Matchers: Matchers{ - Azure: []types.AzureMatcher{{ - Types: []string{"vm"}, - Subscriptions: []string{"testsub"}, - ResourceGroups: []string{"testrg"}, - Regions: []string{"westcentralus"}, - ResourceTags: types.Labels{"teleport": {"yes"}}, - }}, - }, - Emitter: emitter, - Log: logger, + Matchers: tc.staticMatchers, + Emitter: emitter, + Log: logger, + DiscoveryGroup: defaultDiscoveryGroup, }) require.NoError(t, err) @@ -2019,11 +2145,17 @@ func TestAzureVMDiscovery(t *testing.T) { emitter.server = server emitter.t = t - r, w := io.Pipe() - t.Cleanup(func() { - require.NoError(t, r.Close()) - require.NoError(t, w.Close()) - }) + if tc.discoveryConfig != nil { + _, err := 
tlsServer.Auth().DiscoveryConfigClient().CreateDiscoveryConfig(ctx, tc.discoveryConfig) + require.NoError(t, err) + + // Wait for the DiscoveryConfig to be added to the dynamic matchers + require.Eventually(t, func() bool { + server.muDynamicServerAzureFetchers.RLock() + defer server.muDynamicServerAzureFetchers.RUnlock() + return len(server.dynamicServerAzureFetchers) > 0 + }, 1*time.Second, 100*time.Millisecond) + } go server.Start() t.Cleanup(server.Stop) @@ -2090,10 +2222,35 @@ func (m *mockGCPInstaller) GetInstalledInstances() []string { func TestGCPVMDiscovery(t *testing.T) { t.Parallel() + + defaultDiscoveryGroup := "dc001" + defaultStaticMatcher := Matchers{ + GCP: []types.GCPMatcher{{ + Types: []string{"gce"}, + ProjectIDs: []string{"myproject"}, + Locations: []string{"myzone"}, + Labels: types.Labels{"teleport": {"yes"}}, + }}, + } + + defaultDiscoveryConfig, err := discoveryconfig.NewDiscoveryConfig( + header.Metadata{Name: uuid.NewString()}, + discoveryconfig.Spec{ + DiscoveryGroup: defaultDiscoveryGroup, + AWS: defaultStaticMatcher.AWS, + Azure: defaultStaticMatcher.Azure, + GCP: defaultStaticMatcher.GCP, + Kube: defaultStaticMatcher.Kubernetes, + }, + ) + require.NoError(t, err) + tests := []struct { name string presentVMs []types.Server foundGCPVMs []*gcp.Instance + discoveryConfig *discoveryconfig.DiscoveryConfig + staticMatchers Matchers wantInstalledInstances []string }{ { @@ -2109,6 +2266,7 @@ func TestGCPVMDiscovery(t *testing.T) { }, }, }, + staticMatchers: defaultStaticMatcher, wantInstalledInstances: []string{"myinstance"}, }, { @@ -2127,6 +2285,7 @@ func TestGCPVMDiscovery(t *testing.T) { }, }, }, + staticMatchers: defaultStaticMatcher, foundGCPVMs: []*gcp.Instance{ { ProjectID: "myproject", @@ -2154,6 +2313,22 @@ func TestGCPVMDiscovery(t *testing.T) { }, }, }, + staticMatchers: defaultStaticMatcher, + foundGCPVMs: []*gcp.Instance{ + { + ProjectID: "myproject", + Zone: "myzone", + Name: "myinstance", + Labels: map[string]string{ + 
"teleport": "yes", + }, + }, + }, + wantInstalledInstances: []string{"myinstance"}, + }, + { + name: "no nodes present, 1 found usind dynamic matchers", + presentVMs: []types.Server{}, foundGCPVMs: []*gcp.Instance{ { ProjectID: "myproject", @@ -2164,6 +2339,8 @@ func TestGCPVMDiscovery(t *testing.T) { }, }, }, + staticMatchers: Matchers{}, + discoveryConfig: defaultDiscoveryConfig, wantInstalledInstances: []string{"myinstance"}, }, } @@ -2210,16 +2387,10 @@ func TestGCPVMDiscovery(t *testing.T) { CloudClients: testCloudClients, KubernetesClient: fake.NewSimpleClientset(), AccessPoint: tlsServer.Auth(), - Matchers: Matchers{ - GCP: []types.GCPMatcher{{ - Types: []string{"gce"}, - ProjectIDs: []string{"myproject"}, - Locations: []string{"myzone"}, - Labels: types.Labels{"teleport": {"yes"}}, - }}, - }, - Emitter: emitter, - Log: logger, + Matchers: tc.staticMatchers, + Emitter: emitter, + Log: logger, + DiscoveryGroup: defaultDiscoveryGroup, }) require.NoError(t, err) @@ -2227,11 +2398,17 @@ func TestGCPVMDiscovery(t *testing.T) { emitter.server = server emitter.t = t - r, w := io.Pipe() - t.Cleanup(func() { - require.NoError(t, r.Close()) - require.NoError(t, w.Close()) - }) + if tc.discoveryConfig != nil { + _, err := tlsServer.Auth().DiscoveryConfigClient().CreateDiscoveryConfig(ctx, tc.discoveryConfig) + require.NoError(t, err) + + // Wait for the DiscoveryConfig to be added to the dynamic matchers + require.Eventually(t, func() bool { + server.muDynamicServerGCPFetchers.RLock() + defer server.muDynamicServerGCPFetchers.RUnlock() + return len(server.dynamicServerGCPFetchers) > 0 + }, 1*time.Second, 100*time.Millisecond) + } go server.Start() t.Cleanup(server.Stop) diff --git a/lib/srv/server/azure_watcher.go b/lib/srv/server/azure_watcher.go index 38633e1225ad2..5ef4c01565903 100644 --- a/lib/srv/server/azure_watcher.go +++ b/lib/srv/server/azure_watcher.go @@ -28,7 +28,6 @@ import ( usageeventsv1 
"github.com/gravitational/teleport/api/gen/proto/go/usageevents/v1" "github.com/gravitational/teleport/api/types" "github.com/gravitational/teleport/api/types/installers" - "github.com/gravitational/teleport/lib/cloud" "github.com/gravitational/teleport/lib/cloud/azure" "github.com/gravitational/teleport/lib/services" ) @@ -79,10 +78,10 @@ type azureClientGetter interface { } // NewAzureWatcher creates a new Azure watcher instance. -func NewAzureWatcher(ctx context.Context, matchers []types.AzureMatcher, clients cloud.Clients, opts ...Option) (*Watcher, error) { +func NewAzureWatcher(ctx context.Context, fetchersFn func() []Fetcher, opts ...Option) (*Watcher, error) { cancelCtx, cancelFn := context.WithCancel(ctx) watcher := Watcher{ - fetchers: []Fetcher{}, + fetchersFn: fetchersFn, ctx: cancelCtx, cancel: cancelFn, pollInterval: time.Minute, @@ -91,6 +90,12 @@ func NewAzureWatcher(ctx context.Context, matchers []types.AzureMatcher, clients for _, opt := range opts { opt(&watcher) } + return &watcher, nil +} + +// MatchersToAzureInstanceFetchers converts a list of Azure VM Matchers into a list of Azure VM Fetchers. 
+func MatchersToAzureInstanceFetchers(matchers []types.AzureMatcher, clients azureClientGetter) []Fetcher { + ret := make([]Fetcher, 0) for _, matcher := range matchers { for _, subscription := range matcher.Subscriptions { for _, resourceGroup := range matcher.ResourceGroups { @@ -100,11 +105,11 @@ func NewAzureWatcher(ctx context.Context, matchers []types.AzureMatcher, clients ResourceGroup: resourceGroup, AzureClientGetter: clients, }) - watcher.fetchers = append(watcher.fetchers, fetcher) + ret = append(ret, fetcher) } } } - return &watcher, nil + return ret } type azureFetcherConfig struct { diff --git a/lib/srv/server/azure_watcher_test.go b/lib/srv/server/azure_watcher_test.go index 420e8d2830fa8..9edecaf8ebb84 100644 --- a/lib/srv/server/azure_watcher_test.go +++ b/lib/srv/server/azure_watcher_test.go @@ -136,7 +136,9 @@ func TestAzureWatcher(t *testing.T) { t.Run(tc.name, func(t *testing.T) { ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) t.Cleanup(cancel) - watcher, err := NewAzureWatcher(ctx, []types.AzureMatcher{tc.matcher}, &clients) + watcher, err := NewAzureWatcher(ctx, func() []Fetcher { + return MatchersToAzureInstanceFetchers([]types.AzureMatcher{tc.matcher}, &clients) + }) require.NoError(t, err) go watcher.Run() diff --git a/lib/srv/server/ec2_watcher.go b/lib/srv/server/ec2_watcher.go index e8f8c7c4061fa..4bfc7ce27b17f 100644 --- a/lib/srv/server/ec2_watcher.go +++ b/lib/srv/server/ec2_watcher.go @@ -144,10 +144,10 @@ func (instances *EC2Instances) MakeEvents() map[string]*usageeventsv1.ResourceCr } // NewEC2Watcher creates a new EC2 watcher instance. 
-func NewEC2Watcher(ctx context.Context, matchers []types.AWSMatcher, clients cloud.Clients, missedRotation <-chan []types.Server, opts ...Option) (*Watcher, error) { +func NewEC2Watcher(ctx context.Context, fetchersFn func() []Fetcher, missedRotation <-chan []types.Server, opts ...Option) (*Watcher, error) { cancelCtx, cancelFn := context.WithCancel(ctx) watcher := Watcher{ - fetchers: []Fetcher{}, + fetchersFn: fetchersFn, ctx: cancelCtx, cancel: cancelFn, pollInterval: time.Minute, @@ -157,6 +157,12 @@ func NewEC2Watcher(ctx context.Context, matchers []types.AWSMatcher, clients clo for _, opt := range opts { opt(&watcher) } + return &watcher, nil +} + +// MatchersToEC2InstanceFetchers converts a list of AWS EC2 Matchers into a list of AWS EC2 Fetchers. +func MatchersToEC2InstanceFetchers(ctx context.Context, matchers []types.AWSMatcher, clients cloud.Clients) ([]Fetcher, error) { + ret := []Fetcher{} for _, matcher := range matchers { for _, region := range matcher.Regions { // TODO(gavin): support assume_role_arn for ec2. 
@@ -175,10 +181,10 @@ func NewEC2Watcher(ctx context.Context, matchers []types.AWSMatcher, clients clo if err != nil { return nil, trace.Wrap(err) } - watcher.fetchers = append(watcher.fetchers, fetcher) + ret = append(ret, fetcher) } } - return &watcher, nil + return ret, nil } type ec2FetcherConfig struct { diff --git a/lib/srv/server/ec2_watcher_test.go b/lib/srv/server/ec2_watcher_test.go index 24ebd6b7168fb..0f4950318e6c9 100644 --- a/lib/srv/server/ec2_watcher_test.go +++ b/lib/srv/server/ec2_watcher_test.go @@ -216,7 +216,14 @@ func TestEC2Watcher(t *testing.T) { }}, } clients.ec2Client.output = &output - watcher, err := NewEC2Watcher(ctx, matchers, &clients, make(<-chan []types.Server)) + + fetchersFn := func() []Fetcher { + fetchers, err := MatchersToEC2InstanceFetchers(ctx, matchers, &clients) + require.NoError(t, err) + + return fetchers + } + watcher, err := NewEC2Watcher(ctx, fetchersFn, make(<-chan []types.Server)) require.NoError(t, err) go watcher.Run() diff --git a/lib/srv/server/gcp_watcher.go b/lib/srv/server/gcp_watcher.go index 149692cd5d3bd..d084bb0926f1a 100644 --- a/lib/srv/server/gcp_watcher.go +++ b/lib/srv/server/gcp_watcher.go @@ -27,7 +27,6 @@ import ( usageeventsv1 "github.com/gravitational/teleport/api/gen/proto/go/usageevents/v1" "github.com/gravitational/teleport/api/types" "github.com/gravitational/teleport/api/types/installers" - "github.com/gravitational/teleport/lib/cloud" "github.com/gravitational/teleport/lib/cloud/gcp" "github.com/gravitational/teleport/lib/services" ) @@ -70,26 +69,30 @@ func (instances *GCPInstances) MakeEvents() map[string]*usageeventsv1.ResourceCr } // NewGCPWatcher creates a new GCP watcher. 
-func NewGCPWatcher(ctx context.Context, matchers []types.GCPMatcher, clients cloud.Clients) (*Watcher, error) { +func NewGCPWatcher(ctx context.Context, fetchersFn func() []Fetcher) (*Watcher, error) { cancelCtx, cancelFn := context.WithCancel(ctx) watcher := Watcher{ - fetchers: []Fetcher{}, + fetchersFn: fetchersFn, ctx: cancelCtx, cancel: cancelFn, pollInterval: time.Minute, InstancesC: make(chan Instances), } - client, err := clients.GetGCPInstancesClient(ctx) - if err != nil { - return nil, trace.Wrap(err) - } + return &watcher, nil +} + +// MatchersToGCPInstanceFetchers converts a list of GCP GCE Matchers into a list of GCP GCE Fetchers. +func MatchersToGCPInstanceFetchers(matchers []types.GCPMatcher, gcpClient gcp.InstancesClient) []Fetcher { + fetchers := make([]Fetcher, 0, len(matchers)) + for _, matcher := range matchers { - watcher.fetchers = append(watcher.fetchers, newGCPInstanceFetcher(gcpFetcherConfig{ + fetchers = append(fetchers, newGCPInstanceFetcher(gcpFetcherConfig{ Matcher: matcher, - GCPClient: client, + GCPClient: gcpClient, })) } - return &watcher, nil + + return fetchers } type gcpFetcherConfig struct { diff --git a/lib/srv/server/watcher.go b/lib/srv/server/watcher.go index d10b2b8e9c934..c234cd52d0afa 100644 --- a/lib/srv/server/watcher.go +++ b/lib/srv/server/watcher.go @@ -48,7 +48,7 @@ type Watcher struct { InstancesC chan Instances missedRotation <-chan []types.Server - fetchers []Fetcher + fetchersFn func() []Fetcher pollInterval time.Duration ctx context.Context cancel context.CancelFunc @@ -72,24 +72,21 @@ func (w *Watcher) sendInstancesOrLogError(instancesColl []Instances, err error) // Run starts the watcher's main watch loop. 
func (w *Watcher) Run() { - if len(w.fetchers) == 0 { - return - } ticker := time.NewTicker(w.pollInterval) defer ticker.Stop() - for _, fetcher := range w.fetchers { + for _, fetcher := range w.fetchersFn() { w.sendInstancesOrLogError(fetcher.GetInstances(w.ctx, false)) } for { select { case insts := <-w.missedRotation: - for _, fetcher := range w.fetchers { + for _, fetcher := range w.fetchersFn() { w.sendInstancesOrLogError(fetcher.GetMatchingInstances(insts, true)) } case <-ticker.C: - for _, fetcher := range w.fetchers { + for _, fetcher := range w.fetchersFn() { w.sendInstancesOrLogError(fetcher.GetInstances(w.ctx, false)) } case <-w.ctx.Done():