diff --git a/hudi-common/src/main/java/org/apache/hudi/common/fs/BoundedFsDataInputStream.java b/hudi-common/src/main/java/org/apache/hudi/common/fs/BoundedFsDataInputStream.java new file mode 100644 index 0000000000000..27315f85e62c9 --- /dev/null +++ b/hudi-common/src/main/java/org/apache/hudi/common/fs/BoundedFsDataInputStream.java @@ -0,0 +1,68 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and limitations under the License. + */ + +package org.apache.hudi.common.fs; + +import org.apache.hadoop.fs.FSDataInputStream; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; + +import java.io.EOFException; +import java.io.IOException; +import java.io.InputStream; + +public class BoundedFsDataInputStream extends FSDataInputStream { + private FileSystem fs; + private Path file; + private long fileLen = -1L; + + public BoundedFsDataInputStream(FileSystem fs, Path file, InputStream in) { + super(in); + this.fs = fs; + this.file = file; + } + + @Override + public boolean markSupported() { + return false; + } + + /* Return the file length */ + private long getFileLength() throws IOException { + if (fileLen == -1L) { + fileLen = fs.getContentSummary(file).getLength(); + } + return fileLen; + } + + @Override + public synchronized void seek(long pos) throws IOException { + if (pos < 0 || pos > getFileLength()) { + throw new EOFException("Try to seek pos[" + pos + "] , but fileSize is " + getFileLength()); + } + super.seek(pos); + } + + @Override + public synchronized long skip(long n) throws IOException { + long curPos = getPos(); + long fileLength = getFileLength(); + if (n + curPos > fileLength) { + n = fileLength - curPos; + } + return super.skip(n); + } + +} diff --git a/hudi-common/src/main/java/org/apache/hudi/common/fs/FSUtils.java b/hudi-common/src/main/java/org/apache/hudi/common/fs/FSUtils.java index fe697197f20e9..474d60f8d5d80 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/fs/FSUtils.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/fs/FSUtils.java @@ -653,6 +653,14 @@ public static boolean isGCSFileSystem(FileSystem fs) { return fs.getScheme().equals(StorageSchemes.GCS.getScheme()); } + /** + * CHDFS will throw {@code IOException} instead of {@code EOFException}. It will cause errors in isBlockCorrupted(). 
+ * Wrap the stream with {@code BoundedFsDataInputStream} to check in advance whether the desired offset is beyond the file size. + */ + public static boolean isCHDFileSystem(FileSystem fs) { + return StorageSchemes.CHDFS.getScheme().equals(fs.getScheme()); + } + public static Configuration registerFileSystem(Path file, Configuration conf) { Configuration returnConf = new Configuration(conf); String scheme = FSUtils.getFs(file.toString(), conf).getScheme(); diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieLogFileReader.java b/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieLogFileReader.java index c7c457b2e9ddd..cb16c8b141298 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieLogFileReader.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieLogFileReader.java @@ -18,6 +18,7 @@ package org.apache.hudi.common.table.log; +import org.apache.hudi.common.fs.BoundedFsDataInputStream; import org.apache.hudi.common.fs.FSUtils; import org.apache.hudi.common.fs.SchemeAwareFSDataInputStream; import org.apache.hudi.common.fs.TimedFSDataInputStream; @@ -45,7 +46,6 @@ import org.apache.hadoop.fs.FSInputStream; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.hbase.util.Bytes; - import org.apache.log4j.LogManager; import org.apache.log4j.Logger; @@ -478,6 +478,10 @@ private static FSDataInputStream getFSDataInputStream(FileSystem fs, return new SchemeAwareFSDataInputStream(getFSDataInputStreamForGCS(fsDataInputStream, logFile, bufferSize), true); } + if (FSUtils.isCHDFileSystem(fs)) { + return new BoundedFsDataInputStream(fs, logFile.getPath(), fsDataInputStream); + } + if (fsDataInputStream.getWrappedStream() instanceof FSInputStream) { return new TimedFSDataInputStream(logFile.getPath(), new FSDataInputStream( new BufferedFSInputStream((FSInputStream) fsDataInputStream.getWrappedStream(), bufferSize)));