Skip to content

Commit 0e73117

Browse files
committed
PVC expansion
1 parent 994fca7 commit 0e73117

File tree

9 files changed

+262
-9
lines changed

9 files changed

+262
-9
lines changed

api/v1beta1/rabbitmqcluster_types.go

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@
99
package v1beta1
1010

1111
import (
12+
"strconv"
1213
"strings"
1314

1415
appsv1 "k8s.io/api/apps/v1"
@@ -364,6 +365,10 @@ func (cluster RabbitmqCluster) ChildResourceName(name string) string {
364365
return strings.TrimSuffix(strings.Join([]string{cluster.Name, name}, "-"), "-")
365366
}
366367

368+
func (cluster RabbitmqCluster) PVCName(i int) string {
369+
return strings.Join([]string{"persistence", cluster.Name, "server", strconv.Itoa(i)}, "-")
370+
}
371+
367372
func init() {
368373
SchemeBuilder.Register(&RabbitmqCluster{}, &RabbitmqClusterList{})
369374
}

api/v1beta1/rabbitmqcluster_types_test.go

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -456,6 +456,12 @@ var _ = Describe("RabbitmqCluster", func() {
456456
Expect(updatedCondition.LastTransitionTime.Before(&notExpectedTime)).To(BeFalse())
457457
})
458458
})
459+
Context("PVC Name helper function", func() {
460+
It("returns the correct PVC name", func() {
461+
r := generateRabbitmqClusterObject("testrabbit")
462+
Expect(r.PVCName(0)).To(Equal("persistence-testrabbit-server-0"))
463+
})
464+
})
459465
})
460466

461467
func getKey(cluster *RabbitmqCluster) types.NamespacedName {

config/rbac/role.yaml

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,16 @@ rules:
4040
- create
4141
- get
4242
- patch
43+
- apiGroups:
44+
- ""
45+
resources:
46+
- persistentvolumeclaims
47+
verbs:
48+
- create
49+
- get
50+
- list
51+
- update
52+
- watch
4353
- apiGroups:
4454
- ""
4555
resources:

controllers/rabbitmqcluster_controller.go

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -78,6 +78,7 @@ type RabbitmqClusterReconciler struct {
7878
// +kubebuilder:rbac:groups=rabbitmq.com,resources=rabbitmqclusters/finalizers,verbs=update
7979
// +kubebuilder:rbac:groups="",resources=events,verbs=get;create;patch
8080
// +kubebuilder:rbac:groups="",resources=serviceaccounts,verbs=get;list;watch;create;update
81+
// +kubebuilder:rbac:groups="",resources=persistentvolumeclaims,verbs=get;list;watch;create;update
8182
// +kubebuilder:rbac:groups="rbac.authorization.k8s.io",resources=roles,verbs=get;list;watch;create;update
8283
// +kubebuilder:rbac:groups="rbac.authorization.k8s.io",resources=rolebindings,verbs=get;list;watch;create;update
8384

@@ -160,6 +161,10 @@ func (r *RabbitmqClusterReconciler) Reconcile(ctx context.Context, req ctrl.Requ
160161
return ctrl.Result{}, err
161162
}
162163

164+
if err = r.reconcilePVC(ctx, builder, rabbitmqCluster, resource); err != nil {
165+
return ctrl.Result{}, err
166+
}
167+
163168
var operationResult controllerutil.OperationResult
164169
err = clientretry.RetryOnConflict(clientretry.DefaultRetry, func() error {
165170
var apiError error
Lines changed: 171 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,171 @@
1+
package controllers
2+
3+
import (
4+
"errors"
5+
"context"
6+
"fmt"
7+
"github.com/go-logr/logr"
8+
rabbitmqv1beta1 "github.com/rabbitmq/cluster-operator/api/v1beta1"
9+
"github.com/rabbitmq/cluster-operator/internal/resource"
10+
appsv1 "k8s.io/api/apps/v1"
11+
corev1 "k8s.io/api/core/v1"
12+
k8serrors "k8s.io/apimachinery/pkg/api/errors"
13+
k8sresource "k8s.io/apimachinery/pkg/api/resource"
14+
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
15+
"k8s.io/apimachinery/pkg/types"
16+
ctrl "sigs.k8s.io/controller-runtime"
17+
"sigs.k8s.io/controller-runtime/pkg/client"
18+
"time"
19+
)
20+
21+
22+
func (r *RabbitmqClusterReconciler) reconcilePVC(ctx context.Context, builder resource.ResourceBuilder, cluster *rabbitmqv1beta1.RabbitmqCluster, resource client.Object) error {
23+
logger := ctrl.LoggerFrom(ctx)
24+
25+
switch sts := resource.(type) {
26+
case *appsv1.StatefulSet:
27+
current, err := r.statefulSet(ctx, cluster)
28+
if client.IgnoreNotFound(err) != nil {
29+
return err
30+
} else if k8serrors.IsNotFound(err) {
31+
logger.Info("statefulSet not created yet, skipping checks to expand PersistentVolumeClaims")
32+
return nil
33+
}
34+
35+
if err := builder.Update(sts); err != nil {
36+
return err
37+
}
38+
39+
resize, err := r.needsPVCResize(current, sts)
40+
41+
if err != nil {
42+
return err
43+
}
44+
45+
if resize {
46+
if err := r.expandPVC(ctx, cluster, current, sts); err != nil {
47+
logger.Error(err, "Failed to expand PersistentVolumeClaims", "statefulSet", cluster.ChildResourceName("server"))
48+
return err
49+
}
50+
}
51+
}
52+
return nil
53+
}
54+
55+
func (r *RabbitmqClusterReconciler) expandPVC(ctx context.Context, rmq *rabbitmqv1beta1.RabbitmqCluster, current, desired *appsv1.StatefulSet) error {
56+
logger := ctrl.LoggerFrom(ctx)
57+
58+
currentCapacity, err := persistenceStorageCapacity(current.Spec.VolumeClaimTemplates)
59+
if err != nil {
60+
return err
61+
}
62+
63+
desiredCapacity, err := persistenceStorageCapacity(desired.Spec.VolumeClaimTemplates)
64+
if err != nil {
65+
return err
66+
}
67+
68+
logger.Info(fmt.Sprintf("updating storage capacity from %v to %v", currentCapacity, desiredCapacity))
69+
70+
if err := r.deleteSts(ctx, rmq); err != nil {
71+
return err
72+
}
73+
74+
if err := r.updatePVC(ctx, rmq, *current.Spec.Replicas, desiredCapacity); err != nil {
75+
return err
76+
}
77+
78+
return nil
79+
}
80+
81+
func (r *RabbitmqClusterReconciler) updatePVC(ctx context.Context, rmq *rabbitmqv1beta1.RabbitmqCluster, replicas int32, desiredCapacity k8sresource.Quantity) error {
82+
logger := ctrl.LoggerFrom(ctx)
83+
logger.Info("expanding PersistentVolumeClaims")
84+
85+
for i := 0; i < int(replicas); i++ {
86+
PVCName := rmq.PVCName(i)
87+
PVC := corev1.PersistentVolumeClaim{}
88+
89+
if err := r.Client.Get(ctx, types.NamespacedName{Namespace: rmq.Namespace, Name: PVCName}, &PVC); err != nil {
90+
logger.Error(err, "failed to get PersistentVolumeClaim")
91+
return err
92+
}
93+
PVC.Spec.Resources.Requests[corev1.ResourceStorage] = desiredCapacity
94+
if err := r.Client.Update(ctx, &PVC, &client.UpdateOptions{}); err != nil {
95+
logger.Error(err, "failed to update PersistentVolumeClaim")
96+
return err
97+
}
98+
logger.Info("successfully expanded", "PVC", PVCName)
99+
}
100+
return nil
101+
}
102+
103+
func (r *RabbitmqClusterReconciler) needsPVCResize(current, desired *appsv1.StatefulSet) (bool, error) {
104+
currentCapacity, err := persistenceStorageCapacity(current.Spec.VolumeClaimTemplates)
105+
if err != nil {
106+
return false, err
107+
}
108+
109+
desiredCapacity, err := persistenceStorageCapacity(desired.Spec.VolumeClaimTemplates)
110+
if err != nil {
111+
return false, err
112+
}
113+
114+
if currentCapacity.Cmp(desiredCapacity) != 0 {
115+
return true, nil
116+
}
117+
118+
return false, nil
119+
}
120+
121+
func persistenceStorageCapacity(templates []corev1.PersistentVolumeClaim) (k8sresource.Quantity, error) {
122+
for _, t := range templates {
123+
if t.Name == "persistence" {
124+
return t.Spec.Resources.Requests[corev1.ResourceStorage], nil
125+
}
126+
}
127+
return k8sresource.Quantity{}, errors.New("cannot find PersistentVolumeClaim 'persistence'")
128+
}
129+
130+
131+
// deleteSts deletes a sts without deleting pods and PVCs
132+
// using DeletePropagationPolicy set to 'Orphan'
133+
func (r *RabbitmqClusterReconciler) deleteSts(ctx context.Context, rmq *rabbitmqv1beta1.RabbitmqCluster) error {
134+
logger := ctrl.LoggerFrom(ctx)
135+
logger.Info("deleting statefulSet (pods won't be deleted)", "statefulSet", rmq.ChildResourceName("server"))
136+
deletePropagationPolicy := metav1.DeletePropagationOrphan
137+
deleteOptions := &client.DeleteOptions{PropagationPolicy: &deletePropagationPolicy}
138+
stsName := rmq.ChildResourceName("server")
139+
current, err := r.statefulSet(ctx, rmq)
140+
if err != nil {
141+
return err
142+
}
143+
if err := r.Delete(ctx, current, deleteOptions); err != nil {
144+
logger.Error(err, "failed to delete statefulSet", "statefulSet", stsName)
145+
return err
146+
}
147+
148+
if err := retryWithInterval(logger, "delete statefulSet", 10, 3*time.Second, func() bool {
149+
_, getErr := r.statefulSet(ctx, rmq)
150+
if k8serrors.IsNotFound(getErr) {
151+
return true
152+
}
153+
return false
154+
}); err != nil {
155+
logger.Error(err, "statefulSet not deleting after 50 seconds", "statefulSet", stsName)
156+
return err
157+
}
158+
logger.Info("statefulSet deleted", "statefulSet", stsName)
159+
return nil
160+
}
161+
162+
func retryWithInterval(logger logr.Logger, msg string, retry int, interval time.Duration, f func() bool) (err error) {
163+
for i := 0; i < retry; i++ {
164+
if ok := f(); ok {
165+
return
166+
}
167+
time.Sleep(interval)
168+
logger.Info("retrying again", "action", msg, "interval", interval, "attempt", i+1)
169+
}
170+
return fmt.Errorf("failed to %s after %d retries", msg, retry)
171+
}

controllers/utils.go

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2,15 +2,14 @@ package controllers
22

33
import (
44
"context"
5-
"sigs.k8s.io/controller-runtime/pkg/client"
6-
75
rabbitmqv1beta1 "github.com/rabbitmq/cluster-operator/api/v1beta1"
86
appsv1 "k8s.io/api/apps/v1"
97
corev1 "k8s.io/api/core/v1"
108
"k8s.io/apimachinery/pkg/api/errors"
119
"k8s.io/apimachinery/pkg/api/meta"
1210
"k8s.io/apimachinery/pkg/types"
1311
"k8s.io/client-go/util/retry"
12+
"sigs.k8s.io/controller-runtime/pkg/client"
1413
)
1514

1615
func (r *RabbitmqClusterReconciler) exec(namespace, podName, containerName string, command ...string) (string, string, error) {

system_tests/system_tests.go

Lines changed: 45 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,13 @@ import (
1414
"crypto/tls"
1515
"crypto/x509"
1616
"encoding/json"
17+
"fmt"
1718
"io/ioutil"
19+
storagev1 "k8s.io/api/storage/v1"
20+
k8sresource "k8s.io/apimachinery/pkg/api/resource"
21+
"k8s.io/apimachinery/pkg/types"
22+
"strconv"
23+
"strings"
1824

1925
. "github.com/onsi/ginkgo"
2026
. "github.com/onsi/gomega"
@@ -235,6 +241,9 @@ CONSOLE_LOG=new`
235241

236242
BeforeEach(func() {
237243
cluster = newRabbitmqCluster(namespace, "persistence-rabbit")
244+
cluster.Spec.Persistence = rabbitmqv1beta1.RabbitmqClusterPersistenceSpec{
245+
StorageClassName: pointer.StringPtr("persistent-test"),
246+
}
238247
Expect(createRabbitmqCluster(ctx, rmqClusterClient, cluster)).To(Succeed())
239248

240249
waitForRabbitmqRunning(cluster)
@@ -278,6 +287,42 @@ CONSOLE_LOG=new`
278287
Expect(pvc.OwnerReferences).To(HaveLen(1))
279288
Expect(pvc.OwnerReferences[0].Name).To(Equal(cluster.Name))
280289
})
290+
291+
By("allowing volume expansion", func() {
292+
podUID := pod(ctx, clientSet, cluster, 0).UID
293+
output, err := kubectlExec(namespace, statefulSetPodName(cluster, 0), "df", "/var/lib/rabbitmq/mnesia")
294+
Expect(err).ToNot(HaveOccurred())
295+
previousDiskSize, err := strconv.Atoi(strings.Fields(strings.Split(string(output), "\n")[1])[1])
296+
297+
storageClass := &storagev1.StorageClass{}
298+
Expect(rmqClusterClient.Get(ctx, types.NamespacedName{Name: storageClassName, Namespace: namespace}, storageClass)).To(Succeed())
299+
Expect(*storageClass.AllowVolumeExpansion).To(BeTrue(), fmt.Sprintf(" 'AllowVolumeExpansion' set to false for storage class %s", storageClassName))
300+
301+
newCapacity, _ := k8sresource.ParseQuantity("12Gi")
302+
Expect(updateRabbitmqCluster(ctx, rmqClusterClient, cluster.Name, cluster.Namespace, func(cluster *rabbitmqv1beta1.RabbitmqCluster) {
303+
cluster.Spec.Persistence.Storage = &newCapacity
304+
})).To(Succeed())
305+
306+
// PVC storage capacity updated
307+
Eventually(func() k8sresource.Quantity {
308+
pvcName := cluster.PVCName(0)
309+
pvc, err := clientSet.CoreV1().PersistentVolumeClaims(namespace).Get(ctx, pvcName, metav1.GetOptions{})
310+
Expect(err).ToNot(HaveOccurred())
311+
return pvc.Spec.Resources.Requests["storage"]
312+
}, 120, 5).Should(Equal(newCapacity))
313+
314+
// storage capacity reflected in the pod
315+
Eventually(func() int {
316+
output, err = kubectlExec(namespace, statefulSetPodName(cluster, 0), "df", "/var/lib/rabbitmq/mnesia")
317+
Expect(err).ToNot(HaveOccurred())
318+
updatedDiskSize, err := strconv.Atoi(strings.Fields(strings.Split(string(output), "\n")[1])[1])
319+
Expect(err).ToNot(HaveOccurred())
320+
return updatedDiskSize
321+
}, 120, 5).Should(BeNumerically(">", previousDiskSize))
322+
323+
// pod was not recreated
324+
Expect(pod(ctx, clientSet, cluster, 0).UID).To(Equal(podUID))
325+
})
281326
})
282327
})
283328

system_tests/system_tests_suite_test.go

Lines changed: 8 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@ package system_tests
1111

1212
import (
1313
"context"
14+
"k8s.io/utils/pointer"
1415
"testing"
1516

1617
storagev1 "k8s.io/api/storage/v1"
@@ -33,10 +34,10 @@ func TestSystemTests(t *testing.T) {
3334
}
3435

3536
var (
36-
rmqClusterClient client.Client
37-
clientSet *kubernetes.Clientset
38-
namespace string
39-
specifiedStorageClassName = "persistent-test"
37+
rmqClusterClient client.Client
38+
clientSet *kubernetes.Clientset
39+
namespace string
40+
storageClassName = "persistent-test"
4041
)
4142

4243
var _ = BeforeSuite(func() {
@@ -49,17 +50,17 @@ var _ = BeforeSuite(func() {
4950

5051
rmqClusterClient, err = client.New(restConfig, client.Options{Scheme: scheme})
5152
Expect(err).NotTo(HaveOccurred())
52-
5353
clientSet, err = createClientSet()
5454
Expect(err).NotTo(HaveOccurred())
5555

5656
namespace = MustHaveEnv("NAMESPACE")
5757

5858
storageClass := &storagev1.StorageClass{
5959
ObjectMeta: metav1.ObjectMeta{
60-
Name: specifiedStorageClassName,
60+
Name: storageClassName,
6161
},
62-
Provisioner: "kubernetes.io/gce-pd",
62+
Provisioner: "kubernetes.io/gce-pd",
63+
AllowVolumeExpansion: pointer.BoolPtr(true),
6364
}
6465
err = rmqClusterClient.Create(context.TODO(), storageClass)
6566
if !apierrors.IsAlreadyExists(err) {

system_tests/utils.go

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -950,3 +950,14 @@ func publishAndConsumeSTOMPMsg(hostname, stompNodePort, username, password strin
950950
ExpectWithOffset(1, sub.Unsubscribe()).To(Succeed())
951951
ExpectWithOffset(1, conn.Disconnect()).To(Succeed())
952952
}
953+
954+
func pod(ctx context.Context, clientSet *kubernetes.Clientset, r *rabbitmqv1beta1.RabbitmqCluster, i int) *corev1.Pod {
955+
podName := statefulSetPodName(r, i)
956+
var pod *corev1.Pod
957+
EventuallyWithOffset(1, func() error {
958+
var err error
959+
pod, err = clientSet.CoreV1().Pods(r.Namespace).Get(ctx, podName, metav1.GetOptions{})
960+
return err
961+
}, 10).Should(Succeed())
962+
return pod
963+
}

0 commit comments

Comments
 (0)