Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
/**
*
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
Expand All @@ -20,7 +19,6 @@

import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.Date;
import java.util.HashMap;
import java.util.HashSet;
Expand All @@ -40,7 +38,13 @@

/**
* Class to hold dead servers list and utility querying dead server list.
* On znode expiration, servers are added here.
* Servers are added when they expire or when we find them in filesystem on startup.
* When a server crash procedure is queued, it will populate the processing list and
* then remove the server from processing list when done. Servers are removed from
* dead server list when a new instance is started over the old on same hostname and
* port or when new Master comes online tidying up after all initialization. Processing
* list and deadserver list are not tied together (you don't have to be in deadservers
* list to be processing and vice versa).
*/
@InterfaceAudience.Private
public class DeadServer {
Expand All @@ -56,37 +60,11 @@ public class DeadServer {
private final Map<ServerName, Long> deadServers = new HashMap<>();

/**
* Set of dead servers currently being processed
*/
private final Set<ServerName> processingServers = new HashSet<ServerName>();

/**
* Handles restart of a server. The new server instance has a different start code.
* The new start code should be greater than the old one. We don't check that here.
*
* @param newServerName Servername as either <code>host:port</code> or
* <code>host,port,startcode</code>.
* @return true if this server was dead before and coming back alive again
* Set of dead servers currently being processed by a SCP.
* Added to this list at the start of SCP and removed after it is done
* processing the crash.
*/
public synchronized boolean cleanPreviousInstance(final ServerName newServerName) {
Iterator<ServerName> it = deadServers.keySet().iterator();
while (it.hasNext()) {
ServerName sn = it.next();
if (ServerName.isSameAddress(sn, newServerName)) {
// remove from deadServers
it.remove();
// remove from processingServers
boolean removed = processingServers.remove(sn);
if (removed) {
LOG.debug("Removed {}, processing={}, numProcessing={}", sn, removed,
processingServers.size());
}
return true;
}
}

return false;
}
private final Set<ServerName> processingServers = new HashSet<>();

/**
* @param serverName server name.
Expand All @@ -96,22 +74,14 @@ public synchronized boolean isDeadServer(final ServerName serverName) {
return deadServers.containsKey(serverName);
}

/**
* @param serverName server name.
* @return true if this server is on the processing servers list false otherwise
*/
public synchronized boolean isProcessingServer(final ServerName serverName) {
return processingServers.contains(serverName);
}

/**
* Checks if there are currently any dead servers being processed by the
* master. Returns true if at least one region server is currently being
* processed as dead.
*
* @return true if any RS are being processed as dead
*/
public synchronized boolean areDeadServersInProgress() {
synchronized boolean areDeadServersInProgress() {
return !processingServers.isEmpty();
}

Expand All @@ -124,72 +94,90 @@ public synchronized Set<ServerName> copyServerNames() {
/**
* Adds the server to the dead server list if it's not there already.
*/
public synchronized void add(ServerName sn) {
if (!deadServers.containsKey(sn)){
deadServers.put(sn, EnvironmentEdgeManager.currentTime());
}
boolean added = processingServers.add(sn);
if (LOG.isDebugEnabled() && added) {
LOG.debug("Added " + sn + "; numProcessing=" + processingServers.size());
}
synchronized void putIfAbsent(ServerName sn) {
this.deadServers.putIfAbsent(sn, EnvironmentEdgeManager.currentTime());
processing(sn);
}

/**
* Notify that we started processing this dead server.
* @param sn ServerName for the dead server.
* Add <code>sn</code> to set of processing deadservers.
* @see #finish(ServerName)
*/
public synchronized void notifyServer(ServerName sn) {
boolean added = processingServers.add(sn);
if (LOG.isDebugEnabled()) {
if (added) {
LOG.debug("Added " + sn + "; numProcessing=" + processingServers.size());
}
LOG.debug("Started processing " + sn + "; numProcessing=" + processingServers.size());
public synchronized void processing(ServerName sn) {
  // Set#add reports whether sn was newly inserted; stay quiet if it was already there.
  final boolean newlyAdded = processingServers.add(sn);
  if (!newlyAdded) {
    return;
  }
  LOG.debug("Processing {}; numProcessing={}", sn, processingServers.size());
}

/**
* Complete processing for this dead server.
* @param sn ServerName for the dead server.
* @see #processing(ServerName)
*/
public synchronized void finish(ServerName sn) {
boolean removed = processingServers.remove(sn);
if (LOG.isDebugEnabled()) {
LOG.debug("Finished processing " + sn + "; numProcessing=" + processingServers.size());
if (removed) {
LOG.debug("Removed " + sn + " ; numProcessing=" + processingServers.size());
}
if (processingServers.remove(sn)) {
LOG.debug("Removed {} from processing; numProcessing={}", sn, processingServers.size());
}
}

/**
 * @return number of entries currently held in the dead servers map; the
 *   processing set is not counted.
 */
public synchronized int size() {
  return this.deadServers.size();
}

public synchronized boolean isEmpty() {
synchronized boolean isEmpty() {
return deadServers.isEmpty();
}

public synchronized void cleanAllPreviousInstances(final ServerName newServerName) {
/**
* Handles restart of a server. The new server instance has a different start code.
* The new start code should be greater than the old one. We don't check that here.
* Removes the old server from deadserver list.
*
* @param newServerName Servername as either <code>host:port</code> or
* <code>host,port,startcode</code>.
* @return true if this server was dead before and coming back alive again
*/
synchronized boolean cleanPreviousInstance(final ServerName newServerName) {
Iterator<ServerName> it = deadServers.keySet().iterator();
while (it.hasNext()) {
ServerName sn = it.next();
if (ServerName.isSameAddress(sn, newServerName)) {
// remove from deadServers
it.remove();
// remove from processingServers
boolean removed = processingServers.remove(sn);
if (removed) {
LOG.debug("Removed " + sn + " ; numProcessing=" + processingServers.size());
}
if (cleanOldServerName(newServerName, it)) {
return true;
}
}
return false;
}

/**
 * Walks the whole dead servers key set and hands each position to
 * {@link #cleanOldServerName(ServerName, Iterator)}, which drops every entry that
 * has the same host and port as <code>newServerName</code>.
 *
 * @param newServerName Server whose host and port old entries are matched against.
 */
synchronized void cleanAllPreviousInstances(final ServerName newServerName) {
  // No explicit next() here: cleanOldServerName advances the iterator itself.
  for (Iterator<ServerName> deadIt = deadServers.keySet().iterator(); deadIt.hasNext();) {
    cleanOldServerName(newServerName, deadIt);
  }
}

/**
 * Advances <code>deadServerIterator</code> by one element and, if that ServerName has
 * the same host and port as <code>newServerName</code>, removes it through the iterator
 * (callers in this class pass an iterator over the dead servers key set).
 * The processing list is deliberately NOT touched here.
 *
 * @param newServerName Server to match port and hostname against.
 * @param deadServerIterator Iterator primed so can call 'next' on it; this method
 *   calls 'next' exactly once.
 * @return True if <code>newServerName</code> and current primed
 *   iterator ServerName have same host and port and we removed old server
 *   via the iterator; false if the addresses differ and nothing was removed.
 */
private boolean cleanOldServerName(ServerName newServerName,
    Iterator<ServerName> deadServerIterator) {
  ServerName sn = deadServerIterator.next();
  if (!ServerName.isSameAddress(sn, newServerName)) {
    return false;
  }
  // Remove from dead servers list. Don't remove from the processing list --
  // let the SCP do it when it is done.
  deadServerIterator.remove();
  return true;
}

@Override
public synchronized String toString() {
// Display unified set of servers from both maps
Set<ServerName> servers = new HashSet<ServerName>();
Set<ServerName> servers = new HashSet<>();
servers.addAll(deadServers.keySet());
servers.addAll(processingServers);
StringBuilder sb = new StringBuilder();
Expand All @@ -211,7 +199,7 @@ public synchronized String toString() {
* @param ts the time, 0 for all
* @return a sorted array list, by death time, lowest values first.
*/
public synchronized List<Pair<ServerName, Long>> copyDeadServersSince(long ts){
synchronized List<Pair<ServerName, Long>> copyDeadServersSince(long ts) {
List<Pair<ServerName, Long>> res = new ArrayList<>(size());

for (Map.Entry<ServerName, Long> entry:deadServers.entrySet()){
Expand All @@ -220,7 +208,7 @@ public synchronized List<Pair<ServerName, Long>> copyDeadServersSince(long ts){
}
}

Collections.sort(res, ServerNameDeathDateComparator);
Collections.sort(res, (o1, o2) -> o1.getSecond().compareTo(o2.getSecond()));
return res;
}

Expand All @@ -234,28 +222,15 @@ public synchronized Date getTimeOfDeath(final ServerName deadServerName){
return time == null ? null : new Date(time);
}

private static Comparator<Pair<ServerName, Long>> ServerNameDeathDateComparator =
new Comparator<Pair<ServerName, Long>>(){

@Override
public int compare(Pair<ServerName, Long> o1, Pair<ServerName, Long> o2) {
return o1.getSecond().compareTo(o2.getSecond());
}
};

/**
* remove the specified dead server
* Called from rpc by operator cleaning up deadserver list.
* @param deadServerName the dead server name
* @return true if this server was removed
*/

public synchronized boolean removeDeadServer(final ServerName deadServerName) {
Preconditions.checkState(!processingServers.contains(deadServerName),
"Asked to remove server still in processingServers set " + deadServerName +
" (numProcessing=" + processingServers.size() + ")");
if (deadServers.remove(deadServerName) == null) {
return false;
}
return true;
return this.deadServers.remove(deadServerName) != null;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -319,7 +319,7 @@ boolean checkAndRecordNewServer(final ServerName serverName, final ServerMetrics
*/
void findDeadServersAndProcess(Set<ServerName> deadServersFromPE,
Set<ServerName> liveServersFromWALDir) {
deadServersFromPE.forEach(deadservers::add);
deadServersFromPE.forEach(deadservers::putIfAbsent);
liveServersFromWALDir.stream().filter(sn -> !onlineServers.containsKey(sn))
.forEach(this::expireServer);
}
Expand Down Expand Up @@ -350,6 +350,8 @@ private void checkClockSkew(final ServerName serverName, final long serverCurren
}

/**
* Called when RegionServer first reports in for duty and thereafter each
* time it heartbeats to make sure it has not been marked as dead.
* If this server is on the dead list, reject it with a YouAreDeadException.
* If it was dead but came back with a new start code, remove the old entry
* from the dead list.
Expand All @@ -358,21 +360,20 @@ private void checkClockSkew(final ServerName serverName, final long serverCurren
private void checkIsDead(final ServerName serverName, final String what)
throws YouAreDeadException {
if (this.deadservers.isDeadServer(serverName)) {
// host name, port and start code all match with existing one of the
// dead servers. So, this server must be dead.
// Exact match: host name, port and start code all match with existing one of the
// dead servers. So, this server must be dead. Tell it to kill itself.
String message = "Server " + what + " rejected; currently processing " +
serverName + " as dead server";
LOG.debug(message);
throw new YouAreDeadException(message);
}
// remove dead server with same hostname and port of newly checking in rs after master
// initialization.See HBASE-5916 for more information.
if ((this.master == null || this.master.isInitialized())
&& this.deadservers.cleanPreviousInstance(serverName)) {
// Remove dead server with same hostname and port of newly checking in rs after master
// initialization. See HBASE-5916 for more information.
if ((this.master == null || this.master.isInitialized()) &&
this.deadservers.cleanPreviousInstance(serverName)) {
// This server has now become alive after we marked it as dead.
// We removed its previous entry from the dead list to reflect it.
LOG.debug(what + ":" + " Server " + serverName + " came back up," +
" removed it from the dead servers list");
LOG.debug("{} {} came back up, removed it from the dead servers list", what, serverName);
}
}

Expand Down Expand Up @@ -575,7 +576,10 @@ synchronized long expireServer(final ServerName serverName, boolean force) {
return pid;
}

// Note: this is currently invoked from RPC, not just tests. Locking in this class needs cleanup.
/**
* Called when server has expired.
*/
// Locking in this class needs cleanup.
@VisibleForTesting
public synchronized void moveFromOnlineToDeadServers(final ServerName sn) {
synchronized (this.onlineServers) {
Expand All @@ -584,7 +588,7 @@ public synchronized void moveFromOnlineToDeadServers(final ServerName sn) {
// Remove the server from the known servers lists and update load info BUT
// add to deadservers first; do this so it'll show in dead servers list if
// not in online servers list.
this.deadservers.add(sn);
this.deadservers.putIfAbsent(sn);
this.onlineServers.remove(sn);
onlineServers.notifyAll();
} else {
Expand Down Expand Up @@ -885,7 +889,7 @@ public synchronized ServerLiveState isServerKnownAndOnline(ServerName serverName
/**
* Check if a server is known to be dead. A server can be online,
* or known to be dead, or unknown to this manager (i.e, not online,
* not known to be dead either. it is simply not tracked by the
* not known to be dead either; it is simply not tracked by the
* master any more, for example, a very old previous instance).
*/
public synchronized boolean isServerDead(ServerName serverName) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -124,7 +124,7 @@ protected Flow executeFromState(MasterProcedureEnv env, ServerCrashState state)
// This adds server to the DeadServer processing list but not to the DeadServers list.
// Server gets removed from processing list below on procedure successful finish.
if (!notifiedDeadServer) {
services.getServerManager().getDeadServers().notifyServer(serverName);
services.getServerManager().getDeadServers().processing(serverName);
notifiedDeadServer = true;
}

Expand Down
Loading