Implements Save and Restore. Save writes objects from the target cluster
and namespace to disk in a configured directory. Restore rebuilds the
object graph from the files in the configured directory.
Enables a reboot / reload workflow.
jpmcb committed Jun 9, 2021
1 parent 5a09f69 commit e9e6c9d
Showing 6 changed files with 632 additions and 1 deletion.
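A minimal sketch of how a caller might drive the new interface methods added below; the SaveOptions and RestoreOptions field names here are assumptions for illustration, since their definitions are not part of this diff:

	package main

	import (
		"log"

		clusterctlclient "sigs.k8s.io/cluster-api/cmd/clusterctl/client"
	)

	func main() {
		// Build a clusterctl client with the default configuration.
		c, err := clusterctlclient.New("")
		if err != nil {
			log.Fatal(err)
		}

		// Save: pause reconciliation and write the objects in "my-ns" to disk.
		if err := c.Save(clusterctlclient.SaveOptions{
			Namespace: "my-ns",    // assumed field name
			Directory: "./backup", // assumed field name
		}); err != nil {
			log.Fatal(err)
		}

		// Restore: rebuild the object graph from the saved files and resume reconciliation.
		if err := c.Restore(clusterctlclient.RestoreOptions{
			Namespace: "my-ns",    // assumed field name
			Glob:      "my-ns",    // assumed: selects saved files whose names contain this string
			Directory: "./backup", // assumed field name
		}); err != nil {
			log.Fatal(err)
		}
	}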
6 changes: 6 additions & 0 deletions cmd/clusterctl/client/client.go
@@ -50,6 +50,12 @@ type Client interface {
// Move moves all the Cluster API objects existing in a namespace (or from all the namespaces if empty) to a target management cluster.
Move(options MoveOptions) error

// Save saves all the Cluster API objects existing in a namespace (or from all the namespaces if empty) to a directory on disk.
Save(options SaveOptions) error

// Restore restores all the Cluster API objects persisted in a configured directory, selected by a glob, to a target management cluster.
Restore(options RestoreOptions) error

// PlanUpgrade returns a set of suggested Upgrade plans for the cluster, and more specifically:
// - Each management group gets separated upgrade plans.
// - For each management group, an upgrade plan is generated for each API Version of Cluster API (contract) available, e.g.
8 changes: 8 additions & 0 deletions cmd/clusterctl/client/client_test.go
@@ -102,6 +102,14 @@ func (f fakeClient) Move(options MoveOptions) error {
return f.internalClient.Move(options)
}

func (f fakeClient) Save(options SaveOptions) error {
return f.internalClient.Save(options)
}

func (f fakeClient) Restore(options RestoreOptions) error {
return f.internalClient.Restore(options)
}

func (f fakeClient) PlanUpgrade(options PlanUpgradeOptions) ([]UpgradePlan, error) {
return f.internalClient.PlanUpgrade(options)
}
316 changes: 316 additions & 0 deletions cmd/clusterctl/client/cluster/mover.go
@@ -18,6 +18,9 @@ package cluster

import (
"fmt"
"io/ioutil"
"path/filepath"
"regexp"

"github.com/pkg/errors"
corev1 "k8s.io/api/core/v1"
@@ -30,13 +33,18 @@ import (
"k8s.io/apimachinery/pkg/util/version"
clusterv1 "sigs.k8s.io/cluster-api/api/v1alpha3"
logf "sigs.k8s.io/cluster-api/cmd/clusterctl/log"
"sigs.k8s.io/cluster-api/util/yaml"
"sigs.k8s.io/controller-runtime/pkg/client"
)

// ObjectMover defines methods for moving Cluster API objects to another management cluster.
type ObjectMover interface {
// Move moves all the Cluster API objects existing in a namespace (or from all the namespaces if empty) to a target management cluster.
Move(namespace string, toCluster Client, dryRun bool) error
// Save saves all the Cluster API objects existing in a namespace (or from all the namespaces if empty) to a directory on disk.
Save(namespace string, directory string) error
// Restore restores all the Cluster API objects persisted in a configured directory, selected by a glob, to a target management cluster.
Restore(namespace string, glob string, directory string) error
}

// objectMover implements the ObjectMover interface.
@@ -105,6 +113,106 @@ func (o *objectMover) Move(namespace string, toCluster Client, dryRun bool) error {
return nil
}

func (o *objectMover) Save(namespace string, directory string) error {
log := logf.Log
log.Info("Performing save...")

objectGraph := newObjectGraph(o.fromProxy)

// Gets all the types defined by the CRDs installed by clusterctl plus the ConfigMap/Secret core types.
err := objectGraph.getDiscoveryTypes()
if err != nil {
return err
}

// Discovers the object graph for the selected types:
// - Nodes are defined by the Kubernetes objects (Clusters, Machines etc.) identified during the discovery process.
// - Edges are derived from the OwnerReferences between nodes.
if err := objectGraph.Discovery(namespace); err != nil {
return err
}

// Checks if Cluster API has already completed the provisioning of the infrastructure for the objects involved in the save operation.
// This is required because if the infrastructure is provisioned, then we can reasonably assume that the objects we are saving are
// not currently waiting for long-running reconciliation loops, and so we can safely rely on the pause field on the Cluster object
// for blocking any further object reconciliation on the source objects.
if err := o.checkProvisioningCompleted(objectGraph); err != nil {
return err
}

// Check whether any nodes are not included in the GVKs considered for the save.
objectGraph.checkVirtualNode()

if err := o.save(objectGraph, directory); err != nil {
return err
}

return nil
}

func (o *objectMover) Restore(namespace string, glob string, directory string) error {
log := logf.Log
log.Info("Performing restore...")

objectGraph := newObjectGraph(o.fromProxy)

// Rebuild the object graph to restore from the files in the configured directory that match the glob.
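// The compiled pattern matches any file name that contains the glob string,
// e.g. a glob of "my-cluster" matches the saved file "Cluster_my-cluster_default".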
objectsMatcher := regexp.MustCompile(`^.*` + glob + `.*`)

files, err := filepath.Glob(directory + "/*")
if err != nil {
return err
}

var rawYamls [][]byte
for _, objectFile := range files {
if objectsMatcher.MatchString(objectFile) {
byObj, err := ioutil.ReadFile(objectFile)
if err != nil {
return err
}

rawYamls = append(rawYamls, byObj)
}
}

processedYamls := yaml.JoinYaml(rawYamls...)

objs, err := yaml.ToUnstructured(processedYamls)
if err != nil {
return err
}

// Gets all the types defined by the CRDs provided by the files plus the ConfigMap/Secret core types.
err = objectGraph.getDiscoveryFileTypes(objs)
if err != nil {
return err
}

for i := range objs {
// Take the address of the slice element (not the loop variable) so each node keeps a distinct object.
objectGraph.addObj(&objs[i])
}

// Completes the graph by searching for soft ownership relations such as secrets linked to the cluster
// by a naming convention (without any explicit OwnerReference).
objectGraph.setSoftOwnership()

// Completes the graph by setting for each node the list of Clusters the node belongs to.
objectGraph.setClusterTenants()

// Completes the graph by setting for each node the list of ClusterResourceSets the node belongs to.
objectGraph.setCRSTenants()

// Check whether any nodes are not included in the GVKs considered for the restore.
objectGraph.checkVirtualNode()

if err := o.restore(objectGraph, o.fromProxy); err != nil {
return err
}

return nil
}

func newObjectMover(fromProxy Proxy, fromProviderInventory InventoryClient) *objectMover {
return &objectMover{
fromProxy: fromProxy,
@@ -256,6 +364,71 @@ func (o *objectMover) move(graph *objectGraph, toProxy Proxy) error {
return nil
}

func (o *objectMover) save(graph *objectGraph, directory string) error {
log := logf.Log

clusters := graph.getClusters()
log.Info("Moving Cluster API objects", "Clusters", len(clusters))

// Sets the pause field on the Cluster object in the source management cluster, so the controllers stop reconciling it.
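// (Concretely, setClusterPause toggles Cluster.Spec.Paused on each discovered Cluster.)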
log.V(1).Info("Pausing the source cluster")
if err := setClusterPause(o.fromProxy, clusters, true, o.dryRun); err != nil {
return err
}

// Define the save sequence by processing the ownerReference chain, so we ensure that a Kubernetes object is saved only after its owners.
// The sequence is based on object graph nodes, each one representing a Kubernetes object; nodes are grouped, so bulks of nodes can be processed in parallel. e.g.
// - All the Clusters should be saved first (group 1, processed in parallel)
// - All the MachineDeployments should be saved second (group 2, processed in parallel)
// - then all the MachineSets, then all the Machines, etc.
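// For example, an ownership chain Cluster <- MachineDeployment <- MachineSet <- Machine yields the
// groups [all Clusters], [all MachineDeployments], [all MachineSets], [all Machines], written in that order.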
moveSequence := getMoveSequence(graph)

// Save all objects group by group, so owners are always written to disk before the objects that reference them.
log.Info("Saving objects to the configured directory")
for groupIndex := 0; groupIndex < len(moveSequence.groups); groupIndex++ {
if err := o.saveGroup(moveSequence.getGroup(groupIndex), directory); err != nil {
return err
}
}

return nil
}

func (o *objectMover) restore(graph *objectGraph, toProxy Proxy) error {
log := logf.Log

// Get clusters from graph
clusters := graph.getClusters()

// Ensure all the expected target namespaces are in place before creating objects.
log.V(1).Info("Creating target namespaces, if missing")
if err := o.ensureNamespaces(graph, toProxy); err != nil {
return err
}

// Define the restore sequence by processing the ownerReference chain, so we ensure that a Kubernetes object is restored only after its owners.
// The sequence is based on object graph nodes, each one representing a Kubernetes object; nodes are grouped, so bulks of nodes can be processed in parallel. e.g.
// - All the Clusters should be restored first (group 1, processed in parallel)
// - All the MachineDeployments should be restored second (group 2, processed in parallel)
// - then all the MachineSets, then all the Machines, etc.
moveSequence := getMoveSequence(graph)

// Create all objects group by group, ensuring all the ownerReferences are re-created.
for groupIndex := 0; groupIndex < len(moveSequence.groups); groupIndex++ {
if err := o.restoreGroup(moveSequence.getGroup(groupIndex), toProxy); err != nil {
return err
}
}

// Reset the pause field on the Cluster object in the target management cluster, so the controllers start reconciling it.
log.V(1).Info("Resuming the source cluster")
if err := setClusterPause(toProxy, clusters, false, o.dryRun); err != nil {
return err
}

return nil
}

// moveSequence defines a list of group of moveGroups
type moveSequence struct {
groups []moveGroup
@@ -502,6 +675,52 @@ func (o *objectMover) createGroup(group moveGroup, toProxy Proxy) error {
return nil
}

func (o *objectMover) saveGroup(group moveGroup, directory string) error {
createTargetObjectBackoff := newWriteBackoff()
errList := []error{}
for i := range group {
nodeToCreate := group[i]

// Saves the Kubernetes object corresponding to the nodeToCreate.
// Nb. The operation is wrapped in a retry loop to make save more resilient to unexpected conditions.
err := retryWithExponentialBackoff(createTargetObjectBackoff, func() error {
return o.saveTargetObject(nodeToCreate, directory)
})
if err != nil {
errList = append(errList, err)
}
}

if len(errList) > 0 {
return kerrors.NewAggregate(errList)
}

return nil
}

func (o *objectMover) restoreGroup(group moveGroup, toProxy Proxy) error {
createTargetObjectBackoff := newWriteBackoff()
errList := []error{}
for i := range group {
nodeToCreate := group[i]

// Restores the Kubernetes object corresponding to the nodeToCreate.
// Nb. The operation is wrapped in a retry loop to make restore more resilient to unexpected conditions.
err := retryWithExponentialBackoff(createTargetObjectBackoff, func() error {
return o.restoreTargetObject(nodeToCreate, toProxy)
})
if err != nil {
errList = append(errList, err)
}
}

if len(errList) > 0 {
return kerrors.NewAggregate(errList)
}

return nil
}

// createTargetObject creates the Kubernetes object in the target Management cluster corresponding to the object graph node, taking care of restoring the OwnerReference with the owner nodes, if any.
func (o *objectMover) createTargetObject(nodeToCreate *node, toProxy Proxy) error {
log := logf.Log
@@ -598,6 +817,103 @@ func (o *objectMover) createTargetObject(nodeToCreate *node, toProxy Proxy) error {
return nil
}

func (o *objectMover) saveTargetObject(nodeToCreate *node, directory string) error {
log := logf.Log
log.V(1).Info("Saving", nodeToCreate.identity.Kind, nodeToCreate.identity.Name, "Namespace", nodeToCreate.identity.Namespace)

cFrom, err := o.fromProxy.NewClient()
if err != nil {
return err
}

// Get the source object
obj := &unstructured.Unstructured{}
obj.SetAPIVersion(nodeToCreate.identity.APIVersion)
obj.SetKind(nodeToCreate.identity.Kind)
objKey := client.ObjectKey{
Namespace: nodeToCreate.identity.Namespace,
Name: nodeToCreate.identity.Name,
}

if err := cFrom.Get(ctx, objKey, obj); err != nil {
return errors.Wrapf(err, "error reading %q %s/%s",
obj.GroupVersionKind(), obj.GetNamespace(), obj.GetName())
}

// Get JSON for object and write it into the configured directory
byObj, err := obj.MarshalJSON()
if err != nil {
return err
}

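// The file name encodes the object identity as <Kind>_<Name>_<Namespace>,
// e.g. "Cluster_my-cluster_default".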
filenameObj := nodeToCreate.identity.Kind + "_" + nodeToCreate.identity.Name + "_" + nodeToCreate.identity.Namespace
objectFile := filepath.Join(directory, filenameObj)

err = ioutil.WriteFile(objectFile, byObj, 0644)
if err != nil {
return err
}

return nil
}

func (o *objectMover) restoreTargetObject(nodeToCreate *node, toProxy Proxy) error {
log := logf.Log
log.V(1).Info("Restoring", nodeToCreate.identity.Kind, nodeToCreate.identity.Name, "Namespace", nodeToCreate.identity.Namespace)

// Get the object to restore from the raw data read from disk.
obj := nodeToCreate.rawObject
obj.SetAPIVersion(nodeToCreate.identity.APIVersion)
obj.SetKind(nodeToCreate.identity.Kind)

// New objects cannot have a specified resource version. Clear it out.
obj.SetResourceVersion("")

// Removes current OwnerReferences
obj.SetOwnerReferences(nil)

// Rebuild all the OwnerReferences using the newUID of the owner nodes.
if len(nodeToCreate.owners) > 0 {
ownerRefs := []metav1.OwnerReference{}
for ownerNode := range nodeToCreate.owners {
ownerRef := metav1.OwnerReference{
APIVersion: ownerNode.identity.APIVersion,
Kind: ownerNode.identity.Kind,
Name: ownerNode.identity.Name,
UID: ownerNode.newUID, // Use the owner's newUID read from the target management cluster (instead of the UID read during discovery).
}

// Restores the attributes of the OwnerReference.
if attributes, ok := nodeToCreate.owners[ownerNode]; ok {
ownerRef.Controller = attributes.Controller
ownerRef.BlockOwnerDeletion = attributes.BlockOwnerDeletion
}

ownerRefs = append(ownerRefs, ownerRef)
}

obj.SetOwnerReferences(ownerRefs)
}

// Creates the object in the target management cluster.
cTo, err := toProxy.NewClient()
if err != nil {
return err
}

if err := cTo.Create(ctx, obj); err != nil {
if !apierrors.IsAlreadyExists(err) {
return errors.Wrapf(err, "error creating %q %s/%s",
obj.GroupVersionKind(), obj.GetNamespace(), obj.GetName())
}
}

// Stores the newUID assigned to the newly created object.
nodeToCreate.newUID = obj.GetUID()

return nil
}

// deleteGroup deletes all the Kubernetes objects from the source management cluster corresponding to the object graph nodes in a moveGroup.
func (o *objectMover) deleteGroup(group moveGroup) error {
deleteSourceObjectBackoff := newWriteBackoff()