Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 17 additions & 9 deletions system_tests/system_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,8 @@ var _ = Describe("Operator", func() {
Eventually(func() bool {
Expect(rmqClusterClient.Get(ctx, types.NamespacedName{Name: cluster.Name, Namespace: cluster.Namespace}, fetchedRmq)).To(Succeed())
return fetchedRmq.Status.ObservedGeneration == fetchedRmq.Generation
}, 30).Should(BeTrue())
}, k8sQueryTimeout, 10).Should(BeTrue(), fmt.Sprintf("expected %d (Status.ObservedGeneration) = %d (Generation)",
fetchedRmq.Status.ObservedGeneration, fetchedRmq.Generation))
})

By("having all feature flags enabled", func() {
Expand Down Expand Up @@ -164,7 +165,7 @@ var _ = Describe("Operator", func() {
Expect(err).ToNot(HaveOccurred())
return configMap.Annotations
}
Eventually(getConfigMapAnnotations, 30, 1).Should(
Eventually(getConfigMapAnnotations, k8sQueryTimeout, 1).Should(
HaveKey("rabbitmq.com/pluginsUpdatedAt"), "plugins ConfigMap should have been annotated")
Eventually(getConfigMapAnnotations, 4*time.Minute, 1).Should(
Not(HaveKey("rabbitmq.com/pluginsUpdatedAt")), "plugins ConfigMap annotation should have been removed")
Expand Down Expand Up @@ -345,8 +346,10 @@ CONSOLE_LOG=new`
pvcName := cluster.PVCName(0)
pvc, err := clientSet.CoreV1().PersistentVolumeClaims(namespace).Get(ctx, pvcName, metav1.GetOptions{})
Expect(err).ToNot(HaveOccurred())
fmt.Printf("Retrieved PVC %s with conditions %+v\n", pvcName, pvc.Status.Conditions)

return pvc.Spec.Resources.Requests["storage"]
}, "5m", 10).Should(Equal(newCapacity))
}, "10m", 10).Should(Equal(newCapacity))

// storage capacity reflected in the pod
Eventually(func() int {
Expand Down Expand Up @@ -545,7 +548,7 @@ CONSOLE_LOG=new`
})
})

When("(web) MQTT, STOMP, and stream plugins are enabled", func() {
When("(web) MQTT, STOMP and stream are enabled", func() {
var (
cluster *rabbitmqv1beta1.RabbitmqCluster
hostname string
Expand All @@ -565,6 +568,8 @@ CONSOLE_LOG=new`
}
Expect(createRabbitmqCluster(ctx, rmqClusterClient, cluster)).To(Succeed())
waitForRabbitmqRunning(cluster)
waitForPortReadiness(cluster, 1883) // mqtt
waitForPortReadiness(cluster, 61613) // stomp

hostname = kubernetesNodeIp(ctx, clientSet)
var err error
Expand All @@ -586,13 +591,16 @@ CONSOLE_LOG=new`
By("STOMP")
publishAndConsumeSTOMPMsg(hostname, rabbitmqNodePort(ctx, clientSet, cluster, "stomp"), username, password, nil)

// github.com/go-stomp/stomp does not support STOMP-over-WebSockets

By("stream")
if strings.Contains(cluster.Spec.Image, ":3.8") || strings.HasSuffix(cluster.Spec.Image, "tanzu-rabbitmq:1") {
By("Streams")
if !hasFeatureEnabled(cluster, "stream_queue") {
Skip("rabbitmq_stream plugin is not supported by RabbitMQ image " + cluster.Spec.Image)
} else {
waitForPortConnectivity(cluster)
waitForPortReadiness(cluster, 5552) // stream
publishAndConsumeStreamMsg(hostname, rabbitmqNodePort(ctx, clientSet, cluster, "stream"), username, password)
}
publishAndConsumeStreamMsg(hostname, rabbitmqNodePort(ctx, clientSet, cluster, "stream"), username, password)
})

})

})
80 changes: 68 additions & 12 deletions system_tests/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,8 @@ import (
)

const podCreationTimeout = 10 * time.Minute
const portReadinessTimeout = 1 * time.Minute
const k8sQueryTimeout = 1 * time.Minute

type featureFlag struct {
Name string
Expand Down Expand Up @@ -593,6 +595,48 @@ func waitForRabbitmqRunningWithOffset(cluster *rabbitmqv1beta1.RabbitmqCluster,
ExpectWithOffset(callStackOffset, err).NotTo(HaveOccurred())
}

func waitForPortConnectivity(cluster *rabbitmqv1beta1.RabbitmqCluster) {
waitForPortConnectivityWithOffset(cluster, 2)
}
func waitForPortConnectivityWithOffset(cluster *rabbitmqv1beta1.RabbitmqCluster, callStackOffset int) {
EventuallyWithOffset(callStackOffset, func() error {
_, err := kubectlExec(cluster.Namespace, statefulSetPodName(cluster, 0), "rabbitmq",
"rabbitmq-diagnostics", "check_port_connectivity")
return err
}, portReadinessTimeout, 3).Should(Not(HaveOccurred()))
}

func waitForPortReadiness(cluster *rabbitmqv1beta1.RabbitmqCluster, port int) {
waitForPortReadinessWithOffset(cluster, port, 2)
}
func waitForPortReadinessWithOffset(cluster *rabbitmqv1beta1.RabbitmqCluster, port int, callStackOffset int) {
EventuallyWithOffset(callStackOffset, func() error {
_, err := kubectlExec(cluster.Namespace, statefulSetPodName(cluster, 0), "rabbitmq",
"rabbitmq-diagnostics", "check_port_listener", strconv.Itoa(port))
return err
}, portReadinessTimeout, 3).Should(Not(HaveOccurred()))
}

func hasFeatureEnabled(cluster *rabbitmqv1beta1.RabbitmqCluster, featureFlagName string) bool {
output, err := kubectlExec(cluster.Namespace,
statefulSetPodName(cluster, 0),
"rabbitmq",
"rabbitmqctl",
"list_feature_flags",
"--formatter=json",
)
Expect(err).NotTo(HaveOccurred())
var flags []featureFlag
Expect(json.Unmarshal(output, &flags)).To(Succeed())

for _, v := range flags {
if v.Name == featureFlagName && v.State == "enabled" {
return true
}
}
return false
}

// asserts an event with reason: "TLSError", occurs for the cluster in it's namespace
func assertTLSError(cluster *rabbitmqv1beta1.RabbitmqCluster) {
var err error
Expand Down Expand Up @@ -839,15 +883,17 @@ func publishAndConsumeMQTTMsg(hostname, port, username, password string, overWeb
EventuallyWithOffset(1, func() bool {
token = c.Connect()
// Waits for the network request to reach the destination and receive a response
if !token.WaitTimeout(3 * time.Second) {
if !token.WaitTimeout(30 * time.Second) {
fmt.Printf("Timed out\n")
return false
}

if err := token.Error(); err == nil {
fmt.Printf("Connected !\n")
return true
}
return false
}, 30, 2).Should(BeTrue(), "Expected to be able to connect to MQTT port")
}, 30, 20).Should(BeTrue(), "Expected to be able to connect to MQTT port")

topic := "tests/mqtt"
msgReceived := false
Expand Down Expand Up @@ -948,16 +994,26 @@ func publishAndConsumeStreamMsg(host, port, username, password string) {
portInt, err := strconv.Atoi(port)
Expect(err).ToNot(HaveOccurred())

env, err := stream.NewEnvironment(stream.NewEnvironmentOptions().
SetHost(host).
SetPort(portInt).
SetPassword(password).
SetUser(username).
SetAddressResolver(stream.AddressResolver{
Host: host,
Port: portInt,
}))
Expect(err).ToNot(HaveOccurred())
var env *stream.Environment
Eventually(func() error {
fmt.Println("connecting to stream endpoint ...")
env, err = stream.NewEnvironment(stream.NewEnvironmentOptions().
SetHost(host).
SetPort(portInt).
SetPassword(password).
SetUser(username).
SetAddressResolver(stream.AddressResolver{
Host: host,
Port: portInt,
}))
if err == nil {
fmt.Println("connected to stream endpoint")
return nil
} else {
fmt.Printf("failed to connect to stream endpoint (%s:%d) due to %g\n", host, portInt, err)
}
return err
}, portReadinessTimeout*5, portReadinessTimeout).ShouldNot(HaveOccurred())

const streamName = "system-test-stream"
Expect(env.DeclareStream(
Expand Down