Skip to content

Commit

Permalink
Adds support to store original desired count as an ASG tag.
Browse files Browse the repository at this point in the history
  • Loading branch information
outofcoffee committed Apr 9, 2020
1 parent bfb150d commit 9f7e129
Show file tree
Hide file tree
Showing 4 changed files with 159 additions and 45 deletions.
16 changes: 15 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,13 @@ These permissions are as follows:
Resource: "*"
```

These permissions can be set either via runninn ASG Roller on an AWS node that has the correct role, or via API keys to a user that has the correct roles/permissions.
If the `ROLLER_ORIGINAL_DESIRED_ON_TAG` tag option is enabled, the following permission is also required:

```
autoscaling:CreateOrUpdateTags
```

These permissions can be set either via running ASG Roller on an AWS node that has the correct role, or via API keys to a user that has the correct roles/permissions.

* If the AWS environment variables `AWS_ACCESS_KEY_ID` and `AWS_SECRET_ACCESS_KEY`are set, it will use those
* If the AWS environment variables are not set, it will fall back to relying on the local node's IAM role
Expand Down Expand Up @@ -224,6 +230,7 @@ ASG Roller takes its configuration via environment variables. All environment va
* `ROLLER_DELETE_LOCAL_DATA`: If set to `false` (default), will not reclaim a node until there are no pods with [emptyDir](https://kubernetes.io/docs/concepts/storage/volumes/#emptydir) running on the node; if set to `true`, will continue to terminate the pod and delete the local data before reclaiming the node. The default is `false` to maintain backward compatibility.
* `ROLLER_CHECK_DELAY`: Time, in seconds, between checks of ASG status.
* `ROLLER_CAN_INCREASE_MAX`: If set to `true`, will increase the ASG maximum size to accommodate the increase in desired count. If set to `false`, will instead error when desired is higher than max.
* `ROLLER_ORIGINAL_DESIRED_ON_TAG`: If set to `true`, will store the original desired value of the ASG as a tag on the ASG, with the key `aws-asg-roller/OriginalDesired`. This helps maintain state in the situation where the process terminates.
* `ROLLER_VERBOSE`: If set to `true`, will increase verbosity of logs.
* `KUBECONFIG`: Path to kubernetes config file for authenticating to the kubernetes cluster. Required only if `ROLLER_KUBERNETES` is `true` and we are not operating in a kubernetes cluster.

Expand All @@ -250,6 +257,13 @@ Since AWS recommends launch templates over launch configurations going forward,

The only pre-requisite for building is [docker](https://docker.com). All builds take place inside a docker container. If you want, you _may_ build locally using locally installed go. It requires go version 1.12+.

If required, set the target OS/architecture, for example:

```sh
export BUILDOS=linux
export BUILDARCH=amd64
```

To build:

```sh
Expand Down
103 changes: 103 additions & 0 deletions original_desired.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,103 @@
package main

import (
"fmt"
"log"
"os"
"strconv"

"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/service/autoscaling"
"github.com/aws/aws-sdk-go/service/autoscaling/autoscalingiface"
)

const asgTagNameOriginalDesired = "aws-asg-roller/OriginalDesired"

var (
storeOriginalDesiredOnTag = os.Getenv("ROLLER_ORIGINAL_DESIRED_ON_TAG") == "true"
)

// Populates the original desired values for each ASG, based on the current 'desired' value if unkonwn.
// The original desired value is recorded as a tag on the respective ASG. Subsequent runs attempt to
// read the value of the tag to preserve state in the case of the process terminating.
func populateOriginalDesired(originalDesired map[string]int64, asgs []*autoscaling.Group, asgSvc autoscalingiface.AutoScalingAPI) error {
for _, asg := range asgs {
asgName := *asg.AutoScalingGroupName
if storeOriginalDesiredOnTag {
tagOriginalDesired, err := getOriginalDesiredTag(asgSvc, asgName)
if err != nil {
return err
}
if tagOriginalDesired >= 0 {
originalDesired[asgName] = tagOriginalDesired
continue
}
}
// guess based on the current value
originalDesired[asgName] = *asg.DesiredCapacity
if verbose {
log.Printf("guessed desired value of %d from current desired on ASG: %s", *asg.DesiredCapacity, asgName)
}
if storeOriginalDesiredOnTag {
err := setOriginalDesiredTag(asgSvc, asgName, asg)
if err != nil {
return err
}
}
}
return nil
}

// attempt to read the original desired value from the ASG tag
// returns
// the original desired value from the tag, if present, otherwise -1
// error
func getOriginalDesiredTag(asgSvc autoscalingiface.AutoScalingAPI, asgName string) (int64, error) {
tags, err := asgSvc.DescribeTags(&autoscaling.DescribeTagsInput{
Filters: []*autoscaling.Filter{
{
Name: aws.String("auto-scaling-group"),
Values: aws.StringSlice([]string{asgName}),
},
{
Name: aws.String("key"),
Values: aws.StringSlice([]string{asgTagNameOriginalDesired}),
},
},
})
if err != nil {
return -1, fmt.Errorf("unable to read tag '%s' for ASG %s: %v", asgTagNameOriginalDesired, asgName, err)
}
if len(tags.Tags) == 1 {
if tagOriginalDesired, err := strconv.ParseInt(aws.StringValue(tags.Tags[0].Value), 10, 64); err == nil {
if verbose {
log.Printf("read original desired of %d from tag on ASG: %s", tagOriginalDesired, asgName)
}
return tagOriginalDesired, nil
}
return -1, fmt.Errorf("unable to read tag '%s' for ASG %s: %v", asgTagNameOriginalDesired, asgName, err)
}
return -1, nil
}

// record original desired value on a tag, in case of process restart
func setOriginalDesiredTag(asgSvc autoscalingiface.AutoScalingAPI, asgName string, asg *autoscaling.Group) error {
_, err := asgSvc.CreateOrUpdateTags(&autoscaling.CreateOrUpdateTagsInput{
Tags: []*autoscaling.Tag{
{
Key: aws.String(asgTagNameOriginalDesired),
PropagateAtLaunch: aws.Bool(false),
ResourceId: aws.String(asgName),
ResourceType: aws.String("auto-scaling-group"),
Value: aws.String(strconv.FormatInt(*asg.DesiredCapacity, 10)),
},
},
})
if err != nil {
return fmt.Errorf("unable to set tag '%s' for ASG %s: %v", asgTagNameOriginalDesired, asgName, err)
}
if verbose {
log.Printf("recorded desired value of %d in tag on ASG: %s", *asg.DesiredCapacity, asgName)
}
return nil
}
81 changes: 40 additions & 41 deletions roller.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,13 @@ func adjust(asgList []string, ec2Svc ec2iface.EC2API, asgSvc autoscalingiface.Au
if err != nil {
return fmt.Errorf("Unexpected error describing ASGs, skipping: %v", err)
}

// look up and record original desired values
err = populateOriginalDesired(originalDesired, asgs, asgSvc)
if err != nil {
return fmt.Errorf("unexpected error looking up original desired values for ASGs, skipping: %v", err)
}

asgMap := map[string]*autoscaling.Group{}
// get information on all of the ec2 instances
instances := make([]*autoscaling.Instance, 0)
Expand All @@ -31,7 +38,7 @@ func adjust(asgList []string, ec2Svc ec2iface.EC2API, asgSvc autoscalingiface.Au
return fmt.Errorf("unable to group instances into new and old: %v", err)
}
// if there are no outdated instances skip updating
if len(oldInstances) == 0 {
if len(oldInstances) == 0 && *asg.DesiredCapacity == originalDesired[*asg.AutoScalingGroupName] {
log.Printf("[%s] ok\n", *asg.AutoScalingGroupName)
err := ensureNoScaleDownDisabledAnnotation(ec2Svc, mapInstancesIds(asg.Instances))
if err != nil {
Expand All @@ -45,7 +52,6 @@ func adjust(asgList []string, ec2Svc ec2iface.EC2API, asgSvc autoscalingiface.Au
asgMap[*asg.AutoScalingGroupName] = asg
instances = append(instances, oldInstances...)
instances = append(instances, newInstances...)

}
// no instances no work needed
if len(instances) == 0 {
Expand All @@ -54,42 +60,35 @@ func adjust(asgList []string, ec2Svc ec2iface.EC2API, asgSvc autoscalingiface.Au
ids := mapInstancesIds(instances)
hostnames, err := awsGetHostnames(ec2Svc, ids)
if err != nil {
return fmt.Errorf("Unable to get aws hostnames for ids %v: %v", ids, err)
return fmt.Errorf("unable to get aws hostnames for ids %v: %v", ids, err)
}
hostnameMap := map[string]string{}
for i, id := range ids {
hostnameMap[id] = hostnames[i]
}
newDesired := map[string]int64{}
newTerminate := map[string]string{}
newOriginalDesired := map[string]int64{}
errors := map[*string]error{}

// keep keyed references to the ASGs
for _, asg := range asgMap {
newDesiredA, newOriginalA, terminateID, err := calculateAdjustment(asg, ec2Svc, hostnameMap, readinessHandler, originalDesired[*asg.AutoScalingGroupName])
log.Printf("[%s] desired: %d original: %d", *asg.AutoScalingGroupName, newDesiredA, newOriginalA)
newDesiredA, terminateID, err := calculateAdjustment(asg, ec2Svc, hostnameMap, readinessHandler, originalDesired[*asg.AutoScalingGroupName])
log.Printf("[%s] desired: %d original: %d", *asg.AutoScalingGroupName, newDesiredA, originalDesired[*asg.AutoScalingGroupName])
if err != nil {
log.Printf("[%s] error: %v\n", *asg.AutoScalingGroupName, err)
log.Printf("[%s] error calculating adjustment - skipping: %v\n", *asg.AutoScalingGroupName, err)
continue
}
newDesired[*asg.AutoScalingGroupName] = newDesiredA
newOriginalDesired[*asg.AutoScalingGroupName] = newOriginalA
if terminateID != "" {
log.Printf("[%s] Scheduled termination: %s", *asg.AutoScalingGroupName, terminateID)
log.Printf("[%s] scheduled termination: %s", *asg.AutoScalingGroupName, terminateID)
newTerminate[*asg.AutoScalingGroupName] = terminateID
}
errors[asg.AutoScalingGroupName] = err
}
// adjust original desired
for asg, desired := range newOriginalDesired {
originalDesired[asg] = desired
}
// adjust current desired
for asg, desired := range newDesired {
log.Printf("[%s] set desired instances: %d\n", asg, desired)
err = setAsgDesired(asgSvc, asgMap[asg], desired)
if err != nil {
return fmt.Errorf("Error setting desired to %d for ASG %s: %v", desired, asg, err)
return fmt.Errorf("[%s] error setting desired to %d: %v", asg, desired, err)
}
}
// terminate nodes
Expand All @@ -98,7 +97,7 @@ func adjust(asgList []string, ec2Svc ec2iface.EC2API, asgSvc autoscalingiface.Au
// all new config instances are ready, terminate an old one
err = awsTerminateNode(asgSvc, id)
if err != nil {
return fmt.Errorf("Error terminating node %s in ASG %s: %v", id, asg, err)
return fmt.Errorf("[%s] error terminating node %s: %v", asg, id, err)
}
}
return nil
Expand All @@ -118,29 +117,30 @@ func ensureNoScaleDownDisabledAnnotation(ec2Svc ec2iface.EC2API, ids []string) e
// this makes no actual adjustment, only calculates what new settings should be
// returns:
// what the new desired number of instances should be
// what the new original desired should be, primarily if it should be reset
// ID of an instance to terminate, "" if none
// error
func calculateAdjustment(asg *autoscaling.Group, ec2Svc ec2iface.EC2API, hostnameMap map[string]string, readinessHandler readiness, originalDesired int64) (int64, int64, string, error) {
func calculateAdjustment(asg *autoscaling.Group, ec2Svc ec2iface.EC2API, hostnameMap map[string]string, readinessHandler readiness, originalDesired int64) (int64, string, error) {
desired := *asg.DesiredCapacity

// get instances with old launch config
oldInstances, newInstances, err := groupInstances(asg, ec2Svc)
if err != nil {
return originalDesired, 0, "", fmt.Errorf("unable to group instances into new and old: %v", err)
return originalDesired, "", fmt.Errorf("unable to group instances into new and old: %v", err)
}

// Possibilities:
// 1- we have some old ones, but have not started updates yet: set the desired, increment and loop
// 2- we have no old ones, but have started updates: we must be at end, so finish
// 3- we have some old ones, but have started updates: run the updates
if len(oldInstances) == 0 {
if originalDesired > 0 {
return originalDesired, 0, "", nil
if desired != originalDesired {
// we are done; return to desired to original value
return originalDesired, "", nil
}
}
if originalDesired == 0 {
return desired + 1, desired, "", nil
if originalDesired == desired {
// we have not started updates; raise the desired count
return originalDesired + 1, "", nil
}

// how we determine if we can terminate one
Expand All @@ -157,10 +157,9 @@ func calculateAdjustment(asg *autoscaling.Group, ec2Svc ec2iface.EC2API, hostnam
if *i.HealthStatus == healthy {
readyCount++
}

}
if int64(readyCount) < originalDesired+1 {
return desired, originalDesired, "", nil
return desired, "", nil
}
// are any of the updated config instances not ready?
unReadyCount := 0
Expand All @@ -171,7 +170,7 @@ func calculateAdjustment(asg *autoscaling.Group, ec2Svc ec2iface.EC2API, hostnam
}
}
if unReadyCount > 0 {
return desired, originalDesired, "", nil
return desired, "", nil
}
// do we have additional requirements for readiness?
if readinessHandler != nil {
Expand All @@ -191,11 +190,11 @@ func calculateAdjustment(asg *autoscaling.Group, ec2Svc ec2iface.EC2API, hostnam
}
unReadyCount, err = readinessHandler.getUnreadyCount(hostnames, ids)
if err != nil {
return desired, originalDesired, "", fmt.Errorf("Error getting readiness new node status: %v", err)
return desired, "", fmt.Errorf("Error getting readiness new node status: %v", err)
}
if unReadyCount > 0 {
log.Printf("[%s] Nodes not ready: %d", *asg.AutoScalingGroupName, unReadyCount)
return desired, originalDesired, "", nil
return desired, "", nil
}
}
candidate := *oldInstances[0].InstanceId
Expand All @@ -209,16 +208,16 @@ func calculateAdjustment(asg *autoscaling.Group, ec2Svc ec2iface.EC2API, hostnam
hostname = hostnameMap[candidate]
err = readinessHandler.prepareTermination([]string{hostname}, []string{candidate})
if err != nil {
return desired, originalDesired, "", fmt.Errorf("Unexpected error readiness handler terminating node %s: %v", hostname, err)
return desired, "", fmt.Errorf("Unexpected error readiness handler terminating node %s: %v", hostname, err)
}
}

// all new config instances are ready, terminate an old one
return desired, originalDesired, candidate, nil
return desired, candidate, nil
}

// groupInstances handles all of the logic for determining which nodes in the ASG have an old or outdated
// config, and which are up to date. It should to nothing else.
// config, and which are up to date. It should do nothing else.
// The entire rest of the code should rely on this for making the determination
func groupInstances(asg *autoscaling.Group, ec2Svc ec2iface.EC2API) ([]*autoscaling.Instance, []*autoscaling.Instance, error) {
oldInstances := make([]*autoscaling.Instance, 0)
Expand Down Expand Up @@ -246,11 +245,11 @@ func groupInstances(asg *autoscaling.Group, ec2Svc ec2iface.EC2API) ([]*autoscal
switch {
case targetLt.LaunchTemplateId != nil && *targetLt.LaunchTemplateId != "":
if targetTemplate, err = awsGetLaunchTemplateByID(ec2Svc, *targetLt.LaunchTemplateId); err != nil {
return nil, nil, fmt.Errorf("error retrieving information about launch template ID %s: %v", *targetLt.LaunchTemplateId, err)
return nil, nil, fmt.Errorf("[%s] error retrieving information about launch template ID %s: %v", *asg.AutoScalingGroupName, *targetLt.LaunchTemplateId, err)
}
case targetLt.LaunchTemplateName != nil && *targetLt.LaunchTemplateName != "":
if targetTemplate, err = awsGetLaunchTemplateByName(ec2Svc, *targetLt.LaunchTemplateName); err != nil {
return nil, nil, fmt.Errorf("error retrieving information about launch template name %s: %v", *targetLt.LaunchTemplateName, err)
return nil, nil, fmt.Errorf("[%s] error retrieving information about launch template name %s: %v", *asg.AutoScalingGroupName, *targetLt.LaunchTemplateName, err)
}
default:
return nil, nil, fmt.Errorf("AutoScaling Group %s had invalid Launch Template", *asg.AutoScalingGroupName)
Expand All @@ -267,31 +266,31 @@ func groupInstances(asg *autoscaling.Group, ec2Svc ec2iface.EC2API) ([]*autoscal
switch {
case i.LaunchTemplate == nil:
if verbose {
log.Printf("Adding %s to list of old instances because it does not have a launch template", *i.InstanceId)
log.Printf("[%s] adding %s to list of old instances because it does not have a launch template", *asg.AutoScalingGroupName, *i.InstanceId)
}
// has no launch template at all
oldInstances = append(oldInstances, i)
case aws.StringValue(i.LaunchTemplate.LaunchTemplateName) != aws.StringValue(targetLt.LaunchTemplateName):
// mismatched name
if verbose {
log.Printf("Adding %s to list of old instances because its name is %s and the target template's name is %s", *i.InstanceId, *i.LaunchTemplate.LaunchTemplateName, *targetLt.LaunchTemplateName)
log.Printf("[%s] adding %s to list of old instances because its name is %s and the target template's name is %s", *asg.AutoScalingGroupName, *i.InstanceId, *i.LaunchTemplate.LaunchTemplateName, *targetLt.LaunchTemplateName)
}
oldInstances = append(oldInstances, i)
case aws.StringValue(i.LaunchTemplate.LaunchTemplateId) != aws.StringValue(targetLt.LaunchTemplateId):
// mismatched ID
if verbose {
log.Printf("Adding %s to list of old instances because its template id is %s and the target template's id is %s", *i.InstanceId, *i.LaunchTemplate.LaunchTemplateId, *targetLt.LaunchTemplateId)
log.Printf("[%s] adding %s to list of old instances because its template id is %s and the target template's id is %s", *asg.AutoScalingGroupName, *i.InstanceId, *i.LaunchTemplate.LaunchTemplateId, *targetLt.LaunchTemplateId)
}
oldInstances = append(oldInstances, i)
// name and id match, just need to check versions
case !compareLaunchTemplateVersions(targetTemplate, targetLt, i.LaunchTemplate):
if verbose {
log.Printf("Adding %s to list of old instances because the launch template versions do not match (%s!=%s)", *i.InstanceId, *i.LaunchTemplate.Version, *targetLt.Version)
log.Printf("[%s] adding %s to list of old instances because the launch template versions do not match (%s!=%s)", *asg.AutoScalingGroupName, *i.InstanceId, *i.LaunchTemplate.Version, *targetLt.Version)
}
oldInstances = append(oldInstances, i)
default:
if verbose {
log.Printf("Adding %s to list of new instances because the instance matches the launch template with id %s", *i.InstanceId, *targetLt.LaunchTemplateId)
log.Printf("[%s] adding %s to list of new instances because the instance matches the launch template with id %s", *asg.AutoScalingGroupName, *i.InstanceId, *targetLt.LaunchTemplateId)
}
newInstances = append(newInstances, i)
}
Expand All @@ -303,13 +302,13 @@ func groupInstances(asg *autoscaling.Group, ec2Svc ec2iface.EC2API) ([]*autoscal
newInstances = append(newInstances, i)
} else {
if verbose {
log.Printf("Adding %s to list of old instances because the launch configuration names do not match (%s!=%s)", *i.InstanceId, *i.LaunchConfigurationName, *targetLc)
log.Printf("[%s] adding %s to list of old instances because the launch configuration names do not match (%s!=%s)", *asg.AutoScalingGroupName, *i.InstanceId, *i.LaunchConfigurationName, *targetLc)
}
oldInstances = append(oldInstances, i)
}
}
} else {
return nil, nil, fmt.Errorf("both target launch configuration and launch template are nil")
return nil, nil, fmt.Errorf("[%s] both target launch configuration and launch template are nil", *asg.AutoScalingGroupName)
}
return oldInstances, newInstances, nil
}
Expand Down
Loading

0 comments on commit 9f7e129

Please sign in to comment.