Skip to content
Merged
Show file tree
Hide file tree
Changes from 13 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,12 @@ public final class RouterMetrics {
private MutableGaugeInt numGetRMNodeLabelsFailedRetrieved;
@Metric("# of checkUserAccessToQueue failed to be retrieved")
private MutableGaugeInt numCheckUserAccessToQueueFailedRetrieved;
@Metric("# of getDelegationToken failed to be retrieved")
private MutableGaugeInt numGetDelegationTokenFailedRetrieved;
@Metric("# of renewDelegationToken failed to be retrieved")
private MutableGaugeInt numRenewDelegationTokenFailedRetrieved;
@Metric("# of renewDelegationToken failed to be retrieved")
private MutableGaugeInt numCancelDelegationTokenFailedRetrieved;

// Aggregate metrics are shared, and don't have to be looked up per call
@Metric("Total number of successful Submitted apps and latency(ms)")
Expand Down Expand Up @@ -211,6 +217,12 @@ public final class RouterMetrics {
private MutableRate totalSucceededGetRMNodeLabelsRetrieved;
@Metric("Total number of successful Retrieved CheckUserAccessToQueue and latency(ms)")
private MutableRate totalSucceededCheckUserAccessToQueueRetrieved;
@Metric("Total number of successful Retrieved GetDelegationToken and latency(ms)")
private MutableRate totalSucceededGetDelegationTokenRetrieved;
@Metric("Total number of successful Retrieved RenewDelegationToken and latency(ms)")
private MutableRate totalSucceededRenewDelegationTokenRetrieved;
@Metric("Total number of successful Retrieved CancelDelegationToken and latency(ms)")
private MutableRate totalSucceededCancelDelegationTokenRetrieved;

/**
* Provide quantile counters for all latencies.
Expand Down Expand Up @@ -257,6 +269,9 @@ public final class RouterMetrics {
private MutableQuantiles getAppTimeoutsLatency;
private MutableQuantiles getRMNodeLabelsLatency;
private MutableQuantiles checkUserAccessToQueueLatency;
private MutableQuantiles getDelegationTokenLatency;
private MutableQuantiles renewDelegationTokenLatency;
private MutableQuantiles cancelDelegationTokenLatency;

private static volatile RouterMetrics instance = null;
private static MetricsRegistry registry;
Expand Down Expand Up @@ -415,6 +430,15 @@ private RouterMetrics() {

checkUserAccessToQueueLatency = registry.newQuantiles("checkUserAccessToQueueLatency",
"latency of get apptimeouts timeouts", "ops", "latency", 10);

getDelegationTokenLatency = registry.newQuantiles("getDelegationTokenLatency",
"latency of get delegation token timeouts", "ops", "latency", 10);

renewDelegationTokenLatency = registry.newQuantiles("renewDelegationTokenLatency",
"latency of renew delegation token timeouts", "ops", "latency", 10);

cancelDelegationTokenLatency = registry.newQuantiles("cancelDelegationTokenLatency",
"latency of cancel delegation token timeouts", "ops", "latency", 10);
}

public static RouterMetrics getMetrics() {
Expand Down Expand Up @@ -642,10 +666,25 @@ public long getNumSucceededGetRMNodeLabelsRetrieved() {
}

@VisibleForTesting
public long getNumSucceededCheckUserAccessToQueueRetrievedRetrieved() {
public long getNumSucceededCheckUserAccessToQueueRetrieved() {
return totalSucceededCheckUserAccessToQueueRetrieved.lastStat().numSamples();
}

@VisibleForTesting
public long getNumSucceededGetDelegationTokenRetrieved() {
return totalSucceededGetDelegationTokenRetrieved.lastStat().numSamples();
}

@VisibleForTesting
public long getNumSucceededRenewDelegationTokenRetrieved() {
return totalSucceededRenewDelegationTokenRetrieved.lastStat().numSamples();
}

@VisibleForTesting
public long getNumSucceededCancelDelegationTokenRetrieved() {
return totalSucceededCancelDelegationTokenRetrieved.lastStat().numSamples();
}

@VisibleForTesting
public double getLatencySucceededAppsCreated() {
return totalSucceededAppsCreated.lastStat().mean();
Expand Down Expand Up @@ -856,6 +895,21 @@ public double getLatencySucceededCheckUserAccessToQueueRetrieved() {
return totalSucceededCheckUserAccessToQueueRetrieved.lastStat().mean();
}

@VisibleForTesting
public double getLatencySucceededGetDelegationTokenRetrieved() {
return totalSucceededGetDelegationTokenRetrieved.lastStat().mean();
}

@VisibleForTesting
public double getLatencySucceededRenewDelegationTokenRetrieved() {
return totalSucceededRenewDelegationTokenRetrieved.lastStat().mean();
}

@VisibleForTesting
public double getLatencySucceededCancelDelegationTokenRetrieved() {
return totalSucceededCancelDelegationTokenRetrieved.lastStat().mean();
}

@VisibleForTesting
public int getAppsFailedCreated() {
return numAppsFailedCreated.value();
Expand Down Expand Up @@ -1045,6 +1099,18 @@ public int getCheckUserAccessToQueueFailedRetrieved() {
return numCheckUserAccessToQueueFailedRetrieved.value();
}

public int getDelegationTokenFailedRetrieved() {
return numGetDelegationTokenFailedRetrieved.value();
}

public int getRenewDelegationTokenFailedRetrieved() {
return numRenewDelegationTokenFailedRetrieved.value();
}

public int getCancelDelegationTokenFailedRetrieved() {
return numCancelDelegationTokenFailedRetrieved.value();
}

public void succeededAppsCreated(long duration) {
totalSucceededAppsCreated.add(duration);
getNewApplicationLatency.add(duration);
Expand Down Expand Up @@ -1255,6 +1321,21 @@ public void succeededCheckUserAccessToQueueRetrieved(long duration) {
checkUserAccessToQueueLatency.add(duration);
}

public void succeededGetDelegationTokenRetrieved(long duration) {
totalSucceededGetDelegationTokenRetrieved.add(duration);
getDelegationTokenLatency.add(duration);
}

public void succeededRenewDelegationTokenRetrieved(long duration) {
totalSucceededRenewDelegationTokenRetrieved.add(duration);
renewDelegationTokenLatency.add(duration);
}

public void succeededCancelDelegationTokenRetrieved(long duration) {
totalSucceededCancelDelegationTokenRetrieved.add(duration);
cancelDelegationTokenLatency.add(duration);
}

public void incrAppsFailedCreated() {
numAppsFailedCreated.incr();
}
Expand Down Expand Up @@ -1422,4 +1503,16 @@ public void incrGetRMNodeLabelsFailedRetrieved() {
public void incrCheckUserAccessToQueueFailedRetrieved() {
numCheckUserAccessToQueueFailedRetrieved.incr();
}

public void incrGetDelegationTokenFailedRetrieved() {
numGetDelegationTokenFailedRetrieved.incr();
}

public void incrRenewDelegationTokenFailedRetrieved() {
numRenewDelegationTokenFailedRetrieved.incr();
}

public void incrCancelDelegationTokenFailedRetrieved() {
numCancelDelegationTokenFailedRetrieved.incr();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -25,10 +25,13 @@
import org.apache.hadoop.classification.InterfaceAudience.Public;
import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.util.ReflectionUtils;
import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
import org.apache.hadoop.yarn.security.client.RMDelegationTokenIdentifier;
import org.apache.hadoop.yarn.server.federation.policies.FederationPolicyUtils;
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId;
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterInfo;
Expand All @@ -42,6 +45,7 @@
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.EnumSet;
import java.io.IOException;

/**
Expand Down Expand Up @@ -491,4 +495,25 @@ public static SubClusterId getRandomActiveSubCluster(
// Randomly choose a SubCluster
return subClusterIds.get(rand.nextInt(subClusterIds.size()));
}

public static boolean isAllowedDelegationTokenOp() throws IOException {
if (UserGroupInformation.isSecurityEnabled()) {
return EnumSet.of(UserGroupInformation.AuthenticationMethod.KERBEROS,
UserGroupInformation.AuthenticationMethod.KERBEROS_SSL,
UserGroupInformation.AuthenticationMethod.CERTIFICATE)
.contains(UserGroupInformation.getCurrentUser()
.getRealAuthenticationMethod());
} else {
return true;
}
}

public static String getRenewerForToken(Token<RMDelegationTokenIdentifier> token)
throws IOException {
UserGroupInformation user = UserGroupInformation.getCurrentUser();
UserGroupInformation loginUser = UserGroupInformation.getLoginUser();
// we can always renew our own tokens
return loginUser.getUserName().equals(user.getUserName())
? token.decodeIdentifier().getRenewer().toString() : user.getShortUserName();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@

import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;
import java.io.IOException;
import java.lang.reflect.Method;
Expand All @@ -40,7 +41,6 @@
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import org.apache.commons.lang3.NotImplementedException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.CommonConfigurationKeys;
import org.apache.hadoop.security.UserGroupInformation;
Expand Down Expand Up @@ -118,9 +118,13 @@
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
import org.apache.hadoop.yarn.api.records.ReservationId;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.yarn.server.utils.BuilderUtils;

import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
import org.apache.hadoop.yarn.security.client.RMDelegationTokenIdentifier;
import org.apache.hadoop.yarn.server.federation.failover.FederationProxyProviderUtil;
import org.apache.hadoop.yarn.server.federation.policies.FederationPolicyUtils;
import org.apache.hadoop.yarn.server.federation.policies.RouterPolicyFacade;
Expand All @@ -136,6 +140,7 @@
import org.apache.hadoop.yarn.server.router.RouterServerUtil;
import org.apache.hadoop.yarn.util.Clock;
import org.apache.hadoop.yarn.util.MonotonicClock;
import org.apache.hadoop.yarn.util.Records;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -1474,19 +1479,104 @@ public GetContainersResponse getContainers(GetContainersRequest request)
@Override
public GetDelegationTokenResponse getDelegationToken(
GetDelegationTokenRequest request) throws YarnException, IOException {
throw new NotImplementedException("Code is not implemented");

if (request == null || request.getRenewer() == null) {
routerMetrics.incrGetDelegationTokenFailedRetrieved();
RouterServerUtil.logAndThrowException(
"Missing getDelegationToken request or Renewer.", null);
}

try {

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Avoid this empty line

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I will fix it.

// Verify that the connection is kerberos authenticated
if (!RouterServerUtil.isAllowedDelegationTokenOp()) {
routerMetrics.incrGetDelegationTokenFailedRetrieved();
throw new IOException(
"Delegation Token can be issued only with kerberos authentication.");
}

long startTime = clock.getTime();
UserGroupInformation ugi = UserGroupInformation.getCurrentUser();
Text owner = new Text(ugi.getUserName());
Text realUser = null;
if (ugi.getRealUser() != null) {
realUser = new Text(ugi.getRealUser().getUserName());
}

RMDelegationTokenIdentifier tokenIdentifier =
new RMDelegationTokenIdentifier(owner, new Text(request.getRenewer()), realUser);
Token<RMDelegationTokenIdentifier> realRMDToken =
new Token<>(tokenIdentifier, this.getTokenSecretManager());

org.apache.hadoop.yarn.api.records.Token routerRMDTToken =
BuilderUtils.newDelegationToken(realRMDToken.getIdentifier(),
realRMDToken.getKind().toString(),
realRMDToken.getPassword(), realRMDToken.getService().toString());

long stopTime = clock.getTime();
routerMetrics.succeededGetDelegationTokenRetrieved((stopTime - startTime));
return GetDelegationTokenResponse.newInstance(routerRMDTToken);
} catch(IOException e) {
routerMetrics.incrGetDelegationTokenFailedRetrieved();
throw new YarnException(e);
}
}

@Override
public RenewDelegationTokenResponse renewDelegationToken(
RenewDelegationTokenRequest request) throws YarnException, IOException {
throw new NotImplementedException("Code is not implemented");
try {

if (!RouterServerUtil.isAllowedDelegationTokenOp()) {
routerMetrics.incrRenewDelegationTokenFailedRetrieved();
throw new IOException(
"Delegation Token can be renewed only with kerberos authentication");
}

long startTime = clock.getTime();
org.apache.hadoop.yarn.api.records.Token protoToken = request.getDelegationToken();
Token<RMDelegationTokenIdentifier> token = new Token<>(
protoToken.getIdentifier().array(), protoToken.getPassword().array(),
new Text(protoToken.getKind()), new Text(protoToken.getService()));
String user = RouterServerUtil.getRenewerForToken(token);
long nextExpTime = this.getTokenSecretManager().renewToken(token, user);
RenewDelegationTokenResponse renewResponse =
Records.newRecord(RenewDelegationTokenResponse.class);
renewResponse.setNextExpirationTime(nextExpTime);
long stopTime = clock.getTime();
routerMetrics.succeededRenewDelegationTokenRetrieved((stopTime - startTime));
return renewResponse;

} catch (IOException e) {
routerMetrics.incrRenewDelegationTokenFailedRetrieved();
throw new YarnException(e);
}
}

@Override
public CancelDelegationTokenResponse cancelDelegationToken(
CancelDelegationTokenRequest request) throws YarnException, IOException {
throw new NotImplementedException("Code is not implemented");
try {
if (!RouterServerUtil.isAllowedDelegationTokenOp()) {
routerMetrics.incrCancelDelegationTokenFailedRetrieved();
throw new IOException(
"Delegation Token can be cancelled only with kerberos authentication");
}

long startTime = clock.getTime();
org.apache.hadoop.yarn.api.records.Token protoToken = request.getDelegationToken();
Token<RMDelegationTokenIdentifier> token = new Token<>(
protoToken.getIdentifier().array(), protoToken.getPassword().array(),
new Text(protoToken.getKind()), new Text(protoToken.getService()));
String user = UserGroupInformation.getCurrentUser().getUserName();
this.getTokenSecretManager().cancelToken(token, user);
long stopTime = clock.getTime();
routerMetrics.succeededCancelDelegationTokenRetrieved((stopTime - startTime));
return Records.newRecord(CancelDelegationTokenResponse.class);
} catch (IOException e) {
routerMetrics.incrCancelDelegationTokenFailedRetrieved();
throw new YarnException(e);
}
}

@Override
Expand Down Expand Up @@ -1998,4 +2088,5 @@ protected int getNumMaxThreads(Configuration conf) {
public void setNumSubmitRetries(int numSubmitRetries) {
this.numSubmitRetries = numSubmitRetries;
}

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Avoid

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thank you very much for reviewing the code, I will fix it.

}
Loading