Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 18 additions & 4 deletions controllers/rabbitmqcluster_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -201,9 +201,23 @@ func (r *RabbitmqClusterReconciler) Reconcile(ctx context.Context, req ctrl.Requ
if err := builder.Update(sts); err != nil {
return ctrl.Result{}, err
}
if r.scaleDown(ctx, rabbitmqCluster, current, sts) {
// return when cluster scale down detected; unsupported operation
return ctrl.Result{}, nil
if ScaleToZero(current, sts) {
err := r.saveReplicasBeforeZero(ctx, rabbitmqCluster, current)
if err != nil {
return ctrl.Result{}, err
}
} else {
if r.scaleDown(ctx, rabbitmqCluster, current, sts) {
// return when cluster scale down detected; unsupported operation
return ctrl.Result{}, nil
}
}
if ScaleFromZero(current, sts) {
if r.scaleFromZeroToBeforeReplicasConfigured(ctx, rabbitmqCluster, sts) {
// return when cluster scale down from zero detected; unsupported operation
return ctrl.Result{}, nil
}
r.removeReplicasBeforeZeroAnnotationIfExists(ctx, rabbitmqCluster)
}
}

Expand All @@ -212,8 +226,8 @@ func (r *RabbitmqClusterReconciler) Reconcile(ctx context.Context, req ctrl.Requ
r.setReconcileSuccess(ctx, rabbitmqCluster, corev1.ConditionFalse, "FailedReconcilePVC", err.Error())
return ctrl.Result{}, err
}
}

}
var operationResult controllerutil.OperationResult
err = clientretry.RetryOnConflict(clientretry.DefaultRetry, func() error {
var apiError error
Expand Down
92 changes: 92 additions & 0 deletions controllers/reconcile_scale_zero.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,92 @@
package controllers

import (
"context"
"errors"
"fmt"
"strconv"

ctrl "sigs.k8s.io/controller-runtime"

"github.com/rabbitmq/cluster-operator/v2/api/v1beta1"
"github.com/rabbitmq/cluster-operator/v2/internal/status"
appsv1 "k8s.io/api/apps/v1"
corev1 "k8s.io/api/core/v1"
)

const beforeZeroReplicasConfigured = "rabbitmq.com/before-zero-replicas-configured"

// ScaleToZero checks if the desired replicas is zero and the current replicas is not zero.
func ScaleToZero(current, sts *appsv1.StatefulSet) bool {
currentReplicas := *current.Spec.Replicas
desiredReplicas := *sts.Spec.Replicas
return desiredReplicas == 0 && currentReplicas > 0
}

// ScaleFromZero checks if the current replicas is zero and the desired replicas is greater than zero.
func ScaleFromZero(current, sts *appsv1.StatefulSet) bool {
currentReplicas := *current.Spec.Replicas
desiredReplicas := *sts.Spec.Replicas
return currentReplicas == 0 && desiredReplicas > 0
}

// scaleDownFromZero checks if the current replicas is desired replicas would be greatter than replicas configured before zero state.
func (r *RabbitmqClusterReconciler) scaleFromZeroToBeforeReplicasConfigured(ctx context.Context, cluster *v1beta1.RabbitmqCluster, sts *appsv1.StatefulSet) bool {
logger := ctrl.LoggerFrom(ctx)
var err error
var beforeZeroReplicas int64
desiredReplicas := *sts.Spec.Replicas
annotationValue, ok := cluster.Annotations[beforeZeroReplicasConfigured]
if !ok {
return false
}

beforeZeroReplicas, err = strconv.ParseInt(annotationValue, 10, 32)
if err != nil {
msg := "Failed to convert string to integer for before-zero-replicas-configuration annotation"
reason := "TransformErrorOperation"
logger.Error(errors.New(reason), msg)
err = r.recordEventsAndSetCondition(ctx, cluster, status.ReconcileSuccess, corev1.ConditionFalse, corev1.EventTypeWarning, reason, msg)
if err != nil {
logger.V(1).Info(err.Error())
}
return true
}
if desiredReplicas != int32(beforeZeroReplicas) {
msg := fmt.Sprintf("Unsupported operation; when scaling from zero, you can only restore the previous number of replicas (%d)", int32(beforeZeroReplicas))
reason := "UnsupportedOperation"
logger.Error(errors.New(reason), msg)
err = r.recordEventsAndSetCondition(ctx, cluster, status.ReconcileSuccess, corev1.ConditionFalse, corev1.EventTypeWarning, reason, msg)
if err != nil {
logger.V(1).Info(err.Error())
}
return true
}
return false

}

// saveReplicasBeforeZero saves the current replicas count in an annotation before scaling down to zero.
// This is used to prevent scaling down when the cluster change from zero replicas to a number less than the saved replicas count.
func (r *RabbitmqClusterReconciler) saveReplicasBeforeZero(ctx context.Context, cluster *v1beta1.RabbitmqCluster, current *appsv1.StatefulSet) error {
currentReplicas := *current.Spec.Replicas
logger := ctrl.LoggerFrom(ctx)
msg := "Cluster Scale down to 0 replicas"
reason := "ScaleDownToZero"
logger.Info(msg)
r.Recorder.Event(cluster, corev1.EventTypeNormal, reason, msg)
return r.updateAnnotation(ctx, cluster, cluster.Namespace, cluster.Name, beforeZeroReplicasConfigured, fmt.Sprint(currentReplicas))
}

// If the annotation rabbitmq.com/before-zero-replicas-configured exists it will be deleted.
func (r *RabbitmqClusterReconciler) removeReplicasBeforeZeroAnnotationIfExists(ctx context.Context, cluster *v1beta1.RabbitmqCluster) {
if _, ok := cluster.Annotations[beforeZeroReplicasConfigured]; ok {
r.deleteAnnotation(ctx, cluster, beforeZeroReplicasConfigured)
}
}

func (r *RabbitmqClusterReconciler) recordEventsAndSetCondition(ctx context.Context, cluster *v1beta1.RabbitmqCluster, condType status.RabbitmqClusterConditionType, condStatus corev1.ConditionStatus, eventType, reason, msg string) error {
r.Recorder.Event(cluster, eventType, reason, msg)
cluster.Status.SetCondition(condType, condStatus, reason, msg)
return r.Status().Update(ctx, cluster)
}
231 changes: 231 additions & 0 deletions controllers/reconcile_scale_zero_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,231 @@
package controllers_test

import (
"context"
"fmt"

. "github.com/onsi/ginkgo/v2"
. "github.com/onsi/gomega"
rabbitmqv1beta1 "github.com/rabbitmq/cluster-operator/v2/api/v1beta1"
"github.com/rabbitmq/cluster-operator/v2/internal/status"
apierrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
"k8s.io/utils/ptr"
runtimeClient "sigs.k8s.io/controller-runtime/pkg/client"
)

var _ = Describe("Cluster scale to zero", func() {
var (
cluster *rabbitmqv1beta1.RabbitmqCluster
defaultNamespace = "default"
ctx = context.Background()
)

AfterEach(func() {
Expect(client.Delete(ctx, cluster)).To(Succeed())
waitForClusterDeletion(ctx, cluster, client)
Eventually(func() bool {
rmq := &rabbitmqv1beta1.RabbitmqCluster{}
err := client.Get(ctx, types.NamespacedName{Name: cluster.Name, Namespace: cluster.Namespace}, rmq)
return apierrors.IsNotFound(err)
}).Should(BeTrue())
})

It("scale to zero", func() {
By("update statefulSet replicas to zero", func() {
cluster = &rabbitmqv1beta1.RabbitmqCluster{
ObjectMeta: metav1.ObjectMeta{
Name: "rabbitmq-to-zero",
Namespace: defaultNamespace,
},
Spec: rabbitmqv1beta1.RabbitmqClusterSpec{
Replicas: ptr.To(int32(2)),
},
}
Expect(client.Create(ctx, cluster)).To(Succeed())
waitForClusterCreation(ctx, cluster, client)

Expect(updateWithRetry(cluster, func(r *rabbitmqv1beta1.RabbitmqCluster) {
r.Spec.Replicas = ptr.To(int32(0))
})).To(Succeed())

Eventually(func() int32 {
sts, err := clientSet.AppsV1().StatefulSets(defaultNamespace).Get(ctx, cluster.ChildResourceName("server"), metav1.GetOptions{})
Expect(err).NotTo(HaveOccurred())
return *sts.Spec.Replicas
}, 10, 1).Should(Equal(int32(0)))

})

By("setting ReconcileSuccess to 'true'", func() {
Eventually(func() string {
rabbit := &rabbitmqv1beta1.RabbitmqCluster{}
Expect(client.Get(ctx, runtimeClient.ObjectKey{
Name: cluster.Name,
Namespace: defaultNamespace,
}, rabbit)).To(Succeed())

for i := range rabbit.Status.Conditions {
if rabbit.Status.Conditions[i].Type == status.ReconcileSuccess {
return fmt.Sprintf(
"ReconcileSuccess status: %s, with reason: %s and message: %s",
rabbit.Status.Conditions[i].Status,
rabbit.Status.Conditions[i].Reason,
rabbit.Status.Conditions[i].Message)
}
}
return "ReconcileSuccess status: condition not present"
}, 0).Should(Equal("ReconcileSuccess status: True, " +
"with reason: Success " +
"and message: Finish reconciling"))
})
})
})

var _ = Describe("Cluster scale from zero", func() {
var (
cluster *rabbitmqv1beta1.RabbitmqCluster
defaultNamespace = "default"
ctx = context.Background()
)

AfterEach(func() {
Expect(client.Delete(ctx, cluster)).To(Succeed())
waitForClusterDeletion(ctx, cluster, client)
Eventually(func() bool {
rmq := &rabbitmqv1beta1.RabbitmqCluster{}
err := client.Get(ctx, types.NamespacedName{Name: cluster.Name, Namespace: cluster.Namespace}, rmq)
return apierrors.IsNotFound(err)
}).Should(BeTrue())
})

It("scale from zero", func() {
By("update statefulSet replicas from zero", func() {
cluster = &rabbitmqv1beta1.RabbitmqCluster{
ObjectMeta: metav1.ObjectMeta{
Name: "rabbitmq-from-zero",
Namespace: defaultNamespace,
Annotations: map[string]string{
"rabbitmq.com/before-zero-replicas-configured": "2",
},
},
Spec: rabbitmqv1beta1.RabbitmqClusterSpec{
Replicas: ptr.To(int32(0)),
},
}
Expect(client.Create(ctx, cluster)).To(Succeed())
waitForClusterCreation(ctx, cluster, client)

Expect(updateWithRetry(cluster, func(r *rabbitmqv1beta1.RabbitmqCluster) {
r.Spec.Replicas = ptr.To(int32(2))
})).To(Succeed())

Eventually(func() int32 {
sts, err := clientSet.AppsV1().StatefulSets(defaultNamespace).Get(ctx, cluster.ChildResourceName("server"), metav1.GetOptions{})
Expect(err).NotTo(HaveOccurred())
return *sts.Spec.Replicas
}, 10, 1).Should(Equal(int32(2)))

})

By("setting ReconcileSuccess to 'true'", func() {
Eventually(func() string {
rabbit := &rabbitmqv1beta1.RabbitmqCluster{}
Expect(client.Get(ctx, runtimeClient.ObjectKey{
Name: cluster.Name,
Namespace: defaultNamespace,
}, rabbit)).To(Succeed())

for i := range rabbit.Status.Conditions {
if rabbit.Status.Conditions[i].Type == status.ReconcileSuccess {
return fmt.Sprintf(
"ReconcileSuccess status: %s, with reason: %s and message: %s",
rabbit.Status.Conditions[i].Status,
rabbit.Status.Conditions[i].Reason,
rabbit.Status.Conditions[i].Message)
}
}
return "ReconcileSuccess status: condition not present"
}, 0).Should(Equal("ReconcileSuccess status: True, " +
"with reason: Success " +
"and message: Finish reconciling"))
})
})
})

var _ = Describe("Cluster scale from zero to less replicas configured", Ordered, func() {
var (
cluster *rabbitmqv1beta1.RabbitmqCluster
defaultNamespace = "default"
ctx = context.Background()
)

AfterEach(func() {
Expect(client.Delete(ctx, cluster)).To(Succeed())
waitForClusterDeletion(ctx, cluster, client)
Eventually(func() bool {
rmq := &rabbitmqv1beta1.RabbitmqCluster{}
err := client.Get(ctx, types.NamespacedName{Name: cluster.Name, Namespace: cluster.Namespace}, rmq)
return apierrors.IsNotFound(err)
}).Should(BeTrue())
})

It("scale from zero to less replicas", func() {
By("update statefulSet replicas from zero", func() {
cluster = &rabbitmqv1beta1.RabbitmqCluster{
ObjectMeta: metav1.ObjectMeta{
Name: "rabbitmq-from-zero-to-less",
Namespace: defaultNamespace,
Annotations: map[string]string{
"rabbitmq.com/before-zero-replicas-configured": "2",
},
},
Spec: rabbitmqv1beta1.RabbitmqClusterSpec{
Replicas: ptr.To(int32(0)),
},
}
Expect(client.Create(ctx, cluster)).To(Succeed())
waitForClusterCreation(ctx, cluster, client)

Expect(updateWithRetry(cluster, func(r *rabbitmqv1beta1.RabbitmqCluster) {
r.Spec.Replicas = ptr.To(int32(1))
})).To(Succeed())

Consistently(func() int32 {
sts, err := clientSet.AppsV1().StatefulSets(defaultNamespace).Get(ctx, cluster.ChildResourceName("server"), metav1.GetOptions{})
Expect(err).NotTo(HaveOccurred())
return *sts.Spec.Replicas
}, 10, 1).Should(Equal(int32(0)))

})

By("setting 'Warning' events", func() {
Expect(aggregateEventMsgs(ctx, cluster, "UnsupportedOperation")).To(
ContainSubstring("Unsupported operation"))
})

By("setting ReconcileSuccess to 'false'", func() {
Eventually(func() string {
rabbit := &rabbitmqv1beta1.RabbitmqCluster{}
Expect(client.Get(ctx, runtimeClient.ObjectKey{
Name: cluster.Name,
Namespace: defaultNamespace,
}, rabbit)).To(Succeed())

for i := range rabbit.Status.Conditions {
if rabbit.Status.Conditions[i].Type == status.ReconcileSuccess {
return fmt.Sprintf(
"ReconcileSuccess status: %s, with reason: %s and message: %s",
rabbit.Status.Conditions[i].Status,
rabbit.Status.Conditions[i].Reason,
rabbit.Status.Conditions[i].Message)
}
}
return "ReconcileSuccess status: condition not present"
}, 0).Should(Equal("ReconcileSuccess status: False, " +
"with reason: UnsupportedOperation " +
"and message: Unsupported operation; when scaling from zero, you can only restore the previous number of replicas (2)"))
})
})
})
7 changes: 7 additions & 0 deletions internal/status/all_replicas_ready.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,13 @@ func AllReplicasReadyCondition(resources []runtime.Object,
if resource.Spec.Replicas != nil {
desiredReplicas = *resource.Spec.Replicas
}

if desiredReplicas == 0 {
condition.Status = corev1.ConditionFalse
condition.Reason = "ScaledToZero"
goto assignLastTransitionTime
}

if desiredReplicas == resource.Status.ReadyReplicas {
condition.Status = corev1.ConditionTrue
condition.Reason = "AllPodsAreReady"
Expand Down