Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ private Runnable getPreExecuteRunnable() {
@SuppressWarnings("unchecked")
@Test
@Timeout(value = 60)
public void testRecordReading() throws Exception {
public void testRecordReading() {

final List<HoodieRecord> hoodieRecords = dataGen.generateInserts(instantTime, 100);
ArrayList<HoodieRecord> beforeRecord = new ArrayList<>();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,18 +20,21 @@

import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.common.engine.HoodieLocalEngineContext;
import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.model.FileSlice;
import org.apache.hudi.common.model.HoodieBaseFile;
import org.apache.hudi.common.model.HoodieLogFile;
import org.apache.hudi.common.model.HoodieTableQueryType;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.common.util.CollectionUtils;
import org.apache.hudi.common.table.view.FileSystemViewManager;
import org.apache.hudi.common.table.view.HoodieTableFileSystemView;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.exception.HoodieIOException;
import org.apache.hudi.hadoop.utils.HoodieHiveUtils;
import org.apache.hudi.hadoop.utils.HoodieInputFormatUtils;
import org.apache.hudi.metadata.HoodieTableMetadataUtil;

import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
Expand All @@ -52,14 +55,15 @@
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.stream.Collectors;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import static org.apache.hudi.common.util.ValidationUtils.checkState;
import static org.apache.hudi.common.config.HoodieMetadataConfig.ENABLE;

/**
* Base implementation of the Hive's {@link FileInputFormat} allowing for reading of Hudi's
Expand Down Expand Up @@ -195,7 +199,8 @@ protected List<FileStatus> listStatusForIncrementalMode(JobConf job,
return HoodieInputFormatUtils.filterIncrementalFileStatus(jobContext, tableMetaClient, timeline.get(), fileStatuses, commitsToCheck.get());
}

protected FileStatus createFileStatusUnchecked(FileSlice fileSlice, HiveHoodieTableFileIndex fileIndex, HoodieTableMetaClient metaClient) {
protected FileStatus createFileStatusUnchecked(FileSlice fileSlice, Option<HoodieInstant> latestCompletedInstantOpt,
String tableBasePath, HoodieTableMetaClient metaClient) {
Option<HoodieBaseFile> baseFileOpt = fileSlice.getBaseFile();

if (baseFileOpt.isPresent()) {
Expand Down Expand Up @@ -241,31 +246,86 @@ private List<FileStatus> listStatusForSnapshotMode(JobConf job,
boolean shouldIncludePendingCommits =
HoodieHiveUtils.shouldIncludePendingCommits(job, tableMetaClient.getTableConfig().getTableName());

HiveHoodieTableFileIndex fileIndex =
new HiveHoodieTableFileIndex(
engineContext,
tableMetaClient,
props,
HoodieTableQueryType.SNAPSHOT,
partitionPaths,
queryCommitInstant,
shouldIncludePendingCommits);

Map<String, List<FileSlice>> partitionedFileSlices = fileIndex.listFileSlices();

targetFiles.addAll(
partitionedFileSlices.values()
.stream()
.flatMap(Collection::stream)
.filter(fileSlice -> checkIfValidFileSlice(fileSlice))
.map(fileSlice -> createFileStatusUnchecked(fileSlice, fileIndex, tableMetaClient))
.collect(Collectors.toList())
);
if (HoodieTableMetadataUtil.isFilesPartitionAvailable(tableMetaClient) || conf.getBoolean(ENABLE.key(), ENABLE.defaultValue())) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

default value for metadata is false for reader, and true for writer. So, we should use the reader side default here.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In this case, defaulting to false may lead to poor performance, and I think that's why HUDI-5409 got reverted. Maybe we can create a separate parameter and use it?

HiveHoodieTableFileIndex fileIndex =
new HiveHoodieTableFileIndex(
engineContext,
tableMetaClient,
props,
HoodieTableQueryType.SNAPSHOT,
partitionPaths,
queryCommitInstant,
shouldIncludePendingCommits);

Map<String, List<FileSlice>> partitionedFileSlices = fileIndex.listFileSlices();

targetFiles.addAll(
partitionedFileSlices.values()
.stream()
.flatMap(Collection::stream)
.filter(fileSlice -> checkIfValidFileSlice(fileSlice))
.map(fileSlice -> createFileStatusUnchecked(fileSlice, fileIndex.getLatestCompletedInstant(),
fileIndex.getBasePath().toString(), tableMetaClient))
.collect(Collectors.toList())
);
} else {
// If hoodie.metadata.enabled is set to false and the table doesn't have the metadata,
// read the table using fs view cache instead of file index.
// This is because there's no file index in non-metadata table.
String basePath = tableMetaClient.getBasePathV2().toString();
Map<HoodieTableMetaClient, HoodieTableFileSystemView> fsViewCache = new HashMap<>();
HoodieTimeline timeline = getActiveTimeline(tableMetaClient, shouldIncludePendingCommits);
Option<String> queryInstant = queryCommitInstant.or(() -> timeline.lastInstant().map(HoodieInstant::getTimestamp));
validateInstant(timeline, queryInstant);

try {
HoodieTableFileSystemView fsView = fsViewCache.computeIfAbsent(tableMetaClient, hoodieTableMetaClient ->
FileSystemViewManager.createInMemoryFileSystemViewWithTimeline(engineContext, hoodieTableMetaClient,
HoodieInputFormatUtils.buildMetadataConfig(job), timeline));

List<FileSlice> filteredFileSlices = new ArrayList<>();

for (Path p : entry.getValue()) {
String relativePartitionPath = FSUtils.getRelativePartitionPath(new Path(basePath), p);

List<FileSlice> fileSlices = queryInstant.map(
instant -> fsView.getLatestMergedFileSlicesBeforeOrOn(relativePartitionPath, instant))
.orElse(fsView.getLatestFileSlices(relativePartitionPath))
.collect(Collectors.toList());

filteredFileSlices.addAll(fileSlices);
}

targetFiles.addAll(
filteredFileSlices.stream()
.filter(fileSlice -> checkIfValidFileSlice(fileSlice))
.map(fileSlice -> createFileStatusUnchecked(fileSlice, timeline.filterCompletedInstants().lastInstant(),
basePath, tableMetaClient))
.collect(Collectors.toList()));
} finally {
fsViewCache.forEach(((metaClient, fsView) -> fsView.close()));
}
}
}

return targetFiles;
}

/**
 * Returns the commits-and-compaction timeline for the table, optionally restricted
 * to completed (and compaction) instants when pending commits must be excluded.
 *
 * @param metaClient meta client for the Hudi table being queried
 * @param shouldIncludePendingCommits whether inflight/pending commits should be visible
 * @return the full timeline when pending commits are included, otherwise only
 *         completed and compaction instants
 */
private static HoodieTimeline getActiveTimeline(HoodieTableMetaClient metaClient, boolean shouldIncludePendingCommits) {
  final HoodieTimeline commitsTimeline = metaClient.getCommitsAndCompactionTimeline();
  // Hide pending commits from the reader unless explicitly requested.
  return shouldIncludePendingCommits
      ? commitsTimeline
      : commitsTimeline.filterCompletedAndCompactionInstants();
}

/**
 * Validates that the requested query instant, when supplied, exists on the given timeline.
 *
 * @param activeTimeline timeline to search for the instant
 * @param queryInstant optional instant timestamp requested by the query
 * @throws HoodieIOException if an instant was requested but is absent from the timeline
 */
private static void validateInstant(HoodieTimeline activeTimeline, Option<String> queryInstant) {
  // No instant requested means nothing to validate.
  if (!queryInstant.isPresent()) {
    return;
  }
  String requestedInstant = queryInstant.get();
  if (!activeTimeline.containsInstant(requestedInstant)) {
    throw new HoodieIOException(String.format("Query instant (%s) not found in the timeline", requestedInstant));
  }
}

protected boolean checkIfValidFileSlice(FileSlice fileSlice) {
Option<HoodieBaseFile> baseFileOpt = fileSlice.getBaseFile();
Option<HoodieLogFile> latestLogFileOpt = fileSlice.getLatestLogFile();
Expand All @@ -280,11 +340,6 @@ protected boolean checkIfValidFileSlice(FileSlice fileSlice) {
}
}

/**
 * Sanity check that the newly listed file statuses match the legacy listing exactly.
 *
 * @param targetFiles statuses produced by the new listing path
 * @param legacyFileStatuses statuses produced by the legacy listing path
 * @throws IllegalStateException if the two listings differ
 */
private void validate(List<FileStatus> targetFiles, List<FileStatus> legacyFileStatuses) {
  // Any element present in one listing but not the other indicates a regression.
  checkState(CollectionUtils.diff(targetFiles, legacyFileStatuses).isEmpty(), "Should be empty");
}

@Nonnull
protected static FileStatus getFileStatusUnchecked(HoodieBaseFile baseFile) {
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,6 @@
import org.apache.hudi.exception.HoodieIOException;
import org.apache.hudi.hadoop.BootstrapBaseFileSplit;
import org.apache.hudi.hadoop.FileStatusWithBootstrapBaseFile;
import org.apache.hudi.hadoop.HiveHoodieTableFileIndex;
import org.apache.hudi.hadoop.HoodieCopyOnWriteTableInputFormat;
import org.apache.hudi.hadoop.LocatedFileStatusWithBootstrapBaseFile;
import org.apache.hudi.hadoop.RealtimeFileStatus;
Expand Down Expand Up @@ -92,14 +91,12 @@ public InputSplit[] getSplits(JobConf job, int numSplits) throws IOException {
}

@Override
protected FileStatus createFileStatusUnchecked(FileSlice fileSlice, HiveHoodieTableFileIndex fileIndex, HoodieTableMetaClient metaClient) {
protected FileStatus createFileStatusUnchecked(FileSlice fileSlice, Option<HoodieInstant> latestCompletedInstantOpt,
String tableBasePath, HoodieTableMetaClient metaClient) {
Option<HoodieBaseFile> baseFileOpt = fileSlice.getBaseFile();
Option<HoodieLogFile> latestLogFileOpt = fileSlice.getLatestLogFile();
Stream<HoodieLogFile> logFiles = fileSlice.getLogFiles();

Option<HoodieInstant> latestCompletedInstantOpt = fileIndex.getLatestCompletedInstant();
String tableBasePath = fileIndex.getBasePath().toString();

// Check if we're reading a MOR table
if (baseFileOpt.isPresent()) {
return createRealtimeFileStatusUnchecked(baseFileOpt.get(), logFiles, tableBasePath, latestCompletedInstantOpt, getHoodieVirtualKeyInfo(metaClient));
Expand Down