Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion pkg/cloud/aws/actuators/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -22,10 +22,11 @@ go_library(
"//vendor/github.com/pkg/errors:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/runtime:go_default_library",
"//vendor/k8s.io/client-go/util/retry:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/types:go_default_library",
"//vendor/k8s.io/klog/klogr:go_default_library",
"//vendor/sigs.k8s.io/cluster-api/pkg/apis/cluster/v1alpha1:go_default_library",
"//vendor/sigs.k8s.io/cluster-api/pkg/client/clientset_generated/clientset/typed/cluster/v1alpha1:go_default_library",
"//vendor/sigs.k8s.io/controller-runtime/pkg/patch:go_default_library",
"//vendor/sigs.k8s.io/yaml:go_default_library",
],
)
71 changes: 33 additions & 38 deletions pkg/cloud/aws/actuators/machine_scope.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,16 +17,18 @@ limitations under the License.
package actuators

import (
"encoding/json"
"fmt"

"github.com/go-logr/logr"
"github.com/pkg/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/client-go/util/retry"
"k8s.io/apimachinery/pkg/types"
"sigs.k8s.io/cluster-api-provider-aws/pkg/apis/awsprovider/v1alpha1"
clusterv1 "sigs.k8s.io/cluster-api/pkg/apis/cluster/v1alpha1"
client "sigs.k8s.io/cluster-api/pkg/client/clientset_generated/clientset/typed/cluster/v1alpha1"
"sigs.k8s.io/controller-runtime/pkg/patch"
"sigs.k8s.io/yaml"
)

Expand Down Expand Up @@ -69,6 +71,7 @@ func NewMachineScope(params MachineScopeParams) (*MachineScope, error) {
return &MachineScope{
Scope: scope,
Machine: params.Machine,
MachineCopy: params.Machine.DeepCopy(),
MachineClient: machineClient,
MachineConfig: machineConfig,
MachineStatus: machineStatus,
Expand All @@ -79,7 +82,9 @@ func NewMachineScope(params MachineScopeParams) (*MachineScope, error) {
type MachineScope struct {
*Scope

Machine *clusterv1.Machine
Machine *clusterv1.Machine
// MachineCopy is used to generate a patch diff at the end of the scope's lifecycle.
MachineCopy *clusterv1.Machine
MachineClient client.MachineInterface
MachineConfig *v1alpha1.AWSMachineProviderSpec
MachineStatus *v1alpha1.AWSMachineProviderStatus
Expand Down Expand Up @@ -120,6 +125,7 @@ func (m *MachineScope) Close() {
if m.MachineClient == nil {
return
}

ext, err := v1alpha1.EncodeMachineSpec(m.MachineConfig)
if err != nil {
m.Error(err, "failed to encode machine spec")
Expand All @@ -131,43 +137,32 @@ func (m *MachineScope) Close() {
return
}

// Sometimes when an object gets updated the local copy is out of date with
// the copy stored on the server. In the case of cluster-api this will
// always be because the local copy will have an out-of-date resource
// version. This is because something else has updated the resource version
// on the server and thus the local copy is behind.
// This retry function will update the resource version if the local copy is
// behind and try again.
// This retry function will *only* update the resource version. If some
// other data has changed then there is a problem. Nothing else should be
// updating the object and this function will (correctly) fail.
if err := retry.RetryOnConflict(retry.DefaultBackoff, func() error {
m.V(2).Info("Updating machine", "machine-resource-version", m.Machine.ResourceVersion, "node-ref", m.Machine.Status.NodeRef)
m.Machine.Spec.ProviderSpec.Value = ext
m.V(6).Info("Machine status before update", "machine-status", m.Machine.Status)
latest, err := m.MachineClient.Update(m.Machine)
if err != nil {
m.V(3).Info("Machine resource version is out of date")
// Fetch and update the latest resource version
newestMachine, err2 := m.MachineClient.Get(m.Machine.Name, metav1.GetOptions{})
if err2 != nil {
m.Error(err2, "failed to fetch latest Machine")
return err2
}
m.Machine.ResourceVersion = newestMachine.ResourceVersion
return err
}
m.V(5).Info("Latest machine", "machine", latest)
// The machine may have status (nodeRef) that the latest doesn't yet
// have, however some timestamps may be rolled back a bit with this copy.
m.Machine.Status.DeepCopyInto(&latest.Status)
latest.Status.ProviderStatus = status
_, err = m.MachineClient.UpdateStatus(latest)
return err
}); err != nil {
m.Error(err, "error retrying on conflict")
m.Machine.Spec.ProviderSpec.Value = ext

p, err := patch.NewJSONPatch(m.MachineCopy, m.Machine)
if err != nil {
m.Error(err, "failed to create new JSONPatch for machine")
return
}
pb, err := json.MarshalIndent(p, "", " ")
if err != nil {
m.Error(err, "failed to json marshal patch for machine")
return
}

updated, err := m.MachineClient.Patch(m.Machine.Name, types.JSONPatchType, pb)
if err != nil {
m.Error(err, "failed to patch machine")
return
}
m.V(2).Info("Successfully updated machine")

updated.Status.ProviderStatus = status
if _, err := m.MachineClient.UpdateStatus(updated); err != nil {
m.Error(err, "failed to update cluster status")
return
}

m.V(1).Info("Updated machine")
}

// MachineConfigFromProviderSpec tries to decode the JSON-encoded spec, falling back on getting a MachineClass if the value is absent.
Expand Down
63 changes: 35 additions & 28 deletions pkg/cloud/aws/actuators/scope.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,18 +17,20 @@ limitations under the License.
package actuators

import (
"encoding/json"

"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/aws/session"
"github.com/aws/aws-sdk-go/service/ec2"
"github.com/aws/aws-sdk-go/service/elb"
"github.com/go-logr/logr"
"github.com/pkg/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/util/retry"
"k8s.io/apimachinery/pkg/types"
"k8s.io/klog/klogr"
"sigs.k8s.io/cluster-api-provider-aws/pkg/apis/awsprovider/v1alpha1"
clusterv1 "sigs.k8s.io/cluster-api/pkg/apis/cluster/v1alpha1"
client "sigs.k8s.io/cluster-api/pkg/client/clientset_generated/clientset/typed/cluster/v1alpha1"
"sigs.k8s.io/controller-runtime/pkg/patch"
)

// ScopeParams defines the input parameters used to create a new Scope.
Expand Down Expand Up @@ -81,6 +83,7 @@ func NewScope(params ScopeParams) (*Scope, error) {
return &Scope{
AWSClients: params.AWSClients,
Cluster: params.Cluster,
ClusterCopy: params.Cluster.DeepCopy(),
ClusterClient: clusterClient,
ClusterConfig: clusterConfig,
ClusterStatus: clusterStatus,
Expand All @@ -91,7 +94,9 @@ func NewScope(params ScopeParams) (*Scope, error) {
// Scope defines the basic context for an actuator to operate upon.
type Scope struct {
AWSClients
Cluster *clusterv1.Cluster
Cluster *clusterv1.Cluster
// ClusterCopy is used for patch generation at the end of the scope's lifecycle.
ClusterCopy *clusterv1.Cluster
ClusterClient client.ClusterInterface
ClusterConfig *v1alpha1.AWSClusterProviderSpec
ClusterStatus *v1alpha1.AWSClusterProviderStatus
Expand Down Expand Up @@ -138,6 +143,7 @@ func (s *Scope) Close() {
if s.ClusterClient == nil {
return
}

ext, err := v1alpha1.EncodeClusterSpec(s.ClusterConfig)
if err != nil {
s.Error(err, "failed encoding cluster spec")
Expand All @@ -149,30 +155,31 @@ func (s *Scope) Close() {
return
}

// Update the resource version and try again if there is a conflict saving the object.
if err := retry.RetryOnConflict(retry.DefaultBackoff, func() error {
s.V(2).Info("Updating cluster", "cluster-resource-version", s.Cluster.ResourceVersion)
s.Cluster.Spec.ProviderSpec.Value = ext
s.V(6).Info("Cluster status before update", "cluster-status", s.Cluster.Status)
latest, err := s.ClusterClient.Update(s.Cluster)
if err != nil {
s.V(3).Info("Cluster resource version is out of date")
// Fetch and update the latest resource version
newestCluster, err2 := s.ClusterClient.Get(s.Cluster.Name, metav1.GetOptions{})
if err2 != nil {
s.Error(err2, "failed to fetch latest cluster")
return err2
}
s.Cluster.ResourceVersion = newestCluster.ResourceVersion
return err
}
s.V(5).Info("Latest cluster status", "cluster-status", latest.Status)
s.Cluster.Status.DeepCopyInto(&latest.Status)
latest.Status.ProviderStatus = status
_, err = s.ClusterClient.UpdateStatus(latest)
return err
}); err != nil {
s.Error(err, "failed to retry on conflict")
s.Cluster.Spec.ProviderSpec.Value = ext

// Build a patch and marshal that patch to something the client will understand.
p, err := patch.NewJSONPatch(s.ClusterCopy, s.Cluster)
if err != nil {
s.Error(err, "failed to create new JSONPatch")
return
}

pb, err := json.MarshalIndent(p, "", " ")
if err != nil {
s.Error(err, "failed to json marshal patch")
return
}

updated, err := s.ClusterClient.Patch(s.Cluster.Name, types.JSONPatchType, pb)
if err != nil {
s.Error(err, "failed to patch cluster")
return
}

updated.Status.ProviderStatus = status
if _, err := s.ClusterClient.UpdateStatus(updated); err != nil {
s.Error(err, "failed to update cluster status")
return
}
s.V(2).Info("Successfully updated cluster")
s.V(1).Info("Updated cluster")
}