diff --git a/pkg/operator/connectivitycheckcontroller/connectivity_check_controller.go b/pkg/operator/connectivitycheckcontroller/connectivity_check_controller.go index 5f20d4a604..da8d92f2eb 100644 --- a/pkg/operator/connectivitycheckcontroller/connectivity_check_controller.go +++ b/pkg/operator/connectivitycheckcontroller/connectivity_check_controller.go @@ -210,6 +210,7 @@ func ensureConnectivityCheckCRDExists(ctx context.Context, syncContext factory.S ctx, resourceapply.NewClientHolder().WithAPIExtensionsClient(client), syncContext.Recorder(), + nil, func(name string) ([]byte, error) { return bindata.Asset(name) }, "pkg/operator/connectivitycheckcontroller/manifests/controlplane.operator.openshift.io_podnetworkconnectivitychecks.yaml", ) diff --git a/pkg/operator/resource/resourceapply/core.go b/pkg/operator/resource/resourceapply/core.go index 9e78612a90..318d2dc06f 100644 --- a/pkg/operator/resource/resourceapply/core.go +++ b/pkg/operator/resource/resourceapply/core.go @@ -3,23 +3,155 @@ package resourceapply import ( "bytes" "context" + "crypto/md5" + "errors" "fmt" + "io" "sort" "strings" - "k8s.io/klog/v2" - corev1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/api/equality" apierrors "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/util/sets" coreclientv1 "k8s.io/client-go/kubernetes/typed/core/v1" + "k8s.io/klog/v2" "github.com/openshift/library-go/pkg/operator/events" "github.com/openshift/library-go/pkg/operator/resource/resourcemerge" ) +type CachedVersionKey struct { + name, kind, namespace string +} + +// record of resource metadata used to determine if its safe to return early from an ApplyFoo +// resourceHash is an ms5 hash of the required in an ApplyFoo that is computed in case the input changes +// resourceVersion is the received resourceVersion from the apiserver in response to an update that is comparable to the GET +type CachedResource struct { + resourceHash, resourceVersion string +} + +func getResourceMetadata(obj runtime.Object) (string, string, string, string, error) { + var name, kind, namespace, resourceHash string + switch r := obj.(type) { + case *corev1.Namespace: + name = r.Name + kind = r.Kind + case *corev1.Service: + name = r.Name + kind = r.Kind + namespace = r.Namespace + case *corev1.Pod: + name = r.Name + kind = r.Kind + namespace = r.Namespace + case *corev1.ServiceAccount: + name = r.Name + kind = r.Kind + namespace = r.Namespace + case *corev1.ConfigMap: + name = r.Name + kind = r.Kind + namespace = r.Namespace + case *corev1.Secret: + name = r.Name + kind = r.Kind + namespace = r.Namespace + default: + return name, kind, namespace, resourceHash, errors.New("attempted to access metadata for unsupported type") + } + resourceHash = hashOfResourceStruct(obj) + return name, kind, namespace, resourceHash, nil +} + +func getResourceVersion(obj runtime.Object) (string, error) { + var resourceVersion string + switch r := obj.(type) { + case *corev1.Namespace: + resourceVersion = r.ResourceVersion + case *corev1.Service: + resourceVersion = r.ResourceVersion + case *corev1.Pod: + resourceVersion = r.ResourceVersion + case *corev1.ServiceAccount: + resourceVersion = r.ResourceVersion + case *corev1.ConfigMap: + resourceVersion = r.ResourceVersion + case *corev1.Secret: + resourceVersion = r.ResourceVersion + default: + return resourceVersion, errors.New("attempted to access metadata for unsupported type") + } + return resourceVersion, nil +} + +func updateCachedResourceMetadata(cache map[CachedVersionKey]CachedResource, required runtime.Object, actual runtime.Object) { + if cache == nil || required == nil || actual == nil { + return + } + name, kind, namespace, resourceHash, err := getResourceMetadata(required) + if err != nil { + return + } + + resourceVersion, err := getResourceVersion(actual) + if err != nil { + return + } + if resourceVersion == "" { + klog.V(4).Infof("resourceVersion has empty string %s:%s:%s %s", name, kind, namespace, resourceVersion) + return + } + + cache[CachedVersionKey{name, kind, namespace}] = CachedResource{resourceHash, resourceVersion} + klog.V(7).Infof("updated resourceVersion of %s:%s:%s %s", name, kind, namespace, resourceVersion) + +} + +// in the circumstance that an ApplyFoo's 'required' is the same one which was previously +// applied for a given (name, kind, namespace) and the existing resource (if any), +// hasn't been modified since the ApplyFoo last updated that resource, then return true (we don't +// need to reapply the resource). Otherwise return false. +func safeToSkipApply(cache map[CachedVersionKey]CachedResource, required runtime.Object, existing runtime.Object) bool { + if cache == nil || required == nil || existing == nil { + return false + } + name, kind, namespace, resourceHash, err := getResourceMetadata(required) + if err != nil { + return false + } + + resourceVersion, err := getResourceVersion(existing) + if err != nil { + return false + } + + var versionMatch, hashMatch bool + if cached, exists := cache[CachedVersionKey{name, kind, namespace}]; exists { + versionMatch = cached.resourceVersion == resourceVersion + hashMatch = cached.resourceHash == resourceHash + if versionMatch && hashMatch { + klog.V(4).Infof("found matching resourceVersion & manifest hash") + return true + } + } + + return false +} + +// detect changes in a resource by caching a hash of the string representation of the resource +// note: some changes in a resource e.g. nil vs empty, will not be detected this way +func hashOfResourceStruct(o interface{}) string { + oString := fmt.Sprintf("%v", o) + h := md5.New() + io.WriteString(h, oString) + rval := fmt.Sprintf("%x", h.Sum(nil)) + return rval +} + // ApplyNamespace merges objectmeta, does not worry about anything else func ApplyNamespace(ctx context.Context, client coreclientv1.NamespacesGetter, recorder events.Recorder, required *corev1.Namespace) (*corev1.Namespace, bool, error) { existing, err := client.Namespaces().Get(ctx, required.Name, metav1.GetOptions{}) @@ -328,6 +460,355 @@ func ApplySecret(ctx context.Context, client coreclientv1.SecretsGetter, recorde return actual, true, err } +// ApplyNamespace merges objectmeta, does not worry about anything else +func ApplyNamespaceImproved(ctx context.Context, client coreclientv1.NamespacesGetter, recorder events.Recorder, required *corev1.Namespace, cache map[CachedVersionKey]CachedResource) (*corev1.Namespace, bool, error) { + existing, err := client.Namespaces().Get(ctx, required.Name, metav1.GetOptions{}) + if apierrors.IsNotFound(err) { + requiredCopy := required.DeepCopy() + actual, err := client.Namespaces(). + Create(ctx, resourcemerge.WithCleanLabelsAndAnnotations(requiredCopy).(*corev1.Namespace), metav1.CreateOptions{}) + reportCreateEvent(recorder, requiredCopy, err) + updateCachedResourceMetadata(cache, required, actual) + return actual, true, err + } + if err != nil { + return nil, false, err + } + + if safeToSkipApply(cache, required, existing) { + return existing, false, nil + } + + modified := resourcemerge.BoolPtr(false) + existingCopy := existing.DeepCopy() + + resourcemerge.EnsureObjectMeta(modified, &existingCopy.ObjectMeta, required.ObjectMeta) + if !*modified { + updateCachedResourceMetadata(cache, required, existingCopy) + return existingCopy, false, nil + } + + if klog.V(4).Enabled() { + klog.Infof("Namespace %q changes: %v", required.Name, JSONPatchNoError(existing, existingCopy)) + } + + actual, err := client.Namespaces().Update(ctx, existingCopy, metav1.UpdateOptions{}) + reportUpdateEvent(recorder, required, err) + updateCachedResourceMetadata(cache, required, actual) + return actual, true, err +} + +// ApplyService merges objectmeta and requires +// TODO, since this cannot determine whether changes are due to legitimate actors (api server) or illegitimate ones (users), we cannot update +// TODO I've special cased the selector for now +func ApplyServiceImproved(ctx context.Context, client coreclientv1.ServicesGetter, recorder events.Recorder, required *corev1.Service, cache map[CachedVersionKey]CachedResource) (*corev1.Service, bool, error) { + existing, err := client.Services(required.Namespace).Get(ctx, required.Name, metav1.GetOptions{}) + if apierrors.IsNotFound(err) { + requiredCopy := required.DeepCopy() + actual, err := client.Services(requiredCopy.Namespace). + Create(ctx, resourcemerge.WithCleanLabelsAndAnnotations(requiredCopy).(*corev1.Service), metav1.CreateOptions{}) + reportCreateEvent(recorder, requiredCopy, err) + updateCachedResourceMetadata(cache, required, actual) + return actual, true, err + } + if err != nil { + return nil, false, err + } + + if safeToSkipApply(cache, required, existing) { + return existing, false, nil + } + + modified := resourcemerge.BoolPtr(false) + existingCopy := existing.DeepCopy() + + resourcemerge.EnsureObjectMeta(modified, &existingCopy.ObjectMeta, required.ObjectMeta) + selectorSame := equality.Semantic.DeepEqual(existingCopy.Spec.Selector, required.Spec.Selector) + + typeSame := false + requiredIsEmpty := len(required.Spec.Type) == 0 + existingCopyIsCluster := existingCopy.Spec.Type == corev1.ServiceTypeClusterIP + if (requiredIsEmpty && existingCopyIsCluster) || equality.Semantic.DeepEqual(existingCopy.Spec.Type, required.Spec.Type) { + typeSame = true + } + + if selectorSame && typeSame && !*modified { + updateCachedResourceMetadata(cache, required, existingCopy) + return existingCopy, false, nil + } + + existingCopy.Spec.Selector = required.Spec.Selector + existingCopy.Spec.Type = required.Spec.Type // if this is different, the update will fail. Status will indicate it. + + if klog.V(4).Enabled() { + klog.Infof("Service %q changes: %v", required.Namespace+"/"+required.Name, JSONPatchNoError(existing, required)) + } + + actual, err := client.Services(required.Namespace).Update(ctx, existingCopy, metav1.UpdateOptions{}) + reportUpdateEvent(recorder, required, err) + updateCachedResourceMetadata(cache, required, actual) + return actual, true, err +} + +// ApplyPod merges objectmeta, does not worry about anything else +func ApplyPodImproved(ctx context.Context, client coreclientv1.PodsGetter, recorder events.Recorder, required *corev1.Pod, cache map[CachedVersionKey]CachedResource) (*corev1.Pod, bool, error) { + existing, err := client.Pods(required.Namespace).Get(ctx, required.Name, metav1.GetOptions{}) + if apierrors.IsNotFound(err) { + requiredCopy := required.DeepCopy() + actual, err := client.Pods(requiredCopy.Namespace). + Create(ctx, resourcemerge.WithCleanLabelsAndAnnotations(requiredCopy).(*corev1.Pod), metav1.CreateOptions{}) + reportCreateEvent(recorder, requiredCopy, err) + updateCachedResourceMetadata(cache, required, actual) + return actual, true, err + } + if err != nil { + return nil, false, err + } + + if safeToSkipApply(cache, required, existing) { + return existing, false, nil + } + + modified := resourcemerge.BoolPtr(false) + existingCopy := existing.DeepCopy() + + resourcemerge.EnsureObjectMeta(modified, &existingCopy.ObjectMeta, required.ObjectMeta) + if !*modified { + updateCachedResourceMetadata(cache, required, existingCopy) + return existingCopy, false, nil + } + + if klog.V(4).Enabled() { + klog.Infof("Pod %q changes: %v", required.Namespace+"/"+required.Name, JSONPatchNoError(existing, required)) + } + + actual, err := client.Pods(required.Namespace).Update(ctx, existingCopy, metav1.UpdateOptions{}) + reportUpdateEvent(recorder, required, err) + updateCachedResourceMetadata(cache, required, actual) + return actual, true, err +} + +// ApplyServiceAccount merges objectmeta, does not worry about anything else +func ApplyServiceAccountImproved(ctx context.Context, client coreclientv1.ServiceAccountsGetter, recorder events.Recorder, required *corev1.ServiceAccount, cache map[CachedVersionKey]CachedResource) (*corev1.ServiceAccount, bool, error) { + existing, err := client.ServiceAccounts(required.Namespace).Get(ctx, required.Name, metav1.GetOptions{}) + if apierrors.IsNotFound(err) { + requiredCopy := required.DeepCopy() + actual, err := client.ServiceAccounts(requiredCopy.Namespace). + Create(ctx, resourcemerge.WithCleanLabelsAndAnnotations(requiredCopy).(*corev1.ServiceAccount), metav1.CreateOptions{}) + reportCreateEvent(recorder, requiredCopy, err) + updateCachedResourceMetadata(cache, required, actual) + return actual, true, err + } + if err != nil { + return nil, false, err + } + + if safeToSkipApply(cache, required, existing) { + return existing, false, nil + } + + modified := resourcemerge.BoolPtr(false) + existingCopy := existing.DeepCopy() + + resourcemerge.EnsureObjectMeta(modified, &existingCopy.ObjectMeta, required.ObjectMeta) + if !*modified { + updateCachedResourceMetadata(cache, required, existingCopy) + return existingCopy, false, nil + } + if klog.V(4).Enabled() { + klog.Infof("ServiceAccount %q changes: %v", required.Namespace+"/"+required.Name, JSONPatchNoError(existing, required)) + } + actual, err := client.ServiceAccounts(required.Namespace).Update(ctx, existingCopy, metav1.UpdateOptions{}) + reportUpdateEvent(recorder, required, err) + updateCachedResourceMetadata(cache, required, actual) + return actual, true, err +} + +// ApplyConfigMap merges objectmeta, requires data +func ApplyConfigMapImproved(ctx context.Context, client coreclientv1.ConfigMapsGetter, recorder events.Recorder, required *corev1.ConfigMap, cache map[CachedVersionKey]CachedResource) (*corev1.ConfigMap, bool, error) { + existing, err := client.ConfigMaps(required.Namespace).Get(ctx, required.Name, metav1.GetOptions{}) + if apierrors.IsNotFound(err) { + requiredCopy := required.DeepCopy() + actual, err := client.ConfigMaps(requiredCopy.Namespace). + Create(ctx, resourcemerge.WithCleanLabelsAndAnnotations(requiredCopy).(*corev1.ConfigMap), metav1.CreateOptions{}) + reportCreateEvent(recorder, requiredCopy, err) + updateCachedResourceMetadata(cache, required, actual) + return actual, true, err + } + if err != nil { + return nil, false, err + } + + if safeToSkipApply(cache, required, existing) { + return existing, false, nil + } + + modified := resourcemerge.BoolPtr(false) + existingCopy := existing.DeepCopy() + + resourcemerge.EnsureObjectMeta(modified, &existingCopy.ObjectMeta, required.ObjectMeta) + + caBundleInjected := required.Labels["config.openshift.io/inject-trusted-cabundle"] == "true" + _, newCABundleRequired := required.Data["ca-bundle.crt"] + + var modifiedKeys []string + for existingCopyKey, existingCopyValue := range existingCopy.Data { + // if we're injecting a ca-bundle and the required isn't forcing the value, then don't use the value of existing + // to drive a diff detection. If required has set the value then we need to force the value in order to have apply + // behave predictably. + if caBundleInjected && !newCABundleRequired && existingCopyKey == "ca-bundle.crt" { + continue + } + if requiredValue, ok := required.Data[existingCopyKey]; !ok || (existingCopyValue != requiredValue) { + modifiedKeys = append(modifiedKeys, "data."+existingCopyKey) + } + } + for existingCopyKey, existingCopyBinValue := range existingCopy.BinaryData { + if requiredBinValue, ok := required.BinaryData[existingCopyKey]; !ok || !bytes.Equal(existingCopyBinValue, requiredBinValue) { + modifiedKeys = append(modifiedKeys, "binaryData."+existingCopyKey) + } + } + for requiredKey := range required.Data { + if _, ok := existingCopy.Data[requiredKey]; !ok { + modifiedKeys = append(modifiedKeys, "data."+requiredKey) + } + } + for requiredBinKey := range required.BinaryData { + if _, ok := existingCopy.BinaryData[requiredBinKey]; !ok { + modifiedKeys = append(modifiedKeys, "binaryData."+requiredBinKey) + } + } + + dataSame := len(modifiedKeys) == 0 + if dataSame && !*modified { + updateCachedResourceMetadata(cache, required, existingCopy) + return existingCopy, false, nil + } + existingCopy.Data = required.Data + existingCopy.BinaryData = required.BinaryData + // if we're injecting a cabundle, and we had a previous value, and the required object isn't setting the value, then set back to the previous + if existingCABundle, existedBefore := existing.Data["ca-bundle.crt"]; caBundleInjected && existedBefore && !newCABundleRequired { + if existingCopy.Data == nil { + existingCopy.Data = map[string]string{} + } + existingCopy.Data["ca-bundle.crt"] = existingCABundle + } + + actual, err := client.ConfigMaps(required.Namespace).Update(ctx, existingCopy, metav1.UpdateOptions{}) + + var details string + if !dataSame { + sort.Sort(sort.StringSlice(modifiedKeys)) + details = fmt.Sprintf("cause by changes in %v", strings.Join(modifiedKeys, ",")) + } + if klog.V(4).Enabled() { + klog.Infof("ConfigMap %q changes: %v", required.Namespace+"/"+required.Name, JSONPatchNoError(existing, required)) + } + reportUpdateEvent(recorder, required, err, details) + updateCachedResourceMetadata(cache, required, actual) + return actual, true, err +} + +// ApplySecret merges objectmeta, requires data +func ApplySecretImproved(ctx context.Context, client coreclientv1.SecretsGetter, recorder events.Recorder, requiredInput *corev1.Secret, cache map[CachedVersionKey]CachedResource) (*corev1.Secret, bool, error) { + existing, err := client.Secrets(requiredInput.Namespace).Get(ctx, requiredInput.Name, metav1.GetOptions{}) + if apierrors.IsNotFound(err) { + requiredCopy := requiredInput.DeepCopy() + actual, err := client.Secrets(requiredCopy.Namespace). + Create(ctx, resourcemerge.WithCleanLabelsAndAnnotations(requiredCopy).(*corev1.Secret), metav1.CreateOptions{}) + reportCreateEvent(recorder, requiredCopy, err) + updateCachedResourceMetadata(cache, requiredInput, actual) + return actual, true, err + } + if err != nil { + return nil, false, err + } + + if safeToSkipApply(cache, requiredInput, existing) { + return existing, false, nil + } + + // copy the stringData to data. Error on a data content conflict inside required. This is usually a bug. + required := requiredInput.DeepCopy() + if required.Data == nil { + required.Data = map[string][]byte{} + } + for k, v := range required.StringData { + if dataV, ok := required.Data[k]; ok { + if string(dataV) != v { + return nil, false, fmt.Errorf("Secret.stringData[%q] conflicts with Secret.data[%q]", k, k) + } + } + required.Data[k] = []byte(v) + } + required.StringData = nil + + existingCopy := existing.DeepCopy() + + resourcemerge.EnsureObjectMeta(resourcemerge.BoolPtr(false), &existingCopy.ObjectMeta, required.ObjectMeta) + + switch required.Type { + case corev1.SecretTypeServiceAccountToken: + // Secrets for ServiceAccountTokens will have data injected by kube controller manager. + // We will apply only the explicitly set keys. + if existingCopy.Data == nil { + existingCopy.Data = map[string][]byte{} + } + + for k, v := range required.Data { + existingCopy.Data[k] = v + } + + default: + existingCopy.Data = required.Data + } + + existingCopy.Type = required.Type + + // Server defaults some values and we need to do it as well or it will never equal. + if existingCopy.Type == "" { + existingCopy.Type = corev1.SecretTypeOpaque + } + + if equality.Semantic.DeepEqual(existingCopy, existing) { + updateCachedResourceMetadata(cache, required, existingCopy) + return existing, false, nil + } + + if klog.V(4).Enabled() { + klog.Infof("Secret %s/%s changes: %v", required.Namespace, required.Name, JSONPatchSecretNoError(existing, existingCopy)) + } + + var actual *corev1.Secret + /* + * Kubernetes validation silently hides failures to update secret type. + * https://github.com/kubernetes/kubernetes/blob/98e65951dccfd40d3b4f31949c2ab8df5912d93e/pkg/apis/core/validation/validation.go#L5048 + * We need to explicitly opt for delete+create in that case. + */ + if existingCopy.Type == existing.Type { + actual, err = client.Secrets(required.Namespace).Update(ctx, existingCopy, metav1.UpdateOptions{}) + reportUpdateEvent(recorder, existingCopy, err) + + if err == nil { + return actual, true, err + } + if !strings.Contains(err.Error(), "field is immutable") { + return actual, true, err + } + } + + // if the field was immutable on a secret, we're going to be stuck until we delete it. Try to delete and then create + deleteErr := client.Secrets(required.Namespace).Delete(ctx, existingCopy.Name, metav1.DeleteOptions{}) + reportDeleteEvent(recorder, existingCopy, deleteErr) + + // clear the RV and track the original actual and error for the return like our create value. + existingCopy.ResourceVersion = "" + actual, err = client.Secrets(required.Namespace).Create(ctx, existingCopy, metav1.CreateOptions{}) + reportCreateEvent(recorder, existingCopy, err) + updateCachedResourceMetadata(cache, required, actual) + return actual, true, err +} + // SyncConfigMap applies a ConfigMap from a location `sourceNamespace/sourceName` to `targetNamespace/targetName` func SyncConfigMap(ctx context.Context, client coreclientv1.ConfigMapsGetter, recorder events.Recorder, sourceNamespace, sourceName, targetNamespace, targetName string, ownerRefs []metav1.OwnerReference) (*corev1.ConfigMap, bool, error) { return SyncPartialConfigMap(ctx, client, recorder, sourceNamespace, sourceName, targetNamespace, targetName, nil, ownerRefs) diff --git a/pkg/operator/resource/resourceapply/core_test.go b/pkg/operator/resource/resourceapply/core_test.go index b7959bf21b..28586c1037 100644 --- a/pkg/operator/resource/resourceapply/core_test.go +++ b/pkg/operator/resource/resourceapply/core_test.go @@ -672,6 +672,137 @@ func TestApplyNamespace(t *testing.T) { } } +func TestDeepCopyAvoidance(t *testing.T) { + tests := []struct { + name string + existing []runtime.Object + input *corev1.Namespace + expectedModified bool + verifyActions func(actions []clienttesting.Action, t *testing.T) + }{ + { + name: "create", + input: &corev1.Namespace{ + ObjectMeta: metav1.ObjectMeta{Name: "foo", Labels: map[string]string{"foo": "bar"}, ResourceVersion: "1"}, + }, + + expectedModified: true, + verifyActions: func(actions []clienttesting.Action, t *testing.T) { + if len(actions) != 2 { + t.Fatal(spew.Sdump(actions)) + } + if !actions[0].Matches("get", "namespaces") || actions[0].(clienttesting.GetAction).GetName() != "foo" { + t.Error(spew.Sdump(actions)) + } + if !actions[1].Matches("create", "namespaces") { + t.Error(spew.Sdump(actions)) + } + expected := &corev1.Namespace{ + ObjectMeta: metav1.ObjectMeta{Name: "foo", Labels: map[string]string{"foo": "bar"}, ResourceVersion: "1"}, + } + actual := actions[1].(clienttesting.CreateAction).GetObject().(*corev1.Namespace) + if !equality.Semantic.DeepEqual(expected, actual) { + t.Error(JSONPatchNoError(expected, actual)) + } + }, + }, + { + name: "nothing should happen if neither the input or the resource being updated has changed, since the last update", + existing: []runtime.Object{ + &corev1.Namespace{ + ObjectMeta: metav1.ObjectMeta{Name: "foo", Labels: map[string]string{"foo": "bar"}, ResourceVersion: "1"}, + }, + }, + input: &corev1.Namespace{ + ObjectMeta: metav1.ObjectMeta{Name: "foo", Labels: map[string]string{"foo": "bar"}}, + }, + expectedModified: false, + verifyActions: func(actions []clienttesting.Action, t *testing.T) { + if len(actions) != 1 { + t.Fatal(spew.Sdump(actions)) + } + if !actions[0].Matches("get", "namespaces") || actions[0].(clienttesting.GetAction).GetName() != "foo" { + t.Error(spew.Sdump(actions)) + } + }, + }, + { + name: "update, if existing has changed outside of our control since last update of resource", + existing: []runtime.Object{ + &corev1.Namespace{ + ObjectMeta: metav1.ObjectMeta{Name: "foo", Labels: map[string]string{"foo": "new"}, ResourceVersion: "2"}, + }, + }, + input: &corev1.Namespace{ + ObjectMeta: metav1.ObjectMeta{Name: "foo", Labels: map[string]string{"foo": "bar"}}, + }, + expectedModified: true, + verifyActions: func(actions []clienttesting.Action, t *testing.T) { + if len(actions) != 2 { + t.Fatal(spew.Sdump(actions)) + } + if !actions[0].Matches("get", "namespaces") || actions[0].(clienttesting.GetAction).GetName() != "foo" { + t.Error(spew.Sdump(actions)) + } + if !actions[1].Matches("update", "namespaces") { + t.Error(spew.Sdump(actions)) + } + expected := &corev1.Namespace{ + ObjectMeta: metav1.ObjectMeta{Name: "foo", Labels: map[string]string{"foo": "bar"}, ResourceVersion: "2"}, + } + actual := actions[1].(clienttesting.UpdateAction).GetObject().(*corev1.Namespace) + if !equality.Semantic.DeepEqual(expected, actual) { + t.Error(JSONPatchNoError(expected, actual)) + } + }, + }, + { + name: "update, if input has changed since last update of resource", + existing: []runtime.Object{ + &corev1.Namespace{ + ObjectMeta: metav1.ObjectMeta{Name: "foo", Labels: map[string]string{"foo": "bar"}, ResourceVersion: "2"}, + }, + }, + input: &corev1.Namespace{ + ObjectMeta: metav1.ObjectMeta{Name: "foo", Labels: map[string]string{"foo": "new"}}, + }, + expectedModified: true, + verifyActions: func(actions []clienttesting.Action, t *testing.T) { + if len(actions) != 2 { + t.Fatal(spew.Sdump(actions)) + } + if !actions[0].Matches("get", "namespaces") || actions[0].(clienttesting.GetAction).GetName() != "foo" { + t.Error(spew.Sdump(actions)) + } + if !actions[1].Matches("update", "namespaces") { + t.Error(spew.Sdump(actions)) + } + expected := &corev1.Namespace{ + ObjectMeta: metav1.ObjectMeta{Name: "foo", Labels: map[string]string{"foo": "new"}, ResourceVersion: "2"}, + } + actual := actions[1].(clienttesting.UpdateAction).GetObject().(*corev1.Namespace) + if !equality.Semantic.DeepEqual(expected, actual) { + t.Error(JSONPatchNoError(expected, actual)) + } + }, + }, + } + for _, test := range tests { + t.Run(test.name, func(t *testing.T) { + client := fake.NewSimpleClientset(test.existing...) + var cache map[CachedVersionKey]CachedResource = make(map[CachedVersionKey]CachedResource) + _, actualModified, err := ApplyNamespaceImproved(context.TODO(), client.CoreV1(), events.NewInMemoryRecorder("test"), test.input, cache) + if err != nil { + t.Fatal(err) + } + if test.expectedModified != actualModified { + t.Errorf("expected %v, got %v", test.expectedModified, actualModified) + } + test.verifyActions(client.Actions(), t) + }) + } +} + func TestSyncSecret(t *testing.T) { tt := []struct { name string diff --git a/pkg/operator/resource/resourceapply/generic.go b/pkg/operator/resource/resourceapply/generic.go index d1df4e3dbc..3d5aad37fd 100644 --- a/pkg/operator/resource/resourceapply/generic.go +++ b/pkg/operator/resource/resourceapply/generic.go @@ -101,7 +101,7 @@ func (c *ClientHolder) WithMigrationClient(client migrationclient.Interface) *Cl } // ApplyDirectly applies the given manifest files to API server. -func ApplyDirectly(ctx context.Context, clients *ClientHolder, recorder events.Recorder, manifests AssetFunc, files ...string) []ApplyResult { +func ApplyDirectly(ctx context.Context, clients *ClientHolder, recorder events.Recorder, cache map[CachedVersionKey]CachedResource, manifests AssetFunc, files ...string) []ApplyResult { ret := []ApplyResult{} for _, file := range files { @@ -126,39 +126,39 @@ func ApplyDirectly(ctx context.Context, clients *ClientHolder, recorder events.R if clients.kubeClient == nil { result.Error = fmt.Errorf("missing kubeClient") } else { - result.Result, result.Changed, result.Error = ApplyNamespace(ctx, clients.kubeClient.CoreV1(), recorder, t) + result.Result, result.Changed, result.Error = ApplyNamespaceImproved(ctx, clients.kubeClient.CoreV1(), recorder, t, cache) } case *corev1.Service: if clients.kubeClient == nil { result.Error = fmt.Errorf("missing kubeClient") } else { - result.Result, result.Changed, result.Error = ApplyService(ctx, clients.kubeClient.CoreV1(), recorder, t) + result.Result, result.Changed, result.Error = ApplyServiceImproved(ctx, clients.kubeClient.CoreV1(), recorder, t, cache) } case *corev1.Pod: if clients.kubeClient == nil { result.Error = fmt.Errorf("missing kubeClient") } else { - result.Result, result.Changed, result.Error = ApplyPod(ctx, clients.kubeClient.CoreV1(), recorder, t) + result.Result, result.Changed, result.Error = ApplyPodImproved(ctx, clients.kubeClient.CoreV1(), recorder, t, cache) } case *corev1.ServiceAccount: if clients.kubeClient == nil { result.Error = fmt.Errorf("missing kubeClient") } else { - result.Result, result.Changed, result.Error = ApplyServiceAccount(ctx, clients.kubeClient.CoreV1(), recorder, t) + result.Result, result.Changed, result.Error = ApplyServiceAccountImproved(ctx, clients.kubeClient.CoreV1(), recorder, t, cache) } case *corev1.ConfigMap: client := clients.configMapsGetter() if client == nil { result.Error = fmt.Errorf("missing kubeClient") } else { - result.Result, result.Changed, result.Error = ApplyConfigMap(ctx, client, recorder, t) + result.Result, result.Changed, result.Error = ApplyConfigMapImproved(ctx, client, recorder, t, cache) } case *corev1.Secret: client := clients.secretsGetter() if client == nil { result.Error = fmt.Errorf("missing kubeClient") } else { - result.Result, result.Changed, result.Error = ApplySecret(ctx, client, recorder, t) + result.Result, result.Changed, result.Error = ApplySecretImproved(ctx, client, recorder, t, cache) } case *rbacv1.ClusterRole: if clients.kubeClient == nil { diff --git a/pkg/operator/resource/resourceapply/generic_test.go b/pkg/operator/resource/resourceapply/generic_test.go index d84b9f9c3a..924e144dcf 100644 --- a/pkg/operator/resource/resourceapply/generic_test.go +++ b/pkg/operator/resource/resourceapply/generic_test.go @@ -38,7 +38,7 @@ metadata: `), nil } recorder := events.NewInMemoryRecorder("") - ret := ApplyDirectly(context.TODO(), (&ClientHolder{}).WithKubernetes(fakeClient), recorder, content, "pvc") + ret := ApplyDirectly(context.TODO(), (&ClientHolder{}).WithKubernetes(fakeClient), recorder, nil, content, "pvc") if ret[0].Error == nil { t.Fatal("missing expected error") } else if ret[0].Error.Error() != "unhandled type *v1.PersistentVolumeClaim" { diff --git a/pkg/operator/staticresourcecontroller/static_resource_controller.go b/pkg/operator/staticresourcecontroller/static_resource_controller.go index 857a56ec60..0cb06f2160 100644 --- a/pkg/operator/staticresourcecontroller/static_resource_controller.go +++ b/pkg/operator/staticresourcecontroller/static_resource_controller.go @@ -64,6 +64,7 @@ type StaticResourceController struct { factory *factory.Factory restMapper meta.RESTMapper categoryExpander restmapper.CategoryExpander + performanceCache map[resourceapply.CachedVersionKey]resourceapply.CachedResource } type conditionalManifests struct { @@ -100,7 +101,8 @@ func NewStaticResourceController( eventRecorder: eventRecorder.WithComponentSuffix(strings.ToLower(name)), - factory: factory.New().WithInformers(operatorClient.Informer()).ResyncEvery(1 * time.Minute), + factory: factory.New().WithInformers(operatorClient.Informer()).ResyncEvery(1 * time.Minute), + performanceCache: make(map[resourceapply.CachedVersionKey]resourceapply.CachedResource), } c.WithConditionalResources(manifests, files, nil, nil) @@ -251,7 +253,7 @@ func (c *StaticResourceController) AddNamespaceInformer(informer cache.SharedInd return c } -func (c StaticResourceController) Sync(ctx context.Context, syncContext factory.SyncContext) error { +func (c *StaticResourceController) Sync(ctx context.Context, syncContext factory.SyncContext) error { operatorSpec, _, _, err := c.operatorClient.GetOperatorState() if err != nil { return err @@ -277,7 +279,7 @@ func (c StaticResourceController) Sync(ctx context.Context, syncContext factory. continue case shouldCreate: - directResourceResults = resourceapply.ApplyDirectly(ctx, c.clients, syncContext.Recorder(), conditionalManifest.manifests, conditionalManifest.files...) + directResourceResults = resourceapply.ApplyDirectly(ctx, c.clients, syncContext.Recorder(), c.performanceCache, conditionalManifest.manifests, conditionalManifest.files...) case shouldDelete: directResourceResults = resourceapply.DeleteAll(ctx, c.clients, syncContext.Recorder(), conditionalManifest.manifests, conditionalManifest.files...) }