From c7437c1452a312a6a9344029b0164311861f327e Mon Sep 17 00:00:00 2001 From: Marco Braga Date: Thu, 29 May 2025 23:49:41 -0300 Subject: [PATCH 1/9] e2e/deps: enhance test scenarios with NLB --- tests/e2e/go.mod | 14 ++++++++++++++ tests/e2e/go.sum | 28 ++++++++++++++++++++++++++++ 2 files changed, 42 insertions(+) diff --git a/tests/e2e/go.mod b/tests/e2e/go.mod index 873f84da33..a5367e3589 100644 --- a/tests/e2e/go.mod +++ b/tests/e2e/go.mod @@ -3,6 +3,9 @@ module k8s.io/cloud-provider-aws/tests/e2e go 1.24.9 require ( + github.com/aws/aws-sdk-go-v2 v1.36.3 + github.com/aws/aws-sdk-go-v2/config v1.29.14 + github.com/aws/aws-sdk-go-v2/service/elasticloadbalancingv2 v1.45.2 github.com/onsi/ginkgo/v2 v2.9.4 github.com/onsi/gomega v1.27.6 k8s.io/api v0.26.0 @@ -13,6 +16,17 @@ require ( ) require ( + github.com/aws/aws-sdk-go-v2/credentials v1.17.67 // indirect + github.com/aws/aws-sdk-go-v2/feature/ec2/imds v1.16.30 // indirect + github.com/aws/aws-sdk-go-v2/internal/configsources v1.3.34 // indirect + github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.6.34 // indirect + github.com/aws/aws-sdk-go-v2/internal/ini v1.8.3 // indirect + github.com/aws/aws-sdk-go-v2/service/internal/accept-encoding v1.12.3 // indirect + github.com/aws/aws-sdk-go-v2/service/internal/presigned-url v1.12.15 // indirect + github.com/aws/aws-sdk-go-v2/service/sso v1.25.3 // indirect + github.com/aws/aws-sdk-go-v2/service/ssooidc v1.30.1 // indirect + github.com/aws/aws-sdk-go-v2/service/sts v1.33.19 // indirect + github.com/aws/smithy-go v1.22.2 // indirect github.com/beorn7/perks v1.0.1 // indirect github.com/blang/semver/v4 v4.0.0 // indirect github.com/cenkalti/backoff/v4 v4.1.3 // indirect diff --git a/tests/e2e/go.sum b/tests/e2e/go.sum index 663324c896..4e8455f1bf 100644 --- a/tests/e2e/go.sum +++ b/tests/e2e/go.sum @@ -42,6 +42,34 @@ github.com/alecthomas/units v0.0.0-20190924025748-f65c72e2690d/go.mod h1:rBZYJk5 github.com/antihax/optional v1.0.0/go.mod h1:uupD/76wgC+ih3iEmQUL+0Ugr19nfwCT1kdvxnR2qWY= github.com/armon/go-socks5 v0.0.0-20160902184237-e75332964ef5 h1:0CwZNZbxp69SHPdPJAN/hZIm0C4OItdklCFmMRWYpio= github.com/armon/go-socks5 v0.0.0-20160902184237-e75332964ef5/go.mod h1:wHh0iHkYZB8zMSxRWpUBQtwG5a7fFgvEO+odwuTv2gs= +github.com/aws/aws-sdk-go-v2 v1.36.3 h1:mJoei2CxPutQVxaATCzDUjcZEjVRdpsiiXi2o38yqWM= +github.com/aws/aws-sdk-go-v2 v1.36.3/go.mod h1:LLXuLpgzEbD766Z5ECcRmi8AzSwfZItDtmABVkRLGzg= +github.com/aws/aws-sdk-go-v2/config v1.29.14 h1:f+eEi/2cKCg9pqKBoAIwRGzVb70MRKqWX4dg1BDcSJM= +github.com/aws/aws-sdk-go-v2/config v1.29.14/go.mod h1:wVPHWcIFv3WO89w0rE10gzf17ZYy+UVS1Geq8Iei34g= +github.com/aws/aws-sdk-go-v2/credentials v1.17.67 h1:9KxtdcIA/5xPNQyZRgUSpYOE6j9Bc4+D7nZua0KGYOM= +github.com/aws/aws-sdk-go-v2/credentials v1.17.67/go.mod h1:p3C44m+cfnbv763s52gCqrjaqyPikj9Sg47kUVaNZQQ= +github.com/aws/aws-sdk-go-v2/feature/ec2/imds v1.16.30 h1:x793wxmUWVDhshP8WW2mlnXuFrO4cOd3HLBroh1paFw= +github.com/aws/aws-sdk-go-v2/feature/ec2/imds v1.16.30/go.mod h1:Jpne2tDnYiFascUEs2AWHJL9Yp7A5ZVy3TNyxaAjD6M= +github.com/aws/aws-sdk-go-v2/internal/configsources v1.3.34 h1:ZK5jHhnrioRkUNOc+hOgQKlUL5JeC3S6JgLxtQ+Rm0Q= +github.com/aws/aws-sdk-go-v2/internal/configsources v1.3.34/go.mod h1:p4VfIceZokChbA9FzMbRGz5OV+lekcVtHlPKEO0gSZY= +github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.6.34 h1:SZwFm17ZUNNg5Np0ioo/gq8Mn6u9w19Mri8DnJ15Jf0= +github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.6.34/go.mod h1:dFZsC0BLo346mvKQLWmoJxT+Sjp+qcVR1tRVHQGOH9Q= +github.com/aws/aws-sdk-go-v2/internal/ini v1.8.3 
h1:bIqFDwgGXXN1Kpp99pDOdKMTTb5d2KyU5X/BZxjOkRo=
+github.com/aws/aws-sdk-go-v2/internal/ini v1.8.3/go.mod h1:H5O/EsxDWyU+LP/V8i5sm8cxoZgc2fdNR9bxlOFrQTo=
+github.com/aws/aws-sdk-go-v2/service/elasticloadbalancingv2 v1.45.2 h1:vX70Z4lNSr7XsioU0uJq5yvxgI50sB66MvD+V/3buS4=
+github.com/aws/aws-sdk-go-v2/service/elasticloadbalancingv2 v1.45.2/go.mod h1:xnCC3vFBfOKpU6PcsCKL2ktgBTZfOwTGxj6V8/X3IS4=
+github.com/aws/aws-sdk-go-v2/service/internal/accept-encoding v1.12.3 h1:eAh2A4b5IzM/lum78bZ590jy36+d/aFLgKF/4Vd1xPE=
+github.com/aws/aws-sdk-go-v2/service/internal/accept-encoding v1.12.3/go.mod h1:0yKJC/kb8sAnmlYa6Zs3QVYqaC8ug2AbnNChv5Ox3uA=
+github.com/aws/aws-sdk-go-v2/service/internal/presigned-url v1.12.15 h1:dM9/92u2F1JbDaGooxTq18wmmFzbJRfXfVfy96/1CXM=
+github.com/aws/aws-sdk-go-v2/service/internal/presigned-url v1.12.15/go.mod h1:SwFBy2vjtA0vZbjjaFtfN045boopadnoVPhu4Fv66vY=
+github.com/aws/aws-sdk-go-v2/service/sso v1.25.3 h1:1Gw+9ajCV1jogloEv1RRnvfRFia2cL6c9cuKV2Ps+G8=
+github.com/aws/aws-sdk-go-v2/service/sso v1.25.3/go.mod h1:qs4a9T5EMLl/Cajiw2TcbNt2UNo/Hqlyp+GiuG4CFDI=
+github.com/aws/aws-sdk-go-v2/service/ssooidc v1.30.1 h1:hXmVKytPfTy5axZ+fYbR5d0cFmC3JvwLm5kM83luako=
+github.com/aws/aws-sdk-go-v2/service/ssooidc v1.30.1/go.mod h1:MlYRNmYu/fGPoxBQVvBYr9nyr948aY/WLUvwBMBJubs=
+github.com/aws/aws-sdk-go-v2/service/sts v1.33.19 h1:1XuUZ8mYJw9B6lzAkXhqHlJd/XvaX32evhproijJEZY=
+github.com/aws/aws-sdk-go-v2/service/sts v1.33.19/go.mod h1:cQnB8CUnxbMU82JvlqjKR2HBOm3fe9pWorWBza6MBJ4=
+github.com/aws/smithy-go v1.22.2 h1:6D9hW43xKFrRx/tXXfAlIZc4JI+yQe6snnWcQyxSyLQ=
+github.com/aws/smithy-go v1.22.2/go.mod h1:irrKGvNn1InZwb2d7fkIRNucdfwR8R+Ts3wxYa/cJHg=
 github.com/beorn7/perks v0.0.0-20180321164747-3a771d992973/go.mod h1:Dwedo/Wpr24TaqPxmxbtue+5NUziq4I4S80YR8gNf3Q=
 github.com/beorn7/perks v1.0.0/go.mod h1:KWe93zE9D1o94FZ5RNwFwVgaQK1VOXiVxmqh+CedLV8=
 github.com/beorn7/perks v1.0.1 h1:VlbKKnNfV8bJzeqoa4cOKqO6bYr3WgKZxO8Z16+hsOM=

From 0cde97ee0152ce6e2220732d3acbf321567817fd Mon Sep 17 00:00:00 2001
From: Marco Braga
Date: Thu, 29 May 2025 23:50:06 -0300
Subject: [PATCH 2/9] e2e: enhance test scenarios with NLB

This change enhances the test scenarios by:
- supporting more distributions which do not allow pods to bind on
  privileged ports (default behavior of libjig, see issue
- refactoring the tests to allow adding more cases
- introducing tests for NLB, including advanced tests to validate the
  node selector annotation. The AWS SDK is added to satisfy this
  validation.
---
 tests/e2e/loadbalancer.go | 373 ++++++++++++++++++++++++++++++++++----
 1 file changed, 337 insertions(+), 36 deletions(-)

diff --git a/tests/e2e/loadbalancer.go b/tests/e2e/loadbalancer.go
index 79b0e2625e..bd786b6aae 100644
--- a/tests/e2e/loadbalancer.go
+++ b/tests/e2e/loadbalancer.go
@@ -14,15 +14,44 @@ limitations under the License.
 package e2e
 
 import (
+	"context"
+	"fmt"
+	"strings"
+
 	. 
"github.com/onsi/ginkgo/v2" v1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/util/intstr" clientset "k8s.io/client-go/kubernetes" "k8s.io/kubernetes/test/e2e/framework" e2eservice "k8s.io/kubernetes/test/e2e/framework/service" + imageutils "k8s.io/kubernetes/test/utils/image" admissionapi "k8s.io/pod-security-admission/api" + + "github.com/aws/aws-sdk-go-v2/aws" + "github.com/aws/aws-sdk-go-v2/config" + elbv2 "github.com/aws/aws-sdk-go-v2/service/elasticloadbalancingv2" +) + +const ( + annotationLBType = "service.beta.kubernetes.io/aws-load-balancer-type" + annotationLBTargetNodeLabels = "service.beta.kubernetes.io/aws-load-balancer-target-node-labels" +) + +var ( + // clusterNodeSelector is the discovered node(compute/worker) selector used in the cluster. + clusterNodesSelector string + clusterNodesCount int = 0 + + // lookupNodeSelectors are valid compute/node/worker selectors commonly used in different kubernetes + // distributions. + lookupNodeSelectors = []string{ + "node-role.kubernetes.io/worker", // used in must distributions + "node-role.kubernetes.io/node", // used in ccm-aws CI + } ) +// loadbalancer tests var _ = Describe("[cloud-provider-aws-e2e] loadbalancer", func() { f := framework.NewDefaultFramework("cloud-provider-aws") f.NamespacePodSecurityEnforceLevel = admissionapi.LevelPrivileged @@ -41,61 +70,333 @@ var _ = Describe("[cloud-provider-aws-e2e] loadbalancer", func() { // After each test }) - It("should configure the loadbalancer based on annotations", func() { - loadBalancerCreateTimeout := e2eservice.GetServiceLoadBalancerCreationTimeout(cs) - framework.Logf("Running tests against AWS with timeout %s", loadBalancerCreateTimeout) + type loadBalancerTestCases struct { + Name string + ResourceSuffix string + Annotations map[string]string + PostConfigService func(cfg *configServiceLB, svc *v1.Service) + PostRunValidation func(cfg *configServiceLB, svc *v1.Service) + } + cases := []loadBalancerTestCases{ + { + Name: "should configure the loadbalancer based on annotations", + ResourceSuffix: "", + Annotations: map[string]string{}, + }, + { + Name: "NLB should configure the loadbalancer based on annotations", + ResourceSuffix: "nlb", + Annotations: map[string]string{ + annotationLBType: "nlb", + }, + }, + { + Name: "NLB should configure the loadbalancer with target-node-labels", + ResourceSuffix: "sg-nd", + Annotations: map[string]string{ + annotationLBType: "nlb", + }, + PostConfigService: func(cfg *configServiceLB, svc *v1.Service) { + // discover clusterNodeSelector and patch service + // TODO: move to external function if there are more scenarios to discover nodes. 
+ By("discovering node label used in the kubernetes distributions") + for _, selector := range lookupNodeSelectors { + nodeList, err := cs.CoreV1().Nodes().List(context.TODO(), metav1.ListOptions{ + LabelSelector: selector, + }) + framework.ExpectNoError(err, "failed to list worker nodes") + if len(nodeList.Items) > 0 { + clusterNodesCount = len(nodeList.Items) + clusterNodesSelector = selector + break + } + } + + if clusterNodesCount == 0 { + framework.ExpectNoError(fmt.Errorf("unable to find node selector for %v", lookupNodeSelectors)) + } + + By(fmt.Sprintf("found %d nodes with selector %q\n", clusterNodesCount, clusterNodesSelector)) + if svc.Annotations == nil { + svc.Annotations = map[string]string{} + } + svc.Annotations[annotationLBTargetNodeLabels] = clusterNodesSelector + By(fmt.Sprintf("using service with annotations: %v", svc.Annotations)) + }, + PostRunValidation: func(cfg *configServiceLB, svc *v1.Service) { + // Validate in the TG if the node count matches with expected target-node-labels selector. + if len(svc.Status.LoadBalancer.Ingress) == 0 { + framework.Failf("No ingress found in LoadBalancer status for service %s/%s", svc.Namespace, svc.Name) + } + lbDNS := svc.Status.LoadBalancer.Ingress[0].Hostname + framework.ExpectNoError(getLBTargetCount(context.TODO(), lbDNS, clusterNodesCount), "AWS LB target count validation failed") + }, + }, + } + + serviceNameBase := "lbconfig-test" + for _, tc := range cases { + It(tc.Name, func() { + loadBalancerCreateTimeout := e2eservice.GetServiceLoadBalancerCreationTimeout(cs) + framework.Logf("Running tests against AWS with timeout %s", loadBalancerCreateTimeout) + + // Create Configuration + serviceName := serviceNameBase + if len(tc.ResourceSuffix) > 0 { + serviceName = serviceName + "-" + tc.ResourceSuffix + } + framework.Logf("namespace for load balancer conig test: %s", ns.Name) + + By("creating a TCP service " + serviceName + " with type=LoadBalancerType in namespace " + ns.Name) + lbConfig := newConfigServiceLB() + lbConfig.LBJig = e2eservice.NewTestJig(cs, ns.Name, serviceName) + lbServiceConfig := lbConfig.buildService(tc.Annotations) + + // Hook: PostConfigService patchs service configuration. + if tc.PostConfigService != nil { + tc.PostConfigService(lbConfig, lbServiceConfig) + } + + // Create Load Balancer + By("creating loadbalancer for service " + lbServiceConfig.Namespace + "/" + lbServiceConfig.Name) + if _, err := lbConfig.LBJig.Client.CoreV1().Services(lbConfig.LBJig.Namespace).Create(context.TODO(), lbServiceConfig, metav1.CreateOptions{}); err != nil { + framework.ExpectNoError(fmt.Errorf("failed to create LoadBalancer Service %q: %v", lbServiceConfig.Name, err)) + } + + By("waiting for loadbalancer for service " + lbServiceConfig.Namespace + "/" + lbServiceConfig.Name) + lbService, err := lbConfig.LBJig.WaitForLoadBalancer(loadBalancerCreateTimeout) + framework.ExpectNoError(err) - serviceName := "lbconfig-test" - framework.Logf("namespace for load balancer conig test: %s", ns.Name) + // Run Workloads + By("creating a pod to be part of the TCP service " + serviceName) + _, err = lbConfig.LBJig.Run(lbConfig.buildReplicationController()) + framework.ExpectNoError(err) - By("creating a TCP service " + serviceName + " with type=LoadBalancerType in namespace " + ns.Name) - lbJig := e2eservice.NewTestJig(cs, ns.Name, serviceName) + // Hook: PostRunValidation performs LB validations after it is created (before test). 
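+			// For the target-node-labels case above, the hook calls getLBTargetCount to compare the
+			// targets registered in the AWS target group against the number of nodes matching the selector.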
+ if tc.PostRunValidation != nil { + By("running post run validations") + tc.PostRunValidation(lbConfig, lbService) + } + + // Test the Service Endpoint + By("hitting the TCP service's LB External IP") + if len(lbService.Spec.Ports) == 0 { + framework.Failf("No ports found in service spec for service %s/%s", lbService.Namespace, lbService.Name) + } + if len(lbService.Status.LoadBalancer.Ingress) == 0 { + framework.Failf("No ingress found in LoadBalancer status for service %s/%s", lbService.Namespace, lbService.Name) + } + svcPort := int(lbService.Spec.Ports[0].Port) + ingressIP := e2eservice.GetIngressPoint(&lbService.Status.LoadBalancer.Ingress[0]) + framework.Logf("Load balancer's ingress IP: %s", ingressIP) + + e2eservice.TestReachableHTTP(ingressIP, svcPort, e2eservice.LoadBalancerLagTimeoutAWS) - serviceUpdateFunc := func(svc *v1.Service) { - annotations := make(map[string]string) - annotations["aws-load-balancer-backend-protocol"] = "http" - annotations["aws-load-balancer-ssl-ports"] = "https" + // Update the service to cluster IP + By("changing TCP service back to type=ClusterIP") + _, err = lbConfig.LBJig.UpdateService(func(s *v1.Service) { + s.Spec.Type = v1.ServiceTypeClusterIP + }) + framework.ExpectNoError(err) - svc.Annotations = annotations - svc.Spec.Ports = []v1.ServicePort{ + // Wait for the load balancer to be destroyed asynchronously + _, err = lbConfig.LBJig.WaitForLoadBalancerDestroy(ingressIP, svcPort, loadBalancerCreateTimeout) + framework.ExpectNoError(err) + }) + } +}) + +// configServiceLB hold loadbalancer test configurations used by e2e lib (jig). +type configServiceLB struct { + PodPort uint16 + PodProtocol v1.Protocol + DefaultAnnotations map[string]string + + LBJig *e2eservice.TestJig +} + +func newConfigServiceLB() *configServiceLB { + return &configServiceLB{ + PodPort: 8080, + PodProtocol: v1.ProtocolTCP, + DefaultAnnotations: map[string]string{ + "aws-load-balancer-backend-protocol": "http", + "aws-load-balancer-ssl-ports": "https", + }, + } +} + +// buildService creates a service instance with custom annotations. +func (s *configServiceLB) buildService(extraAnnotations map[string]string) *v1.Service { + svc := &v1.Service{ + ObjectMeta: metav1.ObjectMeta{ + Namespace: s.LBJig.Namespace, + Name: s.LBJig.Name, + Labels: s.LBJig.Labels, + Annotations: make(map[string]string, len(s.DefaultAnnotations)+len(extraAnnotations)), + }, + Spec: v1.ServiceSpec{ + Type: v1.ServiceTypeLoadBalancer, + SessionAffinity: v1.ServiceAffinityNone, + Selector: s.LBJig.Labels, + Ports: []v1.ServicePort{ { Name: "http", Protocol: v1.ProtocolTCP, Port: int32(80), - TargetPort: intstr.FromInt(80), + TargetPort: intstr.FromInt(int(s.PodPort)), }, { Name: "https", Protocol: v1.ProtocolTCP, Port: int32(443), - TargetPort: intstr.FromInt(80), + TargetPort: intstr.FromInt(int(s.PodPort)), }, - } + }, + }, + } + + // add default annotations - can be overriden by extra annotations + for aK, aV := range s.DefaultAnnotations { + svc.Annotations[aK] = aV + } + + // append test case annotations to the service + for aK, aV := range extraAnnotations { + svc.Annotations[aK] = aV + } + + // Defensive: ensure Annotations is not nil + if svc.Annotations == nil { + svc.Annotations = map[string]string{} + } + + return svc +} + +// buildReplicationController creates a replication controller wrapper for the test framework. +// buildReplicationController is basaed on newRCTemplate() from the test, which not provide +// customization to bind in non-privileged ports. 
+// TODO(mtulio): v1.33+[2] moved from RC to Deployments on tests, we must do the same to use Run() +// when the test framework is updated. +// [1] https://github.com/kubernetes/kubernetes/blob/89d95c9713a8fd189e8ad555120838b3c4f888d1/test/e2e/framework/service/jig.go#L636 +// [2] https://github.com/kubernetes/kubernetes/issues/119021 +func (s *configServiceLB) buildReplicationController() func(rc *v1.ReplicationController) { + return func(rc *v1.ReplicationController) { + var replicas int32 = 1 + var grace int64 = 3 // so we don't race with kube-proxy when scaling up/down + rc.ObjectMeta = metav1.ObjectMeta{ + Namespace: s.LBJig.Namespace, + Name: s.LBJig.Name, + Labels: s.LBJig.Labels, } + rc.Spec = v1.ReplicationControllerSpec{ + Replicas: &replicas, + Selector: s.LBJig.Labels, + Template: &v1.PodTemplateSpec{ + ObjectMeta: metav1.ObjectMeta{ + Labels: s.LBJig.Labels, + }, + Spec: v1.PodSpec{ + Containers: []v1.Container{ + { + Name: "netexec", + Image: imageutils.GetE2EImage(imageutils.Agnhost), + Args: []string{ + "netexec", + fmt.Sprintf("--http-port=%d", s.PodPort), + fmt.Sprintf("--udp-port=%d", s.PodPort), + }, + ReadinessProbe: &v1.Probe{ + PeriodSeconds: 3, + ProbeHandler: v1.ProbeHandler{ + HTTPGet: &v1.HTTPGetAction{ + Port: intstr.FromInt(int(s.PodPort)), + Path: "/hostName", + }, + }, + }, + }, + }, + TerminationGracePeriodSeconds: &grace, + }, + }, + } + } +} - lbService, err := lbJig.CreateLoadBalancerService(loadBalancerCreateTimeout, serviceUpdateFunc) - framework.ExpectNoError(err) +// getLBTargetCount verifies the number of registered targets for a given LBv2 DNS name matches the expected count. +// The steps includes: +// - Get Load Balancer ARN from DNS name extracted from service Status.LoadBalancer.Ingress[0].Hostname +// - List listeners for the load balancer +// - Get target groups attached to listeners +// - Count registered targets in target groups +// - Verify count matches number of worker nodes +func getLBTargetCount(ctx context.Context, lbDNSName string, expectedTargets int) error { + // Load AWS config + cfg, err := config.LoadDefaultConfig(ctx) + if err != nil { + return fmt.Errorf("unable to load AWS config: %v", err) + } + elbClient := elbv2.NewFromConfig(cfg) - By("creating a pod to be part of the TCP service " + serviceName) - _, err = lbJig.Run(nil) - framework.ExpectNoError(err) + // Get Load Balancer ARN from DNS name + describeLBs, err := elbClient.DescribeLoadBalancers(ctx, &elbv2.DescribeLoadBalancersInput{}) + if err != nil { + return fmt.Errorf("failed to describe load balancers: %v", err) + } + var lbARN string + for _, lb := range describeLBs.LoadBalancers { + if strings.EqualFold(aws.ToString(lb.DNSName), lbDNSName) { + lbARN = aws.ToString(lb.LoadBalancerArn) + break + } + } + if lbARN == "" { + return fmt.Errorf("could not find LB with DNS name: %s", lbDNSName) + } - By("hitting the TCP service's LB External IP") - svcPort := int(lbService.Spec.Ports[0].Port) - ingressIP := e2eservice.GetIngressPoint(&lbService.Status.LoadBalancer.Ingress[0]) - framework.Logf("Load balancer's ingress IP: %s", ingressIP) + // List listeners for the load balancer + listenersOut, err := elbClient.DescribeListeners(ctx, &elbv2.DescribeListenersInput{ + LoadBalancerArn: aws.String(lbARN), + }) + if err != nil { + return fmt.Errorf("failed to describe listeners: %v", err) + } - e2eservice.TestReachableHTTP(ingressIP, svcPort, e2eservice.LoadBalancerLagTimeoutAWS) + // Get target groups attached to listeners + targetGroupARNs := map[string]struct{}{} + for _, 
listener := range listenersOut.Listeners {
+		if len(targetGroupARNs) > 0 {
+			break
+		}
+		for _, action := range listener.DefaultActions {
+			if action.TargetGroupArn != nil {
+				targetGroupARNs[aws.ToString(action.TargetGroupArn)] = struct{}{}
+				break
+			}
+		}
+	}
+
+	if len(targetGroupARNs) == 0 {
+		return fmt.Errorf("no target groups found for LB: %s", lbARN)
+	}
 
-	// Update the service to cluster IP
-	By("changing TCP service back to type=ClusterIP")
-	_, err = lbJig.UpdateService(func(s *v1.Service) {
-		s.Spec.Type = v1.ServiceTypeClusterIP
+	// Count registered targets in target groups
+	totalTargets := 0
+	for tgARN := range targetGroupARNs {
+		tgHealth, err := elbClient.DescribeTargetHealth(ctx, &elbv2.DescribeTargetHealthInput{
+			TargetGroupArn: aws.String(tgARN),
 		})
-		framework.ExpectNoError(err)
+		if err != nil {
+			return fmt.Errorf("failed to describe target health for TG %s: %v", tgARN, err)
+		}
+		totalTargets += len(tgHealth.TargetHealthDescriptions)
+	}
 
-	// Wait for the load balancer to be destroyed asynchronously
-	_, err = lbJig.WaitForLoadBalancerDestroy(ingressIP, svcPort, loadBalancerCreateTimeout)
-	framework.ExpectNoError(err)
-	})
-})
+	// Verify count matches number of worker nodes
+	if totalTargets != expectedTargets {
+		return fmt.Errorf("target count mismatch: expected %d, got %d", expectedTargets, totalTargets)
+	}
+	return nil
+}

From 85eb52a57e3b23c96eed41edb17ddf57cff74c3d Mon Sep 17 00:00:00 2001
From: Marco Braga
Date: Sat, 14 Jun 2025 00:29:31 -0300
Subject: [PATCH 3/9] e2e/loadbalancer: implement hairpin connection cases

Implement the hairpin connection test cases, exposing an issue with
internal-scheme NLBs, which fail when the client tries to access a
LoadBalancer Service hosted on the same node.

The hairpin failure happens because the client IP preservation
attribute is set to true (the default), and the Service API does not
provide an interface to prevent the issue.

The e2e test is expected to pass (the failure is skipped) to prevent
permanent failures in CI; the problem is tracked in
https://github.com/kubernetes/cloud-provider-aws/issues/1160.
---
 tests/e2e/loadbalancer.go | 360 +++++++++++++++++++++++++++++++++-----
 1 file changed, 314 insertions(+), 46 deletions(-)

diff --git a/tests/e2e/loadbalancer.go b/tests/e2e/loadbalancer.go
index bd786b6aae..ec6ba18b59 100644
--- a/tests/e2e/loadbalancer.go
+++ b/tests/e2e/loadbalancer.go
@@ -16,12 +16,15 @@ package e2e
 import (
 	"context"
 	"fmt"
+	"sort"
 	"strings"
+	"time"
 
 	. "github.com/onsi/ginkgo/v2"
 	v1 "k8s.io/api/core/v1"
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 	"k8s.io/apimachinery/pkg/util/intstr"
+	"k8s.io/apimachinery/pkg/util/wait"
 	clientset "k8s.io/client-go/kubernetes"
 	"k8s.io/kubernetes/test/e2e/framework"
 	e2eservice "k8s.io/kubernetes/test/e2e/framework/service"
@@ -39,10 +42,6 @@ const (
 )
 
 var (
-	// clusterNodeSelector is the discovered node(compute/worker) selector used in the cluster.
-	clusterNodesSelector string
-	clusterNodesCount    int = 0
-
 	// lookupNodeSelectors are valid compute/node/worker selectors commonly used in different kubernetes
 	// distributions.
 	lookupNodeSelectors = []string{
@@ -51,6 +50,13 @@ var (
 	}
 )
 
+// ClusterNodeDiscovery holds information about discovered worker nodes. 
+type ClusterNodeDiscovery struct { + Selector string + Count int + SingleWorker string +} + // loadbalancer tests var _ = Describe("[cloud-provider-aws-e2e] loadbalancer", func() { f := framework.NewDefaultFramework("cloud-provider-aws") @@ -71,11 +77,14 @@ var _ = Describe("[cloud-provider-aws-e2e] loadbalancer", func() { }) type loadBalancerTestCases struct { - Name string - ResourceSuffix string - Annotations map[string]string - PostConfigService func(cfg *configServiceLB, svc *v1.Service) - PostRunValidation func(cfg *configServiceLB, svc *v1.Service) + Name string + ResourceSuffix string + Annotations map[string]string + PostConfigService func(cfg *configServiceLB, svc *v1.Service, nodeDiscovery ClusterNodeDiscovery) + PostRunValidation func(cfg *configServiceLB, svc *v1.Service, nodeDiscovery ClusterNodeDiscovery) + HookInClusterTestReachableHTTP bool + RequireAffinity bool + SkipTestFailure bool } cases := []loadBalancerTestCases{ { @@ -93,50 +102,68 @@ var _ = Describe("[cloud-provider-aws-e2e] loadbalancer", func() { { Name: "NLB should configure the loadbalancer with target-node-labels", ResourceSuffix: "sg-nd", - Annotations: map[string]string{ - annotationLBType: "nlb", - }, - PostConfigService: func(cfg *configServiceLB, svc *v1.Service) { - // discover clusterNodeSelector and patch service - // TODO: move to external function if there are more scenarios to discover nodes. - By("discovering node label used in the kubernetes distributions") - for _, selector := range lookupNodeSelectors { - nodeList, err := cs.CoreV1().Nodes().List(context.TODO(), metav1.ListOptions{ - LabelSelector: selector, - }) - framework.ExpectNoError(err, "failed to list worker nodes") - if len(nodeList.Items) > 0 { - clusterNodesCount = len(nodeList.Items) - clusterNodesSelector = selector - break - } - } - - if clusterNodesCount == 0 { - framework.ExpectNoError(fmt.Errorf("unable to find node selector for %v", lookupNodeSelectors)) - } - - By(fmt.Sprintf("found %d nodes with selector %q\n", clusterNodesCount, clusterNodesSelector)) + Annotations: map[string]string{annotationLBType: "nlb"}, + PostConfigService: func(cfg *configServiceLB, svc *v1.Service, nodeDiscovery ClusterNodeDiscovery) { + By(fmt.Sprintf("found %d nodes with selector %q\n", nodeDiscovery.Count, nodeDiscovery.Selector)) if svc.Annotations == nil { svc.Annotations = map[string]string{} } - svc.Annotations[annotationLBTargetNodeLabels] = clusterNodesSelector + svc.Annotations[annotationLBTargetNodeLabels] = nodeDiscovery.Selector By(fmt.Sprintf("using service with annotations: %v", svc.Annotations)) }, - PostRunValidation: func(cfg *configServiceLB, svc *v1.Service) { - // Validate in the TG if the node count matches with expected target-node-labels selector. + PostRunValidation: func(cfg *configServiceLB, svc *v1.Service, nodeDiscovery ClusterNodeDiscovery) { if len(svc.Status.LoadBalancer.Ingress) == 0 { framework.Failf("No ingress found in LoadBalancer status for service %s/%s", svc.Namespace, svc.Name) } lbDNS := svc.Status.LoadBalancer.Ingress[0].Hostname - framework.ExpectNoError(getLBTargetCount(context.TODO(), lbDNS, clusterNodesCount), "AWS LB target count validation failed") + framework.ExpectNoError(getLBTargetCount(context.TODO(), lbDNS, nodeDiscovery.Count), "AWS LB target count validation failed") }, }, + // hairpin connection tests for internal CLB and NLB services LBs. 
+ { + Name: "internal should support hairpin connection", + ResourceSuffix: "hp-clb-int", + Annotations: map[string]string{ + "service.beta.kubernetes.io/aws-load-balancer-internal": "true", + }, + PostConfigService: func(cfg *configServiceLB, svc *v1.Service, nodeDiscovery ClusterNodeDiscovery) { + if svc.Annotations == nil { + svc.Annotations = map[string]string{} + } + svc.Annotations[annotationLBTargetNodeLabels] = fmt.Sprintf("kubernetes.io/hostname=%s", nodeDiscovery.SingleWorker) + framework.Logf("Using service annotations: %v", svc.Annotations) + }, + HookInClusterTestReachableHTTP: true, + RequireAffinity: true, + }, + // FIXME: https://github.com/kubernetes/cloud-provider-aws/issues/1160 + // Hairpin connection work with target type as instance only when preserve client IP is disabled. + // Currently CCM does not provide an interface to create a service with that setup, making an internal + // Service to fail. + { + Name: "NLB internal should support hairpin connection", + ResourceSuffix: "hp-nlb-int", + Annotations: map[string]string{ + annotationLBType: "nlb", + "service.beta.kubernetes.io/aws-load-balancer-internal": "true", + }, + PostConfigService: func(cfg *configServiceLB, svc *v1.Service, nodeDiscovery ClusterNodeDiscovery) { + if svc.Annotations == nil { + svc.Annotations = map[string]string{} + } + svc.Annotations[annotationLBTargetNodeLabels] = fmt.Sprintf("kubernetes.io/hostname=%s", nodeDiscovery.SingleWorker) + framework.Logf("Using service annotations: %v", svc.Annotations) + }, + HookInClusterTestReachableHTTP: true, + RequireAffinity: true, + SkipTestFailure: true, + }, } serviceNameBase := "lbconfig-test" for _, tc := range cases { It(tc.Name, func() { + nodeDiscovery := discoverClusterWorkerNode(cs) loadBalancerCreateTimeout := e2eservice.GetServiceLoadBalancerCreationTimeout(cs) framework.Logf("Running tests against AWS with timeout %s", loadBalancerCreateTimeout) @@ -150,11 +177,13 @@ var _ = Describe("[cloud-provider-aws-e2e] loadbalancer", func() { By("creating a TCP service " + serviceName + " with type=LoadBalancerType in namespace " + ns.Name) lbConfig := newConfigServiceLB() lbConfig.LBJig = e2eservice.NewTestJig(cs, ns.Name, serviceName) + + // Hook annotations to support dynamic config lbServiceConfig := lbConfig.buildService(tc.Annotations) // Hook: PostConfigService patchs service configuration. if tc.PostConfigService != nil { - tc.PostConfigService(lbConfig, lbServiceConfig) + tc.PostConfigService(lbConfig, lbServiceConfig, nodeDiscovery) } // Create Load Balancer @@ -169,13 +198,13 @@ var _ = Describe("[cloud-provider-aws-e2e] loadbalancer", func() { // Run Workloads By("creating a pod to be part of the TCP service " + serviceName) - _, err = lbConfig.LBJig.Run(lbConfig.buildReplicationController()) + _, err = lbConfig.LBJig.Run(lbConfig.buildReplicationController(tc.RequireAffinity, nodeDiscovery)) framework.ExpectNoError(err) // Hook: PostRunValidation performs LB validations after it is created (before test). 
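
(Aside on the FIXME above: the "preserve client IP" behaviour is an NLB target group attribute, preserve_client_ip.enabled, not a Service field, and the cloud provider does not currently expose it through an annotation. Purely to illustrate the knob involved, a minimal sketch against the AWS SDK v2 already used by this module — the target group ARN and the extra import of the elasticloadbalancingv2 types package are assumptions, and this is not part of the patch:)

    // Assumed extra import:
    // elbv2types "github.com/aws/aws-sdk-go-v2/service/elasticloadbalancingv2/types"

    // disablePreserveClientIP turns off client IP preservation on a target group,
    // the setting that the FIXME identifies as breaking hairpin connections when
    // left enabled with instance targets.
    func disablePreserveClientIP(ctx context.Context, client *elbv2.Client, tgARN string) error {
    	_, err := client.ModifyTargetGroupAttributes(ctx, &elbv2.ModifyTargetGroupAttributesInput{
    		TargetGroupArn: aws.String(tgARN),
    		Attributes: []elbv2types.TargetGroupAttribute{
    			{Key: aws.String("preserve_client_ip.enabled"), Value: aws.String("false")},
    		},
    	})
    	return err
    }

(Until the provider grows such an interface, the internal NLB hairpin case above is skipped on failure rather than removed, per SkipTestFailure.)
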
if tc.PostRunValidation != nil { By("running post run validations") - tc.PostRunValidation(lbConfig, lbService) + tc.PostRunValidation(lbConfig, lbService, nodeDiscovery) } // Test the Service Endpoint @@ -190,7 +219,16 @@ var _ = Describe("[cloud-provider-aws-e2e] loadbalancer", func() { ingressIP := e2eservice.GetIngressPoint(&lbService.Status.LoadBalancer.Ingress[0]) framework.Logf("Load balancer's ingress IP: %s", ingressIP) - e2eservice.TestReachableHTTP(ingressIP, svcPort, e2eservice.LoadBalancerLagTimeoutAWS) + // Hook: HookInClusterTestReachableHTTP changes the default test function to run the client in the cluster. + if tc.HookInClusterTestReachableHTTP { + err := inClusterTestReachableHTTP(cs, ns.Name, nodeDiscovery.SingleWorker, ingressIP, svcPort) + if err != nil && tc.SkipTestFailure { + Skip(err.Error()) + } + framework.ExpectNoError(err) + } else { + e2eservice.TestReachableHTTP(ingressIP, svcPort, e2eservice.LoadBalancerLagTimeoutAWS) + } // Update the service to cluster IP By("changing TCP service back to type=ClusterIP") @@ -275,16 +313,17 @@ func (s *configServiceLB) buildService(extraAnnotations map[string]string) *v1.S } // buildReplicationController creates a replication controller wrapper for the test framework. -// buildReplicationController is basaed on newRCTemplate() from the test, which not provide +// buildReplicationController is based on newRCTemplate() from the e2e test framework, which not provide // customization to bind in non-privileged ports. -// TODO(mtulio): v1.33+[2] moved from RC to Deployments on tests, we must do the same to use Run() +// TODO(mtulio): v1.33+[2][3] moved from RC to Deployments on tests, we must do the same to use Run() // when the test framework is updated. // [1] https://github.com/kubernetes/kubernetes/blob/89d95c9713a8fd189e8ad555120838b3c4f888d1/test/e2e/framework/service/jig.go#L636 // [2] https://github.com/kubernetes/kubernetes/issues/119021 -func (s *configServiceLB) buildReplicationController() func(rc *v1.ReplicationController) { +// [3] https://github.com/kubernetes/cloud-provider-aws/blob/master/tests/e2e/go.mod#L14 +func (s *configServiceLB) buildReplicationController(affinity bool, nodeDiscovery ClusterNodeDiscovery) func(rc *v1.ReplicationController) { return func(rc *v1.ReplicationController) { var replicas int32 = 1 - var grace int64 = 3 // so we don't race with kube-proxy when scaling up/down + var grace int64 = 3 rc.ObjectMeta = metav1.ObjectMeta{ Namespace: s.LBJig.Namespace, Name: s.LBJig.Name, @@ -322,6 +361,25 @@ func (s *configServiceLB) buildReplicationController() func(rc *v1.ReplicationCo }, }, } + if affinity { + rc.Spec.Template.Spec.Affinity = &v1.Affinity{ + NodeAffinity: &v1.NodeAffinity{ + RequiredDuringSchedulingIgnoredDuringExecution: &v1.NodeSelector{ + NodeSelectorTerms: []v1.NodeSelectorTerm{ + { + MatchExpressions: []v1.NodeSelectorRequirement{ + { + Key: "kubernetes.io/hostname", + Operator: v1.NodeSelectorOpIn, + Values: []string{nodeDiscovery.SingleWorker}, + }, + }, + }, + }, + }, + }, + } + } } } @@ -377,7 +435,6 @@ func getLBTargetCount(ctx context.Context, lbDNSName string, expectedTargets int } } } - if len(targetGroupARNs) == 0 { return fmt.Errorf("no target groups found for LB: %s", lbARN) } @@ -400,3 +457,214 @@ func getLBTargetCount(ctx context.Context, lbDNSName string, expectedTargets int } return nil } + +// discoverClusterWorkerNode identifies and selects worker nodes in the cluster based on predefined node label selectors. 
+// It returns a ClusterNodeDiscovery struct with the discovered information. +func discoverClusterWorkerNode(cs clientset.Interface) ClusterNodeDiscovery { + var workerNodeList []string + framework.Logf("discovering node label used in the kubernetes distributions") + for _, selector := range lookupNodeSelectors { + nodeList, err := cs.CoreV1().Nodes().List(context.TODO(), metav1.ListOptions{ + LabelSelector: selector, + }) + framework.ExpectNoError(err, "failed to list worker nodes") + if len(nodeList.Items) > 0 { + for _, node := range nodeList.Items { + workerNodeList = append(workerNodeList, node.Name) + } + // Save the first worker node in the list to be used in cases. + sort.Strings(workerNodeList) + return ClusterNodeDiscovery{ + Selector: selector, + Count: len(nodeList.Items), + SingleWorker: workerNodeList[0], + } + } + } + framework.ExpectNoError(fmt.Errorf("unable to find node selector for %v", lookupNodeSelectors)) + return ClusterNodeDiscovery{} +} + +// inClusterTestReachableHTTP creates a pod within the cluster to test HTTP connectivity to a target IP and port. +// It schedules the pod on the specified node using node affinity to test the hairpin scenario. +// The pod uses a curl-based container to perform the HTTP request and validates the response. +// The function waits for the pod to complete its execution and inspects its exit code to determine success or failure. +// +// Parameters: +// - cs: Kubernetes clientset interface used to interact with the cluster. +// - namespace: The namespace in which the test pod will be created. +// - nodeName: The name of the node where the test pod should be scheduled. +// - target: The IP address or Hostname of the target HTTP server. +// - targetPort: The port number of the target HTTP server. +// +// Returns: +// - error: Returns an error if the pod creation, execution, or cleanup fails, or if the HTTP test fails unexpectedly. +// +// Behavior: +// - The function creates a pod with a curl-based container to perform the HTTP request. +// - It configures the pod to run as a non-root user with security settings. +// - The pod is scheduled on the specified node using node affinity. +// - Logs are periodically collected during the pod's execution for troubleshooting. +// - Events are inspected if the pod remains in a pending state for too long. +// - The function waits for the pod to complete and inspects its exit code to determine success or failure. +// - If the pod fails, an error is returned. +// - The pod is cleaned up after the test completes. +func inClusterTestReachableHTTP(cs clientset.Interface, namespace, nodeName, target string, targetPort int) error { + podName := "http-test-pod" + + // client http test (curl) pod spec. + pod := &v1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + Name: podName, + Namespace: namespace, + }, + Spec: v1.PodSpec{ + Containers: []v1.Container{ + { + Name: "curl", + Image: imageutils.GetE2EImage(imageutils.Agnhost), + Command: []string{"curl"}, + Args: []string{ + "--retry", "15", // Retry up to 15 times in case of transient network issues. + "--retry-delay", "20", // Wait 20 seconds between retries. + "--retry-max-time", "480", // Maximum time for retries is 480 seconds. + "--retry-all-errors", // Retry on all errors, ensuring robustness against temporary failures. + "--trace-time", // Include timestamps in trace output for debugging. + "-w", "\\\"\\n---> HTTPCode=%{http_code} Time=%{time_total}ms <---\\n\\\"", // Format output to include HTTP code and response time. 
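+						// Taken together, the settings above give curl a retry budget of roughly
+						// 8 minutes (15 retries, 20 seconds apart, capped by --retry-max-time 480),
+						// which stays comfortably inside the 15-minute pod wait used further below.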
+ fmt.Sprintf("http://%s:%d/echo?msg=hello", target, targetPort), + }, + }, + }, + SecurityContext: &v1.PodSecurityContext{ + RunAsNonRoot: aws.Bool(true), // Ensures the pod runs as a non-root user for enhanced security. + RunAsUser: aws.Int64(1000), // Specifies the user ID for the container process. + RunAsGroup: aws.Int64(1000), // Specifies the group ID for the container process. + SeccompProfile: &v1.SeccompProfile{ + Type: v1.SeccompProfileTypeRuntimeDefault, // Enforces runtime default seccomp profile for syscall filtering. + }, + }, + RestartPolicy: v1.RestartPolicyNever, // Prevents the pod from restarting automatically. + Affinity: &v1.Affinity{ + NodeAffinity: &v1.NodeAffinity{ + RequiredDuringSchedulingIgnoredDuringExecution: &v1.NodeSelector{ + NodeSelectorTerms: []v1.NodeSelectorTerm{ + { + MatchExpressions: []v1.NodeSelectorRequirement{ + { + Key: "kubernetes.io/hostname", + Operator: v1.NodeSelectorOpIn, + Values: []string{nodeName}, // Ensures the pod is scheduled on the specified node. + }, + }, + }, + }, + }, + }, + }, + }, + } + ct := pod.Spec.Containers[0] + framework.Logf("In-Cluster test PodSpec Image=%v Command=%v Args=%v", ct.Image, ct.Command, ct.Args) + + // Create the pod + _, err := cs.CoreV1().Pods(namespace).Create(context.TODO(), pod, metav1.CreateOptions{}) + if err != nil { + return fmt.Errorf("failed to create HTTP test pod: %v", err) + } + // Clean up the pod + defer func() { + err = cs.CoreV1().Pods(namespace).Delete(context.TODO(), podName, metav1.DeleteOptions{}) + if err != nil { + framework.Logf("Failed to delete pod %s: %v", podName, err) + } + }() + + // Pod logs wrapper. Collect recent logs, or all, from a test pod. + gatherLogs := func(tail int) string { + opts := &v1.PodLogOptions{} + if tail == 0 { + tail = 20 + } + opts.TailLines = aws.Int64(int64(tail)) + logs, errL := cs.CoreV1().Pods(namespace).GetLogs(podName, opts).DoRaw(context.TODO()) + if errL != nil { + framework.Logf("Failed to retrieve pod logs: %v", errL) + return "" + } + return string(logs) + } + + // Wait for the test pod to complete. Limit waiter be higher than curl retries. + waitCount := 0 + pendingCount := 0 + err = wait.PollImmediate(15*time.Second, 15*time.Minute, func() (bool, error) { + p, err := cs.CoreV1().Pods(namespace).Get(context.TODO(), podName, metav1.GetOptions{}) + if err != nil { + framework.Logf("Error getting pod %s: %v", podName, err) + return false, err + } + framework.Logf("Pod %s status: Phase=%s", podName, p.Status.Phase) + podFinished := p.Status.Phase == v1.PodSucceeded || p.Status.Phase == v1.PodFailed + + // Troubleshoot pending pods + if p.Status.Phase == v1.PodPending { + pendingCount++ + } + if pendingCount%10 == 0 && pendingCount > 0 { + framework.Logf("Pod %s is pending for too long, checking events...", podName) + events, errE := cs.CoreV1().Events(namespace).List(context.TODO(), metav1.ListOptions{ + FieldSelector: fmt.Sprintf("involvedObject.name=%s", podName), + }) + if errE != nil { + framework.Logf("Failed to list events for pod %s: %v", podName, errE) + } else { + for _, event := range events.Items { + framework.Logf("Event: %s - %s", event.Reason, event.Message) + } + } + } + // frequently collect logs. 
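+		// With the 15-second poll interval above, that is one log tail roughly every minute
+		// (every 4th iteration) and one event dump roughly every 2.5 minutes of Pending state
+		// (every 10th pending check).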
+ if waitCount > 0 && waitCount%4 == 0 { + framework.Logf("Tail logs for HTTP test pod:\n%s", gatherLogs(5)) + } + if podFinished { + framework.Logf("Tail logs for HTTP test pod:\n%s", gatherLogs(0)) + } + waitCount++ + return podFinished, nil + }) + // Check overall error + if err != nil { + return fmt.Errorf("error waiting for pod %s to complete: %v", podName, err) + } + + // Inspect the pod's container status for exit code + pod, errS := cs.CoreV1().Pods(namespace).Get(context.TODO(), podName, metav1.GetOptions{}) + if errS != nil { + return fmt.Errorf("failed to get pod %s: %v", podName, errS) + } + if len(pod.Status.ContainerStatuses) == 0 { + return fmt.Errorf("no container statuses found for pod %s", podName) + } + containerStatus := pod.Status.ContainerStatuses[0] + + if containerStatus.State.Terminated != nil { + exitCode := containerStatus.State.Terminated.ExitCode + if exitCode != 0 { + errmsg := fmt.Errorf("pod %s exited with code %d", podName, exitCode) + framework.Logf("WARNING: %s.", errmsg.Error()) + return errmsg + } + } + + // Validate HTTP response format + // Expected format: HTTPCode=200 Time=