Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.tez.dag.records;

import org.apache.hadoop.yarn.api.records.ApplicationId;

public interface DAGIDAware {
TezDAGID getDAGID();

default ApplicationId getApplicationId() {
return getDAGID().getApplicationId();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.tez.dag.records;

public interface TaskAttemptIDAware extends TaskIDAware {
TezTaskAttemptID getTaskAttemptID();

@Override
default TezTaskID getTaskID() {
return getTaskAttemptID().getTaskID();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.tez.dag.records;

public interface TaskIDAware extends VertexIDAware {
TezTaskID getTaskID();

@Override
default TezVertexID getVertexID() {
return getTaskID().getVertexID();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@
*/
@InterfaceAudience.Public
@InterfaceStability.Stable
public class TezTaskAttemptID extends TezID {
public class TezTaskAttemptID extends TezID implements TaskIDAware {
public static final String ATTEMPT = "attempt";
private TezTaskID taskId;

Expand Down Expand Up @@ -73,6 +73,7 @@ private TezTaskAttemptID(TezTaskID taskId, int id) {
}

/** Returns the {@link TezTaskID} object that this task attempt belongs to */
@Override
public TezTaskID getTaskID() {
return taskId;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@
*/
@InterfaceAudience.Public
@InterfaceStability.Stable
public class TezTaskID extends TezID {
public class TezTaskID extends TezID implements VertexIDAware {
public static final String TASK = "task";
private final int serializingHash;

Expand Down Expand Up @@ -79,6 +79,7 @@ public int getSerializingHash() {
}

/** Returns the {@link TezVertexID} object that this task belongs to */
@Override
public TezVertexID getVertexID() {
return vertexId;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@
*/
@InterfaceAudience.Public
@InterfaceStability.Stable
public class TezVertexID extends TezID {
public class TezVertexID extends TezID implements DAGIDAware {
public static final String VERTEX = "vertex";
static final ThreadLocal<FastNumberFormat> tezVertexIdFormat = new ThreadLocal<FastNumberFormat>() {

Expand Down Expand Up @@ -79,7 +79,8 @@ private TezVertexID(TezDAGID dagId, int id) {
}

/** Returns the {@link TezDAGID} object that this tip belongs to */
public TezDAGID getDAGId() {
@Override
public TezDAGID getDAGID() {
return dagId;
}

Expand Down Expand Up @@ -158,5 +159,4 @@ public static TezVertexID fromString(String vertexIdStr) {
}
return null;
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.tez.dag.records;

public interface VertexIDAware extends DAGIDAware {
TezVertexID getVertexID();

@Override
default TezDAGID getDAGID() {
return getVertexID().getDAGID();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ public class VertexIdentifierImpl implements VertexIdentifier {
public VertexIdentifierImpl(String dagName, String vertexName, TezVertexID vertexId) {
this.vertexId = vertexId;
this.vertexName = vertexName;
this.dagIdentifier = new DagIdentifierImpl(dagName, vertexId.getDAGId());
this.dagIdentifier = new DagIdentifierImpl(dagName, vertexId.getDAGID());
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ private void verifyDagInfo(String[] splits, TezDAGID dagId) {
}

private void verifyVertexInfo(String[] splits, TezVertexID vId) {
verifyDagInfo(splits, vId.getDAGId());
verifyDagInfo(splits, vId.getDAGID());
Assert.assertEquals(vId.getId(),
Integer.valueOf(splits[4]).intValue());
}
Expand Down
52 changes: 26 additions & 26 deletions tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java
Original file line number Diff line number Diff line change
Expand Up @@ -304,7 +304,7 @@ public class DAGAppMaster extends AbstractService {
private Path currentRecoveryDataDir;
private Path tezSystemStagingDir;
private FileSystem recoveryFS;

private ExecutorService rawExecutor;
private ListeningExecutorService execService;

Expand Down Expand Up @@ -335,7 +335,7 @@ public class DAGAppMaster extends AbstractService {
private String clientVersion;
private boolean versionMismatch = false;
private String versionMismatchDiagnostics;

private ResourceCalculatorProcessTree cpuPlugin;
private GcTimeUpdater gcPlugin;

Expand Down Expand Up @@ -387,7 +387,7 @@ private static String getRunningLogURL(String nodeHttpAddress,
return String.format("%s/node/containerlogs/%s/%s", nodeHttpAddress,
containerId, user);
}

private void initResourceCalculatorPlugins() {
Class<? extends ResourceCalculatorProcessTree> clazz = amConf.getClass(
TezConfiguration.TEZ_TASK_RESOURCE_CALCULATOR_PROCESS_TREE_CLASS,
Expand All @@ -402,10 +402,10 @@ private void initResourceCalculatorPlugins() {
pid = processName.split("@")[0];
}
cpuPlugin = ResourceCalculatorProcessTree.getResourceCalculatorProcessTree(pid, clazz, amConf);

gcPlugin = new GcTimeUpdater(null);
}

private long getAMCPUTime() {
if (cpuPlugin != null) {
cpuPlugin.updateProcessTree();
Expand Down Expand Up @@ -559,14 +559,14 @@ protected void serviceInit(final Configuration conf) throws Exception {
dispatcher.register(TaskEventType.class, new TaskEventDispatcher());
dispatcher.register(TaskAttemptEventType.class, new TaskAttemptEventDispatcher());
} else {
int concurrency = conf.getInt(TezConfiguration.TEZ_AM_CONCURRENT_DISPATCHER_CONCURRENCY,
int concurrency = conf.getInt(TezConfiguration.TEZ_AM_CONCURRENT_DISPATCHER_CONCURRENCY,
TezConfiguration.TEZ_AM_CONCURRENT_DISPATCHER_CONCURRENCY_DEFAULT);
AsyncDispatcherConcurrent sharedDispatcher = dispatcher.registerAndCreateDispatcher(
TaskEventType.class, new TaskEventDispatcher(), "TaskAndAttemptEventThread", concurrency);
dispatcher.registerWithExistingDispatcher(TaskAttemptEventType.class,
new TaskAttemptEventDispatcher(), sharedDispatcher);
}

// register other delegating dispatchers
dispatcher.registerAndCreateDispatcher(SpeculatorEventType.class, new SpeculatorEventHandler(),
"Speculator");
Expand Down Expand Up @@ -658,7 +658,7 @@ protected TaskSchedulerManager createTaskSchedulerManager(
protected ContainerSignatureMatcher createContainerSignatureMatcher() {
return new ContainerContextMatcher();
}

@VisibleForTesting
protected AsyncDispatcher createDispatcher() {
return new AsyncDispatcher("Central");
Expand All @@ -677,7 +677,7 @@ protected void sysexit() {
System.exit(0);
}
}

@VisibleForTesting
protected TaskSchedulerManager getTaskSchedulerManager() {
return taskSchedulerManager;
Expand Down Expand Up @@ -1401,7 +1401,7 @@ public void tryKillDAG(DAG dag, String message) throws TezException {
}
dispatcher.getEventHandler().handle(new DAGEventTerminateDag(dag.getID(), DAGTerminationCause.DAG_KILL, message));
}

private Map<String, LocalResource> getAdditionalLocalResourceDiff(
DAG dag, Map<String, LocalResource> additionalResources) throws TezException {
if (additionalResources == null) {
Expand Down Expand Up @@ -1553,7 +1553,7 @@ public long getStartTime() {
public DAG getCurrentDAG() {
return dag;
}

@Override
public ListeningExecutorService getExecService() {
return execService;
Expand Down Expand Up @@ -1754,7 +1754,7 @@ public void setDAG(DAG dag) {
public long getCumulativeCPUTime() {
return getAMCPUTime();
}

@Override
public long getCumulativeGCTime() {
return getAMGCTime();
Expand Down Expand Up @@ -1984,7 +1984,7 @@ private DAGRecoveryData recoverDAG() throws IOException, TezException {
}
return null;
}

@Override
public void serviceStart() throws Exception {
//start all the components
Expand Down Expand Up @@ -2222,7 +2222,7 @@ private class DagEventDispatcher implements EventHandler<DAGEvent> {
@Override
public void handle(DAGEvent event) {
DAG dag = context.getCurrentDAG();
int eventDagIndex = event.getDAGId().getId();
int eventDagIndex = event.getDAGID().getId();
if (dag == null || eventDagIndex != dag.getID().getId()) {
return; // event not relevant any more
}
Expand All @@ -2235,18 +2235,18 @@ private class TaskEventDispatcher implements EventHandler<TaskEvent> {
@Override
public void handle(TaskEvent event) {
DAG dag = context.getCurrentDAG();
int eventDagIndex =
event.getTaskID().getVertexID().getDAGId().getId();
int eventDagIndex =
event.getDAGID().getId();
if (dag == null || eventDagIndex != dag.getID().getId()) {
return; // event not relevant any more
}
Task task =
dag.getVertex(event.getTaskID().getVertexID()).
dag.getVertex(event.getVertexID()).
getTask(event.getTaskID());
((EventHandler<TaskEvent>)task).handle(event);
}
}

private class SpeculatorEventHandler implements EventHandler<SpeculatorEvent> {
@Override
public void handle(SpeculatorEvent event) {
Expand All @@ -2265,14 +2265,14 @@ private class TaskAttemptEventDispatcher
@Override
public void handle(TaskAttemptEvent event) {
DAG dag = context.getCurrentDAG();
int eventDagIndex =
event.getTaskAttemptID().getTaskID().getVertexID().getDAGId().getId();
int eventDagIndex =
event.getDAGID().getId();
if (dag == null || eventDagIndex != dag.getID().getId()) {
return; // event not relevant any more
}
Task task =
dag.getVertex(event.getTaskAttemptID().getTaskID().getVertexID()).
getTask(event.getTaskAttemptID().getTaskID());
dag.getVertex(event.getVertexID()).
getTask(event.getTaskID());
TaskAttempt attempt = task.getAttempt(event.getTaskAttemptID());
((EventHandler<TaskAttemptEvent>) attempt).handle(event);
}
Expand All @@ -2284,14 +2284,14 @@ private class VertexEventDispatcher
@Override
public void handle(VertexEvent event) {
DAG dag = context.getCurrentDAG();
int eventDagIndex =
event.getVertexId().getDAGId().getId();
int eventDagIndex =
event.getDAGID().getId();
if (dag == null || eventDagIndex != dag.getID().getId()) {
return; // event not relevant any more
}

Vertex vertex =
dag.getVertex(event.getVertexId());
dag.getVertex(event.getVertexID());
((EventHandler<VertexEvent>) vertex).handle(event);
}
}
Expand Down
Loading