Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -1187,10 +1187,7 @@ public ContainerState transition(ContainerImpl container,
if (container.recoveredStatus == RecoveredContainerStatus.COMPLETED) {
container.sendFinishedEvents();
return ContainerState.DONE;
} else if (container.recoveredStatus == RecoveredContainerStatus.QUEUED) {
return ContainerState.SCHEDULED;
} else if (container.recoveredAsKilled &&
container.recoveredStatus == RecoveredContainerStatus.REQUESTED) {
} else if (isContainerRecoveredAsKilled(container)) {
// container was killed but never launched
container.metrics.killedContainer();
NMAuditLogger.logSuccess(container.user,
Expand All @@ -1201,6 +1198,8 @@ public ContainerState transition(ContainerImpl container,
container.containerTokenIdentifier.getResource());
container.sendFinishedEvents();
return ContainerState.DONE;
} else if (container.recoveredStatus == RecoveredContainerStatus.QUEUED) {
return ContainerState.SCHEDULED;
}

final ContainerLaunchContext ctxt = container.launchContext;
Expand Down Expand Up @@ -1264,6 +1263,16 @@ public ContainerState transition(ContainerImpl container,
return ContainerState.LOCALIZATION_FAILED;
}
}

static boolean isContainerRecoveredAsKilled(ContainerImpl container) {
if (!container.recoveredAsKilled) {
return false;
}
// container was killed but never launched
RecoveredContainerStatus containerStatus = container.recoveredStatus;
return containerStatus == RecoveredContainerStatus.REQUESTED
|| containerStatus == RecoveredContainerStatus.QUEUED;
}
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentMap;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileContext;
Expand Down Expand Up @@ -682,7 +683,49 @@ public void testContainerCleanupOnShutdown() throws Exception {
verify(cm, never()).handle(isA(CMgrCompletedAppsEvent.class));
}

private void commonLaunchContainer(ApplicationId appId, ContainerId cid,
@Test
public void testKilledContainerInQueuedStateRecovery() throws Exception {
conf.setBoolean(YarnConfiguration.NM_RECOVERY_ENABLED, true);
conf.setBoolean(YarnConfiguration.NM_RECOVERY_SUPERVISED, true);
NMStateStoreService stateStore = new NMMemoryStateStoreService();
stateStore.init(conf);
stateStore.start();
context = createContext(conf, stateStore);
ContainerManagerImpl cm = createContainerManager(context, delSrvc);
((NMContext) context).setContainerManager(cm);
cm.init(conf);
cm.start();

// add an application by starting a container
ApplicationId appId = ApplicationId.newInstance(0, 0);
ApplicationAttemptId attemptId =
ApplicationAttemptId.newInstance(appId, 1);
ContainerId cid = ContainerId.newContainerId(attemptId, 1);
createStartContainerRequest(appId, cid, cm);

Application app = context.getApplications().get(appId);
assertEquals(1, context.getApplications().size());
assertNotNull(app);

stateStore.storeContainerKilled(cid);
// restart and verify container scheduler has recovered correctly
cm.stop();
context = createContext(conf, stateStore);
cm = createContainerManager(context, delSrvc);
((NMContext) context).setContainerManager(cm);
cm.init(conf);
cm.start();
assertEquals(1, context.getApplications().size());

ConcurrentMap<ContainerId, Container> containers = context.getContainers();
Container c = containers.get(cid);
assertEquals(ContainerState.DONE, c.getContainerState());
app = context.getApplications().get(appId);
assertNotNull(app);
cm.stop();
}

private void createStartContainerRequest(ApplicationId appId, ContainerId cid,
ContainerManagerImpl cm) throws Exception {
Map<String, String> containerEnv = new HashMap<>();
setFlowContext(containerEnv, "app_name1", appId);
Expand Down Expand Up @@ -727,6 +770,11 @@ private void commonLaunchContainer(ApplicationId appId, ContainerId cid,
context, cm, cid, clc, null, ContainerType.TASK);
assertTrue(startResponse.getFailedRequests().isEmpty());
assertEquals(1, context.getApplications().size());
}

private void commonLaunchContainer(ApplicationId appId, ContainerId cid,
ContainerManagerImpl cm) throws Exception {
createStartContainerRequest(appId, cid, cm);
// make sure the container reaches RUNNING state
waitForNMContainerState(cm, cid,
org.apache.hadoop.yarn.server.nodemanager
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -168,6 +168,7 @@ public synchronized void storeContainer(ContainerId containerId,
int version, long startTime, StartContainerRequest startRequest) {
RecoveredContainerState rcs = new RecoveredContainerState(containerId);
rcs.startRequest = startRequest;
rcs.status = RecoveredContainerStatus.REQUESTED;
rcs.version = version;
try {
ContainerTokenIdentifier containerTokenIdentifier = BuilderUtils
Expand Down