-
Notifications
You must be signed in to change notification settings - Fork 9.2k
HADOOP-16190. S3A copyFile operation to include source versionID or etag in the copy request #606
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
8ca29e7
6aabc59
c962fee
40591d7
5374d74
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -32,6 +32,12 @@ | |
| @InterfaceStability.Unstable | ||
| public class RemoteFileChangedException extends PathIOException { | ||
|
|
||
| /** | ||
| * Error message used when mapping a 412 to this exception. | ||
| */ | ||
|
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. whitespace:end of line There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. whitespace:end of line |
||
| public static final String PRECONDITIONS_NOT_MET = | ||
|
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The Http Response code is "Preconditions Failed" so we probably want to use the same string and keep the name. |
||
| "Constraints of request were unsatisfiable"; | ||
|
|
||
| /** | ||
| * Constructs a RemoteFileChangedException. | ||
| * | ||
|
|
@@ -46,4 +52,21 @@ public RemoteFileChangedException(String path, | |
| super(path, message); | ||
| setOperation(operation); | ||
| } | ||
|
|
||
| /** | ||
| * Constructs a RemoteFileChangedException. | ||
| * | ||
| * @param path the path accessed when the change was detected | ||
| * @param operation the operation (e.g. open, re-open) performed when the | ||
| * change was detected | ||
| * @param message a message providing more details about the condition | ||
| * @param cause inner cause. | ||
| */ | ||
| public RemoteFileChangedException(final String path, | ||
| final String operation, | ||
| final String message, | ||
| final Throwable cause) { | ||
| super(path, message, cause); | ||
| setOperation(operation); | ||
| } | ||
| } | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -74,6 +74,7 @@ | |
| import com.amazonaws.services.s3.transfer.TransferManager; | ||
| import com.amazonaws.services.s3.transfer.TransferManagerConfiguration; | ||
| import com.amazonaws.services.s3.transfer.Upload; | ||
| import com.amazonaws.services.s3.transfer.model.CopyResult; | ||
| import com.amazonaws.services.s3.transfer.model.UploadResult; | ||
| import com.amazonaws.event.ProgressListener; | ||
| import com.google.common.annotations.VisibleForTesting; | ||
|
|
@@ -2855,16 +2856,17 @@ public List<RoleModel.Statement> listAWSPolicyRules( | |
| /** | ||
| * Copy a single object in the bucket via a COPY operation. | ||
| * There's no update of metadata, directory markers, etc. | ||
| * Callers must implement. | ||
| * Callers must add those. | ||
| * @param srcKey source object path | ||
| * @param dstKey destination object path | ||
| * @param size object size | ||
| * @return the result of the copy. | ||
| * @throws AmazonClientException on failures inside the AWS SDK | ||
| * @throws InterruptedIOException the operation was interrupted | ||
| * @throws IOException Other IO problems | ||
| */ | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Add
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. done; will review rest of comment. |
||
| @Retries.RetryMixed | ||
| private void copyFile(String srcKey, String dstKey, long size) | ||
| private CopyResult copyFile(String srcKey, String dstKey, long size) | ||
| throws IOException, InterruptedIOException { | ||
| LOG.debug("copyFile {} -> {} ", srcKey, dstKey); | ||
|
|
||
|
|
@@ -2878,27 +2880,41 @@ private void copyFile(String srcKey, String dstKey, long size) | |
| } | ||
| }; | ||
|
|
||
| once("copyFile(" + srcKey + ", " + dstKey + ")", srcKey, | ||
| () -> { | ||
| ObjectMetadata srcom = getObjectMetadata(srcKey); | ||
| ObjectMetadata dstom = cloneObjectMetadata(srcom); | ||
| setOptionalObjectMetadata(dstom); | ||
| CopyObjectRequest copyObjectRequest = | ||
| new CopyObjectRequest(bucket, srcKey, bucket, dstKey); | ||
| setOptionalCopyObjectRequestParameters(copyObjectRequest); | ||
| copyObjectRequest.setCannedAccessControlList(cannedACL); | ||
| copyObjectRequest.setNewObjectMetadata(dstom); | ||
| Copy copy = transfers.copy(copyObjectRequest); | ||
| copy.addProgressListener(progressListener); | ||
| try { | ||
| copy.waitForCopyResult(); | ||
| incrementWriteOperations(); | ||
| instrumentation.filesCopied(1, size); | ||
| } catch (InterruptedException e) { | ||
| throw new InterruptedIOException("Interrupted copying " + srcKey | ||
| + " to " + dstKey + ", cancelling"); | ||
| } | ||
| }); | ||
| try { | ||
| return once("copyFile(" + srcKey + ", " + dstKey + ")", srcKey, | ||
| () -> { | ||
| ObjectMetadata srcom = getObjectMetadata(srcKey); | ||
| ObjectMetadata dstom = cloneObjectMetadata(srcom); | ||
| setOptionalObjectMetadata(dstom); | ||
| CopyObjectRequest copyObjectRequest = | ||
| new CopyObjectRequest(bucket, srcKey, bucket, dstKey); | ||
| setOptionalCopyObjectRequestParameters(copyObjectRequest); | ||
| copyObjectRequest.setCannedAccessControlList(cannedACL); | ||
| copyObjectRequest.setNewObjectMetadata(dstom); | ||
| String id = srcom.getVersionId(); | ||
| if (id != null) { | ||
| copyObjectRequest.setSourceVersionId(id); | ||
| } else if (isNotEmpty(srcom.getETag())) { | ||
| copyObjectRequest.withMatchingETagConstraint(srcom.getETag()); | ||
| } | ||
| Copy copy = transfers.copy(copyObjectRequest); | ||
| copy.addProgressListener(progressListener); | ||
| try { | ||
| CopyResult r = copy.waitForCopyResult(); | ||
| incrementWriteOperations(); | ||
| instrumentation.filesCopied(1, size); | ||
| return r; | ||
| } catch (InterruptedException e) { | ||
| throw (IOException) new InterruptedIOException( | ||
steveloughran marked this conversation as resolved.
Show resolved
Hide resolved
|
||
| "Interrupted copying " + srcKey + " to " + dstKey | ||
| + ", cancelling").initCause(e); | ||
| } | ||
| }); | ||
| } catch (RemoteFileChangedException e) { | ||
| // file changed during the copy. Fail, after adding the counter. | ||
| instrumentation.incrementCounter(STREAM_READ_VERSION_MISMATCHES, 1); | ||
| throw e; | ||
| } | ||
| } | ||
|
|
||
| /** | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -257,6 +257,14 @@ public static IOException translateException(@Nullable String operation, | |
| ioe = new AWSBadRequestException(message, s3Exception); | ||
| break; | ||
|
|
||
| // version/etag id cannot be met in copy. | ||
| case 412: | ||
|
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. public final static int PRECONDITION_FAILED = 412; |
||
| ioe = new RemoteFileChangedException(path, | ||
| operation, | ||
| RemoteFileChangedException.PRECONDITIONS_NOT_MET, | ||
| ase); | ||
| break; | ||
|
|
||
| // out of range. This may happen if an object is overwritten with | ||
| // a shorter one while it is being read. | ||
| case 416: | ||
|
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. RANGE_NOT_SATISFIABLE There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. (argh, hitting cmd-enter turns this into a review and not a comment like it used to) |
||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -288,6 +288,57 @@ For the default test dataset, hosted in the `landsat-pds` bucket, this is: | |
| </property> | ||
| ``` | ||
|
|
||
| ## <a name="versions"></a> Testing against versioned buckets | ||
|
|
||
| AWS S3 and some third party stores support versioned buckets. | ||
|
|
||
| Hadoop is adding awareness of this, including | ||
|
|
||
| * Using version ID to guarantee consistent reads of opened files. | ||
| [HADOOP-15625](https://issues.apache.org/jira/browse/HADOOP-15625) | ||
| * Using version ID to guarantee consistent multipart copies. | ||
| * Checks to avoid creating needless delete markers. | ||
|
|
||
| + maybe more to come. | ||
|
|
||
| To test these features, you need to have buckets with object versioning | ||
| enabled. | ||
|
|
||
| A full `hadoop-aws` test run implicitly cleans up all files in the bucket | ||
| in `ITestS3AContractRootDir`, so every test run creates a large set of | ||
| old (deleted) file versions. To avoid large bills, you must | ||
| create a lifecycle rule on the bucket to purge the old versions. | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. nevermind, given the specific steps listed below |
||
|
|
||
|
|
||
|
|
||
| ### How to set up a test bucket for object versioning | ||
|
|
||
| 1. Find the bucket in the AWS management console. | ||
| 1. In the _Properties_ tab, enable versioning. | ||
| 1. In the _Management_ tab, add a lifecycle rule with an "expiration" policy | ||
| to delete old versions (included deletion markers) | ||
| in 1-2 days. | ||
| 1. Consider also adding an "abort all multipart uploads" option here too. | ||
|
|
||
|  | ||
|
|
||
|
|
||
| You will be billed for all old versions of every file, so this lifecycle rule _is critical_. | ||
|
|
||
|
|
||
| Once versioning is enabled, set change detection to `versionid` for the bucket, | ||
| so that it will be used for all IO in the test suite. | ||
|
|
||
| ```xml | ||
| <property> | ||
| <name>fs.s3a.bucket.TEST-BUCKET.change.detection.source</name> | ||
| <value>versionid</value> | ||
| </property> | ||
| ``` | ||
|
|
||
| To verify that the bucket has version support enabled, execute the test | ||
| `ITestS3ARemoteFileChanged` and verify that _no tests are skipped_. | ||
|
|
||
| ## <a name="reporting"></a> Viewing Integration Test Reports | ||
|
|
||
|
|
||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -1155,6 +1155,25 @@ java.io.FileNotFoundException: Bucket stevel45r56666 does not exist | |
| Check the URI. If using a third-party store, verify that you've configured | ||
| the client to talk to the specific server in `fs.s3a.endpoint`. | ||
|
|
||
| #### <a name="preconditions_unmet"></a> `RemoteFileChangedException`: `Constraints of request were unsatisfiable` | ||
|
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The http response is Precondition Failed, not "unmet". Even in AWS code, so I don't know from where unmet keeps popping up. |
||
|
|
||
| This error surfaces when the S3 endpoint returns the HTTP error code 412 on | ||
| a request. | ||
|
|
||
| This happens during a copy/rename when the etag of the source file | ||
| changed partway through the copy (when a large file is being copied), | ||
| or between the HEAD request to probe for the existence of the file | ||
| and the actual COPY request being initiated. | ||
|
|
||
| It may also surface if a file had just overwritten an existing file, | ||
| and, due to AWS S3's eventual consistency, the HEAD request | ||
| returned a different version of the file than the COPY command. | ||
|
|
||
| Fixes | ||
|
|
||
| 1. Don't change files while they are being copied. | ||
| 1. Don't rename files or directories immediately after they've been written to/under. | ||
|
|
||
| ## Other Issues | ||
|
|
||
| ### <a name="logging"></a> Enabling low-level logging | ||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
whitespace:end of line