From ce997b8f2b592d8b0d058cc7b436d8ef6b33606d Mon Sep 17 00:00:00 2001 From: Bryan Beaudreault Date: Sat, 25 Feb 2023 11:30:20 -0500 Subject: [PATCH 1/4] HBASE-27666 Allow preCompact hooks to return scanners whose cells can be shipped --- .../hbase/coprocessor/RegionObserver.java | 10 ++ .../hadoop/hbase/regionserver/Shipper.java | 3 +- .../regionserver/compactions/Compactor.java | 8 +- .../DelegatingInternalScanner.java | 15 ++- ...TestCompactionWithShippingCoprocessor.java | 94 +++++++++++++++++++ 5 files changed, 124 insertions(+), 6 deletions(-) create mode 100644 hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompactionWithShippingCoprocessor.java diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/RegionObserver.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/RegionObserver.java index d37013534b17..018826644acb 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/RegionObserver.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/RegionObserver.java @@ -53,6 +53,7 @@ import org.apache.hadoop.hbase.regionserver.RegionScanner; import org.apache.hadoop.hbase.regionserver.ScanOptions; import org.apache.hadoop.hbase.regionserver.ScanType; +import org.apache.hadoop.hbase.regionserver.Shipper; import org.apache.hadoop.hbase.regionserver.Store; import org.apache.hadoop.hbase.regionserver.StoreFile; import org.apache.hadoop.hbase.regionserver.StoreFileReader; @@ -277,6 +278,15 @@ default void preCompactScannerOpen(ObserverContext * {@link InternalScanner} with a custom implementation that is returned from this method. The * custom scanner can then inspect {@link org.apache.hadoop.hbase.Cell}s from the wrapped scanner, * applying its own policy to what gets written. + *

+ * If implementations are wrapping the passed in {@link InternalScanner}, they can also have their + * implementation implement {@link Shipper} and delegate to the original scanner. This will cause + * compactions to free up memory as they progress, which is especially important for people using + * off-heap memory pools. + *

+ * Keep in mind that when {@link Shipper#shipped()} is called, any cell references you maintain in + * your implementation may get corrupted. As such you should make sure to deep clone any cells + * that you need to keep reference to across invocations of shipped. * @param c the environment provided by the region server * @param store the store being compacted * @param scanner the scanner over existing data used in the store file rewriting diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Shipper.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Shipper.java index a7e60c07d298..f4f723ed9ea4 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Shipper.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Shipper.java @@ -18,6 +18,7 @@ package org.apache.hadoop.hbase.regionserver; import java.io.IOException; +import org.apache.hadoop.hbase.HBaseInterfaceAudience; import org.apache.yetus.audience.InterfaceAudience; /** @@ -25,7 +26,7 @@ * to server and fetch N rows/RPC. These are then shipped to client. At the end of every such batch * {@link #shipped()} will get called. 
*/ -@InterfaceAudience.Private +@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.COPROC) public interface Shipper { /** diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/Compactor.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/Compactor.java index bd9ce6035ad4..363fa29909e0 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/Compactor.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/Compactor.java @@ -46,10 +46,10 @@ import org.apache.hadoop.hbase.regionserver.HStore; import org.apache.hadoop.hbase.regionserver.HStoreFile; import org.apache.hadoop.hbase.regionserver.InternalScanner; -import org.apache.hadoop.hbase.regionserver.KeyValueScanner; import org.apache.hadoop.hbase.regionserver.ScanInfo; import org.apache.hadoop.hbase.regionserver.ScanType; import org.apache.hadoop.hbase.regionserver.ScannerContext; +import org.apache.hadoop.hbase.regionserver.Shipper; import org.apache.hadoop.hbase.regionserver.ShipperListener; import org.apache.hadoop.hbase.regionserver.StoreFileReader; import org.apache.hadoop.hbase.regionserver.StoreFileScanner; @@ -433,7 +433,7 @@ protected boolean performCompaction(FileDetails fd, InternalScanner scanner, Cel ScannerContext.newBuilder().setBatchLimit(compactionKVMax).build(); throughputController.start(compactionName); - KeyValueScanner kvs = (scanner instanceof KeyValueScanner) ? (KeyValueScanner) scanner : null; + Shipper shipper = (scanner instanceof Shipper) ? 
(Shipper) scanner : null; long shippedCallSizeLimit = (long) request.getFiles().size() * this.store.getColumnFamilyDescriptor().getBlocksize(); try { @@ -473,7 +473,7 @@ protected boolean performCompaction(FileDetails fd, InternalScanner scanner, Cel return false; } } - if (kvs != null && bytesWrittenProgressForShippedCall > shippedCallSizeLimit) { + if (shipper != null && bytesWrittenProgressForShippedCall > shippedCallSizeLimit) { if (lastCleanCell != null) { // HBASE-16931, set back sequence id to avoid affecting scan order unexpectedly. // ShipperListener will do a clone of the last cells it refer, so need to set back @@ -489,7 +489,7 @@ protected boolean performCompaction(FileDetails fd, InternalScanner scanner, Cel // we are doing the similar thing. In between the compaction (after every N cells // written with collective size of 'shippedCallSizeLimit') we will call shipped which // may clear prevBlocks list. - kvs.shipped(); + shipper.shipped(); bytesWrittenProgressForShippedCall = 0; } if (lastCleanCell != null) { diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/DelegatingInternalScanner.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/DelegatingInternalScanner.java index b7ba8086280c..18a18eea26b9 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/DelegatingInternalScanner.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/DelegatingInternalScanner.java @@ -19,11 +19,17 @@ import java.io.IOException; import java.util.List; +import java.util.concurrent.atomic.AtomicInteger; import org.apache.hadoop.hbase.Cell; import org.apache.yetus.audience.InterfaceAudience; @InterfaceAudience.Private -public class DelegatingInternalScanner implements InternalScanner { +public class DelegatingInternalScanner implements InternalScanner, Shipper { + + /** + * For use in testing whether shipped is called + */ + public static final AtomicInteger SHIPPED_COUNT = new 
AtomicInteger(); protected final InternalScanner scanner; @@ -41,4 +47,11 @@ public void close() throws IOException { scanner.close(); } + @Override + public void shipped() throws IOException { + if (scanner instanceof Shipper) { + SHIPPED_COUNT.incrementAndGet(); + ((Shipper) scanner).shipped(); + } + } } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompactionWithShippingCoprocessor.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompactionWithShippingCoprocessor.java new file mode 100644 index 000000000000..fab54b14b6ee --- /dev/null +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompactionWithShippingCoprocessor.java @@ -0,0 +1,94 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.hbase.regionserver; + +import static org.hamcrest.MatcherAssert.*; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.HBaseClassTestRule; +import org.apache.hadoop.hbase.HBaseTestingUtil; +import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.client.RegionLocator; +import org.apache.hadoop.hbase.client.Table; +import org.apache.hadoop.hbase.testclassification.ClientTests; +import org.apache.hadoop.hbase.testclassification.LargeTests; +import org.apache.hadoop.hbase.util.Bytes; +import org.hamcrest.Matchers; +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.junit.ClassRule; +import org.junit.Rule; +import org.junit.Test; +import org.junit.experimental.categories.Category; +import org.junit.rules.TestName; + +@Category({ LargeTests.class, ClientTests.class }) +public class TestCompactionWithShippingCoprocessor { + + @ClassRule + public static final HBaseClassTestRule CLASS_RULE = + HBaseClassTestRule.forClass(TestCompactionWithShippingCoprocessor.class); + + protected final static HBaseTestingUtil TEST_UTIL = new HBaseTestingUtil(); + private static final byte[] FAMILY = Bytes.toBytes("testFamily"); + + @Rule + public TestName name = new TestName(); + + /** + * @throws java.lang.Exception + */ + @BeforeClass + public static void setUpBeforeClass() throws Exception { + Configuration conf = TEST_UTIL.getConfiguration(); + conf.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 0);// do not retry + TEST_UTIL.startMiniCluster(1); + } + + /** + * @throws java.lang.Exception + */ + @AfterClass + public static void tearDownAfterClass() throws Exception { + TEST_UTIL.shutdownMiniCluster(); + } + + @Test + public void testCoprocScannersExtendingShipperGetShipped() throws Exception { + int shippedCountBefore = DelegatingInternalScanner.SHIPPED_COUNT.get(); + final TableName tableName = TableName.valueOf(name.getMethodName()); + // 
Create a table with block size as 1024 + final Table table = TEST_UTIL.createTable(tableName, new byte[][] { FAMILY }, 1, 1024, + NoOpScanPolicyObserver.class.getName()); + TEST_UTIL.loadTable(table, FAMILY); + TEST_UTIL.flush(); + try { + // get the block cache and region + RegionLocator locator = TEST_UTIL.getConnection().getRegionLocator(tableName); + String regionName = locator.getAllRegionLocations().get(0).getRegion().getEncodedName(); + HRegion region = TEST_UTIL.getRSForFirstRegionInTable(tableName).getRegion(regionName); + // trigger a major compaction + TEST_UTIL.compact(true); + assertThat(DelegatingInternalScanner.SHIPPED_COUNT.get(), + Matchers.greaterThan(shippedCountBefore)); + } finally { + table.close(); + } + } +} From f3fa556623051719df6da3c80a76bf30913e6615 Mon Sep 17 00:00:00 2001 From: Bryan Beaudreault Date: Sat, 25 Feb 2023 12:32:39 -0500 Subject: [PATCH 2/4] checkstyle --- .../TestCompactionWithShippingCoprocessor.java | 8 +------- 1 file changed, 1 insertion(+), 7 deletions(-) diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompactionWithShippingCoprocessor.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompactionWithShippingCoprocessor.java index fab54b14b6ee..ae0b5392fb34 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompactionWithShippingCoprocessor.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompactionWithShippingCoprocessor.java @@ -17,7 +17,7 @@ */ package org.apache.hadoop.hbase.regionserver; -import static org.hamcrest.MatcherAssert.*; +import static org.hamcrest.MatcherAssert.assertThat; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.HBaseClassTestRule; @@ -51,9 +51,6 @@ public class TestCompactionWithShippingCoprocessor { @Rule public TestName name = new TestName(); - /** - * @throws java.lang.Exception - */ @BeforeClass public static void setUpBeforeClass() throws 
Exception { Configuration conf = TEST_UTIL.getConfiguration(); @@ -61,9 +58,6 @@ public static void setUpBeforeClass() throws Exception { TEST_UTIL.startMiniCluster(1); } - /** - * @throws java.lang.Exception - */ @AfterClass public static void tearDownAfterClass() throws Exception { TEST_UTIL.shutdownMiniCluster(); From 18d66c39753e1b69dea31d5098713516dab36b98 Mon Sep 17 00:00:00 2001 From: Bryan Beaudreault Date: Tue, 28 Feb 2023 09:37:54 -0500 Subject: [PATCH 3/4] Don't overload DelegatingInternalScanner --- .../DelegatingInternalScanner.java | 15 +--- ...TestCompactionWithShippingCoprocessor.java | 72 +++++++++++++++++-- 2 files changed, 66 insertions(+), 21 deletions(-) diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/DelegatingInternalScanner.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/DelegatingInternalScanner.java index 18a18eea26b9..b7ba8086280c 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/DelegatingInternalScanner.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/DelegatingInternalScanner.java @@ -19,17 +19,11 @@ import java.io.IOException; import java.util.List; -import java.util.concurrent.atomic.AtomicInteger; import org.apache.hadoop.hbase.Cell; import org.apache.yetus.audience.InterfaceAudience; @InterfaceAudience.Private -public class DelegatingInternalScanner implements InternalScanner, Shipper { - - /** - * For use in testing whether shipped is called - */ - public static final AtomicInteger SHIPPED_COUNT = new AtomicInteger(); +public class DelegatingInternalScanner implements InternalScanner { protected final InternalScanner scanner; @@ -47,11 +41,4 @@ public void close() throws IOException { scanner.close(); } - @Override - public void shipped() throws IOException { - if (scanner instanceof Shipper) { - SHIPPED_COUNT.incrementAndGet(); - ((Shipper) scanner).shipped(); - } - } } diff --git 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompactionWithShippingCoprocessor.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompactionWithShippingCoprocessor.java index ae0b5392fb34..ce9db479f543 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompactionWithShippingCoprocessor.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompactionWithShippingCoprocessor.java @@ -18,16 +18,26 @@ package org.apache.hadoop.hbase.regionserver; import static org.hamcrest.MatcherAssert.assertThat; - +import java.io.IOException; +import java.util.List; +import java.util.Optional; +import java.util.concurrent.atomic.AtomicInteger; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.Cell; import org.apache.hadoop.hbase.HBaseClassTestRule; import org.apache.hadoop.hbase.HBaseTestingUtil; import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.client.RegionLocator; import org.apache.hadoop.hbase.client.Table; -import org.apache.hadoop.hbase.testclassification.ClientTests; -import org.apache.hadoop.hbase.testclassification.LargeTests; +import org.apache.hadoop.hbase.coprocessor.ObserverContext; +import org.apache.hadoop.hbase.coprocessor.RegionCoprocessor; +import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment; +import org.apache.hadoop.hbase.coprocessor.RegionObserver; +import org.apache.hadoop.hbase.regionserver.compactions.CompactionLifeCycleTracker; +import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest; +import org.apache.hadoop.hbase.testclassification.CoprocessorTests; +import org.apache.hadoop.hbase.testclassification.MediumTests; import org.apache.hadoop.hbase.util.Bytes; import org.hamcrest.Matchers; import org.junit.AfterClass; @@ -38,9 +48,11 @@ import org.junit.experimental.categories.Category; import org.junit.rules.TestName; -@Category({ 
LargeTests.class, ClientTests.class }) +@Category({ MediumTests.class, CoprocessorTests.class }) public class TestCompactionWithShippingCoprocessor { + private static final AtomicInteger SHIPPED_COUNT = new AtomicInteger(); + @ClassRule public static final HBaseClassTestRule CLASS_RULE = HBaseClassTestRule.forClass(TestCompactionWithShippingCoprocessor.class); @@ -63,13 +75,17 @@ public static void tearDownAfterClass() throws Exception { TEST_UTIL.shutdownMiniCluster(); } + /** + * Verifies that if a coproc returns an InternalScanner which implements Shipper, the shipped + * method is appropriately called in Compactor. + */ @Test public void testCoprocScannersExtendingShipperGetShipped() throws Exception { - int shippedCountBefore = DelegatingInternalScanner.SHIPPED_COUNT.get(); + int shippedCountBefore = SHIPPED_COUNT.get(); final TableName tableName = TableName.valueOf(name.getMethodName()); // Create a table with block size as 1024 final Table table = TEST_UTIL.createTable(tableName, new byte[][] { FAMILY }, 1, 1024, - NoOpScanPolicyObserver.class.getName()); + CompactionObserver.class.getName()); TEST_UTIL.loadTable(table, FAMILY); TEST_UTIL.flush(); try { @@ -79,10 +95,52 @@ public void testCoprocScannersExtendingShipperGetShipped() throws Exception { HRegion region = TEST_UTIL.getRSForFirstRegionInTable(tableName).getRegion(regionName); // trigger a major compaction TEST_UTIL.compact(true); - assertThat(DelegatingInternalScanner.SHIPPED_COUNT.get(), + assertThat(SHIPPED_COUNT.get(), Matchers.greaterThan(shippedCountBefore)); } finally { table.close(); } } + + public static class CompactionObserver implements RegionCoprocessor, RegionObserver { + + @Override + public Optional getRegionObserver() { + return Optional.of(this); + } + + @Override + public InternalScanner preCompact(ObserverContext c, Store store, + InternalScanner scanner, ScanType scanType, CompactionLifeCycleTracker tracker, + CompactionRequest request) throws IOException { + return new 
ShippedObservingScanner(scanner); + } + } + + public static class ShippedObservingScanner implements InternalScanner, Shipper { + + protected final InternalScanner scanner; + + public ShippedObservingScanner(InternalScanner scanner) { + this.scanner = scanner; + } + + @Override + public boolean next(List result, ScannerContext scannerContext) throws IOException { + return scanner.next(result, scannerContext); + } + + @Override + public void close() throws IOException { + scanner.close(); + } + + @Override + public void shipped() throws IOException { + if (scanner instanceof Shipper) { + SHIPPED_COUNT.incrementAndGet(); + ((Shipper) scanner).shipped(); + } + } + } } From 3ccea8e9708d464c47bc7ef926b206f6c12001fb Mon Sep 17 00:00:00 2001 From: Bryan Beaudreault Date: Tue, 28 Feb 2023 09:38:42 -0500 Subject: [PATCH 4/4] spotless --- .../regionserver/TestCompactionWithShippingCoprocessor.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompactionWithShippingCoprocessor.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompactionWithShippingCoprocessor.java index ce9db479f543..d8c5c8aa5959 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompactionWithShippingCoprocessor.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompactionWithShippingCoprocessor.java @@ -18,6 +18,7 @@ package org.apache.hadoop.hbase.regionserver; import static org.hamcrest.MatcherAssert.assertThat; + import java.io.IOException; import java.util.List; import java.util.Optional; @@ -95,8 +96,7 @@ public void testCoprocScannersExtendingShipperGetShipped() throws Exception { HRegion region = TEST_UTIL.getRSForFirstRegionInTable(tableName).getRegion(regionName); // trigger a major compaction TEST_UTIL.compact(true); - assertThat(SHIPPED_COUNT.get(), - Matchers.greaterThan(shippedCountBefore)); + 
assertThat(SHIPPED_COUNT.get(), Matchers.greaterThan(shippedCountBefore)); } finally { table.close(); }