Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -871,7 +871,7 @@ public boolean delete(final Path f, final boolean recursive) throws IOException
statIncrement(CALL_DELETE);
Path qualifiedPath = makeQualified(f);
TracingContext tracingContext = new TracingContext(clientCorrelationId,
fileSystemId, FSOperationType.DELETE, tracingHeaderFormat,
fileSystemId, FSOperationType.DELETE, true, tracingHeaderFormat,
listener);

if (shouldRedirect(FSOperationType.DELETE, tracingContext)) {
Expand Down Expand Up @@ -933,9 +933,8 @@ public FileStatus[] listStatus(final Path f) throws IOException {
= getAbfsStore().getRenamePendingFileStatus(result);
if (renamePendingFileStatus != null) {
RenameAtomicityUtils renameAtomicityUtils =
new RenameAtomicityUtils(this,
renamePendingFileStatus.getPath(),
getAbfsStore().getRedoRenameInvocation(tracingContext));
getRenameAtomicityUtilsForRedo(renamePendingFileStatus.getPath(),
tracingContext);
renameAtomicityUtils.cleanup(renamePendingFileStatus.getPath());
result = getAbfsStore().listStatus(qualifiedPath, tracingContext);
}
Expand All @@ -947,6 +946,13 @@ public FileStatus[] listStatus(final Path f) throws IOException {
}
}

RenameAtomicityUtils getRenameAtomicityUtilsForRedo(final Path renamePendingFileStatus,
final TracingContext tracingContext) throws IOException {
return new RenameAtomicityUtils(this,
renamePendingFileStatus,
getAbfsStore().getRedoRenameInvocation(tracingContext));
}

/**
* Increment of an Abfs statistic.
*
Expand Down Expand Up @@ -1077,10 +1083,9 @@ private FileStatus getFileStatus(final Path path,
&& getAbfsStore().isAtomicRenameKey(fileStatus.getPath().toUri().getPath())
&& getAbfsStore().getRenamePendingFileStatusInDirectory(fileStatus,
tracingContext)) {
RenameAtomicityUtils renameAtomicityUtils = new RenameAtomicityUtils(
this,
RenameAtomicityUtils renameAtomicityUtils = getRenameAtomicityUtilsForRedo(
new Path(fileStatus.getPath().toUri().getPath() + SUFFIX),
getAbfsStore().getRedoRenameInvocation(tracingContext));
tracingContext);
renameAtomicityUtils.cleanup(
new Path(fileStatus.getPath().toUri().getPath() + SUFFIX));
throw new AbfsRestOperationException(HttpURLConnection.HTTP_NOT_FOUND,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -449,48 +449,20 @@ public void testDeleteEmitDeletionCountInClientRequestId() throws Exception {
AbfsClient client = Mockito.spy(store.getClient());
store.setClient(client);

Mockito.doAnswer(answer -> {
final TracingContext context = answer.getArgument(2);
Mockito.doAnswer(listAnswer -> {
TracingContext listContext = listAnswer.getArgument(4);
Assert.assertEquals(listContext.getPrimaryRequestId(),
context.getPrimaryRequestId());
Assert.assertTrue(context.getOpType().equals(listContext.getOpType()));
return listAnswer.callRealMethod();
})
.when(client)
.getListBlobs(Mockito.nullable(String.class),
Mockito.nullable(String.class), Mockito.nullable(String.class),
Mockito.nullable(Integer.class),
Mockito.any(TracingContext.class));

Mockito.doAnswer(createBlobPathAnswer -> {
TracingContext createBlobPathContext = createBlobPathAnswer.getArgument(
5);
Assert.assertEquals(createBlobPathContext.getPrimaryRequestId(),
context.getPrimaryRequestId());
Assert.assertTrue(
context.getOpType().equals(createBlobPathContext.getOpType()));
return createBlobPathAnswer.callRealMethod();
})
.when(client)
.createPathBlob(Mockito.anyString(), Mockito.anyBoolean(),
Mockito.anyBoolean(), Mockito.nullable(
HashMap.class), Mockito.nullable(String.class),
Mockito.any(TracingContext.class));

final TracingHeaderValidator tracingHeaderValidator
= new TracingHeaderValidator(
fs.getAbfsStore().getAbfsConfiguration().getClientCorrelationId(),
fs.getFileSystemId(), FSOperationType.DELETE, true, 0);
fs.registerListener(tracingHeaderValidator);

Mockito.doAnswer(answer -> {
Mockito.doAnswer(deleteAnswer -> {
TracingContext deleteContext = deleteAnswer.getArgument(2);
Assert.assertEquals(deleteContext.getPrimaryRequestId(),
context.getPrimaryRequestId());
Assert.assertTrue(
context.getOpType().equals(deleteContext.getOpType()));
if (dirPathStr.equalsIgnoreCase(
((Path) deleteAnswer.getArgument(0)).toUri().getPath())) {
TracingContext tracingContext = deleteAnswer.getArgument(2);
Assertions.assertThat(tracingContext.getOperatedBlobCount())
.isEqualTo(11);
tracingHeaderValidator.setOperatedBlobCount(11);
Object result = deleteAnswer.callRealMethod();
tracingHeaderValidator.setOperatedBlobCount(null);
return result;
}
return deleteAnswer.callRealMethod();
})
Expand All @@ -505,19 +477,6 @@ public void testDeleteEmitDeletionCountInClientRequestId() throws Exception {
.delete(Mockito.any(Path.class), Mockito.anyBoolean(),
Mockito.any(TracingContext.class));

Mockito.doAnswer(answer -> {
if (dirPathStr.equalsIgnoreCase(
((Path) answer.getArgument(0)).toUri().getPath())) {
TracingContext tracingContext = answer.getArgument(2);
Assertions.assertThat(tracingContext.getOperatedBlobCount())
.isEqualTo(11);
}
return answer.callRealMethod();
})
.when(client)
.deleteBlobPath(Mockito.any(Path.class), Mockito.nullable(String.class),
Mockito.any(TracingContext.class));

fs.delete(new Path(dirPathStr), true);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -41,15 +41,14 @@
import org.assertj.core.api.Assertions;
import org.junit.Assert;
import org.junit.Assume;
import org.junit.Ignore;
import org.junit.Test;
import org.mockito.Mock;
import org.mockito.Mockito;

import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.azurebfs.constants.FSOperationType;
import org.apache.hadoop.fs.azurebfs.constants.HttpHeaderConfigurations;
import org.apache.hadoop.fs.azurebfs.contracts.exceptions.AbfsRestOperationException;
import org.apache.hadoop.fs.azurebfs.contracts.exceptions.AzureBlobFileSystemException;
Expand All @@ -60,12 +59,12 @@
import org.apache.hadoop.fs.azurebfs.services.AbfsHttpOpTestUtil;
import org.apache.hadoop.fs.azurebfs.services.AbfsHttpOperation;
import org.apache.hadoop.fs.azurebfs.services.AbfsLease;
import org.apache.hadoop.fs.azurebfs.services.AbfsOutputStream;
import org.apache.hadoop.fs.azurebfs.services.AbfsRestOperation;
import org.apache.hadoop.fs.azurebfs.services.AbfsRestOperationTestUtil;
import org.apache.hadoop.fs.azurebfs.services.RenameAtomicityUtils;
import org.apache.hadoop.fs.azurebfs.utils.TracingContext;
import org.apache.hadoop.fs.azurebfs.services.PrefixMode;
import org.apache.hadoop.fs.azurebfs.utils.TracingHeaderValidator;

import static org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.FS_AZURE_LEASE_CREATE_NON_RECURSIVE;
import static org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.FS_AZURE_REDIRECT_RENAME;
Expand Down Expand Up @@ -2009,26 +2008,19 @@ public void testRenameSrcDirDeleteEmitDeletionCountInClientRequestId()
AbfsClient client = Mockito.spy(store.getClient());
store.setClient(client);

Mockito.doAnswer(answer -> {
final TracingContext context = answer.getArgument(3);
Mockito.doAnswer(listAnswer -> {
TracingContext listContext = listAnswer.getArgument(4);
Assert.assertEquals(listContext.getPrimaryRequestId(),
context.getPrimaryRequestId());
Assert.assertTrue(context.getOpType().equals(listContext.getOpType()));
return listAnswer.callRealMethod();
})
.when(client)
.getListBlobs(Mockito.nullable(String.class),
Mockito.nullable(String.class), Mockito.nullable(String.class),
Mockito.nullable(Integer.class),
Mockito.any(TracingContext.class));
final TracingHeaderValidator tracingHeaderValidator
= new TracingHeaderValidator(
fs.getAbfsStore().getAbfsConfiguration().getClientCorrelationId(),
fs.getFileSystemId(), FSOperationType.RENAME, true, 0);
fs.registerListener(tracingHeaderValidator);

Mockito.doAnswer(answer -> {
Mockito.doAnswer(copyAnswer -> {
TracingContext copyContext = copyAnswer.getArgument(3);
Assert.assertEquals(copyContext.getPrimaryRequestId(),
context.getPrimaryRequestId());
Assert.assertTrue(context.getOpType().equals(copyContext.getOpType()));
if (dirPathStr.equalsIgnoreCase(
((Path) copyAnswer.getArgument(0)).toUri().getPath())) {
tracingHeaderValidator.setOperatedBlobCount(11);
return copyAnswer.callRealMethod();
}
return copyAnswer.callRealMethod();
})
.when(client)
Expand All @@ -2037,16 +2029,11 @@ public void testRenameSrcDirDeleteEmitDeletionCountInClientRequestId()
Mockito.any(TracingContext.class));

Mockito.doAnswer(deleteAnswer -> {
TracingContext deleteContext = deleteAnswer.getArgument(2);
Assert.assertEquals(deleteContext.getPrimaryRequestId(),
context.getPrimaryRequestId());
Assert.assertTrue(
context.getOpType().equals(deleteContext.getOpType()));
if (dirPathStr.equalsIgnoreCase(
((Path) deleteAnswer.getArgument(0)).toUri().getPath())) {
TracingContext tracingContext = deleteAnswer.getArgument(2);
Assertions.assertThat(tracingContext.getOperatedBlobCount())
.isEqualTo(11);
Object result = deleteAnswer.callRealMethod();
tracingHeaderValidator.setOperatedBlobCount(null);
return result;
}
return deleteAnswer.callRealMethod();
})
Expand Down Expand Up @@ -2081,34 +2068,11 @@ public void testBlobRenameResumeWithListStatus() throws Exception {
store.setClient(client);

renameFailureSetup(fs, client);

TracingContext[] tracingContextCreatedInFsListStatus
= new TracingContext[1];
Mockito.doAnswer(answer -> {
tracingContextCreatedInFsListStatus[0] = answer.getArgument(1);
return answer.callRealMethod();
})
.when(store)
.listStatus(Mockito.any(Path.class), Mockito.any(TracingContext.class));

AtomicInteger copied = new AtomicInteger(0);
Mockito.doAnswer(answer -> {
copied.incrementAndGet();
TracingContext tracingContext = answer.getArgument(3);
Assertions.assertThat(tracingContext)
.isEqualTo(tracingContextCreatedInFsListStatus[0]);
Path path = answer.getArgument(0);
if ("/hbase/testDir".equalsIgnoreCase(path.toUri().getPath())) {
Assertions.assertThat(tracingContext.getOperatedBlobCount())
.isEqualTo(copied.get());
}
return answer.callRealMethod();
})
.when(store)
.copyBlob(Mockito.any(Path.class), Mockito.any(Path.class),
Mockito.nullable(String.class), Mockito.any(TracingContext.class));
AtomicInteger copied = assertTracingContextOnRenameResumeProcess(fs, store,
client, FSOperationType.LISTSTATUS);

fs.listStatus(new Path("/hbase"));
fs.registerListener(null);
Assertions.assertThat(fs.exists(new Path("/hbase/testDir2"))).isTrue();
Assertions.assertThat(copied.get()).isGreaterThan(0);
}
Expand All @@ -2129,38 +2093,8 @@ public void testBlobRenameResumeWithGetFileStatus() throws Exception {
store.setClient(client);

renameFailureSetup(fs, client);


TracingContext[] tracingContextCreatedInFsListStatus
= new TracingContext[1];
Mockito.doAnswer(answer -> {
synchronized (this) {
if (tracingContextCreatedInFsListStatus[0] == null) {
tracingContextCreatedInFsListStatus[0] = answer.getArgument(1);
}
}
return answer.callRealMethod();
})
.when(store)
.getFileStatus(Mockito.any(Path.class),
Mockito.any(TracingContext.class), Mockito.anyBoolean());

AtomicInteger copied = new AtomicInteger(0);
Mockito.doAnswer(answer -> {
copied.incrementAndGet();
TracingContext tracingContext = answer.getArgument(3);
Assertions.assertThat(tracingContext)
.isEqualTo(tracingContextCreatedInFsListStatus[0]);
Path path = answer.getArgument(0);
if ("/hbase/testDir".equalsIgnoreCase(path.toUri().getPath())) {
Assertions.assertThat(tracingContext.getOperatedBlobCount())
.isEqualTo(copied.get());
}
return answer.callRealMethod();
})
.when(store)
.copyBlob(Mockito.any(Path.class), Mockito.any(Path.class),
Mockito.nullable(String.class), Mockito.any(TracingContext.class));
AtomicInteger copied = assertTracingContextOnRenameResumeProcess(fs, store,
client, FSOperationType.GET_FILESTATUS);

intercept(FileNotFoundException.class, () -> {
fs.getFileStatus(new Path("/hbase/testDir"));
Expand Down Expand Up @@ -2223,4 +2157,71 @@ private void renameFailureSetup(final AzureBlobFileSystem fs,
Mockito.mock(TracingContext.class));
leaseCanBeTaken.set(true);
}

private AtomicInteger assertTracingContextOnRenameResumeProcess(final AzureBlobFileSystem fs,
final AzureBlobFileSystemStore store,
final AbfsClient client, final FSOperationType fsOperationType)
throws IOException {
final TracingHeaderValidator tracingHeaderValidator
= new TracingHeaderValidator(
fs.getAbfsStore().getAbfsConfiguration().getClientCorrelationId(),
fs.getFileSystemId(), fsOperationType, true, 0);
fs.registerListener(tracingHeaderValidator);

AtomicInteger copied = new AtomicInteger(0);
Mockito.doAnswer(answer -> {
copied.incrementAndGet();
Path path = answer.getArgument(0);
if ("/hbase/testDir".equalsIgnoreCase(path.toUri().getPath())) {
tracingHeaderValidator.setOperatedBlobCount(copied.get());
}
return answer.callRealMethod();
})
.when(store)
.copyBlob(Mockito.any(Path.class), Mockito.any(Path.class),
Mockito.nullable(String.class), Mockito.any(TracingContext.class));

/*
* RenameAtomicUtil internally calls Filesystem's API of read and delete
* which would have different primaryIds. But once renameAtomicUtil has read
* the RenamePending JSON, all the operations will use the same tracingContext
* which was started by ListStatus or GetFileStatus operation.
* This is the reason why the validation is disabled until the RenameAtomicUtil
* object reads the JSON.
* The filesystem's delete API called in RenameAtomicUtils.cleanup create a
* new TracingContext object with a new primaryRequestId and also updates the
* new primaryRequestId in the listener object of FileSystem. Therefore, once,
* cleanup method is completed, the listener is explicitly updated with the
* primaryRequestId it was using before the RenameAtomicUtils object was created.
*/
Mockito.doAnswer(answer -> {
final String primaryRequestId = ((TracingContext) answer.getArgument(
1)).getPrimaryRequestId();
tracingHeaderValidator.setDisableValidation(true);
RenameAtomicityUtils renameAtomicityUtils = Mockito.spy(
(RenameAtomicityUtils) answer.callRealMethod());
Mockito.doAnswer(cleanupAnswer -> {
tracingHeaderValidator.setDisableValidation(true);
cleanupAnswer.callRealMethod();
tracingHeaderValidator.setDisableValidation(false);
tracingHeaderValidator.updatePrimaryRequestID(primaryRequestId);
return null;
}).when(renameAtomicityUtils).cleanup(Mockito.any(Path.class));
tracingHeaderValidator.setDisableValidation(false);
return renameAtomicityUtils;
})
.when(fs)
.getRenameAtomicityUtilsForRedo(Mockito.any(Path.class),
Mockito.any(TracingContext.class));

Mockito.doAnswer(answer -> {
answer.callRealMethod();
tracingHeaderValidator.setOperatedBlobCount(null);
return null;
})
.when(client)
.deleteBlobPath(Mockito.any(Path.class), Mockito.nullable(String.class),
Mockito.any(TracingContext.class));
return copied;
}
}
Loading