Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -360,6 +360,11 @@ public enum OperationStatusCode {
public static final String COMPACTION_KV_MAX = "hbase.hstore.compaction.kv.max";
public static final int COMPACTION_KV_MAX_DEFAULT = 10;

/** Parameter name for the scanner size limit to be used in compactions */
public static final String COMPACTION_SCANNER_SIZE_MAX =
"hbase.hstore.compaction.scanner.size.limit";
public static final long COMPACTION_SCANNER_SIZE_MAX_DEFAULT = 10 * 1024 * 1024L; // 10MB

/** Parameter name for HBase instance root directory */
public static final String HBASE_DIR = "hbase.rootdir";

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -346,8 +346,10 @@ protected boolean performCompaction(FileDetails fd, InternalScanner scanner, Cel
long cellsSizeCompactedToMob = 0, cellsSizeCompactedFromMob = 0;
boolean finished = false;

ScannerContext scannerContext =
ScannerContext.newBuilder().setBatchLimit(compactionKVMax).build();
ScannerContext scannerContext = ScannerContext.newBuilder().setBatchLimit(compactionKVMax)
.setSizeLimit(ScannerContext.LimitScope.BETWEEN_CELLS, Long.MAX_VALUE, Long.MAX_VALUE,
compactScannerSizeLimit)
.build();
throughputController.start(compactionName);
KeyValueScanner kvs = (scanner instanceof KeyValueScanner) ? (KeyValueScanner) scanner : null;
long shippedCallSizeLimit =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,7 @@ public abstract class Compactor<T extends CellSink> {
protected final Configuration conf;
protected final HStore store;
protected final int compactionKVMax;
protected final long compactScannerSizeLimit;
protected final Compression.Algorithm majorCompactionCompression;
protected final Compression.Algorithm minorCompactionCompression;

Expand All @@ -108,6 +109,8 @@ public abstract class Compactor<T extends CellSink> {
this.store = store;
this.compactionKVMax =
this.conf.getInt(HConstants.COMPACTION_KV_MAX, HConstants.COMPACTION_KV_MAX_DEFAULT);
this.compactScannerSizeLimit = this.conf.getLong(HConstants.COMPACTION_SCANNER_SIZE_MAX,
HConstants.COMPACTION_SCANNER_SIZE_MAX_DEFAULT);
this.majorCompactionCompression = (store.getColumnFamilyDescriptor() == null)
? Compression.Algorithm.NONE
: store.getColumnFamilyDescriptor().getMajorCompactionCompressionType();
Expand Down Expand Up @@ -428,8 +431,10 @@ protected boolean performCompaction(FileDetails fd, InternalScanner scanner, Cel
String compactionName = ThroughputControlUtil.getNameForThrottling(store, "compaction");
long now = 0;
boolean hasMore;
ScannerContext scannerContext =
ScannerContext.newBuilder().setBatchLimit(compactionKVMax).build();
ScannerContext scannerContext = ScannerContext.newBuilder().setBatchLimit(compactionKVMax)
.setSizeLimit(ScannerContext.LimitScope.BETWEEN_CELLS, Long.MAX_VALUE, Long.MAX_VALUE,
compactScannerSizeLimit)
.build();

throughputController.start(compactionName);
Shipper shipper = (scanner instanceof Shipper) ? (Shipper) scanner : null;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -140,8 +140,10 @@ protected boolean performCompaction(FileDetails fd, InternalScanner scanner, Cel
long cellsSizeCompactedToMob = 0, cellsSizeCompactedFromMob = 0;
boolean finished = false;

ScannerContext scannerContext =
ScannerContext.newBuilder().setBatchLimit(compactionKVMax).build();
ScannerContext scannerContext = ScannerContext.newBuilder().setBatchLimit(compactionKVMax)
.setSizeLimit(ScannerContext.LimitScope.BETWEEN_CELLS, Long.MAX_VALUE, Long.MAX_VALUE,
compactScannerSizeLimit)
.build();
throughputController.start(compactionName);
KeyValueScanner kvs = (scanner instanceof KeyValueScanner) ? (KeyValueScanner) scanner : null;
long shippedCallSizeLimit =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,7 @@ public TestCompaction() {
// Set cache flush size to 1MB
conf.setInt(HConstants.HREGION_MEMSTORE_FLUSH_SIZE, 1024 * 1024);
conf.setInt(HConstants.HREGION_MEMSTORE_BLOCK_MULTIPLIER, 100);
conf.setLong(HConstants.COMPACTION_SCANNER_SIZE_MAX, 10L);
conf.set(CompactionThroughputControllerFactory.HBASE_THROUGHPUT_CONTROLLER_KEY,
NoLimitThroughputController.class.getName());
compactionThreshold = conf.getInt("hbase.hstore.compactionThreshold", 3);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6909,6 +6909,75 @@ public void testCellTTLs() throws IOException {
assertNull(r.getValue(fam1, q1));
}

@Test
public void testTTLsUsingSmallHeartBeatCells() throws IOException {
IncrementingEnvironmentEdge edge = new IncrementingEnvironmentEdge();
EnvironmentEdgeManager.injectEdge(edge);

final byte[] row = Bytes.toBytes("testRow");
final byte[] q1 = Bytes.toBytes("q1");
final byte[] q2 = Bytes.toBytes("q2");
final byte[] q3 = Bytes.toBytes("q3");
final byte[] q4 = Bytes.toBytes("q4");
final byte[] q5 = Bytes.toBytes("q5");
final byte[] q6 = Bytes.toBytes("q6");
final byte[] q7 = Bytes.toBytes("q7");
final byte[] q8 = Bytes.toBytes("q8");

// 10 seconds
int ttlSecs = 10;
TableDescriptor tableDescriptor =
TableDescriptorBuilder.newBuilder(TableName.valueOf(name.getMethodName())).setColumnFamily(
ColumnFamilyDescriptorBuilder.newBuilder(fam1).setTimeToLive(ttlSecs).build()).build();

Configuration conf = new Configuration(TEST_UTIL.getConfiguration());
conf.setInt(HFile.FORMAT_VERSION_KEY, HFile.MIN_FORMAT_VERSION_WITH_TAGS);
// using small heart beat cells
conf.setLong(StoreScanner.HBASE_CELLS_SCANNED_PER_HEARTBEAT_CHECK, 2);

region = HBaseTestingUtility.createRegionAndWAL(
RegionInfoBuilder.newBuilder(tableDescriptor.getTableName()).build(),
TEST_UTIL.getDataTestDir(), conf, tableDescriptor);
assertNotNull(region);
long now = EnvironmentEdgeManager.currentTime();
// Add a cell that will expire in 5 seconds via cell TTL
region.put(new Put(row).addColumn(fam1, q1, now, HConstants.EMPTY_BYTE_ARRAY));
region.put(new Put(row).addColumn(fam1, q2, now, HConstants.EMPTY_BYTE_ARRAY));
region.put(new Put(row).addColumn(fam1, q3, now, HConstants.EMPTY_BYTE_ARRAY));
// Add a cell that will expire after 10 seconds via family setting
region
.put(new Put(row).addColumn(fam1, q4, now + ttlSecs * 1000 + 1, HConstants.EMPTY_BYTE_ARRAY));
region
.put(new Put(row).addColumn(fam1, q5, now + ttlSecs * 1000 + 1, HConstants.EMPTY_BYTE_ARRAY));

region.put(new Put(row).addColumn(fam1, q6, now, HConstants.EMPTY_BYTE_ARRAY));
region.put(new Put(row).addColumn(fam1, q7, now, HConstants.EMPTY_BYTE_ARRAY));
region
.put(new Put(row).addColumn(fam1, q8, now + ttlSecs * 1000 + 1, HConstants.EMPTY_BYTE_ARRAY));

// Flush so we are sure store scanning gets this right
region.flush(true);

// A query at time T+0 should return all cells
checkScan(8);
region.delete(new Delete(row).addColumn(fam1, q8));

// Increment time to T+ttlSecs seconds
edge.incrementTime(ttlSecs * 1000);
checkScan(2);
}

private void checkScan(int expectCellSize) throws IOException {
Scan s = new Scan().withStartRow(row);
ScannerContext.Builder contextBuilder = ScannerContext.newBuilder(true);
ScannerContext scannerContext = contextBuilder.build();
RegionScanner scanner = region.getScanner(s);
List<Cell> kvs = new ArrayList<>();
scanner.next(kvs, scannerContext);
assertEquals(expectCellSize, kvs.size());
scanner.close();
}

@Test
public void testIncrementTimestampsAreMonotonic() throws IOException {
region = initHRegion(tableName, method, CONF, fam1);
Expand Down