Skip to content
Original file line number Diff line number Diff line change
Expand Up @@ -2215,6 +2215,39 @@ public static boolean isAclEnabled(Configuration conf) {
public static final String DEFAULT_NM_NONSECURE_MODE_USER_PATTERN =
"^[_.A-Za-z0-9][-@_.A-Za-z0-9]{0,255}?[$]?$";

/**
* Whether or not to use precreated pool of local users in secure mode.
*/
public static final String NM_SECURE_MODE_USE_POOL_USER = NM_PREFIX +
"linux-container-executor.secure-mode.use-pool-user";

public static final boolean DEFAULT_NM_SECURE_MODE_USE_POOL_USER = false;

/**
* The number of pool local users. If set to -1, we'll take the value from:
* NM_PREFIX + "resource.cpu-vcores"
*/
public static final String NM_SECURE_MODE_POOL_USER_COUNT = NM_PREFIX +
"linux-container-executor.secure-mode.pool-user-count";

public static final int DEFAULT_NM_SECURE_MODE_POOL_USER_COUNT = -1;

/**
* The prefix of the local pool users can be used by Yarn Secure Container.
* The number of local pool users to use is specified by:
*
* For example, if prefix is "user" and pool-user-count configured to 20,
* then local user names are:
* user0
* user1
* ...
* user19
*/
public static final String NM_SECURE_MODE_POOL_USER_PREFIX = NM_PREFIX +
"linux-container-executor.secure-mode.pool-user-prefix";

public static final String DEFAULT_NM_SECURE_MODE_POOL_USER_PREFIX = "user";

/** The type of resource enforcement to use with the
* linux container executor.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,7 @@ public class LinuxContainerExecutor extends ContainerExecutor {
private boolean containerSchedPriorityIsSet = false;
private int containerSchedPriorityAdjustment = 0;
private boolean containerLimitUsers;
private SecureModeLocalUserAllocator secureModeLocalUserAllocator;
private ResourceHandler resourceHandlerChain;
private LinuxContainerRuntime linuxContainerRuntime;
private Context nmContext;
Expand Down Expand Up @@ -214,6 +215,12 @@ public void setConf(Configuration conf) {
LOG.warn("{}: impersonation without authentication enabled",
YarnConfiguration.NM_NONSECURE_MODE_LIMIT_USERS);
}
boolean secureModeUseLocalUser = UserGroupInformation.isSecurityEnabled() &&
conf.getBoolean(YarnConfiguration.NM_SECURE_MODE_USE_POOL_USER,
YarnConfiguration.DEFAULT_NM_SECURE_MODE_USE_POOL_USER);
if (secureModeUseLocalUser) {
secureModeLocalUserAllocator = SecureModeLocalUserAllocator.getInstance(conf);
}
}

private LCEResourcesHandler getResourcesHandler(Configuration conf) {
Expand Down Expand Up @@ -242,8 +249,14 @@ void verifyUsernamePattern(String user) {
}

String getRunAsUser(String user) {
if (UserGroupInformation.isSecurityEnabled() ||
!containerLimitUsers) {
if (UserGroupInformation.isSecurityEnabled()) {
if (secureModeLocalUserAllocator != null) {
return secureModeLocalUserAllocator.getRunAsLocalUser(user);
}
else {
return user;
}
} else if (!containerLimitUsers) {
return user;
} else {
return nonsecureLocalUser;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,294 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.hadoop.yarn.server.nodemanager;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.Map;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

class LocalUserInfo {
String localUser;
int localUserIndex;
int appCount;
int fileOpCount;
int logHandlingCount;

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

whitespace:end of line

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

whitespace:end of line

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

whitespace:end of line

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

whitespace:end of line

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

whitespace:end of line

public LocalUserInfo(String user, int userIndex) {
localUser = user;
localUserIndex = userIndex;
appCount = 0;
fileOpCount = 0;
logHandlingCount = 0;
}
}

/**
* Allocate local user to an appUser from a pool of precreated local users.
* Maintains the appUser to local user mapping, until:
* a) all applications of the appUser is finished;
* b) all FileDeletionTask for that appUser is executed;
* c) all log aggregation/handling requests for appUser's applications are done
*
* For now allocation is only maintained in memory so it does not support
* node manager recovery mode.
*/
public class SecureModeLocalUserAllocator {
public static final String NONEXISTUSER = "nonexistuser";
private static final Logger LOG =
LoggerFactory.getLogger(SecureModeLocalUserAllocator.class);
private static SecureModeLocalUserAllocator instance;
private Map<String, LocalUserInfo> appUserToLocalUser;
private ArrayList<Boolean> allocated;
private int localUserCount;
private String localUserPrefix;

SecureModeLocalUserAllocator(Configuration conf) {
if (conf.getBoolean(YarnConfiguration.NM_RECOVERY_ENABLED,
YarnConfiguration.DEFAULT_NM_RECOVERY_ENABLED)) {
String errMsg = "Invalidate configuration combination: " +
YarnConfiguration.NM_RECOVERY_ENABLED + "=true, " +
YarnConfiguration.NM_SECURE_MODE_USE_POOL_USER + "=true";
throw new RuntimeException(errMsg);
}
localUserPrefix = conf.get(
YarnConfiguration.NM_SECURE_MODE_POOL_USER_PREFIX,
YarnConfiguration.DEFAULT_NM_SECURE_MODE_POOL_USER_PREFIX);
localUserCount = conf.getInt(
YarnConfiguration.NM_SECURE_MODE_POOL_USER_COUNT,
YarnConfiguration.DEFAULT_NM_SECURE_MODE_POOL_USER_COUNT);
if (localUserCount == -1) {
localUserCount = conf.getInt(YarnConfiguration.NM_VCORES,
YarnConfiguration.DEFAULT_NM_VCORES);
}
allocated = new ArrayList<Boolean>(localUserCount);
appUserToLocalUser = new HashMap<String, LocalUserInfo>(localUserCount);
for (int i=0; i<localUserCount; ++i) {
allocated.add(false);
}
}

public static SecureModeLocalUserAllocator getInstance(Configuration conf) {
if (instance == null) {
synchronized (SecureModeLocalUserAllocator.class) {
if (instance == null) {
instance = new SecureModeLocalUserAllocator(conf);
}
}
}
return instance;
}

/**
* Get allocated local user for the appUser.
*/
public String getRunAsLocalUser(String appUser) {
if (!appUserToLocalUser.containsKey(appUser)) {
LOG.error("Cannot find runas local user for appUser " + appUser +
", return " + NONEXISTUSER);
return NONEXISTUSER;
}
return appUserToLocalUser.get(appUser).localUser;
}

/**
* Allocate a local user for appUser to run application appId
*/
synchronized public void allocate(String appUser, String appId) {
LOG.info("Allocating local user for " + appUser + " for " + appId);
if (!checkAndAllocateAppUser(appUser)) {
return;
}
LocalUserInfo localUserInfo = appUserToLocalUser.get(appUser);
localUserInfo.appCount++;
LOG.info("Incremented appCount for appUser " + appUser +
" to " + localUserInfo.appCount);
}

/**
* Deallocate local user for appUser for application appId.
*/
synchronized public void deallocate(String appUser, String appId) {
LOG.info("Deallocating local user for " + appUser + " for " + appId);
if (!appUserToLocalUser.containsKey(appUser)) {
// This should never happen
String errMsg = "deallocate: No local user allocation for appUser " +
appUser + ", appId " + appId;
LOG.error(errMsg);
return;
}

LocalUserInfo localUserInfo = appUserToLocalUser.get(appUser);
localUserInfo.appCount--;
LOG.info("Decremented appCount for appUser " + appUser +
" to " + localUserInfo.appCount);

checkAndDeallocateAppUser(appUser, localUserInfo);
}

/**
* Increment reference count for pending file operations
*
* Note that it is allowed to call incrementFileOpCount() before allocate().
* For example, during node manager restarts if there are old folders created
* by these local pool users, we should allocate the same named local user to
* the requested appUser, so that the local user is not allocated to new
* containers to use until the old folder deletions are done.
* The old app folders clean up code is in:
* ResourceLocalizationService.cleanUpFilesPerUserDir()
*/
synchronized public void incrementFileOpCount(String appUser) {
if (!checkAndAllocateAppUser(appUser)) {
return;
}
LocalUserInfo localUserInfo = appUserToLocalUser.get(appUser);
localUserInfo.fileOpCount++;
LOG.info("Incremented fileOpCount for appuser " + appUser +
" to " + localUserInfo.fileOpCount);
}

/**
* Decrement reference count for pending file operations
*/
synchronized public void decrementFileOpCount(String appUser) {
if (!appUserToLocalUser.containsKey(appUser)) {
String errMsg =
"decrementFileOpCount: No local user allocation for appUser " +
appUser;
LOG.error(errMsg);
return;
}
LocalUserInfo localUserInfo = appUserToLocalUser.get(appUser);
localUserInfo.fileOpCount--;
LOG.info("Decremented fileOpCount for appUser " + appUser +
" to " + localUserInfo.fileOpCount);

checkAndDeallocateAppUser(appUser, localUserInfo);
}

/**
* Increment pending log handling (or aggregation) per application
*/
synchronized public void incrementLogHandlingCount(String appUser) {
if (!appUserToLocalUser.containsKey(appUser)) {
String errMsg =
"incrementLogHandlingCount: No local user allocation for appUser " +
appUser;
LOG.error(errMsg);
return;
}
LocalUserInfo localUserInfo = appUserToLocalUser.get(appUser);
localUserInfo.logHandlingCount++;
LOG.info("Incremented logHandlingCount for appUser " + appUser +
" to " + localUserInfo.logHandlingCount);
}

/**
* Decrement pending log handling (or aggregation) per application
*/
synchronized public void decrementLogHandlingCount(String appUser) {
if (!appUserToLocalUser.containsKey(appUser)) {
String errMsg =
"decrementLogHandlingCount: No local user allocation for appUser " +
appUser;
LOG.error(errMsg);
return;
}
LocalUserInfo localUserInfo = appUserToLocalUser.get(appUser);
localUserInfo.logHandlingCount--;
LOG.info("Decremented logHandlingCount for appUser " + appUser +
" to " + localUserInfo.logHandlingCount);

checkAndDeallocateAppUser(appUser, localUserInfo);
}

/**
* Check if a given user name is local pool user
* if yes return index, otherwise return -1
*/
private int localUserIndex(String user) {
int result = -1;
if (!user.startsWith(localUserPrefix)) {
return result;
}
try {
result = Integer.parseInt(user.substring(localUserPrefix.length()));
}
catch(NumberFormatException e) {
result = -1;
}
if (result >= localUserCount) {
result = -1;
}
return result;
}

/**
* check if the appUser mapping exists, if not then allocate a local user.
* return true if appUser mapping exists or created,
* return false if not able to allocate local user.
*/
private boolean checkAndAllocateAppUser(String appUser) {
if (appUserToLocalUser.containsKey(appUser)) {
// If appUser exists, don't need to allocate again
return true;
}

LOG.info("Allocating local user for appUser " + appUser);
// check if the appUser is one of the local user, if yes just use it
int index = localUserIndex(appUser);
if (index == -1) {
// otherwise find the first empty slot in the pool of local users
for (int i=0; i<localUserCount; ++i) {
if (!allocated.get(i)) {
index = i;
break;
}
}
}
if (index == -1) {
String errMsg = "Cannot allocate local users from a pool of " +
localUserCount;

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

whitespace:end of line

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

whitespace:end of line

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

whitespace:end of line

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

whitespace:end of line

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

whitespace:end of line

LOG.error(errMsg);
return false;
}
else {
allocated.set(index, true);
}
appUserToLocalUser.put(appUser,
new LocalUserInfo(localUserPrefix + index, index));
LOG.info("Allocated local user index " + index + " for appUser "
+ appUser);
return true;
}

private void checkAndDeallocateAppUser(String appUser, LocalUserInfo localUserInfo) {
if (localUserInfo.fileOpCount <= 0 &&
localUserInfo.appCount <= 0 &&
localUserInfo.logHandlingCount <= 0) {
appUserToLocalUser.remove(appUser);
allocated.set(localUserInfo.localUserIndex, false);
LOG.info("Deallocated local user index " + localUserInfo.localUserIndex +
" for appUser " + appUser);
}
}
}
Loading