diff --git a/cmd/hostpathplugin/main.go b/cmd/hostpathplugin/main.go index 0018d753e..bd36c3a71 100644 --- a/cmd/hostpathplugin/main.go +++ b/cmd/hostpathplugin/main.go @@ -52,7 +52,9 @@ func main() { flag.Var(&cfg.Capacity, "capacity", "Simulate storage capacity. The parameter is = where is the value of a 'kind' storage class parameter and is the total amount of bytes for that kind. The flag may be used multiple times to configure different kinds.") flag.BoolVar(&cfg.EnableAttach, "enable-attach", false, "Enables RPC_PUBLISH_UNPUBLISH_VOLUME capability.") flag.Int64Var(&cfg.MaxVolumeSize, "max-volume-size", 1024*1024*1024*1024, "maximum size of volumes in bytes (inclusive)") - + flag.BoolVar(&cfg.EnableTopology, "enable-topology", true, "Enables PluginCapability_Service_VOLUME_ACCESSIBILITY_CONSTRAINTS capability.") + flag.BoolVar(&cfg.EnableVolumeExpansion, "node-expand-required", true, "Enables NodeServiceCapability_RPC_EXPAND_VOLUME capacity.") + flag.Int64Var(&cfg.AttachLimit, "attach-limit", 0, "Maximum number of attachable volumes on a node. Zero refers to no limit.") showVersion := flag.Bool("version", false, "Show version.") // The proxy-endpoint option is intended to used by the Kubernetes E2E test suite // for proxying incoming calls to the embedded mock CSI driver. diff --git a/pkg/hostpath/controllerserver.go b/pkg/hostpath/controllerserver.go index e5c57de5b..6bbd732bb 100644 --- a/pkg/hostpath/controllerserver.go +++ b/pkg/hostpath/controllerserver.go @@ -99,8 +99,9 @@ func (hp *hostPath) CreateVolume(ctx context.Context, req *csi.CreateVolumeReque defer hp.mutex.Unlock() capacity := int64(req.GetCapacityRange().GetRequiredBytes()) - topologies := []*csi.Topology{ - {Segments: map[string]string{TopologyKeyNode: hp.config.NodeID}}, + topologies := []*csi.Topology{} + if hp.config.EnableTopology { + topologies = append(topologies, &csi.Topology{Segments: map[string]string{TopologyKeyNode: hp.config.NodeID}}) } // Need to check for already existing volume name, and if found @@ -303,6 +304,11 @@ func (hp *hostPath) ControllerPublishVolume(ctx context.Context, req *csi.Contro }, nil } + // Check attach limit before publishing. + if hp.config.AttachLimit > 0 && hp.getAttachCount() >= hp.config.AttachLimit { + return nil, status.Errorf(codes.ResourceExhausted, "Cannot attach any more volumes to this node ('%s')", hp.config.NodeID) + } + vol.IsAttached = true vol.ReadOnlyAttach = req.GetReadonly() if err := hp.updateVolume(vol.VolID, vol); err != nil { @@ -695,6 +701,9 @@ func (hp *hostPath) ListSnapshots(ctx context.Context, req *csi.ListSnapshotsReq } func (hp *hostPath) ControllerExpandVolume(ctx context.Context, req *csi.ControllerExpandVolumeRequest) (*csi.ControllerExpandVolumeResponse, error) { + if !hp.config.EnableVolumeExpansion { + return nil, status.Error(codes.Unimplemented, "ControllerExpandVolume is not supported") + } volID := req.GetVolumeId() if len(volID) == 0 { @@ -778,9 +787,11 @@ func (hp *hostPath) getControllerServiceCapabilities() []*csi.ControllerServiceC csi.ControllerServiceCapability_RPC_LIST_SNAPSHOTS, csi.ControllerServiceCapability_RPC_LIST_VOLUMES, csi.ControllerServiceCapability_RPC_CLONE_VOLUME, - csi.ControllerServiceCapability_RPC_EXPAND_VOLUME, csi.ControllerServiceCapability_RPC_VOLUME_CONDITION, } + if hp.config.EnableVolumeExpansion { + cl = append(cl, csi.ControllerServiceCapability_RPC_EXPAND_VOLUME) + } if hp.config.EnableAttach { cl = append(cl, csi.ControllerServiceCapability_RPC_PUBLISH_UNPUBLISH_VOLUME) } diff --git a/pkg/hostpath/hostpath.go b/pkg/hostpath/hostpath.go index 4b6f8ba5e..b992bacfc 100644 --- a/pkg/hostpath/hostpath.go +++ b/pkg/hostpath/hostpath.go @@ -91,17 +91,20 @@ type hostPathSnapshot struct { } type Config struct { - DriverName string - Endpoint string - ProxyEndpoint string - NodeID string - VendorVersion string - MaxVolumesPerNode int64 - MaxVolumeSize int64 - Capacity Capacity - Ephemeral bool - ShowVersion bool - EnableAttach bool + DriverName string + Endpoint string + ProxyEndpoint string + NodeID string + VendorVersion string + MaxVolumesPerNode int64 + MaxVolumeSize int64 + AttachLimit int64 + Capacity Capacity + Ephemeral bool + ShowVersion bool + EnableAttach bool + EnableTopology bool + EnableVolumeExpansion bool } var ( @@ -423,7 +426,7 @@ func (hp *hostPath) loadFromSnapshot(size int64, snapshotId, destPath string, mo if !ok { return status.Errorf(codes.NotFound, "cannot find snapshot %v", snapshotId) } - if snapshot.ReadyToUse != true { + if !snapshot.ReadyToUse { return fmt.Errorf("snapshot %v is not yet ready to use", snapshotId) } if snapshot.SizeBytes > size { @@ -516,6 +519,16 @@ func (hp *hostPath) getSortedVolumeIDs() []string { return ids } +func (hp *hostPath) getAttachCount() int64 { + count := int64(0) + for _, vol := range hp.volumes { + if vol.IsAttached { + count++ + } + } + return count +} + func filterVolumeName(targetPath string) string { pathItems := strings.Split(targetPath, "kubernetes.io~csi/") if len(pathItems) < 2 { diff --git a/pkg/hostpath/identityserver.go b/pkg/hostpath/identityserver.go index 2cd2c8a02..05225d800 100644 --- a/pkg/hostpath/identityserver.go +++ b/pkg/hostpath/identityserver.go @@ -47,22 +47,24 @@ func (hp *hostPath) Probe(ctx context.Context, req *csi.ProbeRequest) (*csi.Prob func (hp *hostPath) GetPluginCapabilities(ctx context.Context, req *csi.GetPluginCapabilitiesRequest) (*csi.GetPluginCapabilitiesResponse, error) { glog.V(5).Infof("Using default capabilities") - return &csi.GetPluginCapabilitiesResponse{ - Capabilities: []*csi.PluginCapability{ - { - Type: &csi.PluginCapability_Service_{ - Service: &csi.PluginCapability_Service{ - Type: csi.PluginCapability_Service_CONTROLLER_SERVICE, - }, + caps := []*csi.PluginCapability{ + { + Type: &csi.PluginCapability_Service_{ + Service: &csi.PluginCapability_Service{ + Type: csi.PluginCapability_Service_CONTROLLER_SERVICE, }, }, - { - Type: &csi.PluginCapability_Service_{ - Service: &csi.PluginCapability_Service{ - Type: csi.PluginCapability_Service_VOLUME_ACCESSIBILITY_CONSTRAINTS, - }, + }, + } + if hp.config.EnableTopology { + caps = append(caps, &csi.PluginCapability{ + Type: &csi.PluginCapability_Service_{ + Service: &csi.PluginCapability_Service{ + Type: csi.PluginCapability_Service_VOLUME_ACCESSIBILITY_CONSTRAINTS, }, }, - }, - }, nil + }) + } + + return &csi.GetPluginCapabilitiesResponse{Capabilities: caps}, nil } diff --git a/pkg/hostpath/nodeserver.go b/pkg/hostpath/nodeserver.go index 61115e109..ff9b676d2 100644 --- a/pkg/hostpath/nodeserver.go +++ b/pkg/hostpath/nodeserver.go @@ -302,51 +302,59 @@ func (hp *hostPath) NodeUnstageVolume(ctx context.Context, req *csi.NodeUnstageV } func (hp *hostPath) NodeGetInfo(ctx context.Context, req *csi.NodeGetInfoRequest) (*csi.NodeGetInfoResponse, error) { + resp := &csi.NodeGetInfoResponse{ + NodeId: hp.config.NodeID, + MaxVolumesPerNode: hp.config.MaxVolumesPerNode, + } - topology := &csi.Topology{ - Segments: map[string]string{TopologyKeyNode: hp.config.NodeID}, + if hp.config.EnableTopology { + resp.AccessibleTopology = &csi.Topology{ + Segments: map[string]string{TopologyKeyNode: hp.config.NodeID}, + } } - return &csi.NodeGetInfoResponse{ - NodeId: hp.config.NodeID, - MaxVolumesPerNode: hp.config.MaxVolumesPerNode, - AccessibleTopology: topology, - }, nil + if hp.config.AttachLimit > 0 { + resp.MaxVolumesPerNode = hp.config.AttachLimit + } + + return resp, nil } func (hp *hostPath) NodeGetCapabilities(ctx context.Context, req *csi.NodeGetCapabilitiesRequest) (*csi.NodeGetCapabilitiesResponse, error) { - - return &csi.NodeGetCapabilitiesResponse{ - Capabilities: []*csi.NodeServiceCapability{ - { - Type: &csi.NodeServiceCapability_Rpc{ - Rpc: &csi.NodeServiceCapability_RPC{ - Type: csi.NodeServiceCapability_RPC_STAGE_UNSTAGE_VOLUME, - }, + caps := []*csi.NodeServiceCapability{ + { + Type: &csi.NodeServiceCapability_Rpc{ + Rpc: &csi.NodeServiceCapability_RPC{ + Type: csi.NodeServiceCapability_RPC_STAGE_UNSTAGE_VOLUME, }, }, - { - Type: &csi.NodeServiceCapability_Rpc{ - Rpc: &csi.NodeServiceCapability_RPC{ - Type: csi.NodeServiceCapability_RPC_EXPAND_VOLUME, - }, + }, + { + Type: &csi.NodeServiceCapability_Rpc{ + Rpc: &csi.NodeServiceCapability_RPC{ + Type: csi.NodeServiceCapability_RPC_VOLUME_CONDITION, }, }, - { - Type: &csi.NodeServiceCapability_Rpc{ - Rpc: &csi.NodeServiceCapability_RPC{ - Type: csi.NodeServiceCapability_RPC_GET_VOLUME_STATS, - }, - }, - }, { - Type: &csi.NodeServiceCapability_Rpc{ - Rpc: &csi.NodeServiceCapability_RPC{ - Type: csi.NodeServiceCapability_RPC_VOLUME_CONDITION, - }, + }, + { + Type: &csi.NodeServiceCapability_Rpc{ + Rpc: &csi.NodeServiceCapability_RPC{ + Type: csi.NodeServiceCapability_RPC_GET_VOLUME_STATS, }, }, }, - }, nil + } + if hp.config.EnableVolumeExpansion { + caps = append(caps, &csi.NodeServiceCapability{ + Type: &csi.NodeServiceCapability_Rpc{ + Rpc: &csi.NodeServiceCapability_RPC{ + Type: csi.NodeServiceCapability_RPC_EXPAND_VOLUME, + }, + }, + }) + } + + return &csi.NodeGetCapabilitiesResponse{Capabilities: caps}, nil } func (hp *hostPath) NodeGetVolumeStats(ctx context.Context, in *csi.NodeGetVolumeStatsRequest) (*csi.NodeGetVolumeStatsResponse, error) { @@ -403,6 +411,9 @@ func (hp *hostPath) NodeGetVolumeStats(ctx context.Context, in *csi.NodeGetVolum // NodeExpandVolume is only implemented so the driver can be used for e2e testing. func (hp *hostPath) NodeExpandVolume(ctx context.Context, req *csi.NodeExpandVolumeRequest) (*csi.NodeExpandVolumeResponse, error) { + if !hp.config.EnableVolumeExpansion { + return nil, status.Error(codes.Unimplemented, "NodeExpandVolume is not supported") + } volID := req.GetVolumeId() if len(volID) == 0 {