Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion cmd/hostpathplugin/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,9 @@ func main() {
flag.Var(&cfg.Capacity, "capacity", "Simulate storage capacity. The parameter is <kind>=<quantity> where <kind> is the value of a 'kind' storage class parameter and <quantity> is the total amount of bytes for that kind. The flag may be used multiple times to configure different kinds.")
flag.BoolVar(&cfg.EnableAttach, "enable-attach", false, "Enables RPC_PUBLISH_UNPUBLISH_VOLUME capability.")
flag.Int64Var(&cfg.MaxVolumeSize, "max-volume-size", 1024*1024*1024*1024, "maximum size of volumes in bytes (inclusive)")

flag.BoolVar(&cfg.EnableTopology, "enable-topology", true, "Enables PluginCapability_Service_VOLUME_ACCESSIBILITY_CONSTRAINTS capability.")
flag.BoolVar(&cfg.EnableVolumeExpansion, "node-expand-required", true, "Enables NodeServiceCapability_RPC_EXPAND_VOLUME capacity.")
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This variable and configuration option is strangely named. It says "node-expand-required" but in truth, it controls entire volume expansion feature. I would like us to rename this.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I haven't looked into it, but that sounds reasonable. Can you submit a PR?

flag.Int64Var(&cfg.AttachLimit, "attach-limit", 0, "Maximum number of attachable volumes on a node. Zero refers to no limit.")
showVersion := flag.Bool("version", false, "Show version.")
// The proxy-endpoint option is intended to used by the Kubernetes E2E test suite
// for proxying incoming calls to the embedded mock CSI driver.
Expand Down
17 changes: 14 additions & 3 deletions pkg/hostpath/controllerserver.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,8 +99,9 @@ func (hp *hostPath) CreateVolume(ctx context.Context, req *csi.CreateVolumeReque
defer hp.mutex.Unlock()

capacity := int64(req.GetCapacityRange().GetRequiredBytes())
topologies := []*csi.Topology{
{Segments: map[string]string{TopologyKeyNode: hp.config.NodeID}},
topologies := []*csi.Topology{}
if hp.config.EnableTopology {
topologies = append(topologies, &csi.Topology{Segments: map[string]string{TopologyKeyNode: hp.config.NodeID}})
}

// Need to check for already existing volume name, and if found
Expand Down Expand Up @@ -303,6 +304,11 @@ func (hp *hostPath) ControllerPublishVolume(ctx context.Context, req *csi.Contro
}, nil
}

// Check attach limit before publishing.
if hp.config.AttachLimit > 0 && hp.getAttachCount() >= hp.config.AttachLimit {
return nil, status.Errorf(codes.ResourceExhausted, "Cannot attach any more volumes to this node ('%s')", hp.config.NodeID)
}

vol.IsAttached = true
vol.ReadOnlyAttach = req.GetReadonly()
if err := hp.updateVolume(vol.VolID, vol); err != nil {
Expand Down Expand Up @@ -695,6 +701,9 @@ func (hp *hostPath) ListSnapshots(ctx context.Context, req *csi.ListSnapshotsReq
}

func (hp *hostPath) ControllerExpandVolume(ctx context.Context, req *csi.ControllerExpandVolumeRequest) (*csi.ControllerExpandVolumeResponse, error) {
if !hp.config.EnableVolumeExpansion {
return nil, status.Error(codes.Unimplemented, "ControllerExpandVolume is not supported")
}

volID := req.GetVolumeId()
if len(volID) == 0 {
Expand Down Expand Up @@ -778,9 +787,11 @@ func (hp *hostPath) getControllerServiceCapabilities() []*csi.ControllerServiceC
csi.ControllerServiceCapability_RPC_LIST_SNAPSHOTS,
csi.ControllerServiceCapability_RPC_LIST_VOLUMES,
csi.ControllerServiceCapability_RPC_CLONE_VOLUME,
csi.ControllerServiceCapability_RPC_EXPAND_VOLUME,
csi.ControllerServiceCapability_RPC_VOLUME_CONDITION,
}
if hp.config.EnableVolumeExpansion {
cl = append(cl, csi.ControllerServiceCapability_RPC_EXPAND_VOLUME)
}
if hp.config.EnableAttach {
cl = append(cl, csi.ControllerServiceCapability_RPC_PUBLISH_UNPUBLISH_VOLUME)
}
Expand Down
37 changes: 25 additions & 12 deletions pkg/hostpath/hostpath.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,17 +91,20 @@ type hostPathSnapshot struct {
}

type Config struct {
DriverName string
Endpoint string
ProxyEndpoint string
NodeID string
VendorVersion string
MaxVolumesPerNode int64
MaxVolumeSize int64
Capacity Capacity
Ephemeral bool
ShowVersion bool
EnableAttach bool
DriverName string
Endpoint string
ProxyEndpoint string
NodeID string
VendorVersion string
MaxVolumesPerNode int64
MaxVolumeSize int64
AttachLimit int64
Capacity Capacity
Ephemeral bool
ShowVersion bool
EnableAttach bool
EnableTopology bool
EnableVolumeExpansion bool
}

var (
Expand Down Expand Up @@ -423,7 +426,7 @@ func (hp *hostPath) loadFromSnapshot(size int64, snapshotId, destPath string, mo
if !ok {
return status.Errorf(codes.NotFound, "cannot find snapshot %v", snapshotId)
}
if snapshot.ReadyToUse != true {
if !snapshot.ReadyToUse {
return fmt.Errorf("snapshot %v is not yet ready to use", snapshotId)
}
if snapshot.SizeBytes > size {
Expand Down Expand Up @@ -516,6 +519,16 @@ func (hp *hostPath) getSortedVolumeIDs() []string {
return ids
}

func (hp *hostPath) getAttachCount() int64 {
count := int64(0)
for _, vol := range hp.volumes {
if vol.IsAttached {
count++
}
}
return count
}

func filterVolumeName(targetPath string) string {
pathItems := strings.Split(targetPath, "kubernetes.io~csi/")
if len(pathItems) < 2 {
Expand Down
30 changes: 16 additions & 14 deletions pkg/hostpath/identityserver.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,22 +47,24 @@ func (hp *hostPath) Probe(ctx context.Context, req *csi.ProbeRequest) (*csi.Prob

func (hp *hostPath) GetPluginCapabilities(ctx context.Context, req *csi.GetPluginCapabilitiesRequest) (*csi.GetPluginCapabilitiesResponse, error) {
glog.V(5).Infof("Using default capabilities")
return &csi.GetPluginCapabilitiesResponse{
Capabilities: []*csi.PluginCapability{
{
Type: &csi.PluginCapability_Service_{
Service: &csi.PluginCapability_Service{
Type: csi.PluginCapability_Service_CONTROLLER_SERVICE,
},
caps := []*csi.PluginCapability{
{
Type: &csi.PluginCapability_Service_{
Service: &csi.PluginCapability_Service{
Type: csi.PluginCapability_Service_CONTROLLER_SERVICE,
},
},
{
Type: &csi.PluginCapability_Service_{
Service: &csi.PluginCapability_Service{
Type: csi.PluginCapability_Service_VOLUME_ACCESSIBILITY_CONSTRAINTS,
},
},
}
if hp.config.EnableTopology {
caps = append(caps, &csi.PluginCapability{
Type: &csi.PluginCapability_Service_{
Service: &csi.PluginCapability_Service{
Type: csi.PluginCapability_Service_VOLUME_ACCESSIBILITY_CONSTRAINTS,
},
},
},
}, nil
})
}

return &csi.GetPluginCapabilitiesResponse{Capabilities: caps}, nil
}
75 changes: 43 additions & 32 deletions pkg/hostpath/nodeserver.go
Original file line number Diff line number Diff line change
Expand Up @@ -302,51 +302,59 @@ func (hp *hostPath) NodeUnstageVolume(ctx context.Context, req *csi.NodeUnstageV
}

func (hp *hostPath) NodeGetInfo(ctx context.Context, req *csi.NodeGetInfoRequest) (*csi.NodeGetInfoResponse, error) {
resp := &csi.NodeGetInfoResponse{
NodeId: hp.config.NodeID,
MaxVolumesPerNode: hp.config.MaxVolumesPerNode,
}

topology := &csi.Topology{
Segments: map[string]string{TopologyKeyNode: hp.config.NodeID},
if hp.config.EnableTopology {
resp.AccessibleTopology = &csi.Topology{
Segments: map[string]string{TopologyKeyNode: hp.config.NodeID},
}
}

return &csi.NodeGetInfoResponse{
NodeId: hp.config.NodeID,
MaxVolumesPerNode: hp.config.MaxVolumesPerNode,
AccessibleTopology: topology,
}, nil
if hp.config.AttachLimit > 0 {
resp.MaxVolumesPerNode = hp.config.AttachLimit
}

return resp, nil
}

func (hp *hostPath) NodeGetCapabilities(ctx context.Context, req *csi.NodeGetCapabilitiesRequest) (*csi.NodeGetCapabilitiesResponse, error) {

return &csi.NodeGetCapabilitiesResponse{
Capabilities: []*csi.NodeServiceCapability{
{
Type: &csi.NodeServiceCapability_Rpc{
Rpc: &csi.NodeServiceCapability_RPC{
Type: csi.NodeServiceCapability_RPC_STAGE_UNSTAGE_VOLUME,
},
caps := []*csi.NodeServiceCapability{
{
Type: &csi.NodeServiceCapability_Rpc{
Rpc: &csi.NodeServiceCapability_RPC{
Type: csi.NodeServiceCapability_RPC_STAGE_UNSTAGE_VOLUME,
},
},
{
Type: &csi.NodeServiceCapability_Rpc{
Rpc: &csi.NodeServiceCapability_RPC{
Type: csi.NodeServiceCapability_RPC_EXPAND_VOLUME,
},
},
{
Type: &csi.NodeServiceCapability_Rpc{
Rpc: &csi.NodeServiceCapability_RPC{
Type: csi.NodeServiceCapability_RPC_VOLUME_CONDITION,
},
},
{
Type: &csi.NodeServiceCapability_Rpc{
Rpc: &csi.NodeServiceCapability_RPC{
Type: csi.NodeServiceCapability_RPC_GET_VOLUME_STATS,
},
},
}, {
Type: &csi.NodeServiceCapability_Rpc{
Rpc: &csi.NodeServiceCapability_RPC{
Type: csi.NodeServiceCapability_RPC_VOLUME_CONDITION,
},
},
{
Type: &csi.NodeServiceCapability_Rpc{
Rpc: &csi.NodeServiceCapability_RPC{
Type: csi.NodeServiceCapability_RPC_GET_VOLUME_STATS,
},
},
},
}, nil
}
if hp.config.EnableVolumeExpansion {
caps = append(caps, &csi.NodeServiceCapability{
Type: &csi.NodeServiceCapability_Rpc{
Rpc: &csi.NodeServiceCapability_RPC{
Type: csi.NodeServiceCapability_RPC_EXPAND_VOLUME,
},
},
})
}

return &csi.NodeGetCapabilitiesResponse{Capabilities: caps}, nil
}

func (hp *hostPath) NodeGetVolumeStats(ctx context.Context, in *csi.NodeGetVolumeStatsRequest) (*csi.NodeGetVolumeStatsResponse, error) {
Expand Down Expand Up @@ -403,6 +411,9 @@ func (hp *hostPath) NodeGetVolumeStats(ctx context.Context, in *csi.NodeGetVolum

// NodeExpandVolume is only implemented so the driver can be used for e2e testing.
func (hp *hostPath) NodeExpandVolume(ctx context.Context, req *csi.NodeExpandVolumeRequest) (*csi.NodeExpandVolumeResponse, error) {
if !hp.config.EnableVolumeExpansion {
return nil, status.Error(codes.Unimplemented, "NodeExpandVolume is not supported")
}

volID := req.GetVolumeId()
if len(volID) == 0 {
Expand Down