Skip to content
Merged
Original file line number Diff line number Diff line change
Expand Up @@ -7,13 +7,19 @@
* The options class for RayCall or ActorCreation.
*/
public abstract class BaseTaskOptions {
public Map<String, Double> resources;
public final Map<String, Double> resources;

public BaseTaskOptions() {
resources = new HashMap<>();
}

public BaseTaskOptions(Map<String, Double> resources) {
for (Map.Entry<String, Double> entry : resources.entrySet()) {
if (entry.getValue().compareTo(0.0) <= 0) {
throw new RuntimeException(String.format("Resource capacity should be positive, " +
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let's use IllegalArgumentException here

"but got resource %s = %f.", entry.getKey(), entry.getValue()));
}
}
this.resources = resources;
}

Expand Down
2 changes: 1 addition & 1 deletion java/example.conf
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ ray {
run-mode = CLUSTER

// Available resources on this node.
resources: "CPU:4,GPU:0"
resources: "CPU:4"

// The address of the redis server to connect, in format `ip:port`.
// If not provided, Ray processes will be started locally, including
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,6 @@
import org.ray.runtime.task.ArgumentsBuilder;
import org.ray.runtime.task.TaskLanguage;
import org.ray.runtime.task.TaskSpec;
import org.ray.runtime.util.ResourceUtil;
import org.ray.runtime.util.UniqueIdUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -350,11 +349,6 @@ private TaskSpec createTaskSpec(RayFunc func, PyFunctionDescriptor pyFunctionDes
resources = new HashMap<>(taskOptions.resources);
}

if (!resources.containsKey(ResourceUtil.CPU_LITERAL)
&& !resources.containsKey(ResourceUtil.CPU_LITERAL.toLowerCase())) {
resources.put(ResourceUtil.CPU_LITERAL, 0.0);
}

int maxActorReconstruction = 0;
if (taskOptions instanceof ActorCreationOptions) {
maxActorReconstruction = ((ActorCreationOptions) taskOptions).maxReconstructions;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -104,10 +104,6 @@ public RayConfig(Config config) {
+ "setting it to the number of CPU cores: {}", numCpu);
resources.put("CPU", numCpu * 1.0);
}
if (!resources.containsKey("GPU")) {
LOGGER.warn("No GPU resource is set in configuration, setting it to 0");
resources.put("GPU", 0.0);
}
}
// Driver id.
String driverId = config.getString("ray.driver.id");
Expand Down
2 changes: 1 addition & 1 deletion java/streaming/src/main/resources/ray.conf
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
ray {
run-mode = SINGLE_PROCESS
resources = "CPU:4,GPU:0"
resources = "CPU:4"
redis.address = ""
}
Original file line number Diff line number Diff line change
Expand Up @@ -34,30 +34,37 @@ public Integer echo(Integer number) {
@Test
public void testMethods() {
TestUtils.skipTestUnderSingleProcess();
CallOptions callOptions1 = new CallOptions(ImmutableMap.of("CPU", 4.0, "GPU", 0.0));
CallOptions callOptions1 = new CallOptions(ImmutableMap.of("CPU", 4.0));

// This is a case that can satisfy required resources.
// The static resources for test are "CPU:4,RES-A:4".
RayObject<Integer> result1 = Ray.call(ResourcesManagementTest::echo, 100, callOptions1);
Assert.assertEquals(100, (int) result1.get());

CallOptions callOptions2 = new CallOptions(ImmutableMap.of("CPU", 4.0, "GPU", 2.0));
CallOptions callOptions2 = new CallOptions(ImmutableMap.of("CPU", 4.0));

// This is a case that can't satisfy required resources.
// The static resources for test are "CPU:4,RES-A:4".
final RayObject<Integer> result2 = Ray.call(ResourcesManagementTest::echo, 200, callOptions2);
WaitResult<Integer> waitResult = Ray.wait(ImmutableList.of(result2), 1, 1000);

Assert.assertEquals(0, waitResult.getReady().size());
Assert.assertEquals(1, waitResult.getUnready().size());
Assert.assertEquals(1, waitResult.getReady().size());
Assert.assertEquals(0, waitResult.getUnready().size());

try {
CallOptions callOptions3 = new CallOptions(ImmutableMap.of("CPU", 0.0));
Assert.fail();
} catch (RuntimeException e) {
// We should receive a RuntimeException indicate that we should pass a zero capacity resource.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
// We should receive a RuntimeException indicate that we should pass a zero capacity resource.
// We should receive a RuntimeException indicates that we should not pass a zero capacity resource.

}
}

@Test
public void testActors() {
TestUtils.skipTestUnderSingleProcess();

ActorCreationOptions actorCreationOptions1 =
new ActorCreationOptions(ImmutableMap.of("CPU", 2.0, "GPU", 0.0));
new ActorCreationOptions(ImmutableMap.of("CPU", 2.0));

// This is a case that can satisfy required resources.
// The static resources for test are "CPU:4,RES-A:4".
Expand All @@ -68,7 +75,7 @@ public void testActors() {
// This is a case that can't satisfy required resources.
// The static resources for test are "CPU:4,RES-A:4".
ActorCreationOptions actorCreationOptions2 =
new ActorCreationOptions(ImmutableMap.of("CPU", 8.0, "GPU", 0.0));
new ActorCreationOptions(ImmutableMap.of("CPU", 8.0));

RayActor<ResourcesManagementTest.Echo> echo2 =
Ray.createActor(Echo::new, actorCreationOptions2);
Expand Down
2 changes: 0 additions & 2 deletions python/ray/includes/task.pxi
Original file line number Diff line number Diff line change
Expand Up @@ -44,8 +44,6 @@ cdef class Task:
# Parse the resource map.
if resource_map is not None:
required_resources = resource_map_from_dict(resource_map)
if required_resources.count(b"CPU") == 0:
required_resources[b"CPU"] = 1.0
if placement_resource_map is not None:
required_placement_resources = (
resource_map_from_dict(placement_resource_map))
Expand Down
18 changes: 15 additions & 3 deletions python/ray/services.py
Original file line number Diff line number Diff line change
Expand Up @@ -1029,14 +1029,25 @@ def check_and_update_resources(num_cpus, num_gpus, resources):
if gpu_ids is not None:
resources["GPU"] = min(resources["GPU"], len(gpu_ids))

resources = {
resource_label: resource_quantity
for resource_label, resource_quantity in resources.items()
if resource_quantity != 0
}

# Check types.
for _, resource_quantity in resources.items():
assert (isinstance(resource_quantity, int)
or isinstance(resource_quantity, float))
if (isinstance(resource_quantity, float)
and not resource_quantity.is_integer()):
raise ValueError("Resource quantities must all be whole numbers.")

raise ValueError(
"Resource quantities must all be whole numbers. Received {}.".
format(resources))
if resource_quantity < 0:
raise ValueError(
"Resource quantities must be nonnegative. Received {}.".format(
resources))
if resource_quantity > ray_constants.MAX_RESOURCE_QUANTITY:
raise ValueError("Resource quantities must be at most {}.".format(
ray_constants.MAX_RESOURCE_QUANTITY))
Expand Down Expand Up @@ -1113,8 +1124,9 @@ def start_raylet(redis_address,

# Limit the number of workers that can be started in parallel by the
# raylet. However, make sure it is at least 1.
num_cpus_static = static_resources.get("CPU", 0)
maximum_startup_concurrency = max(
1, min(multiprocessing.cpu_count(), static_resources["CPU"]))
1, min(multiprocessing.cpu_count(), num_cpus_static))

# Format the resource argument in a form like 'CPU,1.0,GPU,0,Custom,3'.
resource_argument = ",".join(
Expand Down
32 changes: 29 additions & 3 deletions python/ray/tests/test_basic.py
Original file line number Diff line number Diff line change
Expand Up @@ -1934,15 +1934,15 @@ def run_lots_of_tasks():
store_names = []
store_names += [
client["ObjectStoreSocketName"] for client in client_table
if client["Resources"]["GPU"] == 0
if client["Resources"].get("GPU", 0) == 0
]
store_names += [
client["ObjectStoreSocketName"] for client in client_table
if client["Resources"]["GPU"] == 5
if client["Resources"].get("GPU", 0) == 5
]
store_names += [
client["ObjectStoreSocketName"] for client in client_table
if client["Resources"]["GPU"] == 1
if client["Resources"].get("GPU", 0) == 1
]
assert len(store_names) == 3

Expand Down Expand Up @@ -2112,6 +2112,32 @@ def f():
ray.get(results)


def test_zero_capacity_deletion_semantics(shutdown_only):
ray.init(num_cpus=2, num_gpus=1, resources={"test_resource": 1})

def test():
resources = ray.global_state.available_resources()
retry_count = 0

while resources and retry_count < 5:
time.sleep(0.1)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is probably fine, but 5 retries may not be enough in an environment like Travis, so let's keep an eye on whether this test becomes flaky or not and if it starts failing then increase this.

resources = ray.global_state.available_resources()
retry_count += 1

if retry_count >= 5:
raise RuntimeError("Resources were available even after retries.")

return resources

function = ray.remote(
num_cpus=2, num_gpus=1, resources={"test_resource": 1})(test)
cluster_resources = ray.get(function.remote())

# All cluster resources should be utilized and
# cluster_resources must be empty
assert cluster_resources == {}


@pytest.fixture
def save_gpu_ids_shutdown_only():
# Record the curent value of this environment variable so that we can
Expand Down
4 changes: 2 additions & 2 deletions python/ray/tests/test_global_state.py
Original file line number Diff line number Diff line change
Expand Up @@ -51,8 +51,8 @@ def cpu_task():

while not resource_used:
available_resources = ray.global_state.available_resources()
resource_used = available_resources[
"CPU"] == cluster_resources["CPU"] - 1
resource_used = available_resources.get(
"CPU", 0) == cluster_resources.get("CPU", 0) - 1

assert resource_used

Expand Down
21 changes: 13 additions & 8 deletions python/ray/tune/ray_trial_executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
import traceback

import ray
from ray.tune.error import TuneError, AbortTrialExecution
from ray.tune.error import AbortTrialExecution
from ray.tune.logger import NoopLogger
from ray.tune.trial import Trial, Resources, Checkpoint
from ray.tune.trial_executor import TrialExecutor
Expand Down Expand Up @@ -363,17 +363,22 @@ def _update_avail_resources(self, num_retries=5):
resources = ray.services.check_and_update_resources(
None, None, None)
if not resources:
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@richardliaw We're changing resource semantics - zero capacity resources are now not included in resource datastructures. For instance instead of returning {resource: 0} we now return an empty dictionary.

To make tests work, I've made some changes to resource handling in tune - can you please check if this looks okay?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I pushed a small tweak - thanks for letting me know!

logger.warning("Cluster resources not detected. Retrying...")
logger.warning(
"Cluster resources not detected or are 0. Retrying...")
time.sleep(0.5)

if not resources or "CPU" not in resources:
raise TuneError("Cluster resources cannot be detected. "
"You can resume this experiment by passing in "
"`resume=True` to `run`.")
if not resources:
# NOTE: This hides the possibility that Ray may be waiting for
# clients to connect.
resources.setdefault("CPU", 0)
resources.setdefault("GPU", 0)
logger.warning("Cluster resources cannot be detected or are 0. "
"You can resume this experiment by passing in "
"`resume=True` to `run`.")

resources = resources.copy()
num_cpus = resources.pop("CPU")
num_gpus = resources.pop("GPU")
num_cpus = resources.pop("CPU", 0)
num_gpus = resources.pop("GPU", 0)
custom_resources = resources

self._avail_resources = Resources(
Expand Down
9 changes: 8 additions & 1 deletion python/ray/worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -653,6 +653,13 @@ def submit_task(self,
raise ValueError(
"Resource quantities must all be whole numbers.")

# Remove any resources with zero quantity requirements
resources = {
resource_label: resource_quantity
for resource_label, resource_quantity in resources.items()
if resource_quantity > 0
}

if placement_resources is None:
placement_resources = {}

Expand Down Expand Up @@ -1870,7 +1877,7 @@ def connect(node,
nil_actor_counter, # actor_counter.
[], # new_actor_handles.
[], # execution_dependencies.
{"CPU": 0}, # resource_map.
{}, # resource_map.
{}, # placement_resource_map.
)

Expand Down
33 changes: 19 additions & 14 deletions src/ray/raylet/node_manager.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1460,15 +1460,16 @@ void NodeManager::HandleTaskBlocked(const std::shared_ptr<LocalClientConnection>
// Get the CPU resources required by the running task.
const auto required_resources = task.GetTaskSpecification().GetRequiredResources();
double required_cpus = required_resources.GetNumCpus();
const std::unordered_map<std::string, double> cpu_resources = {
{kCPU_ResourceLabel, required_cpus}};
std::unordered_map<std::string, double> cpu_resources;
if (required_cpus > 0) {
cpu_resources[kCPU_ResourceLabel] = required_cpus;
}

// Release the CPU resources.
auto const cpu_resource_ids = worker->ReleaseTaskCpuResources();
local_available_resources_.Release(cpu_resource_ids);
RAY_CHECK(
cluster_resource_map_[gcs_client_->client_table().GetLocalClientId()].Release(
ResourceSet(cpu_resources)));
cluster_resource_map_[gcs_client_->client_table().GetLocalClientId()].Release(
ResourceSet(cpu_resources));
worker->MarkBlocked();

// Try dispatching tasks since we may have released some resources.
Expand Down Expand Up @@ -1512,9 +1513,11 @@ void NodeManager::HandleTaskUnblocked(
// Get the CPU resources required by the running task.
const auto required_resources = task.GetTaskSpecification().GetRequiredResources();
double required_cpus = required_resources.GetNumCpus();
const ResourceSet cpu_resources(
std::unordered_map<std::string, double>({{kCPU_ResourceLabel, required_cpus}}));

std::unordered_map<std::string, double> cpu_resources_map;
if (required_cpus > 0) {
cpu_resources_map[kCPU_ResourceLabel] = required_cpus;
}
const ResourceSet cpu_resources(cpu_resources_map);
// Check if we can reacquire the CPU resources.
bool oversubscribed = !local_available_resources_.Contains(cpu_resources);

Expand All @@ -1524,9 +1527,8 @@ void NodeManager::HandleTaskUnblocked(
// reacquire here may be different from the ones that the task started with.
auto const resource_ids = local_available_resources_.Acquire(cpu_resources);
worker->AcquireTaskCpuResources(resource_ids);
RAY_CHECK(
cluster_resource_map_[gcs_client_->client_table().GetLocalClientId()].Acquire(
cpu_resources));
cluster_resource_map_[gcs_client_->client_table().GetLocalClientId()].Acquire(
cpu_resources);
} else {
// In this case, we simply don't reacquire the CPU resources for the worker.
// The worker can keep running and when the task finishes, it will simply
Expand Down Expand Up @@ -1618,7 +1620,7 @@ bool NodeManager::AssignTask(const Task &task) {
auto acquired_resources =
local_available_resources_.Acquire(spec.GetRequiredResources());
const auto &my_client_id = gcs_client_->client_table().GetLocalClientId();
RAY_CHECK(cluster_resource_map_[my_client_id].Acquire(spec.GetRequiredResources()));
cluster_resource_map_[my_client_id].Acquire(spec.GetRequiredResources());

if (spec.IsActorCreationTask()) {
// Check that we are not placing an actor creation task on a node with 0 CPUs.
Expand Down Expand Up @@ -1732,8 +1734,8 @@ void NodeManager::FinishAssignedTask(Worker &worker) {
// Release task's resources. The worker's lifetime resources are still held.
auto const &task_resources = worker.GetTaskResourceIds();
local_available_resources_.Release(task_resources);
RAY_CHECK(cluster_resource_map_[gcs_client_->client_table().GetLocalClientId()].Release(
task_resources.ToResourceSet()));
cluster_resource_map_[gcs_client_->client_table().GetLocalClientId()].Release(
task_resources.ToResourceSet());
worker.ResetTaskResourceIds();

// If this was an actor or actor creation task, handle the actor's new state.
Expand Down Expand Up @@ -2025,6 +2027,9 @@ void NodeManager::ForwardTaskOrResubmit(const Task &task,

RAY_LOG(INFO) << "Failed to forward task " << task_id << " to node manager "
<< node_manager_id;

// TODO(romilb): We should probably revert the load subtraction from
// SchedulingPolicy::Schedule()
// Mark the failed task as pending to let other raylets know that we still
// have the task. TaskDependencyManager::TaskPending() is assumed to be
// idempotent.
Expand Down
8 changes: 5 additions & 3 deletions src/ray/raylet/scheduling_policy.cc
Original file line number Diff line number Diff line change
Expand Up @@ -49,11 +49,13 @@ std::unordered_map<TaskID, ClientID> SchedulingPolicy::Schedule(
const auto &node_resources = client_resource_pair.second;
ResourceSet available_node_resources =
ResourceSet(node_resources.GetAvailableResources());
available_node_resources.SubtractResourcesStrict(node_resources.GetLoadResources());
// TODO(romilb): Why do we need to subtract load from available resources?
// Even if we don't the code path below for choosing a dst_client_id would be
// similar.
available_node_resources.SubtractResources(node_resources.GetLoadResources());
RAY_LOG(DEBUG) << "client_id " << node_client_id
<< " avail: " << node_resources.GetAvailableResources().ToString()
<< " load: " << node_resources.GetLoadResources().ToString()
<< " avail-load: " << available_node_resources.ToString();
<< " load: " << node_resources.GetLoadResources().ToString();

if (resource_demand.IsSubset(available_node_resources)) {
// This node is a feasible candidate.
Expand Down
Loading