-
Notifications
You must be signed in to change notification settings - Fork 3.4k
HBASE-25830 HBaseCluster support CompactionServer for UTs (addendum) #3464
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 1 commit
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -436,6 +436,27 @@ public JVMClusterUtil.RegionServerThread startRegionServer() | |
| return startRegionServer(newConf); | ||
| } | ||
|
|
||
| /** | ||
| * Starts a compaction server thread running | ||
| * | ||
| * @throws IOException | ||
| * @return New CompactionServerThread | ||
| */ | ||
| public JVMClusterUtil.CompactionServerThread startCompactionServer() throws IOException { | ||
| final Configuration configuration = HBaseConfiguration.create(conf); | ||
| User rsUser = HBaseTestingUtility.getDifferentUser(configuration, ".hfs." + index++); | ||
| JVMClusterUtil.CompactionServerThread t = null; | ||
| try { | ||
| t = hbaseCluster.addCompactionServer(configuration, | ||
| hbaseCluster.getCompactionServers().size(), rsUser); | ||
| t.start(); | ||
| t.waitForServerOnline(); | ||
| } catch (InterruptedException ie) { | ||
| throw new IOException("Interrupted adding compactionserver to cluster", ie); | ||
| } | ||
| return t; | ||
| } | ||
|
|
||
| private JVMClusterUtil.RegionServerThread startRegionServer(Configuration configuration) | ||
| throws IOException { | ||
| User rsUser = | ||
|
|
@@ -502,6 +523,20 @@ public JVMClusterUtil.RegionServerThread stopRegionServer(int serverNumber) { | |
| return stopRegionServer(serverNumber, true); | ||
| } | ||
|
|
||
| /** | ||
| * Shut down the specified compaction server cleanly | ||
| * | ||
| * @param serverNumber Used as index into a list. | ||
| * @return the compaction server that was stopped | ||
| */ | ||
| public JVMClusterUtil.CompactionServerThread stopCompactionServer(int serverNumber) { | ||
| JVMClusterUtil.CompactionServerThread server = | ||
| hbaseCluster.getCompactionServers().get(serverNumber); | ||
| LOG.info("Stopping " + server.toString()); | ||
| server.getCompactionServer().stop("Stopping rs " + serverNumber); | ||
| return server; | ||
| } | ||
|
|
||
| /** | ||
| * Shut down the specified region server cleanly | ||
| * | ||
|
|
@@ -557,6 +592,15 @@ public String waitOnRegionServer(final int serverNumber) { | |
| return this.hbaseCluster.waitOnRegionServer(serverNumber); | ||
| } | ||
|
|
||
| /** | ||
| * Wait for the specified compaction server to stop. Removes this thread from list | ||
| * of running threads. | ||
| * @param serverNumber | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. nits: remove empty param doc |
||
| * @return Name of compaction server that just went down. | ||
| */ | ||
| public String waitOnCompactionServer(final int serverNumber) { | ||
| return this.hbaseCluster.waitOnCompactionServer(serverNumber); | ||
| } | ||
|
|
||
| /** | ||
| * Starts a master thread running | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -95,8 +95,12 @@ public static void afterClass() throws Exception { | |
|
|
||
| @Before | ||
| public void before() throws Exception { | ||
| TEST_UTIL.createTable(TABLENAME, FAMILY); | ||
| TableDescriptor tableDescriptor = | ||
| TableDescriptorBuilder.newBuilder(TABLENAME).setCompactionOffloadEnabled(true).build(); | ||
| TEST_UTIL.createTable(tableDescriptor, Bytes.toByteArrays(FAMILY), | ||
| TEST_UTIL.getConfiguration()); | ||
| TEST_UTIL.waitTableAvailable(TABLENAME); | ||
| COMPACTION_SERVER.requestCount.reset(); | ||
| } | ||
|
|
||
| @After | ||
|
|
@@ -169,9 +173,9 @@ public void testCompactionWithVersions() throws Exception { | |
| TEST_UTIL.getAdmin().compactionSwitch(false, new ArrayList<>()); | ||
| ColumnFamilyDescriptor cfd = | ||
| ColumnFamilyDescriptorBuilder.newBuilder(Bytes.toBytes(FAMILY)).setMaxVersions(3).build(); | ||
| TableDescriptor modifiedtableDescriptor = | ||
| TableDescriptorBuilder.newBuilder(TABLENAME).setColumnFamily(cfd).build(); | ||
| TEST_UTIL.getAdmin().modifyTable(modifiedtableDescriptor); | ||
| TableDescriptor modifiedTableDescriptor = TableDescriptorBuilder.newBuilder(TABLENAME) | ||
| .setColumnFamily(cfd).setCompactionOffloadEnabled(true).build(); | ||
| TEST_UTIL.getAdmin().modifyTable(modifiedTableDescriptor); | ||
| TEST_UTIL.waitTableAvailable(TABLENAME); | ||
| doFillRecord(1, 500, RandomUtils.nextBytes(20)); | ||
| doFillRecord(1, 500, RandomUtils.nextBytes(20)); | ||
|
|
@@ -198,6 +202,8 @@ public void testCompactionWithVersions() throws Exception { | |
| return hFileCount == 1; | ||
| }); | ||
|
|
||
| // To ensure do compaction on compaction server | ||
| TEST_UTIL.waitFor(60000, () -> COMPACTION_SERVER.requestCount.sum() > 0); | ||
| kVCount = 0; | ||
| for (HRegion region : TEST_UTIL.getHBaseCluster().getRegions(TABLENAME)) { | ||
| for (HStoreFile hStoreFile : region.getStore(Bytes.toBytes(FAMILY)).getStorefiles()) { | ||
|
|
@@ -208,11 +214,11 @@ public void testCompactionWithVersions() throws Exception { | |
| verifyRecord(1, 500, true); | ||
| } | ||
|
|
||
|
|
||
| @Test | ||
| public void testCompactionServerDown() throws Exception { | ||
| TEST_UTIL.getAdmin().compactionSwitch(false, new ArrayList<>()); | ||
| COMPACTION_SERVER.stop("test"); | ||
| TEST_UTIL.getHBaseCluster().stopCompactionServer(0); | ||
| TEST_UTIL.getHBaseCluster().waitOnCompactionServer(0); | ||
| TEST_UTIL.waitFor(60000, | ||
| () -> MASTER.getCompactionOffloadManager().getOnlineServersList().size() == 0); | ||
| doPutRecord(1, 1000, true); | ||
|
|
@@ -232,6 +238,12 @@ public void testCompactionServerDown() throws Exception { | |
| return hFile == 1; | ||
| }); | ||
| verifyRecord(1, 1000, true); | ||
| TEST_UTIL.getHBaseCluster().startCompactionServer(); | ||
| COMPACTION_SERVER = TEST_UTIL.getMiniHBaseCluster().getCompactionServerThreads().get(0) | ||
| .getCompactionServer(); | ||
| COMPACTION_SERVER_NAME = COMPACTION_SERVER.getServerName(); | ||
| TEST_UTIL.waitFor(60000, | ||
| () -> MASTER.getCompactionOffloadManager().getOnlineServersList().size() == 1); | ||
| } | ||
|
|
||
| @Test | ||
|
|
@@ -277,20 +289,20 @@ public void testCompactionOffloadTableDescriptor() throws Exception { | |
|
|
||
| TableDescriptor htd = | ||
| TableDescriptorBuilder.newBuilder(TEST_UTIL.getAdmin().getDescriptor(TABLENAME)) | ||
| .setCompactionOffloadEnabled(true).build(); | ||
| .setCompactionOffloadEnabled(false).build(); | ||
| TEST_UTIL.getAdmin().modifyTable(htd); | ||
| TEST_UTIL.waitUntilAllRegionsAssigned(TABLENAME); | ||
| // invoke compact | ||
| TEST_UTIL.compact(TABLENAME, false); | ||
| TEST_UTIL.waitFor(6000, () -> COMPACTION_SERVER.requestCount.sum() > 0); | ||
| long requestCount = COMPACTION_SERVER.requestCount.sum(); | ||
| Thread.sleep(1000); | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Mind explaining a bit here?
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. OK,this sleep is no need. My Initially thought is compact request is async, this sleep will make the check below effective, After view code I find the increment of request count on compaction server is sync in compaction request |
||
| TEST_UTIL.waitFor(6000, () -> COMPACTION_SERVER.requestCount.sum() == 0); | ||
|
|
||
| htd = TableDescriptorBuilder.newBuilder(TEST_UTIL.getAdmin().getDescriptor(TABLENAME)) | ||
| .setCompactionOffloadEnabled(false).build(); | ||
| .setCompactionOffloadEnabled(true).build(); | ||
| TEST_UTIL.getAdmin().modifyTable(htd); | ||
| TEST_UTIL.waitUntilAllRegionsAssigned(TABLENAME); | ||
| // invoke compact | ||
| TEST_UTIL.compact(TABLENAME, false); | ||
| TEST_UTIL.waitFor(6000, () -> COMPACTION_SERVER.requestCount.sum() == requestCount); | ||
| TEST_UTIL.waitFor(6000, () -> COMPACTION_SERVER.requestCount.sum() > 0); | ||
| } | ||
| } | ||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: remove empty throws doc