diff --git a/python/ray/tests/test_gcs_fault_tolerance.py b/python/ray/tests/test_gcs_fault_tolerance.py index ca34239fd7ca..afdf52311296 100644 --- a/python/ray/tests/test_gcs_fault_tolerance.py +++ b/python/ray/tests/test_gcs_fault_tolerance.py @@ -1,5 +1,4 @@ import sys -import asyncio import os import threading from time import sleep @@ -23,8 +22,6 @@ ) from ray.job_submission import JobSubmissionClient, JobStatus from ray._raylet import GcsClient -from ray._private.runtime_env.plugin import RuntimeEnvPlugin -from ray.util.state import list_placement_groups import psutil @@ -1216,87 +1213,6 @@ def spawn(self, name, namespace): raise ValueError(f"Unknown case: {case}") -MyPlugin = "MyPlugin" -MY_PLUGIN_CLASS_PATH = "ray.tests.test_gcs_fault_tolerance.HangPlugin" - - -class HangPlugin(RuntimeEnvPlugin): - name = MyPlugin - - async def create( - self, - uri, - runtime_env, - ctx, - logger, # noqa: F821 - ) -> float: - while True: - await asyncio.sleep(1) - - @staticmethod - def validate(runtime_env_dict: dict) -> str: - return 1 - - -@pytest.mark.parametrize( - "ray_start_regular_with_external_redis", - [ - generate_system_config_map( - gcs_rpc_server_reconnect_timeout_s=60, - testing_asio_delay_us="NodeManagerService.grpc_server.CancelResourceReserve=500000000:500000000", # noqa: E501 - ), - ], - indirect=True, -) -@pytest.mark.parametrize( - "set_runtime_env_plugins", - [ - '[{"class":"' + MY_PLUGIN_CLASS_PATH + '"}]', - ], - indirect=True, -) -def test_placement_group_removal_after_gcs_restarts( - set_runtime_env_plugins, ray_start_regular_with_external_redis -): - @ray.remote - def task(): - pass - - pg = ray.util.placement_group(bundles=[{"CPU": 1}]) - _ = task.options( - max_retries=0, - num_cpus=1, - scheduling_strategy=PlacementGroupSchedulingStrategy( - placement_group=pg, - ), - runtime_env={ - MyPlugin: {"name": "f2"}, - "config": {"setup_timeout_seconds": -1}, - }, - ).remote() - - # The task should be popping worker - # TODO(jjyao) Use a more determinstic way to - # decide whether the task is popping worker - sleep(5) - - ray.util.remove_placement_group(pg) - # The PG is marked as REMOVED in redis but not removed yet from raylet - # due to the injected delay of CancelResourceReserve rpc - wait_for_condition(lambda: list_placement_groups()[0].state == "REMOVED") - - ray._private.worker._global_node.kill_gcs_server() - # After GCS restarts, it will try to remove the PG resources - # again via ReleaseUnusedBundles rpc - ray._private.worker._global_node.start_gcs_server() - - def verify_pg_resources_cleaned(): - r_keys = ray.available_resources().keys() - return all("group" not in k for k in r_keys) - - wait_for_condition(verify_pg_resources_cleaned, timeout=30) - - if __name__ == "__main__": import pytest diff --git a/python/ray/tests/test_placement_group_5.py b/python/ray/tests/test_placement_group_5.py index 4af2894a99f3..9837b2ffce89 100644 --- a/python/ray/tests/test_placement_group_5.py +++ b/python/ray/tests/test_placement_group_5.py @@ -470,9 +470,10 @@ async def create( ) -> float: await asyncio.sleep(PLUGIN_TIMEOUT) - @staticmethod - def validate(runtime_env_dict: dict) -> str: - return 1 + +@staticmethod +def validate(runtime_env_dict: dict) -> str: + return 1 @pytest.mark.parametrize( diff --git a/src/ray/core_worker/core_worker_process.cc b/src/ray/core_worker/core_worker_process.cc index 95ab400f1689..5991ab6ab0e1 100644 --- a/src/ray/core_worker/core_worker_process.cc +++ b/src/ray/core_worker/core_worker_process.cc @@ -233,7 +233,6 @@ void 
CoreWorkerProcessImpl::InitializeSystemConfig() { thread.join(); RayConfig::instance().initialize(promise.get_future().get()); - ray::asio::testing::init(); } void CoreWorkerProcessImpl::RunWorkerTaskExecutionLoop() { diff --git a/src/ray/gcs/gcs_server/gcs_placement_group_scheduler.cc b/src/ray/gcs/gcs_server/gcs_placement_group_scheduler.cc index 4a40e27c8319..0e3425d909e8 100644 --- a/src/ray/gcs/gcs_server/gcs_placement_group_scheduler.cc +++ b/src/ray/gcs/gcs_server/gcs_placement_group_scheduler.cc @@ -243,9 +243,9 @@ void GcsPlacementGroupScheduler::CancelResourceReserve( auto node_id = NodeID::FromBinary(node.value()->node_id()); if (max_retry == current_retry_cnt) { - RAY_LOG(ERROR) << "Failed to cancel resource reserved for bundle because the max " - "retry count is reached. " - << bundle_spec->DebugString() << " at node " << node_id; + RAY_LOG(INFO) << "Failed to cancel resource reserved for bundle because the max " + "retry count is reached. " + << bundle_spec->DebugString() << " at node " << node_id; return; } @@ -261,10 +261,11 @@ void GcsPlacementGroupScheduler::CancelResourceReserve( RAY_LOG(INFO) << "Finished cancelling the resource reserved for bundle: " << bundle_spec->DebugString() << " at node " << node_id; } else { - // We couldn't delete the pg resources because of network issue. Retry. - RAY_LOG(WARNING) << "Failed to cancel the resource reserved for bundle: " - << bundle_spec->DebugString() << " at node " << node_id - << ". Status: " << status; + // We couldn't delete the pg resources either because they are in use + // or because of a network issue. Retry. + RAY_LOG(INFO) << "Failed to cancel the resource reserved for bundle: " + << bundle_spec->DebugString() << " at node " << node_id + << ". Status: " << status; execute_after( io_context_, [this, bundle_spec, node, max_retry, current_retry_cnt] { @@ -567,10 +568,14 @@ void GcsPlacementGroupScheduler::DestroyPlacementGroupPreparedBundleResources( for (const auto &iter : *(leasing_bundle_locations)) { auto &bundle_spec = iter.second.second; auto &node_id = iter.second.first; - CancelResourceReserve(bundle_spec, - gcs_node_manager_.GetAliveNode(node_id), - /*max_retry*/ 5, - /*num_retry*/ 0); + CancelResourceReserve( + bundle_spec, + gcs_node_manager_.GetAliveNode(node_id), + // Retry 10 * worker registration timeout to avoid race condition. + // See https://github.com/ray-project/ray/pull/42942 + // for more details. + /*max_retry*/ RayConfig::instance().worker_register_timeout_seconds() * 10, + /*num_retry*/ 0); } } } @@ -589,10 +594,14 @@ void GcsPlacementGroupScheduler::DestroyPlacementGroupCommittedBundleResources( for (const auto &iter : *(committed_bundle_locations)) { auto &bundle_spec = iter.second.second; auto &node_id = iter.second.first; - CancelResourceReserve(bundle_spec, - gcs_node_manager_.GetAliveNode(node_id), - /*max_retry*/ 5, - /*num_retry*/ 0); + CancelResourceReserve( + bundle_spec, + gcs_node_manager_.GetAliveNode(node_id), + // Retry 10 * worker registration timeout to avoid race condition. + // See https://github.com/ray-project/ray/pull/42942 + // for more details.
+ /*max_retry*/ RayConfig::instance().worker_register_timeout_seconds() * 10, + /*num_retry*/ 0); } committed_bundle_location_index_.Erase(placement_group_id); cluster_resource_scheduler_.GetClusterResourceManager() diff --git a/src/ray/gcs/gcs_server/gcs_server_main.cc b/src/ray/gcs/gcs_server/gcs_server_main.cc index c58fbfbd8477..5bd85f900dc2 100644 --- a/src/ray/gcs/gcs_server/gcs_server_main.cc +++ b/src/ray/gcs/gcs_server/gcs_server_main.cc @@ -62,7 +62,6 @@ int main(int argc, char *argv[]) { gflags::ShutDownCommandLineFlags(); RayConfig::instance().initialize(config_list); - ray::asio::testing::init(); // IO Service for main loop. instrumented_io_context main_service; diff --git a/src/ray/raylet/local_task_manager.cc b/src/ray/raylet/local_task_manager.cc index d499abacb205..f2161dc5c003 100644 --- a/src/ray/raylet/local_task_manager.cc +++ b/src/ray/raylet/local_task_manager.cc @@ -546,15 +546,21 @@ bool LocalTaskManager::PoppedWorkerHandler( not_detached_with_owner_failed = true; } - if (!canceled) { - const auto &required_resource = - task.GetTaskSpecification().GetRequiredResources().GetResourceMap(); - for (auto &entry : required_resource) { - // This is to make sure PG resource is not deleted during popping worker - // unless the lease request is cancelled. - RAY_CHECK(cluster_resource_scheduler_->GetLocalResourceManager().ResourcesExist( - scheduling::ResourceID(entry.first))) - << entry.first; + const auto &required_resource = + task.GetTaskSpecification().GetRequiredResources().GetResourceMap(); + for (auto &entry : required_resource) { + if (!cluster_resource_scheduler_->GetLocalResourceManager().ResourcesExist( + scheduling::ResourceID(entry.first))) { + RAY_CHECK(task.GetTaskSpecification().PlacementGroupBundleId().first != + PlacementGroupID::Nil()); + RAY_LOG(DEBUG) << "The placement group: " + << task.GetTaskSpecification().PlacementGroupBundleId().first + << " was removed when popping workers for task: " << task_id + << ", will cancel the task."; + CancelTask( + task_id, + rpc::RequestWorkerLeaseReply::SCHEDULING_CANCELLED_PLACEMENT_GROUP_REMOVED); + canceled = true; } } @@ -849,7 +855,7 @@ void LocalTaskManager::ReleaseTaskArgs(const TaskID &task_id) { } namespace { -void ReplyCancelled(const std::shared_ptr<internal::Work> &work, +void ReplyCancelled(std::shared_ptr<internal::Work> &work, rpc::RequestWorkerLeaseReply::SchedulingFailureType failure_type, const std::string &scheduling_failure_message) { auto reply = work->reply; @@ -861,67 +867,55 @@ void ReplyCancelled(const std::shared_ptr<internal::Work> &work, } } // namespace -bool LocalTaskManager::CancelTasks( - std::function<bool(const std::shared_ptr<internal::Work> &)> predicate, +bool LocalTaskManager::CancelTask( + const TaskID &task_id, rpc::RequestWorkerLeaseReply::SchedulingFailureType failure_type, const std::string &scheduling_failure_message) { - bool tasks_cancelled = false; - - ray::erase_if<SchedulingClass, std::shared_ptr<internal::Work>>( - tasks_to_dispatch_, [&](const std::shared_ptr<internal::Work> &work) { - if (predicate(work)) { - const TaskID task_id = work->task.GetTaskSpecification().TaskId(); - RAY_LOG(DEBUG) << "Canceling task " << task_id << " from dispatch queue."; - ReplyCancelled(work, failure_type, scheduling_failure_message); - if (work->GetState() == internal::WorkStatus::WAITING_FOR_WORKER) { - // We've already acquired resources so we need to release them. - cluster_resource_scheduler_->GetLocalResourceManager().ReleaseWorkerResources( - work->allocated_instances); - // Release pinned task args.
- ReleaseTaskArgs(task_id); - } - if (!work->task.GetTaskSpecification().GetDependencies().empty()) { - task_dependency_manager_.RemoveTaskDependencies( - work->task.GetTaskSpecification().TaskId()); - } - RemoveFromRunningTasksIfExists(work->task); - work->SetStateCancelled(); - tasks_cancelled = true; - return true; - } else { - return false; + for (auto shapes_it = tasks_to_dispatch_.begin(); shapes_it != tasks_to_dispatch_.end(); + shapes_it++) { + auto &work_queue = shapes_it->second; + for (auto work_it = work_queue.begin(); work_it != work_queue.end(); work_it++) { + const auto &task = (*work_it)->task; + if (task.GetTaskSpecification().TaskId() == task_id) { + RAY_LOG(DEBUG) << "Canceling task " << task_id << " from dispatch queue."; + ReplyCancelled(*work_it, failure_type, scheduling_failure_message); + if ((*work_it)->GetState() == internal::WorkStatus::WAITING_FOR_WORKER) { + // We've already acquired resources so we need to release them. + cluster_resource_scheduler_->GetLocalResourceManager().ReleaseWorkerResources( + (*work_it)->allocated_instances); + // Release pinned task args. + ReleaseTaskArgs(task_id); } - }); - - ray::erase_if<std::shared_ptr<internal::Work>>( - waiting_task_queue_, [&](const std::shared_ptr<internal::Work> &work) { - if (predicate(work)) { - ReplyCancelled(work, failure_type, scheduling_failure_message); - if (!work->task.GetTaskSpecification().GetDependencies().empty()) { - task_dependency_manager_.RemoveTaskDependencies( - work->task.GetTaskSpecification().TaskId()); - } - waiting_tasks_index_.erase(work->task.GetTaskSpecification().TaskId()); - tasks_cancelled = true; - return true; - } else { - return false; + if (!task.GetTaskSpecification().GetDependencies().empty()) { + task_dependency_manager_.RemoveTaskDependencies( + task.GetTaskSpecification().TaskId()); + } + RemoveFromRunningTasksIfExists(task); + (*work_it)->SetStateCancelled(); + work_queue.erase(work_it); + if (work_queue.empty()) { + tasks_to_dispatch_.erase(shapes_it); } - }); + return true; + } + } + } - return tasks_cancelled; -} + auto iter = waiting_tasks_index_.find(task_id); + if (iter != waiting_tasks_index_.end()) { + const auto &task = (*iter->second)->task; + ReplyCancelled(*iter->second, failure_type, scheduling_failure_message); + if (!task.GetTaskSpecification().GetDependencies().empty()) { + task_dependency_manager_.RemoveTaskDependencies( + task.GetTaskSpecification().TaskId()); + } + waiting_task_queue_.erase(iter->second); + waiting_tasks_index_.erase(iter); -bool LocalTaskManager::CancelTask( - const TaskID &task_id, - rpc::RequestWorkerLeaseReply::SchedulingFailureType failure_type, - const std::string &scheduling_failure_message) { - return CancelTasks( - [task_id](const std::shared_ptr<internal::Work> &work) { - return work->task.GetTaskSpecification().TaskId() == task_id; - }, - failure_type, - scheduling_failure_message); + return true; + } + + return false; } bool LocalTaskManager::AnyPendingTasksForResourceAcquisition( diff --git a/src/ray/raylet/local_task_manager.h b/src/ray/raylet/local_task_manager.h index 77468548cd12..b72861ce95ed 100644 --- a/src/ray/raylet/local_task_manager.h +++ b/src/ray/raylet/local_task_manager.h @@ -111,15 +111,17 @@ class LocalTaskManager : public ILocalTaskManager { /// \param task: Output parameter. void TaskFinished(std::shared_ptr<WorkerInterface> worker, RayTask *task); - /// Attempt to cancel all queued tasks that match the predicate. /// - /// \param predicate: A function that returns true if a task needs to be cancelled.
- /// \param failure_type: The reason for cancellation. - /// \param scheduling_failure_message: The reason message for cancellation. - /// \return True if any task was successfully cancelled. - bool CancelTasks(std::function<bool(const std::shared_ptr<internal::Work> &)> predicate, - rpc::RequestWorkerLeaseReply::SchedulingFailureType failure_type, - const std::string &scheduling_failure_message) override; + /// \param task_id: The id of the task to remove. + /// \param failure_type: The failure type. + /// + /// \return True if task was successfully removed. This function will return + /// false if the task is already running. + bool CancelTask(const TaskID &task_id, + rpc::RequestWorkerLeaseReply::SchedulingFailureType failure_type = + rpc::RequestWorkerLeaseReply::SCHEDULING_CANCELLED_INTENDED, + const std::string &scheduling_failure_message = "") override; /// Return if any tasks are pending resource acquisition. /// @@ -201,18 +203,6 @@ class LocalTaskManager : public ILocalTaskManager { const rpc::Address &owner_address, const std::string &runtime_env_setup_error_message); - /// Attempt to cancel an already queued task. - /// - /// \param task_id: The id of the task to remove. - /// \param failure_type: The failure type. - /// - /// \return True if task was successfully removed. This function will return - /// false if the task is already running. - bool CancelTask(const TaskID &task_id, - rpc::RequestWorkerLeaseReply::SchedulingFailureType failure_type = - rpc::RequestWorkerLeaseReply::SCHEDULING_CANCELLED_INTENDED, - const std::string &scheduling_failure_message = ""); - /// Attempts to dispatch all tasks which are ready to run. A task /// will be dispatched if it is on `tasks_to_dispatch_` and there are still /// available resources on the node. diff --git a/src/ray/raylet/main.cc b/src/ray/raylet/main.cc index 4c77133c29d1..19d90124892a 100644 --- a/src/ray/raylet/main.cc +++ b/src/ray/raylet/main.cc @@ -272,7 +272,6 @@ int main(int argc, char *argv[]) { RAY_CHECK_OK(status); RAY_CHECK(stored_raylet_config.has_value()); RayConfig::instance().initialize(stored_raylet_config.get()); - ray::asio::testing::init(); // Core worker tries to kill child processes when it exits. But they can't do // it perfectly: if the core worker is killed by SIGKILL, the child processes diff --git a/src/ray/raylet/node_manager.cc b/src/ray/raylet/node_manager.cc index cbcf198313b8..64a4aa97b1c4 100644 --- a/src/ray/raylet/node_manager.cc +++ b/src/ray/raylet/node_manager.cc @@ -683,16 +683,6 @@ void NodeManager::HandleReleaseUnusedBundles(rpc::ReleaseUnusedBundlesRequest re -1); } - // Cancel lease requests related to unused bundles - cluster_task_manager_->CancelTasks( - [&](const std::shared_ptr<internal::Work> &work) { - const auto bundle_id = work->task.GetTaskSpecification().PlacementGroupBundleId(); - return !bundle_id.first.IsNil() && 0 == in_use_bundles.count(bundle_id); - }, - rpc::RequestWorkerLeaseReply::SCHEDULING_CANCELLED_INTENDED, - "The task is cancelled because it uses placement group bundles that are not " - "registered to GCS. It can happen upon GCS restart."); - // Kill all workers that are currently associated with the unused bundles. // NOTE: We can't traverse directly with `leased_workers_`, because `DestroyWorker` will // delete the element of `leased_workers_`. So we need to filter out @@ -1899,15 +1889,6 @@ void NodeManager::HandleCancelResourceReserve( RAY_LOG(DEBUG) << "Request to cancel reserved resource is received, " << bundle_spec.DebugString(); - // Cancel lease requests related to the placement group to be removed.
- cluster_task_manager_->CancelTasks( - [&](const std::shared_ptr<internal::Work> &work) { - const auto bundle_id = work->task.GetTaskSpecification().PlacementGroupBundleId(); - return bundle_id.first == bundle_spec.PlacementGroupId(); - }, - rpc::RequestWorkerLeaseReply::SCHEDULING_CANCELLED_PLACEMENT_GROUP_REMOVED, - ""); - // Kill all workers that are currently associated with the placement group. // NOTE: We can't traverse directly with `leased_workers_`, because `DestroyWorker` will // delete the element of `leased_workers_`. So we need to filter out @@ -1933,9 +1914,12 @@ void NodeManager::HandleCancelResourceReserve( DestroyWorker(worker, rpc::WorkerExitType::INTENDED_SYSTEM_EXIT, message); } - RAY_CHECK_OK(placement_group_resource_manager_->ReturnBundle(bundle_spec)); + // Return bundle resources. If it fails to return a bundle, + // it will return a non-ok status. These failures are transient, + // and GCS should retry. + auto status = placement_group_resource_manager_->ReturnBundle(bundle_spec); cluster_task_manager_->ScheduleAndDispatchTasks(); - send_reply_callback(Status::OK(), nullptr, nullptr); + send_reply_callback(status, nullptr, nullptr); } void NodeManager::HandleReturnWorker(rpc::ReturnWorkerRequest request, diff --git a/src/ray/raylet/placement_group_resource_manager.cc b/src/ray/raylet/placement_group_resource_manager.cc index e0906c23885f..0f16cf766535 100644 --- a/src/ray/raylet/placement_group_resource_manager.cc +++ b/src/ray/raylet/placement_group_resource_manager.cc @@ -26,7 +26,7 @@ void PlacementGroupResourceManager::ReturnUnusedBundle( const std::unordered_set<BundleID> &in_use_bundles) { for (auto iter = bundle_spec_map_.begin(); iter != bundle_spec_map_.end();) { if (0 == in_use_bundles.count(iter->first)) { - RAY_CHECK_OK(ReturnBundle(*iter->second)); + RAY_CHECK(ReturnBundle(*iter->second).ok()); bundle_spec_map_.erase(iter++); } else { iter++; } diff --git a/src/ray/raylet/scheduling/cluster_task_manager.cc b/src/ray/raylet/scheduling/cluster_task_manager.cc index 99b998dc14fe..c4e6cff7a08a 100644 --- a/src/ray/raylet/scheduling/cluster_task_manager.cc +++ b/src/ray/raylet/scheduling/cluster_task_manager.cc @@ -77,60 +77,54 @@ void ReplyCancelled(const internal::Work &work, } } // namespace -bool ClusterTaskManager::CancelTasks( - std::function<bool(const std::shared_ptr<internal::Work> &)> predicate, - rpc::RequestWorkerLeaseReply::SchedulingFailureType failure_type, - const std::string &scheduling_failure_message) { - bool tasks_cancelled = false; - - ray::erase_if<SchedulingClass, std::shared_ptr<internal::Work>>( - tasks_to_schedule_, [&](const std::shared_ptr<internal::Work> &work) { - if (predicate(work)) { - RAY_LOG(DEBUG) << "Canceling task " - << work->task.GetTaskSpecification().TaskId() - << " from schedule queue."; - ReplyCancelled(*work, failure_type, scheduling_failure_message); - tasks_cancelled = true; - return true; - } else { - return false; - } - }); - - ray::erase_if<SchedulingClass, std::shared_ptr<internal::Work>>( - infeasible_tasks_, [&](const std::shared_ptr<internal::Work> &work) { - if (predicate(work)) { - RAY_LOG(DEBUG) << "Canceling task " - << work->task.GetTaskSpecification().TaskId() - << " from infeasible queue."; - ReplyCancelled(*work, failure_type, scheduling_failure_message); - tasks_cancelled = true; - return true; - } else { - return false; - } - }); - - if (local_task_manager_->CancelTasks( - predicate, failure_type, scheduling_failure_message)) { - tasks_cancelled = true; - } - - return tasks_cancelled; -} - bool ClusterTaskManager::CancelAllTaskOwnedBy( const WorkerID &worker_id, rpc::RequestWorkerLeaseReply::SchedulingFailureType failure_type, const std::string &scheduling_failure_message) { // Only tasks and
regular actors are canceled because their lifetime is // the same as the owner. - auto predicate = [worker_id](const std::shared_ptr<internal::Work> &work) { - return !work->task.GetTaskSpecification().IsDetachedActor() && - work->task.GetTaskSpecification().CallerWorkerId() == worker_id; - }; + auto shapes_it = tasks_to_schedule_.begin(); + while (shapes_it != tasks_to_schedule_.end()) { + auto &work_queue = shapes_it->second; + auto work_it = work_queue.begin(); + while (work_it != work_queue.end()) { + const auto &task = (*work_it)->task; + const auto &spec = task.GetTaskSpecification(); + if (!spec.IsDetachedActor() && spec.CallerWorkerId() == worker_id) { + ReplyCancelled(*(*work_it), failure_type, scheduling_failure_message); + work_it = work_queue.erase(work_it); + } else { + ++work_it; + } + } + if (work_queue.empty()) { + tasks_to_schedule_.erase(shapes_it++); + } else { + ++shapes_it; + } + } - return CancelTasks(predicate, failure_type, scheduling_failure_message); + shapes_it = infeasible_tasks_.begin(); + while (shapes_it != infeasible_tasks_.end()) { + auto &work_queue = shapes_it->second; + auto work_it = work_queue.begin(); + while (work_it != work_queue.end()) { + const auto &task = (*work_it)->task; + const auto &spec = task.GetTaskSpecification(); + if (!spec.IsDetachedActor() && spec.CallerWorkerId() == worker_id) { + ReplyCancelled(*(*work_it), failure_type, scheduling_failure_message); + work_it = work_queue.erase(work_it); + } else { + ++work_it; + } + } + if (work_queue.empty()) { + infeasible_tasks_.erase(shapes_it++); + } else { + ++shapes_it; + } + } + return true; } void ClusterTaskManager::ScheduleAndDispatchTasks() { @@ -274,11 +268,44 @@ bool ClusterTaskManager::CancelTask( const TaskID &task_id, rpc::RequestWorkerLeaseReply::SchedulingFailureType failure_type, const std::string &scheduling_failure_message) { - auto predicate = [task_id](const std::shared_ptr<internal::Work> &work) { - return work->task.GetTaskSpecification().TaskId() == task_id; - }; + // TODO(sang): There is a lot of repetitive code around task backlogs. We should + // refactor it.
+ for (auto shapes_it = tasks_to_schedule_.begin(); shapes_it != tasks_to_schedule_.end(); + shapes_it++) { + auto &work_queue = shapes_it->second; + for (auto work_it = work_queue.begin(); work_it != work_queue.end(); work_it++) { + const auto &task = (*work_it)->task; + if (task.GetTaskSpecification().TaskId() == task_id) { + RAY_LOG(DEBUG) << "Canceling task " << task_id << " from schedule queue."; + ReplyCancelled(*(*work_it), failure_type, scheduling_failure_message); + work_queue.erase(work_it); + if (work_queue.empty()) { + tasks_to_schedule_.erase(shapes_it); + } + return true; + } + } + } + + for (auto shapes_it = infeasible_tasks_.begin(); shapes_it != infeasible_tasks_.end(); + shapes_it++) { + auto &work_queue = shapes_it->second; + for (auto work_it = work_queue.begin(); work_it != work_queue.end(); work_it++) { + const auto &task = (*work_it)->task; + if (task.GetTaskSpecification().TaskId() == task_id) { + RAY_LOG(DEBUG) << "Canceling task " << task_id << " from infeasible queue."; + ReplyCancelled(*(*work_it), failure_type, scheduling_failure_message); + work_queue.erase(work_it); + if (work_queue.empty()) { + infeasible_tasks_.erase(shapes_it); + } + return true; + } + } + } - return CancelTasks(predicate, failure_type, scheduling_failure_message); + return local_task_manager_->CancelTask( + task_id, failure_type, scheduling_failure_message); } void ClusterTaskManager::FillResourceUsage(rpc::ResourcesData &data) { diff --git a/src/ray/raylet/scheduling/cluster_task_manager.h b/src/ray/raylet/scheduling/cluster_task_manager.h index 058c40f97fcf..a3363365bb10 100644 --- a/src/ray/raylet/scheduling/cluster_task_manager.h +++ b/src/ray/raylet/scheduling/cluster_task_manager.h @@ -93,16 +93,6 @@ class ClusterTaskManager : public ClusterTaskManagerInterface { rpc::RequestWorkerLeaseReply::SCHEDULING_CANCELLED_INTENDED, const std::string &scheduling_failure_message = "") override; - /// Attempt to cancel all queued tasks that match the predicate. - /// - /// \param predicate: A function that returns true if a task needs to be cancelled. - /// \param failure_type: The reason for cancellation. - /// \param scheduling_failure_message: The reason message for cancellation. - /// \return True if any task was successfully cancelled. - bool CancelTasks(std::function<bool(const std::shared_ptr<internal::Work> &)> predicate, - rpc::RequestWorkerLeaseReply::SchedulingFailureType failure_type, - const std::string &scheduling_failure_message) override; - /// Populate the relevant parts of the heartbeat table. This is intended for /// sending resource usage of raylet to gcs. In particular, this should fill in /// resource_load and resource_load_by_shape. diff --git a/src/ray/raylet/scheduling/cluster_task_manager_interface.h b/src/ray/raylet/scheduling/cluster_task_manager_interface.h index 0e2bdbe08bb6..8ae664479924 100644 --- a/src/ray/raylet/scheduling/cluster_task_manager_interface.h +++ b/src/ray/raylet/scheduling/cluster_task_manager_interface.h @@ -54,17 +54,6 @@ class ClusterTaskManagerInterface { rpc::RequestWorkerLeaseReply::SCHEDULING_CANCELLED_INTENDED, const std::string &scheduling_failure_message = "") = 0; - /// Attempt to cancel all queued tasks that match the predicate. - /// - /// \param predicate: A function that returns true if a task needs to be cancelled. - /// \param failure_type: The reason for cancellation. - /// \param scheduling_failure_message: The reason message for cancellation. - /// \return True if any task was successfully cancelled.
- virtual bool CancelTasks( - std::function<bool(const std::shared_ptr<internal::Work> &)> predicate, - rpc::RequestWorkerLeaseReply::SchedulingFailureType failure_type, - const std::string &scheduling_failure_message) = 0; - /// Queue task and schedule. This hanppens when processing the worker lease request. /// /// \param task: The incoming task to be queued and scheduled. diff --git a/src/ray/raylet/scheduling/cluster_task_manager_test.cc b/src/ray/raylet/scheduling/cluster_task_manager_test.cc index 6a03a6036f61..2fe7eec7452a 100644 --- a/src/ray/raylet/scheduling/cluster_task_manager_test.cc +++ b/src/ray/raylet/scheduling/cluster_task_manager_test.cc @@ -1218,6 +1218,7 @@ TEST_F(ClusterTaskManagerTest, TaskCancellationTest) { callback_called = false; reply.Clear(); ASSERT_FALSE(task_manager_.CancelTask(task2.GetTaskSpecification().TaskId())); + // Task2 will not execute. ASSERT_FALSE(reply.canceled()); ASSERT_FALSE(callback_called); ASSERT_EQ(pool_.workers.size(), 0); @@ -1228,22 +1229,6 @@ TEST_F(ClusterTaskManagerTest, TaskCancellationTest) { ASSERT_EQ(finished_task.GetTaskSpecification().TaskId(), task2.GetTaskSpecification().TaskId()); - RayTask task3 = CreateTask({{ray::kCPU_ResourceLabel, 2}}); - rpc::RequestWorkerLeaseReply reply3; - RayTask task4 = CreateTask({{ray::kCPU_ResourceLabel, 200}}); - rpc::RequestWorkerLeaseReply reply4; - // Task 3 should be popping worker - task_manager_.QueueAndScheduleTask(task3, false, false, &reply3, callback); - // Task 4 is infeasible - task_manager_.QueueAndScheduleTask(task4, false, false, &reply4, callback); - pool_.TriggerCallbacks(); - ASSERT_TRUE(task_manager_.CancelTasks( - [](const std::shared_ptr<internal::Work> &work) { return true; }, - rpc::RequestWorkerLeaseReply::SCHEDULING_CANCELLED_INTENDED, - "")); - ASSERT_TRUE(reply3.canceled()); - ASSERT_TRUE(reply4.canceled()); - AssertNoLeaks(); } diff --git a/src/ray/raylet/scheduling/local_task_manager_interface.h b/src/ray/raylet/scheduling/local_task_manager_interface.h index 8bdce254a418..03f3a8b15a60 100644 --- a/src/ray/raylet/scheduling/local_task_manager_interface.h +++ b/src/ray/raylet/scheduling/local_task_manager_interface.h @@ -37,16 +37,18 @@ class ILocalTaskManager { // Schedule and dispatch tasks. virtual void ScheduleAndDispatchTasks() = 0; - /// Attempt to cancel all queued tasks that match the predicate. /// - /// \param predicate: A function that returns true if a task needs to be cancelled. - /// \param failure_type: The reason for cancellation. - /// \param scheduling_failure_message: The reason message for cancellation. - /// \return True if any task was successfully cancelled. - virtual bool CancelTasks( - std::function<bool(const std::shared_ptr<internal::Work> &)> predicate, - rpc::RequestWorkerLeaseReply::SchedulingFailureType failure_type, - const std::string &scheduling_failure_message) = 0; + /// Attempt to cancel an already queued task. + /// + /// \param task_id: The id of the task to remove. + /// \param failure_type: The failure type. + /// + /// \return True if task was successfully removed. This function will return + /// false if the task is already running. + virtual bool CancelTask( + const TaskID &task_id, + rpc::RequestWorkerLeaseReply::SchedulingFailureType failure_type = + rpc::RequestWorkerLeaseReply::SCHEDULING_CANCELLED_INTENDED, + const std::string &scheduling_failure_message = "") = 0; virtual const absl::flat_hash_map<SchedulingClass, std::deque<std::shared_ptr<internal::Work>>> @@ -86,9 +88,17 @@ class NoopLocalTaskManager : public ILocalTaskManager { // Schedule and dispatch tasks.
void ScheduleAndDispatchTasks() override {} - bool CancelTasks(std::function<bool(const std::shared_ptr<internal::Work> &)> predicate, - rpc::RequestWorkerLeaseReply::SchedulingFailureType failure_type, - const std::string &scheduling_failure_message) override { + /// Attempt to cancel an already queued task. + /// + /// \param task_id: The id of the task to remove. + /// \param failure_type: The failure type. + /// + /// \return True if task was successfully removed. This function will return + /// false if the task is already running. + bool CancelTask(const TaskID &task_id, + rpc::RequestWorkerLeaseReply::SchedulingFailureType failure_type = + rpc::RequestWorkerLeaseReply::SCHEDULING_CANCELLED_INTENDED, + const std::string &scheduling_failure_message = "") override { return false; } diff --git a/src/ray/util/container_util.h b/src/ray/util/container_util.h index 6a363dc09d0f..6a6bc671e529 100644 --- a/src/ray/util/container_util.h +++ b/src/ray/util/container_util.h @@ -97,35 +97,19 @@ typename C::mapped_type &map_find_or_die(C &c, const typename C::key_type &k) { map_find_or_die(const_cast<C &>(c), k)); } -// This is guaranteed that predicate is applied to each element exactly once, -// so it can have side effect. -template <typename K, typename V> -void erase_if(absl::flat_hash_map<K, std::deque<V>> &map, - std::function<bool(const V &)> predicate) { - for (auto map_it = map.begin(); map_it != map.end();) { - auto &queue = map_it->second; - for (auto queue_it = queue.begin(); queue_it != queue.end();) { - if (predicate(*queue_it)) { - queue_it = queue.erase(queue_it); - } else { - ++queue_it; - } - } - if (queue.empty()) { - map.erase(map_it++); - } else { - ++map_it; - } - } -} - +/// Remove elements for which the matcher returns true. +/// +/// @param matcher the matcher function to be applied to each element +/// @param container the container of the elements template <typename T> -void erase_if(std::list<T> &list, std::function<bool(const T &)> predicate) { - for (auto list_it = list.begin(); list_it != list.end();) { - if (predicate(*list_it)) { - list_it = list.erase(list_it); - } else { - ++list_it; +void remove_elements(std::function<bool(T)> matcher, std::deque<T> &container) { + auto itr = container.begin(); + while (itr != container.end()) { + if (matcher(*itr)) { + itr = container.erase(itr); + } + if (itr != container.end()) { + itr++; } } } diff --git a/src/ray/util/tests/container_util_test.cc b/src/ray/util/tests/container_util_test.cc index 0e404efd2f8b..d5ba8a7aa7e4 100644 --- a/src/ray/util/tests/container_util_test.cc +++ b/src/ray/util/tests/container_util_test.cc @@ -36,36 +36,31 @@ TEST(ContainerUtilTest, TestMapFindOrDie) { } } -TEST(ContainerUtilTest, TestEraseIf) { - { - std::list<int> list{1, 2, 3, 4}; - ray::erase_if(list, [](const int &value) { return value % 2 == 0; }); - ASSERT_EQ(list, (std::list<int>{1, 3})); - } +TEST(ContainerUtilTest, RemoveElementsLastElement) { + std::deque<int> queue{1, 2, 3, 4}; + std::function<bool(int)> even = [](int value) { return value % 2 == 0; }; + remove_elements(even, queue); - { - std::list<int> list{1, 2, 3}; - ray::erase_if(list, [](const int &value) { return value % 2 == 0; }); - ASSERT_EQ(list, (std::list<int>{1, 3})); - } + std::deque<int> expected{1, 3}; + ASSERT_EQ(queue, expected); +} - { - std::list<int> list{}; - ray::erase_if(list, [](const int &value) { return value % 2 == 0; }); - ASSERT_EQ(list, (std::list<int>{})); - } +TEST(ContainerUtilTest, RemoveElementsExcludeLastElement) { + std::deque<int> queue{1, 2, 3}; + std::function<bool(int)> even = [](int value) { return value % 2 == 0; }; + remove_elements(even, queue); - { - absl::flat_hash_map<int, std::deque<int>> map; - map[1] = std::deque<int>{1, 3}; - map[2] = std::deque<int>{2, 4};
- map[3] = std::deque<int>{5, 6}; - ray::erase_if(map, [](const int &value) { return value % 2 == 0; }); + std::deque<int> expected{1, 3}; + ASSERT_EQ(queue, expected); +} - ASSERT_EQ(map.size(), 2); - ASSERT_EQ(map[1], (std::deque<int>{1, 3})); - ASSERT_EQ(map[3], (std::deque<int>{5})); - } +TEST(ContainerUtilTest, RemoveElementsEmptyContainer) { + std::deque<int> queue{}; + std::function<bool(int)> even = [](int value) { return value % 2 == 0; }; + remove_elements(even, queue); + + std::deque<int> expected{}; + ASSERT_EQ(queue, expected); } } // namespace ray
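For reference, below is a minimal standalone sketch of the ray::remove_elements helper restored in src/ray/util/container_util.h, exercised with the same inputs as the restored RemoveElementsLastElement test. The template parameters shown (std::function<bool(T)>, std::deque<T>) and the main() driver are reconstructed for illustration, not text taken from the patch. One property of the restored loop worth noting: the iterator is advanced once more after an erase, so an element that immediately follows a removed one is never tested; the restored tests do not cover that case.

// Sketch only: approximates the restored ray::remove_elements utility under the
// reconstructed signature described above.
#include <cassert>
#include <deque>
#include <functional>

template <typename T>
void remove_elements(std::function<bool(T)> matcher, std::deque<T> &container) {
  auto itr = container.begin();
  while (itr != container.end()) {
    if (matcher(*itr)) {
      itr = container.erase(itr);  // erase returns the iterator past the removed element
    }
    if (itr != container.end()) {
      itr++;  // advance; note this also steps over the element right after an erase
    }
  }
}

int main() {
  // Same inputs as the restored RemoveElementsLastElement test.
  std::deque<int> queue{1, 2, 3, 4};
  std::function<bool(int)> even = [](int value) { return value % 2 == 0; };
  remove_elements(even, queue);
  assert((queue == std::deque<int>{1, 3}));
  return 0;
}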