Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 4 additions & 4 deletions docs/src/site/markdown/tez_acls.md
Original file line number Diff line number Diff line change
Expand Up @@ -21,16 +21,16 @@

Access control in Tez can be categorized as follows:

- Modify permissions on the Tez AM ( or Session ). Users with this permision can:
- Modify permissions on the Tez AM ( or Session ). Users with this permission can:
- Submit a DAG to a Tez Session
- Kill any DAG within the given AM/Session
- Kill the Session
- View permissions on the Tez AM ( or Session ). Users with this permision can:
- View permissions on the Tez AM ( or Session ). Users with this permission can:
- Monitor/View the status of the Session
- Monitor/View the progress/status of any DAG within the given AM/Session
- Modify permissions on a particular Tez DAG. Users with this permision can:
- Modify permissions on a particular Tez DAG. Users with this permission can:
- Kill the DAG
- View permissions on a particular Tez DAG. Users with this permision can:
- View permissions on a particular Tez DAG. Users with this permission can:
- Monitor/View the progress/status of the DAG

From the above, you can see that All users/groups that have access to do operations on the AM also have access to similar operations on all DAGs within that AM/session. Also, by default, the owner of the Tez AM, i.e. the user who started the Tez AM, is considered a super-user and has access to all operations on the AM as well as all DAGs within the AM/Session.
Expand Down
68 changes: 34 additions & 34 deletions tez-api/src/main/java/org/apache/tez/client/TezClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,7 @@ public class TezClient {
private static final String appIdStrPrefix = "application";
private static final String APPLICATION_ID_PREFIX = appIdStrPrefix + '_';
private static final long PREWARM_WAIT_MS = 500;

@VisibleForTesting
static final String NO_CLUSTER_DIAGNOSTICS_MSG = "No cluster diagnostics found.";

Expand Down Expand Up @@ -303,7 +303,7 @@ public static TezClient create(String name, TezConfiguration tezConf, boolean is
}

/**
* Add local files for the DAG App Master. These may be files, archives,
* Add local files for the DAG App Master. These may be files, archives,
* jars etc.<br>
* <p>
* In non-session mode these will be added to the files of the App Master
Expand All @@ -321,7 +321,7 @@ public static TezClient create(String name, TezConfiguration tezConf, boolean is
* accumulate across DAG submissions and are never removed from the classpath.
* Only LocalResourceType.FILE is supported. All files will be treated as
* private.
*
*
* @param localFiles the files to be made available in the AM
*/
public synchronized void addAppMasterLocalFiles(Map<String, LocalResource> localFiles) {
Expand All @@ -331,7 +331,7 @@ public synchronized void addAppMasterLocalFiles(Map<String, LocalResource> local
}
amConfig.addAMLocalResources(localFiles);
}

/**
* If the next DAG App Master needs different local files, then use this
* method to clear the local files and then add the new local files
Expand All @@ -341,7 +341,7 @@ public synchronized void addAppMasterLocalFiles(Map<String, LocalResource> local
public synchronized void clearAppMasterLocalFiles() {
amConfig.clearAMLocalResources();
}

/**
* Set security credentials to be used inside the app master, if needed. Tez App
* Master needs credentials to access the staging directory and for most HDFS
Expand All @@ -351,7 +351,7 @@ public synchronized void clearAppMasterLocalFiles() {
* credentials must be supplied by the user. These will be used by the App
* Master for the next DAG. <br>In session mode, credentials, if needed, must be
* set before calling start()
*
*
* @param credentials credentials
*/
public synchronized void setAppMasterCredentials(Credentials credentials) {
Expand Down Expand Up @@ -387,16 +387,16 @@ public synchronized void start() throws TezException, IOException {
LOG.info("Session mode. Starting session.");
TezClientUtils.processTezLocalCredentialsFile(sessionCredentials,
amConfig.getTezConfiguration());

clientTimeout = amConfig.getTezConfiguration().getInt(
TezConfiguration.TEZ_SESSION_CLIENT_TIMEOUT_SECS,
TezConfiguration.TEZ_SESSION_CLIENT_TIMEOUT_SECS_DEFAULT);

try {
if (sessionAppId == null) {
sessionAppId = createApplication();
}

ApplicationSubmissionContext appContext = setupApplicationContext();
frameworkClient.submitApplication(appContext);
ApplicationReport appReport = frameworkClient.getApplicationReport(sessionAppId);
Expand Down Expand Up @@ -597,15 +597,15 @@ public DAGClientAMProtocolBlockingPB sendAMHeartbeat(DAGClientAMProtocolBlocking
* cluster.<br>In session mode, it submits the DAG to the session App Master. It
* blocks until either the DAG is submitted to the session or configured
* timeout period expires. Cleans up session if the submission timed out.
*
*
* @param dag
* DAG to be submitted to Session
* @return DAGClient to monitor the DAG
* @throws TezException
* @throws IOException
* @throws DAGSubmissionTimedOut
* if submission timed out
*/
*/
public synchronized DAGClient submitDAG(DAG dag) throws TezException, IOException {
DAGClient result = isSession ? submitDAGSession(dag) : submitDAGApplication(dag);
if (result != null) {
Expand Down Expand Up @@ -642,12 +642,12 @@ private void closePrewarmDagClient() {
}
prewarmDagClient = null;
}

private DAGClient submitDAGSession(DAG dag) throws TezException, IOException {
Preconditions.checkState(isSession == true,
"submitDAG with additional resources applies to only session mode. " +
Preconditions.checkState(isSession,
"submitDAG with additional resources applies to only session mode. " +
"In non-session mode please specify all resources in the initial configuration");

verifySessionStateForSubmission();

String callerContextStr = "";
Expand All @@ -659,7 +659,7 @@ private DAGClient submitDAGSession(DAG dag) throws TezException, IOException {
+ ", applicationId=" + sessionAppId
+ ", dagName=" + dag.getName()
+ callerContextStr);

if (!additionalLocalResources.isEmpty()) {
for (LocalResource lr : additionalLocalResources.values()) {
Preconditions.checkArgument(lr.getType() == LocalResourceType.FILE, "LocalResourceType: "
Expand All @@ -677,7 +677,7 @@ private DAGClient submitDAGSession(DAG dag) throws TezException, IOException {
requestBuilder.setAdditionalAmResources(DagTypeConverters
.convertFromLocalResources(additionalLocalResources));
}

additionalLocalResources.clear();

// if request size exceeds maxSubmitDAGRequestSizeThroughIPC, we serialize them to HDFS
Expand Down Expand Up @@ -804,7 +804,7 @@ private boolean isJobInTerminalState(YarnApplicationState yarnApplicationState)
public String getClientName() {
return clientName;
}

@Private
@VisibleForTesting
public synchronized ApplicationId getAppMasterApplicationId() {
Expand All @@ -817,16 +817,16 @@ public synchronized ApplicationId getAppMasterApplicationId() {

/**
* Get the status of the App Master executing the DAG
* In non-session mode it returns the status of the last submitted DAG App Master
* In non-session mode it returns the status of the last submitted DAG App Master
* In session mode, it returns the status of the App Master hosting the session
*
*
* @return State of the session
* @throws TezException
* @throws IOException
*/
public synchronized TezAppMasterStatus getAppMasterStatus() throws TezException, IOException {
// Supporting per-DAG app master case since user may choose to run the same
// code in that mode and the code should continue to work. Its easy to provide
// Supporting per-DAG app master case since user may choose to run the same
// code in that mode and the code should continue to work. Its easy to provide
// the correct view for per-DAG app master too.
ApplicationId appId = null;
if (isSession) {
Expand Down Expand Up @@ -869,21 +869,21 @@ public synchronized TezAppMasterStatus getAppMasterStatus() throws TezException,
}
return TezAppMasterStatus.INITIALIZING;
}

/**
* API to help pre-allocate containers in session mode. In non-session mode
* this is ignored. The pre-allocated containers may be re-used by subsequent
* job DAGs to improve performance.
* this is ignored. The pre-allocated containers may be re-used by subsequent
* job DAGs to improve performance.
* The preWarm vertex should be configured and setup exactly
* like the other vertices in the job DAGs so that the pre-allocated containers
* like the other vertices in the job DAGs so that the pre-allocated containers
* may be re-used by the subsequent DAGs to improve performance.
* The processor for the preWarmVertex may be used to pre-warm the containers
* by pre-loading classes etc. It should be short-running so that pre-warming
* by pre-loading classes etc. It should be short-running so that pre-warming
* does not block real execution. Users can specify their custom processors or
* use the PreWarmProcessor from the runtime library.
* The parallelism of the preWarmVertex will determine the number of preWarmed
* containers.
* Pre-warming is best efforts and among other factors is limited by the free
* Pre-warming is best efforts and among other factors is limited by the free
* resources on the cluster.
* @param preWarmVertex
* @throws TezException
Expand Down Expand Up @@ -929,7 +929,7 @@ public synchronized void preWarm(PreWarmVertex preWarmVertex,
}

verifySessionStateForSubmission();

DAG dag = org.apache.tez.dag.api.DAG.create(TezConstants.TEZ_PREWARM_DAG_NAME_PREFIX + "_"
+ preWarmDAGCounter++);
dag.addVertex(preWarmVertex);
Expand All @@ -948,15 +948,15 @@ public synchronized void preWarm(PreWarmVertex preWarmVertex,
}
}


/**
* Wait till the DAG is ready to be submitted.
* In non-session mode this is a no-op since the application can be immediately
* submitted.
* In session mode, this waits for the session host to be ready to accept a DAG
* @throws IOException
* @throws TezException
* @throws InterruptedException
* @throws InterruptedException
*/
@Evolving
public synchronized void waitTillReady() throws IOException, TezException, InterruptedException {
Expand Down Expand Up @@ -1042,7 +1042,7 @@ private void verifySessionStateForSubmission() throws SessionNotRunning {
throw new SessionNotRunning("Session stopped by user");
}
}

private DAGClient submitDAGApplication(DAG dag)
throws TezException, IOException {
ApplicationId appId = createApplication();
Expand All @@ -1055,7 +1055,7 @@ DAGClient submitDAGApplication(ApplicationId appId, DAG dag)
LOG.info("Submitting DAG application with id: " + appId);
try {
// Use the AMCredentials object in client mode, since this won't be re-used.
// Ensures we don't fetch credentially unnecessarily if the user has already provided them.
// Ensures we don't fetch credentials unnecessarily if the user has already provided them.
Credentials credentials = amConfig.getCredentials();
if (credentials == null) {
credentials = new Credentials();
Expand Down Expand Up @@ -1259,7 +1259,7 @@ public TezClient build() {
}
}

//Copied this helper method from
//Copied this helper method from
//org.apache.hadoop.yarn.api.records.ApplicationId in Hadoop 2.8+
//to simplify implementation on 2.7.x
@Public
Expand Down
28 changes: 14 additions & 14 deletions tez-api/src/main/java/org/apache/tez/client/TezClientUtils.java
Original file line number Diff line number Diff line change
Expand Up @@ -139,7 +139,7 @@ private static FileStatus[] getLRFileStatus(String fileName, Configuration conf)
FileSystem fs = p.getFileSystem(conf);
p = fs.resolvePath(p.makeQualified(fs.getUri(),
fs.getWorkingDirectory()));
FileSystem targetFS = p.getFileSystem(conf);
FileSystem targetFS = p.getFileSystem(conf);
if (targetFS.isDirectory(p)) {
return targetFS.listStatus(p);
} else {
Expand All @@ -150,7 +150,7 @@ private static FileStatus[] getLRFileStatus(String fileName, Configuration conf)

/**
* Setup LocalResource map for Tez jars based on provided Configuration
*
*
* @param conf
* Configuration to use to access Tez jars' locations
* @param credentials
Expand All @@ -167,7 +167,7 @@ static boolean setupTezJarsLocalResources(TezConfiguration conf,
boolean usingTezArchive = false;

if (conf.getBoolean(TezConfiguration.TEZ_IGNORE_LIB_URIS, false)){
LOG.info("Ignoring '" + TezConfiguration.TEZ_LIB_URIS + "' since '" +
LOG.info("Ignoring '" + TezConfiguration.TEZ_LIB_URIS + "' since '" +
TezConfiguration.TEZ_IGNORE_LIB_URIS + "' is set to true");
} else {
// Add tez jars to local resource
Expand Down Expand Up @@ -357,7 +357,7 @@ public static FileSystem ensureStagingDirExists(Configuration conf,
}
return fs;
}

/**
* Populate {@link Credentials} for the URI's to access them from their {@link FileSystem}s
* @param uris URIs that need to be accessed
Expand Down Expand Up @@ -385,7 +385,7 @@ public Path apply(URI input) {
* Obtains tokens for the DAG based on the list of URIs setup in the DAG. The
* fetched credentials are populated back into the DAG and can be retrieved
* via dag.getCredentials
*
*
* @param dag
* the dag for which credentials need to be setup
* @param sessionCredentials
Expand All @@ -403,9 +403,9 @@ static Credentials setupDAGCredentials(DAG dag, Credentials sessionCredentials,
Credentials dagCredentials = new Credentials();
// All session creds are required for the DAG.
dagCredentials.mergeAll(sessionCredentials);

// Add additional credentials based on any URIs that the user may have specified.

// Obtain Credentials for any paths that the user may have configured.
addFileSystemCredentialsFromURIs(dag.getURIsForCredentials(), dagCredentials, conf);

Expand All @@ -425,7 +425,7 @@ static Credentials setupDAGCredentials(DAG dag, Credentials sessionCredentials,
addFileSystemCredentialsFromURIs(dataSink.getURIsForCredentials(), dagCredentials, conf);
}
}

for (LocalResource lr: dag.getTaskLocalFiles().values()) {
lrPaths.add(ConverterUtils.getPathFromYarnURL(lr.getResource()));
}
Expand All @@ -436,7 +436,7 @@ static Credentials setupDAGCredentials(DAG dag, Credentials sessionCredentials,
} catch (URISyntaxException e) {
throw new IOException(e);
}

return dagCredentials;
}

Expand Down Expand Up @@ -597,7 +597,7 @@ public static ApplicationSubmissionContext createApplicationSubmissionContext(
if (amLocalResources != null && !amLocalResources.isEmpty()) {
amResourceProto = DagTypeConverters.convertFromLocalResources(amLocalResources);
} else {
amResourceProto = DAGProtos.PlanLocalResourcesProto.getDefaultInstance();
amResourceProto = DAGProtos.PlanLocalResourcesProto.getDefaultInstance();
}
amResourceProto.writeDelimitedTo(sessionJarsPBOutStream);
} finally {
Expand All @@ -619,7 +619,7 @@ public static ApplicationSubmissionContext createApplicationSubmissionContext(
Map<ApplicationAccessType, String> acls = aclManager.toYARNACls();

if(dag != null) {

DAGPlan dagPB = prepareAndCreateDAGPlan(dag, amConfig, tezJarResources, tezLrsAsArchive,
sessionCreds, servicePluginsDescriptor, javaOptsChecker);

Expand Down Expand Up @@ -749,7 +749,7 @@ private static void populateTokenCache(TezConfiguration conf, Credentials creden
TokenCache.obtainTokensForFileSystems(credentials, ps, conf);
}
}

static DAGPlan prepareAndCreateDAGPlan(DAG dag, AMConfiguration amConfig,
Map<String, LocalResource> tezJarResources, boolean tezLrsAsArchive,
Credentials credentials, ServicePluginsDescriptor servicePluginsDescriptor,
Expand All @@ -760,7 +760,7 @@ static DAGPlan prepareAndCreateDAGPlan(DAG dag, AMConfiguration amConfig,
return dag.createDag(amConfig.getTezConfiguration(), dagCredentials, tezJarResources,
amConfig.getBinaryConfLR(), tezLrsAsArchive, servicePluginsDescriptor, javaOptsChecker);
}

static void maybeAddDefaultLoggingJavaOpts(String logLevel, List<String> vargs) {
Objects.requireNonNull(vargs);
TezClientUtils.addLog4jSystemProperties(logLevel, vargs);
Expand Down Expand Up @@ -1119,7 +1119,7 @@ public static void setApplicationPriority(ApplicationSubmissionContext context,
int priority = amConfig.getTezConfiguration().getInt(TezConfiguration.TEZ_AM_APPLICATION_PRIORITY, 0);
context.setPriority(Priority.newInstance(priority));
if (LOG.isDebugEnabled()) {
LOG.debug("Settting TEZ application priority, applicationId= " + context.getApplicationId() +
LOG.debug("Setting TEZ application priority, applicationId= " + context.getApplicationId() +
", priority= " + context.getPriority().getPriority());
}
}
Expand Down
Loading