Skip to content

Commit 73fac8e

Browse files
Retry stream connection
Even though RabbitMQ diagnostics tool says that the stream port is ready to accept connections, the fact to the matter is that the stream client cannot connect on the first attempt. The stream client connects via the k8s nodePort not directly to the stream port.
1 parent 9b857b9 commit 73fac8e

File tree

2 files changed

+34
-13
lines changed

2 files changed

+34
-13
lines changed

system_tests/system_test.go

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -623,11 +623,12 @@ CONSOLE_LOG=new`
623623
Expect(rmqClusterClient.Delete(context.TODO(), cluster)).To(Succeed())
624624
})
625625

626-
It("publishes and consumes a message", func() {
626+
FIt("publishes and consumes a message", func() {
627627
if !hasFeatureEnabled(cluster, "stream_queue") {
628628
Skip("rabbitmq_stream plugin is not supported by RabbitMQ image " + cluster.Spec.Image)
629629
}else {
630-
fmt.Printf("Stream feature is enabled ")
630+
fmt.Println("Stream feature is enabled ")
631+
waitForPortConnectivity(cluster)
631632
waitForPortReadiness(cluster, 5552) // stream
632633
publishAndConsumeStreamMsg(hostname, rabbitmqNodePort(ctx, clientSet, cluster, "stream"), username, password)
633634
}

system_tests/utils.go

Lines changed: 31 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -597,6 +597,17 @@ func waitForRabbitmqRunningWithOffset(cluster *rabbitmqv1beta1.RabbitmqCluster,
597597
ExpectWithOffset(callStackOffset, err).NotTo(HaveOccurred())
598598
}
599599

600+
func waitForPortConnectivity(cluster *rabbitmqv1beta1.RabbitmqCluster) {
601+
waitForPortConnectivityWithOffset(cluster, 2)
602+
}
603+
func waitForPortConnectivityWithOffset(cluster *rabbitmqv1beta1.RabbitmqCluster, callStackOffset int) {
604+
EventuallyWithOffset(callStackOffset, func() error {
605+
_, err := kubectlExec(cluster.Namespace, statefulSetPodName(cluster, 0), "rabbitmq",
606+
"rabbitmq-diagnostics", "check_port_connectivity")
607+
return err
608+
}, portReadinessTimeout, 3).Should(Not(HaveOccurred()))
609+
}
610+
600611
func waitForPortReadiness(cluster *rabbitmqv1beta1.RabbitmqCluster, port int) {
601612
waitForPortReadinessWithOffset(cluster, port, 2)
602613
}
@@ -872,8 +883,6 @@ func publishAndConsumeMQTTMsg(hostname, port, username, password string, overWeb
872883

873884
var token mqtt.Token
874885
EventuallyWithOffset(1, func() bool {
875-
fmt.Printf("Attempt to connect using MQTT to url %s ( %+v\n )", url, opts)
876-
877886
token = c.Connect()
878887
// Waits for the network request to reach the destination and receive a response
879888
if !token.WaitTimeout(30 * time.Second) {
@@ -987,15 +996,26 @@ func publishAndConsumeStreamMsg(host, port, username, password string) {
987996
portInt, err := strconv.Atoi(port)
988997
Expect(err).ToNot(HaveOccurred())
989998

990-
env, err := stream.NewEnvironment(stream.NewEnvironmentOptions().
991-
SetHost(host).
992-
SetPort(portInt).
993-
SetPassword(password).
994-
SetUser(username).
995-
SetAddressResolver(stream.AddressResolver{
996-
Host: host,
997-
Port: portInt,
998-
}))
999+
var env *stream.Environment
1000+
for retry := 0; retry < 5; retry++ {
1001+
fmt.Println("connecting to stream endpoint ...")
1002+
env, err = stream.NewEnvironment(stream.NewEnvironmentOptions().
1003+
SetHost(host).
1004+
SetPort(portInt).
1005+
SetPassword(password).
1006+
SetUser(username).
1007+
SetAddressResolver(stream.AddressResolver{
1008+
Host: host,
1009+
Port: portInt,
1010+
}))
1011+
if err == nil {
1012+
fmt.Println("connected to stream endpoint")
1013+
break
1014+
}else {
1015+
fmt.Errorf("failed to connect to stream endpoint (%s:%d) due to %g\n", host, portInt, err)
1016+
}
1017+
time.Sleep(portReadinessTimeout)
1018+
}
9991019
Expect(err).ToNot(HaveOccurred())
10001020

10011021
const streamName = "system-test-stream"

0 commit comments

Comments
 (0)