Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,7 @@
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.OptionalInt;
import java.util.concurrent.atomic.AtomicBoolean;

import static com.amazonaws.regions.Regions.US_EAST_1;
Expand Down Expand Up @@ -275,38 +276,53 @@ public FileStatus[] listStatus(Path path)
return toArray(list, LocatedFileStatus.class);
}

/**
 * Lists only file entries under {@code path}; no "directories" are ever included.
 *
 * @param path location whose contents are listed
 * @param recursive {@code true} for a full listing below {@code path}, {@code false} for a single level
 * @return a remote iterator over the matching files
 */
@Override
public RemoteIterator<LocatedFileStatus> listFiles(Path path, boolean recursive)
{
    // The flag only decides how deep the listing goes; both modes are files-only
    ListingMode mode;
    if (recursive) {
        mode = ListingMode.RECURSIVE_FILES_ONLY;
    }
    else {
        mode = ListingMode.SHALLOW_FILES_ONLY;
    }
    Iterator<LocatedFileStatus> files = listPrefix(path, OptionalInt.empty(), mode);
    return new S3ObjectsRemoteIterator(files);
}

/**
 * Records a listLocatedStatus call in STATS, then returns the entries produced by a
 * {@code ListingMode.SHALLOW_ALL} listing of {@code path}, with no initial max-keys value.
 *
 * @param path location whose immediate contents are listed
 * @return a remote iterator wrapping the result of {@code listPrefix}
 */
@Override
public RemoteIterator<LocatedFileStatus> listLocatedStatus(Path path)
{
STATS.newListLocatedStatusCall();
// NOTE(review): two consecutive "return" lines follow; this looks like a removed line and its
// replacement from a rendered diff shown together -- confirm against the real file
return new RemoteIterator<LocatedFileStatus>()
return new S3ObjectsRemoteIterator(listPrefix(path, OptionalInt.empty(), ListingMode.SHALLOW_ALL));
}

/**
 * Adapts a plain {@code Iterator<LocatedFileStatus>} to the {@code RemoteIterator} interface.
 * Both {@code hasNext()} and {@code next()} delegate to the wrapped iterator and rethrow any
 * {@code AmazonClientException} as an {@code IOException} with the original as its cause.
 *
 * NOTE(review): this span shows each method twice and contains a stray field declaration inside
 * the constructor plus a stray "};" -- it looks like removed and added lines of a rendered diff
 * shown interleaved. Confirm against the real file before relying on exact structure.
 */
private static final class S3ObjectsRemoteIterator
implements RemoteIterator<LocatedFileStatus>
{
// Source of statuses that this wrapper delegates to
private final Iterator<LocatedFileStatus> iterator;

public S3ObjectsRemoteIterator(Iterator<LocatedFileStatus> iterator)
{
private final Iterator<LocatedFileStatus> iterator = listPrefix(path);
// Rejects a null delegate up front with the message "iterator is null"
this.iterator = requireNonNull(iterator, "iterator is null");
}

@Override
public boolean hasNext()
throws IOException
{
try {
return iterator.hasNext();
}
catch (AmazonClientException e) {
// Surface the S3 client failure as the checked exception RemoteIterator declares
throw new IOException(e);
}
@Override
public boolean hasNext()
throws IOException
{
try {
return iterator.hasNext();
}
catch (AmazonClientException e) {
throw new IOException(e);
}
}

@Override
public LocatedFileStatus next()
throws IOException
{
try {
return iterator.next();
}
catch (AmazonClientException e) {
// Same translation as in hasNext()
throw new IOException(e);
}
@Override
public LocatedFileStatus next()
throws IOException
{
try {
return iterator.next();
}
};
catch (AmazonClientException e) {
throw new IOException(e);
}
}
}

@Override
Expand All @@ -325,7 +341,7 @@ public FileStatus getFileStatus(Path path)

if (metadata == null) {
// check if this path is a directory
Iterator<LocatedFileStatus> iterator = listPrefix(path);
Iterator<LocatedFileStatus> iterator = listPrefix(path, OptionalInt.of(1), ListingMode.SHALLOW_ALL);
if (iterator.hasNext()) {
return new FileStatus(0, true, 1, 0, 0, qualifiedPath(path));
}
Expand Down Expand Up @@ -480,7 +496,18 @@ public boolean mkdirs(Path f, FsPermission permission)
return true;
}

private Iterator<LocatedFileStatus> listPrefix(Path path)
/**
 * Selects which entries a prefix listing returns; each constant carries its own files-only flag.
 */
private enum ListingMode {
    SHALLOW_ALL(false), // Shallow listing of files AND directories
    SHALLOW_FILES_ONLY(true),
    RECURSIVE_FILES_ONLY(true);

    private final boolean filesOnly;

    ListingMode(boolean filesOnly)
    {
        this.filesOnly = filesOnly;
    }

    /**
     * @return {@code true} for {@link #SHALLOW_FILES_ONLY} and {@link #RECURSIVE_FILES_ONLY},
     * {@code false} for {@link #SHALLOW_ALL}
     */
    public boolean isFilesOnly()
    {
        return filesOnly;
    }
}

private Iterator<LocatedFileStatus> listPrefix(Path path, OptionalInt initialMaxKeys, ListingMode mode)
{
String key = keyFromPath(path);
if (!key.isEmpty()) {
Expand All @@ -490,7 +517,8 @@ private Iterator<LocatedFileStatus> listPrefix(Path path)
ListObjectsRequest request = new ListObjectsRequest()
.withBucketName(getBucketName(uri))
.withPrefix(key)
.withDelimiter(PATH_SEPARATOR);
.withDelimiter(mode == ListingMode.RECURSIVE_FILES_ONLY ? null : PATH_SEPARATOR)
.withMaxKeys(initialMaxKeys.isPresent() ? initialMaxKeys.getAsInt() : null);

STATS.newListObjectsCall();
Iterator<ObjectListing> listings = new AbstractSequentialIterator<ObjectListing>(s3.listObjects(request))
Expand All @@ -501,23 +529,39 @@ protected ObjectListing computeNext(ObjectListing previous)
if (!previous.isTruncated()) {
return null;
}
// Clear any max keys set for the initial request before submitting subsequent requests. Values < 0
// are not sent in the request and the default limit is used
previous.setMaxKeys(-1);
return s3.listNextBatchOfObjects(previous);
}
};

return Iterators.concat(Iterators.transform(listings, this::statusFromListing));
Iterator<LocatedFileStatus> result = Iterators.concat(Iterators.transform(listings, this::statusFromListing));
if (mode.isFilesOnly()) {
// Even recursive listing can still contain empty "directory" objects, must filter them out
result = Iterators.filter(result, LocatedFileStatus::isFile);
}
return result;
}

/**
 * Converts one {@code ObjectListing} into file statuses, common prefixes first and object
 * summaries second. When either list is empty, the iterator for the other list is returned
 * directly instead of going through {@code Iterators.concat}.
 *
 * NOTE(review): the final return statement appears with two sets of arguments; this looks like
 * removed and added lines of a rendered diff shown together -- confirm against the real file.
 *
 * @param listing a single page of listing results
 * @return an iterator over the statuses derived from that page
 */
private Iterator<LocatedFileStatus> statusFromListing(ObjectListing listing)
{
List<String> prefixes = listing.getCommonPrefixes();
List<S3ObjectSummary> objects = listing.getObjectSummaries();
// No prefixes on this page: only the objects contribute
if (prefixes.isEmpty()) {
return statusFromObjects(objects);
}
// No objects on this page: only the prefixes contribute
if (objects.isEmpty()) {
return statusFromPrefixes(prefixes);
}
return Iterators.concat(
statusFromPrefixes(listing.getCommonPrefixes()),
statusFromObjects(listing.getObjectSummaries()));
statusFromPrefixes(prefixes),
statusFromObjects(objects));
}

private Iterator<LocatedFileStatus> statusFromPrefixes(List<String> prefixes)
{
List<LocatedFileStatus> list = new ArrayList<>();
List<LocatedFileStatus> list = new ArrayList<>(prefixes.size());
for (String prefix : prefixes) {
Path path = qualifiedPath(new Path(PATH_SEPARATOR + prefix));
FileStatus status = new FileStatus(0, true, 1, 0, 0, path);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,16 +29,22 @@
import com.amazonaws.services.s3.model.EncryptionMaterialsProvider;
import com.amazonaws.services.s3.model.GetObjectMetadataRequest;
import com.amazonaws.services.s3.model.GetObjectRequest;
import com.amazonaws.services.s3.model.ListObjectsRequest;
import com.amazonaws.services.s3.model.ObjectListing;
import com.amazonaws.services.s3.model.ObjectMetadata;
import com.amazonaws.services.s3.model.S3Object;
import com.amazonaws.services.s3.model.S3ObjectInputStream;
import com.amazonaws.services.s3.model.S3ObjectSummary;
import com.amazonaws.services.s3.model.StorageClass;
import com.facebook.presto.hive.s3.PrestoS3FileSystem.UnrecoverableS3OperationException;
import com.google.common.base.VerifyException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.LocatedFileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.RemoteIterator;
import org.testng.SkipException;
import org.testng.annotations.DataProvider;
import org.testng.annotations.Test;
Expand All @@ -51,6 +57,10 @@
import java.net.URI;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Date;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;

Expand Down Expand Up @@ -83,6 +93,7 @@
import static java.nio.file.Files.createTempDirectory;
import static java.nio.file.Files.createTempFile;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertFalse;
import static org.testng.Assert.assertTrue;

public class TestPrestoS3FileSystem
Expand Down Expand Up @@ -664,4 +675,71 @@ public S3ObjectInputStream getObjectContent()
}
}
}

/**
 * Exercises the three listing entry points against a mock S3 client: listLocatedStatus
 * (prefix plus file), listFiles non-recursive (file only) and listFiles recursive (both files).
 */
@Test
public void testListPrefixModes()
        throws Exception
{
    // Fixture: one object at the bucket root and one object below "prefix"
    S3ObjectSummary topLevelObject = standardSummaryWithKey("standard-object-at-root.txt");
    S3ObjectSummary nestedObject = standardSummaryWithKey("prefix/child-object.txt");

    try (PrestoS3FileSystem fileSystem = new PrestoS3FileSystem()) {
        MockAmazonS3 mockClient = new MockAmazonS3()
        {
            @Override
            public ObjectListing listObjects(ListObjectsRequest listObjectsRequest)
            {
                ObjectListing response = new ObjectListing();
                boolean delimited = "/".equals(listObjectsRequest.getDelimiter());
                if (delimited) {
                    // Shallow listing: one common prefix plus the root-level object
                    response.getCommonPrefixes().add("prefix");
                    response.getObjectSummaries().add(topLevelObject);
                }
                else {
                    // Recursive listing of object keys only
                    response.getObjectSummaries().addAll(Arrays.asList(nestedObject, topLevelObject));
                }
                return response;
            }
        };
        Path bucketRoot = new Path("s3n://test-bucket/");
        fileSystem.initialize(bucketRoot.toUri(), new Configuration());
        fileSystem.setS3Client(mockClient);

        Path prefixPath = new Path(bucketRoot, "prefix");
        Path topLevelPath = new Path(bucketRoot, topLevelObject.getKey());
        Path nestedPath = new Path(bucketRoot, nestedObject.getKey());

        // listLocatedStatus: the directory-like prefix comes first, then the file
        List<LocatedFileStatus> everything = remoteIteratorToList(fileSystem.listLocatedStatus(bucketRoot));
        assertEquals(everything.size(), 2);
        LocatedFileStatus firstEntry = everything.get(0);
        LocatedFileStatus secondEntry = everything.get(1);
        assertTrue(firstEntry.isDirectory());
        assertFalse(secondEntry.isDirectory());
        assertEquals(firstEntry.getPath(), prefixPath);
        assertEquals(secondEntry.getPath(), topLevelPath);

        // listFiles(recursive = false): the prefix is filtered out
        List<LocatedFileStatus> singleLevelFiles = remoteIteratorToList(fileSystem.listFiles(bucketRoot, false));
        assertEquals(singleLevelFiles.size(), 1);
        LocatedFileStatus onlyFile = singleLevelFiles.get(0);
        assertFalse(onlyFile.isDirectory());
        assertEquals(onlyFile.getPath(), topLevelPath);

        // listFiles(recursive = true): both objects, in the order the mock returned them
        List<LocatedFileStatus> allFiles = remoteIteratorToList(fileSystem.listFiles(bucketRoot, true));
        assertEquals(allFiles.size(), 2);
        LocatedFileStatus firstFile = allFiles.get(0);
        LocatedFileStatus secondFile = allFiles.get(1);
        assertFalse(firstFile.isDirectory());
        assertFalse(secondFile.isDirectory());
        assertEquals(firstFile.getPath(), nestedPath);
        assertEquals(secondFile.getPath(), topLevelPath);
    }
}

// Builds an object summary with the Standard storage class, the given key and the current time
private static S3ObjectSummary standardSummaryWithKey(String key)
{
    S3ObjectSummary summary = new S3ObjectSummary();
    summary.setStorageClass(StorageClass.Standard.toString());
    summary.setKey(key);
    summary.setLastModified(new Date());
    return summary;
}

/**
 * Drains a remote iterator into a list, preserving iteration order.
 *
 * @param statuses iterator to consume until {@code hasNext()} returns false
 * @return every element the iterator produced, in order
 * @throws IOException if the iterator fails while advancing
 */
private static List<LocatedFileStatus> remoteIteratorToList(RemoteIterator<LocatedFileStatus> statuses)
        throws IOException
{
    List<LocatedFileStatus> drained = new ArrayList<>();
    for (boolean more = statuses.hasNext(); more; more = statuses.hasNext()) {
        drained.add(statuses.next());
    }
    return drained;
}
}