diff --git a/lib/srv/discovery/access_graph_aws.go b/lib/srv/discovery/access_graph_aws.go index fa662da21dada..ab5b185450174 100644 --- a/lib/srv/discovery/access_graph_aws.go +++ b/lib/srv/discovery/access_graph_aws.go @@ -82,8 +82,10 @@ func (s *Server) reconcileAccessGraph(ctx context.Context, currentTAGResources * return trace.Wrap(errNoAccessGraphFetchers) } - s.awsSyncStatus.iterationStarted(allFetchers, s.clock.Now()) - for _, discoveryConfigName := range s.awsSyncStatus.discoveryConfigs() { + for _, fetcher := range allFetchers { + s.tagSyncStatus.syncStarted(fetcher, s.clock.Now()) + } + for _, discoveryConfigName := range s.tagSyncStatus.discoveryConfigs() { s.updateDiscoveryConfigStatus(discoveryConfigName) } @@ -92,7 +94,6 @@ func (s *Server) reconcileAccessGraph(ctx context.Context, currentTAGResources * tokens := make(chan struct{}, 3) accountIds := map[string]struct{}{} for _, fetcher := range allFetchers { - fetcher := fetcher accountIds[fetcher.GetAccountID()] = struct{}{} tokens <- struct{}{} go func() { @@ -127,8 +128,10 @@ func (s *Server) reconcileAccessGraph(ctx context.Context, currentTAGResources * upsert, toDel := aws_sync.ReconcileResults(currentTAGResources, result) pushErr := push(stream, upsert, toDel) - s.awsSyncStatus.iterationFinished(allFetchers, pushErr, s.clock.Now()) - for _, discoveryConfigName := range s.awsSyncStatus.discoveryConfigs() { + for _, fetcher := range allFetchers { + s.tagSyncStatus.syncFinished(fetcher, pushErr, s.clock.Now()) + } + for _, discoveryConfigName := range s.tagSyncStatus.discoveryConfigs() { s.updateDiscoveryConfigStatus(discoveryConfigName) } @@ -153,8 +156,8 @@ func (s *Server) reconcileAccessGraph(ctx context.Context, currentTAGResources * } // getAllAWSSyncFetchers returns all AWS sync fetchers. -func (s *Server) getAllAWSSyncFetchers() []aws_sync.AWSSync { - allFetchers := make([]aws_sync.AWSSync, 0, len(s.dynamicTAGAWSFetchers)) +func (s *Server) getAllAWSSyncFetchers() []*aws_sync.Fetcher { + allFetchers := make([]*aws_sync.Fetcher, 0, len(s.dynamicTAGAWSFetchers)) s.muDynamicTAGAWSFetchers.RLock() for _, fetcherSet := range s.dynamicTAGAWSFetchers { @@ -483,8 +486,8 @@ func (s *Server) initTAGAWSWatchers(ctx context.Context, cfg *Config) error { } // accessGraphAWSFetchersFromMatchers converts Matchers into a set of AWS Sync Fetchers. -func (s *Server) accessGraphAWSFetchersFromMatchers(ctx context.Context, matchers Matchers, discoveryConfigName string) ([]aws_sync.AWSSync, error) { - var fetchers []aws_sync.AWSSync +func (s *Server) accessGraphAWSFetchersFromMatchers(ctx context.Context, matchers Matchers, discoveryConfigName string) ([]*aws_sync.Fetcher, error) { + var fetchers []*aws_sync.Fetcher var errs []error if matchers.AccessGraph == nil { return fetchers, nil @@ -498,7 +501,7 @@ func (s *Server) accessGraphAWSFetchersFromMatchers(ctx context.Context, matcher ExternalID: awsFetcher.AssumeRole.ExternalID, } } - fetcher, err := aws_sync.NewAWSFetcher( + fetcher, err := aws_sync.NewFetcher( ctx, aws_sync.Config{ AWSConfigProvider: s.AWSConfigProvider, diff --git a/lib/srv/discovery/access_graph_azure.go b/lib/srv/discovery/access_graph_azure.go index 98192983617e7..166c126103284 100644 --- a/lib/srv/discovery/access_graph_azure.go +++ b/lib/srv/discovery/access_graph_azure.go @@ -64,6 +64,13 @@ func (s *Server) reconcileAccessGraphAzure( return trace.Wrap(errNoAccessGraphFetchers) } + for _, fetcher := range allFetchers { + s.tagSyncStatus.syncStarted(fetcher, s.clock.Now()) + } + for _, discoveryConfigName := range s.tagSyncStatus.discoveryConfigs() { + s.updateDiscoveryConfigStatus(discoveryConfigName) + } + // Fetch results concurrently resultsC := make(chan fetcherResult, len(allFetchers)) // Restricts concurrently running fetchers to 3 @@ -107,6 +114,13 @@ func (s *Server) reconcileAccessGraphAzure( upsert, toDel := azuresync.ReconcileResults(currentTAGResources, result) pushErr := azurePush(stream, upsert, toDel) + for _, fetcher := range allFetchers { + s.tagSyncStatus.syncFinished(fetcher, pushErr, s.clock.Now()) + } + for _, discoveryConfigName := range s.tagSyncStatus.discoveryConfigs() { + s.updateDiscoveryConfigStatus(discoveryConfigName) + } + if pushErr != nil { s.Log.ErrorContext(ctx, "Error pushing TAGs", "error", pushErr) return nil diff --git a/lib/srv/discovery/access_graph_test.go b/lib/srv/discovery/access_graph_test.go index b02c32348c6af..c6efe1064cbfc 100644 --- a/lib/srv/discovery/access_graph_test.go +++ b/lib/srv/discovery/access_graph_test.go @@ -36,7 +36,7 @@ func TestServer_updateDiscoveryConfigStatus(t *testing.T) { testErr := "test error" clock := clockwork.NewFakeClock() type args struct { - fetchers []aws_sync.AWSSync + fetchers []*fakeFetcher pushErr error preRun bool } @@ -48,8 +48,8 @@ func TestServer_updateDiscoveryConfigStatus(t *testing.T) { { name: "test updateDiscoveryConfigStatus", args: args{ - fetchers: []aws_sync.AWSSync{ - &fakeFetcher{ + fetchers: []*fakeFetcher{ + { count: 1, discoveryConfigName: "test", }, @@ -71,8 +71,8 @@ func TestServer_updateDiscoveryConfigStatus(t *testing.T) { { name: "test updateDiscoveryConfigStatus with pushError", args: args{ - fetchers: []aws_sync.AWSSync{ - &fakeFetcher{ + fetchers: []*fakeFetcher{ + { count: 1, discoveryConfigName: "test", }, @@ -94,8 +94,8 @@ func TestServer_updateDiscoveryConfigStatus(t *testing.T) { { name: "test updateDiscoveryConfigStatus with error", args: args{ - fetchers: []aws_sync.AWSSync{ - &fakeFetcher{ + fetchers: []*fakeFetcher{ + { count: 1, discoveryConfigName: "test", err: errors.New(testErr), @@ -117,8 +117,8 @@ func TestServer_updateDiscoveryConfigStatus(t *testing.T) { { name: "discar reports for non-discovery config results", args: args{ - fetchers: []aws_sync.AWSSync{ - &fakeFetcher{ + fetchers: []*fakeFetcher{ + { count: 1, }, }, @@ -128,8 +128,8 @@ func TestServer_updateDiscoveryConfigStatus(t *testing.T) { { name: "test updateDiscoveryConfigStatus pre-run", args: args{ - fetchers: []aws_sync.AWSSync{ - &fakeFetcher{ + fetchers: []*fakeFetcher{ + { discoveryConfigName: "test", }, }, @@ -150,16 +150,16 @@ func TestServer_updateDiscoveryConfigStatus(t *testing.T) { { name: "test multiple aws sync fetchers", args: args{ - fetchers: []aws_sync.AWSSync{ - &fakeFetcher{ + fetchers: []*fakeFetcher{ + { discoveryConfigName: "test1", count: 1, }, - &fakeFetcher{ + { discoveryConfigName: "test1", count: 1, }, - &fakeFetcher{ + { discoveryConfigName: "test2", count: 1, }, @@ -189,7 +189,7 @@ func TestServer_updateDiscoveryConfigStatus(t *testing.T) { { name: "merge two errors", args: args{ - fetchers: []aws_sync.AWSSync{ + fetchers: []*fakeFetcher{ &fakeFetcher{ discoveryConfigName: "test1", err: fmt.Errorf("error in fetcher 1"), @@ -214,12 +214,12 @@ func TestServer_updateDiscoveryConfigStatus(t *testing.T) { { name: "reports error if at least one fetcher fails", args: args{ - fetchers: []aws_sync.AWSSync{ - &fakeFetcher{ + fetchers: []*fakeFetcher{ + { discoveryConfigName: "test1", err: fmt.Errorf("error in fetcher 1"), }, - &fakeFetcher{ + { discoveryConfigName: "test1", count: 2, }, @@ -247,16 +247,20 @@ func TestServer_updateDiscoveryConfigStatus(t *testing.T) { AccessPoint: accessPoint, clock: clock, }, - awsSyncStatus: awsSyncStatus{}, + tagSyncStatus: newTagSyncStatus(), } if tt.args.preRun { - s.awsSyncStatus.iterationStarted(tt.args.fetchers, s.clock.Now()) + for _, fetcher := range tt.args.fetchers { + s.tagSyncStatus.syncStarted(fetcher, s.clock.Now()) + } } else { - s.awsSyncStatus.iterationFinished(tt.args.fetchers, tt.args.pushErr, s.clock.Now()) + for _, fetcher := range tt.args.fetchers { + s.tagSyncStatus.syncFinished(fetcher, tt.args.pushErr, s.clock.Now()) + } } - for _, discoveryConfigName := range s.awsSyncStatus.discoveryConfigs() { + for _, discoveryConfigName := range s.tagSyncStatus.discoveryConfigs() { s.updateDiscoveryConfigStatus(discoveryConfigName) } @@ -270,7 +274,7 @@ func stringPointer(s string) *string { } type fakeFetcher struct { - aws_sync.AWSSync + aws_sync.Fetcher err error count uint64 discoveryConfigName string diff --git a/lib/srv/discovery/discovery.go b/lib/srv/discovery/discovery.go index 4dd5eb77f59a3..0f2617fb3f140 100644 --- a/lib/srv/discovery/discovery.go +++ b/lib/srv/discovery/discovery.go @@ -422,9 +422,9 @@ type Server struct { // dynamicTAGAWSFetchers holds the current TAG Fetchers for the Dynamic Matchers (those coming from DiscoveryConfig resource). // The key is the DiscoveryConfig name. - dynamicTAGAWSFetchers map[string][]aws_sync.AWSSync + dynamicTAGAWSFetchers map[string][]*aws_sync.Fetcher muDynamicTAGAWSFetchers sync.RWMutex - staticTAGAWSFetchers []aws_sync.AWSSync + staticTAGAWSFetchers []*aws_sync.Fetcher // dynamicTAGAzureFetchers holds the current TAG Fetchers for the Dynamic Matchers (those coming from DiscoveryConfig resource). // The key is the DiscoveryConfig name. @@ -440,7 +440,7 @@ type Server struct { dynamicDiscoveryConfig map[string]*discoveryconfig.DiscoveryConfig - awsSyncStatus awsSyncStatus + tagSyncStatus *tagSyncStatus awsEC2ResourcesStatus awsResourcesStatus awsRDSResourcesStatus awsResourcesStatus awsEKSResourcesStatus awsResourcesStatus @@ -477,10 +477,10 @@ func New(ctx context.Context, cfg *Config) (*Server, error) { dynamicServerAWSFetchers: make(map[string][]server.Fetcher), dynamicServerAzureFetchers: make(map[string][]server.Fetcher), dynamicServerGCPFetchers: make(map[string][]server.Fetcher), - dynamicTAGAWSFetchers: make(map[string][]aws_sync.AWSSync), + dynamicTAGAWSFetchers: make(map[string][]*aws_sync.Fetcher), dynamicTAGAzureFetchers: make(map[string][]*azure_sync.Fetcher), dynamicDiscoveryConfig: make(map[string]*discoveryconfig.DiscoveryConfig), - awsSyncStatus: awsSyncStatus{}, + tagSyncStatus: newTagSyncStatus(), awsEC2ResourcesStatus: newAWSResourceStatusCollector(types.AWSMatcherEC2), awsRDSResourcesStatus: newAWSResourceStatusCollector(types.AWSMatcherRDS), awsEKSResourcesStatus: newAWSResourceStatusCollector(types.AWSMatcherEKS), diff --git a/lib/srv/discovery/fetchers/aws-sync/aws-sync.go b/lib/srv/discovery/fetchers/aws-sync/aws-sync.go index f3f1056352ae1..eecc06b0ef218 100644 --- a/lib/srv/discovery/fetchers/aws-sync/aws-sync.go +++ b/lib/srv/discovery/fetchers/aws-sync/aws-sync.go @@ -138,28 +138,14 @@ type AssumeRole struct { ExternalID string } -// awsFetcher is a fetcher that fetches AWS resources. -type awsFetcher struct { +// Fetcher is a fetcher that fetches AWS resources. +type Fetcher struct { Config lastError error lastDiscoveredResources uint64 lastResult *Resources } -// AWSSync is the interface for fetching AWS resources. -type AWSSync interface { - // Poll polls all AWS resources and returns the result. - Poll(context.Context, Features) (*Resources, error) - // Status reports the last known status of the fetcher. - Status() (uint64, error) - // DiscoveryConfigName returns the name of the Discovery Config. - DiscoveryConfigName() string - // IsFromDiscoveryConfig returns true if the fetcher is associated with a Discovery Config. - IsFromDiscoveryConfig() bool - // GetAccountID returns the AWS account ID. - GetAccountID() string -} - // Resources is a collection of polled AWS resources. type Resources struct { // Users is the list of AWS users. @@ -249,12 +235,12 @@ func (r *Resources) UsageReport(numberAccounts int) *usageeventsv1.AccessGraphAW } } -// NewAWSFetcher creates a new AWS fetcher. -func NewAWSFetcher(ctx context.Context, cfg Config) (AWSSync, error) { +// NewFetcher creates a new AWS fetcher. +func NewFetcher(ctx context.Context, cfg Config) (*Fetcher, error) { if err := cfg.CheckAndSetDefaults(); err != nil { return nil, trace.Wrap(err) } - a := &awsFetcher{ + a := &Fetcher{ Config: cfg, lastResult: &Resources{}, } @@ -270,14 +256,14 @@ func NewAWSFetcher(ctx context.Context, cfg Config) (AWSSync, error) { // Poll is a blocking call and will return when all resources have been fetched. // It's possible that the call returns Resources and an error at the same time // if some resources were fetched successfully and some were not. -func (a *awsFetcher) Poll(ctx context.Context, features Features) (*Resources, error) { +func (a *Fetcher) Poll(ctx context.Context, features Features) (*Resources, error) { result, err := a.poll(ctx, features) deduplicateResources(result) a.storeReport(result, err) return result, trace.Wrap(err) } -func (a *awsFetcher) storeReport(rec *Resources, err error) { +func (a *Fetcher) storeReport(rec *Resources, err error) { a.lastError = err if rec == nil { return @@ -286,11 +272,11 @@ func (a *awsFetcher) storeReport(rec *Resources, err error) { a.lastDiscoveredResources = uint64(rec.count()) } -func (a *awsFetcher) GetAccountID() string { +func (a *Fetcher) GetAccountID() string { return a.AccountID } -func (a *awsFetcher) poll(ctx context.Context, features Features) (*Resources, error) { +func (a *Fetcher) poll(ctx context.Context, features Features) (*Resources, error) { eGroup, ctx := errgroup.WithContext(ctx) // Set the limit for the number of concurrent pollers running in parallel. // This is to prevent the number of concurrent pollers from growing too large @@ -371,7 +357,7 @@ func (a *awsFetcher) poll(ctx context.Context, features Features) (*Resources, e // getAWSOptions returns a list of AWSAssumeRoleOptionFn to be used when // creating AWS clients. -func (a *awsFetcher) getAWSOptions() []cloud.AWSOptionsFn { +func (a *Fetcher) getAWSOptions() []cloud.AWSOptionsFn { opts := []cloud.AWSOptionsFn{ cloud.WithCredentialsMaybeIntegration(a.Config.Integration), } @@ -398,7 +384,7 @@ func (a *awsFetcher) getAWSOptions() []cloud.AWSOptionsFn { // getAWSV2Options returns a list of options to be used when // creating AWS clients with the v2 sdk. -func (a *awsFetcher) getAWSV2Options() []awsconfig.OptionsFn { +func (a *Fetcher) getAWSV2Options() []awsconfig.OptionsFn { opts := []awsconfig.OptionsFn{ awsconfig.WithCredentialsMaybeIntegration(a.Config.Integration), } @@ -417,7 +403,7 @@ func (a *awsFetcher) getAWSV2Options() []awsconfig.OptionsFn { return opts } -func (a *awsFetcher) getAccountId(ctx context.Context) (string, error) { +func (a *Fetcher) getAccountId(ctx context.Context) (string, error) { stsClient, err := a.CloudClients.GetAWSSTSClient( ctx, "", /* region is empty because groups are global */ @@ -436,14 +422,14 @@ func (a *awsFetcher) getAccountId(ctx context.Context) (string, error) { return aws.ToString(req.Account), nil } -func (a *awsFetcher) DiscoveryConfigName() string { +func (a *Fetcher) DiscoveryConfigName() string { return a.Config.DiscoveryConfigName } -func (a *awsFetcher) IsFromDiscoveryConfig() bool { +func (a *Fetcher) IsFromDiscoveryConfig() bool { return a.Config.DiscoveryConfigName != "" } -func (a *awsFetcher) Status() (uint64, error) { +func (a *Fetcher) Status() (uint64, error) { return a.lastDiscoveredResources, a.lastError } diff --git a/lib/srv/discovery/fetchers/aws-sync/ec2.go b/lib/srv/discovery/fetchers/aws-sync/ec2.go index ec24aca8184e1..1ae520e743c38 100644 --- a/lib/srv/discovery/fetchers/aws-sync/ec2.go +++ b/lib/srv/discovery/fetchers/aws-sync/ec2.go @@ -39,7 +39,7 @@ import ( // pollAWSEC2Instances is a function that returns a function that fetches // ec2 instances and instance profiles and returns an error if any. -func (a *awsFetcher) pollAWSEC2Instances(ctx context.Context, result *Resources, collectErr func(error)) func() error { +func (a *Fetcher) pollAWSEC2Instances(ctx context.Context, result *Resources, collectErr func(error)) func() error { return func() error { var err error @@ -59,7 +59,7 @@ func (a *awsFetcher) pollAWSEC2Instances(ctx context.Context, result *Resources, // as a slice of accessgraphv1alpha.AWSInstanceV1. // It uses ec2.DescribeInstancesPagesWithContext to iterate over all instances // in all regions. -func (a *awsFetcher) fetchAWSEC2Instances(ctx context.Context) ([]*accessgraphv1alpha.AWSInstanceV1, error) { +func (a *Fetcher) fetchAWSEC2Instances(ctx context.Context) ([]*accessgraphv1alpha.AWSInstanceV1, error) { var ( hosts []*accessgraphv1alpha.AWSInstanceV1 hostsMu sync.Mutex @@ -150,7 +150,7 @@ func awsInstanceToProtoInstance(instance ec2types.Instance, region string, accou // fetchInstanceProfiles fetches instance profiles from all regions and returns them // as a slice of accessgraphv1alpha.AWSInstanceProfileV1. -func (a *awsFetcher) fetchInstanceProfiles(ctx context.Context) ([]*accessgraphv1alpha.AWSInstanceProfileV1, error) { +func (a *Fetcher) fetchInstanceProfiles(ctx context.Context) ([]*accessgraphv1alpha.AWSInstanceProfileV1, error) { existing := a.lastResult.InstanceProfiles awsCfg, err := a.AWSConfigProvider.GetConfig( ctx, diff --git a/lib/srv/discovery/fetchers/aws-sync/eks.go b/lib/srv/discovery/fetchers/aws-sync/eks.go index fc1791b4cb13a..beba3a5a9a3b3 100644 --- a/lib/srv/discovery/fetchers/aws-sync/eks.go +++ b/lib/srv/discovery/fetchers/aws-sync/eks.go @@ -50,7 +50,7 @@ type EKSClient interface { // pollAWSEKSClusters is a function that returns a function that fetches // eks clusters and their access scope levels. -func (a *awsFetcher) pollAWSEKSClusters(ctx context.Context, result *Resources, collectErr func(error)) func() error { +func (a *Fetcher) pollAWSEKSClusters(ctx context.Context, result *Resources, collectErr func(error)) func() error { return func() error { output, err := a.fetchAWSSEKSClusters(ctx) if err != nil { @@ -71,7 +71,7 @@ type fetchAWSEKSClustersOutput struct { } // fetchAWSSEKSClusters fetches eks instances from all regions. -func (a *awsFetcher) fetchAWSSEKSClusters(ctx context.Context) (fetchAWSEKSClustersOutput, error) { +func (a *Fetcher) fetchAWSSEKSClusters(ctx context.Context) (fetchAWSEKSClustersOutput, error) { var ( output fetchAWSEKSClustersOutput hostsMu sync.Mutex @@ -213,7 +213,7 @@ func awsEKSClusterToProtoCluster(cluster *ekstypes.Cluster, region, accountID st } // fetchAccessEntries fetches the access entries for the given cluster. -func (a *awsFetcher) fetchAccessEntries(ctx context.Context, eksClient EKSClient, cluster *accessgraphv1alpha.AWSEKSClusterV1) ([]*accessgraphv1alpha.AWSEKSClusterAccessEntryV1, error) { +func (a *Fetcher) fetchAccessEntries(ctx context.Context, eksClient EKSClient, cluster *accessgraphv1alpha.AWSEKSClusterV1) ([]*accessgraphv1alpha.AWSEKSClusterAccessEntryV1, error) { var accessEntries []string for p := eks.NewListAccessEntriesPaginator(eksClient, @@ -277,7 +277,7 @@ func awsAccessEntryToProtoAccessEntry(accessEntry *ekstypes.AccessEntry, cluster } // fetchAccessEntries fetches the access entries for the given cluster. -func (a *awsFetcher) fetchAssociatedPolicies(ctx context.Context, eksClient EKSClient, cluster *accessgraphv1alpha.AWSEKSClusterV1, arns []string) ([]*accessgraphv1alpha.AWSEKSAssociatedAccessPolicyV1, error) { +func (a *Fetcher) fetchAssociatedPolicies(ctx context.Context, eksClient EKSClient, cluster *accessgraphv1alpha.AWSEKSClusterV1, arns []string) ([]*accessgraphv1alpha.AWSEKSAssociatedAccessPolicyV1, error) { var associatedPolicies []*accessgraphv1alpha.AWSEKSAssociatedAccessPolicyV1 var errs []error diff --git a/lib/srv/discovery/fetchers/aws-sync/eks_test.go b/lib/srv/discovery/fetchers/aws-sync/eks_test.go index b38f1ff851a92..e8cd55b767767 100644 --- a/lib/srv/discovery/fetchers/aws-sync/eks_test.go +++ b/lib/srv/discovery/fetchers/aws-sync/eks_test.go @@ -196,7 +196,7 @@ func TestPollAWSEKSClusters(t *testing.T) { defer mu.Unlock() errs = append(errs, err) } - a := &awsFetcher{ + a := &Fetcher{ Config: Config{ AccountID: accountID, Regions: regions, diff --git a/lib/srv/discovery/fetchers/aws-sync/groups.go b/lib/srv/discovery/fetchers/aws-sync/groups.go index b4c9ac1dc8a62..b212ee1629b5e 100644 --- a/lib/srv/discovery/fetchers/aws-sync/groups.go +++ b/lib/srv/discovery/fetchers/aws-sync/groups.go @@ -34,7 +34,7 @@ import ( // pollAWSGroups is a function that returns a function that fetches // AWS groups and their inline and attached policies. -func (a *awsFetcher) pollAWSGroups(ctx context.Context, result *Resources, collectErr func(error)) func() error { +func (a *Fetcher) pollAWSGroups(ctx context.Context, result *Resources, collectErr func(error)) func() error { return func() error { var err error @@ -90,7 +90,7 @@ func (a *awsFetcher) pollAWSGroups(ctx context.Context, result *Resources, colle // fetchGroups fetches AWS groups and returns them as a slice of accessgraphv1alpha.AWSGroupV1. // It uses ListGroupsPagesWithContext to iterate over all groups. -func (a *awsFetcher) fetchGroups(ctx context.Context) ([]*accessgraphv1alpha.AWSGroupV1, error) { +func (a *Fetcher) fetchGroups(ctx context.Context) ([]*accessgraphv1alpha.AWSGroupV1, error) { awsCfg, err := a.AWSConfigProvider.GetConfig( ctx, "", /* region is empty because groups are global */ @@ -140,7 +140,7 @@ func awsGroupToProtoGroup(group iamtypes.Group, accountID string) *accessgraphv1 // as a slice of accessgraphv1alpha.AWSGroupInlinePolicyV1. // It uses ListGroupPoliciesPagesWithContext to iterate over all inline policies // associated with the group. -func (a *awsFetcher) fetchGroupInlinePolicies(ctx context.Context, group *accessgraphv1alpha.AWSGroupV1) ([]*accessgraphv1alpha.AWSGroupInlinePolicyV1, error) { +func (a *Fetcher) fetchGroupInlinePolicies(ctx context.Context, group *accessgraphv1alpha.AWSGroupV1) ([]*accessgraphv1alpha.AWSGroupInlinePolicyV1, error) { awsCfg, err := a.AWSConfigProvider.GetConfig( ctx, "", /* region is empty because users and groups are global */ @@ -197,7 +197,7 @@ func awsGroupPolicyToProtoGroupPolicy(policy *iam.GetGroupPolicyOutput, accountI } // fetchGroupAttachedPolicies fetches attached policies for a group. -func (a *awsFetcher) fetchGroupAttachedPolicies(ctx context.Context, group *accessgraphv1alpha.AWSGroupV1) (*accessgraphv1alpha.AWSGroupAttachedPolicies, error) { +func (a *Fetcher) fetchGroupAttachedPolicies(ctx context.Context, group *accessgraphv1alpha.AWSGroupV1) (*accessgraphv1alpha.AWSGroupAttachedPolicies, error) { awsCfg, err := a.AWSConfigProvider.GetConfig( ctx, "", /* region is empty because users and groups are global */ diff --git a/lib/srv/discovery/fetchers/aws-sync/iam_test.go b/lib/srv/discovery/fetchers/aws-sync/iam_test.go index d428aa4ed3a42..9e84d892a224f 100644 --- a/lib/srv/discovery/fetchers/aws-sync/iam_test.go +++ b/lib/srv/discovery/fetchers/aws-sync/iam_test.go @@ -65,7 +65,7 @@ func TestAWSIAMPollSAMLProviders(t *testing.T) { }, ) require.NoError(t, err) - a := &awsFetcher{ + a := &Fetcher{ Config: Config{ AccountID: accountID, AWSConfigProvider: &mocks.AWSConfigProvider{ @@ -203,7 +203,7 @@ func TestAWSIAMPollOIDCProviders(t *testing.T) { defer mu.Unlock() errs = append(errs, err) } - a := &awsFetcher{ + a := &Fetcher{ Config: Config{ AccountID: accountID, AWSConfigProvider: &mocks.AWSConfigProvider{ diff --git a/lib/srv/discovery/fetchers/aws-sync/idp.go b/lib/srv/discovery/fetchers/aws-sync/idp.go index 98f7c61536c6d..a564c5f02ebf8 100644 --- a/lib/srv/discovery/fetchers/aws-sync/idp.go +++ b/lib/srv/discovery/fetchers/aws-sync/idp.go @@ -31,7 +31,7 @@ import ( accessgraphv1alpha "github.com/gravitational/teleport/gen/proto/go/accessgraph/v1alpha" ) -func (a *awsFetcher) pollAWSSAMLProviders(ctx context.Context, result *Resources, collectErr func(error)) func() error { +func (a *Fetcher) pollAWSSAMLProviders(ctx context.Context, result *Resources, collectErr func(error)) func() error { return func() error { existing := a.lastResult awsCfg, err := a.AWSConfigProvider.GetConfig( @@ -74,7 +74,7 @@ func (a *awsFetcher) pollAWSSAMLProviders(ctx context.Context, result *Resources } // fetchAWSSAMLProvider fetches data about a single SAML identity provider. -func (a *awsFetcher) fetchAWSSAMLProvider(ctx context.Context, client iamClient, arn string) (*accessgraphv1alpha.AWSSAMLProviderV1, error) { +func (a *Fetcher) fetchAWSSAMLProvider(ctx context.Context, client iamClient, arn string) (*accessgraphv1alpha.AWSSAMLProviderV1, error) { providerResp, err := client.GetSAMLProvider(ctx, &iam.GetSAMLProviderInput{ SAMLProviderArn: aws.String(arn), }) @@ -132,7 +132,7 @@ func awsSAMLProviderOutputToProto(arn string, accountID string, provider *iam.Ge }, nil } -func (a *awsFetcher) pollAWSOIDCProviders(ctx context.Context, result *Resources, collectErr func(error)) func() error { +func (a *Fetcher) pollAWSOIDCProviders(ctx context.Context, result *Resources, collectErr func(error)) func() error { return func() error { existing := a.lastResult awsCfg, err := a.AWSConfigProvider.GetConfig( @@ -175,7 +175,7 @@ func (a *awsFetcher) pollAWSOIDCProviders(ctx context.Context, result *Resources } // fetchAWSOIDCProvider fetches data about a single OIDC identity provider. -func (a *awsFetcher) fetchAWSOIDCProvider(ctx context.Context, client iamClient, arn string) (*accessgraphv1alpha.AWSOIDCProviderV1, error) { +func (a *Fetcher) fetchAWSOIDCProvider(ctx context.Context, client iamClient, arn string) (*accessgraphv1alpha.AWSOIDCProviderV1, error) { providerResp, err := client.GetOpenIDConnectProvider(ctx, &iam.GetOpenIDConnectProviderInput{ OpenIDConnectProviderArn: aws.String(arn), }) diff --git a/lib/srv/discovery/fetchers/aws-sync/policies.go b/lib/srv/discovery/fetchers/aws-sync/policies.go index 2b3171d3045df..511245a4b732c 100644 --- a/lib/srv/discovery/fetchers/aws-sync/policies.go +++ b/lib/srv/discovery/fetchers/aws-sync/policies.go @@ -34,7 +34,7 @@ import ( // pollAWSPolicies is a function that returns a function that fetches // AWS policies and returns an error if any. -func (a *awsFetcher) pollAWSPolicies(ctx context.Context, result *Resources, collectErr func(error)) func() error { +func (a *Fetcher) pollAWSPolicies(ctx context.Context, result *Resources, collectErr func(error)) func() error { return func() error { var err error result.Policies, err = a.fetchPolicies(ctx) @@ -49,7 +49,7 @@ func (a *awsFetcher) pollAWSPolicies(ctx context.Context, result *Resources, col // accessgraphv1alpha.AWSPolicyV1. // It uses iam.ListPoliciesPagesWithContext to iterate over all policies // and iam.GetPolicyVersionWithContext to fetch policy documents. -func (a *awsFetcher) fetchPolicies(ctx context.Context) ([]*accessgraphv1alpha.AWSPolicyV1, error) { +func (a *Fetcher) fetchPolicies(ctx context.Context) ([]*accessgraphv1alpha.AWSPolicyV1, error) { awsCfg, err := a.AWSConfigProvider.GetConfig( ctx, "", /* region is empty because users and groups are global */ diff --git a/lib/srv/discovery/fetchers/aws-sync/rds.go b/lib/srv/discovery/fetchers/aws-sync/rds.go index f163c49f6b6d3..3d7117425d6ab 100644 --- a/lib/srv/discovery/fetchers/aws-sync/rds.go +++ b/lib/srv/discovery/fetchers/aws-sync/rds.go @@ -40,7 +40,7 @@ type rdsClient interface { // pollAWSRDSDatabases is a function that returns a function that fetches // RDS instances and clusters. -func (a *awsFetcher) pollAWSRDSDatabases(ctx context.Context, result *Resources, collectErr func(error)) func() error { +func (a *Fetcher) pollAWSRDSDatabases(ctx context.Context, result *Resources, collectErr func(error)) func() error { return func() error { var err error result.RDSDatabases, err = a.fetchAWSRDSDatabases(ctx) @@ -52,7 +52,7 @@ func (a *awsFetcher) pollAWSRDSDatabases(ctx context.Context, result *Resources, } // fetchAWSRDSDatabases fetches RDS databases from all regions. -func (a *awsFetcher) fetchAWSRDSDatabases(ctx context.Context) ( +func (a *Fetcher) fetchAWSRDSDatabases(ctx context.Context) ( []*accessgraphv1alpha.AWSRDSDatabaseV1, error, ) { @@ -155,7 +155,7 @@ func awsRDSClusterToRDS(instance *rdstypes.DBCluster, region, accountID string) } } -func (a *awsFetcher) collectDBInstances(ctx context.Context, +func (a *Fetcher) collectDBInstances(ctx context.Context, clt rdsClient, region string, collectDBs func([]*accessgraphv1alpha.AWSRDSDatabaseV1, error), @@ -189,7 +189,7 @@ func (a *awsFetcher) collectDBInstances(ctx context.Context, collectDBs(instances, nil) } -func (a *awsFetcher) collectDBClusters( +func (a *Fetcher) collectDBClusters( ctx context.Context, clt rdsClient, region string, diff --git a/lib/srv/discovery/fetchers/aws-sync/rds_test.go b/lib/srv/discovery/fetchers/aws-sync/rds_test.go index f888294c3ddcf..27940a1d4215f 100644 --- a/lib/srv/discovery/fetchers/aws-sync/rds_test.go +++ b/lib/srv/discovery/fetchers/aws-sync/rds_test.go @@ -101,14 +101,14 @@ func TestPollAWSRDS(t *testing.T) { tests := []struct { name string - fetcherConfigOpt func(*awsFetcher) + fetcherConfigOpt func(*Fetcher) want *Resources checkError func(*testing.T, error) }{ { name: "poll rds databases", want: &resourcesFixture, - fetcherConfigOpt: func(a *awsFetcher) { + fetcherConfigOpt: func(a *Fetcher) { a.awsClients = fakeAWSClients{ rdsClient: &mocks.RDSClient{ DBInstances: dbInstances(), @@ -123,7 +123,7 @@ func TestPollAWSRDS(t *testing.T) { { name: "reuse last synced databases on failure", want: &resourcesFixture, - fetcherConfigOpt: func(a *awsFetcher) { + fetcherConfigOpt: func(a *Fetcher) { a.awsClients = fakeAWSClients{ rdsClient: &mocks.RDSClient{Unauth: true}, } @@ -137,7 +137,7 @@ func TestPollAWSRDS(t *testing.T) { } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - a := &awsFetcher{ + a := &Fetcher{ Config: Config{ AccountID: accountID, AWSConfigProvider: &mocks.AWSConfigProvider{ diff --git a/lib/srv/discovery/fetchers/aws-sync/roles.go b/lib/srv/discovery/fetchers/aws-sync/roles.go index a56678b609a44..5f076f4b525c0 100644 --- a/lib/srv/discovery/fetchers/aws-sync/roles.go +++ b/lib/srv/discovery/fetchers/aws-sync/roles.go @@ -36,7 +36,7 @@ import ( // pollAWSRoles is a function that returns a function that fetches // AWS roles and their inline and attached policies. -func (a *awsFetcher) pollAWSRoles(ctx context.Context, result *Resources, collectErr func(error)) func() error { +func (a *Fetcher) pollAWSRoles(ctx context.Context, result *Resources, collectErr func(error)) func() error { return func() error { var err error existing := a.lastResult @@ -85,7 +85,7 @@ func (a *awsFetcher) pollAWSRoles(ctx context.Context, result *Resources, collec } // fetchRoles fetches AWS roles and returns them as a slice of accessgraphv1alpha.AWSRoleV1. -func (a *awsFetcher) fetchRoles(ctx context.Context) ([]*accessgraphv1alpha.AWSRoleV1, error) { +func (a *Fetcher) fetchRoles(ctx context.Context) ([]*accessgraphv1alpha.AWSRoleV1, error) { awsCfg, err := a.AWSConfigProvider.GetConfig( ctx, "", /* region is empty because roles are global */ @@ -122,7 +122,7 @@ func (a *awsFetcher) fetchRoles(ctx context.Context) ([]*accessgraphv1alpha.AWSR // them as a slice of accessgraphv1alpha.AWSRoleInlinePolicyV1. // It uses iam.ListRolePoliciesPagesWithContext to iterate over all inline policies // and iam.GetRolePolicyWithContext to fetch policy documents. -func (a *awsFetcher) fetchRoleInlinePolicies(ctx context.Context, role *accessgraphv1alpha.AWSRoleV1) ([]*accessgraphv1alpha.AWSRoleInlinePolicyV1, error) { +func (a *Fetcher) fetchRoleInlinePolicies(ctx context.Context, role *accessgraphv1alpha.AWSRoleV1) ([]*accessgraphv1alpha.AWSRoleInlinePolicyV1, error) { awsCfg, err := a.AWSConfigProvider.GetConfig( ctx, "", /* region is empty because users and groups are global */ @@ -171,7 +171,7 @@ func (a *awsFetcher) fetchRoleInlinePolicies(ctx context.Context, role *accessgr } // fetchRoleAttachedPolicies fetches attached policies for an AWS role. -func (a *awsFetcher) fetchRoleAttachedPolicies(ctx context.Context, role *accessgraphv1alpha.AWSRoleV1) (*accessgraphv1alpha.AWSRoleAttachedPolicies, error) { +func (a *Fetcher) fetchRoleAttachedPolicies(ctx context.Context, role *accessgraphv1alpha.AWSRoleV1) (*accessgraphv1alpha.AWSRoleAttachedPolicies, error) { awsCfg, err := a.AWSConfigProvider.GetConfig( ctx, "", /* region is empty because users and groups are global */ diff --git a/lib/srv/discovery/fetchers/aws-sync/s3.go b/lib/srv/discovery/fetchers/aws-sync/s3.go index b340d6d8d50d4..20f749f6fd952 100644 --- a/lib/srv/discovery/fetchers/aws-sync/s3.go +++ b/lib/srv/discovery/fetchers/aws-sync/s3.go @@ -49,7 +49,7 @@ type s3Client interface { // pollAWSS3Buckets is a function that returns a function that fetches // AWS s3 buckets and their inline and attached policies. -func (a *awsFetcher) pollAWSS3Buckets(ctx context.Context, result *Resources, collectErr func(error)) func() error { +func (a *Fetcher) pollAWSS3Buckets(ctx context.Context, result *Resources, collectErr func(error)) func() error { return func() error { var err error result.S3Buckets, err = a.fetchS3Buckets(ctx) @@ -62,7 +62,7 @@ func (a *awsFetcher) pollAWSS3Buckets(ctx context.Context, result *Resources, co // fetchS3Buckets fetches AWS s3 buckets and returns them as a slice of // accessgraphv1alpha.AWSS3BucketV1. -func (a *awsFetcher) fetchS3Buckets(ctx context.Context) ([]*accessgraphv1alpha.AWSS3BucketV1, error) { +func (a *Fetcher) fetchS3Buckets(ctx context.Context) ([]*accessgraphv1alpha.AWSS3BucketV1, error) { var s3s []*accessgraphv1alpha.AWSS3BucketV1 var errs []error var mu sync.Mutex @@ -213,7 +213,7 @@ type s3Details struct { tags *s3.GetBucketTaggingOutput } -func (a *awsFetcher) getS3BucketDetails(ctx context.Context, bucket s3types.Bucket, bucketRegion string) (s3Details, failedRequests, []error) { +func (a *Fetcher) getS3BucketDetails(ctx context.Context, bucket s3types.Bucket, bucketRegion string) (s3Details, failedRequests, []error) { var failedReqs failedRequests var errs []error var details s3Details @@ -293,7 +293,7 @@ func isS3BucketNoTagSet(err error) bool { return isAPIErrorCode(err, "NoSuchTagSet") } -func (a *awsFetcher) listS3Buckets(ctx context.Context) ([]s3types.Bucket, func(*string) (string, error), error) { +func (a *Fetcher) listS3Buckets(ctx context.Context) ([]s3types.Bucket, func(*string) (string, error), error) { region := awsutil.GetKnownRegions()[0] if len(a.Regions) > 0 { region = a.Regions[0] diff --git a/lib/srv/discovery/fetchers/aws-sync/s3_test.go b/lib/srv/discovery/fetchers/aws-sync/s3_test.go index c6dc57d9b6266..ade779690b490 100644 --- a/lib/srv/discovery/fetchers/aws-sync/s3_test.go +++ b/lib/srv/discovery/fetchers/aws-sync/s3_test.go @@ -174,7 +174,7 @@ func TestPollAWSS3(t *testing.T) { defer mu.Unlock() errs = append(errs, err) } - a := &awsFetcher{ + a := &Fetcher{ Config: Config{ AWSConfigProvider: &mocks.AWSConfigProvider{ OIDCIntegrationClient: &mocks.FakeOIDCIntegrationClient{ diff --git a/lib/srv/discovery/fetchers/aws-sync/users.go b/lib/srv/discovery/fetchers/aws-sync/users.go index 3284dfb518466..94f28cc9a3928 100644 --- a/lib/srv/discovery/fetchers/aws-sync/users.go +++ b/lib/srv/discovery/fetchers/aws-sync/users.go @@ -34,7 +34,7 @@ import ( // pollAWSUsers is a function that returns a function that fetches // AWS users and their inline and attached policies, and groups. -func (a *awsFetcher) pollAWSUsers(ctx context.Context, result, existing *Resources, collectErr func(error)) func() error { +func (a *Fetcher) pollAWSUsers(ctx context.Context, result, existing *Resources, collectErr func(error)) func() error { return func() error { var err error @@ -104,7 +104,7 @@ func (a *awsFetcher) pollAWSUsers(ctx context.Context, result, existing *Resourc // fetchUsers fetches AWS users and returns them as a slice of accessgraphv1alpha.AWSUserV1. // It uses iam.ListUsersPagesWithContext to iterate over all users. -func (a *awsFetcher) fetchUsers(ctx context.Context) ([]*accessgraphv1alpha.AWSUserV1, error) { +func (a *Fetcher) fetchUsers(ctx context.Context) ([]*accessgraphv1alpha.AWSUserV1, error) { awsCfg, err := a.AWSConfigProvider.GetConfig( ctx, "", /* region is empty because users are global */ @@ -170,7 +170,7 @@ func awsUserToProtoUser(user iamtypes.User, accountID string) *accessgraphv1alph } } -func (a *awsFetcher) fetchUserInlinePolicies(ctx context.Context, user *accessgraphv1alpha.AWSUserV1) ([]*accessgraphv1alpha.AWSUserInlinePolicyV1, error) { +func (a *Fetcher) fetchUserInlinePolicies(ctx context.Context, user *accessgraphv1alpha.AWSUserV1) ([]*accessgraphv1alpha.AWSUserInlinePolicyV1, error) { awsCfg, err := a.AWSConfigProvider.GetConfig( ctx, "", /* region is empty because users and groups are global */ @@ -227,7 +227,7 @@ func awsUserPolicyToProtoUserPolicy(policy *iam.GetUserPolicyOutput, user *acces } } -func (a *awsFetcher) fetchUserAttachedPolicies(ctx context.Context, user *accessgraphv1alpha.AWSUserV1) (*accessgraphv1alpha.AWSUserAttachedPolicies, error) { +func (a *Fetcher) fetchUserAttachedPolicies(ctx context.Context, user *accessgraphv1alpha.AWSUserV1) (*accessgraphv1alpha.AWSUserAttachedPolicies, error) { awsCfg, err := a.AWSConfigProvider.GetConfig( ctx, "", /* region is empty because users and groups are global */ @@ -271,7 +271,7 @@ func (a *awsFetcher) fetchUserAttachedPolicies(ctx context.Context, user *access return rsp, trace.Wrap(err) } -func (a *awsFetcher) fetchGroupsForUser(ctx context.Context, user *accessgraphv1alpha.AWSUserV1) (*accessgraphv1alpha.AWSUserGroupsV1, error) { +func (a *Fetcher) fetchGroupsForUser(ctx context.Context, user *accessgraphv1alpha.AWSUserV1) (*accessgraphv1alpha.AWSUserGroupsV1, error) { userGroups := &accessgraphv1alpha.AWSUserGroupsV1{ User: user, LastSyncTime: timestamppb.Now(), diff --git a/lib/srv/discovery/status.go b/lib/srv/discovery/status.go index ab7a23e040039..4b25bff187540 100644 --- a/lib/srv/discovery/status.go +++ b/lib/srv/discovery/status.go @@ -35,10 +35,19 @@ import ( "github.com/gravitational/teleport/api/utils/retryutils" libevents "github.com/gravitational/teleport/lib/events" "github.com/gravitational/teleport/lib/services" - aws_sync "github.com/gravitational/teleport/lib/srv/discovery/fetchers/aws-sync" "github.com/gravitational/teleport/lib/srv/server" ) +// FetcherStatus defines an interface for fetchers to report status +type FetcherStatus interface { + // Status reports the last known status of the fetcher. + Status() (uint64, error) + // DiscoveryConfigName returns the name of the Discovery Config. + DiscoveryConfigName() string + // IsFromDiscoveryConfig returns true if the fetcher is associated with a Discovery Config. + IsFromDiscoveryConfig() bool +} + // updateDiscoveryConfigStatus updates the DiscoveryConfig Status field with the current in-memory status. // The status will be updated with the following matchers: // - AWS Sync (TAG) status @@ -59,8 +68,8 @@ func (s *Server) updateDiscoveryConfigStatus(discoveryConfigNames ...string) { IntegrationDiscoveredResources: make(map[string]*discoveryconfigv1.IntegrationDiscoveredSummary), } - // Merge AWS Sync (TAG) status - discoveryConfigStatus = s.awsSyncStatus.mergeIntoGlobalStatus(discoveryConfigName, discoveryConfigStatus) + // Merge AWS or Azure Sync (TAG) status + discoveryConfigStatus = s.tagSyncStatus.mergeIntoGlobalStatus(discoveryConfigName, discoveryConfigStatus) // Merge AWS EC2 Instances (auto discovery) status discoveryConfigStatus = s.awsEC2ResourcesStatus.mergeIntoGlobalStatus(discoveryConfigName, discoveryConfigStatus) @@ -84,16 +93,23 @@ func (s *Server) updateDiscoveryConfigStatus(discoveryConfigNames ...string) { } } -// awsSyncStatus contains all the status for aws_sync Fetchers grouped by DiscoveryConfig. -type awsSyncStatus struct { +// tagSyncStatus contains all the status for both AWS and Azure fetchers grouped by DiscoveryConfig. +type tagSyncStatus struct { mu sync.RWMutex - // awsSyncResults maps the DiscoveryConfig name to a aws_sync result. - // Each DiscoveryConfig might have multiple `aws_sync` matchers. - awsSyncResults map[string][]awsSyncResult + // syncResults maps the DiscoveryConfig name to a AWS or Azure fetcher result. + // Each DiscoveryConfig might have multiple AWS or Azure matchers. + syncResults map[string][]tagSyncResult +} + +// newTagSyncStatus creates a new sync status object for storing results from the last fetch +func newTagSyncStatus() *tagSyncStatus { + return &tagSyncStatus{ + syncResults: make(map[string][]tagSyncResult), + } } -// awsSyncResult stores the result of the aws_sync Matchers for a given DiscoveryConfig. -type awsSyncResult struct { +// tagSyncResult stores the result of the aws_sync Matchers for a given DiscoveryConfig. +type tagSyncResult struct { // state is the State for the DiscoveryConfigStatus. // Allowed values are: // - DISCOVERY_CONFIG_STATE_SYNCING @@ -105,72 +121,65 @@ type awsSyncResult struct { discoveredResources uint64 } -func (d *awsSyncStatus) iterationFinished(fetchers []aws_sync.AWSSync, pushErr error, lastUpdate time.Time) { +func (d *tagSyncStatus) syncFinished(fetcher FetcherStatus, pushErr error, lastUpdate time.Time) { d.mu.Lock() defer d.mu.Unlock() - d.awsSyncResults = make(map[string][]awsSyncResult) - for _, fetcher := range fetchers { - // Only update the status for fetchers that are from the discovery config. - if !fetcher.IsFromDiscoveryConfig() { - continue - } - - count, statusErr := fetcher.Status() - statusAndPushErr := trace.NewAggregate(statusErr, pushErr) + // Only update the status for fetchers that are from the discovery config. + if !fetcher.IsFromDiscoveryConfig() { + return + } - fetcherResult := awsSyncResult{ - state: discoveryconfigv1.DiscoveryConfigState_DISCOVERY_CONFIG_STATE_RUNNING.String(), - lastSyncTime: lastUpdate, - discoveredResources: count, - } + count, statusErr := fetcher.Status() + statusAndPushErr := trace.NewAggregate(statusErr, pushErr) - if statusAndPushErr != nil { - errorMessage := statusAndPushErr.Error() - fetcherResult.errorMessage = &errorMessage - fetcherResult.state = discoveryconfigv1.DiscoveryConfigState_DISCOVERY_CONFIG_STATE_ERROR.String() - } + fetcherResult := tagSyncResult{ + state: discoveryconfigv1.DiscoveryConfigState_DISCOVERY_CONFIG_STATE_RUNNING.String(), + lastSyncTime: lastUpdate, + discoveredResources: count, + } - d.awsSyncResults[fetcher.DiscoveryConfigName()] = append(d.awsSyncResults[fetcher.DiscoveryConfigName()], fetcherResult) + if statusAndPushErr != nil { + errorMessage := statusAndPushErr.Error() + fetcherResult.errorMessage = &errorMessage + fetcherResult.state = discoveryconfigv1.DiscoveryConfigState_DISCOVERY_CONFIG_STATE_ERROR.String() } + + d.syncResults[fetcher.DiscoveryConfigName()] = append(d.syncResults[fetcher.DiscoveryConfigName()], fetcherResult) } -func (d *awsSyncStatus) discoveryConfigs() []string { +func (d *tagSyncStatus) discoveryConfigs() []string { d.mu.RLock() defer d.mu.RUnlock() - ret := make([]string, 0, len(d.awsSyncResults)) - for k := range d.awsSyncResults { + ret := make([]string, 0, len(d.syncResults)) + for k := range d.syncResults { ret = append(ret, k) } return ret } -func (d *awsSyncStatus) iterationStarted(fetchers []aws_sync.AWSSync, lastUpdate time.Time) { +func (d *tagSyncStatus) syncStarted(fetcher FetcherStatus, lastUpdate time.Time) { d.mu.Lock() defer d.mu.Unlock() + // Only update the status for fetchers that are from the discovery config. + if !fetcher.IsFromDiscoveryConfig() { + return + } - d.awsSyncResults = make(map[string][]awsSyncResult) - for _, fetcher := range fetchers { - // Only update the status for fetchers that are from the discovery config. - if !fetcher.IsFromDiscoveryConfig() { - continue - } - - fetcherResult := awsSyncResult{ - state: discoveryconfigv1.DiscoveryConfigState_DISCOVERY_CONFIG_STATE_SYNCING.String(), - lastSyncTime: lastUpdate, - } - - d.awsSyncResults[fetcher.DiscoveryConfigName()] = append(d.awsSyncResults[fetcher.DiscoveryConfigName()], fetcherResult) + fetcherResult := tagSyncResult{ + state: discoveryconfigv1.DiscoveryConfigState_DISCOVERY_CONFIG_STATE_SYNCING.String(), + lastSyncTime: lastUpdate, } + + d.syncResults[fetcher.DiscoveryConfigName()] = append(d.syncResults[fetcher.DiscoveryConfigName()], fetcherResult) } -func (d *awsSyncStatus) mergeIntoGlobalStatus(discoveryConfigName string, existingStatus discoveryconfig.Status) discoveryconfig.Status { +func (d *tagSyncStatus) mergeIntoGlobalStatus(discoveryConfigName string, existingStatus discoveryconfig.Status) discoveryconfig.Status { d.mu.RLock() defer d.mu.RUnlock() - awsStatusFetchers, found := d.awsSyncResults[discoveryConfigName] + awsStatusFetchers, found := d.syncResults[discoveryConfigName] if !found { return existingStatus }