Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions lib/srv/discovery/database_watcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -97,11 +97,11 @@ func (s *Server) startDatabaseWatchers() error {
func (s *Server) getAllDatabaseFetchers() []common.Fetcher {
allFetchers := make([]common.Fetcher, 0, len(s.databaseFetchers))

s.muDynamicFetchers.RLock()
s.muDynamicDatabaseFetchers.RLock()
for _, fetcherSet := range s.dynamicDatabaseFetchers {
allFetchers = append(allFetchers, fetcherSet...)
}
s.muDynamicFetchers.RUnlock()
s.muDynamicDatabaseFetchers.RUnlock()

return append(allFetchers, s.databaseFetchers...)
}
Expand Down
279 changes: 217 additions & 62 deletions lib/srv/discovery/discovery.go
Original file line number Diff line number Diff line change
Expand Up @@ -217,8 +217,26 @@ type Server struct {

// dynamicDatabaseFetchers holds the current Database Fetchers for the Dynamic Matchers (those coming from DiscoveryConfig resource).
// The key is the DiscoveryConfig name.
dynamicDatabaseFetchers map[string][]common.Fetcher
muDynamicFetchers sync.RWMutex
dynamicDatabaseFetchers map[string][]common.Fetcher
muDynamicDatabaseFetchers sync.RWMutex

// dynamicServerAWSFetchers holds the current AWS EC2 Fetchers for the Dynamic Matchers (those coming from DiscoveryConfig resource).
// The key is the DiscoveryConfig name.
dynamicServerAWSFetchers map[string][]server.Fetcher
muDynamicServerAWSFetchers sync.RWMutex
staticServerAWSFetchers []server.Fetcher

// dynamicServerAzureFetchers holds the current Azure VM Fetchers for the Dynamic Matchers (those coming from DiscoveryConfig resource).
// The key is the DiscoveryConfig name.
dynamicServerAzureFetchers map[string][]server.Fetcher
muDynamicServerAzureFetchers sync.RWMutex
staticServerAzureFetchers []server.Fetcher

// dynamicServerGCPFetchers holds the current GCP VM Fetchers for the Dynamic Matchers (those coming from DiscoveryConfig resource).
// The key is the DiscoveryConfig name.
dynamicServerGCPFetchers map[string][]server.Fetcher
muDynamicServerGCPFetchers sync.RWMutex
staticServerGCPFetchers []server.Fetcher

// caRotationCh receives nodes that need to have their CAs rotated.
caRotationCh chan []types.Server
Expand All @@ -240,11 +258,14 @@ func New(ctx context.Context, cfg *Config) (*Server, error) {

localCtx, cancelfn := context.WithCancel(ctx)
s := &Server{
Config: cfg,
ctx: localCtx,
cancelfn: cancelfn,
usageEventCache: make(map[string]struct{}),
dynamicDatabaseFetchers: make(map[string][]common.Fetcher),
Config: cfg,
ctx: localCtx,
cancelfn: cancelfn,
usageEventCache: make(map[string]struct{}),
dynamicDatabaseFetchers: make(map[string][]common.Fetcher),
dynamicServerAWSFetchers: make(map[string][]server.Fetcher),
dynamicServerAzureFetchers: make(map[string][]server.Fetcher),
dynamicServerGCPFetchers: make(map[string][]server.Fetcher),
}

if err := s.startDynamicMatchersWatcher(ctx); err != nil {
Expand Down Expand Up @@ -316,35 +337,39 @@ func (s *Server) startDynamicMatchersWatcher(ctx context.Context) error {

// initAWSWatchers starts AWS resource watchers based on types provided.
func (s *Server) initAWSWatchers(matchers []types.AWSMatcher) error {
var err error

ec2Matchers, otherMatchers := splitMatchers(matchers, func(matcherType string) bool {
return matcherType == types.AWSMatcherEC2
})

// start ec2 watchers
var err error
if len(ec2Matchers) > 0 {
s.caRotationCh = make(chan []types.Server)
s.ec2Watcher, err = server.NewEC2Watcher(s.ctx, ec2Matchers, s.CloudClients, s.caRotationCh, server.WithPollInterval(s.PollInterval))
if err != nil {
return trace.Wrap(err)
}
s.staticServerAWSFetchers, err = server.MatchersToEC2InstanceFetchers(s.ctx, ec2Matchers, s.CloudClients)
if err != nil {
return trace.Wrap(err)
}

if s.ec2Installer == nil {
s.ec2Installer = server.NewSSMInstaller(server.SSMInstallerConfig{
Emitter: s.Emitter,
})
}
s.ec2Watcher, err = server.NewEC2Watcher(s.ctx, s.getAllAWSServerFetchers, s.caRotationCh, server.WithPollInterval(s.PollInterval))
if err != nil {
return trace.Wrap(err)
}

lr, err := newLabelReconciler(&labelReconcilerConfig{
log: s.Log,
accessPoint: s.AccessPoint,
s.caRotationCh = make(chan []types.Server)

if s.ec2Installer == nil {
s.ec2Installer = server.NewSSMInstaller(server.SSMInstallerConfig{
Emitter: s.Emitter,
})
if err != nil {
return trace.Wrap(err)
}
s.reconciler = lr
}

lr, err := newLabelReconciler(&labelReconcilerConfig{
log: s.Log,
accessPoint: s.AccessPoint,
})
if err != nil {
return trace.Wrap(err)
}
s.reconciler = lr

// Database fetchers were added in databaseFetchersFromMatchers.
_, otherMatchers = splitMatchers(otherMatchers, db.IsAWSMatcherType)

Expand Down Expand Up @@ -421,6 +446,49 @@ func (s *Server) initKubeAppWatchers(matchers []types.KubernetesMatcher) error {
return nil
}

// awsServerFetchersFromMatchers converts Matchers into a set of AWS EC2 Fetchers.
func (s *Server) awsServerFetchersFromMatchers(ctx context.Context, matchers []types.AWSMatcher) ([]server.Fetcher, error) {
serverMatchers, _ := splitMatchers(matchers, func(matcherType string) bool {
return matcherType == types.AWSMatcherEC2
})

fetchers, err := server.MatchersToEC2InstanceFetchers(ctx, serverMatchers, s.CloudClients)
if err != nil {
return nil, trace.Wrap(err)
}

return fetchers, nil
}

// azureServerFetchersFromMatchers converts Matchers into a set of Azure Servers Fetchers.
func (s *Server) azureServerFetchersFromMatchers(matchers []types.AzureMatcher) []server.Fetcher {
serverMatchers, _ := splitMatchers(matchers, func(matcherType string) bool {
return matcherType == types.AzureMatcherVM
})

return server.MatchersToAzureInstanceFetchers(serverMatchers, s.CloudClients)
}

// gcpServerFetchersFromMatchers converts Matchers into a set of GCP Servers Fetchers.
func (s *Server) gcpServerFetchersFromMatchers(ctx context.Context, matchers []types.GCPMatcher) ([]server.Fetcher, error) {
serverMatchers, _ := splitMatchers(matchers, func(matcherType string) bool {
return matcherType == types.GCPMatcherCompute
})

if len(serverMatchers) == 0 {
// We have an early exit here because GetGCPInstancesClient returns an error
// when there are no credentials in the environment.
return nil, nil
}

client, err := s.CloudClients.GetGCPInstancesClient(ctx)
if err != nil {
return nil, trace.Wrap(err)
}

return server.MatchersToGCPInstanceFetchers(serverMatchers, client), nil
}

// databaseFetchersFromMatchers converts Matchers into a set of Database Fetchers.
func (s *Server) databaseFetchersFromMatchers(matchers Matchers) ([]common.Fetcher, error) {
var fetchers []common.Fetcher
Expand Down Expand Up @@ -457,17 +525,17 @@ func (s *Server) initAzureWatchers(ctx context.Context, matchers []types.AzureMa
return matcherType == types.AzureMatcherVM
})

s.staticServerAzureFetchers = server.MatchersToAzureInstanceFetchers(vmMatchers, s.CloudClients)

// VM watcher.
if len(vmMatchers) > 0 {
var err error
s.azureWatcher, err = server.NewAzureWatcher(s.ctx, vmMatchers, s.CloudClients, server.WithPollInterval(s.PollInterval))
if err != nil {
return trace.Wrap(err)
}
if s.azureInstaller == nil {
s.azureInstaller = &server.AzureInstaller{
Emitter: s.Emitter,
}
var err error
s.azureWatcher, err = server.NewAzureWatcher(s.ctx, s.getAllAzureServerFetchers, server.WithPollInterval(s.PollInterval))
if err != nil {
return trace.Wrap(err)
}
if s.azureInstaller == nil {
s.azureInstaller = &server.AzureInstaller{
Emitter: s.Emitter,
}
}

Expand Down Expand Up @@ -506,30 +574,46 @@ func (s *Server) initAzureWatchers(ctx context.Context, matchers []types.AzureMa
return nil
}

func (s *Server) initGCPServerWatcher(ctx context.Context, vmMatchers []types.GCPMatcher) error {
var err error

s.staticServerGCPFetchers, err = s.gcpServerFetchersFromMatchers(ctx, vmMatchers)
if err != nil {
return trace.Wrap(err)
}

s.gcpWatcher, err = server.NewGCPWatcher(s.ctx, s.getAllGCPServerFetchers)
if err != nil {
return trace.Wrap(err)
}

if s.gcpInstaller == nil {
s.gcpInstaller = &server.GCPInstaller{
Emitter: s.Emitter,
}
}

return nil
}

// initGCPWatchers starts GCP resource watchers based on types provided.
func (s *Server) initGCPWatchers(ctx context.Context, matchers []types.GCPMatcher) error {
// return early if there are no matchers as GetGCPGKEClient causes
// an error if there are no credentials present
if len(matchers) == 0 {
return nil
}

vmMatchers, otherMatchers := splitMatchers(matchers, func(matcherType string) bool {
return matcherType == types.GCPMatcherCompute
})

// VM watcher.
if len(vmMatchers) > 0 {
var err error
s.gcpWatcher, err = server.NewGCPWatcher(s.ctx, vmMatchers, s.CloudClients)
if err != nil {
return trace.Wrap(err)
}
if s.gcpInstaller == nil {
s.gcpInstaller = &server.GCPInstaller{
Emitter: s.Emitter,
}
}
if err := s.initGCPServerWatcher(ctx, vmMatchers); err != nil {
return trace.Wrap(err)
}

// If there's no GCP Client creds in the environment
// and call to GetGCP...Client
// Early exit if there's no Kube Matchers, to prevent the error.
if len(otherMatchers) == 0 {
return nil
}

kubeClient, err := s.CloudClients.GetGCPGKEClient(ctx)
Expand Down Expand Up @@ -963,6 +1047,42 @@ func (s *Server) emitUsageEvents(events map[string]*usageeventsv1.ResourceCreate
return nil
}

func (s *Server) getAllAWSServerFetchers() []server.Fetcher {
allFetchers := make([]server.Fetcher, 0, len(s.staticServerAWSFetchers))

s.muDynamicServerAWSFetchers.RLock()
for _, fetcherSet := range s.dynamicServerAWSFetchers {
allFetchers = append(allFetchers, fetcherSet...)
}
s.muDynamicServerAWSFetchers.RUnlock()

return append(allFetchers, s.staticServerAWSFetchers...)
}

func (s *Server) getAllAzureServerFetchers() []server.Fetcher {
allFetchers := make([]server.Fetcher, 0, len(s.staticServerAzureFetchers))

s.muDynamicServerAzureFetchers.RLock()
for _, fetcherSet := range s.dynamicServerAzureFetchers {
allFetchers = append(allFetchers, fetcherSet...)
}
s.muDynamicServerAzureFetchers.RUnlock()

return append(allFetchers, s.staticServerAzureFetchers...)
}

func (s *Server) getAllGCPServerFetchers() []server.Fetcher {
allFetchers := make([]server.Fetcher, 0, len(s.staticServerGCPFetchers))

s.muDynamicServerGCPFetchers.RLock()
for _, fetcherSet := range s.dynamicServerGCPFetchers {
allFetchers = append(allFetchers, fetcherSet...)
}
s.muDynamicServerGCPFetchers.RUnlock()

return append(allFetchers, s.staticServerGCPFetchers...)
}

// Start starts the discovery service.
func (s *Server) Start() error {
if s.ec2Watcher != nil {
Expand Down Expand Up @@ -1032,9 +1152,7 @@ func (s *Server) startDynamicWatcherUpdater() {
// Let's assume there's a DiscoveryConfig DC1 has DiscoveryGroup DG1, which this process is monitoring.
// If the user updates the DiscoveryGroup to DG2, then DC1 must be removed from the scope of this process.
// We blindly delete it, in the worst case, this is a no-op.
s.muDynamicFetchers.Lock()
delete(s.dynamicDatabaseFetchers, event.Resource.GetName())
s.muDynamicFetchers.Unlock()
s.deleteDynamicFetchers(event.Resource.GetName())
continue
}

Expand All @@ -1044,9 +1162,7 @@ func (s *Server) startDynamicWatcherUpdater() {
}

case types.OpDelete:
s.muDynamicFetchers.Lock()
delete(s.dynamicDatabaseFetchers, event.Resource.GetName())
s.muDynamicFetchers.Unlock()
s.deleteDynamicFetchers(event.Resource.GetName())

default:
s.Log.Warnf("Skipping unknown event type %s", event.Type)
Expand All @@ -1058,6 +1174,24 @@ func (s *Server) startDynamicWatcherUpdater() {
}
}

func (s *Server) deleteDynamicFetchers(name string) {
s.muDynamicDatabaseFetchers.Lock()
delete(s.dynamicDatabaseFetchers, name)
s.muDynamicDatabaseFetchers.Unlock()

s.muDynamicServerAWSFetchers.Lock()
delete(s.dynamicServerAWSFetchers, name)
s.muDynamicServerAWSFetchers.Unlock()

s.muDynamicServerAzureFetchers.Lock()
delete(s.dynamicServerAzureFetchers, name)
s.muDynamicServerAzureFetchers.Unlock()

s.muDynamicServerGCPFetchers.Lock()
delete(s.dynamicServerGCPFetchers, name)
s.muDynamicServerGCPFetchers.Unlock()
}

// upsertDynamicMatchers upserts the internal set of dynamic matchers given a particular discovery config.
func (s *Server) upsertDynamicMatchers(dc *discoveryconfig.DiscoveryConfig) error {
matchers := Matchers{
Expand All @@ -1067,16 +1201,37 @@ func (s *Server) upsertDynamicMatchers(dc *discoveryconfig.DiscoveryConfig) erro
Kubernetes: dc.Spec.Kube,
}

databaseFetchers, err := s.databaseFetchersFromMatchers(matchers)
awsServerFetchers, err := s.awsServerFetchersFromMatchers(s.ctx, matchers.AWS)
if err != nil {
return trace.Wrap(err)
}
s.muDynamicServerAWSFetchers.Lock()
s.dynamicServerAWSFetchers[dc.GetName()] = awsServerFetchers
s.muDynamicServerAWSFetchers.Unlock()

// TODO(marco): add other matcher types (VMs, Kubes, KubeApps)
azureServerFetchers := s.azureServerFetchersFromMatchers(matchers.Azure)
s.muDynamicServerAzureFetchers.Lock()
s.dynamicServerAzureFetchers[dc.GetName()] = azureServerFetchers
s.muDynamicServerAzureFetchers.Unlock()

s.muDynamicFetchers.Lock()
gcpServerFetchers, err := s.gcpServerFetchersFromMatchers(s.ctx, matchers.GCP)
if err != nil {
return trace.Wrap(err)
}
s.muDynamicServerGCPFetchers.Lock()
s.dynamicServerGCPFetchers[dc.GetName()] = gcpServerFetchers
s.muDynamicServerGCPFetchers.Unlock()

databaseFetchers, err := s.databaseFetchersFromMatchers(matchers)
if err != nil {
return trace.Wrap(err)
}

s.muDynamicDatabaseFetchers.Lock()
s.dynamicDatabaseFetchers[dc.GetName()] = databaseFetchers
s.muDynamicFetchers.Unlock()
s.muDynamicDatabaseFetchers.Unlock()

// TODO(marco): add other fetchers: Kube Clusters and Kube Resources (Apps)
return nil
}

Expand Down
Loading