Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 25 additions & 0 deletions tez-dag/findbugs-exclude.xml
Original file line number Diff line number Diff line change
Expand Up @@ -248,4 +248,29 @@
<Bug pattern="EI_EXPOSE_REP2"/>
</Match>

<!-- TEZ-4647 PluginManager EI_EXPOSE_REP warnings -->
<Match>
<Class name="org.apache.tez.dag.app.PluginManager"/>
<Method name="getTaskSchedulers"/>
<Bug pattern="EI_EXPOSE_REP"/>
</Match>

<Match>
<Class name="org.apache.tez.dag.app.PluginManager"/>
<Method name="getContainerLaunchers"/>
<Bug pattern="EI_EXPOSE_REP"/>
</Match>

<Match>
<Class name="org.apache.tez.dag.app.PluginManager"/>
<Method name="getTaskCommunicators"/>
<Bug pattern="EI_EXPOSE_REP"/>
</Match>

<Match>
<Class name="org.apache.tez.dag.app.PluginManager"/>
<Method name="&lt;init&gt;"/>
<Bug pattern="EI_EXPOSE_REP2"/>
</Match>

</FindBugsFilter>
163 changes: 13 additions & 150 deletions tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java
Original file line number Diff line number Diff line change
Expand Up @@ -132,7 +132,6 @@
import org.apache.tez.dag.api.records.DAGProtos.ConfigurationProto;
import org.apache.tez.dag.api.records.DAGProtos.DAGPlan;
import org.apache.tez.dag.api.records.DAGProtos.PlanLocalResourcesProto;
import org.apache.tez.dag.api.records.DAGProtos.TezNamedEntityDescriptorProto;
import org.apache.tez.dag.api.records.DAGProtos.VertexPlan;
import org.apache.tez.dag.app.RecoveryParser.DAGRecoveryData;
import org.apache.tez.dag.app.dag.DAG;
Expand Down Expand Up @@ -195,9 +194,6 @@

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Function;
import com.google.common.collect.BiMap;
import com.google.common.collect.HashBiMap;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.util.concurrent.ListeningExecutorService;
import com.google.common.util.concurrent.MoreExecutors;
Expand Down Expand Up @@ -254,7 +250,6 @@ public class DAGAppMaster extends AbstractService {
private final String workingDirectory;
private final String[] localDirs;
private final String[] logDirs;
private final AMPluginDescriptorProto amPluginDescriptorProto;
private HadoopShim hadoopShim;
private ContainerSignatureMatcher containerSignatureMatcher;
private AMContainerMap containers;
Expand Down Expand Up @@ -312,11 +307,8 @@ public class DAGAppMaster extends AbstractService {
private FileSystem recoveryFS;

private ListeningExecutorService execService;
private final PluginManager pluginManager;

// TODO May not need to be a bidi map
private final BiMap<String, Integer> taskSchedulers = HashBiMap.create();
private final BiMap<String, Integer> containerLaunchers = HashBiMap.create();
private final BiMap<String, Integer> taskCommunicators = HashBiMap.create();

/**
* set of already executed dag names.
Expand Down Expand Up @@ -376,7 +368,6 @@ public DAGAppMaster(ApplicationAttemptId applicationAttemptId,
this.dagVersionInfo = new TezDagVersionInfo();
this.clientVersion = clientVersion;
this.amCredentials = credentials;
this.amPluginDescriptorProto = pluginDescriptorProto;
this.appMasterUgi = UserGroupInformation
.createRemoteUser(jobUserName);
this.appMasterUgi.addCredentials(amCredentials);
Expand All @@ -387,6 +378,8 @@ public DAGAppMaster(ApplicationAttemptId applicationAttemptId,
LOG.info("Created DAGAppMaster for application " + applicationAttemptId
+ ", versionInfo=" + dagVersionInfo);
TezCommonUtils.logCredentials(LOG, this.appMasterUgi.getCredentials(), "am");

this.pluginManager = new PluginManager(pluginDescriptorProto);
}

// Pull this WebAppUtils function into Tez until YARN-4186
Expand Down Expand Up @@ -451,18 +444,10 @@ protected void serviceInit(final Configuration conf) throws Exception {

UserPayload defaultPayload = TezUtils.createUserPayloadFromConf(amConf);

List<NamedEntityDescriptor> taskSchedulerDescriptors = Lists.newLinkedList();
List<NamedEntityDescriptor> containerLauncherDescriptors = Lists.newLinkedList();
List<NamedEntityDescriptor> taskCommunicatorDescriptors = Lists.newLinkedList();

parseAllPlugins(taskSchedulerDescriptors, taskSchedulers, containerLauncherDescriptors,
containerLaunchers, taskCommunicatorDescriptors, taskCommunicators, amPluginDescriptorProto,
isLocal, defaultPayload);


LOG.info(buildPluginComponentLog(taskSchedulerDescriptors, taskSchedulers, "TaskSchedulers"));
LOG.info(buildPluginComponentLog(containerLauncherDescriptors, containerLaunchers, "ContainerLaunchers"));
LOG.info(buildPluginComponentLog(taskCommunicatorDescriptors, taskCommunicators, "TaskCommunicators"));
PluginManager.PluginDescriptors pluginDescriptors = pluginManager.parseAllPlugins(isLocal, defaultPayload);
List<NamedEntityDescriptor> taskSchedulerDescriptors = pluginDescriptors.getTaskSchedulerDescriptors();
List<NamedEntityDescriptor> containerLauncherDescriptors = pluginDescriptors.getContainerLauncherDescriptors();
List<NamedEntityDescriptor> taskCommunicatorDescriptors = pluginDescriptors.getTaskCommunicatorDescriptors();

boolean disableVersionCheck = conf.getBoolean(
TezConfiguration.TEZ_AM_DISABLE_CLIENT_VERSION_CHECK,
Expand Down Expand Up @@ -1672,32 +1657,32 @@ public Credentials getAppCredentials() {

@Override
public Integer getTaskCommunicatorIdentifier(String name) {
return taskCommunicators.get(name);
return pluginManager.getTaskCommunicators().get(name);
}

@Override
public Integer getTaskScheduerIdentifier(String name) {
return taskSchedulers.get(name);
return pluginManager.getTaskSchedulers().get(name);
}

@Override
public Integer getContainerLauncherIdentifier(String name) {
return containerLaunchers.get(name);
return pluginManager.getContainerLaunchers().get(name);
}

@Override
public String getTaskCommunicatorName(int taskCommId) {
return taskCommunicators.inverse().get(taskCommId);
return pluginManager.getTaskCommunicators().inverse().get(taskCommId);
}

@Override
public String getTaskSchedulerName(int schedulerId) {
return taskSchedulers.inverse().get(schedulerId);
return pluginManager.getTaskSchedulers().inverse().get(schedulerId);
}

@Override
public String getContainerLauncherName(int launcherId) {
return containerLaunchers.inverse().get(launcherId);
return pluginManager.getContainerLaunchers().inverse().get(launcherId);
}

@Override
Expand Down Expand Up @@ -2732,128 +2717,6 @@ public String getWebUIAddress() {
return webUIService == null ? null : webUIService.getBaseUrl();
}

@VisibleForTesting
public static void parseAllPlugins(
List<NamedEntityDescriptor> taskSchedulerDescriptors, BiMap<String, Integer> taskSchedulerPluginMap,
List<NamedEntityDescriptor> containerLauncherDescriptors, BiMap<String, Integer> containerLauncherPluginMap,
List<NamedEntityDescriptor> taskCommDescriptors, BiMap<String, Integer> taskCommPluginMap,
AMPluginDescriptorProto amPluginDescriptorProto, boolean isLocal, UserPayload defaultPayload) {

boolean tezYarnEnabled;
boolean uberEnabled;
if (!isLocal) {
if (amPluginDescriptorProto == null) {
tezYarnEnabled = true;
uberEnabled = false;
} else {
tezYarnEnabled = amPluginDescriptorProto.getContainersEnabled();
uberEnabled = amPluginDescriptorProto.getUberEnabled();
}
} else {
tezYarnEnabled = false;
uberEnabled = true;
}

parsePlugin(taskSchedulerDescriptors, taskSchedulerPluginMap,
(amPluginDescriptorProto == null || amPluginDescriptorProto.getTaskSchedulersCount() == 0 ?
null :
amPluginDescriptorProto.getTaskSchedulersList()),
tezYarnEnabled, uberEnabled, defaultPayload);
processSchedulerDescriptors(taskSchedulerDescriptors, isLocal, defaultPayload, taskSchedulerPluginMap);

parsePlugin(containerLauncherDescriptors, containerLauncherPluginMap,
(amPluginDescriptorProto == null ||
amPluginDescriptorProto.getContainerLaunchersCount() == 0 ? null :
amPluginDescriptorProto.getContainerLaunchersList()),
tezYarnEnabled, uberEnabled, defaultPayload);

parsePlugin(taskCommDescriptors, taskCommPluginMap,
(amPluginDescriptorProto == null ||
amPluginDescriptorProto.getTaskCommunicatorsCount() == 0 ? null :
amPluginDescriptorProto.getTaskCommunicatorsList()),
tezYarnEnabled, uberEnabled, defaultPayload);
}


@VisibleForTesting
public static void parsePlugin(List<NamedEntityDescriptor> resultList,
BiMap<String, Integer> pluginMap, List<TezNamedEntityDescriptorProto> namedEntityDescriptorProtos,
boolean tezYarnEnabled, boolean uberEnabled, UserPayload defaultPayload) {

if (tezYarnEnabled) {
// Default classnames will be populated by individual components
NamedEntityDescriptor r = new NamedEntityDescriptor(
TezConstants.getTezYarnServicePluginName(), null).setUserPayload(defaultPayload);
addDescriptor(resultList, pluginMap, r);
}

if (uberEnabled) {
// Default classnames will be populated by individual components
NamedEntityDescriptor r = new NamedEntityDescriptor(
TezConstants.getTezUberServicePluginName(), null).setUserPayload(defaultPayload);
addDescriptor(resultList, pluginMap, r);
}

if (namedEntityDescriptorProtos != null) {
for (TezNamedEntityDescriptorProto namedEntityDescriptorProto : namedEntityDescriptorProtos) {
NamedEntityDescriptor namedEntityDescriptor = DagTypeConverters
.convertNamedDescriptorFromProto(namedEntityDescriptorProto);
addDescriptor(resultList, pluginMap, namedEntityDescriptor);
}
}
}

@VisibleForTesting
static void addDescriptor(List<NamedEntityDescriptor> list, BiMap<String, Integer> pluginMap,
NamedEntityDescriptor namedEntityDescriptor) {
list.add(namedEntityDescriptor);
pluginMap.put(list.get(list.size() - 1).getEntityName(), list.size() - 1);
}

@VisibleForTesting
static void processSchedulerDescriptors(List<NamedEntityDescriptor> descriptors, boolean isLocal,
UserPayload defaultPayload,
BiMap<String, Integer> schedulerPluginMap) {
if (isLocal) {
boolean foundUberServiceName = false;
for (NamedEntityDescriptor descriptor : descriptors) {
if (descriptor.getEntityName().equals(TezConstants.getTezUberServicePluginName())) {
foundUberServiceName = true;
break;
}
}
Preconditions.checkState(foundUberServiceName);
} else {
boolean foundYarn = false;
for (int i = 0; i < descriptors.size(); i++) {
if (descriptors.get(i).getEntityName().equals(TezConstants.getTezYarnServicePluginName())) {
foundYarn = true;
break;
}
}
if (!foundYarn) {
NamedEntityDescriptor yarnDescriptor =
new NamedEntityDescriptor(TezConstants.getTezYarnServicePluginName(), null)
.setUserPayload(defaultPayload);
addDescriptor(descriptors, schedulerPluginMap, yarnDescriptor);
}
}
}

String buildPluginComponentLog(List<NamedEntityDescriptor> namedEntityDescriptors, BiMap<String, Integer> map,
String component) {
StringBuilder sb = new StringBuilder();
sb.append("AM Level configured ").append(component).append(": ");
for (int i = 0; i < namedEntityDescriptors.size(); i++) {
sb.append("[").append(i).append(":").append(map.inverse().get(i))
.append(":").append(namedEntityDescriptors.get(i).getClassName()).append("]");
if (i != namedEntityDescriptors.size() - 1) {
sb.append(",");
}
}
return sb.toString();
}

public void vertexComplete(TezVertexID completedVertexID, Set<NodeId> nodesList) {
getContainerLauncherManager().vertexComplete(completedVertexID, jobTokenSecretManager, nodesList);
}
Expand Down
Loading