Skip to content

Commit c91a0c8

Browse files
authored
Merge pull request #872 from rabbitmq/fix-system-tests
Fix system tests
2 parents 5f8578d + 88c80d9 commit c91a0c8

File tree

2 files changed

+85
-21
lines changed

2 files changed

+85
-21
lines changed

system_tests/system_test.go

Lines changed: 17 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -115,7 +115,8 @@ var _ = Describe("Operator", func() {
115115
Eventually(func() bool {
116116
Expect(rmqClusterClient.Get(ctx, types.NamespacedName{Name: cluster.Name, Namespace: cluster.Namespace}, fetchedRmq)).To(Succeed())
117117
return fetchedRmq.Status.ObservedGeneration == fetchedRmq.Generation
118-
}, 30).Should(BeTrue())
118+
}, k8sQueryTimeout, 10).Should(BeTrue(), fmt.Sprintf("expected %d (Status.ObservedGeneration) = %d (Generation)",
119+
fetchedRmq.Status.ObservedGeneration, fetchedRmq.Generation))
119120
})
120121

121122
By("having all feature flags enabled", func() {
@@ -164,7 +165,7 @@ var _ = Describe("Operator", func() {
164165
Expect(err).ToNot(HaveOccurred())
165166
return configMap.Annotations
166167
}
167-
Eventually(getConfigMapAnnotations, 30, 1).Should(
168+
Eventually(getConfigMapAnnotations, k8sQueryTimeout, 1).Should(
168169
HaveKey("rabbitmq.com/pluginsUpdatedAt"), "plugins ConfigMap should have been annotated")
169170
Eventually(getConfigMapAnnotations, 4*time.Minute, 1).Should(
170171
Not(HaveKey("rabbitmq.com/pluginsUpdatedAt")), "plugins ConfigMap annotation should have been removed")
@@ -345,8 +346,10 @@ CONSOLE_LOG=new`
345346
pvcName := cluster.PVCName(0)
346347
pvc, err := clientSet.CoreV1().PersistentVolumeClaims(namespace).Get(ctx, pvcName, metav1.GetOptions{})
347348
Expect(err).ToNot(HaveOccurred())
349+
fmt.Printf("Retrieved PVC %s with conditions %+v\n", pvcName, pvc.Status.Conditions)
350+
348351
return pvc.Spec.Resources.Requests["storage"]
349-
}, "5m", 10).Should(Equal(newCapacity))
352+
}, "10m", 10).Should(Equal(newCapacity))
350353

351354
// storage capacity reflected in the pod
352355
Eventually(func() int {
@@ -545,7 +548,7 @@ CONSOLE_LOG=new`
545548
})
546549
})
547550

548-
When("(web) MQTT, STOMP, and stream plugins are enabled", func() {
551+
When("(web) MQTT, STOMP and stream are enabled", func() {
549552
var (
550553
cluster *rabbitmqv1beta1.RabbitmqCluster
551554
hostname string
@@ -565,6 +568,8 @@ CONSOLE_LOG=new`
565568
}
566569
Expect(createRabbitmqCluster(ctx, rmqClusterClient, cluster)).To(Succeed())
567570
waitForRabbitmqRunning(cluster)
571+
waitForPortReadiness(cluster, 1883) // mqtt
572+
waitForPortReadiness(cluster, 61613) // stomp
568573

569574
hostname = kubernetesNodeIp(ctx, clientSet)
570575
var err error
@@ -586,13 +591,16 @@ CONSOLE_LOG=new`
586591
By("STOMP")
587592
publishAndConsumeSTOMPMsg(hostname, rabbitmqNodePort(ctx, clientSet, cluster, "stomp"), username, password, nil)
588593

589-
// github.com/go-stomp/stomp does not support STOMP-over-WebSockets
590-
591-
By("stream")
592-
if strings.Contains(cluster.Spec.Image, ":3.8") || strings.HasSuffix(cluster.Spec.Image, "tanzu-rabbitmq:1") {
594+
By("Streams")
595+
if !hasFeatureEnabled(cluster, "stream_queue") {
593596
Skip("rabbitmq_stream plugin is not supported by RabbitMQ image " + cluster.Spec.Image)
597+
} else {
598+
waitForPortConnectivity(cluster)
599+
waitForPortReadiness(cluster, 5552) // stream
600+
publishAndConsumeStreamMsg(hostname, rabbitmqNodePort(ctx, clientSet, cluster, "stream"), username, password)
594601
}
595-
publishAndConsumeStreamMsg(hostname, rabbitmqNodePort(ctx, clientSet, cluster, "stream"), username, password)
596602
})
603+
597604
})
605+
598606
})

system_tests/utils.go

Lines changed: 68 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -59,6 +59,8 @@ import (
5959
)
6060

6161
const podCreationTimeout = 10 * time.Minute
62+
const portReadinessTimeout = 1 * time.Minute
63+
const k8sQueryTimeout = 1 * time.Minute
6264

6365
type featureFlag struct {
6466
Name string
@@ -593,6 +595,48 @@ func waitForRabbitmqRunningWithOffset(cluster *rabbitmqv1beta1.RabbitmqCluster,
593595
ExpectWithOffset(callStackOffset, err).NotTo(HaveOccurred())
594596
}
595597

598+
func waitForPortConnectivity(cluster *rabbitmqv1beta1.RabbitmqCluster) {
599+
waitForPortConnectivityWithOffset(cluster, 2)
600+
}
601+
func waitForPortConnectivityWithOffset(cluster *rabbitmqv1beta1.RabbitmqCluster, callStackOffset int) {
602+
EventuallyWithOffset(callStackOffset, func() error {
603+
_, err := kubectlExec(cluster.Namespace, statefulSetPodName(cluster, 0), "rabbitmq",
604+
"rabbitmq-diagnostics", "check_port_connectivity")
605+
return err
606+
}, portReadinessTimeout, 3).Should(Not(HaveOccurred()))
607+
}
608+
609+
func waitForPortReadiness(cluster *rabbitmqv1beta1.RabbitmqCluster, port int) {
610+
waitForPortReadinessWithOffset(cluster, port, 2)
611+
}
612+
func waitForPortReadinessWithOffset(cluster *rabbitmqv1beta1.RabbitmqCluster, port int, callStackOffset int) {
613+
EventuallyWithOffset(callStackOffset, func() error {
614+
_, err := kubectlExec(cluster.Namespace, statefulSetPodName(cluster, 0), "rabbitmq",
615+
"rabbitmq-diagnostics", "check_port_listener", strconv.Itoa(port))
616+
return err
617+
}, portReadinessTimeout, 3).Should(Not(HaveOccurred()))
618+
}
619+
620+
func hasFeatureEnabled(cluster *rabbitmqv1beta1.RabbitmqCluster, featureFlagName string) bool {
621+
output, err := kubectlExec(cluster.Namespace,
622+
statefulSetPodName(cluster, 0),
623+
"rabbitmq",
624+
"rabbitmqctl",
625+
"list_feature_flags",
626+
"--formatter=json",
627+
)
628+
Expect(err).NotTo(HaveOccurred())
629+
var flags []featureFlag
630+
Expect(json.Unmarshal(output, &flags)).To(Succeed())
631+
632+
for _, v := range flags {
633+
if v.Name == featureFlagName && v.State == "enabled" {
634+
return true
635+
}
636+
}
637+
return false
638+
}
639+
596640
// asserts an event with reason: "TLSError", occurs for the cluster in it's namespace
597641
func assertTLSError(cluster *rabbitmqv1beta1.RabbitmqCluster) {
598642
var err error
@@ -839,15 +883,17 @@ func publishAndConsumeMQTTMsg(hostname, port, username, password string, overWeb
839883
EventuallyWithOffset(1, func() bool {
840884
token = c.Connect()
841885
// Waits for the network request to reach the destination and receive a response
842-
if !token.WaitTimeout(3 * time.Second) {
886+
if !token.WaitTimeout(30 * time.Second) {
887+
fmt.Printf("Timed out\n")
843888
return false
844889
}
845890

846891
if err := token.Error(); err == nil {
892+
fmt.Printf("Connected !\n")
847893
return true
848894
}
849895
return false
850-
}, 30, 2).Should(BeTrue(), "Expected to be able to connect to MQTT port")
896+
}, 30, 20).Should(BeTrue(), "Expected to be able to connect to MQTT port")
851897

852898
topic := "tests/mqtt"
853899
msgReceived := false
@@ -948,16 +994,26 @@ func publishAndConsumeStreamMsg(host, port, username, password string) {
948994
portInt, err := strconv.Atoi(port)
949995
Expect(err).ToNot(HaveOccurred())
950996

951-
env, err := stream.NewEnvironment(stream.NewEnvironmentOptions().
952-
SetHost(host).
953-
SetPort(portInt).
954-
SetPassword(password).
955-
SetUser(username).
956-
SetAddressResolver(stream.AddressResolver{
957-
Host: host,
958-
Port: portInt,
959-
}))
960-
Expect(err).ToNot(HaveOccurred())
997+
var env *stream.Environment
998+
Eventually(func() error {
999+
fmt.Println("connecting to stream endpoint ...")
1000+
env, err = stream.NewEnvironment(stream.NewEnvironmentOptions().
1001+
SetHost(host).
1002+
SetPort(portInt).
1003+
SetPassword(password).
1004+
SetUser(username).
1005+
SetAddressResolver(stream.AddressResolver{
1006+
Host: host,
1007+
Port: portInt,
1008+
}))
1009+
if err == nil {
1010+
fmt.Println("connected to stream endpoint")
1011+
return nil
1012+
} else {
1013+
fmt.Printf("failed to connect to stream endpoint (%s:%d) due to %g\n", host, portInt, err)
1014+
}
1015+
return err
1016+
}, portReadinessTimeout*5, portReadinessTimeout).ShouldNot(HaveOccurred())
9611017

9621018
const streamName = "system-test-stream"
9631019
Expect(env.DeclareStream(

0 commit comments

Comments
 (0)