@@ -28,7 +28,6 @@
 import org.apache.commons.io.IOUtils;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.HBaseInterfaceAudience;
-import org.apache.hadoop.hbase.io.BoundedDelegatingInputStream;
 import org.apache.hadoop.hbase.io.TagCompressionContext;
 import org.apache.hadoop.hbase.io.compress.Compression;
 import org.apache.hadoop.hbase.io.util.Dictionary;
@@ -77,7 +76,7 @@ static class ValueCompressor {
   private final Compression.Algorithm algorithm;
   private Compressor compressor;
   private Decompressor decompressor;
-  private BoundedDelegatingInputStream lowerIn;
+  private WALDecompressionBoundedDelegatingInputStream lowerIn;
   private ByteArrayOutputStream lowerOut;
   private InputStream compressedIn;
   private OutputStream compressedOut;
@@ -108,20 +107,17 @@ public byte[] compress(byte[] valueArray, int valueOffset, int valueLength) thro
 
   public void decompress(InputStream in, int inLength, byte[] outArray, int outOffset,
     int outLength) throws IOException {
-
-    // Our input is a sequence of bounded byte ranges (call them segments), with
-    // BoundedDelegatingInputStream providing a way to switch in a new segment when the
-    // previous segment has been fully consumed.
-
     // Create the input streams here the first time around.
     if (compressedIn == null) {
-      lowerIn = new BoundedDelegatingInputStream(in, inLength);
+      lowerIn = new WALDecompressionBoundedDelegatingInputStream();
       if (decompressor == null) {
         decompressor = algorithm.getDecompressor();
       }
       compressedIn = algorithm.createDecompressionStream(lowerIn, decompressor, IO_BUFFER_SIZE);
-    } else {
-      lowerIn.setDelegate(in, inLength);
     }
     if (outLength == 0) {
       // The BufferedInputStream will return earlier and skip reading anything if outLength == 0,
@@ -131,6 +127,7 @@ public void decompress(InputStream in, int inLength, byte[] outArray, int outOff
       // such as data loss when splitting wal or replicating wal.
       IOUtils.skipFully(in, inLength);
     } else {
+      lowerIn.reset(in, inLength);
       IOUtils.readFully(compressedIn, outArray, outOffset, outLength);
     }
   }
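To make the new flow easier to follow, here is a minimal sketch of how the pieces above fit together with the rewritten stream class in the next file. None of this is code from the patch: `ValueDecompressionSketch`, `readValue`, `walIn`, and the 4096 buffer size are illustrative stand-ins for the real `ValueCompressor` plumbing, which keeps the same objects as fields exactly as the diff shows.

```java
package org.apache.hadoop.hbase.regionserver.wal;

import java.io.IOException;
import java.io.InputStream;
import org.apache.commons.io.IOUtils;
import org.apache.hadoop.hbase.io.compress.Compression;
import org.apache.hadoop.io.compress.Decompressor;

// Illustrative sketch (not part of the patch): each WAL value occupies a
// bounded segment of the same underlying stream. One long-lived decompression
// stream is created lazily; reset() re-aims the bounded window at each new
// segment before the value is read through it.
class ValueDecompressionSketch {

  private final Compression.Algorithm algorithm;
  private final WALDecompressionBoundedDelegatingInputStream lowerIn =
    new WALDecompressionBoundedDelegatingInputStream();
  private Decompressor decompressor;
  private InputStream compressedIn;

  ValueDecompressionSketch(Compression.Algorithm algorithm) {
    this.algorithm = algorithm;
  }

  byte[] readValue(InputStream walIn, int compressedLen, int uncompressedLen)
    throws IOException {
    if (compressedIn == null) {
      // Lazy init, mirroring ValueCompressor.decompress() above.
      decompressor = algorithm.getDecompressor();
      compressedIn = algorithm.createDecompressionStream(lowerIn, decompressor, 4096);
    }
    byte[] out = new byte[uncompressedLen];
    if (uncompressedLen == 0) {
      // Even an empty value carries some compressed metadata bytes; skip them
      // so the WAL stream stays positioned at the start of the next record.
      IOUtils.skipFully(walIn, compressedLen);
    } else {
      // Switch the bounded window to this value's compressed bytes, then pull
      // the uncompressed bytes through the shared decompression stream.
      lowerIn.reset(walIn, compressedLen);
      IOUtils.readFully(compressedIn, out, 0, uncompressedLen);
    }
    return out;
  }
}
```

The design point the patch preserves is that `compressedIn` is created once and reused for every value; only the bounded window underneath is re-aimed, so the codec neither pays for a fresh decompression stream per value nor loses streaming state between values.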
@@ -15,86 +15,80 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.hadoop.hbase.io;
+package org.apache.hadoop.hbase.regionserver.wal;
 
+import java.io.EOFException;
 import java.io.IOException;
 import java.io.InputStream;
+import org.apache.commons.io.IOUtils;
 import org.apache.yetus.audience.InterfaceAudience;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /**
- * This is a stream that will only supply bytes from its delegate up to a certain limit. When there
- * is an attempt to set the position beyond that it will signal that the input is finished.
+ * This class is only used by the WAL ValueCompressor for decompression.
+ * <p>
+ * <strong>WARNING:</strong> The implementation is very tricky and does not follow the typical
+ * InputStream contract, so do not use it anywhere else.
  */
 @InterfaceAudience.Private
-public class BoundedDelegatingInputStream extends DelegatingInputStream {
+class WALDecompressionBoundedDelegatingInputStream extends InputStream {
 
-  protected long limit;
-  protected long pos;
+  private static final Logger LOG =
+    LoggerFactory.getLogger(WALDecompressionBoundedDelegatingInputStream.class);
 
-  public BoundedDelegatingInputStream(InputStream in, long limit) {
-    super(in);
-    this.limit = limit;
-    this.pos = 0;
-  }
+  private InputStream in;
 
-  public void setDelegate(InputStream in, long limit) {
+  private long pos;
+
+  private long limit;
+
+  public void reset(InputStream in, long limit) {
     this.in = in;
     this.limit = limit;
     this.pos = 0;
   }
 
-  /**
-   * Call the delegate's {@code read()} method if the current position is less than the limit.
-   * @return the byte read or -1 if the end of stream or the limit has been reached.
-   */
   @Override
   public int read() throws IOException {
     if (pos >= limit) {
       return -1;
     }
     int result = in.read();
     if (result < 0) {
       return -1;
     }
     pos++;
     return result;
   }
 
-  /**
-   * Call the delegate's {@code read(byte[], int, int)} method if the current position is less than
-   * the limit.
-   * @param b read buffer
-   * @param off Start offset
-   * @param len The number of bytes to read
-   * @return the number of bytes read or -1 if the end of stream or the limit has been reached.
-   */
   @Override
-  public int read(final byte[] b, final int off, final int len) throws IOException {
+  public int read(byte[] b, int off, int len) throws IOException {
     if (pos >= limit) {
       return -1;
     }
-    long readLen = Math.min(len, limit - pos);
-    int read = in.read(b, off, (int) readLen);
-    if (read < 0) {
+    int readLen = (int) Math.min(len, limit - pos);
+    try {
+      IOUtils.readFully(in, b, off, readLen);
+    } catch (EOFException e) {
+      // The trick here is that we always try to read enough bytes to fill the buffer passed in,
+      // or to reach the end of this compression block. If there are not enough bytes, we just
+      // return -1 to let the upper layer fail with EOF.
+      // In WAL value decompression this is OK: if we cannot read all the data we will eventually
+      // get an EOF somewhere.
+      LOG.debug("Got EOF while we want to read {} bytes from stream", readLen, e);
       return -1;
     }
-    pos += read;
-    return read;
+    pos += readLen;
+    return readLen;
   }
 
-  /**
-   * Call the delegate's {@code skip(long)} method.
-   * @param len the number of bytes to skip
-   * @return the actual number of bytes skipped
-   */
-  @Override
-  public long skip(final long len) throws IOException {
-    long skipped = in.skip(Math.min(len, limit - pos));
-    pos += skipped;
-    return skipped;
-  }
-
-  /**
-   * @return the remaining bytes within the bound if the current position is less than the limit, or
-   *         0 otherwise.
-   */
   @Override
   public int available() throws IOException {
     if (pos >= limit) {
@@ -108,5 +102,4 @@ public int available() throws IOException {
     // successful decompression depends on this behavior.
     return (int) (limit - pos);
   }
-
 }
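The javadoc warning is concrete rather than rhetorical: this stream breaks the usual `InputStream` contract on purpose. `read(byte[], int, int)` either fills exactly `min(len, limit - pos)` bytes via `IOUtils.readFully` or returns -1; it never returns a short positive count chosen by the delegate. Likewise `available()` answers from the bound alone and never consults the delegate, which the buffered decompression stream on top depends on. A hypothetical snippet (mine, not the patch's; it has to sit in the same package because the class is package-private) showing what a caller sees:

```java
package org.apache.hadoop.hbase.regionserver.wal;

import java.io.ByteArrayInputStream;
import java.io.IOException;

// Hypothetical demo, not part of the patch. Shows the all-or-minus-one read
// contract of WALDecompressionBoundedDelegatingInputStream.
class ReadContractDemo {
  public static void main(String[] args) throws IOException {
    byte[] segment = { 1, 2, 3, 4, 5 };
    WALDecompressionBoundedDelegatingInputStream bounded =
      new WALDecompressionBoundedDelegatingInputStream();
    bounded.reset(new ByteArrayInputStream(segment), 4); // bound to 4 of the 5 bytes

    byte[] buf = new byte[8];
    // We ask for 8 bytes but the bound caps the attempt at min(8, 4) = 4.
    // IOUtils.readFully succeeds, so the call returns exactly 4, never a
    // short positive count decided by the delegate.
    System.out.println(bounded.read(buf, 0, 8)); // prints 4
    // The bound is now exhausted (pos == limit), so the stream signals EOF.
    System.out.println(bounded.read(buf, 0, 8)); // prints -1
    // available() is computed from the bound alone: limit - pos.
    System.out.println(bounded.available()); // prints 0
  }
}
```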