Skip to content

Commit 17550dc

Browse files
committed
Open stream ports when osr plugin is enabled
- the osr plugin rabbitmq_multi_dc_replication enables stream plugin as a dependendy and connect to stream port
1 parent 4b0f638 commit 17550dc

File tree

5 files changed

+45
-7
lines changed

5 files changed

+45
-7
lines changed

api/v1beta1/rabbitmqcluster_types.go

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -351,6 +351,11 @@ func (cluster *RabbitmqCluster) AdditionalPluginEnabled(plugin Plugin) bool {
351351
return false
352352
}
353353

354+
// the OSR plugin `rabbitmq_multi_dc_replication` enables `rabbitmq_stream` as a dependency
355+
func (cluster *RabbitmqCluster) StreamNeeded() bool {
356+
return cluster.AdditionalPluginEnabled("rabbitmq_stream") || cluster.AdditionalPluginEnabled("rabbitmq_multi_dc_replication")
357+
}
358+
354359
// +kubebuilder:object:root=true
355360

356361
// RabbitmqClusterList contains a list of RabbitmqClusters.

internal/resource/service.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -149,7 +149,7 @@ func (builder *ServiceBuilder) generateServicePortsMapOnlyTLSListeners() map[str
149149
}
150150
}
151151

152-
if builder.Instance.AdditionalPluginEnabled("rabbitmq_stream") {
152+
if builder.Instance.StreamNeeded() {
153153
servicePortsMap["streams"] = corev1.ServicePort{
154154
Protocol: corev1.ProtocolTCP,
155155
Port: 5551,
@@ -232,7 +232,7 @@ func (builder *ServiceBuilder) generateServicePortsMap() map[string]corev1.Servi
232232
}
233233
}
234234

235-
if builder.Instance.AdditionalPluginEnabled("rabbitmq_stream") {
235+
if builder.Instance.StreamNeeded() {
236236
servicePortsMap["stream"] = corev1.ServicePort{
237237
Protocol: corev1.ProtocolTCP,
238238
Port: 5552,
@@ -270,7 +270,7 @@ func (builder *ServiceBuilder) generateServicePortsMap() map[string]corev1.Servi
270270
TargetPort: intstr.FromInt(8883),
271271
}
272272
}
273-
if builder.Instance.AdditionalPluginEnabled("rabbitmq_stream") {
273+
if builder.Instance.StreamNeeded() {
274274
servicePortsMap["streams"] = corev1.ServicePort{
275275
Protocol: corev1.ProtocolTCP,
276276
Port: 5551,

internal/resource/service_test.go

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -169,6 +169,21 @@ var _ = Context("Services", func() {
169169
})
170170
})
171171

172+
When("rabbitmq_multi_dc_replication is enabled", func() {
173+
It("opens port for streams", func() {
174+
instance.Spec.Rabbitmq.AdditionalPlugins = []rabbitmqv1beta1.Plugin{"rabbitmq_multi_dc_replication"}
175+
Expect(serviceBuilder.Update(svc)).To(Succeed())
176+
Expect(svc.Spec.Ports).To(ContainElements([]corev1.ServicePort{
177+
{
178+
Name: "streams",
179+
Protocol: corev1.ProtocolTCP,
180+
Port: 5551,
181+
TargetPort: intstr.FromInt(5551),
182+
},
183+
}))
184+
})
185+
})
186+
172187
When("DisableNonTLSListeners is set to true", func() {
173188
It("only exposes tls ports in the service", func() {
174189
instance.Spec.TLS.DisableNonTLSListeners = true
@@ -234,6 +249,7 @@ var _ = Context("Services", func() {
234249
Entry("STOMP", "rabbitmq_stomp", "stomps", 61614),
235250
Entry("STOMP-over-WebSockets", "rabbitmq_web_stomp", "web-stomp-tls", 15673),
236251
Entry("Stream", "rabbitmq_stream", "streams", 5551),
252+
Entry("OSR", "rabbitmq_multi_dc_replication", "streams", 5551),
237253
)
238254
})
239255
})
@@ -466,6 +482,7 @@ var _ = Context("Services", func() {
466482
Entry("STOMP", "rabbitmq_stomp", "stomp", 61613),
467483
Entry("STOMP-over-WebSockets", "rabbitmq_web_stomp", "web-stomp", 15674),
468484
Entry("Stream", "rabbitmq_stream", "stream", 5552),
485+
Entry("OSR", "rabbitmq_multi_dc_replication", "stream", 5552),
469486
)
470487

471488
It("updates the service type from ClusterIP to NodePort", func() {

internal/resource/statefulset.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -762,7 +762,7 @@ func (builder *StatefulSetBuilder) updateContainerPorts() []corev1.ContainerPort
762762
})
763763
}
764764

765-
if builder.Instance.AdditionalPluginEnabled("rabbitmq_stream") {
765+
if builder.Instance.StreamNeeded() {
766766
ports = append(ports, corev1.ContainerPort{
767767
Name: "stream",
768768
ContainerPort: 5552,
@@ -799,7 +799,7 @@ func (builder *StatefulSetBuilder) updateContainerPorts() []corev1.ContainerPort
799799
})
800800
}
801801

802-
if builder.Instance.AdditionalPluginEnabled("rabbitmq_stream") {
802+
if builder.Instance.StreamNeeded() {
803803
ports = append(ports, corev1.ContainerPort{
804804
Name: "streams",
805805
ContainerPort: 5551,
@@ -860,7 +860,7 @@ func (builder *StatefulSetBuilder) updateContainerPortsOnlyTLSListeners() []core
860860
})
861861
}
862862

863-
if builder.Instance.AdditionalPluginEnabled("rabbitmq_stream") {
863+
if builder.Instance.StreamNeeded() {
864864
ports = append(ports, corev1.ContainerPort{
865865
Name: "streams",
866866
ContainerPort: 5551,

internal/resource/statefulset_test.go

Lines changed: 17 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -599,6 +599,21 @@ var _ = Describe("StatefulSet", func() {
599599
}))
600600
})
601601

602+
It("opens tls port for stream when rabbitmq_multi_dc_replication is enabled", func() {
603+
instance.Spec.TLS.SecretName = "tls-secret"
604+
instance.Spec.Rabbitmq.AdditionalPlugins = []rabbitmqv1beta1.Plugin{"rabbitmq_multi_dc_replication"}
605+
Expect(stsBuilder.Update(statefulSet)).To(Succeed())
606+
607+
rabbitmqContainerSpec := extractContainer(statefulSet.Spec.Template.Spec.Containers, "rabbitmq")
608+
609+
Expect(rabbitmqContainerSpec.Ports).To(ContainElements([]corev1.ContainerPort{
610+
{
611+
Name: "streams",
612+
ContainerPort: 5551,
613+
},
614+
}))
615+
})
616+
602617
When("Mutual TLS (same secret) is enabled", func() {
603618
It("opens tls ports when rabbitmq_web_mqtt and rabbitmq_web_stomp are configured", func() {
604619
instance.Spec.TLS.SecretName = "tls-secret"
@@ -685,7 +700,7 @@ var _ = Describe("StatefulSet", func() {
685700
})
686701

687702
It("disables non tls ports for mqtt, stomp and stream if enabled", func() {
688-
instance.Spec.Rabbitmq.AdditionalPlugins = []rabbitmqv1beta1.Plugin{"rabbitmq_mqtt", "rabbitmq_stomp", "rabbitmq_stream"}
703+
instance.Spec.Rabbitmq.AdditionalPlugins = []rabbitmqv1beta1.Plugin{"rabbitmq_mqtt", "rabbitmq_stomp", "rabbitmq_stream", "rabbitmq_multi_dc_replication"}
689704
Expect(stsBuilder.Update(statefulSet)).To(Succeed())
690705

691706
rabbitmqContainerSpec := extractContainer(statefulSet.Spec.Template.Spec.Containers, "rabbitmq")
@@ -808,6 +823,7 @@ var _ = Describe("StatefulSet", func() {
808823
Entry("STOMP", "rabbitmq_stomp", "stomp", 61613),
809824
Entry("STOMP-over-WebSockets", "rabbitmq_web_stomp", "web-stomp", 15674),
810825
Entry("Stream", "rabbitmq_stream", "stream", 5552),
826+
Entry("OSR", "rabbitmq_multi_dc_replication", "stream", 5552),
811827
)
812828

813829
It("uses required Environment Variables", func() {

0 commit comments

Comments
 (0)