Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,19 @@ void taskAllocated(Object task,
Object appCookie,
Container container);

/**
* Indicate to the framework that a container is being assigned to a task.
*
* @param task the task for which a container is being assigned. This should be the same
* instance that was provided when requesting for an allocation
* @param appCookie the cookie which was provided while requesting allocation for this task
* @param container the actual container assigned to the task
* @param taskSchedulingInfo task specific scheduling information to be passed to communicator
*/
void taskAllocated(Object task,
Object appCookie,
Container container, Object taskSchedulingInfo);


/**
* Indicate to the framework that a container has completed. This is typically used by sources
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

Expand Down Expand Up @@ -538,7 +539,8 @@ public void unregisterRunningContainer(ContainerId containerId, int taskCommId,

@Override
public void registerTaskAttempt(AMContainerTask amContainerTask,
ContainerId containerId, int taskCommId) {
ContainerId containerId, int taskCommId,
Object taskSchedulerInfo) {
ContainerInfo containerInfo = registeredContainers.get(containerId);
if (containerInfo == null) {
throw new TezUncheckedException("Registering task attempt: "
Expand All @@ -564,7 +566,7 @@ public void registerTaskAttempt(AMContainerTask amContainerTask,
try {
taskCommunicators[taskCommId].registerRunningTaskAttempt(containerId, amContainerTask.getTask(),
amContainerTask.getAdditionalResources(), amContainerTask.getCredentials(),
amContainerTask.haveCredentialsChanged(), amContainerTask.getPriority());
amContainerTask.haveCredentialsChanged(), amContainerTask.getPriority(), taskSchedulerInfo);
} catch (Exception e) {
String msg = "Error in TaskCommunicator when registering Task Attempt"
+ ", communicator=" + Utils.getTaskCommIdentifierString(taskCommId, context)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,14 +27,17 @@
import org.apache.tez.dag.app.dag.DAG;
import org.apache.tez.dag.app.rm.container.AMContainerTask;
import org.apache.tez.dag.records.TezTaskAttemptID;

import java.util.Map;

/**
* This class listens for changes to the state of a Task.
*/
public interface TaskCommunicatorManagerInterface {

void registerRunningContainer(ContainerId containerId, int taskCommId);

void registerTaskAttempt(AMContainerTask amContainerTask, ContainerId containerId, int taskCommId);
void registerTaskAttempt(AMContainerTask amContainerTask, ContainerId containerId, int taskCommId, Object taskSchedulerInfo);

void unregisterRunningContainer(ContainerId containerId, int taskCommId, ContainerEndReason endReason, String diagnostics);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,8 +52,9 @@ public void registerContainerEnd(ContainerId containerId, ContainerEndReason end
public void registerRunningTaskAttempt(ContainerId containerId, TaskSpec taskSpec,
Map<String, LocalResource> additionalResources,
Credentials credentials, boolean credentialsChanged,
int priority) throws Exception {
real.registerRunningTaskAttempt(containerId, taskSpec, additionalResources, credentials, credentialsChanged, priority);
int priority, Object taskSchedulerInfo) throws Exception {
real.registerRunningTaskAttempt(containerId, taskSpec, additionalResources, credentials, credentialsChanged,
priority, taskSchedulerInfo);
}

public void unregisterRunningTaskAttempt(TezTaskAttemptID taskAttemptID,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@

import javax.annotation.Nullable;
import java.nio.ByteBuffer;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
Expand Down Expand Up @@ -66,7 +68,12 @@ public TaskSchedulerContextImpl(TaskSchedulerManager taskSchedulerManager, AppCo
// taskAllocated() upcall and deallocateTask() downcall
@Override
public void taskAllocated(Object task, Object appCookie, Container container) {
taskSchedulerManager.taskAllocated(schedulerId, task, appCookie, container);
taskAllocated(task, appCookie, container, null);
}

@Override
public void taskAllocated(Object task, Object appCookie, Container container, Object taskSchedulingInfo) {
taskSchedulerManager.taskAllocated(schedulerId, task, appCookie, container, taskSchedulingInfo);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,13 @@ public void taskAllocated(Object task, Object appCookie, Container container) {
container));
}

@Override
public void taskAllocated(Object task, Object appCookie, Container container,
Object taskSchedulingInfo) {
executorService.submit(new TaskAllocatedCallableWithCustomInfo(real, task, appCookie,
container, taskSchedulingInfo));
}

@Override
public void containerCompleted(Object taskLastAllocated,
ContainerStatus containerStatus) {
Expand Down Expand Up @@ -226,6 +233,29 @@ public Void call() throws Exception {
}
}

static class TaskAllocatedCallableWithCustomInfo extends TaskSchedulerContextCallbackBase
implements Callable<Void> {
private final Object task;
private final Object appCookie;
private final Container container;
private final Object taskSchedulingInfo;

public TaskAllocatedCallableWithCustomInfo(TaskSchedulerContext app, Object task,
Object appCookie, Container container, Object taskSchedulingInfo) {
super(app);
this.task = task;
this.appCookie = appCookie;
this.container = container;
this.taskSchedulingInfo = taskSchedulingInfo;
}

@Override
public Void call() throws Exception {
app.taskAllocated(task, appCookie, container, taskSchedulingInfo);
return null;
}
}

static class ContainerCompletedCallable extends TaskSchedulerContextCallbackBase
implements Callable<Void> {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -735,7 +735,7 @@ public void serviceStop() throws InterruptedException {
// TaskSchedulerAppCallback methods with schedulerId, where relevant
public synchronized void taskAllocated(int schedulerId, Object task,
Object appCookie,
Container container) {
Container container, Object taskSchedulingInfo) {
AMSchedulerEventTALaunchRequest event =
(AMSchedulerEventTALaunchRequest) appCookie;
ContainerId containerId = container.getId();
Expand Down Expand Up @@ -765,7 +765,7 @@ public synchronized void taskAllocated(int schedulerId, Object task,
}
sendEvent(new AMContainerEventAssignTA(containerId, taskAttempt.getID(),
event.getRemoteTaskSpec(), event.getContainerContext().getLocalResources(), event
.getContainerContext().getCredentials(), event.getPriority()));
.getContainerContext().getCredentials(), event.getPriority(), taskSchedulingInfo));
}

public synchronized void containerCompleted(int schedulerId, Object task, ContainerStatus containerStatus) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,16 +33,18 @@ public class AMContainerEventAssignTA extends AMContainerEvent {
private final Map<String, LocalResource> taskLocalResources;
private final Credentials credentials;
private final int priority;
private final Object taskSchedulingInfo;

public AMContainerEventAssignTA(ContainerId containerId, TezTaskAttemptID attemptId,
Object remoteTaskSpec, Map<String, LocalResource> taskLocalResources, Credentials credentials,
int priority) {
int priority, Object taskSchedulingInfo) {
super(containerId, AMContainerEventType.C_ASSIGN_TA);
this.attemptId = attemptId;
this.remoteTaskSpec = (TaskSpec) remoteTaskSpec;
this.taskLocalResources = taskLocalResources;
this.credentials = credentials;
this.priority = priority;
this.taskSchedulingInfo = taskSchedulingInfo;
}

public TaskSpec getRemoteTaskSpec() {
Expand All @@ -64,4 +66,8 @@ public Credentials getCredentials() {
public int getPriority() {
return priority;
}

public Object getTaskSchedulingInfo() {
return taskSchedulingInfo;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -653,7 +653,7 @@ public AMContainerState transition(
event.getRemoteTaskSpec(), container.additionalLocalResources,
container.credentialsChanged ? container.credentials : null, container.credentialsChanged,
event.getPriority());
container.registerAttemptWithListener(amContainerTask);
container.registerAttemptWithListener(amContainerTask, event.getTaskSchedulingInfo());
container.additionalLocalResources = null;
container.credentialsChanged = false;
if (container.getState() == AMContainerState.IDLE) {
Expand Down Expand Up @@ -1180,8 +1180,8 @@ protected void unregisterAttemptFromListener(TezTaskAttemptID attemptId, TaskAtt
taskCommunicatorManagerInterface.unregisterTaskAttempt(attemptId, taskCommId, endReason, diagnostics);
}

protected void registerAttemptWithListener(AMContainerTask amContainerTask) {
taskCommunicatorManagerInterface.registerTaskAttempt(amContainerTask, this.containerId, taskCommId);
protected void registerAttemptWithListener(AMContainerTask amContainerTask, Object taskSchedulingInfo) {
taskCommunicatorManagerInterface.registerTaskAttempt(amContainerTask, this.containerId, taskCommId, taskSchedulingInfo);
}

protected void registerWithTAListener() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -147,6 +147,33 @@ public abstract void registerRunningTaskAttempt(ContainerId containerId, TaskSpe
boolean credentialsChanged, int priority) throws
ServicePluginException;

/**
* Register a task attempt to execute on a container. Communicator implementation must override
* this method rather than {@link #registerRunningTaskAttempt(ContainerId, TaskSpec, Map, Credentials, boolean, int)}
* if scheduler attaches custom info to tasks.
*
* @param containerId the containerId on which this task needs to run
* @param taskSpec the task specifications for the task to be executed
* @param additionalResources additional local resources which may be required to run this task
* on
* the container
* @param credentials the credentials required to run this task
* @param credentialsChanged whether the credentials are different from the original credentials
* associated with this container
* @param priority the priority of the task being executed
* @param taskSchedulerInfo task specific info created by scheduler
* @throws ServicePluginException when the service runs into a fatal error which it cannot handle.
* This will cause the app to shutdown.
*/
public void registerRunningTaskAttempt(ContainerId containerId, TaskSpec taskSpec,
Map<String, LocalResource> additionalResources,
Credentials credentials,
boolean credentialsChanged, int priority,
Object taskSchedulerInfo) throws ServicePluginException {
registerRunningTaskAttempt(containerId, taskSpec, additionalResources, credentials, credentialsChanged,
priority);
}

/**
* Register the completion of a task. This may be a result of preemption, the container dying,
* the node dying, the task completing to success
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -185,7 +185,7 @@ public void testGetTask() throws IOException {
assertNull(containerTask);

// Valid task registered
taskAttemptListener.registerTaskAttempt(amContainerTask, containerId2, 0);
taskAttemptListener.registerTaskAttempt(amContainerTask, containerId2, 0, null);
containerTask = tezUmbilical.getTask(containerContext2);
assertFalse(containerTask.shouldDie());
assertEquals(taskSpec, containerTask.getTaskSpec());
Expand All @@ -209,7 +209,7 @@ public void testGetTask() throws IOException {
TezTaskAttemptID taskAttemptId2 = mock(TezTaskAttemptID.class);
doReturn(taskAttemptId2).when(taskSpec2).getTaskAttemptID();
AMContainerTask amContainerTask2 = new AMContainerTask(taskSpec, null, null, false, 0);
taskAttemptListener.registerTaskAttempt(amContainerTask2, containerId3, 0);
taskAttemptListener.registerTaskAttempt(amContainerTask2, containerId3, 0, null);
taskAttemptListener.unregisterRunningContainer(containerId3, 0, ContainerEndReason.OTHER, null);
containerTask = tezUmbilical.getTask(containerContext3);
assertTrue(containerTask.shouldDie());
Expand All @@ -229,7 +229,7 @@ public void testGetTaskMultiplePulls() throws IOException {
assertNull(containerTask);

// Register task
taskAttemptListener.registerTaskAttempt(amContainerTask, containerId1, 0);
taskAttemptListener.registerTaskAttempt(amContainerTask, containerId1, 0, null);
containerTask = tezUmbilical.getTask(containerContext1);
assertFalse(containerTask.shouldDie());
assertEquals(taskSpec, containerTask.getTaskSpec());
Expand Down Expand Up @@ -423,7 +423,7 @@ private TaskHeartbeatResponse generateHeartbeat(List<TezEvent> events,
doReturn(eventInfo).when(vertex).getTaskAttemptTezEvents(taskAttemptID, fromEventId, 0, maxEvents);

taskAttemptListener.registerRunningContainer(containerId, 0);
taskAttemptListener.registerTaskAttempt(amContainerTask, containerId, 0);
taskAttemptListener.registerTaskAttempt(amContainerTask, containerId, 0, null);

TaskHeartbeatRequest request = mock(TaskHeartbeatRequest.class);
doReturn(containerId.toString()).when(request).getContainerIdentifier();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -292,7 +292,7 @@ private void registerRunningContainer(ContainerId containerId) {
}

private void registerTaskAttempt(ContainerId containerId, AMContainerTask amContainerTask) {
taskCommunicatorManager.registerTaskAttempt(amContainerTask, containerId, 0);
taskCommunicatorManager.registerTaskAttempt(amContainerTask, containerId, 0, null);
}

private TaskSpec createTaskSpec() {
Expand Down
Loading