-
Notifications
You must be signed in to change notification settings - Fork 9.2k
HADOOP-19256 Integrate PutIfNotExist functionality into S3A createFile() #7011
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 1 commit
19cc916
b5637c0
1f78fdc
a36fa15
192b1f6
0eddecb
6f4af93
d66dc79
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -25,6 +25,7 @@ | |
| import java.util.ArrayList; | ||
| import java.util.List; | ||
| import java.util.Locale; | ||
| import java.util.Map; | ||
| import java.util.StringJoiner; | ||
| import java.util.concurrent.ExecutionException; | ||
| import java.util.concurrent.ExecutorService; | ||
|
|
@@ -99,6 +100,8 @@ class S3ABlockOutputStream extends OutputStream implements | |
| private static final String E_NOT_SYNCABLE = | ||
| "S3A streams are not Syncable. See HADOOP-17597."; | ||
|
|
||
| public static final String IF_NONE_MATCH_HEADER = "If-None-Match"; | ||
|
|
||
| /** Object being uploaded. */ | ||
| private final String key; | ||
|
|
||
|
|
@@ -596,7 +599,7 @@ private long putObject() throws IOException { | |
| final S3ADataBlocks.DataBlock block = getActiveBlock(); | ||
| long size = block.dataSize(); | ||
| final S3ADataBlocks.BlockUploadData uploadData = block.startUpload(); | ||
| final PutObjectRequest putObjectRequest = uploadData.hasFile() ? | ||
| PutObjectRequest putObjectRequest = uploadData.hasFile() ? | ||
| writeOperationHelper.createPutObjectRequest( | ||
| key, | ||
| uploadData.getFile().length(), | ||
|
|
@@ -608,6 +611,16 @@ private long putObject() throws IOException { | |
| builder.putOptions, | ||
| false); | ||
|
|
||
| PutObjectRequest.Builder maybeModifiedPutIfAbsentRequest = putObjectRequest.toBuilder(); | ||
| Map<String, String> optionHeaders = builder.putOptions.getHeaders(); | ||
|
|
||
| if (optionHeaders != null && optionHeaders.containsKey(IF_NONE_MATCH_HEADER)) { | ||
| maybeModifiedPutIfAbsentRequest.overrideConfiguration( | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. As mentioned below as well, I think we should upgrade the SDK and then use the new .ifNoneMatch(). Also, I would recommend you move all of this logic into a new private method in this class.
Contributor
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Actually, I would move this logic to RequestFactoryImpl. But you will need to add a flag to the method signature, "isConditionalPutEnabled", and only add the If-None-Match header when isConditionalPutEnabled is true. Basically, you only want to add |
||
| override -> override.putHeader(IF_NONE_MATCH_HEADER, optionHeaders.get(IF_NONE_MATCH_HEADER))); | ||
| } | ||
|
|
||
| final PutObjectRequest finalizedRequest = maybeModifiedPutIfAbsentRequest.build(); | ||
|
|
||
| BlockUploadProgress progressCallback = | ||
| new BlockUploadProgress(block, progressListener, now()); | ||
| statistics.blockUploadQueued(size); | ||
|
|
@@ -617,7 +630,7 @@ private long putObject() throws IOException { | |
| // the putObject call automatically closes the input | ||
| // stream afterwards. | ||
| PutObjectResponse response = | ||
| writeOperationHelper.putObject(putObjectRequest, builder.putOptions, uploadData, | ||
| writeOperationHelper.putObject(finalizedRequest, builder.putOptions, uploadData, | ||
| uploadData.hasFile(), statistics); | ||
| progressCallback.progressChanged(REQUEST_BYTE_TRANSFER_EVENT); | ||
| return response; | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -59,6 +59,7 @@ | |
| import org.apache.hadoop.fs.s3a.auth.delegation.EncryptionSecrets; | ||
|
|
||
| import static org.apache.commons.lang3.StringUtils.isNotEmpty; | ||
| import static org.apache.hadoop.fs.s3a.Constants.FS_S3A_CREATE_IF_NONE_MATCH; | ||
| import static org.apache.hadoop.fs.s3a.S3AEncryptionMethods.UNKNOWN_ALGORITHM; | ||
| import static org.apache.hadoop.fs.s3a.impl.InternalConstants.DEFAULT_UPLOAD_PART_COUNT_LIMIT; | ||
| import static org.apache.hadoop.util.Preconditions.checkArgument; | ||
|
|
@@ -517,12 +518,22 @@ public CreateMultipartUploadRequest.Builder newMultipartUploadRequestBuilder( | |
| public CompleteMultipartUploadRequest.Builder newCompleteMultipartUploadRequestBuilder( | ||
| String destKey, | ||
| String uploadId, | ||
| List<CompletedPart> partETags) { | ||
| List<CompletedPart> partETags, | ||
| PutObjectOptions putOptions) { | ||
|
|
||
| // a copy of the list is required, so that the AWS SDK doesn't | ||
| // attempt to sort an unmodifiable list. | ||
| CompleteMultipartUploadRequest.Builder requestBuilder = | ||
| CompleteMultipartUploadRequest.builder().bucket(bucket).key(destKey).uploadId(uploadId) | ||
| CompleteMultipartUploadRequest.Builder requestBuilder; | ||
| Map<String, String> optionHeaders = putOptions.getHeaders(); | ||
diljotgrewal marked this conversation as resolved.
Show resolved
Hide resolved
|
||
|
|
||
| if (optionHeaders != null && optionHeaders.containsKey("If-None-Match")) { | ||
| requestBuilder = CompleteMultipartUploadRequest.builder().bucket(bucket).key(destKey).uploadId(uploadId) | ||
| .overrideConfiguration(override ->override.putHeader("If-None-Match", optionHeaders.get("If-None-Match"))) | ||
|
||
| .multipartUpload(CompletedMultipartUpload.builder().parts(partETags).build()); | ||
| } else { | ||
| requestBuilder = CompleteMultipartUploadRequest.builder().bucket(bucket).key(destKey).uploadId(uploadId) | ||
diljotgrewal marked this conversation as resolved.
Outdated
Show resolved
Hide resolved
|
||
| .multipartUpload(CompletedMultipartUpload.builder().parts(partETags).build()); | ||
| } | ||
|
|
||
| return prepareRequest(requestBuilder); | ||
| } | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,113 @@ | ||
| package org.apache.hadoop.fs.s3a.impl; | ||
|
|
||
| import org.apache.hadoop.conf.Configuration; | ||
| import org.apache.hadoop.fs.FSDataOutputStream; | ||
| import org.apache.hadoop.fs.FSDataOutputStreamBuilder; | ||
| import org.apache.hadoop.fs.FileSystem; | ||
| import org.apache.hadoop.fs.Path; | ||
| import org.apache.hadoop.fs.s3a.AbstractS3ATestBase; | ||
| import org.apache.hadoop.fs.s3a.RemoteFileChangedException; | ||
| import org.apache.hadoop.fs.s3a.S3ATestUtils; | ||
| import org.apache.hadoop.io.IOUtils; | ||
|
|
||
| import org.junit.Assert; | ||
| import org.junit.Test; | ||
| import software.amazon.awssdk.services.s3.model.S3Exception; | ||
|
|
||
| import java.io.IOException; | ||
| import static org.apache.hadoop.fs.contract.ContractTestUtils.dataset; | ||
| import static org.apache.hadoop.fs.s3a.Constants.FAST_UPLOAD_BUFFER; | ||
| import static org.apache.hadoop.fs.s3a.Constants.FAST_UPLOAD_BUFFER_ARRAY; | ||
| import static org.apache.hadoop.fs.s3a.Constants.FS_S3A_CREATE_IF_NONE_MATCH; | ||
| import static org.apache.hadoop.fs.s3a.Constants.MIN_MULTIPART_THRESHOLD; | ||
| import static org.apache.hadoop.fs.s3a.Constants.MULTIPART_MIN_SIZE; | ||
| import static org.apache.hadoop.fs.s3a.Constants.MULTIPART_SIZE; | ||
| import static org.apache.hadoop.fs.s3a.S3ATestUtils.removeBaseAndBucketOverrides; | ||
| import static org.apache.hadoop.fs.s3a.impl.InternalConstants.UPLOAD_PART_COUNT_LIMIT; | ||
| import static org.apache.hadoop.fs.s3a.scale.ITestS3AMultipartUploadSizeLimits.MPU_SIZE; | ||
| import static org.apache.hadoop.fs.s3a.scale.S3AScaleTestBase._1MB; | ||
|
|
||
|
|
||
| public class ITestS3APutIfMatch extends AbstractS3ATestBase { | ||
diljotgrewal marked this conversation as resolved.
Outdated
Show resolved
Hide resolved
|
||
|
|
||
| @Override | ||
| protected Configuration createConfiguration() { | ||
| Configuration conf = super.createConfiguration(); | ||
| S3ATestUtils.disableFilesystemCaching(conf); | ||
| removeBaseAndBucketOverrides(conf, | ||
diljotgrewal marked this conversation as resolved.
Outdated
Show resolved
Hide resolved
|
||
| MULTIPART_SIZE, | ||
| UPLOAD_PART_COUNT_LIMIT); | ||
| conf.setLong(MULTIPART_SIZE, MPU_SIZE); | ||
| conf.setLong(UPLOAD_PART_COUNT_LIMIT, 2); | ||
| conf.setLong(MIN_MULTIPART_THRESHOLD, MULTIPART_MIN_SIZE); | ||
| conf.setInt(MULTIPART_SIZE, MULTIPART_MIN_SIZE); | ||
| conf.set(FAST_UPLOAD_BUFFER, getBlockOutputBufferName()); | ||
diljotgrewal marked this conversation as resolved.
Outdated
Show resolved
Hide resolved
|
||
| return conf; | ||
| } | ||
|
|
||
| protected String getBlockOutputBufferName() { | ||
| return FAST_UPLOAD_BUFFER_ARRAY; | ||
| } | ||
|
|
||
| /** | ||
| * Create a file using the S3 conditional write (If-None-Match) feature. | ||
| * @param fs filesystem | ||
| * @param path path to write | ||
| * @param data source dataset. Can be null | ||
| * @throws IOException on any problem | ||
| */ | ||
| private static void createFileWithIfNoneMatchFlag(FileSystem fs, | ||
diljotgrewal marked this conversation as resolved.
Outdated
Show resolved
Hide resolved
|
||
| Path path, | ||
| byte[] data, | ||
| String ifMatchTag) throws Exception { | ||
| FSDataOutputStreamBuilder builder = fs.createFile(path); | ||
| builder.must(FS_S3A_CREATE_IF_NONE_MATCH, ifMatchTag); | ||
| FSDataOutputStream stream = builder.create().build(); | ||
| if (data != null && data.length > 0) { | ||
| stream.write(data); | ||
| } | ||
| stream.close(); | ||
| IOUtils.closeStream(stream); | ||
| } | ||
|
|
||
| @Test | ||
| public void testPutIfAbsentConflict() throws IOException { | ||
| FileSystem fs = getFileSystem(); | ||
| Path testFile = methodPath(); | ||
|
|
||
| fs.mkdirs(testFile.getParent()); | ||
| byte[] fileBytes = dataset(TEST_FILE_LEN, 0, 255); | ||
|
|
||
| try { | ||
| createFileWithIfNoneMatchFlag(fs, testFile, fileBytes, "*"); | ||
| createFileWithIfNoneMatchFlag(fs, testFile, fileBytes, "*"); | ||
| } catch (Exception e) { | ||
| Assert.assertEquals(RemoteFileChangedException.class, e.getClass()); | ||
diljotgrewal marked this conversation as resolved.
Outdated
Show resolved
Hide resolved
|
||
|
|
||
| S3Exception s3Exception = (S3Exception) e.getCause(); | ||
|
||
| Assert.assertEquals(s3Exception.statusCode(), 412); | ||
diljotgrewal marked this conversation as resolved.
Outdated
Show resolved
Hide resolved
|
||
| } | ||
| } | ||
|
|
||
|
|
||
| @Test | ||
| public void testPutIfAbsentLargeFileConflict() throws IOException { | ||
| FileSystem fs = getFileSystem(); | ||
| Path testFile = methodPath(); | ||
|
|
||
| fs.mkdirs(testFile.getParent()); | ||
diljotgrewal marked this conversation as resolved.
Outdated
Show resolved
Hide resolved
|
||
| // enough bytes for Multipart Upload | ||
| byte[] fileBytes = dataset(6 * _1MB, 'a', 'z' - 'a'); | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. There are ways to do this with small files. This matters because a 6MB file is large enough that it'd have to become a scale test. Yes, it is a scale test when people are testing across the planet. Do not worry about this right now, but be aware that before merging into trunk the large file test will have to be moved to a subclass of S3AScaleTestBase, and this test replaced with something using a write to a magic path — or even better, we add another new create file option to force multipart uploads always. Designing for that makes me think that a followup to this should move to an EnumSet of CreateFile flags |
||
|
|
||
| try { | ||
| createFileWithIfNoneMatchFlag(fs, testFile, fileBytes, "*"); | ||
| createFileWithIfNoneMatchFlag(fs, testFile, fileBytes, "*"); | ||
| } catch (Exception e) { | ||
diljotgrewal marked this conversation as resolved.
Outdated
Show resolved
Hide resolved
|
||
| Assert.assertEquals(RemoteFileChangedException.class, e.getClass()); | ||
|
|
||
| // Error gets caught here: | ||
| S3Exception s3Exception = (S3Exception) e.getCause(); | ||
| Assert.assertEquals(s3Exception.statusCode(), 412); | ||
| } | ||
| } | ||
| } | ||
Uh oh!
There was an error while loading. Please reload this page.