4 changes: 2 additions & 2 deletions python/ray/tests/test_actor.py
@@ -1402,13 +1402,13 @@ def inc(self):
return self.x

# Create many actors. It should take a while to finish initializing them.
- actors = [Counter.remote() for _ in range(100)]
+ actors = [Counter.remote() for _ in range(15)]
# Allow some time to forward the actor creation tasks to the other node.
time.sleep(0.1)
# Kill the second node.
cluster.remove_node(remote_node)

- # Get all of the results
+ # Get all of the results.
results = ray.get([actor.inc.remote() for actor in actors])
assert results == [1 for actor in actors]

75 changes: 72 additions & 3 deletions python/ray/tests/test_basic.py
@@ -864,6 +864,71 @@ def method(self):
assert ray.get([id1, id2, id3, id4]) == [0, 1, "test", 2]


def test_many_fractional_resources(shutdown_only):
ray.init(num_cpus=2, num_gpus=2, resources={"Custom": 2})

@ray.remote
def g():
return 1

@ray.remote
def f(block, accepted_resources):
true_resources = {
resource: value[0][1]
for resource, value in ray.get_resource_ids().items()
}
if block:
ray.get(g.remote())
return true_resources == accepted_resources

# Check that the resources are assigned correctly.
result_ids = []
for rand1, rand2, rand3 in np.random.uniform(size=(100, 3)):
Review comment (Collaborator; a sketch of the suggested change follows this hunk):
Isn't this going to be really slow since we're doing ray.get inside of the loop? One option is just to put the assert inside of the remote function so you don't need to check the return value.

resource_set = {"CPU": int(rand1 * 10000) / 10000}
result_ids.append(f._remote([False, resource_set], num_cpus=rand1))

resource_set = {"CPU": 1, "GPU": int(rand1 * 10000) / 10000}
result_ids.append(f._remote([False, resource_set], num_gpus=rand1))

resource_set = {"CPU": 1, "Custom": int(rand1 * 10000) / 10000}
result_ids.append(
f._remote([False, resource_set], resources={"Custom": rand1}))

resource_set = {
"CPU": int(rand1 * 10000) / 10000,
"GPU": int(rand2 * 10000) / 10000,
"Custom": int(rand3 * 10000) / 10000
}
result_ids.append(
f._remote(
[False, resource_set],
num_cpus=rand1,
num_gpus=rand2,
resources={"Custom": rand3}))
result_ids.append(
f._remote(
[True, resource_set],
num_cpus=rand1,
num_gpus=rand2,
resources={"Custom": rand3}))
assert all(ray.get(result_ids))

# Check that the available resources at the end are the same as the
# beginning.
stop_time = time.time() + 10
correct_available_resources = False
while time.time() < stop_time:
if ray.global_state.available_resources() == {
"CPU": 2.0,
"GPU": 2.0,
"Custom": 2.0,
}:
correct_available_resources = True
break
if not correct_available_resources:
assert False, "Did not get correct available resources."
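The review comment above suggests moving the check into the remote function, so the driver does not need to collect and compare per-task return values. A minimal sketch of that suggestion, reusing the f and g defined in this test (illustrative only; not the code that was merged):

@ray.remote
def f(block, accepted_resources):
    true_resources = {
        resource: value[0][1]
        for resource, value in ray.get_resource_ids().items()
    }
    if block:
        ray.get(g.remote())
    # Fail the task itself on a mismatch instead of returning a boolean.
    assert true_resources == accepted_resources

# The driver then makes a single blocking call; a failed assertion inside any
# task is re-raised here, so no per-task booleans need to be checked.
ray.get(result_ids)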


def test_get_multiple(ray_start_regular):
object_ids = [ray.put(i) for i in range(10)]
assert ray.get(object_ids) == list(range(10))
@@ -2126,20 +2191,24 @@ def f():
ray.get(results)


# TODO: 5 retry attempts may be too little for Travis and we may need to
# increase it if this test begins to be flaky on Travis.
def test_zero_capacity_deletion_semantics(shutdown_only):
ray.init(num_cpus=2, num_gpus=1, resources={"test_resource": 1})

def test():
resources = ray.global_state.available_resources()
MAX_RETRY_ATTEMPTS = 5
retry_count = 0

- while resources and retry_count < 5:
+ while resources and retry_count < MAX_RETRY_ATTEMPTS:
time.sleep(0.1)
resources = ray.global_state.available_resources()
retry_count += 1

- if retry_count >= 5:
- raise RuntimeError("Resources were available even after retries.")
+ if retry_count >= MAX_RETRY_ATTEMPTS:
+ raise RuntimeError(
+ "Resources were available even after five retries.")

return resources

20 changes: 5 additions & 15 deletions src/ray/raylet/node_manager.cc
@@ -1468,17 +1468,13 @@ void NodeManager::HandleTaskBlocked(const std::shared_ptr<LocalClientConnection>
local_queues_.QueueTasks({task}, TaskState::RUNNING);
// Get the CPU resources required by the running task.
const auto required_resources = task.GetTaskSpecification().GetRequiredResources();
- double required_cpus = required_resources.GetNumCpus();
- std::unordered_map<std::string, double> cpu_resources;
- if (required_cpus > 0) {
- cpu_resources[kCPU_ResourceLabel] = required_cpus;
- }
+ const ResourceSet cpu_resources = required_resources.GetNumCpus();

// Release the CPU resources.
auto const cpu_resource_ids = worker->ReleaseTaskCpuResources();
local_available_resources_.Release(cpu_resource_ids);
cluster_resource_map_[gcs_client_->client_table().GetLocalClientId()].Release(
- ResourceSet(cpu_resources));
+ cpu_resources);
worker->MarkBlocked();

// Try dispatching tasks since we may have released some resources.
@@ -1521,12 +1517,7 @@ void NodeManager::HandleTaskUnblocked(
local_queues_.QueueTasks({task}, TaskState::RUNNING);
// Get the CPU resources required by the running task.
const auto required_resources = task.GetTaskSpecification().GetRequiredResources();
- double required_cpus = required_resources.GetNumCpus();
- std::unordered_map<std::string, double> cpu_resources_map;
- if (required_cpus > 0) {
- cpu_resources_map[kCPU_ResourceLabel] = required_cpus;
- }
- const ResourceSet cpu_resources(cpu_resources_map);
+ const ResourceSet cpu_resources = required_resources.GetNumCpus();
// Check if we can reacquire the CPU resources.
bool oversubscribed = !local_available_resources_.Contains(cpu_resources);

@@ -1633,7 +1624,8 @@ bool NodeManager::AssignTask(const Task &task) {

if (spec.IsActorCreationTask()) {
// Check that we are not placing an actor creation task on a node with 0 CPUs.
- RAY_CHECK(cluster_resource_map_[my_client_id].GetTotalResources().GetNumCpus() != 0);
+ RAY_CHECK(cluster_resource_map_[my_client_id].GetTotalResources().GetResourceMap().at(
+ kCPU_ResourceLabel) != 0);
worker->SetLifetimeResourceIds(acquired_resources);
} else {
worker->SetTaskResourceIds(acquired_resources);
@@ -2037,8 +2029,6 @@ void NodeManager::ForwardTaskOrResubmit(const Task &task,
RAY_LOG(INFO) << "Failed to forward task " << task_id << " to node manager "
<< node_manager_id;

- // TODO(romilb): We should probably revert the load subtraction from
- // SchedulingPolicy::Schedule()
// Mark the failed task as pending to let other raylets know that we still
// have the task. TaskDependencyManager::TaskPending() is assumed to be
// idempotent.
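The comment above relies on TaskDependencyManager::TaskPending() being idempotent: forwarding can fail repeatedly for the same task, so marking it pending a second time must be a no-op. A tiny Python sketch of what idempotent pending-task bookkeeping looks like (purely illustrative; not the actual TaskDependencyManager):

class PendingTasks:
    def __init__(self):
        self._pending = set()

    def task_pending(self, task_id):
        # Using a set makes repeated calls harmless: marking the same task
        # pending twice leaves the state unchanged.
        self._pending.add(task_id)

tracker = PendingTasks()
tracker.task_pending("task-1")
tracker.task_pending("task-1")  # safe after another failed forward attempt
assert tracker._pending == {"task-1"}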
1 change: 1 addition & 0 deletions src/ray/raylet/node_manager.h
@@ -111,6 +111,7 @@ class NodeManager {
/// \param data Data associated with the new client.
/// \return Void.
void ClientAdded(const ClientTableDataT &data);

/// Handler for the removal of a GCS client.
/// \param client_data Data associated with the removed client.
/// \return Void.
7 changes: 4 additions & 3 deletions src/ray/raylet/scheduling_policy.cc
@@ -49,9 +49,10 @@ std::unordered_map<TaskID, ClientID> SchedulingPolicy::Schedule(
const auto &node_resources = client_resource_pair.second;
ResourceSet available_node_resources =
ResourceSet(node_resources.GetAvailableResources());
- // TODO(romilb): Why do we need to subtract load from available resources?
- // Even if we don't the code path below for choosing a dst_client_id would be
- // similar.
+ // We have to subtract the current "load" because we set the current "load"
+ // to be the resources used by tasks that are in the
+ // `SchedulingQueue::ready_queue_` in NodeManager::ProcessGetTaskMessage's
+ // call to SchedulingQueue::GetResourceLoad.
available_node_resources.SubtractResources(node_resources.GetLoadResources());
RAY_LOG(DEBUG) << "client_id " << node_client_id
<< " avail: " << node_resources.GetAvailableResources().ToString()
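The new comment in SchedulingPolicy::Schedule above explains the subtraction: the reported load already covers tasks sitting in the ready queue, so subtracting it from the available resources prevents queued work from being double-counted as free capacity. A small Python sketch of the idea (illustrative only; the dictionaries and the pick_feasible_node helper are assumptions, not Ray APIs):

def pick_feasible_node(nodes, demand):
    # nodes maps a node id to {"available": {...}, "load": {...}}, where each
    # inner dict maps a resource name such as "CPU" to a quantity; demand is
    # the resource request of the task being scheduled.
    for node_id, res in nodes.items():
        # Resources already promised to queued tasks are not really free.
        effective = {
            name: res["available"].get(name, 0.0) - res["load"].get(name, 0.0)
            for name in set(res["available"]) | set(res["load"])
        }
        if all(effective.get(name, 0.0) >= qty for name, qty in demand.items()):
            return node_id
    return None  # no feasible node; fall back to other placement strategies

# One CPU looks free, but a queued task already claims it, so nothing fits.
nodes = {"node-1": {"available": {"CPU": 1.0}, "load": {"CPU": 1.0}}}
assert pick_feasible_node(nodes, {"CPU": 1.0}) is None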