Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions charts/aws-efs-csi-driver/templates/node-daemonset.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,10 @@ spec:
- --vol-metrics-opt-in={{ hasKey .Values.node "volMetricsOptIn" | ternary .Values.node.volMetricsOptIn false }}
- --vol-metrics-refresh-period={{ hasKey .Values.node "volMetricsRefreshPeriod" | ternary .Values.node.volMetricsRefreshPeriod 240 }}
- --vol-metrics-fs-rate-limit={{ hasKey .Values.node "volMetricsFsRateLimit" | ternary .Values.node.volMetricsFsRateLimit 5 }}
- --max-inflight-mount-calls-opt-in={{ hasKey .Values.node "maxInflightMountCallsOptIn" | ternary .Values.node.maxInflightMountCallsOptIn false }}
- --max-inflight-mount-calls={{ hasKey .Values.node "maxInflightMountCalls" | ternary .Values.node.maxInflightMountCalls 10 }}
- --volume-attach-limit-opt-in={{ hasKey .Values.node "volumeAttachLimitOptIn" | ternary .Values.node.volumeAttachLimitOptIn false }}
- --volume-attach-limit={{ hasKey .Values.node "volumeAttachLimit" | ternary .Values.node.volumeAttachLimit 20 }}
env:
- name: CSI_ENDPOINT
value: unix:/csi/csi.sock
Expand Down
10 changes: 7 additions & 3 deletions cmd/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,8 +41,12 @@ func main() {
volMetricsFsRateLimit = flag.Int("vol-metrics-fs-rate-limit", 5, "Volume metrics routines rate limiter per file system")
deleteAccessPointRootDir = flag.Bool("delete-access-point-root-dir", false,
"Opt in to delete access point root directory by DeleteVolume. By default, DeleteVolume will delete the access point behind Persistent Volume and deleting access point will not delete the access point root directory or its contents.")
adaptiveRetryMode = flag.Bool("adaptive-retry-mode", true, "Opt out to use standard sdk retry configuration. By default, adaptive retry mode will be used to more heavily client side rate limit EFS API requests.")
tags = flag.String("tags", "", "Space separated key:value pairs which will be added as tags for EFS resources. For example, 'environment:prod region:us-east-1'")
adaptiveRetryMode = flag.Bool("adaptive-retry-mode", true, "Opt out to use standard sdk retry configuration. By default, adaptive retry mode will be used to more heavily client side rate limit EFS API requests.")
tags = flag.String("tags", "", "Space separated key:value pairs which will be added as tags for EFS resources. For example, 'environment:prod region:us-east-1'")
maxInflightMountCallsOptIn = flag.Bool("max-inflight-mount-calls-opt-in", false, "Opt in to use max inflight mount calls limit.")
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do you think we could simplify this by creating just one flag for each? If no maxInflightCalls or volumeLimit is provided, we would just use the current behavior (not limited at all).

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I prefer to keep the OptIn flag primarily for future-proofing. If we later implement default calculations for maxInflightCalls or volumeLimit, users could set OptIn=true without specifying values for parameters like maxInflightMountCalls.

While we could remove the OptIn feature gate later if we decide not to provide default calculations, adding it back would be more disruptive. From a user perspective, without OptIn=true, a configuration with maxInflightMountCalls=10 would be ignored, breaking backward compatibility.

maxInflightMountCalls = flag.Int64("max-inflight-mount-calls", driver.UnsetMaxInflightMountCounts, "New NodePublishVolume operation will be blocked if maximum number of inflight calls is reached. If maxInflightMountCallsOptIn is true, it has to be set to a positive value.")
Comment thread
DavidXU12345 marked this conversation as resolved.
volumeAttachLimitOptIn = flag.Bool("volume-attach-limit-opt-in", false, "Opt in to use volume attach limit.")
volumeAttachLimit = flag.Int64("volume-attach-limit", driver.UnsetVolumeAttachLimit, "Maximum number of volumes that can be attached to a node. If volumeAttachLimitOptIn is true, it has to be set to a positive value.")
)
klog.InitFlags(nil)
flag.Parse()
Expand All @@ -61,7 +65,7 @@ func main() {
if err != nil {
klog.Fatalln(err)
}
drv := driver.NewDriver(*endpoint, etcAmazonEfs, *efsUtilsStaticFilesPath, *tags, *volMetricsOptIn, *volMetricsRefreshPeriod, *volMetricsFsRateLimit, *deleteAccessPointRootDir, *adaptiveRetryMode)
drv := driver.NewDriver(*endpoint, etcAmazonEfs, *efsUtilsStaticFilesPath, *tags, *volMetricsOptIn, *volMetricsRefreshPeriod, *volMetricsFsRateLimit, *deleteAccessPointRootDir, *adaptiveRetryMode, *maxInflightMountCallsOptIn, *maxInflightMountCalls, *volumeAttachLimitOptIn, *volumeAttachLimit)
Copy link
Copy Markdown
Contributor Author

@DavidXU12345 DavidXU12345 Sep 16, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We should probably move all options into their own type (similar to the EBS CSI driver) so that we can pass the options around rather than passing each value into NewDriver, for example. We can also encapsulate the validation logic in that options type (e.g. check that when maxInflightMountCallsOptIn is true, maxInflightMountCalls is a positive value).

I may do the refactoring in the next PR, since this PR is already very large.

if err := drv.Run(); err != nil {
klog.Fatalln(err)
}
Expand Down
4 changes: 4 additions & 0 deletions deploy/kubernetes/base/node-daemonset.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,10 @@ spec:
- --vol-metrics-opt-in=false
- --vol-metrics-refresh-period=240
- --vol-metrics-fs-rate-limit=5
- --max-inflight-mount-calls-opt-in=false
- --max-inflight-mount-calls=10
- --volume-attach-limit-opt-in=false
- --volume-attach-limit=20
env:
- name: CSI_ENDPOINT
value: unix:/csi/csi.sock
Expand Down
24 changes: 23 additions & 1 deletion docs/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -353,8 +353,30 @@ After deploying the driver, you can continue to these sections:
|-----------------------------|--------|---------|----------|-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
| vol-metrics-opt-in | | false | true | Opt in to emit volume metrics. |
| vol-metrics-refresh-period | | 240 | true | Refresh period for volume metrics in minutes. |
| vol-metrics-fs-rate-limit | | 5 | true | Volume metrics routines rate limiter per file system. |
| max-inflight-mount-calls-opt-in | | false | true | Opt in to use max inflight mount calls limit. |
| max-inflight-mount-calls | | -1 | true | New NodePublishVolume operation will be blocked if maximum number of inflight calls is reached. If maxInflightMountCallsOptIn is true, it has to be set to a positive value. |
| volume-attach-limit-opt-in | | false | true | Opt in to use volume attach limit. |
| volume-attach-limit | | -1 | true | Maximum number of volumes that can be attached to a node. If volumeAttachLimitOptIn is true, it has to be set to a positive value. |

#### Suggestion for setting max-inflight-mount-calls and volume-attach-limit
Comment thread
DavidXU12345 marked this conversation as resolved.

To prevent out-of-memory (OOM) issues in the efs-plugin container, configure these parameters based on your container's memory limit:

- Each EFS volume consumes **~12 MiB** of memory (for the efs-proxy process)
- Each concurrent mount operation consumes **~30 MiB** of memory during peak usage
- A single mount operation typically takes **~100 milliseconds** to complete
- For example, concurrent mount operations can occur when multiple pods are being scheduled simultaneously and need to mount EFS volumes

#### Recommended formula
```
Container Memory Limit = ((volume-attach-limit × 12) + (max-inflight-mount-calls × 30)) × 1.5 MiB
```

#### Example calculation
- For 50 volumes and 10 concurrent mounts: `((50 × 12) + (10 × 30)) × 1.5 = 1,350 MiB`
- Set container memory limit to at least 1.4 GiB

> **Note:** The 1.5x multiplier provides a safety buffer for other container processes and memory fluctuations.


##### Understanding the Impact of vol-metrics-opt-in:
Expand Down
10 changes: 8 additions & 2 deletions pkg/driver/driver.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,9 @@ const (
driverName = "efs.csi.aws.com"

// AgentNotReadyNodeTaintKey contains the key of taints to be removed on driver startup
AgentNotReadyNodeTaintKey = "efs.csi.aws.com/agent-not-ready"
AgentNotReadyNodeTaintKey = "efs.csi.aws.com/agent-not-ready"
UnsetMaxInflightMountCounts = -1
UnsetVolumeAttachLimit = -1
)

type Driver struct {
Expand All @@ -53,9 +55,11 @@ type Driver struct {
adaptiveRetryMode bool
tags map[string]string
lockManager LockManagerMap
inFlightMountTracker *InFlightMountTracker
volumeAttachLimit int64
}

func NewDriver(endpoint, efsUtilsCfgPath, efsUtilsStaticFilesPath, tags string, volMetricsOptIn bool, volMetricsRefreshPeriod float64, volMetricsFsRateLimit int, deleteAccessPointRootDir bool, adaptiveRetryMode bool) *Driver {
func NewDriver(endpoint, efsUtilsCfgPath, efsUtilsStaticFilesPath, tags string, volMetricsOptIn bool, volMetricsRefreshPeriod float64, volMetricsFsRateLimit int, deleteAccessPointRootDir bool, adaptiveRetryMode bool, maxInflightMountCallsOptIn bool, maxInflightMountCalls int64, volumeAttachLimitOptIn bool, volumeAttachLimit int64) *Driver {
cloud, err := cloud.NewCloud(adaptiveRetryMode)
if err != nil {
klog.Fatalln(err)
Expand All @@ -79,6 +83,8 @@ func NewDriver(endpoint, efsUtilsCfgPath, efsUtilsStaticFilesPath, tags string,
adaptiveRetryMode: adaptiveRetryMode,
tags: parseTagsFromStr(strings.TrimSpace(tags)),
lockManager: NewLockManagerMap(),
inFlightMountTracker: NewInFlightMountTracker(getMaxInflightMountCalls(maxInflightMountCallsOptIn, maxInflightMountCalls)),
volumeAttachLimit: getVolumeAttachLimit(volumeAttachLimitOptIn, volumeAttachLimit),
}
}

Expand Down
47 changes: 47 additions & 0 deletions pkg/driver/inflight_mount_tracker.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
package driver

import (
"sync"

"k8s.io/klog/v2"
)

// InFlightMountTracker keeps a running count of in-flight mount calls together
// with the ceiling that count may not exceed. The increment and decrement
// methods hold mux while they read or modify count.
type InFlightMountTracker struct {
	mux             sync.Mutex // held by increment/decrement around every count access
	count, maxCount int64      // current number of in-flight calls and its upper bound
}

// NewInFlightMountTracker builds a tracker that admits at most maxCount
// concurrent calls. A non-positive maxCount means the limit is disabled: a
// message is logged at verbosity 4 and nil is returned, so callers must
// nil-check the result before using it.
func NewInFlightMountTracker(maxCount int64) *InFlightMountTracker {
	if maxCount > 0 {
		return &InFlightMountTracker{maxCount: maxCount}
	}

	klog.V(4).InfoS("InFlightMountTracker is disabled")
	return nil
}

// increment claims one in-flight slot. It returns true and bumps the count
// when the count is still below maxCount; otherwise it leaves the count
// untouched and returns false.
func (tr *InFlightMountTracker) increment() bool {
	tr.mux.Lock()
	defer tr.mux.Unlock()

	if tr.count < tr.maxCount {
		tr.count++
		return true
	}

	return false
}

// decrement releases one in-flight slot. It returns true after lowering the
// count; when the count is already zero it logs an error, changes nothing and
// returns false.
func (tr *InFlightMountTracker) decrement() bool {
	tr.mux.Lock()
	defer tr.mux.Unlock()

	if tr.count != 0 {
		tr.count--
		return true
	}

	klog.Error("InFlightMountTracker: trying to decrement count when it is already 0")
	return false
}
97 changes: 97 additions & 0 deletions pkg/driver/inflight_mount_tracker_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,97 @@
package driver

import (
"sync"
"testing"
)

// assertEqual records a test error (without stopping the test) when actual
// differs from expected. description prefixes the failure message so the
// failing check can be identified.
func assertEqual[T comparable](t *testing.T, actual, expected T, description string) {
	// Mark this function as a helper so a failure is attributed to the
	// calling assertion's line rather than to the Errorf call below.
	t.Helper()

	if expected != actual {
		t.Errorf("%s: expected %v != actual %v", description, expected, actual)
	}
}

// TestNewInFlightMountTracker checks that a positive limit yields a tracker
// with that limit and a zero count, and that negative or zero limits yield nil.
func TestNewInFlightMountTracker(t *testing.T) {
	enabled := NewInFlightMountTracker(5)
	assertEqual(t, enabled.maxCount, int64(5), "Max inflight count")
	assertEqual(t, enabled.count, int64(0), "Inflight count")

	var disabled *InFlightMountTracker // typed nil to compare against

	assertEqual(t, NewInFlightMountTracker(UnsetMaxInflightMountCounts), disabled,
		"Nil checker for negative max inflight mount counts")
	assertEqual(t, NewInFlightMountTracker(0), disabled,
		"Nil checker for zero max inflight mount counts")
}

// TestIncrement drives a tracker with a limit of two through three increment
// calls: the first two must succeed and the third must be refused, leaving the
// count at the limit.
func TestIncrement(t *testing.T) {
	maxFlightCount := int64(2)
	checker := NewInFlightMountTracker(maxFlightCount)

	steps := []struct {
		failFormat string // message reported when increment's result is unexpected
		wantOK     bool   // expected result of increment
		countDesc  string // description passed to assertEqual
		wantCount  int64  // expected count after the call
	}{
		{"First increment should succeed with max inflight count=%d", true, "Inflight count after first increment", 1},
		{"Second increment should succeed with max inflight count=%d", true, "Inflight count after second increment", 2},
		{"Third increment should fail with max inflight count=%d", false, "Inflight count after third increment", 2},
	}

	for _, step := range steps {
		if ok := checker.increment(); ok != step.wantOK {
			t.Errorf(step.failFormat, maxFlightCount)
		}
		assertEqual(t, checker.count, step.wantCount, step.countDesc)
	}
}

// TestDecrement fills a tracker to its limit of two and then releases slots,
// checking both the resulting count and the boolean result of every call.
// The result matters: decrement must report false when there is nothing to
// release, and callers (such as TestConcurrency) rely on that signal.
func TestDecrement(t *testing.T) {
	maxFlightCount := int64(2)
	checker := NewInFlightMountTracker(maxFlightCount)

	// Setup must succeed, otherwise the assertions below are meaningless.
	for i := int64(0); i < maxFlightCount; i++ {
		if !checker.increment() {
			t.Fatalf("Setup increment %d should succeed with max inflight count=%d", i+1, maxFlightCount)
		}
	}

	if !checker.decrement() {
		t.Errorf("First decrement should succeed")
	}
	assertEqual(t, checker.count, 1, "Inflight count after first decrement")

	if !checker.decrement() {
		t.Errorf("Second decrement should succeed")
	}
	assertEqual(t, checker.count, 0, "Inflight count after second decrement")

	// Should not decrement further when the count is already zero
	if checker.decrement() {
		t.Errorf("Decrement should fail when the count is already zero")
	}
	assertEqual(t, checker.count, 0, "Inflight count after decrement when count is already zero")
}

// TestConcurrency hammers one tracker with concurrent increment and decrement
// goroutines and checks that the final count equals the number of increments
// minus the number of decrements that reported success. The limit (500) is
// above the number of incrementers (400), so every increment is admitted.
func TestConcurrency(t *testing.T) {
	const (
		iterations                = 100
		numGoRoutinesForIncrement = 400
		numGoRoutinesForDecrement = 350
	)

	// Run multiple iterations to increase chance of catching race conditions
	for iter := 0; iter < iterations; iter++ {
		checker := NewInFlightMountTracker(int64(500))

		var (
			wg               sync.WaitGroup
			mu               sync.Mutex // guards actualDecrements
			actualDecrements int
		)

		wg.Add(numGoRoutinesForIncrement + numGoRoutinesForDecrement)

		for n := 0; n < numGoRoutinesForIncrement; n++ {
			go func() {
				defer wg.Done()
				checker.increment()
			}()
		}

		for n := 0; n < numGoRoutinesForDecrement; n++ {
			go func() {
				defer wg.Done()
				// Only successful decrements count towards the expected total.
				if !checker.decrement() {
					return
				}
				mu.Lock()
				defer mu.Unlock()
				actualDecrements++
			}()
		}

		wg.Wait()
		assertEqual(t, checker.count, int64(numGoRoutinesForIncrement-actualDecrements), "inflight count")
	}
}
51 changes: 50 additions & 1 deletion pkg/driver/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,10 @@ var (
supportedFSTypes = []string{"efs", ""}
)

// maxInflightMountCallsReached is the format string for the error returned
// when a mount call is rejected by the in-flight limit; its single verb is
// filled with the configured maximum.
const maxInflightMountCallsReached = "The number of concurrent mount calls is %v, which has reached the limit"

// NodeStageVolume is not implemented: it ignores the request and always
// returns a nil response with a gRPC Unimplemented status error whose message
// is empty.
func (d *Driver) NodeStageVolume(ctx context.Context, req *csi.NodeStageVolumeRequest) (*csi.NodeStageVolumeResponse, error) {
return nil, status.Error(codes.Unimplemented, "")
}
Expand Down Expand Up @@ -77,6 +81,17 @@ func (d *Driver) NodePublishVolume(ctx context.Context, req *csi.NodePublishVolu
return nil, status.Error(codes.InvalidArgument, "Volume capability access type must be mount")
}

if d.inFlightMountTracker != nil {
if ok := d.inFlightMountTracker.increment(); !ok {
return nil, status.Errorf(codes.Aborted, maxInflightMountCallsReached, d.inFlightMountTracker.maxCount)
}

defer func() {
klog.V(4).Infof("NodePublishVolume: volume operation finished for volumeId: %s with %d inflight count before decrementing", req.GetVolumeId(), d.inFlightMountTracker.count)
d.inFlightMountTracker.decrement()
}()
}

// TODO when CreateVolume is implemented, it must use the same key names
subpath := "/"
encryptInTransit := true
Expand Down Expand Up @@ -323,8 +338,12 @@ func (d *Driver) NodeGetCapabilities(ctx context.Context, req *csi.NodeGetCapabi
// NodeGetInfo logs the sanitized request at verbosity 4 and returns a response
// carrying the driver's node ID and a MaxVolumesPerNode value taken from
// d.volumeAttachLimit (also logged at verbosity 4). The returned error is
// always nil.
func (d *Driver) NodeGetInfo(ctx context.Context, req *csi.NodeGetInfoRequest) (*csi.NodeGetInfoResponse, error) {
klog.V(4).Infof("NodeGetInfo: called with args %+v", util.SanitizeRequest(*req))

// d.volumeAttachLimit is populated in NewDriver from getVolumeAttachLimit.
maxVolumesPerNode := d.volumeAttachLimit
klog.V(4).Infof("NodeGetInfo: maxVolumesPerNode=%d", maxVolumesPerNode)

return &csi.NodeGetInfoResponse{
NodeId: d.nodeID,
NodeId: d.nodeID,
MaxVolumesPerNode: maxVolumesPerNode,
}, nil
}

Expand Down Expand Up @@ -538,3 +557,33 @@ func tryRemoveNotReadyTaintUntilSucceed(interval time.Duration, removeFn func()
time.Sleep(interval)
}
}

// getMaxInflightMountCalls resolves the effective in-flight mount limit from
// the opt-in flag and the configured value.
//
// When the feature is not opted in, the configured value is ignored and
// UnsetMaxInflightMountCounts is returned. When it is opted in, the value must
// be positive; otherwise an error is logged and the process is terminated via
// klog.FlushAndExit with exit code 1. A valid opted-in value is returned as is.
func getMaxInflightMountCalls(maxInflightMountCallsOptIn bool, maxInflightMountCalls int64) int64 {
	switch {
	case !maxInflightMountCallsOptIn:
		klog.V(4).Infof("MaxInflightMountCallsOptIn is false, setting maxInflightMountCalls to %d and inflight check is disabled", UnsetMaxInflightMountCounts)
		return UnsetMaxInflightMountCounts
	case maxInflightMountCalls <= 0:
		// Opted in but misconfigured: refuse to start.
		klog.Errorf("Fatal error: maxInflightMountCalls must be greater than 0 when maxInflightMountCallsOptIn is true!")
		klog.FlushAndExit(klog.ExitFlushTimeout, 1)
	}

	klog.V(4).Infof("MaxInflightMountCalls is manually set to %d", maxInflightMountCalls)
	return maxInflightMountCalls
}

// getVolumeAttachLimit resolves the effective per-node volume attach limit from
// the opt-in flag and the configured value.
//
// When the feature is not opted in, the configured value is ignored and 0 is
// returned. When it is opted in, the value must be positive; otherwise an error
// is logged and the process is terminated via klog.FlushAndExit with exit
// code 1. A valid opted-in value is returned as is.
func getVolumeAttachLimit(volumeAttachLimitOptIn bool, volumeAttachLimit int64) int64 {
	switch {
	case !volumeAttachLimitOptIn:
		klog.V(4).Infof("VolumeAttachLimitOptIn is false, setting maxVolumesPerNode to zero so that container orchestrator will decide the value")
		return 0
	case volumeAttachLimit <= 0:
		// Opted in but misconfigured: refuse to start.
		klog.Errorf("Fatal error: volumeAttachLimit must be greater than 0 when volumeAttachLimitOptIn is true!")
		klog.FlushAndExit(klog.ExitFlushTimeout, 1)
	}

	klog.V(4).Infof("VolumeAttachLimit is manually set to %d", volumeAttachLimit)
	return volumeAttachLimit
}
Loading