Skip to content

Commit 5b9ae5b

Browse files
ansdZerpet
authored andcommitted
Only 'rabbitmq-plugins set' if plugins config map changed
Relates to #304 Before this commit, the controller exec'ed into every RabbitMQ cluster pod in every reconcile loop to idempotently 'rabbitmq-plugins set'. Although simple and correct, these were unnecessary and expensive operations. After this commit, the controller only execs into the pods if the plugins config map got updated.
1 parent 4b231d4 commit 5b9ae5b

File tree

6 files changed

+141
-65
lines changed

6 files changed

+141
-65
lines changed

controllers/rabbitmqcluster_controller.go

Lines changed: 103 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -55,9 +55,10 @@ var (
5555
)
5656

5757
const (
58-
ownerKey = ".metadata.controller"
59-
ownerKind = "RabbitmqCluster"
60-
deletionFinalizer = "deletion.finalizers.rabbitmqclusters.rabbitmq.com"
58+
ownerKey = ".metadata.controller"
59+
ownerKind = "RabbitmqCluster"
60+
deletionFinalizer = "deletion.finalizers.rabbitmqclusters.rabbitmq.com"
61+
pluginsUpdateAnnotation = "rabbitmq.com/pluginsUpdatedAt"
6162
)
6263

6364
// RabbitmqClusterReconciler reconciles a RabbitmqCluster object
@@ -183,7 +184,6 @@ func (r *RabbitmqClusterReconciler) Reconcile(req ctrl.Request) (ctrl.Result, er
183184
operationResult, apiError = controllerutil.CreateOrUpdate(ctx, r, resource, func() error {
184185
return builder.Update(resource)
185186
})
186-
187187
return apiError
188188
})
189189
r.logAndRecordOperationResult(rabbitmqCluster, resource, operationResult, err)
@@ -194,10 +194,10 @@ func (r *RabbitmqClusterReconciler) Reconcile(req ctrl.Request) (ctrl.Result, er
194194
"namespace", rabbitmqCluster.Namespace,
195195
"name", rabbitmqCluster.Name)
196196
}
197-
198197
return ctrl.Result{}, err
199198
}
200199

200+
r.annotatePluginsConfigMapIfUpdated(ctx, builder, operationResult, rabbitmqCluster)
201201
if restarted := r.restartStatefulSetIfNeeded(ctx, builder, operationResult, rabbitmqCluster); restarted {
202202
return ctrl.Result{RequeueAfter: time.Second * 10}, nil
203203
}
@@ -216,18 +216,13 @@ func (r *RabbitmqClusterReconciler) Reconcile(req ctrl.Request) (ctrl.Result, er
216216
return ctrl.Result{}, err
217217
}
218218

219-
if ok, err := r.allReplicasReadyAndUpdated(ctx, rabbitmqCluster); !ok {
220-
// only enable plugins when all pods of the StatefulSet become ready
221-
// requeue request after 10 seconds without error
222-
logger.Info("Not all replicas ready yet; requeuing request to enable plugins on RabbitmqCluster",
223-
"namespace", rabbitmqCluster.Namespace,
224-
"name", rabbitmqCluster.Name)
225-
return ctrl.Result{RequeueAfter: time.Second * 10}, err
226-
}
227-
228-
if err := r.enablePlugins(rabbitmqCluster); err != nil {
219+
requeueAfter, err := r.setPluginsIfNeeded(ctx, rabbitmqCluster)
220+
if err != nil {
229221
return ctrl.Result{}, err
230222
}
223+
if requeueAfter > 0 {
224+
return ctrl.Result{RequeueAfter: requeueAfter}, nil
225+
}
231226

232227
logger.Info("Finished reconciling RabbitmqCluster",
233228
"namespace", rabbitmqCluster.Namespace,
@@ -337,60 +332,131 @@ func (r *RabbitmqClusterReconciler) restartStatefulSetIfNeeded(
337332
sts.Spec.Template.ObjectMeta.Annotations["rabbitmq.com/lastRestartAt"] = time.Now().Format(time.RFC3339)
338333
return r.Update(ctx, sts)
339334
}); err != nil {
340-
msg := fmt.Sprintf("Failed to restart StatefulSet %s of Namespace %s; rabbitmq.conf configuration may be outdated", rmq.ChildResourceName("server"), rmq.Namespace)
335+
msg := fmt.Sprintf("failed to restart StatefulSet %s of Namespace %s; rabbitmq.conf configuration may be outdated", rmq.ChildResourceName("server"), rmq.Namespace)
341336
r.Log.Error(err, msg)
342337
r.Recorder.Event(rmq, corev1.EventTypeWarning, "FailedUpdate", msg)
343338
return false
344339
}
345340

346-
msg := fmt.Sprintf("Restarted StatefulSet %s of Namespace %s", rmq.ChildResourceName("server"), rmq.Namespace)
341+
msg := fmt.Sprintf("restarted StatefulSet %s of Namespace %s", rmq.ChildResourceName("server"), rmq.Namespace)
347342
r.Log.Info(msg)
348343
r.Recorder.Event(rmq, corev1.EventTypeNormal, "SuccessfulUpdate", msg)
349344
return true
350345
}
351346

352-
func (r *RabbitmqClusterReconciler) allReplicasReadyAndUpdated(ctx context.Context, rmq *rabbitmqv1beta1.RabbitmqCluster) (bool, error) {
353-
sts := &appsv1.StatefulSet{}
347+
// There are 2 paths how plugins are set:
348+
// 1. When SatefulSet is (re)started, the up-to-date plugins list (ConfigMap copied by the init container) is read by RabbitMQ nodes during node start up.
349+
// 2. When the plugins ConfigMap is changed, 'rabbitmq-plugins set' updates the plugins on every node (without the need to re-start the nodes).
350+
// This method implements the 2nd path.
351+
func (r *RabbitmqClusterReconciler) setPluginsIfNeeded(ctx context.Context, rmq *rabbitmqv1beta1.RabbitmqCluster) (requeueAfter time.Duration, err error) {
352+
configMap := corev1.ConfigMap{}
353+
if err := r.Get(ctx, types.NamespacedName{Namespace: rmq.Namespace, Name: rmq.ChildResourceName(resource.PluginsConfig)}, &configMap); err != nil {
354+
return 0, client.IgnoreNotFound(err)
355+
}
354356

355-
if err := r.Get(ctx, types.NamespacedName{Name: rmq.ChildResourceName("server"), Namespace: rmq.Namespace}, sts); err != nil {
356-
return false, client.IgnoreNotFound(err)
357+
pluginsUpdatedAt, ok := configMap.Annotations[pluginsUpdateAnnotation]
358+
if !ok {
359+
return 0, nil // plugins configMap was not updated
357360
}
358361

359-
desiredReplicas := *sts.Spec.Replicas
360-
if sts.Status.ReadyReplicas < desiredReplicas ||
361-
sts.Status.UpdatedReplicas < desiredReplicas { // StatefulSet rolling update is still ongoing (see https://github.com/rabbitmq/cluster-operator/issues/304)
362-
return false, nil
362+
annotationTime, err := time.Parse(time.RFC3339, pluginsUpdatedAt)
363+
if err != nil {
364+
return 0, err
365+
}
366+
if time.Since(annotationTime).Seconds() < 2 {
367+
// plugins configMap was updated very recently
368+
// give StatefulSet controller some time to trigger restart of StatefulSet if necessary
369+
// otherwise, there would be race conditions where we exec into containers losing the connection due to pods being terminated
370+
r.Log.Info("requeuing request to set plugins on RabbitmqCluster",
371+
"namespace", rmq.Namespace,
372+
"name", rmq.Name)
373+
return 2 * time.Second, nil
363374
}
364375

365-
return true, nil
366-
}
376+
ready, err := r.allReplicasReadyAndUpdated(ctx, rmq)
377+
if err != nil {
378+
return 0, err
379+
}
380+
if !ready {
381+
r.Log.Info("not all replicas ready yet; requeuing request to set plugins on RabbitmqCluster",
382+
"namespace", rmq.Namespace,
383+
"name", rmq.Name)
384+
return 15 * time.Second, err
385+
}
367386

368-
// Helper function to set the list of enabled plugins in the given RabbitmqCluster pods.
369-
// `rabbitmq-plugins set` disables plugins that are not in the provided list
370-
func (r *RabbitmqClusterReconciler) enablePlugins(rmq *rabbitmqv1beta1.RabbitmqCluster) error {
371387
plugins := resource.NewRabbitmqPlugins(rmq.Spec.Rabbitmq.AdditionalPlugins)
372388
for i := int32(0); i < *rmq.Spec.Replicas; i++ {
373389
podName := fmt.Sprintf("%s-%d", rmq.ChildResourceName("server"), i)
374390
rabbitCommand := fmt.Sprintf("rabbitmq-plugins set %s", plugins.AsString(" "))
375-
376391
stdout, stderr, err := r.exec(rmq.Namespace, podName, "rabbitmq", "sh", "-c", rabbitCommand)
377-
378392
if err != nil {
379-
r.Log.Error(err, "Failed to enable plugins",
393+
r.Log.Error(err, "failed to set plugins",
380394
"namespace", rmq.Namespace,
381395
"name", rmq.Name,
382396
"pod", podName,
383397
"command", rabbitCommand,
384398
"stdout", stdout,
385399
"stderr", stderr)
386-
return err
400+
return 0, err
387401
}
388402
}
389-
390-
r.Log.Info("Successfully enabled plugins on RabbitmqCluster",
403+
r.Log.Info("successfully set plugins on RabbitmqCluster",
391404
"namespace", rmq.Namespace,
392405
"name", rmq.Name)
393-
return nil
406+
407+
delete(configMap.Annotations, pluginsUpdateAnnotation)
408+
if err := r.Update(ctx, &configMap); err != nil {
409+
return 0, client.IgnoreNotFound(err)
410+
}
411+
412+
return 0, nil
413+
}
414+
415+
func (r *RabbitmqClusterReconciler) allReplicasReadyAndUpdated(ctx context.Context, rmq *rabbitmqv1beta1.RabbitmqCluster) (bool, error) {
416+
sts := &appsv1.StatefulSet{}
417+
418+
if err := r.Get(ctx, types.NamespacedName{Name: rmq.ChildResourceName("server"), Namespace: rmq.Namespace}, sts); err != nil {
419+
return false, client.IgnoreNotFound(err)
420+
}
421+
422+
desiredReplicas := *sts.Spec.Replicas
423+
if sts.Status.ReadyReplicas < desiredReplicas ||
424+
sts.Status.UpdatedReplicas < desiredReplicas { // StatefulSet rolling update is ongoing
425+
return false, nil
426+
}
427+
428+
return true, nil
429+
}
430+
431+
// Annotates the plugins ConfigMap if it was updated such that 'rabbitmq-plugins set' will be called on the RabbitMQ nodes at a later point in time
432+
func (r *RabbitmqClusterReconciler) annotatePluginsConfigMapIfUpdated(
433+
ctx context.Context,
434+
builder resource.ResourceBuilder,
435+
operationResult controllerutil.OperationResult,
436+
rmq *rabbitmqv1beta1.RabbitmqCluster) {
437+
438+
if _, ok := builder.(*resource.RabbitmqPluginsConfigMapBuilder); !ok {
439+
return
440+
}
441+
if operationResult != controllerutil.OperationResultUpdated {
442+
return
443+
}
444+
445+
if retryOnConflictErr := clientretry.RetryOnConflict(clientretry.DefaultRetry, func() error {
446+
configMap := corev1.ConfigMap{}
447+
if err := r.Get(ctx, types.NamespacedName{Namespace: rmq.Namespace, Name: rmq.ChildResourceName(resource.PluginsConfig)}, &configMap); err != nil {
448+
return client.IgnoreNotFound(err)
449+
}
450+
if configMap.Annotations == nil {
451+
configMap.Annotations = make(map[string]string)
452+
}
453+
configMap.Annotations[pluginsUpdateAnnotation] = time.Now().Format(time.RFC3339)
454+
return r.Update(ctx, &configMap)
455+
}); retryOnConflictErr != nil {
456+
msg := fmt.Sprintf("Failed to annotate ConfigMap %s of Namespace %s; enabled_plugins may be outdated", rmq.ChildResourceName(resource.PluginsConfig), rmq.Namespace)
457+
r.Log.Error(retryOnConflictErr, msg)
458+
r.Recorder.Event(rmq, corev1.EventTypeWarning, "FailedUpdate", msg)
459+
}
394460
}
395461

396462
func (r *RabbitmqClusterReconciler) exec(namespace, podName, containerName string, command ...string) (string, string, error) {

controllers/rabbitmqcluster_controller_test.go

Lines changed: 12 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -167,16 +167,16 @@ var _ = Describe("RabbitmqclusterController", func() {
167167
})
168168
By("recording SuccessfullCreate events for all child resources", func() {
169169
allEventMsgs := aggregateEventMsgs(ctx, rabbitmqCluster, "SuccessfulCreate")
170-
Expect(allEventMsgs).To(ContainSubstring(fmt.Sprintf("created resource %s of Type *v1.StatefulSet", rabbitmqCluster.ChildResourceName("server"))))
171-
Expect(allEventMsgs).To(ContainSubstring(fmt.Sprintf("created resource %s of Type *v1.Service", rabbitmqCluster.ChildResourceName("client"))))
172-
Expect(allEventMsgs).To(ContainSubstring(fmt.Sprintf("created resource %s of Type *v1.Service", rabbitmqCluster.ChildResourceName("headless"))))
173-
Expect(allEventMsgs).To(ContainSubstring(fmt.Sprintf("created resource %s of Type *v1.ConfigMap", rabbitmqCluster.ChildResourceName("plugins-conf"))))
174-
Expect(allEventMsgs).To(ContainSubstring(fmt.Sprintf("created resource %s of Type *v1.ConfigMap", rabbitmqCluster.ChildResourceName("server-conf"))))
175-
Expect(allEventMsgs).To(ContainSubstring(fmt.Sprintf("created resource %s of Type *v1.Secret", rabbitmqCluster.ChildResourceName("erlang-cookie"))))
176-
Expect(allEventMsgs).To(ContainSubstring(fmt.Sprintf("created resource %s of Type *v1.Secret", rabbitmqCluster.ChildResourceName("admin"))))
177-
Expect(allEventMsgs).To(ContainSubstring(fmt.Sprintf("created resource %s of Type *v1.ServiceAccount", rabbitmqCluster.ChildResourceName("server"))))
178-
Expect(allEventMsgs).To(ContainSubstring(fmt.Sprintf("created resource %s of Type *v1.Role", rabbitmqCluster.ChildResourceName("peer-discovery"))))
179-
Expect(allEventMsgs).To(ContainSubstring(fmt.Sprintf("created resource %s of Type *v1.RoleBinding", rabbitmqCluster.ChildResourceName("server"))))
170+
Expect(allEventMsgs).To(ContainSubstring("created resource %s of Type *v1.StatefulSet", rabbitmqCluster.ChildResourceName("server")))
171+
Expect(allEventMsgs).To(ContainSubstring("created resource %s of Type *v1.Service", rabbitmqCluster.ChildResourceName("client")))
172+
Expect(allEventMsgs).To(ContainSubstring("created resource %s of Type *v1.Service", rabbitmqCluster.ChildResourceName("headless")))
173+
Expect(allEventMsgs).To(ContainSubstring("created resource %s of Type *v1.ConfigMap", rabbitmqCluster.ChildResourceName("plugins-conf")))
174+
Expect(allEventMsgs).To(ContainSubstring("created resource %s of Type *v1.ConfigMap", rabbitmqCluster.ChildResourceName("server-conf")))
175+
Expect(allEventMsgs).To(ContainSubstring("created resource %s of Type *v1.Secret", rabbitmqCluster.ChildResourceName("erlang-cookie")))
176+
Expect(allEventMsgs).To(ContainSubstring("created resource %s of Type *v1.Secret", rabbitmqCluster.ChildResourceName("admin")))
177+
Expect(allEventMsgs).To(ContainSubstring("created resource %s of Type *v1.ServiceAccount", rabbitmqCluster.ChildResourceName("server")))
178+
Expect(allEventMsgs).To(ContainSubstring("created resource %s of Type *v1.Role", rabbitmqCluster.ChildResourceName("peer-discovery")))
179+
Expect(allEventMsgs).To(ContainSubstring("created resource %s of Type *v1.RoleBinding", rabbitmqCluster.ChildResourceName("server")))
180180
})
181181

182182
By("adding the deletion finalizer", func() {
@@ -665,7 +665,7 @@ var _ = Describe("RabbitmqclusterController", func() {
665665

666666
// verify that SuccessfulUpdate event is recorded for the client service
667667
Expect(aggregateEventMsgs(ctx, rabbitmqCluster, "SuccessfulUpdate")).To(
668-
ContainSubstring(fmt.Sprintf("updated resource %s of Type *v1.Service", rabbitmqCluster.ChildResourceName("client"))))
668+
ContainSubstring("updated resource %s of Type *v1.Service", rabbitmqCluster.ChildResourceName("client")))
669669
})
670670

671671
It("the CPU and memory requirements are updated", func() {
@@ -699,7 +699,7 @@ var _ = Describe("RabbitmqclusterController", func() {
699699

700700
// verify that SuccessfulUpdate event is recorded for the StatefulSet
701701
Expect(aggregateEventMsgs(ctx, rabbitmqCluster, "SuccessfulUpdate")).To(
702-
ContainSubstring(fmt.Sprintf("updated resource %s of Type *v1.StatefulSet", rabbitmqCluster.ChildResourceName("server"))))
702+
ContainSubstring("updated resource %s of Type *v1.StatefulSet", rabbitmqCluster.ChildResourceName("server")))
703703
})
704704

705705
It("the rabbitmq image is updated", func() {

go.sum

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -702,6 +702,7 @@ google.golang.org/grpc v1.21.1/go.mod h1:oYelfM1adQP15Ek0mdvEgi9Df8B9CZIaU1084ij
702702
google.golang.org/grpc v1.23.0/go.mod h1:Y5yQAOtifL1yxbo5wqy6BxZv8vAUGQwXBOALyacEbxg=
703703
google.golang.org/grpc v1.23.1 h1:q4XQuHFC6I28BKZpo6IYyb3mNO+l7lSOxRuYTCiDfXk=
704704
google.golang.org/grpc v1.23.1/go.mod h1:Y5yQAOtifL1yxbo5wqy6BxZv8vAUGQwXBOALyacEbxg=
705+
google.golang.org/grpc v1.26.0 h1:2dTRdpdFEEhJYQD8EMLB61nnrzSCTbG38PhqdhvOltg=
705706
google.golang.org/grpc v1.26.0/go.mod h1:qbnxyOmOxrQa7FizSgH+ReBfzJrCY1pSN7KXBS8abTk=
706707
google.golang.org/protobuf v0.0.0-20200109180630-ec00e32a8dfd/go.mod h1:DFci5gLYBciE7Vtevhsrf46CRTquxDuWsQurQQe4oz8=
707708
google.golang.org/protobuf v0.0.0-20200221191635-4d8936d0db64/go.mod h1:kwYJMbMJ01Woi6D6+Kah6886xMZcty6N08ah7+eCXa0=

internal/resource/rabbitmq_plugins.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@ var requiredPlugins = []string{
1616
"rabbitmq_management",
1717
}
1818

19-
const pluginsConfig = "plugins-conf"
19+
const PluginsConfig = "plugins-conf"
2020

2121
type RabbitmqPlugins struct {
2222
requiredPlugins []string
@@ -82,7 +82,7 @@ func (builder *RabbitmqPluginsConfigMapBuilder) Update(object runtime.Object) er
8282
func (builder *RabbitmqPluginsConfigMapBuilder) Build() (runtime.Object, error) {
8383
return &corev1.ConfigMap{
8484
ObjectMeta: metav1.ObjectMeta{
85-
Name: builder.Instance.ChildResourceName(pluginsConfig),
85+
Name: builder.Instance.ChildResourceName(PluginsConfig),
8686
Namespace: builder.Instance.Namespace,
8787
},
8888
Data: map[string]string{

internal/resource/statefulset.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -291,7 +291,7 @@ func (builder *StatefulSetBuilder) podTemplateSpec(annotations, labels map[strin
291291
VolumeSource: corev1.VolumeSource{
292292
ConfigMap: &corev1.ConfigMapVolumeSource{
293293
LocalObjectReference: corev1.LocalObjectReference{
294-
Name: builder.Instance.ChildResourceName(pluginsConfig),
294+
Name: builder.Instance.ChildResourceName(PluginsConfig),
295295
},
296296
},
297297
},

system_tests/system_tests.go

Lines changed: 22 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -11,13 +11,13 @@ package system_tests
1111

1212
import (
1313
"context"
14-
"time"
1514

1615
. "github.com/onsi/ginkgo"
1716
. "github.com/onsi/gomega"
1817
"gopkg.in/ini.v1"
1918

2019
rabbitmqv1beta1 "github.com/rabbitmq/cluster-operator/api/v1beta1"
20+
"github.com/rabbitmq/cluster-operator/internal/resource"
2121
corev1 "k8s.io/api/core/v1"
2222
k8sresource "k8s.io/apimachinery/pkg/api/resource"
2323
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
@@ -138,18 +138,27 @@ var _ = Describe("Operator", func() {
138138
cluster.Spec.Rabbitmq.AdditionalPlugins = []rabbitmqv1beta1.Plugin{"rabbitmq_top"}
139139
})).To(Succeed())
140140

141-
Eventually(func() error {
142-
_, err := kubectlExec(namespace,
143-
statefulSetPodName(cluster, 0),
144-
"rabbitmq-plugins",
145-
"is_enabled",
146-
"rabbitmq_management",
147-
"rabbitmq_peer_discovery_k8s",
148-
"rabbitmq_prometheus",
149-
"rabbitmq_top",
150-
)
151-
return err
152-
}, 360*time.Second).Should(Succeed())
141+
getConfigMapAnnotations := func() map[string]string {
142+
configMapName := cluster.ChildResourceName(resource.PluginsConfig)
143+
configMap, err := clientSet.CoreV1().ConfigMaps(cluster.Namespace).Get(ctx, configMapName, metav1.GetOptions{})
144+
Expect(err).ToNot(HaveOccurred())
145+
return configMap.Annotations
146+
}
147+
Eventually(getConfigMapAnnotations, 10, 0.5).Should(
148+
HaveKey("rabbitmq.com/pluginsUpdatedAt"), "plugins ConfigMap should have been annotated")
149+
Eventually(getConfigMapAnnotations, 60, 1).Should(
150+
Not(HaveKey("rabbitmq.com/pluginsUpdatedAt")), "plugins ConfigMap annotation should have been removed")
151+
152+
_, err := kubectlExec(namespace,
153+
statefulSetPodName(cluster, 0),
154+
"rabbitmq-plugins",
155+
"is_enabled",
156+
"rabbitmq_management",
157+
"rabbitmq_peer_discovery_k8s",
158+
"rabbitmq_prometheus",
159+
"rabbitmq_top",
160+
)
161+
Expect(err).ToNot(HaveOccurred())
153162
})
154163

155164
By("updating the rabbitmq.conf file when additionalConfig are modified", func() {

0 commit comments

Comments
 (0)