Add the ability to control whether the roller drains and whether it uses force
Introduces ROLLER_DRAIN and ROLLER_DRAIN_FORCE, both defaulting to true to keep existing behaviour for compatibility.

Fixes #52
helgi committed Jun 7, 2021
1 parent 56e4c7f commit d0d021f
Showing 7 changed files with 55 additions and 25 deletions.
2 changes: 2 additions & 0 deletions README.md
@@ -228,6 +228,8 @@ ASG Roller takes its configuration via environment variables. All environment va

* `ROLLER_ASG` [`string`, required]: comma-separated list of auto-scaling groups that should be managed.
* `ROLLER_KUBERNETES` [`bool`, default: `true`]: If set to `true`, will check whether a new node is ready vis-à-vis Kubernetes before declaring it "ready", and will drain an old node before eliminating it. Defaults to `true` when running in Kubernetes as a pod, `false` otherwise.
+ * `ROLLER_DRAIN` [`bool`, default: `true`]: If set to `true`, will drain pods and other Kubernetes resources from a node before terminating it. Consider setting this to `false` if your distribution has a built-in drain on terminate.
+ * `ROLLER_DRAIN_FORCE` [`bool`, default: `true`]: If set to `true`, draining will force-delete Kubernetes resources even when doing so violates a PodDisruptionBudget or grace period.
* `ROLLER_IGNORE_DAEMONSETS` [`bool`, default: `true`]: If set to `false`, will not reclaim a node until there are no DaemonSets running on the node; if set to `true` (default), will reclaim the node when all regular pods are drained off, but will ignore the presence of DaemonSets, which should be present on every node anyway. Normally, you want this set to `true`.
* `ROLLER_DELETE_LOCAL_DATA` [`bool`, default: `false`]: If set to `false` (default), will not reclaim a node until there are no pods with [emptyDir](https://kubernetes.io/docs/concepts/storage/volumes/#emptydir) running on the node; if set to `true`, will continue to terminate the pod and delete the local data before reclaiming the node. The default is `false` to maintain backward compatibility.
* `ROLLER_INTERVAL` [`time.Duration`, default: `30s`]: Time between roller runs. Takes a Go duration such as `10s`, `10m`, or `1h`.
4 changes: 3 additions & 1 deletion configs.go
@@ -6,8 +6,10 @@ import "time"
type Configs struct {
Interval time.Duration `env:"ROLLER_INTERVAL" envDefault:"30s"`
CheckDelay int `env:"ROLLER_CHECK_DELAY" envDefault:"30"`
+ Drain bool `env:"ROLLER_DRAIN" envDefault:"true"`
+ DrainForce bool `env:"ROLLER_DRAIN_FORCE" envDefault:"true"`
IncreaseMax bool `env:"ROLLER_CAN_INCREASE_MAX" envDefault:"false"`
- IgnoreDaemonSets bool `env:"ROLLER_IGNORE_DAEMONSETS" envDefault:"false"`
+ IgnoreDaemonSets bool `env:"ROLLER_IGNORE_DAEMONSETS" envDefault:"true"`
DeleteLocalData bool `env:"ROLLER_DELETE_LOCAL_DATA" envDefault:"false"`
OriginalDesiredOnTag bool `env:"ROLLER_ORIGINAL_DESIRED_ON_TAG" envDefault:"false"`
ASGS []string `env:"ROLLER_ASG,required" envSeparator:","`
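As a quick illustration of how these settings land in `Configs`: a minimal, standalone sketch, assuming the struct is parsed with the `github.com/caarlos0/env` library that the `env:`/`envDefault:` tag conventions above suggest (the struct here is trimmed to the fields this commit touches):

```go
package main

import (
	"fmt"
	"os"

	env "github.com/caarlos0/env/v6" // assumed parser; matches the tag conventions above
)

// Configs is trimmed to the fields relevant to this commit.
type Configs struct {
	Drain      bool     `env:"ROLLER_DRAIN" envDefault:"true"`
	DrainForce bool     `env:"ROLLER_DRAIN_FORCE" envDefault:"true"`
	ASGS       []string `env:"ROLLER_ASG,required" envSeparator:","`
}

func main() {
	os.Setenv("ROLLER_ASG", "asg-a,asg-b")
	os.Setenv("ROLLER_DRAIN", "false") // e.g. the distribution already drains on terminate

	var cfg Configs
	if err := env.Parse(&cfg); err != nil {
		panic(err)
	}
	// Unset variables fall back to their envDefault, so DrainForce stays true.
	fmt.Printf("Drain=%v DrainForce=%v ASGS=%v\n", cfg.Drain, cfg.DrainForce, cfg.ASGS)
	// Output: Drain=false DrainForce=true ASGS=[asg-a asg-b]
}
```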
14 changes: 10 additions & 4 deletions kubernetes.go
@@ -6,7 +6,7 @@ import (
"os"
"path/filepath"

drain "github.com/openshift/kubernetes-drain"
drainer "github.com/openshift/kubernetes-drain"
corev1 "k8s.io/api/core/v1"
v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/kubernetes"
@@ -57,22 +57,28 @@ func (k *kubernetesReadiness) getUnreadyCount(hostnames []string, ids []string)
}
return unReadyCount, nil
}
- func (k *kubernetesReadiness) prepareTermination(hostnames []string, ids []string) error {
+ func (k *kubernetesReadiness) prepareTermination(hostnames []string, ids []string, drain, drainForce bool) error {
// get the node reference - first need the hostname
var (
node *corev1.Node
err error
)

+ // Skip drain
+ if !drain {
+ return nil
+ }

for _, h := range hostnames {
node, err = k.clientset.CoreV1().Nodes().Get(h, v1.GetOptions{})
if err != nil {
return fmt.Errorf("Unexpected error getting kubernetes node %s: %v", h, err)
}
// set options and drain nodes
- err = drain.Drain(k.clientset, []*corev1.Node{node}, &drain.DrainOptions{
+ err = drainer.Drain(k.clientset, []*corev1.Node{node}, &drainer.DrainOptions{
IgnoreDaemonsets: k.ignoreDaemonSets,
GracePeriodSeconds: -1,
- Force: true,
+ Force: drainForce,
DeleteLocalData: k.deleteLocalData,
})
if err != nil {
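The control flow added to `prepareTermination` can be reduced to a standalone sketch; this is an illustration, not code from the commit, and `drainNode` is a hypothetical stand-in for the `drainer.Drain` call above:

```go
package main

import "fmt"

// drainNode is a hypothetical stand-in for drainer.Drain in kubernetes.go.
func drainNode(node string, force bool) error {
	fmt.Printf("draining %s (force=%v)\n", node, force)
	return nil
}

// prepareTermination mirrors the gate this commit adds: drain=false turns the
// whole call into a no-op, and drainForce is passed through to the drainer.
func prepareTermination(nodes []string, drain, drainForce bool) error {
	if !drain {
		return nil // ROLLER_DRAIN=false: hand the node straight to termination
	}
	for _, n := range nodes {
		if err := drainNode(n, drainForce); err != nil {
			return fmt.Errorf("draining %s: %v", n, err)
		}
	}
	return nil
}

func main() {
	_ = prepareTermination([]string{"node-a"}, false, true) // prints nothing
	_ = prepareTermination([]string{"node-a"}, true, false) // draining node-a (force=false)
}
```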
6 changes: 5 additions & 1 deletion main.go
@@ -29,7 +29,11 @@ func main() {

// infinite loop
for {
- err := adjust(configs.KubernetesEnabled, configs.ASGS, ec2Svc, asgSvc, readinessHandler, originalDesired, configs.OriginalDesiredOnTag, configs.IncreaseMax, configs.Verbose)
+ err := adjust(
+ configs.KubernetesEnabled, configs.ASGS, ec2Svc, asgSvc,
+ readinessHandler, originalDesired, configs.OriginalDesiredOnTag,
+ configs.IncreaseMax, configs.Verbose, configs.Drain, configs.DrainForce,
+ )
if err != nil {
log.Printf("Error adjusting AutoScaling Groups: %v", err)
}
2 changes: 1 addition & 1 deletion readiness.go
@@ -2,5 +2,5 @@ package main

type readiness interface {
getUnreadyCount(hostnames []string, ids []string) (int, error)
- prepareTermination(hostnames []string, ids []string) error
+ prepareTermination(hostnames []string, ids []string, drain, drainForce bool) error
}
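Every implementation of this interface now has to accept (and may ignore) the two flags. A hypothetical no-op implementation for the non-Kubernetes case, in the spirit of the test double further down, would look like:

```go
// noopReadiness is a hypothetical readiness implementation for the
// non-Kubernetes case: every node counts as ready, nothing gets drained.
type noopReadiness struct{}

func (noopReadiness) getUnreadyCount(hostnames []string, ids []string) (int, error) {
	return 0, nil // no unready nodes to wait for
}

func (noopReadiness) prepareTermination(hostnames []string, ids []string, drain, drainForce bool) error {
	return nil // nothing to drain, so both flags are ignored
}
```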
8 changes: 4 additions & 4 deletions roller.go
@@ -16,7 +16,7 @@ const (
)

// adjust runs a single adjustment in the loop to update an ASG in a rolling fashion to latest launch config
- func adjust(kubernetesEnabled bool, asgList []string, ec2Svc ec2iface.EC2API, asgSvc autoscalingiface.AutoScalingAPI, readinessHandler readiness, originalDesired map[string]int64, storeOriginalDesiredOnTag, canIncreaseMax, verbose bool) error {
+ func adjust(kubernetesEnabled bool, asgList []string, ec2Svc ec2iface.EC2API, asgSvc autoscalingiface.AutoScalingAPI, readinessHandler readiness, originalDesired map[string]int64, storeOriginalDesiredOnTag, canIncreaseMax, verbose, drain, drainForce bool) error {
// get information on all of the groups
asgs, err := awsDescribeGroups(asgSvc, asgList)
if err != nil {
@@ -71,7 +71,7 @@ func adjust(kubernetesEnabled bool, asgList []string, ec2Svc ec2iface.EC2API, as

// keep keyed references to the ASGs
for _, asg := range asgMap {
- newDesiredA, terminateID, err := calculateAdjustment(kubernetesEnabled, asg, ec2Svc, hostnameMap, readinessHandler, originalDesired[*asg.AutoScalingGroupName], verbose)
+ newDesiredA, terminateID, err := calculateAdjustment(kubernetesEnabled, asg, ec2Svc, hostnameMap, readinessHandler, originalDesired[*asg.AutoScalingGroupName], verbose, drain, drainForce)
log.Printf("[%v] desired: %d original: %d", p2v(asg.AutoScalingGroupName), newDesiredA, originalDesired[*asg.AutoScalingGroupName])
if err != nil {
log.Printf("[%v] error calculating adjustment - skipping: %v\n", p2v(asg.AutoScalingGroupName), err)
@@ -121,7 +121,7 @@ func ensureNoScaleDownDisabledAnnotation(kubernetesEnabled bool, ec2Svc ec2iface
// what the new desired number of instances should be
// ID of an instance to terminate, "" if none
// error
- func calculateAdjustment(kubernetesEnabled bool, asg *autoscaling.Group, ec2Svc ec2iface.EC2API, hostnameMap map[string]string, readinessHandler readiness, originalDesired int64, verbose bool) (int64, string, error) {
+ func calculateAdjustment(kubernetesEnabled bool, asg *autoscaling.Group, ec2Svc ec2iface.EC2API, hostnameMap map[string]string, readinessHandler readiness, originalDesired int64, verbose, drain, drainForce bool) (int64, string, error) {
desired := *asg.DesiredCapacity

// get instances with old launch config
@@ -209,7 +209,7 @@ func calculateAdjustment(kubernetesEnabled bool, asg *autoscaling.Group, ec2Svc
err error
)
hostname = hostnameMap[candidate]
- err = readinessHandler.prepareTermination([]string{hostname}, []string{candidate})
+ err = readinessHandler.prepareTermination([]string{hostname}, []string{candidate}, drain, drainForce)
if err != nil {
return desired, "", fmt.Errorf("unexpected error readiness handler terminating node %s: %v", hostname, err)
}
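Condensed, the plumbing above threads the two flags from `adjust` through `calculateAdjustment` to the readiness handler. A hypothetical single-function summary of that chain (not code from the commit):

```go
// rollOnce compresses the adjust/calculateAdjustment chain: pick one instance
// still on the old launch config, run the (possibly disabled, possibly forced)
// drain via the readiness handler, and report the instance to terminate.
func rollOnce(oldInstances []string, hostnameMap map[string]string, r readiness, drain, drainForce bool) (string, error) {
	if len(oldInstances) == 0 {
		return "", nil // nothing old left: the rollout is complete
	}
	candidate := oldInstances[0]
	hostname := hostnameMap[candidate]
	if err := r.prepareTermination([]string{hostname}, []string{candidate}, drain, drainForce); err != nil {
		return "", fmt.Errorf("unexpected error readiness handler terminating node %s: %v", hostname, err)
	}
	return candidate, nil
}
```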
44 changes: 30 additions & 14 deletions roller_internal_test.go
@@ -23,7 +23,7 @@ type testReadyHandler struct {
func (t *testReadyHandler) getUnreadyCount(hostnames []string, ids []string) (int, error) {
return t.unreadyCount, t.unreadyError
}
- func (t *testReadyHandler) prepareTermination(hostnames []string, ids []string) error {
+ func (t *testReadyHandler) prepareTermination(hostnames []string, ids []string, drain, drainForce bool) error {
return t.terminateError
}

@@ -71,30 +71,32 @@ func TestCalculateAdjustment(t *testing.T) {
targetTerminate string
err error
verbose bool
+ drain bool
+ drainForce bool
}{
// 1 old, 2 new healthy, 0 new unhealthy, should terminate old
{[]string{"1"}, []string{"2", "3"}, []string{}, 3, 2, nil, 3, "1", nil, false},
{[]string{"1"}, []string{"2", "3"}, []string{}, 3, 2, nil, 3, "1", nil, false, true, true},
// 0 old, 2 new healthy, 0 new unhealthy, should indicate end of process
{[]string{}, []string{"2", "3"}, []string{}, 2, 2, nil, 2, "", nil, false},
{[]string{}, []string{"2", "3"}, []string{}, 2, 2, nil, 2, "", nil, false, true, true},
// 2 old, 0 new healthy, 0 new unhealthy, should indicate start of process
{[]string{"1", "2"}, []string{}, []string{}, 2, 2, nil, 3, "", nil, false},
{[]string{"1", "2"}, []string{}, []string{}, 2, 2, nil, 3, "", nil, false, true, true},
// 2 old, 0 new healthy, 0 new unhealthy, started, should not do anything until new healthy one
{[]string{"1", "2"}, []string{}, []string{}, 3, 2, nil, 3, "", nil, false},
{[]string{"1", "2"}, []string{}, []string{}, 3, 2, nil, 3, "", nil, false, true, true},
// 2 old, 1 new healthy, 0 new unhealthy, remove an old one
{[]string{"1", "2"}, []string{"3"}, []string{}, 3, 2, nil, 3, "1", nil, false},
{[]string{"1", "2"}, []string{"3"}, []string{}, 3, 2, nil, 3, "1", nil, false, true, true},
// 2 old, 0 new healthy, 1 new unhealthy, started, should not do anything until new one is healthy
{[]string{"1", "2"}, []string{}, []string{"3"}, 3, 2, nil, 3, "", nil, false},
{[]string{"1", "2"}, []string{}, []string{"3"}, 3, 2, nil, 3, "", nil, false, true, true},

// 2 old, 1 new healthy, 0 new unhealthy, 1 new unready, should not change anything
{[]string{"1", "2"}, []string{"3"}, []string{}, 3, 2, unreadyCountHandler, 3, "", nil, false},
{[]string{"1", "2"}, []string{"3"}, []string{}, 3, 2, unreadyCountHandler, 3, "", nil, false, true, true},
// 2 old, 1 new healthy, 0 new unhealthy, 0 new unready, 1 error: should not change anything
{[]string{"1", "2"}, []string{"3"}, []string{}, 3, 2, unreadyErrorHandler, 3, "", fmt.Errorf("error"), false},
{[]string{"1", "2"}, []string{"3"}, []string{}, 3, 2, unreadyErrorHandler, 3, "", fmt.Errorf("error"), false, true, true},
// 2 old, 1 new healthy, 0 new unhealthy, 0 unready, remove an old one
{[]string{"1", "2"}, []string{"3"}, []string{}, 3, 2, readyHandler, 3, "1", nil, false},
{[]string{"1", "2"}, []string{"3"}, []string{}, 3, 2, readyHandler, 3, "1", nil, false, true, true},
// 2 old, 1 new healthy, 0 new unhealthy, 0 new unready, 1 error: should not change anything
{[]string{"1", "2"}, []string{"3"}, []string{}, 3, 2, terminateErrorHandler, 3, "", fmt.Errorf("unexpected error"), false},
{[]string{"1", "2"}, []string{"3"}, []string{}, 3, 2, terminateErrorHandler, 3, "", fmt.Errorf("unexpected error"), false, true, true},
// 2 old, 1 new healthy, 0 new unhealthy, 0 unready, successful terminate: remove an old one
{[]string{"1", "2"}, []string{"3"}, []string{}, 3, 2, terminateHandler, 3, "1", nil, false},
{[]string{"1", "2"}, []string{"3"}, []string{}, 3, 2, terminateHandler, 3, "1", nil, false, true, true},
}
hostnameMap := map[string]string{}
for i := 0; i < 20; i++ {
@@ -142,7 +144,7 @@ func TestCalculateAdjustment(t *testing.T) {
ec2Svc := &mockEc2Svc{
autodescribe: true,
}
- desired, terminate, err := calculateAdjustment(kubernetesEnabled, asg, ec2Svc, hostnameMap, tt.readiness, tt.originalDesired, tt.verbose)
+ desired, terminate, err := calculateAdjustment(kubernetesEnabled, asg, ec2Svc, hostnameMap, tt.readiness, tt.originalDesired, tt.verbose, tt.drain, tt.drainForce)
switch {
case (err == nil && tt.err != nil) || (err != nil && tt.err == nil) || (err != nil && tt.err != nil && !strings.HasPrefix(err.Error(), tt.err.Error())):
t.Errorf("%d: mismatched errors, actual then expected", i)
@@ -172,6 +174,8 @@ func TestAdjust(t *testing.T) {
canIncreaseMax bool
persistOriginalDesiredOnTag bool
verbose bool
+ drain bool
+ drainForce bool
}{
{
"2 asgs adjust first run",
@@ -194,6 +198,8 @@
false,
false,
false,
+ true,
+ true,
},
{
"2 asgs adjust in progress",
@@ -216,6 +222,8 @@
false,
false,
false,
+ true,
+ true,
},
{
"2 asgs adjust in progress with ROLLER_ORIGINAL_DESIRED_ON_TAG set to true",
@@ -238,6 +246,8 @@
false,
true,
false,
+ true,
+ true,
},
{
"2 asgs adjust complete",
@@ -260,6 +270,8 @@
false,
false,
false,
+ true,
+ true,
},
{
"2 asgs adjust increase max fail",
@@ -282,6 +294,8 @@
false,
false,
false,
+ true,
+ true,
},
{
"2 asgs adjust increase max succeed",
@@ -304,6 +318,8 @@
true,
false,
false,
+ true,
+ true,
},
}

@@ -375,7 +391,7 @@ func TestAdjust(t *testing.T) {
ks := k
newDesiredPtr[&ks] = v
}
- err := adjust(kubernetesEnabled, tt.asgs, ec2Svc, asgSvc, tt.handler, tt.originalDesired, tt.persistOriginalDesiredOnTag, tt.canIncreaseMax, tt.verbose)
+ err := adjust(kubernetesEnabled, tt.asgs, ec2Svc, asgSvc, tt.handler, tt.originalDesired, tt.persistOriginalDesiredOnTag, tt.canIncreaseMax, tt.verbose, tt.drain, tt.drainForce)
// what were our last calls to each?
switch {
case (err == nil && tt.err != nil) || (err != nil && tt.err == nil) || (err != nil && tt.err != nil && !strings.HasPrefix(err.Error(), tt.err.Error())):
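Note that every updated case pins `drain` and `drainForce` to `true`. A follow-up case for `TestCalculateAdjustment` exercising `drain=false` might look like this (hypothetical; since `testReadyHandler` ignores both flags, it would verify only the plumbing, not the skip behaviour in `kubernetes.go`):

```go
// 2 old, 1 new healthy, drain disabled: the old node should still be terminated
{[]string{"1", "2"}, []string{"3"}, []string{}, 3, 2, readyHandler, 3, "1", nil, false, false, true},
```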
