Skip to content
This repository was archived by the owner on Jul 23, 2024. It is now read-only.
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
105 changes: 60 additions & 45 deletions aws/aws.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,71 +48,86 @@ func AllASGInstanceIds(as []AutoScalingGroup) map[reapable.Region]map[reapable.I
return inASG
}

// returns ASGs as filterables
// AllAutoScalingGroups describes every AutoScalingGroup in the requested regions
// *AutoScalingGroups are created for every *autoscaling.AutoScalingGroup
// and are passed to a channel
func AllAutoScalingGroups() chan *AutoScalingGroup {
ch := make(chan *AutoScalingGroup)

go func(regions []string) {
for _, region := range regions {
// waitgroup for all regions
wg := sync.WaitGroup{}
for _, region := range config.Regions {
go func(region string) {
// add region to waitgroup
wg.Add(1)
api := autoscaling.New(&aws.Config{Region: region})

// TODO: nextToken paging
input := &autoscaling.DescribeAutoScalingGroupsInput{}
resp, err := api.DescribeAutoScalingGroups(input)
err := api.DescribeAutoScalingGroupsPages(&autoscaling.DescribeAutoScalingGroupsInput{}, func(resp *autoscaling.DescribeAutoScalingGroupsOutput, lastPage bool) bool {
for _, asg := range resp.AutoScalingGroups {
ch <- NewAutoScalingGroup(region, asg)
}
// if we are at the last page, we should not continue
// the return value of this func is "shouldContinue"
if lastPage {
// on the last page, finish this region
wg.Done()
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

When the describe API call fails wg.Done() is never called and ch is never closed (likewise in AllInstances).

}
return true
})
if err != nil {
// TODO: wee
// probably should do something here...
log.Error(err.Error())
wg.Done()
}

for _, a := range resp.AutoScalingGroups {
ch <- NewAutoScalingGroup(region, a)
}
}
}(region)
}
go func() {
// in a separate goroutine, wait for all regions to finish
// when they finish, close the chan
wg.Wait()
close(ch)
}(config.Regions)

}()
return ch
}

// allInstances describes every instance in the requested regions
// instances of Instance are created for each *ec2.Instance
// returned as Filterables
// AllInstances describes every instance in the requested regions
// *Instances are created for each *ec2.Instance
// and are passed to a channel
func AllInstances() chan *Instance {
ch := make(chan *Instance)

go func(regions []string) {
for _, region := range regions {
// waitgroup for all regions
wg := sync.WaitGroup{}
for _, region := range config.Regions {
go func(region string) {
// add region to waitgroup
wg.Add(1)
api := ec2.New(&aws.Config{Region: region})

// repeat until we have everything
var nextToken *string
for done := false; done != true; {
input := &ec2.DescribeInstancesInput{
NextToken: nextToken,
}
resp, err := api.DescribeInstances(input)
if err != nil {
// probably should do something here...
log.Error(err.Error())
}

for _, r := range resp.Reservations {
for _, instance := range r.Instances {
// DescribeInstancesPages does autopagination
err := api.DescribeInstancesPages(&ec2.DescribeInstancesInput{}, func(resp *ec2.DescribeInstancesOutput, lastPage bool) bool {
for _, res := range resp.Reservations {
for _, instance := range res.Instances {
ch <- NewInstance(region, instance)
}
}

if resp.NextToken != nil {
log.Debug("More results for DescribeInstances in %s", region)
nextToken = resp.NextToken
} else {
done = true
// if we are at the last page, we should not continue
// the return value of this func is "shouldContinue"
if lastPage {
wg.Done()
}
return true
})
if err != nil {
// probably should do something here...
log.Error(err.Error())
wg.Done()
}
}
}(region)
}
go func() {
// in a separate goroutine, wait for all regions to finish
// when they finish, close the chan
wg.Wait()
close(ch)
}(config.Regions)

}()
return ch
}

Expand Down
6 changes: 1 addition & 5 deletions reaper/reaper.go
Original file line number Diff line number Diff line change
Expand Up @@ -253,7 +253,6 @@ func getInstances() chan *reaperaws.Instance {
instanceCh := reaperaws.AllInstances()
regionSums := make(map[reapable.Region]int)
instanceTypeSums := make(map[reapable.Region]map[string]int)
sum := 0
for instance := range instanceCh {
// restore saved state from file
savedstate, ok := savedstates[instance.Region][instance.ID]
Expand All @@ -269,7 +268,6 @@ func getInstances() chan *reaperaws.Instance {
instanceTypeSums[instance.Region][instance.InstanceType]++

regionSums[instance.Region]++
sum++
ch <- instance
}

Expand Down Expand Up @@ -302,7 +300,6 @@ func getAutoScalingGroups() chan *reaperaws.AutoScalingGroup {
asgCh := reaperaws.AllAutoScalingGroups()
regionSums := make(map[reapable.Region]int)
asgSizeSums := make(map[reapable.Region]map[int64]int)
sum := 0
for asg := range asgCh {
// restore saved state from file
savedstate, ok := savedstates[asg.Region][asg.ID]
Expand All @@ -318,7 +315,6 @@ func getAutoScalingGroups() chan *reaperaws.AutoScalingGroup {
asgSizeSums[asg.Region][asg.DesiredCapacity]++

regionSums[asg.Region]++
sum++
ch <- asg
}
for _, e := range *events {
Expand All @@ -333,7 +329,7 @@ func getAutoScalingGroups() chan *reaperaws.AutoScalingGroup {

for region, regionSum := range regionSums {
log.Info(fmt.Sprintf("Found %d total AutoScalingGroups in %s", regionSum, region))
err := e.NewStatistic("reaper.asgs.total", float64(sum), []string{fmt.Sprintf("region:%s", region)})
err := e.NewStatistic("reaper.asgs.total", float64(regionSum), []string{fmt.Sprintf("region:%s", region)})
if err != nil {
log.Error(fmt.Sprintf("%s", err.Error()))
}
Expand Down