diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/procedure/flush/FlushTableSubprocedure.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/procedure/flush/FlushTableSubprocedure.java index 88ddf6102a55..1a0d1b978b55 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/procedure/flush/FlushTableSubprocedure.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/procedure/flush/FlushTableSubprocedure.java @@ -17,6 +17,7 @@ */ package org.apache.hadoop.hbase.procedure.flush; +import java.io.IOException; import java.util.List; import java.util.concurrent.Callable; import java.util.stream.Collectors; @@ -27,6 +28,7 @@ import org.apache.hadoop.hbase.procedure.flush.RegionServerFlushTableProcedureManager.FlushTableSubprocedurePool; import org.apache.hadoop.hbase.regionserver.FlushLifeCycleTracker; import org.apache.hadoop.hbase.regionserver.HRegion; +import org.apache.hadoop.hbase.regionserver.HRegionServer; import org.apache.hadoop.hbase.util.Bytes; import org.apache.yetus.audience.InterfaceAudience; import org.slf4j.Logger; @@ -40,15 +42,18 @@ public class FlushTableSubprocedure extends Subprocedure { private static final Logger LOG = LoggerFactory.getLogger(FlushTableSubprocedure.class); + private final HRegionServer rs; private final String table; private final List<byte[]> families; private final List<HRegion> regions; private final FlushTableSubprocedurePool taskManager; - public FlushTableSubprocedure(ProcedureMember member, ForeignExceptionDispatcher errorListener, - long wakeFrequency, long timeout, List<HRegion> regions, String table, List<byte[]> families, + public FlushTableSubprocedure(HRegionServer rs, ProcedureMember member, + ForeignExceptionDispatcher errorListener, long wakeFrequency, long timeout, + List<HRegion> regions, String table, List<byte[]> families, FlushTableSubprocedurePool taskManager) { super(member, table, errorListener, wakeFrequency, timeout); + this.rs = rs; this.table = table; this.families = families; this.regions = regions; @@ -56,10 +61,12 @@
public FlushTableSubprocedure(ProcedureMember member, ForeignExceptionDispatcher } private static class RegionFlushTask implements Callable<Void> { + HRegionServer rs; HRegion region; List<byte[]> families; - RegionFlushTask(HRegion region, List<byte[]> families) { + RegionFlushTask(HRegionServer rs, HRegion region, List<byte[]> families) { + this.rs = rs; this.region = region; this.families = families; } @@ -70,12 +77,20 @@ public Void call() throws Exception { region.startRegionOperation(); try { LOG.debug("Flush region " + region.toString() + " started..."); + HRegion.FlushResult flushResult; if (families == null || families.isEmpty()) { - region.flush(true); + flushResult = region.flush(true); } else { - region.flushcache(families, false, FlushLifeCycleTracker.DUMMY); + flushResult = region.flushcache(families, false, FlushLifeCycleTracker.DUMMY); + } + if (flushResult.getResult() == HRegion.FlushResult.Result.CANNOT_FLUSH) { + throw new IOException("Unable to complete flush " + region.getRegionInfo()); + } + boolean shouldCompact = flushResult.isCompactionNeeded(); + if (shouldCompact) { + rs.getCompactSplitThread().requestSystemCompaction(region, + "Compaction is triggered by flush operation"); } - // TODO: flush result is not checked? } finally { LOG.debug("Closing region operation on " + region); region.closeRegionOperation(); @@ -106,7 +121,7 @@ private void flushRegions() throws ForeignException { // Add all hfiles already existing in region. for (HRegion region : regions) { // submit one task per region for parallelize by region.
- taskManager.submitTask(new RegionFlushTask(region, familiesToFlush)); + taskManager.submitTask(new RegionFlushTask(rs, region, familiesToFlush)); monitor.rethrowException(); } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/procedure/flush/RegionServerFlushTableProcedureManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/procedure/flush/RegionServerFlushTableProcedureManager.java index 2cee89b57498..8d0ff006250e 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/procedure/flush/RegionServerFlushTableProcedureManager.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/procedure/flush/RegionServerFlushTableProcedureManager.java @@ -160,8 +160,8 @@ public Subprocedure buildSubprocedure(String table, List families) { FlushTableSubprocedurePool taskManager = new FlushTableSubprocedurePool(rss.getServerName().toString(), conf, rss); - return new FlushTableSubprocedure(member, exnDispatcher, wakeMillis, timeoutMillis, - involvedRegions, table, families, taskManager); + return new FlushTableSubprocedure((HRegionServer) rss, member, exnDispatcher, wakeMillis, + timeoutMillis, involvedRegions, table, families, taskManager); } /** diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/FlushRegionCallable.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/FlushRegionCallable.java index 3dd932a1736d..73ed6358002b 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/FlushRegionCallable.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/FlushRegionCallable.java @@ -60,6 +60,11 @@ protected void doCall() throws Exception { if (res.getResult() == HRegion.FlushResult.Result.CANNOT_FLUSH) { throw new IOException("Unable to complete flush " + regionInfo); } + boolean shouldCompact = res.isCompactionNeeded(); + if (shouldCompact) { + rs.getCompactSplitThread().requestSystemCompaction(region, + "Compaction is triggered by flush procedure"); + } 
} finally { LOG.debug("Closing region operation on {}", region); region.closeRegionOperation(); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableAdminApi2.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableAdminApi2.java index 9db82a3bcd82..0523a03462c3 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableAdminApi2.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableAdminApi2.java @@ -243,11 +243,25 @@ public void testCompactionTimestamps() throws Exception { table.put(p).join(); admin.flush(tableName).join(); } - admin.majorCompact(tableName).join(); + + // The flush operation would trigger minor compaction because the number of store files in the + // same column family exceeds 3. After waiting for the minor compaction to complete, proceed + // with the following operations to avoid interference. + Thread.sleep(1000); long curt = EnvironmentEdgeManager.currentTime(); long waitTime = 10000; long endt = curt + waitTime; CompactionState state = admin.getCompactionState(tableName).get(); + while (state != CompactionState.NONE && curt < endt) { + Thread.sleep(10); + state = admin.getCompactionState(tableName).get(); + curt = EnvironmentEdgeManager.currentTime(); + } + + admin.majorCompact(tableName).join(); + curt = EnvironmentEdgeManager.currentTime(); + endt = curt + waitTime; + state = admin.getCompactionState(tableName).get(); LOG.info("Current compaction state 1 is " + state); while (state == CompactionState.NONE && curt < endt) { Thread.sleep(100); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestFlushFromClient.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestFlushFromClient.java index ed41ce49fac9..a0938135bf6e 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestFlushFromClient.java +++
b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestFlushFromClient.java @@ -36,9 +36,11 @@ import org.apache.hadoop.hbase.regionserver.HRegion; import org.apache.hadoop.hbase.regionserver.HRegionServer; import org.apache.hadoop.hbase.regionserver.NoSuchColumnFamilyException; +import org.apache.hadoop.hbase.regionserver.Store; import org.apache.hadoop.hbase.testclassification.ClientTests; import org.apache.hadoop.hbase.testclassification.MediumTests; import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; import org.apache.hadoop.hbase.util.FutureUtils; import org.apache.hadoop.hbase.util.JVMClusterUtil; import org.junit.After; @@ -237,6 +239,47 @@ public void testAsyncFlushRegionServer() throws Exception { } } + @Test + public void testCompactAfterFlushing() throws IOException, InterruptedException { + TableName testCompactTable = TableName.valueOf("testCompactAfterFlushingTable"); + int compactionThreshold = + TEST_UTIL.getConfiguration().getInt("hbase.hstore.compactionThreshold", 3); + try (Admin admin = TEST_UTIL.getAdmin(); + Table t = TEST_UTIL.createTable(testCompactTable, "info")) { + assertTrue(TEST_UTIL.getAdmin().tableExists(testCompactTable)); + List<HRegion> regions = TEST_UTIL.getHBaseCluster().getRegions(testCompactTable); + assertEquals(1, regions.size()); + for (int i = 1; i <= compactionThreshold - 1; i++) { + Put put = new Put(Bytes.toBytes(i)); + put.addColumn(Bytes.toBytes("info"), Bytes.toBytes("name"), Bytes.toBytes("vvvvv")); + t.put(put); + admin.flush(testCompactTable); + } + int storefilesCount = + regions.get(0).getStores().stream().mapToInt(Store::getStorefilesCount).sum(); + assertEquals(compactionThreshold - 1, storefilesCount); + + Put put = new Put(Bytes.toBytes(compactionThreshold)); + put.addColumn(Bytes.toBytes("info"), Bytes.toBytes("name"), Bytes.toBytes("vvvvv")); + t.put(put); + admin.flush(testCompactTable); + Thread.sleep(1000); + long curt =
EnvironmentEdgeManager.currentTime(); + long waitTime = 10000; + long endt = curt + waitTime; + CompactionState state = admin.getCompactionState(testCompactTable); + while (state != CompactionState.NONE && curt < endt) { + Thread.sleep(10); + state = admin.getCompactionState(testCompactTable); + curt = EnvironmentEdgeManager.currentTime(); + } + storefilesCount = + regions.get(0).getStores().stream().mapToInt(Store::getStorefilesCount).sum(); + assertTrue(storefilesCount < compactionThreshold); + } + TEST_UTIL.deleteTable(testCompactTable); + } + private List<HRegion> getRegionInfo() { return TEST_UTIL.getHBaseCluster().getRegions(tableName); } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSide3.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSide3.java index 27a08a8e9b2e..6635f5aa7127 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSide3.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSide3.java @@ -275,17 +275,18 @@ public void testAdvancedConfigOverride() throws Exception { TEST_UTIL.waitTableAvailable(tableName, WAITTABLE_MILLIS); Admin admin = TEST_UTIL.getAdmin(); - // Create 3 store files. + // Create 2 store files. byte[] row = Bytes.toBytes(ThreadLocalRandom.current().nextInt()); - performMultiplePutAndFlush(admin, table, row, FAMILY, 3, 100); + performMultiplePutAndFlush(admin, table, row, FAMILY, 2, 100); try (RegionLocator locator = TEST_UTIL.getConnection().getRegionLocator(tableName)) { // Verify we have multiple store files.
HRegionLocation loc = locator.getRegionLocation(row, true); assertTrue(getStoreFileCount(admin, loc.getServerName(), loc.getRegion()) > 1); - // Issue a compaction request - admin.compact(tableName); + // Create 1 store file, which would trigger minor compaction automatically because the + // number of store files exceeds 3 + performMultiplePutAndFlush(admin, table, row, FAMILY, 1, 100); // poll wait for the compactions to happen for (int i = 0; i < 10 * 1000 / 40; ++i) { diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/mob/TestMobFileCleanupUtil.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/mob/TestMobFileCleanupUtil.java index fc9eceb62412..d75439aef18c 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/mob/TestMobFileCleanupUtil.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/mob/TestMobFileCleanupUtil.java @@ -99,7 +99,7 @@ public void setUp() throws Exception { } private void initConf() { - + conf.setInt("hbase.hstore.compaction.min", 3); conf.setInt("hfile.format.version", 3); conf.setLong(TimeToLiveHFileCleaner.TTL_CONF_KEY, 0); conf.setInt("hbase.client.retries.number", 100); @@ -147,9 +147,9 @@ public void testMobFileCleanerChore() throws InterruptedException, IOException { loadData(0, 10); loadData(10, 10); - loadData(20, 10); + // loadData(20, 10); long num = getNumberOfMobFiles(conf, table.getName(), new String(fam)); - assertEquals(3, num); + assertEquals(2, num); // Major compact admin.majorCompact(tableDescriptor.getTableName(), fam); // wait until compaction is complete @@ -158,7 +158,7 @@ public void testMobFileCleanerChore() throws InterruptedException, IOException { } num = getNumberOfMobFiles(conf, table.getName(), new String(fam)); - assertEquals(4, num); + assertEquals(3, num); // We have guarantee, that compcated file discharger will run during this pause // because it has interval less than this wait time LOG.info("Waiting for {}ms", minAgeToArchive + 1000); @@ -170,16 +170,16 @@ public 
void testMobFileCleanerChore() throws InterruptedException, IOException { // verify that nothing have happened num = getNumberOfMobFiles(conf, table.getName(), new String(fam)); - assertEquals(4, num); + assertEquals(3, num); long scanned = scanTable(); - assertEquals(30, scanned); + assertEquals(20, scanned); // add a MOB file to with a name refering to a non-existing region Path extraMOBFile = MobTestUtil.generateMOBFileForRegion(conf, table.getName(), familyDescriptor, "nonExistentRegion"); num = getNumberOfMobFiles(conf, table.getName(), new String(fam)); - assertEquals(5, num); + assertEquals(4, num); LOG.info("Waiting for {}ms", minAgeToArchive + 1000); @@ -189,13 +189,13 @@ public void testMobFileCleanerChore() throws InterruptedException, IOException { // check that the extra file got deleted num = getNumberOfMobFiles(conf, table.getName(), new String(fam)); - assertEquals(4, num); + assertEquals(3, num); FileSystem fs = FileSystem.get(conf); assertFalse(fs.exists(extraMOBFile)); scanned = scanTable(); - assertEquals(30, scanned); + assertEquals(20, scanned); }