diff --git a/tez-api/src/main/java/org/apache/tez/client/FrameworkClient.java b/tez-api/src/main/java/org/apache/tez/client/FrameworkClient.java index 2ec6d2864d..26c11fd084 100644 --- a/tez-api/src/main/java/org/apache/tez/client/FrameworkClient.java +++ b/tez-api/src/main/java/org/apache/tez/client/FrameworkClient.java @@ -145,7 +145,7 @@ public DAGClient submitDag(DAG dag, SubmitDAGRequestProto request, String client + ", applicationId=" + sessionAppId + ", dagId=" + dagId + ", dagName=" + dag.getName()); - return new DAGClientImpl(sessionAppId, dagId, tezConf, this, ugi); + return getDAGClient(sessionAppId, dagId, tezConf, ugi); } protected DAGClientAMProtocolBlockingPB waitForProxy(long clientTimeout, Configuration conf, @@ -186,4 +186,9 @@ protected DAGClientAMProtocolBlockingPB getProxy(Configuration conf, Application UserGroupInformation ugi) throws TezException, IOException { return TezClientUtils.getAMProxy(this, conf, sessionAppId, ugi); } + + public DAGClient getDAGClient(ApplicationId appId, String dagId, TezConfiguration tezConf, + UserGroupInformation ugi) { + return new DAGClientImpl(appId, dagId, tezConf, this, ugi); + } } diff --git a/tez-api/src/main/java/org/apache/tez/client/TezClient.java b/tez-api/src/main/java/org/apache/tez/client/TezClient.java index c37f0c181d..93807fdf46 100644 --- a/tez-api/src/main/java/org/apache/tez/client/TezClient.java +++ b/tez-api/src/main/java/org/apache/tez/client/TezClient.java @@ -76,7 +76,6 @@ import org.apache.tez.dag.api.client.rpc.DAGClientAMProtocolBlockingPB; import org.apache.tez.dag.api.client.rpc.DAGClientAMProtocolRPC.GetAMStatusRequestProto; import org.apache.tez.dag.api.client.rpc.DAGClientAMProtocolRPC.SubmitDAGRequestProto; -import org.apache.tez.dag.api.client.DAGClientImpl; import org.apache.tez.dag.api.records.DAGProtos.DAGPlan; import com.google.common.annotations.VisibleForTesting; @@ -1117,15 +1116,7 @@ private synchronized Map getTezJarResources(Credentials c @Private static DAGClient getDAGClient(ApplicationId appId, TezConfiguration tezConf, FrameworkClient frameworkClient, UserGroupInformation ugi) throws IOException, TezException { - return new DAGClientImpl(appId, getDefaultTezDAGID(appId), tezConf, frameworkClient, ugi); - } - - @Private // Used only for MapReduce compatibility code - static DAGClient getDAGClient(ApplicationId appId, TezConfiguration tezConf, - FrameworkClient frameworkClient) throws IOException, TezException { - UserGroupInformation ugi = - UserGroupInformation.createRemoteUser(UserGroupInformation.getCurrentUser().getUserName()); - return getDAGClient(appId, tezConf, frameworkClient, ugi); + return frameworkClient.getDAGClient(appId, getDefaultTezDAGID(appId), tezConf, ugi); } // DO NOT CHANGE THIS. This code is replicated from TezDAGID.java diff --git a/tez-api/src/main/java/org/apache/tez/dag/api/client/DAGClientImpl.java b/tez-api/src/main/java/org/apache/tez/dag/api/client/DAGClientImpl.java index bfea96b998..95dd85f388 100644 --- a/tez-api/src/main/java/org/apache/tez/dag/api/client/DAGClientImpl.java +++ b/tez-api/src/main/java/org/apache/tez/dag/api/client/DAGClientImpl.java @@ -296,9 +296,13 @@ public DAGStatus getDAGStatus(@Nullable Set statusOptions) throws } @Override - public VertexStatus getVertexStatus(String vertexName, Set statusOptions) throws - IOException, TezException { + public VertexStatus getVertexStatus(String vertexName, Set statusOptions) + throws IOException, TezException { + return getVertexStatusInternal(statusOptions, vertexName); + } + protected VertexStatus getVertexStatusInternal(Set statusOptions, String vertexName) + throws IOException, TezException { if (!dagCompleted) { VertexStatus vertexStatus = getVertexStatusViaAM(vertexName, statusOptions); diff --git a/tez-api/src/main/java/org/apache/tez/dag/api/client/DAGClientImplLocal.java b/tez-api/src/main/java/org/apache/tez/dag/api/client/DAGClientImplLocal.java index a0509cdc5f..851bb687a1 100644 --- a/tez-api/src/main/java/org/apache/tez/dag/api/client/DAGClientImplLocal.java +++ b/tez-api/src/main/java/org/apache/tez/dag/api/client/DAGClientImplLocal.java @@ -36,12 +36,15 @@ public class DAGClientImplLocal extends DAGClientImpl { private BiFunction, Long, DAGStatus> dagStatusFunction; + private BiFunction, String, VertexStatus> vertexStatusFunction; public DAGClientImplLocal(ApplicationId appId, String dagId, TezConfiguration conf, FrameworkClient frameworkClient, UserGroupInformation ugi, - BiFunction, Long, DAGStatus> dagStatusFunction) { + BiFunction, Long, DAGStatus> dagStatusFunction, + BiFunction, String, VertexStatus> vertexStatusFunction) { super(appId, dagId, conf, frameworkClient, ugi); this.dagStatusFunction = dagStatusFunction; + this.vertexStatusFunction = vertexStatusFunction; } @Override @@ -50,4 +53,10 @@ protected DAGStatus getDAGStatusInternal(@Nullable Set statusOpti return dagStatusFunction.apply(statusOptions == null ? new HashSet<>() : statusOptions, timeout); } + + @Override + protected VertexStatus getVertexStatusInternal(@Nullable Set statusOptions, String vertexName) + throws TezException, IOException { + return vertexStatusFunction.apply(statusOptions == null ? new HashSet<>() : statusOptions, vertexName); + } } diff --git a/tez-dag/src/main/java/org/apache/tez/client/LocalClient.java b/tez-dag/src/main/java/org/apache/tez/client/LocalClient.java index d0580bbae9..c9b3d7314b 100644 --- a/tez-dag/src/main/java/org/apache/tez/client/LocalClient.java +++ b/tez-dag/src/main/java/org/apache/tez/client/LocalClient.java @@ -60,6 +60,7 @@ import org.apache.tez.dag.api.client.DAGClientImplLocal; import org.apache.tez.dag.api.client.DAGStatus; import org.apache.tez.dag.api.client.StatusGetOpts; +import org.apache.tez.dag.api.client.VertexStatus; import org.apache.tez.dag.api.client.rpc.DAGClientAMProtocolRPC.SubmitDAGRequestProto; import org.apache.tez.dag.api.records.DAGProtos.AMPluginDescriptorProto; import org.apache.tez.dag.app.AppContext; @@ -426,20 +427,32 @@ public DAGClient submitDag(org.apache.tez.dag.api.DAG dag, SubmitDAGRequestProto } String dagId = dagAppMaster.submitDAGToAppMaster(request.getDAGPlan(), additionalResources); + return getDAGClient(sessionAppId, dagId, tezConf, ugi); + } + @Override + public DAGClient getDAGClient(ApplicationId appId, String dagId, TezConfiguration tezConf, + UserGroupInformation ugi) { return isLocalWithoutNetwork - ? new DAGClientImplLocal(sessionAppId, dagId, tezConf, this, - ugi, new BiFunction, Long, DAGStatus>() { - @Override - public DAGStatus apply(Set statusOpts, Long timeout) { - try { - return clientHandler.getDAGStatus(dagId, statusOpts, timeout); - } catch (TezException e) { - throw new RuntimeException(e); - } - } - }) - : new DAGClientImpl(sessionAppId, dagId, tezConf, this, ugi); + ? new DAGClientImplLocal(appId, dagId, tezConf, this, ugi, new BiFunction, Long, DAGStatus>() { + @Override + public DAGStatus apply(Set statusOpts, Long timeout) { + try { + return clientHandler.getDAGStatus(dagId, statusOpts, timeout); + } catch (TezException e) { + throw new RuntimeException(e); + } + } + }, new BiFunction, String, VertexStatus>() { + @Override + public VertexStatus apply(Set statusOpts, String vertexName) { + try { + return clientHandler.getVertexStatus(dagId, vertexName, statusOpts); + } catch (TezException e) { + throw new RuntimeException(e); + } + } + }) : new DAGClientImpl(appId, dagId, tezConf, this, ugi); } @Override diff --git a/tez-mapreduce/src/main/java/org/apache/tez/client/MRTezClient.java b/tez-mapreduce/src/main/java/org/apache/tez/client/MRTezClient.java index 86089e9a67..1057932e1d 100644 --- a/tez-mapreduce/src/main/java/org/apache/tez/client/MRTezClient.java +++ b/tez-mapreduce/src/main/java/org/apache/tez/client/MRTezClient.java @@ -31,7 +31,6 @@ import org.apache.tez.dag.api.TezConfiguration; import org.apache.tez.dag.api.TezException; import org.apache.tez.dag.api.client.DAGClient; -import org.apache.tez.dag.api.client.MRDAGClient; @InterfaceAudience.Private public class MRTezClient extends TezClient { @@ -46,9 +45,4 @@ public DAGClient submitDAGApplication(ApplicationId appId, org.apache.tez.dag.ap throws TezException, IOException { return super.submitDAGApplication(appId, dag); } - - public static MRDAGClient getDAGClient(ApplicationId appId, TezConfiguration tezConf, FrameworkClient frameworkClient) - throws IOException, TezException { - return new MRDAGClient(TezClient.getDAGClient(appId, tezConf, frameworkClient)); - } } diff --git a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/client/YARNRunner.java b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/client/YARNRunner.java index 9dba357951..7aed4a04a8 100644 --- a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/client/YARNRunner.java +++ b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/client/YARNRunner.java @@ -639,7 +639,7 @@ public JobStatus submitJob(JobID jobId, String jobSubmitDir, Credentials ts) tezClient = new MRTezClient("MapReduce", dagAMConf, false, jobLocalResources, ts); tezClient.start(); - tezClient.submitDAGApplication(appId, dag); + dagClient = new MRDAGClient(tezClient.submitDAGApplication(appId, dag)); tezClient.stop(); } catch (TezException e) { throw new IOException(e); @@ -702,9 +702,6 @@ public JobStatus getJobStatus(JobID jobID) throws IOException, String jobFile = MRApps.getJobFile(conf, user, jobID); DAGStatus dagStatus; try { - if(dagClient == null) { - dagClient = MRTezClient.getDAGClient(TypeConverter.toYarn(jobID).getAppId(), tezConf, null); - } dagStatus = dagClient.getDAGStatus(null); return new DAGJobStatus(dagClient.getApplicationReport(), dagStatus, jobFile); } catch (TezException e) { diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/processor/SleepProcessor.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/processor/SleepProcessor.java index 3efcd21c92..7750a13a63 100644 --- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/processor/SleepProcessor.java +++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/processor/SleepProcessor.java @@ -46,6 +46,7 @@ public class SleepProcessor extends AbstractLogicalIOProcessor { private static final Logger LOG = LoggerFactory.getLogger(SleepProcessor.class); + public static final String SLEEP_VERTEX_NAME = "Sleep"; private int timeToSleepMS; protected Map inputs; diff --git a/tez-tests/src/test/java/org/apache/tez/test/TestLocalMode.java b/tez-tests/src/test/java/org/apache/tez/test/TestLocalMode.java index bdb71ad177..00125fd547 100644 --- a/tez-tests/src/test/java/org/apache/tez/test/TestLocalMode.java +++ b/tez-tests/src/test/java/org/apache/tez/test/TestLocalMode.java @@ -39,6 +39,7 @@ import org.apache.tez.dag.api.Vertex; import org.apache.tez.dag.api.client.DAGClient; import org.apache.tez.dag.api.client.DAGStatus; +import org.apache.tez.dag.api.client.VertexStatus; import org.apache.tez.examples.OrderedWordCount; import org.apache.tez.runtime.api.AbstractLogicalIOProcessor; import org.apache.tez.runtime.api.Event; @@ -131,6 +132,8 @@ public void testMultipleClientsWithSession() throws TezException, InterruptedExc DAGClient dagClient1 = tezClient1.submitDAG(dag1); dagClient1.waitForCompletion(); assertEquals(DAGStatus.State.SUCCEEDED, dagClient1.getDAGStatus(null).getState()); + assertEquals(VertexStatus.State.SUCCEEDED, + dagClient1.getVertexStatus(SleepProcessor.SLEEP_VERTEX_NAME, null).getState()); dagClient1.close(); tezClient1.stop(); @@ -142,6 +145,8 @@ public void testMultipleClientsWithSession() throws TezException, InterruptedExc DAGClient dagClient2 = tezClient2.submitDAG(dag2); dagClient2.waitForCompletion(); assertEquals(DAGStatus.State.SUCCEEDED, dagClient2.getDAGStatus(null).getState()); + assertEquals(VertexStatus.State.SUCCEEDED, + dagClient2.getVertexStatus(SleepProcessor.SLEEP_VERTEX_NAME, null).getState()); assertFalse(dagClient1.getExecutionContext().equals(dagClient2.getExecutionContext())); dagClient2.close(); tezClient2.stop(); @@ -159,7 +164,8 @@ public void testMultipleClientsWithoutSession() throws TezException, Interrupted DAGClient dagClient1 = tezClient1.submitDAG(dag1); dagClient1.waitForCompletion(); assertEquals(DAGStatus.State.SUCCEEDED, dagClient1.getDAGStatus(null).getState()); - + assertEquals(VertexStatus.State.SUCCEEDED, + dagClient1.getVertexStatus(SleepProcessor.SLEEP_VERTEX_NAME, null).getState()); dagClient1.close(); tezClient1.stop(); @@ -171,6 +177,8 @@ public void testMultipleClientsWithoutSession() throws TezException, Interrupted DAGClient dagClient2 = tezClient2.submitDAG(dag2); dagClient2.waitForCompletion(); assertEquals(DAGStatus.State.SUCCEEDED, dagClient2.getDAGStatus(null).getState()); + assertEquals(VertexStatus.State.SUCCEEDED, + dagClient2.getVertexStatus(SleepProcessor.SLEEP_VERTEX_NAME, null).getState()); assertFalse(dagClient1.getExecutionContext().equals(dagClient2.getExecutionContext())); dagClient2.close(); tezClient2.stop(); @@ -189,7 +197,8 @@ public void testNoSysExitOnSuccessfulDAG() throws TezException, InterruptedExcep DAGClient dagClient1 = tezClient1.submitDAG(dag1); dagClient1.waitForCompletion(); assertEquals(DAGStatus.State.SUCCEEDED, dagClient1.getDAGStatus(null).getState()); - + assertEquals(VertexStatus.State.SUCCEEDED, + dagClient1.getVertexStatus(SleepProcessor.SLEEP_VERTEX_NAME, null).getState()); // Sleep for more time than is required for the DAG to complete. Thread.sleep((long) (TezConstants.TEZ_DAG_SLEEP_TIME_BEFORE_EXIT * 1.5)); @@ -210,7 +219,8 @@ public void testNoSysExitOnFailinglDAG() throws TezException, InterruptedExcepti DAGClient dagClient1 = tezClient1.submitDAG(dag1); dagClient1.waitForCompletion(); assertEquals(DAGStatus.State.FAILED, dagClient1.getDAGStatus(null).getState()); - + assertEquals(VertexStatus.State.FAILED, + dagClient1.getVertexStatus(SleepProcessor.SLEEP_VERTEX_NAME, null).getState()); // Sleep for more time than is required for the DAG to complete. Thread.sleep((long) (TezConstants.TEZ_DAG_SLEEP_TIME_BEFORE_EXIT * 1.5)); @@ -245,12 +255,11 @@ public void run(Map inputs, Map out } private DAG createSimpleDAG(String dagName, String processorName) { - DAG dag = DAG.create(dagName).addVertex(Vertex.create("Sleep", ProcessorDescriptor.create( - processorName).setUserPayload( - new SleepProcessor.SleepProcessorConfig(1).toUserPayload()), 1)); + DAG dag = DAG.create(dagName).addVertex(Vertex.create(SleepProcessor.SLEEP_VERTEX_NAME, ProcessorDescriptor + .create(processorName).setUserPayload(new SleepProcessor.SleepProcessorConfig(1).toUserPayload()), 1)); return dag; - } + @Test(timeout=30000) public void testMultiDAGsOnSession() throws IOException, TezException, InterruptedException { int dags = 2;//two dags will be submitted to session