diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AbfsConfiguration.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AbfsConfiguration.java index 105e7a104e63c..4179da21fd5e9 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AbfsConfiguration.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AbfsConfiguration.java @@ -21,6 +21,7 @@ import java.io.IOException; import java.lang.reflect.Field; +import org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys; import org.apache.hadoop.fs.azurebfs.services.PrefixMode; import org.apache.hadoop.thirdparty.com.google.common.annotations.VisibleForTesting; import org.apache.hadoop.thirdparty.com.google.common.base.Preconditions; @@ -255,7 +256,7 @@ public class AbfsConfiguration{ private int readAheadQueueDepth; @IntegerConfigurationValidatorAnnotation(ConfigurationKey = FS_AZURE_BLOB_DIR_RENAME_MAX_THREAD, - DefaultValue = 0) + DefaultValue = DEFAULT_FS_AZURE_BLOB_RENAME_THREAD) private int blobDirRenameMaxThread; @LongConfigurationValidatorAnnotation(ConfigurationKey = FS_AZURE_BLOB_COPY_PROGRESS_POLL_WAIT_MILLIS, @@ -343,6 +344,13 @@ public class AbfsConfiguration{ FS_AZURE_ENABLE_ABFS_LIST_ITERATOR, DefaultValue = DEFAULT_ENABLE_ABFS_LIST_ITERATOR) private boolean enableAbfsListIterator; + @IntegerConfigurationValidatorAnnotation(ConfigurationKey = + FS_AZURE_PRODUCER_QUEUE_MAX_SIZE, DefaultValue = DEFAULT_FS_AZURE_PRODUCER_QUEUE_MAX_SIZE) + private int producerQueueMaxSize; + + @BooleanConfigurationValidatorAnnotation(ConfigurationKey=FS_AZURE_LEASE_CREATE_NON_RECURSIVE, DefaultValue = DEFAULT_FS_AZURE_LEASE_CREATE_NON_RECURSIVE) + private boolean leaseOnCreateNonRecursive; + public AbfsConfiguration(final Configuration rawConfig, String accountName) throws IllegalAccessException, InvalidConfigurationValueException, IOException { this.rawConfig = 
ProviderUtils.excludeIncompatibleCredentialProviders( @@ -1222,4 +1230,12 @@ public void setOptimizeFooterRead(boolean optimizeFooterRead) { public void setEnableAbfsListIterator(boolean enableAbfsListIterator) { this.enableAbfsListIterator = enableAbfsListIterator; } + + public int getProducerQueueMaxSize() { + return producerQueueMaxSize; + } + + public boolean isLeaseOnCreateNonRecursive() { + return leaseOnCreateNonRecursive; + } } diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystem.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystem.java index a8865de16a3fc..98dbb0f11a884 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystem.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystem.java @@ -44,6 +44,7 @@ import java.util.concurrent.Future; import org.apache.hadoop.fs.azurebfs.contracts.exceptions.InvalidConfigurationValueException; +import org.apache.hadoop.fs.azurebfs.services.AbfsBlobLease; import org.apache.hadoop.fs.azurebfs.services.BlobProperty; import org.apache.hadoop.classification.VisibleForTesting; import org.apache.hadoop.fs.azurebfs.services.OperativeEndpoint; @@ -114,6 +115,7 @@ import static org.apache.hadoop.fs.CommonConfigurationKeys.IOSTATISTICS_LOGGING_LEVEL; import static org.apache.hadoop.fs.CommonConfigurationKeys.IOSTATISTICS_LOGGING_LEVEL_DEFAULT; import static org.apache.hadoop.fs.azurebfs.AbfsStatistic.*; +import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.BLOB_LEASE_ONE_MINUTE_DURATION; import static org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.FS_AZURE_ENABLE_BLOB_ENDPOINT; import static org.apache.hadoop.fs.azurebfs.constants.FileSystemUriSchemes.ABFS_DNS_PREFIX; import static org.apache.hadoop.fs.azurebfs.constants.FileSystemUriSchemes.WASB_DNS_PREFIX; @@ -424,6 +426,29 @@ private boolean 
shouldRedirect(FSOperationType type, TracingContext context) @Override public FSDataOutputStream create(final Path f, final FsPermission permission, final boolean overwrite, final int bufferSize, final short replication, final long blockSize, final Progressable progress) throws IOException { + return create(f, permission, overwrite, bufferSize, replication, blockSize, + progress, false); + } + + /** + * Creates a file in the file system with the specified parameters. + * @param f the path of the file to create + * @param permission the permission of the file + * @param overwrite whether to overwrite the existing file if any + * @param bufferSize the size of the buffer to be used + * @param replication the number of replicas for the file + * @param blockSize the size of the block for the file + * @param progress the progress indicator for the file creation + * @param blobParentDirPresentChecked whether the presence of parent directory + * been checked + * @return a FSDataOutputStream object that can be used to write to the file + * @throws IOException if an error occurs while creating the file + */ + private FSDataOutputStream create(final Path f, + final FsPermission permission, + final boolean overwrite, final int bufferSize, + final short replication, + final long blockSize, final Progressable progress, final Boolean blobParentDirPresentChecked) throws IOException { LOG.debug("AzureBlobFileSystem.create path: {} permission: {} overwrite: {} bufferSize: {}", f, permission, @@ -449,9 +474,11 @@ public FSDataOutputStream create(final Path f, final FsPermission permission, fi if (prefixMode == PrefixMode.BLOB) { validatePathOrSubPathDoesNotExist(qualifiedPath, tracingContext); - Path parent = qualifiedPath.getParent(); - if (parent != null && !parent.isRoot()) { + if (!blobParentDirPresentChecked) { + Path parent = qualifiedPath.getParent(); + if (parent != null && !parent.isRoot()) { mkdirs(parent); + } } } @@ -478,14 +505,36 @@ public FSDataOutputStream 
createNonRecursive(final Path f, final FsPermission pe TracingContext tracingContext = new TracingContext(clientCorrelationId, fileSystemId, FSOperationType.CREATE_NON_RECURSIVE, tracingHeaderFormat, listener); + /* + * Get exclusive access to folder if this is a directory designated for atomic + * rename. The primary use case of the HBase write-ahead log file management. + */ + AbfsBlobLease abfsBlobLease = null; + String parentPath = parent.toUri().getPath(); + if (getAbfsStore().getPrefixMode() == PrefixMode.BLOB + && getAbfsStore().isAtomicRenameKey(parentPath)) { + if (getAbfsStore().getAbfsConfiguration().isLeaseOnCreateNonRecursive()) { + abfsBlobLease = new AbfsBlobLease(getAbfsClient(), + parentPath, BLOB_LEASE_ONE_MINUTE_DURATION, tracingContext); + } + } final FileStatus parentFileStatus = tryGetFileStatus(parent, tracingContext); - if (parentFileStatus == null) { + if (parentFileStatus == null || !parentFileStatus.isDirectory()) { + if (abfsBlobLease != null) { + abfsBlobLease.free(); + } throw new FileNotFoundException("Cannot create file " - + f.getName() + " because parent folder does not exist."); + + f.getName() + + " because parent folder does not exist or is a file."); } - return create(f, permission, overwrite, bufferSize, replication, blockSize, progress); + final FSDataOutputStream outputStream = create(f, permission, overwrite, + bufferSize, replication, blockSize, progress, true); + if (abfsBlobLease != null) { + abfsBlobLease.free(); + } + return outputStream; } @Override diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystemStore.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystemStore.java index 1e4b2cfcc1928..8b74316c60920 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystemStore.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystemStore.java @@ -61,6 
+61,11 @@ import org.apache.hadoop.fs.azurebfs.contracts.exceptions.InvalidConfigurationValueException; import org.apache.hadoop.fs.azurebfs.enums.BlobCopyProgress; +import org.apache.hadoop.fs.azurebfs.services.AbfsBlobLease; +import org.apache.hadoop.fs.azurebfs.services.AbfsDfsLease; +import org.apache.hadoop.fs.azurebfs.services.ListBlobConsumer; +import org.apache.hadoop.fs.azurebfs.services.ListBlobProducer; +import org.apache.hadoop.fs.azurebfs.services.ListBlobQueue; import org.apache.hadoop.fs.azurebfs.services.OperativeEndpoint; import org.apache.hadoop.fs.azurebfs.services.AbfsHttpHeader; import org.apache.hadoop.fs.azurebfs.services.PrefixMode; @@ -147,9 +152,9 @@ import static java.net.HttpURLConnection.HTTP_CONFLICT; import static java.net.HttpURLConnection.HTTP_NOT_FOUND; +import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.BLOB_LEASE_ONE_MINUTE_DURATION; import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.FORWARD_SLASH; import static org.apache.hadoop.fs.azurebfs.constants.HttpHeaderConfigurations.X_MS_METADATA_PREFIX; -import static org.apache.hadoop.fs.azurebfs.services.RenameAtomicityUtils.SUFFIX; import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.CHAR_EQUALS; import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.CHAR_FORWARD_SLASH; import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.CHAR_HYPHEN; @@ -177,6 +182,7 @@ import static org.apache.hadoop.fs.azurebfs.contracts.services.AzureServiceErrorCode.COPY_BLOB_ABORTED; import static org.apache.hadoop.fs.azurebfs.contracts.services.AzureServiceErrorCode.COPY_BLOB_FAILED; import static org.apache.hadoop.fs.azurebfs.services.AbfsErrors.PATH_EXISTS; +import static org.apache.hadoop.fs.azurebfs.services.RenameAtomicityUtils.SUFFIX; /** * Provides the bridging logic between Hadoop's abstract filesystem and Azure Storage. 
@@ -207,8 +213,6 @@ public class AzureBlobFileSystemStore implements Closeable, ListingSupport { private final AbfsCounters abfsCounters; private PrefixMode prefixMode; - private final ExecutorService renameBlobExecutorService; - /** * The set of directories where we should store files as append blobs. */ @@ -299,14 +303,6 @@ public AzureBlobFileSystemStore( abfsConfiguration.getMaxWriteRequestsToQueue(), 10L, TimeUnit.SECONDS, "abfs-bounded"); - if (abfsConfiguration.getBlobDirRenameMaxThread() == 0) { - renameBlobExecutorService = Executors.newFixedThreadPool( - Runtime.getRuntime() - .availableProcessors()); - } else { - renameBlobExecutorService = Executors.newFixedThreadPool( - abfsConfiguration.getBlobDirRenameMaxThread()); - } } /** @@ -559,19 +555,21 @@ public void setPathProperties(final Path path, * Orchestrates the copying of blob from given source to a given destination. * @param srcPath source path * @param dstPath destination path + * @param copySrcLeaseId leaseId on the source * @param tracingContext object of TracingContext used for the tracing of the * server calls. + * * @throws AzureBlobFileSystemException exception thrown from the server calls, * or if it is discovered that the copying is failed or aborted. 
*/ @VisibleForTesting void copyBlob(Path srcPath, Path dstPath, - TracingContext tracingContext) throws AzureBlobFileSystemException { + final String copySrcLeaseId, TracingContext tracingContext) throws AzureBlobFileSystemException { AbfsRestOperation copyOp = null; try { copyOp = client.copyBlob(srcPath, dstPath, - tracingContext); + copySrcLeaseId, tracingContext); } catch (AbfsRestOperationException ex) { if (ex.getStatusCode() == HttpURLConnection.HTTP_CONFLICT) { final BlobProperty dstBlobProperty = getBlobProperty(dstPath, @@ -1385,134 +1383,58 @@ public void rename(final Path source, final Path destination, if (getAbfsConfiguration().getPrefixMode() == PrefixMode.BLOB) { LOG.debug("Rename for src: {} dst: {} for non-HNS blob-endpoint", source, destination); - final Boolean isSrcExist; - final Boolean isSrcDir; /* * Fetch the list of blobs in the given sourcePath. */ - List srcBlobProperties = getListBlobs(source, null, null, - tracingContext, null, true); - BlobProperty blobPropOnSrc; + StringBuilder listSrcBuilder = new StringBuilder( + source.toUri().getPath()); + if (!source.isRoot()) { + listSrcBuilder.append(FORWARD_SLASH); + } + String listSrc = listSrcBuilder.toString(); + BlobList blobList = client.getListBlobs(null, listSrc, null, null, + tracingContext).getResult() + .getBlobList(); + List srcBlobProperties = blobList.getBlobPropertyList(); + if (srcBlobProperties.size() > 0) { - LOG.debug("src {} exists and is a directory", source); - isSrcExist = true; - isSrcDir = true; + orchestrateBlobRenameDir(source, destination, renameAtomicityUtils, + tracingContext, listSrc, blobList); + } else { /* - * Fetch if there is a marker-blob for the source blob. - */ - BlobProperty blobPropOnSrcNullable; + * Source doesn't have any hierarchy. It can either be marker or non-marker blob. + * Or there can be no blob on the path. + * Rename procedure will start. If its a file or a marker file, it will be renamed. 
+ * In case there is no blob on the path, server will return exception. + */ + LOG.debug("source {} doesn't have any blob in its hierarchy. " + + "Starting rename process on the source.", source); + + AbfsLease lease = null; try { - blobPropOnSrcNullable = getBlobProperty(source, tracingContext); - } catch (AbfsRestOperationException ex) { - if (ex.getStatusCode() != HttpURLConnection.HTTP_NOT_FOUND) { - throw ex; + if (isAtomicRenameKey(source.toUri().getPath())) { + lease = getBlobLease(source.toUri().getPath(), + BLOB_LEASE_ONE_MINUTE_DURATION, tracingContext); } - blobPropOnSrcNullable = null; - } - if (blobPropOnSrcNullable == null) { - /* - * There is no marker-blob, the client has to create marker blob before - * starting the rename. - */ - //create marker file; add in srcBlobProperties; - LOG.debug("Source {} is a directory but there is no marker-blob", - source); - createDirectory(source, null, FsPermission.getDirDefault(), - FsPermission.getUMask( - getAbfsConfiguration().getRawConfiguration()), - tracingContext); - blobPropOnSrc = new BlobProperty(); - blobPropOnSrc.setIsDirectory(true); - blobPropOnSrc.setPath(source); - } else { - LOG.debug("Source {} is a directory but there is a marker-blob", - source); - blobPropOnSrc = blobPropOnSrcNullable; - } - } else { - LOG.debug("source {} doesn't have any blob in its hierarchy. 
Checking" - + "if there is marker blob for it.", source); - try { - blobPropOnSrc = getBlobProperty(source, tracingContext); - } catch (AbfsRestOperationException ex) { - if (ex.getStatusCode() != HttpURLConnection.HTTP_NOT_FOUND) { - throw ex; + renameBlob(source, destination, lease, tracingContext); + } catch (AzureBlobFileSystemException ex) { + if (lease != null) { + lease.free(); } - blobPropOnSrc = null; - } - - if (blobPropOnSrc != null) { - isSrcExist = true; - if (blobPropOnSrc.getIsDirectory()) { - LOG.debug("source {} is a marker blob", source); - isSrcDir = true; - } else { - LOG.debug("source {} exists but is not a marker blob", source); - isSrcDir = false; + LOG.error( + String.format("Rename of path from %s to %s failed", + source, destination), ex); + if (ex instanceof AbfsRestOperationException + && ((AbfsRestOperationException) ex).getStatusCode() + == HTTP_NOT_FOUND) { + AbfsRestOperationException ex1 = (AbfsRestOperationException) ex; + throw new AbfsRestOperationException( + ex1.getStatusCode(), + AzureServiceErrorCode.SOURCE_PATH_NOT_FOUND.getErrorCode(), + ex1.getErrorMessage(), ex1); } - } else { - LOG.debug("source {} doesn't exist", source); - isSrcExist = false; - isSrcDir = false; - } - } - srcBlobProperties.add(blobPropOnSrc); - - if (!isSrcExist) { - LOG.info("source {} doesn't exists", source); - throw new AbfsRestOperationException(HttpURLConnection.HTTP_NOT_FOUND, - AzureServiceErrorCode.SOURCE_PATH_NOT_FOUND.getErrorCode(), null, - null); - } - if (isSrcDir) { - /* - * If source is a directory, all the blobs in the directory have to be - * individually copied and then deleted at the source. 
- */ - LOG.debug("source {} is a directory", source); - if (isAtomicRenameKey(source.toUri().getPath())) { - LOG.debug("source dir {} is an atomicRenameKey", - source.toUri().getPath()); - renameAtomicityUtils.preRename(srcBlobProperties, isCreateOperationOnBlobEndpoint()); - } else { - LOG.debug("source dir {} is not an atomicRenameKey", - source.toUri().getPath()); - } - List futures = new ArrayList<>(); - for (BlobProperty blobProperty : srcBlobProperties) { - futures.add(renameBlobExecutorService.submit(() -> { - try { - renameBlob( - createDestinationPathForBlobPartOfRenameSrcDir(destination, - blobProperty, source), - tracingContext, blobProperty.getPath()); - } catch (AzureBlobFileSystemException e) { - LOG.error(String.format("rename from %s to %s for blob %s failed", - source, destination, blobProperty.getPath()), e); - throw new RuntimeException(e); - } - })); - } - for (Future future : futures) { - try { - future.get(); - } catch (InterruptedException e) { - LOG.error(String.format("rename from %s to %s failed", source, - destination), e); - throw new RuntimeException(e); - } catch (ExecutionException e) { - LOG.error(String.format("rename from %s to %s failed", source, - destination), e); - throw new RuntimeException(e); - } - } - if (renameAtomicityUtils != null) { - renameAtomicityUtils.cleanup(); + throw ex; } - } else { - LOG.debug("source {} is not directory", source); - renameBlob(destination, tracingContext, - srcBlobProperties.get(0).getPath()); } LOG.info("Rename from source {} to destination {} done", source, destination); @@ -1552,6 +1474,163 @@ public void rename(final Path source, final Path destination, } while (shouldContinue); } + private void orchestrateBlobRenameDir(final Path source, + final Path destination, + final RenameAtomicityUtils renameAtomicityUtils, + final TracingContext tracingContext, + final String listSrc, + final BlobList blobList) throws IOException { + ListBlobQueue listBlobQueue = new ListBlobQueue( + 
blobList.getBlobPropertyList(), + getAbfsConfiguration().getProducerQueueMaxSize(), + getAbfsConfiguration().getBlobDirRenameMaxThread()); + + if (blobList.getNextMarker() != null) { + new ListBlobProducer(listSrc, + client, listBlobQueue, blobList.getNextMarker(), tracingContext); + } else { + listBlobQueue.complete(); + } + LOG.debug("src {} exists and is a directory", source); + /* + * Fetch if there is a marker-blob for the source blob. + */ + BlobProperty blobPropOnSrcNullable; + try { + blobPropOnSrcNullable = getBlobProperty(source, tracingContext); + } catch (AbfsRestOperationException ex) { + if (ex.getStatusCode() != HttpURLConnection.HTTP_NOT_FOUND) { + throw ex; + } + blobPropOnSrcNullable = null; + } + + if (blobPropOnSrcNullable == null) { + /* + * There is no marker-blob, the client has to create marker blob before + * starting the rename. + */ + LOG.debug("Source {} is a directory but there is no marker-blob", + source); + createDirectory(source, null, FsPermission.getDirDefault(), + FsPermission.getUMask( + getAbfsConfiguration().getRawConfiguration()), + tracingContext); + } else { + LOG.debug("Source {} is a directory but there is a marker-blob", + source); + } + /* + * If source is a directory, all the blobs in the directory have to be + * individually copied and then deleted at the source. 
+ */ + LOG.debug("source {} is a directory", source); + final AbfsBlobLease srcDirLease; + final Boolean isAtomicRename; + if (isAtomicRenameKey(source.toUri().getPath())) { + LOG.debug("source dir {} is an atomicRenameKey", + source.toUri().getPath()); + srcDirLease = getBlobLease(source.toUri().getPath(), + BLOB_LEASE_ONE_MINUTE_DURATION, + tracingContext); + renameAtomicityUtils.preRename( + isCreateOperationOnBlobEndpoint()); + isAtomicRename = true; + } else { + srcDirLease = null; + isAtomicRename = false; + LOG.debug("source dir {} is not an atomicRenameKey", + source.toUri().getPath()); + } + + renameBlobDir(source, destination, tracingContext, listBlobQueue, + srcDirLease, isAtomicRename); + + if (isAtomicRename) { + renameAtomicityUtils.cleanup(); + } + } + + @VisibleForTesting + AbfsBlobLease getBlobLease(final String source, + final Integer blobLeaseOneMinuteDuration, + final TracingContext tracingContext) throws AzureBlobFileSystemException { + return new AbfsBlobLease(client, source, blobLeaseOneMinuteDuration, + tracingContext); + } + + private void renameBlobDir(final Path source, + final Path destination, + final TracingContext tracingContext, + final ListBlobQueue listBlobQueue, + final AbfsBlobLease srcDirBlobLease, + final Boolean isAtomicRename) throws AzureBlobFileSystemException { + List blobList; + ListBlobConsumer listBlobConsumer = new ListBlobConsumer(listBlobQueue); + final ExecutorService renameBlobExecutorService + = Executors.newFixedThreadPool( + getAbfsConfiguration().getBlobDirRenameMaxThread()); + while(!listBlobConsumer.isCompleted()) { + blobList = listBlobConsumer.consume(); + if(blobList == null) { + continue; + } + List futures = new ArrayList<>(); + for (BlobProperty blobProperty : blobList) { + futures.add(renameBlobExecutorService.submit(() -> { + try { + AbfsBlobLease blobLease = null; + if (isAtomicRename) { + /* + * Conditionally get a lease on the source blob to prevent other writers + * from changing it. 
This is used for correctness in HBase when log files + * are renamed. It generally should do no harm other than take a little + * more time for other rename scenarios. When the HBase master renames a + * log file folder, the lease locks out other writers. This + * prevents a region server that the master thinks is dead, but is still + * alive, from committing additional updates. This is different than + * when HBase runs on HDFS, where the region server recovers the lease + * on a log file, to gain exclusive access to it, before it splits it. + */ + blobLease = getBlobLease(blobProperty.getPath().toUri().getPath(), + BLOB_LEASE_ONE_MINUTE_DURATION, tracingContext); + } + renameBlob( + blobProperty.getPath(), + createDestinationPathForBlobPartOfRenameSrcDir(destination, + blobProperty.getPath(), source), + blobLease, + tracingContext); + } catch (AzureBlobFileSystemException e) { + LOG.error(String.format("rename from %s to %s for blob %s failed", + source, destination, blobProperty.getPath()), e); + throw new RuntimeException(e); + } + })); + } + for (Future future : futures) { + try { + future.get(); + } catch (InterruptedException | ExecutionException e) { + LOG.error(String.format("rename from %s to %s failed", source, + destination), e); + renameBlobExecutorService.shutdown(); + if (srcDirBlobLease != null) { + srcDirBlobLease.free(); + } + throw new RuntimeException(e); + } + } + } + renameBlobExecutorService.shutdown(); + + renameBlob( + source, createDestinationPathForBlobPartOfRenameSrcDir(destination, + source, source), + srcDirBlobLease, + tracingContext); + } + private Boolean isCreateOperationOnBlobEndpoint() { return !OperativeEndpoint.isIngressEnabledOnDFS(prefixMode, abfsConfiguration); } @@ -1559,17 +1638,18 @@ private Boolean isCreateOperationOnBlobEndpoint() { /** * Translates the destination path for a blob part of a source directory getting * renamed. 
+ * * @param destinationDir destination directory for the rename operation - * @param srcBlobProperty blob part of the source directory getting renamed + * @param blobPath path of blob inside sourceDir being renamed. * @param sourceDir source directory for the rename operation + * * @return translated path for the blob */ private Path createDestinationPathForBlobPartOfRenameSrcDir(final Path destinationDir, - final BlobProperty srcBlobProperty, - final Path sourceDir) { + final Path blobPath, final Path sourceDir) { String destinationPathStr = destinationDir.toUri().getPath(); String sourcePathStr = sourceDir.toUri().getPath(); - String srcBlobPropertyPathStr = srcBlobProperty.getPath().toUri().getPath(); + String srcBlobPropertyPathStr = blobPath.toUri().getPath(); if (sourcePathStr.equals(srcBlobPropertyPathStr)) { return destinationDir; } @@ -1581,22 +1661,31 @@ private Path createDestinationPathForBlobPartOfRenameSrcDir(final Path destinati * Renames blob. * It copies the source blob to the destination. After copy is succesful, it * deletes the source blob - * @param destination destination path to which the source has to be moved + * * @param sourcePath source path which gets copied to the destination + * @param destination destination path to which the source has to be moved + * @param lease lease of the srcBlob * @param tracingContext tracingContext for tracing the API calls + * * @throws AzureBlobFileSystemException exception in making server calls */ - private void renameBlob(final Path destination, - final TracingContext tracingContext, - final Path sourcePath) throws AzureBlobFileSystemException { - copyBlob(sourcePath, destination, tracingContext); - deleteBlob(sourcePath, tracingContext); + private void renameBlob(final Path sourcePath, final Path destination, + final AbfsLease lease, final TracingContext tracingContext) + throws AzureBlobFileSystemException { + copyBlob(sourcePath, destination, lease != null ? 
lease.getLeaseID() : null, + tracingContext); + deleteBlob(sourcePath, lease, tracingContext); } private void deleteBlob(final Path sourcePath, - final TracingContext tracingContext) throws AzureBlobFileSystemException { + final AbfsLease lease, final TracingContext tracingContext) + throws AzureBlobFileSystemException { try { - client.deleteBlobPath(sourcePath, tracingContext); + client.deleteBlobPath(sourcePath, + lease != null ? lease.getLeaseID() : null, tracingContext); + if (lease != null) { + lease.cancelTimer(); + } } catch (AbfsRestOperationException ex) { if (ex.getStatusCode() != HttpURLConnection.HTTP_NOT_FOUND) { throw ex; @@ -2374,25 +2463,23 @@ public boolean isAtomicRenameKey(String key) { RenameAtomicityUtils.RedoRenameInvocation getRedoRenameInvocation(final TracingContext tracingContext) { return new RenameAtomicityUtils.RedoRenameInvocation() { @Override - public void redo(final Path destination, final List sourcePaths, - final List destinationSuffix) + public void redo(final Path destination, final Path src) throws AzureBlobFileSystemException { - for (int i = 0; i < sourcePaths.size(); i++) { - try { - final Path destinationPath; - if (destinationSuffix.get(i).isEmpty()) { - destinationPath = destination; - } else { - destinationPath = new Path(destination, destinationSuffix.get(i)); - } - renameBlob(destinationPath, tracingContext, sourcePaths.get(i)); - } catch (AbfsRestOperationException ex) { - if (ex.getStatusCode() == HttpURLConnection.HTTP_NOT_FOUND) { - continue; - } - throw ex; - } + + ListBlobQueue listBlobQueue = new ListBlobQueue( + getAbfsConfiguration().getProducerQueueMaxSize(), + getAbfsConfiguration().getBlobDirRenameMaxThread()); + StringBuilder listSrcBuilder = new StringBuilder(src.toUri().getPath()); + if (!src.isRoot()) { + listSrcBuilder.append(FORWARD_SLASH); } + String listSrc = listSrcBuilder.toString(); + new ListBlobProducer(listSrc, client, listBlobQueue, null, + tracingContext); + AbfsBlobLease abfsBlobLease = 
getBlobLease(src.toUri().getPath(), + BLOB_LEASE_ONE_MINUTE_DURATION, tracingContext); + renameBlobDir(src, destination, tracingContext, listBlobQueue, + abfsBlobLease, true); } }; } @@ -2828,7 +2915,12 @@ private AbfsLease maybeCreateLease(String relativePath, TracingContext tracingCo if (!enableInfiniteLease) { return null; } - AbfsLease lease = new AbfsLease(client, relativePath, tracingContext); + final AbfsLease lease; + if (getPrefixMode() == PrefixMode.DFS) { + lease = new AbfsDfsLease(client, relativePath, null, tracingContext); + } else { + lease = getBlobLease(relativePath, null, tracingContext); + } leaseRefs.put(lease, null); return lease; } diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/AbfsHttpConstants.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/AbfsHttpConstants.java index fd07c1c686c91..3b15222ff47ff 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/AbfsHttpConstants.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/AbfsHttpConstants.java @@ -162,6 +162,7 @@ public final class AbfsHttpConstants { public static final String COPY_STATUS_ABORTED = "aborted"; public static final String COPY_STATUS_FAILED = "failed"; public static final String HDI_ISFOLDER = "hdi_isfolder"; + public static final Integer BLOB_LEASE_ONE_MINUTE_DURATION = 60; public static final String ETAG = "Etag"; public static final String LAST_MODIFIED_TIME = "Last-Modified"; public static final String CREATION_TIME = "Creation-Time"; diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/ConfigurationKeys.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/ConfigurationKeys.java index 6747cf62bd3d4..23989b9f3444f 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/ConfigurationKeys.java +++ 
b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/ConfigurationKeys.java @@ -274,6 +274,8 @@ public static String accountProperty(String property, String account) { public static final String FS_AZURE_REDIRECT_DELETE = "fs.azure.redirect.delete"; public static final String FS_AZURE_REDIRECT_RENAME = "fs.azure.redirect.rename"; + public static final String FS_AZURE_PRODUCER_QUEUE_MAX_SIZE = "fs.azure.producer.queue.max.size"; + public static final String FS_AZURE_LEASE_CREATE_NON_RECURSIVE = "fs.azure.lease.create.non.recursive"; private ConfigurationKeys() {} } diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/FileSystemConfigurations.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/FileSystemConfigurations.java index cf94b2b16ad59..b7e688a76a9f7 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/FileSystemConfigurations.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/FileSystemConfigurations.java @@ -131,6 +131,9 @@ public final class FileSystemConfigurations { // To have functionality similar to drop1 delete is going to wasb by default for now. 
public static final boolean DEFAULT_FS_AZURE_REDIRECT_RENAME = false; public static final boolean DEFAULT_FS_AZURE_REDIRECT_DELETE = true; + public static final int DEFAULT_FS_AZURE_PRODUCER_QUEUE_MAX_SIZE = 10000; + public static final boolean DEFAULT_FS_AZURE_LEASE_CREATE_NON_RECURSIVE = false; + public static final int DEFAULT_FS_AZURE_BLOB_RENAME_THREAD = 5; /** * Limit of queued block upload operations before writes diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/HttpHeaderConfigurations.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/HttpHeaderConfigurations.java index d5751954a52a8..cd3c321b4395d 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/HttpHeaderConfigurations.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/HttpHeaderConfigurations.java @@ -68,6 +68,7 @@ public final class HttpHeaderConfigurations { public static final String X_MS_LEASE_ACTION = "x-ms-lease-action"; public static final String X_MS_LEASE_DURATION = "x-ms-lease-duration"; public static final String X_MS_LEASE_ID = "x-ms-lease-id"; + public static final String X_MS_SOURCE_LEASE_ID = "x-ms-source-lease-id"; public static final String X_MS_PROPOSED_LEASE_ID = "x-ms-proposed-lease-id"; public static final String X_MS_LEASE_BREAK_PERIOD = "x-ms-lease-break-period"; public static final String X_MS_BLOB_TYPE = "x-ms-blob-type"; diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/HttpQueryParams.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/HttpQueryParams.java index 818eda8d7c7d8..1b493e590b9b7 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/HttpQueryParams.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/HttpQueryParams.java @@ -29,6 +29,7 @@ public final 
class HttpQueryParams { public static final String QUERY_PARAM_RESOURCE = "resource"; public static final String QUERY_PARAM_RESTYPE = "restype"; public static final String QUERY_PARAM_COMP = "comp"; + public static final String QUERY_PARAM_COMP_LEASE_VALUE = "lease"; public static final String QUERY_PARAM_COMP_VALUE_LIST = "list"; public static final String QUERY_PARAM_PREFIX = "prefix"; public static final String QUERY_PARAM_MARKER = "marker"; diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/extensions/SASTokenProvider.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/extensions/SASTokenProvider.java index 84a81f37c8757..c5f59e96905d8 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/extensions/SASTokenProvider.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/extensions/SASTokenProvider.java @@ -59,6 +59,8 @@ public interface SASTokenProvider { String SET_BLOB_METADATA_OPERATION = "set-blob-metadata"; String WRITE_OPERATION = "write"; + String LEASE_OPERATION = "lease"; + /** * Initialize authorizer for Azure Blob File System. * @param configuration Configuration object diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsBlobLease.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsBlobLease.java new file mode 100644 index 0000000000000..e254ff98f6444 --- /dev/null +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsBlobLease.java @@ -0,0 +1,69 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.fs.azurebfs.services; + +import org.apache.hadoop.fs.azurebfs.contracts.exceptions.AzureBlobFileSystemException; +import org.apache.hadoop.fs.azurebfs.utils.TracingContext; + +import static org.apache.hadoop.fs.azurebfs.constants.HttpHeaderConfigurations.X_MS_LEASE_ID; + +public class AbfsBlobLease extends AbfsLease { + + public AbfsBlobLease(final AbfsClient client, + final String path, + final Integer leaseDuration, + final TracingContext tracingContext) throws AzureBlobFileSystemException { + super(client, path, leaseDuration, tracingContext); + } + + public AbfsBlobLease(final AbfsClient client, + final String path, + final int acquireMaxRetries, + final int acquireRetryInterval, + final Integer leaseDuration, + final TracingContext tracingContext) throws AzureBlobFileSystemException { + super(client, path, acquireMaxRetries, acquireRetryInterval, leaseDuration, + tracingContext); + } + + @Override + String callRenewLeaseAPI(final String path, + final String leaseId, + final TracingContext tracingContext) throws AzureBlobFileSystemException { + return extractLeaseInfo(client.renewBlobLease(path, leaseId, tracingContext)); + } + + @Override + AbfsRestOperation callAcquireLeaseAPI(final String path, + final Integer leaseDuration, + final TracingContext tracingContext) throws AzureBlobFileSystemException { + return client.acquireBlobLease(path, 
leaseDuration, tracingContext); + } + + @Override + void callReleaseLeaseAPI(final String path, + final String leaseID, + final TracingContext tracingContext) throws AzureBlobFileSystemException { + client.releaseBlobLease(path, leaseID, tracingContext); + } + + private String extractLeaseInfo(final AbfsRestOperation op) { + return op.getResult().getResponseHeader(X_MS_LEASE_ID); + } +} diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsClient.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsClient.java index f15961dc6b7b4..042d8560ef0c2 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsClient.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsClient.java @@ -517,6 +517,28 @@ public AbfsRestOperation acquireLease(final String path, int duration, TracingCo return op; } + public AbfsRestOperation acquireBlobLease(final String path, final int duration, final TracingContext tracingContext) throws AzureBlobFileSystemException { + final List requestHeaders = createDefaultHeaders(); + requestHeaders.add(new AbfsHttpHeader(X_MS_LEASE_ACTION, ACQUIRE_LEASE_ACTION)); + requestHeaders.add(new AbfsHttpHeader(X_MS_LEASE_DURATION, Integer.toString(duration))); + requestHeaders.add(new AbfsHttpHeader(X_MS_PROPOSED_LEASE_ID, UUID.randomUUID().toString())); + final AbfsUriQueryBuilder abfsUriQueryBuilder = createDefaultUriQueryBuilder(); + abfsUriQueryBuilder.addQuery(QUERY_PARAM_COMP, QUERY_PARAM_COMP_LEASE_VALUE); + appendSASTokenToQuery(path, SASTokenProvider.LEASE_OPERATION, abfsUriQueryBuilder); + + + URL url = createRequestUrl(path, abfsUriQueryBuilder.toString()); + + final AbfsRestOperation op = new AbfsRestOperation( + AbfsRestOperationType.LeaseBlob, + this, + HTTP_METHOD_PUT, + url, + requestHeaders); + op.execute(tracingContext); + return op; + } + public AbfsRestOperation renewLease(final String 
path, final String leaseId, TracingContext tracingContext) throws AzureBlobFileSystemException { final List requestHeaders = createDefaultHeaders(); @@ -540,6 +562,29 @@ public AbfsRestOperation renewLease(final String path, final String leaseId, return op; } + public AbfsRestOperation renewBlobLease(final String path, final String leaseId, + TracingContext tracingContext) throws AzureBlobFileSystemException { + final List requestHeaders = createDefaultHeaders(); + + requestHeaders.add(new AbfsHttpHeader(X_MS_LEASE_ACTION, RENEW_LEASE_ACTION)); + requestHeaders.add(new AbfsHttpHeader(X_MS_LEASE_ID, leaseId)); + + final AbfsUriQueryBuilder abfsUriQueryBuilder = createDefaultUriQueryBuilder(); + abfsUriQueryBuilder.addQuery(QUERY_PARAM_COMP, QUERY_PARAM_COMP_LEASE_VALUE); + appendSASTokenToQuery(path, SASTokenProvider.LEASE_OPERATION, abfsUriQueryBuilder); + + URL url = createRequestUrl(path, abfsUriQueryBuilder.toString()); + + final AbfsRestOperation op = new AbfsRestOperation( + AbfsRestOperationType.LeaseBlob, + this, + HTTP_METHOD_PUT, + url, + requestHeaders); + op.execute(tracingContext); + return op; + } + public AbfsRestOperation releaseLease(final String path, final String leaseId, TracingContext tracingContext) throws AzureBlobFileSystemException { final List requestHeaders = createDefaultHeaders(); @@ -563,6 +608,29 @@ public AbfsRestOperation releaseLease(final String path, return op; } + public AbfsRestOperation releaseBlobLease(final String path, + final String leaseId, TracingContext tracingContext) throws AzureBlobFileSystemException { + final List requestHeaders = createDefaultHeaders(); + + requestHeaders.add(new AbfsHttpHeader(X_MS_LEASE_ACTION, RELEASE_LEASE_ACTION)); + requestHeaders.add(new AbfsHttpHeader(X_MS_LEASE_ID, leaseId)); + + final AbfsUriQueryBuilder abfsUriQueryBuilder = createDefaultUriQueryBuilder(); + abfsUriQueryBuilder.addQuery(QUERY_PARAM_COMP, QUERY_PARAM_COMP_LEASE_VALUE); + appendSASTokenToQuery(path, 
SASTokenProvider.LEASE_OPERATION, abfsUriQueryBuilder); + + URL url = createRequestUrl(path, abfsUriQueryBuilder.toString()); + + final AbfsRestOperation op = new AbfsRestOperation( + AbfsRestOperationType.LeasePath, + this, + HTTP_METHOD_PUT, + url, + requestHeaders); + op.execute(tracingContext); + return op; + } + public AbfsRestOperation breakLease(final String path, TracingContext tracingContext) throws AzureBlobFileSystemException { final List requestHeaders = createDefaultHeaders(); @@ -1298,6 +1366,7 @@ public AbfsRestOperation checkAccess(String path, String rwx, TracingContext tra * * @param sourceBlobPath path of source to be copied * @param destinationBlobPath path of the destination + * @param srcLeaseId * @param tracingContext tracingContext object * * @return AbfsRestOperation abfsRestOperation which contains the response from the server. @@ -1309,7 +1378,7 @@ public AbfsRestOperation checkAccess(String path, String rwx, TracingContext tra */ public AbfsRestOperation copyBlob(Path sourceBlobPath, Path destinationBlobPath, - TracingContext tracingContext) throws AzureBlobFileSystemException { + final String srcLeaseId, TracingContext tracingContext) throws AzureBlobFileSystemException { AbfsUriQueryBuilder abfsUriQueryBuilderDst = createDefaultUriQueryBuilder(); AbfsUriQueryBuilder abfsUriQueryBuilderSrc = new AbfsUriQueryBuilder(); String dstBlobRelativePath = destinationBlobPath.toUri().getPath(); @@ -1324,6 +1393,9 @@ public AbfsRestOperation copyBlob(Path sourceBlobPath, srcBlobRelativePath, abfsUriQueryBuilderSrc.toString()).toString(); List requestHeaders = createDefaultHeaders(); + if (srcLeaseId != null) { + requestHeaders.add(new AbfsHttpHeader(X_MS_SOURCE_LEASE_ID, srcLeaseId)); + } requestHeaders.add(new AbfsHttpHeader(X_MS_COPY_SOURCE, sourcePathUrl)); requestHeaders.add(new AbfsHttpHeader(IF_NONE_MATCH, STAR)); @@ -1520,6 +1592,7 @@ public AbfsRestOperation getListBlobs(String marker, * Deletes the blob for which the path is given. 
* * @param blobPath path on which blob has to be deleted. + * @param leaseId * @param tracingContext tracingContext object for tracing the server calls. * * @return abfsRestOpertion @@ -1528,7 +1601,7 @@ public AbfsRestOperation getListBlobs(String marker, * network issue. */ public AbfsRestOperation deleteBlobPath(final Path blobPath, - final TracingContext tracingContext) throws AzureBlobFileSystemException { + final String leaseId, final TracingContext tracingContext) throws AzureBlobFileSystemException { AbfsUriQueryBuilder abfsUriQueryBuilder = createDefaultUriQueryBuilder(); String blobRelativePath = blobPath.toUri().getPath(); appendSASTokenToQuery(blobRelativePath, @@ -1536,6 +1609,9 @@ public AbfsRestOperation deleteBlobPath(final Path blobPath, final URL url = createRequestUrl(blobRelativePath, abfsUriQueryBuilder.toString()); final List requestHeaders = createDefaultHeaders(); + if(leaseId != null) { + requestHeaders.add(new AbfsHttpHeader(X_MS_LEASE_ID, leaseId)); + } final AbfsRestOperation op = new AbfsRestOperation( AbfsRestOperationType.DeleteBlob, this, diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsDfsLease.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsDfsLease.java new file mode 100644 index 0000000000000..f72658fb789aa --- /dev/null +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsDfsLease.java @@ -0,0 +1,66 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.fs.azurebfs.services; + +import org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants; +import org.apache.hadoop.fs.azurebfs.constants.HttpHeaderConfigurations; +import org.apache.hadoop.fs.azurebfs.contracts.exceptions.AzureBlobFileSystemException; +import org.apache.hadoop.fs.azurebfs.utils.TracingContext; + +public class AbfsDfsLease extends AbfsLease { + + public AbfsDfsLease(final AbfsClient client, + final String path, + final int acquireMaxRetries, + final int acquireRetryInterval, + final Integer leaseDuration, + final TracingContext tracingContext) throws AzureBlobFileSystemException { + super(client, path, acquireMaxRetries, acquireRetryInterval, leaseDuration, + tracingContext); + } + + public AbfsDfsLease(final AbfsClient client, + final String path, + final Integer leaseDuration, + final TracingContext tracingContext) throws AzureBlobFileSystemException { + super(client, path, leaseDuration, tracingContext); + } + + @Override + String callRenewLeaseAPI(final String path, + final String leaseId, + final TracingContext tracingContext) throws AzureBlobFileSystemException { + AbfsRestOperation op = client.renewLease(path, leaseId, tracingContext); + return op.getResult().getResponseHeader(HttpHeaderConfigurations.X_MS_LEASE_ID); + } + + @Override + AbfsRestOperation callAcquireLeaseAPI(final String path, final Integer leaseDuration, + final TracingContext tracingContext) + throws AzureBlobFileSystemException { + return client.acquireLease(path, + leaseDuration, tracingContext); + } + + @Override + void 
callReleaseLeaseAPI(final String path, final String leaseID, final TracingContext tracingContext) + throws AzureBlobFileSystemException { + client.releaseLease(path, leaseID, tracingContext); + } +} diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsLease.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsLease.java index 2e97598ef04f3..4c3c3e0d16bf8 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsLease.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsLease.java @@ -19,7 +19,10 @@ package org.apache.hadoop.fs.azurebfs.services; import java.io.IOException; +import java.util.Timer; +import java.util.TimerTask; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicReference; import org.apache.hadoop.thirdparty.com.google.common.annotations.VisibleForTesting; import org.apache.hadoop.thirdparty.com.google.common.util.concurrent.FutureCallback; @@ -52,7 +55,7 @@ * Call free() to release the Lease. If the holder process dies, AzureBlobFileSystem breakLease * will need to be called before another client will be able to write to the file. 
*/ -public final class AbfsLease { +public abstract class AbfsLease { private static final Logger LOG = LoggerFactory.getLogger(AbfsLease.class); // Number of retries for acquiring lease @@ -60,16 +63,19 @@ public final class AbfsLease { // Retry interval for acquiring lease in secs static final int DEFAULT_LEASE_ACQUIRE_RETRY_INTERVAL = 10; - private final AbfsClient client; + protected final AbfsClient client; private final String path; private final TracingContext tracingContext; // Lease status variables private volatile boolean leaseFreed; - private volatile String leaseID = null; + private AtomicReference leaseID = new AtomicReference<>(); private volatile Throwable exception = null; private volatile int acquireRetryCount = 0; private volatile ListenableScheduledFuture future = null; + private final Integer leaseDuration; + + private Timer timer = null; public static class LeaseException extends AzureBlobFileSystemException { public LeaseException(Throwable t) { @@ -81,20 +87,36 @@ public LeaseException(String s) { } } - public AbfsLease(AbfsClient client, String path, TracingContext tracingContext) throws AzureBlobFileSystemException { + /** + * @param client client object for making server calls + * @param path path on which lease has to be acquired, renewed and freed in future + * @param leaseDuration duration for which lease to be taken in seconds. If given + * null, it will be taken as infinte-lease. 
+ * @param tracingContext for tracing server calls + * + * @throws AzureBlobFileSystemException exception while calling acquireLease API + */ + public AbfsLease(AbfsClient client, String path, + final Integer leaseDuration, + TracingContext tracingContext) throws AzureBlobFileSystemException { this(client, path, DEFAULT_LEASE_ACQUIRE_MAX_RETRIES, - DEFAULT_LEASE_ACQUIRE_RETRY_INTERVAL, tracingContext); + DEFAULT_LEASE_ACQUIRE_RETRY_INTERVAL, leaseDuration, tracingContext); } @VisibleForTesting public AbfsLease(AbfsClient client, String path, int acquireMaxRetries, - int acquireRetryInterval, TracingContext tracingContext) throws AzureBlobFileSystemException { + int acquireRetryInterval, final Integer leaseDuration, TracingContext tracingContext) throws AzureBlobFileSystemException { this.leaseFreed = false; this.client = client; this.path = path; this.tracingContext = tracingContext; + this.leaseDuration = leaseDuration; - if (client.getNumLeaseThreads() < 1) { + /* + * If the number of threads to use for lease operations for infinite lease directories + * and the object is created for infinite-lease (leaseDuration == null). 
+ */ + if (client.getNumLeaseThreads() < 1 && leaseDuration == null) { throw new LeaseException(ERR_NO_LEASE_THREADS); } @@ -104,7 +126,7 @@ public AbfsLease(AbfsClient client, String path, int acquireMaxRetries, acquireLease(retryPolicy, 0, acquireRetryInterval, 0, new TracingContext(tracingContext)); - while (leaseID == null && exception == null) { + while (leaseID.get() == null && exception == null) { try { future.get(); } catch (Exception e) { @@ -122,18 +144,25 @@ public AbfsLease(AbfsClient client, String path, int acquireMaxRetries, private void acquireLease(RetryPolicy retryPolicy, int numRetries, int retryInterval, long delay, TracingContext tracingContext) - throws LeaseException { + throws AzureBlobFileSystemException { LOG.debug("Attempting to acquire lease on {}, retry {}", path, numRetries); if (future != null && !future.isDone()) { throw new LeaseException(ERR_LEASE_FUTURE_EXISTS); } - future = client.schedule(() -> client.acquireLease(path, - INFINITE_LEASE_DURATION, tracingContext), + if (leaseDuration != null) { + leaseID.set( + callAcquireLeaseAPI(path, leaseDuration, tracingContext).getResult() + .getResponseHeader(HttpHeaderConfigurations.X_MS_LEASE_ID)); + spawnLeaseRenewTimer(path, leaseDuration * 1000); + return; + } + future = client.schedule(() -> callAcquireLeaseAPI(path, + INFINITE_LEASE_DURATION, tracingContext), delay, TimeUnit.SECONDS); client.addCallback(future, new FutureCallback() { @Override public void onSuccess(@Nullable AbfsRestOperation op) { - leaseID = op.getResult().getResponseHeader(HttpHeaderConfigurations.X_MS_LEASE_ID); + leaseID.set(op.getResult().getResponseHeader(HttpHeaderConfigurations.X_MS_LEASE_ID)); LOG.debug("Acquired lease {} on {}", leaseID, path); } @@ -156,6 +185,28 @@ public void onFailure(Throwable throwable) { }); } + private void spawnLeaseRenewTimer(String path, Integer leaseDuration) { + timer = new Timer(); + timer.schedule(new TimerTask() { + @Override + public void run() { + try { + 
leaseID.set(callRenewLeaseAPI(path, leaseID.get(), tracingContext)); + } catch (AzureBlobFileSystemException ignored) { + } + } + }, leaseDuration / 2, leaseDuration / 2); + } + + abstract String callRenewLeaseAPI(final String path, + final String s, + final TracingContext tracingContext) throws AzureBlobFileSystemException; + + abstract AbfsRestOperation callAcquireLeaseAPI(final String path, + final Integer leaseDuration, + final TracingContext tracingContext) + throws AzureBlobFileSystemException; + /** * Cancel future and free the lease. If an exception occurs while releasing the lease, the error * will be logged. If the lease cannot be released, AzureBlobFileSystem breakLease will need to @@ -170,9 +221,10 @@ public void free() { if (future != null && !future.isDone()) { future.cancel(true); } + cancelTimer(); TracingContext tracingContext = new TracingContext(this.tracingContext); tracingContext.setOperation(FSOperationType.RELEASE_LEASE); - client.releaseLease(path, leaseID, tracingContext); + callReleaseLeaseAPI(path, leaseID.get(), tracingContext); } catch (IOException e) { LOG.warn("Exception when trying to release lease {} on {}. 
Lease will need to be broken: {}", leaseID, path, e.getMessage()); @@ -184,12 +236,21 @@ public void free() { } } + public void cancelTimer() { + if (timer != null) { + timer.cancel(); + } + } + + abstract void callReleaseLeaseAPI(final String path, final String leaseID, final TracingContext tracingContext) + throws AzureBlobFileSystemException; + public boolean isFreed() { return leaseFreed; } public String getLeaseID() { - return leaseID; + return leaseID.get(); } @VisibleForTesting diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsRestOperationType.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsRestOperationType.java index 35dc393069cb4..83814d883a6f1 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsRestOperationType.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsRestOperationType.java @@ -43,6 +43,7 @@ public enum AbfsRestOperationType { DeletePath, CheckAccess, LeasePath, + LeaseBlob, PutBlob, GetBlobProperties, GetContainerProperties, diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/ListBlobConsumer.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/ListBlobConsumer.java new file mode 100644 index 0000000000000..80017ab558a78 --- /dev/null +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/ListBlobConsumer.java @@ -0,0 +1,44 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.fs.azurebfs.services; + +import java.util.List; + +import org.apache.hadoop.fs.azurebfs.contracts.exceptions.AzureBlobFileSystemException; + +public class ListBlobConsumer { + + private final ListBlobQueue listBlobQueue; + + public ListBlobConsumer(final ListBlobQueue listBlobQueue) { + this.listBlobQueue = listBlobQueue; + } + + public List consume() throws AzureBlobFileSystemException { + if (listBlobQueue.getException() != null) { + throw listBlobQueue.getException(); + } + return listBlobQueue.dequeue(); + } + + public Boolean isCompleted() { + return listBlobQueue.getIsCompleted() + && listBlobQueue.size() == 0; + } +} diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/ListBlobProducer.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/ListBlobProducer.java new file mode 100644 index 0000000000000..577c51e9002fc --- /dev/null +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/ListBlobProducer.java @@ -0,0 +1,103 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.fs.azurebfs.services; + +import org.apache.hadoop.fs.azurebfs.contracts.exceptions.AzureBlobFileSystemException; +import org.apache.hadoop.fs.azurebfs.utils.TracingContext; + +/** + * ListBlob API can give maximum of 5000 blobs. If there are (~n*5000) blobs, the + * client would need to call the listBlob API n times. This would have two consequences: + *
+ * <ol>
+ * <li>
+ * The consumer of the resulting lists of blobs would have to wait until all
+ * the blobs are received. The consumer could have used that time to start
+ * processing the blobs already in memory; waiting for all the blobs to be
+ * received makes the overall processing take more time. Let's say the consumer
+ * needs m time-units to process one blob, each set of blobs has x blobs, there
+ * are n sets in total, and the client needs t time to get all the blobs. If
+ * the consumer waits for all the blobs to be received, the total time taken
+ * would be:
+ * <pre>t + (n * x * m)</pre>
+ * Now, if the consumer instead works in parallel on the blobs already
+ * available, the time taken would be:
+ * <pre>t + (((n * x) - t/m) * m)</pre>
+ * </li>
+ * <li>
+ * The information about the blobs has to be maintained in memory until the
+ * computation on the list is done. On a low-configuration machine, this may
+ * lead to OOM.
+ * </li>
+ * </ol>
+ * <p>
+ * In this design, the producer, on a thread parallel to the main thread, calls
+ * the ListBlob API and populates the {@link ListBlobQueue}, which is dequeued
+ * by the main thread that runs the computation on the available blobs.
+ *
+ * How is it different from {@link AbfsListStatusRemoteIterator}?
+ * It provides an iterator which on {@link AbfsListStatusRemoteIterator#hasNext()} checks + * if there are blobs available in memory. If not it will call Listing API on server for + * the next set of blobs. But here, it make the process sequential. As in, when the processing + * on whole set of blobs available in memory are done, after that only next set of blobs are + * fetched. + */ +public class ListBlobProducer { + + private final AbfsClient client; + + private final ListBlobQueue listBlobQueue; + + private final String src; + + private final TracingContext tracingContext; + + private String nextMarker; + + public ListBlobProducer(final String src, + final AbfsClient abfsClient, + final ListBlobQueue listBlobQueue, + final String initNextMarker, + TracingContext tracingContext) { + this.src = src; + this.client = abfsClient; + this.tracingContext = tracingContext; + this.listBlobQueue = listBlobQueue; + listBlobQueue.setProducer(this); + this.nextMarker = initNextMarker; + new Thread(() -> { + do { + int maxResult = listBlobQueue.availableSize(); + if (maxResult == 0) { + continue; + } + AbfsRestOperation op = null; + try { + op = client.getListBlobs(nextMarker, src, null, maxResult, tracingContext); + } catch (AzureBlobFileSystemException ex) { + listBlobQueue.setFailed(ex); + return; + } + BlobList blobList = op.getResult().getBlobList(); + nextMarker = blobList.getNextMarker(); + listBlobQueue.enqueue(blobList.getBlobPropertyList()); + if (nextMarker == null) { + listBlobQueue.complete(); + } + } while(nextMarker != null); + }).start(); + } +} diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/ListBlobQueue.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/ListBlobQueue.java new file mode 100644 index 0000000000000..3be6bac98db1f --- /dev/null +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/ListBlobQueue.java @@ -0,0 +1,117 @@ +/** + * Licensed 
to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.fs.azurebfs.services; + +import java.util.ArrayDeque; +import java.util.ArrayList; +import java.util.List; +import java.util.Queue; + +import org.apache.hadoop.fs.azurebfs.contracts.exceptions.AzureBlobFileSystemException; + +public class ListBlobQueue { + + private final Queue blobLists; + + private int totalProduced = 0; + + private int totalConsumed = 0; + + private Boolean isCompleted = false; + + private AzureBlobFileSystemException failureFromProducer; + + /** + * Since, Producer just spawns a thread and there are no public method for the + * class. Keeping its address in this object will prevent accidental GC close + * on the producer object. + */ + private ListBlobProducer producer; + + private final int maxSize; + private final int maxConsumedBlobCount; + + /** + * @param maxSize maxSize of the queue. + * @param maxConsumedBlobCount maximum number of blobs that would be returned + * by {@link #dequeue()} method. 
+ */ + public ListBlobQueue(int maxSize, int maxConsumedBlobCount) { + blobLists = new ArrayDeque<>(maxSize); + this.maxSize = maxSize; + this.maxConsumedBlobCount = maxConsumedBlobCount; + } + + /** + * @param initBlobList list of blobProperties to be enqueued in th queue + * @param maxSize maxSize of the queue. + * @param maxConsumedBlobCount maximum number of blobs that would be returned + * by {@link #dequeue()} method. + */ + public ListBlobQueue(List initBlobList, int maxSize, int maxConsumedBlobCount) { + this(maxSize, maxConsumedBlobCount); + if (initBlobList != null) { + enqueue(initBlobList); + } + } + + void setProducer(ListBlobProducer producer) { + if (this.producer == null) { + this.producer = producer; + } + } + + void setFailed(AzureBlobFileSystemException failure) { + failureFromProducer = failure; + } + + public void complete() { + isCompleted = true; + } + + public Boolean getIsCompleted() { + return isCompleted; + } + + AzureBlobFileSystemException getException() { + return failureFromProducer; + } + + public void enqueue(List blobProperties) { + blobLists.addAll(blobProperties); + } + + public List dequeue() { + List blobProperties = new ArrayList<>(); + int counter = 0; + while (counter < maxConsumedBlobCount && blobLists.size() > 0) { + blobProperties.add(blobLists.poll()); + counter++; + } + return blobProperties; + } + + public int size() { + return blobLists.size(); + } + + public int availableSize() { + return maxSize - blobLists.size(); + } +} diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/RenameAtomicityUtils.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/RenameAtomicityUtils.java index d366a55a3071b..9f7b18a375560 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/RenameAtomicityUtils.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/RenameAtomicityUtils.java @@ -25,9 +25,7 
@@ import java.nio.charset.Charset; import java.nio.charset.StandardCharsets; import java.text.SimpleDateFormat; -import java.util.ArrayList; import java.util.Date; -import java.util.List; import java.util.TimeZone; import com.fasterxml.jackson.core.JsonParseException; @@ -49,12 +47,10 @@ import org.apache.hadoop.fs.azurebfs.contracts.exceptions.AzureBlobFileSystemException; import org.apache.hadoop.fs.azurebfs.utils.TracingContext; -import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.FORWARD_SLASH; - /** * For a directory enabled for atomic-rename, before rename starts, a * file with -RenamePending.json suffix is created. In this file, the states required - * for the rename are given. This file is created by {@link #writeFile()} method. + * for the rename are given. This file is created by {@link #preRename(Boolean)} ()} method. * This is important in case the JVM process crashes during rename, the atomicity * will be maintained, when the job calls {@link AzureBlobFileSystem#listStatus(Path)} * or {@link AzureBlobFileSystem#getFileStatus(Path)}. 
On these API calls to filesystem, @@ -97,8 +93,7 @@ public RenameAtomicityUtils(final AzureBlobFileSystem azureBlobFileSystem, final RenamePendingFileInfo renamePendingFileInfo = readFile(path); if (renamePendingFileInfo != null) { redoRenameInvocation.redo(renamePendingFileInfo.destination, - renamePendingFileInfo.srcList, - renamePendingFileInfo.destinationSuffix); + renamePendingFileInfo.src); } } @@ -159,16 +154,7 @@ private RenamePendingFileInfo readFile(final Path redoFile) newFolderName.textValue())) { RenamePendingFileInfo renamePendingFileInfo = new RenamePendingFileInfo(); renamePendingFileInfo.destination = new Path(newFolderName.textValue()); - String srcDir = oldFolderName.textValue() + FORWARD_SLASH; - List srcPaths = new ArrayList<>(); - List destinationSuffix = new ArrayList<>(); - JsonNode fileList = json.get("FileList"); - for (int i = 0; i < fileList.size(); i++) { - destinationSuffix.add(fileList.get(i).textValue()); - srcPaths.add(new Path(srcDir + fileList.get(i).textValue())); - } - renamePendingFileInfo.srcList = srcPaths; - renamePendingFileInfo.destinationSuffix = destinationSuffix; + renamePendingFileInfo.src = new Path(oldFolderName.textValue()); return renamePendingFileInfo; } return null; @@ -192,15 +178,14 @@ private void deleteRenamePendingFile(FileSystem fs, Path redoFile) /** * Write to disk the information needed to redo folder rename, * in JSON format. The file name will be - * {@code wasb:///folderName-RenamePending.json} + * {@code abfs:///folderName-RenamePending.json} * The file format will be: *
{@code
    * {
    *   FormatVersion: "1.0",
    *   OperationTime: "",
    *   OldFolderName: "",
-   *   NewFolderName: "",
-   *   FileList: [  ,  , ... ]
+   *   NewFolderName: ""
    * }
    *
    * Here's a sample:
@@ -208,21 +193,16 @@ private void deleteRenamePendingFile(FileSystem fs, Path redoFile)
    *  FormatVersion: "1.0",
    *  OperationUTCTime: "2014-07-01 23:50:35.572",
    *  OldFolderName: "user/ehans/folderToRename",
-   *  NewFolderName: "user/ehans/renamedFolder",
-   *  FileList: [
-   *    "innerFile",
-   *    "innerFile2"
-   *  ]
+   *  NewFolderName: "user/ehans/renamedFolder"
    * } }
* @throws IOException Thrown when fail to write file. */ - public void preRename(List blobPropertyList, - final Boolean isCreateOperationOnBlobEndpoint) throws IOException { + public void preRename(final Boolean isCreateOperationOnBlobEndpoint) throws IOException { Path path = getRenamePendingFilePath(); LOG.debug("Preparing to write atomic rename state to {}", path.toString()); OutputStream output = null; - String contents = makeRenamePendingFileContents(blobPropertyList); + String contents = makeRenamePendingFileContents(); // Write file. try { @@ -281,63 +261,18 @@ private Throwable getWrappedException(final IOException e) { * * @return JSON string which represents the operation. */ - private String makeRenamePendingFileContents(List blobPropertyList) { + private String makeRenamePendingFileContents() { SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS"); sdf.setTimeZone(TimeZone.getTimeZone("UTC")); String time = sdf.format(new Date()); - // Make file list string - StringBuilder builder = new StringBuilder(); - builder.append("[\n"); - for (int i = 0; i != blobPropertyList.size(); i++) { - if (i > 0) { - builder.append(",\n"); - } - builder.append(" "); - final String noPrefix; - /* - * The marker file for the source directory has the same path on non-HNS. - * For the other files, the fileList has to save the filePath relative to the - * source directory. - */ - if (!blobPropertyList.get(i) - .getPath() - .toUri() - .getPath() - .equals(srcPath.toUri().getPath())) { - noPrefix = StringUtils.removeStart( - blobPropertyList.get(i).getPath().toUri().getPath(), - srcPath.toUri().getPath() + FORWARD_SLASH); - } else { - noPrefix = ""; - } - - // Quote string file names, escaping any possible " characters or other - // necessary characters in the name. - builder.append(quote(noPrefix)); - if (builder.length() >= - MAX_RENAME_PENDING_FILE_SIZE - FORMATTING_BUFFER) { - - // Give up now to avoid using too much memory. 
- LOG.error( - "Internal error: Exceeded maximum rename pending file size of {} bytes.", - MAX_RENAME_PENDING_FILE_SIZE); - - // return some bad JSON with an error message to make it human readable - return "exceeded maximum rename pending file size"; - } - } - builder.append("\n ]"); - String fileList = builder.toString(); - // Make file contents as a string. Again, quote file names, escaping // characters as appropriate. String contents = "{\n" + " FormatVersion: \"1.0\",\n" + " OperationUTCTime: \"" + time + "\",\n" + " OldFolderName: " + quote(srcPath.toUri().getPath()) + ",\n" - + " NewFolderName: " + quote(dstPath.toUri().getPath()) + ",\n" - + " FileList: " + fileList + "\n" + + " NewFolderName: " + quote(dstPath.toUri().getPath()) + "\n" + "}\n"; return contents; @@ -430,16 +365,11 @@ private Path getRenamePendingFilePath() { private static class RenamePendingFileInfo { public Path destination; - public List srcList; - /** - * Relative paths from the destination path. - */ - public List destinationSuffix; + public Path src; } public static interface RedoRenameInvocation { - void redo(Path destination, List sourcePaths, - final List destinationSuffix) throws + void redo(Path destination, Path src) throws AzureBlobFileSystemException; } } diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/RenameNonAtomicUtils.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/RenameNonAtomicUtils.java index fba0033d14abb..786c22a9c4cfe 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/RenameNonAtomicUtils.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/RenameNonAtomicUtils.java @@ -19,7 +19,6 @@ package org.apache.hadoop.fs.azurebfs.services; import java.io.IOException; -import java.util.List; import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.azurebfs.AzureBlobFileSystem; @@ -35,8 +34,7 @@ public 
RenameNonAtomicUtils(final AzureBlobFileSystem azureBlobFileSystem, } @Override - public void preRename(final List blobPropertyList, - final Boolean isCreateOperationOnBlobEndpoint) + public void preRename(final Boolean isCreateOperationOnBlobEndpoint) throws IOException { } diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemBlobConfig.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemBlobConfig.java index 5662b46aad4c4..ef1da136a970a 100644 --- a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemBlobConfig.java +++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemBlobConfig.java @@ -609,7 +609,8 @@ private void countRenameOverDfsBlobAndWasb(final NativeAzureFileSystem nativeAzu Mockito.doAnswer(answer -> { isRenameOverBlobEndpoint[0] = true; return answer.callRealMethod(); - }).when(client).copyBlob(Mockito.any(Path.class), Mockito.any(Path.class), Mockito.any(TracingContext.class)); + }).when(client).copyBlob(Mockito.any(Path.class), Mockito.any(Path.class), + Mockito.nullable(String.class), Mockito.any(TracingContext.class)); } private void countDeleteOverAbfsAndWasb(final NativeAzureFileSystem nativeAzureFileSystem, diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemCreate.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemCreate.java index b31278079233c..ef7b25883401c 100644 --- a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemCreate.java +++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemCreate.java @@ -40,13 +40,14 @@ import java.util.concurrent.CompletionException; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; +import 
java.util.concurrent.atomic.AtomicBoolean; import org.apache.hadoop.fs.azurebfs.contracts.exceptions.InvalidConfigurationValueException; import org.apache.hadoop.fs.azurebfs.services.OperativeEndpoint; import org.apache.hadoop.fs.azurebfs.services.PrefixMode; import org.apache.hadoop.test.LambdaTestUtils; -import org.checkerframework.common.value.qual.StaticallyExecutable; +import org.junit.Assert; import org.junit.Assume; import org.junit.Test; @@ -76,6 +77,7 @@ import static org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.FS_AZURE_CLIENT_PROVIDED_ENCRYPTION_KEY; import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.TRUE; import static org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.FS_AZURE_ENABLE_BLOB_MKDIR_OVERWRITE; +import static org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.FS_AZURE_LEASE_CREATE_NON_RECURSIVE; import static org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.FS_AZURE_MKDIRS_FALLBACK_TO_DFS; import static org.apache.hadoop.fs.azurebfs.constants.FileSystemUriSchemes.ABFS_DNS_PREFIX; import static org.apache.hadoop.fs.azurebfs.constants.FileSystemUriSchemes.WASB_DNS_PREFIX; @@ -941,6 +943,113 @@ public void testCreateNonRecursive2() throws Exception { assertIsFile(fs, testFile); } + @Test + public void testCreateNonRecursiveForAtomicDirectoryFile() throws Exception { + AzureBlobFileSystem fileSystem = getFileSystem(); + Assume.assumeTrue( + fileSystem.getAbfsStore().getAbfsConfiguration().getPrefixMode() + == PrefixMode.BLOB); + fileSystem.setWorkingDirectory(new Path("/")); + fileSystem.mkdirs(new Path("/hbase/dir")); + fileSystem.createFile(new Path("/hbase/dir/file")) + .overwrite(false) + .replication((short) 1) + .bufferSize(1024) + .blockSize(1024) + .build(); + Assert.assertTrue(fileSystem.exists(new Path("/hbase/dir/file"))); + } + + @Test + public void testActiveCreateNonRecursiveDenyParallelReadOnAtomicDir() throws Exception { + Assume.assumeTrue(getPrefixMode(getFileSystem()) == 
PrefixMode.BLOB); + Configuration configuration = Mockito.spy(getRawConfiguration()); + configuration.set(FS_AZURE_LEASE_CREATE_NON_RECURSIVE, "true"); + AzureBlobFileSystem fileSystem = (AzureBlobFileSystem) FileSystem.newInstance(configuration); + AbfsClient client = Mockito.spy(fileSystem.getAbfsClient()); + fileSystem.getAbfsStore().setClient(client); + fileSystem.setWorkingDirectory(new Path("/")); + fileSystem.mkdirs(new Path("/hbase/dir")); + fileSystem.create(new Path("/hbase/dir/file")); + AtomicBoolean createCalled = new AtomicBoolean(false); + AtomicBoolean parallelRenameDone = new AtomicBoolean(false); + AtomicBoolean exceptionCaught = new AtomicBoolean(false); + + Mockito.doAnswer(answer -> { + AbfsRestOperation op = (AbfsRestOperation) answer.callRealMethod(); + createCalled.set(true); + while(!parallelRenameDone.get()); + return op; + }).when(client).createPathBlob(Mockito.anyString(), Mockito.anyBoolean(), + Mockito.anyBoolean(), Mockito.nullable(HashMap.class), Mockito.nullable(String.class), Mockito.nullable(TracingContext.class)); + + new Thread(() -> { + try { + while(!createCalled.get()); + getFileSystem().rename(new Path("/hbase/dir/"), new Path("/hbase/dir2")); + } catch (Exception e) { + exceptionCaught.set(true); + } finally { + parallelRenameDone.set(true); + } + }).start(); + + fileSystem.createFile(new Path("/hbase/dir/file1")) + .overwrite(false) + .replication((short) 1) + .bufferSize(1024) + .blockSize(1024) + .build(); + + Assert.assertTrue(exceptionCaught.get()); + Assert.assertTrue(fileSystem.exists(new Path("/hbase/dir/file"))); + } + + @Test + public void testActiveCreateNonRecursiveNotDenyParallelRenameOnAtomicDirIfLeaseConfigDisabled() throws Exception { + Assume.assumeTrue(getPrefixMode(getFileSystem()) == PrefixMode.BLOB); + Configuration configuration = Mockito.spy(getRawConfiguration()); + AzureBlobFileSystem fileSystem = (AzureBlobFileSystem) FileSystem.newInstance(configuration); + AbfsClient client = 
Mockito.spy(fileSystem.getAbfsClient()); + fileSystem.getAbfsStore().setClient(client); + fileSystem.setWorkingDirectory(new Path("/")); + fileSystem.mkdirs(new Path("/hbase/dir")); + fileSystem.create(new Path("/hbase/dir/file")); + AtomicBoolean createCalled = new AtomicBoolean(false); + AtomicBoolean parallelRenameDone = new AtomicBoolean(false); + AtomicBoolean exceptionCaught = new AtomicBoolean(false); + + Mockito.doAnswer(answer -> { + AbfsRestOperation op = (AbfsRestOperation) answer.callRealMethod(); + createCalled.set(true); + while(!parallelRenameDone.get()); + return op; + }).when(client).createPathBlob(Mockito.anyString(), Mockito.anyBoolean(), + Mockito.anyBoolean(), Mockito.nullable(HashMap.class), Mockito.nullable(String.class), Mockito.nullable(TracingContext.class)); + + new Thread(() -> { + try { + while(!createCalled.get()); + getFileSystem().rename(new Path("/hbase/dir/"), new Path("/hbase/dir2")); + } catch (Exception e) { + exceptionCaught.set(true); + } finally { + parallelRenameDone.set(true); + } + }).start(); + + fileSystem.createFile(new Path("/hbase/dir/file1")) + .overwrite(false) + .replication((short) 1) + .bufferSize(1024) + .blockSize(1024) + .build(); + + Assert.assertFalse(exceptionCaught.get()); + Assert.assertFalse(fileSystem.exists(new Path("/hbase/dir/file"))); + Assert.assertTrue(fileSystem.exists(new Path("/hbase/dir2/file"))); + } + /** * Attempts to use to the ABFS stream after it is closed. 
*/ diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemDelegationSASForBlobEndpoint.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemDelegationSASForBlobEndpoint.java index 005bd216abce3..60165a2cef418 100644 --- a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemDelegationSASForBlobEndpoint.java +++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemDelegationSASForBlobEndpoint.java @@ -141,7 +141,7 @@ public void testCopyBlobTakeTime() throws Exception { Mockito.doReturn(httpOp).when(op).getResult(); return op; }).when(spiedClient).copyBlob(Mockito.any(Path.class), Mockito.any(Path.class), - Mockito.any(TracingContext.class)); + Mockito.nullable(String.class), Mockito.any(TracingContext.class)); fileSystem.create(new Path("/test1/file")); fileSystem.rename(new Path("/test1/file"), new Path("/test1/file2")); Assert.assertTrue(fileSystem.exists(new Path("/test1/file2"))); diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemExplictImplicitRename.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemExplictImplicitRename.java index 7b279bf19ef27..476e964a55ea8 100644 --- a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemExplictImplicitRename.java +++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemExplictImplicitRename.java @@ -870,7 +870,7 @@ private void deleteBlobPath(final AzureBlobFileSystem fs, final Path srcParent) throws AzureBlobFileSystemException { try { fs.getAbfsClient() - .deleteBlobPath(srcParent, Mockito.mock(TracingContext.class)); + .deleteBlobPath(srcParent, null, Mockito.mock(TracingContext.class)); } catch (AbfsRestOperationException ex) { if(ex.getStatusCode() 
!= HttpURLConnection.HTTP_NOT_FOUND) { throw ex; diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemLease.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemLease.java index 2ebdc7492a18d..0e7457e852e5b 100644 --- a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemLease.java +++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemLease.java @@ -20,6 +20,8 @@ import java.io.IOException; import java.util.concurrent.RejectedExecutionException; +import org.apache.hadoop.fs.azurebfs.services.AbfsBlobLease; +import org.apache.hadoop.fs.azurebfs.services.AbfsDfsLease; import org.apache.hadoop.fs.azurebfs.services.OperativeEndpoint; import org.apache.hadoop.fs.azurebfs.services.PrefixMode; import org.junit.Assert; @@ -347,8 +349,14 @@ public void testAcquireRetry() throws Exception { FSOperationType.TEST_OP, true, 0); tracingContext.setListener(listener); - AbfsLease lease = new AbfsLease(fs.getAbfsClient(), - testFilePath.toUri().getPath(), tracingContext); + AbfsLease lease; + if (getPrefixMode(fs) == PrefixMode.BLOB) { + lease = new AbfsBlobLease(fs.getAbfsClient(), + testFilePath.toUri().getPath(), null, tracingContext); + } else { + lease = new AbfsDfsLease(fs.getAbfsClient(), + testFilePath.toUri().getPath(), null, tracingContext); + } Assert.assertNotNull("Did not successfully lease file", lease.getLeaseID()); listener.setOperation(FSOperationType.RELEASE_LEASE); lease.free(); @@ -362,7 +370,20 @@ public void testAcquireRetry() throws Exception { .doCallRealMethod().when(mockClient) .acquireLease(anyString(), anyInt(), any(TracingContext.class)); - lease = new AbfsLease(mockClient, testFilePath.toUri().getPath(), 5, 1, tracingContext); + doThrow(new AbfsLease.LeaseException("failed to acquire 1")) + .doThrow(new AbfsLease.LeaseException("failed to acquire 2")) + 
.doCallRealMethod().when(mockClient) + .acquireBlobLease(anyString(), anyInt(), any(TracingContext.class)); + + if (getPrefixMode(fs) == PrefixMode.BLOB) { + lease = new AbfsBlobLease(mockClient, testFilePath.toUri().getPath(), 5, + 1, null, + tracingContext); + } else { + lease = new AbfsDfsLease(mockClient, testFilePath.toUri().getPath(), 5, 1, + null, + tracingContext); + } Assert.assertNotNull("Acquire lease should have retried", lease.getLeaseID()); lease.free(); Assert.assertEquals("Unexpected acquire retry count", 2, lease.getAcquireRetryCount()); @@ -370,9 +391,18 @@ public void testAcquireRetry() throws Exception { doThrow(new AbfsLease.LeaseException("failed to acquire")).when(mockClient) .acquireLease(anyString(), anyInt(), any(TracingContext.class)); + doThrow(new AbfsLease.LeaseException("failed to acquire")).when(mockClient) + .acquireBlobLease(anyString(), anyInt(), any(TracingContext.class)); + LambdaTestUtils.intercept(AzureBlobFileSystemException.class, () -> { - new AbfsLease(mockClient, testFilePath.toUri().getPath(), 5, 1, - tracingContext); + if (getPrefixMode(fs) == PrefixMode.BLOB) { + new AbfsBlobLease(mockClient, testFilePath.toUri().getPath(), 5, 1, + null, + tracingContext); + } else { + new AbfsDfsLease(mockClient, testFilePath.toUri().getPath(), 5, 1, null, + tracingContext); + } }); } } diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemRename.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemRename.java index 47388775dc2ad..6a7c1001c4642 100644 --- a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemRename.java +++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemRename.java @@ -23,42 +23,54 @@ import java.net.HttpURLConnection; import java.net.SocketException; import java.util.ArrayList; +import java.util.HashMap; import java.util.List; 
+import java.util.Map; +import java.util.Random; import java.util.concurrent.Callable; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Future; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; + +import org.assertj.core.api.Assertions; import org.junit.Assert; import org.junit.Assume; +import org.junit.Ignore; import org.junit.Test; +import org.mockito.Mock; import org.mockito.Mockito; -import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FSDataInputStream; import org.apache.hadoop.fs.FSDataOutputStream; import org.apache.hadoop.fs.FileStatus; -import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.azurebfs.constants.HttpHeaderConfigurations; import org.apache.hadoop.fs.azurebfs.contracts.exceptions.AbfsRestOperationException; import org.apache.hadoop.fs.azurebfs.contracts.exceptions.AzureBlobFileSystemException; import org.apache.hadoop.fs.azurebfs.contracts.services.AzureServiceErrorCode; +import org.apache.hadoop.fs.azurebfs.services.AbfsBlobLease; import org.apache.hadoop.fs.azurebfs.services.AbfsClient; import org.apache.hadoop.fs.azurebfs.services.AbfsClientTestUtil; import org.apache.hadoop.fs.azurebfs.services.AbfsHttpOpTestUtil; import org.apache.hadoop.fs.azurebfs.services.AbfsHttpOperation; +import org.apache.hadoop.fs.azurebfs.services.AbfsLease; +import org.apache.hadoop.fs.azurebfs.services.AbfsOutputStream; import org.apache.hadoop.fs.azurebfs.services.AbfsRestOperation; import org.apache.hadoop.fs.azurebfs.services.AbfsRestOperationTestUtil; import org.apache.hadoop.fs.azurebfs.utils.TracingContext; import org.apache.hadoop.fs.azurebfs.services.PrefixMode; +import static org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.FS_AZURE_LEASE_CREATE_NON_RECURSIVE; import static 
org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.FS_AZURE_REDIRECT_RENAME; import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.TRUE; import static org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.FS_AZURE_INGRESS_FALLBACK_TO_DFS; +import static org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.ONE_MB; +import static org.apache.hadoop.fs.azurebfs.constants.HttpHeaderConfigurations.X_MS_LEASE_ID; import static org.apache.hadoop.fs.azurebfs.services.RenameAtomicityUtils.SUFFIX; import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.COPY_STATUS_ABORTED; import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.COPY_STATUS_FAILED; @@ -244,7 +256,8 @@ public void testRenameBlobInSameDirectoryWithNoMarker() throws Exception { AzureBlobFileSystem fs = getFileSystem(); assumeNonHnsAccountBlobEndpoint(fs); fs.create(new Path("/srcDir/dir/file")); - fs.getAbfsStore().getClient().deleteBlobPath(new Path("/srcDir/dir"), Mockito.mock(TracingContext.class)); + fs.getAbfsStore().getClient().deleteBlobPath(new Path("/srcDir/dir"), null, + Mockito.mock(TracingContext.class)); Assert.assertTrue(fs.rename(new Path("/srcDir/dir"), new Path("/srcDir"))); } @@ -376,6 +389,15 @@ public void testHBaseHandlingForFailedRename() throws Exception { final AzureBlobFileSystemStore spiedAbfsStore = Mockito.spy( spiedFs.getAbfsStore()); Mockito.doReturn(spiedAbfsStore).when(spiedFs).getAbfsStore(); + AbfsClient spiedClient = Mockito.spy(spiedAbfsStore.getClient()); + spiedAbfsStore.setClient(spiedClient); + Map pathLeaseIdMap = new HashMap<>(); + Mockito.doAnswer(answer -> { + AbfsRestOperation op = (AbfsRestOperation) answer.callRealMethod(); + String leaseId = op.getResult().getResponseHeader(X_MS_LEASE_ID); + pathLeaseIdMap.put(answer.getArgument(0), leaseId); + return op; + }).when(spiedClient).acquireBlobLease(Mockito.anyString(), Mockito.anyInt(), Mockito.any(TracingContext.class)); final Integer[] 
correctDeletePathCount = new Integer[1]; correctDeletePathCount[0] = 0; @@ -383,18 +405,19 @@ public void testHBaseHandlingForFailedRename() throws Exception { Mockito.doAnswer(answer -> { final Path srcPath = answer.getArgument(0); final Path dstPath = answer.getArgument(1); - final TracingContext tracingContext = answer.getArgument(2); + final String leaseId = answer.getArgument(2); + final TracingContext tracingContext = answer.getArgument(3); if (("/" + failedCopyPath).equalsIgnoreCase(srcPath.toUri().getPath())) { throw new AbfsRestOperationException(HttpURLConnection.HTTP_UNAVAILABLE, AzureServiceErrorCode.INGRESS_OVER_ACCOUNT_LIMIT.getErrorCode(), "Ingress is over the account limit.", new Exception()); } - fs.getAbfsStore().copyBlob(srcPath, dstPath, tracingContext); + fs.getAbfsStore().copyBlob(srcPath, dstPath, leaseId, tracingContext); return null; }) .when(spiedAbfsStore) .copyBlob(Mockito.any(Path.class), Mockito.any(Path.class), - Mockito.any(TracingContext.class)); + Mockito.nullable(String.class), Mockito.any(TracingContext.class)); try { spiedFs.rename(new Path("hbase/test1/test2/test3"), new Path("hbase/test4")); @@ -443,20 +466,32 @@ public void testHBaseHandlingForFailedRename() throws Exception { spiedFsForListPath.getAbfsStore().setClient(spiedClientForListPath); Mockito.doAnswer(answer -> { Path path = answer.getArgument(0); - TracingContext tracingContext = answer.getArgument(1); + String leaseId = answer.getArgument(1); + TracingContext tracingContext = answer.getArgument(2); Assert.assertTrue( - ("/" + failedCopyPath).equalsIgnoreCase(path.toUri().getPath())); + ("/" + failedCopyPath).equalsIgnoreCase(path.toUri().getPath()) + || "/hbase/test1/test2/test3".equalsIgnoreCase( + path.toUri().getPath())); deletedCount.incrementAndGet(); - client.deleteBlobPath(path, tracingContext); + client.deleteBlobPath(path, leaseId, tracingContext); return null; }) .when(spiedClientForListPath) .deleteBlobPath(Mockito.any(Path.class), - 
Mockito.any(TracingContext.class)); + Mockito.nullable(String.class), Mockito.any(TracingContext.class)); + for(Map.Entry entry : pathLeaseIdMap.entrySet()) { + try { + fs.getAbfsClient() + .releaseBlobLease(entry.getKey(), entry.getValue(), + Mockito.mock(TracingContext.class)); + } catch (AbfsRestOperationException ex) { + + } + } spiedFsForListPath.listStatus(new Path("hbase/test1/test2")); Assert.assertTrue(openRequiredFile[0] == 1); - Assert.assertTrue(deletedCount.get() == 2); + Assert.assertTrue(deletedCount.get() == 3); Assert.assertFalse(spiedFsForListPath.exists(new Path(failedCopyPath))); Assert.assertTrue(spiedFsForListPath.exists(new Path( failedCopyPath.replace("test1/test2/test3/", "test4/test3/")))); @@ -485,6 +520,15 @@ public void testHBaseHandlingForFailedRenameForNestedSourceThroughListFile() final AzureBlobFileSystemStore spiedAbfsStore = Mockito.spy( spiedFs.getAbfsStore()); Mockito.doReturn(spiedAbfsStore).when(spiedFs).getAbfsStore(); + AbfsClient spiedClient = Mockito.spy(spiedAbfsStore.getClient()); + spiedAbfsStore.setClient(spiedClient); + Map pathLeaseIdMap = new HashMap<>(); + Mockito.doAnswer(answer -> { + AbfsRestOperation op = (AbfsRestOperation) answer.callRealMethod(); + String leaseId = op.getResult().getResponseHeader(X_MS_LEASE_ID); + pathLeaseIdMap.put(answer.getArgument(0), leaseId); + return op; + }).when(spiedClient).acquireBlobLease(Mockito.anyString(), Mockito.anyInt(), Mockito.any(TracingContext.class)); final Integer[] correctDeletePathCount = new Integer[1]; correctDeletePathCount[0] = 0; @@ -492,18 +536,19 @@ public void testHBaseHandlingForFailedRenameForNestedSourceThroughListFile() Mockito.doAnswer(answer -> { final Path srcPath = answer.getArgument(0); final Path dstPath = answer.getArgument(1); - final TracingContext tracingContext = answer.getArgument(2); + final String leaseId = answer.getArgument(2); + final TracingContext tracingContext = answer.getArgument(3); if (("/" + 
failedCopyPath).equalsIgnoreCase(srcPath.toUri().getPath())) { throw new AbfsRestOperationException(HttpURLConnection.HTTP_UNAVAILABLE, AzureServiceErrorCode.INGRESS_OVER_ACCOUNT_LIMIT.getErrorCode(), "Ingress is over the account limit.", new Exception()); } - fs.getAbfsStore().copyBlob(srcPath, dstPath, tracingContext); + fs.getAbfsStore().copyBlob(srcPath, dstPath, leaseId, tracingContext); return null; }) .when(spiedAbfsStore) .copyBlob(Mockito.any(Path.class), Mockito.any(Path.class), - Mockito.any(TracingContext.class)); + Mockito.nullable(String.class), Mockito.any(TracingContext.class)); try { spiedFs.rename(new Path("hbase/test1/test2"), new Path("hbase/test4")); @@ -551,25 +596,37 @@ public void testHBaseHandlingForFailedRenameForNestedSourceThroughListFile() spiedFsForListPath.getAbfsStore().setClient(spiedClientForListPath); Mockito.doAnswer(answer -> { Path path = answer.getArgument(0); - TracingContext tracingContext = answer.getArgument(1); + String leaseId = answer.getArgument(1); + TracingContext tracingContext = answer.getArgument(2); Assert.assertTrue( - ("/" + failedCopyPath).equalsIgnoreCase(path.toUri().getPath())); + ("/" + failedCopyPath).equalsIgnoreCase(path.toUri().getPath()) + || "/hbase/test1/test2".equalsIgnoreCase(path.toUri().getPath())); deletedCount.incrementAndGet(); - client.deleteBlobPath(path, tracingContext); + client.deleteBlobPath(path, leaseId, tracingContext); return null; }) .when(spiedClientForListPath) .deleteBlobPath(Mockito.any(Path.class), - Mockito.any(TracingContext.class)); + Mockito.nullable(String.class), Mockito.any(TracingContext.class)); /* * listFile on /hbase/test1 would give no result because * /hbase/test1/test2 would be totally moved to /hbase/test4. 
+ * */ + for(Map.Entry entry : pathLeaseIdMap.entrySet()) { + try { + fs.getAbfsClient() + .releaseBlobLease(entry.getKey(), entry.getValue(), + Mockito.mock(TracingContext.class)); + } catch (AbfsRestOperationException ex) { + + } + } final FileStatus[] listFileResult = spiedFsForListPath.listStatus( new Path("hbase/test1")); Assert.assertTrue(openRequiredFile[0] == 1); - Assert.assertTrue(deletedCount.get() == 2); + Assert.assertTrue(deletedCount.get() == 3); Assert.assertFalse(spiedFsForListPath.exists(new Path(failedCopyPath))); Assert.assertTrue(spiedFsForListPath.exists(new Path( failedCopyPath.replace("test1/test2/test3/", "test4/test2/test3/")))); @@ -600,6 +657,15 @@ public void testHBaseHandlingForFailedRenameForNestedSourceThroughGetPathStatus( final AzureBlobFileSystemStore spiedAbfsStore = Mockito.spy( spiedFs.getAbfsStore()); Mockito.doReturn(spiedAbfsStore).when(spiedFs).getAbfsStore(); + AbfsClient spiedClient = Mockito.spy(spiedAbfsStore.getClient()); + spiedAbfsStore.setClient(spiedClient); + Map pathLeaseIdMap = new HashMap<>(); + Mockito.doAnswer(answer -> { + AbfsRestOperation op = (AbfsRestOperation) answer.callRealMethod(); + String leaseId = op.getResult().getResponseHeader(X_MS_LEASE_ID); + pathLeaseIdMap.put(answer.getArgument(0), leaseId); + return op; + }).when(spiedClient).acquireBlobLease(Mockito.anyString(), Mockito.anyInt(), Mockito.any(TracingContext.class)); final Integer[] correctDeletePathCount = new Integer[1]; correctDeletePathCount[0] = 0; @@ -607,18 +673,19 @@ public void testHBaseHandlingForFailedRenameForNestedSourceThroughGetPathStatus( Mockito.doAnswer(answer -> { final Path srcPath = answer.getArgument(0); final Path dstPath = answer.getArgument(1); - final TracingContext tracingContext = answer.getArgument(2); + final String leaseId = answer.getArgument(2); + final TracingContext tracingContext = answer.getArgument(3); if (("/" + failedCopyPath).equalsIgnoreCase(srcPath.toUri().getPath())) { throw new 
AbfsRestOperationException(HttpURLConnection.HTTP_UNAVAILABLE, AzureServiceErrorCode.INGRESS_OVER_ACCOUNT_LIMIT.getErrorCode(), "Ingress is over the account limit.", new Exception()); } - fs.getAbfsStore().copyBlob(srcPath, dstPath, tracingContext); + fs.getAbfsStore().copyBlob(srcPath, dstPath, leaseId, tracingContext); return null; }) .when(spiedAbfsStore) .copyBlob(Mockito.any(Path.class), Mockito.any(Path.class), - Mockito.any(TracingContext.class)); + Mockito.nullable(String.class), Mockito.any(TracingContext.class)); try { spiedFs.rename(new Path("hbase/test1/test2"), new Path("hbase/test4")); @@ -666,16 +733,17 @@ public void testHBaseHandlingForFailedRenameForNestedSourceThroughGetPathStatus( spiedFsForListPath.getAbfsStore().setClient(spiedClientForListPath); Mockito.doAnswer(answer -> { Path path = answer.getArgument(0); - TracingContext tracingContext = answer.getArgument(1); + String leaseId = answer.getArgument(1); + TracingContext tracingContext = answer.getArgument(2); Assert.assertTrue( - ("/" + failedCopyPath).equalsIgnoreCase(path.toUri().getPath())); + ("/" + failedCopyPath).equalsIgnoreCase(path.toUri().getPath()) || "/hbase/test1/test2".equalsIgnoreCase(path.toUri().getPath())); deletedCount.incrementAndGet(); - client.deleteBlobPath(path, tracingContext); + client.deleteBlobPath(path, leaseId, tracingContext); return null; }) .when(spiedClientForListPath) .deleteBlobPath(Mockito.any(Path.class), - Mockito.any(TracingContext.class)); + Mockito.nullable(String.class), Mockito.any(TracingContext.class)); /* * getFileStatus on /hbase/test2 should give NOT_FOUND exception, since, @@ -683,6 +751,13 @@ public void testHBaseHandlingForFailedRenameForNestedSourceThroughGetPathStatus( * on the directory, the remaining rename will be made. And as the directory is renamed, * the method should give NOT_FOUND exception. 
*/ + for(Map.Entry entry : pathLeaseIdMap.entrySet()) { + try { + fs.getAbfsClient().releaseBlobLease(entry.getKey(), entry.getValue(), Mockito.mock(TracingContext.class)); + } catch (AbfsRestOperationException ex) { + + } + } FileStatus fileStatus = null; Boolean notFoundExceptionReceived = false; try { @@ -695,7 +770,7 @@ public void testHBaseHandlingForFailedRenameForNestedSourceThroughGetPathStatus( Assert.assertTrue(notFoundExceptionReceived); Assert.assertNull(fileStatus); Assert.assertTrue(openRequiredFile[0] == 1); - Assert.assertTrue(deletedCount.get() == 2); + Assert.assertTrue(deletedCount.get() == 3); Assert.assertFalse(spiedFsForListPath.exists(new Path(failedCopyPath))); Assert.assertTrue(spiedFsForListPath.exists(new Path( failedCopyPath.replace("test1/test2/test3/", "test4/test2/test3/")))); @@ -915,6 +990,7 @@ public void testEmptyDirRenameResolveFromListStatus() throws Exception { String srcDir = "/hbase/test1/test2/test3"; fs.setWorkingDirectory(new Path("/")); fs.mkdirs(new Path(srcDir)); + fs.create(new Path(srcDir, "file1")); fs.mkdirs(new Path("hbase/test4")); AzureBlobFileSystem spiedFs = Mockito.spy(fs); @@ -922,33 +998,41 @@ public void testEmptyDirRenameResolveFromListStatus() throws Exception { AzureBlobFileSystemStore spiedAbfsStore = Mockito.spy( spiedFs.getAbfsStore()); Mockito.doReturn(spiedAbfsStore).when(spiedFs).getAbfsStore(); + AbfsClient spiedClient = Mockito.spy(spiedAbfsStore.getClient()); + spiedAbfsStore.setClient(spiedClient); + Map pathLeaseIdMap = new HashMap<>(); Mockito.doAnswer(answer -> { - final Path srcPath = answer.getArgument(0); - final Path dstPath = answer.getArgument(1); - final TracingContext tracingContext = answer.getArgument(2); - - if (srcDir.equalsIgnoreCase(srcPath.toUri().getPath())) { - throw new AbfsRestOperationException(HttpURLConnection.HTTP_UNAVAILABLE, - AzureServiceErrorCode.INGRESS_OVER_ACCOUNT_LIMIT.getErrorCode(), - "Ingress is over the account limit.", new Exception()); - } - 
fs.getAbfsStore().copyBlob(srcPath, dstPath, tracingContext); - return null; + AbfsRestOperation op = (AbfsRestOperation) answer.callRealMethod(); + String leaseId = op.getResult().getResponseHeader(X_MS_LEASE_ID); + pathLeaseIdMap.put(answer.getArgument(0), leaseId); + return op; + }).when(spiedClient).acquireBlobLease(Mockito.anyString(), Mockito.anyInt(), Mockito.any(TracingContext.class)); + Mockito.doAnswer(answer -> { + throw new AbfsRestOperationException(HttpURLConnection.HTTP_UNAVAILABLE, + AzureServiceErrorCode.INGRESS_OVER_ACCOUNT_LIMIT.getErrorCode(), + "Ingress is over the account limit.", new Exception()); }) .when(spiedAbfsStore) .copyBlob(Mockito.any(Path.class), Mockito.any(Path.class), - Mockito.any(TracingContext.class)); + Mockito.nullable(String.class), Mockito.any(TracingContext.class)); try { spiedFs.rename(new Path(srcDir), new Path("hbase/test4")); } catch (Exception ex) { - } Assert.assertFalse(spiedFs.exists( new Path(srcDir.replace("test1/test2/test3", "test4/test3/")))); //call listPath API, it will recover the rename atomicity. 
+ for(Map.Entry entry : pathLeaseIdMap.entrySet()) { + try { + fs.getAbfsClient() + .releaseBlobLease(entry.getKey(), entry.getValue(), + Mockito.mock(TracingContext.class)); + } catch (Exception e) {} + } + final AzureBlobFileSystem spiedFsForListPath = Mockito.spy(fs); final int[] openRequiredFile = new int[1]; openRequiredFile[0] = 0; @@ -984,15 +1068,16 @@ public void testEmptyDirRenameResolveFromListStatus() throws Exception { spiedFsForListPath.getAbfsStore().setClient(spiedClientForListPath); Mockito.doAnswer(answer -> { Path path = answer.getArgument(0); - TracingContext tracingContext = answer.getArgument(1); - Assert.assertTrue((srcDir).equalsIgnoreCase(path.toUri().getPath())); + String leaseId = answer.getArgument(1); + TracingContext tracingContext = answer.getArgument(2); + Assert.assertTrue(((srcDir).equalsIgnoreCase(path.toUri().getPath()) || (srcDir+"/file1").equalsIgnoreCase(path.toUri().getPath()))); deletedCount.incrementAndGet(); - client.deleteBlobPath(path, tracingContext); + client.deleteBlobPath(path, leaseId, tracingContext); return null; }) .when(spiedClientForListPath) .deleteBlobPath(Mockito.any(Path.class), - Mockito.any(TracingContext.class)); + Mockito.nullable(String.class), Mockito.any(TracingContext.class)); /* * getFileStatus on /hbase/test2 should give NOT_FOUND exception, since, @@ -1011,7 +1096,7 @@ public void testEmptyDirRenameResolveFromListStatus() throws Exception { Assert.assertTrue(notFoundExceptionReceived); Assert.assertNull(fileStatus); Assert.assertTrue(openRequiredFile[0] == 1); - Assert.assertTrue(deletedCount.get() == 2); + Assert.assertTrue(deletedCount.get() == 3); Assert.assertFalse(spiedFsForListPath.exists(new Path(srcDir))); Assert.assertTrue(spiedFsForListPath.getFileStatus( new Path(srcDir.replace("test1/test2/test3", "test4/test3/"))) @@ -1306,7 +1391,7 @@ public void testRenameDirWhenMarkerBlobIsAbsent() throws Exception { fs.getAbfsClient() .deleteBlobPath(new Path("/test1/test2"), - 
Mockito.mock(TracingContext.class)); + null, Mockito.mock(TracingContext.class)); fs.mkdirs(new Path("/test4/test5")); fs.rename(new Path("/test4"), new Path("/test1/test2")); @@ -1318,7 +1403,7 @@ public void testRenameDirWhenMarkerBlobIsAbsent() throws Exception { fs.getAbfsClient() .deleteBlobPath(new Path("/test1/test2/test4/test5/test6"), - Mockito.mock(TracingContext.class)); + null, Mockito.mock(TracingContext.class)); fs.mkdirs(new Path("/test7")); fs.create(new Path("/test7/file")); fs.rename(new Path("/test7"), new Path("/test1/test2/test4/test5/test6")); @@ -1333,7 +1418,7 @@ public void testBlobRenameSrcDirHasNoMarker() throws Exception { fs.create(new Path("/test1/test2/file1")); fs.getAbfsStore() .getClient() - .deleteBlobPath(new Path("/test1"), Mockito.mock(TracingContext.class)); + .deleteBlobPath(new Path("/test1"), null, Mockito.mock(TracingContext.class)); intercept(AbfsRestOperationException.class, () -> { fs.getAbfsStore().getBlobProperty(new Path("/test1"), Mockito.mock(TracingContext.class)); @@ -1363,7 +1448,7 @@ public void testCopyBlobTakeTime() throws Exception { Mockito.doReturn(httpOp).when(op).getResult(); return op; }).when(spiedClient).copyBlob(Mockito.any(Path.class), Mockito.any(Path.class), - Mockito.any(TracingContext.class)); + Mockito.nullable(String.class), Mockito.any(TracingContext.class)); fileSystem.create(new Path("/test1/file")); fileSystem.rename(new Path("/test1/file"), new Path("/test1/file2")); Assert.assertTrue(fileSystem.exists(new Path("/test1/file2"))); @@ -1390,7 +1475,7 @@ public void testCopyBlobTakeTimeAndEventuallyFail() throws Exception { Mockito.doReturn(httpOp).when(op).getResult(); return op; }).when(spiedClient).copyBlob(Mockito.any(Path.class), Mockito.any(Path.class), - Mockito.any(TracingContext.class)); + Mockito.nullable(String.class), Mockito.any(TracingContext.class)); Mockito.doAnswer(answer -> { AbfsRestOperation op = Mockito.spy((AbfsRestOperation) answer.callRealMethod()); AbfsHttpOperation 
httpOp = Mockito.spy(op.getResult()); @@ -1434,7 +1519,7 @@ public void testCopyBlobTakeTimeAndEventuallyAborted() throws Exception { Mockito.doReturn(httpOp).when(op).getResult(); return op; }).when(spiedClient).copyBlob(Mockito.any(Path.class), Mockito.any(Path.class), - Mockito.any(TracingContext.class)); + Mockito.nullable(String.class), Mockito.any(TracingContext.class)); Mockito.doAnswer(answer -> { AbfsRestOperation op = Mockito.spy((AbfsRestOperation) answer.callRealMethod()); AbfsHttpOperation httpOp = Mockito.spy(op.getResult()); @@ -1481,14 +1566,12 @@ public void testCopyBlobTakeTimeAndBlobIsDeleted() throws Exception { Mockito.doReturn(httpOp).when(op).getResult(); return op; }).when(spiedClient).copyBlob(Mockito.any(Path.class), Mockito.any(Path.class), - Mockito.any(TracingContext.class)); + Mockito.nullable(String.class), Mockito.any(TracingContext.class)); fileSystem.create(new Path(srcFile)); - intercept(FileNotFoundException.class, () -> { - fileSystem.rename(new Path(srcFile), new Path(dstFile)); - }); + Assert.assertFalse(fileSystem.rename(new Path(srcFile), new Path(dstFile))); Assert.assertFalse(fileSystem.exists(new Path(dstFile))); } @@ -1517,7 +1600,7 @@ private void parallelCopyRunnable(final AzureBlobFileSystem fs, final AtomicInteger threadsCompleted) { try { fs.getAbfsClient().copyBlob(new Path("/src"), - new Path("/dst"), Mockito.mock(TracingContext.class)); + new Path("/dst"), null, Mockito.mock(TracingContext.class)); } catch (AbfsRestOperationException ex) { if (ex.getStatusCode() == HttpURLConnection.HTTP_CONFLICT) { dstBlobAlreadyThereExceptionReceived[0] = true; @@ -1535,12 +1618,12 @@ public void testCopyAfterSourceHasBeenDeleted() throws Exception { fs.create(new Path("/src")); fs.getAbfsStore() .getClient() - .deleteBlobPath(new Path("/src"), Mockito.mock(TracingContext.class)); + .deleteBlobPath(new Path("/src"), null, Mockito.mock(TracingContext.class)); Boolean srcBlobNotFoundExReceived = false; try { fs.getAbfsStore() 
.copyBlob(new Path("/src"), new Path("/dst"), - Mockito.mock(TracingContext.class)); + null, Mockito.mock(TracingContext.class)); } catch (AbfsRestOperationException ex) { if (ex.getStatusCode() == HttpURLConnection.HTTP_NOT_FOUND) { srcBlobNotFoundExReceived = true; @@ -1548,4 +1631,311 @@ public void testCopyAfterSourceHasBeenDeleted() throws Exception { } Assert.assertTrue(srcBlobNotFoundExReceived); } + + @Test + public void testParallelRenameForAtomicDirShouldFail() throws Exception { + AzureBlobFileSystem fs = Mockito.spy(getFileSystem()); + assumeNonHnsAccountBlobEndpoint(fs); + fs.setWorkingDirectory(new Path("/")); + fs.mkdirs(new Path("/hbase/dir1")); + fs.create(new Path("/hbase/dir1/file1")); + AzureBlobFileSystemStore store = Mockito.spy(fs.getAbfsStore()); + Mockito.doReturn(store).when(fs).getAbfsStore(); + AbfsClient client = Mockito.spy(fs.getAbfsClient()); + store.setClient(client); + AtomicBoolean leaseAcquired = new AtomicBoolean(false); + AtomicBoolean exceptionOnParallelRename = new AtomicBoolean(false); + AtomicBoolean parallelThreadDone = new AtomicBoolean(false); + Mockito.doAnswer(answer -> { + AbfsRestOperation op = (AbfsRestOperation) answer.callRealMethod(); + leaseAcquired.set(true); + while (!parallelThreadDone.get()) ; + return op; + }) + .when(client) + .acquireBlobLease(Mockito.anyString(), Mockito.anyInt(), + Mockito.any(TracingContext.class)); + + new Thread(() -> { + while (!leaseAcquired.get()) ; + try { + fs.rename(new Path("/hbase/dir1/file1"), new Path("/hbase/dir2/")); + } catch (Exception e) { + if (e.getCause() instanceof AbfsRestOperationException + && ((AbfsRestOperationException) e.getCause()).getStatusCode() + == HttpURLConnection.HTTP_CONFLICT) { + exceptionOnParallelRename.set(true); + } + } finally { + parallelThreadDone.set(true); + } + }).start(); + fs.rename(new Path("/hbase/dir1/file1"), new Path("/hbase/dir2/")); + while (!parallelThreadDone.get()) ; + Assert.assertTrue(exceptionOnParallelRename.get()); + } + 
+ @Test + public void testParallelAppendToFileBeingCopiedInAtomicDirectory() + throws Exception { + AzureBlobFileSystem fs = Mockito.spy(getFileSystem()); + assumeNonHnsAccountBlobEndpoint(fs); + fs.setWorkingDirectory(new Path("/")); + fs.mkdirs(new Path("/hbase/dir1")); + fs.create(new Path("/hbase/dir1/file1")); + AzureBlobFileSystemStore store = Mockito.spy(fs.getAbfsStore()); + Mockito.doReturn(store).when(fs).getAbfsStore(); + AbfsClient client = Mockito.spy(fs.getAbfsClient()); + store.setClient(client); + AtomicBoolean copyOfSrcFile = new AtomicBoolean(false); + AtomicBoolean parallelAppendDone = new AtomicBoolean(false); + AtomicBoolean exceptionCaught = new AtomicBoolean(false); + + Mockito.doAnswer(answer -> { + answer.callRealMethod(); + if ("/hbase/dir1/file1".equalsIgnoreCase( + ((Path) answer.getArgument(0)).toUri().getPath())) { + copyOfSrcFile.set(true); + while (!parallelAppendDone.get()) ; + } + return null; + }) + .when(store) + .copyBlob(Mockito.any(Path.class), Mockito.any(Path.class), + Mockito.nullable(String.class), Mockito.any(TracingContext.class)); + + FSDataOutputStream outputStream = fs.append(new Path("/hbase/dir1/file1")); + + new Thread(() -> { + while (!copyOfSrcFile.get()) ; + try { + byte[] bytes = new byte[4 * ONE_MB]; + new Random().nextBytes(bytes); + outputStream.write(bytes); + outputStream.hsync(); + } catch (Exception e) { + if (e.getCause() instanceof AbfsRestOperationException + && ((AbfsRestOperationException) e.getCause()).getStatusCode() + == HttpURLConnection.HTTP_PRECON_FAILED) { + exceptionCaught.set(true); + } + } finally { + parallelAppendDone.set(true); + } + }).start(); + + fs.rename(new Path("/hbase/dir1"), new Path("/hbase/dir2")); + + while (!parallelAppendDone.get()) ; + Assert.assertTrue(exceptionCaught.get()); + } + + @Test + public void testParallelBlobLeaseOnChildBlobInRenameSrcDir() + throws Exception { + assumeNonHnsAccountBlobEndpoint(getFileSystem()); + AzureBlobFileSystem fs = Mockito.spy( + 
(AzureBlobFileSystem) FileSystem.newInstance(getRawConfiguration())); + AzureBlobFileSystemStore store = Mockito.spy(fs.getAbfsStore()); + Path srcDirPath = new Path("/hbase/testDir"); + fs.mkdirs(srcDirPath); + fs.create(new Path(srcDirPath, "file1")); + fs.create(new Path(srcDirPath, "file2")); + Mockito.doReturn(store).when(fs).getAbfsStore(); + AbfsClient client = store.getClient(); + AbfsClient spiedClient = Mockito.spy(client); + store.setClient(spiedClient); + + fs.getAbfsClient() + .acquireBlobLease("/hbase/testDir/file2", -1, + Mockito.mock(TracingContext.class)); + + AbfsLease[] leases = new AbfsLease[1]; + Mockito.doAnswer(answer -> { + String path = answer.getArgument(0); + AbfsLease lease = (AbfsLease) answer.callRealMethod(); + if (srcDirPath.toUri().getPath().equalsIgnoreCase(path)) { + lease = Mockito.spy(lease); + leases[0] = lease; + } + return lease; + }) + .when(store) + .getBlobLease(Mockito.anyString(), Mockito.nullable(Integer.class), + Mockito.any(TracingContext.class)); + + Boolean renameFailed = false; + try { + fs.rename(srcDirPath, new Path("/hbase/newDir")); + } catch (Exception e) { + renameFailed = true; + } + + Assertions.assertThat(renameFailed).isTrue(); + Mockito.verify(leases[0], Mockito.times(1)).free(); + } + + @Test + public void testParallelCreateNonRecursiveToFilePartOfAtomicDirectoryInRename() + throws Exception { + Configuration configuration = Mockito.spy(getRawConfiguration()); + configuration.set(FS_AZURE_LEASE_CREATE_NON_RECURSIVE, "true"); + FileSystem fsCreate = FileSystem.newInstance(configuration); + AzureBlobFileSystem fs = (AzureBlobFileSystem) FileSystem.newInstance(configuration); + assumeNonHnsAccountBlobEndpoint(fs); + fs.setWorkingDirectory(new Path("/")); + fs.mkdirs(new Path("/hbase/dir1")); + fs.create(new Path("/hbase/dir1/file1")); + AbfsClient client = Mockito.spy(fs.getAbfsClient()); + fs.getAbfsStore().setClient(client); + AtomicBoolean leaseAcquired = new AtomicBoolean(false); + AtomicBoolean 
parallelCreateDone = new AtomicBoolean(false); + AtomicBoolean exceptionCaught = new AtomicBoolean(false); + AtomicBoolean parallelRenameDone = new AtomicBoolean(false); + + Mockito.doAnswer(answer -> { + AbfsRestOperation op = (AbfsRestOperation) answer.callRealMethod(); + leaseAcquired.set(true); + while(!parallelCreateDone.get()); + return op; + }) + .when(client) + .acquireBlobLease(Mockito.anyString(), Mockito.anyInt(), + Mockito.any(TracingContext.class)); + + new Thread(() -> { + try { + fs.rename(new Path("/hbase/dir1"), new Path("/hbase/dir2")); + } catch (Exception e) {} finally { + parallelRenameDone.set(true); + } + }).start(); + + Path createNewFilePath = new Path("/hbase/dir1/file2"); + while (!leaseAcquired.get()) ; + try { + fsCreate.createFile(createNewFilePath) + .overwrite(false) + .replication((short) 1) + .bufferSize(1024) + .blockSize(1024) + .build(); + } catch (AbfsRestOperationException e) { + if (e.getStatusCode() + == HttpURLConnection.HTTP_CONFLICT) { + exceptionCaught.set(true); + } + } finally { + parallelCreateDone.set(true); + } + + + while (!parallelRenameDone.get()) ; + Assert.assertTrue(exceptionCaught.get()); + } + + @Test + public void testBlobRenameOfDirectoryHavingNeighborWithSamePrefix() + throws Exception { + AzureBlobFileSystem fs = getFileSystem(); + assumeNonHnsAccountBlobEndpoint(fs); + fs.mkdirs(new Path("/testDir/dir")); + fs.mkdirs(new Path("/testDir/dirSamePrefix")); + fs.create(new Path("/testDir/dir/file1")); + fs.create(new Path("/testDir/dir/file2")); + + fs.create(new Path("/testDir/dirSamePrefix/file1")); + fs.create(new Path("/testDir/dirSamePrefix/file2")); + + fs.rename(new Path("/testDir/dir"), new Path("/testDir/dir2")); + + Assertions.assertThat(fs.exists(new Path("/testDir/dirSamePrefix/file1"))) + .isTrue(); + Assertions.assertThat(fs.exists(new Path("/testDir/dir/file1"))) + .isFalse(); + Assertions.assertThat(fs.exists(new Path("/testDir/dir/file2"))) + .isFalse(); + 
Assertions.assertThat(fs.exists(new Path("/testDir/dir/"))) + .isFalse(); + } + + @Test + public void testBlobRenameCancelRenewTimerForLeaseTakenInAtomicRename() + throws Exception { + AzureBlobFileSystem fs = Mockito.spy( + (AzureBlobFileSystem) FileSystem.newInstance(getRawConfiguration())); + assumeNonHnsAccountBlobEndpoint(fs); + AzureBlobFileSystemStore store = Mockito.spy(fs.getAbfsStore()); + Mockito.doReturn(store).when(fs).getAbfsStore(); + + fs.mkdirs(new Path("/hbase/dir")); + fs.create(new Path("/hbase/dir/file1")); + fs.create(new Path("/hbase/dir/file2")); + + final List leases = new ArrayList<>(); + Mockito.doAnswer(answer -> { + AbfsBlobLease lease = Mockito.spy( + (AbfsBlobLease) answer.callRealMethod()); + leases.add(lease); + return lease; + }) + .when(store) + .getBlobLease(Mockito.anyString(), Mockito.nullable(Integer.class), + Mockito.any(TracingContext.class)); + + fs.rename(new Path("/hbase/dir"), new Path("/hbase/dir2")); + + Assertions.assertThat(leases).hasSize(3); + for (AbfsBlobLease lease : leases) { + Mockito.verify(lease, Mockito.times(1)).cancelTimer(); + } + } + + @Test + public void testBlobRenameServerReturnsOneBlobPerList() throws Exception { + assumeNonHnsAccountBlobEndpoint(getFileSystem()); + AzureBlobFileSystem fs = (AzureBlobFileSystem) Mockito.spy(FileSystem.newInstance(getRawConfiguration())); + fs.mkdirs(new Path("/testDir/")); + fs.create(new Path("/testDir/file1")); + fs.create(new Path("/testDir/file2")); + + AzureBlobFileSystemStore store = Mockito.spy(fs.getAbfsStore()); + AbfsClient client = fs.getAbfsClient(); + AbfsClient spiedClient = Mockito.spy(client); + store.setClient(spiedClient); + Mockito.doReturn(store).when(fs).getAbfsStore(); + Mockito.doAnswer(answer -> { + String marker = answer.getArgument(0); + String prefix = answer.getArgument(1); + String delimeter = answer.getArgument(2); + Integer count = answer.getArgument(3); + TracingContext tracingContext = answer.getArgument(4); + AbfsRestOperation op = 
client.getListBlobs(marker, prefix, delimeter, 1, tracingContext); + return op; + }).when(spiedClient).getListBlobs(Mockito.nullable(String.class), + Mockito.nullable(String.class), Mockito.nullable(String.class), + Mockito.nullable(Integer.class), Mockito.any(TracingContext.class)); + + fs.rename(new Path("/testDir"), new Path("/testDir1")); + Assertions.assertThat(fs.exists(new Path("/testDir"))).isFalse(); + Assertions.assertThat(fs.exists(new Path("/testDir1"))).isTrue(); + Assertions.assertThat(fs.exists(new Path("/testDir1/file1"))).isTrue(); + Assertions.assertThat(fs.exists(new Path("/testDir1/file2"))).isTrue(); + } + + @Test + public void testBlobAtomicRenameSrcAndDstAreNotLeftLeased() throws Exception { + AzureBlobFileSystem fs = Mockito.spy(getFileSystem()); + assumeNonHnsAccountBlobEndpoint(fs); + fs.setWorkingDirectory(new Path("/")); + fs.create(new Path("/hbase/dir1/blob1")); + fs.create(new Path("/hbase/dir1/blob2")); + fs.rename(new Path("/hbase/dir1/"), new Path("/hbase/dir2")); + fs.create(new Path("/hbase/dir1/blob1")); + byte[] bytes = new byte[4 * ONE_MB]; + new Random().nextBytes(bytes); + try (FSDataOutputStream os = fs.append(new Path("hbase/dir2/blob1"))) { + os.write(bytes); + } + } } diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestListBlobProducer.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestListBlobProducer.java new file mode 100644 index 0000000000000..f3c4bd8d241d9 --- /dev/null +++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestListBlobProducer.java @@ -0,0 +1,182 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.fs.azurebfs; + +import java.net.HttpURLConnection; +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; + +import org.junit.Assert; +import org.junit.Assume; +import org.junit.Test; +import org.mockito.Mockito; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.azurebfs.contracts.exceptions.AbfsRestOperationException; +import org.apache.hadoop.fs.azurebfs.contracts.exceptions.AzureBlobFileSystemException; +import org.apache.hadoop.fs.azurebfs.services.AbfsClient; +import org.apache.hadoop.fs.azurebfs.services.AbfsRestOperation; +import org.apache.hadoop.fs.azurebfs.services.BlobProperty; +import org.apache.hadoop.fs.azurebfs.services.ListBlobConsumer; +import org.apache.hadoop.fs.azurebfs.services.ListBlobProducer; +import org.apache.hadoop.fs.azurebfs.services.ListBlobQueue; +import org.apache.hadoop.fs.azurebfs.services.PrefixMode; +import org.apache.hadoop.fs.azurebfs.utils.TracingContext; + +import static 
org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.FS_AZURE_PRODUCER_QUEUE_MAX_SIZE; + +public class ITestListBlobProducer extends AbstractAbfsIntegrationTest { + + public ITestListBlobProducer() throws Exception { + super(); + } + + @Override + public void setup() throws Exception { + super.setup(); + Assume.assumeTrue( + getFileSystem().getAbfsStore().getAbfsConfiguration().getPrefixMode() + == PrefixMode.BLOB); + } + + @Test + public void testProducerWaitingForConsumerLagToGoDown() throws Exception { + Configuration configuration = Mockito.spy(getRawConfiguration()); + configuration.set(FS_AZURE_PRODUCER_QUEUE_MAX_SIZE, "10"); + AzureBlobFileSystem fs = (AzureBlobFileSystem) FileSystem.newInstance( + configuration); + AbfsClient client = fs.getAbfsClient(); + AbfsClient spiedClient = Mockito.spy(client); + fs.getAbfsStore().setClient(spiedClient); + fs.setWorkingDirectory(new Path("/")); + fs.mkdirs(new Path("/src")); + ExecutorService executor = Executors.newFixedThreadPool(5); + List futureList = new ArrayList<>(); + for (int i = 0; i < 20; i++) { + int iter = i; + futureList.add(executor.submit(() -> { + return fs.create(new Path("/src/file" + iter)); + })); + } + for(Future future : futureList) { + future.get(); + } + + AtomicInteger producedBlobs = new AtomicInteger(0); + AtomicInteger listBlobInvoked = new AtomicInteger(0); + + final ITestListBlobProducer testObj = this; + final ListBlobQueue queue = new ListBlobQueue( + fs.getAbfsStore().getAbfsConfiguration().getProducerQueueMaxSize(), + 1); + final CountDownLatch latch = new CountDownLatch(10); + + Mockito.doAnswer(answer -> { + synchronized (testObj) { + listBlobInvoked.incrementAndGet(); + AbfsRestOperation op = client.getListBlobs(answer.getArgument(0), + answer.getArgument(1), answer.getArgument(2), 1, answer.getArgument(4)); + producedBlobs.incrementAndGet(); + latch.countDown(); + if(producedBlobs.get() > 10) { + Assert.assertTrue(queue.availableSize() > 0); + } + return op; + } + }) + 
.when(spiedClient) + .getListBlobs(Mockito.nullable(String.class), + Mockito.nullable(String.class), Mockito.nullable(String.class), + Mockito.nullable(Integer.class), + Mockito.nullable(TracingContext.class)); + + + ListBlobProducer producer = new ListBlobProducer("src/", spiedClient, queue, + null, Mockito.mock( + TracingContext.class)); + ListBlobConsumer consumer = new ListBlobConsumer(queue); + latch.await(); + + int oldInvocation = listBlobInvoked.get(); + Assert.assertTrue(listBlobInvoked.get() == oldInvocation); + + while (!consumer.isCompleted()) { + synchronized (testObj) { + consumer.consume(); + Assert.assertTrue(queue.availableSize() > 0); + } + } + + Assert.assertTrue(producedBlobs.get() == 20); + } + + @Test + public void testConsumerWhenProducerThrowException() throws Exception { + Configuration configuration = Mockito.spy(getRawConfiguration()); + configuration.set(FS_AZURE_PRODUCER_QUEUE_MAX_SIZE, "10"); + AzureBlobFileSystem fs = (AzureBlobFileSystem) FileSystem.newInstance( + configuration); + AbfsClient client = fs.getAbfsClient(); + AbfsClient spiedClient = Mockito.spy(client); + fs.getAbfsStore().setClient(spiedClient); + fs.setWorkingDirectory(new Path("/")); + fs.mkdirs(new Path("/src")); + for (int i = 0; i < 20; i++) { + fs.create(new Path("/src/file" + i)); + } + + Mockito.doAnswer(answer -> { + throw new AbfsRestOperationException(HttpURLConnection.HTTP_CONFLICT, "", + "", new Exception("")); + + }) + .when(spiedClient) + .getListBlobs(Mockito.nullable(String.class), + Mockito.nullable(String.class), Mockito.nullable(String.class), + Mockito.nullable(Integer.class), + Mockito.nullable(TracingContext.class)); + + ListBlobQueue queue = new ListBlobQueue(getConfiguration().getProducerQueueMaxSize(), + getConfiguration().getProducerQueueMaxSize()); + ListBlobProducer producer = new ListBlobProducer("src/", spiedClient, queue, + null, Mockito.mock( + TracingContext.class)); + ListBlobConsumer consumer = new ListBlobConsumer(queue); + + 
Boolean exceptionCaught = false; + try { + while (!consumer.isCompleted()) { + consumer.consume(); + } + } catch (AzureBlobFileSystemException e) { + exceptionCaught = true; + } + + Assert.assertTrue(exceptionCaught); + } +} diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/utils/DelegationSASGenerator.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/utils/DelegationSASGenerator.java index 0d088483836eb..3b7e3cacd0ec1 100644 --- a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/utils/DelegationSASGenerator.java +++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/utils/DelegationSASGenerator.java @@ -60,6 +60,7 @@ public String getDelegationSAS(String accountName, String containerName, String case SASTokenProvider.CREATE_DIRECTORY_OPERATION: case SASTokenProvider.WRITE_OPERATION: case SASTokenProvider.SET_PROPERTIES_OPERATION: + case SASTokenProvider.LEASE_OPERATION: case SASTokenProvider.SET_BLOB_METADATA_OPERATION: sp = "w"; break;