@@ -76,6 +76,11 @@ public class HoodieLogFileReader implements HoodieLogFormat.Reader {
private boolean closed = false;
private transient Thread shutdownThread = null;

public HoodieLogFileReader(FileSystem fs, HoodieLogFile logFile, Schema readerSchema, int bufferSize,
boolean readBlockLazily) throws IOException {
this(fs, logFile, readerSchema, bufferSize, readBlockLazily, false);
}

public HoodieLogFileReader(FileSystem fs, HoodieLogFile logFile, Schema readerSchema, int bufferSize,
boolean readBlockLazily, boolean reverseReader) throws IOException {
this(fs, logFile, readerSchema, bufferSize, readBlockLazily, reverseReader, false,
@@ -94,16 +99,11 @@ public HoodieLogFileReader(FileSystem fs, HoodieLogFile logFile, Schema readerSc
this.enableInlineReading = enableInlineReading;
this.keyField = keyField;
if (this.reverseReader) {
this.reverseLogFilePosition = this.lastReverseLogFilePosition = fs.getFileStatus(logFile.getPath()).getLen();
this.reverseLogFilePosition = this.lastReverseLogFilePosition = logFile.getFileSize();
}
addShutDownHook();
}

public HoodieLogFileReader(FileSystem fs, HoodieLogFile logFile, Schema readerSchema, boolean readBlockLazily,
boolean reverseReader) throws IOException {
this(fs, logFile, readerSchema, DEFAULT_BUFFER_SIZE, readBlockLazily, reverseReader);
}

public HoodieLogFileReader(FileSystem fs, HoodieLogFile logFile, Schema readerSchema) throws IOException {
this(fs, logFile, readerSchema, DEFAULT_BUFFER_SIZE, false, false);
}
@@ -274,7 +274,7 @@ static WriterBuilder newWriterBuilder() {

static HoodieLogFormat.Reader newReader(FileSystem fs, HoodieLogFile logFile, Schema readerSchema)
throws IOException {
return new HoodieLogFileReader(fs, logFile, readerSchema, HoodieLogFileReader.DEFAULT_BUFFER_SIZE, false, false);
return new HoodieLogFileReader(fs, logFile, readerSchema, HoodieLogFileReader.DEFAULT_BUFFER_SIZE, false);
}

static HoodieLogFormat.Reader newReader(FileSystem fs, HoodieLogFile logFile, Schema readerSchema,
@@ -104,9 +104,8 @@ public boolean hasNext() {
} else {
this.prevReadersInOpenState.add(currentReader);
}
this.currentReader =
new HoodieLogFileReader(fs, nextLogFile, readerSchema, bufferSize, readBlocksLazily, false,
enableInlineReading, recordKeyField);
this.currentReader = new HoodieLogFileReader(fs, nextLogFile, readerSchema, bufferSize, readBlocksLazily, false,
enableInlineReading, recordKeyField);
} catch (IOException io) {
throw new HoodieIOException("unable to initialize read with log file ", io);
}
@@ -27,6 +27,7 @@
import org.apache.hudi.common.table.log.block.HoodieLogBlock.HeaderMetadataType;
import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.common.util.collection.Pair;

import org.apache.avro.Schema;
import org.apache.hadoop.conf.Configuration;
@@ -35,16 +36,18 @@

import java.io.IOException;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

/**
* Utils class for performing various log file reading operations.
*/
public class LogReaderUtils {

private static Schema readSchemaFromLogFileInReverse(FileSystem fs, HoodieActiveTimeline activeTimeline, Path path)
private static Schema readSchemaFromLogFileInReverse(FileSystem fs, HoodieActiveTimeline activeTimeline, Pair<String, Long> logFilePathSizePairs)
throws IOException {
Reader reader = HoodieLogFormat.newReader(fs, new HoodieLogFile(path), null, true, true);
// set length for the HoodieLogFile as it will be leveraged by HoodieLogFormat.Reader with reverseReading enabled

[Review comment, Member] where is this length being set? Or is this a stale comment?

Reader reader = HoodieLogFormat.newReader(fs, new HoodieLogFile(new Path(logFilePathSizePairs.getKey()), logFilePathSizePairs.getValue()), null, true, true);
Schema writerSchema = null;
HoodieTimeline completedTimeline = activeTimeline.getCommitsTimeline().filterCompletedInstants();
while (reader.hasPrev()) {
@@ -62,23 +65,23 @@ private static Schema readSchemaFromLogFileInReverse(FileSystem fs, HoodieActive
return writerSchema;
}

public static Schema readLatestSchemaFromLogFiles(String basePath, List<String> deltaFilePaths, Configuration config)
public static Schema readLatestSchemaFromLogFiles(String basePath, List<Pair<String, Long>> logFilePathSizePairs, Configuration config)
throws IOException {
HoodieTableMetaClient metaClient = HoodieTableMetaClient.builder().setConf(config).setBasePath(basePath).build();
List<String> deltaPaths = deltaFilePaths.stream().map(s -> new HoodieLogFile(new Path(s)))
List<String> deltaPaths = logFilePathSizePairs.stream().map(s -> new HoodieLogFile(s.getKey()))
.sorted(HoodieLogFile.getReverseLogFileComparator()).map(s -> s.getPath().toString())
.collect(Collectors.toList());
if (deltaPaths.size() > 0) {
Map<String, Long> deltaFilePathToFileStatus = logFilePathSizePairs.stream().collect(Collectors.toMap(Pair::getKey, Pair::getValue));
for (String logPath : deltaPaths) {
FileSystem fs = FSUtils.getFs(logPath, config);
Schema schemaFromLogFile =
readSchemaFromLogFileInReverse(fs, metaClient.getActiveTimeline(), new Path(logPath));
readSchemaFromLogFileInReverse(fs, metaClient.getActiveTimeline(), Pair.of(logPath, deltaFilePathToFileStatus.get(logPath)));
if (schemaFromLogFile != null) {
return schemaFromLogFile;
}
}
}
return null;
}

}
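
Not part of the PR: a sketch of how a caller might assemble the (path, size) pairs that the updated readLatestSchemaFromLogFiles signature expects, assuming the log files were discovered through a listing that already knows their sizes. The class wrapper, method name, and import locations are assumptions.

import java.io.IOException;
import java.util.List;
import java.util.stream.Collectors;
import org.apache.avro.Schema;
import org.apache.hadoop.conf.Configuration;
import org.apache.hudi.common.model.HoodieLogFile;
import org.apache.hudi.common.table.log.LogReaderUtils;
import org.apache.hudi.common.util.collection.Pair;

class SchemaLookupSketch {
  static Schema latestWriterSchema(String basePath, List<HoodieLogFile> logFiles, Configuration conf)
      throws IOException {
    // Each pair carries the log file path together with its already-known size, so the reverse
    // reader used underneath can skip a per-file status lookup.
    List<Pair<String, Long>> pathSizePairs = logFiles.stream()
        .map(lf -> Pair.of(lf.getPath().toString(), lf.getFileSize()))
        .collect(Collectors.toList());
    return LogReaderUtils.readLatestSchemaFromLogFiles(basePath, pathSizePairs, conf);
  }
}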
@@ -1482,7 +1482,8 @@ public void testBasicAppendAndReadInReverse(boolean readBlocksLazily)

FileCreateUtils.createDeltaCommit(basePath, "100", fs);

HoodieLogFileReader reader = new HoodieLogFileReader(fs, writer.getLogFile(), SchemaTestUtil.getSimpleSchema(),
HoodieLogFileReader reader = new HoodieLogFileReader(fs, new HoodieLogFile(writer.getLogFile().getPath(),
fs.getFileStatus(writer.getLogFile().getPath()).getLen()), SchemaTestUtil.getSimpleSchema(),
bufferSize, readBlocksLazily, true);

assertTrue(reader.hasPrev(), "Last block should be available");
@@ -1560,7 +1561,8 @@ public void testAppendAndReadOnCorruptedLogInReverse(boolean readBlocksLazily)

// First round of reads - we should be able to read the first block and then EOF
HoodieLogFileReader reader =
new HoodieLogFileReader(fs, writer.getLogFile(), schema, bufferSize, readBlocksLazily, true);
new HoodieLogFileReader(fs, new HoodieLogFile(writer.getLogFile().getPath(),
fs.getFileStatus(writer.getLogFile().getPath()).getLen()), schema, bufferSize, readBlocksLazily, true);

assertTrue(reader.hasPrev(), "Last block should be available");
HoodieLogBlock block = reader.prev();
@@ -1610,7 +1612,8 @@ public void testBasicAppendAndTraverseInReverse(boolean readBlocksLazily)

FileCreateUtils.createDeltaCommit(basePath, "100", fs);

HoodieLogFileReader reader = new HoodieLogFileReader(fs, writer.getLogFile(), SchemaTestUtil.getSimpleSchema(),
HoodieLogFileReader reader = new HoodieLogFileReader(fs, new HoodieLogFile(writer.getLogFile().getPath(),
fs.getFileStatus(writer.getLogFile().getPath()).getLen()), SchemaTestUtil.getSimpleSchema(),
bufferSize, readBlocksLazily, true);

assertTrue(reader.hasPrev(), "Third block should be available");
@@ -18,6 +18,8 @@

package org.apache.hudi.hadoop;

import org.apache.hudi.common.util.collection.Pair;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.FileSplit;
@@ -36,7 +38,7 @@ public class BaseFileWithLogsSplit extends FileSplit {
// a flag to mark this split is produced by incremental query or not.
private boolean belongToIncrementalSplit = false;
// the log file paths of this split.
private List<String> deltaLogPaths = new ArrayList<>();
private List<Pair<String, Long>> deltaLogPathSizePairs = new ArrayList<>();
// max commit time of current split.
private String maxCommitTime = "";
// the basePath of current hoodie table.
@@ -55,9 +57,10 @@ public void write(DataOutput out) throws IOException {
Text.writeString(out, maxCommitTime);
Text.writeString(out, basePath);
Text.writeString(out, baseFilePath);
out.writeInt(deltaLogPaths.size());
for (String logPath : deltaLogPaths) {
Text.writeString(out, logPath);
out.writeInt(deltaLogPathSizePairs.size());
for (Pair<String, Long> logPathSizePair : deltaLogPathSizePairs) {
Text.writeString(out, logPathSizePair.getKey());
out.writeLong(logPathSizePair.getValue());
}
}

@@ -69,11 +72,13 @@ public void readFields(DataInput in) throws IOException {
basePath = Text.readString(in);
baseFilePath = Text.readString(in);
int deltaLogSize = in.readInt();
List<String> tempDeltaLogs = new ArrayList<>();
List<Pair<String, Long>> tempDeltaLogs = new ArrayList<>();
for (int i = 0; i < deltaLogSize; i++) {
tempDeltaLogs.add(Text.readString(in));
String logPath = Text.readString(in);
long logFileSize = in.readLong();
tempDeltaLogs.add(Pair.of(logPath, logFileSize));
}
deltaLogPaths = tempDeltaLogs;
deltaLogPathSizePairs = tempDeltaLogs;
}

public boolean getBelongToIncrementalSplit() {
@@ -84,12 +89,12 @@ public void setBelongToIncrementalSplit(boolean belongToIncrementalSplit) {
this.belongToIncrementalSplit = belongToIncrementalSplit;
}

public List<String> getDeltaLogPaths() {
return deltaLogPaths;
public List<Pair<String, Long>> getDeltaLogPathSizePairs() {
return deltaLogPathSizePairs;
}

public void setDeltaLogPaths(List<String> deltaLogPaths) {
this.deltaLogPaths = deltaLogPaths;
public void setDeltaLogPathSizePairs(List<Pair<String, Long>> deltaLogPathSizePairs) {
this.deltaLogPathSizePairs = deltaLogPathSizePairs;
}

public String getMaxCommitTime() {
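
Not part of the PR: a rough round-trip sketch of the updated write/readFields in BaseFileWithLogsSplit above, which now serialize one path plus one long size per delta log file. The buffer classes, dummy split arguments, and example paths are assumptions.

import java.io.IOException;
import java.util.Arrays;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.DataInputBuffer;
import org.apache.hadoop.io.DataOutputBuffer;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.hadoop.BaseFileWithLogsSplit;

class SplitSerdeSketch {
  static BaseFileWithLogsSplit roundTrip() throws IOException {
    BaseFileWithLogsSplit split =
        new BaseFileWithLogsSplit(new Path("/tmp/base.parquet"), 0, 1024, new String[0]);
    // The size travels with each path, so the read side can rebuild HoodieLogFile objects
    // that already know their length.
    split.setDeltaLogPathSizePairs(Arrays.asList(Pair.of("/tmp/.fileid1.log.1", 4096L)));

    DataOutputBuffer out = new DataOutputBuffer();
    split.write(out);   // per entry: Text.writeString(path) followed by writeLong(size)

    DataInputBuffer in = new DataInputBuffer();
    in.reset(out.getData(), out.getLength());
    BaseFileWithLogsSplit copy =
        new BaseFileWithLogsSplit(new Path("/tmp/base.parquet"), 0, 0, new String[0]);
    copy.readFields(in);  // restores deltaLogPathSizePairs with both path and size
    return copy;
  }
}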
@@ -18,6 +18,8 @@

package org.apache.hudi.hadoop;

import org.apache.hudi.common.util.collection.Pair;

import org.apache.hadoop.fs.Path;

import java.util.ArrayList;
@@ -31,7 +33,7 @@ public class PathWithLogFilePath extends Path {
// a flag to mark this split is produced by incremental query or not.
private boolean belongToIncrementalPath = false;
// the log files belong this path.
private List<String> deltaLogPaths = new ArrayList<>();
private List<Pair<String, Long>> deltaLogPathSizePairs = new ArrayList<>();
// max commit time of current path.
private String maxCommitTime = "";
// the basePath of current hoodie table.
@@ -50,12 +52,12 @@ public void setBelongToIncrementalPath(boolean belongToIncrementalPath) {
this.belongToIncrementalPath = belongToIncrementalPath;
}

public List<String> getDeltaLogPaths() {
return deltaLogPaths;
public List<Pair<String, Long>> getDeltaLogPathSizePairs() {
return deltaLogPathSizePairs;
}

public void setDeltaLogPaths(List<String> deltaLogPaths) {
this.deltaLogPaths = deltaLogPaths;
public void setDeltaLogPathSizePairs(List<Pair<String, Long>> deltaLogPathSizePairs) {
this.deltaLogPathSizePairs = deltaLogPathSizePairs;
}

public String getMaxCommitTime() {
@@ -97,7 +99,7 @@ public boolean includeBootstrapFilePath() {
public BaseFileWithLogsSplit buildSplit(Path file, long start, long length, String[] hosts) {
BaseFileWithLogsSplit bs = new BaseFileWithLogsSplit(file, start, length, hosts);
bs.setBelongToIncrementalSplit(belongToIncrementalPath);
bs.setDeltaLogPaths(deltaLogPaths);
bs.setDeltaLogPathSizePairs(deltaLogPathSizePairs);
bs.setMaxCommitTime(maxCommitTime);
bs.setBasePath(basePath);
bs.setBaseFilePath(baseFilePath);
@@ -18,6 +18,8 @@

package org.apache.hudi.hadoop;

import org.apache.hudi.common.util.collection.Pair;

import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.Path;

@@ -35,7 +37,7 @@ public class RealtimeFileStatus extends FileStatus {
// a flag to mark this split is produced by incremental query or not.
private boolean belongToIncrementalFileStatus = false;
// the log files belong this fileStatus.
private List<String> deltaLogPaths = new ArrayList<>();
private List<Pair<String, Long>> deltaLogPathSizePairs = new ArrayList<>();
// max commit time of current fileStatus.
private String maxCommitTime = "";
// the basePath of current hoodie table.
@@ -55,7 +57,7 @@ public Path getPath() {
Path path = super.getPath();
PathWithLogFilePath pathWithLogFilePath = new PathWithLogFilePath(path.getParent(), path.getName());
pathWithLogFilePath.setBelongToIncrementalPath(belongToIncrementalFileStatus);
pathWithLogFilePath.setDeltaLogPaths(deltaLogPaths);
pathWithLogFilePath.setDeltaLogPathSizePairs(deltaLogPathSizePairs);
pathWithLogFilePath.setMaxCommitTime(maxCommitTime);
pathWithLogFilePath.setBasePath(basePath);
pathWithLogFilePath.setBaseFilePath(baseFilePath);
@@ -69,12 +71,12 @@ public void setBelongToIncrementalFileStatus(boolean belongToIncrementalFileStat
this.belongToIncrementalFileStatus = belongToIncrementalFileStatus;
}

public List<String> getDeltaLogPaths() {
return deltaLogPaths;
public List<Pair<String, Long>> getDeltaLogPathSizePairs() {
return deltaLogPathSizePairs;
}

public void setDeltaLogPaths(List<String> deltaLogPaths) {
this.deltaLogPaths = deltaLogPaths;
public void setDeltaLogPathSizePairs(List<Pair<String, Long>> deltaLogPathSizePairs) {
this.deltaLogPathSizePairs = deltaLogPathSizePairs;
}

public String getMaxCommitTime() {
@@ -82,7 +82,7 @@ private boolean usesCustomPayload() {
* job conf.
*/
private void init() throws IOException {
Schema schemaFromLogFile = LogReaderUtils.readLatestSchemaFromLogFiles(split.getBasePath(), split.getDeltaLogPaths(), jobConf);
Schema schemaFromLogFile = LogReaderUtils.readLatestSchemaFromLogFiles(split.getBasePath(), split.getDeltaLogFilePathSizePairs(), jobConf);
if (schemaFromLogFile == null) {
writerSchema = InputSplitUtils.getBaseFileSchema((FileSplit)split, jobConf);
LOG.info("Writer Schema From Parquet => " + writerSchema.getFields());
@@ -30,6 +30,7 @@
import org.apache.hudi.common.table.view.HoodieTableFileSystemView;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.ValidationUtils;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.hadoop.BootstrapBaseFileSplit;
import org.apache.hudi.hadoop.FileStatusWithBootstrapBaseFile;
@@ -189,7 +190,7 @@ private List<FileStatus> collectAllIncrementalFiles(List<HoodieFileGroup> fileGr
fileStatus.setBelongToIncrementalFileStatus(true);
fileStatus.setBasePath(basePath);
fileStatus.setBaseFilePath(baseFilePath);
fileStatus.setDeltaLogPaths(f.getLatestFileSlice().get().getLogFiles().map(l -> l.getPath().toString()).collect(Collectors.toList()));
fileStatus.setDeltaLogPathSizePairs(f.getLatestFileSlice().get().getLogFiles().map(l -> Pair.of(l.getPath().toString(), l.getFileSize())).collect(Collectors.toList()));
// try to set bootstrapfileStatus
if (baseFileStatus instanceof LocatedFileStatusWithBootstrapBaseFile || baseFileStatus instanceof FileStatusWithBootstrapBaseFile) {
fileStatus.setBootStrapFileStatus(baseFileStatus);
Expand All @@ -202,7 +203,7 @@ private List<FileStatus> collectAllIncrementalFiles(List<HoodieFileGroup> fileGr
if (logFileStatus.size() > 0) {
RealtimeFileStatus fileStatus = new RealtimeFileStatus(logFileStatus.get(0));
fileStatus.setBelongToIncrementalFileStatus(true);
fileStatus.setDeltaLogPaths(logFileStatus.stream().map(l -> l.getPath().toString()).collect(Collectors.toList()));
fileStatus.setDeltaLogPathSizePairs(logFileStatus.stream().map(l -> Pair.of(l.getPath().toString(), l.getLen())).collect(Collectors.toList()));
fileStatus.setMaxCommitTime(maxCommitTime);
fileStatus.setBasePath(basePath);
result.add(fileStatus);
@@ -256,7 +257,7 @@ private FileSplit doMakeSplitForPathWithLogFilePath(PathWithLogFilePath path, lo
? super.makeSplit(path.getPathWithBootstrapFileStatus(), start, length, hosts)
: super.makeSplit(path.getPathWithBootstrapFileStatus(), start, length, hosts, inMemoryHosts);
return HoodieRealtimeInputFormatUtils
.createRealtimeBoostrapBaseFileSplit((BootstrapBaseFileSplit) bf, path.getBasePath(), path.getDeltaLogPaths(), path.getMaxCommitTime());
.createRealtimeBoostrapBaseFileSplit((BootstrapBaseFileSplit) bf, path.getBasePath(), path.getDeltaLogPathSizePairs(), path.getMaxCommitTime());
}
}

@@ -19,20 +19,24 @@
package org.apache.hudi.hadoop.realtime;

import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.collection.Pair;

import org.apache.hadoop.mapred.FileSplit;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Collectors;

/**
* Filesplit that wraps the base split and a list of log files to merge deltas from.
*/
public class HoodieRealtimeFileSplit extends FileSplit implements RealtimeSplit {

private List<String> deltaLogPaths;
private List<Pair<String, Long>> deltaLogFilePathSizePairs = new ArrayList<>();

private String maxCommitTime;

@@ -44,11 +48,12 @@ public HoodieRealtimeFileSplit() {
super();
}

public HoodieRealtimeFileSplit(FileSplit baseSplit, String basePath, List<String> deltaLogPaths, String maxCommitTime,
public HoodieRealtimeFileSplit(FileSplit baseSplit, String basePath, List<Pair<String, Long>> deltaLogFilePathSizePairs, String maxCommitTime,
Option<HoodieVirtualKeyInfo> hoodieVirtualKeyInfo)
throws IOException {
super(baseSplit.getPath(), baseSplit.getStart(), baseSplit.getLength(), baseSplit.getLocations());
this.deltaLogPaths = deltaLogPaths;
this.deltaLogFilePathSizePairs = deltaLogFilePathSizePairs;
this.deltaLogPaths = deltaLogFilePathSizePairs.stream().map(entry -> entry.getKey()).collect(Collectors.toList());
this.maxCommitTime = maxCommitTime;
this.basePath = basePath;
this.hoodieVirtualKeyInfo = hoodieVirtualKeyInfo;
@@ -58,6 +63,10 @@ public List<String> getDeltaLogPaths() {
return deltaLogPaths;
}

public List<Pair<String, Long>> getDeltaLogFilePathSizePairs() {
return deltaLogFilePathSizePairs;
}

public String getMaxCommitTime() {
return maxCommitTime;
}
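
Not part of the PR: a sketch of constructing the realtime split with (path, size) pairs, showing that getDeltaLogPaths() still works because the constructor above derives the plain paths from the pairs. The split arguments, example paths, and import locations are assumptions.

import java.io.IOException;
import java.util.Arrays;
import java.util.List;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapred.FileSplit;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.hadoop.realtime.HoodieRealtimeFileSplit;

class RealtimeSplitSketch {
  static HoodieRealtimeFileSplit build() throws IOException {
    FileSplit base = new FileSplit(new Path("/tbl/p1/base.parquet"), 0, 2048, new String[0]);
    List<Pair<String, Long>> logs = Arrays.asList(Pair.of("/tbl/p1/.fileid1.log.1", 8192L));
    HoodieRealtimeFileSplit split =
        new HoodieRealtimeFileSplit(base, "/tbl", logs, "20210901000000", Option.empty());
    split.getDeltaLogFilePathSizePairs();  // (path, size) pairs, for readers that want the length
    split.getDeltaLogPaths();              // plain paths, derived from the pairs in the constructor
    return split;
  }
}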