Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
33 commits
Select commit Hold shift + click to select a range
767273b
Backport HADOOP-17767. ABFS: Updates test scripts #3124
snvijaya Oct 20, 2022
dd8b64b
Merge pull request #14 from ABFSDriver/common_code
anmolanmol1234 Apr 3, 2023
f3531df
Merge pull request #13 from ABFSDriver/testScriptPR
anmolanmol1234 Apr 4, 2023
10628a7
listBlob, getBlobProperty
saxenapranav Apr 10, 2023
a5135e2
WIP blobList review. Test to be fixed; SAS test to be added
saxenapranav Apr 12, 2023
3e0dbc6
added tests; SAS test; refactor signature
saxenapranav Apr 12, 2023
7aa6b5b
pom exclude/include
saxenapranav Apr 12, 2023
ac01ec3
EOF
saxenapranav Apr 12, 2023
9bb9e06
type; away from sdk function: get threadLocal for SAXParser
saxenapranav Apr 12, 2023
9963dae
backmerged ABFS_3.3.2_dev_list
saxenapranav Apr 12, 2023
507e5c2
backmerge correction: move setPathProperties to correct place
saxenapranav Apr 12, 2023
5e825ca
small refactors as per review
saxenapranav Apr 12, 2023
355d00c
PR review refactor1.
saxenapranav Apr 12, 2023
3ab9e33
refactors; javadocs; blobList related class to be in service; PR rev…
saxenapranav Apr 13, 2023
0fca35b
reduced javadocs sentences; review comment for the getter which s…
saxenapranav Apr 13, 2023
3f2da6e
backmerge ABFS_3.3.2_dev_list/
saxenapranav Apr 13, 2023
ff0d873
for the root, we just want prefix as /
saxenapranav Apr 13, 2023
5c848b5
reduced tests heuristics; removed maxServerResultPerCall
saxenapranav Apr 13, 2023
3af0258
backmerge ABFS_3.3.2_dev_list
saxenapranav Apr 16, 2023
bce38f1
mkdirs to create marker-blob; test change to assert isDir
saxenapranav Apr 16, 2023
2f90981
deleteBlob should return AbfsrestOperation
saxenapranav Apr 16, 2023
d036d2a
check parent using indexOf instead of looping
saxenapranav Apr 16, 2023
0fd748d
small refactors as per review.
saxenapranav Apr 16, 2023
f1f3e14
no fs in store.
saxenapranav Apr 16, 2023
433a56c
testRenameBlobToDstWithColonInPath
saxenapranav Apr 16, 2023
f4b3254
refactors class as per apache-hadoop coding standards
saxenapranav Apr 17, 2023
0fffa6d
PathInformation new class; move away from atomicBoolean for isExists,…
saxenapranav Apr 17, 2023
7b9ea3e
thread wait for copy progress poll made configurable; instead of dfs …
saxenapranav Apr 17, 2023
d70b894
removed getBlobPropertyWithNotFoundHandling
saxenapranav Apr 17, 2023
cb991ec
removed getBlobPropertyWithNotFoundHandling
saxenapranav Apr 17, 2023
2873e17
backmerge ABFS_3.3.2_dev_list: removal of getBlobProp404Handling
saxenapranav Apr 17, 2023
ef85e51
testRenameFileUsingUnicode: in case of Blob endpoint, dest should not…
saxenapranav Apr 17, 2023
c2db100
small refactors
saxenapranav Apr 17, 2023
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 0 additions & 2 deletions hadoop-tools/hadoop-azure/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -557,7 +557,6 @@
<exclude>**/azurebfs/ITestSmallWriteOptimization.java</exclude>
<exclude>**/azurebfs/services/ITestReadBufferManager.java</exclude>
<exclude>**/azurebfs/ITestAzureBlobFileSystemRename.java</exclude>
<exclude>**/azurebfs/ITestListBlob.java</exclude>
</excludes>

</configuration>
Expand Down Expand Up @@ -600,7 +599,6 @@
<include>**/azurebfs/ITestSmallWriteOptimization.java</include>
<include>**/azurebfs/services/ITestReadBufferManager.java</include>
<include>**/azurebfs/ITestAzureBlobFileSystemRename.java</include>
<include>**/azurebfs/ITestListBlob.java</include>
</includes>
</configuration>
</execution>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -244,6 +244,10 @@ public class AbfsConfiguration{
DefaultValue = 0)
private int blobDirRenameMaxThread;

@LongConfigurationValidatorAnnotation(ConfigurationKey = FS_AZURE_BLOB_COPY_PROGRESS_POLL_WAIT_MILLIS,
DefaultValue = 1_000L)
private long blobCopyProgressPollWaitMillis;

@IntegerConfigurationValidatorAnnotation(ConfigurationKey = FS_AZURE_READ_AHEAD_BLOCK_SIZE,
MinValue = MIN_BUFFER_SIZE,
MaxValue = MAX_BUFFER_SIZE,
Expand Down Expand Up @@ -1031,6 +1035,10 @@ public int getBlobDirRenameMaxThread() {
return blobDirRenameMaxThread;
}

/**
 * Gets the wait time used while polling for blob copy progress.
 *
 * @return the value of the {@code blobCopyProgressPollWaitMillis} field,
 * in milliseconds.
 */
public long getBlobCopyProgressPollWaitMillis() {
  return this.blobCopyProgressPollWaitMillis;
}

@VisibleForTesting
void setReadBufferSize(int bufferSize) {
this.readBufferSize = bufferSize;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,11 +41,12 @@
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicBoolean;

import org.apache.hadoop.classification.VisibleForTesting;
import org.apache.hadoop.fs.azurebfs.services.BlobProperty;
import org.apache.hadoop.fs.azurebfs.services.PathInformation;
import org.apache.hadoop.fs.azurebfs.services.PrefixMode;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.thirdparty.com.google.common.annotations.VisibleForTesting;
import org.apache.hadoop.thirdparty.com.google.common.base.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -401,14 +402,18 @@ public FSDataOutputStream append(final Path f, final int bufferSize, final Progr
}

public boolean rename(final Path src, final Path dst) throws IOException {
LOG.debug("AzureBlobFileSystem.rename src: {} dst: {}", src, dst);
LOG.debug("Rename via Blob-endpoint for non-HNS account: {}",
getAbfsStore().getAbfsConfiguration().getPrefixMode()
== PrefixMode.BLOB);
LOG.debug("AzureBlobFileSystem.rename src: {} dst: {} via {} endpoint", src, dst,
getAbfsStore().getAbfsConfiguration().getPrefixMode());
statIncrement(CALL_RENAME);

trailingPeriodCheck(dst);

if (getAbfsStore().getAbfsConfiguration().getPrefixMode() == PrefixMode.BLOB
&& containsColon(dst)) {
throw new IOException("Cannot rename to file " + dst
+ " that has colons in the name through blob endpoint");
}

Path parentFolder = src.getParent();
if (parentFolder == null) {
return false;
Expand All @@ -420,61 +425,72 @@ public boolean rename(final Path src, final Path dst) throws IOException {
if (getAbfsStore().getAbfsConfiguration().getPrefixMode()
== PrefixMode.BLOB) {
/*
* Special case 1:
* For blob endpoint with non-HNS account, client has to ensure that destination
* is not a sub-directory of source.
*/
LOG.debug("Check if the destination is subDirectory");
while (nestedDstParent != null) {
if (makeQualified(nestedDstParent).equals(qualifiedSrcPath)) {
//testRenameChildDirForbidden.
LOG.info("Rename src: {} dst: {} failed as dst is subDir of src",
qualifiedSrcPath, qualifiedDstPath);
return false;
}
nestedDstParent = nestedDstParent.getParent();
if (nestedDstParent != null && makeQualified(nestedDstParent).toUri()
.getPath()
.indexOf(qualifiedSrcPath.toUri().getPath()) == 0) {
LOG.info("Rename src: {} dst: {} failed as dst is subDir of src",
qualifiedSrcPath, qualifiedDstPath);
return false;
}
}


TracingContext tracingContext = new TracingContext(clientCorrelationId,
fileSystemId, FSOperationType.RENAME, true, tracingHeaderFormat,
listener);
// special case 2:
// rename under same folder;
if (makeQualified(parentFolder).equals(qualifiedDstPath)) {
return tryGetFileStatus(qualifiedSrcPath, tracingContext) != null;
PathInformation pathInformation = getPathInformation(qualifiedDstPath,
tracingContext);
return pathInformation.getPathExists();
}

//special case 3:
if (qualifiedSrcPath.equals(qualifiedDstPath)) {
// rename to itself
// - if it doesn't exist, return false
// - if it is file, return true
// - if it is dir, return false.
final AtomicBoolean isDstDirectory = new AtomicBoolean();
final AtomicBoolean isDstExists = new AtomicBoolean();

getPathInformation(qualifiedDstPath, tracingContext, isDstDirectory,
isDstExists);
if (!isDstExists.get()) {
final PathInformation pathInformation = getPathInformation(
qualifiedDstPath, tracingContext
);
final Boolean isDstExists = pathInformation.getPathExists();
final Boolean isDstDirectory = pathInformation.getIsDirectory();
if (!isDstExists) {
return false;
}
return isDstDirectory.get() ? false : true;
return isDstDirectory ? false : true;
}

final AtomicBoolean isDstDirectory = new AtomicBoolean();
final AtomicBoolean isDstExists = new AtomicBoolean();

// special case 4:
// Non-HNS accounts need to check the dst status on the driver side.
PathInformation fnsPathInformation = null;
if (!abfsStore.getIsNamespaceEnabled(tracingContext)) {
getPathInformation(qualifiedDstPath, tracingContext, isDstDirectory,
isDstExists);
fnsPathInformation = getPathInformation(qualifiedDstPath, tracingContext
);
}

try {
final Boolean isFnsDstExists, isFnsDstDirectory;
if (fnsPathInformation != null) {
isFnsDstDirectory = fnsPathInformation.getIsDirectory();
isFnsDstExists = fnsPathInformation.getPathExists();
} else {
isFnsDstExists = false;
isFnsDstDirectory = false;
}
String sourceFileName = src.getName();
Path adjustedDst = dst;

if (isDstExists.get()) {
if (!isDstDirectory.get()) {
if (isFnsDstExists) {
if (!isFnsDstDirectory) {
return qualifiedSrcPath.equals(qualifiedDstPath);
}
adjustedDst = new Path(dst, sourceFileName);
Expand All @@ -486,13 +502,15 @@ public boolean rename(final Path src, final Path dst) throws IOException {
*/
if (getAbfsStore().getAbfsConfiguration().getPrefixMode()
== PrefixMode.BLOB) {
isDstDirectory.set(false);
isDstExists.set(false);
getPathInformation(qualifiedDstPath, tracingContext, isDstDirectory,
isDstExists);
if (isDstExists.get()) {
final PathInformation qualifiedDstPathInformation
= getPathInformation(qualifiedDstPath, tracingContext
);
final Boolean isQualifiedDstExists
= qualifiedDstPathInformation.getPathExists();
if (isQualifiedDstExists) {
//destination already there. Rename should not be overwriting.
LOG.info("Rename src: {} dst: {} failed as qualifiedDst already exists",
LOG.info(
"Rename src: {} dst: {} failed as qualifiedDst already exists",
qualifiedSrcPath, qualifiedDstPath);
throw new AbfsRestOperationException(
HttpURLConnection.HTTP_CONFLICT,
Expand All @@ -501,18 +519,23 @@ public boolean rename(final Path src, final Path dst) throws IOException {
}
}
} else {
LOG.debug("dst {} doesn't exists. Check if the parent exists.", adjustedDst);
LOG.debug("dst {} doesn't exists. Check if the parent exists.",
adjustedDst);
qualifiedDstPath = makeQualified(adjustedDst);
/*
* If the destination doesn't exist, check if parent of destination exists.
*/
Path parent = qualifiedDstPath.getParent();
if (parent == null || !parent.isRoot()) {
isDstDirectory.set(false);
isDstExists.set(false);
getPathInformation(parent, tracingContext, isDstDirectory,
isDstExists);
if (!isDstExists.get() || !isDstDirectory.get()) {
if (getAbfsStore().getAbfsConfiguration().getPrefixMode()
== PrefixMode.BLOB && (parent != null && !parent.isRoot())) {
PathInformation dstParentPathInformation = getPathInformation(parent,
tracingContext
);
final Boolean dstParentPathExists
= dstParentPathInformation.getPathExists();
final Boolean isDstParentPathDirectory
= dstParentPathInformation.getIsDirectory();
if (!dstParentPathExists || !isDstParentPathDirectory) {
LOG.info("parent of {} is {} doesn't exists. Failing rename",
adjustedDst, parent);
throw new AbfsRestOperationException(
Expand All @@ -523,21 +546,30 @@ public boolean rename(final Path src, final Path dst) throws IOException {
}
}
}

abfsStore.rename(qualifiedSrcPath, qualifiedDstPath, this,
final RenameAtomicityUtils renameAtomicityUtils;
if (getAbfsStore().getAbfsConfiguration().getPrefixMode()
== PrefixMode.BLOB &&
abfsStore.isAtomicRenameKey(qualifiedSrcPath.toUri().getPath())) {
renameAtomicityUtils = new RenameAtomicityUtils(this,
qualifiedSrcPath, qualifiedDstPath, tracingContext);
} else {
renameAtomicityUtils = new RenameNonAtomicUtils(this,
qualifiedSrcPath, qualifiedDstPath, tracingContext);
}
abfsStore.rename(qualifiedSrcPath, qualifiedDstPath, renameAtomicityUtils,
tracingContext);
return true;
} catch (AzureBlobFileSystemException ex) {
LOG.debug("Rename operation failed. ", ex);
checkException(
src,
ex,
AzureServiceErrorCode.PATH_ALREADY_EXISTS,
AzureServiceErrorCode.INVALID_RENAME_SOURCE_PATH,
AzureServiceErrorCode.SOURCE_PATH_NOT_FOUND,
AzureServiceErrorCode.INVALID_SOURCE_OR_DESTINATION_RESOURCE_TYPE,
AzureServiceErrorCode.RENAME_DESTINATION_PARENT_PATH_NOT_FOUND,
AzureServiceErrorCode.INTERNAL_OPERATION_ABORT);
src,
ex,
AzureServiceErrorCode.PATH_ALREADY_EXISTS,
AzureServiceErrorCode.INVALID_RENAME_SOURCE_PATH,
AzureServiceErrorCode.SOURCE_PATH_NOT_FOUND,
AzureServiceErrorCode.INVALID_SOURCE_OR_DESTINATION_RESOURCE_TYPE,
AzureServiceErrorCode.RENAME_DESTINATION_PARENT_PATH_NOT_FOUND,
AzureServiceErrorCode.INTERNAL_OPERATION_ABORT);
return false;
}
}
Expand All @@ -555,44 +587,43 @@ public boolean rename(final Path src, final Path dst) throws IOException {
* shall be called. If the response returned an object, the path can be defined
* as existing. If the response's metadata marks it as a directory, the path
* can be defined as a directory.
*
* @param path path for which information is required.
* @param tracingContext tracingContext for the operations.
* @param isPathDirectory atomicBoolean object which will be set in the method
* if the given path is directory.
* @param isPathExists atomicBoolean object which will be set in the method if
* the given path exists.
*
* @return pathInformation containing if path exists and is a directory.
*
* @throws AzureBlobFileSystemException exceptions caught from the server calls.
*/
private void getPathInformation(final Path path,
final TracingContext tracingContext,
final AtomicBoolean isPathDirectory,
final AtomicBoolean isPathExists) throws AzureBlobFileSystemException {
private PathInformation getPathInformation(final Path path,
final TracingContext tracingContext) throws AzureBlobFileSystemException {
if (getAbfsStore().getAbfsConfiguration().getPrefixMode()
== PrefixMode.BLOB) {
List<BlobProperty> blobProperties = getAbfsStore()
.getListBlobs(path, tracingContext, 2, 2, true);
.getListBlobs(path, null, tracingContext, 2, true);
if (blobProperties.size() > 0) {
isPathExists.set(true);
isPathDirectory.set(true);
return;
return new PathInformation(true, true);
}
BlobProperty blobProperty
= getAbfsStore().getBlobPropertyWithNotFoundHandling(path,
tracingContext);
if (blobProperty != null) {
isPathExists.set(true);
if (blobProperty.getIsDirectory()) {
isPathDirectory.set(true);
BlobProperty blobProperty;
try {
blobProperty = getAbfsStore().getBlobProperty(path, tracingContext);
} catch (AbfsRestOperationException ex) {
if (ex.getStatusCode() != HttpURLConnection.HTTP_NOT_FOUND) {
throw ex;
}
blobProperty = null;
}
if (blobProperty != null) {
return new PathInformation(true, blobProperty.getIsDirectory());
}
} else {
final FileStatus fileStatus = tryGetFileStatus(path,
tracingContext);
if (fileStatus != null) {
isPathExists.set(true);
isPathDirectory.set(fileStatus.isDirectory());
return new PathInformation(true, fileStatus.isDirectory());
}
}
return new PathInformation(false, false);
}

@Override
Expand Down Expand Up @@ -1596,6 +1627,10 @@ private Throwable getRootCause(Throwable throwable) {
return result;
}

/**
 * Checks whether the path component of the given path's URI has a colon.
 *
 * @param p path to inspect.
 *
 * @return true if the URI path of {@code p} contains a ':' character.
 */
private boolean containsColon(Path p) {
  final String uriPath = p.toUri().getPath();
  return uriPath.indexOf(':') >= 0;
}

/**
* Get a delegation token from remote service endpoint if
* 'fs.azure.enable.kerberos.support' is set to 'true', and
Expand Down Expand Up @@ -1696,7 +1731,7 @@ String getClientCorrelationId() {
return clientCorrelationId;
}

/**
 * Replaces the store instance held by this file system. Annotated as
 * visible for testing.
 *
 * @param abfsStore store instance to assign to this file system.
 */
@org.apache.hadoop.classification.VisibleForTesting
@VisibleForTesting
void setAbfsStore(final AzureBlobFileSystemStore abfsStore) {
this.abfsStore = abfsStore;
}
Expand Down
Loading