Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -277,6 +277,11 @@ public void close() {
public void shipped() throws IOException {
this.delegate.shipped();
}

@Override
public int getCurrentBlockSizeOnce() {
// Pure pass-through: the wrapped scanner enforces the once-per-block contract
// ("Will only return a value once per block, otherwise 0").
return this.delegate.getCurrentBlockSizeOnce();
}
};
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -336,6 +336,9 @@ protected static class HFileScannerImpl implements HFileScanner {
// RegionScannerImpl#handleException). Call the releaseIfNotCurBlock() to release the
// unreferenced block.
protected HFileBlock curBlock;
// Whether we returned a result for curBlock's size in getCurrentBlockSizeOnce().
// Reset whenever curBlock changes.
private boolean providedCurrentBlockSize = false;
// Previous blocks that were used in the course of the read
protected final ArrayList<HFileBlock> prevBlocks = new ArrayList<>();

Expand All @@ -355,6 +358,7 @@ void updateCurrBlockRef(HFileBlock block) {
prevBlocks.add(this.curBlock);
}
this.curBlock = block;
this.providedCurrentBlockSize = false;
}

void reset() {
Expand Down Expand Up @@ -415,6 +419,15 @@ public void close() {
this.returnBlocks(true);
}

@Override
public int getCurrentBlockSizeOnce() {
  // Report the current block's uncompressed size exactly once per block; on repeated
  // calls, or when no block is loaded, return 0 so callers never double-count block IO.
  if (curBlock != null && !providedCurrentBlockSize) {
    providedCurrentBlockSize = true;
    return curBlock.getUncompressedSizeWithoutHeader();
  }
  return 0;
}

// Returns the #bytes in HFile for the current cell. Used to skip these many bytes in current
// HFile block's buffer so as to position to the next cell.
private int getCurCellSerializedSize() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -140,4 +140,10 @@ public interface HFileScanner extends Shipper, Closeable {
*/
@Override
void close();

/**
* Returns the block size in bytes for the current block. Will only return a value once per block,
* otherwise 0. Used for calculating block IO in ScannerContext.
*/
int getCurrentBlockSizeOnce();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This API design is a bit strange... Let me take a look on the usage...

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think it is better to introduce a method called recordBlockSize? The comment could say that the implementation should make sure that for every block we only record once.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That seems intuitive and reasonable. Done. Please see latest commit.

}
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,11 @@ boolean isLatestCellFromMemstore() {
return !this.current.isFileScanner();
}

@Override
public int getCurrentBlockSizeOnce() {
// Delegate to the scanner we are currently reading from (this.current); it is
// responsible for the once-per-block semantics.
return this.current.getCurrentBlockSizeOnce();
}

@Override
public Cell next() throws IOException {
if (this.current == null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,12 @@ default long getScannerOrder() {
/** Returns true if this is a file scanner. Otherwise a memory scanner is assumed. */
boolean isFileScanner();

/**
* Returns the block size in bytes for the current block. Will only return a value once per block,
* otherwise 0. Used for calculating block IO in ScannerContext.
*/
int getCurrentBlockSizeOnce();

/**
* @return the file path if this is a file scanner, otherwise null.
* @see #isFileScanner()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,12 @@ public boolean isFileScanner() {
return false;
}

@Override
public int getCurrentBlockSizeOnce() {
// No block size by default. This base implementation is not file-backed (see
// isFileScanner() returning false above), so there is no HFile block whose
// size could be reported; per the interface contract we return 0.
return 0;
}

@Override
public Path getFilePath() {
// Not a file by default.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,6 @@
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.LongAdder;
import org.apache.commons.lang3.mutable.MutableObject;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
Expand Down Expand Up @@ -3282,8 +3281,7 @@ private void checkLimitOfRows(int numOfCompleteRows, int limitOfRows, boolean mo
// return whether we have more results in region.
private void scan(HBaseRpcController controller, ScanRequest request, RegionScannerHolder rsh,
long maxQuotaResultSize, int maxResults, int limitOfRows, List<Result> results,
ScanResponse.Builder builder, MutableObject<Object> lastBlock, RpcCall rpcCall)
throws IOException {
ScanResponse.Builder builder, RpcCall rpcCall) throws IOException {
HRegion region = rsh.r;
RegionScanner scanner = rsh.s;
long maxResultSize;
Expand Down Expand Up @@ -3343,7 +3341,9 @@ private void scan(HBaseRpcController controller, ScanRequest request, RegionScan
ScannerContext.Builder contextBuilder = ScannerContext.newBuilder(true);
// maxResultSize - either we can reach this much size for all cells(being read) data or sum
// of heap size occupied by cells(being read). Cell data means its key and value parts.
contextBuilder.setSizeLimit(sizeScope, maxResultSize, maxResultSize);
// maxQuotaResultSize - max results just from server side configuration and quotas, without
// user's specified max. We use this for evaluating limits based on blocks (not cells).
contextBuilder.setSizeLimit(sizeScope, maxResultSize, maxResultSize, maxQuotaResultSize);
contextBuilder.setBatchLimit(scanner.getBatch());
contextBuilder.setTimeLimit(timeScope, timeLimit);
contextBuilder.setTrackMetrics(trackMetrics);
Expand Down Expand Up @@ -3398,7 +3398,6 @@ private void scan(HBaseRpcController controller, ScanRequest request, RegionScan
}
boolean mayHaveMoreCellsInRow = scannerContext.mayHaveMoreCellsInRow();
Result r = Result.create(values, null, stale, mayHaveMoreCellsInRow);
lastBlock.setValue(addSize(rpcCall, r, lastBlock.getValue()));
results.add(r);
numOfResults++;
if (!mayHaveMoreCellsInRow && limitOfRows > 0) {
Expand Down Expand Up @@ -3431,8 +3430,10 @@ private void scan(HBaseRpcController controller, ScanRequest request, RegionScan
// there are more values to be read server side. If there aren't more values,
// marking it as a heartbeat is wasteful because the client will need to issue
// another ScanRequest only to realize that they already have all the values
if (moreRows && timeLimitReached) {
// Heartbeat messages occur when the time limit has been reached.
if (moreRows && (timeLimitReached || (sizeLimitReached && results.isEmpty()))) {
// Heartbeat messages occur when the time limit has been reached, or size limit has
// been reached before collecting any results. This can happen for heavily filtered
// scans which scan over too many blocks.
builder.setHeartbeatMessage(true);
if (rsh.needCursor) {
Cell cursorCell = scannerContext.getLastPeekedCell();
Expand All @@ -3445,6 +3446,10 @@ private void scan(HBaseRpcController controller, ScanRequest request, RegionScan
}
values.clear();
}
if (rpcCall != null) {
rpcCall.incrementResponseBlockSize(scannerContext.getBlockSizeProgress());
rpcCall.incrementResponseCellSize(scannerContext.getHeapSizeProgress());
}
builder.setMoreResultsInRegion(moreRows);
// Check to see if the client requested that we track metrics server side. If the
// client requested metrics, retrieve the metrics from the scanner context.
Expand Down Expand Up @@ -3606,7 +3611,6 @@ public ScanResponse scan(final RpcController controller, final ScanRequest reque
} else {
limitOfRows = -1;
}
MutableObject<Object> lastBlock = new MutableObject<>();
boolean scannerClosed = false;
try {
List<Result> results = new ArrayList<>(Math.min(rows, 512));
Expand All @@ -3616,8 +3620,9 @@ public ScanResponse scan(final RpcController controller, final ScanRequest reque
if (region.getCoprocessorHost() != null) {
Boolean bypass = region.getCoprocessorHost().preScannerNext(scanner, results, rows);
if (!results.isEmpty()) {
Object lastBlock = null;
for (Result r : results) {
lastBlock.setValue(addSize(rpcCall, r, lastBlock.getValue()));
lastBlock = addSize(rpcCall, r, lastBlock);
}
}
if (bypass != null && bypass.booleanValue()) {
Expand All @@ -3626,7 +3631,7 @@ public ScanResponse scan(final RpcController controller, final ScanRequest reque
}
if (!done) {
scan((HBaseRpcController) controller, request, rsh, maxQuotaResultSize, rows, limitOfRows,
results, builder, lastBlock, rpcCall);
results, builder, rpcCall);
} else {
builder.setMoreResultsInRegion(!results.isEmpty());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -503,7 +503,8 @@ private boolean nextInternal(List<Cell> results, ScannerContext scannerContext)
results.clear();

// Read nothing as the rowkey was filtered, but still need to check time limit
if (scannerContext.checkTimeLimit(limitScope)) {
// We also check size limit because we might have read blocks in getting to this point.
if (scannerContext.checkAnyLimitReached(limitScope)) {
return true;
}
continue;
Expand Down Expand Up @@ -561,8 +562,9 @@ private boolean nextInternal(List<Cell> results, ScannerContext scannerContext)
// This row was totally filtered out, if this is NOT the last row,
// we should continue on. Otherwise, nothing else to do.
if (!shouldStop) {
// Read nothing as the cells was filtered, but still need to check time limit
if (scannerContext.checkTimeLimit(limitScope)) {
// Read nothing as the cells were filtered, but still need to check time limit.
// We also check size limit because we might have read blocks in getting to this point.
if (scannerContext.checkAnyLimitReached(limitScope)) {
return true;
}
continue;
Expand Down Expand Up @@ -608,6 +610,11 @@ private boolean nextInternal(List<Cell> results, ScannerContext scannerContext)
return scannerContext.setScannerState(NextState.NO_MORE_VALUES).hasMoreValues();
}
if (!shouldStop) {
// Read nothing as the cells were filtered, but still need to check time limit.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why we need to add a check here?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We want to check the size limit any time we are potentially continuing the loop. This was the only case we missed, the others (which i converted from checkTimeLimit to checkAnyLimit above) are all similar.

Since nextRow is now accumulating block size, we want to check after calling nextRow to ensure we haven't exceeded the limit.

I could make this checkSizeLimit if you'd like. I made it checkAnyLimitReached so that it is the same as the other calls above, which were just checking time limit.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

To be honest, I think this should have been here all along and was just missed along the way. I'm not sure why we'd want to check time limit for the nextRow calls above but not this one. This check here ensures that populating from joined heap + nextRow does not exceed time or (new) size limit.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This method is on the critical path of reading, and the code here will be executed every time we get a row, so it may affect scan performance if we add more checks here.
I just mean: is it a must to have a check here? Why did we not need to check here in the past?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actually, this section of the method is not nearly as hot as the rest of the method. The only real way we reach this point is when filter.filterRowCells(kvs) clears all cells from the results after having been accumulated in StoreScanner. There are only 2 standard filters which do this -- DependentColumnFilter and SingleColumnValueExcludeFilter.

That said, you do make a good point. We have never had a time limit here, so we may not need it. We do need a size limit check here, now that we track block sizes. Previously, we would not check size limit here because the results are empty so wouldn't have accumulated size progress. Now that we accumulate block size progress even for filtered rows, we need a check.

For this type of scan, we will have accumulated blocks in both populateResults() and possibly nextRow(). Right after populateResults() there's a scannerContext.checkAnyLimitReached(LimitScope.BETWEEN_CELLS) call. That call doesn't protect against this case, because it passes BETWEEN_CELLS. For scans with filter.hasFilterRow(), the limit scope is changed to LimitScope.BETWEEN_ROWS. So this check is skipped for these. Scans which enter this code block will have skipped all other limit checks above. The checkSizeLimit I add here is the only safe place we can check BETWEEN_ROWS for these types of filtered scans.

The best way to illustrate this is with a test -- I just pushed a change which does the following:

  1. Change this line to just checkSizeLimit
  2. Adds a new test testCheckLimitAfterFilteringRowCells

If I comment out this checkSizeLimit, the added test fails -- the whole scan is able to complete in 1 rpc instead of the expected 4. So this illustrates that we need to have a size check here.

Personally I think it's also accurate to have a time limit check here, because for these types of scans I think they'd be able to circumvent our existing time limits. But within the scope of this JIRA, I can keep it to just size limit for now.

// We also check size limit because we might have read blocks in getting to this point.
if (scannerContext.checkAnyLimitReached(limitScope)) {
return true;
}
continue;
}
}
Expand Down Expand Up @@ -705,13 +712,21 @@ public int size() {

protected boolean nextRow(ScannerContext scannerContext, Cell curRowCell) throws IOException {
assert this.joinedContinuationRow == null : "Trying to go to next row during joinedHeap read.";

// Enable skipping row mode, which disables limits and skips tracking progress for all
// but block size. We keep tracking block size because skipping a row in this way
// might involve reading blocks along the way.
scannerContext.setSkippingRow(true);

Cell next;
while ((next = this.storeHeap.peek()) != null && CellUtil.matchingRows(next, curRowCell)) {
// Check for thread interrupt status in case we have been signaled from
// #interruptRegionOperation.
region.checkInterrupt();
this.storeHeap.next(MOCKED_LIST);
this.storeHeap.next(MOCKED_LIST, scannerContext);
}

scannerContext.setSkippingRow(false);
resetFilters();

// Calling the hook in CP which allows it to do a fast forward
Expand Down
Loading