Skip to content
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,11 @@

package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity;

import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration.MAXIMUM_ALLOCATION;
import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration.MAXIMUM_ALLOCATION_MB;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.yarn.conf.YarnConfiguration;

import java.util.HashMap;
import java.util.Map;
Expand Down Expand Up @@ -50,4 +54,52 @@ public static Configuration createBasicCSConfiguration() {
return createConfiguration(conf);
}

public static void setMinAllocMb(Configuration conf, int minAllocMb) {
conf.setInt(YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_MB,
minAllocMb);
}

public static void setMaxAllocMb(Configuration conf, int maxAllocMb) {
conf.setInt(YarnConfiguration.RM_SCHEDULER_MAXIMUM_ALLOCATION_MB,
maxAllocMb);
}

public static void setMaxAllocMb(CapacitySchedulerConfiguration conf,
String queueName, int maxAllocMb) {
String propName = CapacitySchedulerConfiguration.getQueuePrefix(queueName)
+ MAXIMUM_ALLOCATION_MB;
conf.setInt(propName, maxAllocMb);
}

public static void setMinAllocVcores(Configuration conf, int minAllocVcores) {
conf.setInt(YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_VCORES,
minAllocVcores);
}

public static void setMaxAllocVcores(Configuration conf, int maxAllocVcores) {
conf.setInt(YarnConfiguration.RM_SCHEDULER_MAXIMUM_ALLOCATION_VCORES,
maxAllocVcores);
}

public static void setMaxAllocVcores(CapacitySchedulerConfiguration conf,
String queueName, int maxAllocVcores) {
String propName = CapacitySchedulerConfiguration.getQueuePrefix(queueName)
+ CapacitySchedulerConfiguration.MAXIMUM_ALLOCATION_VCORES;
conf.setInt(propName, maxAllocVcores);
}

public static void setMaxAllocation(CapacitySchedulerConfiguration conf,
String queueName, String maxAllocation) {
String propName = CapacitySchedulerConfiguration.getQueuePrefix(queueName)
+ MAXIMUM_ALLOCATION;
conf.set(propName, maxAllocation);
}

public static void unsetMaxAllocation(CapacitySchedulerConfiguration conf,
String queueName) {
String propName = CapacitySchedulerConfiguration.getQueuePrefix(queueName)
+ MAXIMUM_ALLOCATION;
conf.unset(propName);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -18,14 +18,48 @@

package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
import org.apache.hadoop.util.Sets;
import org.apache.hadoop.yarn.LocalConfigurationProvider;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.event.AsyncDispatcher;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.server.api.records.NodeStatus;
import org.apache.hadoop.yarn.server.resourcemanager.Application;
import org.apache.hadoop.yarn.server.resourcemanager.MockRM;
import org.apache.hadoop.yarn.server.resourcemanager.NodeManager;
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager;
import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.NullRMNodeLabelsManager;
import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppImpl;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptImpl;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptMetrics;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAddedSchedulerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAttemptAddedSchedulerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeAddedSchedulerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeUpdateSchedulerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEvent;
import org.apache.hadoop.yarn.server.utils.BuilderUtils;
import org.apache.hadoop.yarn.util.resource.ResourceUtils;
import org.junit.Assert;

import java.io.IOException;
import java.util.Set;

import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerQueueHelpers.setupQueueConfiguration;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;

public final class CapacitySchedulerTestUtilities {
public static final int GB = 1024;

Expand Down Expand Up @@ -69,4 +103,119 @@ public static void waitforNMRegistered(ResourceScheduler scheduler, int nodecoun
}
}
}

public static ResourceManager createResourceManager() throws Exception {
ResourceUtils.resetResourceTypes(new Configuration());
DefaultMetricsSystem.setMiniClusterMode(true);
ResourceManager resourceManager = new ResourceManager() {
@Override
protected RMNodeLabelsManager createNodeLabelManager() {
RMNodeLabelsManager mgr = new NullRMNodeLabelsManager();
mgr.init(getConfig());
return mgr;
}
};
CapacitySchedulerConfiguration csConf
= new CapacitySchedulerConfiguration();
setupQueueConfiguration(csConf);
YarnConfiguration conf = new YarnConfiguration(csConf);
conf.setClass(YarnConfiguration.RM_SCHEDULER,
CapacityScheduler.class, ResourceScheduler.class);
resourceManager.init(conf);
resourceManager.getRMContext().getContainerTokenSecretManager().rollMasterKey();
resourceManager.getRMContext().getNMTokenSecretManager().rollMasterKey();
((AsyncDispatcher) resourceManager.getRMContext().getDispatcher()).start();
return resourceManager;
}

public static RMContext createMockRMContext() {
RMContext mockContext = mock(RMContext.class);
when(mockContext.getConfigurationProvider()).thenReturn(
new LocalConfigurationProvider());
return mockContext;
}

public static void stopResourceManager(ResourceManager resourceManager) throws Exception {
if (resourceManager != null) {
QueueMetrics.clearQueueMetrics();
DefaultMetricsSystem.shutdown();
resourceManager.stop();
}
}

public static ApplicationAttemptId appHelper(MockRM rm, CapacityScheduler cs,
int clusterTs, int appId, String queue,
String user) {
ApplicationId appId1 = BuilderUtils.newApplicationId(clusterTs, appId);
ApplicationAttemptId appAttemptId1 = BuilderUtils.newApplicationAttemptId(
appId1, appId);

RMAppAttemptMetrics attemptMetric1 =
new RMAppAttemptMetrics(appAttemptId1, rm.getRMContext());
RMAppImpl app1 = mock(RMAppImpl.class);
when(app1.getApplicationId()).thenReturn(appId1);
RMAppAttemptImpl attempt1 = mock(RMAppAttemptImpl.class);
Container container = mock(Container.class);
when(attempt1.getMasterContainer()).thenReturn(container);
ApplicationSubmissionContext submissionContext = mock(
ApplicationSubmissionContext.class);
when(attempt1.getSubmissionContext()).thenReturn(submissionContext);
when(attempt1.getAppAttemptId()).thenReturn(appAttemptId1);
when(attempt1.getRMAppAttemptMetrics()).thenReturn(attemptMetric1);
when(app1.getCurrentAppAttempt()).thenReturn(attempt1);
rm.getRMContext().getRMApps().put(appId1, app1);

SchedulerEvent addAppEvent1 =
new AppAddedSchedulerEvent(appId1, queue, user);
cs.handle(addAppEvent1);
SchedulerEvent addAttemptEvent1 =
new AppAttemptAddedSchedulerEvent(appAttemptId1, false);
cs.handle(addAttemptEvent1);
return appAttemptId1;
}

public static MockRM setUpMove() {
CapacitySchedulerConfiguration conf = new CapacitySchedulerConfiguration();
return setUpMove(conf);
}

public static MockRM setUpMove(Configuration config) {
CapacitySchedulerConfiguration conf =
new CapacitySchedulerConfiguration(config);
setupQueueConfiguration(conf);
conf.setClass(YarnConfiguration.RM_SCHEDULER, CapacityScheduler.class,
ResourceScheduler.class);
MockRM rm = new MockRM(conf);
rm.start();
return rm;
}

public static void nodeUpdate(ResourceManager rm, NodeManager nm) {
RMNode node = rm.getRMContext().getRMNodes().get(nm.getNodeId());
// Send a heartbeat to kick the tires on the Scheduler
NodeUpdateSchedulerEvent nodeUpdate = new NodeUpdateSchedulerEvent(node);
rm.getResourceScheduler().handle(nodeUpdate);
}

public static NodeManager registerNode(ResourceManager rm, String hostName,
int containerManagerPort, int httpPort, String rackName,
Resource capability, NodeStatus nodeStatus)
throws IOException, YarnException {
NodeManager nm = new NodeManager(hostName,
containerManagerPort, httpPort, rackName, capability, rm, nodeStatus);
NodeAddedSchedulerEvent nodeAddEvent1 =
new NodeAddedSchedulerEvent(rm.getRMContext().getRMNodes()
.get(nm.getNodeId()));
rm.getResourceScheduler().handle(nodeAddEvent1);
return nm;
}

public static void checkApplicationResourceUsage(int expected, Application application) {
Assert.assertEquals(expected, application.getUsedResources().getMemorySize());
}

public static void checkNodeResourceUsage(int expected, NodeManager node) {
Assert.assertEquals(expected, node.getUsed().getMemorySize());
node.checkResourceUsage();
}
}
Loading