diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/BulkDelete.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/BulkDelete.java
new file mode 100644
index 0000000000000..3be71f54b43c2
--- /dev/null
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/BulkDelete.java
@@ -0,0 +1,326 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.fs.statistics.IOStatistics;
+import org.apache.hadoop.fs.statistics.IOStatisticsSource;
+
+import static org.apache.hadoop.fs.statistics.IOStatisticsLogging.ioStatisticsToPrettyString;
+
+/**
+ * Interface for bulk file delete operations.
+ *
+ * The expectation is that the iterator-provided list of paths
+ * will be batched into pages and submitted to the remote filesystem/store
+ * for bulk deletion, possibly in parallel.
+ *
+ * A remote iterator provides the list of paths to delete; all must be under
+ * the base path.
+ *
+ * There is no guarantee order of execution.
+ * Implementations may shuffle paths before posting requests.
+ *
+ * Callers MUST have no expectation that parent directories will exist after the
+ * operation completes; if an object store needs to explicitly look for and create
+ * directory markers, that step will be omitted.
+ *
+ * The iterator may be a {@code Closeable} and if so, it will be closed on
+ * completion, irrespective of the outcome.
+ *
+ * The iterator may be an {@code IOStatisticsSource} and if so, its statistics
+ * will be included in the statistics of the {@link Outcome}.
+ *
+ * If the iterator's methods raise any exception, the delete will fail fast;
+ * no more files will be submitted for deletion.
+ *
+ * The {@link #OPT_BACKGROUND} boolean option is a hint to prioritise other work
+ * over the delete; use it for background cleanup, compaction etc.
+ *
+ * no guarantee of page size being greater than 1, or even constant through
+ * the entire operation. The {@link #OPT_PAGE_SIZE} option can be used as
+ * a hint.
+ * Most object stores may have a maximum page size; if a larger size is requested,
+ * you will always get something at or below that limit (i.e. it is not an
+ * error to ask for more than the limit)
+ *
+ * Be aware that on some stores (AWS S3) each object listed in a bulk delete counts
+ * against the write IOPS limit; large page sizes are counterproductive here.
+ * @see HADOOP-16823.
+ * Large DeleteObject requests are their own Thundering Herd
+ *
+ * Progress callback: the callback may come from any thread, further work may or may
+ * not be blocked during the callback's processing in application code.
+ *
+ * If a bulk delete call fails, then the next progress report will include
+ * information about the failure; the callback can decide whether to continue
+ * or not. The default callback is {@link #FAIL_FAST}, which will trigger
+ * a fast failure.
+ *
+ * After a progress callback requests abort, active operations MAY continue.
+ * The {@link ProgressReport#aborting} flag indicates this.
+ */
+@InterfaceAudience.Public
+@InterfaceStability.Unstable
+public interface BulkDelete {
+
+ /**
+ * Initiate a bulk delete operation.
+ * @param base base path for the delete; it must belong to the FS and
+ * all files MUST be under this path
+ * @param files iterator of files. If Closeable, it will be closed once complete
+ * @return a builder for the operation
+ * @throws UnsupportedOperationException not supported.
+ * @throws IOException IO failure on initial builder creation.
+ */
+ Builder bulkDelete(Path base, RemoteIterator files)
+ throws UnsupportedOperationException, IOException;
+
+ /**
+ * Builder for the operation;
+ * The {@link #build()} method will initiate the operation, possibly
+ * blocking, possibly in a separate thread, possibly in a pool
+ * of threads.
+ */
+ interface Builder
+ extends FSBuilder, Builder> {
+
+ /**
+ * Add a progress callback.
+ * @param deleteProgress progress callback
+ * @return the builder
+ */
+ Builder withProgress(DeleteProgress deleteProgress);
+
+ }
+
+ /**
+ * Path capability for bulk delete.
+ */
+ String CAPABILITY_BULK_DELETE = "fs.capability.bulk.delete";
+
+ /**
+ * Numeric hint about page size, "preferred" rather than "required".
+ * Implementations will ignore this if it is out of their supported/preferred
+ * range.
+ */
+ String OPT_PAGE_SIZE = "fs.option.bulkdelete.page.size";
+
+ /**
+ * Is this a background operation?
+ * If so, a lower write rate may be used so that it doesn't interfere
+ * with higher priority workloads -such as through rate limiting
+ * and/or the use of smaller page sizes.
+ */
+ String OPT_BACKGROUND = "fs.option.bulkdelete.background";
+
+ /**
+ * Callback for progress; allows for a delete
+ * to be aborted (best effort).
+ * There are no guarantees as to which thread this will be called from.
+ */
+ interface DeleteProgress {
+
+ /**
+ * Report progress.
+ * @param update update to report
+ * @return true if the operation should continue, false to abort.
+ */
+ boolean report(ProgressReport update);
+ }
+
+ /**
+ * Delete progress report.
+ */
+ class ProgressReport {
+
+ /**
+ * Number of files deleted.
+ */
+ private final int deleteCount;
+
+ /**
+ * List of files which were deleted.
+ */
+ private final List successes;
+
+ /**
+ * List of files which failed to delete.
+ * This may be empty, but will never be null.
+ */
+ private final List failures;
+
+ /**
+ * An exception covering at least one of the failures
+ * encountered.
+ */
+ private final IOException exception;
+
+ /**
+ * Has an abort been requested from a previous progress report?
+ * If a progress report is delivered with this flag, it indicates
+ * that the abort has been requested, and that this report is
+ * from a page which was submitted before the abort request.
+ */
+ private final boolean aborting;
+
+ public ProgressReport(final int deleteCount,
+ final List successes,
+ final List failures,
+ final IOException exception,
+ final boolean aborting) {
+ this.deleteCount = deleteCount;
+ this.successes = successes;
+ this.failures = failures;
+ this.exception = exception;
+ this.aborting = aborting;
+ }
+
+ public int getDeleteCount() {
+ return deleteCount;
+ }
+
+ public List getSuccesses() {
+ return successes;
+ }
+
+ public List getFailures() {
+ return failures;
+ }
+
+ public IOException getException() {
+ return exception;
+ }
+
+ public boolean isAborting() {
+ return aborting;
+ }
+ }
+
+ /**
+ * Result of a bulk delete operation.
+ */
+ class Outcome implements IOStatisticsSource {
+
+ /**
+ * Did the operation succeed?
+ * That is: delete all files without any failures?
+ */
+ private final boolean successful;
+
+ /**
+ * Wast the operation aborted?
+ */
+ private final boolean aborted;
+
+ /**
+ * An exception covering at least one of the failures
+ * encountered.
+ */
+ private final Exception exception;
+
+ /**
+ * Number of files deleted.
+ */
+ private final int deleted;
+
+ /**
+ * Number of files which failed to delete.
+ */
+ private final int failures;
+
+ /**
+ * Number of delete pages submitted to the store.
+ */
+ private final int pageCount;
+
+
+ /**
+ * IO Statistics.
+ * This will include any statistics supplied by
+ * the iterator.
+ */
+ private final IOStatistics iostats;
+
+ public Outcome(final boolean successful,
+ final boolean aborted,
+ final Exception exception,
+ final int deleted,
+ final int failures,
+ final int pageCount,
+ final IOStatistics iostats) {
+
+ this.successful = successful;
+ this.aborted = aborted;
+ this.exception = exception;
+ this.deleted = deleted;
+ this.pageCount = pageCount;
+ this.failures = failures;
+ this.iostats = iostats;
+ }
+
+ public boolean successful() {
+ return successful;
+ }
+
+ public Exception getException() {
+ return exception;
+ }
+
+ public int getDeleted() {
+ return deleted;
+ }
+
+ public int getFailures() {
+ return failures;
+ }
+
+ @Override
+ public IOStatistics getIOStatistics() {
+ return iostats;
+ }
+
+ @Override
+ public String toString() {
+ return "Outcome{" +
+ "successful=" + successful +
+ ", deleteCount=" + deleted +
+ ", failures=" + failures +
+ ", iostats=" + ioStatisticsToPrettyString(iostats) +
+ '}';
+ }
+ }
+
+ /**
+ * A fail fast policy: if there are any failures, abort.
+ * This is the default until any other progress callback
+ * is set in the builder.
+ */
+ DeleteProgress FAIL_FAST = (report -> report.failures.isEmpty());
+
+ /**
+ * Continue if there are any failures.
+ */
+ DeleteProgress CONTINUE = (report -> true);
+}
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/impl/BulkDeleteSupport.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/impl/BulkDeleteSupport.java
new file mode 100644
index 0000000000000..a70cb486ffd0f
--- /dev/null
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/impl/BulkDeleteSupport.java
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.impl;
+
+import java.io.IOException;
+import java.util.concurrent.CompletableFuture;
+
+import javax.annotation.Nonnull;
+
+import org.apache.hadoop.fs.BulkDelete;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.RemoteIterator;
+
+import static java.util.Objects.requireNonNull;
+
+/**
+ * Support for bulk delete operations.
+ */
+public final class BulkDeleteSupport {
+
+ /**
+ * Builder implementation; takes a callback for the actual operation.
+ */
+ public static class BulkDeleteBinding
+ extends AbstractFSBuilderImpl, BulkDelete.Builder>
+ implements BulkDelete.Builder {
+
+ private final RemoteIterator files;
+
+ private final BulkDeleteBuilderCallbacks callbacks;
+
+ private BulkDelete.DeleteProgress deleteProgress;
+
+ public BulkDeleteBinding(
+ @Nonnull final Path path,
+ @Nonnull RemoteIterator files,
+ @Nonnull BulkDeleteBuilderCallbacks callbacks) {
+ super(path);
+ this.files = requireNonNull(files);
+ this.callbacks = requireNonNull(callbacks);
+ }
+
+ @Override
+ public BulkDelete.Builder withProgress(final BulkDelete.DeleteProgress deleteProgress) {
+ this.deleteProgress = deleteProgress;
+ return this;
+ }
+
+ @Override
+ public BulkDelete.Builder getThisBuilder() {
+ return this;
+ }
+
+ public RemoteIterator getFiles() {
+ return files;
+ }
+
+ public BulkDelete.DeleteProgress getDeleteProgress() {
+ return deleteProgress;
+ }
+
+ @Override
+ public CompletableFuture build()
+ throws IllegalArgumentException, IOException {
+ return null;
+ }
+ }
+
+ /**
+ * Callbacks for the builder.
+ */
+ public interface BulkDeleteBuilderCallbacks {
+ CompletableFuture initiateBulkDelete(
+ BulkDeleteBinding builder) throws IOException;
+ }
+}
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/statistics/StoreStatisticNames.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/statistics/StoreStatisticNames.java
index 19ee9d1414ecf..d7d4bc45dba08 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/statistics/StoreStatisticNames.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/statistics/StoreStatisticNames.java
@@ -46,6 +46,9 @@ public final class StoreStatisticNames {
/** {@value}. */
public static final String OP_APPEND = "op_append";
+ /** {@value}. */
+ public static final String OP_BULK_DELETE = "op_bulk-delete";
+
/** {@value}. */
public static final String OP_COPY_FROM_LOCAL_FILE =
"op_copy_from_local_file";
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java
index c5e6e09a835eb..792952bfb1f04 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java
@@ -105,6 +105,7 @@
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.classification.VisibleForTesting;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.BulkDelete;
import org.apache.hadoop.fs.CommonPathCapabilities;
import org.apache.hadoop.fs.ContentSummary;
import org.apache.hadoop.fs.CreateFlag;
@@ -113,6 +114,7 @@
import org.apache.hadoop.fs.FSDataOutputStreamBuilder;
import org.apache.hadoop.fs.Globber;
import org.apache.hadoop.fs.Options;
+import org.apache.hadoop.fs.impl.BulkDeleteSupport;
import org.apache.hadoop.fs.impl.OpenFileParameters;
import org.apache.hadoop.fs.permission.FsAction;
import org.apache.hadoop.fs.s3a.audit.AuditSpanS3A;
@@ -121,6 +123,7 @@
import org.apache.hadoop.fs.s3a.auth.delegation.DelegationTokenProvider;
import org.apache.hadoop.fs.s3a.impl.AWSCannedACL;
import org.apache.hadoop.fs.s3a.impl.AWSHeaders;
+import org.apache.hadoop.fs.s3a.impl.BulkDeleteApiOperation;
import org.apache.hadoop.fs.s3a.impl.BulkDeleteRetryHandler;
import org.apache.hadoop.fs.s3a.impl.ChangeDetectionPolicy;
import org.apache.hadoop.fs.s3a.impl.ConfigurationHelper;
@@ -284,7 +287,8 @@
@InterfaceStability.Evolving
public class S3AFileSystem extends FileSystem implements StreamCapabilities,
AWSPolicyProvider, DelegationTokenProvider, IOStatisticsSource,
- AuditSpanSource, ActiveThreadSpanSource {
+ AuditSpanSource, ActiveThreadSpanSource,
+ BulkDelete {
/**
* Default blocksize as used in blocksize and FS status queries.
@@ -5523,6 +5527,10 @@ public boolean hasPathCapability(final Path path, final String capability)
case FS_S3A_CREATE_HEADER:
return true;
+ // bulk delete support.
+ case CAPABILITY_BULK_DELETE:
+ return true;
+
// is the FS configured for create file performance
case FS_S3A_CREATE_PERFORMANCE_ENABLED:
return performanceCreation;
@@ -5851,4 +5859,63 @@ public boolean isMultipartUploadEnabled() {
return isMultipartUploadEnabled;
}
+ /**
+ * Bulk delete operation.
+ */
+ @Override
+ public BulkDelete.Builder bulkDelete(final Path base, final RemoteIterator files)
+ throws UnsupportedOperationException, IOException {
+ final Path path = qualify(base);
+
+ AuditSpan span = createSpan(INVOCATION_BULK_DELETE.getSymbol(), path.toString(), null);
+ return new BulkDeleteSupport.BulkDeleteBinding(path, files, builder ->
+ new BulkDeleteApiOperation(createStoreContext(),
+ span,
+ builder,
+ pageSize,
+ createBulkDeleteApiOperationCallbacks(builder, span))
+ .execute());
+ }
+
+ /**
+ * Create the callbacks for the bulk delete operation.
+ *
+ * @param builder builder
+ * @param span span
+ * @return callbacks
+ */
+ protected BulkDeleteApiOperation.BulkDeleteApiOperationCallbacks createBulkDeleteApiOperationCallbacks(
+ final BulkDeleteSupport.BulkDeleteBinding builder,
+ final AuditSpan span) {
+ return new BulkDeleteApiOperationCallbacksImpl(span, builder.getPath());
+ }
+
+ /**
+ * Callbacks for the bulk delete operation.
+ */
+ private class BulkDeleteApiOperationCallbacksImpl implements
+ BulkDeleteApiOperation.BulkDeleteApiOperationCallbacks {
+
+
+ /** Audit Span at time of creation. */
+ private final AuditSpan auditSpan;
+
+ private final Path path;
+
+ private BulkDeleteApiOperationCallbacksImpl(AuditSpan auditSpan, Path path) {
+ this.auditSpan = auditSpan;
+ this.path = path;
+ }
+
+ @Override
+ @Retries.RetryTranslated
+ public void removeKeys(final List keysToDelete)
+ throws MultiObjectDeleteException, IOException {
+ auditSpan.activate();
+ once("delete", path.toString(), () ->
+ S3AFileSystem.this.removeKeys(keysToDelete, false));
+ }
+ }
+
+
}
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Statistic.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Statistic.java
index 72fc75b642415..f8d52b0f665ff 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Statistic.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Statistic.java
@@ -103,6 +103,10 @@ public enum Statistic {
StoreStatisticNames.OP_ACCESS,
"Calls of access()",
TYPE_DURATION),
+ INVOCATION_BULK_DELETE(
+ StoreStatisticNames.OP_BULK_DELETE,
+ "Calls of bulk delete()",
+ TYPE_COUNTER),
INVOCATION_COPY_FROM_LOCAL_FILE(
StoreStatisticNames.OP_COPY_FROM_LOCAL_FILE,
"Calls of copyFromLocalFile()",
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/BulkDeleteApiOperation.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/BulkDeleteApiOperation.java
new file mode 100644
index 0000000000000..577fccfd2df7d
--- /dev/null
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/BulkDeleteApiOperation.java
@@ -0,0 +1,114 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.s3a.impl;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+
+import com.amazonaws.AmazonClientException;
+import software.amazon.awssdk.services.s3.model.ObjectIdentifier;
+
+import org.apache.hadoop.fs.BulkDelete;
+import org.apache.hadoop.fs.InvalidRequestException;
+import org.apache.hadoop.fs.impl.BulkDeleteSupport;
+import org.apache.hadoop.fs.s3a.Retries;
+import org.apache.hadoop.fs.store.audit.AuditSpan;
+import org.apache.hadoop.util.functional.FutureIO;
+
+import static java.util.Objects.requireNonNull;
+import static org.apache.hadoop.fs.statistics.impl.IOStatisticsBinding.emptyStatistics;
+import static org.apache.hadoop.util.functional.RemoteIterators.mappingRemoteIterator;
+import static org.apache.hadoop.util.functional.RemoteIterators.toList;
+
+/**
+ * Implementation of the public bulk delete API.
+ * Iterates through the list of files, initiating bulk delete calls.
+ */
+public class BulkDeleteApiOperation
+ extends ExecutingStoreOperation> {
+
+
+ private final BulkDeleteSupport.BulkDeleteBinding binding;
+
+
+ /**
+ * Number of entries in a page.
+ */
+ private final int pageSize;
+
+ private final BulkDeleteApiOperationCallbacks callbacks;
+
+ public BulkDeleteApiOperation(
+ final StoreContext storeContext,
+ final AuditSpan auditSpan,
+ final BulkDeleteSupport.BulkDeleteBinding binding,
+ final int pageSize,
+ final BulkDeleteApiOperationCallbacks callbacks) {
+ super(storeContext, auditSpan);
+ this.binding = requireNonNull(binding);
+ this.pageSize = pageSize;
+ this.callbacks = requireNonNull(callbacks);
+ }
+
+ @Override
+ public CompletableFuture execute() throws IOException {
+ // initial POC does a large blocking call.
+ return FutureIO.eval(() -> {
+ final StoreContext context = getStoreContext();
+ final List keysToDelete =
+ toList(mappingRemoteIterator(binding.getFiles(),
+ p -> ObjectIdentifier.builder().key(context.pathToKey(p)).build()));
+
+ boolean successful = true;
+ Exception exception = null;
+ try {
+ callbacks.removeKeys(keysToDelete);
+ } catch (IOException e) {
+ successful = false;
+ exception = e;
+ }
+ return new BulkDelete.Outcome(
+ successful,
+ false,
+ exception,
+ keysToDelete.size(),
+ 0,
+ 1,
+ emptyStatistics());
+ });
+ }
+
+ public interface BulkDeleteApiOperationCallbacks {
+
+ /**
+ * Remove keys from the store.
+ * @param keysToDelete collection of keys to delete on the s3-backend.
+ * if empty, no request is made of the object store.
+ * @throws InvalidRequestException if the request was rejected due to
+ * a mistaken attempt to delete the root directory.
+ * @throws MultiObjectDeleteException one or more of the keys could not
+ * be deleted in a multiple object delete operation.
+ * @throws IOException other IO Exception.
+ */
+ @Retries.RetryRaw
+ void removeKeys(List keysToDelete)
+ throws MultiObjectDeleteException, IOException;
+ }
+}