Skip to content
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,8 @@ public class ConfInfo {

private ArrayList<ConfItem> property = new ArrayList<>();

private String subClusterId;

public ConfInfo() {
} // JAXB needs this

Expand Down Expand Up @@ -74,5 +76,14 @@ public String getKey() {
public String getValue() {
return value;
}

}

public String getSubClusterId() {
return subClusterId;
}

public void setSubClusterId(String subClusterId) {
this.subClusterId = subClusterId;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,9 @@ public class SchedConfUpdateInfo {
@XmlElement(name = "update-queue")
private ArrayList<QueueConfigInfo> updateQueueInfo = new ArrayList<>();

@XmlElement(name = "subClusterId")
private String subClusterId = "";

private HashMap<String, String> global = new HashMap<>();

public SchedConfUpdateInfo() {
Expand Down Expand Up @@ -82,4 +85,12 @@ public HashMap<String, String> getGlobalParams() {
public void setGlobalParams(HashMap<String, String> globalInfo) {
this.global = globalInfo;
}

public String getSubClusterId() {
return subClusterId;
}

public void setSubClusterId(String subClusterId) {
this.subClusterId = subClusterId;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -159,6 +159,10 @@ public final class RouterMetrics {
private MutableGaugeInt numAddToClusterNodeLabelsFailedRetrieved;
@Metric("# of removeFromClusterNodeLabels failed to be retrieved")
private MutableGaugeInt numRemoveFromClusterNodeLabelsFailedRetrieved;
@Metric("# of numUpdateSchedulerConfiguration failed to be retrieved")
private MutableGaugeInt numUpdateSchedulerConfigurationFailedRetrieved;
@Metric("# of numGetSchedulerConfiguration failed to be retrieved")
private MutableGaugeInt numGetSchedulerConfigurationFailedRetrieved;
@Metric("# of getClusterInfo failed to be retrieved")
private MutableGaugeInt numGetClusterInfoFailedRetrieved;
@Metric("# of getClusterUserInfo failed to be retrieved")
Expand Down Expand Up @@ -287,6 +291,10 @@ public final class RouterMetrics {
private MutableRate totalSucceededAddToClusterNodeLabelsRetrieved;
@Metric("Total number of successful Retrieved RemoveFromClusterNodeLabels and latency(ms)")
private MutableRate totalSucceededRemoveFromClusterNodeLabelsRetrieved;
@Metric("Total number of successful Retrieved updateSchedulerConfiguration and latency(ms)")
private MutableRate totalSucceededUpdateSchedulerConfigurationRetrieved;
@Metric("Total number of successful Retrieved getSchedulerConfiguration and latency(ms)")
private MutableRate totalSucceededGetSchedulerConfigurationRetrieved;
@Metric("Total number of successful Retrieved GetClusterInfoRetrieved and latency(ms)")
private MutableRate totalSucceededGetClusterInfoRetrieved;
@Metric("Total number of successful Retrieved GetClusterUserInfoRetrieved and latency(ms)")
Expand Down Expand Up @@ -358,6 +366,8 @@ public final class RouterMetrics {
private MutableQuantiles replaceLabelsOnNodeLatency;
private MutableQuantiles addToClusterNodeLabelsLatency;
private MutableQuantiles removeFromClusterNodeLabelsLatency;
private MutableQuantiles updateSchedulerConfigLatency;
private MutableQuantiles getSchedulerConfigurationLatency;
private MutableQuantiles getClusterInfoLatency;
private MutableQuantiles getClusterUserInfoLatency;
private MutableQuantiles updateNodeResourceLatency;
Expand Down Expand Up @@ -572,6 +582,12 @@ private RouterMetrics() {
removeFromClusterNodeLabelsLatency = registry.newQuantiles("removeFromClusterNodeLabelsLatency",
"latency of remove cluster nodelabels timeouts", "ops", "latency", 10);

updateSchedulerConfigLatency = registry.newQuantiles("updateSchedulerConfigurationLatency",
"latency of update scheduler configuration timeouts", "ops", "latency", 10);

getSchedulerConfigurationLatency = registry.newQuantiles("getSchedulerConfigurationLatency",
"latency of get scheduler configuration timeouts", "ops", "latency", 10);

getClusterInfoLatency = registry.newQuantiles("getClusterInfoLatency",
"latency of get cluster info timeouts", "ops", "latency", 10);

Expand Down Expand Up @@ -879,6 +895,16 @@ public long getNumSucceededRemoveFromClusterNodeLabelsRetrieved() {
return totalSucceededRemoveFromClusterNodeLabelsRetrieved.lastStat().numSamples();
}

@VisibleForTesting
public long getNumSucceededUpdateSchedulerConfigurationRetrieved() {
return totalSucceededUpdateSchedulerConfigurationRetrieved.lastStat().numSamples();
}

@VisibleForTesting
public long getNumSucceededGetSchedulerConfigurationRetrieved() {
return totalSucceededGetSchedulerConfigurationRetrieved.lastStat().numSamples();
}

@VisibleForTesting
public long getNumSucceededGetClusterInfoRetrieved() {
return totalSucceededGetClusterInfoRetrieved.lastStat().numSamples();
Expand Down Expand Up @@ -1189,6 +1215,16 @@ public double getLatencySucceededRemoveFromClusterNodeLabelsRetrieved() {
return totalSucceededRemoveFromClusterNodeLabelsRetrieved.lastStat().mean();
}

@VisibleForTesting
public double getLatencySucceededUpdateSchedulerConfigurationRetrieved() {
return totalSucceededUpdateSchedulerConfigurationRetrieved.lastStat().mean();
}

@VisibleForTesting
public double getLatencySucceededGetSchedulerConfigurationRetrieved() {
return totalSucceededGetSchedulerConfigurationRetrieved.lastStat().mean();
}

@VisibleForTesting
public double getLatencySucceededGetClusterInfoRetrieved() {
return totalSucceededGetClusterInfoRetrieved.lastStat().mean();
Expand Down Expand Up @@ -1454,6 +1490,14 @@ public int getNumRemoveFromClusterNodeLabelsFailedRetrieved() {
return numRemoveFromClusterNodeLabelsFailedRetrieved.value();
}

public int getUpdateSchedulerConfigurationFailedRetrieved() {
return numUpdateSchedulerConfigurationFailedRetrieved.value();
}

public int getSchedulerConfigurationFailedRetrieved() {
return numGetSchedulerConfigurationFailedRetrieved.value();
}

public int getClusterInfoFailedRetrieved() {
return numGetClusterInfoFailedRetrieved.value();
}
Expand Down Expand Up @@ -1773,6 +1817,16 @@ public void succeededRemoveFromClusterNodeLabelsRetrieved(long duration) {
removeFromClusterNodeLabelsLatency.add(duration);
}

public void succeededUpdateSchedulerConfigurationRetrieved(long duration) {
totalSucceededUpdateSchedulerConfigurationRetrieved.add(duration);
updateSchedulerConfigLatency.add(duration);
}

public void succeededGetSchedulerConfigurationRetrieved(long duration) {
totalSucceededGetSchedulerConfigurationRetrieved.add(duration);
getSchedulerConfigurationLatency.add(duration);
}

public void succeededGetClusterInfoRetrieved(long duration) {
totalSucceededGetClusterInfoRetrieved.add(duration);
getClusterInfoLatency.add(duration);
Expand Down Expand Up @@ -2013,6 +2067,14 @@ public void incrRemoveFromClusterNodeLabelsFailedRetrieved() {
numRemoveFromClusterNodeLabelsFailedRetrieved.incr();
}

public void incrUpdateSchedulerConfigurationFailedRetrieved() {
numUpdateSchedulerConfigurationFailedRetrieved.incr();
}

public void incrGetSchedulerConfigurationFailedRetrieved() {
numGetSchedulerConfigurationFailedRetrieved.incr();
}

public void incrGetClusterInfoFailedRetrieved() {
numGetClusterInfoFailedRetrieved.incr();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,6 @@
import javax.ws.rs.core.Response.Status;

import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.lang3.NotImplementedException;
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.impl.prefetch.Validate;
Expand Down Expand Up @@ -129,13 +128,15 @@
import org.apache.hadoop.yarn.server.router.webapp.dao.FederationRMQueueAclInfo;
import org.apache.hadoop.yarn.server.router.webapp.dao.SubClusterResult;
import org.apache.hadoop.yarn.server.router.webapp.dao.FederationSchedulerTypeInfo;
import org.apache.hadoop.yarn.server.router.webapp.dao.FederationConfInfo;
import org.apache.hadoop.yarn.server.router.webapp.dao.FederationClusterUserInfo;
import org.apache.hadoop.yarn.server.router.webapp.dao.FederationClusterInfo;
import org.apache.hadoop.yarn.server.utils.BuilderUtils;
import org.apache.hadoop.yarn.server.webapp.dao.AppAttemptInfo;
import org.apache.hadoop.yarn.server.webapp.dao.ContainerInfo;
import org.apache.hadoop.yarn.server.webapp.dao.ContainersInfo;
import org.apache.hadoop.yarn.util.LRUCacheHashMap;
import org.apache.hadoop.yarn.webapp.dao.ConfInfo;
import org.apache.hadoop.yarn.webapp.dao.SchedConfUpdateInfo;
import org.apache.hadoop.yarn.util.Clock;
import org.apache.hadoop.yarn.util.MonotonicClock;
Expand Down Expand Up @@ -848,6 +849,29 @@ private Map<SubClusterId, SubClusterInfo> getActiveSubclusters()
}
}

/**
* Get the active subcluster in the federation.
*
* @param subClusterId subClusterId.
* @return subClusterInfo.
* @throws NotFoundException If the subclusters cannot be found.
*/
private SubClusterInfo getActiveSubCluster(String subClusterId)
throws NotFoundException {
try {
SubClusterId pSubClusterId = SubClusterId.newInstance(subClusterId);
Map<SubClusterId, SubClusterInfo> subClusterInfoMap =
federationFacade.getSubClusters(true);
SubClusterInfo subClusterInfo = subClusterInfoMap.get(pSubClusterId);
if (subClusterInfo == null) {
throw new NotFoundException(subClusterId + " not found.");
}
return subClusterInfo;
} catch (YarnException e) {
throw new NotFoundException(e.getMessage());
}
}

/**
* The YARN Router will forward to the request to all the SubClusters to find
* where the node is running.
Expand Down Expand Up @@ -2906,17 +2930,117 @@ public ContainerInfo getContainer(HttpServletRequest req,
throw new RuntimeException("getContainer Failed.");
}

/**
* This method updates the Scheduler configuration, and it is reachable by
* using {@link RMWSConsts#SCHEDULER_CONF}.
*
* @param mutationInfo th information for making scheduler configuration
* changes (supports adding, removing, or updating a queue, as well
* as global scheduler conf changes)
* @param hsr the servlet request
* @return Response containing the status code
* @throws AuthorizationException if the user is not authorized to invoke this
* method
* @throws InterruptedException if interrupted
*/
@Override
public Response updateSchedulerConfiguration(SchedConfUpdateInfo mutationInfo,
HttpServletRequest hsr)
throws AuthorizationException, InterruptedException {
throw new NotImplementedException("Code is not implemented");
HttpServletRequest hsr) throws AuthorizationException, InterruptedException {

// Make Sure mutationInfo is not null.
if (mutationInfo == null) {
routerMetrics.incrUpdateSchedulerConfigurationFailedRetrieved();
throw new IllegalArgumentException(
"Parameter error, the schedConfUpdateInfo is empty or null.");
}

// In federated mode, we may have a mix of multiple schedulers.
// In order to ensure accurate update scheduler configuration,
// we need users to explicitly set subClusterId.
String pSubClusterId = mutationInfo.getSubClusterId();
if (StringUtils.isBlank(pSubClusterId)) {
routerMetrics.incrUpdateSchedulerConfigurationFailedRetrieved();
throw new IllegalArgumentException("Parameter error, " +
"the subClusterId is empty or null.");
}

// Get the subClusterInfo , then update the scheduler configuration.
try {
long startTime = clock.getTime();
SubClusterInfo subClusterInfo = getActiveSubCluster(pSubClusterId);
DefaultRequestInterceptorREST interceptor = getOrCreateInterceptorForSubCluster(
subClusterInfo.getSubClusterId(), subClusterInfo.getRMWebServiceAddress());
Response response = interceptor.updateSchedulerConfiguration(mutationInfo, hsr);
if (response != null) {
long endTime = clock.getTime();
routerMetrics.succeededUpdateSchedulerConfigurationRetrieved(endTime - startTime);
return Response.status(response.getStatus()).entity(response.getEntity()).build();
}
} catch (NotFoundException e) {
routerMetrics.incrUpdateSchedulerConfigurationFailedRetrieved();
RouterServerUtil.logAndThrowRunTimeException(e,
"Get subCluster error. subClusterId = %s", pSubClusterId);
} catch (Exception e) {
routerMetrics.incrUpdateSchedulerConfigurationFailedRetrieved();
RouterServerUtil.logAndThrowRunTimeException(e,
"UpdateSchedulerConfiguration error. subClusterId = %s", pSubClusterId);
}

routerMetrics.incrUpdateSchedulerConfigurationFailedRetrieved();
throw new RuntimeException("UpdateSchedulerConfiguration error. subClusterId = "
+ pSubClusterId);
}

/**
* This method retrieves all the Scheduler configuration, and it is reachable
* by using {@link RMWSConsts#SCHEDULER_CONF}.
*
* @param hsr the servlet request
* @return Response containing the status code
* @throws AuthorizationException if the user is not authorized to invoke this
* method.
*/
@Override
public Response getSchedulerConfiguration(HttpServletRequest hsr)
throws AuthorizationException {
throw new NotImplementedException("Code is not implemented");
try {
long startTime = clock.getTime();
FederationConfInfo federationConfInfo = new FederationConfInfo();
Map<SubClusterId, SubClusterInfo> subClustersActive = getActiveSubclusters();
final HttpServletRequest hsrCopy = clone(hsr);
Class[] argsClasses = new Class[]{HttpServletRequest.class};
Object[] args = new Object[]{hsrCopy};
ClientMethod remoteMethod = new ClientMethod("getSchedulerConfiguration", argsClasses, args);
Map<SubClusterInfo, Response> responseMap =
invokeConcurrent(subClustersActive.values(), remoteMethod, Response.class);
responseMap.forEach((subClusterInfo, response) -> {
SubClusterId subClusterId = subClusterInfo.getSubClusterId();
if (response == null) {
String errorMsg = subClusterId + " Can't getSchedulerConfiguration.";
federationConfInfo.getErrorMsgs().add(errorMsg);
} else if (response.getStatus() == Status.BAD_REQUEST.getStatusCode()) {
String errorMsg = String.valueOf(response.getEntity());
federationConfInfo.getErrorMsgs().add(errorMsg);
} else if (response.getStatus() == Status.OK.getStatusCode()) {
ConfInfo fedConfInfo = ConfInfo.class.cast(response.getEntity());
fedConfInfo.setSubClusterId(subClusterId.getId());
federationConfInfo.getList().add(fedConfInfo);
}
});
long endTime = clock.getTime();
routerMetrics.succeededGetSchedulerConfigurationRetrieved(endTime - startTime);
return Response.status(Status.OK).entity(federationConfInfo).build();
} catch (NotFoundException e) {
RouterServerUtil.logAndThrowRunTimeException("get all active sub cluster(s) error.", e);
routerMetrics.incrGetSchedulerConfigurationFailedRetrieved();
} catch (Exception e) {
routerMetrics.incrGetSchedulerConfigurationFailedRetrieved();
RouterServerUtil.logAndThrowRunTimeException("getSchedulerConfiguration error.", e);
return Response.status(Status.BAD_REQUEST).entity("getSchedulerConfiguration error.").build();
}

routerMetrics.incrGetSchedulerConfigurationFailedRetrieved();
throw new RuntimeException("getSchedulerConfiguration error.");
}

@Override
Expand Down
Loading