diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRegionLocator.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRegionLocator.java index da58dd8e1e53..e528e8b8f662 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRegionLocator.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRegionLocator.java @@ -35,6 +35,7 @@ import java.util.function.Function; import java.util.function.Supplier; import java.util.stream.Collectors; +import org.apache.hadoop.hbase.DoNotRetryIOException; import org.apache.hadoop.hbase.HRegionLocation; import org.apache.hadoop.hbase.RegionLocations; import org.apache.hadoop.hbase.ServerName; @@ -151,36 +152,67 @@ CompletableFuture getRegionLocations(TableName tableName, byte[ }, AsyncRegionLocator::getRegionNames, supplier); } + private void internalAddListener(CompletableFuture future, + CompletableFuture locsFuture, TableName tableName, byte[] row, int replicaId, + RegionLocateType type) { + addListener(locsFuture, (locs, error) -> { + if (error != null) { + future.completeExceptionally(error); + return; + } + HRegionLocation loc = locs.getRegionLocation(replicaId); + if (loc == null) { + future.completeExceptionally( + new RegionOfflineException("No location for " + tableName + ", row='" + + Bytes.toStringBinary(row) + "', locateType=" + type + ", replicaId=" + replicaId)); + } else if (loc.getServerName() == null) { + future + .completeExceptionally(new RegionOfflineException("No server address listed for region '" + + loc.getRegion().getRegionNameAsString() + ", row='" + Bytes.toStringBinary(row) + + "', locateType=" + type + ", replicaId=" + replicaId)); + } else { + future.complete(loc); + } + }); + } + CompletableFuture getRegionLocation(TableName tableName, byte[] row, int replicaId, RegionLocateType type, boolean reload, long timeoutNs) { final Supplier supplier = new TableSpanBuilder(conn) .setName("AsyncRegionLocator.getRegionLocation").setTableName(tableName); return tracedLocationFuture(() -> { - // meta region can not be split right now so we always call the same method. - // Change it later if the meta table can have more than one regions. CompletableFuture future = new CompletableFuture<>(); - CompletableFuture locsFuture = isMeta(tableName) - ? metaRegionLocator.getRegionLocations(replicaId, reload) - : nonMetaRegionLocator.getRegionLocations(tableName, row, replicaId, type, reload); - addListener(locsFuture, (locs, error) -> { - if (error != null) { - future.completeExceptionally(error); - return; - } - HRegionLocation loc = locs.getRegionLocation(replicaId); - if (loc == null) { - future.completeExceptionally( - new RegionOfflineException("No location for " + tableName + ", row='" - + Bytes.toStringBinary(row) + "', locateType=" + type + ", replicaId=" + replicaId)); - } else if (loc.getServerName() == null) { - future.completeExceptionally( - new RegionOfflineException("No server address listed for region '" - + loc.getRegion().getRegionNameAsString() + ", row='" + Bytes.toStringBinary(row) - + "', locateType=" + type + ", replicaId=" + replicaId)); - } else { - future.complete(loc); - } - }); + if (replicaId == RegionReplicaUtil.DEFAULT_REPLICA_ID) { + // meta region can not be split right now so we always call the same method. + // Change it later if the meta table can have more than one regions. + CompletableFuture locsFuture = isMeta(tableName) + ? metaRegionLocator.getRegionLocations(replicaId, reload) + : nonMetaRegionLocator.getRegionLocations(tableName, row, replicaId, type, reload); + internalAddListener(future, locsFuture, tableName, row, replicaId, type); + } else { + addListener(conn.getAdmin().getDescriptor(tableName), (tdesc, error) -> { + if (error != null) { + future.completeExceptionally(error); + return; + } + int regionReplicationCount = tdesc.getRegionReplication(); + if (replicaId >= regionReplicationCount) { + future + .completeExceptionally(new DoNotRetryIOException("The specified region replica id " + + replicaId + " does not exist, the REGION_REPLICATION of this table " + + tableName.getNameAsString() + " is " + regionReplicationCount + "," + + " this means that the maximum region replica id you can specify is " + + (regionReplicationCount - 1) + ".")); + return; + } + // meta region can not be split right now so we always call the same method. + // Change it later if the meta table can have more than one regions. + CompletableFuture locsFuture = isMeta(tableName) + ? metaRegionLocator.getRegionLocations(replicaId, reload) + : nonMetaRegionLocator.getRegionLocations(tableName, row, replicaId, type, reload); + internalAddListener(future, locsFuture, tableName, row, replicaId, type); + }); + } return withTimeout(future, timeoutNs, () -> "Timeout(" + TimeUnit.NANOSECONDS.toMillis(timeoutNs) + "ms) waiting for region location for " + tableName + ", row='" diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableRegionLocatorWithRegionReplicaId.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableRegionLocatorWithRegionReplicaId.java new file mode 100644 index 000000000000..2b82ef4a8d53 --- /dev/null +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableRegionLocatorWithRegionReplicaId.java @@ -0,0 +1,174 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.client; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertTrue; + +import java.util.List; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutionException; +import org.apache.hadoop.hbase.DoNotRetryIOException; +import org.apache.hadoop.hbase.HBaseClassTestRule; +import org.apache.hadoop.hbase.HBaseTestingUtil; +import org.apache.hadoop.hbase.HRegionLocation; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.regionserver.HRegion; +import org.apache.hadoop.hbase.testclassification.ClientTests; +import org.apache.hadoop.hbase.testclassification.MediumTests; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.io.IOUtils; +import org.junit.After; +import org.junit.AfterClass; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.ClassRule; +import org.junit.Rule; +import org.junit.Test; +import org.junit.experimental.categories.Category; +import org.junit.rules.ExpectedException; +import org.junit.rules.TestName; + +@Category({ MediumTests.class, ClientTests.class }) +public class TestAsyncTableRegionLocatorWithRegionReplicaId { + @ClassRule + public static final HBaseClassTestRule CLASS_RULE = + HBaseClassTestRule.forClass(TestAsyncTableRegionLocatorWithRegionReplicaId.class); + + @Rule + public TestName name = new TestName(); + + private ExpectedException exception = ExpectedException.none(); + + private final static HBaseTestingUtil UTIL = new HBaseTestingUtil(); + private static final String ROW = "r1"; + private static final byte[] FAMILY = Bytes.toBytes("info"); + private static final int REGION_REPLICATION_COUNT = 2; + // region replica id starts from 0 + private static final int NON_EXISTING_REGION_REPLICA_ID = REGION_REPLICATION_COUNT; + private static Connection connection; + private static AsyncConnection asyncConn; + private static Admin admin; + private TableName tableName; + + @BeforeClass + public static void setUpBeforeClass() throws Exception { + UTIL.startMiniCluster(1); + connection = UTIL.getConnection(); + asyncConn = ConnectionFactory.createAsyncConnection(UTIL.getConfiguration()).get(); + admin = UTIL.getAdmin(); + } + + @AfterClass + public static void tearDownAfterClass() throws Exception { + IOUtils.cleanup(null, admin); + IOUtils.cleanup(null, connection); + UTIL.shutdownMiniCluster(); + } + + @Before + public void setUp() throws Exception { + tableName = TableName.valueOf(name.getMethodName()); + ColumnFamilyDescriptor columnFamilyDescriptor = + ColumnFamilyDescriptorBuilder.newBuilder(FAMILY).build(); + TableDescriptor tableDescriptor = + TableDescriptorBuilder.newBuilder(tableName).setColumnFamily(columnFamilyDescriptor) + .setRegionReplication(REGION_REPLICATION_COUNT).build(); + admin.createTable(tableDescriptor); + UTIL.waitTableAvailable(tableName); + assertTrue(admin.tableExists(tableName)); + assertEquals(REGION_REPLICATION_COUNT, tableDescriptor.getRegionReplication()); + + List regions = UTIL.getHBaseCluster().getRegions(tableName); + assertEquals(REGION_REPLICATION_COUNT, regions.size()); + + Table table = connection.getTable(tableName); + Put put = new Put(Bytes.toBytes(ROW)).addColumn(FAMILY, Bytes.toBytes("q"), + Bytes.toBytes("test_value")); + table.put(put); + admin.flush(tableName); + + Scan scan = new Scan(); + ResultScanner rs = table.getScanner(scan); + rs.forEach(r -> assertEquals(ROW, Bytes.toString(r.getRow()))); + } + + @After + public void tearDown() throws Exception { + UTIL.deleteTable(tableName); + } + + @Test + public void testMetaTableRegionLocatorWithRegionReplicaId() + throws ExecutionException, InterruptedException { + AsyncTableRegionLocator locator = asyncConn.getRegionLocator(TableName.META_TABLE_NAME); + CompletableFuture future = + locator.getRegionLocation(tableName.getName(), RegionReplicaUtil.DEFAULT_REPLICA_ID, true); + HRegionLocation hrl = future.get(); + assertNotNull(hrl); + } + + @Test + public void testMetaTableRegionLocatorWithNonExistingRegionReplicaId() + throws InterruptedException { + AsyncTableRegionLocator locator = asyncConn.getRegionLocator(TableName.META_TABLE_NAME); + CompletableFuture future = + locator.getRegionLocation(tableName.getName(), NON_EXISTING_REGION_REPLICA_ID, true); + try { + future.get(); + } catch (ExecutionException e) { + assertTrue(e.getCause() instanceof DoNotRetryIOException); + String message = "The specified region replica id " + NON_EXISTING_REGION_REPLICA_ID + + " does not exist, the REGION_REPLICATION of this table " + + TableName.META_TABLE_NAME.getNameAsString() + " is " + + TableDescriptorBuilder.DEFAULT_REGION_REPLICATION + ", " + + "this means that the maximum region replica id you can specify is " + + (TableDescriptorBuilder.DEFAULT_REGION_REPLICATION - 1) + "."; + assertEquals(message, e.getCause().getMessage()); + } + } + + @Test + public void testTableRegionLocatorWithRegionReplicaId() + throws ExecutionException, InterruptedException { + AsyncTableRegionLocator locator = asyncConn.getRegionLocator(tableName); + CompletableFuture future = + locator.getRegionLocation(Bytes.toBytes(ROW), RegionReplicaUtil.DEFAULT_REPLICA_ID, true); + HRegionLocation hrl = future.get(); + assertNotNull(hrl); + } + + @Test + public void testTableRegionLocatorWithNonExistingRegionReplicaId() throws InterruptedException { + AsyncTableRegionLocator locator = asyncConn.getRegionLocator(tableName); + CompletableFuture future = + locator.getRegionLocation(Bytes.toBytes(ROW), NON_EXISTING_REGION_REPLICA_ID, true); + try { + future.get(); + } catch (ExecutionException e) { + assertTrue(e.getCause() instanceof DoNotRetryIOException); + String message = "The specified region replica id " + NON_EXISTING_REGION_REPLICA_ID + + " does not exist, the REGION_REPLICATION of this table " + tableName.getNameAsString() + + " is " + REGION_REPLICATION_COUNT + ", " + + "this means that the maximum region replica id you can specify is " + + (REGION_REPLICATION_COUNT - 1) + "."; + assertEquals(message, e.getCause().getMessage()); + } + } +}