Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
23 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions doc/source/ray-contribute/debugging.rst
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
Debugging for Ray Developers
============================

This debugging guide is for contributors to the Ray project.
This debugging guide is for contributors to the Ray project.

Starting processes in a debugger
--------------------------------
Expand Down Expand Up @@ -63,7 +63,7 @@ If it worked, you should see as the first line in ``raylet.err``:

.. literalinclude:: /../../src/ray/util/logging.h
:language: C
:lines: 52,54
:lines: 113,120

Backend event stats
-------------------
Expand Down
2 changes: 1 addition & 1 deletion src/mock/ray/rpc/worker/core_worker_client.h
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ class MockCoreWorkerClientInterface : public ray::pubsub::MockSubscriberClientIn
PushActorTask,
(std::unique_ptr<PushTaskRequest> request,
bool skip_queue,
const ClientCallback<PushTaskReply> &callback),
ClientCallback<PushTaskReply> &&callback),
(override));
MOCK_METHOD(void,
PushNormalTask,
Expand Down
4 changes: 2 additions & 2 deletions src/ray/core_worker/core_worker.cc
Original file line number Diff line number Diff line change
Expand Up @@ -4037,7 +4037,7 @@ void CoreWorker::HandleCancelTask(rpc::CancelTaskRequest request,
RAY_LOG(INFO).WithField(task_id).WithField(current_actor_id)
<< "Cancel an actor task";
CancelActorTaskOnExecutor(
caller_worker_id, task_id, force_kill, recursive, on_cancel_callback);
caller_worker_id, task_id, force_kill, recursive, std::move(on_cancel_callback));
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we need to move since the parameter is const &?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, my bad — I left it in even after changing the parameter.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Wait, actually CancelActorTaskOnExecutor takes the callback by value, while CancelTaskOnExecutor takes it by const ref.

} else {
RAY_CHECK(current_actor_id.IsNil());
RAY_LOG(INFO).WithField(task_id) << "Cancel a normal task";
Expand All @@ -4048,7 +4048,7 @@ void CoreWorker::HandleCancelTask(rpc::CancelTaskRequest request,
void CoreWorker::CancelTaskOnExecutor(TaskID task_id,
bool force_kill,
bool recursive,
OnCanceledCallback on_canceled) {
const OnCanceledCallback &on_canceled) {
bool requested_task_running;
{
absl::MutexLock lock(&mutex_);
Expand Down
2 changes: 1 addition & 1 deletion src/ray/core_worker/core_worker.h
Original file line number Diff line number Diff line change
Expand Up @@ -1710,7 +1710,7 @@ class CoreWorker : public rpc::CoreWorkerServiceHandler {
void CancelTaskOnExecutor(TaskID intended_task_id,
bool force_kill,
bool recursive,
OnCanceledCallback on_canceled);
const OnCanceledCallback &on_canceled);

/// Cancel an actor task queued or running in the current worker.
///
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,7 @@ bool GetRequest::Wait(int64_t timeout_ms) {
auto remaining_timeout_ms = timeout_ms;
auto timeout_timestamp = current_time_ms() + timeout_ms;
while (!is_ready_) {
// TODO (dayshah): see if using cv condition function instead of busy while helps.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

is this still relevant?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, I think it could still be relevant. I'm fairly sure using cv.wait_for(lock, timeout, []() { return is_ready_; }) could lead to less CPU usage than the busy while loop, but I want to look more into how it would affect performance.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ok, let's do it in another PR, if needed

auto status = cv_.wait_for(lock, std::chrono::milliseconds(remaining_timeout_ms));
auto current_timestamp = current_time_ms();
remaining_timeout_ms =
Expand Down
38 changes: 19 additions & 19 deletions src/ray/core_worker/task_manager.cc
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,6 @@

#include "ray/common/buffer.h"
#include "ray/common/common_protocol.h"
#include "ray/common/constants.h"
#include "ray/core_worker/common.h"
#include "ray/gcs/pb_util.h"
#include "ray/util/exponential_backoff.h"
#include "ray/util/util.h"
Expand All @@ -26,10 +24,10 @@ namespace ray {
namespace core {

// Start throttling task failure logs once we hit this threshold.
const int64_t kTaskFailureThrottlingThreshold = 50;
constexpr int64_t kTaskFailureThrottlingThreshold = 50;

// Throttle task failure logs to once this interval.
const int64_t kTaskFailureLoggingFrequencyMillis = 5000;
constexpr int64_t kTaskFailureLoggingFrequencyMillis = 5000;

absl::flat_hash_set<ObjectID> ObjectRefStream::GetItemsUnconsumed() const {
absl::flat_hash_set<ObjectID> result;
Expand Down Expand Up @@ -237,7 +235,9 @@ std::vector<rpc::ObjectReference> TaskManager::AddPendingTask(
// Add new owned objects for the return values of the task.
size_t num_returns = spec.NumReturns();
std::vector<rpc::ObjectReference> returned_refs;
returned_refs.reserve(num_returns);
std::vector<ObjectID> return_ids;
return_ids.reserve(num_returns);
for (size_t i = 0; i < num_returns; i++) {
auto return_id = spec.ReturnId(i);
if (!spec.IsActorCreationTask()) {
Expand All @@ -252,7 +252,7 @@ std::vector<rpc::ObjectReference> TaskManager::AddPendingTask(
// language frontend. Note that the language bindings should set
// skip_adding_local_ref=True to avoid double referencing the object.
reference_counter_->AddOwnedObject(return_id,
/*inner_ids=*/{},
/*contained_ids=*/{},
caller_address,
call_site,
-1,
Expand Down Expand Up @@ -398,7 +398,7 @@ void TaskManager::DrainAndShutdown(std::function<void()> shutdown) {

bool TaskManager::IsTaskSubmissible(const TaskID &task_id) const {
absl::MutexLock lock(&mu_);
return submissible_tasks_.count(task_id) > 0;
return submissible_tasks_.contains(task_id);
}

bool TaskManager::IsTaskPending(const TaskID &task_id) const {
Expand Down Expand Up @@ -707,7 +707,7 @@ bool TaskManager::HandleReportGeneratorItemReturns(
HandleTaskReturn(object_id,
return_object,
NodeID::FromBinary(request.worker_addr().raylet_id()),
/*store_in_plasma*/ store_in_plasma_ids.count(object_id));
/*store_in_plasma=*/store_in_plasma_ids.contains(object_id));
}

// Handle backpressure if needed.
Expand Down Expand Up @@ -807,7 +807,7 @@ void TaskManager::CompletePendingTask(const TaskID &task_id,
if (HandleTaskReturn(object_id,
return_object,
NodeID::FromBinary(worker_addr.raylet_id()),
store_in_plasma_ids.count(object_id))) {
store_in_plasma_ids.contains(object_id))) {
direct_return_ids.push_back(object_id);
}
}
Expand Down Expand Up @@ -933,7 +933,7 @@ void TaskManager::CompletePendingTask(const TaskID &task_id,
HandleTaskReturn(generator_return_id,
return_object,
NodeID::FromBinary(worker_addr.raylet_id()),
store_in_plasma_ids.count(generator_return_id));
store_in_plasma_ids.contains(generator_return_id));
}
}
}
Expand Down Expand Up @@ -1047,18 +1047,18 @@ void TaskManager::FailPendingTask(const TaskID &task_id,
<< "Tried to fail task that was not pending " << task_id;
spec = it->second.spec;

if (status && status->IsIntentionalSystemExit()) {
if ((status != nullptr) && status->IsIntentionalSystemExit()) {
// We don't mark intentional system exit as failures, such as tasks that
// exit by exit_actor(), exit by ray.shutdown(), etc. These tasks are expected
// to exit and not be marked as failure.
SetTaskStatus(it->second, rpc::TaskStatus::FINISHED);
} else {
SetTaskStatus(
it->second,
rpc::TaskStatus::FAILED,
(ray_error_info == nullptr
? gcs::GetRayErrorInfo(error_type, (status ? status->ToString() : ""))
: *ray_error_info));
SetTaskStatus(it->second,
rpc::TaskStatus::FAILED,
(ray_error_info == nullptr
? gcs::GetRayErrorInfo(
error_type, (status != nullptr ? status->ToString() : ""))
: *ray_error_info));
}
submissible_tasks_.erase(it);
num_pending_tasks_--;
Expand Down Expand Up @@ -1308,15 +1308,15 @@ void TaskManager::MarkTaskReturnObjectsFailed(
int64_t num_returns = spec.NumReturns();
for (int i = 0; i < num_returns; i++) {
const auto object_id = ObjectID::FromIndex(task_id, /*index=*/i + 1);
if (store_in_plasma_ids.count(object_id)) {
if (store_in_plasma_ids.contains(object_id)) {
put_in_local_plasma_callback_(error, object_id);
} else {
in_memory_store_->Put(error, object_id);
}
}
if (spec.ReturnsDynamic()) {
for (const auto &dynamic_return_id : spec.DynamicReturnIds()) {
if (store_in_plasma_ids.count(dynamic_return_id)) {
if (store_in_plasma_ids.contains(dynamic_return_id)) {
put_in_local_plasma_callback_(error, dynamic_return_id);
} else {
in_memory_store_->Put(error, dynamic_return_id);
Expand All @@ -1341,7 +1341,7 @@ void TaskManager::MarkTaskReturnObjectsFailed(
auto num_streaming_generator_returns = spec.NumStreamingGeneratorReturns();
for (size_t i = 0; i < num_streaming_generator_returns; i++) {
const auto generator_return_id = spec.StreamingGeneratorReturnId(i);
if (store_in_plasma_ids.count(generator_return_id)) {
if (store_in_plasma_ids.contains(generator_return_id)) {
put_in_local_plasma_callback_(error, generator_return_id);
} else {
in_memory_store_->Put(error, generator_return_id);
Expand Down
14 changes: 7 additions & 7 deletions src/ray/core_worker/task_manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -66,14 +66,14 @@ class TaskFinisherInterface {

virtual absl::optional<TaskSpecification> GetTaskSpec(const TaskID &task_id) const = 0;

virtual ~TaskFinisherInterface() {}
virtual ~TaskFinisherInterface() = default;
};

class TaskResubmissionInterface {
public:
virtual bool ResubmitTask(const TaskID &task_id, std::vector<ObjectID> *task_deps) = 0;

virtual ~TaskResubmissionInterface() {}
virtual ~TaskResubmissionInterface() = default;
};

using TaskStatusCounter = CounterMap<std::tuple<std::string, rpc::TaskStatus, bool>>;
Expand Down Expand Up @@ -222,7 +222,7 @@ class TaskManager : public TaskFinisherInterface, public TaskResubmissionInterfa
max_lineage_bytes_(max_lineage_bytes),
task_event_buffer_(task_event_buffer) {
task_counter_.SetOnChangeCallback(
[this](const std::tuple<std::string, rpc::TaskStatus, bool> key)
[this](const std::tuple<std::string, rpc::TaskStatus, bool> &key)
ABSL_EXCLUSIVE_LOCKS_REQUIRED(&mu_) {
ray::stats::STATS_tasks.Record(
task_counter_.Get(key),
Expand Down Expand Up @@ -621,13 +621,13 @@ class TaskManager : public TaskFinisherInterface, public TaskResubmissionInterfa
num_retries_left(num_retries_left_arg),
counter(counter),
num_oom_retries_left(num_oom_retries_left) {
reconstructable_return_ids.reserve(num_returns);
for (size_t i = 0; i < num_returns; i++) {
reconstructable_return_ids.insert(spec.ReturnId(i));
}
auto new_status =
status =
std::make_tuple(spec.GetName(), rpc::TaskStatus::PENDING_ARGS_AVAIL, false);
counter.Increment(new_status);
status = new_status;
counter.Increment(status);
}

void SetStatus(rpc::TaskStatus new_status) {
Expand All @@ -640,7 +640,7 @@ class TaskManager : public TaskFinisherInterface, public TaskResubmissionInterfa
// for FINISHED and FAILED tasks.
counter.Increment(new_tuple);
}
status = new_tuple;
status = std::move(new_tuple);
}

void MarkRetry() { is_retry_ = true; }
Expand Down
2 changes: 1 addition & 1 deletion src/ray/core_worker/test/direct_actor_transport_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ class MockWorkerClient : public rpc::CoreWorkerClientInterface {

void PushActorTask(std::unique_ptr<rpc::PushTaskRequest> request,
bool skip_queue,
const rpc::ClientCallback<rpc::PushTaskReply> &callback) override {
rpc::ClientCallback<rpc::PushTaskReply> &&callback) override {
received_seq_nos.push_back(request->sequence_number());
callbacks.push_back(callback);
}
Expand Down
3 changes: 2 additions & 1 deletion src/ray/core_worker/test/normal_task_submitter_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -147,6 +147,7 @@ class MockTaskFinisher : public TaskFinisherInterface {
const Status *status,
const rpc::RayErrorInfo *ray_error_info = nullptr) override {
num_fail_pending_task_calls++;
num_tasks_failed++;
}

bool FailOrRetryPendingTask(const TaskID &task_id,
Expand Down Expand Up @@ -2093,7 +2094,7 @@ TEST(NormalTaskSubmitterTest, TestKillPendingTask) {
ASSERT_EQ(raylet_client->num_workers_returned, 0);
ASSERT_EQ(raylet_client->num_workers_disconnected, 0);
ASSERT_EQ(task_finisher->num_tasks_complete, 0);
ASSERT_EQ(task_finisher->num_tasks_failed, 0);
ASSERT_EQ(task_finisher->num_tasks_failed, 1);
ASSERT_EQ(task_finisher->num_fail_pending_task_calls, 1);
ASSERT_EQ(raylet_client->num_leases_canceled, 1);
ASSERT_TRUE(raylet_client->ReplyCancelWorkerLease());
Expand Down
19 changes: 7 additions & 12 deletions src/ray/core_worker/transport/actor_task_submitter.cc
Original file line number Diff line number Diff line change
Expand Up @@ -14,14 +14,8 @@

#include "ray/core_worker/transport/actor_task_submitter.h"

#include <thread>

#include "ray/common/task/task.h"
#include "ray/gcs/pb_util.h"

using ray::rpc::ActorTableData;
using namespace ray::gcs;

namespace ray {
namespace core {

Expand Down Expand Up @@ -235,7 +229,7 @@ Status ActorTaskSubmitter::SubmitTask(TaskSpecification task_spec) {
absl::MutexLock lock(&mu_);
const auto queue_it = client_queues_.find(task_spec.ActorId());
const auto &death_cause = queue_it->second.death_cause;
error_info = GetErrorInfoFromActorDeathCause(death_cause);
error_info = gcs::GetErrorInfoFromActorDeathCause(death_cause);
error_type = error_info.error_type();
}
auto status = Status::IOError("cancelling task of dead actor");
Expand Down Expand Up @@ -366,7 +360,7 @@ void ActorTaskSubmitter::DisconnectActor(const ActorID &actor_id,
const rpc::ActorDeathCause &death_cause,
bool is_restartable) {
RAY_LOG(DEBUG).WithField(actor_id) << "Disconnecting from actor, death context type="
<< GetActorDeathCauseString(death_cause);
<< gcs::GetActorDeathCauseString(death_cause);

absl::flat_hash_map<TaskID, rpc::ClientCallback<rpc::PushTaskReply>>
inflight_task_callbacks;
Expand Down Expand Up @@ -432,7 +426,7 @@ void ActorTaskSubmitter::DisconnectActor(const ActorID &actor_id,
// Failing tasks has to be done without mu_ hold because the callback
// might require holding mu_ which will lead to a deadlock.
auto status = Status::IOError("cancelling all pending tasks of dead actor");
const auto error_info = GetErrorInfoFromActorDeathCause(death_cause);
const auto error_info = gcs::GetErrorInfoFromActorDeathCause(death_cause);
const auto error_type = error_info.error_type();

for (auto &task_id : task_ids_to_fail) {
Expand Down Expand Up @@ -639,7 +633,8 @@ void ActorTaskSubmitter::PushActorTask(ClientQueue &queue,
task_finisher_.MarkTaskWaitingForExecution(task_id,
NodeID::FromBinary(addr.raylet_id()),
WorkerID::FromBinary(addr.worker_id()));
queue.rpc_client->PushActorTask(std::move(request), skip_queue, wrapped_callback);
queue.rpc_client->PushActorTask(
std::move(request), skip_queue, std::move(wrapped_callback));
}

void ActorTaskSubmitter::HandlePushTaskReply(const Status &status,
Expand Down Expand Up @@ -704,7 +699,7 @@ void ActorTaskSubmitter::HandlePushTaskReply(const Status &status,
is_actor_dead = queue.state == rpc::ActorTableData::DEAD;
if (is_actor_dead) {
const auto &death_cause = queue.death_cause;
error_info = GetErrorInfoFromActorDeathCause(death_cause);
error_info = gcs::GetErrorInfoFromActorDeathCause(death_cause);
fail_immediately = error_info.has_actor_died_error() &&
error_info.actor_died_error().has_oom_context() &&
error_info.actor_died_error().oom_context().fail_immediately();
Expand Down Expand Up @@ -953,7 +948,7 @@ Status ActorTaskSubmitter::CancelTask(TaskSpecification task_spec, bool recursiv
request.set_recursive(recursive);
request.set_caller_worker_id(task_spec.CallerWorkerId().Binary());
client->CancelTask(request,
[this, task_spec, recursive, task_id](
[this, task_spec = std::move(task_spec), recursive, task_id](
const Status &status, const rpc::CancelTaskReply &reply) {
RAY_LOG(DEBUG).WithField(task_spec.TaskId())
<< "CancelTask RPC response received with status "
Expand Down
8 changes: 5 additions & 3 deletions src/ray/core_worker/transport/dependency_resolver.cc
Original file line number Diff line number Diff line change
Expand Up @@ -91,8 +91,10 @@ void LocalDependencyResolver::ResolveDependencies(
// This is deleted when the last dependency fetch callback finishes.
auto inserted = pending_tasks_.emplace(
task_id,
std::make_unique<TaskState>(
task, local_dependency_ids, actor_dependency_ids, on_dependencies_resolved));
std::make_unique<TaskState>(task,
local_dependency_ids,
actor_dependency_ids,
std::move(on_dependencies_resolved)));
RAY_CHECK(inserted.second);
}

Expand Down Expand Up @@ -137,7 +139,7 @@ void LocalDependencyResolver::ResolveDependencies(

for (const auto &actor_id : actor_dependency_ids) {
actor_creator_.AsyncWaitForActorRegisterFinish(
actor_id, [this, task_id, on_dependencies_resolved](const Status &status) {
actor_id, [this, task_id](const Status &status) {
std::unique_ptr<TaskState> resolved_task_state = nullptr;

{
Expand Down
Loading