Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -1437,8 +1437,6 @@ public void rename(final Path source, final Path destination,
if (getAbfsConfiguration().getPrefixMode() == PrefixMode.BLOB) {
LOG.debug("Rename for src: {} dst: {} for non-HNS blob-endpoint",
source, destination);
final Boolean isSrcExist;
final Boolean isSrcDir;
/*
* Fetch the list of blobs in the given sourcePath.
*/
Expand All @@ -1451,141 +1449,46 @@ public void rename(final Path source, final Path destination,
BlobList blobList = client.getListBlobs(null, listSrc, null, null,
tracingContext).getResult()
.getBlobList();
String nextMarker = blobList.getNextMarker();
List<BlobProperty> srcBlobProperties = blobList.getBlobPropertyList();

ListBlobQueue listBlobQueue = null;
if (srcBlobProperties.size() > 0) {
listBlobQueue = new ListBlobQueue(
blobList.getBlobPropertyList(),
getAbfsConfiguration().getProducerQueueMaxSize(),
getAbfsConfiguration().getBlobDirRenameMaxThread());
}
/*
* If nextMarker is non-null, there would be a list of blobs that would
* got returned, and listBlobQueue should be non-null. Adding null check
* on listBlobQueue for sanity.
*/
if (listBlobQueue != null && nextMarker != null) {
new ListBlobProducer(listSrc,
client, listBlobQueue, nextMarker, tracingContext);
orchestrateBlobRenameDir(source, destination, renameAtomicityUtils,
tracingContext, listSrc, blobList);
} else {
if (listBlobQueue != null) {
listBlobQueue.complete();
}
}

BlobProperty blobPropOnSrc;
if (srcBlobProperties.size() > 0) {
LOG.debug("src {} exists and is a directory", source);
isSrcExist = true;
isSrcDir = true;
/*
* Fetch if there is a marker-blob for the source blob.
*/
BlobProperty blobPropOnSrcNullable;
* Source doesn't have any hierarchy. It can either be marker or non-marker blob.
* Or there can be no blob on the path.
* Rename procedure will start. If its a file or a marker file, it will be renamed.
* In case there is no blob on the path, server will return exception.
*/
LOG.debug("source {} doesn't have any blob in its hierarchy. "
+ "Starting rename process on the source.", source);

AbfsLease lease = null;
try {
blobPropOnSrcNullable = getBlobProperty(source, tracingContext);
} catch (AbfsRestOperationException ex) {
if (ex.getStatusCode() != HttpURLConnection.HTTP_NOT_FOUND) {
throw ex;
if (isAtomicRenameKey(source.toUri().getPath())) {
lease = getBlobLease(source.toUri().getPath(),
BLOB_LEASE_ONE_MINUTE_DURATION, tracingContext);
}
blobPropOnSrcNullable = null;
}
if (blobPropOnSrcNullable == null) {
/*
* There is no marker-blob, the client has to create marker blob before
* starting the rename.
*/
//create marker file; add in srcBlobProperties;
LOG.debug("Source {} is a directory but there is no marker-blob",
source);
createDirectory(source, null, FsPermission.getDirDefault(),
FsPermission.getUMask(
getAbfsConfiguration().getRawConfiguration()),
tracingContext);
blobPropOnSrc = new BlobProperty();
blobPropOnSrc.setIsDirectory(true);
blobPropOnSrc.setPath(source);
} else {
LOG.debug("Source {} is a directory but there is a marker-blob",
source);
blobPropOnSrc = blobPropOnSrcNullable;
}
} else {
LOG.debug("source {} doesn't have any blob in its hierarchy. Checking"
+ "if there is marker blob for it.", source);
try {
blobPropOnSrc = getBlobProperty(source, tracingContext);
} catch (AbfsRestOperationException ex) {
if (ex.getStatusCode() != HttpURLConnection.HTTP_NOT_FOUND) {
throw ex;
renameBlob(source, destination, lease, tracingContext);
} catch (AzureBlobFileSystemException ex) {
if (lease != null) {
lease.free();
}
blobPropOnSrc = null;
}

if (blobPropOnSrc != null) {
isSrcExist = true;
if (blobPropOnSrc.getIsDirectory()) {
LOG.debug("source {} is a marker blob", source);
isSrcDir = true;
listBlobQueue = new ListBlobQueue(0,0);
listBlobQueue.complete();
} else {
LOG.debug("source {} exists but is not a marker blob", source);
isSrcDir = false;
LOG.error(
String.format("Rename of path from %s to %s failed",
source, destination), ex);
if (ex instanceof AbfsRestOperationException
&& ((AbfsRestOperationException) ex).getStatusCode()
== HTTP_NOT_FOUND) {
AbfsRestOperationException ex1 = (AbfsRestOperationException) ex;
throw new AbfsRestOperationException(
ex1.getStatusCode(),
AzureServiceErrorCode.SOURCE_PATH_NOT_FOUND.getErrorCode(),
ex1.getErrorMessage(), ex1);
}
} else {
LOG.debug("source {} doesn't exist", source);
isSrcExist = false;
isSrcDir = false;
}
}

if (!isSrcExist) {
LOG.info("source {} doesn't exists", source);
throw new AbfsRestOperationException(HttpURLConnection.HTTP_NOT_FOUND,
AzureServiceErrorCode.SOURCE_PATH_NOT_FOUND.getErrorCode(), null,
null);
}
if (isSrcDir) {
/*
* If source is a directory, all the blobs in the directory have to be
* individually copied and then deleted at the source.
*/
LOG.debug("source {} is a directory", source);
final AbfsBlobLease srcDirLease;
final Boolean isAtomicRename;
if (isAtomicRenameKey(source.toUri().getPath())) {
LOG.debug("source dir {} is an atomicRenameKey",
source.toUri().getPath());
srcDirLease = getBlobLease(source.toUri().getPath(),
BLOB_LEASE_ONE_MINUTE_DURATION,
tracingContext);
renameAtomicityUtils.preRename(srcBlobProperties, isCreateOperationOnBlobEndpoint());
isAtomicRename = true;
} else {
srcDirLease = null;
isAtomicRename = false;
LOG.debug("source dir {} is not an atomicRenameKey",
source.toUri().getPath());
}

renameBlobDir(source, destination, tracingContext, listBlobQueue,
srcDirLease, isAtomicRename);

if (renameAtomicityUtils != null) {
renameAtomicityUtils.cleanup();
}
} else {
LOG.debug("source {} is not directory", source);
AbfsLease lease = null;
if (isAtomicRenameKey(source.toUri().getPath())) {
lease = getBlobLease(source.toUri().getPath(),
BLOB_LEASE_ONE_MINUTE_DURATION, tracingContext);
throw ex;
}
renameBlob(blobPropOnSrc.getPath(), destination, lease, tracingContext
);
}
LOG.info("Rename from source {} to destination {} done", source,
destination);
Expand Down Expand Up @@ -1625,6 +1528,83 @@ public void rename(final Path source, final Path destination,
} while (shouldContinue);
}

private void orchestrateBlobRenameDir(final Path source,
final Path destination,
final RenameAtomicityUtils renameAtomicityUtils,
final TracingContext tracingContext,
final String listSrc,
final BlobList blobList) throws IOException {
ListBlobQueue listBlobQueue = new ListBlobQueue(
blobList.getBlobPropertyList(),
getAbfsConfiguration().getProducerQueueMaxSize(),
getAbfsConfiguration().getBlobDirRenameMaxThread());

if (blobList.getNextMarker() != null) {
new ListBlobProducer(listSrc,
client, listBlobQueue, blobList.getNextMarker(), tracingContext);
} else {
listBlobQueue.complete();
}
LOG.debug("src {} exists and is a directory", source);
/*
* Fetch if there is a marker-blob for the source blob.
*/
BlobProperty blobPropOnSrcNullable;
try {
blobPropOnSrcNullable = getBlobProperty(source, tracingContext);
} catch (AbfsRestOperationException ex) {
if (ex.getStatusCode() != HttpURLConnection.HTTP_NOT_FOUND) {
throw ex;
}
blobPropOnSrcNullable = null;
}

if (blobPropOnSrcNullable == null) {
/*
* There is no marker-blob, the client has to create marker blob before
* starting the rename.
*/
LOG.debug("Source {} is a directory but there is no marker-blob",
source);
createDirectory(source, null, FsPermission.getDirDefault(),
FsPermission.getUMask(
getAbfsConfiguration().getRawConfiguration()),
tracingContext);
} else {
LOG.debug("Source {} is a directory but there is a marker-blob",
source);
}
/*
* If source is a directory, all the blobs in the directory have to be
* individually copied and then deleted at the source.
*/
LOG.debug("source {} is a directory", source);
final AbfsBlobLease srcDirLease;
final Boolean isAtomicRename;
if (isAtomicRenameKey(source.toUri().getPath())) {
LOG.debug("source dir {} is an atomicRenameKey",
source.toUri().getPath());
srcDirLease = getBlobLease(source.toUri().getPath(),
BLOB_LEASE_ONE_MINUTE_DURATION,
tracingContext);
renameAtomicityUtils.preRename(
isCreateOperationOnBlobEndpoint());
isAtomicRename = true;
} else {
srcDirLease = null;
isAtomicRename = false;
LOG.debug("source dir {} is not an atomicRenameKey",
source.toUri().getPath());
}

renameBlobDir(source, destination, tracingContext, listBlobQueue,
srcDirLease, isAtomicRename);

if (isAtomicRename) {
renameAtomicityUtils.cleanup();
}
}

@VisibleForTesting
AbfsBlobLease getBlobLease(final String source,
final Integer blobLeaseOneMinuteDuration,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@
import java.nio.charset.StandardCharsets;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.List;
import java.util.TimeZone;

import com.fasterxml.jackson.core.JsonParseException;
Expand All @@ -51,7 +50,7 @@
/**
* For a directory enabled for atomic-rename, before rename starts, a
* file with -RenamePending.json suffix is created. In this file, the states required
* for the rename are given. This file is created by {@link #preRename(List, Boolean)} ()} method.
* for the rename are given. This file is created by {@link #preRename(Boolean)} ()} method.
* This is important in case the JVM process crashes during rename, the atomicity
* will be maintained, when the job calls {@link AzureBlobFileSystem#listStatus(Path)}
* or {@link AzureBlobFileSystem#getFileStatus(Path)}. On these API calls to filesystem,
Expand Down Expand Up @@ -198,13 +197,12 @@ private void deleteRenamePendingFile(FileSystem fs, Path redoFile)
* } }</pre>
* @throws IOException Thrown when fail to write file.
*/
public void preRename(List<BlobProperty> blobPropertyList,
final Boolean isCreateOperationOnBlobEndpoint) throws IOException {
public void preRename(final Boolean isCreateOperationOnBlobEndpoint) throws IOException {
Path path = getRenamePendingFilePath();
LOG.debug("Preparing to write atomic rename state to {}", path.toString());
OutputStream output = null;

String contents = makeRenamePendingFileContents(blobPropertyList);
String contents = makeRenamePendingFileContents();

// Write file.
try {
Expand Down Expand Up @@ -263,7 +261,7 @@ private Throwable getWrappedException(final IOException e) {
*
* @return JSON string which represents the operation.
*/
private String makeRenamePendingFileContents(List<BlobProperty> blobPropertyList) {
private String makeRenamePendingFileContents() {
SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS");
sdf.setTimeZone(TimeZone.getTimeZone("UTC"));
String time = sdf.format(new Date());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@
package org.apache.hadoop.fs.azurebfs.services;

import java.io.IOException;
import java.util.List;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.azurebfs.AzureBlobFileSystem;
Expand All @@ -35,8 +34,7 @@ public RenameNonAtomicUtils(final AzureBlobFileSystem azureBlobFileSystem,
}

@Override
public void preRename(final List<BlobProperty> blobPropertyList,
final Boolean isCreateOperationOnBlobEndpoint)
public void preRename(final Boolean isCreateOperationOnBlobEndpoint)
throws IOException {

}
Expand Down
Loading