Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions cmd/gce-pd-csi-driver/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,8 @@ var (

diskTopology = flag.Bool("disk-topology", false, "If set to true, the driver will add a disk-type.gke.io/[disk-type] topology label when the StorageClass has the use-allowed-disk-topology parameter set to true. That topology label is included in the Topologies returned in CreateVolumeResponse. This flag is disabled by default.")

dynamicVolumes = flag.Bool("dynamic-volumes", false, "If set to true, the CSI driver will automatically select a compatible disk type based on the presence of the dynamic-volume parameter and disk types defined in the StorageClass. Disabled by default.")

diskCacheSyncPeriod = flag.Duration("disk-cache-sync-period", 10*time.Minute, "Period for the disk cache to check the /dev/disk/by-id/ directory and evaluate the symlinks")

enableDiskSizeValidation = flag.Bool("enable-disk-size-validation", false, "If set to true, the driver will validate that the requested disk size is matches the physical disk size. This flag is disabled by default.")
Expand Down Expand Up @@ -255,6 +257,7 @@ func handle() {
args := &driver.GCEControllerServerArgs{
EnableDiskTopology: *diskTopology,
EnableDiskSizeValidation: *enableDiskSizeValidation,
EnableDynamicVolumes: *dynamicVolumes,
}

controllerServer = driver.NewControllerServer(gceDriver, cloudProvider, initialBackoffDuration, maxBackoffDuration, fallbackRequisiteZones, *enableStoragePoolsFlag, *enableDataCacheFlag, multiZoneVolumeHandleConfig, listVolumesConfig, provisionableDisksConfig, *enableHdHAFlag, args)
Expand Down Expand Up @@ -297,6 +300,8 @@ func handle() {
SysfsPath: "/sys",
MetricsManager: metricsManager,
DeviceCache: deviceCache,
EnableDynamicVolumes: *dynamicVolumes,
NodeName: *nodeName,
}
nodeServer = driver.NewNodeServer(gceDriver, mounter, deviceUtils, meta, statter, nsArgs)

Expand Down
5 changes: 5 additions & 0 deletions pkg/common/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -799,6 +799,11 @@ func MapNumber(vCPUs int64, limitMap []MachineHyperdiskLimit) int64 {
return 15
}

// HasDiskTypeLabelKeyPrefix checks if the label key starts with the DiskTypeKeyPrefix.
func HasDiskTypeLabelKeyPrefix(labelKey string) bool {
return strings.HasPrefix(labelKey, DiskTypeKeyPrefix)
}

func DiskTypeLabelKey(diskType string) string {
return fmt.Sprintf("%s/%s", DiskTypeKeyPrefix, diskType)
}
Expand Down
1 change: 1 addition & 0 deletions pkg/gce-pd-csi-driver/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -129,6 +129,7 @@ type GCEControllerServer struct {
type GCEControllerServerArgs struct {
EnableDiskTopology bool
EnableDiskSizeValidation bool
EnableDynamicVolumes bool
}

type MultiZoneVolumeHandleConfig struct {
Expand Down
2 changes: 2 additions & 0 deletions pkg/gce-pd-csi-driver/gce-pd-driver.go
Original file line number Diff line number Diff line change
Expand Up @@ -160,6 +160,8 @@ func NewNodeServer(gceDriver *GCEDriver, mounter *mount.SafeFormatAndMount, devi
SysfsPath: args.SysfsPath,
metricsManager: args.MetricsManager,
DeviceCache: args.DeviceCache,
EnableDynamicVolumes: args.EnableDynamicVolumes,
nodeName: args.NodeName,
}
}

Expand Down
62 changes: 54 additions & 8 deletions pkg/gce-pd-csi-driver/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,9 +32,9 @@ import (

csi "github.com/container-storage-interface/spec/lib/go/csi"

corev1 "k8s.io/api/core/v1"
"k8s.io/klog/v2"
"k8s.io/mount-utils"

"sigs.k8s.io/gcp-compute-persistent-disk-csi-driver/pkg/common"
"sigs.k8s.io/gcp-compute-persistent-disk-csi-driver/pkg/deviceutils"
metadataservice "sigs.k8s.io/gcp-compute-persistent-disk-csi-driver/pkg/gce-cloud-provider/metadata"
Expand All @@ -54,6 +54,7 @@ type GCENodeServer struct {
EnableDataCache bool
DataCacheEnabledNodePool bool
SysfsPath string
nodeName string

// A map storing all volumes with ongoing operations so that additional operations
// for that same volume (as defined by VolumeID) return an Aborted error
Expand Down Expand Up @@ -82,6 +83,8 @@ type GCENodeServer struct {
metricsManager *metrics.MetricsManager
// A cache of the device paths for the volumes that are attached to the node.
DeviceCache *linkcache.DeviceCache

EnableDynamicVolumes bool
}

type NodeServerArgs struct {
Expand All @@ -98,8 +101,12 @@ type NodeServerArgs struct {
// SysfsPath defaults to "/sys", except if it's a unit test.
SysfsPath string

NodeName string

MetricsManager *metrics.MetricsManager
DeviceCache *linkcache.DeviceCache

EnableDynamicVolumes bool
}

var _ csi.NodeServer = &GCENodeServer{}
Expand Down Expand Up @@ -166,6 +173,15 @@ func (ns *GCENodeServer) WithSerializedFormatAndMount(timeout time.Duration, max
return ns
}

// GetNodeName returns the node name, prioritizing the override value (from Downward API)
// over the metadata service if available.
func (ns *GCENodeServer) GetNodeName() string {
if ns.nodeName != "" {
return ns.nodeName
}
return ns.MetadataService.GetName()
}

func (ns *GCENodeServer) NodePublishVolume(ctx context.Context, req *csi.NodePublishVolumeRequest) (*csi.NodePublishVolumeResponse, error) {
// Validate Arguments
targetPath := req.GetTargetPath()
Expand Down Expand Up @@ -686,9 +702,25 @@ func (ns *GCENodeServer) NodeGetInfo(ctx context.Context, req *csi.NodeGetInfoRe
Segments: map[string]string{common.TopologyKeyZone: ns.MetadataService.GetZone()},
}

node, err := k8sclient.GetNodeWithRetry(ctx, ns.GetNodeName())
if err != nil {
klog.Errorf("Failed to get node %s: %v. The error is ignored so that the driver can register", ns.GetNodeName(), err.Error())
}

if ns.EnableDynamicVolumes {
labels, err := ns.getDiskTypeLabels(node)
if err != nil {
klog.Errorf("Failed to fetch disk type topology labels: %v", err)
}

for k, v := range labels {
top.Segments[k] = v
}
}

nodeID := common.CreateNodeID(ns.MetadataService.GetProject(), ns.MetadataService.GetZone(), ns.MetadataService.GetName())

volumeLimits, err := ns.GetVolumeLimits(ctx)
volumeLimits, err := ns.getVolumeLimits(ctx, node)
if err != nil {
klog.Errorf("GetVolumeLimits failed: %v. The error is ignored so that the driver can register", err.Error())
// No error should be returned from NodeGetInfo, otherwise the driver will not register
Expand Down Expand Up @@ -850,7 +882,7 @@ func (ns *GCENodeServer) NodeExpandVolume(ctx context.Context, req *csi.NodeExpa
}, nil
}

func (ns *GCENodeServer) GetVolumeLimits(ctx context.Context) (int64, error) {
func (ns *GCENodeServer) getVolumeLimits(ctx context.Context, node *corev1.Node) (int64, error) {
// Machine-type format: n1-type-CPUS or custom-CPUS-RAM or f1/g1-type
machineType := ns.MetadataService.GetMachineType()

Expand All @@ -862,7 +894,7 @@ func (ns *GCENodeServer) GetVolumeLimits(ctx context.Context) (int64, error) {
}

// Get attach limit override from label
attachLimitOverride, err := GetAttachLimitsOverrideFromNodeLabel(ctx, ns.MetadataService.GetName())
attachLimitOverride, err := getAttachLimitsOverrideFromNodeLabel(node)
if err == nil && attachLimitOverride > 0 && attachLimitOverride < 128 {
return attachLimitOverride, nil
} else {
Expand Down Expand Up @@ -924,10 +956,10 @@ func (ns *GCENodeServer) GetVolumeLimits(ctx context.Context) (int64, error) {
return volumeLimitBig, nil
}

func GetAttachLimitsOverrideFromNodeLabel(ctx context.Context, nodeName string) (int64, error) {
node, err := k8sclient.GetNodeWithRetry(ctx, nodeName)
if err != nil {
return 0, err
func getAttachLimitsOverrideFromNodeLabel(node *corev1.Node) (int64, error) {
// If then node is nil, return 0 which means there is no override
if node == nil {
return 0, fmt.Errorf("node is nil")
}
if val, found := node.GetLabels()[fmt.Sprintf(common.NodeRestrictionLabelPrefix, common.AttachLimitOverrideLabel)]; found {
attachLimitOverrideForNode, err := strconv.ParseInt(val, 10, 64)
Expand All @@ -939,3 +971,17 @@ func GetAttachLimitsOverrideFromNodeLabel(ctx context.Context, nodeName string)
}
return 0, nil
}

func (ns *GCENodeServer) getDiskTypeLabels(node *corev1.Node) (map[string]string, error) {
if node == nil {
return nil, fmt.Errorf("node is nil")
}
lbls := make(map[string]string)
for k, v := range node.GetLabels() {
if common.HasDiskTypeLabelKeyPrefix(k) {
lbls[k] = v
}
}

return lbls, nil
}
Loading