Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -473,6 +473,18 @@ public void run() {
}
}

static class RegionScannerContext {
final String scannerName;
final RegionScannerHolder holder;
final OperationQuota quota;

RegionScannerContext(String scannerName, RegionScannerHolder holder, OperationQuota quota) {
this.scannerName = scannerName;
this.holder = holder;
this.quota = quota;
}
}

/**
* Holder class which holds the RegionScanner, nextCallSeq and RpcCallbacks together.
*/
Expand Down Expand Up @@ -1350,12 +1362,12 @@ public int getScannersCount() {

/** Returns The outstanding RegionScanner for <code>scannerId</code> or null if none found. */
RegionScanner getScanner(long scannerId) {
RegionScannerHolder rsh = getRegionScannerHolder(scannerId);
RegionScannerHolder rsh = checkQuotaAndGetRegionScannerContext(scannerId);
return rsh == null ? null : rsh.s;
}

/** Returns The associated RegionScannerHolder for <code>scannerId</code> or null. */
private RegionScannerHolder getRegionScannerHolder(long scannerId) {
private RegionScannerHolder checkQuotaAndGetRegionScannerContext(long scannerId) {
return scanners.get(toScannerName(scannerId));
}

Expand Down Expand Up @@ -1396,7 +1408,7 @@ public String getScanDetailsWithRequest(ScanRequest request) {
* Get the vtime associated with the scanner. Currently the vtime is the number of "next" calls.
*/
long getScannerVirtualTime(long scannerId) {
RegionScannerHolder rsh = getRegionScannerHolder(scannerId);
RegionScannerHolder rsh = checkQuotaAndGetRegionScannerContext(scannerId);
return rsh == null ? 0L : rsh.getNextCallSeq();
}

Expand Down Expand Up @@ -3185,9 +3197,8 @@ private RegionScannerHolder getRegionScanner(ScanRequest request) throws IOExcep
* @return Pair with scannerName key to use with this new Scanner and its RegionScannerHolder
* value.
*/
private Pair<String, RegionScannerHolder> newRegionScanner(ScanRequest request,
private Pair<String, RegionScannerHolder> newRegionScanner(ScanRequest request, HRegion region,
ScanResponse.Builder builder) throws IOException {
HRegion region = getRegion(request.getRegion());
ClientProtos.Scan protoScan = request.getScan();
boolean isLoadingCfsOnDemandSet = protoScan.hasLoadColumnFamiliesOnDemand();
Scan scan = ProtobufUtil.toScan(protoScan);
Expand Down Expand Up @@ -3601,22 +3612,10 @@ public ScanResponse scan(final RpcController controller, final ScanRequest reque
}
requestCount.increment();
rpcScanRequestCount.increment();
RegionScannerHolder rsh;
RegionScannerContext rsx;
ScanResponse.Builder builder = ScanResponse.newBuilder();
String scannerName;
try {
if (request.hasScannerId()) {
// The downstream projects such as AsyncHBase in OpenTSDB need this value. See HBASE-18000
// for more details.
long scannerId = request.getScannerId();
builder.setScannerId(scannerId);
scannerName = toScannerName(scannerId);
rsh = getRegionScanner(request);
} else {
Pair<String, RegionScannerHolder> scannerNameAndRSH = newRegionScanner(request, builder);
scannerName = scannerNameAndRSH.getFirst();
rsh = scannerNameAndRSH.getSecond();
}
rsx = checkQuotaAndGetRegionScannerContext(request, builder);
} catch (IOException e) {
if (e == SCANNER_ALREADY_CLOSED) {
// Now we will close scanner automatically if there are no more results for this region but
Expand All @@ -3625,6 +3624,9 @@ public ScanResponse scan(final RpcController controller, final ScanRequest reque
}
throw new ServiceException(e);
}
String scannerName = rsx.scannerName;
RegionScannerHolder rsh = rsx.holder;
OperationQuota quota = rsx.quota;
if (rsh.fullRegionScan) {
rpcFullScanRequestCount.increment();
}
Expand All @@ -3647,14 +3649,6 @@ public ScanResponse scan(final RpcController controller, final ScanRequest reque
}
return builder.build();
}
OperationQuota quota;
try {
quota = getRpcQuotaManager().checkScanQuota(region, request, maxScannerResultSize,
rsh.getMaxBlockBytesScanned(), rsh.getPrevBlockBytesScannedDifference());
} catch (IOException e) {
addScannerLeaseBack(lease);
throw new ServiceException(e);
}
try {
checkScanNextCallSeq(request, rsh);
} catch (OutOfOrderScannerNextException e) {
Expand Down Expand Up @@ -4192,4 +4186,26 @@ private void setReloadableGuardrails(Configuration conf) {
maxScannerResultSize = conf.getLong(HConstants.HBASE_SERVER_SCANNER_MAX_RESULT_SIZE_KEY,
HConstants.DEFAULT_HBASE_SERVER_SCANNER_MAX_RESULT_SIZE);
}

RegionScannerContext checkQuotaAndGetRegionScannerContext(ScanRequest request,
ScanResponse.Builder builder) throws IOException {
if (request.hasScannerId()) {
// The downstream projects such as AsyncHBase in OpenTSDB need this value. See HBASE-18000
// for more details.
long scannerId = request.getScannerId();
builder.setScannerId(scannerId);
String scannerName = toScannerName(scannerId);
RegionScannerHolder rsh = getRegionScanner(request);
OperationQuota quota =
getRpcQuotaManager().checkScanQuota(rsh.r, request, maxScannerResultSize,
rsh.getMaxBlockBytesScanned(), rsh.getPrevBlockBytesScannedDifference());
return new RegionScannerContext(scannerName, rsh, quota);
}

HRegion region = getRegion(request.getRegion());
OperationQuota quota =
getRpcQuotaManager().checkScanQuota(region, request, maxScannerResultSize, 0L, 0L);
Pair<String, RegionScannerHolder> pair = newRegionScanner(request, region, builder);
return new RegionScannerContext(pair.getFirst(), pair.getSecond(), quota);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.quotas;

import java.util.List;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.client.Mutation;
import org.apache.hadoop.hbase.client.Result;

import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos;

public class TestNoopOperationQuota implements OperationQuota {
public static final TestNoopOperationQuota INSTANCE = new TestNoopOperationQuota();

@Override
public void checkBatchQuota(int numWrites, int numReads) throws RpcThrottlingException {
}

@Override
public void checkScanQuota(ClientProtos.ScanRequest scanRequest, long maxScannerResultSize,
long maxBlockBytesScanned, long prevBlockBytesScannedDifference) throws RpcThrottlingException {

}

@Override
public void close() {

}

@Override
public void addGetResult(Result result) {

}

@Override
public void addScanResult(List<Result> results) {

}

@Override
public void addScanResultCells(List<Cell> cells) {

}

@Override
public void addMutation(Mutation mutation) {

}

@Override
public long getReadAvailable() {
return 0L;
}

@Override
public long getReadConsumed() {
return 0;
}
}
Loading