diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java
index 48d6b948c413..de82a8247633 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java
@@ -27,6 +27,7 @@
 import org.apache.hudi.client.AbstractHoodieWriteClient;
 import org.apache.hudi.common.config.HoodieMetadataConfig;
 import org.apache.hudi.common.config.SerializableConfiguration;
+import org.apache.hudi.common.data.HoodieData;
 import org.apache.hudi.common.engine.HoodieEngineContext;
 import org.apache.hudi.common.fs.ConsistencyGuardConfig;
 import org.apache.hudi.common.fs.FSUtils;
@@ -39,7 +40,6 @@
 import org.apache.hudi.common.model.HoodiePartitionMetadata;
 import org.apache.hudi.common.model.HoodieRecord;
 import org.apache.hudi.common.model.HoodieTableType;
-import org.apache.hudi.common.model.HoodieWriteStat;
 import org.apache.hudi.common.model.WriteConcurrencyMode;
 import org.apache.hudi.common.table.HoodieTableMetaClient;
 import org.apache.hudi.common.table.log.HoodieLogFormat;
@@ -51,7 +51,6 @@
 import org.apache.hudi.common.util.HoodieTimer;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.common.util.ValidationUtils;
-import org.apache.hudi.common.util.collection.Pair;
 import org.apache.hudi.config.HoodieCompactionConfig;
 import org.apache.hudi.config.metrics.HoodieMetricsConfig;
 import org.apache.hudi.config.HoodieWriteConfig;
@@ -68,6 +67,8 @@
 import org.apache.log4j.Logger;
 
 import java.io.IOException;
+import java.io.Serializable;
+import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashMap;
@@ -168,7 +169,7 @@ private HoodieWriteConfig createMetadataWriteConfig(HoodieWriteConfig writeConfi
         .withMaxConsistencyChecks(writeConfig.getConsistencyGuardConfig().getMaxConsistencyChecks())
         .build())
         .withWriteConcurrencyMode(WriteConcurrencyMode.SINGLE_WRITER)
-        .withMetadataConfig(HoodieMetadataConfig.newBuilder().enable(false).build())
+        .withMetadataConfig(HoodieMetadataConfig.newBuilder().enable(false).withFileListingParallelism(writeConfig.getFileListingParallelism()).build())
         .withAutoCommit(true)
         .withAvroSchemaValidate(true)
         .withEmbeddedTimelineServerEnabled(false)
@@ -379,92 +380,68 @@ private boolean bootstrapFromFilesystem(HoodieEngineContext engineContext, Hoodi
     // List all partitions in the basePath of the containing dataset
     LOG.info("Initializing metadata table by using file listings in " + dataWriteConfig.getBasePath());
 
-    Map<String, List<FileStatus>> partitionToFileStatus = getPartitionsToFilesMapping(dataMetaClient);
-
-    // Create a HoodieCommitMetadata with writeStats for all discovered files
-    int[] stats = {0};
-    HoodieCommitMetadata commitMetadata = new HoodieCommitMetadata();
-
-    partitionToFileStatus.forEach((partition, statuses) -> {
-      // Filter the statuses to only include files which were created before or on createInstantTime
-      statuses.stream().filter(status -> {
-        String filename = status.getPath().getName();
-        return !HoodieTimeline.compareTimestamps(FSUtils.getCommitTime(filename), HoodieTimeline.GREATER_THAN,
-            createInstantTime);
-      }).forEach(status -> {
-        HoodieWriteStat writeStat = new HoodieWriteStat();
-        writeStat.setPath((partition.isEmpty() ? "" : partition + Path.SEPARATOR) + status.getPath().getName());
-        writeStat.setPartitionPath(partition);
-        writeStat.setTotalWriteBytes(status.getLen());
-        commitMetadata.addWriteStat(partition, writeStat);
-        stats[0] += 1;
-      });
-
-      // If the partition has no files then create a writeStat with no file path
-      if (commitMetadata.getWriteStats(partition) == null) {
-        HoodieWriteStat writeStat = new HoodieWriteStat();
-        writeStat.setPartitionPath(partition);
-        commitMetadata.addWriteStat(partition, writeStat);
-      }
-    });
+    List<DirectoryInfo> dirInfoList = listAllPartitions(dataMetaClient);
 
-    LOG.info("Committing " + partitionToFileStatus.size() + " partitions and " + stats[0] + " files to metadata");
-    update(commitMetadata, createInstantTime, false);
+    // During bootstrap, the list of files to be committed can be huge. So creating a HoodieCommitMetadata out of such a
+    // large number of files and calling the existing update(HoodieCommitMetadata) function does not scale well.
+    // Hence, we have a special commit just for the bootstrap scenario.
+    bootstrapCommit(dirInfoList, createInstantTime);
     return true;
   }
 
   /**
    * Function to find hoodie partitions and list files in them in parallel.
    *
-   * @param dataMetaClient
+   * @param datasetMetaClient dataset meta client instance.
    * @return Map of partition names to a list of FileStatus for all the files in the partition
    */
-  private Map<String, List<FileStatus>> getPartitionsToFilesMapping(HoodieTableMetaClient dataMetaClient) {
+  private List<DirectoryInfo> listAllPartitions(HoodieTableMetaClient datasetMetaClient) {
     List<Path> pathsToList = new LinkedList<>();
     pathsToList.add(new Path(dataWriteConfig.getBasePath()));
 
-    Map<String, List<FileStatus>> partitionToFileStatus = new HashMap<>();
+    List<DirectoryInfo> partitionsToBootstrap = new LinkedList<>();
     final int fileListingParallelism = metadataWriteConfig.getFileListingParallelism();
-    SerializableConfiguration conf = new SerializableConfiguration(dataMetaClient.getHadoopConf());
+    SerializableConfiguration conf = new SerializableConfiguration(datasetMetaClient.getHadoopConf());
     final String dirFilterRegex = dataWriteConfig.getMetadataConfig().getDirectoryFilterRegex();
+    final String datasetBasePath = datasetMetaClient.getBasePath();
 
     while (!pathsToList.isEmpty()) {
-      int listingParallelism = Math.min(fileListingParallelism, pathsToList.size());
+      // In each round we will list a section of directories
+      int numDirsToList = Math.min(fileListingParallelism, pathsToList.size());
       // List all directories in parallel
-      List<Pair<Path, FileStatus[]>> dirToFileListing = engineContext.map(pathsToList, path -> {
+      List<DirectoryInfo> processedDirectories = engineContext.map(pathsToList.subList(0, numDirsToList), path -> {
         FileSystem fs = path.getFileSystem(conf.get());
-        return Pair.of(path, fs.listStatus(path));
-      }, listingParallelism);
-      pathsToList.clear();
+        String relativeDirPath = FSUtils.getRelativePartitionPath(new Path(datasetBasePath), path);
+        return new DirectoryInfo(relativeDirPath, fs.listStatus(path));
+      }, numDirsToList);
+
+      pathsToList = new LinkedList<>(pathsToList.subList(numDirsToList, pathsToList.size()));
 
       // If the listing reveals a directory, add it to queue. If the listing reveals a hoodie partition, add it to
      // the results.
-      dirToFileListing.forEach(p -> {
-        if (!dirFilterRegex.isEmpty() && p.getLeft().getName().matches(dirFilterRegex)) {
-          LOG.info("Ignoring directory " + p.getLeft() + " which matches the filter regex " + dirFilterRegex);
-          return;
+      for (DirectoryInfo dirInfo : processedDirectories) {
+        if (!dirFilterRegex.isEmpty()) {
+          final String relativePath = dirInfo.getRelativePath();
+          if (!relativePath.isEmpty()) {
+            Path partitionPath = new Path(datasetBasePath, relativePath);
+            if (partitionPath.getName().matches(dirFilterRegex)) {
+              LOG.info("Ignoring directory " + partitionPath + " which matches the filter regex " + dirFilterRegex);
+              continue;
+            }
+          }
         }
 
-        List<FileStatus> filesInDir = Arrays.stream(p.getRight()).parallel()
-            .filter(fs -> !fs.getPath().getName().equals(HoodiePartitionMetadata.HOODIE_PARTITION_METAFILE))
-            .collect(Collectors.toList());
-
-        if (p.getRight().length > filesInDir.size()) {
-          String partitionName = FSUtils.getRelativePartitionPath(new Path(dataMetaClient.getBasePath()), p.getLeft());
-          // deal with Non-partition table, we should exclude .hoodie
-          partitionToFileStatus.put(partitionName, filesInDir.stream()
-              .filter(f -> !f.getPath().getName().equals(HoodieTableMetaClient.METAFOLDER_NAME)).collect(Collectors.toList()));
+        if (dirInfo.isHoodiePartition()) {
+          // Add to result
+          partitionsToBootstrap.add(dirInfo);
         } else {
           // Add sub-dirs to the queue
-          pathsToList.addAll(Arrays.stream(p.getRight())
-              .filter(fs -> fs.isDirectory() && !fs.getPath().getName().equals(HoodieTableMetaClient.METAFOLDER_NAME))
-              .map(fs -> fs.getPath())
-              .collect(Collectors.toList()));
+          pathsToList.addAll(dirInfo.getSubDirectories());
         }
-      });
+      }
     }
 
-    return partitionToFileStatus;
+    return partitionsToBootstrap;
   }
 
   /**
@@ -528,7 +505,7 @@ private interface ConvertMetadataFunction {
   private void processAndCommit(String instantTime, ConvertMetadataFunction convertMetadataFunction, boolean canTriggerTableService) {
     if (enabled && metadata != null) {
       List<HoodieRecord> records = convertMetadataFunction.convertMetadata();
-      commit(records, MetadataPartitionType.FILES.partitionPath(), instantTime, canTriggerTableService);
+      commit(engineContext.parallelize(records, 1), MetadataPartitionType.FILES.partitionPath(), instantTime, canTriggerTableService);
     }
   }
 
@@ -590,7 +567,7 @@ public void update(HoodieRollbackMetadata rollbackMetadata, String instantTime)
       List<HoodieRecord> records = HoodieTableMetadataUtil.convertMetadataToRecords(metadataMetaClient.getActiveTimeline(),
          rollbackMetadata, instantTime, metadata.getSyncedInstantTime(), wasSynced);
-      commit(records, MetadataPartitionType.FILES.partitionPath(), instantTime, false);
+      commit(engineContext.parallelize(records, 1), MetadataPartitionType.FILES.partitionPath(), instantTime, false);
     }
   }
 
@@ -603,12 +580,12 @@ public void close() throws Exception {
 
   /**
    * Commit the {@code HoodieRecord}s to Metadata Table as a new delta-commit.
-   * @param records The list of records to be written.
+   * @param records The HoodieData of records to be written.
   * @param partitionName The partition to which the records are to be written.
   * @param instantTime The timestamp to use for the deltacommit.
   * @param canTriggerTableService true if table services can be scheduled and executed. false otherwise.
    */
-  protected abstract void commit(List<HoodieRecord> records, String partitionName, String instantTime, boolean canTriggerTableService);
+  protected abstract void commit(HoodieData<HoodieRecord> records, String partitionName, String instantTime, boolean canTriggerTableService);
 
   /**
   * Perform a compaction on the Metadata Table.
@@ -647,4 +624,96 @@ protected void doClean(AbstractHoodieWriteClient writeClient, String instantTime
     // metadata table.
     writeClient.clean(instantTime + "002");
   }
+
+  /**
+   * This is invoked to bootstrap the metadata table for a dataset. The bootstrap commit has a special handling
+   * mechanism because of its scale compared to other regular commits.
+   *
+   */
+  protected void bootstrapCommit(List<DirectoryInfo> partitionInfoList, String createInstantTime) {
+    List<String> partitions = partitionInfoList.stream().map(p -> p.getRelativePath()).collect(Collectors.toList());
+    final int totalFiles = partitionInfoList.stream().mapToInt(p -> p.getTotalFiles()).sum();
+
+    // Record which saves the list of all partitions
+    HoodieRecord allPartitionRecord = HoodieMetadataPayload.createPartitionListRecord(partitions);
+    if (partitions.isEmpty()) {
+      // in case of bootstrapping a fresh table, there won't be any partitions, but we still need to make a bootstrap commit
+      commit(engineContext.parallelize(Collections.singletonList(allPartitionRecord), 1), MetadataPartitionType.FILES.partitionPath(), createInstantTime, false);
+      return;
+    }
+    HoodieData<HoodieRecord> partitionRecords = engineContext.parallelize(Arrays.asList(allPartitionRecord), 1);
+    if (!partitionInfoList.isEmpty()) {
+      HoodieData<HoodieRecord> fileListRecords = engineContext.parallelize(partitionInfoList, partitionInfoList.size()).map(partitionInfo -> {
+        // Record which saves files within a partition
+        return HoodieMetadataPayload.createPartitionFilesRecord(
+            partitionInfo.getRelativePath(), Option.of(partitionInfo.getFileNameToSizeMap()), Option.empty());
+      });
+      partitionRecords = partitionRecords.union(fileListRecords);
+    }
+
+    LOG.info("Committing " + partitions.size() + " partitions and " + totalFiles + " files to metadata");
+    ValidationUtils.checkState(partitionRecords.count() == (partitions.size() + 1));
+    commit(partitionRecords, MetadataPartitionType.FILES.partitionPath(), createInstantTime, false);
+  }
+
+  /**
+   * A class which represents a directory and the files and directories inside it.
+   *
+   * A {@code DirectoryInfo} object saves the name of the partition and the properties of each file that are required
+   * for bootstrapping the metadata table. Saving only a limited set of properties reduces the total memory footprint when
+   * a very large number of files are present in the dataset being bootstrapped.
+   */
+  static class DirectoryInfo implements Serializable {
+    // Relative path of the directory (relative to the base directory)
+    private final String relativePath;
+    // Map of filenames within this partition to their respective sizes
+    private HashMap<String, Long> filenameToSizeMap;
+    // List of directories within this partition
+    private final List<Path> subDirectories = new ArrayList<>();
+    // Is this a hoodie partition
+    private boolean isHoodiePartition = false;
+
+    public DirectoryInfo(String relativePath, FileStatus[] fileStatus) {
+      this.relativePath = relativePath;
+
+      // Pre-allocate with the maximum length possible
+      filenameToSizeMap = new HashMap<>(fileStatus.length);
+
+      for (FileStatus status : fileStatus) {
+        if (status.isDirectory()) {
+          // Ignore .hoodie directory as there cannot be any partitions inside it
+          if (!status.getPath().getName().equals(HoodieTableMetaClient.METAFOLDER_NAME)) {
+            this.subDirectories.add(status.getPath());
+          }
+        } else if (status.getPath().getName().equals(HoodiePartitionMetadata.HOODIE_PARTITION_METAFILE)) {
+          // Presence of partition meta file implies this is a HUDI partition
+          this.isHoodiePartition = true;
+        } else if (FSUtils.isDataFile(status.getPath())) {
+          // Regular HUDI data file (base file or log file)
+          filenameToSizeMap.put(status.getPath().getName(), status.getLen());
+        }
+      }
+    }
+
+    String getRelativePath() {
+      return relativePath;
+    }
+
+    int getTotalFiles() {
+      return filenameToSizeMap.size();
+    }
+
+    boolean isHoodiePartition() {
+      return isHoodiePartition;
+    }
+
+    List<Path> getSubDirectories() {
+      return subDirectories;
+    }
+
+    // Returns a map of filenames mapped to their lengths
+    Map<String, Long> getFileNameToSizeMap() {
+      return filenameToSizeMap;
+    }
+  }
 }
diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/metadata/FlinkHoodieBackedTableMetadataWriter.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/metadata/FlinkHoodieBackedTableMetadataWriter.java
index 8254d0b88461..85630f72b2c6 100644
--- a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/metadata/FlinkHoodieBackedTableMetadataWriter.java
+++ b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/metadata/FlinkHoodieBackedTableMetadataWriter.java
@@ -21,6 +21,7 @@
 import org.apache.avro.specific.SpecificRecordBase;
 import org.apache.hudi.client.HoodieFlinkWriteClient;
 import org.apache.hudi.client.WriteStatus;
+import org.apache.hudi.common.data.HoodieData;
 import org.apache.hudi.common.engine.HoodieEngineContext;
 import org.apache.hudi.common.metrics.Registry;
 import org.apache.hudi.common.model.FileSlice;
@@ -90,8 +91,9 @@ protected void initialize(HoodieEngineContext eng
   }
 
   @Override
-  protected void commit(List<HoodieRecord> records, String partitionName, String instantTime, boolean canTriggerTableService) {
+  protected void commit(HoodieData<HoodieRecord> hoodieDataRecords, String partitionName, String instantTime, boolean canTriggerTableService) {
     ValidationUtils.checkState(enabled, "Metadata table cannot be committed to as it is not enabled");
+    List<HoodieRecord> records = (List<HoodieRecord>) hoodieDataRecords.get();
     List<HoodieRecord> recordList = prepRecords(records, partitionName, 1);
 
     try (HoodieFlinkWriteClient writeClient = new HoodieFlinkWriteClient(engineContext, metadataWriteConfig)) {
diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/data/HoodieJavaRDD.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/data/HoodieJavaRDD.java
index ceaee4728dee..d4eb25963e5b 100644
--- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/data/HoodieJavaRDD.java
+++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/data/HoodieJavaRDD.java
@@ -130,6 +130,11 @@ public HoodieData<T> distinct() {
     return HoodieJavaRDD.of(rddData.distinct());
   }
 
+  @Override
+  public HoodieData<T> union(HoodieData<T> other) {
+    return HoodieJavaRDD.of(rddData.union((JavaRDD<T>) other.get()));
+  }
+
   @Override
   public List<T> collectAsList() {
     return rddData.collect();
diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/metadata/SparkHoodieBackedTableMetadataWriter.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/metadata/SparkHoodieBackedTableMetadataWriter.java
index 95ab7dc79a20..fc460e58d9a1 100644
--- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/metadata/SparkHoodieBackedTableMetadataWriter.java
+++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/metadata/SparkHoodieBackedTableMetadataWriter.java
@@ -22,6 +22,7 @@
 import org.apache.hudi.client.SparkRDDWriteClient;
 import org.apache.hudi.client.WriteStatus;
 import org.apache.hudi.client.common.HoodieSparkEngineContext;
+import org.apache.hudi.common.data.HoodieData;
 import org.apache.hudi.common.engine.HoodieEngineContext;
 import org.apache.hudi.common.metrics.Registry;
 import org.apache.hudi.common.model.FileSlice;
@@ -39,7 +40,6 @@
 import org.apache.log4j.LogManager;
 import org.apache.log4j.Logger;
 import org.apache.spark.api.java.JavaRDD;
-import org.apache.spark.api.java.JavaSparkContext;
 
 import java.io.IOException;
 import java.util.List;
@@ -102,9 +102,9 @@ protected void initialize(HoodieEngineContext eng
     }
   }
 
-  @Override
-  protected void commit(List<HoodieRecord> records, String partitionName, String instantTime, boolean canTriggerTableService) {
+  protected void commit(HoodieData<HoodieRecord> hoodieDataRecords, String partitionName, String instantTime, boolean canTriggerTableService) {
     ValidationUtils.checkState(enabled, "Metadata table cannot be committed to as it is not enabled");
+    JavaRDD<HoodieRecord> records = (JavaRDD<HoodieRecord>) hoodieDataRecords.get();
     JavaRDD<HoodieRecord> recordRDD = prepRecords(records, partitionName, 1);
 
     try (SparkRDDWriteClient writeClient = new SparkRDDWriteClient(engineContext, metadataWriteConfig, true)) {
@@ -147,12 +147,11 @@ protected void commit(List<HoodieRecord> records, String partitionName, String i
   *
   * The record is tagged with respective file slice's location based on its record key.
   */
-  private JavaRDD<HoodieRecord> prepRecords(List<HoodieRecord> records, String partitionName, int numFileGroups) {
+  private JavaRDD<HoodieRecord> prepRecords(JavaRDD<HoodieRecord> recordsRDD, String partitionName, int numFileGroups) {
     List<FileSlice> fileSlices = HoodieTableMetadataUtil.loadPartitionFileGroupsWithLatestFileSlices(metadataMetaClient, partitionName);
     ValidationUtils.checkArgument(fileSlices.size() == numFileGroups, String.format("Invalid number of file groups: found=%d, required=%d", fileSlices.size(), numFileGroups));
 
-    JavaSparkContext jsc = ((HoodieSparkEngineContext) engineContext).getJavaSparkContext();
-    return jsc.parallelize(records, 1).map(r -> {
+    return recordsRDD.map(r -> {
       FileSlice slice = fileSlices.get(HoodieTableMetadataUtil.mapRecordKeyToFileGroupIndex(r.getRecordKey(), numFileGroups));
       r.setCurrentLocation(new HoodieRecordLocation(slice.getBaseInstantTime(), slice.getFileId()));
       return r;
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/data/HoodieData.java b/hudi-common/src/main/java/org/apache/hudi/common/data/HoodieData.java
index 7ea7e0d649f3..093fd439db09 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/data/HoodieData.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/data/HoodieData.java
@@ -97,6 +97,13 @@ public abstract HoodieData<T> mapPartitions(
    */
   public abstract HoodieData<T> distinct();
 
+  /**
+   * Unions this {@link HoodieData} with the other {@link HoodieData}.
+   * @param other the other {@link HoodieData} of interest.
+   * @return the union of the two as an instance of {@link HoodieData}.
+   */
+  public abstract HoodieData<T> union(HoodieData<T> other);
+
   /**
   * @return collected results in {@link List}.
   */
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/data/HoodieList.java b/hudi-common/src/main/java/org/apache/hudi/common/data/HoodieList.java
index 6c23fdff2216..94416192abfb 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/data/HoodieList.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/data/HoodieList.java
@@ -132,6 +132,14 @@ public HoodieData<T> distinct() {
     return HoodieList.of(new ArrayList<>(new HashSet<>(listData)));
   }
 
+  @Override
+  public HoodieData<T> union(HoodieData<T> other) {
+    List<T> unionResult = new ArrayList<>();
+    unionResult.addAll(listData);
+    unionResult.addAll(other.collectAsList());
+    return HoodieList.of(unionResult);
+  }
+
   @Override
   public List<T> collectAsList() {
     return listData;
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/fs/FSUtils.java b/hudi-common/src/main/java/org/apache/hudi/common/fs/FSUtils.java
index 8273ca7f35f8..dc4df23a4c3d 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/fs/FSUtils.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/fs/FSUtils.java
@@ -239,7 +239,7 @@ public static List<String> getAllFoldersWithPartitionMetaFile(FileSystem fs, Str
   /**
    * Recursively processes all files in the base-path. If excludeMetaFolder is set, the meta-folder and all its subdirs
    * are skipped
-   * 
+   *
   * @param fs File System
   * @param basePathStr Base-Path
   * @param consumer Callback for processing
@@ -431,17 +431,29 @@ public static int getFileVersionFromLog(Path logPath) {
 
   public static String makeLogFileName(String fileId, String logFileExtension, String baseCommitTime, int version,
       String writeToken) {
-    String suffix =
-        (writeToken == null) ? String.format("%s_%s%s.%d", fileId, baseCommitTime, logFileExtension, version)
-            : String.format("%s_%s%s.%d_%s", fileId, baseCommitTime, logFileExtension, version, writeToken);
+    String suffix = (writeToken == null)
+        ? String.format("%s_%s%s.%d", fileId, baseCommitTime, logFileExtension, version)
+        : String.format("%s_%s%s.%d_%s", fileId, baseCommitTime, logFileExtension, version, writeToken);
     return LOG_FILE_PREFIX + suffix;
   }
 
+  public static boolean isBaseFile(Path path) {
+    String extension = getFileExtension(path.getName());
+    return HoodieFileFormat.BASE_FILE_EXTENSIONS.contains(extension);
+  }
+
   public static boolean isLogFile(Path logPath) {
     Matcher matcher = LOG_FILE_PATTERN.matcher(logPath.getName());
     return matcher.find() && logPath.getName().contains(".log");
   }
 
+  /**
+   * Returns true if the given path is a Base file or a Log file.
+   */
+  public static boolean isDataFile(Path path) {
+    return isBaseFile(path) || isLogFile(path);
+  }
+
   /**
    * Get the names of all the base and log files in the given partition path.
    */
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieFileFormat.java b/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieFileFormat.java
index f7fdcd01d500..326391035e61 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieFileFormat.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieFileFormat.java
@@ -18,6 +18,11 @@
 
 package org.apache.hudi.common.model;
 
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.Set;
+import java.util.stream.Collectors;
+
 /**
  * Hoodie file format.
  */
@@ -27,6 +32,11 @@ public enum HoodieFileFormat {
   HFILE(".hfile"),
   ORC(".orc");
 
+  public static final Set<String> BASE_FILE_EXTENSIONS = Arrays.stream(HoodieFileFormat.values())
+      .map(HoodieFileFormat::getFileExtension)
+      .filter(x -> !x.equals(HoodieFileFormat.HOODIE_LOG.getFileExtension()))
+      .collect(Collectors.toCollection(HashSet::new));
+
   private final String extension;
 
   HoodieFileFormat(String extension) {