diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/LocalHBaseCluster.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/LocalHBaseCluster.java index ec551822ec20..b856c7a71977 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/LocalHBaseCluster.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/LocalHBaseCluster.java @@ -168,6 +168,19 @@ public LocalHBaseCluster(final Configuration conf, final int noMasters, LOG.debug("Setting RS InfoServer Port to random."); conf.set(HConstants.REGIONSERVER_INFO_PORT, "0"); } + if (conf.getInt(HConstants.COMPACTION_SERVER_PORT, HConstants.DEFAULT_COMPACTION_SERVER_PORT) + == HConstants.DEFAULT_COMPACTION_SERVER_PORT) { + LOG.debug("Setting CompactionServer Port to random."); + conf.set(HConstants.COMPACTION_SERVER_PORT, "0"); + } + // treat info ports special; expressly don't change '-1' (keep off) + // in case we make that the default behavior. + if (conf.getInt(HConstants.COMPACTION_SERVER_INFO_PORT, 0) != -1 && conf.getInt( + HConstants.COMPACTION_SERVER_INFO_PORT, HConstants.DEFAULT_COMPACTION_SERVER_INFOPORT) + == HConstants.DEFAULT_COMPACTION_SERVER_INFOPORT) { + LOG.debug("Setting CS InfoServer Port to random."); + conf.set(HConstants.COMPACTION_SERVER_INFO_PORT, "0"); + } if (conf.getInt(HConstants.MASTER_INFO_PORT, 0) != -1 && conf.getInt(HConstants.MASTER_INFO_PORT, HConstants.DEFAULT_MASTER_INFOPORT) == HConstants.DEFAULT_MASTER_INFOPORT) { @@ -308,6 +321,17 @@ public String waitOnRegionServer(int serverNumber) { return waitOnRegionServer(regionServerThread); } + /** + * Wait for the specified compaction server to stop. Removes this thread from list of running + * threads. + * @return Name of compaction server that just went down. + */ + public String waitOnCompactionServer(int serverNumber) { + JVMClusterUtil.CompactionServerThread regionServerThread = + this.compactionServerThreads.get(serverNumber); + return waitOnCompactionServer(regionServerThread); + } + /** * Wait for the specified region server to stop. Removes this thread from list of running threads. * @return Name of region server that just went down. @@ -326,6 +350,25 @@ public String waitOnRegionServer(JVMClusterUtil.RegionServerThread rst) { return rst.getName(); } + /** + * Wait for the specified compaction server to stop. Removes this thread from list of running + * threads. + * @return Name of compaction server that just went down. + */ + public String waitOnCompactionServer(JVMClusterUtil.CompactionServerThread cst) { + while (cst.isAlive()) { + try { + LOG.info("Waiting on " + cst.getCompactionServer().toString()); + cst.join(); + } catch (InterruptedException e) { + LOG.error("Interrupted while waiting for {} to finish. Retrying join", cst.getName(), e); + Thread.currentThread().interrupt(); + } + } + compactionServerThreads.remove(cst); + return cst.getName(); + } + /** * @return the HMaster thread */ @@ -427,6 +470,18 @@ public void join() { } } } + if (this.compactionServerThreads != null) { + for(Thread t: this.compactionServerThreads) { + if (t.isAlive()) { + try { + Threads.threadDumpingIsAlive(t); + } catch (InterruptedException e) { + LOG.debug("Interrupted", e); + } + } + } + } + } @SuppressWarnings("unchecked") @@ -491,7 +546,7 @@ public void startup() throws IOException { * Shut down the mini HBase cluster */ public void shutdown() { - JVMClusterUtil.shutdown(this.masterThreads, this.regionThreads); + JVMClusterUtil.shutdown(this.masterThreads, this.regionThreads, this.compactionServerThreads); } /** diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/util/JVMClusterUtil.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/JVMClusterUtil.java index 57a74a0ea7a7..679190026944 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/util/JVMClusterUtil.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/JVMClusterUtil.java @@ -294,12 +294,64 @@ private static void waitForEvent(long millis, String action, Supplier c } + private static boolean shutdown(final List servers, + boolean wasInterrupted) { + final long maxTime = System.currentTimeMillis() + 30 * 1000; + // first try nicely. + for (Thread t : servers) { + long now = System.currentTimeMillis(); + if (t.isAlive() && !wasInterrupted && now < maxTime) { + try { + t.join(maxTime - now); + } catch (InterruptedException e) { + LOG.info( + "Got InterruptedException on shutdown - " + "not waiting anymore on region server ends", + e); + wasInterrupted = true; // someone wants us to speed up. + } + } + } + + // Let's try to interrupt the remaining threads if any. + for (int i = 0; i < 100; ++i) { + boolean atLeastOneLiveServer = false; + for (Thread t : servers) { + if (t.isAlive()) { + atLeastOneLiveServer = true; + try { + LOG.warn("{} remaining, give one more chance before interrupting", + t.getClass().getSimpleName()); + t.join(1000); + } catch (InterruptedException e) { + wasInterrupted = true; + } + } + } + if (!atLeastOneLiveServer) { + break; + } + for (Thread t : servers) { + if (t.isAlive()) { + LOG.warn( + "{} taking too long to stop, interrupting; thread dump " + "if > 3 attempts: i=" + i, + t.getClass().getSimpleName()); + if (i > 3) { + Threads.printThreadInfo(System.out, "Thread dump " + t.getName()); + } + t.interrupt(); + } + } + } + return wasInterrupted; + } + /** * @param masters * @param regionservers */ public static void shutdown(final List masters, - final List regionservers) { + final List regionservers, + final List compactionServers) { LOG.debug("Shutting down HBase Cluster"); if (masters != null) { // Do backups first. @@ -335,51 +387,18 @@ public static void shutdown(final List masters, } } boolean wasInterrupted = false; - final long maxTime = System.currentTimeMillis() + 30 * 1000; if (regionservers != null) { - // first try nicely. for (RegionServerThread t : regionservers) { t.getRegionServer().stop("Shutdown requested"); } - for (RegionServerThread t : regionservers) { - long now = System.currentTimeMillis(); - if (t.isAlive() && !wasInterrupted && now < maxTime) { - try { - t.join(maxTime - now); - } catch (InterruptedException e) { - LOG.info("Got InterruptedException on shutdown - " + - "not waiting anymore on region server ends", e); - wasInterrupted = true; // someone wants us to speed up. - } - } - } - - // Let's try to interrupt the remaining threads if any. - for (int i = 0; i < 100; ++i) { - boolean atLeastOneLiveServer = false; - for (RegionServerThread t : regionservers) { - if (t.isAlive()) { - atLeastOneLiveServer = true; - try { - LOG.warn("RegionServerThreads remaining, give one more chance before interrupting"); - t.join(1000); - } catch (InterruptedException e) { - wasInterrupted = true; - } - } - } - if (!atLeastOneLiveServer) break; - for (RegionServerThread t : regionservers) { - if (t.isAlive()) { - LOG.warn("RegionServerThreads taking too long to stop, interrupting; thread dump " + - "if > 3 attempts: i=" + i); - if (i > 3) { - Threads.printThreadInfo(System.out, "Thread dump " + t.getName()); - } - t.interrupt(); - } - } + wasInterrupted = shutdown(regionservers, wasInterrupted); + } + if (compactionServers != null) { + // first try nicely. + for (CompactionServerThread t : compactionServers) { + t.getCompactionServer().stop("Shutdown requested"); } + wasInterrupted = shutdown(compactionServers, wasInterrupted); } if (masters != null) { @@ -398,12 +417,14 @@ public static void shutdown(final List masters, } } } - LOG.info("Shutdown of " + - ((masters != null) ? masters.size() : "0") + " master(s) and " + - ((regionservers != null) ? regionservers.size() : "0") + - " regionserver(s) " + (wasInterrupted ? "interrupted" : "complete")); + LOG.info("Shutdown of " + ((masters != null) ? masters.size() : "0") + " master(s) and " + + ((regionservers != null) ? regionservers.size() : "0") + " regionserver(s) " + + ((compactionServers != null) ? compactionServers.size() : "0") + " compactionServer(s) " + + (wasInterrupted ? "interrupted" : "complete") + + ); - if (wasInterrupted){ + if (wasInterrupted) { Thread.currentThread().interrupt(); } } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/MiniHBaseCluster.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/MiniHBaseCluster.java index fc36ac80023b..05fad48a5814 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/MiniHBaseCluster.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/MiniHBaseCluster.java @@ -436,6 +436,25 @@ public JVMClusterUtil.RegionServerThread startRegionServer() return startRegionServer(newConf); } + /** + * Starts a compaction server thread running + * @return New CompactionServerThread + */ + public JVMClusterUtil.CompactionServerThread startCompactionServer() throws IOException { + final Configuration configuration = HBaseConfiguration.create(conf); + User rsUser = HBaseTestingUtility.getDifferentUser(configuration, ".hfs." + index++); + JVMClusterUtil.CompactionServerThread t = null; + try { + t = hbaseCluster.addCompactionServer(configuration, + hbaseCluster.getCompactionServers().size(), rsUser); + t.start(); + t.waitForServerOnline(); + } catch (InterruptedException ie) { + throw new IOException("Interrupted adding compactionserver to cluster", ie); + } + return t; + } + private JVMClusterUtil.RegionServerThread startRegionServer(Configuration configuration) throws IOException { User rsUser = @@ -502,6 +521,20 @@ public JVMClusterUtil.RegionServerThread stopRegionServer(int serverNumber) { return stopRegionServer(serverNumber, true); } + /** + * Shut down the specified compaction server cleanly + * + * @param serverNumber Used as index into a list. + * @return the compaction server that was stopped + */ + public JVMClusterUtil.CompactionServerThread stopCompactionServer(int serverNumber) { + JVMClusterUtil.CompactionServerThread server = + hbaseCluster.getCompactionServers().get(serverNumber); + LOG.info("Stopping " + server.toString()); + server.getCompactionServer().stop("Stopping rs " + serverNumber); + return server; + } + /** * Shut down the specified region server cleanly * @@ -557,6 +590,14 @@ public String waitOnRegionServer(final int serverNumber) { return this.hbaseCluster.waitOnRegionServer(serverNumber); } + /** + * Wait for the specified compaction server to stop. Removes this thread from list + * of running threads. + * @return Name of compaction server that just went down. + */ + public String waitOnCompactionServer(final int serverNumber) { + return this.hbaseCluster.waitOnCompactionServer(serverNumber); + } /** * Starts a master thread running diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/compactionserver/TestCompactionServer.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/compactionserver/TestCompactionServer.java index dfc17e497501..6d0ccccce4d7 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/compactionserver/TestCompactionServer.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/compactionserver/TestCompactionServer.java @@ -95,8 +95,12 @@ public static void afterClass() throws Exception { @Before public void before() throws Exception { - TEST_UTIL.createTable(TABLENAME, FAMILY); + TableDescriptor tableDescriptor = + TableDescriptorBuilder.newBuilder(TABLENAME).setCompactionOffloadEnabled(true).build(); + TEST_UTIL.createTable(tableDescriptor, Bytes.toByteArrays(FAMILY), + TEST_UTIL.getConfiguration()); TEST_UTIL.waitTableAvailable(TABLENAME); + COMPACTION_SERVER.requestCount.reset(); } @After @@ -169,9 +173,9 @@ public void testCompactionWithVersions() throws Exception { TEST_UTIL.getAdmin().compactionSwitch(false, new ArrayList<>()); ColumnFamilyDescriptor cfd = ColumnFamilyDescriptorBuilder.newBuilder(Bytes.toBytes(FAMILY)).setMaxVersions(3).build(); - TableDescriptor modifiedtableDescriptor = - TableDescriptorBuilder.newBuilder(TABLENAME).setColumnFamily(cfd).build(); - TEST_UTIL.getAdmin().modifyTable(modifiedtableDescriptor); + TableDescriptor modifiedTableDescriptor = TableDescriptorBuilder.newBuilder(TABLENAME) + .setColumnFamily(cfd).setCompactionOffloadEnabled(true).build(); + TEST_UTIL.getAdmin().modifyTable(modifiedTableDescriptor); TEST_UTIL.waitTableAvailable(TABLENAME); doFillRecord(1, 500, RandomUtils.nextBytes(20)); doFillRecord(1, 500, RandomUtils.nextBytes(20)); @@ -198,6 +202,8 @@ public void testCompactionWithVersions() throws Exception { return hFileCount == 1; }); + // To ensure do compaction on compaction server + TEST_UTIL.waitFor(60000, () -> COMPACTION_SERVER.requestCount.sum() > 0); kVCount = 0; for (HRegion region : TEST_UTIL.getHBaseCluster().getRegions(TABLENAME)) { for (HStoreFile hStoreFile : region.getStore(Bytes.toBytes(FAMILY)).getStorefiles()) { @@ -208,11 +214,11 @@ public void testCompactionWithVersions() throws Exception { verifyRecord(1, 500, true); } - @Test public void testCompactionServerDown() throws Exception { TEST_UTIL.getAdmin().compactionSwitch(false, new ArrayList<>()); - COMPACTION_SERVER.stop("test"); + TEST_UTIL.getHBaseCluster().stopCompactionServer(0); + TEST_UTIL.getHBaseCluster().waitOnCompactionServer(0); TEST_UTIL.waitFor(60000, () -> MASTER.getCompactionOffloadManager().getOnlineServersList().size() == 0); doPutRecord(1, 1000, true); @@ -232,6 +238,12 @@ public void testCompactionServerDown() throws Exception { return hFile == 1; }); verifyRecord(1, 1000, true); + TEST_UTIL.getHBaseCluster().startCompactionServer(); + COMPACTION_SERVER = TEST_UTIL.getMiniHBaseCluster().getCompactionServerThreads().get(0) + .getCompactionServer(); + COMPACTION_SERVER_NAME = COMPACTION_SERVER.getServerName(); + TEST_UTIL.waitFor(60000, + () -> MASTER.getCompactionOffloadManager().getOnlineServersList().size() == 1); } @Test @@ -277,20 +289,19 @@ public void testCompactionOffloadTableDescriptor() throws Exception { TableDescriptor htd = TableDescriptorBuilder.newBuilder(TEST_UTIL.getAdmin().getDescriptor(TABLENAME)) - .setCompactionOffloadEnabled(true).build(); + .setCompactionOffloadEnabled(false).build(); TEST_UTIL.getAdmin().modifyTable(htd); TEST_UTIL.waitUntilAllRegionsAssigned(TABLENAME); // invoke compact TEST_UTIL.compact(TABLENAME, false); - TEST_UTIL.waitFor(6000, () -> COMPACTION_SERVER.requestCount.sum() > 0); - long requestCount = COMPACTION_SERVER.requestCount.sum(); + TEST_UTIL.waitFor(6000, () -> COMPACTION_SERVER.requestCount.sum() == 0); htd = TableDescriptorBuilder.newBuilder(TEST_UTIL.getAdmin().getDescriptor(TABLENAME)) - .setCompactionOffloadEnabled(false).build(); + .setCompactionOffloadEnabled(true).build(); TEST_UTIL.getAdmin().modifyTable(htd); TEST_UTIL.waitUntilAllRegionsAssigned(TABLENAME); // invoke compact TEST_UTIL.compact(TABLENAME, false); - TEST_UTIL.waitFor(6000, () -> COMPACTION_SERVER.requestCount.sum() == requestCount); + TEST_UTIL.waitFor(6000, () -> COMPACTION_SERVER.requestCount.sum() > 0); } }