Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@
import static org.apache.hadoop.hbase.client.ConnectionUtils.isRemote;
import static org.apache.hadoop.hbase.client.ConnectionUtils.timelineConsistentRead;
import static org.apache.hadoop.hbase.util.FutureUtils.addListener;

import io.opentelemetry.api.trace.Span;
import io.opentelemetry.api.trace.StatusCode;
import io.opentelemetry.context.Scope;
Expand All @@ -40,12 +39,11 @@
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.metrics.ScanMetrics;
import org.apache.hadoop.hbase.client.trace.TableOperationSpanBuilder;
import org.apache.hadoop.hbase.filter.ScanRangeOptimizer;
import org.apache.hadoop.hbase.ipc.HBaseRpcController;
import org.apache.hadoop.hbase.trace.TraceUtil;
import org.apache.yetus.audience.InterfaceAudience;

import org.apache.hbase.thirdparty.io.netty.util.Timer;

import org.apache.hadoop.hbase.shaded.protobuf.RequestConverter;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ClientService;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ClientService.Interface;
Expand Down Expand Up @@ -105,6 +103,7 @@ public AsyncClientScanner(Scan scan, AdvancedScanResultConsumer consumer, TableN
if (scan.getStopRow() == null) {
scan.withStopRow(EMPTY_END_ROW, scan.includeStopRow());
}
ScanRangeOptimizer.optimize(scan);
this.scan = scan;
this.consumer = consumer;
this.tableName = tableName;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -160,6 +160,8 @@ public class Scan extends Query {

private boolean needCursorResult = false;

private boolean enableOptimization = false;

/**
* Create a Scan operation across all rows.
*/
Expand Down Expand Up @@ -990,4 +992,20 @@ public boolean isNeedCursorResult() {
public static Scan createScanFromCursor(Cursor cursor) {
return new Scan().withStartRow(cursor.getRow());
}

/**
* allow optimization of start row and stop row based on {@link Filter} by
* {@link org.apache.hadoop.hbase.filter.ScanRangeOptimizer}
*/
public Scan enableOptimization() {
this.enableOptimization = true;
return this;
}

/**
* Whether to allow optimization of start row and stop row based on {@link Filter}
*/
public boolean optimizationEnabled() {
return enableOptimization;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,179 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.filter;

import static org.apache.hadoop.hbase.HConstants.EMPTY_END_ROW;
import static org.apache.hadoop.hbase.HConstants.EMPTY_START_ROW;

import org.apache.hadoop.hbase.client.ClientUtil;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.yetus.audience.InterfaceAudience;

/**
 * Optimizes a scan's row range using bounds implied by its filters, to reduce unnecessary
 * reading of data. The range is only ever narrowed, never widened, so a caller-supplied
 * start/stop row is always respected.
 */
@InterfaceAudience.Private
public final class ScanRangeOptimizer {

  private ScanRangeOptimizer() {
    // utility class, no instances
  }

  /**
   * Narrows the scan's [startRow, stopRow) range in place using bounds derived from the scan's
   * filter.
   * <p>
   * A filter-derived bound is applied only when it is tighter than the bound already configured
   * on the scan (an absent or empty row means "unbounded"), so rows outside the caller's
   * requested range are never read. Derived boundaries are applied inclusively; if a boundary
   * row is actually unwanted it is still rejected by the filters themselves.
   * <p>
   * No-op unless {@link Scan#optimizationEnabled()} is true and the scan has a filter.
   * @param scan the scan to optimize in place
   */
  public static void optimize(Scan scan) {
    if (!scan.optimizationEnabled() || !scan.hasFilter()) {
      return;
    }
    byte[] largestPossibleStartRow = findLargestPossibleStartRow(scan.getFilter());
    if (largestPossibleStartRow != null) {
      byte[] currentStart = scan.getStartRow();
      // only tighten: replace the start row when there is no effective bound yet, or the
      // derived bound is strictly larger
      if (currentStart == null || Bytes.equals(currentStart, EMPTY_START_ROW)
        || Bytes.compareTo(largestPossibleStartRow, currentStart) > 0) {
        scan.withStartRow(largestPossibleStartRow, /* inclusive= */true);
      }
    }
    byte[] smallestPossibleStopRow = findSmallestPossibleStopRow(scan.getFilter());
    if (smallestPossibleStopRow != null) {
      byte[] currentStop = scan.getStopRow();
      // only tighten: an empty stop row means "unbounded", so any derived bound is tighter
      if (currentStop == null || Bytes.equals(currentStop, EMPTY_END_ROW)
        || Bytes.compareTo(smallestPossibleStopRow, currentStop) < 0) {
        scan.withStopRow(smallestPossibleStopRow, /* inclusive= */true);
      }
    }
  }

  /**
   * Returns the largest row key known to precede every row the filter can accept (a usable
   * inclusive start row), or null when no bound can be derived.
   */
  private static byte[] findLargestPossibleStartRow(Filter filter) {
    if (filter instanceof PrefixFilter) {
      // every matching row starts with the prefix, so no matching row sorts before it
      return ((PrefixFilter) filter).getPrefix();
    } else if (filter instanceof RowFilter) {
      return findLargestPossibleStartRowInRowFilter((RowFilter) filter);
    } else if (filter instanceof FilterListWithAND) {
      // AND: every sub-filter's bound applies, so take the tightest (largest) one;
      // sub-filters without a bound do not loosen the result
      return findLargestPossibleStartRowInFilterList((FilterListWithAND) filter, Bytes::max,
        false);
    } else if (filter instanceof FilterListWithOR) {
      // OR: a row only needs to satisfy one branch, so only the loosest (smallest) bound is
      // safe, and an unbounded branch makes the whole list unbounded
      return findLargestPossibleStartRowInFilterList((FilterListWithOR) filter, Bytes::min,
        true);
    }
    return null;
  }

  private static byte[] findLargestPossibleStartRowInRowFilter(RowFilter filter) {
    // only a plain binary comparison yields a usable row bound; for regex, substring, etc.
    // the comparator's value bytes are a pattern, not a row key
    if (!(filter.getComparator() instanceof BinaryComparator)) {
      return null;
    }
    switch (filter.getCompareOperator()) {
      case GREATER:
      case GREATER_OR_EQUAL:
      case EQUAL:
        // for GREATER the bound row itself is unwanted, but the filter will reject it
        return filter.getComparator().getValue();
      default:
        return null;
    }
  }

  private static byte[] findLargestPossibleStartRowInFilterList(FilterListBase filterList,
    Aggregator<byte[]> aggregator, boolean unboundedChildClearsBound) {
    byte[] ret = null;
    for (Filter filter : filterList.getFilters()) {
      // recurse through the top-level dispatcher so a nested AND/OR list selects its own
      // aggregation, not the parent's
      byte[] tmp = findLargestPossibleStartRow(filter);
      if (tmp == null) {
        if (unboundedChildClearsBound) {
          // OR semantics: one unconstrained branch makes the whole list unconstrained
          return null;
        }
        continue;
      }
      ret = ret == null ? tmp : aggregator.aggregate(ret, tmp);
    }
    return ret;
  }

  /**
   * Returns the smallest row key known to follow every row the filter can accept (a usable
   * inclusive stop row), or null when no bound can be derived.
   */
  private static byte[] findSmallestPossibleStopRow(Filter filter) {
    if (filter instanceof PrefixFilter) {
      // the closest row key after the prefix range is an upper bound for all matches
      return ClientUtil
        .calculateTheClosestNextRowKeyForPrefix(((PrefixFilter) filter).getPrefix());
    } else if (filter instanceof RowFilter) {
      return findSmallestPossibleStopRowInRowFilter((RowFilter) filter);
    } else if (filter instanceof FilterListWithAND) {
      // AND: take the tightest (smallest) stop bound
      return findSmallestPossibleStopRowInFilterList((FilterListWithAND) filter, Bytes::min,
        false);
    } else if (filter instanceof FilterListWithOR) {
      // OR: only the loosest (largest) stop bound is safe; an unbounded branch clears it
      return findSmallestPossibleStopRowInFilterList((FilterListWithOR) filter, Bytes::max,
        true);
    }
    return null;
  }

  private static byte[] findSmallestPossibleStopRowInRowFilter(RowFilter filter) {
    // same restriction as for start rows: only binary comparators carry a row bound
    if (!(filter.getComparator() instanceof BinaryComparator)) {
      return null;
    }
    switch (filter.getCompareOperator()) {
      case LESS:
      case LESS_OR_EQUAL:
      case EQUAL:
        // for LESS the bound row itself is unwanted, but the filter will reject it
        return filter.getComparator().getValue();
      default:
        return null;
    }
  }

  private static byte[] findSmallestPossibleStopRowInFilterList(FilterListBase filterList,
    Aggregator<byte[]> aggregator, boolean unboundedChildClearsBound) {
    byte[] ret = null;
    for (Filter filter : filterList.getFilters()) {
      // recurse through the top-level dispatcher so a nested AND/OR list selects its own
      // aggregation, not the parent's
      byte[] tmp = findSmallestPossibleStopRow(filter);
      if (tmp == null) {
        if (unboundedChildClearsBound) {
          // OR semantics: one unconstrained branch makes the whole list unconstrained
          return null;
        }
        continue;
      }
      ret = ret == null ? tmp : aggregator.aggregate(ret, tmp);
    }
    return ret;
  }

  /** Picks one of two candidate bounds (e.g. {@code Bytes::min} / {@code Bytes::max}). */
  private interface Aggregator<T> {
    T aggregate(T first, T second);
  }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.filter;

import org.apache.hadoop.hbase.CompareOperator;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.client.ClientUtil;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.testclassification.MiscTests;
import org.apache.hadoop.hbase.testclassification.SmallTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.junit.Assert;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import java.util.Arrays;

@Category({ MiscTests.class, SmallTests.class })
public class TestScanRangeOptimizer {

  @ClassRule
  public static final HBaseClassTestRule CLASS_RULE =
    HBaseClassTestRule.forClass(TestScanRangeOptimizer.class);

  @Test
  public void testOptimizePrefixFilter() {
    // scan data where rowkey starts with 'aaa'
    byte[] prefix = Bytes.toBytes("aaa");
    byte[] prefixNext = ClientUtil.calculateTheClosestNextRowKeyForPrefix(prefix); // ="aab"

    // the optimization is opt-in: without enableOptimization() optimize() is a no-op
    Scan scan = new Scan().enableOptimization().setFilter(new PrefixFilter(prefix));
    ScanRangeOptimizer.optimize(scan);
    Assert.assertEquals(0, Bytes.compareTo(prefix, scan.getStartRow()));
    Assert.assertEquals(0, Bytes.compareTo(prefixNext, scan.getStopRow()));
  }

  @Test
  public void testOptimizeRowFilter() {
    // scan data where rowkey > 'hhh' and rowkey < 'mmm'
    RowFilter low = new RowFilter(CompareOperator.GREATER,
      new BinaryComparator(Bytes.toBytes("hhh")));
    RowFilter high = new RowFilter(CompareOperator.LESS,
      new BinaryComparator(Bytes.toBytes("mmm")));

    // the filter-derived range lies inside [aaa, zzz), so both bounds should tighten
    Scan scan = new Scan()
      .withStartRow(Bytes.toBytes("aaa"))
      .withStopRow(Bytes.toBytes("zzz"))
      .enableOptimization()
      .setFilter(new FilterListWithAND(Arrays.asList(low, high)));
    ScanRangeOptimizer.optimize(scan);
    Assert.assertEquals(0, Bytes.compareTo(Bytes.toBytes("hhh"), scan.getStartRow()));
    Assert.assertEquals(0, Bytes.compareTo(Bytes.toBytes("mmm"), scan.getStopRow()));
  }

  @Test
  public void testOptimizeFilterList() {
    // scan data where (rowkey start with 'prefix') or (rowkey >= 'hhh' and rowkey < 'mmm')
    PrefixFilter prefixFilter = new PrefixFilter(Bytes.toBytes("prefix"));
    FilterListWithAND and = new FilterListWithAND(Arrays.asList(
      new RowFilter(CompareOperator.GREATER_OR_EQUAL, new BinaryComparator(Bytes.toBytes("hhh"))),
      new RowFilter(CompareOperator.LESS_OR_EQUAL, new BinaryComparator(Bytes.toBytes("mmm")))));
    FilterListWithOR or = new FilterListWithOR(Arrays.asList(and, prefixFilter));

    Scan scan = new Scan().enableOptimization().setFilter(or);
    ScanRangeOptimizer.optimize(scan);
    // OR takes the loosest bounds: min start = "hhh", max stop = next-after-prefix = "prefiy"
    Assert.assertEquals(0, Bytes.compareTo(Bytes.toBytes("hhh"), scan.getStartRow()));
    Assert.assertEquals(0, Bytes.compareTo(Bytes.toBytes("prefiy"), scan.getStopRow()));
  }
}
16 changes: 16 additions & 0 deletions hbase-common/src/main/java/org/apache/hadoop/hbase/util/Bytes.java
Original file line number Diff line number Diff line change
Expand Up @@ -2530,4 +2530,20 @@ public static int findCommonPrefix(byte[] left, byte[] right, int leftLength, in
return CommonPrefixerHolder.BEST_COMMON_PREFIXER.findCommonPrefix(left, leftOffset, leftLength,
right, rightOffset, rightLength);
}

public static byte[] min(byte[] left, byte[] right) {
if (compareTo(left, right) <= 0) {
return left;
} else {
return right;
}
}

public static byte[] max(byte[] left, byte[] right) {
if (compareTo(left, right) >= 0) {
return left;
} else {
return right;
}
}
}