From 0048901b11b2d2e123477b80b6e4b6344b0059b5 Mon Sep 17 00:00:00 2001 From: dayshah Date: Thu, 17 Oct 2024 01:28:52 -0400 Subject: [PATCH 01/19] general cleanup on related files Signed-off-by: dayshah --- .../transport/actor_task_submitter.cc | 16 +++++----------- .../transport/normal_task_submitter.cc | 14 ++++++++------ src/ray/rpc/worker/core_worker_client.h | 16 ++++++++-------- 3 files changed, 21 insertions(+), 25 deletions(-) diff --git a/src/ray/core_worker/transport/actor_task_submitter.cc b/src/ray/core_worker/transport/actor_task_submitter.cc index babd1ba8dc6d..60b2cbdbb2a6 100644 --- a/src/ray/core_worker/transport/actor_task_submitter.cc +++ b/src/ray/core_worker/transport/actor_task_submitter.cc @@ -14,14 +14,8 @@ #include "ray/core_worker/transport/actor_task_submitter.h" -#include - -#include "ray/common/task/task.h" #include "ray/gcs/pb_util.h" -using ray::rpc::ActorTableData; -using namespace ray::gcs; - namespace ray { namespace core { @@ -235,7 +229,7 @@ Status ActorTaskSubmitter::SubmitTask(TaskSpecification task_spec) { absl::MutexLock lock(&mu_); const auto queue_it = client_queues_.find(task_spec.ActorId()); const auto &death_cause = queue_it->second.death_cause; - error_info = GetErrorInfoFromActorDeathCause(death_cause); + error_info = gcs::GetErrorInfoFromActorDeathCause(death_cause); error_type = error_info.error_type(); } auto status = Status::IOError("cancelling task of dead actor"); @@ -366,7 +360,7 @@ void ActorTaskSubmitter::DisconnectActor(const ActorID &actor_id, const rpc::ActorDeathCause &death_cause, bool is_restartable) { RAY_LOG(DEBUG).WithField(actor_id) << "Disconnecting from actor, death context type=" - << GetActorDeathCauseString(death_cause); + << gcs::GetActorDeathCauseString(death_cause); absl::flat_hash_map> inflight_task_callbacks; @@ -432,7 +426,7 @@ void ActorTaskSubmitter::DisconnectActor(const ActorID &actor_id, // Failing tasks has to be done without mu_ hold because the callback // might require holding mu_ which will lead to a deadlock. 
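// (This is why inflight_task_callbacks was moved out into a local map above:
// the callbacks can then be invoked safely after mu_ is released.)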
auto status = Status::IOError("cancelling all pending tasks of dead actor"); - const auto error_info = GetErrorInfoFromActorDeathCause(death_cause); + const auto error_info = gcs::GetErrorInfoFromActorDeathCause(death_cause); const auto error_type = error_info.error_type(); for (auto &task_id : task_ids_to_fail) { @@ -704,7 +698,7 @@ void ActorTaskSubmitter::HandlePushTaskReply(const Status &status, is_actor_dead = queue.state == rpc::ActorTableData::DEAD; if (is_actor_dead) { const auto &death_cause = queue.death_cause; - error_info = GetErrorInfoFromActorDeathCause(death_cause); + error_info = gcs::GetErrorInfoFromActorDeathCause(death_cause); fail_immediately = error_info.has_actor_died_error() && error_info.actor_died_error().has_oom_context() && error_info.actor_died_error().oom_context().fail_immediately(); @@ -953,7 +947,7 @@ Status ActorTaskSubmitter::CancelTask(TaskSpecification task_spec, bool recursiv request.set_recursive(recursive); request.set_caller_worker_id(task_spec.CallerWorkerId().Binary()); client->CancelTask(request, - [this, task_spec, recursive, task_id]( + [this, task_spec = std::move(task_spec), recursive, task_id]( const Status &status, const rpc::CancelTaskReply &reply) { RAY_LOG(DEBUG).WithField(task_spec.TaskId()) << "CancelTask RPC response received with status " diff --git a/src/ray/core_worker/transport/normal_task_submitter.cc b/src/ray/core_worker/transport/normal_task_submitter.cc index 8c3628244a74..6e27ce979c14 100644 --- a/src/ray/core_worker/transport/normal_task_submitter.cc +++ b/src/ray/core_worker/transport/normal_task_submitter.cc @@ -16,7 +16,6 @@ #include "ray/core_worker/transport/dependency_resolver.h" #include "ray/gcs/pb_util.h" -#include "ray/stats/metric_defs.h" namespace ray { namespace core { @@ -65,7 +64,7 @@ Status NormalTaskSubmitter::SubmitTask(TaskSpecification task_spec) { // There are idle workers, so we don't need more // workers. - for (auto active_worker_addr : scheduling_key_entry.active_workers) { + for (const auto &active_worker_addr : scheduling_key_entry.active_workers) { RAY_CHECK(worker_to_lease_entry_.find(active_worker_addr) != worker_to_lease_entry_.end()); auto &lease_entry = worker_to_lease_entry_[active_worker_addr]; @@ -703,7 +702,7 @@ Status NormalTaskSubmitter::CancelTask(TaskSpecification task_spec, bool recursive) { RAY_LOG(INFO) << "Cancelling a task: " << task_spec.TaskId() << " force_kill: " << force_kill << " recursive: " << recursive; - const SchedulingKey scheduling_key( + SchedulingKey scheduling_key( task_spec.GetSchedulingClass(), task_spec.GetDependencyIds(), task_spec.IsActorCreationTask() ? 
task_spec.ActorCreationId() : ActorID::Nil(), @@ -763,8 +762,11 @@ Status NormalTaskSubmitter::CancelTask(TaskSpecification task_spec, request.set_caller_worker_id(task_spec.CallerWorkerId().Binary()); client->CancelTask( request, - [this, task_spec, scheduling_key, force_kill, recursive]( - const Status &status, const rpc::CancelTaskReply &reply) { + [this, + task_spec = std::move(task_spec), + scheduling_key = std::move(scheduling_key), + force_kill, + recursive](const Status &status, const rpc::CancelTaskReply &reply) mutable { absl::MutexLock lock(&mu_); RAY_LOG(DEBUG) << "CancelTask RPC response received for " << task_spec.TaskId() << " with status " << status.ToString(); @@ -789,7 +791,7 @@ Status NormalTaskSubmitter::CancelTask(TaskSpecification task_spec, cancel_retry_timer_->async_wait( boost::bind(&NormalTaskSubmitter::CancelTask, this, - task_spec, + std::move(task_spec), force_kill, recursive)); } else { diff --git a/src/ray/rpc/worker/core_worker_client.h b/src/ray/rpc/worker/core_worker_client.h index 3dc55da1c8db..12a46b3d5bdc 100644 --- a/src/ray/rpc/worker/core_worker_client.h +++ b/src/ray/rpc/worker/core_worker_client.h @@ -53,10 +53,10 @@ namespace ray { namespace rpc { /// The maximum number of requests in flight per client. -const int64_t kMaxBytesInFlight = 16 * 1024 * 1024; +constexpr int64_t kMaxBytesInFlight = 16L * 1024 * 1024; /// The base size in bytes per request. -const int64_t kBaseRequestSize = 1024; +constexpr int64_t kBaseRequestSize = 1024; /// Get the estimated size in bytes of the given task. const static int64_t RequestSizeInBytes(const PushTaskRequest &request) { @@ -202,7 +202,7 @@ class CoreWorkerClientInterface : public pubsub::SubscriberClientInterface { /// Returns the max acked sequence number, useful for checking on progress. virtual int64_t ClientProcessedUpToSeqno() { return -1; } - virtual ~CoreWorkerClientInterface(){}; + virtual ~CoreWorkerClientInterface() = default; }; /// Client used for communicating with a remote worker server. 
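As context for the constants changed above, here is a minimal sketch of the kind of bytes-in-flight throttle they suggest. This is illustrative only — PendingSend, InFlightThrottle, and kSketchMaxBytesInFlight are made-up names, not the CoreWorkerClient implementation, and the sketch assumes single-threaded use for brevity:

#include <cstdint>
#include <deque>
#include <functional>
#include <utility>

inline constexpr int64_t kSketchMaxBytesInFlight = 16L * 1024 * 1024;

struct PendingSend {
  int64_t size_bytes;          // e.g. an estimate like RequestSizeInBytes()
  std::function<void()> send;  // performs the actual RPC
};

class InFlightThrottle {
 public:
  void Enqueue(PendingSend s) {
    queue_.push_back(std::move(s));
    Drain();
  }

  // Called from the RPC reply callback with the size recorded at send time.
  void OnReply(int64_t size_bytes) {
    bytes_in_flight_ -= size_bytes;
    Drain();
  }

 private:
  void Drain() {
    // Keep sending while the estimated payload in flight stays under the cap.
    while (!queue_.empty() &&
           bytes_in_flight_ + queue_.front().size_bytes <= kSketchMaxBytesInFlight) {
      bytes_in_flight_ += queue_.front().size_bytes;
      auto send = std::move(queue_.front().send);
      queue_.pop_front();
      send();
    }
  }

  std::deque<PendingSend> queue_;
  int64_t bytes_in_flight_ = 0;
};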
@@ -373,9 +373,9 @@ class CoreWorkerClient : public std::enable_shared_from_this, { absl::MutexLock lock(&mutex_); - send_queue_.push_back(std::make_pair( - std::move(request), - std::move(const_cast &>(callback)))); + send_queue_.emplace_back(std::move(request), + std::move(const_cast &>( + callback))); // TODO(dayshah) remove const casts } SendRequests(); } @@ -473,8 +473,8 @@ class CoreWorkerClient : public std::enable_shared_from_this, int64_t max_finished_seq_no_ ABSL_GUARDED_BY(mutex_) = -1; }; -typedef std::function(const rpc::Address &)> - ClientFactoryFn; +using ClientFactoryFn = + std::function(const rpc::Address &)>; } // namespace rpc } // namespace ray From 34c8be34ecef8714b88ffd39c9bac08d772fc14c Mon Sep 17 00:00:00 2001 From: dayshah Date: Thu, 17 Oct 2024 12:46:20 -0400 Subject: [PATCH 02/19] minor task submitter header changes Signed-off-by: dayshah --- src/ray/core_worker/transport/normal_task_submitter.h | 11 +++++------ 1 file changed, 5 insertions(+), 6 deletions(-) diff --git a/src/ray/core_worker/transport/normal_task_submitter.h b/src/ray/core_worker/transport/normal_task_submitter.h index b434324008f2..532e3cb85600 100644 --- a/src/ray/core_worker/transport/normal_task_submitter.h +++ b/src/ray/core_worker/transport/normal_task_submitter.h @@ -34,9 +34,8 @@ namespace ray { namespace core { -typedef std::function(const std::string &ip_address, - int port)> - LeaseClientFactoryFn; +using LeaseClientFactoryFn = + std::function(const std::string &, int)>; // The task queues are keyed on resource shape & function descriptor // (encapsulated in SchedulingClass) to defer resource allocation decisions to the raylet @@ -49,7 +48,7 @@ typedef std::function(const std::string &i // be aware of the actor and is not able to manage it. It is also keyed on // RuntimeEnvHash, because a worker can only run a task if the worker's RuntimeEnvHash // matches the RuntimeEnvHash required by the task spec. -typedef int RuntimeEnvHash; +using RuntimeEnvHash = int; using SchedulingKey = std::tuple, ActorID, RuntimeEnvHash>; @@ -64,7 +63,7 @@ class LeaseRequestRateLimiter { // Lease request rate-limiter with fixed number. 
class StaticLeaseRequestRateLimiter : public LeaseRequestRateLimiter { public: - StaticLeaseRequestRateLimiter(size_t limit) : kLimit(limit) {} + explicit StaticLeaseRequestRateLimiter(size_t limit) : kLimit(limit) {} size_t GetMaxPendingLeaseRequestsPerSchedulingCategory() override { return kLimit; } private: @@ -89,7 +88,7 @@ class NormalTaskSubmitter { const JobID &job_id, std::shared_ptr lease_request_rate_limiter, absl::optional cancel_timer = absl::nullopt) - : rpc_address_(rpc_address), + : rpc_address_(std::move(rpc_address)), local_lease_client_(lease_client), lease_client_factory_(lease_client_factory), lease_policy_(std::move(lease_policy)), From bf649c97f8f1c402d6afa239fc90d7ec9e6a64c1 Mon Sep 17 00:00:00 2001 From: dayshah Date: Wed, 23 Oct 2024 20:35:35 -0400 Subject: [PATCH 03/19] remove const cast Signed-off-by: dayshah --- src/mock/ray/rpc/worker/core_worker_client.h | 2 +- src/ray/core_worker/test/direct_actor_transport_test.cc | 2 +- src/ray/core_worker/transport/actor_task_submitter.cc | 3 ++- src/ray/rpc/worker/core_worker_client.h | 8 +++----- 4 files changed, 7 insertions(+), 8 deletions(-) diff --git a/src/mock/ray/rpc/worker/core_worker_client.h b/src/mock/ray/rpc/worker/core_worker_client.h index abc82eb42999..78d36c22672e 100644 --- a/src/mock/ray/rpc/worker/core_worker_client.h +++ b/src/mock/ray/rpc/worker/core_worker_client.h @@ -22,7 +22,7 @@ class MockCoreWorkerClientInterface : public ray::pubsub::MockSubscriberClientIn PushActorTask, (std::unique_ptr request, bool skip_queue, - const ClientCallback &callback), + ClientCallback &&callback), (override)); MOCK_METHOD(void, PushNormalTask, diff --git a/src/ray/core_worker/test/direct_actor_transport_test.cc b/src/ray/core_worker/test/direct_actor_transport_test.cc index 2c73107acae6..ac3bb29074e3 100644 --- a/src/ray/core_worker/test/direct_actor_transport_test.cc +++ b/src/ray/core_worker/test/direct_actor_transport_test.cc @@ -80,7 +80,7 @@ class MockWorkerClient : public rpc::CoreWorkerClientInterface { void PushActorTask(std::unique_ptr request, bool skip_queue, - const rpc::ClientCallback &callback) override { + rpc::ClientCallback &&callback) override { received_seq_nos.push_back(request->sequence_number()); callbacks.push_back(callback); } diff --git a/src/ray/core_worker/transport/actor_task_submitter.cc b/src/ray/core_worker/transport/actor_task_submitter.cc index 978fef31fc31..fbb561f3bd39 100644 --- a/src/ray/core_worker/transport/actor_task_submitter.cc +++ b/src/ray/core_worker/transport/actor_task_submitter.cc @@ -634,7 +634,8 @@ void ActorTaskSubmitter::PushActorTask(ClientQueue &queue, task_finisher_.MarkTaskWaitingForExecution(task_id, NodeID::FromBinary(addr.raylet_id()), WorkerID::FromBinary(addr.worker_id())); - queue.rpc_client->PushActorTask(std::move(request), skip_queue, wrapped_callback); + queue.rpc_client->PushActorTask( + std::move(request), skip_queue, std::move(wrapped_callback)); } void ActorTaskSubmitter::HandlePushTaskReply(const Status &status, diff --git a/src/ray/rpc/worker/core_worker_client.h b/src/ray/rpc/worker/core_worker_client.h index 12a46b3d5bdc..58d80a19dc9d 100644 --- a/src/ray/rpc/worker/core_worker_client.h +++ b/src/ray/rpc/worker/core_worker_client.h @@ -93,7 +93,7 @@ class CoreWorkerClientInterface : public pubsub::SubscriberClientInterface { /// \return if the rpc call succeeds virtual void PushActorTask(std::unique_ptr request, bool skip_queue, - const ClientCallback &callback) {} + ClientCallback &&callback) {} /// Similar to PushActorTask, but sets no 
ordering constraint. This is used to /// push non-actor tasks directly to a worker. @@ -356,7 +356,7 @@ class CoreWorkerClient : public std::enable_shared_from_this, void PushActorTask(std::unique_ptr request, bool skip_queue, - const ClientCallback &callback) override { + ClientCallback &&callback) override { if (skip_queue) { // Set this value so that the actor does not skip any tasks when // processing this request. We could also set it to max_finished_seq_no_, @@ -373,9 +373,7 @@ class CoreWorkerClient : public std::enable_shared_from_this, { absl::MutexLock lock(&mutex_); - send_queue_.emplace_back(std::move(request), - std::move(const_cast &>( - callback))); // TODO(dayshah) remove const casts + send_queue_.emplace_back(std::move(request), std::move(callback)); } SendRequests(); } From b88c105d6cabfc97119c7e986175cb03dbd1c5da Mon Sep 17 00:00:00 2001 From: dayshah Date: Thu, 24 Oct 2024 15:48:31 -0400 Subject: [PATCH 04/19] fail task with unresolved dependencies Signed-off-by: dayshah --- .../transport/normal_task_submitter.cc | 29 ++++++++++--------- 1 file changed, 16 insertions(+), 13 deletions(-) diff --git a/src/ray/core_worker/transport/normal_task_submitter.cc b/src/ray/core_worker/transport/normal_task_submitter.cc index 6e27ce979c14..6b011a1e8f00 100644 --- a/src/ray/core_worker/transport/normal_task_submitter.cc +++ b/src/ray/core_worker/transport/normal_task_submitter.cc @@ -516,14 +516,13 @@ void NormalTaskSubmitter::RequestNewWorkerIfNeeded(const SchedulingKey &scheduli auto &task_spec = tasks_to_fail.front(); if (task_spec.IsActorCreationTask() && error_type == rpc::ErrorType::TASK_PLACEMENT_GROUP_REMOVED) { - RAY_UNUSED(task_finisher_->FailPendingTask( - task_spec.TaskId(), - rpc::ErrorType::ACTOR_PLACEMENT_GROUP_REMOVED, - &error_status, - &error_info)); + task_finisher_->FailPendingTask(task_spec.TaskId(), + rpc::ErrorType::ACTOR_PLACEMENT_GROUP_REMOVED, + &error_status, + &error_info); } else { - RAY_UNUSED(task_finisher_->FailPendingTask( - task_spec.TaskId(), error_type, &error_status, &error_info)); + task_finisher_->FailPendingTask( + task_spec.TaskId(), error_type, &error_status, &error_info); } tasks_to_fail.pop_front(); } @@ -629,8 +628,7 @@ void NormalTaskSubmitter::PushNormalTask( if (reply.was_cancelled_before_running()) { RAY_LOG(DEBUG) << "Task " << task_id << " was cancelled before it started running."; - RAY_UNUSED( - task_finisher_->FailPendingTask(task_id, rpc::ErrorType::TASK_CANCELLED)); + task_finisher_->FailPendingTask(task_id, rpc::ErrorType::TASK_CANCELLED); } else if (!task_spec.GetMessage().retry_exceptions() || !reply.is_retryable_error() || !task_finisher_->RetryTaskIfPossible( @@ -727,8 +725,8 @@ Status NormalTaskSubmitter::CancelTask(TaskSpecification task_spec, if (scheduling_tasks.empty()) { CancelWorkerLeaseIfNeeded(scheduling_key); } - RAY_UNUSED(task_finisher_->FailPendingTask(task_spec.TaskId(), - rpc::ErrorType::TASK_CANCELLED)); + task_finisher_->FailPendingTask(task_spec.TaskId(), + rpc::ErrorType::TASK_CANCELLED); return Status::OK(); } } @@ -741,12 +739,17 @@ Status NormalTaskSubmitter::CancelTask(TaskSpecification task_spec, if (rpc_client == executing_tasks_.end()) { // This case is reached for tasks that have unresolved dependencies. - // No executing tasks, so cancelling is a noop. if (scheduling_key_entry.CanDelete()) { // We can safely remove the entry keyed by scheduling_key from the // scheduling_key_entries_ hashmap. 
scheduling_key_entries_.erase(scheduling_key); + CancelWorkerLeaseIfNeeded(scheduling_key); + task_finisher_->FailPendingTask(task_spec.TaskId(), + rpc::ErrorType::TASK_CANCELLED); + // do we also have to come here and call core worker rpc cancel on the tasks it + // depends on? } + // if there is a parent task, need to get it and then cancel it return Status::OK(); } // Looks for an RPC handle for the worker executing the task. @@ -754,7 +757,7 @@ Status NormalTaskSubmitter::CancelTask(TaskSpecification task_spec, } RAY_CHECK(client != nullptr); - + RAY_LOG(INFO) << "Sending rpc to cancel task to core worker: " << task_spec.TaskId(); auto request = rpc::CancelTaskRequest(); request.set_intended_task_id(task_spec.TaskId().Binary()); request.set_force_kill(force_kill); From 7ff72ebf05f2556531e7d18079d929ded12a2bc1 Mon Sep 17 00:00:00 2001 From: dayshah Date: Mon, 28 Oct 2024 18:05:31 -0400 Subject: [PATCH 05/19] general cleanup linting on some files Signed-off-by: dayshah --- doc/source/ray-contribute/debugging.rst | 4 +-- .../memory_store/memory_store.cc | 1 + src/ray/core_worker/task_manager.cc | 35 ++++++++++--------- src/ray/core_worker/task_manager.h | 7 ++-- 4 files changed, 25 insertions(+), 22 deletions(-) diff --git a/doc/source/ray-contribute/debugging.rst b/doc/source/ray-contribute/debugging.rst index 66a602e891b0..5b31261ba34f 100644 --- a/doc/source/ray-contribute/debugging.rst +++ b/doc/source/ray-contribute/debugging.rst @@ -1,7 +1,7 @@ Debugging for Ray Developers ============================ -This debugging guide is for contributors to the Ray project. +This debugging guide is for contributors to the Ray project. Starting processes in a debugger -------------------------------- @@ -63,7 +63,7 @@ If it worked, you should see as the first line in ``raylet.err``: .. literalinclude:: /../../src/ray/util/logging.h :language: C - :lines: 52,54 + :lines: 113,120 Backend event stats ------------------- diff --git a/src/ray/core_worker/store_provider/memory_store/memory_store.cc b/src/ray/core_worker/store_provider/memory_store/memory_store.cc index c36c69325fd0..0ab5c82a07bf 100644 --- a/src/ray/core_worker/store_provider/memory_store/memory_store.cc +++ b/src/ray/core_worker/store_provider/memory_store/memory_store.cc @@ -103,6 +103,7 @@ bool GetRequest::Wait(int64_t timeout_ms) { auto remaining_timeout_ms = timeout_ms; auto timeout_timestamp = current_time_ms() + timeout_ms; while (!is_ready_) { + // TODO (dayshah): see if using cv condition function instead of busy while helps. auto status = cv_.wait_for(lock, std::chrono::milliseconds(remaining_timeout_ms)); auto current_timestamp = current_time_ms(); remaining_timeout_ms = diff --git a/src/ray/core_worker/task_manager.cc b/src/ray/core_worker/task_manager.cc index 7b6d33cbc2d6..82f38a488a27 100644 --- a/src/ray/core_worker/task_manager.cc +++ b/src/ray/core_worker/task_manager.cc @@ -16,8 +16,6 @@ #include "ray/common/buffer.h" #include "ray/common/common_protocol.h" -#include "ray/common/constants.h" -#include "ray/core_worker/common.h" #include "ray/gcs/pb_util.h" #include "ray/util/exponential_backoff.h" #include "ray/util/util.h" @@ -237,7 +235,9 @@ std::vector TaskManager::AddPendingTask( // Add new owned objects for the return values of the task. 
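// Both vectors receive up to num_returns entries, so they are reserved up
// front to avoid reallocation inside the loop.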
size_t num_returns = spec.NumReturns(); std::vector returned_refs; + returned_refs.reserve(num_returns); std::vector return_ids; + return_ids.reserve(num_returns); for (size_t i = 0; i < num_returns; i++) { auto return_id = spec.ReturnId(i); if (!spec.IsActorCreationTask()) { @@ -252,7 +252,7 @@ std::vector TaskManager::AddPendingTask( // language frontend. Note that the language bindings should set // skip_adding_local_ref=True to avoid double referencing the object. reference_counter_->AddOwnedObject(return_id, - /*inner_ids=*/{}, + /*contained_ids=*/{}, caller_address, call_site, -1, @@ -479,6 +479,7 @@ bool TaskManager::HandleTaskReturn(const ObjectID &object_id, rpc::Address owner_address; if (reference_counter_->GetOwner(object_id, &owner_address) && !nested_refs.empty()) { std::vector nested_ids; + nested_ids.reserve(nested_refs.size()); for (const auto &nested_ref : nested_refs) { nested_ids.emplace_back(ObjectRefToId(nested_ref)); } @@ -705,7 +706,7 @@ bool TaskManager::HandleReportGeneratorItemReturns( HandleTaskReturn(object_id, return_object, NodeID::FromBinary(request.worker_addr().raylet_id()), - /*store_in_plasma*/ store_in_plasma_ids.count(object_id)); + /*store_in_plasma*/ store_in_plasma_ids.contains(object_id)); } // Handle backpressure if needed. @@ -792,7 +793,7 @@ void TaskManager::CompletePendingTask(const TaskID &task_id, if (!HandleTaskReturn(object_id, return_object, NodeID::FromBinary(worker_addr.raylet_id()), - store_in_plasma_ids.count(object_id))) { + store_in_plasma_ids.contains(object_id))) { if (first_execution) { dynamic_returns_in_plasma.push_back(object_id); } @@ -805,7 +806,7 @@ void TaskManager::CompletePendingTask(const TaskID &task_id, if (HandleTaskReturn(object_id, return_object, NodeID::FromBinary(worker_addr.raylet_id()), - store_in_plasma_ids.count(object_id))) { + store_in_plasma_ids.contains(object_id))) { direct_return_ids.push_back(object_id); } } @@ -931,7 +932,7 @@ void TaskManager::CompletePendingTask(const TaskID &task_id, HandleTaskReturn(generator_return_id, return_object, NodeID::FromBinary(worker_addr.raylet_id()), - store_in_plasma_ids.count(generator_return_id)); + store_in_plasma_ids.contains(generator_return_id)); } } } @@ -1045,18 +1046,18 @@ void TaskManager::FailPendingTask(const TaskID &task_id, << "Tried to fail task that was not pending " << task_id; spec = it->second.spec; - if (status && status->IsIntentionalSystemExit()) { + if ((status != nullptr) && status->IsIntentionalSystemExit()) { // We don't mark intentional system exit as failures, such as tasks that // exit by exit_actor(), exit by ray.shutdown(), etc. These tasks are expected // to exit and not be marked as failure. SetTaskStatus(it->second, rpc::TaskStatus::FINISHED); } else { - SetTaskStatus( - it->second, - rpc::TaskStatus::FAILED, - (ray_error_info == nullptr - ? gcs::GetRayErrorInfo(error_type, (status ? status->ToString() : "")) - : *ray_error_info)); + SetTaskStatus(it->second, + rpc::TaskStatus::FAILED, + (ray_error_info == nullptr + ? gcs::GetRayErrorInfo( + error_type, (status != nullptr ? 
status->ToString() : "")) + : *ray_error_info)); } submissible_tasks_.erase(it); num_pending_tasks_--; @@ -1306,7 +1307,7 @@ void TaskManager::MarkTaskReturnObjectsFailed( int64_t num_returns = spec.NumReturns(); for (int i = 0; i < num_returns; i++) { const auto object_id = ObjectID::FromIndex(task_id, /*index=*/i + 1); - if (store_in_plasma_ids.count(object_id)) { + if (store_in_plasma_ids.contains(object_id)) { put_in_local_plasma_callback_(error, object_id); } else { in_memory_store_->Put(error, object_id); @@ -1314,7 +1315,7 @@ void TaskManager::MarkTaskReturnObjectsFailed( } if (spec.ReturnsDynamic()) { for (const auto &dynamic_return_id : spec.DynamicReturnIds()) { - if (store_in_plasma_ids.count(dynamic_return_id)) { + if (store_in_plasma_ids.contains(dynamic_return_id)) { put_in_local_plasma_callback_(error, dynamic_return_id); } else { in_memory_store_->Put(error, dynamic_return_id); @@ -1339,7 +1340,7 @@ void TaskManager::MarkTaskReturnObjectsFailed( auto num_streaming_generator_returns = spec.NumStreamingGeneratorReturns(); for (size_t i = 0; i < num_streaming_generator_returns; i++) { const auto generator_return_id = spec.StreamingGeneratorReturnId(i); - if (store_in_plasma_ids.count(generator_return_id)) { + if (store_in_plasma_ids.contains(generator_return_id)) { put_in_local_plasma_callback_(error, generator_return_id); } else { in_memory_store_->Put(error, generator_return_id); diff --git a/src/ray/core_worker/task_manager.h b/src/ray/core_worker/task_manager.h index 941ea514d777..0cc2c3edf145 100644 --- a/src/ray/core_worker/task_manager.h +++ b/src/ray/core_worker/task_manager.h @@ -66,14 +66,14 @@ class TaskFinisherInterface { virtual absl::optional GetTaskSpec(const TaskID &task_id) const = 0; - virtual ~TaskFinisherInterface() {} + virtual ~TaskFinisherInterface() = default; }; class TaskResubmissionInterface { public: virtual bool ResubmitTask(const TaskID &task_id, std::vector *task_deps) = 0; - virtual ~TaskResubmissionInterface() {} + virtual ~TaskResubmissionInterface() = default; }; using TaskStatusCounter = CounterMap>; @@ -222,7 +222,7 @@ class TaskManager : public TaskFinisherInterface, public TaskResubmissionInterfa max_lineage_bytes_(max_lineage_bytes), task_event_buffer_(task_event_buffer) { task_counter_.SetOnChangeCallback( - [this](const std::tuple key) + [this](const std::tuple &key) ABSL_EXCLUSIVE_LOCKS_REQUIRED(&mu_) { ray::stats::STATS_tasks.Record( task_counter_.Get(key), @@ -643,6 +643,7 @@ class TaskManager : public TaskFinisherInterface, public TaskResubmissionInterfa num_retries_left(num_retries_left_arg), counter(counter), num_oom_retries_left(num_oom_retries_left) { + reconstructable_return_ids.reserve(num_returns); for (size_t i = 0; i < num_returns; i++) { reconstructable_return_ids.insert(spec.ReturnId(i)); } From 1293f86a331215e3f6f7d99aaffd71291d9d0f68 Mon Sep 17 00:00:00 2001 From: dayshah Date: Mon, 28 Oct 2024 19:22:38 -0400 Subject: [PATCH 06/19] adding test and moving cancelled callback Signed-off-by: dayshah --- python/ray/tests/test_cancel.py | 21 +++++++++++++++++++ src/ray/core_worker/core_worker.cc | 4 ++-- src/ray/core_worker/core_worker.h | 2 +- .../transport/normal_task_submitter.cc | 4 ---- 4 files changed, 24 insertions(+), 7 deletions(-) diff --git a/python/ray/tests/test_cancel.py b/python/ray/tests/test_cancel.py index 5cdf44218db9..541c6f1081b5 100644 --- a/python/ray/tests/test_cancel.py +++ b/python/ray/tests/test_cancel.py @@ -587,6 +587,27 @@ def verify(): wait_for_condition(verify) 
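# Regression test: cancelling a task whose dependency never resolves.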
+@pytest.mark.parametrize("use_force", [True, False]) +def test_cancel_with_dependency(shutdown_only, use_force): + ray.init(num_cpus=4) + + @ray.remote(num_cpus=1) + def inner(): + while True: + time.sleep(0.1) + return 1 + + @ray.remote(num_cpus=1) + def square(x): + return x * x + + wait_forever = inner.remote() + wait_forever_as_dep = square.remote(wait_forever) + ray.cancel(wait_forever_as_dep) + with pytest.raises(valid_exceptions(use_force)): + ray.get(wait_forever_as_dep) + + @pytest.mark.skip("Actor cancelation works now.") def test_recursive_cancel_error_messages(shutdown_only, capsys): """ diff --git a/src/ray/core_worker/core_worker.cc b/src/ray/core_worker/core_worker.cc index 61052f231eda..ca4bc812c100 100644 --- a/src/ray/core_worker/core_worker.cc +++ b/src/ray/core_worker/core_worker.cc @@ -4030,7 +4030,7 @@ void CoreWorker::HandleCancelTask(rpc::CancelTaskRequest request, RAY_LOG(INFO).WithField(task_id).WithField(current_actor_id) << "Cancel an actor task"; CancelActorTaskOnExecutor( - caller_worker_id, task_id, force_kill, recursive, on_cancel_callback); + caller_worker_id, task_id, force_kill, recursive, std::move(on_cancel_callback)); } else { RAY_CHECK(current_actor_id.IsNil()); RAY_LOG(INFO).WithField(task_id) << "Cancel a normal task"; @@ -4041,7 +4041,7 @@ void CoreWorker::HandleCancelTask(rpc::CancelTaskRequest request, void CoreWorker::CancelTaskOnExecutor(TaskID task_id, bool force_kill, bool recursive, - OnCanceledCallback on_canceled) { + const OnCanceledCallback &on_canceled) { bool requested_task_running; { absl::MutexLock lock(&mutex_); diff --git a/src/ray/core_worker/core_worker.h b/src/ray/core_worker/core_worker.h index 7eb1dca83891..6db9a41b9c7c 100644 --- a/src/ray/core_worker/core_worker.h +++ b/src/ray/core_worker/core_worker.h @@ -1733,7 +1733,7 @@ class CoreWorker : public rpc::CoreWorkerServiceHandler { void CancelTaskOnExecutor(TaskID intended_task_id, bool force_kill, bool recursive, - OnCanceledCallback on_canceled); + const OnCanceledCallback &on_canceled); /// Cancel an actor task queued or running in the current worker. /// diff --git a/src/ray/core_worker/transport/normal_task_submitter.cc b/src/ray/core_worker/transport/normal_task_submitter.cc index 6b011a1e8f00..41211fa3ab3b 100644 --- a/src/ray/core_worker/transport/normal_task_submitter.cc +++ b/src/ray/core_worker/transport/normal_task_submitter.cc @@ -746,10 +746,7 @@ Status NormalTaskSubmitter::CancelTask(TaskSpecification task_spec, CancelWorkerLeaseIfNeeded(scheduling_key); task_finisher_->FailPendingTask(task_spec.TaskId(), rpc::ErrorType::TASK_CANCELLED); - // do we also have to come here and call core worker rpc cancel on the tasks it - // depends on? } - // if there is a parent task, need to get it and then cancel it return Status::OK(); } // Looks for an RPC handle for the worker executing the task. 
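Stepping back from the diff: the branch above distinguishes tasks by how far they have progressed. A reduced sketch of that decision follows, with illustrative types only (TaskPhase is not a real Ray enum):

// Illustrative only: summarizes the CancelTask branches above.
enum class TaskPhase { kResolvingDeps, kQueued, kExecuting };

bool CanFailLocally(TaskPhase phase) {
  switch (phase) {
    case TaskPhase::kResolvingDeps:  // dependencies not yet resolved
    case TaskPhase::kQueued:         // still waiting in the scheduling-key queue
      // No worker owns the task, so it can be failed locally with
      // TASK_CANCELLED and any outstanding worker lease can be cancelled.
      return true;
    case TaskPhase::kExecuting:
      // A worker owns the task; cancellation requires a CancelTask RPC.
      return false;
  }
  return false;
}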
@@ -757,7 +754,6 @@ Status NormalTaskSubmitter::CancelTask(TaskSpecification task_spec, } RAY_CHECK(client != nullptr); - RAY_LOG(INFO) << "Sending rpc to cancel task to core worker: " << task_spec.TaskId(); auto request = rpc::CancelTaskRequest(); request.set_intended_task_id(task_spec.TaskId().Binary()); request.set_force_kill(force_kill); From 24f0635df18fa247a3fa87a87f143cad74086791 Mon Sep 17 00:00:00 2001 From: dayshah Date: Mon, 28 Oct 2024 19:47:01 -0400 Subject: [PATCH 07/19] address test comments Signed-off-by: dayshah --- python/ray/tests/test_cancel.py | 9 ++++----- 1 file changed, 4 insertions(+), 5 deletions(-) diff --git a/python/ray/tests/test_cancel.py b/python/ray/tests/test_cancel.py index 541c6f1081b5..07ac41e5c974 100644 --- a/python/ray/tests/test_cancel.py +++ b/python/ray/tests/test_cancel.py @@ -592,17 +592,16 @@ def test_cancel_with_dependency(shutdown_only, use_force): ray.init(num_cpus=4) @ray.remote(num_cpus=1) - def inner(): + def wait_forever_task(): while True: - time.sleep(0.1) - return 1 + time.sleep(1000) @ray.remote(num_cpus=1) def square(x): return x * x - wait_forever = inner.remote() - wait_forever_as_dep = square.remote(wait_forever) + wait_forever_obj = wait_forever_task.remote() + wait_forever_as_dep = square.remote(wait_forever_obj) ray.cancel(wait_forever_as_dep) with pytest.raises(valid_exceptions(use_force)): ray.get(wait_forever_as_dep) From 877242bc235fd379015ce981b2e118115be37218 Mon Sep 17 00:00:00 2001 From: dayshah Date: Mon, 28 Oct 2024 19:51:46 -0400 Subject: [PATCH 08/19] store in plasma equals Signed-off-by: dayshah --- src/ray/core_worker/task_manager.cc | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/ray/core_worker/task_manager.cc b/src/ray/core_worker/task_manager.cc index 82f38a488a27..a3a05de62cfe 100644 --- a/src/ray/core_worker/task_manager.cc +++ b/src/ray/core_worker/task_manager.cc @@ -706,7 +706,7 @@ bool TaskManager::HandleReportGeneratorItemReturns( HandleTaskReturn(object_id, return_object, NodeID::FromBinary(request.worker_addr().raylet_id()), - /*store_in_plasma*/ store_in_plasma_ids.contains(object_id)); + /*store_in_plasma=*/store_in_plasma_ids.contains(object_id)); } // Handle backpressure if needed. From 34f8199b3839e3dabd51ca258a11deadc23ebbf4 Mon Sep 17 00:00:00 2001 From: dayshah Date: Tue, 29 Oct 2024 16:27:51 -0400 Subject: [PATCH 09/19] reorder cancel worker lease for test Signed-off-by: dayshah --- src/ray/core_worker/transport/normal_task_submitter.cc | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/src/ray/core_worker/transport/normal_task_submitter.cc b/src/ray/core_worker/transport/normal_task_submitter.cc index 41211fa3ab3b..c7c427456470 100644 --- a/src/ray/core_worker/transport/normal_task_submitter.cc +++ b/src/ray/core_worker/transport/normal_task_submitter.cc @@ -740,12 +740,12 @@ Status NormalTaskSubmitter::CancelTask(TaskSpecification task_spec, if (rpc_client == executing_tasks_.end()) { // This case is reached for tasks that have unresolved dependencies. if (scheduling_key_entry.CanDelete()) { - // We can safely remove the entry keyed by scheduling_key from the - // scheduling_key_entries_ hashmap. - scheduling_key_entries_.erase(scheduling_key); CancelWorkerLeaseIfNeeded(scheduling_key); task_finisher_->FailPendingTask(task_spec.TaskId(), rpc::ErrorType::TASK_CANCELLED); + // We can safely remove the entry keyed by scheduling_key from the + // scheduling_key_entries_ hashmap. 
+ scheduling_key_entries_.erase(scheduling_key); } return Status::OK(); } From b667628926cd67c58844dd9b3ae1e8d5a95a6e91 Mon Sep 17 00:00:00 2001 From: dayshah Date: Thu, 31 Oct 2024 16:42:42 -0400 Subject: [PATCH 10/19] update resolver accordingly Signed-off-by: dayshah --- src/ray/core_worker/transport/normal_task_submitter.cc | 6 ++---- 1 file changed, 2 insertions(+), 4 deletions(-) diff --git a/src/ray/core_worker/transport/normal_task_submitter.cc b/src/ray/core_worker/transport/normal_task_submitter.cc index c7c427456470..19bb8288d7bc 100644 --- a/src/ray/core_worker/transport/normal_task_submitter.cc +++ b/src/ray/core_worker/transport/normal_task_submitter.cc @@ -82,10 +82,6 @@ Status NormalTaskSubmitter::SubmitTask(TaskSpecification task_spec) { RequestNewWorkerIfNeeded(scheduling_key); } } - if (!keep_executing) { - RAY_UNUSED(task_finisher_->FailOrRetryPendingTask( - task_spec.TaskId(), rpc::ErrorType::TASK_CANCELLED, nullptr)); - } }); return Status::OK(); } @@ -743,6 +739,8 @@ Status NormalTaskSubmitter::CancelTask(TaskSpecification task_spec, CancelWorkerLeaseIfNeeded(scheduling_key); task_finisher_->FailPendingTask(task_spec.TaskId(), rpc::ErrorType::TASK_CANCELLED); + // we want to remove from resolver's pending task as well + resolver_.CancelDependencyResolution(task_spec.TaskId()); // We can safely remove the entry keyed by scheduling_key from the // scheduling_key_entries_ hashmap. scheduling_key_entries_.erase(scheduling_key); From d2c325905223c49827b991720b4ea62abcf81901 Mon Sep 17 00:00:00 2001 From: dayshah Date: Fri, 1 Nov 2024 15:00:04 -0400 Subject: [PATCH 11/19] fix cpp test Signed-off-by: dayshah --- .../test/normal_task_submitter_test.cc | 3 +- .../transport/normal_task_submitter.cc | 74 +++++++++---------- 2 files changed, 36 insertions(+), 41 deletions(-) diff --git a/src/ray/core_worker/test/normal_task_submitter_test.cc b/src/ray/core_worker/test/normal_task_submitter_test.cc index 4b3ccda7f4a9..bf43037b8e2b 100644 --- a/src/ray/core_worker/test/normal_task_submitter_test.cc +++ b/src/ray/core_worker/test/normal_task_submitter_test.cc @@ -138,6 +138,7 @@ class MockTaskFinisher : public TaskFinisherInterface { const Status *status, const rpc::RayErrorInfo *ray_error_info = nullptr) override { num_fail_pending_task_calls++; + num_tasks_failed++; } bool FailOrRetryPendingTask(const TaskID &task_id, @@ -2044,7 +2045,7 @@ TEST(NormalTaskSubmitterTest, TestKillPendingTask) { ASSERT_EQ(raylet_client->num_workers_returned, 0); ASSERT_EQ(raylet_client->num_workers_disconnected, 0); ASSERT_EQ(task_finisher->num_tasks_complete, 0); - ASSERT_EQ(task_finisher->num_tasks_failed, 0); + ASSERT_EQ(task_finisher->num_tasks_failed, 1); ASSERT_EQ(task_finisher->num_fail_pending_task_calls, 1); ASSERT_EQ(raylet_client->num_leases_canceled, 1); ASSERT_TRUE(raylet_client->ReplyCancelWorkerLease()); diff --git a/src/ray/core_worker/transport/normal_task_submitter.cc b/src/ray/core_worker/transport/normal_task_submitter.cc index 19bb8288d7bc..e24fa2cc5f9b 100644 --- a/src/ray/core_worker/transport/normal_task_submitter.cc +++ b/src/ray/core_worker/transport/normal_task_submitter.cc @@ -38,49 +38,43 @@ Status NormalTaskSubmitter::SubmitTask(TaskSpecification task_spec) { } RAY_LOG(DEBUG) << "Task dependencies resolved " << task_spec.TaskId(); - bool keep_executing = true; - { - absl::MutexLock lock(&mu_); - if (cancelled_tasks_.find(task_spec.TaskId()) != cancelled_tasks_.end()) { - cancelled_tasks_.erase(task_spec.TaskId()); - keep_executing = false; - } - if (keep_executing) { - 
task_spec.GetMutableMessage().set_dependency_resolution_timestamp_ms( - current_sys_time_ms()); - // Note that the dependencies in the task spec are mutated to only contain - // plasma dependencies after ResolveDependencies finishes. - const SchedulingKey scheduling_key(task_spec.GetSchedulingClass(), - task_spec.GetDependencyIds(), - task_spec.IsActorCreationTask() - ? task_spec.ActorCreationId() - : ActorID::Nil(), - task_spec.GetRuntimeEnvHash()); - auto &scheduling_key_entry = scheduling_key_entries_[scheduling_key]; - scheduling_key_entry.task_queue.push_back(task_spec); - scheduling_key_entry.resource_spec = task_spec; - - if (!scheduling_key_entry.AllWorkersBusy()) { - // There are idle workers, so we don't need more - // workers. - - for (const auto &active_worker_addr : scheduling_key_entry.active_workers) { - RAY_CHECK(worker_to_lease_entry_.find(active_worker_addr) != - worker_to_lease_entry_.end()); - auto &lease_entry = worker_to_lease_entry_[active_worker_addr]; - if (!lease_entry.is_busy) { - OnWorkerIdle(active_worker_addr, - scheduling_key, - /*was_error*/ false, - /*error_detail*/ "", - /*worker_exiting*/ false, - lease_entry.assigned_resources); - break; - } + absl::MutexLock lock(&mu_); + if (cancelled_tasks_.find(task_spec.TaskId()) != cancelled_tasks_.end()) { + cancelled_tasks_.erase(task_spec.TaskId()); + } else { + task_spec.GetMutableMessage().set_dependency_resolution_timestamp_ms( + current_sys_time_ms()); + // Note that the dependencies in the task spec are mutated to only contain + // plasma dependencies after ResolveDependencies finishes. + const SchedulingKey scheduling_key( + task_spec.GetSchedulingClass(), + task_spec.GetDependencyIds(), + task_spec.IsActorCreationTask() ? task_spec.ActorCreationId() : ActorID::Nil(), + task_spec.GetRuntimeEnvHash()); + auto &scheduling_key_entry = scheduling_key_entries_[scheduling_key]; + scheduling_key_entry.task_queue.push_back(task_spec); + scheduling_key_entry.resource_spec = task_spec; + + if (!scheduling_key_entry.AllWorkersBusy()) { + // There are idle workers, so we don't need more + // workers. + + for (const auto &active_worker_addr : scheduling_key_entry.active_workers) { + RAY_CHECK(worker_to_lease_entry_.find(active_worker_addr) != + worker_to_lease_entry_.end()); + auto &lease_entry = worker_to_lease_entry_[active_worker_addr]; + if (!lease_entry.is_busy) { + OnWorkerIdle(active_worker_addr, + scheduling_key, + /*was_error*/ false, + /*error_detail*/ "", + /*worker_exiting*/ false, + lease_entry.assigned_resources); + break; } } - RequestNewWorkerIfNeeded(scheduling_key); } + RequestNewWorkerIfNeeded(scheduling_key); } }); return Status::OK(); From 869327ec1598ec4e191fcd40eb6e34f89f839cc3 Mon Sep 17 00:00:00 2001 From: dayshah Date: Thu, 7 Nov 2024 11:17:10 -0500 Subject: [PATCH 12/19] allow failpendingtask call on finished task Signed-off-by: dayshah --- src/ray/core_worker/task_manager.cc | 13 +++++++++---- src/ray/core_worker/task_manager.h | 7 +++---- .../core_worker/transport/normal_task_submitter.cc | 14 +++++--------- 3 files changed, 17 insertions(+), 17 deletions(-) diff --git a/src/ray/core_worker/task_manager.cc b/src/ray/core_worker/task_manager.cc index b0f5c6902118..dd329b60f8ac 100644 --- a/src/ray/core_worker/task_manager.cc +++ b/src/ray/core_worker/task_manager.cc @@ -24,10 +24,10 @@ namespace ray { namespace core { // Start throttling task failure logs once we hit this threshold. 
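// Combined effect with the interval below: after kTaskFailureThrottlingThreshold
// failures, at most one failure is logged per kTaskFailureLoggingFrequencyMillis.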
-const int64_t kTaskFailureThrottlingThreshold = 50; +constexpr int64_t kTaskFailureThrottlingThreshold = 50; // Throttle task failure logs to once this interval. -const int64_t kTaskFailureLoggingFrequencyMillis = 5000; +constexpr int64_t kTaskFailureLoggingFrequencyMillis = 5000; absl::flat_hash_set ObjectRefStream::GetItemsUnconsumed() const { absl::flat_hash_set result; @@ -1043,8 +1043,13 @@ void TaskManager::FailPendingTask(const TaskID &task_id, auto it = submissible_tasks_.find(task_id); RAY_CHECK(it != submissible_tasks_.end()) << "Tried to fail task that was not pending " << task_id; - RAY_CHECK(it->second.IsPending()) - << "Tried to fail task that was not pending " << task_id; + // task was finished by the time it got to cancelling here + if (it->second.GetStatus() == rpc::TaskStatus::FINISHED) { + submissible_tasks_.erase(it); + return; + } + RAY_CHECK(it->second.GetStatus() != rpc::TaskStatus::FAILED) + << "Tried to fail task that was already failed " << task_id; spec = it->second.spec; if ((status != nullptr) && status->IsIntentionalSystemExit()) { diff --git a/src/ray/core_worker/task_manager.h b/src/ray/core_worker/task_manager.h index e8167c9a1546..df0b26f3d0d9 100644 --- a/src/ray/core_worker/task_manager.h +++ b/src/ray/core_worker/task_manager.h @@ -625,10 +625,9 @@ class TaskManager : public TaskFinisherInterface, public TaskResubmissionInterfa for (size_t i = 0; i < num_returns; i++) { reconstructable_return_ids.insert(spec.ReturnId(i)); } - auto new_status = + status = std::make_tuple(spec.GetName(), rpc::TaskStatus::PENDING_ARGS_AVAIL, false); - counter.Increment(new_status); - status = new_status; + counter.Increment(status); } void SetStatus(rpc::TaskStatus new_status) { @@ -641,7 +640,7 @@ class TaskManager : public TaskFinisherInterface, public TaskResubmissionInterfa // for FINISHED and FAILED tasks. counter.Increment(new_tuple); } - status = new_tuple; + status = std::move(new_tuple); } void MarkRetry() { is_retry_ = true; } diff --git a/src/ray/core_worker/transport/normal_task_submitter.cc b/src/ray/core_worker/transport/normal_task_submitter.cc index e24fa2cc5f9b..e548fcd3e5a1 100644 --- a/src/ray/core_worker/transport/normal_task_submitter.cc +++ b/src/ray/core_worker/transport/normal_task_submitter.cc @@ -711,10 +711,7 @@ Status NormalTaskSubmitter::CancelTask(TaskSpecification task_spec, for (auto spec = scheduling_tasks.begin(); spec != scheduling_tasks.end(); spec++) { if (spec->TaskId() == task_spec.TaskId()) { scheduling_tasks.erase(spec); - - if (scheduling_tasks.empty()) { - CancelWorkerLeaseIfNeeded(scheduling_key); - } + CancelWorkerLeaseIfNeeded(scheduling_key); task_finisher_->FailPendingTask(task_spec.TaskId(), rpc::ErrorType::TASK_CANCELLED); return Status::OK(); @@ -729,12 +726,11 @@ Status NormalTaskSubmitter::CancelTask(TaskSpecification task_spec, if (rpc_client == executing_tasks_.end()) { // This case is reached for tasks that have unresolved dependencies. 
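// No worker owns the task yet, so there is no executor to send a CancelTask
// RPC to; the task is failed locally instead.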
+ CancelWorkerLeaseIfNeeded(scheduling_key); + task_finisher_->FailPendingTask(task_spec.TaskId(), rpc::ErrorType::TASK_CANCELLED); + // we want to remove from resolver's pending task as well + resolver_.CancelDependencyResolution(task_spec.TaskId()); if (scheduling_key_entry.CanDelete()) { - CancelWorkerLeaseIfNeeded(scheduling_key); - task_finisher_->FailPendingTask(task_spec.TaskId(), - rpc::ErrorType::TASK_CANCELLED); - // we want to remove from resolver's pending task as well - resolver_.CancelDependencyResolution(task_spec.TaskId()); // We can safely remove the entry keyed by scheduling_key from the // scheduling_key_entries_ hashmap. scheduling_key_entries_.erase(scheduling_key); From 28a8f25ce3a9f0d446bb33b48f0cd8b9504fe60c Mon Sep 17 00:00:00 2001 From: dayshah Date: Thu, 7 Nov 2024 15:08:55 -0500 Subject: [PATCH 13/19] dont move on cancel callback Signed-off-by: dayshah --- src/ray/core_worker/core_worker.cc | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/ray/core_worker/core_worker.cc b/src/ray/core_worker/core_worker.cc index 8355f1303ee1..dc751da39517 100644 --- a/src/ray/core_worker/core_worker.cc +++ b/src/ray/core_worker/core_worker.cc @@ -4036,7 +4036,7 @@ void CoreWorker::HandleCancelTask(rpc::CancelTaskRequest request, RAY_LOG(INFO).WithField(task_id).WithField(current_actor_id) << "Cancel an actor task"; CancelActorTaskOnExecutor( - caller_worker_id, task_id, force_kill, recursive, std::move(on_cancel_callback)); + caller_worker_id, task_id, force_kill, recursive, on_cancel_callback); } else { RAY_CHECK(current_actor_id.IsNil()); RAY_LOG(INFO).WithField(task_id) << "Cancel a normal task"; From 5e089b9e06b1b730438e71ed630aaea91e296448 Mon Sep 17 00:00:00 2001 From: dayshah Date: Thu, 7 Nov 2024 15:11:40 -0500 Subject: [PATCH 14/19] use force in test Signed-off-by: dayshah --- python/ray/tests/test_cancel.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/python/ray/tests/test_cancel.py b/python/ray/tests/test_cancel.py index 07ac41e5c974..658d77379345 100644 --- a/python/ray/tests/test_cancel.py +++ b/python/ray/tests/test_cancel.py @@ -602,7 +602,7 @@ def square(x): wait_forever_obj = wait_forever_task.remote() wait_forever_as_dep = square.remote(wait_forever_obj) - ray.cancel(wait_forever_as_dep) + ray.cancel(wait_forever_as_dep, force=use_force) with pytest.raises(valid_exceptions(use_force)): ray.get(wait_forever_as_dep) From 9f8655ad5d06e333dfe9ade9ba475073ca6b8afc Mon Sep 17 00:00:00 2001 From: dayshah Date: Thu, 7 Nov 2024 15:42:06 -0500 Subject: [PATCH 15/19] move on cancel callback Signed-off-by: dayshah --- src/ray/core_worker/core_worker.cc | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/ray/core_worker/core_worker.cc b/src/ray/core_worker/core_worker.cc index dc751da39517..8355f1303ee1 100644 --- a/src/ray/core_worker/core_worker.cc +++ b/src/ray/core_worker/core_worker.cc @@ -4036,7 +4036,7 @@ void CoreWorker::HandleCancelTask(rpc::CancelTaskRequest request, RAY_LOG(INFO).WithField(task_id).WithField(current_actor_id) << "Cancel an actor task"; CancelActorTaskOnExecutor( - caller_worker_id, task_id, force_kill, recursive, on_cancel_callback); + caller_worker_id, task_id, force_kill, recursive, std::move(on_cancel_callback)); } else { RAY_CHECK(current_actor_id.IsNil()); RAY_LOG(INFO).WithField(task_id) << "Cancel a normal task"; From 0bcb639e7a10c21ead99e51550883bff75957b96 Mon Sep 17 00:00:00 2001 From: dayshah Date: Thu, 7 Nov 2024 18:01:37 -0500 Subject: [PATCH 16/19] remove cancelled tasks in 
task submitter Signed-off-by: dayshah --- .../transport/dependency_resolver.cc | 8 ++- .../transport/normal_task_submitter.cc | 69 +++++++++---------- .../transport/normal_task_submitter.h | 3 - 3 files changed, 36 insertions(+), 44 deletions(-) diff --git a/src/ray/core_worker/transport/dependency_resolver.cc b/src/ray/core_worker/transport/dependency_resolver.cc index 997f770ca1de..d4543e4b9ce7 100644 --- a/src/ray/core_worker/transport/dependency_resolver.cc +++ b/src/ray/core_worker/transport/dependency_resolver.cc @@ -91,8 +91,10 @@ void LocalDependencyResolver::ResolveDependencies( // This is deleted when the last dependency fetch callback finishes. auto inserted = pending_tasks_.emplace( task_id, - std::make_unique( - task, local_dependency_ids, actor_dependency_ids, on_dependencies_resolved)); + std::make_unique(task, + local_dependency_ids, + actor_dependency_ids, + std::move(on_dependencies_resolved))); RAY_CHECK(inserted.second); } @@ -137,7 +139,7 @@ void LocalDependencyResolver::ResolveDependencies( for (const auto &actor_id : actor_dependency_ids) { actor_creator_.AsyncWaitForActorRegisterFinish( - actor_id, [this, task_id, on_dependencies_resolved](const Status &status) { + actor_id, [this, task_id](const Status &status) { std::unique_ptr resolved_task_state = nullptr; { diff --git a/src/ray/core_worker/transport/normal_task_submitter.cc b/src/ray/core_worker/transport/normal_task_submitter.cc index e548fcd3e5a1..3cb2a124365f 100644 --- a/src/ray/core_worker/transport/normal_task_submitter.cc +++ b/src/ray/core_worker/transport/normal_task_submitter.cc @@ -39,43 +39,39 @@ Status NormalTaskSubmitter::SubmitTask(TaskSpecification task_spec) { RAY_LOG(DEBUG) << "Task dependencies resolved " << task_spec.TaskId(); absl::MutexLock lock(&mu_); - if (cancelled_tasks_.find(task_spec.TaskId()) != cancelled_tasks_.end()) { - cancelled_tasks_.erase(task_spec.TaskId()); - } else { - task_spec.GetMutableMessage().set_dependency_resolution_timestamp_ms( - current_sys_time_ms()); - // Note that the dependencies in the task spec are mutated to only contain - // plasma dependencies after ResolveDependencies finishes. - const SchedulingKey scheduling_key( - task_spec.GetSchedulingClass(), - task_spec.GetDependencyIds(), - task_spec.IsActorCreationTask() ? task_spec.ActorCreationId() : ActorID::Nil(), - task_spec.GetRuntimeEnvHash()); - auto &scheduling_key_entry = scheduling_key_entries_[scheduling_key]; - scheduling_key_entry.task_queue.push_back(task_spec); - scheduling_key_entry.resource_spec = task_spec; - - if (!scheduling_key_entry.AllWorkersBusy()) { - // There are idle workers, so we don't need more - // workers. - - for (const auto &active_worker_addr : scheduling_key_entry.active_workers) { - RAY_CHECK(worker_to_lease_entry_.find(active_worker_addr) != - worker_to_lease_entry_.end()); - auto &lease_entry = worker_to_lease_entry_[active_worker_addr]; - if (!lease_entry.is_busy) { - OnWorkerIdle(active_worker_addr, - scheduling_key, - /*was_error*/ false, - /*error_detail*/ "", - /*worker_exiting*/ false, - lease_entry.assigned_resources); - break; - } + task_spec.GetMutableMessage().set_dependency_resolution_timestamp_ms( + current_sys_time_ms()); + // Note that the dependencies in the task spec are mutated to only contain + // plasma dependencies after ResolveDependencies finishes. + const SchedulingKey scheduling_key( + task_spec.GetSchedulingClass(), + task_spec.GetDependencyIds(), + task_spec.IsActorCreationTask() ? 
task_spec.ActorCreationId() : ActorID::Nil(), + task_spec.GetRuntimeEnvHash()); + auto &scheduling_key_entry = scheduling_key_entries_[scheduling_key]; + scheduling_key_entry.task_queue.push_back(task_spec); + scheduling_key_entry.resource_spec = task_spec; + + if (!scheduling_key_entry.AllWorkersBusy()) { + // There are idle workers, so we don't need more + // workers. + + for (const auto &active_worker_addr : scheduling_key_entry.active_workers) { + RAY_CHECK(worker_to_lease_entry_.find(active_worker_addr) != + worker_to_lease_entry_.end()); + auto &lease_entry = worker_to_lease_entry_[active_worker_addr]; + if (!lease_entry.is_busy) { + OnWorkerIdle(active_worker_addr, + scheduling_key, + /*was_error*/ false, + /*error_detail*/ "", + /*worker_exiting*/ false, + lease_entry.assigned_resources); + break; } } - RequestNewWorkerIfNeeded(scheduling_key); } + RequestNewWorkerIfNeeded(scheduling_key); }); return Status::OK(); } @@ -698,8 +694,7 @@ Status NormalTaskSubmitter::CancelTask(TaskSpecification task_spec, std::shared_ptr client = nullptr; { absl::MutexLock lock(&mu_); - if (cancelled_tasks_.find(task_spec.TaskId()) != cancelled_tasks_.end() || - !task_finisher_->MarkTaskCanceled(task_spec.TaskId())) { + if (!task_finisher_->MarkTaskCanceled(task_spec.TaskId())) { return Status::OK(); } @@ -721,7 +716,6 @@ Status NormalTaskSubmitter::CancelTask(TaskSpecification task_spec, // This will get removed either when the RPC call to cancel is returned // or when all dependencies are resolved. - RAY_CHECK(cancelled_tasks_.emplace(task_spec.TaskId()).second); auto rpc_client = executing_tasks_.find(task_spec.TaskId()); if (rpc_client == executing_tasks_.end()) { @@ -757,7 +751,6 @@ Status NormalTaskSubmitter::CancelTask(TaskSpecification task_spec, absl::MutexLock lock(&mu_); RAY_LOG(DEBUG) << "CancelTask RPC response received for " << task_spec.TaskId() << " with status " << status.ToString(); - cancelled_tasks_.erase(task_spec.TaskId()); // Retry is not attempted if !status.ok() because force-kill may kill the worker // before the reply is sent. diff --git a/src/ray/core_worker/transport/normal_task_submitter.h b/src/ray/core_worker/transport/normal_task_submitter.h index 532e3cb85600..157e776731e9 100644 --- a/src/ray/core_worker/transport/normal_task_submitter.h +++ b/src/ray/core_worker/transport/normal_task_submitter.h @@ -360,9 +360,6 @@ class NormalTaskSubmitter { absl::flat_hash_map scheduling_key_entries_ ABSL_GUARDED_BY(mu_); - // Tasks that were cancelled while being resolved. - absl::flat_hash_set cancelled_tasks_ ABSL_GUARDED_BY(mu_); - // Keeps track of where currently executing tasks are being run. absl::flat_hash_map executing_tasks_ ABSL_GUARDED_BY(mu_); From f459cf6d36c32a4977a70944186e056f63e83919 Mon Sep 17 00:00:00 2001 From: dayshah Date: Fri, 8 Nov 2024 00:15:12 -0500 Subject: [PATCH 17/19] reorder and don't cancel worker lease Signed-off-by: dayshah --- src/ray/core_worker/transport/normal_task_submitter.cc | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/src/ray/core_worker/transport/normal_task_submitter.cc b/src/ray/core_worker/transport/normal_task_submitter.cc index 3cb2a124365f..0c4ef504536d 100644 --- a/src/ray/core_worker/transport/normal_task_submitter.cc +++ b/src/ray/core_worker/transport/normal_task_submitter.cc @@ -720,10 +720,10 @@ Status NormalTaskSubmitter::CancelTask(TaskSpecification task_spec, if (rpc_client == executing_tasks_.end()) { // This case is reached for tasks that have unresolved dependencies. 
- CancelWorkerLeaseIfNeeded(scheduling_key); - task_finisher_->FailPendingTask(task_spec.TaskId(), rpc::ErrorType::TASK_CANCELLED); - // we want to remove from resolver's pending task as well + // we want to remove from resolver's pending task resolver_.CancelDependencyResolution(task_spec.TaskId()); + task_finisher_->FailPendingTask( + task_spec.TaskId(), rpc::ErrorType::TASK_CANCELLED, nullptr); if (scheduling_key_entry.CanDelete()) { // We can safely remove the entry keyed by scheduling_key from the // scheduling_key_entries_ hashmap. From 085621c3673f2b4c0cc0225b1cf1fa6237682c9c Mon Sep 17 00:00:00 2001 From: dayshah Date: Fri, 8 Nov 2024 00:23:37 -0500 Subject: [PATCH 18/19] inline constexpr Signed-off-by: dayshah --- src/ray/rpc/worker/core_worker_client.h | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/ray/rpc/worker/core_worker_client.h b/src/ray/rpc/worker/core_worker_client.h index 58d80a19dc9d..2e9cdb952891 100644 --- a/src/ray/rpc/worker/core_worker_client.h +++ b/src/ray/rpc/worker/core_worker_client.h @@ -53,10 +53,10 @@ namespace ray { namespace rpc { /// The maximum number of requests in flight per client. -constexpr int64_t kMaxBytesInFlight = 16L * 1024 * 1024; +inline constexpr int64_t kMaxBytesInFlight = 16L * 1024 * 1024; /// The base size in bytes per request. -constexpr int64_t kBaseRequestSize = 1024; +inline constexpr int64_t kBaseRequestSize = 1024; /// Get the estimated size in bytes of the given task. const static int64_t RequestSizeInBytes(const PushTaskRequest &request) { From aae4aa92cb21441c1b6cc2c270b27a16525622a7 Mon Sep 17 00:00:00 2001 From: dayshah Date: Fri, 8 Nov 2024 16:10:25 -0500 Subject: [PATCH 19/19] factor out actual functional changes for issue Signed-off-by: dayshah --- python/ray/tests/test_cancel.py | 20 ----- src/ray/core_worker/task_manager.cc | 11 +-- .../transport/normal_task_submitter.cc | 86 +++++++++++-------- .../transport/normal_task_submitter.h | 3 + 4 files changed, 56 insertions(+), 64 deletions(-) diff --git a/python/ray/tests/test_cancel.py b/python/ray/tests/test_cancel.py index e9cf6cfdb012..c46175670e60 100644 --- a/python/ray/tests/test_cancel.py +++ b/python/ray/tests/test_cancel.py @@ -636,26 +636,6 @@ def verify(): wait_for_condition(verify) -@pytest.mark.parametrize("use_force", [True, False]) -def test_cancel_with_dependency(shutdown_only, use_force): - ray.init(num_cpus=4) - - @ray.remote(num_cpus=1) - def wait_forever_task(): - while True: - time.sleep(1000) - - @ray.remote(num_cpus=1) - def square(x): - return x * x - - wait_forever_obj = wait_forever_task.remote() - wait_forever_as_dep = square.remote(wait_forever_obj) - ray.cancel(wait_forever_as_dep, force=use_force) - with pytest.raises(valid_exceptions(use_force)): - ray.get(wait_forever_as_dep) - - @pytest.mark.skip("Actor cancelation works now.") def test_recursive_cancel_error_messages(shutdown_only, capsys): """ diff --git a/src/ray/core_worker/task_manager.cc b/src/ray/core_worker/task_manager.cc index dd329b60f8ac..bc5a78c7862e 100644 --- a/src/ray/core_worker/task_manager.cc +++ b/src/ray/core_worker/task_manager.cc @@ -398,7 +398,7 @@ void TaskManager::DrainAndShutdown(std::function shutdown) { bool TaskManager::IsTaskSubmissible(const TaskID &task_id) const { absl::MutexLock lock(&mu_); - return submissible_tasks_.count(task_id) > 0; + return submissible_tasks_.contains(task_id); } bool TaskManager::IsTaskPending(const TaskID &task_id) const { @@ -1043,13 +1043,8 @@ void TaskManager::FailPendingTask(const TaskID &task_id, auto 
it = submissible_tasks_.find(task_id); RAY_CHECK(it != submissible_tasks_.end()) << "Tried to fail task that was not pending " << task_id; - // task was finished by the time it got to cancelling here - if (it->second.GetStatus() == rpc::TaskStatus::FINISHED) { - submissible_tasks_.erase(it); - return; - } - RAY_CHECK(it->second.GetStatus() != rpc::TaskStatus::FAILED) - << "Tried to fail task that was already failed " << task_id; + RAY_CHECK(it->second.IsPending()) + << "Tried to fail task that was not pending " << task_id; spec = it->second.spec; if ((status != nullptr) && status->IsIntentionalSystemExit()) { diff --git a/src/ray/core_worker/transport/normal_task_submitter.cc b/src/ray/core_worker/transport/normal_task_submitter.cc index 25f868d8206c..9e8ac970db1b 100644 --- a/src/ray/core_worker/transport/normal_task_submitter.cc +++ b/src/ray/core_worker/transport/normal_task_submitter.cc @@ -38,40 +38,54 @@ Status NormalTaskSubmitter::SubmitTask(TaskSpecification task_spec) { } RAY_LOG(DEBUG) << "Task dependencies resolved " << task_spec.TaskId(); - absl::MutexLock lock(&mu_); - task_spec.GetMutableMessage().set_dependency_resolution_timestamp_ms( - current_sys_time_ms()); - // Note that the dependencies in the task spec are mutated to only contain - // plasma dependencies after ResolveDependencies finishes. - const SchedulingKey scheduling_key( - task_spec.GetSchedulingClass(), - task_spec.GetDependencyIds(), - task_spec.IsActorCreationTask() ? task_spec.ActorCreationId() : ActorID::Nil(), - task_spec.GetRuntimeEnvHash()); - auto &scheduling_key_entry = scheduling_key_entries_[scheduling_key]; - scheduling_key_entry.task_queue.push_back(task_spec); - scheduling_key_entry.resource_spec = task_spec; - - if (!scheduling_key_entry.AllWorkersBusy()) { - // There are idle workers, so we don't need more - // workers. - - for (const auto &active_worker_addr : scheduling_key_entry.active_workers) { - RAY_CHECK(worker_to_lease_entry_.find(active_worker_addr) != - worker_to_lease_entry_.end()); - auto &lease_entry = worker_to_lease_entry_[active_worker_addr]; - if (!lease_entry.is_busy) { - OnWorkerIdle(active_worker_addr, - scheduling_key, - /*was_error*/ false, - /*error_detail*/ "", - /*worker_exiting*/ false, - lease_entry.assigned_resources); - break; + bool keep_executing = true; + { + absl::MutexLock lock(&mu_); + if (cancelled_tasks_.find(task_spec.TaskId()) != cancelled_tasks_.end()) { + cancelled_tasks_.erase(task_spec.TaskId()); + keep_executing = false; + } + if (keep_executing) { + task_spec.GetMutableMessage().set_dependency_resolution_timestamp_ms( + current_sys_time_ms()); + // Note that the dependencies in the task spec are mutated to only contain + // plasma dependencies after ResolveDependencies finishes. + const SchedulingKey scheduling_key(task_spec.GetSchedulingClass(), + task_spec.GetDependencyIds(), + task_spec.IsActorCreationTask() + ? task_spec.ActorCreationId() + : ActorID::Nil(), + task_spec.GetRuntimeEnvHash()); + auto &scheduling_key_entry = scheduling_key_entries_[scheduling_key]; + scheduling_key_entry.task_queue.push_back(task_spec); + scheduling_key_entry.resource_spec = task_spec; + + if (!scheduling_key_entry.AllWorkersBusy()) { + // There are idle workers, so we don't need more + // workers. 
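// Hand the queued task to the first idle worker instead of leasing another.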
+ + for (const auto &active_worker_addr : scheduling_key_entry.active_workers) { + RAY_CHECK(worker_to_lease_entry_.find(active_worker_addr) != + worker_to_lease_entry_.end()); + auto &lease_entry = worker_to_lease_entry_[active_worker_addr]; + if (!lease_entry.is_busy) { + OnWorkerIdle(active_worker_addr, + scheduling_key, + /*was_error*/ false, + /*error_detail*/ "", + /*worker_exiting*/ false, + lease_entry.assigned_resources); + break; + } + } } + RequestNewWorkerIfNeeded(scheduling_key); } } - RequestNewWorkerIfNeeded(scheduling_key); + if (!keep_executing) { + RAY_UNUSED(task_finisher_->FailOrRetryPendingTask( + task_spec.TaskId(), rpc::ErrorType::TASK_CANCELLED, nullptr)); + } }); return Status::OK(); } @@ -694,7 +708,8 @@ Status NormalTaskSubmitter::CancelTask(TaskSpecification task_spec, std::shared_ptr client = nullptr; { absl::MutexLock lock(&mu_); - if (!task_finisher_->MarkTaskCanceled(task_spec.TaskId())) { + if (cancelled_tasks_.find(task_spec.TaskId()) != cancelled_tasks_.end() || + !task_finisher_->MarkTaskCanceled(task_spec.TaskId())) { return Status::OK(); } @@ -716,14 +731,12 @@ Status NormalTaskSubmitter::CancelTask(TaskSpecification task_spec, // This will get removed either when the RPC call to cancel is returned // or when all dependencies are resolved. + RAY_CHECK(cancelled_tasks_.emplace(task_spec.TaskId()).second); auto rpc_client = executing_tasks_.find(task_spec.TaskId()); if (rpc_client == executing_tasks_.end()) { // This case is reached for tasks that have unresolved dependencies. - // we want to remove from resolver's pending task - resolver_.CancelDependencyResolution(task_spec.TaskId()); - task_finisher_->FailPendingTask( - task_spec.TaskId(), rpc::ErrorType::TASK_CANCELLED, nullptr); + // No executing tasks, so cancelling is a noop. if (scheduling_key_entry.CanDelete()) { // We can safely remove the entry keyed by scheduling_key from the // scheduling_key_entries_ hashmap. @@ -751,6 +764,7 @@ Status NormalTaskSubmitter::CancelTask(TaskSpecification task_spec, absl::MutexLock lock(&mu_); RAY_LOG(DEBUG) << "CancelTask RPC response received for " << task_spec.TaskId() << " with status " << status.ToString(); + cancelled_tasks_.erase(task_spec.TaskId()); // Retry is not attempted if !status.ok() because force-kill may kill the worker // before the reply is sent. diff --git a/src/ray/core_worker/transport/normal_task_submitter.h b/src/ray/core_worker/transport/normal_task_submitter.h index 157e776731e9..532e3cb85600 100644 --- a/src/ray/core_worker/transport/normal_task_submitter.h +++ b/src/ray/core_worker/transport/normal_task_submitter.h @@ -360,6 +360,9 @@ class NormalTaskSubmitter { absl::flat_hash_map scheduling_key_entries_ ABSL_GUARDED_BY(mu_); + // Tasks that were cancelled while being resolved. + absl::flat_hash_set cancelled_tasks_ ABSL_GUARDED_BY(mu_); + // Keeps track of where currently executing tasks are being run. absl::flat_hash_map executing_tasks_ ABSL_GUARDED_BY(mu_);
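The net effect of the series, reduced to a minimal sketch of the cancel-during-resolution handshake. The types here are illustrative; the real code guards more state with mu_ and routes the failure through the TaskFinisher:

#include <mutex>
#include <string>
#include <unordered_set>

class CancelDuringResolution {
 public:
  // CancelTask records a task that has no executing worker yet.
  void MarkCancelled(const std::string &task_id) {
    std::lock_guard<std::mutex> lock(mu_);
    cancelled_.insert(task_id);
  }

  // The dependency-resolution callback checks (and clears) the mark; a task
  // cancelled while resolving is failed with TASK_CANCELLED instead of queued.
  bool ShouldKeepExecuting(const std::string &task_id) {
    std::lock_guard<std::mutex> lock(mu_);
    return cancelled_.erase(task_id) == 0;
  }

 private:
  std::mutex mu_;
  std::unordered_set<std::string> cancelled_;
};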