Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -236,26 +236,39 @@ private void renameGen2File(AzureLocation source, AzureLocation target)
@Override
public FileIterator listFiles(Location location)
throws IOException
{
return listFiles(location, true);
}

@Override
public FileIterator listFilesNonRecursively(Location location)
throws IOException
{
return listFiles(location, false);
}

public FileIterator listFiles(Location location, Boolean isRecursive)
throws IOException
{
AzureLocation azureLocation = new AzureLocation(location);
try {
// blob API returns directories as blobs, so it cannot be used when Gen2 is enabled
return isHierarchicalNamespaceEnabled(azureLocation)
? listGen2Files(azureLocation)
? listGen2Files(azureLocation, isRecursive)
: listBlobFiles(azureLocation);
}
catch (RuntimeException e) {
throw handleAzureException(e, "listing files", azureLocation);
}
}

private FileIterator listGen2Files(AzureLocation location)
private FileIterator listGen2Files(AzureLocation location, boolean isRecursive)
throws IOException
{
DataLakeFileSystemClient fileSystemClient = createFileSystemClient(location);
PagedIterable<PathItem> pathItems;
if (location.path().isEmpty()) {
pathItems = fileSystemClient.listPaths(new ListPathsOptions().setRecursive(true), null);
pathItems = fileSystemClient.listPaths(new ListPathsOptions().setRecursive(isRecursive), null);
}
else {
DataLakeDirectoryClient directoryClient = fileSystemClient.getDirectoryClient(location.path());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -131,7 +131,7 @@ public void deleteDirectory(Location location)
try {
List<ListenableFuture<?>> batchFutures = new ArrayList<>();

for (List<Blob> blobBatch : partition(getPage(gcsLocation).iterateAll(), batchSize)) {
for (List<Blob> blobBatch : partition(getPage(gcsLocation, true).iterateAll(), batchSize)) {
StorageBatch batch = storage.batch();
for (Blob blob : blobBatch) {
batch.delete(blob.getBlobId());
Expand All @@ -155,10 +155,23 @@ public void renameFile(Location source, Location target)
@Override
public FileIterator listFiles(Location location)
throws IOException
{
return listFiles(location, true);
}

@Override
public FileIterator listFilesNonRecursively(Location location)
throws IOException
{
return listFiles(location, true);
}

public FileIterator listFiles(Location location, Boolean isRecursive)
throws IOException
{
GcsLocation gcsLocation = new GcsLocation(normalizeToDirectory(location));
try {
return new GcsFileIterator(gcsLocation, getPage(gcsLocation));
return new GcsFileIterator(gcsLocation, getPage(gcsLocation, isRecursive));
}
catch (RuntimeException e) {
throw handleGcsException(e, "listing files", gcsLocation);
Expand All @@ -180,13 +193,16 @@ private static void checkIsValidFile(GcsLocation gcsLocation)
checkState(!gcsLocation.path().endsWith("/"), "Location path ends with a slash: %s", gcsLocation);
}

private Page<Blob> getPage(GcsLocation location, BlobListOption... blobListOptions)
private Page<Blob> getPage(GcsLocation location, Boolean isRecursive, BlobListOption... blobListOptions)
{
List<BlobListOption> optionsBuilder = new ArrayList<>();

if (!location.path().isEmpty()) {
optionsBuilder.add(BlobListOption.prefix(location.path()));
}
if (!isRecursive) {
optionsBuilder.add(BlobListOption.delimiter("/"));
}
Arrays.stream(blobListOptions).forEach(optionsBuilder::add);
optionsBuilder.add(pageSize(this.pageSize));
return storage.list(location.bucket(), optionsBuilder.toArray(BlobListOption[]::new));
Expand Down Expand Up @@ -249,7 +265,7 @@ public Set<Location> listDirectories(Location location)
{
GcsLocation gcsLocation = new GcsLocation(normalizeToDirectory(location));
try {
Page<Blob> page = getPage(gcsLocation, currentDirectory(), matchGlob(gcsLocation.path() + "*/"));
Page<Blob> page = getPage(gcsLocation, true, currentDirectory(), matchGlob(gcsLocation.path() + "*/"));
Iterator<Blob> blobIterator = Iterators.filter(page.iterateAll().iterator(), blob -> blob.isDirectory());
ImmutableSet.Builder<Location> locationBuilder = ImmutableSet.builder();
while (blobIterator.hasNext()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,13 @@ public FileIterator listFiles(Location location)
return fileSystem(location).listFiles(location);
}

@Override
public FileIterator listFilesNonRecursively(Location location)
throws IOException
{
return fileSystem(location).listFilesNonRecursively(location);
}

@Override
public Optional<Boolean> directoryExists(Location location)
throws IOException
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -170,6 +170,19 @@ public void renameFile(Location source, Location target)
@Override
public FileIterator listFiles(Location location)
throws IOException
{
return listFiles(location, true);
}

@Override
public FileIterator listFilesNonRecursively(Location location)
throws IOException
{
return listFiles(location, false);
}

public FileIterator listFiles(Location location, Boolean isRecursive)
throws IOException
{
S3Location s3Location = new S3Location(location);

Expand All @@ -178,11 +191,16 @@ public FileIterator listFiles(Location location)
key += "/";
}

ListObjectsV2Request request = ListObjectsV2Request.builder()
ListObjectsV2Request.Builder builder = ListObjectsV2Request.builder()
.overrideConfiguration(context::applyCredentialProviderOverride)
.bucket(s3Location.bucket())
.prefix(key)
.build();
.prefix(key);

if (!isRecursive) {
builder = builder.delimiter("/");
}

ListObjectsV2Request request = builder.build();

try {
Iterator<S3Object> iterator = client.listObjectsV2Paginator(request).contents().stream()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -153,6 +153,27 @@ void renameFile(Location source, Location target)
FileIterator listFiles(Location location)
throws IOException;

/**
* Lists all files within the specified directory recursively or non-recursively .Default
* implementation is recursively. The location can be empty,
* listing all files in the file system, otherwise the location must end with a slash. If the
* location does not exist, an empty iterator is returned.
* <p>
* For hierarchical file systems, if the path is not a directory, an exception is
* raised.
* For hierarchical file systems, if the path does not reference an existing
* directory, an empty iterator is returned. For blob file systems, all blobs
* that start with the location are listed. In the rare case that a blob exists with the
* exact name of the prefix, it is not included in the results.
* <p>
* The returned FileEntry locations will start with the specified location exactly.
*
* @param location the directory to list
* @throws IllegalArgumentException if location is not valid for this file system
*/
FileIterator listFilesNonRecursively(Location location)
throws IOException;

/**
* Checks if a directory exists at the specified location. For all file system types,
* this returns <tt>true</tt> if the location is empty (the root of the file system)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,13 @@ public FileIterator listFiles(Location location)
return delegate.listFiles(location);
}

@Override
public FileIterator listFilesNonRecursively(Location location)
throws IOException
{
return delegate.listFilesNonRecursively(location);
}

@Override
public Optional<Boolean> directoryExists(Location location)
throws IOException
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ class LocalFileIterator
private final Path rootPath;
private final Iterator<Path> iterator;

public LocalFileIterator(Location location, Path rootPath, Path path)
public LocalFileIterator(Location location, Path rootPath, Path path, boolean isRecursive)
throws IOException
{
this.rootPath = requireNonNull(rootPath, "rootPath is null");
Expand All @@ -46,15 +46,29 @@ public LocalFileIterator(Location location, Path rootPath, Path path)
this.iterator = emptyIterator();
}
else {
try (Stream<Path> stream = Files.walk(path)) {
this.iterator = stream
.filter(Files::isRegularFile)
// materialize full list so stream can be closed
.collect(toImmutableList())
.iterator();
if (isRecursive) {
try (Stream<Path> stream = Files.walk(path)) {
this.iterator = stream
.filter(Files::isRegularFile)
// materialize full list so stream can be closed
.collect(toImmutableList())
.iterator();
}
catch (IOException e) {
throw handleException(location, e);
}
}
catch (IOException e) {
throw handleException(location, e);
else {
try (Stream<Path> stream = Files.walk(path)) {
this.iterator = stream
.filter(Files::isRegularFile)
// materialize full list so stream can be closed
.collect(toImmutableList())
.iterator();
}
catch (IOException e) {
throw handleException(location, e);
}
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -157,7 +157,20 @@ public void renameFile(Location source, Location target)
public FileIterator listFiles(Location location)
throws IOException
{
return new LocalFileIterator(location, rootPath, toDirectoryPath(location));
return listFiles(location, true);
}

public FileIterator listFiles(Location location, boolean isRecursive)
throws IOException
{
return new LocalFileIterator(location, rootPath, toDirectoryPath(location), isRecursive);
}

@Override
public FileIterator listFilesNonRecursively(Location location)
throws IOException
{
return listFiles(location, false);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -155,6 +155,13 @@ public FileEntry next()
};
}

@Override
public FileIterator listFilesNonRecursively(Location location)
throws IOException
{
return listFiles(location);
}

@Override
public Optional<Boolean> directoryExists(Location location)
throws IOException
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,16 @@ public FileIterator listFiles(Location location)
return withTracing(span, () -> delegate.listFiles(location));
}

@Override
public FileIterator listFilesNonRecursively(Location location)
throws IOException
{
Span span = tracer.spanBuilder("FileSystem.listFiles")
.setAttribute(FileSystemAttributes.FILE_LOCATION, location.toString())
.startSpan();
return withTracing(span, () -> delegate.listFilesNonRecursively(location));
}

@Override
public Optional<Boolean> directoryExists(Location location)
throws IOException
Expand Down
Loading