@@ -36,7 +36,7 @@ public abstract class ApplicationResourceUsageReport {
public static ApplicationResourceUsageReport newInstance(
int numUsedContainers, int numReservedContainers, Resource usedResources,
Resource reservedResources, Resource neededResources, long memorySeconds,
long vcoreSeconds) {
long vcoreSeconds, long GPUSeconds, long GpuBitVecSeconds) {
ApplicationResourceUsageReport report =
Records.newRecord(ApplicationResourceUsageReport.class);
report.setNumUsedContainers(numUsedContainers);
@@ -46,6 +46,8 @@ public static ApplicationResourceUsageReport newInstance(
report.setNeededResources(neededResources);
report.setMemorySeconds(memorySeconds);
report.setVcoreSeconds(vcoreSeconds);
report.setGPUSeconds(GPUSeconds);
report.setGpuBitVecSeconds(GpuBitVecSeconds);
return report;
}

@@ -152,4 +154,41 @@ public static ApplicationResourceUsageReport newInstance(
@Public
@Unstable
public abstract long getVcoreSeconds();

/**
* Set the aggregated number of GPUs that the application has allocated
* times the number of seconds the application has been running.
* @param GPU_seconds the aggregated number of GPU seconds
*/
@Private
@Unstable
public abstract void setGPUSeconds(long GPU_seconds);

/**
* Get the aggregated number of GPUs that the application has allocated
* times the number of seconds the application has been running.
* @return the aggregated number of GPU seconds
*/
@Public
@Unstable
public abstract long getGPUSeconds();

/**
* Set the aggregated GPU bit vector seconds, i.e. the GPU bit vector that the
* application has held times the number of seconds the application has been
* running.
* @param GpuBitVec_seconds the aggregated GPU bit vector seconds
*/
@Private
@Unstable
public abstract void setGpuBitVecSeconds(long GpuBitVec_seconds);

/**
* Get the aggregated GPU bit vector seconds, i.e. the GPU bit vector that the
* application has held times the number of seconds the application has been
* running.
* @return the aggregated GPU bit vector seconds
*/
@Public
@Unstable
public abstract long getGpuBitVecSeconds();
}
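Reviewer note: the two new accounting fields mirror the existing memorySeconds/vcoreSeconds metrics, i.e. the allocated GPU count (and, per the Javadoc above, the GPU bit vector value) integrated over the application's run time. A minimal sketch with illustrative numbers, not taken from the patch, assuming the YARN record factory is on the classpath:

import org.apache.hadoop.yarn.api.records.ApplicationResourceUsageReport;
import org.apache.hadoop.yarn.api.records.Resource;

public class GpuSecondsExample {
  public static void main(String[] args) {
    // A container currently holding 2 GPUs, namely GPU 0 and GPU 1 (bit vector 0b0011).
    Resource used = Resource.newInstance(2048, 2, 2, 0b0011);
    Resource none = Resource.newInstance(0, 0, 0, 0);

    // Hypothetical accounting after the app has run for 300 seconds:
    // GPUSeconds       = 2 GPUs          * 300 s = 600
    // GpuBitVecSeconds = bit vector (3)  * 300 s = 900, per the patch's Javadoc.
    ApplicationResourceUsageReport report =
        ApplicationResourceUsageReport.newInstance(
            1, 0, used, none, none,
            2048L * 300,  // memorySeconds
            2L * 300,     // vcoreSeconds
            2L * 300,     // GPUSeconds
            3L * 300);    // GpuBitVecSeconds

    System.out.println("GPU-seconds: " + report.getGPUSeconds());            // 600
    System.out.println("GPU bit-vector seconds: " + report.getGpuBitVecSeconds()); // 900
  }
}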
@@ -53,10 +53,12 @@ public abstract class Resource implements Comparable<Resource> {

@Public
@Stable
public static Resource newInstance(int memory, int vCores) {
public static Resource newInstance(int memory, int vCores, int GPUs, int GpuBitVec) {
Resource resource = Records.newRecord(Resource.class);
resource.setMemory(memory);
resource.setVirtualCores(vCores);
resource.setGPUs(GPUs);
resource.setGpuBitVec(GpuBitVec);
return resource;
}

@@ -105,12 +107,68 @@ public static Resource newInstance(int memory, int vCores) {
@Evolving
public abstract void setVirtualCores(int vCores);

/**
* Get <em>number of GPUs</em> of the resource.
*
* GPUs are a unit for expressing GPU parallelism. A node's capacity
* should be configured with a GPU count equal to its number of physical GPUs.
* A container should be requested with the number of GPUs it can saturate,
* i.e. the average number of GPUs it expects to keep busy at a time.
*
* @return <em>number of GPUs</em> of the resource
*/
@Public
@Evolving
public abstract int getGPUs();

/**
* Set <em>number of GPUs</em> of the resource.
*
* GPUs are a unit for expressing GPU parallelism. A node's capacity
* should be configured with a GPU count equal to its number of physical GPUs.
* A container should be requested with the number of GPUs it can saturate,
* i.e. the average number of GPUs it expects to keep busy at a time.
*
* @param GPUs <em>number of GPUs</em> of the resource
*/
@Public
@Evolving
public abstract void setGPUs(int GPUs);

/**
* Get the <em>GPU bit vector</em> of the resource.
*
* GpuBitVec represents per-GPU allocation information with locality awareness.
* A bit set to 1 means the corresponding GPU is requested/occupied; a bit set
* to 0 means the corresponding GPU is idle or does not exist. The number of
* set bits should equal the number of GPUs.
*
* @return <em>GPU bit vector</em> of the resource
*/
@Public
@Evolving
public abstract int getGpuBitVec();

/**
* Set the <em>GPU bit vector</em> of the resource.
*
* GpuBitVec represents per-GPU allocation information with locality awareness.
* A bit set to 1 means the corresponding GPU is requested/occupied; a bit set
* to 0 means the corresponding GPU is idle or does not exist. The number of
* set bits should equal the number of GPUs.
*
* @param GpuBitVec <em>GPU bit vector</em> of the resource
*/
@Public
@Evolving
public abstract void setGpuBitVec(int GpuBitVec);

@Override
public int hashCode() {
final int prime = 263167;
int result = 3571;
result = 939769357 + getMemory(); // prime * result = 939769357 initially
result = prime * result + getVirtualCores();
result = prime * result + getGPUs();
result = prime * result + getGpuBitVec();
return result;
}

@@ -123,15 +181,17 @@ public boolean equals(Object obj) {
if (!(obj instanceof Resource))
return false;
Resource other = (Resource) obj;
if (getMemory() != other.getMemory() ||
getVirtualCores() != other.getVirtualCores()) {
if (getMemory() != other.getMemory() ||
getVirtualCores() != other.getVirtualCores() ||
getGPUs() != other.getGPUs() ||
getGpuBitVec() != other.getGpuBitVec()) {
return false;
}
return true;
}

@Override
public String toString() {
return "<memory:" + getMemory() + ", vCores:" + getVirtualCores() + ">";
return "<memory:" + getMemory() + ", vCores:" + getVirtualCores() + ", GPUs:" + getGPUs() + ", GpuBitVec:" + getGpuBitVec() + ">";
}
}
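To make the bit-vector convention concrete, here is a minimal sketch (illustrative values, not from the patch) that requests GPUs 0 and 2 of a node and checks the Javadoc invariant that the number of set bits equals getGPUs():

import org.apache.hadoop.yarn.api.records.Resource;

public class GpuBitVecExample {
  public static void main(String[] args) {
    // Ask for GPUs 0 and 2: bits 0 and 2 set -> 0b0101 == 5.
    int gpuBitVec = (1 << 0) | (1 << 2);
    int gpus = Integer.bitCount(gpuBitVec);   // 2 set bits, so the GPU count is 2

    Resource capability = Resource.newInstance(4096, 2, gpus, gpuBitVec);

    // Javadoc invariant: number of set bits in the vector equals the GPU count.
    System.out.println(Integer.bitCount(capability.getGpuBitVec()) == capability.getGPUs()); // true
    System.out.println(capability); // <memory:4096, vCores:2, GPUs:2, GpuBitVec:5>
  }
}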
@@ -171,6 +171,12 @@ private static void addDeprecatedKeys() {
public static final String RM_SCHEDULER_MINIMUM_ALLOCATION_VCORES =
YARN_PREFIX + "scheduler.minimum-allocation-vcores";
public static final int DEFAULT_RM_SCHEDULER_MINIMUM_ALLOCATION_VCORES = 1;
public static final String RM_SCHEDULER_MINIMUM_ALLOCATION_GPUS =
YARN_PREFIX + "scheduler.minimum-allocation-gpus";
public static final int DEFAULT_RM_SCHEDULER_MINIMUM_ALLOCATION_GPUS = 1;
public static final String RM_SCHEDULER_MINIMUM_ALLOCATION_GPUBITVEC =
YARN_PREFIX + "scheduler.minimum-allocation-gpu-bit-vec";
public static final int DEFAULT_RM_SCHEDULER_MINIMUM_ALLOCATION_GPUBITVEC = 0;

/** Maximum request grant-able by the RM scheduler. */
public static final String RM_SCHEDULER_MAXIMUM_ALLOCATION_MB =
@@ -179,6 +185,12 @@ private static void addDeprecatedKeys() {
public static final String RM_SCHEDULER_MAXIMUM_ALLOCATION_VCORES =
YARN_PREFIX + "scheduler.maximum-allocation-vcores";
public static final int DEFAULT_RM_SCHEDULER_MAXIMUM_ALLOCATION_VCORES = 4;
public static final String RM_SCHEDULER_MAXIMUM_ALLOCATION_GPUS =
YARN_PREFIX + "scheduler.maximum-allocation-gpus";
public static final int DEFAULT_RM_SCHEDULER_MAXIMUM_ALLOCATION_GPUS = 4;
public static final String RM_SCHEDULER_MAXIMUM_ALLOCATION_GPUBITVEC =
YARN_PREFIX + "scheduler.maximum-allocation-gpu-bit-vec";
public static final int DEFAULT_RM_SCHEDULER_MAXIMUM_ALLOCATION_GPUBITVEC = 15;

/** Number of threads to handle scheduler interface.*/
public static final String RM_SCHEDULER_CLIENT_THREAD_COUNT =
@@ -844,6 +856,20 @@ private static void addDeprecatedKeys() {
NM_PREFIX + "resource.percentage-physical-cpu-limit";
public static final int DEFAULT_NM_RESOURCE_PERCENTAGE_PHYSICAL_CPU_LIMIT =
100;

/** Number of GPUs which can be allocated for containers.*/
public static final String NM_GPUS = NM_PREFIX + "resource.GPUs";
public static final int DEFAULT_NM_GPUS = 8;

/** GPU bit vector representing which GPUs can be allocated for containers.*/
public static final String NM_GPUBITVEC = NM_PREFIX + "resource.GpuBitVec";
public static final int DEFAULT_NM_GPUBITVEC = 8;

/** Percentage of overall GPU which can be allocated for containers. */
public static final String NM_RESOURCE_PERCENTAGE_PHYSICAL_GPU_LIMIT =
NM_PREFIX + "resource.percentage-GPU-limit";
public static final int DEFAULT_NM_RESOURCE_PERCENTAGE_PHYSICAL_GPU_LIMIT =
100;

/** NM Webapp address.**/
public static final String NM_WEBAPP_ADDRESS = NM_PREFIX + "webapp.address";
@@ -1075,7 +1101,7 @@ private static void addDeprecatedKeys() {
20;

/**
* Indicates if memory and CPU limits will be set for the Windows Job
* Indicates if memory, CPU, and GPU limits will be set for the Windows Job
* Object for the containers launched by the default container executor.
*/
public static final String NM_WINDOWS_CONTAINER_MEMORY_LIMIT_ENABLED =
@@ -1086,6 +1112,10 @@ private static void addDeprecatedKeys() {
NM_PREFIX + "windows-container.cpu-limit.enabled";
public static final boolean DEFAULT_NM_WINDOWS_CONTAINER_CPU_LIMIT_ENABLED = false;

public static final String NM_WINDOWS_CONTAINER_GPU_LIMIT_ENABLED =
NM_PREFIX + "windows-container.GPU-limit.enabled";
public static final boolean DEFAULT_NM_WINDOWS_CONTAINER_GPU_LIMIT_ENABLED = false;

/**
/* The Windows group that the windows-secure-container-executor should run as.
*/
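The new scheduler and NodeManager keys follow the existing memory/vcore pattern, and the shipped defaults are mutually consistent: a maximum of 4 GPUs corresponds to a full bit vector of 0b1111 = 15. A minimal sketch of reading them through the usual Configuration API (the printed values depend on whatever yarn-site.xml provides):

import org.apache.hadoop.yarn.conf.YarnConfiguration;

public class GpuConfigExample {
  public static void main(String[] args) {
    YarnConfiguration conf = new YarnConfiguration();

    // Scheduler-side bounds on GPUs per container allocation.
    int minGpus = conf.getInt(
        YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_GPUS,
        YarnConfiguration.DEFAULT_RM_SCHEDULER_MINIMUM_ALLOCATION_GPUS);   // default 1
    int maxGpus = conf.getInt(
        YarnConfiguration.RM_SCHEDULER_MAXIMUM_ALLOCATION_GPUS,
        YarnConfiguration.DEFAULT_RM_SCHEDULER_MAXIMUM_ALLOCATION_GPUS);   // default 4

    // NodeManager-side advertised GPU capacity.
    int nmGpus = conf.getInt(
        YarnConfiguration.NM_GPUS,
        YarnConfiguration.DEFAULT_NM_GPUS);                                // default 8

    System.out.println("GPU allocation range: [" + minGpus + ", " + maxGpus
        + "], NM GPUs: " + nmGpus);
  }
}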
@@ -56,6 +56,8 @@ message ContainerIdProto {
message ResourceProto {
optional int32 memory = 1;
optional int32 virtual_cores = 2;
optional int32 GPUs = 3;
optional int32 GpuBitVec = 4;
}

message ResourceOptionProto {
@@ -171,6 +173,8 @@ message ApplicationResourceUsageReportProto {
optional ResourceProto needed_resources = 5;
optional int64 memory_seconds = 6;
optional int64 vcore_seconds = 7;
optional int64 GPU_seconds = 8;
optional int64 GpuBitVec_seconds = 9;
}

message ApplicationReportProto {
@@ -92,6 +92,8 @@ message AllocateResponseProto {
enum SchedulerResourceTypes {
MEMORY = 0;
CPU = 1;
GPU = 2;
GPUBITVEC = 3;
}

//////////////////////////////////////////////////////
@@ -217,6 +217,10 @@ public static enum DSEntity {
private int containerMemory = 10;
// VirtualCores to request for the container on which the shell command will run
private int containerVirtualCores = 1;
// GPUs to request for the container on which the shell command will run
private int containerGPUs = 1;
// GPU location to request for the container on which the shell command will run
private int containerGpuBitVec = 1;
// Priority of the request
private int requestPriority;

@@ -358,6 +362,8 @@ public boolean init(String[] args) throws ParseException, IOException {
"Amount of memory in MB to be requested to run the shell command");
opts.addOption("container_vcores", true,
"Amount of virtual cores to be requested to run the shell command");
opts.addOption("container_GPUs", true,
"Amount of GPUs to be requested to run the shell command");
opts.addOption("num_containers", true,
"No. of containers on which the shell command needs to be executed");
opts.addOption("priority", true, "Application Priority. Default 0");
@@ -490,6 +496,8 @@ public boolean init(String[] args) throws ParseException, IOException {
"container_memory", "10"));
containerVirtualCores = Integer.parseInt(cliParser.getOptionValue(
"container_vcores", "1"));
containerGPUs = Integer.parseInt(cliParser.getOptionValue(
"container_GPUs", "1"));
numTotalContainers = Integer.parseInt(cliParser.getOptionValue(
"num_containers", "1"));
if (numTotalContainers == 0) {
@@ -577,10 +585,13 @@ public void run() throws YarnException, IOException, InterruptedException {
// Dump out information about cluster capability as seen by the
// resource manager
int maxMem = response.getMaximumResourceCapability().getMemory();
LOG.info("Max mem capabililty of resources in this cluster " + maxMem);
LOG.info("Max mem capability of resources in this cluster " + maxMem);

int maxVCores = response.getMaximumResourceCapability().getVirtualCores();
LOG.info("Max vcores capabililty of resources in this cluster " + maxVCores);
LOG.info("Max vcores capability of resources in this cluster " + maxVCores);

int maxGPUs = response.getMaximumResourceCapability().getGPUs();
LOG.info("Max GPUs capability of resources in this cluster " + maxGPUs);

// A resource ask cannot exceed the max.
if (containerMemory > maxMem) {
@@ -597,6 +608,13 @@ public void run() throws YarnException, IOException, InterruptedException {
containerVirtualCores = maxVCores;
}

if (containerGPUs > maxGPUs) {
LOG.info("Container GPUs specified above max threshold of cluster."
+ " Using max value." + ", specified=" + containerGPUs + ", max="
+ maxGPUs);
containerGPUs = maxGPUs;
}

List<Container> previousAMRunningContainers =
response.getContainersFromPreviousAttempts();
LOG.info(appAttemptID + " received " + previousAMRunningContainers.size()
@@ -792,7 +810,9 @@ public void onContainersAllocated(List<Container> allocatedContainers) {
+ ", containerResourceMemory"
+ allocatedContainer.getResource().getMemory()
+ ", containerResourceVirtualCores"
+ allocatedContainer.getResource().getVirtualCores());
+ allocatedContainer.getResource().getVirtualCores()
+ ", containerResourceGPUs"
+ allocatedContainer.getResource().getGPUs());
// + ", containerToken"
// +allocatedContainer.getContainerToken().getIdentifier().toString());

@@ -1056,7 +1076,7 @@ private ContainerRequest setupContainerAskForRM() {
// Set up resource type requirements
// For now, memory, CPU and GPUs are supported so we set those requirements
Resource capability = Resource.newInstance(containerMemory,
containerVirtualCores);
containerVirtualCores, containerGPUs, containerGpuBitVec);

ContainerRequest request = new ContainerRequest(capability, null, null,
pri);
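The distributed-shell change exposes a container_GPUs option but, at least in this diff, no option for the bit vector, so containerGpuBitVec appears to keep its initial value of 1 (GPU 0 only) regardless of the requested GPU count. Since the Resource Javadoc expects the number of set bits to equal the GPU count, one hypothetical way an AM with no placement preference could keep the two consistent is sketched below; anyGpuBitVec is an illustrative helper and not part of the patch:

public class GpuAskHelper {
  /**
   * Hypothetical helper, not part of the patch: a bit vector that simply requests
   * the first `gpus` GPUs, so that Integer.bitCount(bitVec) == gpus as the Resource
   * Javadoc expects. Placement-aware AMs would set specific bits instead.
   */
  public static int anyGpuBitVec(int gpus) {
    return (1 << gpus) - 1;          // e.g. 2 GPUs -> 0b0011, 4 GPUs -> 0b1111
  }

  public static void main(String[] args) {
    int containerGPUs = 2;
    System.out.println(Integer.toBinaryString(anyGpuBitVec(containerGPUs))); // "11"
  }
}

In setupContainerAskForRM() the capability could then be built as Resource.newInstance(containerMemory, containerVirtualCores, containerGPUs, anyGpuBitVec(containerGPUs)).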