-
Notifications
You must be signed in to change notification settings - Fork 9.2k
HADOOP-18637:S3A to support upload of files greater than 2 GB using DiskBlocks #5481
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Closed
Closed
Changes from 16 commits
Commits
Show all changes
17 commits
Select commit
Hold shift + click to select a range
03a8c8b
BASE COMMIT
768f41b
ADDED TEST
ddde1e6
MORE FIXES
542ffc2
MORE FIXES and Patches
773e03a
MORE FIXES and Patches
58a0453
MORE FIXES
5d3f9d9
MORE FIXES FOR TEST
a2d25f6
Added License Doc
f381b88
Review Fixes
ca725f9
Review Fixes
ea0007f
Review Fixes with the test
1f56e2a
File Fixes
4e922b4
Minor Changes
13fc2d5
Added tests for the committers
1476424
Review Fixes
f18c0cb
Review Fixes for the Path Capabilities on S3
7207fdd
Review Fixes for tests
File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -101,7 +101,7 @@ class S3ABlockOutputStream extends OutputStream implements | |
| private final String key; | ||
|
|
||
| /** Size of all blocks. */ | ||
| private final int blockSize; | ||
| private final long blockSize; | ||
|
|
||
| /** IO Statistics. */ | ||
| private final IOStatistics iostatistics; | ||
|
|
@@ -169,6 +169,9 @@ class S3ABlockOutputStream extends OutputStream implements | |
| /** Thread level IOStatistics Aggregator. */ | ||
| private final IOStatisticsAggregator threadIOStatisticsAggregator; | ||
|
|
||
| /** Is multipart upload enabled? */ | ||
| private final boolean isMultipartUploadEnabled; | ||
|
|
||
| /** | ||
| * An S3A output stream which uploads partitions in a separate pool of | ||
| * threads; different {@link S3ADataBlocks.BlockFactory} | ||
|
|
@@ -181,7 +184,7 @@ class S3ABlockOutputStream extends OutputStream implements | |
| this.builder = builder; | ||
| this.key = builder.key; | ||
| this.blockFactory = builder.blockFactory; | ||
| this.blockSize = (int) builder.blockSize; | ||
| this.blockSize = builder.blockSize; | ||
| this.statistics = builder.statistics; | ||
| // test instantiations may not provide statistics; | ||
| this.iostatistics = statistics.getIOStatistics(); | ||
|
|
@@ -200,6 +203,7 @@ class S3ABlockOutputStream extends OutputStream implements | |
| createBlockIfNeeded(); | ||
| LOG.debug("Initialized S3ABlockOutputStream for {}" + | ||
| " output to {}", key, activeBlock); | ||
| this.isMultipartUploadEnabled = builder.isMultipartUploadEnabled; | ||
| if (putTracker.initialize()) { | ||
| LOG.debug("Put tracker requests multipart upload"); | ||
| initMultipartUpload(); | ||
|
|
@@ -318,7 +322,7 @@ public synchronized void write(byte[] source, int offset, int len) | |
| statistics.writeBytes(len); | ||
| S3ADataBlocks.DataBlock block = createBlockIfNeeded(); | ||
| int written = block.write(source, offset, len); | ||
| int remainingCapacity = block.remainingCapacity(); | ||
| int remainingCapacity = (int) block.remainingCapacity(); | ||
| if (written < len) { | ||
| // not everything was written —the block has run out | ||
| // of capacity | ||
|
|
@@ -369,6 +373,8 @@ private synchronized void uploadCurrentBlock(boolean isLast) | |
| */ | ||
| @Retries.RetryTranslated | ||
| private void initMultipartUpload() throws IOException { | ||
| Preconditions.checkState(!isMultipartUploadEnabled, | ||
|
||
| "multipart upload is disabled"); | ||
| if (multiPartUpload == null) { | ||
| LOG.debug("Initiating Multipart upload"); | ||
| multiPartUpload = new MultiPartUpload(key); | ||
|
|
@@ -558,19 +564,20 @@ public String toString() { | |
| } | ||
|
|
||
| /** | ||
| * Upload the current block as a single PUT request; if the buffer | ||
| * is empty a 0-byte PUT will be invoked, as it is needed to create an | ||
| * entry at the far end. | ||
| * @throws IOException any problem. | ||
| * @return number of bytes uploaded. If thread was interrupted while | ||
| * waiting for upload to complete, returns zero with interrupted flag set | ||
| * on this thread. | ||
| * Upload the current block as a single PUT request; if the buffer is empty a | ||
| * 0-byte PUT will be invoked, as it is needed to create an entry at the far | ||
steveloughran marked this conversation as resolved.
Show resolved
Hide resolved
|
||
| * end. | ||
| * @return number of bytes uploaded. If thread was interrupted while waiting | ||
| * for upload to complete, returns zero with interrupted flag set on this | ||
| * thread. | ||
| * @throws IOException | ||
| * any problem. | ||
| */ | ||
| private int putObject() throws IOException { | ||
| private long putObject() throws IOException { | ||
| LOG.debug("Executing regular upload for {}", writeOperationHelper); | ||
|
|
||
| final S3ADataBlocks.DataBlock block = getActiveBlock(); | ||
| int size = block.dataSize(); | ||
| long size = block.dataSize(); | ||
| final S3ADataBlocks.BlockUploadData uploadData = block.startUpload(); | ||
| final PutObjectRequest putObjectRequest = uploadData.hasFile() ? | ||
| writeOperationHelper.createPutObjectRequest( | ||
|
|
@@ -835,7 +842,7 @@ private void uploadBlockAsync(final S3ADataBlocks.DataBlock block, | |
| Preconditions.checkNotNull(uploadId, "Null uploadId"); | ||
| maybeRethrowUploadFailure(); | ||
| partsSubmitted++; | ||
| final int size = block.dataSize(); | ||
| final long size = block.dataSize(); | ||
| bytesSubmitted += size; | ||
| final int currentPartNumber = partETagsFutures.size() + 1; | ||
| final UploadPartRequest request; | ||
|
|
@@ -1011,7 +1018,7 @@ public void progressChanged(ProgressEvent progressEvent) { | |
| ProgressEventType eventType = progressEvent.getEventType(); | ||
| long bytesTransferred = progressEvent.getBytesTransferred(); | ||
|
|
||
| int size = block.dataSize(); | ||
| long size = block.dataSize(); | ||
| switch (eventType) { | ||
|
|
||
| case REQUEST_BYTE_TRANSFER_EVENT: | ||
|
|
@@ -1126,6 +1133,11 @@ public static final class BlockOutputStreamBuilder { | |
| */ | ||
| private IOStatisticsAggregator ioStatisticsAggregator; | ||
|
|
||
| /** | ||
| * Is Multipart Uploads enabled for the given upload. | ||
| */ | ||
| private boolean isMultipartUploadEnabled; | ||
|
|
||
| private BlockOutputStreamBuilder() { | ||
| } | ||
|
|
||
|
|
@@ -1276,5 +1288,11 @@ public BlockOutputStreamBuilder withIOStatisticsAggregator( | |
| ioStatisticsAggregator = value; | ||
| return this; | ||
| } | ||
|
|
||
| public BlockOutputStreamBuilder withMultipartEnabled( | ||
| final boolean value) { | ||
| isMultipartUploadEnabled = value; | ||
| return this; | ||
| } | ||
| } | ||
| } | ||
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
Uh oh!
There was an error while loading. Please reload this page.