+ public Optional<Long> getUploadsAborted() {
+ return uploadsAborted;
+ }
+
/**
* Delete a file or directory tree.
*
@@ -236,6 +260,17 @@ protected void deleteDirectoryTree(final Path path,
try (DurationInfo ignored =
new DurationInfo(LOG, false, "deleting %s", dirKey)) {
+ final CompletableFuture<Long> abortUploads;
+ if (dirOperationsPurgeUploads) {
+ final StoreContext sc = getStoreContext();
+ final String key = sc.pathToKey(path) + "/";
+ LOG.debug("All uploads under {} will be deleted", key);
+ abortUploads = submit(sc.getExecutor(), sc.getActiveAuditSpan(), () ->
+ callbacks.abortMultipartUploadsUnderPrefix(key));
+ } else {
+ abortUploads = null;
+ }
+
// init the lists of keys and paths to delete
resetDeleteList();
deleteFuture = null;
@@ -257,10 +292,10 @@ protected void deleteDirectoryTree(final Path path,
LOG.debug("Deleting final batch of listed files");
submitNextBatch();
maybeAwaitCompletion(deleteFuture);
-
+ uploadsAborted = waitForCompletionIgnoringExceptions(abortUploads);
}
- LOG.debug("Delete \"{}\" completed; deleted {} objects", path,
- filesDeleted);
+ LOG.debug("Delete \"{}\" completed; deleted {} objects and aborted {} uploads", path,
+ filesDeleted, uploadsAborted.orElse(0L));
}
/**
@@ -313,7 +348,8 @@ private void submitNextBatch()
throws IOException {
// delete a single page of keys and the metadata.
// block for any previous batch.
- maybeAwaitCompletion(deleteFuture);
+ maybeAwaitCompletion(deleteFuture).ifPresent(count ->
+ LOG.debug("Deleted {} objects", count));
// delete the current page of keys and paths
deleteFuture = submitDelete(keys);
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/OperationCallbacks.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/OperationCallbacks.java
index e0d9c7c6aada70..9c88870633a358 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/OperationCallbacks.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/OperationCallbacks.java
@@ -164,4 +164,16 @@ RemoteIterator<S3AFileStatus> listObjects(
Path path,
String key)
throws IOException;
+
+ /**
+ * Abort multipart uploads under a path; paged.
+ * @param prefix prefix for uploads to abort
+ * @return a count of aborts
+ * @throws IOException trouble; FileNotFoundExceptions are swallowed.
+ */
+ @Retries.RetryTranslated
+ default long abortMultipartUploadsUnderPrefix(String prefix)
+ throws IOException {
+ return 0;
+ }
}
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/RenameOperation.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/RenameOperation.java
index 4bb15f74965a9b..288b3c0aae585e 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/RenameOperation.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/RenameOperation.java
@@ -22,6 +22,7 @@
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
+import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicLong;
@@ -44,6 +45,7 @@
import org.apache.hadoop.util.OperationDuration;
import static org.apache.hadoop.fs.s3a.S3AUtils.translateException;
+import static org.apache.hadoop.fs.s3a.impl.CallableSupplier.waitForCompletionIgnoringExceptions;
import static org.apache.hadoop.fs.store.audit.AuditingFunctions.callableWithinAuditSpan;
import static org.apache.hadoop.fs.s3a.impl.CallableSupplier.submit;
import static org.apache.hadoop.fs.s3a.impl.CallableSupplier.waitForCompletion;
@@ -124,9 +126,18 @@ public class RenameOperation extends ExecutingStoreOperation<Long> {
private final List<ObjectIdentifier> keysToDelete =
new ArrayList<>();
+ /**
+ * Do directory operations purge pending uploads?
+ */
+ private final boolean dirOperationsPurgeUploads;
+
+ /**
+ * Count of uploads aborted.
+ */
+ private Optional<Long> uploadsAborted = Optional.empty();
+
/**
* Initiate the rename.
- *
* @param storeContext store context
* @param sourcePath source path
* @param sourceKey key of source
@@ -136,6 +147,7 @@ public class RenameOperation extends ExecutingStoreOperation {
* @param destStatus destination status.
* @param callbacks callback provider
* @param pageSize size of delete requests
+ * @param dirOperationsPurgeUploads Do directory operations purge pending uploads?
*/
public RenameOperation(
final StoreContext storeContext,
@@ -146,7 +158,8 @@ public RenameOperation(
final String destKey,
final S3AFileStatus destStatus,
final OperationCallbacks callbacks,
- final int pageSize) {
+ final int pageSize,
+ final boolean dirOperationsPurgeUploads) {
super(storeContext);
this.sourcePath = sourcePath;
this.sourceKey = sourceKey;
@@ -159,6 +172,16 @@ public RenameOperation(
&& pageSize <= InternalConstants.MAX_ENTRIES_TO_DELETE,
"page size out of range: %s", pageSize);
this.pageSize = pageSize;
+ this.dirOperationsPurgeUploads = dirOperationsPurgeUploads;
+ }
+
+ /**
+ * Get the count of uploads aborted.
+ * Non-empty iff enabled, and the operations completed without errors.
+ * @return count of aborted uploads.
+ */
+ public Optional<Long> getUploadsAborted() {
+ return uploadsAborted;
}
/**
@@ -341,6 +364,16 @@ protected void recursiveDirectoryRename() throws IOException {
throw new RenameFailedException(srcKey, dstKey,
"cannot rename a directory to a subdirectory of itself ");
}
+ // start the async dir cleanup
+ final CompletableFuture<Long> abortUploads;
+ if (dirOperationsPurgeUploads) {
+ final String key = srcKey;
+ LOG.debug("All uploads under {} will be deleted", key);
+ abortUploads = submit(getStoreContext().getExecutor(), () ->
+ callbacks.abortMultipartUploadsUnderPrefix(key));
+ } else {
+ abortUploads = null;
+ }
if (destStatus != null
&& destStatus.isEmptyDirectory() == Tristate.TRUE) {
@@ -422,6 +455,8 @@ protected void recursiveDirectoryRename() throws IOException {
// have been deleted.
completeActiveCopiesAndDeleteSources("final copy and delete");
+ // and if uploads were being aborted, wait for that to finish
+ uploadsAborted = waitForCompletionIgnoringExceptions(abortUploads);
}
/**
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/s3guard/S3GuardTool.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/s3guard/S3GuardTool.java
index 22fc630dad1f5a..ea1ea908486e25 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/s3guard/S3GuardTool.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/s3guard/S3GuardTool.java
@@ -47,8 +47,8 @@
import org.apache.hadoop.fs.FilterFileSystem;
import org.apache.hadoop.fs.Options;
import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.RemoteIterator;
import org.apache.hadoop.fs.s3a.Constants;
-import org.apache.hadoop.fs.s3a.MultipartUtils;
import org.apache.hadoop.fs.s3a.S3AFileSystem;
import org.apache.hadoop.fs.s3a.WriteOperationHelper;
import org.apache.hadoop.fs.s3a.auth.RolePolicies;
@@ -683,7 +683,7 @@ private void promptBeforeAbort(PrintStream out) throws IOException {
private void processUploads(PrintStream out) throws IOException {
final S3AFileSystem fs = getFilesystem();
- MultipartUtils.UploadIterator uploads = fs.listUploads(prefix);
+ RemoteIterator<MultipartUpload> uploads = fs.listUploads(prefix);
// create a span so that the write operation helper
// is within one
AuditSpan span =
diff --git a/hadoop-tools/hadoop-aws/src/site/markdown/tools/hadoop-aws/third_party_stores.md b/hadoop-tools/hadoop-aws/src/site/markdown/tools/hadoop-aws/third_party_stores.md
index a7ea7b2e590248..0216e46014c7ea 100644
--- a/hadoop-tools/hadoop-aws/src/site/markdown/tools/hadoop-aws/third_party_stores.md
+++ b/hadoop-tools/hadoop-aws/src/site/markdown/tools/hadoop-aws/third_party_stores.md
@@ -39,11 +39,12 @@ The features which may be unavailable include:
* Optional Bucket Probes at startup (`fs.s3a.bucket.probe = 0`).
This is now the default -do not change it.
* List API to use (`fs.s3a.list.version = 1`)
+* Bucket lifecycle rules to clean up pending uploads.
## Configuring s3a to connect to a third party store
-### Connecting to a third party object store over HTTPS
+## Connecting to a third party object store over HTTPS
The core setting for a third party store is to change the endpoint in `fs.s3a.endpoint`.
@@ -89,6 +90,57 @@ then these must be set, either in XML or (preferred) in a JCEKS file.
If per-bucket settings are used here, then third-party stores and credentials may be used alongside an AWS store.
+
+
+## Other issues
+
+### Coping without bucket lifecycle rules
+
+Not all third-party stores support bucket lifecycle rules to clean up incomplete
+uploads.
+
+This can be addressed in three ways:
+* Command line: `hadoop s3guard uploads -abort -force <path>`.
+* At startup: `fs.s3a.multipart.purge`, with a purge age set in `fs.s3a.multipart.purge.age`.
+* During rename/delete: `fs.s3a.directory.operations.purge.uploads = true`.
+
+#### S3Guard uploads command
+
+This can be executed on a schedule, or manually:
+
+```
+hadoop s3guard uploads -abort -force s3a://bucket/
+```
+
+Consult the [S3Guard documentation](s3guard.html) for the full set of parameters.
+
+#### At startup: `fs.s3a.multipart.purge`
+
+This lists all uploads in a bucket when a filesystem instance is created and aborts
+all of those older than a certain age.
+
+This can hurt performance on a large bucket, as the purge scans the entire bucket
+and is executed whenever a filesystem instance is created -which can happen many times
+during Hive, Spark and DistCp jobs.
+
+For this reason, this option may be removed in a future release; however, it has long been
+available in the S3A client and so is guaranteed to work across versions.
+
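+A minimal sketch of how this could be enabled when creating a filesystem client follows;
+the class name and bucket URI are illustrative only, and the purge age is assumed to be
+in seconds:
+
+```
+import java.net.URI;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+
+public class PurgeOnStartup {
+  public static void main(String[] args) throws Exception {
+    Configuration conf = new Configuration();
+    // purge incomplete uploads whenever this filesystem instance is created
+    conf.setBoolean("fs.s3a.multipart.purge", true);
+    // only abort uploads older than this age (assumed to be in seconds)
+    conf.set("fs.s3a.multipart.purge.age", "86400");
+    // instantiating the filesystem triggers the purge scan
+    try (FileSystem fs = FileSystem.newInstance(new URI("s3a://example-bucket/"), conf)) {
+      System.out.println("Created filesystem " + fs.getUri());
+    }
+  }
+}
+```
+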
+#### During rename and delete: `fs.s3a.directory.operations.purge.uploads`
+
+When `fs.s3a.directory.operations.purge.uploads` is set and a directory is renamed
+or deleted, an attempt is made to list all pending uploads under that directory,
+in parallel with the rename/delete itself.
+If there are any, they are aborted (sequentially). A configuration sketch follows
+the list below.
+
+* This is disabled by default: it adds overhead and extra cost.
+* Because it only applies to the directories being processed, directories which
+ are not renamed or deleted will retain all incomplete uploads.
+* There is no age checking: all uploads will be aborted.
+* If any other process is writing to the same directory tree, its pending uploads
+ will be aborted and those writes will fail.
+
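+A minimal sketch of enabling this for a single client and then deleting a directory tree;
+the class name, bucket URI and path are placeholders:
+
+```
+import java.net.URI;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+
+public class PurgeOnDelete {
+  public static void main(String[] args) throws Exception {
+    Configuration conf = new Configuration();
+    // abort pending uploads under a directory as part of rename/delete
+    conf.setBoolean("fs.s3a.directory.operations.purge.uploads", true);
+    try (FileSystem fs = FileSystem.newInstance(new URI("s3a://example-bucket/"), conf)) {
+      // the recursive delete also aborts any incomplete uploads under this prefix
+      fs.delete(new Path("/tmp/job-output"), true);
+    }
+  }
+}
+```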
+
# Troubleshooting
The most common problem when talking to third-party stores are
@@ -412,4 +464,5 @@ It is also a way to regression test foundational S3A third-party store compatibi
```
_Note_ If anyone is set up to test this regularly, please let the Hadoop developer team know if regressions do surface,
-as it is not a common test configuration.
\ No newline at end of file
+as it is not a common test configuration.
+[]
\ No newline at end of file
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/contract/s3a/ITestS3AContractCreate.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/contract/s3a/ITestS3AContractCreate.java
index d2a858f615ef6a..7a2a10879dd8ee 100644
--- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/contract/s3a/ITestS3AContractCreate.java
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/contract/s3a/ITestS3AContractCreate.java
@@ -18,18 +18,111 @@
package org.apache.hadoop.fs.contract.s3a;
+import java.util.Arrays;
+import java.util.Collection;
+
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.contract.AbstractContractCreateTest;
import org.apache.hadoop.fs.contract.AbstractFSContract;
+import org.apache.hadoop.fs.s3a.S3ATestUtils;
+
+import static org.apache.hadoop.fs.s3a.Constants.FS_S3A_CREATE_PERFORMANCE;
+import static org.apache.hadoop.fs.s3a.S3ATestUtils.removeBaseAndBucketOverrides;
/**
* S3A contract tests creating files.
+ * Parameterized on the create performance flag as all overwrite
+ * tests are required to fail in create performance mode.
*/
+@RunWith(Parameterized.class)
public class ITestS3AContractCreate extends AbstractContractCreateTest {
+ /**
+ * This test suite is parameterized for the different create file
+ * options.
+ * @return a list of test parameters.
+ */
+ @Parameterized.Parameters
+ public static Collection<Object[]>