Skip to content

Commit

Permalink
Add support for json lists as resource + resource processing dedupped.
Browse files Browse the repository at this point in the history
  • Loading branch information
Sedef committed Jul 7, 2020
1 parent 20ff3f1 commit 3487602
Show file tree
Hide file tree
Showing 5 changed files with 160 additions and 109 deletions.
1 change: 1 addition & 0 deletions config/rbac/role.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -145,6 +145,7 @@ rules:
verbs:
- get
- list
- patch
- watch
- apiGroups:
- ""
Expand Down
7 changes: 2 additions & 5 deletions exp/addons/api/v1alpha3/condition_consts.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,12 +34,9 @@ const (
// ApplyFailedReason (Severity=Warning) documents applying at least one of the resources to one of the matching clusters is failed.
ApplyFailedReason = "ApplyFailed"

// MissingResource (Severity=Warning) documents at least one of the resources are not found.
MissingResourceReason = "MissingResource"
// RetrievingResourceFailedReason (Severity=Warning) documents at least one of the resources are not successfully retrieved.
RetrievingResourceFailedReason = "RetrievingResourceFailed"

// WrongSecretType (Severity=Warning) documents at least one of the Secret's type in the resource list is not supported.
WrongSecretTypeReason = "WrongSecretType"

// UnsupportedResourceType (Severity=Warning) documents at least one of the resources has unsupported resource type.
UnsupportedResourceTypeReason = "UnsupportedResourceType"
)
193 changes: 94 additions & 99 deletions exp/addons/controllers/clusterresourceset_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ package controllers

import (
"context"
"encoding/base64"
"fmt"
"sort"
"time"
Expand Down Expand Up @@ -45,8 +46,12 @@ import (
"sigs.k8s.io/controller-runtime/pkg/source"
)

// +kubebuilder:rbac:groups=core,resources=secrets,verbs=get;list;watch
// +kubebuilder:rbac:groups=core,resources=configmaps,verbs=get;list;watch
var (
ErrSecretTypeNotSupported = errors.New("unsupported secret type")
)

// +kubebuilder:rbac:groups=core,resources=secrets,verbs=get;list;watch;patch
// +kubebuilder:rbac:groups=core,resources=configmaps,verbs=get;list;watch;patch
// +kubebuilder:rbac:groups=addons.cluster.x-k8s.io,resources=*,verbs=get;list;watch;create;update;patch;delete
// +kubebuilder:rbac:groups=addons.cluster.x-k8s.io,resources=clusterresourcesets/status,verbs=get;update;patch

Expand Down Expand Up @@ -193,10 +198,9 @@ func (r *ClusterResourceSetReconciler) ApplyClusterResourceSet(ctx context.Conte
// Initializes the maps in ClusterResourceSetBinding object if they are uninitialized.
initClusterResourceSetBinding(clusterResourceSetBinding, clusterResourceSet)

var bindings addonsv1.ResourcesSetBinding
var ok bool
errList := []error{}
bindings = clusterResourceSetBinding.Spec.Bindings[clusterResourceSet.Name]
bindings := clusterResourceSetBinding.Spec.Bindings[clusterResourceSet.Name]

// Iterate all resources and apply them to the cluster and update the resource status in the ClusterResourceSetBinding object.
for _, resource := range clusterResourceSet.Spec.Resources {
Expand All @@ -209,104 +213,63 @@ func (r *ClusterResourceSetReconciler) ApplyClusterResourceSet(ctx context.Conte
}
}

// Only allow using resources (Secrets/Configmaps) in the same namespace with the cluster.
resourceName := types.NamespacedName{Name: resource.Name, Namespace: cluster.Namespace}
dataList := make([][]byte, 0)
unstructuredObj := &unstructured.Unstructured{}

// Retrieve data in the resource as an array because there can be many key-value pairs in resource and all will be applied to the cluster.
switch resource.Kind {
case string(addonsv1.ConfigMapClusterResourceSetResourceKind):
// Set status in ClusterResourceSetBinding in case of early continue due to a failure. Do this for only supported kinds.
bindings.Resources[resourceKindName] = generateResourceStatus(false, "")

resourceConfigMap, err := getConfigMap(context.Background(), r.Client, resourceName)
if err != nil {
logger.Error(err, "Failed to find ConfigMap resource", "ConfigMap name", resourceName)
conditions.MarkFalse(clusterResourceSet, addonsv1.ResourcesAppliedCondition, addonsv1.MissingResourceReason, clusterv1.ConditionSeverityWarning, err.Error())
errList = append(errList, err)
continue
}

// Convert the object to unstructured to compare against our before copy.
raw, err := runtime.DefaultUnstructuredConverter.ToUnstructured(resourceConfigMap.DeepCopyObject())
if err != nil {
logger.Error(err, "Failed to set ClusterResourceSet as resource owner reference",
"Resource type", resourceConfigMap.Kind, "Resource name", resourceConfigMap.Name)
errList = append(errList, err)
}
unstructuredObj = &unstructured.Unstructured{Object: raw}

err = r.patchOwnerRefToResource(ctx, clusterResourceSet, unstructuredObj)
if err != nil {
logger.Error(err, "Failed to patch ClusterResourceSet as resource owner reference",
"Resource type", resourceConfigMap.Kind, "Resource name", resourceConfigMap.Name)
errList = append(errList, err)
unstructuredObj, err := r.getResource(resource, cluster.GetNamespace())
if err != nil {
if err == ErrSecretTypeNotSupported {
conditions.MarkFalse(clusterResourceSet, addonsv1.ResourcesAppliedCondition, addonsv1.WrongSecretTypeReason, clusterv1.ConditionSeverityWarning, err.Error())
} else {
conditions.MarkFalse(clusterResourceSet, addonsv1.ResourcesAppliedCondition, addonsv1.RetrievingResourceFailedReason, clusterv1.ConditionSeverityWarning, err.Error())
}
errList = append(errList, err)
continue
}

// Since maps are not ordered, we need to order them to get the same hash at each reconcile.
keys := make([]string, 0)
for key := range resourceConfigMap.Data {
keys = append(keys, key)
}
sort.Strings(keys)
// Set status in ClusterResourceSetBinding in case of early continue due to a failure.
// Set only when resource is retrieved successfully.
bindings.Resources[resourceKindName] = addonsv1.ResourceBinding{
Hash: "",
Applied: false,
LastAppliedTime: &metav1.Time{Time: time.Now().UTC()},
}

for _, key := range keys {
dataList = append(dataList, []byte(resourceConfigMap.Data[key]))
}
if err := r.patchOwnerRefToResource(ctx, clusterResourceSet, unstructuredObj); err != nil {
logger.Error(err, "Failed to patch ClusterResourceSet as resource owner reference",
"Resource type", unstructuredObj.GetKind(), "Resource name", unstructuredObj.GetName())
errList = append(errList, err)
}

case string(addonsv1.SecretClusterResourceSetResourceKind):
// Set status in ClusterResourceSetBinding in case of early continue due to a failure. Do this for only supported kinds.
bindings.Resources[resourceKindName] = generateResourceStatus(false, "")
// Since maps are not ordered, we need to order them to get the same hash at each reconcile.
keys := make([]string, 0)
data, ok := unstructuredObj.UnstructuredContent()["data"]
if !ok {
errList = append(errList, errors.New("failed to get data field from the resource"))
continue
}

resourceSecret, err := getSecret(context.Background(), r.Client, resourceName)
if err != nil {
logger.Error(err, "Failed to find Secret resource", "Secret name", resourceName)
conditions.MarkFalse(clusterResourceSet, addonsv1.ResourcesAppliedCondition, addonsv1.MissingResourceReason, clusterv1.ConditionSeverityWarning, err.Error())
errList = append(errList, err)
continue
}
unstructuredData := data.(map[string]interface{})
for key := range unstructuredData {
keys = append(keys, key)
}
sort.Strings(keys)

if resourceSecret.Type != addonsv1.ClusterResourceSetSecretType {
logger.Error(err, "Unsupported ClusterResourceSet Secret type", "Resource type", resourceSecret.Type)
conditions.MarkFalse(clusterResourceSet, addonsv1.ResourcesAppliedCondition, addonsv1.WrongSecretTypeReason, clusterv1.ConditionSeverityWarning, "")
errList = append(errList, err)
dataList := make([][]byte, 0)
for _, key := range keys {
val, ok, err := unstructured.NestedString(unstructuredData, key)
if !ok || err != nil {
errList = append(errList, errors.New("failed to get value field from the resource"))
continue
}

// Convert the object to unstructured to compare against our before copy.
raw, err := runtime.DefaultUnstructuredConverter.ToUnstructured(resourceSecret.DeepCopyObject())
if err != nil {
logger.Error(err, "Failed to set ClusterResourceSet as resource owner reference",
"Resource type", resourceSecret.Type, "Resource name", resourceSecret.Name)
errList = append(errList, err)
byteArr := []byte(val)
// If the resource is a Secret, data needs to be decoded.
if unstructuredObj.GetKind() == string(addonsv1.SecretClusterResourceSetResourceKind) {
byteArr, _ = base64.StdEncoding.DecodeString(val)
}
unstructuredObj = &unstructured.Unstructured{Object: raw}

err = r.patchOwnerRefToResource(ctx, clusterResourceSet, unstructuredObj)
if err != nil {
logger.Error(err, "Failed to patch ClusterResourceSet as resource owner reference",
"Resource type", resourceSecret.Type, "Resource name", resourceSecret.Name)
errList = append(errList, err)
}
// Since maps are not ordered, we need to order them to get the same hash at each reconcile.
keys := make([]string, 0)
for key := range resourceSecret.Data {
keys = append(keys, key)
}
sort.Strings(keys)

for _, key := range keys {
dataList = append(dataList, resourceSecret.Data[key])
}
default:
logger.Error(err, "Unsupported ClusterResourceSet resource kind", "Resource Kind", resource.Kind)
conditions.MarkFalse(clusterResourceSet, addonsv1.ResourcesAppliedCondition, addonsv1.UnsupportedResourceTypeReason, clusterv1.ConditionSeverityWarning, "")
errList = append(errList, err)
continue
dataList = append(dataList, byteArr)
}

// apply all values in the key-value pair of the resource to the cluster.
// Apply all values in the key-value pair of the resource to the cluster.
// As there can be multiple key-value pairs in a resource, each value may have multiple objects in it.
isSuccessful := true
for i := range dataList {
Expand All @@ -320,7 +283,12 @@ func (r *ClusterResourceSetReconciler) ApplyClusterResourceSet(ctx context.Conte
}
}

binding = generateResourceStatus(isSuccessful, computeHash(dataList))
binding = addonsv1.ResourceBinding{
Hash: computeHash(dataList),
Applied: isSuccessful,
LastAppliedTime: &metav1.Time{Time: time.Now().UTC()},
}

bindings.Resources[resourceKindName] = binding
}
if len(errList) > 0 {
Expand All @@ -332,6 +300,42 @@ func (r *ClusterResourceSetReconciler) ApplyClusterResourceSet(ctx context.Conte
return nil
}

// getResource retrieves the requested resource and convert it to unstructured type.
// Unsupported resource kinds are not denied by validation webhook, hence no need to check here.
// Only supports Secrets/Configmaps as resource types and allow using resources in the same namespace with the cluster.
func (r *ClusterResourceSetReconciler) getResource(resourceRef addonsv1.ResourceRef, namespace string) (*unstructured.Unstructured, error) {
resourceName := types.NamespacedName{Name: resourceRef.Name, Namespace: namespace}

var resourceInterface interface{}
switch resourceRef.Kind {
case string(addonsv1.ConfigMapClusterResourceSetResourceKind):
resourceConfigMap, err := getConfigMap(context.Background(), r.Client, resourceName)
if err != nil {
return nil, err
}

resourceInterface = resourceConfigMap.DeepCopyObject()
case string(addonsv1.SecretClusterResourceSetResourceKind):
resourceSecret, err := getSecret(context.Background(), r.Client, resourceName)
if err != nil {
return nil, err
}

if resourceSecret.Type != addonsv1.ClusterResourceSetSecretType {
return nil, ErrSecretTypeNotSupported
}

resourceInterface = resourceSecret.DeepCopyObject()
}

raw, err := runtime.DefaultUnstructuredConverter.ToUnstructured(resourceInterface)
if err != nil {
return nil, err
}

return &unstructured.Unstructured{Object: raw}, nil
}

// patchOwnerRefToResource adds the ClusterResourceSet as a OwnerReference to the resource.
func (r *ClusterResourceSetReconciler) patchOwnerRefToResource(ctx context.Context, clusterResourceSet *addonsv1.ClusterResourceSet, resource *unstructured.Unstructured) error {
newRef := metav1.OwnerReference{
Expand All @@ -351,15 +355,6 @@ func (r *ClusterResourceSetReconciler) patchOwnerRefToResource(ctx context.Conte
return nil
}

// generateResourceStatus returns ResourceBinding to be used in ClusterResourceSetBinding resource of the matching cluster
func generateResourceStatus(isSuccessful bool, hash string) addonsv1.ResourceBinding {
return addonsv1.ResourceBinding{
Hash: hash,
Applied: isSuccessful,
LastAppliedTime: &metav1.Time{Time: time.Now().UTC()},
}
}

// generateResourceKindName generates a unique name to identify resources that is used in resources map in ClusterResourceSetBinding.
func generateResourceKindName(resource addonsv1.ResourceRef) string {
return resource.Kind + "/" + resource.Name
Expand Down
25 changes: 23 additions & 2 deletions exp/addons/controllers/clusterresourceset_controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,8 +72,25 @@ var _ = Describe("ClusterResourceSet Reconciler", func() {
},
Data: map[string]string{
"cm": `metadata:
name: resource-configmap
namespace: default
name: resource-configmap
namespace: default
kind: ConfigMap
apiVersion: v1
---
metadata:
name: resource-configmap2
namespace: default
kind: ConfigMap
apiVersion: v1`,
"cm2": `metadata:
name: resource-configmap3
namespace: default
kind: ConfigMap
apiVersion: v1
---
metadata:
name: resource-configmap4
namespace: default
kind: ConfigMap
apiVersion: v1`,
},
Expand Down Expand Up @@ -115,6 +132,10 @@ apiVersion: v1`,
if err != nil {
return false
}
//|| (len(binding.Spec.Bindings) == 1 && len(binding.Spec.Bindings["ConfigMap/"+configmapName].Resources) != 2
if len(binding.Spec.Bindings) != 1 {
return false
}

return util.HasOwnerRef(binding.GetOwnerReferences(), metav1.OwnerReference{
APIVersion: clusterv1.GroupVersion.String(),
Expand Down
43 changes: 40 additions & 3 deletions exp/addons/controllers/clusterresourceset_helpers.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,13 @@ limitations under the License.
package controllers

import (
"bufio"
"bytes"
"context"
"crypto/sha256"
"encoding/json"
"fmt"
"unicode"

"github.com/pkg/errors"
corev1 "k8s.io/api/core/v1"
Expand All @@ -36,12 +40,45 @@ import (
"sigs.k8s.io/controller-runtime/pkg/client"
)

var jsonListPrefix = []byte("[")

// isJSONList returns whether the data is in JSON list format.
func isJSONList(data []byte) (bool, error) {
const peekSize = 32
buffer := bufio.NewReaderSize(bytes.NewReader(data), peekSize)
b, err := buffer.Peek(peekSize)
if err != nil {
return false, err
}
trim := bytes.TrimLeftFunc(b, unicode.IsSpace)
return bytes.HasPrefix(trim, jsonListPrefix), nil
}

func apply(ctx context.Context, c client.Client, data []byte) error {
// Transform the yaml in a list of objects, so following transformation can work on typed objects (instead of working on a string/slice of bytes)
objs, err := utilyaml.ToUnstructured(data)
isJSONList, err := isJSONList(data)
if err != nil {
return errors.Wrapf(err, "failed converting data to unstructured objects")
return err
}
objs := []unstructured.Unstructured{}
// If it is a json list, convert each list element to an unstructured object.
if isJSONList {
var results []map[string]interface{}
// Unmarshal the JSON to the interface.
if err = json.Unmarshal(data, &results); err == nil {
for i := range results {
var u unstructured.Unstructured
u.SetUnstructuredContent(results[i])
objs = append(objs, u)
}
}
} else {
// If it is not a json list, data is either json or yaml format.
objs, err = utilyaml.ToUnstructured(data)
if err != nil {
return errors.Wrapf(err, "failed converting data to unstructured objects")
}
}

errList := []error{}
sortedObjs := utilresource.SortForCreate(objs)
for i := range sortedObjs {
Expand Down

0 comments on commit 3487602

Please sign in to comment.