Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -2265,6 +2265,20 @@ default boolean replicationPeerModificationSwitch(boolean on) throws IOException
*/
void decommissionRegionServers(List<ServerName> servers, boolean offload) throws IOException;

/**
* Mark region server(s) as decommissioned to prevent additional regions from getting assigned to
* them. Optionally unload the regions on the servers. If there are multiple servers to be
* decommissioned, decommissioning them at the same time can prevent wasteful region movements.
* Region unloading is asynchronous.
* @param servers The list of servers to decommission.
* @param offload True to offload the regions from the decommissioned servers
* @param matchHostNameOnly True to prevent the hostname from ever joining again, regardless of
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This language is a bit exaggerated, right? The host can join again; just remove this entry. You can tone it down and say something like "True to reject this host regardless of its startcode or port."

* startTime
* @throws IOException if a remote or network exception occurs
*/
void decommissionRegionServers(List<ServerName> servers, boolean offload,
boolean matchHostNameOnly) throws IOException;

/**
* List region servers marked as decommissioned, which can not be assigned regions.
* @return List of decommissioned region servers.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -929,6 +929,12 @@ public void decommissionRegionServers(List<ServerName> servers, boolean offload)
get(admin.decommissionRegionServers(servers, offload));
}

/**
* Three-argument decommission call: forwards {@code servers}, {@code offload} and
* {@code matchHostNameOnly} unchanged to the wrapped {@code admin} and hands the result to
* {@code get(...)}.
* <p>
* NOTE(review): {@code get} is presumably the helper that waits for the returned future and
* surfaces failures as {@link IOException} -- confirm against its definition.
*/
@Override
public void decommissionRegionServers(List<ServerName> servers, boolean offload,
boolean matchHostNameOnly) throws IOException {
get(admin.decommissionRegionServers(servers, offload, matchHostNameOnly));
}

@Override
public List<ServerName> listDecommissionedRegionServers() throws IOException {
return get(admin.listDecommissionedRegionServers());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1146,6 +1146,9 @@ CompletableFuture<Boolean> isProcedureFinished(String signature, String instance
*/
CompletableFuture<Void> decommissionRegionServers(List<ServerName> servers, boolean offload);

/**
* Mark region server(s) as decommissioned to prevent additional regions from getting assigned to
* them. Optionally unload the regions on the servers.
* @param servers The list of servers to decommission.
* @param offload True to offload the regions from the decommissioned servers
* @param matchHostNameOnly True to match the decommissioned server on its hostname only,
* regardless of its port and startcode
* @return a {@link CompletableFuture} for the decommission request
*/
CompletableFuture<Void> decommissionRegionServers(List<ServerName> servers, boolean offload,
boolean matchHostNameOnly);

/**
* List region servers marked as decommissioned, which can not be assigned regions.
* @return List of decommissioned region servers wrapped by {@link CompletableFuture}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -635,6 +635,12 @@ public CompletableFuture<Void> decommissionRegionServers(List<ServerName> server
return wrap(rawAdmin.decommissionRegionServers(servers, offload));
}

/**
* Three-argument decommission call: passes {@code servers}, {@code offload} and
* {@code matchHostNameOnly} straight through to {@code rawAdmin} and returns the resulting future
* after it has gone through {@code wrap(...)}.
* <p>
* NOTE(review): what {@code wrap} does to the future is not visible here -- confirm against its
* definition.
*/
@Override
public CompletableFuture<Void> decommissionRegionServers(List<ServerName> servers,
boolean offload, boolean matchHostNameOnly) {
return wrap(rawAdmin.decommissionRegionServers(servers, offload, matchHostNameOnly));
}

@Override
public CompletableFuture<List<ServerName>> listDecommissionedRegionServers() {
return wrap(rawAdmin.listDecommissionedRegionServers());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2509,10 +2509,21 @@ public CompletableFuture<String> getLocks() {
@Override
public CompletableFuture<Void> decommissionRegionServers(List<ServerName> servers,
boolean offload) {
// By default, when we decommission a RegionServer we don't mark the hostname as permanently
// decommissioned and instead mark the server location (host + port + startCode) as such
boolean matchHostNameOnly = false;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

By calling down to the higher arity method you're duplicating the default decision that's already described in protobuf. Doing so is not a terrible thing, but it is redundant. I'd say, leave the default in one place if at all possible.


return this.decommissionRegionServers(servers, offload, matchHostNameOnly);
}

@Override
public CompletableFuture<Void> decommissionRegionServers(List<ServerName> servers,
boolean offload, boolean matchHostNameOnly) {
return this.<Void> newMasterCaller()
.action((controller, stub) -> this.<DecommissionRegionServersRequest,
DecommissionRegionServersResponse, Void> call(controller, stub,
RequestConverter.buildDecommissionRegionServersRequest(servers, offload),
RequestConverter.buildDecommissionRegionServersRequest(servers, offload,
matchHostNameOnly),
(s, c, req, done) -> s.decommissionRegionServers(c, req, done), resp -> null))
.call();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1558,8 +1558,14 @@ public static GetQuotaStatesRequest buildGetQuotaStatesRequest() {

public static DecommissionRegionServersRequest
buildDecommissionRegionServersRequest(List<ServerName> servers, boolean offload) {
return RequestConverter.buildDecommissionRegionServersRequest(servers, offload, false);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same comment here about duplicating the default state.

}

public static DecommissionRegionServersRequest buildDecommissionRegionServersRequest(
List<ServerName> servers, boolean offload, boolean matchHostNameOnly) {
return DecommissionRegionServersRequest.newBuilder()
.addAllServerName(toProtoServerNames(servers)).setOffload(offload).build();
.addAllServerName(toProtoServerNames(servers)).setOffload(offload)
.setMatchHostNameOnly(matchHostNameOnly).build();
}

public static RecommissionRegionServerRequest
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -696,6 +696,7 @@ message ListDecommissionedRegionServersResponse {
message DecommissionRegionServersRequest {
repeated ServerName server_name = 1;
required bool offload = 2;
optional bool match_host_name_only = 3 [default = false];
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please add comments for maintainers in both proto files that this default value should match the other.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we also support a match_hostname_and_port_only variation on this feature?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think that we want to prescribe a default value in the protobuf. Instead, the service should interpret the absence of a value appropriately.

}

message DecommissionRegionServersResponse {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -107,3 +107,7 @@ message DeprecatedTableState {
message SwitchState {
optional bool enabled = 1;
}

// Payload stored as the data of a draining (decommissioned) region server znode.
message DrainedZNodeServerData {
// Whether the drained server is matched on its hostname only. Maintainers: keep this default in
// sync with DecommissionRegionServersRequest.match_host_name_only.
optional bool match_host_name_only = 1 [default = false];
}
Original file line number Diff line number Diff line change
Expand Up @@ -26,11 +26,16 @@
import org.apache.hadoop.hbase.zookeeper.ZKListener;
import org.apache.hadoop.hbase.zookeeper.ZKUtil;
import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
import org.apache.hadoop.hbase.zookeeper.ZNodePaths;
import org.apache.yetus.audience.InterfaceAudience;
import org.apache.zookeeper.KeeperException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.com.google.protobuf.InvalidProtocolBufferException;

import org.apache.hadoop.hbase.shaded.protobuf.generated.ZooKeeperProtos.DrainedZNodeServerData;

/**
* Tracks the list of draining region servers via ZK.
* <p>
Expand Down Expand Up @@ -77,36 +82,66 @@ public void serverAdded(ServerName sn) {
}
}
});

List<String> servers =
ZKUtil.listChildrenAndWatchThem(watcher, watcher.getZNodePaths().drainingZNode);
add(servers);

if (servers != null) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit -- it's safer to put the null-check in the method than to burden all callers with the responsibility.

add(servers);
}
}

private void add(final List<String> servers) throws IOException {
synchronized (this.drainingServers) {
this.drainingServers.clear();
// Clear all servers from the draining list that shouldn't stay drained. Information about
// whether a server should stay drained or not is added to the ZNode by the HMaster
this.drainingServers.removeIf(sn -> !shouldServerStayDrained(sn));

for (String n : servers) {
final ServerName sn = ServerName.valueOf(ZKUtil.getNodeName(n));
this.drainingServers.add(sn);
this.serverManager.addServerToDrainList(sn);
LOG.info("Draining RS node created, adding to list [" + sn + "]");

LOG.info("Draining RS node created, adding to list [{}]", sn);
}
}
}

/**
 * Drops {@code sn} from the tracked draining servers and from the server manager's drain list,
 * unless {@link #shouldServerStayDrained(ServerName)} reports that the server has to stay
 * drained, in which case the request is only logged.
 * @param sn the server whose draining entry should be removed
 */
private void remove(final ServerName sn) {
  synchronized (this.drainingServers) {
    final boolean keepDrained = shouldServerStayDrained(sn);
    if (!keepDrained) {
      // Regular case: forget the server locally and in the server manager.
      this.drainingServers.remove(sn);
      this.serverManager.removeServerFromDrainList(sn);
      LOG.info("Successfully removed drained RS {} from the list", sn);
    } else {
      LOG.info(
        "Refusing to remove drained RS {} from the list, it's marked as permanently drained", sn);
    }
  }
}

private boolean shouldServerStayDrained(final ServerName sn) {
boolean shouldBePermanentlyDecommissioned = false;
String parentZnode = this.watcher.getZNodePaths().drainingZNode;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

there's no accessor method for the drainingZNode field?

String node = ZNodePaths.joinZNode(parentZnode, sn.getServerName());

try {
byte[] rawData = ZKUtil.getData(this.watcher, node);
// Check if the data is present for backwards compatibility, some nodes may not have it
if (rawData != null && rawData.length > 0) {
DrainedZNodeServerData znodeData = DrainedZNodeServerData.parseFrom(rawData);
shouldBePermanentlyDecommissioned = znodeData.getMatchHostNameOnly();
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You've changed terminology between the serialized form and the in-memory form. What does this field represent? Is it "match on hostname only" or is it a claim of "permanence" ? I'm partial toward the former because there are no guarantees in this world...

More constructively though, I don't care much for the use of "permanent" in these method names. The data itself stored in ZooKeeper is considered ephemeral and not crucial for the recovery of a failed cluster. We shouldn't put anything in ZK that is so important as to be called permanent.

Try again with this method, calling it something like honorsHostNameOnly or similar.

}
} catch (InterruptedException | KeeperException | InvalidProtocolBufferException e) {
// pass
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For InterruptedException, you should propagate it as far as you can. InvalidProtocolBufferException should be logged at warn. KeeperException, yeah, probably propagate that too -- there's nothing gained by having the DrainingServerTracker try to hide the fact that it is using ZooKeeper.

}
return shouldBePermanentlyDecommissioned;
}

@Override
public void nodeDeleted(final String path) {
if (path.startsWith(watcher.getZNodePaths().drainingZNode)) {
final ServerName sn = ServerName.valueOf(ZKUtil.getNodeName(path));
LOG.info("Draining RS node deleted, removing from list [" + sn + "]");
LOG.info("Draining RS node deleted, removing from list [{}]", sn);
remove(sn);
}
}
Expand All @@ -117,10 +152,11 @@ public void nodeChildrenChanged(final String path) {
try {
final List<String> newNodes =
ZKUtil.listChildrenAndWatchThem(watcher, watcher.getZNodePaths().drainingZNode);
add(newNodes);
} catch (KeeperException e) {
abortable.abort("Unexpected zk exception getting RS nodes", e);
} catch (IOException e) {

if (newNodes != null) {
add(newNodes);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Likewise, push the null-check into add?

}
} catch (KeeperException | IOException e) {
abortable.abort("Unexpected zk exception getting RS nodes", e);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -282,6 +282,7 @@
import org.apache.hbase.thirdparty.com.google.common.io.Closeables;
import org.apache.hbase.thirdparty.com.google.gson.JsonParseException;
import org.apache.hbase.thirdparty.com.google.protobuf.Descriptors;
import org.apache.hbase.thirdparty.com.google.protobuf.InvalidProtocolBufferException;
import org.apache.hbase.thirdparty.com.google.protobuf.Service;
import org.apache.hbase.thirdparty.org.eclipse.jetty.server.Server;
import org.apache.hbase.thirdparty.org.eclipse.jetty.server.ServerConnector;
Expand All @@ -293,6 +294,7 @@
import org.apache.hadoop.hbase.shaded.protobuf.RequestConverter;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetRegionInfoResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.SnapshotProtos.SnapshotDescription;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ZooKeeperProtos.DrainedZNodeServerData;

/**
* HMaster is the "master server" for HBase. An HBase cluster has one active master. If many masters
Expand Down Expand Up @@ -4017,13 +4019,27 @@ public boolean isReplicationPeerModificationEnabled() {
*/
public void decommissionRegionServers(final List<ServerName> servers, final boolean offload)
throws IOException {
// Two-argument form: delegates to the three-argument overload with matchHostNameOnly fixed to
// false.
this.decommissionRegionServers(servers, offload, false);
}

public void decommissionRegionServers(final List<ServerName> servers, final boolean offload,
final boolean matchHostNameOnly) throws IOException {
List<ServerName> serversAdded = new ArrayList<>(servers.size());
// Place the decommission marker first.
String parentZnode = getZooKeeper().getZNodePaths().drainingZNode;
for (ServerName server : servers) {
try {
String node = ZNodePaths.joinZNode(parentZnode, server.getServerName());
ZKUtil.createAndFailSilent(getZooKeeper(), node);
// Encode whether the host should be decommissioned permanently regardless of its
// port + startCode combination or not in the znode's data
if (matchHostNameOnly) {
LOG.info("Marking the host of '{}' as permanently decommissioned in ZooKeeper",
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think that this is fine for debug level.

server.getServerName());
}
byte[] data = DrainedZNodeServerData.newBuilder().setMatchHostNameOnly(matchHostNameOnly)
.build().toByteArray();
// Create a node with binary data
ZKUtil.createAndFailSilent(getZooKeeper(), node, data);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This should not fail silently. If it cannot create the znode that marks the decommissioning, it should fail the RPC. Let the exception propagate -- the existing catch clause appears to handle it appropriately.

} catch (KeeperException ke) {
throw new HBaseIOException(
this.zooKeeper.prefix("Unable to decommission '" + server.getServerName() + "'."), ke);
Expand Down Expand Up @@ -4068,8 +4084,31 @@ public void recommissionRegionServer(final ServerName server,
// Remove the server from decommissioned (draining) server list.
String parentZnode = getZooKeeper().getZNodePaths().drainingZNode;
String node = ZNodePaths.joinZNode(parentZnode, server.getServerName());
boolean shouldBePermanentlyDecommissioned = false;

// Get the binary data in the node and check whether the hostname is marked as
// permanently decommissioned or not
try {
byte[] rawData = ZKUtil.getData(getZooKeeper(), node);

// Check if the data is present for backwards compatibility, some nodes may not have it
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Most nodes won't have this data as the common use-case is to drain by hostname,port,startcode.

if (rawData != null && rawData.length > 0) {
DrainedZNodeServerData znodeData = DrainedZNodeServerData.parseFrom(rawData);
shouldBePermanentlyDecommissioned = znodeData.getMatchHostNameOnly();
}
} catch (InterruptedException | KeeperException | InvalidProtocolBufferException e) {
throw new HBaseIOException(this.zooKeeper.prefix("Unable to recommission "
+ server.getServerName() + ", was unable to read the node's data"), e);
}

try {
ZKUtil.deleteNodeFailSilent(getZooKeeper(), node);
if (shouldBePermanentlyDecommissioned) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There should be a variant of this method that allows the operator to remove a server by hostname only to be removed from the decommissioning list. Unless I'm missing something, once a server is added to the list with this flag enabled, it cannot be removed again.

LOG.info("Skipping recommissioning of server {} because it was marked as permanently"
+ " decommissioned in ZooKeeper", server.getServerName());
return;
} else {
ZKUtil.deleteNodeFailSilent(getZooKeeper(), node);
}
} catch (KeeperException ke) {
throw new HBaseIOException(
this.zooKeeper.prefix("Unable to recommission '" + server.getServerName() + "'."), ke);
Expand All @@ -4092,8 +4131,9 @@ public void recommissionRegionServer(final ServerName server,
}
RegionInfo hri = regionState.getRegion();
if (server.equals(regionState.getServerName())) {
LOG.info("Skipping move of region " + hri.getRegionNameAsString()
+ " because region already assigned to the same server " + server + ".");
LOG.info(
"Skipping move of region {} because region already assigned to the same server {}.",
hri.getRegionNameAsString(), server);
continue;
}
RegionPlan rp = new RegionPlan(hri, regionState.getServerName(), server);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2279,10 +2279,11 @@ public DecommissionRegionServersResponse decommissionRegionServers(RpcController
List<ServerName> servers = request.getServerNameList().stream()
.map(pbServer -> ProtobufUtil.toServerName(pbServer)).collect(Collectors.toList());
boolean offload = request.getOffload();
boolean matchHostNameOnly = request.getMatchHostNameOnly();
if (server.cpHost != null) {
server.cpHost.preDecommissionRegionServers(servers, offload);
}
server.decommissionRegionServers(servers, offload);
server.decommissionRegionServers(servers, offload, matchHostNameOnly);
if (server.cpHost != null) {
server.cpHost.postDecommissionRegionServers(servers, offload);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -94,10 +94,10 @@
* <p>
* If a server is known not to be running any more, it is called dead. The dead server needs to be
* handled by a ServerShutdownHandler. If the handler is not enabled yet, the server can't be
* handled right away so it is queued up. After the handler is enabled, the server will be submitted
* to a handler to handle. However, the handler may be just partially enabled. If so, the server
* cannot be fully processed, and be queued up for further processing. A server is fully processed
* only after the handler is fully enabled and has completed the handling.
* handled right away, so it is queued up. After the handler is enabled, the server will be
* submitted to a handler to handle. However, the handler may be just partially enabled. If so, the
* server cannot be fully processed, and be queued up for further processing. A server is fully
* processed only after the handler is fully enabled and has completed the handling.
*/
@InterfaceAudience.Private
public class ServerManager {
Expand Down Expand Up @@ -649,10 +649,9 @@ public synchronized void moveFromOnlineToDeadServers(final ServerName sn) {
public synchronized boolean removeServerFromDrainList(final ServerName sn) {
// Warn if the server (sn) is not online. ServerName is of the form:
// <hostname> , <port> , <startcode>

if (!this.isServerOnline(sn)) {
LOG.warn("Server " + sn + " is not currently online. "
+ "Removing from draining list anyway, as requested.");
LOG.warn(
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please don't clean up all these logger usages in files that you don't otherwise touch. Let's fix them up in a dedicated PR.

"Server {} is not currently online. Removing from draining list anyway, as requested.", sn);
}
// Remove the server from the draining servers lists.
return this.drainingServers.remove(sn);
Expand All @@ -667,18 +666,18 @@ public synchronized boolean addServerToDrainList(final ServerName sn) {
// <hostname> , <port> , <startcode>

if (!this.isServerOnline(sn)) {
LOG.warn("Server " + sn + " is not currently online. "
+ "Ignoring request to add it to draining list.");
LOG.warn("Server {} is not currently online. Ignoring request to add it to draining list.",
sn);
return false;
}
// Add the server to the draining servers lists, if it's not already in
// it.
if (this.drainingServers.contains(sn)) {
LOG.warn("Server " + sn + " is already in the draining server list."
+ "Ignoring request to add it again.");
LOG.warn("Server {} is already in the draining server list.Ignoring request to add it again.",
sn);
return true;
}
LOG.info("Server " + sn + " added to draining server list.");
LOG.info("Server {} added to draining server list.", sn);
return this.drainingServers.add(sn);
}

Expand Down
Loading