@@ -18,13 +18,6 @@
 
 package org.apache.hudi.common.table.log;
 
-import org.apache.avro.Schema;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.BufferedFSInputStream;
-import org.apache.hadoop.fs.FSDataInputStream;
-import org.apache.hadoop.fs.FSInputStream;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hudi.common.fs.FSUtils;
 import org.apache.hudi.common.fs.SchemeAwareFSDataInputStream;
 import org.apache.hudi.common.fs.TimedFSDataInputStream;
@@ -43,10 +36,19 @@
 import org.apache.hudi.exception.CorruptedLogFileException;
 import org.apache.hudi.exception.HoodieIOException;
 import org.apache.hudi.exception.HoodieNotSupportedException;
+
+import org.apache.avro.Schema;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.BufferedFSInputStream;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FSInputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.log4j.LogManager;
 import org.apache.log4j.Logger;
+
 import javax.annotation.Nullable;
 
 import java.io.EOFException;
 import java.io.IOException;
 import java.util.Arrays;
@@ -254,8 +256,17 @@ private HoodieLogBlock createCorruptBlock() throws IOException {
 
   private boolean isBlockCorrupted(int blocksize) throws IOException {
     long currentPos = inputStream.getPos();
+    long blockSizeFromFooter;
+
     try {
-      inputStream.seek(currentPos + blocksize);
+      // Check whether the block size in the footer matches the one in the header,
+      // by seeking to the trailing long and reading it. We do not seek to
+      // `currentPos + blocksize`, which for the last block can equal the file size
+      // and cause an EOFException in some FSDataInputStream implementations.
+      inputStream.seek(currentPos + blocksize - Long.BYTES);
+      // Block size in the footer includes the magic header, which the header does not include.
+      // So we have to shorten the footer block size by the length of the magic bytes
+      blockSizeFromFooter = inputStream.readLong() - magicBuffer.length;
     } catch (EOFException e) {
       LOG.info("Found corrupted block in file " + logFile + " with block size(" + blocksize + ") running past EOF");
       // this is corrupt
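
Side note on the hunk above: read in isolation, the new check amounts to the sketch below. The helper name `readFooterBlockSize` and its parameters are hypothetical stand-ins for the reader's fields (`inputStream`, `magicBuffer`, and the header's block size), assuming the layout the comments describe, where each block ends with a long holding the total block size including the magic bytes.

```java
import org.apache.hadoop.fs.FSDataInputStream;

import java.io.EOFException;
import java.io.IOException;

class FooterCheckSketch {

  /**
   * Reads the block size recorded in the block's trailing long and normalizes it
   * to the header's convention: the footer size includes the magic bytes, the
   * header size does not. Returns -1 if the footer lies past EOF.
   */
  static long readFooterBlockSize(FSDataInputStream in, long blockStart,
      int headerBlockSize, int magicLength) throws IOException {
    try {
      // Seek straight to the trailing long instead of to the block end: for the
      // last block, blockStart + headerBlockSize equals the file size, and some
      // FSDataInputStream implementations throw EOFException on such a seek.
      in.seek(blockStart + headerBlockSize - Long.BYTES);
      return in.readLong() - magicLength;
    } catch (EOFException e) {
      return -1; // block runs past EOF: corrupt
    }
  }
}
```

A -1, or any value that disagrees with the header's block size, is then handled as in the next hunk: log the mismatch, rewind to the block start, and report the block as corrupt.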
@@ -266,19 +277,13 @@ private boolean isBlockCorrupted(int blocksize) throws IOException {
       return true;
     }
 
-    // check if the blocksize mentioned in the footer is the same as the header; by seeking back the length of a long
-    // the backward seek does not incur additional IO as {@link org.apache.hadoop.hdfs.DFSInputStream#seek()}
-    // only moves the index. actual IO happens on the next read operation
-    inputStream.seek(inputStream.getPos() - Long.BYTES);
-    // Block size in the footer includes the magic header, which the header does not include.
-    // So we have to shorten the footer block size by the size of magic hash
-    long blockSizeFromFooter = inputStream.readLong() - magicBuffer.length;
     if (blocksize != blockSizeFromFooter) {
       LOG.info("Found corrupted block in file " + logFile + ". Header block size(" + blocksize
-        + ") did not match the footer block size(" + blockSizeFromFooter + ")");
+          + ") did not match the footer block size(" + blockSizeFromFooter + ")");
       inputStream.seek(currentPos);
       return true;
     }
 
     try {
       readMagic();
       // all good - either we found the sync marker or EOF. Reset position and continue
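
For completeness, a minimal end-to-end demo of the footer arithmetic against the local filesystem, reusing the `readFooterBlockSize` sketch above. The 6-byte magic length and the single hand-rolled block are assumptions for illustration only, not the output of Hudi's log writer.

```java
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class LastBlockFooterDemo {

  public static void main(String[] args) throws Exception {
    FileSystem fs = FileSystem.getLocal(new Configuration());
    Path path = new Path("/tmp/footer-check-demo.log");

    int magicLength = 6;            // placeholder for a "#HUDI#"-style marker
    byte[] content = new byte[32];  // fake block payload

    // Write one fake "block": payload followed by a footer long that records
    // the total block size, magic bytes included.
    try (FSDataOutputStream out = fs.create(path, true)) {
      out.write(content);
      out.writeLong(magicLength + content.length + Long.BYTES);
    }

    // The size a block header would carry: everything except the magic bytes.
    int headerBlockSize = content.length + Long.BYTES;

    // This block ends exactly at the file size, as the last block in a log
    // file would, which is the case that tripped the old seek to the block
    // end; the check still works because it seeks 8 bytes short of EOF.
    try (FSDataInputStream in = fs.open(path)) {
      long footerSize = FooterCheckSketch.readFooterBlockSize(
          in, 0L, headerBlockSize, magicLength);
      System.out.println(footerSize == headerBlockSize); // prints true
    }
  }
}
```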