diff --git a/lib/srv/discovery/status.go b/lib/srv/discovery/status.go index 27a91fb880c3c..0ac521020d95e 100644 --- a/lib/srv/discovery/status.go +++ b/lib/srv/discovery/status.go @@ -669,7 +669,7 @@ func (s *Server) mergeUpsertDiscoverEC2Task(taskGroup awsEC2TaskKey, failedInsta case err != nil: return trace.Wrap(err) default: - failedInstances = s.discoverEC2UserTaskAddExistingInstances(currentUserTask, failedInstances) + mergeExistingInstances(s, currentUserTask.Spec.DiscoverEc2.Instances, failedInstances.Instances) } // If the DiscoveryService is stopped, or the issue does not happen again @@ -697,30 +697,6 @@ func (s *Server) mergeUpsertDiscoverEC2Task(taskGroup awsEC2TaskKey, failedInsta return nil } -// discoverEC2UserTaskAddExistingInstances takes the UserTask stored in the cluster and merges it into the existing map of failed instances. -func (s *Server) discoverEC2UserTaskAddExistingInstances(currentUserTask *usertasksv1.UserTask, failedInstances *usertasksv1.DiscoverEC2) *usertasksv1.DiscoverEC2 { - for existingInstanceID, existingInstance := range currentUserTask.Spec.DiscoverEc2.Instances { - // Each DiscoveryService works on all the DiscoveryConfigs assigned to a given DiscoveryGroup. - // So, it's safe to say that current DiscoveryService has the last state for a given DiscoveryGroup. - // If other instances exist for this DiscoveryGroup, they can be discarded because, as said before, the current DiscoveryService has the last state for a given DiscoveryGroup. - if existingInstance.DiscoveryGroup == s.DiscoveryGroup { - continue - } - - // For existing instances whose sync time is too far in the past, just drop them. - // This ensures that if an instance is removed from AWS, it will eventually disappear from the User Tasks' instance list. - // It might also be the case that the DiscoveryConfig was changed and the instance is no longer matched (because of labels/regions or other matchers). 
- instanceIssueExpiration := s.clock.Now().Add(-2 * s.PollInterval) - if existingInstance.SyncTime.AsTime().Before(instanceIssueExpiration) { - continue - } - - // Merge existing cluster state into in-memory object. - failedInstances.Instances[existingInstanceID] = existingInstance - } - return failedInstances -} - func (s *Server) upsertTasksForAWSEC2FailedEnrollments() { s.awsEC2Tasks.mu.Lock() defer s.awsEC2Tasks.mu.Unlock() @@ -789,7 +765,7 @@ func (s *Server) mergeUpsertDiscoverEKSTask(taskGroup awsEKSTaskKey, failedClust case err != nil: return trace.Wrap(err) default: - failedClusters = s.discoverEKSUserTaskAddExistingClusters(currentUserTask, failedClusters) + mergeExistingInstances(s, currentUserTask.Spec.DiscoverEks.Clusters, failedClusters.Clusters) } // If the DiscoveryService is stopped, or the issue does not happen again @@ -817,30 +793,6 @@ func (s *Server) mergeUpsertDiscoverEKSTask(taskGroup awsEKSTaskKey, failedClust return nil } -// discoverEKSUserTaskAddExistingClusters takes the UserTask stored in the cluster and merges it into the existing map of failed clusters. -func (s *Server) discoverEKSUserTaskAddExistingClusters(currentUserTask *usertasksv1.UserTask, failedClusters *usertasksv1.DiscoverEKS) *usertasksv1.DiscoverEKS { - for existingClusterName, existingCluster := range currentUserTask.Spec.DiscoverEks.Clusters { - // Each DiscoveryService works on all the DiscoveryConfigs assigned to a given DiscoveryGroup. - // So, it's safe to say that current DiscoveryService has the last state for a given DiscoveryGroup. - // If other clusters exist for this DiscoveryGroup, they can be discarded because, as said before, the current DiscoveryService has the last state for a given DiscoveryGroup. - if existingCluster.DiscoveryGroup == s.DiscoveryGroup { - continue - } - - // For existing clusters whose sync time is too far in the past, just drop them. 
- // This ensures that if a cluster is removed from AWS, it will eventually disappear from the User Tasks' cluster list. - // It might also be the case that the DiscoveryConfig was changed and the cluster is no longer matched (because of labels/regions or other matchers). - clusterIssueExpiration := s.clock.Now().Add(-2 * s.PollInterval) - if existingCluster.SyncTime.AsTime().Before(clusterIssueExpiration) { - continue - } - - // Merge existing cluster state into in-memory object. - failedClusters.Clusters[existingClusterName] = existingCluster - } - return failedClusters -} - func (s *Server) upsertTasksForAWSRDSFailedEnrollments() { s.awsRDSTasks.mu.Lock() defer s.awsRDSTasks.mu.Unlock() @@ -889,7 +841,7 @@ func (s *Server) mergeUpsertDiscoverRDSTask(taskGroup awsRDSTaskKey, failedDatab case err != nil: return trace.Wrap(err) default: - failedDatabases = s.discoverRDSUserTaskAddExistingDatabases(currentUserTask, failedDatabases) + mergeExistingInstances(s, currentUserTask.Spec.DiscoverRds.Databases, failedDatabases.Databases) } // If the DiscoveryService is stopped, or the issue does not happen again @@ -917,26 +869,30 @@ func (s *Server) mergeUpsertDiscoverRDSTask(taskGroup awsRDSTaskKey, failedDatab return nil } -// discoverRDSUserTaskAddExistingDatabases takes the UserTask stored in the cluster and merges it into the existing map of failed databases. 
-func (s *Server) discoverRDSUserTaskAddExistingDatabases(currentUserTask *usertasksv1.UserTask, failedDatabases *usertasksv1.DiscoverRDS) *usertasksv1.DiscoverRDS { - for existingDatabaseName, existingDatabase := range currentUserTask.Spec.DiscoverRds.Databases { +func mergeExistingInstances[Instance interface { + GetSyncTime() *timestamppb.Timestamp + GetDiscoveryGroup() string +}](s *Server, oldInstances map[string]Instance, freshInstances map[string]Instance) { + issueExpiration := s.clock.Now().Add(-2 * s.PollInterval) + + for instanceKey, instance := range oldInstances { // Each DiscoveryService works on all the DiscoveryConfigs assigned to a given DiscoveryGroup. // So, it's safe to say that current DiscoveryService has the last state for a given DiscoveryGroup. - // If other databases exist for this DiscoveryGroup, they can be discarded because, as said before, the current DiscoveryService has the last state for a given DiscoveryGroup. - if existingDatabase.DiscoveryGroup == s.DiscoveryGroup { + // If other resources exist for this DiscoveryGroup, they can be discarded because, as said before, the current DiscoveryService has the last state for a given DiscoveryGroup. + if instance.GetDiscoveryGroup() == s.DiscoveryGroup { continue } - // For existing clusters whose sync time is too far in the past, just drop them. - // This ensures that if a cluster is removed from AWS, it will eventually disappear from the User Tasks' cluster list. - // It might also be the case that the DiscoveryConfig was changed and the cluster is no longer matched (because of labels/regions or other matchers). - clusterIssueExpiration := s.clock.Now().Add(-2 * s.PollInterval) - if existingDatabase.SyncTime.AsTime().Before(clusterIssueExpiration) { + // For existing instances whose sync time is too far in the past, just drop them. + // This ensures that if a resource is removed from its hosting platform, it will eventually be removed from the relevant User Tasks' list. 
+ // This also covers the case where DiscoveryConfig change moves a particular resource out of scope (because of labels/regions or other matchers). + if instance.GetSyncTime().AsTime().Before(issueExpiration) { continue } - // Merge existing cluster state into in-memory object. - failedDatabases.Databases[existingDatabaseName] = existingDatabase + // Merge existing cluster state into in-memory object, but only if we don't have a fresh key. + if _, found := freshInstances[instanceKey]; !found { + freshInstances[instanceKey] = instance + } } - return failedDatabases } diff --git a/lib/srv/discovery/status_test.go b/lib/srv/discovery/status_test.go index d97baf3e9639b..14671a19dc175 100644 --- a/lib/srv/discovery/status_test.go +++ b/lib/srv/discovery/status_test.go @@ -19,10 +19,14 @@ package discovery import ( + "maps" "strings" "testing" + "time" + "github.com/jonboulle/clockwork" "github.com/stretchr/testify/require" + "google.golang.org/protobuf/types/known/timestamppb" "github.com/gravitational/teleport/api/types/discoveryconfig" ) @@ -55,3 +59,107 @@ func TestTruncateErrorMessage(t *testing.T) { }) } } + +type mockInstance struct { + syncTime *timestamppb.Timestamp + discoveryGroup string +} + +func (m *mockInstance) GetSyncTime() *timestamppb.Timestamp { + return m.syncTime +} + +func (m *mockInstance) GetDiscoveryGroup() string { + return m.discoveryGroup +} + +func TestMergeExistingInstances(t *testing.T) { + clock := clockwork.NewFakeClock() + pollInterval := 10 * time.Minute + s := &Server{ + Config: &Config{ + clock: clock, + PollInterval: pollInterval, + DiscoveryGroup: "group-1", + }, + } + + now := clock.Now() + tooOld := now.Add(-3 * pollInterval) + recent := now.Add(-pollInterval) + + tests := []struct { + name string + oldInstances map[string]*mockInstance + freshInstances map[string]*mockInstance + expected map[string]*mockInstance + }{ + { + name: "skip instances from the same discovery group", + oldInstances: map[string]*mockInstance{ + "inst-1": 
{ + syncTime: timestamppb.New(recent), + discoveryGroup: "group-1", + }, + }, + freshInstances: map[string]*mockInstance{}, + expected: map[string]*mockInstance{}, + }, + { + name: "skip expired instances", + oldInstances: map[string]*mockInstance{ + "inst-2": { + syncTime: timestamppb.New(tooOld), + discoveryGroup: "group-2", + }, + }, + freshInstances: map[string]*mockInstance{}, + expected: map[string]*mockInstance{}, + }, + { + name: "merge missing instances", + oldInstances: map[string]*mockInstance{ + "inst-3": { + syncTime: timestamppb.New(recent), + discoveryGroup: "group-2", + }, + }, + freshInstances: map[string]*mockInstance{}, + expected: map[string]*mockInstance{ + "inst-3": { + syncTime: timestamppb.New(recent), + discoveryGroup: "group-2", + }, + }, + }, + { + name: "do not overwrite fresh instances", + oldInstances: map[string]*mockInstance{ + "inst-4": { + syncTime: timestamppb.New(recent), + discoveryGroup: "group-2", + }, + }, + freshInstances: map[string]*mockInstance{ + "inst-4": { + syncTime: timestamppb.New(now), + discoveryGroup: "group-1", + }, + }, + expected: map[string]*mockInstance{ + "inst-4": { + syncTime: timestamppb.New(now), + discoveryGroup: "group-1", + }, + }, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + workingCopy := maps.Clone(tt.freshInstances) + mergeExistingInstances(s, tt.oldInstances, workingCopy) + require.Equal(t, tt.expected, workingCopy) + }) + } +}