@@ -16,7 +16,6 @@ import (
1616 "crypto/x509"
1717 "encoding/json"
1818 "fmt"
19- "gopkg.in/ini.v1"
2019 "io"
2120 "io/ioutil"
2221 "log"
@@ -28,6 +27,8 @@ import (
2827 "strings"
2928 "time"
3029
30+ "gopkg.in/ini.v1"
31+
3132 k8sresource "k8s.io/apimachinery/pkg/api/resource"
3233
3334 mgmtApi "github.com/michaelklishin/rabbit-hole/v2"
@@ -41,6 +42,8 @@ import (
4142 mqtt "github.com/eclipse/paho.mqtt.golang"
4243 "github.com/go-stomp/stomp"
4344 rabbitmqv1beta1 "github.com/rabbitmq/cluster-operator/api/v1beta1"
45+ streamamqp "github.com/rabbitmq/rabbitmq-stream-go-client/pkg/amqp"
46+ "github.com/rabbitmq/rabbitmq-stream-go-client/pkg/streaming"
4447 "github.com/streadway/amqp"
4548 corev1 "k8s.io/api/core/v1"
4649 metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
@@ -821,10 +824,10 @@ func createCertificateChain(hostname string, caCertWriter, certWriter, keyWriter
821824 return caCert , caKey , nil
822825}
823826
824- func publishAndConsumeMQTTMsg (hostname , nodePort , username , password string , overWebSocket bool , tlsConfig * tls.Config ) {
825- url := fmt .Sprintf ("tcp://%s:%s" , hostname , nodePort )
827+ func publishAndConsumeMQTTMsg (hostname , port , username , password string , overWebSocket bool , tlsConfig * tls.Config ) {
828+ url := fmt .Sprintf ("tcp://%s:%s" , hostname , port )
826829 if overWebSocket {
827- url = fmt .Sprintf ("ws://%s:%s/ws" , hostname , nodePort )
830+ url = fmt .Sprintf ("ws://%s:%s/ws" , hostname , port )
828831 }
829832 opts := mqtt .NewClientOptions ().
830833 AddBroker (url ).
@@ -834,7 +837,7 @@ func publishAndConsumeMQTTMsg(hostname, nodePort, username, password string, ove
834837 SetProtocolVersion (4 ) // RabbitMQ MQTT plugin targets MQTT 3.1.1
835838
836839 if tlsConfig != nil {
837- url = fmt .Sprintf ("ssl://%s:%s" , hostname , nodePort )
840+ url = fmt .Sprintf ("ssl://%s:%s" , hostname , port )
838841 opts = opts .
839842 AddBroker (url ).
840843 SetTLSConfig (tlsConfig )
@@ -885,13 +888,13 @@ func publishAndConsumeMQTTMsg(hostname, nodePort, username, password string, ove
885888 c .Disconnect (250 )
886889}
887890
888- func publishAndConsumeSTOMPMsg (hostname , stompNodePort , username , password string , tlsConfig * tls.Config ) {
891+ func publishAndConsumeSTOMPMsg (hostname , port , username , password string , tlsConfig * tls.Config ) {
889892 var conn * stomp.Conn
890893 var err error
891894
892895 // Create a secure tls.Conn and pass to stomp.Connect, otherwise use Stomp.Dial
893896 if tlsConfig != nil {
894- secureConn , err := tls .Dial ("tcp" , fmt .Sprintf ("%s:%s" , hostname , stompNodePort ), tlsConfig )
897+ secureConn , err := tls .Dial ("tcp" , fmt .Sprintf ("%s:%s" , hostname , port ), tlsConfig )
895898 ExpectWithOffset (1 , err ).NotTo (HaveOccurred ())
896899 defer secureConn .Close ()
897900
@@ -913,7 +916,7 @@ func publishAndConsumeSTOMPMsg(hostname, stompNodePort, username, password strin
913916 for retry := 0 ; retry < 5 ; retry ++ {
914917 fmt .Printf ("Attempt #%d to connect using STOMP\n " , retry )
915918 conn , err = stomp .Dial ("tcp" ,
916- fmt .Sprintf ("%s:%s" , hostname , stompNodePort ),
919+ fmt .Sprintf ("%s:%s" , hostname , port ),
917920 stomp .ConnOpt .Login (username , password ),
918921 stomp .ConnOpt .AcceptVersion (stomp .V12 ),
919922 stomp .ConnOpt .Host ("/" ),
@@ -951,6 +954,42 @@ func publishAndConsumeSTOMPMsg(hostname, stompNodePort, username, password strin
951954 ExpectWithOffset (1 , conn .Disconnect ()).To (Succeed ())
952955}
953956
957+ func publishAndConsumeStreamMsg (ctx context.Context , hostname , port , username , password string ) {
958+ uri := fmt .Sprintf ("rabbitmq-streaming://%s:%s@%s:%s/%%2f" , username , password , hostname , port )
959+ client , err := streaming .NewClientCreator ().Uri (uri ).Connect ()
960+ Expect (err ).ToNot (HaveOccurred ())
961+
962+ streamName := "system-test-stream"
963+ Expect (client .StreamCreator ().Stream (streamName ).Create ()).To (Succeed ())
964+
965+ var msgReceived []byte
966+ consumer , err := client .ConsumerCreator ().
967+ Stream (streamName ).
968+ Name ("system-test-consumer" ).
969+ MessagesHandler (func (context streaming.ConsumerContext , message * streamamqp.Message ) {
970+ Expect (message .Data ).To (HaveLen (1 ))
971+ msgReceived = message .Data [0 ]
972+ }).Build ()
973+ Expect (err ).ToNot (HaveOccurred ())
974+
975+ msgSent := []byte ("test message stream" )
976+ producer , err := client .ProducerCreator ().Stream (streamName ).Build ()
977+ Expect (err ).ToNot (HaveOccurred ())
978+ _ , err = producer .BatchPublish (ctx , []* streamamqp.Message {
979+ streamamqp .NewMessage (msgSent )},
980+ )
981+ Expect (err ).ToNot (HaveOccurred ())
982+
983+ Eventually (func () []byte {
984+ return msgReceived
985+ }, 5 * time .Second ).Should (Equal (msgSent ), "consumer should receive message sent by producer" )
986+
987+ Expect (producer .Close ()).To (Succeed ())
988+ Expect (consumer .UnSubscribe ()).To (Succeed ())
989+ Expect (client .DeleteStream (streamName )).To (Succeed ())
990+ Expect (client .Close ()).To (Succeed ())
991+ }
992+
954993func pod (ctx context.Context , clientSet * kubernetes.Clientset , r * rabbitmqv1beta1.RabbitmqCluster , i int ) * corev1.Pod {
955994 podName := statefulSetPodName (r , i )
956995 var pod * corev1.Pod
0 commit comments