diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/compactionserver/CSRpcServices.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/compactionserver/CSRpcServices.java
index 25cfa1a2e607..7faa665d48d6 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/compactionserver/CSRpcServices.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/compactionserver/CSRpcServices.java
@@ -19,7 +19,6 @@
 import java.io.IOException;
 
 import java.util.ArrayList;
-import java.util.Collections;
 import java.util.List;
 
 import org.apache.hadoop.conf.Configuration;
@@ -42,7 +41,6 @@
 import org.apache.hadoop.hbase.shaded.protobuf.generated.CompactionProtos;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.CompactionProtos.CompactResponse;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.CompactionProtos.CompactionService;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos;
 
 @InterfaceAudience.Private
 public class CSRpcServices extends AbstractRpcServices
@@ -96,12 +94,11 @@ public CompactResponse requestCompaction(RpcController controller,
     ColumnFamilyDescriptor cfd = ProtobufUtil.toColumnFamilyDescriptor(request.getFamily());
     boolean major = request.getMajor();
     int priority = request.getPriority();
-    List<HBaseProtos.ServerName> favoredNodes = Collections.singletonList(request.getServer());
     LOG.info("Receive compaction request from {}", ProtobufUtil.toString(request));
-    CompactionTask compactionTask =
-      CompactionTask.newBuilder().setRsServerName(rsServerName).setRegionInfo(regionInfo)
-        .setColumnFamilyDescriptor(cfd).setRequestMajor(major).setPriority(priority)
-        .setFavoredNodes(favoredNodes).setSubmitTime(System.currentTimeMillis()).build();
+    CompactionTask compactionTask = CompactionTask.newBuilder().setRsServerName(rsServerName)
+      .setRegionInfo(regionInfo).setColumnFamilyDescriptor(cfd).setRequestMajor(major)
+      .setPriority(priority).setFavoredNodes(request.getFavoredNodesList())
+      .setSubmitTime(System.currentTimeMillis()).build();
     try {
       compactionServer.compactionThreadManager.requestCompaction(compactionTask);
       return CompactionProtos.CompactResponse.newBuilder().build();
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/compactionserver/CompactionThreadManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/compactionserver/CompactionThreadManager.java
index 2eb697d4526e..f5fcfe21d763 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/compactionserver/CompactionThreadManager.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/compactionserver/CompactionThreadManager.java
@@ -150,6 +150,27 @@ public void requestCompaction(CompactionTask compactionTask) throws IOException
     }
   }
 
+  /**
+   * Open the store and clean stale compacted files from the cache.
+   */
+  private HStore openStore(RegionInfo regionInfo, ColumnFamilyDescriptor cfd, boolean major,
+      MonitoredTask status) throws IOException {
+    status.setStatus("Open store");
+    HStore store = getStore(conf, server.getFileSystem(), rootDir,
+      tableDescriptors.get(regionInfo.getTable()), regionInfo, cfd.getNameAsString());
+    // handle the TTL case
+    store.removeUnneededFiles(false);
+    // CompactedHFilesDischarger only runs on the regionserver, so the compactionserver gets no
+    // chance to clean compacted files at that time; clean them up here instead
+    compactionFilesCache.cleanupCompactedFiles(regionInfo, cfd,
+      store.getStorefiles().stream().map(sf -> sf.getPath().getName()).collect(Collectors.toSet()));
+    if (major) {
+      status.setStatus("Trigger major compaction");
+      store.triggerMajorCompaction();
+    }
+    return store;
+  }
+
   private void selectFileAndExecuteTask(CompactionTask compactionTask) throws IOException {
     ServerName rsServerName = compactionTask.getRsServerName();
     RegionInfo regionInfo = compactionTask.getRegionInfo();
@@ -169,10 +190,10 @@ private void selectFileAndExecuteTask(CompactionTask compactionTask) throws IOEx
       // the three has consistent state, we need this condition to guarantee correct selection
       synchronized (compactionFilesCache.getCompactedStoreFilesAsLock(regionInfo, cfd)) {
         synchronized (compactionFilesCache.getSelectedStoreFilesAsLock(regionInfo, cfd)) {
-          Pair<HStore, Optional<CompactionContext>> pair = selectCompaction(regionInfo, cfd,
+          store = openStore(regionInfo, cfd, compactionTask.isRequestMajor(), status);
+          store.assignFavoredNodesForCompactionOffload(compactionTask.getFavoredNodes());
+          Optional<CompactionContext> compaction = selectCompaction(store, regionInfo, cfd,
             compactionTask.isRequestMajor(), compactionTask.getPriority(), status, logStr);
-          store = pair.getFirst();
-          Optional<CompactionContext> compaction = pair.getSecond();
           if (!compaction.isPresent()) {
             store.close();
             LOG.info("Compaction context is empty: {}", compactionTask);
@@ -204,26 +225,12 @@ private void selectFileAndExecuteTask(CompactionTask compactionTask) throws IOEx
   }
 
   /**
-   * Open store, and select compaction context
-   * @return Store and CompactionContext
+   * Select the compaction context for the given store.
+   * @return CompactionContext
    */
-  Pair<HStore, Optional<CompactionContext>> selectCompaction(RegionInfo regionInfo,
+  Optional<CompactionContext> selectCompaction(HStore store, RegionInfo regionInfo,
       ColumnFamilyDescriptor cfd, boolean major, int priority, MonitoredTask status,
       String logStr) throws IOException {
-    status.setStatus("Open store");
-    tableDescriptors.get(regionInfo.getTable());
-    HStore store = getStore(conf, server.getFileSystem(), rootDir,
-      tableDescriptors.get(regionInfo.getTable()), regionInfo, cfd.getNameAsString());
-    // handle TTL case
-    store.removeUnneededFiles(false);
-    // CompactedHFilesDischarger only run on regionserver, so compactionserver does not have
-    // opportunity to clean compacted file at that time, we clean compacted files here
-    compactionFilesCache.cleanupCompactedFiles(regionInfo, cfd,
-      store.getStorefiles().stream().map(sf -> sf.getPath().getName()).collect(Collectors.toSet()));
-    if (major) {
-      status.setStatus("Trigger major compaction");
-      store.triggerMajorCompaction();
-    }
     // get current compacting and compacted files, NOTE: these files are file names only, don't
     // include paths.
     status.setStatus("Get current compacting and compacted files from compactionFilesCache");
@@ -243,7 +250,7 @@ Pair<HStore, Optional<CompactionContext>> selectCompaction(RegionInfo regionInfo
       CompactionLifeCycleTracker.DUMMY, null, excludeStoreFiles);
     LOG.info("After select store: {}, if compaction context is present: {}", logStr,
       compaction.isPresent());
-    return new Pair<>(store, compaction);
+    return compaction;
   }
 
   /**
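For orientation, the refactor above splits the old do-everything selectCompaction into openStore plus a selection step, with favored-node assignment in between. A minimal sketch of the call order selectFileAndExecuteTask now follows (names from this patch; `task` abbreviates `compactionTask`, and locking and error handling are elided):

    // Both file-cache locks are already held at this point.
    HStore store = openStore(regionInfo, cfd, task.isRequestMajor(), status);
    // Assign favored nodes before selection, so the store already knows the
    // target datanodes when the compaction output writer is created later.
    store.assignFavoredNodesForCompactionOffload(task.getFavoredNodes());
    Optional<CompactionContext> compaction = selectCompaction(store, regionInfo, cfd,
      task.isRequestMajor(), task.getPriority(), status, logStr);
    if (!compaction.isPresent()) {
      store.close(); // nothing selected, release the store
    }
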
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
index b2df9b787c1c..f5e82df6d82e 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
@@ -3758,16 +3758,18 @@ public boolean requestCompactRegion(RegionInfo regionInfo, ColumnFamilyDescripto
     }
     CompactionService.BlockingInterface cms = cmsStub;
     InetSocketAddress[] favoredNodesForRegion =
-        getFavoredNodesForRegion(regionInfo.getEncodedName());
-    CompactRequest.Builder builder =
-      CompactRequest.newBuilder().setServer(ProtobufUtil.toServerName(getServerName()))
+      getFavoredNodesForRegion(regionInfo.getEncodedName());
+    CompactRequest.Builder builder = CompactRequest.newBuilder()
+      .setServer(ProtobufUtil.toServerName(getServerName()))
       .setRegionInfo(ProtobufUtil.toRegionInfo(regionInfo))
       .setFamily(ProtobufUtil.toColumnFamilySchema(cfd)).setMajor(major).setPriority(priority);
-    if (favoredNodesForRegion != null) {
+    if (favoredNodesForRegion != null && favoredNodesForRegion.length > 0) {
       for (InetSocketAddress address : favoredNodesForRegion) {
         builder.addFavoredNodes(ProtobufUtil
-            .toServerName(ServerName.valueOf(address.getHostName(), address.getPort(), 0L)));
+          .toServerName(ServerName.valueOf(address.getHostName(), address.getPort(), 0L)));
       }
+    } else {
+      builder.addFavoredNodes(ProtobufUtil.toServerName(getServerName()));
     }
     CompactRequest compactRequest = builder.build();
     try {
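The effect of the strengthened guard above is that the request now always carries at least one favored node: the region's favored nodes when known, otherwise the requesting regionserver itself, so compacted output still lands near the region's host. A hedged sketch of that fallback rule in isolation (hypothetical helper, not part of the patch):

    // Hypothetical helper illustrating the rule encoded in requestCompactRegion:
    // prefer the region's favored nodes; fall back to the requesting server.
    static List<HBaseProtos.ServerName> favoredNodesFor(InetSocketAddress[] favored,
        ServerName self) {
      List<HBaseProtos.ServerName> result = new ArrayList<>();
      if (favored != null && favored.length > 0) {
        for (InetSocketAddress addr : favored) {
          result.add(ProtobufUtil.toServerName(
            ServerName.valueOf(addr.getHostName(), addr.getPort(), 0L)));
        }
      } else {
        result.add(ProtobufUtil.toServerName(self));
      }
      return result;
    }
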
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java
index 0569ad3c7adf..cacea25cc39b 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java
@@ -118,6 +118,7 @@
 import org.apache.hbase.thirdparty.org.apache.commons.collections4.IterableUtils;
 
 import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.CompactionDescriptor;
 
 /**
@@ -344,14 +345,29 @@ private StoreContext initializeStoreContext(ColumnFamilyDescriptor family) throw
   }
 
   private InetSocketAddress[] getFavoredNodes() {
-    InetSocketAddress[] favoredNodes = null;
     if (region.getRegionServerServices() != null) {
-      favoredNodes = region.getRegionServerServices().getFavoredNodesForRegion(
-        region.getRegionInfo().getEncodedName());
+      return region.getRegionServerServices()
+        .getFavoredNodesForRegion(region.getRegionInfo().getEncodedName());
     }
     return favoredNodes;
   }
 
+  // Favored nodes used by compaction offload
+  private InetSocketAddress[] favoredNodes = null;
+
+  // This method is not thread safe. A new store is initialized for every offloaded compaction
+  // request, so it is only called once, after initializeStoreContext and before the actual
+  // compaction runs.
+  public void assignFavoredNodesForCompactionOffload(List<HBaseProtos.ServerName> favoredNodes) {
+    if (CollectionUtils.isNotEmpty(favoredNodes)) {
+      this.favoredNodes = new InetSocketAddress[favoredNodes.size()];
+      for (int i = 0; i < favoredNodes.size(); i++) {
+        this.favoredNodes[i] = InetSocketAddress.createUnresolved(favoredNodes.get(i).getHostName(),
+          favoredNodes.get(i).getPort());
+      }
+    }
+  }
+
   /**
    * @return MemStore Instance to use in this store.
    */
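For reference, an illustration of how the favored nodes round-trip from the regionserver's request into the store (types from this patch; the hostname is a hypothetical placeholder). Using createUnresolved here appears deliberate: the compaction server never resolves the name itself and leaves resolution to the DFS client when the output writer is opened.

    // Illustration only: building one protobuf favored node and handing it to the store.
    HBaseProtos.ServerName node = HBaseProtos.ServerName.newBuilder()
      .setHostName("datanode-1.example.com") // hypothetical host
      .setPort(50010)
      .setStartCode(0L)
      .build();
    store.assignFavoredNodesForCompactionOffload(Collections.singletonList(node));
    // Inside the store this becomes InetSocketAddress.createUnresolved(host, port),
    // later returned by getFavoredNodes() when the compaction output file is created.
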
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/compactionserver/TestRegionFavoredNodesWhenCompactOffload.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/compactionserver/TestRegionFavoredNodesWhenCompactOffload.java
new file mode 100644
index 000000000000..d531c05bd177
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/compactionserver/TestRegionFavoredNodesWhenCompactOffload.java
@@ -0,0 +1,96 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.compactionserver;
+
+import java.net.InetSocketAddress;
+
+import org.apache.hadoop.hbase.HBaseClassTestRule;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.StartMiniClusterOption;
+import org.apache.hadoop.hbase.client.TableDescriptor;
+import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
+import org.apache.hadoop.hbase.regionserver.HRegion;
+import org.apache.hadoop.hbase.regionserver.TestRegionFavoredNodes;
+import org.apache.hadoop.hbase.testclassification.CompactionServerTests;
+import org.apache.hadoop.hbase.testclassification.MediumTests;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.junit.Assume;
+import org.junit.BeforeClass;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+@Category({ CompactionServerTests.class, MediumTests.class })
+public class TestRegionFavoredNodesWhenCompactOffload extends TestRegionFavoredNodes {
+  private static HCompactionServer COMPACTION_SERVER;
+  private static final int FLUSHES = 10;
+
+  @ClassRule
+  public static final HBaseClassTestRule CLASS_RULE =
+    HBaseClassTestRule.forClass(TestRegionFavoredNodesWhenCompactOffload.class);
+
+  @BeforeClass
+  public static void setUpBeforeClass() throws Exception {
+    try {
+      checkFileSystemWithFavoredNode();
+    } catch (NoSuchMethodException nm) {
+      return;
+    }
+    TableDescriptor tableDescriptor =
+      TableDescriptorBuilder.newBuilder(TABLE_NAME).setCompactionOffloadEnabled(true).build();
+    TEST_UTIL.startMiniCluster(StartMiniClusterOption.builder().numCompactionServers(1)
+      .numDataNodes(REGION_SERVERS).numRegionServers(REGION_SERVERS).build());
+    TEST_UTIL.getAdmin().switchCompactionOffload(true);
+    TEST_UTIL.getMiniHBaseCluster().waitForActiveAndReadyMaster();
+    table = TEST_UTIL.createTable(tableDescriptor, Bytes.toByteArrays(COLUMN_FAMILY),
+      HBaseTestingUtility.KEYS_FOR_HBA_CREATE_TABLE, TEST_UTIL.getConfiguration());
+    TEST_UTIL.waitUntilAllRegionsAssigned(TABLE_NAME);
+    COMPACTION_SERVER = TEST_UTIL.getMiniHBaseCluster().getCompactionServerThreads().get(0)
+      .getCompactionServer();
+  }
+
+  @Test
+  public void testFavoredNodes() throws Exception {
+    Assume.assumeTrue(createWithFavoredNode != null);
+    InetSocketAddress[] nodes = getDataNodes();
+    String[] nodeNames = new String[REGION_SERVERS];
+    for (int i = 0; i < REGION_SERVERS; i++) {
+      nodeNames[i] = nodes[i].getAddress().getHostAddress() + ":" + nodes[i].getPort();
+    }
+    updateFavoredNodes(nodes);
+    // Write some data to each region and flush. Repeat some number of times to
+    // get multiple files for each region.
+    for (int i = 0; i < FLUSHES; i++) {
+      TEST_UTIL.loadTable(table, COLUMN_FAMILY, false);
+      TEST_UTIL.flush();
+    }
+    TEST_UTIL.compact(TABLE_NAME, true);
+    TEST_UTIL.waitFor(60000, () -> {
+      int hFileCount = 0;
+      for (HRegion region : TEST_UTIL.getHBaseCluster().getRegions(TABLE_NAME)) {
+        hFileCount += region.getStore(COLUMN_FAMILY).getStorefilesCount();
+      }
+      return hFileCount == HBaseTestingUtility.KEYS_FOR_HBA_CREATE_TABLE.length + 1;
+    });
+    checkFavoredNodes(nodeNames);
+    // Ensure the compaction actually ran on the compaction server
+    TEST_UTIL.waitFor(60000, () -> COMPACTION_SERVER.requestCount.sum() > 0);
+  }
+}
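The waitFor predicate above expects exactly KEYS_FOR_HBA_CREATE_TABLE.length + 1 store files. The arithmetic behind that, as a small sketch (assuming one store per region and that a finished major compaction leaves exactly one file per store):

    // A table pre-split on N split keys has N + 1 regions, and a completed
    // major compaction leaves one store file per store, so:
    int regions = HBaseTestingUtility.KEYS_FOR_HBA_CREATE_TABLE.length + 1;
    int filesPerStoreAfterMajorCompaction = 1;
    int expectedHFileCount = regions * filesPerStoreAfterMajorCompaction;
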
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRegionFavoredNodes.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRegionFavoredNodes.java
index 64d3cb9f8b4c..2493be4c7eee 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRegionFavoredNodes.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRegionFavoredNodes.java
@@ -55,22 +55,27 @@ public class TestRegionFavoredNodes {
   public static final HBaseClassTestRule CLASS_RULE =
       HBaseClassTestRule.forClass(TestRegionFavoredNodes.class);
 
-  private static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
-  private static Table table;
-  private static final TableName TABLE_NAME =
+  protected static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
+  protected static Table table;
+  protected static final TableName TABLE_NAME =
       TableName.valueOf("table");
-  private static final byte[] COLUMN_FAMILY = Bytes.toBytes("family");
+  protected static final byte[] COLUMN_FAMILY = Bytes.toBytes("family");
   private static final int FAVORED_NODES_NUM = 3;
-  private static final int REGION_SERVERS = 6;
+  protected static final int REGION_SERVERS = 6;
   private static final int FLUSHES = 3;
-  private static Method createWithFavoredNode = null;
+  protected static Method createWithFavoredNode = null;
+
+  protected static void checkFileSystemWithFavoredNode() throws Exception {
+    createWithFavoredNode = DistributedFileSystem.class.getDeclaredMethod("create", Path.class,
+      FsPermission.class, boolean.class, int.class, short.class, long.class, Progressable.class,
+      InetSocketAddress[].class);
+  }
 
   @BeforeClass
   public static void setUpBeforeClass() throws Exception {
     try {
-      createWithFavoredNode = DistributedFileSystem.class.getDeclaredMethod("create", Path.class,
-          FsPermission.class, boolean.class, int.class, short.class, long.class,
-          Progressable.class, InetSocketAddress[].class);
+      checkFileSystemWithFavoredNode();
     } catch (NoSuchMethodException nm) {
       return;
     }
@@ -91,9 +96,36 @@ public static void tearDownAfterClass() throws Exception {
     TEST_UTIL.shutdownMiniCluster();
   }
 
-  @Test
-  public void testFavoredNodes() throws Exception {
-    Assume.assumeTrue(createWithFavoredNode != null);
+  protected void checkFavoredNodes(String[] nodeNames) throws Exception {
+    // For each region, check the block locations of each file and ensure that
+    // they are consistent with the favored nodes for that region.
+    for (int i = 0; i < REGION_SERVERS; i++) {
+      HRegionServer server = TEST_UTIL.getHBaseCluster().getRegionServer(i);
+      List<HRegion> regions = server.getRegions(TABLE_NAME);
+      for (HRegion region : regions) {
+        List<String> files = region.getStoreFileList(new byte[][] { COLUMN_FAMILY });
+        for (String file : files) {
+          FileStatus status = TEST_UTIL.getDFSCluster().getFileSystem()
+            .getFileStatus(new Path(new URI(file).getPath()));
+          BlockLocation[] lbks = ((DistributedFileSystem) TEST_UTIL.getDFSCluster().getFileSystem())
+            .getFileBlockLocations(status, 0, Long.MAX_VALUE);
+          for (BlockLocation lbk : lbks) {
+            locations: for (String info : lbk.getNames()) {
+              for (int j = 0; j < FAVORED_NODES_NUM; j++) {
+                if (info.equals(nodeNames[(i + j) % REGION_SERVERS])) {
+                  continue locations;
+                }
+              }
+              // This block was at a location that was not a favored location.
+              fail("Block location " + info + " not a favored node");
+            }
+          }
+        }
+      }
+    }
+  }
+
+  protected InetSocketAddress[] getDataNodes() throws Exception {
     // Get the addresses of the datanodes in the cluster.
     InetSocketAddress[] nodes = new InetSocketAddress[REGION_SERVERS];
     List<DataNode> datanodes = TEST_UTIL.getDFSCluster().getDataNodes();
@@ -106,13 +138,10 @@ public void testFavoredNodes() throws Exception {
     for (int i = 0; i < REGION_SERVERS; i++) {
       nodes[i] = (InetSocketAddress)selfAddress.invoke(datanodes.get(i));
    }
+    return nodes;
+  }
 
-    String[] nodeNames = new String[REGION_SERVERS];
-    for (int i = 0; i < REGION_SERVERS; i++) {
-      nodeNames[i] = nodes[i].getAddress().getHostAddress() + ":" +
-          nodes[i].getPort();
-    }
-
+  protected void updateFavoredNodes(InetSocketAddress[] nodes) {
     // For each region, choose some datanodes as the favored nodes then assign
     // them as favored nodes through the region.
     for (int i = 0; i < REGION_SERVERS; i++) {
@@ -120,11 +149,11 @@ public void testFavoredNodes() throws Exception {
       List<HRegion> regions = server.getRegions(TABLE_NAME);
       for (HRegion region : regions) {
         List<org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.ServerName> favoredNodes =
-          new ArrayList<>(3);
+            new ArrayList<>(3);
         String encodedRegionName = region.getRegionInfo().getEncodedName();
         for (int j = 0; j < FAVORED_NODES_NUM; j++) {
           org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.ServerName.Builder b =
-            org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.ServerName.newBuilder();
+              org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.ServerName.newBuilder();
           b.setHostName(nodes[(i + j) % REGION_SERVERS].getAddress().getHostAddress());
           b.setPort(nodes[(i + j) % REGION_SERVERS].getPort());
           b.setStartCode(-1);
@@ -133,41 +162,23 @@ public void testFavoredNodes() throws Exception {
         server.updateRegionFavoredNodesMapping(encodedRegionName, favoredNodes);
       }
     }
+  }
 
+  @Test
+  public void testFavoredNodes() throws Exception {
+    Assume.assumeTrue(createWithFavoredNode != null);
+    InetSocketAddress[] nodes = getDataNodes();
+    String[] nodeNames = new String[REGION_SERVERS];
+    for (int i = 0; i < REGION_SERVERS; i++) {
+      nodeNames[i] = nodes[i].getAddress().getHostAddress() + ":" + nodes[i].getPort();
+    }
+    updateFavoredNodes(nodes);
     // Write some data to each region and flush. Repeat some number of times to
     // get multiple files for each region.
     for (int i = 0; i < FLUSHES; i++) {
       TEST_UTIL.loadTable(table, COLUMN_FAMILY, false);
       TEST_UTIL.flush();
     }
-
-    // For each region, check the block locations of each file and ensure that
-    // they are consistent with the favored nodes for that region.
-    for (int i = 0; i < REGION_SERVERS; i++) {
-      HRegionServer server = TEST_UTIL.getHBaseCluster().getRegionServer(i);
-      List<HRegion> regions = server.getRegions(TABLE_NAME);
-      for (HRegion region : regions) {
-        List<String> files = region.getStoreFileList(new byte[][]{COLUMN_FAMILY});
-        for (String file : files) {
-          FileStatus status = TEST_UTIL.getDFSCluster().getFileSystem().
-              getFileStatus(new Path(new URI(file).getPath()));
-          BlockLocation[] lbks =
-              ((DistributedFileSystem)TEST_UTIL.getDFSCluster().getFileSystem())
-              .getFileBlockLocations(status, 0, Long.MAX_VALUE);
-          for (BlockLocation lbk : lbks) {
-            locations:
-            for (String info : lbk.getNames()) {
-              for (int j = 0; j < FAVORED_NODES_NUM; j++) {
-                if (info.equals(nodeNames[(i + j) % REGION_SERVERS])) {
-                  continue locations;
-                }
-              }
-              // This block was at a location that was not a favored location.
-              fail("Block location " + info + " not a favored node");
-            }
-          }
-        }
-      }
-    }
+    checkFavoredNodes(nodeNames);
   }
 }
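
A note on the (i + j) % REGION_SERVERS indexing shared by updateFavoredNodes() and checkFavoredNodes(): each regionserver's regions are assigned a rotating window of datanodes as favored nodes, so every server gets a distinct but overlapping set. A small standalone sketch with the defaults above (REGION_SERVERS = 6, FAVORED_NODES_NUM = 3):

    int regionServers = 6, favoredNodesNum = 3;
    for (int i = 0; i < regionServers; i++) {
      StringBuilder favored = new StringBuilder();
      for (int j = 0; j < favoredNodesNum; j++) {
        favored.append((i + j) % regionServers).append(' ');
      }
      // e.g. regionserver 0 -> datanodes 0 1 2, regionserver 5 -> datanodes 5 0 1
      System.out.println("regionserver " + i + " -> datanodes " + favored);
    }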