Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 4 additions & 7 deletions hudi-common/src/main/java/org/apache/hudi/common/fs/FSUtils.java
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@
package org.apache.hudi.common.fs;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.LocatedFileStatus;
Expand Down Expand Up @@ -569,14 +568,12 @@ public static String getDFSFullPartitionPath(FileSystem fs, Path fullPartitionPa

/**
* This is due to HUDI-140: GCS behaves differently when detecting EOF during seek().
*
* @param inputStream FSDataInputStream
*
* @param fs fileSystem instance.
* @return true if the inputstream or the wrapped one is of type GoogleHadoopFSInputStream
*/
public static boolean isGCSInputStream(FSDataInputStream inputStream) {
return inputStream.getClass().getCanonicalName().equals("com.google.cloud.hadoop.fs.gcs.GoogleHadoopFSInputStream")
|| inputStream.getWrappedStream().getClass().getCanonicalName()
.equals("com.google.cloud.hadoop.fs.gcs.GoogleHadoopFSInputStream");
public static boolean isGCSFileSystem(FileSystem fs) {
  // Compare the scheme reported by the file system against the scheme registered for GCS.
  final String fsScheme = fs.getScheme();
  final String gcsScheme = StorageSchemes.GCS.getScheme();
  return fsScheme.equals(gcsScheme);
}

public static Configuration registerFileSystem(Path file, Configuration conf) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.hudi.common.fs;

import org.apache.hadoop.fs.FSDataInputStream;

import java.io.EOFException;
import java.io.IOException;
import java.io.InputStream;

/**
 * An {@link FSDataInputStream} that knows whether it is backed by a GCS file system and adjusts
 * {@link #seek(long)} accordingly.
 *
 * <p>For GCS-backed streams, a seek that fails with {@link EOFException} is retried once at the
 * preceding offset. For every other stream the exception is propagated unchanged.
 */
public class SchemeAwareFSDataInputStream extends FSDataInputStream {

  /** True when the underlying file system is GCS, enabling the one-byte-back seek retry. */
  private final boolean gcsBacked;

  /**
   * @param in              the stream to delegate to.
   * @param isGCSFileSystem whether the stream was opened on a GCS file system.
   */
  public SchemeAwareFSDataInputStream(InputStream in, boolean isGCSFileSystem) {
    super(in);
    this.gcsBacked = isGCSFileSystem;
  }

  /**
   * Seeks to {@code desired}; on GCS, falls back to {@code desired - 1} if the first attempt hits EOF.
   *
   * @param desired the offset to seek to.
   * @throws IOException if the seek fails; this includes the {@link EOFException} from the first
   *                     attempt on non-GCS streams and any failure of the retry on GCS streams.
   */
  @Override
  public void seek(long desired) throws IOException {
    try {
      super.seek(desired);
    } catch (EOFException eof) {
      if (!gcsBacked) {
        throw eof;
      }
      // With GCSFileSystem, accessing the last byte might throw EOFException, so step back one byte.
      super.seek(desired - 1);
    }
  }
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
package org.apache.hudi.common.table.log;

import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.fs.SchemeAwareFSDataInputStream;
import org.apache.hudi.common.fs.TimedFSDataInputStream;
import org.apache.hudi.common.model.HoodieLogFile;
import org.apache.hudi.common.table.log.block.HoodieAvroDataBlock;
Expand Down Expand Up @@ -75,20 +76,8 @@ public class HoodieLogFileReader implements HoodieLogFormat.Reader {
public HoodieLogFileReader(FileSystem fs, HoodieLogFile logFile, Schema readerSchema, int bufferSize,
boolean readBlockLazily, boolean reverseReader) throws IOException {
FSDataInputStream fsDataInputStream = fs.open(logFile.getPath(), bufferSize);
if (FSUtils.isGCSInputStream(fsDataInputStream)) {
this.inputStream = new TimedFSDataInputStream(logFile.getPath(), new FSDataInputStream(
new BufferedFSInputStream((FSInputStream) ((
(FSDataInputStream) fsDataInputStream.getWrappedStream()).getWrappedStream()), bufferSize)));
} else if (fsDataInputStream.getWrappedStream() instanceof FSInputStream) {
this.inputStream = new TimedFSDataInputStream(logFile.getPath(), new FSDataInputStream(
new BufferedFSInputStream((FSInputStream) fsDataInputStream.getWrappedStream(), bufferSize)));
} else {
// fsDataInputStream.getWrappedStream() maybe a BufferedFSInputStream
// need to wrap in another BufferedFSInputStream the make bufferSize work?
this.inputStream = fsDataInputStream;
}

this.logFile = logFile;
this.inputStream = getFSDataInputStream(fsDataInputStream, fs, bufferSize);
this.readerSchema = readerSchema;
this.readBlockLazily = readBlockLazily;
this.reverseReader = reverseReader;
Expand All @@ -107,6 +96,56 @@ public HoodieLogFileReader(FileSystem fs, HoodieLogFile logFile, Schema readerSc
this(fs, logFile, readerSchema, DEFAULT_BUFFER_SIZE, false, false);
}

/**
* Fetch the right {@link FSDataInputStream} to be used by wrapping with required input streams.
* @param fsDataInputStream original instance of {@link FSDataInputStream}.
* @param fs instance of {@link FileSystem} in use.
* @param bufferSize buffer size to be used.
* @return the right {@link FSDataInputStream} as required.
*/
private FSDataInputStream getFSDataInputStream(FSDataInputStream fsDataInputStream, FileSystem fs, int bufferSize) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I would rather rewrite it like this reducing cyclomatic complexity, but I am also fine with what is here originally:

  private FSDataInputStream getFSDataInputStream(FSDataInputStream fsDataInputStream, FileSystem fs, int bufferSize) {
    if (FSUtils.isGCSFileSystem(fs)) {
      return new SchemeAwareFSDataInputStream(
         getFSDataInputStreamForGCSFs(fsDataInputStream, fs, bufferSize), true);
    }
    if (fsDataInputStream.getWrappedStream() instanceof FSInputStream) {
        return new TimedFSDataInputStream(logFile.getPath(), new FSDataInputStream(
            new BufferedFSInputStream((FSInputStream) fsDataInputStream.getWrappedStream(), bufferSize)));
    }
    return fsDataInputStream;
  }
 
   private FSDataInputStream getFSDataInputStreamForGCSFs(FSDataInputStream fsDataInputStream, FileSystem fs, int bufferSize) {
    if (fsDataInputStream.getWrappedStream() instanceof FSInputStream) {
      return new TimedFSDataInputStream(logFile.getPath(), new FSDataInputStream(
          new BufferedFSInputStream((FSInputStream) fsDataInputStream.getWrappedStream(), bufferSize)));
    }
    if (fsDataInputStream.getWrappedStream() instanceof FSDataInputStream
        && ((FSDataInputStream) fsDataInputStream.getWrappedStream()).getWrappedStream() instanceof FSInputStream) {
      FSInputStream inputStream = (FSInputStream)((FSDataInputStream) fsDataInputStream.getWrappedStream()).getWrappedStream();
      return new TimedFSDataInputStream(logFile.getPath(), new FSDataInputStream(
          new BufferedFSInputStream(inputStream, bufferSize)));
    }
    return fsDataInputSt
 

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I also agree with you. I will restructure this a bit.

if (FSUtils.isGCSFileSystem(fs)) {
// in GCS FS, we might need to intercept seek offsets as we might get EOF exception
return new SchemeAwareFSDataInputStream(getFSDataInputStreamForGCS(fsDataInputStream, bufferSize), true);
}

if (fsDataInputStream.getWrappedStream() instanceof FSInputStream) {
return new TimedFSDataInputStream(logFile.getPath(), new FSDataInputStream(
new BufferedFSInputStream((FSInputStream) fsDataInputStream.getWrappedStream(), bufferSize)));
}

// fsDataInputStream.getWrappedStream() may be a BufferedFSInputStream
// need to wrap in another BufferedFSInputStream to make bufferSize work?
return fsDataInputStream;
}

/**
* GCS FileSystem needs some special handling for seek and hence this method assists to fetch the right {@link FSDataInputStream} to be
* used by wrapping with required input streams.
* @param fsDataInputStream original instance of {@link FSDataInputStream}.
* @param bufferSize buffer size to be used.
* @return the right {@link FSDataInputStream} as required.
*/
private FSDataInputStream getFSDataInputStreamForGCS(FSDataInputStream fsDataInputStream, int bufferSize) {
  // In case of GCS FS the raw FSInputStream can sit at one of two depths:
  //   (a) directly inside fsDataInputStream, or
  //   (b) inside an FSDataInputStream that is itself wrapped by fsDataInputStream.
  // Locate it first; if neither shape matches, hand back the original stream untouched.
  FSInputStream rawStream = null;
  if (fsDataInputStream.getWrappedStream() instanceof FSInputStream) {
    rawStream = (FSInputStream) fsDataInputStream.getWrappedStream();
  } else if (fsDataInputStream.getWrappedStream() instanceof FSDataInputStream) {
    FSDataInputStream nested = (FSDataInputStream) fsDataInputStream.getWrappedStream();
    if (nested.getWrappedStream() instanceof FSInputStream) {
      rawStream = (FSInputStream) nested.getWrappedStream();
    }
  }

  if (rawStream == null) {
    return fsDataInputStream;
  }

  // Re-wrap the raw stream with the requested buffer size and the timing decorator.
  return new TimedFSDataInputStream(logFile.getPath(),
      new FSDataInputStream(new BufferedFSInputStream(rawStream, bufferSize)));
}

@Override
public HoodieLogFile getLogFile() {
return logFile;
Expand Down Expand Up @@ -238,11 +277,7 @@ private HoodieLogBlock createCorruptBlock() throws IOException {
private boolean isBlockCorrupt(int blocksize) throws IOException {
long currentPos = inputStream.getPos();
try {
if (FSUtils.isGCSInputStream(inputStream)) {
inputStream.seek(currentPos + blocksize - 1);
} else {
inputStream.seek(currentPos + blocksize);
}
inputStream.seek(currentPos + blocksize);
} catch (EOFException e) {
LOG.info("Found corrupted block in file " + logFile + " with block size(" + blocksize + ") running past EOF");
// this is corrupt
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@

package org.apache.hudi.common.table.log.block;

import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.model.HoodieLogFile;
import org.apache.hudi.common.table.log.HoodieMergedLogRecordScanner;
import org.apache.hudi.common.util.Option;
Expand Down Expand Up @@ -220,7 +219,7 @@ public static byte[] readOrSkipContent(FSDataInputStream inputStream, Integer co
inputStream.readFully(content, 0, contentLength);
} else {
// Seek to the end of the content block
safeSeek(inputStream, inputStream.getPos() + contentLength);
inputStream.seek(inputStream.getPos() + contentLength);
}
return content;
}
Expand All @@ -232,9 +231,9 @@ protected void inflate() throws HoodieIOException {

try {
content = Option.of(new byte[(int) this.getBlockContentLocation().get().getBlockSize()]);
safeSeek(inputStream, this.getBlockContentLocation().get().getContentPositionInLogFile());
inputStream.seek(this.getBlockContentLocation().get().getContentPositionInLogFile());
inputStream.readFully(content.get(), 0, content.get().length);
safeSeek(inputStream, this.getBlockContentLocation().get().getBlockEndPos());
inputStream.seek(this.getBlockContentLocation().get().getBlockEndPos());
} catch (IOException e) {
// TODO : fs.open() and return inputstream again, need to pass FS configuration
// because the inputstream might close/timeout for large number of log blocks to be merged
Expand All @@ -249,23 +248,4 @@ protected void inflate() throws HoodieIOException {
protected void deflate() {
content = Option.empty();
}

/**
* Handles difference in seek behavior for GCS and non-GCS input stream.
*
* @param inputStream Input Stream
* @param pos Position to seek
* @throws IOException -
*/
private static void safeSeek(FSDataInputStream inputStream, long pos) throws IOException {
  try {
    inputStream.seek(pos);
  } catch (EOFException eof) {
    // Only GCS input streams get a second attempt; everything else surfaces the EOF as-is.
    if (!FSUtils.isGCSInputStream(inputStream)) {
      throw eof;
    }
    // Retry one byte earlier for GCS.
    inputStream.seek(pos - 1);
  }
}
}