Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
*/
package org.apache.hadoop.hbase.procedure.flush;

import java.io.IOException;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.stream.Collectors;
Expand All @@ -27,6 +28,7 @@
import org.apache.hadoop.hbase.procedure.flush.RegionServerFlushTableProcedureManager.FlushTableSubprocedurePool;
import org.apache.hadoop.hbase.regionserver.FlushLifeCycleTracker;
import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.regionserver.HRegionServer;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
Expand All @@ -40,26 +42,31 @@
public class FlushTableSubprocedure extends Subprocedure {
private static final Logger LOG = LoggerFactory.getLogger(FlushTableSubprocedure.class);

private final HRegionServer rs;
private final String table;
private final List<String> families;
private final List<HRegion> regions;
private final FlushTableSubprocedurePool taskManager;

public FlushTableSubprocedure(ProcedureMember member, ForeignExceptionDispatcher errorListener,
long wakeFrequency, long timeout, List<HRegion> regions, String table, List<String> families,
public FlushTableSubprocedure(HRegionServer rs, ProcedureMember member,
ForeignExceptionDispatcher errorListener, long wakeFrequency, long timeout,
List<HRegion> regions, String table, List<String> families,
FlushTableSubprocedurePool taskManager) {
super(member, table, errorListener, wakeFrequency, timeout);
this.rs = rs;
this.table = table;
this.families = families;
this.regions = regions;
this.taskManager = taskManager;
}

private static class RegionFlushTask implements Callable<Void> {
HRegionServer rs;
HRegion region;
List<byte[]> families;

RegionFlushTask(HRegion region, List<byte[]> families) {
RegionFlushTask(HRegionServer rs, HRegion region, List<byte[]> families) {
this.rs = rs;
this.region = region;
this.families = families;
}
Expand All @@ -70,12 +77,20 @@ public Void call() throws Exception {
region.startRegionOperation();
try {
LOG.debug("Flush region " + region.toString() + " started...");
HRegion.FlushResult flushResult;
if (families == null || families.isEmpty()) {
region.flush(true);
flushResult = region.flush(true);
} else {
region.flushcache(families, false, FlushLifeCycleTracker.DUMMY);
flushResult = region.flushcache(families, false, FlushLifeCycleTracker.DUMMY);
}
if (flushResult.getResult() == HRegion.FlushResult.Result.CANNOT_FLUSH) {
throw new IOException("Unable to complete flush " + region.getRegionInfo());
}
boolean shouldCompact = flushResult.isCompactionNeeded();
if (shouldCompact) {
rs.getCompactSplitThread().requestSystemCompaction(region,
"Compaction is triggered by flush operation");
}
// TODO: flush result is not checked?
} finally {
LOG.debug("Closing region operation on " + region);
region.closeRegionOperation();
Expand Down Expand Up @@ -106,7 +121,7 @@ private void flushRegions() throws ForeignException {
// Add all hfiles already existing in region.
for (HRegion region : regions) {
// submit one task per region for parallelize by region.
taskManager.submitTask(new RegionFlushTask(region, familiesToFlush));
taskManager.submitTask(new RegionFlushTask(rs, region, familiesToFlush));
monitor.rethrowException();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -160,8 +160,8 @@ public Subprocedure buildSubprocedure(String table, List<String> families) {

FlushTableSubprocedurePool taskManager =
new FlushTableSubprocedurePool(rss.getServerName().toString(), conf, rss);
return new FlushTableSubprocedure(member, exnDispatcher, wakeMillis, timeoutMillis,
involvedRegions, table, families, taskManager);
return new FlushTableSubprocedure((HRegionServer) rss, member, exnDispatcher, wakeMillis,
timeoutMillis, involvedRegions, table, families, taskManager);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,11 @@ protected void doCall() throws Exception {
if (res.getResult() == HRegion.FlushResult.Result.CANNOT_FLUSH) {
throw new IOException("Unable to complete flush " + regionInfo);
}
boolean shouldCompact = res.isCompactionNeeded();
if (shouldCompact) {
rs.getCompactSplitThread().requestSystemCompaction(region,
"Compaction is triggered by flush procedure");
}
} finally {
LOG.debug("Closing region operation on {}", region);
region.closeRegionOperation();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -243,11 +243,25 @@ public void testCompactionTimestamps() throws Exception {
table.put(p).join();
admin.flush(tableName).join();
}
admin.majorCompact(tableName).join();

// The flush operation would trigger minor compaction because the number of store files in the
// same column family exceeds 3, After waiting for the minor compaction to complete, proceed
// with the following operations to avoid interference.
Thread.sleep(1000);
long curt = EnvironmentEdgeManager.currentTime();
long waitTime = 10000;
long endt = curt + waitTime;
CompactionState state = admin.getCompactionState(tableName).get();
while (state != CompactionState.NONE && curt < endt) {
Thread.sleep(10);
state = admin.getCompactionState(tableName).get();
curt = EnvironmentEdgeManager.currentTime();
}

admin.majorCompact(tableName).join();
curt = EnvironmentEdgeManager.currentTime();
endt = curt + waitTime;
state = admin.getCompactionState(tableName).get();
LOG.info("Current compaction state 1 is " + state);
while (state == CompactionState.NONE && curt < endt) {
Thread.sleep(100);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,9 +36,11 @@
import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.regionserver.HRegionServer;
import org.apache.hadoop.hbase.regionserver.NoSuchColumnFamilyException;
import org.apache.hadoop.hbase.regionserver.Store;
import org.apache.hadoop.hbase.testclassification.ClientTests;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.FutureUtils;
import org.apache.hadoop.hbase.util.JVMClusterUtil;
import org.junit.After;
Expand Down Expand Up @@ -237,6 +239,47 @@ public void testAsyncFlushRegionServer() throws Exception {
}
}

@Test
public void testCompactAfterFlushing() throws IOException, InterruptedException {
TableName testCompactTable = TableName.valueOf("testCompactAfterFlushingTable");
int compactionThreshold =
TEST_UTIL.getConfiguration().getInt("hbase.hstore.compactionThreshold", 3);
try (Admin admin = TEST_UTIL.getAdmin();
Table t = TEST_UTIL.createTable(testCompactTable, "info")) {
assertTrue(TEST_UTIL.getAdmin().tableExists(testCompactTable));
List<HRegion> regions = TEST_UTIL.getHBaseCluster().getRegions(testCompactTable);
assertEquals(1, regions.size());
for (int i = 1; i <= compactionThreshold - 1; i++) {
Put put = new Put(Bytes.toBytes(i));
put.addColumn(Bytes.toBytes("info"), Bytes.toBytes("name"), Bytes.toBytes("vvvvv"));
t.put(put);
admin.flush(testCompactTable);
}
int storefilesCount =
regions.get(0).getStores().stream().mapToInt(Store::getStorefilesCount).sum();
assertEquals(compactionThreshold - 1, storefilesCount);

Put put = new Put(Bytes.toBytes(compactionThreshold));
put.addColumn(Bytes.toBytes("info"), Bytes.toBytes("name"), Bytes.toBytes("vvvvv"));
t.put(put);
admin.flush(testCompactTable);
Thread.sleep(1000);
long curt = EnvironmentEdgeManager.currentTime();
long waitTime = 10000;
long endt = curt + waitTime;
CompactionState state = admin.getCompactionState(tableName);
while (state != CompactionState.NONE && curt < endt) {
Thread.sleep(10);
state = admin.getCompactionState(tableName);
curt = EnvironmentEdgeManager.currentTime();
}
storefilesCount =
regions.get(0).getStores().stream().mapToInt(Store::getStorefilesCount).sum();
assertTrue(storefilesCount < compactionThreshold);
}
TEST_UTIL.deleteTable(tableName);
}

private List<HRegion> getRegionInfo() {
return TEST_UTIL.getHBaseCluster().getRegions(tableName);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -275,17 +275,18 @@ public void testAdvancedConfigOverride() throws Exception {
TEST_UTIL.waitTableAvailable(tableName, WAITTABLE_MILLIS);
Admin admin = TEST_UTIL.getAdmin();

// Create 3 store files.
// Create 2 store files.
byte[] row = Bytes.toBytes(ThreadLocalRandom.current().nextInt());
performMultiplePutAndFlush(admin, table, row, FAMILY, 3, 100);
performMultiplePutAndFlush(admin, table, row, FAMILY, 2, 100);

try (RegionLocator locator = TEST_UTIL.getConnection().getRegionLocator(tableName)) {
// Verify we have multiple store files.
HRegionLocation loc = locator.getRegionLocation(row, true);
assertTrue(getStoreFileCount(admin, loc.getServerName(), loc.getRegion()) > 1);

// Issue a compaction request
admin.compact(tableName);
// Create 1 store file, which would trigger minor compaction automatically because the
// number of store files exceeds 3
performMultiplePutAndFlush(admin, table, row, FAMILY, 1, 100);

// poll wait for the compactions to happen
for (int i = 0; i < 10 * 1000 / 40; ++i) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,7 @@ public void setUp() throws Exception {
}

private void initConf() {

conf.setInt("hbase.hstore.compaction.min", 3);
conf.setInt("hfile.format.version", 3);
conf.setLong(TimeToLiveHFileCleaner.TTL_CONF_KEY, 0);
conf.setInt("hbase.client.retries.number", 100);
Expand Down Expand Up @@ -147,9 +147,9 @@ public void testMobFileCleanerChore() throws InterruptedException, IOException {

loadData(0, 10);
loadData(10, 10);
loadData(20, 10);
// loadData(20, 10);
long num = getNumberOfMobFiles(conf, table.getName(), new String(fam));
assertEquals(3, num);
assertEquals(2, num);
// Major compact
admin.majorCompact(tableDescriptor.getTableName(), fam);
// wait until compaction is complete
Expand All @@ -158,7 +158,7 @@ public void testMobFileCleanerChore() throws InterruptedException, IOException {
}

num = getNumberOfMobFiles(conf, table.getName(), new String(fam));
assertEquals(4, num);
assertEquals(3, num);
// We have guarantee, that compcated file discharger will run during this pause
// because it has interval less than this wait time
LOG.info("Waiting for {}ms", minAgeToArchive + 1000);
Expand All @@ -170,16 +170,16 @@ public void testMobFileCleanerChore() throws InterruptedException, IOException {

// verify that nothing have happened
num = getNumberOfMobFiles(conf, table.getName(), new String(fam));
assertEquals(4, num);
assertEquals(3, num);

long scanned = scanTable();
assertEquals(30, scanned);
assertEquals(20, scanned);

// add a MOB file to with a name refering to a non-existing region
Path extraMOBFile = MobTestUtil.generateMOBFileForRegion(conf, table.getName(),
familyDescriptor, "nonExistentRegion");
num = getNumberOfMobFiles(conf, table.getName(), new String(fam));
assertEquals(5, num);
assertEquals(4, num);

LOG.info("Waiting for {}ms", minAgeToArchive + 1000);

Expand All @@ -189,13 +189,13 @@ public void testMobFileCleanerChore() throws InterruptedException, IOException {

// check that the extra file got deleted
num = getNumberOfMobFiles(conf, table.getName(), new String(fam));
assertEquals(4, num);
assertEquals(3, num);

FileSystem fs = FileSystem.get(conf);
assertFalse(fs.exists(extraMOBFile));

scanned = scanTable();
assertEquals(30, scanned);
assertEquals(20, scanned);

}

Expand Down