diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java b/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java index cde77b3bf6..09077dcdf2 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java @@ -191,7 +191,6 @@ import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Function; -import com.google.common.base.Joiner; import org.apache.tez.common.Preconditions; import com.google.common.collect.Maps; import com.google.common.util.concurrent.ListeningExecutorService; @@ -225,7 +224,6 @@ public class DAGAppMaster extends AbstractService { * Priority of the DAGAppMaster shutdown hook. */ public static final int SHUTDOWN_HOOK_PRIORITY = 30; - private static final Joiner PATH_JOINER = Joiner.on('/'); @VisibleForTesting static final String INVALID_SESSION_ERR_MSG = "Initial application attempt in session mode failed. " @@ -313,7 +311,6 @@ public class DAGAppMaster extends AbstractService { /** * set of already executed dag names. */ - Set dagNames = new HashSet(); Set dagIDs = new HashSet(); protected boolean isLastAMRetry = false; @@ -373,19 +370,17 @@ public DAGAppMaster(ApplicationAttemptId applicationAttemptId, this.containerID.toString(), this.appMasterUgi.getShortUserName()); LOG.info("Created DAGAppMaster for application " + applicationAttemptId - + ", versionInfo=" + dagVersionInfo.toString()); + + ", versionInfo=" + dagVersionInfo); TezCommonUtils.logCredentials(LOG, this.appMasterUgi.getCredentials(), "am"); } // Pull this WebAppUtils function into Tez until YARN-4186 - public static String getRunningLogURL(String nodeHttpAddress, + private static String getRunningLogURL(String nodeHttpAddress, String containerId, String user) { - if (nodeHttpAddress == null || nodeHttpAddress.isEmpty() - || containerId == null || containerId.isEmpty() || user == null - || user.isEmpty()) { + if (containerId.isEmpty() || user == null | user.isEmpty()) { return null; } - return PATH_JOINER.join(nodeHttpAddress, "node", "containerlogs", + return String.format("%s/node/containerlogs/%s/%s", nodeHttpAddress, containerId, user); } @@ -696,8 +691,7 @@ private void handleInternalError(String errDiagnosticsPrefix, String errDiagDagE state = DAGAppMasterState.ERROR; if (currentDAG != null) { _updateLoggers(currentDAG, "_post"); - String errDiagnostics = errDiagnosticsPrefix + ". Aborting dag: " + currentDAG.getID(); - LOG.info(errDiagnostics); + LOG.info(errDiagnosticsPrefix + ". Aborting dag: " + currentDAG.getID()); // Inform the current DAG about the error sendEvent(new DAGEventInternalError(currentDAG.getID(), errDiagDagEvent)); } else { @@ -759,8 +753,8 @@ protected synchronized void handle(DAGAppMasterEvent event) { DAGAppMasterEventDAGFinished finishEvt = (DAGAppMasterEventDAGFinished) event; String timeStamp = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(Calendar.getInstance().getTime()); - System.err.println(timeStamp + " Completed Dag: " + finishEvt.getDAGId().toString()); - System.out.println(timeStamp + " Completed Dag: " + finishEvt.getDAGId().toString()); + System.err.println(timeStamp + " Completed Dag: " + finishEvt.getDAGId()); + System.out.println(timeStamp + " Completed Dag: " + finishEvt.getDAGId()); // Stop vertex services if any stopVertexServices(currentDAG); if (!isSession) { @@ -768,13 +762,11 @@ protected synchronized void handle(DAGAppMasterEvent event) { this.taskSchedulerManager.setShouldUnregisterFlag(); _updateLoggers(currentDAG, "_post"); setStateOnDAGCompletion(); - LOG.info("Shutting down on completion of dag:" + - finishEvt.getDAGId().toString()); + LOG.info("Shutting down on completion of dag:" + finishEvt.getDAGId()); shutdownHandler.shutdown(); } else { - LOG.info("DAG completed, dagId=" - + finishEvt.getDAGId().toString() - + ", dagState=" + finishEvt.getDAGState()); + LOG.info("DAG completed, dagId=" + finishEvt.getDAGId() + ", dagState=" + + finishEvt.getDAGState()); lastDAGCompletionTime = clock.getTime(); _updateLoggers(currentDAG, "_post"); if (this.historyEventHandler.hasRecoveryFailed()) { @@ -1033,9 +1025,8 @@ DAGImpl createDAG(DAGPlan dagPB, TezDAGID dagId) { try { if (LOG.isDebugEnabled()) { - LOG.debug("JSON dump for submitted DAG, dagId=" + dagId.toString() - + ", json=" - + DAGUtils.generateSimpleJSONPlan(dagPB).toString()); + LOG.debug("JSON dump for submitted DAG, dagId=" + dagId + ", json=" + + DAGUtils.generateSimpleJSONPlan(dagPB)); } } catch (JSONException e) { LOG.warn("Failed to generate json for DAG", e); @@ -1043,7 +1034,7 @@ DAGImpl createDAG(DAGPlan dagPB, TezDAGID dagId) { writeDebugArtifacts(dagPB, newDag); return newDag; - } // end createDag() + } private void writeDebugArtifacts(DAGPlan dagPB, DAGImpl newDag) { boolean debugArtifacts = @@ -1057,7 +1048,7 @@ private void writeDebugArtifacts(DAGPlan dagPB, DAGImpl newDag) { private void writePBTextFile(DAG dag) { String logFile = logDirs[new Random().nextInt(logDirs.length)] + File.separatorChar - + dag.getID().toString() + "-" + TezConstants.TEZ_PB_PLAN_TEXT_NAME; + + dag.getID() + "-" + TezConstants.TEZ_PB_PLAN_TEXT_NAME; LOG.info("Writing DAG plan to: " + logFile); File outFile = new File(logFile); @@ -1066,7 +1057,7 @@ private void writePBTextFile(DAG dag) { printWriter.println(TezUtilsInternal.convertDagPlanToString(dag.getJobPlan())); printWriter.close(); } catch (IOException e) { - LOG.warn("Failed to write TEZ_PLAN to " + outFile.toString(), e); + LOG.warn("Failed to write TEZ_PLAN to " + outFile, e); } } @@ -2269,15 +2260,6 @@ public void handle(VertexEvent event) { } } - private static void validateInputParam(String value, String param) - throws IOException { - if (value == null) { - String msg = param + " is null"; - LOG.error(msg); - throw new IOException(msg); - } - } - private long checkAndHandleDAGClientTimeout() throws TezException { if (EnumSet.of(DAGAppMasterState.NEW, DAGAppMasterState.RECOVERING).contains(this.state) || sessionStopped.get()) { @@ -2346,8 +2328,8 @@ public static void main(String[] args) { clientVersion = VersionInfo.UNKNOWN; } - validateInputParam(appSubmitTimeStr, - ApplicationConstants.APP_SUBMIT_TIME_ENV); + Objects.requireNonNull(appSubmitTimeStr, + ApplicationConstants.APP_SUBMIT_TIME_ENV + " is null"); ContainerId containerId = ConverterUtils.toContainerId(containerIdStr); ApplicationAttemptId applicationAttemptId =