diff --git a/hadoop-tools/hadoop-azure/pom.xml b/hadoop-tools/hadoop-azure/pom.xml index 18b2f6b477bc2..67a85bcc5a35a 100644 --- a/hadoop-tools/hadoop-azure/pom.xml +++ b/hadoop-tools/hadoop-azure/pom.xml @@ -557,7 +557,6 @@ **/azurebfs/ITestSmallWriteOptimization.java **/azurebfs/services/ITestReadBufferManager.java **/azurebfs/ITestAzureBlobFileSystemRename.java - **/azurebfs/ITestListBlob.java @@ -600,7 +599,6 @@ **/azurebfs/ITestSmallWriteOptimization.java **/azurebfs/services/ITestReadBufferManager.java **/azurebfs/ITestAzureBlobFileSystemRename.java - **/azurebfs/ITestListBlob.java diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AbfsConfiguration.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AbfsConfiguration.java index 41e8f537e8799..953b752dd665f 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AbfsConfiguration.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AbfsConfiguration.java @@ -244,6 +244,10 @@ public class AbfsConfiguration{ DefaultValue = 0) private int blobDirRenameMaxThread; + @LongConfigurationValidatorAnnotation(ConfigurationKey = FS_AZURE_BLOB_COPY_PROGRESS_POLL_WAIT_MILLIS, + DefaultValue = 1_000L) + private long blobCopyProgressPollWaitMillis; + @IntegerConfigurationValidatorAnnotation(ConfigurationKey = FS_AZURE_READ_AHEAD_BLOCK_SIZE, MinValue = MIN_BUFFER_SIZE, MaxValue = MAX_BUFFER_SIZE, @@ -1031,6 +1035,10 @@ public int getBlobDirRenameMaxThread() { return blobDirRenameMaxThread; } + public long getBlobCopyProgressPollWaitMillis() { + return blobCopyProgressPollWaitMillis; + } + @VisibleForTesting void setReadBufferSize(int bufferSize) { this.readBufferSize = bufferSize; diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystem.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystem.java index e48d21b80b905..b04719c2a390b 
100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystem.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystem.java @@ -41,11 +41,12 @@ import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Future; -import java.util.concurrent.atomic.AtomicBoolean; +import org.apache.hadoop.classification.VisibleForTesting; +import org.apache.hadoop.fs.azurebfs.services.BlobProperty; +import org.apache.hadoop.fs.azurebfs.services.PathInformation; import org.apache.hadoop.fs.azurebfs.services.PrefixMode; import org.apache.hadoop.io.IOUtils; -import org.apache.hadoop.thirdparty.com.google.common.annotations.VisibleForTesting; import org.apache.hadoop.thirdparty.com.google.common.base.Preconditions; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -401,14 +402,18 @@ public FSDataOutputStream append(final Path f, final int bufferSize, final Progr } public boolean rename(final Path src, final Path dst) throws IOException { - LOG.debug("AzureBlobFileSystem.rename src: {} dst: {}", src, dst); - LOG.debug("Rename via Blob-endpoint for non-HNS account: {}", - getAbfsStore().getAbfsConfiguration().getPrefixMode() - == PrefixMode.BLOB); + LOG.debug("AzureBlobFileSystem.rename src: {} dst: {} via {} endpoint", src, dst, + getAbfsStore().getAbfsConfiguration().getPrefixMode()); statIncrement(CALL_RENAME); trailingPeriodCheck(dst); + if (getAbfsStore().getAbfsConfiguration().getPrefixMode() == PrefixMode.BLOB + && containsColon(dst)) { + throw new IOException("Cannot rename to file " + dst + + " that has colons in the name through blob endpoint"); + } + Path parentFolder = src.getParent(); if (parentFolder == null) { return false; @@ -420,18 +425,17 @@ public boolean rename(final Path src, final Path dst) throws IOException { if (getAbfsStore().getAbfsConfiguration().getPrefixMode() == PrefixMode.BLOB) { /* + * Special case 1: * For 
blob endpoint with non-HNS account, client has to ensure that destination * is not a sub-directory of source. */ LOG.debug("Check if the destination is subDirectory"); - while (nestedDstParent != null) { - if (makeQualified(nestedDstParent).equals(qualifiedSrcPath)) { - //testRenameChildDirForbidden. - LOG.info("Rename src: {} dst: {} failed as dst is subDir of src", - qualifiedSrcPath, qualifiedDstPath); - return false; - } - nestedDstParent = nestedDstParent.getParent(); + if (nestedDstParent != null && makeQualified(nestedDstParent).toUri() + .getPath() + .indexOf(qualifiedSrcPath.toUri().getPath()) == 0) { + LOG.info("Rename src: {} dst: {} failed as dst is subDir of src", + qualifiedSrcPath, qualifiedDstPath); + return false; } } @@ -439,42 +443,54 @@ public boolean rename(final Path src, final Path dst) throws IOException { TracingContext tracingContext = new TracingContext(clientCorrelationId, fileSystemId, FSOperationType.RENAME, true, tracingHeaderFormat, listener); + // special case 2: // rename under same folder; if (makeQualified(parentFolder).equals(qualifiedDstPath)) { - return tryGetFileStatus(qualifiedSrcPath, tracingContext) != null; + PathInformation pathInformation = getPathInformation(qualifiedDstPath, + tracingContext); + return pathInformation.getPathExists(); } + //special case 3: if (qualifiedSrcPath.equals(qualifiedDstPath)) { // rename to itself // - if it doesn't exist, return false // - if it is file, return true // - if it is dir, return false. 
- final AtomicBoolean isDstDirectory = new AtomicBoolean(); - final AtomicBoolean isDstExists = new AtomicBoolean(); - getPathInformation(qualifiedDstPath, tracingContext, isDstDirectory, - isDstExists); - if (!isDstExists.get()) { + final PathInformation pathInformation = getPathInformation( + qualifiedDstPath, tracingContext + ); + final Boolean isDstExists = pathInformation.getPathExists(); + final Boolean isDstDirectory = pathInformation.getIsDirectory(); + if (!isDstExists) { return false; } - return isDstDirectory.get() ? false : true; + return isDstDirectory ? false : true; } - final AtomicBoolean isDstDirectory = new AtomicBoolean(); - final AtomicBoolean isDstExists = new AtomicBoolean(); - + // special case 4: // Non-HNS account need to check dst status on driver side. + PathInformation fnsPathInformation = null; if (!abfsStore.getIsNamespaceEnabled(tracingContext)) { - getPathInformation(qualifiedDstPath, tracingContext, isDstDirectory, - isDstExists); + fnsPathInformation = getPathInformation(qualifiedDstPath, tracingContext + ); } try { + final Boolean isFnsDstExists, isFnsDstDirectory; + if (fnsPathInformation != null) { + isFnsDstDirectory = fnsPathInformation.getIsDirectory(); + isFnsDstExists = fnsPathInformation.getPathExists(); + } else { + isFnsDstExists = false; + isFnsDstDirectory = false; + } String sourceFileName = src.getName(); Path adjustedDst = dst; - if (isDstExists.get()) { - if (!isDstDirectory.get()) { + if (isFnsDstExists) { + if (!isFnsDstDirectory) { return qualifiedSrcPath.equals(qualifiedDstPath); } adjustedDst = new Path(dst, sourceFileName); @@ -486,13 +502,15 @@ public boolean rename(final Path src, final Path dst) throws IOException { */ if (getAbfsStore().getAbfsConfiguration().getPrefixMode() == PrefixMode.BLOB) { - isDstDirectory.set(false); - isDstExists.set(false); - getPathInformation(qualifiedDstPath, tracingContext, isDstDirectory, - isDstExists); - if (isDstExists.get()) { + final PathInformation 
qualifiedDstPathInformation + = getPathInformation(qualifiedDstPath, tracingContext + ); + final Boolean isQualifiedDstExists + = qualifiedDstPathInformation.getPathExists(); + if (isQualifiedDstExists) { //destination already there. Rename should not be overwriting. - LOG.info("Rename src: {} dst: {} failed as qualifiedDst already exists", + LOG.info( + "Rename src: {} dst: {} failed as qualifiedDst already exists", qualifiedSrcPath, qualifiedDstPath); throw new AbfsRestOperationException( HttpURLConnection.HTTP_CONFLICT, @@ -501,18 +519,23 @@ public boolean rename(final Path src, final Path dst) throws IOException { } } } else { - LOG.debug("dst {} doesn't exists. Check if the parent exists.", adjustedDst); + LOG.debug("dst {} doesn't exists. Check if the parent exists.", + adjustedDst); qualifiedDstPath = makeQualified(adjustedDst); /* * If the destination doesn't exist, check if parent of destination exists. */ Path parent = qualifiedDstPath.getParent(); - if (parent == null || !parent.isRoot()) { - isDstDirectory.set(false); - isDstExists.set(false); - getPathInformation(parent, tracingContext, isDstDirectory, - isDstExists); - if (!isDstExists.get() || !isDstDirectory.get()) { + if (getAbfsStore().getAbfsConfiguration().getPrefixMode() + == PrefixMode.BLOB && (parent != null && !parent.isRoot())) { + PathInformation dstParentPathInformation = getPathInformation(parent, + tracingContext + ); + final Boolean dstParentPathExists + = dstParentPathInformation.getPathExists(); + final Boolean isDstParentPathDirectory + = dstParentPathInformation.getIsDirectory(); + if (!dstParentPathExists || !isDstParentPathDirectory) { LOG.info("parent of {} is {} doesn't exists. 
Failing rename", adjustedDst, parent); throw new AbfsRestOperationException( @@ -523,21 +546,30 @@ public boolean rename(final Path src, final Path dst) throws IOException { } } } - - abfsStore.rename(qualifiedSrcPath, qualifiedDstPath, this, + final RenameAtomicityUtils renameAtomicityUtils; + if (getAbfsStore().getAbfsConfiguration().getPrefixMode() + == PrefixMode.BLOB && + abfsStore.isAtomicRenameKey(qualifiedSrcPath.toUri().getPath())) { + renameAtomicityUtils = new RenameAtomicityUtils(this, + qualifiedSrcPath, qualifiedDstPath, tracingContext); + } else { + renameAtomicityUtils = new RenameNonAtomicUtils(this, + qualifiedSrcPath, qualifiedDstPath, tracingContext); + } + abfsStore.rename(qualifiedSrcPath, qualifiedDstPath, renameAtomicityUtils, tracingContext); return true; } catch (AzureBlobFileSystemException ex) { LOG.debug("Rename operation failed. ", ex); checkException( - src, - ex, - AzureServiceErrorCode.PATH_ALREADY_EXISTS, - AzureServiceErrorCode.INVALID_RENAME_SOURCE_PATH, - AzureServiceErrorCode.SOURCE_PATH_NOT_FOUND, - AzureServiceErrorCode.INVALID_SOURCE_OR_DESTINATION_RESOURCE_TYPE, - AzureServiceErrorCode.RENAME_DESTINATION_PARENT_PATH_NOT_FOUND, - AzureServiceErrorCode.INTERNAL_OPERATION_ABORT); + src, + ex, + AzureServiceErrorCode.PATH_ALREADY_EXISTS, + AzureServiceErrorCode.INVALID_RENAME_SOURCE_PATH, + AzureServiceErrorCode.SOURCE_PATH_NOT_FOUND, + AzureServiceErrorCode.INVALID_SOURCE_OR_DESTINATION_RESOURCE_TYPE, + AzureServiceErrorCode.RENAME_DESTINATION_PARENT_PATH_NOT_FOUND, + AzureServiceErrorCode.INTERNAL_OPERATION_ABORT); return false; } } @@ -555,44 +587,43 @@ public boolean rename(final Path src, final Path dst) throws IOException { * shall be called. If the response returned an object, the path can be defined * as existing. If the response's metadata contains it is directory, the path * can be defined as a directory. + * * @param path path for which information is requried. 
* @param tracingContext tracingContext for the operations. - * @param isPathDirectory atomicBoolean object which will be set in the method - * if the given path is directory. - * @param isPathExists atomicBoolean object which will be set in the method if - * the given path exists. + * + * @return pathInformation containing if path exists and is a directory. + * * @throws AzureBlobFileSystemException exceptions caught from the server calls. */ - private void getPathInformation(final Path path, - final TracingContext tracingContext, - final AtomicBoolean isPathDirectory, - final AtomicBoolean isPathExists) throws AzureBlobFileSystemException { + private PathInformation getPathInformation(final Path path, + final TracingContext tracingContext) throws AzureBlobFileSystemException { if (getAbfsStore().getAbfsConfiguration().getPrefixMode() == PrefixMode.BLOB) { List blobProperties = getAbfsStore() - .getListBlobs(path, tracingContext, 2, 2, true); + .getListBlobs(path, null, tracingContext, 2, true); if (blobProperties.size() > 0) { - isPathExists.set(true); - isPathDirectory.set(true); - return; + return new PathInformation(true, true); } - BlobProperty blobProperty - = getAbfsStore().getBlobPropertyWithNotFoundHandling(path, - tracingContext); - if (blobProperty != null) { - isPathExists.set(true); - if (blobProperty.getIsDirectory()) { - isPathDirectory.set(true); + BlobProperty blobProperty; + try { + blobProperty = getAbfsStore().getBlobProperty(path, tracingContext); + } catch (AbfsRestOperationException ex) { + if (ex.getStatusCode() != HttpURLConnection.HTTP_NOT_FOUND) { + throw ex; } + blobProperty = null; + } + if (blobProperty != null) { + return new PathInformation(true, blobProperty.getIsDirectory()); } } else { final FileStatus fileStatus = tryGetFileStatus(path, tracingContext); if (fileStatus != null) { - isPathExists.set(true); - isPathDirectory.set(fileStatus.isDirectory()); + return new PathInformation(true, fileStatus.isDirectory()); } } + return new 
PathInformation(false, false); } @Override @@ -1596,6 +1627,10 @@ private Throwable getRootCause(Throwable throwable) { return result; } + private boolean containsColon(Path p) { + return p.toUri().getPath().contains(":"); + } + /** * Get a delegation token from remote service endpoint if * 'fs.azure.enable.kerberos.support' is set to 'true', and @@ -1696,7 +1731,7 @@ String getClientCorrelationId() { return clientCorrelationId; } - @org.apache.hadoop.classification.VisibleForTesting + @VisibleForTesting void setAbfsStore(final AzureBlobFileSystemStore abfsStore) { this.abfsStore = abfsStore; } diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystemStore.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystemStore.java index 5ba405fa6d434..3a7ebca5d8786 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystemStore.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystemStore.java @@ -56,8 +56,13 @@ import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; +import org.apache.hadoop.classification.VisibleForTesting; + +import org.apache.hadoop.fs.azurebfs.enums.BlobCopyProgress; import org.apache.hadoop.fs.azurebfs.services.PrefixMode; -import org.apache.hadoop.thirdparty.com.google.common.annotations.VisibleForTesting; +import org.apache.hadoop.fs.azurebfs.services.BlobList; +import org.apache.hadoop.fs.azurebfs.services.BlobProperty; + import org.apache.hadoop.thirdparty.com.google.common.base.Preconditions; import org.apache.hadoop.thirdparty.com.google.common.base.Strings; import org.apache.hadoop.thirdparty.com.google.common.util.concurrent.Futures; @@ -144,13 +149,15 @@ import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.COPY_STATUS_ABORTED; import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.COPY_STATUS_FAILED; import static 
org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.COPY_STATUS_SUCCESS; -import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.HDI_ISFOLDER; +import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.EMPTY_STRING; import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.ROOT_PATH; import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.SINGLE_WHITE_SPACE; import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.TOKEN_VERSION; import static org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.AZURE_ABFS_ENDPOINT; import static org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.FS_AZURE_BUFFERED_PREAD_DISABLE; import static org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.FS_AZURE_IDENTITY_TRANSFORM_CLASS; +import static org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.DEFAULT_FS_AZURE_ATOMIC_RENAME_DIRECTORIES; +import static org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.HBASE_ROOT; import static org.apache.hadoop.fs.azurebfs.constants.HttpHeaderConfigurations.CONTENT_LENGTH; import static org.apache.hadoop.fs.azurebfs.constants.HttpHeaderConfigurations.X_MS_COPY_ID; import static org.apache.hadoop.fs.azurebfs.constants.HttpHeaderConfigurations.X_MS_COPY_SOURCE; @@ -188,8 +195,6 @@ public class AzureBlobFileSystemStore implements Closeable, ListingSupport { private final AbfsPerfTracker abfsPerfTracker; private final AbfsCounters abfsCounters; - private final Boolean useSecureHttp; - private final ExecutorService renameBlobExecutorService; /** @@ -248,12 +253,11 @@ public AzureBlobFileSystemStore( this.azureAtomicRenameDirSet = new HashSet<>(Arrays.asList( abfsConfiguration.getAzureAtomicRenameDirs().split(AbfsHttpConstants.COMMA))); - this.azureAtomicRenameDirSet.add("/hbase"); + this.azureAtomicRenameDirSet.add(HBASE_ROOT); updateInfiniteLeaseDirs(); this.authType = abfsConfiguration.getAuthType(accountName); boolean 
usingOauth = (authType == AuthType.OAuth); boolean useHttps = (usingOauth || abfsConfiguration.isHttpsAlwaysUsed()) ? true : abfsStoreBuilder.isSecureScheme; - useSecureHttp = useHttps; this.abfsPerfTracker = new AbfsPerfTracker(fileSystemName, accountName, this.abfsConfiguration); this.abfsCounters = abfsStoreBuilder.abfsCounters; initializeClient(uri, fileSystemName, accountName, useHttps); @@ -519,7 +523,7 @@ public Hashtable getPathStatus(final Path path, * @throws AzureBlobFileSystemException exception thrown from the server calls, * or if it is discovered that the copying is failed or aborted. */ - @org.apache.hadoop.classification.VisibleForTesting + @VisibleForTesting void copyBlob(Path srcPath, Path dstPath, TracingContext tracingContext) throws AzureBlobFileSystemException { @@ -533,7 +537,7 @@ void copyBlob(Path srcPath, tracingContext); try { if (dstBlobProperty.getCopySourceUrl() != null && - ("/" + client.getFileSystem() + srcPath.toUri().getPath()).equals( + (ROOT_PATH + client.getFileSystem() + srcPath.toUri().getPath()).equals( new URL(dstBlobProperty.getCopySourceUrl()).toURI() .getPath())) { return; @@ -544,17 +548,15 @@ void copyBlob(Path srcPath, } throw ex; } - final String progress = getCopyBlobProgress(copyOp); - if (COPY_STATUS_SUCCESS.equals(progress)) { + final String progress = copyOp.getResult().getResponseHeader(X_MS_COPY_STATUS); + if (COPY_STATUS_SUCCESS.equalsIgnoreCase(progress)) { return; } final String copyId = copyOp.getResult().getResponseHeader(X_MS_COPY_ID); - while (true) {//TODO: better condition. - if (handleCopyInProgress(dstPath, tracingContext, copyId)) { - return; - } + final long pollWait = abfsConfiguration.getBlobCopyProgressPollWaitMillis(); + while (handleCopyInProgress(dstPath, tracingContext, copyId) == BlobCopyProgress.PENDING) { try { - Thread.sleep(1000l); //Taken sleep time from AzureNativeFileSystemStore. 
+ Thread.sleep(pollWait); } catch (Exception e) { } @@ -563,6 +565,7 @@ void copyBlob(Path srcPath, /** * Verifies if the blob copy is success or a failure or still in progress. + * * @param dstPath path of the destination for the copying * @param tracingContext object of tracingContext used for the tracing of the * server calls. @@ -570,78 +573,48 @@ void copyBlob(Path srcPath, * attached to blob and is returned by GetBlobProperties API on the destination. * * @return true if copying is success, false if it is still in progress. + * * @throws AzureBlobFileSystemException exception returned in making server call * for GetBlobProperties on the path. It can be thrown if the copyStatus is failure * or is aborted. */ - @org.apache.hadoop.classification.VisibleForTesting - boolean handleCopyInProgress(final Path dstPath, + @VisibleForTesting + BlobCopyProgress handleCopyInProgress(final Path dstPath, final TracingContext tracingContext, final String copyId) throws AzureBlobFileSystemException { BlobProperty blobProperty = getBlobProperty(dstPath, tracingContext); if (blobProperty != null && copyId.equals(blobProperty.getCopyId())) { - if (COPY_STATUS_SUCCESS.equals(blobProperty.getCopyStatus())) { - return true; + if (COPY_STATUS_SUCCESS.equalsIgnoreCase(blobProperty.getCopyStatus())) { + return BlobCopyProgress.SUCCESS; } - if (COPY_STATUS_FAILED.equals(blobProperty.getCopyStatus())) { + if (COPY_STATUS_FAILED.equalsIgnoreCase(blobProperty.getCopyStatus())) { throw new AbfsRestOperationException( COPY_BLOB_FAILED.getStatusCode(), COPY_BLOB_FAILED.getErrorCode(), - null, + String.format("copy to path %s failed due to: %s", + dstPath.toUri().getPath(), blobProperty.getStatusDescription()), new Exception(COPY_BLOB_FAILED.getErrorCode())); } - if (COPY_STATUS_ABORTED.equals(blobProperty.getCopyStatus())) { + if (COPY_STATUS_ABORTED.equalsIgnoreCase(blobProperty.getCopyStatus())) { throw new AbfsRestOperationException( COPY_BLOB_ABORTED.getStatusCode(), 
COPY_BLOB_ABORTED.getErrorCode(), - null, + String.format("copy to path %s aborted", dstPath.toUri().getPath()), new Exception(COPY_BLOB_ABORTED.getErrorCode())); } } - return false; - } - - @org.apache.hadoop.classification.VisibleForTesting - String getCopyBlobProgress(final AbfsRestOperation copyOp) { - return getCopyStatus(copyOp.getResult()); - } - - /** - * Wrapper on {@link #getBlobProperty(Path, TracingContext)} with the handling - * for the httpStatusCode = 404 on the server response. - * - * @param blobPath path for which the property information is required - * @param tracingContext object of TracingContext required for the tracing of - * server calls - * @return instance of BlobProperty if the blob is present on the given path. - * null if there is no blob on the given path. - * @throws AzureBlobFileSystemException exception other than - * {@link AbfsRestOperationException} for httpStatusCode = 404 on the server - * response. - */ - BlobProperty getBlobPropertyWithNotFoundHandling(Path blobPath, - TracingContext tracingContext) throws AzureBlobFileSystemException { - try { - return getBlobProperty(blobPath, tracingContext); - } catch (AbfsRestOperationException ex) { - if (ex.getStatusCode() == HttpURLConnection.HTTP_NOT_FOUND) { - return null; - } - throw ex; - } + return BlobCopyProgress.PENDING; } /** - * Calls {@link AbfsClient#getBlobProperty(Path, TracingContext)} on the given - * path. Extract the headers from the server-response and converts it to an object - * of {@link BlobProperty}. + * Gets the property for the blob over Blob Endpoint. * - * @param blobPath blobPath for which property information is requried + * @param blobPath blobPath for which property information is required * @param tracingContext object of TracingContext required for tracing server calls. 
* @return BlobProperty for the given path * @throws AzureBlobFileSystemException exception thrown from * {@link AbfsClient#getBlobProperty(Path, TracingContext)} call */ - private BlobProperty getBlobProperty(Path blobPath, + BlobProperty getBlobProperty(Path blobPath, TracingContext tracingContext) throws AzureBlobFileSystemException { AbfsRestOperation op = client.getBlobProperty(blobPath, tracingContext); BlobProperty blobProperty = new BlobProperty(); @@ -654,46 +627,45 @@ private BlobProperty getBlobProperty(Path blobPath, blobProperty.setCopySourceUrl(opResult.getResponseHeader(X_MS_COPY_SOURCE)); blobProperty.setStatusDescription( opResult.getResponseHeader(X_MS_COPY_STATUS_DESCRIPTION)); - blobProperty.setCopyStatus(getCopyStatus(opResult)); + blobProperty.setCopyStatus(opResult.getResponseHeader(X_MS_COPY_STATUS)); blobProperty.setContentLength( Long.parseLong(opResult.getResponseHeader(CONTENT_LENGTH))); return blobProperty; } - @org.apache.hadoop.classification.VisibleForTesting - String getCopyStatus(final AbfsHttpOperation opResult) { - return opResult.getResponseHeader(X_MS_COPY_STATUS); - } - /** - * Call server API for ListBlob on the blob endpoint. This API returns a limited - * number of blobs and provide a field called as NextMarker which is reference to - * next list of blobs for the query. Server expects that the client calls this API - * in loop with the NextMarker received in previous iteration of backend call for - * the same request. + * Get the list of a blob on a give path, or blob starting with the given prefix. * - * @param sourceDirBlobPath path from where the list of blob is requried. + * @param sourceDirBlobPath path from where the list of blob is required. + * @param prefix Optional value to be provided. If provided, API call would have + * prefix = given value. If not provided, the API call would have prefix = + * sourceDirBlobPath. 
* @param tracingContext object of {@link TracingContext} - * @param maxPerServerCallResult define how many blobs can client handle in server response. - * In case maxResult <= 5000, server sends number of blobs equal to the value. In - * case maxResult > 5000, server sends maximum 5000 blobs. * @param maxResult defines maximum blobs the method should process - * @param absoluteDirSearch defines if (true) it is blobList search on a + * @param isDefinitiveDirSearch defines if (true) it is blobList search on a * definitive directory, if (false) it is blobList search on a prefix. + * * @return List of blobProperties * * @throws AbfsRestOperationException exception from server-calls / xml-parsing */ public List getListBlobs(Path sourceDirBlobPath, - TracingContext tracingContext, Integer maxPerServerCallResult, - final Integer maxResult, final Boolean absoluteDirSearch) + String prefix, TracingContext tracingContext, + final Integer maxResult, final Boolean isDefinitiveDirSearch) throws AzureBlobFileSystemException { List blobProperties = new ArrayList<>(); String nextMarker = null; + if (prefix == null) { + prefix = (!sourceDirBlobPath.isRoot() + ? sourceDirBlobPath.toUri().getPath() + : EMPTY_STRING) + (isDefinitiveDirSearch + ? 
ROOT_PATH + : EMPTY_STRING); + } do { - AbfsRestOperation op = client.getListBlobs(sourceDirBlobPath, - tracingContext, nextMarker, null, maxPerServerCallResult, - absoluteDirSearch); + AbfsRestOperation op = client.getListBlobs( + nextMarker, prefix, maxResult, tracingContext + ); BlobList blobList = op.getResult().getBlobList(); nextMarker = blobList.getNextMarker(); blobProperties.addAll(blobList.getBlobPropertyList()); @@ -709,9 +681,9 @@ public void setPathProperties(final Path path, throws AzureBlobFileSystemException { try (AbfsPerfInfo perfInfo = startTracking("setPathProperties", "setPathProperties")){ LOG.debug("setFilesystemProperties for filesystem: {} path: {} with properties: {}", - client.getFileSystem(), - path, - properties); + client.getFileSystem(), + path, + properties); final String commaSeparatedProperties; try { @@ -1081,7 +1053,7 @@ public void breakLease(final Path path, final TracingContext tracingContext) thr } public void rename(final Path source, final Path destination, - final AzureBlobFileSystem azureBlobFileSystem, TracingContext tracingContext) throws + final RenameAtomicityUtils renameAtomicityUtils, TracingContext tracingContext) throws IOException { final Instant startAggregate = abfsPerfTracker.getLatencyInstant(); long countAggregate = 0; @@ -1095,9 +1067,9 @@ public void rename(final Path source, final Path destination, /* * Fetch the list of blobs in the given sourcePath. */ - List srcBlobProperties = getListBlobs(source, - tracingContext, null, null, true); - final BlobProperty blobPropOnSrc; + List srcBlobProperties = getListBlobs(source, null, + tracingContext, null, true); + BlobProperty blobPropOnSrc; if (srcBlobProperties.size() > 0) { LOG.debug("src {} exists and is a directory", source); isSrcExist = true; @@ -1105,7 +1077,15 @@ public void rename(final Path source, final Path destination, /* * Fetch if there is a marker-blob for the source blob. 
*/ - BlobProperty blobPropOnSrcNullable = getBlobPropertyWithNotFoundHandling(source, tracingContext); + BlobProperty blobPropOnSrcNullable; + try { + blobPropOnSrcNullable = getBlobProperty(source, tracingContext); + } catch (AbfsRestOperationException ex) { + if(ex.getStatusCode() != HttpURLConnection.HTTP_NOT_FOUND) { + throw ex; + } + blobPropOnSrcNullable = null; + } if(blobPropOnSrcNullable == null) { /* * There is no marker-blob, the client has to create marker blob before @@ -1113,10 +1093,7 @@ public void rename(final Path source, final Path destination, */ //create marker file; add in srcBlobProperties; LOG.debug("Source {} is a directory but there is no marker-blob", source); - azureBlobFileSystem.create(source); - Hashtable props = new Hashtable<>(); - props.put(HDI_ISFOLDER, "true"); - setPathProperties(source, props, tracingContext); + createDirectory(source, FsPermission.getDirDefault(), FsPermission.getUMask(getAbfsConfiguration().getRawConfiguration()), tracingContext); blobPropOnSrc = new BlobProperty(); blobPropOnSrc.setIsDirectory(true); blobPropOnSrc.setPath(source); @@ -1127,7 +1104,15 @@ public void rename(final Path source, final Path destination, } else { LOG.debug("source {} doesn't have any blob in its hierarchy. Checking" + "if there is marker blob for it.", source); - blobPropOnSrc = getBlobPropertyWithNotFoundHandling(source, tracingContext); + try { + blobPropOnSrc = getBlobProperty(source, tracingContext); + } catch (AbfsRestOperationException ex) { + if(ex.getStatusCode() != HttpURLConnection.HTTP_NOT_FOUND) { + throw ex; + } + blobPropOnSrc = null; + } + if(blobPropOnSrc != null) { isSrcExist = true; if(blobPropOnSrc.getIsDirectory()) { @@ -1156,14 +1141,11 @@ public void rename(final Path source, final Path destination, * individually copied and then deleted at the source. 
*/ LOG.debug("source {} is a directory", source); - final RenameAtomicityUtils renameAtomicityUtils; if (isAtomicRenameKey(source.toUri().getPath())) { LOG.debug("source dir {} is an atomicRenameKey", source.toUri().getPath()); - renameAtomicityUtils = new RenameAtomicityUtils(azureBlobFileSystem, - source, destination, tracingContext, srcBlobProperties); + renameAtomicityUtils.preRename(srcBlobProperties); } else { LOG.debug("source dir {} is not an atomicRenameKey", source.toUri().getPath()); - renameAtomicityUtils = null; } List futures = new ArrayList<>(); for (BlobProperty blobProperty : srcBlobProperties) { @@ -1252,7 +1234,7 @@ private Path createDestinationPathForBlobPartOfRenameSrcDir(final Path destinati if (sourcePathStr.equals(srcBlobPropertyPathStr)) { return destinationDir; } - return new Path(destinationPathStr + "/" + srcBlobPropertyPathStr.substring( + return new Path(destinationPathStr + ROOT_PATH + srcBlobPropertyPathStr.substring( sourcePathStr.length())); } @@ -1269,7 +1251,18 @@ private void renameBlob(final Path destination, final TracingContext tracingContext, final Path sourcePath) throws AzureBlobFileSystemException { copyBlob(sourcePath, destination, tracingContext); - client.deleteBlobPath(sourcePath, tracingContext); + deleteBlob(sourcePath, tracingContext); + } + + private void deleteBlob(final Path sourcePath, + final TracingContext tracingContext) throws AzureBlobFileSystemException { + try { + client.deleteBlobPath(sourcePath, tracingContext); + } catch (AbfsRestOperationException ex) { + if (ex.getStatusCode() != HttpURLConnection.HTTP_NOT_FOUND) { + throw ex; + } + } } public void delete(final Path path, final boolean recursive, @@ -1906,6 +1899,7 @@ public void redo(final Path destination, final List sourcePaths, if (ex.getStatusCode() == HttpURLConnection.HTTP_NOT_FOUND) { continue; } + throw ex; } } } diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/RenameAtomicityUtils.java 
b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/RenameAtomicityUtils.java index ddb5ea6a4311f..55bc2908e4b45 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/RenameAtomicityUtils.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/RenameAtomicityUtils.java @@ -44,15 +44,18 @@ import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.azurebfs.contracts.exceptions.AzureBlobFileSystemException; +import org.apache.hadoop.fs.azurebfs.services.BlobProperty; import org.apache.hadoop.fs.azurebfs.utils.TracingContext; +import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.FORWARD_SLASH; + /** * For a directory enabled for atomic-rename, before rename starts, a * file with -RenamePending.json suffix is created. In this file, the states required * for the rename are given. This file is created by {@link #writeFile()} method. * This is important in case the JVM process crashes during rename, the atomicity * will be maintained, when the job calls {@link AzureBlobFileSystem#listStatus(Path)} - * ot {@link AzureBlobFileSystem#getFileStatus(Path)}. On these API calls to filesystem, + * or {@link AzureBlobFileSystem#getFileStatus(Path)}. On these API calls to filesystem, * it will be checked if there is any RenamePending JSON file. If yes, the rename * would be resumed as per the file. 
*/ @@ -65,7 +68,6 @@ public class RenameAtomicityUtils { private Path srcPath; private Path dstPath; private TracingContext tracingContext; - private List blobPropertyList; private static final int MAX_RENAME_PENDING_FILE_SIZE = 10000000; private static final int FORMATTING_BUFFER = 10000; @@ -79,15 +81,11 @@ public class RenameAtomicityUtils { RenameAtomicityUtils(final AzureBlobFileSystem azureBlobFileSystem, final Path srcPath, final Path dstPath, - final TracingContext tracingContext, - List blobPropertyList) throws IOException { + final TracingContext tracingContext) throws IOException { this.azureBlobFileSystem = azureBlobFileSystem; this.srcPath = srcPath; this.dstPath = dstPath; this.tracingContext = tracingContext; - this.blobPropertyList = blobPropertyList; - - writeFile(); } RenameAtomicityUtils(final AzureBlobFileSystem azureBlobFileSystem, @@ -159,7 +157,7 @@ private RenamePendingFileInfo readFile(final Path redoFile) newFolderName.textValue())) { RenamePendingFileInfo renamePendingFileInfo = new RenamePendingFileInfo(); renamePendingFileInfo.destination = new Path(newFolderName.textValue()); - String srcDir = oldFolderName.textValue() + "/"; + String srcDir = oldFolderName.textValue() + FORWARD_SLASH; List srcPaths = new ArrayList<>(); List destinationSuffix = new ArrayList<>(); JsonNode fileList = json.get("FileList"); @@ -216,12 +214,12 @@ private void deleteRenamePendingFile(FileSystem fs, Path redoFile) * } } * @throws IOException Thrown when fail to write file. */ - private void writeFile() throws IOException { + public void preRename(List blobPropertyList) throws IOException { Path path = getRenamePendingFilePath(); LOG.debug("Preparing to write atomic rename state to {}", path.toString()); OutputStream output = null; - String contents = makeRenamePendingFileContents(); + String contents = makeRenamePendingFileContents(blobPropertyList); // Write file. 
try { @@ -257,7 +255,7 @@ private void writeFile() throws IOException { * * @return JSON string which represents the operation. */ - private String makeRenamePendingFileContents() { + private String makeRenamePendingFileContents(List blobPropertyList) { SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS"); sdf.setTimeZone(TimeZone.getTimeZone("UTC")); String time = sdf.format(new Date()); @@ -283,7 +281,7 @@ private String makeRenamePendingFileContents() { .equals(srcPath.toUri().getPath())) { noPrefix = StringUtils.removeStart( blobPropertyList.get(i).getPath().toUri().getPath(), - srcPath.toUri().getPath() + "/"); + srcPath.toUri().getPath() + FORWARD_SLASH); } else { noPrefix = ""; } diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/RenameNonAtomicUtils.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/RenameNonAtomicUtils.java new file mode 100644 index 0000000000000..402fb2de2eb9a --- /dev/null +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/RenameNonAtomicUtils.java @@ -0,0 +1,47 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.fs.azurebfs; + +import java.io.IOException; +import java.util.List; + +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.azurebfs.services.BlobProperty; +import org.apache.hadoop.fs.azurebfs.utils.TracingContext; + +public class RenameNonAtomicUtils extends RenameAtomicityUtils { + + RenameNonAtomicUtils(final AzureBlobFileSystem azureBlobFileSystem, + final Path srcPath, + final Path dstPath, + final TracingContext tracingContext) throws IOException { + super(azureBlobFileSystem, srcPath, dstPath, tracingContext); + } + + @Override + public void preRename(final List blobPropertyList) + throws IOException { + + } + + @Override + public void cleanup() throws IOException { + + } +} diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/ConfigurationKeys.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/ConfigurationKeys.java index e2c9386167427..4d897bb188da6 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/ConfigurationKeys.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/ConfigurationKeys.java @@ -247,5 +247,6 @@ public static String accountProperty(String property, String account) { */ public static final String FS_AZURE_BUFFERED_PREAD_DISABLE = "fs.azure.buffered.pread.disable"; public static final String FS_AZURE_BLOB_DIR_RENAME_MAX_THREAD = "fs.azure.blob.dir.rename.max.thread"; + public static final String FS_AZURE_BLOB_COPY_PROGRESS_POLL_WAIT_MILLIS = "fs.azure.blob.copy.progress.poll.wait.millis"; private ConfigurationKeys() {} } diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/FileSystemConfigurations.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/FileSystemConfigurations.java index f58c61e8908a6..63be6c08399fb 100644 --- 
a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/FileSystemConfigurations.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/FileSystemConfigurations.java @@ -78,6 +78,7 @@ public final class FileSystemConfigurations { public static final boolean DEFAULT_AZURE_SKIP_USER_GROUP_METADATA_DURING_INITIALIZATION = false; public static final String DEFAULT_FS_AZURE_ATOMIC_RENAME_DIRECTORIES = "/hbase"; + public static final String HBASE_ROOT = "/hbase"; public static final boolean DEFAULT_FS_AZURE_ENABLE_CONDITIONAL_CREATE_OVERWRITE = true; public static final boolean DEFAULT_FS_AZURE_ENABLE_MKDIR_OVERWRITE = true; public static final String DEFAULT_FS_AZURE_APPEND_BLOB_DIRECTORIES = ""; diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/enums/BlobCopyProgress.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/enums/BlobCopyProgress.java new file mode 100644 index 0000000000000..16ab2a0ac0262 --- /dev/null +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/enums/BlobCopyProgress.java @@ -0,0 +1,8 @@ +package org.apache.hadoop.fs.azurebfs.enums; + +public enum BlobCopyProgress { + SUCCESS, + FAILURE, + ABORTED, + PENDING; +} diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsClient.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsClient.java index d3f4361d1df15..5705f197cf495 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsClient.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsClient.java @@ -37,10 +37,9 @@ import java.util.concurrent.ThreadFactory; import java.util.concurrent.TimeUnit; +import org.apache.hadoop.classification.VisibleForTesting; import org.apache.hadoop.fs.Path; -import org.apache.hadoop.fs.azurebfs.BlobProperty; -import 
org.apache.hadoop.thirdparty.com.google.common.annotations.VisibleForTesting; import org.apache.hadoop.thirdparty.com.google.common.base.Preconditions; import org.apache.hadoop.thirdparty.com.google.common.base.Strings; import org.apache.hadoop.thirdparty.com.google.common.util.concurrent.FutureCallback; @@ -1049,7 +1048,7 @@ public AbfsRestOperation copyBlob(Path sourceBlobPath, return op; } - @org.apache.hadoop.classification.VisibleForTesting + @VisibleForTesting AbfsRestOperation getCopyBlobOperation(final URL url, final List requestHeaders) { return new AbfsRestOperation( @@ -1087,44 +1086,39 @@ public AbfsRestOperation getBlobProperty(Path blobPath, /** * Call server API BlobList. * - * @param sourceDirBlobPath path from where the list of blob is requried. - * @param tracingContext object of {@link TracingContext} * @param marker optional value. To be sent in case this method call in a non-first * iteration to the blobList API. Value has to be equal to the field NextMarker in the response * of previous iteration for the same operation. * @param maxResult define how many blobs can client handle in server response. * In case maxResult <= 5000, server sends number of blobs equal to the value. In * case maxResult > 5000, server sends maximum 5000 blobs. 
- * @param absoluteDirSearch + * @param tracingContext object of {@link TracingContext} * - * @return list of {@link BlobProperty} + * @return abfsRestOperation which contain list of {@link BlobProperty} + * via {@link AbfsRestOperation#getResult()}.{@link AbfsHttpOperation#getBlobList()} * * @throws AzureBlobFileSystemException thrown from server-call / xml-parsing */ - public AbfsRestOperation getListBlobs(Path sourceDirBlobPath, - TracingContext tracingContext, - String marker, + public AbfsRestOperation getListBlobs(String marker, String prefix, - Integer maxResult, final Boolean absoluteDirSearch) + Integer maxResult, + TracingContext tracingContext) throws AzureBlobFileSystemException { AbfsUriQueryBuilder abfsUriQueryBuilder = createDefaultUriQueryBuilder(); abfsUriQueryBuilder.addQuery(QUERY_PARAM_RESTYPE, CONTAINER); abfsUriQueryBuilder.addQuery(QUERY_PARAM_COMP, QUERY_PARAM_COMP_VALUE_LIST); abfsUriQueryBuilder.addQuery(QUERY_PARAM_INCLUDE, QUERY_PARAM_INCLUDE_VALUE_METADATA); - if (prefix == null) { - prefix = sourceDirBlobPath.toUri().getPath() + (absoluteDirSearch ?"/" : ""); - } - prefix = removeInitialSlash(prefix); + prefix = getDirectoryQueryParameter(prefix); abfsUriQueryBuilder.addQuery(QUERY_PARAM_PREFIX, prefix); if (marker != null) { abfsUriQueryBuilder.addQuery(QUERY_PARAM_MARKER, marker); } if (maxResult != null) { - abfsUriQueryBuilder.addQuery(QUERY_PARAM_MAXRESULT, maxResult + ""); + abfsUriQueryBuilder.addQuery(QUERY_PARAM_MAXRESULT, maxResult.toString()); } - appendSASTokenToQuery(sourceDirBlobPath.toUri().getPath(), - SASTokenProvider.LIST_BLOB_OPERATION, abfsUriQueryBuilder); + appendSASTokenToQuery(null, SASTokenProvider.LIST_BLOB_OPERATION, + abfsUriQueryBuilder); URL url = createRequestUrl(abfsUriQueryBuilder.toString()); final List requestHeaders = createDefaultHeaders(); final AbfsRestOperation op = new AbfsRestOperation( @@ -1138,17 +1132,18 @@ public AbfsRestOperation getListBlobs(Path sourceDirBlobPath, return op; } - private 
String removeInitialSlash(final String prefix) { - int len = prefix.length(); - for (int i = 0; i < len; i++) { - if (prefix.charAt(i) != '/') { - return prefix.substring(i); - } - } - return null; - } - - public void deleteBlobPath(final Path blobPath, + /** + * Deletes the blob for which the path is given. + * + * @param blobPath path on which blob has to be deleted. + * @param tracingContext tracingContext object for tracing the server calls. + * + * @return abfsRestOperation + * + * @throws AzureBlobFileSystemException exception thrown from server or due to + * network issue. + */ + public AbfsRestOperation deleteBlobPath(final Path blobPath, final TracingContext tracingContext) throws AzureBlobFileSystemException { AbfsUriQueryBuilder abfsUriQueryBuilder = createDefaultUriQueryBuilder(); String blobRelativePath = blobPath.toUri().getPath(); @@ -1163,18 +1158,8 @@ public void deleteBlobPath(final Path blobPath, HTTP_METHOD_DELETE, url, requestHeaders); - try { - op.execute(tracingContext); - return; - } catch (AzureBlobFileSystemException ex) { - if (!op.hasResult()) { - throw ex; - } - if (op.getResult().getStatusCode() == HttpURLConnection.HTTP_NOT_FOUND) { - return; - } - throw ex; - } + op.execute(tracingContext); + return op; } /** diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsHttpOperation.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsHttpOperation.java index 4ff698fc4244c..d3d6dbe09bea8 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsHttpOperation.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsHttpOperation.java @@ -29,18 +29,14 @@ import javax.net.ssl.HttpsURLConnection; import javax.net.ssl.SSLSocketFactory; -import javax.xml.parsers.DocumentBuilder; -import javax.xml.parsers.DocumentBuilderFactory; import javax.xml.parsers.ParserConfigurationException; import
javax.xml.parsers.SAXParser; +import javax.xml.parsers.SAXParserFactory; import org.apache.hadoop.classification.VisibleForTesting; -import org.apache.hadoop.fs.azurebfs.BlobList; -import org.apache.hadoop.fs.azurebfs.BlobListXmlParser; import org.apache.hadoop.fs.azurebfs.utils.UriUtils; import org.apache.hadoop.security.ssl.DelegatingSSLSocketFactory; -import com.microsoft.azure.storage.core.Utility; import org.codehaus.jackson.JsonFactory; import org.codehaus.jackson.JsonParser; import org.codehaus.jackson.JsonToken; @@ -100,6 +96,22 @@ public class AbfsHttpOperation implements AbfsPerfLoggable { private BlobList blobList; private List blockIdList = new ArrayList<>(); + private static final ThreadLocal saxParserThreadLocal + = new ThreadLocal() { + @Override + public SAXParser initialValue() { + SAXParserFactory factory = SAXParserFactory.newInstance(); + factory.setNamespaceAware(true); + try { + return factory.newSAXParser(); + } catch (SAXException e) { + throw new RuntimeException("Unable to create SAXParser", e); + } catch (ParserConfigurationException e) { + throw new RuntimeException("Check parser configuration", e); + } + } + }; + public static AbfsHttpOperation getAbfsHttpOperationWithFixedResult( final URL url, final String method, @@ -415,7 +427,7 @@ public void processResponse(final byte[] buffer, final int offset, final int len if (AbfsHttpConstants.HTTP_METHOD_GET.equals(this.method) && buffer == null) { if (url.toString().contains(COMP_LIST)) { - parsListBlobResponse(stream); + parseListBlobResponse(stream); } else { parseListFilesResponse(stream); } @@ -451,17 +463,20 @@ public void processResponse(final byte[] buffer, final int offset, final int len } } - private void parsListBlobResponse(final InputStream stream) { + /** + * Parse the stream from the response and set {@link #blobList} field of this + * class. + * + * @param stream inputStream from the server-response. 
+ */ + private void parseListBlobResponse(final InputStream stream) { try { - final SAXParser saxParser = Utility.getSAXParser(); + final SAXParser saxParser = saxParserThreadLocal.get(); + saxParser.reset(); BlobList blobList = new BlobList(); saxParser.parse(stream, new BlobListXmlParser(blobList, getBaseUrl())); this.blobList = blobList; - } catch (ParserConfigurationException e) { - throw new RuntimeException(e); - } catch (SAXException e) { - throw new RuntimeException(e); - } catch (IOException e) { + } catch (SAXException | IOException e) { throw new RuntimeException(e); } } diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/BlobList.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/BlobList.java similarity index 97% rename from hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/BlobList.java rename to hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/BlobList.java index a2d15c9a920da..fdac6559507fd 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/BlobList.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/BlobList.java @@ -16,7 +16,7 @@ * limitations under the License. 
*/ -package org.apache.hadoop.fs.azurebfs; +package org.apache.hadoop.fs.azurebfs.services; import java.util.ArrayList; import java.util.List; diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/BlobListXmlParser.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/BlobListXmlParser.java similarity index 95% rename from hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/BlobListXmlParser.java rename to hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/BlobListXmlParser.java index 9684fa5205fd7..9a0ad4f4cea38 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/BlobListXmlParser.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/BlobListXmlParser.java @@ -16,7 +16,7 @@ * limitations under the License. */ -package org.apache.hadoop.fs.azurebfs; +package org.apache.hadoop.fs.azurebfs.services; import java.util.Stack; @@ -30,6 +30,7 @@ import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.DIRECTORY; import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.HDI_ISFOLDER; import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.INVALID_XML; +import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.ROOT_PATH; /** * Parses the response inputSteam and populates an object of {@link BlobList}. 
Parsing @@ -138,8 +139,8 @@ public void endElement(final String uri, if (parentNode.equals(AbfsHttpConstants.BLOB)) { if (currentNode.equals(AbfsHttpConstants.NAME)) { currentBlobProperty.setName(value); - currentBlobProperty.setPath(new Path("/" + value)); - currentBlobProperty.setUrl(url + "/" + value); + currentBlobProperty.setPath(new Path(ROOT_PATH + value)); + currentBlobProperty.setUrl(url + ROOT_PATH + value); } } /* diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/BlobProperty.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/BlobProperty.java similarity index 80% rename from hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/BlobProperty.java rename to hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/BlobProperty.java index 9ea9c09136b9e..71bbb55fd9a90 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/BlobProperty.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/BlobProperty.java @@ -16,7 +16,7 @@ * limitations under the License. 
*/ -package org.apache.hadoop.fs.azurebfs; +package org.apache.hadoop.fs.azurebfs.services; import java.util.HashMap; import java.util.Map; @@ -41,51 +41,51 @@ public class BlobProperty { private String blobPrefix; private AzureBlobFileSystemException ex; - BlobProperty() { + public BlobProperty() { } - void setName(String name) { + public void setName(String name) { this.name = name; } - void setUrl(String url) { + public void setUrl(String url) { this.url = url; } - void setBlobPrefix(String blobPrefix) { + public void setBlobPrefix(String blobPrefix) { this.blobPrefix = blobPrefix; } - void addMetadata(String key, String value) { + public void addMetadata(String key, String value) { metadata.put(key, value); } - void setIsDirectory(Boolean isDirectory) { + public void setIsDirectory(Boolean isDirectory) { this.isDirectory = isDirectory; } - void setCopyId(String copyId) { + public void setCopyId(String copyId) { this.copyId = copyId; } - void setCopySourceUrl(String copySourceUrl) { + public void setCopySourceUrl(String copySourceUrl) { this.copySourceUrl = copySourceUrl; } - void setPath(Path path) { + public void setPath(Path path) { this.path = path; } - void setCopyStatus(String copyStatus) { + public void setCopyStatus(String copyStatus) { this.copyStatus = copyStatus; } - void setStatusDescription(String statusDescription) { + public void setStatusDescription(String statusDescription) { this.statusDescription = statusDescription; } - void setContentLength(Long length) { + public void setContentLength(Long length) { this.contentLength = length; } diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/PathInformation.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/PathInformation.java new file mode 100644 index 0000000000000..e4f2790565fbf --- /dev/null +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/PathInformation.java @@ -0,0 +1,37 @@ +/** + * 
Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.fs.azurebfs.services; + +public class PathInformation { + private Boolean pathExists; + private Boolean isDirectory; + + public PathInformation(Boolean pathExists, Boolean isDirectory) { + this.pathExists = pathExists; + this.isDirectory = isDirectory; + } + + public Boolean getPathExists() { + return pathExists; + } + + public Boolean getIsDirectory() { + return isDirectory; + } +} diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemDelegationSASForBlobEndpoint.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemDelegationSASForBlobEndpoint.java index 9bce4c7041b72..499957513b032 100644 --- a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemDelegationSASForBlobEndpoint.java +++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemDelegationSASForBlobEndpoint.java @@ -18,6 +18,9 @@ package org.apache.hadoop.fs.azurebfs; +import java.util.List; + +import org.assertj.core.api.Assertions; import org.junit.Assert; import org.junit.Assume; import 
org.junit.Before; @@ -25,10 +28,14 @@ import org.mockito.Mockito; import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.azurebfs.constants.HttpHeaderConfigurations; import org.apache.hadoop.fs.azurebfs.constants.TestConfigurationKeys; import org.apache.hadoop.fs.azurebfs.extensions.MockDelegationSASTokenProvider; +import org.apache.hadoop.fs.azurebfs.services.AbfsClient; +import org.apache.hadoop.fs.azurebfs.services.AbfsHttpOperation; import org.apache.hadoop.fs.azurebfs.services.AbfsRestOperation; import org.apache.hadoop.fs.azurebfs.services.AuthType; +import org.apache.hadoop.fs.azurebfs.services.BlobProperty; import org.apache.hadoop.fs.azurebfs.services.PrefixMode; import org.apache.hadoop.fs.azurebfs.utils.TracingContext; @@ -122,13 +129,49 @@ public void testCopyBlobTakeTime() throws Exception { AzureBlobFileSystem fileSystem = getFileSystem(); AzureBlobFileSystemStore store = Mockito.spy(fileSystem.getAbfsStore()); fileSystem.setAbfsStore(store); - Mockito.doReturn(COPY_STATUS_PENDING).when(store) - .getCopyBlobProgress(Mockito.any(AbfsRestOperation.class)); + AbfsClient client = store.getClient(); + AbfsClient spiedClient = Mockito.spy(client); + store.setClient(spiedClient); + + Mockito.doAnswer(answer -> { + AbfsRestOperation op = Mockito.spy((AbfsRestOperation) answer.callRealMethod()); + AbfsHttpOperation httpOp = Mockito.spy(op.getResult()); + Mockito.doReturn(COPY_STATUS_PENDING).when(httpOp).getResponseHeader( + HttpHeaderConfigurations.X_MS_COPY_STATUS); + Mockito.doReturn(httpOp).when(op).getResult(); + return op; + }).when(spiedClient).copyBlob(Mockito.any(Path.class), Mockito.any(Path.class), + Mockito.any(TracingContext.class)); fileSystem.create(new Path("/test1/file")); fileSystem.rename(new Path("/test1/file"), new Path("/test1/file2")); Assert.assertTrue(fileSystem.exists(new Path("/test1/file2"))); Mockito.verify(store, Mockito.times(1)) - .handleCopyInProgress(Mockito.any(Path.class), Mockito.any( - TracingContext.class), 
Mockito.any(String.class)); + .handleCopyInProgress(Mockito.any(Path.class), + Mockito.any(TracingContext.class), Mockito.any(String.class)); + } + + @Test + public void testListBlob() throws Exception { + AzureBlobFileSystem fs = getFileSystem(); + int i = 0; + while (i < 10) { + fs.create(new Path("/dir/" + i)); + i++; + } + List blobProperties = fs.getAbfsStore() + .getListBlobs(new Path("dir"), null, + Mockito.mock(TracingContext.class), null, false); + Assertions.assertThat(blobProperties) + .describedAs( + "BlobList should match the number of files created in tests + the directory itself") + .hasSize(11); + + blobProperties = fs.getAbfsStore() + .getListBlobs(new Path("dir"), null, + Mockito.mock(TracingContext.class), null, true); + Assertions.assertThat(blobProperties) + .describedAs( + "BlobList should match the number of files created in tests") + .hasSize(10); } } diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemRename.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemRename.java index 28d0883e21649..25cd33c4f64a1 100644 --- a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemRename.java +++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemRename.java @@ -42,6 +42,7 @@ import org.apache.hadoop.fs.FileAlreadyExistsException; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.azurebfs.constants.HttpHeaderConfigurations; import org.apache.hadoop.fs.azurebfs.contracts.exceptions.AbfsRestOperationException; import org.apache.hadoop.fs.azurebfs.contracts.services.AzureServiceErrorCode; import org.apache.hadoop.fs.azurebfs.services.AbfsClient; @@ -189,6 +190,31 @@ public void testPosixRenameDirectory() throws Exception { assertFalse(fs.exists(new Path("testDir2/test1/test2/test3"))); } + @Test + public void 
testRenameToRoot() throws Exception { + AzureBlobFileSystem fs = getFileSystem(); + fs.mkdirs(new Path("/src1/src2")); + Assert.assertTrue(fs.rename(new Path("/src1/src2"), new Path("/"))); + Assert.assertTrue(fs.exists(new Path("/src2"))); + } + + @Test(expected = IOException.class) + public void testRenameBlobToDstWithColonInPath() throws Exception{ + AzureBlobFileSystem fs = getFileSystem(); + assumeNonHnsAccountBlobEndpoint(fs); + fs.create(new Path("/src")); + fs.rename(new Path("/src"), new Path("/dst:file")); + } + + @Test + public void testRenameBlobInSameDirectoryWithNoMarker() throws Exception { + AzureBlobFileSystem fs = getFileSystem(); + assumeNonHnsAccountBlobEndpoint(fs); + fs.create(new Path("/srcDir/dir/file")); + fs.getAbfsStore().getClient().deleteBlobPath(new Path("/srcDir/dir"), Mockito.mock(TracingContext.class)); + Assert.assertTrue(fs.rename(new Path("/srcDir/dir"), new Path("/srcDir"))); + } + /** *
    * Test to check behaviour of rename API if the destination directory is already
@@ -1273,14 +1299,15 @@ public void testBlobRenameSrcDirHasNoMarker() throws Exception {
     fs.getAbfsStore()
         .getClient()
         .deleteBlobPath(new Path("/test1"), Mockito.mock(TracingContext.class));
-    Assert.assertNull(fs.getAbfsStore()
-        .getBlobPropertyWithNotFoundHandling(new Path("/test1"),
-            Mockito.mock(TracingContext.class)));
+    LambdaTestUtils.intercept(AbfsRestOperationException.class, () -> {
+      fs.getAbfsStore().getBlobProperty(new Path("/test1"),
+              Mockito.mock(TracingContext.class));
+    });
     fs.mkdirs(new Path("/test2"));
     fs.rename(new Path("/test1"), new Path("/test2"));
-    Assert.assertNotNull(fs.getAbfsStore()
-        .getBlobPropertyWithNotFoundHandling(new Path("/test2/test1"),
-            Mockito.mock(TracingContext.class)));
+    Assert.assertTrue(fs.getAbfsStore()
+        .getBlobProperty(new Path("/test2/test1"),
+            Mockito.mock(TracingContext.class)).getIsDirectory());
   }
 
   @Test
@@ -1289,8 +1316,19 @@ public void testCopyBlobTakeTime() throws Exception {
     assumeNonHnsAccountBlobEndpoint(fileSystem);
     AzureBlobFileSystemStore store = Mockito.spy(fileSystem.getAbfsStore());
     fileSystem.setAbfsStore(store);
-    Mockito.doReturn(COPY_STATUS_PENDING).when(store)
-        .getCopyBlobProgress(Mockito.any(AbfsRestOperation.class));
+    AbfsClient client = store.getClient();
+    AbfsClient spiedClient = Mockito.spy(client);
+    store.setClient(spiedClient);
+
+    Mockito.doAnswer(answer -> {
+      AbfsRestOperation op = Mockito.spy((AbfsRestOperation) answer.callRealMethod());
+      AbfsHttpOperation httpOp = Mockito.spy(op.getResult());
+      Mockito.doReturn(COPY_STATUS_PENDING).when(httpOp).getResponseHeader(
+          HttpHeaderConfigurations.X_MS_COPY_STATUS);
+      Mockito.doReturn(httpOp).when(op).getResult();
+      return op;
+    }).when(spiedClient).copyBlob(Mockito.any(Path.class), Mockito.any(Path.class),
+        Mockito.any(TracingContext.class));
     fileSystem.create(new Path("/test1/file"));
     fileSystem.rename(new Path("/test1/file"), new Path("/test1/file2"));
     Assert.assertTrue(fileSystem.exists(new Path("/test1/file2")));
@@ -1305,10 +1343,28 @@ public void testCopyBlobTakeTimeAndEventuallyFail() throws Exception {
     assumeNonHnsAccountBlobEndpoint(fileSystem);
     AzureBlobFileSystemStore store = Mockito.spy(fileSystem.getAbfsStore());
     fileSystem.setAbfsStore(store);
-    Mockito.doReturn(COPY_STATUS_PENDING).when(store)
-        .getCopyBlobProgress(Mockito.any(AbfsRestOperation.class));
-    Mockito.doReturn(COPY_STATUS_FAILED).when(store).getCopyStatus(Mockito.any(
-        AbfsHttpOperation.class));
+    AbfsClient client = store.getClient();
+    AbfsClient spiedClient = Mockito.spy(client);
+    store.setClient(spiedClient);
+
+    Mockito.doAnswer(answer -> {
+      AbfsRestOperation op = Mockito.spy((AbfsRestOperation) answer.callRealMethod());
+      AbfsHttpOperation httpOp = Mockito.spy(op.getResult());
+      Mockito.doReturn(COPY_STATUS_PENDING).when(httpOp).getResponseHeader(
+          HttpHeaderConfigurations.X_MS_COPY_STATUS);
+      Mockito.doReturn(httpOp).when(op).getResult();
+      return op;
+    }).when(spiedClient).copyBlob(Mockito.any(Path.class), Mockito.any(Path.class),
+        Mockito.any(TracingContext.class));
+    Mockito.doAnswer(answer -> {
+      AbfsRestOperation op = Mockito.spy((AbfsRestOperation) answer.callRealMethod());
+      AbfsHttpOperation httpOp = Mockito.spy(op.getResult());
+      Mockito.doReturn(COPY_STATUS_FAILED).when(httpOp).getResponseHeader(
+          HttpHeaderConfigurations.X_MS_COPY_STATUS);
+      Mockito.doReturn(httpOp).when(op).getResult();
+      return op;
+    }).when(spiedClient).getBlobProperty(Mockito.any(Path.class), Mockito.any(TracingContext.class));
+
     fileSystem.create(new Path("/test1/file"));
     Boolean copyBlobFailureCaught = false;
     try {
@@ -1331,10 +1387,28 @@ public void testCopyBlobTakeTimeAndEventuallyAborted() throws Exception {
     assumeNonHnsAccountBlobEndpoint(fileSystem);
     AzureBlobFileSystemStore store = Mockito.spy(fileSystem.getAbfsStore());
     fileSystem.setAbfsStore(store);
-    Mockito.doReturn(COPY_STATUS_PENDING).when(store)
-        .getCopyBlobProgress(Mockito.any(AbfsRestOperation.class));
-    Mockito.doReturn(COPY_STATUS_ABORTED).when(store).getCopyStatus(Mockito.any(
-        AbfsHttpOperation.class));
+    AbfsClient client = store.getClient();
+    AbfsClient spiedClient = Mockito.spy(client);
+    store.setClient(spiedClient);
+
+    Mockito.doAnswer(answer -> {
+      AbfsRestOperation op = Mockito.spy((AbfsRestOperation) answer.callRealMethod());
+      AbfsHttpOperation httpOp = Mockito.spy(op.getResult());
+      Mockito.doReturn(COPY_STATUS_PENDING).when(httpOp).getResponseHeader(
+          HttpHeaderConfigurations.X_MS_COPY_STATUS);
+      Mockito.doReturn(httpOp).when(op).getResult();
+      return op;
+    }).when(spiedClient).copyBlob(Mockito.any(Path.class), Mockito.any(Path.class),
+        Mockito.any(TracingContext.class));
+    Mockito.doAnswer(answer -> {
+      AbfsRestOperation op = Mockito.spy((AbfsRestOperation) answer.callRealMethod());
+      AbfsHttpOperation httpOp = Mockito.spy(op.getResult());
+      Mockito.doReturn(COPY_STATUS_ABORTED).when(httpOp).getResponseHeader(
+          HttpHeaderConfigurations.X_MS_COPY_STATUS);
+      Mockito.doReturn(httpOp).when(op).getResult();
+      return op;
+    }).when(spiedClient).getBlobProperty(Mockito.any(Path.class), Mockito.any(TracingContext.class));
+
     fileSystem.create(new Path("/test1/file"));
     Boolean copyBlobFailureCaught = false;
     try {
@@ -1359,11 +1433,21 @@ public void testCopyBlobTakeTimeAndBlobIsDeleted() throws Exception {
     String srcFile = "/test1/file";
     String dstFile = "/test1/file2";
     fileSystem.setAbfsStore(store);
+    AbfsClient client = store.getClient();
+    AbfsClient spiedClient = Mockito.spy(client);
+    store.setClient(spiedClient);
+
     Mockito.doAnswer(answer -> {
-          fileSystem.delete(new Path(dstFile), false);
-          return COPY_STATUS_PENDING;
-        }).when(store)
-        .getCopyBlobProgress(Mockito.any(AbfsRestOperation.class));
+      AbfsRestOperation op = Mockito.spy((AbfsRestOperation) answer.callRealMethod());
+      fileSystem.delete(new Path(dstFile), false);
+      AbfsHttpOperation httpOp = Mockito.spy(op.getResult());
+      Mockito.doReturn(COPY_STATUS_PENDING).when(httpOp).getResponseHeader(
+          HttpHeaderConfigurations.X_MS_COPY_STATUS);
+      Mockito.doReturn(httpOp).when(op).getResult();
+      return op;
+    }).when(spiedClient).copyBlob(Mockito.any(Path.class), Mockito.any(Path.class),
+        Mockito.any(TracingContext.class));
+
     fileSystem.create(new Path(srcFile));
 
 
diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemRenameUnicode.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemRenameUnicode.java
index 044c325c8c8dc..a4b668b5e4835 100644
--- a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemRenameUnicode.java
+++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemRenameUnicode.java
@@ -20,12 +20,14 @@
 
 import java.util.Arrays;
 
+import org.junit.Assume;
 import org.junit.Test;
 import org.junit.runner.RunWith;
 import org.junit.runners.Parameterized;
 
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.azurebfs.services.PrefixMode;
 
 import static org.apache.hadoop.fs.contract.ContractTestUtils.assertIsDirectory;
 import static org.apache.hadoop.fs.contract.ContractTestUtils.assertIsFile;
@@ -76,6 +78,8 @@ public ITestAzureBlobFileSystemRenameUnicode() throws Exception {
   @Test
   public void testRenameFileUsingUnicode() throws Exception {
     final AzureBlobFileSystem fs = getFileSystem();
+    Assume.assumeTrue(fs.getAbfsStore().getAbfsConfiguration().getPrefixMode()
+        == PrefixMode.DFS || !destDir.contains(":"));
     Path folderPath1 = new Path(srcDir);
     assertMkdirs(fs, folderPath1);
     assertIsDirectory(fs, folderPath1);
diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestListBlob.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestListBlob.java
index d8d4da35ecb01..4ae0a6df29c1e 100644
--- a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestListBlob.java
+++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestListBlob.java
@@ -19,6 +19,7 @@
 package org.apache.hadoop.fs.azurebfs;
 
 
+import java.io.IOException;
 import java.util.List;
 
 import org.assertj.core.api.Assertions;
@@ -28,6 +29,7 @@
 
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.azurebfs.services.AbfsClient;
+import org.apache.hadoop.fs.azurebfs.services.BlobProperty;
 import org.apache.hadoop.fs.azurebfs.services.PrefixMode;
 import org.apache.hadoop.fs.azurebfs.utils.TracingContext;
 
@@ -42,102 +44,97 @@ public ITestListBlob() throws Exception {
   public void testListBlob() throws Exception {
     AzureBlobFileSystem fs = getFileSystem();
     assumeNonHnsAccountBlobEndpoint(fs);
-    int i = 0;
-    while (i < 10) {
-      fs.create(new Path("/dir/" + i));
-      i++;
-    }
-    List blobProperties = fs.getAbfsStore()
-        .getListBlobs(new Path("dir"),
-            Mockito.mock(TracingContext.class), null, null, false);
+    createBlob(fs, "/dir/");
+    List blobProperties;
+    /*
+     * Call getListBlob for a path with isDefinitiveDirSearch = false. Should give
+     * results including the directory blob(hdi_isfolder=true).
+     */
+    blobProperties = fs.getAbfsStore()
+        .getListBlobs(new Path("dir"), null,
+            Mockito.mock(TracingContext.class), null, false);
     Assertions.assertThat(blobProperties)
         .describedAs(
             "BlobList should match the number of files created in tests + the directory itself")
         .hasSize(11);
 
+    /*
+     * Call getListBlob for a path with isDefinitiveDirSearch = true. Should give
+     * results excluding the directory blob(hdi_isfolder=true).
+     */
     blobProperties = fs.getAbfsStore()
-        .getListBlobs(new Path("dir"),
-            Mockito.mock(TracingContext.class), null, null, true);
+        .getListBlobs(new Path("dir"), null,
+            Mockito.mock(TracingContext.class), null, true);
     Assertions.assertThat(blobProperties)
         .describedAs(
             "BlobList should match the number of files created in tests")
         .hasSize(10);
-  }
 
-  @Test
-  public void testListBlobWithMarkers() throws Exception {
-    AzureBlobFileSystem fs = getFileSystem();
-    assumeNonHnsAccountBlobEndpoint(fs);
-    int i = 0;
-    while (i < 10) {
-      fs.create(new Path("/dir/" + i));
-      i++;
-    }
-    AbfsClient spiedClient = Mockito.spy(fs.getAbfsClient());
-    fs.getAbfsStore().setClient(spiedClient);
-    List blobProperties = fs.getAbfsStore()
-        .getListBlobs(new Path("dir"),
-            Mockito.mock(TracingContext.class), 1, null, false);
+    /*
+     * Call getListBlob for a path with isDefinitiveDirSearch = false with
+     * maxResult more than the number of exact blobs. Should give results including
+     * the directory blob(hdi_isfolder=true).
+     */
+    blobProperties = fs.getAbfsStore()
+        .getListBlobs(new Path("dir"), null,
+            Mockito.mock(TracingContext.class), 13, false);
     Assertions.assertThat(blobProperties)
         .describedAs(
             "BlobList should match the number of files created in tests + the directory itself")
         .hasSize(11);
-    Mockito.verify(spiedClient, Mockito.times(11))
-        .getListBlobs(Mockito.any(Path.class),
-            Mockito.any(TracingContext.class),
-            Mockito.nullable(String.class), Mockito.nullable(String.class),
-            Mockito.anyInt(), Mockito.anyBoolean());
 
+    /*
+     * Call getListBlob for a path with isDefinitiveDirSearch = false and a
+     * maxResult smaller than the total number of blobs. The result size should
+     * equal maxResult.
+     */
     blobProperties = fs.getAbfsStore()
-        .getListBlobs(new Path("dir"),
-            Mockito.mock(TracingContext.class), 1, null, true);
+        .getListBlobs(new Path("dir"), null,
+            Mockito.mock(TracingContext.class), 5, false);
     Assertions.assertThat(blobProperties)
         .describedAs(
-            "BlobList should match the number of files created in tests + the directory itself")
-        .hasSize(10);
-    Mockito.verify(spiedClient, Mockito.times(21))
-        .getListBlobs(Mockito.any(Path.class),
-            Mockito.any(TracingContext.class),
-            Mockito.nullable(String.class), Mockito.nullable(String.class),
-            Mockito.anyInt(), Mockito.anyBoolean());
+            "BlobList should match the number of maxResult given")
+        .hasSize(5);
   }
 
   @Test
-  public void testListBlobWithMarkersWithMaxResult() throws Exception {
+  public void testListBlobWithMarkers() throws Exception {
     AzureBlobFileSystem fs = getFileSystem();
     assumeNonHnsAccountBlobEndpoint(fs);
-    int i = 0;
-    while (i < 10) {
-      fs.create(new Path("/dir/" + i));
-      i++;
-    }
-    AbfsClient spiedClient = Mockito.spy(fs.getAbfsClient());
+    createBlob(fs, "/dir/");
+    AbfsClient client = fs.getAbfsClient();
+    AbfsClient spiedClient = Mockito.spy(client);
     fs.getAbfsStore().setClient(spiedClient);
+
+    /*
+     * The server may return fewer results than requested, along with a
+     * nextMarker. Here the mocked server returns one object per call; the
+     * expectation is that the client keeps using nextMarker to fetch the
+     * remaining blobs.
+     */
+    int count[] = new int[1];
+    count[0] = 0;
+    Mockito.doAnswer(answer -> {
+      String marker = answer.getArgument(0);
+      String prefix = answer.getArgument(1);
+      TracingContext tracingContext = answer.getArgument(3);
+      count[0]++;
+      return client.getListBlobs(marker, prefix, 1, tracingContext);
+    }).when(spiedClient).getListBlobs(Mockito.nullable(String.class),
+        Mockito.anyString(), Mockito.nullable(Integer.class),
+        Mockito.any(TracingContext.class));
+
     List blobProperties = fs.getAbfsStore()
-        .getListBlobs(new Path("dir"),
-            Mockito.mock(TracingContext.class), 1, 5, false);
+        .getListBlobs(new Path("dir"), null,
+            Mockito.mock(TracingContext.class), 5, false);
     Assertions.assertThat(blobProperties)
         .describedAs(
-            "BlobList should match the number of files created in tests + the directory itself")
+            "BlobList should match the number of maxResult given")
         .hasSize(5);
-    Mockito.verify(spiedClient, Mockito.times(5))
-        .getListBlobs(Mockito.any(Path.class),
-            Mockito.any(TracingContext.class),
-            Mockito.nullable(String.class), Mockito.nullable(String.class),
-            Mockito.anyInt(), Mockito.anyBoolean());
-
-    blobProperties = fs.getAbfsStore()
-        .getListBlobs(new Path("dir"),
-            Mockito.mock(TracingContext.class), 1, 5, true);
-    Assertions.assertThat(blobProperties)
+    Assertions.assertThat(count[0])
         .describedAs(
-            "BlobList should match the number of files created in tests + the directory itself")
-        .hasSize(5);
-    Mockito.verify(spiedClient, Mockito.times(10))
-        .getListBlobs(Mockito.any(Path.class),
-            Mockito.any(TracingContext.class),
-            Mockito.nullable(String.class), Mockito.nullable(String.class),
-            Mockito.anyInt(), Mockito.anyBoolean());
+            "Number of calls to backend should be equal to maxResult given")
+        .isEqualTo(5);
   }
 
   private void assumeNonHnsAccountBlobEndpoint(final AzureBlobFileSystem fs) {
@@ -145,4 +142,13 @@ private void assumeNonHnsAccountBlobEndpoint(final AzureBlobFileSystem fs) {
         fs.getAbfsStore().getAbfsConfiguration().getPrefixMode()
             == PrefixMode.BLOB);
   }
+
+  private void createBlob(final AzureBlobFileSystem fs, final String pathString)
+      throws IOException {
+    int i = 0;
+    while (i < 10) {
+      fs.create(new Path(pathString + i));
+      i++;
+    }
+  }
 }