controllers: remove reference to allowremotestorageconsumer
This commit does the following:
1. remove the AllowRemoteStorageConsumers references from everywhere
2. remove the ocs.openshift.io/deployment-mode annotation check
   from the CephBlockPool reconcile, since mirroring will be handled
   by the mirroring controller
3. update the tests

Signed-off-by: Rewant Soni <[email protected]>
rewantsoni committed Feb 6, 2025
1 parent e098999 commit 2f49263
Showing 18 changed files with 47 additions and 304 deletions.
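
For orientation before the per-file hunks: the first hunk below changes ensureOcsOperatorConfigExists so that the DisableCSIDriverKey entry of the ocs-operator config map is no longer derived from whether any internal StorageCluster sets AllowRemoteStorageConsumers; it is now always "true". A minimal, runnable sketch of that before/after logic, using stand-in types and sample values rather than the actual controller code:

package main

import (
	"fmt"
	"slices"
	"strconv"
)

// Stand-in for ocsv1.StorageCluster; only the field relevant to the removed logic.
type storageCluster struct {
	AllowRemoteStorageConsumers bool
}

func main() {
	// Stand-in for r.clusters.GetInternalStorageClusters().
	internal := []storageCluster{{AllowRemoteStorageConsumers: false}}

	// Before this commit: the config-map value tracked AllowRemoteStorageConsumers.
	allowConsumers := slices.ContainsFunc(internal, func(sc storageCluster) bool {
		return sc.AllowRemoteStorageConsumers
	})
	fmt.Println("before:", strconv.FormatBool(allowConsumers)) // "false" for this sample input

	// After this commit: the config-map entry is unconditionally "true".
	fmt.Println("after:", strconv.FormatBool(true))
}
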
@@ -4,7 +4,6 @@ import (
"context"
"fmt"
"reflect"
"slices"
"strconv"
"strings"

@@ -502,10 +501,6 @@ func (r *OCSInitializationReconciler) getCsiTolerations(csiTolerationKey string)
// When any value in the configmap is updated, the rook-ceph-operator pod is restarted to pick up the new values.
func (r *OCSInitializationReconciler) ensureOcsOperatorConfigExists(initialData *ocsv1.OCSInitialization) error {

allowConsumers := slices.ContainsFunc(r.clusters.GetInternalStorageClusters(), func(sc ocsv1.StorageCluster) bool {
return sc.Spec.AllowRemoteStorageConsumers
})

enableCephfsVal, err := r.getEnableCephfsKeyValue()
if err != nil {
r.Log.Error(err, "Failed to get enableCephfsKeyValue")
@@ -519,7 +514,7 @@ func (r *OCSInitializationReconciler) ensureOcsOperatorConfigExists(initialData
util.TopologyDomainLabelsKey: r.getTopologyDomainLabelsKeyValue(),
util.EnableNFSKey: r.getEnableNFSKeyValue(),
util.EnableCephfsKey: enableCephfsVal,
util.DisableCSIDriverKey: strconv.FormatBool(allowConsumers),
util.DisableCSIDriverKey: strconv.FormatBool(true),
}

ocsOperatorConfig := &corev1.ConfigMap{
47 changes: 0 additions & 47 deletions controllers/storagecluster/cephblockpools.go
@@ -18,29 +18,6 @@ import (

type ocsCephBlockPools struct{}

// ensures that peer cluster secret exists and adds it to CephBlockPool
func (o *ocsCephBlockPools) addPeerSecretsToCephBlockPool(r *StorageClusterReconciler, storageCluster *ocsv1.StorageCluster, poolName, poolNamespace string) *cephv1.MirroringPeerSpec {
mirroringPeerSpec := &cephv1.MirroringPeerSpec{}
secretNames := []string{}

if len(storageCluster.Spec.Mirroring.PeerSecretNames) == 0 {
err := fmt.Errorf("mirroring is enabled but peerSecretNames is not provided")
r.Log.Error(err, "Unable to add cluster peer token to CephBlockPool.", "CephBlockPool", klog.KRef(poolNamespace, poolName))
return mirroringPeerSpec
}
for _, secretName := range storageCluster.Spec.Mirroring.PeerSecretNames {
_, err := r.retrieveSecret(secretName, storageCluster)
if err != nil {
r.Log.Error(err, "Peer cluster token could not be retrieved using secretname.", "CephBlockPool", klog.KRef(poolNamespace, poolName))
return mirroringPeerSpec
}
secretNames = append(secretNames, secretName)
}

mirroringPeerSpec.SecretNames = secretNames
return mirroringPeerSpec
}

func (o *ocsCephBlockPools) deleteCephBlockPool(r *StorageClusterReconciler, cephBlockPool *cephv1.CephBlockPool) (reconcile.Result, error) {
// if deletion timestamp is set, wait till block pool is deleted
if cephBlockPool.DeletionTimestamp != nil {
@@ -93,18 +70,6 @@ func (o *ocsCephBlockPools) reconcileCephBlockPool(r *StorageClusterReconciler,
cephBlockPool.Spec.PoolSpec.FailureDomain = getFailureDomain(storageCluster)
cephBlockPool.Spec.PoolSpec.Replicated = generateCephReplicatedSpec(storageCluster, "data")
cephBlockPool.Spec.PoolSpec.EnableRBDStats = true

// Since provider mode handles mirroring, we only need to handle for internal mode
if storageCluster.Annotations["ocs.openshift.io/deployment-mode"] != "provider" {
if storageCluster.Spec.Mirroring != nil && storageCluster.Spec.Mirroring.Enabled {
cephBlockPool.Spec.PoolSpec.Mirroring.Enabled = true
cephBlockPool.Spec.PoolSpec.Mirroring.Mode = "image"
cephBlockPool.Spec.PoolSpec.Mirroring.Peers = o.addPeerSecretsToCephBlockPool(r, storageCluster, cephBlockPool.Name, cephBlockPool.Namespace)
} else {
// If mirroring is not enabled or is nil, disable it. This is to ensure that the pool mirroring does not remain enabled during further reconciliations
cephBlockPool.Spec.PoolSpec.Mirroring = cephv1.MirroringSpec{Enabled: false}
}
}
return controllerutil.SetControllerReference(storageCluster, cephBlockPool, r.Scheme)
})
if err != nil {
@@ -258,18 +223,6 @@ func (o *ocsCephBlockPools) reconcileNonResilientCephBlockPool(r *StorageCluster
RequireSafeReplicaSize: false,
}
cephBlockPool.Spec.PoolSpec.EnableRBDStats = true

// Since provider mode handles mirroring, we only need to handle for internal mode
if storageCluster.Annotations["ocs.openshift.io/deployment-mode"] != "provider" {
if storageCluster.Spec.Mirroring != nil && storageCluster.Spec.Mirroring.Enabled {
cephBlockPool.Spec.PoolSpec.Mirroring.Enabled = true
cephBlockPool.Spec.PoolSpec.Mirroring.Mode = "image"
cephBlockPool.Spec.PoolSpec.Mirroring.Peers = o.addPeerSecretsToCephBlockPool(r, storageCluster, cephBlockPool.Name, cephBlockPool.Namespace)
} else {
// If mirroring is not enabled or is nil, disable it. This is to ensure that the pool mirroring does not remain enabled during further reconciliations
cephBlockPool.Spec.PoolSpec.Mirroring = cephv1.MirroringSpec{Enabled: false}
}
}
return controllerutil.SetControllerReference(storageCluster, cephBlockPool, r.Scheme)
})
if err != nil {
61 changes: 0 additions & 61 deletions controllers/storagecluster/cephblockpools_test.go
@@ -10,7 +10,6 @@ import (
"github.com/stretchr/testify/assert"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/reconcile"

@@ -44,66 +43,6 @@ func TestCephBlockPools(t *testing.T) {
}
}

func TestInjectingPeerTokenToCephBlockPool(t *testing.T) {
//cases for testing
var cases = []struct {
label string
createRuntimeObjects bool
spec *api.StorageClusterSpec
}{
{
label: "test-injecting-peer-token-to-cephblockpool",
createRuntimeObjects: false,
spec: &api.StorageClusterSpec{
Mirroring: &api.MirroringSpec{
Enabled: true,
PeerSecretNames: []string{testPeerSecretName},
},
},
},
{
label: "test-injecting-empty-peer-token-to-cephblockpool",
createRuntimeObjects: false,
spec: &api.StorageClusterSpec{
Mirroring: &api.MirroringSpec{
Enabled: true,
PeerSecretNames: []string{},
},
},
},
{
label: "test-injecting-invalid-peer-token-cephblockpool",
createRuntimeObjects: false,
spec: &api.StorageClusterSpec{
Mirroring: &api.MirroringSpec{
Enabled: true,
PeerSecretNames: []string{"wrong-secret-name"},
},
},
},
}

obj := &ocsCephBlockPools{}

for _, c := range cases {
cr := getInitData(c.spec)
request := reconcile.Request{
NamespacedName: types.NamespacedName{
Name: "ocsinit",
Namespace: "",
},
}
reconciler := createReconcilerFromCustomResources(t, cr)
_, err := obj.ensureCreated(&reconciler, cr)
assert.NoError(t, err)
if c.label == "test-injecting-peer-token-to-cephblockpool" {
assertCephBlockPools(t, reconciler, cr, request, true, true)
} else {
assertCephBlockPools(t, reconciler, cr, request, true, false)
}
}
}

func getInitData(customSpec *api.StorageClusterSpec) *api.StorageCluster {
cr := createDefaultStorageCluster()
if customSpec != nil {
94 changes: 0 additions & 94 deletions controllers/storagecluster/cephfilesystem.go
@@ -11,15 +11,12 @@ import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
"k8s.io/klog/v2"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/controller/controllerutil"
"sigs.k8s.io/controller-runtime/pkg/reconcile"
)

type ocsCephFilesystems struct{}

const defaultSubvolumeGroupName = "csi"

// newCephFilesystemInstances returns the cephFilesystem instances that should be created
// on first run.
func (r *StorageClusterReconciler) newCephFilesystemInstances(initStorageCluster *ocsv1.StorageCluster) ([]*cephv1.CephFilesystem, error) {
@@ -136,92 +133,11 @@ func (obj *ocsCephFilesystems) ensureCreated(r *StorageClusterReconciler, instan
return reconcile.Result{}, err
}
}
// create default csi subvolumegroup for the filesystem
// skip for the ocs provider mode
if !instance.Spec.AllowRemoteStorageConsumers {
err = r.createDefaultSubvolumeGroup(cephFilesystem.Name, cephFilesystem.Namespace, cephFilesystem.ObjectMeta.OwnerReferences)
if err != nil {
return reconcile.Result{}, err
}
}
}

return reconcile.Result{}, nil
}

func (r *StorageClusterReconciler) createDefaultSubvolumeGroup(filesystemName, filesystemNamespace string, ownerReferences []metav1.OwnerReference) error {

existingsvg := &cephv1.CephFilesystemSubVolumeGroup{}
svgName := generateNameForCephSubvolumeGroup(filesystemName)
err := r.Client.Get(r.ctx, types.NamespacedName{Name: svgName, Namespace: filesystemNamespace}, existingsvg)
if err == nil {
if existingsvg.DeletionTimestamp != nil {
r.Log.Info("Unable to restore subvolumegroup because it is marked for deletion.", "subvolumegroup", klog.KRef(filesystemNamespace, existingsvg.Name))
return fmt.Errorf("failed to restore subvolumegroup %s because it is marked for deletion", existingsvg.Name)
}
}

cephFilesystemSubVolumeGroup := &cephv1.CephFilesystemSubVolumeGroup{
ObjectMeta: metav1.ObjectMeta{
Name: svgName,
Namespace: filesystemNamespace,
OwnerReferences: ownerReferences,
},
}

// Default value of "distributed" option for pinning in the CephFilesystemSubVolumeGroup CR
defaultPinningValue := 1
mutateFn := func() error {
cephFilesystemSubVolumeGroup.Spec = cephv1.CephFilesystemSubVolumeGroupSpec{
Name: defaultSubvolumeGroupName,
FilesystemName: filesystemName,
Pinning: cephv1.CephFilesystemSubVolumeGroupSpecPinning{
Distributed: &defaultPinningValue,
},
}
return nil
}
_, err = ctrl.CreateOrUpdate(r.ctx, r.Client, cephFilesystemSubVolumeGroup, mutateFn)
if err != nil {
r.Log.Error(err, "Could not create/update default csi cephFilesystemSubVolumeGroup.", "cephFilesystemSubVolumeGroup", klog.KRef(cephFilesystemSubVolumeGroup.Namespace, cephFilesystemSubVolumeGroup.Name))
return err
}
return nil
}

func (r *StorageClusterReconciler) deleteDefaultSubvolumeGroup(filesystemName, filesystemNamespace string) error {
existingsvg := &cephv1.CephFilesystemSubVolumeGroup{}
svgName := generateNameForCephSubvolumeGroup(filesystemName)
err := r.Client.Get(r.ctx, types.NamespacedName{Name: svgName, Namespace: filesystemNamespace}, existingsvg)
if err != nil {
if errors.IsNotFound(err) {
r.Log.Info("Uninstall: csi subvolumegroup not found.", "Subvolumegroup", klog.KRef(filesystemNamespace, svgName))
return nil
}
r.Log.Error(err, "Uninstall: Unable to retrieve subvolumegroup.", "subvolumegroup", klog.KRef(filesystemNamespace, svgName))
return fmt.Errorf("uninstall: Unable to retrieve csi subvolumegroup : %v", err)
}

if existingsvg.GetDeletionTimestamp().IsZero() {
r.Log.Info("Uninstall: Deleting subvolumegroup.", "subvolumegroup", klog.KRef(filesystemNamespace, existingsvg.Name))
err = r.Client.Delete(r.ctx, existingsvg)
if err != nil {
r.Log.Error(err, "Uninstall: Failed to delete subvolumegroup.", "subvolumegroup", klog.KRef(filesystemNamespace, existingsvg.Name))
return fmt.Errorf("uninstall: Failed to delete subvolumegroup %v: %v", existingsvg.Name, err)
}
}

err = r.Client.Get(r.ctx, types.NamespacedName{Name: svgName, Namespace: filesystemNamespace}, existingsvg)
if err != nil {
if errors.IsNotFound(err) {
r.Log.Info("Uninstall: subvolumegroup is deleted.", "subvolumegroup", klog.KRef(filesystemNamespace, existingsvg.Name))
return nil
}
}
r.Log.Error(err, "Uninstall: Waiting for subvolumegroup to be deleted.", "subvolumegroup", klog.KRef(filesystemNamespace, existingsvg.Name))
return fmt.Errorf("uninstall: Waiting for subvolumegroup %v to be deleted", existingsvg.Name)
}

// ensureDeleted deletes the CephFilesystems owned by the StorageCluster
func (obj *ocsCephFilesystems) ensureDeleted(r *StorageClusterReconciler, sc *ocsv1.StorageCluster) (reconcile.Result, error) {
foundCephFilesystem := &cephv1.CephFilesystem{}
@@ -241,16 +157,6 @@ func (obj *ocsCephFilesystems) ensureDeleted(r *StorageClusterReconciler, sc *oc
return reconcile.Result{}, fmt.Errorf("uninstall: Unable to retrieve CephFileSystem %v: %v", cephFilesystem.Name, err)
}

// delete csi subvolume group for particular filesystem
// skip for the ocs provider mode
if !sc.Spec.AllowRemoteStorageConsumers {
cephSVGName := generateNameForCephSubvolumeGroup(cephFilesystem.Name)
err = r.deleteDefaultSubvolumeGroup(cephFilesystem.Name, cephFilesystem.Namespace)
if err != nil {
r.Log.Error(err, "Uninstall: unable to delete subvolumegroup", "subvolumegroup", klog.KRef(cephFilesystem.Namespace, cephSVGName))
return reconcile.Result{}, err
}
}
if cephFilesystem.GetDeletionTimestamp().IsZero() {
r.Log.Info("Uninstall: Deleting cephFilesystem.", "CephFileSystem", klog.KRef(foundCephFilesystem.Namespace, foundCephFilesystem.Name))
err = r.Client.Delete(r.ctx, foundCephFilesystem)
31 changes: 0 additions & 31 deletions controllers/storagecluster/cephfilesystem_test.go
@@ -7,7 +7,6 @@ import (
cephv1 "github.com/rook/rook/pkg/apis/ceph.rook.io/v1"
"github.com/stretchr/testify/assert"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/reconcile"

@@ -57,36 +56,6 @@ func assertCephFileSystem(t *testing.T, reconciler StorageClusterReconciler, cr
assert.Equal(t, expectedAf[0].Spec, actualFs.Spec)
}

func TestCreateDefaultSubvolumeGroup(t *testing.T) {
var objects []client.Object
t, reconciler, cr, _ := initStorageClusterResourceCreateUpdateTest(t, objects, nil)
filesystem, err := reconciler.newCephFilesystemInstances(cr)
assert.NoError(t, err)

err = reconciler.createDefaultSubvolumeGroup(filesystem[0].Name, filesystem[0].Namespace, filesystem[0].OwnerReferences)
assert.NoError(t, err)

svg := &cephv1.CephFilesystemSubVolumeGroup{}
expectedsvgName := generateNameForCephSubvolumeGroup(filesystem[0].Name)
err = reconciler.Client.Get(context.TODO(), types.NamespacedName{Name: expectedsvgName, Namespace: filesystem[0].Namespace}, svg)
assert.NoError(t, err) // no error
}

func TestDeleteDefaultSubvolumeGroup(t *testing.T) {
var objects []client.Object
t, reconciler, cr, _ := initStorageClusterResourceCreateUpdateTest(t, objects, nil)
filesystem, err := reconciler.newCephFilesystemInstances(cr)
assert.NoError(t, err)

err = reconciler.deleteDefaultSubvolumeGroup(filesystem[0].Name, filesystem[0].Namespace)
assert.NoError(t, err)

svg := &cephv1.CephFilesystemSubVolumeGroup{}
expectedsvgName := generateNameForCephSubvolumeGroup(filesystem[0].Name)
err = reconciler.Client.Get(context.TODO(), types.NamespacedName{Name: expectedsvgName, Namespace: filesystem[0].Namespace}, svg)
assert.Error(t, err) // error as csi svg is deleted
}

func TestGetActiveMetadataServers(t *testing.T) {
var cases = []struct {
label string
5 changes: 0 additions & 5 deletions controllers/storagecluster/generate.go
@@ -166,8 +166,3 @@ func generateCephReplicatedSpec(initData *ocsv1.StorageCluster, poolType string)
func generateStorageQuotaName(storageClassName, quotaName string) string {
return fmt.Sprintf("%s-%s", storageClassName, quotaName)
}

// generateNameForCephSubvolumeGroup function generates a name for CephFilesystemSubVolumeGroup
func generateNameForCephSubvolumeGroup(filesystemName string) string {
return fmt.Sprintf("%s-%s", filesystemName, defaultSubvolumeGroupName)
}
5 changes: 5 additions & 0 deletions controllers/storagecluster/initialization_reconciler_test.go
@@ -299,11 +299,16 @@ func createFakeInitializationStorageClusterReconciler(t *testing.T, obj ...runti
},
}

clientConfigMap := &v1.ConfigMap{}
clientConfigMap.Name = ocsClientConfigMapName
clientConfigMap.Namespace = sc.Namespace

obj = append(
obj,
mockNodeList.DeepCopy(),
cbp,
cfs,
clientConfigMap,
cnfs,
cnfsbp,
cnfssvc,
4 changes: 1 addition & 3 deletions controllers/storagecluster/noobaa_system_reconciler.go
@@ -139,9 +139,7 @@ func (r *StorageClusterReconciler) setNooBaaDesiredState(nb *nbv1.NooBaa, sc *oc
"app": "noobaa",
}

if sc.Spec.AllowRemoteStorageConsumers {
util.AddAnnotation(nb, "MulticloudObjectGatewayProviderMode", "true")
}
util.AddAnnotation(nb, "MulticloudObjectGatewayProviderMode", "true")

if !r.IsNoobaaStandalone {
storageClassName := generateNameForCephBlockPoolSC(sc)
9 changes: 0 additions & 9 deletions controllers/storagecluster/provider_server.go
@@ -58,15 +58,6 @@ func (o *ocsProviderServer) ensureCreated(r *StorageClusterReconciler, instance
}

func (o *ocsProviderServer) ensureDeleted(r *StorageClusterReconciler, instance *ocsv1.StorageCluster) (reconcile.Result, error) {

// We do not check instance.Spec.AllowRemoteStorageConsumers because provider can disable this functionality
// and we need to delete the resources even the flag is not enabled (uninstall case).

// This func is directly called by the ensureCreated if the flag is disabled and deletes the resource
// Which means we do not need to call ensureDeleted while reconciling unless we are uninstalling

// NOTE: Do not add the check

if err := r.verifyNoStorageConsumerExist(instance); err != nil {
return reconcile.Result{}, err
}
