diff --git a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HoodieCopyOnWriteTableInputFormat.java b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HoodieCopyOnWriteTableInputFormat.java
index 140e7ff5b6330..ed9e16c8d0a7c 100644
--- a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HoodieCopyOnWriteTableInputFormat.java
+++ b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HoodieCopyOnWriteTableInputFormat.java
@@ -18,42 +18,38 @@
 package org.apache.hudi.hadoop;
 
-import org.apache.avro.Schema;
-import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.ArrayWritable;
-import org.apache.hadoop.io.NullWritable;
-import org.apache.hadoop.mapred.FileInputFormat;
-import org.apache.hadoop.mapred.FileSplit;
-import org.apache.hadoop.mapred.InputSplit;
-import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.mapred.RecordReader;
-import org.apache.hadoop.mapred.Reporter;
-import org.apache.hadoop.mapreduce.Job;
 import org.apache.hudi.common.config.TypedProperties;
 import org.apache.hudi.common.engine.HoodieLocalEngineContext;
 import org.apache.hudi.common.model.FileSlice;
 import org.apache.hudi.common.model.HoodieBaseFile;
 import org.apache.hudi.common.model.HoodieLogFile;
 import org.apache.hudi.common.model.HoodieTableQueryType;
-import org.apache.hudi.common.table.HoodieTableConfig;
 import org.apache.hudi.common.table.HoodieTableMetaClient;
-import org.apache.hudi.common.table.TableSchemaResolver;
 import org.apache.hudi.common.table.timeline.HoodieInstant;
 import org.apache.hudi.common.table.timeline.HoodieTimeline;
 import org.apache.hudi.common.util.CollectionUtils;
 import org.apache.hudi.common.util.Option;
-import org.apache.hudi.common.util.StringUtils;
-import org.apache.hudi.exception.HoodieException;
 import org.apache.hudi.exception.HoodieIOException;
-import org.apache.hudi.hadoop.realtime.HoodieVirtualKeyInfo;
 import org.apache.hudi.hadoop.utils.HoodieHiveUtils;
 import org.apache.hudi.hadoop.utils.HoodieInputFormatUtils;
+
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.ArrayWritable;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.mapred.FileInputFormat;
+import org.apache.hadoop.mapred.FileSplit;
+import org.apache.hadoop.mapred.InputSplit;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.RecordReader;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.mapreduce.Job;
 import org.apache.log4j.LogManager;
 import org.apache.log4j.Logger;
 
 import javax.annotation.Nonnull;
+
 import java.io.IOException;
 import java.io.UnsupportedEncodingException;
 import java.util.ArrayList;
@@ -190,7 +186,7 @@ protected List<FileStatus> listStatusForIncrementalMode(JobConf job,
     return HoodieInputFormatUtils.filterIncrementalFileStatus(jobContext, tableMetaClient, timeline.get(), fileStatuses, commitsToCheck.get());
   }
 
-  protected FileStatus createFileStatusUnchecked(FileSlice fileSlice, HiveHoodieTableFileIndex fileIndex, Option<HoodieVirtualKeyInfo> virtualKeyInfoOpt) {
+  protected FileStatus createFileStatusUnchecked(FileSlice fileSlice, HiveHoodieTableFileIndex fileIndex, HoodieTableMetaClient metaClient) {
     Option<HoodieBaseFile> baseFileOpt = fileSlice.getBaseFile();
 
     if (baseFileOpt.isPresent()) {
@@ -215,7 +211,7 @@ private BootstrapBaseFileSplit makeExternalFileSplit(PathWithBootstrapFileStatus
 
   @Nonnull
   private List<FileStatus> listStatusForSnapshotMode(JobConf job, Map<String, HoodieTableMetaClient> tableMetaClientMap,
-                                                     List<Path> snapshotPaths) throws IOException {
+                                                     List<Path> snapshotPaths) {
     HoodieLocalEngineContext engineContext = new HoodieLocalEngineContext(job);
     List<FileStatus> targetFiles = new ArrayList<>();
 
@@ -248,14 +244,12 @@ private List<FileStatus> listStatusForSnapshotMode(JobConf job, Map
       Map<String, List<FileSlice>> partitionedFileSlices = fileIndex.listFileSlices();
 
-      Option<HoodieVirtualKeyInfo> virtualKeyInfoOpt = getHoodieVirtualKeyInfo(tableMetaClient);
-
       targetFiles.addAll(
           partitionedFileSlices.values()
               .stream()
               .flatMap(Collection::stream)
               .filter(fileSlice -> checkIfValidFileSlice(fileSlice))
-              .map(fileSlice -> createFileStatusUnchecked(fileSlice, fileIndex, virtualKeyInfoOpt))
+              .map(fileSlice -> createFileStatusUnchecked(fileSlice, fileIndex, tableMetaClient))
               .collect(Collectors.toList())
       );
     }
@@ -290,24 +284,4 @@ protected static FileStatus getFileStatusUnchecked(HoodieBaseFile baseFile) {
       throw new HoodieIOException("Failed to get file-status", ioe);
     }
   }
-
-  protected static Option<HoodieVirtualKeyInfo> getHoodieVirtualKeyInfo(HoodieTableMetaClient metaClient) {
-    HoodieTableConfig tableConfig = metaClient.getTableConfig();
-    if (tableConfig.populateMetaFields()) {
-      return Option.empty();
-    }
-    TableSchemaResolver tableSchemaResolver = new TableSchemaResolver(metaClient);
-    try {
-      Schema schema = tableSchemaResolver.getTableAvroSchema();
-      boolean isNonPartitionedKeyGen = StringUtils.isNullOrEmpty(tableConfig.getPartitionFieldProp());
-      return Option.of(
-          new HoodieVirtualKeyInfo(
-              tableConfig.getRecordKeyFieldProp(),
-              isNonPartitionedKeyGen ? Option.empty() : Option.of(tableConfig.getPartitionFieldProp()),
-              schema.getField(tableConfig.getRecordKeyFieldProp()).pos(),
-              isNonPartitionedKeyGen ? Option.empty() : Option.of(schema.getField(tableConfig.getPartitionFieldProp()).pos())));
-    } catch (Exception exception) {
-      throw new HoodieException("Fetching table schema failed with exception ", exception);
-    }
-  }
 }
diff --git a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/HoodieMergeOnReadTableInputFormat.java b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/HoodieMergeOnReadTableInputFormat.java
index 95a1a74b65b91..014548401719c 100644
--- a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/HoodieMergeOnReadTableInputFormat.java
+++ b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/HoodieMergeOnReadTableInputFormat.java
@@ -18,27 +18,20 @@
 package org.apache.hudi.hadoop.realtime;
 
-import org.apache.hadoop.conf.Configurable;
-import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.mapred.FileInputFormat;
-import org.apache.hadoop.mapred.FileSplit;
-import org.apache.hadoop.mapred.InputSplit;
-import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.mapred.SplitLocationInfo;
-import org.apache.hadoop.mapreduce.Job;
 import org.apache.hudi.common.model.FileSlice;
 import org.apache.hudi.common.model.HoodieBaseFile;
 import org.apache.hudi.common.model.HoodieCommitMetadata;
 import org.apache.hudi.common.model.HoodieFileGroup;
 import org.apache.hudi.common.model.HoodieLogFile;
+import org.apache.hudi.common.table.HoodieTableConfig;
 import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.TableSchemaResolver;
 import org.apache.hudi.common.table.timeline.HoodieInstant;
 import org.apache.hudi.common.table.timeline.HoodieTimeline;
 import org.apache.hudi.common.table.timeline.TimelineUtils;
 import org.apache.hudi.common.table.view.HoodieTableFileSystemView;
 import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.StringUtils;
 import org.apache.hudi.exception.HoodieException;
 import org.apache.hudi.exception.HoodieIOException;
 import org.apache.hudi.hadoop.BootstrapBaseFileSplit;
@@ -50,6 +43,18 @@ import org.apache.hudi.hadoop.utils.HoodieInputFormatUtils;
 import org.apache.hudi.hadoop.utils.HoodieRealtimeInputFormatUtils;
 
+import org.apache.avro.Schema;
+import org.apache.hadoop.conf.Configurable;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapred.FileInputFormat;
+import org.apache.hadoop.mapred.FileSplit;
+import org.apache.hadoop.mapred.InputSplit;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.SplitLocationInfo;
+import org.apache.hadoop.mapreduce.Job;
+
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Arrays;
@@ -86,7 +91,7 @@ public InputSplit[] getSplits(JobConf job, int numSplits) throws IOException {
   }
 
   @Override
-  protected FileStatus createFileStatusUnchecked(FileSlice fileSlice, HiveHoodieTableFileIndex fileIndex, Option<HoodieVirtualKeyInfo> virtualKeyInfoOpt) {
+  protected FileStatus createFileStatusUnchecked(FileSlice fileSlice, HiveHoodieTableFileIndex fileIndex, HoodieTableMetaClient metaClient) {
     Option<HoodieBaseFile> baseFileOpt = fileSlice.getBaseFile();
     Option<HoodieLogFile> latestLogFileOpt = fileSlice.getLatestLogFile();
     Stream<HoodieLogFile> logFiles = fileSlice.getLogFiles();
@@ -96,9 +101,9 @@ protected FileStatus createFileStatusUnchecked(FileSlice fileSlice, HiveHoodieTa
 
     // Check if we're reading a MOR table
     if (baseFileOpt.isPresent()) {
-      return createRealtimeFileStatusUnchecked(baseFileOpt.get(), logFiles, tableBasePath, latestCompletedInstantOpt, virtualKeyInfoOpt);
+      return createRealtimeFileStatusUnchecked(baseFileOpt.get(), logFiles, tableBasePath, latestCompletedInstantOpt, getHoodieVirtualKeyInfo(metaClient));
     } else if (latestLogFileOpt.isPresent()) {
-      return createRealtimeFileStatusUnchecked(latestLogFileOpt.get(), logFiles, tableBasePath, latestCompletedInstantOpt, virtualKeyInfoOpt);
+      return createRealtimeFileStatusUnchecked(latestLogFileOpt.get(), logFiles, tableBasePath, latestCompletedInstantOpt, getHoodieVirtualKeyInfo(metaClient));
     } else {
       throw new IllegalStateException("Invalid state: either base-file or log-file has to be present");
     }
@@ -384,5 +389,24 @@ private static RealtimeFileStatus createRealtimeFileStatusUnchecked(HoodieLogFil
       throw new HoodieIOException(String.format("Failed to init %s", RealtimeFileStatus.class.getSimpleName()), e);
     }
   }
-}
+
+  private static Option<HoodieVirtualKeyInfo> getHoodieVirtualKeyInfo(HoodieTableMetaClient metaClient) {
+    HoodieTableConfig tableConfig = metaClient.getTableConfig();
+    if (tableConfig.populateMetaFields()) {
+      return Option.empty();
+    }
+    TableSchemaResolver tableSchemaResolver = new TableSchemaResolver(metaClient);
+    try {
+      Schema schema = tableSchemaResolver.getTableAvroSchema();
+      boolean isNonPartitionedKeyGen = StringUtils.isNullOrEmpty(tableConfig.getPartitionFieldProp());
+      return Option.of(
+          new HoodieVirtualKeyInfo(
+              tableConfig.getRecordKeyFieldProp(),
+              isNonPartitionedKeyGen ? Option.empty() : Option.of(tableConfig.getPartitionFieldProp()),
+              schema.getField(tableConfig.getRecordKeyFieldProp()).pos(),
+              isNonPartitionedKeyGen ? Option.empty() : Option.of(schema.getField(tableConfig.getPartitionFieldProp()).pos())));
+    } catch (Exception exception) {
+      throw new HoodieException("Fetching table schema failed with exception ", exception);
+    }
+  }
+}
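
Taken together, the two files show the shape of the refactor: `createFileStatusUnchecked` now receives the `HoodieTableMetaClient` instead of a precomputed `Option<HoodieVirtualKeyInfo>`, and `getHoodieVirtualKeyInfo` moves from the copy-on-write input format into the merge-on-read one, where the key info is resolved on demand. The sketch below illustrates how a downstream subclass might override the new signature; the `AuditingHoodieInputFormat` class and its logging are hypothetical and not part of this patch.

```java
package org.example.hudi;

import org.apache.hadoop.fs.FileStatus;
import org.apache.hudi.common.model.FileSlice;
import org.apache.hudi.common.table.HoodieTableConfig;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.hadoop.HiveHoodieTableFileIndex;
import org.apache.hudi.hadoop.HoodieCopyOnWriteTableInputFormat;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;

/**
 * Hypothetical subclass illustrating the refactored contract: the override receives
 * the HoodieTableMetaClient directly and can consult the table config itself, instead
 * of being handed a precomputed Option<HoodieVirtualKeyInfo> by the base class.
 */
public class AuditingHoodieInputFormat extends HoodieCopyOnWriteTableInputFormat {

  private static final Logger LOG = LogManager.getLogger(AuditingHoodieInputFormat.class);

  @Override
  protected FileStatus createFileStatusUnchecked(FileSlice fileSlice,
                                                 HiveHoodieTableFileIndex fileIndex,
                                                 HoodieTableMetaClient metaClient) {
    // Table-level metadata (e.g. whether meta fields are populated) is now visible to
    // the override; virtual-key resolution is no longer threaded through the call.
    HoodieTableConfig tableConfig = metaClient.getTableConfig();
    LOG.debug("Creating file status for " + metaClient.getBasePath()
        + ", populateMetaFields=" + tableConfig.populateMetaFields());
    return super.createFileStatusUnchecked(fileSlice, fileIndex, metaClient);
  }
}
```

One consequence visible on the merge-on-read side is that `getHoodieVirtualKeyInfo(metaClient)` is now evaluated per file slice rather than once per snapshot listing; the extra cost is limited to tables with virtual keys, since the method returns `Option.empty()` immediately when meta fields are populated.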