diff --git a/lib/trino-filesystem-azure/src/main/java/io/trino/filesystem/azure/AzureFileSystem.java b/lib/trino-filesystem-azure/src/main/java/io/trino/filesystem/azure/AzureFileSystem.java index 702b68a38a3c..9ddc6c990dc8 100644 --- a/lib/trino-filesystem-azure/src/main/java/io/trino/filesystem/azure/AzureFileSystem.java +++ b/lib/trino-filesystem-azure/src/main/java/io/trino/filesystem/azure/AzureFileSystem.java @@ -232,12 +232,19 @@ private void renameGen2File(AzureLocation source, AzureLocation target) @Override public FileIterator listFiles(Location location) throws IOException + { + return listFiles(location, true); + } + + @Override + public FileIterator listFiles(Location location, boolean isRecursive) + throws IOException { AzureLocation azureLocation = new AzureLocation(location); try { // blob API returns directories as blobs, so it cannot be used when Gen2 is enabled - return isHierarchicalNamespaceEnabled(azureLocation) - ? listGen2Files(azureLocation) + return (isHierarchicalNamespaceEnabled(azureLocation)) + ? listGen2Files(azureLocation, isRecursive) : listBlobFiles(azureLocation); } catch (RuntimeException e) { @@ -245,13 +252,13 @@ public FileIterator listFiles(Location location) } } - private FileIterator listGen2Files(AzureLocation location) + private FileIterator listGen2Files(AzureLocation location, boolean isRecursive) throws IOException { DataLakeFileSystemClient fileSystemClient = createFileSystemClient(location); PagedIterable pathItems; if (location.path().isEmpty()) { - pathItems = fileSystemClient.listPaths(new ListPathsOptions().setRecursive(true), null); + pathItems = fileSystemClient.listPaths(new ListPathsOptions().setRecursive(isRecursive), null); } else { DataLakeDirectoryClient directoryClient = fileSystemClient.getDirectoryClient(location.path()); @@ -261,7 +268,7 @@ private FileIterator listGen2Files(AzureLocation location) if (!directoryClient.getProperties().isDirectory()) { throw new IOException("Location is not a directory: " + location); } - pathItems = directoryClient.listPaths(true, false, null, null); + pathItems = directoryClient.listPaths(isRecursive, false, null, null); } return new AzureDataLakeFileIterator( location, diff --git a/lib/trino-filesystem-gcs/src/main/java/io/trino/filesystem/gcs/GcsFileSystem.java b/lib/trino-filesystem-gcs/src/main/java/io/trino/filesystem/gcs/GcsFileSystem.java index 3b7f2383cf4c..ea4108577fe8 100644 --- a/lib/trino-filesystem-gcs/src/main/java/io/trino/filesystem/gcs/GcsFileSystem.java +++ b/lib/trino-filesystem-gcs/src/main/java/io/trino/filesystem/gcs/GcsFileSystem.java @@ -131,7 +131,7 @@ public void deleteDirectory(Location location) try { List> batchFutures = new ArrayList<>(); - for (List blobBatch : partition(getPage(gcsLocation).iterateAll(), batchSize)) { + for (List blobBatch : partition(getPage(gcsLocation, true).iterateAll(), batchSize)) { StorageBatch batch = storage.batch(); for (Blob blob : blobBatch) { batch.delete(blob.getBlobId()); @@ -155,10 +155,17 @@ public void renameFile(Location source, Location target) @Override public FileIterator listFiles(Location location) throws IOException + { + return listFiles(location, true); + } + + @Override + public FileIterator listFiles(Location location, boolean isRecursive) + throws IOException { GcsLocation gcsLocation = new GcsLocation(normalizeToDirectory(location)); try { - return new GcsFileIterator(gcsLocation, getPage(gcsLocation)); + return new GcsFileIterator(gcsLocation, getPage(gcsLocation, isRecursive)); } catch (RuntimeException e) { throw handleGcsException(e, "listing files", gcsLocation); @@ -180,13 +187,17 @@ private static void checkIsValidFile(GcsLocation gcsLocation) checkState(!gcsLocation.path().endsWith("/"), "Location path ends with a slash: %s", gcsLocation); } - private Page getPage(GcsLocation location, BlobListOption... blobListOptions) + private Page getPage(GcsLocation location, boolean isRecursive, BlobListOption... blobListOptions) { List optionsBuilder = new ArrayList<>(); if (!location.path().isEmpty()) { optionsBuilder.add(BlobListOption.prefix(location.path())); } + if (!isRecursive) { + optionsBuilder.add(BlobListOption.delimiter("/")); + } + Arrays.stream(blobListOptions).forEach(optionsBuilder::add); optionsBuilder.add(pageSize(this.pageSize)); return storage.list(location.bucket(), optionsBuilder.toArray(BlobListOption[]::new)); @@ -249,7 +260,7 @@ public Set listDirectories(Location location) { GcsLocation gcsLocation = new GcsLocation(normalizeToDirectory(location)); try { - Page page = getPage(gcsLocation, currentDirectory(), matchGlob(gcsLocation.path() + "*/")); + Page page = getPage(gcsLocation, true, currentDirectory(), matchGlob(gcsLocation.path() + "*/")); Iterator blobIterator = Iterators.filter(page.iterateAll().iterator(), blob -> blob.isDirectory()); ImmutableSet.Builder locationBuilder = ImmutableSet.builder(); while (blobIterator.hasNext()) { diff --git a/lib/trino-filesystem-manager/src/main/java/io/trino/filesystem/manager/SwitchingFileSystem.java b/lib/trino-filesystem-manager/src/main/java/io/trino/filesystem/manager/SwitchingFileSystem.java index c38043cc0763..28f90ccbbe46 100644 --- a/lib/trino-filesystem-manager/src/main/java/io/trino/filesystem/manager/SwitchingFileSystem.java +++ b/lib/trino-filesystem-manager/src/main/java/io/trino/filesystem/manager/SwitchingFileSystem.java @@ -110,6 +110,13 @@ public FileIterator listFiles(Location location) return fileSystem(location).listFiles(location); } + @Override + public FileIterator listFiles(Location location, boolean isRecursive) + throws IOException + { + return fileSystem(location).listFiles(location, isRecursive); + } + @Override public Optional directoryExists(Location location) throws IOException diff --git a/lib/trino-filesystem-s3/src/main/java/io/trino/filesystem/s3/S3FileSystem.java b/lib/trino-filesystem-s3/src/main/java/io/trino/filesystem/s3/S3FileSystem.java index e9b296349871..0b74a2823660 100644 --- a/lib/trino-filesystem-s3/src/main/java/io/trino/filesystem/s3/S3FileSystem.java +++ b/lib/trino-filesystem-s3/src/main/java/io/trino/filesystem/s3/S3FileSystem.java @@ -168,6 +168,13 @@ public void renameFile(Location source, Location target) @Override public FileIterator listFiles(Location location) throws IOException + { + return listFiles(location, true); + } + + @Override + public FileIterator listFiles(Location location, boolean isRecursive) + throws IOException { S3Location s3Location = new S3Location(location); @@ -176,10 +183,15 @@ public FileIterator listFiles(Location location) key += "/"; } - ListObjectsV2Request request = ListObjectsV2Request.builder() + ListObjectsV2Request.Builder builder = ListObjectsV2Request.builder() .bucket(s3Location.bucket()) - .prefix(key) - .build(); + .prefix(key); + + if (!isRecursive) { + builder = builder.delimiter("/"); + } + + ListObjectsV2Request request = builder.build(); try { Iterator iterator = client.listObjectsV2Paginator(request).contents().stream() diff --git a/lib/trino-filesystem/src/main/java/io/trino/filesystem/TrinoFileSystem.java b/lib/trino-filesystem/src/main/java/io/trino/filesystem/TrinoFileSystem.java index 9692154a39d0..47199e329c39 100644 --- a/lib/trino-filesystem/src/main/java/io/trino/filesystem/TrinoFileSystem.java +++ b/lib/trino-filesystem/src/main/java/io/trino/filesystem/TrinoFileSystem.java @@ -153,6 +153,27 @@ void renameFile(Location source, Location target) FileIterator listFiles(Location location) throws IOException; + /** + * Lists all files within the specified directory recursively or non-recursively .Default + * implementation is recursively. The location can be empty, + * listing all files in the file system, otherwise the location must end with a slash. If the + * location does not exist, an empty iterator is returned. + *

+ * For hierarchical file systems, if the path is not a directory, an exception is + * raised. + * For hierarchical file systems, if the path does not reference an existing + * directory, an empty iterator is returned. For blob file systems, all blobs + * that start with the location are listed. In the rare case that a blob exists with the + * exact name of the prefix, it is not included in the results. + *

+ * The returned FileEntry locations will start with the specified location exactly. + * + * @param location the directory to list + * @throws IllegalArgumentException if location is not valid for this file system + */ + FileIterator listFiles(Location location, boolean isRecursive) + throws IOException; + /** * Checks if a directory exists at the specified location. For all file system types, * this returns true if the location is empty (the root of the file system) diff --git a/lib/trino-filesystem/src/main/java/io/trino/filesystem/local/LocalFileIterator.java b/lib/trino-filesystem/src/main/java/io/trino/filesystem/local/LocalFileIterator.java index 31de369d43ac..906395aa92d1 100644 --- a/lib/trino-filesystem/src/main/java/io/trino/filesystem/local/LocalFileIterator.java +++ b/lib/trino-filesystem/src/main/java/io/trino/filesystem/local/LocalFileIterator.java @@ -35,7 +35,7 @@ class LocalFileIterator private final Path rootPath; private final Iterator iterator; - public LocalFileIterator(Location location, Path rootPath, Path path) + public LocalFileIterator(Location location, Path rootPath, Path path, boolean isRecursive) throws IOException { this.rootPath = requireNonNull(rootPath, "rootPath is null"); @@ -46,15 +46,29 @@ public LocalFileIterator(Location location, Path rootPath, Path path) this.iterator = emptyIterator(); } else { - try (Stream stream = Files.walk(path)) { - this.iterator = stream - .filter(Files::isRegularFile) - // materialize full list so stream can be closed - .collect(toImmutableList()) - .iterator(); + if (isRecursive) { + try (Stream stream = Files.walk(path)) { + this.iterator = stream + .filter(Files::isRegularFile) + // materialize full list so stream can be closed + .collect(toImmutableList()) + .iterator(); + } + catch (IOException e) { + throw handleException(location, e); + } } - catch (IOException e) { - throw handleException(location, e); + else { + try (Stream stream = Files.list(path)) { + this.iterator = stream + .filter(Files::isRegularFile) + // materialize full list so stream can be closed + .collect(toImmutableList()) + .iterator(); + } + catch (IOException e) { + throw handleException(location, e); + } } } } diff --git a/lib/trino-filesystem/src/main/java/io/trino/filesystem/local/LocalFileSystem.java b/lib/trino-filesystem/src/main/java/io/trino/filesystem/local/LocalFileSystem.java index a6fa596124c5..ec1a73c4df6d 100644 --- a/lib/trino-filesystem/src/main/java/io/trino/filesystem/local/LocalFileSystem.java +++ b/lib/trino-filesystem/src/main/java/io/trino/filesystem/local/LocalFileSystem.java @@ -153,7 +153,14 @@ public void renameFile(Location source, Location target) public FileIterator listFiles(Location location) throws IOException { - return new LocalFileIterator(location, rootPath, toDirectoryPath(location)); + return listFiles(location, true); + } + + @Override + public FileIterator listFiles(Location location, boolean isRecursive) + throws IOException + { + return new LocalFileIterator(location, rootPath, toDirectoryPath(location), isRecursive); } @Override diff --git a/lib/trino-filesystem/src/main/java/io/trino/filesystem/memory/MemoryFileSystem.java b/lib/trino-filesystem/src/main/java/io/trino/filesystem/memory/MemoryFileSystem.java index 1b07a632fa69..c6818e608b05 100644 --- a/lib/trino-filesystem/src/main/java/io/trino/filesystem/memory/MemoryFileSystem.java +++ b/lib/trino-filesystem/src/main/java/io/trino/filesystem/memory/MemoryFileSystem.java @@ -156,6 +156,14 @@ public FileEntry next() }; } + // implementing same as recursive true + @Override + public FileIterator listFiles(Location location, boolean isRecursive) + throws IOException + { + return listFiles(location); + } + @Override public Optional directoryExists(Location location) throws IOException diff --git a/lib/trino-filesystem/src/main/java/io/trino/filesystem/tracing/TracingFileSystem.java b/lib/trino-filesystem/src/main/java/io/trino/filesystem/tracing/TracingFileSystem.java index 0755a924fdd6..2142b2ff65eb 100644 --- a/lib/trino-filesystem/src/main/java/io/trino/filesystem/tracing/TracingFileSystem.java +++ b/lib/trino-filesystem/src/main/java/io/trino/filesystem/tracing/TracingFileSystem.java @@ -109,6 +109,16 @@ public FileIterator listFiles(Location location) return withTracing(span, () -> delegate.listFiles(location)); } + @Override + public FileIterator listFiles(Location location, boolean isRecursive) + throws IOException + { + Span span = tracer.spanBuilder("FileSystem.listFiles") + .setAttribute(FileSystemAttributes.FILE_LOCATION, location.toString()) + .startSpan(); + return withTracing(span, () -> delegate.listFiles(location, isRecursive)); + } + @Override public Optional directoryExists(Location location) throws IOException diff --git a/lib/trino-filesystem/src/test/java/io/trino/filesystem/TrackingFileSystemFactory.java b/lib/trino-filesystem/src/test/java/io/trino/filesystem/TrackingFileSystemFactory.java index c9f4bae6e02c..2b0f659b4573 100644 --- a/lib/trino-filesystem/src/test/java/io/trino/filesystem/TrackingFileSystemFactory.java +++ b/lib/trino-filesystem/src/test/java/io/trino/filesystem/TrackingFileSystemFactory.java @@ -167,6 +167,13 @@ public FileIterator listFiles(Location location) return delegate.listFiles(location); } + @Override + public FileIterator listFiles(Location location, boolean isRecursive) + throws IOException + { + return delegate.listFiles(location, isRecursive); + } + @Override public Optional directoryExists(Location location) throws IOException diff --git a/lib/trino-hdfs/src/main/java/io/trino/filesystem/hdfs/HdfsFileSystem.java b/lib/trino-hdfs/src/main/java/io/trino/filesystem/hdfs/HdfsFileSystem.java index e1ae9400f461..b6250a098098 100644 --- a/lib/trino-hdfs/src/main/java/io/trino/filesystem/hdfs/HdfsFileSystem.java +++ b/lib/trino-hdfs/src/main/java/io/trino/filesystem/hdfs/HdfsFileSystem.java @@ -228,13 +228,20 @@ public void renameFile(Location source, Location target) @Override public FileIterator listFiles(Location location) throws IOException + { + return listFiles(location, true); + } + + @Override + public FileIterator listFiles(Location location, boolean isRecursive) + throws IOException { stats.getListFilesCalls().newCall(); Path directory = hadoopPath(location); FileSystem fileSystem = environment.getFileSystem(context, directory); return environment.doAs(context.getIdentity(), () -> { try (TimeStat.BlockTimer ignored = stats.getListFilesCalls().time()) { - return new HdfsFileIterator(location, directory, fileSystem.listFiles(directory, true)); + return new HdfsFileIterator(location, directory, fileSystem.listFiles(directory, isRecursive)); } catch (FileNotFoundException e) { return FileIterator.empty(); diff --git a/plugin/trino-hive/src/test/java/io/trino/plugin/hive/TestBackgroundHiveSplitLoader.java b/plugin/trino-hive/src/test/java/io/trino/plugin/hive/TestBackgroundHiveSplitLoader.java index 555ce79943bf..1f577fa03470 100644 --- a/plugin/trino-hive/src/test/java/io/trino/plugin/hive/TestBackgroundHiveSplitLoader.java +++ b/plugin/trino-hive/src/test/java/io/trino/plugin/hive/TestBackgroundHiveSplitLoader.java @@ -1305,6 +1305,26 @@ public FileEntry next() }; } + @Override + public FileIterator listFiles(Location location, boolean isRecursive) + { + Iterator iterator = List.of(fileEntry).iterator(); + return new FileIterator() + { + @Override + public boolean hasNext() + { + return iterator.hasNext(); + } + + @Override + public FileEntry next() + { + return iterator.next(); + } + }; + } + @Override public TrinoInputFile newInputFile(Location location) { diff --git a/plugin/trino-hudi/src/main/java/io/trino/plugin/hudi/HudiSplitSource.java b/plugin/trino-hudi/src/main/java/io/trino/plugin/hudi/HudiSplitSource.java index ad3aae7e6717..d8bedbf003c4 100644 --- a/plugin/trino-hudi/src/main/java/io/trino/plugin/hudi/HudiSplitSource.java +++ b/plugin/trino-hudi/src/main/java/io/trino/plugin/hudi/HudiSplitSource.java @@ -49,6 +49,7 @@ import static io.trino.plugin.hudi.HudiSessionProperties.getStandardSplitWeightSize; import static io.trino.plugin.hudi.HudiSessionProperties.isSizeBasedSplitWeightsEnabled; import static io.trino.plugin.hudi.HudiUtil.buildTableMetaClient; +import static io.trino.spi.StandardErrorCode.GENERIC_INTERNAL_ERROR; import static java.util.stream.Collectors.toList; public class HudiSplitSource @@ -91,7 +92,12 @@ public HudiSplitSource( queue, new BoundedExecutor(executor, getSplitGeneratorParallelism(session)), createSplitWeightProvider(session), - partitions); + partitions, + throwable -> { + trinoException.compareAndSet(null, new TrinoException(GENERIC_INTERNAL_ERROR, + "Failed to generate splits for " + table.getTableName(), throwable)); + queue.finish(); + }); this.splitLoaderFuture = splitLoaderExecutorService.schedule(splitLoader, 0, TimeUnit.MILLISECONDS); } diff --git a/plugin/trino-hudi/src/main/java/io/trino/plugin/hudi/split/HudiBackgroundSplitLoader.java b/plugin/trino-hudi/src/main/java/io/trino/plugin/hudi/split/HudiBackgroundSplitLoader.java index 31447f74d086..db043a470f4c 100644 --- a/plugin/trino-hudi/src/main/java/io/trino/plugin/hudi/split/HudiBackgroundSplitLoader.java +++ b/plugin/trino-hudi/src/main/java/io/trino/plugin/hudi/split/HudiBackgroundSplitLoader.java @@ -18,7 +18,6 @@ import io.trino.plugin.hudi.HudiTableHandle; import io.trino.plugin.hudi.partition.HudiPartitionInfoLoader; import io.trino.plugin.hudi.query.HudiDirectoryLister; -import io.trino.spi.TrinoException; import io.trino.spi.connector.ConnectorSession; import io.trino.spi.connector.ConnectorSplit; @@ -29,8 +28,8 @@ import java.util.concurrent.ExecutionException; import java.util.concurrent.Executor; import java.util.concurrent.Future; +import java.util.function.Consumer; -import static io.trino.plugin.hudi.HudiErrorCode.HUDI_CANNOT_OPEN_SPLIT; import static io.trino.plugin.hudi.HudiSessionProperties.getSplitGeneratorParallelism; import static java.util.Objects.requireNonNull; @@ -44,6 +43,8 @@ public class HudiBackgroundSplitLoader private final HudiSplitFactory hudiSplitFactory; private final List partitions; + private final Consumer errorListener; + public HudiBackgroundSplitLoader( ConnectorSession session, HudiTableHandle tableHandle, @@ -51,13 +52,15 @@ public HudiBackgroundSplitLoader( AsyncQueue asyncQueue, Executor splitGeneratorExecutor, HudiSplitWeightProvider hudiSplitWeightProvider, - List partitions) + List partitions, + Consumer errorListener) { this.hudiDirectoryLister = requireNonNull(hudiDirectoryLister, "hudiDirectoryLister is null"); this.asyncQueue = requireNonNull(asyncQueue, "asyncQueue is null"); this.splitGeneratorExecutor = requireNonNull(splitGeneratorExecutor, "splitGeneratorExecutorService is null"); this.splitGeneratorNumThreads = getSplitGeneratorParallelism(session); this.hudiSplitFactory = new HudiSplitFactory(tableHandle, hudiSplitWeightProvider); + this.errorListener = requireNonNull(errorListener, "errorListener is null"); this.partitions = requireNonNull(partitions, "partitions is null"); } @@ -86,7 +89,8 @@ public void run() future.get(); } catch (InterruptedException | ExecutionException e) { - throw new TrinoException(HUDI_CANNOT_OPEN_SPLIT, "Error generating Hudi split", e); + //throw new TrinoException(HUDI_CANNOT_OPEN_SPLIT, "Error generating Hudi split", e); + errorListener.accept(e); } } asyncQueue.finish(); diff --git a/plugin/trino-hudi/src/main/java/io/trino/plugin/hudi/table/HudiTableFileSystemView.java b/plugin/trino-hudi/src/main/java/io/trino/plugin/hudi/table/HudiTableFileSystemView.java index e041cf018be3..76117f9138cd 100644 --- a/plugin/trino-hudi/src/main/java/io/trino/plugin/hudi/table/HudiTableFileSystemView.java +++ b/plugin/trino-hudi/src/main/java/io/trino/plugin/hudi/table/HudiTableFileSystemView.java @@ -40,7 +40,6 @@ import org.apache.avro.specific.SpecificRecordBase; import java.io.IOException; -import java.io.OutputStream; import java.util.ArrayList; import java.util.HashMap; import java.util.HashSet; @@ -297,9 +296,7 @@ private FileIterator listPartition(Location partitionLocation) if (fileIterator.hasNext()) { return fileIterator; } - try (OutputStream ignored = metaClient.getFileSystem().newOutputFile(partitionLocation).create()) { - return FileIterator.empty(); - } + return FileIterator.empty(); } public List addFilesToView(FileIterator partitionFiles) diff --git a/plugin/trino-hudi/src/main/java/io/trino/plugin/hudi/table/HudiTableMetaClient.java b/plugin/trino-hudi/src/main/java/io/trino/plugin/hudi/table/HudiTableMetaClient.java index 7f20c63fd115..1e56a2dfc0c1 100644 --- a/plugin/trino-hudi/src/main/java/io/trino/plugin/hudi/table/HudiTableMetaClient.java +++ b/plugin/trino-hudi/src/main/java/io/trino/plugin/hudi/table/HudiTableMetaClient.java @@ -163,12 +163,14 @@ public List scanHoodieInstantsFromFileSystem(Set includedEx private List scanFiles(Predicate pathPredicate) throws IOException { - FileIterator fileIterator = fileSystem.listFiles(metaPath); + FileIterator fileIterator = fileSystem.listFiles(metaPath, false); List result = new ArrayList<>(); while (fileIterator.hasNext()) { FileEntry fileEntry = fileIterator.next(); - if (pathPredicate.test(fileEntry.location())) { - result.add(fileEntry); + if (fileEntry.location().parentDirectory().toString().equals(metaPath.toString())) { // to avoid recursive file if any + if (pathPredicate.test(fileEntry.location())) { + result.add(fileEntry); + } } } return result;