Skip to content

Commit

Permalink
Add cluster update support (#14)
Browse files Browse the repository at this point in the history
Description of changes:
Adds hooks for checking cluster status, and gating update or delete operations unless cluster is in `ACTIVE` status. Adds custom update logic for the `UpdateClusterVersion` and `UpdateClusterConfig` operations. 

Note:
Update operations are asynchronous for EKS clusters, but can only be made one at a time. At the end of any update call, we forcefully set the synced condition to false and requeue with a 15 second duration. This gives the server time to receive and start the async update request, start the update and set the cluster status to `UPDATING` before we requeue and call read one.

By submitting this pull request, I confirm that my contribution is made under
the terms of the Apache 2.0 license.
  • Loading branch information
RedbackThomson authored Jan 7, 2022
1 parent 547bf9f commit 955d10c
Show file tree
Hide file tree
Showing 13 changed files with 515 additions and 21 deletions.
6 changes: 3 additions & 3 deletions apis/v1alpha1/ack-generate-metadata.yaml
Original file line number Diff line number Diff line change
@@ -1,13 +1,13 @@
ack_generate_info:
build_date: "2021-11-17T18:42:03Z"
build_date: "2022-01-05T23:22:05Z"
build_hash: 966e9a9ac6dfb4bbc2d3ded1972ce2b706391d44
go_version: go1.17.1
version: v0.15.2
api_directory_checksum: 4c0c0a0fc33dd3b9ac0d5c6adbce3eb818ec0502
api_directory_checksum: 18423a5aa90c547e267c460f25ce08e5c0b4aaa6
api_version: v1alpha1
aws_sdk_go_version: v1.38.67
generator_config_info:
file_checksum: 4878e381dd85bb88d0691255f3359da6775fc169
file_checksum: f94cfb6bf61dca6a809c8d7cf0dfc1edc561add6
original_file_name: generator.yaml
last_modification:
reason: API generation
11 changes: 11 additions & 0 deletions apis/v1alpha1/generator.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,17 @@ resources:
- MissingAction
- MissingParameter
- ValidationError
hooks:
sdk_create_post_set_output:
template_path: hooks/cluster/sdk_create_post_set_output.go.tpl
sdk_read_one_post_set_output:
template_path: hooks/cluster/sdk_read_one_post_set_output.go.tpl
sdk_delete_pre_build_request:
template_path: hooks/cluster/sdk_delete_pre_build_request.go.tpl
sdk_file_end:
template_path: hooks/cluster/sdk_file_end.go.tpl
update_operation:
custom_method_name: customUpdate
FargateProfile:
renames:
operations:
Expand Down
1 change: 1 addition & 0 deletions apis/v1alpha1/zz_generated.deepcopy.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

11 changes: 11 additions & 0 deletions generator.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,17 @@ resources:
- MissingAction
- MissingParameter
- ValidationError
hooks:
sdk_create_post_set_output:
template_path: hooks/cluster/sdk_create_post_set_output.go.tpl
sdk_read_one_post_set_output:
template_path: hooks/cluster/sdk_read_one_post_set_output.go.tpl
sdk_delete_pre_build_request:
template_path: hooks/cluster/sdk_delete_pre_build_request.go.tpl
sdk_file_end:
template_path: hooks/cluster/sdk_file_end.go.tpl
update_operation:
custom_method_name: customUpdate
FargateProfile:
renames:
operations:
Expand Down
278 changes: 278 additions & 0 deletions pkg/resource/cluster/hook.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,278 @@
// Copyright Amazon.com Inc. or its affiliates. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License"). You may
// not use this file except in compliance with the License. A copy of the
// License is located at
//
// http://aws.amazon.com/apache2.0/
//
// or in the "license" file accompanying this file. This file is distributed
// on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
// express or implied. See the License for the specific language governing
// permissions and limitations under the License.

package cluster

import (
"context"
"errors"
"fmt"
"time"

ackcompare "github.com/aws-controllers-k8s/runtime/pkg/compare"
ackcondition "github.com/aws-controllers-k8s/runtime/pkg/condition"
ackerr "github.com/aws-controllers-k8s/runtime/pkg/errors"
ackrequeue "github.com/aws-controllers-k8s/runtime/pkg/requeue"
ackrtlog "github.com/aws-controllers-k8s/runtime/pkg/runtime/log"
svcsdk "github.com/aws/aws-sdk-go/service/eks"
corev1 "k8s.io/api/core/v1"
)

const (
LoggingNoChangesError = "No changes needed for the logging config provided"
)

// Taken from the list of cluster statuses on the boto3 documentation
// https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/eks.html#EKS.Client.describe_cluster
const (
StatusCreating = "CREATING"
StatusActive = "ACTIVE"
StatusDeleting = "DELETING"
StatusFailed = "FAILED"
StatusUpdating = "UPDATING"
StatusPending = "PENDING"
)

var (
// TerminalStatuses are the status strings that are terminal states for a
// cluster.
TerminalStatuses = []string{
StatusDeleting,
StatusFailed,
}
)

var (
requeueWaitWhileDeleting = ackrequeue.NeededAfter(
errors.New(
fmt.Sprintf("Cluster in '%s' state, cannot be modified or deleted.", StatusDeleting),
),
ackrequeue.DefaultRequeueAfterDuration,
)
RequeueAfterUpdateDuration = 15 * time.Second
)

// requeueWaitUntilCanModify returns a `ackrequeue.RequeueNeededAfter` struct
// explaining the cluster cannot be modified until it reaches an active status.
func requeueWaitUntilCanModify(r *resource) *ackrequeue.RequeueNeededAfter {
if r.ko.Status.Status == nil {
return nil
}
status := *r.ko.Status.Status
msg := fmt.Sprintf(
"Cluster in '%s' state, cannot be modified until '%s'.",
status, StatusActive,
)
return ackrequeue.NeededAfter(
errors.New(msg),
ackrequeue.DefaultRequeueAfterDuration,
)
}

// requeueAfterAsyncUpdate returns a `ackrequeue.RequeueNeededAfter` struct
// explaining the cluster cannot be modified until after the asynchronous update
// has (first, started and then) completed and the cluster reaches an active
// status.
func requeueAfterAsyncUpdate() *ackrequeue.RequeueNeededAfter {
msg := fmt.Sprintf(
"Cluster has started asynchronously updating, cannot be modified until '%s'.",
StatusActive,
)
return ackrequeue.NeededAfter(
errors.New(msg),
RequeueAfterUpdateDuration,
)
}

// clusterHasTerminalStatus returns whether the supplied cluster is in a
// terminal state
func clusterHasTerminalStatus(r *resource) bool {
if r.ko.Status.Status == nil {
return false
}
cs := *r.ko.Status.Status
for _, s := range TerminalStatuses {
if cs == s {
return true
}
}
return false
}

// clusterActive returns true if the supplied cluster is in an active status
func clusterActive(r *resource) bool {
if r.ko.Status.Status == nil {
return false
}
cs := *r.ko.Status.Status
return cs == StatusActive
}

// clusterCreating returns true if the supplied cluster is in the process of
// being created
func clusterCreating(r *resource) bool {
if r.ko.Status.Status == nil {
return false
}
cs := *r.ko.Status.Status
return cs == StatusCreating
}

// clusterDeleting returns true if the supplied cluster is in the process of
// being deleted
func clusterDeleting(r *resource) bool {
if r.ko.Status.Status == nil {
return false
}
cs := *r.ko.Status.Status
return cs == StatusDeleting
}

// returnClusterUpdating will set synced to false on the resource and
// return an async requeue error to signify that the resource should be
// forcefully requeued in order to pick up the 'UPDATING' status.
func returnClusterUpdating(r *resource) (*resource, error) {
msg := "Cluster is currently being updated"
ackcondition.SetSynced(r, corev1.ConditionFalse, &msg, nil)
return r, requeueAfterAsyncUpdate()
}

func (rm *resourceManager) customUpdate(
ctx context.Context,
desired *resource,
latest *resource,
delta *ackcompare.Delta,
) (updated *resource, err error) {
rlog := ackrtlog.FromContext(ctx)
exit := rlog.Trace("rm.customUpdate")
defer exit(err)

if clusterDeleting(latest) {
msg := "Cluster is currently being deleted"
ackcondition.SetSynced(desired, corev1.ConditionFalse, &msg, nil)
return desired, requeueWaitWhileDeleting
}
if !clusterActive(latest) {
msg := "Cluster is in '" + *latest.ko.Status.Status + "' status"
ackcondition.SetSynced(desired, corev1.ConditionFalse, &msg, nil)
if clusterHasTerminalStatus(latest) {
ackcondition.SetTerminal(desired, corev1.ConditionTrue, &msg, nil)
return desired, nil
}
return desired, requeueWaitUntilCanModify(latest)
}

// Merge in the information we read from the API call above to the copy of
// the original Kubernetes object we passed to the function
ko := desired.ko.DeepCopy()

// None of these methods modify the status, so we should return the latest
// status as given by the ReadOne
ko.Status = latest.ko.Status

if delta.DifferentAt("Spec.Logging") {
if err := rm.updateConfigLogging(ctx, desired); err != nil {
awserr, ok := ackerr.AWSError(err)

// The API responds with an error if there were no changes applied
if !ok || awserr.Message() != LoggingNoChangesError {
return nil, err
}
}
return returnClusterUpdating(desired)
}
if delta.DifferentAt("Spec.ResourcesVPCConfig") {
if err := rm.updateConfigResourcesVPCConfig(ctx, desired); err != nil {
return nil, err
}
return returnClusterUpdating(desired)
}
if delta.DifferentAt("Spec.Version") {
if err := rm.updateVersion(ctx, desired); err != nil {
return nil, err
}
return returnClusterUpdating(desired)
}

rm.setStatusDefaults(ko)
return &resource{ko}, nil
}

func (rm *resourceManager) updateVersion(
ctx context.Context,
r *resource,
) (err error) {
rlog := ackrtlog.FromContext(ctx)
exit := rlog.Trace("rm.updateVersion")
defer exit(err)
input := &svcsdk.UpdateClusterVersionInput{
Name: r.ko.Spec.Name,
Version: r.ko.Spec.Version,
}

_, err = rm.sdkapi.UpdateClusterVersionWithContext(ctx, input)
rm.metrics.RecordAPICall("UPDATE", "UpdateClusterVersion", err)
if err != nil {
return err
}

return nil
}

func (rm *resourceManager) updateConfigLogging(
ctx context.Context,
r *resource,
) (err error) {
rlog := ackrtlog.FromContext(ctx)
exit := rlog.Trace("rm.updateConfigLogging")
defer exit(err)
input := &svcsdk.UpdateClusterConfigInput{
Name: r.ko.Spec.Name,
Logging: rm.newLogging(r),
}

_, err = rm.sdkapi.UpdateClusterConfigWithContext(ctx, input)
rm.metrics.RecordAPICall("UPDATE", "UpdateClusterConfig", err)
if err != nil {
return err
}

return nil
}

func (rm *resourceManager) updateConfigResourcesVPCConfig(
ctx context.Context,
r *resource,
) (err error) {
rlog := ackrtlog.FromContext(ctx)
exit := rlog.Trace("rm.updateConfigResourcesVPCConfig")
defer exit(err)
input := &svcsdk.UpdateClusterConfigInput{
Name: r.ko.Spec.Name,
ResourcesVpcConfig: rm.newVpcConfigRequest(r),
}

// From the EKS documentation:
// "You can't update the subnets or security group IDs for an existing
// cluster."
input.ResourcesVpcConfig.SetSubnetIds(nil)
input.ResourcesVpcConfig.SetSecurityGroupIds(nil)

_, err = rm.sdkapi.UpdateClusterConfigWithContext(ctx, input)
rm.metrics.RecordAPICall("UPDATE", "UpdateClusterConfig", err)
if err != nil {
return err
}

return nil
}
Loading

0 comments on commit 955d10c

Please sign in to comment.