Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -379,27 +379,38 @@ class EventProcessor implements Runnable {

@Override
public void run() {
LOG.info("Processing the event " + event.toString());
LOG.info("Processing the event {}", event);

// Load ContainerManager tokens before creating a connection.
// TODO: Do it only once per NodeManager.
ContainerId containerID = event.getContainerID();

Container c = getContainer(event);
switch(event.getType()) {

case CONTAINER_REMOTE_LAUNCH:
ContainerRemoteLaunchEvent launchEvent
= (ContainerRemoteLaunchEvent) event;
c.launch(launchEvent);
getContainer(event).launch(launchEvent);
break;

case CONTAINER_REMOTE_CLEANUP:
c.kill(event.getDumpContainerThreads());
// If the container failed to launch earlier (due to dead node for example),
// it has been marked as FAILED and removed from containers during
// CONTAINER_REMOTE_LAUNCH event handling.
// Skip kill() such container during CONTAINER_REMOTE_CLEANUP as
// it is not necessary and could cost 15 minutes delay if the node is dead.
if (!containers.containsKey(containerID)) {
LOG.info("Skip cleanup of already-removed container {}", containerID);
// send killed event to task attempt regardless like in kill().
context.getEventHandler().handle(new TaskAttemptEvent(event.getTaskAttemptID(),
TaskAttemptEventType.TA_CONTAINER_CLEANED));
return;
}
getContainer(event).kill(event.getDumpContainerThreads());
break;

case CONTAINER_COMPLETED:
c.done();
getContainer(event).done();
break;

}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -209,14 +209,11 @@ public void testHandle() throws Exception {
ut.waitForPoolToIdle();

verify(mockCM).startContainers(any(StartContainersRequest.class));

LOG.info("inserting cleanup event");
ContainerLauncherEvent mockCleanupEvent =
mock(ContainerLauncherEvent.class);
when(mockCleanupEvent.getType())
.thenReturn(EventType.CONTAINER_REMOTE_CLEANUP);
when(mockCleanupEvent.getContainerID())
.thenReturn(contId);
ContainerLauncherEvent mockCleanupEvent = mock(ContainerLauncherEvent.class);
when(mockCleanupEvent.getType()).thenReturn(EventType.CONTAINER_REMOTE_CLEANUP);
when(mockCleanupEvent.getContainerID()).thenReturn(contId);
when(mockCleanupEvent.getTaskAttemptID()).thenReturn(taskAttemptId);
when(mockCleanupEvent.getContainerMgrAddress()).thenReturn(cmAddress);
ut.handle(mockCleanupEvent);
Expand Down Expand Up @@ -283,8 +280,21 @@ public void testOutOfOrder() throws Exception {
ut.handle(mockLaunchEvent);

ut.waitForPoolToIdle();

verify(mockCM, never()).startContainers(any(StartContainersRequest.class));

verify(mockCM).startContainers(any(StartContainersRequest.class));

LOG.info("inserting cleanup event");
ContainerLauncherEvent mockCleanupEvent2 = mock(ContainerLauncherEvent.class);
when(mockCleanupEvent2.getType()).thenReturn(EventType.CONTAINER_REMOTE_CLEANUP);
when(mockCleanupEvent2.getContainerID()).thenReturn(contId);
when(mockCleanupEvent2.getTaskAttemptID()).thenReturn(taskAttemptId);
when(mockCleanupEvent2.getContainerMgrAddress()).thenReturn(cmAddress);
ut.handle(mockCleanupEvent2);

ut.waitForPoolToIdle();

// Verifies stopContainers is called on existing container
verify(mockCM).stopContainers(any(StopContainersRequest.class));
} finally {
ut.stop();
}
Expand Down