diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystem.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystem.java index 2c96eaae4fb4b..1b9c55440e586 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystem.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystem.java @@ -871,7 +871,7 @@ public boolean delete(final Path f, final boolean recursive) throws IOException statIncrement(CALL_DELETE); Path qualifiedPath = makeQualified(f); TracingContext tracingContext = new TracingContext(clientCorrelationId, - fileSystemId, FSOperationType.DELETE, tracingHeaderFormat, + fileSystemId, FSOperationType.DELETE, true, tracingHeaderFormat, listener); if (shouldRedirect(FSOperationType.DELETE, tracingContext)) { @@ -926,18 +926,17 @@ public FileStatus[] listStatus(final Path f) throws IOException { TracingContext tracingContext = new TracingContext(clientCorrelationId, fileSystemId, FSOperationType.LISTSTATUS, true, tracingHeaderFormat, listener); - FileStatus[] result = abfsStore.listStatus(qualifiedPath, tracingContext); + FileStatus[] result = getAbfsStore().listStatus(qualifiedPath, tracingContext); if (getAbfsStore().getAbfsConfiguration().getPrefixMode() == PrefixMode.BLOB) { FileStatus renamePendingFileStatus - = abfsStore.getRenamePendingFileStatus(result); + = getAbfsStore().getRenamePendingFileStatus(result); if (renamePendingFileStatus != null) { RenameAtomicityUtils renameAtomicityUtils = - new RenameAtomicityUtils(this, - renamePendingFileStatus.getPath(), - abfsStore.getRedoRenameInvocation(tracingContext)); + getRenameAtomicityUtilsForRedo(renamePendingFileStatus.getPath(), + tracingContext); renameAtomicityUtils.cleanup(renamePendingFileStatus.getPath()); - result = abfsStore.listStatus(qualifiedPath, tracingContext); + result = getAbfsStore().listStatus(qualifiedPath, tracingContext); 
} } return result; @@ -947,6 +946,13 @@ public FileStatus[] listStatus(final Path f) throws IOException { } } + RenameAtomicityUtils getRenameAtomicityUtilsForRedo(final Path renamePendingFileStatus, + final TracingContext tracingContext) throws IOException { + return new RenameAtomicityUtils(this, + renamePendingFileStatus, + getAbfsStore().getRedoRenameInvocation(tracingContext)); + } + /** * Increment of an Abfs statistic. * @@ -1048,7 +1054,7 @@ public synchronized void close() throws IOException { @Override public FileStatus getFileStatus(final Path f) throws IOException { TracingContext tracingContext = new TracingContext(clientCorrelationId, - fileSystemId, FSOperationType.GET_FILESTATUS, tracingHeaderFormat, + fileSystemId, FSOperationType.GET_FILESTATUS, true, tracingHeaderFormat, listener); return getFileStatus(f, tracingContext); } @@ -1070,17 +1076,16 @@ private FileStatus getFileStatus(final Path path, * Get File Status over Blob Endpoint will Have an additional call * to check if directory is implicit. 
*/ - fileStatus = abfsStore.getFileStatus(qualifiedPath, + fileStatus = getAbfsStore().getFileStatus(qualifiedPath, tracingContext, useBlobEndpoint); if (getAbfsStore().getPrefixMode() == PrefixMode.BLOB && fileStatus != null && fileStatus.isDirectory() - && abfsStore.isAtomicRenameKey(fileStatus.getPath().toUri().getPath()) - && abfsStore.getRenamePendingFileStatusInDirectory(fileStatus, + && getAbfsStore().isAtomicRenameKey(fileStatus.getPath().toUri().getPath()) + && getAbfsStore().getRenamePendingFileStatusInDirectory(fileStatus, tracingContext)) { - RenameAtomicityUtils renameAtomicityUtils = new RenameAtomicityUtils( - this, + RenameAtomicityUtils renameAtomicityUtils = getRenameAtomicityUtilsForRedo( new Path(fileStatus.getPath().toUri().getPath() + SUFFIX), - abfsStore.getRedoRenameInvocation(tracingContext)); + tracingContext); renameAtomicityUtils.cleanup( new Path(fileStatus.getPath().toUri().getPath() + SUFFIX)); throw new AbfsRestOperationException(HttpURLConnection.HTTP_NOT_FOUND, diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystemStore.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystemStore.java index d9b480b49b70e..5ab6d1b463d9a 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystemStore.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystemStore.java @@ -56,6 +56,7 @@ import java.util.concurrent.Executors; import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; import org.apache.hadoop.classification.VisibleForTesting; @@ -1580,6 +1581,7 @@ private void renameBlobDir(final Path source, final ExecutorService renameBlobExecutorService = Executors.newFixedThreadPool( getAbfsConfiguration().getBlobDirRenameMaxThread()); + AtomicInteger renamedBlob = new AtomicInteger(0); 
while(!listBlobConsumer.isCompleted()) { blobList = listBlobConsumer.consume(); if(blobList == null) { @@ -1611,6 +1613,7 @@ private void renameBlobDir(final Path source, blobProperty.getPath(), source), blobLease, tracingContext); + renamedBlob.incrementAndGet(); } catch (AzureBlobFileSystemException e) { LOG.error(String.format("rename from %s to %s for blob %s failed", source, destination, blobProperty.getPath()), e); @@ -1634,11 +1637,13 @@ private void renameBlobDir(final Path source, } renameBlobExecutorService.shutdown(); + tracingContext.setOperatedBlobCount(renamedBlob.get() + 1); renameBlob( source, createDestinationPathForBlobPartOfRenameSrcDir(destination, source, source), srcDirBlobLease, tracingContext); + tracingContext.setOperatedBlobCount(null); } private Boolean isCreateOperationOnBlobEndpoint() { @@ -1757,7 +1762,6 @@ private void deleteBlobPath(final Path path, if (!path.isRoot()) { listSrcBuilder.append(FORWARD_SLASH); } - String listSrc = listSrcBuilder.toString(); BlobList blobList = client.getListBlobs(null, listSrc, null, null, tracingContext).getResult().getBlobList(); @@ -1868,6 +1872,7 @@ private void createParentDirectory(final Path path, private void deleteOnConsumedBlobs(final Path srcPath, final ListBlobConsumer consumer, final TracingContext tracingContext) throws AzureBlobFileSystemException { + AtomicInteger deletedBlobCount = new AtomicInteger(0); String srcPathStr = srcPath.toUri().getPath(); ExecutorService deleteBlobExecutorService = Executors.newFixedThreadPool( getAbfsConfiguration().getBlobDirDeleteMaxThread()); @@ -1885,6 +1890,7 @@ private void deleteOnConsumedBlobs(final Path srcPath, try { client.deleteBlobPath(blobProperty.getPath(), null, tracingContext); + deletedBlobCount.incrementAndGet(); } catch (AzureBlobFileSystemException ex) { if (ex instanceof AbfsRestOperationException && ((AbfsRestOperationException) ex).getStatusCode() @@ -1912,6 +1918,8 @@ private void deleteOnConsumedBlobs(final Path srcPath, } finally 
{ deleteBlobExecutorService.shutdown(); } + + tracingContext.setOperatedBlobCount(deletedBlobCount.get() + 1); if (!srcPath.isRoot()) { try { LOG.debug(String.format("Deleting Path %s", srcPathStr)); @@ -1926,6 +1934,7 @@ private void deleteOnConsumedBlobs(final Path srcPath, String.format("Path %s is an implicit directory", srcPathStr)); } } + tracingContext.setOperatedBlobCount(null); } public FileStatus getFileStatus(Path path, TracingContext tracingContext, boolean useBlobEndpoint) throws IOException { diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/utils/TracingContext.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/utils/TracingContext.java index e121d21a284b2..e2ab95aec9102 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/utils/TracingContext.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/utils/TracingContext.java @@ -78,6 +78,8 @@ public class TracingContext { public static final int MAX_CLIENT_CORRELATION_ID_LENGTH = 72; public static final String CLIENT_CORRELATION_ID_PATTERN = "[a-zA-Z0-9-]*"; + private Integer operatedBlobCount = null; + /** * Initialize TracingContext * @param clientCorrelationID Provided over config by client @@ -187,6 +189,9 @@ public void constructHeader(AbfsHttpOperation httpOperation, String previousFail + getPrimaryRequestIdForHeader(retryCount > 0) + ":" + streamID + ":" + opType + ":" + retryCount; header = addFailureReasons(header, previousFailure) + ":" + fallbackDFSAppend; + if (operatedBlobCount != null) { + header += (":" + operatedBlobCount); + } break; case TWO_ID_FORMAT: header = clientCorrelationID + ":" + clientRequestId; @@ -241,4 +246,19 @@ public String getHeader() { return header; } + public String getPrimaryRequestId() { + return primaryRequestId; + } + + public void setOperatedBlobCount(Integer count) { + operatedBlobCount = count; + } + + public Integer getOperatedBlobCount() { + 
return operatedBlobCount; + } + + public FSOperationType getOpType() { + return opType; + } } diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemDelete.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemDelete.java index 894917b8d8a03..00a3048c000eb 100644 --- a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemDelete.java +++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemDelete.java @@ -21,6 +21,7 @@ import java.io.FileNotFoundException; import java.io.IOException; import java.util.ArrayList; +import java.util.HashMap; import java.util.List; import java.util.concurrent.Callable; import java.util.concurrent.ExecutorService; @@ -414,4 +415,68 @@ public void testDeleteCheckIfParentLMTChange() throws Exception { Long newLmt = fs.getFileStatus(new Path("/dir1")).getModificationTime(); Assertions.assertThat(lmt).isEqualTo(newLmt); } + + /** + * Test to assert that the CID in src marker delete contains the + * total number of blobs operated in the delete directory. + * Also, to assert that all operations in the delete-directory flow have same + * primaryId and opType. 
+ */ + @Test + public void testDeleteEmitDeletionCountInClientRequestId() throws Exception { + AzureBlobFileSystem fs = Mockito.spy(getFileSystem()); + Assume.assumeTrue(getPrefixMode(fs) == PrefixMode.BLOB); + String dirPathStr = "/testDir/dir1"; + fs.mkdirs(new Path(dirPathStr)); + ExecutorService executorService = Executors.newFixedThreadPool(5); + List futures = new ArrayList<>(); + for (int i = 0; i < 10; i++) { + final int iter = i; + Future future = executorService.submit(() -> { + return fs.create(new Path("/testDir/dir1/file" + iter)); + }); + futures.add(future); + } + + for (Future future : futures) { + future.get(); + } + executorService.shutdown(); + + + AzureBlobFileSystemStore store = Mockito.spy(fs.getAbfsStore()); + Mockito.doReturn(store).when(fs).getAbfsStore(); + AbfsClient client = Mockito.spy(store.getClient()); + store.setClient(client); + + final TracingHeaderValidator tracingHeaderValidator + = new TracingHeaderValidator( + fs.getAbfsStore().getAbfsConfiguration().getClientCorrelationId(), + fs.getFileSystemId(), FSOperationType.DELETE, true, 0); + fs.registerListener(tracingHeaderValidator); + + Mockito.doAnswer(answer -> { + Mockito.doAnswer(deleteAnswer -> { + if (dirPathStr.equalsIgnoreCase( + ((Path) deleteAnswer.getArgument(0)).toUri().getPath())) { + tracingHeaderValidator.setOperatedBlobCount(11); + Object result = deleteAnswer.callRealMethod(); + tracingHeaderValidator.setOperatedBlobCount(null); + return result; + } + return deleteAnswer.callRealMethod(); + }) + .when(client) + .deleteBlobPath(Mockito.any(Path.class), + Mockito.nullable(String.class), + Mockito.any(TracingContext.class)); + + return answer.callRealMethod(); + }) + .when(store) + .delete(Mockito.any(Path.class), Mockito.anyBoolean(), + Mockito.any(TracingContext.class)); + + fs.delete(new Path(dirPathStr), true); + } } diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemRename.java 
b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemRename.java index e5b1683971a9b..a13b68e61c898 100644 --- a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemRename.java +++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemRename.java @@ -28,6 +28,7 @@ import java.util.Map; import java.util.Random; import java.util.concurrent.Callable; +import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Future; @@ -40,15 +41,14 @@ import org.assertj.core.api.Assertions; import org.junit.Assert; import org.junit.Assume; -import org.junit.Ignore; import org.junit.Test; -import org.mockito.Mock; import org.mockito.Mockito; import org.apache.hadoop.fs.FSDataInputStream; import org.apache.hadoop.fs.FSDataOutputStream; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.azurebfs.constants.FSOperationType; import org.apache.hadoop.fs.azurebfs.constants.HttpHeaderConfigurations; import org.apache.hadoop.fs.azurebfs.contracts.exceptions.AbfsRestOperationException; import org.apache.hadoop.fs.azurebfs.contracts.exceptions.AzureBlobFileSystemException; @@ -59,11 +59,12 @@ import org.apache.hadoop.fs.azurebfs.services.AbfsHttpOpTestUtil; import org.apache.hadoop.fs.azurebfs.services.AbfsHttpOperation; import org.apache.hadoop.fs.azurebfs.services.AbfsLease; -import org.apache.hadoop.fs.azurebfs.services.AbfsOutputStream; import org.apache.hadoop.fs.azurebfs.services.AbfsRestOperation; import org.apache.hadoop.fs.azurebfs.services.AbfsRestOperationTestUtil; +import org.apache.hadoop.fs.azurebfs.services.RenameAtomicityUtils; import org.apache.hadoop.fs.azurebfs.utils.TracingContext; import org.apache.hadoop.fs.azurebfs.services.PrefixMode; +import 
org.apache.hadoop.fs.azurebfs.utils.TracingHeaderValidator; import static org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.FS_AZURE_LEASE_CREATE_NON_RECURSIVE; import static org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.FS_AZURE_REDIRECT_RENAME; @@ -1972,4 +1973,255 @@ public void testBlobAtomicRenameSrcAndDstAreNotLeftLeased() throws Exception { os.write(bytes); } } + + /** + * Test to assert that the CID in src marker blob copy and delete contains the + * total number of blobs operated in the rename directory. + * Also, to assert that all operations in the rename-directory flow have same + * primaryId and opType. + */ + @Test + public void testRenameSrcDirDeleteEmitDeletionCountInClientRequestId() + throws Exception { + AzureBlobFileSystem fs = Mockito.spy(getFileSystem()); + Assume.assumeTrue(getPrefixMode(fs) == PrefixMode.BLOB); + String dirPathStr = "/testDir/dir1"; + fs.mkdirs(new Path(dirPathStr)); + ExecutorService executorService = Executors.newFixedThreadPool(5); + List futures = new ArrayList<>(); + for (int i = 0; i < 10; i++) { + final int iter = i; + Future future = executorService.submit(() -> { + return fs.create(new Path("/testDir/dir1/file" + iter)); + }); + futures.add(future); + } + + for (Future future : futures) { + future.get(); + } + executorService.shutdown(); + + + AzureBlobFileSystemStore store = Mockito.spy(fs.getAbfsStore()); + Mockito.doReturn(store).when(fs).getAbfsStore(); + AbfsClient client = Mockito.spy(store.getClient()); + store.setClient(client); + + final TracingHeaderValidator tracingHeaderValidator + = new TracingHeaderValidator( + fs.getAbfsStore().getAbfsConfiguration().getClientCorrelationId(), + fs.getFileSystemId(), FSOperationType.RENAME, true, 0); + fs.registerListener(tracingHeaderValidator); + + Mockito.doAnswer(answer -> { + Mockito.doAnswer(copyAnswer -> { + if (dirPathStr.equalsIgnoreCase( + ((Path) copyAnswer.getArgument(0)).toUri().getPath())) { + 
tracingHeaderValidator.setOperatedBlobCount(11); + return copyAnswer.callRealMethod(); + } + return copyAnswer.callRealMethod(); + }) + .when(client) + .copyBlob(Mockito.any(Path.class), Mockito.any(Path.class), + Mockito.nullable(String.class), + Mockito.any(TracingContext.class)); + + Mockito.doAnswer(deleteAnswer -> { + if (dirPathStr.equalsIgnoreCase( + ((Path) deleteAnswer.getArgument(0)).toUri().getPath())) { + Object result = deleteAnswer.callRealMethod(); + tracingHeaderValidator.setOperatedBlobCount(null); + return result; + } + return deleteAnswer.callRealMethod(); + }) + .when(client) + .deleteBlobPath(Mockito.any(Path.class), + Mockito.nullable(String.class), + Mockito.any(TracingContext.class)); + + return answer.callRealMethod(); + }) + .when(store) + .rename(Mockito.any(Path.class), Mockito.any(Path.class), + Mockito.any( + RenameAtomicityUtils.class), Mockito.any(TracingContext.class)); + + fs.rename(new Path(dirPathStr), new Path("/dst/")); + } + + /** + * Test to assert that the rename resume from ListStatus uses the same + * {@link TracingContext} object used by the initial ListStatus call. + * Also assert that last rename's copy and delete API call would push count + * of blobs operated in resume flow in clientRequestId. 
+ */ + @Test + public void testBlobRenameResumeWithListStatus() throws Exception { + assumeNonHnsAccountBlobEndpoint(getFileSystem()); + AzureBlobFileSystem fs = Mockito.spy(getFileSystem()); + AzureBlobFileSystemStore store = Mockito.spy(fs.getAbfsStore()); + AbfsClient client = Mockito.spy(fs.getAbfsClient()); + Mockito.doReturn(store).when(fs).getAbfsStore(); + store.setClient(client); + + renameFailureSetup(fs, client); + AtomicInteger copied = assertTracingContextOnRenameResumeProcess(fs, store, + client, FSOperationType.LISTSTATUS); + + fs.listStatus(new Path("/hbase")); + fs.registerListener(null); + Assertions.assertThat(fs.exists(new Path("/hbase/testDir2"))).isTrue(); + Assertions.assertThat(copied.get()).isGreaterThan(0); + } + + /** + * Test to assert that the rename resume from FileStatus uses the same + * {@link TracingContext} object used by the initial GetFileStatus call. + * Also assert that last rename's copy and delete API call would push count + * of blobs operated in resume flow in clientRequestId. 
+ */ + @Test + public void testBlobRenameResumeWithGetFileStatus() throws Exception { + assumeNonHnsAccountBlobEndpoint(getFileSystem()); + AzureBlobFileSystem fs = Mockito.spy(getFileSystem()); + AzureBlobFileSystemStore store = Mockito.spy(fs.getAbfsStore()); + AbfsClient client = Mockito.spy(fs.getAbfsClient()); + Mockito.doReturn(store).when(fs).getAbfsStore(); + store.setClient(client); + + renameFailureSetup(fs, client); + AtomicInteger copied = assertTracingContextOnRenameResumeProcess(fs, store, + client, FSOperationType.GET_FILESTATUS); + + intercept(FileNotFoundException.class, () -> { + fs.getFileStatus(new Path("/hbase/testDir")); + }); + Assertions.assertThat(fs.exists(new Path("/hbase/testDir2"))).isTrue(); + Assertions.assertThat(copied.get()).isGreaterThan(0); + } + + private void renameFailureSetup(final AzureBlobFileSystem fs, + final AbfsClient client) + throws Exception { + fs.mkdirs(new Path("/hbase/testDir")); + ExecutorService executorService = Executors.newFixedThreadPool(5); + List futures = new ArrayList<>(); + for (int i = 0; i < 10; i++) { + final int iter = i; + futures.add(executorService.submit( + () -> fs.create(new Path("/hbase/testDir/file" + iter)))); + } + + for (Future future : futures) { + future.get(); + } + + AbfsRestOperation op = client.acquireBlobLease("/hbase/testDir/file5", -1, + Mockito.mock(TracingContext.class)); + String leaseId = op.getResult() + .getResponseHeader(HttpHeaderConfigurations.X_MS_LEASE_ID); + + Map pathLeaseIdMap = new ConcurrentHashMap<>(); + AtomicBoolean leaseCanBeTaken = new AtomicBoolean(true); + Mockito.doAnswer(answer -> { + if (!leaseCanBeTaken.get()) { + throw new RuntimeException(); + } + AbfsRestOperation abfsRestOperation + = (AbfsRestOperation) answer.callRealMethod(); + pathLeaseIdMap.put(answer.getArgument(0), + abfsRestOperation.getResult().getResponseHeader(X_MS_LEASE_ID)); + return abfsRestOperation; + }) + .when(client) + .acquireBlobLease(Mockito.anyString(), Mockito.anyInt(), + 
Mockito.any(TracingContext.class)); + + intercept(Exception.class, () -> { + fs.rename(new Path("/hbase/testDir"), new Path("/hbase/testDir2")); + }); + + leaseCanBeTaken.set(false); + for (Map.Entry entry : pathLeaseIdMap.entrySet()) { + try { + client.releaseBlobLease(entry.getKey(), entry.getValue(), + Mockito.mock(TracingContext.class)); + } catch (Exception e) { + + } + } + client.releaseBlobLease("/hbase/testDir/file5", leaseId, + Mockito.mock(TracingContext.class)); + leaseCanBeTaken.set(true); + } + + private AtomicInteger assertTracingContextOnRenameResumeProcess(final AzureBlobFileSystem fs, + final AzureBlobFileSystemStore store, + final AbfsClient client, final FSOperationType fsOperationType) + throws IOException { + final TracingHeaderValidator tracingHeaderValidator + = new TracingHeaderValidator( + fs.getAbfsStore().getAbfsConfiguration().getClientCorrelationId(), + fs.getFileSystemId(), fsOperationType, true, 0); + fs.registerListener(tracingHeaderValidator); + + AtomicInteger copied = new AtomicInteger(0); + Mockito.doAnswer(answer -> { + copied.incrementAndGet(); + Path path = answer.getArgument(0); + if ("/hbase/testDir".equalsIgnoreCase(path.toUri().getPath())) { + tracingHeaderValidator.setOperatedBlobCount(copied.get()); + } + return answer.callRealMethod(); + }) + .when(store) + .copyBlob(Mockito.any(Path.class), Mockito.any(Path.class), + Mockito.nullable(String.class), Mockito.any(TracingContext.class)); + + /* + * RenameAtomicUtil internally calls Filesystem's API of read and delete + * which would have different primaryIds. But once renameAtomicUtil has read + * the RenamePending JSON, all the operations will use the same tracingContext + * which was started by ListStatus or GetFileStatus operation. + * This is the reason why the validation is disabled until the RenameAtomicUtil + * object reads the JSON. 
+ * The filesystem's delete API called in RenameAtomicityUtils.cleanup creates a + * new TracingContext object with a new primaryRequestId and also updates the + * new primaryRequestId in the listener object of FileSystem. Therefore, once the + * cleanup method is completed, the listener is explicitly updated with the + * primaryRequestId it was using before the RenameAtomicityUtils object was created. + */ + Mockito.doAnswer(answer -> { + final String primaryRequestId = ((TracingContext) answer.getArgument( + 1)).getPrimaryRequestId(); + tracingHeaderValidator.setDisableValidation(true); + RenameAtomicityUtils renameAtomicityUtils = Mockito.spy( + (RenameAtomicityUtils) answer.callRealMethod()); + Mockito.doAnswer(cleanupAnswer -> { + tracingHeaderValidator.setDisableValidation(true); + cleanupAnswer.callRealMethod(); + tracingHeaderValidator.setDisableValidation(false); + tracingHeaderValidator.updatePrimaryRequestID(primaryRequestId); + return null; + }).when(renameAtomicityUtils).cleanup(Mockito.any(Path.class)); + tracingHeaderValidator.setDisableValidation(false); + return renameAtomicityUtils; + }) + .when(fs) + .getRenameAtomicityUtilsForRedo(Mockito.any(Path.class), + Mockito.any(TracingContext.class)); + + Mockito.doAnswer(answer -> { + answer.callRealMethod(); + tracingHeaderValidator.setOperatedBlobCount(null); + return null; + }) + .when(client) + .deleteBlobPath(Mockito.any(Path.class), Mockito.nullable(String.class), + Mockito.any(TracingContext.class)); + return copied; + } } diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/utils/TracingHeaderValidator.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/utils/TracingHeaderValidator.java index 27a84e4978ad2..37fb2db1de0cc 100644 --- a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/utils/TracingHeaderValidator.java +++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/utils/TracingHeaderValidator.java @@ 
-38,10 +38,16 @@ public class TracingHeaderValidator implements Listener { private TracingHeaderFormat format; private static final String GUID_PATTERN = "^[0-9a-fA-F]{8}-([0-9a-fA-F]{4}-){3}[0-9a-fA-F]{12}$"; + private Integer operatedBlobCount = null; + + private Boolean disableValidation = false; @Override public void callTracingHeaderValidator(String tracingContextHeader, TracingHeaderFormat format) { + if (disableValidation) { + return; + } this.format = format; validateTracingHeader(tracingContextHeader); } @@ -52,6 +58,9 @@ public TracingHeaderValidator getClone() { clientCorrelationId, fileSystemId, operation, needsPrimaryRequestId, retryNum, streamID); tracingHeaderValidator.primaryRequestId = primaryRequestId; + if (disableValidation) { + tracingHeaderValidator.setDisableValidation(true); + } return tracingHeaderValidator; } @@ -78,6 +87,13 @@ private void validateTracingHeader(String tracingContextHeader) { if (format != TracingHeaderFormat.ALL_ID_FORMAT) { return; } + if (idList.length >= 9) { + if (operatedBlobCount != null) { + Assertions.assertThat(Integer.parseInt(idList[8])) + .describedAs("OperatedBlobCount is incorrect") + .isEqualTo(operatedBlobCount); + } + } if (!primaryRequestId.isEmpty() && !idList[3].isEmpty()) { Assertions.assertThat(idList[3]) .describedAs("PrimaryReqID should be common for these requests") @@ -93,7 +109,8 @@ private void validateTracingHeader(String tracingContextHeader) { private void validateBasicFormat(String[] idList) { if (format == TracingHeaderFormat.ALL_ID_FORMAT) { Assertions.assertThat(idList) - .describedAs("header should have 8 elements").hasSize(8); + .describedAs("header should have 8 or 9 elements") + .hasSizeBetween(8, 9); } else if (format == TracingHeaderFormat.TWO_ID_FORMAT) { Assertions.assertThat(idList) .describedAs("header should have 2 elements").hasSize(2); @@ -152,4 +169,12 @@ public void setOperation(FSOperationType operation) { public void updatePrimaryRequestID(String primaryRequestId) { 
this.primaryRequestId = primaryRequestId; } + + public void setOperatedBlobCount(Integer operatedBlobCount) { + this.operatedBlobCount = operatedBlobCount; + } + + public void setDisableValidation(Boolean disableValidation) { + this.disableValidation = disableValidation; + } }