diff --git a/lib/trino-filesystem-azure/src/main/java/io/trino/filesystem/azure/AzureFileSystem.java b/lib/trino-filesystem-azure/src/main/java/io/trino/filesystem/azure/AzureFileSystem.java index 40c75f942460..645a09d9a14b 100644 --- a/lib/trino-filesystem-azure/src/main/java/io/trino/filesystem/azure/AzureFileSystem.java +++ b/lib/trino-filesystem-azure/src/main/java/io/trino/filesystem/azure/AzureFileSystem.java @@ -236,12 +236,25 @@ private void renameGen2File(AzureLocation source, AzureLocation target) @Override public FileIterator listFiles(Location location) throws IOException + { + return listFiles(location, true); + } + + @Override + public FileIterator listFilesNonRecursively(Location location) + throws IOException + { + return listFiles(location, false); + } + + public FileIterator listFiles(Location location, Boolean isRecursive) + throws IOException { AzureLocation azureLocation = new AzureLocation(location); try { // blob API returns directories as blobs, so it cannot be used when Gen2 is enabled return isHierarchicalNamespaceEnabled(azureLocation) - ? listGen2Files(azureLocation) + ? listGen2Files(azureLocation, isRecursive) : listBlobFiles(azureLocation); } catch (RuntimeException e) { @@ -249,13 +262,13 @@ public FileIterator listFiles(Location location) } } - private FileIterator listGen2Files(AzureLocation location) + private FileIterator listGen2Files(AzureLocation location, boolean isRecursive) throws IOException { DataLakeFileSystemClient fileSystemClient = createFileSystemClient(location); PagedIterable pathItems; if (location.path().isEmpty()) { - pathItems = fileSystemClient.listPaths(new ListPathsOptions().setRecursive(true), null); + pathItems = fileSystemClient.listPaths(new ListPathsOptions().setRecursive(isRecursive), null); } else { DataLakeDirectoryClient directoryClient = fileSystemClient.getDirectoryClient(location.path()); diff --git a/lib/trino-filesystem-gcs/src/main/java/io/trino/filesystem/gcs/GcsFileSystem.java b/lib/trino-filesystem-gcs/src/main/java/io/trino/filesystem/gcs/GcsFileSystem.java index c971925b3491..207fe9719b5c 100644 --- a/lib/trino-filesystem-gcs/src/main/java/io/trino/filesystem/gcs/GcsFileSystem.java +++ b/lib/trino-filesystem-gcs/src/main/java/io/trino/filesystem/gcs/GcsFileSystem.java @@ -131,7 +131,7 @@ public void deleteDirectory(Location location) try { List> batchFutures = new ArrayList<>(); - for (List blobBatch : partition(getPage(gcsLocation).iterateAll(), batchSize)) { + for (List blobBatch : partition(getPage(gcsLocation, true).iterateAll(), batchSize)) { StorageBatch batch = storage.batch(); for (Blob blob : blobBatch) { batch.delete(blob.getBlobId()); @@ -155,10 +155,23 @@ public void renameFile(Location source, Location target) @Override public FileIterator listFiles(Location location) throws IOException + { + return listFiles(location, true); + } + + @Override + public FileIterator listFilesNonRecursively(Location location) + throws IOException + { + return listFiles(location, true); + } + + public FileIterator listFiles(Location location, Boolean isRecursive) + throws IOException { GcsLocation gcsLocation = new GcsLocation(normalizeToDirectory(location)); try { - return new GcsFileIterator(gcsLocation, getPage(gcsLocation)); + return new GcsFileIterator(gcsLocation, getPage(gcsLocation, isRecursive)); } catch (RuntimeException e) { throw handleGcsException(e, "listing files", gcsLocation); @@ -180,13 +193,16 @@ private static void checkIsValidFile(GcsLocation gcsLocation) checkState(!gcsLocation.path().endsWith("/"), "Location path ends with a slash: %s", gcsLocation); } - private Page getPage(GcsLocation location, BlobListOption... blobListOptions) + private Page getPage(GcsLocation location, Boolean isRecursive, BlobListOption... blobListOptions) { List optionsBuilder = new ArrayList<>(); if (!location.path().isEmpty()) { optionsBuilder.add(BlobListOption.prefix(location.path())); } + if (!isRecursive) { + optionsBuilder.add(BlobListOption.delimiter("/")); + } Arrays.stream(blobListOptions).forEach(optionsBuilder::add); optionsBuilder.add(pageSize(this.pageSize)); return storage.list(location.bucket(), optionsBuilder.toArray(BlobListOption[]::new)); @@ -249,7 +265,7 @@ public Set listDirectories(Location location) { GcsLocation gcsLocation = new GcsLocation(normalizeToDirectory(location)); try { - Page page = getPage(gcsLocation, currentDirectory(), matchGlob(gcsLocation.path() + "*/")); + Page page = getPage(gcsLocation, true, currentDirectory(), matchGlob(gcsLocation.path() + "*/")); Iterator blobIterator = Iterators.filter(page.iterateAll().iterator(), blob -> blob.isDirectory()); ImmutableSet.Builder locationBuilder = ImmutableSet.builder(); while (blobIterator.hasNext()) { diff --git a/lib/trino-filesystem-manager/src/main/java/io/trino/filesystem/manager/SwitchingFileSystem.java b/lib/trino-filesystem-manager/src/main/java/io/trino/filesystem/manager/SwitchingFileSystem.java index c38043cc0763..5e108589c856 100644 --- a/lib/trino-filesystem-manager/src/main/java/io/trino/filesystem/manager/SwitchingFileSystem.java +++ b/lib/trino-filesystem-manager/src/main/java/io/trino/filesystem/manager/SwitchingFileSystem.java @@ -110,6 +110,13 @@ public FileIterator listFiles(Location location) return fileSystem(location).listFiles(location); } + @Override + public FileIterator listFilesNonRecursively(Location location) + throws IOException + { + return fileSystem(location).listFilesNonRecursively(location); + } + @Override public Optional directoryExists(Location location) throws IOException diff --git a/lib/trino-filesystem-s3/src/main/java/io/trino/filesystem/s3/S3FileSystem.java b/lib/trino-filesystem-s3/src/main/java/io/trino/filesystem/s3/S3FileSystem.java index 8228c6d45c58..cdc0bc87c7ed 100644 --- a/lib/trino-filesystem-s3/src/main/java/io/trino/filesystem/s3/S3FileSystem.java +++ b/lib/trino-filesystem-s3/src/main/java/io/trino/filesystem/s3/S3FileSystem.java @@ -170,6 +170,19 @@ public void renameFile(Location source, Location target) @Override public FileIterator listFiles(Location location) throws IOException + { + return listFiles(location, true); + } + + @Override + public FileIterator listFilesNonRecursively(Location location) + throws IOException + { + return listFiles(location, false); + } + + public FileIterator listFiles(Location location, Boolean isRecursive) + throws IOException { S3Location s3Location = new S3Location(location); @@ -178,11 +191,16 @@ public FileIterator listFiles(Location location) key += "/"; } - ListObjectsV2Request request = ListObjectsV2Request.builder() + ListObjectsV2Request.Builder builder = ListObjectsV2Request.builder() .overrideConfiguration(context::applyCredentialProviderOverride) .bucket(s3Location.bucket()) - .prefix(key) - .build(); + .prefix(key); + + if (!isRecursive) { + builder = builder.delimiter("/"); + } + + ListObjectsV2Request request = builder.build(); try { Iterator iterator = client.listObjectsV2Paginator(request).contents().stream() diff --git a/lib/trino-filesystem/src/main/java/io/trino/filesystem/TrinoFileSystem.java b/lib/trino-filesystem/src/main/java/io/trino/filesystem/TrinoFileSystem.java index f4633964fa45..07e1186d6d9b 100644 --- a/lib/trino-filesystem/src/main/java/io/trino/filesystem/TrinoFileSystem.java +++ b/lib/trino-filesystem/src/main/java/io/trino/filesystem/TrinoFileSystem.java @@ -153,6 +153,27 @@ void renameFile(Location source, Location target) FileIterator listFiles(Location location) throws IOException; + /** + * Lists all files within the specified directory recursively or non-recursively .Default + * implementation is recursively. The location can be empty, + * listing all files in the file system, otherwise the location must end with a slash. If the + * location does not exist, an empty iterator is returned. + *

+ * For hierarchical file systems, if the path is not a directory, an exception is + * raised. + * For hierarchical file systems, if the path does not reference an existing + * directory, an empty iterator is returned. For blob file systems, all blobs + * that start with the location are listed. In the rare case that a blob exists with the + * exact name of the prefix, it is not included in the results. + *

+ * The returned FileEntry locations will start with the specified location exactly. + * + * @param location the directory to list + * @throws IllegalArgumentException if location is not valid for this file system + */ + FileIterator listFilesNonRecursively(Location location) + throws IOException; + /** * Checks if a directory exists at the specified location. For all file system types, * this returns true if the location is empty (the root of the file system) diff --git a/lib/trino-filesystem/src/main/java/io/trino/filesystem/cache/CacheFileSystem.java b/lib/trino-filesystem/src/main/java/io/trino/filesystem/cache/CacheFileSystem.java index 6449db7a0199..03fbaf70e7ed 100644 --- a/lib/trino-filesystem/src/main/java/io/trino/filesystem/cache/CacheFileSystem.java +++ b/lib/trino-filesystem/src/main/java/io/trino/filesystem/cache/CacheFileSystem.java @@ -96,6 +96,13 @@ public FileIterator listFiles(Location location) return delegate.listFiles(location); } + @Override + public FileIterator listFilesNonRecursively(Location location) + throws IOException + { + return delegate.listFilesNonRecursively(location); + } + @Override public Optional directoryExists(Location location) throws IOException diff --git a/lib/trino-filesystem/src/main/java/io/trino/filesystem/local/LocalFileIterator.java b/lib/trino-filesystem/src/main/java/io/trino/filesystem/local/LocalFileIterator.java index 31de369d43ac..746f8f2b3100 100644 --- a/lib/trino-filesystem/src/main/java/io/trino/filesystem/local/LocalFileIterator.java +++ b/lib/trino-filesystem/src/main/java/io/trino/filesystem/local/LocalFileIterator.java @@ -35,7 +35,7 @@ class LocalFileIterator private final Path rootPath; private final Iterator iterator; - public LocalFileIterator(Location location, Path rootPath, Path path) + public LocalFileIterator(Location location, Path rootPath, Path path, boolean isRecursive) throws IOException { this.rootPath = requireNonNull(rootPath, "rootPath is null"); @@ -46,15 +46,29 @@ public LocalFileIterator(Location location, Path rootPath, Path path) this.iterator = emptyIterator(); } else { - try (Stream stream = Files.walk(path)) { - this.iterator = stream - .filter(Files::isRegularFile) - // materialize full list so stream can be closed - .collect(toImmutableList()) - .iterator(); + if (isRecursive) { + try (Stream stream = Files.walk(path)) { + this.iterator = stream + .filter(Files::isRegularFile) + // materialize full list so stream can be closed + .collect(toImmutableList()) + .iterator(); + } + catch (IOException e) { + throw handleException(location, e); + } } - catch (IOException e) { - throw handleException(location, e); + else { + try (Stream stream = Files.walk(path)) { + this.iterator = stream + .filter(Files::isRegularFile) + // materialize full list so stream can be closed + .collect(toImmutableList()) + .iterator(); + } + catch (IOException e) { + throw handleException(location, e); + } } } } diff --git a/lib/trino-filesystem/src/main/java/io/trino/filesystem/local/LocalFileSystem.java b/lib/trino-filesystem/src/main/java/io/trino/filesystem/local/LocalFileSystem.java index dc5069260d8d..ffb11fe2a1a4 100644 --- a/lib/trino-filesystem/src/main/java/io/trino/filesystem/local/LocalFileSystem.java +++ b/lib/trino-filesystem/src/main/java/io/trino/filesystem/local/LocalFileSystem.java @@ -157,7 +157,20 @@ public void renameFile(Location source, Location target) public FileIterator listFiles(Location location) throws IOException { - return new LocalFileIterator(location, rootPath, toDirectoryPath(location)); + return listFiles(location, true); + } + + public FileIterator listFiles(Location location, boolean isRecursive) + throws IOException + { + return new LocalFileIterator(location, rootPath, toDirectoryPath(location), isRecursive); + } + + @Override + public FileIterator listFilesNonRecursively(Location location) + throws IOException + { + return listFiles(location, false); } @Override diff --git a/lib/trino-filesystem/src/main/java/io/trino/filesystem/memory/MemoryFileSystem.java b/lib/trino-filesystem/src/main/java/io/trino/filesystem/memory/MemoryFileSystem.java index 0a9bc1b74fe9..f70f6df4cd0c 100644 --- a/lib/trino-filesystem/src/main/java/io/trino/filesystem/memory/MemoryFileSystem.java +++ b/lib/trino-filesystem/src/main/java/io/trino/filesystem/memory/MemoryFileSystem.java @@ -155,6 +155,13 @@ public FileEntry next() }; } + @Override + public FileIterator listFilesNonRecursively(Location location) + throws IOException + { + return listFiles(location); + } + @Override public Optional directoryExists(Location location) throws IOException diff --git a/lib/trino-filesystem/src/main/java/io/trino/filesystem/tracing/TracingFileSystem.java b/lib/trino-filesystem/src/main/java/io/trino/filesystem/tracing/TracingFileSystem.java index 0755a924fdd6..a89c269fec7f 100644 --- a/lib/trino-filesystem/src/main/java/io/trino/filesystem/tracing/TracingFileSystem.java +++ b/lib/trino-filesystem/src/main/java/io/trino/filesystem/tracing/TracingFileSystem.java @@ -109,6 +109,16 @@ public FileIterator listFiles(Location location) return withTracing(span, () -> delegate.listFiles(location)); } + @Override + public FileIterator listFilesNonRecursively(Location location) + throws IOException + { + Span span = tracer.spanBuilder("FileSystem.listFiles") + .setAttribute(FileSystemAttributes.FILE_LOCATION, location.toString()) + .startSpan(); + return withTracing(span, () -> delegate.listFilesNonRecursively(location)); + } + @Override public Optional directoryExists(Location location) throws IOException diff --git a/lib/trino-filesystem/src/test/java/io/trino/filesystem/TrackingFileSystemFactory.java b/lib/trino-filesystem/src/test/java/io/trino/filesystem/TrackingFileSystemFactory.java deleted file mode 100644 index 195b3eede757..000000000000 --- a/lib/trino-filesystem/src/test/java/io/trino/filesystem/TrackingFileSystemFactory.java +++ /dev/null @@ -1,339 +0,0 @@ -/* - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package io.trino.filesystem; - -import com.google.common.collect.ImmutableMap; -import io.airlift.slice.Slice; -import io.trino.memory.context.AggregatedMemoryContext; -import io.trino.spi.security.ConnectorIdentity; - -import java.io.IOException; -import java.io.OutputStream; -import java.time.Instant; -import java.util.Collection; -import java.util.Map; -import java.util.Optional; -import java.util.OptionalLong; -import java.util.Set; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.atomic.AtomicInteger; -import java.util.function.Consumer; - -import static com.google.common.base.Verify.verify; -import static io.trino.filesystem.TrackingFileSystemFactory.OperationType.INPUT_FILE_EXISTS; -import static io.trino.filesystem.TrackingFileSystemFactory.OperationType.INPUT_FILE_GET_LENGTH; -import static io.trino.filesystem.TrackingFileSystemFactory.OperationType.INPUT_FILE_LAST_MODIFIED; -import static io.trino.filesystem.TrackingFileSystemFactory.OperationType.INPUT_FILE_NEW_STREAM; -import static io.trino.filesystem.TrackingFileSystemFactory.OperationType.OUTPUT_FILE_CREATE; -import static io.trino.filesystem.TrackingFileSystemFactory.OperationType.OUTPUT_FILE_CREATE_EXCLUSIVE; -import static io.trino.filesystem.TrackingFileSystemFactory.OperationType.OUTPUT_FILE_CREATE_OR_OVERWRITE; -import static java.util.Objects.requireNonNull; - -public class TrackingFileSystemFactory - implements TrinoFileSystemFactory -{ - public enum OperationType - { - INPUT_FILE_GET_LENGTH, - INPUT_FILE_NEW_STREAM, - INPUT_FILE_EXISTS, - OUTPUT_FILE_CREATE, - OUTPUT_FILE_CREATE_OR_OVERWRITE, - OUTPUT_FILE_CREATE_EXCLUSIVE, - OUTPUT_FILE_TO_INPUT_FILE, - INPUT_FILE_LAST_MODIFIED, - } - - private final AtomicInteger fileId = new AtomicInteger(); - private final TrinoFileSystemFactory delegate; - - private final Map operationCounts = new ConcurrentHashMap<>(); - - public TrackingFileSystemFactory(TrinoFileSystemFactory delegate) - { - this.delegate = requireNonNull(delegate, "delegate is null"); - } - - public Map getOperationCounts() - { - return ImmutableMap.copyOf(operationCounts); - } - - public void reset() - { - operationCounts.clear(); - } - - private void increment(Location path, int fileId, OperationType operationType) - { - OperationContext context = new OperationContext(path, fileId, operationType); - operationCounts.merge(context, 1, Math::addExact); // merge is atomic for ConcurrentHashMap - } - - @Override - public TrinoFileSystem create(ConnectorIdentity identity) - { - return new TrackingFileSystem(delegate.create(identity), this::increment); - } - - private interface Tracker - { - void track(Location path, int fileId, OperationType operationType); - } - - private class TrackingFileSystem - implements TrinoFileSystem - { - private final TrinoFileSystem delegate; - private final Tracker tracker; - - private TrackingFileSystem(TrinoFileSystem delegate, Tracker tracker) - { - this.delegate = requireNonNull(delegate, "delegate is null"); - this.tracker = requireNonNull(tracker, "tracker is null"); - } - - @Override - public TrinoInputFile newInputFile(Location location) - { - int nextId = fileId.incrementAndGet(); - return new TrackingInputFile( - delegate.newInputFile(location), - OptionalLong.empty(), - operation -> tracker.track(location, nextId, operation)); - } - - @Override - public TrinoInputFile newInputFile(Location location, long length) - { - int nextId = fileId.incrementAndGet(); - return new TrackingInputFile( - delegate.newInputFile(location, length), - OptionalLong.of(length), - operation -> tracker.track(location, nextId, operation)); - } - - @Override - public TrinoOutputFile newOutputFile(Location location) - { - int nextId = fileId.incrementAndGet(); - return new TrackingOutputFile( - delegate.newOutputFile(location), - operationType -> tracker.track(location, nextId, operationType)); - } - - @Override - public void deleteFile(Location location) - throws IOException - { - delegate.deleteFile(location); - } - - @Override - public void deleteFiles(Collection locations) - throws IOException - { - delegate.deleteFiles(locations); - } - - @Override - public void deleteDirectory(Location location) - throws IOException - { - delegate.deleteDirectory(location); - } - - @Override - public void renameFile(Location source, Location target) - throws IOException - { - delegate.renameFile(source, target); - } - - @Override - public FileIterator listFiles(Location location) - throws IOException - { - return delegate.listFiles(location); - } - - @Override - public Optional directoryExists(Location location) - throws IOException - { - return delegate.directoryExists(location); - } - - @Override - public void createDirectory(Location location) - throws IOException - { - delegate.createDirectory(location); - } - - @Override - public void renameDirectory(Location source, Location target) - throws IOException - { - delegate.renameDirectory(source, target); - } - - @Override - public Set listDirectories(Location location) - throws IOException - { - return delegate.listDirectories(location); - } - - @Override - public Optional createTemporaryDirectory(Location targetPath, String temporaryPrefix, String relativePrefix) - throws IOException - { - return delegate.createTemporaryDirectory(targetPath, temporaryPrefix, relativePrefix); - } - } - - private static class TrackingInputFile - implements TrinoInputFile - { - private final TrinoInputFile delegate; - private final OptionalLong length; - private final Consumer tracker; - - public TrackingInputFile(TrinoInputFile delegate, OptionalLong length, Consumer tracker) - { - this.delegate = requireNonNull(delegate, "delegate is null"); - this.length = requireNonNull(length, "length is null"); - this.tracker = requireNonNull(tracker, "tracker is null"); - } - - @Override - public long length() - throws IOException - { - if (length.isPresent()) { - // Without TrinoInputFile, known length would be returned. This is additional verification - long actualLength = delegate.length(); - verify(length.getAsLong() == actualLength, "Provided length does not match actual: %s != %s", length.getAsLong(), actualLength); - // No call tracking -- the filesystem call is for verification only. Normally it wouldn't take place. - return length.getAsLong(); - } - tracker.accept(INPUT_FILE_GET_LENGTH); - return delegate.length(); - } - - @Override - public TrinoInput newInput() - throws IOException - { - tracker.accept(INPUT_FILE_NEW_STREAM); - return delegate.newInput(); - } - - @Override - public TrinoInputStream newStream() - throws IOException - { - tracker.accept(INPUT_FILE_NEW_STREAM); - return delegate.newStream(); - } - - @Override - public boolean exists() - throws IOException - { - tracker.accept(INPUT_FILE_EXISTS); - return delegate.exists(); - } - - @Override - public Instant lastModified() - throws IOException - { - tracker.accept(INPUT_FILE_LAST_MODIFIED); - return delegate.lastModified(); - } - - @Override - public Location location() - { - return delegate.location(); - } - - @Override - public String toString() - { - return delegate.toString(); - } - } - - private static class TrackingOutputFile - implements TrinoOutputFile - { - private final TrinoOutputFile delegate; - private final Consumer tracker; - - public TrackingOutputFile(TrinoOutputFile delegate, Consumer tracker) - { - this.delegate = requireNonNull(delegate, "delete is null"); - this.tracker = requireNonNull(tracker, "tracker is null"); - } - - @Override - public OutputStream create(AggregatedMemoryContext memoryContext) - throws IOException - { - tracker.accept(OUTPUT_FILE_CREATE); - return delegate.create(memoryContext); - } - - @Override - public OutputStream createOrOverwrite(AggregatedMemoryContext memoryContext) - throws IOException - { - tracker.accept(OUTPUT_FILE_CREATE_OR_OVERWRITE); - return delegate.createOrOverwrite(memoryContext); - } - - @Override - public void createExclusive(Slice content, AggregatedMemoryContext memoryContext) - throws IOException - { - tracker.accept(OUTPUT_FILE_CREATE_EXCLUSIVE); - delegate.createExclusive(content, memoryContext); - } - - @Override - public Location location() - { - // Not tracked because it's a cheap local operation - return delegate.location(); - } - - @Override - public String toString() - { - return delegate.toString(); - } - } - - public record OperationContext(Location location, int fileId, OperationType operationType) - { - public OperationContext - { - requireNonNull(location, "location is null"); - requireNonNull(operationType, "operationType is null"); - } - } -} diff --git a/lib/trino-hdfs/src/main/java/io/trino/filesystem/hdfs/HdfsFileSystem.java b/lib/trino-hdfs/src/main/java/io/trino/filesystem/hdfs/HdfsFileSystem.java index c7f8d83fd759..e817d0301ed6 100644 --- a/lib/trino-hdfs/src/main/java/io/trino/filesystem/hdfs/HdfsFileSystem.java +++ b/lib/trino-hdfs/src/main/java/io/trino/filesystem/hdfs/HdfsFileSystem.java @@ -227,13 +227,26 @@ public void renameFile(Location source, Location target) @Override public FileIterator listFiles(Location location) throws IOException + { + return listFiles(location, true); + } + + @Override + public FileIterator listFilesNonRecursively(Location location) + throws IOException + { + return listFiles(location, false); + } + + public FileIterator listFiles(Location location, Boolean isRecursive) + throws IOException { stats.getListFilesCalls().newCall(); Path directory = hadoopPath(location); FileSystem fileSystem = environment.getFileSystem(context, directory); return environment.doAs(context.getIdentity(), () -> { try (TimeStat.BlockTimer ignored = stats.getListFilesCalls().time()) { - return new HdfsFileIterator(location, directory, fileSystem.listFiles(directory, true)); + return new HdfsFileIterator(location, directory, fileSystem.listFiles(directory, isRecursive)); } catch (FileNotFoundException e) { return FileIterator.empty(); diff --git a/plugin/trino-hive/src/test/java/io/trino/plugin/hive/TestBackgroundHiveSplitLoader.java b/plugin/trino-hive/src/test/java/io/trino/plugin/hive/TestBackgroundHiveSplitLoader.java index 57d8c8221cf9..721542e122e9 100644 --- a/plugin/trino-hive/src/test/java/io/trino/plugin/hive/TestBackgroundHiveSplitLoader.java +++ b/plugin/trino-hive/src/test/java/io/trino/plugin/hive/TestBackgroundHiveSplitLoader.java @@ -1305,6 +1305,26 @@ public FileEntry next() }; } + @Override + public FileIterator listFilesNonRecursively(Location location) + { + Iterator iterator = List.of(fileEntry).iterator(); + return new FileIterator() + { + @Override + public boolean hasNext() + { + return iterator.hasNext(); + } + + @Override + public FileEntry next() + { + return iterator.next(); + } + }; + } + @Override public TrinoInputFile newInputFile(Location location) { diff --git a/plugin/trino-hudi/src/main/java/io/trino/plugin/hudi/table/HudiTableMetaClient.java b/plugin/trino-hudi/src/main/java/io/trino/plugin/hudi/table/HudiTableMetaClient.java index 7f20c63fd115..2746e742ed1d 100644 --- a/plugin/trino-hudi/src/main/java/io/trino/plugin/hudi/table/HudiTableMetaClient.java +++ b/plugin/trino-hudi/src/main/java/io/trino/plugin/hudi/table/HudiTableMetaClient.java @@ -163,12 +163,14 @@ public List scanHoodieInstantsFromFileSystem(Set includedEx private List scanFiles(Predicate pathPredicate) throws IOException { - FileIterator fileIterator = fileSystem.listFiles(metaPath); + FileIterator fileIterator = fileSystem.listFilesNonRecursively(metaPath); List result = new ArrayList<>(); while (fileIterator.hasNext()) { FileEntry fileEntry = fileIterator.next(); - if (pathPredicate.test(fileEntry.location())) { - result.add(fileEntry); + if (fileEntry.location().parentDirectory().toString().equals(metaPath.toString())) { // to avoid recursive file if any + if (pathPredicate.test(fileEntry.location())) { + result.add(fileEntry); + } } } return result; diff --git a/plugin/trino-hudi/src/test/java/io/trino/plugin/hudi/TestHudiSmokeTest.java b/plugin/trino-hudi/src/test/java/io/trino/plugin/hudi/TestHudiSmokeTest.java index 6313e921cdf2..ceef92675cc2 100644 --- a/plugin/trino-hudi/src/test/java/io/trino/plugin/hudi/TestHudiSmokeTest.java +++ b/plugin/trino-hudi/src/test/java/io/trino/plugin/hudi/TestHudiSmokeTest.java @@ -30,6 +30,7 @@ import static io.trino.plugin.hudi.HudiQueryRunner.createHudiQueryRunner; import static io.trino.plugin.hudi.testing.ResourceHudiTablesInitializer.TestingTable.HUDI_COW_PT_TBL; import static io.trino.plugin.hudi.testing.ResourceHudiTablesInitializer.TestingTable.HUDI_NON_PART_COW; +import static io.trino.plugin.hudi.testing.ResourceHudiTablesInitializer.TestingTable.STOCK_DATA_WITH_HASHING_MOR; import static io.trino.plugin.hudi.testing.ResourceHudiTablesInitializer.TestingTable.STOCK_TICKS_COW; import static io.trino.plugin.hudi.testing.ResourceHudiTablesInitializer.TestingTable.STOCK_TICKS_MOR; import static org.assertj.core.api.Assertions.assertThat; @@ -328,6 +329,12 @@ public void testPartitionFilterRequiredWithLike() "Filter required on tests." + HUDI_COW_PT_TBL.getTableName() + " for at least one of the partition columns: dt, hh"); } + @Test + public void testStockDataMORwithHashingTableCount() + { + assertQuery("SELECT count(1) FROM " + STOCK_DATA_WITH_HASHING_MOR, "SELECT * FROM VALUES ('99')"); // for non-recursive metadata dir testing + } + @Test public void testPartitionFilterRequiredFilterIncluded() { diff --git a/plugin/trino-hudi/src/test/java/io/trino/plugin/hudi/testing/ResourceHudiTablesInitializer.java b/plugin/trino-hudi/src/test/java/io/trino/plugin/hudi/testing/ResourceHudiTablesInitializer.java index 0832ea0643b4..d7775fbed4f5 100644 --- a/plugin/trino-hudi/src/test/java/io/trino/plugin/hudi/testing/ResourceHudiTablesInitializer.java +++ b/plugin/trino-hudi/src/test/java/io/trino/plugin/hudi/testing/ResourceHudiTablesInitializer.java @@ -167,6 +167,7 @@ public enum TestingTable HUDI_COW_PT_TBL(multiPartitionRegularColumns(), multiPartitionColumns(), multiPartitions()), STOCK_TICKS_COW(stockTicksRegularColumns(), stockTicksPartitionColumns(), stockTicksPartitions()), STOCK_TICKS_MOR(stockTicksRegularColumns(), stockTicksPartitionColumns(), stockTicksPartitions()), + STOCK_DATA_WITH_HASHING_MOR(stockTicksRegularColumns(), stockDataPartitionColumns(), stockDataPartitions()), /**/; private static final List HUDI_META_COLUMNS = ImmutableList.of( @@ -262,6 +263,16 @@ private static List multiPartitionRegularColumns() column("ts", HIVE_LONG)); } + private static List stockDataPartitionColumns() + { + return ImmutableList.of(column("dt", HIVE_STRING)); + } + + private static Map stockDataPartitions() + { + return ImmutableMap.of("dt=2023-12-20", "2023/12/20"); + } + private static List multiPartitionColumns() { return ImmutableList.of( diff --git a/plugin/trino-hudi/src/test/resources/hudi-testing-data/stock_data_with_hashing_mor/.hoodie/.bucket_index/consistent_hashing_metadata/2023/12/15/20231215120114336.commit b/plugin/trino-hudi/src/test/resources/hudi-testing-data/stock_data_with_hashing_mor/.hoodie/.bucket_index/consistent_hashing_metadata/2023/12/15/20231215120114336.commit new file mode 100644 index 000000000000..e69de29bb2d1 diff --git a/plugin/trino-hudi/src/test/resources/hudi-testing-data/stock_data_with_hashing_mor/.hoodie/.bucket_index/consistent_hashing_metadata/2023/12/15/20231215120114336.hashing_meta b/plugin/trino-hudi/src/test/resources/hudi-testing-data/stock_data_with_hashing_mor/.hoodie/.bucket_index/consistent_hashing_metadata/2023/12/15/20231215120114336.hashing_meta new file mode 100644 index 000000000000..61ac04985e4c --- /dev/null +++ b/plugin/trino-hudi/src/test/resources/hudi-testing-data/stock_data_with_hashing_mor/.hoodie/.bucket_index/consistent_hashing_metadata/2023/12/15/20231215120114336.hashing_meta @@ -0,0 +1,16 @@ +{ + "version" : 0, + "partitionPath" : "2018/08/31", + "instant" : "20231215180153407", + "numBuckets" : 64, + "seqNo" : 2, + "nodes" : [ { + "value" : 33554432, + "fileIdPrefix" : "1882bdfa-0e68-40bc-be4d-b8979f9ff7ee", + "tag" : "NORMAL" + }, { + "value" : 100663296, + "fileIdPrefix" : "e524521d-3910-46ed-8b37-0dd5ec01f954", + "tag" : "NORMAL" + } ] +} diff --git a/plugin/trino-hudi/src/test/resources/hudi-testing-data/stock_data_with_hashing_mor/.hoodie/20231220170257762.deltacommit b/plugin/trino-hudi/src/test/resources/hudi-testing-data/stock_data_with_hashing_mor/.hoodie/20231220170257762.deltacommit new file mode 100644 index 000000000000..e69de29bb2d1 diff --git a/plugin/trino-hudi/src/test/resources/hudi-testing-data/stock_data_with_hashing_mor/.hoodie/20231220170257762.deltacommit.inflight b/plugin/trino-hudi/src/test/resources/hudi-testing-data/stock_data_with_hashing_mor/.hoodie/20231220170257762.deltacommit.inflight new file mode 100644 index 000000000000..af620eb176b3 --- /dev/null +++ b/plugin/trino-hudi/src/test/resources/hudi-testing-data/stock_data_with_hashing_mor/.hoodie/20231220170257762.deltacommit.inflight @@ -0,0 +1,48 @@ +{ + "partitionToWriteStats" : { + "2023/12/20" : [ { + "fileId" : "", + "path" : null, + "prevCommit" : "null", + "numWrites" : 0, + "numDeletes" : 0, + "numUpdateWrites" : 0, + "numInserts" : 50, + "totalWriteBytes" : 0, + "totalWriteErrors" : 0, + "tempPath" : null, + "partitionPath" : null, + "totalLogRecords" : 0, + "totalLogFilesCompacted" : 0, + "totalLogSizeCompacted" : 0, + "totalUpdatedRecordsCompacted" : 0, + "totalLogBlocks" : 0, + "totalCorruptLogBlock" : 0, + "totalRollbackBlocks" : 0, + "fileSizeInBytes" : 0, + "minEventTime" : null, + "maxEventTime" : null + } ] + }, + "compacted" : false, + "extraMetadata" : { }, + "operationType" : "UPSERT", + "fileIdAndRelativePaths" : { + "" : null + }, + "totalRecordsDeleted" : 0, + "totalLogRecordsCompacted" : 0, + "totalLogFilesCompacted" : 0, + "totalCompactedRecordsUpdated" : 0, + "totalLogFilesSize" : 0, + "totalScanTime" : 0, + "totalCreateTime" : 0, + "totalUpsertTime" : 0, + "minAndMaxEventTime" : { + "Optional.empty" : { + "val" : null, + "present" : false + } + }, + "writePartitionPaths" : [ "2023/12/20" ] +} diff --git a/plugin/trino-hudi/src/test/resources/hudi-testing-data/stock_data_with_hashing_mor/.hoodie/20231220170257762.deltacommit.requested b/plugin/trino-hudi/src/test/resources/hudi-testing-data/stock_data_with_hashing_mor/.hoodie/20231220170257762.deltacommit.requested new file mode 100644 index 000000000000..e69de29bb2d1 diff --git a/plugin/trino-hudi/src/test/resources/hudi-testing-data/stock_data_with_hashing_mor/.hoodie/20231221160257783.deltacommit b/plugin/trino-hudi/src/test/resources/hudi-testing-data/stock_data_with_hashing_mor/.hoodie/20231221160257783.deltacommit new file mode 100644 index 000000000000..3afb2ec9d7e0 --- /dev/null +++ b/plugin/trino-hudi/src/test/resources/hudi-testing-data/stock_data_with_hashing_mor/.hoodie/20231221160257783.deltacommit @@ -0,0 +1,51 @@ +{ + "partitionToWriteStats" : { + "2/08/31" : [ { + "fileId" : "167a0e3e-9b94-444f-a178-242230cdb5a2-0", + "path" : "2018/08/31/167a0e3e-9b94-444f-a178-242230cdb5a2-0_0-28-26_20211221030120532.parquet", + "prevCommit" : "null", + "numWrites" : 99, + "numDeletes" : 0, + "numUpdateWrites" : 0, + "numInserts" : 99, + "totalWriteBytes" : 440746, + "totalWriteErrors" : 0, + "tempPath" : null, + "partitionPath" : "2018/08/31", + "totalLogRecords" : 0, + "totalLogFilesCompacted" : 0, + "totalLogSizeCompacted" : 0, + "totalUpdatedRecordsCompacted" : 0, + "totalLogBlocks" : 0, + "totalCorruptLogBlock" : 0, + "totalRollbackBlocks" : 0, + "fileSizeInBytes" : 440746, + "minEventTime" : null, + "maxEventTime" : null + } ] + }, + "compacted" : false, + "extraMetadata" : { + "schema" : "{\"type\":\"record\",\"name\":\"stock_ticks\",\"fields\":[{\"name\":\"volume\",\"type\":\"long\"},{\"name\":\"ts\",\"type\":\"string\"},{\"name\":\"symbol\",\"type\":\"string\"},{\"name\":\"year\",\"type\":\"int\"},{\"name\":\"month\",\"type\":\"string\"},{\"name\":\"high\",\"type\":\"double\"},{\"name\":\"low\",\"type\":\"double\"},{\"name\":\"key\",\"type\":\"string\"},{\"name\":\"date\",\"type\":\"string\"},{\"name\":\"close\",\"type\":\"double\"},{\"name\":\"open\",\"type\":\"double\"},{\"name\":\"day\",\"type\":\"string\"}]}", + "deltastreamer.checkpoint.key" : "stock_ticks,0:1668" + }, + "operationType" : "UPSERT", + "fileIdAndRelativePaths" : { + "167a0e3e-9b94-444f-a178-242230cdb5a2-0" : "2018/08/31/167a0e3e-9b94-444f-a178-242230cdb5a2-0_0-28-26_20211221030120532.parquet" + }, + "totalRecordsDeleted" : 0, + "totalLogRecordsCompacted" : 0, + "totalLogFilesCompacted" : 0, + "totalCompactedRecordsUpdated" : 0, + "totalLogFilesSize" : 0, + "totalScanTime" : 0, + "totalCreateTime" : 1402, + "totalUpsertTime" : 0, + "minAndMaxEventTime" : { + "Optional.empty" : { + "val" : null, + "present" : false + } + }, + "writePartitionPaths" : [ "2018/08/31" ] +} diff --git a/plugin/trino-hudi/src/test/resources/hudi-testing-data/stock_data_with_hashing_mor/.hoodie/20231221160257783.deltacommit.inflight b/plugin/trino-hudi/src/test/resources/hudi-testing-data/stock_data_with_hashing_mor/.hoodie/20231221160257783.deltacommit.inflight new file mode 100644 index 000000000000..020007749bbf --- /dev/null +++ b/plugin/trino-hudi/src/test/resources/hudi-testing-data/stock_data_with_hashing_mor/.hoodie/20231221160257783.deltacommit.inflight @@ -0,0 +1,71 @@ +{ + "partitionToWriteStats" : { + "2023/12/21" : [ { + "fileId" : "", + "path" : null, + "prevCommit" : "null", + "numWrites" : 0, + "numDeletes" : 0, + "numUpdateWrites" : 0, + "numInserts" : 0, + "totalWriteBytes" : 0, + "totalWriteErrors" : 0, + "tempPath" : null, + "partitionPath" : null, + "totalLogRecords" : 0, + "totalLogFilesCompacted" : 0, + "totalLogSizeCompacted" : 0, + "totalUpdatedRecordsCompacted" : 0, + "totalLogBlocks" : 0, + "totalCorruptLogBlock" : 0, + "totalRollbackBlocks" : 0, + "fileSizeInBytes" : 0, + "minEventTime" : null, + "maxEventTime" : null + }, { + "fileId" : "167a0e3e-9b94-444f-a178-242230cdb5a2-0", + "path" : null, + "prevCommit" : "20211221030120532", + "numWrites" : 0, + "numDeletes" : 0, + "numUpdateWrites" : 99, + "numInserts" : 0, + "totalWriteBytes" : 0, + "totalWriteErrors" : 0, + "tempPath" : null, + "partitionPath" : null, + "totalLogRecords" : 0, + "totalLogFilesCompacted" : 0, + "totalLogSizeCompacted" : 0, + "totalUpdatedRecordsCompacted" : 0, + "totalLogBlocks" : 0, + "totalCorruptLogBlock" : 0, + "totalRollbackBlocks" : 0, + "fileSizeInBytes" : 0, + "minEventTime" : null, + "maxEventTime" : null + } ] + }, + "compacted" : false, + "extraMetadata" : { }, + "operationType" : "UPSERT", + "totalRecordsDeleted" : 0, + "totalLogRecordsCompacted" : 0, + "totalLogFilesCompacted" : 0, + "totalCompactedRecordsUpdated" : 0, + "totalLogFilesSize" : 0, + "totalScanTime" : 0, + "totalCreateTime" : 0, + "totalUpsertTime" : 0, + "minAndMaxEventTime" : { + "Optional.empty" : { + "val" : null, + "present" : false + } + }, + "writePartitionPaths" : [ "2023/12/21" ], + "fileIdAndRelativePaths" : { + "" : null, + "167a0e3e-9b94-444f-a178-242230cdb5a2-0" : null + } +} diff --git a/plugin/trino-hudi/src/test/resources/hudi-testing-data/stock_data_with_hashing_mor/.hoodie/20231221160257783.deltacommit.requested b/plugin/trino-hudi/src/test/resources/hudi-testing-data/stock_data_with_hashing_mor/.hoodie/20231221160257783.deltacommit.requested new file mode 100644 index 000000000000..e69de29bb2d1 diff --git a/plugin/trino-hudi/src/test/resources/hudi-testing-data/stock_data_with_hashing_mor/.hoodie/20231221160258162.compaction.inflight b/plugin/trino-hudi/src/test/resources/hudi-testing-data/stock_data_with_hashing_mor/.hoodie/20231221160258162.compaction.inflight new file mode 100644 index 000000000000..e69de29bb2d1 diff --git a/plugin/trino-hudi/src/test/resources/hudi-testing-data/stock_data_with_hashing_mor/.hoodie/20231221160258162.compaction.requested b/plugin/trino-hudi/src/test/resources/hudi-testing-data/stock_data_with_hashing_mor/.hoodie/20231221160258162.compaction.requested new file mode 100644 index 000000000000..e69de29bb2d1 diff --git a/plugin/trino-hudi/src/test/resources/hudi-testing-data/stock_data_with_hashing_mor/.hoodie/20231221160258162.deltacommit b/plugin/trino-hudi/src/test/resources/hudi-testing-data/stock_data_with_hashing_mor/.hoodie/20231221160258162.deltacommit new file mode 100644 index 000000000000..f1cc26fecc7b --- /dev/null +++ b/plugin/trino-hudi/src/test/resources/hudi-testing-data/stock_data_with_hashing_mor/.hoodie/20231221160258162.deltacommit @@ -0,0 +1,55 @@ +{ + "partitionToWriteStats" : { + "2018/08/31" : [ { + "fileId" : "167a0e3e-9b94-444f-a178-242230cdb5a2-0", + "path" : "2018/08/31/.167a0e3e-9b94-444f-a178-242230cdb5a2-0_20211221030120532.log.1_0-28-29", + "prevCommit" : "20211221030120532", + "numWrites" : 99, + "numDeletes" : 0, + "numUpdateWrites" : 99, + "numInserts" : 0, + "totalWriteBytes" : 22220, + "totalWriteErrors" : 0, + "tempPath" : null, + "partitionPath" : "2018/08/31", + "totalLogRecords" : 0, + "totalLogFilesCompacted" : 0, + "totalLogSizeCompacted" : 0, + "totalUpdatedRecordsCompacted" : 0, + "totalLogBlocks" : 0, + "totalCorruptLogBlock" : 0, + "totalRollbackBlocks" : 0, + "fileSizeInBytes" : 22220, + "minEventTime" : null, + "maxEventTime" : null, + "logVersion" : 1, + "logOffset" : 0, + "baseFile" : "167a0e3e-9b94-444f-a178-242230cdb5a2-0_0-28-26_20211221030120532.parquet", + "logFiles" : [ ".167a0e3e-9b94-444f-a178-242230cdb5a2-0_20211221030120532.log.1_0-28-29" ] + } ] + }, + "compacted" : false, + "extraMetadata" : { + "schema" : "{\"type\":\"record\",\"name\":\"stock_ticks\",\"fields\":[{\"name\":\"volume\",\"type\":\"long\"},{\"name\":\"ts\",\"type\":\"string\"},{\"name\":\"symbol\",\"type\":\"string\"},{\"name\":\"year\",\"type\":\"int\"},{\"name\":\"month\",\"type\":\"string\"},{\"name\":\"high\",\"type\":\"double\"},{\"name\":\"low\",\"type\":\"double\"},{\"name\":\"key\",\"type\":\"string\"},{\"name\":\"date\",\"type\":\"string\"},{\"name\":\"close\",\"type\":\"double\"},{\"name\":\"open\",\"type\":\"double\"},{\"name\":\"day\",\"type\":\"string\"}]}", + "deltastreamer.checkpoint.key" : "stock_ticks,0:3336" + }, + "operationType" : "UPSERT", + "totalRecordsDeleted" : 0, + "totalLogRecordsCompacted" : 0, + "totalLogFilesCompacted" : 0, + "totalCompactedRecordsUpdated" : 0, + "totalLogFilesSize" : 0, + "totalScanTime" : 0, + "totalCreateTime" : 0, + "totalUpsertTime" : 187, + "minAndMaxEventTime" : { + "Optional.empty" : { + "val" : null, + "present" : false + } + }, + "writePartitionPaths" : [ "2018/08/31" ], + "fileIdAndRelativePaths" : { + "167a0e3e-9b94-444f-a178-242230cdb5a2-0" : "2018/08/31/.167a0e3e-9b94-444f-a178-242230cdb5a2-0_20211221030120532.log.1_0-28-29" + } +} \ No newline at end of file diff --git a/plugin/trino-hudi/src/test/resources/hudi-testing-data/stock_data_with_hashing_mor/.hoodie/hoodie.properties b/plugin/trino-hudi/src/test/resources/hudi-testing-data/stock_data_with_hashing_mor/.hoodie/hoodie.properties new file mode 100644 index 000000000000..dacf89e742c7 --- /dev/null +++ b/plugin/trino-hudi/src/test/resources/hudi-testing-data/stock_data_with_hashing_mor/.hoodie/hoodie.properties @@ -0,0 +1,21 @@ +#Properties saved on 2023-09-29T16:14:40.244Z +hoodie.table.timeline.timezone=LOCAL +hoodie.table.keygenerator.class=org.apache.hudi.keygen.ComplexKeyGenerator +hoodie.table.precombine.field=ts +hoodie.table.version=5 +hoodie.database.name= +hoodie.datasource.write.hive_style_partitioning=true +hoodie.partition.metafile.use.base.format=false +hoodie.archivelog.folder=archived +hoodie.table.cdc.enabled=false +hoodie.table.name=stock_data_with_hashing_mor +hoodie.compaction.payload.class=org.apache.hudi.common.model.OverwriteWithLatestAvroPayload +hoodie.populate.meta.fields=true +hoodie.compaction.record.merger.strategy=eeb8d96f-b1e4-49fd-bbf8-28ac514178e5 +hoodie.table.type=MERGE_ON_READ +hoodie.datasource.write.partitionpath.urlencode=false +hoodie.table.base.file.format=PARQUET +hoodie.datasource.write.drop.partition.columns=false +hoodie.timeline.layout.version=1 +hoodie.table.partition.fields=date +hoodie.table.recordkey.fields=key diff --git a/plugin/trino-hudi/src/test/resources/hudi-testing-data/stock_data_with_hashing_mor/2023/12/20/.hoodie_partition_metadata b/plugin/trino-hudi/src/test/resources/hudi-testing-data/stock_data_with_hashing_mor/2023/12/20/.hoodie_partition_metadata new file mode 100644 index 000000000000..340533d6e680 --- /dev/null +++ b/plugin/trino-hudi/src/test/resources/hudi-testing-data/stock_data_with_hashing_mor/2023/12/20/.hoodie_partition_metadata @@ -0,0 +1,4 @@ +#partition metadata +#Tue Dec 21 03:01:25 UTC 2021 +commitTime=20211221030120532 +partitionDepth=3 diff --git a/plugin/trino-hudi/src/test/resources/hudi-testing-data/stock_data_with_hashing_mor/2023/12/20/43888ee7-073e-4362-ada2-63ecbf99c772-0_110-10883-1182269_20231220160357772.parquet b/plugin/trino-hudi/src/test/resources/hudi-testing-data/stock_data_with_hashing_mor/2023/12/20/43888ee7-073e-4362-ada2-63ecbf99c772-0_110-10883-1182269_20231220160357772.parquet new file mode 100755 index 000000000000..a95afe29f991 Binary files /dev/null and b/plugin/trino-hudi/src/test/resources/hudi-testing-data/stock_data_with_hashing_mor/2023/12/20/43888ee7-073e-4362-ada2-63ecbf99c772-0_110-10883-1182269_20231220160357772.parquet differ