diff --git a/aws/aws.go b/aws/aws.go index e10d7fd..1eae115 100644 --- a/aws/aws.go +++ b/aws/aws.go @@ -48,71 +48,86 @@ func AllASGInstanceIds(as []AutoScalingGroup) map[reapable.Region]map[reapable.I return inASG } -// returns ASGs as filterables +// AllAutoScalingGroups describes every AutoScalingGroup in the requested regions +// *AutoScalingGroups are created for every *autoscaling.AutoScalingGroup +// and are passed to a channel func AllAutoScalingGroups() chan *AutoScalingGroup { ch := make(chan *AutoScalingGroup) - - go func(regions []string) { - for _, region := range regions { + // waitgroup for all regions + wg := sync.WaitGroup{} + for _, region := range config.Regions { + go func(region string) { + // add region to waitgroup + wg.Add(1) api := autoscaling.New(&aws.Config{Region: region}) - - // TODO: nextToken paging - input := &autoscaling.DescribeAutoScalingGroupsInput{} - resp, err := api.DescribeAutoScalingGroups(input) + err := api.DescribeAutoScalingGroupsPages(&autoscaling.DescribeAutoScalingGroupsInput{}, func(resp *autoscaling.DescribeAutoScalingGroupsOutput, lastPage bool) bool { + for _, asg := range resp.AutoScalingGroups { + ch <- NewAutoScalingGroup(region, asg) + } + // if we are at the last page, we should not continue + // the return value of this func is "shouldContinue" + if lastPage { + // on the last page, finish this region + wg.Done() + } + return true + }) if err != nil { - // TODO: wee + // probably should do something here... log.Error(err.Error()) + wg.Done() } - - for _, a := range resp.AutoScalingGroups { - ch <- NewAutoScalingGroup(region, a) - } - } + }(region) + } + go func() { + // in a separate goroutine, wait for all regions to finish + // when they finish, close the chan + wg.Wait() close(ch) - }(config.Regions) + }() return ch } -// allInstances describes every instance in the requested regions -// instances of Instance are created for each *ec2.Instance -// returned as Filterables +// AllInstances describes every instance in the requested regions +// *Instances are created for each *ec2.Instance +// and are passed to a channel func AllInstances() chan *Instance { ch := make(chan *Instance) - - go func(regions []string) { - for _, region := range regions { + // waitgroup for all regions + wg := sync.WaitGroup{} + for _, region := range config.Regions { + go func(region string) { + // add region to waitgroup + wg.Add(1) api := ec2.New(&aws.Config{Region: region}) - - // repeat until we have everything - var nextToken *string - for done := false; done != true; { - input := &ec2.DescribeInstancesInput{ - NextToken: nextToken, - } - resp, err := api.DescribeInstances(input) - if err != nil { - // probably should do something here... - log.Error(err.Error()) - } - - for _, r := range resp.Reservations { - for _, instance := range r.Instances { + // DescribeInstancesPages does autopagination + err := api.DescribeInstancesPages(&ec2.DescribeInstancesInput{}, func(resp *ec2.DescribeInstancesOutput, lastPage bool) bool { + for _, res := range resp.Reservations { + for _, instance := range res.Instances { ch <- NewInstance(region, instance) } } - - if resp.NextToken != nil { - log.Debug("More results for DescribeInstances in %s", region) - nextToken = resp.NextToken - } else { - done = true + // if we are at the last page, we should not continue + // the return value of this func is "shouldContinue" + if lastPage { + wg.Done() } + return true + }) + if err != nil { + // probably should do something here... + log.Error(err.Error()) + wg.Done() } - } + }(region) + } + go func() { + // in a separate goroutine, wait for all regions to finish + // when they finish, close the chan + wg.Wait() close(ch) - }(config.Regions) - + }() return ch } diff --git a/reaper/reaper.go b/reaper/reaper.go index 02b2b20..828a83f 100644 --- a/reaper/reaper.go +++ b/reaper/reaper.go @@ -253,7 +253,6 @@ func getInstances() chan *reaperaws.Instance { instanceCh := reaperaws.AllInstances() regionSums := make(map[reapable.Region]int) instanceTypeSums := make(map[reapable.Region]map[string]int) - sum := 0 for instance := range instanceCh { // restore saved state from file savedstate, ok := savedstates[instance.Region][instance.ID] @@ -269,7 +268,6 @@ func getInstances() chan *reaperaws.Instance { instanceTypeSums[instance.Region][instance.InstanceType]++ regionSums[instance.Region]++ - sum++ ch <- instance } @@ -302,7 +300,6 @@ func getAutoScalingGroups() chan *reaperaws.AutoScalingGroup { asgCh := reaperaws.AllAutoScalingGroups() regionSums := make(map[reapable.Region]int) asgSizeSums := make(map[reapable.Region]map[int64]int) - sum := 0 for asg := range asgCh { // restore saved state from file savedstate, ok := savedstates[asg.Region][asg.ID] @@ -318,7 +315,6 @@ func getAutoScalingGroups() chan *reaperaws.AutoScalingGroup { asgSizeSums[asg.Region][asg.DesiredCapacity]++ regionSums[asg.Region]++ - sum++ ch <- asg } for _, e := range *events { @@ -333,7 +329,7 @@ func getAutoScalingGroups() chan *reaperaws.AutoScalingGroup { for region, regionSum := range regionSums { log.Info(fmt.Sprintf("Found %d total AutoScalingGroups in %s", regionSum, region)) - err := e.NewStatistic("reaper.asgs.total", float64(sum), []string{fmt.Sprintf("region:%s", region)}) + err := e.NewStatistic("reaper.asgs.total", float64(regionSum), []string{fmt.Sprintf("region:%s", region)}) if err != nil { log.Error(fmt.Sprintf("%s", err.Error())) }