diff --git a/charts/aws-efs-csi-driver/templates/node-daemonset.yaml b/charts/aws-efs-csi-driver/templates/node-daemonset.yaml index 324350ce0..78677bd76 100644 --- a/charts/aws-efs-csi-driver/templates/node-daemonset.yaml +++ b/charts/aws-efs-csi-driver/templates/node-daemonset.yaml @@ -81,6 +81,10 @@ spec: - --vol-metrics-opt-in={{ hasKey .Values.node "volMetricsOptIn" | ternary .Values.node.volMetricsOptIn false }} - --vol-metrics-refresh-period={{ hasKey .Values.node "volMetricsRefreshPeriod" | ternary .Values.node.volMetricsRefreshPeriod 240 }} - --vol-metrics-fs-rate-limit={{ hasKey .Values.node "volMetricsFsRateLimit" | ternary .Values.node.volMetricsFsRateLimit 5 }} + - --max-inflight-mount-calls-opt-in={{ hasKey .Values.node "maxInflightMountCallsOptIn" | ternary .Values.node.maxInflightMountCallsOptIn false }} + - --max-inflight-mount-calls={{ hasKey .Values.node "maxInflightMountCalls" | ternary .Values.node.maxInflightMountCalls 10 }} + - --volume-attach-limit-opt-in={{ hasKey .Values.node "volumeAttachLimitOptIn" | ternary .Values.node.volumeAttachLimitOptIn false }} + - --volume-attach-limit={{ hasKey .Values.node "volumeAttachLimit" | ternary .Values.node.volumeAttachLimit 20 }} env: - name: CSI_ENDPOINT value: unix:/csi/csi.sock diff --git a/cmd/main.go b/cmd/main.go index 98e195a62..0ebab9318 100644 --- a/cmd/main.go +++ b/cmd/main.go @@ -41,8 +41,12 @@ func main() { volMetricsFsRateLimit = flag.Int("vol-metrics-fs-rate-limit", 5, "Volume metrics routines rate limiter per file system") deleteAccessPointRootDir = flag.Bool("delete-access-point-root-dir", false, "Opt in to delete access point root directory by DeleteVolume. By default, DeleteVolume will delete the access point behind Persistent Volume and deleting access point will not delete the access point root directory or its contents.") - adaptiveRetryMode = flag.Bool("adaptive-retry-mode", true, "Opt out to use standard sdk retry configuration. 
By default, adaptive retry mode will be used to more heavily client side rate limit EFS API requests.") - tags = flag.String("tags", "", "Space separated key:value pairs which will be added as tags for EFS resources. For example, 'environment:prod region:us-east-1'") + adaptiveRetryMode = flag.Bool("adaptive-retry-mode", true, "Opt out to use standard sdk retry configuration. By default, adaptive retry mode will be used to more heavily client side rate limit EFS API requests.") + tags = flag.String("tags", "", "Space separated key:value pairs which will be added as tags for EFS resources. For example, 'environment:prod region:us-east-1'") + maxInflightMountCallsOptIn = flag.Bool("max-inflight-mount-calls-opt-in", false, "Opt in to use max inflight mount calls limit.") + maxInflightMountCalls = flag.Int64("max-inflight-mount-calls", driver.UnsetMaxInflightMountCounts, "New NodePublishVolume operation will be blocked if maximum number of inflight calls is reached. If maxInflightMountCallsOptIn is true, it has to be set to a positive value.") + volumeAttachLimitOptIn = flag.Bool("volume-attach-limit-opt-in", false, "Opt in to use volume attach limit.") + volumeAttachLimit = flag.Int64("volume-attach-limit", driver.UnsetVolumeAttachLimit, "Maximum number of volumes that can be attached to a node. 
If volumeAttachLimitOptIn is true, it has to be set to a positive value.") ) klog.InitFlags(nil) flag.Parse() @@ -61,7 +65,7 @@ func main() { if err != nil { klog.Fatalln(err) } - drv := driver.NewDriver(*endpoint, etcAmazonEfs, *efsUtilsStaticFilesPath, *tags, *volMetricsOptIn, *volMetricsRefreshPeriod, *volMetricsFsRateLimit, *deleteAccessPointRootDir, *adaptiveRetryMode) + drv := driver.NewDriver(*endpoint, etcAmazonEfs, *efsUtilsStaticFilesPath, *tags, *volMetricsOptIn, *volMetricsRefreshPeriod, *volMetricsFsRateLimit, *deleteAccessPointRootDir, *adaptiveRetryMode, *maxInflightMountCallsOptIn, *maxInflightMountCalls, *volumeAttachLimitOptIn, *volumeAttachLimit) if err := drv.Run(); err != nil { klog.Fatalln(err) } diff --git a/deploy/kubernetes/base/node-daemonset.yaml b/deploy/kubernetes/base/node-daemonset.yaml index 0c675d334..a969fd1e6 100644 --- a/deploy/kubernetes/base/node-daemonset.yaml +++ b/deploy/kubernetes/base/node-daemonset.yaml @@ -57,6 +57,10 @@ spec: - --vol-metrics-opt-in=false - --vol-metrics-refresh-period=240 - --vol-metrics-fs-rate-limit=5 + - --max-inflight-mount-calls-opt-in=false + - --max-inflight-mount-calls=10 + - --volume-attach-limit-opt-in=false + - --volume-attach-limit=20 env: - name: CSI_ENDPOINT value: unix:/csi/csi.sock diff --git a/docs/README.md b/docs/README.md index a3e114432..d6aa46506 100644 --- a/docs/README.md +++ b/docs/README.md @@ -353,8 +353,30 @@ After deploying the driver, you can continue to these sections: |-----------------------------|--------|---------|----------|-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------| | vol-metrics-opt-in | | false | true | Opt in to emit volume metrics. | | vol-metrics-refresh-period | | 240 | true | Refresh period for volume metrics in minutes. 
| -| vol-metrics-fs-rate-limit | | 5 | true | Volume metrics routines rate limiter per file system. | +| max-inflight-mount-calls-opt-in | | false | true | Opt in to use max inflight mount calls limit. | +| max-inflight-mount-calls | | -1 | true | A new NodePublishVolume operation is rejected with an Aborted error once the maximum number of inflight mount calls is reached. If max-inflight-mount-calls-opt-in is true, it must be set to a positive value. | +| volume-attach-limit-opt-in | | false | true | Opt in to use volume attach limit. | +| volume-attach-limit | | -1 | true | Maximum number of volumes that can be attached to a node. If volume-attach-limit-opt-in is true, it must be set to a positive value. | +#### Suggestion for setting max-inflight-mount-calls and volume-attach-limit + +To prevent out-of-memory (OOM) issues in the efs-plugin container, configure these parameters based on your container's memory limit: + +- Each EFS volume consumes **~12 MiB** of memory (for the efs-proxy process) +- Each concurrent mount operation consumes **~30 MiB** of memory during peak usage + - A single mount operation typically takes **~100 milliseconds** to complete + - For example, concurrent mount operations can occur when multiple pods are being scheduled simultaneously and need to mount EFS volumes + +#### Recommended formula +``` +Container Memory Limit (MiB) = ((volume-attach-limit × 12) + (max-inflight-mount-calls × 30)) × 1.5 +``` + +#### Example calculation +- For 50 volumes and 10 concurrent mounts: `((50 × 12) + (10 × 30)) × 1.5 = 1,350 MiB` +- Set the container memory limit to at least 1.4 GiB (1,350 MiB ≈ 1.32 GiB) + +> **Note:** The 1.5x multiplier provides a safety buffer for other container processes and memory fluctuations. 
##### Understanding the Impact of vol-metrics-opt-in: diff --git a/pkg/driver/driver.go b/pkg/driver/driver.go index 79f166492..271cec284 100644 --- a/pkg/driver/driver.go +++ b/pkg/driver/driver.go @@ -33,7 +33,9 @@ const ( driverName = "efs.csi.aws.com" // AgentNotReadyTaintKey contains the key of taints to be removed on driver startup - AgentNotReadyNodeTaintKey = "efs.csi.aws.com/agent-not-ready" + AgentNotReadyNodeTaintKey = "efs.csi.aws.com/agent-not-ready" + UnsetMaxInflightMountCounts = -1 + UnsetVolumeAttachLimit = -1 ) type Driver struct { @@ -53,9 +55,11 @@ type Driver struct { adaptiveRetryMode bool tags map[string]string lockManager LockManagerMap + inFlightMountTracker *InFlightMountTracker + volumeAttachLimit int64 } -func NewDriver(endpoint, efsUtilsCfgPath, efsUtilsStaticFilesPath, tags string, volMetricsOptIn bool, volMetricsRefreshPeriod float64, volMetricsFsRateLimit int, deleteAccessPointRootDir bool, adaptiveRetryMode bool) *Driver { +func NewDriver(endpoint, efsUtilsCfgPath, efsUtilsStaticFilesPath, tags string, volMetricsOptIn bool, volMetricsRefreshPeriod float64, volMetricsFsRateLimit int, deleteAccessPointRootDir bool, adaptiveRetryMode bool, maxInflightMountCallsOptIn bool, maxInflightMountCalls int64, volumeAttachLimitOptIn bool, volumeAttachLimit int64) *Driver { cloud, err := cloud.NewCloud(adaptiveRetryMode) if err != nil { klog.Fatalln(err) @@ -79,6 +83,8 @@ func NewDriver(endpoint, efsUtilsCfgPath, efsUtilsStaticFilesPath, tags string, adaptiveRetryMode: adaptiveRetryMode, tags: parseTagsFromStr(strings.TrimSpace(tags)), lockManager: NewLockManagerMap(), + inFlightMountTracker: NewInFlightMountTracker(getMaxInflightMountCalls(maxInflightMountCallsOptIn, maxInflightMountCalls)), + volumeAttachLimit: getVolumeAttachLimit(volumeAttachLimitOptIn, volumeAttachLimit), } } diff --git a/pkg/driver/inflight_mount_tracker.go b/pkg/driver/inflight_mount_tracker.go new file mode 100644 index 000000000..e628fe312 --- /dev/null +++ 
b/pkg/driver/inflight_mount_tracker.go @@ -0,0 +1,47 @@ +package driver + +import ( + "sync" + + "k8s.io/klog/v2" +) + +type InFlightMountTracker struct { + mux sync.Mutex + count int64 + maxCount int64 +} + +func NewInFlightMountTracker(maxCount int64) *InFlightMountTracker { + if maxCount <= 0 { + klog.V(4).InfoS("InFlightMountTracker is disabled") + return nil + } + return &InFlightMountTracker{ + maxCount: maxCount, + } +} + +func (checker *InFlightMountTracker) increment() bool { + checker.mux.Lock() + defer checker.mux.Unlock() + + if checker.count >= checker.maxCount { + return false + } + + checker.count++ + return true +} + +func (checker *InFlightMountTracker) decrement() bool { + checker.mux.Lock() + defer checker.mux.Unlock() + if checker.count == 0 { + klog.Error("InFlightMountTracker: trying to decrement count when it is already 0") + return false + } + + checker.count-- + return true +} diff --git a/pkg/driver/inflight_mount_tracker_test.go b/pkg/driver/inflight_mount_tracker_test.go new file mode 100644 index 000000000..b6166d620 --- /dev/null +++ b/pkg/driver/inflight_mount_tracker_test.go @@ -0,0 +1,97 @@ +package driver + +import ( + "sync" + "testing" +) + +func assertEqual[T comparable](t *testing.T, actual, expected T, description string) { + if expected != actual { + t.Errorf("%s: expected %v != actual %v", description, expected, actual) + } +} + +func TestNewInFlightMountTracker(t *testing.T) { + checker := NewInFlightMountTracker(5) + assertEqual(t, checker.maxCount, 5, "Max inflight count") + assertEqual(t, checker.count, 0, "Inflight count") + + checker = NewInFlightMountTracker(UnsetMaxInflightMountCounts) + assertEqual(t, checker, nil, "Nil checker for negative max inflight mount counts") + + checker = NewInFlightMountTracker(0) + assertEqual(t, checker, nil, "Nil checker for zero max inflight mount counts") +} + +func TestIncrement(t *testing.T) { + maxFlightCount := int64(2) + checker := NewInFlightMountTracker(maxFlightCount) + + if 
!checker.increment() { + t.Errorf("First increment should succeed with max inflight count=%d", maxFlightCount) + } + assertEqual(t, checker.count, 1, "Inflight count after first increment") + + if !checker.increment() { + t.Errorf("Second increment should succeed with max inflight count=%d", maxFlightCount) + } + assertEqual(t, checker.count, 2, "Inflight count after second increment") + + if checker.increment() { + t.Errorf("Third increment should fail with max inflight count=%d", maxFlightCount) + } + assertEqual(t, checker.count, 2, "Inflight count after third increment") +} + +func TestDecrement(t *testing.T) { + maxFlightCount := int64(2) + checker := NewInFlightMountTracker(maxFlightCount) + checker.increment() + checker.increment() + + checker.decrement() + assertEqual(t, checker.count, 1, "Inflight count after first decrement") + + checker.decrement() + assertEqual(t, checker.count, 0, "Inflight count after second decrement") + + // Should not decrement further when the count is already zero + checker.decrement() + assertEqual(t, checker.count, 0, "Inflight count after decrement when count is already zero") +} + +func TestConcurrency(t *testing.T) { + // Run multiple iterations to increase chance of catching race conditions + for i := 0; i < 100; i++ { + maxFlightCount := int64(500) + checker := NewInFlightMountTracker(maxFlightCount) + var wg sync.WaitGroup + var mu sync.Mutex + + numGoRoutinesForIncrement := 400 + for range numGoRoutinesForIncrement { + wg.Add(1) + go func() { + defer wg.Done() + checker.increment() + }() + } + + numGoRoutinesForDecrement := 350 + actualDecrements := 0 + for range numGoRoutinesForDecrement { + wg.Add(1) + go func() { + defer wg.Done() + if checker.decrement() { + mu.Lock() + actualDecrements++ + mu.Unlock() + } + }() + } + + wg.Wait() + assertEqual(t, checker.count, int64(numGoRoutinesForIncrement-actualDecrements), "inflight count") + } +} diff --git a/pkg/driver/node.go b/pkg/driver/node.go index 58da96f48..4040fda84 
100644 --- a/pkg/driver/node.go +++ b/pkg/driver/node.go @@ -47,6 +47,10 @@ var ( supportedFSTypes = []string{"efs", ""} ) +const ( + maxInflightMountCallsReached = "The number of concurrent mount calls is %v, which has reached the limit" +) + func (d *Driver) NodeStageVolume(ctx context.Context, req *csi.NodeStageVolumeRequest) (*csi.NodeStageVolumeResponse, error) { return nil, status.Error(codes.Unimplemented, "") } @@ -77,6 +81,17 @@ func (d *Driver) NodePublishVolume(ctx context.Context, req *csi.NodePublishVolu return nil, status.Error(codes.InvalidArgument, "Volume capability access type must be mount") } + if d.inFlightMountTracker != nil { + if ok := d.inFlightMountTracker.increment(); !ok { + return nil, status.Errorf(codes.Aborted, maxInflightMountCallsReached, d.inFlightMountTracker.maxCount) + } + + defer func() { + klog.V(4).Infof("NodePublishVolume: volume operation finished for volumeId: %s with %d inflight count before decrementing", req.GetVolumeId(), d.inFlightMountTracker.count) + d.inFlightMountTracker.decrement() + }() + } + // TODO when CreateVolume is implemented, it must use the same key names subpath := "/" encryptInTransit := true @@ -323,8 +338,12 @@ func (d *Driver) NodeGetCapabilities(ctx context.Context, req *csi.NodeGetCapabi func (d *Driver) NodeGetInfo(ctx context.Context, req *csi.NodeGetInfoRequest) (*csi.NodeGetInfoResponse, error) { klog.V(4).Infof("NodeGetInfo: called with args %+v", util.SanitizeRequest(*req)) + maxVolumesPerNode := d.volumeAttachLimit + klog.V(4).Infof("NodeGetInfo: maxVolumesPerNode=%d", maxVolumesPerNode) + return &csi.NodeGetInfoResponse{ - NodeId: d.nodeID, + NodeId: d.nodeID, + MaxVolumesPerNode: maxVolumesPerNode, }, nil } @@ -538,3 +557,33 @@ func tryRemoveNotReadyTaintUntilSucceed(interval time.Duration, removeFn func() time.Sleep(interval) } } + +func getMaxInflightMountCalls(maxInflightMountCallsOptIn bool, maxInflightMountCalls int64) int64 { + if maxInflightMountCallsOptIn && 
maxInflightMountCalls <= 0 { + klog.Errorf("Fatal error: maxInflightMountCalls must be greater than 0 when maxInflightMountCallsOptIn is true!") + klog.FlushAndExit(klog.ExitFlushTimeout, 1) + } + + if !maxInflightMountCallsOptIn { + klog.V(4).Infof("MaxInflightMountCallsOptIn is false, setting maxInflightMountCalls to %d and inflight check is disabled", UnsetMaxInflightMountCounts) + return UnsetMaxInflightMountCounts + } + + klog.V(4).Infof("MaxInflightMountCalls is manually set to %d", maxInflightMountCalls) + return maxInflightMountCalls +} + +func getVolumeAttachLimit(volumeAttachLimitOptIn bool, volumeAttachLimit int64) int64 { + if volumeAttachLimitOptIn && volumeAttachLimit <= 0 { + klog.Errorf("Fatal error: volumeAttachLimit must be greater than 0 when volumeAttachLimitOptIn is true!") + klog.FlushAndExit(klog.ExitFlushTimeout, 1) + } + + if !volumeAttachLimitOptIn { + klog.V(4).Infof("VolumeAttachLimitOptIn is false, setting maxVolumesPerNode to zero so that container orchestrator will decide the value") + return 0 + } + + klog.V(4).Infof("VolumeAttachLimit is manually set to %d", volumeAttachLimit) + return volumeAttachLimit +} diff --git a/pkg/driver/node_test.go b/pkg/driver/node_test.go index f1b01cb73..bc2bd9aea 100644 --- a/pkg/driver/node_test.go +++ b/pkg/driver/node_test.go @@ -21,6 +21,7 @@ import ( "errors" "fmt" "os" + "os/exec" "reflect" "strings" "testing" @@ -43,16 +44,17 @@ type errtyp struct { message string } -func setup(mockCtrl *gomock.Controller, volStatter VolStatter, volMetricsOptIn bool) (*mocks.MockMounter, *Driver, context.Context) { +func setup(mockCtrl *gomock.Controller, volStatter VolStatter, volMetricsOptIn bool, maxInflightCalls int64) (*mocks.MockMounter, *Driver, context.Context) { mockMounter := mocks.NewMockMounter(mockCtrl) nodeCaps := SetNodeCapOptInFeatures(volMetricsOptIn) driver := &Driver{ - endpoint: "endpoint", - nodeID: "nodeID", - mounter: mockMounter, - volStatter: volStatter, - volMetricsOptIn: true, - 
nodeCaps: nodeCaps, + endpoint: "endpoint", + nodeID: "nodeID", + mounter: mockMounter, + volStatter: volStatter, + volMetricsOptIn: true, + nodeCaps: nodeCaps, + inFlightMountTracker: NewInFlightMountTracker(maxInflightCalls), } ctx := context.Background() return mockMounter, driver, ctx @@ -99,13 +101,14 @@ func TestNodePublishVolume(t *testing.T) { ) testCases := []struct { - name string - req *csi.NodePublishVolumeRequest - expectMakeDir bool - mountArgs []interface{} - mountSuccess bool - volMetricsOptIn bool - expectError errtyp + name string + req *csi.NodePublishVolumeRequest + expectMakeDir bool + mountArgs []interface{} + mountSuccess bool + volMetricsOptIn bool + expectError errtyp + maxInflightMountCalls int64 }{ { name: "success: normal", @@ -114,10 +117,11 @@ func TestNodePublishVolume(t *testing.T) { VolumeCapability: stdVolCap, TargetPath: targetPath, }, - expectMakeDir: true, - mountArgs: []interface{}{volumeId + ":/", targetPath, "efs", []string{"tls"}}, - mountSuccess: true, - volMetricsOptIn: true, + expectMakeDir: true, + mountArgs: []interface{}{volumeId + ":/", targetPath, "efs", []string{"tls"}}, + mountSuccess: true, + volMetricsOptIn: true, + maxInflightMountCalls: UnsetMaxInflightMountCounts, }, { name: "success: empty path", @@ -126,10 +130,11 @@ func TestNodePublishVolume(t *testing.T) { VolumeCapability: stdVolCap, TargetPath: targetPath, }, - expectMakeDir: true, - mountArgs: []interface{}{volumeId + ":/", targetPath, "efs", []string{"tls"}}, - mountSuccess: true, - volMetricsOptIn: true, + expectMakeDir: true, + mountArgs: []interface{}{volumeId + ":/", targetPath, "efs", []string{"tls"}}, + mountSuccess: true, + volMetricsOptIn: true, + maxInflightMountCalls: UnsetMaxInflightMountCounts, }, { name: "success: empty path and access point", @@ -138,10 +143,11 @@ func TestNodePublishVolume(t *testing.T) { VolumeCapability: stdVolCap, TargetPath: targetPath, }, - expectMakeDir: true, - mountArgs: []interface{}{volumeId + ":/", 
targetPath, "efs", []string{"tls"}}, - mountSuccess: true, - volMetricsOptIn: true, + expectMakeDir: true, + mountArgs: []interface{}{volumeId + ":/", targetPath, "efs", []string{"tls"}}, + mountSuccess: true, + volMetricsOptIn: true, + maxInflightMountCalls: UnsetMaxInflightMountCounts, }, { name: "success: normal with read only mount", @@ -151,9 +157,10 @@ func TestNodePublishVolume(t *testing.T) { TargetPath: targetPath, Readonly: true, }, - expectMakeDir: true, - mountArgs: []interface{}{volumeId + ":/", targetPath, "efs", []string{"tls", "ro"}}, - mountSuccess: true, + expectMakeDir: true, + mountArgs: []interface{}{volumeId + ":/", targetPath, "efs", []string{"tls", "ro"}}, + mountSuccess: true, + maxInflightMountCalls: UnsetMaxInflightMountCounts, }, { name: "success: normal with tls mount options", @@ -171,9 +178,10 @@ func TestNodePublishVolume(t *testing.T) { }, TargetPath: targetPath, }, - expectMakeDir: true, - mountArgs: []interface{}{volumeId + ":/", targetPath, "efs", []string{"tls"}}, - mountSuccess: true, + expectMakeDir: true, + mountArgs: []interface{}{volumeId + ":/", targetPath, "efs", []string{"tls"}}, + mountSuccess: true, + maxInflightMountCalls: UnsetMaxInflightMountCounts, }, { // TODO: Validate deprecation warning @@ -184,9 +192,10 @@ func TestNodePublishVolume(t *testing.T) { TargetPath: targetPath, VolumeContext: map[string]string{"path": "/a/b"}, }, - expectMakeDir: true, - mountArgs: []interface{}{volumeId + ":/a/b", targetPath, "efs", []string{"tls"}}, - mountSuccess: true, + expectMakeDir: true, + mountArgs: []interface{}{volumeId + ":/a/b", targetPath, "efs", []string{"tls"}}, + mountSuccess: true, + maxInflightMountCalls: UnsetMaxInflightMountCounts, }, { name: "fail: path in volume context must be absolute", @@ -201,6 +210,7 @@ func TestNodePublishVolume(t *testing.T) { code: "InvalidArgument", message: `Volume context property "path" must be an absolute path`, }, + maxInflightMountCalls: UnsetMaxInflightMountCounts, }, { name: 
"success: normal with path in volume handle", @@ -210,9 +220,10 @@ func TestNodePublishVolume(t *testing.T) { VolumeCapability: stdVolCap, TargetPath: targetPath, }, - expectMakeDir: true, - mountArgs: []interface{}{volumeId + ":/a/b", targetPath, "efs", []string{"tls"}}, - mountSuccess: true, + expectMakeDir: true, + mountArgs: []interface{}{volumeId + ":/a/b", targetPath, "efs", []string{"tls"}}, + mountSuccess: true, + maxInflightMountCalls: UnsetMaxInflightMountCounts, }, { name: "success: normal with path in volume handle, empty access point", @@ -222,9 +233,10 @@ func TestNodePublishVolume(t *testing.T) { VolumeCapability: stdVolCap, TargetPath: targetPath, }, - expectMakeDir: true, - mountArgs: []interface{}{volumeId + ":a/b", targetPath, "efs", []string{"tls"}}, - mountSuccess: true, + expectMakeDir: true, + mountArgs: []interface{}{volumeId + ":a/b", targetPath, "efs", []string{"tls"}}, + mountSuccess: true, + maxInflightMountCalls: UnsetMaxInflightMountCounts, }, { name: "success: path in volume handle takes precedence", @@ -234,9 +246,10 @@ func TestNodePublishVolume(t *testing.T) { TargetPath: targetPath, VolumeContext: map[string]string{"path": "/c/d"}, }, - expectMakeDir: true, - mountArgs: []interface{}{volumeId + ":/a/b", targetPath, "efs", []string{"tls"}}, - mountSuccess: true, + expectMakeDir: true, + mountArgs: []interface{}{volumeId + ":/a/b", targetPath, "efs", []string{"tls"}}, + mountSuccess: true, + maxInflightMountCalls: UnsetMaxInflightMountCounts, }, { name: "success: access point in volume handle, no path", @@ -245,9 +258,10 @@ func TestNodePublishVolume(t *testing.T) { VolumeCapability: stdVolCap, TargetPath: targetPath, }, - expectMakeDir: true, - mountArgs: []interface{}{volumeId + ":/", targetPath, "efs", []string{"accesspoint=" + accessPointID, "tls"}}, - mountSuccess: true, + expectMakeDir: true, + mountArgs: []interface{}{volumeId + ":/", targetPath, "efs", []string{"accesspoint=" + accessPointID, "tls"}}, + mountSuccess: true, + 
maxInflightMountCalls: UnsetMaxInflightMountCounts, }, { name: "success: path and access point in volume handle", @@ -256,9 +270,10 @@ func TestNodePublishVolume(t *testing.T) { VolumeCapability: stdVolCap, TargetPath: targetPath, }, - expectMakeDir: true, - mountArgs: []interface{}{volumeId + ":/a/b", targetPath, "efs", []string{"accesspoint=" + accessPointID, "tls"}}, - mountSuccess: true, + expectMakeDir: true, + mountArgs: []interface{}{volumeId + ":/a/b", targetPath, "efs", []string{"accesspoint=" + accessPointID, "tls"}}, + mountSuccess: true, + maxInflightMountCalls: UnsetMaxInflightMountCounts, }, { // TODO: Validate deprecation warning @@ -278,9 +293,10 @@ func TestNodePublishVolume(t *testing.T) { }, TargetPath: targetPath, }, - expectMakeDir: true, - mountArgs: []interface{}{volumeId + ":/", targetPath, "efs", []string{"accesspoint=" + accessPointID, "tls"}}, - mountSuccess: true, + expectMakeDir: true, + mountArgs: []interface{}{volumeId + ":/", targetPath, "efs", []string{"accesspoint=" + accessPointID, "tls"}}, + mountSuccess: true, + maxInflightMountCalls: UnsetMaxInflightMountCounts, }, { name: "success: normal with encryptInTransit true volume context", @@ -290,9 +306,10 @@ func TestNodePublishVolume(t *testing.T) { TargetPath: targetPath, VolumeContext: map[string]string{"encryptInTransit": "true"}, }, - expectMakeDir: true, - mountArgs: []interface{}{volumeId + ":/", targetPath, "efs", []string{"tls"}}, - mountSuccess: true, + expectMakeDir: true, + mountArgs: []interface{}{volumeId + ":/", targetPath, "efs", []string{"tls"}}, + mountSuccess: true, + maxInflightMountCalls: UnsetMaxInflightMountCounts, }, { name: "success: normal with encryptInTransit false volume context", @@ -302,9 +319,10 @@ func TestNodePublishVolume(t *testing.T) { TargetPath: targetPath, VolumeContext: map[string]string{"encryptInTransit": "false"}, }, - expectMakeDir: true, - mountArgs: []interface{}{volumeId + ":/", targetPath, "efs", []string{}}, - mountSuccess: true, + 
expectMakeDir: true, + mountArgs: []interface{}{volumeId + ":/", targetPath, "efs", []string{}}, + mountSuccess: true, + maxInflightMountCalls: UnsetMaxInflightMountCounts, }, { name: "success: normal with crossaccount true volume context", @@ -314,9 +332,10 @@ func TestNodePublishVolume(t *testing.T) { TargetPath: targetPath, VolumeContext: map[string]string{"crossaccount": "true"}, }, - expectMakeDir: true, - mountArgs: []interface{}{volumeId + ":/", targetPath, "efs", []string{"tls", "crossaccount"}}, - mountSuccess: true, + expectMakeDir: true, + mountArgs: []interface{}{volumeId + ":/", targetPath, "efs", []string{"tls", "crossaccount"}}, + mountSuccess: true, + maxInflightMountCalls: UnsetMaxInflightMountCounts, }, { name: "success: normal with crossaccount false volume context", @@ -326,9 +345,10 @@ func TestNodePublishVolume(t *testing.T) { TargetPath: targetPath, VolumeContext: map[string]string{"crossaccount": "false"}, }, - expectMakeDir: true, - mountArgs: []interface{}{volumeId + ":/", targetPath, "efs", []string{"tls"}}, - mountSuccess: true, + expectMakeDir: true, + mountArgs: []interface{}{volumeId + ":/", targetPath, "efs", []string{"tls"}}, + mountSuccess: true, + maxInflightMountCalls: UnsetMaxInflightMountCounts, }, { name: "success: normal with volume context populated from dynamic provisioning", @@ -339,9 +359,10 @@ func TestNodePublishVolume(t *testing.T) { VolumeContext: map[string]string{"storage.kubernetes.io/csiprovisioneridentity": "efs.csi.aws.com", "mounttargetip": "127.0.0.1"}, }, - expectMakeDir: true, - mountArgs: []interface{}{volumeId + ":/", targetPath, "efs", []string{"mounttargetip=127.0.0.1", "tls"}}, - mountSuccess: true, + expectMakeDir: true, + mountArgs: []interface{}{volumeId + ":/", targetPath, "efs", []string{"mounttargetip=127.0.0.1", "tls"}}, + mountSuccess: true, + maxInflightMountCalls: UnsetMaxInflightMountCounts, }, { name: "success: supported volume fstype capability", @@ -359,9 +380,10 @@ func 
TestNodePublishVolume(t *testing.T) { }, TargetPath: targetPath, }, - expectMakeDir: true, - mountArgs: []interface{}{volumeId + ":/", targetPath, "efs", []string{"tls"}}, - mountSuccess: true, + expectMakeDir: true, + mountArgs: []interface{}{volumeId + ":/", targetPath, "efs", []string{"tls"}}, + mountSuccess: true, + maxInflightMountCalls: UnsetMaxInflightMountCounts, }, { name: "fail: conflicting access point in volume handle and mount options", @@ -384,6 +406,7 @@ func TestNodePublishVolume(t *testing.T) { code: "InvalidArgument", message: "Found conflicting access point IDs in mountOptions (fsap-deadbeef) and volumeHandle (fsap-abcd1234)", }, + maxInflightMountCalls: UnsetMaxInflightMountCounts, }, { name: "fail: too many fields in volume handle", @@ -397,6 +420,7 @@ func TestNodePublishVolume(t *testing.T) { code: "InvalidArgument", message: "volume ID 'fs-abc123:/a/b/::four!' is invalid: Expected at most three fields separated by ':'", }, + maxInflightMountCalls: UnsetMaxInflightMountCounts, }, { name: "fail: missing target path", @@ -409,6 +433,7 @@ func TestNodePublishVolume(t *testing.T) { code: "InvalidArgument", message: "Target path not provided", }, + maxInflightMountCalls: UnsetMaxInflightMountCounts, }, { name: "fail: missing volume capability", @@ -421,6 +446,7 @@ func TestNodePublishVolume(t *testing.T) { code: "InvalidArgument", message: "Volume capability not provided", }, + maxInflightMountCalls: UnsetMaxInflightMountCounts, }, { name: "fail: unsupported volume capability", @@ -441,6 +467,7 @@ func TestNodePublishVolume(t *testing.T) { code: "InvalidArgument", message: "Volume capability not supported: invalid access mode: SINGLE_NODE_READER_ONLY", }, + maxInflightMountCalls: UnsetMaxInflightMountCounts, }, { name: "fail: unsupported volume access type", @@ -461,6 +488,7 @@ func TestNodePublishVolume(t *testing.T) { code: "InvalidArgument", message: "Volume capability not supported: only filesystem volumes are supported", }, + 
maxInflightMountCalls: UnsetMaxInflightMountCounts, }, { name: "fail: multiple unsupported volume capabilities", @@ -482,6 +510,7 @@ func TestNodePublishVolume(t *testing.T) { code: "InvalidArgument", message: "Volume capability not supported: invalid access mode: SINGLE_NODE_READER_ONLY", }, + maxInflightMountCalls: UnsetMaxInflightMountCounts, }, { name: "fail: mounter failed to MakeDir", @@ -496,6 +525,7 @@ func TestNodePublishVolume(t *testing.T) { code: "Internal", message: `Could not create dir "/target/path": failed to MakeDir`, }, + maxInflightMountCalls: UnsetMaxInflightMountCounts, }, { name: "fail: mounter failed to Mount", @@ -511,6 +541,7 @@ func TestNodePublishVolume(t *testing.T) { code: "Internal", message: `Could not mount "fs-abc123:/" at "/target/path": failed to Mount`, }, + maxInflightMountCalls: UnsetMaxInflightMountCounts, }, { name: "fail: unsupported volume context", @@ -525,6 +556,7 @@ func TestNodePublishVolume(t *testing.T) { code: "InvalidArgument", message: "Volume context property asdf not supported.", }, + maxInflightMountCalls: UnsetMaxInflightMountCounts, }, { name: "fail: invalid filesystem ID", @@ -538,6 +570,7 @@ func TestNodePublishVolume(t *testing.T) { code: "InvalidArgument", message: "volume ID 'invalid-id' is invalid: Expected a file system ID of the form 'fs-...'", }, + maxInflightMountCalls: UnsetMaxInflightMountCounts, }, { name: "fail: invalid access point ID", @@ -551,6 +584,7 @@ func TestNodePublishVolume(t *testing.T) { code: "InvalidArgument", message: "volume ID 'fs-abc123::invalid-id' has an invalid access point ID 'invalid-id': Expected it to be of the form 'fsap-...'", }, + maxInflightMountCalls: UnsetMaxInflightMountCounts, }, { name: "fail: tls in mount options and encryptInTransit false volume context", @@ -574,6 +608,7 @@ func TestNodePublishVolume(t *testing.T) { code: "InvalidArgument", message: "Found tls in mountOptions but encryptInTransit is false", }, + maxInflightMountCalls: 
UnsetMaxInflightMountCounts, }, { name: "fail: encryptInTransit invalid boolean value volume context", @@ -595,7 +630,7 @@ func TestNodePublishVolume(t *testing.T) { t.Run(tc.name, func(t *testing.T) { mockCtrl := gomock.NewController(t) defer mockCtrl.Finish() - mockMounter, driver, ctx := setup(mockCtrl, NewVolStatter(), tc.volMetricsOptIn) + mockMounter, driver, ctx := setup(mockCtrl, NewVolStatter(), tc.volMetricsOptIn, tc.maxInflightMountCalls) if tc.expectMakeDir { var err error @@ -723,7 +758,7 @@ func TestNodeUnpublishVolume(t *testing.T) { t.Run(tc.name, func(t *testing.T) { mockCtrl := gomock.NewController(t) defer mockCtrl.Finish() - mockMounter, driver, ctx := setup(mockCtrl, NewVolStatter(), true) + mockMounter, driver, ctx := setup(mockCtrl, NewVolStatter(), true, UnsetMaxInflightMountCounts) if tc.expectGetDeviceName { mockMounter.EXPECT(). @@ -850,7 +885,7 @@ func TestNodeGetVolumeStats(t *testing.T) { //setup mockCtrl := gomock.NewController(t) defer mockCtrl.Finish() - _, driver, ctx = setup(mockCtrl, NewVolStatter(), true) + _, driver, ctx = setup(mockCtrl, NewVolStatter(), true, UnsetMaxInflightMountCounts) if tc.updateCache { mu.Lock() @@ -875,6 +910,53 @@ func TestNodeGetVolumeStats(t *testing.T) { os.RemoveAll(validPath) } +func TestNodeGetInfo(t *testing.T) { + testCases := []struct { + name string + volumeAttachLimit int64 + expectedResponse *csi.NodeGetInfoResponse + }{ + { + name: "returns nodeID and volumeAttachLimit", + volumeAttachLimit: 100, + expectedResponse: &csi.NodeGetInfoResponse{ + NodeId: "test-node-id", + MaxVolumesPerNode: 100, + }, + }, + { + name: "zero volume attach limit", + volumeAttachLimit: 0, + expectedResponse: &csi.NodeGetInfoResponse{ + NodeId: "test-node-id", + MaxVolumesPerNode: 0, + }, + }, + } + + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + mockCtrl := gomock.NewController(t) + defer mockCtrl.Finish() + + driver := &Driver{ + nodeID: "test-node-id", + volumeAttachLimit: 
tc.volumeAttachLimit, + } + + req := &csi.NodeGetInfoRequest{} + ctx := context.Background() + + ret, err := driver.NodeGetInfo(ctx, req) + + testResult(t, "NodeGetInfo", ret, err, errtyp{}) + if !reflect.DeepEqual(tc.expectedResponse, ret) { + t.Errorf("Expected: %v, Actual: %v", tc.expectedResponse, ret) + } + }) + } +} + func testResponse(t *testing.T, expected, actual *csi.NodeGetVolumeStatsResponse) { if !reflect.DeepEqual(expected, actual) { t.Errorf("Expected: %v, Actual: %v", expected, actual) @@ -1042,3 +1124,124 @@ func TestTryRemoveNotReadyTaintUntilSucceed(t *testing.T) { } } } + +// Run a test in subprocess that may call os.Exit or klog.Fatal. +func runForkFatalTest(testName string) error { + cmd := exec.Command(os.Args[0], fmt.Sprintf("-test.run=%v", testName)) + // Fork off the process + cmd.Env = append(os.Environ(), "FORK=1") + err := cmd.Run() + return err +} + +func TestGetMaxInflightMountCalls(t *testing.T) { + testCases := []struct { + name string + maxInflightMountCallsOptIn bool + maxInflightMountCalls int64 + expected int64 + expectFatal bool + }{ + { + name: "opt-in false returns unset", + maxInflightMountCallsOptIn: false, + maxInflightMountCalls: 10, + expected: UnsetMaxInflightMountCounts, + }, + { + name: "opt-in true with valid value", + maxInflightMountCallsOptIn: true, + maxInflightMountCalls: 5, + expected: 5, + }, + { + name: "opt-in true with zero value should fatal", + maxInflightMountCallsOptIn: true, + maxInflightMountCalls: 0, + expectFatal: true, + }, + { + name: "opt-in true with negative value should fatal", + maxInflightMountCallsOptIn: true, + maxInflightMountCalls: UnsetMaxInflightMountCounts, + expectFatal: true, + }, + } + + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + if tc.expectFatal { + if os.Getenv("FORK") == "1" { + // If it is in forked process, run the fatal code directly and let klog.Fatal exit + getMaxInflightMountCalls(tc.maxInflightMountCallsOptIn, tc.maxInflightMountCalls) + 
return + } + err := runForkFatalTest("TestGetMaxInflightMountCalls/" + tc.name) + if err == nil { + t.Fatal("expected process to exit with error") + } + } else { + result := getMaxInflightMountCalls(tc.maxInflightMountCallsOptIn, tc.maxInflightMountCalls) + if result != tc.expected { + t.Errorf("Expected %d, got %d", tc.expected, result) + } + } + }) + } +} + +func TestGetVolumeAttachLimit(t *testing.T) { + testCases := []struct { + name string + volumeAttachLimitOptIn bool + volumeAttachLimit int64 + expected int64 + expectFatal bool + }{ + { + name: "opt-in false returns zero", + volumeAttachLimitOptIn: false, + volumeAttachLimit: 100, + expected: 0, + }, + { + name: "opt-in true with valid value", + volumeAttachLimitOptIn: true, + volumeAttachLimit: 50, + expected: 50, + }, + { + name: "opt-in true with zero value should fatal", + volumeAttachLimitOptIn: true, + volumeAttachLimit: 0, + expectFatal: true, + }, + { + name: "opt-in true with negative value should fatal", + volumeAttachLimitOptIn: true, + volumeAttachLimit: -1, + expectFatal: true, + }, + } + + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + if tc.expectFatal { + // If it is in forked process, run the fatal code directly and let klog.Fatal exit + if os.Getenv("FORK") == "1" { + getVolumeAttachLimit(tc.volumeAttachLimitOptIn, tc.volumeAttachLimit) + return + } + err := runForkFatalTest("TestGetVolumeAttachLimit/" + tc.name) + if err == nil { + t.Fatal("expected process to exit with error") + } + } else { + result := getVolumeAttachLimit(tc.volumeAttachLimitOptIn, tc.volumeAttachLimit) + if result != tc.expected { + t.Errorf("Expected %d, got %d", tc.expected, result) + } + } + }) + } +} diff --git a/pkg/driver/sanity_test.go b/pkg/driver/sanity_test.go index 1f748135b..cc740c02a 100644 --- a/pkg/driver/sanity_test.go +++ b/pkg/driver/sanity_test.go @@ -71,16 +71,17 @@ func TestSanityEFSCSI(t *testing.T) { mockCtrl := gomock.NewController(t) mockCloud := 
cloud.NewFakeCloudProvider() drv := Driver{ - endpoint: endpoint, - nodeID: "sanity", - mounter: NewFakeMounter(), - efsWatchdog: &mockWatchdog{}, - cloud: mockCloud, - nodeCaps: nodeCaps, - volMetricsOptIn: true, - volStatter: NewVolStatter(), - gidAllocator: NewGidAllocator(), - lockManager: NewLockManagerMap(), + endpoint: endpoint, + nodeID: "sanity", + mounter: NewFakeMounter(), + efsWatchdog: &mockWatchdog{}, + cloud: mockCloud, + nodeCaps: nodeCaps, + volMetricsOptIn: true, + volStatter: NewVolStatter(), + gidAllocator: NewGidAllocator(), + lockManager: NewLockManagerMap(), + inFlightMountTracker: NewInFlightMountTracker(UnsetMaxInflightMountCounts), } defer func() { if r := recover(); r != nil {