Skip to content

Commit

Permalink
Merge pull request #287 from divyenpatel/spbm-association-for-migrate…
Browse files Browse the repository at this point in the history
…d-volume

Set Storage Policy ID while registering migrated in-tree vSphere volume
  • Loading branch information
k8s-ci-robot committed Aug 4, 2020
2 parents 850cd8c + 7193f0d commit f79346c
Show file tree
Hide file tree
Showing 11 changed files with 139 additions and 104 deletions.
119 changes: 66 additions & 53 deletions pkg/apis/migration/migration.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,12 +47,19 @@ import (
k8s "sigs.k8s.io/vsphere-csi-driver/pkg/kubernetes"
)

// VolumeSpec contains VolumePath and StoragePolicyID
// using which Volume can be looked up using VolumeMigrationService
type VolumeSpec struct {
VolumePath string
StoragePolicyName string
}

// VolumeMigrationService exposes interfaces to support VCP to CSI migration.
// It will maintain internal state to map volume path to volume ID and reverse mapping.
type VolumeMigrationService interface {
// GetVolumeID returns VolumeID for given VolumePath
// GetVolumeID returns VolumeID for given VolumeSpec
// Returns an error if not able to retrieve VolumeID.
GetVolumeID(ctx context.Context, volumePath string) (string, error)
GetVolumeID(ctx context.Context, volumeSpec VolumeSpec) (string, error)

// GetVolumePath returns VolumePath for given VolumeID
// Returns an error if not able to retrieve VolumePath.
Expand Down Expand Up @@ -164,26 +171,26 @@ func GetVolumeMigrationService(ctx context.Context, volumeManager *cnsvolume.Man
return volumeMigrationInstance, nil
}

// GetVolumeID returns VolumeID for given VolumePath
// GetVolumeID returns VolumeID for given VolumeSpec
// Returns an error if not able to retrieve VolumeID.
func (volumeMigration *volumeMigration) GetVolumeID(ctx context.Context, volumePath string) (string, error) {
func (volumeMigration *volumeMigration) GetVolumeID(ctx context.Context, volumeSpec VolumeSpec) (string, error) {
log := logger.GetLogger(ctx)
info, found := volumeMigration.volumePathToVolumeID.Load(volumePath)
info, found := volumeMigration.volumePathToVolumeID.Load(volumeSpec.VolumePath)
if found {
log.Infof("VolumeID: %q found from the cache for VolumePath: %q", info.(string), volumePath)
log.Debugf("VolumeID: %q found from the cache for VolumePath: %q", info.(string), volumeSpec.VolumePath)
return info.(string), nil
}
log.Infof("Could not retrieve VolumeID from cache for Volume Path: %q. volume may not be registered. Registering Volume with CNS", volumePath)
volumeID, err := volumeMigration.registerVolume(ctx, volumePath)
log.Infof("Could not retrieve VolumeID from cache for Volume Path: %q. volume may not be registered. Registering Volume with CNS", volumeSpec.VolumePath)
volumeID, err := volumeMigration.registerVolume(ctx, volumeSpec)
if err != nil {
log.Errorf("failed to register volume for VolumePath: %q, with err: %v", volumePath, err)
log.Errorf("failed to register volume for volumeSpec: %v, with err: %v", volumeSpec, err)
return "", err
}
log.Infof("successfully registered volume: %q with CNS. VolumeID: %q", volumePath, volumeID)
log.Infof("Successfully registered volumeSpec: %v with CNS. VolumeID: %v", volumeSpec, volumeID)
cnsvSphereVolumeMigration := migrationv1alpha1.CnsVSphereVolumeMigration{
ObjectMeta: metav1.ObjectMeta{Name: volumeID},
Spec: migrationv1alpha1.CnsVSphereVolumeMigrationSpec{
VolumePath: volumePath,
VolumePath: volumeSpec.VolumePath,
VolumeID: volumeID,
},
}
Expand All @@ -193,7 +200,6 @@ func (volumeMigration *volumeMigration) GetVolumeID(ctx context.Context, volumeP
log.Errorf("failed to save cnsvSphereVolumeMigration CR:%v, err: %v", err)
return "", err
}
log.Infof("successfully saved cnsvSphereVolumeMigration CR: %v", cnsvSphereVolumeMigration)
return volumeID, nil
}

Expand Down Expand Up @@ -236,7 +242,7 @@ func (volumeMigration *volumeMigration) GetVolumePath(ctx context.Context, volum
log.Debugf("QueryVolumeInfo successfully returned volumeInfo %v for volumeIDList %v:", spew.Sdump(queryVolumeInfoResult), volumeIds)
cnsBlockVolumeInfo := interface{}(queryVolumeInfoResult.VolumeInfo).(*cnstypes.CnsBlockVolumeInfo)
fileBackingInfo := interface{}(cnsBlockVolumeInfo.VStorageObject.Config.Backing).(*vim25types.BaseConfigInfoDiskFileBackingInfo)
log.Infof("successfully retrieved volume path: %q for VolumeID: %q", fileBackingInfo.FilePath, volumeID)
log.Infof("Successfully retrieved volume path: %q for VolumeID: %q", fileBackingInfo.FilePath, volumeID)
cnsvSphereVolumeMigration := migrationv1alpha1.CnsVSphereVolumeMigration{
ObjectMeta: metav1.ObjectMeta{Name: volumeID},
Spec: migrationv1alpha1.CnsVSphereVolumeMigrationSpec{
Expand All @@ -250,7 +256,6 @@ func (volumeMigration *volumeMigration) GetVolumePath(ctx context.Context, volum
log.Errorf("failed to save cnsvSphereVolumeMigration CR:%v, err: %v", err)
return "", err
}
log.Infof("successfully saved cnsvSphereVolumeMigration CR: %v", cnsvSphereVolumeMigration)
return fileBackingInfo.FilePath, nil
}

Expand All @@ -268,7 +273,7 @@ func (volumeMigration *volumeMigration) saveVolumeInfo(ctx context.Context, cnsV
log.Info("CR already exists")
return nil
}
log.Infof("successfully created CR for cnsVSphereVolumeMigration: %+v", cnsVSphereVolumeMigration)
log.Infof("Successfully created CR for cnsVSphereVolumeMigration: %+v", cnsVSphereVolumeMigration)
return nil
}

Expand All @@ -292,23 +297,23 @@ func (volumeMigration *volumeMigration) DeleteVolumeInfo(ctx context.Context, vo
return nil
}

// registerVolume takes VolumePath and helps register Volume with CNS
// registerVolume takes VolumeSpec and helps register Volume with CNS
// Returns VolumeID for successful registration, otherwise return error
func (volumeMigration *volumeMigration) registerVolume(ctx context.Context, volumePath string) (string, error) {
func (volumeMigration *volumeMigration) registerVolume(ctx context.Context, volumeSpec VolumeSpec) (string, error) {
log := logger.GetLogger(ctx)
uuid, err := uuid.NewUUID()
if err != nil {
log.Errorf("failed to generate uuid")
return "", err
}
re := regexp.MustCompile(`\[([^\[\]]*)\]`)
if !re.MatchString(volumePath) {
msg := fmt.Sprintf("failed to extract datastore name from in-tree volume path: %q", volumePath)
if !re.MatchString(volumeSpec.VolumePath) {
msg := fmt.Sprintf("failed to extract datastore name from in-tree volume path: %q", volumeSpec.VolumePath)
log.Errorf(msg)
return "", errors.New(msg)
}
datastoreName := re.FindAllString(volumePath, -1)[0]
vmdkPath := strings.TrimSpace(strings.Trim(volumePath, datastoreName))
datastoreName := re.FindAllString(volumeSpec.VolumePath, -1)[0]
vmdkPath := strings.TrimSpace(strings.Trim(volumeSpec.VolumePath, datastoreName))
datastoreName = strings.Trim(strings.Trim(datastoreName, "["), "]")

var datacenters string
Expand All @@ -325,22 +330,16 @@ func (volumeMigration *volumeMigration) registerVolume(ctx context.Context, volu
host = key
break
}
// Get vCenter
vCenter, err := vsphere.GetVirtualCenterManager(ctx).GetVirtualCenter(ctx, host)
if err != nil {
log.Errorf("failed to get vCenter. err: %v", err)
return "", err
}
datacenterPaths := make([]string, 0)
if datacenters != "" {
datacenterPaths = strings.Split(datacenters, ",")
} else {
// Get vCenter
vCenter, err := vsphere.GetVirtualCenterManager(ctx).GetVirtualCenter(ctx, host)
if err != nil {
log.Errorf("failed to get vCenter. err: %v", err)
return "", err
}
// Connect to vCenter
err = vCenter.Connect(ctx)
if err != nil {
log.Errorf("failed to connect to vCenter. err: %v", err)
return "", err
}
// Get all datacenters from vCenter
dcs, err := vCenter.GetDatacenters(ctx)
if err != nil {
Expand All @@ -353,39 +352,53 @@ func (volumeMigration *volumeMigration) registerVolume(ctx context.Context, volu
log.Debugf("retrieved all datacenters %v from vCenter", datacenterPaths)
}
var volumeID *cnstypes.CnsVolumeId
var storagePolicyID string
if volumeSpec.StoragePolicyName != "" {
log.Debugf("Obtaining storage policy ID for storage policy name: %q", volumeSpec.StoragePolicyName)
storagePolicyID, err = vCenter.GetStoragePolicyIDByName(ctx, volumeSpec.StoragePolicyName)
if err != nil {
msg := fmt.Sprintf("Error occurred while getting stroage policy ID from storage policy name: %q, err: %+v", volumeSpec.StoragePolicyName, err)
log.Error(msg)
return "", errors.New(msg)
}
log.Debugf("Obtained storage policy ID: %q for storage policy name: %q", storagePolicyID, volumeSpec.StoragePolicyName)
}
var containerClusterArray []cnstypes.CnsContainerCluster
containerCluster := vsphere.GetContainerCluster(volumeMigration.cnsConfig.Global.ClusterID, user, cnstypes.CnsClusterFlavorVanilla)
containerClusterArray = append(containerClusterArray, containerCluster)
createSpec := &cnstypes.CnsVolumeCreateSpec{
Name: uuid.String(),
VolumeType: common.BlockVolumeType,
Metadata: cnstypes.CnsVolumeMetadata{
ContainerCluster: containerCluster,
ContainerClusterArray: containerClusterArray,
},
}
if storagePolicyID != "" {
profileSpec := &vim25types.VirtualMachineDefinedProfileSpec{
ProfileId: storagePolicyID,
}
createSpec.Profile = append(createSpec.Profile, profileSpec)
}
for _, datacenter := range datacenterPaths {
// Format:
// https://<vc_ip>/folder/<vm_vmdk_path>?dcPath=<datacenter-path>&dsName=<datastoreName>
backingDiskURLPath := "https://" + host + "/folder/" +
vmdkPath + "?dcPath=" + url.PathEscape(datacenter) + "&dsName=" + url.PathEscape(datastoreName)

log.Infof("Registering volume: %q using backingDiskURLPath :%q", volumePath, backingDiskURLPath)
var containerClusterArray []cnstypes.CnsContainerCluster
containerCluster := vsphere.GetContainerCluster(volumeMigration.cnsConfig.Global.ClusterID, user, cnstypes.CnsClusterFlavorVanilla)
containerClusterArray = append(containerClusterArray, containerCluster)
createSpec := &cnstypes.CnsVolumeCreateSpec{
Name: uuid.String(),
VolumeType: common.BlockVolumeType,
Metadata: cnstypes.CnsVolumeMetadata{
ContainerCluster: containerCluster,
ContainerClusterArray: containerClusterArray,
},
BackingObjectDetails: &cnstypes.CnsBlockBackingDetails{
BackingDiskUrlPath: backingDiskURLPath,
},
}
log.Debugf("vSphere CNS driver registering volume %q with create spec %+v", volumePath, spew.Sdump(createSpec))
createSpec.BackingObjectDetails = &cnstypes.CnsBlockBackingDetails{BackingDiskUrlPath: backingDiskURLPath}
log.Infof("Registering volume: %q using backingDiskURLPath :%q", volumeSpec.VolumePath, backingDiskURLPath)
log.Debugf("vSphere CNS driver registering volume %q with create spec %+v", volumeSpec.VolumePath, spew.Sdump(createSpec))
volumeID, err = (*volumeMigration.volumeManager).CreateVolume(ctx, createSpec)
if err != nil {
log.Warnf("failed to register volume %q with error %+v", volumePath, err)
log.Warnf("failed to register volume %q with createSpec: %v. error: %+v", volumeSpec.VolumePath, createSpec, err)
} else {
break
}
}
if volumeID != nil {
log.Infof("successfully registered volume %q as container volume with ID: %q", volumePath, volumeID.Id)
log.Infof("Successfully registered volume %q as container volume with ID: %q", volumeSpec.VolumePath, volumeID.Id)
} else {
msg := fmt.Sprintf("registration failed for volumePath: %q", volumePath)
msg := fmt.Sprintf("registration failed for volumeSpec: %v", volumeSpec)
log.Error(msg)
return "", errors.New(msg)
}
Expand Down
7 changes: 0 additions & 7 deletions pkg/common/cns-lib/vsphere/datacenter.go
Original file line number Diff line number Diff line change
Expand Up @@ -126,13 +126,6 @@ func asyncGetAllDatacenters(ctx context.Context, dcsChan chan<- *Datacenter, err
return
default:
}

if err := vc.Connect(ctx); err != nil {
log.Errorf("Failed connecting to VC %q with err: %v", vc.Config.Host, err)
errChan <- err
return
}

dcs, err := vc.GetDatacenters(ctx)
if err != nil {
log.Errorf("failed to fetch datacenters for vc %v with err: %v", vc.Config.Host, err)
Expand Down
5 changes: 5 additions & 0 deletions pkg/common/cns-lib/vsphere/pbm.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,11 @@ func (vc *VirtualCenter) DisconnectPbm(ctx context.Context) error {
// GetStoragePolicyIDByName gets storage policy ID by name.
func (vc *VirtualCenter) GetStoragePolicyIDByName(ctx context.Context, storagePolicyName string) (string, error) {
log := logger.GetLogger(ctx)
err := vc.ConnectPbm(ctx)
if err != nil {
log.Errorf("Error occurred while connecting to PBM, err: %+v", err)
return "", err
}
storagePolicyID, err := vc.PbmClient.ProfileIDByName(ctx, storagePolicyName)
if err != nil {
log.Errorf("failed to get StoragePolicyID from StoragePolicyName %s with err: %v", storagePolicyName, err)
Expand Down
17 changes: 17 additions & 0 deletions pkg/common/cns-lib/vsphere/virtualcenter.go
Original file line number Diff line number Diff line change
Expand Up @@ -306,6 +306,11 @@ func (vc *VirtualCenter) getDatacenters(ctx context.Context, dcPaths []string) (
// is configured in VirtualCenterConfig during registration, only the listed
// Datacenters are returned.
func (vc *VirtualCenter) GetDatacenters(ctx context.Context) ([]*Datacenter, error) {
log := logger.GetLogger(ctx)
if err := vc.Connect(ctx); err != nil {
log.Errorf("failed to connect to vCenter. err: %v", err)
return nil, err
}
if len(vc.Config.DatacenterPaths) != 0 {
return vc.getDatacenters(ctx, vc.Config.DatacenterPaths)
}
Expand All @@ -330,6 +335,10 @@ func (vc *VirtualCenter) Disconnect(ctx context.Context) error {
// GetHostsByCluster return hosts inside the cluster using cluster moref.
func (vc *VirtualCenter) GetHostsByCluster(ctx context.Context, clusterMorefValue string) ([]*HostSystem, error) {
log := logger.GetLogger(ctx)
if err := vc.Connect(ctx); err != nil {
log.Errorf("failed to connect to vCenter. err: %v", err)
return nil, err
}
clusterMoref := types.ManagedObjectReference{
Type: "ClusterComputeResource",
Value: clusterMorefValue,
Expand All @@ -353,6 +362,10 @@ func (vc *VirtualCenter) GetHostsByCluster(ctx context.Context, clusterMorefValu
// GetVsanDatastores returns all the vsan datastore exists in the vc inventory
func (vc *VirtualCenter) GetVsanDatastores(ctx context.Context) ([]mo.Datastore, error) {
log := logger.GetLogger(ctx)
if err := vc.Connect(ctx); err != nil {
log.Errorf("failed to connect to vCenter. err: %v", err)
return nil, err
}
datacenters, err := vc.GetDatacenters(ctx)
if err != nil {
log.Errorf("failed to find datacenters from VC: %+v, Error: %+v", vc.Config.Host, err)
Expand Down Expand Up @@ -395,6 +408,10 @@ func (vc *VirtualCenter) GetVsanDatastores(ctx context.Context) ([]mo.Datastore,
// GetDatastoresByCluster return datastores inside the cluster using cluster moref.
func (vc *VirtualCenter) GetDatastoresByCluster(ctx context.Context, clusterMorefValue string) ([]*DatastoreInfo, error) {
log := logger.GetLogger(ctx)
if err := vc.Connect(ctx); err != nil {
log.Errorf("failed to connect to vCenter. err: %v", err)
return nil, err
}
clusterMoref := types.ManagedObjectReference{
Type: "ClusterComputeResource",
Value: clusterMorefValue,
Expand Down
5 changes: 3 additions & 2 deletions pkg/common/unittestcommon/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"context"
"sync"

"sigs.k8s.io/vsphere-csi-driver/pkg/apis/migration"
cnsvolume "sigs.k8s.io/vsphere-csi-driver/pkg/common/cns-lib/volume"
cnsconfig "sigs.k8s.io/vsphere-csi-driver/pkg/common/config"
)
Expand All @@ -41,9 +42,9 @@ type mockVolumeMigration struct {

// MockVolumeMigrationService is a mocked VolumeMigrationService needed for CSI migration feature
type MockVolumeMigrationService interface {
// GetVolumeID returns VolumeID for given VolumePath
// GetVolumeID returns VolumeID for given migration volumeSpec
// Returns an error if not able to retrieve VolumeID.
GetVolumeID(ctx context.Context, volumePath string) (string, error)
GetVolumeID(ctx context.Context, volumeSpec migration.VolumeSpec) (string, error)

// GetVolumePath returns VolumePath for given VolumeID
// Returns an error if not able to retrieve VolumePath.
Expand Down
5 changes: 3 additions & 2 deletions pkg/common/unittestcommon/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"strconv"
"sync"

"sigs.k8s.io/vsphere-csi-driver/pkg/apis/migration"
cnsvolume "sigs.k8s.io/vsphere-csi-driver/pkg/common/cns-lib/volume"
cnsconfig "sigs.k8s.io/vsphere-csi-driver/pkg/common/config"
"sigs.k8s.io/vsphere-csi-driver/pkg/csi/service/common"
Expand Down Expand Up @@ -80,8 +81,8 @@ func GetFakeVolumeMigrationService(ctx context.Context, volumeManager *cnsvolume
}

// GetVolumeID mocks the method with returns Volume Id for a given Volume Path
func (dummyInstance *mockVolumeMigration) GetVolumeID(ctx context.Context, volumePath string) (string, error) {
return mapVolumePathToID["dummy-vms-CR"]["VolumeID"], nil
func (dummyInstance *mockVolumeMigration) GetVolumeID(ctx context.Context, volumeSpec migration.VolumeSpec) (string, error) {
return mapVolumePathToID["dummy-vms-CR"][volumeSpec.VolumePath], nil
}

// GetVolumePath mocks the method with returns Volume Path for a given Volume ID
Expand Down
10 changes: 0 additions & 10 deletions pkg/csi/service/common/vsphereutil.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,11 +44,6 @@ func CreateBlockVolumeUtil(ctx context.Context, clusterFlavor cnstypes.CnsCluste
}
if spec.ScParams.StoragePolicyName != "" {
// Get Storage Policy ID from Storage Policy Name
err = vc.ConnectPbm(ctx)
if err != nil {
log.Errorf("Error occurred while connecting to PBM, err: %+v", err)
return "", err
}
spec.StoragePolicyID, err = vc.GetStoragePolicyIDByName(ctx, spec.ScParams.StoragePolicyName)
if err != nil {
log.Errorf("Error occurred while getting Profile Id from Profile Name: %s, err: %+v", spec.ScParams.StoragePolicyName, err)
Expand Down Expand Up @@ -193,11 +188,6 @@ func CreateFileVolumeUtil(ctx context.Context, clusterFlavor cnstypes.CnsCluster
}
if spec.ScParams.StoragePolicyName != "" {
// Get Storage Policy ID from Storage Policy Name
err = vc.ConnectPbm(ctx)
if err != nil {
log.Errorf("Error occurred while connecting to PBM, err: %+v", err)
return "", err
}
spec.StoragePolicyID, err = vc.GetStoragePolicyIDByName(ctx, spec.ScParams.StoragePolicyName)
if err != nil {
log.Errorf("Error occurred while getting Profile Id from Profile Name: %q, err: %+v", spec.ScParams.StoragePolicyName, err)
Expand Down
Loading

0 comments on commit f79346c

Please sign in to comment.