diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystemStore.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystemStore.java index ce9a65f14dd0a8..1529aace8ea033 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystemStore.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystemStore.java @@ -1437,8 +1437,6 @@ public void rename(final Path source, final Path destination, if (getAbfsConfiguration().getPrefixMode() == PrefixMode.BLOB) { LOG.debug("Rename for src: {} dst: {} for non-HNS blob-endpoint", source, destination); - final Boolean isSrcExist; - final Boolean isSrcDir; /* * Fetch the list of blobs in the given sourcePath. */ @@ -1451,141 +1449,46 @@ public void rename(final Path source, final Path destination, BlobList blobList = client.getListBlobs(null, listSrc, null, null, tracingContext).getResult() .getBlobList(); - String nextMarker = blobList.getNextMarker(); List srcBlobProperties = blobList.getBlobPropertyList(); - ListBlobQueue listBlobQueue = null; if (srcBlobProperties.size() > 0) { - listBlobQueue = new ListBlobQueue( - blobList.getBlobPropertyList(), - getAbfsConfiguration().getProducerQueueMaxSize(), - getAbfsConfiguration().getBlobDirRenameMaxThread()); - } - /* - * If nextMarker is non-null, there would be a list of blobs that would - * got returned, and listBlobQueue should be non-null. Adding null check - * on listBlobQueue for sanity. 
- */ - if (listBlobQueue != null && nextMarker != null) { - new ListBlobProducer(listSrc, - client, listBlobQueue, nextMarker, tracingContext); + orchestrateBlobRenameDir(source, destination, renameAtomicityUtils, + tracingContext, listSrc, blobList); } else { - if (listBlobQueue != null) { - listBlobQueue.complete(); - } - } - - BlobProperty blobPropOnSrc; - if (srcBlobProperties.size() > 0) { - LOG.debug("src {} exists and is a directory", source); - isSrcExist = true; - isSrcDir = true; /* - * Fetch if there is a marker-blob for the source blob. - */ - BlobProperty blobPropOnSrcNullable; + * Source doesn't have any hierarchy. It can be either a marker or a non-marker blob. + * Or there can be no blob on the path. + * Rename procedure will start. If it's a file or a marker file, it will be renamed. + * In case there is no blob on the path, the server will return an exception. + */ + LOG.debug("source {} doesn't have any blob in its hierarchy. " + + "Starting rename process on the source.", source); + + AbfsLease lease = null; try { - blobPropOnSrcNullable = getBlobProperty(source, tracingContext); - } catch (AbfsRestOperationException ex) { - if (ex.getStatusCode() != HttpURLConnection.HTTP_NOT_FOUND) { - throw ex; + if (isAtomicRenameKey(source.toUri().getPath())) { + lease = getBlobLease(source.toUri().getPath(), + BLOB_LEASE_ONE_MINUTE_DURATION, tracingContext); } - blobPropOnSrcNullable = null; - } - if (blobPropOnSrcNullable == null) { - /* - * There is no marker-blob, the client has to create marker blob before - * starting the rename. 
- */ - //create marker file; add in srcBlobProperties; - LOG.debug("Source {} is a directory but there is no marker-blob", - source); - createDirectory(source, null, FsPermission.getDirDefault(), - FsPermission.getUMask( - getAbfsConfiguration().getRawConfiguration()), - tracingContext); - blobPropOnSrc = new BlobProperty(); - blobPropOnSrc.setIsDirectory(true); - blobPropOnSrc.setPath(source); - } else { - LOG.debug("Source {} is a directory but there is a marker-blob", - source); - blobPropOnSrc = blobPropOnSrcNullable; - } - } else { - LOG.debug("source {} doesn't have any blob in its hierarchy. Checking" - + "if there is marker blob for it.", source); - try { - blobPropOnSrc = getBlobProperty(source, tracingContext); - } catch (AbfsRestOperationException ex) { - if (ex.getStatusCode() != HttpURLConnection.HTTP_NOT_FOUND) { - throw ex; + renameBlob(source, destination, lease, tracingContext); + } catch (AzureBlobFileSystemException ex) { + if (lease != null) { + lease.free(); } - blobPropOnSrc = null; - } - - if (blobPropOnSrc != null) { - isSrcExist = true; - if (blobPropOnSrc.getIsDirectory()) { - LOG.debug("source {} is a marker blob", source); - isSrcDir = true; - listBlobQueue = new ListBlobQueue(0,0); - listBlobQueue.complete(); - } else { - LOG.debug("source {} exists but is not a marker blob", source); - isSrcDir = false; + LOG.error( + String.format("Rename of path from %s to %s failed", + source, destination), ex); + if (ex instanceof AbfsRestOperationException + && ((AbfsRestOperationException) ex).getStatusCode() + == HTTP_NOT_FOUND) { + AbfsRestOperationException ex1 = (AbfsRestOperationException) ex; + throw new AbfsRestOperationException( + ex1.getStatusCode(), + AzureServiceErrorCode.SOURCE_PATH_NOT_FOUND.getErrorCode(), + ex1.getErrorMessage(), ex1); } - } else { - LOG.debug("source {} doesn't exist", source); - isSrcExist = false; - isSrcDir = false; - } - } - - if (!isSrcExist) { - LOG.info("source {} doesn't exists", source); - throw new 
AbfsRestOperationException(HttpURLConnection.HTTP_NOT_FOUND, - AzureServiceErrorCode.SOURCE_PATH_NOT_FOUND.getErrorCode(), null, - null); - } - if (isSrcDir) { - /* - * If source is a directory, all the blobs in the directory have to be - * individually copied and then deleted at the source. - */ - LOG.debug("source {} is a directory", source); - final AbfsBlobLease srcDirLease; - final Boolean isAtomicRename; - if (isAtomicRenameKey(source.toUri().getPath())) { - LOG.debug("source dir {} is an atomicRenameKey", - source.toUri().getPath()); - srcDirLease = getBlobLease(source.toUri().getPath(), - BLOB_LEASE_ONE_MINUTE_DURATION, - tracingContext); - renameAtomicityUtils.preRename(srcBlobProperties, isCreateOperationOnBlobEndpoint()); - isAtomicRename = true; - } else { - srcDirLease = null; - isAtomicRename = false; - LOG.debug("source dir {} is not an atomicRenameKey", - source.toUri().getPath()); - } - - renameBlobDir(source, destination, tracingContext, listBlobQueue, - srcDirLease, isAtomicRename); - - if (renameAtomicityUtils != null) { - renameAtomicityUtils.cleanup(); - } - } else { - LOG.debug("source {} is not directory", source); - AbfsLease lease = null; - if (isAtomicRenameKey(source.toUri().getPath())) { - lease = getBlobLease(source.toUri().getPath(), - BLOB_LEASE_ONE_MINUTE_DURATION, tracingContext); + throw ex; } - renameBlob(blobPropOnSrc.getPath(), destination, lease, tracingContext - ); } LOG.info("Rename from source {} to destination {} done", source, destination); @@ -1625,6 +1528,83 @@ public void rename(final Path source, final Path destination, } while (shouldContinue); } + private void orchestrateBlobRenameDir(final Path source, + final Path destination, + final RenameAtomicityUtils renameAtomicityUtils, + final TracingContext tracingContext, + final String listSrc, + final BlobList blobList) throws IOException { + ListBlobQueue listBlobQueue = new ListBlobQueue( + blobList.getBlobPropertyList(), + 
getAbfsConfiguration().getProducerQueueMaxSize(), + getAbfsConfiguration().getBlobDirRenameMaxThread()); + + if (blobList.getNextMarker() != null) { + new ListBlobProducer(listSrc, + client, listBlobQueue, blobList.getNextMarker(), tracingContext); + } else { + listBlobQueue.complete(); + } + LOG.debug("src {} exists and is a directory", source); + /* + * Fetch if there is a marker-blob for the source blob. + */ + BlobProperty blobPropOnSrcNullable; + try { + blobPropOnSrcNullable = getBlobProperty(source, tracingContext); + } catch (AbfsRestOperationException ex) { + if (ex.getStatusCode() != HttpURLConnection.HTTP_NOT_FOUND) { + throw ex; + } + blobPropOnSrcNullable = null; + } + + if (blobPropOnSrcNullable == null) { + /* + * There is no marker-blob, the client has to create marker blob before + * starting the rename. + */ + LOG.debug("Source {} is a directory but there is no marker-blob", + source); + createDirectory(source, null, FsPermission.getDirDefault(), + FsPermission.getUMask( + getAbfsConfiguration().getRawConfiguration()), + tracingContext); + } else { + LOG.debug("Source {} is a directory but there is a marker-blob", + source); + } + /* + * If source is a directory, all the blobs in the directory have to be + * individually copied and then deleted at the source. 
+ */ + LOG.debug("source {} is a directory", source); + final AbfsBlobLease srcDirLease; + final Boolean isAtomicRename; + if (isAtomicRenameKey(source.toUri().getPath())) { + LOG.debug("source dir {} is an atomicRenameKey", + source.toUri().getPath()); + srcDirLease = getBlobLease(source.toUri().getPath(), + BLOB_LEASE_ONE_MINUTE_DURATION, + tracingContext); + renameAtomicityUtils.preRename( + isCreateOperationOnBlobEndpoint()); + isAtomicRename = true; + } else { + srcDirLease = null; + isAtomicRename = false; + LOG.debug("source dir {} is not an atomicRenameKey", + source.toUri().getPath()); + } + + renameBlobDir(source, destination, tracingContext, listBlobQueue, + srcDirLease, isAtomicRename); + + if (isAtomicRename) { + renameAtomicityUtils.cleanup(); + } + } + @VisibleForTesting AbfsBlobLease getBlobLease(final String source, final Integer blobLeaseOneMinuteDuration, diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/RenameAtomicityUtils.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/RenameAtomicityUtils.java index c37080cde21640..9f7b18a3755606 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/RenameAtomicityUtils.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/RenameAtomicityUtils.java @@ -26,7 +26,6 @@ import java.nio.charset.StandardCharsets; import java.text.SimpleDateFormat; import java.util.Date; -import java.util.List; import java.util.TimeZone; import com.fasterxml.jackson.core.JsonParseException; @@ -51,7 +50,7 @@ /** * For a directory enabled for atomic-rename, before rename starts, a * file with -RenamePending.json suffix is created. In this file, the states required - * for the rename are given. This file is created by {@link #preRename(List, Boolean)} ()} method. + * for the rename are given. This file is created by the {@link #preRename(Boolean)} method. 
* This is important in case the JVM process crashes during rename, the atomicity * will be maintained, when the job calls {@link AzureBlobFileSystem#listStatus(Path)} * or {@link AzureBlobFileSystem#getFileStatus(Path)}. On these API calls to filesystem, @@ -198,13 +197,12 @@ private void deleteRenamePendingFile(FileSystem fs, Path redoFile) * } } * @throws IOException Thrown when fail to write file. */ - public void preRename(List blobPropertyList, - final Boolean isCreateOperationOnBlobEndpoint) throws IOException { + public void preRename(final Boolean isCreateOperationOnBlobEndpoint) throws IOException { Path path = getRenamePendingFilePath(); LOG.debug("Preparing to write atomic rename state to {}", path.toString()); OutputStream output = null; - String contents = makeRenamePendingFileContents(blobPropertyList); + String contents = makeRenamePendingFileContents(); // Write file. try { @@ -263,7 +261,7 @@ private Throwable getWrappedException(final IOException e) { * * @return JSON string which represents the operation. 
*/ - private String makeRenamePendingFileContents(List blobPropertyList) { + private String makeRenamePendingFileContents() { SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS"); sdf.setTimeZone(TimeZone.getTimeZone("UTC")); String time = sdf.format(new Date()); diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/RenameNonAtomicUtils.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/RenameNonAtomicUtils.java index fba0033d14abb3..786c22a9c4cfe7 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/RenameNonAtomicUtils.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/RenameNonAtomicUtils.java @@ -19,7 +19,6 @@ package org.apache.hadoop.fs.azurebfs.services; import java.io.IOException; -import java.util.List; import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.azurebfs.AzureBlobFileSystem; @@ -35,8 +34,7 @@ public RenameNonAtomicUtils(final AzureBlobFileSystem azureBlobFileSystem, } @Override - public void preRename(final List blobPropertyList, - final Boolean isCreateOperationOnBlobEndpoint) + public void preRename(final Boolean isCreateOperationOnBlobEndpoint) throws IOException { } diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemRename.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemRename.java index 711cfb69f8885e..1ff224c4dcd116 100644 --- a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemRename.java +++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemRename.java @@ -990,6 +990,7 @@ public void testEmptyDirRenameResolveFromListStatus() throws Exception { String srcDir = "/hbase/test1/test2/test3"; fs.setWorkingDirectory(new Path("/")); fs.mkdirs(new Path(srcDir)); + fs.create(new 
Path(srcDir, "file1")); fs.mkdirs(new Path("hbase/test4")); AzureBlobFileSystem spiedFs = Mockito.spy(fs); @@ -1007,18 +1008,9 @@ public void testEmptyDirRenameResolveFromListStatus() throws Exception { return op; }).when(spiedClient).acquireBlobLease(Mockito.anyString(), Mockito.anyInt(), Mockito.any(TracingContext.class)); Mockito.doAnswer(answer -> { - final Path srcPath = answer.getArgument(0); - final Path dstPath = answer.getArgument(1); - final String leaseId = answer.getArgument(2); - final TracingContext tracingContext = answer.getArgument(3); - - if (srcDir.equalsIgnoreCase(srcPath.toUri().getPath())) { - throw new AbfsRestOperationException(HttpURLConnection.HTTP_UNAVAILABLE, - AzureServiceErrorCode.INGRESS_OVER_ACCOUNT_LIMIT.getErrorCode(), - "Ingress is over the account limit.", new Exception()); - } - fs.getAbfsStore().copyBlob(srcPath, dstPath, leaseId, tracingContext); - return null; + throw new AbfsRestOperationException(HttpURLConnection.HTTP_UNAVAILABLE, + AzureServiceErrorCode.INGRESS_OVER_ACCOUNT_LIMIT.getErrorCode(), + "Ingress is over the account limit.", new Exception()); }) .when(spiedAbfsStore) .copyBlob(Mockito.any(Path.class), Mockito.any(Path.class), @@ -1027,7 +1019,6 @@ public void testEmptyDirRenameResolveFromListStatus() throws Exception { spiedFs.rename(new Path(srcDir), new Path("hbase/test4")); } catch (Exception ex) { - } Assert.assertFalse(spiedFs.exists( @@ -1035,7 +1026,11 @@ public void testEmptyDirRenameResolveFromListStatus() throws Exception { //call listPath API, it will recover the rename atomicity. 
for(Map.Entry entry : pathLeaseIdMap.entrySet()) { - fs.getAbfsClient().releaseBlobLease(entry.getKey(), entry.getValue(), Mockito.mock(TracingContext.class)); + try { + fs.getAbfsClient() + .releaseBlobLease(entry.getKey(), entry.getValue(), + Mockito.mock(TracingContext.class)); + } catch (Exception e) {} } final AzureBlobFileSystem spiedFsForListPath = Mockito.spy(fs); @@ -1075,7 +1070,7 @@ public void testEmptyDirRenameResolveFromListStatus() throws Exception { Path path = answer.getArgument(0); String leaseId = answer.getArgument(1); TracingContext tracingContext = answer.getArgument(2); - Assert.assertTrue((srcDir).equalsIgnoreCase(path.toUri().getPath())); + Assert.assertTrue(((srcDir).equalsIgnoreCase(path.toUri().getPath()) || (srcDir+"/file1").equalsIgnoreCase(path.toUri().getPath()))); deletedCount.incrementAndGet(); client.deleteBlobPath(path, leaseId, tracingContext); return null; @@ -1101,7 +1096,7 @@ public void testEmptyDirRenameResolveFromListStatus() throws Exception { Assert.assertTrue(notFoundExceptionReceived); Assert.assertNull(fileStatus); Assert.assertTrue(openRequiredFile[0] == 1); - Assert.assertTrue(deletedCount.get() == 2); + Assert.assertTrue(deletedCount.get() == 3); Assert.assertFalse(spiedFsForListPath.exists(new Path(srcDir))); Assert.assertTrue(spiedFsForListPath.getFileStatus( new Path(srcDir.replace("test1/test2/test3", "test4/test3/"))) @@ -1576,9 +1571,7 @@ public void testCopyBlobTakeTimeAndBlobIsDeleted() throws Exception { fileSystem.create(new Path(srcFile)); - intercept(FileNotFoundException.class, () -> { - fileSystem.rename(new Path(srcFile), new Path(dstFile)); - }); + Assert.assertFalse(fileSystem.rename(new Path(srcFile), new Path(dstFile))); Assert.assertFalse(fileSystem.exists(new Path(dstFile))); } @@ -1740,6 +1733,7 @@ public void testParallelAppendToFileBeingCopiedInAtomicDirectory() @Test public void testParallelBlobLeaseOnChildBlobInRenameSrcDir() throws Exception { + 
assumeNonHnsAccountBlobEndpoint(getFileSystem()); AzureBlobFileSystem fs = Mockito.spy( (AzureBlobFileSystem) FileSystem.newInstance(getRawConfiguration())); AzureBlobFileSystemStore store = Mockito.spy(fs.getAbfsStore());