-
Notifications
You must be signed in to change notification settings - Fork 7k
[xray] Implements ray.wait #2162
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Conversation
|
Test FAILed. |
|
Btw does this also fix #1128 ? |
|
Chatted with @ericl offline: The existing semantics of Here are some options:
@robertnishihara can you provide your thoughts on this? |
|
I think we should keep the existing semantics since changing it now would be a surprise to other Ray users. I'm not sure if "fairness" is necessary for |
|
Yeah, I think the existing semantics make sense, especially since If you want to just get all of the objects that are ready after a certain timeout, then you can specify a timeout along with |
|
|
||
| @unittest.skipIf( | ||
| os.environ.get("RAY_USE_XRAY") == "1", | ||
| "This test does not work with xray yet.") |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think there are other tests that can be added back in, e.g., testMultipleWaitsAndGets and testWait in stress_tests.py.
python/ray/worker.py
Outdated
| if num_returns > len(object_ids): | ||
| raise Exception("num_returns cannot be greater than the number " | ||
| "of objects provided to ray.wait.") | ||
| timeout = timeout if timeout is not None else 2**30 |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
all the above code should run in both code paths, not just the raylet code path
python/ray/worker.py
Outdated
| object_id_strs = [ | ||
| plasma.ObjectID(object_id.id()) for object_id in object_ids | ||
| ] | ||
| timeout = timeout if timeout is not None else 2**30 |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
duplicated
|
Test FAILed. |
|
Test PASSed. |
|
Test PASSed. |
|
Test PASSed. |
|
Test PASSed. |
|
Test PASSed. |
stephanie-wang
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Most pressing comment is the one about the issue with the SubscribeObjectLocations callbacks getting invoked in the same call stack. It would be great to add a regression test that triggers the issue I described in the comment. If it's not feasible to write that kind of test right now, we should figure out how to restructure the code to make it feasible.
| } | ||
| } | ||
|
|
||
| if (wait_state.remaining.empty()) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can you add a comment to this if block detailing the reason for this condition? I had the same question as Robert when reading this code, so it'd be good to explain to the reader why you still have to do lookups for the remaining objects.
| status = gcs_client_->object_table().RequestNotifications( | ||
| JobID::nil(), object_id, gcs_client_->client_table().GetLocalClientId()); | ||
| } | ||
| if (listeners_[object_id].callbacks.count(callback_id) > 0) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
If we can, let's try to avoid using the bracket accessor wherever we can, and use listeners_.find(object_id) instead.
| status = gcs_client_->object_table().RequestNotifications( | ||
| JobID::nil(), object_id, gcs_client_->client_table().GetLocalClientId()); | ||
| } | ||
| if (listeners_[object_id].callbacks.count(callback_id) > 0) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I see, this can happen when you call SubscribeObjectLocations twice, for a Pull?
|
|
||
| /// Unfulfilled Push tasks. | ||
| /// The timer is for removing a push task due to unsatisfied local object. | ||
| UniqueID object_directory_pull_callback_id_ = UniqueID::from_random(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Document this field.
| current_wait_test += 1; | ||
| switch (current_wait_test) { | ||
| case 0: { | ||
| TestWait(100, 5, 3, 0, false, false); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can you document why you chose these values for each of the calls to TestWait? It'd be good if the reader can tell immediately from reading this code what is different about each of these cases.
| @unittest.skipIf( | ||
| os.environ.get("RAY_USE_XRAY") == "1", | ||
| "This test does not work with xray yet.") | ||
| def testWaitIterables(self): |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why did this test case get removed?
| void ObjectManager::WaitComplete(const UniqueID &wait_id) { | ||
| auto &wait_state = active_wait_requests_.find(wait_id)->second; | ||
| // If we complete with outstanding requests, then wait_ms should be non-zero. | ||
| RAY_CHECK(!(wait_state.requested_objects.size() > 0) || wait_state.wait_ms > 0); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Nit, but this is confusing to read. Can we change it to something like:
if (!wait_state.requested_objects.empty()) {
RAY_CHECK(wait_state.wait_ms > 0);
}
| wait_id, oid, [this, wait_id](const std::vector<ClientID> &client_ids, | ||
| const ObjectID &object_id) { | ||
| auto &wait_state = active_wait_requests_.find(wait_id)->second; | ||
| if (wait_state.remaining.count(object_id) != 0) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
If I'm reading the code correctly, this condition should always be true, right? If yes, we should change it to skip the if check and just do a RAY_CHECK that the wait_state.remaining.erase(object_id) succeeds.
| RAY_CHECK_OK(object_directory_->SubscribeObjectLocations( | ||
| wait_id, oid, [this, wait_id](const std::vector<ClientID> &client_ids, | ||
| const ObjectID &object_id) { | ||
| auto &wait_state = active_wait_requests_.find(wait_id)->second; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
There is an issue with the way this code is currently structured because of the fact that the callback registered in SubscribeObjectLocations may now get called directly. It's possible that right now we will not actually see a bug, but that is only because of the specific ordering of calls made on the object directory by the object manager, and that seems quite brittle. Here is a scenario where I think the code would break:
SubscribeObjectLocationsgets called on objects A and B (e.g., for aPull, or for a differentWait). Locations for both are cached in the object directory.Waitis called on objects A and B, with 1 object required.- In the same call stack,
SubscribeObjectLocationsis called on object A. The cached locations are found, this callback fires, and the wait request completes and is erased fromactive_wait_requests_. - Again, in the same call stack,
SubscribeObjectLocationsis called on object B.active_wait_requests_.find(wait_id)will fail (silently).
| @unittest.skipIf( | ||
| os.environ.get("RAY_USE_XRAY") == "1", | ||
| "This test does not work with xray yet.") | ||
| def testWait(self): |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
What happened to the other test cases that should now pass? I forget exactly why ones, but I think there was one like testMultipleWaitsAndGets?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I've enabled all wait-related tests.
Add test for ObjectManager.Wait during subscribe to a common same object.
|
Test PASSed. |
| int wait_local; | ||
|
|
||
| if (!PyArg_ParseTuple(args, "Oili", &py_object_ids, &num_returns, &timeout_ms, | ||
| &wait_local)) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think the correct way to do this is to parse it as O and then use PyObject_IsTrue
stephanie-wang
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I believe undefined behavior is still possible due to the way SubscribeObjectLocations is implemented. Although the scenario might be rare because of the current ordering of calls on the object directory, we should really try to come up with a way to trigger the failure before we merge this PR. Since the bug depends on a particular order of calls and callbacks from the object directory, perhaps there's a way we can trigger the failure by either mocking the object directory and/or calling the methods (e.g., AllWaitLookupsComplete) on the object manager directly?
| for (auto &oid : object_ids) { | ||
| if (local_objects_.count(oid) > 0) { | ||
| wait_state.found.insert(oid); | ||
| for (auto &object_id : object_ids) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
const auto wherever you can.
| } else { | ||
| for (auto &oid : wait_state.remaining) { | ||
| // Subscribe to objects in order to ensure Wait-related tests are deterministic. | ||
| for (auto &object_id : wait_state.object_id_order) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
const auto wherever you can.
| for (auto &oid : wait_state.remaining) { | ||
| // Subscribe to objects in order to ensure Wait-related tests are deterministic. | ||
| for (auto &object_id : wait_state.object_id_order) { | ||
| if (wait_state.remaining.count(object_id) == 0) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The bug that I described in the earlier comment is still an issue here. wait_state is a reference to the value at active_wait_requests_. The reference will become invalid if the entry is erased from active_wait_requests_ between iterations of this for loop, so this line can produce undefined behavior.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is the relevant check that corrects the bug you described earlier: active_wait_requests_.find(wait_id) == active_wait_requests_.end()
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes, I don't think it will break in that particular way anymore, but undefined behavior is still possible because of the reference to wait_state. Same underlying issue, but it will break at a different line.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Nice catch. I've made the fix and added a regression test.
|
@stephanie-wang Before each |
|
Test PASSed. |
|
Test PASSed. |
stephanie-wang
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks, the regression test looks good! Just a few small comments, and then we can merge.
| std::vector<ObjectID> found; | ||
| std::vector<ObjectID> remaining; | ||
| for (auto item : wait_state.object_id_order) { | ||
| for (const auto item : wait_state.object_id_order) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
const auto & to avoid copy.
| } | ||
| } | ||
|
|
||
| return ray::Status::OK(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
If this method always returns OK, I would make it void.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It actually also returns ray::Status::NotImplemented currently if wait_local=true.
| /// \param success_cb Invoked with non-empty list of client ids and object_id. | ||
| /// \return Status of whether subscription succeeded. | ||
| virtual ray::Status SubscribeObjectLocations(const ObjectID &object_id, | ||
| virtual ray::Status SubscribeObjectLocations(const UniqueID &callback_id, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can you add a NOTE to the documentation here that the callback may fire in the invocation of SubscribeObjectLocations? Until we can figure out a better way to do it, it'd be good to warn the user.
|
Test PASSed. |
|
Test PASSed. |
Implements
ray.waitfor xray via the local scheduler. This includes both back-end and front-end changes.