diff --git a/pkg/hostpath/controllerserver.go b/pkg/hostpath/controllerserver.go index 639f4f17f..a70ef1f32 100644 --- a/pkg/hostpath/controllerserver.go +++ b/pkg/hostpath/controllerserver.go @@ -203,9 +203,9 @@ func (hp *hostPath) DeleteVolume(ctx context.Context, req *csi.DeleteVolumeReque return &csi.DeleteVolumeResponse{}, nil } - if vol.IsAttached || vol.IsPublished || vol.IsStaged { + if vol.Attached || !vol.Published.Empty() || !vol.Staged.Empty() { return nil, status.Errorf(codes.Internal, "Volume '%s' is still used (attached: %v, staged: %v, published: %v) by '%s' node", - vol.VolID, vol.IsAttached, vol.IsStaged, vol.IsPublished, vol.NodeID) + vol.VolID, vol.Attached, vol.Staged, vol.Published, vol.NodeID) } if err := hp.deleteVolume(volId); err != nil { @@ -287,7 +287,7 @@ func (hp *hostPath) ControllerPublishVolume(ctx context.Context, req *csi.Contro } // Check to see if the volume is already published. - if vol.IsAttached { + if vol.Attached { // Check if readonly flag is compatible with the publish request. if req.GetReadonly() != vol.ReadOnlyAttach { return nil, status.Error(codes.AlreadyExists, "Volume published but has incompatible readonly flag") @@ -303,7 +303,7 @@ func (hp *hostPath) ControllerPublishVolume(ctx context.Context, req *csi.Contro return nil, status.Errorf(codes.ResourceExhausted, "Cannot attach any more volumes to this node ('%s')", hp.config.NodeID) } - vol.IsAttached = true + vol.Attached = true vol.ReadOnlyAttach = req.GetReadonly() if err := hp.state.UpdateVolume(vol); err != nil { return nil, err @@ -339,12 +339,12 @@ func (hp *hostPath) ControllerUnpublishVolume(ctx context.Context, req *csi.Cont } // Check to see if the volume is staged/published on a node - if vol.IsPublished || vol.IsStaged { + if !vol.Published.Empty() || !vol.Staged.Empty() { return nil, status.Errorf(codes.Internal, "Volume '%s' is still used (staged: %v, published: %v) by '%s' node", - vol.VolID, vol.IsStaged, vol.IsPublished, vol.NodeID) + vol.VolID, vol.Staged, vol.Published, vol.NodeID) } - vol.IsAttached = false + vol.Attached = false if err := hp.state.UpdateVolume(vol); err != nil { return nil, status.Errorf(codes.Internal, "could not update volume %s: %v", vol.VolID, err) } diff --git a/pkg/hostpath/hostpath.go b/pkg/hostpath/hostpath.go index 7291638e4..6eeeb6543 100644 --- a/pkg/hostpath/hostpath.go +++ b/pkg/hostpath/hostpath.go @@ -368,7 +368,7 @@ func loadFromBlockVolume(hostPathVolume state.Volume, destPath string) error { func (hp *hostPath) getAttachCount() int64 { count := int64(0) for _, vol := range hp.state.GetVolumes() { - if vol.IsAttached { + if vol.Attached { count++ } } diff --git a/pkg/hostpath/nodeserver.go b/pkg/hostpath/nodeserver.go index 803e06b28..a1465deb1 100644 --- a/pkg/hostpath/nodeserver.go +++ b/pkg/hostpath/nodeserver.go @@ -80,8 +80,13 @@ func (hp *hostPath) NodePublishVolume(ctx context.Context, req *csi.NodePublishV return nil, status.Error(codes.NotFound, err.Error()) } - if !ephemeralVolume && !vol.IsStaged { - return nil, status.Errorf(codes.FailedPrecondition, "Volume ('%s') must be staged before publishing.", vol.VolID) + if !ephemeralVolume { + if vol.Staged.Empty() { + return nil, status.Errorf(codes.FailedPrecondition, "volume %q must be staged before publishing", vol.VolID) + } + if !vol.Staged.Has(req.GetStagingTargetPath()) { + return nil, status.Errorf(codes.InvalidArgument, "volume %q was staged at %v, not %q", vol.VolID, vol.Staged, req.GetStagingTargetPath()) + } } if req.GetVolumeCapability().GetBlock() != nil { @@ -184,7 +189,7 @@ func (hp *hostPath) NodePublishVolume(ctx context.Context, req *csi.NodePublishV } vol.NodeID = hp.config.NodeID - vol.IsPublished = true + vol.Published.Add(targetPath) if err := hp.state.UpdateVolume(vol); err != nil { return nil, err } @@ -213,6 +218,11 @@ func (hp *hostPath) NodeUnpublishVolume(ctx context.Context, req *csi.NodeUnpubl return nil, err } + if !vol.Published.Has(targetPath) { + glog.V(4).Infof("Volume %q is not published at %q, nothing to do.", volumeID, targetPath) + return &csi.NodeUnpublishVolumeResponse{}, nil + } + // Unmount only if the target path is really a mount point. if notMnt, err := mount.IsNotMountPoint(mount.New(""), targetPath); err != nil { if !os.IsNotExist(err) { @@ -238,7 +248,7 @@ func (hp *hostPath) NodeUnpublishVolume(ctx context.Context, req *csi.NodeUnpubl return nil, fmt.Errorf("failed to delete volume: %w", err) } } else { - vol.IsPublished = false + vol.Published.Remove(targetPath) if err := hp.state.UpdateVolume(vol); err != nil { return nil, err } @@ -253,24 +263,39 @@ func (hp *hostPath) NodeStageVolume(ctx context.Context, req *csi.NodeStageVolum if len(req.GetVolumeId()) == 0 { return nil, status.Error(codes.InvalidArgument, "Volume ID missing in request") } - if len(req.GetStagingTargetPath()) == 0 { + stagingTargetPath := req.GetStagingTargetPath() + if stagingTargetPath == "" { return nil, status.Error(codes.InvalidArgument, "Target path missing in request") } if req.GetVolumeCapability() == nil { return nil, status.Error(codes.InvalidArgument, "Volume Capability missing in request") } + // Lock before acting on global state. A production-quality + // driver might use more fine-grained locking. + hp.mutex.Lock() + defer hp.mutex.Unlock() + vol, err := hp.state.GetVolumeByID(req.VolumeId) if err != nil { return nil, err } - if hp.config.EnableAttach && !vol.IsAttached { + if hp.config.EnableAttach && !vol.Attached { return nil, status.Errorf(codes.Internal, "ControllerPublishVolume must be called on volume '%s' before staging on node", vol.VolID) } - vol.IsStaged = true + if vol.Staged.Has(stagingTargetPath) { + glog.V(4).Infof("Volume %q is already staged at %q, nothing to do.", req.VolumeId, stagingTargetPath) + return &csi.NodeStageVolumeResponse{}, nil + } + + if !vol.Staged.Empty() { + return nil, status.Errorf(codes.FailedPrecondition, "volume %q is already staged at %v", req.VolumeId, vol.Staged) + } + + vol.Staged.Add(stagingTargetPath) if err := hp.state.UpdateVolume(vol); err != nil { return nil, err } @@ -284,19 +309,30 @@ func (hp *hostPath) NodeUnstageVolume(ctx context.Context, req *csi.NodeUnstageV if len(req.GetVolumeId()) == 0 { return nil, status.Error(codes.InvalidArgument, "Volume ID missing in request") } - if len(req.GetStagingTargetPath()) == 0 { + stagingTargetPath := req.GetStagingTargetPath() + if stagingTargetPath == "" { return nil, status.Error(codes.InvalidArgument, "Target path missing in request") } + // Lock before acting on global state. A production-quality + // driver might use more fine-grained locking. + hp.mutex.Lock() + defer hp.mutex.Unlock() + vol, err := hp.state.GetVolumeByID(req.VolumeId) if err != nil { return nil, err } - if vol.IsPublished { - return nil, status.Errorf(codes.Internal, "Volume '%s' is still pulished on '%s' node", vol.VolID, vol.NodeID) + if !vol.Staged.Has(stagingTargetPath) { + glog.V(4).Infof("Volume %q is not staged at %q, nothing to do.", req.VolumeId, stagingTargetPath) + return &csi.NodeUnstageVolumeResponse{}, nil + } + + if !vol.Published.Empty() { + return nil, status.Errorf(codes.Internal, "volume %q is still published at %q on node %q", vol.VolID, vol.Published, vol.NodeID) } - vol.IsStaged = false + vol.Staged.Remove(stagingTargetPath) if err := hp.state.UpdateVolume(vol); err != nil { return nil, err } diff --git a/pkg/state/state.go b/pkg/state/state.go index af203f11e..84f3ebfb8 100644 --- a/pkg/state/state.go +++ b/pkg/state/state.go @@ -48,9 +48,14 @@ type Volume struct { NodeID string Kind string ReadOnlyAttach bool - IsAttached bool - IsStaged bool - IsPublished bool + Attached bool + // Staged contains the staging target path at which the volume + // was staged. A set of paths is used for consistency + // with Published. + Staged Strings + // Published contains the target paths where the volume + // was published. + Published Strings } type Snapshot struct { diff --git a/pkg/state/strings.go b/pkg/state/strings.go new file mode 100644 index 000000000..68dfcc1f8 --- /dev/null +++ b/pkg/state/strings.go @@ -0,0 +1,51 @@ +/* +Copyright 2021 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package state + +// Strings is an ordered set of strings with helper functions for +// adding, searching and removing entries. +type Strings []string + +// Add appends at the end. +func (s *Strings) Add(str string) { + *s = append(*s, str) +} + +// Has checks whether the string is already present. +func (s *Strings) Has(str string) bool { + for _, str2 := range *s { + if str == str2 { + return true + } + } + return false +} + +// Empty returns true if the list is empty. +func (s *Strings) Empty() bool { + return len(*s) == 0 +} + +// Remove removes the first occurence of the string, if present. +func (s *Strings) Remove(str string) { + for i, str2 := range *s { + if str == str2 { + *s = append((*s)[:i], (*s)[i+1:]...) + return + } + } +}