Skip to content

Commit ff33f41

Browse files
committed
Do not restart StatefulSet on scale out
If the RabbitMQ cluster is under heavy load and is being scaled out (i.e. more RabbitMQ nodes added to the RabbitMQ cluster), existing nodes shouldn't be restarted. Before this commit, exising nodes were restarted because the ConfigMap gets updated since new peers get included for peer discovery. However, existing nodes do not need this new peer discovery configuration since the cluster is already formed.
1 parent 7f12995 commit ff33f41

File tree

4 files changed

+156
-53
lines changed

4 files changed

+156
-53
lines changed

controllers/reconcile_rabbitmq_configurations.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,7 @@ func (r *RabbitmqClusterReconciler) annotateIfNeeded(ctx context.Context, logger
3434
annotationKey string
3535
)
3636

37-
switch builder.(type) {
37+
switch b := builder.(type) {
3838

3939
case *resource.RabbitmqPluginsConfigMapBuilder:
4040
if operationResult != controllerutil.OperationResultUpdated {
@@ -45,7 +45,7 @@ func (r *RabbitmqClusterReconciler) annotateIfNeeded(ctx context.Context, logger
4545
annotationKey = pluginsUpdateAnnotation
4646

4747
case *resource.ServerConfigMapBuilder:
48-
if operationResult != controllerutil.OperationResultUpdated {
48+
if operationResult != controllerutil.OperationResultUpdated || !b.UpdateRequiresStsRestart {
4949
return nil
5050
}
5151
obj = &corev1.ConfigMap{}
Lines changed: 78 additions & 46 deletions
Original file line numberDiff line numberDiff line change
@@ -1,81 +1,113 @@
11
package controllers_test
22

33
import (
4-
rabbitmqv1beta1 "github.com/rabbitmq/cluster-operator/api/v1beta1"
54
"time"
65

6+
rabbitmqv1beta1 "github.com/rabbitmq/cluster-operator/api/v1beta1"
7+
78
. "github.com/onsi/ginkgo"
89
. "github.com/onsi/ginkgo/extensions/table"
910
. "github.com/onsi/gomega"
1011

1112
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
13+
"k8s.io/utils/pointer"
1214
)
1315

1416
var _ = Describe("Reconcile rabbitmq Configurations", func() {
1517
var (
1618
cluster *rabbitmqv1beta1.RabbitmqCluster
1719
defaultNamespace = "default"
1820
)
19-
DescribeTable("Server configurations updates",
20-
func(testCase string) {
21-
// create rabbitmqcluster
21+
22+
AfterEach(func() {
23+
Expect(client.Delete(ctx, cluster)).To(Succeed())
24+
waitForClusterDeletion(ctx, cluster, client)
25+
})
26+
27+
DescribeTable("Server configurations updates", func(testCase string) {
28+
cluster = &rabbitmqv1beta1.RabbitmqCluster{
29+
ObjectMeta: metav1.ObjectMeta{
30+
Namespace: defaultNamespace,
31+
Name: "rabbitmq-" + testCase,
32+
},
33+
}
34+
Expect(client.Create(ctx, cluster)).To(Succeed())
35+
waitForClusterCreation(ctx, cluster, client)
36+
37+
// ensure that cfm and statefulSet does not have annotations set when cluster just got created
38+
cfm := configMap(ctx, cluster, "server-conf")
39+
Expect(cfm.Annotations).ShouldNot(HaveKey("rabbitmq.com/serverConfUpdatedAt"))
40+
sts := statefulSet(ctx, cluster)
41+
Expect(sts.Annotations).ShouldNot(HaveKey("rabbitmq.com/lastRestartAt"))
42+
43+
// update rabbitmq server configurations
44+
Expect(updateWithRetry(cluster, func(r *rabbitmqv1beta1.RabbitmqCluster) {
45+
if testCase == "additional-config" {
46+
r.Spec.Rabbitmq.AdditionalConfig = "test_config=0"
47+
}
48+
if testCase == "advanced-config" {
49+
r.Spec.Rabbitmq.AdvancedConfig = "sample-advanced-config."
50+
}
51+
if testCase == "env-config" {
52+
r.Spec.Rabbitmq.EnvConfig = "some-env-variable"
53+
}
54+
})).To(Succeed())
55+
56+
By("annotating the server-conf ConfigMap")
57+
// ensure annotations from the server-conf ConfigMap
58+
var annotations map[string]string
59+
Eventually(func() map[string]string {
60+
cfm := configMap(ctx, cluster, "server-conf")
61+
annotations = cfm.Annotations
62+
return annotations
63+
}, 5).Should(HaveKey("rabbitmq.com/serverConfUpdatedAt"))
64+
_, err := time.Parse(time.RFC3339, annotations["rabbitmq.com/serverConfUpdatedAt"])
65+
Expect(err).NotTo(HaveOccurred(), "Annotation rabbitmq.com/serverConfUpdatedAt was not a valid RFC3339 timestamp")
66+
67+
By("annotating the sts podTemplate")
68+
// ensure statefulSet annotations
69+
Eventually(func() map[string]string {
70+
sts := statefulSet(ctx, cluster)
71+
annotations = sts.Spec.Template.Annotations
72+
return annotations
73+
}, 5).Should(HaveKey("rabbitmq.com/lastRestartAt"))
74+
_, err = time.Parse(time.RFC3339, annotations["rabbitmq.com/lastRestartAt"])
75+
Expect(err).NotTo(HaveOccurred(), "Annotation rabbitmq.com/lastRestartAt was not a valid RFC3339 timestamp")
76+
},
77+
78+
Entry("spec.rabbitmq.additionalConfig is updated", "additional-config"),
79+
Entry("spec.rabbitmq.advancedConfig is updated", "advanced-config"),
80+
Entry("spec.rabbitmq.envConfig is updated", "env-config"),
81+
)
82+
83+
Context("scale out", func() {
84+
It("does not restart StatefulSet", func() {
2285
cluster = &rabbitmqv1beta1.RabbitmqCluster{
2386
ObjectMeta: metav1.ObjectMeta{
24-
Name: "rabbitmq-" + testCase,
2587
Namespace: defaultNamespace,
88+
Name: "rabbitmq-scale-out",
2689
},
2790
}
2891
Expect(client.Create(ctx, cluster)).To(Succeed())
2992
waitForClusterCreation(ctx, cluster, client)
3093

31-
// ensure that cfm and statefulSet does not have annotations set when configurations haven't changed
3294
cfm := configMap(ctx, cluster, "server-conf")
3395
Expect(cfm.Annotations).ShouldNot(HaveKey("rabbitmq.com/serverConfUpdatedAt"))
34-
3596
sts := statefulSet(ctx, cluster)
3697
Expect(sts.Annotations).ShouldNot(HaveKey("rabbitmq.com/lastRestartAt"))
3798

38-
// update rabbitmq server configurations
3999
Expect(updateWithRetry(cluster, func(r *rabbitmqv1beta1.RabbitmqCluster) {
40-
if testCase == "additional-config" {
41-
r.Spec.Rabbitmq.AdditionalConfig = "test_config=0"
42-
}
43-
if testCase == "advanced-config" {
44-
r.Spec.Rabbitmq.AdvancedConfig = "sample-advanced-config."
45-
}
46-
if testCase == "env-config" {
47-
r.Spec.Rabbitmq.EnvConfig = "some-env-variable"
48-
}
100+
r.Spec.Replicas = pointer.Int32Ptr(5)
49101
})).To(Succeed())
50102

51-
By("annotating the server-conf ConfigMap")
52-
// ensure annotations from the server-conf ConfigMap
53-
var annotations map[string]string
54-
Eventually(func() map[string]string {
55-
cfm := configMap(ctx, cluster, "server-conf")
56-
annotations = cfm.Annotations
57-
return annotations
58-
}, 5).Should(HaveKey("rabbitmq.com/serverConfUpdatedAt"))
59-
_, err := time.Parse(time.RFC3339, annotations["rabbitmq.com/serverConfUpdatedAt"])
60-
Expect(err).NotTo(HaveOccurred(), "Annotation rabbitmq.com/serverConfUpdatedAt was not a valid RFC3339 timestamp")
61-
62-
By("annotating the sts podTemplate")
63-
// ensure statefulSet annotations
64-
Eventually(func() map[string]string {
65-
sts := statefulSet(ctx, cluster)
66-
annotations = sts.Spec.Template.Annotations
67-
return annotations
68-
}, 5).Should(HaveKey("rabbitmq.com/lastRestartAt"))
69-
_, err = time.Parse(time.RFC3339, annotations["rabbitmq.com/lastRestartAt"])
70-
Expect(err).NotTo(HaveOccurred(), "Annotation rabbitmq.com/lastRestartAt was not a valid RFC3339 timestamp")
71-
72-
// delete rmq cluster
73-
Expect(client.Delete(ctx, cluster)).To(Succeed())
74-
waitForClusterDeletion(ctx, cluster, client)
75-
},
103+
Consistently(func() map[string]string {
104+
return configMap(ctx, cluster, "server-conf").Annotations
105+
}, 3, 0.3).ShouldNot(HaveKey("rabbitmq.com/serverConfUpdatedAt"))
76106

77-
Entry("spec.rabbitmq.additionalConfig is updated", "additional-config"),
78-
Entry("spec.rabbitmq.advancedConfig is updated", "advanced-config"),
79-
Entry("spec.rabbitmq.envConfig is updated", "env-config"),
80-
)
107+
Consistently(func() map[string]string {
108+
sts := statefulSet(ctx, cluster)
109+
return sts.Spec.Template.Annotations
110+
}, 3, 0.3).ShouldNot(HaveKey("rabbitmq.com/lastRestartAt"))
111+
})
112+
})
81113
})

internal/resource/configmap.go

Lines changed: 46 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -14,19 +14,20 @@ import (
1414
"strings"
1515

1616
"sigs.k8s.io/controller-runtime/pkg/client"
17-
1817
"sigs.k8s.io/controller-runtime/pkg/controller/controllerutil"
1918

2019
"gopkg.in/ini.v1"
2120

2221
"github.com/rabbitmq/cluster-operator/internal/metadata"
2322
corev1 "k8s.io/api/core/v1"
23+
"k8s.io/apimachinery/pkg/api/equality"
2424
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
2525
"k8s.io/utils/pointer"
2626
)
2727

2828
const (
2929
ServerConfigMapName = "server-conf"
30+
operatorDefaults = "operatorDefaults.conf"
3031
defaultRabbitmqConf = `
3132
cluster_partition_handling = pause_minority
3233
queue_master_locator = min-masters
@@ -55,10 +56,12 @@ prometheus.ssl.port = 15691
5556

5657
type ServerConfigMapBuilder struct {
5758
*RabbitmqResourceBuilder
59+
// Set to true if the config change requires RabbitMQ nodes to be restarted.
60+
UpdateRequiresStsRestart bool
5861
}
5962

6063
func (builder *RabbitmqResourceBuilder) ServerConfigMap() *ServerConfigMapBuilder {
61-
return &ServerConfigMapBuilder{builder}
64+
return &ServerConfigMapBuilder{builder, true}
6265
}
6366

6467
func (builder *ServerConfigMapBuilder) Build() (client.Object, error) {
@@ -78,6 +81,7 @@ func (builder *ServerConfigMapBuilder) UpdateMayRequireStsRecreate() bool {
7881

7982
func (builder *ServerConfigMapBuilder) Update(object client.Object) error {
8083
configMap := object.(*corev1.ConfigMap)
84+
previousConfigMap := configMap.DeepCopy()
8185

8286
ini.PrettySection = false // Remove trailing new line because rabbitmq.conf has only a default section.
8387
operatorConfiguration, err := ini.Load([]byte(defaultRabbitmqConf))
@@ -215,7 +219,7 @@ func (builder *ServerConfigMapBuilder) Update(object client.Object) error {
215219
configMap.Data = make(map[string]string)
216220
}
217221

218-
configMap.Data["operatorDefaults.conf"] = rmqConfBuffer.String()
222+
configMap.Data[operatorDefaults] = rmqConfBuffer.String()
219223

220224
rmqConfBuffer.Reset()
221225

@@ -237,6 +241,45 @@ func (builder *ServerConfigMapBuilder) Update(object client.Object) error {
237241
return fmt.Errorf("failed setting controller reference: %v", err)
238242
}
239243

244+
updatedConfigMap := configMap.DeepCopy()
245+
if err := removeConfigNotRequiringNodeRestart(previousConfigMap); err != nil {
246+
return err
247+
}
248+
if err := removeConfigNotRequiringNodeRestart(updatedConfigMap); err != nil {
249+
return err
250+
}
251+
if equality.Semantic.DeepEqual(previousConfigMap, updatedConfigMap) {
252+
builder.UpdateRequiresStsRestart = false
253+
}
254+
255+
return nil
256+
}
257+
258+
// removeConfigNotRequiringNodeRestart removes configuration data that does not require a restart of RabbitMQ nodes.
259+
// As of now, this data consists of peer discovery nodes:
260+
// In the case of scale out (i.e. adding more RabbitMQ nodes to the RabbitMQ cluster), new RabbitMQ nodes will
261+
// be added to the peer discovery configuration. New nodes will receive the configuration containing all peers.
262+
// However, exisiting nodes do not need to receive the new peer discovery configuration since the cluster is already formed.
263+
func removeConfigNotRequiringNodeRestart(configMap *corev1.ConfigMap) error {
264+
operatorConf := configMap.Data[operatorDefaults]
265+
if operatorConf == "" {
266+
return nil
267+
}
268+
conf, err := ini.Load([]byte(operatorConf))
269+
if err != nil {
270+
return err
271+
}
272+
defaultSection := conf.Section("")
273+
for _, key := range defaultSection.KeyStrings() {
274+
if strings.HasPrefix(key, "cluster_formation.classic_config.nodes.") {
275+
defaultSection.DeleteKey(key)
276+
}
277+
}
278+
var b strings.Builder
279+
if _, err := conf.WriteTo(&b); err != nil {
280+
return err
281+
}
282+
configMap.Data[operatorDefaults] = b.String()
240283
return nil
241284
}
242285

internal/resource/configmap_test.go

Lines changed: 30 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -542,7 +542,7 @@ CONSOLE_LOG=new`
542542
Expect(configMap.Annotations).To(BeEmpty())
543543
})
544544

545-
When("multiple replicas", func() {
545+
Context("multiple replicas", func() {
546546
It("adds nodes for peer discovery", func() {
547547
instance.Spec.Replicas = pointer.Int32Ptr(3)
548548

@@ -557,7 +557,7 @@ cluster_formation.classic_config.nodes.3 = rabbit@%s-server-2.%s-nod
557557
)))
558558
})
559559
})
560-
When("no replicas", func() {
560+
Context("no replicas", func() {
561561
It("does not add nodes for peer discovery", func() {
562562
instance.Spec.Replicas = pointer.Int32Ptr(0)
563563

@@ -567,6 +567,34 @@ cluster_formation.classic_config.nodes.3 = rabbit@%s-server-2.%s-nod
567567
))
568568
})
569569
})
570+
571+
Describe("UpdateRequiresStsRestart", func() {
572+
BeforeEach(func() {
573+
Expect(configMapBuilder.Update(configMap)).To(Succeed())
574+
Expect(configMapBuilder.UpdateRequiresStsRestart).To(BeTrue())
575+
})
576+
When("the config does not change", func() {
577+
It("does not restart StatefulSet", func() {
578+
Expect(configMapBuilder.Update(configMap)).To(Succeed())
579+
Expect(configMapBuilder.UpdateRequiresStsRestart).To(BeFalse())
580+
})
581+
})
582+
When("the only config change is cluster formation nodes", func() {
583+
It("does not require the StatefulSet to be restarted", func() {
584+
instance.Spec.Replicas = pointer.Int32Ptr(3)
585+
Expect(configMapBuilder.Update(configMap)).To(Succeed())
586+
Expect(configMapBuilder.UpdateRequiresStsRestart).To(BeFalse())
587+
})
588+
})
589+
When("config change includes more than cluster formation nodes", func() {
590+
It("requires the StatefulSet to be restarted", func() {
591+
instance.Spec.Replicas = pointer.Int32Ptr(3)
592+
instance.Spec.Rabbitmq.AdditionalConfig = "foo = bar"
593+
Expect(configMapBuilder.Update(configMap)).To(Succeed())
594+
Expect(configMapBuilder.UpdateRequiresStsRestart).To(BeTrue())
595+
})
596+
})
597+
})
570598
})
571599

572600
Context("UpdateMayRequireStsRecreate", func() {

0 commit comments

Comments
 (0)