diff --git a/pkg/operator/controller/ingress/load_balancer_service.go b/pkg/operator/controller/ingress/load_balancer_service.go index b19667e6e7..c0b43d4140 100644 --- a/pkg/operator/controller/ingress/load_balancer_service.go +++ b/pkg/operator/controller/ingress/load_balancer_service.go @@ -2,6 +2,7 @@ package ingress import ( "context" + "encoding/json" "fmt" "strconv" "strings" @@ -21,6 +22,7 @@ import ( "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/util/sets" crclient "sigs.k8s.io/controller-runtime/pkg/client" ) @@ -104,6 +106,12 @@ const ( // openstackInternalLBAnnotation is the annotation used on a service to specify an // OpenStack load balancer as being internal. openstackInternalLBAnnotation = "service.beta.kubernetes.io/openstack-internal-load-balancer" + + // localWithFallbackAnnotation is the annotation used on a service that + // has "Local" external traffic policy to indicate that the service + // proxy should prefer using a local endpoint but forward traffic to any + // available endpoint if no local endpoint is available. 
+ localWithFallbackAnnotation = "traffic-policy.network.alpha.openshift.io/local-with-fallback" ) var ( @@ -298,12 +306,50 @@ func desiredLoadBalancerService(ci *operatorv1.IngressController, deploymentRef } // Azure load balancers are not customizable and are set to (2 fail @ 5s interval, 2 healthy) // GCP load balancers are not customizable and are set to (3 fail @ 8s interval, 1 healthy) + + if v, err := shouldUseLocalWithFallback(ci, service); err != nil { + return true, service, err + } else if v { + service.Annotations[localWithFallbackAnnotation] = "" + } } service.SetOwnerReferences([]metav1.OwnerReference{deploymentRef}) return true, service, nil } +// shouldUseLocalWithFallback returns a Boolean value indicating whether the +// local-with-fallback annotation should be set for the given service, and +// returns an error if the given ingresscontroller has an invalid unsupported +// config override. +func shouldUseLocalWithFallback(ic *operatorv1.IngressController, service *corev1.Service) (bool, error) { + // By default, use local-with-fallback when using the "Local" external + // traffic policy. + if service.Spec.ExternalTrafficPolicy != corev1.ServiceExternalTrafficPolicyTypeLocal { + return false, nil + } + + // Allow the user to override local-with-fallback. 
+ if len(ic.Spec.UnsupportedConfigOverrides.Raw) > 0 { + var unsupportedConfigOverrides struct { + LocalWithFallback string `json:"localWithFallback"` + } + if err := json.Unmarshal(ic.Spec.UnsupportedConfigOverrides.Raw, &unsupportedConfigOverrides); err != nil { + return false, fmt.Errorf("ingresscontroller %q has invalid spec.unsupportedConfigOverrides: %w", ic.Name, err) + } + override := unsupportedConfigOverrides.LocalWithFallback + if len(override) != 0 { + if val, err := strconv.ParseBool(override); err != nil { + return false, fmt.Errorf("ingresscontroller %q has invalid spec.unsupportedConfigOverrides.localWithFallback: %w", ic.Name, err) + } else { + return val, nil + } + } + } + + return true, nil +} + // currentLoadBalancerService returns any existing LB service for the // ingresscontroller. func (r *reconciler) currentLoadBalancerService(ci *operatorv1.IngressController) (bool, *corev1.Service, error) { @@ -394,14 +440,31 @@ func (r *reconciler) updateLoadBalancerService(current, desired *corev1.Service, return true, nil } +// managedLoadBalancerServiceAnnotations is a set of annotation keys for +// annotations that the operator manages for LoadBalancer-type services. +var managedLoadBalancerServiceAnnotations = sets.NewString( + awsLBHealthCheckIntervalAnnotation, + GCPGlobalAccessAnnotation, + localWithFallbackAnnotation, +) + // loadBalancerServiceChanged checks if the current load balancer service // matches the expected and if not returns an updated one. func loadBalancerServiceChanged(current, expected *corev1.Service) (bool, *corev1.Service) { + annotationCmpOpts := []cmp.Option{ + cmpopts.IgnoreMapEntries(func(k, _ string) bool { + return !managedLoadBalancerServiceAnnotations.Has(k) + }), + } + if cmp.Equal(current.Annotations, expected.Annotations, annotationCmpOpts...) 
{ + return false, nil + } + updated := current.DeepCopy() - changed := false - // Preserve everything but the AWS LB health check interval annotation & - // GCP Global Access internal Load Balancer annotation. + // Preserve everything but the AWS LB health check interval annotation, + // GCP Global Access internal Load Balancer annotation, and + // local-with-fallback annotation // (see ). // Updating annotations and spec fields cannot be done unless the // previous release blocks upgrades when the user has modified those @@ -409,18 +472,15 @@ func loadBalancerServiceChanged(current, expected *corev1.Service) (bool, *corev if updated.Annotations == nil { updated.Annotations = map[string]string{} } - if current.Annotations[awsLBHealthCheckIntervalAnnotation] != expected.Annotations[awsLBHealthCheckIntervalAnnotation] { - updated.Annotations[awsLBHealthCheckIntervalAnnotation] = expected.Annotations[awsLBHealthCheckIntervalAnnotation] - changed = true - } - - if current.Annotations[GCPGlobalAccessAnnotation] != expected.Annotations[GCPGlobalAccessAnnotation] { - updated.Annotations[GCPGlobalAccessAnnotation] = expected.Annotations[GCPGlobalAccessAnnotation] - changed = true - } - if !changed { - return false, nil + for annotation := range managedLoadBalancerServiceAnnotations { + currentVal, have := current.Annotations[annotation] + expectedVal, want := expected.Annotations[annotation] + if want && (!have || currentVal != expectedVal) { + updated.Annotations[annotation] = expected.Annotations[annotation] + } else if have && !want { + delete(updated.Annotations, annotation) + } } return true, updated diff --git a/pkg/operator/controller/ingress/load_balancer_service_test.go b/pkg/operator/controller/ingress/load_balancer_service_test.go index 4c371fad9b..4a2a0588ba 100644 --- a/pkg/operator/controller/ingress/load_balancer_service_test.go +++ b/pkg/operator/controller/ingress/load_balancer_service_test.go @@ -10,6 +10,7 @@ import ( operatorv1 
"github.com/openshift/api/operator/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/util/intstr" ) @@ -319,6 +320,9 @@ func TestDesiredLoadBalancerService(t *testing.T) { if err := checkServiceHasAnnotation(svc, awsLBHealthCheckHealthyThresholdAnnotation, true, awsLBHealthCheckHealthyThresholdDefault); err != nil { t.Errorf("annotation check for test %q failed: %v", tc.description, err) } + if err := checkServiceHasAnnotation(svc, localWithFallbackAnnotation, true, ""); err != nil { + t.Errorf("local-with-fallback annotation check for test %q failed: %v", tc.description, err) + } classicLB := tc.lbStrategy.ProviderParameters == nil || tc.lbStrategy.ProviderParameters.AWS.Type == operatorv1.AWSClassicLoadBalancer switch { case classicLB: @@ -388,6 +392,9 @@ func TestDesiredLoadBalancerService(t *testing.T) { t.Errorf("annotation check for test %q failed; unexpected annotation %s", tc.description, azureInternalLBAnnotation) } } + if err := checkServiceHasAnnotation(svc, localWithFallbackAnnotation, true, ""); err != nil { + t.Errorf("local-with-fallback annotation check for test %q failed: %v", tc.description, err) + } case configv1.GCPPlatformType: if isInternal { if err := checkServiceHasAnnotation(svc, gcpLBTypeAnnotation, true, "Internal"); err != nil { @@ -399,6 +406,9 @@ func TestDesiredLoadBalancerService(t *testing.T) { t.Errorf("annotation check for test %q failed; unexpected annotation %s", tc.description, gcpLBTypeAnnotation) } } + if err := checkServiceHasAnnotation(svc, localWithFallbackAnnotation, true, ""); err != nil { + t.Errorf("local-with-fallback annotation check for test %q failed: %v", tc.description, err) + } case configv1.OpenStackPlatformType: if isInternal { if err := checkServiceHasAnnotation(svc, openstackInternalLBAnnotation, true, "true"); err != nil { @@ -410,6 +420,9 @@ func TestDesiredLoadBalancerService(t *testing.T) { t.Errorf("annotation check for test %q failed; 
unexpected annotation %s", tc.description, openstackInternalLBAnnotation) } } + if err := checkServiceHasAnnotation(svc, localWithFallbackAnnotation, true, ""); err != nil { + t.Errorf("local-with-fallback annotation check for test %q failed: %v", tc.description, err) + } } } } @@ -439,6 +452,72 @@ func checkServiceHasAnnotation(svc *corev1.Service, name string, expectValue boo } } +// TestShouldUseLocalWithFallback verifies that shouldUseLocalWithFallback +// behaves as expected. +func TestShouldUseLocalWithFallback(t *testing.T) { + testCases := []struct { + description string + local bool + override string + expect bool + expectError bool + }{ + { + description: "if using Cluster without an override", + local: false, + expect: false, + }, + { + description: "if using Local without an override", + local: true, + expect: true, + }, + { + description: "if using Local with an override", + local: true, + override: `{"localWithFallback":"false"}`, + expect: false, + }, + { + description: "if using Local with a garbage override", + local: true, + override: `{"localWithFallback":"x"}`, + expectError: true, + }, + } + for _, tc := range testCases { + var override []byte + if len(tc.override) != 0 { + override = []byte(tc.override) + } + ic := &operatorv1.IngressController{ + Spec: operatorv1.IngressControllerSpec{ + UnsupportedConfigOverrides: runtime.RawExtension{ + Raw: override, + }, + }, + } + policy := corev1.ServiceExternalTrafficPolicyTypeCluster + if tc.local { + policy = corev1.ServiceExternalTrafficPolicyTypeLocal + } + service := corev1.Service{ + Spec: corev1.ServiceSpec{ + ExternalTrafficPolicy: policy, + }, + } + actual, err := shouldUseLocalWithFallback(ic, &service) + switch { + case !tc.expectError && err != nil: + t.Errorf("%q: unexpected error: %v", tc.description, err) + case tc.expectError && err == nil: + t.Errorf("%q: expected error, got nil", tc.description) + case tc.expect != actual: + t.Errorf("%q: expected %t, got %t", tc.description, 
tc.expect, actual) + } + } +} + func TestLoadBalancerServiceChanged(t *testing.T) { testCases := []struct { description string @@ -478,6 +557,13 @@ func TestLoadBalancerServiceChanged(t *testing.T) { }, expect: false, }, + { + description: "if the local-with-fallback annotation is added", + mutate: func(svc *corev1.Service) { + svc.Annotations[localWithFallbackAnnotation] = "" + }, + expect: true, + }, { description: "if .spec.healthCheckNodePort changes", mutate: func(svc *corev1.Service) { @@ -548,6 +634,13 @@ func TestLoadBalancerServiceChanged(t *testing.T) { }, expect: true, }, + { + description: "if the service.beta.kubernetes.io/aws-load-balancer-healthcheck-interval annotation is deleted", + mutate: func(svc *corev1.Service) { + delete(svc.Annotations, "service.beta.kubernetes.io/aws-load-balancer-healthcheck-interval") + }, + expect: true, + }, } for _, tc := range testCases { diff --git a/pkg/operator/controller/ingress/nodeport_service.go b/pkg/operator/controller/ingress/nodeport_service.go index e38cc5d57c..755fd924ea 100644 --- a/pkg/operator/controller/ingress/nodeport_service.go +++ b/pkg/operator/controller/ingress/nodeport_service.go @@ -15,6 +15,7 @@ import ( "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/util/sets" "k8s.io/apimachinery/pkg/util/intstr" ) @@ -46,7 +47,10 @@ func (r *reconciler) ensureNodePortService(ic *operatorv1.IngressController, dep } } - wantService, desired := desiredNodePortService(ic, deploymentRef, wantMetricsPort) + wantService, desired, err := desiredNodePortService(ic, deploymentRef, wantMetricsPort) + if err != nil { + return false, nil, err + } switch { case !wantService && !haveService: @@ -79,16 +83,17 @@ func (r *reconciler) ensureNodePortService(ic *operatorv1.IngressController, dep // desiredNodePortService returns a Boolean indicating whether a NodePort // service is desired, as well as the NodePort service if one is desired. 
-func desiredNodePortService(ic *operatorv1.IngressController, deploymentRef metav1.OwnerReference, wantMetricsPort bool) (bool, *corev1.Service) { +func desiredNodePortService(ic *operatorv1.IngressController, deploymentRef metav1.OwnerReference, wantMetricsPort bool) (bool, *corev1.Service, error) { if ic.Status.EndpointPublishingStrategy.Type != operatorv1.NodePortServiceStrategyType { - return false, nil + return false, nil, nil } name := controller.NodePortServiceName(ic) service := &corev1.Service{ ObjectMeta: metav1.ObjectMeta{ - Namespace: name.Namespace, - Name: name.Name, + Annotations: map[string]string{}, + Namespace: name.Namespace, + Name: name.Name, Labels: map[string]string{ "app": "router", "router": name.Name, @@ -126,7 +131,13 @@ func desiredNodePortService(ic *operatorv1.IngressController, deploymentRef meta service.Spec.Ports = service.Spec.Ports[0:2] } - return true, service + if v, err := shouldUseLocalWithFallback(ic, service); err != nil { + return true, service, err + } else if v { + service.Annotations[localWithFallbackAnnotation] = "" + } + + return true, service, nil } // currentNodePortService returns a Boolean indicating whether a NodePort @@ -160,9 +171,17 @@ func (r *reconciler) updateNodePortService(current, desired *corev1.Service) (bo return true, nil } +// managedNodePortServiceAnnotations is a set of annotation keys for annotations +// that the operator manages for NodePort-type services. +var managedNodePortServiceAnnotations = sets.NewString( + localWithFallbackAnnotation, +) + // nodePortServiceChanged checks if the current NodePort service spec matches // the expected spec and if not returns an updated one. func nodePortServiceChanged(current, expected *corev1.Service) (bool, *corev1.Service) { + changed := false + serviceCmpOpts := []cmp.Option{ // Ignore fields that the API, other controllers, or user may // have modified. 
@@ -171,13 +190,38 @@ func nodePortServiceChanged(current, expected *corev1.Service) (bool, *corev1.Se cmp.Comparer(cmpServiceAffinity), cmpopts.EquateEmpty(), } - if cmp.Equal(current.Spec, expected.Spec, serviceCmpOpts...) { + if !cmp.Equal(current.Spec, expected.Spec, serviceCmpOpts...) { + changed = true + } + + annotationCmpOpts := []cmp.Option{ + cmpopts.IgnoreMapEntries(func(k, _ string) bool { + return !managedNodePortServiceAnnotations.Has(k) + }), + } + if !cmp.Equal(current.Annotations, expected.Annotations, annotationCmpOpts...) { + changed = true + } + + if !changed { return false, nil } updated := current.DeepCopy() updated.Spec = expected.Spec + if updated.Annotations == nil { + updated.Annotations = map[string]string{} + } + for annotation := range managedNodePortServiceAnnotations { + currentVal, have := current.Annotations[annotation] + expectedVal, want := expected.Annotations[annotation] + if want && (!have || currentVal != expectedVal) { + updated.Annotations[annotation] = expected.Annotations[annotation] + } else if have && !want { + delete(updated.Annotations, annotation) + } + } // Preserve fields that the API, other controllers, or user may have // modified. 
updated.Spec.ClusterIP = current.Spec.ClusterIP diff --git a/pkg/operator/controller/ingress/nodeport_service_test.go b/pkg/operator/controller/ingress/nodeport_service_test.go index faf3627c39..f315bd177c 100644 --- a/pkg/operator/controller/ingress/nodeport_service_test.go +++ b/pkg/operator/controller/ingress/nodeport_service_test.go @@ -37,6 +37,9 @@ func TestDesiredNodePortService(t *testing.T) { expect: true, expectService: corev1.Service{ ObjectMeta: metav1.ObjectMeta{ + Annotations: map[string]string{ + localWithFallbackAnnotation: "", + }, Namespace: "openshift-ingress", Name: "router-nodeport-default", Labels: map[string]string{ @@ -75,6 +78,9 @@ func TestDesiredNodePortService(t *testing.T) { expect: true, expectService: corev1.Service{ ObjectMeta: metav1.ObjectMeta{ + Annotations: map[string]string{ + localWithFallbackAnnotation: "", + }, Namespace: "openshift-ingress", Name: "router-nodeport-default", Labels: map[string]string{ @@ -126,8 +132,10 @@ func TestDesiredNodePortService(t *testing.T) { }, }, } - want, svc := desiredNodePortService(ic, deploymentRef, tc.wantMetricsPort) - if want != tc.expect { + want, svc, err := desiredNodePortService(ic, deploymentRef, tc.wantMetricsPort) + if err != nil { + t.Errorf("unexpected error from desiredNodePortService: %v", err) + } else if want != tc.expect { t.Errorf("expected desiredNodePortService to return %t for endpoint publishing strategy type %v, got %t, with service %#v", tc.expect, tc.strategyType, want, svc) } else if tc.expect && !reflect.DeepEqual(svc, &tc.expectService) { t.Errorf("expected desiredNodePortService to return %#v, got %#v", &tc.expectService, svc) @@ -175,6 +183,20 @@ func TestNodePortServiceChanged(t *testing.T) { }, expect: true, }, + { + description: "if the local-with-fallback annotation changes", + mutate: func(svc *corev1.Service) { + svc.Annotations["traffic-policy.network.alpha.openshift.io/local-with-fallback"] = "x" + }, + expect: true, + }, + { + description: "if the 
local-with-fallback annotation is deleted", + mutate: func(svc *corev1.Service) { + delete(svc.Annotations, "traffic-policy.network.alpha.openshift.io/local-with-fallback") + }, + expect: true, + }, { description: "if .spec.healthCheckNodePort changes", mutate: func(svc *corev1.Service) { @@ -236,6 +258,9 @@ func TestNodePortServiceChanged(t *testing.T) { for _, tc := range testCases { original := corev1.Service{ ObjectMeta: metav1.ObjectMeta{ + Annotations: map[string]string{ + "traffic-policy.network.alpha.openshift.io/local-with-fallback": "", + }, Namespace: "openshift-ingress", Name: "router-original", UID: "1", diff --git a/test/e2e/operator_test.go b/test/e2e/operator_test.go index 9817256261..01dab12982 100644 --- a/test/e2e/operator_test.go +++ b/test/e2e/operator_test.go @@ -1978,6 +1978,128 @@ func TestLoadBalancingAlgorithmUnsupportedConfigOverride(t *testing.T) { } } +// TestLocalWithFallbackOverrideForLoadBalancerService verifies that the +// operator does not set the local-with-fallback annotation on a LoadBalancer +// service if the localWithFallback unsupported config override is set to +// "false". +// +// Note: This test mutates the default ingresscontroller rather than creating a +// new one to reduce the risk of failing due to cloud provider API throttling. 
+func TestLocalWithFallbackOverrideForLoadBalancerService(t *testing.T) { + supportedPlatforms := map[configv1.PlatformType]struct{}{ + configv1.AWSPlatformType: {}, + configv1.AzurePlatformType: {}, + configv1.GCPPlatformType: {}, + } + platform := infraConfig.Status.Platform + if _, supported := supportedPlatforms[platform]; !supported { + t.Skipf("test skipped on platform %q", platform) + } + + ic := &operatorv1.IngressController{} + if err := kclient.Get(context.TODO(), defaultName, ic); err != nil { + t.Fatalf("failed to get ingresscontroller %q: %v", defaultName, err) + } + + if err := waitForIngressControllerCondition(t, kclient, 5*time.Minute, defaultName, defaultAvailableConditions...); err != nil { + t.Fatalf("failed to observe expected conditions: %v", err) + } + + service := &corev1.Service{} + serviceName := controller.LoadBalancerServiceName(ic) + if err := kclient.Get(context.TODO(), serviceName, service); err != nil { + t.Fatalf("failed to get service %q: %v", serviceName, err) + } + + const annotation = "traffic-policy.network.alpha.openshift.io/local-with-fallback" + + if _, ok := service.Annotations[annotation]; !ok { + t.Fatalf("failed to observe the %q annotation on service %q", annotation, serviceName) + } + + ic.Spec.UnsupportedConfigOverrides = runtime.RawExtension{ + Raw: []byte(`{"localWithFallback":"false"}`), + } + if err := kclient.Update(context.TODO(), ic); err != nil { + t.Fatalf("failed to update ingresscontroller %q with override: %v", defaultName, err) + } + defer func() { + if err := kclient.Get(context.TODO(), defaultName, ic); err != nil { + t.Fatalf("failed to get ingresscontroller %q: %v", defaultName, err) + } + ic.Spec.UnsupportedConfigOverrides = runtime.RawExtension{} + if err := kclient.Update(context.TODO(), ic); err != nil { + t.Fatalf("failed to update ingresscontroller %q to remove the override: %v", defaultName, err) + } + }() + + wait.PollImmediate(1*time.Second, 30*time.Second, func() (bool, error) { + if err := 
kclient.Get(context.TODO(), serviceName, service); err != nil { + t.Logf("failed to get service %q: %v", serviceName, err) + return false, nil + } + _, ok := service.Annotations[annotation] + return !ok, nil + }) + if _, ok := service.Annotations[annotation]; ok { + t.Fatalf("failed to observe removal of the %q annotation on service %q", annotation, serviceName) + } +} + +// TestLocalWithFallbackOverrideForNodePortService verifies that the operator +// does not set the local-with-fallback annotation on a NodePort service if the +// localWithFallback unsupported config override is set to "false". +func TestLocalWithFallbackOverrideForNodePortService(t *testing.T) { + icName := types.NamespacedName{ + Namespace: operatorNamespace, + Name: "local-with-fallback", + } + domain := icName.Name + "." + dnsConfig.Spec.BaseDomain + ic := newNodePortController(icName, domain) + if err := kclient.Create(context.TODO(), ic); err != nil { + t.Fatalf("failed to create ingresscontroller %q: %v", icName, err) + } + defer assertIngressControllerDeleted(t, kclient, ic) + + if err := waitForIngressControllerCondition(t, kclient, 5*time.Minute, icName, availableConditionsForIngressControllerWithNodePort...); err != nil { + t.Fatalf("failed to observe expected conditions: %v", err) + } + + service := &corev1.Service{} + serviceName := controller.NodePortServiceName(ic) + if err := kclient.Get(context.TODO(), serviceName, service); err != nil { + t.Fatalf("failed to get service %q: %v", serviceName, err) + } + + const annotation = "traffic-policy.network.alpha.openshift.io/local-with-fallback" + + if _, ok := service.Annotations[annotation]; !ok { + t.Fatalf("failed to observe the %q annotation on service %q", annotation, serviceName) + } + + if err := kclient.Get(context.TODO(), icName, ic); err != nil { + t.Fatalf("failed to get ingresscontroller %q: %v", icName, err) + } + ic.Spec.UnsupportedConfigOverrides = runtime.RawExtension{ + Raw: 
[]byte(`{"localWithFallback":"false"}`), + } + if err := kclient.Update(context.TODO(), ic); err != nil { + t.Fatalf("failed to update ingresscontroller %q with override: %v", icName, err) + } + + wait.PollImmediate(1*time.Second, 30*time.Second, func() (bool, error) { + if err := kclient.Get(context.TODO(), serviceName, service); err != nil { + t.Logf("failed to get service %q: %v", serviceName, err) + return false, nil + } + _, ok := service.Annotations[annotation] + return !ok, nil + }) + if _, ok := service.Annotations[annotation]; ok { + t.Fatalf("failed to observe removal of the %q annotation on service %q", annotation, serviceName) + } +} + func newLoadBalancerController(name types.NamespacedName, domain string) *operatorv1.IngressController { repl := int32(1) return &operatorv1.IngressController{