diff --git a/tez-dag/findbugs-exclude.xml b/tez-dag/findbugs-exclude.xml
index e8755832f4..9ee1b3d946 100644
--- a/tez-dag/findbugs-exclude.xml
+++ b/tez-dag/findbugs-exclude.xml
@@ -248,4 +248,29 @@
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java b/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java
index 99579c7ff7..ec4a89be03 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java
@@ -132,7 +132,6 @@
import org.apache.tez.dag.api.records.DAGProtos.ConfigurationProto;
import org.apache.tez.dag.api.records.DAGProtos.DAGPlan;
import org.apache.tez.dag.api.records.DAGProtos.PlanLocalResourcesProto;
-import org.apache.tez.dag.api.records.DAGProtos.TezNamedEntityDescriptorProto;
import org.apache.tez.dag.api.records.DAGProtos.VertexPlan;
import org.apache.tez.dag.app.RecoveryParser.DAGRecoveryData;
import org.apache.tez.dag.app.dag.DAG;
@@ -195,9 +194,6 @@
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Function;
-import com.google.common.collect.BiMap;
-import com.google.common.collect.HashBiMap;
-import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.util.concurrent.ListeningExecutorService;
import com.google.common.util.concurrent.MoreExecutors;
@@ -254,7 +250,6 @@ public class DAGAppMaster extends AbstractService {
private final String workingDirectory;
private final String[] localDirs;
private final String[] logDirs;
- private final AMPluginDescriptorProto amPluginDescriptorProto;
private HadoopShim hadoopShim;
private ContainerSignatureMatcher containerSignatureMatcher;
private AMContainerMap containers;
@@ -312,11 +307,8 @@ public class DAGAppMaster extends AbstractService {
private FileSystem recoveryFS;
private ListeningExecutorService execService;
+ private final PluginManager pluginManager;
- // TODO May not need to be a bidi map
- private final BiMap taskSchedulers = HashBiMap.create();
- private final BiMap containerLaunchers = HashBiMap.create();
- private final BiMap taskCommunicators = HashBiMap.create();
/**
* set of already executed dag names.
@@ -376,7 +368,6 @@ public DAGAppMaster(ApplicationAttemptId applicationAttemptId,
this.dagVersionInfo = new TezDagVersionInfo();
this.clientVersion = clientVersion;
this.amCredentials = credentials;
- this.amPluginDescriptorProto = pluginDescriptorProto;
this.appMasterUgi = UserGroupInformation
.createRemoteUser(jobUserName);
this.appMasterUgi.addCredentials(amCredentials);
@@ -387,6 +378,8 @@ public DAGAppMaster(ApplicationAttemptId applicationAttemptId,
LOG.info("Created DAGAppMaster for application " + applicationAttemptId
+ ", versionInfo=" + dagVersionInfo);
TezCommonUtils.logCredentials(LOG, this.appMasterUgi.getCredentials(), "am");
+
+ this.pluginManager = new PluginManager(pluginDescriptorProto);
}
// Pull this WebAppUtils function into Tez until YARN-4186
@@ -451,18 +444,10 @@ protected void serviceInit(final Configuration conf) throws Exception {
UserPayload defaultPayload = TezUtils.createUserPayloadFromConf(amConf);
- List taskSchedulerDescriptors = Lists.newLinkedList();
- List containerLauncherDescriptors = Lists.newLinkedList();
- List taskCommunicatorDescriptors = Lists.newLinkedList();
-
- parseAllPlugins(taskSchedulerDescriptors, taskSchedulers, containerLauncherDescriptors,
- containerLaunchers, taskCommunicatorDescriptors, taskCommunicators, amPluginDescriptorProto,
- isLocal, defaultPayload);
-
-
- LOG.info(buildPluginComponentLog(taskSchedulerDescriptors, taskSchedulers, "TaskSchedulers"));
- LOG.info(buildPluginComponentLog(containerLauncherDescriptors, containerLaunchers, "ContainerLaunchers"));
- LOG.info(buildPluginComponentLog(taskCommunicatorDescriptors, taskCommunicators, "TaskCommunicators"));
+ PluginManager.PluginDescriptors pluginDescriptors = pluginManager.parseAllPlugins(isLocal, defaultPayload);
+ List taskSchedulerDescriptors = pluginDescriptors.getTaskSchedulerDescriptors();
+ List containerLauncherDescriptors = pluginDescriptors.getContainerLauncherDescriptors();
+ List taskCommunicatorDescriptors = pluginDescriptors.getTaskCommunicatorDescriptors();
boolean disableVersionCheck = conf.getBoolean(
TezConfiguration.TEZ_AM_DISABLE_CLIENT_VERSION_CHECK,
@@ -1672,32 +1657,32 @@ public Credentials getAppCredentials() {
@Override
public Integer getTaskCommunicatorIdentifier(String name) {
- return taskCommunicators.get(name);
+ return pluginManager.getTaskCommunicators().get(name);
}
@Override
public Integer getTaskScheduerIdentifier(String name) {
- return taskSchedulers.get(name);
+ return pluginManager.getTaskSchedulers().get(name);
}
@Override
public Integer getContainerLauncherIdentifier(String name) {
- return containerLaunchers.get(name);
+ return pluginManager.getContainerLaunchers().get(name);
}
@Override
public String getTaskCommunicatorName(int taskCommId) {
- return taskCommunicators.inverse().get(taskCommId);
+ return pluginManager.getTaskCommunicators().inverse().get(taskCommId);
}
@Override
public String getTaskSchedulerName(int schedulerId) {
- return taskSchedulers.inverse().get(schedulerId);
+ return pluginManager.getTaskSchedulers().inverse().get(schedulerId);
}
@Override
public String getContainerLauncherName(int launcherId) {
- return containerLaunchers.inverse().get(launcherId);
+ return pluginManager.getContainerLaunchers().inverse().get(launcherId);
}
@Override
@@ -2732,128 +2717,6 @@ public String getWebUIAddress() {
return webUIService == null ? null : webUIService.getBaseUrl();
}
- @VisibleForTesting
- public static void parseAllPlugins(
- List taskSchedulerDescriptors, BiMap taskSchedulerPluginMap,
- List containerLauncherDescriptors, BiMap containerLauncherPluginMap,
- List taskCommDescriptors, BiMap taskCommPluginMap,
- AMPluginDescriptorProto amPluginDescriptorProto, boolean isLocal, UserPayload defaultPayload) {
-
- boolean tezYarnEnabled;
- boolean uberEnabled;
- if (!isLocal) {
- if (amPluginDescriptorProto == null) {
- tezYarnEnabled = true;
- uberEnabled = false;
- } else {
- tezYarnEnabled = amPluginDescriptorProto.getContainersEnabled();
- uberEnabled = amPluginDescriptorProto.getUberEnabled();
- }
- } else {
- tezYarnEnabled = false;
- uberEnabled = true;
- }
-
- parsePlugin(taskSchedulerDescriptors, taskSchedulerPluginMap,
- (amPluginDescriptorProto == null || amPluginDescriptorProto.getTaskSchedulersCount() == 0 ?
- null :
- amPluginDescriptorProto.getTaskSchedulersList()),
- tezYarnEnabled, uberEnabled, defaultPayload);
- processSchedulerDescriptors(taskSchedulerDescriptors, isLocal, defaultPayload, taskSchedulerPluginMap);
-
- parsePlugin(containerLauncherDescriptors, containerLauncherPluginMap,
- (amPluginDescriptorProto == null ||
- amPluginDescriptorProto.getContainerLaunchersCount() == 0 ? null :
- amPluginDescriptorProto.getContainerLaunchersList()),
- tezYarnEnabled, uberEnabled, defaultPayload);
-
- parsePlugin(taskCommDescriptors, taskCommPluginMap,
- (amPluginDescriptorProto == null ||
- amPluginDescriptorProto.getTaskCommunicatorsCount() == 0 ? null :
- amPluginDescriptorProto.getTaskCommunicatorsList()),
- tezYarnEnabled, uberEnabled, defaultPayload);
- }
-
-
- @VisibleForTesting
- public static void parsePlugin(List resultList,
- BiMap pluginMap, List namedEntityDescriptorProtos,
- boolean tezYarnEnabled, boolean uberEnabled, UserPayload defaultPayload) {
-
- if (tezYarnEnabled) {
- // Default classnames will be populated by individual components
- NamedEntityDescriptor r = new NamedEntityDescriptor(
- TezConstants.getTezYarnServicePluginName(), null).setUserPayload(defaultPayload);
- addDescriptor(resultList, pluginMap, r);
- }
-
- if (uberEnabled) {
- // Default classnames will be populated by individual components
- NamedEntityDescriptor r = new NamedEntityDescriptor(
- TezConstants.getTezUberServicePluginName(), null).setUserPayload(defaultPayload);
- addDescriptor(resultList, pluginMap, r);
- }
-
- if (namedEntityDescriptorProtos != null) {
- for (TezNamedEntityDescriptorProto namedEntityDescriptorProto : namedEntityDescriptorProtos) {
- NamedEntityDescriptor namedEntityDescriptor = DagTypeConverters
- .convertNamedDescriptorFromProto(namedEntityDescriptorProto);
- addDescriptor(resultList, pluginMap, namedEntityDescriptor);
- }
- }
- }
-
- @VisibleForTesting
- static void addDescriptor(List list, BiMap pluginMap,
- NamedEntityDescriptor namedEntityDescriptor) {
- list.add(namedEntityDescriptor);
- pluginMap.put(list.get(list.size() - 1).getEntityName(), list.size() - 1);
- }
-
- @VisibleForTesting
- static void processSchedulerDescriptors(List descriptors, boolean isLocal,
- UserPayload defaultPayload,
- BiMap schedulerPluginMap) {
- if (isLocal) {
- boolean foundUberServiceName = false;
- for (NamedEntityDescriptor descriptor : descriptors) {
- if (descriptor.getEntityName().equals(TezConstants.getTezUberServicePluginName())) {
- foundUberServiceName = true;
- break;
- }
- }
- Preconditions.checkState(foundUberServiceName);
- } else {
- boolean foundYarn = false;
- for (int i = 0; i < descriptors.size(); i++) {
- if (descriptors.get(i).getEntityName().equals(TezConstants.getTezYarnServicePluginName())) {
- foundYarn = true;
- break;
- }
- }
- if (!foundYarn) {
- NamedEntityDescriptor yarnDescriptor =
- new NamedEntityDescriptor(TezConstants.getTezYarnServicePluginName(), null)
- .setUserPayload(defaultPayload);
- addDescriptor(descriptors, schedulerPluginMap, yarnDescriptor);
- }
- }
- }
-
- String buildPluginComponentLog(List namedEntityDescriptors, BiMap map,
- String component) {
- StringBuilder sb = new StringBuilder();
- sb.append("AM Level configured ").append(component).append(": ");
- for (int i = 0; i < namedEntityDescriptors.size(); i++) {
- sb.append("[").append(i).append(":").append(map.inverse().get(i))
- .append(":").append(namedEntityDescriptors.get(i).getClassName()).append("]");
- if (i != namedEntityDescriptors.size() - 1) {
- sb.append(",");
- }
- }
- return sb.toString();
- }
-
public void vertexComplete(TezVertexID completedVertexID, Set nodesList) {
getContainerLauncherManager().vertexComplete(completedVertexID, jobTokenSecretManager, nodesList);
}
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/PluginManager.java b/tez-dag/src/main/java/org/apache/tez/dag/app/PluginManager.java
new file mode 100644
index 0000000000..24f6077581
--- /dev/null
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/PluginManager.java
@@ -0,0 +1,264 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.tez.dag.app;
+
+import java.util.Collections;
+import java.util.List;
+
+import org.apache.tez.dag.api.DagTypeConverters;
+import org.apache.tez.dag.api.NamedEntityDescriptor;
+import org.apache.tez.dag.api.TezConstants;
+import org.apache.tez.dag.api.UserPayload;
+import org.apache.tez.dag.api.records.DAGProtos.AMPluginDescriptorProto;
+import org.apache.tez.dag.api.records.DAGProtos.TezNamedEntityDescriptorProto;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.BiMap;
+import com.google.common.collect.HashBiMap;
+import com.google.common.collect.Lists;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Manager for AM service plugins.
+ *
+ * This component parses the configured plugins for TaskSchedulers,
+ * ContainerLaunchers, and TaskCommunicators from the AM configuration,
+ * maintains their name-to-identifier mappings, and provides the parsed
+ * descriptor lists used to initialize the corresponding managers.
+ */
+public class PluginManager {
+
+ private static final Logger LOG = LoggerFactory.getLogger(PluginManager.class);
+
+ private final AMPluginDescriptorProto amPluginDescriptorProto;
+
+ // Plugin maps for task schedulers, container launchers, and task communicators
+ private final BiMap taskSchedulers = HashBiMap.create();
+ private final BiMap containerLaunchers = HashBiMap.create();
+ private final BiMap taskCommunicators = HashBiMap.create();
+
+ /**
+ * Wrapper for parsed plugin descriptors.
+ *
+ * The descriptor lists exposed by this class are unmodifiable snapshots
+ * created at parse time. Callers must not attempt to modify these
+ * collections; any modification attempts will throw an exception.
+ */
+ public static final class PluginDescriptors {
+ private final List taskSchedulerDescriptors;
+ private final List containerLauncherDescriptors;
+ private final List taskCommunicatorDescriptors;
+
+ public PluginDescriptors(List taskSchedulerDescriptors,
+ List containerLauncherDescriptors,
+ List taskCommunicatorDescriptors) {
+ this.taskSchedulerDescriptors = Collections.unmodifiableList(taskSchedulerDescriptors);
+ this.containerLauncherDescriptors = Collections.unmodifiableList(containerLauncherDescriptors);
+ this.taskCommunicatorDescriptors = Collections.unmodifiableList(taskCommunicatorDescriptors);
+ }
+
+ public List getTaskSchedulerDescriptors() {
+ return taskSchedulerDescriptors;
+ }
+
+ public List getContainerLauncherDescriptors() {
+ return containerLauncherDescriptors;
+ }
+
+ public List getTaskCommunicatorDescriptors() {
+ return taskCommunicatorDescriptors;
+ }
+ }
+
+ public PluginManager() {
+ this(null);
+ }
+
+ public PluginManager(AMPluginDescriptorProto amPluginDescriptorProto) {
+ this.amPluginDescriptorProto = amPluginDescriptorProto;
+ }
+
+ /**
+ * Parse all plugins for task schedulers, container launchers, and task communicators.
+ */
+ public PluginDescriptors parseAllPlugins(boolean isLocal, UserPayload defaultPayload) {
+
+ List taskSchedulerDescriptors = Lists.newLinkedList();
+ List containerLauncherDescriptors = Lists.newLinkedList();
+ List taskCommDescriptors = Lists.newLinkedList();
+
+ boolean tezYarnEnabled;
+ boolean uberEnabled;
+ if (!isLocal) {
+ if (amPluginDescriptorProto == null) {
+ tezYarnEnabled = true;
+ uberEnabled = false;
+ } else {
+ tezYarnEnabled = amPluginDescriptorProto.getContainersEnabled();
+ uberEnabled = amPluginDescriptorProto.getUberEnabled();
+ }
+ } else {
+ tezYarnEnabled = false;
+ uberEnabled = true;
+ }
+
+ // parse task scheduler plugins
+ parsePlugin(taskSchedulerDescriptors, taskSchedulers,
+ (amPluginDescriptorProto == null || amPluginDescriptorProto.getTaskSchedulersCount() == 0 ?
+ null :
+ amPluginDescriptorProto.getTaskSchedulersList()),
+ tezYarnEnabled, uberEnabled, defaultPayload);
+
+ // post-process task scheduler plugin descriptors
+ processSchedulerDescriptors(taskSchedulerDescriptors, isLocal, defaultPayload, taskSchedulers);
+
+ // parse container launcher plugins
+ parsePlugin(containerLauncherDescriptors, containerLaunchers,
+ (amPluginDescriptorProto == null ||
+ amPluginDescriptorProto.getContainerLaunchersCount() == 0 ? null :
+ amPluginDescriptorProto.getContainerLaunchersList()),
+ tezYarnEnabled, uberEnabled, defaultPayload);
+
+ // parse task communicator plugins
+ parsePlugin(taskCommDescriptors, taskCommunicators,
+ (amPluginDescriptorProto == null ||
+ amPluginDescriptorProto.getTaskCommunicatorsCount() == 0 ? null :
+ amPluginDescriptorProto.getTaskCommunicatorsList()),
+ tezYarnEnabled, uberEnabled, defaultPayload);
+
+ // Log plugin component information
+ LOG.info(buildPluginComponentLog(taskSchedulerDescriptors, taskSchedulers, "TaskSchedulers"));
+ LOG.info(buildPluginComponentLog(containerLauncherDescriptors, containerLaunchers, "ContainerLaunchers"));
+ LOG.info(buildPluginComponentLog(taskCommDescriptors, taskCommunicators, "TaskCommunicators"));
+
+ return new PluginDescriptors(taskSchedulerDescriptors, containerLauncherDescriptors, taskCommDescriptors);
+ }
+
+ /**
+ * Parse a specific plugin type.
+ */
+ @VisibleForTesting
+ public static void parsePlugin(List resultList,
+ BiMap pluginMap, List namedEntityDescriptorProtos,
+ boolean tezYarnEnabled, boolean uberEnabled, UserPayload defaultPayload) {
+
+ if (tezYarnEnabled) {
+ // Default classnames will be populated by individual components
+ NamedEntityDescriptor descriptor = new NamedEntityDescriptor(
+ TezConstants.getTezYarnServicePluginName(), null).setUserPayload(defaultPayload);
+ addDescriptor(resultList, pluginMap, descriptor);
+ }
+
+ if (uberEnabled) {
+ // Default classnames will be populated by individual components
+ NamedEntityDescriptor descriptor = new NamedEntityDescriptor(
+ TezConstants.getTezUberServicePluginName(), null).setUserPayload(defaultPayload);
+ addDescriptor(resultList, pluginMap, descriptor);
+ }
+
+ if (namedEntityDescriptorProtos != null) {
+ for (TezNamedEntityDescriptorProto namedEntityDescriptorProto : namedEntityDescriptorProtos) {
+ NamedEntityDescriptor descriptor = DagTypeConverters
+ .convertNamedDescriptorFromProto(namedEntityDescriptorProto);
+ addDescriptor(resultList, pluginMap, descriptor);
+ }
+ }
+ }
+
+ /**
+ * Add a descriptor to the list and map.
+ */
+ public static void addDescriptor(List list, BiMap pluginMap,
+ NamedEntityDescriptor namedEntityDescriptor) {
+ list.add(namedEntityDescriptor);
+ pluginMap.put(list.getLast().getEntityName(), list.size() - 1);
+ }
+
+ /**
+ * Process scheduler descriptors with framework-specific logic.
+ */
+ public void processSchedulerDescriptors(List descriptors, boolean isLocal,
+ UserPayload defaultPayload,
+ BiMap schedulerPluginMap) {
+ if (isLocal) {
+ boolean foundUberServiceName = false;
+ for (NamedEntityDescriptor> descriptor : descriptors) {
+ if (descriptor.getEntityName().equals(TezConstants.getTezUberServicePluginName())) {
+ foundUberServiceName = true;
+ break;
+ }
+ }
+ Preconditions.checkState(foundUberServiceName);
+ } else {
+ boolean foundYarn = false;
+ for (NamedEntityDescriptor descriptor : descriptors) {
+ if (descriptor.getEntityName().equals(TezConstants.getTezYarnServicePluginName())) {
+ foundYarn = true;
+ break;
+ }
+ }
+ if (!foundYarn) {
+ NamedEntityDescriptor> yarnDescriptor =
+ new NamedEntityDescriptor(TezConstants.getTezYarnServicePluginName(), null)
+ .setUserPayload(defaultPayload);
+ addDescriptor(descriptors, schedulerPluginMap, yarnDescriptor);
+ }
+ }
+ }
+
+ /**
+ * Get the task schedulers map.
+ */
+ public BiMap getTaskSchedulers() {
+ return taskSchedulers;
+ }
+
+ /**
+ * Get the container launchers map.
+ */
+ public BiMap getContainerLaunchers() {
+ return containerLaunchers;
+ }
+
+ /**
+ * Get the task communicators map.
+ */
+ public BiMap getTaskCommunicators() {
+ return taskCommunicators;
+ }
+
+ /**
+ * Build a log message for plugin component information.
+ */
+ private String buildPluginComponentLog(List namedEntityDescriptors, BiMap map,
+ String component) {
+ StringBuilder sb = new StringBuilder();
+ sb.append("AM Level configured ").append(component).append(": ");
+ for (int i = 0; i < namedEntityDescriptors.size(); i++) {
+ sb.append("[").append(i).append(":").append(map.inverse().get(i))
+ .append(":").append(namedEntityDescriptors.get(i).getClassName()).append("]");
+ if (i != namedEntityDescriptors.size() - 1) {
+ sb.append(",");
+ }
+ }
+ return sb.toString();
+ }
+}
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/TestDAGAppMaster.java b/tez-dag/src/test/java/org/apache/tez/dag/app/TestDAGAppMaster.java
index 85a8248b95..01a61b64e5 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/TestDAGAppMaster.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/TestDAGAppMaster.java
@@ -87,7 +87,6 @@
import com.google.common.collect.BiMap;
import com.google.common.collect.HashBiMap;
-import com.google.common.collect.Lists;
import com.google.protobuf.ByteString;
import org.junit.After;
@@ -158,7 +157,7 @@ public void testPluginParsing() throws IOException {
// Test empty descriptor list, yarn enabled
pluginMap.clear();
entities = new LinkedList<>();
- DAGAppMaster.parsePlugin(entities, pluginMap, null, true, false, defaultPayload);
+ PluginManager.parsePlugin(entities, pluginMap, null, true, false, defaultPayload);
assertEquals(1, pluginMap.size());
assertEquals(1, entities.size());
assertTrue(pluginMap.containsKey(TezConstants.getTezYarnServicePluginName()));
@@ -169,7 +168,7 @@ public void testPluginParsing() throws IOException {
// Test empty descriptor list, uber enabled
pluginMap.clear();
entities = new LinkedList<>();
- DAGAppMaster.parsePlugin(entities, pluginMap, null, false, true, defaultPayload);
+ PluginManager.parsePlugin(entities, pluginMap, null, false, true, defaultPayload);
assertEquals(1, pluginMap.size());
assertEquals(1, entities.size());
assertTrue(pluginMap.containsKey(TezConstants.getTezUberServicePluginName()));
@@ -180,7 +179,7 @@ public void testPluginParsing() throws IOException {
// Test empty descriptor list, yarn enabled, uber enabled
pluginMap.clear();
entities = new LinkedList<>();
- DAGAppMaster.parsePlugin(entities, pluginMap, null, true, true, defaultPayload);
+ PluginManager.parsePlugin(entities, pluginMap, null, true, true, defaultPayload);
assertEquals(2, pluginMap.size());
assertEquals(2, entities.size());
assertTrue(pluginMap.containsKey(TezConstants.getTezYarnServicePluginName()));
@@ -203,7 +202,7 @@ public void testPluginParsing() throws IOException {
// Test descriptor, no yarn, no uber
pluginMap.clear();
entities = new LinkedList<>();
- DAGAppMaster.parsePlugin(entities, pluginMap, entityDescriptors, false, false, defaultPayload);
+ PluginManager.parsePlugin(entities, pluginMap, entityDescriptors, false, false, defaultPayload);
assertEquals(1, pluginMap.size());
assertEquals(1, entities.size());
assertTrue(pluginMap.containsKey(pluginName));
@@ -212,7 +211,7 @@ public void testPluginParsing() throws IOException {
// Test descriptor, yarn and uber
pluginMap.clear();
entities = new LinkedList<>();
- DAGAppMaster.parsePlugin(entities, pluginMap, entityDescriptors, true, true, defaultPayload);
+ PluginManager.parsePlugin(entities, pluginMap, entityDescriptors, true, true, defaultPayload);
assertEquals(3, pluginMap.size());
assertEquals(3, entities.size());
assertTrue(pluginMap.containsKey(TezConstants.getTezYarnServicePluginName()));
@@ -227,43 +226,37 @@ public void testPluginParsing() throws IOException {
@Test(timeout = 5000)
public void testParseAllPluginsNoneSpecified() throws IOException {
+ PluginManager pluginManager = new PluginManager();
Configuration conf = new Configuration(false);
conf.set(TEST_KEY, TEST_VAL);
UserPayload defaultPayload = TezUtils.createUserPayloadFromConf(conf);
- List tsDescriptors;
- BiMap tsMap;
- List clDescriptors;
- BiMap clMap;
- List tcDescriptors;
- BiMap tcMap;
-
-
// No plugins. Non local
- tsDescriptors = Lists.newLinkedList();
- tsMap = HashBiMap.create();
- clDescriptors = Lists.newLinkedList();
- clMap = HashBiMap.create();
- tcDescriptors = Lists.newLinkedList();
- tcMap = HashBiMap.create();
- DAGAppMaster.parseAllPlugins(tsDescriptors, tsMap, clDescriptors, clMap, tcDescriptors, tcMap,
- null, false, defaultPayload);
- verifyDescAndMap(tsDescriptors, tsMap, 1, true, TezConstants.getTezYarnServicePluginName());
- verifyDescAndMap(clDescriptors, clMap, 1, true, TezConstants.getTezYarnServicePluginName());
- verifyDescAndMap(tcDescriptors, tcMap, 1, true, TezConstants.getTezYarnServicePluginName());
+ PluginManager.PluginDescriptors pluginDescriptorsNonLocal = pluginManager.parseAllPlugins(false, defaultPayload);
+
+ verifyDescAndMap(pluginDescriptorsNonLocal.getTaskSchedulerDescriptors(),
+ pluginManager.getTaskSchedulers(), 1, true,
+ TezConstants.getTezYarnServicePluginName());
+ verifyDescAndMap(pluginDescriptorsNonLocal.getContainerLauncherDescriptors(),
+ pluginManager.getContainerLaunchers(), 1, true,
+ TezConstants.getTezYarnServicePluginName());
+ verifyDescAndMap(pluginDescriptorsNonLocal.getTaskCommunicatorDescriptors(),
+ pluginManager.getTaskCommunicators(), 1, true,
+ TezConstants.getTezYarnServicePluginName());
// No plugins. Local
- tsDescriptors = Lists.newLinkedList();
- tsMap = HashBiMap.create();
- clDescriptors = Lists.newLinkedList();
- clMap = HashBiMap.create();
- tcDescriptors = Lists.newLinkedList();
- tcMap = HashBiMap.create();
- DAGAppMaster.parseAllPlugins(tsDescriptors, tsMap, clDescriptors, clMap, tcDescriptors, tcMap,
- null, true, defaultPayload);
- verifyDescAndMap(tsDescriptors, tsMap, 1, true, TezConstants.getTezUberServicePluginName());
- verifyDescAndMap(clDescriptors, clMap, 1, true, TezConstants.getTezUberServicePluginName());
- verifyDescAndMap(tcDescriptors, tcMap, 1, true, TezConstants.getTezUberServicePluginName());
+ pluginManager = new PluginManager();
+
+ PluginManager.PluginDescriptors pluginDescriptorsLocal = pluginManager.parseAllPlugins(true, defaultPayload);
+ verifyDescAndMap(pluginDescriptorsLocal.getTaskSchedulerDescriptors(),
+ pluginManager.getTaskSchedulers(), 1, true,
+ TezConstants.getTezUberServicePluginName());
+ verifyDescAndMap(pluginDescriptorsLocal.getContainerLauncherDescriptors(),
+ pluginManager.getContainerLaunchers(), 1, true,
+ TezConstants.getTezUberServicePluginName());
+ verifyDescAndMap(pluginDescriptorsLocal.getTaskCommunicatorDescriptors(),
+ pluginManager.getTaskCommunicators(), 1, true,
+ TezConstants.getTezUberServicePluginName());
}
@Test(timeout = 5000)
@@ -276,30 +269,19 @@ public void testParseAllPluginsOnlyCustomSpecified() throws IOException {
AMPluginDescriptorProto proto = createAmPluginDescriptor(false, false, true, payloadProto);
- List tsDescriptors;
- BiMap tsMap;
- List clDescriptors;
- BiMap clMap;
- List tcDescriptors;
- BiMap tcMap;
-
-
// Only plugin, Yarn.
- tsDescriptors = Lists.newLinkedList();
- tsMap = HashBiMap.create();
- clDescriptors = Lists.newLinkedList();
- clMap = HashBiMap.create();
- tcDescriptors = Lists.newLinkedList();
- tcMap = HashBiMap.create();
- DAGAppMaster.parseAllPlugins(tsDescriptors, tsMap, clDescriptors, clMap, tcDescriptors, tcMap,
- proto, false, defaultPayload);
- verifyDescAndMap(tsDescriptors, tsMap, 2, true, TS_NAME,
+ PluginManager pluginManager = new PluginManager(proto);
+ PluginManager.PluginDescriptors pluginDescriptors = pluginManager.parseAllPlugins(false, defaultPayload);
+ verifyDescAndMap(pluginDescriptors.getTaskSchedulerDescriptors(),
+ pluginManager.getTaskSchedulers(), 2, true, TS_NAME,
TezConstants.getTezYarnServicePluginName());
- verifyDescAndMap(clDescriptors, clMap, 1, true, CL_NAME);
- verifyDescAndMap(tcDescriptors, tcMap, 1, true, TC_NAME);
- assertEquals(TS_NAME + CLASS_SUFFIX, tsDescriptors.get(0).getClassName());
- assertEquals(CL_NAME + CLASS_SUFFIX, clDescriptors.get(0).getClassName());
- assertEquals(TC_NAME + CLASS_SUFFIX, tcDescriptors.get(0).getClassName());
+ verifyDescAndMap(pluginDescriptors.getContainerLauncherDescriptors(),
+ pluginManager.getContainerLaunchers(), 1, true, CL_NAME);
+ verifyDescAndMap(pluginDescriptors.getTaskCommunicatorDescriptors(),
+ pluginManager.getTaskCommunicators(), 1, true, TC_NAME);
+ assertEquals(TS_NAME + CLASS_SUFFIX, pluginDescriptors.getTaskSchedulerDescriptors().get(0).getClassName());
+ assertEquals(CL_NAME + CLASS_SUFFIX, pluginDescriptors.getContainerLauncherDescriptors().get(0).getClassName());
+ assertEquals(TC_NAME + CLASS_SUFFIX, pluginDescriptors.getTaskCommunicatorDescriptors().get(0).getClassName());
}
@Test(timeout = 5000)
@@ -312,35 +294,24 @@ public void testParseAllPluginsCustomAndYarnSpecified() throws IOException {
AMPluginDescriptorProto proto = createAmPluginDescriptor(true, false, true, payloadProto);
- List tsDescriptors;
- BiMap tsMap;
- List clDescriptors;
- BiMap clMap;
- List tcDescriptors;
- BiMap tcMap;
-
-
// Only plugin, Yarn.
- tsDescriptors = Lists.newLinkedList();
- tsMap = HashBiMap.create();
- clDescriptors = Lists.newLinkedList();
- clMap = HashBiMap.create();
- tcDescriptors = Lists.newLinkedList();
- tcMap = HashBiMap.create();
- DAGAppMaster.parseAllPlugins(tsDescriptors, tsMap, clDescriptors, clMap, tcDescriptors, tcMap,
- proto, false, defaultPayload);
- verifyDescAndMap(tsDescriptors, tsMap, 2, true, TezConstants.getTezYarnServicePluginName(),
- TS_NAME);
- verifyDescAndMap(clDescriptors, clMap, 2, true, TezConstants.getTezYarnServicePluginName(),
- CL_NAME);
- verifyDescAndMap(tcDescriptors, tcMap, 2, true, TezConstants.getTezYarnServicePluginName(),
- TC_NAME);
- assertNull(tsDescriptors.get(0).getClassName());
- assertNull(clDescriptors.get(0).getClassName());
- assertNull(tcDescriptors.get(0).getClassName());
- assertEquals(TS_NAME + CLASS_SUFFIX, tsDescriptors.get(1).getClassName());
- assertEquals(CL_NAME + CLASS_SUFFIX, clDescriptors.get(1).getClassName());
- assertEquals(TC_NAME + CLASS_SUFFIX, tcDescriptors.get(1).getClassName());
+ PluginManager pluginManager = new PluginManager(proto);
+ PluginManager.PluginDescriptors pluginDescriptors = pluginManager.parseAllPlugins(false, defaultPayload);
+
+ verifyDescAndMap(pluginDescriptors.getTaskSchedulerDescriptors(), pluginManager.getTaskSchedulers(),
+ 2, true, TezConstants.getTezYarnServicePluginName(), TS_NAME);
+ verifyDescAndMap(pluginDescriptors.getContainerLauncherDescriptors(), pluginManager.getContainerLaunchers(),
+ 2, true, TezConstants.getTezYarnServicePluginName(), CL_NAME);
+ verifyDescAndMap(pluginDescriptors.getTaskCommunicatorDescriptors(), pluginManager.getTaskCommunicators(),
+ 2, true, TezConstants.getTezYarnServicePluginName(), TC_NAME);
+
+ assertNull(pluginDescriptors.getTaskSchedulerDescriptors().get(0).getClassName());
+ assertNull(pluginDescriptors.getContainerLauncherDescriptors().get(0).getClassName());
+ assertNull(pluginDescriptors.getTaskCommunicatorDescriptors().get(0).getClassName());
+
+ assertEquals(TS_NAME + CLASS_SUFFIX, pluginDescriptors.getTaskSchedulerDescriptors().get(1).getClassName());
+ assertEquals(CL_NAME + CLASS_SUFFIX, pluginDescriptors.getContainerLauncherDescriptors().get(1).getClassName());
+ assertEquals(TC_NAME + CLASS_SUFFIX, pluginDescriptors.getTaskCommunicatorDescriptors().get(1).getClassName());
}
@Test(timeout = 60000)
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl2.java b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl2.java
index 1f6d1e5e8f..c29a471604 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl2.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl2.java
@@ -37,7 +37,6 @@
import org.apache.hadoop.yarn.util.Clock;
import org.apache.tez.common.TezUtils;
import org.apache.tez.dag.api.DagTypeConverters;
-import org.apache.tez.dag.api.NamedEntityDescriptor;
import org.apache.tez.dag.api.TaskLocationHint;
import org.apache.tez.dag.api.TezConfiguration;
import org.apache.tez.dag.api.TezConstants;
@@ -52,7 +51,7 @@
import org.apache.tez.dag.api.records.DAGProtos.VertexPlan;
import org.apache.tez.dag.app.AppContext;
import org.apache.tez.dag.app.ContainerContext;
-import org.apache.tez.dag.app.DAGAppMaster;
+import org.apache.tez.dag.app.PluginManager;
import org.apache.tez.dag.app.TaskCommunicatorManagerInterface;
import org.apache.tez.dag.app.TaskHeartbeatHandler;
import org.apache.tez.dag.app.dag.DAG;
@@ -381,13 +380,9 @@ public ExecutionContextTestInfoHolder(VertexExecutionContext vertexExecutionCont
} catch (IOException e) {
throw new TezUncheckedException(e);
}
- DAGAppMaster.parsePlugin(Lists.newLinkedList(), taskSchedulers, null,
- true, false, defaultPayload);
- DAGAppMaster
- .parsePlugin(Lists.newLinkedList(), containerLaunchers, null,
- true, false, defaultPayload);
- DAGAppMaster.parsePlugin(Lists.newLinkedList(), taskComms, null,
- true, false, defaultPayload);
+ PluginManager.parsePlugin(Lists.newLinkedList(), taskSchedulers, null, true, false, defaultPayload);
+ PluginManager.parsePlugin(Lists.newLinkedList(), containerLaunchers, null, true, false, defaultPayload);
+ PluginManager.parsePlugin(Lists.newLinkedList(), taskComms, null, true, false, defaultPayload);
} else { // Add N plugins, no YARN defaults
List schedulerList = new LinkedList<>();
List launcherList = new LinkedList<>();
@@ -407,13 +402,9 @@ public ExecutionContextTestInfoHolder(VertexExecutionContext vertexExecutionCont
DAGProtos.TezEntityDescriptorProto.newBuilder()
.setClassName(append(TASK_COMM_NAME_BASE, i))).build());
}
- DAGAppMaster.parsePlugin(Lists.newLinkedList(), taskSchedulers,
- schedulerList, false, false, null);
- DAGAppMaster.parsePlugin(Lists.newLinkedList(), containerLaunchers,
- launcherList, false, false, null);
- DAGAppMaster
- .parsePlugin(Lists.newLinkedList(), taskComms, taskCommList,
- false, false, null);
+ PluginManager.parsePlugin(Lists.newLinkedList(), taskSchedulers, schedulerList, false, false, null);
+ PluginManager.parsePlugin(Lists.newLinkedList(), containerLaunchers, launcherList, false, false, null);
+ PluginManager.parsePlugin(Lists.newLinkedList(), taskComms, taskCommList, false, false, null);
}
this.appContext = createDefaultMockAppContext();
@@ -557,4 +548,5 @@ private static AppContext createDefaultMockAppContext() {
doReturn(mockDag).when(appContext).getCurrentDAG();
return appContext;
}
+
}
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskSchedulerManager.java b/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskSchedulerManager.java
index 3795e3e6b0..3b9936d287 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskSchedulerManager.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskSchedulerManager.java
@@ -80,8 +80,8 @@
import org.apache.tez.dag.app.ClusterInfo;
import org.apache.tez.dag.app.ContainerContext;
import org.apache.tez.dag.app.ContainerHeartbeatHandler;
-import org.apache.tez.dag.app.DAGAppMaster;
import org.apache.tez.dag.app.DAGAppMasterState;
+import org.apache.tez.dag.app.PluginManager;
import org.apache.tez.dag.app.ServicePluginLifecycleAbstractService;
import org.apache.tez.dag.app.TaskCommunicatorManagerInterface;
import org.apache.tez.dag.app.dag.DAG;
@@ -865,14 +865,16 @@ public void testHandleException() throws Exception {
UserPayload defaultPayload = TezUtils.createUserPayloadFromConf(tezConf);
// Parse plugins
- List tsDescriptors = Lists.newLinkedList();
+ PluginManager pluginManager = new PluginManager();
+ PluginManager.PluginDescriptors pluginDescriptors = pluginManager.parseAllPlugins(false, defaultPayload);
+ List tsDescriptors = pluginDescriptors.getTaskSchedulerDescriptors();
BiMap tsMap = HashBiMap.create();
- DAGAppMaster.parseAllPlugins(tsDescriptors, tsMap, Lists.newLinkedList(), HashBiMap.create(), Lists.newLinkedList(),
- HashBiMap.create(), null, false, defaultPayload);
// Only TezYarn found.
Assert.assertEquals(1, tsDescriptors.size());
Assert.assertEquals(TezConstants.getTezYarnServicePluginName(), tsDescriptors.get(0).getEntityName());
+ Assert.assertEquals(1, pluginManager.getTaskSchedulers().size());
+ Assert.assertTrue(pluginManager.getTaskSchedulers().containsKey(TezConstants.getTezYarnServicePluginName()));
// Construct eventHandler
TestTaskSchedulerHelpers.CapturingEventHandler eventHandler = new TestTaskSchedulerHelpers.CapturingEventHandler();