Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ public class HBaseCommonTestingUtility {
Compression.Algorithm.NONE, Compression.Algorithm.GZ
};

protected Configuration conf;
protected final Configuration conf;
Copy link
Contributor Author

@busbey busbey Jul 31, 2020

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Changing this variable to final breaks binary compatibility, but is needed to make access to this field threadsafe.

I could maintain binary compatibility by making a new final variable called "originalConf" or something like that which is used by our own internals. but that would still cause a behavior change because if someone made a subclass of these classes that assigned to the conf variable we would not pay any attention to that change.

I think we should just release note the break, especially given the on-going issue of "should we be labeling these things IA.Public yet?"

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we should just release note the break, especially given the on-going issue of "should we be labeling these things IA.Public yet?"

Agreed with putting description on the release notes. And feel weird that these are IA Public, should it be changed to IA Private here?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

should it be changed to IA Private here?

no, there 's a whole back and forth community discussion that has been on-going in bits and pieces for years. the short version is that we definitely need something downstream facing for running tests of hbase client code, but no one has provided a clean way to separate that out from how we test our own stuff.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

s/hbase client code/code that uses hbase client

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Since we are anyways going to release note the breaking change, can we also make this field private? Downstream extended classes will have to mandatorily start using super constructor for setting conf anyways and now that it is final, we can also make it private.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we make use of this across our internal subclasses of HBaseCommonTestingUtility so if we made it private instead of protected we'd need to add an accessor.


public HBaseCommonTestingUtility() {
this(null);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import java.io.File;
import java.io.IOException;
import java.io.OutputStream;
import java.io.UncheckedIOException;
import java.lang.reflect.Field;
import java.lang.reflect.Modifier;
import java.net.BindException;
Expand Down Expand Up @@ -207,7 +208,7 @@ public class HBaseTestingUtility extends HBaseZKTestingUtility {
* HBaseTestingUtility*/
private Path dataTestDirOnTestFS = null;

private volatile AsyncClusterConnection asyncConnection;
private final AtomicReference<AsyncClusterConnection> asyncConnection = new AtomicReference<>();

/** Filesystem URI used for map-reduce mini-cluster setup */
private static String FS_URI;
Expand Down Expand Up @@ -1237,14 +1238,7 @@ public void restartHBaseCluster(int servers, List<Integer> ports)

public void restartHBaseCluster(StartMiniClusterOption option)
throws IOException, InterruptedException {
if (hbaseAdmin != null) {
hbaseAdmin.close();
hbaseAdmin = null;
}
if (this.asyncConnection != null) {
this.asyncConnection.close();
this.asyncConnection = null;
}
closeConnection();
this.hbaseCluster =
new MiniHBaseCluster(this.conf, option.getNumMasters(), option.getNumAlwaysStandByMasters(),
option.getNumRegionServers(), option.getRsPorts(), option.getMasterClass(),
Expand Down Expand Up @@ -3041,11 +3035,6 @@ public HBaseCluster getHBaseClusterInterface() {
return hbaseCluster;
}

private void initConnection() throws IOException {
User user = UserProvider.instantiate(conf).getCurrent();
this.asyncConnection = ClusterConnectionFactory.createAsyncClusterConnection(conf, null, user);
}

/**
* Resets the connections so that the next time getConnection() is called, a new connection is
* created. This is needed in cases where the entire cluster / all the masters are shutdown and
Expand All @@ -3067,29 +3056,46 @@ public void invalidateConnection() throws IOException {
}

/**
* Get a Connection to the cluster. Not thread-safe (This class needs a lot of work to make it
* thread-safe).
* Get a shared Connection to the cluster.
* this method is threadsafe.
* @return A Connection that can be shared. Don't close. Will be closed on shutdown of cluster.
*/
public Connection getConnection() throws IOException {
if (this.asyncConnection == null) {
initConnection();
}
return this.asyncConnection.toConnection();
return getAsyncConnection().toConnection();
}

/**
* Get a shared AsyncClusterConnection to the cluster.
* this method is threadsafe.
* @return An AsyncClusterConnection that can be shared. Don't close. Will be closed on shutdown of cluster.
*/
public AsyncClusterConnection getAsyncConnection() throws IOException {
if (this.asyncConnection == null) {
initConnection();
try {
return asyncConnection.updateAndGet(connection -> {
if (connection == null) {
try {
User user = UserProvider.instantiate(conf).getCurrent();
connection = ClusterConnectionFactory.createAsyncClusterConnection(conf, null, user);
} catch(IOException ioe) {
throw new UncheckedIOException("Failed to create connection", ioe);
}
}
return connection;
});
} catch (UncheckedIOException exception) {
throw exception.getCause();
}
return this.asyncConnection;
}

public void closeConnection() throws IOException {
Closeables.close(hbaseAdmin, true);
Closeables.close(asyncConnection, true);
this.hbaseAdmin = null;
this.asyncConnection = null;
if (hbaseAdmin != null) {
Closeables.close(hbaseAdmin, true);
hbaseAdmin = null;
}
AsyncClusterConnection asyncConnection = this.asyncConnection.getAndSet(null);
if (asyncConnection != null) {
Closeables.close(asyncConnection, true);
}
}

/**
Expand Down Expand Up @@ -3252,7 +3258,7 @@ public String explainTableAvailability(TableName tableName) throws IOException {
Map<RegionInfo, ServerName> assignments = getHBaseCluster().getMaster().getAssignmentManager()
.getRegionStates().getRegionAssignments();
final List<Pair<RegionInfo, ServerName>> metaLocations =
MetaTableAccessor.getTableRegionsAndLocations(asyncConnection.toConnection(), tableName);
MetaTableAccessor.getTableRegionsAndLocations(getConnection(), tableName);
for (Pair<RegionInfo, ServerName> metaLocation : metaLocations) {
RegionInfo hri = metaLocation.getFirst();
ServerName sn = metaLocation.getSecond();
Expand All @@ -3272,7 +3278,7 @@ public String explainTableAvailability(TableName tableName) throws IOException {

public String explainTableState(final TableName table, TableState.State state)
throws IOException {
TableState tableState = MetaTableAccessor.getTableState(asyncConnection.toConnection(), table);
TableState tableState = MetaTableAccessor.getTableState(getConnection(), table);
if (tableState == null) {
return "TableState in META: No table state in META for table " + table +
" last state in meta (including deleted is " + findLastTableState(table) + ")";
Expand All @@ -3299,7 +3305,7 @@ public boolean visit(Result r) throws IOException {
return true;
}
};
MetaTableAccessor.scanMeta(asyncConnection.toConnection(), null, null,
MetaTableAccessor.scanMeta(getConnection(), null, null,
ClientMetaTableAccessor.QueryType.TABLE, Integer.MAX_VALUE, visitor);
return lastTableState.get();
}
Expand Down