Skip to content
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,8 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.com.google.common.base.Supplier;
import org.apache.hbase.thirdparty.com.google.common.base.Suppliers;
import org.apache.hbase.thirdparty.com.google.common.base.Throwables;
import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;
import org.apache.hbase.thirdparty.com.google.protobuf.BlockingRpcChannel;
Expand Down Expand Up @@ -169,6 +171,9 @@
@InterfaceAudience.Private
public class ConnectionImplementation implements ClusterConnection, Closeable {
public static final String RETRIES_BY_SERVER_KEY = "hbase.client.retries.by.server";

public static final String MASTER_STATE_CACHE_TIMEOUT_SEC =
"hbase.client.master.state.cache.timeout.sec";
private static final Logger LOG = LoggerFactory.getLogger(ConnectionImplementation.class);

// The mode tells if HedgedRead, LoadBalance mode is supported.
Expand Down Expand Up @@ -249,6 +254,12 @@ public class ConnectionImplementation implements ClusterConnection, Closeable {
/** lock guards against multiple threads trying to query the meta region at the same time */
private final ReentrantLock userRegionLock = new ReentrantLock();

/**
* Supplier to get masterState.By default uses simple supplier without TTL cache. When
* hbase.client.master.state.cache.timeout.sec > 0 it uses TTL Cache.
*/
private final Supplier<Boolean> masterStateSupplier;

private ChoreService choreService;

/**
Expand Down Expand Up @@ -382,6 +393,39 @@ public void newDead(ServerName sn) {
default:
// Doing nothing
}

long masterStateCacheTimeout = conf.getLong(MASTER_STATE_CACHE_TIMEOUT_SEC, 0);

Supplier<Boolean> masterConnSupplier = masterConnectionStateSupplier();
if (masterStateCacheTimeout <= 0L) {
this.masterStateSupplier = masterConnSupplier;
} else {
this.masterStateSupplier = Suppliers.memoizeWithExpiration(masterConnSupplier,
masterStateCacheTimeout, TimeUnit.SECONDS);
}
}

/**
* Visible for tests
*/
Supplier<Boolean> masterConnectionStateSupplier() {
return () -> {
if (this.masterServiceState.getStub() == null) {
return false;
}
try {
LOG.info("Getting master state using rpc call");
return this.masterServiceState.isMasterRunning();
} catch (UndeclaredThrowableException e) {
// It's somehow messy, but we can receive exceptions such as
// java.net.ConnectException but they're not declared. So we catch it...
LOG.info("Master connection is not running anymore", e.getUndeclaredThrowable());
return false;
} catch (IOException se) {
LOG.warn("Checking master connection", se);
return false;
}
};
}

private void spawnRenewalChore(final UserGroupInformation user) {
Expand Down Expand Up @@ -1255,7 +1299,6 @@ public void addError() {
* Class to make a MasterServiceStubMaker stub.
*/
private final class MasterServiceStubMaker {

private void isMasterRunning(MasterProtos.MasterService.BlockingInterface stub)
throws IOException {
try {
Expand Down Expand Up @@ -1355,6 +1398,13 @@ public BlockingInterface getClient(ServerName serverName) throws IOException {

final MasterServiceState masterServiceState = new MasterServiceState(this);

/**
* Visible for tests
*/
MasterServiceState getMasterServiceState() {
return this.masterServiceState;
}

@Override
public MasterKeepAliveConnection getMaster() throws IOException {
return getKeepAliveMasterService();
Expand All @@ -1365,13 +1415,16 @@ private void resetMasterServiceState(final MasterServiceState mss) {
}

private MasterKeepAliveConnection getKeepAliveMasterService() throws IOException {
synchronized (masterLock) {
if (!isKeepAliveMasterConnectedAndRunning(this.masterServiceState)) {
MasterServiceStubMaker stubMaker = new MasterServiceStubMaker();
this.masterServiceState.stub = stubMaker.makeStub();
if (!isKeepAliveMasterConnectedAndRunning()) {
synchronized (masterLock) {
if (!isKeepAliveMasterConnectedAndRunning()) {
MasterServiceStubMaker stubMaker = new MasterServiceStubMaker();
this.masterServiceState.stub = stubMaker.makeStub();
}
resetMasterServiceState(this.masterServiceState);
}
resetMasterServiceState(this.masterServiceState);
}

// Ugly delegation just so we can add in a Close method.
final MasterProtos.MasterService.BlockingInterface stub = this.masterServiceState.stub;
return new MasterKeepAliveConnection() {
Expand Down Expand Up @@ -1944,21 +1997,9 @@ private static void release(MasterServiceState mss) {
}
}

private boolean isKeepAliveMasterConnectedAndRunning(MasterServiceState mss) {
if (mss.getStub() == null) {
return false;
}
try {
return mss.isMasterRunning();
} catch (UndeclaredThrowableException e) {
// It's somehow messy, but we can receive exceptions such as
// java.net.ConnectException but they're not declared. So we catch it...
LOG.info("Master connection is not running anymore", e.getUndeclaredThrowable());
return false;
} catch (IOException se) {
LOG.warn("Checking master connection", se);
return false;
}
private boolean isKeepAliveMasterConnectedAndRunning() {
LOG.info("Getting master connection state from TTL Cache");
return masterStateSupplier.get();
}

void releaseMaster(MasterServiceState mss) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,160 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.client;

import java.io.IOException;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.lang.reflect.UndeclaredThrowableException;
import org.apache.commons.lang3.reflect.FieldUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.IntegrationTestingUtility;
import org.apache.hadoop.hbase.security.UserProvider;
import org.apache.hadoop.hbase.testclassification.ClientTests;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.runner.RunWith;
import org.mockito.Mockito;
import org.mockito.junit.MockitoJUnitRunner;

@Category({ ClientTests.class, MediumTests.class })
@RunWith(MockitoJUnitRunner.class)
public class TestConnectionImplementation {
@ClassRule
public static final HBaseClassTestRule CLASS_RULE =
HBaseClassTestRule.forClass(TestConnectionImplementation.class);
private static final IntegrationTestingUtility TEST_UTIL = new IntegrationTestingUtility();

@BeforeClass
public static void beforeClass() throws Exception {
TEST_UTIL.startMiniCluster(1);
}

@AfterClass
public static void afterClass() throws Exception {
TEST_UTIL.shutdownMiniCluster();
}

@Test
public void testGetMaster_noCachedMasterState() throws IOException, IllegalAccessException {
Configuration conf = TEST_UTIL.getConfiguration();
conf.setLong(ConnectionImplementation.MASTER_STATE_CACHE_TIMEOUT_SEC, 0L);
ConnectionImplementation conn =
new ConnectionImplementation(conf, null, UserProvider.instantiate(conf).getCurrent());
ConnectionImplementation.MasterServiceState masterServiceState = spyMasterServiceState(conn);
conn.getMaster(); // This initializes the stubs but don't call isMasterRunning
conn.getMaster(); // Calls isMasterRunning since stubs are initialized. Invocation 1
conn.getMaster(); // Calls isMasterRunning since stubs are initialized. Invocation 2
Mockito.verify(masterServiceState, Mockito.times(2)).isMasterRunning();
conn.close();
}

@Test
public void testGetMaster_masterStateCacheHit() throws IOException, IllegalAccessException {
Configuration conf = TEST_UTIL.getConfiguration();
conf.setLong(ConnectionImplementation.MASTER_STATE_CACHE_TIMEOUT_SEC, 15L);
ConnectionImplementation conn =
new ConnectionImplementation(conf, null, UserProvider.instantiate(conf).getCurrent());
ConnectionImplementation.MasterServiceState masterServiceState = spyMasterServiceState(conn);
conn.getMaster(); // This initializes the stubs but don't call isMasterRunning
conn.getMaster(); // Uses cached value, don't call isMasterRunning
conn.getMaster(); // Uses cached value, don't call isMasterRunning
Mockito.verify(masterServiceState, Mockito.times(0)).isMasterRunning();
conn.close();
}

@Test
public void testGetMaster_masterStateCacheMiss()
throws IOException, InterruptedException, IllegalAccessException {
Configuration conf = TEST_UTIL.getConfiguration();
conf.setLong(ConnectionImplementation.MASTER_STATE_CACHE_TIMEOUT_SEC, 5L);
ConnectionImplementation conn =
new ConnectionImplementation(conf, null, UserProvider.instantiate(conf).getCurrent());
ConnectionImplementation.MasterServiceState masterServiceState = spyMasterServiceState(conn);
conn.getMaster(); // This initializes the stubs but don't call isMasterRunning
conn.getMaster(); // Uses cached value, don't call isMasterRunning
conn.getMaster(); // Uses cached value, don't call isMasterRunning
Thread.sleep(10000);
conn.getMaster(); // Calls isMasterRunning after cache expiry
Mockito.verify(masterServiceState, Mockito.times(1)).isMasterRunning();
conn.close();
}

@Test
public void testIsKeepAliveMasterConnectedAndRunning_UndeclaredThrowableException()
throws IOException, IllegalAccessException, NoSuchMethodException, InvocationTargetException {
Configuration conf = TEST_UTIL.getConfiguration();
conf.setLong(ConnectionImplementation.MASTER_STATE_CACHE_TIMEOUT_SEC, 0);
ConnectionImplementation conn =
new ConnectionImplementation(conf, null, UserProvider.instantiate(conf).getCurrent());
conn.getMaster(); // Initializes stubs

ConnectionImplementation.MasterServiceState masterServiceState = spyMasterServiceState(conn);
Mockito.doThrow(new UndeclaredThrowableException(new Exception("DUMMY EXCEPTION")))
.when(masterServiceState).isMasterRunning();

// Verify that masterState is "false" because of to injected exception
boolean isKeepAliveMasterRunning =
(boolean) getIsKeepAliveMasterConnectedAndRunningMethod().invoke(conn);
Assert.assertFalse(isKeepAliveMasterRunning);
conn.close();
}

@Test
public void testIsKeepAliveMasterConnectedAndRunning_IOException()
throws IOException, IllegalAccessException, NoSuchMethodException, InvocationTargetException {
Configuration conf = TEST_UTIL.getConfiguration();
conf.setLong(ConnectionImplementation.MASTER_STATE_CACHE_TIMEOUT_SEC, 0);
ConnectionImplementation conn =
new ConnectionImplementation(conf, null, UserProvider.instantiate(conf).getCurrent());
conn.getMaster();

ConnectionImplementation.MasterServiceState masterServiceState = spyMasterServiceState(conn);
Mockito.doThrow(new IOException("DUMMY EXCEPTION")).when(masterServiceState).isMasterRunning();

boolean isKeepAliveMasterRunning =
(boolean) getIsKeepAliveMasterConnectedAndRunningMethod().invoke(conn);

// Verify that masterState is "false" because of to injected exception
Assert.assertFalse(isKeepAliveMasterRunning);
conn.close();
}

// Spy the masterServiceState object using reflection
private ConnectionImplementation.MasterServiceState
spyMasterServiceState(ConnectionImplementation conn) throws IllegalAccessException {
ConnectionImplementation.MasterServiceState spiedMasterServiceState =
Mockito.spy(conn.getMasterServiceState());
FieldUtils.writeDeclaredField(conn, "masterServiceState", spiedMasterServiceState, true);
return spiedMasterServiceState;
}

// Get isKeepAliveMasterConnectedAndRunning using reflection
private Method getIsKeepAliveMasterConnectedAndRunningMethod() throws NoSuchMethodException {
Method method =
ConnectionImplementation.class.getDeclaredMethod("isKeepAliveMasterConnectedAndRunning");
method.setAccessible(true);
return method;
}
}