Skip to content
Merged
Show file tree
Hide file tree
Changes from 10 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
56 changes: 48 additions & 8 deletions system_tests/system_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,8 @@ var _ = Describe("Operator", func() {
Eventually(func() bool {
Expect(rmqClusterClient.Get(ctx, types.NamespacedName{Name: cluster.Name, Namespace: cluster.Namespace}, fetchedRmq)).To(Succeed())
return fetchedRmq.Status.ObservedGeneration == fetchedRmq.Generation
}, 30).Should(BeTrue())
}, k8sQueryTimeout, 10).Should(BeTrue(), fmt.Sprintf("expected %d (Status.ObservedGeneration) = %d (Generation)",
fetchedRmq.Status.ObservedGeneration, fetchedRmq.Generation))
})

By("having all feature flags enabled", func() {
Expand Down Expand Up @@ -164,7 +165,7 @@ var _ = Describe("Operator", func() {
Expect(err).ToNot(HaveOccurred())
return configMap.Annotations
}
Eventually(getConfigMapAnnotations, 30, 1).Should(
Eventually(getConfigMapAnnotations, k8sQueryTimeout, 1).Should(
HaveKey("rabbitmq.com/pluginsUpdatedAt"), "plugins ConfigMap should have been annotated")
Eventually(getConfigMapAnnotations, 4*time.Minute, 1).Should(
Not(HaveKey("rabbitmq.com/pluginsUpdatedAt")), "plugins ConfigMap annotation should have been removed")
Expand Down Expand Up @@ -345,8 +346,10 @@ CONSOLE_LOG=new`
pvcName := cluster.PVCName(0)
pvc, err := clientSet.CoreV1().PersistentVolumeClaims(namespace).Get(ctx, pvcName, metav1.GetOptions{})
Expect(err).ToNot(HaveOccurred())
fmt.Printf("Retrieved PVC %s with conditions %+v\n", pvcName, pvc.Status.Conditions)

return pvc.Spec.Resources.Requests["storage"]
}, "5m", 10).Should(Equal(newCapacity))
}, "10m", 10).Should(Equal(newCapacity))

// storage capacity reflected in the pod
Eventually(func() int {
Expand Down Expand Up @@ -545,7 +548,7 @@ CONSOLE_LOG=new`
})
})

When("(web) MQTT, STOMP, and stream plugins are enabled", func() {
When("(web) MQTT, STOMP are enabled", func() {
var (
cluster *rabbitmqv1beta1.RabbitmqCluster
hostname string
Expand All @@ -565,6 +568,8 @@ CONSOLE_LOG=new`
}
Expect(createRabbitmqCluster(ctx, rmqClusterClient, cluster)).To(Succeed())
waitForRabbitmqRunning(cluster)
waitForPortReadiness(cluster, 1883) // mqtt
waitForPortReadiness(cluster, 61613) // stomp

hostname = kubernetesNodeIp(ctx, clientSet)
var err error
Expand All @@ -586,13 +591,48 @@ CONSOLE_LOG=new`
By("STOMP")
publishAndConsumeSTOMPMsg(hostname, rabbitmqNodePort(ctx, clientSet, cluster, "stomp"), username, password, nil)

// github.com/go-stomp/stomp does not support STOMP-over-WebSockets
})

})

When("stream plugin is enabled", func() {
var (
cluster *rabbitmqv1beta1.RabbitmqCluster
hostname string
username string
password string
)

BeforeEach(func() {
instanceName := "stream"
cluster = newRabbitmqCluster(namespace, instanceName)
cluster.Spec.Service.Type = "NodePort"
cluster.Spec.Rabbitmq.AdditionalPlugins = []rabbitmqv1beta1.Plugin{
"rabbitmq_stream",
}
Expect(createRabbitmqCluster(ctx, rmqClusterClient, cluster)).To(Succeed())
waitForRabbitmqRunning(cluster)

hostname = kubernetesNodeIp(ctx, clientSet)
var err error
username, password, err = getUsernameAndPassword(ctx, clientSet, "rabbitmq-system", instanceName)
Expect(err).NotTo(HaveOccurred())
})

AfterEach(func() {
Expect(rmqClusterClient.Delete(context.TODO(), cluster)).To(Succeed())
})

By("stream")
if strings.Contains(cluster.Spec.Image, ":3.8") || strings.HasSuffix(cluster.Spec.Image, "tanzu-rabbitmq:1") {
It("publishes and consumes a message", func() {
if !hasFeatureEnabled(cluster, "stream_queue") {
Skip("rabbitmq_stream plugin is not supported by RabbitMQ image " + cluster.Spec.Image)
}else {
fmt.Println("Stream feature is enabled ")
waitForPortConnectivity(cluster)
waitForPortReadiness(cluster, 5552) // stream
publishAndConsumeStreamMsg(hostname, rabbitmqNodePort(ctx, clientSet, cluster, "stream"), username, password)
}
publishAndConsumeStreamMsg(hostname, rabbitmqNodePort(ctx, clientSet, cluster, "stream"), username, password)
})

})
})
82 changes: 70 additions & 12 deletions system_tests/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,12 +59,16 @@ import (
)

const podCreationTimeout = 10 * time.Minute
const portReadinessTimeout = 1 * time.Minute
const k8sQueryTimeout = 1 * time.Minute

type featureFlag struct {
Name string
State string
}



func MustHaveEnv(name string) string {
value := os.Getenv(name)
if value == "" {
Expand Down Expand Up @@ -593,6 +597,48 @@ func waitForRabbitmqRunningWithOffset(cluster *rabbitmqv1beta1.RabbitmqCluster,
ExpectWithOffset(callStackOffset, err).NotTo(HaveOccurred())
}

func waitForPortConnectivity(cluster *rabbitmqv1beta1.RabbitmqCluster) {
waitForPortConnectivityWithOffset(cluster, 2)
}
func waitForPortConnectivityWithOffset(cluster *rabbitmqv1beta1.RabbitmqCluster, callStackOffset int) {
EventuallyWithOffset(callStackOffset, func() error {
_, err := kubectlExec(cluster.Namespace, statefulSetPodName(cluster, 0), "rabbitmq",
"rabbitmq-diagnostics", "check_port_connectivity")
return err
}, portReadinessTimeout, 3).Should(Not(HaveOccurred()))
}

func waitForPortReadiness(cluster *rabbitmqv1beta1.RabbitmqCluster, port int) {
waitForPortReadinessWithOffset(cluster, port, 2)
}
func waitForPortReadinessWithOffset(cluster *rabbitmqv1beta1.RabbitmqCluster, port int, callStackOffset int) {
EventuallyWithOffset(callStackOffset, func() error {
_, err := kubectlExec(cluster.Namespace, statefulSetPodName(cluster, 0), "rabbitmq",
"rabbitmq-diagnostics", "check_port_listener", strconv.Itoa(port))
return err
}, portReadinessTimeout, 3).Should(Not(HaveOccurred()))
}

func hasFeatureEnabled(cluster *rabbitmqv1beta1.RabbitmqCluster, featureFlagName string) bool {
output, err := kubectlExec(cluster.Namespace,
statefulSetPodName(cluster, 0),
"rabbitmq",
"rabbitmqctl",
"list_feature_flags",
"--formatter=json",
)
Expect(err).NotTo(HaveOccurred())
var flags []featureFlag
Expect(json.Unmarshal(output, &flags)).To(Succeed())

for _, v := range flags {
if v.Name == featureFlagName && v.State == "enabled" {
return true
}
}
return false
}

// asserts an event with reason: "TLSError", occurs for the cluster in it's namespace
func assertTLSError(cluster *rabbitmqv1beta1.RabbitmqCluster) {
var err error
Expand Down Expand Up @@ -839,15 +885,17 @@ func publishAndConsumeMQTTMsg(hostname, port, username, password string, overWeb
EventuallyWithOffset(1, func() bool {
token = c.Connect()
// Waits for the network request to reach the destination and receive a response
if !token.WaitTimeout(3 * time.Second) {
if !token.WaitTimeout(30 * time.Second) {
fmt.Printf("Timed out\n")
return false
}

if err := token.Error(); err == nil {
fmt.Printf("Connected !\n")
return true
}
return false
}, 30, 2).Should(BeTrue(), "Expected to be able to connect to MQTT port")
}, 30, 20).Should(BeTrue(), "Expected to be able to connect to MQTT port")

topic := "tests/mqtt"
msgReceived := false
Expand Down Expand Up @@ -948,16 +996,26 @@ func publishAndConsumeStreamMsg(host, port, username, password string) {
portInt, err := strconv.Atoi(port)
Expect(err).ToNot(HaveOccurred())

env, err := stream.NewEnvironment(stream.NewEnvironmentOptions().
SetHost(host).
SetPort(portInt).
SetPassword(password).
SetUser(username).
SetAddressResolver(stream.AddressResolver{
Host: host,
Port: portInt,
}))
Expect(err).ToNot(HaveOccurred())
var env *stream.Environment
Eventually(func() error{
fmt.Println("connecting to stream endpoint ...")
env, err = stream.NewEnvironment(stream.NewEnvironmentOptions().
SetHost(host).
SetPort(portInt).
SetPassword(password).
SetUser(username).
SetAddressResolver(stream.AddressResolver{
Host: host,
Port: portInt,
}))
if err == nil {
fmt.Println("connected to stream endpoint")
return nil
}else {
fmt.Errorf("failed to connect to stream endpoint (%s:%d) due to %g\n", host, portInt, err)
}
return err
}, portReadinessTimeout*5, portReadinessTimeout).ShouldNot(HaveOccurred())

const streamName = "system-test-stream"
Expect(env.DeclareStream(
Expand Down