Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
26 commits
Select commit Hold shift + click to select a range
221b21f
testIfTracingContextPrimaryIdIsSameInAllTheStepsOfBlobRename
saxenapranav Jun 16, 2023
1ce1479
nit refactor
saxenapranav Jun 16, 2023
f581bec
Merge branch 'ABFS_3.3.2_dev_rename_improvements' into ABFS_3.3.2_dev…
saxenapranav Jun 19, 2023
e16000f
tracingContext to have new field for total renamed blobs in src-blob …
saxenapranav Jun 19, 2023
816c34b
Merge branch 'ABFS_3.3.2_dev_rename_improvements' into ABFS_3.3.2_dev…
saxenapranav Jun 20, 2023
c4533fe
Merge branch 'ABFS_3.3.2_dev' into ABFS_3.3.2_dev_rename_tc
saxenapranav Jun 20, 2023
742e357
Merge branch 'ABFS_3.3.2_dev_delete' into ABFS_3.3.2_dev_rename_tc
saxenapranav Jun 20, 2023
997127c
Merge branch 'ABFS_3.3.2_dev_rename_improvements' into ABFS_3.3.2_dev…
saxenapranav Jun 21, 2023
740e312
added for deletion
saxenapranav Jun 21, 2023
7df7df9
Merge branch 'ABFS_3.3.2_dev_delete' into ABFS_3.3.2_dev_rename_tc
saxenapranav Jun 26, 2023
673b0f9
test wip
saxenapranav Jun 26, 2023
a8783f2
Merge branch 'ABFS_3.3.2_dev_delete' into ABFS_3.3.2_dev_rename_tc
saxenapranav Jun 27, 2023
70be95e
testDeleteEmitDeletionCountInClientRequestId
saxenapranav Jun 27, 2023
641a84b
testRenameSrcDirDeleteEmitDeletionCountInClientRequestId
saxenapranav Jun 27, 2023
3439b1b
Merge branch 'ABFS_3.3.2_dev_delete' into ABFS_3.3.2_dev_rename_tc
saxenapranav Jun 27, 2023
609d65d
refactor argument number and added assertion in testIfTracingContextP…
saxenapranav Jun 27, 2023
4f3e47c
testIfTracingContextPrimaryIdIsSameInAllTheStepsOfBlobDelete
saxenapranav Jun 27, 2023
643c10f
testBlobRenameResumeWithListStatus
saxenapranav Jun 27, 2023
2e9f1ab
testBlobRenameResumeWithGetFileStatus
saxenapranav Jun 27, 2023
1607a11
require primary key in getFileStatus
saxenapranav Jun 27, 2023
1d4b6ba
release all lease after rename failure
saxenapranav Jun 28, 2023
460fd86
concurrency in lease release
saxenapranav Jun 28, 2023
c35c714
remove non-required imports
saxenapranav Jun 28, 2023
8bde5da
added javadocs on the added tests.
saxenapranav Jun 28, 2023
024b451
Refactor to bring out common functionality into methods; aggregate si…
saxenapranav Jun 29, 2023
a943c31
Abfs 3.3.2 dev tc review (#16)
saxenapranav Jun 29, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -871,7 +871,7 @@ public boolean delete(final Path f, final boolean recursive) throws IOException
statIncrement(CALL_DELETE);
Path qualifiedPath = makeQualified(f);
TracingContext tracingContext = new TracingContext(clientCorrelationId,
fileSystemId, FSOperationType.DELETE, tracingHeaderFormat,
fileSystemId, FSOperationType.DELETE, true, tracingHeaderFormat,
listener);

if (shouldRedirect(FSOperationType.DELETE, tracingContext)) {
Expand Down Expand Up @@ -926,18 +926,17 @@ public FileStatus[] listStatus(final Path f) throws IOException {
TracingContext tracingContext = new TracingContext(clientCorrelationId,
fileSystemId, FSOperationType.LISTSTATUS, true, tracingHeaderFormat,
listener);
FileStatus[] result = abfsStore.listStatus(qualifiedPath, tracingContext);
FileStatus[] result = getAbfsStore().listStatus(qualifiedPath, tracingContext);
if (getAbfsStore().getAbfsConfiguration().getPrefixMode()
== PrefixMode.BLOB) {
FileStatus renamePendingFileStatus
= abfsStore.getRenamePendingFileStatus(result);
= getAbfsStore().getRenamePendingFileStatus(result);
if (renamePendingFileStatus != null) {
RenameAtomicityUtils renameAtomicityUtils =
new RenameAtomicityUtils(this,
renamePendingFileStatus.getPath(),
abfsStore.getRedoRenameInvocation(tracingContext));
getRenameAtomicityUtilsForRedo(renamePendingFileStatus.getPath(),
tracingContext);
renameAtomicityUtils.cleanup(renamePendingFileStatus.getPath());
result = abfsStore.listStatus(qualifiedPath, tracingContext);
result = getAbfsStore().listStatus(qualifiedPath, tracingContext);
}
}
return result;
Expand All @@ -947,6 +946,13 @@ public FileStatus[] listStatus(final Path f) throws IOException {
}
}

RenameAtomicityUtils getRenameAtomicityUtilsForRedo(final Path renamePendingFileStatus,
final TracingContext tracingContext) throws IOException {
return new RenameAtomicityUtils(this,
renamePendingFileStatus,
getAbfsStore().getRedoRenameInvocation(tracingContext));
}

/**
* Increment of an Abfs statistic.
*
Expand Down Expand Up @@ -1048,7 +1054,7 @@ public synchronized void close() throws IOException {
@Override
public FileStatus getFileStatus(final Path f) throws IOException {
TracingContext tracingContext = new TracingContext(clientCorrelationId,
fileSystemId, FSOperationType.GET_FILESTATUS, tracingHeaderFormat,
fileSystemId, FSOperationType.GET_FILESTATUS, true, tracingHeaderFormat,
listener);
return getFileStatus(f, tracingContext);
}
Expand All @@ -1070,17 +1076,16 @@ private FileStatus getFileStatus(final Path path,
* Get File Status over Blob Endpoint will Have an additional call
* to check if directory is implicit.
*/
fileStatus = abfsStore.getFileStatus(qualifiedPath,
fileStatus = getAbfsStore().getFileStatus(qualifiedPath,
tracingContext, useBlobEndpoint);
if (getAbfsStore().getPrefixMode() == PrefixMode.BLOB
&& fileStatus != null && fileStatus.isDirectory()
&& abfsStore.isAtomicRenameKey(fileStatus.getPath().toUri().getPath())
&& abfsStore.getRenamePendingFileStatusInDirectory(fileStatus,
&& getAbfsStore().isAtomicRenameKey(fileStatus.getPath().toUri().getPath())
&& getAbfsStore().getRenamePendingFileStatusInDirectory(fileStatus,
tracingContext)) {
RenameAtomicityUtils renameAtomicityUtils = new RenameAtomicityUtils(
this,
RenameAtomicityUtils renameAtomicityUtils = getRenameAtomicityUtilsForRedo(
new Path(fileStatus.getPath().toUri().getPath() + SUFFIX),
abfsStore.getRedoRenameInvocation(tracingContext));
tracingContext);
renameAtomicityUtils.cleanup(
new Path(fileStatus.getPath().toUri().getPath() + SUFFIX));
throw new AbfsRestOperationException(HttpURLConnection.HTTP_NOT_FOUND,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

import org.apache.hadoop.classification.VisibleForTesting;

Expand Down Expand Up @@ -1580,6 +1581,7 @@ private void renameBlobDir(final Path source,
final ExecutorService renameBlobExecutorService
= Executors.newFixedThreadPool(
getAbfsConfiguration().getBlobDirRenameMaxThread());
AtomicInteger renamedBlob = new AtomicInteger(0);
while(!listBlobConsumer.isCompleted()) {
blobList = listBlobConsumer.consume();
if(blobList == null) {
Expand Down Expand Up @@ -1611,6 +1613,7 @@ private void renameBlobDir(final Path source,
blobProperty.getPath(), source),
blobLease,
tracingContext);
renamedBlob.incrementAndGet();
} catch (AzureBlobFileSystemException e) {
LOG.error(String.format("rename from %s to %s for blob %s failed",
source, destination, blobProperty.getPath()), e);
Expand All @@ -1634,11 +1637,13 @@ private void renameBlobDir(final Path source,
}
renameBlobExecutorService.shutdown();

tracingContext.setOperatedBlobCount(renamedBlob.get() + 1);
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Since we are already doing an increment and get, why +1 here ?

Copy link
Copy Markdown
Collaborator Author

@saxenapranav saxenapranav Jun 28, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Increment is being done only for blobs in renamed in parallel

This +1 is for blob which would contain the count value in CID.

renameBlob(
source, createDestinationPathForBlobPartOfRenameSrcDir(destination,
source, source),
srcDirBlobLease,
tracingContext);
tracingContext.setOperatedBlobCount(null);
}

private Boolean isCreateOperationOnBlobEndpoint() {
Expand Down Expand Up @@ -1757,7 +1762,6 @@ private void deleteBlobPath(final Path path,
if (!path.isRoot()) {
listSrcBuilder.append(FORWARD_SLASH);
}

String listSrc = listSrcBuilder.toString();
BlobList blobList = client.getListBlobs(null, listSrc, null, null,
tracingContext).getResult().getBlobList();
Expand Down Expand Up @@ -1868,6 +1872,7 @@ private void createParentDirectory(final Path path,
private void deleteOnConsumedBlobs(final Path srcPath,
final ListBlobConsumer consumer,
final TracingContext tracingContext) throws AzureBlobFileSystemException {
AtomicInteger deletedBlobCount = new AtomicInteger(0);
String srcPathStr = srcPath.toUri().getPath();
ExecutorService deleteBlobExecutorService = Executors.newFixedThreadPool(
getAbfsConfiguration().getBlobDirDeleteMaxThread());
Expand All @@ -1885,6 +1890,7 @@ private void deleteOnConsumedBlobs(final Path srcPath,
try {
client.deleteBlobPath(blobProperty.getPath(), null,
tracingContext);
deletedBlobCount.incrementAndGet();
} catch (AzureBlobFileSystemException ex) {
if (ex instanceof AbfsRestOperationException
&& ((AbfsRestOperationException) ex).getStatusCode()
Expand Down Expand Up @@ -1912,6 +1918,8 @@ private void deleteOnConsumedBlobs(final Path srcPath,
} finally {
deleteBlobExecutorService.shutdown();
}

tracingContext.setOperatedBlobCount(deletedBlobCount.get() + 1);
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same here ?

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Increment is being done only for blobs in renamed in parallel

This +1 is for blob which would contain the count value in CID.

if (!srcPath.isRoot()) {
try {
LOG.debug(String.format("Deleting Path %s", srcPathStr));
Expand All @@ -1926,6 +1934,7 @@ private void deleteOnConsumedBlobs(final Path srcPath,
String.format("Path %s is an implicit directory", srcPathStr));
}
}
tracingContext.setOperatedBlobCount(null);
}

public FileStatus getFileStatus(Path path, TracingContext tracingContext, boolean useBlobEndpoint) throws IOException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,8 @@ public class TracingContext {
public static final int MAX_CLIENT_CORRELATION_ID_LENGTH = 72;
public static final String CLIENT_CORRELATION_ID_PATTERN = "[a-zA-Z0-9-]*";

private Integer operatedBlobCount = null;

/**
* Initialize TracingContext
* @param clientCorrelationID Provided over config by client
Expand Down Expand Up @@ -187,6 +189,9 @@ public void constructHeader(AbfsHttpOperation httpOperation, String previousFail
+ getPrimaryRequestIdForHeader(retryCount > 0) + ":" + streamID
+ ":" + opType + ":" + retryCount;
header = addFailureReasons(header, previousFailure) + ":" + fallbackDFSAppend;
if (operatedBlobCount != null) {
header += (":" + operatedBlobCount);
}
break;
case TWO_ID_FORMAT:
header = clientCorrelationID + ":" + clientRequestId;
Expand Down Expand Up @@ -241,4 +246,19 @@ public String getHeader() {
return header;
}

public String getPrimaryRequestId() {
return primaryRequestId;
}

public void setOperatedBlobCount(Integer count) {
operatedBlobCount = count;
}

public Integer getOperatedBlobCount() {
return operatedBlobCount;
}

public FSOperationType getOpType() {
return opType;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import java.io.FileNotFoundException;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
Expand Down Expand Up @@ -414,4 +415,68 @@ public void testDeleteCheckIfParentLMTChange() throws Exception {
Long newLmt = fs.getFileStatus(new Path("/dir1")).getModificationTime();
Assertions.assertThat(lmt).isEqualTo(newLmt);
}

/**
* Test to assert that the CID in src marker delete contains the
* total number of blobs operated in the delete directory.
* Also, to assert that all operations in the delete-directory flow have same
* primaryId and opType.
*/
@Test
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you please add a small description for each test as in what the test is trying to achieve ?

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added as javadoc in all new tests.

public void testDeleteEmitDeletionCountInClientRequestId() throws Exception {
AzureBlobFileSystem fs = Mockito.spy(getFileSystem());
Assume.assumeTrue(getPrefixMode(fs) == PrefixMode.BLOB);
String dirPathStr = "/testDir/dir1";
fs.mkdirs(new Path(dirPathStr));
ExecutorService executorService = Executors.newFixedThreadPool(5);
List<Future> futures = new ArrayList<>();
for (int i = 0; i < 10; i++) {
final int iter = i;
Future future = executorService.submit(() -> {
return fs.create(new Path("/testDir/dir1/file" + iter));
});
futures.add(future);
}

for (Future future : futures) {
future.get();
}
executorService.shutdown();


AzureBlobFileSystemStore store = Mockito.spy(fs.getAbfsStore());
Mockito.doReturn(store).when(fs).getAbfsStore();
AbfsClient client = Mockito.spy(store.getClient());
store.setClient(client);

final TracingHeaderValidator tracingHeaderValidator
= new TracingHeaderValidator(
fs.getAbfsStore().getAbfsConfiguration().getClientCorrelationId(),
fs.getFileSystemId(), FSOperationType.DELETE, true, 0);
fs.registerListener(tracingHeaderValidator);

Mockito.doAnswer(answer -> {
Mockito.doAnswer(deleteAnswer -> {
if (dirPathStr.equalsIgnoreCase(
((Path) deleteAnswer.getArgument(0)).toUri().getPath())) {
tracingHeaderValidator.setOperatedBlobCount(11);
Object result = deleteAnswer.callRealMethod();
tracingHeaderValidator.setOperatedBlobCount(null);
return result;
}
return deleteAnswer.callRealMethod();
})
.when(client)
.deleteBlobPath(Mockito.any(Path.class),
Mockito.nullable(String.class),
Mockito.any(TracingContext.class));

return answer.callRealMethod();
})
.when(store)
.delete(Mockito.any(Path.class), Mockito.anyBoolean(),
Mockito.any(TracingContext.class));

fs.delete(new Path(dirPathStr), true);
}
}
Loading