Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[PoC] ReadWriteMany support #45

Draft
wants to merge 2 commits into
base: master
Choose a base branch
from
Draft
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion examples/pvc-block.yaml
Original file line number Diff line number Diff line change
@@ -18,7 +18,7 @@ metadata:
name: test-block-pvc
spec:
accessModes:
- ReadWriteOnce
- ReadWriteMany
volumeMode: Block
resources:
requests:
12 changes: 7 additions & 5 deletions pkg/driverd/controllerserver.go
Original file line number Diff line number Diff line change
@@ -152,7 +152,9 @@ func (cs *controllerServer) CreateVolume(ctx context.Context, req *csi.CreateVol
return nil, status.Error(codes.InvalidArgument, "missing access mode")
}
if *accessMode != csi.VolumeCapability_AccessMode_SINGLE_NODE_WRITER && *accessMode != csi.VolumeCapability_AccessMode_SINGLE_NODE_READER_ONLY {
return nil, status.Errorf(codes.InvalidArgument, "unsupported access mode %s", *accessMode)
if *accessMode != csi.VolumeCapability_AccessMode_MULTI_NODE_MULTI_WRITER && *accessMode != csi.VolumeCapability_AccessMode_MULTI_NODE_READER_ONLY && *accessMode != csi.VolumeCapability_AccessMode_MULTI_NODE_SINGLE_WRITER && fsName != BlockAccessFsName {
return nil, status.Errorf(codes.InvalidArgument, "unsupported access mode %s", *accessMode)
}
}
if fsName == "" {
fsName = cs.defaultFs
@@ -259,7 +261,7 @@ func (cs *controllerServer) CreateVolume(ctx context.Context, req *csi.CreateVol
klog.V(3).Infof("Trying to acquire lock on %q to read source data", lockVol)
// We add a uuid because we don't want this lock to be reentrant.
lockId := fmt.Sprintf("CreateVolume(%s,%s)", vol, uuid.New().String())
err = cs.volumeLock.LockVolume(ctx, lockVol.VolumeRef, lockId)
err = cs.volumeLock.LockVolume(ctx, lockVol.VolumeRef, false, lockId)
if status.Code(err) == codes.PermissionDenied {
ownerId, ownerNode, err := cs.volumeLock.GetOwner(ctx, lockVol.VolumeRef)
if err != nil {
@@ -342,7 +344,7 @@ func (cs *controllerServer) CreateVolume(ctx context.Context, req *csi.CreateVol
cleanupVols[vol.VgLv()] = struct{}{}
// Now lock the new volume
lockId := fmt.Sprintf("CreateVolume(%s,%s)", vol, uuid.New().String())
if err := cs.volumeLock.LockVolume(ctx, *vol, lockId); err != nil {
if err := cs.volumeLock.LockVolume(ctx, *vol, false, lockId); err != nil {
return nil, err
}
defer cs.tryVolumeUnlock(ctx, *vol, lockId)
@@ -658,7 +660,7 @@ func (cs *controllerServer) CreateSnapshot(ctx context.Context, req *csi.CreateS
// Acquire exclusive lock on orig, or delegate to owner node if already locked
// We add a uuid because we don't want this lock to be reentrant.
lockId := fmt.Sprintf("CreateSnapshot(%s,%s)", snap, uuid.New().String())
err = cs.volumeLock.LockVolume(ctx, *orig, lockId)
err = cs.volumeLock.LockVolume(ctx, *orig, false, lockId)
if status.Code(err) == codes.PermissionDenied {
ownerId, ownerNode, err := cs.volumeLock.GetOwner(ctx, *orig)
if err != nil {
@@ -772,7 +774,7 @@ func (cs *controllerServer) DeleteSnapshot(ctx context.Context, req *csi.DeleteS
// Acquire exclusive lock on orig, or delegate to owner node if already locked
// We add a uuid because we don't want this lock to be reentrant.
lockId := fmt.Sprintf("DeleteSnapshot(%s,%s)", snap, uuid.New().String())
err = cs.volumeLock.LockVolume(ctx, lockVol.VolumeRef, lockId)
err = cs.volumeLock.LockVolume(ctx, lockVol.VolumeRef, false, lockId)
if status.Code(err) == codes.PermissionDenied {
ownerId, ownerNode, err := cs.volumeLock.GetOwner(ctx, *orig)
if err != nil {
2 changes: 1 addition & 1 deletion pkg/driverd/diskrpcservice.go
Original file line number Diff line number Diff line change
@@ -50,7 +50,7 @@ type volumeLockerAdapter struct {

func (vl *volumeLockerAdapter) Lock() {
for delay := 1 * time.Second; ; {
err := vl.locker.LockVolume(vl.ctx, vl.vol, diskRpcOp)
err := vl.locker.LockVolume(vl.ctx, vl.vol, false, diskRpcOp)
if err == nil {
return
}
32 changes: 30 additions & 2 deletions pkg/driverd/nodeserver.go
Original file line number Diff line number Diff line change
@@ -206,8 +206,16 @@ func (ns *nodeServer) NodeStageVolume(ctx context.Context, req *csi.NodeStageVol
return nil, status.Errorf(codes.InvalidArgument, "invalid volume id: %v", err)
}

// Use shared mode
shared := false
if req.VolumeCapability.AccessMode.Mode == csi.VolumeCapability_AccessMode_MULTI_NODE_MULTI_WRITER ||
req.VolumeCapability.AccessMode.Mode == csi.VolumeCapability_AccessMode_MULTI_NODE_READER_ONLY ||
req.VolumeCapability.AccessMode.Mode == csi.VolumeCapability_AccessMode_MULTI_NODE_SINGLE_WRITER {
shared = true
}

// Lock the volume.
err = ns.volumeLock.LockVolume(ctx, *vol, defaultLockOp)
err = ns.volumeLock.LockVolume(ctx, *vol, shared, defaultLockOp)
if err != nil {
return nil, err
}
@@ -335,10 +343,30 @@ func (ns *nodeServer) NodeExpandVolume(ctx context.Context, req *csi.NodeExpandV
if err != nil {
return nil, status.Errorf(codes.Internal, "volume %q has an invalid filesystem: %v", vol, err)
}

// Issue the resize if the logical volume is smaller than required bytes
requiredBytes := uint64(req.CapacityRange.RequiredBytes)
if lv.LvSize < requiredBytes {

// Shared volumes have no owner tag
if _, ownerTag := tags[ownerNodeTagKey]; !ownerTag {
defer func() {
// Re-lock the volume into shared mode.
_, _ = ns.lvmctrld.LvChange(ctx, &pb.LvChangeRequest{
Target: []string{vol.VgLv()},
Activate: pb.LvActivationMode_LV_ACTIVATION_MODE_ACTIVE_SHARED,
})
}()

// Re-lock the volume in exclusive mode.
_, err = ns.lvmctrld.LvChange(ctx, &pb.LvChangeRequest{
Target: []string{vol.VgLv()},
Activate: pb.LvActivationMode_LV_ACTIVATION_MODE_ACTIVE_EXCLUSIVE,
})
if err != nil {
return nil, status.Errorf(status.Code(err), "failed to lock volume %s: %v", vol, err)
}
}

_, err = ns.lvmctrld.LvResize(ctx, &pb.LvResizeRequest{VgName: vol.Vg(), LvName: vol.Lv(), Size: requiredBytes})
if status.Code(err) == codes.OutOfRange {
return nil, status.Errorf(codes.OutOfRange, "insufficient free space")
33 changes: 20 additions & 13 deletions pkg/driverd/volumelock.go
Original file line number Diff line number Diff line change
@@ -32,7 +32,7 @@ type VolumeLocker interface {
//
// Volume locks are reentrant per volume (but not per <volume, op> pairs),
// that is one can lock the same volume with multiple ops.
LockVolume(ctx context.Context, vol VolumeRef, op string) error
LockVolume(ctx context.Context, vol VolumeRef, shared bool, op string) error

// Unlock a volume for a given <vol, op> pair.
UnlockVolume(ctx context.Context, vol VolumeRef, op string) error
@@ -71,7 +71,7 @@ func NewVolumeLocker(lvmctrld pb.LvmCtrldClient, nodeName string) (VolumeLocker,
return vl, nil
}

func (vl *volumeLocker) LockVolume(ctx context.Context, vol VolumeRef, op string) error {
func (vl *volumeLocker) LockVolume(ctx context.Context, vol VolumeRef, shared bool, op string) error {
vl.mutex.Lock()
defer vl.mutex.Unlock()

@@ -86,27 +86,34 @@ func (vl *volumeLocker) LockVolume(ctx context.Context, vol VolumeRef, op string
return nil
}

mode := proto.LvActivationMode_LV_ACTIVATION_MODE_ACTIVE_EXCLUSIVE
if shared {
mode = proto.LvActivationMode_LV_ACTIVATION_MODE_ACTIVE_SHARED
}

// Lock the volume.
_, err := vl.lvmctrld.LvChange(ctx, &proto.LvChangeRequest{
Target: []string{vol.VgLv()},
Activate: proto.LvActivationMode_LV_ACTIVATION_MODE_ACTIVE_EXCLUSIVE,
Activate: mode,
})
if err != nil {
return status.Errorf(status.Code(err), "failed to lock volume %s: %v", vol, err)
}

// Update tags.
err = vl.setOwner(ctx, vol, &vl.nodeID, &vl.nodeName)
if err != nil {
// Try to unlock the volume.
_, err2 := vl.lvmctrld.LvChange(ctx, &proto.LvChangeRequest{
Target: []string{vol.VgLv()},
Activate: proto.LvActivationMode_LV_ACTIVATION_MODE_DEACTIVATE,
})
if err2 != nil {
klog.Errorf("Failed to unlock volume %s: %v (ignoring error)", vol, err2)
if !shared {
err = vl.setOwner(ctx, vol, &vl.nodeID, &vl.nodeName)
if err != nil {
// Try to unlock the volume.
_, err2 := vl.lvmctrld.LvChange(ctx, &proto.LvChangeRequest{
Target: []string{vol.VgLv()},
Activate: proto.LvActivationMode_LV_ACTIVATION_MODE_DEACTIVATE,
})
if err2 != nil {
klog.Errorf("Failed to unlock volume %s: %v (ignoring error)", vol, err2)
}
return status.Errorf(status.Code(err), "failed to update owner tags on volume %s: %v", vol, err)
}
return status.Errorf(status.Code(err), "failed to update owner tags on volume %s: %v", vol, err)
}

m[op] = struct{}{}