diff --git a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HoodieCopyOnWriteTableInputFormat.java b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HoodieCopyOnWriteTableInputFormat.java
index 65a7259a6f14d..140e7ff5b6330 100644
--- a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HoodieCopyOnWriteTableInputFormat.java
+++ b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HoodieCopyOnWriteTableInputFormat.java
@@ -35,6 +35,7 @@
 import org.apache.hudi.common.engine.HoodieLocalEngineContext;
 import org.apache.hudi.common.model.FileSlice;
 import org.apache.hudi.common.model.HoodieBaseFile;
+import org.apache.hudi.common.model.HoodieLogFile;
 import org.apache.hudi.common.model.HoodieTableQueryType;
 import org.apache.hudi.common.table.HoodieTableConfig;
 import org.apache.hudi.common.table.HoodieTableMetaClient;
@@ -253,6 +254,7 @@ private List<FileStatus> listStatusForSnapshotMode(JobConf job,
         partitionedFileSlices.values()
             .stream()
             .flatMap(Collection::stream)
+            .filter(fileSlice -> checkIfValidFileSlice(fileSlice))
             .map(fileSlice -> createFileStatusUnchecked(fileSlice, fileIndex, virtualKeyInfoOpt))
             .collect(Collectors.toList())
     );
@@ -261,6 +263,20 @@ private List<FileStatus> listStatusForSnapshotMode(JobConf job,
     return targetFiles;
   }
 
+  /**
+   * Checks whether the given file slice is a valid candidate for listing in snapshot mode.
+   *
+   * @param fileSlice file slice to validate
+   * @return true if the slice has a base file; false if it only has log files
+   * @throws IllegalStateException if the slice has neither a base file nor any log file
+   */
+  protected boolean checkIfValidFileSlice(FileSlice fileSlice) {
+    Option<HoodieBaseFile> baseFileOpt = fileSlice.getBaseFile();
+    Option<HoodieLogFile> latestLogFileOpt = fileSlice.getLatestLogFile();
+
+    if (baseFileOpt.isPresent()) {
+      return true;
+    } else if (latestLogFileOpt.isPresent()) {
+      // A log-only slice can occur for read-optimized queries on MOR tables; skip it rather than fail.
+      return false;
+    } else {
+      throw new IllegalStateException("Invalid state: base-file has to be present for " + fileSlice.getFileId());
+    }
+  }
+
   private void validate(List<FileStatus> targetFiles, List<FileStatus> legacyFileStatuses) {
     List<FileStatus> diff = CollectionUtils.diff(targetFiles, legacyFileStatuses);
     checkState(diff.isEmpty(), "Should be empty");
diff --git a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/HoodieMergeOnReadTableInputFormat.java b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/HoodieMergeOnReadTableInputFormat.java
index 982d52b0d4807..64fc54392a28f 100644
--- a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/HoodieMergeOnReadTableInputFormat.java
+++ b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/HoodieMergeOnReadTableInputFormat.java
@@ -103,6 +103,18 @@ protected FileStatus createFileStatusUnchecked(FileSlice fileSlice, HiveHoodieTa
     }
   }
 
+  // For MOR real-time views a slice backed only by log files is still readable,
+  // so either a base file or a log file makes the slice valid.
+  @Override
+  protected boolean checkIfValidFileSlice(FileSlice fileSlice) {
+    Option<HoodieBaseFile> baseFileOpt = fileSlice.getBaseFile();
+    Option<HoodieLogFile> latestLogFileOpt = fileSlice.getLatestLogFile();
+
+    if (baseFileOpt.isPresent() || latestLogFileOpt.isPresent()) {
+      return true;
+    } else {
+      throw new IllegalStateException("Invalid state: either base-file or log-file has to be present for " + fileSlice.getFileId());
+    }
+  }
+
   /**
    * Keep the logic of mor_incr_view as same as spark datasource.
    * Step1: Get list of commits to be fetched based on start commit and max commits(for snapshot max commits is -1).