diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/CompressionContext.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/CompressionContext.java
index 73cf4821db00..633decab0a82 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/CompressionContext.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/CompressionContext.java
@@ -28,7 +28,6 @@
 import org.apache.commons.io.IOUtils;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.HBaseInterfaceAudience;
-import org.apache.hadoop.hbase.io.BoundedDelegatingInputStream;
 import org.apache.hadoop.hbase.io.TagCompressionContext;
 import org.apache.hadoop.hbase.io.compress.Compression;
 import org.apache.hadoop.hbase.io.util.Dictionary;
@@ -77,7 +76,7 @@ static class ValueCompressor {
     private final Compression.Algorithm algorithm;
     private Compressor compressor;
     private Decompressor decompressor;
-    private BoundedDelegatingInputStream lowerIn;
+    private WALDecompressionBoundedDelegatingInputStream lowerIn;
     private ByteArrayOutputStream lowerOut;
     private InputStream compressedIn;
     private OutputStream compressedOut;
@@ -108,20 +107,17 @@ public byte[] compress(byte[] valueArray, int valueOffset, int valueLength) thro
 
     public void decompress(InputStream in, int inLength, byte[] outArray, int outOffset,
       int outLength) throws IOException {
-
       // Our input is a sequence of bounded byte ranges (call them segments), with
       // BoundedDelegatingInputStream providing a way to switch in a new segment when the
       // previous segment has been fully consumed.
 
       // Create the input streams here the first time around.
       if (compressedIn == null) {
-        lowerIn = new BoundedDelegatingInputStream(in, inLength);
+        lowerIn = new WALDecompressionBoundedDelegatingInputStream();
         if (decompressor == null) {
           decompressor = algorithm.getDecompressor();
         }
         compressedIn = algorithm.createDecompressionStream(lowerIn, decompressor, IO_BUFFER_SIZE);
-      } else {
-        lowerIn.setDelegate(in, inLength);
       }
       if (outLength == 0) {
         // The BufferedInputStream will return earlier and skip reading anything if outLength == 0,
@@ -131,6 +127,7 @@
         // such as data loss when splitting wal or replicating wal.
         IOUtils.skipFully(in, inLength);
       } else {
+        lowerIn.reset(in, inLength);
         IOUtils.readFully(compressedIn, outArray, outOffset, outLength);
       }
     }
diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/io/BoundedDelegatingInputStream.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALDecompressionBoundedDelegatingInputStream.java
similarity index 57%
rename from hbase-common/src/main/java/org/apache/hadoop/hbase/io/BoundedDelegatingInputStream.java
rename to hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALDecompressionBoundedDelegatingInputStream.java
index 2a6db09050c6..0f4fd78a0b83 100644
--- a/hbase-common/src/main/java/org/apache/hadoop/hbase/io/BoundedDelegatingInputStream.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALDecompressionBoundedDelegatingInputStream.java
@@ -15,75 +15,73 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.hadoop.hbase.io;
+package org.apache.hadoop.hbase.regionserver.wal;
 
+import java.io.EOFException;
 import java.io.IOException;
 import java.io.InputStream;
+import org.apache.commons.io.IOUtils;
 import org.apache.yetus.audience.InterfaceAudience;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /**
- * This is a stream that will only supply bytes from its delegate up to a certain limit. When there
- * is an attempt to set the position beyond that it will signal that the input is finished.
+ * This class is only used by WAL ValueCompressor for decompression.
+ * <p>
+ * WARNING: The implementation is very tricky and does not follow the typical InputStream
+ * pattern, so do not use it in any other place.
  */
 @InterfaceAudience.Private
-public class BoundedDelegatingInputStream extends DelegatingInputStream {
+class WALDecompressionBoundedDelegatingInputStream extends InputStream {
 
-  protected long limit;
-  protected long pos;
+  private static final Logger LOG =
+    LoggerFactory.getLogger(WALDecompressionBoundedDelegatingInputStream.class);
 
-  public BoundedDelegatingInputStream(InputStream in, long limit) {
-    super(in);
-    this.limit = limit;
-    this.pos = 0;
-  }
+  private InputStream in;
+
+  private long pos;
 
-  public void setDelegate(InputStream in, long limit) {
+  private long limit;
+
+  public void reset(InputStream in, long limit) {
     this.in = in;
     this.limit = limit;
     this.pos = 0;
   }
 
-  /**
-   * Call the delegate's {@code read()} method if the current position is less than the limit.
-   * @return the byte read or -1 if the end of stream or the limit has been reached.
-   */
   @Override
   public int read() throws IOException {
     if (pos >= limit) {
       return -1;
     }
     int result = in.read();
+    if (result < 0) {
+      return -1;
+    }
     pos++;
     return result;
   }
 
-  /**
-   * Call the delegate's {@code read(byte[], int, int)} method if the current position is less than
-   * the limit.
-   * @param b read buffer
-   * @param off Start offset
-   * @param len The number of bytes to read
-   * @return the number of bytes read or -1 if the end of stream or the limit has been reached.
-   */
   @Override
-  public int read(final byte[] b, final int off, final int len) throws IOException {
+  public int read(byte[] b, int off, int len) throws IOException {
     if (pos >= limit) {
       return -1;
     }
-    long readLen = Math.min(len, limit - pos);
-    int read = in.read(b, off, (int) readLen);
-    if (read < 0) {
+    int readLen = (int) Math.min(len, limit - pos);
+    try {
+      IOUtils.readFully(in, b, off, readLen);
+    } catch (EOFException e) {
+      // The trick here is that we always try to read enough bytes to fill the buffer passed in,
+      // or to reach the end of this compression block; if there are not enough bytes, we just
+      // return -1 to let the upper layer fail with EOF.
+      // In WAL value decompression this is OK: if we cannot read all the data, we will
+      // eventually get an EOF somewhere anyway.
+      LOG.debug("Got EOF while we want to read {} bytes from stream", readLen, e);
       return -1;
     }
-    pos += read;
-    return read;
+    pos += readLen;
+    return readLen;
   }
 
-  /**
-   * Call the delegate's {@code skip(long)} method.
-   * @param len the number of bytes to skip
-   * @return the actual number of bytes skipped
-   */
   @Override
   public long skip(final long len) throws IOException {
     long skipped = in.skip(Math.min(len, limit - pos));
@@ -91,10 +89,6 @@ public long skip(final long len) throws IOException {
     pos += skipped;
     return skipped;
   }
-  /**
-   * @return the remaining bytes within the bound if the current position is less than the limit, or
-   *         0 otherwise.
-   */
   @Override
   public int available() throws IOException {
     if (pos >= limit) {
@@ -108,5 +102,4 @@ public int available() throws IOException {
     // successful decompression depends on this behavior.
     return (int) (limit - pos);
   }
-
 }
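
To make the contract the WARNING refers to concrete, here is a minimal usage sketch (an illustration for review, not part of the patch; it assumes a class placed in the same package, since the renamed stream is package-private). It shows the two non-standard behaviors the patch relies on: read(byte[], int, int) either fills the requested range completely or returns -1, never a short read inside the bound, and reset(InputStream, long) re-points the same wrapper at the next bounded segment so the decompression stream built on top of it (compressedIn in ValueCompressor) can stay alive across WAL entries.

package org.apache.hadoop.hbase.regionserver.wal;

import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;

public class WALBoundedStreamContractSketch {
  public static void main(String[] args) throws IOException {
    // One underlying stream carrying two concatenated "segments" of 3 and 2 bytes.
    InputStream wal = new ByteArrayInputStream(new byte[] { 1, 2, 3, 4, 5 });
    WALDecompressionBoundedDelegatingInputStream bounded =
      new WALDecompressionBoundedDelegatingInputStream();

    // Segment 1: bound the wrapper to the next 3 bytes of the underlying stream.
    bounded.reset(wal, 3);
    byte[] buf = new byte[3];
    System.out.println(bounded.read(buf, 0, 3)); // 3: the range is filled completely
    System.out.println(bounded.read());          // -1: the 3-byte bound is exhausted

    // Segment 2: reuse the same wrapper for the next 2 bytes, as
    // ValueCompressor.decompress does for each compressed value.
    bounded.reset(wal, 2);
    System.out.println(bounded.read(buf, 0, 2)); // 2
  }
}

This is also why decompress() now calls lowerIn.reset(in, inLength) only on the read path: the skip path routes around lowerIn entirely with IOUtils.skipFully(in, inLength), so the bound only needs to be re-established when compressedIn will actually pull from the segment.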