diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystem.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystem.java index f4c343cbdde40..1b9c55440e586 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystem.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystem.java @@ -871,7 +871,7 @@ public boolean delete(final Path f, final boolean recursive) throws IOException statIncrement(CALL_DELETE); Path qualifiedPath = makeQualified(f); TracingContext tracingContext = new TracingContext(clientCorrelationId, - fileSystemId, FSOperationType.DELETE, tracingHeaderFormat, + fileSystemId, FSOperationType.DELETE, true, tracingHeaderFormat, listener); if (shouldRedirect(FSOperationType.DELETE, tracingContext)) { @@ -933,9 +933,8 @@ public FileStatus[] listStatus(final Path f) throws IOException { = getAbfsStore().getRenamePendingFileStatus(result); if (renamePendingFileStatus != null) { RenameAtomicityUtils renameAtomicityUtils = - new RenameAtomicityUtils(this, - renamePendingFileStatus.getPath(), - getAbfsStore().getRedoRenameInvocation(tracingContext)); + getRenameAtomicityUtilsForRedo(renamePendingFileStatus.getPath(), + tracingContext); renameAtomicityUtils.cleanup(renamePendingFileStatus.getPath()); result = getAbfsStore().listStatus(qualifiedPath, tracingContext); } @@ -947,6 +946,13 @@ public FileStatus[] listStatus(final Path f) throws IOException { } } + RenameAtomicityUtils getRenameAtomicityUtilsForRedo(final Path renamePendingFileStatus, + final TracingContext tracingContext) throws IOException { + return new RenameAtomicityUtils(this, + renamePendingFileStatus, + getAbfsStore().getRedoRenameInvocation(tracingContext)); + } + /** * Increment of an Abfs statistic. * @@ -1077,10 +1083,9 @@ private FileStatus getFileStatus(final Path path, && getAbfsStore().isAtomicRenameKey(fileStatus.getPath().toUri().getPath()) && getAbfsStore().getRenamePendingFileStatusInDirectory(fileStatus, tracingContext)) { - RenameAtomicityUtils renameAtomicityUtils = new RenameAtomicityUtils( - this, + RenameAtomicityUtils renameAtomicityUtils = getRenameAtomicityUtilsForRedo( new Path(fileStatus.getPath().toUri().getPath() + SUFFIX), - getAbfsStore().getRedoRenameInvocation(tracingContext)); + tracingContext); renameAtomicityUtils.cleanup( new Path(fileStatus.getPath().toUri().getPath() + SUFFIX)); throw new AbfsRestOperationException(HttpURLConnection.HTTP_NOT_FOUND, diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemDelete.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemDelete.java index 6c3272a90fc54..00a3048c000eb 100644 --- a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemDelete.java +++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemDelete.java @@ -449,48 +449,20 @@ public void testDeleteEmitDeletionCountInClientRequestId() throws Exception { AbfsClient client = Mockito.spy(store.getClient()); store.setClient(client); - Mockito.doAnswer(answer -> { - final TracingContext context = answer.getArgument(2); - Mockito.doAnswer(listAnswer -> { - TracingContext listContext = listAnswer.getArgument(4); - Assert.assertEquals(listContext.getPrimaryRequestId(), - context.getPrimaryRequestId()); - Assert.assertTrue(context.getOpType().equals(listContext.getOpType())); - return listAnswer.callRealMethod(); - }) - .when(client) - .getListBlobs(Mockito.nullable(String.class), - Mockito.nullable(String.class), Mockito.nullable(String.class), - Mockito.nullable(Integer.class), - Mockito.any(TracingContext.class)); - - Mockito.doAnswer(createBlobPathAnswer -> { - TracingContext createBlobPathContext = createBlobPathAnswer.getArgument( - 5); - Assert.assertEquals(createBlobPathContext.getPrimaryRequestId(), - context.getPrimaryRequestId()); - Assert.assertTrue( - context.getOpType().equals(createBlobPathContext.getOpType())); - return createBlobPathAnswer.callRealMethod(); - }) - .when(client) - .createPathBlob(Mockito.anyString(), Mockito.anyBoolean(), - Mockito.anyBoolean(), Mockito.nullable( - HashMap.class), Mockito.nullable(String.class), - Mockito.any(TracingContext.class)); - + final TracingHeaderValidator tracingHeaderValidator + = new TracingHeaderValidator( + fs.getAbfsStore().getAbfsConfiguration().getClientCorrelationId(), + fs.getFileSystemId(), FSOperationType.DELETE, true, 0); + fs.registerListener(tracingHeaderValidator); + Mockito.doAnswer(answer -> { Mockito.doAnswer(deleteAnswer -> { - TracingContext deleteContext = deleteAnswer.getArgument(2); - Assert.assertEquals(deleteContext.getPrimaryRequestId(), - context.getPrimaryRequestId()); - Assert.assertTrue( - context.getOpType().equals(deleteContext.getOpType())); if (dirPathStr.equalsIgnoreCase( ((Path) deleteAnswer.getArgument(0)).toUri().getPath())) { - TracingContext tracingContext = deleteAnswer.getArgument(2); - Assertions.assertThat(tracingContext.getOperatedBlobCount()) - .isEqualTo(11); + tracingHeaderValidator.setOperatedBlobCount(11); + Object result = deleteAnswer.callRealMethod(); + tracingHeaderValidator.setOperatedBlobCount(null); + return result; } return deleteAnswer.callRealMethod(); }) @@ -505,19 +477,6 @@ public void testDeleteEmitDeletionCountInClientRequestId() throws Exception { .delete(Mockito.any(Path.class), Mockito.anyBoolean(), Mockito.any(TracingContext.class)); - Mockito.doAnswer(answer -> { - if (dirPathStr.equalsIgnoreCase( - ((Path) answer.getArgument(0)).toUri().getPath())) { - TracingContext tracingContext = answer.getArgument(2); - Assertions.assertThat(tracingContext.getOperatedBlobCount()) - .isEqualTo(11); - } - return answer.callRealMethod(); - }) - .when(client) - .deleteBlobPath(Mockito.any(Path.class), Mockito.nullable(String.class), - Mockito.any(TracingContext.class)); - fs.delete(new Path(dirPathStr), true); } } diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemRename.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemRename.java index 518b8801a238b..a13b68e61c898 100644 --- a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemRename.java +++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemRename.java @@ -41,15 +41,14 @@ import org.assertj.core.api.Assertions; import org.junit.Assert; import org.junit.Assume; -import org.junit.Ignore; import org.junit.Test; -import org.mockito.Mock; import org.mockito.Mockito; import org.apache.hadoop.fs.FSDataInputStream; import org.apache.hadoop.fs.FSDataOutputStream; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.azurebfs.constants.FSOperationType; import org.apache.hadoop.fs.azurebfs.constants.HttpHeaderConfigurations; import org.apache.hadoop.fs.azurebfs.contracts.exceptions.AbfsRestOperationException; import org.apache.hadoop.fs.azurebfs.contracts.exceptions.AzureBlobFileSystemException; @@ -60,12 +59,12 @@ import org.apache.hadoop.fs.azurebfs.services.AbfsHttpOpTestUtil; import org.apache.hadoop.fs.azurebfs.services.AbfsHttpOperation; import org.apache.hadoop.fs.azurebfs.services.AbfsLease; -import org.apache.hadoop.fs.azurebfs.services.AbfsOutputStream; import org.apache.hadoop.fs.azurebfs.services.AbfsRestOperation; import org.apache.hadoop.fs.azurebfs.services.AbfsRestOperationTestUtil; import org.apache.hadoop.fs.azurebfs.services.RenameAtomicityUtils; import org.apache.hadoop.fs.azurebfs.utils.TracingContext; import org.apache.hadoop.fs.azurebfs.services.PrefixMode; +import org.apache.hadoop.fs.azurebfs.utils.TracingHeaderValidator; import static org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.FS_AZURE_LEASE_CREATE_NON_RECURSIVE; import static org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.FS_AZURE_REDIRECT_RENAME; @@ -2009,26 +2008,19 @@ public void testRenameSrcDirDeleteEmitDeletionCountInClientRequestId() AbfsClient client = Mockito.spy(store.getClient()); store.setClient(client); - Mockito.doAnswer(answer -> { - final TracingContext context = answer.getArgument(3); - Mockito.doAnswer(listAnswer -> { - TracingContext listContext = listAnswer.getArgument(4); - Assert.assertEquals(listContext.getPrimaryRequestId(), - context.getPrimaryRequestId()); - Assert.assertTrue(context.getOpType().equals(listContext.getOpType())); - return listAnswer.callRealMethod(); - }) - .when(client) - .getListBlobs(Mockito.nullable(String.class), - Mockito.nullable(String.class), Mockito.nullable(String.class), - Mockito.nullable(Integer.class), - Mockito.any(TracingContext.class)); + final TracingHeaderValidator tracingHeaderValidator + = new TracingHeaderValidator( + fs.getAbfsStore().getAbfsConfiguration().getClientCorrelationId(), + fs.getFileSystemId(), FSOperationType.RENAME, true, 0); + fs.registerListener(tracingHeaderValidator); + Mockito.doAnswer(answer -> { Mockito.doAnswer(copyAnswer -> { - TracingContext copyContext = copyAnswer.getArgument(3); - Assert.assertEquals(copyContext.getPrimaryRequestId(), - context.getPrimaryRequestId()); - Assert.assertTrue(context.getOpType().equals(copyContext.getOpType())); + if (dirPathStr.equalsIgnoreCase( + ((Path) copyAnswer.getArgument(0)).toUri().getPath())) { + tracingHeaderValidator.setOperatedBlobCount(11); + return copyAnswer.callRealMethod(); + } return copyAnswer.callRealMethod(); }) .when(client) @@ -2037,16 +2029,11 @@ public void testRenameSrcDirDeleteEmitDeletionCountInClientRequestId() Mockito.any(TracingContext.class)); Mockito.doAnswer(deleteAnswer -> { - TracingContext deleteContext = deleteAnswer.getArgument(2); - Assert.assertEquals(deleteContext.getPrimaryRequestId(), - context.getPrimaryRequestId()); - Assert.assertTrue( - context.getOpType().equals(deleteContext.getOpType())); if (dirPathStr.equalsIgnoreCase( ((Path) deleteAnswer.getArgument(0)).toUri().getPath())) { - TracingContext tracingContext = deleteAnswer.getArgument(2); - Assertions.assertThat(tracingContext.getOperatedBlobCount()) - .isEqualTo(11); + Object result = deleteAnswer.callRealMethod(); + tracingHeaderValidator.setOperatedBlobCount(null); + return result; } return deleteAnswer.callRealMethod(); }) @@ -2081,34 +2068,11 @@ public void testBlobRenameResumeWithListStatus() throws Exception { store.setClient(client); renameFailureSetup(fs, client); - - TracingContext[] tracingContextCreatedInFsListStatus - = new TracingContext[1]; - Mockito.doAnswer(answer -> { - tracingContextCreatedInFsListStatus[0] = answer.getArgument(1); - return answer.callRealMethod(); - }) - .when(store) - .listStatus(Mockito.any(Path.class), Mockito.any(TracingContext.class)); - - AtomicInteger copied = new AtomicInteger(0); - Mockito.doAnswer(answer -> { - copied.incrementAndGet(); - TracingContext tracingContext = answer.getArgument(3); - Assertions.assertThat(tracingContext) - .isEqualTo(tracingContextCreatedInFsListStatus[0]); - Path path = answer.getArgument(0); - if ("/hbase/testDir".equalsIgnoreCase(path.toUri().getPath())) { - Assertions.assertThat(tracingContext.getOperatedBlobCount()) - .isEqualTo(copied.get()); - } - return answer.callRealMethod(); - }) - .when(store) - .copyBlob(Mockito.any(Path.class), Mockito.any(Path.class), - Mockito.nullable(String.class), Mockito.any(TracingContext.class)); + AtomicInteger copied = assertTracingContextOnRenameResumeProcess(fs, store, + client, FSOperationType.LISTSTATUS); fs.listStatus(new Path("/hbase")); + fs.registerListener(null); Assertions.assertThat(fs.exists(new Path("/hbase/testDir2"))).isTrue(); Assertions.assertThat(copied.get()).isGreaterThan(0); } @@ -2129,38 +2093,8 @@ public void testBlobRenameResumeWithGetFileStatus() throws Exception { store.setClient(client); renameFailureSetup(fs, client); - - - TracingContext[] tracingContextCreatedInFsListStatus - = new TracingContext[1]; - Mockito.doAnswer(answer -> { - synchronized (this) { - if (tracingContextCreatedInFsListStatus[0] == null) { - tracingContextCreatedInFsListStatus[0] = answer.getArgument(1); - } - } - return answer.callRealMethod(); - }) - .when(store) - .getFileStatus(Mockito.any(Path.class), - Mockito.any(TracingContext.class), Mockito.anyBoolean()); - - AtomicInteger copied = new AtomicInteger(0); - Mockito.doAnswer(answer -> { - copied.incrementAndGet(); - TracingContext tracingContext = answer.getArgument(3); - Assertions.assertThat(tracingContext) - .isEqualTo(tracingContextCreatedInFsListStatus[0]); - Path path = answer.getArgument(0); - if ("/hbase/testDir".equalsIgnoreCase(path.toUri().getPath())) { - Assertions.assertThat(tracingContext.getOperatedBlobCount()) - .isEqualTo(copied.get()); - } - return answer.callRealMethod(); - }) - .when(store) - .copyBlob(Mockito.any(Path.class), Mockito.any(Path.class), - Mockito.nullable(String.class), Mockito.any(TracingContext.class)); + AtomicInteger copied = assertTracingContextOnRenameResumeProcess(fs, store, + client, FSOperationType.GET_FILESTATUS); intercept(FileNotFoundException.class, () -> { fs.getFileStatus(new Path("/hbase/testDir")); @@ -2223,4 +2157,71 @@ private void renameFailureSetup(final AzureBlobFileSystem fs, Mockito.mock(TracingContext.class)); leaseCanBeTaken.set(true); } + + private AtomicInteger assertTracingContextOnRenameResumeProcess(final AzureBlobFileSystem fs, + final AzureBlobFileSystemStore store, + final AbfsClient client, final FSOperationType fsOperationType) + throws IOException { + final TracingHeaderValidator tracingHeaderValidator + = new TracingHeaderValidator( + fs.getAbfsStore().getAbfsConfiguration().getClientCorrelationId(), + fs.getFileSystemId(), fsOperationType, true, 0); + fs.registerListener(tracingHeaderValidator); + + AtomicInteger copied = new AtomicInteger(0); + Mockito.doAnswer(answer -> { + copied.incrementAndGet(); + Path path = answer.getArgument(0); + if ("/hbase/testDir".equalsIgnoreCase(path.toUri().getPath())) { + tracingHeaderValidator.setOperatedBlobCount(copied.get()); + } + return answer.callRealMethod(); + }) + .when(store) + .copyBlob(Mockito.any(Path.class), Mockito.any(Path.class), + Mockito.nullable(String.class), Mockito.any(TracingContext.class)); + + /* + * RenameAtomicUtil internally calls Filesystem's API of read and delete + * which would have different primaryIds. But once renameAtomicUtil has read + * the RenamePending JSON, all the operations will use the same tracingContext + * which was started by ListStatus or GetFileStatus operation. + * This is the reason why the validation is disabled until the RenameAtomicUtil + * object reads the JSON. + * The filesystem's delete API called in RenameAtomicUtils.cleanup create a + * new TracingContext object with a new primaryRequestId and also updates the + * new primaryRequestId in the listener object of FileSystem. Therefore, once, + * cleanup method is completed, the listener is explicitly updated with the + * primaryRequestId it was using before the RenameAtomicUtils object was created. + */ + Mockito.doAnswer(answer -> { + final String primaryRequestId = ((TracingContext) answer.getArgument( + 1)).getPrimaryRequestId(); + tracingHeaderValidator.setDisableValidation(true); + RenameAtomicityUtils renameAtomicityUtils = Mockito.spy( + (RenameAtomicityUtils) answer.callRealMethod()); + Mockito.doAnswer(cleanupAnswer -> { + tracingHeaderValidator.setDisableValidation(true); + cleanupAnswer.callRealMethod(); + tracingHeaderValidator.setDisableValidation(false); + tracingHeaderValidator.updatePrimaryRequestID(primaryRequestId); + return null; + }).when(renameAtomicityUtils).cleanup(Mockito.any(Path.class)); + tracingHeaderValidator.setDisableValidation(false); + return renameAtomicityUtils; + }) + .when(fs) + .getRenameAtomicityUtilsForRedo(Mockito.any(Path.class), + Mockito.any(TracingContext.class)); + + Mockito.doAnswer(answer -> { + answer.callRealMethod(); + tracingHeaderValidator.setOperatedBlobCount(null); + return null; + }) + .when(client) + .deleteBlobPath(Mockito.any(Path.class), Mockito.nullable(String.class), + Mockito.any(TracingContext.class)); + return copied; + } } diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/utils/TracingHeaderValidator.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/utils/TracingHeaderValidator.java index 27a84e4978ad2..37fb2db1de0cc 100644 --- a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/utils/TracingHeaderValidator.java +++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/utils/TracingHeaderValidator.java @@ -38,10 +38,16 @@ public class TracingHeaderValidator implements Listener { private TracingHeaderFormat format; private static final String GUID_PATTERN = "^[0-9a-fA-F]{8}-([0-9a-fA-F]{4}-){3}[0-9a-fA-F]{12}$"; + private Integer operatedBlobCount = null; + + private Boolean disableValidation = false; @Override public void callTracingHeaderValidator(String tracingContextHeader, TracingHeaderFormat format) { + if (disableValidation) { + return; + } this.format = format; validateTracingHeader(tracingContextHeader); } @@ -52,6 +58,9 @@ public TracingHeaderValidator getClone() { clientCorrelationId, fileSystemId, operation, needsPrimaryRequestId, retryNum, streamID); tracingHeaderValidator.primaryRequestId = primaryRequestId; + if (disableValidation) { + tracingHeaderValidator.setDisableValidation(true); + } return tracingHeaderValidator; } @@ -78,6 +87,13 @@ private void validateTracingHeader(String tracingContextHeader) { if (format != TracingHeaderFormat.ALL_ID_FORMAT) { return; } + if (idList.length >= 9) { + if (operatedBlobCount != null) { + Assertions.assertThat(Integer.parseInt(idList[8])) + .describedAs("OperatedBlobCount is incorrect") + .isEqualTo(operatedBlobCount); + } + } if (!primaryRequestId.isEmpty() && !idList[3].isEmpty()) { Assertions.assertThat(idList[3]) .describedAs("PrimaryReqID should be common for these requests") @@ -93,7 +109,8 @@ private void validateTracingHeader(String tracingContextHeader) { private void validateBasicFormat(String[] idList) { if (format == TracingHeaderFormat.ALL_ID_FORMAT) { Assertions.assertThat(idList) - .describedAs("header should have 8 elements").hasSize(8); + .describedAs("header should have 8 or 9 elements") + .hasSizeBetween(8, 9); } else if (format == TracingHeaderFormat.TWO_ID_FORMAT) { Assertions.assertThat(idList) .describedAs("header should have 2 elements").hasSize(2); @@ -152,4 +169,12 @@ public void setOperation(FSOperationType operation) { public void updatePrimaryRequestID(String primaryRequestId) { this.primaryRequestId = primaryRequestId; } + + public void setOperatedBlobCount(Integer operatedBlobCount) { + this.operatedBlobCount = operatedBlobCount; + } + + public void setDisableValidation(Boolean disableValidation) { + this.disableValidation = disableValidation; + } }