From fba80c1b4548bac06fe4744be17cc82aa21856a4 Mon Sep 17 00:00:00 2001 From: Sean Busbey Date: Fri, 31 Jul 2020 02:34:24 -0500 Subject: [PATCH] HBASE-24805 HBaseTestingUtility.getConnection should be threadsafe --- .../hbase/HBaseCommonTestingUtility.java | 2 +- .../hadoop/hbase/HBaseTestingUtility.java | 66 ++++++++++--------- 2 files changed, 37 insertions(+), 31 deletions(-) diff --git a/hbase-common/src/test/java/org/apache/hadoop/hbase/HBaseCommonTestingUtility.java b/hbase-common/src/test/java/org/apache/hadoop/hbase/HBaseCommonTestingUtility.java index eb04cca410f5..487c926a1bc8 100644 --- a/hbase-common/src/test/java/org/apache/hadoop/hbase/HBaseCommonTestingUtility.java +++ b/hbase-common/src/test/java/org/apache/hadoop/hbase/HBaseCommonTestingUtility.java @@ -69,7 +69,7 @@ public class HBaseCommonTestingUtility { Compression.Algorithm.NONE, Compression.Algorithm.GZ }; - protected Configuration conf; + protected final Configuration conf; public HBaseCommonTestingUtility() { this(null); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java index 0b5f8166410a..0c4f7fb9d857 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java @@ -25,6 +25,7 @@ import java.io.File; import java.io.IOException; import java.io.OutputStream; +import java.io.UncheckedIOException; import java.lang.reflect.Field; import java.lang.reflect.Modifier; import java.net.BindException; @@ -207,7 +208,7 @@ public class HBaseTestingUtility extends HBaseZKTestingUtility { * HBaseTestingUtility*/ private Path dataTestDirOnTestFS = null; - private volatile AsyncClusterConnection asyncConnection; + private final AtomicReference asyncConnection = new AtomicReference<>(); /** Filesystem URI used for map-reduce mini-cluster setup */ private static String FS_URI; @@ -1237,14 +1238,7 @@ public void restartHBaseCluster(int servers, List ports) public void restartHBaseCluster(StartMiniClusterOption option) throws IOException, InterruptedException { - if (hbaseAdmin != null) { - hbaseAdmin.close(); - hbaseAdmin = null; - } - if (this.asyncConnection != null) { - this.asyncConnection.close(); - this.asyncConnection = null; - } + closeConnection(); this.hbaseCluster = new MiniHBaseCluster(this.conf, option.getNumMasters(), option.getNumAlwaysStandByMasters(), option.getNumRegionServers(), option.getRsPorts(), option.getMasterClass(), @@ -3041,11 +3035,6 @@ public HBaseCluster getHBaseClusterInterface() { return hbaseCluster; } - private void initConnection() throws IOException { - User user = UserProvider.instantiate(conf).getCurrent(); - this.asyncConnection = ClusterConnectionFactory.createAsyncClusterConnection(conf, null, user); - } - /** * Resets the connections so that the next time getConnection() is called, a new connection is * created. This is needed in cases where the entire cluster / all the masters are shutdown and @@ -3067,29 +3056,46 @@ public void invalidateConnection() throws IOException { } /** - * Get a Connection to the cluster. Not thread-safe (This class needs a lot of work to make it - * thread-safe). + * Get a shared Connection to the cluster. + * this method is threadsafe. * @return A Connection that can be shared. Don't close. Will be closed on shutdown of cluster. */ public Connection getConnection() throws IOException { - if (this.asyncConnection == null) { - initConnection(); - } - return this.asyncConnection.toConnection(); + return getAsyncConnection().toConnection(); } + /** + * Get a shared AsyncClusterConnection to the cluster. + * this method is threadsafe. + * @return An AsyncClusterConnection that can be shared. Don't close. Will be closed on shutdown of cluster. + */ public AsyncClusterConnection getAsyncConnection() throws IOException { - if (this.asyncConnection == null) { - initConnection(); + try { + return asyncConnection.updateAndGet(connection -> { + if (connection == null) { + try { + User user = UserProvider.instantiate(conf).getCurrent(); + connection = ClusterConnectionFactory.createAsyncClusterConnection(conf, null, user); + } catch(IOException ioe) { + throw new UncheckedIOException("Failed to create connection", ioe); + } + } + return connection; + }); + } catch (UncheckedIOException exception) { + throw exception.getCause(); } - return this.asyncConnection; } public void closeConnection() throws IOException { - Closeables.close(hbaseAdmin, true); - Closeables.close(asyncConnection, true); - this.hbaseAdmin = null; - this.asyncConnection = null; + if (hbaseAdmin != null) { + Closeables.close(hbaseAdmin, true); + hbaseAdmin = null; + } + AsyncClusterConnection asyncConnection = this.asyncConnection.getAndSet(null); + if (asyncConnection != null) { + Closeables.close(asyncConnection, true); + } } /** @@ -3252,7 +3258,7 @@ public String explainTableAvailability(TableName tableName) throws IOException { Map assignments = getHBaseCluster().getMaster().getAssignmentManager() .getRegionStates().getRegionAssignments(); final List> metaLocations = - MetaTableAccessor.getTableRegionsAndLocations(asyncConnection.toConnection(), tableName); + MetaTableAccessor.getTableRegionsAndLocations(getConnection(), tableName); for (Pair metaLocation : metaLocations) { RegionInfo hri = metaLocation.getFirst(); ServerName sn = metaLocation.getSecond(); @@ -3272,7 +3278,7 @@ public String explainTableAvailability(TableName tableName) throws IOException { public String explainTableState(final TableName table, TableState.State state) throws IOException { - TableState tableState = MetaTableAccessor.getTableState(asyncConnection.toConnection(), table); + TableState tableState = MetaTableAccessor.getTableState(getConnection(), table); if (tableState == null) { return "TableState in META: No table state in META for table " + table + " last state in meta (including deleted is " + findLastTableState(table) + ")"; @@ -3299,7 +3305,7 @@ public boolean visit(Result r) throws IOException { return true; } }; - MetaTableAccessor.scanMeta(asyncConnection.toConnection(), null, null, + MetaTableAccessor.scanMeta(getConnection(), null, null, ClientMetaTableAccessor.QueryType.TABLE, Integer.MAX_VALUE, visitor); return lastTableState.get(); }