Skip to content

Commit

Permalink
feat: zfs storage migration
Browse files Browse the repository at this point in the history
ZFS-replicated PVs can migrate smoothly between Proxmox nodes that participate in the disk replication.
During migration to another zone, the pod may start more slowly than usual, since the volume data must be transferred first.

Signed-off-by: Serge Logvinov <[email protected]>
  • Loading branch information
sergelogvinov committed Feb 1, 2025
1 parent 0b66712 commit 37d7fb0
Show file tree
Hide file tree
Showing 3 changed files with 132 additions and 30 deletions.
95 changes: 67 additions & 28 deletions pkg/csi/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -168,10 +168,12 @@ func (d *ControllerService) CreateVolume(_ context.Context, request *csi.CreateV

klog.V(5).InfoS("CreateVolume", "storageConfig", storageConfig)

topology := &csi.Topology{
Segments: map[string]string{
corev1.LabelTopologyRegion: region,
corev1.LabelTopologyZone: zone,
topology := []*csi.Topology{
{
Segments: map[string]string{
corev1.LabelTopologyRegion: region,
corev1.LabelTopologyZone: zone,
},
},
}

Expand All @@ -182,9 +184,11 @@ func (d *ControllerService) CreateVolume(_ context.Context, request *csi.CreateV
return nil, status.Error(codes.Internal, "error: shared storage type cifs, pbs are not supported")
}

topology = &csi.Topology{
Segments: map[string]string{
corev1.LabelTopologyRegion: region,
topology = []*csi.Topology{
{
Segments: map[string]string{
corev1.LabelTopologyRegion: region,
},
},
}
}
Expand Down Expand Up @@ -246,6 +250,8 @@ func (d *ControllerService) CreateVolume(_ context.Context, request *csi.CreateV
return nil, status.Error(codes.AlreadyExists, "volume already exists with same name and different capacity")
}

volumeID := vol.VolumeID()

if paramsSC.Replicate != nil && *paramsSC.Replicate {
_, err := attachVolume(cl, vmr, vol.Storage(), vol.Disk(), paramsSC.ToMap())
if err != nil {
Expand All @@ -271,25 +277,38 @@ func (d *ControllerService) CreateVolume(_ context.Context, request *csi.CreateV

return nil, status.Error(codes.Internal, err.Error())
}

volumeID = vol.VolumeSharedID()
topology = []*csi.Topology{
{
Segments: map[string]string{
corev1.LabelTopologyRegion: region,
corev1.LabelTopologyZone: zone,
},
},
{
Segments: map[string]string{
corev1.LabelTopologyRegion: region,
corev1.LabelTopologyZone: replicaZone,
},
},
}
}
}
}

volID := vol.VolumeID()
if storageConfig["shared"] != nil && int(storageConfig["shared"].(float64)) == 1 { //nolint:errcheck
volID = vol.VolumeSharedID()
volumeID = vol.VolumeSharedID()
}

klog.V(3).InfoS("CreateVolume: volume created", "cluster", vol.Cluster(), "volumeID", volID, "size", volSizeBytes)
klog.V(3).InfoS("CreateVolume: volume created", "cluster", vol.Cluster(), "volumeID", volumeID, "size", volSizeBytes)

volume := csi.Volume{
VolumeId: volID,
VolumeContext: paramsVAC.MergeMap(params),
ContentSource: request.GetVolumeContentSource(),
CapacityBytes: volSizeBytes,
AccessibleTopology: []*csi.Topology{
topology,
},
VolumeId: volumeID,
VolumeContext: paramsVAC.MergeMap(params),
ContentSource: request.GetVolumeContentSource(),
CapacityBytes: volSizeBytes,
AccessibleTopology: topology,
}

return &csi.CreateVolumeResponse{Volume: &volume}, nil
Expand Down Expand Up @@ -352,6 +371,11 @@ func (d *ControllerService) DeleteVolume(_ context.Context, request *csi.DeleteV
return nil, status.Error(codes.Internal, fmt.Sprintf("failed to delete volume: %s", vol.Disk()))
}
}

mc := metrics.NewMetricContext("deleteDisk")
if err = proxmox.DeleteDisk(cl, vol); mc.ObserveRequest(err) != nil {
klog.ErrorS(err, "DeleteVolume: failed to delete disk", "cluster", vol.Cluster(), "volumeName", vol.Disk())
}
}
}

Expand Down Expand Up @@ -422,15 +446,6 @@ func (d *ControllerService) ControllerPublishVolume(ctx context.Context, request
return nil, status.Error(codes.Internal, err.Error())
}

vmr, err := d.getVMRefbyNodeID(ctx, cl, nodeID)
if err != nil {
return nil, err
}

if vol.Zone() == "" {
vol = volume.NewVolume(vol.Region(), vmr.Node(), vol.Storage(), vol.Disk())
}

params, err := ExtractAndDefaultParameters(volCtx)
if err != nil {
return nil, status.Error(codes.InvalidArgument, err.Error())
Expand All @@ -447,7 +462,12 @@ func (d *ControllerService) ControllerPublishVolume(ctx context.Context, request
params.ReadOnly = ptr.Ptr(true)
}

exist, err := isPvcExists(cl, vol)
vmr, err := d.getVMRefbyNodeID(ctx, cl, nodeID)
if err != nil {
return nil, err
}

exist, err := isPvcExists(cl, volume.NewVolume(vol.Region(), vmr.Node(), vol.Storage(), vol.Disk()))
if err != nil {
klog.ErrorS(err, "ControllerPublishVolume: failed to verify the existence of the PVC", "cluster", vol.Cluster(), "volumeID", vol.VolumeID())

Expand All @@ -461,6 +481,26 @@ func (d *ControllerService) ControllerPublishVolume(ctx context.Context, request
d.volumeLocks.Lock()
defer d.volumeLocks.Unlock()

if params.Replicate != nil && *params.Replicate {
vmrVol, err := getVMRefByVolume(cl, vol)
if err != nil {
klog.ErrorS(err, "ControllerPublishVolume: failed to get vm ref by volume", "cluster", vol.Cluster(), "volumeName", vol.Disk())

return nil, status.Error(codes.Internal, err.Error())
}

if vmr.Node() != vmrVol.Node() {
klog.V(4).InfoS("ControllerPublishVolume: replicate volume", "cluster", vol.Cluster(), "volumeID", vol.VolumeID(), "src", vmrVol.Node(), "dst", vmr.Node())

_, err := cl.MigrateNode(vmrVol, vmr.Node(), false)
if err != nil {
klog.ErrorS(err, "ControllerPublishVolume: failed to migrate vm", "cluster", vol.Cluster(), "volumeID", vol.VolumeID(), "vmID", vmr.VmId())

return nil, status.Error(codes.Internal, err.Error())
}
}
}

mc := metrics.NewMetricContext("attachVolume")

pvInfo, err := attachVolume(cl, vmr, vol.Storage(), vol.Disk(), params.ToMap())
Expand Down Expand Up @@ -507,7 +547,6 @@ func (d *ControllerService) ControllerUnpublishVolume(ctx context.Context, reque
}

mc := metrics.NewMetricContext("detachVolume")

if err := detachVolume(cl, vmr, vol.Disk()); mc.ObserveRequest(err) != nil {
klog.ErrorS(err, "ControllerUnpublishVolume: failed to detach volume", "cluster", vol.Cluster(), "volumeID", vol.VolumeID(), "vmID", vmr.VmId())

Expand Down
11 changes: 9 additions & 2 deletions pkg/csi/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,16 +84,23 @@ func getNodeWithStorage(cl *pxapi.Client, storageName string) (string, error) {
}

func getVMRefByVolume(cl *pxapi.Client, vol *volume.Volume) (vmr *pxapi.VmRef, err error) {
vmID, err := strconv.Atoi(vol.VMID())
id, err := strconv.Atoi(vol.VMID())
if err != nil {
return nil, fmt.Errorf("failed to parse volume vm id: %v", err)
}

vmr = pxapi.NewVmRef(vmID)
vmr = pxapi.NewVmRef(id)
vmr.SetVmType("qemu")

node := vol.Node()
if node == "" {
if id != vmID {
_, err = cl.GetVmInfo(vmr)
if err == nil {
return vmr, nil
}
}

node, err = getNodeWithStorage(cl, vol.Storage())
if err != nil {
return nil, err
Expand Down
56 changes: 56 additions & 0 deletions pkg/proxmox/volume.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package proxmox
import (
"encoding/json"
"fmt"
"strconv"
"strings"
"time"

Expand Down Expand Up @@ -107,3 +108,58 @@ func MoveQemuDisk(cluster *pxapi.Client, vol *volume.Volume, node string, taskTi

return nil
}

// DeleteDisk removes the volume from every node in the cluster that holds a
// copy of it. It lists all cluster nodes, inspects the storage content on
// each, and deletes the volume wherever a matching disk image is found.
// Returns the first error encountered; nil if no matching image exists.
func DeleteDisk(cluster *pxapi.Client, vol *volume.Volume) error {
	data, err := cluster.GetNodeList()
	if err != nil {
		return fmt.Errorf("failed to get node list: %v", err)
	}

	// The API wraps the node list in a "data" key; anything else is malformed.
	nodes, ok := data["data"].([]interface{})
	if !ok {
		return fmt.Errorf("failed to parse node list: unexpected format %T", data["data"])
	}

	id, err := strconv.Atoi(vol.VMID())
	if err != nil {
		return fmt.Errorf("failed to parse volume vm id: %v", err)
	}

	// Fully-qualified volume ID ("storage:disk") as reported by the storage content API.
	volid := fmt.Sprintf("%s:%s", vol.Storage(), vol.Disk())

	for _, item := range nodes {
		node, ok := item.(map[string]interface{})
		if !ok {
			continue
		}

		nodeName, ok := node["node"].(string)
		if !ok {
			// Entry without a usable node name; nothing to query here.
			continue
		}

		vmr := pxapi.NewVmRef(id)
		vmr.SetNode(nodeName)
		vmr.SetVmType("qemu")

		content, err := cluster.GetStorageContent(vmr, vol.Storage())
		if err != nil {
			return fmt.Errorf("failed to get storage content: %v", err)
		}

		images, ok := content["data"].([]interface{})
		if !ok {
			return fmt.Errorf("failed to parse storage content: unexpected format %T", content["data"])
		}

		for _, img := range images {
			image, ok := img.(map[string]interface{})
			if !ok {
				return fmt.Errorf("failed to parse image entry: unexpected format %T", img)
			}

			// Delete only entries whose volid matches and that report a size
			// (i.e. real disk images, not references or templates).
			if imgID, ok := image["volid"].(string); ok && imgID == volid && image["size"] != nil {
				if _, err := cluster.DeleteVolume(vmr, vol.Storage(), vol.Disk()); err != nil {
					return fmt.Errorf("failed to delete volume %s: %v", vol.Disk(), err)
				}
			}
		}
	}

	return nil
}

0 comments on commit 37d7fb0

Please sign in to comment.