diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncMetaRegionLocator.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncMetaRegionLocator.java index 136d18a96187..c00e05557e3f 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncMetaRegionLocator.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncMetaRegionLocator.java @@ -109,7 +109,7 @@ private void removeLocationFromCache(HRegionLocation loc) { void updateCachedLocationOnError(HRegionLocation loc, Throwable exception) { AsyncRegionLocatorHelper.updateCachedLocationOnError(loc, exception, this::getCacheLocation, - this::addLocationToCache, this::removeLocationFromCache, null); + this::addLocationToCache, this::removeLocationFromCache, null, null); } void clearCache() { diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncNonMetaRegionLocator.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncNonMetaRegionLocator.java index 4c770dc914a3..670ae5fc6919 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncNonMetaRegionLocator.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncNonMetaRegionLocator.java @@ -544,6 +544,13 @@ private void removeLocationFromCache(HRegionLocation loc) { } } + void removeServerLocationFromCache(HRegionLocation loc) { + if (clearCache(loc.getServerName())) { + conn.getConnectionMetrics().ifPresent(MetricsConnection::incrMetaCacheNumClearServer); + updateMetaReplicaSelector(loc); + } + } + private void updateMetaReplicaSelector(HRegionLocation loc) { // Tell metaReplicaSelector that the location is stale. It will create a stale entry // with timestamp internally. Next time the client looks up the same location, @@ -569,7 +576,8 @@ private HRegionLocation getCachedLocation(HRegionLocation loc) { void updateCachedLocationOnError(HRegionLocation loc, Throwable exception) { Optional connectionMetrics = conn.getConnectionMetrics(); AsyncRegionLocatorHelper.updateCachedLocationOnError(loc, exception, this::getCachedLocation, - this::addLocationToCache, this::removeLocationFromCache, connectionMetrics.orElse(null)); + this::addLocationToCache, this::removeLocationFromCache, this::removeServerLocationFromCache, + connectionMetrics.orElse(null)); } void clearCache(TableName tableName) { @@ -595,10 +603,12 @@ void clearCache() { cache.clear(); } - void clearCache(ServerName serverName) { + boolean clearCache(ServerName serverName) { + boolean removed = false; for (TableCache tableCache : cache.values()) { - tableCache.regionLocationCache.removeForServer(serverName); + removed |= tableCache.regionLocationCache.removeForServer(serverName); } + return removed; } // only used for testing whether we have cached the location for a region. diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRegionLocationCache.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRegionLocationCache.java index ee5ccaf33d62..07c5f3f92593 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRegionLocationCache.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRegionLocationCache.java @@ -250,7 +250,8 @@ public int size() { * are empty after removing the server from it. * @param serverName server to remove from locations */ - public synchronized void removeForServer(ServerName serverName) { + public synchronized boolean removeForServer(ServerName serverName) { + boolean removed = false; for (Map.Entry entry : cache.entrySet()) { byte[] regionName = entry.getKey(); RegionLocations locs = entry.getValue(); @@ -259,10 +260,11 @@ public synchronized void removeForServer(ServerName serverName) { continue; } if (newLocs.isEmpty()) { - cache.remove(regionName, locs); + removed |= cache.remove(regionName, locs); } else { cache.put(regionName, newLocs); } } + return removed; } } diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRegionLocatorHelper.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRegionLocatorHelper.java index cc0eccca6e29..839fbbb14809 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRegionLocatorHelper.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRegionLocatorHelper.java @@ -20,6 +20,7 @@ import static org.apache.hadoop.hbase.exceptions.ClientExceptionsUtil.findException; import static org.apache.hadoop.hbase.exceptions.ClientExceptionsUtil.isMetaClearingException; +import java.net.ConnectException; import java.util.Arrays; import java.util.function.Consumer; import java.util.function.Function; @@ -27,6 +28,7 @@ import org.apache.hadoop.hbase.HRegionLocation; import org.apache.hadoop.hbase.RegionLocations; import org.apache.hadoop.hbase.exceptions.RegionMovedException; +import org.apache.hadoop.hbase.ipc.CallTimeoutException; import org.apache.yetus.audience.InterfaceAudience; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -58,7 +60,7 @@ static boolean canUpdateOnError(HRegionLocation loc, HRegionLocation oldLoc) { static void updateCachedLocationOnError(HRegionLocation loc, Throwable exception, Function cachedLocationSupplier, Consumer addToCache, Consumer removeFromCache, - MetricsConnection metrics) { + Consumer removeServerFromCache, MetricsConnection metrics) { HRegionLocation oldLoc = cachedLocationSupplier.apply(loc); if (LOG.isDebugEnabled()) { LOG.debug("Try updating {} , the old value is {}, error={}", loc, oldLoc, @@ -84,6 +86,23 @@ static void updateCachedLocationOnError(HRegionLocation loc, Throwable exception LOG.debug("Try updating {} with the new location {} constructed by {}", loc, newLoc, rme.toString()); addToCache.accept(newLoc); + } + if ( + (cause instanceof CallTimeoutException || cause instanceof ConnectException) + && removeServerFromCache != null + ) { + // Clear all meta caches of the server on which hardware failure related exceptions occurred + // + // Those exceptions might be caused by a network or hardware issue of that server. + // So we might not be able to connect to that server for a while. + // If we don't clear the caches, we might get the same exceptions + // as many times as the number of location caches of that server. + LOG.debug("Try clearing all region locations of the server {} from cache", + loc.getServerName()); + if (metrics != null) { + metrics.incrCacheDroppingExceptions(exception); + } + removeServerFromCache.accept(loc); } else { LOG.debug("Try removing {} from cache", loc); if (metrics != null) { diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionImplementation.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionImplementation.java index eef82643b5b4..4f60eae56b0a 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionImplementation.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionImplementation.java @@ -35,6 +35,7 @@ import java.io.IOException; import java.io.InterruptedIOException; import java.lang.reflect.UndeclaredThrowableException; +import java.net.ConnectException; import java.util.ArrayList; import java.util.Collections; import java.util.Date; @@ -73,6 +74,7 @@ import org.apache.hadoop.hbase.exceptions.ClientExceptionsUtil; import org.apache.hadoop.hbase.exceptions.ConnectionClosedException; import org.apache.hadoop.hbase.exceptions.RegionMovedException; +import org.apache.hadoop.hbase.ipc.CallTimeoutException; import org.apache.hadoop.hbase.ipc.RpcClient; import org.apache.hadoop.hbase.ipc.RpcClientFactory; import org.apache.hadoop.hbase.ipc.RpcControllerFactory; @@ -2132,11 +2134,26 @@ public void updateCachedLocations(final TableName tableName, byte[] regionName, metaReplicaSelector.onError(oldLocation); } + // Clear all meta caches of the server on which hardware failure related exceptions occurred + // + // Those exceptions might be caused by a network or hardware issue of that server. + // So we might not be able to connect to that server for a while. + // If we don't clear the caches, we might get the same exceptions + // as many times as the number of location caches of that server. + if (cause instanceof CallTimeoutException || cause instanceof ConnectException) { + clearCache(source); + return; + } + // If we're here, it means that can cannot be sure about the location, so we remove it from // the cache. Do not send the source because source can be a new server in the same host:port metaCache.clearCache(regionInfo); } + void clearCache(ServerName serverName) { + metaCache.clearCache(serverName); + } + @Override public AsyncProcess getAsyncProcess() { return asyncProcess; diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncNonMetaRegionLocator.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncNonMetaRegionLocator.java index 24e1ade54fd8..84bed804bb23 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncNonMetaRegionLocator.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncNonMetaRegionLocator.java @@ -25,14 +25,23 @@ import static org.hamcrest.MatcherAssert.assertThat; import static org.junit.Assert.assertArrayEquals; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotEquals; import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertNull; import static org.junit.Assert.assertSame; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.spy; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; import java.io.IOException; +import java.net.ConnectException; +import java.util.ArrayList; import java.util.Arrays; import java.util.Collection; +import java.util.HashMap; import java.util.List; +import java.util.Map; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutionException; import java.util.concurrent.ThreadLocalRandom; @@ -50,11 +59,13 @@ import org.apache.hadoop.hbase.TableNotFoundException; import org.apache.hadoop.hbase.Waiter.ExplainingPredicate; import org.apache.hadoop.hbase.client.RegionReplicaTestHelper.Locator; +import org.apache.hadoop.hbase.ipc.CallTimeoutException; import org.apache.hadoop.hbase.security.User; import org.apache.hadoop.hbase.testclassification.ClientTests; import org.apache.hadoop.hbase.testclassification.MediumTests; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; +import org.apache.hadoop.hbase.util.Pair; import org.apache.hadoop.hbase.util.ServerRegionReplicaUtil; import org.junit.After; import org.junit.AfterClass; @@ -67,6 +78,8 @@ import org.junit.runners.Parameterized; import org.junit.runners.Parameterized.Parameter; +import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableList; +import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableMap; import org.apache.hbase.thirdparty.com.google.common.collect.Lists; import org.apache.hbase.thirdparty.com.google.common.io.Closeables; @@ -542,4 +555,85 @@ private void checkRegions(AsyncConnectionImpl conn, List regions, Re } } } + + @Test + public void testRemoveCachedLocationForServerOnError() throws Exception { + // create multiple tables + createMultiRegionTable(); + TableName tableName2 = TableName.valueOf("async2"); + + try (Table ignored = TEST_UTIL.createTable(tableName2, FAMILY, SPLIT_KEYS)) { + TEST_UTIL.waitTableAvailable(tableName2); + + List allRegions1 = TEST_UTIL.getAdmin().getRegions(TABLE_NAME); + List allRegions2 = TEST_UTIL.getAdmin().getRegions(tableName2); + Map> tableToRegions = + new HashMap<>(ImmutableMap.of(TABLE_NAME, allRegions1, tableName2, allRegions2)); + + // cache all locations + for (Map.Entry> entry : tableToRegions.entrySet()) { + for (RegionInfo region : entry.getValue()) { + RegionLocations locs = locator.getRegionLocations(entry.getKey(), region.getStartKey(), + RegionReplicaUtil.DEFAULT_REPLICA_ID, RegionLocateType.CURRENT, false).get(); + assertNotNull(locs); + } + } + + // due to timing issue when CatalogReplicaMode is LOAD_BALANCE + Thread.sleep(100); + + // CallTimeoutException on a single region + HRegionLocation locOnError = + locator.getRegionLocationInCache(TABLE_NAME, allRegions1.get(0).getStartKey()) + .getDefaultRegionLocation(); + locator.updateCachedLocationOnError(locOnError, new CallTimeoutException("test")); + ServerName cacheRemovedServer = locOnError.getServerName(); + + // But all location cache on the same server should be removed + int cacheRemovedCount = 0; + int cacheRemainedCount = 0; + for (Map.Entry> entry : tableToRegions.entrySet()) { + for (RegionInfo region : entry.getValue()) { + RegionLocations locs = + locator.getRegionLocationInCache(entry.getKey(), region.getStartKey()); + if (locs == null) { + cacheRemovedCount++; + continue; + } + + HRegionLocation loc = locs.getDefaultRegionLocation(); + assertNotEquals(cacheRemovedServer, loc.getServerName()); + cacheRemainedCount++; + } + } + assertEquals(allRegions1.size() + allRegions2.size(), cacheRemovedCount + cacheRemainedCount); + } finally { + TEST_UTIL.deleteTableIfAny(tableName2); + } + } + + @Test + public void testCallRemoveServerLocationFromCache() + throws IOException, InterruptedException, ExecutionException { + createSingleRegionTable(); + AsyncNonMetaRegionLocator locator = spy(new AsyncNonMetaRegionLocator(conn)); + + // expectations: exception -> expected accumulated call times + List> expectations = + new ArrayList<>(ImmutableList.of(Pair.newPair(new CallTimeoutException("test1"), 1), + Pair.newPair(new ConnectException("test2"), 2), + Pair.newPair(new NotServingRegionException("test3"), 2))); + + for (Pair pair : expectations) { + Exception exception = pair.getFirst(); + Integer expectedCallTimes = pair.getSecond(); + + HRegionLocation loc = locator.getRegionLocations(TABLE_NAME, EMPTY_START_ROW, + RegionReplicaUtil.DEFAULT_REPLICA_ID, RegionLocateType.CURRENT, false).get() + .getDefaultRegionLocation(); + + locator.updateCachedLocationOnError(loc, exception); + verify(locator, times(expectedCallTimes)).removeServerLocationFromCache(any()); + } + } } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestConnectionImplementation.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestConnectionImplementation.java index fd80682d7d3e..bdead2e9feeb 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestConnectionImplementation.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestConnectionImplementation.java @@ -19,17 +19,25 @@ import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNotEquals; import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertNull; import static org.junit.Assert.assertTrue; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.spy; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; import java.io.IOException; import java.lang.reflect.Field; import java.lang.reflect.InvocationTargetException; import java.lang.reflect.Modifier; +import java.net.ConnectException; import java.net.SocketTimeoutException; import java.util.ArrayList; +import java.util.HashMap; import java.util.List; +import java.util.Map; import java.util.Set; import java.util.concurrent.ExecutorService; import java.util.concurrent.SynchronousQueue; @@ -50,6 +58,7 @@ import org.apache.hadoop.hbase.HBaseTestingUtility; import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.HRegionLocation; +import org.apache.hadoop.hbase.NotServingRegionException; import org.apache.hadoop.hbase.RegionLocations; import org.apache.hadoop.hbase.ServerName; import org.apache.hadoop.hbase.TableName; @@ -59,6 +68,7 @@ import org.apache.hadoop.hbase.exceptions.RegionMovedException; import org.apache.hadoop.hbase.filter.Filter; import org.apache.hadoop.hbase.filter.FilterBase; +import org.apache.hadoop.hbase.ipc.CallTimeoutException; import org.apache.hadoop.hbase.ipc.RpcClient; import org.apache.hadoop.hbase.master.HMaster; import org.apache.hadoop.hbase.regionserver.HRegion; @@ -70,6 +80,7 @@ import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; import org.apache.hadoop.hbase.util.JVMClusterUtil; import org.apache.hadoop.hbase.util.ManualEnvironmentEdge; +import org.apache.hadoop.hbase.util.Pair; import org.apache.hadoop.hbase.util.ReflectionUtils; import org.apache.hadoop.hbase.util.Threads; import org.junit.After; @@ -85,6 +96,8 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableList; +import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableMap; import org.apache.hbase.thirdparty.com.google.common.collect.Lists; import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder; import org.apache.hbase.thirdparty.io.netty.util.ResourceLeakDetector; @@ -1212,4 +1225,85 @@ public void testCancelConnectionMemoryLeak() throws IOException, InterruptedExce Thread.sleep(1000); } } + + @Test + public void testRemoveCachedLocationForServerOnError() throws IOException { + byte[] cf = "c".getBytes(); + byte[][] splitKeys = new byte[8][]; + for (int i = 0; i < 8; i++) { + splitKeys[i] = ("row" + i + 1).getBytes(); + } + + try (Table table1 = TEST_UTIL.createTable(TABLE_NAME1, cf, splitKeys); + Table table2 = TEST_UTIL.createTable(TABLE_NAME2, cf, splitKeys)) { + // cache all region locations + List allRegions1 = table1.getRegionLocator().getAllRegionLocations(); + List allRegions2 = table2.getRegionLocator().getAllRegionLocations(); + + Map> tableToRegions = + new HashMap<>(ImmutableMap.of(TABLE_NAME1, allRegions1, TABLE_NAME2, allRegions2)); + + // verify that all regions are cached + ConnectionImplementation conn = (ConnectionImplementation) TEST_UTIL.getConnection(); + for (Map.Entry> entry : tableToRegions.entrySet()) { + for (HRegionLocation loc : entry.getValue()) { + assertNotNull(conn.getCachedLocation(entry.getKey(), loc.getRegion().getStartKey())); + } + } + + // CallTimeoutException on a single region + byte[] rowOnError = allRegions1.get(0).getRegion().getStartKey(); + HRegionLocation locOnError = conn.getRegionLocation(TABLE_NAME1, rowOnError, false); + conn.updateCachedLocations(TABLE_NAME1, locOnError.getRegion().getRegionName(), rowOnError, + new CallTimeoutException("test"), locOnError.getServerName()); + ServerName cacheRemovedServer = locOnError.getServerName(); + + // But all location cache on the same server should be removed + int cacheRemovedCount = 0; + int cacheRemainedCount = 0; + for (Map.Entry> entry : tableToRegions.entrySet()) { + for (HRegionLocation loc : entry.getValue()) { + RegionLocations locCached = + conn.getCachedLocation(entry.getKey(), loc.getRegion().getStartKey()); + if (locCached == null) { + cacheRemovedCount++; + continue; + } + + assertNotEquals(cacheRemovedServer, locCached.getRegionLocation().getServerName()); + cacheRemainedCount++; + } + } + assertEquals(allRegions1.size() + allRegions2.size(), cacheRemovedCount + cacheRemainedCount); + } finally { + TEST_UTIL.deleteTableIfAny(TABLE_NAME1); + TEST_UTIL.deleteTableIfAny(TABLE_NAME2); + } + } + + @Test + public void testCallRemoveCachedLocationForServerOnError() throws IOException { + try (Table ignored = TEST_UTIL.createTable(TABLE_NAME, "c".getBytes())) { + ConnectionImplementation conn = spy((ConnectionImplementation) TEST_UTIL.getConnection()); + + // expectations: exception -> expected accumulated call times + List> expectations = + new ArrayList<>(ImmutableList.of(Pair.newPair(new CallTimeoutException("test1"), 1), + Pair.newPair(new ConnectException("test2"), 2), + Pair.newPair(new NotServingRegionException("test3"), 2))); + + for (Pair pair : expectations) { + Exception exception = pair.getFirst(); + Integer expectedCallTimes = pair.getSecond(); + + byte[] rowOnError = "row".getBytes(); + HRegionLocation locOnError = conn.getRegionLocation(TABLE_NAME, rowOnError, false); + conn.updateCachedLocations(TABLE_NAME, locOnError.getRegion().getRegionName(), rowOnError, + exception, locOnError.getServerName()); + verify(conn, times(expectedCallTimes)).clearCache(any()); + } + } finally { + TEST_UTIL.deleteTableIfAny(TABLE_NAME); + } + } }