From 7f129952390d63bd1e6ac25615feb46de0cc12cc Mon Sep 17 00:00:00 2001 From: David Ansari Date: Mon, 10 May 2021 13:39:10 +0200 Subject: [PATCH 1/2] Use classic peer discovery instead of rabbit_peer_discovery_k8s plugin. For RabbitMQ clusters deployed by the RabbitMQ cluster operator, there is no need for dynamic service discovery since cluster members are known at deploy time. By using the classic config, we increase likelihood of nodes discovering peers. In contrast, K8S peer discovery only considers peers that are ready, which might take a long time resulting in more than one node to start a cluster. --- api/v1beta1/rabbitmqcluster_types.go | 2 +- .../bases/rabbitmq.com_rabbitmqclusters.yaml | 2 +- docs/api/rabbitmq.com.ref.asciidoc | 4 +- internal/resource/configmap.go | 14 +++-- internal/resource/configmap_test.go | 52 ++++++++++++++----- internal/resource/headless_service.go | 4 +- internal/resource/rabbitmq_plugins.go | 3 +- internal/resource/rabbitmq_plugins_test.go | 15 ++---- internal/resource/statefulset.go | 3 +- system_tests/system_test.go | 2 - 10 files changed, 62 insertions(+), 39 deletions(-) diff --git a/api/v1beta1/rabbitmqcluster_types.go b/api/v1beta1/rabbitmqcluster_types.go index e4c77b653..be162eb9c 100644 --- a/api/v1beta1/rabbitmqcluster_types.go +++ b/api/v1beta1/rabbitmqcluster_types.go @@ -280,7 +280,7 @@ type Plugin string // RabbitMQ-related configuration. type RabbitmqClusterConfigurationSpec struct { - // List of plugins to enable in addition to essential plugins: rabbitmq_management, rabbitmq_prometheus, and rabbitmq_peer_discovery_k8s. + // List of plugins to enable in addition to essential plugins: rabbitmq_management, and rabbitmq_prometheus. // +kubebuilder:validation:MaxItems:=100 AdditionalPlugins []Plugin `json:"additionalPlugins,omitempty"` // Modify to add to the rabbitmq.conf file in addition to default configurations set by the operator. 
diff --git a/config/crd/bases/rabbitmq.com_rabbitmqclusters.yaml b/config/crd/bases/rabbitmq.com_rabbitmqclusters.yaml index 9efb4b22b..9f3a0c380 100644 --- a/config/crd/bases/rabbitmq.com_rabbitmqclusters.yaml +++ b/config/crd/bases/rabbitmq.com_rabbitmqclusters.yaml @@ -3496,7 +3496,7 @@ spec: maxLength: 2000 type: string additionalPlugins: - description: 'List of plugins to enable in addition to essential plugins: rabbitmq_management, rabbitmq_prometheus, and rabbitmq_peer_discovery_k8s.' + description: 'List of plugins to enable in addition to essential plugins: rabbitmq_management, and rabbitmq_prometheus.' items: description: A Plugin to enable on the RabbitmqCluster. maxLength: 100 diff --git a/docs/api/rabbitmq.com.ref.asciidoc b/docs/api/rabbitmq.com.ref.asciidoc index 5f8d4f64d..31d25f13b 100644 --- a/docs/api/rabbitmq.com.ref.asciidoc +++ b/docs/api/rabbitmq.com.ref.asciidoc @@ -148,7 +148,7 @@ RabbitMQ-related configuration. [cols="25a,75a", options="header"] |=== | Field | Description -| *`additionalPlugins`* __xref:{anchor_prefix}-github-com-rabbitmq-cluster-operator-api-v1beta1-plugin[$$Plugin$$] array__ | List of plugins to enable in addition to essential plugins: rabbitmq_management, rabbitmq_prometheus, and rabbitmq_peer_discovery_k8s. +| *`additionalPlugins`* __xref:{anchor_prefix}-github-com-rabbitmq-cluster-operator-api-v1beta1-plugin[$$Plugin$$] array__ | List of plugins to enable in addition to essential plugins: rabbitmq_management, and rabbitmq_prometheus. | *`additionalConfig`* __string__ | Modify to add to the rabbitmq.conf file in addition to default configurations set by the operator. Modifying this property on an existing RabbitmqCluster will trigger a StatefulSet rolling restart and will cause rabbitmq downtime. For more information on this config, see https://www.rabbitmq.com/configure.html#config-file | *`advancedConfig`* __string__ | Specify any rabbitmq advanced.config configurations to apply to the cluster. 
For more information on advanced config, see https://www.rabbitmq.com/configure.html#advanced-config-file | *`envConfig`* __string__ | Modify to add to the rabbitmq-env.conf file. Modifying this property on an existing RabbitmqCluster will trigger a StatefulSet rolling restart and will cause rabbitmq downtime. For more information on env config, see https://www.rabbitmq.com/man/rabbitmq-env.conf.5.html @@ -298,7 +298,7 @@ Spec is the desired state of the RabbitmqCluster Custom Resource. | Field | Description | *`replicas`* __integer__ | Replicas is the number of nodes in the RabbitMQ cluster. Each node is deployed as a Replica in a StatefulSet. Only 1, 3, 5 replicas clusters are tested. This value should be an odd number to ensure the resultant cluster can establish exactly one quorum of nodes in the event of a fragmenting network partition. | *`image`* __string__ | Image is the name of the RabbitMQ docker image to use for RabbitMQ nodes in the RabbitmqCluster. Must be provided together with ImagePullSecrets in order to use an image in a private registry. -| *`imagePullSecrets`* __link:https://kubernetes.io/docs/reference/generated/kubernetes-api/v1.20/#localobjectreference-v1-core[$$LocalObjectReference$$] array__ | List of Secret resource containing access credentials to the registry for the RabbitMQ image. Required if the docker registry is private. +| *`imagePullSecrets`* __link:https://kubernetes.io/docs/reference/generated/kubernetes-api/v1.20/#localobjectreference-v1-core[$$LocalObjectReference$$]__ | List of Secret resource containing access credentials to the registry for the RabbitMQ image. Required if the docker registry is private. | *`service`* __xref:{anchor_prefix}-github-com-rabbitmq-cluster-operator-api-v1beta1-rabbitmqclusterservicespec[$$RabbitmqClusterServiceSpec$$]__ | The desired state of the Kubernetes Service to create for the cluster. 
| *`persistence`* __xref:{anchor_prefix}-github-com-rabbitmq-cluster-operator-api-v1beta1-rabbitmqclusterpersistencespec[$$RabbitmqClusterPersistenceSpec$$]__ | The desired persistent storage configuration for each Pod in the cluster. | *`resources`* __link:https://kubernetes.io/docs/reference/generated/kubernetes-api/v1.20/#resourcerequirements-v1-core[$$ResourceRequirements$$]__ | The desired compute resource requirements of Pods in the cluster. diff --git a/internal/resource/configmap.go b/internal/resource/configmap.go index 00a2cebb8..7ba9ffe75 100644 --- a/internal/resource/configmap.go +++ b/internal/resource/configmap.go @@ -22,17 +22,16 @@ import ( "github.com/rabbitmq/cluster-operator/internal/metadata" corev1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/utils/pointer" ) const ( ServerConfigMapName = "server-conf" defaultRabbitmqConf = ` -cluster_formation.peer_discovery_backend = rabbit_peer_discovery_k8s -cluster_formation.k8s.host = kubernetes.default -cluster_formation.k8s.address_type = hostname cluster_partition_handling = pause_minority queue_master_locator = min-masters disk_free_limit.absolute = 2GB +cluster_formation.peer_discovery_backend = classic_config cluster_formation.randomized_startup_delay_range.min = 0 cluster_formation.randomized_startup_delay_range.max = 60` @@ -87,6 +86,15 @@ func (builder *ServerConfigMapBuilder) Update(object client.Object) error { } defaultSection := operatorConfiguration.Section("") + var i int32 + for i = 0; i < pointer.Int32PtrDerefOr(builder.Instance.Spec.Replicas, 1); i++ { + if _, err := defaultSection.NewKey( + fmt.Sprintf("cluster_formation.classic_config.nodes.%d", i+1), + fmt.Sprintf("rabbit@%s-%d.%s.%s", builder.Instance.ChildResourceName(stsSuffix), i, builder.Instance.ChildResourceName(headlessServiceSuffix), builder.Instance.Namespace)); err != nil { + return err + } + } + if _, err := defaultSection.NewKey("cluster_name", builder.Instance.Name); err != nil { return 
err } diff --git a/internal/resource/configmap_test.go b/internal/resource/configmap_test.go index aae2a6fda..0b79385ab 100644 --- a/internal/resource/configmap_test.go +++ b/internal/resource/configmap_test.go @@ -23,21 +23,9 @@ import ( metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime" defaultscheme "k8s.io/client-go/kubernetes/scheme" + "k8s.io/utils/pointer" ) -func defaultRabbitmqConf(instanceName string) string { - return iniString(` -cluster_formation.peer_discovery_backend = rabbit_peer_discovery_k8s -cluster_formation.k8s.host = kubernetes.default -cluster_formation.k8s.address_type = hostname -cluster_partition_handling = pause_minority -queue_master_locator = min-masters -disk_free_limit.absolute = 2GB -cluster_formation.randomized_startup_delay_range.min = 0 -cluster_formation.randomized_startup_delay_range.max = 60 -cluster_name = ` + instanceName) -} - var _ = Describe("GenerateServerConfigMap", func() { var ( instance rabbitmqv1beta1.RabbitmqCluster @@ -139,7 +127,17 @@ var _ = Describe("GenerateServerConfigMap", func() { It("returns the default rabbitmq configuration", func() { builder.Instance.Spec.Rabbitmq.AdditionalConfig = "" - expectedConfiguration := defaultRabbitmqConf(builder.Instance.Name) + name := builder.Instance.Name + expectedConfiguration := fmt.Sprintf( + `cluster_partition_handling = pause_minority +queue_master_locator = min-masters +disk_free_limit.absolute = 2GB +cluster_formation.peer_discovery_backend = classic_config +cluster_formation.randomized_startup_delay_range.min = 0 +cluster_formation.randomized_startup_delay_range.max = 60 +cluster_formation.classic_config.nodes.1 = rabbit@%s-server-0.%s-nodes.%s +cluster_name = %s +`, name, name, builder.Instance.Namespace, name) Expect(configMapBuilder.Update(configMap)).To(Succeed()) Expect(configMap.Data).To(HaveKeyWithValue("operatorDefaults.conf", expectedConfiguration)) @@ -543,6 +541,32 @@ CONSOLE_LOG=new` 
Expect(configMapBuilder.Update(configMap)).To(Succeed()) Expect(configMap.Annotations).To(BeEmpty()) }) + + When("multiple replicas", func() { + It("adds nodes for peer discovery", func() { + instance.Spec.Replicas = pointer.Int32Ptr(3) + + Expect(configMapBuilder.Update(configMap)).To(Succeed()) + n := instance.Name + ns := instance.Namespace + Expect(configMap.Data).To(HaveKeyWithValue("operatorDefaults.conf", ContainSubstring( + `cluster_formation.classic_config.nodes.1 = rabbit@%s-server-0.%s-nodes.%s +cluster_formation.classic_config.nodes.2 = rabbit@%s-server-1.%s-nodes.%s +cluster_formation.classic_config.nodes.3 = rabbit@%s-server-2.%s-nodes.%s`, + n, n, ns, n, n, ns, n, n, ns, + ))) + }) + }) + When("no replicas", func() { + It("does not add nodes for peer discovery", func() { + instance.Spec.Replicas = pointer.Int32Ptr(0) + + Expect(configMapBuilder.Update(configMap)).To(Succeed()) + Expect(configMap.Data).To(HaveKeyWithValue("operatorDefaults.conf", + Not(ContainSubstring("cluster_formation.classic_config.nodes")), + )) + }) + }) }) Context("UpdateMayRequireStsRecreate", func() { diff --git a/internal/resource/headless_service.go b/internal/resource/headless_service.go index 1cbdc1abc..9e8832808 100644 --- a/internal/resource/headless_service.go +++ b/internal/resource/headless_service.go @@ -21,9 +21,7 @@ import ( "sigs.k8s.io/controller-runtime/pkg/controller/controllerutil" ) -const ( - headlessServiceSuffix = "nodes" -) +const headlessServiceSuffix = "nodes" type HeadlessServiceBuilder struct { *RabbitmqResourceBuilder diff --git a/internal/resource/rabbitmq_plugins.go b/internal/resource/rabbitmq_plugins.go index 2201f9cdb..4e3028ccf 100644 --- a/internal/resource/rabbitmq_plugins.go +++ b/internal/resource/rabbitmq_plugins.go @@ -15,8 +15,7 @@ import ( ) var requiredPlugins = []string{ - "rabbitmq_peer_discovery_k8s", // required for clustering - "rabbitmq_prometheus", // enforce prometheus metrics + "rabbitmq_prometheus", // enforce prometheus 
metrics "rabbitmq_management", } diff --git a/internal/resource/rabbitmq_plugins_test.go b/internal/resource/rabbitmq_plugins_test.go index 5277b9bf5..5f4fb88cd 100644 --- a/internal/resource/rabbitmq_plugins_test.go +++ b/internal/resource/rabbitmq_plugins_test.go @@ -28,7 +28,7 @@ var _ = Describe("RabbitMQPlugins", func() { When("AdditionalPlugins is empty", func() { It("returns list of required plugins", func() { plugins := NewRabbitmqPlugins(nil) - Expect(plugins.DesiredPlugins()).To(ConsistOf([]string{"rabbitmq_peer_discovery_k8s", "rabbitmq_prometheus", "rabbitmq_management"})) + Expect(plugins.DesiredPlugins()).To(ConsistOf([]string{"rabbitmq_prometheus", "rabbitmq_management"})) }) }) @@ -37,7 +37,7 @@ var _ = Describe("RabbitMQPlugins", func() { morePlugins := []rabbitmqv1beta1.Plugin{"rabbitmq_shovel", "my_great_plugin"} plugins := NewRabbitmqPlugins(morePlugins) - Expect(plugins.DesiredPlugins()).To(ConsistOf([]string{"rabbitmq_peer_discovery_k8s", + Expect(plugins.DesiredPlugins()).To(ConsistOf([]string{ "rabbitmq_prometheus", "rabbitmq_management", "my_great_plugin", @@ -51,7 +51,7 @@ var _ = Describe("RabbitMQPlugins", func() { morePlugins := []rabbitmqv1beta1.Plugin{"rabbitmq_management", "rabbitmq_shovel", "my_great_plugin", "rabbitmq_shovel"} plugins := NewRabbitmqPlugins(morePlugins) - Expect(plugins.DesiredPlugins()).To(ConsistOf([]string{"rabbitmq_peer_discovery_k8s", + Expect(plugins.DesiredPlugins()).To(ConsistOf([]string{ "rabbitmq_prometheus", "rabbitmq_management", "my_great_plugin", @@ -117,10 +117,7 @@ var _ = Describe("RabbitMQPlugins", func() { }) It("adds list of default plugins", func() { - expectedEnabledPlugins := "[" + - "rabbitmq_peer_discovery_k8s," + - "rabbitmq_prometheus," + - "rabbitmq_management]." + expectedEnabledPlugins := "[rabbitmq_prometheus,rabbitmq_management]." 
obj, err := configMapBuilder.Build() Expect(err).NotTo(HaveOccurred()) @@ -184,7 +181,6 @@ var _ = Describe("RabbitMQPlugins", func() { builder.Instance.Spec.Rabbitmq.AdditionalPlugins = []rabbitmqv1beta1.Plugin{"rabbitmq_management", "rabbitmq_management", "rabbitmq_shovel", "my_great_plugin"} expectedEnabledPlugins := "[" + - "rabbitmq_peer_discovery_k8s," + "rabbitmq_prometheus," + "rabbitmq_management," + "rabbitmq_shovel," + @@ -199,7 +195,7 @@ var _ = Describe("RabbitMQPlugins", func() { When("previous data is present", func() { BeforeEach(func() { configMap.Data = map[string]string{ - "enabled_plugins": "[rabbitmq_peer_discovery_k8s,rabbitmq_shovel]", + "enabled_plugins": "[rabbitmq_prometheus,rabbitmq_shovel]", } }) @@ -207,7 +203,6 @@ var _ = Describe("RabbitMQPlugins", func() { builder.Instance.Spec.Rabbitmq.AdditionalPlugins = []rabbitmqv1beta1.Plugin{"rabbitmq_management", "rabbitmq_management", "rabbitmq_shovel", "my_great_plugin"} expectedEnabledPlugins := "[" + - "rabbitmq_peer_discovery_k8s," + "rabbitmq_prometheus," + "rabbitmq_management," + "rabbitmq_shovel," + diff --git a/internal/resource/statefulset.go b/internal/resource/statefulset.go index 5b348cfd7..3f2495e9c 100644 --- a/internal/resource/statefulset.go +++ b/internal/resource/statefulset.go @@ -31,6 +31,7 @@ import ( ) const ( + stsSuffix string = "server" initContainerCPU string = "100m" initContainerMemory string = "500Mi" defaultPVCName string = "persistence" @@ -54,7 +55,7 @@ func (builder *StatefulSetBuilder) Build() (client.Object, error) { sts := &appsv1.StatefulSet{ ObjectMeta: metav1.ObjectMeta{ - Name: builder.Instance.ChildResourceName("server"), + Name: builder.Instance.ChildResourceName(stsSuffix), Namespace: builder.Instance.Namespace, }, Spec: appsv1.StatefulSetSpec{ diff --git a/system_tests/system_test.go b/system_tests/system_test.go index 89f7cb64c..14c3b630a 100644 --- a/system_tests/system_test.go +++ b/system_tests/system_test.go @@ -78,7 +78,6 @@ var _ = 
Describe("Operator", func() { "rabbitmq-plugins", "is_enabled", "rabbitmq_management", - "rabbitmq_peer_discovery_k8s", "rabbitmq_prometheus", ) Expect(err).NotTo(HaveOccurred()) @@ -174,7 +173,6 @@ var _ = Describe("Operator", func() { "rabbitmq-plugins", "is_enabled", "rabbitmq_management", - "rabbitmq_peer_discovery_k8s", "rabbitmq_prometheus", "rabbitmq_top", ) From ff33f4126d873e54423c1a5255dd3f36498cbdf3 Mon Sep 17 00:00:00 2001 From: David Ansari Date: Tue, 11 May 2021 09:18:31 +0200 Subject: [PATCH 2/2] Do not restart StatefulSet on scale out If the RabbitMQ cluster is under heavy load and is being scaled out (i.e. more RabbitMQ nodes added to the RabbitMQ cluster), existing nodes shouldn't be restarted. Before this commit, existing nodes were restarted because the ConfigMap gets updated since new peers get included for peer discovery. However, existing nodes do not need this new peer discovery configuration since the cluster is already formed. --- .../reconcile_rabbitmq_configurations.go | 4 +- .../reconcile_rabbitmq_configurations_test.go | 124 +++++++++++------- internal/resource/configmap.go | 49 ++++++- internal/resource/configmap_test.go | 32 ++++- 4 files changed, 156 insertions(+), 53 deletions(-) diff --git a/controllers/reconcile_rabbitmq_configurations.go b/controllers/reconcile_rabbitmq_configurations.go index 717dea470..734fc0f12 100644 --- a/controllers/reconcile_rabbitmq_configurations.go +++ b/controllers/reconcile_rabbitmq_configurations.go @@ -34,7 +34,7 @@ func (r *RabbitmqClusterReconciler) annotateIfNeeded(ctx context.Context, logger annotationKey string ) - switch builder.(type) { + switch b := builder.(type) { case *resource.RabbitmqPluginsConfigMapBuilder: if operationResult != controllerutil.OperationResultUpdated { @@ -45,7 +45,7 @@ func (r *RabbitmqClusterReconciler) annotateIfNeeded(ctx context.Context, logger annotationKey = pluginsUpdateAnnotation case *resource.ServerConfigMapBuilder: - if operationResult != 
controllerutil.OperationResultUpdated { + if operationResult != controllerutil.OperationResultUpdated || !b.UpdateRequiresStsRestart { return nil } obj = &corev1.ConfigMap{} diff --git a/controllers/reconcile_rabbitmq_configurations_test.go b/controllers/reconcile_rabbitmq_configurations_test.go index 96f2bd3cf..c9d639c51 100644 --- a/controllers/reconcile_rabbitmq_configurations_test.go +++ b/controllers/reconcile_rabbitmq_configurations_test.go @@ -1,14 +1,16 @@ package controllers_test import ( - rabbitmqv1beta1 "github.com/rabbitmq/cluster-operator/api/v1beta1" "time" + rabbitmqv1beta1 "github.com/rabbitmq/cluster-operator/api/v1beta1" + . "github.com/onsi/ginkgo" . "github.com/onsi/ginkgo/extensions/table" . "github.com/onsi/gomega" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/utils/pointer" ) var _ = Describe("Reconcile rabbitmq Configurations", func() { @@ -16,66 +18,96 @@ var _ = Describe("Reconcile rabbitmq Configurations", func() { cluster *rabbitmqv1beta1.RabbitmqCluster defaultNamespace = "default" ) - DescribeTable("Server configurations updates", - func(testCase string) { - // create rabbitmqcluster + + AfterEach(func() { + Expect(client.Delete(ctx, cluster)).To(Succeed()) + waitForClusterDeletion(ctx, cluster, client) + }) + + DescribeTable("Server configurations updates", func(testCase string) { + cluster = &rabbitmqv1beta1.RabbitmqCluster{ + ObjectMeta: metav1.ObjectMeta{ + Namespace: defaultNamespace, + Name: "rabbitmq-" + testCase, + }, + } + Expect(client.Create(ctx, cluster)).To(Succeed()) + waitForClusterCreation(ctx, cluster, client) + + // ensure that cfm and statefulSet does not have annotations set when cluster just got created + cfm := configMap(ctx, cluster, "server-conf") + Expect(cfm.Annotations).ShouldNot(HaveKey("rabbitmq.com/serverConfUpdatedAt")) + sts := statefulSet(ctx, cluster) + Expect(sts.Annotations).ShouldNot(HaveKey("rabbitmq.com/lastRestartAt")) + + // update rabbitmq server configurations + 
Expect(updateWithRetry(cluster, func(r *rabbitmqv1beta1.RabbitmqCluster) { + if testCase == "additional-config" { + r.Spec.Rabbitmq.AdditionalConfig = "test_config=0" + } + if testCase == "advanced-config" { + r.Spec.Rabbitmq.AdvancedConfig = "sample-advanced-config." + } + if testCase == "env-config" { + r.Spec.Rabbitmq.EnvConfig = "some-env-variable" + } + })).To(Succeed()) + + By("annotating the server-conf ConfigMap") + // ensure annotations from the server-conf ConfigMap + var annotations map[string]string + Eventually(func() map[string]string { + cfm := configMap(ctx, cluster, "server-conf") + annotations = cfm.Annotations + return annotations + }, 5).Should(HaveKey("rabbitmq.com/serverConfUpdatedAt")) + _, err := time.Parse(time.RFC3339, annotations["rabbitmq.com/serverConfUpdatedAt"]) + Expect(err).NotTo(HaveOccurred(), "Annotation rabbitmq.com/serverConfUpdatedAt was not a valid RFC3339 timestamp") + + By("annotating the sts podTemplate") + // ensure statefulSet annotations + Eventually(func() map[string]string { + sts := statefulSet(ctx, cluster) + annotations = sts.Spec.Template.Annotations + return annotations + }, 5).Should(HaveKey("rabbitmq.com/lastRestartAt")) + _, err = time.Parse(time.RFC3339, annotations["rabbitmq.com/lastRestartAt"]) + Expect(err).NotTo(HaveOccurred(), "Annotation rabbitmq.com/lastRestartAt was not a valid RFC3339 timestamp") + }, + + Entry("spec.rabbitmq.additionalConfig is updated", "additional-config"), + Entry("spec.rabbitmq.advancedConfig is updated", "advanced-config"), + Entry("spec.rabbitmq.envConfig is updated", "env-config"), + ) + + Context("scale out", func() { + It("does not restart StatefulSet", func() { cluster = &rabbitmqv1beta1.RabbitmqCluster{ ObjectMeta: metav1.ObjectMeta{ - Name: "rabbitmq-" + testCase, Namespace: defaultNamespace, + Name: "rabbitmq-scale-out", }, } Expect(client.Create(ctx, cluster)).To(Succeed()) waitForClusterCreation(ctx, cluster, client) - // ensure that cfm and statefulSet does not have 
annotations set when configurations haven't changed cfm := configMap(ctx, cluster, "server-conf") Expect(cfm.Annotations).ShouldNot(HaveKey("rabbitmq.com/serverConfUpdatedAt")) - sts := statefulSet(ctx, cluster) Expect(sts.Annotations).ShouldNot(HaveKey("rabbitmq.com/lastRestartAt")) - // update rabbitmq server configurations Expect(updateWithRetry(cluster, func(r *rabbitmqv1beta1.RabbitmqCluster) { - if testCase == "additional-config" { - r.Spec.Rabbitmq.AdditionalConfig = "test_config=0" - } - if testCase == "advanced-config" { - r.Spec.Rabbitmq.AdvancedConfig = "sample-advanced-config." - } - if testCase == "env-config" { - r.Spec.Rabbitmq.EnvConfig = "some-env-variable" - } + r.Spec.Replicas = pointer.Int32Ptr(5) })).To(Succeed()) - By("annotating the server-conf ConfigMap") - // ensure annotations from the server-conf ConfigMap - var annotations map[string]string - Eventually(func() map[string]string { - cfm := configMap(ctx, cluster, "server-conf") - annotations = cfm.Annotations - return annotations - }, 5).Should(HaveKey("rabbitmq.com/serverConfUpdatedAt")) - _, err := time.Parse(time.RFC3339, annotations["rabbitmq.com/serverConfUpdatedAt"]) - Expect(err).NotTo(HaveOccurred(), "Annotation rabbitmq.com/serverConfUpdatedAt was not a valid RFC3339 timestamp") - - By("annotating the sts podTemplate") - // ensure statefulSet annotations - Eventually(func() map[string]string { - sts := statefulSet(ctx, cluster) - annotations = sts.Spec.Template.Annotations - return annotations - }, 5).Should(HaveKey("rabbitmq.com/lastRestartAt")) - _, err = time.Parse(time.RFC3339, annotations["rabbitmq.com/lastRestartAt"]) - Expect(err).NotTo(HaveOccurred(), "Annotation rabbitmq.com/lastRestartAt was not a valid RFC3339 timestamp") - - // delete rmq cluster - Expect(client.Delete(ctx, cluster)).To(Succeed()) - waitForClusterDeletion(ctx, cluster, client) - }, + Consistently(func() map[string]string { + return configMap(ctx, cluster, "server-conf").Annotations + }, 3, 
0.3).ShouldNot(HaveKey("rabbitmq.com/serverConfUpdatedAt")) - Entry("spec.rabbitmq.additionalConfig is updated", "additional-config"), - Entry("spec.rabbitmq.advancedConfig is updated", "advanced-config"), - Entry("spec.rabbitmq.envConfig is updated", "env-config"), - ) + Consistently(func() map[string]string { + sts := statefulSet(ctx, cluster) + return sts.Spec.Template.Annotations + }, 3, 0.3).ShouldNot(HaveKey("rabbitmq.com/lastRestartAt")) + }) + }) }) diff --git a/internal/resource/configmap.go b/internal/resource/configmap.go index 7ba9ffe75..fdae3820d 100644 --- a/internal/resource/configmap.go +++ b/internal/resource/configmap.go @@ -14,19 +14,20 @@ import ( "strings" "sigs.k8s.io/controller-runtime/pkg/client" - "sigs.k8s.io/controller-runtime/pkg/controller/controllerutil" "gopkg.in/ini.v1" "github.com/rabbitmq/cluster-operator/internal/metadata" corev1 "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/api/equality" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/utils/pointer" ) const ( ServerConfigMapName = "server-conf" + operatorDefaults = "operatorDefaults.conf" defaultRabbitmqConf = ` cluster_partition_handling = pause_minority queue_master_locator = min-masters @@ -55,10 +56,12 @@ prometheus.ssl.port = 15691 type ServerConfigMapBuilder struct { *RabbitmqResourceBuilder + // Set to true if the config change requires RabbitMQ nodes to be restarted. 
+ UpdateRequiresStsRestart bool } func (builder *RabbitmqResourceBuilder) ServerConfigMap() *ServerConfigMapBuilder { - return &ServerConfigMapBuilder{builder} + return &ServerConfigMapBuilder{builder, true} } func (builder *ServerConfigMapBuilder) Build() (client.Object, error) { @@ -78,6 +81,7 @@ func (builder *ServerConfigMapBuilder) UpdateMayRequireStsRecreate() bool { func (builder *ServerConfigMapBuilder) Update(object client.Object) error { configMap := object.(*corev1.ConfigMap) + previousConfigMap := configMap.DeepCopy() ini.PrettySection = false // Remove trailing new line because rabbitmq.conf has only a default section. operatorConfiguration, err := ini.Load([]byte(defaultRabbitmqConf)) @@ -215,7 +219,7 @@ func (builder *ServerConfigMapBuilder) Update(object client.Object) error { configMap.Data = make(map[string]string) } - configMap.Data["operatorDefaults.conf"] = rmqConfBuffer.String() + configMap.Data[operatorDefaults] = rmqConfBuffer.String() rmqConfBuffer.Reset() @@ -237,6 +241,45 @@ func (builder *ServerConfigMapBuilder) Update(object client.Object) error { return fmt.Errorf("failed setting controller reference: %v", err) } + updatedConfigMap := configMap.DeepCopy() + if err := removeConfigNotRequiringNodeRestart(previousConfigMap); err != nil { + return err + } + if err := removeConfigNotRequiringNodeRestart(updatedConfigMap); err != nil { + return err + } + if equality.Semantic.DeepEqual(previousConfigMap, updatedConfigMap) { + builder.UpdateRequiresStsRestart = false + } + + return nil +} + +// removeConfigNotRequiringNodeRestart removes configuration data that does not require a restart of RabbitMQ nodes. +// As of now, this data consists of peer discovery nodes: +// In the case of scale out (i.e. adding more RabbitMQ nodes to the RabbitMQ cluster), new RabbitMQ nodes will +// be added to the peer discovery configuration. New nodes will receive the configuration containing all peers. 
+// However, existing nodes do not need to receive the new peer discovery configuration since the cluster is already formed. +func removeConfigNotRequiringNodeRestart(configMap *corev1.ConfigMap) error { + operatorConf := configMap.Data[operatorDefaults] + if operatorConf == "" { + return nil + } + conf, err := ini.Load([]byte(operatorConf)) + if err != nil { + return err + } + defaultSection := conf.Section("") + for _, key := range defaultSection.KeyStrings() { + if strings.HasPrefix(key, "cluster_formation.classic_config.nodes.") { + defaultSection.DeleteKey(key) + } + } + var b strings.Builder + if _, err := conf.WriteTo(&b); err != nil { + return err + } + configMap.Data[operatorDefaults] = b.String() return nil } diff --git a/internal/resource/configmap_test.go b/internal/resource/configmap_test.go index 0b79385ab..a1265e296 100644 --- a/internal/resource/configmap_test.go +++ b/internal/resource/configmap_test.go @@ -542,7 +542,7 @@ CONSOLE_LOG=new` Expect(configMap.Annotations).To(BeEmpty()) }) - When("multiple replicas", func() { + Context("multiple replicas", func() { It("adds nodes for peer discovery", func() { instance.Spec.Replicas = pointer.Int32Ptr(3) @@ -557,7 +557,7 @@ cluster_formation.classic_config.nodes.3 = rabbit@%s-server-2.%s-nod ))) }) }) - When("no replicas", func() { + Context("no replicas", func() { It("does not add nodes for peer discovery", func() { instance.Spec.Replicas = pointer.Int32Ptr(0) @@ -567,6 +567,34 @@ cluster_formation.classic_config.nodes.3 = rabbit@%s-server-2.%s-nod )) }) }) + + Describe("UpdateRequiresStsRestart", func() { + BeforeEach(func() { + Expect(configMapBuilder.Update(configMap)).To(Succeed()) + Expect(configMapBuilder.UpdateRequiresStsRestart).To(BeTrue()) + }) + When("the config does not change", func() { + It("does not restart StatefulSet", func() { + Expect(configMapBuilder.Update(configMap)).To(Succeed()) + Expect(configMapBuilder.UpdateRequiresStsRestart).To(BeFalse()) + }) + }) + When("the only config 
change is cluster formation nodes", func() { + It("does not require the StatefulSet to be restarted", func() { + instance.Spec.Replicas = pointer.Int32Ptr(3) + Expect(configMapBuilder.Update(configMap)).To(Succeed()) + Expect(configMapBuilder.UpdateRequiresStsRestart).To(BeFalse()) + }) + }) + When("config change includes more than cluster formation nodes", func() { + It("requires the StatefulSet to be restarted", func() { + instance.Spec.Replicas = pointer.Int32Ptr(3) + instance.Spec.Rabbitmq.AdditionalConfig = "foo = bar" + Expect(configMapBuilder.Update(configMap)).To(Succeed()) + Expect(configMapBuilder.UpdateRequiresStsRestart).To(BeTrue()) + }) + }) + }) }) Context("UpdateMayRequireStsRecreate", func() {