diff --git a/hudi-common/src/main/java/org/apache/hudi/BaseHoodieTableFileIndex.java b/hudi-common/src/main/java/org/apache/hudi/BaseHoodieTableFileIndex.java index d4bf2a0fd8fb7..428da925c49ea 100644 --- a/hudi-common/src/main/java/org/apache/hudi/BaseHoodieTableFileIndex.java +++ b/hudi-common/src/main/java/org/apache/hudi/BaseHoodieTableFileIndex.java @@ -36,6 +36,7 @@ import org.apache.hudi.common.table.view.HoodieTableFileSystemView; import org.apache.hudi.common.util.CollectionUtils; import org.apache.hudi.common.util.Option; +import org.apache.hudi.exception.HoodieIOException; import org.apache.log4j.LogManager; import org.apache.log4j.Logger; @@ -74,6 +75,7 @@ public abstract class BaseHoodieTableFileIndex { protected final List queryPaths; private final boolean shouldIncludePendingCommits; + private final boolean shouldValidateInstant; private final HoodieTableType tableType; protected final String basePath; @@ -98,6 +100,7 @@ public abstract class BaseHoodieTableFileIndex { * @param queryPaths target DFS paths being queried * @param specifiedQueryInstant instant as of which table is being queried * @param shouldIncludePendingCommits flags whether file-index should exclude any pending operations + * @param shouldValidateInstant flags to validate whether query instant is present in the timeline * @param fileStatusCache transient cache of fetched [[FileStatus]]es */ public BaseHoodieTableFileIndex(HoodieEngineContext engineContext, @@ -107,6 +110,7 @@ public BaseHoodieTableFileIndex(HoodieEngineContext engineContext, List queryPaths, Option specifiedQueryInstant, boolean shouldIncludePendingCommits, + boolean shouldValidateInstant, FileStatusCache fileStatusCache) { this.partitionColumns = metaClient.getTableConfig().getPartitionFields() .orElse(new String[0]); @@ -122,6 +126,7 @@ public BaseHoodieTableFileIndex(HoodieEngineContext engineContext, this.queryPaths = queryPaths; this.specifiedQueryInstant = specifiedQueryInstant; this.shouldIncludePendingCommits = shouldIncludePendingCommits; + this.shouldValidateInstant = shouldValidateInstant; this.tableType = metaClient.getTableType(); this.basePath = metaClient.getBasePath(); @@ -142,6 +147,13 @@ public Option getLatestCompletedInstant() { return getActiveTimeline().filterCompletedInstants().lastInstant(); } + /** + * Returns table's base-path + */ + public String getBasePath() { + return metaClient.getBasePath(); + } + /** * Fetch list of latest base files and log files per partition. 
* @@ -264,6 +276,8 @@ private void doRefresh() { Option queryInstant = specifiedQueryInstant.or(() -> latestInstant.map(HoodieInstant::getTimestamp)); + validate(activeTimeline, queryInstant); + if (tableType.equals(HoodieTableType.MERGE_ON_READ) && queryType.equals(HoodieTableQueryType.SNAPSHOT)) { cachedAllInputFileSlices = partitionFiles.keySet().stream() .collect(Collectors.toMap( @@ -277,15 +291,15 @@ private void doRefresh() { ) ); } else { - // TODO re-align with the branch (MOR, snapshot) branch cachedAllInputFileSlices = partitionFiles.keySet().stream() .collect(Collectors.toMap( Function.identity(), partitionPath -> - specifiedQueryInstant.map(instant -> - fileSystemView.getLatestFileSlicesBeforeOrOn(partitionPath.path, instant, true)) - .orElse(fileSystemView.getLatestFileSlices(partitionPath.path)) - .collect(Collectors.toList()) + queryInstant.map(instant -> + fileSystemView.getLatestFileSlicesBeforeOrOn(partitionPath.path, instant, true) + ) + .orElse(fileSystemView.getLatestFileSlices(partitionPath.path)) + .collect(Collectors.toList()) ) ); } @@ -303,6 +317,14 @@ private void doRefresh() { LOG.info(String.format("Refresh table %s, spent: %d ms", metaClient.getTableConfig().getTableName(), duration)); } + private void validate(HoodieTimeline activeTimeline, Option queryInstant) { + if (shouldValidateInstant) { + if (queryInstant.isPresent() && !activeTimeline.containsInstant(queryInstant.get())) { + throw new HoodieIOException(String.format("Query instant (%s) not found in the timeline", queryInstant.get())); + } + } + } + private static long fileSliceSize(FileSlice fileSlice) { long logFileSize = fileSlice.getLogFiles().map(HoodieLogFile::getFileSize) .filter(s -> s > 0) diff --git a/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieCommitMetadata.java b/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieCommitMetadata.java index d693d91f676fc..f2679a41c8bc0 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieCommitMetadata.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieCommitMetadata.java @@ -23,6 +23,7 @@ import com.fasterxml.jackson.annotation.PropertyAccessor; import com.fasterxml.jackson.databind.DeserializationFeature; import com.fasterxml.jackson.databind.ObjectMapper; +import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.Path; import org.apache.hudi.common.fs.FSUtils; @@ -147,10 +148,12 @@ public Map getFileGroupIdAndFullPaths(String basePath * been touched multiple times in the given commits, the return value will keep the one * from the latest commit. * + * + * @param hadoopConf * @param basePath The base path * @return the file full path to file status mapping */ - public Map getFullPathToFileStatus(String basePath) { + public Map getFullPathToFileStatus(Configuration hadoopConf, String basePath) { Map fullPathToFileStatus = new HashMap<>(); for (List stats : getPartitionToWriteStats().values()) { // Iterate through all the written files. @@ -158,7 +161,8 @@ public Map getFullPathToFileStatus(String basePath) { String relativeFilePath = stat.getPath(); Path fullPath = relativeFilePath != null ? 
FSUtils.getPartitionPath(basePath, relativeFilePath) : null; if (fullPath != null) { - FileStatus fileStatus = new FileStatus(stat.getFileSizeInBytes(), false, 0, 0, + long blockSize = FSUtils.getFs(fullPath.toString(), hadoopConf).getDefaultBlockSize(fullPath); + FileStatus fileStatus = new FileStatus(stat.getFileSizeInBytes(), false, 0, blockSize, 0, fullPath); fullPathToFileStatus.put(fullPath.getName(), fileStatus); } @@ -172,14 +176,16 @@ public Map getFullPathToFileStatus(String basePath) { * been touched multiple times in the given commits, the return value will keep the one * from the latest commit by file group ID. * - *

<p>Note: different with {@link #getFullPathToFileStatus(String)},
+ * <p>
Note: different with {@link #getFullPathToFileStatus(Configuration, String)}, * only the latest commit file for a file group is returned, * this is an optimization for COPY_ON_WRITE table to eliminate legacy files for filesystem view. * + * + * @param hadoopConf * @param basePath The base path * @return the file ID to file status mapping */ - public Map getFileIdToFileStatus(String basePath) { + public Map getFileIdToFileStatus(Configuration hadoopConf, String basePath) { Map fileIdToFileStatus = new HashMap<>(); for (List stats : getPartitionToWriteStats().values()) { // Iterate through all the written files. diff --git a/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/profile/WriteProfiles.java b/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/profile/WriteProfiles.java index e8aafd830f10f..405522802c368 100644 --- a/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/profile/WriteProfiles.java +++ b/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/profile/WriteProfiles.java @@ -18,6 +18,10 @@ package org.apache.hudi.sink.partitioner.profile; +import org.apache.flink.core.fs.Path; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.FileSystem; import org.apache.hudi.client.common.HoodieFlinkEngineContext; import org.apache.hudi.common.fs.FSUtils; import org.apache.hudi.common.model.HoodieCommitMetadata; @@ -29,11 +33,6 @@ import org.apache.hudi.exception.HoodieException; import org.apache.hudi.hadoop.utils.HoodieInputFormatUtils; import org.apache.hudi.util.StreamerUtil; - -import org.apache.flink.core.fs.Path; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.FileStatus; -import org.apache.hadoop.fs.FileSystem; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -117,7 +116,7 @@ private static Map getFilesToReadOfInstant( HoodieCommitMetadata metadata, FileSystem fs, HoodieTableType tableType) { - return getFilesToRead(metadata, basePath.toString(), tableType).entrySet().stream() + return getFilesToRead(fs.getConf(), metadata, basePath.toString(), tableType).entrySet().stream() // filter out the file paths that does not exist, some files may be cleaned by // the cleaner. .filter(entry -> { @@ -133,14 +132,16 @@ private static Map getFilesToReadOfInstant( } private static Map getFilesToRead( + Configuration hadoopConf, HoodieCommitMetadata metadata, String basePath, - HoodieTableType tableType) { + HoodieTableType tableType + ) { switch (tableType) { case COPY_ON_WRITE: - return metadata.getFileIdToFileStatus(basePath); + return metadata.getFileIdToFileStatus(hadoopConf, basePath); case MERGE_ON_READ: - return metadata.getFullPathToFileStatus(basePath); + return metadata.getFullPathToFileStatus(hadoopConf, basePath); default: throw new AssertionError(); } diff --git a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/BaseFileWithLogsSplit.java b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/BaseFileWithLogsSplit.java deleted file mode 100644 index d0b168f29f75e..0000000000000 --- a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/BaseFileWithLogsSplit.java +++ /dev/null @@ -1,123 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. 
The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.hudi.hadoop; - -import org.apache.hudi.common.model.HoodieLogFile; - -import org.apache.hadoop.fs.Path; -import org.apache.hadoop.io.Text; -import org.apache.hadoop.mapred.FileSplit; - -import java.io.DataInput; -import java.io.DataOutput; -import java.io.IOException; -import java.util.ArrayList; -import java.util.List; - -/** - * Encode additional information in split to track matching log file and base files. - * Hence, this class tracks a log/base file split. - */ -public class BaseFileWithLogsSplit extends FileSplit { - // a flag to mark this split is produced by incremental query or not. - private boolean belongsToIncrementalQuery = false; - // the log file paths of this split. - private List deltaLogFiles = new ArrayList<>(); - // max commit time of current split. - private String maxCommitTime = ""; - // the basePath of current hoodie table. - private String basePath = ""; - // the base file belong to this split. - private String baseFilePath = ""; - - public BaseFileWithLogsSplit(Path file, long start, long length, String[] hosts) { - super(file, start, length, hosts); - } - - @Override - public void write(DataOutput out) throws IOException { - super.write(out); - out.writeBoolean(belongsToIncrementalQuery); - Text.writeString(out, maxCommitTime); - Text.writeString(out, basePath); - Text.writeString(out, baseFilePath); - out.writeInt(deltaLogFiles.size()); - for (HoodieLogFile logFile : deltaLogFiles) { - Text.writeString(out, logFile.getPath().toString()); - out.writeLong(logFile.getFileSize()); - } - } - - @Override - public void readFields(DataInput in) throws IOException { - super.readFields(in); - belongsToIncrementalQuery = in.readBoolean(); - maxCommitTime = Text.readString(in); - basePath = Text.readString(in); - baseFilePath = Text.readString(in); - int deltaLogSize = in.readInt(); - List tempDeltaLogs = new ArrayList<>(); - for (int i = 0; i < deltaLogSize; i++) { - String logPath = Text.readString(in); - long logFileSize = in.readLong(); - tempDeltaLogs.add(new HoodieLogFile(new Path(logPath), logFileSize)); - } - deltaLogFiles = tempDeltaLogs; - } - - public boolean getBelongsToIncrementalQuery() { - return belongsToIncrementalQuery; - } - - public void setBelongsToIncrementalQuery(boolean belongsToIncrementalQuery) { - this.belongsToIncrementalQuery = belongsToIncrementalQuery; - } - - public List getDeltaLogFiles() { - return deltaLogFiles; - } - - public void setDeltaLogFiles(List deltaLogFiles) { - this.deltaLogFiles = deltaLogFiles; - } - - public String getMaxCommitTime() { - return maxCommitTime; - } - - public void setMaxCommitTime(String maxCommitTime) { - this.maxCommitTime = maxCommitTime; - } - - public String getBasePath() { - return basePath; - } - - public void setBasePath(String basePath) { - this.basePath = basePath; - } - - public String getBaseFilePath() { - return baseFilePath; - } - - public void setBaseFilePath(String baseFilePath) { - 
this.baseFilePath = baseFilePath; - } -} diff --git a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/BootstrapBaseFileSplit.java b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/BootstrapBaseFileSplit.java index 1a609e042c854..6db1751771904 100644 --- a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/BootstrapBaseFileSplit.java +++ b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/BootstrapBaseFileSplit.java @@ -36,9 +36,7 @@ public class BootstrapBaseFileSplit extends FileSplit { * NOTE: This ctor is necessary for Hive to be able to serialize and * then instantiate it when deserializing back */ - public BootstrapBaseFileSplit() { - super(); - } + public BootstrapBaseFileSplit() {} public BootstrapBaseFileSplit(FileSplit baseSplit, FileSplit bootstrapFileSplit) throws IOException { diff --git a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HiveHoodieTableFileIndex.java b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HiveHoodieTableFileIndex.java index 176c1c5e5ca56..000fce5e8fbff 100644 --- a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HiveHoodieTableFileIndex.java +++ b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HiveHoodieTableFileIndex.java @@ -53,6 +53,7 @@ public HiveHoodieTableFileIndex(HoodieEngineContext engineContext, queryPaths, specifiedQueryInstant, shouldIncludePendingCommits, + true, new NoopCache()); } diff --git a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HoodieCopyOnWriteTableInputFormat.java b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HoodieCopyOnWriteTableInputFormat.java index fd5ef8da781dd..2b8dae255e3c4 100644 --- a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HoodieCopyOnWriteTableInputFormat.java +++ b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HoodieCopyOnWriteTableInputFormat.java @@ -18,8 +18,6 @@ package org.apache.hudi.hadoop; -import org.apache.hadoop.conf.Configurable; -import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; @@ -36,16 +34,20 @@ import org.apache.hudi.common.engine.HoodieLocalEngineContext; import org.apache.hudi.common.model.FileSlice; import org.apache.hudi.common.model.HoodieBaseFile; -import org.apache.hudi.common.model.HoodieLogFile; import org.apache.hudi.common.model.HoodieTableQueryType; +import org.apache.hudi.common.table.HoodieTableConfig; import org.apache.hudi.common.table.HoodieTableMetaClient; +import org.apache.hudi.common.table.TableSchemaResolver; import org.apache.hudi.common.table.timeline.HoodieInstant; import org.apache.hudi.common.table.timeline.HoodieTimeline; import org.apache.hudi.common.util.CollectionUtils; import org.apache.hudi.common.util.Option; +import org.apache.hudi.exception.HoodieException; import org.apache.hudi.exception.HoodieIOException; +import org.apache.hudi.hadoop.realtime.HoodieVirtualKeyInfo; import org.apache.hudi.hadoop.utils.HoodieHiveUtils; import org.apache.hudi.hadoop.utils.HoodieInputFormatUtils; +import org.apache.parquet.schema.MessageType; import javax.annotation.Nonnull; import java.io.IOException; @@ -57,7 +59,6 @@ import java.util.Map; import java.util.Properties; import java.util.stream.Collectors; -import java.util.stream.Stream; import static org.apache.hudi.common.util.ValidationUtils.checkState; @@ -73,20 +74,7 @@ * * NOTE: This class is invariant of the underlying file-format of the files being read */ -public class HoodieCopyOnWriteTableInputFormat extends FileInputFormat - implements 
Configurable { - - protected Configuration conf; - - @Override - public final Configuration getConf() { - return conf; - } - - @Override - public final void setConf(Configuration conf) { - this.conf = conf; - } +public class HoodieCopyOnWriteTableInputFormat extends HoodieTableInputFormat { @Override protected boolean isSplitable(FileSystem fs, Path filename) { @@ -159,10 +147,6 @@ public RecordReader getRecordReader(InputSplit spli throw new UnsupportedEncodingException("not implemented"); } - protected boolean includeLogFilesForSnapshotView() { - return false; - } - /** * Abstracts and exposes {@link FileInputFormat#listStatus(JobConf)} operation to subclasses that * lists files (returning an array of {@link FileStatus}) corresponding to the input paths specified @@ -200,6 +184,16 @@ protected List listStatusForIncrementalMode(JobConf job, return HoodieInputFormatUtils.filterIncrementalFileStatus(jobContext, tableMetaClient, timeline.get(), fileStatuses, commitsToCheck.get()); } + protected FileStatus createFileStatusUnchecked(FileSlice fileSlice, HiveHoodieTableFileIndex fileIndex, Option virtualKeyInfoOpt) { + Option baseFileOpt = fileSlice.getBaseFile(); + + if (baseFileOpt.isPresent()) { + return getFileStatusUnchecked(baseFileOpt.get()); + } else { + throw new IllegalStateException("Invalid state: base-file has to be present"); + } + } + private BootstrapBaseFileSplit makeExternalFileSplit(PathWithBootstrapFileStatus file, FileSplit split) { try { LOG.info("Making external data split for " + file); @@ -212,11 +206,6 @@ private BootstrapBaseFileSplit makeExternalFileSplit(PathWithBootstrapFileStatus } } - @Nonnull - private List listStatusForSnapshotModeLegacy(JobConf job, Map tableMetaClientMap, List snapshotPaths) throws IOException { - return HoodieInputFormatUtils.filterFileStatusForSnapshotMode(job, tableMetaClientMap, snapshotPaths, includeLogFilesForSnapshotView()); - } - @Nonnull private List listStatusForSnapshotMode(JobConf job, Map tableMetaClientMap, @@ -253,42 +242,17 @@ private List listStatusForSnapshotMode(JobConf job, Map> partitionedFileSlices = fileIndex.listFileSlices(); + Option virtualKeyInfoOpt = getHoodieVirtualKeyInfo(tableMetaClient); + targetFiles.addAll( partitionedFileSlices.values() .stream() .flatMap(Collection::stream) - .map(fileSlice -> { - Option baseFileOpt = fileSlice.getBaseFile(); - Option latestLogFileOpt = fileSlice.getLatestLogFile(); - Stream logFiles = fileSlice.getLogFiles(); - - Option latestCompletedInstantOpt = fileIndex.getLatestCompletedInstant(); - - // Check if we're reading a MOR table - if (includeLogFilesForSnapshotView()) { - if (baseFileOpt.isPresent()) { - return createRealtimeFileStatusUnchecked(baseFileOpt.get(), logFiles, latestCompletedInstantOpt, tableMetaClient); - } else if (latestLogFileOpt.isPresent()) { - return createRealtimeFileStatusUnchecked(latestLogFileOpt.get(), logFiles, latestCompletedInstantOpt, tableMetaClient); - } else { - throw new IllegalStateException("Invalid state: either base-file or log-file has to be present"); - } - } else { - if (baseFileOpt.isPresent()) { - return getFileStatusUnchecked(baseFileOpt.get()); - } else { - throw new IllegalStateException("Invalid state: base-file has to be present"); - } - } - - }) + .map(fileSlice -> createFileStatusUnchecked(fileSlice, fileIndex, virtualKeyInfoOpt)) .collect(Collectors.toList()) ); } - // TODO(HUDI-3280) cleanup - validate(targetFiles, listStatusForSnapshotModeLegacy(job, tableMetaClientMap, snapshotPaths)); - return targetFiles; } @@ -298,7 
+262,7 @@ private void validate(List targetFiles, List legacyFileS } @Nonnull - private static FileStatus getFileStatusUnchecked(HoodieBaseFile baseFile) { + protected static FileStatus getFileStatusUnchecked(HoodieBaseFile baseFile) { try { return HoodieInputFormatUtils.getFileStatus(baseFile); } catch (IOException ioe) { @@ -306,57 +270,20 @@ private static FileStatus getFileStatusUnchecked(HoodieBaseFile baseFile) { } } - @Nonnull - private static RealtimeFileStatus createRealtimeFileStatusUnchecked(HoodieBaseFile baseFile, - Stream logFiles, - Option latestCompletedInstantOpt, - HoodieTableMetaClient tableMetaClient) { - List sortedLogFiles = logFiles.sorted(HoodieLogFile.getLogFileComparator()).collect(Collectors.toList()); - FileStatus baseFileStatus = getFileStatusUnchecked(baseFile); - try { - RealtimeFileStatus rtFileStatus = new RealtimeFileStatus(baseFileStatus); - rtFileStatus.setDeltaLogFiles(sortedLogFiles); - rtFileStatus.setBaseFilePath(baseFile.getPath()); - rtFileStatus.setBasePath(tableMetaClient.getBasePath()); - - if (latestCompletedInstantOpt.isPresent()) { - HoodieInstant latestCompletedInstant = latestCompletedInstantOpt.get(); - checkState(latestCompletedInstant.isCompleted()); - - rtFileStatus.setMaxCommitTime(latestCompletedInstant.getTimestamp()); - } - - if (baseFileStatus instanceof LocatedFileStatusWithBootstrapBaseFile || baseFileStatus instanceof FileStatusWithBootstrapBaseFile) { - rtFileStatus.setBootStrapFileStatus(baseFileStatus); - } - - return rtFileStatus; - } catch (IOException e) { - throw new HoodieIOException(String.format("Failed to init %s", RealtimeFileStatus.class.getSimpleName()), e); + protected static Option getHoodieVirtualKeyInfo(HoodieTableMetaClient metaClient) { + HoodieTableConfig tableConfig = metaClient.getTableConfig(); + if (tableConfig.populateMetaFields()) { + return Option.empty(); } - } - @Nonnull - private static RealtimeFileStatus createRealtimeFileStatusUnchecked(HoodieLogFile latestLogFile, - Stream logFiles, - Option latestCompletedInstantOpt, - HoodieTableMetaClient tableMetaClient) { - List sortedLogFiles = logFiles.sorted(HoodieLogFile.getLogFileComparator()).collect(Collectors.toList()); + TableSchemaResolver tableSchemaResolver = new TableSchemaResolver(metaClient); try { - RealtimeFileStatus rtFileStatus = new RealtimeFileStatus(latestLogFile.getFileStatus()); - rtFileStatus.setDeltaLogFiles(sortedLogFiles); - rtFileStatus.setBasePath(tableMetaClient.getBasePath()); - - if (latestCompletedInstantOpt.isPresent()) { - HoodieInstant latestCompletedInstant = latestCompletedInstantOpt.get(); - checkState(latestCompletedInstant.isCompleted()); - - rtFileStatus.setMaxCommitTime(latestCompletedInstant.getTimestamp()); - } - - return rtFileStatus; - } catch (IOException e) { - throw new HoodieIOException(String.format("Failed to init %s", RealtimeFileStatus.class.getSimpleName()), e); + MessageType parquetSchema = tableSchemaResolver.getTableParquetSchema(); + return Option.of(new HoodieVirtualKeyInfo(tableConfig.getRecordKeyFieldProp(), + tableConfig.getPartitionFieldProp(), parquetSchema.getFieldIndex(tableConfig.getRecordKeyFieldProp()), + parquetSchema.getFieldIndex(tableConfig.getPartitionFieldProp()))); + } catch (Exception exception) { + throw new HoodieException("Fetching table schema failed with exception ", exception); } } } diff --git a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HoodieHFileInputFormat.java b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HoodieHFileInputFormat.java index 
0f0f736243c33..6eb1663a0d12c 100644 --- a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HoodieHFileInputFormat.java +++ b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HoodieHFileInputFormat.java @@ -41,11 +41,6 @@ protected HoodieDefaultTimeline filterInstantsTimeline(HoodieDefaultTimeline tim return HoodieInputFormatUtils.filterInstantsTimeline(timeline); } - @Override - protected boolean includeLogFilesForSnapshotView() { - return false; - } - @Override public RecordReader getRecordReader(final InputSplit split, final JobConf job, final Reporter reporter) throws IOException { diff --git a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HoodieParquetInputFormatBase.java b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HoodieParquetInputFormatBase.java index 4f8d07bf8f5c6..ed88acacb4d2f 100644 --- a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HoodieParquetInputFormatBase.java +++ b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HoodieParquetInputFormatBase.java @@ -47,7 +47,7 @@ */ public abstract class HoodieParquetInputFormatBase extends MapredParquetInputFormat implements Configurable { - private final HoodieCopyOnWriteTableInputFormat inputFormatDelegate; + private final HoodieTableInputFormat inputFormatDelegate; protected HoodieParquetInputFormatBase(HoodieCopyOnWriteTableInputFormat inputFormatDelegate) { this.inputFormatDelegate = inputFormatDelegate; diff --git a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HoodieTableInputFormat.java b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HoodieTableInputFormat.java new file mode 100644 index 0000000000000..d18cb7895ad00 --- /dev/null +++ b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HoodieTableInputFormat.java @@ -0,0 +1,72 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hudi.hadoop; + +import org.apache.hadoop.conf.Configurable; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.io.ArrayWritable; +import org.apache.hadoop.io.NullWritable; +import org.apache.hadoop.mapred.FileInputFormat; +import org.apache.hadoop.mapred.FileSplit; +import org.apache.hadoop.mapred.JobConf; + +import java.io.IOException; + +/** + * Abstract base class of the Hive's {@link FileInputFormat} implementations allowing for reading of Hudi's + * Copy-on-Write (COW) and Merge-on-Read (MOR) tables + */ +public abstract class HoodieTableInputFormat extends FileInputFormat + implements Configurable { + + protected Configuration conf; + + @Override + public final Configuration getConf() { + return conf; + } + + @Override + public final void setConf(Configuration conf) { + this.conf = conf; + } + + @Override + protected boolean isSplitable(FileSystem fs, Path filename) { + return super.isSplitable(fs, filename); + } + + @Override + protected FileSplit makeSplit(Path file, long start, long length, String[] hosts) { + return super.makeSplit(file, start, length, hosts); + } + + @Override + protected FileSplit makeSplit(Path file, long start, long length, String[] hosts, String[] inMemoryHosts) { + return super.makeSplit(file, start, length, hosts, inMemoryHosts); + } + + @Override + protected FileStatus[] listStatus(JobConf job) throws IOException { + return super.listStatus(job); + } +} diff --git a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/PathWithLogFilePath.java b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/PathWithLogFilePath.java deleted file mode 100644 index 7983e09290180..0000000000000 --- a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/PathWithLogFilePath.java +++ /dev/null @@ -1,112 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.hudi.hadoop; - -import org.apache.hudi.common.model.HoodieLogFile; - -import org.apache.hadoop.fs.Path; - -import java.util.ArrayList; -import java.util.List; - -/** - * Encode additional information in Path to track matching log file and base files. - * Hence, this class tracks a log/base file status. - */ -public class PathWithLogFilePath extends Path { - // a flag to mark this split is produced by incremental query or not. - private boolean belongsToIncrementalPath = false; - // the log files belong this path. - private List deltaLogFiles = new ArrayList<>(); - // max commit time of current path. - private String maxCommitTime = ""; - // the basePath of current hoodie table. 
- private String basePath = ""; - // the base file belong to this path; - private String baseFilePath = ""; - // the bootstrap file belong to this path. - // only if current query table is bootstrap table, this field is used. - private PathWithBootstrapFileStatus pathWithBootstrapFileStatus; - - public PathWithLogFilePath(Path parent, String child) { - super(parent, child); - } - - public void setBelongsToIncrementalPath(boolean belongsToIncrementalPath) { - this.belongsToIncrementalPath = belongsToIncrementalPath; - } - - public List getDeltaLogFiles() { - return deltaLogFiles; - } - - public void setDeltaLogFiles(List deltaLogFiles) { - this.deltaLogFiles = deltaLogFiles; - } - - public String getMaxCommitTime() { - return maxCommitTime; - } - - public void setMaxCommitTime(String maxCommitTime) { - this.maxCommitTime = maxCommitTime; - } - - public String getBasePath() { - return basePath; - } - - public boolean getBelongsToIncrementalQuery() { - return belongsToIncrementalPath; - } - - public void setBasePath(String basePath) { - this.basePath = basePath; - } - - public void setBaseFilePath(String baseFilePath) { - this.baseFilePath = baseFilePath; - } - - public boolean splitable() { - return !baseFilePath.isEmpty(); - } - - public PathWithBootstrapFileStatus getPathWithBootstrapFileStatus() { - return pathWithBootstrapFileStatus; - } - - public void setPathWithBootstrapFileStatus(PathWithBootstrapFileStatus pathWithBootstrapFileStatus) { - this.pathWithBootstrapFileStatus = pathWithBootstrapFileStatus; - } - - public boolean includeBootstrapFilePath() { - return pathWithBootstrapFileStatus != null; - } - - public BaseFileWithLogsSplit buildSplit(Path file, long start, long length, String[] hosts) { - BaseFileWithLogsSplit bs = new BaseFileWithLogsSplit(file, start, length, hosts); - bs.setBelongsToIncrementalQuery(belongsToIncrementalPath); - bs.setDeltaLogFiles(deltaLogFiles); - bs.setMaxCommitTime(maxCommitTime); - bs.setBasePath(basePath); - bs.setBaseFilePath(baseFilePath); - return bs; - } -} diff --git a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/RealtimeFileStatus.java b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/RealtimeFileStatus.java index 1d732f5a612a3..641aa2759ff20 100644 --- a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/RealtimeFileStatus.java +++ b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/RealtimeFileStatus.java @@ -18,13 +18,14 @@ package org.apache.hudi.hadoop; -import org.apache.hudi.common.model.HoodieLogFile; - import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.Path; +import org.apache.hudi.common.model.HoodieLogFile; +import org.apache.hudi.common.util.Option; +import org.apache.hudi.hadoop.realtime.HoodieRealtimePath; +import org.apache.hudi.hadoop.realtime.HoodieVirtualKeyInfo; import java.io.IOException; -import java.util.ArrayList; import java.util.List; /** @@ -34,51 +35,62 @@ * in Path. */ public class RealtimeFileStatus extends FileStatus { - // a flag to mark this split is produced by incremental query or not. - private boolean belongToIncrementalFileStatus = false; - // the log files belong this fileStatus. - private List deltaLogFiles = new ArrayList<>(); - // max commit time of current fileStatus. 
+ /** + * Base path of the table this path belongs to + */ + private final String basePath; + /** + * List of delta log-files holding updated records for this base-file + */ + private final List deltaLogFiles; + /** + * Marks whether this path produced as part of Incremental Query + */ + private final boolean belongsToIncrementalQuery; + /** + * Latest commit instant available at the time of the query in which all of the files + * pertaining to this split are represented + */ private String maxCommitTime = ""; - // the basePath of current hoodie table. - private String basePath = ""; - // the base file belong to this status; - private String baseFilePath = ""; - // the bootstrap file belong to this status. - // only if current query table is bootstrap table, this field is used. + /** + * File status for the Bootstrap file (only relevant if this table is a bootstrapped table + */ private FileStatus bootStrapFileStatus; - - public RealtimeFileStatus(FileStatus fileStatus) throws IOException { + /** + * Virtual key configuration of the table this split belongs to + */ + private final Option virtualKeyInfo; + + public RealtimeFileStatus(FileStatus fileStatus, + String basePath, + List deltaLogFiles, + boolean belongsToIncrementalQuery, + Option virtualKeyInfo) throws IOException { super(fileStatus); + this.basePath = basePath; + this.deltaLogFiles = deltaLogFiles; + this.belongsToIncrementalQuery = belongsToIncrementalQuery; + this.virtualKeyInfo = virtualKeyInfo; } @Override public Path getPath() { Path path = super.getPath(); - PathWithLogFilePath pathWithLogFilePath = new PathWithLogFilePath(path.getParent(), path.getName()); - pathWithLogFilePath.setBelongsToIncrementalPath(belongToIncrementalFileStatus); - pathWithLogFilePath.setDeltaLogFiles(deltaLogFiles); - pathWithLogFilePath.setMaxCommitTime(maxCommitTime); - pathWithLogFilePath.setBasePath(basePath); - pathWithLogFilePath.setBaseFilePath(baseFilePath); + + HoodieRealtimePath realtimePath = new HoodieRealtimePath(path.getParent(), path.getName(), basePath, + deltaLogFiles, maxCommitTime, belongsToIncrementalQuery, virtualKeyInfo); + if (bootStrapFileStatus != null) { - pathWithLogFilePath.setPathWithBootstrapFileStatus((PathWithBootstrapFileStatus)bootStrapFileStatus.getPath()); + realtimePath.setPathWithBootstrapFileStatus((PathWithBootstrapFileStatus)bootStrapFileStatus.getPath()); } - return pathWithLogFilePath; - } - public void setBelongToIncrementalFileStatus(boolean belongToIncrementalFileStatus) { - this.belongToIncrementalFileStatus = belongToIncrementalFileStatus; + return realtimePath; } public List getDeltaLogFiles() { return deltaLogFiles; } - public void setDeltaLogFiles(List deltaLogFiles) { - this.deltaLogFiles = deltaLogFiles; - } - public String getMaxCommitTime() { return maxCommitTime; } @@ -87,18 +99,6 @@ public void setMaxCommitTime(String maxCommitTime) { this.maxCommitTime = maxCommitTime; } - public String getBasePath() { - return basePath; - } - - public void setBasePath(String basePath) { - this.basePath = basePath; - } - - public void setBaseFilePath(String baseFilePath) { - this.baseFilePath = baseFilePath; - } - public void setBootStrapFileStatus(FileStatus bootStrapFileStatus) { this.bootStrapFileStatus = bootStrapFileStatus; } diff --git a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/HoodieMergeOnReadTableInputFormat.java b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/HoodieMergeOnReadTableInputFormat.java index 7b482f4155de3..982d52b0d4807 100644 --- 
a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/HoodieMergeOnReadTableInputFormat.java +++ b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/HoodieMergeOnReadTableInputFormat.java @@ -26,8 +26,10 @@ import org.apache.hadoop.mapred.FileSplit; import org.apache.hadoop.mapred.InputSplit; import org.apache.hadoop.mapred.JobConf; +import org.apache.hadoop.mapred.SplitLocationInfo; import org.apache.hadoop.mapreduce.Job; import org.apache.hudi.common.model.FileSlice; +import org.apache.hudi.common.model.HoodieBaseFile; import org.apache.hudi.common.model.HoodieCommitMetadata; import org.apache.hudi.common.model.HoodieFileGroup; import org.apache.hudi.common.model.HoodieLogFile; @@ -37,11 +39,12 @@ import org.apache.hudi.common.table.view.HoodieTableFileSystemView; import org.apache.hudi.common.util.Option; import org.apache.hudi.exception.HoodieException; +import org.apache.hudi.exception.HoodieIOException; import org.apache.hudi.hadoop.BootstrapBaseFileSplit; import org.apache.hudi.hadoop.FileStatusWithBootstrapBaseFile; +import org.apache.hudi.hadoop.HiveHoodieTableFileIndex; import org.apache.hudi.hadoop.HoodieCopyOnWriteTableInputFormat; import org.apache.hudi.hadoop.LocatedFileStatusWithBootstrapBaseFile; -import org.apache.hudi.hadoop.PathWithLogFilePath; import org.apache.hudi.hadoop.RealtimeFileStatus; import org.apache.hudi.hadoop.utils.HoodieInputFormatUtils; import org.apache.hudi.hadoop.utils.HoodieRealtimeInputFormatUtils; @@ -53,6 +56,9 @@ import java.util.List; import java.util.Map; import java.util.stream.Collectors; +import java.util.stream.Stream; + +import static org.apache.hudi.common.util.ValidationUtils.checkState; /** * Base implementation of the Hive's {@link FileInputFormat} allowing for reading of Hudi's @@ -63,18 +69,38 @@ *

 *   <li>Incremental mode: reading table's state as of particular timestamp (or instant, in Hudi's terms)</li>
 *   <li>External mode: reading non-Hudi partitions</li>
 * </ul>
- *
+ *

    * NOTE: This class is invariant of the underlying file-format of the files being read */ public class HoodieMergeOnReadTableInputFormat extends HoodieCopyOnWriteTableInputFormat implements Configurable { @Override public InputSplit[] getSplits(JobConf job, int numSplits) throws IOException { - List fileSplits = Arrays.stream(super.getSplits(job, numSplits)).map(is -> (FileSplit) is).collect(Collectors.toList()); + List fileSplits = Arrays.stream(super.getSplits(job, numSplits)) + .map(is -> (FileSplit) is) + .collect(Collectors.toList()); + + return (containsIncrementalQuerySplits(fileSplits) ? filterIncrementalQueryFileSplits(fileSplits) : fileSplits) + .toArray(new FileSplit[0]); + } + + @Override + protected FileStatus createFileStatusUnchecked(FileSlice fileSlice, HiveHoodieTableFileIndex fileIndex, Option virtualKeyInfoOpt) { + Option baseFileOpt = fileSlice.getBaseFile(); + Option latestLogFileOpt = fileSlice.getLatestLogFile(); + Stream logFiles = fileSlice.getLogFiles(); - return HoodieRealtimeInputFormatUtils.isIncrementalQuerySplits(fileSplits) - ? HoodieRealtimeInputFormatUtils.getIncrementalRealtimeSplits(job, fileSplits) - : HoodieRealtimeInputFormatUtils.getRealtimeSplits(job, fileSplits); + Option latestCompletedInstantOpt = fileIndex.getLatestCompletedInstant(); + String tableBasePath = fileIndex.getBasePath(); + + // Check if we're reading a MOR table + if (baseFileOpt.isPresent()) { + return createRealtimeFileStatusUnchecked(baseFileOpt.get(), logFiles, tableBasePath, latestCompletedInstantOpt, virtualKeyInfoOpt); + } else if (latestLogFileOpt.isPresent()) { + return createRealtimeFileStatusUnchecked(latestLogFileOpt.get(), logFiles, tableBasePath, latestCompletedInstantOpt, virtualKeyInfoOpt); + } else { + throw new IllegalStateException("Invalid state: either base-file or log-file has to be present"); + } } /** @@ -126,7 +152,7 @@ protected List listStatusForIncrementalMode(JobConf job, // build fileGroup from fsView List affectedFileStatus = Arrays.asList(HoodieInputFormatUtils - .listAffectedFilesForCommits(new Path(tableMetaClient.getBasePath()), metadataList)); + .listAffectedFilesForCommits(job, new Path(tableMetaClient.getBasePath()), metadataList)); // step3 HoodieTableFileSystemView fsView = new HoodieTableFileSystemView(tableMetaClient, commitsTimelineToReturn, affectedFileStatus.toArray(new FileStatus[0])); // build fileGroup from fsView @@ -152,21 +178,17 @@ protected List listStatusForIncrementalMode(JobConf job, candidateFileStatus.put(key, fileStatuses[i]); } + Option virtualKeyInfoOpt = getHoodieVirtualKeyInfo(tableMetaClient); String maxCommitTime = fsView.getLastInstant().get().getTimestamp(); // step6 - result.addAll(collectAllIncrementalFiles(fileGroups, maxCommitTime, basePath.toString(), candidateFileStatus)); + result.addAll(collectAllIncrementalFiles(fileGroups, maxCommitTime, basePath.toString(), candidateFileStatus, virtualKeyInfoOpt)); return result; } - @Override - protected boolean includeLogFilesForSnapshotView() { - return true; - } - @Override protected boolean isSplitable(FileSystem fs, Path filename) { - if (filename instanceof PathWithLogFilePath) { - return ((PathWithLogFilePath)filename).splitable(); + if (filename instanceof HoodieRealtimePath) { + return ((HoodieRealtimePath) filename).isSplitable(); } return super.isSplitable(fs, filename); @@ -177,21 +199,26 @@ protected boolean isSplitable(FileSystem fs, Path filename) { // PathWithLogFilePath, so those bootstrap files should be processed int this function. 
@Override protected FileSplit makeSplit(Path file, long start, long length, String[] hosts) { - if (file instanceof PathWithLogFilePath) { - return doMakeSplitForPathWithLogFilePath((PathWithLogFilePath) file, start, length, hosts, null); + if (file instanceof HoodieRealtimePath) { + return doMakeSplitForRealtimePath((HoodieRealtimePath) file, start, length, hosts, null); } return super.makeSplit(file, start, length, hosts); } @Override protected FileSplit makeSplit(Path file, long start, long length, String[] hosts, String[] inMemoryHosts) { - if (file instanceof PathWithLogFilePath) { - return doMakeSplitForPathWithLogFilePath((PathWithLogFilePath) file, start, length, hosts, inMemoryHosts); + if (file instanceof HoodieRealtimePath) { + return doMakeSplitForRealtimePath((HoodieRealtimePath) file, start, length, hosts, inMemoryHosts); } return super.makeSplit(file, start, length, hosts, inMemoryHosts); } - private List collectAllIncrementalFiles(List fileGroups, String maxCommitTime, String basePath, Map candidateFileStatus) { + private static List collectAllIncrementalFiles(List fileGroups, + String maxCommitTime, + String basePath, + Map candidateFileStatus, + Option virtualKeyInfoOpt) { + List result = new ArrayList<>(); fileGroups.stream().forEach(f -> { try { @@ -202,15 +229,12 @@ private List collectAllIncrementalFiles(List fileGr if (!candidateFileStatus.containsKey(baseFilePath)) { throw new HoodieException("Error obtaining fileStatus for file: " + baseFilePath); } + List deltaLogFiles = f.getLatestFileSlice().get().getLogFiles().collect(Collectors.toList()); // We cannot use baseFileStatus.getPath() here, since baseFileStatus.getPath() missing file size information. // So we use candidateFileStatus.get(baseFileStatus.getPath()) to get a correct path. 
- RealtimeFileStatus fileStatus = new RealtimeFileStatus(candidateFileStatus.get(baseFilePath)); + RealtimeFileStatus fileStatus = new RealtimeFileStatus(candidateFileStatus.get(baseFilePath), + basePath, deltaLogFiles, true, virtualKeyInfoOpt); fileStatus.setMaxCommitTime(maxCommitTime); - fileStatus.setBelongToIncrementalFileStatus(true); - fileStatus.setBasePath(basePath); - fileStatus.setBaseFilePath(baseFilePath); - fileStatus.setDeltaLogFiles(f.getLatestFileSlice().get().getLogFiles().collect(Collectors.toList())); - // try to set bootstrapfileStatus if (baseFileStatus instanceof LocatedFileStatusWithBootstrapBaseFile || baseFileStatus instanceof FileStatusWithBootstrapBaseFile) { fileStatus.setBootStrapFileStatus(baseFileStatus); } @@ -220,11 +244,10 @@ private List collectAllIncrementalFiles(List fileGr if (f.getLatestFileSlice().isPresent() && baseFiles.isEmpty()) { List logFileStatus = f.getLatestFileSlice().get().getLogFiles().map(logFile -> logFile.getFileStatus()).collect(Collectors.toList()); if (logFileStatus.size() > 0) { - RealtimeFileStatus fileStatus = new RealtimeFileStatus(logFileStatus.get(0)); - fileStatus.setBelongToIncrementalFileStatus(true); - fileStatus.setDeltaLogFiles(logFileStatus.stream().map(l -> new HoodieLogFile(l.getPath(), l.getLen())).collect(Collectors.toList())); + List deltaLogFiles = logFileStatus.stream().map(l -> new HoodieLogFile(l.getPath(), l.getLen())).collect(Collectors.toList()); + RealtimeFileStatus fileStatus = new RealtimeFileStatus(logFileStatus.get(0), basePath, + deltaLogFiles, true, virtualKeyInfoOpt); fileStatus.setMaxCommitTime(maxCommitTime); - fileStatus.setBasePath(basePath); result.add(fileStatus); } } @@ -235,20 +258,117 @@ private List collectAllIncrementalFiles(List fileGr return result; } - private FileSplit doMakeSplitForPathWithLogFilePath(PathWithLogFilePath path, long start, long length, String[] hosts, String[] inMemoryHosts) { - if (!path.includeBootstrapFilePath()) { - return path.buildSplit(path, start, length, hosts); - } else { + private FileSplit doMakeSplitForRealtimePath(HoodieRealtimePath path, long start, long length, String[] hosts, String[] inMemoryHosts) { + if (path.includeBootstrapFilePath()) { FileSplit bf = inMemoryHosts == null ? 
super.makeSplit(path.getPathWithBootstrapFileStatus(), start, length, hosts) : super.makeSplit(path.getPathWithBootstrapFileStatus(), start, length, hosts, inMemoryHosts); - return HoodieRealtimeInputFormatUtils.createRealtimeBoostrapBaseFileSplit( + return createRealtimeBoostrapBaseFileSplit( (BootstrapBaseFileSplit) bf, path.getBasePath(), path.getDeltaLogFiles(), path.getMaxCommitTime(), - path.getBelongsToIncrementalQuery()); + path.getBelongsToIncrementalQuery(), + path.getVirtualKeyInfo() + ); + } + + return createRealtimeFileSplit(path, start, length, hosts); + } + + private static boolean containsIncrementalQuerySplits(List fileSplits) { + return fileSplits.stream().anyMatch(HoodieRealtimeInputFormatUtils::doesBelongToIncrementalQuery); + } + + private static List filterIncrementalQueryFileSplits(List fileSplits) { + return fileSplits.stream().filter(HoodieRealtimeInputFormatUtils::doesBelongToIncrementalQuery) + .collect(Collectors.toList()); + } + + private static HoodieRealtimeFileSplit createRealtimeFileSplit(HoodieRealtimePath path, long start, long length, String[] hosts) { + try { + return new HoodieRealtimeFileSplit(new FileSplit(path, start, length, hosts), path); + } catch (IOException e) { + throw new HoodieIOException(String.format("Failed to create instance of %s", HoodieRealtimeFileSplit.class.getName()), e); + } + } + + private static HoodieRealtimeBootstrapBaseFileSplit createRealtimeBoostrapBaseFileSplit(BootstrapBaseFileSplit split, + String basePath, + List logFiles, + String maxInstantTime, + boolean belongsToIncrementalQuery, + Option virtualKeyInfoOpt) { + try { + String[] hosts = split.getLocationInfo() != null ? Arrays.stream(split.getLocationInfo()) + .filter(x -> !x.isInMemory()).toArray(String[]::new) : new String[0]; + String[] inMemoryHosts = split.getLocationInfo() != null ? 
Arrays.stream(split.getLocationInfo()) + .filter(SplitLocationInfo::isInMemory).toArray(String[]::new) : new String[0]; + FileSplit baseSplit = new FileSplit(split.getPath(), split.getStart(), split.getLength(), + hosts, inMemoryHosts); + return new HoodieRealtimeBootstrapBaseFileSplit(baseSplit, basePath, logFiles, maxInstantTime, split.getBootstrapFileSplit(), + belongsToIncrementalQuery, virtualKeyInfoOpt); + } catch (IOException e) { + throw new HoodieIOException("Error creating hoodie real time split ", e); + } + } + + /** + * Creates {@link RealtimeFileStatus} for the file-slice where base file is present + */ + private static RealtimeFileStatus createRealtimeFileStatusUnchecked(HoodieBaseFile baseFile, + Stream logFiles, + String basePath, + Option latestCompletedInstantOpt, + Option virtualKeyInfoOpt) { + FileStatus baseFileStatus = getFileStatusUnchecked(baseFile); + List sortedLogFiles = logFiles.sorted(HoodieLogFile.getLogFileComparator()).collect(Collectors.toList()); + + try { + RealtimeFileStatus rtFileStatus = new RealtimeFileStatus(baseFileStatus, basePath, sortedLogFiles, + false, virtualKeyInfoOpt); + + if (latestCompletedInstantOpt.isPresent()) { + HoodieInstant latestCompletedInstant = latestCompletedInstantOpt.get(); + checkState(latestCompletedInstant.isCompleted()); + + rtFileStatus.setMaxCommitTime(latestCompletedInstant.getTimestamp()); + } + + if (baseFileStatus instanceof LocatedFileStatusWithBootstrapBaseFile || baseFileStatus instanceof FileStatusWithBootstrapBaseFile) { + rtFileStatus.setBootStrapFileStatus(baseFileStatus); + } + + return rtFileStatus; + } catch (IOException e) { + throw new HoodieIOException(String.format("Failed to init %s", RealtimeFileStatus.class.getSimpleName()), e); + } + } + + /** + * Creates {@link RealtimeFileStatus} for the file-slice where base file is NOT present + */ + private static RealtimeFileStatus createRealtimeFileStatusUnchecked(HoodieLogFile latestLogFile, + Stream logFiles, + String basePath, + Option latestCompletedInstantOpt, + Option virtualKeyInfoOpt) { + List sortedLogFiles = logFiles.sorted(HoodieLogFile.getLogFileComparator()).collect(Collectors.toList()); + try { + RealtimeFileStatus rtFileStatus = new RealtimeFileStatus(latestLogFile.getFileStatus(), basePath, + sortedLogFiles, false, virtualKeyInfoOpt); + + if (latestCompletedInstantOpt.isPresent()) { + HoodieInstant latestCompletedInstant = latestCompletedInstantOpt.get(); + checkState(latestCompletedInstant.isCompleted()); + + rtFileStatus.setMaxCommitTime(latestCompletedInstant.getTimestamp()); + } + + return rtFileStatus; + } catch (IOException e) { + throw new HoodieIOException(String.format("Failed to init %s", RealtimeFileStatus.class.getSimpleName()), e); } } } diff --git a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/HoodieParquetRealtimeInputFormat.java b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/HoodieParquetRealtimeInputFormat.java index e204c3b075e6a..e8c806ed2cf67 100644 --- a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/HoodieParquetRealtimeInputFormat.java +++ b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/HoodieParquetRealtimeInputFormat.java @@ -94,7 +94,7 @@ void addProjectionToJobConf(final RealtimeSplit realtimeSplit, final JobConf job // TO fix this, hoodie columns are appended late at the time record-reader gets built instead of construction // time. 
if (!realtimeSplit.getDeltaLogPaths().isEmpty()) { - HoodieRealtimeInputFormatUtils.addRequiredProjectionFields(jobConf, realtimeSplit.getHoodieVirtualKeyInfo()); + HoodieRealtimeInputFormatUtils.addRequiredProjectionFields(jobConf, realtimeSplit.getVirtualKeyInfo()); } jobConf.set(HoodieInputFormatUtils.HOODIE_READ_COLUMNS_PROP, "true"); setConf(jobConf); diff --git a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/RealtimeBootstrapBaseFileSplit.java b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/HoodieRealtimeBootstrapBaseFileSplit.java similarity index 54% rename from hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/RealtimeBootstrapBaseFileSplit.java rename to hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/HoodieRealtimeBootstrapBaseFileSplit.java index 2ac7204467618..c7022c98ad3cd 100644 --- a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/RealtimeBootstrapBaseFileSplit.java +++ b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/HoodieRealtimeBootstrapBaseFileSplit.java @@ -22,81 +22,88 @@ import org.apache.hudi.common.model.HoodieLogFile; import org.apache.hudi.common.util.Option; import org.apache.hudi.hadoop.BootstrapBaseFileSplit; -import org.apache.hudi.hadoop.InputSplitUtils; import java.io.DataInput; import java.io.DataOutput; import java.io.IOException; import java.util.ArrayList; import java.util.List; -import java.util.stream.Collectors; /** - * Realtime File Split with external base file. + * Realtime {@link FileSplit} with external base file * * NOTE: If you're adding fields here you need to make sure that you appropriately de-/serialize them * in {@link #readFromInput(DataInput)} and {@link #writeToOutput(DataOutput)} */ -public class RealtimeBootstrapBaseFileSplit extends BootstrapBaseFileSplit implements RealtimeSplit { - - private List deltaLogPaths; +public class HoodieRealtimeBootstrapBaseFileSplit extends BootstrapBaseFileSplit implements RealtimeSplit { + /** + * Marks whether this path produced as part of Incremental Query + */ + private boolean belongsToIncrementalQuery = false; + /** + * List of delta log-files holding updated records for this base-file + */ private List deltaLogFiles = new ArrayList<>(); - - private String maxInstantTime; - + /** + * Latest commit instant available at the time of the query in which all of the files + * pertaining to this split are represented + */ + private String maxCommitTime; + /** + * Base path of the table this path belongs to + */ private String basePath; - - private boolean belongsToIncrementalSplit; + /** + * Virtual key configuration of the table this split belongs to + */ + private Option virtualKeyInfo = Option.empty(); /** * NOTE: This ctor is necessary for Hive to be able to serialize and * then instantiate it when deserializing back */ - public RealtimeBootstrapBaseFileSplit() { - super(); - } - - public RealtimeBootstrapBaseFileSplit(FileSplit baseSplit, - String basePath, - List deltaLogFiles, - String maxInstantTime, - FileSplit externalFileSplit, - boolean belongsToIncrementalQuery) throws IOException { + public HoodieRealtimeBootstrapBaseFileSplit() {} + + public HoodieRealtimeBootstrapBaseFileSplit(FileSplit baseSplit, + String basePath, + List deltaLogFiles, + String maxInstantTime, + FileSplit externalFileSplit, + boolean belongsToIncrementalQuery, + Option virtualKeyInfoOpt) throws IOException { super(baseSplit, externalFileSplit); - this.maxInstantTime = maxInstantTime; + this.maxCommitTime = maxInstantTime; 
this.deltaLogFiles = deltaLogFiles; - this.deltaLogPaths = deltaLogFiles.stream().map(entry -> entry.getPath().toString()).collect(Collectors.toList()); this.basePath = basePath; - this.belongsToIncrementalSplit = belongsToIncrementalQuery; + this.belongsToIncrementalQuery = belongsToIncrementalQuery; + this.virtualKeyInfo = virtualKeyInfoOpt; } @Override public void write(DataOutput out) throws IOException { super.write(out); writeToOutput(out); - InputSplitUtils.writeBoolean(belongsToIncrementalSplit, out); } @Override public void readFields(DataInput in) throws IOException { super.readFields(in); readFromInput(in); - belongsToIncrementalSplit = InputSplitUtils.readBoolean(in); } @Override - public List getDeltaLogPaths() { - return deltaLogPaths; + public List getDeltaLogFiles() { + return deltaLogFiles; } @Override - public List getDeltaLogFiles() { - return deltaLogFiles; + public void setDeltaLogFiles(List deltaLogFiles) { + this.deltaLogFiles = deltaLogFiles; } @Override public String getMaxCommitTime() { - return maxInstantTime; + return maxCommitTime; } @Override @@ -105,22 +112,23 @@ public String getBasePath() { } @Override - public Option getHoodieVirtualKeyInfo() { - return Option.empty(); + public Option getVirtualKeyInfo() { + return virtualKeyInfo; } + @Override public boolean getBelongsToIncrementalQuery() { - return belongsToIncrementalSplit; + return belongsToIncrementalQuery; } @Override - public void setDeltaLogPaths(List deltaLogPaths) { - this.deltaLogPaths = deltaLogPaths; + public void setBelongsToIncrementalQuery(boolean belongsToIncrementalPath) { + this.belongsToIncrementalQuery = belongsToIncrementalPath; } @Override public void setMaxCommitTime(String maxInstantTime) { - this.maxInstantTime = maxInstantTime; + this.maxCommitTime = maxInstantTime; } @Override @@ -129,6 +137,7 @@ public void setBasePath(String basePath) { } @Override - public void setHoodieVirtualKeyInfo(Option hoodieVirtualKeyInfo) {} - + public void setVirtualKeyInfo(Option virtualKeyInfo) { + this.virtualKeyInfo = virtualKeyInfo; + } } diff --git a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/HoodieRealtimeFileSplit.java b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/HoodieRealtimeFileSplit.java index 2b45fe3f3c324..a424f021c2d20 100644 --- a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/HoodieRealtimeFileSplit.java +++ b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/HoodieRealtimeFileSplit.java @@ -18,84 +18,125 @@ package org.apache.hudi.hadoop.realtime; +import org.apache.hadoop.mapred.FileSplit; import org.apache.hudi.common.model.HoodieLogFile; import org.apache.hudi.common.util.Option; -import org.apache.hadoop.mapred.FileSplit; - import java.io.DataInput; import java.io.DataOutput; import java.io.IOException; import java.util.ArrayList; import java.util.List; -import java.util.stream.Collectors; /** - * Filesplit that wraps the base split and a list of log files to merge deltas from. + * {@link FileSplit} implementation that holds + *

 + * <ol>
 + *   <li>Split corresponding to the base file</li>
 + *   <li>List of {@link HoodieLogFile} that holds the delta to be merged (upon reading)</li>
 + * </ol>
    + * + * This split is correspondent to a single file-slice in the Hudi terminology. * * NOTE: If you're adding fields here you need to make sure that you appropriately de-/serialize them * in {@link #readFromInput(DataInput)} and {@link #writeToOutput(DataOutput)} */ public class HoodieRealtimeFileSplit extends FileSplit implements RealtimeSplit { - - private List deltaLogPaths; + /** + * List of delta log-files holding updated records for this base-file + */ private List deltaLogFiles = new ArrayList<>(); - - private String maxCommitTime; - + /** + * Base path of the table this path belongs to + */ private String basePath; - - private Option hoodieVirtualKeyInfo = Option.empty(); + /** + * Latest commit instant available at the time of the query in which all of the files + * pertaining to this split are represented + */ + private String maxCommitTime; + /** + * Marks whether this path produced as part of Incremental Query + */ + private boolean belongsToIncrementalQuery = false; + /** + * Virtual key configuration of the table this split belongs to + */ + private Option virtualKeyInfo = Option.empty(); public HoodieRealtimeFileSplit() {} - public HoodieRealtimeFileSplit(FileSplit baseSplit, String basePath, List deltaLogFiles, String maxCommitTime, - Option hoodieVirtualKeyInfo) + public HoodieRealtimeFileSplit(FileSplit baseSplit, + HoodieRealtimePath path) + throws IOException { + this(baseSplit, + path.getBasePath(), + path.getDeltaLogFiles(), + path.getMaxCommitTime(), + path.getBelongsToIncrementalQuery(), + path.getVirtualKeyInfo()); + } + + /** + * @VisibleInTesting + */ + public HoodieRealtimeFileSplit(FileSplit baseSplit, + String basePath, + List deltaLogFiles, + String maxCommitTime, + boolean belongsToIncrementalQuery, + Option virtualKeyInfo) throws IOException { super(baseSplit.getPath(), baseSplit.getStart(), baseSplit.getLength(), baseSplit.getLocations()); this.deltaLogFiles = deltaLogFiles; - this.deltaLogPaths = deltaLogFiles.stream().map(entry -> entry.getPath().toString()).collect(Collectors.toList()); - this.maxCommitTime = maxCommitTime; this.basePath = basePath; - this.hoodieVirtualKeyInfo = hoodieVirtualKeyInfo; - } - - public List getDeltaLogPaths() { - return deltaLogPaths; + this.maxCommitTime = maxCommitTime; + this.belongsToIncrementalQuery = belongsToIncrementalQuery; + this.virtualKeyInfo = virtualKeyInfo; } public List getDeltaLogFiles() { return deltaLogFiles; } + @Override + public void setDeltaLogFiles(List deltaLogFiles) { + this.deltaLogFiles = deltaLogFiles; + } + public String getMaxCommitTime() { return maxCommitTime; } + public void setMaxCommitTime(String maxCommitTime) { + this.maxCommitTime = maxCommitTime; + } + public String getBasePath() { return basePath; } - @Override - public void setHoodieVirtualKeyInfo(Option hoodieVirtualKeyInfo) { - this.hoodieVirtualKeyInfo = hoodieVirtualKeyInfo; + public void setBasePath(String basePath) { + this.basePath = basePath; } @Override - public Option getHoodieVirtualKeyInfo() { - return hoodieVirtualKeyInfo; + public void setVirtualKeyInfo(Option virtualKeyInfo) { + this.virtualKeyInfo = virtualKeyInfo; } - public void setDeltaLogPaths(List deltaLogPaths) { - this.deltaLogPaths = deltaLogPaths; + @Override + public Option getVirtualKeyInfo() { + return virtualKeyInfo; } - public void setMaxCommitTime(String maxCommitTime) { - this.maxCommitTime = maxCommitTime; + @Override + public boolean getBelongsToIncrementalQuery() { + return belongsToIncrementalQuery; } - public void setBasePath(String 
basePath) { - this.basePath = basePath; + @Override + public void setBelongsToIncrementalQuery(boolean belongsToIncrementalPath) { + this.belongsToIncrementalQuery = belongsToIncrementalPath; } @Override @@ -112,7 +153,7 @@ public void readFields(DataInput in) throws IOException { @Override public String toString() { - return "HoodieRealtimeFileSplit{DataPath=" + getPath() + ", deltaLogPaths=" + deltaLogPaths + return "HoodieRealtimeFileSplit{DataPath=" + getPath() + ", deltaLogPaths=" + getDeltaLogPaths() + ", maxCommitTime='" + maxCommitTime + '\'' + ", basePath='" + basePath + '\'' + '}'; } } diff --git a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/HoodieRealtimePath.java b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/HoodieRealtimePath.java new file mode 100644 index 0000000000000..bba44d5c6632c --- /dev/null +++ b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/HoodieRealtimePath.java @@ -0,0 +1,110 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.hadoop.realtime; + +import org.apache.hadoop.fs.Path; +import org.apache.hudi.common.model.HoodieLogFile; +import org.apache.hudi.common.util.Option; +import org.apache.hudi.hadoop.PathWithBootstrapFileStatus; + +import java.util.List; + +/** + * {@link Path} implementation encoding additional information necessary to appropriately read + * base files of the MOR tables, such as list of delta log files (holding updated records) associated + * w/ the base file, etc. 
+ */ +public class HoodieRealtimePath extends Path { + /** + * Marks whether this path produced as part of Incremental Query + */ + private final boolean belongsToIncrementalQuery; + /** + * List of delta log-files holding updated records for this base-file + */ + private final List deltaLogFiles; + /** + * Latest commit instant available at the time of the query in which all of the files + * pertaining to this split are represented + */ + private final String maxCommitTime; + /** + * Base path of the table this path belongs to + */ + private final String basePath; + /** + * Virtual key configuration of the table this split belongs to + */ + private final Option virtualKeyInfo; + /** + * File status for the Bootstrap file (only relevant if this table is a bootstrapped table + */ + private PathWithBootstrapFileStatus pathWithBootstrapFileStatus; + + public HoodieRealtimePath(Path parent, + String child, + String basePath, + List deltaLogFiles, + String maxCommitTime, + boolean belongsToIncrementalQuery, + Option virtualKeyInfo) { + super(parent, child); + this.basePath = basePath; + this.deltaLogFiles = deltaLogFiles; + this.maxCommitTime = maxCommitTime; + this.belongsToIncrementalQuery = belongsToIncrementalQuery; + this.virtualKeyInfo = virtualKeyInfo; + } + + public List getDeltaLogFiles() { + return deltaLogFiles; + } + + public String getMaxCommitTime() { + return maxCommitTime; + } + + public String getBasePath() { + return basePath; + } + + public boolean getBelongsToIncrementalQuery() { + return belongsToIncrementalQuery; + } + + public boolean isSplitable() { + return !toString().isEmpty(); + } + + public PathWithBootstrapFileStatus getPathWithBootstrapFileStatus() { + return pathWithBootstrapFileStatus; + } + + public void setPathWithBootstrapFileStatus(PathWithBootstrapFileStatus pathWithBootstrapFileStatus) { + this.pathWithBootstrapFileStatus = pathWithBootstrapFileStatus; + } + + public boolean includeBootstrapFilePath() { + return pathWithBootstrapFileStatus != null; + } + + public Option getVirtualKeyInfo() { + return virtualKeyInfo; + } +} diff --git a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/RealtimeCompactedRecordReader.java b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/RealtimeCompactedRecordReader.java index d35df9f33776e..b917f004bcd06 100644 --- a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/RealtimeCompactedRecordReader.java +++ b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/RealtimeCompactedRecordReader.java @@ -18,6 +18,12 @@ package org.apache.hudi.hadoop.realtime; +import org.apache.avro.generic.GenericRecord; +import org.apache.hadoop.io.ArrayWritable; +import org.apache.hadoop.io.NullWritable; +import org.apache.hadoop.io.Writable; +import org.apache.hadoop.mapred.JobConf; +import org.apache.hadoop.mapred.RecordReader; import org.apache.hudi.avro.HoodieAvroUtils; import org.apache.hudi.common.config.HoodieCommonConfig; import org.apache.hudi.common.fs.FSUtils; @@ -29,13 +35,6 @@ import org.apache.hudi.hadoop.config.HoodieRealtimeConfig; import org.apache.hudi.hadoop.utils.HoodieInputFormatUtils; import org.apache.hudi.hadoop.utils.HoodieRealtimeRecordReaderUtils; - -import org.apache.avro.generic.GenericRecord; -import org.apache.hadoop.io.ArrayWritable; -import org.apache.hadoop.io.NullWritable; -import org.apache.hadoop.io.Writable; -import org.apache.hadoop.mapred.JobConf; -import org.apache.hadoop.mapred.RecordReader; import org.apache.log4j.LogManager; import org.apache.log4j.Logger; @@ 
-55,7 +54,7 @@ class RealtimeCompactedRecordReader extends AbstractRealtimeRecordReader private final Set deltaRecordKeys; private final HoodieMergedLogRecordScanner mergedLogRecordScanner; - private int recordKeyIndex = HoodieInputFormatUtils.HOODIE_RECORD_KEY_COL_POS; + private final int recordKeyIndex; private Iterator deltaItr; public RealtimeCompactedRecordReader(RealtimeSplit split, JobConf job, @@ -65,9 +64,9 @@ public RealtimeCompactedRecordReader(RealtimeSplit split, JobConf job, this.mergedLogRecordScanner = getMergedLogRecordScanner(); this.deltaRecordMap = mergedLogRecordScanner.getRecords(); this.deltaRecordKeys = new HashSet<>(this.deltaRecordMap.keySet()); - if (split.getHoodieVirtualKeyInfo().isPresent()) { - this.recordKeyIndex = split.getHoodieVirtualKeyInfo().get().getRecordKeyFieldIndex(); - } + this.recordKeyIndex = split.getVirtualKeyInfo() + .map(HoodieVirtualKeyInfo::getRecordKeyFieldIndex) + .orElse(HoodieInputFormatUtils.HOODIE_RECORD_KEY_COL_POS); } /** diff --git a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/RealtimeSplit.java b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/RealtimeSplit.java index 6c1e02cf64bbf..d9b1923c60f80 100644 --- a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/RealtimeSplit.java +++ b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/RealtimeSplit.java @@ -18,18 +18,18 @@ package org.apache.hudi.hadoop.realtime; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.mapred.InputSplitWithLocationInfo; import org.apache.hudi.common.model.HoodieLogFile; import org.apache.hudi.common.util.Option; import org.apache.hudi.hadoop.InputSplitUtils; -import org.apache.hadoop.fs.Path; -import org.apache.hadoop.mapred.InputSplitWithLocationInfo; - import java.io.DataInput; import java.io.DataOutput; import java.io.IOException; import java.util.ArrayList; import java.util.List; +import java.util.stream.Collectors; /** * Realtime Input Split Interface. @@ -40,10 +40,14 @@ public interface RealtimeSplit extends InputSplitWithLocationInfo { * Return Log File Paths. * @return */ - List getDeltaLogPaths(); + default List getDeltaLogPaths() { + return getDeltaLogFiles().stream().map(entry -> entry.getPath().toString()).collect(Collectors.toList()); + } List getDeltaLogFiles(); + void setDeltaLogFiles(List deltaLogFiles); + /** * Return Max Instant Time. * @return @@ -60,14 +64,12 @@ public interface RealtimeSplit extends InputSplitWithLocationInfo { * Returns Virtual key info if meta fields are disabled. * @return */ - Option getHoodieVirtualKeyInfo(); + Option getVirtualKeyInfo(); /** - * Update Log File Paths. - * - * @param deltaLogPaths + * Returns the flag whether this split belongs to an Incremental Query */ - void setDeltaLogPaths(List deltaLogPaths); + boolean getBelongsToIncrementalQuery(); /** * Update Maximum valid instant time. 
@@ -81,17 +83,25 @@ public interface RealtimeSplit extends InputSplitWithLocationInfo { */ void setBasePath(String basePath); - void setHoodieVirtualKeyInfo(Option hoodieVirtualKeyInfo); + /** + * Sets the flag whether this split belongs to an Incremental Query + */ + void setBelongsToIncrementalQuery(boolean belongsToIncrementalQuery); + + void setVirtualKeyInfo(Option virtualKeyInfo); default void writeToOutput(DataOutput out) throws IOException { InputSplitUtils.writeString(getBasePath(), out); InputSplitUtils.writeString(getMaxCommitTime(), out); - out.writeInt(getDeltaLogPaths().size()); - for (String logFilePath : getDeltaLogPaths()) { - InputSplitUtils.writeString(logFilePath, out); + InputSplitUtils.writeBoolean(getBelongsToIncrementalQuery(), out); + + out.writeInt(getDeltaLogFiles().size()); + for (HoodieLogFile logFile : getDeltaLogFiles()) { + InputSplitUtils.writeString(logFile.getPath().toString(), out); + out.writeLong(logFile.getFileSize()); } - Option virtualKeyInfoOpt = getHoodieVirtualKeyInfo(); + Option virtualKeyInfoOpt = getVirtualKeyInfo(); if (!virtualKeyInfoOpt.isPresent()) { InputSplitUtils.writeBoolean(false, out); } else { @@ -106,34 +116,39 @@ default void writeToOutput(DataOutput out) throws IOException { default void readFromInput(DataInput in) throws IOException { setBasePath(InputSplitUtils.readString(in)); setMaxCommitTime(InputSplitUtils.readString(in)); + setBelongsToIncrementalQuery(InputSplitUtils.readBoolean(in)); + int totalLogFiles = in.readInt(); - List deltaLogPaths = new ArrayList<>(totalLogFiles); + List deltaLogPaths = new ArrayList<>(totalLogFiles); for (int i = 0; i < totalLogFiles; i++) { - deltaLogPaths.add(InputSplitUtils.readString(in)); + String logFilePath = InputSplitUtils.readString(in); + long logFileSize = in.readLong(); + deltaLogPaths.add(new HoodieLogFile(new Path(logFilePath), logFileSize)); } - setDeltaLogPaths(deltaLogPaths); + setDeltaLogFiles(deltaLogPaths); + boolean hoodieVirtualKeyPresent = InputSplitUtils.readBoolean(in); if (hoodieVirtualKeyPresent) { String recordKeyField = InputSplitUtils.readString(in); String partitionPathField = InputSplitUtils.readString(in); int recordFieldIndex = Integer.parseInt(InputSplitUtils.readString(in)); int partitionPathIndex = Integer.parseInt(InputSplitUtils.readString(in)); - setHoodieVirtualKeyInfo(Option.of(new HoodieVirtualKeyInfo(recordKeyField, partitionPathField, recordFieldIndex, partitionPathIndex))); + setVirtualKeyInfo(Option.of(new HoodieVirtualKeyInfo(recordKeyField, partitionPathField, recordFieldIndex, partitionPathIndex))); } } /** * The file containing this split's data. */ - public Path getPath(); + Path getPath(); /** * The position of the first byte in the file to process. */ - public long getStart(); + long getStart(); /** * The number of bytes in the file to process. 
*/ - public long getLength(); + long getLength(); } diff --git a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/utils/HoodieHiveUtils.java b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/utils/HoodieHiveUtils.java index b4f7e336335d4..fa2bce4875379 100644 --- a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/utils/HoodieHiveUtils.java +++ b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/utils/HoodieHiveUtils.java @@ -19,15 +19,11 @@ package org.apache.hudi.hadoop.utils; import org.apache.hadoop.conf.Configuration; -import org.apache.hudi.common.table.HoodieTableMetaClient; -import org.apache.hudi.common.table.timeline.HoodieTimeline; -import org.apache.hudi.common.util.CollectionUtils; -import org.apache.hudi.common.util.Option; -import org.apache.hudi.exception.HoodieIOException; - import org.apache.hadoop.fs.Path; import org.apache.hadoop.mapred.JobConf; import org.apache.hadoop.mapreduce.JobContext; +import org.apache.hudi.common.util.CollectionUtils; +import org.apache.hudi.common.util.Option; import org.apache.log4j.LogManager; import org.apache.log4j.Logger; @@ -148,39 +144,6 @@ public static List getIncrementalTableNames(JobContext job) { return result; } - /** - * Depending on the configs hoodie.%s.consume.pending.commits and hoodie.%s.consume.commit of job - * - * (hoodie..consume.pending.commits, hoodie..consume.commit) -> - * (true, validCommit) -> returns activeTimeline filtered until validCommit - * (true, InValidCommit) -> Raises HoodieIOException - * (true, notSet) -> Raises HoodieIOException - * (false, validCommit) -> returns completedTimeline filtered until validCommit - * (false, InValidCommit) -> Raises HoodieIOException - * (false or notSet, notSet) -> returns completedTimeline unfiltered - * - * validCommit is one which exists in the timeline being checked and vice versa - */ - public static HoodieTimeline getTableTimeline(final String tableName, final JobConf job, final HoodieTableMetaClient metaClient) { - HoodieTimeline timeline = metaClient.getActiveTimeline().getCommitsTimeline(); - - boolean includePendingCommits = shouldIncludePendingCommits(job, tableName); - Option maxCommit = getMaxCommit(job, tableName); - - HoodieTimeline finalizedTimeline = includePendingCommits ? timeline : timeline.filterCompletedInstants(); - - return !maxCommit.isPresent() ? 
finalizedTimeline : filterIfInstantExists(tableName, finalizedTimeline, maxCommit.get()); - - } - - private static HoodieTimeline filterIfInstantExists(String tableName, HoodieTimeline timeline, String maxCommit) { - if (maxCommit == null || !timeline.containsInstant(maxCommit)) { - LOG.info("Timestamp " + maxCommit + " doesn't exist in the commits timeline:" + timeline + " table: " + tableName); - throw new HoodieIOException("Valid timestamp is required for " + HOODIE_CONSUME_COMMIT + " in snapshot mode"); - } - return timeline.findInstantsBeforeOrEquals(maxCommit); - } - public static boolean isIncrementalUseDatabase(Configuration conf) { return conf.getBoolean(HOODIE_INCREMENTAL_USE_DATABASE, false); } diff --git a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/utils/HoodieInputFormatUtils.java b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/utils/HoodieInputFormatUtils.java index 63abfc87462c7..7fec1fb63f6fa 100644 --- a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/utils/HoodieInputFormatUtils.java +++ b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/utils/HoodieInputFormatUtils.java @@ -18,47 +18,40 @@ package org.apache.hudi.hadoop.utils; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.LocatedFileStatus; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hive.ql.io.orc.OrcInputFormat; +import org.apache.hadoop.hive.ql.io.orc.OrcOutputFormat; +import org.apache.hadoop.hive.ql.io.orc.OrcSerde; +import org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat; +import org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe; +import org.apache.hadoop.mapred.FileInputFormat; +import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.JobContext; import org.apache.hudi.common.config.HoodieMetadataConfig; -import org.apache.hudi.common.engine.HoodieLocalEngineContext; import org.apache.hudi.common.fs.FSUtils; -import org.apache.hudi.common.model.FileSlice; +import org.apache.hudi.common.model.HoodieBaseFile; import org.apache.hudi.common.model.HoodieCommitMetadata; import org.apache.hudi.common.model.HoodieFileFormat; import org.apache.hudi.common.model.HoodiePartitionMetadata; -import org.apache.hudi.common.model.HoodieBaseFile; -import org.apache.hudi.common.model.HoodieLogFile; import org.apache.hudi.common.table.HoodieTableMetaClient; import org.apache.hudi.common.table.timeline.HoodieDefaultTimeline; import org.apache.hudi.common.table.timeline.HoodieInstant; import org.apache.hudi.common.table.timeline.HoodieTimeline; -import org.apache.hudi.common.table.view.FileSystemViewManager; import org.apache.hudi.common.table.view.HoodieTableFileSystemView; import org.apache.hudi.common.table.view.TableFileSystemView; import org.apache.hudi.common.util.Option; import org.apache.hudi.common.util.StringUtils; import org.apache.hudi.exception.HoodieIOException; +import org.apache.hudi.hadoop.FileStatusWithBootstrapBaseFile; import org.apache.hudi.hadoop.HoodieHFileInputFormat; import org.apache.hudi.hadoop.HoodieParquetInputFormat; -import org.apache.hudi.hadoop.RealtimeFileStatus; import org.apache.hudi.hadoop.LocatedFileStatusWithBootstrapBaseFile; -import org.apache.hudi.hadoop.FileStatusWithBootstrapBaseFile; import org.apache.hudi.hadoop.realtime.HoodieHFileRealtimeInputFormat; import org.apache.hudi.hadoop.realtime.HoodieParquetRealtimeInputFormat; - -import org.apache.hadoop.conf.Configuration; -import 
org.apache.hadoop.fs.FileStatus; -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.LocatedFileStatus; -import org.apache.hadoop.fs.Path; -import org.apache.hadoop.hive.ql.io.orc.OrcInputFormat; -import org.apache.hadoop.hive.ql.io.orc.OrcOutputFormat; -import org.apache.hadoop.hive.ql.io.orc.OrcSerde; -import org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat; -import org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe; -import org.apache.hadoop.mapred.FileInputFormat; -import org.apache.hadoop.mapred.JobConf; -import org.apache.hadoop.mapreduce.Job; import org.apache.log4j.LogManager; import org.apache.log4j.Logger; @@ -437,70 +430,6 @@ public static HoodieMetadataConfig buildMetadataConfig(Configuration conf) { .build(); } - /** - * @deprecated - */ - public static List filterFileStatusForSnapshotMode(JobConf job, Map tableMetaClientMap, - List snapshotPaths, boolean includeLogFiles) throws IOException { - HoodieLocalEngineContext engineContext = new HoodieLocalEngineContext(job); - List returns = new ArrayList<>(); - - Map> groupedPaths = - HoodieInputFormatUtils.groupSnapshotPathsByMetaClient(tableMetaClientMap.values(), snapshotPaths); - - Map fsViewCache = new HashMap<>(); - - LOG.info("Found a total of " + groupedPaths.size() + " groups"); - - try { - for (Map.Entry> entry : groupedPaths.entrySet()) { - HoodieTableMetaClient metaClient = entry.getKey(); - if (LOG.isDebugEnabled()) { - LOG.debug("Hoodie Metadata initialized with completed commit instant as :" + metaClient); - } - - HoodieTimeline timeline = HoodieHiveUtils.getTableTimeline(metaClient.getTableConfig().getTableName(), job, metaClient); - - HoodieTableFileSystemView fsView = fsViewCache.computeIfAbsent(metaClient, tableMetaClient -> - FileSystemViewManager.createInMemoryFileSystemViewWithTimeline(engineContext, tableMetaClient, buildMetadataConfig(job), timeline)); - List filteredBaseFiles = new ArrayList<>(); - Map> filteredLogs = new HashMap<>(); - for (Path p : entry.getValue()) { - String relativePartitionPath = FSUtils.getRelativePartitionPath(new Path(metaClient.getBasePath()), p); - List matched = fsView.getLatestBaseFiles(relativePartitionPath).collect(Collectors.toList()); - filteredBaseFiles.addAll(matched); - if (includeLogFiles) { - List logMatched = fsView.getLatestFileSlices(relativePartitionPath) - .filter(f -> !f.getBaseFile().isPresent() && f.getLatestLogFile().isPresent()) - .collect(Collectors.toList()); - logMatched.forEach(f -> { - List logPathSizePairs = f.getLogFiles().sorted(HoodieLogFile.getLogFileComparator()).collect(Collectors.toList()); - filteredLogs.put(f.getLatestLogFile().get().getFileStatus(), logPathSizePairs); - }); - } - } - - LOG.info("Total paths to process after hoodie filter " + filteredBaseFiles.size()); - for (HoodieBaseFile filteredFile : filteredBaseFiles) { - if (LOG.isDebugEnabled()) { - LOG.debug("Processing latest hoodie file - " + filteredFile.getPath()); - } - filteredFile = refreshFileStatus(job, filteredFile); - returns.add(getFileStatus(filteredFile)); - } - - for (Map.Entry> filterLogEntry : filteredLogs.entrySet()) { - RealtimeFileStatus rs = new RealtimeFileStatus(filterLogEntry.getKey()); - rs.setDeltaLogFiles(filterLogEntry.getValue()); - returns.add(rs); - } - } - } finally { - fsViewCache.forEach(((metaClient, fsView) -> fsView.close())); - } - return returns; - } - /** * Checks the file status for a race condition which can set the file size to 0. 1. 
HiveInputFormat does * super.listStatus() and gets back a FileStatus[] 2. Then it creates the HoodieTableMetaClient for the paths listed. @@ -534,12 +463,12 @@ private static HoodieBaseFile refreshFileStatus(Configuration conf, HoodieBaseFi * * @return the affected file status array */ - public static FileStatus[] listAffectedFilesForCommits(Path basePath, List metadataList) { + public static FileStatus[] listAffectedFilesForCommits(Configuration hadoopConf, Path basePath, List metadataList) { // TODO: Use HoodieMetaTable to extract affected file directly. HashMap fullPathToFileStatus = new HashMap<>(); // Iterate through the given commits. for (HoodieCommitMetadata metadata: metadataList) { - fullPathToFileStatus.putAll(metadata.getFullPathToFileStatus(basePath.toString())); + fullPathToFileStatus.putAll(metadata.getFullPathToFileStatus(hadoopConf, basePath.toString())); } return fullPathToFileStatus.values().toArray(new FileStatus[0]); } diff --git a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/utils/HoodieRealtimeInputFormatUtils.java b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/utils/HoodieRealtimeInputFormatUtils.java index 7dc58d1e2f57e..396782d96eeec 100644 --- a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/utils/HoodieRealtimeInputFormatUtils.java +++ b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/utils/HoodieRealtimeInputFormatUtils.java @@ -22,42 +22,26 @@ import org.apache.hadoop.fs.Path; import org.apache.hadoop.hive.serde2.ColumnProjectionUtils; import org.apache.hadoop.mapred.FileSplit; -import org.apache.hadoop.mapred.InputSplit; import org.apache.hadoop.mapred.JobConf; -import org.apache.hadoop.mapred.SplitLocationInfo; -import org.apache.hudi.common.engine.HoodieLocalEngineContext; import org.apache.hudi.common.fs.FSUtils; import org.apache.hudi.common.model.FileSlice; import org.apache.hudi.common.model.HoodieBaseFile; import org.apache.hudi.common.model.HoodieLogFile; import org.apache.hudi.common.model.HoodieRecord; -import org.apache.hudi.common.table.HoodieTableConfig; import org.apache.hudi.common.table.HoodieTableMetaClient; -import org.apache.hudi.common.table.TableSchemaResolver; import org.apache.hudi.common.table.timeline.HoodieInstant; -import org.apache.hudi.common.table.timeline.HoodieTimeline; -import org.apache.hudi.common.table.view.FileSystemViewManager; import org.apache.hudi.common.table.view.HoodieTableFileSystemView; -import org.apache.hudi.common.util.CollectionUtils; import org.apache.hudi.common.util.Option; import org.apache.hudi.common.util.collection.Pair; import org.apache.hudi.exception.HoodieException; -import org.apache.hudi.exception.HoodieIOException; -import org.apache.hudi.hadoop.BaseFileWithLogsSplit; -import org.apache.hudi.hadoop.BootstrapBaseFileSplit; +import org.apache.hudi.hadoop.realtime.HoodieRealtimeBootstrapBaseFileSplit; import org.apache.hudi.hadoop.realtime.HoodieRealtimeFileSplit; import org.apache.hudi.hadoop.realtime.HoodieVirtualKeyInfo; -import org.apache.hudi.hadoop.realtime.RealtimeBootstrapBaseFileSplit; import org.apache.hudi.hadoop.realtime.RealtimeSplit; import org.apache.log4j.LogManager; import org.apache.log4j.Logger; -import org.apache.parquet.schema.MessageType; -import java.io.IOException; import java.util.ArrayList; -import java.util.Arrays; -import java.util.Collections; -import java.util.HashMap; import java.util.HashSet; import java.util.List; import java.util.Map; @@ -66,230 +50,23 @@ import java.util.stream.Stream; import static org.apache.hudi.TypeUtils.unsafeCast; 
-import static org.apache.hudi.common.util.ValidationUtils.checkState; public class HoodieRealtimeInputFormatUtils extends HoodieInputFormatUtils { private static final Logger LOG = LogManager.getLogger(HoodieRealtimeInputFormatUtils.class); - public static InputSplit[] getRealtimeSplits(Configuration conf, List fileSplits) throws IOException { - if (fileSplits.isEmpty()) { - return new InputSplit[0]; - } - - FileSplit fileSplit = fileSplits.get(0); - - // Pre-process table-config to fetch virtual key info - Path partitionPath = fileSplit.getPath().getParent(); - HoodieTableMetaClient metaClient = getTableMetaClientForBasePathUnchecked(conf, partitionPath); - - Option hoodieVirtualKeyInfoOpt = getHoodieVirtualKeyInfo(metaClient); - - // NOTE: This timeline is kept in sync w/ {@code HoodieTableFileIndexBase} - HoodieInstant latestCommitInstant = - metaClient.getActiveTimeline().getCommitsTimeline().filterCompletedInstants().lastInstant().get(); - - InputSplit[] finalSplits = fileSplits.stream() - .map(split -> { - // There are 4 types of splits could we have to handle here - // - {@code BootstrapBaseFileSplit}: in case base file does have associated bootstrap file, - // but does NOT have any log files appended (convert it to {@code RealtimeBootstrapBaseFileSplit}) - // - {@code RealtimeBootstrapBaseFileSplit}: in case base file does have associated bootstrap file - // and does have log files appended - // - {@code BaseFileWithLogsSplit}: in case base file does NOT have associated bootstrap file - // and does have log files appended; - // - {@code FileSplit}: in case Hive passed down non-Hudi path - if (split instanceof RealtimeBootstrapBaseFileSplit) { - return split; - } else if (split instanceof BootstrapBaseFileSplit) { - BootstrapBaseFileSplit bootstrapBaseFileSplit = unsafeCast(split); - return createRealtimeBoostrapBaseFileSplit( - bootstrapBaseFileSplit, - metaClient.getBasePath(), - Collections.emptyList(), - latestCommitInstant.getTimestamp(), - false); - } else if (split instanceof BaseFileWithLogsSplit) { - BaseFileWithLogsSplit baseFileWithLogsSplit = unsafeCast(split); - return createHoodieRealtimeSplitUnchecked(baseFileWithLogsSplit, hoodieVirtualKeyInfoOpt); - } else { - // Non-Hudi paths might result in just generic {@code FileSplit} being - // propagated up to this point - return split; - } - }) - .toArray(InputSplit[]::new); - - LOG.info("Returning a total splits of " + finalSplits.length); - - return finalSplits; - } - - /** - * @deprecated - */ - public static InputSplit[] getRealtimeSplitsLegacy(Configuration conf, Stream fileSplits) { - Map> partitionsToParquetSplits = - fileSplits.collect(Collectors.groupingBy(split -> split.getPath().getParent())); - // TODO(vc): Should we handle also non-hoodie splits here? - Map partitionsToMetaClient = getTableMetaClientByPartitionPath(conf, partitionsToParquetSplits.keySet()); - - // Create file system cache so metadata table is only instantiated once. 
Also can benefit normal file listing if - // partition path is listed twice so file groups will already be loaded in file system - Map fsCache = new HashMap<>(); - // for all unique split parents, obtain all delta files based on delta commit timeline, - // grouped on file id - List rtSplits = new ArrayList<>(); - try { - // Pre process tableConfig from first partition to fetch virtual key info - Option hoodieVirtualKeyInfo = Option.empty(); - if (partitionsToParquetSplits.size() > 0) { - HoodieTableMetaClient metaClient = partitionsToMetaClient.get(partitionsToParquetSplits.keySet().iterator().next()); - hoodieVirtualKeyInfo = getHoodieVirtualKeyInfo(metaClient); - } - Option finalHoodieVirtualKeyInfo = hoodieVirtualKeyInfo; - partitionsToParquetSplits.keySet().forEach(partitionPath -> { - // for each partition path obtain the data & log file groupings, then map back to inputsplits - HoodieTableMetaClient metaClient = partitionsToMetaClient.get(partitionPath); - if (!fsCache.containsKey(metaClient)) { - HoodieLocalEngineContext engineContext = new HoodieLocalEngineContext(conf); - HoodieTableFileSystemView fsView = FileSystemViewManager.createInMemoryFileSystemViewWithTimeline(engineContext, - metaClient, HoodieInputFormatUtils.buildMetadataConfig(conf), metaClient.getActiveTimeline()); - fsCache.put(metaClient, fsView); - } - HoodieTableFileSystemView fsView = fsCache.get(metaClient); - - String relPartitionPath = FSUtils.getRelativePartitionPath(new Path(metaClient.getBasePath()), partitionPath); - // Both commit and delta-commits are included - pick the latest completed one - Option latestCompletedInstant = - metaClient.getActiveTimeline().getWriteTimeline().filterCompletedInstants().lastInstant(); - - Stream latestFileSlices = latestCompletedInstant - .map(instant -> fsView.getLatestMergedFileSlicesBeforeOrOn(relPartitionPath, instant.getTimestamp())) - .orElse(Stream.empty()); - - // subgroup splits again by file id & match with log files. - Map> groupedInputSplits = partitionsToParquetSplits.get(partitionPath).stream() - .collect(Collectors.groupingBy(split -> FSUtils.getFileIdFromFilePath(split.getPath()))); - // Get the maxCommit from the last delta or compaction or commit - when bootstrapped from COW table - String maxCommitTime = metaClient.getActiveTimeline().getTimelineOfActions(CollectionUtils.createSet(HoodieTimeline.COMMIT_ACTION, - HoodieTimeline.ROLLBACK_ACTION, HoodieTimeline.DELTA_COMMIT_ACTION, HoodieTimeline.REPLACE_COMMIT_ACTION)) - .filterCompletedInstants().lastInstant().get().getTimestamp(); - latestFileSlices.forEach(fileSlice -> { - List dataFileSplits = groupedInputSplits.getOrDefault(fileSlice.getFileId(), new ArrayList<>()); - dataFileSplits.forEach(split -> { - try { - List logFiles = fileSlice.getLogFiles().sorted(HoodieLogFile.getLogFileComparator()) - .collect(Collectors.toList()); - if (split instanceof BootstrapBaseFileSplit) { - BootstrapBaseFileSplit eSplit = (BootstrapBaseFileSplit) split; - rtSplits.add(createRealtimeBoostrapBaseFileSplit(eSplit, metaClient.getBasePath(), logFiles, maxCommitTime, false)); - } else { - rtSplits.add(new HoodieRealtimeFileSplit(split, metaClient.getBasePath(), logFiles, maxCommitTime, finalHoodieVirtualKeyInfo)); - } - } catch (IOException e) { - throw new HoodieIOException("Error creating hoodie real time split ", e); - } - }); - }); - }); - } catch (Exception e) { - throw new HoodieException("Error obtaining data file/log file grouping ", e); - } finally { - // close all the open fs views. 
- fsCache.forEach((k, view) -> view.close()); - } - LOG.info("Returning a total splits of " + rtSplits.size()); - return rtSplits.toArray(new InputSplit[0]); - } - - /** - * @deprecated will be replaced w/ {@link #getRealtimeSplits(Configuration, List)} - */ - // get IncrementalRealtimeSplits - public static InputSplit[] getIncrementalRealtimeSplits(Configuration conf, List fileSplits) throws IOException { - checkState(fileSplits.stream().allMatch(HoodieRealtimeInputFormatUtils::doesBelongToIncrementalQuery), - "All splits have to belong to incremental query"); - - List rtSplits = new ArrayList<>(); - Set partitionSet = fileSplits.stream().map(f -> f.getPath().getParent()).collect(Collectors.toSet()); - Map partitionsToMetaClient = getTableMetaClientByPartitionPath(conf, partitionSet); - // Pre process tableConfig from first partition to fetch virtual key info - Option hoodieVirtualKeyInfo = Option.empty(); - if (partitionSet.size() > 0) { - hoodieVirtualKeyInfo = getHoodieVirtualKeyInfo(partitionsToMetaClient.get(partitionSet.iterator().next())); - } - Option finalHoodieVirtualKeyInfo = hoodieVirtualKeyInfo; - fileSplits.stream().forEach(s -> { - // deal with incremental query. - try { - if (s instanceof BaseFileWithLogsSplit) { - BaseFileWithLogsSplit bs = unsafeCast(s); - rtSplits.add(new HoodieRealtimeFileSplit(bs, bs.getBasePath(), bs.getDeltaLogFiles(), bs.getMaxCommitTime(), finalHoodieVirtualKeyInfo)); - } else if (s instanceof RealtimeBootstrapBaseFileSplit) { - rtSplits.add(s); - } - } catch (IOException e) { - throw new HoodieIOException("Error creating hoodie real time split ", e); - } - }); - LOG.info("Returning a total splits of " + rtSplits.size()); - return rtSplits.toArray(new InputSplit[0]); - } - - public static Option getHoodieVirtualKeyInfo(HoodieTableMetaClient metaClient) { - HoodieTableConfig tableConfig = metaClient.getTableConfig(); - if (!tableConfig.populateMetaFields()) { - TableSchemaResolver tableSchemaResolver = new TableSchemaResolver(metaClient); - try { - MessageType parquetSchema = tableSchemaResolver.getTableParquetSchema(); - return Option.of(new HoodieVirtualKeyInfo(tableConfig.getRecordKeyFieldProp(), - tableConfig.getPartitionFieldProp(), parquetSchema.getFieldIndex(tableConfig.getRecordKeyFieldProp()), - parquetSchema.getFieldIndex(tableConfig.getPartitionFieldProp()))); - } catch (Exception exception) { - throw new HoodieException("Fetching table schema failed with exception ", exception); - } - } - return Option.empty(); - } - - private static boolean doesBelongToIncrementalQuery(FileSplit s) { - if (s instanceof BaseFileWithLogsSplit) { - BaseFileWithLogsSplit bs = unsafeCast(s); + public static boolean doesBelongToIncrementalQuery(FileSplit s) { + if (s instanceof HoodieRealtimeFileSplit) { + HoodieRealtimeFileSplit bs = unsafeCast(s); return bs.getBelongsToIncrementalQuery(); - } else if (s instanceof RealtimeBootstrapBaseFileSplit) { - RealtimeBootstrapBaseFileSplit bs = unsafeCast(s); + } else if (s instanceof HoodieRealtimeBootstrapBaseFileSplit) { + HoodieRealtimeBootstrapBaseFileSplit bs = unsafeCast(s); return bs.getBelongsToIncrementalQuery(); } return false; } - public static boolean isIncrementalQuerySplits(List fileSplits) { - if (fileSplits == null || fileSplits.size() == 0) { - return false; - } - return fileSplits.stream().anyMatch(HoodieRealtimeInputFormatUtils::doesBelongToIncrementalQuery); - } - - public static RealtimeBootstrapBaseFileSplit createRealtimeBoostrapBaseFileSplit(BootstrapBaseFileSplit split, - String basePath, - 
List logFiles, - String maxInstantTime, - boolean belongsToIncrementalQuery) { - try { - String[] hosts = split.getLocationInfo() != null ? Arrays.stream(split.getLocationInfo()) - .filter(x -> !x.isInMemory()).toArray(String[]::new) : new String[0]; - String[] inMemoryHosts = split.getLocationInfo() != null ? Arrays.stream(split.getLocationInfo()) - .filter(SplitLocationInfo::isInMemory).toArray(String[]::new) : new String[0]; - FileSplit baseSplit = new FileSplit(split.getPath(), split.getStart(), split.getLength(), - hosts, inMemoryHosts); - return new RealtimeBootstrapBaseFileSplit(baseSplit, basePath, logFiles, maxInstantTime, split.getBootstrapFileSplit(), belongsToIncrementalQuery); - } catch (IOException e) { - throw new HoodieIOException("Error creating hoodie real time split ", e); - } - } - // Return parquet file with a list of log files in the same file group. public static List, List>> groupLogsByBaseFile(Configuration conf, List partitionPaths) { Set partitionSet = new HashSet<>(partitionPaths); @@ -382,7 +159,7 @@ public static boolean requiredProjectionFieldsExistInConf(Configuration configur public static boolean canAddProjectionToJobConf(final RealtimeSplit realtimeSplit, final JobConf jobConf) { return jobConf.get(HoodieInputFormatUtils.HOODIE_READ_COLUMNS_PROP) == null - || (!realtimeSplit.getDeltaLogPaths().isEmpty() && !HoodieRealtimeInputFormatUtils.requiredProjectionFieldsExistInConf(jobConf, realtimeSplit.getHoodieVirtualKeyInfo())); + || (!realtimeSplit.getDeltaLogPaths().isEmpty() && !HoodieRealtimeInputFormatUtils.requiredProjectionFieldsExistInConf(jobConf, realtimeSplit.getVirtualKeyInfo())); } /** @@ -400,18 +177,4 @@ public static void cleanProjectionColumnIds(Configuration conf) { } } } - - private static HoodieRealtimeFileSplit createHoodieRealtimeSplitUnchecked(BaseFileWithLogsSplit baseFileWithLogsSplit, - Option hoodieVirtualKeyInfoOpt) { - try { - return new HoodieRealtimeFileSplit( - baseFileWithLogsSplit, - baseFileWithLogsSplit.getBasePath(), - baseFileWithLogsSplit.getDeltaLogFiles(), - baseFileWithLogsSplit.getMaxCommitTime(), - hoodieVirtualKeyInfoOpt); - } catch (IOException e) { - throw new HoodieIOException(String.format("Failed to init %s", HoodieRealtimeFileSplit.class.getSimpleName()), e); - } - } } diff --git a/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/TestHoodieParquetInputFormat.java b/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/TestHoodieParquetInputFormat.java index b7cb72e38a8ef..2ae7c36d98e7e 100644 --- a/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/TestHoodieParquetInputFormat.java +++ b/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/TestHoodieParquetInputFormat.java @@ -18,6 +18,15 @@ package org.apache.hudi.hadoop; +import org.apache.avro.Schema; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.io.ArrayWritable; +import org.apache.hadoop.io.NullWritable; +import org.apache.hadoop.mapred.FileInputFormat; +import org.apache.hadoop.mapred.InputSplit; +import org.apache.hadoop.mapred.JobConf; +import org.apache.hadoop.mapred.RecordReader; +import org.apache.hadoop.mapreduce.Job; import org.apache.hudi.avro.model.HoodieCompactionPlan; import org.apache.hudi.common.fs.FSUtils; import org.apache.hudi.common.model.HoodieCommitMetadata; @@ -34,16 +43,6 @@ import org.apache.hudi.exception.HoodieIOException; import org.apache.hudi.hadoop.testutils.InputFormatTestUtil; import org.apache.hudi.hadoop.utils.HoodieHiveUtils; - -import org.apache.avro.Schema; -import org.apache.hadoop.fs.FileStatus; 
-import org.apache.hadoop.io.ArrayWritable; -import org.apache.hadoop.io.NullWritable; -import org.apache.hadoop.mapred.FileInputFormat; -import org.apache.hadoop.mapred.InputSplit; -import org.apache.hadoop.mapred.JobConf; -import org.apache.hadoop.mapred.RecordReader; -import org.apache.hadoop.mapreduce.Job; import org.apache.hudi.hadoop.utils.HoodieInputFormatUtils; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Disabled; @@ -203,11 +202,11 @@ public void testSnapshotWithInvalidCommitShouldThrowException() throws IOExcepti FileInputFormat.setInputPaths(jobConf, partitionDir.getPath()); InputFormatTestUtil.setupSnapshotIncludePendingCommits(jobConf, "1"); Exception exception = assertThrows(HoodieIOException.class, () -> inputFormat.listStatus(jobConf)); - assertEquals("Valid timestamp is required for hoodie.%s.consume.commit in snapshot mode", exception.getMessage()); + assertEquals("Query instant (1) not found in the timeline", exception.getMessage()); InputFormatTestUtil.setupSnapshotMaxCommitTimeQueryMode(jobConf, "1"); exception = assertThrows(HoodieIOException.class, () -> inputFormat.listStatus(jobConf)); - assertEquals("Valid timestamp is required for hoodie.%s.consume.commit in snapshot mode", exception.getMessage()); + assertEquals("Query instant (1) not found in the timeline", exception.getMessage()); } @Test diff --git a/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/realtime/TestHoodieRealtimeFileSplit.java b/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/realtime/TestHoodieRealtimeFileSplit.java index 9d3855c47d663..a6ca32769cf8d 100644 --- a/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/realtime/TestHoodieRealtimeFileSplit.java +++ b/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/realtime/TestHoodieRealtimeFileSplit.java @@ -18,12 +18,11 @@ package org.apache.hudi.hadoop.realtime; -import org.apache.hudi.common.model.HoodieLogFile; -import org.apache.hudi.common.util.Option; - import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapred.FileSplit; +import org.apache.hudi.common.model.HoodieLogFile; +import org.apache.hudi.common.util.Option; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.ExtendWith; @@ -72,7 +71,7 @@ public void setUp(@TempDir java.nio.file.Path tempDir) throws Exception { baseFileSplit = new FileSplit(new Path(fileSplitName), 0, 100, new String[] {}); maxCommitTime = "10001"; - split = new HoodieRealtimeFileSplit(baseFileSplit, basePath, deltaLogFiles, maxCommitTime, Option.empty()); + split = new HoodieRealtimeFileSplit(baseFileSplit, basePath, deltaLogFiles, maxCommitTime, false, Option.empty()); } @Test diff --git a/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/realtime/TestHoodieRealtimeRecordReader.java b/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/realtime/TestHoodieRealtimeRecordReader.java index f8daf70542053..fc4eb7ce2c042 100644 --- a/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/realtime/TestHoodieRealtimeRecordReader.java +++ b/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/realtime/TestHoodieRealtimeRecordReader.java @@ -18,8 +18,28 @@ package org.apache.hudi.hadoop.realtime; +import org.apache.avro.Schema; +import org.apache.avro.Schema.Field; +import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import 
org.apache.hadoop.hive.metastore.api.hive_metastoreConstants; +import org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat; +import org.apache.hadoop.hive.serde2.ColumnProjectionUtils; +import org.apache.hadoop.io.ArrayWritable; +import org.apache.hadoop.io.BooleanWritable; +import org.apache.hadoop.io.DoubleWritable; +import org.apache.hadoop.io.FloatWritable; +import org.apache.hadoop.io.IntWritable; +import org.apache.hadoop.io.LongWritable; +import org.apache.hadoop.io.NullWritable; +import org.apache.hadoop.io.Writable; +import org.apache.hadoop.mapred.FileInputFormat; +import org.apache.hadoop.mapred.FileSplit; import org.apache.hadoop.mapred.InputSplit; +import org.apache.hadoop.mapred.JobConf; +import org.apache.hadoop.mapred.RecordReader; import org.apache.hadoop.mapred.Reporter; import org.apache.hudi.avro.HoodieAvroUtils; import org.apache.hudi.avro.model.HoodieCompactionPlan; @@ -30,8 +50,8 @@ import org.apache.hudi.common.model.HoodieLogFile; import org.apache.hudi.common.model.HoodieReplaceCommitMetadata; import org.apache.hudi.common.model.HoodieTableType; -import org.apache.hudi.common.table.HoodieTableConfig; import org.apache.hudi.common.model.HoodieWriteStat; +import org.apache.hudi.common.table.HoodieTableConfig; import org.apache.hudi.common.table.log.HoodieLogFormat; import org.apache.hudi.common.table.log.HoodieLogFormat.Writer; import org.apache.hudi.common.table.log.block.HoodieLogBlock; @@ -44,32 +64,9 @@ import org.apache.hudi.common.util.collection.ExternalSpillableMap; import org.apache.hudi.common.util.collection.Pair; import org.apache.hudi.exception.HoodieException; -import org.apache.hudi.hadoop.BaseFileWithLogsSplit; -import org.apache.hudi.hadoop.PathWithLogFilePath; import org.apache.hudi.hadoop.RealtimeFileStatus; -import org.apache.hudi.hadoop.testutils.InputFormatTestUtil; import org.apache.hudi.hadoop.config.HoodieRealtimeConfig; - -import org.apache.avro.Schema; -import org.apache.avro.Schema.Field; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.Path; -import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants; -import org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat; -import org.apache.hadoop.hive.serde2.ColumnProjectionUtils; -import org.apache.hadoop.io.ArrayWritable; -import org.apache.hadoop.io.BooleanWritable; -import org.apache.hadoop.io.DoubleWritable; -import org.apache.hadoop.io.FloatWritable; -import org.apache.hadoop.io.IntWritable; -import org.apache.hadoop.io.LongWritable; -import org.apache.hadoop.io.NullWritable; -import org.apache.hadoop.io.Writable; -import org.apache.hadoop.mapred.FileInputFormat; -import org.apache.hadoop.mapred.FileSplit; -import org.apache.hadoop.mapred.JobConf; -import org.apache.hadoop.mapred.RecordReader; +import org.apache.hudi.hadoop.testutils.InputFormatTestUtil; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.io.TempDir; @@ -84,12 +81,12 @@ import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; +import java.util.HashMap; import java.util.HashSet; import java.util.List; +import java.util.Map; import java.util.Properties; import java.util.Set; -import java.util.Map; -import java.util.HashMap; import java.util.stream.Collectors; import java.util.stream.Stream; @@ -229,7 +226,9 @@ private void testReaderInternal(ExternalSpillableMap.DiskMapType diskMapType, new FileSplit(new Path(partitionDir + "/fileid0_1-0-1_" + baseInstant 
+ ".parquet"), 0, 1, baseJobConf), basePath.toUri().toString(), fileSlice.getLogFiles().sorted(HoodieLogFile.getLogFileComparator()) .collect(Collectors.toList()), - instantTime, Option.empty()); + instantTime, + false, + Option.empty()); // create a RecordReader to be used by HoodieRealtimeRecordReader RecordReader reader = new MapredParquetInputFormat().getRecordReader( @@ -309,7 +308,7 @@ public void testUnMergedReader() throws Exception { // create a split with baseFile (parquet file written earlier) and new log file(s) HoodieRealtimeFileSplit split = new HoodieRealtimeFileSplit( new FileSplit(new Path(partitionDir + "/fileid0_1-0-1_" + instantTime + ".parquet"), 0, 1, baseJobConf), - basePath.toUri().toString(), Collections.singletonList(writer.getLogFile()), newCommitTime, Option.empty()); + basePath.toUri().toString(), Collections.singletonList(writer.getLogFile()), newCommitTime, false, Option.empty()); // create a RecordReader to be used by HoodieRealtimeRecordReader RecordReader reader = new MapredParquetInputFormat().getRecordReader( @@ -388,7 +387,7 @@ public void testReaderWithNestedAndComplexSchema(ExternalSpillableMap.DiskMapTyp // create a split with baseFile (parquet file written earlier) and new log file(s) HoodieRealtimeFileSplit split = new HoodieRealtimeFileSplit( new FileSplit(new Path(partitionDir + "/fileid0_1-0-1_" + instantTime + ".parquet"), 0, 1, baseJobConf), - basePath.toUri().toString(), Collections.singletonList(writer.getLogFile()), newCommitTime, Option.empty()); + basePath.toUri().toString(), Collections.singletonList(writer.getLogFile()), newCommitTime, false, Option.empty()); // create a RecordReader to be used by HoodieRealtimeRecordReader RecordReader reader = new MapredParquetInputFormat().getRecordReader( @@ -535,7 +534,7 @@ public void testSchemaEvolutionAndRollbackBlockInLastLogFile(ExternalSpillableMa // create a split with baseFile (parquet file written earlier) and new log file(s) HoodieRealtimeFileSplit split = new HoodieRealtimeFileSplit( new FileSplit(new Path(partitionDir + "/fileid0_1_" + instantTime + ".parquet"), 0, 1, baseJobConf), - basePath.toUri().toString(), logFiles, newCommitTime, Option.empty()); + basePath.toUri().toString(), logFiles, newCommitTime, false, Option.empty()); // create a RecordReader to be used by HoodieRealtimeRecordReader RecordReader reader = new MapredParquetInputFormat().getRecordReader( @@ -615,7 +614,7 @@ public void testIncrementalWithOnlylog() throws Exception { HoodieParquetRealtimeInputFormat inputFormat = new HoodieParquetRealtimeInputFormat(); inputFormat.setConf(baseJobConf); InputSplit[] splits = inputFormat.getSplits(baseJobConf, 1); - assertTrue(splits.length == 1); + assertEquals(1, splits.length); JobConf newJobConf = new JobConf(baseJobConf); List fields = schema.getFields(); setHiveColumnNameProps(fields, newJobConf, false); @@ -769,13 +768,16 @@ public void testLogOnlyReader() throws Exception { FileCreateUtils.createDeltaCommit(basePath.toString(), instantTime); // create a split with new log file(s) fileSlice.addLogFile(new HoodieLogFile(writer.getLogFile().getPath(), size)); - RealtimeFileStatus realtimeFileStatus = new RealtimeFileStatus(new FileStatus(writer.getLogFile().getFileSize(), false, 1, 1, 0, writer.getLogFile().getPath())); + RealtimeFileStatus realtimeFileStatus = new RealtimeFileStatus( + new FileStatus(writer.getLogFile().getFileSize(), false, 1, 1, 0, writer.getLogFile().getPath()), + basePath.toString(), + fileSlice.getLogFiles().collect(Collectors.toList()), + false, + 
Option.empty()); realtimeFileStatus.setMaxCommitTime(instantTime); - realtimeFileStatus.setBasePath(basePath.toString()); - realtimeFileStatus.setDeltaLogFiles(fileSlice.getLogFiles().collect(Collectors.toList())); - PathWithLogFilePath pathWithLogFileStatus = (PathWithLogFilePath) realtimeFileStatus.getPath(); - BaseFileWithLogsSplit bs = pathWithLogFileStatus.buildSplit(pathWithLogFileStatus, 0, 0, new String[] {""}); - HoodieRealtimeFileSplit split = new HoodieRealtimeFileSplit(bs, bs.getBasePath(), bs.getDeltaLogFiles(), bs.getMaxCommitTime(), Option.empty()); + HoodieRealtimePath realtimePath = (HoodieRealtimePath) realtimeFileStatus.getPath(); + HoodieRealtimeFileSplit split = + new HoodieRealtimeFileSplit(new FileSplit(realtimePath, 0, 0, new String[] {""}), realtimePath); JobConf newJobConf = new JobConf(baseJobConf); List fields = schema.getFields(); diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/MergeOnReadIncrementalRelation.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/MergeOnReadIncrementalRelation.scala index db9ebeff4d7b6..cda7b81f02e0a 100644 --- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/MergeOnReadIncrementalRelation.scala +++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/MergeOnReadIncrementalRelation.scala @@ -17,24 +17,17 @@ package org.apache.hudi +import org.apache.hadoop.fs.{GlobPattern, Path} +import org.apache.hadoop.mapred.JobConf import org.apache.hudi.common.model.HoodieRecord +import org.apache.hudi.common.table.HoodieTableMetaClient import org.apache.hudi.common.table.view.HoodieTableFileSystemView -import org.apache.hudi.common.table.{HoodieTableMetaClient, TableSchemaResolver} import org.apache.hudi.exception.HoodieException -import org.apache.hudi.hadoop.utils.HoodieInputFormatUtils.listAffectedFilesForCommits -import org.apache.hudi.hadoop.utils.HoodieInputFormatUtils.getCommitMetadata -import org.apache.hudi.hadoop.utils.HoodieInputFormatUtils.getWritePartitionPaths +import org.apache.hudi.hadoop.utils.HoodieInputFormatUtils.{getCommitMetadata, getWritePartitionPaths, listAffectedFilesForCommits} import org.apache.hudi.hadoop.utils.HoodieRealtimeRecordReaderUtils.getMaxCompactionMemoryInBytes - -import org.apache.hadoop.fs.{GlobPattern, Path} -import org.apache.hadoop.mapred.JobConf - -import org.apache.log4j.LogManager - import org.apache.spark.rdd.RDD import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.execution.datasources.PartitionedFile -import org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat import org.apache.spark.sql.sources._ import org.apache.spark.sql.types.StructType import org.apache.spark.sql.{Row, SQLContext} @@ -167,7 +160,7 @@ class MergeOnReadIncrementalRelation(sqlContext: SQLContext, def buildFileIndex(): List[HoodieMergeOnReadFileSplit] = { val metadataList = commitsToReturn.map(instant => getCommitMetadata(instant, commitsTimelineToReturn)) - val affectedFileStatus = listAffectedFilesForCommits(new Path(metaClient.getBasePath), metadataList) + val affectedFileStatus = listAffectedFilesForCommits(conf, new Path(metaClient.getBasePath), metadataList) val fsView = new HoodieTableFileSystemView(metaClient, commitsTimelineToReturn, affectedFileStatus) // Iterate partitions to create splits diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/SparkHoodieTableFileIndex.scala 
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/SparkHoodieTableFileIndex.scala index be3247104c423..46201c4132078 100644 --- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/SparkHoodieTableFileIndex.scala +++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/SparkHoodieTableFileIndex.scala @@ -65,6 +65,7 @@ class SparkHoodieTableFileIndex(spark: SparkSession, queryPaths.asJava, toJavaOption(specifiedQueryInstant), false, + false, SparkHoodieTableFileIndex.adapt(fileStatusCache) ) with SparkAdapterSupport
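Reviewer note (not part of the diff): the RealtimeSplit wire format changed in this patch, so that writeToOutput/readFromInput now carry the belongsToIncrementalQuery flag and, for every delta log file, both its path and its size. The self-contained sketch below mirrors that round-trip pattern under stated assumptions: it uses plain java.io types, and the class and field names (SplitWireFormatSketch, LogFileRef) are illustrative stand-ins, not Hudi's InputSplitUtils/HoodieLogFile API.

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for the split's serialized state; not Hudi API.
public class SplitWireFormatSketch {

  static class LogFileRef {
    final String path;
    final long size;
    LogFileRef(String path, long size) { this.path = path; this.size = size; }
  }

  // Mirrors the shape of the new writeToOutput: flag first, then (path, size) per log file.
  static byte[] write(boolean belongsToIncrementalQuery, List<LogFileRef> logFiles) throws IOException {
    ByteArrayOutputStream bytes = new ByteArrayOutputStream();
    try (DataOutputStream out = new DataOutputStream(bytes)) {
      out.writeBoolean(belongsToIncrementalQuery);
      out.writeInt(logFiles.size());
      for (LogFileRef logFile : logFiles) {
        out.writeUTF(logFile.path);   // log file path as a string
        out.writeLong(logFile.size);  // the size now travels alongside the path
      }
    }
    return bytes.toByteArray();
  }

  // Mirrors readFromInput: fields must be consumed in exactly the order they were written.
  static List<LogFileRef> read(byte[] payload) throws IOException {
    try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(payload))) {
      boolean belongsToIncrementalQuery = in.readBoolean();
      int count = in.readInt();
      List<LogFileRef> logFiles = new ArrayList<>(count);
      for (int i = 0; i < count; i++) {
        logFiles.add(new LogFileRef(in.readUTF(), in.readLong()));
      }
      return logFiles;
    }
  }

  public static void main(String[] args) throws IOException {
    List<LogFileRef> logs = new ArrayList<>();
    logs.add(new LogFileRef("/tmp/table/.f1_20220101.log.1", 1024L));
    byte[] payload = write(false, logs);
    System.out.println("round-tripped " + read(payload).size() + " log file(s)");
  }
}

The practical consequence of the format change is that older splits serialized without the flag and the per-file size cannot be deserialized by the new readFromInput, which is acceptable here because Hive serializes and deserializes splits within a single job.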