diff --git a/hudi-common/src/main/java/org/apache/hudi/common/fs/FSUtils.java b/hudi-common/src/main/java/org/apache/hudi/common/fs/FSUtils.java
index a2b3889829780..71c6ed28a1206 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/fs/FSUtils.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/fs/FSUtils.java
@@ -19,7 +19,6 @@
 package org.apache.hudi.common.fs;
 
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.LocatedFileStatus;
@@ -569,14 +568,12 @@ public static String getDFSFullPartitionPath(FileSystem fs, Path fullPartitionPa
 
   /**
    * This is due to HUDI-140 GCS has a different behavior for detecting EOF during seek().
-   *
-   * @param inputStream FSDataInputStream
-   * @return true if the inputstream or the wrapped one is of type GoogleHadoopFSInputStream
+   *
+   * @param fs FileSystem instance.
+   * @return true if the {@link FileSystem}'s scheme is GCS.
    */
-  public static boolean isGCSInputStream(FSDataInputStream inputStream) {
-    return inputStream.getClass().getCanonicalName().equals("com.google.cloud.hadoop.fs.gcs.GoogleHadoopFSInputStream")
-        || inputStream.getWrappedStream().getClass().getCanonicalName()
-        .equals("com.google.cloud.hadoop.fs.gcs.GoogleHadoopFSInputStream");
+  public static boolean isGCSFileSystem(FileSystem fs) {
+    return fs.getScheme().equals(StorageSchemes.GCS.getScheme());
   }
 
   public static Configuration registerFileSystem(Path file, Configuration conf) {
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/fs/SchemeAwareFSDataInputStream.java b/hudi-common/src/main/java/org/apache/hudi/common/fs/SchemeAwareFSDataInputStream.java
new file mode 100644
index 0000000000000..8795bf19d3568
--- /dev/null
+++ b/hudi-common/src/main/java/org/apache/hudi/common/fs/SchemeAwareFSDataInputStream.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.common.fs;
+
+import org.apache.hadoop.fs.FSDataInputStream;
+
+import java.io.EOFException;
+import java.io.IOException;
+import java.io.InputStream;
+
+/**
+ * Scheme-aware {@link FSDataInputStream} that adjusts seeks for the GCS filesystem.
+ */
+public class SchemeAwareFSDataInputStream extends FSDataInputStream {
+
+  private final boolean isGCSFileSystem;
+
+  public SchemeAwareFSDataInputStream(InputStream in, boolean isGCSFileSystem) {
+    super(in);
+    this.isGCSFileSystem = isGCSFileSystem;
+  }
+
+  @Override
+  public void seek(long desired) throws IOException {
+    try {
+      super.seek(desired);
+    } catch (EOFException e) {
+      // with the GCS filesystem, seeking to the last byte can throw EOFException, hence this fallback.
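+      // per HUDI-140, the GCS input stream can report EOF when seeking to the very end of
+      // the stream; retrying one byte earlier keeps the read path usable instead of failing.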
+      if (isGCSFileSystem) {
+        super.seek(desired - 1);
+      } else {
+        throw e;
+      }
+    }
+  }
+}
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieLogFileReader.java b/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieLogFileReader.java
index e437b782e2e81..f0f3842e97b36 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieLogFileReader.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieLogFileReader.java
@@ -19,6 +19,7 @@
 package org.apache.hudi.common.table.log;
 
 import org.apache.hudi.common.fs.FSUtils;
+import org.apache.hudi.common.fs.SchemeAwareFSDataInputStream;
 import org.apache.hudi.common.fs.TimedFSDataInputStream;
 import org.apache.hudi.common.model.HoodieLogFile;
 import org.apache.hudi.common.table.log.block.HoodieAvroDataBlock;
@@ -75,20 +76,9 @@ public class HoodieLogFileReader implements HoodieLogFormat.Reader {
 
   public HoodieLogFileReader(FileSystem fs, HoodieLogFile logFile, Schema readerSchema, int bufferSize,
       boolean readBlockLazily, boolean reverseReader) throws IOException {
     FSDataInputStream fsDataInputStream = fs.open(logFile.getPath(), bufferSize);
-    if (FSUtils.isGCSInputStream(fsDataInputStream)) {
-      this.inputStream = new TimedFSDataInputStream(logFile.getPath(), new FSDataInputStream(
-          new BufferedFSInputStream((FSInputStream) ((
-              (FSDataInputStream) fsDataInputStream.getWrappedStream()).getWrappedStream()), bufferSize)));
-    } else if (fsDataInputStream.getWrappedStream() instanceof FSInputStream) {
-      this.inputStream = new TimedFSDataInputStream(logFile.getPath(), new FSDataInputStream(
-          new BufferedFSInputStream((FSInputStream) fsDataInputStream.getWrappedStream(), bufferSize)));
-    } else {
-      // fsDataInputStream.getWrappedStream() maybe a BufferedFSInputStream
-      // need to wrap in another BufferedFSInputStream the make bufferSize work?
-      this.inputStream = fsDataInputStream;
-    }
-    this.logFile = logFile;
+    this.logFile = logFile;
+    this.inputStream = getFSDataInputStream(fsDataInputStream, fs, bufferSize);
     this.readerSchema = readerSchema;
     this.readBlockLazily = readBlockLazily;
     this.reverseReader = reverseReader;
@@ -107,6 +97,57 @@ public HoodieLogFileReader(FileSystem fs, HoodieLogFile logFile, Schema readerSc
     this(fs, logFile, readerSchema, DEFAULT_BUFFER_SIZE, false, false);
   }
 
+  /**
+   * Fetches the right {@link FSDataInputStream} to use, wrapping the original with the required input streams.
+   * @param fsDataInputStream original instance of {@link FSDataInputStream}.
+   * @param fs instance of {@link FileSystem} in use.
+   * @param bufferSize buffer size to be used.
+   * @return the right {@link FSDataInputStream} as required.
+   */
+  private FSDataInputStream getFSDataInputStream(FSDataInputStream fsDataInputStream, FileSystem fs, int bufferSize) {
+    if (FSUtils.isGCSFileSystem(fs)) {
+      // in GCS FS, we might need to intercept seek offsets, as seeks near EOF can throw an EOF exception
+      return new SchemeAwareFSDataInputStream(getFSDataInputStreamForGCS(fsDataInputStream, bufferSize), true);
+    }
+
+    if (fsDataInputStream.getWrappedStream() instanceof FSInputStream) {
+      return new TimedFSDataInputStream(logFile.getPath(), new FSDataInputStream(
+          new BufferedFSInputStream((FSInputStream) fsDataInputStream.getWrappedStream(), bufferSize)));
+    }
+
+    // fsDataInputStream.getWrappedStream() may be a BufferedFSInputStream;
+    // need to wrap in another BufferedFSInputStream to make bufferSize work?
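+    // note: fs.open() was already given bufferSize, so the stream may already be buffered; return it as-is.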
+    return fsDataInputStream;
+  }
+
+  /**
+   * The GCS FileSystem needs special handling for seeks, so this method fetches the right {@link FSDataInputStream}
+   * to use by wrapping the original with the required input streams.
+   * @param fsDataInputStream original instance of {@link FSDataInputStream}.
+   * @param bufferSize buffer size to be used.
+   * @return the right {@link FSDataInputStream} as required.
+   */
+  private FSDataInputStream getFSDataInputStreamForGCS(FSDataInputStream fsDataInputStream, int bufferSize) {
+    // in the case of GCS FS, there are two flows:
+    // a. fsDataInputStream.getWrappedStream() is an instance of FSInputStream
+    // b. fsDataInputStream.getWrappedStream() is not an instance of FSInputStream, but is an instance of FSDataInputStream.
+    // (a) is handled in the first if block and (b) in the second. If neither applies, we fall back to the original fsDataInputStream.
+    if (fsDataInputStream.getWrappedStream() instanceof FSInputStream) {
+      return new TimedFSDataInputStream(logFile.getPath(), new FSDataInputStream(
+          new BufferedFSInputStream((FSInputStream) fsDataInputStream.getWrappedStream(), bufferSize)));
+    }
+
+    if (fsDataInputStream.getWrappedStream() instanceof FSDataInputStream
+        && ((FSDataInputStream) fsDataInputStream.getWrappedStream()).getWrappedStream() instanceof FSInputStream) {
+      FSInputStream inputStream = (FSInputStream) ((FSDataInputStream) fsDataInputStream.getWrappedStream()).getWrappedStream();
+      return new TimedFSDataInputStream(logFile.getPath(),
+          new FSDataInputStream(new BufferedFSInputStream(inputStream, bufferSize)));
+    }
+
+    return fsDataInputStream;
+  }
+
   @Override
   public HoodieLogFile getLogFile() {
     return logFile;
@@ -238,11 +279,7 @@ private HoodieLogBlock createCorruptBlock() throws IOException {
   private boolean isBlockCorrupt(int blocksize) throws IOException {
     long currentPos = inputStream.getPos();
     try {
-      if (FSUtils.isGCSInputStream(inputStream)) {
-        inputStream.seek(currentPos + blocksize - 1);
-      } else {
-        inputStream.seek(currentPos + blocksize);
-      }
+      inputStream.seek(currentPos + blocksize);
     } catch (EOFException e) {
       LOG.info("Found corrupted block in file " + logFile + " with block size(" + blocksize + ") running past EOF");
       // this is corrupt
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieLogBlock.java b/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieLogBlock.java
index 1d185e49bf040..2fbcd992087e2 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieLogBlock.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieLogBlock.java
@@ -18,7 +18,6 @@
 package org.apache.hudi.common.table.log.block;
 
-import org.apache.hudi.common.fs.FSUtils;
 import org.apache.hudi.common.model.HoodieLogFile;
 import org.apache.hudi.common.table.log.HoodieMergedLogRecordScanner;
 import org.apache.hudi.common.util.Option;
@@ -220,7 +219,7 @@ public static byte[] readOrSkipContent(FSDataInputStream inputStream, Integer co
       inputStream.readFully(content, 0, contentLength);
     } else {
       // Seek to the end of the content block
-      safeSeek(inputStream, inputStream.getPos() + contentLength);
+      inputStream.seek(inputStream.getPos() + contentLength);
     }
     return content;
   }
@@ -232,9 +231,9 @@ protected void inflate() throws HoodieIOException {
 
     try {
       content = Option.of(new byte[(int) this.getBlockContentLocation().get().getBlockSize()]);
-      safeSeek(inputStream, this.getBlockContentLocation().get().getContentPositionInLogFile());
+      inputStream.seek(this.getBlockContentLocation().get().getContentPositionInLogFile());
       inputStream.readFully(content.get(), 0, content.get().length);
-      safeSeek(inputStream, this.getBlockContentLocation().get().getBlockEndPos());
+      inputStream.seek(this.getBlockContentLocation().get().getBlockEndPos());
     } catch (IOException e) {
       // TODO : fs.open() and return inputstream again, need to pass FS configuration
       // because the inputstream might close/timeout for large number of log blocks to be merged
@@ -249,23 +248,4 @@ protected void deflate() {
   protected void deflate() {
     content = Option.empty();
   }
-
-  /**
-   * Handles difference in seek behavior for GCS and non-GCS input stream.
-   *
-   * @param inputStream Input Stream
-   * @param pos Position to seek
-   * @throws IOException -
-   */
-  private static void safeSeek(FSDataInputStream inputStream, long pos) throws IOException {
-    try {
-      inputStream.seek(pos);
-    } catch (EOFException e) {
-      if (FSUtils.isGCSInputStream(inputStream)) {
-        inputStream.seek(pos - 1);
-      } else {
-        throw e;
-      }
-    }
-  }
 }
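
As a reviewer aid, here is a minimal, self-contained sketch of the new seek fallback. It is not part of the patch: SchemeAwareSeekSketch and EofOnEndSeekableStream are hypothetical names, and the in-memory stream merely simulates the seek-at-EOF EOFException that HUDI-140 attributes to GoogleHadoopFSInputStream; only SchemeAwareFSDataInputStream comes from this change.

import org.apache.hadoop.fs.PositionedReadable;
import org.apache.hadoop.fs.Seekable;
import org.apache.hudi.common.fs.SchemeAwareFSDataInputStream;

import java.io.EOFException;
import java.io.IOException;
import java.io.InputStream;

public class SchemeAwareSeekSketch {

  // Hypothetical stand-in for the GCS input stream: seeking to (or past) the end of the
  // data throws EOFException, simulating the behavior reported in HUDI-140.
  static class EofOnEndSeekableStream extends InputStream implements Seekable, PositionedReadable {
    private final byte[] data;
    private int pos = 0;

    EofOnEndSeekableStream(byte[] data) {
      this.data = data;
    }

    @Override
    public int read() {
      return pos < data.length ? (data[pos++] & 0xFF) : -1;
    }

    @Override
    public void seek(long desired) throws IOException {
      if (desired >= data.length) {
        throw new EOFException("cannot seek to " + desired + " in a stream of length " + data.length);
      }
      pos = (int) desired;
    }

    @Override
    public long getPos() {
      return pos;
    }

    @Override
    public boolean seekToNewSource(long targetPos) {
      return false;
    }

    @Override
    public int read(long position, byte[] buffer, int offset, int length) {
      int n = Math.min(length, data.length - (int) position);
      if (n <= 0) {
        return -1;
      }
      System.arraycopy(data, (int) position, buffer, offset, n);
      return n;
    }

    @Override
    public void readFully(long position, byte[] buffer, int offset, int length) throws IOException {
      if (read(position, buffer, offset, length) < length) {
        throw new EOFException("not enough bytes at position " + position);
      }
    }

    @Override
    public void readFully(long position, byte[] buffer) throws IOException {
      readFully(position, buffer, 0, buffer.length);
    }
  }

  public static void main(String[] args) throws IOException {
    byte[] data = {1, 2, 3, 4};

    // GCS mode: the EOFException from a seek-to-end is swallowed and we land on the last byte.
    SchemeAwareFSDataInputStream gcsAware =
        new SchemeAwareFSDataInputStream(new EofOnEndSeekableStream(data), true);
    gcsAware.seek(data.length);
    System.out.println(gcsAware.getPos()); // prints 3, i.e. data.length - 1

    // non-GCS mode: the EOFException propagates to the caller unchanged.
    SchemeAwareFSDataInputStream plain =
        new SchemeAwareFSDataInputStream(new EofOnEndSeekableStream(data), false);
    try {
      plain.seek(data.length);
    } catch (EOFException e) {
      System.out.println("EOF surfaced: " + e.getMessage());
    }
  }
}

This also illustrates why isBlockCorrupt and HoodieLogBlock no longer need GCS-specific branches: the one-byte adjustment now lives in a single place, behind the stream the reader already uses.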