diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieLogFileReader.java b/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieLogFileReader.java
index 9a3913fa118d0..26afe6aec740a 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieLogFileReader.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieLogFileReader.java
@@ -76,6 +76,11 @@ public class HoodieLogFileReader implements HoodieLogFormat.Reader {
   private boolean closed = false;
   private transient Thread shutdownThread = null;
 
+  public HoodieLogFileReader(FileSystem fs, HoodieLogFile logFile, Schema readerSchema, int bufferSize,
+                             boolean readBlockLazily) throws IOException {
+    this(fs, logFile, readerSchema, bufferSize, readBlockLazily, false);
+  }
+
   public HoodieLogFileReader(FileSystem fs, HoodieLogFile logFile, Schema readerSchema, int bufferSize,
                              boolean readBlockLazily, boolean reverseReader) throws IOException {
     this(fs, logFile, readerSchema, bufferSize, readBlockLazily, reverseReader, false,
@@ -94,16 +99,11 @@ public HoodieLogFileReader(FileSystem fs, HoodieLogFile logFile, Schema readerSc
     this.enableInlineReading = enableInlineReading;
     this.keyField = keyField;
     if (this.reverseReader) {
-      this.reverseLogFilePosition = this.lastReverseLogFilePosition = fs.getFileStatus(logFile.getPath()).getLen();
+      this.reverseLogFilePosition = this.lastReverseLogFilePosition = logFile.getFileSize();
     }
     addShutDownHook();
   }
 
-  public HoodieLogFileReader(FileSystem fs, HoodieLogFile logFile, Schema readerSchema, boolean readBlockLazily,
-      boolean reverseReader) throws IOException {
-    this(fs, logFile, readerSchema, DEFAULT_BUFFER_SIZE, readBlockLazily, reverseReader);
-  }
-
   public HoodieLogFileReader(FileSystem fs, HoodieLogFile logFile, Schema readerSchema) throws IOException {
     this(fs, logFile, readerSchema, DEFAULT_BUFFER_SIZE, false, false);
   }
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieLogFormat.java b/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieLogFormat.java
index c566788fd1667..569b4a23b683b 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieLogFormat.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieLogFormat.java
@@ -274,7 +274,7 @@ static WriterBuilder newWriterBuilder() {
 
   static HoodieLogFormat.Reader newReader(FileSystem fs, HoodieLogFile logFile, Schema readerSchema)
       throws IOException {
-    return new HoodieLogFileReader(fs, logFile, readerSchema, HoodieLogFileReader.DEFAULT_BUFFER_SIZE, false, false);
+    return new HoodieLogFileReader(fs, logFile, readerSchema, HoodieLogFileReader.DEFAULT_BUFFER_SIZE, false);
   }
 
   static HoodieLogFormat.Reader newReader(FileSystem fs, HoodieLogFile logFile, Schema readerSchema,
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieLogFormatReader.java b/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieLogFormatReader.java
index 2db5437697094..d4a173d069c79 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieLogFormatReader.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieLogFormatReader.java
@@ -104,9 +104,8 @@ public boolean hasNext() {
         } else {
           this.prevReadersInOpenState.add(currentReader);
         }
-        this.currentReader =
-            new HoodieLogFileReader(fs, nextLogFile, readerSchema, bufferSize, readBlocksLazily, false,
-                enableInlineReading, recordKeyField);
+        this.currentReader = new HoodieLogFileReader(fs, nextLogFile, readerSchema, bufferSize, readBlocksLazily, false,
+            enableInlineReading, recordKeyField);
       } catch (IOException io) {
         throw new HoodieIOException("unable to initialize read with log file ", io);
       }
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/log/LogReaderUtils.java b/hudi-common/src/main/java/org/apache/hudi/common/table/log/LogReaderUtils.java
index fe159df007783..c1b20cbb4c55c 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/log/LogReaderUtils.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/log/LogReaderUtils.java
@@ -27,14 +27,15 @@
 import org.apache.hudi.common.table.log.block.HoodieLogBlock.HeaderMetadataType;
 import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
 import org.apache.hudi.common.table.timeline.HoodieTimeline;
+import org.apache.hudi.common.util.collection.Pair;
 
 import org.apache.avro.Schema;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
 
 import java.io.IOException;
 import java.util.List;
+import java.util.Map;
 import java.util.stream.Collectors;
 
 /**
@@ -42,9 +43,10 @@
  */
 public class LogReaderUtils {
 
-  private static Schema readSchemaFromLogFileInReverse(FileSystem fs, HoodieActiveTimeline activeTimeline, Path path)
+  private static Schema readSchemaFromLogFileInReverse(FileSystem fs, HoodieActiveTimeline activeTimeline, HoodieLogFile hoodieLogFile)
       throws IOException {
-    Reader reader = HoodieLogFormat.newReader(fs, new HoodieLogFile(path), null, true, true);
+    // set length for the HoodieLogFile as it will be leveraged by HoodieLogFormat.Reader with reverseReading enabled
+    Reader reader = HoodieLogFormat.newReader(fs, hoodieLogFile, null, true, true);
     Schema writerSchema = null;
     HoodieTimeline completedTimeline = activeTimeline.getCommitsTimeline().filterCompletedInstants();
     while (reader.hasPrev()) {
@@ -62,17 +64,17 @@ private static Schema readSchemaFromLogFileInReverse(FileSystem fs, HoodieActive
     return writerSchema;
   }
 
-  public static Schema readLatestSchemaFromLogFiles(String basePath, List<String> deltaFilePaths, Configuration config)
+  public static Schema readLatestSchemaFromLogFiles(String basePath, List<HoodieLogFile> logFiles, Configuration config)
       throws IOException {
     HoodieTableMetaClient metaClient = HoodieTableMetaClient.builder().setConf(config).setBasePath(basePath).build();
-    List<String> deltaPaths = deltaFilePaths.stream().map(s -> new HoodieLogFile(new Path(s)))
-        .sorted(HoodieLogFile.getReverseLogFileComparator()).map(s -> s.getPath().toString())
+    List<String> deltaPaths = logFiles.stream().sorted(HoodieLogFile.getReverseLogFileComparator()).map(s -> s.getPath().toString())
         .collect(Collectors.toList());
     if (deltaPaths.size() > 0) {
+      Map<String, HoodieLogFile> deltaFilePathToFileStatus = logFiles.stream().map(entry -> Pair.of(entry.getPath().toString(), entry))
+          .collect(Collectors.toMap(Pair::getKey, Pair::getValue));
       for (String logPath : deltaPaths) {
         FileSystem fs = FSUtils.getFs(logPath, config);
-        Schema schemaFromLogFile =
-            readSchemaFromLogFileInReverse(fs, metaClient.getActiveTimeline(), new Path(logPath));
+        Schema schemaFromLogFile = readSchemaFromLogFileInReverse(fs, metaClient.getActiveTimeline(), deltaFilePathToFileStatus.get(logPath));
         if (schemaFromLogFile != null) {
           return schemaFromLogFile;
         }
@@ -80,5 +82,4 @@ public static Schema readLatestSchemaFromLogFiles(String basePath, List
     }
     return null;
   }
-
 }
diff --git a/hudi-common/src/test/java/org/apache/hudi/common/functional/TestHoodieLogFormat.java b/hudi-common/src/test/java/org/apache/hudi/common/functional/TestHoodieLogFormat.java
index 5be3b9674573f..17bf9135381e5 100755
--- a/hudi-common/src/test/java/org/apache/hudi/common/functional/TestHoodieLogFormat.java
+++ b/hudi-common/src/test/java/org/apache/hudi/common/functional/TestHoodieLogFormat.java
@@ -1482,7 +1482,8 @@ public void testBasicAppendAndReadInReverse(boolean readBlocksLazily)
 
     FileCreateUtils.createDeltaCommit(basePath, "100", fs);
 
-    HoodieLogFileReader reader = new HoodieLogFileReader(fs, writer.getLogFile(), SchemaTestUtil.getSimpleSchema(),
+    HoodieLogFileReader reader = new HoodieLogFileReader(fs, new HoodieLogFile(writer.getLogFile().getPath(),
+        fs.getFileStatus(writer.getLogFile().getPath()).getLen()), SchemaTestUtil.getSimpleSchema(),
         bufferSize, readBlocksLazily, true);
 
     assertTrue(reader.hasPrev(), "Last block should be available");
@@ -1560,7 +1561,8 @@ public void testAppendAndReadOnCorruptedLogInReverse(boolean readBlocksLazily)
 
     // First round of reads - we should be able to read the first block and then EOF
     HoodieLogFileReader reader =
-        new HoodieLogFileReader(fs, writer.getLogFile(), schema, bufferSize, readBlocksLazily, true);
+        new HoodieLogFileReader(fs, new HoodieLogFile(writer.getLogFile().getPath(),
+            fs.getFileStatus(writer.getLogFile().getPath()).getLen()), schema, bufferSize, readBlocksLazily, true);
 
     assertTrue(reader.hasPrev(), "Last block should be available");
     HoodieLogBlock block = reader.prev();
@@ -1610,7 +1612,8 @@ public void testBasicAppendAndTraverseInReverse(boolean readBlocksLazily)
 
     FileCreateUtils.createDeltaCommit(basePath, "100", fs);
 
-    HoodieLogFileReader reader = new HoodieLogFileReader(fs, writer.getLogFile(), SchemaTestUtil.getSimpleSchema(),
+    HoodieLogFileReader reader = new HoodieLogFileReader(fs, new HoodieLogFile(writer.getLogFile().getPath(),
+        fs.getFileStatus(writer.getLogFile().getPath()).getLen()), SchemaTestUtil.getSimpleSchema(),
         bufferSize, readBlocksLazily, true);
 
     assertTrue(reader.hasPrev(), "Third block should be available");
diff --git a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/BaseFileWithLogsSplit.java b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/BaseFileWithLogsSplit.java
index 34e0a392bd9dc..c9afa9119c0c5 100644
--- a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/BaseFileWithLogsSplit.java
+++ b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/BaseFileWithLogsSplit.java
@@ -18,6 +18,8 @@
 
 package org.apache.hudi.hadoop;
 
+import org.apache.hudi.common.model.HoodieLogFile;
+
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.mapred.FileSplit;
@@ -36,7 +38,7 @@ public class BaseFileWithLogsSplit extends FileSplit {
   // a flag to mark this split is produced by incremental query or not.
   private boolean belongToIncrementalSplit = false;
   // the log file paths of this split.
-  private List<String> deltaLogPaths = new ArrayList<>();
+  private List<HoodieLogFile> deltaLogFiles = new ArrayList<>();
   // max commit time of current split.
   private String maxCommitTime = "";
   // the basePath of current hoodie table.
@@ -55,9 +57,10 @@ public void write(DataOutput out) throws IOException {
     Text.writeString(out, maxCommitTime);
     Text.writeString(out, basePath);
     Text.writeString(out, baseFilePath);
-    out.writeInt(deltaLogPaths.size());
-    for (String logPath : deltaLogPaths) {
-      Text.writeString(out, logPath);
+    out.writeInt(deltaLogFiles.size());
+    for (HoodieLogFile logFile : deltaLogFiles) {
+      Text.writeString(out, logFile.getPath().toString());
+      out.writeLong(logFile.getFileSize());
     }
   }
 
@@ -69,11 +72,13 @@ public void readFields(DataInput in) throws IOException {
     basePath = Text.readString(in);
     baseFilePath = Text.readString(in);
     int deltaLogSize = in.readInt();
-    List<String> tempDeltaLogs = new ArrayList<>();
+    List<HoodieLogFile> tempDeltaLogs = new ArrayList<>();
     for (int i = 0; i < deltaLogSize; i++) {
-      tempDeltaLogs.add(Text.readString(in));
+      String logPath = Text.readString(in);
+      long logFileSize = in.readLong();
+      tempDeltaLogs.add(new HoodieLogFile(new Path(logPath), logFileSize));
     }
-    deltaLogPaths = tempDeltaLogs;
+    deltaLogFiles = tempDeltaLogs;
   }
 
   public boolean getBelongToIncrementalSplit() {
@@ -84,12 +89,12 @@ public void setBelongToIncrementalSplit(boolean belongToIncrementalSplit) {
     this.belongToIncrementalSplit = belongToIncrementalSplit;
   }
 
-  public List<String> getDeltaLogPaths() {
-    return deltaLogPaths;
+  public List<HoodieLogFile> getDeltaLogFiles() {
+    return deltaLogFiles;
   }
 
-  public void setDeltaLogPaths(List<String> deltaLogPaths) {
-    this.deltaLogPaths = deltaLogPaths;
+  public void setDeltaLogFiles(List<HoodieLogFile> deltaLogFiles) {
+    this.deltaLogFiles = deltaLogFiles;
   }
 
   public String getMaxCommitTime() {
diff --git a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/PathWithLogFilePath.java b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/PathWithLogFilePath.java
index 5b4e535e62d1a..8f9ac8b03d575 100644
--- a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/PathWithLogFilePath.java
+++ b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/PathWithLogFilePath.java
@@ -18,6 +18,8 @@
 
 package org.apache.hudi.hadoop;
 
+import org.apache.hudi.common.model.HoodieLogFile;
+
 import org.apache.hadoop.fs.Path;
 
 import java.util.ArrayList;
@@ -31,7 +33,7 @@ public class PathWithLogFilePath extends Path {
   // a flag to mark this split is produced by incremental query or not.
   private boolean belongToIncrementalPath = false;
   // the log files belong this path.
-  private List<String> deltaLogPaths = new ArrayList<>();
+  private List<HoodieLogFile> deltaLogFiles = new ArrayList<>();
   // max commit time of current path.
   private String maxCommitTime = "";
   // the basePath of current hoodie table.
@@ -50,12 +52,12 @@ public void setBelongToIncrementalPath(boolean belongToIncrementalPath) {
     this.belongToIncrementalPath = belongToIncrementalPath;
   }
 
-  public List<String> getDeltaLogPaths() {
-    return deltaLogPaths;
+  public List<HoodieLogFile> getDeltaLogFiles() {
+    return deltaLogFiles;
   }
 
-  public void setDeltaLogPaths(List<String> deltaLogPaths) {
-    this.deltaLogPaths = deltaLogPaths;
+  public void setDeltaLogFiles(List<HoodieLogFile> deltaLogFiles) {
+    this.deltaLogFiles = deltaLogFiles;
   }
 
   public String getMaxCommitTime() {
@@ -97,7 +99,7 @@ public boolean includeBootstrapFilePath() {
   public BaseFileWithLogsSplit buildSplit(Path file, long start, long length, String[] hosts) {
     BaseFileWithLogsSplit bs = new BaseFileWithLogsSplit(file, start, length, hosts);
     bs.setBelongToIncrementalSplit(belongToIncrementalPath);
-    bs.setDeltaLogPaths(deltaLogPaths);
+    bs.setDeltaLogFiles(deltaLogFiles);
     bs.setMaxCommitTime(maxCommitTime);
     bs.setBasePath(basePath);
     bs.setBaseFilePath(baseFilePath);
diff --git a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/RealtimeFileStatus.java b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/RealtimeFileStatus.java
index 542720b4919b7..e8e1a28987c56 100644
--- a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/RealtimeFileStatus.java
+++ b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/RealtimeFileStatus.java
@@ -18,6 +18,8 @@
 
 package org.apache.hudi.hadoop;
 
+import org.apache.hudi.common.model.HoodieLogFile;
+
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.Path;
 
@@ -35,7 +37,7 @@ public class RealtimeFileStatus extends FileStatus {
   // a flag to mark this split is produced by incremental query or not.
   private boolean belongToIncrementalFileStatus = false;
   // the log files belong this fileStatus.
-  private List<String> deltaLogPaths = new ArrayList<>();
+  private List<HoodieLogFile> deltaLogFiles = new ArrayList<>();
   // max commit time of current fileStatus.
   private String maxCommitTime = "";
   // the basePath of current hoodie table.
@@ -55,7 +57,7 @@ public Path getPath() {
     Path path = super.getPath();
     PathWithLogFilePath pathWithLogFilePath = new PathWithLogFilePath(path.getParent(), path.getName());
     pathWithLogFilePath.setBelongToIncrementalPath(belongToIncrementalFileStatus);
-    pathWithLogFilePath.setDeltaLogPaths(deltaLogPaths);
+    pathWithLogFilePath.setDeltaLogFiles(deltaLogFiles);
     pathWithLogFilePath.setMaxCommitTime(maxCommitTime);
     pathWithLogFilePath.setBasePath(basePath);
     pathWithLogFilePath.setBaseFilePath(baseFilePath);
@@ -69,12 +71,12 @@ public void setBelongToIncrementalFileStatus(boolean belongToIncrementalFileStat
     this.belongToIncrementalFileStatus = belongToIncrementalFileStatus;
   }
 
-  public List<String> getDeltaLogPaths() {
-    return deltaLogPaths;
+  public List<HoodieLogFile> getDeltaLogFiles() {
+    return deltaLogFiles;
   }
 
-  public void setDeltaLogPaths(List<String> deltaLogPaths) {
-    this.deltaLogPaths = deltaLogPaths;
+  public void setDeltaLogFiles(List<HoodieLogFile> deltaLogFiles) {
+    this.deltaLogFiles = deltaLogFiles;
   }
 
   public String getMaxCommitTime() {
diff --git a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/AbstractRealtimeRecordReader.java b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/AbstractRealtimeRecordReader.java
index ef3d4f1c8cea8..78ac8805d8aaf 100644
--- a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/AbstractRealtimeRecordReader.java
+++ b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/AbstractRealtimeRecordReader.java
@@ -82,7 +82,7 @@ private boolean usesCustomPayload() {
    * job conf.
    */
   private void init() throws IOException {
-    Schema schemaFromLogFile = LogReaderUtils.readLatestSchemaFromLogFiles(split.getBasePath(), split.getDeltaLogPaths(), jobConf);
+    Schema schemaFromLogFile = LogReaderUtils.readLatestSchemaFromLogFiles(split.getBasePath(), split.getDeltaLogFiles(), jobConf);
     if (schemaFromLogFile == null) {
       writerSchema = InputSplitUtils.getBaseFileSchema((FileSplit)split, jobConf);
       LOG.info("Writer Schema From Parquet => " + writerSchema.getFields());
diff --git a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/HoodieParquetRealtimeInputFormat.java b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/HoodieParquetRealtimeInputFormat.java
index cb529cbbf7cf2..e683840c6f4dc 100644
--- a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/HoodieParquetRealtimeInputFormat.java
+++ b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/HoodieParquetRealtimeInputFormat.java
@@ -23,6 +23,7 @@
 import org.apache.hudi.common.model.FileSlice;
 import org.apache.hudi.common.model.HoodieCommitMetadata;
 import org.apache.hudi.common.model.HoodieFileGroup;
+import org.apache.hudi.common.model.HoodieLogFile;
 import org.apache.hudi.common.table.HoodieTableMetaClient;
 import org.apache.hudi.common.table.timeline.HoodieDefaultTimeline;
 import org.apache.hudi.common.table.timeline.HoodieInstant;
@@ -189,7 +190,7 @@ private List collectAllIncrementalFiles(List fileGr
         fileStatus.setBelongToIncrementalFileStatus(true);
         fileStatus.setBasePath(basePath);
         fileStatus.setBaseFilePath(baseFilePath);
-        fileStatus.setDeltaLogPaths(f.getLatestFileSlice().get().getLogFiles().map(l -> l.getPath().toString()).collect(Collectors.toList()));
+        fileStatus.setDeltaLogFiles(f.getLatestFileSlice().get().getLogFiles().collect(Collectors.toList()));
         // try to set bootstrapfileStatus
         if (baseFileStatus instanceof LocatedFileStatusWithBootstrapBaseFile || baseFileStatus instanceof FileStatusWithBootstrapBaseFile) {
           fileStatus.setBootStrapFileStatus(baseFileStatus);
@@ -202,7 +203,7 @@ private List collectAllIncrementalFiles(List fileGr
       if (logFileStatus.size() > 0) {
         RealtimeFileStatus fileStatus = new RealtimeFileStatus(logFileStatus.get(0));
         fileStatus.setBelongToIncrementalFileStatus(true);
-        fileStatus.setDeltaLogPaths(logFileStatus.stream().map(l -> l.getPath().toString()).collect(Collectors.toList()));
+        fileStatus.setDeltaLogFiles(logFileStatus.stream().map(l -> new HoodieLogFile(l.getPath(), l.getLen())).collect(Collectors.toList()));
         fileStatus.setMaxCommitTime(maxCommitTime);
         fileStatus.setBasePath(basePath);
         result.add(fileStatus);
@@ -256,7 +257,7 @@ private FileSplit doMakeSplitForPathWithLogFilePath(PathWithLogFilePath path, lo
         ? super.makeSplit(path.getPathWithBootstrapFileStatus(), start, length, hosts)
         : super.makeSplit(path.getPathWithBootstrapFileStatus(), start, length, hosts, inMemoryHosts);
     return HoodieRealtimeInputFormatUtils
-        .createRealtimeBoostrapBaseFileSplit((BootstrapBaseFileSplit) bf, path.getBasePath(), path.getDeltaLogPaths(), path.getMaxCommitTime());
+        .createRealtimeBoostrapBaseFileSplit((BootstrapBaseFileSplit) bf, path.getBasePath(), path.getDeltaLogFiles(), path.getMaxCommitTime());
   }
 }
diff --git a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/HoodieRealtimeFileSplit.java b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/HoodieRealtimeFileSplit.java
index 6423f2cfd46e8..a39ec35507a77 100644
--- a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/HoodieRealtimeFileSplit.java
+++ b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/HoodieRealtimeFileSplit.java
@@ -18,6 +18,7 @@
 
 package org.apache.hudi.hadoop.realtime;
 
+import org.apache.hudi.common.model.HoodieLogFile;
 import org.apache.hudi.common.util.Option;
 
 import org.apache.hadoop.mapred.FileSplit;
@@ -25,7 +26,9 @@
 import java.io.DataInput;
 import java.io.DataOutput;
 import java.io.IOException;
+import java.util.ArrayList;
 import java.util.List;
+import java.util.stream.Collectors;
 
 /**
  * Filesplit that wraps the base split and a list of log files to merge deltas from.
@@ -33,6 +36,7 @@
 public class HoodieRealtimeFileSplit extends FileSplit implements RealtimeSplit {
 
   private List<String> deltaLogPaths;
+  private List<HoodieLogFile> deltaLogFiles = new ArrayList<>();
 
   private String maxCommitTime;
 
@@ -44,11 +48,12 @@ public HoodieRealtimeFileSplit() {
     super();
   }
 
-  public HoodieRealtimeFileSplit(FileSplit baseSplit, String basePath, List<String> deltaLogPaths, String maxCommitTime,
+  public HoodieRealtimeFileSplit(FileSplit baseSplit, String basePath, List<HoodieLogFile> deltaLogFiles, String maxCommitTime,
                                  Option<HoodieVirtualKeyInfo> hoodieVirtualKeyInfo)
       throws IOException {
     super(baseSplit.getPath(), baseSplit.getStart(), baseSplit.getLength(), baseSplit.getLocations());
-    this.deltaLogPaths = deltaLogPaths;
+    this.deltaLogFiles = deltaLogFiles;
+    this.deltaLogPaths = deltaLogFiles.stream().map(entry -> entry.getPath().toString()).collect(Collectors.toList());
     this.maxCommitTime = maxCommitTime;
     this.basePath = basePath;
     this.hoodieVirtualKeyInfo = hoodieVirtualKeyInfo;
@@ -58,6 +63,10 @@ public List getDeltaLogPaths() {
     return deltaLogPaths;
   }
 
+  public List<HoodieLogFile> getDeltaLogFiles() {
+    return deltaLogFiles;
+  }
+
   public String getMaxCommitTime() {
     return maxCommitTime;
   }
diff --git a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/RealtimeBootstrapBaseFileSplit.java b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/RealtimeBootstrapBaseFileSplit.java
index 4da310da4fba4..79d2d815ee809 100644
--- a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/RealtimeBootstrapBaseFileSplit.java
+++ b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/RealtimeBootstrapBaseFileSplit.java
@@ -18,6 +18,7 @@
 
 package org.apache.hudi.hadoop.realtime;
 
+import org.apache.hudi.common.model.HoodieLogFile;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.hadoop.BootstrapBaseFileSplit;
 
@@ -26,7 +27,9 @@
 import java.io.DataInput;
 import java.io.DataOutput;
 import java.io.IOException;
+import java.util.ArrayList;
 import java.util.List;
+import java.util.stream.Collectors;
 
 /**
  * Realtime File Split with external base file.
@@ -34,6 +37,7 @@
 public class RealtimeBootstrapBaseFileSplit extends BootstrapBaseFileSplit implements RealtimeSplit {
 
   private List<String> deltaLogPaths;
+  private List<HoodieLogFile> deltaLogFiles = new ArrayList<>();
 
   private String maxInstantTime;
 
@@ -43,11 +47,12 @@ public RealtimeBootstrapBaseFileSplit() {
     super();
   }
 
-  public RealtimeBootstrapBaseFileSplit(FileSplit baseSplit, String basePath, List<String> deltaLogPaths,
+  public RealtimeBootstrapBaseFileSplit(FileSplit baseSplit, String basePath, List<HoodieLogFile> deltaLogFiles,
                                         String maxInstantTime, FileSplit externalFileSplit) throws IOException {
     super(baseSplit, externalFileSplit);
     this.maxInstantTime = maxInstantTime;
-    this.deltaLogPaths = deltaLogPaths;
+    this.deltaLogFiles = deltaLogFiles;
+    this.deltaLogPaths = deltaLogFiles.stream().map(entry -> entry.getPath().toString()).collect(Collectors.toList());
     this.basePath = basePath;
   }
 
@@ -68,6 +73,11 @@ public List getDeltaLogPaths() {
     return deltaLogPaths;
   }
 
+  @Override
+  public List<HoodieLogFile> getDeltaLogFiles() {
+    return deltaLogFiles;
+  }
+
   @Override
   public String getMaxCommitTime() {
     return maxInstantTime;
diff --git a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/RealtimeSplit.java b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/RealtimeSplit.java
index 108613c182827..a7f0d2cc2f5e7 100644
--- a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/RealtimeSplit.java
+++ b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/RealtimeSplit.java
@@ -18,6 +18,7 @@
 
 package org.apache.hudi.hadoop.realtime;
 
+import org.apache.hudi.common.model.HoodieLogFile;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.hadoop.InputSplitUtils;
 
@@ -41,6 +42,8 @@
    */
   List<String> getDeltaLogPaths();
 
+  List<HoodieLogFile> getDeltaLogFiles();
+
   /**
    * Return Max Instant Time.
    * @return
diff --git a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/utils/HoodieInputFormatUtils.java b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/utils/HoodieInputFormatUtils.java
index 25dde840c13bb..4f4b69f9813ac 100644
--- a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/utils/HoodieInputFormatUtils.java
+++ b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/utils/HoodieInputFormatUtils.java
@@ -466,7 +466,7 @@ public static List filterFileStatusForSnapshotMode(JobConf job, Map<
         HoodieTableFileSystemView fsView = fsViewCache.computeIfAbsent(metaClient, tableMetaClient ->
             FileSystemViewManager.createInMemoryFileSystemViewWithTimeline(engineContext, tableMetaClient, buildMetadataConfig(job), timeline));
         List<HoodieBaseFile> filteredBaseFiles = new ArrayList<>();
-        Map<FileStatus, List<String>> filteredLogs = new HashMap<>();
+        Map<FileStatus, List<HoodieLogFile>> filteredLogs = new HashMap<>();
         for (Path p : entry.getValue()) {
           String relativePartitionPath = FSUtils.getRelativePartitionPath(new Path(metaClient.getBasePath()), p);
           List<HoodieBaseFile> matched = fsView.getLatestBaseFiles(relativePartitionPath).collect(Collectors.toList());
@@ -476,9 +476,8 @@ public static List filterFileStatusForSnapshotMode(JobConf job, Map<
               .filter(f -> !f.getBaseFile().isPresent() && f.getLatestLogFile().isPresent())
               .collect(Collectors.toList());
           logMatched.forEach(f -> {
-            List<String> logPaths = f.getLogFiles().sorted(HoodieLogFile.getLogFileComparator())
-                .map(log -> log.getPath().toString()).collect(Collectors.toList());
-            filteredLogs.put(f.getLatestLogFile().get().getFileStatus(), logPaths);
+            List<HoodieLogFile> logPathSizePairs = f.getLogFiles().sorted(HoodieLogFile.getLogFileComparator()).collect(Collectors.toList());
+            filteredLogs.put(f.getLatestLogFile().get().getFileStatus(), logPathSizePairs);
           });
         }
       }
@@ -492,9 +491,9 @@ public static List filterFileStatusForSnapshotMode(JobConf job, Map<
           returns.add(getFileStatus(filteredFile));
         }
 
-        for (Map.Entry<FileStatus, List<String>> filterLogEntry : filteredLogs.entrySet()) {
+        for (Map.Entry<FileStatus, List<HoodieLogFile>> filterLogEntry : filteredLogs.entrySet()) {
           RealtimeFileStatus rs = new RealtimeFileStatus(filterLogEntry.getKey());
-          rs.setDeltaLogPaths(filterLogEntry.getValue());
+          rs.setDeltaLogFiles(filterLogEntry.getValue());
           returns.add(rs);
         }
       }
diff --git a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/utils/HoodieRealtimeInputFormatUtils.java b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/utils/HoodieRealtimeInputFormatUtils.java
index 11ce0a66ad330..1761092f5bcb6 100644
--- a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/utils/HoodieRealtimeInputFormatUtils.java
+++ b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/utils/HoodieRealtimeInputFormatUtils.java
@@ -120,13 +120,13 @@ public static InputSplit[] getRealtimeSplits(Configuration conf, Stream
       List<FileSplit> dataFileSplits = groupedInputSplits.getOrDefault(fileSlice.getFileId(), new ArrayList<>());
       dataFileSplits.forEach(split -> {
         try {
-          List<String> logFilePaths = fileSlice.getLogFiles().sorted(HoodieLogFile.getLogFileComparator())
-              .map(logFile -> logFile.getPath().toString()).collect(Collectors.toList());
+          List<HoodieLogFile> logFiles = fileSlice.getLogFiles().sorted(HoodieLogFile.getLogFileComparator())
+              .collect(Collectors.toList());
           if (split instanceof BootstrapBaseFileSplit) {
             BootstrapBaseFileSplit eSplit = (BootstrapBaseFileSplit) split;
-            rtSplits.add(createRealtimeBoostrapBaseFileSplit(eSplit, metaClient.getBasePath(), logFilePaths, maxCommitTime));
+            rtSplits.add(createRealtimeBoostrapBaseFileSplit(eSplit, metaClient.getBasePath(), logFiles, maxCommitTime));
           } else {
-            rtSplits.add(new HoodieRealtimeFileSplit(split, metaClient.getBasePath(), logFilePaths, maxCommitTime, finalHoodieVirtualKeyInfo));
+            rtSplits.add(new HoodieRealtimeFileSplit(split, metaClient.getBasePath(), logFiles, maxCommitTime, finalHoodieVirtualKeyInfo));
           }
         } catch (IOException e) {
           throw new HoodieIOException("Error creating hoodie real time split ", e);
@@ -162,7 +162,7 @@ public static InputSplit[] getIncrementalRealtimeSplits(Configuration conf, Stre
       if (s instanceof BaseFileWithLogsSplit) {
         BaseFileWithLogsSplit bs = (BaseFileWithLogsSplit)s;
         if (bs.getBelongToIncrementalSplit()) {
-          rtSplits.add(new HoodieRealtimeFileSplit(bs, bs.getBasePath(), bs.getDeltaLogPaths(), bs.getMaxCommitTime(), finalHoodieVirtualKeyInfo));
+          rtSplits.add(new HoodieRealtimeFileSplit(bs, bs.getBasePath(), bs.getDeltaLogFiles(), bs.getMaxCommitTime(), finalHoodieVirtualKeyInfo));
         }
       } else if (s instanceof RealtimeBootstrapBaseFileSplit) {
         rtSplits.add(s);
@@ -206,7 +206,7 @@ public static boolean isIncrementalQuerySplits(List fileSplits) {
   }
 
   public static RealtimeBootstrapBaseFileSplit createRealtimeBoostrapBaseFileSplit(
-      BootstrapBaseFileSplit split, String basePath, List<String> deltaLogPaths, String maxInstantTime) {
+      BootstrapBaseFileSplit split, String basePath, List<HoodieLogFile> logFiles, String maxInstantTime) {
     try {
       String[] hosts = split.getLocationInfo() != null ? Arrays.stream(split.getLocationInfo())
           .filter(x -> !x.isInMemory()).toArray(String[]::new) : new String[0];
@@ -214,7 +214,7 @@ public static RealtimeBootstrapBaseFileSplit createRealtimeBoostrapBaseFileSplit
           .filter(SplitLocationInfo::isInMemory).toArray(String[]::new) : new String[0];
       FileSplit baseSplit = new FileSplit(split.getPath(), split.getStart(), split.getLength(),
           hosts, inMemoryHosts);
-      return new RealtimeBootstrapBaseFileSplit(baseSplit, basePath, deltaLogPaths, maxInstantTime, split.getBootstrapFileSplit());
+      return new RealtimeBootstrapBaseFileSplit(baseSplit, basePath, logFiles, maxInstantTime, split.getBootstrapFileSplit());
     } catch (IOException e) {
       throw new HoodieIOException("Error creating hoodie real time split ", e);
     }
diff --git a/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/realtime/TestHoodieRealtimeFileSplit.java b/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/realtime/TestHoodieRealtimeFileSplit.java
index ac857868c0411..9d3855c47d663 100644
--- a/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/realtime/TestHoodieRealtimeFileSplit.java
+++ b/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/realtime/TestHoodieRealtimeFileSplit.java
@@ -18,6 +18,7 @@
 
 package org.apache.hudi.hadoop.realtime;
 
+import org.apache.hudi.common.model.HoodieLogFile;
 import org.apache.hudi.common.util.Option;
 
 import org.apache.hadoop.fs.Path;
@@ -56,6 +57,7 @@ public class TestHoodieRealtimeFileSplit {
 
   private HoodieRealtimeFileSplit split;
   private String basePath;
+  private List<HoodieLogFile> deltaLogFiles;
   private List<String> deltaLogPaths;
   private String fileSplitName;
   private FileSplit baseFileSplit;
@@ -64,12 +66,13 @@ public class TestHoodieRealtimeFileSplit {
   @BeforeEach
   public void setUp(@TempDir java.nio.file.Path tempDir) throws Exception {
     basePath = tempDir.toAbsolutePath().toString();
+    deltaLogFiles = Collections.singletonList(new HoodieLogFile(new Path(basePath + "/1.log"), 0L));
     deltaLogPaths = Collections.singletonList(basePath + "/1.log");
     fileSplitName = basePath + "/test.file";
     baseFileSplit = new FileSplit(new Path(fileSplitName), 0, 100, new String[] {});
     maxCommitTime = "10001";
 
-    split = new HoodieRealtimeFileSplit(baseFileSplit, basePath, deltaLogPaths, maxCommitTime, Option.empty());
+    split = new HoodieRealtimeFileSplit(baseFileSplit, basePath, deltaLogFiles, maxCommitTime, Option.empty());
   }
 
   @Test
diff --git a/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/realtime/TestHoodieRealtimeRecordReader.java b/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/realtime/TestHoodieRealtimeRecordReader.java
index 8375dd3afca69..9f6e77bd1f4d0 100644
--- a/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/realtime/TestHoodieRealtimeRecordReader.java
+++ b/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/realtime/TestHoodieRealtimeRecordReader.java
@@ -221,7 +221,7 @@ private void testReaderInternal(ExternalSpillableMap.DiskMapType diskMapType,
     HoodieRealtimeFileSplit split = new HoodieRealtimeFileSplit(
         new FileSplit(new Path(partitionDir + "/fileid0_1-0-1_" + baseInstant + ".parquet"), 0, 1, baseJobConf),
         basePath.toUri().toString(), fileSlice.getLogFiles().sorted(HoodieLogFile.getLogFileComparator())
-        .map(h -> h.getPath().toString()).collect(Collectors.toList()),
+        .collect(Collectors.toList()),
         instantTime, Option.empty());
 
     // create a RecordReader to be used by HoodieRealtimeRecordReader
@@ -290,10 +290,9 @@ public void testUnMergedReader() throws Exception {
     FileCreateUtils.createDeltaCommit(basePath.toString(), newCommitTime);
 
     // create a split with baseFile (parquet file written earlier) and new log file(s)
-    String logFilePath = writer.getLogFile().getPath().toString();
     HoodieRealtimeFileSplit split = new HoodieRealtimeFileSplit(
         new FileSplit(new Path(partitionDir + "/fileid0_1-0-1_" + instantTime + ".parquet"), 0, 1, baseJobConf),
-        basePath.toUri().toString(), Collections.singletonList(logFilePath), newCommitTime, Option.empty());
+        basePath.toUri().toString(), Collections.singletonList(writer.getLogFile()), newCommitTime, Option.empty());
 
     // create a RecordReader to be used by HoodieRealtimeRecordReader
     RecordReader<NullWritable, ArrayWritable> reader = new MapredParquetInputFormat().getRecordReader(
@@ -370,10 +369,9 @@ public void testReaderWithNestedAndComplexSchema(ExternalSpillableMap.DiskMapTyp
     InputFormatTestUtil.deltaCommit(basePath, newCommitTime);
 
     // create a split with baseFile (parquet file written earlier) and new log file(s)
-    String logFilePath = writer.getLogFile().getPath().toString();
     HoodieRealtimeFileSplit split = new HoodieRealtimeFileSplit(
         new FileSplit(new Path(partitionDir + "/fileid0_1-0-1_" + instantTime + ".parquet"), 0, 1, baseJobConf),
-        basePath.toUri().toString(), Collections.singletonList(logFilePath), newCommitTime, Option.empty());
+        basePath.toUri().toString(), Collections.singletonList(writer.getLogFile()), newCommitTime, Option.empty());
 
     // create a RecordReader to be used by HoodieRealtimeRecordReader
     RecordReader<NullWritable, ArrayWritable> reader = new MapredParquetInputFormat().getRecordReader(
@@ -483,7 +481,7 @@ public void testReaderWithNestedAndComplexSchema(ExternalSpillableMap.DiskMapTyp
   public void testSchemaEvolutionAndRollbackBlockInLastLogFile(ExternalSpillableMap.DiskMapType diskMapType,
                                                                boolean isCompressionEnabled) throws Exception {
     // initial commit
-    List<String> logFilePaths = new ArrayList<>();
+    List<HoodieLogFile> logFiles = new ArrayList<>();
     Schema schema = HoodieAvroUtils.addMetadataFields(SchemaTestUtil.getSimpleSchema());
     HoodieTestUtils.init(hadoopConf, basePath.toString(), HoodieTableType.MERGE_ON_READ);
     String instantTime = "100";
@@ -504,7 +502,7 @@ public void testSchemaEvolutionAndRollbackBlockInLastLogFile(ExternalSpillableMa
         InputFormatTestUtil.writeDataBlockToLogFile(partitionDir, fs, schema, "fileid0",
             instantTime, newCommitTime, numberOfLogRecords, 0, 1);
     long size = writer.getCurrentSize();
-    logFilePaths.add(writer.getLogFile().getPath().toString());
+    logFiles.add(writer.getLogFile());
     writer.close();
     assertTrue(size > 0, "block - size should be > 0");
 
@@ -512,14 +510,14 @@
     newCommitTime = "102";
     writer = InputFormatTestUtil.writeRollbackBlockToLogFile(partitionDir, fs, schema, "fileid0", instantTime,
         newCommitTime, "101", 1);
-    logFilePaths.add(writer.getLogFile().getPath().toString());
+    logFiles.add(writer.getLogFile());
     writer.close();
     InputFormatTestUtil.deltaCommit(basePath, newCommitTime);
 
     // create a split with baseFile (parquet file written earlier) and new log file(s)
     HoodieRealtimeFileSplit split = new HoodieRealtimeFileSplit(
         new FileSplit(new Path(partitionDir + "/fileid0_1_" + instantTime + ".parquet"), 0, 1, baseJobConf),
-        basePath.toUri().toString(), logFilePaths, newCommitTime, Option.empty());
+        basePath.toUri().toString(), logFiles, newCommitTime, Option.empty());
 
     // create a RecordReader to be used by HoodieRealtimeRecordReader
     RecordReader<NullWritable, ArrayWritable> reader = new MapredParquetInputFormat().getRecordReader(
@@ -695,6 +693,7 @@ private HoodieWriteStat createHoodieWriteStat(java.nio.file.Path basePath, Strin
     writeStat.setNumUpdateWrites(100);
     writeStat.setNumWrites(100);
     writeStat.setPath(filePath);
+    writeStat.setFileSizeInBytes(new File(new Path(basePath.toString(), filePath).toString()).length());
     writeStat.setPartitionPath(partitionPath);
     writeStat.setTotalLogFilesCompacted(100L);
     HoodieWriteStat.RuntimeStats runtimeStats = new HoodieWriteStat.RuntimeStats();
@@ -750,14 +749,14 @@ public void testLogOnlyReader() throws Exception {
     assertTrue(size > 0, "block - size should be > 0");
     FileCreateUtils.createDeltaCommit(basePath.toString(), instantTime);
     // create a split with new log file(s)
-    fileSlice.addLogFile(writer.getLogFile());
-    RealtimeFileStatus realtimeFileStatus = new RealtimeFileStatus(new FileStatus(0, false, 1, 1, 0, writer.getLogFile().getPath()));
+    fileSlice.addLogFile(new HoodieLogFile(writer.getLogFile().getPath(), size));
+    RealtimeFileStatus realtimeFileStatus = new RealtimeFileStatus(new FileStatus(writer.getLogFile().getFileSize(), false, 1, 1, 0, writer.getLogFile().getPath()));
     realtimeFileStatus.setMaxCommitTime(instantTime);
     realtimeFileStatus.setBasePath(basePath.toString());
-    realtimeFileStatus.setDeltaLogPaths(fileSlice.getLogFiles().map(l -> l.getPath().toString()).collect(Collectors.toList()));
+    realtimeFileStatus.setDeltaLogFiles(fileSlice.getLogFiles().collect(Collectors.toList()));
    PathWithLogFilePath pathWithLogFileStatus = (PathWithLogFilePath) realtimeFileStatus.getPath();
     BaseFileWithLogsSplit bs = pathWithLogFileStatus.buildSplit(pathWithLogFileStatus, 0, 0, new String[] {""});
-    HoodieRealtimeFileSplit split = new HoodieRealtimeFileSplit(bs, bs.getBasePath(), bs.getDeltaLogPaths(), bs.getMaxCommitTime(), Option.empty());
+    HoodieRealtimeFileSplit split = new HoodieRealtimeFileSplit(bs, bs.getBasePath(), bs.getDeltaLogFiles(), bs.getMaxCommitTime(), Option.empty());
 
     JobConf newJobConf = new JobConf(baseJobConf);
     List<Schema.Field> fields = schema.getFields();
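Taken together, the changes above replace plain log-file path strings with HoodieLogFile objects that carry the file size, so the reverse reader can rely on logFile.getFileSize() instead of issuing an extra fs.getFileStatus() call per log file. A minimal sketch of the resulting usage, assuming the length is already known to the caller; the helper class, method name, and variables below are illustrative only and not part of the patch:

import java.io.IOException;

import org.apache.avro.Schema;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hudi.common.model.HoodieLogFile;
import org.apache.hudi.common.table.log.HoodieLogFileReader;

// Illustrative helper only; not part of the patch.
class ReverseLogReadSketch {
  // Open a log file for reverse reading without an additional getFileStatus() round trip,
  // by constructing HoodieLogFile with the length the caller already knows.
  static HoodieLogFileReader openForReverseRead(FileSystem fs, Path logPath, long knownLength,
                                                Schema readerSchema, int bufferSize) throws IOException {
    HoodieLogFile logFile = new HoodieLogFile(logPath, knownLength);
    // Arguments: (fs, logFile, readerSchema, bufferSize, readBlockLazily, reverseReader)
    return new HoodieLogFileReader(fs, logFile, readerSchema, bufferSize, true, true);
  }
}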