Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
25 commits
Select commit Hold shift + click to select a range
109a0ba
changed signature to bring it to redo method
saxenapranav Jul 20, 2023
e585777
flowing etag in renameAtomicUtils to write to JSON; read from JSON
saxenapranav Jul 24, 2023
1ad1c02
etag check before redo
saxenapranav Jul 24, 2023
3156efd
redo method doesn't need etag; tests wip
saxenapranav Jul 24, 2023
5563827
added tests
saxenapranav Jul 24, 2023
08606b4
quote or unquote the etag
saxenapranav Jul 24, 2023
38b6dfa
javadocs changes
saxenapranav Jul 24, 2023
77ec128
try saving listStatus call if nothing changed
saxenapranav Jul 24, 2023
bab6499
asssertion if the listStatus API of FS will give correct result when …
saxenapranav Jul 24, 2023
1dd8204
added a comment on leaseAcquire catch in redo method
saxenapranav Jul 25, 2023
3d04da3
renamePendingJSON in listStatus to be searched only if its atomicDire…
saxenapranav Jul 25, 2023
d9a38b8
added tests; to take qualified path uri for checking if its atomic in…
saxenapranav Jul 25, 2023
fbf0a24
Merge branch 'ABFS_3.3.2_dev' into ABFS_3.3.2_dev_redo_optimization_r…
saxenapranav Jul 25, 2023
14104c1
line spacing refactor
saxenapranav Jul 25, 2023
a660122
HADOOP-17682. ABFS: Support FileStatus input to OpenFileWithOptions()…
sumangala-patki Aug 18, 2021
08b8c79
listStatus and getFileStatus will open the inputStream and send for r…
saxenapranav Jul 26, 2023
aa2d21d
modified time to check the number of invocation of getFileStatus in t…
saxenapranav Jul 26, 2023
e69321e
directly calling store.openWithOptions from listStatus/getFileStatus;…
saxenapranav Jul 26, 2023
9be288b
testRefator
saxenapranav Jul 26, 2023
f326e52
Merge branch 'ABFS_3.3.2_dev' into ABFS_3.3.2_dev_redo_optimization_f…
saxenapranav Jul 26, 2023
dd12da4
for fileStatus of renamePendingJson, we will only call getPathProperty.
saxenapranav Jul 26, 2023
549285d
invocation of getPathProperty instead of getFileStatus
saxenapranav Jul 26, 2023
b76972e
fix tests in ITestAzureBlobFileSystemFileStatus
saxenapranav Jul 27, 2023
1f77da6
fileStatus.getpath().toUri().getPathh() extract variable for reuse
saxenapranav Jul 27, 2023
680f18c
filePathStr = qualifiedPath.toUri().getPath() instead of path.toUri()…
saxenapranav Jul 27, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -43,8 +43,10 @@
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

import org.apache.commons.lang3.tuple.Pair;
import org.apache.hadoop.fs.azurebfs.contracts.exceptions.InvalidConfigurationValueException;
import org.apache.hadoop.fs.azurebfs.services.AbfsBlobLease;
import org.apache.hadoop.fs.azurebfs.services.AbfsInputStream;
import org.apache.hadoop.fs.azurebfs.services.BlobProperty;
import org.apache.hadoop.classification.VisibleForTesting;
import org.apache.hadoop.fs.azurebfs.services.OperativeEndpoint;
Expand Down Expand Up @@ -116,6 +118,7 @@
import static org.apache.hadoop.fs.CommonConfigurationKeys.IOSTATISTICS_LOGGING_LEVEL_DEFAULT;
import static org.apache.hadoop.fs.azurebfs.AbfsStatistic.*;
import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.BLOB_LEASE_ONE_MINUTE_DURATION;
import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.FORWARD_SLASH;
import static org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.FS_AZURE_ENABLE_BLOB_ENDPOINT;
import static org.apache.hadoop.fs.azurebfs.constants.FileSystemUriSchemes.ABFS_DNS_PREFIX;
import static org.apache.hadoop.fs.azurebfs.constants.FileSystemUriSchemes.WASB_DNS_PREFIX;
Expand Down Expand Up @@ -360,7 +363,7 @@ public FSDataInputStream open(final Path path, final int bufferSize) throws IOEx
}

private FSDataInputStream open(final Path path,
final Optional<Configuration> options) throws IOException {
final Optional<OpenFileParameters> parameters) throws IOException {
statIncrement(CALL_OPEN);
Path qualifiedPath = makeQualified(path);

Expand All @@ -369,14 +372,23 @@ private FSDataInputStream open(final Path path,
fileSystemId, FSOperationType.OPEN, tracingHeaderFormat,
listener);
InputStream inputStream = getAbfsStore().openFileForRead(qualifiedPath,
options, statistics, tracingContext);
parameters, statistics, tracingContext);
return new FSDataInputStream(inputStream);
} catch(AzureBlobFileSystemException ex) {
checkException(path, ex);
return null;
}
}

/**
* Takes config and other options through
* {@link org.apache.hadoop.fs.impl.OpenFileParameters}. Ensure that
* FileStatus entered is up-to-date, as it will be used to create the
* InputStream (with info such as contentLength, eTag)
* @param path The location of file to be opened
* @param parameters OpenFileParameters instance; can hold FileStatus,
* Configuration, bufferSize and mandatoryKeys
*/
@Override
protected CompletableFuture<FSDataInputStream> openFileWithOptions(
final Path path, final OpenFileParameters parameters) throws IOException {
Expand All @@ -387,7 +399,7 @@ protected CompletableFuture<FSDataInputStream> openFileWithOptions(
"for " + path);
return LambdaUtils.eval(
new CompletableFuture<>(), () ->
open(path, Optional.of(parameters.getOptions())));
open(path, Optional.of(parameters)));
}

private boolean shouldRedirect(FSOperationType type, TracingContext context)
Expand Down Expand Up @@ -914,17 +926,40 @@ public FileStatus[] listStatus(final Path f) throws IOException {
TracingContext tracingContext = new TracingContext(clientCorrelationId,
fileSystemId, FSOperationType.LISTSTATUS, true, tracingHeaderFormat,
listener);
FileStatus[] result = getAbfsStore().listStatus(qualifiedPath, tracingContext);
FileStatus[] result = getAbfsStore().listStatus(qualifiedPath,
tracingContext);
if (getAbfsStore().getAbfsConfiguration().getPrefixMode()
== PrefixMode.BLOB) {
FileStatus renamePendingFileStatus
== PrefixMode.BLOB && getAbfsStore().isAtomicRenameKey(
qualifiedPath.toUri().getPath() + FORWARD_SLASH)) {
Pair<FileStatus, FileStatus> renamePendingJsonAndSrcFileStatusPair
= getAbfsStore().getRenamePendingFileStatus(result);
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Rename variable to renamePendingJSONStatus

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

refactored.

if (renamePendingFileStatus != null) {
RenameAtomicityUtils renameAtomicityUtils =
getRenameAtomicityUtilsForRedo(renamePendingFileStatus.getPath(),
tracingContext);
renameAtomicityUtils.cleanup(renamePendingFileStatus.getPath());
result = getAbfsStore().listStatus(qualifiedPath, tracingContext);
FileStatus renamePendingSrcFileStatus
= renamePendingJsonAndSrcFileStatusPair.getRight();
FileStatus renamePendingJsonFileStatus
= renamePendingJsonAndSrcFileStatusPair.getLeft();
if (renamePendingJsonFileStatus != null) {
final Boolean isRedone;
if (renamePendingSrcFileStatus != null) {
RenameAtomicityUtils renameAtomicityUtils =
getRenameAtomicityUtilsForRedo(
renamePendingJsonFileStatus.getPath(),
tracingContext,
((AzureBlobFileSystemStore.VersionedFileStatus) renamePendingSrcFileStatus).getEtag(),
getRenamePendingJsonInputStream(
renamePendingJsonFileStatus, tracingContext));
renameAtomicityUtils.cleanup(renamePendingJsonFileStatus.getPath());
isRedone = renameAtomicityUtils.isRedone();
} else {
isRedone = false;
getAbfsStore().delete(renamePendingJsonFileStatus.getPath(), true,
tracingContext);
}
if (isRedone) {
result = getAbfsStore().listStatus(qualifiedPath, tracingContext);
} else {
result = ArrayUtils.removeElement(result,
renamePendingJsonFileStatus);
}
}
}
return result;
Expand All @@ -934,11 +969,13 @@ public FileStatus[] listStatus(final Path f) throws IOException {
}
}

RenameAtomicityUtils getRenameAtomicityUtilsForRedo(final Path renamePendingFileStatus,
final TracingContext tracingContext) throws IOException {
RenameAtomicityUtils getRenameAtomicityUtilsForRedo(final Path renamePendingJsonPath,
final TracingContext tracingContext, final String srcEtag,
final AbfsInputStream renamePendingJsonInputStream) throws IOException {
return new RenameAtomicityUtils(this,
renamePendingFileStatus,
getAbfsStore().getRedoRenameInvocation(tracingContext));
renamePendingJsonPath,
getAbfsStore().getRedoRenameInvocation(tracingContext), srcEtag,
renamePendingJsonInputStream);
}

/**
Expand Down Expand Up @@ -1052,7 +1089,6 @@ private FileStatus getFileStatus(final Path path,
LOG.debug("AzureBlobFileSystem.getFileStatus path: {}", path);
statIncrement(CALL_GET_FILE_STATUS);
Path qualifiedPath = makeQualified(path);
FileStatus fileStatus;
PrefixMode prefixMode = getAbfsStore().getPrefixMode();
AbfsConfiguration abfsConfiguration = getAbfsStore().getAbfsConfiguration();

Expand All @@ -1064,22 +1100,42 @@ private FileStatus getFileStatus(final Path path,
* Get File Status over Blob Endpoint will Have an additional call
* to check if directory is implicit.
*/
fileStatus = getAbfsStore().getFileStatus(qualifiedPath,
tracingContext, useBlobEndpoint);
if (getAbfsStore().getPrefixMode() == PrefixMode.BLOB
&& fileStatus != null && fileStatus.isDirectory()
&& getAbfsStore().isAtomicRenameKey(fileStatus.getPath().toUri().getPath())
&& getAbfsStore().getRenamePendingFileStatusInDirectory(fileStatus,
tracingContext)) {
RenameAtomicityUtils renameAtomicityUtils = getRenameAtomicityUtilsForRedo(
new Path(fileStatus.getPath().toUri().getPath() + SUFFIX),
tracingContext);
renameAtomicityUtils.cleanup(
new Path(fileStatus.getPath().toUri().getPath() + SUFFIX));
throw new AbfsRestOperationException(HttpURLConnection.HTTP_NOT_FOUND,
AzureServiceErrorCode.PATH_NOT_FOUND.getErrorCode(), null,
new FileNotFoundException(
qualifiedPath + ": No such file or directory."));
final FileStatus fileStatus = getAbfsStore().getFileStatus(qualifiedPath,
tracingContext, useBlobEndpoint);
final String filePathStr = qualifiedPath.toUri().getPath();
if (getAbfsStore().getPrefixMode() == PrefixMode.BLOB
&& fileStatus != null && fileStatus.isDirectory()
&& getAbfsStore().isAtomicRenameKey(filePathStr)) {
FileStatus renamePendingJsonFileStatus;
try {
renamePendingJsonFileStatus = getAbfsStore().getPathProperty(
makeQualified(
new Path(filePathStr + SUFFIX)),
tracingContext, true);
} catch (AbfsRestOperationException ex) {
if(ex.getStatusCode() == HttpURLConnection.HTTP_NOT_FOUND) {
renamePendingJsonFileStatus = null;
} else {
throw ex;
}
}

if (renamePendingJsonFileStatus != null) {
RenameAtomicityUtils renameAtomicityUtils
= getRenameAtomicityUtilsForRedo(
renamePendingJsonFileStatus.getPath(),
tracingContext,
((AzureBlobFileSystemStore.VersionedFileStatus) fileStatus).getEtag(),
getRenamePendingJsonInputStream(renamePendingJsonFileStatus, tracingContext));
renameAtomicityUtils.cleanup(renamePendingJsonFileStatus.getPath());
if (renameAtomicityUtils.isRedone()) {
throw new AbfsRestOperationException(
HttpURLConnection.HTTP_NOT_FOUND,
AzureServiceErrorCode.PATH_NOT_FOUND.getErrorCode(), null,
new FileNotFoundException(
qualifiedPath + ": No such file or directory."));
}
}
}
return fileStatus;
} catch (AzureBlobFileSystemException ex) {
Expand All @@ -1088,6 +1144,16 @@ && getAbfsStore().getRenamePendingFileStatusInDirectory(fileStatus,
}
}

private AbfsInputStream getRenamePendingJsonInputStream(final FileStatus renamePendingJsonFileStatus,
final TracingContext tracingContext)
throws IOException {
Path qualifiedPath = makeQualified(renamePendingJsonFileStatus.getPath());
return getAbfsStore().openFileForRead(qualifiedPath,
Optional.of(
new OpenFileParameters().withStatus(renamePendingJsonFileStatus)),
statistics, tracingContext);
}

/**
* Break the current lease on an ABFS file if it exists. A lease that is broken cannot be
* renewed. A new lease may be obtained on the file immediately.
Expand Down
Loading