Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -157,39 +157,44 @@ public OperationQuota getQuota(final UserGroupInformation ugi, final TableName t
/**
* Check the quota for the current (rpc-context) user. Returns the OperationQuota used to get the
* available quota and to report the data/usage of the operation.
* @param region the region where the operation will be performed
* @param type the operation type
* @param region the region where the operation will be performed
* @param type the operation type
* @param isAtomic whether the given operation is atomic
* @return the OperationQuota
* @throws RpcThrottlingException if the operation cannot be executed due to quota exceeded.
*/
public OperationQuota checkQuota(final Region region, final OperationQuota.OperationType type)
throws IOException, RpcThrottlingException {
public OperationQuota checkQuota(final Region region, final OperationQuota.OperationType type,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm thinking we should add another OperationType CHECK_AND_MUTATE. We can pass that in here instead of adding an isAtomic boolean. We can also add a util in QuotaUtil to return an OperationType for an Action + hasCondition boolean.

We can then use that same api in the other checkQuota below. Something like:

for (final ClientProtos.Action action : actions) {
  OperationType type = QuotaUtil.getOperationType(action, hasCondition);
  if (type.isRead()) {
    numReads++;
  }
  if (type.isWrite()) {
    numWrites++;
  }
}

The goal with this change is to DRY up the mutation type checking, and also to clarify a bit. Currently we are using the isAtomic boolean name which I'm realizing is not accurate. RowMutations is an example of an atomic operation which is not conditional (thus has no reads).

boolean isAtomic) throws IOException, RpcThrottlingException {
switch (type) {
case SCAN:
return checkQuota(region, 0, 0, 1);
case GET:
return checkQuota(region, 0, 1, 0);
case MUTATE:
return checkQuota(region, 1, 0, 0);
return checkQuota(region, 1, isAtomic ? 1 : 0, 0);
}
throw new RuntimeException("Invalid operation type: " + type);
}

/**
* Check the quota for the current (rpc-context) user. Returns the OperationQuota used to get the
* available quota and to report the data/usage of the operation.
* @param region the region where the operation will be performed
* @param actions the "multi" actions to perform
* @param region the region where the operation will be performed
* @param actions the "multi" actions to perform
* @param isAtomic whether the given operation is atomic
* @return the OperationQuota
* @throws RpcThrottlingException if the operation cannot be executed due to quota exceeded.
*/
public OperationQuota checkQuota(final Region region, final List<ClientProtos.Action> actions)
throws IOException, RpcThrottlingException {
public OperationQuota checkQuota(final Region region, final List<ClientProtos.Action> actions,
boolean isAtomic) throws IOException, RpcThrottlingException {
int numWrites = 0;
int numReads = 0;
for (final ClientProtos.Action action : actions) {
if (action.hasMutation()) {
numWrites++;
if (isAtomic) {
numReads++;
}
} else if (action.hasGet()) {
numReads++;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2462,7 +2462,7 @@ public GetResponse get(final RpcController controller, final GetRequest request)
}
Boolean existence = null;
Result r = null;
quota = getRpcQuotaManager().checkQuota(region, OperationQuota.OperationType.GET);
quota = getRpcQuotaManager().checkQuota(region, OperationQuota.OperationType.GET, false);

Get clientGet = ProtobufUtil.toGet(get);
if (get.getExistenceOnly() && region.getCoprocessorHost() != null) {
Expand Down Expand Up @@ -2679,7 +2679,8 @@ public MultiResponse multi(final RpcController rpcc, final MultiRequest request)

try {
region = getRegion(regionSpecifier);
quota = getRpcQuotaManager().checkQuota(region, regionAction.getActionList());
quota = getRpcQuotaManager().checkQuota(region, regionAction.getActionList(),
regionAction.getAtomic());
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

per my other comment about RowMutations I think we actually want to change this and the other usage of getAtomic to hasCondition. I'm realizing that getAtomic seems specific to RowMutations, which may or may not be conditional depending of if the RowMutations is called via mutateRow, batch, or checkAndMutate (only the last one is conditional).

With this, we'll need to update your RowMutations test to ensure that it doesnt count against read capacity unless its a checkAndMutate

} catch (IOException e) {
failRegionAction(responseBuilder, regionActionResultBuilder, regionAction, cellScanner, e);
return responseBuilder.build();
Expand Down Expand Up @@ -2741,7 +2742,8 @@ public MultiResponse multi(final RpcController rpcc, final MultiRequest request)

try {
region = getRegion(regionSpecifier);
quota = getRpcQuotaManager().checkQuota(region, regionAction.getActionList());
quota = getRpcQuotaManager().checkQuota(region, regionAction.getActionList(),
regionAction.getAtomic());
} catch (IOException e) {
failRegionAction(responseBuilder, regionActionResultBuilder, regionAction, cellScanner, e);
continue; // For this region it's a failure.
Expand Down Expand Up @@ -2924,7 +2926,11 @@ public MutateResponse mutate(final RpcController rpcc, final MutateRequest reque
server.getMemStoreFlusher().reclaimMemStoreMemory();
}
long nonceGroup = request.hasNonceGroup() ? request.getNonceGroup() : HConstants.NO_NONCE;
quota = getRpcQuotaManager().checkQuota(region, OperationQuota.OperationType.MUTATE);
MutationType mutationType = mutation.getMutateType();
boolean isAtomic = request.hasCondition() || mutationType == MutationType.INCREMENT
|| mutationType == MutationType.APPEND;
quota =
getRpcQuotaManager().checkQuota(region, OperationQuota.OperationType.MUTATE, isAtomic);
ActivePolicyEnforcement spaceQuotaEnforcement =
getSpaceQuotaManager().getActiveEnforcements();

Expand Down Expand Up @@ -3583,7 +3589,7 @@ public ScanResponse scan(final RpcController controller, final ScanRequest reque
}
OperationQuota quota;
try {
quota = getRpcQuotaManager().checkQuota(region, OperationQuota.OperationType.SCAN);
quota = getRpcQuotaManager().checkQuota(region, OperationQuota.OperationType.SCAN, false);
} catch (IOException e) {
addScannerLeaseBack(lease);
throw new ServiceException(e);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,101 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.quotas;

import java.io.IOException;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseTestingUtil;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.Increment;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.security.User;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.testclassification.RegionServerTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.junit.After;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.experimental.categories.Category;

@Category({ RegionServerTests.class, MediumTests.class })
public class TestAtomicReadQuota {
@ClassRule
public static final HBaseClassTestRule CLASS_RULE =
HBaseClassTestRule.forClass(TestAtomicReadQuota.class);
private static final HBaseTestingUtil TEST_UTIL = new HBaseTestingUtil();
private static final TableName TABLE_NAME = TableName.valueOf(UUID.randomUUID().toString());
private static final byte[] FAMILY = Bytes.toBytes("cf");
private static final byte[] QUALIFIER = Bytes.toBytes("q");

@After
public void tearDown() throws Exception {
ThrottleQuotaTestUtil.clearQuotaCache(TEST_UTIL);
EnvironmentEdgeManager.reset();
TEST_UTIL.deleteTable(TABLE_NAME);
TEST_UTIL.shutdownMiniCluster();
}

@BeforeClass
public static void setUpBeforeClass() throws Exception {
TEST_UTIL.getConfiguration().setBoolean(QuotaUtil.QUOTA_CONF_KEY, true);
TEST_UTIL.getConfiguration().setInt(QuotaCache.REFRESH_CONF_KEY, 1000);
TEST_UTIL.startMiniCluster(1);
TEST_UTIL.waitTableAvailable(QuotaTableUtil.QUOTA_TABLE_NAME);
TEST_UTIL.createTable(TABLE_NAME, FAMILY);
TEST_UTIL.waitTableAvailable(TABLE_NAME);
QuotaCache.TEST_FORCE_REFRESH = true;
}

@Test
public void testAtomicOpCountedAgainstReadCapacity() throws Exception {
ThrottleQuotaTestUtil.triggerUserCacheRefresh(TEST_UTIL, true, TABLE_NAME);
try (Admin admin = TEST_UTIL.getAdmin()) {
admin.setQuota(QuotaSettingsFactory.throttleUser(User.getCurrent().getShortName(),
ThrottleType.READ_NUMBER, 1, TimeUnit.MINUTES));
}
ThrottleQuotaTestUtil.triggerUserCacheRefresh(TEST_UTIL, false, TABLE_NAME);

Increment inc = new Increment(Bytes.toBytes("doot"));
inc.addColumn(FAMILY, QUALIFIER, 1);
try (Table table = getTable()) {
// we have a read quota configured, so this should fail
TEST_UTIL.waitFor(60_000, () -> {
try {
table.increment(inc);
return false;
} catch (Exception e) {
return e.getCause() instanceof RpcThrottlingException;
}
});
}
}

private Table getTable() throws IOException {
TEST_UTIL.getConfiguration().setInt("hbase.client.pause", 100);
TEST_UTIL.getConfiguration().setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 1);
return TEST_UTIL.getConnection().getTableBuilder(TABLE_NAME, null).setOperationTimeout(250)
.build();
}

}