Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -115,8 +115,7 @@ private static void checkSpecialResources(
* Supporting 'memory', 'memory-mb', 'vcores' also as invalid resource
* names, in addition to 'MEMORY' for historical reasons
*/
String[] keys = { "memory", ResourceInformation.MEMORY_URI,
ResourceInformation.VCORES_URI };
String[] keys = {"memory", ResourceInformation.MEMORY_URI, ResourceInformation.VCORES_URI};
for(String key : keys) {
if (resourceInformationMap.containsKey(key)) {
LOG.warn("Attempt to define resource '" + key + "', but it is not allowed.");
Expand Down Expand Up @@ -234,7 +233,8 @@ static void validateNameOfResourceNameAndThrowException(String resourceName)
}

/**
* Get maximum allocation from config, *THIS WILL NOT UPDATE INTERNAL DATA*
* Get maximum allocation from config, *THIS WILL NOT UPDATE INTERNAL DATA.
*
* @param conf config
* @return maximum allocation
*/
Expand Down Expand Up @@ -379,7 +379,7 @@ public static Map<String, Integer> getResourceTypeIndex() {

/**
* Get the resource types to be supported by the system.
* @return A map of the resource name to a ResouceInformation object
* @return A map of the resource name to a ResourceInformation object
* which contains details such as the unit.
*/
public static Map<String, ResourceInformation> getResourceTypes() {
Expand Down Expand Up @@ -473,10 +473,10 @@ private static void addResourcesFileToConf(String resourceFile,
LOG.debug("Found {}, adding to configuration", resourceFile);
conf.addResource(ris);
} catch (FileNotFoundException fe) {
LOG.info("Unable to find '" + resourceFile + "'.");
LOG.info("Unable to find '{}'.", resourceFile);
} catch (IOException | YarnException ex) {
LOG.error("Exception trying to read resource types configuration '"
+ resourceFile + "'.", ex);
LOG.error("Exception trying to read resource types configuration '{}'.",
resourceFile, ex);
throw new YarnRuntimeException(ex);
}
}
Expand Down Expand Up @@ -668,7 +668,7 @@ public static List<ResourceTypeInfo> getResourcesTypeInfo() {
/**
* Reinitialize all resource types from external source (in case of client,
* server will send the updated list and local resourceutils cache will be
* updated as per server's list of resources)
* updated as per server's list of resources).
*
* @param resourceTypeInfo
* List of resource types
Expand Down Expand Up @@ -857,6 +857,7 @@ private static Map<String, Long> parseResourcesString(String resourcesStr) {
units = "Gi";
} else if (units.isEmpty()) {
// do nothing;
LOG.debug("units is empty.");
} else {
throw new IllegalArgumentException("Acceptable units are M/G or empty");
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,10 @@ public final class RouterMetrics {
private MutableGaugeInt numGetQueueInfoFailedRetrieved;
@Metric("# of moveApplicationAcrossQueues failed to be retrieved")
private MutableGaugeInt numMoveApplicationAcrossQueuesFailedRetrieved;
@Metric("# of getResourceProfiles failed to be retrieved")
private MutableGaugeInt numGetResourceProfilesFailedRetrieved;
@Metric("# of getResourceProfile failed to be retrieved")
private MutableGaugeInt numGetResourceProfileFailedRetrieved;

// Aggregate metrics are shared, and don't have to be looked up per call
@Metric("Total number of successful Submitted apps and latency(ms)")
Expand Down Expand Up @@ -138,6 +142,11 @@ public final class RouterMetrics {
private MutableRate totalSucceededGetQueueInfoRetrieved;
@Metric("Total number of successful Retrieved moveApplicationAcrossQueues and latency(ms)")
private MutableRate totalSucceededMoveApplicationAcrossQueuesRetrieved;
@Metric("Total number of successful Retrieved getResourceProfiles and latency(ms)")
private MutableRate totalSucceededGetResourceProfilesRetrieved;

@Metric("Total number of successful Retrieved getResourceProfile and latency(ms)")
private MutableRate totalSucceededGetResourceProfileRetrieved;

/**
* Provide quantile counters for all latencies.
Expand Down Expand Up @@ -165,6 +174,8 @@ public final class RouterMetrics {
private MutableQuantiles signalToContainerLatency;
private MutableQuantiles getQueueInfoLatency;
private MutableQuantiles moveApplicationAcrossQueuesLatency;
private MutableQuantiles getResourceProfilesLatency;
private MutableQuantiles getResourceProfileLatency;

private static volatile RouterMetrics instance = null;
private static MetricsRegistry registry;
Expand Down Expand Up @@ -255,6 +266,14 @@ private RouterMetrics() {
moveApplicationAcrossQueuesLatency =
registry.newQuantiles("moveApplicationAcrossQueuesLatency",
"latency of move application across queues timeouts", "ops", "latency", 10);

getResourceProfilesLatency =
registry.newQuantiles("getResourceProfilesLatency",
"latency of get resource profiles timeouts", "ops", "latency", 10);

getResourceProfileLatency =
registry.newQuantiles("getResourceProfileLatency",
"latency of get resource profile timeouts", "ops", "latency", 10);
}

public static RouterMetrics getMetrics() {
Expand Down Expand Up @@ -391,6 +410,16 @@ public long getNumSucceededMoveApplicationAcrossQueuesRetrieved() {
return totalSucceededMoveApplicationAcrossQueuesRetrieved.lastStat().numSamples();
}

@VisibleForTesting
public long getNumSucceededGetResourceProfilesRetrieved() {
return totalSucceededGetResourceProfilesRetrieved.lastStat().numSamples();
}

@VisibleForTesting
public long getNumSucceededGetResourceProfileRetrieved() {
return totalSucceededGetResourceProfileRetrieved.lastStat().numSamples();
}

@VisibleForTesting
public double getLatencySucceededAppsCreated() {
return totalSucceededAppsCreated.lastStat().mean();
Expand Down Expand Up @@ -506,6 +535,16 @@ public double getLatencySucceededMoveApplicationAcrossQueuesRetrieved() {
return totalSucceededMoveApplicationAcrossQueuesRetrieved.lastStat().mean();
}

@VisibleForTesting
public double getLatencySucceededGetResourceProfilesRetrieved() {
return totalSucceededGetResourceProfilesRetrieved.lastStat().mean();
}

@VisibleForTesting
public double getLatencySucceededGetResourceProfileRetrieved() {
return totalSucceededGetResourceProfileRetrieved.lastStat().mean();
}

@VisibleForTesting
public int getAppsFailedCreated() {
return numAppsFailedCreated.value();
Expand Down Expand Up @@ -619,6 +658,14 @@ public int getMoveApplicationAcrossQueuesFailedRetrieved() {
return numMoveApplicationAcrossQueuesFailedRetrieved.value();
}

public int getResourceProfilesFailedRetrieved() {
return numGetResourceProfilesFailedRetrieved.value();
}

public int getResourceProfileFailedRetrieved() {
return numGetResourceProfileFailedRetrieved.value();
}

public void succeededAppsCreated(long duration) {
totalSucceededAppsCreated.add(duration);
getNewApplicationLatency.add(duration);
Expand Down Expand Up @@ -734,6 +781,16 @@ public void succeededMoveApplicationAcrossQueuesRetrieved(long duration) {
moveApplicationAcrossQueuesLatency.add(duration);
}

public void succeededGetResourceProfilesRetrieved(long duration) {
totalSucceededGetResourceProfilesRetrieved.add(duration);
getResourceProfilesLatency.add(duration);
}

public void succeededGetResourceProfileRetrieved(long duration) {
totalSucceededGetResourceProfileRetrieved.add(duration);
getResourceProfileLatency.add(duration);
}

public void incrAppsFailedCreated() {
numAppsFailedCreated.incr();
}
Expand Down Expand Up @@ -825,4 +882,12 @@ public void incrGetQueueInfoFailedRetrieved() {
public void incrMoveApplicationAcrossQueuesFailedRetrieved() {
numMoveApplicationAcrossQueuesFailedRetrieved.incr();
}

public void incrGetResourceProfilesFailedRetrieved() {
numGetResourceProfilesFailedRetrieved.incr();
}

public void incrGetResourceProfileFailedRetrieved() {
numGetResourceProfileFailedRetrieved.incr();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -1448,13 +1448,50 @@ public UpdateApplicationTimeoutsResponse updateApplicationTimeouts(
@Override
public GetAllResourceProfilesResponse getResourceProfiles(
GetAllResourceProfilesRequest request) throws YarnException, IOException {
throw new NotImplementedException("Code is not implemented");
if (request == null) {
routerMetrics.incrGetResourceProfilesFailedRetrieved();
RouterServerUtil.logAndThrowException("Missing getResourceProfiles request.", null);
}
long startTime = clock.getTime();
ClientMethod remoteMethod = new ClientMethod("getResourceProfiles",
new Class[] {GetAllResourceProfilesRequest.class}, new Object[] {request});
Collection<GetAllResourceProfilesResponse> resourceProfiles = null;
try {
resourceProfiles = invokeAppClientProtocolMethod(true, remoteMethod,
GetAllResourceProfilesResponse.class);
} catch (Exception ex) {
routerMetrics.incrGetResourceProfilesFailedRetrieved();
RouterServerUtil.logAndThrowException("Unable to get resource profiles due to exception.",
ex);
}
long stopTime = clock.getTime();
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Couldn't we move this and the setting inside of the try?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

code show as below:

    long startTime = clock.getTime();
    ClientMethod remoteMethod = new ClientMethod("getResourceProfile",
        new Class[] {GetResourceProfileRequest.class}, new Object[] {request});
    Collection<GetResourceProfileResponse> resourceProfile = null;
    try {
      resourceProfile = invokeAppClientProtocolMethod(true, remoteMethod,
          GetResourceProfileResponse.class);
    } catch (Exception ex) {
      routerMetrics.incrGetResourceProfileFailedRetrieved();
      RouterServerUtil.logAndThrowException("Unable to get resource profile due to exception.",
          ex);
    }
    long stopTime = clock.getTime();

startTime - stopTime represents the execution time of the code fragment

I think the current way should be a little better, can this part stay as it is?

routerMetrics.succeededGetResourceProfilesRetrieved(stopTime - startTime);
return RouterYarnClientUtils.mergeClusterResourceProfilesResponse(resourceProfiles);
}

@Override
public GetResourceProfileResponse getResourceProfile(
GetResourceProfileRequest request) throws YarnException, IOException {
throw new NotImplementedException("Code is not implemented");
if (request == null || request.getProfileName() == null) {
routerMetrics.incrGetResourceProfileFailedRetrieved();
RouterServerUtil.logAndThrowException("Missing getResourceProfile request or profileName.",
null);
}
long startTime = clock.getTime();
ClientMethod remoteMethod = new ClientMethod("getResourceProfile",
new Class[] {GetResourceProfileRequest.class}, new Object[] {request});
Collection<GetResourceProfileResponse> resourceProfile = null;
try {
resourceProfile = invokeAppClientProtocolMethod(true, remoteMethod,
GetResourceProfileResponse.class);
} catch (Exception ex) {
routerMetrics.incrGetResourceProfileFailedRetrieved();
RouterServerUtil.logAndThrowException("Unable to get resource profile due to exception.",
ex);
}
long stopTime = clock.getTime();
routerMetrics.succeededGetResourceProfileRetrieved(stopTime - startTime);
return RouterYarnClientUtils.mergeClusterResourceProfileResponse(resourceProfile);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,8 @@
import org.apache.hadoop.yarn.api.protocolrecords.ReservationListResponse;
import org.apache.hadoop.yarn.api.protocolrecords.GetAllResourceTypeInfoResponse;
import org.apache.hadoop.yarn.api.protocolrecords.GetQueueInfoResponse;
import org.apache.hadoop.yarn.api.protocolrecords.GetAllResourceProfilesResponse;
import org.apache.hadoop.yarn.api.protocolrecords.GetResourceProfileResponse;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ApplicationReport;
import org.apache.hadoop.yarn.api.records.ApplicationResourceUsageReport;
Expand All @@ -46,8 +48,10 @@
import org.apache.hadoop.yarn.api.records.ReservationAllocationState;
import org.apache.hadoop.yarn.api.records.ResourceTypeInfo;
import org.apache.hadoop.yarn.api.records.QueueInfo;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.server.uam.UnmanagedApplicationManager;
import org.apache.hadoop.yarn.util.Records;
import org.apache.hadoop.yarn.util.resource.ResourceUtils;
import org.apache.hadoop.yarn.util.resource.Resources;

/**
Expand Down Expand Up @@ -417,5 +421,52 @@ public static GetQueueInfoResponse mergeQueues(
queueResponse.setQueueInfo(queueInfo);
return queueResponse;
}

/**
* Merges a list of GetAllResourceProfilesResponse.
*
* @param responses a list of GetAllResourceProfilesResponse to merge.
* @return the merged GetAllResourceProfilesResponse.
*/
public static GetAllResourceProfilesResponse mergeClusterResourceProfilesResponse(
Collection<GetAllResourceProfilesResponse> responses) {
GetAllResourceProfilesResponse profilesResponse =
Records.newRecord(GetAllResourceProfilesResponse.class);
Map<String, Resource> profilesMap = new HashMap<>();
for (GetAllResourceProfilesResponse response : responses) {
if (response != null && response.getResourceProfiles() != null) {
for (Map.Entry<String, Resource> entry : response.getResourceProfiles().entrySet()) {
String key = entry.getKey();
Resource r1 = profilesMap.getOrDefault(key, null);
Resource r2 = entry.getValue();
Resource rAdd = r1 == null ? r2 : Resources.add(r1, r2);
profilesMap.put(key, rAdd);
}
}
}
profilesResponse.setResourceProfiles(profilesMap);
return profilesResponse;
}

/**
* Merges a list of GetResourceProfileResponse.
*
* @param responses a list of GetResourceProfileResponse to merge.
* @return the merged GetResourceProfileResponse.
*/
public static GetResourceProfileResponse mergeClusterResourceProfileResponse(
Collection<GetResourceProfileResponse> responses) {
GetResourceProfileResponse profileResponse =
Records.newRecord(GetResourceProfileResponse.class);
Resource resource = Resource.newInstance(0, 0);
for (GetResourceProfileResponse response : responses) {
if (response != null && response.getResource() != null) {
Resource responseResource = response.getResource();
resource = Resources.add(resource, responseResource);
}
}
profileResponse.setResource(resource);
return profileResponse;
}
}

Original file line number Diff line number Diff line change
Expand Up @@ -428,6 +428,16 @@ public void moveApplicationAcrossQueuesFailed() {
LOG.info("Mocked: failed moveApplicationAcrossQueuesFailed call");
metrics.incrMoveApplicationAcrossQueuesFailedRetrieved();
}

public void getResourceProfilesFailed() {
LOG.info("Mocked: failed getResourceProfilesFailed call");
metrics.incrGetResourceProfilesFailedRetrieved();
}

public void getResourceProfileFailed() {
LOG.info("Mocked: failed getResourceProfileFailed call");
metrics.incrGetResourceProfileFailedRetrieved();
}
}

// Records successes for all calls
Expand Down Expand Up @@ -553,6 +563,16 @@ public void moveApplicationAcrossQueuesRetrieved(long duration) {
LOG.info("Mocked: successful moveApplicationAcrossQueues call with duration {}", duration);
metrics.succeededMoveApplicationAcrossQueuesRetrieved(duration);
}

public void getResourceProfilesRetrieved(long duration) {
LOG.info("Mocked: successful getResourceProfiles call with duration {}", duration);
metrics.succeededGetResourceProfilesRetrieved(duration);
}

public void getResourceProfileRetrieved(long duration) {
LOG.info("Mocked: successful getResourceProfile call with duration {}", duration);
metrics.succeededGetResourceProfileRetrieved(duration);
}
}

@Test
Expand Down Expand Up @@ -905,4 +925,49 @@ public void testMoveApplicationAcrossQueuesRetrievedFailed() {
metrics.getMoveApplicationAcrossQueuesFailedRetrieved());
}

@Test
public void testSucceededGetResourceProfilesRetrieved() {
long totalGoodBefore = metrics.getNumSucceededGetResourceProfilesRetrieved();
goodSubCluster.getResourceProfilesRetrieved(150);
Assert.assertEquals(totalGoodBefore + 1,
metrics.getNumSucceededGetResourceProfilesRetrieved());
Assert.assertEquals(150,
metrics.getLatencySucceededGetResourceProfilesRetrieved(), ASSERT_DOUBLE_DELTA);
goodSubCluster.getResourceProfilesRetrieved(300);
Assert.assertEquals(totalGoodBefore + 2,
metrics.getNumSucceededGetResourceProfilesRetrieved());
Assert.assertEquals(225,
metrics.getLatencySucceededGetResourceProfilesRetrieved(), ASSERT_DOUBLE_DELTA);
}

@Test
public void testGetResourceProfilesRetrievedFailed() {
long totalBadBefore = metrics.getResourceProfilesFailedRetrieved();
badSubCluster.getResourceProfilesFailed();
Assert.assertEquals(totalBadBefore + 1,
metrics.getResourceProfilesFailedRetrieved());
}

@Test
public void testSucceededGetResourceProfileRetrieved() {
long totalGoodBefore = metrics.getNumSucceededGetResourceProfileRetrieved();
goodSubCluster.getResourceProfileRetrieved(150);
Assert.assertEquals(totalGoodBefore + 1,
metrics.getNumSucceededGetResourceProfileRetrieved());
Assert.assertEquals(150,
metrics.getLatencySucceededGetResourceProfileRetrieved(), ASSERT_DOUBLE_DELTA);
goodSubCluster.getResourceProfileRetrieved(300);
Assert.assertEquals(totalGoodBefore + 2,
metrics.getNumSucceededGetResourceProfileRetrieved());
Assert.assertEquals(225,
metrics.getLatencySucceededGetResourceProfileRetrieved(), ASSERT_DOUBLE_DELTA);
}

@Test
public void testGetResourceProfileRetrievedFailed() {
long totalBadBefore = metrics.getResourceProfileFailedRetrieved();
badSubCluster.getResourceProfileFailed();
Assert.assertEquals(totalBadBefore + 1,
metrics.getResourceProfileFailedRetrieved());
}
}
Loading