diff --git a/api/types/kubernetes.go b/api/types/kubernetes.go
index 0f540c0f07fc8..b470eb08bdbdc 100644
--- a/api/types/kubernetes.go
+++ b/api/types/kubernetes.go
@@ -77,6 +77,18 @@ type KubeCluster interface {
GetCloud() string
}
+// DiscoveredEKSCluster represents a server discovered by EKS discovery fetchers.
+type DiscoveredEKSCluster interface {
+ // KubeCluster is base discovered cluster.
+ KubeCluster
+ // GetKubeCluster returns base cluster.
+ GetKubeCluster() KubeCluster
+ // GetIntegration returns integration name used when discovering this cluster.
+ GetIntegration() string
+ // GetKubeAppDiscovery returns setting showing if Kubernetes App Discovery show be enabled for the discovered cluster.
+ GetKubeAppDiscovery() bool
+}
+
// NewKubernetesClusterV3FromLegacyCluster creates a new Kubernetes cluster resource
// from the legacy type.
func NewKubernetesClusterV3FromLegacyCluster(namespace string, cluster *KubernetesCluster) (*KubernetesClusterV3, error) {
diff --git a/api/types/kubernetes_server.go b/api/types/kubernetes_server.go
index 8dbf68cee8127..a277b67a7de99 100644
--- a/api/types/kubernetes_server.go
+++ b/api/types/kubernetes_server.go
@@ -329,6 +329,15 @@ func (s KubeServers) Less(i, j int) bool {
// Swap swaps two kube servers.
func (s KubeServers) Swap(i, j int) { s[i], s[j] = s[j], s[i] }
+// ToMap returns these kubernetes clusters as a map keyed by cluster name.
+func (s KubeServers) ToMap() map[string]KubeServer {
+ m := make(map[string]KubeServer, len(s))
+ for _, kubeServer := range s {
+ m[kubeServer.GetName()] = kubeServer
+ }
+ return m
+}
+
// SortByCustom custom sorts by given sort criteria.
func (s KubeServers) SortByCustom(sortBy SortBy) error {
if sortBy.Field == "" {
diff --git a/lib/auth/api.go b/lib/auth/api.go
index c14f9e5c5da55..1fabc359ff1d0 100644
--- a/lib/auth/api.go
+++ b/lib/auth/api.go
@@ -24,8 +24,10 @@ import (
"time"
"github.com/gravitational/trace"
+ "google.golang.org/grpc"
"github.com/gravitational/teleport/api/client/proto"
+ integrationpb "github.com/gravitational/teleport/api/gen/proto/go/teleport/integration/v1"
"github.com/gravitational/teleport/api/types"
"github.com/gravitational/teleport/api/types/accesslist"
"github.com/gravitational/teleport/api/types/discoveryconfig"
@@ -697,6 +699,8 @@ type ReadDiscoveryAccessPoint interface {
GetKubernetesCluster(ctx context.Context, name string) (types.KubeCluster, error)
// GetKubernetesClusters returns all kubernetes cluster resources.
GetKubernetesClusters(ctx context.Context) ([]types.KubeCluster, error)
+ // GetKubernetesServers returns all registered kubernetes servers.
+ GetKubernetesServers(ctx context.Context) ([]types.KubeServer, error)
// GetDatabases returns all database resources.
GetDatabases(ctx context.Context) ([]types.Database, error)
@@ -755,6 +759,12 @@ type DiscoveryAccessPoint interface {
// GenerateAWSOIDCToken generates a token to be used to execute an AWS OIDC Integration action.
GenerateAWSOIDCToken(ctx context.Context) (string, error)
+
+ // EnrollEKSClusters enrolls EKS clusters into Teleport by installing teleport-kube-agent chart on the clusters.
+ EnrollEKSClusters(context.Context, *integrationpb.EnrollEKSClustersRequest, ...grpc.CallOption) (*integrationpb.EnrollEKSClustersResponse, error)
+
+ // Ping gets basic info about the auth server.
+ Ping(context.Context) (proto.PingResponse, error)
}
// ReadOktaAccessPoint is a read only API interface to be
@@ -1319,6 +1329,16 @@ func (w *DiscoveryWrapper) GenerateAWSOIDCToken(ctx context.Context) (string, er
return w.NoCache.GenerateAWSOIDCToken(ctx)
}
+// EnrollEKSClusters enrolls EKS clusters into Teleport by installing teleport-kube-agent chart on the clusters.
+func (w *DiscoveryWrapper) EnrollEKSClusters(ctx context.Context, req *integrationpb.EnrollEKSClustersRequest, _ ...grpc.CallOption) (*integrationpb.EnrollEKSClustersResponse, error) {
+ return w.NoCache.EnrollEKSClusters(ctx, req)
+}
+
+// Ping gets basic info about the auth server.
+func (w *DiscoveryWrapper) Ping(ctx context.Context) (proto.PingResponse, error) {
+ return w.NoCache.Ping(ctx)
+}
+
// Close closes all associated resources
func (w *DiscoveryWrapper) Close() error {
err := w.NoCache.Close()
diff --git a/lib/authz/permissions.go b/lib/authz/permissions.go
index 2bf879a6ca81b..9c7480348d0f7 100644
--- a/lib/authz/permissions.go
+++ b/lib/authz/permissions.go
@@ -1111,6 +1111,7 @@ func definitionForBuiltinRole(clusterName string, recConfig types.SessionRecordi
types.NewRule(types.KindClusterName, services.RO()),
types.NewRule(types.KindNamespace, services.RO()),
types.NewRule(types.KindNode, services.RO()),
+ types.NewRule(types.KindKubeServer, services.RO()),
types.NewRule(types.KindKubernetesCluster, services.RW()),
types.NewRule(types.KindDatabase, services.RW()),
types.NewRule(types.KindServerInfo, services.RW()),
diff --git a/lib/cache/cache.go b/lib/cache/cache.go
index 546fa52b36fd6..e4e654dd111ad 100644
--- a/lib/cache/cache.go
+++ b/lib/cache/cache.go
@@ -418,6 +418,7 @@ func ForDiscovery(cfg Config) Config {
{Kind: types.KindNamespace, Name: apidefaults.Namespace},
{Kind: types.KindNode},
{Kind: types.KindKubernetesCluster},
+ {Kind: types.KindKubeServer},
{Kind: types.KindDatabase},
{Kind: types.KindApp},
{Kind: types.KindDiscoveryConfig},
diff --git a/lib/integrations/awsoidc/eks_enroll_clusters.go b/lib/integrations/awsoidc/eks_enroll_clusters.go
index a502c5ea3ffe9..d63e31dc66f0f 100644
--- a/lib/integrations/awsoidc/eks_enroll_clusters.go
+++ b/lib/integrations/awsoidc/eks_enroll_clusters.go
@@ -550,9 +550,6 @@ func installKubeAgent(ctx context.Context, cfg installKubeAgentParams) error {
installCmd := action.NewInstall(cfg.actionConfig)
installCmd.RepoURL = agentRepoURL.String()
installCmd.Version = cfg.req.AgentVersion
- if strings.Contains(installCmd.Version, "dev") {
- installCmd.Version = "" // For testing during development.
- }
agentChart, err := getChartData(installCmd.Version)
if err != nil {
@@ -587,6 +584,7 @@ func installKubeAgent(ctx context.Context, cfg installKubeAgentParams) error {
for k, v := range cfg.eksCluster.Tags {
eksTags[k] = aws.String(v)
}
+ eksTags[types.OriginLabel] = aws.String(types.OriginCloud)
kubeCluster, err := services.NewKubeClusterFromAWSEKS(aws.ToString(cfg.eksCluster.Name), aws.ToString(cfg.eksCluster.Arn), eksTags)
if err != nil {
return trace.Wrap(err)
diff --git a/lib/service/discovery.go b/lib/service/discovery.go
index 7e6dcfebdc1f8..cf3f7b509a202 100644
--- a/lib/service/discovery.go
+++ b/lib/service/discovery.go
@@ -79,6 +79,7 @@ func (process *TeleportProcess) initDiscoveryService() error {
AccessPoint: accessPoint,
Log: process.log,
ClusterName: conn.ClientIdentity.ClusterName,
+ ClusterFeatures: process.getClusterFeatures,
PollInterval: process.Config.Discovery.PollInterval,
ServerCredentials: tlsConfig,
AccessGraphConfig: process.Config.AccessGraph,
diff --git a/lib/service/service.go b/lib/service/service.go
index 4700517f8cd0e..88a17545844b9 100644
--- a/lib/service/service.go
+++ b/lib/service/service.go
@@ -69,6 +69,7 @@ import (
"github.com/gravitational/teleport/api/client/webclient"
"github.com/gravitational/teleport/api/constants"
apidefaults "github.com/gravitational/teleport/api/defaults"
+ integrationpb "github.com/gravitational/teleport/api/gen/proto/go/teleport/integration/v1"
kubeproto "github.com/gravitational/teleport/api/gen/proto/go/teleport/kube/v1"
transportpb "github.com/gravitational/teleport/api/gen/proto/go/teleport/transport/v1"
"github.com/gravitational/teleport/api/types"
@@ -2306,10 +2307,15 @@ func (process *TeleportProcess) newLocalCacheForDatabase(clt auth.ClientI, cache
return auth.NewDatabaseWrapper(clt, cache), nil
}
+type eksClustersEnroller interface {
+ EnrollEKSClusters(context.Context, *integrationpb.EnrollEKSClustersRequest, ...grpc.CallOption) (*integrationpb.EnrollEKSClustersResponse, error)
+}
+
// combinedDiscoveryClient is an auth.Client client with other, specific, services added to it.
type combinedDiscoveryClient struct {
auth.ClientI
services.DiscoveryConfigsGetter
+ eksClustersEnroller
}
// newLocalCacheForDiscovery returns a new instance of access point for a discovery service.
@@ -2317,6 +2323,7 @@ func (process *TeleportProcess) newLocalCacheForDiscovery(clt auth.ClientI, cach
client := combinedDiscoveryClient{
ClientI: clt,
DiscoveryConfigsGetter: clt.DiscoveryConfigClient(),
+ eksClustersEnroller: clt.IntegrationAWSOIDCClient(),
}
// if caching is disabled, return access point
diff --git a/lib/services/kubernetes.go b/lib/services/kubernetes.go
index 8a6e95cba865e..3df72bcb0dcbd 100644
--- a/lib/services/kubernetes.go
+++ b/lib/services/kubernetes.go
@@ -124,6 +124,10 @@ func MarshalKubeCluster(kubeCluster types.KubeCluster, opts ...MarshalOption) ([
return nil, trace.Wrap(err)
}
+ if c, ok := kubeCluster.(types.DiscoveredEKSCluster); ok {
+ kubeCluster = c.GetKubeCluster()
+ }
+
switch cluster := kubeCluster.(type) {
case *types.KubernetesClusterV3:
if err := cluster.CheckAndSetDefaults(); err != nil {
diff --git a/lib/srv/discovery/discovery.go b/lib/srv/discovery/discovery.go
index 5d661e4ff87ff..0f44d93b590e1 100644
--- a/lib/srv/discovery/discovery.go
+++ b/lib/srv/discovery/discovery.go
@@ -140,6 +140,10 @@ type Config struct {
// AccessGraphConfig is the configuration for the Access Graph client
AccessGraphConfig servicecfg.AccessGraphConfig
+ // ClusterFeatures returns flags for supported/unsupported features.
+ // Used as a function because cluster features might change on Auth restarts.
+ ClusterFeatures func() proto.Features
+
// TriggerFetchC is a list of channels that must be notified when a off-band poll must be performed.
// This is used to start a polling iteration when a new DiscoveryConfig change is received.
TriggerFetchC []chan struct{}
@@ -209,6 +213,10 @@ kubernetes matchers are present.`)
c.clock = clockwork.NewRealClock()
}
+ if c.ClusterFeatures == nil {
+ return trace.BadParameter("cluster features are required")
+ }
+
c.Log = c.Log.WithField(trace.Component, teleport.ComponentDiscovery)
if c.DiscoveryGroup == "" {
@@ -245,7 +253,7 @@ type Server struct {
// gcpInstaller is used to start the installation process on discovered GCP
// virtual machines
gcpInstaller gcpInstaller
- // kubeFetchers holds all kubernetes fetchers for Azure and other clouds.
+ // kubeFetchers holds all non-integration based kubernetes fetchers for Azure and other clouds.
kubeFetchers []common.Fetcher
// kubeAppsFetchers holds all kubernetes fetchers for apps.
kubeAppsFetchers []common.Fetcher
@@ -285,6 +293,12 @@ type Server struct {
muDynamicTAGSyncFetchers sync.RWMutex
staticTAGSyncFetchers []aws_sync.AWSSync
+ // dynamicKubeIntegrationFetchers holds the current kube fetchers that use integration as a source of credentials,
+ // for the Dynamic Matchers (those coming from DiscoveryConfig resource).
+ // The key is the DiscoveryConfig name.
+ dynamicKubeIntegrationFetchers map[string][]common.Fetcher
+ muDynamicKubeIntegrationFetchers sync.RWMutex
+
// caRotationCh receives nodes that need to have their CAs rotated.
caRotationCh chan []types.Server
// reconciler periodically reconciles the labels of discovered instances
@@ -305,15 +319,16 @@ func New(ctx context.Context, cfg *Config) (*Server, error) {
localCtx, cancelfn := context.WithCancel(ctx)
s := &Server{
- Config: cfg,
- ctx: localCtx,
- cancelfn: cancelfn,
- usageEventCache: make(map[string]struct{}),
- dynamicDatabaseFetchers: make(map[string][]common.Fetcher),
- dynamicServerAWSFetchers: make(map[string][]server.Fetcher),
- dynamicServerAzureFetchers: make(map[string][]server.Fetcher),
- dynamicServerGCPFetchers: make(map[string][]server.Fetcher),
- dynamicTAGSyncFetchers: make(map[string][]aws_sync.AWSSync),
+ Config: cfg,
+ ctx: localCtx,
+ cancelfn: cancelfn,
+ usageEventCache: make(map[string]struct{}),
+ dynamicKubeIntegrationFetchers: make(map[string][]common.Fetcher),
+ dynamicDatabaseFetchers: make(map[string][]common.Fetcher),
+ dynamicServerAWSFetchers: make(map[string][]server.Fetcher),
+ dynamicServerAzureFetchers: make(map[string][]server.Fetcher),
+ dynamicServerGCPFetchers: make(map[string][]server.Fetcher),
+ dynamicTAGSyncFetchers: make(map[string][]aws_sync.AWSSync),
}
s.discardUnsupportedMatchers(&s.Matchers)
@@ -430,44 +445,16 @@ func (s *Server) initAWSWatchers(matchers []types.AWSMatcher) error {
// Database fetchers were added in databaseFetchersFromMatchers.
_, otherMatchers = splitMatchers(otherMatchers, db.IsAWSMatcherType)
- // Add kube fetchers.
- for _, matcher := range otherMatchers {
- matcherAssumeRole := types.AssumeRole{}
- if matcher.AssumeRole != nil {
- matcherAssumeRole = *matcher.AssumeRole
- }
-
- for _, t := range matcher.Types {
- for _, region := range matcher.Regions {
- switch t {
- case types.AWSMatcherEKS:
- fetcher, err := s.getEKSFetcher(region, matcherAssumeRole, matcher.Tags)
- if err != nil {
- s.Log.WithError(err).Warnf("Could not initialize EKS fetcher(Region=%q, Labels=%q, AssumeRole=%q), skipping.", region, matcher.Tags, matcherAssumeRole.RoleARN)
- continue
- }
- s.kubeFetchers = append(s.kubeFetchers, fetcher)
- }
- }
- }
+ // Add non-integration kube fetchers.
+ kubeFetchers, _, err := fetchers.MakeEKSFetchersFromAWSMatchers(s.Log, s.CloudClients, otherMatchers)
+ if err != nil {
+ return trace.Wrap(err)
}
+ s.kubeFetchers = append(s.kubeFetchers, kubeFetchers...)
return nil
}
-func (s *Server) getEKSFetcher(region string, assumeRole types.AssumeRole, tags types.Labels) (common.Fetcher, error) {
- fetcher, err := fetchers.NewEKSFetcher(
- fetchers.EKSFetcherConfig{
- EKSClientGetter: s.CloudClients,
- AssumeRole: assumeRole,
- Region: region,
- FilterLabels: tags,
- Log: s.Log,
- },
- )
- return fetcher, trace.Wrap(err)
-}
-
func (s *Server) initKubeAppWatchers(matchers []types.KubernetesMatcher) error {
if len(matchers) == 0 {
return nil
@@ -572,6 +559,26 @@ func (s *Server) databaseFetchersFromMatchers(matchers Matchers) ([]common.Fetch
return fetchers, nil
}
+func (s *Server) kubeIntegrationFetchersFromMatchers(matchers Matchers) ([]common.Fetcher, error) {
+ var result []common.Fetcher
+
+ // AWS
+ awsKubeMatchers, _ := splitMatchers(matchers.AWS, func(matcherType string) bool {
+ return matcherType == types.AWSMatcherEKS
+ })
+ if len(awsKubeMatchers) > 0 {
+ _, kubeIntegrationFetchers, err := fetchers.MakeEKSFetchersFromAWSMatchers(s.Log, s.CloudClients, awsKubeMatchers)
+ if err != nil {
+ return nil, trace.Wrap(err)
+ }
+ result = append(result, kubeIntegrationFetchers...)
+ }
+
+ // There can't be kube integration fetchers for other matcher types.
+
+ return result, nil
+}
+
// initAzureWatchers starts Azure resource watchers based on types provided.
func (s *Server) initAzureWatchers(ctx context.Context, matchers []types.AzureMatcher) error {
vmMatchers, otherMatchers := splitMatchers(matchers, func(matcherType string) bool {
@@ -1214,6 +1221,9 @@ func (s *Server) Start() error {
if err := s.startKubeWatchers(); err != nil {
return trace.Wrap(err)
}
+ if err := s.startKubeIntegrationWatchers(); err != nil {
+ return trace.Wrap(err)
+ }
if err := s.startKubeAppsWatchers(); err != nil {
return trace.Wrap(err)
}
@@ -1336,6 +1346,10 @@ func (s *Server) deleteDynamicFetchers(name string) {
s.muDynamicTAGSyncFetchers.Lock()
delete(s.dynamicTAGSyncFetchers, name)
s.muDynamicTAGSyncFetchers.Unlock()
+
+ s.muDynamicKubeIntegrationFetchers.Lock()
+ delete(s.dynamicKubeIntegrationFetchers, name)
+ s.muDynamicKubeIntegrationFetchers.Unlock()
}
// upsertDynamicMatchers upserts the internal set of dynamic matchers given a particular discovery config.
@@ -1389,6 +1403,15 @@ func (s *Server) upsertDynamicMatchers(ctx context.Context, dc *discoveryconfig.
s.dynamicTAGSyncFetchers[dc.GetName()] = awsSyncMatchers
s.muDynamicTAGSyncFetchers.Unlock()
+ kubeIntegrationFetchers, err := s.kubeIntegrationFetchersFromMatchers(matchers)
+ if err != nil {
+ return trace.Wrap(err)
+ }
+
+ s.muDynamicKubeIntegrationFetchers.Lock()
+ s.dynamicKubeIntegrationFetchers[dc.GetName()] = kubeIntegrationFetchers
+ s.muDynamicKubeIntegrationFetchers.Unlock()
+
// TODO(marco): add other fetchers: Kube Clusters and Kube Resources (Apps)
return nil
}
diff --git a/lib/srv/discovery/discovery_test.go b/lib/srv/discovery/discovery_test.go
index ef6c29158251e..16b2e0ea69fb6 100644
--- a/lib/srv/discovery/discovery_test.go
+++ b/lib/srv/discovery/discovery_test.go
@@ -54,12 +54,15 @@ import (
"github.com/sirupsen/logrus"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
+ "google.golang.org/grpc"
corev1 "k8s.io/api/core/v1"
v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/client-go/kubernetes/fake"
+ "github.com/gravitational/teleport/api/client/proto"
"github.com/gravitational/teleport/api/defaults"
+ integrationpb "github.com/gravitational/teleport/api/gen/proto/go/teleport/integration/v1"
usageeventsv1 "github.com/gravitational/teleport/api/gen/proto/go/usageevents/v1"
"github.com/gravitational/teleport/api/internalutils/stream"
"github.com/gravitational/teleport/api/types"
@@ -493,8 +496,9 @@ func TestDiscoveryServer(t *testing.T) {
tlsServer.Auth().SetUsageReporter(reporter)
server, err := New(authz.ContextWithUser(context.Background(), identity.I), &Config{
CloudClients: testCloudClients,
+ ClusterFeatures: func() proto.Features { return proto.Features{} },
KubernetesClient: fake.NewSimpleClientset(),
- AccessPoint: tlsServer.Auth(),
+ AccessPoint: getDiscoveryAccessPoint(tlsServer.Auth(), authClient),
Matchers: tc.staticMatchers,
Emitter: tc.emitter,
Log: logger,
@@ -690,8 +694,9 @@ func TestDiscoveryKubeServices(t *testing.T) {
ctx,
&Config{
CloudClients: &cloud.TestCloudClients{},
+ ClusterFeatures: func() proto.Features { return proto.Features{} },
KubernetesClient: fake.NewSimpleClientset(objects...),
- AccessPoint: tlsServer.Auth(),
+ AccessPoint: getDiscoveryAccessPoint(tlsServer.Auth(), authClient),
Matchers: Matchers{
Kubernetes: tt.kubernetesMatchers,
},
@@ -1009,8 +1014,9 @@ func TestDiscoveryInCloudKube(t *testing.T) {
authz.ContextWithUser(ctx, identity.I),
&Config{
CloudClients: testCloudClients,
+ ClusterFeatures: func() proto.Features { return proto.Features{} },
KubernetesClient: fake.NewSimpleClientset(),
- AccessPoint: tlsServer.Auth(),
+ AccessPoint: getDiscoveryAccessPoint(tlsServer.Auth(), authClient),
Matchers: Matchers{
AWS: tc.awsMatchers,
Azure: tc.azureMatchers,
@@ -1163,6 +1169,7 @@ func TestDiscoveryServer_New(t *testing.T) {
ctx,
&Config{
CloudClients: nil,
+ ClusterFeatures: func() proto.Features { return proto.Features{} },
AccessPoint: newFakeAccessPoint(),
Matchers: tt.matchers,
Emitter: &mockEmitter{},
@@ -1750,8 +1757,9 @@ func TestDiscoveryDatabase(t *testing.T) {
&Config{
IntegrationOnlyCredentials: integrationOnlyCredential,
CloudClients: testCloudClients,
+ ClusterFeatures: func() proto.Features { return proto.Features{} },
KubernetesClient: fake.NewSimpleClientset(),
- AccessPoint: tlsServer.Auth(),
+ AccessPoint: getDiscoveryAccessPoint(tlsServer.Auth(), authClient),
Matchers: Matchers{
AWS: tc.awsMatchers,
Azure: tc.azureMatchers,
@@ -1856,8 +1864,9 @@ func TestDiscoveryDatabaseRemovingDiscoveryConfigs(t *testing.T) {
authz.ContextWithUser(ctx, identity.I),
&Config{
CloudClients: testCloudClients,
+ ClusterFeatures: func() proto.Features { return proto.Features{} },
KubernetesClient: fake.NewSimpleClientset(),
- AccessPoint: tlsServer.Auth(),
+ AccessPoint: getDiscoveryAccessPoint(tlsServer.Auth(), authClient),
Matchers: Matchers{},
Emitter: authClient,
DiscoveryGroup: mainDiscoveryGroup,
@@ -2285,8 +2294,9 @@ func TestAzureVMDiscovery(t *testing.T) {
tlsServer.Auth().SetUsageReporter(reporter)
server, err := New(authz.ContextWithUser(context.Background(), identity.I), &Config{
CloudClients: testCloudClients,
+ ClusterFeatures: func() proto.Features { return proto.Features{} },
KubernetesClient: fake.NewSimpleClientset(),
- AccessPoint: tlsServer.Auth(),
+ AccessPoint: getDiscoveryAccessPoint(tlsServer.Auth(), authClient),
Matchers: tc.staticMatchers,
Emitter: emitter,
Log: logger,
@@ -2545,8 +2555,9 @@ func TestGCPVMDiscovery(t *testing.T) {
tlsServer.Auth().SetUsageReporter(reporter)
server, err := New(authz.ContextWithUser(context.Background(), identity.I), &Config{
CloudClients: testCloudClients,
+ ClusterFeatures: func() proto.Features { return proto.Features{} },
KubernetesClient: fake.NewSimpleClientset(),
- AccessPoint: tlsServer.Auth(),
+ AccessPoint: getDiscoveryAccessPoint(tlsServer.Auth(), authClient),
Matchers: tc.staticMatchers,
Emitter: emitter,
Log: logger,
@@ -2660,8 +2671,9 @@ func TestEmitUsageEvents(t *testing.T) {
tlsServer.Auth().SetUsageReporter(reporter)
server, err := New(authz.ContextWithUser(context.Background(), identity.I), &Config{
- CloudClients: &testClients,
- AccessPoint: tlsServer.Auth(),
+ CloudClients: &testClients,
+ ClusterFeatures: func() proto.Features { return proto.Features{} },
+ AccessPoint: getDiscoveryAccessPoint(tlsServer.Auth(), authClient),
Matchers: Matchers{
Azure: []types.AzureMatcher{{
Types: []string{"vm"},
@@ -2691,8 +2703,33 @@ func TestEmitUsageEvents(t *testing.T) {
require.Equal(t, 3, reporter.ResourceCreateEventCount())
}
+type eksClustersEnroller interface {
+ EnrollEKSClusters(context.Context, *integrationpb.EnrollEKSClustersRequest, ...grpc.CallOption) (*integrationpb.EnrollEKSClustersResponse, error)
+}
+
+type combinedDiscoveryClient struct {
+ *auth.Server
+ eksClustersEnroller
+}
+
+func (d *combinedDiscoveryClient) EnrollEKSClusters(ctx context.Context, req *integrationpb.EnrollEKSClustersRequest, _ ...grpc.CallOption) (*integrationpb.EnrollEKSClustersResponse, error) {
+ if d.eksClustersEnroller != nil {
+ return d.eksClustersEnroller.EnrollEKSClusters(ctx, req)
+ }
+ return nil, trace.BadParameter("not implemented.")
+}
+
+func getDiscoveryAccessPoint(authServer *auth.Server, authClient auth.ClientI) auth.DiscoveryAccessPoint {
+ return &combinedDiscoveryClient{Server: authServer, eksClustersEnroller: authClient.IntegrationAWSOIDCClient()}
+
+}
+
type fakeAccessPoint struct {
auth.DiscoveryAccessPoint
+
+ ping func(context.Context) (proto.PingResponse, error)
+ enrollEKSClusters func(context.Context, *integrationpb.EnrollEKSClustersRequest, ...grpc.CallOption) (*integrationpb.EnrollEKSClustersResponse, error)
+
updateKube bool
updateDatabase bool
kube types.KubeCluster
@@ -2706,6 +2743,23 @@ func newFakeAccessPoint() *fakeAccessPoint {
}
}
+func (f *fakeAccessPoint) Ping(ctx context.Context) (proto.PingResponse, error) {
+ if f.ping != nil {
+ return f.ping(ctx)
+ }
+ return proto.PingResponse{}, trace.NotImplemented("not implemented")
+}
+
+func (f *fakeAccessPoint) EnrollEKSClusters(ctx context.Context, req *integrationpb.EnrollEKSClustersRequest, _ ...grpc.CallOption) (*integrationpb.EnrollEKSClustersResponse, error) {
+ if f.enrollEKSClusters != nil {
+ return f.enrollEKSClusters(ctx, req)
+ }
+ if f.DiscoveryAccessPoint != nil {
+ return f.DiscoveryAccessPoint.EnrollEKSClusters(ctx, req)
+ }
+ return &integrationpb.EnrollEKSClustersResponse{}, trace.NotImplemented("not implemented")
+}
+
func (f *fakeAccessPoint) GetKubernetesCluster(ctx context.Context, name string) (types.KubeCluster, error) {
return f.kube, nil
}
@@ -2739,6 +2793,9 @@ func (f *fakeAccessPoint) UpsertServerInfo(ctx context.Context, si types.ServerI
}
func (f *fakeAccessPoint) NewWatcher(ctx context.Context, watch types.Watch) (types.Watcher, error) {
+ if f.DiscoveryAccessPoint != nil {
+ return f.DiscoveryAccessPoint.NewWatcher(ctx, watch)
+ }
return newFakeWatcher(), nil
}
diff --git a/lib/srv/discovery/fetchers/eks.go b/lib/srv/discovery/fetchers/eks.go
index 92cfe4051ea54..d702905a73d02 100644
--- a/lib/srv/discovery/fetchers/eks.go
+++ b/lib/srv/discovery/fetchers/eks.go
@@ -60,6 +60,13 @@ type EKSFetcherConfig struct {
// AssumeRole provides a role ARN and ExternalID to assume an AWS role
// when fetching clusters.
AssumeRole types.AssumeRole
+ // Integration is the integration name to be used to fetch credentials.
+ // When present, it will use this integration and discard any local credentials.
+ Integration string
+ // KubeAppDiscovery specifies if Kubernetes App Discovery should be enabled for the
+ // discovered cluster. We don't use this information for fetching itself, but we need it for
+ // correct enrollment of the clusters returned from this fetcher.
+ KubeAppDiscovery bool
// Region is the region where the clusters should be located.
Region string
// FilterLabels are the filter criteria.
@@ -87,6 +94,47 @@ func (c *EKSFetcherConfig) CheckAndSetDefaults() error {
return nil
}
+// MakeEKSFetchersFromAWSMatchers creates fetchers from the provided matchers. Returned fetchers are separated
+// by their reliance on the integration.
+func MakeEKSFetchersFromAWSMatchers(log logrus.FieldLogger, clients cloud.AWSClients, matchers []types.AWSMatcher) (kubeFetchers, kubeIntegrationFetchers []common.Fetcher, _ error) {
+ for _, matcher := range matchers {
+ var matcherAssumeRole types.AssumeRole
+ if matcher.AssumeRole != nil {
+ matcherAssumeRole = *matcher.AssumeRole
+ }
+
+ for _, t := range matcher.Types {
+ for _, region := range matcher.Regions {
+ switch t {
+ case types.AWSMatcherEKS:
+ fetcher, err := NewEKSFetcher(
+ EKSFetcherConfig{
+ EKSClientGetter: clients,
+ AssumeRole: matcherAssumeRole,
+ Region: region,
+ Integration: matcher.Integration,
+ KubeAppDiscovery: matcher.KubeAppDiscovery,
+ FilterLabels: matcher.Tags,
+ Log: log,
+ },
+ )
+ if err != nil {
+ log.WithError(err).Warnf("Could not initialize EKS fetcher(Region=%q, Labels=%q, AssumeRole=%q), skipping.", region, matcher.Tags, matcherAssumeRole.RoleARN)
+ continue
+ }
+
+ if matcher.Integration != "" {
+ kubeIntegrationFetchers = append(kubeIntegrationFetchers, fetcher)
+ } else {
+ kubeFetchers = append(kubeFetchers, fetcher)
+ }
+ }
+ }
+ }
+ }
+ return kubeFetchers, kubeIntegrationFetchers, nil
+}
+
// NewEKSFetcher creates a new EKS fetcher configuration.
func NewEKSFetcher(cfg EKSFetcherConfig) (common.Fetcher, error) {
if err := cfg.CheckAndSetDefaults(); err != nil {
@@ -111,7 +159,7 @@ func (a *eksFetcher) getClient(ctx context.Context) (eksiface.EKSAPI, error) {
a.AssumeRole.RoleARN,
a.AssumeRole.ExternalID,
),
- cloud.WithAmbientCredentials(),
+ cloud.WithCredentialsMaybeIntegration(a.Integration),
)
if err != nil {
return nil, trace.Wrap(err)
@@ -121,14 +169,41 @@ func (a *eksFetcher) getClient(ctx context.Context) (eksiface.EKSAPI, error) {
return a.client, nil
}
+type DiscoveredEKSCluster struct {
+ types.KubeCluster
+
+ Integration string
+ EnableKubeAppDiscovery bool
+}
+
+func (d *DiscoveredEKSCluster) GetIntegration() string {
+ return d.Integration
+}
+
+func (d *DiscoveredEKSCluster) GetKubeAppDiscovery() bool {
+ return d.EnableKubeAppDiscovery
+}
+
+func (d *DiscoveredEKSCluster) GetKubeCluster() types.KubeCluster {
+ return d.KubeCluster
+}
+
func (a *eksFetcher) Get(ctx context.Context) (types.ResourcesWithLabels, error) {
clusters, err := a.getEKSClusters(ctx)
if err != nil {
return nil, trace.Wrap(err)
}
-
a.rewriteKubeClusters(clusters)
- return clusters.AsResources(), nil
+
+ resources := make(types.ResourcesWithLabels, 0, len(clusters))
+ for _, cluster := range clusters {
+ resources = append(resources, &DiscoveredEKSCluster{
+ KubeCluster: cluster,
+ Integration: a.Integration,
+ EnableKubeAppDiscovery: a.KubeAppDiscovery,
+ })
+ }
+ return resources, nil
}
// rewriteKubeClusters rewrites the discovered kube clusters.
diff --git a/lib/srv/discovery/fetchers/eks_test.go b/lib/srv/discovery/fetchers/eks_test.go
index b28dde46f16c3..84a1a3f26f982 100644
--- a/lib/srv/discovery/fetchers/eks_test.go
+++ b/lib/srv/discovery/fetchers/eks_test.go
@@ -112,7 +112,16 @@ func TestEKSFetcher(t *testing.T) {
resources, err := fetcher.Get(context.Background())
require.NoError(t, err)
- require.Equal(t, tt.want.ToMap(), resources.ToMap())
+ clusters := types.ResourcesWithLabels{}
+ for _, r := range resources {
+ if e, ok := r.(*DiscoveredEKSCluster); ok {
+ clusters = append(clusters, e.GetKubeCluster())
+ } else {
+ clusters = append(clusters, r)
+ }
+ }
+
+ require.Equal(t, tt.want.ToMap(), clusters.ToMap())
})
}
}
diff --git a/lib/srv/discovery/kube_integration_watcher.go b/lib/srv/discovery/kube_integration_watcher.go
new file mode 100644
index 0000000000000..1d76220e0bfff
--- /dev/null
+++ b/lib/srv/discovery/kube_integration_watcher.go
@@ -0,0 +1,235 @@
+/*
+ * Teleport
+ * Copyright (C) 2024 Gravitational, Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License as published by
+ * the Free Software Foundation, either version 3 of the License, or
+ * (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Affero General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see .
+ */
+
+package discovery
+
+import (
+ "context"
+ "errors"
+ "slices"
+ "strings"
+ "sync"
+ "time"
+
+ "github.com/gravitational/trace"
+
+ integrationv1 "github.com/gravitational/teleport/api/gen/proto/go/teleport/integration/v1"
+ "github.com/gravitational/teleport/api/types"
+ "github.com/gravitational/teleport/lib/automaticupgrades"
+ "github.com/gravitational/teleport/lib/automaticupgrades/version"
+ "github.com/gravitational/teleport/lib/srv/discovery/common"
+)
+
+func (s *Server) startKubeIntegrationWatchers() error {
+ if s.dynamicMatcherWatcher == nil {
+ return nil
+ }
+
+ var mu sync.Mutex
+ // enrollingClusters keeps track of clusters that are in the process of being enrolled, so they are
+ // not yet among existing clusters, but we also should not try to enroll them again.
+ enrollingClusters := map[string]bool{}
+
+ clt := s.AccessPoint
+
+ releaseChannels := automaticupgrades.Channels{}
+ if err := releaseChannels.CheckAndSetDefaults(s.ClusterFeatures()); err != nil {
+ return trace.Wrap(err)
+ }
+
+ watcher, err := common.NewWatcher(s.ctx, common.WatcherConfig{
+ FetchersFn: s.getKubeIntegrationFetchers,
+ Log: s.Log.WithField("kind", types.KindKubernetesCluster),
+ DiscoveryGroup: s.DiscoveryGroup,
+ Interval: s.PollInterval,
+ Origin: types.OriginCloud,
+ })
+ if err != nil {
+ return trace.Wrap(err)
+ }
+ go watcher.Start()
+
+ go func() {
+ for {
+ select {
+ case resources := <-watcher.ResourcesC():
+ if len(resources) == 0 {
+ continue
+ }
+
+ existingServers, err := clt.GetKubernetesServers(s.ctx)
+ if err != nil {
+ s.Log.WithError(err).Warn("Failed to get Kubernetes servers from cache.")
+ continue
+ }
+
+ existingClusters, err := clt.GetKubernetesClusters(s.ctx)
+ if err != nil {
+ s.Log.WithError(err).Warn("Failed to get Kubernetes clusters from cache.")
+ continue
+ }
+
+ var newClusters []types.DiscoveredEKSCluster
+ mu.Lock()
+ for _, r := range resources {
+ newCluster, ok := r.(types.DiscoveredEKSCluster)
+ if !ok ||
+ enrollingClusters[newCluster.GetAWSConfig().Name] ||
+ slices.ContainsFunc(existingServers, func(c types.KubeServer) bool { return c.GetName() == newCluster.GetName() }) ||
+ slices.ContainsFunc(existingClusters, func(c types.KubeCluster) bool { return c.GetName() == newCluster.GetName() }) {
+
+ continue
+ }
+
+ newClusters = append(newClusters, newCluster)
+ }
+ mu.Unlock()
+
+ if len(newClusters) == 0 {
+ continue
+ }
+
+ agentVersion, err := s.getKubeAgentVersion(releaseChannels)
+ if err != nil {
+ s.Log.WithError(err).Warn("Could not get agent version to enroll EKS clusters")
+ continue
+ }
+
+ // When enrolling EKS clusters, client for enrollment depends on the region and integration used.
+ // When enrolling EKS clusters, client for enrollment depends on the region and integration used.
+ type regionIntegrationMapKey struct {
+ region string
+ integration string
+ }
+ clustersByRegionAndIntegration := map[regionIntegrationMapKey][]types.DiscoveredEKSCluster{}
+ for _, c := range newClusters {
+ mapKey := regionIntegrationMapKey{
+ region: c.GetAWSConfig().Region,
+ integration: c.GetIntegration(),
+ }
+ clustersByRegionAndIntegration[mapKey] = append(clustersByRegionAndIntegration[mapKey], c)
+
+ }
+
+ for key, val := range clustersByRegionAndIntegration {
+ key, val := key, val
+ go s.enrollEKSClusters(key.region, key.integration, val, agentVersion, &mu, enrollingClusters)
+ }
+
+ case <-s.ctx.Done():
+ return
+ }
+ }
+ }()
+ return nil
+}
+
+func (s *Server) enrollEKSClusters(region, integration string, clusters []types.DiscoveredEKSCluster, agentVersion string, mu *sync.Mutex, enrollingClusters map[string]bool) {
+ mu.Lock()
+ for _, c := range clusters {
+ if _, ok := enrollingClusters[c.GetAWSConfig().Name]; !ok {
+ enrollingClusters[c.GetAWSConfig().Name] = true
+ }
+ }
+ mu.Unlock()
+ defer func() {
+ // Clear enrolling clusters in the end.
+ mu.Lock()
+ for _, c := range clusters {
+ delete(enrollingClusters, c.GetAWSConfig().Name)
+ }
+ mu.Unlock()
+ }()
+
+ // We sort input clusters into two batches - one that has Kubernetes App Discovery
+ // enabled, and another one that doesn't have it.
+ var batchedClusters = map[bool][]types.DiscoveredEKSCluster{}
+ for _, c := range clusters {
+ batchedClusters[c.GetKubeAppDiscovery()] = append(batchedClusters[c.GetKubeAppDiscovery()], c)
+ }
+ ctx, cancel := context.WithTimeout(s.ctx, time.Duration(len(clusters))*30*time.Second)
+ defer cancel()
+ var clusterNames []string
+
+ for _, kubeAppDiscovery := range []bool{true, false} {
+ for _, c := range batchedClusters[kubeAppDiscovery] {
+ clusterNames = append(clusterNames, c.GetAWSConfig().Name)
+ }
+ if len(clusterNames) == 0 {
+ continue
+ }
+
+ rsp, err := s.AccessPoint.EnrollEKSClusters(ctx, &integrationv1.EnrollEKSClustersRequest{
+ Integration: integration,
+ Region: region,
+ EksClusterNames: clusterNames,
+ EnableAppDiscovery: kubeAppDiscovery,
+ AgentVersion: agentVersion,
+ })
+ if err != nil {
+ s.Log.WithError(err).Errorf("failed to enroll EKS clusters %v", clusterNames)
+ continue
+ }
+
+ for _, r := range rsp.Results {
+ if r.Error != "" {
+ if !strings.Contains(r.Error, "teleport-kube-agent is already installed on the cluster") {
+ s.Log.WithError(err).Errorf("failed to enroll EKS cluster %q", r.EksClusterName)
+ } else {
+ s.Log.Debugf("EKS cluster %q already has installed kube agent", r.EksClusterName)
+ }
+ } else {
+ s.Log.Infof("successfully enrolled EKS cluster %q", r.EksClusterName)
+ }
+ }
+ }
+}
+
+func (s *Server) getKubeAgentVersion(releaseChannels automaticupgrades.Channels) (string, error) {
+ pingResponse, err := s.AccessPoint.Ping(s.ctx)
+ if err != nil {
+ return "", trace.Wrap(err)
+ }
+ agentVersion := pingResponse.ServerVersion
+
+ clusterFeatures := s.ClusterFeatures()
+ if clusterFeatures.GetAutomaticUpgrades() {
+ defaultVersion, err := releaseChannels.DefaultVersion(s.ctx)
+ if err == nil {
+ agentVersion = defaultVersion
+ } else if !errors.Is(err, &version.NoNewVersionError{}) {
+ return "", trace.Wrap(err)
+ }
+ }
+
+ return strings.TrimPrefix(agentVersion, "v"), nil
+}
+
+func (s *Server) getKubeIntegrationFetchers() []common.Fetcher {
+ var kubeFetchers []common.Fetcher
+
+ s.muDynamicKubeIntegrationFetchers.RLock()
+ for _, fetcherSet := range s.dynamicKubeIntegrationFetchers {
+ kubeFetchers = append(kubeFetchers, fetcherSet...)
+ }
+ s.muDynamicKubeIntegrationFetchers.RUnlock()
+
+ s.submitFetchersEvent(kubeFetchers)
+
+ return kubeFetchers
+}
diff --git a/lib/srv/discovery/kube_integration_watcher_test.go b/lib/srv/discovery/kube_integration_watcher_test.go
new file mode 100644
index 0000000000000..579bf239e6001
--- /dev/null
+++ b/lib/srv/discovery/kube_integration_watcher_test.go
@@ -0,0 +1,570 @@
+/*
+ * Teleport
+ * Copyright (C) 2024 Gravitational, Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License as published by
+ * the Free Software Foundation, either version 3 of the License, or
+ * (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Affero General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see .
+ */
+
+package discovery
+
+import (
+ "context"
+ "testing"
+ "time"
+
+ "github.com/aws/aws-sdk-go-v2/aws"
+ "github.com/aws/aws-sdk-go-v2/service/eks"
+ eksTypes "github.com/aws/aws-sdk-go-v2/service/eks/types"
+ "github.com/aws/aws-sdk-go-v2/service/sts"
+ eksV1 "github.com/aws/aws-sdk-go/service/eks"
+ "github.com/google/uuid"
+ "github.com/gravitational/trace"
+ "github.com/sirupsen/logrus"
+ "github.com/stretchr/testify/assert"
+ "github.com/stretchr/testify/require"
+ "google.golang.org/grpc"
+ "k8s.io/cli-runtime/pkg/genericclioptions"
+ "k8s.io/client-go/kubernetes/fake"
+
+ "github.com/gravitational/teleport/api/client/proto"
+ integrationpb "github.com/gravitational/teleport/api/gen/proto/go/teleport/integration/v1"
+ "github.com/gravitational/teleport/api/types"
+ "github.com/gravitational/teleport/api/types/discoveryconfig"
+ "github.com/gravitational/teleport/api/types/header"
+ "github.com/gravitational/teleport/lib/auth"
+ "github.com/gravitational/teleport/lib/authz"
+ "github.com/gravitational/teleport/lib/automaticupgrades"
+ "github.com/gravitational/teleport/lib/cloud"
+ "github.com/gravitational/teleport/lib/cloud/mocks"
+ "github.com/gravitational/teleport/lib/integrations/awsoidc"
+ "github.com/gravitational/teleport/lib/services"
+ "github.com/gravitational/teleport/lib/srv/discovery/common"
+)
+
+func TestGetAgentVersion(t *testing.T) {
+ t.Parallel()
+
+ ctx := context.Background()
+
+ testCases := []struct {
+ desc string
+ ping func(ctx context.Context) (proto.PingResponse, error)
+ clusterFeatures proto.Features
+ channelVersion string
+ expectedVersion string
+ errorAssert require.ErrorAssertionFunc
+ }{
+ {
+ desc: "ping error",
+ ping: func(ctx context.Context) (proto.PingResponse, error) {
+ return proto.PingResponse{}, trace.BadParameter("ping error")
+ },
+ expectedVersion: "",
+ errorAssert: require.Error,
+ },
+ {
+ desc: "no automatic upgrades",
+ ping: func(ctx context.Context) (proto.PingResponse, error) {
+ return proto.PingResponse{ServerVersion: "1.2.3"}, nil
+ },
+ expectedVersion: "1.2.3",
+ errorAssert: require.NoError,
+ },
+ {
+ desc: "automatic upgrades",
+ ping: func(ctx context.Context) (proto.PingResponse, error) {
+ return proto.PingResponse{ServerVersion: "10"}, nil
+ },
+ clusterFeatures: proto.Features{AutomaticUpgrades: true},
+ channelVersion: "v1.2.3",
+ expectedVersion: "1.2.3",
+ errorAssert: require.NoError,
+ },
+ }
+
+ for _, tt := range testCases {
+ t.Run(tt.desc, func(t *testing.T) {
+ server := Server{
+ ctx: ctx,
+ Config: &Config{
+ AccessPoint: &fakeAccessPoint{ping: tt.ping},
+ ClusterFeatures: func() proto.Features {
+ return tt.clusterFeatures
+ },
+ },
+ }
+
+ var channel *automaticupgrades.Channel
+ if tt.channelVersion != "" {
+ channel = &automaticupgrades.Channel{StaticVersion: tt.channelVersion}
+ err := channel.CheckAndSetDefaults()
+ require.NoError(t, err)
+ }
+ releaseChannels := automaticupgrades.Channels{automaticupgrades.DefaultChannelName: channel}
+
+ version, err := server.getKubeAgentVersion(releaseChannels)
+
+ tt.errorAssert(t, err)
+ require.Equal(t, tt.expectedVersion, version)
+ })
+ }
+}
+
+func TestDiscoveryKubeIntegrationEKS(t *testing.T) {
+ const (
+ mainDiscoveryGroup = "main"
+ awsAccountID = "880713328506"
+ awsUserID = "AIDAJQABLZS4A3QDU576Q"
+ roleArn = "arn:aws:sts::880713328506:assumed-role/TeleportRole/1404549515185351000"
+ testCAData = "VGVzdENBREFUQQ=="
+ )
+
+ testEKSClusters := []eksTypes.Cluster{
+ {
+ Name: aws.String("eks-cluster1"),
+ Arn: aws.String("arn:aws:eks:eu-west-1:accountID:cluster/cluster1"),
+ Tags: map[string]string{"env": "prod", "location": "eu-west-1"},
+ CertificateAuthority: &eksTypes.Certificate{Data: aws.String(testCAData)},
+ Status: eksTypes.ClusterStatusActive,
+ },
+ {
+ Name: aws.String("eks-cluster2"),
+ Arn: aws.String("arn:aws:eks:eu-west-1:accountID:cluster/cluster2"),
+ Tags: map[string]string{"env": "prod", "location": "eu-west-1"},
+ CertificateAuthority: &eksTypes.Certificate{Data: aws.String(testCAData)},
+ Status: eksTypes.ClusterStatusActive,
+ },
+ }
+
+ getDc := func() *discoveryconfig.DiscoveryConfig {
+ dc, _ := discoveryconfig.NewDiscoveryConfig(
+ header.Metadata{Name: uuid.NewString()},
+ discoveryconfig.Spec{
+ DiscoveryGroup: mainDiscoveryGroup,
+ AWS: []types.AWSMatcher{
+ {
+ Types: []string{types.AWSMatcherEKS},
+ Regions: []string{"eu-west-1"},
+ Integration: "integration1",
+ },
+ },
+ },
+ )
+ return dc
+ }
+
+ clusterFinder := func(clusterName string) *eksTypes.Cluster {
+ for _, c := range testEKSClusters {
+ if aws.ToString(c.Name) == clusterName {
+ return &c
+ }
+ }
+ return nil
+ }
+ clusterUpserter := func(ctx context.Context, authServer *auth.Server, request *integrationpb.EnrollEKSClustersRequest) (*integrationpb.EnrollEKSClustersResponse, error) {
+ response := &integrationpb.EnrollEKSClustersResponse{}
+ for _, c := range request.EksClusterNames {
+ eksCluster := clusterFinder(c)
+ if eksCluster == nil {
+ response.Results = append(response.Results, &integrationpb.EnrollEKSClusterResult{
+ EksClusterName: c,
+ Error: "not found",
+ })
+ continue
+ }
+
+ kubeServer := mustConvertEKSToKubeServerV2(t, eksCluster, "resourceID", mainDiscoveryGroup)
+
+ _, err := authServer.UpsertKubernetesServer(ctx, kubeServer)
+ if err != nil {
+ return nil, err
+ }
+ assert.NoError(t, err)
+
+ response.Results = append(response.Results, &integrationpb.EnrollEKSClusterResult{
+ EksClusterName: c,
+ ResourceId: "resourceID",
+ })
+ }
+ return response, nil
+ }
+
+ testCases := []struct {
+ name string
+ existingKubeClusters []types.KubeCluster
+ existingKubeServers []types.KubeServer
+ awsMatchers []types.AWSMatcher
+ expectedServersToExistInAuth []types.KubeServer
+ accessPoint func(*testing.T, *auth.Server, auth.ClientI) auth.DiscoveryAccessPoint
+ discoveryConfig func(*testing.T) *discoveryconfig.DiscoveryConfig
+ }{
+ {
+ name: "no clusters in auth server, discover two clusters from EKS",
+ discoveryConfig: func(t *testing.T) *discoveryconfig.DiscoveryConfig {
+ return getDc()
+ },
+ accessPoint: func(t *testing.T, authServer *auth.Server, authClient auth.ClientI) auth.DiscoveryAccessPoint {
+ return &accessPointWrapper{
+ DiscoveryAccessPoint: getDiscoveryAccessPoint(authServer, authClient),
+ enrollEKSClusters: func(ctx context.Context, request *integrationpb.EnrollEKSClustersRequest, _ ...grpc.CallOption) (*integrationpb.EnrollEKSClustersResponse, error) {
+ response, err := clusterUpserter(ctx, authServer, request)
+ assert.NoError(t, err)
+ return response, err
+ },
+ }
+ },
+ awsMatchers: []types.AWSMatcher{
+ {
+ Types: []string{"eks"},
+ Regions: []string{"eu-west-1"},
+ Integration: "integration1",
+ },
+ },
+ expectedServersToExistInAuth: []types.KubeServer{
+ mustConvertEKSToKubeServerV1(t, eksMockClusters[0], "resourceID", mainDiscoveryGroup),
+ mustConvertEKSToKubeServerV1(t, eksMockClusters[1], "resourceID", mainDiscoveryGroup),
+ },
+ },
+ {
+ name: "one cluster in auth server, discover one cluster from EKS and ignore another one",
+ existingKubeServers: []types.KubeServer{mustConvertEKSToKubeServerV1(t, eksMockClusters[0], "resourceID", mainDiscoveryGroup)},
+ discoveryConfig: func(t *testing.T) *discoveryconfig.DiscoveryConfig {
+ return getDc()
+ },
+ accessPoint: func(t *testing.T, authServer *auth.Server, authClient auth.ClientI) auth.DiscoveryAccessPoint {
+ return &accessPointWrapper{
+ DiscoveryAccessPoint: getDiscoveryAccessPoint(authServer, authClient),
+ enrollEKSClusters: func(ctx context.Context, request *integrationpb.EnrollEKSClustersRequest, _ ...grpc.CallOption) (*integrationpb.EnrollEKSClustersResponse, error) {
+ assert.Len(t, request.EksClusterNames, 1)
+
+ response, err := clusterUpserter(ctx, authServer, request)
+ assert.NoError(t, err)
+ return response, err
+ },
+ }
+ },
+ awsMatchers: []types.AWSMatcher{
+ {
+ Types: []string{"eks"},
+ Regions: []string{"eu-west-1"},
+ Integration: "integration1",
+ },
+ },
+ expectedServersToExistInAuth: []types.KubeServer{
+ mustConvertEKSToKubeServerV1(t, eksMockClusters[0], "resourceID", mainDiscoveryGroup),
+ mustConvertEKSToKubeServerV1(t, eksMockClusters[1], "resourceID", mainDiscoveryGroup),
+ },
+ },
+ {
+ name: "one non-matching cluster in auth server, discover two cluster from EKS",
+ existingKubeServers: []types.KubeServer{mustConvertEKSToKubeServerV1(t, eksMockClusters[2], "resourceID", mainDiscoveryGroup)},
+ discoveryConfig: func(t *testing.T) *discoveryconfig.DiscoveryConfig {
+ return getDc()
+ },
+ accessPoint: func(t *testing.T, authServer *auth.Server, authClient auth.ClientI) auth.DiscoveryAccessPoint {
+ return &accessPointWrapper{
+ DiscoveryAccessPoint: getDiscoveryAccessPoint(authServer, authClient),
+ enrollEKSClusters: func(ctx context.Context, request *integrationpb.EnrollEKSClustersRequest, _ ...grpc.CallOption) (*integrationpb.EnrollEKSClustersResponse, error) {
+ assert.Len(t, request.EksClusterNames, 2)
+
+ response, err := clusterUpserter(ctx, authServer, request)
+ assert.NoError(t, err)
+ return response, err
+ },
+ }
+ },
+ awsMatchers: []types.AWSMatcher{
+ {
+ Types: []string{"eks"},
+ Regions: []string{"eu-west-1"},
+ Integration: "integration1",
+ },
+ },
+ expectedServersToExistInAuth: []types.KubeServer{
+ mustConvertEKSToKubeServerV1(t, eksMockClusters[0], "resourceID", mainDiscoveryGroup),
+ mustConvertEKSToKubeServerV1(t, eksMockClusters[1], "resourceID", mainDiscoveryGroup),
+ mustConvertEKSToKubeServerV1(t, eksMockClusters[2], "resourceID", mainDiscoveryGroup),
+ },
+ },
+ }
+
+ for _, tc := range testCases {
+ tc := tc
+ t.Run(tc.name, func(t *testing.T) {
+ t.Parallel()
+
+ testCloudClients := &cloud.TestCloudClients{
+ STS: &mocks.STSMock{},
+ EKS: &mockEKSAPI{
+ clusters: eksMockClusters[:2],
+ },
+ }
+
+ ctx := context.Background()
+ // Create and start test auth server.
+ testAuthServer, err := auth.NewTestAuthServer(auth.TestAuthServerConfig{
+ Dir: t.TempDir(),
+ })
+ require.NoError(t, err)
+ t.Cleanup(func() { require.NoError(t, testAuthServer.Close()) })
+
+ tlsServer, err := testAuthServer.NewTestTLSServer()
+ require.NoError(t, err)
+ t.Cleanup(func() { require.NoError(t, tlsServer.Close()) })
+
+ // Auth client for discovery service.
+ identity := auth.TestServerID(types.RoleDiscovery, "hostID")
+ authClient, err := tlsServer.NewClient(identity)
+ require.NoError(t, err)
+ t.Cleanup(func() { require.NoError(t, authClient.Close()) })
+
+ integration, err := types.NewIntegrationAWSOIDC(
+ types.Metadata{Name: "integration1"},
+ &types.AWSOIDCIntegrationSpecV1{
+ RoleARN: "arn:aws:iam::123456789012:role/IntegrationRole",
+ },
+ )
+ require.NoError(t, err)
+
+ testAuthServer.AuthServer.IntegrationsTokenGenerator = &mockIntegrationsTokenGenerator{
+ proxies: nil,
+ integrations: map[string]types.Integration{
+ integration.GetName(): integration,
+ },
+ }
+
+ _, err = tlsServer.Auth().CreateIntegration(ctx, integration)
+ require.NoError(t, err)
+
+ for _, kubeCluster := range tc.existingKubeClusters {
+ err := tlsServer.Auth().CreateKubernetesCluster(ctx, kubeCluster)
+ require.NoError(t, err)
+ }
+
+ for _, kubeServer := range tc.existingKubeServers {
+ _, err := tlsServer.Auth().UpsertKubernetesServer(ctx, kubeServer)
+ require.NoError(t, err)
+ }
+
+ reporter := &mockUsageReporter{}
+
+ tlsServer.Auth().SetUsageReporter(reporter)
+ discServer, err := New(
+ authz.ContextWithUser(ctx, identity.I),
+ &Config{
+ CloudClients: testCloudClients,
+ ClusterFeatures: func() proto.Features { return proto.Features{} },
+ KubernetesClient: fake.NewSimpleClientset(),
+ AccessPoint: tc.accessPoint(t, tlsServer.Auth(), authClient),
+ Matchers: Matchers{
+ AWS: tc.awsMatchers,
+ },
+ Emitter: authClient,
+ Log: logrus.New(),
+ DiscoveryGroup: mainDiscoveryGroup,
+ })
+
+ require.NoError(t, err)
+
+ if tc.discoveryConfig != nil {
+ dc := tc.discoveryConfig(t)
+ _, err := tlsServer.Auth().DiscoveryConfigClient().CreateDiscoveryConfig(ctx, dc)
+ require.NoError(t, err)
+
+ // Wait for the DiscoveryConfig to be added to the dynamic fetchers
+ require.Eventually(t, func() bool {
+ discServer.muDynamicKubeIntegrationFetchers.RLock()
+ defer discServer.muDynamicKubeIntegrationFetchers.RUnlock()
+ return len(discServer.dynamicKubeIntegrationFetchers) > 0
+ }, 1*time.Second, 100*time.Millisecond)
+ }
+
+ t.Cleanup(func() {
+ discServer.Stop()
+ })
+ go discServer.Start()
+
+ require.Eventually(t, func() bool {
+ kubeServers, err := tlsServer.Auth().GetKubernetesServers(ctx)
+ require.NoError(t, err)
+
+ if len(kubeServers) == len(tc.expectedServersToExistInAuth) {
+ k1 := types.KubeServers(kubeServers).ToMap()
+ k2 := types.KubeServers(tc.expectedServersToExistInAuth).ToMap()
+ for k := range k1 {
+ if services.CompareResources(k1[k], k2[k]) != services.Equal {
+ return false
+ }
+ }
+ return true
+ }
+
+ return false
+ }, 315*time.Second, 200*time.Millisecond)
+ })
+ }
+}
+
+func mustConvertEKSToKubeServerV1(t *testing.T, eksCluster *eksV1.Cluster, resourceID, discoveryGroup string) types.KubeServer {
+ eksCluster.Tags[types.OriginLabel] = aws.String(types.OriginCloud)
+ eksCluster.Tags[types.InternalResourceIDLabel] = aws.String(resourceID)
+
+ kubeCluster, err := services.NewKubeClusterFromAWSEKS(aws.ToString(eksCluster.Name), aws.ToString(eksCluster.Arn), eksCluster.Tags)
+ assert.NoError(t, err)
+
+ kubeClusterV3 := kubeCluster.(*types.KubernetesClusterV3)
+ common.ApplyEKSNameSuffix(kubeClusterV3)
+ kubeServer, err := types.NewKubernetesServerV3FromCluster(kubeClusterV3, "host", "uuid")
+ assert.NoError(t, err)
+
+ return kubeServer
+}
+
+func mustConvertEKSToKubeServerV2(t *testing.T, eksCluster *eksTypes.Cluster, resourceID, discoveryGroup string) types.KubeServer {
+ eksTags := make(map[string]*string, len(eksCluster.Tags))
+ for k, v := range eksCluster.Tags {
+ eksTags[k] = aws.String(v)
+ }
+ eksTags[types.OriginLabel] = aws.String(types.OriginCloud)
+ eksTags[types.InternalResourceIDLabel] = aws.String(resourceID)
+
+ kubeCluster, err := services.NewKubeClusterFromAWSEKS(aws.ToString(eksCluster.Name), aws.ToString(eksCluster.Arn), eksTags)
+ assert.NoError(t, err)
+
+ kubeClusterV3 := kubeCluster.(*types.KubernetesClusterV3)
+ common.ApplyEKSNameSuffix(kubeClusterV3)
+ kubeServer, err := types.NewKubernetesServerV3FromCluster(kubeClusterV3, "host", "uuid")
+ assert.NoError(t, err)
+
+ return kubeServer
+}
+
+type accessPointWrapper struct {
+ auth.DiscoveryAccessPoint
+
+ enrollEKSClusters func(context.Context, *integrationpb.EnrollEKSClustersRequest, ...grpc.CallOption) (*integrationpb.EnrollEKSClustersResponse, error)
+}
+
+func (a *accessPointWrapper) EnrollEKSClusters(ctx context.Context, req *integrationpb.EnrollEKSClustersRequest, _ ...grpc.CallOption) (*integrationpb.EnrollEKSClustersResponse, error) {
+ if a.enrollEKSClusters != nil {
+ return a.enrollEKSClusters(ctx, req)
+ }
+ if a.DiscoveryAccessPoint != nil {
+ return a.DiscoveryAccessPoint.EnrollEKSClusters(ctx, req)
+ }
+ return &integrationpb.EnrollEKSClustersResponse{}, trace.NotImplemented("not implemented")
+}
+
+type mockIntegrationsTokenGenerator struct {
+ proxies []types.Server
+ integrations map[string]types.Integration
+ tokenCallsCount int
+}
+
+// GetIntegration returns the specified integration resources.
+func (m *mockIntegrationsTokenGenerator) GetIntegration(ctx context.Context, name string) (types.Integration, error) {
+ if ig, found := m.integrations[name]; found {
+ return ig, nil
+ }
+
+ return nil, trace.NotFound("integration not found")
+}
+
+// GetProxies returns a list of registered proxies.
+func (m *mockIntegrationsTokenGenerator) GetProxies() ([]types.Server, error) {
+ return m.proxies, nil
+}
+
+// GenerateAWSOIDCToken generates a token to be used to execute an AWS OIDC Integration action.
+func (m *mockIntegrationsTokenGenerator) GenerateAWSOIDCToken(ctx context.Context) (string, error) {
+ m.tokenCallsCount++
+ return uuid.NewString(), nil
+}
+
+type mockEnrollEKSClusterClient struct {
+ createAccessEntry func(context.Context, *eks.CreateAccessEntryInput, ...func(*eks.Options)) (*eks.CreateAccessEntryOutput, error)
+ associateAccessPolicy func(context.Context, *eks.AssociateAccessPolicyInput, ...func(*eks.Options)) (*eks.AssociateAccessPolicyOutput, error)
+ listAccessEntries func(context.Context, *eks.ListAccessEntriesInput, ...func(*eks.Options)) (*eks.ListAccessEntriesOutput, error)
+ deleteAccessEntry func(context.Context, *eks.DeleteAccessEntryInput, ...func(*eks.Options)) (*eks.DeleteAccessEntryOutput, error)
+ describeCluster func(context.Context, *eks.DescribeClusterInput, ...func(*eks.Options)) (*eks.DescribeClusterOutput, error)
+ getCallerIdentity func(context.Context, *sts.GetCallerIdentityInput, ...func(*sts.Options)) (*sts.GetCallerIdentityOutput, error)
+ checkAgentAlreadyInstalled func(context.Context, genericclioptions.RESTClientGetter, logrus.FieldLogger) (bool, error)
+ installKubeAgent func(context.Context, *eksTypes.Cluster, string, string, string, genericclioptions.RESTClientGetter, logrus.FieldLogger, awsoidc.EnrollEKSClustersRequest) error
+ createToken func(context.Context, types.ProvisionToken) error
+}
+
+func (m *mockEnrollEKSClusterClient) CreateAccessEntry(ctx context.Context, params *eks.CreateAccessEntryInput, optFns ...func(*eks.Options)) (*eks.CreateAccessEntryOutput, error) {
+ if m.createAccessEntry != nil {
+ return m.createAccessEntry(ctx, params, optFns...)
+ }
+ return &eks.CreateAccessEntryOutput{}, nil
+}
+
+func (m *mockEnrollEKSClusterClient) AssociateAccessPolicy(ctx context.Context, params *eks.AssociateAccessPolicyInput, optFns ...func(*eks.Options)) (*eks.AssociateAccessPolicyOutput, error) {
+ if m.associateAccessPolicy != nil {
+ return m.associateAccessPolicy(ctx, params, optFns...)
+ }
+ return &eks.AssociateAccessPolicyOutput{}, nil
+}
+
+func (m *mockEnrollEKSClusterClient) ListAccessEntries(ctx context.Context, params *eks.ListAccessEntriesInput, optFns ...func(*eks.Options)) (*eks.ListAccessEntriesOutput, error) {
+ if m.listAccessEntries != nil {
+ return m.listAccessEntries(ctx, params, optFns...)
+ }
+ return &eks.ListAccessEntriesOutput{}, nil
+}
+
+func (m *mockEnrollEKSClusterClient) DeleteAccessEntry(ctx context.Context, params *eks.DeleteAccessEntryInput, optFns ...func(*eks.Options)) (*eks.DeleteAccessEntryOutput, error) {
+ if m.deleteAccessEntry != nil {
+ return m.deleteAccessEntry(ctx, params, optFns...)
+ }
+ return &eks.DeleteAccessEntryOutput{}, nil
+}
+
+func (m *mockEnrollEKSClusterClient) DescribeCluster(ctx context.Context, params *eks.DescribeClusterInput, optFns ...func(*eks.Options)) (*eks.DescribeClusterOutput, error) {
+ if m.describeCluster != nil {
+ return m.describeCluster(ctx, params, optFns...)
+ }
+ return &eks.DescribeClusterOutput{}, nil
+}
+
+func (m *mockEnrollEKSClusterClient) GetCallerIdentity(ctx context.Context, params *sts.GetCallerIdentityInput, optFns ...func(*sts.Options)) (*sts.GetCallerIdentityOutput, error) {
+ if m.getCallerIdentity != nil {
+ return m.getCallerIdentity(ctx, params, optFns...)
+ }
+ return &sts.GetCallerIdentityOutput{}, nil
+}
+
+func (m *mockEnrollEKSClusterClient) CheckAgentAlreadyInstalled(ctx context.Context, kubeconfig genericclioptions.RESTClientGetter, log logrus.FieldLogger) (bool, error) {
+ if m.checkAgentAlreadyInstalled != nil {
+ return m.checkAgentAlreadyInstalled(ctx, kubeconfig, log)
+ }
+ return false, nil
+}
+
+func (m *mockEnrollEKSClusterClient) InstallKubeAgent(ctx context.Context, eksCluster *eksTypes.Cluster, proxyAddr, joinToken, resourceId string, kubeconfig genericclioptions.RESTClientGetter, log logrus.FieldLogger, req awsoidc.EnrollEKSClustersRequest) error {
+ if m.installKubeAgent != nil {
+ return m.installKubeAgent(ctx, eksCluster, proxyAddr, joinToken, resourceId, kubeconfig, log, req)
+ }
+ return nil
+}
+
+func (m *mockEnrollEKSClusterClient) CreateToken(ctx context.Context, token types.ProvisionToken) error {
+ if m.createToken != nil {
+ return m.createToken(ctx, token)
+ }
+ return nil
+}
+
+var _ awsoidc.EnrollEKSCLusterClient = &mockEnrollEKSClusterClient{}
diff --git a/lib/srv/discovery/kube_watcher.go b/lib/srv/discovery/kube_watcher.go
index e81547831a83a..9274dc30c5e42 100644
--- a/lib/srv/discovery/kube_watcher.go
+++ b/lib/srv/discovery/kube_watcher.go
@@ -87,12 +87,14 @@ func (s *Server) startKubeWatchers() error {
case newResources := <-watcher.ResourcesC():
clusters := make([]types.KubeCluster, 0, len(newResources))
for _, r := range newResources {
- cluster, ok := r.(types.KubeCluster)
- if !ok {
+ if cluster, ok := r.(types.DiscoveredEKSCluster); ok {
+ clusters = append(clusters, cluster.GetKubeCluster())
+ continue
+ }
+ if cluster, ok := r.(types.KubeCluster); ok {
+ clusters = append(clusters, cluster)
continue
}
-
- clusters = append(clusters, cluster)
}
mu.Lock()
kubeResources = clusters