diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/MetaTableAccessor.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/MetaTableAccessor.java
index fc465f0387ad..35410f720bc3 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/MetaTableAccessor.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/MetaTableAccessor.java
@@ -47,6 +47,7 @@
import org.apache.hadoop.hbase.filter.RowFilter;
import org.apache.hadoop.hbase.filter.SubstringComparator;
import org.apache.hadoop.hbase.master.RegionState;
+import org.apache.hadoop.hbase.regionserver.RSAnnotationReadingPriorityFunction;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.ExceptionUtil;
@@ -183,8 +184,7 @@ public static HRegionLocation getRegionLocation(Connection connection, byte[] re
} catch (Exception parseEx) {
// Ignore. This is used with tableName passed as regionName.
}
- Get get = new Get(row);
- get.addFamily(HConstants.CATALOG_FAMILY);
+ Get get = buildInternalCatalogFamilyGet(row);
Result r;
try (Table t = getMetaHTable(connection)) {
r = t.get(get);
@@ -210,8 +210,7 @@ public static HRegionLocation getRegionLocation(Connection connection, RegionInf
/** Returns Return the {@link HConstants#CATALOG_FAMILY} row from hbase:meta. */
public static Result getCatalogFamilyRow(Connection connection, RegionInfo ri)
throws IOException {
- Get get = new Get(CatalogFamilyFormat.getMetaKeyForRegion(ri));
- get.addFamily(HConstants.CATALOG_FAMILY);
+ Get get = buildInternalCatalogFamilyGet(CatalogFamilyFormat.getMetaKeyForRegion(ri));
try (Table t = getMetaHTable(connection)) {
return t.get(get);
}
@@ -225,13 +224,17 @@ public static Result getCatalogFamilyRow(Connection connection, RegionInfo ri)
*/
public static Result getRegionResult(Connection connection, byte[] regionName)
throws IOException {
- Get get = new Get(regionName);
- get.addFamily(HConstants.CATALOG_FAMILY);
+ Get get = buildInternalCatalogFamilyGet(regionName);
try (Table t = getMetaHTable(connection)) {
return t.get(get);
}
}
+ private static Get buildInternalCatalogFamilyGet(byte[] row) {
+ return new Get(row).addFamily(HConstants.CATALOG_FAMILY)
+ .setPriority(RSAnnotationReadingPriorityFunction.INTERNAL_READ_QOS);
+ }
+
/**
* Scans META table for a row whose key contains the specified regionEncodedName, returning
* a single related Result instance if any row is found, null otherwise.
@@ -340,6 +343,7 @@ private static Scan getMetaScan(Configuration conf, int rowUpperLimit) {
scan.setReadType(Scan.ReadType.PREAD);
}
scan.setCaching(scannerCaching);
+ scan.setPriority(RSAnnotationReadingPriorityFunction.INTERNAL_READ_QOS);
return scan;
}
@@ -387,7 +391,7 @@ public static void fullScanMetaAndPrint(Connection connection) throws IOExceptio
LOG.info("fullScanMetaAndPrint.Current Meta Row: " + r);
TableState state = CatalogFamilyFormat.getTableState(r);
if (state != null) {
- LOG.info("fullScanMetaAndPrint.Table State={}" + state);
+ LOG.info("fullScanMetaAndPrint.Table State={}", state);
} else {
RegionLocations locations = CatalogFamilyFormat.getRegionLocations(r);
if (locations == null) {
@@ -409,6 +413,14 @@ public static void scanMetaForTableRegions(Connection connection,
scanMeta(connection, tableName, QueryType.REGION, Integer.MAX_VALUE, visitor);
}
+ public static void scanMetaForTableRegions(Connection connection,
+ ClientMetaTableAccessor.Visitor visitor, TableName tableName, int priority) throws IOException {
+ scanMeta(connection,
+ ClientMetaTableAccessor.getTableStartRowForMeta(tableName, QueryType.REGION),
+ ClientMetaTableAccessor.getTableStopRowForMeta(tableName, QueryType.REGION), QueryType.REGION,
+ null, Integer.MAX_VALUE, visitor, priority);
+ }
+
private static void scanMeta(Connection connection, TableName table, QueryType type, int maxRows,
final ClientMetaTableAccessor.Visitor visitor) throws IOException {
scanMeta(connection, ClientMetaTableAccessor.getTableStartRowForMeta(table, type),
@@ -463,6 +475,23 @@ public static void scanMeta(Connection connection, @Nullable final byte[] startR
public static void scanMeta(Connection connection, @Nullable final byte[] startRow,
@Nullable final byte[] stopRow, QueryType type, @Nullable Filter filter, int maxRows,
final ClientMetaTableAccessor.Visitor visitor) throws IOException {
+ scanMeta(connection, startRow, stopRow, type, filter, maxRows, visitor,
+ RSAnnotationReadingPriorityFunction.INTERNAL_READ_QOS);
+ }
+
+ /**
+ * Performs a scan of the hbase:meta table.
+ * @param connection connection we're using
+ * @param startRow Where to start the scan. Pass null to begin the scan at the first row.
+ * @param stopRow Where to stop the scan. Pass null to scan all rows from the start one.
+ * @param type scanned part of meta
+ * @param filter Filter to apply to the scan, or null for none
+ * @param maxRows maximum rows to return
+ * @param visitor Visitor invoked against each row.
+ * @param priority priority assigned to the meta read request
+ */
+ public static void scanMeta(Connection connection, @Nullable final byte[] startRow,
+ @Nullable final byte[] stopRow, QueryType type, @Nullable Filter filter, int maxRows,
+ final ClientMetaTableAccessor.Visitor visitor, final int priority) throws IOException {
int rowUpperLimit = maxRows > 0 ? maxRows : Integer.MAX_VALUE;
Scan scan = getMetaScan(connection.getConfiguration(), rowUpperLimit);
@@ -479,10 +508,14 @@ public static void scanMeta(Connection connection, @Nullable final byte[] startR
scan.setFilter(filter);
}
+ scan.setPriority(priority);
+
if (LOG.isTraceEnabled()) {
- LOG.trace("Scanning META" + " starting at row=" + Bytes.toStringBinary(startRow)
- + " stopping at row=" + Bytes.toStringBinary(stopRow) + " for max=" + rowUpperLimit
- + " with caching=" + scan.getCaching());
+ LOG.trace(
+ "Scanning META starting at row={} stopping at row={} for max={} with caching={} "
+ + "priority={}",
+ Bytes.toStringBinary(startRow), Bytes.toStringBinary(stopRow), rowUpperLimit,
+ scan.getCaching(), priority);
}
int currentRow = 0;
@@ -584,8 +617,9 @@ public static TableState getTableState(Connection conn, TableName tableName) thr
return new TableState(tableName, TableState.State.ENABLED);
}
Table metaHTable = getMetaHTable(conn);
- Get get = new Get(tableName.getName()).addColumn(HConstants.TABLE_FAMILY,
- HConstants.TABLE_STATE_QUALIFIER);
+ Get get = new Get(tableName.getName())
+ .addColumn(HConstants.TABLE_FAMILY, HConstants.TABLE_STATE_QUALIFIER)
+ .setPriority(RSAnnotationReadingPriorityFunction.INTERNAL_READ_QOS);
Result result = metaHTable.get(get);
return CatalogFamilyFormat.getTableState(result);
}
@@ -881,7 +915,7 @@ private static void updateLocation(Connection connection, RegionInfo regionInfo,
addRegionInfo(put, regionInfo);
addLocation(put, sn, openSeqNum, regionInfo.getReplicaId());
putToMetaTable(connection, put);
- LOG.info("Updated row {} with server=", regionInfo.getRegionNameAsString(), sn);
+ LOG.info("Updated row {} with server={}", regionInfo.getRegionNameAsString(), sn);
}
public static Put addRegionInfo(final Put p, final RegionInfo hri) throws IOException {
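
For reference, a minimal sketch (not part of the patch) of what the new helper and the meta scan setup produce: catalog reads that carry the elevated internal priority in the RPC header. The row key below is a placeholder.

```java
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.regionserver.RSAnnotationReadingPriorityFunction;
import org.apache.hadoop.hbase.util.Bytes;

public class InternalMetaReadSketch {
  public static void main(String[] args) {
    // Hypothetical meta row key, used only for illustration.
    byte[] row = Bytes.toBytes("test,,1.0123456789abcdef0123456789abcdef.");

    // Same shape as buildInternalCatalogFamilyGet(row): read the catalog family
    // and tag the request with the internal-read priority.
    Get get = new Get(row).addFamily(HConstants.CATALOG_FAMILY)
      .setPriority(RSAnnotationReadingPriorityFunction.INTERNAL_READ_QOS);

    // getMetaScan()/scanMeta() apply the same priority to meta scans.
    Scan scan = new Scan().addFamily(HConstants.CATALOG_FAMILY)
      .setPriority(RSAnnotationReadingPriorityFunction.INTERNAL_READ_QOS);

    System.out.println("get priority=" + get.getPriority()
      + ", scan priority=" + scan.getPriority());
  }
}
```
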
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/MetaRWQueueRpcExecutor.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/MetaRWQueueRpcExecutor.java
index a86e6554b1cc..97c3a8765256 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/MetaRWQueueRpcExecutor.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/MetaRWQueueRpcExecutor.java
@@ -19,6 +19,7 @@
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Abortable;
+import org.apache.hadoop.hbase.regionserver.RSAnnotationReadingPriorityFunction;
import org.apache.yetus.audience.InterfaceAudience;
import org.apache.yetus.audience.InterfaceStability;
@@ -32,7 +33,10 @@ public class MetaRWQueueRpcExecutor extends RWQueueRpcExecutor {
"hbase.ipc.server.metacallqueue.read.ratio";
public static final String META_CALL_QUEUE_SCAN_SHARE_CONF_KEY =
"hbase.ipc.server.metacallqueue.scan.ratio";
- public static final float DEFAULT_META_CALL_QUEUE_READ_SHARE = 0.9f;
+ public static final String META_CALL_QUEUE_HANDLER_FACTOR_CONF_KEY =
+ "hbase.ipc.server.metacallqueue.handler.factor";
+ public static final float DEFAULT_META_CALL_QUEUE_READ_SHARE = 0.8f;
+ private static final float DEFAULT_META_CALL_QUEUE_SCAN_SHARE = 0.2f;
public MetaRWQueueRpcExecutor(final String name, final int handlerCount, final int maxQueueLength,
final PriorityFunction priority, final Configuration conf, final Abortable abortable) {
@@ -46,6 +50,23 @@ protected float getReadShare(final Configuration conf) {
@Override
protected float getScanShare(final Configuration conf) {
- return conf.getFloat(META_CALL_QUEUE_SCAN_SHARE_CONF_KEY, 0);
+ return conf.getFloat(META_CALL_QUEUE_SCAN_SHARE_CONF_KEY, DEFAULT_META_CALL_QUEUE_SCAN_SHARE);
+ }
+
+ @Override
+ public boolean dispatch(CallRunner callTask) {
+ RpcCall call = callTask.getRpcCall();
+ int level = call.getHeader().getPriority();
+ final boolean toWriteQueue = isWriteRequest(call.getHeader(), call.getParam());
+ // Dispatch client meta read requests to the read handlers and internal
+ // (INTERNAL_READ_QOS) meta read requests to the scan handlers.
+ final boolean toScanQueue =
+ getNumScanQueues() > 0 && level == RSAnnotationReadingPriorityFunction.INTERNAL_READ_QOS;
+ return dispatchTo(toWriteQueue, toScanQueue, callTask);
+ }
+
+ @Override
+ protected float getCallQueueHandlerFactor(Configuration conf) {
+ return conf.getFloat(META_CALL_QUEUE_HANDLER_FACTOR_CONF_KEY, 0.5f);
}
}
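
A hedged configuration sketch of the knobs this executor now honors; the values shown are simply the defaults introduced here (handler factor 0.5, read share 0.8, scan share 0.2), not tuning advice.

```java
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.ipc.MetaRWQueueRpcExecutor;

public class MetaQueueConfigSketch {
  public static void main(String[] args) {
    Configuration conf = HBaseConfiguration.create();
    // How many meta call queues to create relative to the meta handler count.
    conf.setFloat(MetaRWQueueRpcExecutor.META_CALL_QUEUE_HANDLER_FACTOR_CONF_KEY, 0.5f);
    // Share of those queues/handlers dedicated to reads rather than writes.
    conf.setFloat(MetaRWQueueRpcExecutor.META_CALL_QUEUE_READ_SHARE_CONF_KEY, 0.8f);
    // Share of the read side reserved for INTERNAL_READ_QOS requests, which the
    // dispatch() override above routes to the scan queues.
    conf.setFloat(MetaRWQueueRpcExecutor.META_CALL_QUEUE_SCAN_SHARE_CONF_KEY, 0.2f);
    System.out.println(conf.get(MetaRWQueueRpcExecutor.META_CALL_QUEUE_HANDLER_FACTOR_CONF_KEY));
  }
}
```
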
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RWQueueRpcExecutor.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RWQueueRpcExecutor.java
index 4030304a11e7..9b242970b035 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RWQueueRpcExecutor.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RWQueueRpcExecutor.java
@@ -295,4 +295,8 @@ private void propagateBalancerConfigChange(QueueBalancer balancer, Configuration
((ConfigurationObserver) balancer).onConfigurationChange(conf);
}
}
+
+ protected int getNumScanQueues() {
+ return numScanQueues;
+ }
}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcExecutor.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcExecutor.java
index 7e5bdfcc7d6f..15c9afe030c2 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcExecutor.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcExecutor.java
@@ -130,7 +130,7 @@ public RpcExecutor(final String name, final int handlerCount, final String callQ
this.conf = conf;
this.abortable = abortable;
- float callQueuesHandlersFactor = this.conf.getFloat(CALL_QUEUE_HANDLER_FACTOR_CONF_KEY, 0.1f);
+ float callQueuesHandlersFactor = getCallQueueHandlerFactor(conf);
if (
Float.compare(callQueuesHandlersFactor, 1.0f) > 0
|| Float.compare(0.0f, callQueuesHandlersFactor) > 0
@@ -468,4 +468,8 @@ public void onConfigurationChange(Configuration conf) {
}
}
}
+
+ protected float getCallQueueHandlerFactor(Configuration conf) {
+ return conf.getFloat(CALL_QUEUE_HANDLER_FACTOR_CONF_KEY, 0.1f);
+ }
}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/janitor/CatalogJanitor.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/janitor/CatalogJanitor.java
index 9c0513b56b9e..d84fd9fe5996 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/janitor/CatalogJanitor.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/janitor/CatalogJanitor.java
@@ -234,7 +234,8 @@ && cleanParent(e.getKey(), e.getValue())
protected CatalogJanitorReport scanForReport() throws IOException {
ReportMakingVisitor visitor = new ReportMakingVisitor(this.services);
// Null tablename means scan all of meta.
- MetaTableAccessor.scanMetaForTableRegions(this.services.getConnection(), visitor, null);
+ MetaTableAccessor.scanMetaForTableRegions(this.services.getConnection(), visitor, null,
+ HConstants.SYSTEMTABLE_QOS);
return visitor.getReport();
}
@@ -491,7 +492,8 @@ public static void main(String[] args) throws IOException {
r.getValue(HConstants.CATALOG_FAMILY, HConstants.REGIONINFO_QUALIFIER));
t.put(p);
}
- MetaTableAccessor.scanMetaForTableRegions(connection, visitor, null);
+ MetaTableAccessor.scanMetaForTableRegions(connection, visitor, null,
+ HConstants.SYSTEMTABLE_QOS);
CatalogJanitorReport report = visitor.getReport();
LOG.info(report != null ? report.toString() : "empty");
}
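
A minimal caller-side sketch of the new scanMetaForTableRegions overload, assuming an existing Connection; the visitor is a trivial row counter rather than CatalogJanitor's ReportMakingVisitor.

```java
import java.io.IOException;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.MetaTableAccessor;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;

public class PriorityMetaScanSketch {
  static int countCatalogRows(Connection connection, TableName table) throws IOException {
    AtomicInteger rows = new AtomicInteger();
    // Same call shape as CatalogJanitor.scanForReport(), but limited to one table
    // and counting rows instead of building a report.
    MetaTableAccessor.scanMetaForTableRegions(connection, r -> {
      rows.incrementAndGet();
      return true; // keep scanning
    }, table, HConstants.SYSTEMTABLE_QOS);
    return rows.get();
  }
}
```
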
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSAnnotationReadingPriorityFunction.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSAnnotationReadingPriorityFunction.java
index 1197f7b5359c..bddec7154f5c 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSAnnotationReadingPriorityFunction.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSAnnotationReadingPriorityFunction.java
@@ -46,7 +46,8 @@
* Priority function specifically for the region server.
*/
@InterfaceAudience.Private
-class RSAnnotationReadingPriorityFunction extends AnnotationReadingPriorityFunction {
+public class RSAnnotationReadingPriorityFunction
+ extends AnnotationReadingPriorityFunction {
private static final Logger LOG =
LoggerFactory.getLogger(RSAnnotationReadingPriorityFunction.class);
@@ -54,6 +55,8 @@ class RSAnnotationReadingPriorityFunction extends AnnotationReadingPriorityFunct
/** Used to control the scan delay, currently sqrt(numNextCall * weight) */
public static final String SCAN_VTIME_WEIGHT_CONF_KEY = "hbase.ipc.server.scan.vtime.weight";
+ public static final int INTERNAL_READ_QOS = 250;
+
@SuppressWarnings("unchecked")
private final Class<? extends Message>[] knownArgumentClasses =
new Class[] { GetRegionInfoRequest.class, GetStoreFileRequest.class, CloseRegionRequest.class,
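
For orientation, a small sketch of where the new constant sits relative to the existing priority levels; the exact values of the HConstants fields (NORMAL_QOS 0, HIGH_QOS/SYSTEMTABLE_QOS 200) are quoted from memory and are assumptions, not part of this patch.

```java
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.regionserver.RSAnnotationReadingPriorityFunction;

public class QosOrderingSketch {
  public static void main(String[] args) {
    // INTERNAL_READ_QOS (250 in this patch) is assumed to sit above HIGH_QOS and
    // SYSTEMTABLE_QOS so the meta dispatcher can single out internal catalog reads.
    int internal = RSAnnotationReadingPriorityFunction.INTERNAL_READ_QOS;
    System.out.println("NORMAL_QOS=" + HConstants.NORMAL_QOS
      + " SYSTEMTABLE_QOS=" + HConstants.SYSTEMTABLE_QOS
      + " HIGH_QOS=" + HConstants.HIGH_QOS
      + " INTERNAL_READ_QOS=" + internal);
  }
}
```
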
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestSimpleRpcScheduler.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestSimpleRpcScheduler.java
index 0c629231728f..888ec05520bc 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestSimpleRpcScheduler.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestSimpleRpcScheduler.java
@@ -48,6 +48,7 @@
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.monitoring.MonitoredRPCHandlerImpl;
+import org.apache.hadoop.hbase.regionserver.RSAnnotationReadingPriorityFunction;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.testclassification.RPCTests;
import org.apache.hadoop.hbase.util.Bytes;
@@ -108,7 +109,7 @@ public void testBasic() throws IOException, InterruptedException {
RpcScheduler scheduler = new SimpleRpcScheduler(conf, 10, 0, 0, qosFunction, 0);
scheduler.init(CONTEXT);
scheduler.start();
- CallRunner task = createMockTask();
+ CallRunner task = createMockTask(HConstants.NORMAL_QOS);
task.setStatus(new MonitoredRPCHandlerImpl());
scheduler.dispatch(task);
verify(task, timeout(10000)).run();
@@ -163,7 +164,7 @@ public void testCallQueueInfo() throws IOException, InterruptedException {
int totalCallMethods = 10;
for (int i = totalCallMethods; i > 0; i--) {
- CallRunner task = createMockTask();
+ CallRunner task = createMockTask(HConstants.NORMAL_QOS);
task.setStatus(new MonitoredRPCHandlerImpl());
scheduler.dispatch(task);
}
@@ -185,9 +186,9 @@ public void testCallQueueInfo() throws IOException, InterruptedException {
@Test
public void testHandlerIsolation() throws IOException, InterruptedException {
- CallRunner generalTask = createMockTask();
- CallRunner priorityTask = createMockTask();
- CallRunner replicationTask = createMockTask();
+ CallRunner generalTask = createMockTask(HConstants.NORMAL_QOS);
+ CallRunner priorityTask = createMockTask(HConstants.HIGH_QOS + 1);
+ CallRunner replicationTask = createMockTask(HConstants.REPLICATION_QOS);
List<CallRunner> tasks = ImmutableList.of(generalTask, priorityTask, replicationTask);
Map<CallRunner, Integer> qos = ImmutableMap.of(generalTask, 0, priorityTask,
HConstants.HIGH_QOS + 1, replicationTask, HConstants.REPLICATION_QOS);
@@ -227,10 +228,12 @@ public Void answer(InvocationOnMock invocationOnMock) throws Throwable {
assertEquals(3, ImmutableSet.copyOf(handlerThreads.values()).size());
}
- private CallRunner createMockTask() {
+ private CallRunner createMockTask(int priority) {
ServerCall call = mock(ServerCall.class);
CallRunner task = mock(CallRunner.class);
+ RequestHeader header = RequestHeader.newBuilder().setPriority(priority).build();
when(task.getRpcCall()).thenReturn(call);
+ when(call.getHeader()).thenReturn(header);
return task;
}
@@ -707,7 +710,7 @@ public void testFastPathBalancedQueueRpcExecutorWithQueueLength0() throws Except
@Test
public void testMetaRWScanQueues() throws Exception {
Configuration schedConf = HBaseConfiguration.create();
- schedConf.setFloat(RpcExecutor.CALL_QUEUE_HANDLER_FACTOR_CONF_KEY, 1.0f);
+ schedConf.setFloat(MetaRWQueueRpcExecutor.META_CALL_QUEUE_HANDLER_FACTOR_CONF_KEY, 1.0f);
schedConf.setFloat(MetaRWQueueRpcExecutor.META_CALL_QUEUE_READ_SHARE_CONF_KEY, 0.7f);
schedConf.setFloat(MetaRWQueueRpcExecutor.META_CALL_QUEUE_SCAN_SHARE_CONF_KEY, 0.5f);
@@ -728,36 +731,37 @@ public void testMetaRWScanQueues() throws Exception {
when(putCall.getHeader()).thenReturn(putHead);
when(putCall.getParam()).thenReturn(putCall.param);
- CallRunner getCallTask = mock(CallRunner.class);
- ServerCall getCall = mock(ServerCall.class);
- RequestHeader getHead = RequestHeader.newBuilder().setMethodName("get").build();
- when(getCallTask.getRpcCall()).thenReturn(getCall);
- when(getCall.getHeader()).thenReturn(getHead);
-
- CallRunner scanCallTask = mock(CallRunner.class);
- ServerCall scanCall = mock(ServerCall.class);
- scanCall.param = ScanRequest.newBuilder().build();
- RequestHeader scanHead = RequestHeader.newBuilder().setMethodName("scan").build();
- when(scanCallTask.getRpcCall()).thenReturn(scanCall);
- when(scanCall.getHeader()).thenReturn(scanHead);
- when(scanCall.getParam()).thenReturn(scanCall.param);
+ CallRunner clientReadCallTask = mock(CallRunner.class);
+ ServerCall clientReadCall = mock(ServerCall.class);
+ RequestHeader clientReadHead = RequestHeader.newBuilder().setMethodName("get").build();
+ when(clientReadCallTask.getRpcCall()).thenReturn(clientReadCall);
+ when(clientReadCall.getHeader()).thenReturn(clientReadHead);
+
+ CallRunner internalReadCallTask = mock(CallRunner.class);
+ ServerCall internalReadCall = mock(ServerCall.class);
+ internalReadCall.param = ScanRequest.newBuilder().build();
+ RequestHeader internalReadHead = RequestHeader.newBuilder().setMethodName("get")
+ .setPriority(RSAnnotationReadingPriorityFunction.INTERNAL_READ_QOS).build();
+ when(internalReadCallTask.getRpcCall()).thenReturn(internalReadCall);
+ when(internalReadCall.getHeader()).thenReturn(internalReadHead);
+ when(internalReadCall.getParam()).thenReturn(internalReadCall.param);
ArrayList<Integer> work = new ArrayList<>();
doAnswerTaskExecution(putCallTask, work, 1, 1000);
- doAnswerTaskExecution(getCallTask, work, 2, 1000);
- doAnswerTaskExecution(scanCallTask, work, 3, 1000);
+ doAnswerTaskExecution(clientReadCallTask, work, 2, 1000);
+ doAnswerTaskExecution(internalReadCallTask, work, 3, 1000);
// There are 3 queues: [puts], [gets], [scans]
// so the calls will be interleaved
scheduler.dispatch(putCallTask);
scheduler.dispatch(putCallTask);
scheduler.dispatch(putCallTask);
- scheduler.dispatch(getCallTask);
- scheduler.dispatch(getCallTask);
- scheduler.dispatch(getCallTask);
- scheduler.dispatch(scanCallTask);
- scheduler.dispatch(scanCallTask);
- scheduler.dispatch(scanCallTask);
+ scheduler.dispatch(clientReadCallTask);
+ scheduler.dispatch(clientReadCallTask);
+ scheduler.dispatch(clientReadCallTask);
+ scheduler.dispatch(internalReadCallTask);
+ scheduler.dispatch(internalReadCallTask);
+ scheduler.dispatch(internalReadCallTask);
while (work.size() < 6) {
Thread.sleep(100);