From 0d0ad7405b7d170684ddb76725a9a75aa4061585 Mon Sep 17 00:00:00 2001
From: Ryan Pifer
Date: Mon, 14 Dec 2020 21:58:51 -0800
Subject: [PATCH] [RFC-15] Support for metadata listing for snapshot queries
 through Hive/SparkSQL

---
 .../hudi/hadoop/HoodieHFileInputFormat.java   | 12 ++---
 .../hudi/hadoop/HoodieParquetInputFormat.java | 13 ++---
 .../hadoop/utils/HoodieInputFormatUtils.java  | 53 ++++++++++++++-----
 .../utils/HoodieRealtimeInputFormatUtils.java | 20 ++++++-
 4 files changed, 68 insertions(+), 30 deletions(-)

diff --git a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HoodieHFileInputFormat.java b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HoodieHFileInputFormat.java
index 1747888b15ad7..048402af1a434 100644
--- a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HoodieHFileInputFormat.java
+++ b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HoodieHFileInputFormat.java
@@ -31,7 +31,6 @@
 import org.apache.hadoop.mapred.RecordReader;
 import org.apache.hadoop.mapred.Reporter;
 import org.apache.hadoop.mapreduce.Job;
-import org.apache.hudi.common.model.HoodieFileFormat;
 import org.apache.hudi.common.table.HoodieTableMetaClient;
 import org.apache.hudi.common.table.timeline.HoodieDefaultTimeline;
 import org.apache.hudi.common.table.timeline.HoodieInstant;
@@ -97,13 +96,10 @@ public FileStatus[] listStatus(JobConf job) throws IOException {
     // process snapshot queries next.
     List<Path> snapshotPaths = inputPathHandler.getSnapshotPaths();
     if (snapshotPaths.size() > 0) {
-      setInputPaths(job, snapshotPaths.toArray(new Path[snapshotPaths.size()]));
-      FileStatus[] fileStatuses = super.listStatus(job);
-      Map<HoodieTableMetaClient, List<FileStatus>> groupedFileStatus =
-          HoodieInputFormatUtils.groupFileStatusForSnapshotPaths(fileStatuses, HoodieFileFormat.HFILE.getFileExtension(),
-              tableMetaClientMap.values());
-      LOG.info("Found a total of " + groupedFileStatus.size() + " groups");
-      for (Map.Entry<HoodieTableMetaClient, List<FileStatus>> entry : groupedFileStatus.entrySet()) {
+      Map<HoodieTableMetaClient, List<Path>> groupedPaths =
+          HoodieInputFormatUtils.groupSnapshotPathsByMetaClient(tableMetaClientMap.values(), snapshotPaths);
+      LOG.info("Found a total of " + groupedPaths.size() + " groups");
+      for (Map.Entry<HoodieTableMetaClient, List<Path>> entry : groupedPaths.entrySet()) {
         List<FileStatus> result = HoodieInputFormatUtils.filterFileStatusForSnapshotMode(job, entry.getKey(), entry.getValue());
         if (result != null) {
           returns.addAll(result);
diff --git a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HoodieParquetInputFormat.java b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HoodieParquetInputFormat.java
index 8b89949e803b6..d51aff0c0ef24 100644
--- a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HoodieParquetInputFormat.java
+++ b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HoodieParquetInputFormat.java
@@ -18,7 +18,6 @@
 
 package org.apache.hudi.hadoop;
 
-import org.apache.hudi.common.model.HoodieFileFormat;
 import org.apache.hudi.common.model.HoodieRecord;
 import org.apache.hudi.common.table.HoodieTableMetaClient;
 import org.apache.hudi.common.table.timeline.HoodieDefaultTimeline;
@@ -109,13 +108,11 @@ public FileStatus[] listStatus(JobConf job) throws IOException {
     // process snapshot queries next.
     List<Path> snapshotPaths = inputPathHandler.getSnapshotPaths();
     if (snapshotPaths.size() > 0) {
-      setInputPaths(job, snapshotPaths.toArray(new Path[snapshotPaths.size()]));
-      FileStatus[] fileStatuses = super.listStatus(job);
-      Map<HoodieTableMetaClient, List<FileStatus>> groupedFileStatus =
-          HoodieInputFormatUtils.groupFileStatusForSnapshotPaths(fileStatuses,
-              HoodieFileFormat.PARQUET.getFileExtension(), tableMetaClientMap.values());
-      LOG.info("Found a total of " + groupedFileStatus.size() + " groups");
-      for (Map.Entry<HoodieTableMetaClient, List<FileStatus>> entry : groupedFileStatus.entrySet()) {
+      Map<HoodieTableMetaClient, List<Path>> groupedPaths =
+          HoodieInputFormatUtils.groupSnapshotPathsByMetaClient(tableMetaClientMap.values(), snapshotPaths);
+      LOG.info("Found a total of " + groupedPaths.size() + " groups");
+      for (Map.Entry<HoodieTableMetaClient, List<Path>> entry : groupedPaths.entrySet()) {
+        HoodieTableMetaClient metaClient = entry.getKey();
         List<FileStatus> result = HoodieInputFormatUtils.filterFileStatusForSnapshotMode(job, entry.getKey(), entry.getValue());
         if (result != null) {
           returns.addAll(result);
diff --git a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/utils/HoodieInputFormatUtils.java b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/utils/HoodieInputFormatUtils.java
index b36885106cce0..c10fdad98f548 100644
--- a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/utils/HoodieInputFormatUtils.java
+++ b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/utils/HoodieInputFormatUtils.java
@@ -27,6 +27,7 @@
 import org.apache.hudi.common.table.timeline.HoodieDefaultTimeline;
 import org.apache.hudi.common.table.timeline.HoodieInstant;
 import org.apache.hudi.common.table.timeline.HoodieTimeline;
+import org.apache.hudi.common.table.view.FileSystemViewManager;
 import org.apache.hudi.common.table.view.HoodieTableFileSystemView;
 import org.apache.hudi.common.table.view.TableFileSystemView;
 import org.apache.hudi.common.util.Option;
@@ -62,6 +63,11 @@
 import java.util.function.Function;
 import java.util.stream.Collectors;
 
+import static org.apache.hudi.common.config.HoodieMetadataConfig.METADATA_ENABLE_PROP;
+import static org.apache.hudi.common.config.HoodieMetadataConfig.DEFAULT_METADATA_ENABLE_FOR_READERS;
+import static org.apache.hudi.common.config.HoodieMetadataConfig.METADATA_VALIDATE_PROP;
+import static org.apache.hudi.common.config.HoodieMetadataConfig.DEFAULT_METADATA_VALIDATE;
+
 public class HoodieInputFormatUtils {
 
   // These positions have to be deterministic across all tables
@@ -391,27 +397,48 @@ public static Map<HoodieTableMetaClient, List<FileStatus>> groupFileStatusForSna
     return grouped;
   }
 
+  public static Map<HoodieTableMetaClient, List<Path>> groupSnapshotPathsByMetaClient(
+      Collection<HoodieTableMetaClient> metaClientList,
+      List<Path> snapshotPaths
+  ) {
+    Map<HoodieTableMetaClient, List<Path>> grouped = new HashMap<>();
+    metaClientList.forEach(metaClient -> grouped.put(metaClient, new ArrayList<>()));
+    for (Path path : snapshotPaths) {
+      // Find meta client associated with the input path
+      metaClientList.stream().filter(metaClient -> path.toString().contains(metaClient.getBasePath()))
+          .forEach(metaClient -> grouped.get(metaClient).add(path));
+    }
+    return grouped;
+  }
+
   /**
-   * Filters data files for a snapshot queried table.
+   * Filters data files under @param paths for a snapshot queried table.
    * @param job
-   * @param metadata
-   * @param fileStatuses
+   * @param metaClient
+   * @param paths
    * @return
    */
   public static List<FileStatus> filterFileStatusForSnapshotMode(
-      JobConf job, HoodieTableMetaClient metadata, List<FileStatus> fileStatuses) throws IOException {
-    FileStatus[] statuses = fileStatuses.toArray(new FileStatus[0]);
+      JobConf job, HoodieTableMetaClient metaClient, List<Path> paths) throws IOException {
     if (LOG.isDebugEnabled()) {
-      LOG.debug("Hoodie Metadata initialized with completed commit Ts as :" + metadata);
+      LOG.debug("Hoodie Metadata initialized with completed commit Ts as :" + metaClient);
     }
-    // Get all commits, delta commits, compactions, as all of them produce a base parquet file today
-    HoodieTimeline timeline = metadata.getActiveTimeline().getCommitsTimeline().filterCompletedInstants();
-    TableFileSystemView.BaseFileOnlyView roView = new HoodieTableFileSystemView(metadata, timeline, statuses);
-    // filter files on the latest commit found
-    List<HoodieBaseFile> filteredFiles = roView.getLatestBaseFiles().collect(Collectors.toList());
-    LOG.info("Total paths to process after hoodie filter " + filteredFiles.size());
+
+    boolean useFileListingFromMetadata = job.getBoolean(METADATA_ENABLE_PROP, DEFAULT_METADATA_ENABLE_FOR_READERS);
+    boolean verifyFileListing = job.getBoolean(METADATA_VALIDATE_PROP, DEFAULT_METADATA_VALIDATE);
+    HoodieTableFileSystemView fsView = FileSystemViewManager.createInMemoryFileSystemView(metaClient,
+        useFileListingFromMetadata, verifyFileListing);
+
+    List<HoodieBaseFile> filteredBaseFiles = new ArrayList<>();
+    for (Path p : paths) {
+      String relativePartitionPath = FSUtils.getRelativePartitionPath(new Path(metaClient.getBasePath()), p);
+      List<HoodieBaseFile> matched = fsView.getLatestBaseFiles(relativePartitionPath).collect(Collectors.toList());
+      filteredBaseFiles.addAll(matched);
+    }
+
+    LOG.info("Total paths to process after hoodie filter " + filteredBaseFiles.size());
     List<FileStatus> returns = new ArrayList<>();
-    for (HoodieBaseFile filteredFile : filteredFiles) {
+    for (HoodieBaseFile filteredFile : filteredBaseFiles) {
       if (LOG.isDebugEnabled()) {
         LOG.debug("Processing latest hoodie file - " + filteredFile.getPath());
       }
diff --git a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/utils/HoodieRealtimeInputFormatUtils.java b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/utils/HoodieRealtimeInputFormatUtils.java
index 760dd961be65b..f14bc40faec10 100644
--- a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/utils/HoodieRealtimeInputFormatUtils.java
+++ b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/utils/HoodieRealtimeInputFormatUtils.java
@@ -26,6 +26,7 @@
 import org.apache.hudi.common.table.HoodieTableMetaClient;
 import org.apache.hudi.common.table.timeline.HoodieInstant;
 import org.apache.hudi.common.table.timeline.HoodieTimeline;
+import org.apache.hudi.common.table.view.FileSystemViewManager;
 import org.apache.hudi.common.table.view.HoodieTableFileSystemView;
 import org.apache.hudi.common.util.CollectionUtils;
 import org.apache.hudi.common.util.Option;
@@ -53,6 +54,11 @@
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
 
+import static org.apache.hudi.common.config.HoodieMetadataConfig.METADATA_ENABLE_PROP;
+import static org.apache.hudi.common.config.HoodieMetadataConfig.DEFAULT_METADATA_ENABLE_FOR_READERS;
+import static org.apache.hudi.common.config.HoodieMetadataConfig.METADATA_VALIDATE_PROP;
+import static org.apache.hudi.common.config.HoodieMetadataConfig.DEFAULT_METADATA_VALIDATE;
+
 public class HoodieRealtimeInputFormatUtils extends HoodieInputFormatUtils {
 
   private static final Logger LOG = LogManager.getLogger(HoodieRealtimeInputFormatUtils.class);
@@ -63,13 +69,25 @@ public static InputSplit[] getRealtimeSplits(Configuration conf, Stream<FileSplit>
     Map<Path, HoodieTableMetaClient> partitionsToMetaClient = getTableMetaClientByBasePath(conf, partitionsToParquetSplits.keySet());
+    boolean useFileListingFromMetadata = conf.getBoolean(METADATA_ENABLE_PROP, DEFAULT_METADATA_ENABLE_FOR_READERS);
+    boolean verifyFileListing = conf.getBoolean(METADATA_VALIDATE_PROP, DEFAULT_METADATA_VALIDATE);
+    // Create file system cache so metadata table is only instantiated once. Also can benefit normal file listing if
+    // partition path is listed twice so file groups will already be loaded in file system
+    Map<HoodieTableMetaClient, HoodieTableFileSystemView> fsCache = new HashMap<>();
     // for all unique split parents, obtain all delta files based on delta commit timeline,
     // grouped on file id
     List<InputSplit> rtSplits = new ArrayList<>();
     partitionsToParquetSplits.keySet().forEach(partitionPath -> {
       // for each partition path obtain the data & log file groupings, then map back to inputsplits
       HoodieTableMetaClient metaClient = partitionsToMetaClient.get(partitionPath);
-      HoodieTableFileSystemView fsView = new HoodieTableFileSystemView(metaClient, metaClient.getActiveTimeline());
+      if (!fsCache.containsKey(metaClient)) {
+
+        HoodieTableFileSystemView fsView = FileSystemViewManager.createInMemoryFileSystemView(metaClient,
+            useFileListingFromMetadata, verifyFileListing);
+        fsCache.put(metaClient, fsView);
+      }
+      HoodieTableFileSystemView fsView = fsCache.get(metaClient);
+
       String relPartitionPath = FSUtils.getRelativePartitionPath(new Path(metaClient.getBasePath()), partitionPath);
       try {
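
Not part of the patch above: a minimal, illustrative sketch of how a MapReduce/Hive-style reader could opt in to the metadata-table-based listing that these input formats now consult. The HoodieMetadataConfig property constants are the ones imported by the patch; the table path, the example class name, and the choice to call listStatus directly are assumptions made purely for illustration.

    import org.apache.hadoop.fs.FileStatus;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.mapred.FileInputFormat;
    import org.apache.hadoop.mapred.JobConf;
    import org.apache.hudi.common.config.HoodieMetadataConfig;
    import org.apache.hudi.hadoop.HoodieParquetInputFormat;

    public class MetadataListingExample {
      public static void main(String[] args) throws Exception {
        JobConf jobConf = new JobConf();
        // Opt in to file listing through the metadata table; readers fall back to
        // DEFAULT_METADATA_ENABLE_FOR_READERS when this property is unset.
        jobConf.setBoolean(HoodieMetadataConfig.METADATA_ENABLE_PROP, true);
        // Optionally cross-check the metadata listing against a direct file system listing.
        jobConf.setBoolean(HoodieMetadataConfig.METADATA_VALIDATE_PROP, true);

        // Hypothetical table location; point this at an existing Hudi copy-on-write table.
        FileInputFormat.setInputPaths(jobConf, new Path("/tmp/hudi_trips_cow"));

        HoodieParquetInputFormat inputFormat = new HoodieParquetInputFormat();
        inputFormat.setConf(jobConf);
        // listStatus() is where the patched snapshot-path filtering runs.
        for (FileStatus status : inputFormat.listStatus(jobConf)) {
          System.out.println(status.getPath());
        }
      }
    }

Since both the snapshot filtering (filterFileStatusForSnapshotMode) and the realtime split generation (getRealtimeSplits) read the same two properties from the job configuration, a single setting covers both query paths.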