From d0d021ffd5bc5ae79301d8e59456ba34e0f7b207 Mon Sep 17 00:00:00 2001
From: Helgi Þormar Þorbjörnsson
Date: Thu, 6 May 2021 16:04:38 -0700
Subject: [PATCH] Add the ability to control if the roller drains and if it
 uses force

Introduces ROLLER_DRAIN and ROLLER_DRAIN_FORCE, both defaulting to true
to keep existing behaviour for compatibility.

Fixes #52
---
 README.md               |  2 ++
 configs.go              |  4 +++-
 kubernetes.go           | 14 +++++++++----
 main.go                 |  6 +++++-
 readiness.go            |  2 +-
 roller.go               |  8 ++++----
 roller_internal_test.go | 44 ++++++++++++++++++++++++++++--------------
 7 files changed, 55 insertions(+), 25 deletions(-)

diff --git a/README.md b/README.md
index 3760104..1dfb09b 100644
--- a/README.md
+++ b/README.md
@@ -228,6 +228,8 @@ ASG Roller takes its configuration via environment variables. All environment va
 
 * `ROLLER_ASG` [`string`, required]: comma-separated list of auto-scaling groups that should be managed.
 * `ROLLER_KUBERNETES` [`bool`, default: `true`]: If set to `true`, will check if a new node is ready via-a-vis Kubernetes before declaring it "ready", and will drain an old node before eliminating it. Defaults to `true` when running in Kubernetes as a pod, `false` otherwise.
+* `ROLLER_DRAIN` [`bool`, default: `true`]: If set to `true`, will handle draining of pods and other Kubernetes resources. Consider setting this to `false` if your distribution has a built-in drain on terminate.
+* `ROLLER_DRAIN_FORCE` [`bool`, default: `true`]: If set to `true`, the drain will force-delete Kubernetes resources that violate PDBs or grace periods.
 * `ROLLER_IGNORE_DAEMONSETS` [`bool`, default: `true`]: If set to `false`, will not reclaim a node until there are no DaemonSets running on the node; if set to `true` (default), will reclaim node when all regular pods are drained off, but will ignore the presence of DaemonSets, which should be present on every node anyways. Normally, you want this set to `true`.
 * `ROLLER_DELETE_LOCAL_DATA` [`bool`, default: `false`]: If set to `false` (default), will not reclaim a node until there are no pods with [emptyDir](https://kubernetes.io/docs/concepts/storage/volumes/#emptydir) running on the node; if set to `true`, will continue to terminate the pod and delete the local data before reclaiming the node. The default is `false` to maintain backward compatibility.
 * `ROLLER_INTERVAL` [`time.Duration`, default: `30s`]: Time between roller runs. Takes time duration such as 10s, 10m, 10d
diff --git a/configs.go b/configs.go
index 2401489..8d7ce11 100644
--- a/configs.go
+++ b/configs.go
@@ -6,8 +6,10 @@ import "time"
 type Configs struct {
 	Interval             time.Duration `env:"ROLLER_INTERVAL" envDefault:"30s"`
 	CheckDelay           int           `env:"ROLLER_CHECK_DELAY" envDefault:"30"`
+	Drain                bool          `env:"ROLLER_DRAIN" envDefault:"true"`
+	DrainForce           bool          `env:"ROLLER_DRAIN_FORCE" envDefault:"true"`
 	IncreaseMax          bool          `env:"ROLLER_CAN_INCREASE_MAX" envDefault:"false"`
-	IgnoreDaemonSets     bool          `env:"ROLLER_IGNORE_DAEMONSETS" envDefault:"false"`
+	IgnoreDaemonSets     bool          `env:"ROLLER_IGNORE_DAEMONSETS" envDefault:"true"`
 	DeleteLocalData      bool          `env:"ROLLER_DELETE_LOCAL_DATA" envDefault:"false"`
 	OriginalDesiredOnTag bool          `env:"ROLLER_ORIGINAL_DESIRED_ON_TAG" envDefault:"false"`
 	ASGS                 []string      `env:"ROLLER_ASG,required" envSeparator:","`
diff --git a/kubernetes.go b/kubernetes.go
index cd8465e..fc7b458 100644
--- a/kubernetes.go
+++ b/kubernetes.go
@@ -6,7 +6,7 @@ import (
 	"os"
 	"path/filepath"
 
-	drain "github.com/openshift/kubernetes-drain"
+	drainer "github.com/openshift/kubernetes-drain"
 	corev1 "k8s.io/api/core/v1"
 	v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 	"k8s.io/client-go/kubernetes"
@@ -57,22 +57,28 @@ func (k *kubernetesReadiness) getUnreadyCount(hostnames []string, ids []string)
 	}
 	return unReadyCount, nil
 }
-func (k *kubernetesReadiness) prepareTermination(hostnames []string, ids []string) error {
+func (k *kubernetesReadiness) prepareTermination(hostnames []string, ids []string, drain, drainForce bool) error {
 	// get the node reference - first need the hostname
 	var (
 		node *corev1.Node
 		err  error
 	)
+
+	// Skip drain
+	if !drain {
+		return nil
+	}
+
 	for _, h := range hostnames {
 		node, err = k.clientset.CoreV1().Nodes().Get(h, v1.GetOptions{})
 		if err != nil {
 			return fmt.Errorf("Unexpected error getting kubernetes node %s: %v", h, err)
 		}
 		// set options and drain nodes
-		err = drain.Drain(k.clientset, []*corev1.Node{node}, &drain.DrainOptions{
+		err = drainer.Drain(k.clientset, []*corev1.Node{node}, &drainer.DrainOptions{
 			IgnoreDaemonsets:   k.ignoreDaemonSets,
 			GracePeriodSeconds: -1,
-			Force:              true,
+			Force:              drainForce,
 			DeleteLocalData:    k.deleteLocalData,
 		})
 		if err != nil {
diff --git a/main.go b/main.go
index 95fc2e1..1047b6c 100644
--- a/main.go
+++ b/main.go
@@ -29,7 +29,11 @@ func main() {
 
 	// infinite loop
 	for {
-		err := adjust(configs.KubernetesEnabled, configs.ASGS, ec2Svc, asgSvc, readinessHandler, originalDesired, configs.OriginalDesiredOnTag, configs.IncreaseMax, configs.Verbose)
+		err := adjust(
+			configs.KubernetesEnabled, configs.ASGS, ec2Svc, asgSvc,
+			readinessHandler, originalDesired, configs.OriginalDesiredOnTag,
+			configs.IncreaseMax, configs.Verbose, configs.Drain, configs.DrainForce,
+		)
 		if err != nil {
 			log.Printf("Error adjusting AutoScaling Groups: %v", err)
 		}
diff --git a/readiness.go b/readiness.go
index 4008465..de5eb48 100644
--- a/readiness.go
+++ b/readiness.go
@@ -2,5 +2,5 @@ package main
 
 type readiness interface {
 	getUnreadyCount(hostnames []string, ids []string) (int, error)
-	prepareTermination(hostnames []string, ids []string) error
+	prepareTermination(hostnames []string, ids []string, drain, drainForce bool) error
 }
diff --git a/roller.go b/roller.go
index b5d79a5..986ff63 100644
--- a/roller.go
+++ b/roller.go
@@ -16,7 +16,7 @@ const (
 )
 
 // adjust runs a single adjustment in the loop to update an ASG in a rolling fashion to latest launch config
-func adjust(kubernetesEnabled bool, asgList []string, ec2Svc ec2iface.EC2API, asgSvc autoscalingiface.AutoScalingAPI, readinessHandler readiness, originalDesired map[string]int64, storeOriginalDesiredOnTag, canIncreaseMax, verbose bool) error {
+func adjust(kubernetesEnabled bool, asgList []string, ec2Svc ec2iface.EC2API, asgSvc autoscalingiface.AutoScalingAPI, readinessHandler readiness, originalDesired map[string]int64, storeOriginalDesiredOnTag, canIncreaseMax, verbose, drain, drainForce bool) error {
 	// get information on all of the groups
 	asgs, err := awsDescribeGroups(asgSvc, asgList)
 	if err != nil {
@@ -71,7 +71,7 @@ func adjust(kubernetesEnabled bool, asgList []string, ec2Svc ec2iface.EC2API, as
 
 	// keep keyed references to the ASGs
 	for _, asg := range asgMap {
-		newDesiredA, terminateID, err := calculateAdjustment(kubernetesEnabled, asg, ec2Svc, hostnameMap, readinessHandler, originalDesired[*asg.AutoScalingGroupName], verbose)
+		newDesiredA, terminateID, err := calculateAdjustment(kubernetesEnabled, asg, ec2Svc, hostnameMap, readinessHandler, originalDesired[*asg.AutoScalingGroupName], verbose, drain, drainForce)
 		log.Printf("[%v] desired: %d original: %d", p2v(asg.AutoScalingGroupName), newDesiredA, originalDesired[*asg.AutoScalingGroupName])
 		if err != nil {
 			log.Printf("[%v] error calculating adjustment - skipping: %v\n", p2v(asg.AutoScalingGroupName), err)
@@ -121,7 +121,7 @@ func ensureNoScaleDownDisabledAnnotation(kubernetesEnabled bool, ec2Svc ec2iface
 // what the new desired number of instances should be
 // ID of an instance to terminate, "" if none
 // error
-func calculateAdjustment(kubernetesEnabled bool, asg *autoscaling.Group, ec2Svc ec2iface.EC2API, hostnameMap map[string]string, readinessHandler readiness, originalDesired int64, verbose bool) (int64, string, error) {
+func calculateAdjustment(kubernetesEnabled bool, asg *autoscaling.Group, ec2Svc ec2iface.EC2API, hostnameMap map[string]string, readinessHandler readiness, originalDesired int64, verbose, drain, drainForce bool) (int64, string, error) {
 	desired := *asg.DesiredCapacity
 
 	// get instances with old launch config
@@ -209,7 +209,7 @@ func calculateAdjustment(kubernetesEnabled bool, asg *autoscaling.Group, ec2Svc 
 			err      error
 		)
 		hostname = hostnameMap[candidate]
-		err = readinessHandler.prepareTermination([]string{hostname}, []string{candidate})
+		err = readinessHandler.prepareTermination([]string{hostname}, []string{candidate}, drain, drainForce)
 		if err != nil {
 			return desired, "", fmt.Errorf("unexpected error readiness handler terminating node %s: %v", hostname, err)
 		}
diff --git a/roller_internal_test.go b/roller_internal_test.go
index 9b68418..c614c42 100644
--- a/roller_internal_test.go
+++ b/roller_internal_test.go
@@ -23,7 +23,7 @@ type testReadyHandler struct {
 func (t *testReadyHandler) getUnreadyCount(hostnames []string, ids []string) (int, error) {
 	return t.unreadyCount, t.unreadyError
 }
-func (t *testReadyHandler) prepareTermination(hostnames []string, ids []string) error {
+func (t *testReadyHandler) prepareTermination(hostnames []string, ids []string, drain, drainForce bool) error {
 	return t.terminateError
 }
 
@@ -71,30 +71,32 @@ func TestCalculateAdjustment(t *testing.T) {
 		targetTerminate string
 		err             error
 		verbose         bool
+		drain           bool
+		drainForce      bool
 	}{
 		// 1 old, 2 new healthy, 0 new unhealthy, should terminate old
-		{[]string{"1"}, []string{"2", "3"}, []string{}, 3, 2, nil, 3, "1", nil, false},
+		{[]string{"1"}, []string{"2", "3"}, []string{}, 3, 2, nil, 3, "1", nil, false, true, true},
 		// 0 old, 2 new healthy, 0 new unhealthy, should indicate end of process
-		{[]string{}, []string{"2", "3"}, []string{}, 2, 2, nil, 2, "", nil, false},
"3"}, []string{}, 2, 2, nil, 2, "", nil, false}, + {[]string{}, []string{"2", "3"}, []string{}, 2, 2, nil, 2, "", nil, false, true, true}, // 2 old, 0 new healthy, 0 new unhealthy, should indicate start of process - {[]string{"1", "2"}, []string{}, []string{}, 2, 2, nil, 3, "", nil, false}, + {[]string{"1", "2"}, []string{}, []string{}, 2, 2, nil, 3, "", nil, false, true, true}, // 2 old, 0 new healthy, 0 new unhealthy, started, should not do anything until new healthy one - {[]string{"1", "2"}, []string{}, []string{}, 3, 2, nil, 3, "", nil, false}, + {[]string{"1", "2"}, []string{}, []string{}, 3, 2, nil, 3, "", nil, false, true, true}, // 2 old, 1 new healthy, 0 new unhealthy, remove an old one - {[]string{"1", "2"}, []string{"3"}, []string{}, 3, 2, nil, 3, "1", nil, false}, + {[]string{"1", "2"}, []string{"3"}, []string{}, 3, 2, nil, 3, "1", nil, false, true, true}, // 2 old, 0 new healthy, 1 new unhealthy, started, should not do anything until new one is healthy - {[]string{"1", "2"}, []string{}, []string{"3"}, 3, 2, nil, 3, "", nil, false}, + {[]string{"1", "2"}, []string{}, []string{"3"}, 3, 2, nil, 3, "", nil, false, true, true}, // 2 old, 1 new healthy, 0 new unhealthy, 1 new unready, should not change anything - {[]string{"1", "2"}, []string{"3"}, []string{}, 3, 2, unreadyCountHandler, 3, "", nil, false}, + {[]string{"1", "2"}, []string{"3"}, []string{}, 3, 2, unreadyCountHandler, 3, "", nil, false, true, true}, // 2 old, 1 new healthy, 0 new unhealthy, 0 new unready, 1 error: should not change anything - {[]string{"1", "2"}, []string{"3"}, []string{}, 3, 2, unreadyErrorHandler, 3, "", fmt.Errorf("error"), false}, + {[]string{"1", "2"}, []string{"3"}, []string{}, 3, 2, unreadyErrorHandler, 3, "", fmt.Errorf("error"), false, true, true}, // 2 old, 1 new healthy, 0 new unhealthy, 0 unready, remove an old one - {[]string{"1", "2"}, []string{"3"}, []string{}, 3, 2, readyHandler, 3, "1", nil, false}, + {[]string{"1", "2"}, []string{"3"}, []string{}, 3, 2, readyHandler, 3, "1", nil, false, true, true}, // 2 old, 1 new healthy, 0 new unhealthy, 0 new unready, 1 error: should not change anything - {[]string{"1", "2"}, []string{"3"}, []string{}, 3, 2, terminateErrorHandler, 3, "", fmt.Errorf("unexpected error"), false}, + {[]string{"1", "2"}, []string{"3"}, []string{}, 3, 2, terminateErrorHandler, 3, "", fmt.Errorf("unexpected error"), false, true, true}, // 2 old, 1 new healthy, 0 new unhealthy, 0 unready, successful terminate: remove an old one - {[]string{"1", "2"}, []string{"3"}, []string{}, 3, 2, terminateHandler, 3, "1", nil, false}, + {[]string{"1", "2"}, []string{"3"}, []string{}, 3, 2, terminateHandler, 3, "1", nil, false, true, true}, } hostnameMap := map[string]string{} for i := 0; i < 20; i++ { @@ -142,7 +144,7 @@ func TestCalculateAdjustment(t *testing.T) { ec2Svc := &mockEc2Svc{ autodescribe: true, } - desired, terminate, err := calculateAdjustment(kubernetesEnabled, asg, ec2Svc, hostnameMap, tt.readiness, tt.originalDesired, tt.verbose) + desired, terminate, err := calculateAdjustment(kubernetesEnabled, asg, ec2Svc, hostnameMap, tt.readiness, tt.originalDesired, tt.verbose, tt.drain, tt.drainForce) switch { case (err == nil && tt.err != nil) || (err != nil && tt.err == nil) || (err != nil && tt.err != nil && !strings.HasPrefix(err.Error(), tt.err.Error())): t.Errorf("%d: mismatched errors, actual then expected", i) @@ -172,6 +174,8 @@ func TestAdjust(t *testing.T) { canIncreaseMax bool persistOriginalDesiredOnTag bool verbose bool + drain bool + drainForce bool }{ { "2 asgs 
adjust first run", @@ -194,6 +198,8 @@ func TestAdjust(t *testing.T) { false, false, false, + true, + true, }, { "2 asgs adjust in progress", @@ -216,6 +222,8 @@ func TestAdjust(t *testing.T) { false, false, false, + true, + true, }, { "2 asgs adjust in progress with ROLLER_ORIGINAL_DESIRED_ON_TAG set to true", @@ -238,6 +246,8 @@ func TestAdjust(t *testing.T) { false, true, false, + true, + true, }, { "2 asgs adjust complete", @@ -260,6 +270,8 @@ func TestAdjust(t *testing.T) { false, false, false, + true, + true, }, { "2 asgs adjust increase max fail", @@ -282,6 +294,8 @@ func TestAdjust(t *testing.T) { false, false, false, + true, + true, }, { "2 asgs adjust increase max succeed", @@ -304,6 +318,8 @@ func TestAdjust(t *testing.T) { true, false, false, + true, + true, }, } @@ -375,7 +391,7 @@ func TestAdjust(t *testing.T) { ks := k newDesiredPtr[&ks] = v } - err := adjust(kubernetesEnabled, tt.asgs, ec2Svc, asgSvc, tt.handler, tt.originalDesired, tt.persistOriginalDesiredOnTag, tt.canIncreaseMax, tt.verbose) + err := adjust(kubernetesEnabled, tt.asgs, ec2Svc, asgSvc, tt.handler, tt.originalDesired, tt.persistOriginalDesiredOnTag, tt.canIncreaseMax, tt.verbose, tt.drain, tt.drainForce) // what were our last calls to each? switch { case (err == nil && tt.err != nil) || (err != nil && tt.err == nil) || (err != nil && tt.err != nil && !strings.HasPrefix(err.Error(), tt.err.Error())):