diff --git a/hudi-common/src/main/java/org/apache/hudi/common/fs/FSUtils.java b/hudi-common/src/main/java/org/apache/hudi/common/fs/FSUtils.java
index 1990c0a517e27..fb36bfe49b548 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/fs/FSUtils.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/fs/FSUtils.java
@@ -18,6 +18,15 @@
 
 package org.apache.hudi.common.fs;
 
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.LocatedFileStatus;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.PathFilter;
+import org.apache.hadoop.fs.RemoteIterator;
+import org.apache.hadoop.hdfs.DistributedFileSystem;
 import org.apache.hudi.common.config.SerializableConfiguration;
 import org.apache.hudi.common.engine.HoodieEngineContext;
 import org.apache.hudi.common.model.HoodieFileFormat;
@@ -31,16 +40,6 @@
 import org.apache.hudi.exception.HoodieIOException;
 import org.apache.hudi.exception.InvalidHoodiePathException;
 import org.apache.hudi.metadata.HoodieTableMetadata;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FSDataInputStream;
-import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.LocatedFileStatus;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.fs.PathFilter;
-import org.apache.hadoop.fs.RemoteIterator;
-import org.apache.hadoop.hdfs.DistributedFileSystem;
 import org.apache.log4j.LogManager;
 import org.apache.log4j.Logger;
 
@@ -50,11 +49,10 @@
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.HashSet;
-import java.util.Set;
-
 import java.util.List;
 import java.util.Map.Entry;
 import java.util.Objects;
+import java.util.Set;
 import java.util.UUID;
 import java.util.function.Function;
 import java.util.regex.Matcher;
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieLogFileReader.java b/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieLogFileReader.java
index 5bd43ac4b7cf4..e437b782e2e81 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieLogFileReader.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieLogFileReader.java
@@ -40,6 +40,7 @@
 import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FSInputStream;
 import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.log4j.LogManager;
 import org.apache.log4j.Logger;
 
@@ -57,6 +58,7 @@
 public class HoodieLogFileReader implements HoodieLogFormat.Reader {
 
   public static final int DEFAULT_BUFFER_SIZE = 16 * 1024 * 1024; // 16 MB
+  private static final int BLOCK_SCAN_READ_BUFFER_SIZE = 1024 * 1024; // 1 MB
   private static final Logger LOG = LogManager.getLogger(HoodieLogFileReader.class);
 
   private final FSDataInputStream inputStream;
@@ -71,9 +73,13 @@ public class HoodieLogFileReader implements HoodieLogFormat.Reader {
   private transient Thread shutdownThread = null;
 
   public HoodieLogFileReader(FileSystem fs, HoodieLogFile logFile, Schema readerSchema, int bufferSize,
-      boolean readBlockLazily, boolean reverseReader) throws IOException {
+                             boolean readBlockLazily, boolean reverseReader) throws IOException {
     FSDataInputStream fsDataInputStream = fs.open(logFile.getPath(), bufferSize);
-    if (fsDataInputStream.getWrappedStream() instanceof FSInputStream) {
+    if (FSUtils.isGCSInputStream(fsDataInputStream)) {
+      this.inputStream = new TimedFSDataInputStream(logFile.getPath(), new FSDataInputStream(
+          new BufferedFSInputStream((FSInputStream) ((
+              (FSDataInputStream) fsDataInputStream.getWrappedStream()).getWrappedStream()), bufferSize)));
+    } else if (fsDataInputStream.getWrappedStream() instanceof FSInputStream) {
       this.inputStream = new TimedFSDataInputStream(logFile.getPath(), new FSDataInputStream(
           new BufferedFSInputStream((FSInputStream) fsDataInputStream.getWrappedStream(), bufferSize)));
     } else {
@@ -274,19 +280,25 @@ private boolean isBlockCorrupt(int blocksize) throws IOException {
   }
 
   private long scanForNextAvailableBlockOffset() throws IOException {
+    // Make buffer large enough to scan through the file as quick as possible especially if it is on S3/GCS.
+    byte[] dataBuf = new byte[BLOCK_SCAN_READ_BUFFER_SIZE];
+    boolean eof = false;
     while (true) {
      long currentPos = inputStream.getPos();
       try {
-        boolean hasNextMagic = hasNextMagic();
-        if (hasNextMagic) {
-          return currentPos;
-        } else {
-          // No luck - advance and try again
-          inputStream.seek(currentPos + 1);
-        }
+        Arrays.fill(dataBuf, (byte) 0);
+        inputStream.readFully(dataBuf, 0, dataBuf.length);
       } catch (EOFException e) {
+        eof = true;
+      }
+      long pos = Bytes.indexOf(dataBuf, HoodieLogFormat.MAGIC);
+      if (pos >= 0) {
+        return currentPos + pos;
+      }
+      if (eof) {
         return inputStream.getPos();
       }
+      inputStream.seek(currentPos + dataBuf.length - HoodieLogFormat.MAGIC.length);
     }
   }
 
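
Note on FSUtils.isGCSInputStream: the new constructor branch above calls this helper, but its definition is not part of the hunks shown here. A minimal sketch of the kind of check that branch appears to rely on, assuming it only needs to guarantee that the double unwrap and cast in the constructor are safe (the class and method below are illustrative, not the patch's actual FSUtils code):

    import org.apache.hadoop.fs.FSDataInputStream;

    // Hypothetical sketch, not the actual FSUtils implementation.
    public final class GcsStreamCheck {
      // On GCS, FileSystem.open can hand back an FSDataInputStream whose wrapped
      // stream is itself another FSDataInputStream; the constructor above unwraps
      // twice in that case, so the guard only needs to confirm the inner type.
      public static boolean isGCSInputStream(FSDataInputStream fsDataInputStream) {
        return fsDataInputStream.getWrappedStream() instanceof FSDataInputStream;
      }
    }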
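
The rewritten scanForNextAvailableBlockOffset reads the log in 1 MB chunks (BLOCK_SCAN_READ_BUFFER_SIZE) instead of probing one byte at a time; when a chunk contains no magic marker, it seeks back by MAGIC.length bytes before the next read so a marker that straddles two chunks is still found, and on EOF it returns the current position as before. A small standalone illustration of that overlapping-window scan (class name, helper, window size, and marker bytes are all made up for the example; Bytes.indexOf is replaced by a naive search):

    import java.util.Arrays;

    // Standalone illustration of the overlapping-window scan used in the patch.
    public class MagicScanExample {

      /** Returns the offset of the first occurrence of magic in data, or -1. */
      static int scan(byte[] data, byte[] magic, int windowSize) {
        int pos = 0;
        while (pos < data.length) {
          int end = Math.min(pos + windowSize, data.length);
          byte[] window = Arrays.copyOfRange(data, pos, end);
          int hit = indexOf(window, magic);
          if (hit >= 0) {
            return pos + hit;
          }
          if (end == data.length) {
            return -1; // hit end of data without finding the marker
          }
          // Overlap the next window by the marker length, mirroring the seek above.
          pos = pos + windowSize - magic.length;
        }
        return -1;
      }

      /** Naive stand-in for org.apache.hadoop.hbase.util.Bytes.indexOf. */
      static int indexOf(byte[] haystack, byte[] needle) {
        outer:
        for (int i = 0; i + needle.length <= haystack.length; i++) {
          for (int j = 0; j < needle.length; j++) {
            if (haystack[i + j] != needle[j]) {
              continue outer;
            }
          }
          return i;
        }
        return -1;
      }

      public static void main(String[] args) {
        byte[] magic = {'#', 'M', 'A', 'G', 'I', 'C'}; // stand-in for HoodieLogFormat.MAGIC
        byte[] data = new byte[32];
        System.arraycopy(magic, 0, data, 13, magic.length); // marker straddles the first 16-byte window
        System.out.println(scan(data, magic, 16)); // prints 13
      }
    }

Running the example prints 13: the marker planted across the 16-byte window boundary is missed by the first read but found by the second, overlapping one, which is the property the seek in the patch preserves while cutting the number of reads on S3/GCS.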