Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
84 changes: 20 additions & 64 deletions lib/srv/discovery/status.go
Original file line number Diff line number Diff line change
Expand Up @@ -669,7 +669,7 @@ func (s *Server) mergeUpsertDiscoverEC2Task(taskGroup awsEC2TaskKey, failedInsta
case err != nil:
return trace.Wrap(err)
default:
failedInstances = s.discoverEC2UserTaskAddExistingInstances(currentUserTask, failedInstances)
mergeExistingInstances(s, currentUserTask.Spec.DiscoverEc2.Instances, failedInstances.Instances)
}

// If the DiscoveryService is stopped, or the issue does not happen again
Expand Down Expand Up @@ -697,30 +697,6 @@ func (s *Server) mergeUpsertDiscoverEC2Task(taskGroup awsEC2TaskKey, failedInsta
return nil
}

// discoverEC2UserTaskAddExistingInstances takes the UserTask stored in the cluster and merges it into the existing map of failed instances.
//
// Instances recorded under this Server's own DiscoveryGroup are skipped (the
// in-memory state is authoritative for that group), and instances whose last
// sync is older than two poll intervals are dropped so removed or no-longer-matched
// instances eventually age out of the task.
//
// Note: failedInstances is mutated in place and also returned for convenience.
func (s *Server) discoverEC2UserTaskAddExistingInstances(currentUserTask *usertasksv1.UserTask, failedInstances *usertasksv1.DiscoverEC2) *usertasksv1.DiscoverEC2 {
	for existingInstanceID, existingInstance := range currentUserTask.Spec.DiscoverEc2.Instances {
		// Each DiscoveryService works on all the DiscoveryConfigs assigned to a given DiscoveryGroup.
		// So, it's safe to say that current DiscoveryService has the last state for a given DiscoveryGroup.
		// If other instances exist for this DiscoveryGroup, they can be discarded because, as said before, the current DiscoveryService has the last state for a given DiscoveryGroup.
		if existingInstance.DiscoveryGroup == s.DiscoveryGroup {
			continue
		}

		// For existing instances whose sync time is too far in the past, just drop them.
		// This ensures that if an instance is removed from AWS, it will eventually disappear from the User Tasks' instance list.
		// It might also be the case that the DiscoveryConfig was changed and the instance is no longer matched (because of labels/regions or other matchers).
		instanceIssueExpiration := s.clock.Now().Add(-2 * s.PollInterval)
		if existingInstance.SyncTime.AsTime().Before(instanceIssueExpiration) {
			continue
		}

		// Merge existing cluster state into in-memory object.
		failedInstances.Instances[existingInstanceID] = existingInstance
	}
	return failedInstances
}

func (s *Server) upsertTasksForAWSEC2FailedEnrollments() {
s.awsEC2Tasks.mu.Lock()
defer s.awsEC2Tasks.mu.Unlock()
Expand Down Expand Up @@ -789,7 +765,7 @@ func (s *Server) mergeUpsertDiscoverEKSTask(taskGroup awsEKSTaskKey, failedClust
case err != nil:
return trace.Wrap(err)
default:
failedClusters = s.discoverEKSUserTaskAddExistingClusters(currentUserTask, failedClusters)
mergeExistingInstances(s, currentUserTask.Spec.DiscoverEks.Clusters, failedClusters.Clusters)
}

// If the DiscoveryService is stopped, or the issue does not happen again
Expand Down Expand Up @@ -817,30 +793,6 @@ func (s *Server) mergeUpsertDiscoverEKSTask(taskGroup awsEKSTaskKey, failedClust
return nil
}

// discoverEKSUserTaskAddExistingClusters takes the UserTask stored in the cluster and merges it into the existing map of failed clusters.
//
// Clusters recorded under this Server's own DiscoveryGroup are skipped (the
// in-memory state is authoritative for that group), and clusters whose last
// sync is older than two poll intervals are dropped so removed or no-longer-matched
// clusters eventually age out of the task.
//
// Note: failedClusters is mutated in place and also returned for convenience.
func (s *Server) discoverEKSUserTaskAddExistingClusters(currentUserTask *usertasksv1.UserTask, failedClusters *usertasksv1.DiscoverEKS) *usertasksv1.DiscoverEKS {
	for existingClusterName, existingCluster := range currentUserTask.Spec.DiscoverEks.Clusters {
		// Each DiscoveryService works on all the DiscoveryConfigs assigned to a given DiscoveryGroup.
		// So, it's safe to say that current DiscoveryService has the last state for a given DiscoveryGroup.
		// If other clusters exist for this DiscoveryGroup, they can be discarded because, as said before, the current DiscoveryService has the last state for a given DiscoveryGroup.
		if existingCluster.DiscoveryGroup == s.DiscoveryGroup {
			continue
		}

		// For existing clusters whose sync time is too far in the past, just drop them.
		// This ensures that if a cluster is removed from AWS, it will eventually disappear from the User Tasks' cluster list.
		// It might also be the case that the DiscoveryConfig was changed and the cluster is no longer matched (because of labels/regions or other matchers).
		clusterIssueExpiration := s.clock.Now().Add(-2 * s.PollInterval)
		if existingCluster.SyncTime.AsTime().Before(clusterIssueExpiration) {
			continue
		}

		// Merge existing cluster state into in-memory object.
		failedClusters.Clusters[existingClusterName] = existingCluster
	}
	return failedClusters
}

func (s *Server) upsertTasksForAWSRDSFailedEnrollments() {
s.awsRDSTasks.mu.Lock()
defer s.awsRDSTasks.mu.Unlock()
Expand Down Expand Up @@ -889,7 +841,7 @@ func (s *Server) mergeUpsertDiscoverRDSTask(taskGroup awsRDSTaskKey, failedDatab
case err != nil:
return trace.Wrap(err)
default:
failedDatabases = s.discoverRDSUserTaskAddExistingDatabases(currentUserTask, failedDatabases)
mergeExistingInstances(s, currentUserTask.Spec.DiscoverRds.Databases, failedDatabases.Databases)
}

// If the DiscoveryService is stopped, or the issue does not happen again
Expand Down Expand Up @@ -917,26 +869,30 @@ func (s *Server) mergeUpsertDiscoverRDSTask(taskGroup awsRDSTaskKey, failedDatab
return nil
}

// discoverRDSUserTaskAddExistingDatabases takes the UserTask stored in the cluster and merges it into the existing map of failed databases.
func (s *Server) discoverRDSUserTaskAddExistingDatabases(currentUserTask *usertasksv1.UserTask, failedDatabases *usertasksv1.DiscoverRDS) *usertasksv1.DiscoverRDS {
for existingDatabaseName, existingDatabase := range currentUserTask.Spec.DiscoverRds.Databases {
func mergeExistingInstances[Instance interface {
GetSyncTime() *timestamppb.Timestamp
GetDiscoveryGroup() string
}](s *Server, oldInstances map[string]Instance, freshInstances map[string]Instance) {
issueExpiration := s.clock.Now().Add(-2 * s.PollInterval)

for instanceKey, instance := range oldInstances {
// Each DiscoveryService works on all the DiscoveryConfigs assigned to a given DiscoveryGroup.
// So, it's safe to say that current DiscoveryService has the last state for a given DiscoveryGroup.
// If other databases exist for this DiscoveryGroup, they can be discarded because, as said before, the current DiscoveryService has the last state for a given DiscoveryGroup.
if existingDatabase.DiscoveryGroup == s.DiscoveryGroup {
// If other VMs exist for this DiscoveryGroup, they can be discarded because, as said before, the current DiscoveryService has the last state for a given DiscoveryGroup.
if instance.GetDiscoveryGroup() == s.DiscoveryGroup {
continue
}

// For existing clusters whose sync time is too far in the past, just drop them.
// This ensures that if a cluster is removed from AWS, it will eventually disappear from the User Tasks' cluster list.
// It might also be the case that the DiscoveryConfig was changed and the cluster is no longer matched (because of labels/regions or other matchers).
clusterIssueExpiration := s.clock.Now().Add(-2 * s.PollInterval)
if existingDatabase.SyncTime.AsTime().Before(clusterIssueExpiration) {
// For existing instances whose sync time is too far in the past, just drop them.
// This ensures that if a resource is removed from its hosting platform, it will eventually be removed from the relevant User Tasks' list.
// This also covers the case where DiscoveryConfig change moves a particular resource out of scope (because of labels/regions or other matchers).
if instance.GetSyncTime().AsTime().Before(issueExpiration) {
continue
}

// Merge existing cluster state into in-memory object.
failedDatabases.Databases[existingDatabaseName] = existingDatabase
// Merge existing cluster state into in-memory object, but only if we don't have a fresh key.
if _, found := freshInstances[instanceKey]; !found {
freshInstances[instanceKey] = instance
}
}
return failedDatabases
}
108 changes: 108 additions & 0 deletions lib/srv/discovery/status_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,14 @@
package discovery

import (
"maps"
"strings"
"testing"
"time"

"github.com/jonboulle/clockwork"
"github.com/stretchr/testify/require"
"google.golang.org/protobuf/types/known/timestamppb"

"github.com/gravitational/teleport/api/types/discoveryconfig"
)
Expand Down Expand Up @@ -55,3 +59,107 @@ func TestTruncateErrorMessage(t *testing.T) {
})
}
}

// mockInstance is a minimal test double satisfying the anonymous interface
// constraint of mergeExistingInstances (GetSyncTime / GetDiscoveryGroup).
type mockInstance struct {
	// syncTime is the value returned by GetSyncTime.
	syncTime *timestamppb.Timestamp
	// discoveryGroup is the value returned by GetDiscoveryGroup.
	discoveryGroup string
}

// GetSyncTime returns the instance's last sync timestamp.
func (m *mockInstance) GetSyncTime() *timestamppb.Timestamp {
	return m.syncTime
}

// GetDiscoveryGroup returns the discovery group the instance belongs to.
func (m *mockInstance) GetDiscoveryGroup() string {
	return m.discoveryGroup
}

// TestMergeExistingInstances verifies the merge semantics of
// mergeExistingInstances: same-group entries are skipped, expired entries are
// dropped, missing entries are merged, and fresh entries are never overwritten.
func TestMergeExistingInstances(t *testing.T) {
	const interval = 10 * time.Minute

	fakeClock := clockwork.NewFakeClock()
	srv := &Server{
		Config: &Config{
			clock:          fakeClock,
			PollInterval:   interval,
			DiscoveryGroup: "group-1",
		},
	}

	// Reference times: "recent" is inside the 2*PollInterval retention window,
	// "stale" is outside it.
	now := fakeClock.Now()
	stale := now.Add(-3 * interval)
	recent := now.Add(-interval)

	for _, tc := range []struct {
		name  string
		old   map[string]*mockInstance
		fresh map[string]*mockInstance
		want  map[string]*mockInstance
	}{
		{
			name: "skip instances from the same discovery group",
			old: map[string]*mockInstance{
				"inst-1": {
					syncTime:       timestamppb.New(recent),
					discoveryGroup: "group-1",
				},
			},
			fresh: map[string]*mockInstance{},
			want:  map[string]*mockInstance{},
		},
		{
			name: "skip expired instances",
			old: map[string]*mockInstance{
				"inst-2": {
					syncTime:       timestamppb.New(stale),
					discoveryGroup: "group-2",
				},
			},
			fresh: map[string]*mockInstance{},
			want:  map[string]*mockInstance{},
		},
		{
			name: "merge missing instances",
			old: map[string]*mockInstance{
				"inst-3": {
					syncTime:       timestamppb.New(recent),
					discoveryGroup: "group-2",
				},
			},
			fresh: map[string]*mockInstance{},
			want: map[string]*mockInstance{
				"inst-3": {
					syncTime:       timestamppb.New(recent),
					discoveryGroup: "group-2",
				},
			},
		},
		{
			name: "do not overwrite fresh instances",
			old: map[string]*mockInstance{
				"inst-4": {
					syncTime:       timestamppb.New(recent),
					discoveryGroup: "group-2",
				},
			},
			fresh: map[string]*mockInstance{
				"inst-4": {
					syncTime:       timestamppb.New(now),
					discoveryGroup: "group-1",
				},
			},
			want: map[string]*mockInstance{
				"inst-4": {
					syncTime:       timestamppb.New(now),
					discoveryGroup: "group-1",
				},
			},
		},
	} {
		t.Run(tc.name, func(t *testing.T) {
			// Clone so the shared test-case fixture is never mutated.
			got := maps.Clone(tc.fresh)
			mergeExistingInstances(srv, tc.old, got)
			require.Equal(t, tc.want, got)
		})
	}
}
Loading