Skip to content
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -27,10 +27,17 @@
import org.apache.yetus.audience.InterfaceAudience;
import org.apache.yetus.audience.InterfaceStability;

import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos;

@InterfaceAudience.Private
@InterfaceStability.Evolving
public class DefaultOperationQuota implements OperationQuota {

// a single scan estimate can consume no more than this proportion of the limiter's limit
// this prevents a long-running scan from being estimated at, say, 100MB of IO against
// a <100MB/IO throttle (because this would never succeed)
private static final double MAX_SCAN_ESTIMATE_PROPORTIONAL_LIMIT_CONSUMPTION = 0.9;

protected final List<QuotaLimiter> limiters;
private final long writeCapacityUnit;
private final long readCapacityUnit;
Expand All @@ -53,13 +60,17 @@ public class DefaultOperationQuota implements OperationQuota {
protected long readCapacityUnitDiff = 0;
private boolean useResultSizeBytes;
private long blockSizeBytes;
private long maxScanEstimate;

/**
 * Builds an operation quota over the given limiters and derives the scan-estimate ceiling from
 * the smallest read limit among them.
 * @param conf           configuration consulted for the result-size-bytes estimation switch
 * @param blockSizeBytes block size used when estimating read IO
 * @param limiters       the limiters this quota is checked against
 */
public DefaultOperationQuota(final Configuration conf, final int blockSizeBytes,
  final QuotaLimiter... limiters) {
  this(conf, Arrays.asList(limiters));
  this.blockSizeBytes = blockSizeBytes;
  this.useResultSizeBytes =
    conf.getBoolean(OperationQuota.USE_RESULT_SIZE_BYTES, USE_RESULT_SIZE_BYTES_DEFAULT);

  // Smallest read limit across all limiters; stays at Long.MAX_VALUE when none are supplied.
  long smallestReadLimit = Long.MAX_VALUE;
  for (final QuotaLimiter candidate : limiters) {
    smallestReadLimit = Math.min(smallestReadLimit, candidate.getReadLimit());
  }

  // Cap any single scan estimate at a fixed proportion of that limit.
  this.maxScanEstimate =
    Math.round(MAX_SCAN_ESTIMATE_PROPORTIONAL_LIMIT_CONSUMPTION * smallestReadLimit);
}

/**
Expand All @@ -80,21 +91,45 @@ public DefaultOperationQuota(final Configuration conf, final List<QuotaLimiter>
}

@Override
public void checkQuota(int numWrites, int numReads, int numScans) throws RpcThrottlingException {
updateEstimateConsumeQuota(numWrites, numReads, numScans);
public void checkQuota(int numWrites, int numReads) throws RpcThrottlingException {
updateEstimateConsumeBatchQuota(numWrites, numReads);

readAvailable = Long.MAX_VALUE;
for (final QuotaLimiter limiter : limiters) {
if (limiter.isBypass()) continue;
if (limiter.isBypass()) {
continue;
}

limiter.checkQuota(numWrites, writeConsumed, numReads + numScans, readConsumed,
limiter.checkQuota(numWrites, writeConsumed, numReads, readConsumed,
writeCapacityUnitConsumed, readCapacityUnitConsumed);
readAvailable = Math.min(readAvailable, limiter.getReadAvailable());
}

for (final QuotaLimiter limiter : limiters) {
limiter.grabQuota(numWrites, writeConsumed, numReads + numScans, readConsumed,
writeCapacityUnitConsumed, readCapacityUnitConsumed);
limiter.grabQuota(numWrites, writeConsumed, numReads, readConsumed, writeCapacityUnitConsumed,
readCapacityUnitConsumed);
}
}

/**
 * Checks and then consumes quota for a single scan RPC across all limiters.
 * @param scanRequest          the scan to be executed
 * @param maxScannerResultSize the maximum bytes to be returned by the scanner
 * @param maxBlockBytesScanned the maximum bytes scanned in a single RPC call by the scanner
 * @throws RpcThrottlingException if the quota check fails (presumably raised by
 *                                limiter.checkQuota; confirm against QuotaLimiter)
 */
@Override
public void checkScanQuota(ClientProtos.ScanRequest scanRequest, long maxScannerResultSize,
long maxBlockBytesScanned) throws RpcThrottlingException {
// Refresh the read-size and read-capacity-unit estimates for this scan before any check.
updateEstimateConsumeScanQuota(scanRequest, maxScannerResultSize, maxBlockBytesScanned);

// Start from "unlimited" and narrow to the smallest availability reported by a limiter.
readAvailable = Long.MAX_VALUE;
Comment thread
bbeaudreault marked this conversation as resolved.
for (final QuotaLimiter limiter : limiters) {
// Bypass limiters take no part in the check.
if (limiter.isBypass()) {
continue;
}

// A scan is accounted as zero write requests and one read request.
limiter.checkQuota(0, writeConsumed, 1, readConsumed, writeCapacityUnitConsumed,
readCapacityUnitConsumed);
readAvailable = Math.min(readAvailable, limiter.getReadAvailable());
}

// Reached only if no check threw. Note there is no bypass filter in this loop, so grabQuota
// is called on every limiter.
for (final QuotaLimiter limiter : limiters) {
limiter.grabQuota(0, writeConsumed, 1, readConsumed, writeCapacityUnitConsumed,
readCapacityUnitConsumed);
}
}

Expand Down Expand Up @@ -158,24 +193,60 @@ public void addMutation(final Mutation mutation) {
* Update estimate quota(read/write size/capacityUnits) which will be consumed
* @param numWrites the number of write requests
* @param numReads the number of read requests
* @param numScans the number of scan requests
*/
protected void updateEstimateConsumeQuota(int numWrites, int numReads, int numScans) {
protected void updateEstimateConsumeBatchQuota(int numWrites, int numReads) {
writeConsumed = estimateConsume(OperationType.MUTATE, numWrites, 100);

if (useResultSizeBytes) {
readConsumed = estimateConsume(OperationType.GET, numReads, 100);
readConsumed += estimateConsume(OperationType.SCAN, numScans, 1000);
} else {
// assume 1 block required for reads. this is probably a low estimate, which is okay
readConsumed = numReads > 0 ? blockSizeBytes : 0;
readConsumed += numScans > 0 ? blockSizeBytes : 0;
}

writeCapacityUnitConsumed = calculateWriteCapacityUnit(writeConsumed);
readCapacityUnitConsumed = calculateReadCapacityUnit(readConsumed);
}

/**
* Update estimate quota(read/write size/capacityUnits) which will be consumed
* @param scanRequest the scan to be executed
* @param maxScannerResultSize the maximum bytes to be returned by the scanner
* @param maxBlockBytesScanned the maximum bytes scanned in a single RPC call by the scanner
*/
protected void updateEstimateConsumeScanQuota(ClientProtos.ScanRequest scanRequest,
long maxScannerResultSize, long maxBlockBytesScanned) {
if (useResultSizeBytes) {
readConsumed = estimateConsume(OperationType.GET, 1, 1000);
} else {
/*
* Estimating scan workload is more complicated, and if we severely underestimate workloads
* then throttled clients will exhaust retries too quickly, and could saturate the RPC layer.
* We have access to the ScanRequest's nextCallSeq number, the maxScannerResultSize, and the
* maxBlockBytesScanned by every relevant Scanner#next call. With these inputs we can make a
* more informed estimate about the scan's workload.
*/
long estimate;
if (scanRequest.getNextCallSeq() == 0) {
// start scanners with an optimistic 1 block IO estimate
// it is better to underestimate a large scan in the beginning
// than to overestimate, and block, a small scan
estimate = blockSizeBytes;
} else {
// scanner result sizes will be limited by quota availability, regardless of
// maxScannerResultSize. This means that we cannot safely assume that a long-running
// scan with a small maxBlockBytesScanned would not prefer to pull down
// a larger payload. So we should estimate with the assumption that long-running scans
// are appropriately configured to approach their maxScannerResultSize per RPC call
estimate =
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I feel like we should only double if the scan reached the previous limit.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Would you mind elaborating a bit? I don't know exactly what you mean

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So every time a next() comes in, we double the previous BBS as the new limit. So let's say someone is doing a long scan with a caching of 1. So each time it's doing only 1 block of IO, but we are still doubling the new estimate. So it becomes harder to get a quota if we are always estimating 2x more than really fetching. The doubling is good for reaching a plateau quickly, but then it should stop.

For the 1 block/1 caching case, it might only be a 64KB vs 128KB estimate. Not a huge deal. But let's say we have a scan that's doing 25MB per next(). Setting the estimate to 50MB each time is quite a difference. Ideally, once the scan itself plateaus in BBS, that should be the limit going forward. So for the 25MB example, say it goes 64KB, 128KB, 256KB, etc. up to 25MB. On the next request the estimate is 50MB, but the resulting BBS is only 25MB. Ideally after that we just use 25MB as the estimate going forward.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So every time a next() comes in, we double the previous BBS as the new limit

Is this your proposal for what we should do, or your read on what the current estimation logic will do? If we're worried about overestimation, then I think this logic is actually worse than what you've described, because we don't double the previous BBS — rather, we multiply the max BBS by the next call sequence. So for the 1 block/1 caching case, we'd quickly estimate at a likely much higher Math.min(maxScannerResultSize, maxScanEstimate). This is more of a feature than a bug though, because I think we'd prefer to overestimate against a saturated quota. For example, if we only manage to scan a few KB per next call because we're riding against the quota limit, but really the scan would prefer to pull down 100MB at a time, then without the aforementioned estimate progression we'd continue to use tiny estimations.

So, rather than risk falling into a cycle of quota saturation -> long running scans with tiny estimates -> tons of RPC calls -> quota saturation ..., I think we'd rather make the assumption described in the comment here:

So we should estimate with the assumption that long-running scans
are appropriately configured to approach their maxScannerResultSize per RPC call

at the expense of potentially overestimating some scans. We also mitigate the risk of overestimation with the introduction of maxScanEstimate

Copy link
Copy Markdown
Contributor

@bbeaudreault bbeaudreault Mar 10, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I was describing what I thought the current implementation does. It seems like every new scan will increase the estimate until we hit the server-configured max. But I'd like to stop increasing when we stop exceeding the old limit. Imagine the following 4 next calls:

Estimate = 64k; Scanned = 64k
Estimate = 128k; Scanned = 128k
Estimate = 256k; Scanned = 128k
Estimate = 384k; Scanned = 128k

So the actually scanned amount had leveled out, but the estimate keeps increasing. Ideally it'd be like this instead:

Estimate = 64k; Scanned = 64k
Estimate = 128k; Scanned = 128k
Estimate = 256k; Scanned = 128k
Estimate = 128k; Scanned = 128k <- we didn't hit our last estimate, so stop over estimating.

Or something else that doesn't keep increasing. Of course the impact here is not huge yet, but for larger values it seems to matter a bit more. Unless I'm missing something

Math.min(maxScannerResultSize, scanRequest.getNextCallSeq() * maxBlockBytesScanned);
Comment thread
rmdmattingly marked this conversation as resolved.
Outdated
}
readConsumed = Math.min(maxScanEstimate, estimate);
}

readCapacityUnitConsumed = calculateReadCapacityUnit(readConsumed);
}

private long estimateConsume(final OperationType type, int numReqs, long avgSize) {
if (numReqs > 0) {
return avgSize * numReqs;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos;

/*
* Internal class used to check and consume quota if exceed throttle quota is enabled. Exceed
* throttle quota means, user can over consume user/namespace/table quota if region server has
Expand All @@ -47,45 +49,86 @@ public ExceedOperationQuota(final Configuration conf, int blockSizeBytes,
}

@Override
public void checkQuota(int numWrites, int numReads, int numScans) throws RpcThrottlingException {
public void checkQuota(int numWrites, int numReads) throws RpcThrottlingException {
if (regionServerLimiter.isBypass()) {
// If region server limiter is bypass, which means no region server quota is set, check and
// throttle by all other quotas. In this condition, exceed throttle quota will not work.
LOG.warn("Exceed throttle quota is enabled but no region server quotas found");
super.checkQuota(numWrites, numReads, numScans);
super.checkQuota(numWrites, numReads);
} else {
// 1. Update estimate quota which will be consumed
updateEstimateConsumeQuota(numWrites, numReads, numScans);
updateEstimateConsumeBatchQuota(numWrites, numReads);
// 2. Check if region server limiter is enough. If not, throw RpcThrottlingException.
regionServerLimiter.checkQuota(numWrites, writeConsumed, numReads + numScans, readConsumed,
regionServerLimiter.checkQuota(numWrites, writeConsumed, numReads, readConsumed,
writeCapacityUnitConsumed, readCapacityUnitConsumed);
// 3. Check if other limiters are enough. If not, exceed other limiters because region server
// limiter is enough.
boolean exceed = false;
try {
super.checkQuota(numWrites, numReads, numScans);
super.checkQuota(numWrites, numReads);
} catch (RpcThrottlingException e) {
exceed = true;
if (LOG.isDebugEnabled()) {
LOG.debug("Read/Write requests num exceeds quota: writes:{} reads:{} scan:{}, "
+ "try use region server quota", numWrites, numReads, numScans);
LOG.debug("Read/Write requests num exceeds quota: writes:{} reads:{} scans:0, "
+ "try use region server quota", numWrites, numReads);
}
}
// 4. Region server limiter is enough and grab estimated consume quota.
readAvailable = Math.max(readAvailable, regionServerLimiter.getReadAvailable());
regionServerLimiter.grabQuota(numWrites, writeConsumed, numReads + numScans, readConsumed,
regionServerLimiter.grabQuota(numWrites, writeConsumed, numReads, readConsumed,
writeCapacityUnitConsumed, writeCapacityUnitConsumed);
if (exceed) {
// 5. Other quota limiter is exceeded and has not been grabbed (because throw
// RpcThrottlingException in Step 3), so grab it.
for (final QuotaLimiter limiter : limiters) {
limiter.grabQuota(numWrites, writeConsumed, numReads + numScans, readConsumed,
limiter.grabQuota(numWrites, writeConsumed, numReads, readConsumed,
writeCapacityUnitConsumed, writeCapacityUnitConsumed);
}
}
}
}

/**
 * Checks whether a single scan RPC may proceed when exceed-throttle quota is enabled.
 * <p>
 * When the region server limiter is bypassed, the check is delegated entirely to the parent
 * implementation. Otherwise the region server limiter is checked first; if it has room, the
 * other limiters may be exceeded, and quota is grabbed from the region server limiter and
 * (when they were exceeded) from the other limiters as well.
 * @param scanRequest          the scan to be executed
 * @param maxScannerResultSize the maximum bytes to be returned by the scanner
 * @param maxBlockBytesScanned the maximum bytes scanned in a single RPC call by the scanner
 * @throws RpcThrottlingException if the region server limiter (or, in the bypass case, any
 *                                other limiter) cannot accommodate the scan
 */
@Override
public void checkScanQuota(ClientProtos.ScanRequest scanRequest, long maxScannerResultSize,
  long maxBlockBytesScanned) throws RpcThrottlingException {
  if (regionServerLimiter.isBypass()) {
    // If region server limiter is bypass, which means no region server quota is set, check and
    // throttle by all other quotas. In this condition, exceed throttle quota will not work.
    LOG.warn("Exceed throttle quota is enabled but no region server quotas found");
    super.checkScanQuota(scanRequest, maxScannerResultSize, maxBlockBytesScanned);
  } else {
    // 1. Update estimate quota which will be consumed
    updateEstimateConsumeScanQuota(scanRequest, maxScannerResultSize, maxBlockBytesScanned);
    // 2. Check if region server limiter is enough. If not, throw RpcThrottlingException.
    regionServerLimiter.checkQuota(0, writeConsumed, 1, readConsumed, writeCapacityUnitConsumed,
      readCapacityUnitConsumed);
    // 3. Check if other limiters are enough. If not, exceed other limiters because region server
    // limiter is enough.
    boolean exceed = false;
    try {
      super.checkScanQuota(scanRequest, maxScannerResultSize, maxBlockBytesScanned);
    } catch (RpcThrottlingException e) {
      exceed = true;
      if (LOG.isDebugEnabled()) {
        LOG.debug("Read/Write requests num exceeds quota: writes:0 reads:0, scans:1, "
          + "try use region server quota");
      }
    }
    // 4. Region server limiter is enough and grab estimated consume quota.
    readAvailable = Math.max(readAvailable, regionServerLimiter.getReadAvailable());
    // The final argument is the READ capacity unit estimate, matching the checkQuota call in
    // step 2; passing the write capacity unit here would charge the wrong amount.
    regionServerLimiter.grabQuota(0, writeConsumed, 1, readConsumed, writeCapacityUnitConsumed,
      readCapacityUnitConsumed);
    if (exceed) {
      // 5. Other quota limiter is exceeded and has not been grabbed (because throw
      // RpcThrottlingException in Step 3), so grab it.
      for (final QuotaLimiter limiter : limiters) {
        limiter.grabQuota(0, writeConsumed, 1, readConsumed, writeCapacityUnitConsumed,
          readCapacityUnitConsumed);
      }
    }
  }
}

@Override
public void close() {
super.close();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@
import org.apache.yetus.audience.InterfaceAudience;
import org.apache.yetus.audience.InterfaceStability;

import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos;

/**
* Noop operation quota returned when no quota is associated to the user/table
*/
Expand All @@ -40,7 +42,13 @@ public static OperationQuota get() {
}

@Override
public void checkQuota(int numWrites, int numReads, int numScans) throws RpcThrottlingException {
public void checkQuota(int numWrites, int numReads) throws RpcThrottlingException {
// no-op
}

/**
 * Scan quota check for the no-op quota: accepts every scan without consulting any limiter.
 */
@Override
public void checkScanQuota(ClientProtos.ScanRequest scanRequest, long maxScannerResultSize,
  long maxBlockBytesScanned) throws RpcThrottlingException {
  // Intentionally empty: nothing is checked or consumed, so this never throttles.
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,11 @@ public long getReadAvailable() {
throw new UnsupportedOperationException();
}

/** Returns {@code Long.MAX_VALUE}, i.e. this limiter imposes no read limit. */
@Override
public long getReadLimit() {
  final long unlimited = Long.MAX_VALUE;
  return unlimited;
}

@Override
public String toString() {
return "NoopQuotaLimiter";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@
import org.apache.yetus.audience.InterfaceAudience;
import org.apache.yetus.audience.InterfaceStability;

import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos;

/**
* Interface that allows to check the quota available for an operation.
*/
Expand Down Expand Up @@ -51,11 +53,22 @@ public enum OperationType {
* on the number of operations to perform and the average size accumulated during time.
* @param numWrites number of write operation that will be performed
* @param numReads number of small-read operation that will be performed
* @param numScans number of long-read operation that will be performed
* @throws RpcThrottlingException if the operation cannot be performed because RPC quota is
* exceeded.
*/
void checkQuota(int numWrites, int numReads, int numScans) throws RpcThrottlingException;
void checkQuota(int numWrites, int numReads) throws RpcThrottlingException;

/**
* Checks if it is possible to execute the scan. The quota will be estimated based on the
* composition of the scan.
* @param scanRequest the given scan operation
* @param maxScannerResultSize the maximum bytes to be returned by the scanner
* @param maxBlockBytesScanned the maximum bytes scanned in a single RPC call by the scanner
* @throws RpcThrottlingException if the operation cannot be performed because RPC quota is
* exceeded.
*/
void checkScanQuota(ClientProtos.ScanRequest scanRequest, long maxScannerResultSize,
long maxBlockBytesScanned) throws RpcThrottlingException;

/** Cleanup method on operation completion */
void close();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,9 @@ void grabQuota(long writeReqs, long writeSize, long readReqs, long readSize,
/** Returns the number of bytes available to read to avoid exceeding the quota */
long getReadAvailable();

/** Returns the maximum number of bytes ever available to read */
long getReadLimit();

/** Returns the number of bytes available to write to avoid exceeding the quota */
long getWriteAvailable();
}
Loading