From 221b21f1c49339a465a8a8ef13ddc7052a150a2a Mon Sep 17 00:00:00 2001 From: Pranav Saxena Date: Fri, 16 Jun 2023 03:44:59 -0700 Subject: [PATCH 01/18] testIfTracingContextPrimaryIdIsSameInAllTheStepsOfBlobRename --- .../fs/azurebfs/utils/TracingContext.java | 3 + .../ITestAzureBlobFileSystemRename.java | 58 +++++++++++++++++++ 2 files changed, 61 insertions(+) diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/utils/TracingContext.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/utils/TracingContext.java index 241232ed917dcb..061efef985f362 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/utils/TracingContext.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/utils/TracingContext.java @@ -195,4 +195,7 @@ public String getHeader() { return header; } + public String getPrimaryRequestId() { + return primaryRequestId; + } } diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemRename.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemRename.java index 47388775dc2ad1..d1051e429ab584 100644 --- a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemRename.java +++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemRename.java @@ -53,6 +53,7 @@ import org.apache.hadoop.fs.azurebfs.services.AbfsHttpOperation; import org.apache.hadoop.fs.azurebfs.services.AbfsRestOperation; import org.apache.hadoop.fs.azurebfs.services.AbfsRestOperationTestUtil; +import org.apache.hadoop.fs.azurebfs.services.RenameAtomicityUtils; import org.apache.hadoop.fs.azurebfs.utils.TracingContext; import org.apache.hadoop.fs.azurebfs.services.PrefixMode; @@ -1548,4 +1549,61 @@ public void testCopyAfterSourceHasBeenDeleted() throws Exception { } Assert.assertTrue(srcBlobNotFoundExReceived); } + + @Test + public void testIfTracingContextPrimaryIdIsSameInAllTheStepsOfBlobRename() throws Exception { + assumeNonHnsAccountBlobEndpoint(getFileSystem()); + AzureBlobFileSystem fs = Mockito.spy(getFileSystem()); + fs.mkdirs(new Path("/dir")); + fs.create(new Path("/dir/file1")); + fs.create(new Path("/dir/file2")); + + AzureBlobFileSystemStore store = fs.getAbfsStore(); + AzureBlobFileSystemStore spiedStore = Mockito.spy(store); + AbfsClient client = Mockito.spy(store.getClient()); + spiedStore.setClient(client); + + Mockito.doAnswer(answer -> { + final TracingContext context = answer.getArgument(3); + Mockito.doAnswer(listAnswer -> { + TracingContext listContext = listAnswer.getArgument(3); + Assert.assertEquals(listContext.getPrimaryRequestId(), + context.getPrimaryRequestId()); + return listAnswer.callRealMethod(); + }) + .when(client) + .getListBlobs(Mockito.nullable(String.class), + Mockito.nullable(String.class), Mockito.nullable(Integer.class), + Mockito.any(TracingContext.class)); + + Mockito.doAnswer(copyAnswer -> { + TracingContext listContext = copyAnswer.getArgument(2); + Assert.assertEquals(listContext.getPrimaryRequestId(), + context.getPrimaryRequestId()); + return copyAnswer.callRealMethod(); + }) + .when(client) + .copyBlob(Mockito.any(Path.class), Mockito.any(Path.class), + Mockito.any(TracingContext.class)); + + Mockito.doAnswer(deleteAnswer -> { + TracingContext listContext = deleteAnswer.getArgument(1); + Assert.assertEquals(listContext.getPrimaryRequestId(), + context.getPrimaryRequestId()); + return deleteAnswer.callRealMethod(); + }) + .when(client) + .deleteBlobPath(Mockito.any(Path.class), + Mockito.any(TracingContext.class)); + + return answer.callRealMethod(); + }) + .when(spiedStore) + .rename(Mockito.any(Path.class), Mockito.any(Path.class), + Mockito.any( + RenameAtomicityUtils.class), Mockito.any(TracingContext.class)); + + Mockito.doReturn(spiedStore).when(fs).getAbfsStore(); + fs.rename(new Path("/dir"), new Path("/dir1")); + } } From 1ce147998f2f74d144c0f12f537579becb955a1d Mon Sep 17 00:00:00 2001 From: Pranav Saxena Date: Fri, 16 Jun 2023 03:47:52 -0700 Subject: [PATCH 02/18] nit refactor --- .../fs/azurebfs/ITestAzureBlobFileSystemRename.java | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemRename.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemRename.java index d1051e429ab584..418deb2921e752 100644 --- a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemRename.java +++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemRename.java @@ -1577,8 +1577,8 @@ public void testIfTracingContextPrimaryIdIsSameInAllTheStepsOfBlobRename() throw Mockito.any(TracingContext.class)); Mockito.doAnswer(copyAnswer -> { - TracingContext listContext = copyAnswer.getArgument(2); - Assert.assertEquals(listContext.getPrimaryRequestId(), + TracingContext copyContext = copyAnswer.getArgument(2); + Assert.assertEquals(copyContext.getPrimaryRequestId(), context.getPrimaryRequestId()); return copyAnswer.callRealMethod(); }) @@ -1587,8 +1587,8 @@ public void testIfTracingContextPrimaryIdIsSameInAllTheStepsOfBlobRename() throw Mockito.any(TracingContext.class)); Mockito.doAnswer(deleteAnswer -> { - TracingContext listContext = deleteAnswer.getArgument(1); - Assert.assertEquals(listContext.getPrimaryRequestId(), + TracingContext deleteContext = deleteAnswer.getArgument(1); + Assert.assertEquals(deleteContext.getPrimaryRequestId(), context.getPrimaryRequestId()); return deleteAnswer.callRealMethod(); }) From e16000f7791874bcc0ab40b5a629ce7939d3faa5 Mon Sep 17 00:00:00 2001 From: Pranav Saxena Date: Sun, 18 Jun 2023 22:42:41 -0700 Subject: [PATCH 03/18] tracingContext to have new field for total renamed blobs in src-blob marker copy/delete --- .../hadoop/fs/azurebfs/AzureBlobFileSystemStore.java | 5 +++++ .../apache/hadoop/fs/azurebfs/utils/TracingContext.java | 9 +++++++++ 2 files changed, 14 insertions(+) diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystemStore.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystemStore.java index 96a9a27d52e1d9..222a9824db69e5 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystemStore.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystemStore.java @@ -55,6 +55,7 @@ import java.util.concurrent.Executors; import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; import org.apache.hadoop.classification.VisibleForTesting; @@ -1618,6 +1619,7 @@ private void renameBlobDir(final Path source, final ExecutorService renameBlobExecutorService = Executors.newFixedThreadPool( getAbfsConfiguration().getBlobDirRenameMaxThread()); + AtomicInteger renamedBlob = new AtomicInteger(0); while(!listBlobConsumer.isCompleted()) { blobList = listBlobConsumer.consume(); if(blobList == null) { @@ -1649,6 +1651,7 @@ private void renameBlobDir(final Path source, blobProperty, source), blobLease != null ? blobLease.getLeaseID() : null, tracingContext); + renamedBlob.incrementAndGet(); } catch (AzureBlobFileSystemException e) { LOG.error(String.format("rename from %s to %s for blob %s failed", source, destination, blobProperty.getPath()), e); @@ -1668,11 +1671,13 @@ private void renameBlobDir(final Path source, } renameBlobExecutorService.shutdown(); + tracingContext.setRenameBlobCount(renamedBlob.get() + 1); renameBlob( blobPropOnSrc.getPath(), createDestinationPathForBlobPartOfRenameSrcDir(destination, blobPropOnSrc, source), srcDirBlobLease != null ? srcDirBlobLease.getLeaseID() : null, tracingContext); + tracingContext.setRenameBlobCount(null); } private Boolean isCreateOperationOnBlobEndpoint() { diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/utils/TracingContext.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/utils/TracingContext.java index 061efef985f362..c3814df49aa5a5 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/utils/TracingContext.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/utils/TracingContext.java @@ -68,6 +68,8 @@ public class TracingContext { public static final int MAX_CLIENT_CORRELATION_ID_LENGTH = 72; public static final String CLIENT_CORRELATION_ID_PATTERN = "[a-zA-Z0-9-]*"; + private Integer renameBlobCount = null; + /** * Initialize TracingContext * @param clientCorrelationID Provided over config by client @@ -174,6 +176,9 @@ public void constructHeader(AbfsHttpOperation httpOperation) { clientCorrelationID + ":" + clientRequestId + ":" + fileSystemID + ":" + primaryRequestId + ":" + streamID + ":" + opType + ":" + retryCount + ":" + fallbackDFSAppend; + if (renameBlobCount != null) { + header += (":" + renameBlobCount); + } break; case TWO_ID_FORMAT: header = clientCorrelationID + ":" + clientRequestId; @@ -198,4 +203,8 @@ public String getHeader() { public String getPrimaryRequestId() { return primaryRequestId; } + + public void setRenameBlobCount(Integer count) { + renameBlobCount = count; + } } From 740e3120ad69b8435304bae9ee8ea26801156e3d Mon Sep 17 00:00:00 2001 From: Pranav Saxena Date: Wed, 21 Jun 2023 02:33:02 -0700 Subject: [PATCH 04/18] added for deletion --- .../hadoop/fs/azurebfs/AzureBlobFileSystemStore.java | 12 +++++++++--- .../hadoop/fs/azurebfs/utils/TracingContext.java | 10 +++++----- 2 files changed, 14 insertions(+), 8 deletions(-) diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystemStore.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystemStore.java index 5f3db4a621c852..33465b47551de0 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystemStore.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystemStore.java @@ -1697,13 +1697,13 @@ private void renameBlobDir(final Path source, } renameBlobExecutorService.shutdown(); - tracingContext.setRenameBlobCount(renamedBlob.get() + 1); + tracingContext.setOperatedBlobCount(renamedBlob.get() + 1); renameBlob( source, createDestinationPathForBlobPartOfRenameSrcDir(destination, source, source), srcDirBlobLease, tracingContext); - tracingContext.setRenameBlobCount(null); + tracingContext.setOperatedBlobCount(null); } private Boolean isCreateOperationOnBlobEndpoint() { @@ -1828,7 +1828,7 @@ private void deleteBlobPath(final Path path, throw ex; } - BlobList blobList = client.getListBlobs(null, listSrc, null, + BlobList blobList = client.getListBlobs(null, listSrc, null, null, tracingContext).getResult() .getBlobList(); if (blobList.getBlobPropertyList().size() == 0) { @@ -1881,7 +1881,9 @@ private void deleteBlobPath(final Path path, } if (pathProperty != null && !pathProperty.getIsDirectory()) { + tracingContext.setOperatedBlobCount(1); client.deleteBlobPath(path, null, tracingContext); + tracingContext.setOperatedBlobCount(null); return; } @@ -1901,6 +1903,7 @@ private void deleteOnConsumedBlobs(final TracingContext tracingContext, final BlobProperty pathProperty, final ListBlobConsumer consumer, final Boolean recursive) throws IOException { + AtomicInteger deletedBlobCount = new AtomicInteger(0); while (!consumer.isCompleted()) { final List blobList = consumer.consume(); if (blobList == null) { @@ -1918,6 +1921,7 @@ private void deleteOnConsumedBlobs(final TracingContext tracingContext, futureList.add(deleteBlobExecutorService.submit(() -> { try { client.deleteBlobPath(blobProperty.getPath(), null, tracingContext); + deletedBlobCount.incrementAndGet(); } catch (AzureBlobFileSystemException ex) { if (ex instanceof AbfsRestOperationException && ((AbfsRestOperationException) ex).getStatusCode() @@ -1937,9 +1941,11 @@ private void deleteOnConsumedBlobs(final TracingContext tracingContext, } } } + tracingContext.setOperatedBlobCount(deletedBlobCount.get() + 1); if (pathProperty != null) { client.deleteBlobPath(pathProperty.getPath(), null, tracingContext); } + tracingContext.setOperatedBlobCount(null); } public FileStatus getFileStatus(final Path path, diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/utils/TracingContext.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/utils/TracingContext.java index b3b84cf997b19b..80642e759362ec 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/utils/TracingContext.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/utils/TracingContext.java @@ -78,7 +78,7 @@ public class TracingContext { public static final int MAX_CLIENT_CORRELATION_ID_LENGTH = 72; public static final String CLIENT_CORRELATION_ID_PATTERN = "[a-zA-Z0-9-]*"; - private Integer renameBlobCount = null; + private Integer operatedBlobCount = null; /** * Initialize TracingContext @@ -189,8 +189,8 @@ public void constructHeader(AbfsHttpOperation httpOperation, String previousFail + getPrimaryRequestIdForHeader(retryCount > 0) + ":" + streamID + ":" + opType + ":" + retryCount; header = addFailureReasons(header, previousFailure) + ":" + fallbackDFSAppend; - if (renameBlobCount != null) { - header += (":" + renameBlobCount); + if (operatedBlobCount != null) { + header += (":" + operatedBlobCount); } break; case TWO_ID_FORMAT: @@ -250,7 +250,7 @@ public String getPrimaryRequestId() { return primaryRequestId; } - public void setRenameBlobCount(Integer count) { - renameBlobCount = count; + public void setOperatedBlobCount(Integer count) { + operatedBlobCount = count; } } From 673b0f98bf032b8e0e80f917fe12fc28e5a26aa7 Mon Sep 17 00:00:00 2001 From: Pranav Saxena Date: Sun, 25 Jun 2023 23:22:58 -0700 Subject: [PATCH 05/18] test wip --- .../apache/hadoop/fs/azurebfs/utils/TracingContext.java | 4 ++++ .../hadoop/fs/azurebfs/ITestAzureBlobFileSystemDelete.java | 7 +++++++ 2 files changed, 11 insertions(+) diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/utils/TracingContext.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/utils/TracingContext.java index 80642e759362ec..3d68ff153138fe 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/utils/TracingContext.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/utils/TracingContext.java @@ -253,4 +253,8 @@ public String getPrimaryRequestId() { public void setOperatedBlobCount(Integer count) { operatedBlobCount = count; } + + public Integer getOperatedBlobCount() { + return operatedBlobCount; + } } diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemDelete.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemDelete.java index 894917b8d8a03d..58eb9db9c1f454 100644 --- a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemDelete.java +++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemDelete.java @@ -414,4 +414,11 @@ public void testDeleteCheckIfParentLMTChange() throws Exception { Long newLmt = fs.getFileStatus(new Path("/dir1")).getModificationTime(); Assertions.assertThat(lmt).isEqualTo(newLmt); } + + @Test + public void testDeleteEmitDeletionCountInClientRequestId() throws Exception { + AzureBlobFileSystem fs = getFileSystem(); + Assume.assumeTrue(getPrefixMode(fs) == PrefixMode.BLOB); + + } } From 70be95ed04158b3eb34d670a143a92fd6399bbee Mon Sep 17 00:00:00 2001 From: Pranav Saxena Date: Mon, 26 Jun 2023 23:05:30 -0700 Subject: [PATCH 06/18] testDeleteEmitDeletionCountInClientRequestId --- .../ITestAzureBlobFileSystemDelete.java | 38 ++++++++++++++++++- 1 file changed, 37 insertions(+), 1 deletion(-) diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemDelete.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemDelete.java index 58eb9db9c1f454..9431890da6617d 100644 --- a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemDelete.java +++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemDelete.java @@ -417,8 +417,44 @@ public void testDeleteCheckIfParentLMTChange() throws Exception { @Test public void testDeleteEmitDeletionCountInClientRequestId() throws Exception { - AzureBlobFileSystem fs = getFileSystem(); + AzureBlobFileSystem fs = Mockito.spy(getFileSystem()); Assume.assumeTrue(getPrefixMode(fs) == PrefixMode.BLOB); + String dirPathStr = "/testDir/dir1"; + fs.mkdirs(new Path(dirPathStr)); + ExecutorService executorService = Executors.newFixedThreadPool(5); + List futures = new ArrayList<>(); + for (int i = 0; i < 10; i++) { + final int iter = i; + Future future = executorService.submit(() -> { + return fs.create(new Path("/testDir/dir1/file" + iter)); + }); + futures.add(future); + } + + for (Future future : futures) { + future.get(); + } + executorService.shutdown(); + + + AzureBlobFileSystemStore store = Mockito.spy(fs.getAbfsStore()); + Mockito.doReturn(store).when(fs).getAbfsStore(); + AbfsClient client = Mockito.spy(store.getClient()); + store.setClient(client); + + Mockito.doAnswer(answer -> { + if (dirPathStr.equalsIgnoreCase( + ((Path) answer.getArgument(0)).toUri().getPath())) { + TracingContext tracingContext = answer.getArgument(2); + Assertions.assertThat(tracingContext.getOperatedBlobCount()) + .isEqualTo(11); + } + return answer.callRealMethod(); + }) + .when(client) + .deleteBlobPath(Mockito.any(Path.class), Mockito.nullable(String.class), + Mockito.any(TracingContext.class)); + fs.delete(new Path(dirPathStr), true); } } From 641a84bff352bf9b49193273d14a676f12c016a2 Mon Sep 17 00:00:00 2001 From: Pranav Saxena Date: Mon, 26 Jun 2023 23:32:14 -0700 Subject: [PATCH 07/18] testRenameSrcDirDeleteEmitDeletionCountInClientRequestId --- .../ITestAzureBlobFileSystemRename.java | 43 +++++++++++++++++++ 1 file changed, 43 insertions(+) diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemRename.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemRename.java index 64c7b19a21e7e8..e9639cf1f5ac95 100644 --- a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemRename.java +++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemRename.java @@ -2030,4 +2030,47 @@ public void testBlobAtomicRenameSrcAndDstAreNotLeftLeased() throws Exception { os.write(bytes); } } + + @Test + public void testRenameSrcDirDeleteEmitDeletionCountInClientRequestId() throws Exception { + AzureBlobFileSystem fs = Mockito.spy(getFileSystem()); + Assume.assumeTrue(getPrefixMode(fs) == PrefixMode.BLOB); + String dirPathStr = "/testDir/dir1"; + fs.mkdirs(new Path(dirPathStr)); + ExecutorService executorService = Executors.newFixedThreadPool(5); + List futures = new ArrayList<>(); + for (int i = 0; i < 10; i++) { + final int iter = i; + Future future = executorService.submit(() -> { + return fs.create(new Path("/testDir/dir1/file" + iter)); + }); + futures.add(future); + } + + for (Future future : futures) { + future.get(); + } + executorService.shutdown(); + + + AzureBlobFileSystemStore store = Mockito.spy(fs.getAbfsStore()); + Mockito.doReturn(store).when(fs).getAbfsStore(); + AbfsClient client = Mockito.spy(store.getClient()); + store.setClient(client); + + Mockito.doAnswer(answer -> { + if (dirPathStr.equalsIgnoreCase( + ((Path) answer.getArgument(0)).toUri().getPath())) { + TracingContext tracingContext = answer.getArgument(2); + Assertions.assertThat(tracingContext.getOperatedBlobCount()) + .isEqualTo(11); + } + return answer.callRealMethod(); + }) + .when(client) + .deleteBlobPath(Mockito.any(Path.class), Mockito.nullable(String.class), + Mockito.any(TracingContext.class)); + + fs.rename(new Path(dirPathStr), new Path("/dst/")); + } } From 609d65df8845f565f799db893ada128e6f53968e Mon Sep 17 00:00:00 2001 From: Pranav Saxena Date: Tue, 27 Jun 2023 03:58:03 -0700 Subject: [PATCH 08/18] refactor argument number and added assertion in testIfTracingContextPrimaryIdIsSameInAllTheStepsOfBlobRename --- .../apache/hadoop/fs/azurebfs/utils/TracingContext.java | 4 ++++ .../fs/azurebfs/ITestAzureBlobFileSystemRename.java | 9 ++++++--- 2 files changed, 10 insertions(+), 3 deletions(-) diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/utils/TracingContext.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/utils/TracingContext.java index 3d68ff153138fe..e2ab95aec91028 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/utils/TracingContext.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/utils/TracingContext.java @@ -257,4 +257,8 @@ public void setOperatedBlobCount(Integer count) { public Integer getOperatedBlobCount() { return operatedBlobCount; } + + public FSOperationType getOpType() { + return opType; + } } diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemRename.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemRename.java index e9639cf1f5ac95..22c8d734471bf9 100644 --- a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemRename.java +++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemRename.java @@ -1683,9 +1683,10 @@ public void testIfTracingContextPrimaryIdIsSameInAllTheStepsOfBlobRename() throw Mockito.doAnswer(answer -> { final TracingContext context = answer.getArgument(3); Mockito.doAnswer(listAnswer -> { - TracingContext listContext = listAnswer.getArgument(3); + TracingContext listContext = listAnswer.getArgument(4); Assert.assertEquals(listContext.getPrimaryRequestId(), context.getPrimaryRequestId()); + Assert.assertTrue(context.getOpType().equals(listContext.getOpType())); return listAnswer.callRealMethod(); }) .when(client) @@ -1694,9 +1695,10 @@ public void testIfTracingContextPrimaryIdIsSameInAllTheStepsOfBlobRename() throw Mockito.nullable(Integer.class), Mockito.any(TracingContext.class)); Mockito.doAnswer(copyAnswer -> { - TracingContext copyContext = copyAnswer.getArgument(2); + TracingContext copyContext = copyAnswer.getArgument(3); Assert.assertEquals(copyContext.getPrimaryRequestId(), context.getPrimaryRequestId()); + Assert.assertTrue(context.getOpType().equals(copyContext.getOpType())); return copyAnswer.callRealMethod(); }) .when(client) @@ -1704,9 +1706,10 @@ public void testIfTracingContextPrimaryIdIsSameInAllTheStepsOfBlobRename() throw Mockito.any(TracingContext.class)); Mockito.doAnswer(deleteAnswer -> { - TracingContext deleteContext = deleteAnswer.getArgument(1); + TracingContext deleteContext = deleteAnswer.getArgument(2); Assert.assertEquals(deleteContext.getPrimaryRequestId(), context.getPrimaryRequestId()); + Assert.assertTrue(context.getOpType().equals(deleteContext.getOpType())); return deleteAnswer.callRealMethod(); }) .when(client) From 4f3e47cd31c73c0bea95b74e3080a56bd4a21f72 Mon Sep 17 00:00:00 2001 From: Pranav Saxena Date: Tue, 27 Jun 2023 05:51:10 -0700 Subject: [PATCH 09/18] testIfTracingContextPrimaryIdIsSameInAllTheStepsOfBlobDelete --- .../ITestAzureBlobFileSystemDelete.java | 70 +++++++++++++++++++ 1 file changed, 70 insertions(+) diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemDelete.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemDelete.java index 9431890da6617d..1f1039b3076538 100644 --- a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemDelete.java +++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemDelete.java @@ -21,6 +21,7 @@ import java.io.FileNotFoundException; import java.io.IOException; import java.util.ArrayList; +import java.util.HashMap; import java.util.List; import java.util.concurrent.Callable; import java.util.concurrent.ExecutorService; @@ -42,6 +43,7 @@ import org.apache.hadoop.fs.azurebfs.services.AbfsRestOperation; import org.apache.hadoop.fs.azurebfs.services.PrefixMode; import org.apache.hadoop.fs.azurebfs.services.ITestAbfsClient; +import org.apache.hadoop.fs.azurebfs.services.RenameAtomicityUtils; import org.apache.hadoop.fs.azurebfs.services.TestAbfsPerfTracker; import org.apache.hadoop.fs.azurebfs.utils.TestMockHelpers; import org.apache.hadoop.fs.azurebfs.utils.TracingContext; @@ -457,4 +459,72 @@ public void testDeleteEmitDeletionCountInClientRequestId() throws Exception { fs.delete(new Path(dirPathStr), true); } + + @Test + public void testIfTracingContextPrimaryIdIsSameInAllTheStepsOfBlobDelete() + throws Exception { + AzureBlobFileSystem fs = Mockito.spy(getFileSystem()); + Assume.assumeTrue(fs.getAbfsStore().getPrefixMode() == PrefixMode.BLOB); + fs.mkdirs(new Path("/testDir/dir")); + fs.create(new Path("/testDir/dir/file1")); + fs.create(new Path("/testDir/dir/file2")); + + AzureBlobFileSystemStore store = fs.getAbfsStore(); + AzureBlobFileSystemStore spiedStore = Mockito.spy(store); + AbfsClient client = Mockito.spy(store.getClient()); + spiedStore.setClient(client); + + Mockito.doAnswer(answer -> { + final TracingContext context = answer.getArgument(2); + Mockito.doAnswer(listAnswer -> { + TracingContext listContext = listAnswer.getArgument(4); + Assert.assertEquals(listContext.getPrimaryRequestId(), + context.getPrimaryRequestId()); + Assert.assertTrue(context.getOpType().equals(listContext.getOpType())); + return listAnswer.callRealMethod(); + }) + .when(client) + .getListBlobs(Mockito.nullable(String.class), + Mockito.nullable(String.class), Mockito.nullable(String.class), + Mockito.nullable(Integer.class), + Mockito.any(TracingContext.class)); + + Mockito.doAnswer(createBlobPathAnswer -> { + TracingContext createBlobPathContext = createBlobPathAnswer.getArgument( + 5); + Assert.assertEquals(createBlobPathContext.getPrimaryRequestId(), + context.getPrimaryRequestId()); + Assert.assertTrue( + context.getOpType().equals(createBlobPathContext.getOpType())); + return createBlobPathAnswer.callRealMethod(); + }) + .when(client) + .createPathBlob(Mockito.anyString(), Mockito.anyBoolean(), + Mockito.anyBoolean(), Mockito.nullable( + HashMap.class), Mockito.nullable(String.class), + Mockito.any(TracingContext.class)); + + + Mockito.doAnswer(deleteAnswer -> { + TracingContext deleteContext = deleteAnswer.getArgument(2); + Assert.assertEquals(deleteContext.getPrimaryRequestId(), + context.getPrimaryRequestId()); + Assert.assertTrue( + context.getOpType().equals(deleteContext.getOpType())); + return deleteAnswer.callRealMethod(); + }) + .when(client) + .deleteBlobPath(Mockito.any(Path.class), + Mockito.nullable(String.class), + Mockito.any(TracingContext.class)); + + return answer.callRealMethod(); + }) + .when(spiedStore) + .delete(Mockito.any(Path.class), Mockito.anyBoolean(), + Mockito.any(TracingContext.class)); + + Mockito.doReturn(spiedStore).when(fs).getAbfsStore(); + fs.delete(new Path("/testDir/dir"), true); + } } From 643c10f5c5a82b00b49bf7450f0a059c586e5b1a Mon Sep 17 00:00:00 2001 From: Pranav Saxena Date: Tue, 27 Jun 2023 07:46:46 -0700 Subject: [PATCH 10/18] testBlobRenameResumeWithListStatus --- .../fs/azurebfs/AzureBlobFileSystem.java | 8 +-- .../ITestAzureBlobFileSystemRename.java | 66 +++++++++++++++++++ 2 files changed, 70 insertions(+), 4 deletions(-) diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystem.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystem.java index 2c96eaae4fb4bf..03b462f740628f 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystem.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystem.java @@ -926,18 +926,18 @@ public FileStatus[] listStatus(final Path f) throws IOException { TracingContext tracingContext = new TracingContext(clientCorrelationId, fileSystemId, FSOperationType.LISTSTATUS, true, tracingHeaderFormat, listener); - FileStatus[] result = abfsStore.listStatus(qualifiedPath, tracingContext); + FileStatus[] result = getAbfsStore().listStatus(qualifiedPath, tracingContext); if (getAbfsStore().getAbfsConfiguration().getPrefixMode() == PrefixMode.BLOB) { FileStatus renamePendingFileStatus - = abfsStore.getRenamePendingFileStatus(result); + = getAbfsStore().getRenamePendingFileStatus(result); if (renamePendingFileStatus != null) { RenameAtomicityUtils renameAtomicityUtils = new RenameAtomicityUtils(this, renamePendingFileStatus.getPath(), - abfsStore.getRedoRenameInvocation(tracingContext)); + getAbfsStore().getRedoRenameInvocation(tracingContext)); renameAtomicityUtils.cleanup(renamePendingFileStatus.getPath()); - result = abfsStore.listStatus(qualifiedPath, tracingContext); + result = getAbfsStore().listStatus(qualifiedPath, tracingContext); } } return result; diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemRename.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemRename.java index 22c8d734471bf9..b0691041fb97ca 100644 --- a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemRename.java +++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemRename.java @@ -44,6 +44,8 @@ import org.junit.Test; import org.mockito.Mock; import org.mockito.Mockito; +import org.mockito.invocation.InvocationOnMock; +import org.mockito.stubbing.Answer; import org.apache.hadoop.fs.FSDataInputStream; import org.apache.hadoop.fs.FSDataOutputStream; @@ -2076,4 +2078,68 @@ public void testRenameSrcDirDeleteEmitDeletionCountInClientRequestId() throws Ex fs.rename(new Path(dirPathStr), new Path("/dst/")); } + + @Test + public void testBlobRenameResumeWithListStatus() throws Exception { + assumeNonHnsAccountBlobEndpoint(getFileSystem()); + AzureBlobFileSystem fs = Mockito.spy(getFileSystem()); + AzureBlobFileSystemStore store = Mockito.spy(fs.getAbfsStore()); + + fs.mkdirs(new Path("/hbase/testDir")); + ExecutorService executorService = Executors.newFixedThreadPool(5); + List futures = new ArrayList<>(); + for (int i = 0; i < 10; i++) { + final int iter = i; + futures.add(executorService.submit( + () -> fs.create(new Path("/hbase/testDir/file" + iter)))); + } + + for (Future future : futures) { + future.get(); + } + + AbfsClient client = Mockito.spy(fs.getAbfsClient()); + Mockito.doReturn(store).when(fs).getAbfsStore(); + store.setClient(client); + + AbfsRestOperation op = client.acquireBlobLease("/hbase/testDir/file5", -1, + Mockito.mock(TracingContext.class)); + String leaseId = op.getResult() + .getResponseHeader(HttpHeaderConfigurations.X_MS_LEASE_ID); + intercept(Exception.class, () -> { + fs.rename(new Path("/hbase/testDir"), new Path("/hbase/testDir2")); + }); + client.releaseBlobLease("/hbase/testDir/file5", leaseId, + Mockito.mock(TracingContext.class)); + + TracingContext[] tracingContextCreatedInFsListStatus + = new TracingContext[1]; + Mockito.doAnswer(answer -> { + tracingContextCreatedInFsListStatus[0] = answer.getArgument(1); + return answer.callRealMethod(); + }) + .when(store) + .listStatus(Mockito.any(Path.class), Mockito.any(TracingContext.class)); + + AtomicInteger copied = new AtomicInteger(0); + Mockito.doAnswer(answer -> { + copied.incrementAndGet(); + TracingContext tracingContext = answer.getArgument(3); + Assertions.assertThat(tracingContext) + .isEqualTo(tracingContextCreatedInFsListStatus[0]); + Path path = answer.getArgument(0); + if ("/hbase/testDir".equalsIgnoreCase(path.toUri().getPath())) { + Assertions.assertThat(tracingContext.getOperatedBlobCount()) + .isEqualTo(copied.get()); + } + return answer.callRealMethod(); + }) + .when(store) + .copyBlob(Mockito.any(Path.class), Mockito.any(Path.class), + Mockito.nullable(String.class), Mockito.any(TracingContext.class)); + + fs.listStatus(new Path("/hbase")); + Assertions.assertThat(fs.exists(new Path("/hbase/testDir2"))).isTrue(); + Assertions.assertThat(copied.get()).isGreaterThan(0); + } } From 2e9f1abe25c1cab4d4996a3bc48c51e81b2eca8c Mon Sep 17 00:00:00 2001 From: Pranav Saxena Date: Tue, 27 Jun 2023 08:00:06 -0700 Subject: [PATCH 11/18] testBlobRenameResumeWithGetFileStatus --- .../fs/azurebfs/AzureBlobFileSystem.java | 8 +-- .../ITestAzureBlobFileSystemRename.java | 71 +++++++++++++++++++ 2 files changed, 75 insertions(+), 4 deletions(-) diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystem.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystem.java index 03b462f740628f..0d43697cdda828 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystem.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystem.java @@ -1070,17 +1070,17 @@ private FileStatus getFileStatus(final Path path, * Get File Status over Blob Endpoint will Have an additional call * to check if directory is implicit. */ - fileStatus = abfsStore.getFileStatus(qualifiedPath, + fileStatus = getAbfsStore().getFileStatus(qualifiedPath, tracingContext, useBlobEndpoint); if (getAbfsStore().getPrefixMode() == PrefixMode.BLOB && fileStatus != null && fileStatus.isDirectory() - && abfsStore.isAtomicRenameKey(fileStatus.getPath().toUri().getPath()) - && abfsStore.getRenamePendingFileStatusInDirectory(fileStatus, + && getAbfsStore().isAtomicRenameKey(fileStatus.getPath().toUri().getPath()) + && getAbfsStore().getRenamePendingFileStatusInDirectory(fileStatus, tracingContext)) { RenameAtomicityUtils renameAtomicityUtils = new RenameAtomicityUtils( this, new Path(fileStatus.getPath().toUri().getPath() + SUFFIX), - abfsStore.getRedoRenameInvocation(tracingContext)); + getAbfsStore().getRedoRenameInvocation(tracingContext)); renameAtomicityUtils.cleanup( new Path(fileStatus.getPath().toUri().getPath() + SUFFIX)); throw new AbfsRestOperationException(HttpURLConnection.HTTP_NOT_FOUND, diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemRename.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemRename.java index b0691041fb97ca..f9a54f16b19922 100644 --- a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemRename.java +++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemRename.java @@ -2142,4 +2142,75 @@ public void testBlobRenameResumeWithListStatus() throws Exception { Assertions.assertThat(fs.exists(new Path("/hbase/testDir2"))).isTrue(); Assertions.assertThat(copied.get()).isGreaterThan(0); } + + @Test + public void testBlobRenameResumeWithGetFileStatus() throws Exception { + assumeNonHnsAccountBlobEndpoint(getFileSystem()); + AzureBlobFileSystem fs = Mockito.spy(getFileSystem()); + AzureBlobFileSystemStore store = Mockito.spy(fs.getAbfsStore()); + + fs.mkdirs(new Path("/hbase/testDir")); + ExecutorService executorService = Executors.newFixedThreadPool(5); + List futures = new ArrayList<>(); + for (int i = 0; i < 10; i++) { + final int iter = i; + futures.add(executorService.submit( + () -> fs.create(new Path("/hbase/testDir/file" + iter)))); + } + + for (Future future : futures) { + future.get(); + } + + AbfsClient client = Mockito.spy(fs.getAbfsClient()); + Mockito.doReturn(store).when(fs).getAbfsStore(); + store.setClient(client); + + AbfsRestOperation op = client.acquireBlobLease("/hbase/testDir/file5", -1, + Mockito.mock(TracingContext.class)); + String leaseId = op.getResult() + .getResponseHeader(HttpHeaderConfigurations.X_MS_LEASE_ID); + intercept(Exception.class, () -> { + fs.rename(new Path("/hbase/testDir"), new Path("/hbase/testDir2")); + }); + client.releaseBlobLease("/hbase/testDir/file5", leaseId, + Mockito.mock(TracingContext.class)); + + TracingContext[] tracingContextCreatedInFsListStatus + = new TracingContext[1]; + Mockito.doAnswer(answer -> { + synchronized (this) { + if (tracingContextCreatedInFsListStatus[0] == null) { + tracingContextCreatedInFsListStatus[0] = answer.getArgument(1); + } + } + return answer.callRealMethod(); + }) + .when(store) + .getFileStatus(Mockito.any(Path.class), + Mockito.any(TracingContext.class), Mockito.anyBoolean()); + + AtomicInteger copied = new AtomicInteger(0); + Mockito.doAnswer(answer -> { + copied.incrementAndGet(); + TracingContext tracingContext = answer.getArgument(3); + Assertions.assertThat(tracingContext) + .isEqualTo(tracingContextCreatedInFsListStatus[0]); + Path path = answer.getArgument(0); + if ("/hbase/testDir".equalsIgnoreCase(path.toUri().getPath())) { + Assertions.assertThat(tracingContext.getOperatedBlobCount()) + .isEqualTo(copied.get()); + } + return answer.callRealMethod(); + }) + .when(store) + .copyBlob(Mockito.any(Path.class), Mockito.any(Path.class), + Mockito.nullable(String.class), Mockito.any(TracingContext.class)); + + intercept(FileNotFoundException.class, () -> { + fs.getFileStatus(new Path("/hbase/testDir")); + }); + Assertions.assertThat(fs.exists(new Path("/hbase/testDir2"))).isTrue(); + Assertions.assertThat(copied.get()).isGreaterThan(0); + } } From 1607a11971668e1af15514ad30a5acd407abff41 Mon Sep 17 00:00:00 2001 From: Pranav Saxena Date: Tue, 27 Jun 2023 08:11:25 -0700 Subject: [PATCH 12/18] require primary key in getFileStatus --- .../java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystem.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystem.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystem.java index 0d43697cdda828..f4c343cbdde401 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystem.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystem.java @@ -1048,7 +1048,7 @@ public synchronized void close() throws IOException { @Override public FileStatus getFileStatus(final Path f) throws IOException { TracingContext tracingContext = new TracingContext(clientCorrelationId, - fileSystemId, FSOperationType.GET_FILESTATUS, tracingHeaderFormat, + fileSystemId, FSOperationType.GET_FILESTATUS, true, tracingHeaderFormat, listener); return getFileStatus(f, tracingContext); } From 1d4b6baff69d615e4a56a585199f395a0ed1811c Mon Sep 17 00:00:00 2001 From: Pranav Saxena Date: Tue, 27 Jun 2023 21:22:34 -0700 Subject: [PATCH 13/18] release all lease after rename failure --- .../ITestAzureBlobFileSystemRename.java | 44 +++++++++++++++++++ 1 file changed, 44 insertions(+) diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemRename.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemRename.java index f9a54f16b19922..f91f34e7f9282c 100644 --- a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemRename.java +++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemRename.java @@ -2106,9 +2106,30 @@ public void testBlobRenameResumeWithListStatus() throws Exception { Mockito.mock(TracingContext.class)); String leaseId = op.getResult() .getResponseHeader(HttpHeaderConfigurations.X_MS_LEASE_ID); + Map pathLeaseIdMap = new HashMap<>(); + Mockito.doAnswer(answer -> { + AbfsRestOperation abfsRestOperation + = (AbfsRestOperation) answer.callRealMethod(); + pathLeaseIdMap.put(answer.getArgument(0), + abfsRestOperation.getResult().getResponseHeader(X_MS_LEASE_ID)); + return abfsRestOperation; + }) + .when(client) + .acquireBlobLease(Mockito.anyString(), Mockito.anyInt(), + Mockito.any(TracingContext.class)); + intercept(Exception.class, () -> { fs.rename(new Path("/hbase/testDir"), new Path("/hbase/testDir2")); }); + + for (Map.Entry entry : pathLeaseIdMap.entrySet()) { + try { + client.releaseBlobLease(entry.getKey(), entry.getValue(), + Mockito.mock(TracingContext.class)); + } catch (Exception e) { + + } + } client.releaseBlobLease("/hbase/testDir/file5", leaseId, Mockito.mock(TracingContext.class)); @@ -2170,12 +2191,35 @@ public void testBlobRenameResumeWithGetFileStatus() throws Exception { Mockito.mock(TracingContext.class)); String leaseId = op.getResult() .getResponseHeader(HttpHeaderConfigurations.X_MS_LEASE_ID); + + Map pathLeaseIdMap = new HashMap<>(); + Mockito.doAnswer(answer -> { + AbfsRestOperation abfsRestOperation + = (AbfsRestOperation) answer.callRealMethod(); + pathLeaseIdMap.put(answer.getArgument(0), + abfsRestOperation.getResult().getResponseHeader(X_MS_LEASE_ID)); + return abfsRestOperation; + }) + .when(client) + .acquireBlobLease(Mockito.anyString(), Mockito.anyInt(), + Mockito.any(TracingContext.class)); + intercept(Exception.class, () -> { fs.rename(new Path("/hbase/testDir"), new Path("/hbase/testDir2")); }); + + for (Map.Entry entry : pathLeaseIdMap.entrySet()) { + try { + client.releaseBlobLease(entry.getKey(), entry.getValue(), + Mockito.mock(TracingContext.class)); + } catch (Exception e) { + + } + } client.releaseBlobLease("/hbase/testDir/file5", leaseId, Mockito.mock(TracingContext.class)); + TracingContext[] tracingContextCreatedInFsListStatus = new TracingContext[1]; Mockito.doAnswer(answer -> { From 460fd867e1ab93693445f56ba2c331bcc4c9f953 Mon Sep 17 00:00:00 2001 From: Pranav Saxena Date: Tue, 27 Jun 2023 22:26:47 -0700 Subject: [PATCH 14/18] concurrency in lease release --- .../ITestAzureBlobFileSystemRename.java | 17 +++++++++++++++-- 1 file changed, 15 insertions(+), 2 deletions(-) diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemRename.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemRename.java index f91f34e7f9282c..06b5b02de2c27f 100644 --- a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemRename.java +++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemRename.java @@ -28,6 +28,7 @@ import java.util.Map; import java.util.Random; import java.util.concurrent.Callable; +import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Future; @@ -2106,8 +2107,12 @@ public void testBlobRenameResumeWithListStatus() throws Exception { Mockito.mock(TracingContext.class)); String leaseId = op.getResult() .getResponseHeader(HttpHeaderConfigurations.X_MS_LEASE_ID); - Map pathLeaseIdMap = new HashMap<>(); + Map pathLeaseIdMap = new ConcurrentHashMap<>(); + final AtomicBoolean leaseCanBeTaken = new AtomicBoolean(true); Mockito.doAnswer(answer -> { + if (!leaseCanBeTaken.get()) { + throw new RuntimeException(); + } AbfsRestOperation abfsRestOperation = (AbfsRestOperation) answer.callRealMethod(); pathLeaseIdMap.put(answer.getArgument(0), @@ -2122,6 +2127,7 @@ public void testBlobRenameResumeWithListStatus() throws Exception { fs.rename(new Path("/hbase/testDir"), new Path("/hbase/testDir2")); }); + leaseCanBeTaken.set(false); for (Map.Entry entry : pathLeaseIdMap.entrySet()) { try { client.releaseBlobLease(entry.getKey(), entry.getValue(), @@ -2132,6 +2138,7 @@ public void testBlobRenameResumeWithListStatus() throws Exception { } client.releaseBlobLease("/hbase/testDir/file5", leaseId, Mockito.mock(TracingContext.class)); + leaseCanBeTaken.set(true); TracingContext[] tracingContextCreatedInFsListStatus = new TracingContext[1]; @@ -2192,8 +2199,12 @@ public void testBlobRenameResumeWithGetFileStatus() throws Exception { String leaseId = op.getResult() .getResponseHeader(HttpHeaderConfigurations.X_MS_LEASE_ID); - Map pathLeaseIdMap = new HashMap<>(); + Map pathLeaseIdMap = new ConcurrentHashMap<>(); + AtomicBoolean leaseCanBeTaken = new AtomicBoolean(true); Mockito.doAnswer(answer -> { + if (!leaseCanBeTaken.get()) { + throw new RuntimeException(); + } AbfsRestOperation abfsRestOperation = (AbfsRestOperation) answer.callRealMethod(); pathLeaseIdMap.put(answer.getArgument(0), @@ -2208,6 +2219,7 @@ public void testBlobRenameResumeWithGetFileStatus() throws Exception { fs.rename(new Path("/hbase/testDir"), new Path("/hbase/testDir2")); }); + leaseCanBeTaken.set(false); for (Map.Entry entry : pathLeaseIdMap.entrySet()) { try { client.releaseBlobLease(entry.getKey(), entry.getValue(), @@ -2218,6 +2230,7 @@ public void testBlobRenameResumeWithGetFileStatus() throws Exception { } client.releaseBlobLease("/hbase/testDir/file5", leaseId, Mockito.mock(TracingContext.class)); + leaseCanBeTaken.set(true); TracingContext[] tracingContextCreatedInFsListStatus From c35c71473fa553b6d96774ff4634b0baf7c3b698 Mon Sep 17 00:00:00 2001 From: Pranav Saxena Date: Tue, 27 Jun 2023 23:38:12 -0700 Subject: [PATCH 15/18] remove non-required imports --- .../hadoop/fs/azurebfs/ITestAzureBlobFileSystemDelete.java | 1 - .../hadoop/fs/azurebfs/ITestAzureBlobFileSystemRename.java | 2 -- 2 files changed, 3 deletions(-) diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemDelete.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemDelete.java index 1f1039b3076538..427fe57fbf526c 100644 --- a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemDelete.java +++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemDelete.java @@ -43,7 +43,6 @@ import org.apache.hadoop.fs.azurebfs.services.AbfsRestOperation; import org.apache.hadoop.fs.azurebfs.services.PrefixMode; import org.apache.hadoop.fs.azurebfs.services.ITestAbfsClient; -import org.apache.hadoop.fs.azurebfs.services.RenameAtomicityUtils; import org.apache.hadoop.fs.azurebfs.services.TestAbfsPerfTracker; import org.apache.hadoop.fs.azurebfs.utils.TestMockHelpers; import org.apache.hadoop.fs.azurebfs.utils.TracingContext; diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemRename.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemRename.java index 06b5b02de2c27f..1582f81ae45bee 100644 --- a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemRename.java +++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemRename.java @@ -45,8 +45,6 @@ import org.junit.Test; import org.mockito.Mock; import org.mockito.Mockito; -import org.mockito.invocation.InvocationOnMock; -import org.mockito.stubbing.Answer; import org.apache.hadoop.fs.FSDataInputStream; import org.apache.hadoop.fs.FSDataOutputStream; From 8bde5daed09a94f17d15e0de4cb9cfb90f39a211 Mon Sep 17 00:00:00 2001 From: Pranav Saxena Date: Wed, 28 Jun 2023 03:23:49 -0700 Subject: [PATCH 16/18] added javadocs on the added tests. --- .../ITestAzureBlobFileSystemDelete.java | 6 ++++++ .../ITestAzureBlobFileSystemRename.java | 18 ++++++++++++++++++ 2 files changed, 24 insertions(+) diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemDelete.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemDelete.java index 427fe57fbf526c..ad29224082dab7 100644 --- a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemDelete.java +++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemDelete.java @@ -459,6 +459,12 @@ public void testDeleteEmitDeletionCountInClientRequestId() throws Exception { fs.delete(new Path(dirPathStr), true); } + /** + * Test to assert that the CID in src marker delete contains the + * total number of blobs operated in the delete directory. + * Also, to assert that all operations in the delete-directory flow have same + * primaryId and opType. + */ @Test public void testIfTracingContextPrimaryIdIsSameInAllTheStepsOfBlobDelete() throws Exception { diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemRename.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemRename.java index 1582f81ae45bee..9bc91ffaa20810 100644 --- a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemRename.java +++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemRename.java @@ -2035,6 +2035,12 @@ public void testBlobAtomicRenameSrcAndDstAreNotLeftLeased() throws Exception { } } + /** + * Test to assert that the CID in src marker blob copy and delete contains the + * total number of blobs operated in the rename directory. + * Also, to assert that all operations in the rename-directory flow have same + * primaryId and opType. + */ @Test public void testRenameSrcDirDeleteEmitDeletionCountInClientRequestId() throws Exception { AzureBlobFileSystem fs = Mockito.spy(getFileSystem()); @@ -2078,6 +2084,12 @@ public void testRenameSrcDirDeleteEmitDeletionCountInClientRequestId() throws Ex fs.rename(new Path(dirPathStr), new Path("/dst/")); } + /** + * Test to assert that the rename resume from FileStatus uses the same + * {@link TracingContext} object used by the initial ListStatus call. + * Also assert that last rename's copy and delete API call would push count + * of blobs operated in resume flow in clientRequestId. + */ @Test public void testBlobRenameResumeWithListStatus() throws Exception { assumeNonHnsAccountBlobEndpoint(getFileSystem()); @@ -2169,6 +2181,12 @@ public void testBlobRenameResumeWithListStatus() throws Exception { Assertions.assertThat(copied.get()).isGreaterThan(0); } + /** + * Test to assert that the rename resume from FileStatus uses the same + * {@link TracingContext} object used by the initial GetFileStatus call. + * Also assert that last rename's copy and delete API call would push count + * of blobs operated in resume flow in clientRequestId. + */ @Test public void testBlobRenameResumeWithGetFileStatus() throws Exception { assumeNonHnsAccountBlobEndpoint(getFileSystem()); From 024b451e33d0a01ed8de61b2186bc4e8381b7798 Mon Sep 17 00:00:00 2001 From: Pranav Saxena Date: Wed, 28 Jun 2023 22:08:26 -0700 Subject: [PATCH 17/18] Refactor to bring out common functionality into methods; aggregate similar tests --- .../ITestAzureBlobFileSystemDelete.java | 66 ++--- .../ITestAzureBlobFileSystemRename.java | 261 +++++++----------- 2 files changed, 126 insertions(+), 201 deletions(-) diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemDelete.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemDelete.java index ad29224082dab7..6c3272a90fc54f 100644 --- a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemDelete.java +++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemDelete.java @@ -416,6 +416,12 @@ public void testDeleteCheckIfParentLMTChange() throws Exception { Assertions.assertThat(lmt).isEqualTo(newLmt); } + /** + * Test to assert that the CID in src marker delete contains the + * total number of blobs operated in the delete directory. + * Also, to assert that all operations in the delete-directory flow have same + * primaryId and opType. + */ @Test public void testDeleteEmitDeletionCountInClientRequestId() throws Exception { AzureBlobFileSystem fs = Mockito.spy(getFileSystem()); @@ -443,42 +449,6 @@ public void testDeleteEmitDeletionCountInClientRequestId() throws Exception { AbfsClient client = Mockito.spy(store.getClient()); store.setClient(client); - Mockito.doAnswer(answer -> { - if (dirPathStr.equalsIgnoreCase( - ((Path) answer.getArgument(0)).toUri().getPath())) { - TracingContext tracingContext = answer.getArgument(2); - Assertions.assertThat(tracingContext.getOperatedBlobCount()) - .isEqualTo(11); - } - return answer.callRealMethod(); - }) - .when(client) - .deleteBlobPath(Mockito.any(Path.class), Mockito.nullable(String.class), - Mockito.any(TracingContext.class)); - - fs.delete(new Path(dirPathStr), true); - } - - /** - * Test to assert that the CID in src marker delete contains the - * total number of blobs operated in the delete directory. - * Also, to assert that all operations in the delete-directory flow have same - * primaryId and opType. - */ - @Test - public void testIfTracingContextPrimaryIdIsSameInAllTheStepsOfBlobDelete() - throws Exception { - AzureBlobFileSystem fs = Mockito.spy(getFileSystem()); - Assume.assumeTrue(fs.getAbfsStore().getPrefixMode() == PrefixMode.BLOB); - fs.mkdirs(new Path("/testDir/dir")); - fs.create(new Path("/testDir/dir/file1")); - fs.create(new Path("/testDir/dir/file2")); - - AzureBlobFileSystemStore store = fs.getAbfsStore(); - AzureBlobFileSystemStore spiedStore = Mockito.spy(store); - AbfsClient client = Mockito.spy(store.getClient()); - spiedStore.setClient(client); - Mockito.doAnswer(answer -> { final TracingContext context = answer.getArgument(2); Mockito.doAnswer(listAnswer -> { @@ -516,6 +486,12 @@ public void testIfTracingContextPrimaryIdIsSameInAllTheStepsOfBlobDelete() context.getPrimaryRequestId()); Assert.assertTrue( context.getOpType().equals(deleteContext.getOpType())); + if (dirPathStr.equalsIgnoreCase( + ((Path) deleteAnswer.getArgument(0)).toUri().getPath())) { + TracingContext tracingContext = deleteAnswer.getArgument(2); + Assertions.assertThat(tracingContext.getOperatedBlobCount()) + .isEqualTo(11); + } return deleteAnswer.callRealMethod(); }) .when(client) @@ -525,11 +501,23 @@ public void testIfTracingContextPrimaryIdIsSameInAllTheStepsOfBlobDelete() return answer.callRealMethod(); }) - .when(spiedStore) + .when(store) .delete(Mockito.any(Path.class), Mockito.anyBoolean(), Mockito.any(TracingContext.class)); - Mockito.doReturn(spiedStore).when(fs).getAbfsStore(); - fs.delete(new Path("/testDir/dir"), true); + Mockito.doAnswer(answer -> { + if (dirPathStr.equalsIgnoreCase( + ((Path) answer.getArgument(0)).toUri().getPath())) { + TracingContext tracingContext = answer.getArgument(2); + Assertions.assertThat(tracingContext.getOperatedBlobCount()) + .isEqualTo(11); + } + return answer.callRealMethod(); + }) + .when(client) + .deleteBlobPath(Mockito.any(Path.class), Mockito.nullable(String.class), + Mockito.any(TracingContext.class)); + + fs.delete(new Path(dirPathStr), true); } } diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemRename.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemRename.java index 9bc91ffaa20810..518b8801a238bf 100644 --- a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemRename.java +++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemRename.java @@ -1668,66 +1668,6 @@ public void testCopyAfterSourceHasBeenDeleted() throws Exception { Assert.assertTrue(srcBlobNotFoundExReceived); } - @Test - public void testIfTracingContextPrimaryIdIsSameInAllTheStepsOfBlobRename() throws Exception { - assumeNonHnsAccountBlobEndpoint(getFileSystem()); - AzureBlobFileSystem fs = Mockito.spy(getFileSystem()); - fs.mkdirs(new Path("/dir")); - fs.create(new Path("/dir/file1")); - fs.create(new Path("/dir/file2")); - - AzureBlobFileSystemStore store = fs.getAbfsStore(); - AzureBlobFileSystemStore spiedStore = Mockito.spy(store); - AbfsClient client = Mockito.spy(store.getClient()); - spiedStore.setClient(client); - - Mockito.doAnswer(answer -> { - final TracingContext context = answer.getArgument(3); - Mockito.doAnswer(listAnswer -> { - TracingContext listContext = listAnswer.getArgument(4); - Assert.assertEquals(listContext.getPrimaryRequestId(), - context.getPrimaryRequestId()); - Assert.assertTrue(context.getOpType().equals(listContext.getOpType())); - return listAnswer.callRealMethod(); - }) - .when(client) - .getListBlobs(Mockito.nullable(String.class), - Mockito.nullable(String.class), Mockito.nullable(String.class), - Mockito.nullable(Integer.class), Mockito.any(TracingContext.class)); - - Mockito.doAnswer(copyAnswer -> { - TracingContext copyContext = copyAnswer.getArgument(3); - Assert.assertEquals(copyContext.getPrimaryRequestId(), - context.getPrimaryRequestId()); - Assert.assertTrue(context.getOpType().equals(copyContext.getOpType())); - return copyAnswer.callRealMethod(); - }) - .when(client) - .copyBlob(Mockito.any(Path.class), Mockito.any(Path.class), Mockito.nullable(String.class), - Mockito.any(TracingContext.class)); - - Mockito.doAnswer(deleteAnswer -> { - TracingContext deleteContext = deleteAnswer.getArgument(2); - Assert.assertEquals(deleteContext.getPrimaryRequestId(), - context.getPrimaryRequestId()); - Assert.assertTrue(context.getOpType().equals(deleteContext.getOpType())); - return deleteAnswer.callRealMethod(); - }) - .when(client) - .deleteBlobPath(Mockito.any(Path.class), Mockito.nullable(String.class), - Mockito.any(TracingContext.class)); - - return answer.callRealMethod(); - }) - .when(spiedStore) - .rename(Mockito.any(Path.class), Mockito.any(Path.class), - Mockito.any( - RenameAtomicityUtils.class), Mockito.any(TracingContext.class)); - - Mockito.doReturn(spiedStore).when(fs).getAbfsStore(); - fs.rename(new Path("/dir"), new Path("/dir1")); - } - @Test public void testParallelRenameForAtomicDirShouldFail() throws Exception { AzureBlobFileSystem fs = Mockito.spy(getFileSystem()); @@ -2042,7 +1982,8 @@ public void testBlobAtomicRenameSrcAndDstAreNotLeftLeased() throws Exception { * primaryId and opType. */ @Test - public void testRenameSrcDirDeleteEmitDeletionCountInClientRequestId() throws Exception { + public void testRenameSrcDirDeleteEmitDeletionCountInClientRequestId() + throws Exception { AzureBlobFileSystem fs = Mockito.spy(getFileSystem()); Assume.assumeTrue(getPrefixMode(fs) == PrefixMode.BLOB); String dirPathStr = "/testDir/dir1"; @@ -2069,17 +2010,57 @@ public void testRenameSrcDirDeleteEmitDeletionCountInClientRequestId() throws Ex store.setClient(client); Mockito.doAnswer(answer -> { - if (dirPathStr.equalsIgnoreCase( - ((Path) answer.getArgument(0)).toUri().getPath())) { - TracingContext tracingContext = answer.getArgument(2); - Assertions.assertThat(tracingContext.getOperatedBlobCount()) - .isEqualTo(11); - } + final TracingContext context = answer.getArgument(3); + Mockito.doAnswer(listAnswer -> { + TracingContext listContext = listAnswer.getArgument(4); + Assert.assertEquals(listContext.getPrimaryRequestId(), + context.getPrimaryRequestId()); + Assert.assertTrue(context.getOpType().equals(listContext.getOpType())); + return listAnswer.callRealMethod(); + }) + .when(client) + .getListBlobs(Mockito.nullable(String.class), + Mockito.nullable(String.class), Mockito.nullable(String.class), + Mockito.nullable(Integer.class), + Mockito.any(TracingContext.class)); + + Mockito.doAnswer(copyAnswer -> { + TracingContext copyContext = copyAnswer.getArgument(3); + Assert.assertEquals(copyContext.getPrimaryRequestId(), + context.getPrimaryRequestId()); + Assert.assertTrue(context.getOpType().equals(copyContext.getOpType())); + return copyAnswer.callRealMethod(); + }) + .when(client) + .copyBlob(Mockito.any(Path.class), Mockito.any(Path.class), + Mockito.nullable(String.class), + Mockito.any(TracingContext.class)); + + Mockito.doAnswer(deleteAnswer -> { + TracingContext deleteContext = deleteAnswer.getArgument(2); + Assert.assertEquals(deleteContext.getPrimaryRequestId(), + context.getPrimaryRequestId()); + Assert.assertTrue( + context.getOpType().equals(deleteContext.getOpType())); + if (dirPathStr.equalsIgnoreCase( + ((Path) deleteAnswer.getArgument(0)).toUri().getPath())) { + TracingContext tracingContext = deleteAnswer.getArgument(2); + Assertions.assertThat(tracingContext.getOperatedBlobCount()) + .isEqualTo(11); + } + return deleteAnswer.callRealMethod(); + }) + .when(client) + .deleteBlobPath(Mockito.any(Path.class), + Mockito.nullable(String.class), + Mockito.any(TracingContext.class)); + return answer.callRealMethod(); }) - .when(client) - .deleteBlobPath(Mockito.any(Path.class), Mockito.nullable(String.class), - Mockito.any(TracingContext.class)); + .when(store) + .rename(Mockito.any(Path.class), Mockito.any(Path.class), + Mockito.any( + RenameAtomicityUtils.class), Mockito.any(TracingContext.class)); fs.rename(new Path(dirPathStr), new Path("/dst/")); } @@ -2095,60 +2076,11 @@ public void testBlobRenameResumeWithListStatus() throws Exception { assumeNonHnsAccountBlobEndpoint(getFileSystem()); AzureBlobFileSystem fs = Mockito.spy(getFileSystem()); AzureBlobFileSystemStore store = Mockito.spy(fs.getAbfsStore()); - - fs.mkdirs(new Path("/hbase/testDir")); - ExecutorService executorService = Executors.newFixedThreadPool(5); - List futures = new ArrayList<>(); - for (int i = 0; i < 10; i++) { - final int iter = i; - futures.add(executorService.submit( - () -> fs.create(new Path("/hbase/testDir/file" + iter)))); - } - - for (Future future : futures) { - future.get(); - } - AbfsClient client = Mockito.spy(fs.getAbfsClient()); Mockito.doReturn(store).when(fs).getAbfsStore(); store.setClient(client); - AbfsRestOperation op = client.acquireBlobLease("/hbase/testDir/file5", -1, - Mockito.mock(TracingContext.class)); - String leaseId = op.getResult() - .getResponseHeader(HttpHeaderConfigurations.X_MS_LEASE_ID); - Map pathLeaseIdMap = new ConcurrentHashMap<>(); - final AtomicBoolean leaseCanBeTaken = new AtomicBoolean(true); - Mockito.doAnswer(answer -> { - if (!leaseCanBeTaken.get()) { - throw new RuntimeException(); - } - AbfsRestOperation abfsRestOperation - = (AbfsRestOperation) answer.callRealMethod(); - pathLeaseIdMap.put(answer.getArgument(0), - abfsRestOperation.getResult().getResponseHeader(X_MS_LEASE_ID)); - return abfsRestOperation; - }) - .when(client) - .acquireBlobLease(Mockito.anyString(), Mockito.anyInt(), - Mockito.any(TracingContext.class)); - - intercept(Exception.class, () -> { - fs.rename(new Path("/hbase/testDir"), new Path("/hbase/testDir2")); - }); - - leaseCanBeTaken.set(false); - for (Map.Entry entry : pathLeaseIdMap.entrySet()) { - try { - client.releaseBlobLease(entry.getKey(), entry.getValue(), - Mockito.mock(TracingContext.class)); - } catch (Exception e) { - - } - } - client.releaseBlobLease("/hbase/testDir/file5", leaseId, - Mockito.mock(TracingContext.class)); - leaseCanBeTaken.set(true); + renameFailureSetup(fs, client); TracingContext[] tracingContextCreatedInFsListStatus = new TracingContext[1]; @@ -2192,7 +2124,54 @@ public void testBlobRenameResumeWithGetFileStatus() throws Exception { assumeNonHnsAccountBlobEndpoint(getFileSystem()); AzureBlobFileSystem fs = Mockito.spy(getFileSystem()); AzureBlobFileSystemStore store = Mockito.spy(fs.getAbfsStore()); + AbfsClient client = Mockito.spy(fs.getAbfsClient()); + Mockito.doReturn(store).when(fs).getAbfsStore(); + store.setClient(client); + renameFailureSetup(fs, client); + + + TracingContext[] tracingContextCreatedInFsListStatus + = new TracingContext[1]; + Mockito.doAnswer(answer -> { + synchronized (this) { + if (tracingContextCreatedInFsListStatus[0] == null) { + tracingContextCreatedInFsListStatus[0] = answer.getArgument(1); + } + } + return answer.callRealMethod(); + }) + .when(store) + .getFileStatus(Mockito.any(Path.class), + Mockito.any(TracingContext.class), Mockito.anyBoolean()); + + AtomicInteger copied = new AtomicInteger(0); + Mockito.doAnswer(answer -> { + copied.incrementAndGet(); + TracingContext tracingContext = answer.getArgument(3); + Assertions.assertThat(tracingContext) + .isEqualTo(tracingContextCreatedInFsListStatus[0]); + Path path = answer.getArgument(0); + if ("/hbase/testDir".equalsIgnoreCase(path.toUri().getPath())) { + Assertions.assertThat(tracingContext.getOperatedBlobCount()) + .isEqualTo(copied.get()); + } + return answer.callRealMethod(); + }) + .when(store) + .copyBlob(Mockito.any(Path.class), Mockito.any(Path.class), + Mockito.nullable(String.class), Mockito.any(TracingContext.class)); + + intercept(FileNotFoundException.class, () -> { + fs.getFileStatus(new Path("/hbase/testDir")); + }); + Assertions.assertThat(fs.exists(new Path("/hbase/testDir2"))).isTrue(); + Assertions.assertThat(copied.get()).isGreaterThan(0); + } + + private void renameFailureSetup(final AzureBlobFileSystem fs, + final AbfsClient client) + throws Exception { fs.mkdirs(new Path("/hbase/testDir")); ExecutorService executorService = Executors.newFixedThreadPool(5); List futures = new ArrayList<>(); @@ -2206,10 +2185,6 @@ public void testBlobRenameResumeWithGetFileStatus() throws Exception { future.get(); } - AbfsClient client = Mockito.spy(fs.getAbfsClient()); - Mockito.doReturn(store).when(fs).getAbfsStore(); - store.setClient(client); - AbfsRestOperation op = client.acquireBlobLease("/hbase/testDir/file5", -1, Mockito.mock(TracingContext.class)); String leaseId = op.getResult() @@ -2247,43 +2222,5 @@ public void testBlobRenameResumeWithGetFileStatus() throws Exception { client.releaseBlobLease("/hbase/testDir/file5", leaseId, Mockito.mock(TracingContext.class)); leaseCanBeTaken.set(true); - - - TracingContext[] tracingContextCreatedInFsListStatus - = new TracingContext[1]; - Mockito.doAnswer(answer -> { - synchronized (this) { - if (tracingContextCreatedInFsListStatus[0] == null) { - tracingContextCreatedInFsListStatus[0] = answer.getArgument(1); - } - } - return answer.callRealMethod(); - }) - .when(store) - .getFileStatus(Mockito.any(Path.class), - Mockito.any(TracingContext.class), Mockito.anyBoolean()); - - AtomicInteger copied = new AtomicInteger(0); - Mockito.doAnswer(answer -> { - copied.incrementAndGet(); - TracingContext tracingContext = answer.getArgument(3); - Assertions.assertThat(tracingContext) - .isEqualTo(tracingContextCreatedInFsListStatus[0]); - Path path = answer.getArgument(0); - if ("/hbase/testDir".equalsIgnoreCase(path.toUri().getPath())) { - Assertions.assertThat(tracingContext.getOperatedBlobCount()) - .isEqualTo(copied.get()); - } - return answer.callRealMethod(); - }) - .when(store) - .copyBlob(Mockito.any(Path.class), Mockito.any(Path.class), - Mockito.nullable(String.class), Mockito.any(TracingContext.class)); - - intercept(FileNotFoundException.class, () -> { - fs.getFileStatus(new Path("/hbase/testDir")); - }); - Assertions.assertThat(fs.exists(new Path("/hbase/testDir2"))).isTrue(); - Assertions.assertThat(copied.get()).isGreaterThan(0); } } From a943c316037453553e7a290b857df4f383b33e1b Mon Sep 17 00:00:00 2001 From: Pranav Saxena <108325433+saxenapranav@users.noreply.github.com> Date: Thu, 29 Jun 2023 03:51:05 -0700 Subject: [PATCH 18/18] Abfs 3.3.2 dev tc review (#16) * tracingContextValidator impl addition * testBlobRenameResumeWithListStatus * Done for getFileStatus as well * general refactor * javadoc; updateRequestId --- .../fs/azurebfs/AzureBlobFileSystem.java | 19 +- .../ITestAzureBlobFileSystemDelete.java | 61 +----- .../ITestAzureBlobFileSystemRename.java | 175 +++++++++--------- .../utils/TracingHeaderValidator.java | 27 ++- 4 files changed, 136 insertions(+), 146 deletions(-) diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystem.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystem.java index f4c343cbdde401..1b9c55440e586d 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystem.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystem.java @@ -871,7 +871,7 @@ public boolean delete(final Path f, final boolean recursive) throws IOException statIncrement(CALL_DELETE); Path qualifiedPath = makeQualified(f); TracingContext tracingContext = new TracingContext(clientCorrelationId, - fileSystemId, FSOperationType.DELETE, tracingHeaderFormat, + fileSystemId, FSOperationType.DELETE, true, tracingHeaderFormat, listener); if (shouldRedirect(FSOperationType.DELETE, tracingContext)) { @@ -933,9 +933,8 @@ public FileStatus[] listStatus(final Path f) throws IOException { = getAbfsStore().getRenamePendingFileStatus(result); if (renamePendingFileStatus != null) { RenameAtomicityUtils renameAtomicityUtils = - new RenameAtomicityUtils(this, - renamePendingFileStatus.getPath(), - getAbfsStore().getRedoRenameInvocation(tracingContext)); + getRenameAtomicityUtilsForRedo(renamePendingFileStatus.getPath(), + tracingContext); renameAtomicityUtils.cleanup(renamePendingFileStatus.getPath()); result = getAbfsStore().listStatus(qualifiedPath, tracingContext); } @@ -947,6 +946,13 @@ public FileStatus[] listStatus(final Path f) throws IOException { } } + RenameAtomicityUtils getRenameAtomicityUtilsForRedo(final Path renamePendingFileStatus, + final TracingContext tracingContext) throws IOException { + return new RenameAtomicityUtils(this, + renamePendingFileStatus, + getAbfsStore().getRedoRenameInvocation(tracingContext)); + } + /** * Increment of an Abfs statistic. * @@ -1077,10 +1083,9 @@ private FileStatus getFileStatus(final Path path, && getAbfsStore().isAtomicRenameKey(fileStatus.getPath().toUri().getPath()) && getAbfsStore().getRenamePendingFileStatusInDirectory(fileStatus, tracingContext)) { - RenameAtomicityUtils renameAtomicityUtils = new RenameAtomicityUtils( - this, + RenameAtomicityUtils renameAtomicityUtils = getRenameAtomicityUtilsForRedo( new Path(fileStatus.getPath().toUri().getPath() + SUFFIX), - getAbfsStore().getRedoRenameInvocation(tracingContext)); + tracingContext); renameAtomicityUtils.cleanup( new Path(fileStatus.getPath().toUri().getPath() + SUFFIX)); throw new AbfsRestOperationException(HttpURLConnection.HTTP_NOT_FOUND, diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemDelete.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemDelete.java index 6c3272a90fc54f..00a3048c000ebd 100644 --- a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemDelete.java +++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemDelete.java @@ -449,48 +449,20 @@ public void testDeleteEmitDeletionCountInClientRequestId() throws Exception { AbfsClient client = Mockito.spy(store.getClient()); store.setClient(client); - Mockito.doAnswer(answer -> { - final TracingContext context = answer.getArgument(2); - Mockito.doAnswer(listAnswer -> { - TracingContext listContext = listAnswer.getArgument(4); - Assert.assertEquals(listContext.getPrimaryRequestId(), - context.getPrimaryRequestId()); - Assert.assertTrue(context.getOpType().equals(listContext.getOpType())); - return listAnswer.callRealMethod(); - }) - .when(client) - .getListBlobs(Mockito.nullable(String.class), - Mockito.nullable(String.class), Mockito.nullable(String.class), - Mockito.nullable(Integer.class), - Mockito.any(TracingContext.class)); - - Mockito.doAnswer(createBlobPathAnswer -> { - TracingContext createBlobPathContext = createBlobPathAnswer.getArgument( - 5); - Assert.assertEquals(createBlobPathContext.getPrimaryRequestId(), - context.getPrimaryRequestId()); - Assert.assertTrue( - context.getOpType().equals(createBlobPathContext.getOpType())); - return createBlobPathAnswer.callRealMethod(); - }) - .when(client) - .createPathBlob(Mockito.anyString(), Mockito.anyBoolean(), - Mockito.anyBoolean(), Mockito.nullable( - HashMap.class), Mockito.nullable(String.class), - Mockito.any(TracingContext.class)); - + final TracingHeaderValidator tracingHeaderValidator + = new TracingHeaderValidator( + fs.getAbfsStore().getAbfsConfiguration().getClientCorrelationId(), + fs.getFileSystemId(), FSOperationType.DELETE, true, 0); + fs.registerListener(tracingHeaderValidator); + Mockito.doAnswer(answer -> { Mockito.doAnswer(deleteAnswer -> { - TracingContext deleteContext = deleteAnswer.getArgument(2); - Assert.assertEquals(deleteContext.getPrimaryRequestId(), - context.getPrimaryRequestId()); - Assert.assertTrue( - context.getOpType().equals(deleteContext.getOpType())); if (dirPathStr.equalsIgnoreCase( ((Path) deleteAnswer.getArgument(0)).toUri().getPath())) { - TracingContext tracingContext = deleteAnswer.getArgument(2); - Assertions.assertThat(tracingContext.getOperatedBlobCount()) - .isEqualTo(11); + tracingHeaderValidator.setOperatedBlobCount(11); + Object result = deleteAnswer.callRealMethod(); + tracingHeaderValidator.setOperatedBlobCount(null); + return result; } return deleteAnswer.callRealMethod(); }) @@ -505,19 +477,6 @@ public void testDeleteEmitDeletionCountInClientRequestId() throws Exception { .delete(Mockito.any(Path.class), Mockito.anyBoolean(), Mockito.any(TracingContext.class)); - Mockito.doAnswer(answer -> { - if (dirPathStr.equalsIgnoreCase( - ((Path) answer.getArgument(0)).toUri().getPath())) { - TracingContext tracingContext = answer.getArgument(2); - Assertions.assertThat(tracingContext.getOperatedBlobCount()) - .isEqualTo(11); - } - return answer.callRealMethod(); - }) - .when(client) - .deleteBlobPath(Mockito.any(Path.class), Mockito.nullable(String.class), - Mockito.any(TracingContext.class)); - fs.delete(new Path(dirPathStr), true); } } diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemRename.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemRename.java index 518b8801a238bf..a13b68e61c8985 100644 --- a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemRename.java +++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemRename.java @@ -41,15 +41,14 @@ import org.assertj.core.api.Assertions; import org.junit.Assert; import org.junit.Assume; -import org.junit.Ignore; import org.junit.Test; -import org.mockito.Mock; import org.mockito.Mockito; import org.apache.hadoop.fs.FSDataInputStream; import org.apache.hadoop.fs.FSDataOutputStream; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.azurebfs.constants.FSOperationType; import org.apache.hadoop.fs.azurebfs.constants.HttpHeaderConfigurations; import org.apache.hadoop.fs.azurebfs.contracts.exceptions.AbfsRestOperationException; import org.apache.hadoop.fs.azurebfs.contracts.exceptions.AzureBlobFileSystemException; @@ -60,12 +59,12 @@ import org.apache.hadoop.fs.azurebfs.services.AbfsHttpOpTestUtil; import org.apache.hadoop.fs.azurebfs.services.AbfsHttpOperation; import org.apache.hadoop.fs.azurebfs.services.AbfsLease; -import org.apache.hadoop.fs.azurebfs.services.AbfsOutputStream; import org.apache.hadoop.fs.azurebfs.services.AbfsRestOperation; import org.apache.hadoop.fs.azurebfs.services.AbfsRestOperationTestUtil; import org.apache.hadoop.fs.azurebfs.services.RenameAtomicityUtils; import org.apache.hadoop.fs.azurebfs.utils.TracingContext; import org.apache.hadoop.fs.azurebfs.services.PrefixMode; +import org.apache.hadoop.fs.azurebfs.utils.TracingHeaderValidator; import static org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.FS_AZURE_LEASE_CREATE_NON_RECURSIVE; import static org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.FS_AZURE_REDIRECT_RENAME; @@ -2009,26 +2008,19 @@ public void testRenameSrcDirDeleteEmitDeletionCountInClientRequestId() AbfsClient client = Mockito.spy(store.getClient()); store.setClient(client); - Mockito.doAnswer(answer -> { - final TracingContext context = answer.getArgument(3); - Mockito.doAnswer(listAnswer -> { - TracingContext listContext = listAnswer.getArgument(4); - Assert.assertEquals(listContext.getPrimaryRequestId(), - context.getPrimaryRequestId()); - Assert.assertTrue(context.getOpType().equals(listContext.getOpType())); - return listAnswer.callRealMethod(); - }) - .when(client) - .getListBlobs(Mockito.nullable(String.class), - Mockito.nullable(String.class), Mockito.nullable(String.class), - Mockito.nullable(Integer.class), - Mockito.any(TracingContext.class)); + final TracingHeaderValidator tracingHeaderValidator + = new TracingHeaderValidator( + fs.getAbfsStore().getAbfsConfiguration().getClientCorrelationId(), + fs.getFileSystemId(), FSOperationType.RENAME, true, 0); + fs.registerListener(tracingHeaderValidator); + Mockito.doAnswer(answer -> { Mockito.doAnswer(copyAnswer -> { - TracingContext copyContext = copyAnswer.getArgument(3); - Assert.assertEquals(copyContext.getPrimaryRequestId(), - context.getPrimaryRequestId()); - Assert.assertTrue(context.getOpType().equals(copyContext.getOpType())); + if (dirPathStr.equalsIgnoreCase( + ((Path) copyAnswer.getArgument(0)).toUri().getPath())) { + tracingHeaderValidator.setOperatedBlobCount(11); + return copyAnswer.callRealMethod(); + } return copyAnswer.callRealMethod(); }) .when(client) @@ -2037,16 +2029,11 @@ public void testRenameSrcDirDeleteEmitDeletionCountInClientRequestId() Mockito.any(TracingContext.class)); Mockito.doAnswer(deleteAnswer -> { - TracingContext deleteContext = deleteAnswer.getArgument(2); - Assert.assertEquals(deleteContext.getPrimaryRequestId(), - context.getPrimaryRequestId()); - Assert.assertTrue( - context.getOpType().equals(deleteContext.getOpType())); if (dirPathStr.equalsIgnoreCase( ((Path) deleteAnswer.getArgument(0)).toUri().getPath())) { - TracingContext tracingContext = deleteAnswer.getArgument(2); - Assertions.assertThat(tracingContext.getOperatedBlobCount()) - .isEqualTo(11); + Object result = deleteAnswer.callRealMethod(); + tracingHeaderValidator.setOperatedBlobCount(null); + return result; } return deleteAnswer.callRealMethod(); }) @@ -2081,34 +2068,11 @@ public void testBlobRenameResumeWithListStatus() throws Exception { store.setClient(client); renameFailureSetup(fs, client); - - TracingContext[] tracingContextCreatedInFsListStatus - = new TracingContext[1]; - Mockito.doAnswer(answer -> { - tracingContextCreatedInFsListStatus[0] = answer.getArgument(1); - return answer.callRealMethod(); - }) - .when(store) - .listStatus(Mockito.any(Path.class), Mockito.any(TracingContext.class)); - - AtomicInteger copied = new AtomicInteger(0); - Mockito.doAnswer(answer -> { - copied.incrementAndGet(); - TracingContext tracingContext = answer.getArgument(3); - Assertions.assertThat(tracingContext) - .isEqualTo(tracingContextCreatedInFsListStatus[0]); - Path path = answer.getArgument(0); - if ("/hbase/testDir".equalsIgnoreCase(path.toUri().getPath())) { - Assertions.assertThat(tracingContext.getOperatedBlobCount()) - .isEqualTo(copied.get()); - } - return answer.callRealMethod(); - }) - .when(store) - .copyBlob(Mockito.any(Path.class), Mockito.any(Path.class), - Mockito.nullable(String.class), Mockito.any(TracingContext.class)); + AtomicInteger copied = assertTracingContextOnRenameResumeProcess(fs, store, + client, FSOperationType.LISTSTATUS); fs.listStatus(new Path("/hbase")); + fs.registerListener(null); Assertions.assertThat(fs.exists(new Path("/hbase/testDir2"))).isTrue(); Assertions.assertThat(copied.get()).isGreaterThan(0); } @@ -2129,38 +2093,8 @@ public void testBlobRenameResumeWithGetFileStatus() throws Exception { store.setClient(client); renameFailureSetup(fs, client); - - - TracingContext[] tracingContextCreatedInFsListStatus - = new TracingContext[1]; - Mockito.doAnswer(answer -> { - synchronized (this) { - if (tracingContextCreatedInFsListStatus[0] == null) { - tracingContextCreatedInFsListStatus[0] = answer.getArgument(1); - } - } - return answer.callRealMethod(); - }) - .when(store) - .getFileStatus(Mockito.any(Path.class), - Mockito.any(TracingContext.class), Mockito.anyBoolean()); - - AtomicInteger copied = new AtomicInteger(0); - Mockito.doAnswer(answer -> { - copied.incrementAndGet(); - TracingContext tracingContext = answer.getArgument(3); - Assertions.assertThat(tracingContext) - .isEqualTo(tracingContextCreatedInFsListStatus[0]); - Path path = answer.getArgument(0); - if ("/hbase/testDir".equalsIgnoreCase(path.toUri().getPath())) { - Assertions.assertThat(tracingContext.getOperatedBlobCount()) - .isEqualTo(copied.get()); - } - return answer.callRealMethod(); - }) - .when(store) - .copyBlob(Mockito.any(Path.class), Mockito.any(Path.class), - Mockito.nullable(String.class), Mockito.any(TracingContext.class)); + AtomicInteger copied = assertTracingContextOnRenameResumeProcess(fs, store, + client, FSOperationType.GET_FILESTATUS); intercept(FileNotFoundException.class, () -> { fs.getFileStatus(new Path("/hbase/testDir")); @@ -2223,4 +2157,71 @@ private void renameFailureSetup(final AzureBlobFileSystem fs, Mockito.mock(TracingContext.class)); leaseCanBeTaken.set(true); } + + private AtomicInteger assertTracingContextOnRenameResumeProcess(final AzureBlobFileSystem fs, + final AzureBlobFileSystemStore store, + final AbfsClient client, final FSOperationType fsOperationType) + throws IOException { + final TracingHeaderValidator tracingHeaderValidator + = new TracingHeaderValidator( + fs.getAbfsStore().getAbfsConfiguration().getClientCorrelationId(), + fs.getFileSystemId(), fsOperationType, true, 0); + fs.registerListener(tracingHeaderValidator); + + AtomicInteger copied = new AtomicInteger(0); + Mockito.doAnswer(answer -> { + copied.incrementAndGet(); + Path path = answer.getArgument(0); + if ("/hbase/testDir".equalsIgnoreCase(path.toUri().getPath())) { + tracingHeaderValidator.setOperatedBlobCount(copied.get()); + } + return answer.callRealMethod(); + }) + .when(store) + .copyBlob(Mockito.any(Path.class), Mockito.any(Path.class), + Mockito.nullable(String.class), Mockito.any(TracingContext.class)); + + /* + * RenameAtomicUtil internally calls Filesystem's API of read and delete + * which would have different primaryIds. But once renameAtomicUtil has read + * the RenamePending JSON, all the operations will use the same tracingContext + * which was started by ListStatus or GetFileStatus operation. + * This is the reason why the validation is disabled until the RenameAtomicUtil + * object reads the JSON. + * The filesystem's delete API called in RenameAtomicUtils.cleanup create a + * new TracingContext object with a new primaryRequestId and also updates the + * new primaryRequestId in the listener object of FileSystem. Therefore, once, + * cleanup method is completed, the listener is explicitly updated with the + * primaryRequestId it was using before the RenameAtomicUtils object was created. + */ + Mockito.doAnswer(answer -> { + final String primaryRequestId = ((TracingContext) answer.getArgument( + 1)).getPrimaryRequestId(); + tracingHeaderValidator.setDisableValidation(true); + RenameAtomicityUtils renameAtomicityUtils = Mockito.spy( + (RenameAtomicityUtils) answer.callRealMethod()); + Mockito.doAnswer(cleanupAnswer -> { + tracingHeaderValidator.setDisableValidation(true); + cleanupAnswer.callRealMethod(); + tracingHeaderValidator.setDisableValidation(false); + tracingHeaderValidator.updatePrimaryRequestID(primaryRequestId); + return null; + }).when(renameAtomicityUtils).cleanup(Mockito.any(Path.class)); + tracingHeaderValidator.setDisableValidation(false); + return renameAtomicityUtils; + }) + .when(fs) + .getRenameAtomicityUtilsForRedo(Mockito.any(Path.class), + Mockito.any(TracingContext.class)); + + Mockito.doAnswer(answer -> { + answer.callRealMethod(); + tracingHeaderValidator.setOperatedBlobCount(null); + return null; + }) + .when(client) + .deleteBlobPath(Mockito.any(Path.class), Mockito.nullable(String.class), + Mockito.any(TracingContext.class)); + return copied; + } } diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/utils/TracingHeaderValidator.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/utils/TracingHeaderValidator.java index 27a84e4978ad2c..37fb2db1de0ccc 100644 --- a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/utils/TracingHeaderValidator.java +++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/utils/TracingHeaderValidator.java @@ -38,10 +38,16 @@ public class TracingHeaderValidator implements Listener { private TracingHeaderFormat format; private static final String GUID_PATTERN = "^[0-9a-fA-F]{8}-([0-9a-fA-F]{4}-){3}[0-9a-fA-F]{12}$"; + private Integer operatedBlobCount = null; + + private Boolean disableValidation = false; @Override public void callTracingHeaderValidator(String tracingContextHeader, TracingHeaderFormat format) { + if (disableValidation) { + return; + } this.format = format; validateTracingHeader(tracingContextHeader); } @@ -52,6 +58,9 @@ public TracingHeaderValidator getClone() { clientCorrelationId, fileSystemId, operation, needsPrimaryRequestId, retryNum, streamID); tracingHeaderValidator.primaryRequestId = primaryRequestId; + if (disableValidation) { + tracingHeaderValidator.setDisableValidation(true); + } return tracingHeaderValidator; } @@ -78,6 +87,13 @@ private void validateTracingHeader(String tracingContextHeader) { if (format != TracingHeaderFormat.ALL_ID_FORMAT) { return; } + if (idList.length >= 9) { + if (operatedBlobCount != null) { + Assertions.assertThat(Integer.parseInt(idList[8])) + .describedAs("OperatedBlobCount is incorrect") + .isEqualTo(operatedBlobCount); + } + } if (!primaryRequestId.isEmpty() && !idList[3].isEmpty()) { Assertions.assertThat(idList[3]) .describedAs("PrimaryReqID should be common for these requests") @@ -93,7 +109,8 @@ private void validateTracingHeader(String tracingContextHeader) { private void validateBasicFormat(String[] idList) { if (format == TracingHeaderFormat.ALL_ID_FORMAT) { Assertions.assertThat(idList) - .describedAs("header should have 8 elements").hasSize(8); + .describedAs("header should have 8 or 9 elements") + .hasSizeBetween(8, 9); } else if (format == TracingHeaderFormat.TWO_ID_FORMAT) { Assertions.assertThat(idList) .describedAs("header should have 2 elements").hasSize(2); @@ -152,4 +169,12 @@ public void setOperation(FSOperationType operation) { public void updatePrimaryRequestID(String primaryRequestId) { this.primaryRequestId = primaryRequestId; } + + public void setOperatedBlobCount(Integer operatedBlobCount) { + this.operatedBlobCount = operatedBlobCount; + } + + public void setDisableValidation(Boolean disableValidation) { + this.disableValidation = disableValidation; + } }