From c6851260a6cea0148be6dca5acb1d74a74aadd90 Mon Sep 17 00:00:00 2001 From: Sean Busbey Date: Fri, 31 Jul 2020 02:34:24 -0500 Subject: [PATCH 1/2] HBASE-24805 HBaseTestingUtility.getConnection should be threadsafe * refactor how we use connection to rely on the access method * refactor initialization and cleanup of the shared connection * incompatibly change HCTU's Configuration member variable to be final so it can be safely accessed from multiple threads. Closes #2180 adapted for jdk7 (cherry picked from commit 86ebbdd8a2df89de37c2c3bd50e64292eaf28b11) (cherry picked from commit 0806349adab338330428c900588234d7f6fcfcc2) --- .../hbase/HBaseCommonTestingUtility.java | 2 +- .../hadoop/hbase/HBaseTestingUtility.java | 56 +++++++++++++------ 2 files changed, 39 insertions(+), 19 deletions(-) diff --git a/hbase-common/src/test/java/org/apache/hadoop/hbase/HBaseCommonTestingUtility.java b/hbase-common/src/test/java/org/apache/hadoop/hbase/HBaseCommonTestingUtility.java index 19a9ac290b5f..1871d11e1d87 100644 --- a/hbase-common/src/test/java/org/apache/hadoop/hbase/HBaseCommonTestingUtility.java +++ b/hbase-common/src/test/java/org/apache/hadoop/hbase/HBaseCommonTestingUtility.java @@ -40,7 +40,7 @@ public class HBaseCommonTestingUtility { protected static final Log LOG = LogFactory.getLog(HBaseCommonTestingUtility.class); - protected Configuration conf; + protected final Configuration conf; public HBaseCommonTestingUtility() { this(null); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java index 90ed49ccb4fa..fffaf6701683 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java @@ -46,6 +46,7 @@ import java.util.Set; import java.util.TreeSet; import java.util.UUID; +import java.util.concurrent.atomic.AtomicReference; import java.util.concurrent.TimeUnit; import org.apache.commons.io.FileUtils; @@ -190,10 +191,7 @@ public class HBaseTestingUtility extends HBaseCommonTestingUtility { * HBaseTestingUtility*/ private Path dataTestDirOnTestFS = null; - /** - * Shared cluster connection. - */ - private volatile Connection connection; + private final AtomicReference connection = new AtomicReference<>(); /** * System property key to get test directory value. @@ -1170,10 +1168,6 @@ public MiniHBaseCluster getMiniHBaseCluster() { */ public void shutdownMiniCluster() throws Exception { LOG.info("Shutting down minicluster"); - if (this.connection != null && !this.connection.isClosed()) { - this.connection.close(); - this.connection = null; - } shutdownMiniHBaseCluster(); if (!this.passedZkCluster){ shutdownMiniZKCluster(); @@ -1203,10 +1197,7 @@ public boolean cleanupTestDir() { * @throws IOException */ public void shutdownMiniHBaseCluster() throws IOException { - if (hbaseAdmin != null) { - hbaseAdmin.close0(); - hbaseAdmin = null; - } + closeConnection(); // unset the configuration for MIN and MAX RS to start conf.setInt(ServerManager.WAIT_ON_REGIONSERVERS_MINTOSTART, -1); @@ -3020,16 +3011,26 @@ public HBaseCluster getHBaseClusterInterface() { } /** - * Get a Connection to the cluster. - * Not thread-safe (This class needs a lot of work to make it thread-safe). + * Get a shared Connection to the cluster. + * this method is threadsafe. * @return A Connection that can be shared. Don't close. Will be closed on shutdown of cluster. * @throws IOException */ public Connection getConnection() throws IOException { - if (this.connection == null) { - this.connection = ConnectionFactory.createConnection(this.conf); + Connection connection = this.connection.get(); + while (connection == null) { + connection = ConnectionFactory.createConnection(this.conf); + if (! this.connection.compareAndSet(null, connection)) { + try { + connection.close(); + } catch (IOException exception) { + LOG.debug("Ignored failure while closing connection on contended connection creation.", + exception); + } + connection = this.connection.get(); + } } - return this.connection; + return connection; } /** @@ -3067,6 +3068,25 @@ private synchronized void close0() throws IOException { } } + public void closeConnection() throws IOException { + if (hbaseAdmin != null) { + try { + hbaseAdmin.close0(); + } catch (IOException exception) { + LOG.debug("Ignored failure while closing admin.", exception); + } + hbaseAdmin = null; + } + Connection connection = this.connection.getAndSet(null); + if (connection != null) { + try { + connection.close(); + } catch (IOException exception) { + LOG.debug("Ignored failure while closing connection.", exception); + } + } + } + /** * Returns a ZooKeeperWatcher instance. * This instance is shared between HBaseTestingUtility instance users. @@ -3240,7 +3260,7 @@ public String explainTableAvailability(TableName tableName) throws IOException { .getRegionAssignments(); final List> metaLocations = MetaTableAccessor - .getTableRegionsAndLocations(getZooKeeperWatcher(), connection, tableName); + .getTableRegionsAndLocations(getZooKeeperWatcher(), getConnection(), tableName); for (Pair metaLocation : metaLocations) { HRegionInfo hri = metaLocation.getFirst(); ServerName sn = metaLocation.getSecond(); From 6224ba39ceb82cd77d949f614e6aa99311db8ea1 Mon Sep 17 00:00:00 2001 From: Sean Busbey Date: Mon, 3 Aug 2020 13:08:09 -0500 Subject: [PATCH 2/2] HBASE-24805 review feedback: use a different name for the atomic ref to the shared connection. --- .../org/apache/hadoop/hbase/HBaseTestingUtility.java | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java index fffaf6701683..3335b493aaa9 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java @@ -191,7 +191,7 @@ public class HBaseTestingUtility extends HBaseCommonTestingUtility { * HBaseTestingUtility*/ private Path dataTestDirOnTestFS = null; - private final AtomicReference connection = new AtomicReference<>(); + private final AtomicReference connectionRef = new AtomicReference<>(); /** * System property key to get test directory value. @@ -3017,17 +3017,17 @@ public HBaseCluster getHBaseClusterInterface() { * @throws IOException */ public Connection getConnection() throws IOException { - Connection connection = this.connection.get(); + Connection connection = this.connectionRef.get(); while (connection == null) { connection = ConnectionFactory.createConnection(this.conf); - if (! this.connection.compareAndSet(null, connection)) { + if (! this.connectionRef.compareAndSet(null, connection)) { try { connection.close(); } catch (IOException exception) { LOG.debug("Ignored failure while closing connection on contended connection creation.", exception); } - connection = this.connection.get(); + connection = this.connectionRef.get(); } } return connection; @@ -3077,7 +3077,7 @@ public void closeConnection() throws IOException { } hbaseAdmin = null; } - Connection connection = this.connection.getAndSet(null); + Connection connection = this.connectionRef.getAndSet(null); if (connection != null) { try { connection.close();