diff --git a/cmd/gce-pd-csi-driver/main.go b/cmd/gce-pd-csi-driver/main.go index 0cb6baf3e..618b45a31 100644 --- a/cmd/gce-pd-csi-driver/main.go +++ b/cmd/gce-pd-csi-driver/main.go @@ -94,6 +94,8 @@ var ( extraTagsStr = flag.String("extra-tags", "", "Extra tags to attach to each Compute Disk, Image, Snapshot created. It is a comma separated list of parent id, key and value like '//,...,//'. parent_id is the Organization or the Project ID or Project name where the tag key and the tag value resources exist. A maximum of 50 tags bindings is allowed for a resource. See https://cloud.google.com/resource-manager/docs/tags/tags-overview, https://cloud.google.com/resource-manager/docs/tags/tags-creating-and-managing for details") + diskTopology = flag.Bool("disk-topology", false, "If set to true, the driver will add a disk-type.gke.io/[some-disk-type] topology label to the Topologies returned in CreateVolumeResponse.") + version string ) @@ -225,9 +227,15 @@ func handle() { if err != nil { klog.Fatalf("Failed to get cloud provider: %v", err.Error()) } + initialBackoffDuration := time.Duration(*errorBackoffInitialDurationMs) * time.Millisecond maxBackoffDuration := time.Duration(*errorBackoffMaxDurationMs) * time.Millisecond - controllerServer = driver.NewControllerServer(gceDriver, cloudProvider, initialBackoffDuration, maxBackoffDuration, fallbackRequisiteZones, *enableStoragePoolsFlag, *enableDataCacheFlag, multiZoneVolumeHandleConfig, listVolumesConfig, provisionableDisksConfig, *enableHdHAFlag) + // TODO(2042): Move more of the constructor args into this struct + args := &driver.GCEControllerServerArgs{ + EnableDiskTopology: *diskTopology, + } + + controllerServer = driver.NewControllerServer(gceDriver, cloudProvider, initialBackoffDuration, maxBackoffDuration, fallbackRequisiteZones, *enableStoragePoolsFlag, *enableDataCacheFlag, multiZoneVolumeHandleConfig, listVolumesConfig, provisionableDisksConfig, *enableHdHAFlag, args) } else if *cloudConfigFilePath != "" { klog.Warningf("controller service is disabled but cloud config given - it has no effect") } @@ -239,6 +247,7 @@ func handle() { if err != nil { klog.Fatalf("Failed to get safe mounter: %v", err.Error()) } + deviceUtils := deviceutils.NewDeviceUtils() statter := mountmanager.NewStatter(mounter) meta, err := metadataservice.NewMetadataService() @@ -249,13 +258,16 @@ func handle() { if err != nil { klog.Fatalf("Failed to get node info from API server: %v", err.Error()) } - nsArgs := driver.NodeServerArgs{ + + // TODO(2042): Move more of the constructor args into this struct + nsArgs := &driver.NodeServerArgs{ EnableDeviceInUseCheck: *enableDeviceInUseCheck, DeviceInUseTimeout: *deviceInUseTimeout, EnableDataCache: *enableDataCacheFlag, DataCacheEnabledNodePool: isDataCacheEnabledNodePool, } nodeServer = driver.NewNodeServer(gceDriver, mounter, deviceUtils, meta, statter, nsArgs) + if *maxConcurrentFormatAndMount > 0 { nodeServer = nodeServer.WithSerializedFormatAndMount(*formatAndMountTimeout, *maxConcurrentFormatAndMount) } diff --git a/pkg/common/constants.go b/pkg/common/constants.go index d5e5c1016..2ab74b778 100644 --- a/pkg/common/constants.go +++ b/pkg/common/constants.go @@ -20,6 +20,10 @@ const ( // Keys for Topology. This key will be shared amongst drivers from GCP TopologyKeyZone = "topology.gke.io/zone" + // DiskTypeKeyPrefix is the prefix for the disk type label key used as part + // of the Disk Topology feature. + DiskTypeKeyPrefix = "disk-type.gke.io" + // VolumeAttributes for Partition VolumeAttributePartition = "partition" diff --git a/pkg/common/utils.go b/pkg/common/utils.go index 719dcb1d3..e775e5d44 100644 --- a/pkg/common/utils.go +++ b/pkg/common/utils.go @@ -764,3 +764,7 @@ func MapNumber(num int64) int64 { } return 0 } + +func DiskTypeLabelKey(diskType string) string { + return fmt.Sprintf("%s/%s", DiskTypeKeyPrefix, diskType) +} diff --git a/pkg/gce-cloud-provider/metadata/fake.go b/pkg/gce-cloud-provider/metadata/fake.go index 64d493eb6..35a339189 100644 --- a/pkg/gce-cloud-provider/metadata/fake.go +++ b/pkg/gce-cloud-provider/metadata/fake.go @@ -20,13 +20,13 @@ type fakeServiceManager struct{} var _ MetadataService = &fakeServiceManager{} -const ( - FakeZone = "country-region-zone" - FakeProject = "test-project" +var ( + FakeMachineType = "n1-standard-1" + FakeZone = "country-region-zone" + FakeProject = "test-project" + FakeName = "test-name" ) -var FakeMachineType = "n1-standard-1" - func NewFakeService() MetadataService { return &fakeServiceManager{} } @@ -40,7 +40,7 @@ func (manager *fakeServiceManager) GetProject() string { } func (manager *fakeServiceManager) GetName() string { - return "test-name" + return FakeName } func (manager *fakeServiceManager) GetMachineType() string { @@ -50,3 +50,11 @@ func (manager *fakeServiceManager) GetMachineType() string { func SetMachineType(s string) { FakeMachineType = s } + +func SetZone(s string) { + FakeZone = s +} + +func SetName(s string) { + FakeName = s +} diff --git a/pkg/gce-pd-csi-driver/controller.go b/pkg/gce-pd-csi-driver/controller.go index dcb0e3c45..32f52286c 100644 --- a/pkg/gce-pd-csi-driver/controller.go +++ b/pkg/gce-pd-csi-driver/controller.go @@ -120,6 +120,12 @@ type GCEControllerServer struct { // Embed UnimplementedControllerServer to ensure the driver returns Unimplemented for any // new RPC methods that might be introduced in future versions of the spec. csi.UnimplementedControllerServer + + EnableDiskTopology bool +} + +type GCEControllerServerArgs struct { + EnableDiskTopology bool } type MultiZoneVolumeHandleConfig struct { @@ -320,7 +326,7 @@ func (gceCS *GCEControllerServer) createVolumeInternal(ctx context.Context, req if len(req.GetName()) == 0 { return nil, status.Error(codes.InvalidArgument, "CreateVolume Name must be provided") } - if volumeCapabilities == nil || len(volumeCapabilities) == 0 { + if len(volumeCapabilities) == 0 { return nil, status.Error(codes.InvalidArgument, "CreateVolume Volume capabilities must be provided") } @@ -465,9 +471,7 @@ func (gceCS *GCEControllerServer) getMultiZoneProvisioningZones(ctx context.Cont } func (gceCS *GCEControllerServer) createMultiZoneDisk(ctx context.Context, req *csi.CreateVolumeRequest, params common.DiskParameters, dataCacheParams common.DataCacheParameters, enableDataCache bool) (*csi.CreateVolumeResponse, error) { - // Determine the zones that are needed. var err error - // For multi-zone, we either select: // 1) The zones specified in requisite topology requirements // 2) All zones in the region that are compatible with the selected disk type @@ -517,7 +521,8 @@ func (gceCS *GCEControllerServer) createMultiZoneDisk(ctx context.Context, req * // Use the first response as a template volumeId := fmt.Sprintf("projects/%s/zones/%s/disks/%s", gceCS.CloudProvider.GetDefaultProject(), common.MultiZoneValue, req.GetName()) klog.V(4).Infof("CreateVolume succeeded for multi-zone disks in zones %s: %v", zones, multiZoneVolKey) - return generateCreateVolumeResponseWithVolumeId(createdDisks[0], zones, params, dataCacheParams, enableDataCache, volumeId), nil + + return gceCS.generateCreateVolumeResponseWithVolumeId(createdDisks[0], zones, params, dataCacheParams, enableDataCache, volumeId), nil } func (gceCS *GCEControllerServer) getZonesWithDiskNameAndType(ctx context.Context, name string, diskType string) ([]string, error) { @@ -617,13 +622,13 @@ func (gceCS *GCEControllerServer) createSingleDeviceDisk(ctx context.Context, re return nil, status.Errorf(codes.Aborted, common.VolumeOperationAlreadyExistsFmt, volumeID) } defer gceCS.volumeLocks.Release(volumeID) - disk, err := gceCS.createSingleDisk(ctx, req, params, volKey, zones, accessMode) + disk, err := gceCS.createSingleDisk(ctx, req, params, volKey, zones, accessMode) if err != nil { return nil, common.LoggedError("CreateVolume failed: %v", err) } - return generateCreateVolumeResponseWithVolumeId(disk, zones, params, dataCacheParams, enableDataCache, volumeID), err + return gceCS.generateCreateVolumeResponseWithVolumeId(disk, zones, params, dataCacheParams, enableDataCache, volumeID), nil } func getAccessMode(req *csi.CreateVolumeRequest, params common.DiskParameters) (string, error) { @@ -2396,7 +2401,7 @@ func extractVolumeContext(context map[string]string) (*PDCSIContext, error) { case contextForceAttach: b, err := common.ConvertStringToBool(val) if err != nil { - return nil, fmt.Errorf("Bad volume context force attach: %v", err) + return nil, fmt.Errorf("bad volume context force attach: %w", err) } info.ForceAttach = b } @@ -2404,13 +2409,22 @@ func extractVolumeContext(context map[string]string) (*PDCSIContext, error) { return info, nil } -func generateCreateVolumeResponseWithVolumeId(disk *gce.CloudDisk, zones []string, params common.DiskParameters, dataCacheParams common.DataCacheParameters, enableDataCache bool, volumeId string) *csi.CreateVolumeResponse { +func (gceCS *GCEControllerServer) generateCreateVolumeResponseWithVolumeId(disk *gce.CloudDisk, zones []string, params common.DiskParameters, dataCacheParams common.DataCacheParameters, enableDataCache bool, volumeId string) *csi.CreateVolumeResponse { tops := []*csi.Topology{} for _, zone := range zones { - tops = append(tops, &csi.Topology{ - Segments: map[string]string{common.TopologyKeyZone: zone}, - }) + top := &csi.Topology{ + Segments: map[string]string{ + common.TopologyKeyZone: zone, + }, + } + + if gceCS.EnableDiskTopology { + top.Segments[common.DiskTypeLabelKey(params.DiskType)] = "true" + } + + tops = append(tops, top) } + realDiskSizeBytes := common.GbToBytes(disk.GetSizeGb()) createResp := &csi.CreateVolumeResponse{ Volume: &csi.Volume{ @@ -2468,10 +2482,10 @@ func generateCreateVolumeResponseWithVolumeId(disk *gce.CloudDisk, zones []strin func getResourceId(resourceLink string) (string, error) { url, err := neturl.Parse(resourceLink) if err != nil { - return "", fmt.Errorf("Could not parse resource %s: %w", resourceLink, err) + return "", fmt.Errorf("could not parse resource %s: %w", resourceLink, err) } if url.Scheme != resourceApiScheme { - return "", fmt.Errorf("Unexpected API scheme for resource %s", resourceLink) + return "", fmt.Errorf("unexpected API scheme for resource %s", resourceLink) } // Note that the resource host can basically be anything, if we are running in @@ -2480,16 +2494,16 @@ func getResourceId(resourceLink string) (string, error) { // The path should be /compute/VERSION/project/.... elts := strings.Split(url.Path, "/") if len(elts) < 4 { - return "", fmt.Errorf("Short resource path %s", resourceLink) + return "", fmt.Errorf("short resource path %s", resourceLink) } if elts[1] != resourceApiService { - return "", fmt.Errorf("Bad resource service %s in %s", elts[1], resourceLink) + return "", fmt.Errorf("bad resource service %s in %s", elts[1], resourceLink) } if _, ok := validResourceApiVersions[elts[2]]; !ok { - return "", fmt.Errorf("Bad version %s in %s", elts[2], resourceLink) + return "", fmt.Errorf("bad version %s in %s", elts[2], resourceLink) } if elts[3] != resourceProject { - return "", fmt.Errorf("Expected %v to start with %s in resource %s", elts[3:], resourceProject, resourceLink) + return "", fmt.Errorf("expected %v to start with %s in resource %s", elts[3:], resourceProject, resourceLink) } return strings.Join(elts[3:], "/"), nil } diff --git a/pkg/gce-pd-csi-driver/controller_test.go b/pkg/gce-pd-csi-driver/controller_test.go index f55d33529..34d8c2de8 100644 --- a/pkg/gce-pd-csi-driver/controller_test.go +++ b/pkg/gce-pd-csi-driver/controller_test.go @@ -58,6 +58,7 @@ const ( name = "test-name" parameterConfidentialCompute = "EnableConfidentialCompute" testDiskEncryptionKmsKey = "projects/KMS_PROJECT_ID/locations/REGION/keyRings/KEY_RING/cryptoKeys/KEY" + stdDiskType = "test-disk-type" ) var ( @@ -66,7 +67,7 @@ var ( RequiredBytes: common.GbToBytes(20), } stdParams = map[string]string{ - common.ParameterKeyType: "test-type", + common.ParameterKeyType: stdDiskType, } stdTopology = []*csi.Topology{ { @@ -288,7 +289,7 @@ func TestCreateSnapshotArguments(t *testing.T) { for _, tc := range testCases { t.Logf("test case: %s", tc.name) // Setup new driver each time so no interference - gceDriver := initGCEDriver(t, tc.seedDisks) + gceDriver := initGCEDriver(t, tc.seedDisks, &GCEControllerServerArgs{}) // Start Test resp, err := gceDriver.cs.CreateSnapshot(context.Background(), tc.req) @@ -335,7 +336,7 @@ func TestUnsupportedMultiZoneCreateSnapshot(t *testing.T) { t.Logf("test case: %s", testCase.name) - gceDriver := initGCEDriver(t, nil) + gceDriver := initGCEDriver(t, nil, &GCEControllerServerArgs{}) gceDriver.cs.multiZoneVolumeHandleConfig = MultiZoneVolumeHandleConfig{ Enable: true, } @@ -370,7 +371,7 @@ func TestUnsupportedMultiZoneControllerExpandVolume(t *testing.T) { t.Logf("test case: %s", testCase.name) - gceDriver := initGCEDriver(t, nil) + gceDriver := initGCEDriver(t, nil, &GCEControllerServerArgs{}) gceDriver.cs.multiZoneVolumeHandleConfig = MultiZoneVolumeHandleConfig{ Enable: true, } @@ -425,7 +426,7 @@ func TestDeleteSnapshot(t *testing.T) { for _, tc := range testCases { t.Logf("test case: %s", tc.name) // Setup new driver each time so no interference - gceDriver := initGCEDriver(t, nil) + gceDriver := initGCEDriver(t, nil, &GCEControllerServerArgs{}) _, err := gceDriver.cs.DeleteSnapshot(context.Background(), tc.req) if err != nil { @@ -547,7 +548,7 @@ func TestListSnapshotsArguments(t *testing.T) { } // Setup new driver each time so no interference - gceDriver := initGCEDriver(t, disks) + gceDriver := initGCEDriver(t, disks, &GCEControllerServerArgs{}) for i := 0; i < tc.numSnapshots; i++ { volumeID := fmt.Sprintf("%s%d", testVolumeID, i) @@ -616,6 +617,7 @@ func TestCreateVolumeArguments(t *testing.T) { enableStoragePools bool expVol *csi.Volume expErrCode codes.Code + enableDiskTopology bool }{ { name: "success default", @@ -736,7 +738,7 @@ func TestCreateVolumeArguments(t *testing.T) { Name: "test-name", CapacityRange: stdCapRange, VolumeCapabilities: stdVolCaps, - Parameters: map[string]string{"type": "test-type"}, + Parameters: map[string]string{"type": stdDiskType}, AccessibilityRequirements: &csi.TopologyRequirement{ Requisite: []*csi.Topology{ { @@ -762,7 +764,7 @@ func TestCreateVolumeArguments(t *testing.T) { Name: "test-name", CapacityRange: stdCapRange, VolumeCapabilities: stdVolCaps, - Parameters: map[string]string{"type": "test-type"}, + Parameters: map[string]string{"type": stdDiskType}, AccessibilityRequirements: &csi.TopologyRequirement{ Requisite: []*csi.Topology{ { @@ -1346,14 +1348,42 @@ func TestCreateVolumeArguments(t *testing.T) { }, expErrCode: codes.OK, }, + // Disk Topology Enabled tests + { + name: "success with disk topology enabled", + req: &csi.CreateVolumeRequest{ + Name: "test-name", + CapacityRange: stdCapRange, + VolumeCapabilities: stdVolCaps, + Parameters: stdParams, + }, + expVol: &csi.Volume{ + CapacityBytes: common.GbToBytes(20), + VolumeId: testVolumeID, + VolumeContext: nil, + AccessibleTopology: []*csi.Topology{ + { + Segments: map[string]string{ + common.TopologyKeyZone: zone, + // Disk type is added as topology segment. + common.DiskTypeLabelKey(stdDiskType): "true", + }, + }, + }, + }, + enableDiskTopology: true, + }, } // Run test cases for _, tc := range testCases { t.Logf("test case: %s", tc.name) + // Setup new driver each time so no interference - gceDriver := initGCEDriver(t, nil) + args := &GCEControllerServerArgs{EnableDiskTopology: tc.enableDiskTopology} + gceDriver := initGCEDriver(t, nil, args) gceDriver.cs.enableStoragePools = tc.enableStoragePools + // Start Test resp, err := gceDriver.cs.CreateVolume(context.Background(), tc.req) if err != nil { @@ -1748,7 +1778,7 @@ func TestMultiZoneVolumeCreation(t *testing.T) { t.Fatalf("Failed to create fake cloud provider: %v", err) } // Setup new driver each time so no interference - gceDriver := initGCEDriverWithCloudProvider(t, fcp) + gceDriver := initGCEDriverWithCloudProvider(t, fcp, &GCEControllerServerArgs{}) gceDriver.cs.multiZoneVolumeHandleConfig.DiskTypes = []string{"hyperdisk-ml"} gceDriver.cs.multiZoneVolumeHandleConfig.Enable = true gceDriver.cs.fallbackRequisiteZones = tc.fallbackZones @@ -1982,7 +2012,7 @@ func TestCreateVolumeMultiWriterOrAccessMode(t *testing.T) { t.Fatalf("Failed to create fake cloud provider: %v", err) } // Setup new driver each time so no interference - gceDriver := initGCEDriverWithCloudProvider(t, fcp) + gceDriver := initGCEDriverWithCloudProvider(t, fcp, &GCEControllerServerArgs{}) // Start Test resp, err := gceDriver.cs.CreateVolume(context.Background(), tc.req) @@ -2179,7 +2209,7 @@ func TestMultiZoneVolumeCreationErrHandling(t *testing.T) { t.Fatalf("Failed to create fake cloud provider: %v", err) } // Setup new driver each time so no interference - gceDriver := initGCEDriverWithCloudProvider(t, fcp) + gceDriver := initGCEDriverWithCloudProvider(t, fcp, &GCEControllerServerArgs{}) gceDriver.cs.multiZoneVolumeHandleConfig.DiskTypes = []string{"hyperdisk-ml"} gceDriver.cs.multiZoneVolumeHandleConfig.Enable = true @@ -2296,7 +2326,7 @@ func TestCreateVolumeWithVolumeAttributeClassParameters(t *testing.T) { for _, tc := range testCases { var d []*gce.CloudDisk fcp, err := gce.CreateFakeCloudProvider(project, zone, d) - gceDriver := initGCEDriverWithCloudProvider(t, fcp) + gceDriver := initGCEDriverWithCloudProvider(t, fcp, &GCEControllerServerArgs{}) if err != nil { t.Fatalf("Failed to create fake cloud provider: %v", err) @@ -2405,7 +2435,7 @@ func TestVolumeModifyOperation(t *testing.T) { t.Fatalf("Failed to create mock cloud provider: %v", err) } - gceDriver := initGCEDriverWithCloudProvider(t, fcp) + gceDriver := initGCEDriverWithCloudProvider(t, fcp, &GCEControllerServerArgs{}) project, volKey, err := common.VolumeIDToKey(testVolumeID) if err != nil { t.Fatalf("Failed convert key: %v", err) @@ -2567,7 +2597,7 @@ func TestVolumeModifyErrorHandling(t *testing.T) { if err != nil { t.Fatalf("Failed to create mock cloud provider") } - gceDriver := initGCEDriverWithCloudProvider(t, fcp) + gceDriver := initGCEDriverWithCloudProvider(t, fcp, &GCEControllerServerArgs{}) for volKey, err := range tc.modifyVolumeErrors { fcp.AddDiskForErr(volKey, err) @@ -2653,7 +2683,7 @@ func TestListVolumePagination(t *testing.T) { SelfLink: fmt.Sprintf("https://www.googleapis.com/compute/v1/projects/project/zones/zone/disk/%s", name), })) } - gceDriver := initGCEDriver(t, d) + gceDriver := initGCEDriver(t, d, &GCEControllerServerArgs{}) tok := "" for i, expectedEntry := range tc.expectedEntries { lvr := &csi.ListVolumesRequest{ @@ -2740,7 +2770,7 @@ func TestListAttachedVolumePagination(t *testing.T) { } fakeCloudProvider.InsertInstance(&instance, zone, instanceName) } - gceDriver := initGCEDriverWithCloudProvider(t, fakeCloudProvider) + gceDriver := initGCEDriverWithCloudProvider(t, fakeCloudProvider, &GCEControllerServerArgs{}) // Use attached disks (instances.list) API gceDriver.cs.listVolumesConfig.UseInstancesAPIForPublishedNodes = true @@ -2805,7 +2835,7 @@ func TestListVolumeArgs(t *testing.T) { SelfLink: fmt.Sprintf("https://www.googleapis.com/compute/v1/projects/project/zones/zone/disk/%s", name), })) } - gceDriver := initGCEDriver(t, d) + gceDriver := initGCEDriver(t, d, &GCEControllerServerArgs{}) lvr := &csi.ListVolumesRequest{ MaxEntries: tc.maxEntries, } @@ -3000,7 +3030,7 @@ func TestListVolumeResponse(t *testing.T) { fakeCloudProvider.InsertInstance(&instance, instance.Zone, instance.Name) } // Setup new driver each time so no interference - gceDriver := initGCEDriverWithCloudProvider(t, fakeCloudProvider) + gceDriver := initGCEDriverWithCloudProvider(t, fakeCloudProvider, &GCEControllerServerArgs{}) gceDriver.cs.multiZoneVolumeHandleConfig = MultiZoneVolumeHandleConfig{ Enable: true, } @@ -3070,7 +3100,7 @@ func TestCreateVolumeWithVolumeSourceFromSnapshot(t *testing.T) { for _, tc := range testCases { t.Logf("test case: %s", tc.name) // Setup new driver each time so no interference - gceDriver := initGCEDriver(t, nil) + gceDriver := initGCEDriver(t, nil, &GCEControllerServerArgs{}) snapshotParams, err := common.ExtractAndDefaultSnapshotParameters(nil, gceDriver.name, nil) if err != nil { @@ -3241,11 +3271,11 @@ func TestCreateVolumeWithVolumeSourceFromVolume(t *testing.T) { testRegionalVolumeSourceID := fmt.Sprintf("projects/%s/regions/%s/disks/%s", project, region, testSourceVolumeName) testSecondZonalVolumeSourceID := fmt.Sprintf("projects/%s/zones/%s/disks/%s", project, "different-zone1", testSourceVolumeName) zonalParams := map[string]string{ - common.ParameterKeyType: "test-type", common.ParameterKeyReplicationType: replicationTypeNone, + common.ParameterKeyType: stdDiskType, common.ParameterKeyReplicationType: replicationTypeNone, common.ParameterKeyDiskEncryptionKmsKey: "encryption-key", } regionalParams := map[string]string{ - common.ParameterKeyType: "test-type", common.ParameterKeyReplicationType: replicationTypeRegionalPD, + common.ParameterKeyType: stdDiskType, common.ParameterKeyReplicationType: replicationTypeRegionalPD, common.ParameterKeyDiskEncryptionKmsKey: "encryption-key", } requisiteTopology := []*csi.Topology{ @@ -3690,7 +3720,7 @@ func TestCreateVolumeWithVolumeSourceFromVolume(t *testing.T) { sourceCapacityRange: stdCapRange, reqParameters: zonalParams, sourceReqParameters: map[string]string{ - common.ParameterKeyType: "test-type", common.ParameterKeyReplicationType: replicationTypeNone, + common.ParameterKeyType: stdDiskType, common.ParameterKeyReplicationType: replicationTypeNone, common.ParameterKeyDiskEncryptionKmsKey: "different-encryption-key", }, sourceTopology: &csi.TopologyRequirement{ @@ -3797,7 +3827,7 @@ func TestCreateVolumeWithVolumeSourceFromVolume(t *testing.T) { for _, tc := range testCases { t.Logf("test case: %s", tc.name) - gceDriver := initGCEDriver(t, nil) + gceDriver := initGCEDriver(t, nil, &GCEControllerServerArgs{}) gceDriver.cs.enableStoragePools = tc.enableStoragePools req := &csi.CreateVolumeRequest{ @@ -3892,7 +3922,7 @@ func TestCreateVolumeRandomRequisiteTopology(t *testing.T) { Name: "test-name", CapacityRange: stdCapRange, VolumeCapabilities: stdVolCaps, - Parameters: map[string]string{"type": "test-type"}, + Parameters: map[string]string{"type": stdDiskType}, AccessibilityRequirements: &csi.TopologyRequirement{ Requisite: []*csi.Topology{ { @@ -3908,7 +3938,7 @@ func TestCreateVolumeRandomRequisiteTopology(t *testing.T) { }, } - gceDriver := initGCEDriver(t, nil) + gceDriver := initGCEDriver(t, nil, &GCEControllerServerArgs{}) tZones := map[string]bool{} // Start Test @@ -3991,7 +4021,7 @@ func TestDeleteVolume(t *testing.T) { for _, tc := range testCases { t.Logf("test case: %s", tc.name) // Setup new driver each time so no interference - gceDriver := initGCEDriver(t, tc.seedDisks) + gceDriver := initGCEDriver(t, tc.seedDisks, &GCEControllerServerArgs{}) _, err := gceDriver.cs.DeleteVolume(context.Background(), tc.req) if err == nil && tc.expErr { @@ -4043,7 +4073,7 @@ func TestMultiZoneDeleteVolume(t *testing.T) { t.Fatalf("Failed to create fake cloud provider: %v", err) } // Setup new driver each time so no interference - gceDriver := initGCEDriverWithCloudProvider(t, fcp) + gceDriver := initGCEDriverWithCloudProvider(t, fcp, &GCEControllerServerArgs{}) gceDriver.cs.multiZoneVolumeHandleConfig.DiskTypes = []string{"hyperdisk-ml"} gceDriver.cs.multiZoneVolumeHandleConfig.Enable = true _, err = gceDriver.cs.DeleteVolume(context.Background(), tc.req) @@ -4297,7 +4327,7 @@ func TestGetZonesFromTopology(t *testing.T) { expErr bool }{ { - name: "succes: normal", + name: "success: normal", topology: []*csi.Topology{ { Segments: map[string]string{common.TopologyKeyZone: "test-zone"}, @@ -4306,7 +4336,7 @@ func TestGetZonesFromTopology(t *testing.T) { expZones: sets.NewString([]string{"test-zone"}...), }, { - name: "succes: multiple topologies", + name: "success: multiple topologies", topology: []*csi.Topology{ { Segments: map[string]string{common.TopologyKeyZone: "test-zone"}, @@ -5156,7 +5186,7 @@ func TestCreateVolumeDiskReady(t *testing.T) { // Setup hook to create new disks with given status. fcp.UpdateDiskStatus(tc.diskStatus) // Setup new driver each time so no interference - gceDriver := initGCEDriverWithCloudProvider(t, fcp) + gceDriver := initGCEDriverWithCloudProvider(t, fcp, &GCEControllerServerArgs{}) // Start Test resp, err := gceDriver.cs.CreateVolume(context.Background(), tc.req) @@ -5677,7 +5707,7 @@ func TestCreateConfidentialVolume(t *testing.T) { t.Fatalf("Failed to create fake cloud provider: %v", err) } // Setup new driver each time so no interference - gceDriver := initGCEDriverWithCloudProvider(t, fcp) + gceDriver := initGCEDriverWithCloudProvider(t, fcp, &GCEControllerServerArgs{}) if tc.req.VolumeContentSource.GetType() != nil { snapshotParams, err := common.ExtractAndDefaultSnapshotParameters(nil, gceDriver.name, nil) diff --git a/pkg/gce-pd-csi-driver/gce-pd-driver.go b/pkg/gce-pd-csi-driver/gce-pd-driver.go index b92702089..6ec508440 100644 --- a/pkg/gce-pd-csi-driver/gce-pd-driver.go +++ b/pkg/gce-pd-csi-driver/gce-pd-driver.go @@ -145,7 +145,7 @@ func NewIdentityServer(gceDriver *GCEDriver) *GCEIdentityServer { } } -func NewNodeServer(gceDriver *GCEDriver, mounter *mount.SafeFormatAndMount, deviceUtils deviceutils.DeviceUtils, meta metadataservice.MetadataService, statter mountmanager.Statter, args NodeServerArgs) *GCENodeServer { +func NewNodeServer(gceDriver *GCEDriver, mounter *mount.SafeFormatAndMount, deviceUtils deviceutils.DeviceUtils, meta metadataservice.MetadataService, statter mountmanager.Statter, args *NodeServerArgs) *GCENodeServer { return &GCENodeServer{ Driver: gceDriver, Mounter: mounter, @@ -160,7 +160,7 @@ func NewNodeServer(gceDriver *GCEDriver, mounter *mount.SafeFormatAndMount, devi } } -func NewControllerServer(gceDriver *GCEDriver, cloudProvider gce.GCECompute, errorBackoffInitialDuration, errorBackoffMaxDuration time.Duration, fallbackRequisiteZones []string, enableStoragePools bool, enableDataCache bool, multiZoneVolumeHandleConfig MultiZoneVolumeHandleConfig, listVolumesConfig ListVolumesConfig, provisionableDisksConfig ProvisionableDisksConfig, enableHdHA bool) *GCEControllerServer { +func NewControllerServer(gceDriver *GCEDriver, cloudProvider gce.GCECompute, errorBackoffInitialDuration, errorBackoffMaxDuration time.Duration, fallbackRequisiteZones []string, enableStoragePools bool, enableDataCache bool, multiZoneVolumeHandleConfig MultiZoneVolumeHandleConfig, listVolumesConfig ListVolumesConfig, provisionableDisksConfig ProvisionableDisksConfig, enableHdHA bool, args *GCEControllerServerArgs) *GCEControllerServer { return &GCEControllerServer{ Driver: gceDriver, CloudProvider: cloudProvider, @@ -174,6 +174,7 @@ func NewControllerServer(gceDriver *GCEDriver, cloudProvider gce.GCECompute, err listVolumesConfig: listVolumesConfig, provisionableDisksConfig: provisionableDisksConfig, enableHdHA: enableHdHA, + EnableDiskTopology: args.EnableDiskTopology, } } diff --git a/pkg/gce-pd-csi-driver/gce-pd-driver_test.go b/pkg/gce-pd-csi-driver/gce-pd-driver_test.go index cf3ac38ce..fb08cda4b 100644 --- a/pkg/gce-pd-csi-driver/gce-pd-driver_test.go +++ b/pkg/gce-pd-csi-driver/gce-pd-driver_test.go @@ -21,12 +21,12 @@ import ( gce "sigs.k8s.io/gcp-compute-persistent-disk-csi-driver/pkg/gce-cloud-provider/compute" ) -func initGCEDriver(t *testing.T, cloudDisks []*gce.CloudDisk) *GCEDriver { +func initGCEDriver(t *testing.T, cloudDisks []*gce.CloudDisk, args *GCEControllerServerArgs) *GCEDriver { fakeCloudProvider, err := gce.CreateFakeCloudProvider(project, zone, cloudDisks) if err != nil { t.Fatalf("Failed to create fake cloud provider: %v", err) } - return initGCEDriverWithCloudProvider(t, fakeCloudProvider) + return initGCEDriverWithCloudProvider(t, fakeCloudProvider, args) } func initBlockingGCEDriver(t *testing.T, cloudDisks []*gce.CloudDisk, readyToExecute chan chan gce.Signal) *GCEDriver { @@ -38,10 +38,10 @@ func initBlockingGCEDriver(t *testing.T, cloudDisks []*gce.CloudDisk, readyToExe FakeCloudProvider: fakeCloudProvider, ReadyToExecute: readyToExecute, } - return initGCEDriverWithCloudProvider(t, fakeBlockingBlockProvider) + return initGCEDriverWithCloudProvider(t, fakeBlockingBlockProvider, &GCEControllerServerArgs{}) } -func controllerServerForTest(cloudProvider gce.GCECompute) *GCEControllerServer { +func controllerServerForTest(cloudProvider gce.GCECompute, args *GCEControllerServerArgs) *GCEControllerServer { gceDriver := GetGCEDriver() errorBackoffInitialDuration := 200 * time.Millisecond errorBackoffMaxDuration := 5 * time.Minute @@ -54,14 +54,14 @@ func controllerServerForTest(cloudProvider gce.GCECompute) *GCEControllerServer SupportsIopsChange: []string{"hyperdisk-balanced", "hyperdisk-extreme"}, SupportsThroughputChange: []string{"hyperdisk-balanced", "hyperdisk-throughput", "hyperdisk-ml"}, } - - return NewControllerServer(gceDriver, cloudProvider, errorBackoffInitialDuration, errorBackoffMaxDuration, fallbackRequisiteZones, enableStoragePools, enableDataCache, multiZoneVolumeHandleConfig, listVolumesConfig, provisionableDisksConfig, true /* enableHdHA */) + return NewControllerServer(gceDriver, cloudProvider, errorBackoffInitialDuration, errorBackoffMaxDuration, fallbackRequisiteZones, enableStoragePools, enableDataCache, multiZoneVolumeHandleConfig, listVolumesConfig, provisionableDisksConfig, true /* enableHdHA */, args) } -func initGCEDriverWithCloudProvider(t *testing.T, cloudProvider gce.GCECompute) *GCEDriver { +func initGCEDriverWithCloudProvider(t *testing.T, cloudProvider gce.GCECompute, args *GCEControllerServerArgs) *GCEDriver { vendorVersion := "test-vendor" gceDriver := GetGCEDriver() - controllerServer := controllerServerForTest(cloudProvider) + + controllerServer := controllerServerForTest(cloudProvider, args) err := gceDriver.SetupGCEDriver(driver, vendorVersion, nil, nil, nil, controllerServer, nil) if err != nil { t.Fatalf("Failed to setup GCE Driver: %v", err) diff --git a/pkg/gce-pd-csi-driver/node.go b/pkg/gce-pd-csi-driver/node.go index f45d159b7..c1fd96e3c 100644 --- a/pkg/gce-pd-csi-driver/node.go +++ b/pkg/gce-pd-csi-driver/node.go @@ -54,7 +54,7 @@ type GCENodeServer struct { // for that same volume (as defined by VolumeID) return an Aborted error volumeLocks *common.VolumeLocks - // enableDeviceInUseCheck, if true, will block NodeUnstageVolume requests if the specified + // enableDeviceInUseCheck, if true, will block NodeUnstageVolume request if the specified // device is still in use (or until --device-in-use-timeout is reached, if specified) enableDeviceInUseCheck bool // deviceInUseErrors keeps tracks of device names and a timestamp for when an error is diff --git a/pkg/gce-pd-csi-driver/node_test.go b/pkg/gce-pd-csi-driver/node_test.go index 57c3e82b9..2de924470 100644 --- a/pkg/gce-pd-csi-driver/node_test.go +++ b/pkg/gce-pd-csi-driver/node_test.go @@ -37,22 +37,23 @@ import ( mountmanager "sigs.k8s.io/gcp-compute-persistent-disk-csi-driver/pkg/mount-manager" ) -const defaultVolumeID = "project/test001/zones/c1/disks/testDisk" -const defaultTargetPath = "/mnt/test" -const defaultStagingPath = "/staging" +const ( + defaultVolumeID = "project/test001/zones/c1/disks/testDisk" + defaultTargetPath = "/mnt/test" + defaultStagingPath = "/staging" +) func getTestGCEDriver(t *testing.T) *GCEDriver { - return getCustomTestGCEDriver(t, mountmanager.NewFakeSafeMounter(), deviceutils.NewFakeDeviceUtils(false), metadataservice.NewFakeService()) + return getCustomTestGCEDriver(t, mountmanager.NewFakeSafeMounter(), deviceutils.NewFakeDeviceUtils(false), metadataservice.NewFakeService(), &NodeServerArgs{}) } func getTestGCEDriverWithCustomMounter(t *testing.T, mounter *mount.SafeFormatAndMount) *GCEDriver { - return getCustomTestGCEDriver(t, mounter, deviceutils.NewFakeDeviceUtils(false), metadataservice.NewFakeService()) + return getCustomTestGCEDriver(t, mounter, deviceutils.NewFakeDeviceUtils(false), metadataservice.NewFakeService(), &NodeServerArgs{}) } -func getCustomTestGCEDriver(t *testing.T, mounter *mount.SafeFormatAndMount, deviceUtils deviceutils.DeviceUtils, metaService metadataservice.MetadataService) *GCEDriver { +func getCustomTestGCEDriver(t *testing.T, mounter *mount.SafeFormatAndMount, deviceUtils deviceutils.DeviceUtils, metaService metadataservice.MetadataService, args *NodeServerArgs) *GCEDriver { gceDriver := GetGCEDriver() - enableDataCache := false - nodeServer := NewNodeServer(gceDriver, mounter, deviceUtils, metaService, mountmanager.NewFakeStatter(mounter), NodeServerArgs{true, 0, enableDataCache, false /*dataCacheEnableNodePool */}) + nodeServer := NewNodeServer(gceDriver, mounter, deviceUtils, metaService, mountmanager.NewFakeStatter(mounter), args) err := gceDriver.SetupGCEDriver(driver, "test-vendor", nil, nil, nil, nil, nodeServer) if err != nil { t.Fatalf("Failed to setup GCE Driver: %v", err) @@ -63,7 +64,13 @@ func getCustomTestGCEDriver(t *testing.T, mounter *mount.SafeFormatAndMount, dev func getTestBlockingMountGCEDriver(t *testing.T, readyToExecute chan chan struct{}) *GCEDriver { gceDriver := GetGCEDriver() mounter := mountmanager.NewFakeSafeBlockingMounter(readyToExecute) - nodeServer := NewNodeServer(gceDriver, mounter, deviceutils.NewFakeDeviceUtils(false), metadataservice.NewFakeService(), mountmanager.NewFakeStatter(mounter), NodeServerArgs{true, 0, true, false /*dataCacheEnableNodePool */}) + args := &NodeServerArgs{ + EnableDeviceInUseCheck: true, + DeviceInUseTimeout: 0, + EnableDataCache: true, + DataCacheEnabledNodePool: false, + } + nodeServer := NewNodeServer(gceDriver, mounter, deviceutils.NewFakeDeviceUtils(false), metadataservice.NewFakeService(), mountmanager.NewFakeStatter(mounter), args) err := gceDriver.SetupGCEDriver(driver, "test-vendor", nil, nil, nil, nil, nodeServer) if err != nil { t.Fatalf("Failed to setup GCE Driver: %v", err) @@ -75,7 +82,13 @@ func getTestBlockingFormatAndMountGCEDriver(t *testing.T, readyToExecute chan ch gceDriver := GetGCEDriver() enableDataCache := true mounter := mountmanager.NewFakeSafeBlockingMounter(readyToExecute) - nodeServer := NewNodeServer(gceDriver, mounter, deviceutils.NewFakeDeviceUtils(false), metadataservice.NewFakeService(), mountmanager.NewFakeStatter(mounter), NodeServerArgs{true, 0, enableDataCache, false /*dataCacheEnableNodePool */}).WithSerializedFormatAndMount(5*time.Second, 1) + args := &NodeServerArgs{ + EnableDeviceInUseCheck: true, + DeviceInUseTimeout: 0, + EnableDataCache: enableDataCache, + DataCacheEnabledNodePool: false, + } + nodeServer := NewNodeServer(gceDriver, mounter, deviceutils.NewFakeDeviceUtils(false), metadataservice.NewFakeService(), mountmanager.NewFakeStatter(mounter), args).WithSerializedFormatAndMount(5*time.Second, 1) err := gceDriver.SetupGCEDriver(driver, "test-vendor", nil, nil, nil, nil, nodeServer) if err != nil { @@ -215,7 +228,6 @@ func TestNodeGetVolumeStats(t *testing.T) { } func TestNodeGetVolumeLimits(t *testing.T) { - gceDriver := getTestGCEDriver(t) ns := gceDriver.ns req := &csi.NodeGetInfoRequest{} @@ -301,18 +313,20 @@ func TestNodeGetVolumeLimits(t *testing.T) { for _, tc := range testCases { t.Logf("Test case: %s", tc.name) + metadataservice.SetMachineType(tc.machineType) res, err := ns.NodeGetInfo(context.Background(), req) if err != nil && !tc.expectError { t.Fatalf("Failed to get node info: %v", err) - } else { - volumeLimit := res.GetMaxVolumesPerNode() - if volumeLimit != tc.expVolumeLimit { - t.Fatalf("Expected volume limit: %v, got %v, for machine-type: %v", - tc.expVolumeLimit, volumeLimit, tc.machineType) - } - t.Logf("Get node info: %v", res) } + + volumeLimit := res.GetMaxVolumesPerNode() + if volumeLimit != tc.expVolumeLimit { + t.Fatalf("Expected volume limit: %v, got %v, for machine-type: %v", + tc.expVolumeLimit, volumeLimit, tc.machineType) + } + + t.Logf("Get node info: %v", res) } } diff --git a/pkg/gce-pd-csi-driver/server_test.go b/pkg/gce-pd-csi-driver/server_test.go index cd8b78b10..9b6bcf420 100644 --- a/pkg/gce-pd-csi-driver/server_test.go +++ b/pkg/gce-pd-csi-driver/server_test.go @@ -22,7 +22,6 @@ import ( "strings" "testing" - csipb "github.com/container-storage-interface/spec/lib/go/csi" "google.golang.org/grpc" csi "github.com/container-storage-interface/spec/lib/go/csi" @@ -63,7 +62,11 @@ func createServerClient(mm *metrics.MetricsManager, socketFile string, seedDisks if err != nil { return nil, fmt.Errorf("failed to create fake cloud provider: %v", err) } - controllerServer := controllerServerForTest(fakeCloudProvider) + + args := &GCEControllerServerArgs{ + EnableDiskTopology: false, + } + controllerServer := controllerServerForTest(fakeCloudProvider, args) if err := gceDriver.SetupGCEDriver(driver, "test-vendor", nil, nil, identityServer, controllerServer, nil); err != nil { return nil, fmt.Errorf("failed to setup GCE Driver: %v", err) } @@ -95,7 +98,7 @@ func TestServerCreateVolumeMetric(t *testing.T) { if err != nil { t.Fatalf("Failed to create server client: %v", err) } - controllerClient := csipb.NewControllerClient(conn) + controllerClient := csi.NewControllerClient(conn) req := &csi.CreateVolumeRequest{ Name: name, CapacityRange: stdCapRange, @@ -145,7 +148,7 @@ func TestServerValidateVolumeCapabilitiesMetric(t *testing.T) { if err != nil { t.Fatalf("Failed to create server client: %v", err) } - controllerClient := csipb.NewControllerClient(conn) + controllerClient := csi.NewControllerClient(conn) req := &csi.ValidateVolumeCapabilitiesRequest{ VolumeId: fmt.Sprintf("projects/%s/zones/%s/disks/%s", project, zone, name), VolumeCapabilities: stdVolCaps, @@ -187,7 +190,7 @@ func TestServerGetPluginInfoMetric(t *testing.T) { if err != nil { t.Fatalf("Failed to create server client: %v", err) } - idClient := csipb.NewIdentityClient(conn) + idClient := csi.NewIdentityClient(conn) resp, err := idClient.GetPluginInfo(context.Background(), &csi.GetPluginInfoRequest{}) if err != nil { t.Fatalf("GetPluginInfo returned unexpected error: %v", err) diff --git a/test/sanity/sanity_test.go b/test/sanity/sanity_test.go index f3ff5e59a..89192f150 100644 --- a/test/sanity/sanity_test.go +++ b/test/sanity/sanity_test.go @@ -73,16 +73,20 @@ func TestSanity(t *testing.T) { mounter := mountmanager.NewFakeSafeMounter() deviceUtils := deviceutils.NewFakeDeviceUtils(true) - args := driver.NodeServerArgs{ + nodeArgs := &driver.NodeServerArgs{ EnableDeviceInUseCheck: true, DeviceInUseTimeout: 0, - EnableDataCache: true} + EnableDataCache: enableDataCache, + } - //Initialize GCE Driver + // Initialize GCE Driver identityServer := driver.NewIdentityServer(gceDriver) - controllerServer := driver.NewControllerServer(gceDriver, cloudProvider, 0, 5*time.Minute, fallbackRequisiteZones, enableStoragePools, enableDataCache, multiZoneVolumeHandleConfig, listVolumesConfig, provisionableDisksConfig, true) + contArgs := &driver.GCEControllerServerArgs{ + EnableDiskTopology: false, + } + controllerServer := driver.NewControllerServer(gceDriver, cloudProvider, 0, 5*time.Minute, fallbackRequisiteZones, enableStoragePools, enableDataCache, multiZoneVolumeHandleConfig, listVolumesConfig, provisionableDisksConfig, true, contArgs) fakeStatter := mountmanager.NewFakeStatterWithOptions(mounter, mountmanager.FakeStatterOptions{IsBlock: false}) - nodeServer := driver.NewNodeServer(gceDriver, mounter, deviceUtils, metadataservice.NewFakeService(), fakeStatter, args) + nodeServer := driver.NewNodeServer(gceDriver, mounter, deviceUtils, metadataservice.NewFakeService(), fakeStatter, nodeArgs) err = gceDriver.SetupGCEDriver(driverName, vendorVersion, extraLabels, nil, identityServer, controllerServer, nodeServer) if err != nil { t.Fatalf("Failed to initialize GCE CSI Driver: %v", err.Error())