Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,8 @@ public final class RouterMetrics {
private MutableGaugeInt numUpdateAppPriorityFailedRetrieved;
@Metric("# of updateApplicationPriority failed to be retrieved")
private MutableGaugeInt numUpdateAppTimeoutsFailedRetrieved;
@Metric("# of signalToContainer failed to be retrieved")
private MutableGaugeInt numSignalToContainerFailedRetrieved;

// Aggregate metrics are shared, and don't have to be looked up per call
@Metric("Total number of successful Submitted apps and latency(ms)")
Expand Down Expand Up @@ -126,6 +128,8 @@ public final class RouterMetrics {
private MutableRate totalSucceededUpdateAppPriorityRetrieved;
@Metric("Total number of successful Retrieved updateApplicationTimeouts and latency(ms)")
private MutableRate totalSucceededUpdateAppTimeoutsRetrieved;
@Metric("Total number of successful Retrieved signalToContainer and latency(ms)")
private MutableRate totalSucceededSignalToContainerRetrieved;

/**
* Provide quantile counters for all latencies.
Expand All @@ -150,6 +154,7 @@ public final class RouterMetrics {
private MutableQuantiles failAppAttemptLatency;
private MutableQuantiles updateAppPriorityLatency;
private MutableQuantiles updateAppTimeoutsLatency;
private MutableQuantiles signalToContainerLatency;

private static volatile RouterMetrics instance = null;
private static MetricsRegistry registry;
Expand Down Expand Up @@ -228,6 +233,10 @@ private RouterMetrics() {
updateAppTimeoutsLatency =
registry.newQuantiles("updateApplicationTimeoutsLatency",
"latency of update application timeouts", "ops", "latency", 10);

signalToContainerLatency =
registry.newQuantiles("signalToContainerLatency",
"latency of signal to container timeouts", "ops", "latency", 10);
}

public static RouterMetrics getMetrics() {
Expand Down Expand Up @@ -349,6 +358,11 @@ public long getNumSucceededUpdateAppTimeoutsRetrieved() {
return totalSucceededUpdateAppTimeoutsRetrieved.lastStat().numSamples();
}

@VisibleForTesting
public long getNumSucceededSignalToContainerRetrieved() {
return totalSucceededSignalToContainerRetrieved.lastStat().numSamples();
}

@VisibleForTesting
public double getLatencySucceededAppsCreated() {
return totalSucceededAppsCreated.lastStat().mean();
Expand Down Expand Up @@ -449,6 +463,11 @@ public double getLatencySucceededUpdateAppTimeoutsRetrieved() {
return totalSucceededUpdateAppTimeoutsRetrieved.lastStat().mean();
}

@VisibleForTesting
public double getLatencySucceededSignalToContainerRetrieved() {
return totalSucceededSignalToContainerRetrieved.lastStat().mean();
}

@VisibleForTesting
public int getAppsFailedCreated() {
return numAppsFailedCreated.value();
Expand Down Expand Up @@ -549,6 +568,11 @@ public int getUpdateApplicationTimeoutsFailedRetrieved() {
return numUpdateAppTimeoutsFailedRetrieved.value();
}

@VisibleForTesting
public int getSignalToContainerFailedRetrieved() {
return numSignalToContainerFailedRetrieved.value();
}

public void succeededAppsCreated(long duration) {
totalSucceededAppsCreated.add(duration);
getNewApplicationLatency.add(duration);
Expand Down Expand Up @@ -649,6 +673,11 @@ public void succeededUpdateAppTimeoutsRetrieved(long duration) {
updateAppTimeoutsLatency.add(duration);
}

public void succeededSignalToContainerRetrieved(long duration) {
totalSucceededSignalToContainerRetrieved.add(duration);
signalToContainerLatency.add(duration);
}

public void incrAppsFailedCreated() {
numAppsFailedCreated.incr();
}
Expand Down Expand Up @@ -728,4 +757,8 @@ public void incrUpdateAppPriorityFailedRetrieved() {
public void incrUpdateApplicationTimeoutsRetrieved() {
numUpdateAppTimeoutsFailedRetrieved.incr();
}

public void incrSignalToContainerFailedRetrieved() {
numSignalToContainerFailedRetrieved.incr();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -1304,7 +1304,43 @@ public UpdateApplicationPriorityResponse updateApplicationPriority(
@Override
public SignalContainerResponse signalToContainer(
SignalContainerRequest request) throws YarnException, IOException {
throw new NotImplementedException("Code is not implemented");
if (request == null || request.getContainerId() == null
|| request.getCommand() == null) {
routerMetrics.incrSignalToContainerFailedRetrieved();
RouterServerUtil.logAndThrowException(
"Missing signalToContainer request or containerId " +
"or command information.", null);
}

long startTime = clock.getTime();
SubClusterId subClusterId = null;
ApplicationId applicationId =
request.getContainerId().getApplicationAttemptId().getApplicationId();
try {
subClusterId = getApplicationHomeSubCluster(applicationId);
} catch (YarnException ex) {
routerMetrics.incrSignalToContainerFailedRetrieved();
RouterServerUtil.logAndThrowException("Application " + applicationId +
" does not exist in FederationStateStore.", ex);
}

ApplicationClientProtocol clientRMProxy = getClientRMProxyForSubCluster(subClusterId);
SignalContainerResponse response = null;
try {
response = clientRMProxy.signalToContainer(request);
} catch (Exception ex) {
RouterServerUtil.logAndThrowException("Unable to signal to container for " +
applicationId + " from SubCluster " + subClusterId.getId(), ex);
}

if (response == null) {
LOG.error("No response when signal to container of " +
"the applicationId {} to SubCluster {}.", applicationId, subClusterId.getId());
}

long stopTime = clock.getTime();
routerMetrics.succeededSignalToContainerRetrieved(stopTime - startTime);
return response;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -413,6 +413,11 @@ public void getUpdateApplicationTimeouts() {
LOG.info("Mocked: failed updateApplicationTimeouts call");
metrics.incrUpdateApplicationTimeoutsRetrieved();
}

public void getSignalContainer() {
LOG.info("Mocked: failed signalContainer call");
metrics.incrSignalToContainerFailedRetrieved();
}
}

// Records successes for all calls
Expand Down Expand Up @@ -523,6 +528,11 @@ public void getUpdateApplicationTimeouts(long duration) {
LOG.info("Mocked: successful updateApplicationTimeouts call with duration {}", duration);
metrics.succeededUpdateAppTimeoutsRetrieved(duration);
}

public void getSignalToContainerTimeouts(long duration) {
LOG.info("Mocked: successful signalToContainer call with duration {}", duration);
metrics.succeededSignalToContainerRetrieved(duration);
}
}

@Test
Expand Down Expand Up @@ -806,4 +816,27 @@ public void testUpdateAppTimeoutsFailed() {
metrics.getUpdateApplicationTimeoutsFailedRetrieved());
}

@Test
public void testSucceededSignalToContainerRetrieved() {
long totalGoodBefore = metrics.getNumSucceededSignalToContainerRetrieved();
goodSubCluster.getSignalToContainerTimeouts(150);
Assert.assertEquals(totalGoodBefore + 1,
metrics.getNumSucceededSignalToContainerRetrieved());
Assert.assertEquals(150,
metrics.getLatencySucceededSignalToContainerRetrieved(), ASSERT_DOUBLE_DELTA);
goodSubCluster.getSignalToContainerTimeouts(300);
Assert.assertEquals(totalGoodBefore + 2,
metrics.getNumSucceededSignalToContainerRetrieved());
Assert.assertEquals(225,
metrics.getLatencySucceededSignalToContainerRetrieved(), ASSERT_DOUBLE_DELTA);
}

@Test
public void testSignalToContainerFailed() {
long totalBadBefore = metrics.getSignalToContainerFailedRetrieved();
badSubCluster.getSignalContainer();
Assert.assertEquals(totalBadBefore + 1,
metrics.getSignalToContainerFailedRetrieved());
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,8 @@
import org.apache.hadoop.yarn.api.protocolrecords.UpdateApplicationPriorityResponse;
import org.apache.hadoop.yarn.api.protocolrecords.UpdateApplicationTimeoutsRequest;
import org.apache.hadoop.yarn.api.protocolrecords.UpdateApplicationTimeoutsResponse;
import org.apache.hadoop.yarn.api.protocolrecords.SignalContainerRequest;
import org.apache.hadoop.yarn.api.protocolrecords.SignalContainerResponse;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
Expand All @@ -83,6 +85,7 @@
import org.apache.hadoop.yarn.api.records.ReservationId;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.ApplicationTimeoutType;
import org.apache.hadoop.yarn.api.records.SignalContainerCommand;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.server.federation.policies.manager.UniformBroadcastPolicyManager;
Expand All @@ -91,6 +94,7 @@
import org.apache.hadoop.yarn.server.federation.utils.FederationStateStoreFacade;
import org.apache.hadoop.yarn.server.federation.utils.FederationStateStoreTestUtil;
import org.apache.hadoop.yarn.server.resourcemanager.MockRM;
import org.apache.hadoop.yarn.server.resourcemanager.MockNM;
import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState;
Expand Down Expand Up @@ -1056,4 +1060,45 @@ public void testUpdateApplicationTimeouts() throws Exception {
Assert.assertNotNull(timeoutsResponse);
Assert.assertEquals(appTimeout, responseTimeOut);
}

@Test
public void testSignalContainer() throws Exception {
LOG.info("Test FederationClientInterceptor : Signal Container request.");

// null request
LambdaTestUtils.intercept(YarnException.class, "Missing signalToContainer request " +
"or containerId or command information.", () -> interceptor.signalToContainer(null));

// normal request
ApplicationId appId =
ApplicationId.newInstance(System.currentTimeMillis(), 1);
SubmitApplicationRequest request = mockSubmitApplicationRequest(appId);

// Submit the application
SubmitApplicationResponse response = interceptor.submitApplication(request);
Assert.assertNotNull(response);
Assert.assertNotNull(stateStoreUtil.queryApplicationHomeSC(appId));

SubClusterId subClusterId = interceptor.getApplicationHomeSubCluster(appId);
Assert.assertNotNull(subClusterId);

MockRM mockRM = interceptor.getMockRMs().get(subClusterId);
mockRM.waitForState(appId, RMAppState.ACCEPTED);
RMApp rmApp = mockRM.getRMContext().getRMApps().get(appId);
mockRM.waitForState(rmApp.getCurrentAppAttempt().getAppAttemptId(),
RMAppAttemptState.SCHEDULED);
MockNM nm = interceptor.getMockNMs().get(subClusterId);
nm.nodeHeartbeat(true);
mockRM.waitForState(rmApp.getCurrentAppAttempt(), RMAppAttemptState.ALLOCATED);
mockRM.sendAMLaunched(rmApp.getCurrentAppAttempt().getAppAttemptId());

ContainerId containerId = rmApp.getCurrentAppAttempt().getMasterContainer().getId();

SignalContainerRequest signalContainerRequest =
SignalContainerRequest.newInstance(containerId, SignalContainerCommand.GRACEFUL_SHUTDOWN);
SignalContainerResponse signalContainerResponse =
interceptor.signalToContainer(signalContainerRequest);

Assert.assertNotNull(signalContainerResponse);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId;
import org.apache.hadoop.yarn.server.resourcemanager.ClientRMService;
import org.apache.hadoop.yarn.server.resourcemanager.MockRM;
import org.apache.hadoop.yarn.server.resourcemanager.MockNM;
import org.apache.hadoop.yarn.server.resourcemanager.RMAppManager;
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.YarnScheduler;
Expand All @@ -51,6 +52,9 @@ public class TestableFederationClientInterceptor
private ConcurrentHashMap<SubClusterId, MockRM> mockRMs =
new ConcurrentHashMap<>();

private ConcurrentHashMap<SubClusterId, MockNM> mockNMs =
new ConcurrentHashMap<>();

private List<SubClusterId> badSubCluster = new ArrayList<SubClusterId>();

@Override
Expand All @@ -71,7 +75,8 @@ protected ApplicationClientProtocol getClientRMProxyForSubCluster(
mockRM.init(super.getConf());
mockRM.start();
try {
mockRM.registerNode("h1:1234", 1024);
MockNM nm = mockRM.registerNode("127.0.0.1:1234", 8*1024, 4);
mockNMs.put(subClusterId, nm);
} catch (Exception e) {
Assert.fail(e.getMessage());
}
Expand Down Expand Up @@ -118,4 +123,8 @@ protected void registerBadSubCluster(SubClusterId badSC) throws IOException {
public ConcurrentHashMap<SubClusterId, MockRM> getMockRMs() {
return mockRMs;
}

public ConcurrentHashMap<SubClusterId, MockNM> getMockNMs() {
return mockNMs;
}
}