
Commit f0f52fa

[Core] Cancel lease requests before returning a PG bundle (ray-project#45919)
Signed-off-by: Jiajun Yao <[email protected]>
1 parent 2f80503 commit f0f52fa

17 files changed, +366 -237 lines changed

python/ray/tests/test_gcs_fault_tolerance.py

Lines changed: 84 additions & 0 deletions
@@ -1,4 +1,5 @@
 import sys
+import asyncio
 import os
 import threading
 from time import sleep
@@ -22,6 +23,8 @@
 )
 from ray.job_submission import JobSubmissionClient, JobStatus
 from ray._raylet import GcsClient
+from ray._private.runtime_env.plugin import RuntimeEnvPlugin
+from ray.util.state import list_placement_groups
 
 import psutil
 
@@ -1213,6 +1216,87 @@ def spawn(self, name, namespace):
         raise ValueError(f"Unknown case: {case}")
 
 
+MyPlugin = "MyPlugin"
+MY_PLUGIN_CLASS_PATH = "ray.tests.test_gcs_fault_tolerance.HangPlugin"
+
+
+class HangPlugin(RuntimeEnvPlugin):
+    name = MyPlugin
+
+    async def create(
+        self,
+        uri,
+        runtime_env,
+        ctx,
+        logger,  # noqa: F821
+    ) -> float:
+        while True:
+            await asyncio.sleep(1)
+
+    @staticmethod
+    def validate(runtime_env_dict: dict) -> str:
+        return 1
+
+
+@pytest.mark.parametrize(
+    "ray_start_regular_with_external_redis",
+    [
+        generate_system_config_map(
+            gcs_rpc_server_reconnect_timeout_s=60,
+            testing_asio_delay_us="NodeManagerService.grpc_server.CancelResourceReserve=500000000:500000000",  # noqa: E501
+        ),
+    ],
+    indirect=True,
+)
+@pytest.mark.parametrize(
+    "set_runtime_env_plugins",
+    [
+        '[{"class":"' + MY_PLUGIN_CLASS_PATH + '"}]',
+    ],
+    indirect=True,
+)
+def test_placement_group_removal_after_gcs_restarts(
+    set_runtime_env_plugins, ray_start_regular_with_external_redis
+):
+    @ray.remote
+    def task():
+        pass
+
+    pg = ray.util.placement_group(bundles=[{"CPU": 1}])
+    _ = task.options(
+        max_retries=0,
+        num_cpus=1,
+        scheduling_strategy=PlacementGroupSchedulingStrategy(
+            placement_group=pg,
+        ),
+        runtime_env={
+            MyPlugin: {"name": "f2"},
+            "config": {"setup_timeout_seconds": -1},
+        },
+    ).remote()
+
+    # The task should be popping worker
+    # TODO(jjyao) Use a more deterministic way to
+    # decide whether the task is popping worker
+    sleep(5)
+
+    ray.util.remove_placement_group(pg)
+    # The PG is marked as REMOVED in redis but not removed yet from raylet
+    # due to the injected delay of CancelResourceReserve rpc
+    wait_for_condition(lambda: list_placement_groups()[0].state == "REMOVED")
+
+    ray._private.worker._global_node.kill_gcs_server()
+    # After GCS restarts, it will try to remove the PG resources
+    # again via ReleaseUnusedBundles rpc
+    ray._private.worker._global_node.start_gcs_server()
+
+    def verify_pg_resources_cleaned():
+        r_keys = ray.available_resources().keys()
+        return all("group" not in k for k in r_keys)
+
+    wait_for_condition(verify_pg_resources_cleaned, timeout=30)
+
+
 if __name__ == "__main__":
 
     import pytest

python/ray/tests/test_placement_group_5.py

Lines changed: 3 additions & 4 deletions
@@ -470,10 +470,9 @@ async def create(
     ) -> float:
         await asyncio.sleep(PLUGIN_TIMEOUT)
 
-
-        @staticmethod
-        def validate(runtime_env_dict: dict) -> str:
-            return 1
+    @staticmethod
+    def validate(runtime_env_dict: dict) -> str:
+        return 1
 
 
 @pytest.mark.parametrize(

src/ray/core_worker/core_worker_process.cc

Lines changed: 1 addition & 0 deletions
@@ -233,6 +233,7 @@ void CoreWorkerProcessImpl::InitializeSystemConfig() {
   thread.join();
 
   RayConfig::instance().initialize(promise.get_future().get());
+  ray::asio::testing::init();
 }
 
 void CoreWorkerProcessImpl::RunWorkerTaskExecutionLoop() {
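
Note: this mirrors the identical call added to gcs_server_main.cc below, so that delay-injection settings such as the testing_asio_delay_us value used by the new test also take effect once the process has loaded its system config. As a rough, hypothetical sketch of what such an initializer has to do (this is not Ray's actual implementation; the entry format is an assumption inferred from the test's config string):

// Hypothetical sketch: parse "Method=min_us:max_us[,Method=...]" delay specs
// into a table that RPC handlers could consult before running.
#include <cstdint>
#include <sstream>
#include <string>
#include <unordered_map>

struct DelayRangeUs {
  uint64_t min_us = 0;
  uint64_t max_us = 0;
};

std::unordered_map<std::string, DelayRangeUs> ParseAsioDelays(const std::string &spec) {
  std::unordered_map<std::string, DelayRangeUs> delays;
  std::stringstream entries(spec);
  std::string entry;
  while (std::getline(entries, entry, ',')) {
    const auto eq = entry.find('=');
    if (eq == std::string::npos) continue;
    const auto colon = entry.find(':', eq);
    if (colon == std::string::npos) continue;
    DelayRangeUs range;
    range.min_us = std::stoull(entry.substr(eq + 1, colon - eq - 1));
    range.max_us = std::stoull(entry.substr(colon + 1));
    delays[entry.substr(0, eq)] = range;  // Keyed by the fully qualified method name.
  }
  return delays;
}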

src/ray/gcs/gcs_server/gcs_placement_group_scheduler.cc

Lines changed: 15 additions & 24 deletions
@@ -243,9 +243,9 @@ void GcsPlacementGroupScheduler::CancelResourceReserve(
   auto node_id = NodeID::FromBinary(node.value()->node_id());
 
   if (max_retry == current_retry_cnt) {
-    RAY_LOG(INFO) << "Failed to cancel resource reserved for bundle because the max "
-                     "retry count is reached. "
-                  << bundle_spec->DebugString() << " at node " << node_id;
+    RAY_LOG(ERROR) << "Failed to cancel resource reserved for bundle because the max "
+                      "retry count is reached. "
+                   << bundle_spec->DebugString() << " at node " << node_id;
     return;
   }
 
@@ -261,11 +261,10 @@ void GcsPlacementGroupScheduler::CancelResourceReserve(
          RAY_LOG(INFO) << "Finished cancelling the resource reserved for bundle: "
                        << bundle_spec->DebugString() << " at node " << node_id;
        } else {
-         // We couldn't delete the pg resources either becuase it is in use
-         // or network issue. Retry.
-         RAY_LOG(INFO) << "Failed to cancel the resource reserved for bundle: "
-                       << bundle_spec->DebugString() << " at node " << node_id
-                       << ". Status: " << status;
+         // We couldn't delete the pg resources because of network issue. Retry.
+         RAY_LOG(WARNING) << "Failed to cancel the resource reserved for bundle: "
+                          << bundle_spec->DebugString() << " at node " << node_id
+                          << ". Status: " << status;
          execute_after(
              io_context_,
              [this, bundle_spec, node, max_retry, current_retry_cnt] {
@@ -568,14 +567,10 @@ void GcsPlacementGroupScheduler::DestroyPlacementGroupPreparedBundleResources(
     for (const auto &iter : *(leasing_bundle_locations)) {
       auto &bundle_spec = iter.second.second;
       auto &node_id = iter.second.first;
-      CancelResourceReserve(
-          bundle_spec,
-          gcs_node_manager_.GetAliveNode(node_id),
-          // Retry 10 * worker registeration timeout to avoid race condition.
-          // See https://github.com/ray-project/ray/pull/42942
-          // for more details.
-          /*max_retry*/ RayConfig::instance().worker_register_timeout_seconds() * 10,
-          /*num_retry*/ 0);
+      CancelResourceReserve(bundle_spec,
+                            gcs_node_manager_.GetAliveNode(node_id),
+                            /*max_retry*/ 5,
+                            /*num_retry*/ 0);
     }
   }
 }
@@ -594,14 +589,10 @@ void GcsPlacementGroupScheduler::DestroyPlacementGroupCommittedBundleResources(
     for (const auto &iter : *(committed_bundle_locations)) {
       auto &bundle_spec = iter.second.second;
       auto &node_id = iter.second.first;
-      CancelResourceReserve(
-          bundle_spec,
-          gcs_node_manager_.GetAliveNode(node_id),
-          // Retry 10 * worker registeration timeout to avoid race condition.
-          // See https://github.com/ray-project/ray/pull/42942
-          // for more details.
-          /*max_retry*/ RayConfig::instance().worker_register_timeout_seconds() * 10,
-          /*num_retry*/ 0);
+      CancelResourceReserve(bundle_spec,
+                            gcs_node_manager_.GetAliveNode(node_id),
+                            /*max_retry*/ 5,
+                            /*num_retry*/ 0);
     }
     committed_bundle_location_index_.Erase(placement_group_id);
     cluster_resource_scheduler_.GetClusterResourceManager()
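
For context, CancelResourceReserve retries itself through execute_after until either the raylet acknowledges the cancellation or the retry budget is exhausted; the hunks above only lower that budget to a fixed 5 attempts and raise the log severities. A minimal, generic sketch of the bounded-retry shape (illustrative only; the synchronous attempt and the scheduler callback are placeholders, not Ray APIs):

#include <functional>
#include <iostream>

// Bail out once the budget is spent; otherwise attempt the operation and,
// on a transient failure, schedule another attempt for later.
void RetryWithLimit(const std::function<bool()> &attempt,
                    int max_retry,
                    int current_retry_cnt,
                    const std::function<void(std::function<void()>)> &schedule_after) {
  if (current_retry_cnt == max_retry) {
    std::cerr << "Giving up after " << max_retry << " attempts" << std::endl;
    return;
  }
  if (attempt()) {
    return;  // Succeeded, e.g. the bundle lease was cancelled on the raylet.
  }
  schedule_after([=]() {
    RetryWithLimit(attempt, max_retry, current_retry_cnt + 1, schedule_after);
  });
}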

src/ray/gcs/gcs_server/gcs_server_main.cc

Lines changed: 1 addition & 0 deletions
@@ -62,6 +62,7 @@ int main(int argc, char *argv[]) {
   gflags::ShutDownCommandLineFlags();
 
   RayConfig::instance().initialize(config_list);
+  ray::asio::testing::init();
 
   // IO Service for main loop.
   instrumented_io_context main_service;

src/ray/raylet/local_task_manager.cc

Lines changed: 65 additions & 59 deletions
@@ -546,21 +546,15 @@ bool LocalTaskManager::PoppedWorkerHandler(
     not_detached_with_owner_failed = true;
   }
 
-  const auto &required_resource =
-      task.GetTaskSpecification().GetRequiredResources().GetResourceMap();
-  for (auto &entry : required_resource) {
-    if (!cluster_resource_scheduler_->GetLocalResourceManager().ResourcesExist(
-            scheduling::ResourceID(entry.first))) {
-      RAY_CHECK(task.GetTaskSpecification().PlacementGroupBundleId().first !=
-                PlacementGroupID::Nil());
-      RAY_LOG(DEBUG) << "The placement group: "
-                     << task.GetTaskSpecification().PlacementGroupBundleId().first
-                     << " was removed when poping workers for task: " << task_id
-                     << ", will cancel the task.";
-      CancelTask(
-          task_id,
-          rpc::RequestWorkerLeaseReply::SCHEDULING_CANCELLED_PLACEMENT_GROUP_REMOVED);
-      canceled = true;
+  if (!canceled) {
+    const auto &required_resource =
+        task.GetTaskSpecification().GetRequiredResources().GetResourceMap();
+    for (auto &entry : required_resource) {
+      // This is to make sure PG resource is not deleted during popping worker
+      // unless the lease request is cancelled.
+      RAY_CHECK(cluster_resource_scheduler_->GetLocalResourceManager().ResourcesExist(
+          scheduling::ResourceID(entry.first)))
+          << entry.first;
     }
   }
 
@@ -855,7 +849,7 @@ void LocalTaskManager::ReleaseTaskArgs(const TaskID &task_id) {
 }
 
 namespace {
-void ReplyCancelled(std::shared_ptr<internal::Work> &work,
+void ReplyCancelled(const std::shared_ptr<internal::Work> &work,
                     rpc::RequestWorkerLeaseReply::SchedulingFailureType failure_type,
                     const std::string &scheduling_failure_message) {
   auto reply = work->reply;
@@ -867,55 +861,67 @@ void ReplyCancelled(std::shared_ptr<internal::Work> &work,
 }
 }  // namespace
 
-bool LocalTaskManager::CancelTask(
-    const TaskID &task_id,
+bool LocalTaskManager::CancelTasks(
+    std::function<bool(const std::shared_ptr<internal::Work> &)> predicate,
     rpc::RequestWorkerLeaseReply::SchedulingFailureType failure_type,
     const std::string &scheduling_failure_message) {
-  for (auto shapes_it = tasks_to_dispatch_.begin(); shapes_it != tasks_to_dispatch_.end();
-       shapes_it++) {
-    auto &work_queue = shapes_it->second;
-    for (auto work_it = work_queue.begin(); work_it != work_queue.end(); work_it++) {
-      const auto &task = (*work_it)->task;
-      if (task.GetTaskSpecification().TaskId() == task_id) {
-        RAY_LOG(DEBUG) << "Canceling task " << task_id << " from dispatch queue.";
-        ReplyCancelled(*work_it, failure_type, scheduling_failure_message);
-        if ((*work_it)->GetState() == internal::WorkStatus::WAITING_FOR_WORKER) {
-          // We've already acquired resources so we need to release them.
-          cluster_resource_scheduler_->GetLocalResourceManager().ReleaseWorkerResources(
-              (*work_it)->allocated_instances);
-          // Release pinned task args.
-          ReleaseTaskArgs(task_id);
-        }
-        if (!task.GetTaskSpecification().GetDependencies().empty()) {
-          task_dependency_manager_.RemoveTaskDependencies(
-              task.GetTaskSpecification().TaskId());
+  bool tasks_cancelled = false;
+
+  ray::erase_if<SchedulingClass, std::shared_ptr<internal::Work>>(
+      tasks_to_dispatch_, [&](const std::shared_ptr<internal::Work> &work) {
+        if (predicate(work)) {
+          const TaskID task_id = work->task.GetTaskSpecification().TaskId();
+          RAY_LOG(DEBUG) << "Canceling task " << task_id << " from dispatch queue.";
+          ReplyCancelled(work, failure_type, scheduling_failure_message);
+          if (work->GetState() == internal::WorkStatus::WAITING_FOR_WORKER) {
+            // We've already acquired resources so we need to release them.
+            cluster_resource_scheduler_->GetLocalResourceManager().ReleaseWorkerResources(
+                work->allocated_instances);
+            // Release pinned task args.
+            ReleaseTaskArgs(task_id);
+          }
+          if (!work->task.GetTaskSpecification().GetDependencies().empty()) {
+            task_dependency_manager_.RemoveTaskDependencies(
+                work->task.GetTaskSpecification().TaskId());
+          }
+          RemoveFromRunningTasksIfExists(work->task);
+          work->SetStateCancelled();
+          tasks_cancelled = true;
+          return true;
+        } else {
+          return false;
         }
-        RemoveFromRunningTasksIfExists(task);
-        (*work_it)->SetStateCancelled();
-        work_queue.erase(work_it);
-        if (work_queue.empty()) {
-          tasks_to_dispatch_.erase(shapes_it);
+      });
+
+  ray::erase_if<std::shared_ptr<internal::Work>>(
+      waiting_task_queue_, [&](const std::shared_ptr<internal::Work> &work) {
+        if (predicate(work)) {
+          ReplyCancelled(work, failure_type, scheduling_failure_message);
+          if (!work->task.GetTaskSpecification().GetDependencies().empty()) {
+            task_dependency_manager_.RemoveTaskDependencies(
+                work->task.GetTaskSpecification().TaskId());
+          }
+          waiting_tasks_index_.erase(work->task.GetTaskSpecification().TaskId());
+          tasks_cancelled = true;
+          return true;
+        } else {
+          return false;
         }
-        return true;
-      }
-    }
-  }
+      });
 
-  auto iter = waiting_tasks_index_.find(task_id);
-  if (iter != waiting_tasks_index_.end()) {
-    const auto &task = (*iter->second)->task;
-    ReplyCancelled(*iter->second, failure_type, scheduling_failure_message);
-    if (!task.GetTaskSpecification().GetDependencies().empty()) {
-      task_dependency_manager_.RemoveTaskDependencies(
-          task.GetTaskSpecification().TaskId());
-    }
-    waiting_task_queue_.erase(iter->second);
-    waiting_tasks_index_.erase(iter);
-
-    return true;
-  }
+  return tasks_cancelled;
+}
 
-  return false;
+bool LocalTaskManager::CancelTask(
+    const TaskID &task_id,
+    rpc::RequestWorkerLeaseReply::SchedulingFailureType failure_type,
+    const std::string &scheduling_failure_message) {
+  return CancelTasks(
+      [task_id](const std::shared_ptr<internal::Work> &work) {
+        return work->task.GetTaskSpecification().TaskId() == task_id;
+      },
      failure_type,
      scheduling_failure_message);
 }
 
 bool LocalTaskManager::AnyPendingTasksForResourceAcquisition(
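
The rewritten CancelTasks() leans on ray::erase_if overloads for the per-scheduling-class dispatch map and the waiting queue; those helpers are defined elsewhere in the tree and are not part of this diff. A rough sketch of what they plausibly look like, with container types and signatures inferred from the call sites above (assumptions, not the actual Ray declarations):

#include <deque>
#include <functional>
#include <iterator>
#include <list>
#include <unordered_map>

namespace ray {

// Erase matching items from every per-key deque, dropping keys whose deques
// become empty (mirroring the manual queue/shape erasure this diff removes).
template <typename K, typename V>
void erase_if(std::unordered_map<K, std::deque<V>> &map,
              std::function<bool(const V &)> predicate) {
  for (auto it = map.begin(); it != map.end();) {
    auto &queue = it->second;
    for (auto qit = queue.begin(); qit != queue.end();) {
      qit = predicate(*qit) ? queue.erase(qit) : std::next(qit);
    }
    it = queue.empty() ? map.erase(it) : std::next(it);
  }
}

// Erase matching items from a plain queue such as waiting_task_queue_.
template <typename V>
void erase_if(std::list<V> &queue, std::function<bool(const V &)> predicate) {
  for (auto it = queue.begin(); it != queue.end();) {
    it = predicate(*it) ? queue.erase(it) : std::next(it);
  }
}

}  // namespace ray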

src/ray/raylet/local_task_manager.h

Lines changed: 20 additions & 10 deletions
@@ -111,17 +111,15 @@ class LocalTaskManager : public ILocalTaskManager {
   /// \param task: Output parameter.
   void TaskFinished(std::shared_ptr<WorkerInterface> worker, RayTask *task);
 
-  /// Attempt to cancel an already queued task.
+  /// Attempt to cancel all queued tasks that match the predicate.
   ///
-  /// \param task_id: The id of the task to remove.
-  /// \param failure_type: The failure type.
-  ///
-  /// \return True if task was successfully removed. This function will return
-  /// false if the task is already running.
-  bool CancelTask(const TaskID &task_id,
-                  rpc::RequestWorkerLeaseReply::SchedulingFailureType failure_type =
-                      rpc::RequestWorkerLeaseReply::SCHEDULING_CANCELLED_INTENDED,
-                  const std::string &scheduling_failure_message = "") override;
+  /// \param predicate: A function that returns true if a task needs to be cancelled.
+  /// \param failure_type: The reason for cancellation.
+  /// \param scheduling_failure_message: The reason message for cancellation.
+  /// \return True if any task was successfully cancelled.
+  bool CancelTasks(std::function<bool(const std::shared_ptr<internal::Work> &)> predicate,
+                   rpc::RequestWorkerLeaseReply::SchedulingFailureType failure_type,
+                   const std::string &scheduling_failure_message) override;
 
   /// Return if any tasks are pending resource acquisition.
   ///
@@ -203,6 +201,18 @@
                            const rpc::Address &owner_address,
                            const std::string &runtime_env_setup_error_message);
 
+  /// Attempt to cancel an already queued task.
+  ///
+  /// \param task_id: The id of the task to remove.
+  /// \param failure_type: The failure type.
+  ///
+  /// \return True if task was successfully removed. This function will return
+  /// false if the task is already running.
+  bool CancelTask(const TaskID &task_id,
+                  rpc::RequestWorkerLeaseReply::SchedulingFailureType failure_type =
+                      rpc::RequestWorkerLeaseReply::SCHEDULING_CANCELLED_INTENDED,
+                  const std::string &scheduling_failure_message = "");
+
   /// Attempts to dispatch all tasks which are ready to run. A task
   /// will be dispatched if it is on `tasks_to_dispatch_` and there are still
   /// available resources on the node.
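
With the predicate-based CancelTasks() now on the ILocalTaskManager interface and CancelTask() moved off it (the .cc above shows it simply delegating to CancelTasks()), callers can cancel every queued lease request that matches a predicate in one pass. A hypothetical usage sketch in the spirit of the commit title, cancelling all lease requests that target a placement group before its bundle is returned; the actual raylet call site is not part of this excerpt and may be wired differently:

// Hypothetical call site (assumes Ray's internal headers; not shown in this diff).
void CancelLeaseRequestsForPlacementGroup(LocalTaskManager &local_task_manager,
                                          const PlacementGroupID &pg_id) {
  local_task_manager.CancelTasks(
      [pg_id](const std::shared_ptr<internal::Work> &work) {
        // Match any queued lease request whose task is scheduled onto a
        // bundle of the placement group being removed.
        return work->task.GetTaskSpecification().PlacementGroupBundleId().first ==
               pg_id;
      },
      rpc::RequestWorkerLeaseReply::SCHEDULING_CANCELLED_PLACEMENT_GROUP_REMOVED,
      "The placement group was removed before the lease was granted.");
}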
