22 changes: 10 additions & 12 deletions hudi-common/src/main/java/org/apache/hudi/common/fs/FSUtils.java
@@ -18,6 +18,15 @@
 
 package org.apache.hudi.common.fs;
 
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.LocatedFileStatus;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.PathFilter;
+import org.apache.hadoop.fs.RemoteIterator;
+import org.apache.hadoop.hdfs.DistributedFileSystem;
 import org.apache.hudi.common.config.SerializableConfiguration;
 import org.apache.hudi.common.engine.HoodieEngineContext;
 import org.apache.hudi.common.model.HoodieFileFormat;
@@ -31,16 +40,6 @@
 import org.apache.hudi.exception.HoodieIOException;
 import org.apache.hudi.exception.InvalidHoodiePathException;
 import org.apache.hudi.metadata.HoodieTableMetadata;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FSDataInputStream;
-import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.LocatedFileStatus;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.fs.PathFilter;
-import org.apache.hadoop.fs.RemoteIterator;
-import org.apache.hadoop.hdfs.DistributedFileSystem;
 import org.apache.log4j.LogManager;
 import org.apache.log4j.Logger;
 
@@ -50,11 +49,10 @@
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.HashSet;
-import java.util.Set;
-
 import java.util.List;
 import java.util.Map.Entry;
 import java.util.Objects;
+import java.util.Set;
 import java.util.UUID;
 import java.util.function.Function;
 import java.util.regex.Matcher;
hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieLogFileReader.java
@@ -40,6 +40,7 @@
 import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FSInputStream;
 import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.log4j.LogManager;
 import org.apache.log4j.Logger;
 
@@ -57,6 +58,7 @@
 public class HoodieLogFileReader implements HoodieLogFormat.Reader {
 
   public static final int DEFAULT_BUFFER_SIZE = 16 * 1024 * 1024; // 16 MB
+  private static final int BLOCK_SCAN_READ_BUFFER_SIZE = 1024 * 1024; // 1 MB
   private static final Logger LOG = LogManager.getLogger(HoodieLogFileReader.class);
 
   private final FSDataInputStream inputStream;
@@ -71,9 +73,13 @@ public class HoodieLogFileReader implements HoodieLogFormat.Reader {
   private transient Thread shutdownThread = null;
 
   public HoodieLogFileReader(FileSystem fs, HoodieLogFile logFile, Schema readerSchema, int bufferSize,
-      boolean readBlockLazily, boolean reverseReader) throws IOException {
+                             boolean readBlockLazily, boolean reverseReader) throws IOException {
     FSDataInputStream fsDataInputStream = fs.open(logFile.getPath(), bufferSize);
-    if (fsDataInputStream.getWrappedStream() instanceof FSInputStream) {
+    if (FSUtils.isGCSInputStream(fsDataInputStream)) {
+      this.inputStream = new TimedFSDataInputStream(logFile.getPath(), new FSDataInputStream(
+          new BufferedFSInputStream((FSInputStream) ((
+              (FSDataInputStream) fsDataInputStream.getWrappedStream()).getWrappedStream()), bufferSize)));
+    } else if (fsDataInputStream.getWrappedStream() instanceof FSInputStream) {
       this.inputStream = new TimedFSDataInputStream(logFile.getPath(), new FSDataInputStream(
           new BufferedFSInputStream((FSInputStream) fsDataInputStream.getWrappedStream(), bufferSize)));
     } else {
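Note: FSUtils.isGCSInputStream is not defined in this diff (the FSUtils.java changes shown above are import reordering only), so its implementation is not visible here. Going only by the casts in the constructor, the helper presumably detects the extra FSDataInputStream nesting that the GCS connector's fs.open introduces, which the new branch then unwraps twice. A minimal sketch under that assumption; the hypothetical class name and the detection logic are illustrative, not the PR's actual code:

// Hypothetical sketch of the helper referenced above; the real
// FSUtils.isGCSInputStream is not shown in this diff view.
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSInputStream;

class GCSStreamCheck {
  static boolean isGCSInputStream(FSDataInputStream in) {
    // Assumption: the GCS connector returns an FSDataInputStream wrapped
    // inside the outer FSDataInputStream, while HDFS and the local FS
    // expose an FSInputStream directly under the outer wrapper.
    return in.getWrappedStream() instanceof FSDataInputStream
        && ((FSDataInputStream) in.getWrappedStream()).getWrappedStream() instanceof FSInputStream;
  }
}

Detecting by stream shape rather than by URI scheme would keep the reader decoupled from the GCS connector classes, at the cost of relying on an implementation detail of fs.open.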
@@ -274,19 +280,25 @@ private boolean isBlockCorrupt(int blocksize) throws IOException {
   }
 
   private long scanForNextAvailableBlockOffset() throws IOException {
+    // Make buffer large enough to scan through the file as quick as possible especially if it is on S3/GCS.
+    byte[] dataBuf = new byte[BLOCK_SCAN_READ_BUFFER_SIZE];
+    boolean eof = false;
     while (true) {
       long currentPos = inputStream.getPos();
       try {
-        boolean hasNextMagic = hasNextMagic();
-        if (hasNextMagic) {
-          return currentPos;
-        } else {
-          // No luck - advance and try again
-          inputStream.seek(currentPos + 1);
-        }
+        Arrays.fill(dataBuf, (byte) 0);
+        inputStream.readFully(dataBuf, 0, dataBuf.length);
       } catch (EOFException e) {
-        return inputStream.getPos();
+        eof = true;
       }
+      long pos = Bytes.indexOf(dataBuf, HoodieLogFormat.MAGIC);
+      if (pos >= 0) {
+        return currentPos + pos;
+      }
+      if (eof) {
+        return inputStream.getPos();
+      }
+      inputStream.seek(currentPos + dataBuf.length - HoodieLogFormat.MAGIC.length);
     }
   }
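The rewritten scanner replaces the old one-byte-at-a-time probe (hasNextMagic, then seek(currentPos + 1)) with 1 MB bulk reads searched via HBase's Bytes.indexOf, turning a corrupt-block scan from potentially millions of small reads into a handful of large ones; the difference matters most on high-latency stores like S3 and GCS. Two details carry the correctness: the buffer is zeroed on every pass, so a short read at EOF cannot leave stale bytes from the previous window behind and produce a false match, and each seek advances by only dataBuf.length - MAGIC.length, so a magic sequence straddling two windows is fully contained in the next one. A standalone sketch of that overlapping-window technique follows; the class, the indexOf helper, and the six-byte marker are illustrative stand-ins (the marker mimics HoodieLogFormat.MAGIC), not Hudi code:

import java.util.Arrays;

// Hypothetical demo of the overlapping-window scan used above.
public class MagicScanDemo {

  // Linear search, standing in for org.apache.hadoop.hbase.util.Bytes.indexOf.
  static int indexOf(byte[] haystack, byte[] needle) {
    outer:
    for (int i = 0; i <= haystack.length - needle.length; i++) {
      for (int j = 0; j < needle.length; j++) {
        if (haystack[i + j] != needle[j]) {
          continue outer;
        }
      }
      return i;
    }
    return -1;
  }

  // Scan 'data' for 'magic' in windows of bufSize bytes, advancing by
  // bufSize - magic.length so a marker straddling a window boundary is
  // fully contained in the next window. Returns the offset, or -1.
  static long scan(byte[] data, byte[] magic, int bufSize) {
    byte[] buf = new byte[bufSize];
    long pos = 0;
    while (pos < data.length) {
      // Zero the buffer so a short final read leaves no stale bytes that
      // could spell out the marker (the marker contains no zero bytes).
      Arrays.fill(buf, (byte) 0);
      int n = (int) Math.min(bufSize, data.length - pos);
      System.arraycopy(data, (int) pos, buf, 0, n);
      int hit = indexOf(buf, magic);
      if (hit >= 0) {
        return pos + hit;
      }
      pos += bufSize - magic.length; // overlap consecutive windows
    }
    return -1;
  }

  public static void main(String[] args) {
    byte[] magic = {'#', 'H', 'U', 'D', 'I', '#'}; // stand-in for HoodieLogFormat.MAGIC
    byte[] data = new byte[100];
    System.arraycopy(magic, 0, data, 62, magic.length); // straddles the first 64-byte window
    System.out.println(scan(data, magic, 64)); // prints 62
  }
}

An overlap of MAGIC.length is one byte more than the strictly necessary MAGIC.length - 1; the extra byte is harmless and keeps the seek arithmetic simple.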
