Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,6 @@
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.AuthUtil;
import org.apache.hadoop.hbase.ChoreService;
import org.apache.hadoop.hbase.MasterNotRunningException;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.backoff.ClientBackoffPolicy;
Expand Down Expand Up @@ -243,10 +242,6 @@ ClientService.Interface getRegionServerStub(ServerName serverName) throws IOExce
() -> createRegionServerStub(serverName));
}

private MasterService.Interface createMasterStub(ServerName serverName) throws IOException {
return MasterService.newStub(rpcClient.createRpcChannel(serverName, user, rpcTimeout));
}

private AdminService.Interface createAdminServerStub(ServerName serverName) throws IOException {
return AdminService.newStub(rpcClient.createRpcChannel(serverName, user, rpcTimeout));
}
Expand All @@ -258,26 +253,8 @@ AdminService.Interface getAdminStub(ServerName serverName) throws IOException {
}

CompletableFuture<MasterService.Interface> getMasterStub() {
return ConnectionUtils.getOrFetch(masterStub, masterStubMakeFuture, false, () -> {
CompletableFuture<MasterService.Interface> future = new CompletableFuture<>();
addListener(registry.getActiveMaster(), (addr, error) -> {
if (error != null) {
future.completeExceptionally(error);
} else if (addr == null) {
future.completeExceptionally(new MasterNotRunningException(
"ZooKeeper available but no active master location found"));
} else {
LOG.debug("The fetched master address is {}", addr);
try {
future.complete(createMasterStub(addr));
} catch (IOException e) {
future.completeExceptionally(e);
}
}

});
return future;
}, stub -> true, "master stub");
return ConnectionUtils.getMasterStub(registry, masterStub, masterStubMakeFuture, rpcClient,
user, rpcTimeout, TimeUnit.MILLISECONDS, MasterService::newStub, "MasterService");
}

String getClusterId() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,137 +19,40 @@

import static org.apache.hadoop.hbase.util.FutureUtils.addListener;

import java.io.IOException;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;
import org.apache.hadoop.hbase.CellComparatorImpl;
import org.apache.hadoop.hbase.HRegionLocation;
import org.apache.hadoop.hbase.MasterNotRunningException;
import org.apache.hadoop.hbase.RegionLocations;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.exceptions.ClientExceptionsUtil;
import org.apache.hadoop.hbase.ipc.HBaseRpcController;
import org.apache.hadoop.hbase.ipc.ServerNotRunningYetException;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.com.google.protobuf.ByteString;

import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ClientMetaService;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ClientMetaService.Interface;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetAllMetaRegionLocationsRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.LocateMetaRegionRequest;

/**
* The class for locating region for meta table.
*/
@InterfaceAudience.Private
class AsyncMetaTableRegionLocator extends AbstractAsyncTableRegionLocator {

private static final Logger LOG = LoggerFactory.getLogger(AsyncMetaTableRegionLocator.class);

private final AtomicReference<Interface> stub = new AtomicReference<>();

private final AtomicReference<CompletableFuture<Interface>> stubMakeFuture =
new AtomicReference<>();

AsyncMetaTableRegionLocator(AsyncConnectionImpl conn, TableName tableName, int maxConcurrent) {
// for meta region we should use MetaCellComparator to compare the row keys
super(conn, tableName, maxConcurrent, (r1, r2) -> CellComparatorImpl.MetaCellComparator
.compareRows(r1, 0, r1.length, r2, 0, r2.length));
}

private Interface createStub(ServerName serverName) throws IOException {
return ClientMetaService.newStub(conn.rpcClient.createRpcChannel(serverName, conn.user,
(int) TimeUnit.NANOSECONDS.toMillis(conn.connConf.getReadRpcTimeoutNs())));
}

CompletableFuture<Interface> getStub() {
return ConnectionUtils.getOrFetch(stub, stubMakeFuture, false, () -> {
CompletableFuture<Interface> future = new CompletableFuture<>();
addListener(conn.registry.getActiveMaster(), (addr, error) -> {
if (error != null) {
future.completeExceptionally(error);
} else if (addr == null) {
future.completeExceptionally(new MasterNotRunningException(
"ZooKeeper available but no active master location found"));
} else {
LOG.debug("The fetched master address is {}", addr);
try {
future.complete(createStub(addr));
} catch (IOException e) {
future.completeExceptionally(e);
}
}

});
return future;
}, stub -> true, "ClientLocateMetaStub");
}

private void tryClearMasterStubCache(IOException error, Interface currentStub) {
if (ClientExceptionsUtil.isConnectionException(error) ||
error instanceof ServerNotRunningYetException) {
stub.compareAndSet(currentStub, null);
}
super(conn, tableName, maxConcurrent, CellComparatorImpl.MetaCellComparator.ROW_COMPARATOR);
}

@Override
protected void locate(LocateRequest req) {
addListener(getStub(), (stub, error) -> {
addListener(conn.registry.locateMeta(req.row, req.locateType), (locs, error) -> {
if (error != null) {
onLocateComplete(req, null, error);
return;
}
HBaseRpcController controller = conn.rpcControllerFactory.newController();
stub.locateMetaRegion(controller,
LocateMetaRegionRequest.newBuilder().setRow(ByteString.copyFrom(req.row))
.setLocateType(ProtobufUtil.toProtoRegionLocateType(req.locateType)).build(),
resp -> {
if (controller.failed()) {
IOException ex = controller.getFailed();
tryClearMasterStubCache(ex, stub);
onLocateComplete(req, null, ex);
return;
}
RegionLocations locs = new RegionLocations(resp.getMetaLocationsList().stream()
.map(ProtobufUtil::toRegionLocation).collect(Collectors.toList()));
if (validateRegionLocations(locs, req)) {
onLocateComplete(req, locs, null);
}
});
if (validateRegionLocations(locs, req)) {
onLocateComplete(req, locs, null);
}
});
}

@Override
CompletableFuture<List<HRegionLocation>>
getAllRegionLocations(boolean excludeOfflinedSplitParents) {
CompletableFuture<List<HRegionLocation>> future = new CompletableFuture<>();
addListener(getStub(), (stub, error) -> {
if (error != null) {
future.completeExceptionally(error);
return;
}
HBaseRpcController controller = conn.rpcControllerFactory.newController();
stub.getAllMetaRegionLocations(controller, GetAllMetaRegionLocationsRequest.newBuilder()
.setExcludeOfflinedSplitParents(excludeOfflinedSplitParents).build(), resp -> {
if (controller.failed()) {
IOException ex = controller.getFailed();
tryClearMasterStubCache(ex, stub);
future.completeExceptionally(ex);
return;
}
List<HRegionLocation> locs = resp.getMetaLocationsList().stream()
.map(ProtobufUtil::toRegionLocation).collect(Collectors.toList());
future.complete(locs);
});
});
return future;
return conn.registry.getAllMetaRegionLocations(excludeOfflinedSplitParents);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,21 +18,36 @@
package org.apache.hadoop.hbase.client;

import java.io.Closeable;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import org.apache.hadoop.hbase.HRegionLocation;
import org.apache.hadoop.hbase.RegionLocations;
import org.apache.hadoop.hbase.ServerName;
import org.apache.yetus.audience.InterfaceAudience;

/**
* Registry for meta information needed for connection setup to a HBase cluster. Implementations
* hold cluster information such as this cluster's id, location of hbase:meta, etc..
* Internal use only.
* hold cluster information such as this cluster's id, location of hbase:meta, etc.. Internal use
* only.
*/
@InterfaceAudience.Private
interface ConnectionRegistry extends Closeable {

/**
* Get location of meta region for the given {@code row}.
*/
CompletableFuture<RegionLocations> locateMeta(byte[] row, RegionLocateType locateType);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Seems arbitrary that we do ROOT location in here but location for hbase:meta is elsewhere and if they want to do 'caching'/'replicas', they'd use a different system.

Or is idea that this cache of location be generalized? Or for ROOT only?

Implementations could get RS stubs I suppose so don't have to go to Master to get meta Locations?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I do not think it is arbitrary. Before we have root table, meta location is here and user locations are elsewhere. The point here, is that, what things in HBase are called 'bootstraping' things. In the past it is meta, and now, it is root.

And see my plan below, my plan is to store the bootstraping things on the HA masters, so do not want to envolve RS in, let's keep them within masters.


/**
* Get all meta region locations, including the location of secondary regions.
* @param excludeOfflinedSplitParents whether to include split parent.
*/
CompletableFuture<List<HRegionLocation>>
getAllMetaRegionLocations(boolean excludeOfflinedSplitParents);

/**
* Should only be called once.
* <p>
* <p/>
* The upper layer should store this value somewhere as it will not be change any more.
*/
CompletableFuture<String> getClusterId();
Expand Down
Loading