diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystem.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystem.java index de3c1bee029d2..6f483e3819e4a 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystem.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystem.java @@ -43,8 +43,10 @@ import java.util.concurrent.Executors; import java.util.concurrent.Future; +import org.apache.commons.lang3.tuple.Pair; import org.apache.hadoop.fs.azurebfs.contracts.exceptions.InvalidConfigurationValueException; import org.apache.hadoop.fs.azurebfs.services.AbfsBlobLease; +import org.apache.hadoop.fs.azurebfs.services.AbfsInputStream; import org.apache.hadoop.fs.azurebfs.services.BlobProperty; import org.apache.hadoop.classification.VisibleForTesting; import org.apache.hadoop.fs.azurebfs.services.OperativeEndpoint; @@ -116,6 +118,7 @@ import static org.apache.hadoop.fs.CommonConfigurationKeys.IOSTATISTICS_LOGGING_LEVEL_DEFAULT; import static org.apache.hadoop.fs.azurebfs.AbfsStatistic.*; import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.BLOB_LEASE_ONE_MINUTE_DURATION; +import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.FORWARD_SLASH; import static org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.FS_AZURE_ENABLE_BLOB_ENDPOINT; import static org.apache.hadoop.fs.azurebfs.constants.FileSystemUriSchemes.ABFS_DNS_PREFIX; import static org.apache.hadoop.fs.azurebfs.constants.FileSystemUriSchemes.WASB_DNS_PREFIX; @@ -360,7 +363,7 @@ public FSDataInputStream open(final Path path, final int bufferSize) throws IOEx } private FSDataInputStream open(final Path path, - final Optional options) throws IOException { + final Optional parameters) throws IOException { statIncrement(CALL_OPEN); Path qualifiedPath = makeQualified(path); @@ -369,7 +372,7 @@ private 
FSDataInputStream open(final Path path, fileSystemId, FSOperationType.OPEN, tracingHeaderFormat, listener); InputStream inputStream = getAbfsStore().openFileForRead(qualifiedPath, - options, statistics, tracingContext); + parameters, statistics, tracingContext); return new FSDataInputStream(inputStream); } catch(AzureBlobFileSystemException ex) { checkException(path, ex); @@ -377,6 +380,15 @@ private FSDataInputStream open(final Path path, } } + /** + * Takes config and other options through + * {@link org.apache.hadoop.fs.impl.OpenFileParameters}. Ensure that + * FileStatus entered is up-to-date, as it will be used to create the + * InputStream (with info such as contentLength, eTag) + * @param path The location of file to be opened + * @param parameters OpenFileParameters instance; can hold FileStatus, + * Configuration, bufferSize and mandatoryKeys + */ @Override protected CompletableFuture openFileWithOptions( final Path path, final OpenFileParameters parameters) throws IOException { @@ -387,7 +399,7 @@ protected CompletableFuture openFileWithOptions( "for " + path); return LambdaUtils.eval( new CompletableFuture<>(), () -> - open(path, Optional.of(parameters.getOptions()))); + open(path, Optional.of(parameters))); } private boolean shouldRedirect(FSOperationType type, TracingContext context) @@ -914,17 +926,40 @@ public FileStatus[] listStatus(final Path f) throws IOException { TracingContext tracingContext = new TracingContext(clientCorrelationId, fileSystemId, FSOperationType.LISTSTATUS, true, tracingHeaderFormat, listener); - FileStatus[] result = getAbfsStore().listStatus(qualifiedPath, tracingContext); + FileStatus[] result = getAbfsStore().listStatus(qualifiedPath, + tracingContext); if (getAbfsStore().getAbfsConfiguration().getPrefixMode() - == PrefixMode.BLOB) { - FileStatus renamePendingFileStatus + == PrefixMode.BLOB && getAbfsStore().isAtomicRenameKey( + qualifiedPath.toUri().getPath() + FORWARD_SLASH)) { + Pair renamePendingJsonAndSrcFileStatusPair 
= getAbfsStore().getRenamePendingFileStatus(result); - if (renamePendingFileStatus != null) { - RenameAtomicityUtils renameAtomicityUtils = - getRenameAtomicityUtilsForRedo(renamePendingFileStatus.getPath(), - tracingContext); - renameAtomicityUtils.cleanup(renamePendingFileStatus.getPath()); - result = getAbfsStore().listStatus(qualifiedPath, tracingContext); + FileStatus renamePendingSrcFileStatus + = renamePendingJsonAndSrcFileStatusPair.getRight(); + FileStatus renamePendingJsonFileStatus + = renamePendingJsonAndSrcFileStatusPair.getLeft(); + if (renamePendingJsonFileStatus != null) { + final Boolean isRedone; + if (renamePendingSrcFileStatus != null) { + RenameAtomicityUtils renameAtomicityUtils = + getRenameAtomicityUtilsForRedo( + renamePendingJsonFileStatus.getPath(), + tracingContext, + ((AzureBlobFileSystemStore.VersionedFileStatus) renamePendingSrcFileStatus).getEtag(), + getRenamePendingJsonInputStream( + renamePendingJsonFileStatus, tracingContext)); + renameAtomicityUtils.cleanup(renamePendingJsonFileStatus.getPath()); + isRedone = renameAtomicityUtils.isRedone(); + } else { + isRedone = false; + getAbfsStore().delete(renamePendingJsonFileStatus.getPath(), true, + tracingContext); + } + if (isRedone) { + result = getAbfsStore().listStatus(qualifiedPath, tracingContext); + } else { + result = ArrayUtils.removeElement(result, + renamePendingJsonFileStatus); + } } } return result; @@ -934,11 +969,13 @@ public FileStatus[] listStatus(final Path f) throws IOException { } } - RenameAtomicityUtils getRenameAtomicityUtilsForRedo(final Path renamePendingFileStatus, - final TracingContext tracingContext) throws IOException { + RenameAtomicityUtils getRenameAtomicityUtilsForRedo(final Path renamePendingJsonPath, + final TracingContext tracingContext, final String srcEtag, + final AbfsInputStream renamePendingJsonInputStream) throws IOException { return new RenameAtomicityUtils(this, - renamePendingFileStatus, - 
getAbfsStore().getRedoRenameInvocation(tracingContext)); + renamePendingJsonPath, + getAbfsStore().getRedoRenameInvocation(tracingContext), srcEtag, + renamePendingJsonInputStream); } /** @@ -1052,7 +1089,6 @@ private FileStatus getFileStatus(final Path path, LOG.debug("AzureBlobFileSystem.getFileStatus path: {}", path); statIncrement(CALL_GET_FILE_STATUS); Path qualifiedPath = makeQualified(path); - FileStatus fileStatus; PrefixMode prefixMode = getAbfsStore().getPrefixMode(); AbfsConfiguration abfsConfiguration = getAbfsStore().getAbfsConfiguration(); @@ -1064,22 +1100,42 @@ private FileStatus getFileStatus(final Path path, * Get File Status over Blob Endpoint will Have an additional call * to check if directory is implicit. */ - fileStatus = getAbfsStore().getFileStatus(qualifiedPath, - tracingContext, useBlobEndpoint); - if (getAbfsStore().getPrefixMode() == PrefixMode.BLOB - && fileStatus != null && fileStatus.isDirectory() - && getAbfsStore().isAtomicRenameKey(fileStatus.getPath().toUri().getPath()) - && getAbfsStore().getRenamePendingFileStatusInDirectory(fileStatus, - tracingContext)) { - RenameAtomicityUtils renameAtomicityUtils = getRenameAtomicityUtilsForRedo( - new Path(fileStatus.getPath().toUri().getPath() + SUFFIX), - tracingContext); - renameAtomicityUtils.cleanup( - new Path(fileStatus.getPath().toUri().getPath() + SUFFIX)); - throw new AbfsRestOperationException(HttpURLConnection.HTTP_NOT_FOUND, - AzureServiceErrorCode.PATH_NOT_FOUND.getErrorCode(), null, - new FileNotFoundException( - qualifiedPath + ": No such file or directory.")); + final FileStatus fileStatus = getAbfsStore().getFileStatus(qualifiedPath, + tracingContext, useBlobEndpoint); + final String filePathStr = qualifiedPath.toUri().getPath(); + if (getAbfsStore().getPrefixMode() == PrefixMode.BLOB + && fileStatus != null && fileStatus.isDirectory() + && getAbfsStore().isAtomicRenameKey(filePathStr)) { + FileStatus renamePendingJsonFileStatus; + try { + renamePendingJsonFileStatus = 
getAbfsStore().getPathProperty( + makeQualified( + new Path(filePathStr + SUFFIX)), + tracingContext, true); + } catch (AbfsRestOperationException ex) { + if(ex.getStatusCode() == HttpURLConnection.HTTP_NOT_FOUND) { + renamePendingJsonFileStatus = null; + } else { + throw ex; + } + } + + if (renamePendingJsonFileStatus != null) { + RenameAtomicityUtils renameAtomicityUtils + = getRenameAtomicityUtilsForRedo( + renamePendingJsonFileStatus.getPath(), + tracingContext, + ((AzureBlobFileSystemStore.VersionedFileStatus) fileStatus).getEtag(), + getRenamePendingJsonInputStream(renamePendingJsonFileStatus, tracingContext)); + renameAtomicityUtils.cleanup(renamePendingJsonFileStatus.getPath()); + if (renameAtomicityUtils.isRedone()) { + throw new AbfsRestOperationException( + HttpURLConnection.HTTP_NOT_FOUND, + AzureServiceErrorCode.PATH_NOT_FOUND.getErrorCode(), null, + new FileNotFoundException( + qualifiedPath + ": No such file or directory.")); + } + } } return fileStatus; } catch (AzureBlobFileSystemException ex) { @@ -1088,6 +1144,16 @@ && getAbfsStore().getRenamePendingFileStatusInDirectory(fileStatus, } } + private AbfsInputStream getRenamePendingJsonInputStream(final FileStatus renamePendingJsonFileStatus, + final TracingContext tracingContext) + throws IOException { + Path qualifiedPath = makeQualified(renamePendingJsonFileStatus.getPath()); + return getAbfsStore().openFileForRead(qualifiedPath, + Optional.of( + new OpenFileParameters().withStatus(renamePendingJsonFileStatus)), + statistics, tracingContext); + } + /** * Break the current lease on an ABFS file if it exists. A lease that is broken cannot be * renewed. A new lease may be obtained on the file immediately. 
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystemStore.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystemStore.java index 347745fa0c8f8..a2757cdb5517d 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystemStore.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystemStore.java @@ -58,6 +58,7 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; +import org.apache.commons.lang3.tuple.Pair; import org.apache.hadoop.classification.VisibleForTesting; import org.apache.hadoop.fs.azurebfs.contracts.exceptions.InvalidConfigurationValueException; @@ -138,6 +139,7 @@ import org.apache.hadoop.fs.azurebfs.utils.DateTimeUtils; import org.apache.hadoop.fs.azurebfs.utils.TracingContext; import org.apache.hadoop.fs.azurebfs.utils.UriUtils; +import org.apache.hadoop.fs.impl.OpenFileParameters; import org.apache.hadoop.fs.permission.AclEntry; import org.apache.hadoop.fs.permission.AclStatus; import org.apache.hadoop.fs.permission.FsAction; @@ -167,6 +169,8 @@ import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.COPY_STATUS_FAILED; import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.COPY_STATUS_SUCCESS; import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.EMPTY_STRING; +import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.DIRECTORY; +import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.FILE; import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.ROOT_PATH; import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.SINGLE_WHITE_SPACE; import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.TOKEN_VERSION; @@ -681,6 +685,7 @@ BlobProperty getBlobProperty(Path blobPath, 
blobProperty.setCopyStatus(opResult.getResponseHeader(X_MS_COPY_STATUS)); blobProperty.setContentLength( Long.parseLong(opResult.getResponseHeader(CONTENT_LENGTH))); + blobProperty.setETag(extractEtagHeader(opResult)); return blobProperty; } @@ -1027,7 +1032,7 @@ private AbfsRestOperation createFileOrMarker(boolean isNormalBlob, String relati } // Fallback plan : default to v1 create flow which will hit dfs endpoint. Config to enable: "fs.azure.ingress.fallback.to.dfs". - public OutputStream createFile(final Path path, final FileSystem.Statistics statistics, final boolean overwrite, + public AbfsOutputStream createFile(final Path path, final FileSystem.Statistics statistics, final boolean overwrite, final FsPermission permission, final FsPermission umask, TracingContext tracingContext, HashMap metadata) throws IOException { try (AbfsPerfInfo perfInfo = startTracking("createFile", "createPath")) { @@ -1222,7 +1227,7 @@ private AbfsOutputStreamContext populateAbfsOutputStreamContext( .build(); } - public void createDirectory(final Path path, final FileSystem.Statistics statistics, final FsPermission permission, + public String createDirectory(final Path path, final FileSystem.Statistics statistics, final FsPermission permission, final FsPermission umask, final Boolean checkParentChain, TracingContext tracingContext) @@ -1237,13 +1242,14 @@ public void createDirectory(final Path path, final FileSystem.Statistics statist } boolean blobOverwrite = abfsConfiguration.isEnabledBlobMkdirOverwrite(); - createDirectoryMarkerBlob(path, statistics, permission, umask, tracingContext, + AbfsOutputStream pathDirectoryOutputStream = createDirectoryMarkerBlob( + path, statistics, permission, umask, tracingContext, blobOverwrite); for (Path pathToCreate: keysToCreateAsFolder) { createDirectoryMarkerBlob(pathToCreate, statistics, permission, umask, tracingContext, blobOverwrite); } - return; + return pathDirectoryOutputStream.getETag(); } boolean isNamespaceEnabled = 
getIsNamespaceEnabled(tracingContext); LOG.debug("Mkdir created via dfs endpoint for the given path {} and config value {} ", @@ -1263,10 +1269,11 @@ public void createDirectory(final Path path, final FileSystem.Statistics statist isNamespaceEnabled ? getOctalNotation(umask) : null, false, null, tracingContext); perfInfo.registerResult(op.getResult()).registerSuccess(true); + return op.getResult().getResponseHeader(HttpHeaderConfigurations.ETAG); } } - private void createDirectoryMarkerBlob(final Path path, + private AbfsOutputStream createDirectoryMarkerBlob(final Path path, final FileSystem.Statistics statistics, final FsPermission permission, final FsPermission umask, @@ -1274,7 +1281,7 @@ private void createDirectoryMarkerBlob(final Path path, final boolean blobOverwrite) throws IOException { HashMap metadata = new HashMap<>(); metadata.put(X_MS_META_HDI_ISFOLDER, TRUE); - createFile(path, statistics, blobOverwrite, + return createFile(path, statistics, blobOverwrite, permission, umask, tracingContext, metadata); } @@ -1333,27 +1340,45 @@ private boolean checkPathIsDirectory(Path path, TracingContext tracingContext) t public AbfsInputStream openFileForRead(final Path path, final FileSystem.Statistics statistics, TracingContext tracingContext) throws IOException { - return openFileForRead(path, Optional.empty(), statistics, tracingContext); + return openFileForRead(path, Optional.empty(), statistics, + tracingContext); } - public AbfsInputStream openFileForRead(final Path path, - final Optional options, + public AbfsInputStream openFileForRead(Path path, + final Optional parameters, final FileSystem.Statistics statistics, TracingContext tracingContext) throws IOException { - try (AbfsPerfInfo perfInfo = startTracking("openFileForRead", "getPathStatus")) { + try (AbfsPerfInfo perfInfo = startTracking("openFileForRead", + "getPathStatus")) { LOG.debug("openFileForRead filesystem: {} path: {}", - client.getFileSystem(), - path); + client.getFileSystem(), path); + 
FileStatus parameterFileStatus = parameters.map(OpenFileParameters::getStatus) + .orElse(null); String relativePath = getRelativePath(path); - boolean useBlobEndpoint = getPrefixMode() == PrefixMode.BLOB; - if (OperativeEndpoint.isReadEnabledOnDFS(getAbfsConfiguration())) { - LOG.debug("GetFileStatus over DFS for open file for read for read config value {} for path {} ", - abfsConfiguration.shouldReadFallbackToDfs(), path); - useBlobEndpoint = false; + + final VersionedFileStatus fileStatus; + if (parameterFileStatus instanceof VersionedFileStatus) { + Preconditions.checkArgument(parameterFileStatus.getPath() + .equals(path.makeQualified(this.uri, path)), + String.format( + "Filestatus path [%s] does not match with given path [%s]", + parameterFileStatus.getPath(), path)); + fileStatus = (VersionedFileStatus) parameterFileStatus; + } else { + if (parameterFileStatus != null) { + LOG.warn( + "Fallback to getPathStatus REST call as provided filestatus " + + "is not of type VersionedFileStatus"); + } + boolean useBlobEndpoint = getPrefixMode() == PrefixMode.BLOB; + if (OperativeEndpoint.isReadEnabledOnDFS(getAbfsConfiguration())) { + LOG.debug("GetFileStatus over DFS for open file for read for read config value {} for path {} ", + abfsConfiguration.shouldReadFallbackToDfs(), path); + useBlobEndpoint = false; + } + fileStatus = (VersionedFileStatus) getFileStatus(path, tracingContext, useBlobEndpoint); } - VersionedFileStatus fileStatus; - fileStatus = (VersionedFileStatus) getFileStatus(path, tracingContext, useBlobEndpoint); boolean isDirectory = fileStatus.isDirectory(); @@ -1372,10 +1397,10 @@ public AbfsInputStream openFileForRead(final Path path, perfInfo.registerSuccess(true); // Add statistics for InputStream - return new AbfsInputStream(client, statistics, - relativePath, contentLength, - populateAbfsInputStreamContext(options), - eTag, tracingContext); + return new AbfsInputStream(client, statistics, relativePath, + contentLength, 
populateAbfsInputStreamContext( + parameters.map(OpenFileParameters::getOptions)), + eTag, tracingContext); } } @@ -1613,6 +1638,7 @@ private void orchestrateBlobRenameDir(final Path source, blobPropOnSrcNullable = null; } + final String srcDirETag; if (blobPropOnSrcNullable == null) { /* * There is no marker-blob, the client has to create marker blob before @@ -1620,13 +1646,14 @@ private void orchestrateBlobRenameDir(final Path source, */ LOG.debug("Source {} is a directory but there is no marker-blob", source); - createDirectory(source, null, FsPermission.getDirDefault(), + srcDirETag = createDirectory(source, null, FsPermission.getDirDefault(), FsPermission.getUMask( getAbfsConfiguration().getRawConfiguration()), true, tracingContext); } else { LOG.debug("Source {} is a directory but there is a marker-blob", source); + srcDirETag = blobPropOnSrcNullable.getETag(); } /* * If source is a directory, all the blobs in the directory have to be @@ -1642,7 +1669,7 @@ private void orchestrateBlobRenameDir(final Path source, BLOB_LEASE_ONE_MINUTE_DURATION, tracingContext); renameAtomicityUtils.preRename( - isCreateOperationOnBlobEndpoint()); + isCreateOperationOnBlobEndpoint(), srcDirETag); isAtomicRename = true; } else { srcDirLease = null; @@ -2831,8 +2858,21 @@ public void redo(final Path destination, final Path src) } String listSrc = listSrcBuilder.toString(); getListBlobProducer(listSrc, listBlobQueue, null, tracingContext); - AbfsBlobLease abfsBlobLease = getBlobLease(src.toUri().getPath(), - BLOB_LEASE_ONE_MINUTE_DURATION, tracingContext); + final AbfsBlobLease abfsBlobLease; + try { + abfsBlobLease = getBlobLease(src.toUri().getPath(), + BLOB_LEASE_ONE_MINUTE_DURATION, tracingContext); + } catch (AbfsRestOperationException ex) { + /* + * The required blob might be deleted in between the last check (from + * GetFileStatus or ListStatus) and the leaseAcquire. Hence, catching + * HTTP_NOT_FOUND error. 
+ */ + if (ex.getStatusCode() == HTTP_NOT_FOUND) { + return; + } + throw ex; + } renameBlobDir(src, destination, tracingContext, listBlobQueue, abfsBlobLease, true); } @@ -3054,43 +3094,28 @@ private AbfsPerfInfo startTracking(String callerName, String calleeName) { } /** - * Search for a FileStatus corresponding to a RenamePending JSON file. - * @param fileStatuses array of fileStatus from which JSON file has to be searched. - * @return filestatus corresponding to RenamePending JSON file. + * Returns a pair of fileStatuses for the renamePendingJSON file and the renameSource file. + * @param fileStatuses array of fileStatus from which pair has to be searched. + * @return Pair of FileStatus. Left of the pair is fileStatus of renamePendingJson file. + * Right of the pair is fileStatus of renameSource file. */ - public FileStatus getRenamePendingFileStatus(final FileStatus[] fileStatuses) { + public Pair getRenamePendingFileStatus(final FileStatus[] fileStatuses) { + Map fileStatusMap = new HashMap<>(); + FileStatus renamePendingJsonFileStatus = null; + String requiredRenameSrcPath = null; for (FileStatus fileStatus : fileStatuses) { - if (fileStatus.getPath().toUri().getPath().endsWith(SUFFIX)) { - return fileStatus; + String path = fileStatus.getPath().toUri().getPath(); + if (path.equals(requiredRenameSrcPath)) { + return Pair.of(renamePendingJsonFileStatus, fileStatus); } - } - return null; - } - - /** - * For a given directory, returns back the fileStatus information for the - * RenamePending JSON file for the directory. - * - * @param fileStatus FileStatus object of the directory for which JSON file has - * to be searched. - * @param tracingContext TracingContext object for tracing the backend server calls - * for the operation. - * @throws IOException exception thrown from the call to {@link #getPathStatus(Path, TracingContext)} - * method. 
- */ - public boolean getRenamePendingFileStatusInDirectory(final FileStatus fileStatus, - final TracingContext tracingContext) throws IOException { - try { - getFileStatus( - new Path(fileStatus.getPath().toUri().getPath() + SUFFIX), - tracingContext, true); - return true; - } catch (AbfsRestOperationException ex) { - if (ex.getStatusCode() == HttpURLConnection.HTTP_NOT_FOUND) { - return false; + fileStatusMap.put(path, fileStatus); + if (path.endsWith(SUFFIX)) { + renamePendingJsonFileStatus = fileStatus; + requiredRenameSrcPath = path.split(SUFFIX)[0]; } - throw ex; } + return Pair.of(renamePendingJsonFileStatus, + fileStatusMap.get(requiredRenameSrcPath)); } /** diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsInputStream.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsInputStream.java index 14188535b8418..e17db10c91cd1 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsInputStream.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsInputStream.java @@ -148,7 +148,16 @@ public AbfsInputStream( this.streamStatistics = abfsInputStreamContext.getStreamStatistics(); this.inputStreamId = createInputStreamId(); this.tracingContext = new TracingContext(tracingContext); - this.tracingContext.setOperation(FSOperationType.READ); + /* + * If this inputStream is getting opened in listStatus or GetFileStatus, it would + * be in the flow of rename-resume. It is required that all operations have the + * same primaryId and opType as that of the listStatus or getFileStatus which + * is invoking the rename-resume. 
+ */ + if (tracingContext.getOpType() != FSOperationType.LISTSTATUS + && this.tracingContext.getOpType() != FSOperationType.GET_FILESTATUS) { + this.tracingContext.setOperation(FSOperationType.READ); + } this.tracingContext.setStreamID(inputStreamId); this.context = abfsInputStreamContext; readAheadBlockSize = abfsInputStreamContext.getReadAheadBlockSize(); diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/RenameAtomicityUtils.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/RenameAtomicityUtils.java index 9f7b18a375560..7490d3c090a4b 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/RenameAtomicityUtils.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/RenameAtomicityUtils.java @@ -39,7 +39,6 @@ import org.slf4j.LoggerFactory; import org.apache.commons.lang3.StringUtils; -import org.apache.hadoop.fs.FSDataInputStream; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.azurebfs.AzureBlobFileSystem; @@ -50,7 +49,7 @@ /** * For a directory enabled for atomic-rename, before rename starts, a * file with -RenamePending.json suffix is created. In this file, the states required - * for the rename are given. This file is created by {@link #preRename(Boolean)} ()} method. + * for the rename are given. This file is created by the {@link #preRename(Boolean, String)} method. * This is important in case the JVM process crashes during rename, the atomicity * will be maintained, when the job calls {@link AzureBlobFileSystem#listStatus(Path)} * or {@link AzureBlobFileSystem#getFileStatus(Path)}. 
On these API calls to filesystem, @@ -66,6 +65,7 @@ public class RenameAtomicityUtils { private Path srcPath; private Path dstPath; private TracingContext tracingContext; + private Boolean isReDone; private static final int MAX_RENAME_PENDING_FILE_SIZE = 10000000; private static final int FORMATTING_BUFFER = 10000; @@ -87,22 +87,30 @@ public RenameAtomicityUtils(final AzureBlobFileSystem azureBlobFileSystem, } public RenameAtomicityUtils(final AzureBlobFileSystem azureBlobFileSystem, - final Path path, final RedoRenameInvocation redoRenameInvocation) + final Path renamePendingJsonPath, + final RedoRenameInvocation redoRenameInvocation, + final String srcEtag, + final AbfsInputStream renamePendingJsonInputStream) throws IOException { this.azureBlobFileSystem = azureBlobFileSystem; - final RenamePendingFileInfo renamePendingFileInfo = readFile(path); - if (renamePendingFileInfo != null) { + final RenamePendingFileInfo renamePendingFileInfo = readFile( + renamePendingJsonPath, renamePendingJsonInputStream); + if (renamePendingFileInfo != null + && renamePendingFileInfo.eTag.equalsIgnoreCase(srcEtag)) { redoRenameInvocation.redo(renamePendingFileInfo.destination, renamePendingFileInfo.src); + isReDone = true; + } else { + isReDone = false; } } - private RenamePendingFileInfo readFile(final Path redoFile) + private RenamePendingFileInfo readFile(final Path redoFile, + final AbfsInputStream redoFileInputStream) throws IOException { Path f = redoFile; - FSDataInputStream input = azureBlobFileSystem.open(f); byte[] bytes = new byte[MAX_RENAME_PENDING_FILE_SIZE]; - int l = input.read(bytes); + int l = redoFileInputStream.read(bytes); if (l <= 0) { // Jira HADOOP-12678 -Handle empty rename pending metadata file during // atomic rename in redo path. 
If during renamepending file is created @@ -148,13 +156,17 @@ private RenamePendingFileInfo readFile(final Path redoFile) // initialize this object's fields JsonNode oldFolderName = json.get("OldFolderName"); JsonNode newFolderName = json.get("NewFolderName"); + JsonNode eTag = json.get("ETag"); + if (oldFolderName != null && StringUtils.isNotEmpty( oldFolderName.textValue()) && newFolderName != null && StringUtils.isNotEmpty( - newFolderName.textValue())) { + newFolderName.textValue()) && eTag != null && StringUtils.isNotEmpty( + eTag.textValue())) { RenamePendingFileInfo renamePendingFileInfo = new RenamePendingFileInfo(); renamePendingFileInfo.destination = new Path(newFolderName.textValue()); renamePendingFileInfo.src = new Path(oldFolderName.textValue()); + renamePendingFileInfo.eTag = eTag.textValue(); return renamePendingFileInfo; } return null; @@ -186,6 +198,7 @@ private void deleteRenamePendingFile(FileSystem fs, Path redoFile) * OperationTime: "", * OldFolderName: "", * NewFolderName: "" + * ETag: "" * } * * Here's a sample: @@ -194,15 +207,17 @@ private void deleteRenamePendingFile(FileSystem fs, Path redoFile) * OperationUTCTime: "2014-07-01 23:50:35.572", * OldFolderName: "user/ehans/folderToRename", * NewFolderName: "user/ehans/renamedFolder" + * ETag: "ETag" * } } * @throws IOException Thrown when fail to write file. */ - public void preRename(final Boolean isCreateOperationOnBlobEndpoint) throws IOException { + public void preRename(final Boolean isCreateOperationOnBlobEndpoint, + final String eTag) throws IOException { Path path = getRenamePendingFilePath(); LOG.debug("Preparing to write atomic rename state to {}", path.toString()); OutputStream output = null; - String contents = makeRenamePendingFileContents(); + String contents = makeRenamePendingFileContents(eTag); // Write file. try { @@ -261,10 +276,13 @@ private Throwable getWrappedException(final IOException e) { * * @return JSON string which represents the operation. 
*/ - private String makeRenamePendingFileContents() { + private String makeRenamePendingFileContents(String eTag) { SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS"); sdf.setTimeZone(TimeZone.getTimeZone("UTC")); String time = sdf.format(new Date()); + if(!eTag.startsWith("\"") && !eTag.endsWith("\"")) { + eTag = quote(eTag); + } // Make file contents as a string. Again, quote file names, escaping // characters as appropriate. @@ -272,7 +290,8 @@ private String makeRenamePendingFileContents() { + " FormatVersion: \"1.0\",\n" + " OperationUTCTime: \"" + time + "\",\n" + " OldFolderName: " + quote(srcPath.toUri().getPath()) + ",\n" - + " NewFolderName: " + quote(dstPath.toUri().getPath()) + "\n" + + " NewFolderName: " + quote(dstPath.toUri().getPath()) + ",\n" + + " ETag: " + eTag + "\n" + "}\n"; return contents; @@ -366,10 +385,15 @@ private Path getRenamePendingFilePath() { private static class RenamePendingFileInfo { public Path destination; public Path src; + public String eTag; } public static interface RedoRenameInvocation { void redo(Path destination, Path src) throws AzureBlobFileSystemException; } + + public Boolean isRedone() { + return isReDone; + } } diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/RenameNonAtomicUtils.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/RenameNonAtomicUtils.java index 786c22a9c4cfe..3a106d46fb67c 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/RenameNonAtomicUtils.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/RenameNonAtomicUtils.java @@ -34,7 +34,8 @@ public RenameNonAtomicUtils(final AzureBlobFileSystem azureBlobFileSystem, } @Override - public void preRename(final Boolean isCreateOperationOnBlobEndpoint) + public void preRename(final Boolean isCreateOperationOnBlobEndpoint, + final String eTag) throws IOException { } @@ -43,4 +44,9 @@ 
public void preRename(final Boolean isCreateOperationOnBlobEndpoint) public void cleanup() throws IOException { } + + @Override + public Boolean isRedone() { + return true; + } } diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/AbstractAbfsIntegrationTest.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/AbstractAbfsIntegrationTest.java index cca2a94add2bc..3a44121d1ea69 100644 --- a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/AbstractAbfsIntegrationTest.java +++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/AbstractAbfsIntegrationTest.java @@ -41,6 +41,7 @@ import org.apache.hadoop.fs.azurebfs.constants.FSOperationType; import org.apache.hadoop.fs.azurebfs.contracts.exceptions.AzureBlobFileSystemException; import org.apache.hadoop.fs.azurebfs.security.AbfsDelegationTokenManager; +import org.apache.hadoop.fs.azurebfs.services.AbfsClient; import org.apache.hadoop.fs.azurebfs.services.AbfsOutputStream; import org.apache.hadoop.fs.azurebfs.services.AuthType; import org.apache.hadoop.fs.azurebfs.services.ITestAbfsClient; @@ -449,7 +450,13 @@ public AbfsClient getClient(final AzureBlobFileSystem fs) { } public boolean isNamespaceEnabled(final AzureBlobFileSystem fs) throws AzureBlobFileSystemException { - return fs.getAbfsStore().getIsNamespaceEnabled(getTestTracingContext(fs, true)); + return fs.getAbfsStore() + .getIsNamespaceEnabled(getTestTracingContext(fs, true)); + } + + public void setAbfsClient(AzureBlobFileSystemStore abfsStore, + AbfsClient client) { + abfsStore.setClient(client); } public Path makeQualified(Path path) throws java.io.IOException { diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemFileStatus.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemFileStatus.java index dd114aa3131a2..83842319a833b 100644 --- 
a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemFileStatus.java +++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemFileStatus.java @@ -315,12 +315,22 @@ public void testGetPathPropertyCalled() throws Exception { Assert.assertFalse(fileStatus.isDirectory()); - Mockito.verify(store, times(1)).getPathProperty(Mockito.any(Path.class), + Mockito.verify(store, times(2)).getPathProperty(Mockito.any(Path.class), Mockito.any(TracingContext.class), Mockito.any(Boolean.class)); - Mockito.verify(store, times(0)).getListBlobs(Mockito.any(Path.class), - Mockito.nullable(String.class), Mockito.nullable(String.class), - Mockito.any(TracingContext.class), Mockito.any(Integer.class), - Mockito.any(Boolean.class)); + final AbfsConfiguration configuration= fs.getAbfsStore().getAbfsConfiguration(); + final int listBlobAssertionTimes; + if (!configuration.shouldMkdirFallbackToDfs() + && !configuration.shouldReadFallbackToDfs() + && !configuration.shouldIngressFallbackToDfs()) { + listBlobAssertionTimes = 1; + } else{ + listBlobAssertionTimes = 0; + } + + Mockito.verify(store, times(listBlobAssertionTimes)).getListBlobs(Mockito.any(Path.class), + Mockito.nullable(String.class), Mockito.nullable(String.class), + Mockito.any(TracingContext.class), Mockito.any(Integer.class), + Mockito.any(Boolean.class)); } @Test @@ -335,9 +345,19 @@ public void testGetPathPropertyCalledImplicit() throws Exception { Assert.assertTrue(fileStatus.isDirectory()); + final AbfsConfiguration configuration= fs.getAbfsStore().getAbfsConfiguration(); + final int listBlobAssertionTimes; + if (!configuration.shouldMkdirFallbackToDfs() + && !configuration.shouldReadFallbackToDfs() + && !configuration.shouldIngressFallbackToDfs()) { + listBlobAssertionTimes = 1; + } else{ + listBlobAssertionTimes = 0; + } + Mockito.verify(store, times(1)).getPathProperty(Mockito.any(Path.class), Mockito.any(TracingContext.class), 
Mockito.any(Boolean.class)); - Mockito.verify(store, times(1)).getListBlobs(Mockito.any(Path.class), + Mockito.verify(store, times(listBlobAssertionTimes)).getListBlobs(Mockito.any(Path.class), Mockito.nullable(String.class), Mockito.nullable(String.class), Mockito.any(TracingContext.class), Mockito.any(Integer.class), Mockito.any(Boolean.class)); diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemListStatus.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemListStatus.java index 9d8f0e7fd2c7b..cfdbb8a0f27fa 100644 --- a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemListStatus.java +++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemListStatus.java @@ -41,6 +41,8 @@ import org.apache.hadoop.fs.LocatedFileStatus; import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.azurebfs.constants.FSOperationType; +import org.apache.hadoop.fs.azurebfs.services.PrefixMode; +import org.apache.hadoop.fs.azurebfs.utils.TracingContext; import org.apache.hadoop.fs.azurebfs.utils.TracingHeaderValidator; import org.apache.hadoop.fs.contract.ContractTestUtils; import org.mockito.Mock; @@ -479,4 +481,51 @@ private void assertDirectoryFileStatus(final FileStatus fileStatus, Assertions.assertThat(fileStatus.isFile()).isEqualTo(false); Assertions.assertThat(fileStatus.getLen()).isEqualTo(0); } + + @Test + public void testListStatusNotTriesToRenameResumeForNonAtomicDir() + throws Exception { + Assume.assumeTrue(getPrefixMode(getFileSystem()) == PrefixMode.BLOB); + AzureBlobFileSystem fs = Mockito.spy(getFileSystem()); + AzureBlobFileSystemStore store = Mockito.spy(fs.getAbfsStore()); + Mockito.doReturn(store).when(fs).getAbfsStore(); + Mockito.doReturn(new FileStatus[1]) + .when(store) + .listStatus(Mockito.any(Path.class), Mockito.any( + TracingContext.class)); + fs.listStatus(new 
Path("/testDir/")); + Mockito.verify(store, Mockito.times(0)) + .getRenamePendingFileStatus(Mockito.any(FileStatus[].class)); + } + + @Test + public void testListStatusTriesToRenameResumeForAtomicDir() throws Exception { + Assume.assumeTrue(getPrefixMode(getFileSystem()) == PrefixMode.BLOB); + AzureBlobFileSystem fs = Mockito.spy(getFileSystem()); + AzureBlobFileSystemStore store = Mockito.spy(fs.getAbfsStore()); + Mockito.doReturn(store).when(fs).getAbfsStore(); + Mockito.doReturn(new FileStatus[0]) + .when(store) + .listStatus(Mockito.any(Path.class), Mockito.any( + TracingContext.class)); + fs.listStatus(new Path("/hbase/")); + Mockito.verify(store, Mockito.times(1)) + .getRenamePendingFileStatus(Mockito.any(FileStatus[].class)); + } + + @Test + public void testListStatusTriesToRenameResumeForAbsoluteAtomicDir() + throws Exception { + Assume.assumeTrue(getPrefixMode(getFileSystem()) == PrefixMode.BLOB); + AzureBlobFileSystem fs = Mockito.spy(getFileSystem()); + AzureBlobFileSystemStore store = Mockito.spy(fs.getAbfsStore()); + Mockito.doReturn(store).when(fs).getAbfsStore(); + Mockito.doReturn(new FileStatus[0]) + .when(store) + .listStatus(Mockito.any(Path.class), Mockito.any( + TracingContext.class)); + fs.listStatus(new Path("/hbase")); + Mockito.verify(store, Mockito.times(1)) + .getRenamePendingFileStatus(Mockito.any(FileStatus[].class)); + } } diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemRename.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemRename.java index 5e60a362b43b4..a9a1da911f99a 100644 --- a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemRename.java +++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemRename.java @@ -58,6 +58,7 @@ import org.apache.hadoop.fs.azurebfs.services.AbfsClientTestUtil; import 
org.apache.hadoop.fs.azurebfs.services.AbfsHttpOpTestUtil; import org.apache.hadoop.fs.azurebfs.services.AbfsHttpOperation; +import org.apache.hadoop.fs.azurebfs.services.AbfsInputStream; import org.apache.hadoop.fs.azurebfs.services.AbfsLease; import org.apache.hadoop.fs.azurebfs.services.AbfsRestOperation; import org.apache.hadoop.fs.azurebfs.services.AbfsRestOperationTestUtil; @@ -67,9 +68,10 @@ import org.apache.hadoop.fs.azurebfs.utils.TracingContext; import org.apache.hadoop.fs.azurebfs.services.PrefixMode; import org.apache.hadoop.fs.azurebfs.utils.TracingHeaderValidator; +import org.apache.hadoop.fs.impl.OpenFileParameters; import static org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.FS_AZURE_LEASE_CREATE_NON_RECURSIVE; -import static org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.FS_AZURE_PRODUCER_QUEUE_MAX_SIZE; +import static org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.FS_AZURE_MKDIRS_FALLBACK_TO_DFS; import static org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.FS_AZURE_REDIRECT_RENAME; import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.TRUE; import static org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.FS_AZURE_INGRESS_FALLBACK_TO_DFS; @@ -434,16 +436,11 @@ public void testHBaseHandlingForFailedRename() throws Exception { //call listPath API, it will recover the rename atomicity. 
final AzureBlobFileSystem spiedFsForListPath = Mockito.spy(fs); - final int[] openRequiredFile = new int[1]; - openRequiredFile[0] = 0; - Mockito.doAnswer(answer -> { - final Path path = answer.getArgument(0); - if (("/" + "hbase/test1/test2/test3" + SUFFIX).equalsIgnoreCase( - path.toUri().getPath())) { - openRequiredFile[0] = 1; - } - return fs.open(path); - }).when(spiedFsForListPath).open(Mockito.any(Path.class)); + final AzureBlobFileSystemStore spiedStoreForListPath = Mockito.spy( + spiedFsForListPath.getAbfsStore()); + Mockito.doReturn(spiedStoreForListPath) + .when(spiedFsForListPath) + .getAbfsStore(); /* * Check if the fs.delete is on the renameJson file. @@ -502,7 +499,14 @@ public void testHBaseHandlingForFailedRename() throws Exception { } } spiedFsForListPath.listStatus(new Path("hbase/test1/test2")); - Assert.assertTrue(openRequiredFile[0] == 1); + /* + * The invocation of getPathProperty will happen on the reinvocation of listStatus + * of /hbase/test2/test3 as after the resume, there will be no listing for the path + * and hence, getFileStatus would be required. + */ + Mockito.verify(spiedStoreForListPath, Mockito.times(1)) + .getPathProperty(Mockito.any(Path.class), + Mockito.any(TracingContext.class), Mockito.anyBoolean()); Assert.assertTrue(deletedCount.get() == 3); Assert.assertFalse(spiedFsForListPath.exists(new Path(failedCopyPath))); Assert.assertTrue(spiedFsForListPath.exists(new Path( @@ -573,16 +577,11 @@ public void testHBaseHandlingForFailedRenameForNestedSourceThroughListFile() //call listPath API, it will recover the rename atomicity. 
final AzureBlobFileSystem spiedFsForListPath = Mockito.spy(fs); - final int[] openRequiredFile = new int[1]; - openRequiredFile[0] = 0; - Mockito.doAnswer(answer -> { - final Path path = answer.getArgument(0); - if (("/" + "hbase/test1/test2" + SUFFIX).equalsIgnoreCase( - path.toUri().getPath())) { - openRequiredFile[0] = 1; - } - return fs.open(path); - }).when(spiedFsForListPath).open(Mockito.any(Path.class)); + final AzureBlobFileSystemStore spiedStoreForListPath = Mockito.spy( + spiedFsForListPath.getAbfsStore()); + Mockito.doReturn(spiedStoreForListPath) + .when(spiedFsForListPath) + .getAbfsStore(); /* * Check if the fs.delete is on the renameJson file. @@ -645,7 +644,14 @@ public void testHBaseHandlingForFailedRenameForNestedSourceThroughListFile() } final FileStatus[] listFileResult = spiedFsForListPath.listStatus( new Path("hbase/test1")); - Assert.assertTrue(openRequiredFile[0] == 1); + /* + * The invocation of getPathProperty will happen on the reinvocation of listStatus + * of /hbase/test2/test3 as after the resume, there will be no listing for the path + * and hence, getFileStatus would be required. + */ + Mockito.verify(spiedStoreForListPath, Mockito.times(1)) + .getPathProperty(Mockito.any(Path.class), + Mockito.any(TracingContext.class), Mockito.anyBoolean()); Assert.assertTrue(deletedCount.get() == 3); Assert.assertFalse(spiedFsForListPath.exists(new Path(failedCopyPath))); Assert.assertTrue(spiedFsForListPath.exists(new Path( @@ -718,16 +724,11 @@ public void testHBaseHandlingForFailedRenameForNestedSourceThroughGetPathStatus( //call listPath API, it will recover the rename atomicity. 
final AzureBlobFileSystem spiedFsForListPath = Mockito.spy(fs); - final int[] openRequiredFile = new int[1]; - openRequiredFile[0] = 0; - Mockito.doAnswer(answer -> { - final Path path = answer.getArgument(0); - if (("/" + "hbase/test1/test2" + SUFFIX).equalsIgnoreCase( - path.toUri().getPath())) { - openRequiredFile[0] = 1; - } - return fs.open(path); - }).when(spiedFsForListPath).open(Mockito.any(Path.class)); + final AzureBlobFileSystemStore spiedStoreForListPath = Mockito.spy( + spiedFsForListPath.getAbfsStore()); + Mockito.doReturn(spiedStoreForListPath) + .when(spiedFsForListPath) + .getAbfsStore(); /* * Check if the fs.delete is on the renameJson file. @@ -798,7 +799,13 @@ public void testHBaseHandlingForFailedRenameForNestedSourceThroughGetPathStatus( } Assert.assertTrue(notFoundExceptionReceived); Assert.assertNull(fileStatus); - Assert.assertTrue(openRequiredFile[0] == 1); + /* + * GetPathProperty on store would be called to get FileStatus for srcDirectory + * and the corresponding renamePendingJson file. 
+ */ + Mockito.verify(spiedStoreForListPath, Mockito.times(2)) + .getPathProperty(Mockito.any(Path.class), + Mockito.any(TracingContext.class), Mockito.anyBoolean()); Assert.assertTrue(deletedCount.get() == 3); Assert.assertFalse(spiedFsForListPath.exists(new Path(failedCopyPath))); Assert.assertTrue(spiedFsForListPath.exists(new Path( @@ -1013,7 +1020,7 @@ public void testInvalidJsonForRenamePendingFile() throws Exception { } @Test - public void testEmptyDirRenameResolveFromListStatus() throws Exception { + public void testEmptyDirRenameResolveFromGetFileStatus() throws Exception { final AzureBlobFileSystem fs = this.getFileSystem(); assumeNonHnsAccountBlobEndpoint(fs); String srcDir = "/hbase/test1/test2/test3"; @@ -1063,15 +1070,11 @@ public void testEmptyDirRenameResolveFromListStatus() throws Exception { } final AzureBlobFileSystem spiedFsForListPath = Mockito.spy(fs); - final int[] openRequiredFile = new int[1]; - openRequiredFile[0] = 0; - Mockito.doAnswer(answer -> { - final Path path = answer.getArgument(0); - if ((srcDir + SUFFIX).equalsIgnoreCase(path.toUri().getPath())) { - openRequiredFile[0] = 1; - } - return fs.open(path); - }).when(spiedFsForListPath).open(Mockito.any(Path.class)); + final AzureBlobFileSystemStore spiedStoreForListPath = Mockito.spy( + spiedFsForListPath.getAbfsStore()); + Mockito.doReturn(spiedStoreForListPath) + .when(spiedFsForListPath) + .getAbfsStore(); /* * Check if the fs.delete is on the renameJson file. @@ -1133,7 +1136,13 @@ public void testEmptyDirRenameResolveFromListStatus() throws Exception { } Assert.assertTrue(notFoundExceptionReceived); Assert.assertNull(fileStatus); - Assert.assertTrue(openRequiredFile[0] == 1); + /* + * GetFileStatus on store would be called to get FileStatus for srcDirectory + * and the corresponding renamePendingJson file. 
+ */ + Mockito.verify(spiedStoreForListPath, Mockito.times(2)) + .getPathProperty(Mockito.any(Path.class), + Mockito.any(TracingContext.class), Mockito.anyBoolean()); Assert.assertTrue(deletedCount.get() == 3); Assert.assertFalse(spiedFsForListPath.exists(new Path(srcDir))); Assert.assertTrue(spiedFsForListPath.getFileStatus( @@ -2071,7 +2080,7 @@ public void testBlobRenameResumeWithListStatus() throws Exception { Mockito.doReturn(store).when(fs).getAbfsStore(); store.setClient(client); - renameFailureSetup(fs, client); + renameFailureSetup(fs, client, false); AtomicInteger copied = assertTracingContextOnRenameResumeProcess(fs, store, client, FSOperationType.LISTSTATUS); @@ -2096,7 +2105,7 @@ public void testBlobRenameResumeWithGetFileStatus() throws Exception { Mockito.doReturn(store).when(fs).getAbfsStore(); store.setClient(client); - renameFailureSetup(fs, client); + renameFailureSetup(fs, client, false); AtomicInteger copied = assertTracingContextOnRenameResumeProcess(fs, store, client, FSOperationType.GET_FILESTATUS); @@ -2108,7 +2117,7 @@ public void testBlobRenameResumeWithGetFileStatus() throws Exception { } private void renameFailureSetup(final AzureBlobFileSystem fs, - final AbfsClient client) + final AbfsClient client, final Boolean srcMarkerToBeDeleted) throws Exception { fs.mkdirs(new Path("/hbase/testDir")); ExecutorService executorService = Executors.newFixedThreadPool(5); @@ -2123,6 +2132,11 @@ private void renameFailureSetup(final AzureBlobFileSystem fs, future.get(); } + if (srcMarkerToBeDeleted) { + client.deleteBlobPath(new Path("/hbase/testDir"), null, + Mockito.mock(TracingContext.class)); + } + AbfsRestOperation op = client.acquireBlobLease("/hbase/testDir/file5", -1, Mockito.mock(TracingContext.class)); String leaseId = op.getResult() @@ -2216,7 +2230,8 @@ private AtomicInteger assertTracingContextOnRenameResumeProcess(final AzureBlobF }) .when(fs) .getRenameAtomicityUtilsForRedo(Mockito.any(Path.class), - Mockito.any(TracingContext.class)); 
+ Mockito.any(TracingContext.class), Mockito.anyString(), Mockito.any( + AbfsInputStream.class)); Mockito.doAnswer(answer -> { answer.callRealMethod(); @@ -2312,4 +2327,160 @@ public void testProducerStopOnRenameFailure() throws Exception { Mockito.nullable(String.class), Mockito.nullable(String.class), Mockito.nullable(Integer.class), Mockito.any(TracingContext.class)); } + + @Test + public void testRenameResumeThroughListStatusWithSrcDirDeleted() throws Exception { + assumeNonHnsAccountBlobEndpoint(getFileSystem()); + AzureBlobFileSystem fs = Mockito.spy(getFileSystem()); + AzureBlobFileSystemStore store = Mockito.spy(fs.getAbfsStore()); + AbfsClient client = Mockito.spy(fs.getAbfsClient()); + Mockito.doReturn(store).when(fs).getAbfsStore(); + store.setClient(client); + + renameFailureSetup(fs, client, false); + + fs.delete(new Path("/hbase/testDir"), true); + AtomicInteger copied = assertTracingContextOnRenameResumeProcess(fs, store, + client, FSOperationType.LISTSTATUS); + Assertions.assertThat(fs.listStatus(new Path("/hbase"))).hasSize(1); + Assertions.assertThat(copied.get()).isEqualTo(0); + } + + @Test + public void testRenameResumeThroughListStatusWithSrcDirDeletedJustBeforeResume() throws Exception { + assumeNonHnsAccountBlobEndpoint(getFileSystem()); + AzureBlobFileSystem fs = Mockito.spy(getFileSystem()); + AzureBlobFileSystemStore store = Mockito.spy(fs.getAbfsStore()); + AbfsClient client = Mockito.spy(fs.getAbfsClient()); + Mockito.doReturn(store).when(fs).getAbfsStore(); + store.setClient(client); + + renameFailureSetup(fs, client, false); + + AtomicInteger copied = assertTracingContextOnRenameResumeProcess(fs, store, + client, FSOperationType.LISTSTATUS); + Mockito.doAnswer(answer -> { + String path = answer.getArgument(0); + if("/hbase/testDir".equalsIgnoreCase(path)) { + fs.delete(new Path(path), true); + } + return answer.callRealMethod(); + }).when(client).acquireBlobLease(Mockito.anyString(), Mockito.anyInt(), 
Mockito.any(TracingContext.class)); + Assertions.assertThat(fs.listStatus(new Path("/hbase"))).hasSize(1); + Assertions.assertThat(copied.get()).isEqualTo(0); + } + + @Test + public void testRenameResumeThroughListStatusWhenSrcDirectoryETagIsChanged() + throws Exception { + assumeNonHnsAccountBlobEndpoint(getFileSystem()); + AzureBlobFileSystem fs = Mockito.spy(getFileSystem()); + AzureBlobFileSystemStore store = Mockito.spy(fs.getAbfsStore()); + AbfsClient client = Mockito.spy(fs.getAbfsClient()); + Mockito.doReturn(store).when(fs).getAbfsStore(); + store.setClient(client); + + renameFailureSetup(fs, client, false); + + fs.delete(new Path("/hbase/testDir"), true); + fs.mkdirs(new Path("/hbase/testDir")); + AtomicInteger copied = assertTracingContextOnRenameResumeProcess(fs, store, + client, FSOperationType.LISTSTATUS); + Assertions.assertThat(fs.listStatus(new Path("/hbase"))).hasSize(2); + Assertions.assertThat(copied.get()).isEqualTo(0); + } + + @Test + public void testRenameResumeThroughGetStatusWhenSrcDirectoryETagIsChanged() + throws Exception { + assumeNonHnsAccountBlobEndpoint(getFileSystem()); + AzureBlobFileSystem fs = Mockito.spy(getFileSystem()); + AzureBlobFileSystemStore store = Mockito.spy(fs.getAbfsStore()); + AbfsClient client = Mockito.spy(fs.getAbfsClient()); + Mockito.doReturn(store).when(fs).getAbfsStore(); + store.setClient(client); + + renameFailureSetup(fs, client, false); + + fs.delete(new Path("/hbase/testDir"), true); + fs.mkdirs(new Path("/hbase/testDir")); + AtomicInteger copied = assertTracingContextOnRenameResumeProcess(fs, store, + client, FSOperationType.GET_FILESTATUS); + fs.getFileStatus(new Path("/hbase/testDir")); + Assertions.assertThat(copied.get()).isEqualTo(0); + } + + @Test + public void testRenameResumeThroughGetStatusWhenSrcDirMakerOnRenameCreatedOnDfsEndpoint() + throws Exception { + assumeNonHnsAccountBlobEndpoint(getFileSystem()); + Configuration configuration = Mockito.spy(getRawConfiguration()); + 
configuration.set(FS_AZURE_MKDIRS_FALLBACK_TO_DFS, "true"); + commonTestRenameResumeThroughGetStatusWhenSrcDirMarkerCreatedOnRename( + configuration); + } + + @Test + public void testRenameResumeThroughGetStatusWhenSrcDirMakerOnRenameCreatedOnBlobEndpoint() + throws Exception { + assumeNonHnsAccountBlobEndpoint(getFileSystem()); + Configuration configuration = Mockito.spy(getRawConfiguration()); + configuration.set(FS_AZURE_MKDIRS_FALLBACK_TO_DFS, "false"); + commonTestRenameResumeThroughGetStatusWhenSrcDirMarkerCreatedOnRename( + configuration); + } + + private void commonTestRenameResumeThroughGetStatusWhenSrcDirMarkerCreatedOnRename( + final Configuration configuration) throws Exception { + AzureBlobFileSystem fs = Mockito.spy( + (AzureBlobFileSystem) FileSystem.newInstance( + configuration)); + AzureBlobFileSystemStore store = Mockito.spy(fs.getAbfsStore()); + AbfsClient client = Mockito.spy(fs.getAbfsClient()); + Mockito.doReturn(store).when(fs).getAbfsStore(); + store.setClient(client); + + renameFailureSetup(fs, client, true); + intercept(FileNotFoundException.class, () -> { + fs.getFileStatus(new Path("/hbase/testDir")); + }); + FileStatus[] fileStatuses = fs.listStatus(new Path("/hbase/testDir2")); + Assertions.assertThat(fileStatuses).hasSize(10); + } + + @Test + public void testRenameResumeThroughListStatusWhenSrcDirMakerOnRenameCreatedOnDfsEndpoint() + throws Exception { + assumeNonHnsAccountBlobEndpoint(getFileSystem()); + Configuration configuration = Mockito.spy(getRawConfiguration()); + configuration.set(FS_AZURE_MKDIRS_FALLBACK_TO_DFS, "true"); + commonTestRenameResumeThroughListStatusWhenSrcDirMarkerCreatedOnRename( + configuration); + } + + @Test + public void testRenameResumeThroughListStatusWhenSrcDirMakerOnRenameCreatedOnBlobEndpoint() + throws Exception { + assumeNonHnsAccountBlobEndpoint(getFileSystem()); + Configuration configuration = Mockito.spy(getRawConfiguration()); + configuration.set(FS_AZURE_MKDIRS_FALLBACK_TO_DFS, "false"); + 
commonTestRenameResumeThroughListStatusWhenSrcDirMarkerCreatedOnRename( + configuration); + } + + private void commonTestRenameResumeThroughListStatusWhenSrcDirMarkerCreatedOnRename( + final Configuration configuration) throws Exception { + AzureBlobFileSystem fs = Mockito.spy( + (AzureBlobFileSystem) FileSystem.newInstance( + configuration)); + AzureBlobFileSystemStore store = Mockito.spy(fs.getAbfsStore()); + AbfsClient client = Mockito.spy(fs.getAbfsClient()); + Mockito.doReturn(store).when(fs).getAbfsStore(); + store.setClient(client); + + renameFailureSetup(fs, client, true); + Assertions.assertThat(fs.listStatus(new Path("/hbase/"))).hasSize(1); + FileStatus[] fileStatuses = fs.listStatus(new Path("/hbase/testDir2")); + Assertions.assertThat(fileStatuses).hasSize(10); + } } diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/TestAbfsInputStream.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/TestAbfsInputStream.java index 7d2efa8931f47..adf2b6676ca20 100644 --- a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/TestAbfsInputStream.java +++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/TestAbfsInputStream.java @@ -19,32 +19,40 @@ package org.apache.hadoop.fs.azurebfs.services; import java.io.IOException; - -import org.junit.Assert; -import org.junit.Test; import java.util.Arrays; +import java.util.Optional; +import java.util.Random; +import java.util.concurrent.ExecutionException; import org.assertj.core.api.Assertions; +import org.junit.Assert; +import org.junit.Test; +import org.mockito.Mockito; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.FSDataInputStream; import org.apache.hadoop.fs.FSDataOutputStream; import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.FutureDataInputStreamBuilder; import org.apache.hadoop.fs.Path; import 
org.apache.hadoop.fs.azurebfs.AbstractAbfsIntegrationTest; import org.apache.hadoop.fs.azurebfs.AzureBlobFileSystem; +import org.apache.hadoop.fs.azurebfs.AzureBlobFileSystemStore; import org.apache.hadoop.fs.azurebfs.contracts.exceptions.AzureBlobFileSystemException; import org.apache.hadoop.fs.azurebfs.contracts.exceptions.TimeoutException; import org.apache.hadoop.fs.azurebfs.contracts.services.ReadBufferStatus; import org.apache.hadoop.fs.azurebfs.utils.TestCachedSASToken; import org.apache.hadoop.fs.azurebfs.utils.TracingContext; -import org.mockito.Mockito; +import org.apache.hadoop.fs.impl.OpenFileParameters; import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyBoolean; +import static org.mockito.ArgumentMatchers.anyString; import static org.mockito.Mockito.doReturn; import static org.mockito.Mockito.doThrow; import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.spy; import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; @@ -199,6 +207,108 @@ public TestAbfsInputStream() throws Exception { ReadBufferManager.getBufferManager().setThresholdAgeMilliseconds(REDUCED_READ_BUFFER_AGE_THRESHOLD); } + private void writeBufferToNewFile(Path testFile, byte[] buffer) throws IOException { + AzureBlobFileSystem fs = getFileSystem(); + fs.create(testFile); + FSDataOutputStream out = fs.append(testFile); + out.write(buffer); + out.close(); + } + + private void verifyOpenWithProvidedStatus(Path path, FileStatus fileStatus, + byte[] buf, AbfsRestOperationType source) + throws IOException, ExecutionException, InterruptedException { + byte[] readBuf = new byte[buf.length]; + AzureBlobFileSystem fs = getFileSystem(); + FutureDataInputStreamBuilder builder = fs.openFile(path); + builder.withFileStatus(fileStatus); + FSDataInputStream in = builder.build().get(); + assertEquals(String.format( + "Open with fileStatus [from %s result]: Incorrect number of 
bytes read", + source), buf.length, in.read(readBuf)); + assertArrayEquals(String + .format("Open with fileStatus [from %s result]: Incorrect read data", + source), readBuf, buf); + } + + private void checkGetPathStatusCalls(Path testFile, FileStatus fileStatus, + AzureBlobFileSystemStore abfsStore, AbfsClient mockClient, + AbfsRestOperationType source, TracingContext tracingContext) + throws IOException { + + // verify GetPathStatus not invoked when FileStatus is provided + abfsStore.openFileForRead(testFile, Optional + .ofNullable(new OpenFileParameters().withStatus(fileStatus)), null, tracingContext); + verify(mockClient, times(0).description((String.format( + "FileStatus [from %s result] provided, GetFileStatus should not be invoked", + source)))).getPathStatus(anyString(), anyBoolean(), any(TracingContext.class)); + + // verify GetPathStatus invoked when FileStatus not provided + abfsStore.openFileForRead(testFile, + Optional.empty(), null, + tracingContext); + if (getPrefixMode(getFileSystem()) == PrefixMode.DFS) { + verify(mockClient, times(1).description( + "GetPathStatus should be invoked when FileStatus not provided")) + .getPathStatus(anyString(), anyBoolean(), any(TracingContext.class)); + } + + Mockito.reset(mockClient); //clears invocation count for next test case + } + + @Test + public void testOpenFileWithOptions() throws Exception { + AzureBlobFileSystem fs = getFileSystem(); + String testFolder = "/testFolder"; + Path smallTestFile = new Path(testFolder + "/testFile0"); + Path largeTestFile = new Path(testFolder + "/testFile1"); + fs.mkdirs(new Path(testFolder)); + int readBufferSize = getConfiguration().getReadBufferSize(); + byte[] smallBuffer = new byte[5]; + byte[] largeBuffer = new byte[readBufferSize + 5]; + new Random().nextBytes(smallBuffer); + new Random().nextBytes(largeBuffer); + writeBufferToNewFile(smallTestFile, smallBuffer); + writeBufferToNewFile(largeTestFile, largeBuffer); + + FileStatus[] getFileStatusResults = 
{fs.getFileStatus(smallTestFile), + fs.getFileStatus(largeTestFile)}; + FileStatus[] listStatusResults = fs.listStatus(new Path(testFolder)); + + // open with fileStatus from GetPathStatus + verifyOpenWithProvidedStatus(smallTestFile, getFileStatusResults[0], + smallBuffer, AbfsRestOperationType.GetPathStatus); + verifyOpenWithProvidedStatus(largeTestFile, getFileStatusResults[1], + largeBuffer, AbfsRestOperationType.GetPathStatus); + + // open with fileStatus from ListStatus + verifyOpenWithProvidedStatus(smallTestFile, listStatusResults[0], smallBuffer, + AbfsRestOperationType.ListPaths); + verifyOpenWithProvidedStatus(largeTestFile, listStatusResults[1], largeBuffer, + AbfsRestOperationType.ListPaths); + + // verify number of GetPathStatus invocations + AzureBlobFileSystemStore abfsStore = getAbfsStore(fs); + AbfsClient mockClient = spy(getClient(fs)); + setAbfsClient(abfsStore, mockClient); + TracingContext tracingContext = getTestTracingContext(fs, false); + checkGetPathStatusCalls(smallTestFile, getFileStatusResults[0], + abfsStore, mockClient, AbfsRestOperationType.GetPathStatus, tracingContext); + checkGetPathStatusCalls(largeTestFile, getFileStatusResults[1], + abfsStore, mockClient, AbfsRestOperationType.GetPathStatus, tracingContext); + checkGetPathStatusCalls(smallTestFile, listStatusResults[0], + abfsStore, mockClient, AbfsRestOperationType.ListPaths, tracingContext); + checkGetPathStatusCalls(largeTestFile, listStatusResults[1], + abfsStore, mockClient, AbfsRestOperationType.ListPaths, tracingContext); + + // Verify with incorrect filestatus + getFileStatusResults[0].setPath(new Path("wrongPath")); + intercept(ExecutionException.class, + () -> verifyOpenWithProvidedStatus(smallTestFile, + getFileStatusResults[0], smallBuffer, + AbfsRestOperationType.GetPathStatus)); + } + /** * This test expects AbfsInputStream to throw the exception that readAhead * thread received on read. The readAhead thread must be initiated from the