diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieLogFileReader.java b/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieLogFileReader.java
index 9f11e68ddc36d..07cb36bb169bb 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieLogFileReader.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieLogFileReader.java
@@ -18,13 +18,6 @@
 
 package org.apache.hudi.common.table.log;
 
-import org.apache.avro.Schema;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.BufferedFSInputStream;
-import org.apache.hadoop.fs.FSDataInputStream;
-import org.apache.hadoop.fs.FSInputStream;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hudi.common.fs.FSUtils;
 import org.apache.hudi.common.fs.SchemeAwareFSDataInputStream;
 import org.apache.hudi.common.fs.TimedFSDataInputStream;
@@ -43,10 +36,19 @@
 import org.apache.hudi.exception.CorruptedLogFileException;
 import org.apache.hudi.exception.HoodieIOException;
 import org.apache.hudi.exception.HoodieNotSupportedException;
+
+import org.apache.avro.Schema;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.BufferedFSInputStream;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FSInputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.log4j.LogManager;
 import org.apache.log4j.Logger;
 
 import javax.annotation.Nullable;
+
 import java.io.EOFException;
 import java.io.IOException;
 import java.util.Arrays;
@@ -254,8 +256,17 @@ private HoodieLogBlock createCorruptBlock() throws IOException {
 
   private boolean isBlockCorrupted(int blocksize) throws IOException {
     long currentPos = inputStream.getPos();
+    long blockSizeFromFooter;
+
     try {
-      inputStream.seek(currentPos + blocksize);
+      // check if the blocksize mentioned in the footer is the same as the header;
+      // by seeking and checking the length of a long. We do not seek `currentPos + blocksize`
+      // which can be the file size for the last block in the file, causing EOFException
+      // for some FSDataInputStream implementation
+      inputStream.seek(currentPos + blocksize - Long.BYTES);
+      // Block size in the footer includes the magic header, which the header does not include.
+      // So we have to shorten the footer block size by the size of magic hash
+      blockSizeFromFooter = inputStream.readLong() - magicBuffer.length;
     } catch (EOFException e) {
       LOG.info("Found corrupted block in file " + logFile + " with block size(" + blocksize + ") running past EOF");
       // this is corrupt
@@ -266,19 +277,13 @@
       return true;
     }
 
-    // check if the blocksize mentioned in the footer is the same as the header; by seeking back the length of a long
-    // the backward seek does not incur additional IO as {@link org.apache.hadoop.hdfs.DFSInputStream#seek()}
-    // only moves the index. actual IO happens on the next read operation
-    inputStream.seek(inputStream.getPos() - Long.BYTES);
-    // Block size in the footer includes the magic header, which the header does not include.
-    // So we have to shorten the footer block size by the size of magic hash
-    long blockSizeFromFooter = inputStream.readLong() - magicBuffer.length;
     if (blocksize != blockSizeFromFooter) {
       LOG.info("Found corrupted block in file " + logFile + ". Header block size(" + blocksize
-          + ") did not match the footer block size(" + blockSizeFromFooter + ")");
+          + ") did not match the footer block size(" + blockSizeFromFooter + ")");
       inputStream.seek(currentPos);
       return true;
     }
+
     try {
       readMagic();
       // all good - either we found the sync marker or EOF. Reset position and continue
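
In short, the fix folds the footer check into the initial seek: instead of probing `currentPos + blocksize` (which equals the file length for the last block in the file and makes some `FSDataInputStream` implementations throw `EOFException`), the reader seeks directly to the footer's trailing long at `currentPos + blocksize - Long.BYTES` and reads it inside the same try block. The sketch below restates that logic in isolation; it assumes a plain `java.io.RandomAccessFile` in place of Hudi's `FSDataInputStream`, uses a placeholder `MAGIC` constant standing in for `magicBuffer`, and omits the follow-up `readMagic()` verification, so it illustrates the seek arithmetic rather than reproducing the actual reader:

```java
import java.io.EOFException;
import java.io.IOException;
import java.io.RandomAccessFile;

/**
 * Minimal sketch of the fixed footer check. RandomAccessFile stands in for
 * Hudi's FSDataInputStream, and MAGIC is a placeholder for the log format's
 * magic marker; this is illustrative, not the Hudi implementation.
 */
public class FooterCheckSketch {

  // Placeholder 6-byte magic marker (hypothetical value).
  private static final byte[] MAGIC = {'#', 'H', 'U', 'D', 'I', '#'};

  // Returns true if the block starting at the current position is corrupt,
  // i.e. the block size recorded in the footer disagrees with the header's.
  static boolean isBlockCorrupted(RandomAccessFile in, int blockSizeFromHeader)
      throws IOException {
    long currentPos = in.getFilePointer();
    long blockSizeFromFooter;
    try {
      // Seek straight to the footer's trailing long rather than to
      // currentPos + blockSizeFromHeader: for the last block that offset is
      // the file length, which some streams reject with EOFException.
      in.seek(currentPos + blockSizeFromHeader - Long.BYTES);
      // The footer's size includes the magic marker, the header's does not,
      // so subtract the marker length before comparing.
      blockSizeFromFooter = in.readLong() - MAGIC.length;
    } catch (EOFException e) {
      // Block runs past EOF: restore the position and flag it as corrupt.
      in.seek(currentPos);
      return true;
    }
    // Restore the caller's position either way before reporting the result.
    in.seek(currentPos);
    return blockSizeFromHeader != blockSizeFromFooter;
  }
}
```

Keeping both the seek and the `readLong()` inside the try preserves the old behavior for genuinely truncated files: an `EOFException` raised by either call still lands on the corrupt-block path instead of escaping the check.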