Skip to content
Merged
Original file line number Diff line number Diff line change
Expand Up @@ -1017,4 +1017,93 @@ public void addOrUpdateApplicationHomeSubCluster(ApplicationId applicationId,
updateApplicationHomeSubCluster(subClusterId, applicationId, appHomeSubCluster);
}
}

/**
* Exists ReservationHomeSubCluster Mapping.
*
* @param reservationId reservationId
* @return true - exist, false - not exist
*/
public boolean existsReservationHomeSubCluster(ReservationId reservationId) {
try {
SubClusterId subClusterId = getReservationHomeSubCluster(reservationId);
if (subClusterId != null) {
return true;
}
} catch (YarnException e) {
LOG.warn("get homeSubCluster by reservationId = {} error.", reservationId, e);
}
return false;
}

/**
* Save Reservation And HomeSubCluster Mapping.
*
* @param reservationId reservationId
* @param homeSubCluster homeSubCluster
* @throws YarnException on failure
*/
public void addReservationHomeSubCluster(ReservationId reservationId,
ReservationHomeSubCluster homeSubCluster) throws YarnException {
try {
// persist the mapping of reservationId and the subClusterId which has
// been selected as its home
addReservationHomeSubCluster(homeSubCluster);
} catch (YarnException e) {
String msg = String.format(
"Unable to insert the ReservationId %s into the FederationStateStore.", reservationId);
throw new YarnException(msg, e);
}
}

/**
* Update Reservation And HomeSubCluster Mapping.
*
* @param subClusterId subClusterId
* @param reservationId reservationId
* @param homeSubCluster homeSubCluster
* @throws YarnException on failure
*/
public void updateReservationHomeSubCluster(SubClusterId subClusterId,
ReservationId reservationId, ReservationHomeSubCluster homeSubCluster) throws YarnException {
try {
// update the mapping of reservationId and the home subClusterId to
// the new subClusterId we have selected
updateReservationHomeSubCluster(homeSubCluster);
} catch (YarnException e) {
SubClusterId subClusterIdInStateStore = getReservationHomeSubCluster(reservationId);
if (subClusterId == subClusterIdInStateStore) {
LOG.info("Reservation {} already submitted on SubCluster {}.", reservationId, subClusterId);
} else {
String msg = String.format(
"Unable to update the ReservationId %s into the FederationStateStore.", reservationId);
throw new YarnException(msg, e);
}
}
}

/**
* Add or Update ReservationHomeSubCluster.
*
* @param reservationId reservationId.
* @param subClusterId homeSubClusterId, this is selected by strategy.
* @param retryCount number of retries.
* @throws YarnException yarn exception.
*/
public void addOrUpdateReservationHomeSubCluster(ReservationId reservationId,
SubClusterId subClusterId, int retryCount) throws YarnException {
Boolean exists = existsReservationHomeSubCluster(reservationId);
ReservationHomeSubCluster reservationHomeSubCluster =
ReservationHomeSubCluster.newInstance(reservationId, subClusterId);
if (!exists || retryCount == 0) {
// persist the mapping of reservationId and the subClusterId which has
// been selected as its home.
addReservationHomeSubCluster(reservationId, reservationHomeSubCluster);
} else {
// update the mapping of reservationId and the home subClusterId to
// the new subClusterId we have selected.
updateReservationHomeSubCluster(subClusterId, reservationId,
reservationHomeSubCluster);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,16 @@
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.util.ReflectionUtils;
import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.yarn.api.records.ReservationRequest;
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.ReservationRequestInterpreter;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.ReservationRequests;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.ReservationDefinitionInfo;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.ReservationRequestsInfo;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.ReservationRequestInfo;
import org.apache.hadoop.yarn.api.records.ReservationDefinition;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.ResourceInfo;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
import org.apache.hadoop.yarn.security.client.RMDelegationTokenIdentifier;
Expand Down Expand Up @@ -57,6 +67,8 @@ public final class RouterServerUtil {

private static final String EPOCH_PREFIX = "e";

private static final String RESERVEIDSTR_PREFIX = "reservation_";

/** Disable constructor. */
private RouterServerUtil() {
}
Expand Down Expand Up @@ -494,6 +506,15 @@ public static String getRenewerForToken(Token<RMDelegationTokenIdentifier> token
? token.decodeIdentifier().getRenewer().toString() : user.getShortUserName();
}

/**
* Set User information.
*
* If the username is empty, we will use the Yarn Router user directly.
* Do not create a proxy user if userName matches the userName on current UGI.
*
* @param userName userName.
* @return UserGroupInformation.
*/
public static UserGroupInformation setupUser(final String userName) {
UserGroupInformation user = null;
try {
Expand All @@ -513,7 +534,94 @@ public static UserGroupInformation setupUser(final String userName) {
return user;
} catch (IOException e) {
throw RouterServerUtil.logAndReturnYarnRunTimeException(e,
"Error while creating Router RMAdmin Service for user : %s.", user);
"Error while creating Router Service for user : %s.", user);
}
}

/**
* Check reservationId is accurate.
*
* We need to ensure that reservationId cannot be empty and
* can be converted to ReservationId object normally.
*
* @param reservationId reservationId.
* @throws IllegalArgumentException If the format of the reservationId is not accurate,
* an IllegalArgumentException needs to be thrown.
*/
@Public
@Unstable
public static void validateReservationId(String reservationId) throws IllegalArgumentException {

if (reservationId == null || reservationId.isEmpty()) {
throw new IllegalArgumentException("Parameter error, the reservationId is empty or null.");
}

if (!reservationId.startsWith(RESERVEIDSTR_PREFIX)) {
throw new IllegalArgumentException("Invalid ReservationId: " + reservationId);
}

String[] resFields = reservationId.split("_");
if (resFields.length != 3) {
throw new IllegalArgumentException("Invalid ReservationId: " + reservationId);
}

String clusterTimestamp = resFields[1];
String id = resFields[2];
if (!NumberUtils.isDigits(id) || !NumberUtils.isDigits(clusterTimestamp)) {
throw new IllegalArgumentException("Invalid ReservationId: " + reservationId);
}
}

/**
* Convert ReservationDefinitionInfo to ReservationDefinition.
*
* @param definitionInfo ReservationDefinitionInfo Object.
* @return ReservationDefinition.
*/
public static ReservationDefinition convertReservationDefinition(
ReservationDefinitionInfo definitionInfo) {
if (definitionInfo == null || definitionInfo.getReservationRequests() == null
|| definitionInfo.getReservationRequests().getReservationRequest() == null
|| definitionInfo.getReservationRequests().getReservationRequest().isEmpty()) {
throw new RuntimeException("definitionInfo Or ReservationRequests is Null.");
}

// basic variable
long arrival = definitionInfo.getArrival();
long deadline = definitionInfo.getDeadline();

// ReservationRequests reservationRequests
String name = definitionInfo.getReservationName();
String recurrenceExpression = definitionInfo.getRecurrenceExpression();
Priority priority = Priority.newInstance(definitionInfo.getPriority());

// reservation requests info
List<ReservationRequest> reservationRequestList = new ArrayList<>();

ReservationRequestsInfo reservationRequestsInfo = definitionInfo.getReservationRequests();

List<ReservationRequestInfo> reservationRequestInfos =
reservationRequestsInfo.getReservationRequest();

for (ReservationRequestInfo resRequestInfo : reservationRequestInfos) {
ResourceInfo resourceInfo = resRequestInfo.getCapability();
Resource capability =
Resource.newInstance(resourceInfo.getMemorySize(), resourceInfo.getvCores());
ReservationRequest reservationRequest = ReservationRequest.newInstance(capability,
resRequestInfo.getNumContainers(), resRequestInfo.getMinConcurrency(),
resRequestInfo.getDuration());
reservationRequestList.add(reservationRequest);
}

ReservationRequestInterpreter[] values = ReservationRequestInterpreter.values();
ReservationRequestInterpreter reservationRequestInterpreter =
values[reservationRequestsInfo.getReservationRequestsInterpreter()];
ReservationRequests reservationRequests = ReservationRequests.newInstance(
reservationRequestList, reservationRequestInterpreter);

ReservationDefinition definition = ReservationDefinition.newInstance(
arrival, deadline, reservationRequests, name, recurrenceExpression, priority);

return definition;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,7 @@

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;

import java.io.IOException;
import org.apache.hadoop.yarn.server.router.RouterServerUtil;

/**
* Extends the RequestInterceptor class and provides common functionality which
Expand Down Expand Up @@ -68,7 +66,7 @@ public Configuration getConf() {
*/
@Override
public void init(String userName) {
setupUser(userName);
this.user = RouterServerUtil.setupUser(userName);
if (this.nextInterceptor != null) {
this.nextInterceptor.init(userName);
}
Expand All @@ -92,34 +90,6 @@ public RESTRequestInterceptor getNextInterceptor() {
return this.nextInterceptor;
}

/**
* Set User information.
*
* If the username is empty, we will use the Yarn Router user directly.
* Do not create a proxy user if user name matches the user name on current UGI.
* @param userName userName.
*/
private void setupUser(final String userName) {
try {
if (userName == null || userName.isEmpty()) {
user = UserGroupInformation.getCurrentUser();
} else if (UserGroupInformation.isSecurityEnabled()) {
user = UserGroupInformation.createProxyUser(userName, UserGroupInformation.getLoginUser());
} else if (userName.equalsIgnoreCase(UserGroupInformation.getCurrentUser().getUserName())) {
user = UserGroupInformation.getCurrentUser();
} else {
user = UserGroupInformation.createProxyUser(userName,
UserGroupInformation.getCurrentUser());
}
} catch (IOException e) {
String message = "Error while creating Router RMAdmin Service for user:";
if (user != null) {
message += ", user: " + user;
}
throw new YarnRuntimeException(message, e);
}
}

public UserGroupInformation getUser() {
return user;
}
Expand Down
Loading