Merged
@@ -41,6 +41,7 @@
import org.apache.hadoop.hbase.io.ByteBufferWriterDataOutputStream;
import org.apache.hadoop.hbase.io.FSDataInputStreamWrapper;
import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding;
import org.apache.hadoop.hbase.io.encoding.EncodedDataBlock;
import org.apache.hadoop.hbase.io.encoding.EncodingState;
import org.apache.hadoop.hbase.io.encoding.HFileBlockDecodingContext;
import org.apache.hadoop.hbase.io.encoding.HFileBlockDefaultDecodingContext;
@@ -239,6 +240,10 @@ static class Header {
static final byte[] DUMMY_HEADER_NO_CHECKSUM =
new byte[HConstants.HFILEBLOCK_HEADER_SIZE_NO_CHECKSUM];

public static final String BLOCK_DELIMIT_COMPRESSED = "hbase.block.delimit.compressed";

public static final String MAX_BLOCK_SIZE_UNCOMPRESSED = "hbase.block.max.size.uncompressed";
Contributor
Can we move this and its logic to the predicate itself?
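For context, a minimal sketch of how these two properties might be toggled from client code, using the standard Hadoop Configuration API; the numeric value below is illustrative, not a default defined by this patch.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;

public class CompressedDelimitConfigSketch {
  public static void main(String[] args) {
    Configuration conf = HBaseConfiguration.create();
    // Opt in to delimiting blocks by their compressed size.
    conf.setBoolean("hbase.block.delimit.compressed", true);
    // Cap on uncompressed bytes buffered per block (illustrative value;
    // per the finishInit() hunk below, the writer falls back to 10x the block size when unset).
    conf.setInt("hbase.block.max.size.uncompressed", 10 * 64 * 1024);
  }
}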


/**
* Used deserializing blocks from Cache. <code>
* ++++++++++++++
@@ -454,7 +459,7 @@ int getOnDiskSizeWithoutHeader() {
}

/** Returns the uncompressed size of data part (header and checksum excluded). */
int getUncompressedSizeWithoutHeader() {
public int getUncompressedSizeWithoutHeader() {
return uncompressedSizeWithoutHeader;
}

@@ -729,6 +734,16 @@ private enum State {
BLOCK_READY
}

public boolean isDelimitByCompressedSize() {
return delimitByCompressedSize;
}

private boolean delimitByCompressedSize;

private int maxSizeUnCompressed;

private int adjustedBlockSize;

/** Writer state. Used to ensure the correct usage protocol. */
private State state = State.INIT;

@@ -807,11 +822,13 @@ EncodingState getEncodingState() {
*/
public Writer(Configuration conf, HFileDataBlockEncoder dataBlockEncoder,
HFileContext fileContext) {
this(conf, dataBlockEncoder, fileContext, ByteBuffAllocator.HEAP);
this(conf, dataBlockEncoder, fileContext, ByteBuffAllocator.HEAP, false,
fileContext.getBlocksize());
}

public Writer(Configuration conf, HFileDataBlockEncoder dataBlockEncoder,
HFileContext fileContext, ByteBuffAllocator allocator) {
HFileContext fileContext, ByteBuffAllocator allocator, boolean sizeLimitcompleted,
int maxSizeUnCompressed) {
if (fileContext.getBytesPerChecksum() < HConstants.HFILEBLOCK_HEADER_SIZE) {
throw new RuntimeException("Unsupported value of bytesPerChecksum. " + " Minimum is "
+ HConstants.HFILEBLOCK_HEADER_SIZE + " but the configured value is "
@@ -834,6 +851,8 @@ public Writer(Configuration conf, HFileDataBlockEncoder dataBlockEncoder,
// TODO: Why fileContext saved away when we have dataBlockEncoder and/or
// defaultDataBlockEncoder?
this.fileContext = fileContext;
this.delimitByCompressedSize = sizeLimitcompleted;
this.maxSizeUnCompressed = maxSizeUnCompressed;
}

/**
@@ -886,6 +905,27 @@ void ensureBlockReady() throws IOException {
finishBlock();
}

public boolean shouldFinishBlock() throws IOException {
Contributor
Is there a reason to have this logic here vs. in HFileWriterImpl with the rest of the shouldFinishBlock logic?

Contributor Author
This method deals with block specifics such as compression, the block content byte array buffer, and how the compressed size factors into deciding the block limit. Moving it to HFileWriterImpl would spill block-specific variables and logic into the file writer logic. Keeping it here just feels more cohesive to me.

int uncompressedBlockSize = blockSizeWritten();
if (uncompressedBlockSize >= fileContext.getBlocksize()) {
if (delimitByCompressedSize && uncompressedBlockSize < maxSizeUnCompressed) {
// In order to avoid excessive compression size calculations, we do it only once when
// the uncompressed size has reached BLOCKSIZE. We then use this compression size to
// calculate the compression rate, and adjust the block size limit by this ratio.
if (adjustedBlockSize == 0 || uncompressedBlockSize >= adjustedBlockSize) {
int compressedSize = EncodedDataBlock.getCompressedSize(fileContext.getCompression(),
fileContext.getCompression().getCompressor(), baosInMemory.getBuffer(), 0,
baosInMemory.size());
adjustedBlockSize = uncompressedBlockSize / compressedSize;
adjustedBlockSize *= fileContext.getBlocksize();
}
return uncompressedBlockSize >= adjustedBlockSize;
}
return true;
}
return false;
}
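To make the ratio adjustment above concrete, here is a standalone numeric sketch; the values are hypothetical, but the integer arithmetic mirrors the code in shouldFinishBlock().

public class AdjustedBlockSizeSketch {
  public static void main(String[] args) {
    int blockSize = 64 * 1024;          // fileContext.getBlocksize()
    int uncompressedBlockSize = 65_800; // blockSizeWritten() when the check fires
    int compressedSize = 16_450;        // one-off compression of the block buffer
    // Integer compression ratio (4 here), then scale the configured block size by it.
    int adjustedBlockSize = (uncompressedBlockSize / compressedSize) * blockSize;
    // The block is finished only once the uncompressed payload reaches ~256 KB,
    // which should compress back down to roughly the configured 64 KB block size.
    System.out.println("finish block once blockSizeWritten() >= " + adjustedBlockSize);
  }
}

Note that maxSizeUnCompressed still bounds the buffered bytes, so a very high compression ratio cannot grow the in-memory block without limit.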

/**
* Finish up writing of the block. Flushes the compressing stream (if using compression), fills
* out the header, does any compression/encryption of bytes to flush out to disk, and manages
@@ -1066,7 +1106,7 @@ int getUncompressedSizeWithoutHeader() {
/**
* The uncompressed size of the block data, including header size.
*/
int getUncompressedSizeWithHeader() {
public int getUncompressedSizeWithHeader() {
expectState(State.BLOCK_READY);
return baosInMemory.size();
}

@@ -17,6 +17,9 @@
*/
package org.apache.hadoop.hbase.io.hfile;

import static org.apache.hadoop.hbase.io.hfile.HFileBlock.BLOCK_DELIMIT_COMPRESSED;
import static org.apache.hadoop.hbase.io.hfile.HFileBlock.MAX_BLOCK_SIZE_UNCOMPRESSED;

import java.io.DataOutput;
import java.io.DataOutputStream;
import java.io.IOException;
@@ -291,8 +294,9 @@ protected void finishInit(final Configuration conf) {
if (blockWriter != null) {
throw new IllegalStateException("finishInit called twice");
}
blockWriter =
new HFileBlock.Writer(conf, blockEncoder, hFileContext, cacheConf.getByteBuffAllocator());
blockWriter = new HFileBlock.Writer(conf, blockEncoder, hFileContext,
cacheConf.getByteBuffAllocator(), conf.getBoolean(BLOCK_DELIMIT_COMPRESSED, false),
conf.getInt(MAX_BLOCK_SIZE_UNCOMPRESSED, hFileContext.getBlocksize() * 10));
// Data block index writer
boolean cacheIndexesOnWrite = cacheConf.shouldCacheIndexesOnWrite();
dataBlockIndexWriter = new HFileBlockIndex.BlockIndexWriter(blockWriter,
@@ -319,6 +323,9 @@ protected void checkBlockBoundary() throws IOException {
shouldFinishBlock = blockWriter.encodedBlockSizeWritten() >= hFileContext.getBlocksize()
|| blockWriter.blockSizeWritten() >= hFileContext.getBlocksize();
}
if (blockWriter.isDelimitByCompressedSize()) {
shouldFinishBlock &= blockWriter.shouldFinishBlock();
Contributor
Related to the above comment by @bbeaudreault:

In what situation would shouldFinishBlock = true but blockWriter.shouldFinishBlock() = false? Is that possible?

Contributor Author
Yes, shouldFinishBlock could be true at this point, because so far we have only checked the "raw" uncompressed size or the encoded uncompressed size against BLOCK_SIZE. It is possible that these sizes exceed BLOCK_SIZE while the compressed size is still below it.

}
if (shouldFinishBlock) {
finishBlock();
writeInlineBlocks(false);

@@ -17,6 +17,7 @@
*/
package org.apache.hadoop.hbase.regionserver;

import static org.apache.hadoop.hbase.io.hfile.HFileBlock.BLOCK_DELIMIT_COMPRESSED;
import static org.junit.Assert.assertArrayEquals;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
@@ -61,6 +62,7 @@
import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
import org.apache.hadoop.hbase.io.FSDataInputStreamWrapper;
import org.apache.hadoop.hbase.io.HFileLink;
import org.apache.hadoop.hbase.io.compress.Compression;
import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding;
import org.apache.hadoop.hbase.io.hfile.BlockCache;
import org.apache.hadoop.hbase.io.hfile.BlockCacheFactory;
@@ -189,6 +191,24 @@ public static void writeStoreFile(final StoreFileWriter writer, byte[] fam, byte
}
}

public static void writeLargeStoreFile(final StoreFileWriter writer, byte[] fam, byte[] qualifier,
int rounds) throws IOException {
long now = EnvironmentEdgeManager.currentTime();
try {
for (int i = 0; i < rounds; i++) {
for (char d = FIRST_CHAR; d <= LAST_CHAR; d++) {
for (char e = FIRST_CHAR; e <= LAST_CHAR; e++) {
byte[] b = new byte[] { (byte) d, (byte) e };
byte[] key = new byte[] { (byte) i };
writer.append(new KeyValue(key, fam, qualifier, now, b));
}
}
}
} finally {
writer.close();
}
}

/**
* Test that our mechanism of writing store files in one region to reference store files in other
* regions works.
Expand Down Expand Up @@ -1193,4 +1213,42 @@ public void testDataBlockSizeEncoded() throws Exception {
}
}

@Test
public void testDataBlockSizeCompressed() throws Exception {
Path dir = new Path(new Path(this.testDir, "7e0102"), "familyname");
Path path = new Path(dir, "1234567890");
DataBlockEncoding dataBlockEncoderAlgo = DataBlockEncoding.FAST_DIFF;
conf.setBoolean(BLOCK_DELIMIT_COMPRESSED, true);
cacheConf = new CacheConfig(conf);
HFileContext meta =
new HFileContextBuilder().withBlockSize(BLOCKSIZE_SMALL).withChecksumType(CKTYPE)
.withBytesPerCheckSum(CKBYTES).withDataBlockEncoding(dataBlockEncoderAlgo)
.withCompression(Compression.Algorithm.GZ).build();
// Make a store file and write data to it.
StoreFileWriter writer = new StoreFileWriter.Builder(conf, cacheConf, this.fs)
.withFilePath(path).withMaxKeyCount(2000).withFileContext(meta).build();
writeLargeStoreFile(writer, Bytes.toBytes(name.getMethodName()),
Bytes.toBytes(name.getMethodName()), 200);
writer.close();
HStoreFile storeFile =
new HStoreFile(fs, writer.getPath(), conf, cacheConf, BloomType.NONE, true);
storeFile.initReader();
HFile.Reader fReader =
HFile.createReader(fs, writer.getPath(), storeFile.getCacheConf(), true, conf);
FSDataInputStreamWrapper fsdis = new FSDataInputStreamWrapper(fs, writer.getPath());
long fileSize = fs.getFileStatus(writer.getPath()).getLen();
FixedFileTrailer trailer = FixedFileTrailer.readFromStream(fsdis.getStream(false), fileSize);
long offset = trailer.getFirstDataBlockOffset(), max = trailer.getLastDataBlockOffset();
HFileBlock block;
int blockCount = 0;
while (offset <= max) {
block = fReader.readBlock(offset, -1, /* cacheBlock */ false, /* pread */ false,
/* isCompaction */ false, /* updateCacheMetrics */ false, null, null);
offset += block.getOnDiskSizeWithHeader();
blockCount++;
assertTrue(block.getUncompressedSizeWithoutHeader() >= BLOCKSIZE_SMALL);
}
assertEquals(blockCount, 100);
}

}