diff --git a/tez-api/src/main/java/org/apache/tez/serviceplugins/api/TaskSchedulerContext.java b/tez-api/src/main/java/org/apache/tez/serviceplugins/api/TaskSchedulerContext.java index e188231e14..7edcc136fe 100644 --- a/tez-api/src/main/java/org/apache/tez/serviceplugins/api/TaskSchedulerContext.java +++ b/tez-api/src/main/java/org/apache/tez/serviceplugins/api/TaskSchedulerContext.java @@ -84,6 +84,19 @@ void taskAllocated(Object task, Object appCookie, Container container); + /** + * Indicate to the framework that a container is being assigned to a task. + * + * @param task the task for which a container is being assigned. This should be the same + * instance that was provided when requesting for an allocation + * @param appCookie the cookie which was provided while requesting allocation for this task + * @param container the actual container assigned to the task + * @param taskSchedulingInfo task specific scheduling information to be passed to communicator + */ + void taskAllocated(Object task, + Object appCookie, + Container container, Object taskSchedulingInfo); + /** * Indicate to the framework that a container has completed. This is typically used by sources diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/TaskCommunicatorManager.java b/tez-dag/src/main/java/org/apache/tez/dag/app/TaskCommunicatorManager.java index 55b2d1b021..2d50f66641 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/app/TaskCommunicatorManager.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/TaskCommunicatorManager.java @@ -23,6 +23,7 @@ import java.util.ArrayList; import java.util.Collections; import java.util.List; +import java.util.Map; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; @@ -538,7 +539,8 @@ public void unregisterRunningContainer(ContainerId containerId, int taskCommId, @Override public void registerTaskAttempt(AMContainerTask amContainerTask, - ContainerId containerId, int taskCommId) { + ContainerId containerId, int taskCommId, + Object taskSchedulerInfo) { ContainerInfo containerInfo = registeredContainers.get(containerId); if (containerInfo == null) { throw new TezUncheckedException("Registering task attempt: " @@ -564,7 +566,7 @@ public void registerTaskAttempt(AMContainerTask amContainerTask, try { taskCommunicators[taskCommId].registerRunningTaskAttempt(containerId, amContainerTask.getTask(), amContainerTask.getAdditionalResources(), amContainerTask.getCredentials(), - amContainerTask.haveCredentialsChanged(), amContainerTask.getPriority()); + amContainerTask.haveCredentialsChanged(), amContainerTask.getPriority(), taskSchedulerInfo); } catch (Exception e) { String msg = "Error in TaskCommunicator when registering Task Attempt" + ", communicator=" + Utils.getTaskCommIdentifierString(taskCommId, context) diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/TaskCommunicatorManagerInterface.java b/tez-dag/src/main/java/org/apache/tez/dag/app/TaskCommunicatorManagerInterface.java index 254e74c734..027f7fc72f 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/app/TaskCommunicatorManagerInterface.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/TaskCommunicatorManagerInterface.java @@ -27,6 +27,9 @@ import org.apache.tez.dag.app.dag.DAG; import org.apache.tez.dag.app.rm.container.AMContainerTask; import org.apache.tez.dag.records.TezTaskAttemptID; + +import java.util.Map; + /** * This class listens for changes to the state of a Task. */ @@ -34,7 +37,7 @@ public interface TaskCommunicatorManagerInterface { void registerRunningContainer(ContainerId containerId, int taskCommId); - void registerTaskAttempt(AMContainerTask amContainerTask, ContainerId containerId, int taskCommId); + void registerTaskAttempt(AMContainerTask amContainerTask, ContainerId containerId, int taskCommId, Object taskSchedulerInfo); void unregisterRunningContainer(ContainerId containerId, int taskCommId, ContainerEndReason endReason, String diagnostics); diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/TaskCommunicatorWrapper.java b/tez-dag/src/main/java/org/apache/tez/dag/app/TaskCommunicatorWrapper.java index 4afb427623..f7c67499c3 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/app/TaskCommunicatorWrapper.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/TaskCommunicatorWrapper.java @@ -52,8 +52,9 @@ public void registerContainerEnd(ContainerId containerId, ContainerEndReason end public void registerRunningTaskAttempt(ContainerId containerId, TaskSpec taskSpec, Map additionalResources, Credentials credentials, boolean credentialsChanged, - int priority) throws Exception { - real.registerRunningTaskAttempt(containerId, taskSpec, additionalResources, credentials, credentialsChanged, priority); + int priority, Object taskSchedulerInfo) throws Exception { + real.registerRunningTaskAttempt(containerId, taskSpec, additionalResources, credentials, credentialsChanged, + priority, taskSchedulerInfo); } public void unregisterRunningTaskAttempt(TezTaskAttemptID taskAttemptID, diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerContextImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerContextImpl.java index a31b4f1e2d..3dd856182f 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerContextImpl.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerContextImpl.java @@ -16,6 +16,8 @@ import javax.annotation.Nullable; import java.nio.ByteBuffer; +import java.util.Collections; +import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Objects; @@ -66,7 +68,12 @@ public TaskSchedulerContextImpl(TaskSchedulerManager taskSchedulerManager, AppCo // taskAllocated() upcall and deallocateTask() downcall @Override public void taskAllocated(Object task, Object appCookie, Container container) { - taskSchedulerManager.taskAllocated(schedulerId, task, appCookie, container); + taskAllocated(task, appCookie, container, null); + } + + @Override + public void taskAllocated(Object task, Object appCookie, Container container, Object taskSchedulingInfo) { + taskSchedulerManager.taskAllocated(schedulerId, task, appCookie, container, taskSchedulingInfo); } @Override diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerContextImplWrapper.java b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerContextImplWrapper.java index 5e45e70993..49de58c020 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerContextImplWrapper.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerContextImplWrapper.java @@ -70,6 +70,13 @@ public void taskAllocated(Object task, Object appCookie, Container container) { container)); } + @Override + public void taskAllocated(Object task, Object appCookie, Container container, + Object taskSchedulingInfo) { + executorService.submit(new TaskAllocatedCallableWithCustomInfo(real, task, appCookie, + container, taskSchedulingInfo)); + } + @Override public void containerCompleted(Object taskLastAllocated, ContainerStatus containerStatus) { @@ -226,6 +233,29 @@ public Void call() throws Exception { } } + static class TaskAllocatedCallableWithCustomInfo extends TaskSchedulerContextCallbackBase + implements Callable { + private final Object task; + private final Object appCookie; + private final Container container; + private final Object taskSchedulingInfo; + + public TaskAllocatedCallableWithCustomInfo(TaskSchedulerContext app, Object task, + Object appCookie, Container container, Object taskSchedulingInfo) { + super(app); + this.task = task; + this.appCookie = appCookie; + this.container = container; + this.taskSchedulingInfo = taskSchedulingInfo; + } + + @Override + public Void call() throws Exception { + app.taskAllocated(task, appCookie, container, taskSchedulingInfo); + return null; + } + } + static class ContainerCompletedCallable extends TaskSchedulerContextCallbackBase implements Callable { diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerManager.java b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerManager.java index 57eba015d8..a1c6dd5017 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerManager.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerManager.java @@ -735,7 +735,7 @@ public void serviceStop() throws InterruptedException { // TaskSchedulerAppCallback methods with schedulerId, where relevant public synchronized void taskAllocated(int schedulerId, Object task, Object appCookie, - Container container) { + Container container, Object taskSchedulingInfo) { AMSchedulerEventTALaunchRequest event = (AMSchedulerEventTALaunchRequest) appCookie; ContainerId containerId = container.getId(); @@ -765,7 +765,7 @@ public synchronized void taskAllocated(int schedulerId, Object task, } sendEvent(new AMContainerEventAssignTA(containerId, taskAttempt.getID(), event.getRemoteTaskSpec(), event.getContainerContext().getLocalResources(), event - .getContainerContext().getCredentials(), event.getPriority())); + .getContainerContext().getCredentials(), event.getPriority(), taskSchedulingInfo)); } public synchronized void containerCompleted(int schedulerId, Object task, ContainerStatus containerStatus) { diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerEventAssignTA.java b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerEventAssignTA.java index 682cd02d07..d50a1e07b0 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerEventAssignTA.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerEventAssignTA.java @@ -33,16 +33,18 @@ public class AMContainerEventAssignTA extends AMContainerEvent { private final Map taskLocalResources; private final Credentials credentials; private final int priority; + private final Object taskSchedulingInfo; public AMContainerEventAssignTA(ContainerId containerId, TezTaskAttemptID attemptId, Object remoteTaskSpec, Map taskLocalResources, Credentials credentials, - int priority) { + int priority, Object taskSchedulingInfo) { super(containerId, AMContainerEventType.C_ASSIGN_TA); this.attemptId = attemptId; this.remoteTaskSpec = (TaskSpec) remoteTaskSpec; this.taskLocalResources = taskLocalResources; this.credentials = credentials; this.priority = priority; + this.taskSchedulingInfo = taskSchedulingInfo; } public TaskSpec getRemoteTaskSpec() { @@ -64,4 +66,8 @@ public Credentials getCredentials() { public int getPriority() { return priority; } + + public Object getTaskSchedulingInfo() { + return taskSchedulingInfo; + } } diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerImpl.java index 6b67eb9da1..bb310b7405 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerImpl.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerImpl.java @@ -653,7 +653,7 @@ public AMContainerState transition( event.getRemoteTaskSpec(), container.additionalLocalResources, container.credentialsChanged ? container.credentials : null, container.credentialsChanged, event.getPriority()); - container.registerAttemptWithListener(amContainerTask); + container.registerAttemptWithListener(amContainerTask, event.getTaskSchedulingInfo()); container.additionalLocalResources = null; container.credentialsChanged = false; if (container.getState() == AMContainerState.IDLE) { @@ -1180,8 +1180,8 @@ protected void unregisterAttemptFromListener(TezTaskAttemptID attemptId, TaskAtt taskCommunicatorManagerInterface.unregisterTaskAttempt(attemptId, taskCommId, endReason, diagnostics); } - protected void registerAttemptWithListener(AMContainerTask amContainerTask) { - taskCommunicatorManagerInterface.registerTaskAttempt(amContainerTask, this.containerId, taskCommId); + protected void registerAttemptWithListener(AMContainerTask amContainerTask, Object taskSchedulingInfo) { + taskCommunicatorManagerInterface.registerTaskAttempt(amContainerTask, this.containerId, taskCommId, taskSchedulingInfo); } protected void registerWithTAListener() { diff --git a/tez-dag/src/main/java/org/apache/tez/serviceplugins/api/TaskCommunicator.java b/tez-dag/src/main/java/org/apache/tez/serviceplugins/api/TaskCommunicator.java index fceddf2522..29093ba534 100644 --- a/tez-dag/src/main/java/org/apache/tez/serviceplugins/api/TaskCommunicator.java +++ b/tez-dag/src/main/java/org/apache/tez/serviceplugins/api/TaskCommunicator.java @@ -147,6 +147,33 @@ public abstract void registerRunningTaskAttempt(ContainerId containerId, TaskSpe boolean credentialsChanged, int priority) throws ServicePluginException; + /** + * Register a task attempt to execute on a container. Communicator implementation must override + * this method rather than {@link #registerRunningTaskAttempt(ContainerId, TaskSpec, Map, Credentials, boolean, int)} + * if scheduler attaches custom info to tasks. + * + * @param containerId the containerId on which this task needs to run + * @param taskSpec the task specifications for the task to be executed + * @param additionalResources additional local resources which may be required to run this task + * on + * the container + * @param credentials the credentials required to run this task + * @param credentialsChanged whether the credentials are different from the original credentials + * associated with this container + * @param priority the priority of the task being executed + * @param taskSchedulerInfo task specific info created by scheduler + * @throws ServicePluginException when the service runs into a fatal error which it cannot handle. + * This will cause the app to shutdown. + */ + public void registerRunningTaskAttempt(ContainerId containerId, TaskSpec taskSpec, + Map additionalResources, + Credentials credentials, + boolean credentialsChanged, int priority, + Object taskSchedulerInfo) throws ServicePluginException { + registerRunningTaskAttempt(containerId, taskSpec, additionalResources, credentials, credentialsChanged, + priority); + } + /** * Register the completion of a task. This may be a result of preemption, the container dying, * the node dying, the task completing to success diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/TestTaskCommunicatorManager1.java b/tez-dag/src/test/java/org/apache/tez/dag/app/TestTaskCommunicatorManager1.java index 0f8afaafe6..ecbb2599cd 100644 --- a/tez-dag/src/test/java/org/apache/tez/dag/app/TestTaskCommunicatorManager1.java +++ b/tez-dag/src/test/java/org/apache/tez/dag/app/TestTaskCommunicatorManager1.java @@ -185,7 +185,7 @@ public void testGetTask() throws IOException { assertNull(containerTask); // Valid task registered - taskAttemptListener.registerTaskAttempt(amContainerTask, containerId2, 0); + taskAttemptListener.registerTaskAttempt(amContainerTask, containerId2, 0, null); containerTask = tezUmbilical.getTask(containerContext2); assertFalse(containerTask.shouldDie()); assertEquals(taskSpec, containerTask.getTaskSpec()); @@ -209,7 +209,7 @@ public void testGetTask() throws IOException { TezTaskAttemptID taskAttemptId2 = mock(TezTaskAttemptID.class); doReturn(taskAttemptId2).when(taskSpec2).getTaskAttemptID(); AMContainerTask amContainerTask2 = new AMContainerTask(taskSpec, null, null, false, 0); - taskAttemptListener.registerTaskAttempt(amContainerTask2, containerId3, 0); + taskAttemptListener.registerTaskAttempt(amContainerTask2, containerId3, 0, null); taskAttemptListener.unregisterRunningContainer(containerId3, 0, ContainerEndReason.OTHER, null); containerTask = tezUmbilical.getTask(containerContext3); assertTrue(containerTask.shouldDie()); @@ -229,7 +229,7 @@ public void testGetTaskMultiplePulls() throws IOException { assertNull(containerTask); // Register task - taskAttemptListener.registerTaskAttempt(amContainerTask, containerId1, 0); + taskAttemptListener.registerTaskAttempt(amContainerTask, containerId1, 0, null); containerTask = tezUmbilical.getTask(containerContext1); assertFalse(containerTask.shouldDie()); assertEquals(taskSpec, containerTask.getTaskSpec()); @@ -423,7 +423,7 @@ private TaskHeartbeatResponse generateHeartbeat(List events, doReturn(eventInfo).when(vertex).getTaskAttemptTezEvents(taskAttemptID, fromEventId, 0, maxEvents); taskAttemptListener.registerRunningContainer(containerId, 0); - taskAttemptListener.registerTaskAttempt(amContainerTask, containerId, 0); + taskAttemptListener.registerTaskAttempt(amContainerTask, containerId, 0, null); TaskHeartbeatRequest request = mock(TaskHeartbeatRequest.class); doReturn(containerId.toString()).when(request).getContainerIdentifier(); diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/TestTaskCommunicatorManager2.java b/tez-dag/src/test/java/org/apache/tez/dag/app/TestTaskCommunicatorManager2.java index bb7e94b5c7..1f0656a9a6 100644 --- a/tez-dag/src/test/java/org/apache/tez/dag/app/TestTaskCommunicatorManager2.java +++ b/tez-dag/src/test/java/org/apache/tez/dag/app/TestTaskCommunicatorManager2.java @@ -292,7 +292,7 @@ private void registerRunningContainer(ContainerId containerId) { } private void registerTaskAttempt(ContainerId containerId, AMContainerTask amContainerTask) { - taskCommunicatorManager.registerTaskAttempt(amContainerTask, containerId, 0); + taskCommunicatorManager.registerTaskAttempt(amContainerTask, containerId, 0, null); } private TaskSpec createTaskSpec() { diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestContainerReuse.java b/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestContainerReuse.java index 4e29dd5917..bd7ca5223b 100644 --- a/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestContainerReuse.java +++ b/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestContainerReuse.java @@ -204,9 +204,9 @@ public void testDelayedReuseContainerBecomesAvailable() TestTaskSchedulerHelpers.waitForDelayedDrainNotify(drainNotifier); drainableAppCallback.drain(); verify(taskSchedulerManager).taskAllocated( - eq(0), eq(ta11), any(Object.class), eq(containerHost1)); + eq(0), eq(ta11), any(Object.class), eq(containerHost1), eq(null)); verify(taskSchedulerManager).taskAllocated( - eq(0), eq(ta21), any(Object.class), eq(containerHost2)); + eq(0), eq(ta21), any(Object.class), eq(containerHost2), eq(null)); // Adding the event later so that task1 assigned to containerHost1 // is deterministic. @@ -218,7 +218,7 @@ public void testDelayedReuseContainerBecomesAvailable() drainableAppCallback.drain(); verifyDeAllocateTask(taskScheduler, ta11, true, null, null); verify(taskSchedulerManager, times(1)).taskAllocated( - eq(0), eq(ta31), any(Object.class), eq(containerHost1)); + eq(0), eq(ta31), any(Object.class), eq(containerHost1), eq(null)); verify(rmClient, times(0)).releaseAssignedContainer( eq(containerHost1.getId())); eventHandler.verifyNoInvocations(AMContainerEventStopRequest.class); @@ -326,9 +326,9 @@ public void testDelayedReuseContainerNotAvailable() taskScheduler.onContainersAllocated(Lists.newArrayList(containerHost1, containerHost2)); TestTaskSchedulerHelpers.waitForDelayedDrainNotify(drainNotifier); drainableAppCallback.drain(); - verify(taskSchedulerManager).taskAllocated(eq(0), eq(ta11), any(Object.class), eq(containerHost1)); + verify(taskSchedulerManager).taskAllocated(eq(0), eq(ta11), any(Object.class), eq(containerHost1), eq(null)); verify(taskSchedulerManager).taskAllocated(eq(0), eq(ta21), any(Object.class), - eq(containerHost2)); + eq(containerHost2), eq(null)); // Adding the event later so that task1 assigned to containerHost1 is deterministic. taskSchedulerManager.handleEvent(lrTa31); @@ -339,7 +339,7 @@ public void testDelayedReuseContainerNotAvailable() drainableAppCallback.drain(); verifyDeAllocateTask(taskScheduler, ta21, true, null, null); verify(taskSchedulerManager, times(0)).taskAllocated( - eq(0), eq(ta31), any(Object.class), eq(containerHost2)); + eq(0), eq(ta31), any(Object.class), eq(containerHost2), eq(null)); verify(rmClient, times(1)).releaseAssignedContainer( eq(containerHost2.getId())); eventHandler.verifyInvocation(AMContainerEventStopRequest.class); @@ -428,7 +428,7 @@ public void testSimpleReuse() throws IOException, InterruptedException, Executio TestTaskSchedulerHelpers.waitForDelayedDrainNotify(drainNotifier); drainableAppCallback.drain(); verify(taskSchedulerManager).taskAllocated(eq(0), eq(ta11), any(Object.class), - eq(container1)); + eq(container1), eq(null)); // Task assigned to container completed successfully. Container should be re-used. taskSchedulerManager.handleEvent( @@ -436,7 +436,7 @@ public void testSimpleReuse() throws IOException, InterruptedException, Executio null, 0)); drainableAppCallback.drain(); verifyDeAllocateTask(taskScheduler, ta11, true, null, null); - verify(taskSchedulerManager).taskAllocated(eq(0), eq(ta12), any(Object.class), eq(container1)); + verify(taskSchedulerManager).taskAllocated(eq(0), eq(ta12), any(Object.class), eq(container1), eq(null)); verify(rmClient, times(0)).releaseAssignedContainer(eq(container1.getId())); eventHandler.verifyNoInvocations(AMContainerEventStopRequest.class); eventHandler.reset(); @@ -449,7 +449,7 @@ public void testSimpleReuse() throws IOException, InterruptedException, Executio drainableAppCallback.drain(); verifyDeAllocateTask(taskScheduler, ta12, true, null, null); verify(taskSchedulerManager).taskAllocated(eq(0), eq(ta13), any(Object.class), - eq(container1)); + eq(container1), eq(null)); verify(rmClient, times(0)).releaseAssignedContainer(eq(container1.getId())); eventHandler.verifyNoInvocations(AMContainerEventStopRequest.class); eventHandler.reset(); @@ -460,7 +460,7 @@ public void testSimpleReuse() throws IOException, InterruptedException, Executio "TIMEOUT", 0)); drainableAppCallback.drain(); verify(taskSchedulerManager, times(0)).taskAllocated(eq(0), eq(ta14), any(Object.class), - eq(container1)); + eq(container1), eq(null)); verifyDeAllocateTask(taskScheduler, ta13, false, null, "TIMEOUT"); verify(rmClient).releaseAssignedContainer(eq(container1.getId())); eventHandler.verifyInvocation(AMContainerEventStopRequest.class); @@ -473,7 +473,7 @@ public void testSimpleReuse() throws IOException, InterruptedException, Executio TestTaskSchedulerHelpers.waitForDelayedDrainNotify(drainNotifier); drainableAppCallback.drain(); verify(taskSchedulerManager).taskAllocated(eq(0), eq(ta14), any(Object.class), - eq(container2)); + eq(container2), eq(null)); // Task assigned to container completed successfully. No pending requests. Container should be released. taskSchedulerManager.handleEvent( @@ -573,7 +573,7 @@ public void testReuseWithTaskSpecificLaunchCmdOption() throws IOException, Inter TestTaskSchedulerHelpers.waitForDelayedDrainNotify(drainNotifier); drainableAppCallback.drain(); verify(taskSchedulerManager).taskAllocated(eq(0), eq(ta11), any(Object.class), - eq(container1)); + eq(container1), eq(null)); // First task had profiling on. This container can not be reused further. taskSchedulerManager.handleEvent( @@ -582,7 +582,7 @@ public void testReuseWithTaskSpecificLaunchCmdOption() throws IOException, Inter drainableAppCallback.drain(); verifyDeAllocateTask(taskScheduler, ta11, true, null, null); verify(taskSchedulerManager, times(0)).taskAllocated(eq(0), eq(ta12), any(Object.class), - eq(container1)); + eq(container1), eq(null)); verify(rmClient, times(1)).releaseAssignedContainer(eq(container1.getId())); eventHandler.verifyInvocation(AMContainerEventStopRequest.class); eventHandler.reset(); @@ -615,7 +615,7 @@ public void testReuseWithTaskSpecificLaunchCmdOption() throws IOException, Inter taskScheduler.onContainersAllocated(Collections.singletonList(container2)); TestTaskSchedulerHelpers.waitForDelayedDrainNotify(drainNotifier); drainableAppCallback.drain(); - verify(taskSchedulerManager).taskAllocated(eq(0), eq(ta13), any(Object.class), eq(container2)); + verify(taskSchedulerManager).taskAllocated(eq(0), eq(ta13), any(Object.class), eq(container2), eq(null)); // Verify that the container can not be reused when profiling option is turned on // Even for 2 tasks having same profiling option can have container reusability. @@ -625,7 +625,7 @@ public void testReuseWithTaskSpecificLaunchCmdOption() throws IOException, Inter drainableAppCallback.drain(); verifyDeAllocateTask(taskScheduler, ta13, true, null, null); verify(taskSchedulerManager, times(0)).taskAllocated(eq(0), eq(ta14), any(Object.class), - eq(container2)); + eq(container2), eq(null)); verify(rmClient, times(1)).releaseAssignedContainer(eq(container2.getId())); eventHandler.verifyInvocation(AMContainerEventStopRequest.class); eventHandler.reset(); @@ -663,7 +663,7 @@ public void testReuseWithTaskSpecificLaunchCmdOption() throws IOException, Inter TestTaskSchedulerHelpers.waitForDelayedDrainNotify(drainNotifier); drainableAppCallback.drain(); verify(taskSchedulerManager).taskAllocated(eq(0), eq(ta15), any(Object.class), - eq(container3)); + eq(container3), eq(null)); //Ensure task 6 (of vertex 1) is allocated to same container taskSchedulerManager.handleEvent( @@ -671,7 +671,7 @@ public void testReuseWithTaskSpecificLaunchCmdOption() throws IOException, Inter null, 0)); drainableAppCallback.drain(); verifyDeAllocateTask(taskScheduler, ta15, true, null, null); - verify(taskSchedulerManager).taskAllocated(eq(0), eq(ta16), any(Object.class), eq(container3)); + verify(taskSchedulerManager).taskAllocated(eq(0), eq(ta16), any(Object.class), eq(container3), eq(null)); eventHandler.reset(); taskScheduler.shutdown(); @@ -760,7 +760,7 @@ public void testReuseNonLocalRequest() TestTaskSchedulerHelpers.waitForDelayedDrainNotify(drainNotifier); drainableAppCallback.drain(); verify(taskSchedulerManager).taskAllocated( - eq(0), eq(ta11), any(Object.class), eq(container1)); + eq(0), eq(ta11), any(Object.class), eq(container1), eq(null)); // Send launch request for task2 (vertex2) taskSchedulerManager.handleEvent(lrEvent12); @@ -774,7 +774,7 @@ public void testReuseNonLocalRequest() drainableAppCallback.drain(); verifyDeAllocateTask(taskScheduler, ta11, true, null, null); verify(taskSchedulerManager, times(0)).taskAllocated( - eq(0), eq(ta12), any(Object.class), eq(container1)); + eq(0), eq(ta12), any(Object.class), eq(container1), eq(null)); verify(rmClient, times(0)).releaseAssignedContainer(eq(container1.getId())); eventHandler.verifyNoInvocations(AMContainerEventStopRequest.class); eventHandler.reset(); @@ -782,7 +782,7 @@ public void testReuseNonLocalRequest() TestTaskSchedulerHelpers.waitForDelayedDrainNotify(drainNotifier); drainableAppCallback.drain(); verify(taskSchedulerManager).taskAllocated( - eq(0), eq(ta12), any(Object.class), eq(container1)); + eq(0), eq(ta12), any(Object.class), eq(container1), eq(null)); // TA12 completed. taskSchedulerManager.handleEvent( @@ -888,7 +888,7 @@ public void testReuseAcrossVertices() TestTaskSchedulerHelpers.waitForDelayedDrainNotify(drainNotifier); drainableAppCallback.drain(); verify(taskSchedulerManager).taskAllocated( - eq(0), eq(ta11), any(Object.class), eq(container1)); + eq(0), eq(ta11), any(Object.class), eq(container1), eq(null)); // Send launch request for task2 (vertex2) taskSchedulerManager.handleEvent(lrEvent21); @@ -901,7 +901,7 @@ public void testReuseAcrossVertices() drainableAppCallback.drain(); verifyDeAllocateTask(taskScheduler, ta11, true, null, null); verify(taskSchedulerManager).taskAllocated( - eq(0), eq(ta21), any(Object.class), eq(container1)); + eq(0), eq(ta21), any(Object.class), eq(container1), eq(null)); verify(rmClient, times(0)).releaseAssignedContainer(eq(container1.getId())); // Task 2 completes. @@ -1000,7 +1000,7 @@ public void testReuseLocalResourcesChanged() throws IOException, InterruptedExce taskScheduler.onContainersAllocated(Collections.singletonList(container1)); TestTaskSchedulerHelpers.waitForDelayedDrainNotify(drainNotifier); drainableAppCallback.drain(); - verify(taskSchedulerManager).taskAllocated(eq(0), eq(ta111), any(Object.class), eq(container1)); + verify(taskSchedulerManager).taskAllocated(eq(0), eq(ta111), any(Object.class), eq(container1), eq(null)); assignEvent = (AMContainerEventAssignTA) eventHandler.verifyInvocation(AMContainerEventAssignTA.class); assertEquals(1, assignEvent.getRemoteTaskLocalResources().size()); @@ -1010,7 +1010,7 @@ public void testReuseLocalResourcesChanged() throws IOException, InterruptedExce null, 0)); drainableAppCallback.drain(); verifyDeAllocateTask(taskScheduler, ta111, true, null, null); - verify(taskSchedulerManager).taskAllocated(eq(0), eq(ta112), any(Object.class), eq(container1)); + verify(taskSchedulerManager).taskAllocated(eq(0), eq(ta112), any(Object.class), eq(container1), eq(null)); verify(rmClient, times(0)).releaseAssignedContainer(eq(container1.getId())); eventHandler.verifyNoInvocations(AMContainerEventStopRequest.class); assignEvent = (AMContainerEventAssignTA) eventHandler.verifyInvocation(AMContainerEventAssignTA.class); @@ -1054,7 +1054,7 @@ public void testReuseLocalResourcesChanged() throws IOException, InterruptedExce TestTaskSchedulerHelpers.waitForDelayedDrainNotify(drainNotifier); drainableAppCallback.drain(); verify(taskSchedulerManager).taskAllocated(eq(0), eq(ta211), any(Object.class), - eq(container1)); + eq(container1), eq(null)); verify(rmClient, times(0)).releaseAssignedContainer(eq(container1.getId())); eventHandler.verifyNoInvocations(AMContainerEventStopRequest.class); assignEvent = (AMContainerEventAssignTA) eventHandler.verifyInvocation(AMContainerEventAssignTA.class); @@ -1066,7 +1066,7 @@ public void testReuseLocalResourcesChanged() throws IOException, InterruptedExce null, 0)); drainableAppCallback.drain(); verifyDeAllocateTask(taskScheduler, ta211, true, null, null); - verify(taskSchedulerManager).taskAllocated(eq(0), eq(ta212), any(Object.class), eq(container1)); + verify(taskSchedulerManager).taskAllocated(eq(0), eq(ta212), any(Object.class), eq(container1), eq(null)); verify(rmClient, times(0)).releaseAssignedContainer(eq(container1.getId())); eventHandler.verifyNoInvocations(AMContainerEventStopRequest.class); assignEvent = (AMContainerEventAssignTA) eventHandler.verifyInvocation(AMContainerEventAssignTA.class); @@ -1187,7 +1187,7 @@ public void testReuseConflictLocalResources() throws IOException, InterruptedExc taskScheduler.onContainersAllocated(Collections.singletonList(container1)); TestTaskSchedulerHelpers.waitForDelayedDrainNotify(drainNotifier); drainableAppCallback.drain(); - verify(taskSchedulerManager).taskAllocated(eq(0), eq(ta111), any(Object.class), eq(container1)); + verify(taskSchedulerManager).taskAllocated(eq(0), eq(ta111), any(Object.class), eq(container1), eq(null)); assignEvent = (AMContainerEventAssignTA) eventHandler.verifyInvocation(AMContainerEventAssignTA.class); assertEquals(1, assignEvent.getRemoteTaskLocalResources().size()); @@ -1197,7 +1197,7 @@ public void testReuseConflictLocalResources() throws IOException, InterruptedExc null, 0)); drainableAppCallback.drain(); verifyDeAllocateTask(taskScheduler, ta111, true, null, null); - verify(taskSchedulerManager).taskAllocated(eq(0), eq(ta112), any(Object.class), eq(container1)); + verify(taskSchedulerManager).taskAllocated(eq(0), eq(ta112), any(Object.class), eq(container1), eq(null)); verify(rmClient, times(0)).releaseAssignedContainer(eq(container1.getId())); eventHandler.verifyNoInvocations(AMContainerEventStopRequest.class); assignEvent = (AMContainerEventAssignTA) eventHandler.verifyInvocation(AMContainerEventAssignTA.class); @@ -1218,7 +1218,7 @@ public void testReuseConflictLocalResources() throws IOException, InterruptedExc taskSchedulerManager.handleEvent(lrEvent13); TestTaskSchedulerHelpers.waitForDelayedDrainNotify(drainNotifier); drainableAppCallback.drain(); - verify(taskSchedulerManager).taskAllocated(eq(0), eq(ta113), any(Object.class), eq(container1)); + verify(taskSchedulerManager).taskAllocated(eq(0), eq(ta113), any(Object.class), eq(container1), eq(null)); verify(rmClient, times(0)).releaseAssignedContainer(eq(container1.getId())); eventHandler.verifyNoInvocations(AMContainerEventStopRequest.class); eventHandler.reset(); @@ -1235,7 +1235,7 @@ public void testReuseConflictLocalResources() throws IOException, InterruptedExc taskSchedulerManager.handleEvent(lrEvent14); TestTaskSchedulerHelpers.waitForDelayedDrainNotify(drainNotifier); drainableAppCallback.drain(); - verify(taskSchedulerManager).taskAllocated(eq(0), eq(ta114), any(Object.class), eq(container1)); + verify(taskSchedulerManager).taskAllocated(eq(0), eq(ta114), any(Object.class), eq(container1), eq(null)); verify(rmClient, times(0)).releaseAssignedContainer(eq(container1.getId())); eventHandler.verifyNoInvocations(AMContainerEventStopRequest.class); eventHandler.reset(); @@ -1277,7 +1277,7 @@ public void testReuseConflictLocalResources() throws IOException, InterruptedExc taskScheduler.onContainersAllocated(Collections.singletonList(container2)); TestTaskSchedulerHelpers.waitForDelayedDrainNotify(drainNotifier); drainableAppCallback.drain(); - verify(taskSchedulerManager).taskAllocated(eq(0), eq(ta211), any(Object.class), eq(container2)); + verify(taskSchedulerManager).taskAllocated(eq(0), eq(ta211), any(Object.class), eq(container2), eq(null)); eventHandler.reset(); taskSchedulerManager.handleEvent(new AMSchedulerEventTAEnded(ta211, container2.getId(), @@ -1306,7 +1306,7 @@ public void testReuseConflictLocalResources() throws IOException, InterruptedExc taskSchedulerManager.handleEvent(lrEvent31); TestTaskSchedulerHelpers.waitForDelayedDrainNotify(drainNotifier); drainableAppCallback.drain(); - verify(taskSchedulerManager).taskAllocated(eq(0), eq(ta311), any(Object.class), eq(container2)); + verify(taskSchedulerManager).taskAllocated(eq(0), eq(ta311), any(Object.class), eq(container2), eq(null)); eventHandler.reset(); taskScheduler.shutdown(); @@ -1377,7 +1377,7 @@ public void testAssignmentOnShutdown() taskScheduler.onContainersAllocated(Collections.singletonList(container1)); drainableAppCallback.drain(); verify(taskSchedulerManager, times(0)).taskAllocated(eq(0), eq(ta11), - any(Object.class), eq(container1)); + any(Object.class), eq(container1), eq(null)); taskScheduler.shutdown(); taskSchedulerManager.close(); } @@ -1468,20 +1468,20 @@ eventHandler, rmClient, new AlwaysMatchesContainerMatcher(), taskScheduler.onContainersAllocated(Collections.singletonList(container1)); TestTaskSchedulerHelpers.waitForDelayedDrainNotify(drainNotifier); drainableAppCallback.drain(); - verify(taskSchedulerManager).taskAllocated(eq(0), eq(ta11), any(Object.class), eq(container1)); + verify(taskSchedulerManager).taskAllocated(eq(0), eq(ta11), any(Object.class), eq(container1), eq(null)); // Second container allocated, should start ta13 taskScheduler.onContainersAllocated(Collections.singletonList(container2)); TestTaskSchedulerHelpers.waitForDelayedDrainNotify(drainNotifier); drainableAppCallback.drain(); - verify(taskSchedulerManager).taskAllocated(eq(0), eq(ta13), any(Object.class), eq(container2)); + verify(taskSchedulerManager).taskAllocated(eq(0), eq(ta13), any(Object.class), eq(container2), eq(null)); // ta11 finished, should start ta12 taskSchedulerManager.handleEvent(new AMSchedulerEventTAEnded(ta11, container1.getId(), TaskAttemptState.SUCCEEDED, null, null, 0)); drainableAppCallback.drain(); verifyDeAllocateTask(taskScheduler, ta11, true, null, null); - verify(taskSchedulerManager).taskAllocated(eq(0), eq(ta12), any(Object.class), eq(container1)); + verify(taskSchedulerManager).taskAllocated(eq(0), eq(ta12), any(Object.class), eq(container1), eq(null)); verify(rmClient, times(0)).releaseAssignedContainer(eq(container1.getId())); eventHandler.verifyNoInvocations(AMContainerEventStopRequest.class); eventHandler.reset(); @@ -1491,7 +1491,7 @@ eventHandler, rmClient, new AlwaysMatchesContainerMatcher(), TaskAttemptState.SUCCEEDED, null, null, 0)); drainableAppCallback.drain(); verifyDeAllocateTask(taskScheduler, ta13, true, null, null); - verify(taskSchedulerManager).taskAllocated(eq(0), eq(ta14), any(Object.class), eq(container2)); + verify(taskSchedulerManager).taskAllocated(eq(0), eq(ta14), any(Object.class), eq(container2), eq(null)); verify(rmClient, times(0)).releaseAssignedContainer(eq(container2.getId())); eventHandler.verifyNoInvocations(AMContainerEventStopRequest.class); eventHandler.reset(); @@ -1610,14 +1610,14 @@ eventHandler, rmClient, new ContainerContextMatcher(), taskScheduler.onContainersAllocated(Collections.singletonList(container1)); TestTaskSchedulerHelpers.waitForDelayedDrainNotify(drainNotifier); drainableAppCallback.drain(); - verify(taskSchedulerManager).taskAllocated(eq(0), eq(ta11), any(Object.class), eq(container1)); + verify(taskSchedulerManager).taskAllocated(eq(0), eq(ta11), any(Object.class), eq(container1), eq(null)); // finish ta11, should start ta13 taskSchedulerManager.handleEvent(new AMSchedulerEventTAEnded(ta11, container1.getId(), TaskAttemptState.SUCCEEDED, null, null, 0)); drainableAppCallback.drain(); verifyDeAllocateTask(taskScheduler, ta11, true, null, null); - verify(taskSchedulerManager).taskAllocated(eq(0), eq(ta13), any(Object.class), eq(container1)); + verify(taskSchedulerManager).taskAllocated(eq(0), eq(ta13), any(Object.class), eq(container1), eq(null)); verify(rmClient, times(0)).releaseAssignedContainer(eq(container1.getId())); eventHandler.verifyNoInvocations(AMContainerEventStopRequest.class); eventHandler.reset(); @@ -1635,7 +1635,7 @@ eventHandler, rmClient, new ContainerContextMatcher(), taskScheduler.onContainersAllocated(Collections.singletonList(container2)); TestTaskSchedulerHelpers.waitForDelayedDrainNotify(drainNotifier); drainableAppCallback.drain(); - verify(taskSchedulerManager).taskAllocated(eq(0), eq(ta12), any(Object.class), eq(container2)); + verify(taskSchedulerManager).taskAllocated(eq(0), eq(ta12), any(Object.class), eq(container2), eq(null)); // ta12 finished, cannot reuse container, should release container2 taskSchedulerManager.handleEvent(new AMSchedulerEventTAEnded(ta12, container2.getId(), @@ -1650,7 +1650,7 @@ eventHandler, rmClient, new ContainerContextMatcher(), taskScheduler.onContainersAllocated(Collections.singletonList(container3)); TestTaskSchedulerHelpers.waitForDelayedDrainNotify(drainNotifier); drainableAppCallback.drain(); - verify(taskSchedulerManager).taskAllocated(eq(0), eq(ta14), any(Object.class), eq(container3)); + verify(taskSchedulerManager).taskAllocated(eq(0), eq(ta14), any(Object.class), eq(container3), eq(null)); // ta14 finished, should release container3 taskSchedulerManager.handleEvent(new AMSchedulerEventTAEnded(ta14, container3.getId(), diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskSchedulerHelpers.java b/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskSchedulerHelpers.java index b7acc6876c..5f38460ea5 100644 --- a/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskSchedulerHelpers.java +++ b/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskSchedulerHelpers.java @@ -261,6 +261,12 @@ public void taskAllocated(Object task, Object appCookie, Container container) { real.taskAllocated(task, appCookie, container); } + @Override + public void taskAllocated(Object task, Object appCookie, Container container, + Object taskSchedulingInfo) { + taskAllocated(task, appCookie, container); + } + @Override public void containerCompleted(Object taskLastAllocated, ContainerStatus containerStatus) { diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskSchedulerManager.java b/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskSchedulerManager.java index dcf9a5dd69..5001ed67c7 100644 --- a/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskSchedulerManager.java +++ b/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskSchedulerManager.java @@ -210,13 +210,14 @@ public void testSimpleAllocate() throws Exception { AMSchedulerEventTALaunchRequest lr = new AMSchedulerEventTALaunchRequest(mockAttemptId, resource, null, mockTaskAttempt, locHint, priority, containerContext, 0, 0, 0); - schedulerHandler.taskAllocated(0, mockTaskAttempt, lr, container); + schedulerHandler.taskAllocated(0, mockTaskAttempt, lr, container, "extraInfo"); assertEquals(1, mockEventHandler.events.size()); assertTrue(mockEventHandler.events.get(0) instanceof AMContainerEventAssignTA); AMContainerEventAssignTA assignEvent = (AMContainerEventAssignTA) mockEventHandler.events.get(0); assertEquals(priority, assignEvent.getPriority()); assertEquals(mockAttemptId, assignEvent.getTaskAttemptId()); + assertEquals("extraInfo", assignEvent.getTaskSchedulingInfo()); } @Test(timeout = 5000) @@ -250,7 +251,7 @@ public void testTASucceededAfterContainerCleanup() throws Exception { AMSchedulerEventTALaunchRequest lr = new AMSchedulerEventTALaunchRequest(mockAttemptId, resource, null, mockTaskAttempt, locHint, priority, containerContext, 0, 0, 0); - schedulerHandler.taskAllocated(0, mockTaskAttempt, lr, container); + schedulerHandler.taskAllocated(0, mockTaskAttempt, lr, container, null); assertEquals(1, mockEventHandler.events.size()); assertTrue(mockEventHandler.events.get(0) instanceof AMContainerEventAssignTA); AMContainerEventAssignTA assignEvent = diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/rm/container/TestAMContainer.java b/tez-dag/src/test/java/org/apache/tez/dag/app/rm/container/TestAMContainer.java index d3614d9ff2..ed914e9e00 100644 --- a/tez-dag/src/test/java/org/apache/tez/dag/app/rm/container/TestAMContainer.java +++ b/tez-dag/src/test/java/org/apache/tez/dag/app/rm/container/TestAMContainer.java @@ -132,7 +132,7 @@ public void tetSingleSuccessfulTaskFlow() { verify(wc.chh).register(wc.containerID); ArgumentCaptor argumentCaptor = ArgumentCaptor.forClass(AMContainerTask.class); verify(wc.tal, times(1)).registerTaskAttempt(argumentCaptor.capture(), eq(wc.containerID), - eq(0)); + eq(0), eq(null)); assertEquals(1, argumentCaptor.getAllValues().size()); assertEquals(wc.taskAttemptID, argumentCaptor.getAllValues().get(0).getTask().getTaskAttemptID()); assertEquals(WrappedContainer.taskPriority, argumentCaptor.getAllValues().get(0).getPriority()); @@ -186,7 +186,7 @@ public void testSingleSuccessfulTaskFlow2() { assertEquals(wc.taskAttemptID, wc.amContainer.getCurrentTaskAttempt()); ArgumentCaptor argumentCaptor = ArgumentCaptor.forClass(AMContainerTask.class); verify(wc.tal, times(1)).registerTaskAttempt(argumentCaptor.capture(), eq(wc.containerID), - eq(0)); + eq(0), eq(null)); assertEquals(1, argumentCaptor.getAllValues().size()); assertEquals(wc.taskAttemptID, argumentCaptor.getAllValues().get(0).getTask().getTaskAttemptID()); @@ -240,7 +240,7 @@ public void tetMultipleSuccessfulTaskFlow() { verify(wc.chh).register(wc.containerID); ArgumentCaptor argumentCaptor = ArgumentCaptor.forClass(AMContainerTask.class); verify(wc.tal, times(1)).registerTaskAttempt(argumentCaptor.capture(), eq(wc.containerID), - eq(0)); + eq(0), eq(null)); assertEquals(1, argumentCaptor.getAllValues().size()); assertEquals(wc.taskAttemptID, argumentCaptor.getAllValues().get(0).getTask().getTaskAttemptID()); @@ -257,7 +257,7 @@ public void tetMultipleSuccessfulTaskFlow() { wc.verifyState(AMContainerState.RUNNING); argumentCaptor = ArgumentCaptor.forClass(AMContainerTask.class); verify(wc.tal, times(2)).registerTaskAttempt(argumentCaptor.capture(), eq(wc.containerID), - eq(0)); + eq(0), eq(null)); assertEquals(2, argumentCaptor.getAllValues().size()); assertEquals(taId2, argumentCaptor.getAllValues().get(1).getTask().getTaskAttemptID()); @@ -1070,7 +1070,7 @@ public void testLocalResourceAddition() { wc.containerLaunched(); wc.assignTaskAttempt(wc.taskAttemptID); ArgumentCaptor argumentCaptor = ArgumentCaptor.forClass(AMContainerTask.class); - verify(wc.tal, times(1)).registerTaskAttempt(argumentCaptor.capture(), eq(wc.containerID), eq(0)); + verify(wc.tal, times(1)).registerTaskAttempt(argumentCaptor.capture(), eq(wc.containerID), eq(0), eq(null)); AMContainerTask task1 = argumentCaptor.getAllValues().get(0); assertEquals(0, task1.getAdditionalResources().size()); wc.taskAttemptSucceeded(wc.taskAttemptID); @@ -1081,9 +1081,9 @@ public void testLocalResourceAddition() { additionalResources.put(rsrc3, createLocalResource(rsrc3)); TezTaskAttemptID taID2 = TezTaskAttemptID.getInstance(wc.taskID, 2); - wc.assignTaskAttempt(taID2, additionalResources, new Credentials()); + wc.assignTaskAttempt(taID2, additionalResources, new Credentials(), null); argumentCaptor = ArgumentCaptor.forClass(AMContainerTask.class); - verify(wc.tal, times(2)).registerTaskAttempt(argumentCaptor.capture(), eq(wc.containerID), eq(0)); + verify(wc.tal, times(2)).registerTaskAttempt(argumentCaptor.capture(), eq(wc.containerID), eq(0), eq(null)); AMContainerTask task2 = argumentCaptor.getAllValues().get(1); Map pullTaskAdditionalResources = task2.getAdditionalResources(); assertEquals(2, pullTaskAdditionalResources.size()); @@ -1104,9 +1104,9 @@ public void testLocalResourceAddition() { // Try launching another task with the same reosurces as Task2. Verify the // task is not asked to re-localize again. TezTaskAttemptID taID3 = TezTaskAttemptID.getInstance(wc.taskID, 3); - wc.assignTaskAttempt(taID3, new HashMap(), new Credentials()); + wc.assignTaskAttempt(taID3, new HashMap(), new Credentials(), null); argumentCaptor = ArgumentCaptor.forClass(AMContainerTask.class); - verify(wc.tal, times(3)).registerTaskAttempt(argumentCaptor.capture(), eq(wc.containerID), eq(0)); + verify(wc.tal, times(3)).registerTaskAttempt(argumentCaptor.capture(), eq(wc.containerID), eq(0), eq(null)); AMContainerTask task3 = argumentCaptor.getAllValues().get(2); assertEquals(0, task3.getAdditionalResources().size()); wc.taskAttemptSucceeded(taID3); @@ -1157,18 +1157,18 @@ public void testCredentialsTransfer() { wc.launchContainer(new HashMap(), containerCredentials); wc.containerLaunched(); - wc.assignTaskAttempt(attempt11, LRs, dag1Credentials); + wc.assignTaskAttempt(attempt11, LRs, dag1Credentials, null); argumentCaptor = ArgumentCaptor.forClass(AMContainerTask.class); - verify(wc.tal, times(1)).registerTaskAttempt(argumentCaptor.capture(), eq(wc.containerID), eq(0)); + verify(wc.tal, times(1)).registerTaskAttempt(argumentCaptor.capture(), eq(wc.containerID), eq(0), eq(null)); fetchedTask = argumentCaptor.getAllValues().get(0); assertTrue(fetchedTask.haveCredentialsChanged()); assertNotNull(fetchedTask.getCredentials()); assertNotNull(fetchedTask.getCredentials().getToken(token1Name)); wc.taskAttemptSucceeded(attempt11); - wc.assignTaskAttempt(attempt12, LRs, dag1Credentials); + wc.assignTaskAttempt(attempt12, LRs, dag1Credentials, null); argumentCaptor = ArgumentCaptor.forClass(AMContainerTask.class); - verify(wc.tal, times(2)).registerTaskAttempt(argumentCaptor.capture(), eq(wc.containerID), eq(0)); + verify(wc.tal, times(2)).registerTaskAttempt(argumentCaptor.capture(), eq(wc.containerID), eq(0), eq(null)); fetchedTask = argumentCaptor.getAllValues().get(1); assertFalse(fetchedTask.haveCredentialsChanged()); assertNull(fetchedTask.getCredentials()); @@ -1176,17 +1176,17 @@ public void testCredentialsTransfer() { // Move to running a second DAG, with no credentials. wc.setNewDAGID(dagID2); - wc.assignTaskAttempt(attempt21, LRs, null); + wc.assignTaskAttempt(attempt21, LRs, null, null); argumentCaptor = ArgumentCaptor.forClass(AMContainerTask.class); - verify(wc.tal, times(3)).registerTaskAttempt(argumentCaptor.capture(), eq(wc.containerID), eq(0)); + verify(wc.tal, times(3)).registerTaskAttempt(argumentCaptor.capture(), eq(wc.containerID), eq(0), eq(null)); fetchedTask = argumentCaptor.getAllValues().get(2); assertTrue(fetchedTask.haveCredentialsChanged()); assertNull(fetchedTask.getCredentials()); wc.taskAttemptSucceeded(attempt21); - wc.assignTaskAttempt(attempt22, LRs, null); + wc.assignTaskAttempt(attempt22, LRs, null, null); argumentCaptor = ArgumentCaptor.forClass(AMContainerTask.class); - verify(wc.tal, times(4)).registerTaskAttempt(argumentCaptor.capture(), eq(wc.containerID), eq(0)); + verify(wc.tal, times(4)).registerTaskAttempt(argumentCaptor.capture(), eq(wc.containerID), eq(0), eq(null)); fetchedTask = argumentCaptor.getAllValues().get(3); assertFalse(fetchedTask.haveCredentialsChanged()); assertNull(fetchedTask.getCredentials()); @@ -1194,9 +1194,9 @@ public void testCredentialsTransfer() { // Move to running a third DAG, with Credentials this time wc.setNewDAGID(dagID3); - wc.assignTaskAttempt(attempt31, LRs , dag3Credentials); + wc.assignTaskAttempt(attempt31, LRs , dag3Credentials, null); argumentCaptor = ArgumentCaptor.forClass(AMContainerTask.class); - verify(wc.tal, times(5)).registerTaskAttempt(argumentCaptor.capture(), eq(wc.containerID), eq(0)); + verify(wc.tal, times(5)).registerTaskAttempt(argumentCaptor.capture(), eq(wc.containerID), eq(0), eq(null)); fetchedTask = argumentCaptor.getAllValues().get(4); assertTrue(fetchedTask.haveCredentialsChanged()); assertNotNull(fetchedTask.getCredentials()); @@ -1204,15 +1204,73 @@ public void testCredentialsTransfer() { assertNull(fetchedTask.getCredentials().getToken(token1Name)); wc.taskAttemptSucceeded(attempt31); - wc.assignTaskAttempt(attempt32, LRs, dag1Credentials); + wc.assignTaskAttempt(attempt32, LRs, dag1Credentials, null); argumentCaptor = ArgumentCaptor.forClass(AMContainerTask.class); - verify(wc.tal, times(6)).registerTaskAttempt(argumentCaptor.capture(), eq(wc.containerID), eq(0)); + verify(wc.tal, times(6)).registerTaskAttempt(argumentCaptor.capture(), eq(wc.containerID), eq(0), eq(null)); fetchedTask = argumentCaptor.getAllValues().get(5); assertFalse(fetchedTask.haveCredentialsChanged()); assertNull(fetchedTask.getCredentials()); wc.taskAttemptSucceeded(attempt32); } + @Test (timeout=5000) + public void testTaskSchedulingInfo() { + WrappedContainer wc = new WrappedContainer(); + + wc.verifyState(AMContainerState.ALLOCATED); + + // Launch request. + wc.launchContainer(); + wc.verifyState(AMContainerState.LAUNCHING); + // 1 Launch request. + wc.verifyCountAndGetOutgoingEvents(1); + verify(wc.tal).registerRunningContainer(wc.containerID, 0); + assertNull(wc.amContainer.getCurrentTaskAttempt()); + + // Assign task. + long currTime = wc.appContext.getClock().getTime(); + wc.assignTaskAttempt(wc.taskAttemptID, new HashMap(), new Credentials(), "extraSchedulingInfo"); + wc.verifyState(AMContainerState.LAUNCHING); + wc.verifyNoOutgoingEvents(); + assertEquals(wc.taskAttemptID, wc.amContainer.getCurrentTaskAttempt()); + assertTrue(wc.amContainer.getCurrentTaskAttemptAllocationTime() > 0); + assertTrue(wc.amContainer.getCurrentTaskAttemptAllocationTime() >= currTime); + + // Container Launched + wc.containerLaunched(); + wc.verifyState(AMContainerState.RUNNING); + wc.verifyNoOutgoingEvents(); + assertEquals(wc.taskAttemptID, wc.amContainer.getCurrentTaskAttempt()); + // Once for the previous NO_TASKS, one for the actual task. + verify(wc.chh).register(wc.containerID); + ArgumentCaptor argumentCaptor = ArgumentCaptor.forClass(AMContainerTask.class); + verify(wc.tal, times(1)).registerTaskAttempt(argumentCaptor.capture(), eq(wc.containerID), + eq(0), eq("extraSchedulingInfo")); + assertEquals(1, argumentCaptor.getAllValues().size()); + assertEquals(wc.taskAttemptID, argumentCaptor.getAllValues().get(0).getTask().getTaskAttemptID()); + assertEquals(WrappedContainer.taskPriority, argumentCaptor.getAllValues().get(0).getPriority()); + + // Attempt succeeded + wc.taskAttemptSucceeded(wc.taskAttemptID); + wc.verifyState(AMContainerState.IDLE); + wc.verifyNoOutgoingEvents(); + assertNull(wc.amContainer.getCurrentTaskAttempt()); + verifyUnregisterTaskAttempt(wc.tal, wc.taskAttemptID, 0, TaskAttemptEndReason.OTHER, null); + + // Container completed + wc.containerCompleted(); + wc.verifyHistoryStopEvent(); + wc.verifyState(AMContainerState.COMPLETED); + List outgoingEvents = wc.verifyCountAndGetOutgoingEvents(1); + verifyUnOrderedOutgoingEventTypes(outgoingEvents, + AMNodeEventType.N_CONTAINER_COMPLETED); + verifyUnregisterRunningContainer(wc.tal, wc.containerID, 0, ContainerEndReason.COMPLETED, null); + verify(wc.chh).unregister(wc.containerID); + + assertEquals(1, wc.amContainer.getAllTaskAttempts().size()); + assertFalse(wc.amContainer.isInErrorState()); + } + // TODO Verify diagnostics in most of the tests. static class WrappedContainer { @@ -1349,15 +1407,15 @@ public void launchContainer(Map localResources, Credentia } public void assignTaskAttempt(TezTaskAttemptID taID) { - assignTaskAttempt(taID, new HashMap(), new Credentials()); + assignTaskAttempt(taID, new HashMap(), new Credentials(), null); } public void assignTaskAttempt(TezTaskAttemptID taID, - Map additionalResources, Credentials credentials) { + Map additionalResources, Credentials credentials, Object taskSchedulingInfo) { reset(eventHandler); doReturn(taID).when(taskSpec).getTaskAttemptID(); amContainer.handle(new AMContainerEventAssignTA(containerID, taID, taskSpec, - additionalResources, credentials, taskPriority)); + additionalResources, credentials, taskPriority, taskSchedulingInfo)); } public void containerLaunched() { diff --git a/tez-ext-service-tests/src/test/java/org/apache/tez/dag/app/rm/TezTestServiceTaskSchedulerService.java b/tez-ext-service-tests/src/test/java/org/apache/tez/dag/app/rm/TezTestServiceTaskSchedulerService.java index d7d0f01126..9b976a0bf4 100644 --- a/tez-ext-service-tests/src/test/java/org/apache/tez/dag/app/rm/TezTestServiceTaskSchedulerService.java +++ b/tez-ext-service-tests/src/test/java/org/apache/tez/dag/app/rm/TezTestServiceTaskSchedulerService.java @@ -45,6 +45,12 @@ public class TezTestServiceTaskSchedulerService extends TaskScheduler { + /** + * This key is used to instruct scheduler to pass extra information to + * communicator along with task allocation + */ + public static final String TEST_PASS_CONTAINERID_WITH_ALLOCATION = "test.tez.pass.containerid.with.allocation"; + private static final Logger LOG = LoggerFactory.getLogger(TezTestServiceTaskSchedulerService.class); @@ -53,6 +59,7 @@ public class TezTestServiceTaskSchedulerService extends TaskScheduler { private final Random random = new Random(); // Currently all services must be running on the same port. private final int containerPort; + private final boolean passPerTaskInfo; private final ConcurrentMap runningTasks = new ConcurrentHashMap(); @@ -83,6 +90,7 @@ public TezTestServiceTaskSchedulerService(TaskSchedulerContext taskSchedulerCont } catch (IOException e) { throw new TezUncheckedException(e); } + this.passPerTaskInfo = conf.getBoolean(TEST_PASS_CONTAINERID_WITH_ALLOCATION, false); this.memoryPerInstance = conf .getInt(TezTestServiceConfConstants.TEZ_TEST_SERVICE_MEMORY_PER_INSTANCE_MB, -1); Preconditions.checkArgument(memoryPerInstance > 0, @@ -166,7 +174,11 @@ public void allocateTask(Object task, Resource capability, String[] hosts, Strin Container container = containerFactory.createContainer(resourcePerContainer, priority, host, containerPort); runningTasks.put(task, container.getId()); - getContext().taskAllocated(task, clientCookie, container); + if (passPerTaskInfo) { + getContext().taskAllocated(task, clientCookie, container, container.getId()); + } else { + getContext().taskAllocated(task, clientCookie, container); + } } @@ -177,7 +189,11 @@ public void allocateTask(Object task, Resource capability, ContainerId container Container container = containerFactory.createContainer(resourcePerContainer, priority, host, containerPort); runningTasks.put(task, container.getId()); - getContext().taskAllocated(task, clientCookie, container); + if (passPerTaskInfo) { + getContext().taskAllocated(task, clientCookie, container, container.getId()); + } else { + getContext().taskAllocated(task, clientCookie, container); + } } @Override diff --git a/tez-ext-service-tests/src/test/java/org/apache/tez/dag/app/taskcomm/TezTestServiceTaskCommunicatorImpl.java b/tez-ext-service-tests/src/test/java/org/apache/tez/dag/app/taskcomm/TezTestServiceTaskCommunicatorImpl.java index 377217b0cf..65b86b8317 100644 --- a/tez-ext-service-tests/src/test/java/org/apache/tez/dag/app/taskcomm/TezTestServiceTaskCommunicatorImpl.java +++ b/tez-ext-service-tests/src/test/java/org/apache/tez/dag/app/taskcomm/TezTestServiceTaskCommunicatorImpl.java @@ -29,8 +29,10 @@ import org.apache.hadoop.yarn.api.ApplicationConstants; import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.LocalResource; +import org.apache.tez.dag.app.rm.TezTestServiceTaskSchedulerService; import org.apache.tez.runtime.api.TaskFailureType; import org.apache.tez.serviceplugins.api.ContainerEndReason; +import org.apache.tez.serviceplugins.api.ServicePluginException; import org.apache.tez.serviceplugins.api.TaskAttemptEndReason; import org.apache.tez.serviceplugins.api.TaskCommunicatorContext; import org.apache.tez.dag.app.TezTaskCommunicatorImpl; @@ -53,12 +55,22 @@ public class TezTestServiceTaskCommunicatorImpl extends TezTaskCommunicatorImpl private final SubmitWorkRequestProto BASE_SUBMIT_WORK_REQUEST; private final ConcurrentMap credentialMap; + /** + * This is to test if scheduler is able to pass custom info to communicator. Tez + * does not make use of this mechanism by default. External implementations of + * scheduler and communicator may make use of this feature. + */ + private final boolean expectContainerIdAsExtraInfo; + public TezTestServiceTaskCommunicatorImpl( TaskCommunicatorContext taskCommunicatorContext) { super(taskCommunicatorContext); // TODO Maybe make this configurable this.communicator = new TezTestServiceCommunicator(3); + this.expectContainerIdAsExtraInfo = + conf.getBoolean(TezTestServiceTaskSchedulerService.TEST_PASS_CONTAINERID_WITH_ALLOCATION, false); + SubmitWorkRequestProto.Builder baseBuilder = SubmitWorkRequestProto.newBuilder(); baseBuilder.setUser(System.getProperty("user.name")); @@ -175,6 +187,18 @@ public void indicateError(Throwable t) { }); } + @Override + public void registerRunningTaskAttempt(ContainerId containerId, TaskSpec taskSpec, + Map additionalResources, Credentials credentials, boolean credentialsChanged, int priority, + Object taskSchedulerInfo) throws ServicePluginException { + if (expectContainerIdAsExtraInfo && !containerId.equals(taskSchedulerInfo)) { + throw new RuntimeException("Expected containerId " + containerId + + " as custom info from scheduler but got " + taskSchedulerInfo); + } + super.registerRunningTaskAttempt(containerId, taskSpec, additionalResources, credentials, credentialsChanged, + priority, taskSchedulerInfo); + } + @Override public void unregisterRunningTaskAttempt(TezTaskAttemptID taskAttemptID, TaskAttemptEndReason endReason, String diagnostics) { super.unregisterRunningTaskAttempt(taskAttemptID, endReason, diagnostics); diff --git a/tez-ext-service-tests/src/test/java/org/apache/tez/tests/TestCustomSchedulerCommunicator.java b/tez-ext-service-tests/src/test/java/org/apache/tez/tests/TestCustomSchedulerCommunicator.java new file mode 100644 index 0000000000..76b48b7d2e --- /dev/null +++ b/tez-ext-service-tests/src/test/java/org/apache/tez/tests/TestCustomSchedulerCommunicator.java @@ -0,0 +1,207 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.tez.tests; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.tez.client.TezClient; +import org.apache.tez.common.TezUtils; +import org.apache.tez.dag.api.TezConfiguration; +import org.apache.tez.dag.api.TezException; +import org.apache.tez.dag.api.UserPayload; +import org.apache.tez.dag.api.Vertex; +import org.apache.tez.dag.app.launcher.TezTestServiceNoOpContainerLauncher; +import org.apache.tez.dag.app.rm.TezTestServiceTaskSchedulerService; +import org.apache.tez.dag.app.taskcomm.TezTestServiceTaskCommunicatorImpl; +import org.apache.tez.examples.HashJoinExample; +import org.apache.tez.examples.JoinDataGen; +import org.apache.tez.examples.JoinValidateConfigured; +import org.apache.tez.service.MiniTezTestServiceCluster; +import org.apache.tez.serviceplugins.api.ContainerLauncherDescriptor; +import org.apache.tez.serviceplugins.api.ServicePluginsDescriptor; +import org.apache.tez.serviceplugins.api.TaskCommunicatorDescriptor; +import org.apache.tez.serviceplugins.api.TaskSchedulerDescriptor; +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.util.Map; + +import static org.junit.Assert.assertEquals; + +public class TestCustomSchedulerCommunicator { + + private static final Logger LOG = LoggerFactory.getLogger(TestCustomSchedulerCommunicator.class); + + + private static final String EXT_PUSH_ENTITY_NAME = "ExtServiceTestPush"; + + private static String TEST_ROOT_DIR = + "target" + Path.SEPARATOR + TestCustomSchedulerCommunicator.class.getName() + + "-tmpDir"; + + private static final Path SRC_DATA_DIR = new Path(TEST_ROOT_DIR + Path.SEPARATOR + "data"); + private static final Path HASH_JOIN_EXPECTED_RESULT_PATH = + new Path(SRC_DATA_DIR, "expectedOutputPath"); + private static final Path HASH_JOIN_OUTPUT_PATH = new Path(SRC_DATA_DIR, "outPath"); + + private static final Vertex.VertexExecutionContext EXECUTION_CONTEXT_EXT_SERVICE_PUSH = + Vertex.VertexExecutionContext.create( + EXT_PUSH_ENTITY_NAME, EXT_PUSH_ENTITY_NAME, EXT_PUSH_ENTITY_NAME); + + private static volatile Configuration clusterConf = new Configuration(); + private static volatile FileSystem localFs; + private static volatile MiniTezTestServiceCluster tezTestServiceCluster; + + private static volatile Configuration confForJobs; + + @BeforeClass + public static void setup() throws Exception { + + localFs = FileSystem.getLocal(clusterConf).getRaw(); + long jvmMax = Runtime.getRuntime().maxMemory(); + tezTestServiceCluster = MiniTezTestServiceCluster + .create(TestExternalTezServices.class.getSimpleName(), 3, ((long) (jvmMax * 0.5d)), 1); + tezTestServiceCluster.init(clusterConf); + tezTestServiceCluster.start(); + LOG.info("MiniTezTestServer started"); + + confForJobs = new Configuration(clusterConf); + for (Map.Entry entry : tezTestServiceCluster + .getClusterSpecificConfiguration()) { + confForJobs.set(entry.getKey(), entry.getValue()); + } + confForJobs.setBoolean(TezConfiguration.TEZ_LOCAL_MODE, true); + confForJobs.setBoolean(TezTestServiceTaskSchedulerService.TEST_PASS_CONTAINERID_WITH_ALLOCATION, true); + } + + @AfterClass + public static void tearDown() throws IOException, TezException { + if (tezTestServiceCluster != null) { + tezTestServiceCluster.stop(); + tezTestServiceCluster = null; + } + + Path testRootDirPath = new Path(TEST_ROOT_DIR); + testRootDirPath = localFs.makeQualified(testRootDirPath); + LOG.info("CLeaning up path: " + testRootDirPath); + localFs.delete(testRootDirPath, true); + } + + + @Test(timeout = 30000) + public void test1() throws Exception { + + UserPayload userPayload = TezUtils.createUserPayloadFromConf(confForJobs); + + TaskSchedulerDescriptor[] taskSchedulerDescriptors = new TaskSchedulerDescriptor[]{ + TaskSchedulerDescriptor + .create(EXT_PUSH_ENTITY_NAME, TezTestServiceTaskSchedulerService.class.getName()) + .setUserPayload(userPayload)}; + + ContainerLauncherDescriptor[] containerLauncherDescriptors = new ContainerLauncherDescriptor[]{ + ContainerLauncherDescriptor + .create(EXT_PUSH_ENTITY_NAME, TezTestServiceNoOpContainerLauncher.class.getName()) + .setUserPayload(userPayload)}; + + TaskCommunicatorDescriptor[] taskCommunicatorDescriptors = new TaskCommunicatorDescriptor[]{ + TaskCommunicatorDescriptor + .create(EXT_PUSH_ENTITY_NAME, TezTestServiceTaskCommunicatorImpl.class.getName()) + .setUserPayload(userPayload)}; + + ServicePluginsDescriptor servicePluginsDescriptor = ServicePluginsDescriptor.create(true, false, + taskSchedulerDescriptors, containerLauncherDescriptors, taskCommunicatorDescriptors); + + + TezConfiguration tezConf = new TezConfiguration(confForJobs); + + TezClient tezClient = TezClient.newBuilder("test1", tezConf).setIsSession(true) + .setServicePluginDescriptor(servicePluginsDescriptor).build(); + try { + tezClient.start(); + + + Path dataPath1 = new Path(SRC_DATA_DIR, "inPath1"); + Path dataPath2 = new Path(SRC_DATA_DIR, "inPath2"); + + Path expectedResultPath = new Path(SRC_DATA_DIR, "expectedOutputPath"); + + + JoinDataGen dataGen = new JoinDataGen(); + String[] dataGenArgs = new String[]{ + dataPath1.toString(), "1048576", dataPath2.toString(), "524288", + expectedResultPath.toString(), "2"}; + + assertEquals(0, dataGen.run(tezConf, dataGenArgs, tezClient)); + + Path outputPath = new Path(SRC_DATA_DIR, "outPath"); + HashJoinExample joinExample = new HashJoinExample(); + String[] args = new String[]{ + dataPath1.toString(), dataPath2.toString(), "2", outputPath.toString()}; + assertEquals(0, joinExample.run(tezConf, args, tezClient)); + LOG.info("Completed generating Data - Expected Hash Result and Actual Join Result"); + + assertEquals(0, tezTestServiceCluster.getNumSubmissions()); + + // ext can consume from ext. + runJoinValidate(tezClient, "allInExt", 7, EXECUTION_CONTEXT_EXT_SERVICE_PUSH, + EXECUTION_CONTEXT_EXT_SERVICE_PUSH, EXECUTION_CONTEXT_EXT_SERVICE_PUSH); + LOG.info("Completed allInExt"); + + // uber can consume from uber. + runJoinValidate(tezClient, "noneInExt", 0, null, null, null); + LOG.info("Completed noneInExt"); + + // uber can consume from ext + runJoinValidate(tezClient, "lhsInExt", 2, EXECUTION_CONTEXT_EXT_SERVICE_PUSH, null, null); + LOG.info("Completed lhsInExt"); + + // ext cannot consume from uber in this mode since there's no shuffle handler working, + // and the local data transfer semantics may not match. + + } finally { + tezClient.stop(); + } + + } + + private void runJoinValidate(TezClient tezClient, String name, int extExpectedCount, + Vertex.VertexExecutionContext lhsContext, + Vertex.VertexExecutionContext rhsContext, + Vertex.VertexExecutionContext validateContext) throws + Exception { + int externalSubmissionCount = tezTestServiceCluster.getNumSubmissions(); + + TezConfiguration tezConf = new TezConfiguration(confForJobs); + JoinValidateConfigured joinValidate = + new JoinValidateConfigured(null, lhsContext, rhsContext, + validateContext, name); + String[] validateArgs = new String[]{"-disableSplitGrouping", + HASH_JOIN_EXPECTED_RESULT_PATH.toString(), HASH_JOIN_OUTPUT_PATH.toString(), "3"}; + assertEquals(0, joinValidate.run(tezConf, validateArgs, tezClient)); + + // Ensure this was actually submitted to the external cluster + assertEquals(extExpectedCount, + (tezTestServiceCluster.getNumSubmissions() - externalSubmissionCount)); + } +}