54 changes: 54 additions & 0 deletions python/ray/tests/test_draining.py
@@ -12,6 +12,7 @@
NodeAffinitySchedulingStrategy,
PlacementGroupSchedulingStrategy,
)
from ray.util.state import list_tasks


def test_idle_termination(ray_start_cluster):
@@ -571,6 +572,59 @@ def func(signal, nodes):
ray.get(r1)


def test_leases_rescheduling_during_draining(ray_start_cluster):
"""Test that when a node is being drained, leases inside local lease manager
will be cancelled and re-added to the cluster lease manager for rescheduling
instead of being marked as permanently infeasible.

This is a regression test for https://github.com/ray-project/ray/pull/57834/
"""
cluster = ray_start_cluster
cluster.add_node(num_cpus=0)
ray.init(address=cluster.address)

worker1 = cluster.add_node(num_cpus=1)
cluster.wait_for_nodes()

gcs_client = GcsClient(address=ray.get_runtime_context().gcs_address)

@ray.remote(num_cpus=1)
class Actor:
def ping(self):
pass

actor = Actor.remote()
ray.get(actor.ping.remote())

@ray.remote(num_cpus=1)
def get_node_id():
return ray.get_runtime_context().get_node_id()

obj_ref = get_node_id.options(name="f1").remote()

def verify_f1_pending_node_assignment():
tasks = list_tasks(filters=[("name", "=", "f1")])
assert len(tasks) == 1
assert tasks[0]["state"] == "PENDING_NODE_ASSIGNMENT"
return True

# f1 should be in the local lease manager of worker1,
# waiting for resources to become available.
wait_for_condition(verify_f1_pending_node_assignment)

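# Drain worker1 for preemption with an effectively infinite deadline (max int64).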
is_accepted, _ = gcs_client.drain_node(
worker1.node_id,
autoscaler_pb2.DrainNodeReason.Value("DRAIN_NODE_REASON_PREEMPTION"),
"preemption",
2**63 - 1,
)
assert is_accepted

# The task should be rescheduled on another node.
worker2 = cluster.add_node(num_cpus=1)
assert ray.get(obj_ref) == worker2.node_id


if __name__ == "__main__":

sys.exit(pytest.main(["-sv", __file__]))
4 changes: 4 additions & 0 deletions src/mock/ray/raylet/local_lease_manager.h
@@ -31,6 +31,10 @@ class MockLocalLeaseManager : public LocalLeaseManagerInterface {
rpc::RequestWorkerLeaseReply::SchedulingFailureType failure_type,
const std::string &scheduling_failure_message),
(override));
MOCK_METHOD(std::vector<std::shared_ptr<internal::Work>>,
CancelLeasesWithoutReply,
(std::function<bool(const std::shared_ptr<internal::Work> &)> predicate),
(override));
MOCK_METHOD((const absl::flat_hash_map<SchedulingClass,
std::deque<std::shared_ptr<internal::Work>>> &),
GetLeasesToGrant,
98 changes: 53 additions & 45 deletions src/ray/raylet/local_lease_manager.cc
@@ -32,6 +32,18 @@
namespace ray {
namespace raylet {

namespace {
void ReplyCancelled(const std::shared_ptr<internal::Work> &work,
rpc::RequestWorkerLeaseReply::SchedulingFailureType failure_type,
const std::string &scheduling_failure_message) {
auto reply = work->reply_;
reply->set_canceled(true);
reply->set_failure_type(failure_type);
reply->set_scheduling_failure_message(scheduling_failure_message);
work->send_reply_callback_(Status::OK(), nullptr, nullptr);
}
} // namespace

Comment on lines +35 to +46 (Collaborator, Author): Pure move.

LocalLeaseManager::LocalLeaseManager(
const NodeID &self_node_id,
ClusterResourceScheduler &cluster_resource_scheduler,
@@ -410,10 +422,10 @@ void LocalLeaseManager::GrantScheduledLeasesToWorkers() {
<< front_lease.DebugString();
auto leases_to_grant_queue_iter = leases_to_grant_queue.begin();
while (leases_to_grant_queue_iter != leases_to_grant_queue.end()) {
CancelLeaseToGrant(
*leases_to_grant_queue_iter,
rpc::RequestWorkerLeaseReply::SCHEDULING_CANCELLED_UNSCHEDULABLE,
"Lease granting failed due to the lease becoming infeasible.");
CancelLeaseToGrantWithoutReply(*leases_to_grant_queue_iter);
ReplyCancelled(*leases_to_grant_queue_iter,
rpc::RequestWorkerLeaseReply::SCHEDULING_CANCELLED_UNSCHEDULABLE,
"Lease granting failed due to the lease becoming infeasible.");
leases_to_grant_queue_iter =
leases_to_grant_queue.erase(leases_to_grant_queue_iter);
}
@@ -536,7 +548,7 @@ bool LocalLeaseManager::PoppedWorkerHandler(
const rpc::Address &owner_address,
const std::string &runtime_env_setup_error_message) {
const auto &reply = work->reply_;
const auto &callback = work->callback_;
const auto &send_reply_callback = work->send_reply_callback_;
const bool canceled = work->GetState() == internal::WorkStatus::CANCELLED;
const auto &lease = work->lease_;
bool granted = false;
@@ -650,7 +662,12 @@ bool LocalLeaseManager::PoppedWorkerHandler(
RAY_LOG(DEBUG) << "Granting lease " << lease_id << " to worker "
<< worker->WorkerId();

Grant(worker, leased_workers_, work->allocated_instances_, lease, reply, callback);
Grant(worker,
leased_workers_,
work->allocated_instances_,
lease,
reply,
send_reply_callback);
erase_from_leases_to_grant_queue_fn(work, scheduling_class);
granted = true;
}
@@ -660,11 +677,11 @@

void LocalLeaseManager::Spillback(const NodeID &spillback_to,
const std::shared_ptr<internal::Work> &work) {
auto send_reply_callback = work->callback_;
auto send_reply_callback = work->send_reply_callback_;

if (work->grant_or_reject_) {
work->reply_->set_rejected(true);
send_reply_callback();
send_reply_callback(Status::OK(), nullptr, nullptr);
return;
}

@@ -690,7 +707,7 @@ void LocalLeaseManager::Spillback(const NodeID &spillback_to,
reply->mutable_retry_at_raylet_address()->set_port(node_info_ptr->node_manager_port());
reply->mutable_retry_at_raylet_address()->set_node_id(spillback_to.Binary());

send_reply_callback();
send_reply_callback(Status::OK(), nullptr, nullptr);
}

void LocalLeaseManager::LeasesUnblocked(const std::vector<LeaseID> &ready_ids) {
@@ -843,61 +860,52 @@ void LocalLeaseManager::ReleaseLeaseArgs(const LeaseID &lease_id) {
}
}

namespace {
void ReplyCancelled(const std::shared_ptr<internal::Work> &work,
rpc::RequestWorkerLeaseReply::SchedulingFailureType failure_type,
const std::string &scheduling_failure_message) {
auto reply = work->reply_;
auto callback = work->callback_;
reply->set_canceled(true);
reply->set_failure_type(failure_type);
reply->set_scheduling_failure_message(scheduling_failure_message);
callback();
}
} // namespace

bool LocalLeaseManager::CancelLeases(
std::function<bool(const std::shared_ptr<internal::Work> &)> predicate,
rpc::RequestWorkerLeaseReply::SchedulingFailureType failure_type,
const std::string &scheduling_failure_message) {
bool tasks_cancelled = false;
std::vector<std::shared_ptr<internal::Work>> LocalLeaseManager::CancelLeasesWithoutReply(
std::function<bool(const std::shared_ptr<internal::Work> &)> predicate) {
std::vector<std::shared_ptr<internal::Work>> cancelled_works;

ray::erase_if<SchedulingClass, std::shared_ptr<internal::Work>>(
leases_to_grant_, [&](const std::shared_ptr<internal::Work> &work) {
if (!predicate(work)) {
return false;
}
CancelLeaseToGrant(work, failure_type, scheduling_failure_message);
tasks_cancelled = true;
CancelLeaseToGrantWithoutReply(work);
cancelled_works.push_back(work);
return true;
});

ray::erase_if<std::shared_ptr<internal::Work>>(
waiting_lease_queue_, [&](const std::shared_ptr<internal::Work> &work) {
if (predicate(work)) {
ReplyCancelled(work, failure_type, scheduling_failure_message);
if (!work->lease_.GetLeaseSpecification().GetDependencies().empty()) {
lease_dependency_manager_.RemoveLeaseDependencies(
work->lease_.GetLeaseSpecification().LeaseId());
}
waiting_leases_index_.erase(work->lease_.GetLeaseSpecification().LeaseId());
tasks_cancelled = true;
return true;
} else {
if (!predicate(work)) {
return false;
}
if (!work->lease_.GetLeaseSpecification().GetDependencies().empty()) {
lease_dependency_manager_.RemoveLeaseDependencies(
work->lease_.GetLeaseSpecification().LeaseId());
}
waiting_leases_index_.erase(work->lease_.GetLeaseSpecification().LeaseId());
cancelled_works.push_back(work);
return true;
});

return tasks_cancelled;
return cancelled_works;
}

void LocalLeaseManager::CancelLeaseToGrant(
const std::shared_ptr<internal::Work> &work,
bool LocalLeaseManager::CancelLeases(
std::function<bool(const std::shared_ptr<internal::Work> &)> predicate,
rpc::RequestWorkerLeaseReply::SchedulingFailureType failure_type,
const std::string &scheduling_failure_message) {
auto cancelled_works = CancelLeasesWithoutReply(predicate);
for (const auto &work : cancelled_works) {
ReplyCancelled(work, failure_type, scheduling_failure_message);
}
return !cancelled_works.empty();
}

void LocalLeaseManager::CancelLeaseToGrantWithoutReply(
const std::shared_ptr<internal::Work> &work) {
const LeaseID lease_id = work->lease_.GetLeaseSpecification().LeaseId();
RAY_LOG(DEBUG) << "Canceling lease " << lease_id << " from leases_to_grant_queue.";
ReplyCancelled(work, failure_type, scheduling_failure_message);
if (work->GetState() == internal::WorkStatus::WAITING_FOR_WORKER) {
// We've already acquired resources so we need to release them.
cluster_resource_scheduler_.GetLocalResourceManager().ReleaseWorkerResources(
@@ -962,7 +970,7 @@ void LocalLeaseManager::Grant(
const std::shared_ptr<TaskResourceInstances> &allocated_instances,
const RayLease &lease,
rpc::RequestWorkerLeaseReply *reply,
std::function<void(void)> send_reply_callback) {
rpc::SendReplyCallback send_reply_callback) {
const auto &lease_spec = lease.GetLeaseSpecification();

if (lease_spec.IsActorCreationTask()) {
Expand Down Expand Up @@ -1011,7 +1019,7 @@ void LocalLeaseManager::Grant(
}
}
// Send the result back.
send_reply_callback();
send_reply_callback(Status::OK(), nullptr, nullptr);
}

void LocalLeaseManager::ClearWorkerBacklog(const WorkerID &worker_id) {
11 changes: 5 additions & 6 deletions src/ray/raylet/local_lease_manager.h
@@ -122,6 +122,9 @@ class LocalLeaseManager : public LocalLeaseManagerInterface {
rpc::RequestWorkerLeaseReply::SchedulingFailureType failure_type,
const std::string &scheduling_failure_message) override;

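/// Cancel all queued leases matching the predicate without replying to their
/// clients. Returns the cancelled work items so the caller can reply to or
/// requeue them.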
std::vector<std::shared_ptr<internal::Work>> CancelLeasesWithoutReply(
std::function<bool(const std::shared_ptr<internal::Work> &)> predicate) override;

/// Return with an exemplar if any leases are pending resource acquisition.
///
/// \param[in,out] num_pending_actor_creation: Number of pending actor creation leases.
@@ -202,11 +205,7 @@ class LocalLeaseManager : public LocalLeaseManagerInterface {
const std::string &runtime_env_setup_error_message);

/// Cancels a lease in leases_to_grant_. Does not remove it from leases_to_grant_.
void CancelLeaseToGrant(
const std::shared_ptr<internal::Work> &work,
rpc::RequestWorkerLeaseReply::SchedulingFailureType failure_type =
rpc::RequestWorkerLeaseReply::SCHEDULING_CANCELLED_INTENDED,
const std::string &scheduling_failure_message = "");
void CancelLeaseToGrantWithoutReply(const std::shared_ptr<internal::Work> &work);

/// Attempts to grant all leases which are ready to run. A lease
/// will be granted if it is on `leases_to_grant_` and there are still
@@ -250,7 +249,7 @@ class LocalLeaseManager : public LocalLeaseManagerInterface {
const std::shared_ptr<TaskResourceInstances> &allocated_instances,
const RayLease &lease,
rpc::RequestWorkerLeaseReply *reply,
std::function<void(void)> send_reply_callback);
rpc::SendReplyCallback send_reply_callback);

void Spillback(const NodeID &spillback_to, const std::shared_ptr<internal::Work> &work);

16 changes: 16 additions & 0 deletions src/ray/raylet/node_manager.cc
@@ -2083,7 +2083,23 @@ void NodeManager::HandleDrainRaylet(rpc::DrainRayletRequest request,
reply->set_is_accepted(true);
}

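// Capture the decision before replying: the reply object may be released once
// send_reply_callback runs.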
const bool is_drain_accepted = reply->is_accepted();
send_reply_callback(Status::OK(), nullptr, nullptr);

if (is_drain_accepted) {
// Fail fast on the leases in the local lease manager
// and add them back to the cluster lease manager so a new node
// can be selected by the scheduler.
auto cancelled_works = local_lease_manager_.CancelLeasesWithoutReply(
[&](const std::shared_ptr<internal::Work> &work) { return true; });
for (const auto &work : cancelled_works) {
cluster_lease_manager_.QueueAndScheduleLease(work->lease_,
work->grant_or_reject_,
work->is_selected_based_on_locality_,
work->reply_,
work->send_reply_callback_);
}
}
}

void NodeManager::HandleShutdownRaylet(rpc::ShutdownRayletRequest request,
10 changes: 5 additions & 5 deletions src/ray/raylet/node_manager.h
@@ -302,6 +302,11 @@ class NodeManager : public rpc::NodeManagerServiceHandler,
rpc::CancelWorkerLeaseReply *reply,
rpc::SendReplyCallback send_reply_callback) override;

/// Handle a `DrainRaylet` request.
void HandleDrainRaylet(rpc::DrainRayletRequest request,
rpc::DrainRayletReply *reply,
rpc::SendReplyCallback send_reply_callback) override;

private:
FRIEND_TEST(NodeManagerStaticTest, TestHandleReportWorkerBacklog);

@@ -589,11 +594,6 @@ class NodeManager : public rpc::NodeManagerServiceHandler,
rpc::ShutdownRayletReply *reply,
rpc::SendReplyCallback send_reply_callback) override;

/// Handle a `DrainRaylet` request.
void HandleDrainRaylet(rpc::DrainRayletRequest request,
rpc::DrainRayletReply *reply,
rpc::SendReplyCallback send_reply_callback) override;

void HandleIsLocalWorkerDead(rpc::IsLocalWorkerDeadRequest request,
rpc::IsLocalWorkerDeadReply *reply,
rpc::SendReplyCallback send_reply_callback) override;
1 change: 1 addition & 0 deletions src/ray/raylet/scheduling/BUILD.bazel
@@ -36,6 +36,7 @@ ray_cc_library(
"//src/ray/common:lease",
"//src/ray/common/scheduling:cluster_resource_data",
"//src/ray/protobuf:node_manager_cc_proto",
"//src/ray/rpc:rpc_callback_types",
],
)
