Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -145,7 +145,7 @@ public DAGClient submitDag(DAG dag, SubmitDAGRequestProto request, String client
+ ", applicationId=" + sessionAppId
+ ", dagId=" + dagId
+ ", dagName=" + dag.getName());
return new DAGClientImpl(sessionAppId, dagId, tezConf, this, ugi);
return getDAGClient(sessionAppId, dagId, tezConf, ugi);
}

protected DAGClientAMProtocolBlockingPB waitForProxy(long clientTimeout, Configuration conf,
Expand Down Expand Up @@ -186,4 +186,9 @@ protected DAGClientAMProtocolBlockingPB getProxy(Configuration conf, Application
UserGroupInformation ugi) throws TezException, IOException {
return TezClientUtils.getAMProxy(this, conf, sessionAppId, ugi);
}

public DAGClient getDAGClient(ApplicationId appId, String dagId, TezConfiguration tezConf,
UserGroupInformation ugi) {
return new DAGClientImpl(appId, dagId, tezConf, this, ugi);
}
}
11 changes: 1 addition & 10 deletions tez-api/src/main/java/org/apache/tez/client/TezClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,6 @@
import org.apache.tez.dag.api.client.rpc.DAGClientAMProtocolBlockingPB;
import org.apache.tez.dag.api.client.rpc.DAGClientAMProtocolRPC.GetAMStatusRequestProto;
import org.apache.tez.dag.api.client.rpc.DAGClientAMProtocolRPC.SubmitDAGRequestProto;
import org.apache.tez.dag.api.client.DAGClientImpl;
import org.apache.tez.dag.api.records.DAGProtos.DAGPlan;

import com.google.common.annotations.VisibleForTesting;
Expand Down Expand Up @@ -1117,15 +1116,7 @@ private synchronized Map<String, LocalResource> getTezJarResources(Credentials c
@Private
static DAGClient getDAGClient(ApplicationId appId, TezConfiguration tezConf,
FrameworkClient frameworkClient, UserGroupInformation ugi) throws IOException, TezException {
return new DAGClientImpl(appId, getDefaultTezDAGID(appId), tezConf, frameworkClient, ugi);
}

@Private // Used only for MapReduce compatibility code
static DAGClient getDAGClient(ApplicationId appId, TezConfiguration tezConf,
FrameworkClient frameworkClient) throws IOException, TezException {
UserGroupInformation ugi =
UserGroupInformation.createRemoteUser(UserGroupInformation.getCurrentUser().getUserName());
return getDAGClient(appId, tezConf, frameworkClient, ugi);
return frameworkClient.getDAGClient(appId, getDefaultTezDAGID(appId), tezConf, ugi);
}

// DO NOT CHANGE THIS. This code is replicated from TezDAGID.java
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -296,9 +296,13 @@ public DAGStatus getDAGStatus(@Nullable Set<StatusGetOpts> statusOptions) throws
}

@Override
public VertexStatus getVertexStatus(String vertexName, Set<StatusGetOpts> statusOptions) throws
IOException, TezException {
public VertexStatus getVertexStatus(String vertexName, Set<StatusGetOpts> statusOptions)
throws IOException, TezException {
return getVertexStatusInternal(statusOptions, vertexName);
}

protected VertexStatus getVertexStatusInternal(Set<StatusGetOpts> statusOptions, String vertexName)
throws IOException, TezException {
if (!dagCompleted) {
VertexStatus vertexStatus = getVertexStatusViaAM(vertexName, statusOptions);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,12 +36,15 @@
public class DAGClientImplLocal extends DAGClientImpl {

private BiFunction<Set<StatusGetOpts>, Long, DAGStatus> dagStatusFunction;
private BiFunction<Set<StatusGetOpts>, String, VertexStatus> vertexStatusFunction;

public DAGClientImplLocal(ApplicationId appId, String dagId, TezConfiguration conf,
FrameworkClient frameworkClient, UserGroupInformation ugi,
BiFunction<Set<StatusGetOpts>, Long, DAGStatus> dagStatusFunction) {
BiFunction<Set<StatusGetOpts>, Long, DAGStatus> dagStatusFunction,
BiFunction<Set<StatusGetOpts>, String, VertexStatus> vertexStatusFunction) {
super(appId, dagId, conf, frameworkClient, ugi);
this.dagStatusFunction = dagStatusFunction;
this.vertexStatusFunction = vertexStatusFunction;
}

@Override
Expand All @@ -50,4 +53,10 @@ protected DAGStatus getDAGStatusInternal(@Nullable Set<StatusGetOpts> statusOpti
return dagStatusFunction.apply(statusOptions == null ? new HashSet<>() : statusOptions,
timeout);
}

@Override
protected VertexStatus getVertexStatusInternal(@Nullable Set<StatusGetOpts> statusOptions, String vertexName)
throws TezException, IOException {
return vertexStatusFunction.apply(statusOptions == null ? new HashSet<>() : statusOptions, vertexName);
}
}
37 changes: 25 additions & 12 deletions tez-dag/src/main/java/org/apache/tez/client/LocalClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@
import org.apache.tez.dag.api.client.DAGClientImplLocal;
import org.apache.tez.dag.api.client.DAGStatus;
import org.apache.tez.dag.api.client.StatusGetOpts;
import org.apache.tez.dag.api.client.VertexStatus;
import org.apache.tez.dag.api.client.rpc.DAGClientAMProtocolRPC.SubmitDAGRequestProto;
import org.apache.tez.dag.api.records.DAGProtos.AMPluginDescriptorProto;
import org.apache.tez.dag.app.AppContext;
Expand Down Expand Up @@ -426,20 +427,32 @@ public DAGClient submitDag(org.apache.tez.dag.api.DAG dag, SubmitDAGRequestProto
}

String dagId = dagAppMaster.submitDAGToAppMaster(request.getDAGPlan(), additionalResources);
return getDAGClient(sessionAppId, dagId, tezConf, ugi);
}

@Override
public DAGClient getDAGClient(ApplicationId appId, String dagId, TezConfiguration tezConf,
UserGroupInformation ugi) {
return isLocalWithoutNetwork
? new DAGClientImplLocal(sessionAppId, dagId, tezConf, this,
ugi, new BiFunction<Set<StatusGetOpts>, Long, DAGStatus>() {
@Override
public DAGStatus apply(Set<StatusGetOpts> statusOpts, Long timeout) {
try {
return clientHandler.getDAGStatus(dagId, statusOpts, timeout);
} catch (TezException e) {
throw new RuntimeException(e);
}
}
})
: new DAGClientImpl(sessionAppId, dagId, tezConf, this, ugi);
? new DAGClientImplLocal(appId, dagId, tezConf, this, ugi, new BiFunction<Set<StatusGetOpts>, Long, DAGStatus>() {
@Override
public DAGStatus apply(Set<StatusGetOpts> statusOpts, Long timeout) {
try {
return clientHandler.getDAGStatus(dagId, statusOpts, timeout);
} catch (TezException e) {
throw new RuntimeException(e);
}
}
}, new BiFunction<Set<StatusGetOpts>, String, VertexStatus>() {
@Override
public VertexStatus apply(Set<StatusGetOpts> statusOpts, String vertexName) {
try {
return clientHandler.getVertexStatus(dagId, vertexName, statusOpts);
} catch (TezException e) {
throw new RuntimeException(e);
}
}
}) : new DAGClientImpl(appId, dagId, tezConf, this, ugi);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,6 @@
import org.apache.tez.dag.api.TezConfiguration;
import org.apache.tez.dag.api.TezException;
import org.apache.tez.dag.api.client.DAGClient;
import org.apache.tez.dag.api.client.MRDAGClient;

@InterfaceAudience.Private
public class MRTezClient extends TezClient {
Expand All @@ -46,9 +45,4 @@ public DAGClient submitDAGApplication(ApplicationId appId, org.apache.tez.dag.ap
throws TezException, IOException {
return super.submitDAGApplication(appId, dag);
}

public static MRDAGClient getDAGClient(ApplicationId appId, TezConfiguration tezConf, FrameworkClient frameworkClient)
throws IOException, TezException {
return new MRDAGClient(TezClient.getDAGClient(appId, tezConf, frameworkClient));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -639,7 +639,7 @@ public JobStatus submitJob(JobID jobId, String jobSubmitDir, Credentials ts)

tezClient = new MRTezClient("MapReduce", dagAMConf, false, jobLocalResources, ts);
tezClient.start();
tezClient.submitDAGApplication(appId, dag);
dagClient = new MRDAGClient(tezClient.submitDAGApplication(appId, dag));
tezClient.stop();
} catch (TezException e) {
throw new IOException(e);
Expand Down Expand Up @@ -702,9 +702,6 @@ public JobStatus getJobStatus(JobID jobID) throws IOException,
String jobFile = MRApps.getJobFile(conf, user, jobID);
DAGStatus dagStatus;
try {
if(dagClient == null) {
dagClient = MRTezClient.getDAGClient(TypeConverter.toYarn(jobID).getAppId(), tezConf, null);
}
dagStatus = dagClient.getDAGStatus(null);
return new DAGJobStatus(dagClient.getApplicationReport(), dagStatus, jobFile);
} catch (TezException e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@
public class SleepProcessor extends AbstractLogicalIOProcessor {

private static final Logger LOG = LoggerFactory.getLogger(SleepProcessor.class);
public static final String SLEEP_VERTEX_NAME = "Sleep";

private int timeToSleepMS;
protected Map<String, LogicalInput> inputs;
Expand Down
23 changes: 16 additions & 7 deletions tez-tests/src/test/java/org/apache/tez/test/TestLocalMode.java
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@
import org.apache.tez.dag.api.Vertex;
import org.apache.tez.dag.api.client.DAGClient;
import org.apache.tez.dag.api.client.DAGStatus;
import org.apache.tez.dag.api.client.VertexStatus;
import org.apache.tez.examples.OrderedWordCount;
import org.apache.tez.runtime.api.AbstractLogicalIOProcessor;
import org.apache.tez.runtime.api.Event;
Expand Down Expand Up @@ -131,6 +132,8 @@ public void testMultipleClientsWithSession() throws TezException, InterruptedExc
DAGClient dagClient1 = tezClient1.submitDAG(dag1);
dagClient1.waitForCompletion();
assertEquals(DAGStatus.State.SUCCEEDED, dagClient1.getDAGStatus(null).getState());
assertEquals(VertexStatus.State.SUCCEEDED,
dagClient1.getVertexStatus(SleepProcessor.SLEEP_VERTEX_NAME, null).getState());

dagClient1.close();
tezClient1.stop();
Expand All @@ -142,6 +145,8 @@ public void testMultipleClientsWithSession() throws TezException, InterruptedExc
DAGClient dagClient2 = tezClient2.submitDAG(dag2);
dagClient2.waitForCompletion();
assertEquals(DAGStatus.State.SUCCEEDED, dagClient2.getDAGStatus(null).getState());
assertEquals(VertexStatus.State.SUCCEEDED,
dagClient2.getVertexStatus(SleepProcessor.SLEEP_VERTEX_NAME, null).getState());
assertFalse(dagClient1.getExecutionContext().equals(dagClient2.getExecutionContext()));
dagClient2.close();
tezClient2.stop();
Expand All @@ -159,7 +164,8 @@ public void testMultipleClientsWithoutSession() throws TezException, Interrupted
DAGClient dagClient1 = tezClient1.submitDAG(dag1);
dagClient1.waitForCompletion();
assertEquals(DAGStatus.State.SUCCEEDED, dagClient1.getDAGStatus(null).getState());

assertEquals(VertexStatus.State.SUCCEEDED,
dagClient1.getVertexStatus(SleepProcessor.SLEEP_VERTEX_NAME, null).getState());
dagClient1.close();
tezClient1.stop();

Expand All @@ -171,6 +177,8 @@ public void testMultipleClientsWithoutSession() throws TezException, Interrupted
DAGClient dagClient2 = tezClient2.submitDAG(dag2);
dagClient2.waitForCompletion();
assertEquals(DAGStatus.State.SUCCEEDED, dagClient2.getDAGStatus(null).getState());
assertEquals(VertexStatus.State.SUCCEEDED,
dagClient2.getVertexStatus(SleepProcessor.SLEEP_VERTEX_NAME, null).getState());
assertFalse(dagClient1.getExecutionContext().equals(dagClient2.getExecutionContext()));
dagClient2.close();
tezClient2.stop();
Expand All @@ -189,7 +197,8 @@ public void testNoSysExitOnSuccessfulDAG() throws TezException, InterruptedExcep
DAGClient dagClient1 = tezClient1.submitDAG(dag1);
dagClient1.waitForCompletion();
assertEquals(DAGStatus.State.SUCCEEDED, dagClient1.getDAGStatus(null).getState());

assertEquals(VertexStatus.State.SUCCEEDED,
dagClient1.getVertexStatus(SleepProcessor.SLEEP_VERTEX_NAME, null).getState());
// Sleep for more time than is required for the DAG to complete.
Thread.sleep((long) (TezConstants.TEZ_DAG_SLEEP_TIME_BEFORE_EXIT * 1.5));

Expand All @@ -210,7 +219,8 @@ public void testNoSysExitOnFailinglDAG() throws TezException, InterruptedExcepti
DAGClient dagClient1 = tezClient1.submitDAG(dag1);
dagClient1.waitForCompletion();
assertEquals(DAGStatus.State.FAILED, dagClient1.getDAGStatus(null).getState());

assertEquals(VertexStatus.State.FAILED,
dagClient1.getVertexStatus(SleepProcessor.SLEEP_VERTEX_NAME, null).getState());
// Sleep for more time than is required for the DAG to complete.
Thread.sleep((long) (TezConstants.TEZ_DAG_SLEEP_TIME_BEFORE_EXIT * 1.5));

Expand Down Expand Up @@ -245,12 +255,11 @@ public void run(Map<String, LogicalInput> inputs, Map<String, LogicalOutput> out
}

private DAG createSimpleDAG(String dagName, String processorName) {
DAG dag = DAG.create(dagName).addVertex(Vertex.create("Sleep", ProcessorDescriptor.create(
processorName).setUserPayload(
new SleepProcessor.SleepProcessorConfig(1).toUserPayload()), 1));
DAG dag = DAG.create(dagName).addVertex(Vertex.create(SleepProcessor.SLEEP_VERTEX_NAME, ProcessorDescriptor
.create(processorName).setUserPayload(new SleepProcessor.SleepProcessorConfig(1).toUserPayload()), 1));
return dag;

}

@Test(timeout=30000)
public void testMultiDAGsOnSession() throws IOException, TezException, InterruptedException {
int dags = 2;//two dags will be submitted to session
Expand Down