@@ -2078,6 +2078,7 @@ private void registerConfigurationObservers() {
configurationManager.registerObserver(this.cacheFlusher);
configurationManager.registerObserver(this.rpcServices);
configurationManager.registerObserver(this);
configurationManager.registerObserver(RegionScannerLimiter.create(conf));
}

/*

@@ -29,6 +29,7 @@
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellComparator;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.DoNotRetryIOException;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.PrivateCellUtil;
@@ -629,7 +630,8 @@ private boolean nextInternal(List<Cell> results, ScannerContext scannerContext)
}
}

private void incrementCountOfRowsFilteredMetric(ScannerContext scannerContext) {
private void incrementCountOfRowsFilteredMetric(ScannerContext scannerContext)
throws DoNotRetryIOException {
region.filteredReadRequestsCount.increment();
if (region.getMetrics() != null) {
region.getMetrics().updateFilteredRecords();
@@ -639,7 +641,15 @@ private void incrementCountOfRowsFilteredMetric(ScannerContext scannerContext) {
return;
}

scannerContext.getMetrics().countOfRowsFiltered.incrementAndGet();
long countOfRowsFiltered = scannerContext.getMetrics().countOfRowsFiltered.incrementAndGet();

Contributor:
We'd better not mix things up with metrics? Metrics is not a critical thing usually.

long maxRowsFilteredPerRequest = RegionScannerLimiter.get().getMaxRowsFilteredPerRequest();
if (maxRowsFilteredPerRequest > 0 && countOfRowsFiltered >= maxRowsFilteredPerRequest) {
String errMsg = String.format(
"Too many rows filtered, higher than the limit threshold of %s, so kill the scan request!",
maxRowsFilteredPerRequest);
LOG.warn("ScannerContext={}, errMsg={}", scannerContext, errMsg);
throw new DoNotRetryIOException(errMsg);
}
}

private void incrementCountOfRowsScannedMetric(ScannerContext scannerContext) {
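
The concern raised above is that the limit is enforced inside the metrics path: incrementCountOfRowsFilteredMetric returns early when the scanner context is not tracking metrics, so the new check only runs for requests that enabled scan metrics. A minimal sketch of an alternative that keeps enforcement separate from metrics (hypothetical names, not part of this patch):

import org.apache.hadoop.hbase.DoNotRetryIOException;

// Hypothetical sketch only: a per-scan counter that enforces the filtered-row
// limit independently of ScannerContext metrics, so the limit also applies
// when the client has not enabled scan metrics.
final class FilteredRowLimitChecker {
  private final long maxRowsFilteredPerRequest; // 0 disables the check
  private long rowsFiltered = 0;

  FilteredRowLimitChecker(long maxRowsFilteredPerRequest) {
    this.maxRowsFilteredPerRequest = maxRowsFilteredPerRequest;
  }

  void onRowFiltered() throws DoNotRetryIOException {
    rowsFiltered++;
    if (maxRowsFilteredPerRequest > 0 && rowsFiltered >= maxRowsFilteredPerRequest) {
      throw new DoNotRetryIOException("Too many rows filtered for this scan, limit="
        + maxRowsFilteredPerRequest + ", filtered=" + rowsFiltered);
    }
  }
}

The scanner would create one such checker per request and call onRowFiltered() wherever a row is filtered, leaving the metrics update purely informational.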
@@ -0,0 +1,89 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.regionserver;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.conf.ConfigurationObserver;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
* Limit max count of rows filtered per scan request.
*/
@InterfaceAudience.Private
public class RegionScannerLimiter implements ConfigurationObserver {

private static final Logger LOG = LoggerFactory.getLogger(RegionScannerLimiter.class);

public static final String HBASE_SERVER_SCANNER_MAX_ROWS_FILTERED_PER_REQUEST_KEY =
"hbase.server.scanner.max.rows.filtered.per.request";

private static RegionScannerLimiter INSTANCE;

Contributor:
Better not use the singleton pattern here. Just create a RegionScannerLimiter per region server and store it as a field of HRegionServer?

Contributor Author:
done.
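
A rough sketch of that suggestion, assuming the RegionScannerLimiter constructor is widened from private; the class, field, and accessor names below are placeholders, and the actual follow-up commit is not shown in this diff:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.conf.ConfigurationManager;

// Hypothetical sketch: one limiter per region server, held as a field and
// registered with the server's ConfigurationManager, instead of a static
// singleton. Condenses what would be changes inside HRegionServer itself.
class PerServerLimiterSketch {
  private final RegionScannerLimiter regionScannerLimiter;

  PerServerLimiterSketch(Configuration conf, ConfigurationManager configurationManager) {
    this.regionScannerLimiter = new RegionScannerLimiter(conf);
    // register the per-server instance so onConfigurationChange() still fires
    configurationManager.registerObserver(regionScannerLimiter);
  }

  RegionScannerLimiter getRegionScannerLimiter() {
    return regionScannerLimiter;
  }
}

Scanner code would then reach the limiter through the region server (for example via a getRegionScannerLimiter() accessor on the services object it already holds) instead of calling the static RegionScannerLimiter.get().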


// Max count of rows filtered per request. If zero, it means no limitation.
private volatile long maxRowsFilteredPerRequest = 0;

private RegionScannerLimiter(Configuration conf) {
updateLimiterConf(conf, HBASE_SERVER_SCANNER_MAX_ROWS_FILTERED_PER_REQUEST_KEY);
}

private void updateLimiterConf(Configuration conf, String configKey) {

Contributor:
The config key is always HBASE_SERVER_SCANNER_MAX_ROWS_FILTERED_PER_REQUEST_KEY? Do we really need to pass it as a parameter?

Contributor Author:
done.

try {
if (conf.get(configKey) == null) {
return;
}

long targetValue = conf.getLong(configKey, -1);
if (targetValue < 0) {
LOG.warn("Invalid parameter, should be greater than or equal to zero, target value: {}",
targetValue);
return;
}
if (maxRowsFilteredPerRequest == targetValue) {
return;
}

LOG.info("Config key={}, old value={}, new value={}", configKey, maxRowsFilteredPerRequest,
targetValue);
this.maxRowsFilteredPerRequest = targetValue;
} catch (Exception e) {
LOG.error("Failed to update config key: {}", configKey, e);
}
}
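
For reference, the parameter-less form asked about in the comment above could look roughly like this (hypothetical; the follow-up commit is not shown in this diff):

// Hypothetical sketch: the key never varies, so read it directly instead of
// passing it in. Absent key keeps the current value; negative values are rejected.
private void updateLimiterConf(Configuration conf) {
  String key = HBASE_SERVER_SCANNER_MAX_ROWS_FILTERED_PER_REQUEST_KEY;
  if (conf.get(key) == null) {
    return;
  }
  long targetValue = conf.getLong(key, -1);
  if (targetValue < 0) {
    LOG.warn("Invalid value {} for {}, must be greater than or equal to zero", targetValue, key);
    return;
  }
  this.maxRowsFilteredPerRequest = targetValue;
}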

public long getMaxRowsFilteredPerRequest() {
return this.maxRowsFilteredPerRequest;
}

public static RegionScannerLimiter get() {
return INSTANCE;
}

public static synchronized RegionScannerLimiter create(Configuration conf) {
if (INSTANCE == null) {
INSTANCE = new RegionScannerLimiter(conf);
}
return INSTANCE;
}

@Override
public void onConfigurationChange(Configuration conf) {
updateLimiterConf(conf, HBASE_SERVER_SCANNER_MAX_ROWS_FILTERED_PER_REQUEST_KEY);
}
}
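
One usage note on the new key: because the limiter is registered as a ConfigurationObserver, the limit can be adjusted without restarting the region server. An illustrative client snippet, not part of the patch, that pushes an online configuration reload after hbase.server.scanner.max.rows.filtered.per.request has been changed in the servers' hbase-site.xml:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;

// Illustrative operator-side snippet: ask each region server to reload its
// configuration online, which notifies ConfigurationObservers such as the
// limiter above (similar to the update_config command in the HBase shell).
public class ReloadScannerLimiterConfig {
  public static void main(String[] args) throws Exception {
    Configuration conf = HBaseConfiguration.create();
    try (Connection connection = ConnectionFactory.createConnection(conf);
         Admin admin = connection.getAdmin()) {
      for (ServerName server : admin.getRegionServers()) {
        admin.updateConfiguration(server);
      }
    }
  }
}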
@@ -18,12 +18,16 @@
package org.apache.hadoop.hbase;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertThrows;
import static org.junit.Assert.assertTrue;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.client.RegionLocator;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
@@ -39,6 +43,8 @@
import org.apache.hadoop.hbase.filter.RowFilter;
import org.apache.hadoop.hbase.filter.SingleColumnValueExcludeFilter;
import org.apache.hadoop.hbase.filter.SingleColumnValueFilter;
import org.apache.hadoop.hbase.regionserver.HRegionServer;
import org.apache.hadoop.hbase.regionserver.RegionScannerLimiter;
import org.apache.hadoop.hbase.testclassification.LargeTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.junit.AfterClass;
@@ -263,6 +269,40 @@ public void testRowsFilteredMetric() throws Exception {
testRowsSeenMetric(baseScan);
}

@Test
public void testRowsFilteredMetricLimiter() throws Exception {
// Base scan configuration
Scan baseScan;
baseScan = new Scan();
baseScan.setScanMetricsEnabled(true);

// No matching column value should exist in any row. Filter all rows
Filter filter =
new SingleColumnValueFilter(FAMILIES[0], QUALIFIERS[0], CompareOperator.NOT_EQUAL, VALUE);
testRowsFilteredMetric(baseScan, filter, ROWS.length);

HRegionServer rs1;
// update server side max count of rows filtered config.
try (RegionLocator locator = TEST_UTIL.getConnection().getRegionLocator(TABLE.getName())) {
RegionInfo firstHRI = locator.getAllRegionLocations().get(0).getRegion();
rs1 = TEST_UTIL.getHBaseCluster()
.getRegionServer(TEST_UTIL.getHBaseCluster().getServerWith(firstHRI.getRegionName()));
}

Configuration conf = TEST_UTIL.getConfiguration();
// set max rows filtered limitation.
conf.setLong(RegionScannerLimiter.HBASE_SERVER_SCANNER_MAX_ROWS_FILTERED_PER_REQUEST_KEY, 5);
rs1.getConfigurationManager().notifyAllObservers(conf);

assertThrows("Should throw a DoNotRetryIOException when too many rows have been filtered.",
DoNotRetryIOException.class, () -> testRowsFilteredMetric(baseScan, filter, ROWS.length));

// no max rows filtered limitation.
conf.setLong(RegionScannerLimiter.HBASE_SERVER_SCANNER_MAX_ROWS_FILTERED_PER_REQUEST_KEY, 0);
rs1.getConfigurationManager().notifyAllObservers(conf);
testRowsFilteredMetric(baseScan, filter, ROWS.length);
}

private void testRowsFilteredMetric(Scan baseScan) throws Exception {
testRowsFilteredMetric(baseScan, null, 0);
