diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Constants.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Constants.java
index 4e2af3a37652f..9aa351aac76c5 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Constants.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Constants.java
@@ -107,11 +107,15 @@ private Constants() {
   public static final String MAX_PAGING_KEYS = "fs.s3a.paging.maximum";
   public static final int DEFAULT_MAX_PAGING_KEYS = 5000;
 
-  // the maximum number of threads to allow in the pool used by TransferManager
+  // the maximum number of threads to allow in the pool used by TransferManager for uploads
   public static final String MAX_THREADS = "fs.s3a.threads.max";
   public static final int DEFAULT_MAX_THREADS = 10;
 
-  // the time an idle thread waits before terminating
+  // the maximum number of threads to allow in the pool used by TransferManager for copies
+  public static final String COPY_MAX_THREADS = "fs.s3a.copy.threads.max";
+  public static final int DEFAULT_COPY_MAX_THREADS = 25;
+
+  // the time an idle thread waits before terminating for uploads and copies
   public static final String KEEPALIVE_TIME = "fs.s3a.threads.keepalivetime";
   public static final int DEFAULT_KEEPALIVE_TIME = 60;
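The new fs.s3a.copy.threads.max option sizes the copy TransferManager's thread pool independently of the upload pool, so rename-heavy workloads can be tuned without inflating upload concurrency. A minimal sketch of setting both options from client code (the bucket name and values are illustrative assumptions, not recommendations from this patch):

    import java.net.URI;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;

    // Hypothetical tuning; the property keys match the constants above.
    Configuration conf = new Configuration();
    conf.setInt("fs.s3a.threads.max", 10);       // upload pool (the default)
    conf.setInt("fs.s3a.copy.threads.max", 50);  // copy pool, raised for rename-heavy jobs
    FileSystem fs = FileSystem.get(URI.create("s3a://example-bucket/"), conf);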
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/CopyContext.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/CopyContext.java
new file mode 100644
index 0000000000000..16741352f489c
--- /dev/null
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/CopyContext.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.s3a;
+
+import com.amazonaws.event.ProgressListener;
+import com.amazonaws.services.s3.transfer.Copy;
+
+
+/**
+ * A wrapper around an in-flight {@link Copy}: the source key, the destination
+ * key, the length of the object being copied, and the
+ * {@link ProgressListener.ExceptionReporter} attached to the copy.
+ */
+class CopyContext {
+
+  private final Copy copy;
+  private final String srcKey;
+  private final String destKey;
+  private final long length;
+  private final ProgressListener.ExceptionReporter progressListener;
+
+  CopyContext(Copy copy, String srcKey, String destKey, long length,
+      ProgressListener.ExceptionReporter progressListener) {
+    this.copy = copy;
+    this.srcKey = srcKey;
+    this.destKey = destKey;
+    this.length = length;
+    this.progressListener = progressListener;
+  }
+
+  Copy getCopy() {
+    return copy;
+  }
+
+  String getSrcKey() {
+    return srcKey;
+  }
+
+  String getDestKey() {
+    return destKey;
+  }
+
+  long getLength() {
+    return length;
+  }
+
+  ProgressListener.ExceptionReporter getProgressListener() {
+    return progressListener;
+  }
+}
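CopyContext exists so the rename path can start many copies and only later block on each one. A sketch of the consumption pattern, mirroring the wait loop in ParallelDirectoryRenamer below (copyContext is assumed to hold an already-started copy):

    // Block until this copy finishes, then rethrow any exception the
    // ExceptionReporter captured on the transfer thread.
    copyContext.getCopy().waitForCopyResult();
    copyContext.getProgressListener().throwExceptionIfAny();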
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/LazyTransferManager.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/LazyTransferManager.java
new file mode 100644
index 0000000000000..69275e1c69076
--- /dev/null
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/LazyTransferManager.java
@@ -0,0 +1,117 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.s3a;
+
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+
+import com.amazonaws.services.s3.AmazonS3;
+import com.amazonaws.services.s3.transfer.TransferManager;
+import com.amazonaws.services.s3.transfer.TransferManagerConfiguration;
+
+import org.apache.hadoop.conf.Configuration;
+
+import static org.apache.hadoop.fs.s3a.Constants.COPY_MAX_THREADS;
+import static org.apache.hadoop.fs.s3a.Constants.DEFAULT_COPY_MAX_THREADS;
+import static org.apache.hadoop.fs.s3a.Constants.DEFAULT_KEEPALIVE_TIME;
+import static org.apache.hadoop.fs.s3a.Constants.DEFAULT_MAX_THREADS;
+import static org.apache.hadoop.fs.s3a.Constants.KEEPALIVE_TIME;
+import static org.apache.hadoop.fs.s3a.Constants.MAX_THREADS;
+import static org.apache.hadoop.fs.s3a.S3AUtils.getMaxThreads;
+import static org.apache.hadoop.fs.s3a.S3AUtils.longOption;
+
+
+/**
+ * A wrapper that creates its {@link TransferManager}, and the thread pool
+ * backing it, on the first call to {@link #get()} rather than eagerly at
+ * filesystem initialization.
+ */
+class LazyTransferManager {
+
+  private final AmazonS3 s3;
+  private final TransferManagerConfiguration transferConfiguration;
+  private final int maxThreads;
+  private final long keepAliveTime;
+  private final String threadPoolName;
+
+  private TransferManager transferManager;
+  private ExecutorService executorService;
+
+  private LazyTransferManager(AmazonS3 s3, TransferManagerConfiguration transferConfiguration, int maxThreads,
+      long keepAliveTime, String threadPoolName) {
+    this.s3 = s3;
+    this.transferConfiguration = transferConfiguration;
+    this.maxThreads = maxThreads;
+    this.keepAliveTime = keepAliveTime;
+    this.threadPoolName = threadPoolName;
+  }
+
+  synchronized TransferManager get() {
+    if (transferManager == null) {
+      transferManager = createTransferManager();
+    }
+    return transferManager;
+  }
+
+  synchronized boolean isInitialized() {
+    return transferManager != null;
+  }
+
+  ExecutorService getExecutorService() {
+    return executorService;
+  }
+
+  private TransferManager createTransferManager() {
+    executorService = new ThreadPoolExecutor(
+        maxThreads, Integer.MAX_VALUE,
+        keepAliveTime, TimeUnit.SECONDS,
+        new LinkedBlockingQueue<>(),
+        BlockingThreadPoolExecutorService.newDaemonThreadFactory(
+            threadPoolName));
+
+    TransferManager transferManager = new TransferManager(s3, executorService);
+    transferManager.setConfiguration(transferConfiguration);
+    return transferManager;
+  }
+
+  static LazyTransferManager createLazyUploadTransferManager(AmazonS3 s3, Configuration conf, long partSize,
+      long multiPartThreshold) {
+    int maxThreads = getMaxThreads(conf, MAX_THREADS, DEFAULT_MAX_THREADS);
+    long keepAliveTime = longOption(conf, KEEPALIVE_TIME, DEFAULT_KEEPALIVE_TIME, 0);
+
+    TransferManagerConfiguration transferConfiguration = new TransferManagerConfiguration();
+    transferConfiguration.setMinimumUploadPartSize(partSize);
+    transferConfiguration.setMultipartUploadThreshold(multiPartThreshold);
+
+    return new LazyTransferManager(s3, transferConfiguration, maxThreads, keepAliveTime, "s3a-upload-unbounded");
+  }
+
+  static LazyTransferManager createLazyCopyTransferManager(AmazonS3 s3, Configuration conf, long partSize,
+      long multiPartThreshold) {
+    int maxThreads = getMaxThreads(conf, COPY_MAX_THREADS, DEFAULT_COPY_MAX_THREADS);
+    long keepAliveTime = longOption(conf, KEEPALIVE_TIME, DEFAULT_KEEPALIVE_TIME, 0);
+
+    TransferManagerConfiguration transferConfiguration = new TransferManagerConfiguration();
+    transferConfiguration.setMultipartCopyPartSize(partSize);
+    transferConfiguration.setMultipartCopyThreshold(multiPartThreshold);
+
+    return new LazyTransferManager(s3, transferConfiguration, maxThreads, keepAliveTime, "s3a-copy-unbounded");
+  }
+}
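The laziness pays off at close(): a filesystem instance that never performed a copy should not build a copy thread pool just to tear it down. A sketch of the intended life cycle (hypothetical caller; s3, conf, partSize, threshold and request are assumed to be in scope; see the close() change in S3AFileSystem.java below):

    // Nothing is created until the first get().
    LazyTransferManager copies =
        LazyTransferManager.createLazyCopyTransferManager(s3, conf, partSize, threshold);
    assert !copies.isInitialized();

    Copy copy = copies.get().copy(request);  // first use builds the pool and TransferManager
    assert copies.isInitialized();

    // On close, only tear down what was actually built.
    if (copies.isInitialized()) {
      copies.get().shutdownNow(true);
    }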
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/ParallelDirectoryRenamer.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/ParallelDirectoryRenamer.java
new file mode 100644
index 0000000000000..125ece33dde75
--- /dev/null
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/ParallelDirectoryRenamer.java
@@ -0,0 +1,206 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.s3a;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Queue;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.FutureTask;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import com.amazonaws.event.ProgressEvent;
+import com.amazonaws.event.ProgressListener;
+import com.amazonaws.services.s3.model.DeleteObjectsRequest;
+
+import org.apache.hadoop.fs.LocatedFileStatus;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.RemoteIterator;
+
+
+/**
+ * Renames an S3 directory by issuing parallel COPY requests.
+ */
+class ParallelDirectoryRenamer {
+
+  private final S3AFileSystem fs;
+
+  private static final DeleteObjectsRequest.KeyVersion END_OF_KEYS_TO_DELETE =
+      new DeleteObjectsRequest.KeyVersion(null, null);
+
+  ParallelDirectoryRenamer(S3AFileSystem fs) {
+    this.fs = fs;
+  }
+
+  List<CopyContext> rename(String srcKey, String dstKey, S3AFileStatus dstStatus) throws IOException {
+
+    List<DeleteObjectsRequest.KeyVersion> dirKeysToDelete = new ArrayList<>();
+
+    List<CopyContext> copies = new ArrayList<>();
+
+    // A blocking queue that tracks all objects that need to be deleted
+    BlockingQueue<DeleteObjectsRequest.KeyVersion> deleteQueue = new LinkedBlockingQueue<>();
+
+    // Used to track if the delete thread was gracefully shut down
+    boolean deleteFutureComplete = false;
+    FutureTask<Void> deleteFuture = null;
+
+    try {
+      // Launch a thread that will read from the deleteQueue and batch delete any files that have already been copied
+      deleteFuture = buildAndStartDeletionThread(deleteQueue);
+
+      if (dstStatus != null && dstStatus.isEmptyDirectory() == Tristate.TRUE) {
+        // delete unnecessary fake directory.
+        deleteQueue.add(new DeleteObjectsRequest.KeyVersion(dstKey));
+      }
+
+      // Used to abort future copy tasks as soon as one copy task fails
+      AtomicBoolean copyFailure = new AtomicBoolean(false);
+
+      Path parentPath = fs.keyToPath(srcKey);
+      RemoteIterator<LocatedFileStatus> iterator = fs.listFilesAndEmptyDirectories(parentPath, true);
+      while (iterator.hasNext()) {
+
+        LocatedFileStatus status = iterator.next();
+        String key = fs.pathToKey(status.getPath());
+        if (status.isDirectory() && !key.endsWith("/")) {
+          key += "/";
+        }
+        String newDstKey = dstKey + key.substring(srcKey.length());
+
+        // Directory markers are deleted after all copies finish; regular
+        // objects are queued for deletion as soon as their copy succeeds.
+        DeleteObjectsRequest.KeyVersion deleteKeyVersion = null;
+        if (status.isDirectory()) {
+          dirKeysToDelete.add(new DeleteObjectsRequest.KeyVersion(key));
+        } else {
+          deleteKeyVersion = new DeleteObjectsRequest.KeyVersion(key);
+        }
+
+        // If no previous file hit a copy failure, copy this file
+        if (!copyFailure.get()) {
+
+          ProgressListener.ExceptionReporter progressListener = new ProgressListener.ExceptionReporter(
+              new RenameProgressListener(deleteKeyVersion, deleteQueue, copyFailure));
+
+          copies.add(new CopyContext(fs.copyFileAsync(key, newDstKey, progressListener), key, newDstKey,
+              status.getLen(), progressListener));
+        } else {
+          // We got a copy failure, so don't bother going through the rest of the files
+          break;
+        }
+      }
+
+      for (CopyContext copyContext : copies) {
+        try {
+          copyContext.getCopy().waitForCopyResult();
+          copyContext.getProgressListener().throwExceptionIfAny();
+          fs.incrementWriteOperations();
+          fs.getInstrumentation().filesCopied(1, copyContext.getLength());
+        } catch (InterruptedException e) {
+          throw new RenameFailedException(copyContext.getSrcKey(), copyContext.getDestKey(), e);
+        }
+      }
+
+      if (copyFailure.get()) {
+        throw new RenameFailedException(srcKey, dstKey,
+            new IllegalStateException("Progress listener indicated a copy failure, but no exception was thrown"));
+      }
+
+      deleteQueue.addAll(dirKeysToDelete);
+      deleteQueue.add(END_OF_KEYS_TO_DELETE);
+
+      try {
+        deleteFuture.get();
+      } catch (ExecutionException | InterruptedException e) {
+        throw new RenameFailedException(srcKey, dstKey, e);
+      }
+      deleteFutureComplete = true;
+    } finally {
+      if (!deleteFutureComplete) {
+        if (deleteFuture != null && !deleteFuture.isDone() && !deleteFuture.isCancelled()) {
+          deleteFuture.cancel(true);
+        }
+      }
+    }
+    return copies;
+  }
+
+  private FutureTask<Void> buildAndStartDeletionThread(BlockingQueue<DeleteObjectsRequest.KeyVersion> deleteQueue) {
+    List<DeleteObjectsRequest.KeyVersion> keysToDelete = new ArrayList<>();
+    FutureTask<Void> deleteFuture = new FutureTask<>(() -> {
+      while (true) {
+        while (keysToDelete.size() < fs.getMaxEntriesToDelete()) {
+          DeleteObjectsRequest.KeyVersion key = deleteQueue.take();
+
+          // The thread runs until it is given a message that no more keys need to be deleted (END_OF_KEYS_TO_DELETE)
+          if (key == END_OF_KEYS_TO_DELETE) {
+            // Delete any remaining keys and exit
+            fs.removeKeys(keysToDelete, true, false);
+            return null;
+          } else {
+            keysToDelete.add(key);
+          }
+        }
+        // The batch is full: issue one bulk delete, which also clears the list
+        fs.removeKeys(keysToDelete, true, false);
+      }
+    });
+
+    Thread deleteThread = new Thread(deleteFuture);
+    deleteThread.setName("s3a-rename-delete-thread");
+    deleteThread.start();
+    return deleteFuture;
+  }
+
+  /**
+   * A {@link ProgressListener} for renames. When a transfer completes successfully, the listener queues the source
+   * key for deletion; when it fails, it flags the failure so that no further copies are started.
+   */
+  private static class RenameProgressListener implements ProgressListener {
+
+    private final DeleteObjectsRequest.KeyVersion keyVersion;
+    private final Queue<DeleteObjectsRequest.KeyVersion> deleteQueue;
+    private final AtomicBoolean copyFailure;
+
+    RenameProgressListener(DeleteObjectsRequest.KeyVersion keyVersion,
+        Queue<DeleteObjectsRequest.KeyVersion> deleteQueue,
+        AtomicBoolean copyFailure) {
+      this.keyVersion = keyVersion;
+      this.deleteQueue = deleteQueue;
+      this.copyFailure = copyFailure;
+    }
+
+    @Override
+    public void progressChanged(ProgressEvent progressEvent) {
+      switch (progressEvent.getEventType()) {
+      case CLIENT_REQUEST_SUCCESS_EVENT:
+        // Copy succeeded: the source object can now be deleted
+        if (keyVersion != null) {
+          deleteQueue.add(keyVersion);
+        }
+        break;
+      case CLIENT_REQUEST_FAILED_EVENT:
+        copyFailure.set(true);
+        break;
+      default:
+        break;
+      }
+    }
+  }
+}
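The deletion thread is a single consumer on the shared queue, and END_OF_KEYS_TO_DELETE is a poison pill: a sentinel enqueued after the last real key, compared by reference, that tells the consumer to flush its final batch and exit. A stripped-down, self-contained sketch of the same pattern (the class and method names here are hypothetical, not part of the patch):

    import java.util.ArrayList;
    import java.util.List;
    import java.util.concurrent.BlockingQueue;

    class BatchingDeleter {
      // Poison pill: checked by reference, so a dedicated instance is safe.
      static final String POISON = new String("EOF");

      private final List<String> batch = new ArrayList<>();

      void consume(BlockingQueue<String> queue, int maxBatch) throws InterruptedException {
        while (true) {
          String item = queue.take();  // blocks until work (or the pill) arrives
          if (item == POISON) {
            flush();                   // drain any partial batch, then exit
            return;
          }
          batch.add(item);
          if (batch.size() == maxBatch) {
            flush();                   // full batch: one bulk delete request
          }
        }
      }

      private void flush() {
        // Stand-in for fs.removeKeys(batch, true, false) in the patch.
        batch.clear();
      }
    }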
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java
index e76ef0b5be1fc..18f64bf2d54f9 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java
@@ -34,9 +34,6 @@
 import java.util.Map;
 import java.util.Set;
 import java.util.Objects;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 
@@ -65,8 +62,6 @@
 import com.amazonaws.services.s3.model.UploadPartRequest;
 import com.amazonaws.services.s3.model.UploadPartResult;
 import com.amazonaws.services.s3.transfer.Copy;
-import com.amazonaws.services.s3.transfer.TransferManager;
-import com.amazonaws.services.s3.transfer.TransferManagerConfiguration;
 import com.amazonaws.services.s3.transfer.Upload;
 import com.amazonaws.event.ProgressListener;
 import com.amazonaws.event.ProgressEvent;
@@ -143,9 +138,9 @@ public class S3AFileSystem extends FileSystem {
   private Listing listing;
   private long partSize;
   private boolean enableMultiObjectsDelete;
-  private TransferManager transfers;
+  private LazyTransferManager transfers;
+  private LazyTransferManager copies;
   private ListeningExecutorService boundedThreadPool;
-  private ExecutorService unboundedThreadPool;
   private long multiPartThreshold;
   public static final Logger LOG = LoggerFactory.getLogger(S3AFileSystem.class);
   private static final Logger PROGRESS =
@@ -241,11 +236,7 @@ public StorageStatistics provide() {
         }
       });
 
-      int maxThreads = conf.getInt(MAX_THREADS, DEFAULT_MAX_THREADS);
-      if (maxThreads < 2) {
-        LOG.warn(MAX_THREADS + " must be at least 2: forcing to 2.");
-        maxThreads = 2;
-      }
+      int maxThreads = getMaxThreads(conf, MAX_THREADS, DEFAULT_MAX_THREADS);
       int totalTasks = intOption(conf,
           MAX_TOTAL_TASKS, DEFAULT_MAX_TOTAL_TASKS, 1);
       long keepAliveTime = longOption(conf, KEEPALIVE_TIME,
@@ -255,12 +246,6 @@ public StorageStatistics provide() {
           maxThreads + totalTasks,
           keepAliveTime, TimeUnit.SECONDS,
           "s3a-transfer-shared");
-      unboundedThreadPool = new ThreadPoolExecutor(
-          maxThreads, Integer.MAX_VALUE,
-          keepAliveTime, TimeUnit.SECONDS,
-          new LinkedBlockingQueue<Runnable>(),
-          BlockingThreadPoolExecutorService.newDaemonThreadFactory(
-              "s3a-transfer-unbounded"));
 
       int listVersion = conf.getInt(LIST_VERSION, DEFAULT_LIST_VERSION);
       if (listVersion < 1 || listVersion > 2) {
@@ -269,7 +254,8 @@ public StorageStatistics provide() {
       }
       useListV1 = (listVersion == 1);
 
-      initTransferManager();
+      this.transfers =
+          LazyTransferManager.createLazyUploadTransferManager(s3, conf, partSize, multiPartThreshold);
+      this.copies = LazyTransferManager.createLazyCopyTransferManager(s3, conf, partSize, multiPartThreshold);
 
       initCannedAcls(conf);
 
@@ -339,18 +325,6 @@ public S3AInstrumentation getInstrumentation() {
     return instrumentation;
   }
 
-  private void initTransferManager() {
-    TransferManagerConfiguration transferConfiguration =
-        new TransferManagerConfiguration();
-    transferConfiguration.setMinimumUploadPartSize(partSize);
-    transferConfiguration.setMultipartUploadThreshold(multiPartThreshold);
-    transferConfiguration.setMultipartCopyPartSize(partSize);
-    transferConfiguration.setMultipartCopyThreshold(multiPartThreshold);
-
-    transfers = new TransferManager(s3, unboundedThreadPool);
-    transfers.setConfiguration(transferConfiguration);
-  }
-
   private void initCannedAcls(Configuration conf) {
     String cannedACLName = conf.get(CANNED_ACL, DEFAULT_CANNED_ACL);
     if (!cannedACLName.isEmpty()) {
@@ -371,7 +345,7 @@ private void initMultipartUploads(Configuration conf) throws IOException {
           new Date(new Date().getTime() - purgeExistingMultipartAge * 1000);
 
       try {
-        transfers.abortMultipartUploads(bucket, purgeBefore);
+        transfers.get().abortMultipartUploads(bucket, purgeBefore);
       } catch (AmazonServiceException e) {
         if (e.getStatusCode() == 403) {
           instrumentation.errorIgnored();
@@ -485,6 +459,10 @@ public String getBucket() {
     return bucket;
   }
 
+  int getMaxEntriesToDelete() {
+    return MAX_ENTRIES_TO_DELETE;
+  }
+
   /**
    * Change the input policy for this FS.
    * @param inputPolicy new policy
@@ -536,7 +514,7 @@ private String maybeAddTrailingSlash(String key) {
    * @param key input key
    * @return the path from this key
    */
-  private Path keyToPath(String key) {
+  Path keyToPath(String key) {
     return new Path("/" + key);
   }
 
@@ -890,63 +868,37 @@ private boolean innerRename(Path source, Path dest)
             "cannot rename a directory to a subdirectory of itself ");
       }
 
-      List<DeleteObjectsRequest.KeyVersion> keysToDelete = new ArrayList<>();
-      if (dstStatus != null && dstStatus.isEmptyDirectory() == Tristate.TRUE) {
-        // delete unnecessary fake directory.
-        keysToDelete.add(new DeleteObjectsRequest.KeyVersion(dstKey));
-      }
-
-      Path parentPath = keyToPath(srcKey);
-      RemoteIterator<LocatedFileStatus> iterator = listFilesAndEmptyDirectories(
-          parentPath, true);
-      while (iterator.hasNext()) {
-        LocatedFileStatus status = iterator.next();
-        long length = status.getLen();
-        String key = pathToKey(status.getPath());
-        if (status.isDirectory() && !key.endsWith("/")) {
-          key += "/";
-        }
-        keysToDelete
-            .add(new DeleteObjectsRequest.KeyVersion(key));
-        String newDstKey =
-            dstKey + key.substring(srcKey.length());
-        copyFile(key, newDstKey, length);
+      List<CopyContext> copyContexts = new ParallelDirectoryRenamer(this).rename(srcKey, dstKey, dstStatus);
 
-        if (hasMetadataStore()) {
+      if (hasMetadataStore()) {
+        for (CopyContext copyContext : copyContexts) {
           // with a metadata store, the object entries need to be updated,
           // including, potentially, the ancestors
-          Path childSrc = keyToQualifiedPath(key);
-          Path childDst = keyToQualifiedPath(newDstKey);
-          if (objectRepresentsDirectory(key, length)) {
+          Path childSrc = keyToQualifiedPath(copyContext.getSrcKey());
+          Path childDst = keyToQualifiedPath(copyContext.getDestKey());
+          if (objectRepresentsDirectory(copyContext.getSrcKey(), copyContext.getLength())) {
             S3Guard.addMoveDir(metadataStore, srcPaths, dstMetas, childSrc,
                 childDst, username);
           } else {
             S3Guard.addMoveFile(metadataStore, srcPaths, dstMetas, childSrc,
-                childDst, length, getDefaultBlockSize(childDst), username);
+                childDst, copyContext.getLength(), getDefaultBlockSize(childDst), username);
           }
           // Ancestor directories may not be listed, so we explicitly add them
           S3Guard.addMoveAncestors(metadataStore, srcPaths, dstMetas,
               keyToQualifiedPath(srcKey), childSrc, childDst, username);
         }
-
-        if (keysToDelete.size() == MAX_ENTRIES_TO_DELETE) {
-          removeKeys(keysToDelete, true, false);
-        }
-      }
-      if (!keysToDelete.isEmpty()) {
-        removeKeys(keysToDelete, false, false);
       }
+      }
 
-      // We moved all the children, now move the top-level dir
-      // Empty directory should have been added as the object summary
-      if (hasMetadataStore()
-          && srcPaths != null
-          && !srcPaths.contains(src)) {
-        LOG.debug("To move the non-empty top-level dir src={} and dst={}",
-            src, dst);
-        S3Guard.addMoveDir(metadataStore, srcPaths, dstMetas, src, dst,
-            username);
-      }
+      // We moved all the children, now move the top-level dir
+      // Empty directory should have been added as the object summary
+      if (hasMetadataStore()
+          && srcPaths != null
+          && !srcPaths.contains(src)) {
+        LOG.debug("To move the non-empty top-level dir src={} and dst={}",
+            src, dst);
+        S3Guard.addMoveDir(metadataStore, srcPaths, dstMetas, src, dst,
+            username);
       }
 
       metadataStore.move(srcPaths, dstMetas);
@@ -1256,7 +1208,7 @@ public UploadInfo putObject(PutObjectRequest putObjectRequest) {
     }
     incrementPutStartStatistics(len);
     try {
-      Upload upload = transfers.upload(putObjectRequest);
+      Upload upload = transfers.get().upload(putObjectRequest);
       incrementPutCompletedStatistics(true, len);
       return new UploadInfo(upload, len);
     } catch (AmazonClientException e) {
@@ -1391,7 +1343,6 @@ public void incrementPutProgressStatistics(String key, long bytes) {
    * be deleted in a multiple object delete operation.
    * @throws AmazonClientException amazon-layer failure.
    */
-  @VisibleForTesting
   void removeKeys(List<DeleteObjectsRequest.KeyVersion> keysToDelete,
       boolean clearKeys, boolean deleteFakeDir)
       throws MultiObjectDeleteException, AmazonClientException,
@@ -2138,9 +2089,17 @@ public void close() throws IOException {
       super.close();
     } finally {
       if (transfers != null) {
-        transfers.shutdownNow(true);
+        if (transfers.isInitialized()) {
+          transfers.get().shutdownNow(true);
+        }
         transfers = null;
       }
+      if (copies != null) {
+        if (copies.isInitialized()) {
+          copies.get().shutdownNow(true);
+        }
+        copies = null;
+      }
       if (metadataStore != null) {
         metadataStore.close();
         metadataStore = null;
@@ -2158,7 +2117,8 @@ public String getCanonicalServiceName() {
   }
 
   /**
-   * Copy a single object in the bucket via a COPY operation.
+   * Copy a single object in the bucket via a COPY operation. Waits until
+   * the copy has completed before returning.
    * @param srcKey source object path
    * @param dstKey destination object path
    * @param size object size
@@ -2166,8 +2126,36 @@ public String getCanonicalServiceName() {
    * @throws InterruptedIOException the operation was interrupted
    * @throws IOException Other IO problems
    */
-  private void copyFile(String srcKey, String dstKey, long size)
+  void copyFile(String srcKey, String dstKey, long size)
       throws IOException, InterruptedIOException, AmazonClientException {
+    try {
+      Copy copy = copyFileAsync(srcKey, dstKey);
+      copy.waitForCopyResult();
+      incrementWriteOperations();
+      instrumentation.filesCopied(1, size);
+    } catch (InterruptedException e) {
+      throw new InterruptedIOException("Interrupted copying " + srcKey
+          + " to " + dstKey + ", cancelling");
+    }
+  }
+
+  Copy copyFileAsync(String srcKey, String dstKey)
+      throws IOException, InterruptedIOException, AmazonClientException {
+    return copyFileAsync(srcKey, dstKey, null);
+  }
+
+  /**
+   * Copy a single object in the bucket via a COPY operation, without
+   * waiting for the copy to complete.
+   * @param srcKey source object path
+   * @param dstKey destination object path
+   * @param generalProgressListener listener to track the progress of the copy object request
+   * @throws AmazonClientException on failures inside the AWS SDK
+   * @throws IOException Other IO problems
+   * @return a copy object to track the progress of the copy
+   */
+  Copy copyFileAsync(String srcKey, String dstKey, ProgressListener generalProgressListener)
+      throws IOException, AmazonClientException {
     LOG.debug("copyFile {} -> {} ", srcKey, dstKey);
 
     try {
@@ -2179,6 +2167,9 @@ private void copyFile(String srcKey, String dstKey, long size)
       setOptionalCopyObjectRequestParameters(copyObjectRequest);
       copyObjectRequest.setCannedAccessControlList(cannedACL);
       copyObjectRequest.setNewObjectMetadata(dstom);
+      if (generalProgressListener != null) {
+        copyObjectRequest.setGeneralProgressListener(generalProgressListener);
+      }
 
       ProgressListener progressListener = new ProgressListener() {
         public void progressChanged(ProgressEvent progressEvent) {
@@ -2192,16 +2183,9 @@ public void progressChanged(ProgressEvent progressEvent) {
         }
       };
 
-      Copy copy = transfers.copy(copyObjectRequest);
+      Copy copy = copies.get().copy(copyObjectRequest);
       copy.addProgressListener(progressListener);
-      try {
-        copy.waitForCopyResult();
-        incrementWriteOperations();
-        instrumentation.filesCopied(1, size);
-      } catch (InterruptedException e) {
-        throw new InterruptedIOException("Interrupted copying " + srcKey
-            + " to " + dstKey + ", cancelling");
-      }
+      return copy;
     } catch (AmazonClientException e) {
       throw translateException("copyFile("+ srcKey+ ", " + dstKey + ")",
           srcKey, e);
@@ -2485,7 +2469,8 @@ public String toString() {
     sb.append(", authoritative=").append(allowAuthoritative);
     sb.append(", useListV1=").append(useListV1);
     sb.append(", boundedExecutor=").append(boundedThreadPool);
-    sb.append(", unboundedExecutor=").append(unboundedThreadPool);
+    sb.append(", upload executor=").append(transfers.getExecutorService());
+    sb.append(", copy executor=").append(copies.getExecutorService());
     sb.append(", statistics {")
         .append(statistics)
         .append("}");
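With the copy path split into copyFile and copyFileAsync, a synchronous copy is now just the asynchronous call plus a wait. A sketch of driving several copies concurrently through the new package-private API (hypothetical caller in the same package; fs, srcKeys, srcPrefix and dstPrefix are assumed to be in scope):

    // Start all copies without blocking, then wait for each result.
    List<Copy> pending = new ArrayList<>();
    for (String key : srcKeys) {
      pending.add(fs.copyFileAsync(key, dstPrefix + key.substring(srcPrefix.length())));
    }
    for (Copy copy : pending) {
      try {
        copy.waitForCopyResult();  // completes on the copy thread pool
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
        throw new InterruptedIOException("copy interrupted");
      }
    }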
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AUtils.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AUtils.java
index 9dd5def2c1e1e..0b31db39b642d 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AUtils.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AUtils.java
@@ -881,4 +881,12 @@ public static void closeAll(Logger log,
     }
   }
 
+  static int getMaxThreads(Configuration conf, String maxThreadsKey, int defaultMaxThreads) {
+    int maxThreads = conf.getInt(maxThreadsKey, defaultMaxThreads);
+    if (maxThreads < 2) {
+      LOG.warn(maxThreadsKey + " must be at least 2: forcing to 2.");
+      maxThreads = 2;
+    }
+    return maxThreads;
+  }
 }
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/scale/ITestS3AConcurrentOps.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/scale/ITestS3AConcurrentOps.java
index e320bb21918de..4573f8d233425 100644
--- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/scale/ITestS3AConcurrentOps.java
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/scale/ITestS3AConcurrentOps.java
@@ -94,6 +94,7 @@ public void setup() throws Exception {
   private S3AFileSystem getRestrictedFileSystem() throws Exception {
     Configuration conf = getConfiguration();
     conf.setInt(MAX_THREADS, 2);
+    conf.setInt(COPY_MAX_THREADS, 2);
     conf.setInt(MAX_TOTAL_TASKS, 1);
     conf.set(MIN_MULTIPART_THRESHOLD, "10M");