-
Notifications
You must be signed in to change notification settings - Fork 9.2k
HADOOP-19256 Integrate PutIfNotExist functionality into S3A createFile() #7011
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 4 commits
19cc916
b5637c0
1f78fdc
a36fa15
192b1f6
0eddecb
6f4af93
d66dc79
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -79,6 +79,7 @@ | |
| import static org.apache.hadoop.fs.s3a.Statistic.*; | ||
| import static org.apache.hadoop.fs.s3a.impl.HeaderProcessing.CONTENT_TYPE_OCTET_STREAM; | ||
| import static org.apache.hadoop.fs.s3a.impl.ProgressListenerEvent.*; | ||
| import static org.apache.hadoop.fs.s3a.impl.AWSHeaders.IF_NONE_MATCH; | ||
| import static org.apache.hadoop.fs.s3a.statistics.impl.EmptyS3AStatisticsContext.EMPTY_BLOCK_OUTPUT_STREAM_STATISTICS; | ||
| import static org.apache.hadoop.fs.statistics.impl.IOStatisticsBinding.trackDuration; | ||
| import static org.apache.hadoop.fs.statistics.impl.IOStatisticsBinding.trackDurationOfInvocation; | ||
|
|
@@ -690,13 +691,23 @@ private long putObject() throws IOException { | |
| final S3ADataBlocks.DataBlock block = getActiveBlock(); | ||
| final long size = block.dataSize(); | ||
| final S3ADataBlocks.BlockUploadData uploadData = block.startUpload(); | ||
| final PutObjectRequest putObjectRequest = | ||
| PutObjectRequest putObjectRequest = | ||
| writeOperationHelper.createPutObjectRequest( | ||
| key, | ||
| uploadData.getSize(), | ||
| builder.putOptions); | ||
| clearActiveBlock(); | ||
|
|
||
| PutObjectRequest.Builder maybeModifiedPutIfAbsentRequest = putObjectRequest.toBuilder(); | ||
| Map<String, String> optionHeaders = builder.putOptions.getHeaders(); | ||
|
|
||
| if (optionHeaders != null && optionHeaders.containsKey(IF_NONE_MATCH)) { | ||
| maybeModifiedPutIfAbsentRequest.overrideConfiguration( | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. as mentioned below as well, think we should upgrade SDK and then use the new .ifNoneMatch(). Also I would recommend you move all of this logic into a new private method in this class.
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. actually, I would move this logic to RequestFactoryImpl. But you will need to add in a flag to the method signature, "isConditonalPutEnabled", and only add the if-none-match header in when sConditonalPutEnabled is true. Basically, you only want to add |
||
| override -> override.putHeader(IF_NONE_MATCH, optionHeaders.get(IF_NONE_MATCH))); | ||
| } | ||
|
|
||
| final PutObjectRequest finalizedRequest = maybeModifiedPutIfAbsentRequest.build(); | ||
|
|
||
| BlockUploadProgress progressCallback = | ||
| new BlockUploadProgress(block, progressListener, now()); | ||
| statistics.blockUploadQueued(size); | ||
|
|
@@ -1389,6 +1400,11 @@ public static final class BlockOutputStreamBuilder { | |
| */ | ||
| private boolean isMultipartUploadEnabled; | ||
|
|
||
| /** | ||
| * Is conditional create enables. | ||
| */ | ||
| private boolean isConditionalEnabled; | ||
diljotgrewal marked this conversation as resolved.
Outdated
Show resolved
Hide resolved
|
||
|
|
||
| private BlockOutputStreamBuilder() { | ||
| } | ||
|
|
||
|
|
@@ -1550,5 +1566,11 @@ public BlockOutputStreamBuilder withMultipartEnabled( | |
| isMultipartUploadEnabled = value; | ||
| return this; | ||
| } | ||
|
|
||
| public BlockOutputStreamBuilder withConditionalEnabled( | ||
| final boolean value){ | ||
| isConditionalEnabled = value; | ||
| return this; | ||
| } | ||
| } | ||
| } | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,142 @@ | ||
| /* | ||
| * Licensed to the Apache Software Foundation (ASF) under one | ||
| * or more contributor license agreements. See the NOTICE file | ||
| * distributed with this work for additional information | ||
| * regarding copyright ownership. The ASF licenses this file | ||
| * to you under the Apache License, Version 2.0 (the | ||
| * "License"); you may not use this file except in compliance | ||
| * with the License. You may obtain a copy of the License at | ||
| * | ||
| * http://www.apache.org/licenses/LICENSE-2.0 | ||
| * | ||
| * Unless required by applicable law or agreed to in writing, software | ||
| * distributed under the License is distributed on an "AS IS" BASIS, | ||
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
| * See the License for the specific language governing permissions and | ||
| * limitations under the License. | ||
| */ | ||
|
|
||
| package org.apache.hadoop.fs.s3a.impl; | ||
|
|
||
| import org.apache.hadoop.conf.Configuration; | ||
| import org.apache.hadoop.fs.FSDataOutputStream; | ||
| import org.apache.hadoop.fs.FSDataOutputStreamBuilder; | ||
| import org.apache.hadoop.fs.FileSystem; | ||
| import org.apache.hadoop.fs.Path; | ||
| import org.apache.hadoop.fs.s3a.performance.AbstractS3ACostTest; | ||
| import org.apache.hadoop.fs.s3a.RemoteFileChangedException; | ||
| import org.apache.hadoop.fs.s3a.S3ATestUtils; | ||
| import org.apache.hadoop.io.IOUtils; | ||
|
|
||
| import org.assertj.core.api.Assertions; | ||
| import org.junit.Assert; | ||
| import org.junit.Test; | ||
| import software.amazon.awssdk.services.s3.model.S3Exception; | ||
|
|
||
| import java.io.IOException; | ||
| import static org.apache.hadoop.fs.contract.ContractTestUtils.dataset; | ||
| import static org.apache.hadoop.fs.s3a.Constants.FAST_UPLOAD_BUFFER; | ||
| import static org.apache.hadoop.fs.s3a.Constants.FAST_UPLOAD_BUFFER_ARRAY; | ||
| import static org.apache.hadoop.fs.s3a.Constants.FS_S3A_CONDITIONAL_FILE_CREATE; | ||
| import static org.apache.hadoop.fs.s3a.Constants.MIN_MULTIPART_THRESHOLD; | ||
| import static org.apache.hadoop.fs.s3a.Constants.MULTIPART_MIN_SIZE; | ||
| import static org.apache.hadoop.fs.s3a.Constants.MULTIPART_SIZE; | ||
| import static org.apache.hadoop.fs.s3a.S3ATestUtils.skipIfNotEnabled; | ||
| import static org.apache.hadoop.fs.s3a.S3ATestUtils.removeBaseAndBucketOverrides; | ||
| import static org.apache.hadoop.fs.s3a.impl.InternalConstants.UPLOAD_PART_COUNT_LIMIT; | ||
| import static org.apache.hadoop.fs.s3a.scale.ITestS3AMultipartUploadSizeLimits.MPU_SIZE; | ||
| import static org.apache.hadoop.fs.s3a.scale.S3AScaleTestBase._1MB; | ||
|
|
||
|
|
||
| public class ITestS3APutIfMatch extends AbstractS3ACostTest { | ||
|
|
||
| private Configuration conf; | ||
|
|
||
| @Override | ||
| public Configuration createConfiguration() { | ||
| Configuration conf = super.createConfiguration(); | ||
| S3ATestUtils.disableFilesystemCaching(conf); | ||
| removeBaseAndBucketOverrides(conf, | ||
diljotgrewal marked this conversation as resolved.
Outdated
Show resolved
Hide resolved
|
||
| MULTIPART_SIZE, | ||
| UPLOAD_PART_COUNT_LIMIT); | ||
| conf.setLong(MULTIPART_SIZE, MPU_SIZE); | ||
| conf.setLong(UPLOAD_PART_COUNT_LIMIT, 2); | ||
| conf.setLong(MIN_MULTIPART_THRESHOLD, MULTIPART_MIN_SIZE); | ||
| conf.setInt(MULTIPART_SIZE, MULTIPART_MIN_SIZE); | ||
| conf.set(FAST_UPLOAD_BUFFER, getBlockOutputBufferName()); | ||
diljotgrewal marked this conversation as resolved.
Outdated
Show resolved
Hide resolved
|
||
| return conf; | ||
| } | ||
|
|
||
| @Override | ||
| public void setup() throws Exception { | ||
| super.setup(); | ||
| conf = createConfiguration(); | ||
| skipIfNotEnabled(conf, FS_S3A_CONDITIONAL_FILE_CREATE, | ||
| "Skipping IfNoneMatch tests"); | ||
| } | ||
|
|
||
| protected String getBlockOutputBufferName() { | ||
| return FAST_UPLOAD_BUFFER_ARRAY; | ||
| } | ||
|
|
||
| /** | ||
| * Create a file using the PutIfMatch feature from S3 | ||
| * @param fs filesystem | ||
| * @param path path to write | ||
| * @param data source dataset. Can be null | ||
| * @throws IOException on any problem | ||
| */ | ||
| private static void createFileWithIfNoneMatchFlag(FileSystem fs, | ||
diljotgrewal marked this conversation as resolved.
Outdated
Show resolved
Hide resolved
|
||
| Path path, | ||
| byte[] data, | ||
| String ifMatchTag) throws Exception { | ||
| FSDataOutputStreamBuilder builder = fs.createFile(path); | ||
| builder.must(FS_S3A_CONDITIONAL_FILE_CREATE, ifMatchTag); | ||
| FSDataOutputStream stream = builder.create().build(); | ||
| if (data != null && data.length > 0) { | ||
| stream.write(data); | ||
| } | ||
| stream.close(); | ||
| IOUtils.closeStream(stream); | ||
| } | ||
|
|
||
| @Test | ||
| public void testPutIfAbsentConflict() throws IOException { | ||
| FileSystem fs = getFileSystem(); | ||
| Path testFile = methodPath(); | ||
|
|
||
| fs.mkdirs(testFile.getParent()); | ||
| byte[] fileBytes = dataset(TEST_FILE_LEN, 0, 255); | ||
|
|
||
| try { | ||
| createFileWithIfNoneMatchFlag(fs, testFile, fileBytes, "*"); | ||
| createFileWithIfNoneMatchFlag(fs, testFile, fileBytes, "*"); | ||
| } catch (Exception e) { | ||
| Assert.assertEquals(RemoteFileChangedException.class, e.getClass()); | ||
diljotgrewal marked this conversation as resolved.
Outdated
Show resolved
Hide resolved
|
||
|
|
||
| S3Exception s3Exception = (S3Exception) e.getCause(); | ||
|
||
| Assertions.assertThat(s3Exception.statusCode()).isEqualTo(412); | ||
diljotgrewal marked this conversation as resolved.
Outdated
Show resolved
Hide resolved
|
||
| } | ||
| } | ||
|
|
||
|
|
||
| @Test | ||
| public void testPutIfAbsentLargeFileConflict() throws IOException { | ||
| FileSystem fs = getFileSystem(); | ||
| Path testFile = methodPath(); | ||
|
|
||
| // enough bytes for Multipart Upload | ||
| byte[] fileBytes = dataset(6 * _1MB, 'a', 'z' - 'a'); | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. there are ways to do this with small files, this matters because a 6MB file is large enough that it'd have to become a scale test. Yes, it is a scale test when people are testing across the planet. Do not worry about this right now, but be aware before merging into trunk the large file test will have to be moved to a subset of S3AScaleTestBase, and this test replaced with something using a write to a magic path –or even better, we add another new create file option to force multipart uploads always. Designing for that makes me think that a followup to this should move to an enumset of CreateFile flags |
||
|
|
||
| try { | ||
| createFileWithIfNoneMatchFlag(fs, testFile, fileBytes, "*"); | ||
| createFileWithIfNoneMatchFlag(fs, testFile, fileBytes, "*"); | ||
| } catch (Exception e) { | ||
diljotgrewal marked this conversation as resolved.
Outdated
Show resolved
Hide resolved
|
||
| Assert.assertEquals(RemoteFileChangedException.class, e.getClass()); | ||
|
|
||
| // Error gets caught here: | ||
| S3Exception s3Exception = (S3Exception) e.getCause(); | ||
| Assertions.assertThat(s3Exception.statusCode()).isEqualTo(412); | ||
| } | ||
| } | ||
| } | ||
Uh oh!
There was an error while loading. Please reload this page.