diff --git a/hudi-common/src/main/java/org/apache/hudi/metadata/FileSystemBackedTableMetadata.java b/hudi-common/src/main/java/org/apache/hudi/metadata/FileSystemBackedTableMetadata.java
index 759e0f1a3e434..d05b95dfdb495 100644
--- a/hudi-common/src/main/java/org/apache/hudi/metadata/FileSystemBackedTableMetadata.java
+++ b/hudi-common/src/main/java/org/apache/hudi/metadata/FileSystemBackedTableMetadata.java
@@ -31,12 +31,11 @@
 import org.apache.hadoop.fs.Path;
 
 import java.io.IOException;
-import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
-import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
+import java.util.concurrent.CopyOnWriteArrayList;
 import java.util.stream.Collectors;
 
 public class FileSystemBackedTableMetadata implements HoodieTableMetadata {
@@ -64,45 +63,41 @@ public FileStatus[] getAllFilesInPartition(Path partitionPath) throws IOExceptio
 
   @Override
   public List<String> getAllPartitionPaths() throws IOException {
+    FileSystem fs = new Path(datasetBasePath).getFileSystem(hadoopConf.get());
     if (assumeDatePartitioning) {
-      FileSystem fs = new Path(datasetBasePath).getFileSystem(hadoopConf.get());
       return FSUtils.getAllPartitionFoldersThreeLevelsDown(fs, datasetBasePath);
     }
 
-    List<Path> pathsToList = new LinkedList<>();
+    List<Path> pathsToList = new CopyOnWriteArrayList<>();
     pathsToList.add(new Path(datasetBasePath));
-    List<String> partitionPaths = new ArrayList<>();
+    List<String> partitionPaths = new CopyOnWriteArrayList<>();
 
     while (!pathsToList.isEmpty()) {
       // TODO: Get the parallelism from HoodieWriteConfig
       int listingParallelism = Math.min(DEFAULT_LISTING_PARALLELISM, pathsToList.size());
 
       // List all directories in parallel
-      List<Pair<Path, FileStatus[]>> dirToFileListing = engineContext.map(pathsToList, path -> {
+      List<FileStatus[]> dirToFileListing = engineContext.map(pathsToList, path -> {
         FileSystem fileSystem = path.getFileSystem(hadoopConf.get());
-        return Pair.of(path, fileSystem.listStatus(path));
+        return fileSystem.listStatus(path);
       }, listingParallelism);
       pathsToList.clear();
 
-      // If the listing reveals a directory, add it to queue. If the listing reveals a hoodie partition, add it to
-      // the results.
-      dirToFileListing.forEach(p -> {
-        Option<FileStatus> partitionMetaFile = Option.fromJavaOptional(Arrays.stream(p.getRight()).parallel()
-            .filter(fs -> fs.getPath().getName().equals(HoodiePartitionMetadata.HOODIE_PARTITION_METAFILE))
-            .findFirst());
-
-        if (partitionMetaFile.isPresent()) {
-          // Is a partition.
-          String partitionName = FSUtils.getRelativePartitionPath(new Path(datasetBasePath), p.getLeft());
-          partitionPaths.add(partitionName);
-        } else {
-          // Add sub-dirs to the queue
-          pathsToList.addAll(Arrays.stream(p.getRight())
-              .filter(fs -> fs.isDirectory() && !fs.getPath().getName().equals(HoodieTableMetaClient.METAFOLDER_NAME))
-              .map(fs -> fs.getPath())
-              .collect(Collectors.toList()));
-        }
-      });
+      // if the current directory contains the partition metafile, add it to the results;
+      // otherwise queue its sub-directories for the next listing round
+      dirToFileListing.stream().flatMap(Arrays::stream).parallel()
+          .forEach(fileStatus -> {
+            if (fileStatus.isDirectory()) {
+              if (HoodiePartitionMetadata.hasPartitionMetadata(fs, fileStatus.getPath())) {
+                partitionPaths.add(FSUtils.getRelativePartitionPath(new Path(datasetBasePath), fileStatus.getPath()));
+              } else if (!fileStatus.getPath().getName().equals(HoodieTableMetaClient.METAFOLDER_NAME)) {
+                pathsToList.add(fileStatus.getPath());
+              }
+            } else if (fileStatus.getPath().getName().equals(HoodiePartitionMetadata.HOODIE_PARTITION_METAFILE)) {
+              String partitionName = FSUtils.getRelativePartitionPath(new Path(datasetBasePath), fileStatus.getPath().getParent());
+              partitionPaths.add(partitionName);
+            }
+          });
     }
     return partitionPaths;
   }
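
Note on the collection change: the classification step now runs inside a parallel stream that appends to pathsToList and partitionPaths from multiple threads, which is why both switch from LinkedList/ArrayList to CopyOnWriteArrayList. A minimal, self-contained sketch of the underlying hazard (illustrative only, not part of the patch; the class name and element values are made up):

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.stream.IntStream;

public class ConcurrentAddDemo {
  public static void main(String[] args) {
    // ArrayList is unsynchronized: concurrent add() calls can lose elements
    // or throw ArrayIndexOutOfBoundsException mid-resize.
    List<String> plain = new ArrayList<>();
    // CopyOnWriteArrayList copies its backing array on every write, so
    // concurrent adds are always safe (each write costs O(n)).
    List<String> cow = new CopyOnWriteArrayList<>();

    IntStream.range(0, 100_000).parallel().forEach(i -> {
      try {
        plain.add("p-" + i);   // racy: outcome is unpredictable
      } catch (ArrayIndexOutOfBoundsException e) {
        // occasionally thrown by the unsynchronized resize; expected here
      }
      cow.add("p-" + i);       // always correct
    });

    System.out.println("ArrayList:            " + plain.size()); // usually < 100000
    System.out.println("CopyOnWriteArrayList: " + cow.size());   // exactly 100000
  }
}

Copy-on-write makes every add O(n), which is acceptable here since both lists stay small next to the remote listStatus round-trips; if partition counts were very large, something like a ConcurrentLinkedQueue would avoid the repeated copying.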
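
To see the traversal pattern in isolation: the while loop is a breadth-first walk in which each iteration lists the current frontier of directories in parallel, records any directory holding the partition metafile as a partition, skips the .hoodie metafolder, and queues everything else for the next round. Below is a rough local-filesystem analogue using only java.nio.file, a sketch under those assumptions with no Hudi or Hadoop dependencies (names like PartitionWalkDemo are invented):

import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.stream.Collectors;
import java.util.stream.Stream;

public class PartitionWalkDemo {
  static final String METAFILE = ".hoodie_partition_metadata";
  static final String METAFOLDER = ".hoodie";

  // Breadth-first walk: list the current frontier in parallel, turn
  // directories that hold the metafile into results, queue the rest.
  static List<Path> findPartitions(Path base) {
    List<Path> toList = new CopyOnWriteArrayList<>();
    List<Path> partitions = new CopyOnWriteArrayList<>();
    toList.add(base);
    while (!toList.isEmpty()) {
      List<Path> frontier = List.copyOf(toList);
      toList.clear();
      frontier.parallelStream().flatMap(PartitionWalkDemo::children).forEach(child -> {
        if (Files.isDirectory(child)) {
          if (Files.exists(child.resolve(METAFILE))) {
            partitions.add(base.relativize(child));      // a partition: record, don't descend
          } else if (!child.getFileName().toString().equals(METAFOLDER)) {
            toList.add(child);                           // plain directory: descend next round
          }
        } else if (child.getFileName().toString().equals(METAFILE)) {
          partitions.add(base.relativize(child.getParent())); // metafile directly under the base
        }
      });
    }
    return partitions;
  }

  // Materialize the listing so the directory handle is closed promptly.
  static Stream<Path> children(Path dir) {
    try (Stream<Path> s = Files.list(dir)) {
      return s.collect(Collectors.toList()).stream();
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
  }

  public static void main(String[] args) {
    System.out.println(findPartitions(Path.of(args[0])));
  }
}

One trade-off visible in the patch: the hasPartitionMetadata probe classifies each directory eagerly while listing its parent, whereas the old code discovered the metafile one listing round later from the directory's own listStatus results.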