Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -2623,7 +2623,7 @@ public String getWebUIAddress() {
}

@VisibleForTesting
static void parseAllPlugins(
public static void parseAllPlugins(
List<NamedEntityDescriptor> taskSchedulerDescriptors, BiMap<String, Integer> taskSchedulerPluginMap,
List<NamedEntityDescriptor> containerLauncherDescriptors, BiMap<String, Integer> containerLauncherPluginMap,
List<NamedEntityDescriptor> taskCommDescriptors, BiMap<String, Integer> taskCommPluginMap,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -910,7 +910,8 @@ public void reportError(int taskSchedulerIndex, ServicePluginError servicePlugin
LOG.info("Error reported by scheduler {} - {}",
Utils.getTaskSchedulerIdentifierString(taskSchedulerIndex, appContext) + ": " +
diagnostics);
if (taskSchedulerDescriptors[taskSchedulerIndex].getClassName().equals(yarnSchedulerClassName)) {
if (taskSchedulerDescriptors[taskSchedulerIndex].getEntityName()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I guess this is the actual fix

just a note, what are the values in your case:

taskSchedulerDescriptors[taskSchedulerIndex].getClassName()
yarnSchedulerClassName
taskSchedulerDescriptors[taskSchedulerIndex].getEntityName()

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, this is the only actual fix.

  • Before this PR
method return value
taskSchedulerDescriptors[taskSchedulerIndex].getClassName() null
yarnSchedulerClassName "org.apache.tez.dag.app.rm.YarnTaskSchedulerService"

taskSchedulerDescriptors[taskSchedulerIndex].getClassName() is taken from the variable 'taskSchedulerDescriptors' in DAGAppMaster::serviceInit. In DAGAppMaster::parsePlugin, when we construct the NamedEntityDescriptor for the Tez YARN plugin, the className is always null.

yarnSchedulerClassName is set from tez.am.yarn.scheduler.class, default value is "org.apache.tez.dag.app.rm.YarnTaskSchedulerService".

So for the Tez YARN plugin, taskSchedulerDescriptors[taskSchedulerIndex].getClassName() will never equal yarnSchedulerClassName, and the branch that reports a SchedulerServiceError to the DAGAppMaster is never taken.

  • After this PR
taskSchedulerDescriptors[taskSchedulerIndex].getEntityName() will return "TezYarn"

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I see, nice catch:

TezConstants.getTezYarnServicePluginName(), null).setUserPayload(defaultPayload);

we simply don't fill the classname, so we should not rely on it, only use it in case of createCustomTaskScheduler

.equals(TezConstants.getTezYarnServicePluginName())) {
LOG.warn(
"Reporting a SchedulerServiceError to the DAGAppMaster since the error" +
" was reported by the YARN task scheduler");
Expand Down Expand Up @@ -1078,4 +1079,9 @@ public String getTaskSchedulerClassName(int taskSchedulerIndex) {
return taskSchedulers[taskSchedulerIndex].getTaskScheduler().getClass().getName();
}

/**
 * Test-only accessor for the task scheduler plugin held at the given slot.
 *
 * @param taskSchedulerIndex position of the scheduler in the {@code taskSchedulers} array
 * @return the {@link TaskScheduler} obtained from the entry at that position
 */
@VisibleForTesting
public TaskScheduler getTaskScheduler(int taskSchedulerIndex) {
  final TaskScheduler scheduler = taskSchedulers[taskSchedulerIndex].getTaskScheduler();
  return scheduler;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -161,6 +161,19 @@ public TaskSchedulerManagerForTest(AppContext appContext,
this.defaultPayload = defaultPayload;
}

/**
 * Builds a test manager from an explicit list of task scheduler descriptors.
 *
 * <p>The parent constructor receives {@code null} for its second and fifth arguments and
 * {@code false} for its seventh (NOTE(review): presumably the client server, the web UI
 * service and the local-mode flag -- confirm against the TaskSchedulerManager constructor).
 * The Hadoop shim is loaded from the AM configuration exposed by {@code appContext}.
 *
 * @param appContext application context; its AM configuration is used to load the Hadoop shim
 * @param eventHandler handler passed through to the parent manager
 * @param amrmClientAsync RM client retained by this test manager
 * @param containerSignatureMatcher matcher passed to the parent and also retained here
 * @param defaultPayload payload retained by this test manager
 * @param descriptors task scheduler descriptors handed to the parent manager
 */
TaskSchedulerManagerForTest(
    AppContext appContext,
    EventHandler eventHandler,
    TezAMRMClientAsync<CookieContainerRequest> amrmClientAsync,
    ContainerSignatureMatcher containerSignatureMatcher,
    UserPayload defaultPayload,
    List<NamedEntityDescriptor> descriptors) {
  super(
      appContext,
      null,
      eventHandler,
      containerSignatureMatcher,
      null,
      descriptors,
      false,
      new HadoopShimsLoader(appContext.getAMConf()).getHadoopShim());
  // Keep the collaborators locally as well; the three assignments are independent.
  this.defaultPayload = defaultPayload;
  this.containerSignatureMatcher = containerSignatureMatcher;
  this.amrmClientAsync = amrmClientAsync;
}

@SuppressWarnings("unchecked")
@Override
public void instantiateSchedulers(String host, int port, String trackingUrl,
Expand Down Expand Up @@ -224,6 +237,10 @@ public Event verifyInvocation(Class<? extends Event> eventClass) {
fail("Expected Event: " + eventClass.getName() + " not sent");
return null;
}

/**
 * Reports how many events are currently held in {@code events}.
 *
 * @return the current size of the {@code events} collection
 */
public int getEventSize() {
  final int capturedCount = events.size();
  return capturedCount;
}
}

static class TaskSchedulerWithDrainableContext extends YarnTaskSchedulerService {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,11 +45,17 @@
import java.util.List;
import java.util.Set;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

import com.google.common.base.Preconditions;
import com.google.common.base.Supplier;
import com.google.common.collect.BiMap;
import com.google.common.collect.HashBiMap;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.security.Credentials;
import org.apache.hadoop.util.Time;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ContainerExitStatus;
Expand All @@ -59,6 +65,8 @@
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.client.api.AMRMClient;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.event.Event;
import org.apache.hadoop.yarn.event.EventHandler;
import org.apache.tez.common.ContainerSignatureMatcher;
Expand All @@ -72,10 +80,16 @@
import org.apache.tez.dag.api.client.DAGClientServer;
import org.apache.tez.dag.api.oldrecords.TaskAttemptState;
import org.apache.tez.dag.app.AppContext;
import org.apache.tez.dag.app.ClusterInfo;
import org.apache.tez.dag.app.ContainerContext;
import org.apache.tez.dag.app.ContainerHeartbeatHandler;
import org.apache.tez.dag.app.DAGAppMaster;
import org.apache.tez.dag.app.DAGAppMasterState;
import org.apache.tez.dag.app.ServicePluginLifecycleAbstractService;
import org.apache.tez.dag.app.TaskCommunicatorManagerInterface;
import org.apache.tez.dag.app.dag.DAG;
import org.apache.tez.dag.app.dag.TaskAttempt;
import org.apache.tez.dag.app.dag.event.DAGAppMasterEventSchedulingServiceError;
import org.apache.tez.dag.app.dag.event.DAGAppMasterEventType;
import org.apache.tez.dag.app.dag.event.DAGAppMasterEventUserServiceFatalError;
import org.apache.tez.dag.app.dag.event.DAGEventTerminateDag;
Expand All @@ -89,6 +103,8 @@
import org.apache.tez.dag.app.rm.container.AMContainerEventType;
import org.apache.tez.dag.app.rm.container.AMContainerMap;
import org.apache.tez.dag.app.rm.container.AMContainerState;
import org.apache.tez.dag.app.rm.container.ContainerContextMatcher;
import org.apache.tez.dag.app.rm.node.AMNodeTracker;
import org.apache.tez.dag.app.web.WebUIService;
import org.apache.tez.dag.helpers.DagInfoImplForTest;
import org.apache.tez.dag.records.TaskAttemptTerminationCause;
Expand Down Expand Up @@ -839,6 +855,60 @@ protected void instantiateSchedulers(String host, int port, String trackingUrl,
}
}

/**
 * Checks that an error raised by the default (TezYarn) task scheduler results in a
 * {@link DAGAppMasterEventSchedulingServiceError} being delivered to the event handler.
 *
 * <p>The scheduler descriptors are produced by {@code DAGAppMaster.parseAllPlugins} rather
 * than built by hand, so the test sees the same descriptor that plugin parsing creates.
 */
@Test(timeout = 10000)
public void testHandleException() throws Exception {
  final Configuration conf = new Configuration(new YarnConfiguration());
  final UserPayload payload = TezUtils.createUserPayloadFromConf(conf);

  // Let the AM's own plugin parsing populate the scheduler descriptors.
  final List<NamedEntityDescriptor> schedulerDescriptors = Lists.newLinkedList();
  final BiMap<String, Integer> schedulerPluginMap = HashBiMap.create();
  DAGAppMaster.parseAllPlugins(
      schedulerDescriptors, schedulerPluginMap,
      Lists.newLinkedList(), HashBiMap.create(),
      Lists.newLinkedList(), HashBiMap.create(),
      null, false, payload);

  // Exactly one scheduler is expected, and it must be the TezYarn one.
  Assert.assertEquals(1, schedulerDescriptors.size());
  Assert.assertEquals(TezConstants.getTezYarnServicePluginName(),
      schedulerDescriptors.get(0).getEntityName());

  // Event sink whose contents are inspected at the end of the test.
  final TestTaskSchedulerHelpers.CapturingEventHandler eventHandler =
      new TestTaskSchedulerHelpers.CapturingEventHandler();

  // Test doubles for the RM client (core client wrapped by a spied async client).
  final AMRMClient<YarnTaskSchedulerService.CookieContainerRequest> coreClient =
      new TestTaskSchedulerHelpers.AMRMClientForTest();
  final TezAMRMClientAsync<YarnTaskSchedulerService.CookieContainerRequest> asyncClient =
      spy(new TestTaskSchedulerHelpers.AMRMClientAsyncForTest(coreClient, 100));

  // Mocked application context; the AM conf is stubbed before the container map and
  // node tracker are constructed, exactly as in the original ordering.
  final AppContext appContext = mock(AppContext.class);
  doReturn(new Configuration(false)).when(appContext).getAMConf();
  final AMContainerMap containers = new AMContainerMap(
      mock(ContainerHeartbeatHandler.class),
      mock(TaskCommunicatorManagerInterface.class),
      new ContainerContextMatcher(),
      appContext);
  final AMNodeTracker nodeTracker = new AMNodeTracker(eventHandler, appContext);
  final TezDAGID dagId = TezDAGID.getInstance("0", 0, 0);
  doReturn(containers).when(appContext).getAllContainers();
  doReturn(nodeTracker).when(appContext).getNodeTracker();
  doReturn(dagId).when(appContext).getCurrentDAGID();
  doReturn(DAGAppMasterState.RUNNING).when(appContext).getAMState();
  doReturn(mock(ClusterInfo.class)).when(appContext).getClusterInfo();

  // Manager under test, wired with the parsed descriptors and started.
  final TaskSchedulerManager taskSchedulerManager = spy(
      new TestTaskSchedulerHelpers.TaskSchedulerManagerForTest(
          appContext, eventHandler, asyncClient,
          new TestTaskSchedulerHelpers.AlwaysMatchesContainerMatcher(),
          payload, schedulerDescriptors));
  taskSchedulerManager.init(conf);
  taskSchedulerManager.start();

  // Push an error into the scheduler, wait for any event to arrive, then check its type.
  final YarnTaskSchedulerService yarnScheduler =
      (YarnTaskSchedulerService) taskSchedulerManager.getTaskScheduler(0);
  yarnScheduler.onError(new Exception("Trigger by unit test"));
  waitFor(() -> eventHandler.getEventSize() > 0, 1000, 5000);
  eventHandler.verifyInvocation(DAGAppMasterEventSchedulingServiceError.class);
}

private static class ExceptionAnswer implements Answer {
@Override
public Object answer(InvocationOnMock invocation) throws Throwable {
Expand Down Expand Up @@ -1107,4 +1177,23 @@ public boolean hasUnregistered() throws ServicePluginException {
public void dagComplete() throws ServicePluginException {
}
}

/**
 * Polls a condition until it holds or a time budget runs out.
 *
 * <p>The condition is evaluated once immediately. While it is false and the budget has not
 * elapsed, the calling thread sleeps for {@code checkEveryMillis} and evaluates it again.
 * Elapsed time is measured with {@link System#nanoTime()}, a monotonic clock, so wall-clock
 * adjustments do not affect the wait.
 *
 * @param check condition to evaluate; must not be {@code null}
 * @param checkEveryMillis pause between two evaluations, in milliseconds
 * @param waitForMillis total time budget in milliseconds; must be at least
 *     {@code checkEveryMillis}
 * @throws NullPointerException if {@code check} is {@code null}
 * @throws IllegalArgumentException if {@code waitForMillis} is smaller than
 *     {@code checkEveryMillis}
 * @throws TimeoutException if the condition is still false after the last evaluation
 * @throws InterruptedException if the thread is interrupted while sleeping between checks
 */
public static void waitFor(Supplier<Boolean> check, int checkEveryMillis,
    int waitForMillis) throws TimeoutException, InterruptedException {
  if (check == null) {
    throw new NullPointerException("Input supplier interface should be initialized");
  }
  if (waitForMillis < checkEveryMillis) {
    // Equality is accepted, so the message says "greater than or equal to".
    throw new IllegalArgumentException(
        "Total wait time should be greater than or equal to check interval time");
  }

  final long startNanos = System.nanoTime();
  // int * long widens to long, so the budget cannot overflow.
  final long budgetNanos = waitForMillis * 1_000_000L;

  boolean result = check.get();
  while (!result && (System.nanoTime() - startNanos < budgetNanos)) {
    Thread.sleep(checkEveryMillis);
    result = check.get();
  }

  if (!result) {
    throw new TimeoutException("Timed out waiting for condition.");
  }
}
}