diff --git a/hadoop-tools/hadoop-azure/dev-support/findbugs-exclude.xml b/hadoop-tools/hadoop-azure/dev-support/findbugs-exclude.xml deleted file mode 100644 index fa6085faa55bb..0000000000000 --- a/hadoop-tools/hadoop-azure/dev-support/findbugs-exclude.xml +++ /dev/null @@ -1,99 +0,0 @@ - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystemStore.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystemStore.java index 94a445a703953..e42343a6e8380 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystemStore.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystemStore.java @@ -70,7 +70,6 @@ import org.apache.hadoop.fs.azurebfs.constants.HttpHeaderConfigurations; import org.apache.hadoop.fs.azurebfs.contracts.exceptions.AbfsRestOperationException; import org.apache.hadoop.fs.azurebfs.contracts.exceptions.AzureBlobFileSystemException; -import org.apache.hadoop.fs.azurebfs.contracts.exceptions.ConcurrentWriteOperationDetectedException; import org.apache.hadoop.fs.azurebfs.contracts.exceptions.FileSystemOperationUnhandledException; import org.apache.hadoop.fs.azurebfs.contracts.exceptions.InvalidAbfsRestOperationException; import org.apache.hadoop.fs.azurebfs.contracts.exceptions.InvalidFileSystemPropertyException; @@ -762,53 +761,8 @@ private AbfsRestOperation conditionalCreateOverwriteFile(final String relativePa final TracingContext tracingContext) throws IOException { AbfsRestOperation op; AbfsClient createClient = getClientHandler().getIngressClient(); - try { - // Trigger a create with overwrite=false first so that eTag fetch can be - // avoided for cases when no pre-existing file is present (major portion - // of create file traffic falls into the case of no pre-existing file). - op = createClient.createPath(relativePath, true, false, permissions, - isAppendBlob, null, contextEncryptionAdapter, tracingContext); - - } catch (AbfsRestOperationException e) { - if (e.getStatusCode() == HttpURLConnection.HTTP_CONFLICT) { - // File pre-exists, fetch eTag - try { - op = getClient().getPathStatus(relativePath, false, tracingContext, null); - } catch (AbfsRestOperationException ex) { - if (ex.getStatusCode() == HttpURLConnection.HTTP_NOT_FOUND) { - // Is a parallel access case, as file which was found to be - // present went missing by this request. - throw new ConcurrentWriteOperationDetectedException( - "Parallel access to the create path detected. Failing request " - + "to honor single writer semantics"); - } else { - throw ex; - } - } - - String eTag = extractEtagHeader(op.getResult()); - - try { - // overwrite only if eTag matches with the file properties fetched befpre - op = createClient.createPath(relativePath, true, true, permissions, - isAppendBlob, eTag, contextEncryptionAdapter, tracingContext); - } catch (AbfsRestOperationException ex) { - if (ex.getStatusCode() == HttpURLConnection.HTTP_PRECON_FAILED) { - // Is a parallel access case, as file with eTag was just queried - // and precondition failure can happen only when another file with - // different etag got created. - throw new ConcurrentWriteOperationDetectedException( - "Parallel access to the create path detected. Failing request " - + "to honor single writer semantics"); - } else { - throw ex; - } - } - } else { - throw e; - } - } - + op = createClient.conditionalCreateOverwriteFile(relativePath, statistics, + permissions, isAppendBlob, contextEncryptionAdapter, tracingContext); return op; } diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/contracts/exceptions/ConcurrentWriteOperationDetectedException.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/contracts/exceptions/ConcurrentWriteOperationDetectedException.java index 79813ddfe6400..0a5839277807d 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/contracts/exceptions/ConcurrentWriteOperationDetectedException.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/contracts/exceptions/ConcurrentWriteOperationDetectedException.java @@ -26,7 +26,10 @@ public class ConcurrentWriteOperationDetectedException extends AzureBlobFileSystemException { - public ConcurrentWriteOperationDetectedException(String message) { - super(message); + private static final String ERROR_MESSAGE = "Parallel access to the create path detected. Failing request " + + "to honor single writer semantics"; + + public ConcurrentWriteOperationDetectedException() { + super(ERROR_MESSAGE); } } diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsBlobClient.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsBlobClient.java index 89416acc9d5b6..07fd4f84de86c 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsBlobClient.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsBlobClient.java @@ -49,6 +49,7 @@ import org.apache.commons.io.IOUtils; import org.apache.commons.lang3.StringUtils; +import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.classification.VisibleForTesting; import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.azurebfs.AbfsConfiguration; @@ -61,6 +62,7 @@ import org.apache.hadoop.fs.azurebfs.contracts.exceptions.AbfsInvalidChecksumException; import org.apache.hadoop.fs.azurebfs.contracts.exceptions.AbfsRestOperationException; import org.apache.hadoop.fs.azurebfs.contracts.exceptions.AzureBlobFileSystemException; +import org.apache.hadoop.fs.azurebfs.contracts.exceptions.ConcurrentWriteOperationDetectedException; import org.apache.hadoop.fs.azurebfs.contracts.exceptions.InvalidAbfsRestOperationException; import org.apache.hadoop.fs.azurebfs.contracts.services.AppendRequestParameters; import org.apache.hadoop.fs.azurebfs.contracts.services.AzureServiceErrorCode; @@ -80,6 +82,7 @@ import static java.net.HttpURLConnection.HTTP_NOT_FOUND; import static java.net.HttpURLConnection.HTTP_OK; import static java.net.HttpURLConnection.HTTP_PRECON_FAILED; +import static org.apache.hadoop.fs.azurebfs.AzureBlobFileSystemStore.extractEtagHeader; import static org.apache.hadoop.fs.azurebfs.AbfsStatistic.CALL_GET_FILE_STATUS; import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.ACQUIRE_LEASE_ACTION; import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.AND_MARK; @@ -454,29 +457,45 @@ public void createNonRecursivePreCheck(Path parentPath, /** * Get Rest Operation for API - * Put Blob. - * Creates a file or directory(marker file) at specified path. - * @param path of the directory to be created. - * @param tracingContext for tracing the service call. - * @return executed rest operation containing response from server. - * @throws AzureBlobFileSystemException if rest operation fails. + * Put Blob. + * Creates a file or directory (marker file) at the specified path. + * + * @param path the path of the directory to be created. + * @param isFileCreation whether the path to create is a file. + * @param overwrite whether to overwrite if the path already exists. + * @param permissions the permissions to set on the path. + * @param isAppendBlob whether the path is an append blob. + * @param eTag the eTag of the path. + * @param contextEncryptionAdapter the context encryption adapter. + * @param tracingContext the tracing context. + * @return the executed rest operation containing the response from the server. + * @throws AzureBlobFileSystemException if the rest operation fails. */ @Override public AbfsRestOperation createPath(final String path, - final boolean isFile, + final boolean isFileCreation, final boolean overwrite, final AzureBlobFileSystemStore.Permissions permissions, final boolean isAppendBlob, final String eTag, final ContextEncryptionAdapter contextEncryptionAdapter, final TracingContext tracingContext) throws AzureBlobFileSystemException { - return createPath(path, isFile, overwrite, permissions, isAppendBlob, eTag, - contextEncryptionAdapter, tracingContext, false); + AbfsRestOperation op; + if (isFileCreation) { + // Create a file with the specified parameters + op = createFile(path, overwrite, permissions, isAppendBlob, eTag, + contextEncryptionAdapter, tracingContext); + } else { + // Create a directory with the specified parameters + op = createDirectory(path, overwrite, permissions, isAppendBlob, eTag, + contextEncryptionAdapter, tracingContext); + } + return op; } /** * Get Rest Operation for API - * Put Blob. + * Put Blob. * Creates a file or directory (marker file) at the specified path. * * @param path the path of the directory to be created. @@ -486,49 +505,19 @@ public AbfsRestOperation createPath(final String path, * @param isAppendBlob whether the path is an append blob. * @param eTag the eTag of the path. * @param contextEncryptionAdapter the context encryption adapter. - * @param tracingContext the tracing context. - * @param isCreateCalledFromMarkers whether the create is called from markers. + * @param tracingContext the tracing context for the service call. * @return the executed rest operation containing the response from the server. * @throws AzureBlobFileSystemException if the rest operation fails. */ - public AbfsRestOperation createPath(final String path, + public AbfsRestOperation createPathRestOp(final String path, final boolean isFile, final boolean overwrite, final AzureBlobFileSystemStore.Permissions permissions, final boolean isAppendBlob, final String eTag, final ContextEncryptionAdapter contextEncryptionAdapter, - final TracingContext tracingContext, - boolean isCreateCalledFromMarkers) throws AzureBlobFileSystemException { + final TracingContext tracingContext) throws AzureBlobFileSystemException { final List requestHeaders = createDefaultHeaders(); - if (!getIsNamespaceEnabled() && !isCreateCalledFromMarkers) { - AbfsHttpOperation op1Result = null; - try { - op1Result = getPathStatus(path, tracingContext, - null, true).getResult(); - } catch (AbfsRestOperationException ex) { - if (ex.getStatusCode() == HTTP_NOT_FOUND) { - LOG.debug("No directory/path found: {}", path); - } else { - LOG.debug("Failed to get path status for: {}", path, ex); - throw ex; - } - } - if (op1Result != null) { - boolean isDir = checkIsDir(op1Result); - if (isFile == isDir) { - throw new AbfsRestOperationException(HTTP_CONFLICT, - AzureServiceErrorCode.PATH_CONFLICT.getErrorCode(), - PATH_EXISTS, - null); - } - } - Path parentPath = new Path(path).getParent(); - if (parentPath != null && !parentPath.isRoot()) { - createMarkers(parentPath, overwrite, permissions, isAppendBlob, eTag, - contextEncryptionAdapter, tracingContext); - } - } if (isFile) { addEncryptionKeyRequestHeaders(path, requestHeaders, true, contextEncryptionAdapter, tracingContext); @@ -555,31 +544,222 @@ public AbfsRestOperation createPath(final String path, final AbfsRestOperation op = getAbfsRestOperation( AbfsRestOperationType.PutBlob, HTTP_METHOD_PUT, url, requestHeaders); + op.execute(tracingContext); + return op; + } + + /** + * Checks if the specified path is a directory by listing its contents. + * + * @param path the path to check. + * @param tracingContext the tracing context for the service call. + * @return true if the path is a directory and contains entries, false otherwise. + * @throws AzureBlobFileSystemException if the rest operation fails. + */ + private boolean checkDirectoryByList(String path, + TracingContext tracingContext) + throws AzureBlobFileSystemException { + AbfsRestOperation listPathOp = listPath(path, false, 1, null, + tracingContext, false); + AbfsHttpOperation listPathResult = listPathOp.getResult(); + if (listPathResult != null) { + // Determine if the path is a directory by checking if the list result schema has any paths + return !listPathResult.getListResultSchema().paths().isEmpty(); + } + return false; + } + + /** + * Checks if the specified path exists as a directory. + * + * @param path the path of the directory to check. + * @param tracingContext the tracing context for the service call. + * @return true if the directory exists, false otherwise. + * @throws AzureBlobFileSystemException if the rest operation fails. + */ + private boolean checkForDirectoryExistence(String path, + TracingContext tracingContext) + throws AzureBlobFileSystemException { + // Check if the directory contains any entries by listing its contents. + if (checkDirectoryByList(path, tracingContext)) { + // If the list result schema has any paths, it is a directory. + return true; + } else { + // If the directory does not contain any entries, check if it exists as an empty directory. + return checkEmptyDirectoryPathExists(path, tracingContext, true); + } + } + + /** + * Checks the status of the path to determine if it exists and whether it is a file or directory. + * Throws an exception if the path exists as a file. + * + * @param path the path to check + * @param tracingContext the tracing context + * @return true if the path exists and is a directory, false otherwise + * @throws AbfsRestOperationException if the path exists as a file + */ + private boolean checkEmptyDirectoryPathExists(final String path, + final TracingContext tracingContext, boolean isDirCheck) throws AzureBlobFileSystemException { + // If the call is to create a directory, there are 3 possible cases: + // a) a file exists at that path + // b) an empty directory exists + // c) the path does not exist. + AbfsRestOperation getPathStatusOp = null; try { - op.execute(tracingContext); - } catch (AzureBlobFileSystemException ex) { - // If we have no HTTP response, throw the original exception. - if (!op.hasResult()) { + // GetPathStatus call to check if path already exists. + getPathStatusOp = getPathStatus(path, tracingContext, null, false); + } catch (AbfsRestOperationException ex) { + if (ex.getStatusCode() != HTTP_NOT_FOUND) { throw ex; } - if (!isFile && op.getResult().getStatusCode() == HTTP_CONFLICT) { - // This ensures that we don't throw ex only for existing directory but if a blob exists we throw exception. - AbfsHttpOperation opResult = null; - try { - opResult = this.getPathStatus(path, true, tracingContext, null).getResult(); - } catch (AbfsRestOperationException e) { - if (opResult != null) { - LOG.debug("Failed to get path status for: {} during blob type check", path, e); - throw e; - } - } - if (opResult != null && checkIsDir(opResult)) { - return op; + } + if (getPathStatusOp != null) { + // If path exists and is a directory, return true. + boolean isDirectory = checkIsDir(getPathStatusOp.getResult()); + if (!isDirectory && isDirCheck) { + // This indicates path exists as a file, hence throw conflict. + throw new AbfsRestOperationException(HTTP_CONFLICT, + AzureServiceErrorCode.PATH_CONFLICT.getErrorCode(), + PATH_EXISTS, + null); + } else { + return isDirectory; + } + } + return false; + } + + /** + * Creates a directory at the specified path. + * + * @param path the path of the directory to be created. + * @param overwrite whether to overwrite the existing directory. + * @param permissions the permissions to be set for the directory. + * @param isAppendBlob whether the directory is an append blob. + * @param eTag the eTag of the directory. + * @param contextEncryptionAdapter the encryption context adapter. + * @param tracingContext the tracing context for the service call. + * @return the executed rest operation containing the response from the server. + * @throws AzureBlobFileSystemException if the rest operation fails. + */ + private AbfsRestOperation createDirectory(final String path, + final boolean overwrite, + final AzureBlobFileSystemStore.Permissions permissions, + final boolean isAppendBlob, + final String eTag, + final ContextEncryptionAdapter contextEncryptionAdapter, + final TracingContext tracingContext) throws AzureBlobFileSystemException { + if (!getIsNamespaceEnabled()) { + try { + if (checkForDirectoryExistence(path, tracingContext)) { + final AbfsRestOperation successOp = getAbfsRestOperation( + AbfsRestOperationType.PutBlob, + HTTP_METHOD_PUT, createRequestUrl(path, EMPTY_STRING), + createDefaultHeaders()); + successOp.hardSetResult(HttpURLConnection.HTTP_CREATED); + return successOp; } + } catch (AzureBlobFileSystemException ex) { + LOG.error("Path exists as file {} : {}", path, ex.getMessage()); + throw ex; } - throw ex; + createParentMarkersIfNeeded(path, overwrite, permissions, isAppendBlob, + eTag, contextEncryptionAdapter, tracingContext); } - return op; + return createPathRestOp(path, false, true, permissions, + isAppendBlob, eTag, contextEncryptionAdapter, tracingContext); + } + + /** + * Creates markers for the parent path if needed. + * + * @param path the path for which to create parent markers. + * @param overwrite whether to overwrite if the path already exists. + * @param permissions the permissions to set on the path. + * @param isAppendBlob whether the path is an append blob. + * @param eTag the eTag of the path. + * @param contextEncryptionAdapter the context encryption adapter. + * @param tracingContext the tracing context. + * @throws AzureBlobFileSystemException if the creation of markers fails. + */ + private void createParentMarkersIfNeeded(String path, boolean overwrite, AzureBlobFileSystemStore.Permissions permissions, boolean isAppendBlob, String eTag, ContextEncryptionAdapter contextEncryptionAdapter, TracingContext tracingContext) throws AzureBlobFileSystemException { + Path parentPath = new Path(path).getParent(); + if (parentPath != null && !parentPath.isRoot()) { + createMarkers(parentPath, overwrite, permissions, isAppendBlob, eTag, contextEncryptionAdapter, tracingContext); + } + } + + /** + * Validates the path and creates markers if necessary when the namespace is disabled. + * + * @param path the path to validate and create markers for. + * @param overwrite whether to overwrite if the path already exists. + * @param permissions the permissions to set on the path. + * @param isAppendBlob whether the path is an append blob. + * @param eTag the eTag of the path. + * @param contextEncryptionAdapter the context encryption adapter. + * @param tracingContext the tracing context for the service call. + * @throws AbfsRestOperationException if a conflict is detected. + * @throws AzureBlobFileSystemException if the creation of markers fails. + */ + public void validatePathAndCreateMarkersIfNeeded(final String path, + final boolean overwrite, + final AzureBlobFileSystemStore.Permissions permissions, + final boolean isAppendBlob, + final String eTag, + final ContextEncryptionAdapter contextEncryptionAdapter, + final TracingContext tracingContext) throws AbfsRestOperationException, AzureBlobFileSystemException { + if (!getIsNamespaceEnabled()) { + if (checkDirectoryByList(path, tracingContext)) { + throw new AbfsRestOperationException(HTTP_CONFLICT, + AzureServiceErrorCode.PATH_CONFLICT.getErrorCode(), + PATH_EXISTS, + null); + } + createParentMarkersIfNeeded(path, overwrite, permissions, isAppendBlob, + eTag, contextEncryptionAdapter, tracingContext); + } + } + + /** + * Creates a file at the specified path. + * + * @param path the path of the file to be created. + * @param overwrite whether to overwrite if the file already exists. + * @param permissions the permissions to set on the file. + * @param isAppendBlob whether the file is an append blob. + * @param eTag the eTag of the file. + * @param contextEncryptionAdapter the context encryption adapter. + * @param tracingContext the tracing context for the service call. + * @return the executed rest operation containing the response from the server. + * @throws AzureBlobFileSystemException if the rest operation fails. + */ + private AbfsRestOperation createFile(final String path, + final boolean overwrite, + final AzureBlobFileSystemStore.Permissions permissions, + final boolean isAppendBlob, + final String eTag, + final ContextEncryptionAdapter contextEncryptionAdapter, + final TracingContext tracingContext) throws AzureBlobFileSystemException { + if (!getIsNamespaceEnabled()) { + if (checkDirectoryByList(path, tracingContext)) { + throw new AbfsRestOperationException(HTTP_CONFLICT, + AzureServiceErrorCode.PATH_CONFLICT.getErrorCode(), + PATH_EXISTS, + null); + } + if (overwrite && checkEmptyDirectoryPathExists(path, tracingContext, false)) { + throw new AbfsRestOperationException(HTTP_CONFLICT, + AzureServiceErrorCode.PATH_CONFLICT.getErrorCode(), + PATH_EXISTS, + null); + } + createParentMarkersIfNeeded(path, overwrite, permissions, isAppendBlob, + eTag, contextEncryptionAdapter, tracingContext); + } + return createPathRestOp(path, true, overwrite, permissions, + isAppendBlob, eTag, contextEncryptionAdapter, tracingContext); } /** @@ -605,8 +785,13 @@ private void createMarkers(final Path path, checkParentChainForFile(path, tracingContext, keysToCreateAsFolder); for (Path pathToCreate : keysToCreateAsFolder) { - createPath(pathToCreate.toUri().getPath(), false, overwrite, permissions, - isAppendBlob, eTag, contextEncryptionAdapter, tracingContext, true); + try { + createPathRestOp(pathToCreate.toUri().getPath(), false, overwrite, + permissions, + isAppendBlob, eTag, contextEncryptionAdapter, tracingContext); + } catch (AbfsRestOperationException e) { + LOG.debug("No exception to be thrown if marker creation fails"); + } } } @@ -647,6 +832,87 @@ private void checkParentChainForFile(Path path, TracingContext tracingContext, } while (current != null && !current.isRoot()); } + /** + * Conditionally creates or overwrites a file at the specified relative path. + * This method ensures that the file is created or overwritten based on the provided parameters. + * + * @param relativePath The relative path of the file to be created or overwritten. + * @param statistics The file system statistics to be updated. + * @param permissions The permissions to be set on the file. + * @param isAppendBlob Specifies if the file is an append blob. + * @param contextEncryptionAdapter The encryption context adapter for handling encryption. + * @param tracingContext The tracing context for tracking the operation. + * @return An AbfsRestOperation object containing the result of the operation. + * @throws IOException If an I/O error occurs during the operation. + */ + @Override + public AbfsRestOperation conditionalCreateOverwriteFile(String relativePath, + FileSystem.Statistics statistics, + AzureBlobFileSystemStore.Permissions permissions, + boolean isAppendBlob, + ContextEncryptionAdapter contextEncryptionAdapter, + TracingContext tracingContext) throws IOException { + try { + validatePathAndCreateMarkersIfNeeded(relativePath, false, permissions, + isAppendBlob, null, contextEncryptionAdapter, tracingContext); + } catch (AzureBlobFileSystemException ex) { + LOG.error("Path exists as directory {} : {}", relativePath, ex.getMessage()); + throw ex; + } + AbfsRestOperation op; + try { + // Trigger a creation with overwrite=false first so that eTag fetch can be + // avoided for cases when no pre-existing file is present (major portion + // of create file traffic falls into the case of no pre-existing file). + op = createPathRestOp(relativePath, true, false, permissions, + isAppendBlob, null, contextEncryptionAdapter, tracingContext); + } catch (AbfsRestOperationException e) { + if (e.getStatusCode() == HTTP_CONFLICT) { + // File pre-exists, fetch eTag + try { + op = getPathStatus(relativePath, tracingContext, null, false); + } catch (AbfsRestOperationException ex) { + if (ex.getStatusCode() == HTTP_NOT_FOUND) { + // Is a parallel access case, as file which was found to be + // present went missing by this request. + throw new ConcurrentWriteOperationDetectedException(); + } else { + throw ex; + } + } + + // If present as an explicit empty directory, we should throw conflict exception. + boolean isDir = checkIsDir(op.getResult()); + if (isDir) { + throw new AbfsRestOperationException(HTTP_CONFLICT, + AzureServiceErrorCode.PATH_CONFLICT.getErrorCode(), + PATH_EXISTS, + null); + } + + String eTag = extractEtagHeader(op.getResult()); + + try { + // overwrite only if eTag matches with the file properties fetched before + op = createPathRestOp(relativePath, true, true, permissions, + isAppendBlob, eTag, contextEncryptionAdapter, tracingContext); + } catch (AbfsRestOperationException ex) { + if (ex.getStatusCode() == HTTP_PRECON_FAILED) { + // Is a parallel access case, as file with eTag was just queried + // and precondition failure can happen only when another file with + // different etag got created. + throw new ConcurrentWriteOperationDetectedException(); + } else { + throw ex; + } + } + } else { + throw e; + } + } + return op; + } + /** * Get Rest Operation for API * Lease Blob. @@ -1116,8 +1382,8 @@ public AbfsRestOperation setPathProperties(final String path, // This path could be present as an implicit directory in FNS. if (op.getResult().getStatusCode() == HTTP_NOT_FOUND && isNonEmptyListing(path, tracingContext)) { // Implicit path found, create a marker blob at this path and set properties. - this.createPath(path, false, false, null, false, null, - contextEncryptionAdapter, tracingContext, false); + this.createPathRestOp(path, false, false, null, false, null, + contextEncryptionAdapter, tracingContext); // Make sure hdi_isFolder is added to the list of properties to be set. boolean hdiIsFolderExists = properties.containsKey(XML_TAG_HDI_ISFOLDER); if (!hdiIsFolderExists) { diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsClient.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsClient.java index 47ae988419c85..b6aca80768249 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsClient.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsClient.java @@ -47,6 +47,7 @@ import org.apache.commons.lang3.StringUtils; import org.apache.hadoop.classification.VisibleForTesting; +import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.azurebfs.AbfsConfiguration; import org.apache.hadoop.fs.azurebfs.AzureBlobFileSystemStore.Permissions; @@ -552,6 +553,26 @@ public abstract AbfsRestOperation createPath(String path, ContextEncryptionAdapter contextEncryptionAdapter, TracingContext tracingContext) throws AzureBlobFileSystemException; + /** + * Conditionally creates or overwrites a file at the specified relative path. + * This method ensures that the file is created or overwritten based on the provided parameters. + * + * @param relativePath The relative path of the file to be created or overwritten. + * @param statistics The file system statistics to be updated. + * @param permissions The permissions to be set on the file. + * @param isAppendBlob Specifies if the file is an append blob. + * @param contextEncryptionAdapter The encryption context adapter for handling encryption. + * @param tracingContext The tracing context for tracking the operation. + * @return An AbfsRestOperation object containing the result of the operation. + * @throws IOException If an I/O error occurs during the operation. + */ + public abstract AbfsRestOperation conditionalCreateOverwriteFile(String relativePath, + FileSystem.Statistics statistics, + Permissions permissions, + boolean isAppendBlob, + ContextEncryptionAdapter contextEncryptionAdapter, + TracingContext tracingContext) throws IOException; + /** * Performs a pre-check for a createNonRecursivePreCheck operation. Checks if parentPath * exists or not. diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsDfsClient.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsDfsClient.java index 71b89147017d7..8505b533ce4c4 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsDfsClient.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsDfsClient.java @@ -40,6 +40,7 @@ import com.fasterxml.jackson.core.JsonToken; import com.fasterxml.jackson.databind.ObjectMapper; +import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.azurebfs.AbfsConfiguration; import org.apache.hadoop.fs.azurebfs.AzureBlobFileSystemStore; @@ -50,6 +51,7 @@ import org.apache.hadoop.fs.azurebfs.contracts.exceptions.AbfsInvalidChecksumException; import org.apache.hadoop.fs.azurebfs.contracts.exceptions.AbfsRestOperationException; import org.apache.hadoop.fs.azurebfs.contracts.exceptions.AzureBlobFileSystemException; +import org.apache.hadoop.fs.azurebfs.contracts.exceptions.ConcurrentWriteOperationDetectedException; import org.apache.hadoop.fs.azurebfs.contracts.exceptions.InvalidAbfsRestOperationException; import org.apache.hadoop.fs.azurebfs.contracts.exceptions.InvalidFileSystemPropertyException; import org.apache.hadoop.fs.azurebfs.contracts.services.AppendRequestParameters; @@ -431,6 +433,71 @@ public void createNonRecursivePreCheck(Path parentPath, } } + /** + * Conditionally creates or overwrites a file at the specified relative path. + * This method ensures that the file is created or overwritten based on the provided parameters. + * + * @param relativePath The relative path of the file to be created or overwritten. + * @param statistics The file system statistics to be updated. + * @param permissions The permissions to be set on the file. + * @param isAppendBlob Specifies if the file is an append blob. + * @param contextEncryptionAdapter The encryption context adapter for handling encryption. + * @param tracingContext The tracing context for tracking the operation. + * @return An AbfsRestOperation object containing the result of the operation. + * @throws IOException If an I/O error occurs during the operation. + */ + public AbfsRestOperation conditionalCreateOverwriteFile(String relativePath, + FileSystem.Statistics statistics, + AzureBlobFileSystemStore.Permissions permissions, + boolean isAppendBlob, + ContextEncryptionAdapter contextEncryptionAdapter, + TracingContext tracingContext) throws IOException { + AbfsRestOperation op; + try { + // Trigger a create with overwrite=false first so that eTag fetch can be + // avoided for cases when no pre-existing file is present (major portion + // of create file traffic falls into the case of no pre-existing file). + op = createPath(relativePath, true, false, permissions, + isAppendBlob, null, contextEncryptionAdapter, tracingContext); + + } catch (AbfsRestOperationException e) { + if (e.getStatusCode() == HttpURLConnection.HTTP_CONFLICT) { + // File pre-exists, fetch eTag + try { + op = getPathStatus(relativePath, false, tracingContext, null); + } catch (AbfsRestOperationException ex) { + if (ex.getStatusCode() == HttpURLConnection.HTTP_NOT_FOUND) { + // Is a parallel access case, as file which was found to be + // present went missing by this request. + throw new ConcurrentWriteOperationDetectedException(); + } else { + throw ex; + } + } + + String eTag = extractEtagHeader(op.getResult()); + + try { + // overwrite only if eTag matches with the file properties fetched befpre + op = createPath(relativePath, true, true, permissions, + isAppendBlob, eTag, contextEncryptionAdapter, tracingContext); + } catch (AbfsRestOperationException ex) { + if (ex.getStatusCode() == HttpURLConnection.HTTP_PRECON_FAILED) { + // Is a parallel access case, as file with eTag was just queried + // and precondition failure can happen only when another file with + // different etag got created. + throw new ConcurrentWriteOperationDetectedException(); + } else { + throw ex; + } + } + } else { + throw e; + } + } + return op; + } + /** * Get Rest Operation for API * diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/BlobDeleteHandler.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/BlobDeleteHandler.java index 50bdb96c9e7d6..02604745e35e8 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/BlobDeleteHandler.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/BlobDeleteHandler.java @@ -151,7 +151,7 @@ private void ensurePathParentExist() throws AzureBlobFileSystemException { if (!path.isRoot() && !path.getParent().isRoot()) { try { - getAbfsClient().createPath(path.getParent().toUri().getPath(), + getAbfsClient().createPathRestOp(path.getParent().toUri().getPath(), false, false, null, diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/BlobRenameHandler.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/BlobRenameHandler.java index 4c27752243f4d..1a221fcfd7a3a 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/BlobRenameHandler.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/BlobRenameHandler.java @@ -127,7 +127,7 @@ public boolean execute() throws AzureBlobFileSystemException { RenameAtomicity renameAtomicity = null; if (pathInformation.getIsDirectory() && pathInformation.getIsImplicit()) { - AbfsRestOperation createMarkerOp = getAbfsClient().createPath( + AbfsRestOperation createMarkerOp = getAbfsClient().createPathRestOp( src.toUri().getPath(), false, false, null, false, null, null, tracingContext); diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAbfsNetworkStatistics.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAbfsNetworkStatistics.java index 0de2e40ce3d8c..e66afbcaa7492 100644 --- a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAbfsNetworkStatistics.java +++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAbfsNetworkStatistics.java @@ -96,8 +96,8 @@ public void testAbfsHttpSendStatistics() throws IOException { // 1 create request = 1 connection made and 1 send request if (client instanceof AbfsBlobClient && !getIsNamespaceEnabled(fs)) { expectedRequestsSent += (directory); - // Per directory, we have 2 calls :- GetBlobProperties and PutBlob and 1 ListBlobs call (implicit check) for the path. - expectedConnectionsMade += ((directory * 2) + 1); + // Per directory, we have 2 calls :- 1 PutBlob and 1 ListBlobs call. + expectedConnectionsMade += ((directory * 2)); } else { expectedRequestsSent++; expectedConnectionsMade++; @@ -176,12 +176,12 @@ public void testAbfsHttpSendStatistics() throws IOException { * + getFileStatus to fetch the file ETag * + create overwrite=true * = 3 connections and 2 send requests in case of Dfs Client - * = 7 connections (5 GBP and 2 PutBlob calls) in case of Blob Client + * = 1 ListBlob + 2 GPS + 2 PutBlob */ if (fs.getAbfsStore().getAbfsConfiguration().isConditionalCreateOverwriteEnabled()) { if (client instanceof AbfsBlobClient && !getIsNamespaceEnabled(fs)) { expectedRequestsSent += 2; - expectedConnectionsMade += 7; + expectedConnectionsMade += 5; } else { expectedConnectionsMade += 3; expectedRequestsSent += 2; diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemCreate.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemCreate.java index 81b86088c561f..9544c9f03b386 100644 --- a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemCreate.java +++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemCreate.java @@ -51,6 +51,7 @@ import org.apache.hadoop.fs.azurebfs.services.AbfsBlobClient; import org.apache.hadoop.fs.azurebfs.services.AbfsClient; import org.apache.hadoop.fs.azurebfs.services.AbfsClientHandler; +import org.apache.hadoop.fs.azurebfs.services.AbfsDfsClient; import org.apache.hadoop.fs.azurebfs.services.AbfsHttpOperation; import org.apache.hadoop.fs.azurebfs.services.AbfsRestOperation; import org.apache.hadoop.fs.azurebfs.services.ITestAbfsClient; @@ -77,8 +78,12 @@ import static org.apache.hadoop.fs.contract.ContractTestUtils.assertIsFile; import static org.apache.hadoop.test.LambdaTestUtils.intercept; import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyBoolean; +import static org.mockito.ArgumentMatchers.anyString; import static org.mockito.ArgumentMatchers.eq; import static org.mockito.ArgumentMatchers.nullable; +import static org.mockito.Mockito.doCallRealMethod; +import static org.mockito.Mockito.doNothing; import static org.mockito.Mockito.doThrow; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; @@ -458,9 +463,10 @@ public void testCreateFileOverwrite(boolean enableConditionalCreateOverwrite) // One request to server to create path should be issued // two calls added for - // 1. getFileStatus on DFS endpoint : 1 - // getFileStatus on Blob endpoint: 2 (Additional List blob call) + // getFileStatus on Blob endpoint: 1 ListBlobcall // 2. actual create call: 1 - createRequestCount += (client instanceof AbfsBlobClient && !getIsNamespaceEnabled(fs) ? 3: 1); + createRequestCount += ( + client instanceof AbfsBlobClient && !getIsNamespaceEnabled(fs) ? 2 : 1); assertAbfsStatistics( CONNECTIONS_MADE, @@ -480,7 +486,8 @@ public void testCreateFileOverwrite(boolean enableConditionalCreateOverwrite) // 1. getFileStatus on DFS endpoint : 1 // getFileStatus on Blob endpoint: 1 (No Additional List blob call as file exists) - createRequestCount += (client instanceof AbfsBlobClient && !getIsNamespaceEnabled(fs) ? 2: 1); + createRequestCount += ( + client instanceof AbfsBlobClient && !getIsNamespaceEnabled(fs) ? 2 : 1); assertAbfsStatistics( CONNECTIONS_MADE, @@ -497,9 +504,12 @@ public void testCreateFileOverwrite(boolean enableConditionalCreateOverwrite) /// One request to server to create path should be issued // two calls added for - // 1. getFileStatus on DFS endpoint : 1 - // getFileStatus on Blob endpoint: 2 (Additional List blob call for non-existing path) + // getFileStatus on Blob endpoint: 1 ListBlobCall + 1 GPS // 2. actual create call: 1 - createRequestCount += (client instanceof AbfsBlobClient && !getIsNamespaceEnabled(fs) ? 3: 1); + // 1 extra call when conditional overwrite is not enabled to check for empty directory + createRequestCount += (client instanceof AbfsBlobClient && !getIsNamespaceEnabled(fs)) + ? (enableConditionalCreateOverwrite ? 2 : 3) + : 1; assertAbfsStatistics( CONNECTIONS_MADE, @@ -515,15 +525,10 @@ public void testCreateFileOverwrite(boolean enableConditionalCreateOverwrite) createRequestCount += (client instanceof AbfsBlobClient && !getIsNamespaceEnabled(fs) ? 1: 0); - // Second actual create call will hap if (enableConditionalCreateOverwrite) { - // Three requests will be sent to server to create path, - // 1. create without overwrite - // 2. GetFileStatus to get eTag - // 3. create with overwrite - createRequestCount += (client instanceof AbfsBlobClient && !getIsNamespaceEnabled(fs) ? 4: 3); + createRequestCount += 3; } else { - createRequestCount++; + createRequestCount += (client instanceof AbfsBlobClient && !getIsNamespaceEnabled(fs)) ? 2 : 1; } assertAbfsStatistics( @@ -576,8 +581,6 @@ public void testNegativeScenariosForCreateOverwriteDisabled() ReflectionUtils.setFinalField(AzureBlobFileSystemStore.class, abfsStore, "clientHandler", clientHandler); ReflectionUtils.setFinalField(AzureBlobFileSystemStore.class, abfsStore, "client", mockClient); - boolean isNamespaceEnabled = abfsStore - .getIsNamespaceEnabled(getTestTracingContext(fs, false)); AbfsRestOperation successOp = mock( AbfsRestOperation.class); @@ -595,6 +598,14 @@ public void testNegativeScenariosForCreateOverwriteDisabled() AbfsRestOperationException preConditionResponseEx = getMockAbfsRestOperationException(HTTP_PRECON_FAILED); + doCallRealMethod().when(mockClient) + .conditionalCreateOverwriteFile(anyString(), + Mockito.nullable(FileSystem.Statistics.class), + Mockito.nullable(AzureBlobFileSystemStore.Permissions.class), + anyBoolean(), + Mockito.nullable(ContextEncryptionAdapter.class), + Mockito.nullable(TracingContext.class)); + // mock for overwrite=false doThrow(conflictResponseEx) // Scn1: GFS fails with Http404 .doThrow(conflictResponseEx) // Scn2: GFS fails with Http500 @@ -627,6 +638,52 @@ public void testNegativeScenariosForCreateOverwriteDisabled() any(AzureBlobFileSystemStore.Permissions.class), any(boolean.class), eq(null), any(), any(TracingContext.class)); + if (mockClient instanceof AbfsBlobClient) { + // Mock for validatePathAndCreateMarkers to do nothing + doNothing().when((AbfsBlobClient) mockClient) + .validatePathAndCreateMarkersIfNeeded(any(String.class), + any(Boolean.class), + any(AzureBlobFileSystemStore.Permissions.class), + any(Boolean.class), + any(String.class), any(ContextEncryptionAdapter.class), + any(TracingContext.class)); + + // mock for overwrite=true + doThrow( + preConditionResponseEx) // Scn3: create overwrite=true fails with Http412 + .doThrow( + serverErrorResponseEx) // Scn4: create overwrite=true fails with Http500 + .when((AbfsBlobClient) mockClient) + .createPathRestOp(any(String.class), eq(true), eq(true), + any(AzureBlobFileSystemStore.Permissions.class), + any(boolean.class), eq(null), any(), + any(TracingContext.class)); + + // mock for overwrite=false + doThrow(conflictResponseEx) // Scn1: GFS fails with Http404 + .doThrow(conflictResponseEx) // Scn2: GFS fails with Http500 + .doThrow( + conflictResponseEx) // Scn3: create overwrite=true fails with Http412 + .doThrow( + conflictResponseEx) // Scn4: create overwrite=true fails with Http500 + .doThrow( + serverErrorResponseEx) + // Scn5: create overwrite=false fails with Http500 + .when((AbfsBlobClient) mockClient) + .createPathRestOp(any(String.class), eq(true), eq(false), + any(AzureBlobFileSystemStore.Permissions.class), + any(boolean.class), eq(null), any(), + any(TracingContext.class)); + + doThrow(fileNotFoundResponseEx) // Scn1: GFS fails with Http404 + .doThrow(serverErrorResponseEx) // Scn2: GFS fails with Http500 + .doReturn(successOp) // Scn3: create overwrite=true fails with Http412 + .doReturn(successOp) // Scn4: create overwrite=true fails with Http500 + .when((AbfsBlobClient) mockClient) + .getPathStatus(any(String.class), any(TracingContext.class), nullable( + ContextEncryptionAdapter.class), eq(false)); + } + // Scn1: GFS fails with Http404 // Sequence of events expected: // 1. create overwrite=false - fail with conflict @@ -749,6 +806,17 @@ public void testMkdirsFailsForSubdirectoryOfExistingFile() throws Exception { .isTrue(); } + /** + * Calling mkdir for existing implicit directory. + * @throws Exception + */ + @Test + public void testMkdirSameFolder() throws Exception { + final AzureBlobFileSystem fs = getFileSystem(); + createAzCopyFolder(new Path("a/b/d")); + fs.mkdirs(new Path("a/b/d")); + } + /** * Try creating file same as an existing directory. * @throws Exception @@ -781,6 +849,40 @@ public void testCreateSameFile() throws Exception { .isTrue(); } + @Test + public void testCreationWithoutConditionalOverwrite() + throws Throwable { + final AzureBlobFileSystem currentFs = getFileSystem(); + Configuration config = new Configuration(this.getRawConfiguration()); + config.set("fs.azure.enable.conditional.create.overwrite", + String.valueOf(false)); + AzureBlobFileSystemStore store = currentFs.getAbfsStore(); + AbfsClient client = store.getClientHandler().getIngressClient(); + + final AzureBlobFileSystem fs = + (AzureBlobFileSystem) FileSystem.newInstance(currentFs.getUri(), + config); + fs.mkdirs(new Path("a/b/c")); + intercept(IOException.class, () -> fs.create(new Path("a/b/c"), true)); + } + + @Test + public void testCreationOverwriteFalseWithoutConditionalOverwrite() + throws Throwable { + final AzureBlobFileSystem currentFs = getFileSystem(); + Configuration config = new Configuration(this.getRawConfiguration()); + config.set("fs.azure.enable.conditional.create.overwrite", + String.valueOf(false)); + AzureBlobFileSystemStore store = currentFs.getAbfsStore(); + AbfsClient client = store.getClientHandler().getIngressClient(); + + final AzureBlobFileSystem fs = + (AzureBlobFileSystem) FileSystem.newInstance(currentFs.getUri(), + config); + fs.mkdirs(new Path("a/b/c")); + intercept(IOException.class, () -> fs.create(new Path("a/b/c"), false)); + } + /** * Creating same file with overwrite flag set to false. * @throws Exception @@ -1375,9 +1477,9 @@ fs, getTestTracingContext(fs, true))) .isTrue(); // Asserting that the directory created by mkdir exists as explicit. - Assertions.assertThat(DirectoryStateHelper.isExplicitDirectory(path, + Assertions.assertThat(DirectoryStateHelper.isImplicitDirectory(path, fs, getTestTracingContext(fs, true))) - .describedAs("Directory created by mkdir does not exist as explicit") + .describedAs("Mkdir created explicit directory") .isTrue(); } @@ -1400,16 +1502,15 @@ public void testMkdirOnExistingImplicitDirWithImplicitParentDir() throws Excepti // Creating a directory on existing implicit directory inside an implicit directory fs.mkdirs(path); - // Asserting that path created by azcopy becomes explicit. - Assertions.assertThat(DirectoryStateHelper.isExplicitDirectory(implicitPath, + Assertions.assertThat(DirectoryStateHelper.isImplicitDirectory(implicitPath, fs, getTestTracingContext(fs, true))) - .describedAs("Path created by azcopy did not become explicit") + .describedAs("Marker is present for path created by azcopy") .isTrue(); - // Asserting that the directory created by mkdir exists as explicit. - Assertions.assertThat(DirectoryStateHelper.isExplicitDirectory(path, + // Asserting that the mkdir didn't create markers for existing directory. + Assertions.assertThat(DirectoryStateHelper.isImplicitDirectory(path, fs, getTestTracingContext(fs, true))) - .describedAs("Directory created by mkdir does not exist as explicit") + .describedAs("Marker is present for existing directory") .isTrue(); } @@ -1435,12 +1536,6 @@ public void testMkdirOnExistingFileWithImplicitParentDir() throws Exception { fs.mkdirs(path); }); - // Asserting that path created by azcopy becomes explicit. - Assertions.assertThat(DirectoryStateHelper.isExplicitDirectory(implicitPath, - fs, getTestTracingContext(fs, true))) - .describedAs("Path created by azcopy did not become explicit") - .isTrue(); - // Asserting that the file still exists at path. Assertions.assertThat(DirectoryStateHelper.isExplicitDirectory(path, fs, getTestTracingContext(fs, true))) @@ -1464,9 +1559,8 @@ public void testImplicitExplicitFolder() throws Exception { Path path = makeQualified(new Path("a/b")); AbfsBlobClient blobClient = (AbfsBlobClient) fs.getAbfsStore().getClient(AbfsServiceType.BLOB); - blobClient.createPath(path.toUri().getPath(), false, true, - null, false, null, null, getTestTracingContext(fs, true), - true); + blobClient.createPathRestOp(path.toUri().getPath(), false, true, + null, false, null, null, getTestTracingContext(fs, true)); fs.mkdirs(new Path("a/b/c/d")); @@ -1504,12 +1598,12 @@ public void testImplicitExplicitFolder1() throws Exception { Path path = makeQualified(new Path("a")); AbfsBlobClient blobClient = (AbfsBlobClient) fs.getAbfsStore().getClient(AbfsServiceType.BLOB); - blobClient.createPath(path.toUri().getPath(), false, true, - null, false, null, null, getTestTracingContext(fs, true), true); + blobClient.createPathRestOp(path.toUri().getPath(), false, true, + null, false, null, null, getTestTracingContext(fs, true)); Path newPath = makeQualified(new Path("a/b/c")); - blobClient.createPath(newPath.toUri().getPath(), false, true, - null, false, null, null, getTestTracingContext(fs, true), true); + blobClient.createPathRestOp(newPath.toUri().getPath(), false, true, + null, false, null, null, getTestTracingContext(fs, true)); fs.mkdirs(new Path("a/b/c/d")); diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemMkDir.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemMkDir.java index 0c50e279df27a..e54b98e0b7a6e 100644 --- a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemMkDir.java +++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemMkDir.java @@ -145,7 +145,7 @@ public void testCreateDirOverwrite(boolean enableConditionalCreateOverwrite) // One request to server if (client instanceof AbfsBlobClient && !getIsNamespaceEnabled(fs)) { - // 1 GetBlobProperties + 1 PutBlob call. + // 1 ListBlobs + 1 GetBlobProperties mkdirRequestCount +=2; } else { mkdirRequestCount++; diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/TestTracingContext.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/TestTracingContext.java index 506eae7598668..7fb672920cd67 100644 --- a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/TestTracingContext.java +++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/TestTracingContext.java @@ -112,7 +112,7 @@ public void checkCorrelationConfigValidation(String clientCorrelationId, //request should not fail for invalid clientCorrelationID AbfsRestOperation op = fs.getAbfsClient() - .createPath(path, false, true, permissions, false, null, null, + .createPath(path, true, true, permissions, false, null, null, tracingContext); int statusCode = op.getResult().getStatusCode(); diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/ITestAbfsClient.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/ITestAbfsClient.java index 6f24015b4ff08..a786b77a7936c 100644 --- a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/ITestAbfsClient.java +++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/ITestAbfsClient.java @@ -452,8 +452,12 @@ public static AbfsClient getMockAbfsClient(AbfsClient baseAbfsClientInstance, (currentAuthType == AuthType.SharedKey) || (currentAuthType == AuthType.OAuth)); - // TODO : [FnsOverBlob][HADOOP-19234] Update to work with Blob Endpoint as well when Fns Over Blob is ready. - AbfsClient client = mock(AbfsDfsClient.class); + AbfsClient client; + if (AbfsServiceType.DFS.equals(abfsConfig.getFsConfiguredServiceType())) { + client = mock(AbfsDfsClient.class); + } else { + client = mock(AbfsBlobClient.class); + } AbfsPerfTracker tracker = new AbfsPerfTracker( "test", abfsConfig.getAccountName(),