Skip to content
Merged
Show file tree
Hide file tree
Changes from 14 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ public final class RouterMetrics {
private MutableGaugeInt numGetContainerReportFailedRetrieved;
@Metric("# of getContainers failed to be retrieved")
private MutableGaugeInt numGetContainersFailedRetrieved;
// Gauge of failed listReservations retrievals. Only one @Metric annotation is
// kept: the stale "# of getContainers ..." description was a copy-paste error
// and described the wrong call.
@Metric("# of listReservations failed to be retrieved")
private MutableGaugeInt numListReservationsFailedRetrieved;
@Metric("# of getResourceTypeInfo failed to be retrieved")
private MutableGaugeInt numGetResourceTypeInfo;
Expand Down Expand Up @@ -105,6 +105,8 @@ public final class RouterMetrics {
private MutableGaugeInt numUpdateReservationFailedRetrieved;
@Metric("# of deleteReservation failed to be retrieved")
private MutableGaugeInt numDeleteReservationFailedRetrieved;
@Metric("# of listReservation failed to be retrieved")
private MutableGaugeInt numListReservationFailedRetrieved;

// Aggregate metrics are shared, and don't have to be looked up per call
@Metric("Total number of successful Submitted apps and latency(ms)")
Expand Down Expand Up @@ -171,6 +173,8 @@ public final class RouterMetrics {
private MutableRate totalSucceededUpdateReservationRetrieved;
@Metric("Total number of successful Retrieved DeleteReservation and latency(ms)")
private MutableRate totalSucceededDeleteReservationRetrieved;
@Metric("Total number of successful Retrieved ListReservation and latency(ms)")
private MutableRate totalSucceededListReservationRetrieved;

/**
* Provide quantile counters for all latencies.
Expand Down Expand Up @@ -207,6 +211,7 @@ public final class RouterMetrics {
private MutableQuantiles submitReservationLatency;
private MutableQuantiles updateReservationLatency;
private MutableQuantiles deleteReservationLatency;
private MutableQuantiles listReservationLatency;

private static volatile RouterMetrics instance = null;
private static MetricsRegistry registry;
Expand Down Expand Up @@ -333,6 +338,10 @@ private RouterMetrics() {
deleteReservationLatency =
registry.newQuantiles("deleteReservationLatency",
"latency of delete reservation timeouts", "ops", "latency", 10);

listReservationLatency =
registry.newQuantiles("listReservationLatency",
"latency of list reservation timeouts", "ops", "latency", 10);
}

public static RouterMetrics getMetrics() {
Expand Down Expand Up @@ -514,6 +523,11 @@ public long getNumSucceededDeleteReservationRetrieved() {
return totalSucceededDeleteReservationRetrieved.lastStat().numSamples();
}

/**
 * Returns the sample count of the last stat held by
 * {@code totalSucceededListReservationRetrieved}, i.e. the rate that
 * {@code succeededListReservationRetrieved(long)} adds durations to.
 *
 * @return number of samples in the last stat of the ListReservation success rate
 */
@VisibleForTesting
public long getNumSucceededListReservationRetrieved() {
return totalSucceededListReservationRetrieved.lastStat().numSamples();
}

@VisibleForTesting
public double getLatencySucceededAppsCreated() {
return totalSucceededAppsCreated.lastStat().mean();
Expand Down Expand Up @@ -674,6 +688,11 @@ public double getLatencySucceededDeleteReservationRetrieved() {
return totalSucceededDeleteReservationRetrieved.lastStat().mean();
}

/**
 * Returns the mean of the last stat held by
 * {@code totalSucceededListReservationRetrieved}. The metric description
 * labels the recorded values as latency in milliseconds.
 *
 * @return mean of the durations recorded for successful ListReservation calls
 */
@VisibleForTesting
public double getLatencySucceededListReservationRetrieved() {
return totalSucceededListReservationRetrieved.lastStat().mean();
}

@VisibleForTesting
public int getAppsFailedCreated() {
return numAppsFailedCreated.value();
Expand Down Expand Up @@ -823,6 +842,10 @@ public int getDeleteReservationFailedRetrieved() {
return numDeleteReservationFailedRetrieved.value();
}

/**
 * Returns the current value of the {@code numListReservationFailedRetrieved}
 * gauge, which {@code incrListReservationFailedRetrieved()} increments.
 *
 * @return current value of the listReservation failure gauge
 */
public int getListReservationFailedRetrieved() {
return numListReservationFailedRetrieved.value();
}

public void succeededAppsCreated(long duration) {
totalSucceededAppsCreated.add(duration);
getNewApplicationLatency.add(duration);
Expand Down Expand Up @@ -983,6 +1006,11 @@ public void succeededDeleteReservationRetrieved(long duration) {
deleteReservationLatency.add(duration);
}

/**
 * Records one successful ListReservation call by adding its duration to both
 * the aggregate rate and the latency quantiles.
 *
 * @param duration duration of the call (milliseconds, per the metric description)
 */
public void succeededListReservationRetrieved(long duration) {
totalSucceededListReservationRetrieved.add(duration);
listReservationLatency.add(duration);
}

/**
 * Increments the {@code numAppsFailedCreated} gauge.
 */
public void incrAppsFailedCreated() {
numAppsFailedCreated.incr();
}
Expand Down Expand Up @@ -1110,4 +1138,8 @@ public void incrUpdateReservationFailedRetrieved() {
/**
 * Increments the {@code numDeleteReservationFailedRetrieved} gauge.
 */
public void incrDeleteReservationFailedRetrieved() {
numDeleteReservationFailedRetrieved.incr();
}

/**
 * Increments the {@code numListReservationFailedRetrieved} gauge, whose value
 * is exposed through {@code getListReservationFailedRetrieved()}.
 */
public void incrListReservationFailedRetrieved() {
numListReservationFailedRetrieved.incr();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@
import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.NodeLabel;
import org.apache.hadoop.yarn.api.records.ReservationId;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
Expand Down Expand Up @@ -1483,7 +1484,34 @@ public Response deleteReservation(ReservationDeleteRequestInfo resContext,
public Response listReservation(String queue, String reservationId,
    long startTime, long endTime, boolean includeResourceAllocations,
    HttpServletRequest hsr) throws Exception {
  // Forwards a listReservation request to the home subCluster of the given
  // reservation.
  //
  // Contract:
  //  - queue and reservationId must be non-null and non-empty, otherwise an
  //    IllegalArgumentException is thrown;
  //  - startTime, endTime and includeResourceAllocations are passed through
  //    unchanged, and a clone of hsr is forwarded instead of hsr itself;
  //  - a non-null subCluster response is returned as-is and its elapsed time
  //    is recorded in the listReservation success metric;
  //  - a YarnException is handed to
  //    RouterServerUtil.logAndThrowRunTimeException, and a null response ends
  //    in a YarnException;
  //  - each of those failure exits first increments the listReservation
  //    failure gauge.
  //
  // The previous unconditional "not implemented" throw was removed because it
  // made everything below unreachable.

  if (queue == null || queue.isEmpty()) {
    routerMetrics.incrListReservationFailedRetrieved();
    throw new IllegalArgumentException("Parameter error, the queue is empty or null.");
  }

  if (reservationId == null || reservationId.isEmpty()) {
    routerMetrics.incrListReservationFailedRetrieved();
    throw new IllegalArgumentException("Parameter error, the reservationId is empty or null.");
  }

  // Monotonic start mark, so the recorded latency is not skewed by wall-clock
  // adjustments.
  final long beginNanos = System.nanoTime();
  try {
    SubClusterInfo subClusterInfo = getHomeSubClusterInfoByReservationId(reservationId);
    DefaultRequestInterceptorREST interceptor = getOrCreateInterceptorForSubCluster(
        subClusterInfo.getSubClusterId(), subClusterInfo.getRMWebServiceAddress());
    HttpServletRequest hsrCopy = clone(hsr);
    Response response = interceptor.listReservation(queue, reservationId, startTime, endTime,
        includeResourceAllocations, hsrCopy);
    if (response != null) {
      // Feed the success/latency metric; it is labelled in milliseconds.
      long elapsedMs = (System.nanoTime() - beginNanos) / 1_000_000L;
      routerMetrics.succeededListReservationRetrieved(elapsedMs);
      return response;
    }
  } catch (YarnException e) {
    routerMetrics.incrListReservationFailedRetrieved();
    RouterServerUtil.logAndThrowRunTimeException("listReservation Failed.", e);
  }

  // Reached only when the subCluster answered with a null response.
  routerMetrics.incrListReservationFailedRetrieved();
  throw new YarnException("listReservation Failed.");
}

@Override
Expand Down Expand Up @@ -1808,6 +1836,31 @@ private SubClusterInfo getHomeSubClusterInfoByAppId(String appId)
throw new YarnException("Unable to get subCluster by applicationId = " + appId);
}

/**
 * Resolves the home subCluster of a reservation.
 *
 * <p>The textual id is parsed into a {@code ReservationId}, the federation
 * facade is asked for the matching home {@code SubClusterId}, and that id is
 * then used to fetch the {@code SubClusterInfo}.
 *
 * @param resId reservation id in its string form
 * @return the SubClusterInfo returned by the federation facade
 * @throws YarnException if the lookup fails or no home subCluster is found
 */
private SubClusterInfo getHomeSubClusterInfoByReservationId(String resId)
    throws YarnException {
  try {
    SubClusterId homeId = federationFacade.getReservationHomeSubCluster(
        ReservationId.parseReservationId(resId));
    if (homeId == null) {
      // No mapping recorded for this reservation.
      RouterServerUtil.logAndThrowException(null,
          "Can't get HomeSubCluster by reservationId %s", resId);
    }
    return federationFacade.getSubCluster(homeId);
  } catch (YarnException | IOException e) {
    RouterServerUtil.logAndThrowException(e,
        "Get HomeSubClusterInfo by reservationId %s failed.", resId);
  }
  // Fallback for the case where the helper above returns instead of throwing.
  throw new YarnException("Unable to get subCluster by reservationId = " + resId);
}

@VisibleForTesting
public LRUCacheHashMap<RouterAppInfoCacheKey, AppsInfo> getAppInfosCaches() {
return appInfosCaches;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -473,6 +473,11 @@ public void getDeleteReservationFailed() {
LOG.info("Mocked: failed getDeleteReservationFailed call");
metrics.incrDeleteReservationFailedRetrieved();
}

/**
 * Simulates a failed listReservation call: logs the event and increments the
 * listReservation failure gauge on {@code metrics}.
 */
public void getListReservationFailed() {
LOG.info("Mocked: failed getListReservationFailed call");
metrics.incrListReservationFailedRetrieved();
}
}

// Records successes for all calls
Expand Down Expand Up @@ -643,6 +648,11 @@ public void getDeleteReservationRetrieved(long duration) {
LOG.info("Mocked: successful getDeleteReservation call with duration {}", duration);
metrics.succeededDeleteReservationRetrieved(duration);
}

/**
 * Simulates a successful listReservation call: logs the event and records
 * the given duration in the listReservation success metric on {@code metrics}.
 *
 * @param duration duration to record for the mocked call
 */
public void getListReservationRetrieved(long duration) {
LOG.info("Mocked: successful getListReservation call with duration {}", duration);
metrics.succeededListReservationRetrieved(duration);
}
}

@Test
Expand Down Expand Up @@ -1201,4 +1211,27 @@ public void testGetDeleteReservationRetrievedFailed() {
Assert.assertEquals(totalBadBefore + 1,
metrics.getDeleteReservationFailedRetrieved());
}

/**
 * Two mocked successful listReservation calls (150 then 300) must each raise
 * the success sample count by one, and the reported mean latency must be 150
 * after the first call and 225 after the second.
 */
@Test
public void testGetListReservationRetrieved() {
  final long baseline = metrics.getNumSucceededListReservationRetrieved();

  // First sample.
  goodSubCluster.getListReservationRetrieved(150);
  long countAfterFirst = metrics.getNumSucceededListReservationRetrieved();
  Assert.assertEquals(baseline + 1, countAfterFirst);
  double meanAfterFirst = metrics.getLatencySucceededListReservationRetrieved();
  Assert.assertEquals(150, meanAfterFirst, ASSERT_DOUBLE_DELTA);

  // Second sample: (150 + 300) / 2 = 225.
  goodSubCluster.getListReservationRetrieved(300);
  long countAfterSecond = metrics.getNumSucceededListReservationRetrieved();
  Assert.assertEquals(baseline + 2, countAfterSecond);
  double meanAfterSecond = metrics.getLatencySucceededListReservationRetrieved();
  Assert.assertEquals(225, meanAfterSecond, ASSERT_DOUBLE_DELTA);
}

/**
 * One mocked failed listReservation call must raise the failure gauge by one.
 */
@Test
public void testGetListReservationRetrievedFailed() {
  final long failuresBefore = metrics.getListReservationFailedRetrieved();

  badSubCluster.getListReservationFailed();

  final long expectedFailures = failuresBefore + 1;
  Assert.assertEquals(expectedFailures, metrics.getListReservationFailedRetrieved());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,9 @@
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.security.authorize.AuthorizationException;
import org.apache.hadoop.util.Sets;
import org.apache.hadoop.util.Time;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ReservationId;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
Expand All @@ -57,10 +59,18 @@
import org.apache.hadoop.yarn.api.records.YarnApplicationAttemptState;
import org.apache.hadoop.yarn.api.records.ApplicationTimeoutType;
import org.apache.hadoop.yarn.api.records.ApplicationTimeout;
import org.apache.hadoop.yarn.api.protocolrecords.ReservationSubmissionRequest;
import org.apache.hadoop.yarn.api.protocolrecords.ReservationListRequest;
import org.apache.hadoop.yarn.api.protocolrecords.ReservationListResponse;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.ApplicationNotFoundException;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId;
import org.apache.hadoop.yarn.server.resourcemanager.ClientRMService;
import org.apache.hadoop.yarn.server.resourcemanager.MockRM;
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
import org.apache.hadoop.yarn.server.resourcemanager.reservation.ReservationSystem;
import org.apache.hadoop.yarn.server.resourcemanager.reservation.ReservationSystemTestUtil;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ActiveUsersManager;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
Expand All @@ -70,6 +80,8 @@
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.activities.ActivityDiagnosticConstant;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.activities.ActivityState;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.activities.ActivityLevel;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.LeafQueue;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.TestUtils;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp;
Expand Down Expand Up @@ -97,6 +109,7 @@
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.StatisticsItemInfo;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.ApplicationStatisticsInfo;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.AppActivitiesInfo;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.ReservationListInfo;
import org.apache.hadoop.yarn.server.scheduler.SchedulerRequestKey;
import org.apache.hadoop.yarn.server.webapp.dao.AppAttemptInfo;
import org.apache.hadoop.yarn.server.webapp.dao.ContainerInfo;
Expand Down Expand Up @@ -126,12 +139,36 @@ public class MockDefaultRequestInterceptorREST
private Map<ApplicationId, ApplicationReport> applicationMap = new HashMap<>();
public static final String APP_STATE_RUNNING = "RUNNING";

// Short queue names and their full paths (ROOT + DOT + short name) used to
// configure the embedded MockRM in setupResourceManager().
private static final String QUEUE_DEFAULT = "default";
private static final String QUEUE_DEFAULT_FULL = CapacitySchedulerConfiguration.ROOT +
CapacitySchedulerConfiguration.DOT + QUEUE_DEFAULT;
private static final String QUEUE_DEDICATED = "dedicated";
// The dedicated queue is the one marked reservable in setupResourceManager();
// listReservation() rejects any other queue.
public static final String QUEUE_DEDICATED_FULL = CapacitySchedulerConfiguration.ROOT +
CapacitySchedulerConfiguration.DOT + QUEUE_DEDICATED;
// Embedded ResourceManager created by the constructor; null if its start-up failed.
private MockRM mockRM;

// Reservation duration in milliseconds: 60 * 1000 = one minute.
public static final long DURATION = 60*1000;

// Number of containers requested by the reservation that listReservation() submits.
public static final int NUM_CONTAINERS = 4;

/**
 * Guards mocked calls against a stopped RM.
 *
 * @throws ConnectException if {@code isRunning} is false
 */
private void validateRunning() throws ConnectException {
  if (isRunning) {
    return;
  }
  throw new ConnectException("RM is stopped");
}

public MockDefaultRequestInterceptorREST(){
super();
try {
mockRM = setupResourceManager();
} catch (Exception ex) {
mockRM = null;
LOG.error("mockRM init failed", ex);
}
}

@Override
public Response createNewApplication(HttpServletRequest hsr)
throws AuthorizationException, IOException, InterruptedException {
Expand Down Expand Up @@ -788,4 +825,77 @@ public AppActivitiesInfo getAppActivities(

return appActivitiesInfo;
}

/**
 * Mocked listReservation: submits a fresh reservation with the given id to
 * the embedded MockRM and then lists reservations for the requested range.
 *
 * @param queue queue to query; must equal {@code QUEUE_DEDICATED_FULL}
 * @param reservationId reservation id in string form; it is parsed and also
 *        used as the id of the reservation submitted here
 * @param startTime forwarded unchanged to the list request
 * @param endTime forwarded unchanged to the list request
 * @param includeResourceAllocations forwarded to the list request and to the
 *        response wrapper
 * @param hsr servlet request; not used by this mock
 * @return an OK response whose entity is a {@code ReservationListInfo}
 * @throws RuntimeException if the mock is not running or the queue is not the
 *         dedicated queue
 * @throws Exception on any failure from parsing or from the MockRM calls
 */
@Override
public Response listReservation(String queue, String reservationId, long startTime, long endTime,
    boolean includeResourceAllocations, HttpServletRequest hsr) throws Exception {

  if (!isRunning) {
    throw new RuntimeException("RM is stopped");
  }

  boolean isDedicatedQueue = StringUtils.equals(queue, QUEUE_DEDICATED_FULL);
  if (!isDedicatedQueue) {
    throw new RuntimeException("The specified queue: " + queue +
        " is not managed by reservation system." +
        " Please try again with a valid reservable queue.");
  }

  final ReservationId parsedId = ReservationId.parseReservationId(reservationId);

  // Synchronize the dedicated queue's plan before submitting anything.
  final ReservationSystem planSystem = mockRM.getReservationSystem();
  planSystem.synchronizePlan(QUEUE_DEDICATED_FULL, true);

  final ClientRMService rmClient = mockRM.getClientRMService();

  // Arrival: earliest time from which the resources may be allocated.
  final long arrival = Time.now();

  // Deadline: latest time by which the resources must be allocated.
  // deadline = arrival + 1.05 * DURATION = arrival + 63000 ms, so the window
  // (deadline - arrival) exceeds DURATION by 0.05 * DURATION = 3000 ms; the
  // factor 1.05 was chosen because that slack is a whole number.
  final long deadline = (long) (arrival + 1.05 * DURATION);

  // Reserve NUM_CONTAINERS containers (1 core / 1GB each, per the original
  // note on this test) for DURATION within [arrival, deadline].
  final ReservationSubmissionRequest submitRequest =
      ReservationSystemTestUtil.createSimpleReservationRequest(
          parsedId, NUM_CONTAINERS, arrival, deadline, DURATION);
  rmClient.submitReservation(submitRequest);

  // List the reservations and wrap the result for the REST response.
  final ReservationListRequest listRequest = ReservationListRequest.newInstance(
      queue, parsedId.toString(), startTime, endTime, includeResourceAllocations);
  final ReservationListResponse listResponse = rmClient.listReservations(listRequest);
  final ReservationListInfo entity =
      new ReservationListInfo(listResponse, includeResourceAllocations);
  return Response.status(Status.OK).entity(entity).build();
}

/**
 * Builds and starts the embedded MockRM used by the mocked reservation calls.
 *
 * <p>The configuration declares two queues under ROOT: the default queue with
 * capacity 20 and the dedicated queue with capacity 80. The dedicated queue is
 * marked reservable, CapacityScheduler is set as the scheduler, and the
 * reservation system is enabled. One node is registered after start-up.
 *
 * @return the started MockRM
 * @throws Exception if configuring, starting or registering the node fails
 */
private MockRM setupResourceManager() throws Exception {
  final CapacitySchedulerConfiguration schedulerConf = new CapacitySchedulerConfiguration();

  // Default queue.
  schedulerConf.setCapacity(QUEUE_DEFAULT_FULL, 20);

  // Queue layout under ROOT, plus the reservable dedicated queue.
  final String[] rootQueues = {QUEUE_DEFAULT, QUEUE_DEDICATED};
  schedulerConf.setQueues(CapacitySchedulerConfiguration.ROOT, rootQueues);
  schedulerConf.setCapacity(QUEUE_DEDICATED_FULL, 80);
  schedulerConf.setReservable(QUEUE_DEDICATED_FULL, true);

  // Scheduler implementation and reservation system switch.
  schedulerConf.setClass(YarnConfiguration.RM_SCHEDULER,
      CapacityScheduler.class, ResourceScheduler.class);
  schedulerConf.setBoolean(YarnConfiguration.RM_RESERVATION_SYSTEM_ENABLE, true);

  final MockRM resourceManager = new MockRM(schedulerConf);
  resourceManager.start();
  resourceManager.registerNode("127.0.0.1:5678", 100 * 1024, 100);
  return resourceManager;
}

/**
 * Stops the embedded MockRM if one was created; does nothing when
 * {@code mockRM} is null.
 */
@Override
public void shutdown() {
  if (mockRM == null) {
    return;
  }
  mockRM.stop();
}
}
Loading