Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
Show all changes
49 commits
Select commit Hold shift + click to select a range
0e18ca7
Use pubsub instead of timeout.
elibol May 16, 2018
fc2572c
Correct status message.
elibol May 17, 2018
2e8af60
eric's feedback!
elibol May 17, 2018
b57a548
Changes from Stephanie's review.
elibol May 17, 2018
a128698
object directory changes for ray.wait.
elibol May 18, 2018
fa5c32d
Merge branch 'master' into om_pubsub
elibol May 18, 2018
0ccf46b
Merge branch 'master' into om_pubsub
elibol May 18, 2018
b02de4f
Merge branch 'om_pubsub' into om_wait
elibol May 18, 2018
f9a9e16
wait without testing or timeout=0.
elibol May 18, 2018
15b7f61
Handle remaining cases for wait.
elibol May 18, 2018
a22263b
linting
elibol May 18, 2018
8ab41f0
added tests of om wait imp.
elibol May 18, 2018
98bacfa
add local test.
elibol May 18, 2018
d518a89
Merge branch 'master' into om_wait
elibol May 21, 2018
53f33e0
plasma imp.
elibol May 24, 2018
8ef35f7
block worker as with pull.
elibol May 29, 2018
6e10f9e
local scheduler implementation of wait.
elibol May 30, 2018
9a95c65
with passing tests.
elibol May 30, 2018
aa12bd7
minor adjustments.
elibol May 30, 2018
9e1602d
Merge branch 'master' into om_wait_local_scheduler
elibol May 30, 2018
304b39c
handle return statuses.
elibol May 30, 2018
5d63bb3
enable more tests.
elibol May 30, 2018
cf1fdb2
add test for existing num_returns semantics, and maintain existing nu…
elibol May 31, 2018
531d024
move error handling to both code paths.
elibol May 31, 2018
d0d3ea4
implementing another round of feedback.
elibol May 31, 2018
62ae832
Comment on OM tests.
elibol May 31, 2018
67eef67
remove check for length zero list.
elibol Jun 1, 2018
0796a17
remove elapsed.
elibol Jun 1, 2018
dd9f0db
Preserve input/output order.
elibol Jun 1, 2018
9d4ed2b
debias local objects.
elibol Jun 1, 2018
541b88c
Merge branch 'master' into om_wait_local_scheduler
elibol Jun 1, 2018
58af739
use common helper function in object directory.
elibol Jun 1, 2018
d9ef29b
updated documentation
elibol Jun 1, 2018
fa1928b
linting.
elibol Jun 1, 2018
d41b1d0
handle return status.
elibol Jun 1, 2018
aeaab5b
simplify order preservation test + fix valgrind test error.
elibol Jun 1, 2018
048f45f
update name of final Lookup callback.
elibol Jun 2, 2018
0aa7525
Merge branch 'master' into om_wait_local_scheduler
elibol Jun 2, 2018
833939f
linting
elibol Jun 2, 2018
8e1947c
c++ style casting.
elibol Jun 2, 2018
83d04dd
linting.
elibol Jun 2, 2018
080282f
linting.
elibol Jun 2, 2018
a58f5c9
incorporate second round of feedback.
elibol Jun 5, 2018
c6d8ba5
correct python tests.
elibol Jun 5, 2018
7d8d756
test comments.
elibol Jun 5, 2018
6b6e2f3
incorporate reviews.
elibol Jun 6, 2018
3a86c93
Fixes with regression tests.
elibol Jun 6, 2018
1a99f25
update documentation.
elibol Jun 6, 2018
00eafd7
reference to avoid copy.
elibol Jun 6, 2018
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 6 additions & 3 deletions src/local_scheduler/lib/python/local_scheduler_extension.cc
Original file line number Diff line number Diff line change
Expand Up @@ -183,12 +183,15 @@ static PyObject *PyLocalSchedulerClient_wait(PyObject *self, PyObject *args) {
PyObject *py_object_ids;
int num_returns;
int64_t timeout_ms;
int wait_local;
PyObject *py_wait_local;

if (!PyArg_ParseTuple(args, "Oili", &py_object_ids, &num_returns, &timeout_ms,
&wait_local)) {
if (!PyArg_ParseTuple(args, "OilO", &py_object_ids, &num_returns, &timeout_ms,
&py_wait_local)) {
return NULL;
}

bool wait_local = PyObject_IsTrue(py_wait_local);

// Convert object ids.
PyObject *iter = PyObject_GetIter(py_object_ids);
if (!iter) {
Expand Down
5 changes: 2 additions & 3 deletions src/ray/object_manager/object_directory.cc
Original file line number Diff line number Diff line change
Expand Up @@ -120,9 +120,8 @@ ray::Status ObjectDirectory::SubscribeObjectLocations(const UniqueID &callback_i
listener_state.callbacks.emplace(callback_id, callback);
// Immediately notify of found object locations.
if (!listener_state.current_object_locations.empty()) {
std::vector<ClientID> client_id_vec(
listener_state.current_object_locations.begin(),
listener_state.current_object_locations.end());
std::vector<ClientID> client_id_vec(listener_state.current_object_locations.begin(),
listener_state.current_object_locations.end());
callback(client_id_vec, object_id);
}
return status;
Expand Down
10 changes: 5 additions & 5 deletions src/ray/object_manager/object_manager.cc
Original file line number Diff line number Diff line change
Expand Up @@ -377,7 +377,7 @@ ray::Status ObjectManager::Wait(const std::vector<ObjectID> &object_ids,
wait_state.object_id_order = object_ids;
wait_state.timeout_ms = timeout_ms;
wait_state.num_required_objects = num_required_objects;
for (auto &object_id : object_ids) {
for (const auto &object_id : object_ids) {
if (local_objects_.count(object_id) > 0) {
wait_state.found.insert(object_id);
} else {
Expand All @@ -393,7 +393,7 @@ ray::Status ObjectManager::Wait(const std::vector<ObjectID> &object_ids,
// we obtain information about all given objects, regardless of their location.
// This is required to ensure we do not bias returning locally available objects
// as ready whenever Wait is invoked with a mixture of local and remote objects.
for (auto &object_id : wait_state.remaining) {
for (const auto &object_id : wait_state.remaining) {
// Lookup remaining objects.
wait_state.requested_objects.insert(object_id);
RAY_CHECK_OK(object_directory_->LookupLocations(
Expand Down Expand Up @@ -422,7 +422,7 @@ void ObjectManager::AllWaitLookupsComplete(const UniqueID &wait_id) {
WaitComplete(wait_id);
} else {
// Subscribe to objects in order to ensure Wait-related tests are deterministic.
for (auto &object_id : wait_state.object_id_order) {
for (const auto &object_id : wait_state.object_id_order) {
if (wait_state.remaining.count(object_id) == 0) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The bug that I described in the earlier comment is still an issue here. wait_state is a reference to the value at active_wait_requests_. The reference will become invalid if the entry is erased from active_wait_requests_ between iterations of this for loop, so this line can produce undefined behavior.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is the relevant check that corrects the bug you described earlier: active_wait_requests_.find(wait_id) == active_wait_requests_.end()

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, I don't think it will break in that particular way anymore, but undefined behavior is still possible because of the reference to wait_state. Same underlying issue, but it will break at a different line.

Copy link
Contributor Author

@elibol elibol Jun 6, 2018

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nice catch. I've made the fix and added a regression test.

continue;
}
Expand Down Expand Up @@ -470,7 +470,7 @@ void ObjectManager::WaitComplete(const UniqueID &wait_id) {
RAY_CHECK(wait_state.timeout_ms > 0 || wait_state.timeout_ms == -1);
}
// Unsubscribe to any objects that weren't found in the time allotted.
for (auto &object_id : wait_state.requested_objects) {
for (const auto &object_id : wait_state.requested_objects) {
RAY_CHECK_OK(object_directory_->UnsubscribeObjectLocations(wait_id, object_id));
}
// Cancel the timer. This is okay even if the timer hasn't been started.
Expand All @@ -480,7 +480,7 @@ void ObjectManager::WaitComplete(const UniqueID &wait_id) {
// Order objects according to input order.
std::vector<ObjectID> found;
std::vector<ObjectID> remaining;
for (auto item : wait_state.object_id_order) {
for (const auto item : wait_state.object_id_order) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

const auto & to avoid copy.

if (found.size() < wait_state.num_required_objects &&
wait_state.found.count(item) > 0) {
found.push_back(item);
Expand Down
7 changes: 4 additions & 3 deletions src/ray/object_manager/test/object_manager_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -285,12 +285,12 @@ class TestObjectManagerCommands : public TestObjectManager {
ObjectID object_2 = WriteDataToClient(client2, data_size);
UniqueID sub_id = ray::ObjectID::from_random();

get_object_directory(*server1).SubscribeObjectLocations(
RAY_CHECK_OK(get_object_directory(*server1).SubscribeObjectLocations(
sub_id, object_1,
[this, sub_id, object_1, object_2](const std::vector<ray::ClientID> &,
const ray::ObjectID &object_id) {
TestWaitCallbacks2(sub_id, object_1, object_2);
});
}));
}

void TestWaitCallbacks2(UniqueID sub_id, ObjectID object_1, ObjectID object_2) {
Expand All @@ -313,7 +313,8 @@ class TestObjectManagerCommands : public TestObjectManager {
RAY_CHECK(found.size() == 1);
// There's nothing more to test. A check will fail if unexpected behavior is
// triggered.
get_object_directory(*server1).UnsubscribeObjectLocations(sub_id, object_1);
RAY_CHECK_OK(get_object_directory(*server1).UnsubscribeObjectLocations(
sub_id, object_1));
NextWaitTest();
}));
}
Expand Down