Skip to content

Commit

Permalink
create proxied file system using multiple tokens
Browse files Browse the repository at this point in the history
  • Loading branch information
arjun4084346 committed Aug 17, 2023
1 parent 69d7e0f commit 99f0507
Show file tree
Hide file tree
Showing 3 changed files with 37 additions and 16 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@

import java.io.IOException;
import java.net.URI;
import java.util.Collections;
import java.util.List;
import java.util.Properties;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
Expand Down Expand Up @@ -205,7 +207,9 @@ public static FileSystem getProxiedFileSystemUsingToken(@NonNull final String us

/**
* Cached version of {@link ProxiedFileSystemUtils#createProxiedFileSystemUsingToken(String, Token, URI, Configuration)}.
* Deprecated in favor of {@link #getProxiedFileSystemUsingTokens}
*/
@Deprecated
@Builder(builderClassName = "ProxiedFileSystemFromToken", builderMethodName = "fromToken")
private static FileSystem getProxiedFileSystemUsingToken(@NonNull String userNameToProxyAs, Token<?> userNameToken,
URI fsURI, Configuration conf, FileSystem referenceFS) throws IOException, ExecutionException {
Expand All @@ -215,7 +219,20 @@ private static FileSystem getProxiedFileSystemUsingToken(@NonNull String userNam
Configuration actualConfiguration = resolveConfiguration(conf, referenceFS);

return USER_NAME_TO_FILESYSTEM_CACHE.get(getFileSystemKey(actualURI, userNameToProxyAs, referenceFS),
new CreateProxiedFileSystemFromToken(userNameToProxyAs, userNameToken, actualURI, actualConfiguration,
new CreateProxiedFileSystemFromToken(userNameToProxyAs, Collections.singletonList(userNameToken), actualURI, actualConfiguration,
referenceFS));
}

@Builder(builderClassName = "ProxiedFileSystemFromTokens", builderMethodName = "fromTokens")
private static FileSystem getProxiedFileSystemUsingTokens(@NonNull String userNameToProxyAs, List<Token<?>> userNameTokens,
URI fsURI, Configuration conf, FileSystem referenceFS) throws IOException, ExecutionException {
Preconditions.checkNotNull(userNameToProxyAs, "Must provide a user name to proxy as.");
Preconditions.checkNotNull(userNameTokens, "Must provide token for user to proxy.");
URI actualURI = resolveUri(fsURI, conf, referenceFS);
Configuration actualConfiguration = resolveConfiguration(conf, referenceFS);

return USER_NAME_TO_FILESYSTEM_CACHE.get(getFileSystemKey(actualURI, userNameToProxyAs, referenceFS),
new CreateProxiedFileSystemFromToken(userNameToProxyAs, userNameTokens, actualURI, actualConfiguration,
referenceFS));
}

Expand Down Expand Up @@ -272,7 +289,7 @@ private static class CreateProxiedFileSystemFromToken implements Callable<FileSy
@NonNull
private final String userNameToProxyAs;
@NonNull
private final Token<?> userNameToken;
private final List<Token<?>> userNameTokens;
@NonNull
private final URI uri;
@NonNull
Expand All @@ -282,7 +299,7 @@ private static class CreateProxiedFileSystemFromToken implements Callable<FileSy
@Override
public FileSystem call() throws Exception {
FileSystem fs = ProxiedFileSystemUtils.createProxiedFileSystemUsingToken(this.userNameToProxyAs,
this.userNameToken, this.uri, this.configuration);
this.userNameTokens, this.uri, this.configuration);
if (this.referenceFS != null) {
return decorateFilesystemFromReferenceFS(fs, this.referenceFS);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,10 @@
import java.io.IOException;
import java.net.URI;
import java.security.PrivilegedExceptionAction;
import java.util.Collections;
import java.util.List;
import java.util.Properties;

import com.google.common.base.Optional;
import com.google.common.base.Preconditions;
import com.google.common.io.Closer;

Expand Down Expand Up @@ -86,10 +87,10 @@ static FileSystem createProxiedFileSystem(@NonNull final String userNameToProxyA
case TOKEN:
Preconditions.checkArgument(properties.containsKey(AUTH_TOKEN_PATH));
Path tokenPath = new Path(properties.getProperty(AUTH_TOKEN_PATH));
Optional<Token<?>> proxyToken = getTokenFromSeqFile(userNameToProxyAs, tokenPath);
if (proxyToken.isPresent()) {
List<Token<?>> proxyTokens = getTokenFromSeqFile(userNameToProxyAs, tokenPath);
if (proxyTokens.size() > 0) {
try {
return createProxiedFileSystemUsingToken(userNameToProxyAs, proxyToken.get(), fsURI, conf);
return createProxiedFileSystemUsingToken(userNameToProxyAs, proxyTokens, fsURI, conf);
} catch (InterruptedException e) {
throw new IOException("Failed to proxy as user " + userNameToProxyAs, e);
}
Expand Down Expand Up @@ -168,17 +169,19 @@ static FileSystem createProxiedFileSystemUsingKeytab(State state, URI fsURI, Con
* method to create a {@link FileSystem}.
*
* @param userNameToProxyAs The name of the user the super user should proxy as
* @param userNameToken The {@link Token} to add to the proxied user's {@link UserGroupInformation}.
* @param userNameTokens List of {@link Token}s to add to the proxied user's {@link UserGroupInformation}.
* @param fsURI The {@link URI} for the {@link FileSystem} that should be created
* @param conf The {@link Configuration} for the {@link FileSystem} that should be created
*
* @return a {@link FileSystem} that can execute commands on behalf of the specified userNameToProxyAs
*/
static FileSystem createProxiedFileSystemUsingToken(@NonNull String userNameToProxyAs,
@NonNull Token<?> userNameToken, URI fsURI, Configuration conf) throws IOException, InterruptedException {
@NonNull List<Token<?>> userNameTokens, URI fsURI, Configuration conf) throws IOException, InterruptedException {
UserGroupInformation ugi =
UserGroupInformation.createProxyUser(userNameToProxyAs, UserGroupInformation.getLoginUser());
ugi.addToken(userNameToken);
for (Token<?> userNameToken : userNameTokens) {
ugi.addToken(userNameToken);
}
return ugi.doAs(new ProxiedFileSystem(fsURI, conf));
}

Expand All @@ -205,7 +208,7 @@ public static boolean canProxyAs(String userNameToProxyAs, String superUserName,
*
* @return A {@link Token} for the given user name
*/
public static Optional<Token<?>> getTokenFromSeqFile(String userNameKey, Path tokenFilePath) throws IOException {
public static List<Token<?>> getTokenFromSeqFile(String userNameKey, Path tokenFilePath) throws IOException {
log.info("Reading tokens from sequence file " + tokenFilePath);

try (Closer closer = Closer.create()) {
Expand All @@ -218,12 +221,12 @@ public static Optional<Token<?>> getTokenFromSeqFile(String userNameKey, Path to
while (tokenReader.next(key, value)) {
log.debug("Found token for user: " + key);
if (key.toString().equals(userNameKey)) {
return Optional.<Token<?>> of(value);
return Collections.singletonList(value);
}
}
}
log.warn("Did not find any tokens for user " + userNameKey);
return Optional.absent();
return Collections.emptyList();
}

private static UserGroupInformation loginAndProxyAsUser(@NonNull String userNameToProxyAs,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import java.io.IOException;
import java.net.URI;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ExecutionException;

Expand Down Expand Up @@ -364,12 +365,12 @@ private static FileSystem getWriterFsUsingToken(State state, URI uri)
throws IOException {
try {
String user = state.getProp(ConfigurationKeys.FS_PROXY_AS_USER_NAME);
Optional<Token<?>> token = ProxiedFileSystemUtils
List<Token<?>> tokens = ProxiedFileSystemUtils
.getTokenFromSeqFile(user, new Path(state.getProp(ConfigurationKeys.FS_PROXY_AS_USER_TOKEN_FILE)));
if (!token.isPresent()) {
if (tokens.isEmpty()) {
throw new IOException("No token found for user " + user);
}
return ProxiedFileSystemCache.fromToken().userNameToken(token.get())
return ProxiedFileSystemCache.fromTokens().userNameTokens(tokens)
.userNameToProxyAs(state.getProp(ConfigurationKeys.FS_PROXY_AS_USER_NAME)).fsURI(uri)
.conf(HadoopUtils.newConfiguration()).build();
} catch (ExecutionException e) {
Expand Down

0 comments on commit 99f0507

Please sign in to comment.