diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java index 47e74eeb43b8..06abad1a4966 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java @@ -473,6 +473,18 @@ public void run() { } } + static class RegionScannerContext { + final String scannerName; + final RegionScannerHolder holder; + final OperationQuota quota; + + RegionScannerContext(String scannerName, RegionScannerHolder holder, OperationQuota quota) { + this.scannerName = scannerName; + this.holder = holder; + this.quota = quota; + } + } + /** * Holder class which holds the RegionScanner, nextCallSeq and RpcCallbacks together. */ @@ -1350,12 +1362,12 @@ public int getScannersCount() { /** Returns The outstanding RegionScanner for scannerId or null if none found. */ RegionScanner getScanner(long scannerId) { - RegionScannerHolder rsh = getRegionScannerHolder(scannerId); + RegionScannerHolder rsh = checkQuotaAndGetRegionScannerContext(scannerId); return rsh == null ? null : rsh.s; } /** Returns The associated RegionScannerHolder for scannerId or null. */ - private RegionScannerHolder getRegionScannerHolder(long scannerId) { + private RegionScannerHolder checkQuotaAndGetRegionScannerContext(long scannerId) { return scanners.get(toScannerName(scannerId)); } @@ -1396,7 +1408,7 @@ public String getScanDetailsWithRequest(ScanRequest request) { * Get the vtime associated with the scanner. Currently the vtime is the number of "next" calls. */ long getScannerVirtualTime(long scannerId) { - RegionScannerHolder rsh = getRegionScannerHolder(scannerId); + RegionScannerHolder rsh = checkQuotaAndGetRegionScannerContext(scannerId); return rsh == null ? 0L : rsh.getNextCallSeq(); } @@ -3185,9 +3197,8 @@ private RegionScannerHolder getRegionScanner(ScanRequest request) throws IOExcep * @return Pair with scannerName key to use with this new Scanner and its RegionScannerHolder * value. */ - private Pair newRegionScanner(ScanRequest request, + private Pair newRegionScanner(ScanRequest request, HRegion region, ScanResponse.Builder builder) throws IOException { - HRegion region = getRegion(request.getRegion()); ClientProtos.Scan protoScan = request.getScan(); boolean isLoadingCfsOnDemandSet = protoScan.hasLoadColumnFamiliesOnDemand(); Scan scan = ProtobufUtil.toScan(protoScan); @@ -3601,22 +3612,10 @@ public ScanResponse scan(final RpcController controller, final ScanRequest reque } requestCount.increment(); rpcScanRequestCount.increment(); - RegionScannerHolder rsh; + RegionScannerContext rsx; ScanResponse.Builder builder = ScanResponse.newBuilder(); - String scannerName; try { - if (request.hasScannerId()) { - // The downstream projects such as AsyncHBase in OpenTSDB need this value. See HBASE-18000 - // for more details. - long scannerId = request.getScannerId(); - builder.setScannerId(scannerId); - scannerName = toScannerName(scannerId); - rsh = getRegionScanner(request); - } else { - Pair scannerNameAndRSH = newRegionScanner(request, builder); - scannerName = scannerNameAndRSH.getFirst(); - rsh = scannerNameAndRSH.getSecond(); - } + rsx = checkQuotaAndGetRegionScannerContext(request, builder); } catch (IOException e) { if (e == SCANNER_ALREADY_CLOSED) { // Now we will close scanner automatically if there are no more results for this region but @@ -3625,6 +3624,9 @@ public ScanResponse scan(final RpcController controller, final ScanRequest reque } throw new ServiceException(e); } + String scannerName = rsx.scannerName; + RegionScannerHolder rsh = rsx.holder; + OperationQuota quota = rsx.quota; if (rsh.fullRegionScan) { rpcFullScanRequestCount.increment(); } @@ -3647,14 +3649,6 @@ public ScanResponse scan(final RpcController controller, final ScanRequest reque } return builder.build(); } - OperationQuota quota; - try { - quota = getRpcQuotaManager().checkScanQuota(region, request, maxScannerResultSize, - rsh.getMaxBlockBytesScanned(), rsh.getPrevBlockBytesScannedDifference()); - } catch (IOException e) { - addScannerLeaseBack(lease); - throw new ServiceException(e); - } try { checkScanNextCallSeq(request, rsh); } catch (OutOfOrderScannerNextException e) { @@ -4192,4 +4186,26 @@ private void setReloadableGuardrails(Configuration conf) { maxScannerResultSize = conf.getLong(HConstants.HBASE_SERVER_SCANNER_MAX_RESULT_SIZE_KEY, HConstants.DEFAULT_HBASE_SERVER_SCANNER_MAX_RESULT_SIZE); } + + RegionScannerContext checkQuotaAndGetRegionScannerContext(ScanRequest request, + ScanResponse.Builder builder) throws IOException { + if (request.hasScannerId()) { + // The downstream projects such as AsyncHBase in OpenTSDB need this value. See HBASE-18000 + // for more details. + long scannerId = request.getScannerId(); + builder.setScannerId(scannerId); + String scannerName = toScannerName(scannerId); + RegionScannerHolder rsh = getRegionScanner(request); + OperationQuota quota = + getRpcQuotaManager().checkScanQuota(rsh.r, request, maxScannerResultSize, + rsh.getMaxBlockBytesScanned(), rsh.getPrevBlockBytesScannedDifference()); + return new RegionScannerContext(scannerName, rsh, quota); + } + + HRegion region = getRegion(request.getRegion()); + OperationQuota quota = + getRpcQuotaManager().checkScanQuota(region, request, maxScannerResultSize, 0L, 0L); + Pair pair = newRegionScanner(request, region, builder); + return new RegionScannerContext(pair.getFirst(), pair.getSecond(), quota); + } } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/quotas/TestNoopOperationQuota.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/quotas/TestNoopOperationQuota.java new file mode 100644 index 000000000000..ad2b79075a31 --- /dev/null +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/quotas/TestNoopOperationQuota.java @@ -0,0 +1,74 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.quotas; + +import java.util.List; +import org.apache.hadoop.hbase.Cell; +import org.apache.hadoop.hbase.client.Mutation; +import org.apache.hadoop.hbase.client.Result; + +import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos; + +public class TestNoopOperationQuota implements OperationQuota { + public static final TestNoopOperationQuota INSTANCE = new TestNoopOperationQuota(); + + @Override + public void checkBatchQuota(int numWrites, int numReads) throws RpcThrottlingException { + } + + @Override + public void checkScanQuota(ClientProtos.ScanRequest scanRequest, long maxScannerResultSize, + long maxBlockBytesScanned, long prevBlockBytesScannedDifference) throws RpcThrottlingException { + + } + + @Override + public void close() { + + } + + @Override + public void addGetResult(Result result) { + + } + + @Override + public void addScanResult(List results) { + + } + + @Override + public void addScanResultCells(List cells) { + + } + + @Override + public void addMutation(Mutation mutation) { + + } + + @Override + public long getReadAvailable() { + return 0L; + } + + @Override + public long getReadConsumed() { + return 0; + } +} diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestScannerLeaseCount.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestScannerLeaseCount.java new file mode 100644 index 000000000000..cf99c53e1d9f --- /dev/null +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestScannerLeaseCount.java @@ -0,0 +1,233 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.regionserver; + +import java.io.IOException; +import java.util.List; +import java.util.concurrent.atomic.AtomicBoolean; +import org.apache.commons.lang3.exception.ExceptionUtils; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.HBaseClassTestRule; +import org.apache.hadoop.hbase.HBaseTestingUtility; +import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.MiniHBaseCluster; +import org.apache.hadoop.hbase.StartMiniClusterOption; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder; +import org.apache.hadoop.hbase.client.Connection; +import org.apache.hadoop.hbase.client.ConnectionFactory; +import org.apache.hadoop.hbase.client.ResultScanner; +import org.apache.hadoop.hbase.client.RetriesExhaustedException; +import org.apache.hadoop.hbase.client.Scan; +import org.apache.hadoop.hbase.client.Table; +import org.apache.hadoop.hbase.client.TableDescriptorBuilder; +import org.apache.hadoop.hbase.quotas.OperationQuota; +import org.apache.hadoop.hbase.quotas.RegionServerRpcQuotaManager; +import org.apache.hadoop.hbase.quotas.RpcThrottlingException; +import org.apache.hadoop.hbase.quotas.TestNoopOperationQuota; +import org.apache.hadoop.hbase.testclassification.ClientTests; +import org.apache.hadoop.hbase.testclassification.MediumTests; +import org.apache.hadoop.hbase.util.Bytes; +import org.junit.AfterClass; +import org.junit.Assert; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.ClassRule; +import org.junit.Test; +import org.junit.experimental.categories.Category; + +import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos; + +@Category({ MediumTests.class, ClientTests.class }) +public class TestScannerLeaseCount { + + @ClassRule + public static final HBaseClassTestRule CLASS_RULE = + HBaseClassTestRule.forClass(TestScannerLeaseCount.class); + + private static final HBaseTestingUtility UTIL = new HBaseTestingUtility(); + private static final TableName TABLE_NAME = TableName.valueOf("ScannerLeaseCount"); + private static final byte[] FAM = Bytes.toBytes("Fam"); + private static final String SCAN_IDENTIFIER_NAME = "_scan_id_"; + private static final byte[] SCAN_IDENTIFIER = Bytes.toBytes("_scan_id_"); + private static final Scan SCAN = new Scan().setAttribute(SCAN_IDENTIFIER_NAME, SCAN_IDENTIFIER); + + private static volatile boolean SHOULD_THROW = false; + private static final AtomicBoolean EXCEPTION_THROWN = new AtomicBoolean(false); + private static final AtomicBoolean SCAN_SEEN = new AtomicBoolean(false); + + private static Connection CONN; + private static Table TABLE; + + @BeforeClass + public static void setUp() throws Exception { + StartMiniClusterOption option = + StartMiniClusterOption.builder().rsClass(MockedQuotaManagerRegionServer.class).build(); + UTIL.startMiniCluster(option); + UTIL.getAdmin().createTable(TableDescriptorBuilder.newBuilder(TABLE_NAME) + .setColumnFamily(ColumnFamilyDescriptorBuilder.of(FAM)).build()); + Configuration conf = new Configuration(UTIL.getConfiguration()); + // Otherwise, we will spin for a while when waiting on the RpcThrottleException + conf.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 1); + CONN = ConnectionFactory.createConnection(conf); + TABLE = CONN.getTable(TABLE_NAME); + UTIL.loadTable(TABLE, FAM); + } + + @AfterClass + public static void tearDown() throws Exception { + try { + TABLE.close(); + } catch (Exception ignore) { + } + try { + CONN.close(); + } catch (Exception ignore) { + } + UTIL.shutdownMiniCluster(); + } + + @Before + public void before() { + SHOULD_THROW = false; + SCAN_SEEN.set(false); + EXCEPTION_THROWN.set(false); + } + + @Test + public void itIncreasesScannerCount() throws Exception { + try (ResultScanner scanner = TABLE.getScanner(SCAN)) { + scanner.next(); + // We need to wait until the scan and lease are created server-side. + // Otherwise, our scanner counting will not reflect the new scan that was created + UTIL.waitFor(1000, () -> SCAN_SEEN.get() && !EXCEPTION_THROWN.get()); + } + } + + @Test + public void itDoesNotIncreaseScannerLeaseCount() throws Exception { + SHOULD_THROW = true; + try (ResultScanner scanner = TABLE.getScanner(SCAN)) { + Exception e = Assert.assertThrows(RetriesExhaustedException.class, scanner::next); + Throwable[] throwables = ExceptionUtils.getThrowables(e); + + boolean foundThrottleException = false; + for (Throwable throwable : throwables) { + if (throwable instanceof RpcThrottlingException) { + foundThrottleException = true; + } + } + + Assert.assertTrue(foundThrottleException); + + // We need to wait until the scan and lease are created server-side. + // Otherwise, our scanner counting will not reflect the new scan that was created + UTIL.waitFor(1000, () -> !SCAN_SEEN.get() && EXCEPTION_THROWN.get()); + } + } + + public static final class MockedQuotaManagerRegionServer + extends MiniHBaseCluster.MiniHBaseClusterRegionServer { + private final MockedRpcQuotaManager rpcQuotaManager; + + public MockedQuotaManagerRegionServer(Configuration conf) + throws IOException, InterruptedException { + super(conf); + this.rpcQuotaManager = new MockedRpcQuotaManager(this); + } + + @Override + public RegionServerRpcQuotaManager getRegionServerRpcQuotaManager() { + return rpcQuotaManager; + } + + @Override + protected RSRpcServices createRpcServices() throws IOException { + return new ScannerTrackingRSRpcServicesForTest(this); + } + } + + private static class MockedRpcQuotaManager extends RegionServerRpcQuotaManager { + private static final RpcThrottlingException EX = new RpcThrottlingException("test_ex"); + + public MockedRpcQuotaManager(RegionServerServices rsServices) { + super(rsServices); + } + + @Override + public OperationQuota checkScanQuota(Region region, ClientProtos.ScanRequest scanRequest, + long maxScannerResultSize, long maxBlockBytesScanned, long prevBlockBytesScannedDifference) + throws IOException, RpcThrottlingException { + if (isTestScan(scanRequest) && SHOULD_THROW) { + EXCEPTION_THROWN.set(true); + throw EX; + } + return TestNoopOperationQuota.INSTANCE; + } + + @Override + public OperationQuota checkBatchQuota(Region region, OperationQuota.OperationType type) + throws IOException, RpcThrottlingException { + if (SHOULD_THROW) { + throw EX; + } + return TestNoopOperationQuota.INSTANCE; + } + + @Override + public OperationQuota checkBatchQuota(Region region, List actions, + boolean hasCondition) throws IOException, RpcThrottlingException { + if (SHOULD_THROW) { + throw EX; + } + return TestNoopOperationQuota.INSTANCE; + } + + @Override + public OperationQuota checkBatchQuota(Region region, int numWrites, int numReads) + throws IOException, RpcThrottlingException { + if (SHOULD_THROW) { + throw EX; + } + return TestNoopOperationQuota.INSTANCE; + } + } + + private static class ScannerTrackingRSRpcServicesForTest extends RSRpcServices { + public ScannerTrackingRSRpcServicesForTest(HRegionServer rs) throws IOException { + super(rs); + } + + @Override + RegionScannerContext checkQuotaAndGetRegionScannerContext(ClientProtos.ScanRequest request, + ClientProtos.ScanResponse.Builder builder) throws IOException { + RegionScannerContext rsx = super.checkQuotaAndGetRegionScannerContext(request, builder); + if (isTestScan(request)) { + SCAN_SEEN.set(true); + } + return rsx; + } + } + + private static boolean isTestScan(ClientProtos.ScanRequest request) { + ClientProtos.Scan scan = request.getScan(); + return scan.getAttributeList().stream() + .anyMatch(nbp -> nbp.getName().equals(SCAN_IDENTIFIER_NAME) + && Bytes.equals(nbp.getValue().toByteArray(), SCAN_IDENTIFIER)); + } +}