Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,7 @@ private void removeLocationFromCache(HRegionLocation loc) {

void updateCachedLocationOnError(HRegionLocation loc, Throwable exception) {
AsyncRegionLocatorHelper.updateCachedLocationOnError(loc, exception, this::getCacheLocation,
this::addLocationToCache, this::removeLocationFromCache, null);
this::addLocationToCache, this::removeLocationFromCache, null, null);
}

void clearCache() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -544,6 +544,13 @@ private void removeLocationFromCache(HRegionLocation loc) {
}
}

void removeServerLocationFromCache(HRegionLocation loc) {
if (clearCache(loc.getServerName())) {
conn.getConnectionMetrics().ifPresent(MetricsConnection::incrMetaCacheNumClearServer);
updateMetaReplicaSelector(loc);
}
}

private void updateMetaReplicaSelector(HRegionLocation loc) {
// Tell metaReplicaSelector that the location is stale. It will create a stale entry
// with timestamp internally. Next time the client looks up the same location,
Expand All @@ -569,7 +576,8 @@ private HRegionLocation getCachedLocation(HRegionLocation loc) {
void updateCachedLocationOnError(HRegionLocation loc, Throwable exception) {
Optional<MetricsConnection> connectionMetrics = conn.getConnectionMetrics();
AsyncRegionLocatorHelper.updateCachedLocationOnError(loc, exception, this::getCachedLocation,
this::addLocationToCache, this::removeLocationFromCache, connectionMetrics.orElse(null));
this::addLocationToCache, this::removeLocationFromCache, this::removeServerLocationFromCache,
connectionMetrics.orElse(null));
}

void clearCache(TableName tableName) {
Expand All @@ -595,10 +603,12 @@ void clearCache() {
cache.clear();
}

void clearCache(ServerName serverName) {
boolean clearCache(ServerName serverName) {
boolean removed = false;
for (TableCache tableCache : cache.values()) {
tableCache.regionLocationCache.removeForServer(serverName);
removed |= tableCache.regionLocationCache.removeForServer(serverName);
}
return removed;
}

// only used for testing whether we have cached the location for a region.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -250,7 +250,8 @@ public int size() {
* are empty after removing the server from it.
* @param serverName server to remove from locations
*/
public synchronized void removeForServer(ServerName serverName) {
public synchronized boolean removeForServer(ServerName serverName) {
boolean removed = false;
for (Map.Entry<byte[], RegionLocations> entry : cache.entrySet()) {
byte[] regionName = entry.getKey();
RegionLocations locs = entry.getValue();
Expand All @@ -259,10 +260,11 @@ public synchronized void removeForServer(ServerName serverName) {
continue;
}
if (newLocs.isEmpty()) {
cache.remove(regionName, locs);
removed |= cache.remove(regionName, locs);
} else {
cache.put(regionName, newLocs);
}
}
return removed;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,13 +20,15 @@
import static org.apache.hadoop.hbase.exceptions.ClientExceptionsUtil.findException;
import static org.apache.hadoop.hbase.exceptions.ClientExceptionsUtil.isMetaClearingException;

import java.net.ConnectException;
import java.util.Arrays;
import java.util.function.Consumer;
import java.util.function.Function;
import org.apache.commons.lang3.ObjectUtils;
import org.apache.hadoop.hbase.HRegionLocation;
import org.apache.hadoop.hbase.RegionLocations;
import org.apache.hadoop.hbase.exceptions.RegionMovedException;
import org.apache.hadoop.hbase.ipc.CallTimeoutException;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -58,7 +60,7 @@ static boolean canUpdateOnError(HRegionLocation loc, HRegionLocation oldLoc) {
static void updateCachedLocationOnError(HRegionLocation loc, Throwable exception,
Function<HRegionLocation, HRegionLocation> cachedLocationSupplier,
Consumer<HRegionLocation> addToCache, Consumer<HRegionLocation> removeFromCache,
MetricsConnection metrics) {
Consumer<HRegionLocation> removeServerFromCache, MetricsConnection metrics) {
HRegionLocation oldLoc = cachedLocationSupplier.apply(loc);
if (LOG.isDebugEnabled()) {
LOG.debug("Try updating {} , the old value is {}, error={}", loc, oldLoc,
Expand All @@ -84,6 +86,23 @@ static void updateCachedLocationOnError(HRegionLocation loc, Throwable exception
LOG.debug("Try updating {} with the new location {} constructed by {}", loc, newLoc,
rme.toString());
addToCache.accept(newLoc);
}
if (
(cause instanceof CallTimeoutException || cause instanceof ConnectException)
&& removeServerFromCache != null
) {
// Clear all meta caches of the server on which hardware failure related exceptions occurred
//
// Those exceptions might be caused by a network or hardware issue of that server.
// So we might not be able to connect to that server for a while.
// If we don't clear the caches, we might get the same exceptions
// as many times as the number of location caches of that server.
LOG.debug("Try clearing all region locations of the server {} from cache",
loc.getServerName());
if (metrics != null) {
metrics.incrCacheDroppingExceptions(exception);
}
removeServerFromCache.accept(loc);
} else {
LOG.debug("Try removing {} from cache", loc);
if (metrics != null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
import java.io.IOException;
import java.io.InterruptedIOException;
import java.lang.reflect.UndeclaredThrowableException;
import java.net.ConnectException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Date;
Expand Down Expand Up @@ -73,6 +74,7 @@
import org.apache.hadoop.hbase.exceptions.ClientExceptionsUtil;
import org.apache.hadoop.hbase.exceptions.ConnectionClosedException;
import org.apache.hadoop.hbase.exceptions.RegionMovedException;
import org.apache.hadoop.hbase.ipc.CallTimeoutException;
import org.apache.hadoop.hbase.ipc.RpcClient;
import org.apache.hadoop.hbase.ipc.RpcClientFactory;
import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
Expand Down Expand Up @@ -2132,11 +2134,26 @@ public void updateCachedLocations(final TableName tableName, byte[] regionName,
metaReplicaSelector.onError(oldLocation);
}

// Clear all meta caches of the server on which hardware failure related exceptions occurred
//
// Those exceptions might be caused by a network or hardware issue of that server.
// So we might not be able to connect to that server for a while.
// If we don't clear the caches, we might get the same exceptions
// as many times as the number of location caches of that server.
if (cause instanceof CallTimeoutException || cause instanceof ConnectException) {
clearCache(source);
return;
}

// If we're here, it means that can cannot be sure about the location, so we remove it from
// the cache. Do not send the source because source can be a new server in the same host:port
metaCache.clearCache(regionInfo);
}

void clearCache(ServerName serverName) {
metaCache.clearCache(serverName);
}

@Override
public AsyncProcess getAsyncProcess() {
return asyncProcess;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,14 +25,23 @@
import static org.hamcrest.MatcherAssert.assertThat;
import static org.junit.Assert.assertArrayEquals;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertSame;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;

import java.io.IOException;
import java.net.ConnectException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ThreadLocalRandom;
Expand All @@ -50,11 +59,13 @@
import org.apache.hadoop.hbase.TableNotFoundException;
import org.apache.hadoop.hbase.Waiter.ExplainingPredicate;
import org.apache.hadoop.hbase.client.RegionReplicaTestHelper.Locator;
import org.apache.hadoop.hbase.ipc.CallTimeoutException;
import org.apache.hadoop.hbase.security.User;
import org.apache.hadoop.hbase.testclassification.ClientTests;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.hbase.util.ServerRegionReplicaUtil;
import org.junit.After;
import org.junit.AfterClass;
Expand All @@ -67,6 +78,8 @@
import org.junit.runners.Parameterized;
import org.junit.runners.Parameterized.Parameter;

import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableList;
import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableMap;
import org.apache.hbase.thirdparty.com.google.common.collect.Lists;
import org.apache.hbase.thirdparty.com.google.common.io.Closeables;

Expand Down Expand Up @@ -542,4 +555,85 @@ private void checkRegions(AsyncConnectionImpl conn, List<RegionInfo> regions, Re
}
}
}

@Test
public void testRemoveCachedLocationForServerOnError() throws Exception {
// create multiple tables
createMultiRegionTable();
TableName tableName2 = TableName.valueOf("async2");

try (Table ignored = TEST_UTIL.createTable(tableName2, FAMILY, SPLIT_KEYS)) {
TEST_UTIL.waitTableAvailable(tableName2);

List<RegionInfo> allRegions1 = TEST_UTIL.getAdmin().getRegions(TABLE_NAME);
List<RegionInfo> allRegions2 = TEST_UTIL.getAdmin().getRegions(tableName2);
Map<TableName, List<RegionInfo>> tableToRegions =
new HashMap<>(ImmutableMap.of(TABLE_NAME, allRegions1, tableName2, allRegions2));

// cache all locations
for (Map.Entry<TableName, List<RegionInfo>> entry : tableToRegions.entrySet()) {
for (RegionInfo region : entry.getValue()) {
RegionLocations locs = locator.getRegionLocations(entry.getKey(), region.getStartKey(),
RegionReplicaUtil.DEFAULT_REPLICA_ID, RegionLocateType.CURRENT, false).get();
assertNotNull(locs);
}
}

// due to timing issue when CatalogReplicaMode is LOAD_BALANCE
Thread.sleep(100);

// CallTimeoutException on a single region
HRegionLocation locOnError =
locator.getRegionLocationInCache(TABLE_NAME, allRegions1.get(0).getStartKey())
.getDefaultRegionLocation();
locator.updateCachedLocationOnError(locOnError, new CallTimeoutException("test"));
ServerName cacheRemovedServer = locOnError.getServerName();

// But all location cache on the same server should be removed
int cacheRemovedCount = 0;
int cacheRemainedCount = 0;
for (Map.Entry<TableName, List<RegionInfo>> entry : tableToRegions.entrySet()) {
for (RegionInfo region : entry.getValue()) {
RegionLocations locs =
locator.getRegionLocationInCache(entry.getKey(), region.getStartKey());
if (locs == null) {
cacheRemovedCount++;
continue;
}

HRegionLocation loc = locs.getDefaultRegionLocation();
assertNotEquals(cacheRemovedServer, loc.getServerName());
cacheRemainedCount++;
}
}
assertEquals(allRegions1.size() + allRegions2.size(), cacheRemovedCount + cacheRemainedCount);
} finally {
TEST_UTIL.deleteTableIfAny(tableName2);
}
}

@Test
public void testCallRemoveServerLocationFromCache()
throws IOException, InterruptedException, ExecutionException {
createSingleRegionTable();
AsyncNonMetaRegionLocator locator = spy(new AsyncNonMetaRegionLocator(conn));

// expectations: exception -> expected accumulated call times
List<Pair<Exception, Integer>> expectations =
new ArrayList<>(ImmutableList.of(Pair.newPair(new CallTimeoutException("test1"), 1),
Pair.newPair(new ConnectException("test2"), 2),
Pair.newPair(new NotServingRegionException("test3"), 2)));

for (Pair<Exception, Integer> pair : expectations) {
Exception exception = pair.getFirst();
Integer expectedCallTimes = pair.getSecond();

HRegionLocation loc = locator.getRegionLocations(TABLE_NAME, EMPTY_START_ROW,
RegionReplicaUtil.DEFAULT_REPLICA_ID, RegionLocateType.CURRENT, false).get()
.getDefaultRegionLocation();

locator.updateCachedLocationOnError(loc, exception);
verify(locator, times(expectedCallTimes)).removeServerLocationFromCache(any());
}
}
}
Loading