keysToDelete,
boolean deleteFakeDir)
throws MultiObjectDeleteException, AwsServiceException, IOException {
- if (LOG.isDebugEnabled()) {
- LOG.debug("Initiating delete operation for {} objects",
- keysToDelete.size());
- for (ObjectIdentifier objectIdentifier : keysToDelete) {
- LOG.debug(" \"{}\" {}", objectIdentifier.key(),
- objectIdentifier.versionId() != null ? objectIdentifier.versionId() : "");
- }
- }
if (keysToDelete.isEmpty()) {
// exit fast if there are no keys to delete
return;
}
- for (ObjectIdentifier objectIdentifier : keysToDelete) {
- blockRootDelete(objectIdentifier.key());
+ if (keysToDelete.size() == 1) {
+ // single object is a single delete call.
+ // this is more informative in server logs and may be more efficient.
+ deleteObject(keysToDelete.get(0).key());
+ noteDeleted(1, deleteFakeDir);
+ return;
}
try {
if (enableMultiObjectsDelete) {
@@ -5457,7 +5421,11 @@ public boolean hasPathCapability(final Path path, final String capability)
case STORE_CAPABILITY_DIRECTORY_MARKER_AWARE:
return true;
- // multi object delete flag
+ // this is always true, even if multi-object
+ // delete is disabled: the page size is simply reduced to 1.
+ case CommonPathCapabilities.BULK_DELETE:
+ return true;
+
case ENABLE_MULTI_DELETE:
return enableMultiObjectsDelete;
@@ -5639,6 +5607,7 @@ public S3AMultipartUploaderBuilder createMultipartUploader(
* new store context instances should be created as appropriate.
* @return the store context of this FS.
*/
+ @Override
@InterfaceAudience.Private
public StoreContext createStoreContext() {
return new StoreContextBuilder().setFsURI(getUri())
@@ -5740,4 +5709,29 @@ public boolean isMultipartUploadEnabled() {
return isMultipartUploadEnabled;
}
+ @Override
+ public BulkDelete createBulkDelete(final Path path)
+ throws IllegalArgumentException, IOException {
+
+ final Path p = makeQualified(path);
+ final AuditSpanS3A span = createSpan("bulkdelete", p.toString(), null);
+ final int size = enableMultiObjectsDelete ? pageSize : 1;
+ return new BulkDeleteOperation(
+ createStoreContext(),
+ createBulkDeleteCallbacks(p, size, span),
+ p,
+ size,
+ span);
+ }
+
+ /**
+ * Create the callbacks for the bulk delete operation.
+ * @param path path under which the deletes take place.
+ * @param pageSize maximum number of keys per delete page.
+ * @param span span for operations.
+ * @return an instance of the Bulk Delete callbacks.
+ */
+ protected BulkDeleteOperation.BulkDeleteOperationCallbacks createBulkDeleteCallbacks(
+ Path path, int pageSize, AuditSpanS3A span) {
+ return new BulkDeleteOperationCallbacksImpl(store, pathToKey(path), pageSize, span);
+ }
+
}
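
For context when reviewing: a minimal sketch of how a client could drive the new entry point. It assumes the `createBulkDelete()` declaration in the base `FileSystem` class that the `@Override` above implies (and that `BulkDelete` is closeable, as the `close()` override in `BulkDeleteOperation` suggests); bucket and path names are illustrative only.

```java
// Illustrative sketch only; bucket name and paths are placeholders.
import java.util.Arrays;
import java.util.List;
import java.util.Map;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.BulkDelete;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class BulkDeleteSketch {
  public static void main(String[] args) throws Exception {
    Path base = new Path("s3a://example-bucket/tmp/");
    FileSystem fs = FileSystem.get(base.toUri(), new Configuration());
    // createBulkDelete() is the API added by this patch.
    try (BulkDelete bulkDelete = fs.createBulkDelete(base)) {
      List<Path> page = Arrays.asList(
          new Path(base, "file1"),
          new Path(base, "file2"));
      // entries are (path, error) pairs for keys which were not deleted.
      List<Map.Entry<Path, String>> failures = bulkDelete.bulkDelete(page);
      failures.forEach(e ->
          System.out.printf("failed to delete %s: %s%n", e.getKey(), e.getValue()));
    }
  }
}
```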
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInternals.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInternals.java
index b4116068565c2..3f3178c7e6e28 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInternals.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInternals.java
@@ -33,6 +33,9 @@
/**
* This is an unstable interface for access to S3A Internal state, S3 operations
* and the S3 client connector itself.
+ *
+ * Note for maintainers: this is documented in {@code aws_sdk_upgrade.md}; update
+ * on changes.
*/
@InterfaceStability.Unstable
@InterfaceAudience.LimitedPrivate("testing/diagnostics")
@@ -52,13 +55,19 @@ public interface S3AInternals {
* set to false.
*
* Mocking note: this is the same S3Client as is used by the owning
- * filesystem; changes to this client will be reflected by changes
+ * filesystem and S3AStore; changes to this client will be reflected by changes
* in the behavior of that filesystem.
* @param reason a justification for requesting access.
* @return S3Client
*/
S3Client getAmazonS3Client(String reason);
+ /**
+ * Get the store for low-level operations.
+ * @return the store the S3A FS is working through.
+ */
+ S3AStore getStore();
+
/**
* Get the region of a bucket.
* Invoked from StoreContext; consider an entry point.
@@ -131,4 +140,5 @@ public interface S3AInternals {
@AuditEntryPoint
@Retries.RetryTranslated
long abortMultipartUploads(Path path) throws IOException;
+
}
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AStore.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AStore.java
new file mode 100644
index 0000000000000..da4f52a8f11a0
--- /dev/null
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AStore.java
@@ -0,0 +1,120 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.s3a;
+
+import java.io.IOException;
+import java.time.Duration;
+import java.util.Map;
+import java.util.Optional;
+
+import software.amazon.awssdk.core.exception.SdkException;
+import software.amazon.awssdk.services.s3.model.DeleteObjectRequest;
+import software.amazon.awssdk.services.s3.model.DeleteObjectResponse;
+import software.amazon.awssdk.services.s3.model.DeleteObjectsRequest;
+import software.amazon.awssdk.services.s3.model.DeleteObjectsResponse;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.fs.s3a.api.RequestFactory;
+import org.apache.hadoop.fs.s3a.impl.MultiObjectDeleteException;
+import org.apache.hadoop.fs.s3a.impl.StoreContext;
+import org.apache.hadoop.fs.s3a.statistics.S3AStatisticsContext;
+import org.apache.hadoop.fs.statistics.DurationTrackerFactory;
+import org.apache.hadoop.fs.statistics.IOStatisticsSource;
+
+/**
+ * Interface for the S3A Store;
+ * S3 client interactions should go through this interface,
+ * which can be mocked for unit tests.
+ */
+@InterfaceAudience.LimitedPrivate("Extensions")
+@InterfaceStability.Unstable
+public interface S3AStore extends IOStatisticsSource {
+
+ /**
+ * Acquire write capacity for operations.
+ * This should be done within retry loops.
+ * @param capacity capacity to acquire.
+ * @return time spent blocked waiting for the capacity.
+ */
+ Duration acquireWriteCapacity(int capacity);
+
+ /**
+ * Acquire read capacity for operations.
+ * This should be done within retry loops.
+ * @param capacity capacity to acquire.
+ * @return time spent blocked waiting for the capacity.
+ */
+ Duration acquireReadCapacity(int capacity);
+
+ StoreContext getStoreContext();
+
+ DurationTrackerFactory getDurationTrackerFactory();
+
+ S3AStatisticsContext getStatisticsContext();
+
+ RequestFactory getRequestFactory();
+
+ /**
+ * Perform a bulk object delete operation against S3.
+ * Increments the {@code OBJECT_DELETE_REQUESTS} and write
+ * operation statistics.
+ *
+ * {@code OBJECT_DELETE_OBJECTS} is updated with the actual number
+ * of objects deleted in the request.
+ *
+ * Retry policy: retry untranslated; delete considered idempotent.
+ * If the request is throttled, this is logged in the throttle statistics,
+ * with the counter set to the number of keys, rather than the number
+ * of invocations of the delete operation.
+ * This is because S3 considers each key as one mutating operation on
+ * the store when updating its load counters on a specific partition
+ * of an S3 bucket.
+ * If only the request was measured, this operation would under-report.
+ * @param deleteRequest keys to delete on the s3-backend
+ * @return the AWS response
+ * @throws MultiObjectDeleteException one or more of the keys could not
+ * be deleted.
+ * @throws SdkException amazon-layer failure.
+ */
+ @Retries.RetryRaw
+ Map.Entry<Duration, DeleteObjectsResponse> deleteObjects(DeleteObjectsRequest deleteRequest)
+ throws MultiObjectDeleteException, SdkException, IOException;
+
+ /**
+ * Delete an object.
+ * Increments the {@code OBJECT_DELETE_REQUESTS} statistics.
+ *
+ * Retry policy: retry untranslated; delete considered idempotent.
+ * 404 errors other than bucket not found are swallowed;
+ * this can be raised by third party stores (GCS).
+ *
+ * If an exception is caught and swallowed, the response will be empty;
+ * otherwise it will be the response from the delete operation.
+ * @param request request to make
+ * @return the total duration and response.
+ * @throws SdkException problems working with S3
+ * @throws IllegalArgumentException if the request was rejected due to
+ * a mistaken attempt to delete the root directory.
+ */
+ @Retries.RetryRaw
+ Map.Entry<Duration, Optional<DeleteObjectResponse>> deleteObject(
+ DeleteObjectRequest request) throws SdkException;
+
+}
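
The `Map.Entry<Duration, ...>` return types pair the elapsed time with the raw SDK response. A hedged sketch of a caller using the interface follows; the request-factory call mirrors the one used by `BulkDeleteOperationCallbacksImpl` later in this patch, and the key names are illustrative.

```java
// Illustrative sketch of a caller working against S3AStore; keys are placeholders.
import java.time.Duration;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

import software.amazon.awssdk.services.s3.model.DeleteObjectsRequest;
import software.amazon.awssdk.services.s3.model.DeleteObjectsResponse;
import software.amazon.awssdk.services.s3.model.ObjectIdentifier;

import org.apache.hadoop.fs.s3a.S3AStore;

public final class StoreDeleteSketch {
  public static void deletePage(S3AStore store, List<String> keys) throws Exception {
    List<ObjectIdentifier> ids = keys.stream()
        .map(k -> ObjectIdentifier.builder().key(k).build())
        .collect(Collectors.toList());
    DeleteObjectsRequest request = store.getRequestFactory()
        .newBulkDeleteRequestBuilder(ids)
        .build();
    // entry key: elapsed duration; entry value: the SDK response with any per-key errors.
    Map.Entry<Duration, DeleteObjectsResponse> result = store.deleteObjects(request);
    System.out.printf("deleted %d keys in %s with %d errors%n",
        keys.size(), result.getKey(), result.getValue().errors().size());
  }
}
```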
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Statistic.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Statistic.java
index ce3af3de803a4..7102623996e95 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Statistic.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Statistic.java
@@ -103,6 +103,10 @@ public enum Statistic {
StoreStatisticNames.OP_ACCESS,
"Calls of access()",
TYPE_DURATION),
+ INVOCATION_BULK_DELETE(
+ StoreStatisticNames.OP_BULK_DELETE,
+ "Calls of bulk delete()",
+ TYPE_COUNTER),
INVOCATION_COPY_FROM_LOCAL_FILE(
StoreStatisticNames.OP_COPY_FROM_LOCAL_FILE,
"Calls of copyFromLocalFile()",
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/BulkDeleteOperation.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/BulkDeleteOperation.java
new file mode 100644
index 0000000000000..7cae4c81f198a
--- /dev/null
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/BulkDeleteOperation.java
@@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.s3a.impl;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Map;
+
+import software.amazon.awssdk.services.s3.model.ObjectIdentifier;
+
+import org.apache.hadoop.fs.BulkDelete;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.s3a.Retries;
+import org.apache.hadoop.fs.store.audit.AuditSpan;
+import org.apache.hadoop.util.functional.Tuples;
+
+import static java.util.Collections.emptyList;
+import static java.util.Objects.requireNonNull;
+import static java.util.stream.Collectors.toList;
+import static org.apache.hadoop.util.Preconditions.checkArgument;
+
+/**
+ * S3A Implementation of the {@link BulkDelete} interface.
+ */
+public class BulkDeleteOperation extends AbstractStoreOperation implements BulkDelete {
+
+ private final BulkDeleteOperationCallbacks callbacks;
+
+ private final Path basePath;
+
+ private final int pageSize;
+
+ public BulkDeleteOperation(
+ final StoreContext storeContext,
+ final BulkDeleteOperationCallbacks callbacks,
+ final Path basePath,
+ final int pageSize,
+ final AuditSpan span) {
+ super(storeContext, span);
+ this.callbacks = requireNonNull(callbacks);
+ this.basePath = requireNonNull(basePath);
+ checkArgument(pageSize > 0, "Page size must be greater than 0");
+ this.pageSize = pageSize;
+ }
+
+ @Override
+ public int pageSize() {
+ return pageSize;
+ }
+
+ @Override
+ public Path basePath() {
+ return basePath;
+ }
+
+ @Override
+ public List<Map.Entry<Path, String>> bulkDelete(final List<Path> paths)
+ throws IOException, IllegalArgumentException {
+ requireNonNull(paths);
+ checkArgument(paths.size() <= pageSize,
+ "Number of paths (%d) is larger than the page size (%d)", paths.size(), pageSize);
+
+ final StoreContext context = getStoreContext();
+ final List<ObjectIdentifier> objects = paths.stream().map(p -> {
+ checkArgument(p.isAbsolute(), "Path %s is not absolute", p);
+ final String k = context.pathToKey(p);
+ return ObjectIdentifier.builder().key(k).build();
+ }).collect(toList());
+
+ final List<Map.Entry<String, String>> errors = callbacks.bulkDelete(objects);
+ if (!errors.isEmpty()) {
+
+ final List<Map.Entry<Path, String>> outcomeElements = errors
+ .stream()
+ .map(error -> Tuples.pair(
+ context.keyToPath(error.getKey()),
+ error.getValue()
+ ))
+ .collect(toList());
+ return outcomeElements;
+ }
+ return emptyList();
+ }
+
+ @Override
+ public void close() throws IOException {
+
+ }
+
+ /**
+ * Callbacks for the bulk delete operation.
+ */
+ public interface BulkDeleteOperationCallbacks {
+
+ /**
+ * Perform a bulk delete operation.
+ * @param keys key list
+ * @return paths which failed to delete (if any).
+ * @throws IOException IO Exception.
+ * @throws IllegalArgumentException illegal arguments
+ */
+ @Retries.RetryTranslated
+ List<Map.Entry<String, String>> bulkDelete(final List<ObjectIdentifier> keys)
+ throws IOException, IllegalArgumentException;
+ }
+}
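
Since the callbacks interface is the mocking seam, here is a hedged sketch of a trivial in-memory implementation of the kind a unit test might plug in instead of the S3-backed `BulkDeleteOperationCallbacksImpl` below; the class name is illustrative.

```java
// Test-only sketch: records keys and reports success for every delete.
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

import software.amazon.awssdk.services.s3.model.ObjectIdentifier;

import org.apache.hadoop.fs.s3a.impl.BulkDeleteOperation;

import static java.util.Collections.emptyList;

public class RecordingBulkDeleteCallbacks
    implements BulkDeleteOperation.BulkDeleteOperationCallbacks {

  /** Keys passed to bulkDelete(), in invocation order. */
  private final List<String> deletedKeys = new ArrayList<>();

  @Override
  public List<Map.Entry<String, String>> bulkDelete(final List<ObjectIdentifier> keys)
      throws IOException {
    keys.forEach(k -> deletedKeys.add(k.key()));
    // an empty list means every key was "deleted" successfully.
    return emptyList();
  }

  public List<String> getDeletedKeys() {
    return deletedKeys;
  }
}
```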
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/BulkDeleteOperationCallbacksImpl.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/BulkDeleteOperationCallbacksImpl.java
new file mode 100644
index 0000000000000..8cc02dca19848
--- /dev/null
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/BulkDeleteOperationCallbacksImpl.java
@@ -0,0 +1,125 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.s3a.impl;
+
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.nio.file.AccessDeniedException;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+import software.amazon.awssdk.services.s3.model.DeleteObjectsResponse;
+import software.amazon.awssdk.services.s3.model.ObjectIdentifier;
+import software.amazon.awssdk.services.s3.model.S3Error;
+
+import org.apache.hadoop.fs.s3a.Retries;
+import org.apache.hadoop.fs.s3a.S3AStore;
+import org.apache.hadoop.fs.store.audit.AuditSpan;
+import org.apache.hadoop.util.functional.Tuples;
+
+import static java.util.Collections.emptyList;
+import static java.util.Collections.singletonList;
+import static org.apache.hadoop.fs.s3a.Invoker.once;
+import static org.apache.hadoop.util.Preconditions.checkArgument;
+import static org.apache.hadoop.util.functional.Tuples.pair;
+
+/**
+ * Callbacks for the bulk delete operation.
+ */
+public class BulkDeleteOperationCallbacksImpl implements
+ BulkDeleteOperation.BulkDeleteOperationCallbacks {
+
+ /**
+ * Path for logging.
+ */
+ private final String path;
+
+ /** Page size for bulk delete. */
+ private final int pageSize;
+
+ /** span for operations. */
+ private final AuditSpan span;
+
+ /**
+ * Store.
+ */
+ private final S3AStore store;
+
+
+ public BulkDeleteOperationCallbacksImpl(final S3AStore store,
+ String path, int pageSize, AuditSpan span) {
+ this.span = span;
+ this.pageSize = pageSize;
+ this.path = path;
+ this.store = store;
+ }
+
+ @Override
+ @Retries.RetryTranslated
+ public List<Map.Entry<String, String>> bulkDelete(final List<ObjectIdentifier> keysToDelete)
+ throws IOException, IllegalArgumentException {
+ span.activate();
+ final int size = keysToDelete.size();
+ checkArgument(size <= pageSize,
+ "Too many paths to delete in one operation: %s", size);
+ if (size == 0) {
+ return emptyList();
+ }
+
+ if (size == 1) {
+ return deleteSingleObject(keysToDelete.get(0).key());
+ }
+
+ final DeleteObjectsResponse response = once("bulkDelete", path, () ->
+ store.deleteObjects(store.getRequestFactory()
+ .newBulkDeleteRequestBuilder(keysToDelete)
+ .build())).getValue();
+ final List<S3Error> errors = response.errors();
+ if (errors.isEmpty()) {
+ // all good.
+ return emptyList();
+ } else {
+ return errors.stream()
+ .map(e -> pair(e.key(), e.message()))
+ .collect(Collectors.toList());
+ }
+ }
+
+ /**
+ * Delete a single object.
+ * @param key key to delete
+ * @return list of keys which failed to delete: length 0 or 1.
+ * @throws IOException IO problem other than AccessDeniedException
+ */
+ @Retries.RetryTranslated
+ private List<Map.Entry<String, String>> deleteSingleObject(final String key) throws IOException {
+ try {
+ once("bulkDelete", path, () ->
+ store.deleteObject(store.getRequestFactory()
+ .newDeleteObjectRequestBuilder(key)
+ .build()));
+ } catch (AccessDeniedException e) {
+ return singletonList(pair(key, e.toString()));
+ }
+ return emptyList();
+
+ }
+}
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/MultiObjectDeleteException.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/MultiObjectDeleteException.java
index 72ead1fb151fc..14ad559ead293 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/MultiObjectDeleteException.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/MultiObjectDeleteException.java
@@ -118,11 +118,7 @@ public IOException translateException(final String message) {
String exitCode = "";
for (S3Error error : errors()) {
String code = error.code();
- String item = String.format("%s: %s%s: %s%n", code, error.key(),
- (error.versionId() != null
- ? (" (" + error.versionId() + ")")
- : ""),
- error.message());
+ String item = errorToString(error);
LOG.info(item);
result.append(item);
if (exitCode == null || exitCode.isEmpty() || ACCESS_DENIED.equals(code)) {
@@ -136,4 +132,18 @@ public IOException translateException(final String message) {
return new AWSS3IOException(result.toString(), this);
}
}
+
+ /**
+ * Convert an error to a string.
+ * @param error error from a delete request
+ * @return string value
+ */
+ public static String errorToString(final S3Error error) {
+ String code = error.code();
+ return String.format("%s: %s%s: %s%n", code, error.key(),
+ (error.versionId() != null
+ ? (" (" + error.versionId() + ")")
+ : ""),
+ error.message());
+ }
}
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/S3AStoreBuilder.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/S3AStoreBuilder.java
new file mode 100644
index 0000000000000..40f649a7378b6
--- /dev/null
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/S3AStoreBuilder.java
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.s3a.impl;
+
+import software.amazon.awssdk.services.s3.S3Client;
+
+import org.apache.hadoop.fs.s3a.S3AInstrumentation;
+import org.apache.hadoop.fs.s3a.S3AStorageStatistics;
+import org.apache.hadoop.fs.s3a.S3AStore;
+import org.apache.hadoop.fs.s3a.audit.AuditSpanS3A;
+import org.apache.hadoop.fs.s3a.statistics.S3AStatisticsContext;
+import org.apache.hadoop.fs.statistics.DurationTrackerFactory;
+import org.apache.hadoop.fs.store.audit.AuditSpanSource;
+import org.apache.hadoop.util.RateLimiting;
+
+/**
+ * Builder for the S3AStore.
+ */
+public class S3AStoreBuilder {
+
+ private StoreContextFactory storeContextFactory;
+
+ private S3Client s3Client;
+
+ private DurationTrackerFactory durationTrackerFactory;
+
+ private S3AInstrumentation instrumentation;
+
+ private S3AStatisticsContext statisticsContext;
+
+ private S3AStorageStatistics storageStatistics;
+
+ private RateLimiting readRateLimiter;
+
+ private RateLimiting writeRateLimiter;
+
+ private AuditSpanSource<AuditSpanS3A> auditSpanSource;
+
+ public S3AStoreBuilder withStoreContextFactory(final StoreContextFactory storeContextFactory) {
+ this.storeContextFactory = storeContextFactory;
+ return this;
+ }
+
+ public S3AStoreBuilder withS3Client(final S3Client s3Client) {
+ this.s3Client = s3Client;
+ return this;
+ }
+
+ public S3AStoreBuilder withDurationTrackerFactory(final DurationTrackerFactory durationTrackerFactory) {
+ this.durationTrackerFactory = durationTrackerFactory;
+ return this;
+ }
+
+ public S3AStoreBuilder withInstrumentation(final S3AInstrumentation instrumentation) {
+ this.instrumentation = instrumentation;
+ return this;
+ }
+
+ public S3AStoreBuilder withStatisticsContext(final S3AStatisticsContext statisticsContext) {
+ this.statisticsContext = statisticsContext;
+ return this;
+ }
+
+ public S3AStoreBuilder withStorageStatistics(final S3AStorageStatistics storageStatistics) {
+ this.storageStatistics = storageStatistics;
+ return this;
+ }
+
+ public S3AStoreBuilder withReadRateLimiter(final RateLimiting readRateLimiter) {
+ this.readRateLimiter = readRateLimiter;
+ return this;
+ }
+
+ public S3AStoreBuilder withWriteRateLimiter(final RateLimiting writeRateLimiter) {
+ this.writeRateLimiter = writeRateLimiter;
+ return this;
+ }
+
+ public S3AStoreBuilder withAuditSpanSource(final AuditSpanSource<AuditSpanS3A> auditSpanSource) {
+ this.auditSpanSource = auditSpanSource;
+ return this;
+ }
+
+ public S3AStore build() {
+ return new S3AStoreImpl(storeContextFactory, s3Client, durationTrackerFactory, instrumentation,
+ statisticsContext, storageStatistics, readRateLimiter, writeRateLimiter, auditSpanSource);
+ }
+}
\ No newline at end of file
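
A hedged sketch of how an owner such as `S3AFileSystem.initialize()` might assemble the store with this builder. The rate-limiter helpers assumed here are `org.apache.hadoop.util.RateLimitingFactory.unlimitedRate()` and `create()`; the parameter names are illustrative rather than taken from the patch.

```java
// Illustrative wiring only; not the initialize() code from this patch.
import software.amazon.awssdk.services.s3.S3Client;

import org.apache.hadoop.fs.s3a.S3AInstrumentation;
import org.apache.hadoop.fs.s3a.S3AStorageStatistics;
import org.apache.hadoop.fs.s3a.S3AStore;
import org.apache.hadoop.fs.s3a.audit.AuditSpanS3A;
import org.apache.hadoop.fs.s3a.impl.S3AStoreBuilder;
import org.apache.hadoop.fs.s3a.impl.StoreContextFactory;
import org.apache.hadoop.fs.s3a.statistics.S3AStatisticsContext;
import org.apache.hadoop.fs.statistics.DurationTrackerFactory;
import org.apache.hadoop.fs.store.audit.AuditSpanSource;
import org.apache.hadoop.util.RateLimitingFactory;

final class StoreWiringSketch {
  static S3AStore buildStore(
      StoreContextFactory contextFactory,
      S3Client s3Client,
      DurationTrackerFactory durations,
      S3AInstrumentation instrumentation,
      S3AStatisticsContext statisticsContext,
      S3AStorageStatistics storageStatistics,
      AuditSpanSource<AuditSpanS3A> auditor,
      int writeRateLimit) {
    return new S3AStoreBuilder()
        .withStoreContextFactory(contextFactory)
        .withS3Client(s3Client)
        .withDurationTrackerFactory(durations)
        .withInstrumentation(instrumentation)
        .withStatisticsContext(statisticsContext)
        .withStorageStatistics(storageStatistics)
        // reads are not throttled; writes are limited to the supplied capacity.
        .withReadRateLimiter(RateLimitingFactory.unlimitedRate())
        .withWriteRateLimiter(RateLimitingFactory.create(writeRateLimit))
        .withAuditSpanSource(auditor)
        .build();
  }
}
```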
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/S3AStoreImpl.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/S3AStoreImpl.java
new file mode 100644
index 0000000000000..6d60b71a95cb2
--- /dev/null
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/S3AStoreImpl.java
@@ -0,0 +1,412 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.s3a.impl;
+
+import java.io.IOException;
+import java.io.UncheckedIOException;
+import java.time.Duration;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import javax.annotation.Nullable;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import software.amazon.awssdk.awscore.exception.AwsServiceException;
+import software.amazon.awssdk.core.exception.SdkException;
+import software.amazon.awssdk.services.s3.S3AsyncClient;
+import software.amazon.awssdk.services.s3.S3Client;
+import software.amazon.awssdk.services.s3.model.DeleteObjectRequest;
+import software.amazon.awssdk.services.s3.model.DeleteObjectResponse;
+import software.amazon.awssdk.services.s3.model.DeleteObjectsRequest;
+import software.amazon.awssdk.services.s3.model.DeleteObjectsResponse;
+import software.amazon.awssdk.services.s3.model.ObjectIdentifier;
+import software.amazon.awssdk.services.s3.model.S3Error;
+
+import org.apache.hadoop.fs.s3a.Invoker;
+import org.apache.hadoop.fs.s3a.Retries;
+import org.apache.hadoop.fs.s3a.S3AInstrumentation;
+import org.apache.hadoop.fs.s3a.S3AStorageStatistics;
+import org.apache.hadoop.fs.s3a.S3AStore;
+import org.apache.hadoop.fs.s3a.Statistic;
+import org.apache.hadoop.fs.s3a.api.RequestFactory;
+import org.apache.hadoop.fs.s3a.audit.AuditSpanS3A;
+import org.apache.hadoop.fs.s3a.statistics.S3AStatisticsContext;
+import org.apache.hadoop.fs.statistics.DurationTrackerFactory;
+import org.apache.hadoop.fs.statistics.IOStatistics;
+import org.apache.hadoop.fs.store.audit.AuditSpanSource;
+import org.apache.hadoop.util.DurationInfo;
+import org.apache.hadoop.util.RateLimiting;
+import org.apache.hadoop.util.functional.Tuples;
+
+import static java.util.Objects.requireNonNull;
+import static org.apache.hadoop.fs.s3a.S3AUtils.isThrottleException;
+import static org.apache.hadoop.fs.s3a.Statistic.IGNORED_ERRORS;
+import static org.apache.hadoop.fs.s3a.Statistic.OBJECT_BULK_DELETE_REQUEST;
+import static org.apache.hadoop.fs.s3a.Statistic.OBJECT_DELETE_OBJECTS;
+import static org.apache.hadoop.fs.s3a.Statistic.OBJECT_DELETE_REQUEST;
+import static org.apache.hadoop.fs.s3a.Statistic.STORE_IO_RETRY;
+import static org.apache.hadoop.fs.s3a.Statistic.STORE_IO_THROTTLED;
+import static org.apache.hadoop.fs.s3a.Statistic.STORE_IO_THROTTLE_RATE;
+import static org.apache.hadoop.fs.s3a.impl.ErrorTranslation.isObjectNotFound;
+import static org.apache.hadoop.fs.s3a.impl.InternalConstants.DELETE_CONSIDERED_IDEMPOTENT;
+import static org.apache.hadoop.fs.statistics.impl.IOStatisticsBinding.trackDurationOfOperation;
+import static org.apache.hadoop.util.Preconditions.checkArgument;
+
+/**
+ * Store Layer.
+ * This is where lower level storage operations are intended
+ * to move.
+ */
+public class S3AStoreImpl implements S3AStore {
+
+ private static final Logger LOG = LoggerFactory.getLogger(S3AStoreImpl.class);
+
+ private final StoreContextFactory storeContextFactory;
+
+ private final S3Client s3Client;
+
+ private final String bucket;
+
+ private final RequestFactory requestFactory;
+
+ /** Async client is used for transfer manager. */
+ private S3AsyncClient s3AsyncClient;
+
+ private final DurationTrackerFactory durationTrackerFactory;
+
+ /** The core instrumentation. */
+ private final S3AInstrumentation instrumentation;
+
+ /** Accessors to statistics for this FS. */
+ private final S3AStatisticsContext statisticsContext;
+
+ /** Storage Statistics Bonded to the instrumentation. */
+ private final S3AStorageStatistics storageStatistics;
+
+ private final RateLimiting readRateLimiter;
+
+ private final RateLimiting writeRateLimiter;
+
+ private final StoreContext storeContext;
+
+ private final Invoker invoker;
+
+ private final AuditSpanSource<AuditSpanS3A> auditSpanSource;
+
+ S3AStoreImpl(StoreContextFactory storeContextFactory,
+ S3Client s3Client,
+ DurationTrackerFactory durationTrackerFactory,
+ S3AInstrumentation instrumentation,
+ S3AStatisticsContext statisticsContext,
+ S3AStorageStatistics storageStatistics,
+ RateLimiting readRateLimiter,
+ RateLimiting writeRateLimiter,
+ AuditSpanSource<AuditSpanS3A> auditSpanSource) {
+ this.storeContextFactory = requireNonNull(storeContextFactory);
+ this.s3Client = requireNonNull(s3Client);
+ this.durationTrackerFactory = requireNonNull(durationTrackerFactory);
+ this.instrumentation = requireNonNull(instrumentation);
+ this.statisticsContext = requireNonNull(statisticsContext);
+ this.storageStatistics = requireNonNull(storageStatistics);
+ this.readRateLimiter = requireNonNull(readRateLimiter);
+ this.writeRateLimiter = requireNonNull(writeRateLimiter);
+ this.auditSpanSource = requireNonNull(auditSpanSource);
+ this.storeContext = requireNonNull(storeContextFactory.createStoreContext());
+ this.invoker = storeContext.getInvoker();
+ this.bucket = storeContext.getBucket();
+ this.requestFactory = storeContext.getRequestFactory();
+ }
+
+ @Override
+ public Duration acquireWriteCapacity(final int capacity) {
+ return writeRateLimiter.acquire(capacity);
+ }
+
+ @Override
+ public Duration acquireReadCapacity(final int capacity) {
+ return readRateLimiter.acquire(capacity);
+
+ }
+
+ /**
+ * Create the store context.
+ * @return a new store context.
+ */
+ private StoreContext createStoreContext() {
+ return storeContextFactory.createStoreContext();
+ }
+
+ @Override
+ public StoreContext getStoreContext() {
+ return storeContext;
+ }
+
+ private S3Client getS3Client() {
+ return s3Client;
+ }
+
+ @Override
+ public DurationTrackerFactory getDurationTrackerFactory() {
+ return durationTrackerFactory;
+ }
+
+ private S3AInstrumentation getInstrumentation() {
+ return instrumentation;
+ }
+
+ @Override
+ public S3AStatisticsContext getStatisticsContext() {
+ return statisticsContext;
+ }
+
+ private S3AStorageStatistics getStorageStatistics() {
+ return storageStatistics;
+ }
+
+ @Override
+ public RequestFactory getRequestFactory() {
+ return requestFactory;
+ }
+
+ /**
+ * Increment a statistic by 1.
+ * This increments both the instrumentation and storage statistics.
+ * @param statistic The operation to increment
+ */
+ protected void incrementStatistic(Statistic statistic) {
+ incrementStatistic(statistic, 1);
+ }
+
+ /**
+ * Increment a statistic by a specific value.
+ * This increments both the instrumentation and storage statistics.
+ * @param statistic The operation to increment
+ * @param count the count to increment
+ */
+ protected void incrementStatistic(Statistic statistic, long count) {
+ statisticsContext.incrementCounter(statistic, count);
+ }
+
+ /**
+ * Decrement a gauge by a specific value.
+ * @param statistic The operation to decrement
+ * @param count the count to decrement
+ */
+ protected void decrementGauge(Statistic statistic, long count) {
+ statisticsContext.decrementGauge(statistic, count);
+ }
+
+ /**
+ * Increment a gauge by a specific value.
+ * @param statistic The operation to increment
+ * @param count the count to increment
+ */
+ protected void incrementGauge(Statistic statistic, long count) {
+ statisticsContext.incrementGauge(statistic, count);
+ }
+
+ /**
+ * Callback when an operation was retried.
+ * Increments the statistics of ignored errors or throttled requests,
+ * depending up on the exception class.
+ * @param ex exception.
+ */
+ public void operationRetried(Exception ex) {
+ if (isThrottleException(ex)) {
+ LOG.debug("Request throttled");
+ incrementStatistic(STORE_IO_THROTTLED);
+ statisticsContext.addValueToQuantiles(STORE_IO_THROTTLE_RATE, 1);
+ } else {
+ incrementStatistic(STORE_IO_RETRY);
+ incrementStatistic(IGNORED_ERRORS);
+ }
+ }
+
+ /**
+ * Callback from {@link Invoker} when an operation is retried.
+ * @param text text of the operation
+ * @param ex exception
+ * @param retries number of retries
+ * @param idempotent is the method idempotent
+ */
+ public void operationRetried(String text, Exception ex, int retries, boolean idempotent) {
+ operationRetried(ex);
+ }
+
+ /**
+ * Get the instrumentation's IOStatistics.
+ * @return statistics
+ */
+ @Override
+ public IOStatistics getIOStatistics() {
+ return instrumentation.getIOStatistics();
+ }
+
+ /**
+ * Start an operation; this informs the audit service of the event
+ * and then sets it as the active span.
+ * @param operation operation name.
+ * @param path1 first path of operation
+ * @param path2 second path of operation
+ * @return a span for the audit
+ * @throws IOException failure
+ */
+ public AuditSpanS3A createSpan(String operation, @Nullable String path1, @Nullable String path2)
+ throws IOException {
+
+ return auditSpanSource.createSpan(operation, path1, path2);
+ }
+
+ /**
+ * Reject any request to delete an object where the key is root.
+ * @param key key to validate
+ * @throws IllegalArgumentException if the request was rejected due to
+ * a mistaken attempt to delete the root directory.
+ */
+ private void blockRootDelete(String key) throws IllegalArgumentException {
+ checkArgument(!key.isEmpty() && !"/".equals(key), "Bucket %s cannot be deleted", bucket);
+ }
+
+ /**
+ * Perform a bulk object delete operation against S3.
+ * Increments the {@code OBJECT_DELETE_REQUESTS} and write
+ * operation statistics.
+ *
+ * {@code OBJECT_DELETE_OBJECTS} is updated with the actual number
+ * of objects deleted in the request.
+ *
+ * Retry policy: retry untranslated; delete considered idempotent.
+ * If the request is throttled, this is logged in the throttle statistics,
+ * with the counter set to the number of keys, rather than the number
+ * of invocations of the delete operation.
+ * This is because S3 considers each key as one mutating operation on
+ * the store when updating its load counters on a specific partition
+ * of an S3 bucket.
+ * If only the request was measured, this operation would under-report.
+ * @param deleteRequest keys to delete on the s3-backend
+ * @return the AWS response
+ * @throws IllegalArgumentException if the request was rejected due to
+ * a mistaken attempt to delete the root directory
+ * @throws SdkException amazon-layer failure.
+ */
+ @Override
+ @Retries.RetryRaw
+ public Map.Entry<Duration, DeleteObjectsResponse> deleteObjects(final DeleteObjectsRequest deleteRequest)
+ throws SdkException {
+
+ DeleteObjectsResponse response;
+ BulkDeleteRetryHandler retryHandler = new BulkDeleteRetryHandler(createStoreContext());
+
+ final List<ObjectIdentifier> keysToDelete = deleteRequest.delete().objects();
+ int keyCount = keysToDelete.size();
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Initiating delete operation for {} objects", keysToDelete.size());
+ keysToDelete.stream().forEach(objectIdentifier -> {
+ LOG.debug(" \"{}\" {}", objectIdentifier.key(),
+ objectIdentifier.versionId() != null ? objectIdentifier.versionId() : "");
+ });
+ }
+ // block root calls
+ keysToDelete.stream().map(ObjectIdentifier::key).forEach(this::blockRootDelete);
+
+ try (DurationInfo d = new DurationInfo(LOG, false, "DELETE %d keys", keyCount)) {
+ response =
+ invoker.retryUntranslated("delete", DELETE_CONSIDERED_IDEMPOTENT, (text, e, r, i) -> {
+ // handle the failure
+ retryHandler.bulkDeleteRetried(deleteRequest, e);
+ },
+ // duration is tracked in the bulk delete counters
+ trackDurationOfOperation(getDurationTrackerFactory(),
+ OBJECT_BULK_DELETE_REQUEST.getSymbol(), () -> {
+ acquireWriteCapacity(keyCount);
+ incrementStatistic(OBJECT_DELETE_OBJECTS, keyCount);
+ return s3Client.deleteObjects(deleteRequest);
+ }));
+ if (!response.errors().isEmpty()) {
+ // one or more of the keys could not be deleted.
+ // log and then throw
+ List<S3Error> errors = response.errors();
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Partial failure of delete, {} errors", errors.size());
+ for (S3Error error : errors) {
+ LOG.debug("{}: \"{}\" - {}", error.key(), error.code(), error.message());
+ }
+ }
+ }
+ d.close();
+ return Tuples.pair(d.asDuration(), response);
+
+ } catch (IOException e) {
+ // this is part of the retry signature, nothing else.
+ // convert to unchecked.
+ throw new UncheckedIOException(e);
+ }
+ }
+
+ /**
+ * Delete an object.
+ * Increments the {@code OBJECT_DELETE_REQUESTS} statistics.
+ *
+ * Retry policy: retry untranslated; delete considered idempotent.
+ * 404 errors other than bucket not found are swallowed;
+ * this can be raised by third party stores (GCS).
+ * If an exception is caught and swallowed, the response will be empty;
+ * otherwise it will be the response from the delete operation.
+ * @param request request to make
+ * @return the total duration and response.
+ * @throws SdkException problems working with S3
+ * @throws IllegalArgumentException if the request was rejected due to
+ * a mistaken attempt to delete the root directory.
+ */
+ @Override
+ @Retries.RetryRaw
+ public Map.Entry<Duration, Optional<DeleteObjectResponse>> deleteObject(final DeleteObjectRequest request)
+ throws SdkException {
+
+ String key = request.key();
+ blockRootDelete(key);
+ DurationInfo d = new DurationInfo(LOG, false, "deleting %s", key);
+ try {
+ DeleteObjectResponse response =
+ invoker.retryUntranslated(String.format("Delete %s:/%s", bucket, key),
+ DELETE_CONSIDERED_IDEMPOTENT, trackDurationOfOperation(getDurationTrackerFactory(),
+ OBJECT_DELETE_REQUEST.getSymbol(), () -> {
+ incrementStatistic(OBJECT_DELETE_OBJECTS);
+ acquireWriteCapacity(1);
+ return s3Client.deleteObject(request);
+ }));
+ d.close();
+ return Tuples.pair(d.asDuration(), Optional.of(response));
+ } catch (AwsServiceException ase) {
+ // 404 errors get swallowed; this can be raised by
+ // third party stores (GCS).
+ if (!isObjectNotFound(ase)) {
+ throw ase;
+ }
+ d.close();
+ return Tuples.pair(d.asDuration(), Optional.empty());
+ } catch (IOException e) {
+ // this is part of the retry signature, nothing else.
+ // convert to unchecked.
+ throw new UncheckedIOException(e);
+ }
+ }
+
+}
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/StoreContextFactory.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/StoreContextFactory.java
new file mode 100644
index 0000000000000..355288619d30a
--- /dev/null
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/StoreContextFactory.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.s3a.impl;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+
+@InterfaceAudience.Private
+public interface StoreContextFactory {
+
+ /**
+ * Build an immutable store context, including picking
+ * up the current audit span.
+ * @return the store context.
+ */
+ StoreContext createStoreContext();
+}
diff --git a/hadoop-tools/hadoop-aws/src/site/markdown/tools/hadoop-aws/aws_sdk_upgrade.md b/hadoop-tools/hadoop-aws/src/site/markdown/tools/hadoop-aws/aws_sdk_upgrade.md
index e2c095e5317a4..abd58bffc6201 100644
--- a/hadoop-tools/hadoop-aws/src/site/markdown/tools/hadoop-aws/aws_sdk_upgrade.md
+++ b/hadoop-tools/hadoop-aws/src/site/markdown/tools/hadoop-aws/aws_sdk_upgrade.md
@@ -324,6 +324,7 @@ They have also been updated to return V2 SDK classes.
public interface S3AInternals {
S3Client getAmazonS3V2Client(String reason);
+ S3AStore getStore();
@Retries.RetryTranslated
@AuditEntryPoint
String getBucketLocation() throws IOException;
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/AbstractS3AMockTest.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/AbstractS3AMockTest.java
index 734bcfd9c5d30..f43710cf25eb0 100644
--- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/AbstractS3AMockTest.java
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/AbstractS3AMockTest.java
@@ -35,8 +35,7 @@
/**
- * Abstract base class for S3A unit tests using a mock S3 client and a null
- * metadata store.
+ * Abstract base class for S3A unit tests using a mock S3 client.
*/
public abstract class AbstractS3AMockTest {
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/TestS3ADeleteOnExit.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/TestS3ADeleteOnExit.java
index a4162f212179b..28a443f04cda9 100644
--- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/TestS3ADeleteOnExit.java
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/TestS3ADeleteOnExit.java
@@ -61,9 +61,8 @@ public boolean deleteOnExit(Path f) throws IOException {
// processDeleteOnExit.
@Override
protected boolean deleteWithoutCloseCheck(Path f, boolean recursive) throws IOException {
- boolean result = super.deleteWithoutCloseCheck(f, recursive);
deleteOnDnExitCount--;
- return result;
+ return true;
}
}