-
Notifications
You must be signed in to change notification settings - Fork 7k
[core] Make CancelTask RPC Fault Tolerant #58018
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 10 commits
f8150c0
0a630a7
8ae4e3a
a733422
8a2e428
901099d
7d4ab2e
9070db5
dcec398
9df37aa
d846b90
873a17c
430a4a6
a253c81
9d5cf6f
d0fddda
c8e0ed6
49250fb
3dbcc22
73445ab
f737eef
0429f79
2f9c24e
bed7884
2a66834
0fb240e
6bab852
c1f1e0f
758ecd6
22a53f5
e053c2f
6b2674b
07da167
41fa586
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -998,6 +998,8 @@ void ActorTaskSubmitter::CancelTask(TaskSpecification task_spec, bool recursive) | |
|
|
||
| // If there's no client, it means the actor has not been created yet. | ||
| // Retry in 1 second. | ||
| NodeID node_id; | ||
| std::string executor_worker_id; | ||
| { | ||
| absl::MutexLock lock(&mu_); | ||
| RAY_LOG(DEBUG).WithField(task_id) << "Task was sent to an actor. Send a cancel RPC."; | ||
|
|
@@ -1007,34 +1009,81 @@ void ActorTaskSubmitter::CancelTask(TaskSpecification task_spec, bool recursive) | |
| RetryCancelTask(task_spec, recursive, 1000); | ||
| return; | ||
| } | ||
| node_id = NodeID::FromBinary(queue->second.client_address_.value().node_id()); | ||
| executor_worker_id = queue->second.client_address_.value().worker_id(); | ||
| } | ||
|
|
||
| auto do_cancel_local_task = | ||
| [this, | ||
| task_spec = std::move(task_spec), | ||
| task_id, | ||
| force_kill, | ||
| recursive, | ||
| executor_worker_id](const rpc::GcsNodeAddressAndLiveness &node_info) mutable { | ||
| rpc::Address raylet_address; | ||
| raylet_address.set_node_id(node_info.node_id()); | ||
| raylet_address.set_ip_address(node_info.node_manager_address()); | ||
| raylet_address.set_port(node_info.node_manager_port()); | ||
|
|
||
| rpc::CancelLocalTaskRequest request; | ||
| request.set_intended_task_id(task_spec.TaskIdBinary()); | ||
| request.set_force_kill(force_kill); | ||
| request.set_recursive(recursive); | ||
| request.set_caller_worker_id(task_spec.CallerWorkerIdBinary()); | ||
| request.set_executor_worker_id(executor_worker_id); | ||
|
|
||
| auto raylet_client = raylet_client_pool_.GetOrConnectByAddress(raylet_address); | ||
| raylet_client->CancelLocalTask( | ||
| request, | ||
| [this, task_spec = std::move(task_spec), recursive, task_id]( | ||
| const Status &status, const rpc::CancelLocalTaskReply &reply) mutable { | ||
| RAY_LOG(DEBUG).WithField(task_spec.TaskId()) | ||
| << "CancelTask RPC response received with status " << status.ToString(); | ||
|
|
||
| // Keep retrying every 2 seconds until a task is officially | ||
| // finished. | ||
| if (!task_manager_.GetTaskSpec(task_id)) { | ||
|
||
| // Task is already finished. | ||
| RAY_LOG(DEBUG).WithField(task_spec.TaskId()) | ||
| << "Task is finished. Stop a cancel request."; | ||
| return; | ||
| } | ||
|
|
||
| rpc::CancelTaskRequest request; | ||
| request.set_intended_task_id(task_spec.TaskIdBinary()); | ||
| request.set_force_kill(force_kill); | ||
| request.set_recursive(recursive); | ||
| request.set_caller_worker_id(task_spec.CallerWorkerIdBinary()); | ||
| auto client = core_worker_client_pool_.GetOrConnect(*queue->second.client_address_); | ||
| client->CancelTask(request, | ||
| [this, task_spec = std::move(task_spec), recursive, task_id]( | ||
| const Status &status, const rpc::CancelTaskReply &reply) { | ||
| RAY_LOG(DEBUG).WithField(task_spec.TaskId()) | ||
| << "CancelTask RPC response received with status " | ||
| << status.ToString(); | ||
|
|
||
| // Keep retrying every 2 seconds until a task is officially | ||
| // finished. | ||
| if (!task_manager_.GetTaskSpec(task_id)) { | ||
| // Task is already finished. | ||
| RAY_LOG(DEBUG).WithField(task_spec.TaskId()) | ||
| << "Task is finished. Stop a cancel request."; | ||
| return; | ||
| } | ||
|
|
||
| if (!reply.attempt_succeeded()) { | ||
| RetryCancelTask(task_spec, recursive, 2000); | ||
| } | ||
| }); | ||
| if (!reply.attempt_succeeded()) { | ||
Sparks0219 marked this conversation as resolved.
Show resolved
Hide resolved
|
||
| RetryCancelTask(std::move(task_spec), recursive, 2000); | ||
| } | ||
| }); | ||
| }; | ||
|
|
||
| // Check GCS node cache. If node info is not in the cache, query the GCS instead. | ||
| auto *node_info = gcs_client_->Nodes().GetNodeAddressAndLiveness( | ||
| node_id, /*filter_dead_nodes=*/false); | ||
Sparks0219 marked this conversation as resolved.
Outdated
Show resolved
Hide resolved
|
||
| if (node_info == nullptr) { | ||
| gcs_client_->Nodes().AsyncGetAllNodeAddressAndLiveness( | ||
| [do_cancel_local_task = std::move(do_cancel_local_task), node_id]( | ||
| const Status &status, | ||
| std::vector<rpc::GcsNodeAddressAndLiveness> &&nodes) mutable { | ||
| if (!status.ok()) { | ||
| RAY_LOG(INFO) << "Failed to get node info from GCS"; | ||
| return; | ||
| } | ||
Sparks0219 marked this conversation as resolved.
Outdated
Show resolved
Hide resolved
|
||
| if (nodes.empty() || nodes[0].state() != rpc::GcsNodeInfo::ALIVE) { | ||
| RAY_LOG(INFO).WithField(node_id) | ||
| << "Not sending CancelLocalTask because node is dead"; | ||
| return; | ||
| } | ||
| do_cancel_local_task(nodes[0]); | ||
| }, | ||
| -1, | ||
| {node_id}); | ||
| return; | ||
| } | ||
| if (node_info->state() == rpc::GcsNodeInfo::DEAD) { | ||
| RAY_LOG(INFO).WithField(node_id) | ||
| << "Not sending CancelLocalTask because node is dead"; | ||
| return; | ||
| } | ||
| do_cancel_local_task(*node_info); | ||
| } | ||
|
|
||
| bool ActorTaskSubmitter::QueueGeneratorForResubmit(const TaskSpecification &spec) { | ||
|
|
||
Uh oh!
There was an error while loading. Please reload this page.