
Conversation

@stephanie-wang stephanie-wang commented Jan 18, 2021

Why are these changes needed?

Previously, we would fetch all requested objects simultaneously, including queued tasks' arguments and ray.get or ray.wait requests from local workers. If the total size was greater than the node's capacity, this could result in starvation.

This adds admission control when choosing which objects to fetch to the local node. It makes the following changes:

  1. Pull requests are served in FIFO order.
  2. The total size of objects actively fetched is kept under the node's current capacity (defined as the object store's total capacity - size of pinned objects).
  3. We do not start pulling an object until its size is known. This is to prevent flooding the object manager with incoming objects when many requests are made simultaneously for different objects.

The algorithm is implemented by finding the longest contiguous prefix of the current request queue whose total size is known and under the current capacity. Object size is now attached to all object table replies. The total set of objects needed by the chosen requests will be actively pulled or restored. The current capacity is dynamically and asynchronously updated at every ObjectManager tick.
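For illustration, here is a minimal sketch of the prefix rule described above. The names (PullBundle, NumBundlesToActivate) are stand-ins rather than the actual PullManager API, and a size of -1 models an object whose size is not yet known.

#include <cstdint>
#include <map>
#include <vector>

// Illustrative sketch of the admission rule above (not the real PullManager).
struct PullBundle {
  std::vector<int64_t> object_sizes;  // -1 means the size is not yet known.
};

// Returns how many bundles at the front of the FIFO queue can be activated:
// the longest contiguous prefix whose sizes are all known and whose running
// total fits within the currently available bytes.
size_t NumBundlesToActivate(const std::map<uint64_t, PullBundle> &queue,
                            int64_t bytes_available) {
  int64_t bytes_being_pulled = 0;
  size_t num_active = 0;
  for (const auto &entry : queue) {
    int64_t bundle_size = 0;
    bool sizes_known = true;
    for (int64_t size : entry.second.object_sizes) {
      if (size < 0) {
        sizes_known = false;  // Change 3: wait until every size is known.
        break;
      }
      bundle_size += size;
    }
    if (!sizes_known || bytes_being_pulled + bundle_size > bytes_available) {
      break;  // Changes 1 and 2: stop at the first bundle that does not fit.
    }
    bytes_being_pulled += bundle_size;
    ++num_active;
  }
  return num_active;
}

Stopping at the first bundle that cannot be admitted, rather than skipping ahead to smaller bundles, is what preserves the FIFO ordering in change 1.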

Related issue number

Closes #12663.

Checks

  • I've run scripts/format.sh to lint the changes in this PR.
  • I've included any doc changes needed for https://docs.ray.io/en/master/.
  • I've made sure the tests are passing. Note that there might be a few flaky tests, see the recent failures at https://flakey-tests.ray.io/
  • Testing Strategy
    • Unit tests
    • Release tests
    • This PR is not tested :(

To test this, the Python tests submit tasks with several arguments, and there is only enough memory to run one task at a time. On master, this timed out, but the test finished in ~5s when the Python code was modified to manually submit and get one task at a time. With this PR, the run time now matches the version with manual admission control.

@rkooo567 rkooo567 (Contributor) left a comment

IIRC, ray.get and ray.wait are prioritized over fetching task dependencies. (Lmk if I am wrong?) Should we add test cases for this?



@pytest.mark.timeout(30)
def test_pull_bundles_admission_control(shutdown_only):
Contributor:

Why don't we just use ray_start_cluster?

Contributor Author:

I was too lazy to figure out how to parametrize it properly :D Also, I was running into trouble where the non-head node would connect first, so the rest of the test wouldn't run properly.

Contributor:

oh lol. I think you can just do like

cluster = ray_start_cluster
cluster.add_node()
cluster.wait_for_nodes()
ray.init(address=cluster.address)
cluster.add_node...

// addition or deletion.
bool isUpdated = false;
for (const auto &update : location_updates) {
if (update.size() > 0) {
Contributor:

When is update.size() not greater than 0?

Contributor Author:

I think it's 0 for deletion. We can add a flag instead if there are cases where the object size really is 0.
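To make the convention in this thread concrete, here is a hedged sketch; the LocationUpdate type and its field are illustrative, not the real schema. The only point is that a reported size of 0 is being used to mark a deletion.

#include <cstdint>
#include <vector>

// Illustrative only: each location update carries the object's size, and by
// the convention discussed above a size of 0 marks a deletion rather than a
// real zero-byte object. A dedicated flag would be needed if zero-sized
// objects were possible.
struct LocationUpdate {
  int64_t size = 0;
};

void ApplyLocationUpdates(const std::vector<LocationUpdate> &location_updates) {
  for (const auto &update : location_updates) {
    if (update.size > 0) {
      // Addition: record the new location and remember the object's size.
    } else {
      // Deletion: drop the location; no size information is attached.
    }
  }
}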

Contributor:

Do you mind writing a comment here?


// Request the current available memory from the object
// store.
plasma::plasma_store_runner->GetAvailableMemoryAsync([this](size_t available_memory) {
Contributor:

It seems like when we collect stats, we use config_.object_store_memory - used_memory_ to calculate available memory (inside the object manager). Are we directly querying it from the plasma store here because we need to take pinned objects into account?

Contributor Author:

Yes, I don't think we currently report the available memory metric that I'm using (but maybe we should!).
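As a rough, self-contained illustration of the flow being discussed (PullManager and UpdatePullsBasedOnAvailableMemory are assumed names for the sketch; the real plasma callback runs on the store's own thread):

#include <cstdint>
#include <functional>
#include <iostream>

// Toy model of the asynchronous capacity report: on every tick the store is
// asked for its current availability, and the pull manager re-runs admission
// control against that number when the reply arrives.
struct PullManager {
  int64_t num_bytes_available = 0;
  void UpdatePullsBasedOnAvailableMemory(int64_t bytes) {
    num_bytes_available = bytes;
    // ...activate or deactivate queued pull requests to fit the new capacity...
  }
};

// Stand-in for the store's async query: total capacity minus pinned bytes,
// reported through a callback.
void GetAvailableMemoryAsync(int64_t capacity, int64_t pinned_bytes,
                             const std::function<void(int64_t)> &callback) {
  callback(capacity - pinned_bytes);
}

int main() {
  PullManager pull_manager;
  // One ObjectManager tick: refresh the capacity seen by admission control.
  GetAvailableMemoryAsync(/*capacity=*/1000, /*pinned_bytes=*/300,
                          [&](int64_t available) {
                            pull_manager.UpdatePullsBasedOnAvailableMemory(available);
                          });
  std::cout << pull_manager.num_bytes_available << std::endl;  // 700
  return 0;
}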

if (request_it == pull_request_bundles_.begin()) {
highest_req_id_being_pulled_ = 0;
} else {
highest_req_id_being_pulled_ = std::prev(request_it)->first;
Contributor:

Q: Is the map ordered?

Contributor Author:

Yes.
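For reference, the property being relied on, assuming pull_request_bundles_ is a std::map (or another container ordered by request ID): iteration follows key order, and std::prev(it) yields the entry with the next-lower request ID.

#include <cassert>
#include <cstdint>
#include <iterator>
#include <map>

int main() {
  // Keys play the role of request IDs; std::map keeps them sorted.
  std::map<uint64_t, int> bundles = {{1, 0}, {2, 0}, {5, 0}};
  auto it = bundles.find(5);
  // The entry before request 5 is request 2, i.e. the next-lower ID.
  assert(std::prev(it)->first == 2);
  // If `it` were bundles.begin(), there would be no previous entry, which is
  // why the code above falls back to 0 in that branch.
  return 0;
}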

ASSERT_FALSE(IsUnderCapacity((num_requests_expected + 1) * num_oids_per_request *
object_size));
}
}
Contributor:

Can you also test the case where all of the current pull requests have been processed and new pull requests are then queued, to cover this part?

if (highest_req_id_being_pulled_ == request_it->first) {
    if (request_it == pull_request_bundles_.begin()) {
      highest_req_id_being_pulled_ = 0;
    } else {
      highest_req_id_being_pulled_ = std::prev(request_it)->first;

Contributor Author:

Hmm I think this will get covered by the cancellation tests, but let me know if there is something else you're looking for.

Contributor:

Ah, if it is covered by that test, it is fine. (It was a bit hard to tell whether this was tested.)


@pytest.mark.timeout(30)
def test_pull_bundles_admission_control_dynamic(shutdown_only):
# This test is the same as test_pull_bundles_admission_control, except that
Contributor:

This is a nice test!

num_bytes_available_ = num_bytes_available;

std::unordered_set<ObjectID> object_ids_to_pull;
// While there is available capacity, activate the next pull request.
Contributor:

Should we always pull at least the first bundle in the queue? Otherwise I can see the workload stalling when it is possible we can make space via object spilling.

Contributor Author:

Hmm that's a good point, I'll change that and see if I can write a regression test for it.

Contributor Author:

Actually, it turns out this doesn't work because the object store just keeps evicting other objects needed by the first bundle, so it never triggers the OOM handling. So I'm going to change this to directly trigger OOM instead.
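A hedged sketch of the behavior described here; the trigger_oom_handler callback is illustrative, not Ray's actual hook. The idea is that when even the head of the queue cannot be activated, the pull manager explicitly asks for space rather than waiting for evictions that keep hitting the wrong objects.

#include <cstdint>
#include <functional>

// If admission control cannot activate even the first (oldest) bundle, ask
// for enough space to be freed (e.g. via spilling) so that the head of the
// queue can always make progress.
void MaybeTriggerOutOfMemoryHandling(
    int64_t head_bundle_size, int64_t num_bytes_available,
    const std::function<void(int64_t)> &trigger_oom_handler) {
  if (head_bundle_size > num_bytes_available) {
    trigger_oom_handler(head_bundle_size - num_bytes_available);
  }
}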

std::unordered_set<ObjectID> object_ids_to_cancel;
// While the total bytes requested is over the available capacity, deactivate
// the last pull request, ordered by request ID.
while (num_bytes_being_pulled_ > num_bytes_available_) {
Contributor:

Nice.

// NOTE(swang): We could also just wait for the next tick to pull the
// objects, but this would add a delay of up to one tick for any bundles
// of multiple objects, even when we are not under memory pressure.
TryToMakeObjectLocal(obj_id);
Contributor:

Could we instead trigger the tick?

Contributor Author:

Ah, good idea!

@ericl ericl (Contributor) left a comment

Looks good at a high level; some questions about the corner-case handling under memory pressure.

@ericl ericl added the @author-action-required The PR author is responsible for the next step. Remove tag to send back to the reviewer. label Jan 18, 2021
@stephanie-wang stephanie-wang removed the @author-action-required The PR author is responsible for the next step. Remove tag to send back to the reviewer. label Jan 19, 2021
@rkooo567 rkooo567 (Contributor) left a comment

LGTM. A few follow up comments, but they are minor.




absl::optional<absl::flat_hash_set<NodeID>> GetObjectLocations(
const ObjectID &object_id) LOCKS_EXCLUDED(mutex_);

size_t GetObjectSize(const ObjectID &object_id) const;
Contributor:

Can you write a comment? (and explain when 0 is returned?)
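One possible shape for that comment, assuming 0 doubles as the "size not known yet" sentinel; the exact wording would need to be checked against the implementation.

/// Get the size of the given object, if known.
///
/// \param object_id The object to look up.
/// \return The object's size in bytes, or 0 if the size is not yet known,
/// e.g. because no location update carrying a size has been received yet.
size_t GetObjectSize(const ObjectID &object_id) const;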


if (!it->second.object_size_set) {
RAY_LOG(DEBUG) << "No size for " << obj_id << ", canceling activation for pull "
<< next_request_it->first;
return false;
Contributor:

Isn't it something we should improve in the future? (Like re-queue until sizes are known or something)?

@ericl ericl added the @author-action-required The PR author is responsible for the next step. Remove tag to send back to the reviewer. label Jan 19, 2021
@stephanie-wang stephanie-wang (Contributor Author) commented:

@ericl, @rkooo567, FYI:

I had to change a couple things to get the tests working:

  • removed the tests under "run_object_manager_tests.sh". These tests have been useless for a while, and we now have Python and unit tests that cover the code better anyway.
  • disabled //:core_worker_test since it relies on the legacy plasma store and I couldn't figure out how to disable that. This test is also pretty useless since the Python tests are a superset.
  • disabled the new object spilling test added in this PR. It also hangs on master, so I don't think this is a big deal.

It turns out there is a race condition causing the new object spilling test to fail that I think is non-trivial to fix. Here is the issue:

  1. Task 1 requires A, task 2 requires B. We only have room for 1 object.
  2. We pull A and lease the worker for task 1. At this point, we cancel the pull request for A and start the pull request for B.
  3. We pull B, which evicts A. The worker executing task 1 now requests A again. We hang because we're trying to pull B first.

To break the deadlock, we need to prioritize running workers' dependencies over queued task dependencies (as @rkooo567 mentioned above). Unfortunately, this won't fix the problem of evicting A before the worker gets it. To fix that, we'd need to wait to cancel the pull manager request for A until after we were sure the worker got a ref to A, which seems pretty complicated to do right now.

So for now, I can open a second PR to break the deadlock and at least report some metrics on how often such thrashing is happening.
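As a hedged sketch of that prioritization (queue names are illustrative, not the follow-up PR's design): requests from already-running workers would be admitted before queued tasks' arguments, so a leased worker can never be starved by queued work.

#include <cstdint>
#include <deque>

// Illustrative only: two FIFO queues, with requests from already-running
// workers (ray.get / ray.wait) always admitted before queued task arguments.
struct Bundle {
  uint64_t request_id;
  int64_t total_size;
};

struct PrioritizedQueues {
  std::deque<Bundle> worker_requests;  // ray.get / ray.wait from local workers.
  std::deque<Bundle> task_arguments;   // arguments of queued tasks.

  // Pop the next bundle to admit, preferring worker requests.
  bool NextBundle(Bundle *out) {
    if (!worker_requests.empty()) {
      *out = worker_requests.front();
      worker_requests.pop_front();
      return true;
    }
    if (!task_arguments.empty()) {
      *out = task_arguments.front();
      task_arguments.pop_front();
      return true;
    }
    return false;
  }
};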


ericl commented Jan 21, 2021 via email


ericl commented Jan 21, 2021 via email

@stephanie-wang stephanie-wang (Contributor Author) commented:

> Could we pin the object A in memory until the task 1 has finished execution? When the task 1 finishes it could do a double decrement of the refcount to release the object.

I think that is the right approach long-term, but it's pretty complicated to do right now. The problem is that we're calculating the PullManager's available memory based on total_capacity - pinned_objects_size. If we pin pulled objects, we'll have to make sure to account for that in the available memory. Plus the available memory is reported asynchronously right now, since the object store is running in a different thread.

It seems like we need a larger refactor to really fix this problem properly.


ericl commented Jan 21, 2021 via email

@stephanie-wang stephanie-wang (Contributor Author) commented:

After some offline discussion, the decision is to merge this and fix the deadlock problem by pinning pulled objects until they're no longer needed. We'll account for the pinned memory by asking for the current availability synchronously and subtracting the size of the objects pinned by the PullManager.

@stephanie-wang stephanie-wang merged commit 0998d69 into ray-project:master Jan 22, 2021
@stephanie-wang stephanie-wang deleted the admission-control branch January 22, 2021 00:46
fishbone pushed a commit to fishbone/ray that referenced this pull request Feb 16, 2021
…roject#13514)

* Admission control, TODO: tests, object size

* Unit tests for admission control and some bug fixes

* Add object size to object table, only activate pull if object size is known

* Some fixes, reset timer on eviction

* doc

* update

* Trigger OOM from the pull manager

* don't spam

* doc

* Update src/ray/object_manager/pull_manager.cc

Co-authored-by: Eric Liang <[email protected]>

* Remove useless tests

* Fix test

* osx build

* Skip broken test

* tests

* Skip failing tests

Co-authored-by: Eric Liang <[email protected]>
fishbone added a commit to fishbone/ray that referenced this pull request Feb 16, 2021
@kfstorm kfstorm mentioned this pull request Dec 31, 2021

Labels

@author-action-required The PR author is responsible for the next step. Remove tag to send back to the reviewer.


Development

Successfully merging this pull request may close these issues.

[Object Spilling] Thrashing when there are large number of dependencies for many tasks
