diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncClientScanner.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncClientScanner.java
index b61f5b80c9e7..c50c449cb430 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncClientScanner.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncClientScanner.java
@@ -27,7 +27,6 @@
 import static org.apache.hadoop.hbase.client.ConnectionUtils.isRemote;
 import static org.apache.hadoop.hbase.client.ConnectionUtils.timelineConsistentRead;
 import static org.apache.hadoop.hbase.util.FutureUtils.addListener;
-
 import io.opentelemetry.api.trace.Span;
 import io.opentelemetry.api.trace.StatusCode;
 import io.opentelemetry.context.Scope;
@@ -40,12 +39,11 @@
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.client.metrics.ScanMetrics;
 import org.apache.hadoop.hbase.client.trace.TableOperationSpanBuilder;
+import org.apache.hadoop.hbase.filter.ScanRangeOptimizer;
 import org.apache.hadoop.hbase.ipc.HBaseRpcController;
 import org.apache.hadoop.hbase.trace.TraceUtil;
 import org.apache.yetus.audience.InterfaceAudience;
-
 import org.apache.hbase.thirdparty.io.netty.util.Timer;
-
 import org.apache.hadoop.hbase.shaded.protobuf.RequestConverter;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ClientService;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ClientService.Interface;
@@ -105,6 +103,7 @@ public AsyncClientScanner(Scan scan, AdvancedScanResultConsumer consumer, TableN
     if (scan.getStopRow() == null) {
       scan.withStopRow(EMPTY_END_ROW, scan.includeStopRow());
     }
+    ScanRangeOptimizer.optimize(scan);
     this.scan = scan;
     this.consumer = consumer;
     this.tableName = tableName;
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Scan.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Scan.java
index b9adefb40cde..8bb685ac5fa3 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Scan.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Scan.java
@@ -160,6 +160,8 @@ public class Scan extends Query {
 
   private boolean needCursorResult = false;
 
+  private boolean enableOptimization = false;
+
   /**
    * Create a Scan operation across all rows.
    */
@@ -990,4 +992,20 @@ public boolean isNeedCursorResult() {
   public static Scan createScanFromCursor(Cursor cursor) {
     return new Scan().withStartRow(cursor.getRow());
   }
+
+  /**
+   * Allow optimization of the start row and stop row based on this scan's {@link Filter}, as
+   * performed by {@link org.apache.hadoop.hbase.filter.ScanRangeOptimizer}.
+   */
+  public Scan enableOptimization() {
+    this.enableOptimization = true;
+    return this;
+  }
+
+  /**
+   * Returns whether optimization of the start row and stop row based on {@link Filter} is allowed.
+   */
+  public boolean optimizationEnabled() {
+    return enableOptimization;
+  }
 }
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/filter/ScanRangeOptimizer.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/filter/ScanRangeOptimizer.java
new file mode 100644
index 000000000000..b9a654153591
--- /dev/null
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/filter/ScanRangeOptimizer.java
@@ -0,0 +1,174 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.filter;
+
+import static org.apache.hadoop.hbase.HConstants.EMPTY_END_ROW;
+import static org.apache.hadoop.hbase.HConstants.EMPTY_START_ROW;
+
+import org.apache.hadoop.hbase.client.ClientUtil;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.yetus.audience.InterfaceAudience;
+
+/**
+ * Optimizes a scan's row range based on its {@link Filter} to reduce unnecessary reading of data.
+ * The range is only ever narrowed, never widened, so the results of the scan are unchanged.
+ */
+@InterfaceAudience.Private
+public final class ScanRangeOptimizer {
+
+  private ScanRangeOptimizer() {
+  }
+
+  public static void optimize(Scan scan) {
+    if (!scan.optimizationEnabled() || !scan.hasFilter()) {
+      return;
+    }
+    byte[] largestPossibleStartRow = findLargestPossibleStartRow(scan.getFilter());
+    byte[] smallestPossibleStopRow = findSmallestPossibleStopRow(scan.getFilter());
+    // always include the derived start/stop row: if it is unwanted, it will be filtered out by
+    // the filters. only ever narrow the range, never loosen a bound the caller has already set.
+    if (largestPossibleStartRow != null) {
+      byte[] startRow = scan.getStartRow();
+      if (startRow == null || Bytes.equals(startRow, EMPTY_START_ROW)
+        || Bytes.compareTo(largestPossibleStartRow, startRow) > 0) {
+        scan.withStartRow(largestPossibleStartRow, /* inclusive= */ true);
+      }
+    }
+    if (smallestPossibleStopRow != null) {
+      byte[] stopRow = scan.getStopRow();
+      if (stopRow == null || Bytes.equals(stopRow, EMPTY_END_ROW)
+        || Bytes.compareTo(smallestPossibleStopRow, stopRow) < 0) {
+        scan.withStopRow(smallestPossibleStopRow, /* inclusive= */ true);
+      }
+    }
+  }
+
+  private static byte[] findLargestPossibleStartRow(Filter filter) {
+    if (filter instanceof PrefixFilter) {
+      return findLargestPossibleStartRowInPrefixFilter((PrefixFilter) filter);
+    } else if (filter instanceof RowFilter) {
+      return findLargestPossibleStartRowInRowFilter((RowFilter) filter);
+    } else if (filter instanceof FilterListWithAND) {
+      // AND intersects the children's ranges, so the largest child bound applies
+      return findLargestPossibleStartRowInFilterList((FilterListWithAND) filter, Bytes::max,
+        false);
+    } else if (filter instanceof FilterListWithOR) {
+      // OR unions the children's ranges, so only the smallest child bound is safe, and every
+      // child must provide one
+      return findLargestPossibleStartRowInFilterList((FilterListWithOR) filter, Bytes::min, true);
+    }
+    return null;
+  }
+
+  private static byte[] findLargestPossibleStartRowInPrefixFilter(PrefixFilter filter) {
+    return filter.getPrefix();
+  }
+
+  private static byte[] findLargestPossibleStartRowInRowFilter(RowFilter filter) {
+    // only a plain byte-wise comparator yields a usable row bound; comparators such as
+    // RegexStringComparator do not
+    if (!(filter.getComparator() instanceof BinaryComparator)) {
+      return null;
+    }
+    switch (filter.getCompareOperator()) {
+      case GREATER:
+      case GREATER_OR_EQUAL:
+      case EQUAL:
+        return filter.getComparator().getValue();
+      default:
+        return null;
+    }
+  }
+
+  /**
+   * @param requireAll whether every child must yield a bound (true for OR lists, where a single
+   *          unbounded child makes the whole list unbounded)
+   */
+  private static byte[] findLargestPossibleStartRowInFilterList(FilterListBase filterList,
+    Aggregator<byte[]> aggregator, boolean requireAll) {
+    byte[] ret = null;
+    for (Filter filter : filterList.getFilters()) {
+      // recurse through the dispatcher so that a nested AND/OR list picks its own aggregator
+      byte[] tmp = findLargestPossibleStartRow(filter);
+      if (tmp == null) {
+        if (requireAll) {
+          return null;
+        }
+        continue;
+      }
+      ret = ret == null ? tmp : aggregator.aggregate(ret, tmp);
+    }
+    return ret;
+  }
+
+  private static byte[] findSmallestPossibleStopRow(Filter filter) {
+    if (filter instanceof PrefixFilter) {
+      return findSmallestPossibleStopRowInPrefixFilter((PrefixFilter) filter);
+    } else if (filter instanceof RowFilter) {
+      return findSmallestPossibleStopRowInRowFilter((RowFilter) filter);
+    } else if (filter instanceof FilterListWithAND) {
+      return findSmallestPossibleStopRowInFilterList((FilterListWithAND) filter, Bytes::min,
+        false);
+    } else if (filter instanceof FilterListWithOR) {
+      return findSmallestPossibleStopRowInFilterList((FilterListWithOR) filter, Bytes::max,
+        true);
+    }
+    return null;
+  }
+
+  private static byte[] findSmallestPossibleStopRowInPrefixFilter(PrefixFilter filter) {
+    // the closest next row key after the prefix is an upper bound for all prefixed rows
+    return ClientUtil.calculateTheClosestNextRowKeyForPrefix(filter.getPrefix());
+  }
+
+  private static byte[] findSmallestPossibleStopRowInRowFilter(RowFilter filter) {
+    if (!(filter.getComparator() instanceof BinaryComparator)) {
+      return null;
+    }
+    switch (filter.getCompareOperator()) {
+      case LESS:
+      case LESS_OR_EQUAL:
+      case EQUAL:
+        return filter.getComparator().getValue();
+      default:
+        return null;
+    }
+  }
+
+  private static byte[] findSmallestPossibleStopRowInFilterList(FilterListBase filterList,
+    Aggregator<byte[]> aggregator, boolean requireAll) {
+    byte[] ret = null;
+    for (Filter filter : filterList.getFilters()) {
+      byte[] tmp = findSmallestPossibleStopRow(filter);
+      if (tmp == null) {
+        if (requireAll) {
+          return null;
+        }
+        continue;
+      }
+      ret = ret == null ? tmp : aggregator.aggregate(ret, tmp);
+    }
+    return ret;
+  }
+
+  @FunctionalInterface
+  private interface Aggregator<T> {
+    T aggregate(T first, T second);
+  }
+}
diff --git a/hbase-client/src/test/java/org/apache/hadoop/hbase/filter/TestScanRangeOptimizer.java b/hbase-client/src/test/java/org/apache/hadoop/hbase/filter/TestScanRangeOptimizer.java
new file mode 100644
index 000000000000..aaff80fbd1aa
--- /dev/null
+++ b/hbase-client/src/test/java/org/apache/hadoop/hbase/filter/TestScanRangeOptimizer.java
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.filter;
+
+import java.util.Arrays;
+
+import org.apache.hadoop.hbase.CompareOperator;
+import org.apache.hadoop.hbase.HBaseClassTestRule;
+import org.apache.hadoop.hbase.client.ClientUtil;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.testclassification.MiscTests;
+import org.apache.hadoop.hbase.testclassification.SmallTests;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.junit.Assert;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+@Category({ MiscTests.class, SmallTests.class })
+public class TestScanRangeOptimizer {
+
+  @ClassRule
+  public static final HBaseClassTestRule CLASS_RULE =
+    HBaseClassTestRule.forClass(TestScanRangeOptimizer.class);
+
+  @Test
+  public void testOptimizePrefixFilter() {
+    // scan data where the rowkey starts with 'aaa'
+    byte[] prefix = Bytes.toBytes("aaa");
+    byte[] prefixNext = ClientUtil.calculateTheClosestNextRowKeyForPrefix(prefix); // = "aab"
+
+    Scan scan = new Scan().enableOptimization().setFilter(new PrefixFilter(prefix));
+    ScanRangeOptimizer.optimize(scan);
+    Assert.assertEquals(0, Bytes.compareTo(prefix, scan.getStartRow()));
+    Assert.assertEquals(0, Bytes.compareTo(prefixNext, scan.getStopRow()));
+  }
+
+  @Test
+  public void testOptimizeRowFilter() {
+    // scan data where rowkey > 'hhh' and rowkey < 'mmm'
+    RowFilter low =
+      new RowFilter(CompareOperator.GREATER, new BinaryComparator(Bytes.toBytes("hhh")));
+    RowFilter high =
+      new RowFilter(CompareOperator.LESS, new BinaryComparator(Bytes.toBytes("mmm")));
+
+    // the filter bounds are narrower than [aaa, zzz), so both ends should be tightened
+    Scan scan = new Scan().enableOptimization()
+      .withStartRow(Bytes.toBytes("aaa"))
+      .withStopRow(Bytes.toBytes("zzz"))
+      .setFilter(new FilterListWithAND(Arrays.asList(low, high)));
+    ScanRangeOptimizer.optimize(scan);
+    Assert.assertEquals(0, Bytes.compareTo(Bytes.toBytes("hhh"), scan.getStartRow()));
+    Assert.assertEquals(0, Bytes.compareTo(Bytes.toBytes("mmm"), scan.getStopRow()));
+  }
+
+  @Test
+  public void testOptimizeFilterList() {
+    // scan data where (rowkey starts with 'prefix') or ('hhh' <= rowkey <= 'mmm')
+    PrefixFilter prefixFilter = new PrefixFilter(Bytes.toBytes("prefix"));
+    FilterListWithAND and = new FilterListWithAND(Arrays.asList(
+      new RowFilter(CompareOperator.GREATER_OR_EQUAL, new BinaryComparator(Bytes.toBytes("hhh"))),
+      new RowFilter(CompareOperator.LESS_OR_EQUAL, new BinaryComparator(Bytes.toBytes("mmm")))));
+    FilterListWithOR or = new FilterListWithOR(Arrays.asList(and, prefixFilter));
+
+    Scan scan = new Scan().enableOptimization().setFilter(or);
+    ScanRangeOptimizer.optimize(scan);
+    // start = min('hhh', 'prefix') = 'hhh'; stop = max('mmm', next('prefix')) = 'prefiy'
+    Assert.assertEquals(0, Bytes.compareTo(Bytes.toBytes("hhh"), scan.getStartRow()));
+    Assert.assertEquals(0, Bytes.compareTo(Bytes.toBytes("prefiy"), scan.getStopRow()));
+  }
+}
diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/util/Bytes.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/util/Bytes.java
index 96b3dbd4a8a5..80587d5d05ce 100644
--- a/hbase-common/src/main/java/org/apache/hadoop/hbase/util/Bytes.java
+++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/util/Bytes.java
@@ -2530,4 +2530,26 @@ public static int findCommonPrefix(byte[] left, byte[] right, int leftLength, in
     return CommonPrefixerHolder.BEST_COMMON_PREFIXER.findCommonPrefix(left, leftOffset, leftLength,
       right, rightOffset, rightLength);
   }
+
+  /**
+   * Returns the smaller of the two byte arrays, compared lexicographically.
+   */
+  public static byte[] min(byte[] left, byte[] right) {
+    if (compareTo(left, right) <= 0) {
+      return left;
+    } else {
+      return right;
+    }
+  }
+
+  /**
+   * Returns the larger of the two byte arrays, compared lexicographically.
+   */
+  public static byte[] max(byte[] left, byte[] right) {
+    if (compareTo(left, right) >= 0) {
+      return left;
+    } else {
+      return right;
+    }
+  }
 }
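
For reviewers, a minimal usage sketch (the row keys and the `order-2024/` prefix below are made up for illustration). The only new public surface is `Scan#enableOptimization()`, which is off by default; `AsyncClientScanner` invokes the optimizer before the scan is sent out, so the explicit `optimize(scan)` call here is only to make the narrowing visible:

```java
// Opt in per scan; existing callers see no behavior change.
Scan scan = new Scan()
  .enableOptimization()
  .setFilter(new PrefixFilter(Bytes.toBytes("order-2024/")));

// Normally run inside AsyncClientScanner; called directly here to show the effect.
ScanRangeOptimizer.optimize(scan);
// scan.getStartRow() is now "order-2024/" and scan.getStopRow() is "order-20240"
// (kept inclusive; an unwanted boundary row is still dropped by the PrefixFilter),
// so regions entirely outside the prefix are never contacted.
```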