From 9629164862fc682181e579431d28a84186d6ceaa Mon Sep 17 00:00:00 2001 From: Jack Date: Thu, 24 Feb 2022 17:22:31 -0800 Subject: [PATCH] add Eventually() to retryable k8s E2E operations --- test/e2e/aks.go | 4 +- test/e2e/azure_failuredomains.go | 16 ++- test/e2e/azure_gpu.go | 13 +- test/e2e/azure_lb.go | 122 ++++++++++++++---- test/e2e/azure_machinepool_drain.go | 76 ++++++----- test/e2e/azure_privatecluster.go | 10 +- test/e2e/helpers.go | 5 +- test/e2e/kubernetes/deployment/deployment.go | 40 ++++-- test/e2e/kubernetes/job/job.go | 2 +- test/e2e/kubernetes/namespace/namespace.go | 24 +++- .../kubernetes/networkpolicy/networkpolicy.go | 42 ++++-- test/e2e/kubernetes/node/node.go | 58 ++++++--- test/e2e/kubernetes/pod/pod.go | 63 ++++----- test/e2e/log.go | 10 ++ 14 files changed, 320 insertions(+), 165 deletions(-) diff --git a/test/e2e/aks.go b/test/e2e/aks.go index 54f72e269be..2f908f3c9fd 100644 --- a/test/e2e/aks.go +++ b/test/e2e/aks.go @@ -188,7 +188,7 @@ func WaitForControlPlaneMachinesToExist(ctx context.Context, input WaitForContro ammpList := &infrav1exp.AzureManagedMachinePoolList{} if err := input.Lister.List(ctx, ammpList, opt1, opt2); err != nil { - Logf("Failed to get machinePool: %+v", err) + LogWarningf("Failed to get machinePool: %+v", err) return false } @@ -202,7 +202,7 @@ func WaitForControlPlaneMachinesToExist(ctx context.Context, input WaitForContro ownerMachinePool := &clusterv1exp.MachinePool{} if err := input.Getter.Get(ctx, types.NamespacedName{Namespace: input.Namespace, Name: ref.Name}, ownerMachinePool); err != nil { - Logf("Failed to get machinePool: %+v", err) + LogWarningf("Failed to get machinePool: %+v", err) return false } if len(ownerMachinePool.Status.NodeRefs) >= minReplicas.value(ownerMachinePool) { diff --git a/test/e2e/azure_failuredomains.go b/test/e2e/azure_failuredomains.go index 51850478e1b..88f6067972e 100644 --- a/test/e2e/azure_failuredomains.go +++ b/test/e2e/azure_failuredomains.go @@ -62,11 +62,17 @@ func AzureFailureDomainsSpec(ctx context.Context, inputGetter func() AzureFailur By("Ensuring zones match CAPI failure domains") // fetch updated cluster object to ensure Status.FailureDomains is up-to-date - err := input.BootstrapClusterProxy.GetClient().Get(ctx, - apimachinerytypes.NamespacedName{ - Namespace: input.Namespace.Name, - Name: input.ClusterName, - }, input.Cluster) + Eventually(func() error { + err := input.BootstrapClusterProxy.GetClient().Get(ctx, + apimachinerytypes.NamespacedName{ + Namespace: input.Namespace.Name, + Name: input.ClusterName, + }, input.Cluster) + if err != nil { + LogWarning(err.Error()) + } + return err + }, retryableOperationTimeout, retryableOperationSleepBetweenRetries).Should(Succeed()) Expect(err).NotTo(HaveOccurred()) Expect(len(input.Cluster.Status.FailureDomains)).To(Equal(len(zones))) for _, z := range zones { diff --git a/test/e2e/azure_gpu.go b/test/e2e/azure_gpu.go index 28daa2e60e2..c8e031c3a0f 100644 --- a/test/e2e/azure_gpu.go +++ b/test/e2e/azure_gpu.go @@ -121,10 +121,15 @@ func AzureGPUSpec(ctx context.Context, inputGetter func() AzureGPUSpecInput) { // getGPUOperatorPodLogs returns the logs of the Nvidia GPU operator pods. func getGPUOperatorPodLogs(ctx context.Context, clientset *kubernetes.Clientset) string { podsClient := clientset.CoreV1().Pods(corev1.NamespaceAll) - pods, err := podsClient.List(ctx, metav1.ListOptions{LabelSelector: "app.kubernetes.io/instance=gpu-operator"}) - if err != nil { - return err.Error() - } + var pods *corev1.PodList + var err error + Eventually(func() error { + pods, err = podsClient.List(ctx, metav1.ListOptions{LabelSelector: "app.kubernetes.io/instance=gpu-operator"}) + if err != nil { + LogWarning(err.Error()) + } + return err + }, retryableOperationTimeout, retryableOperationSleepBetweenRetries).Should(Succeed()) b := strings.Builder{} for _, pod := range pods.Items { b.WriteString(fmt.Sprintf("\nLogs for pod %s:\n", pod.Name)) diff --git a/test/e2e/azure_lb.go b/test/e2e/azure_lb.go index 79e5953a4ae..853034e7b13 100644 --- a/test/e2e/azure_lb.go +++ b/test/e2e/azure_lb.go @@ -123,10 +123,16 @@ func AzureLBSpec(ctx context.Context, inputGetter func() AzureLBSpecInput) { if !input.IPv6 { By("creating an internal Load Balancer service") - ilbService := webDeployment.GetService(ports, deploymentBuilder.InternalLoadbalancer) + ilbService := webDeployment.CreateServiceResourceSpec(ports, deploymentBuilder.InternalLoadbalancer) Log("starting to create an internal Load Balancer service") - _, err = servicesClient.Create(ctx, ilbService, metav1.CreateOptions{}) - Expect(err).NotTo(HaveOccurred()) + Eventually(func() error { + _, err := servicesClient.Create(ctx, ilbService, metav1.CreateOptions{}) + if err != nil { + LogWarningf("failed creating service (%s):%s\n", ilbService.Name, err.Error()) + return err + } + return nil + }, retryableOperationTimeout, retryableOperationSleepBetweenRetries).Should(Succeed()) ilbSvcInput := WaitForServiceAvailableInput{ Getter: servicesClientAdapter{client: servicesClient}, Service: ilbService, @@ -136,14 +142,28 @@ func AzureLBSpec(ctx context.Context, inputGetter func() AzureLBSpecInput) { By("connecting to the internal LB service from a curl pod") - svc, err := servicesClient.Get(ctx, ilbService.Name, metav1.GetOptions{}) - Expect(err).NotTo(HaveOccurred()) + var svc *corev1.Service + Eventually(func() error { + var err error + svc, err = servicesClient.Get(ctx, ilbService.Name, metav1.GetOptions{}) + if err != nil { + LogWarningf("failed getting service (%s):%s\n", ilbService.Name, err.Error()) + return err + } + return nil + }, retryableOperationTimeout, retryableOperationSleepBetweenRetries).Should(Succeed()) ilbIP := extractServiceIp(svc) - ilbJob := job.CreateCurlJob("curl-to-ilb-job", ilbIP) + ilbJob := job.CreateCurlJobResourceSpec("curl-to-ilb-job", ilbIP) Log("starting to create a curl to ilb job") - _, err = jobsClient.Create(ctx, ilbJob, metav1.CreateOptions{}) - Expect(err).NotTo(HaveOccurred()) + Eventually(func() error { + _, err := jobsClient.Create(ctx, ilbJob, metav1.CreateOptions{}) + if err != nil { + LogWarningf("failed creating job (%s):%s\n", ilbJob.Name, err.Error()) + return err + } + return nil + }, retryableOperationTimeout, retryableOperationSleepBetweenRetries).Should(Succeed()) ilbJobInput := WaitForJobCompleteInput{ Getter: jobsClientAdapter{client: jobsClient}, Job: ilbJob, @@ -154,19 +174,37 @@ func AzureLBSpec(ctx context.Context, inputGetter func() AzureLBSpecInput) { if !input.SkipCleanup { By("deleting the ilb test resources") Logf("deleting the ilb service: %s", ilbService.Name) - err = servicesClient.Delete(ctx, ilbService.Name, metav1.DeleteOptions{}) - Expect(err).NotTo(HaveOccurred()) + Eventually(func() error { + err := servicesClient.Delete(ctx, ilbService.Name, metav1.DeleteOptions{}) + if err != nil { + LogWarningf("failed deleting service (%s):%s\n", ilbService.Name, err.Error()) + return err + } + return nil + }, deleteOperationTimeout, retryableOperationSleepBetweenRetries).Should(Succeed()) Logf("deleting the ilb job: %s", ilbJob.Name) - err = jobsClient.Delete(ctx, ilbJob.Name, metav1.DeleteOptions{}) - Expect(err).NotTo(HaveOccurred()) + Eventually(func() error { + err := jobsClient.Delete(ctx, ilbJob.Name, metav1.DeleteOptions{}) + if err != nil { + LogWarningf("failed deleting job (%s):%s\n", ilbJob.Name, err.Error()) + return err + } + return nil + }, deleteOperationTimeout, retryableOperationSleepBetweenRetries).Should(Succeed()) } } By("creating an external Load Balancer service") - elbService := webDeployment.GetService(ports, deploymentBuilder.ExternalLoadbalancer) + elbService := webDeployment.CreateServiceResourceSpec(ports, deploymentBuilder.ExternalLoadbalancer) Log("starting to create an external Load Balancer service") - _, err = servicesClient.Create(ctx, elbService, metav1.CreateOptions{}) - Expect(err).NotTo(HaveOccurred()) + Eventually(func() error { + _, err := servicesClient.Create(ctx, elbService, metav1.CreateOptions{}) + if err != nil { + LogWarningf("failed creating service (%s):%s\n", elbService.Name, err.Error()) + return err + } + return nil + }, retryableOperationTimeout, retryableOperationSleepBetweenRetries).Should(Succeed()) elbSvcInput := WaitForServiceAvailableInput{ Getter: servicesClientAdapter{client: servicesClient}, Service: elbService, @@ -175,14 +213,28 @@ func AzureLBSpec(ctx context.Context, inputGetter func() AzureLBSpecInput) { WaitForServiceAvailable(ctx, elbSvcInput, e2eConfig.GetIntervals(specName, "wait-service")...) By("connecting to the external LB service from a curl pod") - svc, err := servicesClient.Get(ctx, elbService.Name, metav1.GetOptions{}) - Expect(err).NotTo(HaveOccurred()) + var svc *corev1.Service + Eventually(func() error { + var err error + svc, err = servicesClient.Get(ctx, elbService.Name, metav1.GetOptions{}) + if err != nil { + LogWarningf("failed getting service (%s):%s\n", elbService.Name, err.Error()) + return err + } + return nil + }, retryableOperationTimeout, retryableOperationSleepBetweenRetries).Should(Succeed()) elbIP := extractServiceIp(svc) Log("starting to create curl-to-elb job") - elbJob := job.CreateCurlJob("curl-to-elb-job"+util.RandomString(6), elbIP) - _, err = jobsClient.Create(ctx, elbJob, metav1.CreateOptions{}) - Expect(err).NotTo(HaveOccurred()) + elbJob := job.CreateCurlJobResourceSpec("curl-to-elb-job"+util.RandomString(6), elbIP) + Eventually(func() error { + _, err := jobsClient.Create(ctx, elbJob, metav1.CreateOptions{}) + if err != nil { + LogWarningf("failed creating job (%s):%s\n", elbJob.Name, err.Error()) + return err + } + return nil + }, retryableOperationTimeout, retryableOperationSleepBetweenRetries).Should(Succeed()) elbJobInput := WaitForJobCompleteInput{ Getter: jobsClientAdapter{client: jobsClient}, Job: elbJob, @@ -208,14 +260,32 @@ func AzureLBSpec(ctx context.Context, inputGetter func() AzureLBSpecInput) { } By("deleting the test resources") Logf("starting to delete external LB service %s", elbService.Name) - err = servicesClient.Delete(ctx, elbService.Name, metav1.DeleteOptions{}) - Expect(err).NotTo(HaveOccurred()) + Eventually(func() error { + err := servicesClient.Delete(ctx, elbService.Name, metav1.DeleteOptions{}) + if err != nil { + LogWarningf("failed deleting service (%s):%s\n", elbService.Name, err.Error()) + return err + } + return nil + }, deleteOperationTimeout, retryableOperationSleepBetweenRetries).Should(Succeed()) Logf("starting to delete deployment %s", deployment.Name) - err = webDeployment.Client(clientset).Delete(ctx, deployment.Name, metav1.DeleteOptions{}) - Expect(err).NotTo(HaveOccurred()) + Eventually(func() error { + err := webDeployment.Client(clientset).Delete(ctx, deployment.Name, metav1.DeleteOptions{}) + if err != nil { + LogWarningf("failed deleting deployment (%s):%s\n", deployment.Name, err.Error()) + return err + } + return nil + }, deleteOperationTimeout, retryableOperationSleepBetweenRetries).Should(Succeed()) Logf("starting to delete job %s", elbJob.Name) - err = jobsClient.Delete(ctx, elbJob.Name, metav1.DeleteOptions{}) - Expect(err).NotTo(HaveOccurred()) + Eventually(func() error { + err := jobsClient.Delete(ctx, elbJob.Name, metav1.DeleteOptions{}) + if err != nil { + LogWarningf("failed deleting job (%s):%s\n", elbJob.Name, err.Error()) + return err + } + return nil + }, deleteOperationTimeout, retryableOperationSleepBetweenRetries).Should(Succeed()) } func extractServiceIp(svc *corev1.Service) string { diff --git a/test/e2e/azure_machinepool_drain.go b/test/e2e/azure_machinepool_drain.go index 6ae78e1ddb1..bdca6b50444 100644 --- a/test/e2e/azure_machinepool_drain.go +++ b/test/e2e/azure_machinepool_drain.go @@ -47,7 +47,11 @@ import ( ) const ( - AzureMachinePoolDrainSpecName = "azure-mp-drain" + AzureMachinePoolDrainSpecName = "azure-mp-drain" + waitForDrainOperationTimeout = 5 * time.Minute + waitForDrainSleepBetweenRetries = 500 * time.Millisecond + waitforResourceOperationTimeout = 30 * time.Second + waitforResourceOperationSleepBetweenRetries = 3 * time.Second ) // AzureMachinePoolDrainSpecInput is the input for AzureMachinePoolDrainSpec. @@ -154,19 +158,21 @@ func testMachinePoolCordonAndDrain(ctx context.Context, mgmtClusterProxy, worklo Eventually(func() error { helper, err := patch.NewHelper(owningMachinePool, mgmtClusterProxy.GetClient()) if err != nil { + LogWarning(err.Error()) return err } decreasedReplicas := *owningMachinePool.Spec.Replicas - int32(1) owningMachinePool.Spec.Replicas = &decreasedReplicas return helper.Patch(ctx, owningMachinePool) - }) + }, 3*time.Minute, 3*time.Second).Should(Succeed()) By(fmt.Sprintf("checking for a machine to start draining for machine pool: %s/%s", amp.Namespace, amp.Name)) Eventually(func() error { ampmls, err := getAzureMachinePoolMachines(ctx, mgmtClusterProxy, workloadClusterProxy, amp) if err != nil { - return errors.Wrap(err, "failed to list the azure machine pool machines") + LogWarning(errors.Wrap(err, "failed to list the azure machine pool machines").Error()) + return err } for _, machine := range ampmls { @@ -176,35 +182,33 @@ func testMachinePoolCordonAndDrain(ctx context.Context, mgmtClusterProxy, worklo } return errors.New("no machine has started to drain") - }) - - By(fmt.Sprintf("checking for a machine to successfully complete draining for machine pool: %s/%s", amp.Namespace, amp.Name)) - Eventually(func() error { - ampmls, err := getAzureMachinePoolMachines(ctx, mgmtClusterProxy, workloadClusterProxy, amp) - if err != nil { - return errors.Wrap(err, "failed to list the azure machine pool machines") - } - - for _, machine := range ampmls { - if conditions.Has(&machine, clusterv1.DrainingSucceededCondition) && conditions.IsTrue(&machine, clusterv1.DrainingSucceededCondition) { - return nil // started draining the node prior to delete - } - } + }, waitForDrainOperationTimeout, waitForDrainSleepBetweenRetries).Should(Succeed()) - return errors.New("no machine has finished draining") - }) + // TODO setup a watcher to detect the terminal drain success state } func labelNodesWithMachinePoolName(ctx context.Context, workloadClient client.Client, mpName string, ampms []infrav1exp.AzureMachinePoolMachine) { for _, ampm := range ampms { n := &corev1.Node{} - Expect(workloadClient.Get(ctx, client.ObjectKey{ - Name: ampm.Status.NodeRef.Name, - Namespace: ampm.Status.NodeRef.Namespace, - }, n)).ToNot(HaveOccurred()) + Eventually(func() error { + err := workloadClient.Get(ctx, client.ObjectKey{ + Name: ampm.Status.NodeRef.Name, + Namespace: ampm.Status.NodeRef.Namespace, + }, n) + if err != nil { + LogWarning(err.Error()) + } + return err + }, waitforResourceOperationTimeout, 3*time.Second).Should(Succeed()) n.Labels[clusterv1.OwnerKindAnnotation] = "MachinePool" n.Labels[clusterv1.OwnerNameAnnotation] = mpName - Expect(workloadClient.Update(ctx, n)).ToNot(HaveOccurred()) + Eventually(func() error { + err := workloadClient.Update(ctx, n) + if err != nil { + LogWarning(err.Error()) + } + return err + }, waitforResourceOperationTimeout, 3*time.Second).Should(Succeed()) } } @@ -231,10 +235,16 @@ func getOwnerMachinePool(ctx context.Context, c client.Client, obj metav1.Object if ref.Kind == "MachinePool" && gv.Group == clusterv1exp.GroupVersion.Group { mp := &clusterv1exp.MachinePool{} - err := c.Get(ctx, client.ObjectKey{ - Name: ref.Name, - Namespace: obj.Namespace, - }, mp) + Eventually(func() error { + err := c.Get(ctx, client.ObjectKey{ + Name: ref.Name, + Namespace: obj.Namespace, + }, mp) + if err != nil { + LogWarning(err.Error()) + } + return err + }, waitforResourceOperationTimeout, 3*time.Second).Should(Succeed()) return mp, err } } @@ -271,18 +281,14 @@ func deployHttpService(ctx context.Context, clientset *kubernetes.Clientset, isW webDeploymentBuilder.AddContainerPort("http", "http", 80, corev1.ProtocolTCP) if isWindows { - var windowsVersion windows.OSVersion - Eventually(func() error { - version, err := node.GetWindowsVersion(ctx, clientset) - windowsVersion = version - return err - }, 300*time.Second, 5*time.Second).Should(Succeed()) + windowsVersion, err := node.GetWindowsVersion(ctx, clientset) + Expect(err).NotTo(HaveOccurred()) iisImage := windows.GetWindowsImage(windows.Httpd, windowsVersion) webDeploymentBuilder.SetImage(deploymentName, iisImage) webDeploymentBuilder.AddWindowsSelectors() } - elbService := webDeploymentBuilder.GetService(ports, deployments.ExternalLoadbalancer) + elbService := webDeploymentBuilder.CreateServiceResourceSpec(ports, deployments.ExternalLoadbalancer) for _, opt := range opts { opt(webDeploymentBuilder, elbService) diff --git a/test/e2e/azure_privatecluster.go b/test/e2e/azure_privatecluster.go index 8b90549ae0f..416b39f64e1 100644 --- a/test/e2e/azure_privatecluster.go +++ b/test/e2e/azure_privatecluster.go @@ -25,7 +25,6 @@ import ( "os" "path/filepath" "strings" - "time" "github.com/Azure/azure-sdk-for-go/services/compute/mgmt/2021-04-01/compute" "github.com/Azure/azure-sdk-for-go/services/network/mgmt/2021-02-01/network" @@ -175,10 +174,11 @@ func AzurePrivateClusterSpec(ctx context.Context, inputGetter func() AzurePrivat if err != nil { // some unexpected error occurred; return it + LogWarning(err.Error()) return err } - return fmt.Errorf("cluster %q as not yet been deleted", cluster.Name) + return fmt.Errorf("cluster %q has not yet been deleted", cluster.Name) }, input.E2EConfig.GetIntervals(specName, "wait-delete-cluster")...).Should(BeNil()) Logf("deleted private cluster %q in namespace %q", cluster.Name, cluster.Namespace) } @@ -406,7 +406,7 @@ func SetupExistingVNet(ctx context.Context, vnetCidr string, cpSubnetCidrs, node for _, genericResource := range page.Values() { apiversion, err := getAPIVersion(*genericResource.ID) if err != nil { - Logf("failed to get API version for %q with %+v", *genericResource.ID, err) + LogWarningf("failed to get API version for %q with %+v", *genericResource.ID, err) } _, err = resClient.GetByID(ctx, *genericResource.ID, apiversion) @@ -417,7 +417,7 @@ func SetupExistingVNet(ctx context.Context, vnetCidr string, cpSubnetCidrs, node // unexpected error calling GET on the resource if err != nil { - Logf("failed GETing resource %q with %+v", *genericResource.ID, err) + LogWarningf("failed GETing resource %q with %+v", *genericResource.ID, err) return nil, err } @@ -426,7 +426,7 @@ func SetupExistingVNet(ctx context.Context, vnetCidr string, cpSubnetCidrs, node } return foundResources, nil // add some tolerance for Azure caching of resource group resource caching - }, 5*time.Minute, 10*time.Second).Should(HaveLen(0), "Expect the manually created resource group is empty after removing the manually created resources.") + }, deleteOperationTimeout, retryableOperationTimeout).Should(HaveLen(0), "Expect the manually created resource group is empty after removing the manually created resources.") Logf("deleting the existing resource group %q", groupName) grpFuture, err := groupClient.Delete(ctx, groupName) diff --git a/test/e2e/helpers.go b/test/e2e/helpers.go index 3c722e29971..e85f613d419 100644 --- a/test/e2e/helpers.go +++ b/test/e2e/helpers.go @@ -61,7 +61,10 @@ import ( ) const ( - sshPort = "22" + sshPort = "22" + deleteOperationTimeout = 20 * time.Minute + retryableOperationTimeout = 30 * time.Second + retryableOperationSleepBetweenRetries = 3 * time.Second ) // deploymentsClientAdapter adapts a Deployment to work with WaitForDeploymentsAvailable. diff --git a/test/e2e/kubernetes/deployment/deployment.go b/test/e2e/kubernetes/deployment/deployment.go index 202cac89cd3..9aaa98e322d 100644 --- a/test/e2e/kubernetes/deployment/deployment.go +++ b/test/e2e/kubernetes/deployment/deployment.go @@ -22,10 +22,12 @@ import ( "context" "fmt" "log" + "time" typedappsv1 "k8s.io/client-go/kubernetes/typed/apps/v1" clusterv1 "sigs.k8s.io/cluster-api/api/v1beta1" + . "github.com/onsi/gomega" appsv1 "k8s.io/api/apps/v1" corev1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/api/resource" @@ -36,8 +38,10 @@ import ( ) const ( - ExternalLoadbalancer = LoadbalancerType("external") - InternalLoadbalancer = LoadbalancerType("internal") + ExternalLoadbalancer = LoadbalancerType("external") + InternalLoadbalancer = LoadbalancerType("internal") + deploymentOperationTimeout = 30 * time.Second + deploymentOperationSleepBetweenRetries = 3 * time.Second ) type ( @@ -128,11 +132,16 @@ func (d *Builder) AddContainerPort(name, portName string, portNumber int32, prot } func (d *Builder) Deploy(ctx context.Context, clientset *kubernetes.Clientset) (*appsv1.Deployment, error) { - deployment, err := d.Client(clientset).Create(ctx, d.deployment, metav1.CreateOptions{}) - if err != nil { - log.Printf("Error trying to deploy %s in namespace %s:%s\n", d.deployment.Name, d.deployment.ObjectMeta.Namespace, err.Error()) - return nil, err - } + var deployment *appsv1.Deployment + Eventually(func() error { + var err error + deployment, err = d.Client(clientset).Create(ctx, d.deployment, metav1.CreateOptions{}) + if err != nil { + log.Printf("Error trying to deploy %s in namespace %s:%s\n", d.deployment.Name, d.deployment.ObjectMeta.Namespace, err.Error()) + return err + } + return nil + }, deploymentOperationTimeout, deploymentOperationSleepBetweenRetries).Should(Succeed()) return deployment, nil } @@ -146,15 +155,20 @@ func (d *Builder) GetPodsFromDeployment(ctx context.Context, clientset *kubernet LabelSelector: labels.Set(d.deployment.Labels).String(), Limit: 100, } - pods, err := clientset.CoreV1().Pods(d.deployment.GetNamespace()).List(ctx, opts) - if err != nil { - log.Printf("Error trying to get the pods from deployment %s:%s\n", d.deployment.GetName(), err.Error()) - return nil, err - } + var pods *corev1.PodList + Eventually(func() error { + var err error + pods, err = clientset.CoreV1().Pods(d.deployment.GetNamespace()).List(ctx, opts) + if err != nil { + log.Printf("Error trying to get the pods from deployment %s:%s\n", d.deployment.GetName(), err.Error()) + return err + } + return nil + }, deploymentOperationTimeout, deploymentOperationSleepBetweenRetries).Should(Succeed()) return pods.Items, nil } -func (d *Builder) GetService(ports []corev1.ServicePort, lbtype LoadbalancerType) *corev1.Service { +func (d *Builder) CreateServiceResourceSpec(ports []corev1.ServicePort, lbtype LoadbalancerType) *corev1.Service { suffix := "elb" annotations := map[string]string{} if lbtype == InternalLoadbalancer { diff --git a/test/e2e/kubernetes/job/job.go b/test/e2e/kubernetes/job/job.go index 6b79465cd15..38b6c49f61d 100644 --- a/test/e2e/kubernetes/job/job.go +++ b/test/e2e/kubernetes/job/job.go @@ -25,7 +25,7 @@ import ( "sigs.k8s.io/cluster-api/util" ) -func CreateCurlJob(name, endpoint string) *batchv1.Job { +func CreateCurlJobResourceSpec(name, endpoint string) *batchv1.Job { name = name + util.RandomString(5) return &batchv1.Job{ ObjectMeta: metav1.ObjectMeta{ diff --git a/test/e2e/kubernetes/namespace/namespace.go b/test/e2e/kubernetes/namespace/namespace.go index ea803e33488..b9f86c20914 100644 --- a/test/e2e/kubernetes/namespace/namespace.go +++ b/test/e2e/kubernetes/namespace/namespace.go @@ -21,12 +21,19 @@ package namespace import ( "context" "log" + "time" + . "github.com/onsi/gomega" corev1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/client-go/kubernetes" ) +const ( + namespaceOperationTimeout = 30 * time.Second + namespaceOperationSleepBetweenRetries = 3 * time.Second +) + // Create a namespace with the given name func Create(ctx context.Context, clientset *kubernetes.Clientset, name string, labels map[string]string) (*corev1.Namespace, error) { ns := &corev1.Namespace{ @@ -36,15 +43,20 @@ func Create(ctx context.Context, clientset *kubernetes.Clientset, name string, l }, } - namespace, err := clientset.CoreV1().Namespaces().Create(ctx, ns, metav1.CreateOptions{}) - if err != nil { - log.Printf("failed trying to create namespace (%s):%s\n", name, err.Error()) - return nil, err - } + var namespace *corev1.Namespace + Eventually(func() error { + var err error + namespace, err = clientset.CoreV1().Namespaces().Create(ctx, ns, metav1.CreateOptions{}) + if err != nil { + log.Printf("failed trying to create namespace (%s):%s\n", name, err.Error()) + return err + } + return nil + }, namespaceOperationTimeout, namespaceOperationSleepBetweenRetries).Should(Succeed()) return namespace, nil } -// CreateIfNotExist a namespace with the given name if it doesn't exist already +// CreateNamespaceDeleteIfExist creates a namespace, deletes it first if it already exists func CreateNamespaceDeleteIfExist(ctx context.Context, clientset *kubernetes.Clientset, name string, labels map[string]string) (*corev1.Namespace, error) { n, err := Get(ctx, clientset, name) if err == nil { diff --git a/test/e2e/kubernetes/networkpolicy/networkpolicy.go b/test/e2e/kubernetes/networkpolicy/networkpolicy.go index 6bdb428521a..b394269e57c 100644 --- a/test/e2e/kubernetes/networkpolicy/networkpolicy.go +++ b/test/e2e/kubernetes/networkpolicy/networkpolicy.go @@ -22,7 +22,9 @@ import ( "context" "fmt" "io/ioutil" + "log" "path/filepath" + "time" . "github.com/onsi/gomega" @@ -36,6 +38,11 @@ import ( "k8s.io/kubectl/pkg/scheme" ) +const ( + networkPolicyOperationTimeout = 30 * time.Second + networkPolicyOperationSleepBetweenRetries = 3 * time.Second +) + // CreateNetworkPolicyFromFile will create a NetworkPolicy from file with a name func CreateNetworkPolicyFromFile(ctx context.Context, clientset *kubernetes.Clientset, filename, namespace string) error { data, err := ioutil.ReadFile(filename) @@ -61,21 +68,33 @@ func CreateNetworkPolicyFromFile(ctx context.Context, clientset *kubernetes.Clie } func createNetworkPolicyV1(ctx context.Context, clientset *kubernetes.Clientset, namespace string, networkPolicy *networkingv1.NetworkPolicy) error { - _, err := clientset.NetworkingV1().NetworkPolicies(namespace).Create(ctx, networkPolicy, metav1.CreateOptions{}) - return err + Eventually(func() error { + _, err := clientset.NetworkingV1().NetworkPolicies(namespace).Create(ctx, networkPolicy, metav1.CreateOptions{}) + if err != nil { + log.Printf("failed trying to create NetworkPolicy (%s):%s\n", networkPolicy.Name, err.Error()) + return err + } + return nil + }, networkPolicyOperationTimeout, networkPolicyOperationSleepBetweenRetries).Should(Succeed()) + return nil } // DeleteNetworkPolicy will create a NetworkPolicy from file with a name func DeleteNetworkPolicy(ctx context.Context, clientset *kubernetes.Clientset, name, namespace string) { opts := metav1.DeleteOptions{} - err := clientset.NetworkingV1().NetworkPolicies(namespace).Delete(ctx, name, opts) - Expect(err).NotTo(HaveOccurred()) + Eventually(func() error { + err := clientset.NetworkingV1().NetworkPolicies(namespace).Delete(ctx, name, opts) + if err != nil { + log.Printf("failed trying to delete NetworkPolicy (%s):%s\n", name, err.Error()) + return err + } + return nil + }, networkPolicyOperationTimeout, networkPolicyOperationSleepBetweenRetries).Should(Succeed()) } func EnsureOutboundInternetAccess(clientset *kubernetes.Clientset, config *restclient.Config, pods []v1.Pod) { for _, pod := range pods { - err := CheckOutboundConnection(clientset, config, pod) - Expect(err).NotTo(HaveOccurred()) + CheckOutboundConnection(clientset, config, pod) } } @@ -83,19 +102,14 @@ func EnsureConnectivityResultBetweenPods(clientset *kubernetes.Clientset, config for _, fromPod := range fromPods { for _, toPod := range toPods { command := []string{"curl", "-S", "-s", "-o", "/dev/null", toPod.Status.PodIP} - err := e2e_pod.Exec(clientset, config, fromPod, command) - if shouldHaveConnection { - Expect(err).NotTo(HaveOccurred()) - } else { - Expect(err).Should(HaveOccurred()) - } + e2e_pod.Exec(clientset, config, fromPod, command, shouldHaveConnection) } } } -func CheckOutboundConnection(clientset *kubernetes.Clientset, config *restclient.Config, pod v1.Pod) error { +func CheckOutboundConnection(clientset *kubernetes.Clientset, config *restclient.Config, pod v1.Pod) { command := []string{"curl", "-S", "-s", "-o", "/dev/null", "www.bing.com"} - return e2e_pod.Exec(clientset, config, pod, command) + e2e_pod.Exec(clientset, config, pod, command, true) } func ApplyNetworkPolicy(ctx context.Context, clientset *kubernetes.Clientset, nwpolicyName string, namespace string, nwpolicyFileName string, policyDir string) { diff --git a/test/e2e/kubernetes/node/node.go b/test/e2e/kubernetes/node/node.go index 6aa5ab0ae7e..4a0b0edf7d5 100644 --- a/test/e2e/kubernetes/node/node.go +++ b/test/e2e/kubernetes/node/node.go @@ -22,8 +22,11 @@ import ( "context" "encoding/json" "fmt" + "log" "strings" + "time" + . "github.com/onsi/gomega" corev1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/api/equality" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" @@ -33,18 +36,28 @@ import ( "sigs.k8s.io/cluster-api-provider-azure/test/e2e/kubernetes/windows" ) +const ( + nodeOperationTimeout = 30 * time.Second + nodeOperationSleepBetweenRetries = 3 * time.Second +) + func GetWindowsVersion(ctx context.Context, clientset *kubernetes.Clientset) (windows.OSVersion, error) { options := metav1.ListOptions{ LabelSelector: "kubernetes.io/os=windows", } - result, err := clientset.CoreV1().Nodes().List(ctx, options) - if err != nil { - return windows.Unknown, err - } + var result *corev1.NodeList + Eventually(func() error { + var err error + result, err = clientset.CoreV1().Nodes().List(ctx, options) + if err != nil { + return err + } - if len(result.Items) == 0 { - return windows.Unknown, fmt.Errorf("No Windows Nodes found.") - } + if len(result.Items) == 0 { + return fmt.Errorf("No Windows Nodes found.") + } + return nil + }, nodeOperationTimeout, nodeOperationSleepBetweenRetries).Should(Succeed()) kernalVersion := result.Items[0].Status.NodeInfo.KernelVersion kernalVersions := strings.Split(kernalVersion, ".") @@ -61,14 +74,20 @@ func GetWindowsVersion(ctx context.Context, clientset *kubernetes.Clientset) (wi } func TaintNode(clientset *kubernetes.Clientset, options metav1.ListOptions, taint *corev1.Taint) error { - result, err := clientset.CoreV1().Nodes().List(context.Background(), options) - if err != nil { - return err - } + var result *corev1.NodeList + Eventually(func() error { + var err error + result, err = clientset.CoreV1().Nodes().List(context.Background(), options) + if err != nil { + log.Printf("Error trying to list nodes %v: %s\n", options, err.Error()) + return err + } - if len(result.Items) == 0 { - return fmt.Errorf("No Nodes found.") - } + if len(result.Items) == 0 { + return fmt.Errorf("No Nodes found.") + } + return nil + }, nodeOperationTimeout, nodeOperationSleepBetweenRetries).Should(Succeed()) for _, n := range result.Items { newNode, needsUpdate := addOrUpdateTaint(&n, taint) @@ -76,7 +95,7 @@ func TaintNode(clientset *kubernetes.Clientset, options metav1.ListOptions, tain continue } - err = PatchNodeTaints(clientset, newNode.Name, &n, newNode) + err := PatchNodeTaints(clientset, newNode.Name, &n, newNode) if err != nil { return err } @@ -105,7 +124,14 @@ func PatchNodeTaints(clientset *kubernetes.Clientset, nodeName string, oldNode * return fmt.Errorf("failed to create patch for node %q: %v", nodeName, err) } - _, err = clientset.CoreV1().Nodes().Patch(context.Background(), nodeName, types.StrategicMergePatchType, patchBytes, metav1.PatchOptions{}) + Eventually(func() error { + _, err := clientset.CoreV1().Nodes().Patch(context.Background(), nodeName, types.StrategicMergePatchType, patchBytes, metav1.PatchOptions{}) + if err != nil { + log.Printf("Error updating node taints on node %s:%s\n", nodeName, err.Error()) + return err + } + return nil + }, nodeOperationTimeout, nodeOperationSleepBetweenRetries).Should(Succeed()) return err } diff --git a/test/e2e/kubernetes/pod/pod.go b/test/e2e/kubernetes/pod/pod.go index 63e02c013c3..1a0c5f18337 100644 --- a/test/e2e/kubernetes/pod/pod.go +++ b/test/e2e/kubernetes/pod/pod.go @@ -19,9 +19,11 @@ limitations under the License. package pod import ( - "bytes" + "fmt" "os" + "time" + . "github.com/onsi/gomega" v1 "k8s.io/api/core/v1" "k8s.io/client-go/kubernetes" restclient "k8s.io/client-go/rest" @@ -29,7 +31,12 @@ import ( "k8s.io/kubectl/pkg/scheme" ) -func Exec(clientset *kubernetes.Clientset, config *restclient.Config, pod v1.Pod, command []string) error { +const ( + podExecOperationTimeout = 3 * time.Minute + podExecOperationSleepBetweenRetries = 3 * time.Second +) + +func Exec(clientset *kubernetes.Clientset, config *restclient.Config, pod v1.Pod, command []string, testSuccess bool) error { req := clientset.CoreV1().RESTClient().Post().Resource("pods").Name(pod.GetName()). Namespace(pod.GetNamespace()).SubResource("exec") option := &v1.PodExecOptions{ @@ -39,6 +46,9 @@ func Exec(clientset *kubernetes.Clientset, config *restclient.Config, pod v1.Pod Stderr: true, TTY: true, } + if !testSuccess { + option.Stderr = false + } req.VersionedParams( option, scheme.ParameterCodec, @@ -47,41 +57,20 @@ func Exec(clientset *kubernetes.Clientset, config *restclient.Config, pod v1.Pod if err != nil { return err } - err = exec.Stream(remotecommand.StreamOptions{ - Stdout: os.Stdout, - Stderr: os.Stderr, - }) - if err != nil { - return err - } + Eventually(func() error { + err = exec.Stream(remotecommand.StreamOptions{ + Stdout: os.Stdout, + Stderr: os.Stderr, + }) + if testSuccess { + return err + } + // If we get here we are validating that the command returned an expected error + if err == nil { + return fmt.Errorf("Expected error from command %s but got nil", command) + } + return nil + }, podExecOperationTimeout, podExecOperationSleepBetweenRetries).Should(Succeed()) return nil } - -func ExecWithOutput(clientset *kubernetes.Clientset, config *restclient.Config, pod v1.Pod, command []string) (*bytes.Buffer, error) { - req := clientset.CoreV1().RESTClient().Post().Resource("pods").Name(pod.GetName()). - Namespace(pod.GetNamespace()).SubResource("exec") - option := &v1.PodExecOptions{ - Command: command, - Stdin: false, - Stdout: true, - Stderr: true, - TTY: true, - } - req.VersionedParams( - option, - scheme.ParameterCodec, - ) - exec, err := remotecommand.NewSPDYExecutor(config, "POST", req.URL()) - if err != nil { - return nil, err - } - stdout := bytes.NewBuffer(nil) - err = exec.Stream(remotecommand.StreamOptions{ - Stdout: stdout, - // needs to be populated or else exec hangs, but we don't need the output. Failures write to stdout when TTY enabled. - Stderr: bytes.NewBuffer(nil), - }) - - return stdout, err -} diff --git a/test/e2e/log.go b/test/e2e/log.go index 5e37e2fc716..36254a5bade 100644 --- a/test/e2e/log.go +++ b/test/e2e/log.go @@ -38,7 +38,17 @@ func Logf(format string, args ...interface{}) { logf("INFO", format, args...) } +// LogWarningf prints warning logs with a timestamp and formatting. +func LogWarningf(format string, args ...interface{}) { + logf("WARNING", format, args...) +} + // Log prints info logs with a timestamp. func Log(message string) { logf("INFO", message) } + +// Log prints warning logs with a timestamp. +func LogWarning(message string) { + logf("WARNING", message) +}