Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,7 @@
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.OptionalInt;
import java.util.concurrent.atomic.AtomicBoolean;

import static com.amazonaws.regions.Regions.US_EAST_1;
Expand Down Expand Up @@ -305,38 +306,54 @@ public FileStatus[] listStatus(Path path)
return toArray(list, LocatedFileStatus.class);
}

/**
 * Lists the files beneath {@code path}. "Directories" are never part of the result,
 * whichever listing depth is selected.
 *
 * @param path location whose files are listed
 * @param recursive {@code true} for a full listing, {@code false} for a single level
 * @return remote iterator over the matching file statuses
 */
@Override
public RemoteIterator<LocatedFileStatus> listFiles(Path path, boolean recursive)
{
    // Both modes are files-only; the flag only decides how deep the listing goes
    ListingMode mode;
    if (recursive) {
        mode = ListingMode.RECURSIVE_FILES_ONLY;
    }
    else {
        mode = ListingMode.SHALLOW_FILES_ONLY;
    }

    Iterator<LocatedFileStatus> files = listPrefix(path, OptionalInt.empty(), mode);
    return new S3ObjectsV2RemoteIterator(files);
}

@Override
public RemoteIterator<LocatedFileStatus> listLocatedStatus(Path path)
{
STATS.newListLocatedStatusCall();
return new RemoteIterator<>()
return new S3ObjectsV2RemoteIterator(listPrefix(path, OptionalInt.empty(), ListingMode.SHALLOW_ALL));
}

private static final class S3ObjectsV2RemoteIterator
implements RemoteIterator<LocatedFileStatus>
{
private final Iterator<LocatedFileStatus> iterator;

public S3ObjectsV2RemoteIterator(Iterator<LocatedFileStatus> iterator)
{
private final Iterator<LocatedFileStatus> iterator = listPrefix(path);
this.iterator = requireNonNull(iterator, "iterator is null");
}

@Override
public boolean hasNext()
throws IOException
{
try {
return iterator.hasNext();
}
catch (AmazonClientException e) {
throw new IOException(e);
}
@Override
public boolean hasNext()
throws IOException
{
try {
return iterator.hasNext();
}
catch (AmazonClientException e) {
throw new IOException(e);
}
}

@Override
public LocatedFileStatus next()
throws IOException
{
try {
return iterator.next();
}
catch (AmazonClientException e) {
throw new IOException(e);
}
@Override
public LocatedFileStatus next()
throws IOException
{
try {
return iterator.next();
}
};
catch (AmazonClientException e) {
throw new IOException(e);
}
}
}

@Override
Expand All @@ -355,7 +372,7 @@ public FileStatus getFileStatus(Path path)

if (metadata == null) {
// check if this path is a directory
Iterator<LocatedFileStatus> iterator = listPrefix(path);
Iterator<LocatedFileStatus> iterator = listPrefix(path, OptionalInt.of(1), ListingMode.SHALLOW_ALL);
if (iterator.hasNext()) {
return new FileStatus(0, true, 1, 0, 0, qualifiedPath(path));
}
Expand Down Expand Up @@ -517,7 +534,22 @@ public boolean mkdirs(Path f, FsPermission permission)
return true;
}

private Iterator<LocatedFileStatus> listPrefix(Path path)
/**
 * The valid listing modes. Two booleans (recursive, filesOnly) are not used here because the
 * combination (recursive=true, filesOnly=false) can't be translated directly to a natively
 * supported behavior.
 */
private enum ListingMode {
    SHALLOW_ALL(false),
    SHALLOW_FILES_ONLY(true),
    RECURSIVE_FILES_ONLY(true);

    // Whether this mode excludes "directory" entries from its results
    private final boolean filesOnly;

    ListingMode(boolean filesOnly)
    {
        this.filesOnly = filesOnly;
    }

    /**
     * @return {@code true} for the two files-only modes, {@code false} for {@code SHALLOW_ALL}
     */
    public boolean isFilesOnly()
    {
        return filesOnly;
    }
}

private Iterator<LocatedFileStatus> listPrefix(Path path, OptionalInt initialMaxKeys, ListingMode mode)
{
String key = keyFromPath(path);
if (!key.isEmpty()) {
Expand All @@ -527,7 +559,8 @@ private Iterator<LocatedFileStatus> listPrefix(Path path)
ListObjectsV2Request request = new ListObjectsV2Request()
.withBucketName(getBucketName(uri))
.withPrefix(key)
.withDelimiter(PATH_SEPARATOR)
.withDelimiter(mode == ListingMode.RECURSIVE_FILES_ONLY ? null : PATH_SEPARATOR)
.withMaxKeys(initialMaxKeys.isPresent() ? initialMaxKeys.getAsInt() : null)
.withRequesterPays(requesterPaysEnabled);

STATS.newListObjectsCall();
Expand All @@ -539,26 +572,38 @@ protected ListObjectsV2Result computeNext(ListObjectsV2Result previous)
if (!previous.isTruncated()) {
return null;
}

request.setContinuationToken(previous.getNextContinuationToken());

// Clear any max keys after the first batch completes
request.withMaxKeys(null).setContinuationToken(previous.getNextContinuationToken());
return s3.listObjectsV2(request);
}
};

return Iterators.concat(Iterators.transform(listings, this::statusFromListing));
Iterator<LocatedFileStatus> results = Iterators.concat(Iterators.transform(listings, this::statusFromListing));
if (mode.isFilesOnly()) {
// Even recursive listing can still contain empty "directory" objects, must filter them out
results = Iterators.filter(results, LocatedFileStatus::isFile);
}
return results;
}

private Iterator<LocatedFileStatus> statusFromListing(ListObjectsV2Result listing)
{
List<String> prefixes = listing.getCommonPrefixes();
List<S3ObjectSummary> objects = listing.getObjectSummaries();
if (prefixes.isEmpty()) {
return statusFromObjects(objects);
}
if (objects.isEmpty()) {
return statusFromPrefixes(prefixes);
}
return Iterators.concat(
statusFromPrefixes(listing.getCommonPrefixes()),
statusFromObjects(listing.getObjectSummaries()));
statusFromPrefixes(prefixes),
statusFromObjects(objects));
}

private Iterator<LocatedFileStatus> statusFromPrefixes(List<String> prefixes)
{
List<LocatedFileStatus> list = new ArrayList<>();
List<LocatedFileStatus> list = new ArrayList<>(prefixes.size());
for (String prefix : prefixes) {
Path path = qualifiedPath(new Path(PATH_SEPARATOR + prefix));
FileStatus status = new FileStatus(0, true, 1, 0, 0, path);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,11 @@
import com.amazonaws.services.s3.model.EncryptionMaterials;
import com.amazonaws.services.s3.model.EncryptionMaterialsProvider;
import com.amazonaws.services.s3.model.GetObjectMetadataRequest;
import com.amazonaws.services.s3.model.ListObjectsV2Request;
import com.amazonaws.services.s3.model.ListObjectsV2Result;
import com.amazonaws.services.s3.model.ObjectMetadata;
import com.amazonaws.services.s3.model.S3ObjectSummary;
import com.amazonaws.services.s3.model.StorageClass;
import com.amazonaws.services.securitytoken.AWSSecurityTokenService;
import com.amazonaws.services.securitytoken.AWSSecurityTokenServiceClient;
import com.google.common.base.VerifyException;
Expand All @@ -38,7 +42,9 @@
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.LocatedFileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.RemoteIterator;
import org.testng.SkipException;
import org.testng.annotations.Test;

Expand All @@ -50,6 +56,10 @@
import java.net.URISyntaxException;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Date;
import java.util.List;
import java.util.Map;

import static com.google.common.base.Preconditions.checkArgument;
Expand Down Expand Up @@ -83,6 +93,7 @@
import static java.nio.file.Files.createTempFile;
import static org.assertj.core.api.Assertions.assertThat;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertFalse;
import static org.testng.Assert.assertNull;
import static org.testng.Assert.assertTrue;

Expand Down Expand Up @@ -719,4 +730,71 @@ public ObjectMetadata getObjectMetadata(GetObjectMetadataRequest getObjectMetada
assertTrue(fileStatus.isDirectory());
}
}

/**
 * Exercises the three listing modes against a mock S3 client that answers delimiter-based
 * (shallow) requests with one common prefix plus one object, and all other requests with
 * both objects.
 */
@Test
public void testListPrefixModes()
        throws Exception
{
    // Fixture: one object keyed at the bucket root and one keyed under "prefix/"
    S3ObjectSummary topLevelObject = new S3ObjectSummary();
    topLevelObject.setKey("standard-object-at-root.txt");
    topLevelObject.setStorageClass(StorageClass.Standard.toString());
    topLevelObject.setLastModified(new Date());

    S3ObjectSummary nestedObject = new S3ObjectSummary();
    nestedObject.setKey("prefix/child-object.txt");
    nestedObject.setStorageClass(StorageClass.Standard.toString());
    nestedObject.setLastModified(new Date());

    try (PrestoS3FileSystem fileSystem = new PrestoS3FileSystem()) {
        MockAmazonS3 mockClient = new MockAmazonS3()
        {
            @Override
            public ListObjectsV2Result listObjectsV2(ListObjectsV2Request listObjectsV2Request)
            {
                ListObjectsV2Result result = new ListObjectsV2Result();
                boolean shallow = "/".equals(listObjectsV2Request.getDelimiter());
                if (shallow) {
                    // Shallow listing: the nested key collapses into its common prefix
                    result.getCommonPrefixes().add("prefix");
                    result.getObjectSummaries().add(topLevelObject);
                }
                else {
                    // Recursive listing: object keys only
                    result.getObjectSummaries().add(nestedObject);
                    result.getObjectSummaries().add(topLevelObject);
                }
                return result;
            }
        };
        Path bucketRoot = new Path("s3n://test-bucket/");
        fileSystem.initialize(bucketRoot.toUri(), new Configuration(false));
        fileSystem.setS3Client(mockClient);

        Path expectedDirectory = new Path(bucketRoot, "prefix");
        Path expectedTopLevelFile = new Path(bucketRoot, topLevelObject.getKey());
        Path expectedNestedFile = new Path(bucketRoot, nestedObject.getKey());

        // SHALLOW_ALL: the directory entry followed by the root-level file
        List<LocatedFileStatus> everythingShallow = remoteIteratorToList(fileSystem.listLocatedStatus(bucketRoot));
        assertEquals(everythingShallow.size(), 2);
        LocatedFileStatus directoryEntry = everythingShallow.get(0);
        LocatedFileStatus fileEntry = everythingShallow.get(1);
        assertTrue(directoryEntry.isDirectory());
        assertFalse(fileEntry.isDirectory());
        assertEquals(directoryEntry.getPath(), expectedDirectory);
        assertEquals(fileEntry.getPath(), expectedTopLevelFile);

        // SHALLOW_FILES_ONLY: the directory entry is filtered out
        List<LocatedFileStatus> filesShallow = remoteIteratorToList(fileSystem.listFiles(bucketRoot, false));
        assertEquals(filesShallow.size(), 1);
        LocatedFileStatus onlyFile = filesShallow.get(0);
        assertFalse(onlyFile.isDirectory());
        assertEquals(onlyFile.getPath(), expectedTopLevelFile);

        // RECURSIVE_FILES_ONLY: both objects, in the order the mock returns them
        List<LocatedFileStatus> filesRecursive = remoteIteratorToList(fileSystem.listFiles(bucketRoot, true));
        assertEquals(filesRecursive.size(), 2);
        LocatedFileStatus firstFile = filesRecursive.get(0);
        LocatedFileStatus secondFile = filesRecursive.get(1);
        assertFalse(firstFile.isDirectory());
        assertFalse(secondFile.isDirectory());
        assertEquals(firstFile.getPath(), expectedNestedFile);
        assertEquals(secondFile.getPath(), expectedTopLevelFile);
    }
}

/**
 * Drains a remote iterator into a list, preserving iteration order.
 *
 * @param statuses iterator to consume
 * @return every status the iterator produced, in order
 * @throws IOException if the iterator fails while being consumed
 */
private static List<LocatedFileStatus> remoteIteratorToList(RemoteIterator<LocatedFileStatus> statuses)
        throws IOException
{
    List<LocatedFileStatus> collected = new ArrayList<>();
    while (true) {
        if (!statuses.hasNext()) {
            return collected;
        }
        collected.add(statuses.next());
    }
}
}