HoodieFileInputFormatBase.java
@@ -21,10 +21,12 @@
import org.apache.hadoop.conf.Configurable;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.ArrayWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.FileSplit;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hudi.common.config.TypedProperties;
@@ -65,12 +67,32 @@
 * <li>Incremental mode: reading the table's state as of a particular timestamp (or instant, in Hudi's terms)</li>
* <li>External mode: reading non-Hudi partitions</li>
* </ul>
*
 * NOTE: This class is agnostic to the underlying file format of the files being read
*/
public abstract class HoodieFileInputFormatBase extends FileInputFormat<NullWritable, ArrayWritable>
implements Configurable {

protected Configuration conf;

@Nonnull
private static RealtimeFileStatus createRealtimeFileStatusUnchecked(HoodieBaseFile baseFile, Stream<HoodieLogFile> logFiles) {
  List<HoodieLogFile> sortedLogFiles = logFiles.sorted(HoodieLogFile.getLogFileComparator()).collect(Collectors.toList());
  FileStatus baseFileStatus = getFileStatusUnchecked(baseFile);
  try {
    RealtimeFileStatus rtFileStatus = new RealtimeFileStatus(baseFileStatus);
    rtFileStatus.setDeltaLogFiles(sortedLogFiles);
    rtFileStatus.setBaseFilePath(baseFile.getPath());
    if (baseFileStatus instanceof LocatedFileStatusWithBootstrapBaseFile || baseFileStatus instanceof FileStatusWithBootstrapBaseFile) {
      rtFileStatus.setBootStrapFileStatus(baseFileStatus);
    }

    return rtFileStatus;
  } catch (IOException e) {
    throw new HoodieIOException(String.format("Failed to init %s", RealtimeFileStatus.class.getSimpleName()), e);
  }
}

@Override
public final Configuration getConf() {
  return conf;
@@ -81,6 +103,24 @@ public final void setConf(Configuration conf) {
  this.conf = conf;
}

protected abstract boolean includeLogFilesForSnapshotView();

@Override
protected boolean isSplitable(FileSystem fs, Path filename) {
  return !(filename instanceof PathWithBootstrapFileStatus);
}

@Override
protected FileSplit makeSplit(Path file, long start, long length,
                              String[] hosts) {
  FileSplit split = new FileSplit(file, start, length, hosts);

  if (file instanceof PathWithBootstrapFileStatus) {
    return makeExternalFileSplit((PathWithBootstrapFileStatus) file, split);
  }
  return split;
}

@Override
public FileStatus[] listStatus(JobConf job) throws IOException {
  // Segregate inputPaths[] into incremental, snapshot and non-hoodie paths
@@ -121,20 +161,6 @@ public FileStatus[] listStatus(JobConf job) throws IOException {
  return returns.toArray(new FileStatus[0]);
}

private void validate(List<FileStatus> targetFiles, List<FileStatus> legacyFileStatuses) {
  List<FileStatus> diff = CollectionUtils.diff(targetFiles, legacyFileStatuses);
  checkState(diff.isEmpty(), "Should be empty");
}

@Nonnull
private static FileStatus getFileStatusUnchecked(HoodieBaseFile baseFile) {
  try {
    return HoodieInputFormatUtils.getFileStatus(baseFile);
  } catch (IOException ioe) {
    throw new HoodieIOException("Failed to get file-status", ioe);
  }
}

/**
* Abstracts and exposes {@link FileInputFormat#listStatus(JobConf)} operation to subclasses that
* lists files (returning an array of {@link FileStatus}) corresponding to the input paths specified
@@ -172,35 +198,25 @@ protected List<FileStatus> listStatusForIncrementalMode(JobConf job,
  return HoodieInputFormatUtils.filterIncrementalFileStatus(jobContext, tableMetaClient, timeline.get(), fileStatuses, commitsToCheck.get());
}

protected abstract boolean includeLogFilesForSnapshotView();

@Override
protected FileSplit makeSplit(Path file, long start, long length,
                              String[] hosts, String[] inMemoryHosts) {
  FileSplit split = new FileSplit(file, start, length, hosts, inMemoryHosts);
  if (file instanceof PathWithBootstrapFileStatus) {
    return makeExternalFileSplit((PathWithBootstrapFileStatus) file, split);
  }
  return split;
}

Review thread on makeSplit(...):

Contributor: nit: put the two makeSplit() methods together?

Contributor Author: Will clean this up in the final PR #4743

Review thread on the PathWithBootstrapFileStatus check:

Contributor: Can you help me understand why I don't see different FileStatuses here, like RealtimeFileStatus for example, but only PathWithBootstrapFileStatus?

Contributor (yihua): My understanding is that these methods are moved from HoodieParquetInputFormat. @alexeykudinkin, is the logic parquet-agnostic and going to be reused by other file formats like HFile? If that's not the case, it would be better to keep them in HoodieParquetInputFormat, since this base class is going to be general across different formats.

Contributor Author: This is used only for COW.

Contributor (yihua, Feb 5, 2022): @alexeykudinkin what about my question above regarding whether the logic should reside in HoodieParquetInputFormat or in this class?

Contributor Author: @yihua GH comments are weird; when I was responding to @nsivabalan there were no comments of yours, and then it sandwiched your comment in between those. Your understanding is correct: all of this logic is file-format agnostic.

Contributor: sounds good!

Contributor: @alexeykudinkin no worries. That sg.
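The exchange above hinges on the base class being file-format agnostic. A minimal sketch of what a non-parquet subclass could look like (the class name and record-reader wiring are illustrative assumptions, not code from this PR):

import java.io.IOException;

import org.apache.hadoop.io.ArrayWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapred.InputSplit;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.RecordReader;
import org.apache.hadoop.mapred.Reporter;

// Hypothetical HFile-backed format: path listing, split creation and the
// bootstrap handling in makeSplit() are all inherited from the base class;
// only the record reader and the snapshot-view policy are format-specific.
public class HoodieHFileInputFormat extends HoodieFileInputFormatBase {

  @Override
  protected boolean includeLogFilesForSnapshotView() {
    // COW semantics: the snapshot view is served from base files alone.
    return false;
  }

  @Override
  public RecordReader<NullWritable, ArrayWritable> getRecordReader(InputSplit split, JobConf job, Reporter reporter)
      throws IOException {
    // A format-specific reader would be constructed here; omitted in this sketch.
    throw new UnsupportedOperationException("HFile record reader not wired up in this sketch");
  }
}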

@Nonnull
private static RealtimeFileStatus createRealtimeFileStatusUnchecked(HoodieBaseFile baseFile,
                                                                    Stream<HoodieLogFile> logFiles,
                                                                    Option<HoodieInstant> latestCompletedInstantOpt,
                                                                    HoodieTableMetaClient tableMetaClient) {
  List<HoodieLogFile> sortedLogFiles = logFiles.sorted(HoodieLogFile.getLogFileComparator()).collect(Collectors.toList());
  FileStatus baseFileStatus = getFileStatusUnchecked(baseFile);
  try {
    RealtimeFileStatus rtFileStatus = new RealtimeFileStatus(baseFileStatus);
    rtFileStatus.setDeltaLogFiles(sortedLogFiles);
    rtFileStatus.setBaseFilePath(baseFile.getPath());
    rtFileStatus.setBasePath(tableMetaClient.getBasePath());

    if (latestCompletedInstantOpt.isPresent()) {
      HoodieInstant latestCompletedInstant = latestCompletedInstantOpt.get();
      checkState(latestCompletedInstant.isCompleted());

      rtFileStatus.setMaxCommitTime(latestCompletedInstant.getTimestamp());
    }

    if (baseFileStatus instanceof LocatedFileStatusWithBootstrapBaseFile || baseFileStatus instanceof FileStatusWithBootstrapBaseFile) {
      rtFileStatus.setBootStrapFileStatus(baseFileStatus);
    }

    return rtFileStatus;
  } catch (IOException e) {
    throw new HoodieIOException(String.format("Failed to init %s", RealtimeFileStatus.class.getSimpleName()), e);
  }
}

private BootstrapBaseFileSplit makeExternalFileSplit(PathWithBootstrapFileStatus file, FileSplit split) {
  try {
    LOG.info("Making external data split for " + file);
    FileStatus externalFileStatus = file.getBootstrapFileStatus();
    FileSplit externalFileSplit = makeSplit(externalFileStatus.getPath(), 0, externalFileStatus.getLen(),
        new String[0], new String[0]);
    return new BootstrapBaseFileSplit(split, externalFileSplit);
  } catch (IOException e) {
    throw new HoodieIOException(e.getMessage(), e);
  }
}
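For context, a small sketch of how such a split might be distinguished downstream; the getBootstrapFileSplit() accessor name is an assumption for illustration:

// Sketch: telling a bootstrap split apart from a regular one when choosing a reader.
// BootstrapBaseFileSplit wraps the skeleton-file split and carries the external
// (bootstrap) base-file split alongside it.
private static String describeSplit(FileSplit split) {
  if (split instanceof BootstrapBaseFileSplit) {
    BootstrapBaseFileSplit bootstrapSplit = (BootstrapBaseFileSplit) split;
    // Accessor name assumed; it returns the external base-file split.
    return "bootstrap base file: " + bootstrapSplit.getBootstrapFileSplit().getPath();
  }
  return "regular split: " + split.getPath();
}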

@@ -209,38 +225,6 @@ private List<FileStatus> listStatusForSnapshotModeLegacy(JobConf job, Map<String
  return HoodieInputFormatUtils.filterFileStatusForSnapshotMode(job, tableMetaClientMap, snapshotPaths, includeLogFilesForSnapshotView());
}

@Nonnull
private static RealtimeFileStatus createRealtimeFileStatusUnchecked(HoodieLogFile latestLogFile,
                                                                    Stream<HoodieLogFile> logFiles,
                                                                    Option<HoodieInstant> latestCompletedInstantOpt,
                                                                    HoodieTableMetaClient tableMetaClient) {
  List<HoodieLogFile> sortedLogFiles = logFiles.sorted(HoodieLogFile.getLogFileComparator()).collect(Collectors.toList());
  try {
    RealtimeFileStatus rtFileStatus = new RealtimeFileStatus(latestLogFile.getFileStatus());
    rtFileStatus.setDeltaLogFiles(sortedLogFiles);
    rtFileStatus.setBasePath(tableMetaClient.getBasePath());

    if (latestCompletedInstantOpt.isPresent()) {
      HoodieInstant latestCompletedInstant = latestCompletedInstantOpt.get();
      checkState(latestCompletedInstant.isCompleted());

      rtFileStatus.setMaxCommitTime(latestCompletedInstant.getTimestamp());
    }

    return rtFileStatus;
  } catch (IOException e) {
    throw new HoodieIOException(String.format("Failed to init %s", RealtimeFileStatus.class.getSimpleName()), e);
  }
}

private static Option<HoodieInstant> fromScala(scala.Option<HoodieInstant> opt) {
  if (opt.isDefined()) {
    return Option.of(opt.get());
  }

  return Option.empty();
}

@Nonnull
private List<FileStatus> listStatusForSnapshotMode(JobConf job,
                                                   Map<String, HoodieTableMetaClient> tableMetaClientMap,
@@ -317,4 +301,80 @@ private List<FileStatus> listStatusForSnapshotMode(JobConf job,

  return targetFiles;
}

private void validate(List<FileStatus> targetFiles, List<FileStatus> legacyFileStatuses) {
  List<FileStatus> diff = CollectionUtils.diff(targetFiles, legacyFileStatuses);
  checkState(diff.isEmpty(), "Should be empty");
}

Review thread on validate(...):

Contributor: Are the methods below merely moved? It looks like the order changes.

Contributor Author: Yeah, the GJF rule is applied:

(non-static methods)
public
protected
private

(static methods)
public
protected
private
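As a concrete illustration of that ordering (hypothetical class, not from this PR):

// Members grouped per the ordering described above:
// instance methods by visibility, then static methods by visibility.
public class MemberOrderingExample {
  public void a() {}        // non-static public
  protected void b() {}     // non-static protected
  private void c() {}       // non-static private

  public static void d() {}     // static public
  protected static void e() {}  // static protected
  private static void f() {}    // static private
}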

@Nonnull
private static FileStatus getFileStatusUnchecked(HoodieBaseFile baseFile) {
  try {
    return HoodieInputFormatUtils.getFileStatus(baseFile);
  } catch (IOException ioe) {
    throw new HoodieIOException("Failed to get file-status", ioe);
  }
}

@Nonnull
private static RealtimeFileStatus createRealtimeFileStatusUnchecked(HoodieBaseFile baseFile,
                                                                    Stream<HoodieLogFile> logFiles,
                                                                    Option<HoodieInstant> latestCompletedInstantOpt,
                                                                    HoodieTableMetaClient tableMetaClient) {
  List<HoodieLogFile> sortedLogFiles = logFiles.sorted(HoodieLogFile.getLogFileComparator()).collect(Collectors.toList());
  FileStatus baseFileStatus = getFileStatusUnchecked(baseFile);
  try {
    RealtimeFileStatus rtFileStatus = new RealtimeFileStatus(baseFileStatus);
    rtFileStatus.setDeltaLogFiles(sortedLogFiles);
    rtFileStatus.setBaseFilePath(baseFile.getPath());
    rtFileStatus.setBasePath(tableMetaClient.getBasePath());

    if (latestCompletedInstantOpt.isPresent()) {
      HoodieInstant latestCompletedInstant = latestCompletedInstantOpt.get();
      checkState(latestCompletedInstant.isCompleted());

      rtFileStatus.setMaxCommitTime(latestCompletedInstant.getTimestamp());
    }

    if (baseFileStatus instanceof LocatedFileStatusWithBootstrapBaseFile || baseFileStatus instanceof FileStatusWithBootstrapBaseFile) {
      rtFileStatus.setBootStrapFileStatus(baseFileStatus);
    }

    return rtFileStatus;
  } catch (IOException e) {
    throw new HoodieIOException(String.format("Failed to init %s", RealtimeFileStatus.class.getSimpleName()), e);
  }
}

@Nonnull
private static RealtimeFileStatus createRealtimeFileStatusUnchecked(HoodieLogFile latestLogFile,
                                                                    Stream<HoodieLogFile> logFiles,
                                                                    Option<HoodieInstant> latestCompletedInstantOpt,
                                                                    HoodieTableMetaClient tableMetaClient) {
  List<HoodieLogFile> sortedLogFiles = logFiles.sorted(HoodieLogFile.getLogFileComparator()).collect(Collectors.toList());
  try {
    RealtimeFileStatus rtFileStatus = new RealtimeFileStatus(latestLogFile.getFileStatus());
    rtFileStatus.setDeltaLogFiles(sortedLogFiles);
    rtFileStatus.setBasePath(tableMetaClient.getBasePath());

    if (latestCompletedInstantOpt.isPresent()) {
      HoodieInstant latestCompletedInstant = latestCompletedInstantOpt.get();
      checkState(latestCompletedInstant.isCompleted());

      rtFileStatus.setMaxCommitTime(latestCompletedInstant.getTimestamp());
    }

    return rtFileStatus;
  } catch (IOException e) {
    throw new HoodieIOException(String.format("Failed to init %s", RealtimeFileStatus.class.getSimpleName()), e);
  }
}

private static Option<HoodieInstant> fromScala(scala.Option<HoodieInstant> opt) {
  if (opt.isDefined()) {
    return Option.of(opt.get());
  }

  return Option.empty();
}
}
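A quick note on the fromScala helper above: it bridges scala.Option values coming from Hudi's Scala code into Hudi's own Option type. A hedged usage sketch (the call site is hypothetical):

// Hypothetical call site: a Scala API hands back scala.Option<HoodieInstant>;
// convert it before the Java-side code uses Option#isPresent()/get().
scala.Option<HoodieInstant> scalaOpt = scala.Option.empty();
Option<HoodieInstant> hudiOpt = fromScala(scalaOpt);  // yields Option.empty()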
HoodieParquetInputFormat.java
@@ -18,17 +18,7 @@

package org.apache.hudi.hadoop;

import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.table.timeline.HoodieDefaultTimeline;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.exception.HoodieIOException;
import org.apache.hudi.hadoop.utils.HoodieHiveUtils;
import org.apache.hudi.hadoop.utils.HoodieInputFormatUtils;

import org.apache.hadoop.conf.Configurable;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat;
import org.apache.hadoop.hive.ql.io.sarg.ConvertAstToSearchArg;
import org.apache.hadoop.hive.ql.plan.TableScanDesc;
@@ -39,6 +29,9 @@
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.RecordReader;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.hadoop.utils.HoodieHiveUtils;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;

Expand All @@ -62,10 +55,6 @@ public class HoodieParquetInputFormat extends HoodieFileInputFormatBase implemen
// {@code RecordReader}
private final MapredParquetInputFormat mapredParquetInputFormat = new MapredParquetInputFormat();

protected HoodieDefaultTimeline filterInstantsTimeline(HoodieDefaultTimeline timeline) {
  return HoodieInputFormatUtils.filterInstantsTimeline(timeline);
}

protected boolean includeLogFilesForSnapshotView() {
  return false;
}
@@ -96,32 +85,6 @@ public RecordReader<NullWritable, ArrayWritable> getRecordReader(final InputSpli
  return getRecordReaderInternal(split, job, reporter);
}

@Override
protected boolean isSplitable(FileSystem fs, Path filename) {
  return !(filename instanceof PathWithBootstrapFileStatus);
}

@Override
protected FileSplit makeSplit(Path file, long start, long length,
                              String[] hosts) {
  FileSplit split = new FileSplit(file, start, length, hosts);

  if (file instanceof PathWithBootstrapFileStatus) {
    return makeExternalFileSplit((PathWithBootstrapFileStatus) file, split);
  }
  return split;
}

@Override
protected FileSplit makeSplit(Path file, long start, long length,
                              String[] hosts, String[] inMemoryHosts) {
  FileSplit split = new FileSplit(file, start, length, hosts, inMemoryHosts);
  if (file instanceof PathWithBootstrapFileStatus) {
    return makeExternalFileSplit((PathWithBootstrapFileStatus) file, split);
  }
  return split;
}

private RecordReader<NullWritable, ArrayWritable> getRecordReaderInternal(InputSplit split,
                                                                          JobConf job,
                                                                          Reporter reporter) throws IOException {
@@ -176,16 +139,4 @@ private RecordReader<NullWritable, ArrayWritable> createBootstrappingRecordReade
true);
}
}

private BootstrapBaseFileSplit makeExternalFileSplit(PathWithBootstrapFileStatus file, FileSplit split) {
  try {
    LOG.info("Making external data split for " + file);
    FileStatus externalFileStatus = file.getBootstrapFileStatus();
    FileSplit externalFileSplit = makeSplit(externalFileStatus.getPath(), 0, externalFileStatus.getLen(),
        new String[0], new String[0]);
    return new BootstrapBaseFileSplit(split, externalFileSplit);
  } catch (IOException e) {
    throw new HoodieIOException(e.getMessage(), e);
  }
}
}
HoodieCombineHiveInputFormat.java
@@ -20,6 +20,7 @@

import org.apache.hudi.common.util.ReflectionUtils;
import org.apache.hudi.common.util.ValidationUtils;
import org.apache.hudi.hadoop.HoodieFileInputFormatBase;
import org.apache.hudi.hadoop.HoodieParquetInputFormat;
import org.apache.hudi.hadoop.realtime.HoodieCombineRealtimeRecordReader;
import org.apache.hudi.hadoop.realtime.HoodieParquetRealtimeInputFormat;
@@ -876,7 +877,7 @@ protected List<FileStatus> listStatus(JobContext job) throws IOException {
LOG.info("Listing status in HoodieCombineHiveInputFormat.HoodieCombineFileInputFormatShim");
List<FileStatus> result;
if (hoodieFilter) {
HoodieParquetInputFormat input;
HoodieFileInputFormatBase input;
if (isRealTime) {
LOG.info("Using HoodieRealtimeInputFormat");
input = createParquetRealtimeInputFormat();
Expand Down Expand Up @@ -916,7 +917,7 @@ public CombineFileSplit[] getSplits(JobConf job, int numSplits) throws IOExcepti
job.set("hudi.hive.realtime", "true");
InputSplit[] splits;
if (hoodieFilter) {
HoodieParquetInputFormat input = createParquetRealtimeInputFormat();
HoodieParquetRealtimeInputFormat input = createParquetRealtimeInputFormat();
input.setConf(job);
splits = input.getSplits(job, numSplits);
} else {
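One reading of the two declaration changes in this shim (an assumption, not stated in the diff): after the refactor, the realtime format presumably no longer extends HoodieParquetInputFormat, so a variable that used to hold either format must be declared with the new common base class, while realtime-only paths can use the concrete realtime type. Condensed sketch of the resulting declarations:

// Sketch, condensed from the two hunks above:
HoodieFileInputFormatBase listingInput;          // can hold either concrete format
HoodieParquetRealtimeInputFormat realtimeInput;  // realtime-only code path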