Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -773,8 +773,13 @@ public void renameInternal(final Path src, final Path dst,
if (dstStatus.isDirectory()) {
RemoteIterator<FileStatus> list = listStatusIterator(dst);
if (list != null && list.hasNext()) {
FileStatus child = list.next();
LOG.debug("Rename {}, {} failing as destination has"
+ " at least one child: {}",
src, dst, child);
throw new IOException(
"Rename cannot overwrite non empty destination directory " + dst);
"Rename cannot overwrite non empty destination directory " + dst
+ " containing child: " + child.getPath());
}
}
delete(dst, false);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1306,14 +1306,33 @@ protected void createFile(Path path) throws IOException {

/**
 * Rename {@code src} to {@code dst} through {@code fc} and then verify
 * the existence of both paths.
 * <p>
 * An {@link IOException} raised by the rename call is logged at warn level
 * and held back while the source and destination are probed. If a probe
 * assertion fails and the {@link AssertionError} has no cause yet, the held
 * exception is attached as its cause before the error is rethrown. If the
 * probes pass, the held exception (if any) is thrown.
 *
 * @param src source path of the rename
 * @param dst destination path of the rename
 * @param srcExists expected result of the existence probe on {@code src}
 * @param dstExists expected result of the existence probe on {@code dst}
 * @param options rename options passed through to {@code fc.rename}
 * @throws IOException the failure caught from the rename call, rethrown
 * once the existence assertions have passed
 */
protected void rename(Path src, Path dst, boolean srcExists,
boolean dstExists, Rename... options) throws IOException {
IOException ioe = null;
try {
fc.rename(src, dst, options);
} finally {
} catch (IOException ex) {
// let's not swallow this completely.
LOG.warn("Rename result: " + ex, ex);
ioe = ex;
}

// There's a bit of extra work in these assertions, as if they fail
// any IOE caught earlier is added as the cause. This
// gets the likely root cause to the test report
try {
LOG.debug("Probing source and destination");
Assert.assertEquals("Source exists", srcExists, exists(fc, src));
Assert.assertEquals("Destination exists", dstExists, exists(fc, dst));
} catch (AssertionError e) {
// only attach the rename failure when the assertion has no cause of its own
if (ioe != null && e.getCause() == null) {
e.initCause(ioe);
}
throw e;
}
// the probes matched expectations: surface the rename failure, if there was one
if (ioe != null) {
throw ioe;
}
}

private boolean containsPath(Path path, FileStatus[] filteredPaths)
throws IOException {
for(int i = 0; i < filteredPaths.length; i ++) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1574,7 +1574,7 @@ public void deleteObjectAtPath(final Path path,

@Override
@Retries.RetryTranslated
public RemoteIterator<S3ALocatedFileStatus> listFilesAndEmptyDirectories(
public RemoteIterator<S3ALocatedFileStatus> listFilesAndDirectoryMarkers(
final Path path,
final S3AFileStatus status,
final boolean collectTombstones,
Expand Down Expand Up @@ -2079,6 +2079,7 @@ protected void deleteObject(String key)
DELETE_CONSIDERED_IDEMPOTENT,
()-> {
incrementStatistic(OBJECT_DELETE_REQUESTS);
incrementStatistic(OBJECT_DELETE_OBJECTS);
s3.deleteObject(bucket, key);
return null;
});
Expand Down Expand Up @@ -2125,9 +2126,14 @@ private void blockRootDelete(String key) throws InvalidRequestException {
}

/**
* Perform a bulk object delete operation.
* Perform a bulk object delete operation against S3; leaves S3Guard
* alone.
* Increments the {@code OBJECT_DELETE_REQUESTS} and write
* operation statistics.
* operation statistics
* <p></p>
* {@code OBJECT_DELETE_OBJECTS} is updated with the actual number
* of objects deleted in the request.
* <p></p>
* Retry policy: retry untranslated; delete considered idempotent.
* If the request is throttled, this is logged in the throttle statistics,
* with the counter set to the number of keys, rather than the number
Expand All @@ -2148,9 +2154,10 @@ private DeleteObjectsResult deleteObjects(DeleteObjectsRequest deleteRequest)
incrementWriteOperations();
BulkDeleteRetryHandler retryHandler =
new BulkDeleteRetryHandler(createStoreContext());
int keyCount = deleteRequest.getKeys().size();
try(DurationInfo ignored =
new DurationInfo(LOG, false, "DELETE %d keys",
deleteRequest.getKeys().size())) {
keyCount)) {
return invoker.retryUntranslated("delete",
DELETE_CONSIDERED_IDEMPOTENT,
(text, e, r, i) -> {
Expand All @@ -2159,6 +2166,7 @@ private DeleteObjectsResult deleteObjects(DeleteObjectsRequest deleteRequest)
},
() -> {
incrementStatistic(OBJECT_DELETE_REQUESTS, 1);
incrementStatistic(OBJECT_DELETE_OBJECTS, keyCount);
return s3.deleteObjects(deleteRequest);
});
} catch (MultiObjectDeleteException e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -156,6 +156,7 @@ public class S3AInstrumentation implements Closeable, MetricsSource {
INVOCATION_RENAME,
OBJECT_COPY_REQUESTS,
OBJECT_DELETE_REQUESTS,
OBJECT_DELETE_OBJECTS,
OBJECT_LIST_REQUESTS,
OBJECT_CONTINUE_LIST_REQUESTS,
OBJECT_METADATA_REQUESTS,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,8 @@ public enum Statistic {
"Calls of rename()"),
OBJECT_COPY_REQUESTS("object_copy_requests", "Object copy requests"),
OBJECT_DELETE_REQUESTS("object_delete_requests", "Object delete requests"),
OBJECT_DELETE_OBJECTS("object_delete_objects",
"Objects deleted in delete requests"),
OBJECT_LIST_REQUESTS("object_list_requests",
"Number of object listings made"),
OBJECT_CONTINUE_LIST_REQUESTS("object_continue_list_requests",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;

import com.amazonaws.services.s3.model.DeleteObjectsRequest;
import com.amazonaws.services.s3.model.DeleteObjectsResult;
Expand Down Expand Up @@ -152,10 +153,13 @@ public class DeleteOperation extends ExecutingStoreOperation<Boolean> {
/**
* List of keys built up for the next delete batch.
*/
private List<DeleteObjectsRequest.KeyVersion> keys;
private List<DeleteEntry> keys;

/**
* List of paths built up for deletion.
* List of paths built up for incremental deletion on tree delete.
* At the end of the entire delete the full tree is scanned in S3Guard
* and tombstones added. For this reason this list of paths <i>must not</i>
* include directory markers, as that will break the scan.
*/
private List<Path> paths;

Expand Down Expand Up @@ -323,7 +327,7 @@ protected void deleteDirectoryTree(final Path path,
// list files including any under tombstones through S3Guard
LOG.debug("Getting objects for directory prefix {} to delete", dirKey);
final RemoteIterator<S3ALocatedFileStatus> locatedFiles =
callbacks.listFilesAndEmptyDirectories(path, status,
callbacks.listFilesAndDirectoryMarkers(path, status,
false, true);

// iterate through and delete. The next() call will block when a new S3
Expand Down Expand Up @@ -359,7 +363,10 @@ protected void deleteDirectoryTree(final Path path,
while (objects.hasNext()) {
// get the next entry in the listing.
extraFilesDeleted++;
queueForDeletion(deletionKey(objects.next()), null);
S3AFileStatus next = objects.next();
LOG.debug("Found Unlisted entry {}", next);
queueForDeletion(deletionKey(next), null,
next.isDirectory());
}
if (extraFilesDeleted > 0) {
LOG.debug("Raw S3 Scan found {} extra file(s) to delete",
Expand Down Expand Up @@ -402,7 +409,7 @@ private String deletionKey(final S3AFileStatus stat) {
*/
private void queueForDeletion(
final S3AFileStatus stat) throws IOException {
queueForDeletion(deletionKey(stat), stat.getPath());
queueForDeletion(deletionKey(stat), stat.getPath(), stat.isDirectory());
}

/**
Expand All @@ -413,14 +420,18 @@ private void queueForDeletion(
*
* @param key key to delete
* @param deletePath nullable path of the key
* @param isDirMarker is the entry a directory?
* @throws IOException failure of the previous batch of deletions.
*/
private void queueForDeletion(final String key,
@Nullable final Path deletePath) throws IOException {
@Nullable final Path deletePath,
boolean isDirMarker) throws IOException {
LOG.debug("Adding object to delete: \"{}\"", key);
keys.add(new DeleteObjectsRequest.KeyVersion(key));
keys.add(new DeleteEntry(key, isDirMarker));
if (deletePath != null) {
paths.add(deletePath);
if (!isDirMarker) {
paths.add(deletePath);
}
}

if (keys.size() == pageSize) {
Expand Down Expand Up @@ -484,7 +495,7 @@ private void deleteObjectAtPath(
* @return the submitted future or null
*/
private CompletableFuture<Void> submitDelete(
final List<DeleteObjectsRequest.KeyVersion> keyList,
final List<DeleteEntry> keyList,
final List<Path> pathList) {

if (keyList.isEmpty() && pathList.isEmpty()) {
Expand Down Expand Up @@ -514,31 +525,59 @@ private CompletableFuture<Void> submitDelete(
@Retries.RetryTranslated
private void asyncDeleteAction(
final BulkOperationState state,
final List<DeleteObjectsRequest.KeyVersion> keyList,
final List<DeleteEntry> keyList,
final List<Path> pathList,
final boolean auditDeletedKeys)
throws IOException {
List<DeleteObjectsResult.DeletedObject> deletedObjects = new ArrayList<>();
try (DurationInfo ignored =
new DurationInfo(LOG, false, "Delete page of keys")) {
DeleteObjectsResult result = null;
List<Path> undeletedObjects = new ArrayList<>();
if (!keyList.isEmpty()) {
result = Invoker.once("Remove S3 Keys",
// first delete the files.
List<DeleteObjectsRequest.KeyVersion> files = keyList.stream()
.filter(e -> !e.isDirMarker)
.map(e -> e.keyVersion)
.collect(Collectors.toList());
result = Invoker.once("Remove S3 Files",
status.getPath().toString(),
() -> callbacks.removeKeys(
keyList,
files,
false,
undeletedObjects,
state,
!auditDeletedKeys));
if (result != null) {
deletedObjects.addAll(result.getDeletedObjects());
}
// now the dirs
List<DeleteObjectsRequest.KeyVersion> dirs = keyList.stream()
.filter(e -> e.isDirMarker)
.map(e -> e.keyVersion)
.collect(Collectors.toList());
// This is invoked with deleteFakeDir = true, so
// S3Guard is not updated.
result = Invoker.once("Remove S3 Dir Markers",
status.getPath().toString(),
() -> callbacks.removeKeys(
dirs,
true,
undeletedObjects,
state,
!auditDeletedKeys));
if (result != null) {
deletedObjects.addAll(result.getDeletedObjects());
}
}
if (!pathList.isEmpty()) {
// delete file paths only. This stops tombstones
// being added until the final directory cleanup
// (HADOOP-17244)
metadataStore.deletePaths(pathList, state);
}
if (auditDeletedKeys && result != null) {
if (auditDeletedKeys) {
// audit the deleted keys
List<DeleteObjectsResult.DeletedObject> deletedObjects =
result.getDeletedObjects();
if (deletedObjects.size() != keyList.size()) {
// size mismatch
LOG.warn("Size mismatch in deletion operation. "
Expand All @@ -549,13 +588,39 @@ private void asyncDeleteAction(
for (DeleteObjectsResult.DeletedObject del : deletedObjects) {
keyList.removeIf(kv -> kv.getKey().equals(del.getKey()));
}
for (DeleteObjectsRequest.KeyVersion kv : keyList) {
for (DeleteEntry kv : keyList) {
LOG.debug("{}", kv.getKey());
}
}
}
}
}

/**
 * A single key queued for deletion, paired with a flag recording whether
 * the key was queued as a directory marker. The marker state is tracked
 * to control S3Guard update policy.
 */
private static final class DeleteEntry {

  /** The key, wrapped in the form used by delete requests. */
  private final DeleteObjectsRequest.KeyVersion keyVersion;

  /** True if this entry was queued as a directory marker. */
  private final boolean isDirMarker;

  /**
   * Build an entry.
   * @param key object key to delete
   * @param isDirMarker is the entry a directory marker?
   */
  private DeleteEntry(final String key, final boolean isDirMarker) {
    this.isDirMarker = isDirMarker;
    this.keyVersion = new DeleteObjectsRequest.KeyVersion(key);
  }

  /**
   * Get the object key of this entry.
   * @return the key held by the wrapped key/version instance
   */
  public String getKey() {
    return keyVersion.getKey();
  }

  @Override
  public String toString() {
    return String.format("DeleteEntry{key='%s', isDirMarker=%s}",
        getKey(), isDirMarker);
  }
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,7 @@ void deleteObjectAtPath(Path path,
throws IOException;

/**
* Recursive list of files and empty directories.
* Recursive list of files and directory markers.
*
* @param path path to list from
* @param status optional status of path to list.
Expand All @@ -115,7 +115,7 @@ void deleteObjectAtPath(Path path,
* @throws IOException failure
*/
@Retries.RetryTranslated
RemoteIterator<S3ALocatedFileStatus> listFilesAndEmptyDirectories(
RemoteIterator<S3ALocatedFileStatus> listFilesAndDirectoryMarkers(
Path path,
S3AFileStatus status,
boolean collectTombstones,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -400,6 +400,7 @@ protected void recursiveDirectoryRename() throws IOException {
destStatus.getPath());
// Although the dir marker policy doesn't always need to do this,
// it's simplest just to be consistent here.
// note: updates the metastore as well as S3.
callbacks.deleteObjectAtPath(destStatus.getPath(), dstKey, false, null);
}

Expand All @@ -411,7 +412,7 @@ protected void recursiveDirectoryRename() throws IOException {
false);

final RemoteIterator<S3ALocatedFileStatus> iterator =
callbacks.listFilesAndEmptyDirectories(parentPath,
callbacks.listFilesAndDirectoryMarkers(parentPath,
sourceStatus,
true,
true);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -717,7 +717,7 @@ public DDBPathMetadata get(Path path) throws IOException {
public DDBPathMetadata get(Path path, boolean wantEmptyDirectoryFlag)
throws IOException {
checkPath(path);
LOG.debug("Get from table {} in region {}: {}. wantEmptyDirectory={}",
LOG.debug("Get from table {} in region {}: {} ; wantEmptyDirectory={}",
tableName, region, path, wantEmptyDirectoryFlag);
DDBPathMetadata result = innerGet(path, wantEmptyDirectoryFlag);
LOG.debug("result of get {} is: {}", path, result);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import org.assertj.core.api.Assertions;
import org.junit.Test;

import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.s3a.impl.StatusProbeEnum;
import org.apache.hadoop.fs.s3a.impl.StoreContext;
Expand Down Expand Up @@ -286,4 +287,27 @@ public int read() {
s3.putObject(putObjectRequest);
}

/**
 * Create a directory containing a file, PUT a marker object under the
 * directory key, recursively delete the directory, then verify that the
 * directory can be recreated and lists as empty.
 * Only runs when the filesystem has a metadata store.
 */
@Test
public void testDirMarkerDelete() throws Throwable {
  S3AFileSystem fs = getFileSystem();
  assumeFilesystemHasMetadatastore(getFileSystem());

  Path baseDir = methodPath();
  Path nestedFile = new Path(baseDir, "subdir/file.txt");

  // mkdirs adds the s3guard entry; the file goes underneath it
  fs.mkdirs(baseDir);
  touch(fs, nestedFile);

  // PUT a marker object at the directory key
  String markerKey = fs.pathToKey(baseDir) + "/";
  createEmptyObject(fs, markerKey);

  // recursive delete of the whole tree
  fs.delete(baseDir, true);
  assertPathDoesNotExist("Should have been deleted", baseDir);

  // recreate the directory: it must be a directory and have no children
  fs.mkdirs(baseDir);
  FileStatus recreated = fs.getFileStatus(baseDir);
  Assertions.assertThat(recreated)
      .matches(FileStatus::isDirectory, "Not a directory");
  Assertions.assertThat(fs.listStatus(baseDir))
      .describedAs("listing of %s", baseDir)
      .isEmpty();
}
}
Loading