@@ -31,7 +31,6 @@
 import org.apache.hadoop.mapred.RecordReader;
 import org.apache.hadoop.mapred.Reporter;
 import org.apache.hadoop.mapreduce.Job;
-import org.apache.hudi.common.model.HoodieFileFormat;
 import org.apache.hudi.common.table.HoodieTableMetaClient;
 import org.apache.hudi.common.table.timeline.HoodieDefaultTimeline;
 import org.apache.hudi.common.table.timeline.HoodieInstant;
@@ -97,13 +96,10 @@ public FileStatus[] listStatus(JobConf job) throws IOException {
     // process snapshot queries next.
     List<Path> snapshotPaths = inputPathHandler.getSnapshotPaths();
     if (snapshotPaths.size() > 0) {
-      setInputPaths(job, snapshotPaths.toArray(new Path[snapshotPaths.size()]));
-      FileStatus[] fileStatuses = super.listStatus(job);
-      Map<HoodieTableMetaClient, List<FileStatus>> groupedFileStatus =
-          HoodieInputFormatUtils.groupFileStatusForSnapshotPaths(fileStatuses, HoodieFileFormat.HFILE.getFileExtension(),
-              tableMetaClientMap.values());
-      LOG.info("Found a total of " + groupedFileStatus.size() + " groups");
-      for (Map.Entry<HoodieTableMetaClient, List<FileStatus>> entry : groupedFileStatus.entrySet()) {
+      Map<HoodieTableMetaClient, List<Path>> groupedPaths =
+          HoodieInputFormatUtils.groupSnapshotPathsByMetaClient(tableMetaClientMap.values(), snapshotPaths);
+      LOG.info("Found a total of " + groupedPaths.size() + " groups");
+      for (Map.Entry<HoodieTableMetaClient, List<Path>> entry : groupedPaths.entrySet()) {
         List<FileStatus> result = HoodieInputFormatUtils.filterFileStatusForSnapshotMode(job, entry.getKey(), entry.getValue());
         if (result != null) {
           returns.addAll(result);
[next file]

@@ -18,7 +18,6 @@

 package org.apache.hudi.hadoop;

-import org.apache.hudi.common.model.HoodieFileFormat;
 import org.apache.hudi.common.model.HoodieRecord;
 import org.apache.hudi.common.table.HoodieTableMetaClient;
 import org.apache.hudi.common.table.timeline.HoodieDefaultTimeline;
@@ -109,13 +108,11 @@ public FileStatus[] listStatus(JobConf job) throws IOException {
     // process snapshot queries next.
     List<Path> snapshotPaths = inputPathHandler.getSnapshotPaths();
     if (snapshotPaths.size() > 0) {
-      setInputPaths(job, snapshotPaths.toArray(new Path[snapshotPaths.size()]));
-      FileStatus[] fileStatuses = super.listStatus(job);
-      Map<HoodieTableMetaClient, List<FileStatus>> groupedFileStatus =
-          HoodieInputFormatUtils.groupFileStatusForSnapshotPaths(fileStatuses,
-              HoodieFileFormat.PARQUET.getFileExtension(), tableMetaClientMap.values());
-      LOG.info("Found a total of " + groupedFileStatus.size() + " groups");
-      for (Map.Entry<HoodieTableMetaClient, List<FileStatus>> entry : groupedFileStatus.entrySet()) {
+      Map<HoodieTableMetaClient, List<Path>> groupedPaths =
+          HoodieInputFormatUtils.groupSnapshotPathsByMetaClient(tableMetaClientMap.values(), snapshotPaths);
+      LOG.info("Found a total of " + groupedPaths.size() + " groups");
+      for (Map.Entry<HoodieTableMetaClient, List<Path>> entry : groupedPaths.entrySet()) {
+        HoodieTableMetaClient metaClient = entry.getKey();
         List<FileStatus> result = HoodieInputFormatUtils.filterFileStatusForSnapshotMode(job, entry.getKey(), entry.getValue());
         if (result != null) {
           returns.addAll(result);
[next file]

@@ -27,6 +27,7 @@
 import org.apache.hudi.common.table.timeline.HoodieDefaultTimeline;
 import org.apache.hudi.common.table.timeline.HoodieInstant;
 import org.apache.hudi.common.table.timeline.HoodieTimeline;
+import org.apache.hudi.common.table.view.FileSystemViewManager;
 import org.apache.hudi.common.table.view.HoodieTableFileSystemView;
 import org.apache.hudi.common.table.view.TableFileSystemView;
 import org.apache.hudi.common.util.Option;
@@ -62,6 +63,11 @@
 import java.util.function.Function;
 import java.util.stream.Collectors;

+import static org.apache.hudi.common.config.HoodieMetadataConfig.METADATA_ENABLE_PROP;
+import static org.apache.hudi.common.config.HoodieMetadataConfig.DEFAULT_METADATA_ENABLE_FOR_READERS;
+import static org.apache.hudi.common.config.HoodieMetadataConfig.METADATA_VALIDATE_PROP;
+import static org.apache.hudi.common.config.HoodieMetadataConfig.DEFAULT_METADATA_VALIDATE;
+
 public class HoodieInputFormatUtils {

   // These positions have to be deterministic across all tables
@@ -391,27 +397,48 @@ public static Map<HoodieTableMetaClient, List<FileStatus>> groupFileStatusForSnapshotPaths
     return grouped;
   }

+  public static Map<HoodieTableMetaClient, List<Path>> groupSnapshotPathsByMetaClient(
+      Collection<HoodieTableMetaClient> metaClientList,
+      List<Path> snapshotPaths
+  ) {
+    Map<HoodieTableMetaClient, List<Path>> grouped = new HashMap<>();
+    metaClientList.forEach(metaClient -> grouped.put(metaClient, new ArrayList<>()));
+    for (Path path : snapshotPaths) {
+      // Find meta client associated with the input path
+      metaClientList.stream().filter(metaClient -> path.toString().contains(metaClient.getBasePath()))
+          .forEach(metaClient -> grouped.get(metaClient).add(path));
+    }
+    return grouped;
+  }
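A quick usage sketch for the new helper (the table base paths, partition paths, and meta client variables below are hypothetical):

    // Assumes metaClientA and metaClientB are initialized for tables at /data/tableA and /data/tableB.
    List<Path> snapshotPaths = Arrays.asList(
        new Path("/data/tableA/2021/01/01"),
        new Path("/data/tableA/2021/01/02"),
        new Path("/data/tableB/2021/01/01"));
    Map<HoodieTableMetaClient, List<Path>> grouped = HoodieInputFormatUtils.groupSnapshotPathsByMetaClient(
        Arrays.asList(metaClientA, metaClientB), snapshotPaths);
    // grouped.get(metaClientA) -> the two tableA paths; grouped.get(metaClientB) -> the tableB path.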

   /**
-   * Filters data files for a snapshot queried table.
+   * Filters data files under @param paths for a snapshot queried table.
    * @param job
-   * @param metadata
-   * @param fileStatuses
+   * @param metaClient
+   * @param paths
    * @return
    */
   public static List<FileStatus> filterFileStatusForSnapshotMode(
-      JobConf job, HoodieTableMetaClient metadata, List<FileStatus> fileStatuses) throws IOException {
-    FileStatus[] statuses = fileStatuses.toArray(new FileStatus[0]);
+      JobConf job, HoodieTableMetaClient metaClient, List<Path> paths) throws IOException {
     if (LOG.isDebugEnabled()) {
-      LOG.debug("Hoodie Metadata initialized with completed commit Ts as :" + metadata);
+      LOG.debug("Hoodie Metadata initialized with completed commit Ts as :" + metaClient);
     }
-    // Get all commits, delta commits, compactions, as all of them produce a base parquet file today
-    HoodieTimeline timeline = metadata.getActiveTimeline().getCommitsTimeline().filterCompletedInstants();
-    TableFileSystemView.BaseFileOnlyView roView = new HoodieTableFileSystemView(metadata, timeline, statuses);
-    // filter files on the latest commit found
-    List<HoodieBaseFile> filteredFiles = roView.getLatestBaseFiles().collect(Collectors.toList());
-    LOG.info("Total paths to process after hoodie filter " + filteredFiles.size());
+
+    boolean useFileListingFromMetadata = job.getBoolean(METADATA_ENABLE_PROP, DEFAULT_METADATA_ENABLE_FOR_READERS);
+    boolean verifyFileListing = job.getBoolean(METADATA_VALIDATE_PROP, DEFAULT_METADATA_VALIDATE);
+    HoodieTableFileSystemView fsView = FileSystemViewManager.createInMemoryFileSystemView(metaClient,
+        useFileListingFromMetadata, verifyFileListing);
+
+    List<HoodieBaseFile> filteredBaseFiles = new ArrayList<>();
+    for (Path p : paths) {
+      String relativePartitionPath = FSUtils.getRelativePartitionPath(new Path(metaClient.getBasePath()), p);
+      List<HoodieBaseFile> matched = fsView.getLatestBaseFiles(relativePartitionPath).collect(Collectors.toList());
Member: note to self: doing this by path is ok, since the FileSystemView internally caches per partition.
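If that holds (the per-partition caching is assumed from the comment above, not verified here), this per-path loop costs one listing per distinct partition; a minimal illustration with a hypothetical partition path:

    // Second lookup for the same partition should be served from the view's internal cache.
    List<HoodieBaseFile> first = fsView.getLatestBaseFiles("2021/01/01").collect(Collectors.toList());
    List<HoodieBaseFile> second = fsView.getLatestBaseFiles("2021/01/01").collect(Collectors.toList());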

+      filteredBaseFiles.addAll(matched);
+    }
+
+    LOG.info("Total paths to process after hoodie filter " + filteredBaseFiles.size());
     List<FileStatus> returns = new ArrayList<>();
-    for (HoodieBaseFile filteredFile : filteredFiles) {
+    for (HoodieBaseFile filteredFile : filteredBaseFiles) {
Member: hmmm. slightly orthogonal, but the HoodieBaseFile itself should hand us a FileStatus object right? we should probably rethink the need for refreshing file status.
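A sketch of that simplification, assuming HoodieBaseFile keeps the FileStatus it was created from (e.g. via a getFileStatus() accessor; not verified here):

    // Hypothetical: reuse the status captured at listing time instead of re-fetching it.
    for (HoodieBaseFile filteredFile : filteredBaseFiles) {
      FileStatus status = filteredFile.getFileStatus();
      if (status != null) {
        returns.add(status);
      }
    }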

       if (LOG.isDebugEnabled()) {
         LOG.debug("Processing latest hoodie file - " + filteredFile.getPath());
       }
[next file]

@@ -26,6 +26,7 @@
 import org.apache.hudi.common.table.HoodieTableMetaClient;
 import org.apache.hudi.common.table.timeline.HoodieInstant;
 import org.apache.hudi.common.table.timeline.HoodieTimeline;
+import org.apache.hudi.common.table.view.FileSystemViewManager;
 import org.apache.hudi.common.table.view.HoodieTableFileSystemView;
 import org.apache.hudi.common.util.CollectionUtils;
 import org.apache.hudi.common.util.Option;
@@ -53,6 +54,11 @@
 import java.util.stream.Collectors;
 import java.util.stream.Stream;

+import static org.apache.hudi.common.config.HoodieMetadataConfig.METADATA_ENABLE_PROP;
+import static org.apache.hudi.common.config.HoodieMetadataConfig.DEFAULT_METADATA_ENABLE_FOR_READERS;
+import static org.apache.hudi.common.config.HoodieMetadataConfig.METADATA_VALIDATE_PROP;
+import static org.apache.hudi.common.config.HoodieMetadataConfig.DEFAULT_METADATA_VALIDATE;
+
 public class HoodieRealtimeInputFormatUtils extends HoodieInputFormatUtils {

   private static final Logger LOG = LogManager.getLogger(HoodieRealtimeInputFormatUtils.class);
@@ -63,13 +69,25 @@ public static InputSplit[] getRealtimeSplits(Configuration conf, Stream<FileSpli
     // TODO(vc): Should we handle also non-hoodie splits here?
     Map<Path, HoodieTableMetaClient> partitionsToMetaClient = getTableMetaClientByBasePath(conf, partitionsToParquetSplits.keySet());

+    boolean useFileListingFromMetadata = conf.getBoolean(METADATA_ENABLE_PROP, DEFAULT_METADATA_ENABLE_FOR_READERS);
+    boolean verifyFileListing = conf.getBoolean(METADATA_VALIDATE_PROP, DEFAULT_METADATA_VALIDATE);
Contributor: These two config options are everywhere; can we just pass the JobConf to the tool method FileSystemViewManager.createInMemoryFileSystemView and fetch them inside it? That would make the invocation cleaner, IMO.
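A hypothetical shape for that suggestion (the overload name and placement are illustrative, not part of this PR):

    // Sketch: let the factory read the two flags from the Hadoop Configuration itself.
    public static HoodieTableFileSystemView createInMemoryFileSystemView(HoodieTableMetaClient metaClient,
                                                                         Configuration conf) {
      boolean useFileListingFromMetadata = conf.getBoolean(METADATA_ENABLE_PROP, DEFAULT_METADATA_ENABLE_FOR_READERS);
      boolean verifyFileListing = conf.getBoolean(METADATA_VALIDATE_PROP, DEFAULT_METADATA_VALIDATE);
      return FileSystemViewManager.createInMemoryFileSystemView(metaClient, useFileListingFromMetadata, verifyFileListing);
    }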

+    // Create file system cache so metadata table is only instantiated once. Also can benefit normal file listing if
+    // partition path is listed twice so file groups will already be loaded in file system
+    Map<HoodieTableMetaClient, HoodieTableFileSystemView> fsCache = new HashMap<>();
Member: makes sense
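The containsKey/put/get sequence added below could equivalently be written with Map.computeIfAbsent (a sketch; same behavior, single lookup):

    HoodieTableFileSystemView fsView = fsCache.computeIfAbsent(metaClient,
        mc -> FileSystemViewManager.createInMemoryFileSystemView(mc, useFileListingFromMetadata, verifyFileListing));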

     // for all unique split parents, obtain all delta files based on delta commit timeline,
     // grouped on file id
     List<InputSplit> rtSplits = new ArrayList<>();
     partitionsToParquetSplits.keySet().forEach(partitionPath -> {
       // for each partition path obtain the data & log file groupings, then map back to inputsplits
       HoodieTableMetaClient metaClient = partitionsToMetaClient.get(partitionPath);
-      HoodieTableFileSystemView fsView = new HoodieTableFileSystemView(metaClient, metaClient.getActiveTimeline());
+      if (!fsCache.containsKey(metaClient)) {
+        HoodieTableFileSystemView fsView = FileSystemViewManager.createInMemoryFileSystemView(metaClient,
+            useFileListingFromMetadata, verifyFileListing);
+        fsCache.put(metaClient, fsView);
+      }
+      HoodieTableFileSystemView fsView = fsCache.get(metaClient);
+
       String relPartitionPath = FSUtils.getRelativePartitionPath(new Path(metaClient.getBasePath()), partitionPath);

       try {
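Taken together, these changes route reader-side listings through an in-memory file system view that can be backed by the metadata table. A minimal sketch of how a reader job would opt in (flag values illustrative; the property constants are the ones imported above):

    JobConf jobConf = new JobConf();
    // Read file listings from the Hudi metadata table instead of listing the file system directly.
    jobConf.setBoolean(METADATA_ENABLE_PROP, true);
    // Optionally validate metadata-based listings against direct file system listings.
    jobConf.setBoolean(METADATA_VALIDATE_PROP, true);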