
Conversation

Contributor

@jorgee commented Sep 15, 2025

There is a current issue in the S3 CRT client where concurrent downloads can consume a large amount of heap memory:
aws/aws-sdk-java-v2#6323
It was reproduced when benchmarking a directory download of several large files from S3 to an EC2 instance. I initially fixed it with S3TransferManager.transferDirectoryMaxConcurrency, recently added to the AWS SDK v2, but that only solves the case of a single directory download, not concurrent directory downloads. Moreover, the same problem appears when increasing the executor pool size and downloading several big files concurrently.

To avoid the OOM, I have reverted the downloadDirectory method to be managed by Nextflow, as in the SDK v1 integration. A new class extends the S3 Transfer Manager, limiting the number of concurrent file downloads based on the buffer each transfer creates and the maximum heap memory we want to dedicate to download buffers. By default this limit is 400 MB, but it can be modified with aws.client.maxDownloadBuffer.

The extended transfer manager contains a semaphore that limits file downloads. The number of permits of this semaphore is floor(maxDownloadBuffer / minimumPartSize). A file consumes a maximum of 10 permits, one per part: 10 if its size exceeds 10 * minimumPartSize, otherwise its actual number of parts. This mirrors how the CRT client manages the download buffer.
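A minimal sketch of that gating logic, with hypothetical names (DownloadGate, permitsFor) rather than the actual ExtendedS3TransferManager code:

import java.util.concurrent.Semaphore;

// Illustrative sketch only; names and structure are hypothetical.
class DownloadGate {
    static final int MAX_PARTS_PER_FILE = 10;   // the CRT buffers at most 10 parts per file
    final long minPartSize;
    final Semaphore permits;

    DownloadGate(long maxDownloadBuffer, long minPartSize) {
        this.minPartSize = minPartSize;
        // total permits = floor(maxDownloadBuffer / minimumPartSize)
        this.permits = new Semaphore((int) (maxDownloadBuffer / minPartSize));
    }

    // one permit per part, capped at 10 parts per file
    int permitsFor(long fileSize) {
        long parts = (fileSize + minPartSize - 1) / minPartSize;  // ceil division
        return (int) Math.min(parts, MAX_PARTS_PER_FILE);
    }

    // simplified synchronous form; the real manager releases on async completion
    void download(long fileSize, Runnable transfer) throws InterruptedException {
        int n = permitsFor(fileSize);
        permits.acquire(n);        // block until enough buffer budget is free
        try {
            transfer.run();
        } finally {
            permits.release(n);    // always return the budget
        }
    }
}

Assuming the CRT's default 8 MB minimum part size, the 400 MB default yields 50 permits, so at most five full-size (10-part) downloads can hold buffers at once.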


netlify bot commented Sep 15, 2025

Deploy Preview for nextflow-docs-staging canceled.

🔨 Latest commit: 106aa07
🔍 Latest deploy log: https://app.netlify.com/projects/nextflow-docs-staging/deploys/68d6d4121e9048000994cf22

Signed-off-by: jorgee <[email protected]>
@jorgee marked this pull request as ready for review September 16, 2025 07:19
@jorgee requested a review from a team as a code owner September 16, 2025 07:19

Collaborator

@christopher-hakkaart left a comment


I've made a small suggestion for the docs. It's non-blocking so I've approved.

Co-authored-by: Chris Hakkaart <[email protected]>
Signed-off-by: Jorge Ejarque <[email protected]>

@pditommaso
Member

@claude review


claude bot commented Sep 22, 2025

Claude finished @pditommaso's task.


Code Review: S3 Concurrent Download Memory Management

Task: Review PR implementation for limiting S3 concurrent downloads to prevent Java Heap OOM

Todo List:

  • Read and analyze core S3Client changes
  • Review new ExtendedS3TransferManager implementation
  • Examine configuration changes and documentation updates
  • Check test coverage and implementation
  • Provide comprehensive review feedback

Summary

This PR addresses a critical memory management issue in S3 concurrent downloads by implementing a semaphore-based solution to limit concurrent transfers based on heap buffer consumption. The implementation reverts from using AWS SDK's downloadDirectory to a custom Nextflow-managed approach using the new ExtendedS3TransferManager.

Key Changes Analysis

Strengths

  1. Addresses Real Issue: Tackles AWS SDK v2 issue #6323 with practical solution
  2. Memory-Aware Design: Uses buffer estimation (filesize/partsize, capped at 10 parts) to calculate semaphore permits
  3. Configurable: New aws.client.maxDownloadBuffer config option with sensible 400MB default
  4. Comprehensive Testing: Good test coverage for the new ExtendedS3TransferManager class
  5. Documentation Updated: Both user guide and config reference properly updated

⚠️ Issues Identified

Critical Issues

  1. Directory Creation Bug (S3Client.java:346)

    Files.createDirectory(newFolder);
    • Will throw FileAlreadyExistsException if the target directory already exists (and fails when parent directories are missing)
    • Should use Files.createDirectories(newFolder) for recursive creation
  2. Sequential Processing Performance Regression (S3Client.java:376-396)

    • The downloadDirectory now processes files sequentially in a blocking loop
    • Original concurrent approach provided better performance
    • Consider using CompletableFuture.allOf() or similar for concurrent processing while maintaining memory limits
  3. Error Handling Issues

    • InterruptedException in file visitor silently skips files (lines 366-369)
    • Only the last download failure is reported, masking other failures (line 389)
    • Should collect and report all failures

Configuration Issues

  1. Property Key Inconsistency (ExtendedS3TransferManager.java:73-80)
    if( props.containsKey("max_download_buffer")) // underscore format
    • Config uses maxDownloadBuffer but code checks max_download_buffer
    • Should align with AwsS3Config.getAwsClientConfig() which converts to underscores

Code Quality Issues

  1. Magic Numbers: DEFAULT_INIT_BUFFER_PARTS (10) hardcoded without clear rationale
  2. Semaphore Management: Consider timeout on semaphore acquisition to prevent indefinite blocking
  3. Resource Cleanup: Download futures should be properly cancelled on interruption

Minor Issues

  1. Comment Typo (ExtendedS3TransferManager.java:45)
    private static final long DEFAULT_MAX_DOWNLOAD_BUFFER_SIZE = 400 * 1024 * 1024; // 8 MB
    • Comment says "8 MB" but value is 400 MB

Recommendations

Immediate Fixes Required

  1. Fix directory creation: Use Files.createDirectories() instead of Files.createDirectory()

  2. Improve error handling:

    // Collect all errors, don't skip files on InterruptedException
    List<Exception> errors = new ArrayList<>();
    // Process and report all failures
  3. Property key alignment: Ensure property keys match between config and implementation

Performance Improvements

  1. Maintain concurrency: Modify the download loop to process transfers concurrently while respecting semaphore limits

  2. Add semaphore timeout: Prevent indefinite blocking with a reasonable timeout (both points are sketched below)
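A hedged sketch of how those two recommendations could combine (hypothetical names; one permit per transfer for brevity, whereas the PR weights permits by part count): transfers are submitted concurrently, each gated by the semaphore, and a bounded tryAcquire replaces the indefinite acquire.

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;

// Hypothetical sketch, not the PR's actual code.
class ConcurrentDownloads {
    private final Semaphore permits = new Semaphore(50);

    CompletableFuture<Void> submit(Runnable transfer) throws IOException, InterruptedException {
        // bounded wait instead of acquire(), so a stuck transfer cannot block forever
        if (!permits.tryAcquire(30, TimeUnit.MINUTES))
            throw new IOException("Timed out waiting for download buffer budget");
        return CompletableFuture
            .runAsync(transfer)
            .whenComplete((v, err) -> permits.release());  // release even on failure
    }

    void downloadAll(List<Runnable> transfers) throws IOException, InterruptedException {
        List<CompletableFuture<Void>> futures = new ArrayList<>();
        for (Runnable t : transfers)
            futures.add(submit(t));    // submissions overlap; the semaphore caps memory use
        CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).join();
    }
}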

Future Considerations

  1. Metrics/Monitoring: Add logging for semaphore usage to help with debugging/tuning
  2. Dynamic Adjustment: Consider making part estimation more sophisticated based on actual usage patterns

Verdict

The core approach is sound and addresses a real production issue. However, the implementation has several critical bugs that need fixing before merge, particularly around directory creation and error handling. The performance regression from sequential processing should also be addressed.

Recommendation: Request changes to fix critical issues before approval.

@pditommaso force-pushed the s3_crt_client_memory_issues branch from 4f99c77 to 5c163ca on September 24, 2025 16:53
@pditommaso
Member

@claude review again


claude bot commented Sep 24, 2025

Claude finished @pditommaso's task.


Re-reviewing S3 Concurrent Download Memory Management PR

Todo List:

  • Read current S3Client.java implementation
  • Review ExtendedS3TransferManager changes
  • Check configuration updates and property naming
  • Compare against previous review feedback
  • Provide updated comprehensive review

Updated Code Review Analysis

After reviewing the current state of the PR and comparing it to the previous review feedback from September 22nd, here are my findings:

Issues Addressed Since Previous Review

  1. Property naming fixed: The configuration property has been renamed from maxDownloadBuffer to maxDownloadHeapMemory as requested by @pditommaso
  2. Property key alignment resolved: The code now properly checks for max_download_heap_memory in ExtendedS3TransferManager.java:80, which matches the underscore format produced by AwsS3Config.getAwsClientConfig():293 (illustrated below)
  3. Documentation improved: Configuration reference updated to reflect the new property name
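For illustration, the camelCase-to-underscore key normalization that this alignment relies on might look like the following sketch (hypothetical helper; the actual AwsS3Config logic may differ):

// Hypothetical helper; the actual AwsS3Config conversion may differ.
class ConfigKeys {
    static String toUnderscoreKey(String name) {
        return name.replaceAll("([a-z])([A-Z])", "$1_$2").toLowerCase();
    }

    public static void main(String[] args) {
        System.out.println(toUnderscoreKey("maxDownloadHeapMemory"));  // prints max_download_heap_memory
    }
}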

⚠️ Issues Still Present

Critical Issues Remaining

  1. Directory Creation Bug FIXED: Good news - the createDirectory() method now properly handles FileAlreadyExistsException (S3Client.java:324-330). This addresses the previous critical issue.

  2. Performance Regression Still Present (S3Client.java:383-401):

    • Downloads are still processed sequentially in a blocking while loop
    • This causes significant performance degradation compared to concurrent processing
    • The loop blocks on each download completion before starting the next
  3. Error Handling Issues Remain:

    • InterruptedException handling (S3Client.java:373-376): Still silently skips files on interruption instead of re-throwing or setting interrupt flag
    • Error masking (S3Client.java:389-392): Only the last error is captured and thrown, losing information about other failed transfers

Thread Safety & Resource Management

  1. Interrupt Handling Inconsistency:

    • Single file download properly calls Thread.currentThread().interrupt() (line 316)
    • Directory download interrupt handler doesn't follow the same pattern (line 375)
  2. Semaphore Resource Leak Risk:

    • If an InterruptedException occurs during semaphore acquisition, the download never starts but error handling doesn't account for this properly

🔍 New Observations

  1. Test Coverage: Excellent test coverage added for ExtendedS3TransferManager with comprehensive edge cases
  2. Memory Calculation: The semaphore permit calculation logic appears sound (ExtendedS3TransferManager.java:89-96)
  3. Async Completion: Good use of whenComplete callback to ensure semaphore permits are always released (ExtendedS3TransferManager.java:72-74); a leak-safe version of this pattern is sketched below
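Combining observation 3 with the semaphore leak risk noted above, a leak-safe acquire/release pattern could look like this sketch (hypothetical names; not the actual ExtendedS3TransferManager code):

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Semaphore;
import java.util.function.Supplier;

// Hypothetical sketch: permits are released exactly once, whether the
// submission itself throws or the async transfer completes (normally or not).
class PermitSafeTransfer {
    private final Semaphore permits = new Semaphore(50);

    CompletableFuture<Void> start(int n, Supplier<CompletableFuture<Void>> transfer)
            throws InterruptedException {
        permits.acquire(n);
        CompletableFuture<Void> future;
        try {
            future = transfer.get();    // starting the transfer may throw
        } catch (RuntimeException e) {
            permits.release(n);         // without this, the permits would leak
            throw e;
        }
        // whenComplete runs on success and on failure, so the release is guaranteed
        return future.whenComplete((v, err) -> permits.release(n));
    }
}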

📋 Remaining Recommendations

Priority 1 - Fix Error Handling:

// In downloadDirectory visitFile method (around line 373)
} catch (InterruptedException e) {
    log.debug("S3 download directory: s3://{}/{} interrupted", source.getBucket(), source.getKey());
    Thread.currentThread().interrupt(); // Add this line
    return FileVisitResult.TERMINATE;   // Stop processing more files
}

Priority 2 - Improve Performance:
Consider processing downloads concurrently while respecting semaphore limits rather than the current sequential approach.

Priority 3 - Collect All Errors:

// Collect all errors instead of just the last one
List<Throwable> errors = new ArrayList<>();
// ... in the loop
} catch (ExecutionException e) {
    errors.add(e.getCause());
    log.debug("Exception thrown downloading S3 object s3://{}/{}", current.bucket, current.key, e.getCause());
}
// ... after loop
if (!errors.isEmpty()) {
    throw new IOException("Multiple transfers failed. First error: " + errors.get(0).getMessage(), errors.get(0));
}

Summary

The PR has made significant improvements since the previous review, particularly fixing the property naming and directory creation issues. However, critical error handling problems and performance regression remain unresolved. The core memory management approach is sound, but the implementation needs refinement to handle edge cases properly.

Recommendation: Request changes to address the remaining error handling issues before approval. The performance regression, while less critical for correctness, should also be considered for user experience.



…eption

Replace logging with throwing InterruptedIOException when handling InterruptedException
in S3 operations. This provides consistent exception propagation instead of silently
logging and continuing execution, allowing proper interruption handling in calling code.

Changes:
- downloadFile: throw InterruptedIOException instead of logging
- downloadDirectory: throw InterruptedIOException in both file visitor and main method
- uploadFile: throw InterruptedIOException instead of logging
- uploadDirectory: throw InterruptedIOException instead of logging
- copyFile: throw InterruptedIOException instead of logging

🤖 Generated with [Claude Code](https://claude.ai/code)

Co-Authored-By: Claude <[email protected]>
@pditommaso
Member

InterruptedException Handling Improvements

I've updated the S3Client to improve InterruptedException handling across all S3 operations (downloadFile, downloadDirectory, uploadFile, uploadDirectory, and copyFile).

Approach Used

When an InterruptedException occurs during S3 operations, the code now follows Java best practices for interrupted thread handling:

  1. Thread.currentThread().interrupt() - Restores the interrupted status of the current thread. This is crucial because catching InterruptedException clears the thread's interrupted flag, and we need to preserve this information for higher-level code that may need to know the thread was interrupted.

  2. throw new InterruptedIOException - Converts the InterruptedException to an InterruptedIOException (a checked IOException subclass) to properly propagate the interruption through the IOException-throwing method signatures.

Rationale

This approach provides several benefits:

  • Proper exception propagation: Calling code can distinguish between regular IO failures and interruptions
  • Maintains method contracts: No need to change existing method signatures that throw IOException
  • Preserves thread state: The interrupted flag is restored for any subsequent interruption-aware operations
  • Consistent error reporting: Descriptive messages indicate exactly which S3 operation was interrupted
  • No silent failures: Interruptions are not swallowed by logging but properly communicated to calling code

Previous vs. Current Behavior

Before: InterruptedException was caught, logged at debug level, and execution continued normally

} catch (InterruptedException e) {
    log.debug("S3 download file: s3://{}/{} cancelled", source.getBucket(), source.getKey());
    Thread.currentThread().interrupt();
    // Execution continues without throwing
}

After: InterruptedException is caught, thread state is restored, and a proper IOException is thrown

} catch (InterruptedException e) {
    Thread.currentThread().interrupt();
    throw new InterruptedIOException(String.format("S3 download file: s3://%s/%s cancelled", source.getBucket(), source.getKey()));
}

This change ensures that thread interruptions in S3 operations are handled correctly and can be properly responded to by calling code.

Signed-off-by: Paolo Di Tommaso <[email protected]>

@pditommaso
Member

pditommaso commented Sep 25, 2025

Cursor is annoying, but this could be a relevant point to address


Signed-off-by: jorgee <[email protected]>

Signed-off-by: jorgee <[email protected]>
@jorgee
Contributor Author

jorgee commented Sep 25, 2025

@pditommaso I've fixed the Cursor comments. The relevant changes:

  • Solved the semaphore leak when a runtime exception occurs at submission
  • Validate the maxDownloadHeapMemory and minimumPartSize values in AwsS3Config
  • Replaced the ArrayList.remove(0), which is O(n), with a LinkedList.poll, which is O(1), in the while loop that waits for transfers to finish (see the sketch below)
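For illustration, the queue change amounts to something like this sketch (hypothetical class name):

import java.util.LinkedList;
import java.util.Queue;
import java.util.concurrent.CompletableFuture;

// LinkedList.poll() removes the head in O(1); ArrayList.remove(0) shifts
// every remaining element and costs O(n) per call.
class PendingTransfers {
    private final Queue<CompletableFuture<Void>> pending = new LinkedList<>();

    void add(CompletableFuture<Void> f) {
        pending.add(f);
    }

    void awaitAll() {
        CompletableFuture<Void> next;
        while ((next = pending.poll()) != null)
            next.join();   // wait for each transfer, oldest first
    }
}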

There is a commit from you with a missing sign-off: c7fad5f


Signed-off-by: Paolo Di Tommaso <[email protected]>
@pditommaso merged commit fd71d0e into master on Sep 27, 2025
26 checks passed
@pditommaso deleted the s3_crt_client_memory_issues branch September 27, 2025 12:55