Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
24 commits
Select commit Hold shift + click to select a range
28e9aa0
AWSOIDC: Move enrollment of EKS clusters to the gRPC service.
AntonAM Feb 28, 2024
050b344
Fix after rebase.
AntonAM Feb 28, 2024
b3571c9
Add EKS auto discovery for dynamic integration based fetchers.
AntonAM Feb 27, 2024
2e32830
Remove commented code.
AntonAM Feb 27, 2024
7fb9a96
Remove unneeded code.
AntonAM Feb 27, 2024
1cdb0c5
Add missing godoc.
AntonAM Feb 27, 2024
40383d1
Initiate map with length.
AntonAM Feb 27, 2024
0c624dd
Fix typo in the interface name.
AntonAM Feb 27, 2024
0db9fd4
Introduce iam join method for EKS enrollment.
AntonAM Feb 27, 2024
34d7a7c
Correctly process DiscoveredEKSCluster in kube watcher.
AntonAM Feb 27, 2024
f7e57ea
Remove remaining CreateToken related code from Discovery service.
AntonAM Feb 27, 2024
9476f11
Change var initialization.
AntonAM Feb 27, 2024
b980fcb
Remove dev version detection for EKS enrollment.
AntonAM Feb 27, 2024
c683809
Revert "Introduce iam join method for EKS enrollment."
AntonAM Feb 27, 2024
c2b05a0
Fix usage of a missing function.
AntonAM Feb 27, 2024
38066a4
Refactor cluster features to be a function.
AntonAM Feb 27, 2024
7f33e38
Change discovery to use gRPC call for EKS enrollment.
AntonAM Feb 28, 2024
2729ad8
Remove unused mock.
AntonAM Feb 28, 2024
ae6b4f2
Correct a comment.
AntonAM Feb 28, 2024
caa8132
Remove token creation permission from Discovery service.
AntonAM Feb 28, 2024
960584e
Fix after rebase.
AntonAM Feb 28, 2024
dabe512
Improve sorting clusters by region and integration.
AntonAM Feb 28, 2024
1d1bee5
Return and error if cluster features are missing.
AntonAM Feb 28, 2024
9597513
Fix tests.
AntonAM Feb 28, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 12 additions & 0 deletions api/types/kubernetes.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,18 @@ type KubeCluster interface {
GetCloud() string
}

// DiscoveredEKSCluster represents a server discovered by EKS discovery fetchers.
type DiscoveredEKSCluster interface {
Comment thread
AntonAM marked this conversation as resolved.
Outdated
// KubeCluster is base discovered cluster.
KubeCluster
// GetKubeCluster returns base cluster.
GetKubeCluster() KubeCluster
// GetIntegration returns integration name used when discovering this cluster.
GetIntegration() string
// GetKubeAppDiscovery returns setting showing if Kubernetes App Discovery show be enabled for the discovered cluster.
GetKubeAppDiscovery() bool
}

// NewKubernetesClusterV3FromLegacyCluster creates a new Kubernetes cluster resource
// from the legacy type.
func NewKubernetesClusterV3FromLegacyCluster(namespace string, cluster *KubernetesCluster) (*KubernetesClusterV3, error) {
Expand Down
9 changes: 9 additions & 0 deletions api/types/kubernetes_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -329,6 +329,15 @@ func (s KubeServers) Less(i, j int) bool {
// Swap swaps two kube servers.
func (s KubeServers) Swap(i, j int) { s[i], s[j] = s[j], s[i] }

// ToMap returns these kubernetes clusters as a map keyed by cluster name.
func (s KubeServers) ToMap() map[string]KubeServer {
m := make(map[string]KubeServer, len(s))
for _, kubeServer := range s {
m[kubeServer.GetName()] = kubeServer
}
return m
}

// SortByCustom custom sorts by given sort criteria.
func (s KubeServers) SortByCustom(sortBy SortBy) error {
if sortBy.Field == "" {
Expand Down
20 changes: 20 additions & 0 deletions lib/auth/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,10 @@ import (
"time"

"github.com/gravitational/trace"
"google.golang.org/grpc"

"github.com/gravitational/teleport/api/client/proto"
integrationpb "github.com/gravitational/teleport/api/gen/proto/go/teleport/integration/v1"
"github.com/gravitational/teleport/api/types"
"github.com/gravitational/teleport/api/types/accesslist"
"github.com/gravitational/teleport/api/types/discoveryconfig"
Expand Down Expand Up @@ -697,6 +699,8 @@ type ReadDiscoveryAccessPoint interface {
GetKubernetesCluster(ctx context.Context, name string) (types.KubeCluster, error)
// GetKubernetesClusters returns all kubernetes cluster resources.
GetKubernetesClusters(ctx context.Context) ([]types.KubeCluster, error)
// GetKubernetesServers returns all registered kubernetes servers.
GetKubernetesServers(ctx context.Context) ([]types.KubeServer, error)

// GetDatabases returns all database resources.
GetDatabases(ctx context.Context) ([]types.Database, error)
Expand Down Expand Up @@ -755,6 +759,12 @@ type DiscoveryAccessPoint interface {

// GenerateAWSOIDCToken generates a token to be used to execute an AWS OIDC Integration action.
GenerateAWSOIDCToken(ctx context.Context) (string, error)

// EnrollEKSClusters enrolls EKS clusters into Teleport by installing teleport-kube-agent chart on the clusters.
EnrollEKSClusters(context.Context, *integrationpb.EnrollEKSClustersRequest, ...grpc.CallOption) (*integrationpb.EnrollEKSClustersResponse, error)

// Ping gets basic info about the auth server.
Ping(context.Context) (proto.PingResponse, error)
}

// ReadOktaAccessPoint is a read only API interface to be
Expand Down Expand Up @@ -1319,6 +1329,16 @@ func (w *DiscoveryWrapper) GenerateAWSOIDCToken(ctx context.Context) (string, er
return w.NoCache.GenerateAWSOIDCToken(ctx)
}

// EnrollEKSClusters enrolls EKS clusters into Teleport by installing teleport-kube-agent chart on the clusters.
func (w *DiscoveryWrapper) EnrollEKSClusters(ctx context.Context, req *integrationpb.EnrollEKSClustersRequest, _ ...grpc.CallOption) (*integrationpb.EnrollEKSClustersResponse, error) {
return w.NoCache.EnrollEKSClusters(ctx, req)
}

// Ping gets basic info about the auth server.
func (w *DiscoveryWrapper) Ping(ctx context.Context) (proto.PingResponse, error) {
return w.NoCache.Ping(ctx)
}

// Close closes all associated resources
func (w *DiscoveryWrapper) Close() error {
err := w.NoCache.Close()
Expand Down
1 change: 1 addition & 0 deletions lib/authz/permissions.go
Original file line number Diff line number Diff line change
Expand Up @@ -1111,6 +1111,7 @@ func definitionForBuiltinRole(clusterName string, recConfig types.SessionRecordi
types.NewRule(types.KindClusterName, services.RO()),
types.NewRule(types.KindNamespace, services.RO()),
types.NewRule(types.KindNode, services.RO()),
types.NewRule(types.KindKubeServer, services.RO()),
types.NewRule(types.KindKubernetesCluster, services.RW()),
types.NewRule(types.KindDatabase, services.RW()),
types.NewRule(types.KindServerInfo, services.RW()),
Expand Down
1 change: 1 addition & 0 deletions lib/cache/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -418,6 +418,7 @@ func ForDiscovery(cfg Config) Config {
{Kind: types.KindNamespace, Name: apidefaults.Namespace},
{Kind: types.KindNode},
{Kind: types.KindKubernetesCluster},
{Kind: types.KindKubeServer},
{Kind: types.KindDatabase},
{Kind: types.KindApp},
{Kind: types.KindDiscoveryConfig},
Expand Down
4 changes: 1 addition & 3 deletions lib/integrations/awsoidc/eks_enroll_clusters.go
Original file line number Diff line number Diff line change
Expand Up @@ -550,9 +550,6 @@ func installKubeAgent(ctx context.Context, cfg installKubeAgentParams) error {
installCmd := action.NewInstall(cfg.actionConfig)
installCmd.RepoURL = agentRepoURL.String()
installCmd.Version = cfg.req.AgentVersion
if strings.Contains(installCmd.Version, "dev") {
installCmd.Version = "" // For testing during development.
}

agentChart, err := getChartData(installCmd.Version)
if err != nil {
Expand Down Expand Up @@ -587,6 +584,7 @@ func installKubeAgent(ctx context.Context, cfg installKubeAgentParams) error {
for k, v := range cfg.eksCluster.Tags {
eksTags[k] = aws.String(v)
}
eksTags[types.OriginLabel] = aws.String(types.OriginCloud)
kubeCluster, err := services.NewKubeClusterFromAWSEKS(aws.ToString(cfg.eksCluster.Name), aws.ToString(cfg.eksCluster.Arn), eksTags)
if err != nil {
return trace.Wrap(err)
Expand Down
1 change: 1 addition & 0 deletions lib/service/discovery.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,7 @@ func (process *TeleportProcess) initDiscoveryService() error {
AccessPoint: accessPoint,
Log: process.log,
ClusterName: conn.ClientIdentity.ClusterName,
ClusterFeatures: process.getClusterFeatures,
PollInterval: process.Config.Discovery.PollInterval,
ServerCredentials: tlsConfig,
AccessGraphConfig: process.Config.AccessGraph,
Expand Down
7 changes: 7 additions & 0 deletions lib/service/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@ import (
"github.com/gravitational/teleport/api/client/webclient"
"github.com/gravitational/teleport/api/constants"
apidefaults "github.com/gravitational/teleport/api/defaults"
integrationpb "github.com/gravitational/teleport/api/gen/proto/go/teleport/integration/v1"
kubeproto "github.com/gravitational/teleport/api/gen/proto/go/teleport/kube/v1"
transportpb "github.com/gravitational/teleport/api/gen/proto/go/teleport/transport/v1"
"github.com/gravitational/teleport/api/types"
Expand Down Expand Up @@ -2306,17 +2307,23 @@ func (process *TeleportProcess) newLocalCacheForDatabase(clt auth.ClientI, cache
return auth.NewDatabaseWrapper(clt, cache), nil
}

type eksClustersEnroller interface {
EnrollEKSClusters(context.Context, *integrationpb.EnrollEKSClustersRequest, ...grpc.CallOption) (*integrationpb.EnrollEKSClustersResponse, error)
}

// combinedDiscoveryClient is an auth.Client client with other, specific, services added to it.
type combinedDiscoveryClient struct {
auth.ClientI
services.DiscoveryConfigsGetter
eksClustersEnroller
}

// newLocalCacheForDiscovery returns a new instance of access point for a discovery service.
func (process *TeleportProcess) newLocalCacheForDiscovery(clt auth.ClientI, cacheName []string) (auth.DiscoveryAccessPoint, error) {
client := combinedDiscoveryClient{
ClientI: clt,
DiscoveryConfigsGetter: clt.DiscoveryConfigClient(),
eksClustersEnroller: clt.IntegrationAWSOIDCClient(),
}

// if caching is disabled, return access point
Expand Down
4 changes: 4 additions & 0 deletions lib/services/kubernetes.go
Original file line number Diff line number Diff line change
Expand Up @@ -124,6 +124,10 @@ func MarshalKubeCluster(kubeCluster types.KubeCluster, opts ...MarshalOption) ([
return nil, trace.Wrap(err)
}

if c, ok := kubeCluster.(types.DiscoveredEKSCluster); ok {
kubeCluster = c.GetKubeCluster()
}
Comment thread
AntonAM marked this conversation as resolved.
Outdated

switch cluster := kubeCluster.(type) {
case *types.KubernetesClusterV3:
if err := cluster.CheckAndSetDefaults(); err != nil {
Expand Down
109 changes: 66 additions & 43 deletions lib/srv/discovery/discovery.go
Original file line number Diff line number Diff line change
Expand Up @@ -140,6 +140,10 @@ type Config struct {
// AccessGraphConfig is the configuration for the Access Graph client
AccessGraphConfig servicecfg.AccessGraphConfig

// ClusterFeatures returns flags for supported/unsupported features.
// Used as a function because cluster features might change on Auth restarts.
ClusterFeatures func() proto.Features

// TriggerFetchC is a list of channels that must be notified when a off-band poll must be performed.
// This is used to start a polling iteration when a new DiscoveryConfig change is received.
TriggerFetchC []chan struct{}
Expand Down Expand Up @@ -209,6 +213,10 @@ kubernetes matchers are present.`)
c.clock = clockwork.NewRealClock()
}

if c.ClusterFeatures == nil {
return trace.BadParameter("cluster features are required")
}

c.Log = c.Log.WithField(trace.Component, teleport.ComponentDiscovery)

if c.DiscoveryGroup == "" {
Expand Down Expand Up @@ -245,7 +253,7 @@ type Server struct {
// gcpInstaller is used to start the installation process on discovered GCP
// virtual machines
gcpInstaller gcpInstaller
// kubeFetchers holds all kubernetes fetchers for Azure and other clouds.
// kubeFetchers holds all non-integration based kubernetes fetchers for Azure and other clouds.
kubeFetchers []common.Fetcher
// kubeAppsFetchers holds all kubernetes fetchers for apps.
kubeAppsFetchers []common.Fetcher
Expand Down Expand Up @@ -285,6 +293,12 @@ type Server struct {
muDynamicTAGSyncFetchers sync.RWMutex
staticTAGSyncFetchers []aws_sync.AWSSync

// dynamicKubeIntegrationFetchers holds the current kube fetchers that use integration as a source of credentials,
// for the Dynamic Matchers (those coming from DiscoveryConfig resource).
// The key is the DiscoveryConfig name.
dynamicKubeIntegrationFetchers map[string][]common.Fetcher
muDynamicKubeIntegrationFetchers sync.RWMutex

// caRotationCh receives nodes that need to have their CAs rotated.
caRotationCh chan []types.Server
// reconciler periodically reconciles the labels of discovered instances
Expand All @@ -305,15 +319,16 @@ func New(ctx context.Context, cfg *Config) (*Server, error) {

localCtx, cancelfn := context.WithCancel(ctx)
s := &Server{
Config: cfg,
ctx: localCtx,
cancelfn: cancelfn,
usageEventCache: make(map[string]struct{}),
dynamicDatabaseFetchers: make(map[string][]common.Fetcher),
dynamicServerAWSFetchers: make(map[string][]server.Fetcher),
dynamicServerAzureFetchers: make(map[string][]server.Fetcher),
dynamicServerGCPFetchers: make(map[string][]server.Fetcher),
dynamicTAGSyncFetchers: make(map[string][]aws_sync.AWSSync),
Config: cfg,
ctx: localCtx,
cancelfn: cancelfn,
usageEventCache: make(map[string]struct{}),
dynamicKubeIntegrationFetchers: make(map[string][]common.Fetcher),
dynamicDatabaseFetchers: make(map[string][]common.Fetcher),
dynamicServerAWSFetchers: make(map[string][]server.Fetcher),
dynamicServerAzureFetchers: make(map[string][]server.Fetcher),
dynamicServerGCPFetchers: make(map[string][]server.Fetcher),
dynamicTAGSyncFetchers: make(map[string][]aws_sync.AWSSync),
}
s.discardUnsupportedMatchers(&s.Matchers)

Expand Down Expand Up @@ -430,44 +445,16 @@ func (s *Server) initAWSWatchers(matchers []types.AWSMatcher) error {
// Database fetchers were added in databaseFetchersFromMatchers.
_, otherMatchers = splitMatchers(otherMatchers, db.IsAWSMatcherType)

// Add kube fetchers.
for _, matcher := range otherMatchers {
matcherAssumeRole := types.AssumeRole{}
if matcher.AssumeRole != nil {
matcherAssumeRole = *matcher.AssumeRole
}

for _, t := range matcher.Types {
for _, region := range matcher.Regions {
switch t {
case types.AWSMatcherEKS:
fetcher, err := s.getEKSFetcher(region, matcherAssumeRole, matcher.Tags)
if err != nil {
s.Log.WithError(err).Warnf("Could not initialize EKS fetcher(Region=%q, Labels=%q, AssumeRole=%q), skipping.", region, matcher.Tags, matcherAssumeRole.RoleARN)
continue
}
s.kubeFetchers = append(s.kubeFetchers, fetcher)
}
}
}
// Add non-integration kube fetchers.
kubeFetchers, _, err := fetchers.MakeEKSFetchersFromAWSMatchers(s.Log, s.CloudClients, otherMatchers)
if err != nil {
return trace.Wrap(err)
}
s.kubeFetchers = append(s.kubeFetchers, kubeFetchers...)

return nil
}

func (s *Server) getEKSFetcher(region string, assumeRole types.AssumeRole, tags types.Labels) (common.Fetcher, error) {
fetcher, err := fetchers.NewEKSFetcher(
fetchers.EKSFetcherConfig{
EKSClientGetter: s.CloudClients,
AssumeRole: assumeRole,
Region: region,
FilterLabels: tags,
Log: s.Log,
},
)
return fetcher, trace.Wrap(err)
}

func (s *Server) initKubeAppWatchers(matchers []types.KubernetesMatcher) error {
if len(matchers) == 0 {
return nil
Expand Down Expand Up @@ -572,6 +559,26 @@ func (s *Server) databaseFetchersFromMatchers(matchers Matchers) ([]common.Fetch
return fetchers, nil
}

func (s *Server) kubeIntegrationFetchersFromMatchers(matchers Matchers) ([]common.Fetcher, error) {
var result []common.Fetcher

// AWS
awsKubeMatchers, _ := splitMatchers(matchers.AWS, func(matcherType string) bool {
return matcherType == types.AWSMatcherEKS
})
if len(awsKubeMatchers) > 0 {
_, kubeIntegrationFetchers, err := fetchers.MakeEKSFetchersFromAWSMatchers(s.Log, s.CloudClients, awsKubeMatchers)
if err != nil {
return nil, trace.Wrap(err)
}
result = append(result, kubeIntegrationFetchers...)
}

// There can't be kube integration fetchers for other matcher types.

return result, nil
}

// initAzureWatchers starts Azure resource watchers based on types provided.
func (s *Server) initAzureWatchers(ctx context.Context, matchers []types.AzureMatcher) error {
vmMatchers, otherMatchers := splitMatchers(matchers, func(matcherType string) bool {
Expand Down Expand Up @@ -1214,6 +1221,9 @@ func (s *Server) Start() error {
if err := s.startKubeWatchers(); err != nil {
return trace.Wrap(err)
}
if err := s.startKubeIntegrationWatchers(); err != nil {
return trace.Wrap(err)
}
if err := s.startKubeAppsWatchers(); err != nil {
return trace.Wrap(err)
}
Expand Down Expand Up @@ -1336,6 +1346,10 @@ func (s *Server) deleteDynamicFetchers(name string) {
s.muDynamicTAGSyncFetchers.Lock()
delete(s.dynamicTAGSyncFetchers, name)
s.muDynamicTAGSyncFetchers.Unlock()

s.muDynamicKubeIntegrationFetchers.Lock()
delete(s.dynamicKubeIntegrationFetchers, name)
s.muDynamicKubeIntegrationFetchers.Unlock()
}

// upsertDynamicMatchers upserts the internal set of dynamic matchers given a particular discovery config.
Expand Down Expand Up @@ -1389,6 +1403,15 @@ func (s *Server) upsertDynamicMatchers(ctx context.Context, dc *discoveryconfig.
s.dynamicTAGSyncFetchers[dc.GetName()] = awsSyncMatchers
s.muDynamicTAGSyncFetchers.Unlock()

kubeIntegrationFetchers, err := s.kubeIntegrationFetchersFromMatchers(matchers)
if err != nil {
return trace.Wrap(err)
}

s.muDynamicKubeIntegrationFetchers.Lock()
s.dynamicKubeIntegrationFetchers[dc.GetName()] = kubeIntegrationFetchers
s.muDynamicKubeIntegrationFetchers.Unlock()

// TODO(marco): add other fetchers: Kube Clusters and Kube Resources (Apps)
return nil
}
Expand Down
Loading