Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 9 additions & 6 deletions lib/config/configuration.go
Original file line number Diff line number Diff line change
Expand Up @@ -1541,12 +1541,14 @@ func applyDiscoveryConfig(fc *FileConfig, cfg *servicecfg.Config) error {
}

serviceMatcher := types.AWSMatcher{
Types: matcher.Types,
Regions: matcher.Regions,
AssumeRole: assumeRole,
Tags: matcher.Tags,
Params: installParams,
SSM: &types.AWSSSM{DocumentName: matcher.SSM.DocumentName},
Types: matcher.Types,
Regions: matcher.Regions,
AssumeRole: assumeRole,
Tags: matcher.Tags,
Params: installParams,
SSM: &types.AWSSSM{DocumentName: matcher.SSM.DocumentName},
Integration: matcher.Integration,
KubeAppDiscovery: matcher.KubeAppDiscovery,
}
if err := serviceMatcher.CheckAndSetDefaults(); err != nil {
return trace.Wrap(err)
Expand Down Expand Up @@ -1736,6 +1738,7 @@ func applyDatabasesConfig(fc *FileConfig, cfg *servicecfg.Config) error {
RoleARN: matcher.AssumeRoleARN,
ExternalID: matcher.ExternalID,
},
Integration: matcher.Integration,
})
}
for _, matcher := range fc.Databases.AzureMatchers {
Expand Down
16 changes: 16 additions & 0 deletions lib/config/configuration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -363,6 +363,15 @@ func TestConfigReading(t *testing.T) {
AssumeRoleARN: "arn:aws:iam::123456789012:role/DBDiscoverer",
ExternalID: "externalID123",
},
{
Types: []string{"eks"},
Regions: []string{"us-west-1", "us-east-1"},
Tags: map[string]apiutils.Strings{
"a": {"b"},
},
Integration: "integration1",
KubeAppDiscovery: true,
},
},
AzureMatchers: []AzureMatcher{
{
Expand Down Expand Up @@ -1489,6 +1498,13 @@ func makeConfigFixture() string {
AssumeRoleARN: "arn:aws:iam::123456789012:role/DBDiscoverer",
ExternalID: "externalID123",
},
{
Types: []string{"eks"},
Regions: []string{"us-west-1", "us-east-1"},
Tags: map[string]apiutils.Strings{"a": {"b"}},
Integration: "integration1",
KubeAppDiscovery: true,
},
}

conf.Discovery.AzureMatchers = []AzureMatcher{
Expand Down
6 changes: 6 additions & 0 deletions lib/config/fileconf.go
Original file line number Diff line number Diff line change
Expand Up @@ -1685,6 +1685,12 @@ type AWSMatcher struct {
// SSM provides options to use when sending a document command to
// an EC2 node
SSM AWSSSM `yaml:"ssm,omitempty"`
// Integration is the integration name used to generate credentials to interact with AWS APIs.
// Environment credentials will not be used when this value is set.
Integration string `yaml:"integration"`
// KubeAppDiscovery controls whether Kubernetes App Discovery will be enabled for agents running on
// discovered clusters, currently only affects AWS EKS discovery in integration mode.
KubeAppDiscovery bool `yaml:"kube_app_discovery"`
}

// InstallParams sets join method to use on discovered nodes
Expand Down
62 changes: 31 additions & 31 deletions lib/srv/discovery/discovery.go
Original file line number Diff line number Diff line change
Expand Up @@ -278,7 +278,7 @@ type Server struct {
// gcpInstaller is used to start the installation process on discovered GCP
// virtual machines
gcpInstaller gcpInstaller
// kubeFetchers holds all non-integration based kubernetes fetchers for Azure and other clouds.
// kubeFetchers holds all non-dynamic kubernetes fetchers for Azure and other clouds.
kubeFetchers []common.Fetcher
// kubeAppsFetchers holds all kubernetes fetchers for apps.
kubeAppsFetchers []common.Fetcher
Expand Down Expand Up @@ -312,11 +312,11 @@ type Server struct {
muDynamicServerGCPFetchers sync.RWMutex
staticServerGCPFetchers []server.Fetcher

// dynamicKubeIntegrationFetchers holds the current kube fetchers that use integration as a source of credentials,
// dynamicKubeFetchers holds the current kube fetchers that use integration as a source of credentials,
// for the Dynamic Matchers (those coming from DiscoveryConfig resource).
// The key is the DiscoveryConfig name.
dynamicKubeIntegrationFetchers map[string][]common.Fetcher
muDynamicKubeIntegrationFetchers sync.RWMutex
dynamicKubeFetchers map[string][]common.Fetcher
muDynamicKubeFetchers sync.RWMutex

// dynamicTAGSyncFetchers holds the current TAG Fetchers for the Dynamic Matchers (those coming from DiscoveryConfig resource).
// The key is the DiscoveryConfig name.
Expand Down Expand Up @@ -344,16 +344,16 @@ func New(ctx context.Context, cfg *Config) (*Server, error) {

localCtx, cancelfn := context.WithCancel(ctx)
s := &Server{
Config: cfg,
ctx: localCtx,
cancelfn: cancelfn,
usageEventCache: make(map[string]struct{}),
dynamicKubeIntegrationFetchers: make(map[string][]common.Fetcher),
dynamicDatabaseFetchers: make(map[string][]common.Fetcher),
dynamicServerAWSFetchers: make(map[string][]server.Fetcher),
dynamicServerAzureFetchers: make(map[string][]server.Fetcher),
dynamicServerGCPFetchers: make(map[string][]server.Fetcher),
dynamicTAGSyncFetchers: make(map[string][]aws_sync.AWSSync),
Config: cfg,
ctx: localCtx,
cancelfn: cancelfn,
usageEventCache: make(map[string]struct{}),
dynamicKubeFetchers: make(map[string][]common.Fetcher),
dynamicDatabaseFetchers: make(map[string][]common.Fetcher),
dynamicServerAWSFetchers: make(map[string][]server.Fetcher),
dynamicServerAzureFetchers: make(map[string][]server.Fetcher),
dynamicServerGCPFetchers: make(map[string][]server.Fetcher),
dynamicTAGSyncFetchers: make(map[string][]aws_sync.AWSSync),
}
s.discardUnsupportedMatchers(&s.Matchers)

Expand Down Expand Up @@ -475,7 +475,7 @@ func (s *Server) initAWSWatchers(matchers []types.AWSMatcher) error {
_, otherMatchers = splitMatchers(otherMatchers, db.IsAWSMatcherType)

// Add non-integration kube fetchers.
kubeFetchers, _, err := fetchers.MakeEKSFetchersFromAWSMatchers(s.Log, s.CloudClients, otherMatchers)
kubeFetchers, err := fetchers.MakeEKSFetchersFromAWSMatchers(s.Log, s.CloudClients, otherMatchers)
if err != nil {
return trace.Wrap(err)
}
Expand Down Expand Up @@ -588,22 +588,22 @@ func (s *Server) databaseFetchersFromMatchers(matchers Matchers) ([]common.Fetch
return fetchers, nil
}

func (s *Server) kubeIntegrationFetchersFromMatchers(matchers Matchers) ([]common.Fetcher, error) {
func (s *Server) kubeFetchersFromMatchers(matchers Matchers) ([]common.Fetcher, error) {
var result []common.Fetcher

// AWS
awsKubeMatchers, _ := splitMatchers(matchers.AWS, func(matcherType string) bool {
return matcherType == types.AWSMatcherEKS
})
if len(awsKubeMatchers) > 0 {
_, kubeIntegrationFetchers, err := fetchers.MakeEKSFetchersFromAWSMatchers(s.Log, s.CloudClients, awsKubeMatchers)
eksFetchers, err := fetchers.MakeEKSFetchersFromAWSMatchers(s.Log, s.CloudClients, awsKubeMatchers)
if err != nil {
return nil, trace.Wrap(err)
}
result = append(result, kubeIntegrationFetchers...)
result = append(result, eksFetchers...)
}

// There can't be kube integration fetchers for other matcher types.
// There can't be kube fetchers for other matcher types.

return result, nil
}
Expand Down Expand Up @@ -1469,9 +1469,9 @@ func (s *Server) deleteDynamicFetchers(name string) {
delete(s.dynamicServerGCPFetchers, name)
s.muDynamicServerGCPFetchers.Unlock()

s.muDynamicKubeIntegrationFetchers.Lock()
delete(s.dynamicKubeIntegrationFetchers, name)
s.muDynamicKubeIntegrationFetchers.Unlock()
s.muDynamicKubeFetchers.Lock()
delete(s.dynamicKubeFetchers, name)
s.muDynamicKubeFetchers.Unlock()

s.muDynamicTAGSyncFetchers.Lock()
delete(s.dynamicTAGSyncFetchers, name)
Expand Down Expand Up @@ -1519,15 +1519,6 @@ func (s *Server) upsertDynamicMatchers(ctx context.Context, dc *discoveryconfig.
s.dynamicDatabaseFetchers[dc.GetName()] = databaseFetchers
s.muDynamicDatabaseFetchers.Unlock()

kubeIntegrationFetchers, err := s.kubeIntegrationFetchersFromMatchers(matchers)
if err != nil {
return trace.Wrap(err)
}

s.muDynamicKubeIntegrationFetchers.Lock()
s.dynamicKubeIntegrationFetchers[dc.GetName()] = kubeIntegrationFetchers
s.muDynamicKubeIntegrationFetchers.Unlock()

awsSyncMatchers, err := s.accessGraphFetchersFromMatchers(
ctx, matchers,
)
Expand All @@ -1538,6 +1529,15 @@ func (s *Server) upsertDynamicMatchers(ctx context.Context, dc *discoveryconfig.
s.dynamicTAGSyncFetchers[dc.GetName()] = awsSyncMatchers
s.muDynamicTAGSyncFetchers.Unlock()

kubeFetchers, err := s.kubeFetchersFromMatchers(matchers)
if err != nil {
return trace.Wrap(err)
}

s.muDynamicKubeFetchers.Lock()
s.dynamicKubeFetchers[dc.GetName()] = kubeFetchers
s.muDynamicKubeFetchers.Unlock()

// TODO(marco): add other fetchers: Kube Clusters and Kube Resources (Apps)
return nil
}
Expand Down
16 changes: 8 additions & 8 deletions lib/srv/discovery/fetchers/eks.go
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,7 @@ func (c *EKSFetcherConfig) CheckAndSetDefaults() error {

// MakeEKSFetchersFromAWSMatchers creates fetchers from the provided matchers. Returned fetchers are separated
// by their reliance on the integration.
func MakeEKSFetchersFromAWSMatchers(log logrus.FieldLogger, clients cloud.AWSClients, matchers []types.AWSMatcher) (kubeFetchers, kubeIntegrationFetchers []common.Fetcher, _ error) {
func MakeEKSFetchersFromAWSMatchers(log logrus.FieldLogger, clients cloud.AWSClients, matchers []types.AWSMatcher) (kubeFetchers []common.Fetcher, _ error) {
for _, matcher := range matchers {
var matcherAssumeRole types.AssumeRole
if matcher.AssumeRole != nil {
Expand All @@ -123,17 +123,12 @@ func MakeEKSFetchersFromAWSMatchers(log logrus.FieldLogger, clients cloud.AWSCli
log.WithError(err).Warnf("Could not initialize EKS fetcher(Region=%q, Labels=%q, AssumeRole=%q), skipping.", region, matcher.Tags, matcherAssumeRole.RoleARN)
continue
}

if matcher.Integration != "" {
kubeIntegrationFetchers = append(kubeIntegrationFetchers, fetcher)
} else {
kubeFetchers = append(kubeFetchers, fetcher)
}
kubeFetchers = append(kubeFetchers, fetcher)
}
}
}
}
return kubeFetchers, kubeIntegrationFetchers, nil
return kubeFetchers, nil
}

// NewEKSFetcher creates a new EKS fetcher configuration.
Expand Down Expand Up @@ -170,6 +165,11 @@ func (a *eksFetcher) getClient(ctx context.Context) (eksiface.EKSAPI, error) {
return a.client, nil
}

// GetIntegration returns the integration name that is used for getting credentials of the fetcher.
func (a *eksFetcher) GetIntegration() string {
return a.Integration
}

type DiscoveredEKSCluster struct {
types.KubeCluster

Expand Down
68 changes: 59 additions & 9 deletions lib/srv/discovery/kube_integration_watcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,8 +35,11 @@ import (
"github.com/gravitational/teleport/lib/srv/discovery/common"
)

// startKubeIntegrationWatchers starts kube watchers that use integration for the credentials. Currently only
// EKS watchers can do that and they behave differently from non-integration ones - we install agent on the
// discovered clusters, instead of just proxying them.
func (s *Server) startKubeIntegrationWatchers() error {
if s.dynamicMatcherWatcher == nil {
if len(s.getKubeIntegrationFetchers()) == 0 && s.dynamicMatcherWatcher == nil {
return nil
}

Expand All @@ -53,7 +56,11 @@ func (s *Server) startKubeIntegrationWatchers() error {
}

watcher, err := common.NewWatcher(s.ctx, common.WatcherConfig{
FetchersFn: s.getKubeIntegrationFetchers,
FetchersFn: func() []common.Fetcher {
kubeIntegrationFetchers := s.getKubeIntegrationFetchers()
s.submitFetchersEvent(kubeIntegrationFetchers)
return kubeIntegrationFetchers
},
Log: s.Log.WithField("kind", types.KindKubernetesCluster),
DiscoveryGroup: s.DiscoveryGroup,
Interval: s.PollInterval,
Expand Down Expand Up @@ -110,7 +117,6 @@ func (s *Server) startKubeIntegrationWatchers() error {
continue
}

// When enrolling EKS clusters, client for enrollment depends on the region and integration used.
// When enrolling EKS clusters, client for enrollment depends on the region and integration used.
type regionIntegrationMapKey struct {
region string
Expand Down Expand Up @@ -220,16 +226,60 @@ func (s *Server) getKubeAgentVersion(releaseChannels automaticupgrades.Channels)
return strings.TrimPrefix(agentVersion, "v"), nil
}

func (s *Server) getKubeIntegrationFetchers() []common.Fetcher {
type IntegrationFetcher interface {
// GetIntegration returns the integration name that is used for getting credentials of the fetcher.
GetIntegration() string
}

func (s *Server) getKubeFetchers(integration bool) []common.Fetcher {
var kubeFetchers []common.Fetcher

s.muDynamicKubeIntegrationFetchers.RLock()
for _, fetcherSet := range s.dynamicKubeIntegrationFetchers {
kubeFetchers = append(kubeFetchers, fetcherSet...)
filterIntegrationFetchers := func(fetcher common.Fetcher) bool {
f, ok := fetcher.(IntegrationFetcher)
if !ok {
return false
}

return f.GetIntegration() != ""
}
s.muDynamicKubeIntegrationFetchers.RUnlock()

s.submitFetchersEvent(kubeFetchers)
filterNonIntegrationFetchers := func(fetcher common.Fetcher) bool {
f, ok := fetcher.(IntegrationFetcher)
if !ok {
return true
}

return f.GetIntegration() == ""
}

filter := filterIntegrationFetchers
if !integration {
filter = filterNonIntegrationFetchers
}

s.muDynamicKubeFetchers.RLock()
for _, fetcherSet := range s.dynamicKubeFetchers {
for _, f := range fetcherSet {
if filter(f) {
kubeFetchers = append(kubeFetchers, f)
}
}
}
s.muDynamicKubeFetchers.RUnlock()

for _, f := range s.kubeFetchers {
if filter(f) {
kubeFetchers = append(kubeFetchers, f)
}
}

return kubeFetchers
}

func (s *Server) getKubeIntegrationFetchers() []common.Fetcher {
return s.getKubeFetchers(true)
}

func (s *Server) getKubeNonIntegrationFetchers() []common.Fetcher {
return s.getKubeFetchers(false)
}
Loading