diff --git a/lib/config/configuration.go b/lib/config/configuration.go index a0bc55d6e0bf3..d0775aa345fb7 100644 --- a/lib/config/configuration.go +++ b/lib/config/configuration.go @@ -1541,12 +1541,14 @@ func applyDiscoveryConfig(fc *FileConfig, cfg *servicecfg.Config) error { } serviceMatcher := types.AWSMatcher{ - Types: matcher.Types, - Regions: matcher.Regions, - AssumeRole: assumeRole, - Tags: matcher.Tags, - Params: installParams, - SSM: &types.AWSSSM{DocumentName: matcher.SSM.DocumentName}, + Types: matcher.Types, + Regions: matcher.Regions, + AssumeRole: assumeRole, + Tags: matcher.Tags, + Params: installParams, + SSM: &types.AWSSSM{DocumentName: matcher.SSM.DocumentName}, + Integration: matcher.Integration, + KubeAppDiscovery: matcher.KubeAppDiscovery, } if err := serviceMatcher.CheckAndSetDefaults(); err != nil { return trace.Wrap(err) @@ -1736,6 +1738,7 @@ func applyDatabasesConfig(fc *FileConfig, cfg *servicecfg.Config) error { RoleARN: matcher.AssumeRoleARN, ExternalID: matcher.ExternalID, }, + Integration: matcher.Integration, }) } for _, matcher := range fc.Databases.AzureMatchers { diff --git a/lib/config/configuration_test.go b/lib/config/configuration_test.go index 917dfe1607d0b..0ac621678d225 100644 --- a/lib/config/configuration_test.go +++ b/lib/config/configuration_test.go @@ -363,6 +363,15 @@ func TestConfigReading(t *testing.T) { AssumeRoleARN: "arn:aws:iam::123456789012:role/DBDiscoverer", ExternalID: "externalID123", }, + { + Types: []string{"eks"}, + Regions: []string{"us-west-1", "us-east-1"}, + Tags: map[string]apiutils.Strings{ + "a": {"b"}, + }, + Integration: "integration1", + KubeAppDiscovery: true, + }, }, AzureMatchers: []AzureMatcher{ { @@ -1489,6 +1498,13 @@ func makeConfigFixture() string { AssumeRoleARN: "arn:aws:iam::123456789012:role/DBDiscoverer", ExternalID: "externalID123", }, + { + Types: []string{"eks"}, + Regions: []string{"us-west-1", "us-east-1"}, + Tags: map[string]apiutils.Strings{"a": {"b"}}, + Integration: "integration1", + KubeAppDiscovery: true, + }, } conf.Discovery.AzureMatchers = []AzureMatcher{ diff --git a/lib/config/fileconf.go b/lib/config/fileconf.go index 75783ae945175..6ea47e20ec511 100644 --- a/lib/config/fileconf.go +++ b/lib/config/fileconf.go @@ -1685,6 +1685,12 @@ type AWSMatcher struct { // SSM provides options to use when sending a document command to // an EC2 node SSM AWSSSM `yaml:"ssm,omitempty"` + // Integration is the integration name used to generate credentials to interact with AWS APIs. + // Environment credentials will not be used when this value is set. + Integration string `yaml:"integration"` + // KubeAppDiscovery controls whether Kubernetes App Discovery will be enabled for agents running on + // discovered clusters, currently only affects AWS EKS discovery in integration mode. + KubeAppDiscovery bool `yaml:"kube_app_discovery"` } // InstallParams sets join method to use on discovered nodes diff --git a/lib/srv/discovery/discovery.go b/lib/srv/discovery/discovery.go index de2542887285d..415ee3d916745 100644 --- a/lib/srv/discovery/discovery.go +++ b/lib/srv/discovery/discovery.go @@ -278,7 +278,7 @@ type Server struct { // gcpInstaller is used to start the installation process on discovered GCP // virtual machines gcpInstaller gcpInstaller - // kubeFetchers holds all non-integration based kubernetes fetchers for Azure and other clouds. + // kubeFetchers holds all non-dynamic kubernetes fetchers for Azure and other clouds. kubeFetchers []common.Fetcher // kubeAppsFetchers holds all kubernetes fetchers for apps. kubeAppsFetchers []common.Fetcher @@ -312,11 +312,11 @@ type Server struct { muDynamicServerGCPFetchers sync.RWMutex staticServerGCPFetchers []server.Fetcher - // dynamicKubeIntegrationFetchers holds the current kube fetchers that use integration as a source of credentials, + // dynamicKubeFetchers holds the current kube fetchers that use integration as a source of credentials, // for the Dynamic Matchers (those coming from DiscoveryConfig resource). // The key is the DiscoveryConfig name. - dynamicKubeIntegrationFetchers map[string][]common.Fetcher - muDynamicKubeIntegrationFetchers sync.RWMutex + dynamicKubeFetchers map[string][]common.Fetcher + muDynamicKubeFetchers sync.RWMutex // dynamicTAGSyncFetchers holds the current TAG Fetchers for the Dynamic Matchers (those coming from DiscoveryConfig resource). // The key is the DiscoveryConfig name. @@ -344,16 +344,16 @@ func New(ctx context.Context, cfg *Config) (*Server, error) { localCtx, cancelfn := context.WithCancel(ctx) s := &Server{ - Config: cfg, - ctx: localCtx, - cancelfn: cancelfn, - usageEventCache: make(map[string]struct{}), - dynamicKubeIntegrationFetchers: make(map[string][]common.Fetcher), - dynamicDatabaseFetchers: make(map[string][]common.Fetcher), - dynamicServerAWSFetchers: make(map[string][]server.Fetcher), - dynamicServerAzureFetchers: make(map[string][]server.Fetcher), - dynamicServerGCPFetchers: make(map[string][]server.Fetcher), - dynamicTAGSyncFetchers: make(map[string][]aws_sync.AWSSync), + Config: cfg, + ctx: localCtx, + cancelfn: cancelfn, + usageEventCache: make(map[string]struct{}), + dynamicKubeFetchers: make(map[string][]common.Fetcher), + dynamicDatabaseFetchers: make(map[string][]common.Fetcher), + dynamicServerAWSFetchers: make(map[string][]server.Fetcher), + dynamicServerAzureFetchers: make(map[string][]server.Fetcher), + dynamicServerGCPFetchers: make(map[string][]server.Fetcher), + dynamicTAGSyncFetchers: make(map[string][]aws_sync.AWSSync), } s.discardUnsupportedMatchers(&s.Matchers) @@ -475,7 +475,7 @@ func (s *Server) initAWSWatchers(matchers []types.AWSMatcher) error { _, otherMatchers = splitMatchers(otherMatchers, db.IsAWSMatcherType) // Add non-integration kube fetchers. - kubeFetchers, _, err := fetchers.MakeEKSFetchersFromAWSMatchers(s.Log, s.CloudClients, otherMatchers) + kubeFetchers, err := fetchers.MakeEKSFetchersFromAWSMatchers(s.Log, s.CloudClients, otherMatchers) if err != nil { return trace.Wrap(err) } @@ -588,7 +588,7 @@ func (s *Server) databaseFetchersFromMatchers(matchers Matchers) ([]common.Fetch return fetchers, nil } -func (s *Server) kubeIntegrationFetchersFromMatchers(matchers Matchers) ([]common.Fetcher, error) { +func (s *Server) kubeFetchersFromMatchers(matchers Matchers) ([]common.Fetcher, error) { var result []common.Fetcher // AWS @@ -596,14 +596,14 @@ func (s *Server) kubeIntegrationFetchersFromMatchers(matchers Matchers) ([]commo return matcherType == types.AWSMatcherEKS }) if len(awsKubeMatchers) > 0 { - _, kubeIntegrationFetchers, err := fetchers.MakeEKSFetchersFromAWSMatchers(s.Log, s.CloudClients, awsKubeMatchers) + eksFetchers, err := fetchers.MakeEKSFetchersFromAWSMatchers(s.Log, s.CloudClients, awsKubeMatchers) if err != nil { return nil, trace.Wrap(err) } - result = append(result, kubeIntegrationFetchers...) + result = append(result, eksFetchers...) } - // There can't be kube integration fetchers for other matcher types. + // There can't be kube fetchers for other matcher types. return result, nil } @@ -1469,9 +1469,9 @@ func (s *Server) deleteDynamicFetchers(name string) { delete(s.dynamicServerGCPFetchers, name) s.muDynamicServerGCPFetchers.Unlock() - s.muDynamicKubeIntegrationFetchers.Lock() - delete(s.dynamicKubeIntegrationFetchers, name) - s.muDynamicKubeIntegrationFetchers.Unlock() + s.muDynamicKubeFetchers.Lock() + delete(s.dynamicKubeFetchers, name) + s.muDynamicKubeFetchers.Unlock() s.muDynamicTAGSyncFetchers.Lock() delete(s.dynamicTAGSyncFetchers, name) @@ -1519,15 +1519,6 @@ func (s *Server) upsertDynamicMatchers(ctx context.Context, dc *discoveryconfig. s.dynamicDatabaseFetchers[dc.GetName()] = databaseFetchers s.muDynamicDatabaseFetchers.Unlock() - kubeIntegrationFetchers, err := s.kubeIntegrationFetchersFromMatchers(matchers) - if err != nil { - return trace.Wrap(err) - } - - s.muDynamicKubeIntegrationFetchers.Lock() - s.dynamicKubeIntegrationFetchers[dc.GetName()] = kubeIntegrationFetchers - s.muDynamicKubeIntegrationFetchers.Unlock() - awsSyncMatchers, err := s.accessGraphFetchersFromMatchers( ctx, matchers, ) @@ -1538,6 +1529,15 @@ func (s *Server) upsertDynamicMatchers(ctx context.Context, dc *discoveryconfig. s.dynamicTAGSyncFetchers[dc.GetName()] = awsSyncMatchers s.muDynamicTAGSyncFetchers.Unlock() + kubeFetchers, err := s.kubeFetchersFromMatchers(matchers) + if err != nil { + return trace.Wrap(err) + } + + s.muDynamicKubeFetchers.Lock() + s.dynamicKubeFetchers[dc.GetName()] = kubeFetchers + s.muDynamicKubeFetchers.Unlock() + // TODO(marco): add other fetchers: Kube Clusters and Kube Resources (Apps) return nil } diff --git a/lib/srv/discovery/fetchers/eks.go b/lib/srv/discovery/fetchers/eks.go index ecacbb01faa36..a7a4e496c9fae 100644 --- a/lib/srv/discovery/fetchers/eks.go +++ b/lib/srv/discovery/fetchers/eks.go @@ -97,7 +97,7 @@ func (c *EKSFetcherConfig) CheckAndSetDefaults() error { // MakeEKSFetchersFromAWSMatchers creates fetchers from the provided matchers. Returned fetchers are separated // by their reliance on the integration. -func MakeEKSFetchersFromAWSMatchers(log logrus.FieldLogger, clients cloud.AWSClients, matchers []types.AWSMatcher) (kubeFetchers, kubeIntegrationFetchers []common.Fetcher, _ error) { +func MakeEKSFetchersFromAWSMatchers(log logrus.FieldLogger, clients cloud.AWSClients, matchers []types.AWSMatcher) (kubeFetchers []common.Fetcher, _ error) { for _, matcher := range matchers { var matcherAssumeRole types.AssumeRole if matcher.AssumeRole != nil { @@ -123,17 +123,12 @@ func MakeEKSFetchersFromAWSMatchers(log logrus.FieldLogger, clients cloud.AWSCli log.WithError(err).Warnf("Could not initialize EKS fetcher(Region=%q, Labels=%q, AssumeRole=%q), skipping.", region, matcher.Tags, matcherAssumeRole.RoleARN) continue } - - if matcher.Integration != "" { - kubeIntegrationFetchers = append(kubeIntegrationFetchers, fetcher) - } else { - kubeFetchers = append(kubeFetchers, fetcher) - } + kubeFetchers = append(kubeFetchers, fetcher) } } } } - return kubeFetchers, kubeIntegrationFetchers, nil + return kubeFetchers, nil } // NewEKSFetcher creates a new EKS fetcher configuration. @@ -170,6 +165,11 @@ func (a *eksFetcher) getClient(ctx context.Context) (eksiface.EKSAPI, error) { return a.client, nil } +// GetIntegration returns the integration name that is used for getting credentials of the fetcher. +func (a *eksFetcher) GetIntegration() string { + return a.Integration +} + type DiscoveredEKSCluster struct { types.KubeCluster diff --git a/lib/srv/discovery/kube_integration_watcher.go b/lib/srv/discovery/kube_integration_watcher.go index cee614f4a9f50..ec827617010bf 100644 --- a/lib/srv/discovery/kube_integration_watcher.go +++ b/lib/srv/discovery/kube_integration_watcher.go @@ -35,8 +35,11 @@ import ( "github.com/gravitational/teleport/lib/srv/discovery/common" ) +// startKubeIntegrationWatchers starts kube watchers that use integration for the credentials. Currently only +// EKS watchers can do that and they behave differently from non-integration ones - we install agent on the +// discovered clusters, instead of just proxying them. func (s *Server) startKubeIntegrationWatchers() error { - if s.dynamicMatcherWatcher == nil { + if len(s.getKubeIntegrationFetchers()) == 0 && s.dynamicMatcherWatcher == nil { return nil } @@ -53,7 +56,11 @@ func (s *Server) startKubeIntegrationWatchers() error { } watcher, err := common.NewWatcher(s.ctx, common.WatcherConfig{ - FetchersFn: s.getKubeIntegrationFetchers, + FetchersFn: func() []common.Fetcher { + kubeIntegrationFetchers := s.getKubeIntegrationFetchers() + s.submitFetchersEvent(kubeIntegrationFetchers) + return kubeIntegrationFetchers + }, Log: s.Log.WithField("kind", types.KindKubernetesCluster), DiscoveryGroup: s.DiscoveryGroup, Interval: s.PollInterval, @@ -110,7 +117,6 @@ func (s *Server) startKubeIntegrationWatchers() error { continue } - // When enrolling EKS clusters, client for enrollment depends on the region and integration used. // When enrolling EKS clusters, client for enrollment depends on the region and integration used. type regionIntegrationMapKey struct { region string @@ -220,16 +226,60 @@ func (s *Server) getKubeAgentVersion(releaseChannels automaticupgrades.Channels) return strings.TrimPrefix(agentVersion, "v"), nil } -func (s *Server) getKubeIntegrationFetchers() []common.Fetcher { +type IntegrationFetcher interface { + // GetIntegration returns the integration name that is used for getting credentials of the fetcher. + GetIntegration() string +} + +func (s *Server) getKubeFetchers(integration bool) []common.Fetcher { var kubeFetchers []common.Fetcher - s.muDynamicKubeIntegrationFetchers.RLock() - for _, fetcherSet := range s.dynamicKubeIntegrationFetchers { - kubeFetchers = append(kubeFetchers, fetcherSet...) + filterIntegrationFetchers := func(fetcher common.Fetcher) bool { + f, ok := fetcher.(IntegrationFetcher) + if !ok { + return false + } + + return f.GetIntegration() != "" } - s.muDynamicKubeIntegrationFetchers.RUnlock() - s.submitFetchersEvent(kubeFetchers) + filterNonIntegrationFetchers := func(fetcher common.Fetcher) bool { + f, ok := fetcher.(IntegrationFetcher) + if !ok { + return true + } + + return f.GetIntegration() == "" + } + + filter := filterIntegrationFetchers + if !integration { + filter = filterNonIntegrationFetchers + } + + s.muDynamicKubeFetchers.RLock() + for _, fetcherSet := range s.dynamicKubeFetchers { + for _, f := range fetcherSet { + if filter(f) { + kubeFetchers = append(kubeFetchers, f) + } + } + } + s.muDynamicKubeFetchers.RUnlock() + + for _, f := range s.kubeFetchers { + if filter(f) { + kubeFetchers = append(kubeFetchers, f) + } + } return kubeFetchers } + +func (s *Server) getKubeIntegrationFetchers() []common.Fetcher { + return s.getKubeFetchers(true) +} + +func (s *Server) getKubeNonIntegrationFetchers() []common.Fetcher { + return s.getKubeFetchers(false) +} diff --git a/lib/srv/discovery/kube_integration_watcher_test.go b/lib/srv/discovery/kube_integration_watcher_test.go index f45398d6afdf7..4bef839f789fa 100644 --- a/lib/srv/discovery/kube_integration_watcher_test.go +++ b/lib/srv/discovery/kube_integration_watcher_test.go @@ -50,6 +50,7 @@ import ( "github.com/gravitational/teleport/lib/integrations/awsoidc" "github.com/gravitational/teleport/lib/services" "github.com/gravitational/teleport/lib/srv/discovery/common" + "github.com/gravitational/teleport/lib/srv/discovery/fetchers" ) func TestGetAgentVersion(t *testing.T) { @@ -121,6 +122,82 @@ func TestGetAgentVersion(t *testing.T) { } } +func TestServer_getKubeFetchers(t *testing.T) { + eks1, err := fetchers.NewEKSFetcher(fetchers.EKSFetcherConfig{ + EKSClientGetter: &cloud.TestCloudClients{}, + FilterLabels: types.Labels{"l1": []string{"v1"}}, + Region: "region1", + }) + require.NoError(t, err) + eks2, err := fetchers.NewEKSFetcher(fetchers.EKSFetcherConfig{ + EKSClientGetter: &cloud.TestCloudClients{}, + FilterLabels: types.Labels{"l1": []string{"v1"}}, + Region: "region1", + Integration: "aws1"}) + require.NoError(t, err) + eks3, err := fetchers.NewEKSFetcher(fetchers.EKSFetcherConfig{ + EKSClientGetter: &cloud.TestCloudClients{}, + FilterLabels: types.Labels{"l1": []string{"v1"}}, + Region: "region1", + Integration: "aws1"}) + require.NoError(t, err) + + aks1, err := fetchers.NewAKSFetcher(fetchers.AKSFetcherConfig{ + Client: &mockAKSAPI{}, + FilterLabels: types.Labels{"l1": []string{"v1"}}, + Regions: []string{"region1"}, + }) + require.NoError(t, err) + aks2, err := fetchers.NewAKSFetcher(fetchers.AKSFetcherConfig{ + Client: &mockAKSAPI{}, + FilterLabels: types.Labels{"l1": []string{"v1"}}, + Regions: []string{"region1"}, + }) + require.NoError(t, err) + aks3, err := fetchers.NewAKSFetcher(fetchers.AKSFetcherConfig{ + Client: &mockAKSAPI{}, + FilterLabels: types.Labels{"l1": []string{"v1"}}, + Regions: []string{"region1"}, + }) + require.NoError(t, err) + + testCases := []struct { + kubeFetchers []common.Fetcher + kubeDynamicFetchers map[string][]common.Fetcher + expectedIntegrationFetchers []common.Fetcher + expectedNonIntegrationFetchers []common.Fetcher + }{ + { + kubeFetchers: []common.Fetcher{eks1}, + expectedNonIntegrationFetchers: []common.Fetcher{eks1}, + }, + { + kubeFetchers: []common.Fetcher{eks1, eks2, eks3, aks1, aks2, aks3}, + expectedIntegrationFetchers: []common.Fetcher{eks2, eks3}, + expectedNonIntegrationFetchers: []common.Fetcher{eks1, aks1, aks2, aks3}, + }, + { + kubeFetchers: []common.Fetcher{eks1}, + kubeDynamicFetchers: map[string][]common.Fetcher{"group1": {eks2}}, + expectedIntegrationFetchers: []common.Fetcher{eks2}, + expectedNonIntegrationFetchers: []common.Fetcher{eks1}, + }, + { + kubeFetchers: []common.Fetcher{aks1, aks2}, + kubeDynamicFetchers: map[string][]common.Fetcher{"group1": {eks1}}, + expectedIntegrationFetchers: []common.Fetcher{}, + expectedNonIntegrationFetchers: []common.Fetcher{eks1, aks1, aks2}, + }, + } + + for _, tc := range testCases { + s := Server{kubeFetchers: tc.kubeFetchers, dynamicKubeFetchers: tc.kubeDynamicFetchers} + + require.ElementsMatch(t, tc.expectedIntegrationFetchers, s.getKubeFetchers(true)) + require.ElementsMatch(t, tc.expectedNonIntegrationFetchers, s.getKubeFetchers(false)) + } +} + func TestDiscoveryKubeIntegrationEKS(t *testing.T) { const ( mainDiscoveryGroup = "main" @@ -384,9 +461,9 @@ func TestDiscoveryKubeIntegrationEKS(t *testing.T) { // Wait for the DiscoveryConfig to be added to the dynamic fetchers require.Eventually(t, func() bool { - discServer.muDynamicKubeIntegrationFetchers.RLock() - defer discServer.muDynamicKubeIntegrationFetchers.RUnlock() - return len(discServer.dynamicKubeIntegrationFetchers) > 0 + discServer.muDynamicKubeFetchers.RLock() + defer discServer.muDynamicKubeFetchers.RUnlock() + return len(discServer.dynamicKubeFetchers) > 0 }, 1*time.Second, 100*time.Millisecond) } diff --git a/lib/srv/discovery/kube_watcher.go b/lib/srv/discovery/kube_watcher.go index 9274dc30c5e42..58c61475c8f32 100644 --- a/lib/srv/discovery/kube_watcher.go +++ b/lib/srv/discovery/kube_watcher.go @@ -34,9 +34,10 @@ import ( const kubeEventPrefix = "kube/" func (s *Server) startKubeWatchers() error { - if len(s.kubeFetchers) == 0 { + if len(s.getKubeNonIntegrationFetchers()) == 0 && s.dynamicMatcherWatcher == nil { return nil } + var ( kubeResources []types.KubeCluster mu sync.Mutex @@ -70,7 +71,11 @@ func (s *Server) startKubeWatchers() error { } watcher, err := common.NewWatcher(s.ctx, common.WatcherConfig{ - FetchersFn: common.StaticFetchers(s.kubeFetchers), + FetchersFn: func() []common.Fetcher { + kubeNonIntegrationFetchers := s.getKubeNonIntegrationFetchers() + s.submitFetchersEvent(kubeNonIntegrationFetchers) + return kubeNonIntegrationFetchers + }, Log: s.Log.WithField("kind", types.KindKubernetesCluster), DiscoveryGroup: s.DiscoveryGroup, Interval: s.PollInterval,