Revert "[core] Fix wrong local resource view in raylet (#19911)" (#19992
Browse files Browse the repository at this point in the history
)

This reverts commit a907168.

## Why are these changes needed?

The reverted PR appears to cause a large performance regression in `placement_group_test_2.py`: the test took 128 s before the change and 315 s after it was merged.
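For reference, here is a minimal sketch of how the timing comparison could be reproduced locally. The test file name is taken from the description above; its location and invocation are assumptions, not part of this commit:

```python
# Rough reproduction sketch (assumptions: the test file name from the
# description above and a local Ray checkout; adjust the path as needed).
import subprocess
import time

start = time.time()
subprocess.run(
    ["python", "-m", "pytest", "-sv", "placement_group_test_2.py"],
    check=False,  # we only care about the wall-clock time, not the exit code
)
print(f"placement_group_test_2.py took {time.time() - start:.1f}s")
```

Running this once at the parent commit and once at the reverted commit gives the two durations to compare.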

## Related issue number
rkooo567 authored Nov 2, 2021
1 parent f8a6cad commit f1eedb1
Showing 3 changed files with 19 additions and 75 deletions.
52 changes: 0 additions & 52 deletions python/ray/tests/test_placement_group_3.py
@@ -646,57 +646,5 @@ def check_bundle_leaks():
    wait_for_condition(check_bundle_leaks)


def test_placement_group_local_resource_view(monkeypatch, ray_start_cluster):
    """Please refer to https://github.com/ray-project/ray/pull/19911
    for more details.
    """
    with monkeypatch.context() as m:
        # Increase the broadcasting interval so that node resource updates
        # arrive at the raylet after all local resources have been allocated.
        m.setenv("RAY_raylet_report_resources_period_milliseconds", "2000")
        m.setenv("RAY_grpc_based_resource_broadcast", "true")
        cluster = ray_start_cluster

        cluster.add_node(num_cpus=16, object_store_memory=1e9)
        cluster.wait_for_nodes()
        cluster.add_node(num_cpus=16, num_gpus=1)
        cluster.wait_for_nodes()
        NUM_CPU_BUNDLES = 30

        @ray.remote(num_cpus=1)
        class Worker(object):
            def __init__(self, i):
                self.i = i

            def work(self):
                time.sleep(0.1)
                print("work ", self.i)

        @ray.remote(num_cpus=1, num_gpus=1)
        class Trainer(object):
            def __init__(self, i):
                self.i = i

            def train(self):
                time.sleep(0.2)
                print("train ", self.i)

        ray.init(address="auto")
        bundles = [{"CPU": 1, "GPU": 1}]
        bundles += [{"CPU": 1} for _ in range(NUM_CPU_BUNDLES)]
        pg = placement_group(bundles, strategy="PACK")
        ray.get(pg.ready())

        # Local resources will be allocated; here we ensure the local view
        # stays consistent and stale node resource updates are discarded.
        workers = [
            Worker.options(placement_group=pg).remote(i)
            for i in range(NUM_CPU_BUNDLES)
        ]
        trainer = Trainer.options(placement_group=pg).remote(0)
        ray.get([workers[i].work.remote() for i in range(NUM_CPU_BUNDLES)])
        ray.get(trainer.train.remote())


if __name__ == "__main__":
    sys.exit(pytest.main(["-sv", __file__]))
2 changes: 1 addition & 1 deletion src/ray/gcs/gcs_server/gcs_resource_manager.cc
@@ -58,7 +58,7 @@ void GcsResourceManager::HandleUpdateResources(
const rpc::UpdateResourcesRequest &request, rpc::UpdateResourcesReply *reply,
rpc::SendReplyCallback send_reply_callback) {
NodeID node_id = NodeID::FromBinary(request.node_id());
RAY_LOG(DEBUG) << "Updating resources, node id = " << node_id;
RAY_LOG(INFO) << "Updating resources, node id = " << node_id;
auto changed_resources = std::make_shared<std::unordered_map<std::string, double>>();
for (const auto &entry : request.resources()) {
changed_resources->emplace(entry.first, entry.second.resource_capacity());
40 changes: 18 additions & 22 deletions src/ray/raylet/node_manager.cc
@@ -891,9 +891,6 @@ void NodeManager::ResourceCreateUpdated(const NodeID &node_id,
RAY_LOG(DEBUG) << "[ResourceCreateUpdated] received callback from node id " << node_id
<< " with created or updated resources: "
<< createUpdatedResources.ToString() << ". Updating resource map.";
if (node_id == self_node_id_) {
return;
}

// Update local_available_resources_ and SchedulingResources
for (const auto &resource_pair : createUpdatedResources.GetResourceMap()) {
@@ -903,7 +900,11 @@
new_resource_capacity);
}
RAY_LOG(DEBUG) << "[ResourceCreateUpdated] Updated cluster_resource_map.";
cluster_task_manager_->ScheduleAndDispatchTasks();

if (node_id == self_node_id_) {
// The resource update is on the local node, check if we can reschedule tasks.
cluster_task_manager_->ScheduleAndDispatchTasks();
}
}

void NodeManager::ResourceDeleted(const NodeID &node_id,
@@ -1473,44 +1474,39 @@ void NodeManager::HandleUpdateResourceUsage(
rpc::SendReplyCallback send_reply_callback) {
rpc::ResourceUsageBroadcastData resource_usage_batch;
resource_usage_batch.ParseFromString(request.serialized_resource_usage_batch());
// When next_resource_seq_no_ == 0 it means it just started.
// TODO: Fetch a snapshot from gcs for lightweight resource broadcasting
if (next_resource_seq_no_ != 0 &&
resource_usage_batch.seq_no() != next_resource_seq_no_) {
// TODO (Alex): Ideally we would be really robust, and potentially eagerly
// pull a full resource "snapshot" from gcs to make sure our state doesn't
// diverge from GCS.

if (resource_usage_batch.seq_no() != next_resource_seq_no_) {
RAY_LOG(WARNING)
<< "Raylet may have missed a resource broadcast. This either means that GCS has "
"restarted, the network is heavily congested and is dropping, reordering, or "
"duplicating packets. Expected seq#: "
<< next_resource_seq_no_ << ", but got: " << resource_usage_batch.seq_no() << ".";
if (resource_usage_batch.seq_no() < next_resource_seq_no_) {
RAY_LOG(WARNING) << "Discard the the resource update since local version is newer";
return;
}
// TODO (Alex): Ideally we would be really robust, and potentially eagerly
// pull a full resource "snapshot" from gcs to make sure our state doesn't
// diverge from GCS.
}
next_resource_seq_no_ = resource_usage_batch.seq_no() + 1;

for (const auto &resource_change_or_data : resource_usage_batch.batch()) {
if (resource_change_or_data.has_data()) {
const auto &resource_usage = resource_change_or_data.data();
auto node_id = NodeID::FromBinary(resource_usage.node_id());
// Skip messages from self.
if (node_id != self_node_id_) {
UpdateResourceUsage(node_id, resource_usage);
const NodeID &node_id = NodeID::FromBinary(resource_usage.node_id());
if (node_id == self_node_id_) {
// Skip messages from self.
continue;
}
UpdateResourceUsage(node_id, resource_usage);
} else if (resource_change_or_data.has_change()) {
const auto &resource_notification = resource_change_or_data.change();
auto node_id = NodeID::FromBinary(resource_notification.node_id());
auto id = NodeID::FromBinary(resource_notification.node_id());
if (resource_notification.updated_resources_size() != 0) {
ResourceSet resource_set(
MapFromProtobuf(resource_notification.updated_resources()));
ResourceCreateUpdated(node_id, resource_set);
ResourceCreateUpdated(id, resource_set);
}

if (resource_notification.deleted_resources_size() != 0) {
ResourceDeleted(node_id,
ResourceDeleted(id,
VectorFromProtobuf(resource_notification.deleted_resources()));
}
}
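The hunk above touches the raylet's handling of resource-broadcast sequence numbers: warn when a batch arrives out of order, then resume tracking from the batch that was actually received. A rough, standalone illustration of that pattern (plain Python, not Ray's actual code):

```python
# Simplified sketch of the sequence-number check in HandleUpdateResourceUsage:
# warn when a broadcast arrives out of order, then resume tracking from the
# message that was actually received. Not Ray's implementation.
def check_and_advance(next_expected: int, received: int) -> int:
    """Warn on a gap or reordering, then advance the expected counter."""
    if received != next_expected:
        print(
            f"warning: possible missed broadcast; expected seq #{next_expected}, "
            f"got seq #{received}"
        )
    # Expect the next batch to follow the one just received.
    return received + 1


# Example: a dropped broadcast (seq 2) triggers a warning, then tracking resumes.
expected = 0
for seq in [0, 1, 3, 4]:
    expected = check_and_advance(expected, seq)
```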
