diff --git a/python/ray/tests/test_actor.py b/python/ray/tests/test_actor.py index 53f2a6a5f63c..b072d69267db 100644 --- a/python/ray/tests/test_actor.py +++ b/python/ray/tests/test_actor.py @@ -1402,13 +1402,13 @@ def inc(self): return self.x # Create many actors. It should take a while to finish initializing them. - actors = [Counter.remote() for _ in range(100)] + actors = [Counter.remote() for _ in range(15)] # Allow some time to forward the actor creation tasks to the other node. time.sleep(0.1) # Kill the second node. cluster.remove_node(remote_node) - # Get all of the results + # Get all of the results. results = ray.get([actor.inc.remote() for actor in actors]) assert results == [1 for actor in actors] diff --git a/python/ray/tests/test_basic.py b/python/ray/tests/test_basic.py index c24c90e0d8da..5fa7f3dbfdd8 100644 --- a/python/ray/tests/test_basic.py +++ b/python/ray/tests/test_basic.py @@ -864,6 +864,71 @@ def method(self): assert ray.get([id1, id2, id3, id4]) == [0, 1, "test", 2] +def test_many_fractional_resources(shutdown_only): + ray.init(num_cpus=2, num_gpus=2, resources={"Custom": 2}) + + @ray.remote + def g(): + return 1 + + @ray.remote + def f(block, accepted_resources): + true_resources = { + resource: value[0][1] + for resource, value in ray.get_resource_ids().items() + } + if block: + ray.get(g.remote()) + return true_resources == accepted_resources + + # Check that the resource are assigned correctly. + result_ids = [] + for rand1, rand2, rand3 in np.random.uniform(size=(100, 3)): + resource_set = {"CPU": int(rand1 * 10000) / 10000} + result_ids.append(f._remote([False, resource_set], num_cpus=rand1)) + + resource_set = {"CPU": 1, "GPU": int(rand1 * 10000) / 10000} + result_ids.append(f._remote([False, resource_set], num_gpus=rand1)) + + resource_set = {"CPU": 1, "Custom": int(rand1 * 10000) / 10000} + result_ids.append( + f._remote([False, resource_set], resources={"Custom": rand1})) + + resource_set = { + "CPU": int(rand1 * 10000) / 10000, + "GPU": int(rand2 * 10000) / 10000, + "Custom": int(rand3 * 10000) / 10000 + } + result_ids.append( + f._remote( + [False, resource_set], + num_cpus=rand1, + num_gpus=rand2, + resources={"Custom": rand3})) + result_ids.append( + f._remote( + [True, resource_set], + num_cpus=rand1, + num_gpus=rand2, + resources={"Custom": rand3})) + assert all(ray.get(result_ids)) + + # Check that the available resources at the end are the same as the + # beginning. + stop_time = time.time() + 10 + correct_available_resources = False + while time.time() < stop_time: + if ray.global_state.available_resources() == { + "CPU": 2.0, + "GPU": 2.0, + "Custom": 2.0, + }: + correct_available_resources = True + break + if not correct_available_resources: + assert False, "Did not get correct available resources." + + def test_get_multiple(ray_start_regular): object_ids = [ray.put(i) for i in range(10)] assert ray.get(object_ids) == list(range(10)) @@ -2126,20 +2191,24 @@ def f(): ray.get(results) +# TODO: 5 retry attempts may be too little for Travis and we may need to +# increase it if this test begins to be flaky on Travis. def test_zero_capacity_deletion_semantics(shutdown_only): ray.init(num_cpus=2, num_gpus=1, resources={"test_resource": 1}) def test(): resources = ray.global_state.available_resources() + MAX_RETRY_ATTEMPTS = 5 retry_count = 0 - while resources and retry_count < 5: + while resources and retry_count < MAX_RETRY_ATTEMPTS: time.sleep(0.1) resources = ray.global_state.available_resources() retry_count += 1 - if retry_count >= 5: - raise RuntimeError("Resources were available even after retries.") + if retry_count >= MAX_RETRY_ATTEMPTS: + raise RuntimeError( + "Resources were available even after five retries.") return resources diff --git a/src/ray/raylet/node_manager.cc b/src/ray/raylet/node_manager.cc index 267dc7a3008f..6752789895d7 100644 --- a/src/ray/raylet/node_manager.cc +++ b/src/ray/raylet/node_manager.cc @@ -1468,17 +1468,13 @@ void NodeManager::HandleTaskBlocked(const std::shared_ptr local_queues_.QueueTasks({task}, TaskState::RUNNING); // Get the CPU resources required by the running task. const auto required_resources = task.GetTaskSpecification().GetRequiredResources(); - double required_cpus = required_resources.GetNumCpus(); - std::unordered_map cpu_resources; - if (required_cpus > 0) { - cpu_resources[kCPU_ResourceLabel] = required_cpus; - } + const ResourceSet cpu_resources = required_resources.GetNumCpus(); // Release the CPU resources. auto const cpu_resource_ids = worker->ReleaseTaskCpuResources(); local_available_resources_.Release(cpu_resource_ids); cluster_resource_map_[gcs_client_->client_table().GetLocalClientId()].Release( - ResourceSet(cpu_resources)); + cpu_resources); worker->MarkBlocked(); // Try dispatching tasks since we may have released some resources. @@ -1521,12 +1517,7 @@ void NodeManager::HandleTaskUnblocked( local_queues_.QueueTasks({task}, TaskState::RUNNING); // Get the CPU resources required by the running task. const auto required_resources = task.GetTaskSpecification().GetRequiredResources(); - double required_cpus = required_resources.GetNumCpus(); - std::unordered_map cpu_resources_map; - if (required_cpus > 0) { - cpu_resources_map[kCPU_ResourceLabel] = required_cpus; - } - const ResourceSet cpu_resources(cpu_resources_map); + const ResourceSet cpu_resources = required_resources.GetNumCpus(); // Check if we can reacquire the CPU resources. bool oversubscribed = !local_available_resources_.Contains(cpu_resources); @@ -1633,7 +1624,8 @@ bool NodeManager::AssignTask(const Task &task) { if (spec.IsActorCreationTask()) { // Check that we are not placing an actor creation task on a node with 0 CPUs. - RAY_CHECK(cluster_resource_map_[my_client_id].GetTotalResources().GetNumCpus() != 0); + RAY_CHECK(cluster_resource_map_[my_client_id].GetTotalResources().GetResourceMap().at( + kCPU_ResourceLabel) != 0); worker->SetLifetimeResourceIds(acquired_resources); } else { worker->SetTaskResourceIds(acquired_resources); @@ -2037,8 +2029,6 @@ void NodeManager::ForwardTaskOrResubmit(const Task &task, RAY_LOG(INFO) << "Failed to forward task " << task_id << " to node manager " << node_manager_id; - // TODO(romilb): We should probably revert the load subtraction from - // SchedulingPolicy::Schedule() // Mark the failed task as pending to let other raylets know that we still // have the task. TaskDependencyManager::TaskPending() is assumed to be // idempotent. diff --git a/src/ray/raylet/node_manager.h b/src/ray/raylet/node_manager.h index da9f7af7d1c3..ad2091c141f8 100644 --- a/src/ray/raylet/node_manager.h +++ b/src/ray/raylet/node_manager.h @@ -111,6 +111,7 @@ class NodeManager { /// \param data Data associated with the new client. /// \return Void. void ClientAdded(const ClientTableDataT &data); + /// Handler for the removal of a GCS client. /// \param client_data Data associated with the removed client. /// \return Void. diff --git a/src/ray/raylet/scheduling_policy.cc b/src/ray/raylet/scheduling_policy.cc index a8c1db4b17e1..32d4e4f039a6 100644 --- a/src/ray/raylet/scheduling_policy.cc +++ b/src/ray/raylet/scheduling_policy.cc @@ -49,9 +49,10 @@ std::unordered_map SchedulingPolicy::Schedule( const auto &node_resources = client_resource_pair.second; ResourceSet available_node_resources = ResourceSet(node_resources.GetAvailableResources()); - // TODO(romilb): Why do we need to subtract load from available resources? - // Even if we don't the code path below for choosing a dst_client_id would be - // similar. + // We have to subtract the current "load" because we set the current "load" + // to be the resources used by tasks that are in the + // `SchedulingQueue::ready_queue_` in NodeManager::ProcessGetTaskMessage's + // call to SchedulingQueue::GetResourceLoad. available_node_resources.SubtractResources(node_resources.GetLoadResources()); RAY_LOG(DEBUG) << "client_id " << node_client_id << " avail: " << node_resources.GetAvailableResources().ToString() diff --git a/src/ray/raylet/scheduling_resources.cc b/src/ray/raylet/scheduling_resources.cc index 17a9aba16c97..f53bf7958bcd 100644 --- a/src/ray/raylet/scheduling_resources.cc +++ b/src/ray/raylet/scheduling_resources.cc @@ -9,28 +9,80 @@ namespace ray { namespace raylet { -bool EqualsZeroEpsilon(double quantity) { - if ((quantity <= EPSILON) && (quantity >= -1 * EPSILON)) { - return true; - } - return false; +FractionalResourceQuantity::FractionalResourceQuantity() { resource_quantity_ = 0; } + +FractionalResourceQuantity::FractionalResourceQuantity(double resource_quantity) { + // We check for nonnegativeity due to the implicit conversion to + // FractionalResourceQuantity from ints/doubles when we do logical + // comparisons. + RAY_CHECK(resource_quantity >= 0) << "Resource capacity, " << resource_quantity + << ", should be nonnegative."; + + resource_quantity_ = static_cast(resource_quantity * kResourceConversionFactor); } -bool EqualsOneEpsilon(double quantity) { - if ((quantity <= 1 + EPSILON) && (quantity >= 1 - EPSILON)) { - return true; - } - return false; +const FractionalResourceQuantity FractionalResourceQuantity::operator+( + const FractionalResourceQuantity &rhs) const { + FractionalResourceQuantity result = *this; + result += rhs; + return result; +} + +const FractionalResourceQuantity FractionalResourceQuantity::operator-( + const FractionalResourceQuantity &rhs) const { + FractionalResourceQuantity result = *this; + result -= rhs; + return result; +} + +void FractionalResourceQuantity::operator+=(const FractionalResourceQuantity &rhs) { + resource_quantity_ += rhs.resource_quantity_; +} + +void FractionalResourceQuantity::operator-=(const FractionalResourceQuantity &rhs) { + resource_quantity_ -= rhs.resource_quantity_; +} + +bool FractionalResourceQuantity::operator==(const FractionalResourceQuantity &rhs) const { + return resource_quantity_ == rhs.resource_quantity_; +} + +bool FractionalResourceQuantity::operator!=(const FractionalResourceQuantity &rhs) const { + return !(*this == rhs); +} + +bool FractionalResourceQuantity::operator<(const FractionalResourceQuantity &rhs) const { + return resource_quantity_ < rhs.resource_quantity_; +} + +bool FractionalResourceQuantity::operator>(const FractionalResourceQuantity &rhs) const { + return rhs < *this; +} + +bool FractionalResourceQuantity::operator<=(const FractionalResourceQuantity &rhs) const { + return !(*this > rhs); +} + +bool FractionalResourceQuantity::operator>=(const FractionalResourceQuantity &rhs) const { + bool result = !(*this < rhs); + return result; +} + +double FractionalResourceQuantity::ToDouble() const { + return static_cast(resource_quantity_) / kResourceConversionFactor; } ResourceSet::ResourceSet() {} -ResourceSet::ResourceSet(const std::unordered_map &resource_map) - : resource_capacity_(resource_map) { - for (auto const &resource_pair : resource_capacity_) { - RAY_CHECK(resource_pair.second > 0 + EPSILON) - << "Resource " << resource_pair.first << " capacity is " << resource_pair.second - << ". Should have been greater than zero."; +ResourceSet::ResourceSet( + const std::unordered_map &resource_map) + : resource_capacity_(resource_map) {} + +ResourceSet::ResourceSet(const std::unordered_map &resource_map) { + for (auto const &resource_pair : resource_map) { + RAY_CHECK(resource_pair.second > 0); + resource_capacity_[resource_pair.first] = + FractionalResourceQuantity(resource_pair.second); } } @@ -38,10 +90,9 @@ ResourceSet::ResourceSet(const std::vector &resource_labels, const std::vector resource_capacity) { RAY_CHECK(resource_labels.size() == resource_capacity.size()); for (uint i = 0; i < resource_labels.size(); i++) { - RAY_CHECK(resource_capacity[i] > 0 + EPSILON) - << "Resource " << resource_labels[i] << " capacity is " << resource_capacity[i] - << ". Should have been greater than zero."; - resource_capacity_[resource_labels[i]] = resource_capacity[i]; + RAY_CHECK(resource_capacity[i] > 0); + resource_capacity_[resource_labels[i]] = + FractionalResourceQuantity(resource_capacity[i]); } } @@ -60,8 +111,8 @@ bool ResourceSet::IsSubset(const ResourceSet &other) const { // Check to make sure all keys of this are in other. for (const auto &resource_pair : resource_capacity_) { const auto &resource_name = resource_pair.first; - const double lhs_quantity = resource_pair.second; - double rhs_quantity = other.GetResource(resource_name); + const FractionalResourceQuantity &lhs_quantity = resource_pair.second; + const FractionalResourceQuantity &rhs_quantity = other.GetResource(resource_name); if (lhs_quantity > rhs_quantity) { // Resource found in rhs, but lhs capacity exceeds rhs capacity. return false; @@ -83,16 +134,18 @@ bool ResourceSet::IsEqual(const ResourceSet &rhs) const { bool ResourceSet::RemoveResource(const std::string &resource_name) { throw std::runtime_error("Method not implemented"); } + void ResourceSet::SubtractResources(const ResourceSet &other) { - // Subtract the resources and delete any if new capacity is zero. - for (const auto &resource_pair : other.GetResourceMap()) { + // Subtract the resources, make sure none goes below zero and delete any if new capacity + // is zero. + for (const auto &resource_pair : other.GetResourceAmountMap()) { const std::string &resource_label = resource_pair.first; - const double &resource_capacity = resource_pair.second; + const FractionalResourceQuantity &resource_capacity = resource_pair.second; if (resource_capacity_.count(resource_label) == 1) { resource_capacity_[resource_label] -= resource_capacity; - if (resource_capacity_[resource_label] < 0 + EPSILON) { - resource_capacity_.erase(resource_label); - } + } + if (resource_capacity_[resource_label] <= 0) { + resource_capacity_.erase(resource_label); } } } @@ -100,19 +153,20 @@ void ResourceSet::SubtractResources(const ResourceSet &other) { void ResourceSet::SubtractResourcesStrict(const ResourceSet &other) { // Subtract the resources, make sure none goes below zero and delete any if new capacity // is zero. - for (const auto &resource_pair : other.GetResourceMap()) { + for (const auto &resource_pair : other.GetResourceAmountMap()) { const std::string &resource_label = resource_pair.first; - const double &resource_capacity = resource_pair.second; + const FractionalResourceQuantity &resource_capacity = resource_pair.second; RAY_CHECK(resource_capacity_.count(resource_label) == 1) << "Attempt to acquire unknown resource: " << resource_label; resource_capacity_[resource_label] -= resource_capacity; - // TODO(romilb): Double precision subtraction may sometimes be less than zero by a - // small epsilon - need to fix. - RAY_CHECK(resource_capacity_[resource_label] >= 0 - EPSILON) - << "Capacity of resource " << resource_label << " after subtraction is negative (" - << resource_capacity_[resource_label] << ")." - << " Debug: resource_capacity_:" << ToString() << ", other: " << other.ToString(); - if (EqualsZeroEpsilon(resource_capacity_[resource_label])) { + + // Ensure that quantity is positive. Note, we have to have the check before + // erasing the object to make sure that it doesn't get added back. + RAY_CHECK(resource_capacity_[resource_label] >= 0) + << "Capacity of resource after subtraction is negative, " + << resource_capacity_[resource_label].ToDouble() << "."; + + if (resource_capacity_[resource_label] == 0) { resource_capacity_.erase(resource_label); } } @@ -120,46 +174,64 @@ void ResourceSet::SubtractResourcesStrict(const ResourceSet &other) { // Perform an outer join. void ResourceSet::AddResources(const ResourceSet &other) { - for (const auto &resource_pair : other.GetResourceMap()) { + for (const auto &resource_pair : other.GetResourceAmountMap()) { const std::string &resource_label = resource_pair.first; - const double &resource_capacity = resource_pair.second; + const FractionalResourceQuantity &resource_capacity = resource_pair.second; resource_capacity_[resource_label] += resource_capacity; } } -double ResourceSet::GetResource(const std::string &resource_name) const { - if (EqualsZeroEpsilon(resource_capacity_.count(resource_name))) { +FractionalResourceQuantity ResourceSet::GetResource( + const std::string &resource_name) const { + if (resource_capacity_.count(resource_name) == 0) { return 0; } - double capacity = resource_capacity_.at(resource_name); - RAY_CHECK(capacity > 0 + EPSILON) << "Resource " << resource_name << " capacity is " - << capacity - << ". Should have been greater than zero."; + const FractionalResourceQuantity capacity = resource_capacity_.at(resource_name); return capacity; } -double ResourceSet::GetNumCpus() const { return GetResource(kCPU_ResourceLabel); } +const ResourceSet ResourceSet::GetNumCpus() const { + ResourceSet cpu_resource_set; + cpu_resource_set.resource_capacity_[kCPU_ResourceLabel] = + GetResource(kCPU_ResourceLabel); + return cpu_resource_set; +} const std::string ResourceSet::ToString() const { - std::string return_string = ""; + if (resource_capacity_.size() == 0) { + return "{}"; + } else { + std::string return_string = ""; - auto it = resource_capacity_.begin(); + auto it = resource_capacity_.begin(); - // Convert the first element to a string. - if (it != resource_capacity_.end()) { - return_string += "{" + it->first + "," + std::to_string(it->second) + "}"; - it++; - } + // Convert the first element to a string. + if (it != resource_capacity_.end()) { + double resource_amount = (it->second).ToDouble(); + return_string += "{" + it->first + "," + std::to_string(resource_amount) + "}"; + it++; + } - // Add the remaining elements to the string (along with a comma). - for (; it != resource_capacity_.end(); ++it) { - return_string += ",{" + it->first + "," + std::to_string(it->second) + "}"; - } + // Add the remaining elements to the string (along with a comma). + for (; it != resource_capacity_.end(); ++it) { + double resource_amount = (it->second).ToDouble(); + return_string += ",{" + it->first + "," + std::to_string(resource_amount) + "}"; + } - return return_string; + return return_string; + } } -const std::unordered_map &ResourceSet::GetResourceMap() const { +const std::unordered_map ResourceSet::GetResourceMap() const { + std::unordered_map result; + for (const auto resource_pair : resource_capacity_) { + result[resource_pair.first] = resource_pair.second.ToDouble(); + } + return result; +}; + +const std::unordered_map + &ResourceSet::GetResourceAmountMap() const { return resource_capacity_; }; @@ -177,18 +249,20 @@ ResourceIds::ResourceIds(double resource_quantity) { ResourceIds::ResourceIds(const std::vector &whole_ids) : whole_ids_(whole_ids) {} -ResourceIds::ResourceIds(const std::vector> &fractional_ids) +ResourceIds::ResourceIds( + const std::vector> &fractional_ids) : fractional_ids_(fractional_ids) {} -ResourceIds::ResourceIds(const std::vector &whole_ids, - const std::vector> &fractional_ids) +ResourceIds::ResourceIds( + const std::vector &whole_ids, + const std::vector> &fractional_ids) : whole_ids_(whole_ids), fractional_ids_(fractional_ids) {} -bool ResourceIds::Contains(double resource_quantity) const { - RAY_CHECK(resource_quantity >= 0); +bool ResourceIds::Contains(FractionalResourceQuantity resource_quantity) const { if (resource_quantity >= 1) { - RAY_CHECK(IsWhole(resource_quantity)); - return whole_ids_.size() >= resource_quantity; + double whole_quantity = resource_quantity.ToDouble(); + RAY_CHECK(IsWhole(whole_quantity)); + return whole_ids_.size() >= whole_quantity; } else { if (whole_ids_.size() > 0) { return true; @@ -203,13 +277,13 @@ bool ResourceIds::Contains(double resource_quantity) const { } } -ResourceIds ResourceIds::Acquire(double resource_quantity) { - RAY_CHECK(resource_quantity >= 0); +ResourceIds ResourceIds::Acquire(FractionalResourceQuantity resource_quantity) { if (resource_quantity >= 1) { // Handle the whole case. - RAY_CHECK(IsWhole(resource_quantity)); - int64_t whole_quantity = resource_quantity; - RAY_CHECK(static_cast(whole_ids_.size()) >= whole_quantity); + double whole_quantity = resource_quantity.ToDouble(); + RAY_CHECK(IsWhole(whole_quantity)); + RAY_CHECK(static_cast(whole_ids_.size()) >= + static_cast(whole_quantity)); std::vector ids_to_return; for (int64_t i = 0; i < whole_quantity; ++i) { @@ -227,7 +301,7 @@ ResourceIds ResourceIds::Acquire(double resource_quantity) { fractional_pair.second -= resource_quantity; // Remove the fractional pair if the new capacity is 0 - if (EqualsZeroEpsilon(fractional_pair.second)) { + if (fractional_pair.second == 0) { std::swap(fractional_pair, fractional_ids_[fractional_ids_.size() - 1]); fractional_ids_.pop_back(); } @@ -242,7 +316,11 @@ ResourceIds ResourceIds::Acquire(double resource_quantity) { whole_ids_.pop_back(); auto return_pair = std::make_pair(whole_id, resource_quantity); - fractional_ids_.push_back(std::make_pair(whole_id, 1 - resource_quantity)); + // We cannot make use of the implicit conversion because ints have no + // operator-(const FractionalResourceQuantity&) function. + FractionalResourceQuantity remaining_amount = + FractionalResourceQuantity(1) - resource_quantity; + fractional_ids_.push_back(std::make_pair(whole_id, remaining_amount)); return ResourceIds({return_pair}); } } @@ -258,24 +336,20 @@ void ResourceIds::Release(const ResourceIds &resource_ids) { auto const &fractional_ids_to_return = resource_ids.FractionalIds(); for (auto const &fractional_pair_to_return : fractional_ids_to_return) { int64_t resource_id = fractional_pair_to_return.first; - auto const &fractional_pair_it = - std::find_if(fractional_ids_.begin(), fractional_ids_.end(), - [resource_id](std::pair &fractional_pair) { - return fractional_pair.first == resource_id; - }); + auto const &fractional_pair_it = std::find_if( + fractional_ids_.begin(), fractional_ids_.end(), + [resource_id](std::pair &fractional_pair) { + return fractional_pair.first == resource_id; + }); if (fractional_pair_it == fractional_ids_.end()) { fractional_ids_.push_back(fractional_pair_to_return); } else { fractional_pair_it->second += fractional_pair_to_return.second; - // TODO(romilb): Double precision addition may sometimes exceed 1 by a small epsilon - // - need to fix this. - RAY_CHECK(fractional_pair_it->second <= 1 + EPSILON) + RAY_CHECK(fractional_pair_it->second <= 1) << "Fractional Resource Id " << fractional_pair_it->first << " capacity is " - << fractional_pair_it->second << ". Should have been less than one."; + << fractional_pair_it->second.ToDouble() << ". Should have been less than one."; // If this makes the ID whole, then return it to the list of whole IDs. - // TODO(romilb): Double precision addition may sometimes exceed 1 by a small epsilon - // - need to fix this. - if (EqualsOneEpsilon(fractional_pair_it->second)) { + if (fractional_pair_it->second == 1) { whole_ids_.push_back(resource_id); fractional_ids_.erase(fractional_pair_it); } @@ -291,7 +365,8 @@ ResourceIds ResourceIds::Plus(const ResourceIds &resource_ids) const { const std::vector &ResourceIds::WholeIds() const { return whole_ids_; } -const std::vector> &ResourceIds::FractionalIds() const { +const std::vector> + &ResourceIds::FractionalIds() const { return fractional_ids_; } @@ -299,8 +374,9 @@ bool ResourceIds::TotalQuantityIsZero() const { return whole_ids_.empty() && fractional_ids_.empty(); } -double ResourceIds::TotalQuantity() const { - double total_quantity = whole_ids_.size(); +FractionalResourceQuantity ResourceIds::TotalQuantity() const { + FractionalResourceQuantity total_quantity = + FractionalResourceQuantity(whole_ids_.size()); for (auto const &fractional_pair : fractional_ids_) { total_quantity += fractional_pair.second; } @@ -314,8 +390,9 @@ std::string ResourceIds::ToString() const { } return_string += "], Fractional IDs: "; for (auto const &fractional_pair : fractional_ids_) { + double fractional_amount = fractional_pair.second.ToDouble(); return_string += "(" + std::to_string(fractional_pair.first) + ", " + - std::to_string(fractional_pair.second) + "), "; + std::to_string(fractional_amount) + "), "; } return_string += "]"; return return_string; @@ -343,12 +420,9 @@ ResourceIdSet::ResourceIdSet( : available_resources_(available_resources) {} bool ResourceIdSet::Contains(const ResourceSet &resource_set) const { - for (auto const &resource_pair : resource_set.GetResourceMap()) { + for (auto const &resource_pair : resource_set.GetResourceAmountMap()) { auto const &resource_name = resource_pair.first; - double resource_quantity = resource_pair.second; - RAY_CHECK(resource_quantity > 0 + EPSILON) << "Resource " << resource_name - << " capacity is " << resource_quantity - << ". Should have been greater than zero."; + FractionalResourceQuantity resource_quantity = resource_pair.second; auto it = available_resources_.find(resource_name); if (it == available_resources_.end()) { @@ -365,12 +439,9 @@ bool ResourceIdSet::Contains(const ResourceSet &resource_set) const { ResourceIdSet ResourceIdSet::Acquire(const ResourceSet &resource_set) { std::unordered_map acquired_resources; - for (auto const &resource_pair : resource_set.GetResourceMap()) { + for (auto const &resource_pair : resource_set.GetResourceAmountMap()) { auto const &resource_name = resource_pair.first; - double resource_quantity = resource_pair.second; - RAY_CHECK(resource_quantity > 0 + EPSILON) << "Resource " << resource_name - << " capacity is " << resource_quantity - << ". Should have been greater than zero."; + FractionalResourceQuantity resource_quantity = resource_pair.second; auto it = available_resources_.find(resource_name); RAY_CHECK(it != available_resources_.end()); @@ -421,7 +492,7 @@ ResourceIdSet ResourceIdSet::GetCpuResources() const { } ResourceSet ResourceIdSet::ToResourceSet() const { - std::unordered_map resource_set; + std::unordered_map resource_set; for (auto const &resource_pair : available_resources_) { resource_set[resource_pair.first] = resource_pair.second.TotalQuantity(); } @@ -460,7 +531,7 @@ std::vector> ResourceIdSet::ToF for (auto const &fractional_pair : resource_pair.second.FractionalIds()) { resource_ids.push_back(fractional_pair.first); - resource_fractions.push_back(fractional_pair.second); + resource_fractions.push_back(fractional_pair.second.ToDouble()); } auto resource_id_set_message = protocol::CreateResourceIdSetInfo( diff --git a/src/ray/raylet/scheduling_resources.h b/src/ray/raylet/scheduling_resources.h index df1a537797f9..aed05a6eb1c9 100644 --- a/src/ray/raylet/scheduling_resources.h +++ b/src/ray/raylet/scheduling_resources.h @@ -8,25 +8,58 @@ #include "ray/raylet/format/node_manager_generated.h" -#define EPSILON 0.00001 - namespace ray { namespace raylet { +/// Conversion factor that is the amount in internal units is equivalent to +/// one actual resource. Multiply to convert from actual to interal and +/// divide to convert from internal to actual. +constexpr double kResourceConversionFactor = 10000; + const std::string kCPU_ResourceLabel = "CPU"; -/// \brief Test if the quantity is within epsilon bounds of 0. -/// -/// \param quantity: Quantity to check -/// \return True if -epsilon <= Quantity <= epsilon, False otherwise. -bool EqualsZeroEpsilon(double quantity); +/// \class FractionalResourceQuantity +/// \brief Converts the resource quantities to an internal representation to +/// avoid machine precision errors. +class FractionalResourceQuantity { + public: + /// \brief Construct a FractionalResourceQuantity representing zero + /// resources. This constructor is used by std::unordered_map when we try + /// to add a new FractionalResourceQuantity in ResourceSets. + FractionalResourceQuantity(); + + /// \brief Construct a FractionalResourceQuantity representing + /// resource_quantity. + FractionalResourceQuantity(double resource_quantity); + + /// \brief Addition of FractionalResourceQuantity. + const FractionalResourceQuantity operator+(const FractionalResourceQuantity &rhs) const; + + /// \brief Subtraction of FractionalResourceQuantity. + const FractionalResourceQuantity operator-(const FractionalResourceQuantity &rhs) const; + + /// \brief Addition and assignment of FractionalResourceQuantity. + void operator+=(const FractionalResourceQuantity &rhs); -/// \brief Test if the quantity is within epsilon bounds of 1. -/// -/// \param quantity: Quantity to check -/// \return True if 1 - epsilon <= Quantity <= 1 + epsilon, False otherwise. -bool EqualsOneEpsilon(double quantity); + /// \brief Subtraction and assignment of FractionalResourceQuantity. + void operator-=(const FractionalResourceQuantity &rhs); + + bool operator==(const FractionalResourceQuantity &rhs) const; + bool operator!=(const FractionalResourceQuantity &rhs) const; + bool operator<(const FractionalResourceQuantity &rhs) const; + bool operator>(const FractionalResourceQuantity &rhs) const; + bool operator<=(const FractionalResourceQuantity &rhs) const; + bool operator>=(const FractionalResourceQuantity &rhs) const; + + /// \brief Return actual resource amount as a double. + double ToDouble() const; + + private: + /// The resource quantity represented as 1/kResourceConversionFactor-th of a + /// unit. + int resource_quantity_; +}; /// \class ResourceSet /// \brief Encapsulates and operates on a set of resources, including CPUs, @@ -36,6 +69,10 @@ class ResourceSet { /// \brief empty ResourceSet constructor. ResourceSet(); + /// \brief Constructs ResourceSet from the specified resource map. + ResourceSet( + const std::unordered_map &resource_map); + /// \brief Constructs ResourceSet from the specified resource map. ResourceSet(const std::unordered_map &resource_map); @@ -86,17 +123,17 @@ class ResourceSet { /// \return Void. void AddResources(const ResourceSet &other); - /// \brief Subtract a set of resources from the current set of resources. - /// Deletes any resource if the capacity after subtraction is zero or negative. + /// \brief Subtract a set of resources from the current set of resources and + /// check that the post-subtraction result nonnegative. Assumes other + /// is a subset of the ResourceSet. Deletes any resource if the capacity after + /// subtraction is zero. /// /// \param other: The resource set to subtract from the current resource set. /// \return Void. void SubtractResources(const ResourceSet &other); - /// \brief Subtract a set of resources from the current set of resources and - /// check that the post-subtraction result nonnegative. Assumes other - /// is a subset of the ResourceSet. Deletes any resource if the capacity after - /// subtraction is zero. + /// \brief Same as SubtractResources but throws an error if the resource value + /// goes below zero. /// /// \param other: The resource set to subtract from the current resource set. /// \return Void. @@ -107,12 +144,12 @@ class ResourceSet { /// \param resource_name: Resource name for which capacity is requested. /// \return The capacity value associated with the specified resource, zero if resource /// does not exist. - double GetResource(const std::string &resource_name) const; + FractionalResourceQuantity GetResource(const std::string &resource_name) const; /// Return the number of CPUs. /// /// \return Number of CPUs. - double GetNumCpus() const; + const ResourceSet GetNumCpus() const; /// Return true if the resource set is empty. False otherwise. /// @@ -120,13 +157,26 @@ class ResourceSet { bool IsEmpty() const; // TODO(atumanov): implement const_iterator class for the ResourceSet container. - const std::unordered_map &GetResourceMap() const; + // TODO(williamma12): Make sure that everywhere we use doubles we don't + // convert it back to FractionalResourceQuantity. + /// \brief Return a map of the resource and size in doubles. Note, size is in + /// regular units and does not need to be multiplied by kResourceConversionFactor. + /// + /// \return map of resource in string to size in double. + const std::unordered_map GetResourceMap() const; + + /// \brief Return a map of the resource and size in FractionalResourceQuantity. Note, + /// size is in kResourceConversionFactor of a unit. + /// + /// \return map of resource in string to size in FractionalResourceQuantity. + const std::unordered_map + &GetResourceAmountMap() const; const std::string ToString() const; private: - /// Resource capacity map. The capacities (double) are always positive. - std::unordered_map resource_capacity_; + /// Resource capacity map. + std::unordered_map resource_capacity_; }; /// \class ResourceIds @@ -154,14 +204,16 @@ class ResourceIds { /// \brief Constructs ResourceIds with a given set of fractional IDs. /// /// \param fractional_ids: A vector of the resource IDs that are partially available. - explicit ResourceIds(const std::vector> &fractional_ids); + explicit ResourceIds( + const std::vector> &fractional_ids); /// \brief Constructs ResourceIds with a given set of whole IDs and fractional IDs. /// /// \param whole_ids: A vector of the resource IDs that are completely available. /// \param fractional_ids: A vector of the resource IDs that are partially available. - ResourceIds(const std::vector &whole_ids, - const std::vector> &fractional_ids); + ResourceIds( + const std::vector &whole_ids, + const std::vector> &fractional_ids); /// \brief Check if we have at least the requested amount. /// @@ -173,14 +225,14 @@ class ResourceIds { /// /// \param resource_quantity Either a whole number or a fraction less than 1. /// \return True if there we have enough of the resource. - bool Contains(double resource_quantity) const; + bool Contains(FractionalResourceQuantity resource_quantity) const; /// \brief Acquire the requested amount of the resource. /// /// \param resource_quantity The amount to acquire. Either a whole number or a /// fraction less than 1. /// \return A ResourceIds representing the specific acquired IDs. - ResourceIds Acquire(double resource_quantity); + ResourceIds Acquire(FractionalResourceQuantity resource_quantity); /// \brief Return some resource IDs. /// @@ -202,7 +254,8 @@ class ResourceIds { /// \brief Return just the fractional IDs. /// /// \return The fractional IDs. - const std::vector> &FractionalIds() const; + const std::vector> &FractionalIds() + const; /// \brief Check if ResourceIds has any resources. /// @@ -212,7 +265,7 @@ class ResourceIds { /// \brief Return the total quantity of resources, ignoring the specific IDs. /// /// \return The total quantity of the resource. - double TotalQuantity() const; + FractionalResourceQuantity TotalQuantity() const; /// \brief Return a string representation of the object. /// @@ -230,7 +283,7 @@ class ResourceIds { std::vector whole_ids_; /// A vector of pairs of resource ID and a fraction of that ID (the fraction /// is at least zero and strictly less than 1). - std::vector> fractional_ids_; + std::vector> fractional_ids_; }; /// \class ResourceIdSet @@ -399,6 +452,6 @@ struct hash { return seed; } }; -} +} // namespace std #endif // RAY_RAYLET_SCHEDULING_RESOURCES_H diff --git a/src/ray/raylet/task_spec.cc b/src/ray/raylet/task_spec.cc index 917dec944ee6..5f301c47c1c3 100644 --- a/src/ray/raylet/task_spec.cc +++ b/src/ray/raylet/task_spec.cc @@ -203,13 +203,6 @@ size_t TaskSpecification::ArgValLength(int64_t arg_index) const { return message->args()->Get(arg_index)->data()->size(); } -double TaskSpecification::GetRequiredResource(const std::string &resource_name) const { - RAY_CHECK(required_resources_.GetResourceMap().empty() == false); - auto it = required_resources_.GetResourceMap().find(resource_name); - RAY_CHECK(it != required_resources_.GetResourceMap().end()); - return it->second; -} - const ResourceSet TaskSpecification::GetRequiredResources() const { return required_resources_; } diff --git a/src/ray/raylet/task_spec.h b/src/ray/raylet/task_spec.h index 86bf35d003b8..6bb2cdad972c 100644 --- a/src/ray/raylet/task_spec.h +++ b/src/ray/raylet/task_spec.h @@ -183,7 +183,6 @@ class TaskSpecification { ObjectID ReturnId(int64_t return_index) const; const uint8_t *ArgVal(int64_t arg_index) const; size_t ArgValLength(int64_t arg_index) const; - double GetRequiredResource(const std::string &resource_name) const; /// Return the resources that are to be acquired during the execution of this /// task. ///