From 5cd0c9b9d9ce9f730c50fb74ba417aa79858dbf6 Mon Sep 17 00:00:00 2001 From: Pranav Saxena Date: Thu, 29 Jun 2023 00:30:46 -0700 Subject: [PATCH 1/5] tracingContextValidator impl addition --- .../fs/azurebfs/AzureBlobFileSystem.java | 2 +- .../ITestAzureBlobFileSystemDelete.java | 61 +++---------------- .../ITestAzureBlobFileSystemRename.java | 42 +++++-------- .../utils/TracingHeaderValidator.java | 14 ++++- 4 files changed, 40 insertions(+), 79 deletions(-) diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystem.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystem.java index f4c343cbdde401..1c736d2fbf1730 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystem.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystem.java @@ -871,7 +871,7 @@ public boolean delete(final Path f, final boolean recursive) throws IOException statIncrement(CALL_DELETE); Path qualifiedPath = makeQualified(f); TracingContext tracingContext = new TracingContext(clientCorrelationId, - fileSystemId, FSOperationType.DELETE, tracingHeaderFormat, + fileSystemId, FSOperationType.DELETE, true, tracingHeaderFormat, listener); if (shouldRedirect(FSOperationType.DELETE, tracingContext)) { diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemDelete.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemDelete.java index 6c3272a90fc54f..00a3048c000ebd 100644 --- a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemDelete.java +++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemDelete.java @@ -449,48 +449,20 @@ public void testDeleteEmitDeletionCountInClientRequestId() throws Exception { AbfsClient client = Mockito.spy(store.getClient()); store.setClient(client); - Mockito.doAnswer(answer -> { - final TracingContext context = answer.getArgument(2); - Mockito.doAnswer(listAnswer -> { - TracingContext listContext = listAnswer.getArgument(4); - Assert.assertEquals(listContext.getPrimaryRequestId(), - context.getPrimaryRequestId()); - Assert.assertTrue(context.getOpType().equals(listContext.getOpType())); - return listAnswer.callRealMethod(); - }) - .when(client) - .getListBlobs(Mockito.nullable(String.class), - Mockito.nullable(String.class), Mockito.nullable(String.class), - Mockito.nullable(Integer.class), - Mockito.any(TracingContext.class)); - - Mockito.doAnswer(createBlobPathAnswer -> { - TracingContext createBlobPathContext = createBlobPathAnswer.getArgument( - 5); - Assert.assertEquals(createBlobPathContext.getPrimaryRequestId(), - context.getPrimaryRequestId()); - Assert.assertTrue( - context.getOpType().equals(createBlobPathContext.getOpType())); - return createBlobPathAnswer.callRealMethod(); - }) - .when(client) - .createPathBlob(Mockito.anyString(), Mockito.anyBoolean(), - Mockito.anyBoolean(), Mockito.nullable( - HashMap.class), Mockito.nullable(String.class), - Mockito.any(TracingContext.class)); - + final TracingHeaderValidator tracingHeaderValidator + = new TracingHeaderValidator( + fs.getAbfsStore().getAbfsConfiguration().getClientCorrelationId(), + fs.getFileSystemId(), FSOperationType.DELETE, true, 0); + fs.registerListener(tracingHeaderValidator); + Mockito.doAnswer(answer -> { Mockito.doAnswer(deleteAnswer -> { - TracingContext deleteContext = deleteAnswer.getArgument(2); - Assert.assertEquals(deleteContext.getPrimaryRequestId(), - context.getPrimaryRequestId()); - Assert.assertTrue( - context.getOpType().equals(deleteContext.getOpType())); if (dirPathStr.equalsIgnoreCase( ((Path) deleteAnswer.getArgument(0)).toUri().getPath())) { - TracingContext tracingContext = deleteAnswer.getArgument(2); - Assertions.assertThat(tracingContext.getOperatedBlobCount()) - .isEqualTo(11); + tracingHeaderValidator.setOperatedBlobCount(11); + Object result = deleteAnswer.callRealMethod(); + tracingHeaderValidator.setOperatedBlobCount(null); + return result; } return deleteAnswer.callRealMethod(); }) @@ -505,19 +477,6 @@ public void testDeleteEmitDeletionCountInClientRequestId() throws Exception { .delete(Mockito.any(Path.class), Mockito.anyBoolean(), Mockito.any(TracingContext.class)); - Mockito.doAnswer(answer -> { - if (dirPathStr.equalsIgnoreCase( - ((Path) answer.getArgument(0)).toUri().getPath())) { - TracingContext tracingContext = answer.getArgument(2); - Assertions.assertThat(tracingContext.getOperatedBlobCount()) - .isEqualTo(11); - } - return answer.callRealMethod(); - }) - .when(client) - .deleteBlobPath(Mockito.any(Path.class), Mockito.nullable(String.class), - Mockito.any(TracingContext.class)); - fs.delete(new Path(dirPathStr), true); } } diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemRename.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemRename.java index 518b8801a238bf..d010ac0a2d5b61 100644 --- a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemRename.java +++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemRename.java @@ -50,6 +50,7 @@ import org.apache.hadoop.fs.FSDataOutputStream; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.azurebfs.constants.FSOperationType; import org.apache.hadoop.fs.azurebfs.constants.HttpHeaderConfigurations; import org.apache.hadoop.fs.azurebfs.contracts.exceptions.AbfsRestOperationException; import org.apache.hadoop.fs.azurebfs.contracts.exceptions.AzureBlobFileSystemException; @@ -66,6 +67,7 @@ import org.apache.hadoop.fs.azurebfs.services.RenameAtomicityUtils; import org.apache.hadoop.fs.azurebfs.utils.TracingContext; import org.apache.hadoop.fs.azurebfs.services.PrefixMode; +import org.apache.hadoop.fs.azurebfs.utils.TracingHeaderValidator; import static org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.FS_AZURE_LEASE_CREATE_NON_RECURSIVE; import static org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.FS_AZURE_REDIRECT_RENAME; @@ -2009,26 +2011,19 @@ public void testRenameSrcDirDeleteEmitDeletionCountInClientRequestId() AbfsClient client = Mockito.spy(store.getClient()); store.setClient(client); - Mockito.doAnswer(answer -> { - final TracingContext context = answer.getArgument(3); - Mockito.doAnswer(listAnswer -> { - TracingContext listContext = listAnswer.getArgument(4); - Assert.assertEquals(listContext.getPrimaryRequestId(), - context.getPrimaryRequestId()); - Assert.assertTrue(context.getOpType().equals(listContext.getOpType())); - return listAnswer.callRealMethod(); - }) - .when(client) - .getListBlobs(Mockito.nullable(String.class), - Mockito.nullable(String.class), Mockito.nullable(String.class), - Mockito.nullable(Integer.class), - Mockito.any(TracingContext.class)); + final TracingHeaderValidator tracingHeaderValidator + = new TracingHeaderValidator( + fs.getAbfsStore().getAbfsConfiguration().getClientCorrelationId(), + fs.getFileSystemId(), FSOperationType.RENAME, true, 0); + fs.registerListener(tracingHeaderValidator); + Mockito.doAnswer(answer -> { Mockito.doAnswer(copyAnswer -> { - TracingContext copyContext = copyAnswer.getArgument(3); - Assert.assertEquals(copyContext.getPrimaryRequestId(), - context.getPrimaryRequestId()); - Assert.assertTrue(context.getOpType().equals(copyContext.getOpType())); + if (dirPathStr.equalsIgnoreCase( + ((Path) copyAnswer.getArgument(0)).toUri().getPath())) { + tracingHeaderValidator.setOperatedBlobCount(11); + return copyAnswer.callRealMethod(); + } return copyAnswer.callRealMethod(); }) .when(client) @@ -2037,16 +2032,11 @@ public void testRenameSrcDirDeleteEmitDeletionCountInClientRequestId() Mockito.any(TracingContext.class)); Mockito.doAnswer(deleteAnswer -> { - TracingContext deleteContext = deleteAnswer.getArgument(2); - Assert.assertEquals(deleteContext.getPrimaryRequestId(), - context.getPrimaryRequestId()); - Assert.assertTrue( - context.getOpType().equals(deleteContext.getOpType())); if (dirPathStr.equalsIgnoreCase( ((Path) deleteAnswer.getArgument(0)).toUri().getPath())) { - TracingContext tracingContext = deleteAnswer.getArgument(2); - Assertions.assertThat(tracingContext.getOperatedBlobCount()) - .isEqualTo(11); + Object result = deleteAnswer.callRealMethod(); + tracingHeaderValidator.setOperatedBlobCount(null); + return result; } return deleteAnswer.callRealMethod(); }) diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/utils/TracingHeaderValidator.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/utils/TracingHeaderValidator.java index 27a84e4978ad2c..068128384930c7 100644 --- a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/utils/TracingHeaderValidator.java +++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/utils/TracingHeaderValidator.java @@ -38,6 +38,7 @@ public class TracingHeaderValidator implements Listener { private TracingHeaderFormat format; private static final String GUID_PATTERN = "^[0-9a-fA-F]{8}-([0-9a-fA-F]{4}-){3}[0-9a-fA-F]{12}$"; + private Integer operatedBlobCount = null; @Override public void callTracingHeaderValidator(String tracingContextHeader, @@ -78,6 +79,13 @@ private void validateTracingHeader(String tracingContextHeader) { if (format != TracingHeaderFormat.ALL_ID_FORMAT) { return; } + if (idList.length >= 9) { + if (operatedBlobCount != null) { + Assertions.assertThat(Integer.parseInt(idList[8])) + .describedAs("OperatedBlobCount is incorrect") + .isEqualTo(operatedBlobCount); + } + } if (!primaryRequestId.isEmpty() && !idList[3].isEmpty()) { Assertions.assertThat(idList[3]) .describedAs("PrimaryReqID should be common for these requests") @@ -93,7 +101,7 @@ private void validateTracingHeader(String tracingContextHeader) { private void validateBasicFormat(String[] idList) { if (format == TracingHeaderFormat.ALL_ID_FORMAT) { Assertions.assertThat(idList) - .describedAs("header should have 8 elements").hasSize(8); + .describedAs("header should have 8 elements").hasSizeBetween(8, 9); } else if (format == TracingHeaderFormat.TWO_ID_FORMAT) { Assertions.assertThat(idList) .describedAs("header should have 2 elements").hasSize(2); @@ -152,4 +160,8 @@ public void setOperation(FSOperationType operation) { public void updatePrimaryRequestID(String primaryRequestId) { this.primaryRequestId = primaryRequestId; } + + public void setOperatedBlobCount(Integer operatedBlobCount) { + this.operatedBlobCount = operatedBlobCount; + } } From 71a01159dbc653aa448920e9c2af9b9bbc8408c5 Mon Sep 17 00:00:00 2001 From: Pranav Saxena Date: Thu, 29 Jun 2023 02:06:15 -0700 Subject: [PATCH 2/5] testBlobRenameResumeWithListStatus --- .../fs/azurebfs/AzureBlobFileSystem.java | 17 +++++---- .../ITestAzureBlobFileSystemRename.java | 36 ++++++++++++------- .../utils/TracingHeaderValidator.java | 12 +++++++ 3 files changed, 46 insertions(+), 19 deletions(-) diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystem.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystem.java index 1c736d2fbf1730..1b9c55440e586d 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystem.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystem.java @@ -933,9 +933,8 @@ public FileStatus[] listStatus(final Path f) throws IOException { = getAbfsStore().getRenamePendingFileStatus(result); if (renamePendingFileStatus != null) { RenameAtomicityUtils renameAtomicityUtils = - new RenameAtomicityUtils(this, - renamePendingFileStatus.getPath(), - getAbfsStore().getRedoRenameInvocation(tracingContext)); + getRenameAtomicityUtilsForRedo(renamePendingFileStatus.getPath(), + tracingContext); renameAtomicityUtils.cleanup(renamePendingFileStatus.getPath()); result = getAbfsStore().listStatus(qualifiedPath, tracingContext); } @@ -947,6 +946,13 @@ public FileStatus[] listStatus(final Path f) throws IOException { } } + RenameAtomicityUtils getRenameAtomicityUtilsForRedo(final Path renamePendingFileStatus, + final TracingContext tracingContext) throws IOException { + return new RenameAtomicityUtils(this, + renamePendingFileStatus, + getAbfsStore().getRedoRenameInvocation(tracingContext)); + } + /** * Increment of an Abfs statistic. * @@ -1077,10 +1083,9 @@ private FileStatus getFileStatus(final Path path, && getAbfsStore().isAtomicRenameKey(fileStatus.getPath().toUri().getPath()) && getAbfsStore().getRenamePendingFileStatusInDirectory(fileStatus, tracingContext)) { - RenameAtomicityUtils renameAtomicityUtils = new RenameAtomicityUtils( - this, + RenameAtomicityUtils renameAtomicityUtils = getRenameAtomicityUtilsForRedo( new Path(fileStatus.getPath().toUri().getPath() + SUFFIX), - getAbfsStore().getRedoRenameInvocation(tracingContext)); + tracingContext); renameAtomicityUtils.cleanup( new Path(fileStatus.getPath().toUri().getPath() + SUFFIX)); throw new AbfsRestOperationException(HttpURLConnection.HTTP_NOT_FOUND, diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemRename.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemRename.java index d010ac0a2d5b61..ea157ccf4ebbfd 100644 --- a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemRename.java +++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemRename.java @@ -2072,25 +2072,18 @@ public void testBlobRenameResumeWithListStatus() throws Exception { renameFailureSetup(fs, client); - TracingContext[] tracingContextCreatedInFsListStatus - = new TracingContext[1]; - Mockito.doAnswer(answer -> { - tracingContextCreatedInFsListStatus[0] = answer.getArgument(1); - return answer.callRealMethod(); - }) - .when(store) - .listStatus(Mockito.any(Path.class), Mockito.any(TracingContext.class)); + final TracingHeaderValidator tracingHeaderValidator + = new TracingHeaderValidator( + fs.getAbfsStore().getAbfsConfiguration().getClientCorrelationId(), + fs.getFileSystemId(), FSOperationType.LISTSTATUS, true, 0); + fs.registerListener(tracingHeaderValidator); AtomicInteger copied = new AtomicInteger(0); Mockito.doAnswer(answer -> { copied.incrementAndGet(); - TracingContext tracingContext = answer.getArgument(3); - Assertions.assertThat(tracingContext) - .isEqualTo(tracingContextCreatedInFsListStatus[0]); Path path = answer.getArgument(0); if ("/hbase/testDir".equalsIgnoreCase(path.toUri().getPath())) { - Assertions.assertThat(tracingContext.getOperatedBlobCount()) - .isEqualTo(copied.get()); + tracingHeaderValidator.setOperatedBlobCount(copied.get()); } return answer.callRealMethod(); }) @@ -2098,6 +2091,23 @@ public void testBlobRenameResumeWithListStatus() throws Exception { .copyBlob(Mockito.any(Path.class), Mockito.any(Path.class), Mockito.nullable(String.class), Mockito.any(TracingContext.class)); + Mockito.doAnswer(answer -> { + tracingHeaderValidator.setDisableValidation(true); + RenameAtomicityUtils renameAtomicityUtils = Mockito.spy((RenameAtomicityUtils) answer.callRealMethod()); + Mockito.doAnswer(cleanupAnswer -> { + tracingHeaderValidator.setDisableValidation(true); + cleanupAnswer.callRealMethod(); + return null; + }).when(renameAtomicityUtils).cleanup(Mockito.any(Path.class)); + tracingHeaderValidator.setDisableValidation(false); + return renameAtomicityUtils; + }).when(fs).getRenameAtomicityUtilsForRedo(Mockito.any(Path.class), Mockito.any(TracingContext.class)); + + Mockito.doAnswer(answer -> { + tracingHeaderValidator.setOperatedBlobCount(null); + return answer.callRealMethod(); + }).when(client).deleteBlobPath(Mockito.any(Path.class), Mockito.nullable(String.class), Mockito.any(TracingContext.class)); + fs.listStatus(new Path("/hbase")); Assertions.assertThat(fs.exists(new Path("/hbase/testDir2"))).isTrue(); Assertions.assertThat(copied.get()).isGreaterThan(0); diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/utils/TracingHeaderValidator.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/utils/TracingHeaderValidator.java index 068128384930c7..3f675e3d1e242b 100644 --- a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/utils/TracingHeaderValidator.java +++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/utils/TracingHeaderValidator.java @@ -40,9 +40,14 @@ public class TracingHeaderValidator implements Listener { private static final String GUID_PATTERN = "^[0-9a-fA-F]{8}-([0-9a-fA-F]{4}-){3}[0-9a-fA-F]{12}$"; private Integer operatedBlobCount = null; + private Boolean disableValidation = false; + @Override public void callTracingHeaderValidator(String tracingContextHeader, TracingHeaderFormat format) { + if (disableValidation) { + return; + } this.format = format; validateTracingHeader(tracingContextHeader); } @@ -53,6 +58,9 @@ public TracingHeaderValidator getClone() { clientCorrelationId, fileSystemId, operation, needsPrimaryRequestId, retryNum, streamID); tracingHeaderValidator.primaryRequestId = primaryRequestId; + if (disableValidation) { + tracingHeaderValidator.setDisableValidation(true); + } return tracingHeaderValidator; } @@ -164,4 +172,8 @@ public void updatePrimaryRequestID(String primaryRequestId) { public void setOperatedBlobCount(Integer operatedBlobCount) { this.operatedBlobCount = operatedBlobCount; } + + public void setDisableValidation(Boolean disableValidation) { + this.disableValidation = disableValidation; + } } From fb33d02b8f4bb0c525373703e20200cc8db8db1b Mon Sep 17 00:00:00 2001 From: Pranav Saxena Date: Thu, 29 Jun 2023 02:21:29 -0700 Subject: [PATCH 3/5] Done for getFileStatus as well --- .../ITestAzureBlobFileSystemRename.java | 117 +++++++----------- 1 file changed, 46 insertions(+), 71 deletions(-) diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemRename.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemRename.java index ea157ccf4ebbfd..168c523f08d7cd 100644 --- a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemRename.java +++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemRename.java @@ -41,9 +41,7 @@ import org.assertj.core.api.Assertions; import org.junit.Assert; import org.junit.Assume; -import org.junit.Ignore; import org.junit.Test; -import org.mockito.Mock; import org.mockito.Mockito; import org.apache.hadoop.fs.FSDataInputStream; @@ -61,7 +59,6 @@ import org.apache.hadoop.fs.azurebfs.services.AbfsHttpOpTestUtil; import org.apache.hadoop.fs.azurebfs.services.AbfsHttpOperation; import org.apache.hadoop.fs.azurebfs.services.AbfsLease; -import org.apache.hadoop.fs.azurebfs.services.AbfsOutputStream; import org.apache.hadoop.fs.azurebfs.services.AbfsRestOperation; import org.apache.hadoop.fs.azurebfs.services.AbfsRestOperationTestUtil; import org.apache.hadoop.fs.azurebfs.services.RenameAtomicityUtils; @@ -2071,42 +2068,8 @@ public void testBlobRenameResumeWithListStatus() throws Exception { store.setClient(client); renameFailureSetup(fs, client); - - final TracingHeaderValidator tracingHeaderValidator - = new TracingHeaderValidator( - fs.getAbfsStore().getAbfsConfiguration().getClientCorrelationId(), - fs.getFileSystemId(), FSOperationType.LISTSTATUS, true, 0); - fs.registerListener(tracingHeaderValidator); - - AtomicInteger copied = new AtomicInteger(0); - Mockito.doAnswer(answer -> { - copied.incrementAndGet(); - Path path = answer.getArgument(0); - if ("/hbase/testDir".equalsIgnoreCase(path.toUri().getPath())) { - tracingHeaderValidator.setOperatedBlobCount(copied.get()); - } - return answer.callRealMethod(); - }) - .when(store) - .copyBlob(Mockito.any(Path.class), Mockito.any(Path.class), - Mockito.nullable(String.class), Mockito.any(TracingContext.class)); - - Mockito.doAnswer(answer -> { - tracingHeaderValidator.setDisableValidation(true); - RenameAtomicityUtils renameAtomicityUtils = Mockito.spy((RenameAtomicityUtils) answer.callRealMethod()); - Mockito.doAnswer(cleanupAnswer -> { - tracingHeaderValidator.setDisableValidation(true); - cleanupAnswer.callRealMethod(); - return null; - }).when(renameAtomicityUtils).cleanup(Mockito.any(Path.class)); - tracingHeaderValidator.setDisableValidation(false); - return renameAtomicityUtils; - }).when(fs).getRenameAtomicityUtilsForRedo(Mockito.any(Path.class), Mockito.any(TracingContext.class)); - - Mockito.doAnswer(answer -> { - tracingHeaderValidator.setOperatedBlobCount(null); - return answer.callRealMethod(); - }).when(client).deleteBlobPath(Mockito.any(Path.class), Mockito.nullable(String.class), Mockito.any(TracingContext.class)); + AtomicInteger copied = assertTracingContextOnRenameResumeProcess(fs, store, + client, FSOperationType.LISTSTATUS); fs.listStatus(new Path("/hbase")); Assertions.assertThat(fs.exists(new Path("/hbase/testDir2"))).isTrue(); @@ -2129,38 +2092,8 @@ public void testBlobRenameResumeWithGetFileStatus() throws Exception { store.setClient(client); renameFailureSetup(fs, client); - - - TracingContext[] tracingContextCreatedInFsListStatus - = new TracingContext[1]; - Mockito.doAnswer(answer -> { - synchronized (this) { - if (tracingContextCreatedInFsListStatus[0] == null) { - tracingContextCreatedInFsListStatus[0] = answer.getArgument(1); - } - } - return answer.callRealMethod(); - }) - .when(store) - .getFileStatus(Mockito.any(Path.class), - Mockito.any(TracingContext.class), Mockito.anyBoolean()); - - AtomicInteger copied = new AtomicInteger(0); - Mockito.doAnswer(answer -> { - copied.incrementAndGet(); - TracingContext tracingContext = answer.getArgument(3); - Assertions.assertThat(tracingContext) - .isEqualTo(tracingContextCreatedInFsListStatus[0]); - Path path = answer.getArgument(0); - if ("/hbase/testDir".equalsIgnoreCase(path.toUri().getPath())) { - Assertions.assertThat(tracingContext.getOperatedBlobCount()) - .isEqualTo(copied.get()); - } - return answer.callRealMethod(); - }) - .when(store) - .copyBlob(Mockito.any(Path.class), Mockito.any(Path.class), - Mockito.nullable(String.class), Mockito.any(TracingContext.class)); + AtomicInteger copied = assertTracingContextOnRenameResumeProcess(fs, store, + client, FSOperationType.GET_FILESTATUS); intercept(FileNotFoundException.class, () -> { fs.getFileStatus(new Path("/hbase/testDir")); @@ -2223,4 +2156,46 @@ private void renameFailureSetup(final AzureBlobFileSystem fs, Mockito.mock(TracingContext.class)); leaseCanBeTaken.set(true); } + + private AtomicInteger assertTracingContextOnRenameResumeProcess(final AzureBlobFileSystem fs, + final AzureBlobFileSystemStore store, + final AbfsClient client, final FSOperationType fsOperationType) throws IOException { + final TracingHeaderValidator tracingHeaderValidator + = new TracingHeaderValidator( + fs.getAbfsStore().getAbfsConfiguration().getClientCorrelationId(), + fs.getFileSystemId(), fsOperationType, true, 0); + fs.registerListener(tracingHeaderValidator); + + AtomicInteger copied = new AtomicInteger(0); + Mockito.doAnswer(answer -> { + copied.incrementAndGet(); + Path path = answer.getArgument(0); + if ("/hbase/testDir".equalsIgnoreCase(path.toUri().getPath())) { + tracingHeaderValidator.setOperatedBlobCount(copied.get()); + } + return answer.callRealMethod(); + }) + .when(store) + .copyBlob(Mockito.any(Path.class), Mockito.any(Path.class), + Mockito.nullable(String.class), Mockito.any(TracingContext.class)); + + Mockito.doAnswer(answer -> { + tracingHeaderValidator.setDisableValidation(true); + RenameAtomicityUtils renameAtomicityUtils = Mockito.spy((RenameAtomicityUtils) answer.callRealMethod()); + Mockito.doAnswer(cleanupAnswer -> { + tracingHeaderValidator.setDisableValidation(true); + cleanupAnswer.callRealMethod(); + return null; + }).when(renameAtomicityUtils).cleanup(Mockito.any(Path.class)); + tracingHeaderValidator.setDisableValidation(false); + return renameAtomicityUtils; + }).when(fs).getRenameAtomicityUtilsForRedo(Mockito.any(Path.class), Mockito.any(TracingContext.class)); + + Mockito.doAnswer(answer -> { + answer.callRealMethod(); + tracingHeaderValidator.setOperatedBlobCount(null); + return null; + }).when(client).deleteBlobPath(Mockito.any(Path.class), Mockito.nullable(String.class), Mockito.any(TracingContext.class)); + return copied; + } } From fbe43a99c4f7e4dfdb14b220b3cbe2b7c62adbb5 Mon Sep 17 00:00:00 2001 From: Pranav Saxena Date: Thu, 29 Jun 2023 02:28:24 -0700 Subject: [PATCH 4/5] general refactor --- .../ITestAzureBlobFileSystemRename.java | 38 +++++++++++-------- .../utils/TracingHeaderValidator.java | 3 +- 2 files changed, 25 insertions(+), 16 deletions(-) diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemRename.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemRename.java index 168c523f08d7cd..4aa4d3c75b0956 100644 --- a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemRename.java +++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemRename.java @@ -2159,7 +2159,8 @@ private void renameFailureSetup(final AzureBlobFileSystem fs, private AtomicInteger assertTracingContextOnRenameResumeProcess(final AzureBlobFileSystem fs, final AzureBlobFileSystemStore store, - final AbfsClient client, final FSOperationType fsOperationType) throws IOException { + final AbfsClient client, final FSOperationType fsOperationType) + throws IOException { final TracingHeaderValidator tracingHeaderValidator = new TracingHeaderValidator( fs.getAbfsStore().getAbfsConfiguration().getClientCorrelationId(), @@ -2180,22 +2181,29 @@ private AtomicInteger assertTracingContextOnRenameResumeProcess(final AzureBlobF Mockito.nullable(String.class), Mockito.any(TracingContext.class)); Mockito.doAnswer(answer -> { - tracingHeaderValidator.setDisableValidation(true); - RenameAtomicityUtils renameAtomicityUtils = Mockito.spy((RenameAtomicityUtils) answer.callRealMethod()); - Mockito.doAnswer(cleanupAnswer -> { - tracingHeaderValidator.setDisableValidation(true); - cleanupAnswer.callRealMethod(); - return null; - }).when(renameAtomicityUtils).cleanup(Mockito.any(Path.class)); - tracingHeaderValidator.setDisableValidation(false); - return renameAtomicityUtils; - }).when(fs).getRenameAtomicityUtilsForRedo(Mockito.any(Path.class), Mockito.any(TracingContext.class)); + tracingHeaderValidator.setDisableValidation(true); + RenameAtomicityUtils renameAtomicityUtils = Mockito.spy( + (RenameAtomicityUtils) answer.callRealMethod()); + Mockito.doAnswer(cleanupAnswer -> { + tracingHeaderValidator.setDisableValidation(true); + cleanupAnswer.callRealMethod(); + return null; + }).when(renameAtomicityUtils).cleanup(Mockito.any(Path.class)); + tracingHeaderValidator.setDisableValidation(false); + return renameAtomicityUtils; + }) + .when(fs) + .getRenameAtomicityUtilsForRedo(Mockito.any(Path.class), + Mockito.any(TracingContext.class)); Mockito.doAnswer(answer -> { - answer.callRealMethod(); - tracingHeaderValidator.setOperatedBlobCount(null); - return null; - }).when(client).deleteBlobPath(Mockito.any(Path.class), Mockito.nullable(String.class), Mockito.any(TracingContext.class)); + answer.callRealMethod(); + tracingHeaderValidator.setOperatedBlobCount(null); + return null; + }) + .when(client) + .deleteBlobPath(Mockito.any(Path.class), Mockito.nullable(String.class), + Mockito.any(TracingContext.class)); return copied; } } diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/utils/TracingHeaderValidator.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/utils/TracingHeaderValidator.java index 3f675e3d1e242b..37fb2db1de0ccc 100644 --- a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/utils/TracingHeaderValidator.java +++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/utils/TracingHeaderValidator.java @@ -109,7 +109,8 @@ private void validateTracingHeader(String tracingContextHeader) { private void validateBasicFormat(String[] idList) { if (format == TracingHeaderFormat.ALL_ID_FORMAT) { Assertions.assertThat(idList) - .describedAs("header should have 8 elements").hasSizeBetween(8, 9); + .describedAs("header should have 8 or 9 elements") + .hasSizeBetween(8, 9); } else if (format == TracingHeaderFormat.TWO_ID_FORMAT) { Assertions.assertThat(idList) .describedAs("header should have 2 elements").hasSize(2); From df5d4c10acc862bcd7a3d500c049acb7cddc8e2d Mon Sep 17 00:00:00 2001 From: Pranav Saxena Date: Thu, 29 Jun 2023 03:34:10 -0700 Subject: [PATCH 5/5] javadoc; updateRequestId --- .../ITestAzureBlobFileSystemRename.java | 18 ++++++++++++++++++ 1 file changed, 18 insertions(+) diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemRename.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemRename.java index 4aa4d3c75b0956..a13b68e61c8985 100644 --- a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemRename.java +++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemRename.java @@ -2072,6 +2072,7 @@ public void testBlobRenameResumeWithListStatus() throws Exception { client, FSOperationType.LISTSTATUS); fs.listStatus(new Path("/hbase")); + fs.registerListener(null); Assertions.assertThat(fs.exists(new Path("/hbase/testDir2"))).isTrue(); Assertions.assertThat(copied.get()).isGreaterThan(0); } @@ -2180,13 +2181,30 @@ private AtomicInteger assertTracingContextOnRenameResumeProcess(final AzureBlobF .copyBlob(Mockito.any(Path.class), Mockito.any(Path.class), Mockito.nullable(String.class), Mockito.any(TracingContext.class)); + /* + * RenameAtomicUtil internally calls Filesystem's API of read and delete + * which would have different primaryIds. But once renameAtomicUtil has read + * the RenamePending JSON, all the operations will use the same tracingContext + * which was started by ListStatus or GetFileStatus operation. + * This is the reason why the validation is disabled until the RenameAtomicUtil + * object reads the JSON. + * The filesystem's delete API called in RenameAtomicUtils.cleanup create a + * new TracingContext object with a new primaryRequestId and also updates the + * new primaryRequestId in the listener object of FileSystem. Therefore, once, + * cleanup method is completed, the listener is explicitly updated with the + * primaryRequestId it was using before the RenameAtomicUtils object was created. + */ Mockito.doAnswer(answer -> { + final String primaryRequestId = ((TracingContext) answer.getArgument( + 1)).getPrimaryRequestId(); tracingHeaderValidator.setDisableValidation(true); RenameAtomicityUtils renameAtomicityUtils = Mockito.spy( (RenameAtomicityUtils) answer.callRealMethod()); Mockito.doAnswer(cleanupAnswer -> { tracingHeaderValidator.setDisableValidation(true); cleanupAnswer.callRealMethod(); + tracingHeaderValidator.setDisableValidation(false); + tracingHeaderValidator.updatePrimaryRequestID(primaryRequestId); return null; }).when(renameAtomicityUtils).cleanup(Mockito.any(Path.class)); tracingHeaderValidator.setDisableValidation(false);