-
Notifications
You must be signed in to change notification settings - Fork 9.2k
HADOOP-18637: S3A to support upload of files greater than 2 GB using DiskBlocks #5481
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 9 commits
03a8c8b
768f41b
ddde1e6
542ffc2
773e03a
58a0453
5d3f9d9
a2d25f6
f381b88
ca725f9
ea0007f
1f56e2a
4e922b4
13fc2d5
1476424
f18c0cb
7207fdd
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -180,7 +180,7 @@ protected BlockFactory(S3AFileSystem owner) { | |
| * @param statistics stats to work with | ||
| * @return a new block. | ||
| */ | ||
| abstract DataBlock create(long index, int limit, | ||
| abstract DataBlock create(long index, long limit, | ||
| BlockOutputStreamStatistics statistics) | ||
| throws IOException; | ||
|
|
||
|
|
@@ -258,7 +258,7 @@ final DestState getState() { | |
| * Return the current data size. | ||
| * @return the size of the data | ||
| */ | ||
| abstract int dataSize(); | ||
| abstract long dataSize(); | ||
|
|
||
| /** | ||
| * Predicate to verify that the block has the capacity to write | ||
|
|
@@ -280,7 +280,7 @@ boolean hasData() { | |
| * The remaining capacity in the block before it is full. | ||
| * @return the number of bytes remaining. | ||
| */ | ||
| abstract int remainingCapacity(); | ||
| abstract long remainingCapacity(); | ||
|
|
||
| /** | ||
| * Write a series of bytes from the buffer, from the offset. | ||
|
|
@@ -391,7 +391,7 @@ static class ArrayBlockFactory extends BlockFactory { | |
| } | ||
|
|
||
| @Override | ||
| DataBlock create(long index, int limit, | ||
| DataBlock create(long index, long limit, | ||
| BlockOutputStreamStatistics statistics) | ||
| throws IOException { | ||
| return new ByteArrayBlock(0, limit, statistics); | ||
|
|
@@ -436,11 +436,11 @@ static class ByteArrayBlock extends DataBlock { | |
| private Integer dataSize; | ||
|
|
||
| ByteArrayBlock(long index, | ||
| int limit, | ||
| long limit, | ||
| BlockOutputStreamStatistics statistics) { | ||
| super(index, statistics); | ||
| this.limit = limit; | ||
| buffer = new S3AByteArrayOutputStream(limit); | ||
| this.limit = (limit > Integer.MAX_VALUE) ? Integer.MAX_VALUE : (int) limit; | ||
| buffer = new S3AByteArrayOutputStream((int) limit); | ||
|
||
| blockAllocated(); | ||
| } | ||
|
|
||
|
|
@@ -449,7 +449,7 @@ static class ByteArrayBlock extends DataBlock { | |
| * @return the amount of data available to upload. | ||
| */ | ||
| @Override | ||
| int dataSize() { | ||
| long dataSize() { | ||
| return dataSize != null ? dataSize : buffer.size(); | ||
| } | ||
|
|
||
|
|
@@ -468,14 +468,14 @@ boolean hasCapacity(long bytes) { | |
| } | ||
|
|
||
| @Override | ||
| int remainingCapacity() { | ||
| long remainingCapacity() { | ||
| return limit - dataSize(); | ||
| } | ||
|
|
||
| @Override | ||
| int write(byte[] b, int offset, int len) throws IOException { | ||
| super.write(b, offset, len); | ||
| int written = Math.min(remainingCapacity(), len); | ||
| int written = (int) Math.min(remainingCapacity(), len); | ||
| buffer.write(b, offset, written); | ||
| return written; | ||
| } | ||
|
|
@@ -514,7 +514,7 @@ static class ByteBufferBlockFactory extends BlockFactory { | |
| } | ||
|
|
||
| @Override | ||
| ByteBufferBlock create(long index, int limit, | ||
| ByteBufferBlock create(long index, long limit, | ||
| BlockOutputStreamStatistics statistics) | ||
| throws IOException { | ||
| return new ByteBufferBlock(index, limit, statistics); | ||
|
|
@@ -564,11 +564,12 @@ class ByteBufferBlock extends DataBlock { | |
| * @param statistics statistics to update | ||
| */ | ||
| ByteBufferBlock(long index, | ||
| int bufferSize, | ||
| long bufferSize, | ||
| BlockOutputStreamStatistics statistics) { | ||
| super(index, statistics); | ||
| this.bufferSize = bufferSize; | ||
| blockBuffer = requestBuffer(bufferSize); | ||
| this.bufferSize = bufferSize > Integer.MAX_VALUE ? | ||
| Integer.MAX_VALUE : (int) bufferSize; | ||
| blockBuffer = requestBuffer(this.bufferSize); | ||
| blockAllocated(); | ||
| } | ||
|
|
||
|
|
@@ -577,7 +578,7 @@ class ByteBufferBlock extends DataBlock { | |
| * @return the amount of data available to upload. | ||
| */ | ||
| @Override | ||
| int dataSize() { | ||
| long dataSize() { | ||
| return dataSize != null ? dataSize : bufferCapacityUsed(); | ||
| } | ||
|
|
||
|
|
@@ -598,7 +599,7 @@ public boolean hasCapacity(long bytes) { | |
| } | ||
|
|
||
| @Override | ||
| public int remainingCapacity() { | ||
| public long remainingCapacity() { | ||
| return blockBuffer != null ? blockBuffer.remaining() : 0; | ||
| } | ||
|
|
||
|
|
@@ -609,7 +610,7 @@ private int bufferCapacityUsed() { | |
| @Override | ||
| int write(byte[] b, int offset, int len) throws IOException { | ||
| super.write(b, offset, len); | ||
| int written = Math.min(remainingCapacity(), len); | ||
| int written = (int) Math.min(remainingCapacity(), len); | ||
| blockBuffer.put(b, offset, written); | ||
| return written; | ||
| } | ||
|
|
@@ -809,7 +810,7 @@ static class DiskBlockFactory extends BlockFactory { | |
| */ | ||
| @Override | ||
| DataBlock create(long index, | ||
| int limit, | ||
| long limit, | ||
| BlockOutputStreamStatistics statistics) | ||
| throws IOException { | ||
| File destFile = getOwner() | ||
|
|
@@ -825,14 +826,14 @@ DataBlock create(long index, | |
| */ | ||
| static class DiskBlock extends DataBlock { | ||
|
|
||
| private int bytesWritten; | ||
| private long bytesWritten; | ||
| private final File bufferFile; | ||
| private final int limit; | ||
| private final long limit; | ||
| private BufferedOutputStream out; | ||
| private final AtomicBoolean closed = new AtomicBoolean(false); | ||
|
|
||
| DiskBlock(File bufferFile, | ||
| int limit, | ||
| long limit, | ||
| long index, | ||
| BlockOutputStreamStatistics statistics) | ||
| throws FileNotFoundException { | ||
|
|
@@ -844,7 +845,7 @@ static class DiskBlock extends DataBlock { | |
| } | ||
|
|
||
| @Override | ||
| int dataSize() { | ||
| long dataSize() { | ||
| return bytesWritten; | ||
| } | ||
|
|
||
|
|
@@ -854,14 +855,14 @@ boolean hasCapacity(long bytes) { | |
| } | ||
|
|
||
| @Override | ||
| int remainingCapacity() { | ||
| long remainingCapacity() { | ||
| return limit - bytesWritten; | ||
| } | ||
|
|
||
| @Override | ||
| int write(byte[] b, int offset, int len) throws IOException { | ||
| super.write(b, offset, len); | ||
| int written = Math.min(remainingCapacity(), len); | ||
| int written = (int) Math.min(remainingCapacity(), len); | ||
| out.write(b, offset, written); | ||
| bytesWritten += written; | ||
| return written; | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -595,7 +595,6 @@ public void initialize(URI name, Configuration originalConf) | |
| } | ||
| blockOutputBuffer = conf.getTrimmed(FAST_UPLOAD_BUFFER, | ||
| DEFAULT_FAST_UPLOAD_BUFFER); | ||
| partSize = ensureOutputParameterInRange(MULTIPART_SIZE, partSize); | ||
| blockFactory = S3ADataBlocks.createFactory(this, blockOutputBuffer); | ||
| blockOutputActiveBlocks = intOption(conf, | ||
| FAST_UPLOAD_ACTIVE_BLOCKS, DEFAULT_FAST_UPLOAD_ACTIVE_BLOCKS, 1); | ||
|
|
@@ -1831,6 +1830,11 @@ private FSDataOutputStream innerCreateFile( | |
| final PutObjectOptions putOptions = | ||
| new PutObjectOptions(keep, null, options.getHeaders()); | ||
|
|
||
| if(!checkDiskBuffer(getConf())){ | ||
|
||
| throw new IOException("The filesystem conf is not " + | ||
HarshitGupta11 marked this conversation as resolved.
Outdated
Show resolved
Hide resolved
|
||
| "proper for the output stream"); | ||
| } | ||
|
|
||
| final S3ABlockOutputStream.BlockOutputStreamBuilder builder = | ||
| S3ABlockOutputStream.builder() | ||
| .withKey(destKey) | ||
|
|
@@ -1854,7 +1858,9 @@ private FSDataOutputStream innerCreateFile( | |
| .withCSEEnabled(isCSEEnabled) | ||
| .withPutOptions(putOptions) | ||
| .withIOStatisticsAggregator( | ||
| IOStatisticsContext.getCurrentIOStatisticsContext().getAggregator()); | ||
| IOStatisticsContext.getCurrentIOStatisticsContext().getAggregator()) | ||
| .withMultipartAllowed(getConf().getBoolean( | ||
| MULTIPART_UPLOADS_ENABLED, MULTIPART_UPLOAD_ENABLED_DEFAULT)); | ||
| return new FSDataOutputStream( | ||
| new S3ABlockOutputStream(builder), | ||
| null); | ||
|
|
||
Uh oh!
There was an error while loading. Please reload this page.