Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.hadoop.yarn.server.federation.utils;

import org.apache.hadoop.yarn.exceptions.YarnException;

import java.io.IOException;
import java.util.Arrays;
import java.util.Collection;


public abstract class FederationMethodWrapper {

/**
* List of parameters: static and dynamic values, matchings types.
*/
private Object[] params;

/**
* List of method parameters types, matches parameters.
*/
private Class<?>[] types;

/**
* String name of the method.
*/
private String methodName;

public FederationMethodWrapper(Class<?>[] pTypes, Object... pParams)
throws IOException {
if (pParams.length != pTypes.length) {
throw new IOException("Invalid parameters for method.");
}
this.params = pParams;
this.types = Arrays.copyOf(pTypes, pTypes.length);
}

public Object[] getParams() {
return Arrays.copyOf(this.params, this.params.length);
}

public String getMethodName() {
return methodName;
}

public void setMethodName(String methodName) {
this.methodName = methodName;
}

/**
* Get the calling types for this method.
*
* @return An array of calling types.
*/
public Class<?>[] getTypes() {
return Arrays.copyOf(this.types, this.types.length);
}

protected abstract <R> Collection<R> invokeConcurrent(Class<R> clazz) throws YarnException;
}
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,8 @@ public final class RouterMetrics {
private MutableGaugeInt numGetAppTimeoutFailedRetrieved;
@Metric("# of getAppTimeouts failed to be retrieved")
private MutableGaugeInt numGetAppTimeoutsFailedRetrieved;
@Metric("# of refreshQueues failed to be retrieved")
private MutableGaugeInt numRefreshQueuesFailedRetrieved;
@Metric("# of getRMNodeLabels failed to be retrieved")
private MutableGaugeInt numGetRMNodeLabelsFailedRetrieved;
@Metric("# of checkUserAccessToQueue failed to be retrieved")
Expand Down Expand Up @@ -207,6 +209,8 @@ public final class RouterMetrics {
private MutableRate totalSucceededGetAppTimeoutRetrieved;
@Metric("Total number of successful Retrieved GetAppTimeouts and latency(ms)")
private MutableRate totalSucceededGetAppTimeoutsRetrieved;
@Metric("Total number of successful Retrieved RefreshQueues and latency(ms)")
private MutableRate totalSucceededRefreshQueuesRetrieved;
@Metric("Total number of successful Retrieved GetRMNodeLabels and latency(ms)")
private MutableRate totalSucceededGetRMNodeLabelsRetrieved;
@Metric("Total number of successful Retrieved CheckUserAccessToQueue and latency(ms)")
Expand Down Expand Up @@ -255,6 +259,7 @@ public final class RouterMetrics {
private MutableQuantiles getUpdateQueueLatency;
private MutableQuantiles getAppTimeoutLatency;
private MutableQuantiles getAppTimeoutsLatency;
private MutableQuantiles getRefreshQueuesLatency;
private MutableQuantiles getRMNodeLabelsLatency;
private MutableQuantiles checkUserAccessToQueueLatency;

Expand Down Expand Up @@ -410,6 +415,9 @@ private RouterMetrics() {
getAppTimeoutsLatency = registry.newQuantiles("getAppTimeoutsLatency",
"latency of get apptimeouts timeouts", "ops", "latency", 10);

getRefreshQueuesLatency = registry.newQuantiles("getRefreshQueuesLatency",
"latency of get refresh queues timeouts", "ops", "latency", 10);

getRMNodeLabelsLatency = registry.newQuantiles("getRMNodeLabelsLatency",
"latency of get rmnodelabels timeouts", "ops", "latency", 10);

Expand Down Expand Up @@ -636,6 +644,11 @@ public long getNumSucceededGetAppTimeoutsRetrieved() {
return totalSucceededGetAppTimeoutsRetrieved.lastStat().numSamples();
}

@VisibleForTesting
public long getNumSucceededRefreshQueuesRetrieved() {
return totalSucceededRefreshQueuesRetrieved.lastStat().numSamples();
}

@VisibleForTesting
public long getNumSucceededGetRMNodeLabelsRetrieved() {
return totalSucceededGetRMNodeLabelsRetrieved.lastStat().numSamples();
Expand Down Expand Up @@ -846,6 +859,11 @@ public double getLatencySucceededGetAppTimeoutsRetrieved() {
return totalSucceededGetAppTimeoutsRetrieved.lastStat().mean();
}

@VisibleForTesting
public double getLatencySucceededRefreshQueuesRetrieved() {
return totalSucceededRefreshQueuesRetrieved.lastStat().mean();
}

@VisibleForTesting
public double getLatencySucceededGetRMNodeLabelsRetrieved() {
return totalSucceededGetRMNodeLabelsRetrieved.lastStat().mean();
Expand Down Expand Up @@ -1037,6 +1055,11 @@ public int getAppTimeoutsFailedRetrieved() {
return numGetAppTimeoutsFailedRetrieved.value();
}


public int getRefreshQueuesFailedRetrieved() {
return numRefreshQueuesFailedRetrieved.value();
}

public int getRMNodeLabelsFailedRetrieved() {
return numGetRMNodeLabelsFailedRetrieved.value();
}
Expand Down Expand Up @@ -1245,6 +1268,11 @@ public void succeededGetAppTimeoutsRetrieved(long duration) {
getAppTimeoutsLatency.add(duration);
}

public void succeededRefreshQueuesRetrieved(long duration) {
totalSucceededRefreshQueuesRetrieved.add(duration);
getRefreshQueuesLatency.add(duration);
}

public void succeededGetRMNodeLabelsRetrieved(long duration) {
totalSucceededGetRMNodeLabelsRetrieved.add(duration);
getRMNodeLabelsLatency.add(duration);
Expand Down Expand Up @@ -1415,11 +1443,15 @@ public void incrGetAppTimeoutsFailedRetrieved() {
numGetAppTimeoutsFailedRetrieved.incr();
}

public void incrRefreshQueuesFailedRetrieved() {
numRefreshQueuesFailedRetrieved.incr();
}

public void incrGetRMNodeLabelsFailedRetrieved() {
numGetRMNodeLabelsFailedRetrieved.incr();
}

public void incrCheckUserAccessToQueueFailedRetrieved() {
numCheckUserAccessToQueueFailedRetrieved.incr();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import org.apache.hadoop.classification.InterfaceAudience.Public;
import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.util.ReflectionUtils;
import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.yarn.exceptions.YarnException;
Expand Down Expand Up @@ -299,6 +300,28 @@ public static RuntimeException logAndReturnRunTimeException(
return logAndReturnRunTimeException(null, errMsgFormat, args);
}

/**
* Throws an YarnRuntimeException due to an error.
*
* @param t the throwable raised in the called class.
* @param errMsgFormat the error message format string.
* @param args referenced by the format specifiers in the format string.
* @return YarnRuntimeException
*/
@Public
@Unstable
public static YarnRuntimeException logAndReturnYarnRunTimeException(
Throwable t, String errMsgFormat, Object... args) {
String msg = String.format(errMsgFormat, args);
if (t != null) {
LOG.error(msg, t);
return new YarnRuntimeException(msg, t);
} else {
LOG.error(msg);
return new YarnRuntimeException(msg);
}
}

/**
* Check applicationId is accurate.
*
Expand Down Expand Up @@ -491,4 +514,27 @@ public static SubClusterId getRandomActiveSubCluster(
// Randomly choose a SubCluster
return subClusterIds.get(rand.nextInt(subClusterIds.size()));
}

public static UserGroupInformation setupUser(final String userName) {
UserGroupInformation user = null;
try {
// If userName is empty, we will return UserGroupInformation.getCurrentUser.
// Do not create a proxy user if user name matches the user name on
// current UGI
if (userName == null || userName.trim().isEmpty()) {
user = UserGroupInformation.getCurrentUser();
} else if (UserGroupInformation.isSecurityEnabled()) {
user = UserGroupInformation.createProxyUser(userName, UserGroupInformation.getLoginUser());
} else if (userName.equalsIgnoreCase(UserGroupInformation.getCurrentUser().getUserName())) {
user = UserGroupInformation.getCurrentUser();
} else {
user = UserGroupInformation.createProxyUser(userName,
UserGroupInformation.getCurrentUser());
}
return user;
} catch (IOException e) {
throw RouterServerUtil.logAndReturnYarnRunTimeException(e,
"Error while creating Router RMAdmin Service for user : %s.", user);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,9 @@

package org.apache.hadoop.yarn.server.router.clientrm;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
import org.apache.hadoop.yarn.server.router.RouterServerUtil;
import org.apache.hadoop.yarn.server.router.security.RouterDelegationTokenSecretManager;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -80,7 +78,7 @@ public Configuration getConf() {
*/
@Override
public void init(String userName) {
setupUser(userName);
this.user = RouterServerUtil.setupUser(userName);
if (this.nextInterceptor != null) {
this.nextInterceptor.init(userName);
}
Expand All @@ -104,30 +102,6 @@ public ClientRequestInterceptor getNextInterceptor() {
return this.nextInterceptor;
}

private void setupUser(String userName) {

try {
// Do not create a proxy user if user name matches the user name on
// current UGI
if (UserGroupInformation.isSecurityEnabled()) {
user = UserGroupInformation.createProxyUser(userName, UserGroupInformation.getLoginUser());
} else if (userName.equalsIgnoreCase(UserGroupInformation.getCurrentUser().getUserName())) {
user = UserGroupInformation.getCurrentUser();
} else {
user = UserGroupInformation.createProxyUser(userName,
UserGroupInformation.getCurrentUser());
}
} catch (IOException e) {
String message = "Error while creating Router ClientRM Service for user:";
if (user != null) {
message += ", user: " + user;
}

LOG.info(message);
throw new YarnRuntimeException(message, e);
}
}

@Override
public RouterDelegationTokenSecretManager getTokenSecretManager() {
return tokenSecretManager;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@
package org.apache.hadoop.yarn.server.router.rmadmin;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.yarn.server.router.RouterServerUtil;

/**
* Implements the {@link RMAdminRequestInterceptor} interface and provides
Expand All @@ -31,6 +33,9 @@ public abstract class AbstractRMAdminRequestInterceptor
private Configuration conf;
private RMAdminRequestInterceptor nextInterceptor;

@SuppressWarnings("checkstyle:visibilitymodifier")
protected UserGroupInformation user = null;

/**
* Sets the {@link RMAdminRequestInterceptor} in the chain.
*/
Expand Down Expand Up @@ -63,9 +68,10 @@ public Configuration getConf() {
* Initializes the {@link RMAdminRequestInterceptor}.
*/
@Override
public void init(String user) {
public void init(String userName) {
this.user = RouterServerUtil.setupUser(userName);
if (this.nextInterceptor != null) {
this.nextInterceptor.init(user);
this.nextInterceptor.init(userName);
}
}

Expand Down
Loading