From 8e329a2447a9c2793855df2539e4e2f08fc1e151 Mon Sep 17 00:00:00 2001 From: Marcial Rosales Date: Thu, 14 Oct 2021 16:48:54 +0200 Subject: [PATCH 01/13] Raise mqtt connect wait time I noticed that it randomly failed establishing connection. Raising it from 10 to 30 seconds seems to have fixed it, at least, when running in GKE. --- system_tests/system_test.go | 35 +++++++++++++++++++++++++++++++++-- system_tests/utils.go | 8 ++++++-- 2 files changed, 39 insertions(+), 4 deletions(-) diff --git a/system_tests/system_test.go b/system_tests/system_test.go index 50dda29bd..1fdbef4e2 100644 --- a/system_tests/system_test.go +++ b/system_tests/system_test.go @@ -586,13 +586,44 @@ CONSOLE_LOG=new` By("STOMP") publishAndConsumeSTOMPMsg(hostname, rabbitmqNodePort(ctx, clientSet, cluster, "stomp"), username, password, nil) - // github.com/go-stomp/stomp does not support STOMP-over-WebSockets + }) + + }) - By("stream") + FWhen("stream plugin is enabled", func() { + var ( + cluster *rabbitmqv1beta1.RabbitmqCluster + hostname string + username string + password string + ) + + BeforeEach(func() { + instanceName := "stream" + cluster = newRabbitmqCluster(namespace, instanceName) + cluster.Spec.Service.Type = "NodePort" + cluster.Spec.Rabbitmq.AdditionalPlugins = []rabbitmqv1beta1.Plugin{ + "rabbitmq_stream", + } + Expect(createRabbitmqCluster(ctx, rmqClusterClient, cluster)).To(Succeed()) + waitForRabbitmqRunning(cluster) + + hostname = kubernetesNodeIp(ctx, clientSet) + var err error + username, password, err = getUsernameAndPassword(ctx, clientSet, "rabbitmq-system", instanceName) + Expect(err).NotTo(HaveOccurred()) + }) + + AfterEach(func() { + Expect(rmqClusterClient.Delete(context.TODO(), cluster)).To(Succeed()) + }) + + FIt("publishes and consumes a message", func() { if strings.Contains(cluster.Spec.Image, ":3.8") || strings.HasSuffix(cluster.Spec.Image, "tanzu-rabbitmq:1") { Skip("rabbitmq_stream plugin is not supported by RabbitMQ image " + cluster.Spec.Image) } publishAndConsumeStreamMsg(hostname, rabbitmqNodePort(ctx, clientSet, cluster, "stream"), username, password) }) + }) }) diff --git a/system_tests/utils.go b/system_tests/utils.go index fa222a73a..926437d24 100644 --- a/system_tests/utils.go +++ b/system_tests/utils.go @@ -837,17 +837,21 @@ func publishAndConsumeMQTTMsg(hostname, port, username, password string, overWeb var token mqtt.Token EventuallyWithOffset(1, func() bool { + fmt.Printf("Attempt to connect using MQTT to url %s ( %+v\n )", url, opts) + token = c.Connect() // Waits for the network request to reach the destination and receive a response - if !token.WaitTimeout(3 * time.Second) { + if !token.WaitTimeout(30 * time.Second) { + fmt.Printf("Timed out\n") return false } if err := token.Error(); err == nil { + fmt.Printf("Connected !\n") return true } return false - }, 30, 2).Should(BeTrue(), "Expected to be able to connect to MQTT port") + }, 30, 20).Should(BeTrue(), "Expected to be able to connect to MQTT port") topic := "tests/mqtt" msgReceived := false From 349a250e9e0a9b2012c7564a8aeb8640b5baf387 Mon Sep 17 00:00:00 2001 From: Marcial Rosales Date: Wed, 20 Oct 2021 12:54:47 +0200 Subject: [PATCH 02/13] Wait until mqtt/stomp ports are ready and wait until stream port is ready only when the feature flag stream_queue is enabled --- system_tests/system_test.go | 15 ++++++++++----- system_tests/utils.go | 34 ++++++++++++++++++++++++++++++++++ 2 files changed, 44 insertions(+), 5 deletions(-) diff --git a/system_tests/system_test.go b/system_tests/system_test.go index 1fdbef4e2..fc60fb7c3 100644 --- a/system_tests/system_test.go +++ b/system_tests/system_test.go @@ -545,7 +545,7 @@ CONSOLE_LOG=new` }) }) - When("(web) MQTT, STOMP, and stream plugins are enabled", func() { + When("(web) MQTT, STOMP are enabled", func() { var ( cluster *rabbitmqv1beta1.RabbitmqCluster hostname string @@ -565,6 +565,8 @@ CONSOLE_LOG=new` } Expect(createRabbitmqCluster(ctx, rmqClusterClient, cluster)).To(Succeed()) waitForRabbitmqRunning(cluster) + waitForPortReadiness(cluster, 1883) // mqtt + waitForPortReadiness(cluster, 61613) // stomp hostname = kubernetesNodeIp(ctx, clientSet) var err error @@ -587,7 +589,7 @@ CONSOLE_LOG=new` publishAndConsumeSTOMPMsg(hostname, rabbitmqNodePort(ctx, clientSet, cluster, "stomp"), username, password, nil) }) - + }) FWhen("stream plugin is enabled", func() { @@ -618,11 +620,14 @@ CONSOLE_LOG=new` Expect(rmqClusterClient.Delete(context.TODO(), cluster)).To(Succeed()) }) - FIt("publishes and consumes a message", func() { - if strings.Contains(cluster.Spec.Image, ":3.8") || strings.HasSuffix(cluster.Spec.Image, "tanzu-rabbitmq:1") { + It("publishes and consumes a message", func() { + if !hasFeatureEnabled(cluster, "stream_queue") { Skip("rabbitmq_stream plugin is not supported by RabbitMQ image " + cluster.Spec.Image) + }else { + fmt.Printf("Stream feature is enabled ") + waitForPortReadiness(cluster, 5552) // stream + publishAndConsumeStreamMsg(hostname, rabbitmqNodePort(ctx, clientSet, cluster, "stream"), username, password) } - publishAndConsumeStreamMsg(hostname, rabbitmqNodePort(ctx, clientSet, cluster, "stream"), username, password) }) }) diff --git a/system_tests/utils.go b/system_tests/utils.go index 926437d24..2ccaca2f8 100644 --- a/system_tests/utils.go +++ b/system_tests/utils.go @@ -59,12 +59,15 @@ import ( ) const podCreationTimeout = 10 * time.Minute +const portReadinessTimeout = 10 * time.Second type featureFlag struct { Name string State string } + + func MustHaveEnv(name string) string { value := os.Getenv(name) if value == "" { @@ -593,6 +596,37 @@ func waitForRabbitmqRunningWithOffset(cluster *rabbitmqv1beta1.RabbitmqCluster, ExpectWithOffset(callStackOffset, err).NotTo(HaveOccurred()) } +func waitForPortReadiness(cluster *rabbitmqv1beta1.RabbitmqCluster, port int) { + waitForPortReadinessWithOffset(cluster, port, 2) +} +func waitForPortReadinessWithOffset(cluster *rabbitmqv1beta1.RabbitmqCluster, port int, callStackOffset int) { + EventuallyWithOffset(callStackOffset, func() error { + _, err := kubectlExec(cluster.Namespace, statefulSetPodName(cluster, 0), "rabbitmq", + "rabbitmq-diagnostics", "check_port_listener", strconv.Itoa(port)) + return err + }, portReadinessTimeout, 3).Should(Not(HaveOccurred())) +} + +func hasFeatureEnabled(cluster *rabbitmqv1beta1.RabbitmqCluster, featureFlagName string) bool { + output, err := kubectlExec(cluster.Namespace, + statefulSetPodName(cluster, 0), + "rabbitmq", + "rabbitmqctl", + "list_feature_flags", + "--formatter=json", + ) + Expect(err).NotTo(HaveOccurred()) + var flags []featureFlag + Expect(json.Unmarshal(output, &flags)).To(Succeed()) + + for _, v := range flags { + if v.Name == featureFlagName && v.State == "enabled" { + return true + } + } + return false +} + // asserts an event with reason: "TLSError", occurs for the cluster in it's namespace func assertTLSError(cluster *rabbitmqv1beta1.RabbitmqCluster) { var err error From 7abef6aa6be8ad23af599413cb8d6a14fe64bd36 Mon Sep 17 00:00:00 2001 From: Marcial Rosales Date: Wed, 20 Oct 2021 15:55:39 +0200 Subject: [PATCH 03/13] Ramp up timeout to wait for ConfigMap annotation to 1minute. We still have 3 more minutes before the annotation is removed. --- system_tests/system_test.go | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/system_tests/system_test.go b/system_tests/system_test.go index fc60fb7c3..18107a922 100644 --- a/system_tests/system_test.go +++ b/system_tests/system_test.go @@ -151,7 +151,7 @@ var _ = Describe("Operator", func() { Expect(rmqClusterClient.Delete(context.TODO(), cluster)).To(Succeed()) }) - It("keeps rabbitmq server related configurations up-to-date", func() { + FIt("keeps rabbitmq server related configurations up-to-date", func() { By("updating enabled plugins and the secret ports when additionalPlugins are modified", func() { // modify rabbitmqcluster.spec.rabbitmq.additionalPlugins Expect(updateRabbitmqCluster(ctx, rmqClusterClient, cluster.Name, cluster.Namespace, func(cluster *rabbitmqv1beta1.RabbitmqCluster) { @@ -164,7 +164,7 @@ var _ = Describe("Operator", func() { Expect(err).ToNot(HaveOccurred()) return configMap.Annotations } - Eventually(getConfigMapAnnotations, 30, 1).Should( + Eventually(getConfigMapAnnotations, 1*time.Minute, 1).Should( HaveKey("rabbitmq.com/pluginsUpdatedAt"), "plugins ConfigMap should have been annotated") Eventually(getConfigMapAnnotations, 4*time.Minute, 1).Should( Not(HaveKey("rabbitmq.com/pluginsUpdatedAt")), "plugins ConfigMap annotation should have been removed") @@ -592,7 +592,7 @@ CONSOLE_LOG=new` }) - FWhen("stream plugin is enabled", func() { + When("stream plugin is enabled", func() { var ( cluster *rabbitmqv1beta1.RabbitmqCluster hostname string From 49f10a3bf9979f3ebbc727c4ab81cb3a480fbcdf Mon Sep 17 00:00:00 2001 From: Marcial Rosales Date: Wed, 20 Oct 2021 16:34:04 +0200 Subject: [PATCH 04/13] Ramp up timeout when calling k8s api to get RabbitMQCluster resource. Also provided a description of asserted generation values to help troubleshoot when it occurs. --- system_tests/system_test.go | 9 +++++---- system_tests/utils.go | 3 ++- 2 files changed, 7 insertions(+), 5 deletions(-) diff --git a/system_tests/system_test.go b/system_tests/system_test.go index 18107a922..bdd4495c4 100644 --- a/system_tests/system_test.go +++ b/system_tests/system_test.go @@ -66,7 +66,7 @@ var _ = Describe("Operator", func() { Expect(rmqClusterClient.Delete(context.TODO(), cluster)).To(Succeed()) }) - It("works", func() { + FIt("works", func() { By("publishing and consuming a message", func() { response := alivenessTest(hostname, port, username, password) Expect(response.Status).To(Equal("ok")) @@ -115,7 +115,8 @@ var _ = Describe("Operator", func() { Eventually(func() bool { Expect(rmqClusterClient.Get(ctx, types.NamespacedName{Name: cluster.Name, Namespace: cluster.Namespace}, fetchedRmq)).To(Succeed()) return fetchedRmq.Status.ObservedGeneration == fetchedRmq.Generation - }, 30).Should(BeTrue()) + }, k8sQueryTimeout, 10).Should(BeTrue(), fmt.Sprintf("expected %d (Status.ObservedGeneration) = %d (Generation)", + fetchedRmq.Status.ObservedGeneration, fetchedRmq.Generation)) }) By("having all feature flags enabled", func() { @@ -151,7 +152,7 @@ var _ = Describe("Operator", func() { Expect(rmqClusterClient.Delete(context.TODO(), cluster)).To(Succeed()) }) - FIt("keeps rabbitmq server related configurations up-to-date", func() { + It("keeps rabbitmq server related configurations up-to-date", func() { By("updating enabled plugins and the secret ports when additionalPlugins are modified", func() { // modify rabbitmqcluster.spec.rabbitmq.additionalPlugins Expect(updateRabbitmqCluster(ctx, rmqClusterClient, cluster.Name, cluster.Namespace, func(cluster *rabbitmqv1beta1.RabbitmqCluster) { @@ -329,7 +330,7 @@ CONSOLE_LOG=new` waitForRabbitmqRunning(cluster) }) - It("allows volume expansion", func() { + FIt("allows volume expansion", func() { podUID := pod(ctx, clientSet, cluster, 0).UID output, err := kubectlExec(namespace, statefulSetPodName(cluster, 0), "rabbitmq", "df", "/var/lib/rabbitmq/mnesia") Expect(err).ToNot(HaveOccurred()) diff --git a/system_tests/utils.go b/system_tests/utils.go index 2ccaca2f8..8035e0112 100644 --- a/system_tests/utils.go +++ b/system_tests/utils.go @@ -59,7 +59,8 @@ import ( ) const podCreationTimeout = 10 * time.Minute -const portReadinessTimeout = 10 * time.Second +const portReadinessTimeout = 1 * time.Minute +const k8sQueryTimeout = 1 * time.Minute type featureFlag struct { Name string From 6b1484d4f3098301777884108a9779e8acc415a1 Mon Sep 17 00:00:00 2001 From: Marcial Rosales Date: Wed, 20 Oct 2021 16:58:52 +0200 Subject: [PATCH 05/13] Ramp up timeout to wait for PVC expansion TODO: Provide an assertion message that gives us information about the PVC such as events or conditions. So that we can know why it has not expanded. --- system_tests/system_test.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/system_tests/system_test.go b/system_tests/system_test.go index bdd4495c4..0c93c706c 100644 --- a/system_tests/system_test.go +++ b/system_tests/system_test.go @@ -66,7 +66,7 @@ var _ = Describe("Operator", func() { Expect(rmqClusterClient.Delete(context.TODO(), cluster)).To(Succeed()) }) - FIt("works", func() { + It("works", func() { By("publishing and consuming a message", func() { response := alivenessTest(hostname, port, username, password) Expect(response.Status).To(Equal("ok")) @@ -347,7 +347,7 @@ CONSOLE_LOG=new` pvc, err := clientSet.CoreV1().PersistentVolumeClaims(namespace).Get(ctx, pvcName, metav1.GetOptions{}) Expect(err).ToNot(HaveOccurred()) return pvc.Spec.Resources.Requests["storage"] - }, "5m", 10).Should(Equal(newCapacity)) + }, "10m", 10).Should(Equal(newCapacity)) // storage capacity reflected in the pod Eventually(func() int { From 9b857b902882ab7577ae2af79933a0ed082d109e Mon Sep 17 00:00:00 2001 From: Marcial Rosales Date: Thu, 21 Oct 2021 11:10:21 +0200 Subject: [PATCH 06/13] Unfocus volume expansion test --- system_tests/system_test.go | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/system_tests/system_test.go b/system_tests/system_test.go index 0c93c706c..6deee08d2 100644 --- a/system_tests/system_test.go +++ b/system_tests/system_test.go @@ -330,7 +330,7 @@ CONSOLE_LOG=new` waitForRabbitmqRunning(cluster) }) - FIt("allows volume expansion", func() { + It("allows volume expansion", func() { podUID := pod(ctx, clientSet, cluster, 0).UID output, err := kubectlExec(namespace, statefulSetPodName(cluster, 0), "rabbitmq", "df", "/var/lib/rabbitmq/mnesia") Expect(err).ToNot(HaveOccurred()) @@ -346,6 +346,8 @@ CONSOLE_LOG=new` pvcName := cluster.PVCName(0) pvc, err := clientSet.CoreV1().PersistentVolumeClaims(namespace).Get(ctx, pvcName, metav1.GetOptions{}) Expect(err).ToNot(HaveOccurred()) + fmt.Printf("Retrieved PVC %s with conditions %+v\n", pvcName, pvc.Status.Conditions) + return pvc.Spec.Resources.Requests["storage"] }, "10m", 10).Should(Equal(newCapacity)) From 73fac8e0fddac948a500f16617d2b8f2df1fe01c Mon Sep 17 00:00:00 2001 From: Marcial Rosales Date: Thu, 21 Oct 2021 13:10:49 +0200 Subject: [PATCH 07/13] Retry stream connection Even though RabbitMQ diagnostics tool says that the stream port is ready to accept connections, the fact to the matter is that the stream client cannot connect on the first attempt. The stream client connects via the k8s nodePort not directly to the stream port. --- system_tests/system_test.go | 5 +++-- system_tests/utils.go | 42 +++++++++++++++++++++++++++---------- 2 files changed, 34 insertions(+), 13 deletions(-) diff --git a/system_tests/system_test.go b/system_tests/system_test.go index 6deee08d2..3b2161b05 100644 --- a/system_tests/system_test.go +++ b/system_tests/system_test.go @@ -623,11 +623,12 @@ CONSOLE_LOG=new` Expect(rmqClusterClient.Delete(context.TODO(), cluster)).To(Succeed()) }) - It("publishes and consumes a message", func() { + FIt("publishes and consumes a message", func() { if !hasFeatureEnabled(cluster, "stream_queue") { Skip("rabbitmq_stream plugin is not supported by RabbitMQ image " + cluster.Spec.Image) }else { - fmt.Printf("Stream feature is enabled ") + fmt.Println("Stream feature is enabled ") + waitForPortConnectivity(cluster) waitForPortReadiness(cluster, 5552) // stream publishAndConsumeStreamMsg(hostname, rabbitmqNodePort(ctx, clientSet, cluster, "stream"), username, password) } diff --git a/system_tests/utils.go b/system_tests/utils.go index 8035e0112..c6c9bd4e6 100644 --- a/system_tests/utils.go +++ b/system_tests/utils.go @@ -597,6 +597,17 @@ func waitForRabbitmqRunningWithOffset(cluster *rabbitmqv1beta1.RabbitmqCluster, ExpectWithOffset(callStackOffset, err).NotTo(HaveOccurred()) } +func waitForPortConnectivity(cluster *rabbitmqv1beta1.RabbitmqCluster) { + waitForPortConnectivityWithOffset(cluster, 2) +} +func waitForPortConnectivityWithOffset(cluster *rabbitmqv1beta1.RabbitmqCluster, callStackOffset int) { + EventuallyWithOffset(callStackOffset, func() error { + _, err := kubectlExec(cluster.Namespace, statefulSetPodName(cluster, 0), "rabbitmq", + "rabbitmq-diagnostics", "check_port_connectivity") + return err + }, portReadinessTimeout, 3).Should(Not(HaveOccurred())) +} + func waitForPortReadiness(cluster *rabbitmqv1beta1.RabbitmqCluster, port int) { waitForPortReadinessWithOffset(cluster, port, 2) } @@ -872,8 +883,6 @@ func publishAndConsumeMQTTMsg(hostname, port, username, password string, overWeb var token mqtt.Token EventuallyWithOffset(1, func() bool { - fmt.Printf("Attempt to connect using MQTT to url %s ( %+v\n )", url, opts) - token = c.Connect() // Waits for the network request to reach the destination and receive a response if !token.WaitTimeout(30 * time.Second) { @@ -987,15 +996,26 @@ func publishAndConsumeStreamMsg(host, port, username, password string) { portInt, err := strconv.Atoi(port) Expect(err).ToNot(HaveOccurred()) - env, err := stream.NewEnvironment(stream.NewEnvironmentOptions(). - SetHost(host). - SetPort(portInt). - SetPassword(password). - SetUser(username). - SetAddressResolver(stream.AddressResolver{ - Host: host, - Port: portInt, - })) + var env *stream.Environment + for retry := 0; retry < 5; retry++ { + fmt.Println("connecting to stream endpoint ...") + env, err = stream.NewEnvironment(stream.NewEnvironmentOptions(). + SetHost(host). + SetPort(portInt). + SetPassword(password). + SetUser(username). + SetAddressResolver(stream.AddressResolver{ + Host: host, + Port: portInt, + })) + if err == nil { + fmt.Println("connected to stream endpoint") + break + }else { + fmt.Errorf("failed to connect to stream endpoint (%s:%d) due to %g\n", host, portInt, err) + } + time.Sleep(portReadinessTimeout) + } Expect(err).ToNot(HaveOccurred()) const streamName = "system-test-stream" From 07d03fe54ca4c63378553f0a5daa45f99a6c3c45 Mon Sep 17 00:00:00 2001 From: Marcial Rosales Date: Thu, 21 Oct 2021 13:15:18 +0200 Subject: [PATCH 08/13] Use k8sQueryTimeout rather than hardcoding it --- system_tests/system_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/system_tests/system_test.go b/system_tests/system_test.go index 3b2161b05..c3bc962e3 100644 --- a/system_tests/system_test.go +++ b/system_tests/system_test.go @@ -165,7 +165,7 @@ var _ = Describe("Operator", func() { Expect(err).ToNot(HaveOccurred()) return configMap.Annotations } - Eventually(getConfigMapAnnotations, 1*time.Minute, 1).Should( + Eventually(getConfigMapAnnotations, k8sQueryTimeout, 1).Should( HaveKey("rabbitmq.com/pluginsUpdatedAt"), "plugins ConfigMap should have been annotated") Eventually(getConfigMapAnnotations, 4*time.Minute, 1).Should( Not(HaveKey("rabbitmq.com/pluginsUpdatedAt")), "plugins ConfigMap annotation should have been removed") From a7e0e7ad313461e7f681f500233241f883d0134e Mon Sep 17 00:00:00 2001 From: Marcial Rosales Date: Thu, 21 Oct 2021 15:56:13 +0200 Subject: [PATCH 09/13] Unfocus --- system_tests/system_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/system_tests/system_test.go b/system_tests/system_test.go index c3bc962e3..f5e0f08b7 100644 --- a/system_tests/system_test.go +++ b/system_tests/system_test.go @@ -623,7 +623,7 @@ CONSOLE_LOG=new` Expect(rmqClusterClient.Delete(context.TODO(), cluster)).To(Succeed()) }) - FIt("publishes and consumes a message", func() { + It("publishes and consumes a message", func() { if !hasFeatureEnabled(cluster, "stream_queue") { Skip("rabbitmq_stream plugin is not supported by RabbitMQ image " + cluster.Spec.Image) }else { From 5de00ba1d0f249d7a28a7fdf267bc35772d02924 Mon Sep 17 00:00:00 2001 From: Marcial Rosales Date: Thu, 21 Oct 2021 17:21:59 +0200 Subject: [PATCH 10/13] Use Eventually rather than for-loop to send a message via stream protocol --- system_tests/utils.go | 9 ++++----- 1 file changed, 4 insertions(+), 5 deletions(-) diff --git a/system_tests/utils.go b/system_tests/utils.go index c6c9bd4e6..d0672c97e 100644 --- a/system_tests/utils.go +++ b/system_tests/utils.go @@ -997,7 +997,7 @@ func publishAndConsumeStreamMsg(host, port, username, password string) { Expect(err).ToNot(HaveOccurred()) var env *stream.Environment - for retry := 0; retry < 5; retry++ { + Eventually(func() error{ fmt.Println("connecting to stream endpoint ...") env, err = stream.NewEnvironment(stream.NewEnvironmentOptions(). SetHost(host). @@ -1010,13 +1010,12 @@ func publishAndConsumeStreamMsg(host, port, username, password string) { })) if err == nil { fmt.Println("connected to stream endpoint") - break + return nil }else { fmt.Errorf("failed to connect to stream endpoint (%s:%d) due to %g\n", host, portInt, err) } - time.Sleep(portReadinessTimeout) - } - Expect(err).ToNot(HaveOccurred()) + return err + }, portReadinessTimeout*5, portReadinessTimeout).ShouldNot(HaveOccurred()) const streamName = "system-test-stream" Expect(env.DeclareStream( From 5b973385b0672d8eaa9023941ceb176a071d9c10 Mon Sep 17 00:00:00 2001 From: Marcial Rosales Date: Mon, 25 Oct 2021 15:26:49 +0200 Subject: [PATCH 11/13] Replace Errorf by Printf --- system_tests/utils.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/system_tests/utils.go b/system_tests/utils.go index d0672c97e..188560ccf 100644 --- a/system_tests/utils.go +++ b/system_tests/utils.go @@ -1012,7 +1012,7 @@ func publishAndConsumeStreamMsg(host, port, username, password string) { fmt.Println("connected to stream endpoint") return nil }else { - fmt.Errorf("failed to connect to stream endpoint (%s:%d) due to %g\n", host, portInt, err) + fmt.Printf("failed to connect to stream endpoint (%s:%d) due to %g\n", host, portInt, err) } return err }, portReadinessTimeout*5, portReadinessTimeout).ShouldNot(HaveOccurred()) From e97687d1248ee63ea2753bce027dfa9790a09e4b Mon Sep 17 00:00:00 2001 From: Marcial Rosales Date: Fri, 29 Oct 2021 12:04:04 +0200 Subject: [PATCH 12/13] Move stream test case together with other plugin --- system_tests/system_test.go | 39 ++++--------------------------------- 1 file changed, 4 insertions(+), 35 deletions(-) diff --git a/system_tests/system_test.go b/system_tests/system_test.go index f5e0f08b7..c5ca67d69 100644 --- a/system_tests/system_test.go +++ b/system_tests/system_test.go @@ -548,7 +548,7 @@ CONSOLE_LOG=new` }) }) - When("(web) MQTT, STOMP are enabled", func() { + When("(web) MQTT, STOMP and stream are enabled", func() { var ( cluster *rabbitmqv1beta1.RabbitmqCluster hostname string @@ -591,48 +591,17 @@ CONSOLE_LOG=new` By("STOMP") publishAndConsumeSTOMPMsg(hostname, rabbitmqNodePort(ctx, clientSet, cluster, "stomp"), username, password, nil) - }) - - }) - - When("stream plugin is enabled", func() { - var ( - cluster *rabbitmqv1beta1.RabbitmqCluster - hostname string - username string - password string - ) - - BeforeEach(func() { - instanceName := "stream" - cluster = newRabbitmqCluster(namespace, instanceName) - cluster.Spec.Service.Type = "NodePort" - cluster.Spec.Rabbitmq.AdditionalPlugins = []rabbitmqv1beta1.Plugin{ - "rabbitmq_stream", - } - Expect(createRabbitmqCluster(ctx, rmqClusterClient, cluster)).To(Succeed()) - waitForRabbitmqRunning(cluster) - - hostname = kubernetesNodeIp(ctx, clientSet) - var err error - username, password, err = getUsernameAndPassword(ctx, clientSet, "rabbitmq-system", instanceName) - Expect(err).NotTo(HaveOccurred()) - }) - - AfterEach(func() { - Expect(rmqClusterClient.Delete(context.TODO(), cluster)).To(Succeed()) - }) - - It("publishes and consumes a message", func() { + By("Streams") if !hasFeatureEnabled(cluster, "stream_queue") { Skip("rabbitmq_stream plugin is not supported by RabbitMQ image " + cluster.Spec.Image) }else { - fmt.Println("Stream feature is enabled ") waitForPortConnectivity(cluster) waitForPortReadiness(cluster, 5552) // stream publishAndConsumeStreamMsg(hostname, rabbitmqNodePort(ctx, clientSet, cluster, "stream"), username, password) } }) + }) + }) From 88c80d9280f1e3b6936f395b319ec3fdf475c8eb Mon Sep 17 00:00:00 2001 From: Marcial Rosales Date: Mon, 8 Nov 2021 10:09:33 +0100 Subject: [PATCH 13/13] Run go fmt on system_tests package --- system_tests/system_test.go | 5 ++--- system_tests/utils.go | 8 +++----- 2 files changed, 5 insertions(+), 8 deletions(-) diff --git a/system_tests/system_test.go b/system_tests/system_test.go index c5ca67d69..a71a204de 100644 --- a/system_tests/system_test.go +++ b/system_tests/system_test.go @@ -568,7 +568,7 @@ CONSOLE_LOG=new` } Expect(createRabbitmqCluster(ctx, rmqClusterClient, cluster)).To(Succeed()) waitForRabbitmqRunning(cluster) - waitForPortReadiness(cluster, 1883) // mqtt + waitForPortReadiness(cluster, 1883) // mqtt waitForPortReadiness(cluster, 61613) // stomp hostname = kubernetesNodeIp(ctx, clientSet) @@ -594,14 +594,13 @@ CONSOLE_LOG=new` By("Streams") if !hasFeatureEnabled(cluster, "stream_queue") { Skip("rabbitmq_stream plugin is not supported by RabbitMQ image " + cluster.Spec.Image) - }else { + } else { waitForPortConnectivity(cluster) waitForPortReadiness(cluster, 5552) // stream publishAndConsumeStreamMsg(hostname, rabbitmqNodePort(ctx, clientSet, cluster, "stream"), username, password) } }) - }) }) diff --git a/system_tests/utils.go b/system_tests/utils.go index 188560ccf..db940cdd3 100644 --- a/system_tests/utils.go +++ b/system_tests/utils.go @@ -67,8 +67,6 @@ type featureFlag struct { State string } - - func MustHaveEnv(name string) string { value := os.Getenv(name) if value == "" { @@ -996,8 +994,8 @@ func publishAndConsumeStreamMsg(host, port, username, password string) { portInt, err := strconv.Atoi(port) Expect(err).ToNot(HaveOccurred()) - var env *stream.Environment - Eventually(func() error{ + var env *stream.Environment + Eventually(func() error { fmt.Println("connecting to stream endpoint ...") env, err = stream.NewEnvironment(stream.NewEnvironmentOptions(). SetHost(host). @@ -1011,7 +1009,7 @@ func publishAndConsumeStreamMsg(host, port, username, password string) { if err == nil { fmt.Println("connected to stream endpoint") return nil - }else { + } else { fmt.Printf("failed to connect to stream endpoint (%s:%d) due to %g\n", host, portInt, err) } return err