Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -4107,6 +4107,16 @@ public static boolean isAclEnabled(Configuration conf) {
public static final long DEFAULT_ROUTER_WEBAPP_READ_TIMEOUT =
TimeUnit.SECONDS.toMillis(30);

/** The Kerberos keytab for the yarn router.*/
public static final String ROUTER_KEYTAB = ROUTER_PREFIX + "keytab.file";

/** The Kerberos principal for the yarn router.*/
public static final String ROUTER_PRINCIPAL = ROUTER_PREFIX + "kerberos.principal";

/** The Kerberos principal hostname for the yarn router.*/
public static final String ROUTER_KERBEROS_PRINCIPAL_HOSTNAME_KEY = ROUTER_PREFIX +
"kerberos.principal.hostname";

////////////////////////////////
// CSI Volume configs
////////////////////////////////
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4888,4 +4888,37 @@
default implementation LocalityAppPlacementAllocator is used.
</description>
</property>

<property>
<name>yarn.router.keytab.file</name>
<value></value>
<description>
The keytab file used by router to login as its
service principal. The principal name is configured with
dfs.federation.router.kerberos.principal.
</description>
</property>

<property>
<name>yarn.router.kerberos.principal</name>
<value></value>
<description>
The Router service principal. This is typically set to
router/[email protected]. Each Router will substitute _HOST with its
own fully qualified hostname at startup. The _HOST placeholder
allows using the same configuration setting on both Router setup.
</description>
</property>

<property>
<name>yarn.router.kerberos.principal.hostname</name>
<value></value>
<description>
Optional.
The hostname for the Router containing this
configuration file. Will be different for each machine.
Defaults to current hostname.
</description>
</property>

</configuration>
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,14 @@ public static SubClusterId newInstance(String subClusterId) {
return id;
}

@Private
@Unstable
public static SubClusterId newInstance(Integer subClusterId) {
SubClusterId id = Records.newRecord(SubClusterId.class);
id.setId(String.valueOf(subClusterId));
return id;
}

/**
* Get the string identifier of the <em>subcluster</em> which is unique across
* the federated cluster. The identifier is static, i.e. preserved across
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -609,4 +609,10 @@ public boolean equals(Object obj) {
protected interface Func<T, TResult> {
TResult invoke(T input) throws Exception;
}


@VisibleForTesting
public FederationStateStore getStateStore() {
return stateStore;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -382,8 +382,13 @@ protected <T> T createRMProxy(Class<T> protocol, Configuration config,
protected Token<AMRMTokenIdentifier> initializeUnmanagedAM(
ApplicationId appId) throws IOException, YarnException {
try {
UserGroupInformation appSubmitter =
UserGroupInformation.createRemoteUser(this.submitter);
UserGroupInformation appSubmitter;
if (UserGroupInformation.isSecurityEnabled()) {
appSubmitter = UserGroupInformation.createProxyUser(this.submitter,
UserGroupInformation.getLoginUser());
} else {
appSubmitter = UserGroupInformation.createRemoteUser(this.submitter);
}
this.rmClient = createRMProxy(ApplicationClientProtocol.class, this.conf,
appSubmitter, null);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -459,8 +459,13 @@ public void recover(Map<String, byte[]> recoveredDataMap) {
// Get the running containers from home RM, note that we will also get the
// AM container itself from here. We don't need it, but no harm to put the
// map as well.
UserGroupInformation appSubmitter = UserGroupInformation
.createRemoteUser(getApplicationContext().getUser());
UserGroupInformation appSubmitter;
if (UserGroupInformation.isSecurityEnabled()) {
appSubmitter = UserGroupInformation.createProxyUser(getApplicationContext().getUser(),
UserGroupInformation.getLoginUser());
} else {
appSubmitter = UserGroupInformation.createRemoteUser(getApplicationContext().getUser());
}
ApplicationClientProtocol rmClient =
createHomeRMProxy(getApplicationContext(),
ApplicationClientProtocol.class, appSubmitter);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,19 @@
<artifactId>guice</artifactId>
</dependency>

<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-minikdc</artifactId>
<scope>test</scope>
</dependency>

<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-auth</artifactId>
<scope>test</scope>
<type>test-jar</type>
</dependency>

</dependencies>

<build>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,12 +19,15 @@
package org.apache.hadoop.yarn.server.router;

import java.io.IOException;
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.util.concurrent.atomic.AtomicBoolean;

import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
import org.apache.hadoop.metrics2.source.JvmMetrics;
import org.apache.hadoop.security.SecurityUtil;
import org.apache.hadoop.service.CompositeService;
import org.apache.hadoop.util.JvmPauseMonitor;
import org.apache.hadoop.util.ShutdownHookManager;
Expand Down Expand Up @@ -88,7 +91,8 @@ public Router() {
}

protected void doSecureLogin() throws IOException {
// TODO YARN-6539 Create SecureLogin inside Router
SecurityUtil.login(this.conf, YarnConfiguration.ROUTER_KEYTAB,
YarnConfiguration.ROUTER_PRINCIPAL, getHostName(this.conf));
}

@Override
Expand Down Expand Up @@ -195,4 +199,31 @@ public static void main(String[] argv) {
System.exit(-1);
}
}

@VisibleForTesting
public RouterClientRMService getClientRMProxyService() {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

VisibleForTesting?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thank you very much for helping to review the code, I will modify the code.

return clientRMProxyService;
}

@VisibleForTesting
public RouterRMAdminService getRmAdminProxyService() {
return rmAdminProxyService;
}

/**
* Returns the hostname for this Router. If the hostname is not
* explicitly configured in the given config, then it is determined.
*
* @param config configuration
* @return the hostname (NB: may not be a FQDN)
* @throws UnknownHostException if the hostname cannot be determined
*/
private String getHostName(Configuration config)
throws UnknownHostException {
String name = config.get(YarnConfiguration.ROUTER_KERBEROS_PRINCIPAL_HOSTNAME_KEY);
if (name == null) {
name = InetAddress.getLocalHost().getHostName();
}
return name;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -106,8 +106,9 @@ private void setupUser(String userName) {
try {
// Do not create a proxy user if user name matches the user name on
// current UGI
if (userName.equalsIgnoreCase(
UserGroupInformation.getCurrentUser().getUserName())) {
if (UserGroupInformation.isSecurityEnabled()) {
user = UserGroupInformation.createProxyUser(userName, UserGroupInformation.getLoginUser());
} else if (userName.equalsIgnoreCase(UserGroupInformation.getCurrentUser().getUserName())) {
user = UserGroupInformation.getCurrentUser();
} else {
user = UserGroupInformation.createProxyUser(userName,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1623,4 +1623,14 @@ protected SubClusterId getApplicationHomeSubCluster(
String.format("Can't Found applicationId = %s in any sub clusters", applicationId);
throw new YarnException(errorMsg);
}

@VisibleForTesting
public FederationStateStoreFacade getFederationFacade() {
return federationFacade;
}

@VisibleForTesting
public Map<SubClusterId, ApplicationClientProtocol> getClientRMProxies() {
return clientRMProxies;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -473,7 +473,7 @@ public GetNodesToAttributesResponse getNodesToAttributes(
}

@VisibleForTesting
protected RequestInterceptorChainWrapper getInterceptorChain()
public RequestInterceptorChainWrapper getInterceptorChain()
throws IOException {
String user = UserGroupInformation.getCurrentUser().getUserName();
RequestInterceptorChainWrapper chain = userPipelineMap.get(user);
Expand Down Expand Up @@ -616,4 +616,9 @@ protected void finalize() {
rootInterceptor.shutdown();
}
}

@VisibleForTesting
public Map<String, RequestInterceptorChainWrapper> getUserPipelineMap() {
return userPipelineMap;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -80,8 +80,9 @@ public void init(String userName) {
try {
// Do not create a proxy user if user name matches the user name on
// current UGI
if (userName.equalsIgnoreCase(
UserGroupInformation.getCurrentUser().getUserName())) {
if (UserGroupInformation.isSecurityEnabled()) {
user = UserGroupInformation.createProxyUser(userName, UserGroupInformation.getLoginUser());
} else if (userName.equalsIgnoreCase(UserGroupInformation.getCurrentUser().getUserName())) {
user = UserGroupInformation.getCurrentUser();
} else {
user = UserGroupInformation.createProxyUser(userName,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -186,7 +186,7 @@ private List<String> getInterceptorClassNames(Configuration conf) {
}

@VisibleForTesting
protected RequestInterceptorChainWrapper getInterceptorChain()
public RequestInterceptorChainWrapper getInterceptorChain()
throws IOException {
String user = UserGroupInformation.getCurrentUser().getUserName();
RequestInterceptorChainWrapper chain = userPipelineMap.get(user);
Expand Down
Loading